# logcorrelator

HTTP and network log correlation service written in Go.

## Description

**logcorrelator** receives two JSON log streams over Unix sockets:

- **Source A**: application HTTP logs (Apache, reverse proxy)
- **Source B**: network logs (IP/TCP metadata, JA3/JA4, etc.)

It correlates events on `src_ip + src_port` within a configurable time window and writes the correlated logs to:

- A local file (JSON lines)
- ClickHouse (for analysis and archiving)

## Architecture

```
┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│  Apache Source  │────▶│                  │────▶│    File Sink    │
│  (Unix Socket)  │     │   Correlation    │     │  (JSON lines)   │
└─────────────────┘     │     Service      │     └─────────────────┘
                        │                  │
┌─────────────────┐     │ - Buffers        │     ┌─────────────────┐
│ Network Source  │────▶│ - Time Window    │────▶│   ClickHouse    │
│  (Unix Socket)  │     │ - Orphan Policy  │     │      Sink       │
└─────────────────┘     └──────────────────┘     └─────────────────┘
```

## Build (100% Docker)

The entire build and test pipeline runs in Docker containers:

```bash
# Full build (binary + tests + RPM)
make package-rpm

# Tests only
make test

# Manual build with Docker
docker build --target builder -t logcorrelator-builder .
docker build --target runtime -t logcorrelator:latest .
```

### Prerequisites

- Docker 20.10+
- Bash

## Installation

### From Docker

```bash
# Build the image
docker build --target runtime -t logcorrelator:latest .
# Run
docker run -d \
  --name logcorrelator \
  -v /var/run/logcorrelator:/var/run/logcorrelator \
  -v /var/log/logcorrelator:/var/log/logcorrelator \
  -v ./config.example.yml:/etc/logcorrelator/logcorrelator.yml \
  logcorrelator:latest
```

### From RPM packages

```bash
# Generate the packages
make package-rpm

# Install the RPM package (Rocky Linux 8/9, AlmaLinux 10)
sudo dnf install -y dist/rpm/rocky8/logcorrelator-1.0.7-1.el8.x86_64.rpm
sudo dnf install -y dist/rpm/rocky9/logcorrelator-1.0.7-1.el9.x86_64.rpm
sudo dnf install -y dist/rpm/almalinux10/logcorrelator-1.0.7-1.el10.x86_64.rpm

# Enable and start the service
sudo systemctl enable logcorrelator
sudo systemctl start logcorrelator

# Check the status
sudo systemctl status logcorrelator
```

### Manual build (without Docker)

```bash
# Prerequisite: Go 1.21+
go build -o logcorrelator ./cmd/logcorrelator

# Run
./logcorrelator -config config.example.yml
```

## Configuration

Configuration uses a YAML file:

```yaml
# Service configuration
service:
  name: logcorrelator
  language: go

# Input sources (at least 2 required)
inputs:
  unix_sockets:
    - name: http_source
      path: /var/run/logcorrelator/http.socket
      format: json
    - name: network_source
      path: /var/run/logcorrelator/network.socket
      format: json

# Outputs
outputs:
  # File output
  file:
    enabled: true
    path: /var/log/logcorrelator/correlated.log
  # ClickHouse output
  clickhouse:
    enabled: false
    dsn: clickhouse://user:pass@localhost:9000/db
    table: correlated_logs_http_network

# Correlation configuration
correlation:
  key:
    - src_ip
    - src_port
  time_window:
    value: 1
    unit: s
  orphan_policy:
    apache_always_emit: true
    network_emit: false
```

A complete example is provided in `config.example.yml`.
## Log formats

### Source A (HTTP)

```json
{
  "src_ip": "192.168.1.1",
  "src_port": 8080,
  "dst_ip": "10.0.0.1",
  "dst_port": 80,
  "timestamp": 1704110400000000000,
  "method": "GET",
  "path": "/api/test",
  "header_host": "example.com"
}
```

### Source B (Network)

```json
{
  "src_ip": "192.168.1.1",
  "src_port": 8080,
  "dst_ip": "10.0.0.1",
  "dst_port": 443,
  "ja3": "abc123def456",
  "ja4": "xyz789"
}
```

### Correlated log (output)

**Version 1.0.3+** uses a flat JSON structure:

```json
{
  "timestamp": "2024-01-01T12:00:00Z",
  "src_ip": "192.168.1.1",
  "src_port": 8080,
  "dst_ip": "10.0.0.1",
  "dst_port": 80,
  "correlated": true,
  "method": "GET",
  "path": "/api/test",
  "ja3": "abc123def456",
  "ja4": "xyz789"
}
```

All fields from sources A and B are merged at the same level. The correlation fields (`src_ip`, `src_port`, `dst_ip`, `dst_port`, `correlated`, `orphan_side`) are always present, and every other field from the source logs is added directly at the root.

## ClickHouse schema

The service uses two ClickHouse tables in the `mabase_prod` database:

### Full setup

```sql
-- 1. Create the database
CREATE DATABASE IF NOT EXISTS mabase_prod;

-- 2. Raw table with TTL (1 day of retention)
DROP TABLE IF EXISTS mabase_prod.http_logs_raw;
CREATE TABLE mabase_prod.http_logs_raw
(
    raw_json    String,
    ingest_time DateTime DEFAULT now()
)
ENGINE = MergeTree
PARTITION BY toDate(ingest_time)
ORDER BY ingest_time
TTL ingest_time + INTERVAL 1 DAY;

-- 3.
-- Parsed table
DROP TABLE IF EXISTS mabase_prod.http_logs;
CREATE TABLE mabase_prod.http_logs
(
    time                      DateTime,
    log_date                  Date DEFAULT toDate(time),
    src_ip                    IPv4,
    src_port                  UInt16,
    dst_ip                    IPv4,
    dst_port                  UInt16,
    method                    LowCardinality(String),
    scheme                    LowCardinality(String),
    host                      LowCardinality(String),
    path                      String,
    query                     String,
    http_version              LowCardinality(String),
    orphan_side               LowCardinality(String),
    correlated                UInt8,
    keepalives                UInt16,
    a_timestamp               UInt64,
    b_timestamp               UInt64,
    conn_id                   String,
    ip_meta_df                UInt8,
    ip_meta_id                UInt32,
    ip_meta_total_length      UInt32,
    ip_meta_ttl               UInt8,
    tcp_meta_options          LowCardinality(String),
    tcp_meta_window_size      UInt32,
    syn_to_clienthello_ms     Int32,
    tls_version               LowCardinality(String),
    tls_sni                   LowCardinality(String),
    ja3                       String,
    ja3_hash                  String,
    ja4                       String,
    header_user_agent         String,
    header_accept             String,
    header_accept_encoding    String,
    header_accept_language    String,
    header_x_request_id       String,
    header_x_trace_id         String,
    header_x_forwarded_for    String,
    header_sec_ch_ua          String,
    header_sec_ch_ua_mobile   String,
    header_sec_ch_ua_platform String,
    header_sec_fetch_dest     String,
    header_sec_fetch_mode     String,
    header_sec_fetch_site     String
)
ENGINE = MergeTree
PARTITION BY log_date
ORDER BY (time, src_ip, dst_ip, ja4);

-- 4. Materialized view (raw → parsed logs)
DROP VIEW IF EXISTS mabase_prod.mv_http_logs;
CREATE MATERIALIZED VIEW mabase_prod.mv_http_logs
TO mabase_prod.http_logs
AS
SELECT
    -- 1. Time
    -- JSONExtractString returns '' (not NULL) for a missing key, so the
    -- OrZero variant is used; unparseable values become 1970-01-01 00:00:00
    parseDateTimeBestEffortOrZero(JSONExtractString(raw_json, 'time')) AS time,
    toDate(time) AS log_date,

    -- 2. Network L3/L4
    toIPv4(coalesce(JSONExtractString(raw_json, 'src_ip'), '0.0.0.0')) AS src_ip,
    toUInt16(coalesce(JSONExtractUInt(raw_json, 'src_port'), 0)) AS src_port,
    toIPv4(coalesce(JSONExtractString(raw_json, 'dst_ip'), '0.0.0.0')) AS dst_ip,
    toUInt16(coalesce(JSONExtractUInt(raw_json, 'dst_port'), 0)) AS dst_port,

    -- 3.
    -- Basic HTTP
    coalesce(JSONExtractString(raw_json, 'method'), '') AS method,
    coalesce(JSONExtractString(raw_json, 'scheme'), '') AS scheme,
    coalesce(JSONExtractString(raw_json, 'host'), '') AS host,
    coalesce(JSONExtractString(raw_json, 'path'), '') AS path,
    coalesce(JSONExtractString(raw_json, 'query'), '') AS query,
    coalesce(JSONExtractString(raw_json, 'http_version'), '') AS http_version,
    coalesce(JSONExtractString(raw_json, 'orphan_side'), '') AS orphan_side,

    -- 4. Connection / correlation
    toUInt8(coalesce(JSONExtractBool(raw_json, 'correlated'), 0)) AS correlated,
    toUInt16(coalesce(JSONExtractUInt(raw_json, 'keepalives'), 0)) AS keepalives,
    coalesce(JSONExtractUInt(raw_json, 'a_timestamp'), 0) AS a_timestamp,
    coalesce(JSONExtractUInt(raw_json, 'b_timestamp'), 0) AS b_timestamp,
    coalesce(JSONExtractString(raw_json, 'conn_id'), '') AS conn_id,

    -- 5. IP/TCP
    toUInt8(coalesce(JSONExtractBool(raw_json, 'ip_meta_df'), 0)) AS ip_meta_df,
    coalesce(JSONExtractUInt(raw_json, 'ip_meta_id'), 0) AS ip_meta_id,
    coalesce(JSONExtractUInt(raw_json, 'ip_meta_total_length'), 0) AS ip_meta_total_length,
    coalesce(JSONExtractUInt(raw_json, 'ip_meta_ttl'), 0) AS ip_meta_ttl,
    coalesce(JSONExtractString(raw_json, 'tcp_meta_options'), '') AS tcp_meta_options,
    coalesce(JSONExtractUInt(raw_json, 'tcp_meta_window_size'), 0) AS tcp_meta_window_size,
    toInt32(coalesce(JSONExtractInt(raw_json, 'syn_to_clienthello_ms'), 0)) AS syn_to_clienthello_ms,

    -- 6. TLS / JA3/JA4
    coalesce(JSONExtractString(raw_json, 'tls_version'), '') AS tls_version,
    coalesce(JSONExtractString(raw_json, 'tls_sni'), '') AS tls_sni,
    coalesce(JSONExtractString(raw_json, 'ja3'), '') AS ja3,
    coalesce(JSONExtractString(raw_json, 'ja3_hash'), '') AS ja3_hash,
    coalesce(JSONExtractString(raw_json, 'ja4'), '') AS ja4,

    -- 7.
    -- HTTP headers
    coalesce(JSONExtractString(raw_json, 'header_User-Agent'), '') AS header_user_agent,
    coalesce(JSONExtractString(raw_json, 'header_Accept'), '') AS header_accept,
    coalesce(JSONExtractString(raw_json, 'header_Accept-Encoding'), '') AS header_accept_encoding,
    coalesce(JSONExtractString(raw_json, 'header_Accept-Language'), '') AS header_accept_language,
    coalesce(JSONExtractString(raw_json, 'header_X-Request-Id'), '') AS header_x_request_id,
    coalesce(JSONExtractString(raw_json, 'header_X-Trace-Id'), '') AS header_x_trace_id,
    coalesce(JSONExtractString(raw_json, 'header_X-Forwarded-For'), '') AS header_x_forwarded_for,
    coalesce(JSONExtractString(raw_json, 'header_Sec-CH-UA'), '') AS header_sec_ch_ua,
    coalesce(JSONExtractString(raw_json, 'header_Sec-CH-UA-Mobile'), '') AS header_sec_ch_ua_mobile,
    coalesce(JSONExtractString(raw_json, 'header_Sec-CH-UA-Platform'), '') AS header_sec_ch_ua_platform,
    coalesce(JSONExtractString(raw_json, 'header_Sec-Fetch-Dest'), '') AS header_sec_fetch_dest,
    coalesce(JSONExtractString(raw_json, 'header_Sec-Fetch-Mode'), '') AS header_sec_fetch_mode,
    coalesce(JSONExtractString(raw_json, 'header_Sec-Fetch-Site'), '') AS header_sec_fetch_site
FROM mabase_prod.http_logs_raw;
```

### Users and permissions

```sql
-- Create the users
CREATE USER IF NOT EXISTS data_writer IDENTIFIED BY 'YourInsertPassword';
CREATE USER IF NOT EXISTS analyst IDENTIFIED BY 'YourAnalystPassword';

-- Grants for data_writer (INSERT + SELECT for the MV)
GRANT INSERT(raw_json) ON mabase_prod.http_logs_raw TO data_writer;
GRANT SELECT(raw_json) ON mabase_prod.http_logs_raw TO data_writer;

-- Grants for analyst (read-only on the parsed logs)
GRANT SELECT ON mabase_prod.http_logs TO analyst;
```

### Insert format

The service sends each correlated log serialized as JSON in the `raw_json` column:

```sql
INSERT INTO mabase_prod.http_logs_raw (raw_json) FORMAT JSONEachRow
{"raw_json":"{\"timestamp\":\"2024-01-01T12:00:00Z\",\"src_ip\":\"192.168.1.1\",\"correlated\":true,...}"}
```

### Migrating existing data

The materialized view only processes rows inserted after it is created, and `http_logs` has no `raw_json` column, so existing raw rows must be replayed through the raw table. Assuming the old raw table was first renamed (for example to `http_logs_raw_old`, a name used here only for illustration):

```sql
-- Replaying the rows through http_logs_raw lets mv_http_logs
-- populate http_logs
INSERT INTO mabase_prod.http_logs_raw (raw_json)
SELECT raw_json FROM mabase_prod.http_logs_raw_old;
```

### Sanity checks: verifying ingestion

After deploying the service, check that data is flowing correctly:

```sql
-- 1. Tables present
SELECT database, table, engine
FROM system.tables
WHERE database = currentDatabase()
  AND table IN ('http_logs_raw', 'http_logs', 'mv_http_logs');

-- 2. Materialized view definition
SHOW CREATE TABLE mv_http_logs;

-- 3. Check that raw inserts are arriving
SELECT
    count(*) AS rows_raw,
    min(ingest_time) AS min_ingest,
    max(ingest_time) AS max_ingest
FROM http_logs_raw;

-- 4. Look at the latest raw logs
SELECT ingest_time, raw_json
FROM http_logs_raw
ORDER BY ingest_time DESC
LIMIT 5;

-- 5. Check that the MV feeds http_logs
SELECT
    count(*) AS rows_flat,
    min(time) AS min_time,
    max(time) AS max_time
FROM http_logs;

-- 6. Look at the latest parsed logs
SELECT time, src_ip, dst_ip, method, host, path,
       header_user_agent, tls_version, ja4
FROM http_logs
ORDER BY time DESC
LIMIT 10;
```

**Interpretation:**

- If `rows_raw` > 0 but `rows_flat` = 0: the materialized view is not working (check the SELECT grants on `http_logs_raw`)
- If both counts are > 0: ingestion and parsing are working correctly

## Tests

```bash
# Via Docker
./test.sh

# Local
go test ./...
go test -cover ./...
go test -coverprofile=coverage.out ./...
go tool cover -html=coverage.out
```

## Signals

| Signal    | Behaviour         |
|-----------|-------------------|
| `SIGINT`  | Graceful shutdown |
| `SIGTERM` | Graceful shutdown |

During a graceful shutdown:

1. The Unix sockets are closed
2. Buffers are flushed
3. Pending events are emitted
4.
ClickHouse connections are closed

## Internal logs

Internal logs are written to stderr:

```bash
# Docker
docker logs -f logcorrelator

# Systemd
journalctl -u logcorrelator -f
```

## Project structure

```
.
├── cmd/logcorrelator/        # Entry point
├── internal/
│   ├── adapters/
│   │   ├── inbound/unixsocket/
│   │   └── outbound/
│   │       ├── clickhouse/
│   │       ├── file/
│   │       └── multi/
│   ├── app/                  # Orchestration
│   ├── config/               # Configuration
│   ├── domain/               # Domain (correlation)
│   ├── observability/        # Logging
│   └── ports/                # Interfaces
├── config.example.yml        # Example config
├── Dockerfile                # Multi-stage build
├── build.sh                  # Build script
├── test.sh                   # Test script
└── logcorrelator.service     # systemd unit
```

## License

MIT