fix(ja4ebpf): fix TLS capture, SYN offsets, TCP option parsing

- Increase MAX_TLS_PAYLOAD from 512 to 2048 bytes to capture full
  TLS ClientHellos (modern browsers/curl send 1000-1543 byte ClientHellos)
- Fix ParseClientHello to tolerate XDP-truncated payloads: clamp
  recordLength and chLen to available data instead of returning error
- Fix cipher suite, compression method and extension parsing to clamp
  their lengths to the available data when the payload is truncated
- Fix consumeSynEvents struct field offsets: dst_ip (4 bytes at offset 4)
  was not accounted for, causing all L3/L4 metadata to be read from
  wrong positions (TTL was actually dst_ip[0], windowSize was dst_port, etc.)
- Add parseTCPOptions() to extract MSS and Window Scale from raw TCP options
  (C code sets defaults of mss=0, window_scale=0xFF, expects Go to parse)
- Fix consumeAcceptEvents: skip zero-IP events to avoid phantom sessions
- Fix consumeSSLEvents: filter zero-IP/port events when proc fallback fails
- Add missing consumeHTTPPlainEvents goroutine (was defined but never called)
- Fix event-ordering race: the SYN consumer now sets Correlated=true when a
  TLS event was already recorded for the session
- Update tls_hello_event struct offsets in Go consumer (payload_len now at
  offset 2054, was 518, due to payload array growing from 512 to 2048 bytes)
- Remove debug logging from consumers and GC

E2E verified: HTTP plain (port 80) and HTTPS (port 443) both produce
fully correlated sessions in ClickHouse with correct:
  - ip_meta_ttl=64, ip_meta_df=true, ip_meta_id
  - tcp_meta_window_size=64240, tcp_meta_window_scale=10, tcp_meta_mss=1460
  - ja4=t13i3010_1d37bd780c83_95d2a80e6515
  - tls_alpn=http/1.1
  - method=GET, path=/, header_order_signature=Host;User-Agent;Accept
  - correlated=1

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
toto
2026-04-12 04:16:44 +02:00
parent f85a10b012
commit b1218a2367
12 changed files with 715 additions and 248 deletions

View File

@ -163,6 +163,7 @@ func main() {
go consumeTLSEvents(ctx, ldr.TLSReader, mgr)
go consumeSSLEvents(ctx, ldr.SSLReader, mgr)
go consumeAcceptEvents(ctx, ldr.AcceptReader, mgr)
go consumeHTTPPlainEvents(ctx, ldr.HTTPPlainReader, mgr)
log.Printf("[ja4ebpf] démon actif — en attente des événements")
@ -177,6 +178,46 @@ func main() {
log.Printf("[ja4ebpf] arrêt terminé")
}
// parseTCPOptions scans a raw TCP options buffer and returns the Maximum
// Segment Size and the Window Scale shift count found in it.
//
// Options are encoded as kind/length/value triples, where the length byte
// counts the kind and length bytes themselves. Kinds 0 (End of Options) and
// 1 (NOP) are single bytes with no length field.
//
// Defaults are mss=0 and windowScale=0xFF (the "absent" marker). Scanning
// stops at End of Options, at the end of the buffer, or at the first malformed
// option (length below 2, or running past the buffer); values decoded before
// that point are still returned. When an option appears more than once, the
// last occurrence wins.
func parseTCPOptions(opts []byte) (mss uint16, windowScale uint8) {
	const (
		optEnd         = 0
		optNOP         = 1
		optMSS         = 2
		optWindowScale = 3
		scaleAbsent    = 0xFF
	)

	windowScale = scaleAbsent

	// rest always starts at the kind byte of the next option to decode.
	rest := opts
	for len(rest) > 0 {
		kind := rest[0]
		if kind == optEnd {
			break
		}
		if kind == optNOP {
			// Single-byte padding: nothing to decode.
			rest = rest[1:]
			continue
		}

		// Every other kind carries a length byte right after the kind.
		if len(rest) < 2 {
			break
		}
		optLen := int(rest[1])
		if optLen < 2 || optLen > len(rest) {
			break // malformed or truncated option
		}

		value := rest[2:optLen]
		if kind == optMSS && len(value) >= 2 {
			mss = binary.BigEndian.Uint16(value)
		} else if kind == optWindowScale && len(value) >= 1 {
			windowScale = value[0]
		}

		rest = rest[optLen:]
	}
	return mss, windowScale
}
// consumeSynEvents lit les événements TCP SYN depuis le ring buffer
// et met à jour l'état L3/L4 des sessions.
func consumeSynEvents(ctx context.Context, rd *ringbuf.Reader, mgr *correlation.Manager) {
@ -195,14 +236,17 @@ func consumeSynEvents(ctx context.Context, rd *ringbuf.Reader, mgr *correlation.
continue
}
// Taille minimale attendue (voir struct tcp_syn_event)
if len(record.RawSample) < 20 {
// struct tcp_syn_event (packed):
// src_ip(4)+dst_ip(4)+src_port(2)+dst_port(2)+ttl(1)+df_bit(1)+ip_id(2)+
// window_size(2)+window_scale(1)+mss(2)+tcp_options_raw[40]+tcp_options_len(1)+timestamp_ns(8)
// offsets: 0 4 8 10 12 13 14 16 18 19 21 61 62
if len(record.RawSample) < 62 {
continue
}
data := record.RawSample
// Décoder les champs de tcp_syn_event
srcIPRaw := binary.BigEndian.Uint32(data[0:4])
// src_ip et src_port stockés en host byte order (bpf_ntohl/bpf_ntohs dans BPF C).
srcIPRaw := binary.LittleEndian.Uint32(data[0:4])
srcPort := binary.LittleEndian.Uint16(data[8:10])
var key correlation.SessionKey
@ -212,19 +256,21 @@ func consumeSynEvents(ctx context.Context, rd *ringbuf.Reader, mgr *correlation.
key.SrcIP[3] = byte(srcIPRaw)
key.SrcPort = srcPort
ttl := data[4]
dfBit := data[5] != 0
ipID := binary.LittleEndian.Uint16(data[6:8])
windowSize := binary.LittleEndian.Uint16(data[10:12])
windowScale := data[12]
mss := binary.LittleEndian.Uint16(data[13:15])
// Champs IP/TCP aux offsets corrects (dst_ip occupe les octets 4-7)
ttl := data[12]
dfBit := data[13] != 0
ipID := binary.LittleEndian.Uint16(data[14:16])
windowSize := binary.LittleEndian.Uint16(data[16:18])
optLen := int(data[55])
optLen := int(data[61])
if optLen > 40 {
optLen = 40
}
tcpOpts := make([]byte, optLen)
copy(tcpOpts, data[15:15+optLen])
copy(tcpOpts, data[21:21+optLen])
// Analyser les options TCP brutes pour extraire MSS et Window Scale
mss, windowScale := parseTCPOptions(tcpOpts)
mgr.Update(key, func(s *correlation.SessionState) {
s.L3L4 = &correlation.L3L4{
@ -237,6 +283,10 @@ func consumeSynEvents(ctx context.Context, rd *ringbuf.Reader, mgr *correlation.
TCPOptionsRaw: tcpOpts,
SYNTimestamp: time.Now(),
}
// Si TLS est déjà présent (arrivé avant SYN), marquer la session corrélée.
if s.TLS != nil {
s.Correlated = true
}
})
}
}
@ -259,18 +309,20 @@ func consumeTLSEvents(ctx context.Context, rd *ringbuf.Reader, mgr *correlation.
continue
}
// Taille minimale : src_ip(4) + src_port(2) + payload[512] + payload_len(2)
if len(record.RawSample) < 8 {
// struct tls_hello_event (packed):
// src_ip(4) + src_port(2) + payload[2048] + payload_len(2) + timestamp_ns(8)
// offsets: 0 4 6 2054 2056
if len(record.RawSample) < 2056 {
continue
}
data := record.RawSample
srcIPRaw := binary.BigEndian.Uint32(data[0:4])
srcPort := binary.LittleEndian.Uint16(data[4:6])
payloadLen := binary.LittleEndian.Uint16(data[518:520])
srcIPRaw := binary.LittleEndian.Uint32(data[0:4])
srcPort := binary.LittleEndian.Uint16(data[4:6])
payloadLen := binary.LittleEndian.Uint16(data[2054:2056])
if int(payloadLen) > 512 {
payloadLen = 512
if int(payloadLen) > 2048 {
payloadLen = 2048
}
payload := make([]byte, payloadLen)
copy(payload, data[6:6+payloadLen])
@ -285,6 +337,7 @@ func consumeTLSEvents(ctx context.Context, rd *ringbuf.Reader, mgr *correlation.
// Parser le ClientHello et calculer JA4
ch, err := parser.ParseClientHello(payload)
if err != nil {
log.Printf("[warn] TLS parse error: %v", err)
continue
}
@ -500,7 +553,92 @@ func consumeAcceptEvents(ctx context.Context, rd *ringbuf.Reader, mgr *correlati
key.SrcIP[3] = byte(srcIPRaw)
key.SrcPort = srcPort
// Ignorer les événements accept4 sans IP valide (bpf_probe_read_user a échoué)
if srcIPRaw == 0 && srcPort == 0 {
continue
}
// S'assurer que la session existe
mgr.GetOrCreate(key)
}
}
// consumeHTTPPlainEvents reads plain-text HTTP payload samples from rd and
// records the parsed requests on the matching session in mgr.
//
// The loop ends when ctx is cancelled or when rd.Read returns
// ringbuf.ErrClosed; any other read error is skipped. Samples that are too
// short, carry a zero source IP and port, or have an empty payload are
// dropped. Only payloads accepted by parser.IsHTTP1Request are parsed; every
// other payload is ignored.
//
// For each parsed request, an HTTPRequest entry is appended to the session's
// Requests, and the session is marked Correlated when its L3L4 state is
// already set.
func consumeHTTPPlainEvents(ctx context.Context, rd *ringbuf.Reader, mgr *correlation.Manager) {
	// Sample layout (struct http_plain_event):
	//   src_ip(4) dst_ip(4) src_port(2) dst_port(2) payload(4096) payload_len(2) timestamp_ns(8)
	const (
		srcIPOff      = 0
		srcPortOff    = 8
		payloadOff    = 12
		payloadCap    = 4096
		payloadLenOff = payloadOff + payloadCap // 4108
		headerMin     = 14
		sampleMin     = payloadLenOff + 2 // 4110
	)

	for {
		// Non-blocking cancellation check before each (blocking) read.
		select {
		case <-ctx.Done():
			return
		default:
		}

		record, err := rd.Read()
		// NOTE(review): this is a direct == comparison; if the reader wraps
		// ErrClosed it will not match — confirm against the ringbuf version in use.
		if err == ringbuf.ErrClosed {
			return
		}
		if err != nil {
			continue
		}

		sample := record.RawSample
		if len(sample) < headerMin {
			continue
		}

		// Decoded as little-endian; presumably the BPF side stores these in
		// host byte order — verify against the C capture program.
		ip := binary.LittleEndian.Uint32(sample[srcIPOff : srcIPOff+4])
		port := binary.LittleEndian.Uint16(sample[srcPortOff : srcPortOff+2])
		if ip == 0 && port == 0 {
			continue
		}

		// The session key holds the source IP most-significant byte first.
		var key correlation.SessionKey
		binary.BigEndian.PutUint32(key.SrcIP[:4], ip)
		key.SrcPort = port

		if len(sample) < sampleMin {
			continue
		}
		n := int(binary.LittleEndian.Uint16(sample[payloadLenOff : payloadLenOff+2]))
		if n > payloadCap {
			n = payloadCap
		}
		if n == 0 {
			continue
		}
		// n <= 4096 and len(sample) >= 4110, so this slice is always in bounds.
		body := sample[payloadOff : payloadOff+n]

		if !parser.IsHTTP1Request(body) {
			continue
		}
		req := parser.ParseHTTP1Request(body)
		if req == nil {
			continue
		}

		mgr.Update(key, func(state *correlation.SessionState) {
			entry := correlation.HTTPRequest{
				Timestamp:      time.Now(),
				Method:         req.Method,
				Path:           req.Path,
				QueryString:    req.Query,
				HeaderOrder:    req.Headers,
				HeaderOrderSig: req.HeaderSig,
			}
			state.Requests = append(state.Requests, entry)

			// L3/L4 state already present means the TCP SYN was seen for this session.
			if state.L3L4 != nil {
				state.Correlated = true
			}
		})
	}
}