feat(correlation): add include_dest_ports filter + README/arch update (v1.1.12)

- feat: new config directive include_dest_ports ([]int) in correlation section
- feat: if non-empty, only events with a matching dst_port are correlated
- feat: filtered events are silently ignored (not correlated, not emitted as orphan)
- feat: new metric failed_dest_port_filtered tracked in ProcessEvent
- feat: DEBUG log 'event excluded by dest port filter: source=A dst_port=22'
- test: TestCorrelationService_IncludeDestPorts_AllowedPort
- test: TestCorrelationService_IncludeDestPorts_FilteredPort
- test: TestCorrelationService_IncludeDestPorts_EmptyAllowsAll
- docs(readme): full rewrite to match current code (v1.1.12)
- docs(readme): add include_dest_ports section, fix version refs, clean outdated sections
- docs(arch): add dest_port_filtering section, failed_dest_port_filtered metric, debug log example
- fix(config.example): remove obsolete stdout.level field
- chore: bump version to 1.1.12

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Author: toto
Date: 2026-03-05 13:51:20 +01:00
Commit: a8e024105d (parent: ba9e0ab477)
9 changed files with 454 additions and 544 deletions


@@ -20,7 +20,7 @@ BINARY_NAME=logcorrelator
 DIST_DIR=dist
 # Package version
-PKG_VERSION ?= 1.1.11
+PKG_VERSION ?= 1.1.12
 # Enable BuildKit for better performance
 export DOCKER_BUILDKIT=1

README.md

@@ -4,152 +4,146 @@ HTTP and network log correlation service written in Go.

## Description

**logcorrelator** receives two JSON log streams over Unix datagram sockets (SOCK_DGRAM):

- **Source A**: application HTTP logs (Apache, reverse proxy)
- **Source B**: network logs (IP/TCP metadata, JA3/JA4, etc.)

It correlates events on `src_ip + src_port` within a configurable time window and writes the correlated logs to:

- A local file (JSON lines)
- ClickHouse (for analysis and archiving)

The service's operational logs (startup, errors, metrics) go to **stderr** and are collected by journald. No correlated data is written to stdout.

## Architecture
```
┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│    Source A     │────▶│                  │────▶│    File Sink    │
│   HTTP/Apache   │     │   Correlation    │     │  (JSON lines)   │
│  (Unix DGRAM)   │     │     Service      │     └─────────────────┘
└─────────────────┘     │                  │
                        │  - Buffers       │     ┌─────────────────┐
┌─────────────────┐     │  - Time Window   │────▶│   ClickHouse    │
│    Source B     │────▶│  - Orphan Policy │     │      Sink       │
│   Network/JA4   │     │  - Keep-Alive    │     └─────────────────┘
│  (Unix DGRAM)   │     └──────────────────┘
└─────────────────┘
```
Hexagonal architecture: pure domain (`internal/domain`), abstract ports (`internal/ports`), adapters (`internal/adapters`), orchestration (`internal/app`).
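The matching rule described above (same `src_ip + src_port` key, timestamps within the configured window) can be sketched in a few lines. This is an illustrative model with made-up helper names, not the service's actual Go code:

```python
from datetime import datetime, timedelta

# Hypothetical model of the correlation rule: an A and a B event match
# when they share the src_ip:src_port key and their timestamps fall
# within the configured time window.
def correlation_key(event: dict) -> str:
    return f"{event['src_ip']}:{event['src_port']}"

def within_window(a_ts: datetime, b_ts: datetime, window: timedelta) -> bool:
    return abs(a_ts - b_ts) <= window

a = {"src_ip": "192.168.1.1", "src_port": 8080,
     "ts": datetime(2024, 1, 1, 12, 0, 0)}
b = {"src_ip": "192.168.1.1", "src_port": 8080,
     "ts": datetime(2024, 1, 1, 12, 0, 4)}

key_match = correlation_key(a) == correlation_key(b)
time_match = within_window(a["ts"], b["ts"], timedelta(seconds=10))
print(key_match and time_match)  # True: same key, 4 s apart, 10 s window
```

With a 2-second window the same pair would fail with reason `time_window`, which is what the `failed_time_window` metric counts.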
## Build (100% Docker)

All builds, tests, and RPM packaging run inside containers:

```bash
# Full build with tests (builder stage)
make docker-build-dev

# RPM packaging (el8, el9, el10)
make package-rpm

# Fast build without tests
make docker-build-dev-no-test

# Run the tests locally (requires Go 1.21+)
make test
```

### Prerequisites

- Docker 20.10+
- Bash
## Installation

### RPM packages

```bash
# Generate the packages
make package-rpm

# Install (Rocky Linux / AlmaLinux)
sudo dnf install -y dist/rpm/el8/logcorrelator-1.1.12-1.el8.x86_64.rpm
sudo dnf install -y dist/rpm/el9/logcorrelator-1.1.12-1.el9.x86_64.rpm
sudo dnf install -y dist/rpm/el10/logcorrelator-1.1.12-1.el10.x86_64.rpm

# Start
sudo systemctl enable --now logcorrelator
sudo systemctl status logcorrelator
```

### Manual build

```bash
# Local binary (requires Go 1.21+)
go build -o logcorrelator ./cmd/logcorrelator
./logcorrelator -config config.example.yml
```
## Configuration

YAML file. See `config.example.yml` for a complete example.

```yaml
# /etc/logcorrelator/logcorrelator.yml
log:
  level: INFO   # DEBUG, INFO, WARN, ERROR

inputs:
  unix_sockets:
    # HTTP source (A): application logs in JSON
    - name: http
      source_type: A          # HTTP source
      path: /var/run/logcorrelator/http.socket
      format: json
      socket_permissions: "0666"
      socket_type: dgram
    # Network source (B): IP/TCP/JA3... logs in JSON
    - name: network
      source_type: B          # Network source
      path: /var/run/logcorrelator/network.socket
      format: json
      socket_permissions: "0666"
      socket_type: dgram

outputs:
  file:
    enabled: true
    path: /var/log/logcorrelator/correlated.log
    format: json_lines
  clickhouse:
    enabled: false
    dsn: clickhouse://user:pass@localhost:9000/db
    table: http_logs_raw
    batch_size: 500
    flush_interval_ms: 200
    max_buffer_size: 5000
    drop_on_overflow: true
    timeout_ms: 1000
  stdout:
    enabled: false   # no-op for data; operational logs always go to stderr

correlation:
  time_window:
    value: 10
    unit: s
  orphan_policy:
    apache_always_emit: true
    apache_emit_delay_ms: 500   # delay before emitting an A orphan (ms)
    network_emit: false
  matching:
    mode: one_to_many   # Keep-Alive: one B can correlate several successive A events
  buffers:
    max_http_items: 10000
    max_network_items: 20000
  ttl:
    network_ttl_s: 120   # TTL reset on each correlation (Keep-Alive)
  # Exclude source IPs (single IPs or CIDR ranges)
  exclude_source_ips:
    - 10.0.0.1
    - 172.16.0.0/12
  # Restrict correlation to specific destination ports (optional)
  # If the list is empty, all ports are correlated
  include_dest_ports:
    - 80
    - 443

metrics:
  enabled: false
  addr: ":8080"
```
### ClickHouse DSN format

@@ -158,11 +152,7 @@ correlation:

```
clickhouse://username:password@host:port/database
```

Ports: `9000` (native, recommended) or `8123` (HTTP).
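The DSN follows standard URL syntax, so its parts can be checked with a stock URL parser. A quick illustrative sanity check (the credentials are placeholders):

```python
from urllib.parse import urlparse

# Parse a ClickHouse DSN of the documented shape:
# clickhouse://username:password@host:port/database
dsn = "clickhouse://data_writer:secret@127.0.0.1:9000/mabase_prod"
u = urlparse(dsn)
print(u.scheme, u.username, u.hostname, u.port, u.path.lstrip("/"))
# clickhouse data_writer 127.0.0.1 9000 mabase_prod
```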
## Log formats

@@ -170,14 +160,10 @@ Common ports:

```json
{
  "src_ip": "192.168.1.1", "src_port": 8080,
  "dst_ip": "10.0.0.1", "dst_port": 443,
  "timestamp": 1704110400000000000,
  "method": "GET", "path": "/api/test"
}
```
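Since the inputs are SOCK_DGRAM sockets, one datagram carries one JSON event, so a test event can be pushed with a few lines of Python. The socket path is the one from the example configuration; this is a test sketch, not an official client:

```python
import json
import os
import socket
import time

SOCK = "/var/run/logcorrelator/http.socket"  # path from the example config

# Build one Source A test event (nanosecond timestamp, as documented)
event = {
    "src_ip": "192.168.1.1", "src_port": 8080,
    "dst_ip": "10.0.0.1", "dst_port": 443,
    "timestamp": time.time_ns(),
    "method": "GET", "path": "/api/test",
}
payload = json.dumps(event).encode()

# One datagram = one event; send only if the service socket is present
if os.path.exists(SOCK):
    with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as s:
        s.sendto(payload, SOCK)
```

Sending to the network source works the same way against `/var/run/logcorrelator/network.socket`.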
@@ -185,49 +171,40 @@ Common ports:

```json
{
  "src_ip": "192.168.1.1", "src_port": 8080,
  "dst_ip": "10.0.0.1", "dst_port": 443,
  "ja3": "abc123", "ja4": "xyz789"
}
```
### Correlated log (output)

Flat JSON structure: all A and B fields are merged at the root:

```json
{
  "timestamp": "2024-01-01T12:00:00Z",
  "src_ip": "192.168.1.1", "src_port": 8080,
  "dst_ip": "10.0.0.1", "dst_port": 443,
  "correlated": true,
  "method": "GET", "path": "/api/test",
  "ja3": "abc123", "ja4": "xyz789"
}
```

The correlation fields (`src_ip`, `src_port`, `dst_ip`, `dst_port`, `correlated`, `orphan_side`) are always present, and every other field from the source logs is added directly at the root. When a field name collides between A and B, both values are kept under `a_` and `b_` prefixes.

A orphans (with no matching B) are emitted with `"correlated": false, "orphan_side": "A"`.
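The flat-merge rule, including the `a_`/`b_` prefixes on collisions, can be modeled as below. This is an illustrative sketch of the documented behavior (it assumes that equal colliding values are kept once, which the text does not specify), not the service's code:

```python
# Merge A and B fields at the root; correlation fields come from A,
# and colliding extra fields keep both values under a_/b_ prefixes.
CORRELATION_FIELDS = {"src_ip", "src_port", "dst_ip", "dst_port"}

def flat_merge(a: dict, b: dict) -> dict:
    out = {k: a[k] for k in CORRELATION_FIELDS if k in a}
    out["correlated"] = True
    extra_a = {k: v for k, v in a.items() if k not in CORRELATION_FIELDS}
    extra_b = {k: v for k, v in b.items() if k not in CORRELATION_FIELDS}
    for k, v in extra_a.items():
        if k in extra_b and extra_b[k] != v:
            out["a_" + k] = v          # collision: keep the A value prefixed
        else:
            out[k] = v
    for k, v in extra_b.items():
        if k in extra_a and extra_a[k] != v:
            out["b_" + k] = v          # collision: keep the B value prefixed
        else:
            out[k] = v
    return out

a = {"src_ip": "192.168.1.1", "src_port": 8080, "method": "GET", "timestamp": 1}
b = {"src_ip": "192.168.1.1", "src_port": 8080, "ja4": "xyz789", "timestamp": 2}
merged = flat_merge(a, b)
print(merged["a_timestamp"], merged["b_timestamp"], merged["ja4"])  # 1 2 xyz789
```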
## ClickHouse schema

### Full setup

```sql
-- Database
CREATE DATABASE IF NOT EXISTS mabase_prod;

-- Raw table (target of the service's inserts)
CREATE TABLE mabase_prod.http_logs_raw
(
    raw_json String,
```

@@ -237,86 +214,47 @@ ENGINE = MergeTree

```sql
PARTITION BY toDate(ingest_time)
ORDER BY ingest_time;

-- Parsed table
CREATE TABLE mabase_prod.http_logs
(
    time DateTime,
    log_date Date DEFAULT toDate(time),
    src_ip IPv4, src_port UInt16,
    dst_ip IPv4, dst_port UInt16,
    method LowCardinality(String),
    scheme LowCardinality(String),
    host LowCardinality(String),
    path String, query String,
    http_version LowCardinality(String),
    orphan_side LowCardinality(String),
    correlated UInt8,
    tls_version LowCardinality(String),
    tls_sni LowCardinality(String),
    ja3 String, ja3_hash String, ja4 String,
    header_user_agent String, header_accept String,
    header_accept_encoding String, header_accept_language String,
    header_x_forwarded_for String,
    header_sec_ch_ua String, header_sec_ch_ua_mobile String,
    header_sec_ch_ua_platform String,
    header_sec_fetch_dest String, header_sec_fetch_mode String, header_sec_fetch_site String,
    ip_meta_ttl UInt8, ip_meta_df UInt8,
    tcp_meta_window_size UInt32, tcp_meta_options LowCardinality(String),
    syn_to_clienthello_ms Int32
)
ENGINE = MergeTree
PARTITION BY log_date
ORDER BY (time, src_ip, dst_ip, ja4);

-- Materialized view RAW → http_logs
CREATE MATERIALIZED VIEW mabase_prod.mv_http_logs
TO mabase_prod.http_logs AS
SELECT
    parseDateTimeBestEffort(coalesce(JSONExtractString(raw_json,'time'),'1970-01-01T00:00:00Z')) AS time,
    toDate(time) AS log_date,
    toIPv4(coalesce(JSONExtractString(raw_json,'src_ip'),'0.0.0.0')) AS src_ip,
    toUInt16(JSONExtractUInt(raw_json,'src_port')) AS src_port,
    toIPv4(coalesce(JSONExtractString(raw_json,'dst_ip'),'0.0.0.0')) AS dst_ip,
    toUInt16(JSONExtractUInt(raw_json,'dst_port')) AS dst_port,
    coalesce(JSONExtractString(raw_json,'method'),'') AS method,
    coalesce(JSONExtractString(raw_json,'scheme'),'') AS scheme,
    coalesce(JSONExtractString(raw_json,'host'),'') AS host,
```

@@ -324,418 +262,220 @@ SELECT

```sql
    coalesce(JSONExtractString(raw_json,'query'),'') AS query,
    coalesce(JSONExtractString(raw_json,'http_version'),'') AS http_version,
    coalesce(JSONExtractString(raw_json,'orphan_side'),'') AS orphan_side,
    toUInt8(JSONExtractBool(raw_json,'correlated')) AS correlated,
    coalesce(JSONExtractString(raw_json,'tls_version'),'') AS tls_version,
    coalesce(JSONExtractString(raw_json,'tls_sni'),'') AS tls_sni,
    coalesce(JSONExtractString(raw_json,'ja3'),'') AS ja3,
    coalesce(JSONExtractString(raw_json,'ja3_hash'),'') AS ja3_hash,
    coalesce(JSONExtractString(raw_json,'ja4'),'') AS ja4,
    coalesce(JSONExtractString(raw_json,'header_User-Agent'),'') AS header_user_agent,
    coalesce(JSONExtractString(raw_json,'header_Accept'),'') AS header_accept,
    coalesce(JSONExtractString(raw_json,'header_Accept-Encoding'),'') AS header_accept_encoding,
    coalesce(JSONExtractString(raw_json,'header_Accept-Language'),'') AS header_accept_language,
    coalesce(JSONExtractString(raw_json,'header_X-Forwarded-For'),'') AS header_x_forwarded_for,
    coalesce(JSONExtractString(raw_json,'header_Sec-CH-UA'),'') AS header_sec_ch_ua,
    coalesce(JSONExtractString(raw_json,'header_Sec-CH-UA-Mobile'),'') AS header_sec_ch_ua_mobile,
    coalesce(JSONExtractString(raw_json,'header_Sec-CH-UA-Platform'),'') AS header_sec_ch_ua_platform,
    coalesce(JSONExtractString(raw_json,'header_Sec-Fetch-Dest'),'') AS header_sec_fetch_dest,
    coalesce(JSONExtractString(raw_json,'header_Sec-Fetch-Mode'),'') AS header_sec_fetch_mode,
    coalesce(JSONExtractString(raw_json,'header_Sec-Fetch-Site'),'') AS header_sec_fetch_site,
    toUInt8(JSONExtractUInt(raw_json,'ip_meta_ttl')) AS ip_meta_ttl,
    toUInt8(JSONExtractBool(raw_json,'ip_meta_df')) AS ip_meta_df,
    toUInt32(JSONExtractUInt(raw_json,'tcp_meta_window_size')) AS tcp_meta_window_size,
    coalesce(JSONExtractString(raw_json,'tcp_meta_options'),'') AS tcp_meta_options,
    toInt32(JSONExtractInt(raw_json,'syn_to_clienthello_ms')) AS syn_to_clienthello_ms
FROM mabase_prod.http_logs_raw;
```
### Users and permissions

```sql
CREATE USER IF NOT EXISTS data_writer IDENTIFIED WITH plaintext_password BY 'MotDePasse';
CREATE USER IF NOT EXISTS analyst IDENTIFIED WITH plaintext_password BY 'MotDePasseAnalyst';

GRANT INSERT(raw_json) ON mabase_prod.http_logs_raw TO data_writer;
GRANT SELECT(raw_json) ON mabase_prod.http_logs_raw TO data_writer;

GRANT SELECT ON mabase_prod.http_logs TO analyst;
```
### Checking ingestion

```sql
-- Raw data received
SELECT count(*), min(ingest_time), max(ingest_time) FROM http_logs_raw;

-- Data parsed by the materialized view
SELECT count(*), min(time), max(time) FROM http_logs;

-- Latest logs
SELECT time, src_ip, dst_ip, method, host, path, ja4
FROM http_logs ORDER BY time DESC LIMIT 10;
```
## Signals

| Signal | Behavior |
|--------|----------|
| `SIGINT` / `SIGTERM` | Graceful shutdown (drain buffers, flush sinks) |
| `SIGHUP` | Reopen output files (log rotation) |
## Internal logs

Operational logs go to **stderr**:

```bash
# Systemd
journalctl -u logcorrelator -f

# Docker
docker logs -f logcorrelator
```
## Project structure

```
cmd/logcorrelator/          # Entry point
internal/
  adapters/
    inbound/unixsocket/     # SOCK_DGRAM reading → NormalizedEvent
    outbound/
      clickhouse/           # ClickHouse sink (batching, retries, full logging)
      file/                 # File sink (JSON lines, SIGHUP reopen)
      multi/                # Fan-out to several sinks
      stdout/               # No-op for data (operational logs on stderr)
  app/                      # Orchestrator (sources → correlation → sinks)
  config/                   # YAML loading/validation
  domain/                   # CorrelationService, NormalizedEvent, CorrelatedLog
  observability/            # Logger, metrics, HTTP /metrics /health server
  ports/                    # EventSource, CorrelatedLogSink, CorrelationProcessor interfaces
config.example.yml          # Example configuration
Dockerfile                  # Multi-stage build (builder, runtime, dev)
Dockerfile.package          # Multi-distro RPM packaging (el8, el9, el10)
Makefile                    # Build targets
architecture.yml            # Architecture specification
logcorrelator.service       # Systemd unit
```
## Debugging

### DEBUG logs

```yaml
log:
  level: DEBUG
```

Examples of the logs produced:

```
[unixsocket:http] DEBUG event received: source=A src_ip=192.168.1.1 src_port=8080
[correlation] DEBUG processing A event: key=192.168.1.1:8080
[correlation] DEBUG correlation found: A(src_ip=... src_port=... ts=...) + B(...)
[correlation] DEBUG A event has no matching B key in buffer: key=...
[correlation] DEBUG event excluded by IP filter: source=A src_ip=10.0.0.1 src_port=8080
[correlation] DEBUG event excluded by dest port filter: source=A dst_port=22
[correlation] DEBUG TTL reset for B event (Keep-Alive): key=... new_ttl=120s
[clickhouse] DEBUG batch sent: rows=42 table=http_logs_raw
```
### Metrics server

```yaml
metrics:
  enabled: true
  addr: ":8080"
```

`GET /health` returns `{"status":"healthy"}`.

`GET /metrics`:

```json
{
  "events_received_a": 1542, "events_received_b": 1498,
  "correlations_success": 1450, "correlations_failed": 92,
  "failed_no_match_key": 45, "failed_time_window": 23,
  "failed_buffer_eviction": 5, "failed_ttl_expired": 12,
  "failed_ip_excluded": 7, "failed_dest_port_filtered": 3,
  "buffer_a_size": 23, "buffer_b_size": 18,
  "orphans_emitted_a": 92, "orphans_pending_a": 4,
  "keepalive_resets": 892
}
```
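One way to use these counters is to poll `/metrics` and derive a failure rate. A small monitoring sketch (the endpoint and field names are the documented ones; the helper function is hypothetical):

```python
import json
import urllib.request

def correlation_failure_rate(metrics: dict) -> float:
    """Share of correlation attempts that failed, from /metrics counters."""
    ok = metrics.get("correlations_success", 0)
    ko = metrics.get("correlations_failed", 0)
    total = ok + ko
    return ko / total if total else 0.0

# With the sample payload documented above:
sample = {"correlations_success": 1450, "correlations_failed": 92}
print(round(correlation_failure_rate(sample), 3))  # 0.06

# Against a live service (metrics server enabled on :8080):
# with urllib.request.urlopen("http://localhost:8080/metrics") as r:
#     print(correlation_failure_rate(json.load(r)))
```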
### Diagnosing with metrics

| High metric | Cause | Fix |
|---|---|---|
| `failed_no_match_key` | A and B do not share the same `src_ip:src_port` | Check both sources |
| `failed_time_window` | Timestamps too far apart | Increase `time_window.value` or check NTP |
| `failed_ttl_expired` | B expires before correlation | Increase `ttl.network_ttl_s` |
| `failed_buffer_eviction` | Buffers too small | Increase `buffers.max_http_items` / `max_network_items` |
| `failed_ip_excluded` | Traffic from excluded IPs | Normal if expected |
| `failed_dest_port_filtered` | Traffic on unlisted ports | Check `include_dest_ports` |
| High `orphans_emitted_a` | Many A events without a B | Check that source B is sending events |
### Source IP filtering

```yaml
correlation:
  exclude_source_ips:
    - 10.0.0.1        # Single IP (health checks)
    - 172.16.0.0/12   # CIDR range
```

Events from these IPs are silently ignored (not correlated, not emitted as orphans). The `failed_ip_excluded` metric counts the exclusions.
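Mixing single IPs and CIDR ranges maps directly onto Python's standard `ipaddress` module. An illustrative check of the exclusion rule, not the service's implementation:

```python
import ipaddress

# Exclusion list as in the configuration: single IPs and CIDR ranges
EXCLUDE = [ipaddress.ip_network(n) for n in ("10.0.0.1", "172.16.0.0/12")]

def is_excluded(src_ip: str) -> bool:
    addr = ipaddress.ip_address(src_ip)
    return any(addr in net for net in EXCLUDE)

print(is_excluded("10.0.0.1"))     # True  (exact IP)
print(is_excluded("172.20.1.5"))   # True  (inside 172.16.0.0/12)
print(is_excluded("192.168.1.1"))  # False
```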
### Destination port filtering

```yaml
correlation:
  include_dest_ports:
    - 80     # HTTP
    - 443    # HTTPS
    - 8080
    - 8443
```

If the list is non-empty, only events whose `dst_port` appears in it take part in correlation; the rest are silently ignored. An empty list correlates all ports (the default behavior). The `failed_dest_port_filtered` metric counts the exclusions.
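The empty-list-allows-all semantics can be sketched in a few lines (an illustrative model of the documented rule, not the Go implementation):

```python
# include_dest_ports semantics: empty list = all ports pass,
# non-empty list = only listed dst_port values are correlated.
def passes_dest_port_filter(dst_port, include_dest_ports):
    return not include_dest_ports or dst_port in include_dest_ports

print(passes_dest_port_filter(443, [80, 443]))  # True
print(passes_dest_port_filter(22, [80, 443]))   # False (silently dropped)
print(passes_dest_port_filter(22, []))          # True (empty list allows all)
```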
### Test scripts

```bash
# Bash script (simple)
./scripts/test-correlation.sh -c 10 -v

# Python script (full scenarios: basic, time window, keepalive, different IPs)
pip install requests
python3 scripts/test-correlation-advanced.py --all
```
## Troubleshooting

### ClickHouse: insert errors

- **`No such column`**: check that the target table uses the single `raw_json` column (not separate columns)
- **`ACCESS_DENIED`**: `GRANT INSERT(raw_json) ON mabase_prod.http_logs_raw TO data_writer;`
- Flush errors are logged at ERROR level in the service logs

### Empty materialized view

If `http_logs_raw` has data but `http_logs` is empty:

```sql
SHOW CREATE TABLE mv_http_logs;
GRANT SELECT(raw_json) ON mabase_prod.http_logs_raw TO data_writer;
```

### Unix sockets: permission denied

Check that `socket_permissions: "0666"` is configured and that the `/var/run/logcorrelator` directory is owned by the `logcorrelator` user.

### Systemd service won't start

```bash
journalctl -u logcorrelator -n 50 --no-pager
/usr/bin/logcorrelator -config /etc/logcorrelator/logcorrelator.yml
```
## Debugging correlation

### Enable DEBUG logs

To diagnose correlation problems, enable the `DEBUG` log level:

```yaml
# /etc/logcorrelator/logcorrelator.yml
log:
  level: DEBUG
```

DEBUG logs show:

- Reception of A and B events, with their correlation key and timestamp
- Matching attempts (successes and failures)
- Failure reasons: `no_match_key`, `time_window`, `buffer_eviction`, `ttl_expired`
- Orphan emission (immediate or delayed)
- TTL resets (Keep-Alive mode)

Example logs:

```
[unixsocket:http] DEBUG event received: source=A src_ip=192.168.1.1 src_port=8080 timestamp=2026-03-04 11:00:00.123456789 +0000 UTC
[correlation] DEBUG processing A event: key=192.168.1.1:8080 timestamp=2026-03-04 11:00:00.123456789 +0000 UTC
[correlation] DEBUG A event has no matching B key in buffer: key=192.168.1.1:8080
[correlation] DEBUG A event added to pending orphans (delay=500ms): src_ip=192.168.1.1 src_port=8080
[correlation] DEBUG correlation found: A(src_ip=192.168.1.1 src_port=8080 ts=...) + B(src_ip=192.168.1.1 src_port=8080 ts=...)
```
### Metrics server

The service exposes an optional HTTP server for monitoring and debugging.

**Configuration:**

```yaml
metrics:
  enabled: true
  addr: ":8080"  # Listen address
```

**Endpoints:**

- `GET /metrics` - Returns correlation metrics as JSON
- `GET /health` - Health check

**Available metrics:**

| Metric | Description |
|--------|-------------|
| `events_received_a` | Number of A events received |
| `events_received_b` | Number of B events received |
| `correlations_success` | Successful correlations |
| `correlations_failed` | Failed correlations |
| `failed_no_match_key` | Failure: `src_ip:src_port` key not found |
| `failed_time_window` | Failure: outside the time window |
| `failed_buffer_eviction` | Failure: buffer full |
| `failed_ttl_expired` | Failure: TTL expired |
| `failed_ip_excluded` | Failure: event excluded by the IP filter (`exclude_source_ips`) |
| `failed_dest_port_filtered` | Failure: event excluded by the destination port filter (`include_dest_ports`) |
| `buffer_a_size` | Size of buffer A |
| `buffer_b_size` | Size of buffer B |
| `orphans_emitted_a` | A orphans emitted |
| `orphans_emitted_b` | B orphans emitted |
| `orphans_pending_a` | A orphans pending |
| `pending_orphan_match` | B correlated with a pending A orphan |
| `keepalive_resets` | TTL resets (Keep-Alive mode) |
**Example response:**

```json
{
  "events_received_a": 150,
  "events_received_b": 145,
  "correlations_success": 140,
  "correlations_failed": 10,
  "failed_no_match_key": 5,
  "failed_time_window": 3,
  "failed_buffer_eviction": 0,
  "failed_ttl_expired": 2,
  "buffer_a_size": 10,
  "buffer_b_size": 5,
  "orphans_emitted_a": 10,
  "keepalive_resets": 25
}
```
### Quick diagnosis

Use the metrics to pinpoint the cause of failures:

| High metric | Likely cause | Solution |
|-------------|--------------|----------|
| `failed_no_match_key` | A and B logs do not share the same `src_ip + src_port` | Check that both sources really use the same IP/port combination |
| `failed_time_window` | Timestamps too far apart (>10s by default) | Increase `correlation.time_window.value` or check clock synchronization |
| `failed_ttl_expired` | B events expire before being correlated | Increase `correlation.ttl.network_ttl_s` |
| `failed_buffer_eviction` | Buffers too small for the volume | Increase `correlation.buffers.max_http_items` and `max_network_items` |
| High `orphans_emitted_a` | Many A logs without a matching B | Check that source B actually sends the expected events |
| High `failed_ip_excluded` | Traffic from excluded IPs | Check the `exclude_source_ips` configuration |
| High `failed_dest_port_filtered` | Traffic on ports not listed in `include_dest_ports` | Check `include_dest_ports`, or leave the list empty to accept all ports |
### Excluding source IPs

To drop logs based on their source IP, use the `exclude_source_ips` setting:

```yaml
correlation:
  exclude_source_ips:
    - 10.0.0.1        # Single IP
    - 192.168.1.100   # Another single IP
    - 172.16.0.0/12   # CIDR range (private network)
    - 10.10.10.0/24   # Another CIDR range
```

**Use cases:**

- Excluding health checks and monitoring probes
- Filtering out known internal traffic
- Dropping malicious or unwanted IPs

**Behaviour:**

- Events from these IPs are silently ignored
- They are neither correlated nor emitted as orphans
- The `failed_ip_excluded` metric counts the number of excluded events
- DEBUG logs show: `event excluded by IP filter: source=A src_ip=10.0.0.1 src_port=8080`
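The matching semantics (exact IPs plus CIDR ranges) can be reproduced with the standard library. A standalone sketch, assuming `net.ParseCIDR`-style matching — this is an illustration, not the project's actual `isIPExcluded` code:

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// isExcluded reports whether ip matches any entry in rules,
// where each rule is either an exact IP or a CIDR range.
func isExcluded(ip string, rules []string) bool {
	parsed := net.ParseIP(ip)
	if parsed == nil {
		return false // unparsable IPs are not excluded
	}
	for _, rule := range rules {
		if strings.Contains(rule, "/") {
			// CIDR rule: check network membership
			if _, cidr, err := net.ParseCIDR(rule); err == nil && cidr.Contains(parsed) {
				return true
			}
		} else if rule == ip {
			// Exact-IP rule
			return true
		}
	}
	return false
}

func main() {
	rules := []string{"10.0.0.1", "172.16.0.0/12"}
	fmt.Println(isExcluded("172.20.5.9", rules))   // true: inside 172.16.0.0/12
	fmt.Println(isExcluded("192.168.1.50", rules)) // false: matches no rule
}
```

Parsing the CIDR rules once at startup (rather than on every event, as in this sketch) is the obvious optimization for a hot path.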
### Test scripts

Two scripts are provided to exercise correlation:

**Bash script (simple):**

```bash
# Basic test with 10 event pairs
./scripts/test-correlation.sh -c 10 -v

# With custom socket paths
./scripts/test-correlation.sh \
  -H /var/run/logcorrelator/http.socket \
  -N /var/run/logcorrelator/network.socket \
  -m http://localhost:8080/metrics
```

**Python script (advanced):**

```bash
# Install dependencies
pip install requests

# Basic test
python3 scripts/test-correlation-advanced.py -c 20 -v

# All tests (basic, time window, different IP, keepalive)
python3 scripts/test-correlation-advanced.py --all

# Single test
python3 scripts/test-correlation-advanced.py --time-window
python3 scripts/test-correlation-advanced.py --keepalive
```

**Prerequisites:**

- `socat` or `nc` (netcat) for the Bash script
- Python 3.6+ and `requests` for the Python script
- The `logcorrelator` service must be running
- The metrics server must be enabled for the automatic checks
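Events can also be injected directly in Go, since the service reads one JSON event per datagram on its unixgram sockets. A self-contained sketch of that round-trip — here the local listener merely stands in for the service, and the socket name is a temporary demo path, not the configured one:

```go
package main

import (
	"fmt"
	"net"
	"os"
	"path/filepath"
)

// roundTrip sends one JSON event as a single datagram to a unixgram
// socket and returns what the listener received.
func roundTrip(event string) (string, error) {
	dir, err := os.MkdirTemp("", "lc-demo")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)
	sock := filepath.Join(dir, "http.socket")

	// Listener stands in for logcorrelator's source-A socket.
	addr, err := net.ResolveUnixAddr("unixgram", sock)
	if err != nil {
		return "", err
	}
	ln, err := net.ListenUnixgram("unixgram", addr)
	if err != nil {
		return "", err
	}
	defer ln.Close()

	// Sender side: one event per datagram, as SOCK_DGRAM sockets expect.
	c, err := net.Dial("unixgram", sock)
	if err != nil {
		return "", err
	}
	defer c.Close()
	if _, err := c.Write([]byte(event)); err != nil {
		return "", err
	}

	buf := make([]byte, 65536)
	n, _, err := ln.ReadFromUnix(buf)
	if err != nil {
		return "", err
	}
	return string(buf[:n]), nil
}

func main() {
	got, err := roundTrip(`{"src_ip":"192.168.1.1","src_port":8080,"dst_port":443}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(got)
}
```

Against the real service, replace the listener with the running daemon and dial the socket paths from your `logcorrelator.yml`.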


@@ -412,6 +412,21 @@ correlation:
     description: >
       One-to-many strategy: a network log can be reused for several
       successive HTTP logs as long as it has not expired or been evicted.
+  ip_filtering:
+    directive: exclude_source_ips
+    description: >
+      List of source IPs (exact or CIDR ranges) to silently ignore.
+      Events are neither correlated nor emitted as orphans. Metric: failed_ip_excluded.
+  dest_port_filtering:
+    directive: include_dest_ports
+    description: >
+      Allow-list of destination ports. If non-empty, only events whose
+      dst_port is in the list take part in correlation. All others are
+      silently ignored (not correlated, not emitted as orphans).
+      An empty list allows all ports (default behaviour).
+      Metric: failed_dest_port_filtered.
+    example:
+      include_dest_ports: [80, 443, 8080, 8443]
   schema:
     description: >
@@ -708,6 +723,8 @@ architecture:
     responsibilities:
       - NormalizedEvent and CorrelatedLog models.
       - CorrelationService (window, TTL, bounded buffers, one-to-many/Keep-Alive, orphans).
+      - Source IP filtering (exclude_source_ips, CIDR).
+      - Destination port filtering (include_dest_ports, allow-list).
       - Custom JSON marshaling for CorrelatedLog (flat structure).
   - name: internal/ports
     type: ports
@@ -849,6 +866,7 @@ observability:
       - "A event has no matching B key in buffer: key=..."
       - "A event has same key as B but outside time window: key=... time_diff=5s window=10s"
       - "event excluded by IP filter: source=A src_ip=10.0.0.1 src_port=8080"
+      - "event excluded by dest port filter: source=A dst_port=22"
       - "TTL reset for B event (Keep-Alive): key=... new_ttl=120s"
       - "[clickhouse] DEBUG batch sent: rows=42 table=correlated_logs_http_network"
     info_logs:
@@ -877,6 +895,7 @@ observability:
         "failed_buffer_eviction": 5,
         "failed_ttl_expired": 12,
         "failed_ip_excluded": 7,
+        "failed_dest_port_filtered": 3,
         "buffer_a_size": 23,
         "buffer_b_size": 18,
         "orphans_emitted_a": 92,
@@ -900,6 +919,7 @@ observability:
       - failed_buffer_eviction: buffer full, event evicted
       - failed_ttl_expired: TTL of the B event expired
       - failed_ip_excluded: event excluded by the IP filter (exclude_source_ips)
+      - failed_dest_port_filtered: event excluded by the destination port filter (include_dest_ports)
     buffers:
       - buffer_a_size: current size of the HTTP buffer
       - buffer_b_size: current size of the network buffer
@@ -929,6 +949,9 @@ observability:
       - symptom: high failed_ip_excluded
         cause: traffic from IPs configured in exclude_source_ips
         solution: check the configuration; this is normal if intentional
+      - symptom: high failed_dest_port_filtered
+        cause: traffic on ports not listed in include_dest_ports
+        solution: check the include_dest_ports configuration, or empty the list to accept all ports
       - symptom: high orphans_emitted_a
         cause: many A logs without a matching B
         solution: check that source B actually sends the expected events


@@ -115,6 +115,7 @@ func main() {
 		NetworkTTLS:      cfg.Correlation.GetNetworkTTLS(),
 		MatchingMode:     cfg.Correlation.GetMatchingMode(),
 		ExcludeSourceIPs: cfg.Correlation.GetExcludeSourceIPs(),
+		IncludeDestPorts: cfg.Correlation.GetIncludeDestPorts(),
 	}, &domain.RealTimeProvider{})
 
 	// Set logger for correlation service


@@ -36,7 +36,6 @@ outputs:
   stdout:
     enabled: false
-    level: INFO  # DEBUG: all logs including orphans, INFO: only correlated, WARN: correlated only, ERROR: none
 
 correlation:
   # Time window for correlation (A and B must be within this window)
@@ -74,6 +73,16 @@ correlation:
     - 172.16.0.0/12  # CIDR range (private network)
     - 10.10.10.0/24  # Another CIDR range
 
+  # Restrict correlation to specific destination ports (optional)
+  # If non-empty, only events whose dst_port matches one of these values will be correlated
+  # Events on other ports are silently ignored (not correlated, not emitted as orphans)
+  # Useful to focus on HTTP/HTTPS traffic only and ignore unrelated connections
+  # include_dest_ports:
+  #   - 80    # HTTP
+  #   - 443   # HTTPS
+  #   - 8080  # HTTP alt
+  #   - 8443  # HTTPS alt
+
 # Metrics server configuration (optional, for debugging/monitoring)
 metrics:
   enabled: false


@@ -99,6 +99,7 @@ type CorrelationConfig struct {
 	Buffers          BuffersConfig `yaml:"buffers"`
 	TTL              TTLConfig     `yaml:"ttl"`
 	ExcludeSourceIPs []string      `yaml:"exclude_source_ips"` // List of source IPs or CIDR ranges to exclude
+	IncludeDestPorts []int         `yaml:"include_dest_ports"` // If non-empty, only correlate events matching these destination ports
 	// Deprecated: Use TimeWindow.Value instead
 	TimeWindowS int `yaml:"time_window_s"`
 	// Deprecated: Use OrphanPolicy.ApacheAlwaysEmit instead
@@ -351,6 +352,12 @@ func (c *UnixSocketConfig) GetSocketPermissions() os.FileMode {
 	return os.FileMode(perms)
 }
 
+// GetIncludeDestPorts returns the list of destination ports allowed for correlation.
+// An empty list means all ports are allowed.
+func (c *CorrelationConfig) GetIncludeDestPorts() []int {
+	return c.IncludeDestPorts
+}
+
 // GetExcludeSourceIPs returns the list of excluded source IPs or CIDR ranges.
 func (c *CorrelationConfig) GetExcludeSourceIPs() []string {
 	return c.ExcludeSourceIPs


@@ -44,6 +44,7 @@ type CorrelationConfig struct {
 	NetworkTTLS      int      // TTL in seconds for network events (source B)
 	MatchingMode     string   // "one_to_one" or "one_to_many" (Keep-Alive)
 	ExcludeSourceIPs []string // List of source IPs or CIDR ranges to exclude
+	IncludeDestPorts []int    // If non-empty, only correlate events matching these destination ports
 }
 
 // pendingOrphan represents an A event waiting to be emitted as orphan.
@@ -181,6 +182,20 @@ func (s *CorrelationService) isIPExcluded(ip string) bool {
 	return false
 }
 
+// isDestPortFiltered returns true if the event's destination port is NOT in the
+// include list. When IncludeDestPorts is empty, all ports are allowed.
+func (s *CorrelationService) isDestPortFiltered(port int) bool {
+	if len(s.config.IncludeDestPorts) == 0 {
+		return false // no filter configured: allow all
+	}
+	for _, p := range s.config.IncludeDestPorts {
+		if p == port {
+			return false // port is in the allow-list
+		}
+	}
+	return true // port not in allow-list: filter out
+}
+
 // ProcessEvent processes an incoming event and returns correlated logs if matches are found.
 func (s *CorrelationService) ProcessEvent(event *NormalizedEvent) []CorrelatedLog {
 	s.mu.Lock()
@@ -194,6 +209,14 @@ func (s *CorrelationService) ProcessEvent(event *NormalizedEvent) []CorrelatedLo
 		return nil
 	}
 
+	// Check if destination port is in the allow-list
+	if s.isDestPortFiltered(event.DstPort) {
+		s.logger.Debugf("event excluded by dest port filter: source=%s dst_port=%d",
+			event.Source, event.DstPort)
+		s.metrics.RecordCorrelationFailed("dest_port_filtered")
+		return nil
+	}
+
 	// Record event received
 	s.metrics.RecordEventReceived(string(event.Source))


@@ -1317,3 +1317,100 @@ func TestCorrelationService_ApacheEmitDelay_Flush(t *testing.T) {
 	}
 }
 
+func TestCorrelationService_IncludeDestPorts_AllowedPort(t *testing.T) {
+	now := time.Now()
+	mt := &mockTimeProvider{now: now}
+	svc := NewCorrelationService(CorrelationConfig{
+		TimeWindow:       10 * time.Second,
+		ApacheAlwaysEmit: true,
+		MatchingMode:     MatchingModeOneToMany,
+		IncludeDestPorts: []int{80, 443},
+	}, mt)
+
+	// B event on allowed port 443
+	bEvent := &NormalizedEvent{
+		Source: SourceB, Timestamp: now,
+		SrcIP: "1.2.3.4", SrcPort: 1234, DstPort: 443,
+	}
+	results := svc.ProcessEvent(bEvent)
+	if len(results) != 0 {
+		t.Fatalf("expected B buffered (no match yet), got %d results", len(results))
+	}
+
+	// A event on same key, allowed port
+	aEvent := &NormalizedEvent{
+		Source: SourceA, Timestamp: now,
+		SrcIP: "1.2.3.4", SrcPort: 1234, DstPort: 443,
+	}
+	results = svc.ProcessEvent(aEvent)
+	if len(results) != 1 {
+		t.Fatalf("expected 1 correlation, got %d", len(results))
+	}
+	if !results[0].Correlated {
+		t.Error("expected correlated=true")
+	}
+}
+
+func TestCorrelationService_IncludeDestPorts_FilteredPort(t *testing.T) {
+	now := time.Now()
+	mt := &mockTimeProvider{now: now}
+	svc := NewCorrelationService(CorrelationConfig{
+		TimeWindow:       10 * time.Second,
+		ApacheAlwaysEmit: true,
+		MatchingMode:     MatchingModeOneToMany,
+		IncludeDestPorts: []int{80, 443},
+	}, mt)
+
+	// A event on port 22 (not in allow-list)
+	aEvent := &NormalizedEvent{
+		Source: SourceA, Timestamp: now,
+		SrcIP: "1.2.3.4", SrcPort: 1234, DstPort: 22,
+	}
+	results := svc.ProcessEvent(aEvent)
+	if len(results) != 0 {
+		t.Fatalf("expected 0 results (filtered), got %d", len(results))
+	}
+
+	// B event on port 22 (not in allow-list)
+	bEvent := &NormalizedEvent{
+		Source: SourceB, Timestamp: now,
+		SrcIP: "1.2.3.4", SrcPort: 1234, DstPort: 22,
+	}
+	results = svc.ProcessEvent(bEvent)
+	if len(results) != 0 {
+		t.Fatalf("expected 0 results (filtered), got %d", len(results))
+	}
+
+	// Flush should also return nothing
+	flushed := svc.Flush()
+	if len(flushed) != 0 {
+		t.Errorf("expected 0 flushed events, got %d", len(flushed))
+	}
+}
+
+func TestCorrelationService_IncludeDestPorts_EmptyAllowsAll(t *testing.T) {
+	now := time.Now()
+	mt := &mockTimeProvider{now: now}
+	// No IncludeDestPorts = all ports allowed
+	svc := NewCorrelationService(CorrelationConfig{
+		TimeWindow:       10 * time.Second,
+		ApacheAlwaysEmit: true,
+		MatchingMode:     MatchingModeOneToMany,
+	}, mt)
+
+	bEvent := &NormalizedEvent{
+		Source: SourceB, Timestamp: now,
+		SrcIP: "1.2.3.4", SrcPort: 1234, DstPort: 9999,
+	}
+	svc.ProcessEvent(bEvent)
+
+	aEvent := &NormalizedEvent{
+		Source: SourceA, Timestamp: now,
+		SrcIP: "1.2.3.4", SrcPort: 1234, DstPort: 9999,
+	}
+	results := svc.ProcessEvent(aEvent)
+	if len(results) != 1 || !results[0].Correlated {
+		t.Errorf("expected 1 correlation on any port when list is empty, got %d", len(results))
+	}
+}


@@ -141,6 +141,16 @@ exit 0
 %config(noreplace) /etc/logrotate.d/logcorrelator
 
 %changelog
+* Thu Mar 05 2026 logcorrelator <dev@example.com> - 1.1.12-1
+- Feat: New config directive include_dest_ports - restrict correlation to specific destination ports
+- Feat: If include_dest_ports is non-empty, events on unlisted ports are silently ignored (not correlated, not emitted as orphan)
+- Feat: New metric failed_dest_port_filtered for monitoring filtered traffic
+- Feat: Debug log for filtered events: "event excluded by dest port filter: source=A dst_port=22"
+- Test: New unit tests for include_dest_ports (allowed port, filtered port, empty=all)
+- Docs: README.md updated with include_dest_ports section and current version references
+- Docs: architecture.yml updated with include_dest_ports
+- Fix: config.example.yml - removed obsolete stdout.level field
+
 * Thu Mar 05 2026 logcorrelator <dev@example.com> - 1.1.11-1
 - Fix: StdoutSink no longer writes correlated/orphan JSON to stdout
 - Fix: stdout sink is now a no-op for data; operational logs go to stderr via logger