# logcorrelator

HTTP and network log correlation service written in Go.

## Description

**logcorrelator** receives two JSON log streams over Unix sockets:

- **Source A**: application HTTP logs (Apache, reverse proxy)
- **Source B**: network logs (IP/TCP metadata, JA3/JA4, etc.)

It correlates events on the `src_ip + src_port` key within a configurable time window, and writes the correlated logs to:

- A local file (JSON lines)
- ClickHouse (for analysis and archiving)

## Architecture

```
┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│  Apache Source  │────▶│                  │────▶│   File Sink     │
│  (Unix Socket)  │     │   Correlation    │     │  (JSON lines)   │
└─────────────────┘     │     Service      │     └─────────────────┘
                        │                  │
┌─────────────────┐     │  - Buffers       │     ┌─────────────────┐
│ Network Source  │────▶│  - Time Window   │────▶│   ClickHouse    │
│  (Unix Socket)  │     │  - Orphan Policy │     │      Sink       │
└─────────────────┘     └──────────────────┘     └─────────────────┘
```

## Build (100% Docker)

The entire build and all tests run inside Docker containers:

```bash
# Full build (binary + tests + RPM)
make package-rpm

# Tests only
make test

# Manual build with Docker
docker build --target builder -t logcorrelator-builder .
docker build --target runtime -t logcorrelator:latest .
```

### Prerequisites

- Docker 20.10+
- Bash

## Installation

### From Docker

```bash
# Build the image
docker build --target runtime -t logcorrelator:latest .

# Run
docker run -d \
  --name logcorrelator \
  -v /var/run/logcorrelator:/var/run/logcorrelator \
  -v /var/log/logcorrelator:/var/log/logcorrelator \
  -v ./config.example.yml:/etc/logcorrelator/logcorrelator.yml \
  logcorrelator:latest
```

### From the RPM packages

```bash
# Build the packages
make package-rpm

# Install the RPM package (Rocky Linux 8/9/10)
sudo dnf install -y dist/rpm/el8/logcorrelator-1.1.10-1.el8.x86_64.rpm
sudo dnf install -y dist/rpm/el9/logcorrelator-1.1.10-1.el9.x86_64.rpm
sudo dnf install -y dist/rpm/el10/logcorrelator-1.1.10-1.el10.x86_64.rpm

# Enable and start the service
sudo systemctl enable logcorrelator
sudo systemctl start logcorrelator

# Check the status
sudo systemctl status logcorrelator
```

### Manual build (without Docker)

```bash
# Prerequisite: Go 1.21+
go build -o logcorrelator ./cmd/logcorrelator

# Run
./logcorrelator -config config.example.yml
```

## Configuration

Configuration is a YAML file. See `config.example.yml` for a complete example.

```yaml
# /etc/logcorrelator/logcorrelator.yml

log:
  level: INFO  # DEBUG, INFO, WARN, ERROR

inputs:
  unix_sockets:
    # HTTP source (A): application logs in JSON
    - name: http
      path: /var/run/logcorrelator/http.sock
      socket_permissions: "0666"
      socket_type: dgram

    # Network source (B): IP/TCP/JA3... logs in JSON
    - name: network
      path: /var/run/logcorrelator/network.sock
      socket_permissions: "0666"
      socket_type: dgram

outputs:
  file:
    enabled: true
    path: /var/log/logcorrelator/correlated.log
    format: json_lines

  clickhouse:
    enabled: true
    dsn: clickhouse://data_writer:password@localhost:9000/mabase_prod
    table: http_logs_raw
    batch_size: 500
    flush_interval_ms: 200
    max_buffer_size: 5000
    drop_on_overflow: true

correlation:
  time_window:
    value: 1
    unit: s
  orphan_policy:
    apache_always_emit: true
    network_emit: false
  matching:
    mode: one_to_many  # Keep-Alive: one B event may correlate several A events
  buffers:
    max_http_items: 10000
    max_network_items: 20000
  ttl:
    network_ttl_s: 30
  # Exclude some source IPs (optional)
  exclude_source_ips:
    - 10.0.0.1        # single IP
    - 192.168.0.0/16  # CIDR range
```

### ClickHouse DSN format

```
clickhouse://username:password@host:port/database
```

Example: `clickhouse://data_writer:MonMotDePasse@127.0.0.1:9000/mabase_prod`

Common ports:

- `9000`: native port (recommended for the Go driver)
- `8123`: HTTP port (alternative)

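The DSN follows standard URL syntax, so its parts can be pulled out with Go's `net/url` for quick inspection. Illustration only; the ClickHouse Go driver has its own DSN parsing:

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// splitDSN breaks a clickhouse:// DSN into its parts with net/url.
func splitDSN(dsn string) (user, pass, host, database string, err error) {
	u, err := url.Parse(dsn)
	if err != nil {
		return "", "", "", "", err
	}
	pass, _ = u.User.Password()
	// The URL path is "/<database>"; strip the leading slash.
	return u.User.Username(), pass, u.Host, strings.TrimPrefix(u.Path, "/"), nil
}

func main() {
	user, _, host, db, _ := splitDSN("clickhouse://data_writer:secret@127.0.0.1:9000/mabase_prod")
	fmt.Println(user, host, db)
	// → data_writer 127.0.0.1:9000 mabase_prod
}
```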
## Log formats

### Source A (HTTP)

```json
{
  "src_ip": "192.168.1.1",
  "src_port": 8080,
  "dst_ip": "10.0.0.1",
  "dst_port": 80,
  "timestamp": 1704110400000000000,
  "method": "GET",
  "path": "/api/test",
  "header_host": "example.com"
}
```

### Source B (network)

```json
{
  "src_ip": "192.168.1.1",
  "src_port": 8080,
  "dst_ip": "10.0.0.1",
  "dst_port": 443,
  "ja3": "abc123def456",
  "ja4": "xyz789"
}
```

### Correlated log (output)

**Version 1.0.3+** - flat JSON structure:

```json
{
  "timestamp": "2024-01-01T12:00:00Z",
  "src_ip": "192.168.1.1",
  "src_port": 8080,
  "dst_ip": "10.0.0.1",
  "dst_port": 80,
  "correlated": true,
  "method": "GET",
  "path": "/api/test",
  "ja3": "abc123def456",
  "ja4": "xyz789"
}
```

All fields from sources A and B are merged at the same level. The correlation fields (`src_ip`, `src_port`, `dst_ip`, `dst_port`, `correlated`, `orphan_side`) are always present, and every other field from the source logs is added directly at the root.

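The flat merge described above can be sketched as follows. `mergeFlat` is a hypothetical helper; which side wins when both events carry the same key is an assumption here (B overwrites A), not something the source states:

```go
package main

import "fmt"

// mergeFlat combines an A (HTTP) event and a B (network) event into one
// flat map, mirroring the flat output structure shown above.
func mergeFlat(a, b map[string]any) map[string]any {
	out := make(map[string]any, len(a)+len(b)+1)
	for k, v := range a {
		out[k] = v
	}
	for k, v := range b { // assumption: B fields overwrite A on collision
		out[k] = v
	}
	out["correlated"] = true
	return out
}

func main() {
	a := map[string]any{"src_ip": "192.168.1.1", "src_port": 8080, "method": "GET", "path": "/api/test"}
	b := map[string]any{"src_ip": "192.168.1.1", "src_port": 8080, "ja3": "abc123def456", "ja4": "xyz789"}
	fmt.Println(mergeFlat(a, b)["ja4"])
	// → xyz789
}
```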
## ClickHouse schema

The service uses two ClickHouse tables in the `mabase_prod` database:

### Full setup

```sql
-- 1. Create the database
CREATE DATABASE IF NOT EXISTS mabase_prod;

-- 2. Raw table with TTL (1 day of retention)
DROP TABLE IF EXISTS mabase_prod.http_logs_raw;

CREATE TABLE mabase_prod.http_logs_raw
(
    raw_json String,
    ingest_time DateTime DEFAULT now()
)
ENGINE = MergeTree
PARTITION BY toDate(ingest_time)
ORDER BY ingest_time
TTL ingest_time + INTERVAL 1 DAY;

-- 3. Parsed table
DROP TABLE IF EXISTS mabase_prod.http_logs;

CREATE TABLE mabase_prod.http_logs
(
    time DateTime,
    log_date Date DEFAULT toDate(time),

    src_ip IPv4,
    src_port UInt16,
    dst_ip IPv4,
    dst_port UInt16,

    method LowCardinality(String),
    scheme LowCardinality(String),
    host LowCardinality(String),
    path String,
    query String,
    http_version LowCardinality(String),
    orphan_side LowCardinality(String),

    correlated UInt8,
    keepalives UInt16,
    a_timestamp UInt64,
    b_timestamp UInt64,
    conn_id String,

    ip_meta_df UInt8,
    ip_meta_id UInt32,
    ip_meta_total_length UInt32,
    ip_meta_ttl UInt8,
    tcp_meta_options LowCardinality(String),
    tcp_meta_window_size UInt32,
    syn_to_clienthello_ms Int32,

    tls_version LowCardinality(String),
    tls_sni LowCardinality(String),
    ja3 String,
    ja3_hash String,
    ja4 String,

    header_user_agent String,
    header_accept String,
    header_accept_encoding String,
    header_accept_language String,
    header_x_request_id String,
    header_x_trace_id String,
    header_x_forwarded_for String,

    header_sec_ch_ua String,
    header_sec_ch_ua_mobile String,
    header_sec_ch_ua_platform String,
    header_sec_fetch_dest String,
    header_sec_fetch_mode String,
    header_sec_fetch_site String
)
ENGINE = MergeTree
PARTITION BY log_date
ORDER BY (time, src_ip, dst_ip, ja4);

-- 4. Materialized view (RAW → logs)
DROP VIEW IF EXISTS mabase_prod.mv_http_logs;

CREATE MATERIALIZED VIEW mabase_prod.mv_http_logs
TO mabase_prod.http_logs
AS
SELECT
    -- 1. Time
    parseDateTimeBestEffort(
        coalesce(JSONExtractString(raw_json, 'time'), '1970-01-01T00:00:00Z')
    ) AS time,
    toDate(time) AS log_date,

    -- 2. Network L3/L4
    toIPv4(coalesce(JSONExtractString(raw_json, 'src_ip'), '0.0.0.0')) AS src_ip,
    toUInt16(coalesce(JSONExtractUInt(raw_json, 'src_port'), 0)) AS src_port,
    toIPv4(coalesce(JSONExtractString(raw_json, 'dst_ip'), '0.0.0.0')) AS dst_ip,
    toUInt16(coalesce(JSONExtractUInt(raw_json, 'dst_port'), 0)) AS dst_port,

    -- 3. Basic HTTP
    coalesce(JSONExtractString(raw_json, 'method'), '') AS method,
    coalesce(JSONExtractString(raw_json, 'scheme'), '') AS scheme,
    coalesce(JSONExtractString(raw_json, 'host'), '') AS host,
    coalesce(JSONExtractString(raw_json, 'path'), '') AS path,
    coalesce(JSONExtractString(raw_json, 'query'), '') AS query,
    coalesce(JSONExtractString(raw_json, 'http_version'), '') AS http_version,
    coalesce(JSONExtractString(raw_json, 'orphan_side'), '') AS orphan_side,

    -- 4. Connection / correlation
    toUInt8(coalesce(JSONExtractBool(raw_json, 'correlated'), 0)) AS correlated,
    toUInt16(coalesce(JSONExtractUInt(raw_json, 'keepalives'), 0)) AS keepalives,
    coalesce(JSONExtractUInt(raw_json, 'a_timestamp'), 0) AS a_timestamp,
    coalesce(JSONExtractUInt(raw_json, 'b_timestamp'), 0) AS b_timestamp,
    coalesce(JSONExtractString(raw_json, 'conn_id'), '') AS conn_id,

    -- 5. IP/TCP
    toUInt8(coalesce(JSONExtractBool(raw_json, 'ip_meta_df'), 0)) AS ip_meta_df,
    coalesce(JSONExtractUInt(raw_json, 'ip_meta_id'), 0) AS ip_meta_id,
    coalesce(JSONExtractUInt(raw_json, 'ip_meta_total_length'), 0) AS ip_meta_total_length,
    coalesce(JSONExtractUInt(raw_json, 'ip_meta_ttl'), 0) AS ip_meta_ttl,
    coalesce(JSONExtractString(raw_json, 'tcp_meta_options'), '') AS tcp_meta_options,
    coalesce(JSONExtractUInt(raw_json, 'tcp_meta_window_size'), 0) AS tcp_meta_window_size,
    toInt32(coalesce(JSONExtractInt(raw_json, 'syn_to_clienthello_ms'), 0)) AS syn_to_clienthello_ms,

    -- 6. TLS / JA3/JA4
    coalesce(JSONExtractString(raw_json, 'tls_version'), '') AS tls_version,
    coalesce(JSONExtractString(raw_json, 'tls_sni'), '') AS tls_sni,
    coalesce(JSONExtractString(raw_json, 'ja3'), '') AS ja3,
    coalesce(JSONExtractString(raw_json, 'ja3_hash'), '') AS ja3_hash,
    coalesce(JSONExtractString(raw_json, 'ja4'), '') AS ja4,

    -- 7. HTTP headers
    coalesce(JSONExtractString(raw_json, 'header_User-Agent'), '') AS header_user_agent,
    coalesce(JSONExtractString(raw_json, 'header_Accept'), '') AS header_accept,
    coalesce(JSONExtractString(raw_json, 'header_Accept-Encoding'), '') AS header_accept_encoding,
    coalesce(JSONExtractString(raw_json, 'header_Accept-Language'), '') AS header_accept_language,
    coalesce(JSONExtractString(raw_json, 'header_X-Request-Id'), '') AS header_x_request_id,
    coalesce(JSONExtractString(raw_json, 'header_X-Trace-Id'), '') AS header_x_trace_id,
    coalesce(JSONExtractString(raw_json, 'header_X-Forwarded-For'), '') AS header_x_forwarded_for,

    coalesce(JSONExtractString(raw_json, 'header_Sec-CH-UA'), '') AS header_sec_ch_ua,
    coalesce(JSONExtractString(raw_json, 'header_Sec-CH-UA-Mobile'), '') AS header_sec_ch_ua_mobile,
    coalesce(JSONExtractString(raw_json, 'header_Sec-CH-UA-Platform'), '') AS header_sec_ch_ua_platform,
    coalesce(JSONExtractString(raw_json, 'header_Sec-Fetch-Dest'), '') AS header_sec_fetch_dest,
    coalesce(JSONExtractString(raw_json, 'header_Sec-Fetch-Mode'), '') AS header_sec_fetch_mode,
    coalesce(JSONExtractString(raw_json, 'header_Sec-Fetch-Site'), '') AS header_sec_fetch_site

FROM mabase_prod.http_logs_raw;
```

### Users and permissions

```sql
-- Create the users
CREATE USER IF NOT EXISTS data_writer IDENTIFIED BY 'TonMotDePasseInsert';
CREATE USER IF NOT EXISTS analyst IDENTIFIED BY 'TonMotDePasseAnalyst';

-- Rights for data_writer (INSERT + SELECT for the MV)
GRANT INSERT(raw_json) ON mabase_prod.http_logs_raw TO data_writer;
GRANT SELECT(raw_json) ON mabase_prod.http_logs_raw TO data_writer;

-- Rights for analyst (read-only on the parsed logs)
GRANT SELECT ON mabase_prod.http_logs TO analyst;
```

### Insert format

The service sends each correlated log serialized as JSON into the `raw_json` column:

```sql
INSERT INTO mabase_prod.http_logs_raw (raw_json) FORMAT JSONEachRow
{"raw_json":"{\"timestamp\":\"2024-01-01T12:00:00Z\",\"src_ip\":\"192.168.1.1\",\"correlated\":true,...}"}
```

### Migrating existing data

If you already have data in the old `http_logs_raw` table, note that the materialized view only fires on new inserts, and `http_logs` has no `raw_json` column. Replay the existing rows by running the view's SELECT manually (abbreviated below; use the full column list from the `CREATE MATERIALIZED VIEW` statement above):

```sql
INSERT INTO mabase_prod.http_logs
SELECT
    parseDateTimeBestEffort(
        coalesce(JSONExtractString(raw_json, 'time'), '1970-01-01T00:00:00Z')
    ) AS time,
    toDate(time) AS log_date
    -- ... remaining columns from the materialized view's SELECT ...
FROM mabase_prod.http_logs_raw;
```

### Sanity checks - verifying ingestion

After deploying the service, verify that data is flowing correctly:

```sql
-- 1. Tables are present
SELECT
    database,
    table,
    engine
FROM system.tables
WHERE database = currentDatabase()
  AND table IN ('http_logs_raw', 'http_logs', 'mv_http_logs');

-- 2. Materialized view definition
SHOW CREATE TABLE mv_http_logs;

-- 3. Check that raw inserts are arriving
SELECT
    count(*) AS rows_raw,
    min(ingest_time) AS min_ingest,
    max(ingest_time) AS max_ingest
FROM http_logs_raw;

-- 4. Look at the latest raw logs
SELECT
    ingest_time,
    raw_json
FROM http_logs_raw
ORDER BY ingest_time DESC
LIMIT 5;

-- 5. Check that the MV feeds http_logs
SELECT
    count(*) AS rows_flat,
    min(time) AS min_time,
    max(time) AS max_time
FROM http_logs;

-- 6. Look at the latest parsed logs
SELECT
    time,
    src_ip,
    dst_ip,
    method,
    host,
    path,
    header_user_agent,
    tls_version,
    ja4
FROM http_logs
ORDER BY time DESC
LIMIT 10;
```

**Interpretation:**

- If `rows_raw` > 0 but `rows_flat` = 0: the materialized view is not working (check the SELECT rights on `http_logs_raw`)
- If both counts are > 0: ingestion and parsing are working correctly

## Tests

```bash
# Via Docker
./test.sh

# Local
go test ./...
go test -cover ./...
go test -coverprofile=coverage.out ./...
go tool cover -html=coverage.out
```

## Signals

| Signal | Behavior |
|--------|----------|
| `SIGINT` | Graceful shutdown |
| `SIGTERM` | Graceful shutdown |

During graceful shutdown:

1. Close the Unix sockets
2. Flush the buffers
3. Emit pending events
4. Close the ClickHouse connections

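The sequence above can be sketched with Go's `signal.NotifyContext`; the step bodies below are placeholders, not the service's real teardown code:

```go
package main

import (
	"context"
	"fmt"
	"os/signal"
	"syscall"
)

// shutdown runs the graceful-stop steps, in order.
func shutdown(steps []func()) {
	for _, step := range steps {
		step()
	}
}

func main() {
	// Cancel the context on SIGINT or SIGTERM.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	go func() {
		// Send SIGTERM to ourselves so the example terminates on its own.
		syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
	}()

	<-ctx.Done()
	shutdown([]func(){
		func() { fmt.Println("close unix sockets") },
		func() { fmt.Println("flush buffers") },
		func() { fmt.Println("emit pending events") },
		func() { fmt.Println("close clickhouse connections") },
	})
}
```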
## Internal logs

Internal logs are written to stderr:

```bash
# Docker
docker logs -f logcorrelator

# Systemd
journalctl -u logcorrelator -f
```

## Project structure

```
.
├── cmd/logcorrelator/              # Entry point
├── internal/
│   ├── adapters/
│   │   ├── inbound/unixsocket/     # HTTP and network sources
│   │   └── outbound/
│   │       ├── clickhouse/         # ClickHouse sink
│   │       ├── file/               # File sink
│   │       └── multi/              # Multi-sink
│   ├── app/                        # Orchestration
│   ├── config/                     # YAML configuration
│   ├── domain/                     # Domain (correlation)
│   ├── observability/              # Internal logging
│   └── ports/                      # Interfaces
├── config.example.yml              # Example config
├── Dockerfile                      # Multi-stage build
├── Dockerfile.package              # RPM packaging
├── Makefile                        # Build commands
├── architecture.yml                # Architecture specification
└── logcorrelator.service           # systemd unit
```

## License

MIT

## Troubleshooting

### ClickHouse: insert errors

**Error: `No such column timestamp`**

- Check that the destination table is `http_logs_raw` (single `raw_json` column)
- The service sends serialized JSON into `raw_json`, not separate columns

**Error: `ACCESS_DENIED`**

- Check the rights of the `data_writer` user:

```sql
GRANT INSERT(raw_json) ON mabase_prod.http_logs_raw TO data_writer;
GRANT SELECT(raw_json) ON mabase_prod.http_logs_raw TO data_writer;
```

### Materialized view not working

**Symptom:** `http_logs_raw` has data, but `http_logs` is empty

1. Check that the MV exists:

```sql
SHOW CREATE TABLE mv_http_logs;
```

2. Check the SELECT rights of `data_writer` on `http_logs_raw`

3. Test manually by inserting a raw row and checking that it appears in the parsed table:

```sql
INSERT INTO mabase_prod.http_logs_raw (raw_json)
VALUES ('{"time":"2024-01-01T12:00:00Z","src_ip":"192.168.1.1","src_port":8080,"dst_ip":"10.0.0.1","dst_port":80,"method":"GET"}');

SELECT time, src_ip, method
FROM mabase_prod.http_logs
ORDER BY time DESC
LIMIT 1;
```

### Unix sockets: permission denied

**Error:** `permission denied` on `/var/run/logcorrelator/*.sock`

- Check that the sockets have `0666` permissions
- Check that the `logcorrelator` user can read and write

### systemd service does not start

1. Check the logs:

```bash
journalctl -u logcorrelator -n 50 --no-pager
```

2. Check the configuration:

```bash
cat /etc/logcorrelator/logcorrelator.yml
```

3. Test manually:

```bash
/usr/bin/logcorrelator -config /etc/logcorrelator/logcorrelator.yml
```

## Debugging correlation

### Enabling DEBUG logs

To diagnose correlation problems, enable the `DEBUG` log level:

```yaml
# /etc/logcorrelator/logcorrelator.yml
log:
  level: DEBUG
```

DEBUG logs show:

- Reception of A and B events with the correlation key and timestamp
- Matching attempts (successes and failures)
- Failure reasons: `no_match_key`, `time_window`, `buffer_eviction`, `ttl_expired`
- Orphan emission (immediate or after a delay)
- TTL resets (Keep-Alive mode)

Example logs:

```
[unixsocket:http] DEBUG event received: source=A src_ip=192.168.1.1 src_port=8080 timestamp=2026-03-04 11:00:00.123456789 +0000 UTC
[correlation] DEBUG processing A event: key=192.168.1.1:8080 timestamp=2026-03-04 11:00:00.123456789 +0000 UTC
[correlation] DEBUG A event has no matching B key in buffer: key=192.168.1.1:8080
[correlation] DEBUG A event added to pending orphans (delay=500ms): src_ip=192.168.1.1 src_port=8080
[correlation] DEBUG correlation found: A(src_ip=192.168.1.1 src_port=8080 ts=...) + B(src_ip=192.168.1.1 src_port=8080 ts=...)
```

### Metrics server

The service exposes an optional HTTP server for monitoring and debugging.

**Configuration:**

```yaml
metrics:
  enabled: true
  addr: ":8080"  # Listen address
```

**Endpoints:**

- `GET /metrics` - returns the correlation metrics as JSON
- `GET /health` - health check

**Available metrics:**

| Metric | Description |
|--------|-------------|
| `events_received_a` | Number of A events received |
| `events_received_b` | Number of B events received |
| `correlations_success` | Successful correlations |
| `correlations_failed` | Failed correlations |
| `failed_no_match_key` | Failure: `src_ip:src_port` key not found |
| `failed_time_window` | Failure: outside the time window |
| `failed_buffer_eviction` | Failure: buffer full |
| `failed_ttl_expired` | Failure: TTL expired |
| `buffer_a_size` | Size of buffer A |
| `buffer_b_size` | Size of buffer B |
| `orphans_emitted_a` | A orphans emitted |
| `orphans_emitted_b` | B orphans emitted |
| `orphans_pending_a` | A orphans pending |
| `pending_orphan_match` | B correlated with a pending A orphan |
| `keepalive_resets` | TTL resets (Keep-Alive mode) |

**Example response:**

```json
{
  "events_received_a": 150,
  "events_received_b": 145,
  "correlations_success": 140,
  "correlations_failed": 10,
  "failed_no_match_key": 5,
  "failed_time_window": 3,
  "failed_buffer_eviction": 0,
  "failed_ttl_expired": 2,
  "buffer_a_size": 10,
  "buffer_b_size": 5,
  "orphans_emitted_a": 10,
  "keepalive_resets": 25
}
```

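The `/metrics` payload above is plain JSON and easy to post-process. This sketch computes a success ratio from a payload; `successRate` is a hypothetical helper, and fetching the endpoint over HTTP is omitted:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// successRate computes the correlation success ratio from a /metrics
// JSON payload like the example above.
func successRate(payload []byte) (float64, error) {
	var m map[string]int64
	if err := json.Unmarshal(payload, &m); err != nil {
		return 0, err
	}
	total := m["correlations_success"] + m["correlations_failed"]
	if total == 0 {
		return 0, nil
	}
	return float64(m["correlations_success"]) / float64(total), nil
}

func main() {
	body := []byte(`{"correlations_success":90,"correlations_failed":10}`)
	rate, err := successRate(body)
	fmt.Println(rate, err)
	// → 0.9 <nil>
}
```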
### Quick diagnosis

Use the metrics to identify the likely cause of failures:

| High metric | Probable cause | Fix |
|-------------|----------------|-----|
| `failed_no_match_key` | The A and B logs do not share the same `src_ip + src_port` | Check that both sources use the same IP/port combination |
| `failed_time_window` | Timestamps too far apart (>10s by default) | Increase `correlation.time_window.value` or check clock synchronization |
| `failed_ttl_expired` | B events expire before correlation | Increase `correlation.ttl.network_ttl_s` |
| `failed_buffer_eviction` | Buffers too small for the volume | Increase `correlation.buffers.max_http_items` and `max_network_items` |
| High `orphans_emitted_a` | Many A logs without a matching B | Check that source B actually sends the expected events |
| High `failed_ip_excluded` | Traffic from excluded IPs | Check the `exclude_source_ips` configuration |

### Excluding source IPs

To drop logs based on their source IP, use the `exclude_source_ips` setting:

```yaml
correlation:
  exclude_source_ips:
    - 10.0.0.1        # single IP
    - 192.168.1.100   # another single IP
    - 172.16.0.0/12   # CIDR range (private network)
    - 10.10.10.0/24   # another CIDR range
```

**Use cases:**

- Exclude health checks and monitoring probes
- Filter known internal traffic
- Drop malicious or unwanted IPs

**Behavior:**

- Events from these IPs are silently ignored
- They are neither correlated nor emitted as orphans
- The `failed_ip_excluded` metric counts the number of excluded events
- DEBUG logs show: `event excluded by IP filter: source=A src_ip=10.0.0.1 src_port=8080`

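The filter behavior above can be sketched with Go's `net/netip`, which handles both single IPs and CIDR ranges; `excluded` is a hypothetical helper, not the service's real filter:

```go
package main

import (
	"fmt"
	"net/netip"
)

// excluded reports whether src matches any exclude_source_ips entry,
// where each entry is either a single IP or a CIDR range.
func excluded(src string, rules []string) bool {
	ip, err := netip.ParseAddr(src)
	if err != nil {
		return false // unparsable source IPs are not filtered here
	}
	for _, r := range rules {
		if pfx, err := netip.ParsePrefix(r); err == nil {
			if pfx.Contains(ip) {
				return true
			}
			continue
		}
		if one, err := netip.ParseAddr(r); err == nil && one == ip {
			return true
		}
	}
	return false
}

func main() {
	rules := []string{"10.0.0.1", "192.168.0.0/16"}
	fmt.Println(excluded("192.168.1.100", rules), excluded("172.16.0.5", rules))
	// → true false
}
```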
### Test scripts

Two scripts are provided to test correlation:

**Bash script (simple):**

```bash
# Basic test with 10 event pairs
./scripts/test-correlation.sh -c 10 -v

# With custom socket paths
./scripts/test-correlation.sh \
  -H /var/run/logcorrelator/http.socket \
  -N /var/run/logcorrelator/network.socket \
  -m http://localhost:8080/metrics
```

**Python script (advanced):**

```bash
# Install the dependencies
pip install requests

# Basic test
python3 scripts/test-correlation-advanced.py -c 20 -v

# All tests (basic, time window, different IP, keepalive)
python3 scripts/test-correlation-advanced.py --all

# Specific test
python3 scripts/test-correlation-advanced.py --time-window
python3 scripts/test-correlation-advanced.py --keepalive
```

**Prerequisites:**

- `socat` or `nc` (netcat) for the Bash script
- Python 3.6+ and `requests` for the Python script
- The `logcorrelator` service must be running
- The metrics server must be enabled for the automatic checks
