Files
ja4-platform/services/ja4ebpf/cmd/ja4ebpf/main.go
Jacquin Antoine 382683710a feat(ebpf): add nginx HTTP capture infrastructure via kretprobe recvfrom
Add supporting infrastructure for nginx HTTP capture using kretprobe
on __x64_sys_recvfrom to replace the blocked tracepoint sys_exit_recvfrom.

Changes:
- bpf/bpf_types.h: Add nginx_pid_map for filtering recvfrom by PID
- cmd/ja4ebpf/main.go: Add Uprobes configuration section
- Makefile: Add test targets for recvfrom validation
- internal/loader: Generate nginx HTTP event structures

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-20 13:30:41 +02:00

1378 lines
40 KiB
Go

// Package main est le point d'entrée du démon ja4ebpf.
// Il initialise la configuration, charge les programmes eBPF, démarre
// les goroutines de traitement et gère les signaux système.
package main
import (
"context"
"encoding/binary"
"fmt"
"net"
"log"
"os"
"os/signal"
"strconv"
"strings"
"sync/atomic"
"syscall"
"time"
"github.com/antitbone/ja4/ja4ebpf/internal/correlation"
"github.com/antitbone/ja4/ja4ebpf/internal/loader"
"github.com/antitbone/ja4/ja4ebpf/internal/parser"
"github.com/antitbone/ja4/ja4ebpf/internal/procutil"
"github.com/antitbone/ja4/ja4ebpf/internal/writer"
"github.com/cilium/ebpf/perf"
"gopkg.in/yaml.v3"
)
// fdCache résout les associations fd → IP:port via /proc quand accept4 n'est pas disponible.
// Durée de vie d'une entrée : 5 secondes (suffisant pour une requête HTTP).
var fdCache = procutil.NewFDCache(5 * time.Second)
// acceptCache maps {tgid, fd} → SessionKey depuis les événements accept4.
// Prioritaire sur fdCache car source de vérité (tracepoint kernel).
var acceptCache = correlation.NewAcceptCache(10 * time.Second)
// ignoreNets contient les CIDR sources à ignorer (peuplé depuis cfg.IgnoreSrc).
var ignoreNets []*net.IPNet
// Config décrit la configuration complète du démon ja4ebpf.
// Chargée depuis un fichier YAML et enrichie par les variables d'environnement
// avec le préfixe JA4EBPF_.
type Config struct {
Interfaces []string `yaml:"interfaces"` // interfaces à surveiller (défaut: ["any"])
SSLLibPath string `yaml:"ssl_lib_path"` // chemin vers libssl (ex: "/usr/lib64/libssl.so.3")
ListenPorts []uint16 `yaml:"listen_ports"` // ports à surveiller (défaut: [80, 443])
IgnoreSrc []string `yaml:"ignore_src"` // CIDR/IP sources à ignorer (ex: ["10.0.0.0/8"])
Debug bool `yaml:"debug"` // mode debug : dump compteurs BPF, log verbeux, ClickHouse optionnel
ClickHouse struct {
DSN string `yaml:"dsn"` // DSN ClickHouse natif
BatchSize int `yaml:"batch_size"` // nombre de sessions par batch
FlushSecs int `yaml:"flush_secs"` // intervalle de flush en secondes
} `yaml:"clickhouse"`
Correlation struct {
TimeoutMS int `yaml:"timeout_ms"` // délai d'expiration session (ms)
SlowlorisMS int `yaml:"slowloris_ms"` // seuil Slowloris (ms)
} `yaml:"correlation"`
Log struct {
Level string `yaml:"level"` // niveau de log (debug, info, warn, error)
Format string `yaml:"format"` // format de log ("json" ou "text")
} `yaml:"log"`
Uprobes struct {
Enabled bool `yaml:"enabled"` // activer l'attachement automatique des uprobes
NginxBinPath string `yaml:"nginx_bin_path"` // chemin vers le binaire nginx
MaxRetries int `yaml:"max_retries"` // nombre de tentatives d'attachement (défaut: 30)
RetryIntervalSec int `yaml:"retry_interval_sec"` // intervalle entre tentatives (défaut: 2)
} `yaml:"uprobes"`
}
// loadConfig charge la configuration depuis le fichier YAML spécifié,
// puis applique les surcharges depuis les variables d'environnement.
func loadConfig(path string) (*Config, error) {
cfg := &Config{}
// Valeurs par défaut
cfg.Interfaces = []string{"any"}
cfg.SSLLibPath = "/usr/lib64/libssl.so.3"
cfg.ListenPorts = []uint16{80, 443}
cfg.ClickHouse.DSN = "clickhouse://default:@localhost:9000/ja4_logs?async_insert=0"
cfg.ClickHouse.BatchSize = 500
cfg.ClickHouse.FlushSecs = 1
cfg.Correlation.TimeoutMS = 5000
cfg.Correlation.SlowlorisMS = 10000
cfg.Log.Level = "info"
cfg.Log.Format = "json"
cfg.Uprobes.Enabled = false
cfg.Uprobes.NginxBinPath = "/usr/sbin/nginx"
cfg.Uprobes.MaxRetries = 30
cfg.Uprobes.RetryIntervalSec = 2
// Charger depuis le fichier YAML si spécifié
if path != "" {
data, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("lecture fichier config %q: %w", path, err)
}
if err := yaml.Unmarshal(data, cfg); err != nil {
return nil, fmt.Errorf("parsing YAML config: %w", err)
}
}
// Surcharges via variables d'environnement
if v := os.Getenv("JA4EBPF_INTERFACES"); v != "" {
cfg.Interfaces = strings.Split(v, ",")
}
// Rétrocompatibilité : JA4EBPF_INTERFACE écrase la liste
if v := os.Getenv("JA4EBPF_INTERFACE"); v != "" {
cfg.Interfaces = []string{v}
}
if v := os.Getenv("JA4EBPF_SSL_LIB_PATH"); v != "" {
cfg.SSLLibPath = v
}
if v := os.Getenv("JA4EBPF_CLICKHOUSE_DSN"); v != "" {
cfg.ClickHouse.DSN = v
}
if v := os.Getenv("JA4EBPF_DEBUG"); v != "" {
cfg.Debug = strings.EqualFold(v, "true") || v == "1" || v == "yes"
}
if v := os.Getenv("JA4EBPF_LISTEN_PORTS"); v != "" {
cfg.ListenPorts = nil
for _, s := range strings.Split(v, ",") {
p, err := strconv.ParseUint(strings.TrimSpace(s), 10, 16)
if err != nil {
log.Printf("[ja4ebpf] port invalide dans JA4EBPF_LISTEN_PORTS: %q", s)
continue
}
cfg.ListenPorts = append(cfg.ListenPorts, uint16(p))
}
}
if v := os.Getenv("JA4EBPF_IGNORE_SRC"); v != "" {
cfg.IgnoreSrc = strings.Split(v, ",")
}
// Uprobes configuration via environment variables
if v := os.Getenv("JA4EBPF_UPROBES_ENABLED"); v != "" {
cfg.Uprobes.Enabled = strings.EqualFold(v, "true") || v == "1" || v == "yes"
}
if v := os.Getenv("JA4EBPF_NGINX_BIN_PATH"); v != "" {
cfg.Uprobes.NginxBinPath = v
}
return cfg, nil
}
// main est le point d'entrée du programme.
// parseCIDRs convertit une liste de CIDR/IP en clés LPM_TRIE (big-endian).
func parseCIDRs(cidrs []string) ([]loader.LPMKey, error) {
var keys []loader.LPMKey
for _, cidr := range cidrs {
cidr = strings.TrimSpace(cidr)
if !strings.Contains(cidr, "/") {
cidr += "/32"
}
_, ipNet, err := net.ParseCIDR(cidr)
if err != nil {
return nil, fmt.Errorf("CIDR invalide %q: %w", cidr, err)
}
ip4 := ipNet.IP.To4()
if ip4 == nil {
continue
}
prefixLen, _ := ipNet.Mask.Size()
var data [4]byte
copy(data[:], ip4)
keys = append(keys, loader.LPMKey{
Prefixlen: uint32(prefixLen),
Data: data,
})
}
return keys, nil
}
// isIgnoredIP vérifie si une adresse IPv4 (4 octets) match un des CIDR ignore_src.
func isIgnoredIP(ip [4]byte) bool {
ip4 := net.IPv4(ip[0], ip[1], ip[2], ip[3])
for _, cidr := range ignoreNets {
if cidr.Contains(ip4) {
return true
}
}
return false
}
// parseIgnoreNets convertit la liste de CIDR ignore_src en []*net.IPNet.
func parseIgnoreNets(cidrs []string) []*net.IPNet {
var nets []*net.IPNet
for _, cidr := range cidrs {
cidr = strings.TrimSpace(cidr)
if !strings.Contains(cidr, "/") {
cidr += "/32"
}
_, ipNet, err := net.ParseCIDR(cidr)
if err != nil {
log.Printf("[ja4ebpf] CIDR ignore_src invalide %q: %v", cidr, err)
continue
}
if ipNet.IP.To4() != nil {
nets = append(nets, ipNet)
}
}
return nets
}
func main() {
// Déterminer le chemin du fichier de configuration
configPath := os.Getenv("JA4EBPF_CONFIG")
if configPath == "" {
configPath = "/etc/ja4ebpf/config.yml"
}
cfg, err := loadConfig(configPath)
if err != nil {
log.Fatalf("erreur chargement configuration: %v", err)
}
if cfg.Debug {
log.Printf("[ja4ebpf] MODE DEBUG ACTIVÉ")
}
log.Printf("[ja4ebpf] démarrage — interfaces=%v ssl=%s debug=%v", cfg.Interfaces, cfg.SSLLibPath, cfg.Debug)
// Peupler ignoreNets pour filtrage userspace (SSL_read/SSL_write/accept4)
ignoreNets = parseIgnoreNets(cfg.IgnoreSrc)
// Contexte principal avec annulation sur signal système
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Intercepter SIGTERM et SIGINT pour l'arrêt gracieux
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT)
// --- 1. Chargement des programmes eBPF ---
ldr, err := loader.New()
if err != nil {
log.Fatalf("erreur chargement eBPF: %v", err)
}
defer ldr.Close()
// --- 1b. Peuplement de la map allowed_ports ---
if err := ldr.PopulatePorts(cfg.ListenPorts); err != nil {
log.Fatalf("[ja4ebpf] erreur peuplement allowed_ports: %v", err)
}
for _, p := range cfg.ListenPorts {
log.Printf("[ja4ebpf] port %d surveillé", p)
}
// --- 1c. Peuplement de la map ignored_src (LPM_TRIE) ---
if len(cfg.IgnoreSrc) > 0 {
lpmKeys, err := parseCIDRs(cfg.IgnoreSrc)
if err != nil {
log.Fatalf("[ja4ebpf] erreur parsing ignore_src: %v", err)
}
if err := ldr.PopulateIgnoredSrc(lpmKeys); err != nil {
log.Fatalf("[ja4ebpf] erreur peuplement ignored_src: %v", err)
}
for _, c := range cfg.IgnoreSrc {
log.Printf("[ja4ebpf] ignore src: %s", c)
}
}
// --- 2. Attachement TC ingress ---
if len(cfg.Interfaces) == 1 && cfg.Interfaces[0] == "any" {
ifaces, err := ldr.AttachTCAll()
if err != nil {
log.Fatalf("[ja4ebpf] erreur attachement TC: %v", err)
}
log.Printf("[ja4ebpf] TC ingress attaché sur: %v", ifaces)
} else {
for _, iface := range cfg.Interfaces {
log.Printf("[ja4ebpf] attachement TC ingress sur %s...", iface)
if err := ldr.AttachTC(iface); err != nil {
log.Fatalf("[ja4ebpf] erreur attachement TC %s: %v", iface, err)
}
log.Printf("[ja4ebpf] TC ingress attaché sur %s", iface)
}
}
// --- 3. Attachement uprobes SSL ---
if err := ldr.AttachUprobes(cfg.SSLLibPath); err != nil {
log.Printf("[ja4ebpf] avertissement uprobes SSL: %v (désactivation uprobes)", err)
// Continuer sans uprobes SSL (capture L3/L4 toujours active)
}
// --- 4. Attachement tracepoints accept4 (sys_enter/exit_accept4) ---
if err := ldr.AttachAcceptProbe(); err != nil {
log.Printf("[ja4ebpf] avertissement tracepoint accept4: %v", err)
}
// --- 4b. Attachement uprobes nginx (avec retry automatique) ---
if err := attachNginxUprobesWithRetry(ctx, ldr, cfg); err != nil {
log.Printf("[ja4ebpf] erreur attachement uprobes nginx: %v", err)
}
// --- 5. Gestionnaire de sessions ---
sessionTimeout := time.Duration(cfg.Correlation.TimeoutMS) * time.Millisecond
mgr := correlation.NewManager(sessionTimeout)
mgr.StartGC(ctx)
defer mgr.Close()
// --- 6. Writer ClickHouse ---
var w *writer.ClickHouseWriter
flushInterval := time.Duration(cfg.ClickHouse.FlushSecs) * time.Second
w, err = writer.NewClickHouseWriter(cfg.ClickHouse.DSN, cfg.ClickHouse.BatchSize, flushInterval)
if err != nil {
if cfg.Debug {
log.Printf("[ja4ebpf] DEBUG: writer ClickHouse non disponible: %v (continue sans CH)", err)
} else {
log.Fatalf("erreur initialisation writer ClickHouse: %v", err)
}
}
if w != nil {
w.Start(ctx)
}
// --- 7. Goroutine : écriture des sessions prêtes ---
go func() {
for {
select {
case s, ok := <-mgr.ReadyCh:
if !ok {
// Canal fermé, terminer la goroutine
return
}
if w != nil {
w.Write(s)
}
case <-ctx.Done():
// Context annulé, terminer la goroutine gracieusement
return
}
}
}()
// --- 8. Compteurs d'événements consommés (mode debug) ---
consumed := &eventCounters{}
go consumeNginxHTTPEvents(ctx, ldr.NginxHTTPReader, mgr, &consumed.nginx)
// --- 9. Goroutines de consommation des ring buffers ---
go consumeSynEvents(ctx, ldr.SynReader, mgr, &consumed.syn)
go consumeTLSEvents(ctx, ldr.TLSReader, mgr, &consumed.tls)
go consumeSSLEvents(ctx, ldr.SSLReader, mgr, &consumed.ssl)
go consumeAcceptEvents(ctx, ldr.AcceptReader, mgr, &consumed.accept)
go consumeHTTPPlainEvents(ctx, ldr.HTTPPlainReader, mgr, &consumed.httpPlain)
// --- 10. Stats dumper (mode debug) ---
if cfg.Debug {
go debugStatsDumper(ctx, ldr, consumed)
}
log.Printf("[ja4ebpf] démon actif — en attente des événements")
// Attendre un signal d'arrêt
select {
case sig := <-sigCh:
log.Printf("[ja4ebpf] signal reçu: %v — arrêt gracieux", sig)
case <-ctx.Done():
log.Printf("[ja4ebpf] contexte annulé — arrêt gracieux")
}
// Annuler le contexte pour signaler toutes les goroutines
cancel()
log.Printf("[ja4ebpf] contexte annulé, attente de la fermeture des goroutines...")
// Attendre un court instant pour que les goroutines se terminent
// Le defer mgr.Close() fermera ReadyCh, ce qui débloquera la goroutine d'écriture
time.Sleep(100 * time.Millisecond)
log.Printf("[ja4ebpf] arrêt terminé")
}
// eventCounters contient les compteurs atomiques pour chaque type d'événement consommé.
type eventCounters struct {
syn atomic.Uint64
tls atomic.Uint64
ssl atomic.Uint64
accept atomic.Uint64
httpPlain atomic.Uint64
nginx atomic.Uint64
}
// debugStatsDumper affiche les compteurs BPF et les événements consommés toutes les 5 secondes.
func debugStatsDumper(ctx context.Context, ldr *loader.Loader, consumed *eventCounters) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
}
// Compteurs BPF kernel
stats, err := ldr.ReadStats()
if err != nil {
log.Printf("[debug] erreur lecture tc_stats: %v", err)
continue
}
log.Printf("[debug] BPF: TOTAL=%d IPV4=%d TCP=%d SYN=%d SYN_SUB=%d TLS_SUB=%d HTTP_SUB=%d",
stats[0], stats[1], stats[2], stats[3], stats[4], stats[5], stats[6])
// Compteurs userspace
log.Printf("[debug] GO: syn=%d tls=%d ssl=%d accept=%d http=%d",
consumed.syn.Load(), consumed.tls.Load(), consumed.ssl.Load(),
consumed.accept.Load(), consumed.httpPlain.Load())
}
}
// parseTCPOptions extrait le MSS et le Window Scale depuis les options TCP brutes.
// Les options TCP suivent le format TLV (Type-Length-Value), sauf les options 0 et 1.
// Retourne (mss=0, windowScale=0xFF) si les options sont absentes ou mal formées.
func parseTCPOptions(opts []byte) (mss uint16, windowScale uint8) {
windowScale = 0xFF // 0xFF = absent (convention C)
i := 0
for i < len(opts) {
kind := opts[i]
i++
switch kind {
case 0: // End of Options
return
case 1: // NOP — padding, pas de longueur
continue
default:
if i >= len(opts) {
return
}
length := int(opts[i])
i++
if length < 2 || i+length-2 > len(opts) {
return // option malformée
}
val := opts[i : i+length-2]
switch kind {
case 2: // MSS
if len(val) >= 2 {
mss = binary.BigEndian.Uint16(val[0:2])
}
case 3: // Window Scale
if len(val) >= 1 {
windowScale = val[0]
}
}
i += length - 2
}
}
return
}
// consumeSynEvents lit les événements TCP SYN depuis le ring buffer
// et met à jour l'état L3/L4 des sessions.
func consumeSynEvents(ctx context.Context, rd *perf.Reader, mgr *correlation.Manager, counter *atomic.Uint64) {
for {
select {
case <-ctx.Done():
return
default:
}
record, err := rd.Read()
if err != nil {
if err == os.ErrClosed {
return
}
continue
}
// struct tcp_syn_event (packed):
// src_ip(4)+dst_ip(4)+src_port(2)+dst_port(2)+ttl(1)+df_bit(1)+ip_id(2)+
// ip_total_length(2)+window_size(2)+window_scale(1)+mss(2)+tcp_options_raw[40]+
// tcp_options_len(1)+timestamp_ns(8)
// offsets: 0 4 8 10 12 13 14 16 18 20 21 23 63
// total = 64 + 8 = 72
if len(record.RawSample) < 72 {
continue
}
data := record.RawSample
// src_ip et src_port stockés en host byte order (bpf_ntohl/bpf_ntohs dans BPF C).
srcIPRaw := binary.LittleEndian.Uint32(data[0:4])
dstIPRaw := binary.LittleEndian.Uint32(data[4:8])
srcPort := binary.LittleEndian.Uint16(data[8:10])
dstPort := binary.LittleEndian.Uint16(data[10:12])
var key correlation.SessionKey
key.SrcIP[0] = byte(srcIPRaw >> 24)
key.SrcIP[1] = byte(srcIPRaw >> 16)
key.SrcIP[2] = byte(srcIPRaw >> 8)
key.SrcIP[3] = byte(srcIPRaw)
key.SrcPort = srcPort
var dstIP [4]byte
dstIP[0] = byte(dstIPRaw >> 24)
dstIP[1] = byte(dstIPRaw >> 16)
dstIP[2] = byte(dstIPRaw >> 8)
dstIP[3] = byte(dstIPRaw)
// Champs IP/TCP aux offsets corrects (ip_total_length inséré entre ip_id et window_size)
ttl := data[12]
dfBit := data[13] != 0
ipID := binary.LittleEndian.Uint16(data[14:16])
ipTotalLength := binary.LittleEndian.Uint16(data[16:18])
windowSize := binary.LittleEndian.Uint16(data[18:20])
optLen := int(data[63])
if optLen > 40 {
optLen = 40
}
tcpOpts := make([]byte, optLen)
copy(tcpOpts, data[23:23+optLen])
// Filtrer les IPs sources ignorées (ignore_src)
if isIgnoredIP(key.SrcIP) {
continue
}
// Analyser les options TCP brutes pour extraire MSS et Window Scale
mss, windowScale := parseTCPOptions(tcpOpts)
mgr.Update(key, func(s *correlation.SessionState) {
s.L3L4 = &correlation.L3L4{
DstIP: dstIP,
DstPort: dstPort,
TTL: ttl,
DFBit: dfBit,
IPID: ipID,
IPTotalLength: ipTotalLength,
WindowSize: windowSize,
WindowScale: windowScale,
MSS: mss,
TCPOptionsRaw: tcpOpts,
SYNTimestamp: time.Now(),
}
// Si TLS est déjà présent (arrivé avant SYN), les deux couches sont disponibles.
if s.TLS != nil {
_ = s.TLS // corrélation implicite par présence des deux champs
}
})
counter.Add(1)
}
}
// consumeTLSEvents lit les événements TLS ClientHello depuis le ring buffer
// et calcule l'empreinte JA4 pour chaque session.
func consumeTLSEvents(ctx context.Context, rd *perf.Reader, mgr *correlation.Manager, counter *atomic.Uint64) {
for {
select {
case <-ctx.Done():
return
default:
}
record, err := rd.Read()
if err != nil {
if err == os.ErrClosed {
return
}
continue
}
// struct tls_hello_event (packed):
// payload[2048] + src_ip(4) + dst_ip(4) + src_port(2) + dst_port(2) + payload_len(2) + timestamp_ns(8)
// offsets: 0 2048 2052 2056 2058 2060 2062
if len(record.RawSample) < 2070 {
continue
}
data := record.RawSample
srcIPRaw := binary.LittleEndian.Uint32(data[2048:2052])
dstIPRaw := binary.LittleEndian.Uint32(data[2052:2056])
srcPort := binary.LittleEndian.Uint16(data[2056:2058])
dstPort := binary.LittleEndian.Uint16(data[2058:2060])
payloadLen := binary.LittleEndian.Uint16(data[2060:2062])
if int(payloadLen) > 2048 {
payloadLen = 2048
}
payload := make([]byte, payloadLen)
copy(payload, data[0:payloadLen])
var key correlation.SessionKey
key.SrcIP[0] = byte(srcIPRaw >> 24)
key.SrcIP[1] = byte(srcIPRaw >> 16)
key.SrcIP[2] = byte(srcIPRaw >> 8)
key.SrcIP[3] = byte(srcIPRaw)
key.SrcPort = srcPort
var tlsDstIP [4]byte
tlsDstIP[0] = byte(dstIPRaw >> 24)
tlsDstIP[1] = byte(dstIPRaw >> 16)
tlsDstIP[2] = byte(dstIPRaw >> 8)
tlsDstIP[3] = byte(dstIPRaw)
// Filtrer les IPs sources ignorées (ignore_src)
if isIgnoredIP(key.SrcIP) {
continue
}
// Parser le ClientHello et calculer JA4
ch, err := parser.ParseClientHello(payload)
if err != nil {
log.Printf("[warn] TLS parse error: %v", err)
continue
}
ja4 := parser.ComputeJA4(ch)
ja3Raw, ja3Hash := parser.ComputeJA3(ch)
var alpn []string
var ciphers, extensions []uint16
for _, e := range ch.Extensions {
extensions = append(extensions, e.Type)
}
ciphers = ch.CipherSuites
alpn = ch.ALPN
// Déterminer la version TLS la plus haute (comme ComputeJA4)
var tlsVer uint16
for _, v := range ch.SupportedVersions {
if !parser.IsGREASE(v) && v > tlsVer {
tlsVer = v
}
}
if tlsVer == 0 {
tlsVer = ch.HandshakeVersion
}
mgr.Update(key, func(s *correlation.SessionState) {
s.TLS = &correlation.TLSInfo{
ClientHelloRaw: payload,
JA4Hash: ja4,
JA3Raw: ja3Raw,
JA3Hash: ja3Hash,
SNI: ch.SNI,
ALPN: alpn,
CipherSuites: ciphers,
Extensions: extensions,
TLSVersion: tlsVer,
Timestamp: time.Now(),
}
// Peupler L3/L4 si absent (SYN non capturé, TLS arrivé en premier)
if s.L3L4 == nil && dstIPRaw != 0 {
s.L3L4 = &correlation.L3L4{
DstIP: tlsDstIP,
DstPort: dstPort,
}
}
})
counter.Add(1)
}
}
// consumeSSLEvents lit les données SSL déchiffrées depuis le ring buffer.
// Parse les requêtes HTTP/1.x et détecte le préambule HTTP/2.
// Quand src_ip=0 (accept4 non disponible), tente un lookup /proc pour retrouver l'IP du client.
func consumeSSLEvents(ctx context.Context, rd *perf.Reader, mgr *correlation.Manager, counter *atomic.Uint64) {
for {
select {
case <-ctx.Done():
return
default:
}
record, err := rd.Read()
if err != nil {
if err == os.ErrClosed {
return
}
continue
}
data := record.RawSample
// Taille minimale : pid_tgid(8) + fd(4) + src_ip(4) + src_port(2) = 18
if len(data) < 18 {
continue
}
pidTgid := binary.LittleEndian.Uint64(data[0:8])
fd := binary.LittleEndian.Uint32(data[8:12])
srcIPRaw := binary.LittleEndian.Uint32(data[12:16])
srcPort := binary.LittleEndian.Uint16(data[16:18])
// data[4096] commence à offset 18, data_len à offset 4114,
// timestamp_ns à offset 4118, direction à offset 4126
if len(data) < 4127 {
continue
}
dataLen := binary.LittleEndian.Uint32(data[4114:4118])
direction := data[4126] // 0 = SSL_read (client→serveur), 1 = SSL_write (serveur→client)
if dataLen > 4096 {
dataLen = 4096
}
if dataLen == 0 {
continue
}
sslData := data[18 : 18+dataLen]
var key correlation.SessionKey
var dstIPFromAccept [4]byte
var dstPortFromAccept uint16
if srcIPRaw == 0 && fd != 0 {
// ssl_conn_map non peuplé : chercher la clé de session via le cache accept4
tgid := uint32(pidTgid >> 32)
if tgid == 0 {
tgid = uint32(pidTgid)
}
// Priorité 1 : cache accept4 (source de vérité — tracepoint kernel)
if skey, dstIP, dstPort, ok := acceptCache.Lookup(tgid, fd); ok {
key = skey
dstIPFromAccept = dstIP
dstPortFromAccept = dstPort
} else {
// Priorité 2 : fallback /proc (moins fiable — port parfois erroné)
if ip, port, lookupErr := fdCache.Lookup(tgid, fd); lookupErr == nil {
ipv4 := ip.To4()
if ipv4 != nil {
key.SrcIP[0] = ipv4[0]
key.SrcIP[1] = ipv4[1]
key.SrcIP[2] = ipv4[2]
key.SrcIP[3] = ipv4[3]
key.SrcPort = port
}
}
}
} else {
key.SrcIP[0] = byte(srcIPRaw >> 24)
key.SrcIP[1] = byte(srcIPRaw >> 16)
key.SrcIP[2] = byte(srcIPRaw >> 8)
key.SrcIP[3] = byte(srcIPRaw)
key.SrcPort = srcPort
}
// Ignorer les événements sans IP identifiable (ex: connexions locales non HTTP)
if key.SrcIP == [4]byte{} && key.SrcPort == 0 {
continue
}
// Filtrer les IPs sources ignorées (ignore_src)
if key.SrcIP != [4]byte{} && isIgnoredIP(key.SrcIP) {
log.Printf("[debug-ssl] FILTERED srcIP=%d.%d.%d.%d", key.SrcIP[0], key.SrcIP[1], key.SrcIP[2], key.SrcIP[3])
continue
}
counter.Add(1)
// === Routeur par direction ===
// direction=0 (SSL_read) : requêtes du client
// direction=1 (SSL_write) : réponses du serveur
if direction == 1 {
// === Serveur → Client : réponses HTTP ===
// HTTP/1.x response
if parser.IsHTTP1Response(sslData) {
resp := parser.ParseHTTP1Response(sslData)
if resp == nil {
log.Printf("[warn] HTTP/1.x response parsing failed for src=%v", key)
continue
}
mgr.Update(key, func(s *correlation.SessionState) {
if len(s.Requests) > 0 {
last := &s.Requests[len(s.Requests)-1]
if last.StatusCode == 0 {
last.StatusCode = resp.StatusCode
}
}
})
}
// HTTP/2 : traiter via H2ConnState si la connexion est H2
mgr.Update(key, func(s *correlation.SessionState) {
if s.H2Conn == nil {
return
}
result, err := s.H2Conn.ProcessFrames(sslData, 1)
if err != nil || result == nil {
return
}
// Extraire le code de statut des réponses serveur
if result.StatusCode > 0 && len(s.Requests) > 0 {
last := &s.Requests[len(s.Requests)-1]
if last.StatusCode == 0 {
last.StatusCode = result.StatusCode
}
}
// Mettre à jour les paramètres SETTINGS serveur
if result.ServerSettings != nil {
s.H2Conn.ServerSettings = result.ServerSettings
}
})
continue
}
// === Client → Serveur : requêtes HTTP (direction=0) ===
if parser.DetectH2Preface(sslData) {
// HTTP/2 : préface détectée, créer H2ConnState et traiter les frames
afterPreface := sslData
if len(afterPreface) > parser.H2MagicPrefaceLen() {
afterPreface = sslData[parser.H2MagicPrefaceLen():]
}
mgr.Update(key, func(s *correlation.SessionState) {
// Créer le H2ConnState s'il n'existe pas
if s.H2Conn == nil {
s.H2Conn = parser.NewH2ConnState()
}
result, _ := s.H2Conn.ProcessFrames(afterPreface, 0)
applyH2Result(s, result, dstIPFromAccept, dstPortFromAccept)
})
continue
}
// HTTP/2 frames seules (sans préface — SSL_read ultérieurs)
// Utiliser H2ConnState si disponible
var h2connExists bool
mgr.Update(key, func(s *correlation.SessionState) {
h2connExists = s.H2Conn != nil
})
if h2connExists {
mgr.Update(key, func(s *correlation.SessionState) {
result, _ := s.H2Conn.ProcessFrames(sslData, 0)
if result == nil {
return
}
// En-têtes décodés
if len(result.Headers) > 0 && len(s.Requests) > 0 {
last := &s.Requests[len(s.Requests)-1]
if last.HeaderKV == nil {
last.HeaderKV = make(map[string]string)
}
for _, h := range result.Headers {
nameLower := strings.ToLower(h.Name)
if parser.HpackCapturedHeaders[nameLower] && h.Value != "" {
if _, exists := last.HeaderKV[nameLower]; !exists {
last.HeaderKV[nameLower] = h.Value
last.HeaderOrder = append(last.HeaderOrder, nameLower)
}
}
switch nameLower {
case ":method":
if last.Method == "" {
last.Method = h.Value
}
case ":path":
if last.Path == "" {
p := h.Value
if idx := strings.Index(p, "?"); idx >= 0 {
last.Path = p[:idx]
last.QueryString = p[idx+1:]
} else {
last.Path = p
}
}
case ":authority":
if last.Host == "" {
last.Host = h.Value
}
}
}
if len(last.HeaderOrder) > 0 && last.HeaderOrderSig == "" {
last.HeaderOrderSig = strings.Join(last.HeaderOrder, ";")
}
}
// Mettre à jour SETTINGS client si présents
if result.ClientSettings != nil && len(s.Requests) > 0 {
last := &s.Requests[len(s.Requests)-1]
updateH2Settings(last, result.ClientSettings)
}
})
continue
}
// Première frame H2 sans préface — créer H2ConnState
if parser.IsH2FrameHeader(sslData) {
mgr.Update(key, func(s *correlation.SessionState) {
s.H2Conn = parser.NewH2ConnState()
result, _ := s.H2Conn.ProcessFrames(sslData, 0)
applyH2Result(s, result, dstIPFromAccept, dstPortFromAccept)
})
continue
}
if parser.IsHTTP1Request(sslData) {
// HTTP/1.x : parser la requête
req := parser.ParseHTTP1Request(sslData)
if req == nil {
log.Printf("[warn] HTTP/1.x request parsing failed, src=%v, data_len=%d", key, len(sslData))
continue
}
mgr.Update(key, func(s *correlation.SessionState) {
s.Requests = append(s.Requests, correlation.HTTPRequest{
Timestamp: time.Now(),
Method: req.Method,
Path: req.Path,
QueryString: req.Query,
Host: req.HeaderKV["Host"],
HeaderOrder: req.Headers,
HeaderOrderSig: req.HeaderSig,
HeaderKV: req.HeaderKV,
HTTPVersion: req.Protocol,
})
// Si la session n'a pas de L3L4 (pas de SYN capturé),
// peupler dst_ip/dst_port depuis le cache accept4
if s.L3L4 == nil && (dstIPFromAccept != [4]byte{} || dstPortFromAccept != 0) {
s.L3L4 = &correlation.L3L4{
DstIP: dstIPFromAccept,
DstPort: dstPortFromAccept,
}
}
_ = s.TLS // corrélation implicite
})
continue
}
// Les réponses HTTP sont maintenant traitées dans le bloc direction=1 ci-dessus
}
}
// consumeAcceptEvents lit les événements accept4 depuis le ring buffer.
// Met à jour les sessions avec les informations de connexion client.
func consumeAcceptEvents(ctx context.Context, rd *perf.Reader, mgr *correlation.Manager, counter *atomic.Uint64) {
for {
select {
case <-ctx.Done():
return
default:
}
record, err := rd.Read()
if err != nil {
if err == os.ErrClosed {
return
}
continue
}
data := record.RawSample
// Taille attendue : pid_tgid(8) + fd(4) + src_ip(4) + src_port(2) + timestamp(8) = 26
if len(data) < 22 {
continue
}
pidTgid := binary.LittleEndian.Uint64(data[0:8])
fd := binary.LittleEndian.Uint32(data[8:12])
srcIPRaw := binary.LittleEndian.Uint32(data[12:16])
srcPort := binary.LittleEndian.Uint16(data[16:18])
var key correlation.SessionKey
key.SrcIP[0] = byte(srcIPRaw >> 24)
key.SrcIP[1] = byte(srcIPRaw >> 16)
key.SrcIP[2] = byte(srcIPRaw >> 8)
key.SrcIP[3] = byte(srcIPRaw)
key.SrcPort = srcPort
// Ignorer les événements accept4 sans IP valide (bpf_probe_read_user a échoué)
if srcIPRaw == 0 && srcPort == 0 {
continue
}
// Filtrer les IPs sources ignorées (ignore_src)
if isIgnoredIP(key.SrcIP) {
continue
}
// Peupler le cache accept4 pour corrélation SSL
tgid := uint32(pidTgid >> 32)
acceptCache.Store(tgid, fd, key, [4]byte{}, 0)
// S'assurer que la session existe
mgr.GetOrCreate(key)
counter.Add(1)
}
}
// consumeHTTPPlainEvents lit les payloads HTTP en clair depuis le perf buffer TC.
// Parse la requête HTTP/1.x ou détecte la préface HTTP/2 pour les connexions
// non-chiffrées sur les ports 80/8080.
func consumeHTTPPlainEvents(ctx context.Context, rd *perf.Reader, mgr *correlation.Manager, counter *atomic.Uint64) {
for {
select {
case <-ctx.Done():
return
default:
}
record, err := rd.Read()
if err != nil {
if err == os.ErrClosed {
return
}
continue
}
data := record.RawSample
// struct http_plain_event: payload(4096)+src_ip(4)+dst_ip(4)+src_port(2)+dst_port(2)+payload_len(2)+timestamp_ns(8)
if len(data) < 14 {
continue
}
// src_ip et src_port en host byte order (bpf_ntohl appliqué dans tc_capture.c)
srcIPRaw := binary.LittleEndian.Uint32(data[4096:4100])
dstIPRaw := binary.LittleEndian.Uint32(data[4100:4104])
srcPort := binary.LittleEndian.Uint16(data[4104:4106])
dstPort := binary.LittleEndian.Uint16(data[4106:4108])
if srcIPRaw == 0 && srcPort == 0 {
continue
}
var key correlation.SessionKey
key.SrcIP[0] = byte(srcIPRaw >> 24)
key.SrcIP[1] = byte(srcIPRaw >> 16)
key.SrcIP[2] = byte(srcIPRaw >> 8)
key.SrcIP[3] = byte(srcIPRaw)
key.SrcPort = srcPort
var httpDstIP [4]byte
httpDstIP[0] = byte(dstIPRaw >> 24)
httpDstIP[1] = byte(dstIPRaw >> 16)
httpDstIP[2] = byte(dstIPRaw >> 8)
httpDstIP[3] = byte(dstIPRaw)
// Filtrer les IPs sources ignorées (ignore_src)
if isIgnoredIP(key.SrcIP) {
continue
}
// Extraire le payload HTTP
if len(data) < 4110 {
continue
}
payloadLen := int(binary.LittleEndian.Uint16(data[4108:4110]))
if payloadLen > 4096 {
payloadLen = 4096
}
if payloadLen == 0 {
continue
}
if 4096+payloadLen > len(data) {
payloadLen = len(data) - 4096
}
httpData := data[0:payloadLen]
// Routeur Magic Bytes : HTTP/1.x uniquement sur port 80
if parser.IsHTTP1Request(httpData) {
req := parser.ParseHTTP1Request(httpData)
if req == nil {
log.Printf("[warn] HTTP plain request parsing failed, src=%v, data_len=%d", key, len(httpData))
continue
}
mgr.Update(key, func(s *correlation.SessionState) {
s.Requests = append(s.Requests, correlation.HTTPRequest{
Timestamp: time.Now(),
Method: req.Method,
Path: req.Path,
QueryString: req.Query,
Host: req.HeaderKV["Host"],
HeaderOrder: req.Headers,
HeaderOrderSig: req.HeaderSig,
HeaderKV: req.HeaderKV,
HTTPVersion: req.Protocol,
})
// Peupler L3/L4 si absent (SYN non capturé)
if s.L3L4 == nil && dstIPRaw != 0 {
s.L3L4 = &correlation.L3L4{
DstIP: httpDstIP,
DstPort: dstPort,
}
}
})
counter.Add(1)
}
}
}
// applyH2Result applique le résultat du parsing H2 à la session.
// Crée ou met à jour la requête HTTP avec les paramètres SETTINGS et en-têtes.
func applyH2Result(s *correlation.SessionState, result *parser.H2FrameResult, dstIPFromAccept [4]byte, dstPortFromAccept uint16) {
if result == nil {
if len(s.Requests) == 0 {
s.Requests = append(s.Requests, correlation.HTTPRequest{
Timestamp: time.Now(),
HTTPVersion: "HTTP/2",
})
}
return
}
req := correlation.HTTPRequest{
Timestamp: time.Now(),
HTTPVersion: "HTTP/2",
}
// Paramètres SETTINGS client
if result.ClientSettings != nil {
req.HTTP2Settings = &correlation.HTTP2Settings{
HeaderTableSize: result.ClientSettings.HeaderTableSize,
EnablePush: result.ClientSettings.EnablePush,
MaxConcurrentStreams: result.ClientSettings.MaxConcurrentStreams,
InitialWindowSize: result.ClientSettings.InitialWindowSize,
MaxFrameSize: result.ClientSettings.MaxFrameSize,
MaxHeaderListSize: result.ClientSettings.MaxHeaderListSize,
UnknownSettings: result.ClientSettings.UnknownSettings,
EnableConnectProtocol: result.ClientSettings.EnableConnectProtocol,
WindowUpdateIncrement: result.ClientSettings.WindowUpdateIncrement,
PseudoHeaderOrder: result.ClientSettings.PseudoHeaderOrder,
}
}
// En-têtes décodés
if len(result.Headers) > 0 {
req.HeaderKV = make(map[string]string)
for _, h := range result.Headers {
nameLower := strings.ToLower(h.Name)
if parser.HpackCapturedHeaders[nameLower] && h.Value != "" {
req.HeaderKV[nameLower] = h.Value
req.HeaderOrder = append(req.HeaderOrder, nameLower)
}
switch nameLower {
case ":method":
req.Method = h.Value
case ":path":
if idx := strings.Index(h.Value, "?"); idx >= 0 {
req.Path = h.Value[:idx]
req.QueryString = h.Value[idx+1:]
} else {
req.Path = h.Value
}
case ":authority":
req.Host = h.Value
}
}
if len(req.HeaderOrder) > 0 {
req.HeaderOrderSig = strings.Join(req.HeaderOrder, ";")
}
}
// Pseudo-headers order (toujours disponible via result, même sans ClientSettings)
if len(result.PseudoHeaderOrder) > 0 {
if req.HTTP2Settings == nil {
req.HTTP2Settings = &correlation.HTTP2Settings{}
}
req.HTTP2Settings.PseudoHeaderOrder = result.PseudoHeaderOrder
}
if len(s.Requests) == 0 {
s.Requests = append(s.Requests, req)
}
if s.L3L4 == nil && (dstIPFromAccept != [4]byte{} || dstPortFromAccept != 0) {
s.L3L4 = &correlation.L3L4{
DstIP: dstIPFromAccept,
DstPort: dstPortFromAccept,
}
}
_ = s.TLS
}
// updateH2Settings met à jour les paramètres HTTP/2 d'une requête existante.
func updateH2Settings(last *correlation.HTTPRequest, settings *parser.HTTP2Settings) {
if last.HTTP2Settings == nil {
last.HTTP2Settings = &correlation.HTTP2Settings{
WindowUpdateIncrement: settings.WindowUpdateIncrement,
PseudoHeaderOrder: settings.PseudoHeaderOrder,
}
}
if settings.HeaderTableSize >= 0 {
last.HTTP2Settings.HeaderTableSize = settings.HeaderTableSize
}
if settings.EnablePush >= 0 {
last.HTTP2Settings.EnablePush = settings.EnablePush
}
if settings.MaxConcurrentStreams >= 0 {
last.HTTP2Settings.MaxConcurrentStreams = settings.MaxConcurrentStreams
}
if settings.InitialWindowSize >= 0 {
last.HTTP2Settings.InitialWindowSize = settings.InitialWindowSize
}
if settings.MaxFrameSize >= 0 {
last.HTTP2Settings.MaxFrameSize = settings.MaxFrameSize
}
if settings.MaxHeaderListSize >= 0 {
last.HTTP2Settings.MaxHeaderListSize = settings.MaxHeaderListSize
}
if settings.UnknownSettings >= 0 {
last.HTTP2Settings.UnknownSettings = settings.UnknownSettings
}
if settings.EnableConnectProtocol >= 0 {
last.HTTP2Settings.EnableConnectProtocol = settings.EnableConnectProtocol
}
if len(settings.PseudoHeaderOrder) > 0 {
last.HTTP2Settings.PseudoHeaderOrder = settings.PseudoHeaderOrder
}
}
// attachNginxUprobesWithRetry attache les uprobes nginx avec retry automatique.
// Retente jusqu'à maxRetries fois toutes les retryInterval secondes.
// Utile pour attendre que nginx démarre après ja4ebpf.
func attachNginxUprobesWithRetry(ctx context.Context, l *loader.Loader, cfg *Config) error {
if !cfg.Uprobes.Enabled {
log.Printf("[uprobes] nginx uprobes désactivés (uprobes.enabled=false)")
return nil
}
binPath := cfg.Uprobes.NginxBinPath
maxRetries := cfg.Uprobes.MaxRetries
retryInterval := time.Duration(cfg.Uprobes.RetryIntervalSec) * time.Second
log.Printf("[uprobes] tentative d'attachement nginx uprobes (bin=%s, max_retries=%d, interval=%v)",
binPath, maxRetries, retryInterval)
for attempt := 1; attempt <= maxRetries; attempt++ {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Vérifier que le binaire existe
if _, err := os.Stat(binPath); err != nil {
log.Printf("[uprobes] tentative %d/%d: binaire nginx non trouvé (%s), retry dans %v",
attempt, maxRetries, binPath, retryInterval)
time.Sleep(retryInterval)
continue
}
// Tenter d'attacher les uprobes
err := l.AttachUprobesNginx(binPath)
if err == nil {
log.Printf("[uprobes] nginx uprobes attachés avec succès (tentative %d/%d)", attempt, maxRetries)
return nil
}
log.Printf("[uprobes] tentative %d/%d: échec attachement: %v, retry dans %v",
attempt, maxRetries, err, retryInterval)
// Dernière tentative : ne pas sleep
if attempt < maxRetries {
time.Sleep(retryInterval)
}
}
return fmt.Errorf("attachement nginx uprobes échoué après %d tentatives", maxRetries)
}
// consumeNginxHTTPEvents lit et traite les événements HTTP depuis nginx via uprobes.
// Ces événements contiennent les requêtes HTTP complètes (méthode, URI, headers) capturées
// par nginx avant tout traitement, garantissant des headers complets.
func consumeNginxHTTPEvents(ctx context.Context, rd *perf.Reader, mgr *correlation.Manager, counter *atomic.Uint64) {
// Structure nginx_http_event BPF (version tracepoint recvfrom):
// Offset: 0:pid_tgid(8), 8:fd(4), 12:src_ip(4), 16:src_port(2), 18:timestamp_ns(8),
// 26:http_method[16], 42:uri[256], 298:query[128], 426:data[3640],
// 4066:method_len(4), 4070:uri_len(4), 4074:query_len(4), 4078:body_len(4), 4082:data_len(4)
// NOTE: Avec tracepoint recvfrom, http_method/uri/query sont vides, les données HTTP sont dans data[]
const minEventSize = 426
for {
select {
case <-ctx.Done():
return
default:
}
record, err := rd.Read()
if err != nil {
if err == os.ErrClosed {
return
}
continue
}
data := record.RawSample
if len(data) < 426 {
// Les événements doivent avoir au moins l'offset du champ data
continue
}
// Parser les metadata (structure tracepoint recvfrom)
pidTgid := binary.LittleEndian.Uint64(data[0:8])
fd := binary.LittleEndian.Uint32(data[8:12])
srcIPRaw := binary.LittleEndian.Uint32(data[12:16])
srcPort := binary.LittleEndian.Uint16(data[16:18])
dataLen := binary.LittleEndian.Uint32(data[4082:4086])
// Extraire les données HTTP brutes (offset 426)
rawHTTP := string(data[426 : 426+dataLen])
if len(rawHTTP) < 4 {
continue
}
// Parser la ligne de requête HTTP: "METHOD /path?query HTTP/1.1\r\n"
requestLineEnd := strings.Index(rawHTTP, "\r\n")
if requestLineEnd < 0 {
continue
}
requestLine := rawHTTP[:requestLineEnd]
parts := strings.Fields(requestLine)
if len(parts) < 2 {
continue
}
httpMethod := parts[0]
fullPath := parts[1]
// Séparer path et query string
queryStart := strings.Index(fullPath, "?")
var uri, query string
if queryStart >= 0 {
uri = fullPath[:queryStart]
query = fullPath[queryStart+1:]
} else {
uri = fullPath
}
// Parser les headers HTTP
headersData := rawHTTP[requestLineEnd+2:]
var req correlation.HTTPRequest
req.Method = httpMethod
req.Path = uri
req.QueryString = query
req.Timestamp = time.Now()
req.HeaderKV = make(map[string]string)
lines := strings.Split(headersData, "\r\n")
for _, line := range lines {
if line == "" {
break
}
colon := strings.Index(line, ":")
if colon > 0 {
name := strings.TrimSpace(line[:colon])
value := ""
if colon+1 < len(line) {
value = strings.TrimSpace(line[colon+1:])
}
nameLower := strings.ToLower(name)
req.HeaderOrder = append(req.HeaderOrder, nameLower)
req.HeaderKV[nameLower] = value
}
}
req.HeaderOrderSig = strings.Join(req.HeaderOrder, ";")
// Créer la clé de session
var key correlation.SessionKey
key.SrcIP[0] = byte(srcIPRaw >> 24)
key.SrcIP[1] = byte(srcIPRaw >> 16)
key.SrcIP[2] = byte(srcIPRaw >> 8)
key.SrcIP[3] = byte(srcIPRaw)
key.SrcPort = srcPort
// Filtrer les IPs sources ignorées
if isIgnoredIP(key.SrcIP) {
continue
}
// Mettre à jour la session
mgr.Update(key, func(s *correlation.SessionState) {
s.Requests = append(s.Requests, req)
})
counter.Add(1)
log.Printf("[nginx] HTTP: pid=%d fd=%d %s %s (headers=%d)",
pidTgid>>32, fd, httpMethod, uri, len(req.HeaderOrder))
}
}