// Package writer gère l'écriture asynchrone par batch des sessions // corrélées dans ClickHouse. package writer import ( "context" "encoding/json" "fmt" "log" "strings" "time" "github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "github.com/antitbone/ja4/ja4ebpf/internal/correlation" ) // ClickHouseWriter écrit les sessions corrélées dans ja4_logs.http_logs_raw // via des insertions batch asynchrones. type ClickHouseWriter struct { conn driver.Conn // connexion ClickHouse native ch chan *correlation.SessionState // canal d'entrée des sessions batchSz int // taille d'un batch d'insertion flush time.Duration // intervalle de flush forcé } // sessionRecord est la représentation JSON d'une session pour http_logs_raw. // Les noms de champs JSON correspondent exactement aux clés attendues par le MV mv_http_logs. type sessionRecord struct { Time time.Time `json:"time"` SrcIP string `json:"src_ip"` SrcPort int `json:"src_port"` DstIP string `json:"dst_ip"` DstPort int `json:"dst_port"` // Métadonnées IP (noms attendus par le MV) IPMetaDF *bool `json:"ip_meta_df,omitempty"` IPMetaID *uint16 `json:"ip_meta_id,omitempty"` IPMetaTTL *uint8 `json:"ip_meta_ttl,omitempty"` IPMetaTotalLength *uint16 `json:"ip_meta_total_length,omitempty"` // Métadonnées TCP (noms attendus par le MV) TCPMetaWindowSize *uint16 `json:"tcp_meta_window_size,omitempty"` TCPMetaWindowScale *uint8 `json:"tcp_meta_window_scale,omitempty"` TCPMetaMSS *uint16 `json:"tcp_meta_mss,omitempty"` TCPMetaOptions string `json:"tcp_meta_options,omitempty"` // TLS (noms attendus par le MV) JA4Hash string `json:"ja4,omitempty"` JA3Raw string `json:"ja3,omitempty"` JA3Hash string `json:"ja3_hash,omitempty"` TLSSNI string `json:"tls_sni,omitempty"` TLSALPN string `json:"tls_alpn,omitempty"` TLSVersion string `json:"tls_version,omitempty"` // HTTP Method string `json:"method,omitempty"` Path string `json:"path,omitempty"` Host string `json:"host,omitempty"` QueryString string `json:"query_string,omitempty"` Scheme string `json:"scheme,omitempty"` HTTPVersion string `json:"http_version,omitempty"` StatusCode *int `json:"status_code,omitempty"` ResponseSize *int64 `json:"response_size,omitempty"` DurationMS *float64 `json:"duration_ms,omitempty"` KeepAlives int `json:"keepalives,omitempty"` HeaderOrderSig string `json:"header_order_signature,omitempty"` HeadersRaw string `json:"headers_raw,omitempty"` HeaderUserAgent string `json:"header_User-Agent,omitempty"` HeaderAccept string `json:"header_Accept,omitempty"` HeaderAcceptEnc string `json:"header_Accept-Encoding,omitempty"` HeaderAcceptLang string `json:"header_Accept-Language,omitempty"` HeaderContentType string `json:"header_Content-Type,omitempty"` HeaderXReqID string `json:"header_X-Request-Id,omitempty"` HeaderXTraceID string `json:"header_X-Trace-Id,omitempty"` HeaderXForwarded string `json:"header_X-Forwarded-For,omitempty"` HeaderSecCHUA string `json:"header_Sec-CH-UA,omitempty"` HeaderSecCHUAMobile string `json:"header_Sec-CH-UA-Mobile,omitempty"` HeaderSecCHUAPlat string `json:"header_Sec-CH-UA-Platform,omitempty"` HeaderSecFetchDest string `json:"header_Sec-Fetch-Dest,omitempty"` HeaderSecFetchMode string `json:"header_Sec-Fetch-Mode,omitempty"` HeaderSecFetchSite string `json:"header_Sec-Fetch-Site,omitempty"` // HTTP/2 fingerprinting passif H2Fingerprint string `json:"h2_fingerprint,omitempty"` H2SettingsFP string `json:"h2_settings_fp,omitempty"` H2WindowUpdate uint32 `json:"h2_window_update,omitempty"` H2PseudoOrder string `json:"h2_pseudo_order,omitempty"` H2HasPriority uint8 `json:"h2_has_priority,omitempty"` H2HeaderTableSize *int32 `json:"h2_header_table_size,omitempty"` H2EnablePush *int32 `json:"h2_enable_push,omitempty"` H2MaxConcurrentStreams *int32 `json:"h2_max_concurrent_streams,omitempty"` H2InitialWindowSize *int64 `json:"h2_initial_window_size,omitempty"` H2MaxFrameSize *int32 `json:"h2_max_frame_size,omitempty"` H2MaxHeaderListSize *int32 `json:"h2_max_header_list_size,omitempty"` H2EnableConnectProtocol *int32 `json:"h2_enable_connect_protocol,omitempty"` } // NewClickHouseWriter crée un writer et établit la connexion ClickHouse. func NewClickHouseWriter(dsn string, batchSize int, flushInterval time.Duration) (*ClickHouseWriter, error) { opts, err := clickhouse.ParseDSN(dsn) if err != nil { return nil, fmt.Errorf("analyse DSN ClickHouse: %w", err) } conn, err := clickhouse.Open(opts) if err != nil { return nil, fmt.Errorf("connexion ClickHouse: %w", err) } // Vérifier la connexion avec un ping limité dans le temps pingCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := conn.Ping(pingCtx); err != nil { conn.Close() return nil, fmt.Errorf("ping ClickHouse: %w", err) } return &ClickHouseWriter{ conn: conn, ch: make(chan *correlation.SessionState, 8192), batchSz: batchSize, flush: flushInterval, }, nil } // Start lance la goroutine de consommation du canal de sessions. // Se termine proprement à l'annulation du contexte. func (w *ClickHouseWriter) Start(ctx context.Context) { go func() { batch := make([]*correlation.SessionState, 0, w.batchSz) ticker := time.NewTicker(w.flush) defer ticker.Stop() defer w.conn.Close() for { select { case s, ok := <-w.ch: if !ok { // Canal fermé : vider le batch restant if len(batch) > 0 { if err := w.flushBatch(ctx, batch); err != nil { log.Printf("[writer] erreur flush final: %v", err) } } return } batch = append(batch, s) if len(batch) >= w.batchSz { if err := w.flushBatch(ctx, batch); err != nil { log.Printf("[writer] erreur flush batch: %v", err) } batch = batch[:0] } case <-ticker.C: if len(batch) > 0 { if err := w.flushBatch(ctx, batch); err != nil { log.Printf("[writer] erreur flush périodique: %v", err) } batch = batch[:0] } case <-ctx.Done(): if len(batch) > 0 { flushCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) if err := w.flushBatch(flushCtx, batch); err != nil { log.Printf("[writer] erreur flush arrêt: %v", err) } cancel() } return } } }() } // Write envoie une session dans le canal d'écriture (non-bloquant). // Si le canal est plein, la session est abandonnée avec un log d'avertissement. func (w *ClickHouseWriter) Write(s *correlation.SessionState) { select { case w.ch <- s: default: log.Printf("[writer] canal plein, session abandonnée: src=%v:%d", s.Key.SrcIP, s.Key.SrcPort) } } // flushBatch insère un batch de sessions dans ja4_logs.http_logs_raw. // Chaque session est sérialisée en JSON et insérée dans la colonne raw_json. func (w *ClickHouseWriter) flushBatch(ctx context.Context, batch []*correlation.SessionState) error { b, err := w.conn.PrepareBatch(ctx, "INSERT INTO ja4_logs.http_logs_raw (raw_json)") if err != nil { return fmt.Errorf("préparation batch ClickHouse: %w", err) } for _, s := range batch { record := sessionToRecord(s) jsonBytes, err := json.Marshal(record) if err != nil { return fmt.Errorf("sérialisation session JSON: %w", err) } if err := b.Append(string(jsonBytes)); err != nil { return fmt.Errorf("ajout ligne au batch: %w", err) } } if err := b.Send(); err != nil { return fmt.Errorf("envoi batch ClickHouse (%d lignes): %w", len(batch), err) } return nil } // sessionToRecord convertit une SessionState en enregistrement JSON plat. func sessionToRecord(s *correlation.SessionState) sessionRecord { srcIP := fmt.Sprintf("%d.%d.%d.%d", s.Key.SrcIP[0], s.Key.SrcIP[1], s.Key.SrcIP[2], s.Key.SrcIP[3]) rec := sessionRecord{ Time: s.FirstSeen, SrcIP: srcIP, SrcPort: int(s.Key.SrcPort), KeepAlives: len(s.Requests), } // Champs métadonnées IP/TCP if s.L3L4 != nil { rec.DstIP = fmt.Sprintf("%d.%d.%d.%d", s.L3L4.DstIP[0], s.L3L4.DstIP[1], s.L3L4.DstIP[2], s.L3L4.DstIP[3]) rec.DstPort = int(s.L3L4.DstPort) rec.IPMetaDF = &s.L3L4.DFBit rec.IPMetaID = &s.L3L4.IPID rec.IPMetaTTL = &s.L3L4.TTL rec.TCPMetaWindowSize = &s.L3L4.WindowSize // WindowScale 0xFF = absent (convention C), ne pas inclure if s.L3L4.WindowScale != 0xFF { rec.TCPMetaWindowScale = &s.L3L4.WindowScale } // MSS 0 = absent, ne pas inclure if s.L3L4.MSS > 0 { rec.TCPMetaMSS = &s.L3L4.MSS } } // Champs TLS if s.TLS != nil { rec.JA4Hash = s.TLS.JA4Hash rec.JA3Raw = s.TLS.JA3Raw rec.JA3Hash = s.TLS.JA3Hash rec.TLSSNI = s.TLS.SNI rec.TLSALPN = strings.Join(s.TLS.ALPN, ",") rec.TLSVersion = formatTLSVersion(s.TLS.TLSVersion) // Fallback : si pas de Host HTTP, utiliser TLS SNI if rec.Host == "" && s.TLS.SNI != "" { rec.Host = s.TLS.SNI } // Scheme déduit de la présence TLS if s.TLS.SNI != "" { rec.Scheme = "https" } } // Champs HTTP (dernière requête) if len(s.Requests) > 0 { last := &s.Requests[len(s.Requests)-1] rec.Method = last.Method rec.Path = last.Path rec.QueryString = last.QueryString rec.Host = last.Host if last.Host != "" && s.TLS != nil { rec.Scheme = "https" } rec.HTTPVersion = last.HTTPVersion rec.StatusCode = &last.StatusCode rec.ResponseSize = &last.ResponseSize rec.DurationMS = &last.DurationMS rec.HeaderOrderSig = last.HeaderOrderSig // En-têtes HTTP individuels if last.HeaderKV != nil { rec.HeaderUserAgent = last.HeaderKV["User-Agent"] rec.HeaderAccept = last.HeaderKV["Accept"] rec.HeaderAcceptEnc = last.HeaderKV["Accept-Encoding"] rec.HeaderAcceptLang = last.HeaderKV["Accept-Language"] rec.HeaderContentType = last.HeaderKV["Content-Type"] rec.HeaderXReqID = last.HeaderKV["X-Request-Id"] rec.HeaderXTraceID = last.HeaderKV["X-Trace-Id"] rec.HeaderXForwarded = last.HeaderKV["X-Forwarded-For"] rec.HeaderSecCHUA = last.HeaderKV["Sec-CH-UA"] rec.HeaderSecCHUAMobile = last.HeaderKV["Sec-CH-UA-Mobile"] rec.HeaderSecCHUAPlat = last.HeaderKV["Sec-CH-UA-Platform"] rec.HeaderSecFetchDest = last.HeaderKV["Sec-Fetch-Dest"] rec.HeaderSecFetchMode = last.HeaderKV["Sec-Fetch-Mode"] rec.HeaderSecFetchSite = last.HeaderKV["Sec-Fetch-Site"] } // Construire headers_raw : ordre des noms joints par ";" if len(last.HeaderOrder) > 0 { rec.HeadersRaw = strings.Join(last.HeaderOrder, ";") } // Champs HTTP/2 passifs if last.HTTP2Settings != nil { h2 := last.HTTP2Settings rec.H2WindowUpdate = h2.WindowUpdateIncrement // Ordre des pseudo-headers → notation abrégée "m,a,s,p" if len(h2.PseudoHeaderOrder) > 0 { rec.H2PseudoOrder = pseudoOrderToShort(h2.PseudoHeaderOrder) } // Paramètres SETTINGS individuels (pointeurs : nil = absent du preface) rec.H2HeaderTableSize = &h2.HeaderTableSize rec.H2EnablePush = &h2.EnablePush rec.H2MaxConcurrentStreams = &h2.MaxConcurrentStreams h2InitWin := int64(h2.InitialWindowSize) rec.H2InitialWindowSize = &h2InitWin rec.H2MaxFrameSize = &h2.MaxFrameSize rec.H2MaxHeaderListSize = &h2.MaxHeaderListSize // Fingerprints composites Akamai rec.H2Fingerprint = buildH2Fingerprint(h2) rec.H2SettingsFP = buildH2SettingsFP(h2) } } return rec } // pseudoOrderToShort convertit la liste de pseudo-headers en notation abrégée. // Ex: [":method", ":authority", ":scheme", ":path"] → "m,a,s,p" func pseudoOrderToShort(headers []string) string { short := make([]byte, 0, len(headers)*2-1) for i, h := range headers { if i > 0 { short = append(short, ',') } switch { case h == ":method": short = append(short, 'm') case h == ":authority": short = append(short, 'a') case h == ":scheme": short = append(short, 's') case h == ":path": short = append(short, 'p') default: short = append(short, '?') } } return string(short) } // buildH2Fingerprint construit le fingerprint composite au format Akamai. // Format : SETTINGS[pairs]|WINDOW_UPDATE[value]|PRIORITY[0/1]|PSEUDO_ORDER[order] func buildH2Fingerprint(h2 *correlation.HTTP2Settings) string { var b strings.Builder // SETTINGS b.WriteString("1:") b.WriteString(fmt.Sprintf("%d", h2.HeaderTableSize)) b.WriteString(",2:") b.WriteString(fmt.Sprintf("%d", h2.EnablePush)) if h2.MaxConcurrentStreams >= 0 { b.WriteString(",3:") b.WriteString(fmt.Sprintf("%d", h2.MaxConcurrentStreams)) } b.WriteString(",4:") b.WriteString(fmt.Sprintf("%d", h2.InitialWindowSize)) if h2.MaxFrameSize >= 0 { b.WriteString(",5:") b.WriteString(fmt.Sprintf("%d", h2.MaxFrameSize)) } if h2.MaxHeaderListSize >= 0 { b.WriteString(",6:") b.WriteString(fmt.Sprintf("%d", h2.MaxHeaderListSize)) } // WINDOW_UPDATE b.WriteByte('|') if h2.WindowUpdateIncrement > 0 { b.WriteString(fmt.Sprintf("%d", h2.WindowUpdateIncrement)) } // PRIORITY (non capturé actuellement) b.WriteString("|0") // PSEUDO_ORDER b.WriteByte('|') if len(h2.PseudoHeaderOrder) > 0 { b.WriteString(pseudoOrderToShort(h2.PseudoHeaderOrder)) } return b.String() } // buildH2SettingsFP construit la chaîne brute des entrées SETTINGS. func buildH2SettingsFP(h2 *correlation.HTTP2Settings) string { var parts []string if h2.MaxConcurrentStreams >= 0 { parts = append(parts, fmt.Sprintf("3:%d", h2.MaxConcurrentStreams)) } if h2.InitialWindowSize >= 0 { parts = append(parts, fmt.Sprintf("4:%d", h2.InitialWindowSize)) } if h2.EnablePush >= 0 { parts = append(parts, fmt.Sprintf("2:%d", h2.EnablePush)) } return strings.Join(parts, ",") } // formatTLSVersion convertit la valeur numérique TLS en chaîne lisible. func formatTLSVersion(v uint16) string { switch v { case 0x0301: return "TLSv1.0" case 0x0302: return "TLSv1.1" case 0x0303: return "TLSv1.2" case 0x0304: return "TLSv1.3" default: return "" } }