// Package writer gère l'écriture asynchrone par batch des sessions // corrélées dans ClickHouse. package writer import ( "context" "encoding/json" "fmt" "log" "strings" "time" "github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "github.com/antitbone/ja4/ja4ebpf/internal/correlation" ) // ClickHouseWriter écrit les sessions corrélées dans ja4_logs.http_logs_raw // via des insertions batch asynchrones. type ClickHouseWriter struct { conn driver.Conn // connexion ClickHouse native ch chan *correlation.SessionState // canal d'entrée des sessions batchSz int // taille d'un batch d'insertion flush time.Duration // intervalle de flush forcé } // sessionRecord est la représentation JSON d'une session pour http_logs_raw. // Les noms de champs JSON correspondent exactement aux clés attendues par le MV mv_http_logs. type sessionRecord struct { Time time.Time `json:"time"` SrcIP string `json:"src_ip"` SrcPort int `json:"src_port"` DstIP string `json:"dst_ip"` DstPort int `json:"dst_port"` // Métadonnées IP (noms attendus par le MV) IPMetaDF *bool `json:"ip_meta_df,omitempty"` IPMetaID *uint16 `json:"ip_meta_id,omitempty"` IPMetaTTL *uint8 `json:"ip_meta_ttl,omitempty"` IPMetaTotalLength *uint16 `json:"ip_meta_total_length,omitempty"` // Métadonnées TCP (noms attendus par le MV) TCPMetaWindowSize *uint16 `json:"tcp_meta_window_size,omitempty"` TCPMetaWindowScale *uint8 `json:"tcp_meta_window_scale,omitempty"` TCPMetaMSS *uint16 `json:"tcp_meta_mss,omitempty"` TCPMetaOptions string `json:"tcp_meta_options,omitempty"` // TLS (noms attendus par le MV) JA4Hash string `json:"ja4,omitempty"` TLSSNI string `json:"tls_sni,omitempty"` TLSALPN string `json:"tls_alpn,omitempty"` TLSVersion string `json:"tls_version,omitempty"` // HTTP Method string `json:"method,omitempty"` Path string `json:"path,omitempty"` QueryString string `json:"query_string,omitempty"` StatusCode *int `json:"status_code,omitempty"` ResponseSize *int64 `json:"response_size,omitempty"` DurationMS *float64 `json:"duration_ms,omitempty"` KeepAlives int `json:"keepalives,omitempty"` HeaderOrderSig string `json:"header_order_signature,omitempty"` } // NewClickHouseWriter crée un writer et établit la connexion ClickHouse. func NewClickHouseWriter(dsn string, batchSize int, flushInterval time.Duration) (*ClickHouseWriter, error) { opts, err := clickhouse.ParseDSN(dsn) if err != nil { return nil, fmt.Errorf("analyse DSN ClickHouse: %w", err) } conn, err := clickhouse.Open(opts) if err != nil { return nil, fmt.Errorf("connexion ClickHouse: %w", err) } // Vérifier la connexion avec un ping limité dans le temps pingCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := conn.Ping(pingCtx); err != nil { conn.Close() return nil, fmt.Errorf("ping ClickHouse: %w", err) } return &ClickHouseWriter{ conn: conn, ch: make(chan *correlation.SessionState, 8192), batchSz: batchSize, flush: flushInterval, }, nil } // Start lance la goroutine de consommation du canal de sessions. // Se termine proprement à l'annulation du contexte. func (w *ClickHouseWriter) Start(ctx context.Context) { go func() { batch := make([]*correlation.SessionState, 0, w.batchSz) ticker := time.NewTicker(w.flush) defer ticker.Stop() defer w.conn.Close() for { select { case s, ok := <-w.ch: if !ok { // Canal fermé : vider le batch restant if len(batch) > 0 { if err := w.flushBatch(ctx, batch); err != nil { log.Printf("[writer] erreur flush final: %v", err) } } return } batch = append(batch, s) if len(batch) >= w.batchSz { if err := w.flushBatch(ctx, batch); err != nil { log.Printf("[writer] erreur flush batch: %v", err) } batch = batch[:0] } case <-ticker.C: if len(batch) > 0 { if err := w.flushBatch(ctx, batch); err != nil { log.Printf("[writer] erreur flush périodique: %v", err) } batch = batch[:0] } case <-ctx.Done(): if len(batch) > 0 { flushCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) if err := w.flushBatch(flushCtx, batch); err != nil { log.Printf("[writer] erreur flush arrêt: %v", err) } cancel() } return } } }() } // Write envoie une session dans le canal d'écriture (non-bloquant). // Si le canal est plein, la session est abandonnée avec un log d'avertissement. func (w *ClickHouseWriter) Write(s *correlation.SessionState) { select { case w.ch <- s: default: log.Printf("[writer] canal plein, session abandonnée: src=%v:%d", s.Key.SrcIP, s.Key.SrcPort) } } // flushBatch insère un batch de sessions dans ja4_logs.http_logs_raw. // Chaque session est sérialisée en JSON et insérée dans la colonne raw_json. func (w *ClickHouseWriter) flushBatch(ctx context.Context, batch []*correlation.SessionState) error { b, err := w.conn.PrepareBatch(ctx, "INSERT INTO ja4_logs.http_logs_raw (raw_json)") if err != nil { return fmt.Errorf("préparation batch ClickHouse: %w", err) } for _, s := range batch { record := sessionToRecord(s) jsonBytes, err := json.Marshal(record) if err != nil { return fmt.Errorf("sérialisation session JSON: %w", err) } if err := b.Append(string(jsonBytes)); err != nil { return fmt.Errorf("ajout ligne au batch: %w", err) } } if err := b.Send(); err != nil { return fmt.Errorf("envoi batch ClickHouse (%d lignes): %w", len(batch), err) } return nil } // sessionToRecord convertit une SessionState en enregistrement JSON plat. func sessionToRecord(s *correlation.SessionState) sessionRecord { srcIP := fmt.Sprintf("%d.%d.%d.%d", s.Key.SrcIP[0], s.Key.SrcIP[1], s.Key.SrcIP[2], s.Key.SrcIP[3]) rec := sessionRecord{ Time: s.FirstSeen, SrcIP: srcIP, SrcPort: int(s.Key.SrcPort), DstIP: "0.0.0.0", // destination non capturée par les sondes eBPF actuelles DstPort: 0, KeepAlives: len(s.Requests), } // Champs métadonnées IP/TCP if s.L3L4 != nil { rec.IPMetaDF = &s.L3L4.DFBit rec.IPMetaID = &s.L3L4.IPID rec.IPMetaTTL = &s.L3L4.TTL rec.TCPMetaWindowSize = &s.L3L4.WindowSize rec.TCPMetaWindowScale = &s.L3L4.WindowScale rec.TCPMetaMSS = &s.L3L4.MSS } // Champs TLS if s.TLS != nil { rec.JA4Hash = s.TLS.JA4Hash rec.TLSSNI = s.TLS.SNI rec.TLSALPN = strings.Join(s.TLS.ALPN, ",") rec.TLSVersion = formatTLSVersion(s.TLS.TLSVersion) } // Champs HTTP (dernière requête) if len(s.Requests) > 0 { last := &s.Requests[len(s.Requests)-1] rec.Method = last.Method rec.Path = last.Path rec.QueryString = last.QueryString rec.StatusCode = &last.StatusCode rec.ResponseSize = &last.ResponseSize rec.DurationMS = &last.DurationMS rec.HeaderOrderSig = last.HeaderOrderSig } return rec } // formatTLSVersion convertit la valeur numérique TLS en chaîne lisible. func formatTLSVersion(v uint16) string { switch v { case 0x0301: return "TLSv1.0" case 0x0302: return "TLSv1.1" case 0x0303: return "TLSv1.2" case 0x0304: return "TLSv1.3" default: return "" } }