feat(ja4ebpf): add SSL_write uprobe, HPACK decoder, and AcceptCache for session correlation
Add uprobe_ssl_write_entry/uretprobe_ssl_write_exit to capture server HTTP
responses via SSL_write with direction=1. Implement full HPACK decoder
(RFC 7541 static table, multi-byte integers, literal representations) for
HTTP/2 header extraction. Add AcceptCache mapping {tgid,fd}→SessionKey
from accept4 events as authoritative source for SSL correlation when BPF
ssl_conn_map has src_ip=0. Add ip_total_length to tcp_syn_event BPF struct.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
78
services/ja4ebpf/internal/correlation/accept_cache.go
Normal file
78
services/ja4ebpf/internal/correlation/accept_cache.go
Normal file
@ -0,0 +1,78 @@
|
||||
// Package correlation fournit un cache des associations accept4 → SessionKey
|
||||
// pour corriger la corrélation SSL quand ssl_conn_map n'est pas peuplé.
|
||||
package correlation
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// acceptCacheKey identifie une connexion par processus + fd.
|
||||
type acceptCacheKey struct {
|
||||
tgid uint32
|
||||
fd uint32
|
||||
}
|
||||
|
||||
// acceptCacheEntry stocke la clé de session et l'expiration.
|
||||
type acceptCacheEntry struct {
|
||||
key SessionKey
|
||||
dstIP [4]byte
|
||||
dstPort uint16
|
||||
expiresAt time.Time
|
||||
}
|
||||
|
||||
// AcceptCache maps {tgid, fd} → SessionKey + dst info depuis les événements accept4.
|
||||
// Utilisé par le handler SSL quand ssl_conn_map a src_ip=0.
|
||||
type AcceptCache struct {
|
||||
mu sync.RWMutex
|
||||
cache map[acceptCacheKey]*acceptCacheEntry
|
||||
ttl time.Duration
|
||||
}
|
||||
|
||||
// NewAcceptCache crée un cache avec la durée de vie spécifiée.
|
||||
func NewAcceptCache(ttl time.Duration) *AcceptCache {
|
||||
c := &AcceptCache{
|
||||
cache: make(map[acceptCacheKey]*acceptCacheEntry),
|
||||
ttl: ttl,
|
||||
}
|
||||
go c.purgeLoop()
|
||||
return c
|
||||
}
|
||||
|
||||
// Store enregistre l'association {tgid, fd} → SessionKey.
|
||||
func (c *AcceptCache) Store(tgid, fd uint32, key SessionKey, dstIP [4]byte, dstPort uint16) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.cache[acceptCacheKey{tgid: tgid, fd: fd}] = &acceptCacheEntry{
|
||||
key: key,
|
||||
dstIP: dstIP,
|
||||
dstPort: dstPort,
|
||||
expiresAt: time.Now().Add(c.ttl),
|
||||
}
|
||||
}
|
||||
|
||||
// Lookup retourne la SessionKey pour {tgid, fd}.
|
||||
func (c *AcceptCache) Lookup(tgid, fd uint32) (SessionKey, [4]byte, uint16, bool) {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
e, ok := c.cache[acceptCacheKey{tgid: tgid, fd: fd}]
|
||||
if !ok || time.Now().After(e.expiresAt) {
|
||||
return SessionKey{}, [4]byte{}, 0, false
|
||||
}
|
||||
return e.key, e.dstIP, e.dstPort, true
|
||||
}
|
||||
|
||||
func (c *AcceptCache) purgeLoop() {
|
||||
ticker := time.NewTicker(30 * time.Second)
|
||||
defer ticker.Stop()
|
||||
for range ticker.C {
|
||||
c.mu.Lock()
|
||||
now := time.Now()
|
||||
for k, e := range c.cache {
|
||||
if now.After(e.expiresAt) {
|
||||
delete(c.cache, k)
|
||||
}
|
||||
}
|
||||
c.mu.Unlock()
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user