Correctifs pipeline L7 (uprobe SSL_read) :
- uprobe_ssl.c : ssl_set_fd ne retourne plus tôt quand fd_conn_map est
vide (accept4 non disponible en Docker). Sauvegarde ssl_ptr→{fd,0,0}
pour permettre le fallback /proc côté Go.
- main.go : consumeSSLEvents reécrit avec routeur magic-bytes complet :
* HTTP/2 preface → extraction SETTINGS + conversion correlation.HTTP2Settings
* HTTP/1.x requête → method, path, query, headers, header_order_sig
* HTTP/1.x réponse → status_code
* Fallback /proc/<tgid>/fd/<fd> quand src_ip=0 (accept4 absent)
- writer/clickhouse.go : export header_order_signature ajouté
Nouveaux packages :
- internal/parser/http1.go : parseur HTTP/1.x (IsHTTP1Request,
ParseHTTP1Request, IsHTTP1Response, ParseHTTP1Response)
- internal/parser/http1_test.go : 11 tests unitaires (28 total passent)
- internal/procutil/proc_lookup.go : résolution fd→IP via /proc avec cache
TTL 5s (FDCache). Supporte /proc/PID/net/tcp et tcp6, IPv4-mappé IPv6.
Infrastructure tests VM (tests/vm/) :
- Vagrantfile : VM Rocky Linux 9 KVM, 4 CPU / 4 GB RAM
- provision.sh : installation toolchain eBPF + Go + Docker + nginx
- run-tests-vm.sh : suite de test complète dans la VM (L3/L4+TLS+L7)
- README.md : guide d'installation et d'utilisation
- Makefile : cibles vm-up, vm-down, vm-ssh, test-vm-nginx, test-vm-all,
vm-rebuild-ja4ebpf
Corrections stack Docker :
- Dockerfiles nginx/apache/nginx-varnish/hitch-varnish : suppression des
références à shared/go/ja4common/ (répertoire supprimé)
- clickhouse-init.sh : restauré depuis git, seed anubis_ua_rules obsolète
supprimé (table REGEXP_TREE supprimée du schéma)
- traffic-gen : ajout HTTP/1.0 (http.client) et HTTP/2 (httpx)
- verify_db.py : script de vérification 35 checks (L3/L4/TLS/L7/corrélation)
- run-stack-tests.sh : phase 6 verify_db ajoutée
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
248 lines
7.5 KiB
Go
248 lines
7.5 KiB
Go
// Package writer gère l'écriture asynchrone par batch des sessions
|
|
// corrélées dans ClickHouse.
|
|
package writer
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/ClickHouse/clickhouse-go/v2"
|
|
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
|
"github.com/antitbone/ja4/ja4ebpf/internal/correlation"
|
|
)
|
|
|
|
// ClickHouseWriter écrit les sessions corrélées dans ja4_logs.http_logs_raw
|
|
// via des insertions batch asynchrones.
|
|
type ClickHouseWriter struct {
|
|
conn driver.Conn // connexion ClickHouse native
|
|
ch chan *correlation.SessionState // canal d'entrée des sessions
|
|
batchSz int // taille d'un batch d'insertion
|
|
flush time.Duration // intervalle de flush forcé
|
|
}
|
|
|
|
// sessionRecord est la représentation JSON d'une session pour http_logs_raw.
|
|
// Les noms de champs JSON correspondent exactement aux clés attendues par le MV mv_http_logs.
|
|
type sessionRecord struct {
|
|
Time time.Time `json:"time"`
|
|
SrcIP string `json:"src_ip"`
|
|
SrcPort int `json:"src_port"`
|
|
DstIP string `json:"dst_ip"`
|
|
DstPort int `json:"dst_port"`
|
|
Correlated int `json:"correlated"`
|
|
|
|
// Métadonnées IP (noms attendus par le MV)
|
|
IPMetaDF *bool `json:"ip_meta_df,omitempty"`
|
|
IPMetaID *uint16 `json:"ip_meta_id,omitempty"`
|
|
IPMetaTTL *uint8 `json:"ip_meta_ttl,omitempty"`
|
|
IPMetaTotalLength *uint16 `json:"ip_meta_total_length,omitempty"`
|
|
|
|
// Métadonnées TCP (noms attendus par le MV)
|
|
TCPMetaWindowSize *uint16 `json:"tcp_meta_window_size,omitempty"`
|
|
TCPMetaWindowScale *uint8 `json:"tcp_meta_window_scale,omitempty"`
|
|
TCPMetaMSS *uint16 `json:"tcp_meta_mss,omitempty"`
|
|
TCPMetaOptions string `json:"tcp_meta_options,omitempty"`
|
|
|
|
// TLS (noms attendus par le MV)
|
|
JA4Hash string `json:"ja4,omitempty"`
|
|
TLSSNI string `json:"tls_sni,omitempty"`
|
|
TLSALPN string `json:"tls_alpn,omitempty"`
|
|
TLSVersion string `json:"tls_version,omitempty"`
|
|
|
|
// HTTP
|
|
Method string `json:"method,omitempty"`
|
|
Path string `json:"path,omitempty"`
|
|
QueryString string `json:"query_string,omitempty"`
|
|
StatusCode *int `json:"status_code,omitempty"`
|
|
ResponseSize *int64 `json:"response_size,omitempty"`
|
|
DurationMS *float64 `json:"duration_ms,omitempty"`
|
|
KeepAlives int `json:"keepalives,omitempty"`
|
|
HeaderOrderSig string `json:"header_order_signature,omitempty"`
|
|
}
|
|
|
|
// NewClickHouseWriter crée un writer et établit la connexion ClickHouse.
|
|
func NewClickHouseWriter(dsn string, batchSize int, flushInterval time.Duration) (*ClickHouseWriter, error) {
|
|
opts, err := clickhouse.ParseDSN(dsn)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("analyse DSN ClickHouse: %w", err)
|
|
}
|
|
|
|
conn, err := clickhouse.Open(opts)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("connexion ClickHouse: %w", err)
|
|
}
|
|
|
|
// Vérifier la connexion avec un ping limité dans le temps
|
|
pingCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
if err := conn.Ping(pingCtx); err != nil {
|
|
conn.Close()
|
|
return nil, fmt.Errorf("ping ClickHouse: %w", err)
|
|
}
|
|
|
|
return &ClickHouseWriter{
|
|
conn: conn,
|
|
ch: make(chan *correlation.SessionState, 8192),
|
|
batchSz: batchSize,
|
|
flush: flushInterval,
|
|
}, nil
|
|
}
|
|
|
|
// Start lance la goroutine de consommation du canal de sessions.
|
|
// Se termine proprement à l'annulation du contexte.
|
|
func (w *ClickHouseWriter) Start(ctx context.Context) {
|
|
go func() {
|
|
batch := make([]*correlation.SessionState, 0, w.batchSz)
|
|
ticker := time.NewTicker(w.flush)
|
|
defer ticker.Stop()
|
|
defer w.conn.Close()
|
|
|
|
for {
|
|
select {
|
|
case s, ok := <-w.ch:
|
|
if !ok {
|
|
// Canal fermé : vider le batch restant
|
|
if len(batch) > 0 {
|
|
if err := w.flushBatch(ctx, batch); err != nil {
|
|
log.Printf("[writer] erreur flush final: %v", err)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
batch = append(batch, s)
|
|
if len(batch) >= w.batchSz {
|
|
if err := w.flushBatch(ctx, batch); err != nil {
|
|
log.Printf("[writer] erreur flush batch: %v", err)
|
|
}
|
|
batch = batch[:0]
|
|
}
|
|
|
|
case <-ticker.C:
|
|
if len(batch) > 0 {
|
|
if err := w.flushBatch(ctx, batch); err != nil {
|
|
log.Printf("[writer] erreur flush périodique: %v", err)
|
|
}
|
|
batch = batch[:0]
|
|
}
|
|
|
|
case <-ctx.Done():
|
|
if len(batch) > 0 {
|
|
flushCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
if err := w.flushBatch(flushCtx, batch); err != nil {
|
|
log.Printf("[writer] erreur flush arrêt: %v", err)
|
|
}
|
|
cancel()
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Write envoie une session dans le canal d'écriture (non-bloquant).
|
|
// Si le canal est plein, la session est abandonnée avec un log d'avertissement.
|
|
func (w *ClickHouseWriter) Write(s *correlation.SessionState) {
|
|
select {
|
|
case w.ch <- s:
|
|
default:
|
|
log.Printf("[writer] canal plein, session abandonnée: src=%v:%d", s.Key.SrcIP, s.Key.SrcPort)
|
|
}
|
|
}
|
|
|
|
// flushBatch insère un batch de sessions dans ja4_logs.http_logs_raw.
|
|
// Chaque session est sérialisée en JSON et insérée dans la colonne raw_json.
|
|
func (w *ClickHouseWriter) flushBatch(ctx context.Context, batch []*correlation.SessionState) error {
|
|
b, err := w.conn.PrepareBatch(ctx, "INSERT INTO ja4_logs.http_logs_raw (raw_json)")
|
|
if err != nil {
|
|
return fmt.Errorf("préparation batch ClickHouse: %w", err)
|
|
}
|
|
|
|
for _, s := range batch {
|
|
record := sessionToRecord(s)
|
|
jsonBytes, err := json.Marshal(record)
|
|
if err != nil {
|
|
return fmt.Errorf("sérialisation session JSON: %w", err)
|
|
}
|
|
if err := b.Append(string(jsonBytes)); err != nil {
|
|
return fmt.Errorf("ajout ligne au batch: %w", err)
|
|
}
|
|
}
|
|
|
|
if err := b.Send(); err != nil {
|
|
return fmt.Errorf("envoi batch ClickHouse (%d lignes): %w", len(batch), err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// sessionToRecord convertit une SessionState en enregistrement JSON plat.
|
|
func sessionToRecord(s *correlation.SessionState) sessionRecord {
|
|
srcIP := fmt.Sprintf("%d.%d.%d.%d",
|
|
s.Key.SrcIP[0], s.Key.SrcIP[1], s.Key.SrcIP[2], s.Key.SrcIP[3])
|
|
|
|
correlated := 0
|
|
if s.Correlated {
|
|
correlated = 1
|
|
}
|
|
|
|
rec := sessionRecord{
|
|
Time: s.FirstSeen,
|
|
SrcIP: srcIP,
|
|
SrcPort: int(s.Key.SrcPort),
|
|
DstIP: "0.0.0.0", // destination non capturée par les sondes eBPF actuelles
|
|
DstPort: 0,
|
|
Correlated: correlated,
|
|
KeepAlives: len(s.Requests),
|
|
}
|
|
|
|
// Champs métadonnées IP/TCP
|
|
if s.L3L4 != nil {
|
|
rec.IPMetaDF = &s.L3L4.DFBit
|
|
rec.IPMetaID = &s.L3L4.IPID
|
|
rec.IPMetaTTL = &s.L3L4.TTL
|
|
rec.TCPMetaWindowSize = &s.L3L4.WindowSize
|
|
rec.TCPMetaWindowScale = &s.L3L4.WindowScale
|
|
rec.TCPMetaMSS = &s.L3L4.MSS
|
|
}
|
|
|
|
// Champs TLS
|
|
if s.TLS != nil {
|
|
rec.JA4Hash = s.TLS.JA4Hash
|
|
rec.TLSSNI = s.TLS.SNI
|
|
rec.TLSALPN = strings.Join(s.TLS.ALPN, ",")
|
|
rec.TLSVersion = formatTLSVersion(s.TLS.TLSVersion)
|
|
}
|
|
|
|
// Champs HTTP (dernière requête)
|
|
if len(s.Requests) > 0 {
|
|
last := &s.Requests[len(s.Requests)-1]
|
|
rec.Method = last.Method
|
|
rec.Path = last.Path
|
|
rec.QueryString = last.QueryString
|
|
rec.StatusCode = &last.StatusCode
|
|
rec.ResponseSize = &last.ResponseSize
|
|
rec.DurationMS = &last.DurationMS
|
|
rec.HeaderOrderSig = last.HeaderOrderSig
|
|
}
|
|
|
|
return rec
|
|
}
|
|
|
|
// formatTLSVersion convertit la valeur numérique TLS en chaîne lisible.
|
|
func formatTLSVersion(v uint16) string {
|
|
switch v {
|
|
case 0x0301:
|
|
return "TLSv1.0"
|
|
case 0x0302:
|
|
return "TLSv1.1"
|
|
case 0x0303:
|
|
return "TLSv1.2"
|
|
case 0x0304:
|
|
return "TLSv1.3"
|
|
default:
|
|
return ""
|
|
}
|
|
}
|