feat: maximize data completeness across L3/L4/TLS/HTTP layers and add E2E test infra
Add SSL_write uprobe for HTTP response capture, HPACK decoder for HTTP/2 header extraction, and AcceptCache for reliable SSL/TC session correlation. Populate all ClickHouse fields including tcp_meta_options, ip_meta_total_length, syn_to_clienthello_ms, client_headers, TLS cipher suites/extensions, and h2_enable_connect_protocol. Increase BPF capture buffers (HTTP 512B, TLS 1024B). Add distributed E2E testing infrastructure with multi-VM Vagrant setup. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@ -28,6 +28,10 @@ import (
|
||||
// Durée de vie d'une entrée : 5 secondes (suffisant pour une requête HTTP).
|
||||
var fdCache = procutil.NewFDCache(5 * time.Second)
|
||||
|
||||
// acceptCache maps {tgid, fd} → SessionKey depuis les événements accept4.
|
||||
// Prioritaire sur fdCache car source de vérité (tracepoint kernel).
|
||||
var acceptCache = correlation.NewAcceptCache(10 * time.Second)
|
||||
|
||||
// Config décrit la configuration complète du démon ja4ebpf.
|
||||
// Chargée depuis un fichier YAML et enrichie par les variables d'environnement
|
||||
// avec le préfixe JA4EBPF_.
|
||||
@ -64,7 +68,7 @@ func loadConfig(path string) (*Config, error) {
|
||||
cfg.ClickHouse.DSN = "clickhouse://default:@localhost:9000/ja4_logs"
|
||||
cfg.ClickHouse.BatchSize = 500
|
||||
cfg.ClickHouse.FlushSecs = 1
|
||||
cfg.Correlation.TimeoutMS = 500
|
||||
cfg.Correlation.TimeoutMS = 5000
|
||||
cfg.Correlation.SlowlorisMS = 10000
|
||||
cfg.Log.Level = "info"
|
||||
cfg.Log.Format = "json"
|
||||
@ -303,8 +307,10 @@ func consumeSynEvents(ctx context.Context, rd *perf.Reader, mgr *correlation.Man
|
||||
|
||||
// struct tcp_syn_event (packed):
|
||||
// src_ip(4)+dst_ip(4)+src_port(2)+dst_port(2)+ttl(1)+df_bit(1)+ip_id(2)+
|
||||
// window_size(2)+window_scale(1)+mss(2)+tcp_options_raw[40]+tcp_options_len(1)+timestamp_ns(8)
|
||||
// offsets: 0 4 8 10 12 13 14 16 18 19 21 61 62
|
||||
// ip_total_length(2)+window_size(2)+window_scale(1)+mss(2)+tcp_options_raw[40]+
|
||||
// tcp_options_len(1)+timestamp_ns(8)
|
||||
// offsets: 0 4 8 10 12 13 14 16 18 19 21 61
|
||||
// total = 62 + 8 = 70
|
||||
if len(record.RawSample) < 70 {
|
||||
continue
|
||||
}
|
||||
@ -329,11 +335,12 @@ func consumeSynEvents(ctx context.Context, rd *perf.Reader, mgr *correlation.Man
|
||||
dstIP[2] = byte(dstIPRaw >> 8)
|
||||
dstIP[3] = byte(dstIPRaw)
|
||||
|
||||
// Champs IP/TCP aux offsets corrects (dst_ip occupe les octets 4-7)
|
||||
ttl := data[12]
|
||||
dfBit := data[13] != 0
|
||||
ipID := binary.LittleEndian.Uint16(data[14:16])
|
||||
windowSize := binary.LittleEndian.Uint16(data[16:18])
|
||||
// Champs IP/TCP aux offsets corrects (ip_total_length inséré entre ip_id et window_size)
|
||||
ttl := data[12]
|
||||
dfBit := data[13] != 0
|
||||
ipID := binary.LittleEndian.Uint16(data[14:16])
|
||||
ipTotalLength := binary.LittleEndian.Uint16(data[16:18])
|
||||
windowSize := binary.LittleEndian.Uint16(data[18:20])
|
||||
|
||||
optLen := int(data[61])
|
||||
if optLen > 40 {
|
||||
@ -347,16 +354,17 @@ func consumeSynEvents(ctx context.Context, rd *perf.Reader, mgr *correlation.Man
|
||||
|
||||
mgr.Update(key, func(s *correlation.SessionState) {
|
||||
s.L3L4 = &correlation.L3L4{
|
||||
DstIP: dstIP,
|
||||
DstPort: dstPort,
|
||||
TTL: ttl,
|
||||
DFBit: dfBit,
|
||||
IPID: ipID,
|
||||
WindowSize: windowSize,
|
||||
WindowScale: windowScale,
|
||||
MSS: mss,
|
||||
TCPOptionsRaw: tcpOpts,
|
||||
SYNTimestamp: time.Now(),
|
||||
DstIP: dstIP,
|
||||
DstPort: dstPort,
|
||||
TTL: ttl,
|
||||
DFBit: dfBit,
|
||||
IPID: ipID,
|
||||
IPTotalLength: ipTotalLength,
|
||||
WindowSize: windowSize,
|
||||
WindowScale: windowScale,
|
||||
MSS: mss,
|
||||
TCPOptionsRaw: tcpOpts,
|
||||
SYNTimestamp: time.Now(),
|
||||
}
|
||||
// Si TLS est déjà présent (arrivé avant SYN), les deux couches sont disponibles.
|
||||
if s.TLS != nil {
|
||||
@ -491,11 +499,12 @@ func consumeSSLEvents(ctx context.Context, rd *perf.Reader, mgr *correlation.Man
|
||||
srcIPRaw := binary.LittleEndian.Uint32(data[12:16])
|
||||
srcPort := binary.LittleEndian.Uint16(data[16:18])
|
||||
|
||||
// data[4096] commence à offset 18, data_len à offset 4114
|
||||
if len(data) < 4118 {
|
||||
// data[4096] commence à offset 18, data_len à offset 4114, direction à offset 4118
|
||||
if len(data) < 4119 {
|
||||
continue
|
||||
}
|
||||
dataLen := binary.LittleEndian.Uint32(data[4114:4118])
|
||||
direction := data[4118] // 0 = SSL_read (client→serveur), 1 = SSL_write (serveur→client)
|
||||
if dataLen > 4096 {
|
||||
dataLen = 4096
|
||||
}
|
||||
@ -504,38 +513,103 @@ func consumeSSLEvents(ctx context.Context, rd *perf.Reader, mgr *correlation.Man
|
||||
}
|
||||
sslData := data[18 : 18+dataLen]
|
||||
|
||||
// --- Fallback /proc quand accept4 n'a pas fourni l'IP ---
|
||||
var key correlation.SessionKey
|
||||
var dstIPFromAccept [4]byte
|
||||
var dstPortFromAccept uint16
|
||||
|
||||
if srcIPRaw == 0 && fd != 0 {
|
||||
// ssl_conn_map non peuplé : chercher la clé de session via le cache accept4
|
||||
tgid := uint32(pidTgid >> 32)
|
||||
if tgid == 0 {
|
||||
tgid = uint32(pidTgid) // fallback: utiliser le TID si TGID=0
|
||||
tgid = uint32(pidTgid)
|
||||
}
|
||||
if ip, port, lookupErr := fdCache.Lookup(tgid, fd); lookupErr == nil {
|
||||
ipv4 := ip.To4()
|
||||
if ipv4 != nil {
|
||||
srcIPRaw = uint32(ipv4[0])<<24 | uint32(ipv4[1])<<16 | uint32(ipv4[2])<<8 | uint32(ipv4[3])
|
||||
srcPort = port
|
||||
|
||||
// Priorité 1 : cache accept4 (source de vérité — tracepoint kernel)
|
||||
if skey, dstIP, dstPort, ok := acceptCache.Lookup(tgid, fd); ok {
|
||||
key = skey
|
||||
dstIPFromAccept = dstIP
|
||||
dstPortFromAccept = dstPort
|
||||
} else {
|
||||
// Priorité 2 : fallback /proc (moins fiable — port parfois erroné)
|
||||
if ip, port, lookupErr := fdCache.Lookup(tgid, fd); lookupErr == nil {
|
||||
ipv4 := ip.To4()
|
||||
if ipv4 != nil {
|
||||
key.SrcIP[0] = ipv4[0]
|
||||
key.SrcIP[1] = ipv4[1]
|
||||
key.SrcIP[2] = ipv4[2]
|
||||
key.SrcIP[3] = ipv4[3]
|
||||
key.SrcPort = port
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
key.SrcIP[0] = byte(srcIPRaw >> 24)
|
||||
key.SrcIP[1] = byte(srcIPRaw >> 16)
|
||||
key.SrcIP[2] = byte(srcIPRaw >> 8)
|
||||
key.SrcIP[3] = byte(srcIPRaw)
|
||||
key.SrcPort = srcPort
|
||||
}
|
||||
|
||||
// Ignorer les événements sans IP identifiable (ex: connexions locales non HTTP)
|
||||
if srcIPRaw == 0 && srcPort == 0 {
|
||||
if key.SrcIP == [4]byte{} && key.SrcPort == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
var key correlation.SessionKey
|
||||
key.SrcIP[0] = byte(srcIPRaw >> 24)
|
||||
key.SrcIP[1] = byte(srcIPRaw >> 16)
|
||||
key.SrcIP[2] = byte(srcIPRaw >> 8)
|
||||
key.SrcIP[3] = byte(srcIPRaw)
|
||||
key.SrcPort = srcPort
|
||||
|
||||
counter.Add(1)
|
||||
// === Routeur Magic Bytes ===
|
||||
|
||||
// === Routeur par direction ===
|
||||
// direction=0 (SSL_read) : requêtes du client
|
||||
// direction=1 (SSL_write) : réponses du serveur
|
||||
|
||||
if direction == 1 {
|
||||
// === Serveur → Client : réponses HTTP ===
|
||||
|
||||
// HTTP/1.x response
|
||||
if parser.IsHTTP1Response(sslData) {
|
||||
resp := parser.ParseHTTP1Response(sslData)
|
||||
if resp == nil {
|
||||
continue
|
||||
}
|
||||
mgr.Update(key, func(s *correlation.SessionState) {
|
||||
if len(s.Requests) > 0 {
|
||||
last := &s.Requests[len(s.Requests)-1]
|
||||
if last.StatusCode == 0 {
|
||||
last.StatusCode = resp.StatusCode
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// HTTP/2 server HEADERS frame (contient :status)
|
||||
if parser.IsH2FrameHeader(sslData) {
|
||||
h2kv := parser.ExtractH2HeaderKV(sslData)
|
||||
if statusCode, ok := h2kv[":status"]; ok {
|
||||
mgr.Update(key, func(s *correlation.SessionState) {
|
||||
if len(s.Requests) > 0 {
|
||||
last := &s.Requests[len(s.Requests)-1]
|
||||
if last.StatusCode == 0 {
|
||||
// Conversion du code de statut H2 (ex: "200" → 200)
|
||||
code := 0
|
||||
for _, c := range statusCode {
|
||||
if c >= '0' && c <= '9' {
|
||||
code = code*10 + int(c-'0')
|
||||
}
|
||||
}
|
||||
if code >= 100 && code <= 599 {
|
||||
last.StatusCode = code
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// === Client → Serveur : requêtes HTTP (direction=0) ===
|
||||
|
||||
if parser.DetectH2Preface(sslData) {
|
||||
// HTTP/2 : extraire les paramètres SETTINGS depuis la préface
|
||||
// HTTP/2 : extraire les paramètres SETTINGS et en-têtes depuis la préface
|
||||
afterPreface := sslData
|
||||
if len(afterPreface) > parser.H2MagicPrefaceLen() {
|
||||
afterPreface = sslData[parser.H2MagicPrefaceLen():]
|
||||
@ -557,19 +631,86 @@ func consumeSSLEvents(ctx context.Context, rd *perf.Reader, mgr *correlation.Man
|
||||
MaxFrameSize: h2settings.MaxFrameSize,
|
||||
MaxHeaderListSize: h2settings.MaxHeaderListSize,
|
||||
UnknownSettings: h2settings.UnknownSettings,
|
||||
EnableConnectProtocol: h2settings.EnableConnectProtocol,
|
||||
WindowUpdateIncrement: h2settings.WindowUpdateIncrement,
|
||||
PseudoHeaderOrder: h2settings.PseudoHeaderOrder,
|
||||
}
|
||||
// Extraire les en-têtes H2 (User-Agent, Accept, etc.)
|
||||
if len(h2settings.HeaderKV) > 0 {
|
||||
req.HeaderKV = h2settings.HeaderKV
|
||||
req.HeaderOrder = h2settings.HeaderOrder
|
||||
req.HeaderOrderSig = strings.Join(h2settings.HeaderOrder, ";")
|
||||
if h2settings.HeaderKV[":method"] != "" {
|
||||
req.Method = h2settings.HeaderKV[":method"]
|
||||
}
|
||||
if h2settings.HeaderKV[":path"] != "" {
|
||||
p := h2settings.HeaderKV[":path"]
|
||||
if idx := strings.Index(p, "?"); idx >= 0 {
|
||||
req.Path = p[:idx]
|
||||
req.QueryString = p[idx+1:]
|
||||
} else {
|
||||
req.Path = p
|
||||
}
|
||||
}
|
||||
if h2settings.HeaderKV[":authority"] != "" {
|
||||
req.Host = h2settings.HeaderKV[":authority"]
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(s.Requests) == 0 {
|
||||
req.HTTPVersion = "HTTP/2"
|
||||
s.Requests = append(s.Requests, req)
|
||||
}
|
||||
// Si la session n'a pas de L3L4 (pas de SYN capturé),
|
||||
// peupler dst_ip/dst_port depuis le cache accept4
|
||||
if s.L3L4 == nil && (dstIPFromAccept != [4]byte{} || dstPortFromAccept != 0) {
|
||||
s.L3L4 = &correlation.L3L4{
|
||||
DstIP: dstIPFromAccept,
|
||||
DstPort: dstPortFromAccept,
|
||||
}
|
||||
}
|
||||
_ = s.TLS // corrélation implicite
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
// HTTP/2 frames seules (sans préface — SSL_read ultérieurs)
|
||||
if parser.IsH2FrameHeader(sslData) {
|
||||
h2kv := parser.ExtractH2HeaderKV(sslData)
|
||||
if len(h2kv) > 0 {
|
||||
mgr.Update(key, func(s *correlation.SessionState) {
|
||||
if len(s.Requests) > 0 {
|
||||
last := &s.Requests[len(s.Requests)-1]
|
||||
if last.HeaderKV == nil {
|
||||
last.HeaderKV = make(map[string]string)
|
||||
}
|
||||
for k, v := range h2kv {
|
||||
if _, exists := last.HeaderKV[k]; !exists {
|
||||
last.HeaderKV[k] = v
|
||||
}
|
||||
}
|
||||
// Mettre à jour method/path/host si pas encore remplis
|
||||
if last.Method == "" && h2kv[":method"] != "" {
|
||||
last.Method = h2kv[":method"]
|
||||
}
|
||||
if last.Path == "" && h2kv[":path"] != "" {
|
||||
p := h2kv[":path"]
|
||||
if idx := strings.Index(p, "?"); idx >= 0 {
|
||||
last.Path = p[:idx]
|
||||
last.QueryString = p[idx+1:]
|
||||
} else {
|
||||
last.Path = p
|
||||
}
|
||||
}
|
||||
if last.Host == "" && h2kv[":authority"] != "" {
|
||||
last.Host = h2kv[":authority"]
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if parser.IsHTTP1Request(sslData) {
|
||||
// HTTP/1.x : parser la requête
|
||||
req := parser.ParseHTTP1Request(sslData)
|
||||
@ -588,27 +729,20 @@ func consumeSSLEvents(ctx context.Context, rd *perf.Reader, mgr *correlation.Man
|
||||
HeaderKV: req.HeaderKV,
|
||||
HTTPVersion: req.Protocol,
|
||||
})
|
||||
// Si la session n'a pas de L3L4 (pas de SYN capturé),
|
||||
// peupler dst_ip/dst_port depuis le cache accept4
|
||||
if s.L3L4 == nil && (dstIPFromAccept != [4]byte{} || dstPortFromAccept != 0) {
|
||||
s.L3L4 = &correlation.L3L4{
|
||||
DstIP: dstIPFromAccept,
|
||||
DstPort: dstPortFromAccept,
|
||||
}
|
||||
}
|
||||
_ = s.TLS // corrélation implicite
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
if parser.IsHTTP1Response(sslData) {
|
||||
// Réponse HTTP/1.x : extraire le code de statut
|
||||
resp := parser.ParseHTTP1Response(sslData)
|
||||
if resp == nil {
|
||||
continue
|
||||
}
|
||||
mgr.Update(key, func(s *correlation.SessionState) {
|
||||
// Mettre à jour le code de statut de la dernière requête
|
||||
if len(s.Requests) > 0 {
|
||||
last := &s.Requests[len(s.Requests)-1]
|
||||
if last.StatusCode == 0 {
|
||||
last.StatusCode = resp.StatusCode
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
// Les réponses HTTP sont maintenant traitées dans le bloc direction=1 ci-dessus
|
||||
}
|
||||
}
|
||||
|
||||
@ -636,6 +770,8 @@ func consumeAcceptEvents(ctx context.Context, rd *perf.Reader, mgr *correlation.
|
||||
continue
|
||||
}
|
||||
|
||||
pidTgid := binary.LittleEndian.Uint64(data[0:8])
|
||||
fd := binary.LittleEndian.Uint32(data[8:12])
|
||||
srcIPRaw := binary.LittleEndian.Uint32(data[12:16])
|
||||
srcPort := binary.LittleEndian.Uint16(data[16:18])
|
||||
|
||||
@ -651,6 +787,10 @@ func consumeAcceptEvents(ctx context.Context, rd *perf.Reader, mgr *correlation.
|
||||
continue
|
||||
}
|
||||
|
||||
// Peupler le cache accept4 pour corrélation SSL
|
||||
tgid := uint32(pidTgid >> 32)
|
||||
acceptCache.Store(tgid, fd, key, [4]byte{}, 0)
|
||||
|
||||
// S'assurer que la session existe
|
||||
mgr.GetOrCreate(key)
|
||||
counter.Add(1)
|
||||
|
||||
Reference in New Issue
Block a user