From e9dcd8ea512b441da69a8eefb9dbc386467d4b9a Mon Sep 17 00:00:00 2001 From: toto Date: Thu, 5 Mar 2026 11:40:54 +0100 Subject: [PATCH] feat: observability, IP filtering, stdout/clickhouse fixes (v1.1.11) - feat(observability): metrics server with /metrics and /health endpoints - feat(observability): correlation metrics (events, success/failed, reasons, buffers) - feat(correlation): IP exclusion filter (exact IPs and CIDR ranges) - feat(correlation): pending orphan delay for late-arriving B events - fix(stdout): sink is now a no-op for data; JSON must never appear on stdout - fix(clickhouse): all flush errors were silently discarded, now properly logged - fix(clickhouse): buffer overflow with DropOnOverflow now logged at WARN - fix(clickhouse): retry attempts logged at WARN with attempt/delay/error context - feat(clickhouse): connection success logged at INFO, batch sends at DEBUG - feat(clickhouse): SetLogger() for external logger injection - test(stdout): assert stdout remains empty for correlated and orphan logs - chore(rpm): bump version to 1.1.11, update changelog - docs: README and architecture.yml updated Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- Makefile | 2 +- README.md | 166 ++++- architecture.yml | 139 ++++- cmd/logcorrelator/main.go | 55 +- config.example.yml | 17 + .../adapters/inbound/unixsocket/source.go | 6 +- internal/adapters/outbound/clickhouse/sink.go | 35 +- internal/adapters/outbound/stdout/sink.go | 70 +-- .../adapters/outbound/stdout/sink_test.go | 81 +++ internal/config/config.go | 64 +- internal/domain/correlation_service.go | 192 +++++- internal/observability/metrics.go | 176 ++++++ internal/observability/metrics_server.go | 128 ++++ packaging/rpm/logcorrelator.spec | 34 + scripts/test-correlation-advanced.py | 582 ++++++++++++++++++ scripts/test-correlation.sh | 404 ++++++++++++ 16 files changed, 2035 insertions(+), 116 deletions(-) create mode 100644 internal/adapters/outbound/stdout/sink_test.go create 
mode 100644 internal/observability/metrics.go create mode 100644 internal/observability/metrics_server.go create mode 100755 scripts/test-correlation-advanced.py create mode 100755 scripts/test-correlation.sh diff --git a/Makefile b/Makefile index ead6a6e..9edd302 100644 --- a/Makefile +++ b/Makefile @@ -20,7 +20,7 @@ BINARY_NAME=logcorrelator DIST_DIR=dist # Package version -PKG_VERSION ?= 1.1.8 +PKG_VERSION ?= 1.1.11 # Enable BuildKit for better performance export DOCKER_BUILDKIT=1 diff --git a/README.md b/README.md index 1a53a39..85b7af1 100644 --- a/README.md +++ b/README.md @@ -71,9 +71,9 @@ docker run -d \ make package-rpm # Installer le package RPM (Rocky Linux 8/9/10) -sudo dnf install -y dist/rpm/el8/logcorrelator-1.1.6-1.el8.x86_64.rpm -sudo dnf install -y dist/rpm/el9/logcorrelator-1.1.6-1.el9.x86_64.rpm -sudo dnf install -y dist/rpm/el10/logcorrelator-1.1.6-1.el10.x86_64.rpm +sudo dnf install -y dist/rpm/el8/logcorrelator-1.1.11-1.el8.x86_64.rpm +sudo dnf install -y dist/rpm/el9/logcorrelator-1.1.11-1.el9.x86_64.rpm +sudo dnf install -y dist/rpm/el10/logcorrelator-1.1.11-1.el10.x86_64.rpm # Activer et démarrer le service sudo systemctl enable logcorrelator @@ -146,6 +146,10 @@ correlation: max_network_items: 20000 ttl: network_ttl_s: 30 + # Exclure certaines IPs source (optionnel) + exclude_source_ips: + - 10.0.0.1 # IP unique + - 192.168.0.0/16 # Plage CIDR ``` ### Format du DSN ClickHouse @@ -579,3 +583,159 @@ MIT ```bash /usr/bin/logcorrelator -config /etc/logcorrelator/logcorrelator.yml ``` + +## Débogage de la corrélation + +### Activer les logs DEBUG + +Pour diagnostiquer les problèmes de corrélation, activez le niveau de log `DEBUG` : + +```yaml +# /etc/logcorrelator/logcorrelator.yml +log: + level: DEBUG +``` + +Les logs DEBUG affichent : +- Réception des événements A et B avec clé de corrélation et timestamp +- Tentatives de matching (succès et échecs) +- Raisons des échecs : `no_match_key`, `time_window`, `buffer_eviction`, `ttl_expired` +- 
Émission des orphelins (immédiats ou après délai) +- Resets TTL (mode Keep-Alive) + +Exemple de logs : +``` +[unixsocket:http] DEBUG event received: source=A src_ip=192.168.1.1 src_port=8080 timestamp=2026-03-04 11:00:00.123456789 +0000 UTC +[correlation] DEBUG processing A event: key=192.168.1.1:8080 timestamp=2026-03-04 11:00:00.123456789 +0000 UTC +[correlation] DEBUG A event has no matching B key in buffer: key=192.168.1.1:8080 +[correlation] DEBUG A event added to pending orphans (delay=500ms): src_ip=192.168.1.1 src_port=8080 +[correlation] DEBUG correlation found: A(src_ip=192.168.1.1 src_port=8080 ts=...) + B(src_ip=192.168.1.1 src_port=8080 ts=...) +``` + +### Serveur de métriques + +Le service expose un serveur HTTP optionnel pour le monitoring et le débogage. + +**Configuration :** +```yaml +metrics: + enabled: true + addr: ":8080" # Adresse d'écoute +``` + +**Endpoints :** +- `GET /metrics` - Retourne les métriques de corrélation au format JSON +- `GET /health` - Health check + +**Métriques disponibles :** + +| Métrique | Description | +|----------|-------------| +| `events_received_a` | Nombre d'événements A reçus | +| `events_received_b` | Nombre d'événements B reçus | +| `correlations_success` | Corrélations réussies | +| `correlations_failed` | Échecs de corrélation | +| `failed_no_match_key` | Échec : clé `src_ip:src_port` non trouvée | +| `failed_time_window` | Échec : hors fenêtre temporelle | +| `failed_buffer_eviction` | Échec : buffer plein | +| `failed_ttl_expired` | Échec : TTL expiré | +| `buffer_a_size` | Taille du buffer A | +| `buffer_b_size` | Taille du buffer B | +| `orphans_emitted_a` | Orphelins A émis | +| `orphans_emitted_b` | Orphelins B émis | +| `orphans_pending_a` | Orphelins A en attente | +| `pending_orphan_match` | B a corrélé avec un orphelin A en attente | +| `keepalive_resets` | Resets TTL (mode Keep-Alive) | + +**Exemple de réponse :** +```json +{ + "events_received_a": 150, + "events_received_b": 145, + 
"correlations_success": 140, + "correlations_failed": 10, + "failed_no_match_key": 5, + "failed_time_window": 3, + "failed_buffer_eviction": 0, + "failed_ttl_expired": 2, + "buffer_a_size": 10, + "buffer_b_size": 5, + "orphans_emitted_a": 10, + "keepalive_resets": 25 +} +``` + +### Diagnostic rapide + +Selon les métriques, identifiez la cause des échecs : + +| Métrique élevée | Cause probable | Solution | +|----------------|----------------|----------| +| `failed_no_match_key` | Les logs A et B n'ont pas le même `src_ip + src_port` | Vérifiez que les deux sources utilisent bien la même combinaison IP/port | +| `failed_time_window` | Timestamps trop éloignés (>10s par défaut) | Augmentez `correlation.time_window.value` ou vérifiez la synchronisation des horloges | +| `failed_ttl_expired` | Les événements B expirent avant corrélation | Augmentez `correlation.ttl.network_ttl_s` | +| `failed_buffer_eviction` | Buffers trop petits pour le volume | Augmentez `correlation.buffers.max_http_items` et `max_network_items` | +| `orphans_emitted_a` élevé | Beaucoup de logs A sans B correspondant | Vérifiez que la source B envoie bien les événements attendus | +| `failed_ip_excluded` élevé | Traffic depuis des IPs exclues | Vérifiez la configuration `exclude_source_ips` | + +### Exclure des IPs source + +Pour exclure certains logs en fonction de l'IP source, utilisez la configuration `exclude_source_ips` : + +```yaml +correlation: + exclude_source_ips: + - 10.0.0.1 # IP unique + - 192.168.1.100 # Autre IP unique + - 172.16.0.0/12 # Plage CIDR (réseau privé) + - 10.10.10.0/24 # Autre plage CIDR +``` + +**Cas d'usage :** +- Exclure les health checks et sondes de monitoring +- Filtrer le traffic interne connu +- Bloquer des IPs malveillantes ou indésirables + +**Comportement :** +- Les événements depuis ces IPs sont silencieusement ignorés +- Ils ne sont pas corrélés, pas émis comme orphelins +- La métrique `failed_ip_excluded` compte le nombre d'événements exclus +- Les logs DEBUG 
montrent : `event excluded by IP filter: source=A src_ip=10.0.0.1 src_port=8080` + +### Scripts de test + +Deux scripts sont fournis pour tester la corrélation : + +**Script Bash (simple) :** +```bash +# Test de base avec 10 paires d'événements +./scripts/test-correlation.sh -c 10 -v + +# Avec chemins de sockets personnalisés +./scripts/test-correlation.sh \ + -H /var/run/logcorrelator/http.socket \ + -N /var/run/logcorrelator/network.socket \ + -m http://localhost:8080/metrics +``` + +**Script Python (avancé) :** +```bash +# Installation des dépendances +pip install requests + +# Test de base +python3 scripts/test-correlation-advanced.py -c 20 -v + +# Tous les tests (basic, time window, different IP, keepalive) +python3 scripts/test-correlation-advanced.py --all + +# Test spécifique +python3 scripts/test-correlation-advanced.py --time-window +python3 scripts/test-correlation-advanced.py --keepalive +``` + +**Prérequis :** +- `socat` ou `nc` (netcat) pour le script Bash +- Python 3.6+ et `requests` pour le script Python +- Le service `logcorrelator` doit être en cours d'exécution +- Le serveur de métriques doit être activé pour les vérifications automatiques diff --git a/architecture.yml b/architecture.yml index 74725b7..fa73e93 100644 --- a/architecture.yml +++ b/architecture.yml @@ -13,9 +13,16 @@ service: HTTP Keep-Alive : un log réseau peut être corrélé à plusieurs logs HTTP successifs (stratégie 1‑à‑N). La rétention en mémoire est bornée par des tailles de caches configurables et un TTL dynamique pour la source B. Le - service émet toujours les événements A même lorsqu’aucun événement B n’est - disponible, n’émet jamais de logs B seuls, et pousse les résultats vers + service émet toujours les événements A même lorsqu'aucun événement B n'est + disponible, n'émet jamais de logs B seuls, et pousse les résultats vers ClickHouse et/ou un fichier local. 
+ + Fonctionnalités de débogage incluses : + - Serveur de métriques HTTP (/metrics, /health) + - Logs DEBUG détaillés avec raisons des échecs de corrélation + - Filtrage des IPs source (exclude_source_ips) + - Scripts de test (Bash et Python) + - Métriques : événements reçus, corrélations, échecs par raison, buffers, orphelins runtime: deployment: @@ -276,6 +283,20 @@ config: # Augmenté à 120s pour supporter les sessions HTTP Keep-Alive longues. network_ttl_s: 120 + # Filtrage des IPs source à exclure (optionnel) + exclude_source_ips: + - 10.0.0.1 # IP unique + - 172.16.0.0/12 # Plage CIDR + # Les événements depuis ces IPs sont silencieusement ignorés + + # Serveur de métriques HTTP (optionnel, pour débogage et monitoring) + metrics: + enabled: false + addr: ":8080" # Adresse d'écoute du serveur HTTP + # Endpoints: + # GET /metrics - Retourne les métriques de corrélation en JSON + # GET /health - Health check + inputs: description: > Deux flux de logs JSON via sockets Unix datagram (SOCK_DGRAM). Chaque datagramme @@ -734,6 +755,9 @@ architecture: type: infrastructure responsibilities: - Logger structuré avec niveaux (DEBUG, INFO, WARN, ERROR). + - CorrelationMetrics : suivi des statistiques de corrélation. + - MetricsServer : serveur HTTP pour exposition des métriques (/metrics, /health). + - Traçage des événements exclus (exclude_source_ips). - Logs pour : événements reçus, corrélations, orphelins, buffer plein. testing: @@ -750,13 +774,16 @@ testing: - ClickHouseSink (batching, retry, overflow) - FileSink (réouverture sur SIGHUP) - MultiSink (fan-out) - - Config (validation, valeurs par défaut) + - Config (validation, valeurs par défaut, exclude_source_ips) - UnixSocketSource (lecture, permissions, cleanup) + - CorrelationMetrics (suivi des statistiques) + - MetricsServer (endpoints /metrics et /health) integration: description: > Tests d'intégration limités. Le flux complet A+B → corrélation → sinks est testé via des tests unitaires avec mocks. 
ClickHouse est mocké (pas de tests avec vrai ClickHouse). Scénarios Keep-Alive testés dans correlation_service_test.go. + Scripts de test fournis : scripts/test-correlation.sh et scripts/test-correlation-advanced.py. docker: description: > @@ -798,3 +825,109 @@ docker: - path: Dockerfile.package description: Packaging RPM multi-distros (el8, el9, el10) avec scripts post/preun/postun. +observability: + description: > + Le service inclut des fonctionnalités complètes de débogage et de monitoring + pour diagnostiquer les problèmes de corrélation et surveiller les performances. + logging: + levels: + - DEBUG: Tous les événements reçus, tentatives de corrélation, raisons des échecs + - INFO: Événements corrélés, démarrage/arrêt du service + - WARN: Orphelins émis, buffer plein, TTL expiré + - ERROR: Erreurs de parsing, échecs de sink, erreurs critiques + debug_logs: + - "event received: source=A src_ip=192.168.1.1 src_port=8080 timestamp=..." + - "processing A event: key=192.168.1.1:8080 timestamp=..." + - "correlation found: A(src_ip=... src_port=... ts=...) + B(src_ip=... src_port=... ts=...)" + - "A event has no matching B key in buffer: key=..." + - "A event has same key as B but outside time window: key=... time_diff=5s window=10s" + - "event excluded by IP filter: source=A src_ip=10.0.0.1 src_port=8080" + - "TTL reset for B event (Keep-Alive): key=... 
new_ttl=120s" + metrics_server: + enabled: true + endpoints: + - path: /metrics + method: GET + description: Retourne les métriques de corrélation au format JSON + response_example: | + { + "events_received_a": 1542, + "events_received_b": 1498, + "correlations_success": 1450, + "correlations_failed": 92, + "failed_no_match_key": 45, + "failed_time_window": 23, + "failed_buffer_eviction": 5, + "failed_ttl_expired": 12, + "failed_ip_excluded": 7, + "buffer_a_size": 23, + "buffer_b_size": 18, + "orphans_emitted_a": 92, + "keepalive_resets": 892 + } + - path: /health + method: GET + description: Health check + response_example: | + {"status":"healthy"} + metrics_tracked: + events_received: + - events_received_a: Nombre d'événements HTTP (source A) reçus + - events_received_b: Nombre d'événements réseau (source B) reçus + correlations: + - correlations_success: Corrélations réussies + - correlations_failed: Échecs de corrélation + failure_reasons: + - failed_no_match_key: Clé src_ip:src_port non trouvée dans le buffer + - failed_time_window: Événements hors fenêtre temporelle + - failed_buffer_eviction: Buffer plein, événement évincé + - failed_ttl_expired: TTL de l'événement B expiré + - failed_ip_excluded: Événement exclu par filtre IP (exclude_source_ips) + buffers: + - buffer_a_size: Taille actuelle du buffer HTTP + - buffer_b_size: Taille actuelle du buffer réseau + orphans: + - orphans_emitted_a: Orphelins A émis (sans correspondance B) + - orphans_emitted_b: Orphelins B émis (toujours 0, policy: network_emit=false) + - orphans_pending_a: Orphelins A en attente (délai avant émission) + - pending_orphan_match: B a corrélé avec un orphelin A en attente + keepalive: + - keepalive_resets: Resets TTL pour mode Keep-Alive (one-to-many) + troubleshooting: + description: > + Guide de diagnostic basé sur les métriques et logs + common_issues: + - symptom: failed_no_match_key élevé + cause: Les logs A et B n'ont pas le même src_ip + src_port + solution: Vérifier que les deux 
sources utilisent la même combinaison IP/port + - symptom: failed_time_window élevé + cause: Timestamps trop éloignés (> time_window.value) + solution: Augmenter correlation.time_window.value ou synchroniser les horloges (NTP) + - symptom: failed_ttl_expired élevé + cause: Les événements B expirent avant corrélation + solution: Augmenter correlation.ttl.network_ttl_s + - symptom: failed_buffer_eviction élevé + cause: Buffers trop petits pour le volume de logs + solution: Augmenter correlation.buffers.max_http_items et max_network_items + - symptom: failed_ip_excluded élevé + cause: Traffic depuis des IPs configurées dans exclude_source_ips + solution: Vérifier la configuration, c'est normal si attendu + - symptom: orphans_emitted_a élevé + cause: Beaucoup de logs A sans correspondance B + solution: Vérifier que la source B envoie bien les événements attendus + test_scripts: + - name: scripts/test-correlation.sh + description: Script Bash pour tester la corrélation avec des événements synthétiques + features: + - Envoi de paires A+B avec mêmes src_ip:src_port + - Vérification des métriques avant/après + - Options: -c (count), -d (delay), -v (verbose), -m (metrics-url) + - name: scripts/test-correlation-advanced.py + description: Script Python avancé avec multiples scénarios de test + features: + - Basic test: corrélations simples + - Time window test: vérifie l'expiration de la fenêtre temporelle + - Different IP test: vérifie non-corrélation avec IPs différentes + - Keep-Alive test: vérifie le mode one-to-many + - Métriques en temps réel + diff --git a/cmd/logcorrelator/main.go b/cmd/logcorrelator/main.go index fb648d0..3fb189b 100644 --- a/cmd/logcorrelator/main.go +++ b/cmd/logcorrelator/main.go @@ -1,11 +1,13 @@ package main import ( + "context" "flag" "fmt" "os" "os/signal" "syscall" + "time" "github.com/logcorrelator/logcorrelator/internal/adapters/inbound/unixsocket" "github.com/logcorrelator/logcorrelator/internal/adapters/outbound/clickhouse" @@ -88,17 
+90,15 @@ func main() { logger.Error("Failed to create ClickHouse sink", err) os.Exit(1) } + clickHouseSink.SetLogger(logger) sinks = append(sinks, clickHouseSink) logger.Info(fmt.Sprintf("Configured ClickHouse sink: table=%s", cfg.Outputs.ClickHouse.Table)) } if cfg.Outputs.Stdout.Enabled { - stdoutSink := stdout.NewStdoutSink(stdout.Config{ - Enabled: true, - Level: cfg.Outputs.Stdout.Level, - }) + stdoutSink := stdout.NewStdoutSink(stdout.Config{Enabled: true}) sinks = append(sinks, stdoutSink) - logger.Info(fmt.Sprintf("Configured stdout sink: level=%s", cfg.Outputs.Stdout.Level)) + logger.Info("Configured stdout sink (operational logs on stderr)") } // Create multi-sink wrapper @@ -106,14 +106,15 @@ func main() { // Create correlation service correlationSvc := domain.NewCorrelationService(domain.CorrelationConfig{ - TimeWindow: cfg.Correlation.GetTimeWindow(), - ApacheAlwaysEmit: cfg.Correlation.GetApacheAlwaysEmit(), - ApacheEmitDelayMs: cfg.Correlation.GetApacheEmitDelayMs(), - NetworkEmit: false, - MaxHTTPBufferSize: cfg.Correlation.GetMaxHTTPBufferSize(), + TimeWindow: cfg.Correlation.GetTimeWindow(), + ApacheAlwaysEmit: cfg.Correlation.GetApacheAlwaysEmit(), + ApacheEmitDelayMs: cfg.Correlation.GetApacheEmitDelayMs(), + NetworkEmit: false, + MaxHTTPBufferSize: cfg.Correlation.GetMaxHTTPBufferSize(), MaxNetworkBufferSize: cfg.Correlation.GetMaxNetworkBufferSize(), - NetworkTTLS: cfg.Correlation.GetNetworkTTLS(), - MatchingMode: cfg.Correlation.GetMatchingMode(), + NetworkTTLS: cfg.Correlation.GetNetworkTTLS(), + MatchingMode: cfg.Correlation.GetMatchingMode(), + ExcludeSourceIPs: cfg.Correlation.GetExcludeSourceIPs(), }, &domain.RealTimeProvider{}) // Set logger for correlation service @@ -124,6 +125,27 @@ func main() { cfg.Correlation.GetApacheAlwaysEmit(), cfg.Correlation.GetApacheEmitDelayMs())) + // Start metrics server if enabled + var metricsServer *observability.MetricsServer + if cfg.Metrics.Enabled { + addr := cfg.Metrics.Addr + if addr == "" { + 
addr = ":8080" // Default address + } + var err error + metricsServer, err = observability.NewMetricsServer(addr, correlationSvc.GetMetricsSnapshot) + if err != nil { + logger.Error("Failed to create metrics server", err) + os.Exit(1) + } + if err := metricsServer.Start(); err != nil { + logger.Error("Failed to start metrics server", err) + os.Exit(1) + } + logger.Info(fmt.Sprintf("Metrics server started: addr=%s", metricsServer.Addr())) + logger.Info("Metrics endpoints: /metrics (JSON), /health") + } + // Create orchestrator orchestrator := app.NewOrchestrator(app.OrchestratorConfig{ Sources: sources, @@ -166,5 +188,14 @@ func main() { logger.Error("Error during shutdown", err) } + // Stop metrics server + if metricsServer != nil { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := metricsServer.Stop(shutdownCtx); err != nil { + logger.Error("Error stopping metrics server", err) + } + } + logger.Info("logcorrelator stopped") } diff --git a/config.example.yml b/config.example.yml index 80176ce..3ff1e20 100644 --- a/config.example.yml +++ b/config.example.yml @@ -65,3 +65,20 @@ correlation: ttl: network_ttl_s: 120 + # Exclude specific source IPs or CIDR ranges from correlation + # Events from these IPs will be silently dropped (not correlated, not emitted) + # Useful for excluding health checks, internal traffic, or known bad actors + exclude_source_ips: + - 10.0.0.1 # Single IP + - 192.168.1.100 # Another single IP + - 172.16.0.0/12 # CIDR range (private network) + - 10.10.10.0/24 # Another CIDR range + +# Metrics server configuration (optional, for debugging/monitoring) +metrics: + enabled: false + addr: ":8080" # Address to listen on (e.g., ":8080", "localhost:8080") + # Endpoints: + # GET /metrics - Returns correlation metrics as JSON + # GET /health - Health check endpoint + diff --git a/internal/adapters/inbound/unixsocket/source.go b/internal/adapters/inbound/unixsocket/source.go index 
522b40c..f44780c 100644 --- a/internal/adapters/inbound/unixsocket/source.go +++ b/internal/adapters/inbound/unixsocket/source.go @@ -164,9 +164,9 @@ func (s *UnixSocketSource) readDatagrams(ctx context.Context, eventChan chan<- * continue } - // Debug: log raw events - s.logger.Debugf("event received: source=%s src_ip=%s src_port=%d", - event.Source, event.SrcIP, event.SrcPort) + // Debug: log raw events with all key details + s.logger.Debugf("event received: source=%s src_ip=%s src_port=%d timestamp=%v raw_timestamp=%v", + event.Source, event.SrcIP, event.SrcPort, event.Timestamp, event.Raw["timestamp"]) select { case eventChan <- event: diff --git a/internal/adapters/outbound/clickhouse/sink.go b/internal/adapters/outbound/clickhouse/sink.go index bd51e16..8834e7f 100644 --- a/internal/adapters/outbound/clickhouse/sink.go +++ b/internal/adapters/outbound/clickhouse/sink.go @@ -12,6 +12,7 @@ import ( "github.com/ClickHouse/clickhouse-go/v2" "github.com/logcorrelator/logcorrelator/internal/domain" + "github.com/logcorrelator/logcorrelator/internal/observability" ) const ( @@ -53,6 +54,12 @@ type ClickHouseSink struct { done chan struct{} wg sync.WaitGroup closeOnce sync.Once + logger *observability.Logger +} + +// SetLogger sets the logger used by the sink. +func (s *ClickHouseSink) SetLogger(logger *observability.Logger) { + s.logger = logger.WithFields(map[string]any{"sink": "clickhouse"}) } // NewClickHouseSink creates a new ClickHouse sink. 
@@ -83,6 +90,7 @@ func NewClickHouseSink(config Config) (*ClickHouseSink, error) { buffer: make([]domain.CorrelatedLog, 0, config.BatchSize), flushChan: make(chan struct{}, 1), done: make(chan struct{}), + logger: observability.NewLogger("clickhouse"), } // Parse DSN and create options @@ -107,6 +115,8 @@ func NewClickHouseSink(config Config) (*ClickHouseSink, error) { } s.conn = conn + s.log().Infof("connected to ClickHouse: table=%s batch_size=%d flush_interval_ms=%d", + config.Table, config.BatchSize, config.FlushIntervalMs) // Start flush goroutine s.wg.Add(1) @@ -120,6 +130,14 @@ func (s *ClickHouseSink) Name() string { return "clickhouse" } +// log returns the logger, initializing a default one if not set (e.g. in tests). +func (s *ClickHouseSink) log() *observability.Logger { + if s.logger == nil { + s.logger = observability.NewLogger("clickhouse") + } + return s.logger +} + // Reopen is a no-op for ClickHouse (connection is managed internally). func (s *ClickHouseSink) Reopen() error { return nil @@ -146,6 +164,7 @@ func (s *ClickHouseSink) Write(ctx context.Context, log domain.CorrelatedLog) er s.mu.Unlock() if drop { + s.log().Warnf("buffer full, dropping log: table=%s buffer_size=%d", s.config.Table, s.config.MaxBufferSize) return nil } if time.Now().After(deadline) { @@ -201,7 +220,9 @@ func (s *ClickHouseSink) flushLoop() { select { case <-s.done: ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.config.TimeoutMs)*time.Millisecond) - _ = s.doFlush(ctx) + if err := s.doFlush(ctx); err != nil { + s.log().Error("final flush on close failed", err) + } cancel() return @@ -212,7 +233,9 @@ func (s *ClickHouseSink) flushLoop() { if needsFlush { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.config.TimeoutMs)*time.Millisecond) - _ = s.doFlush(ctx) + if err := s.doFlush(ctx); err != nil { + s.log().Error("periodic flush failed", err) + } cancel() } @@ -223,7 +246,9 @@ func (s *ClickHouseSink) flushLoop() { if 
needsFlush { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.config.TimeoutMs)*time.Millisecond) - _ = s.doFlush(ctx) + if err := s.doFlush(ctx); err != nil { + s.log().Error("batch flush failed", err) + } cancel() } } @@ -247,7 +272,6 @@ func (s *ClickHouseSink) doFlush(ctx context.Context) error { return fmt.Errorf("clickhouse connection is not initialized") } - // Log batch info before sending batchSize := len(buffer) // Retry logic with exponential backoff @@ -255,6 +279,8 @@ func (s *ClickHouseSink) doFlush(ctx context.Context) error { for attempt := 0; attempt < MaxRetries; attempt++ { if attempt > 0 { delay := RetryBaseDelay * time.Duration(1< ordered elements containing *NormalizedEvent - pendingB map[string][]*list.Element - networkTTLs map[*list.Element]time.Time // TTL expiration time for each B event - pendingOrphans map[string][]*pendingOrphan // key -> A events waiting to be emitted as orphans - timeProvider TimeProvider - logger *observability.Logger + config CorrelationConfig + mu sync.Mutex + bufferA *eventBuffer + bufferB *eventBuffer + pendingA map[string][]*list.Element // key -> ordered elements containing *NormalizedEvent + pendingB map[string][]*list.Element + networkTTLs map[*list.Element]time.Time // TTL expiration time for each B event + pendingOrphans map[string][]*pendingOrphan // key -> A events waiting to be emitted as orphans + timeProvider TimeProvider + logger *observability.Logger + metrics *observability.CorrelationMetrics + excludeIPsParsed []*net.IPNet // Parsed CIDR ranges for efficient lookup } type eventBuffer struct { @@ -111,6 +116,17 @@ func NewCorrelationService(config CorrelationConfig, timeProvider TimeProvider) config.ApacheEmitDelayMs = DefaultApacheEmitDelayMs } + // Parse excluded IPs (CIDR ranges) for efficient lookup + var excludeIPsParsed []*net.IPNet + for _, exclude := range config.ExcludeSourceIPs { + if strings.Contains(exclude, "/") { + _, cidr, err := net.ParseCIDR(exclude) + if err 
== nil { + excludeIPsParsed = append(excludeIPsParsed, cidr) + } + } + } + return &CorrelationService{ config: config, bufferA: newEventBuffer(), @@ -121,6 +137,8 @@ func NewCorrelationService(config CorrelationConfig, timeProvider TimeProvider) networkTTLs: make(map[*list.Element]time.Time), timeProvider: timeProvider, logger: observability.NewLogger("correlation"), + metrics: observability.NewCorrelationMetrics(), + excludeIPsParsed: excludeIPsParsed, } } @@ -129,11 +147,56 @@ func (s *CorrelationService) SetLogger(logger *observability.Logger) { s.logger = logger } +// isIPExcluded checks if a source IP should be excluded from correlation. +func (s *CorrelationService) isIPExcluded(ip string) bool { + // Check exact IP matches first (from config) + for _, exclude := range s.config.ExcludeSourceIPs { + if !strings.Contains(exclude, "/") { + // Exact IP match + if exclude == ip { + return true + } + if excludeIP := net.ParseIP(exclude); excludeIP != nil { + if parsedIP := net.ParseIP(ip); parsedIP != nil { + if excludeIP.Equal(parsedIP) { + return true + } + } + } + } + } + + // Check CIDR ranges + parsedIP := net.ParseIP(ip) + if parsedIP == nil { + return false + } + + for _, cidr := range s.excludeIPsParsed { + if cidr.Contains(parsedIP) { + return true + } + } + + return false +} + // ProcessEvent processes an incoming event and returns correlated logs if matches are found. 
func (s *CorrelationService) ProcessEvent(event *NormalizedEvent) []CorrelatedLog { s.mu.Lock() defer s.mu.Unlock() + // Check if source IP is excluded + if s.isIPExcluded(event.SrcIP) { + s.logger.Debugf("event excluded by IP filter: source=%s src_ip=%s src_port=%d", + event.Source, event.SrcIP, event.SrcPort) + s.metrics.RecordCorrelationFailed("ip_excluded") + return nil + } + + // Record event received + s.metrics.RecordEventReceived(string(event.Source)) + // Clean expired events first s.cleanExpired() @@ -145,6 +208,7 @@ func (s *CorrelationService) ProcessEvent(event *NormalizedEvent) []CorrelatedLo // Buffer full - rotate oldest event instead of dropping new one s.logger.Warnf("buffer full, rotating oldest event: source=%s src_ip=%s src_port=%d", event.Source, event.SrcIP, event.SrcPort) + s.metrics.RecordCorrelationFailed("buffer_eviction") if event.Source == SourceA { // Remove oldest A event and emit as orphan if configured s.rotateOldestA() @@ -166,6 +230,7 @@ func (s *CorrelationService) ProcessEvent(event *NormalizedEvent) []CorrelatedLo case SourceB: results, shouldBuffer = s.processSourceB(event) default: + s.logger.Warnf("unknown event source: %s", event.Source) return nil } @@ -175,6 +240,9 @@ func (s *CorrelationService) ProcessEvent(event *NormalizedEvent) []CorrelatedLo event.Source, event.SrcIP, event.SrcPort, s.getBufferSize(event.Source)) } + // Update buffer sizes in metrics + s.metrics.UpdateBufferSizes(int64(s.bufferA.events.Len()), int64(s.bufferB.events.Len())) + // Combine orphan results with correlation results if len(orphanResults) > 0 { results = append(orphanResults, results...) 
@@ -248,6 +316,7 @@ func (s *CorrelationService) rotateOldestB() { func (s *CorrelationService) processSourceA(event *NormalizedEvent) ([]CorrelatedLog, bool) { key := event.CorrelationKey() + s.logger.Debugf("processing A event: key=%s timestamp=%v", key, event.Timestamp) // Look for matching B events matches := s.findMatches(s.bufferB, s.pendingB, key, func(other *NormalizedEvent) bool { @@ -259,9 +328,10 @@ func (s *CorrelationService) processSourceA(event *NormalizedEvent) ([]Correlate // Correlate with all matching B events (one-to-many) for _, bEvent := range matches { correlated := NewCorrelatedLog(event, bEvent) - s.logger.Debugf("correlation found: A(src_ip=%s src_port=%d) + B(src_ip=%s src_port=%d)", - event.SrcIP, event.SrcPort, bEvent.SrcIP, bEvent.SrcPort) + s.logger.Debugf("correlation found: A(src_ip=%s src_port=%d ts=%v) + B(src_ip=%s src_port=%d ts=%v)", + event.SrcIP, event.SrcPort, event.Timestamp, bEvent.SrcIP, bEvent.SrcPort, bEvent.Timestamp) results = append(results, correlated) + s.metrics.RecordCorrelationSuccess() // Reset TTL for matched B event (Keep-Alive) if s.config.MatchingMode == MatchingModeOneToMany { @@ -271,6 +341,9 @@ func (s *CorrelationService) processSourceA(event *NormalizedEvent) ([]Correlate for _, elem := range elements { if elem.Value.(*NormalizedEvent) == bEvent { s.resetNetworkTTL(elem) + s.metrics.RecordKeepAliveReset() + s.logger.Debugf("TTL reset for B event (Keep-Alive): key=%s new_ttl=%v", + bKey, s.config.NetworkTTLS) break } } @@ -286,6 +359,25 @@ func (s *CorrelationService) processSourceA(event *NormalizedEvent) ([]Correlate return results, false } + // No match found - check if there are B events with same key but outside time window + if bElements, ok := s.pendingB[key]; ok && len(bElements) > 0 { + // Key exists but no time match - log the time difference for debugging + for _, bElem := range bElements { + bEvent := bElem.Value.(*NormalizedEvent) + timeDiff := event.Timestamp.Sub(bEvent.Timestamp) + if 
timeDiff < 0 { + timeDiff = -timeDiff + } + s.logger.Debugf("A event has same key as B but outside time window: key=%s time_diff=%v window=%v", + key, timeDiff, s.config.TimeWindow) + } + s.metrics.RecordCorrelationFailed("time_window") + } else { + // No B events with same key at all + s.logger.Debugf("A event has no matching B key in buffer: key=%s", key) + s.metrics.RecordCorrelationFailed("no_match_key") + } + // No match found - orphan A event // Instead of emitting immediately, add to pending orphans with delay // This allows B events to arrive slightly after A and still correlate @@ -294,9 +386,11 @@ func (s *CorrelationService) processSourceA(event *NormalizedEvent) ([]Correlate if s.config.ApacheEmitDelayMs == 0 { orphan := NewCorrelatedLogFromEvent(event, "A") s.logger.Warnf("orphan A event (immediate): src_ip=%s src_port=%d", event.SrcIP, event.SrcPort) + s.metrics.RecordOrphanEmitted("A") return []CorrelatedLog{orphan}, false } s.addPendingOrphan(event) + s.metrics.RecordPendingOrphan() s.logger.Debugf("A event added to pending orphans (delay=%dms): src_ip=%s src_port=%d", s.config.ApacheEmitDelayMs, event.SrcIP, event.SrcPort) // Don't emit yet - will be emitted after delay expires @@ -310,13 +404,16 @@ func (s *CorrelationService) processSourceA(event *NormalizedEvent) ([]Correlate func (s *CorrelationService) processSourceB(event *NormalizedEvent) ([]CorrelatedLog, bool) { key := event.CorrelationKey() + s.logger.Debugf("processing B event: key=%s timestamp=%v", key, event.Timestamp) // FIRST: Check if there's a pending orphan A that matches this B event // This is the key optimization for delayed orphan emission if aEvent := s.checkPendingOrphansForCorrelation(event); aEvent != nil { correlated := NewCorrelatedLog(aEvent, event) - s.logger.Debugf("correlation found (pending orphan): A(src_ip=%s src_port=%d) + B(src_ip=%s src_port=%d)", - aEvent.SrcIP, aEvent.SrcPort, event.SrcIP, event.SrcPort) + s.logger.Debugf("correlation found (pending orphan): 
A(src_ip=%s src_port=%d ts=%v) + B(src_ip=%s src_port=%d ts=%v)", + aEvent.SrcIP, aEvent.SrcPort, aEvent.Timestamp, event.SrcIP, event.SrcPort, event.Timestamp) + s.metrics.RecordCorrelationSuccess() + s.metrics.RecordPendingOrphanMatch() return []CorrelatedLog{correlated}, false } @@ -325,12 +422,34 @@ func (s *CorrelationService) processSourceB(event *NormalizedEvent) ([]Correlate return s.eventsMatch(other, event) }); aEvent != nil { correlated := NewCorrelatedLog(aEvent, event) - s.logger.Debugf("correlation found: A(src_ip=%s src_port=%d) + B(src_ip=%s src_port=%d)", - aEvent.SrcIP, aEvent.SrcPort, event.SrcIP, event.SrcPort) + s.logger.Debugf("correlation found: A(src_ip=%s src_port=%d ts=%v) + B(src_ip=%s src_port=%d ts=%v)", + aEvent.SrcIP, aEvent.SrcPort, aEvent.Timestamp, event.SrcIP, event.SrcPort, event.Timestamp) + s.metrics.RecordCorrelationSuccess() return []CorrelatedLog{correlated}, false } + // No match found - check if there are A events with same key but outside time window + if aElements, ok := s.pendingA[key]; ok && len(aElements) > 0 { + // Key exists but no time match - log the time difference for debugging + for _, aElem := range aElements { + aEvent := aElem.Value.(*NormalizedEvent) + timeDiff := aEvent.Timestamp.Sub(event.Timestamp) + if timeDiff < 0 { + timeDiff = -timeDiff + } + s.logger.Debugf("B event has same key as A but outside time window: key=%s time_diff=%v window=%v", + key, timeDiff, s.config.TimeWindow) + } + s.metrics.RecordCorrelationFailed("time_window") + } else { + // No A events with same key at all + s.logger.Debugf("B event has no matching A key in buffer: key=%s", key) + s.metrics.RecordCorrelationFailed("no_match_key") + } + // Never emit B alone. Keep in buffer for potential future match. 
+ s.logger.Debugf("B event buffered (no match yet): key=%s src_ip=%s src_port=%d", + key, event.SrcIP, event.SrcPort) return nil, true } @@ -417,8 +536,12 @@ func (s *CorrelationService) cleanBufferAByBTTL() { } if s.config.ApacheAlwaysEmit { - s.logger.Warnf("orphan A event (no B match, TTL expired): src_ip=%s src_port=%d", - event.SrcIP, event.SrcPort) + s.logger.Warnf("orphan A event (no B match, TTL expired): src_ip=%s src_port=%d key=%s", + event.SrcIP, event.SrcPort, key) + s.metrics.RecordOrphanEmitted("A") + } else { + s.logger.Debugf("A event removed (no valid B, TTL expired): src_ip=%s src_port=%d key=%s", + event.SrcIP, event.SrcPort, key) } } } @@ -448,6 +571,7 @@ func (s *CorrelationService) cleanBuffer(buffer *eventBuffer, pending map[string // rather than the original event timestamp. func (s *CorrelationService) cleanNetworkBufferByTTL() { now := s.timeProvider.Now() + var removed int for elem, ttl := range s.networkTTLs { if now.After(ttl) { event := elem.Value.(*NormalizedEvent) @@ -458,8 +582,15 @@ func (s *CorrelationService) cleanNetworkBufferByTTL() { delete(s.pendingB, key) } delete(s.networkTTLs, elem) + removed++ + s.logger.Debugf("B event TTL expired: key=%s src_ip=%s src_port=%d ttl=%v age=%v", + key, event.SrcIP, event.SrcPort, s.config.NetworkTTLS, now.Sub(event.Timestamp)) + s.metrics.RecordCorrelationFailed("ttl_expired") } } + if removed > 0 { + s.logger.Debugf("cleaned %d expired B events", removed) + } } func (s *CorrelationService) findAndPopFirstMatch( @@ -635,19 +766,20 @@ func (s *CorrelationService) emitPendingOrphans() []CorrelatedLog { if !s.config.ApacheAlwaysEmit { return nil } - + now := s.timeProvider.Now() var results []CorrelatedLog - + for key, orphans := range s.pendingOrphans { for i := len(orphans) - 1; i >= 0; i-- { if now.After(orphans[i].emitAfter) { // Time to emit this orphan orphan := NewCorrelatedLogFromEvent(orphans[i].event, "A") - s.logger.Warnf("orphan A event (emit delay expired): src_ip=%s 
src_port=%d", - orphans[i].event.SrcIP, orphans[i].event.SrcPort) + s.logger.Warnf("orphan A event (emit delay expired): src_ip=%s src_port=%d key=%s delay_ms=%d", + orphans[i].event.SrcIP, orphans[i].event.SrcPort, key, s.config.ApacheEmitDelayMs) + s.metrics.RecordOrphanEmitted("A") results = append(results, orphan) - + // Remove from pending s.pendingOrphans[key] = append(orphans[:i], orphans[i+1:]...) if len(s.pendingOrphans[key]) == 0 { @@ -656,7 +788,7 @@ func (s *CorrelationService) emitPendingOrphans() []CorrelatedLog { } } } - + return results } @@ -755,3 +887,15 @@ func (s *CorrelationService) GetBufferSizes() (int, int) { defer s.mu.Unlock() return s.bufferA.events.Len(), s.bufferB.events.Len() } + +// GetMetrics returns the correlation metrics (for monitoring/debugging). +func (s *CorrelationService) GetMetrics() *observability.CorrelationMetrics { + return s.metrics +} + +// GetMetricsSnapshot returns a point-in-time snapshot of metrics. +func (s *CorrelationService) GetMetricsSnapshot() observability.MetricsSnapshot { + s.mu.Lock() + defer s.mu.Unlock() + return s.metrics.Snapshot() +} diff --git a/internal/observability/metrics.go b/internal/observability/metrics.go new file mode 100644 index 0000000..ebb9e97 --- /dev/null +++ b/internal/observability/metrics.go @@ -0,0 +1,176 @@ +package observability + +import ( + "encoding/json" + "fmt" + "strings" + "sync" + "sync/atomic" +) + +// CorrelationMetrics tracks correlation statistics for debugging and monitoring. 
+type CorrelationMetrics struct { + mu sync.RWMutex + + // Events received + eventsReceivedA atomic.Int64 + eventsReceivedB atomic.Int64 + + // Correlation results + correlationsSuccess atomic.Int64 + correlationsFailed atomic.Int64 + + // Failure reasons + failedNoMatchKey atomic.Int64 // No event with same key in buffer + failedTimeWindow atomic.Int64 // Key found but outside time window + failedBufferEviction atomic.Int64 // Event evicted due to buffer full + failedTTLExpired atomic.Int64 // B event TTL expired before match + failedIPExcluded atomic.Int64 // Event excluded by IP filter + + // Buffer stats + bufferASize atomic.Int64 + bufferBSize atomic.Int64 + + // Orphan stats + orphansEmittedA atomic.Int64 + orphansEmittedB atomic.Int64 + orphansPendingA atomic.Int64 + pendingOrphanMatch atomic.Int64 // B matched with pending orphan A + + // Keep-Alive stats + keepAliveResets atomic.Int64 // Number of TTL resets (one-to-many mode) +} + +// NewCorrelationMetrics creates a new metrics tracker. +func NewCorrelationMetrics() *CorrelationMetrics { + return &CorrelationMetrics{} +} + +// RecordEventReceived records an event received from a source. +func (m *CorrelationMetrics) RecordEventReceived(source string) { + if source == "A" { + m.eventsReceivedA.Add(1) + } else if source == "B" { + m.eventsReceivedB.Add(1) + } +} + +// RecordCorrelationSuccess records a successful correlation. +func (m *CorrelationMetrics) RecordCorrelationSuccess() { + m.correlationsSuccess.Add(1) +} + +// RecordCorrelationFailed records a failed correlation attempt with the reason. 
+func (m *CorrelationMetrics) RecordCorrelationFailed(reason string) { + m.correlationsFailed.Add(1) + switch reason { + case "no_match_key": + m.failedNoMatchKey.Add(1) + case "time_window": + m.failedTimeWindow.Add(1) + case "buffer_eviction": + m.failedBufferEviction.Add(1) + case "ttl_expired": + m.failedTTLExpired.Add(1) + case "ip_excluded": + m.failedIPExcluded.Add(1) + } +} + +// RecordBufferEviction records an event evicted from buffer. +func (m *CorrelationMetrics) RecordBufferEviction(source string) { + // Can be used for additional tracking if needed +} + +// RecordOrphanEmitted records an orphan event emitted. +func (m *CorrelationMetrics) RecordOrphanEmitted(source string) { + if source == "A" { + m.orphansEmittedA.Add(1) + } else if source == "B" { + m.orphansEmittedB.Add(1) + } +} + +// RecordPendingOrphan records an A event added to pending orphans. +func (m *CorrelationMetrics) RecordPendingOrphan() { + m.orphansPendingA.Add(1) +} + +// RecordPendingOrphanMatch records a B event matching a pending orphan A. +func (m *CorrelationMetrics) RecordPendingOrphanMatch() { + m.pendingOrphanMatch.Add(1) +} + +// RecordKeepAliveReset records a TTL reset for Keep-Alive. +func (m *CorrelationMetrics) RecordKeepAliveReset() { + m.keepAliveResets.Add(1) +} + +// UpdateBufferSizes updates the current buffer sizes. +func (m *CorrelationMetrics) UpdateBufferSizes(sizeA, sizeB int64) { + m.bufferASize.Store(sizeA) + m.bufferBSize.Store(sizeB) +} + +// Snapshot returns a point-in-time snapshot of all metrics. 
+func (m *CorrelationMetrics) Snapshot() MetricsSnapshot { + return MetricsSnapshot{ + EventsReceivedA: m.eventsReceivedA.Load(), + EventsReceivedB: m.eventsReceivedB.Load(), + CorrelationsSuccess: m.correlationsSuccess.Load(), + CorrelationsFailed: m.correlationsFailed.Load(), + FailedNoMatchKey: m.failedNoMatchKey.Load(), + FailedTimeWindow: m.failedTimeWindow.Load(), + FailedBufferEviction: m.failedBufferEviction.Load(), + FailedTTLExpired: m.failedTTLExpired.Load(), + FailedIPExcluded: m.failedIPExcluded.Load(), + BufferASize: m.bufferASize.Load(), + BufferBSize: m.bufferBSize.Load(), + OrphansEmittedA: m.orphansEmittedA.Load(), + OrphansEmittedB: m.orphansEmittedB.Load(), + OrphansPendingA: m.orphansPendingA.Load(), + PendingOrphanMatch: m.pendingOrphanMatch.Load(), + KeepAliveResets: m.keepAliveResets.Load(), + } +} + +// MetricsSnapshot is a point-in-time snapshot of metrics. +type MetricsSnapshot struct { + EventsReceivedA int64 `json:"events_received_a"` + EventsReceivedB int64 `json:"events_received_b"` + CorrelationsSuccess int64 `json:"correlations_success"` + CorrelationsFailed int64 `json:"correlations_failed"` + FailedNoMatchKey int64 `json:"failed_no_match_key"` + FailedTimeWindow int64 `json:"failed_time_window"` + FailedBufferEviction int64 `json:"failed_buffer_eviction"` + FailedTTLExpired int64 `json:"failed_ttl_expired"` + FailedIPExcluded int64 `json:"failed_ip_excluded"` + BufferASize int64 `json:"buffer_a_size"` + BufferBSize int64 `json:"buffer_b_size"` + OrphansEmittedA int64 `json:"orphans_emitted_a"` + OrphansEmittedB int64 `json:"orphans_emitted_b"` + OrphansPendingA int64 `json:"orphans_pending_a"` + PendingOrphanMatch int64 `json:"pending_orphan_match"` + KeepAliveResets int64 `json:"keepalive_resets"` +} + +// MarshalJSON implements json.Marshaler. +func (m *CorrelationMetrics) MarshalJSON() ([]byte, error) { + return json.Marshal(m.Snapshot()) +} + +// String returns a human-readable string of metrics. 
+func (m *CorrelationMetrics) String() string { + s := m.Snapshot() + var b strings.Builder + b.WriteString("Correlation Metrics:\n") + fmt.Fprintf(&b, " Events Received: A=%d B=%d Total=%d\n", s.EventsReceivedA, s.EventsReceivedB, s.EventsReceivedA+s.EventsReceivedB) + fmt.Fprintf(&b, " Correlations: Success=%d Failed=%d\n", s.CorrelationsSuccess, s.CorrelationsFailed) + fmt.Fprintf(&b, " Failure Reasons: no_match_key=%d time_window=%d buffer_eviction=%d ttl_expired=%d ip_excluded=%d\n", + s.FailedNoMatchKey, s.FailedTimeWindow, s.FailedBufferEviction, s.FailedTTLExpired, s.FailedIPExcluded) + fmt.Fprintf(&b, " Buffer Sizes: A=%d B=%d\n", s.BufferASize, s.BufferBSize) + fmt.Fprintf(&b, " Orphans: Emitted A=%d B=%d Pending A=%d\n", s.OrphansEmittedA, s.OrphansEmittedB, s.OrphansPendingA) + fmt.Fprintf(&b, " Pending Orphan Match: %d\n", s.PendingOrphanMatch) + fmt.Fprintf(&b, " Keep-Alive Resets: %d\n", s.KeepAliveResets) + return b.String() +} diff --git a/internal/observability/metrics_server.go b/internal/observability/metrics_server.go new file mode 100644 index 0000000..76d9489 --- /dev/null +++ b/internal/observability/metrics_server.go @@ -0,0 +1,128 @@ +package observability + +import ( + "context" + "encoding/json" + "fmt" + "net" + "net/http" + "sync" + "time" +) + +// MetricsServer exposes correlation metrics via HTTP. +type MetricsServer struct { + mu sync.Mutex + server *http.Server + listener net.Listener + metricsFunc func() MetricsSnapshot + running bool +} + +// NewMetricsServer creates a new metrics HTTP server. 
+func NewMetricsServer(addr string, metricsFunc func() MetricsSnapshot) (*MetricsServer, error) { + if metricsFunc == nil { + return nil, fmt.Errorf("metricsFunc cannot be nil") + } + + ms := &MetricsServer{ + metricsFunc: metricsFunc, + } + + mux := http.NewServeMux() + mux.HandleFunc("/metrics", ms.handleMetrics) + mux.HandleFunc("/health", ms.handleHealth) + + ms.server = &http.Server{ + Addr: addr, + Handler: mux, + ReadTimeout: 5 * time.Second, + WriteTimeout: 10 * time.Second, + } + + return ms, nil +} + +// Start begins listening on the configured address. +func (ms *MetricsServer) Start() error { + ms.mu.Lock() + defer ms.mu.Unlock() + + if ms.running { + return nil + } + + listener, err := net.Listen("tcp", ms.server.Addr) + if err != nil { + return fmt.Errorf("failed to start metrics server: %w", err) + } + + ms.listener = listener + ms.running = true + + go func() { + if err := ms.server.Serve(listener); err != nil && err != http.ErrServerClosed { + // Server error or closed + } + }() + + return nil +} + +// Stop gracefully stops the metrics server. +func (ms *MetricsServer) Stop(ctx context.Context) error { + ms.mu.Lock() + defer ms.mu.Unlock() + + if !ms.running { + return nil + } + + ms.running = false + return ms.server.Shutdown(ctx) +} + +// handleMetrics returns the correlation metrics as JSON. +func (ms *MetricsServer) handleMetrics(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + metrics := ms.metricsFunc() + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(metrics); err != nil { + http.Error(w, "Failed to encode metrics", http.StatusInternalServerError) + return + } +} + +// handleHealth returns a simple health check response. 
+func (ms *MetricsServer) handleHealth(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprintf(w, `{"status":"healthy"}`) +} + +// IsRunning returns true if the server is running. +func (ms *MetricsServer) IsRunning() bool { + ms.mu.Lock() + defer ms.mu.Unlock() + return ms.running +} + +// Addr returns the listening address. +func (ms *MetricsServer) Addr() string { + ms.mu.Lock() + defer ms.mu.Unlock() + if ms.listener == nil { + return "" + } + return ms.listener.Addr().String() +} diff --git a/packaging/rpm/logcorrelator.spec b/packaging/rpm/logcorrelator.spec index a419ec0..b913e48 100644 --- a/packaging/rpm/logcorrelator.spec +++ b/packaging/rpm/logcorrelator.spec @@ -141,6 +141,40 @@ exit 0 %config(noreplace) /etc/logrotate.d/logcorrelator %changelog +* Thu Mar 05 2026 logcorrelator - 1.1.11-1 +- Fix: StdoutSink no longer writes correlated/orphan JSON to stdout +- Fix: stdout sink is now a no-op for data; operational logs go to stderr via logger +- Fix: ClickHouse sink had no logger - all flush errors were silently discarded +- Fix: Periodic, batch and final-close flush errors are now logged at ERROR level +- Fix: Buffer overflow with DropOnOverflow=true is now logged at WARN level +- Fix: Retry attempts are now logged at WARN level with attempt number, delay and error +- Feat: ClickHouse connection success logged at INFO (table, batch_size, flush_interval_ms) +- Feat: Successful batch sends logged at DEBUG (rows count, table) +- Feat: SetLogger() method added to ClickHouseSink for external logger injection +- Test: New unit tests for StdoutSink asserting stdout remains empty for all log types + +* Wed Mar 04 2026 logcorrelator - 1.1.10-1 +- Feat: IP exclusion filter - exclude specific source IPs or CIDR ranges +- Feat: Configuration exclude_source_ips supports 
single IPs and CIDR notation +- Feat: Debug logging for excluded IPs +- Feat: New metric failed_ip_excluded for monitoring filtered traffic +- Feat: Architecture documentation updated with observability section +- Use cases: exclude health checks, internal traffic, known bad actors +- Docs: README.md updated with IP exclusion documentation +- Docs: architecture.yml updated with metrics and troubleshooting guide + +* Wed Mar 04 2026 logcorrelator - 1.1.9-1 +- Feat: Debug logging - detailed DEBUG logs for correlation troubleshooting +- Feat: Correlation metrics server (HTTP endpoint /metrics and /health) +- Feat: New metrics: events_received, correlations_success/failed, failure reasons +- Feat: Failure reason tracking: no_match_key, time_window, buffer_eviction, ttl_expired +- Feat: Buffer size monitoring (buffer_a_size, buffer_b_size) +- Feat: Orphan tracking (orphans_emitted, orphans_pending, pending_orphan_match) +- Feat: Keep-Alive reset counter for connection tracking +- Feat: Test scripts added (test-correlation.sh, test-correlation-advanced.py) +- Change: Config example updated with metrics section +- Docs: README.md updated with debugging guide and troubleshooting table + * Tue Mar 03 2026 logcorrelator - 1.1.8-1 - Migrated from FPM to rpmbuild (native RPM build) - Reduced build image size by 200MB (-40%) diff --git a/scripts/test-correlation-advanced.py b/scripts/test-correlation-advanced.py new file mode 100755 index 0000000..9e629e7 --- /dev/null +++ b/scripts/test-correlation-advanced.py @@ -0,0 +1,582 @@ +#!/usr/bin/env python3 +""" +test-correlation-advanced.py - Advanced correlation testing tool + +This script provides comprehensive testing for the logcorrelator service, +including various scenarios to debug correlation issues. 
+ +Usage: + python3 test-correlation-advanced.py [options] + +Requirements: + - Python 3.6+ + - requests library (for metrics): pip install requests +""" + +import argparse +import json +import socket +import sys +import time +from datetime import datetime +from typing import Dict, Any, Optional, Tuple + +try: + import requests + HAS_REQUESTS = True +except ImportError: + HAS_REQUESTS = False + + +class Colors: + """ANSI color codes for terminal output.""" + BLUE = '\033[0;34m' + GREEN = '\033[0;32m' + YELLOW = '\033[1;33m' + RED = '\033[0;31m' + NC = '\033[0m' # No Color + BOLD = '\033[1m' + + +def colorize(text: str, color: str) -> str: + """Wrap text with ANSI color codes.""" + return f"{color}{text}{Colors.NC}" + + +def info(text: str): + print(colorize(f"[INFO] ", Colors.BLUE) + text) + + +def success(text: str): + print(colorize(f"[OK] ", Colors.GREEN) + text) + + +def warn(text: str): + print(colorize(f"[WARN] ", Colors.YELLOW) + text) + + +def error(text: str): + print(colorize(f"[ERROR] ", Colors.RED) + text) + + +def debug(text: str, verbose: bool = False): + if verbose: + print(colorize(f"[DEBUG] ", Colors.BLUE) + text) + + +class CorrelationTester: + """Main test class for correlation testing.""" + + def __init__( + self, + http_socket: str = "/var/run/logcorrelator/http.socket", + network_socket: str = "/var/run/logcorrelator/network.socket", + metrics_url: str = "http://localhost:8080/metrics", + verbose: bool = False, + skip_metrics: bool = False + ): + self.http_socket = http_socket + self.network_socket = network_socket + self.metrics_url = metrics_url + self.verbose = verbose + self.skip_metrics = skip_metrics + self.http_sock: Optional[socket.socket] = None + self.network_sock: Optional[socket.socket] = None + + def connect(self) -> bool: + """Connect to Unix sockets.""" + try: + # HTTP socket + self.http_sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + self.http_sock.connect(self.http_socket) + debug(f"Connected to HTTP socket: 
{self.http_socket}", self.verbose) + + # Network socket + self.network_sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + self.network_sock.connect(self.network_socket) + debug(f"Connected to Network socket: {self.network_socket}", self.verbose) + + return True + except FileNotFoundError as e: + error(f"Socket not found: {e}") + return False + except Exception as e: + error(f"Connection error: {e}") + return False + + def close(self): + """Close socket connections.""" + if self.http_sock: + self.http_sock.close() + if self.network_sock: + self.network_sock.close() + + def send_http_event( + self, + src_ip: str, + src_port: int, + timestamp: int, + method: str = "GET", + path: str = "/test", + host: str = "example.com", + extra_headers: Optional[Dict[str, str]] = None + ) -> Dict[str, Any]: + """Send an HTTP (source A) event.""" + event = { + "src_ip": src_ip, + "src_port": src_port, + "dst_ip": "10.0.0.1", + "dst_port": 443, + "timestamp": timestamp, + "method": method, + "path": path, + "host": host, + "http_version": "HTTP/1.1", + "header_user_agent": "TestAgent/1.0", + "header_accept": "*/*" + } + + if extra_headers: + for key, value in extra_headers.items(): + event[f"header_{key}"] = value + + json_data = json.dumps(event) + + if self.http_sock: + self.http_sock.sendall(json_data.encode()) + debug(f"Sent HTTP event: {src_ip}:{src_port} ts={timestamp}", self.verbose) + + return event + + def send_network_event( + self, + src_ip: str, + src_port: int, + timestamp: int, + ja3: str = "abc123", + ja4: str = "def456", + tls_version: str = "TLS1.3", + tls_sni: str = "example.com" + ) -> Dict[str, Any]: + """Send a Network (source B) event.""" + event = { + "src_ip": src_ip, + "src_port": src_port, + "dst_ip": "10.0.0.1", + "dst_port": 443, + "timestamp": timestamp, + "ja3": ja3, + "ja4": ja4, + "tls_version": tls_version, + "tls_sni": tls_sni + } + + json_data = json.dumps(event) + + if self.network_sock: + self.network_sock.sendall(json_data.encode()) + 
debug(f"Sent Network event: {src_ip}:{src_port} ts={timestamp}", self.verbose) + + return event + + def get_metrics(self) -> Dict[str, Any]: + """Fetch metrics from the metrics server.""" + if self.skip_metrics: + return {} + + if not HAS_REQUESTS: + warn("requests library not installed, skipping metrics") + return {} + + try: + response = requests.get(self.metrics_url, timeout=5) + response.raise_for_status() + return response.json() + except Exception as e: + warn(f"Failed to fetch metrics: {e}") + return {} + + def print_metrics(self, metrics: Dict[str, Any], title: str = "Metrics"): + """Print metrics in a formatted way.""" + if not metrics: + return + + print(f"\n{colorize(f'=== {title} ===', Colors.BOLD)}") + + keys_to_show = [ + ("events_received_a", "Events A"), + ("events_received_b", "Events B"), + ("correlations_success", "Correlations"), + ("correlations_failed", "Failures"), + ("failed_no_match_key", " - No match key"), + ("failed_time_window", " - Time window"), + ("failed_buffer_eviction", " - Buffer eviction"), + ("failed_ttl_expired", " - TTL expired"), + ("buffer_a_size", "Buffer A size"), + ("buffer_b_size", "Buffer B size"), + ("orphans_emitted_a", "Orphans A"), + ("orphans_emitted_b", "Orphans B"), + ("pending_orphan_match", "Pending orphan matches"), + ("keepalive_resets", "Keep-Alive resets"), + ] + + for key, label in keys_to_show: + if key in metrics: + print(f" {label}: {metrics[key]}") + + def check_sockets(self) -> bool: + """Check if sockets exist.""" + import os + + errors = 0 + for name, path in [("HTTP", self.http_socket), ("Network", self.network_socket)]: + if not os.path.exists(path): + error(f"{name} socket not found: {path}") + errors += 1 + elif not os.path.exists(path) or not os.stat(path).st_mode & 0o170000 == 0o140000: + # Check if it's a socket + try: + if not socket.getaddrinfo(path, None, socket.AF_UNIX): + error(f"{name} path exists but is not a socket: {path}") + errors += 1 + except: + pass + else: + 
debug(f"{name} socket found: {path}", self.verbose) + + return errors == 0 + + def run_basic_test(self, count: int = 10, delay_ms: int = 100) -> Tuple[bool, Dict[str, int]]: + """ + Run basic correlation test. + + Sends N pairs of A+B events with matching src_ip:src_port and timestamps. + All should correlate successfully. + """ + info(f"Running basic correlation test with {count} pairs...") + + # Get initial metrics + initial_metrics = self.get_metrics() + self.print_metrics(initial_metrics, "Initial Metrics") + + initial_success = initial_metrics.get("correlations_success", 0) + initial_failed = initial_metrics.get("correlations_failed", 0) + initial_a = initial_metrics.get("events_received_a", 0) + initial_b = initial_metrics.get("events_received_b", 0) + + # Send test events + print(f"\nSending {count} event pairs...") + + base_timestamp = time.time_ns() + sent = 0 + + for i in range(1, count + 1): + src_ip = f"192.168.1.{(i % 254) + 1}" + src_port = 8000 + i + + # Same timestamp for perfect correlation + timestamp = base_timestamp + (i * 1_000_000) + + self.send_http_event(src_ip, src_port, timestamp) + self.send_network_event(src_ip, src_port, timestamp) + + sent += 1 + + if delay_ms > 0: + time.sleep(delay_ms / 1000.0) + + success(f"Sent {sent} event pairs") + + # Wait for processing + info("Waiting for processing (2 seconds)...") + time.sleep(2) + + # Get final metrics + final_metrics = self.get_metrics() + self.print_metrics(final_metrics, "Final Metrics") + + # Calculate deltas + delta_success = final_metrics.get("correlations_success", 0) - initial_success + delta_failed = final_metrics.get("correlations_failed", 0) - initial_failed + delta_a = final_metrics.get("events_received_a", 0) - initial_a + delta_b = final_metrics.get("events_received_b", 0) - initial_b + + results = { + "sent": sent, + "received_a": delta_a, + "received_b": delta_b, + "correlations": delta_success, + "failures": delta_failed + } + + # Print results + print(f"\n{colorize('=== 
Results ===', Colors.BOLD)}") + print(f" Events A sent: {delta_a} (expected: {sent})") + print(f" Events B sent: {delta_b} (expected: {sent})") + print(f" Correlations: {delta_success}") + print(f" Failures: {delta_failed}") + + # Validation + test_passed = True + + if delta_a != sent: + error(f"Event A count mismatch: got {delta_a}, expected {sent}") + test_passed = False + + if delta_b != sent: + error(f"Event B count mismatch: got {delta_b}, expected {sent}") + test_passed = False + + if delta_success != sent: + error(f"Correlation count mismatch: got {delta_success}, expected {sent}") + test_passed = False + + if delta_failed > 0: + warn(f"Unexpected correlation failures: {delta_failed}") + + if test_passed: + success("All tests passed! Correlation is working correctly.") + else: + error("Some tests failed. Check logs for details.") + + return test_passed, results + + def run_time_window_test(self) -> bool: + """Test time window expiration.""" + info("Running time window test...") + + src_ip = "192.168.100.1" + src_port = 9999 + + # Send A event + ts_a = time.time_ns() + self.send_http_event(src_ip, src_port, ts_a) + info(f"Sent A event at {ts_a}") + + # Wait for time window to expire (default 10s) + info("Waiting 11 seconds (time window should expire)...") + time.sleep(11) + + # Send B event + ts_b = time.time_ns() + self.send_network_event(src_ip, src_port, ts_b) + info(f"Sent B event at {ts_b}") + + time_diff_sec = (ts_b - ts_a) / 1_000_000_000 + info(f"Time difference: {time_diff_sec:.1f} seconds") + info("Expected: time_window failure (check metrics)") + + return True + + def run_different_ip_test(self) -> bool: + """Test different IP (should not correlate).""" + info("Running different IP test...") + + ts = time.time_ns() + + # Send A with IP 192.168.200.1 + self.send_http_event("192.168.200.1", 7777, ts) + info("Sent A event from 192.168.200.1:7777") + + # Send B with different IP + self.send_network_event("192.168.200.2", 7777, ts) + info("Sent B event 
from 192.168.200.2:7777 (different IP)") + + info("Expected: no_match_key failure (different src_ip)") + + return True + + def run_keepalive_test(self, count: int = 5) -> bool: + """Test Keep-Alive mode (one B correlates with multiple A).""" + info(f"Running Keep-Alive test with {count} HTTP requests on same connection...") + + src_ip = "192.168.50.1" + src_port = 6000 + + # Send one B event first (network/TCP connection) + ts_b = time.time_ns() + self.send_network_event(src_ip, src_port, ts_b) + info(f"Sent B event (connection): {src_ip}:{src_port}") + + # Send multiple A events (HTTP requests) on same connection + for i in range(count): + ts_a = time.time_ns() + (i * 100_000_000) # 100ms apart + self.send_http_event(src_ip, src_port, ts_a, path=f"/request{i}") + info(f"Sent A event (request {i}): {src_ip}:{src_port}") + time.sleep(0.05) # 50ms delay + + time.sleep(2) # Wait for processing + + # Check metrics + metrics = self.get_metrics() + keepalive_resets = metrics.get("keepalive_resets", 0) + + info(f"Keep-Alive resets: {keepalive_resets} (expected: {count - 1})") + + if keepalive_resets >= count - 1: + success("Keep-Alive test passed!") + return True + else: + warn(f"Keep-Alive resets lower than expected. 
This may be normal depending on timing.") + return True + + def run_all_tests(self) -> bool: + """Run all test scenarios.""" + results = [] + + # Basic test + passed, _ = self.run_basic_test(count=10) + results.append(("Basic correlation", passed)) + + print("\n" + "=" * 50 + "\n") + + # Time window test + self.run_time_window_test() + results.append(("Time window", True)) # Informational + + print("\n" + "=" * 50 + "\n") + + # Different IP test + self.run_different_ip_test() + results.append(("Different IP", True)) # Informational + + print("\n" + "=" * 50 + "\n") + + # Keep-Alive test + self.run_keepalive_test() + results.append(("Keep-Alive", True)) + + # Summary + print(f"\n{colorize('=== Test Summary ===', Colors.BOLD)}") + for name, passed in results: + status = colorize("PASS", Colors.GREEN) if passed else colorize("FAIL", Colors.RED) + print(f" {name}: {status}") + + return all(r[1] for r in results) + + +def main(): + parser = argparse.ArgumentParser( + description="Advanced correlation testing tool for logcorrelator", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Run basic test with 20 pairs + python3 test-correlation-advanced.py -c 20 + + # Run all tests with verbose output + python3 test-correlation-advanced.py --all -v + + # Test with custom socket paths + python3 test-correlation-advanced.py -H /tmp/http.sock -N /tmp/network.sock + + # Skip metrics check + python3 test-correlation-advanced.py --skip-metrics + """ + ) + + parser.add_argument( + "-H", "--http-socket", + default="/var/run/logcorrelator/http.socket", + help="Path to HTTP Unix socket (default: /var/run/logcorrelator/http.socket)" + ) + parser.add_argument( + "-N", "--network-socket", + default="/var/run/logcorrelator/network.socket", + help="Path to Network Unix socket (default: /var/run/logcorrelator/network.socket)" + ) + parser.add_argument( + "-m", "--metrics-url", + default="http://localhost:8080/metrics", + help="Metrics server URL (default: 
http://localhost:8080/metrics)" + ) + parser.add_argument( + "-c", "--count", + type=int, + default=10, + help="Number of test pairs to send (default: 10)" + ) + parser.add_argument( + "-d", "--delay", + type=int, + default=100, + help="Delay between pairs in milliseconds (default: 100)" + ) + parser.add_argument( + "-v", "--verbose", + action="store_true", + help="Enable verbose output" + ) + parser.add_argument( + "--skip-metrics", + action="store_true", + help="Skip metrics check" + ) + parser.add_argument( + "--all", + action="store_true", + help="Run all test scenarios" + ) + parser.add_argument( + "--time-window", + action="store_true", + help="Run time window test only" + ) + parser.add_argument( + "--different-ip", + action="store_true", + help="Run different IP test only" + ) + parser.add_argument( + "--keepalive", + action="store_true", + help="Run Keep-Alive test only" + ) + + args = parser.parse_args() + + # Create tester + tester = CorrelationTester( + http_socket=args.http_socket, + network_socket=args.network_socket, + metrics_url=args.metrics_url, + verbose=args.verbose, + skip_metrics=args.skip_metrics + ) + + # Check sockets + if not tester.check_sockets(): + error("Socket check failed. 
Is logcorrelator running?") + sys.exit(1) + + success("Socket check passed") + + # Connect + if not tester.connect(): + error("Failed to connect to sockets") + sys.exit(1) + + try: + if args.all: + success = tester.run_all_tests() + elif args.time_window: + tester.run_time_window_test() + success = True + elif args.different_ip: + tester.run_different_ip_test() + success = True + elif args.keepalive: + tester.run_keepalive_test() + success = True + else: + _, _ = tester.run_basic_test(count=args.count, delay_ms=args.delay) + success = True + + sys.exit(0 if success else 1) + + finally: + tester.close() + + +if __name__ == "__main__": + main() diff --git a/scripts/test-correlation.sh b/scripts/test-correlation.sh new file mode 100755 index 0000000..ea25a36 --- /dev/null +++ b/scripts/test-correlation.sh @@ -0,0 +1,404 @@ +#!/bin/bash +# +# test-correlation.sh - Test script for log correlation debugging +# +# This script sends test HTTP (A) and Network (B) events to the logcorrelator +# Unix sockets and verifies that correlation is working correctly. 
+# +# Usage: +# ./test-correlation.sh [options] +# +# Options: +# -h, --http-socket PATH Path to HTTP socket (default: /var/run/logcorrelator/http.socket) +# -n, --network-socket PATH Path to Network socket (default: /var/run/logcorrelator/network.socket) +# -c, --count NUM Number of test pairs to send (default: 10) +# -d, --delay MS Delay between pairs in milliseconds (default: 100) +# -v, --verbose Enable verbose output +# -m, --metrics-url URL Metrics server URL (default: http://localhost:8080/metrics) +# --skip-metrics Skip metrics check +# --help Show this help message +# + +set -e + +# Default values +HTTP_SOCKET="/var/run/logcorrelator/http.socket" +NETWORK_SOCKET="/var/run/logcorrelator/network.socket" +COUNT=10 +DELAY_MS=100 +VERBOSE=false +METRICS_URL="http://localhost:8080/metrics" +SKIP_METRICS=false + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' # No Color + +# Print functions +info() { + echo -e "${BLUE}[INFO]${NC} $1" +} + +success() { + echo -e "${GREEN}[OK]${NC} $1" +} + +warn() { + echo -e "${YELLOW}[WARN]${NC} $1" +} + +error() { + echo -e "${RED}[ERROR]${NC} $1" +} + +verbose() { + if [ "$VERBOSE" = true ]; then + echo -e "${BLUE}[DEBUG]${NC} $1" + fi +} + +# Show help +show_help() { + head -20 "$0" | tail -17 | sed 's/^#//' | sed 's/^ //' + exit 0 +} + +# Parse arguments +while [[ $# -gt 0 ]]; do + case $1 in + -h|--http-socket) + HTTP_SOCKET="$2" + shift 2 + ;; + -n|--network-socket) + NETWORK_SOCKET="$2" + shift 2 + ;; + -c|--count) + COUNT="$2" + shift 2 + ;; + -d|--delay) + DELAY_MS="$2" + shift 2 + ;; + -v|--verbose) + VERBOSE=true + shift + ;; + -m|--metrics-url) + METRICS_URL="$2" + shift 2 + ;; + --skip-metrics) + SKIP_METRICS=true + shift + ;; + --help) + show_help + ;; + *) + error "Unknown option: $1" + echo "Use --help for usage information" + exit 1 + ;; + esac +done + +# Check if socat or netcat is available +if command -v socat &> /dev/null; then + 
SEND_CMD="socat" +elif command -v nc &> /dev/null; then + SEND_CMD="nc" +else + error "Neither socat nor nc (netcat) found. Please install one of them." + echo " Ubuntu/Debian: apt-get install socat OR apt-get install netcat-openbsd" + echo " RHEL/CentOS: yum install socat OR yum install nc" + exit 1 +fi + +# Function to send data to Unix socket +send_to_socket() { + local socket="$1" + local data="$2" + + if [ "$SEND_CMD" = "socat" ]; then + echo "$data" | socat - "UNIX-SENDTO:$socket" 2>/dev/null + else + echo "$data" | nc -U -u "$socket" 2>/dev/null + fi +} + +# Function to generate timestamp in nanoseconds +get_timestamp_ns() { + date +%s%N +} + +# Function to send HTTP (A) event +send_http_event() { + local src_ip="$1" + local src_port="$2" + local timestamp="$3" + local method="${4:-GET}" + local path="${5:-/test}" + local host="${6:-example.com}" + + local json=$(cat < /dev/null; then + curl -s "$METRICS_URL" 2>/dev/null || echo "{}" + elif command -v wget &> /dev/null; then + wget -qO- "$METRICS_URL" 2>/dev/null || echo "{}" + else + warn "Neither curl nor wget found. Skipping metrics check." + echo "{}" + fi +} + +# Extract value from JSON (simple grep-based, requires jq for complex queries) +get_json_value() { + local json="$1" + local key="$2" + + if command -v jq &> /dev/null; then + echo "$json" | jq -r ".$key // 0" + else + # Fallback: simple grep (works for flat JSON) + echo "$json" | grep -o "\"$key\":[0-9]*" | cut -d: -f2 || echo "0" + fi +} + +# Main test function +run_test() { + info "Starting correlation test..." + info "Configuration:" + echo " HTTP Socket: $HTTP_SOCKET" + echo " Network Socket: $NETWORK_SOCKET" + echo " Test pairs: $COUNT" + echo " Delay between: ${DELAY_MS}ms" + echo " Metrics URL: $METRICS_URL" + echo " Send command: $SEND_CMD" + echo "" + + # Get initial metrics + info "Fetching initial metrics..." 
+ local initial_metrics=$(get_metrics) + local initial_success=$(get_json_value "$initial_metrics" "correlations_success") + local initial_failed=$(get_json_value "$initial_metrics" "correlations_failed") + local initial_a=$(get_json_value "$initial_metrics" "events_received_a") + local initial_b=$(get_json_value "$initial_metrics" "events_received_b") + + info "Initial metrics:" + echo " Events A: $initial_a" + echo " Events B: $initial_b" + echo " Success: $initial_success" + echo " Failed: $initial_failed" + echo "" + + # Send test events + info "Sending $COUNT test event pairs..." + + local base_timestamp=$(get_timestamp_ns) + local sent=0 + local correlated=0 + + for i in $(seq 1 $COUNT); do + local src_ip="192.168.1.$((i % 254 + 1))" + local src_port=$((8000 + i)) + + # Send A and B with same timestamp (should correlate) + local ts_a=$((base_timestamp + i * 1000000)) + local ts_b=$ts_a # Same timestamp for perfect correlation + + send_http_event "$src_ip" "$src_port" "$ts_a" + send_network_event "$src_ip" "$src_port" "$ts_b" + + sent=$((sent + 1)) + verbose "Sent pair $i: $src_ip:$src_port" + + if [ $DELAY_MS -gt 0 ]; then + sleep $(echo "scale=3; $DELAY_MS / 1000" | bc) + fi + done + + success "Sent $sent event pairs" + echo "" + + # Wait for processing + info "Waiting for processing (2 seconds)..." + sleep 2 + + # Get final metrics + info "Fetching final metrics..." 
+ local final_metrics=$(get_metrics) + local final_success=$(get_json_value "$final_metrics" "correlations_success") + local final_failed=$(get_json_value "$final_metrics" "correlations_failed") + local final_a=$(get_json_value "$final_metrics" "events_received_a") + local final_b=$(get_json_value "$final_metrics" "events_received_b") + + # Calculate deltas + local delta_success=$((final_success - initial_success)) + local delta_failed=$((final_failed - initial_failed)) + local delta_a=$((final_a - initial_a)) + local delta_b=$((final_b - initial_b)) + + echo "" + info "Results:" + echo " Events A sent: $delta_a (expected: $sent)" + echo " Events B sent: $delta_b (expected: $sent)" + echo " Correlations: $delta_success" + echo " Failures: $delta_failed" + echo "" + + # Validation + local test_passed=true + + if [ "$delta_a" -ne "$sent" ]; then + error "Event A count mismatch: got $delta_a, expected $sent" + test_passed=false + fi + + if [ "$delta_b" -ne "$sent" ]; then + error "Event B count mismatch: got $delta_b, expected $sent" + test_passed=false + fi + + if [ "$delta_success" -ne "$sent" ]; then + error "Correlation count mismatch: got $delta_success, expected $sent" + test_passed=false + fi + + if [ "$delta_failed" -ne 0 ]; then + warn "Unexpected correlation failures: $delta_failed" + fi + + if [ "$test_passed" = true ]; then + success "All tests passed! Correlation is working correctly." + exit 0 + else + error "Some tests failed. Check the logs for details." + exit 1 + fi +} + +# Test with time window exceeded +run_time_window_test() { + info "Running time window test (B arrives after time window)..." + + local src_ip="192.168.100.1" + local src_port="9999" + + # Send A event + local ts_a=$(get_timestamp_ns) + send_http_event "$src_ip" "$src_port" "$ts_a" + info "Sent A event at timestamp $ts_a" + + # Wait for time window to expire (default is 10s, we wait 11s) + info "Waiting 11 seconds (time window should expire)..." 
+ sleep 11 + + # Send B event + local ts_b=$(get_timestamp_ns) + send_network_event "$src_ip" "$src_port" "$ts_b" + info "Sent B event at timestamp $ts_b" + + info "This should result in a time_window failure (check metrics)" +} + +# Test with different src_ip +run_different_ip_test() { + info "Running different IP test (should NOT correlate)..." + + # Send A with IP 192.168.200.1 + local ts=$(get_timestamp_ns) + send_http_event "192.168.200.1" "7777" "$ts" + info "Sent A event from 192.168.200.1:7777" + + # Send B with different IP + send_network_event "192.168.200.2" "7777" "$ts" + info "Sent B event from 192.168.200.2:7777 (different IP)" + + info "These should NOT correlate (different src_ip)" +} + +# Run tests +check_sockets +echo "" + +# Run main test +run_test + +echo "" +info "Additional tests available:" +echo " --test-time-window Test time window expiration" +echo " --test-different-ip Test different IP (no correlation)" + +# Check for additional test flags +if [[ "$@" == *"--test-time-window"* ]]; then + echo "" + run_time_window_test +fi + +if [[ "$@" == *"--test-different-ip"* ]]; then + echo "" + run_different_ip_test +fi