// Package loader initialise les programmes eBPF via cilium/ebpf, // attache le hook TC ingress et les uprobes SSL, et expose // les readers PerfEvent aux consommateurs Go. package loader import ( "context" "fmt" "log" "os" "strconv" "strings" "github.com/cilium/ebpf" "github.com/cilium/ebpf/link" "github.com/cilium/ebpf/perf" "github.com/cilium/ebpf/rlimit" "github.com/vishvananda/netlink" "golang.org/x/sys/unix" ) //go:generate go run github.com/cilium/ebpf/cmd/bpf2go -cc clang -target amd64 -cflags "-O2 -g -Wall -D__TARGET_ARCH_x86 -Wno-pass-failed" Ja4Tc ../../bpf/tc_capture.c -- -I../../bpf/headers //go:generate go run github.com/cilium/ebpf/cmd/bpf2go -cc clang -target amd64 -cflags "-O2 -g -Wall -D__TARGET_ARCH_x86 -Wno-pass-failed" Ja4Ssl ../../bpf/uprobe_ssl.c -- -I../../bpf/headers //go:generate go run github.com/cilium/ebpf/cmd/bpf2go -cc clang -target amd64 -cflags "-O2 -g -Wall -D__TARGET_ARCH_x86 -Wno-pass-failed" Ja4Nginx ../../bpf/uprobe_nginx.c -- -I../../bpf/headers //go:generate go run github.com/cilium/ebpf/cmd/bpf2go -cc clang -target amd64 -cflags "-O2 -g -Wall -D__TARGET_ARCH_x86 -Wno-pass-failed" Ja4Apache ../../bpf/uprobe_apache.c -- -I../../bpf/headers // perCPUBufferSize est la taille du buffer perf per-CPU en octets (256 KB). const perCPUBufferSize = 256 * 1024 // Loader encapsule les objets eBPF compilés, les liens vers les hooks, // et les readers PerfEvent exposés au pipeline de traitement. type Loader struct { tcObjs *Ja4TcObjects // généré par bpf2go (tc_capture.c) sslObjs *Ja4SslObjects // généré par bpf2go (uprobe_ssl.c) nginxObjs *Ja4NginxObjects // généré par bpf2go (uprobe_nginx.c) apacheObjs *Ja4ApacheObjects // généré par bpf2go (uprobe_apache.c) tcLinks []netlink.Link // interfaces netlink pour cleanup TC uprobeLinks []link.Link statsMap *ebpf.Map // map tc_stats pour lecture des compteurs BPF (mode debug) allowedPorts *ebpf.Map // map allowed_ports pour filtrage par port ignoredSrc *ebpf.Map // map ignored_src (LPM_TRIE) pour filtrage IP/CIDR nginxPidMap *ebpf.Map // map nginx_pid_map pour filtrage recvfrom par PID apachePidMap *ebpf.Map // map apache_pid_map pour filtrage read par PID // SynReader lit les événements TCP SYN depuis pb_tcp_syn. SynReader *perf.Reader // TLSReader lit les événements TLS ClientHello depuis pb_tls_hello. TLSReader *perf.Reader // SSLReader lit les données SSL déchiffrées depuis pb_ssl_data. SSLReader *perf.Reader // AcceptReader lit les événements accept4 depuis pb_accept. AcceptReader *perf.Reader // HTTPPlainReader lit les payloads HTTP en clair depuis pb_http_plain. HTTPPlainReader *perf.Reader // NginxHTTPReader lit les requêtes HTTP complètes depuis nginx via uprobes. NginxHTTPReader *perf.Reader // ApacheHTTPReader lit les requêtes HTTP complètes depuis Apache httpd via read(). ApacheHTTPReader *perf.Reader } // StatNames associe chaque index de compteur BPF à un nom lisible. var StatNames = map[uint32]string{ 0: "TOTAL", 1: "IPV4", 2: "TCP", 3: "SYN", 4: "SYN_SUBMIT", 5: "TLS_SUBMIT", 6: "HTTP_SUBMIT", } // ReadStats lit les compteurs de la map tc_stats (PERCPU_ARRAY). // Retourne une map[index] → somme de toutes les valeurs CPU. // Si la map n'est pas disponible, retourne une map vide. func (l *Loader) ReadStats() (map[uint32]uint64, error) { result := make(map[uint32]uint64) if l.statsMap == nil { return result, nil } for key := uint32(0); key < 7; key++ { var values []uint64 if err := l.statsMap.Lookup(key, &values); err != nil { continue } var sum uint64 for _, v := range values { sum += v } result[key] = sum } return result, nil } // PopulatePorts remplit la map BPF allowed_ports avec les ports spécifiés. // Doit être appelé avant AttachTC. Chaque port autorisé reçoit la valeur 1. func (l *Loader) PopulatePorts(ports []uint16) error { if l.allowedPorts == nil { return fmt.Errorf("map allowed_ports non disponible") } for _, port := range ports { var key uint16 = port var val uint8 = 1 if err := l.allowedPorts.Put(key, val); err != nil { return fmt.Errorf("ajout port %d dans allowed_ports: %w", port, err) } } return nil } // LPMKey est la clé pour BPF_MAP_TYPE_LPM_TRIE (IPv4). // Data est stocké en network byte order (big-endian) en mémoire // pour correspondre à iph.saddr dans le programme BPF. type LPMKey struct { Prefixlen uint32 Data [4]byte // IP en network byte order } // PopulateIgnoredSrc remplit la map BPF ignored_src (LPM_TRIE) avec les CIDR/IP à ignorer. // Les IPs doivent être en network byte order (big-endian) pour le LPM_TRIE. func (l *Loader) PopulateIgnoredSrc(cidrs []LPMKey) error { if l.ignoredSrc == nil { return fmt.Errorf("map ignored_src non disponible") } for _, key := range cidrs { var val uint8 = 1 if err := l.ignoredSrc.Put(key, val); err != nil { return fmt.Errorf("ajout CIDR dans ignored_src: %w", err) } } return nil } // AddNginxPid ajoute un PID nginx à la map nginx_pid_map pour le filtrage recvfrom. // Un PID nginx activé permettra la capture de ses appels recvfrom() via tracepoints. func (l *Loader) AddNginxPid(pid uint32) error { if l.nginxPidMap == nil { return fmt.Errorf("map nginx_pid_map non disponible") } var val uint8 = 1 if err := l.nginxPidMap.Put(pid, val); err != nil { return fmt.Errorf("ajout PID %d dans nginx_pid_map: %w", pid, err) } return nil } // RemoveNginxPid supprime un PID nginx de la map nginx_pid_map. func (l *Loader) RemoveNginxPid(pid uint32) error { if l.nginxPidMap == nil { return fmt.Errorf("map nginx_pid_map non disponible") } if err := l.nginxPidMap.Delete(pid); err != nil { return fmt.Errorf("suppression PID %d de nginx_pid_map: %w", pid, err) } return nil } // AddApachePid ajoute un PID Apache httpd à la map apache_pid_map pour le filtrage read. // Un PID Apache activé permettra la capture de ses appels read() via tracepoints. func (l *Loader) AddApachePid(pid uint32) error { if l.apachePidMap == nil { return fmt.Errorf("map apache_pid_map non disponible") } var val uint8 = 1 if err := l.apachePidMap.Put(pid, val); err != nil { return fmt.Errorf("ajout PID %d dans apache_pid_map: %w", pid, err) } return nil } // RemoveApachePid supprime un PID Apache de la map apache_pid_map. func (l *Loader) RemoveApachePid(pid uint32) error { if l.apachePidMap == nil { return fmt.Errorf("map apache_pid_map non disponible") } if err := l.apachePidMap.Delete(pid); err != nil { return fmt.Errorf("suppression PID %d de apache_pid_map: %w", pid, err) } return nil } // New charge le bytecode eBPF embarqué, supprime la limite mémoire // RLIMIT_MEMLOCK (requise pour les maps eBPF), // et retourne un Loader prêt à être attaché aux hooks. // // Cible : kernel 4.18+ avec BTF. Les perf event arrays sont supportés depuis // kernel 4.4, bpf_skb_load_bytes depuis kernel 4.5, assurant une compatibilité // maximale via le hook TC ingress. // Le BTF natif est détecté automatiquement par cilium/ebpf via // /sys/kernel/btf/vmlinux — aucun fallback manuel n'est requis. func New() (*Loader, error) { // Supprimer la limite mémoire pour les opérations eBPF if err := rlimit.RemoveMemlock(); err != nil { return nil, fmt.Errorf("suppression RLIMIT_MEMLOCK: %w", err) } // Charger les objets TC (tc_capture.c) tcObjs := &Ja4TcObjects{} if err := LoadJa4TcObjects(tcObjs, nil); err != nil { return nil, fmt.Errorf("chargement objets TC eBPF: %w", err) } // Charger les objets SSL/uprobe (uprobe_ssl.c) sslObjs := &Ja4SslObjects{} if err := LoadJa4SslObjects(sslObjs, nil); err != nil { tcObjs.Close() return nil, fmt.Errorf("chargement objets SSL eBPF: %w", err) } // Charger les objets nginx/uprobe (uprobe_nginx.c) nginxObjs := &Ja4NginxObjects{} if err := LoadJa4NginxObjects(nginxObjs, nil); err != nil { sslObjs.Close() tcObjs.Close() return nil, fmt.Errorf("chargement objets nginx eBPF: %w", err) } // Charger les objets Apache httpd (uprobe_apache.c) apacheObjs := &Ja4ApacheObjects{} if err := LoadJa4ApacheObjects(apacheObjs, nil); err != nil { nginxObjs.Close() sslObjs.Close() tcObjs.Close() return nil, fmt.Errorf("chargement objets Apache eBPF: %w", err) } // Initialiser les readers pour chaque perf event array synReader, err := perf.NewReader(tcObjs.PbTcpSyn, perCPUBufferSize) if err != nil { sslObjs.Close() tcObjs.Close() return nil, fmt.Errorf("création reader pb_tcp_syn: %w", err) } tlsReader, err := perf.NewReader(tcObjs.PbTlsHello, perCPUBufferSize) if err != nil { synReader.Close() sslObjs.Close() tcObjs.Close() return nil, fmt.Errorf("création reader pb_tls_hello: %w", err) } httpPlainReader, err := perf.NewReader(tcObjs.PbHttpPlain, perCPUBufferSize) if err != nil { tlsReader.Close() synReader.Close() sslObjs.Close() tcObjs.Close() return nil, fmt.Errorf("création reader pb_http_plain: %w", err) } sslReader, err := perf.NewReader(sslObjs.PbSslData, perCPUBufferSize) if err != nil { httpPlainReader.Close() tlsReader.Close() synReader.Close() sslObjs.Close() tcObjs.Close() return nil, fmt.Errorf("création reader pb_ssl_data: %w", err) } acceptReader, err := perf.NewReader(sslObjs.PbAccept, perCPUBufferSize) if err != nil { sslReader.Close() httpPlainReader.Close() tlsReader.Close() synReader.Close() sslObjs.Close() tcObjs.Close() return nil, fmt.Errorf("création reader pb_accept: %w", err) } nginxHTTPReader, err := perf.NewReader(nginxObjs.PbGinxHttp, perCPUBufferSize) if err != nil { nginxHTTPReader.Close() acceptReader.Close() sslReader.Close() httpPlainReader.Close() tlsReader.Close() synReader.Close() sslObjs.Close() return nil, fmt.Errorf("création reader pb_ginx_http: %w", err) } apacheHTTPReader, err := perf.NewReader(apacheObjs.PbApacheHttp, perCPUBufferSize) if err != nil { apacheHTTPReader.Close() nginxHTTPReader.Close() acceptReader.Close() sslReader.Close() httpPlainReader.Close() tlsReader.Close() synReader.Close() apacheObjs.Close() nginxObjs.Close() sslObjs.Close() tcObjs.Close() return nil, fmt.Errorf("création reader pb_apache_http: %w", err) } return &Loader{ tcObjs: tcObjs, sslObjs: sslObjs, nginxObjs: nginxObjs, apacheObjs: apacheObjs, statsMap: tcObjs.TcStats, allowedPorts: tcObjs.AllowedPorts, ignoredSrc: tcObjs.IgnoredSrc, nginxPidMap: nginxObjs.NginxPidMap, apachePidMap: apacheObjs.ApachePidMap, SynReader: synReader, TLSReader: tlsReader, SSLReader: sslReader, AcceptReader: acceptReader, HTTPPlainReader: httpPlainReader, NginxHTTPReader: nginxHTTPReader, ApacheHTTPReader: apacheHTTPReader, }, nil } // AttachTC attache le programme TC ingress (clsact qdisc) sur l'interface // réseau spécifiée. Crée le qdisc clsact (idempotent) et attache le filtre BPF // en mode direct-action. Compatible kernel 4.1+. func (l *Loader) AttachTC(iface string) error { nlLink, err := netlink.LinkByName(iface) if err != nil { return fmt.Errorf("interface réseau %q introuvable: %w", iface, err) } if err := l.attachTCOnLink(nlLink); err != nil { return err } l.tcLinks = append(l.tcLinks, nlLink) return nil } // AttachTCAll attache le programme TC ingress sur toutes les interfaces // réseau non-loopback et opérationnelles (OperUp). // Retourne la liste des noms d'interfaces attachées. func (l *Loader) AttachTCAll() ([]string, error) { links, err := netlink.LinkList() if err != nil { return nil, fmt.Errorf("énumération interfaces: %w", err) } var attached []string for _, nlLink := range links { if nlLink.Type() == "loopback" { continue } if nlLink.Attrs().OperState != netlink.OperUp { continue } if err := l.attachTCOnLink(nlLink); err != nil { log.Printf("[loader] TC %s: %v (ignoré)", nlLink.Attrs().Name, err) continue } attached = append(attached, nlLink.Attrs().Name) l.tcLinks = append(l.tcLinks, nlLink) } if len(attached) == 0 { return nil, fmt.Errorf("aucune interface TC attachée") } return attached, nil } // attachTCOnLink attache le programme TC ingress sur un link netlink donné. func (l *Loader) attachTCOnLink(nlLink netlink.Link) error { iface := nlLink.Attrs().Name // Créer le qdisc clsact (idempotent via QdiscReplace) qdisc := &netlink.Clsact{ QdiscAttrs: netlink.QdiscAttrs{ LinkIndex: nlLink.Attrs().Index, Handle: netlink.MakeHandle(0xffff, 0), Parent: netlink.HANDLE_CLSACT, }, } if err := netlink.QdiscReplace(qdisc); err != nil { return fmt.Errorf("clsact qdisc sur %q: %w", iface, err) } // Attacher le programme BPF comme filtre ingress filter := &netlink.BpfFilter{ FilterAttrs: netlink.FilterAttrs{ LinkIndex: nlLink.Attrs().Index, Parent: netlink.HANDLE_MIN_INGRESS, Handle: 1, Protocol: unix.ETH_P_ALL, Priority: 1, }, ClassId: netlink.MakeHandle(1, 1), Fd: l.tcObjs.CaptureTc.FD(), DirectAction: true, } if err := netlink.FilterReplace(filter); err != nil { return fmt.Errorf("TC filter ingress sur %q: %w", iface, err) } return nil } // AttachUprobes attache les uprobes SSL_read et SSL_set_fd // sur le binaire libssl spécifié (ex: "/usr/lib64/libssl.so.3"). func (l *Loader) AttachUprobes(sslLibPath string) error { if _, err := os.Stat(sslLibPath); err != nil { return fmt.Errorf("bibliothèque SSL %q: %w", sslLibPath, err) } ex, err := link.OpenExecutable(sslLibPath) if err != nil { return fmt.Errorf("ouverture exécutable %q pour uprobe: %w", sslLibPath, err) } setFdLink, err := ex.Uprobe("SSL_set_fd", l.sslObjs.UprobeSslSetFd, nil) if err != nil { return fmt.Errorf("attachement uprobe SSL_set_fd: %w", err) } l.uprobeLinks = append(l.uprobeLinks, setFdLink) readEntryLink, err := ex.Uprobe("SSL_read", l.sslObjs.UprobeSslReadEntry, nil) if err != nil { return fmt.Errorf("attachement uprobe SSL_read (entry): %w", err) } l.uprobeLinks = append(l.uprobeLinks, readEntryLink) readExitLink, err := ex.Uretprobe("SSL_read", l.sslObjs.UretprobeSslReadExit, nil) if err != nil { return fmt.Errorf("attachement uretprobe SSL_read (exit): %w", err) } l.uprobeLinks = append(l.uprobeLinks, readExitLink) // SSL_write — capture les réponses HTTP du serveur (direction=1) if err := l.attachSSLWrite(ex); err != nil { return fmt.Errorf("attachement SSL_write: %w", err) } return nil } // AttachAcceptProbe attache les tracepoints syscalls/sys_{enter,exit}_accept4. func (l *Loader) AttachAcceptProbe() error { kpEntry, err := link.Tracepoint("syscalls", "sys_enter_accept4", l.sslObjs.KprobeAccept4Entry, nil) if err != nil { return fmt.Errorf("attachement tracepoint sys_enter_accept4: %w", err) } l.uprobeLinks = append(l.uprobeLinks, kpEntry) kpExit, err := link.Tracepoint("syscalls", "sys_exit_accept4", l.sslObjs.KretprobeAccept4Exit, nil) if err != nil { return fmt.Errorf("attachement tracepoint sys_exit_accept4: %w", err) } l.uprobeLinks = append(l.uprobeLinks, kpExit) return nil } // AttachUprobesNginx configure les tracepoints recvfrom pour capturer // le trafic HTTP complet depuis nginx. Cette approche utilise les tracepoints // kernel sys_enter/exit_recvfrom. // Le PID nginx est ajouté à la map nginx_pid_map pour filtrer les appels recvfrom(). func (l *Loader) AttachUprobesNginx(nginxBinPath string) error { // Attacher les tracepoints recvfrom kpEntry, err := link.Tracepoint("syscalls", "sys_enter_recvfrom", l.nginxObjs.TpSysEnterRecvfrom, nil) if err != nil { return fmt.Errorf("attachement tracepoint sys_enter_recvfrom: %w", err) } l.uprobeLinks = append(l.uprobeLinks, kpEntry) // NOTE: Utilisation de Kretprobe pour sys_exit_recvfrom pour contourner // le bug "permission denied" des tracepoints sur certains kernels (Rocky Linux 9, kernel 5.14+). // Les kretprobes ciblent directement la fonction kernel __x64_sys_recvfrom. kpExit, err := link.Kretprobe("__x64_sys_recvfrom", l.nginxObjs.TpSysExitRecvfrom, &link.KprobeOptions{}) if err != nil { return fmt.Errorf("attachement kretprobe sys_exit_recvfrom: %w", err) } l.uprobeLinks = append(l.uprobeLinks, kpExit) // Trouver le PID nginx en cherchant dans /proc ou via pgrep pids, err := findNginxPIDs() if err != nil { return fmt.Errorf("recherche PID nginx: %w", err) } if len(pids) == 0 { return fmt.Errorf("aucun processus nginx trouvé") } // Ajouter tous les PIDs nginx trouvés à la map de filtrage for _, pid := range pids { if err := l.AddNginxPid(pid); err != nil { log.Printf("[ja4ebpf] avertissement: ajout PID nginx %d: %v", pid, err) } else { log.Printf("[ja4ebpf] tracepoints recvfrom activés pour PID nginx %d", pid) } } return nil } // findNginxPIDs trouve tous les PIDs des processus nginx en cours d'exécution. func findNginxPIDs() ([]uint32, error) { // Lire /proc pour trouver les processus nginx entries, err := os.ReadDir("/proc") if err != nil { return nil, fmt.Errorf("lecture /proc: %w", err) } var pids []uint32 for _, entry := range entries { // Vérifier que le nom est un nombre (PID) if !entry.IsDir() { continue } pid, err := strconv.ParseUint(entry.Name(), 10, 32) if err != nil { continue } // Vérifier si c'est un processus nginx en lisant /proc/[pid]/cmdline cmdlinePath := fmt.Sprintf("/proc/%d/cmdline", pid) cmdlineData, err := os.ReadFile(cmdlinePath) if err != nil { continue } // La cmdline contient le chemin du binaire, ex: "nginx: master process" ou "nginx: worker process" cmdline := string(cmdlineData) if strings.Contains(cmdline, "nginx") { pids = append(pids, uint32(pid)) } } return pids, nil } // AttachUprobesApache configure les tracepoints/kretprobe read pour capturer // le trafic HTTP complet depuis Apache httpd. Cette approche utilise les tracepoints // kernel sys_enter_read et kretprobe __x64_sys_read. // Le PID Apache est ajouté à la map apache_pid_map pour filtrer les appels read(). func (l *Loader) AttachUprobesApache() error { // Utilisation de Kretprobe pour __x64_sys_recvfrom // Apache httpd utilise recvfrom() pour lire les requêtes HTTP (similaire à nginx) kp, err := link.Kretprobe("__x64_sys_recvfrom", l.apacheObjs.KretprobeSysExitRecvfrom, &link.KprobeOptions{}) if err != nil { return fmt.Errorf("attachement kretprobe recvfrom: %w", err) } l.uprobeLinks = append(l.uprobeLinks, kp) // Trouver les PIDs Apache httpd en cours d'exécution pids, err := findApachePIDs() if err != nil { return fmt.Errorf("recherche PID Apache: %w", err) } if len(pids) == 0 { return fmt.Errorf("aucun processus Apache httpd trouvé") } // Ajouter tous les PIDS Apache trouvés à la map de filtrage for _, pid := range pids { if err := l.AddApachePid(pid); err != nil { log.Printf("[ja4ebpf] avertissement: ajout PID Apache %d: %v", pid, err) } else { log.Printf("[ja4ebpf] tracepoints recvfrom activés pour PID Apache %d", pid) } } return nil } // findApachePIDs trouve tous les PIDs des processus Apache httpd en cours d'exécution. func findApachePIDs() ([]uint32, error) { // Lire /proc pour trouver les processus Apache entries, err := os.ReadDir("/proc") if err != nil { return nil, fmt.Errorf("lecture /proc: %w", err) } var pids []uint32 for _, entry := range entries { // Vérifier que le nom est un nombre (PID) if !entry.IsDir() { continue } pid, err := strconv.ParseUint(entry.Name(), 10, 32) if err != nil { continue } // Vérifier si c'est un processus Apache en lisant /proc/[pid]/cmdline cmdlinePath := fmt.Sprintf("/proc/%d/cmdline", pid) cmdlineData, err := os.ReadFile(cmdlinePath) if err != nil { continue } // La cmdline contient le chemin du binaire, ex: "/usr/sbin/httpd" ou "apache2" cmdline := string(cmdlineData) if strings.Contains(cmdline, "httpd") || strings.Contains(cmdline, "apache2") { pids = append(pids, uint32(pid)) } } return pids, nil } // attachSSLWrite attache les uprobes SSL_write pour capturer // les réponses HTTP du serveur (direction=1). func (l *Loader) attachSSLWrite(ex *link.Executable) error { entryLink, err := ex.Uprobe("SSL_write", l.sslObjs.UprobeSslWriteEntry, nil) if err != nil { return fmt.Errorf("attachement uprobe SSL_write (entry): %w", err) } l.uprobeLinks = append(l.uprobeLinks, entryLink) exitLink, err := ex.Uretprobe("SSL_write", l.sslObjs.UretprobeSslWriteExit, nil) if err != nil { return fmt.Errorf("attachement uretprobe SSL_write (exit): %w", err) } l.uprobeLinks = append(l.uprobeLinks, exitLink) return nil } // Close détache tous les hooks eBPF et libère toutes les ressources associées. func (l *Loader) Close() error { if l.HTTPPlainReader != nil { l.HTTPPlainReader.Close() } if l.AcceptReader != nil { l.AcceptReader.Close() } if l.SSLReader != nil { l.SSLReader.Close() } if l.TLSReader != nil { l.TLSReader.Close() } if l.SynReader != nil { l.SynReader.Close() } if l.NginxHTTPReader != nil { l.NginxHTTPReader.Close() } // Détacher les filtres TC ingress sur toutes les interfaces for _, nlLink := range l.tcLinks { filter := &netlink.BpfFilter{ FilterAttrs: netlink.FilterAttrs{ LinkIndex: nlLink.Attrs().Index, Parent: netlink.HANDLE_MIN_INGRESS, Handle: 1, Priority: 1, }, } // Ignorer l'erreur — le filtre peut déjà être supprimé netlink.FilterDel(filter) } for _, lnk := range l.uprobeLinks { if lnk != nil { lnk.Close() } } if l.sslObjs != nil { l.sslObjs.Close() } if l.tcObjs != nil { l.tcObjs.Close() } return nil } // readRecord lit un record brut depuis un PerfReader avec annulation via context. func readRecord(ctx context.Context, rd *perf.Reader) (perf.Record, error) { type result struct { rec perf.Record err error } ch := make(chan result, 1) go func() { rec, err := rd.Read() ch <- result{rec, err} }() select { case <-ctx.Done(): rd.Close() return perf.Record{}, ctx.Err() case r := <-ch: return r.rec, r.err } }