Files
ja4-platform/services/ja4ebpf/internal/loader/loader.go
Jacquin Antoine f0c8fe81c6 feat(ja4ebpf): add multi-interface TC, LPM_TRIE ignore_src, unit tests, and fix bugs
- Add multi-interface TC attachment (default "any" = all UP interfaces)
- Add BPF LPM_TRIE map ignored_src for kernel-side CIDR filtering
- Add userspace ignore_src filtering for SSL/accept4 path via net.IPNet.Contains()
- Add AcceptCache for fd→SessionKey correlation with TTL and Close()
- Add 5 test files covering writer, procutil, dispatcher, accept_cache, and cmd
- Fix formatTCPOptions infinite loop on EOL (case 0 break→return)
- Fix pseudoOrderToShort panic on empty slice (negative cap)
- Fix AcceptCache goroutine leak (add done channel + Close())
- Update config.yml.example with interfaces, listen_ports, ignore_src
- Rewrite docs/services/ja4ebpf.md (was massively stale: XDP, RingBuffer, etc.)
- Fix stale XDP/RingBuffer references in docs/architecture.md, thesis, tls.go

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-16 01:49:26 +02:00

431 lines
13 KiB
Go

// Package loader initialise les programmes eBPF via cilium/ebpf,
// attache le hook TC ingress et les uprobes SSL, et expose
// les readers PerfEvent aux consommateurs Go.
package loader
import (
"context"
"fmt"
"log"
"os"
"github.com/cilium/ebpf"
"github.com/cilium/ebpf/link"
"github.com/cilium/ebpf/perf"
"github.com/cilium/ebpf/rlimit"
"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
)
//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -cc clang -target amd64 -cflags "-O2 -g -Wall -D__TARGET_ARCH_x86 -Wno-pass-failed" Ja4Tc ../../bpf/tc_capture.c -- -I../../bpf/headers
//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -cc clang -target amd64 -cflags "-O2 -g -Wall -D__TARGET_ARCH_x86 -Wno-pass-failed" Ja4Ssl ../../bpf/uprobe_ssl.c -- -I../../bpf/headers
// perCPUBufferSize est la taille du buffer perf per-CPU en octets (256 KB).
const perCPUBufferSize = 256 * 1024
// Loader encapsule les objets eBPF compilés, les liens vers les hooks,
// et les readers PerfEvent exposés au pipeline de traitement.
type Loader struct {
tcObjs *Ja4TcObjects // généré par bpf2go (tc_capture.c)
sslObjs *Ja4SslObjects // généré par bpf2go (uprobe_ssl.c)
tcLinks []netlink.Link // interfaces netlink pour cleanup TC
uprobeLinks []link.Link
statsMap *ebpf.Map // map tc_stats pour lecture des compteurs BPF (mode debug)
allowedPorts *ebpf.Map // map allowed_ports pour filtrage par port
ignoredSrc *ebpf.Map // map ignored_src (LPM_TRIE) pour filtrage IP/CIDR
// SynReader lit les événements TCP SYN depuis pb_tcp_syn.
SynReader *perf.Reader
// TLSReader lit les événements TLS ClientHello depuis pb_tls_hello.
TLSReader *perf.Reader
// SSLReader lit les données SSL déchiffrées depuis pb_ssl_data.
SSLReader *perf.Reader
// AcceptReader lit les événements accept4 depuis pb_accept.
AcceptReader *perf.Reader
// HTTPPlainReader lit les payloads HTTP en clair depuis pb_http_plain.
HTTPPlainReader *perf.Reader
}
// StatNames associe chaque index de compteur BPF à un nom lisible.
var StatNames = map[uint32]string{
0: "TOTAL",
1: "IPV4",
2: "TCP",
3: "SYN",
4: "SYN_SUBMIT",
5: "TLS_SUBMIT",
6: "HTTP_SUBMIT",
}
// ReadStats lit les compteurs de la map tc_stats (PERCPU_ARRAY).
// Retourne une map[index] → somme de toutes les valeurs CPU.
// Si la map n'est pas disponible, retourne une map vide.
func (l *Loader) ReadStats() (map[uint32]uint64, error) {
result := make(map[uint32]uint64)
if l.statsMap == nil {
return result, nil
}
for key := uint32(0); key < 7; key++ {
var values []uint64
if err := l.statsMap.Lookup(key, &values); err != nil {
continue
}
var sum uint64
for _, v := range values {
sum += v
}
result[key] = sum
}
return result, nil
}
// PopulatePorts remplit la map BPF allowed_ports avec les ports spécifiés.
// Doit être appelé avant AttachTC. Chaque port autorisé reçoit la valeur 1.
func (l *Loader) PopulatePorts(ports []uint16) error {
if l.allowedPorts == nil {
return fmt.Errorf("map allowed_ports non disponible")
}
for _, port := range ports {
var key uint16 = port
var val uint8 = 1
if err := l.allowedPorts.Put(key, val); err != nil {
return fmt.Errorf("ajout port %d dans allowed_ports: %w", port, err)
}
}
return nil
}
// LPMKey est la clé pour BPF_MAP_TYPE_LPM_TRIE (IPv4).
// Data est stocké en network byte order (big-endian) en mémoire
// pour correspondre à iph.saddr dans le programme BPF.
type LPMKey struct {
Prefixlen uint32
Data [4]byte // IP en network byte order
}
// PopulateIgnoredSrc remplit la map BPF ignored_src (LPM_TRIE) avec les CIDR/IP à ignorer.
// Les IPs doivent être en network byte order (big-endian) pour le LPM_TRIE.
func (l *Loader) PopulateIgnoredSrc(cidrs []LPMKey) error {
if l.ignoredSrc == nil {
return fmt.Errorf("map ignored_src non disponible")
}
for _, key := range cidrs {
var val uint8 = 1
if err := l.ignoredSrc.Put(key, val); err != nil {
return fmt.Errorf("ajout CIDR dans ignored_src: %w", err)
}
}
return nil
}
// New charge le bytecode eBPF embarqué, supprime la limite mémoire
// RLIMIT_MEMLOCK (requise pour les maps eBPF),
// et retourne un Loader prêt à être attaché aux hooks.
//
// Cible : kernel 4.18+ avec BTF. Les perf event arrays sont supportés depuis
// kernel 4.4, bpf_skb_load_bytes depuis kernel 4.5, assurant une compatibilité
// maximale via le hook TC ingress.
// Le BTF natif est détecté automatiquement par cilium/ebpf via
// /sys/kernel/btf/vmlinux — aucun fallback manuel n'est requis.
func New() (*Loader, error) {
// Supprimer la limite mémoire pour les opérations eBPF
if err := rlimit.RemoveMemlock(); err != nil {
return nil, fmt.Errorf("suppression RLIMIT_MEMLOCK: %w", err)
}
// Charger les objets TC (tc_capture.c)
tcObjs := &Ja4TcObjects{}
if err := LoadJa4TcObjects(tcObjs, nil); err != nil {
return nil, fmt.Errorf("chargement objets TC eBPF: %w", err)
}
// Charger les objets SSL/uprobe (uprobe_ssl.c)
sslObjs := &Ja4SslObjects{}
if err := LoadJa4SslObjects(sslObjs, nil); err != nil {
tcObjs.Close()
return nil, fmt.Errorf("chargement objets SSL eBPF: %w", err)
}
// Initialiser les readers pour chaque perf event array
synReader, err := perf.NewReader(tcObjs.PbTcpSyn, perCPUBufferSize)
if err != nil {
sslObjs.Close()
tcObjs.Close()
return nil, fmt.Errorf("création reader pb_tcp_syn: %w", err)
}
tlsReader, err := perf.NewReader(tcObjs.PbTlsHello, perCPUBufferSize)
if err != nil {
synReader.Close()
sslObjs.Close()
tcObjs.Close()
return nil, fmt.Errorf("création reader pb_tls_hello: %w", err)
}
httpPlainReader, err := perf.NewReader(tcObjs.PbHttpPlain, perCPUBufferSize)
if err != nil {
tlsReader.Close()
synReader.Close()
sslObjs.Close()
tcObjs.Close()
return nil, fmt.Errorf("création reader pb_http_plain: %w", err)
}
sslReader, err := perf.NewReader(sslObjs.PbSslData, perCPUBufferSize)
if err != nil {
httpPlainReader.Close()
tlsReader.Close()
synReader.Close()
sslObjs.Close()
tcObjs.Close()
return nil, fmt.Errorf("création reader pb_ssl_data: %w", err)
}
acceptReader, err := perf.NewReader(sslObjs.PbAccept, perCPUBufferSize)
if err != nil {
sslReader.Close()
httpPlainReader.Close()
tlsReader.Close()
synReader.Close()
sslObjs.Close()
tcObjs.Close()
return nil, fmt.Errorf("création reader pb_accept: %w", err)
}
return &Loader{
tcObjs: tcObjs,
sslObjs: sslObjs,
statsMap: tcObjs.TcStats,
allowedPorts: tcObjs.AllowedPorts,
ignoredSrc: tcObjs.IgnoredSrc,
SynReader: synReader,
TLSReader: tlsReader,
SSLReader: sslReader,
AcceptReader: acceptReader,
HTTPPlainReader: httpPlainReader,
}, nil
}
// AttachTC attache le programme TC ingress (clsact qdisc) sur l'interface
// réseau spécifiée. Crée le qdisc clsact (idempotent) et attache le filtre BPF
// en mode direct-action. Compatible kernel 4.1+.
func (l *Loader) AttachTC(iface string) error {
nlLink, err := netlink.LinkByName(iface)
if err != nil {
return fmt.Errorf("interface réseau %q introuvable: %w", iface, err)
}
if err := l.attachTCOnLink(nlLink); err != nil {
return err
}
l.tcLinks = append(l.tcLinks, nlLink)
return nil
}
// AttachTCAll attache le programme TC ingress sur toutes les interfaces
// réseau non-loopback et opérationnelles (OperUp).
// Retourne la liste des noms d'interfaces attachées.
func (l *Loader) AttachTCAll() ([]string, error) {
links, err := netlink.LinkList()
if err != nil {
return nil, fmt.Errorf("énumération interfaces: %w", err)
}
var attached []string
for _, nlLink := range links {
if nlLink.Type() == "loopback" {
continue
}
if nlLink.Attrs().OperState != netlink.OperUp {
continue
}
if err := l.attachTCOnLink(nlLink); err != nil {
log.Printf("[loader] TC %s: %v (ignoré)", nlLink.Attrs().Name, err)
continue
}
attached = append(attached, nlLink.Attrs().Name)
l.tcLinks = append(l.tcLinks, nlLink)
}
if len(attached) == 0 {
return nil, fmt.Errorf("aucune interface TC attachée")
}
return attached, nil
}
// attachTCOnLink attache le programme TC ingress sur un link netlink donné.
func (l *Loader) attachTCOnLink(nlLink netlink.Link) error {
iface := nlLink.Attrs().Name
// Créer le qdisc clsact (idempotent via QdiscReplace)
qdisc := &netlink.Clsact{
QdiscAttrs: netlink.QdiscAttrs{
LinkIndex: nlLink.Attrs().Index,
Handle: netlink.MakeHandle(0xffff, 0),
Parent: netlink.HANDLE_CLSACT,
},
}
if err := netlink.QdiscReplace(qdisc); err != nil {
return fmt.Errorf("clsact qdisc sur %q: %w", iface, err)
}
// Attacher le programme BPF comme filtre ingress
filter := &netlink.BpfFilter{
FilterAttrs: netlink.FilterAttrs{
LinkIndex: nlLink.Attrs().Index,
Parent: netlink.HANDLE_MIN_INGRESS,
Handle: 1,
Protocol: unix.ETH_P_ALL,
Priority: 1,
},
ClassId: netlink.MakeHandle(1, 1),
Fd: l.tcObjs.CaptureTc.FD(),
DirectAction: true,
}
if err := netlink.FilterReplace(filter); err != nil {
return fmt.Errorf("TC filter ingress sur %q: %w", iface, err)
}
return nil
}
// AttachUprobes attache les uprobes SSL_read et SSL_set_fd
// sur le binaire libssl spécifié (ex: "/usr/lib64/libssl.so.3").
func (l *Loader) AttachUprobes(sslLibPath string) error {
if _, err := os.Stat(sslLibPath); err != nil {
return fmt.Errorf("bibliothèque SSL %q: %w", sslLibPath, err)
}
ex, err := link.OpenExecutable(sslLibPath)
if err != nil {
return fmt.Errorf("ouverture exécutable %q pour uprobe: %w", sslLibPath, err)
}
setFdLink, err := ex.Uprobe("SSL_set_fd", l.sslObjs.UprobeSslSetFd, nil)
if err != nil {
return fmt.Errorf("attachement uprobe SSL_set_fd: %w", err)
}
l.uprobeLinks = append(l.uprobeLinks, setFdLink)
readEntryLink, err := ex.Uprobe("SSL_read", l.sslObjs.UprobeSslReadEntry, nil)
if err != nil {
return fmt.Errorf("attachement uprobe SSL_read (entry): %w", err)
}
l.uprobeLinks = append(l.uprobeLinks, readEntryLink)
readExitLink, err := ex.Uretprobe("SSL_read", l.sslObjs.UretprobeSslReadExit, nil)
if err != nil {
return fmt.Errorf("attachement uretprobe SSL_read (exit): %w", err)
}
l.uprobeLinks = append(l.uprobeLinks, readExitLink)
// SSL_write — capture les réponses HTTP du serveur (direction=1)
if err := l.attachSSLWrite(ex); err != nil {
return fmt.Errorf("attachement SSL_write: %w", err)
}
return nil
}
// AttachAcceptProbe attache les tracepoints syscalls/sys_{enter,exit}_accept4.
func (l *Loader) AttachAcceptProbe() error {
kpEntry, err := link.Tracepoint("syscalls", "sys_enter_accept4",
l.sslObjs.KprobeAccept4Entry, nil)
if err != nil {
return fmt.Errorf("attachement tracepoint sys_enter_accept4: %w", err)
}
l.uprobeLinks = append(l.uprobeLinks, kpEntry)
kpExit, err := link.Tracepoint("syscalls", "sys_exit_accept4",
l.sslObjs.KretprobeAccept4Exit, nil)
if err != nil {
return fmt.Errorf("attachement tracepoint sys_exit_accept4: %w", err)
}
l.uprobeLinks = append(l.uprobeLinks, kpExit)
return nil
}
// attachSSLWrite attache les uprobes SSL_write pour capturer
// les réponses HTTP du serveur (direction=1).
func (l *Loader) attachSSLWrite(ex *link.Executable) error {
entryLink, err := ex.Uprobe("SSL_write", l.sslObjs.UprobeSslWriteEntry, nil)
if err != nil {
return fmt.Errorf("attachement uprobe SSL_write (entry): %w", err)
}
l.uprobeLinks = append(l.uprobeLinks, entryLink)
exitLink, err := ex.Uretprobe("SSL_write", l.sslObjs.UretprobeSslWriteExit, nil)
if err != nil {
return fmt.Errorf("attachement uretprobe SSL_write (exit): %w", err)
}
l.uprobeLinks = append(l.uprobeLinks, exitLink)
return nil
}
// Close détache tous les hooks eBPF et libère toutes les ressources associées.
func (l *Loader) Close() error {
if l.HTTPPlainReader != nil {
l.HTTPPlainReader.Close()
}
if l.AcceptReader != nil {
l.AcceptReader.Close()
}
if l.SSLReader != nil {
l.SSLReader.Close()
}
if l.TLSReader != nil {
l.TLSReader.Close()
}
if l.SynReader != nil {
l.SynReader.Close()
}
// Détacher les filtres TC ingress sur toutes les interfaces
for _, nlLink := range l.tcLinks {
filter := &netlink.BpfFilter{
FilterAttrs: netlink.FilterAttrs{
LinkIndex: nlLink.Attrs().Index,
Parent: netlink.HANDLE_MIN_INGRESS,
Handle: 1,
Priority: 1,
},
}
// Ignorer l'erreur — le filtre peut déjà être supprimé
netlink.FilterDel(filter)
}
for _, lnk := range l.uprobeLinks {
if lnk != nil {
lnk.Close()
}
}
if l.sslObjs != nil {
l.sslObjs.Close()
}
if l.tcObjs != nil {
l.tcObjs.Close()
}
return nil
}
// readRecord lit un record brut depuis un PerfReader avec annulation via context.
func readRecord(ctx context.Context, rd *perf.Reader) (perf.Record, error) {
type result struct {
rec perf.Record
err error
}
ch := make(chan result, 1)
go func() {
rec, err := rd.Read()
ch <- result{rec, err}
}()
select {
case <-ctx.Done():
rd.Close()
return perf.Record{}, ctx.Err()
case r := <-ch:
return r.rec, r.err
}
}