diff --git a/Makefile b/Makefile index 9edd302..9bc96dc 100644 --- a/Makefile +++ b/Makefile @@ -20,7 +20,7 @@ BINARY_NAME=logcorrelator DIST_DIR=dist # Package version -PKG_VERSION ?= 1.1.11 +PKG_VERSION ?= 1.1.12 # Enable BuildKit for better performance export DOCKER_BUILDKIT=1 diff --git a/README.md b/README.md index 85b7af1..3014188 100644 --- a/README.md +++ b/README.md @@ -4,152 +4,146 @@ Service de corrélation de logs HTTP et réseau écrit en Go. ## Description -**logcorrelator** reçoit deux flux de logs JSON via des sockets Unix : +**logcorrelator** reçoit deux flux de logs JSON via des sockets Unix datagrammes (SOCK_DGRAM) : - **Source A** : logs HTTP applicatifs (Apache, reverse proxy) - **Source B** : logs réseau (métadonnées IP/TCP, JA3/JA4, etc.) -Il corrèle les événements sur la base de `src_ip + src_port` avec une fenêtre temporelle configurable, et produit des logs corrélés vers : +Il corrèle les événements sur la base de `src_ip + src_port` dans une fenêtre temporelle configurable, et produit des logs corrélés vers : - Un fichier local (JSON lines) - ClickHouse (pour analyse et archivage) +Les logs opérationnels du service (démarrage, erreurs, métriques) sont écrits sur **stderr** et collectés par journald. Aucune donnée corrélée n'apparaît sur stdout. + ## Architecture ``` ┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐ -│ Apache Source │────▶│ │────▶│ File Sink │ -│ (Unix Socket) │ │ Correlation │ │ (JSON lines) │ -└─────────────────┘ │ Service │ └─────────────────┘ - │ │ -┌─────────────────┐ │ - Buffers │ ┌─────────────────┐ -│ Network Source │────▶│ - Time Window │────▶│ ClickHouse │ -│ (Unix Socket) │ │ - Orphan Policy │ │ Sink │ -└─────────────────┘ └──────────────────┘ └─────────────────┘ +│ Source A │────▶│ │────▶│ File Sink │ +│ HTTP/Apache │ │ Correlation │ │ (JSON lines) │ +│ (Unix DGRAM) │ │ Service │ └─────────────────┘ +└─────────────────┘ │ │ + │ - Buffers │ ┌─────────────────┐ +┌─────────────────┐ │ - Time Window │────▶│ ClickHouse │ +│ Source B │────▶│ - Orphan Policy │ │ Sink │ +│ Réseau/JA4 │ │ - Keep-Alive │ └─────────────────┘ +│ (Unix DGRAM) │ └──────────────────┘ +└─────────────────┘ ``` +Architecture hexagonale : domaine pur (`internal/domain`), ports abstraits (`internal/ports`), adaptateurs (`internal/adapters`), orchestration (`internal/app`). + ## Build (100% Docker) -Tout le build et les tests s'exécutent dans des containers Docker : +Tout le build, les tests et le packaging RPM s'exécutent dans des conteneurs : ```bash -# Build complet (binaire + tests + RPM) +# Build complet avec tests (builder stage) +make docker-build-dev + +# Packaging RPM (el8, el9, el10) make package-rpm -# Uniquement les tests -make test +# Build rapide sans tests +make docker-build-dev-no-test -# Build manuel avec Docker -docker build --target builder -t logcorrelator-builder . -docker build --target runtime -t logcorrelator:latest . +# Tests en local (nécessite Go 1.21+) +make test ``` ### Prérequis - Docker 20.10+ -- Bash ## Installation -### Depuis Docker - -```bash -# Build de l'image -docker build --target runtime -t logcorrelator:latest . - -# Exécuter -docker run -d \ - --name logcorrelator \ - -v /var/run/logcorrelator:/var/run/logcorrelator \ - -v /var/log/logcorrelator:/var/log/logcorrelator \ - -v ./config.example.yml:/etc/logcorrelator/logcorrelator.yml \ - logcorrelator:latest -``` - -### Depuis les packages RPM +### Packages RPM ```bash # Générer les packages make package-rpm -# Installer le package RPM (Rocky Linux 8/9/10) -sudo dnf install -y dist/rpm/el8/logcorrelator-1.1.10-1.el8.x86_64.rpm -sudo dnf install -y dist/rpm/el9/logcorrelator-1.1.10-1.el9.x86_64.rpm -sudo dnf install -y dist/rpm/el10/logcorrelator-1.1.10-1.el10.x86_64.rpm +# Installer (Rocky Linux / AlmaLinux) +sudo dnf install -y dist/rpm/el8/logcorrelator-1.1.12-1.el8.x86_64.rpm +sudo dnf install -y dist/rpm/el9/logcorrelator-1.1.12-1.el9.x86_64.rpm +sudo dnf install -y dist/rpm/el10/logcorrelator-1.1.12-1.el10.x86_64.rpm -# Activer et démarrer le service -sudo systemctl enable logcorrelator -sudo systemctl start logcorrelator - -# Vérifier le statut +# Démarrer +sudo systemctl enable --now logcorrelator sudo systemctl status logcorrelator ``` -### Build manuel (sans Docker) +### Build manuel ```bash -# Prérequis: Go 1.21+ +# Binaire local (nécessite Go 1.21+) go build -o logcorrelator ./cmd/logcorrelator - -# Exécuter ./logcorrelator -config config.example.yml ``` ## Configuration -La configuration utilise un fichier YAML. Voir `config.example.yml` pour un exemple complet. +Fichier YAML. Voir `config.example.yml` pour un exemple complet. ```yaml -# /etc/logcorrelator/logcorrelator.yml - log: level: INFO # DEBUG, INFO, WARN, ERROR inputs: unix_sockets: - # Source HTTP (A) : logs applicatifs en JSON - name: http - path: /var/run/logcorrelator/http.sock + source_type: A # Source HTTP + path: /var/run/logcorrelator/http.socket + format: json socket_permissions: "0666" - socket_type: dgram - - # Source réseau (B) : logs IP/TCP/JA3... en JSON - name: network - path: /var/run/logcorrelator/network.sock + source_type: B # Source réseau + path: /var/run/logcorrelator/network.socket + format: json socket_permissions: "0666" - socket_type: dgram outputs: file: - enabled: true path: /var/log/logcorrelator/correlated.log - format: json_lines - clickhouse: - enabled: true - dsn: clickhouse://data_writer:password@localhost:9000/mabase_prod + enabled: false + dsn: clickhouse://user:pass@localhost:9000/db table: http_logs_raw batch_size: 500 flush_interval_ms: 200 max_buffer_size: 5000 drop_on_overflow: true + timeout_ms: 1000 + stdout: + enabled: false # no-op pour les données ; logs opérationnels toujours sur stderr correlation: time_window: - value: 1 + value: 10 unit: s orphan_policy: apache_always_emit: true + apache_emit_delay_ms: 500 # délai avant émission orphelin A (ms) network_emit: false matching: - mode: one_to_many # Keep-Alive : un B peut corréler plusieurs A + mode: one_to_many # Keep-Alive : un B peut corréler plusieurs A successifs buffers: max_http_items: 10000 max_network_items: 20000 ttl: - network_ttl_s: 30 - # Exclure certaines IPs source (optionnel) + network_ttl_s: 120 # TTL remis à zéro à chaque corrélation (Keep-Alive) + # Exclure des IPs source (IPs uniques ou plages CIDR) exclude_source_ips: - - 10.0.0.1 # IP unique - - 192.168.0.0/16 # Plage CIDR + - 10.0.0.1 + - 172.16.0.0/12 + # Restreindre la corrélation à certains ports de destination (optionnel) + # Si la liste est vide, tous les ports sont corrélés + include_dest_ports: + - 80 + - 443 + +metrics: + enabled: false + addr: ":8080" ``` ### Format du DSN ClickHouse @@ -158,11 +152,7 @@ correlation: clickhouse://username:password@host:port/database ``` -Exemple : `clickhouse://data_writer:MonMotDePasse@127.0.0.1:9000/mabase_prod` - -Ports courants : -- `9000` : port natif (recommandé pour le driver Go) -- `8123` : port HTTP (alternative) +Ports : `9000` (natif, recommandé) ou `8123` (HTTP). ## Format des logs @@ -170,14 +160,10 @@ Ports courants : ```json { - "src_ip": "192.168.1.1", - "src_port": 8080, - "dst_ip": "10.0.0.1", - "dst_port": 80, + "src_ip": "192.168.1.1", "src_port": 8080, + "dst_ip": "10.0.0.1", "dst_port": 443, "timestamp": 1704110400000000000, - "method": "GET", - "path": "/api/test", - "header_host": "example.com" + "method": "GET", "path": "/api/test" } ``` @@ -185,49 +171,40 @@ Ports courants : ```json { - "src_ip": "192.168.1.1", - "src_port": 8080, - "dst_ip": "10.0.0.1", - "dst_port": 443, - "ja3": "abc123def456", - "ja4": "xyz789" + "src_ip": "192.168.1.1", "src_port": 8080, + "dst_ip": "10.0.0.1", "dst_port": 443, + "ja3": "abc123", "ja4": "xyz789" } ``` ### Log corrélé (sortie) -**Version 1.0.3+** - Structure JSON plate (flat) : +Structure JSON plate — tous les champs A et B sont fusionnés à la racine : ```json { "timestamp": "2024-01-01T12:00:00Z", - "src_ip": "192.168.1.1", - "src_port": 8080, - "dst_ip": "10.0.0.1", - "dst_port": 80, + "src_ip": "192.168.1.1", "src_port": 8080, + "dst_ip": "10.0.0.1", "dst_port": 443, "correlated": true, - "method": "GET", - "path": "/api/test", - "ja3": "abc123def456", - "ja4": "xyz789" + "method": "GET", "path": "/api/test", + "ja3": "abc123", "ja4": "xyz789" } ``` -Tous les champs des sources A et B sont fusionnés au même niveau. Les champs de corrélation (`src_ip`, `src_port`, `dst_ip`, `dst_port`, `correlated`, `orphan_side`) sont toujours présents, et tous les autres champs des logs sources sont ajoutés directement à la racine. +En cas de collision de champ entre A et B, les deux valeurs sont conservées avec préfixes `a_` et `b_`. + +Les orphelins A (sans B correspondant) sont émis avec `"correlated": false, "orphan_side": "A"`. ## Schema ClickHouse -Le service utilise deux tables ClickHouse dans la base `mabase_prod` : - ### Setup complet ```sql --- 1. Créer la base de données +-- Base de données CREATE DATABASE IF NOT EXISTS mabase_prod; --- 2. Table brute avec TTL (1 jour de rétention) -DROP TABLE IF EXISTS mabase_prod.http_logs_raw; - +-- Table brute (cible des inserts du service) CREATE TABLE mabase_prod.http_logs_raw ( raw_json String, @@ -237,505 +214,268 @@ ENGINE = MergeTree PARTITION BY toDate(ingest_time) ORDER BY ingest_time; --- 3. Table parsée -DROP TABLE IF EXISTS mabase_prod.http_logs; - +-- Table parsée CREATE TABLE mabase_prod.http_logs ( - time DateTime, - log_date Date DEFAULT toDate(time), - - src_ip IPv4, - src_port UInt16, - dst_ip IPv4, - dst_port UInt16, - - method LowCardinality(String), - scheme LowCardinality(String), - host LowCardinality(String), - path String, - query String, - http_version LowCardinality(String), - orphan_side LowCardinality(String), - - correlated UInt8, - keepalives UInt16, - a_timestamp UInt64, - b_timestamp UInt64, - conn_id String, - - ip_meta_df UInt8, - ip_meta_id UInt32, - ip_meta_total_length UInt32, - ip_meta_ttl UInt8, - tcp_meta_options LowCardinality(String), - tcp_meta_window_size UInt32, - syn_to_clienthello_ms Int32, - - tls_version LowCardinality(String), - tls_sni LowCardinality(String), - ja3 String, - ja3_hash String, - ja4 String, - - header_user_agent String, - header_accept String, - header_accept_encoding String, - header_accept_language String, - header_x_request_id String, - header_x_trace_id String, - header_x_forwarded_for String, - - header_sec_ch_ua String, - header_sec_ch_ua_mobile String, + time DateTime, + log_date Date DEFAULT toDate(time), + src_ip IPv4, src_port UInt16, + dst_ip IPv4, dst_port UInt16, + method LowCardinality(String), + scheme LowCardinality(String), + host LowCardinality(String), + path String, query String, + http_version LowCardinality(String), + orphan_side LowCardinality(String), + correlated UInt8, + tls_version LowCardinality(String), + tls_sni LowCardinality(String), + ja3 String, ja3_hash String, ja4 String, + header_user_agent String, header_accept String, + header_accept_encoding String, header_accept_language String, + header_x_forwarded_for String, + header_sec_ch_ua String, header_sec_ch_ua_mobile String, header_sec_ch_ua_platform String, - header_sec_fetch_dest String, - header_sec_fetch_mode String, - header_sec_fetch_site String + header_sec_fetch_dest String, header_sec_fetch_mode String, header_sec_fetch_site String, + ip_meta_ttl UInt8, ip_meta_df UInt8, + tcp_meta_window_size UInt32, tcp_meta_options LowCardinality(String), + syn_to_clienthello_ms Int32 ) ENGINE = MergeTree PARTITION BY log_date ORDER BY (time, src_ip, dst_ip, ja4); --- 4. Vue matérialisée (RAW → logs) -DROP VIEW IF EXISTS mabase_prod.mv_http_logs; - +-- Vue matérialisée RAW → http_logs CREATE MATERIALIZED VIEW mabase_prod.mv_http_logs -TO mabase_prod.http_logs -AS +TO mabase_prod.http_logs AS SELECT - -- 1. Temps - parseDateTimeBestEffort( - coalesce(JSONExtractString(raw_json, 'time'), '1970-01-01T00:00:00Z') - ) AS time, - toDate(time) AS log_date, - - -- 2. Réseau L3/L4 - toIPv4(coalesce(JSONExtractString(raw_json, 'src_ip'), '0.0.0.0')) AS src_ip, - toUInt16(coalesce(JSONExtractUInt(raw_json, 'src_port'), 0)) AS src_port, - toIPv4(coalesce(JSONExtractString(raw_json, 'dst_ip'), '0.0.0.0')) AS dst_ip, - toUInt16(coalesce(JSONExtractUInt(raw_json, 'dst_port'), 0)) AS dst_port, - - -- 3. HTTP de base - coalesce(JSONExtractString(raw_json, 'method'), '') AS method, - coalesce(JSONExtractString(raw_json, 'scheme'), '') AS scheme, - coalesce(JSONExtractString(raw_json, 'host'), '') AS host, - coalesce(JSONExtractString(raw_json, 'path'), '') AS path, - coalesce(JSONExtractString(raw_json, 'query'), '') AS query, - coalesce(JSONExtractString(raw_json, 'http_version'), '') AS http_version, - coalesce(JSONExtractString(raw_json, 'orphan_side'), '') AS orphan_side, - - -- 4. Connexion / corrélation - toUInt8(coalesce(JSONExtractBool(raw_json, 'correlated'), 0)) AS correlated, - toUInt16(coalesce(JSONExtractUInt(raw_json, 'keepalives'), 0)) AS keepalives, - coalesce(JSONExtractUInt(raw_json, 'a_timestamp'), 0) AS a_timestamp, - coalesce(JSONExtractUInt(raw_json, 'b_timestamp'), 0) AS b_timestamp, - coalesce(JSONExtractString(raw_json, 'conn_id'), '') AS conn_id, - - -- 5. IP/TCP - toUInt8(coalesce(JSONExtractBool(raw_json, 'ip_meta_df'), 0)) AS ip_meta_df, - coalesce(JSONExtractUInt(raw_json, 'ip_meta_id'), 0) AS ip_meta_id, - coalesce(JSONExtractUInt(raw_json, 'ip_meta_total_length'), 0) AS ip_meta_total_length, - coalesce(JSONExtractUInt(raw_json, 'ip_meta_ttl'), 0) AS ip_meta_ttl, - coalesce(JSONExtractString(raw_json, 'tcp_meta_options'), '') AS tcp_meta_options, - coalesce(JSONExtractUInt(raw_json, 'tcp_meta_window_size'), 0) AS tcp_meta_window_size, - toInt32(coalesce(JSONExtractInt(raw_json, 'syn_to_clienthello_ms'), 0)) AS syn_to_clienthello_ms, - - -- 6. TLS / JA3/JA4 - coalesce(JSONExtractString(raw_json, 'tls_version'), '') AS tls_version, - coalesce(JSONExtractString(raw_json, 'tls_sni'), '') AS tls_sni, - coalesce(JSONExtractString(raw_json, 'ja3'), '') AS ja3, - coalesce(JSONExtractString(raw_json, 'ja3_hash'), '') AS ja3_hash, - coalesce(JSONExtractString(raw_json, 'ja4'), '') AS ja4, - - -- 7. Headers HTTP - coalesce(JSONExtractString(raw_json, 'header_User-Agent'), '') AS header_user_agent, - coalesce(JSONExtractString(raw_json, 'header_Accept'), '') AS header_accept, - coalesce(JSONExtractString(raw_json, 'header_Accept-Encoding'), '') AS header_accept_encoding, - coalesce(JSONExtractString(raw_json, 'header_Accept-Language'), '') AS header_accept_language, - coalesce(JSONExtractString(raw_json, 'header_X-Request-Id'), '') AS header_x_request_id, - coalesce(JSONExtractString(raw_json, 'header_X-Trace-Id'), '') AS header_x_trace_id, - coalesce(JSONExtractString(raw_json, 'header_X-Forwarded-For'), '') AS header_x_forwarded_for, - - coalesce(JSONExtractString(raw_json, 'header_Sec-CH-UA'), '') AS header_sec_ch_ua, - coalesce(JSONExtractString(raw_json, 'header_Sec-CH-UA-Mobile'), '') AS header_sec_ch_ua_mobile, - coalesce(JSONExtractString(raw_json, 'header_Sec-CH-UA-Platform'), '') AS header_sec_ch_ua_platform, - coalesce(JSONExtractString(raw_json, 'header_Sec-Fetch-Dest'), '') AS header_sec_fetch_dest, - coalesce(JSONExtractString(raw_json, 'header_Sec-Fetch-Mode'), '') AS header_sec_fetch_mode, - coalesce(JSONExtractString(raw_json, 'header_Sec-Fetch-Site'), '') AS header_sec_fetch_site - + parseDateTimeBestEffort(coalesce(JSONExtractString(raw_json,'time'),'1970-01-01T00:00:00Z')) AS time, + toDate(time) AS log_date, + toIPv4(coalesce(JSONExtractString(raw_json,'src_ip'),'0.0.0.0')) AS src_ip, + toUInt16(JSONExtractUInt(raw_json,'src_port')) AS src_port, + toIPv4(coalesce(JSONExtractString(raw_json,'dst_ip'),'0.0.0.0')) AS dst_ip, + toUInt16(JSONExtractUInt(raw_json,'dst_port')) AS dst_port, + coalesce(JSONExtractString(raw_json,'method'),'') AS method, + coalesce(JSONExtractString(raw_json,'scheme'),'') AS scheme, + coalesce(JSONExtractString(raw_json,'host'),'') AS host, + coalesce(JSONExtractString(raw_json,'path'),'') AS path, + coalesce(JSONExtractString(raw_json,'query'),'') AS query, + coalesce(JSONExtractString(raw_json,'http_version'),'') AS http_version, + coalesce(JSONExtractString(raw_json,'orphan_side'),'') AS orphan_side, + toUInt8(JSONExtractBool(raw_json,'correlated')) AS correlated, + coalesce(JSONExtractString(raw_json,'tls_version'),'') AS tls_version, + coalesce(JSONExtractString(raw_json,'tls_sni'),'') AS tls_sni, + coalesce(JSONExtractString(raw_json,'ja3'),'') AS ja3, + coalesce(JSONExtractString(raw_json,'ja3_hash'),'') AS ja3_hash, + coalesce(JSONExtractString(raw_json,'ja4'),'') AS ja4, + coalesce(JSONExtractString(raw_json,'header_User-Agent'),'') AS header_user_agent, + coalesce(JSONExtractString(raw_json,'header_Accept'),'') AS header_accept, + coalesce(JSONExtractString(raw_json,'header_Accept-Encoding'),'') AS header_accept_encoding, + coalesce(JSONExtractString(raw_json,'header_Accept-Language'),'') AS header_accept_language, + coalesce(JSONExtractString(raw_json,'header_X-Forwarded-For'),'') AS header_x_forwarded_for, + coalesce(JSONExtractString(raw_json,'header_Sec-CH-UA'),'') AS header_sec_ch_ua, + coalesce(JSONExtractString(raw_json,'header_Sec-CH-UA-Mobile'),'') AS header_sec_ch_ua_mobile, + coalesce(JSONExtractString(raw_json,'header_Sec-CH-UA-Platform'),'') AS header_sec_ch_ua_platform, + coalesce(JSONExtractString(raw_json,'header_Sec-Fetch-Dest'),'') AS header_sec_fetch_dest, + coalesce(JSONExtractString(raw_json,'header_Sec-Fetch-Mode'),'') AS header_sec_fetch_mode, + coalesce(JSONExtractString(raw_json,'header_Sec-Fetch-Site'),'') AS header_sec_fetch_site, + toUInt8(JSONExtractUInt(raw_json,'ip_meta_ttl')) AS ip_meta_ttl, + toUInt8(JSONExtractBool(raw_json,'ip_meta_df')) AS ip_meta_df, + toUInt32(JSONExtractUInt(raw_json,'tcp_meta_window_size')) AS tcp_meta_window_size, + coalesce(JSONExtractString(raw_json,'tcp_meta_options'),'') AS tcp_meta_options, + toInt32(JSONExtractInt(raw_json,'syn_to_clienthello_ms')) AS syn_to_clienthello_ms FROM mabase_prod.http_logs_raw; ``` ### Utilisateurs et permissions ```sql --- Créer les utilisateurs -CREATE USER IF NOT EXISTS data_writer IDENTIFIED WITH TonMotDePasseInsert; -CREATE USER IF NOT EXISTS analyst IDENTIFIED WITH TonMotDePasseAnalyst; +CREATE USER IF NOT EXISTS data_writer IDENTIFIED WITH plaintext_password BY 'MotDePasse'; +CREATE USER IF NOT EXISTS analyst IDENTIFIED WITH plaintext_password BY 'MotDePasseAnalyst'; --- Droits pour data_writer (INSERT + SELECT pour la MV) GRANT INSERT(raw_json) ON mabase_prod.http_logs_raw TO data_writer; GRANT SELECT(raw_json) ON mabase_prod.http_logs_raw TO data_writer; - --- Droits pour analyst (lecture seule sur les logs parsés) GRANT SELECT ON mabase_prod.http_logs TO analyst; ``` -### Format d'insertion - -Le service envoie chaque log corrélé sérialisé en JSON dans la colonne `raw_json` : +### Vérification de l'ingestion ```sql -INSERT INTO mabase_prod.http_logs_raw (raw_json) FORMAT JSONEachRow -{"raw_json":"{\"timestamp\":\"2024-01-01T12:00:00Z\",\"src_ip\":\"192.168.1.1\",\"correlated\":true,...}"} -``` +-- Données brutes reçues +SELECT count(*), min(ingest_time), max(ingest_time) FROM http_logs_raw; -### Migration des données existantes +-- Données parsées par la vue matérialisée +SELECT count(*), min(time), max(time) FROM http_logs; -Si vous avez déjà des données dans l'ancienne table `http_logs_raw` : - -```sql -INSERT INTO mabase_prod.http_logs (raw_json) -SELECT raw_json -FROM mabase_prod.http_logs_raw; -``` - -### Sanity checks - Vérification de l'ingestion - -Après avoir déployé le service, vérifiez que les données circulent correctement : - -```sql --- 1. Tables présentes -SELECT - database, - table, - engine -FROM system.tables -WHERE database = currentDatabase() - AND table IN ('http_logs_raw', 'http_logs', 'mv_http_logs'); - --- 2. Définition de la vue matérialisée -SHOW CREATE TABLE mv_http_logs; - --- 3. Vérifier que les inserts bruts arrivent -SELECT - count(*) AS rows_raw, - min(ingest_time) AS min_ingest, - max(ingest_time) AS max_ingest -FROM http_logs_raw; - --- 4. Voir les derniers logs bruts -SELECT - ingest_time, - raw_json -FROM http_logs_raw -ORDER BY ingest_time DESC -LIMIT 5; - --- 5. Vérifier que la MV alimente http_logs -SELECT - count(*) AS rows_flat, - min(time) AS min_time, - max(time) AS max_time -FROM http_logs; - --- 6. Voir les derniers logs parsés -SELECT - time, - src_ip, - dst_ip, - method, - host, - path, - header_user_agent, - tls_version, - ja4 -FROM http_logs -ORDER BY time DESC -LIMIT 10; -``` - -**Interprétation :** -- Si `rows_raw` > 0 mais `rows_flat` = 0 : la vue matérialisée ne fonctionne pas (vérifiez les droits SELECT sur `http_logs_raw`) -- Si les deux comptes sont > 0 : l'ingestion et le parsing fonctionnent correctement - -## Tests - -```bash -# Via Docker -./test.sh - -# Local -go test ./... -go test -cover ./... -go test -coverprofile=coverage.out ./... -go tool cover -html=coverage.out +-- Derniers logs +SELECT time, src_ip, dst_ip, method, host, path, ja4 +FROM http_logs ORDER BY time DESC LIMIT 10; ``` ## Signaux | Signal | Comportement | |--------|--------------| -| `SIGINT` | Arrêt gracieux | -| `SIGTERM` | Arrêt gracieux | - -Lors de l'arrêt gracieux : -1. Fermeture des sockets Unix -2. Flush des buffers -3. Émission des événements en attente -4. Fermeture des connexions ClickHouse +| `SIGINT` / `SIGTERM` | Arrêt gracieux (drain buffers, flush sinks) | +| `SIGHUP` | Réouverture des fichiers de sortie (log rotation) | ## Logs internes -Les logs internes sont envoyés vers stderr : +Les logs opérationnels vont sur **stderr** : ```bash -# Docker -docker logs -f logcorrelator - # Systemd journalctl -u logcorrelator -f + +# Docker +docker logs -f logcorrelator ``` ## Structure du projet ``` -. -├── cmd/logcorrelator/ # Point d'entrée -├── internal/ -│ ├── adapters/ -│ │ ├── inbound/unixsocket/ # Sources HTTP et réseau -│ │ └── outbound/ -│ │ ├── clickhouse/ # Sink ClickHouse -│ │ ├── file/ # Sink fichier -│ │ └── multi/ # Multi-sink -│ ├── app/ # Orchestration -│ ├── config/ # Configuration YAML -│ ├── domain/ # Domaine (corrélation) -│ ├── observability/ # Logging interne -│ └── ports/ # Interfaces -├── config.example.yml # Exemple de config -├── Dockerfile # Build multi-stage -├── Dockerfile.package # Packaging RPM -├── Makefile # Commandes de build -├── architecture.yml # Spécification architecture -└── logcorrelator.service # Unité systemd +cmd/logcorrelator/ # Point d'entrée +internal/ + adapters/ + inbound/unixsocket/ # Lecture SOCK_DGRAM → NormalizedEvent + outbound/ + clickhouse/ # Sink ClickHouse (batch, retry, logging complet) + file/ # Sink fichier (JSON lines, SIGHUP reopen) + multi/ # Fan-out vers plusieurs sinks + stdout/ # No-op pour les données (logs opérationnels sur stderr) + app/ # Orchestrator (sources → corrélation → sinks) + config/ # Chargement/validation YAML + domain/ # CorrelationService, NormalizedEvent, CorrelatedLog + observability/ # Logger, métriques, serveur HTTP /metrics /health + ports/ # Interfaces EventSource, CorrelatedLogSink, CorrelationProcessor +config.example.yml # Exemple de configuration +Dockerfile # Build multi-stage (builder, runtime, dev) +Dockerfile.package # Packaging RPM multi-distros (el8, el9, el10) +Makefile # Cibles de build +architecture.yml # Spécification architecture +logcorrelator.service # Unité systemd ``` -## License +## Débogage -MIT +### Logs DEBUG + +```yaml +log: + level: DEBUG +``` + +Exemples de logs produits : +``` +[unixsocket:http] DEBUG event received: source=A src_ip=192.168.1.1 src_port=8080 +[correlation] DEBUG processing A event: key=192.168.1.1:8080 +[correlation] DEBUG correlation found: A(src_ip=... src_port=... ts=...) + B(...) +[correlation] DEBUG A event has no matching B key in buffer: key=... +[correlation] DEBUG event excluded by IP filter: source=A src_ip=10.0.0.1 src_port=8080 +[correlation] DEBUG event excluded by dest port filter: source=A dst_port=22 +[correlation] DEBUG TTL reset for B event (Keep-Alive): key=... new_ttl=120s +[clickhouse] DEBUG batch sent: rows=42 table=http_logs_raw +``` + +### Serveur de métriques + +```yaml +metrics: + enabled: true + addr: ":8080" +``` + +`GET /health` → `{"status":"healthy"}` + +`GET /metrics` : + +```json +{ + "events_received_a": 1542, "events_received_b": 1498, + "correlations_success": 1450, "correlations_failed": 92, + "failed_no_match_key": 45, "failed_time_window": 23, + "failed_buffer_eviction": 5, "failed_ttl_expired": 12, + "failed_ip_excluded": 7, "failed_dest_port_filtered": 3, + "buffer_a_size": 23, "buffer_b_size": 18, + "orphans_emitted_a": 92, "orphans_pending_a": 4, + "keepalive_resets": 892 +} +``` + +### Diagnostic par métriques + +| Métrique élevée | Cause | Solution | +|---|---|---| +| `failed_no_match_key` | A et B n'ont pas le même `src_ip:src_port` | Vérifier les deux sources | +| `failed_time_window` | Timestamps trop éloignés | Augmenter `time_window.value` ou vérifier NTP | +| `failed_ttl_expired` | B expire avant corrélation | Augmenter `ttl.network_ttl_s` | +| `failed_buffer_eviction` | Buffers trop petits | Augmenter `buffers.max_http_items` / `max_network_items` | +| `failed_ip_excluded` | Traffic depuis IPs exclues | Normal si attendu | +| `failed_dest_port_filtered` | Traffic sur ports non listés | Vérifier `include_dest_ports` | +| `orphans_emitted_a` élevé | Beaucoup de A sans B | Vérifier que la source B envoie des événements | + +### Filtrage par IP source + +```yaml +correlation: + exclude_source_ips: + - 10.0.0.1 # IP unique (health checks) + - 172.16.0.0/12 # Plage CIDR +``` + +Les événements depuis ces IPs sont silencieusement ignorés (non corrélés, non émis en orphelin). La métrique `failed_ip_excluded` comptabilise les exclusions. + +### Filtrage par port de destination + +```yaml +correlation: + include_dest_ports: + - 80 # HTTP + - 443 # HTTPS + - 8080 + - 8443 +``` + +Si la liste est non vide, seuls les événements dont le `dst_port` est dans la liste participent à la corrélation. Les autres sont silencieusement ignorés. Liste vide = tous les ports corrélés (comportement par défaut). La métrique `failed_dest_port_filtered` comptabilise les exclusions. + +### Scripts de test + +```bash +# Script Bash (simple) +./scripts/test-correlation.sh -c 10 -v + +# Script Python (scénarios complets : basic, time window, keepalive, différentes IPs) +pip install requests +python3 scripts/test-correlation-advanced.py --all +``` ## Troubleshooting ### ClickHouse : erreurs d'insertion -**Erreur : `No such column timestamp`** -- Vérifiez que la table de destination est bien `http_logs_raw` (colonne unique `raw_json`) -- Le service envoie un JSON sérialisé dans `raw_json`, pas des colonnes séparées +- **`No such column`** : vérifier que la table utilise la colonne unique `raw_json` (pas de colonnes séparées) +- **`ACCESS_DENIED`** : `GRANT INSERT(raw_json) ON mabase_prod.http_logs_raw TO data_writer;` +- Les erreurs de flush sont loggées en ERROR dans les logs du service -**Erreur : `ACCESS_DENIED`** -- Vérifiez les droits de l'utilisateur `data_writer` : - ```sql - GRANT INSERT(raw_json) ON mabase_prod.http_logs_raw TO data_writer; - GRANT SELECT(raw_json) ON mabase_prod.http_logs_raw TO data_writer; - ``` +### Vue matérialisée vide -### Vue matérialisée ne fonctionne pas - -**Symptôme :** `http_logs_raw` a des données, mais `http_logs` est vide - -1. Vérifiez que la MV existe : - ```sql - SHOW CREATE TABLE mv_http_logs; - ``` - -2. Vérifiez les droits SELECT pour `data_writer` sur `http_logs_raw` - -3. Testez manuellement : - ```sql - INSERT INTO mabase_prod.http_logs - SELECT * FROM mabase_prod.mv_http_logs - WHERE time > now() - INTERVAL 1 HOUR; - ``` +Si `http_logs_raw` a des données mais `http_logs` est vide : +```sql +SHOW CREATE TABLE mv_http_logs; +GRANT SELECT(raw_json) ON mabase_prod.http_logs_raw TO data_writer; +``` ### Sockets Unix : permission denied -**Erreur :** `permission denied` sur `/var/run/logcorrelator/*.sock` - -- Vérifiez que les sockets ont les permissions `0666` -- Vérifiez que l'utilisateur `logcorrelator` peut lire/écrire +Vérifier que `socket_permissions: "0666"` est configuré et que le répertoire `/var/run/logcorrelator` appartient à l'utilisateur `logcorrelator`. ### Service systemd ne démarre pas -1. Vérifiez les logs : - ```bash - journalctl -u logcorrelator -n 50 --no-pager - ``` - -2. Vérifiez la configuration : - ```bash - cat /etc/logcorrelator/logcorrelator.yml - ``` - -3. Testez manuellement : - ```bash - /usr/bin/logcorrelator -config /etc/logcorrelator/logcorrelator.yml - ``` - -## Débogage de la corrélation - -### Activer les logs DEBUG - -Pour diagnostiquer les problèmes de corrélation, activez le niveau de log `DEBUG` : - -```yaml -# /etc/logcorrelator/logcorrelator.yml -log: - level: DEBUG -``` - -Les logs DEBUG affichent : -- Réception des événements A et B avec clé de corrélation et timestamp -- Tentatives de matching (succès et échecs) -- Raisons des échecs : `no_match_key`, `time_window`, `buffer_eviction`, `ttl_expired` -- Émission des orphelins (immédiats ou après délai) -- Resets TTL (mode Keep-Alive) - -Exemple de logs : -``` -[unixsocket:http] DEBUG event received: source=A src_ip=192.168.1.1 src_port=8080 timestamp=2026-03-04 11:00:00.123456789 +0000 UTC -[correlation] DEBUG processing A event: key=192.168.1.1:8080 timestamp=2026-03-04 11:00:00.123456789 +0000 UTC -[correlation] DEBUG A event has no matching B key in buffer: key=192.168.1.1:8080 -[correlation] DEBUG A event added to pending orphans (delay=500ms): src_ip=192.168.1.1 src_port=8080 -[correlation] DEBUG correlation found: A(src_ip=192.168.1.1 src_port=8080 ts=...) + B(src_ip=192.168.1.1 src_port=8080 ts=...) -``` - -### Serveur de métriques - -Le service expose un serveur HTTP optionnel pour le monitoring et le débogage. - -**Configuration :** -```yaml -metrics: - enabled: true - addr: ":8080" # Adresse d'écoute -``` - -**Endpoints :** -- `GET /metrics` - Retourne les métriques de corrélation au format JSON -- `GET /health` - Health check - -**Métriques disponibles :** - -| Métrique | Description | -|----------|-------------| -| `events_received_a` | Nombre d'événements A reçus | -| `events_received_b` | Nombre d'événements B reçus | -| `correlations_success` | Corrélations réussies | -| `correlations_failed` | Échecs de corrélation | -| `failed_no_match_key` | Échec : clé `src_ip:src_port` non trouvée | -| `failed_time_window` | Échec : hors fenêtre temporelle | -| `failed_buffer_eviction` | Échec : buffer plein | -| `failed_ttl_expired` | Échec : TTL expiré | -| `buffer_a_size` | Taille du buffer A | -| `buffer_b_size` | Taille du buffer B | -| `orphans_emitted_a` | Orphelins A émis | -| `orphans_emitted_b` | Orphelins B émis | -| `orphans_pending_a` | Orphelins A en attente | -| `pending_orphan_match` | B a corrélé avec un orphelin A en attente | -| `keepalive_resets` | Resets TTL (mode Keep-Alive) | - -**Exemple de réponse :** -```json -{ - "events_received_a": 150, - "events_received_b": 145, - "correlations_success": 140, - "correlations_failed": 10, - "failed_no_match_key": 5, - "failed_time_window": 3, - "failed_buffer_eviction": 0, - "failed_ttl_expired": 2, - "buffer_a_size": 10, - "buffer_b_size": 5, - "orphans_emitted_a": 10, - "keepalive_resets": 25 -} -``` - -### Diagnostic rapide - -Selon les métriques, identifiez la cause des échecs : - -| Métrique élevée | Cause probable | Solution | -|----------------|----------------|----------| -| `failed_no_match_key` | Les logs A et B n'ont pas le même `src_ip + src_port` | Vérifiez que les deux sources utilisent bien la même combinaison IP/port | -| `failed_time_window` | Timestamps trop éloignés (>10s par défaut) | Augmentez `correlation.time_window.value` ou vérifiez la synchronisation des horloges | -| `failed_ttl_expired` | Les événements B expirent avant corrélation | Augmentez `correlation.ttl.network_ttl_s` | -| `failed_buffer_eviction` | Buffers trop petits pour le volume | Augmentez `correlation.buffers.max_http_items` et `max_network_items` | -| `orphans_emitted_a` élevé | Beaucoup de logs A sans B correspondant | Vérifiez que la source B envoie bien les événements attendus | -| `failed_ip_excluded` élevé | Traffic depuis des IPs exclues | Vérifiez la configuration `exclude_source_ips` | - -### Exclure des IPs source - -Pour exclure certains logs en fonction de l'IP source, utilisez la configuration `exclude_source_ips` : - -```yaml -correlation: - exclude_source_ips: - - 10.0.0.1 # IP unique - - 192.168.1.100 # Autre IP unique - - 172.16.0.0/12 # Plage CIDR (réseau privé) - - 10.10.10.0/24 # Autre plage CIDR -``` - -**Cas d'usage :** -- Exclure les health checks et sondes de monitoring -- Filtrer le traffic interne connu -- Bloquer des IPs malveillantes ou indésirables - -**Comportement :** -- Les événements depuis ces IPs sont silencieusement ignorés -- Ils ne sont pas corrélés, pas émis comme orphelins -- La métrique `failed_ip_excluded` compte le nombre d'événements exclus -- Les logs DEBUG montrent : `event excluded by IP filter: source=A src_ip=10.0.0.1 src_port=8080` - -### Scripts de test - -Deux scripts sont fournis pour tester la corrélation : - -**Script Bash (simple) :** ```bash -# Test de base avec 10 paires d'événements -./scripts/test-correlation.sh -c 10 -v - -# Avec chemins de sockets personnalisés -./scripts/test-correlation.sh \ - -H /var/run/logcorrelator/http.socket \ - -N /var/run/logcorrelator/network.socket \ - -m http://localhost:8080/metrics +journalctl -u logcorrelator -n 50 --no-pager +/usr/bin/logcorrelator -config /etc/logcorrelator/logcorrelator.yml ``` -**Script Python (avancé) :** -```bash -# Installation des dépendances -pip install requests +## License -# Test de base -python3 scripts/test-correlation-advanced.py -c 20 -v - -# Tous les tests (basic, time window, different IP, keepalive) -python3 scripts/test-correlation-advanced.py --all - -# Test spécifique -python3 scripts/test-correlation-advanced.py --time-window -python3 scripts/test-correlation-advanced.py --keepalive -``` - -**Prérequis :** -- `socat` ou `nc` (netcat) pour le script Bash -- Python 3.6+ et `requests` pour le script Python -- Le service `logcorrelator` doit être en cours d'exécution -- Le serveur de métriques doit être activé pour les vérifications automatiques +MIT diff --git a/architecture.yml b/architecture.yml index 3871501..845d11f 100644 --- a/architecture.yml +++ b/architecture.yml @@ -412,6 +412,21 @@ correlation: description: > Stratégie 1‑à‑N : un log réseau peut être utilisé pour plusieurs logs HTTP successifs tant qu'il n'a pas expiré ni été évincé. + ip_filtering: + directive: exclude_source_ips + description: > + Liste d'IPs source (exactes ou plages CIDR) à ignorer silencieusement. + Événements non corrélés, non émis en orphelin. Métrique : failed_ip_excluded. + dest_port_filtering: + directive: include_dest_ports + description: > + Liste blanche de ports de destination. Si non vide, seuls les événements + dont le dst_port est dans la liste participent à la corrélation. Les autres + sont silencieusement ignorés (non corrélés, non émis en orphelin). + Liste vide = tous les ports autorisés (comportement par défaut). + Métrique : failed_dest_port_filtered. + example: + include_dest_ports: [80, 443, 8080, 8443] schema: description: > @@ -708,6 +723,8 @@ architecture: responsibilities: - Modèles NormalizedEvent et CorrelatedLog. - CorrelationService (fenêtre, TTL, buffers bornés, one-to-many/Keep-Alive, orphelins). + - Filtrage par IP source (exclude_source_ips, CIDR). + - Filtrage par port destination (include_dest_ports, liste blanche). - Custom JSON marshaling pour CorrelatedLog (structure plate). - name: internal/ports type: ports @@ -849,6 +866,7 @@ observability: - "A event has no matching B key in buffer: key=..." - "A event has same key as B but outside time window: key=... time_diff=5s window=10s" - "event excluded by IP filter: source=A src_ip=10.0.0.1 src_port=8080" + - "event excluded by dest port filter: source=A dst_port=22" - "TTL reset for B event (Keep-Alive): key=... new_ttl=120s" - "[clickhouse] DEBUG batch sent: rows=42 table=correlated_logs_http_network" info_logs: @@ -877,6 +895,7 @@ observability: "failed_buffer_eviction": 5, "failed_ttl_expired": 12, "failed_ip_excluded": 7, + "failed_dest_port_filtered": 3, "buffer_a_size": 23, "buffer_b_size": 18, "orphans_emitted_a": 92, @@ -900,6 +919,7 @@ observability: - failed_buffer_eviction: Buffer plein, événement évincé - failed_ttl_expired: TTL du événement B expiré - failed_ip_excluded: Événement exclu par filtre IP (exclude_source_ips) + - failed_dest_port_filtered: Événement exclu par filtre port destination (include_dest_ports) buffers: - buffer_a_size: Taille actuelle du buffer HTTP - buffer_b_size: Taille actuelle du buffer réseau @@ -929,6 +949,9 @@ observability: - symptom: failed_ip_excluded élevé cause: Traffic depuis des IPs configurées dans exclude_source_ips solution: Vérifier la configuration, c'est normal si attendu + - symptom: failed_dest_port_filtered élevé + cause: Traffic sur des ports non listés dans include_dest_ports + solution: Vérifier la configuration include_dest_ports, ou vider la liste pour tout accepter - symptom: orphans_emitted_a élevé cause: Beaucoup de logs A sans correspondance B solution: Vérifier que la source B envoie bien les événements attendus diff --git a/cmd/logcorrelator/main.go b/cmd/logcorrelator/main.go index 3fb189b..119f05a 100644 --- a/cmd/logcorrelator/main.go +++ b/cmd/logcorrelator/main.go @@ -115,6 +115,7 @@ func main() { NetworkTTLS: cfg.Correlation.GetNetworkTTLS(), MatchingMode: cfg.Correlation.GetMatchingMode(), ExcludeSourceIPs: cfg.Correlation.GetExcludeSourceIPs(), + IncludeDestPorts: cfg.Correlation.GetIncludeDestPorts(), }, &domain.RealTimeProvider{}) // Set logger for correlation service diff --git a/config.example.yml b/config.example.yml index 3ff1e20..5e83cf1 100644 --- a/config.example.yml +++ b/config.example.yml @@ -36,7 +36,6 @@ outputs: stdout: enabled: false - level: INFO # DEBUG: all logs including orphans, INFO: only correlated, WARN: correlated only, ERROR: none correlation: # Time window for correlation (A and B must be within this window) @@ -74,6 +73,16 @@ correlation: - 172.16.0.0/12 # CIDR range (private network) - 10.10.10.0/24 # Another CIDR range + # Restrict correlation to specific destination ports (optional) + # If non-empty, only events whose dst_port matches one of these values will be correlated + # Events on other ports are silently ignored (not correlated, not emitted as orphans) + # Useful to focus on HTTP/HTTPS traffic only and ignore unrelated connections + # include_dest_ports: + # - 80 # HTTP + # - 443 # HTTPS + # - 8080 # HTTP alt + # - 8443 # HTTPS alt + # Metrics server configuration (optional, for debugging/monitoring) metrics: enabled: false diff --git a/internal/config/config.go b/internal/config/config.go index 848d790..cc35356 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -98,7 +98,8 @@ type CorrelationConfig struct { Matching MatchingConfig `yaml:"matching"` Buffers BuffersConfig `yaml:"buffers"` TTL TTLConfig `yaml:"ttl"` - ExcludeSourceIPs []string `yaml:"exclude_source_ips"` // List of source IPs or CIDR ranges to exclude + ExcludeSourceIPs []string `yaml:"exclude_source_ips"` // List of source IPs or CIDR ranges to exclude + IncludeDestPorts []int `yaml:"include_dest_ports"` // If non-empty, only correlate events matching these destination ports // Deprecated: Use TimeWindow.Value instead TimeWindowS int `yaml:"time_window_s"` // Deprecated: Use OrphanPolicy.ApacheAlwaysEmit instead @@ -351,6 +352,12 @@ func (c *UnixSocketConfig) GetSocketPermissions() os.FileMode { return os.FileMode(perms) } +// GetIncludeDestPorts returns the list of destination ports allowed for correlation. +// An empty list means all ports are allowed. +func (c *CorrelationConfig) GetIncludeDestPorts() []int { + return c.IncludeDestPorts +} + // GetExcludeSourceIPs returns the list of excluded source IPs or CIDR ranges. func (c *CorrelationConfig) GetExcludeSourceIPs() []string { return c.ExcludeSourceIPs diff --git a/internal/domain/correlation_service.go b/internal/domain/correlation_service.go index 9b85cb2..8987619 100644 --- a/internal/domain/correlation_service.go +++ b/internal/domain/correlation_service.go @@ -42,8 +42,9 @@ type CorrelationConfig struct { MaxHTTPBufferSize int // Maximum events to buffer for source A (HTTP) MaxNetworkBufferSize int // Maximum events to buffer for source B (Network) NetworkTTLS int // TTL in seconds for network events (source B) - MatchingMode string // "one_to_one" or "one_to_many" (Keep-Alive) + MatchingMode string // "one_to_one" or "one_to_many" (Keep-Alive) ExcludeSourceIPs []string // List of source IPs or CIDR ranges to exclude + IncludeDestPorts []int // If non-empty, only correlate events matching these destination ports } // pendingOrphan represents an A event waiting to be emitted as orphan. @@ -181,6 +182,20 @@ func (s *CorrelationService) isIPExcluded(ip string) bool { return false } +// isDestPortFiltered returns true if the event's destination port is NOT in the +// include list. When IncludeDestPorts is empty, all ports are allowed. +func (s *CorrelationService) isDestPortFiltered(port int) bool { + if len(s.config.IncludeDestPorts) == 0 { + return false // no filter configured: allow all + } + for _, p := range s.config.IncludeDestPorts { + if p == port { + return false // port is in the allow-list + } + } + return true // port not in allow-list: filter out +} + // ProcessEvent processes an incoming event and returns correlated logs if matches are found. func (s *CorrelationService) ProcessEvent(event *NormalizedEvent) []CorrelatedLog { s.mu.Lock() @@ -194,6 +209,14 @@ func (s *CorrelationService) ProcessEvent(event *NormalizedEvent) []CorrelatedLo return nil } + // Check if destination port is in the allow-list + if s.isDestPortFiltered(event.DstPort) { + s.logger.Debugf("event excluded by dest port filter: source=%s dst_port=%d", + event.Source, event.DstPort) + s.metrics.RecordCorrelationFailed("dest_port_filtered") + return nil + } + // Record event received s.metrics.RecordEventReceived(string(event.Source)) diff --git a/internal/domain/correlation_service_test.go b/internal/domain/correlation_service_test.go index 964fced..f2e931d 100644 --- a/internal/domain/correlation_service_test.go +++ b/internal/domain/correlation_service_test.go @@ -1317,3 +1317,100 @@ func TestCorrelationService_ApacheEmitDelay_Flush(t *testing.T) { } } + +func TestCorrelationService_IncludeDestPorts_AllowedPort(t *testing.T) { +now := time.Now() +mt := &mockTimeProvider{now: now} +svc := NewCorrelationService(CorrelationConfig{ +TimeWindow: 10 * time.Second, +ApacheAlwaysEmit: true, +MatchingMode: MatchingModeOneToMany, +IncludeDestPorts: []int{80, 443}, +}, mt) + +// B event on allowed port 443 +bEvent := &NormalizedEvent{ +Source: SourceB, Timestamp: now, +SrcIP: "1.2.3.4", SrcPort: 1234, DstPort: 443, +} +results := svc.ProcessEvent(bEvent) +if len(results) != 0 { +t.Fatalf("expected B buffered (no match yet), got %d results", len(results)) +} + +// A event on same key, allowed port +aEvent := &NormalizedEvent{ +Source: SourceA, Timestamp: now, +SrcIP: "1.2.3.4", SrcPort: 1234, DstPort: 443, +} +results = svc.ProcessEvent(aEvent) +if len(results) != 1 { +t.Fatalf("expected 1 correlation, got %d", len(results)) +} +if !results[0].Correlated { +t.Error("expected correlated=true") +} +} + +func TestCorrelationService_IncludeDestPorts_FilteredPort(t *testing.T) { +now := time.Now() +mt := &mockTimeProvider{now: now} +svc := NewCorrelationService(CorrelationConfig{ +TimeWindow: 10 * time.Second, +ApacheAlwaysEmit: true, +MatchingMode: MatchingModeOneToMany, +IncludeDestPorts: []int{80, 443}, +}, mt) + +// A event on port 22 (not in allow-list) +aEvent := &NormalizedEvent{ +Source: SourceA, Timestamp: now, +SrcIP: "1.2.3.4", SrcPort: 1234, DstPort: 22, +} +results := svc.ProcessEvent(aEvent) +if len(results) != 0 { +t.Fatalf("expected 0 results (filtered), got %d", len(results)) +} + +// B event on port 22 (not in allow-list) +bEvent := &NormalizedEvent{ +Source: SourceB, Timestamp: now, +SrcIP: "1.2.3.4", SrcPort: 1234, DstPort: 22, +} +results = svc.ProcessEvent(bEvent) +if len(results) != 0 { +t.Fatalf("expected 0 results (filtered), got %d", len(results)) +} + +// Flush should also return nothing +flushed := svc.Flush() +if len(flushed) != 0 { +t.Errorf("expected 0 flushed events, got %d", len(flushed)) +} +} + +func TestCorrelationService_IncludeDestPorts_EmptyAllowsAll(t *testing.T) { +now := time.Now() +mt := &mockTimeProvider{now: now} +// No IncludeDestPorts = all ports allowed +svc := NewCorrelationService(CorrelationConfig{ +TimeWindow: 10 * time.Second, +ApacheAlwaysEmit: true, +MatchingMode: MatchingModeOneToMany, +}, mt) + +bEvent := &NormalizedEvent{ +Source: SourceB, Timestamp: now, +SrcIP: "1.2.3.4", SrcPort: 1234, DstPort: 9999, +} +svc.ProcessEvent(bEvent) + +aEvent := &NormalizedEvent{ +Source: SourceA, Timestamp: now, +SrcIP: "1.2.3.4", SrcPort: 1234, DstPort: 9999, +} +results := svc.ProcessEvent(aEvent) +if len(results) != 1 || !results[0].Correlated { +t.Errorf("expected 1 correlation on any port when list is empty, got %d", len(results)) +} +} diff --git a/packaging/rpm/logcorrelator.spec b/packaging/rpm/logcorrelator.spec index b913e48..91b9a6e 100644 --- a/packaging/rpm/logcorrelator.spec +++ b/packaging/rpm/logcorrelator.spec @@ -141,6 +141,16 @@ exit 0 %config(noreplace) /etc/logrotate.d/logcorrelator %changelog +* Thu Mar 05 2026 logcorrelator - 1.1.12-1 +- Feat: New config directive include_dest_ports - restrict correlation to specific destination ports +- Feat: If include_dest_ports is non-empty, events on unlisted ports are silently ignored (not correlated, not emitted as orphan) +- Feat: New metric failed_dest_port_filtered for monitoring filtered traffic +- Feat: Debug log for filtered events: "event excluded by dest port filter: source=A dst_port=22" +- Test: New unit tests for include_dest_ports (allowed port, filtered port, empty=all) +- Docs: README.md updated with include_dest_ports section and current version references +- Docs: architecture.yml updated with include_dest_ports +- Fix: config.example.yml - removed obsolete stdout.level field + * Thu Mar 05 2026 logcorrelator - 1.1.11-1 - Fix: StdoutSink no longer writes correlated/orphan JSON to stdout - Fix: stdout sink is now a no-op for data; operational logs go to stderr via logger