- README.md: documenter les deux tables (raw + enrichie) - architecture.yml: décrire le schema complet avec colonnes matérialisées - Table http_logs_raw: ingestion JSON brut (colonne raw_json unique) - Table http_logs: extraction des champs via DEFAULT JSONExtract* Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
9.6 KiB
logcorrelator
Service de corrélation de logs HTTP et réseau écrit en Go.
Description
logcorrelator reçoit deux flux de logs JSON via des sockets Unix :
- Source A : logs HTTP applicatifs (Apache, reverse proxy)
- Source B : logs réseau (métadonnées IP/TCP, JA3/JA4, etc.)
Il corrèle les événements sur la base de src_ip + src_port avec une fenêtre temporelle configurable, et produit des logs corrélés vers :
- Un fichier local (JSON lines)
- ClickHouse (pour analyse et archivage)
Architecture
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Apache Source │────▶│ │────▶│ File Sink │
│ (Unix Socket) │ │ Correlation │ │ (JSON lines) │
└─────────────────┘ │ Service │ └─────────────────┘
│ │
┌─────────────────┐ │ - Buffers │ ┌─────────────────┐
│ Network Source │────▶│ - Time Window │────▶│ ClickHouse │
│ (Unix Socket) │ │ - Orphan Policy │ │ Sink │
└─────────────────┘ └──────────────────┘ └─────────────────┘
Build (100% Docker)
Tout le build et les tests s'exécutent dans des containers Docker :
# Build complet (binaire + tests + RPM)
make package-rpm
# Uniquement les tests
make test
# Build manuel avec Docker
docker build --target builder -t logcorrelator-builder .
docker build --target runtime -t logcorrelator:latest .
Prérequis
- Docker 20.10+
- Bash
Installation
Depuis Docker
# Build de l'image
docker build --target runtime -t logcorrelator:latest .
# Exécuter
docker run -d \
--name logcorrelator \
-v /var/run/logcorrelator:/var/run/logcorrelator \
-v /var/log/logcorrelator:/var/log/logcorrelator \
-v ./config.example.yml:/etc/logcorrelator/logcorrelator.yml \
logcorrelator:latest
Depuis les packages RPM
# Générer les packages
make package-rpm
# Installer le package RPM (Rocky Linux 8/9/10)
sudo dnf install -y dist/rpm/rocky8/logcorrelator-1.0.7-1.el8.x86_64.rpm
sudo dnf install -y dist/rpm/rocky9/logcorrelator-1.0.7-1.el9.x86_64.rpm
sudo dnf install -y dist/rpm/almalinux10/logcorrelator-1.0.7-1.el10.x86_64.rpm
# Activer et démarrer le service
sudo systemctl enable logcorrelator
sudo systemctl start logcorrelator
# Vérifier le statut
sudo systemctl status logcorrelator
Build manuel (sans Docker)
# Prérequis: Go 1.21+
go build -o logcorrelator ./cmd/logcorrelator
# Exécuter
./logcorrelator -config config.example.yml
Configuration
La configuration utilise un fichier YAML :
# Service configuration
service:
name: logcorrelator
language: go
# Input sources (at least 2 required)
inputs:
unix_sockets:
- name: http_source
path: /var/run/logcorrelator/http.socket
format: json
- name: network_source
path: /var/run/logcorrelator/network.socket
format: json
# File output
outputs:
file:
enabled: true
path: /var/log/logcorrelator/correlated.log
# ClickHouse output
outputs:
clickhouse:
enabled: false
dsn: clickhouse://user:pass@localhost:9000/db
table: correlated_logs_http_network
# Correlation configuration
correlation:
key:
- src_ip
- src_port
time_window:
value: 1
unit: s
orphan_policy:
apache_always_emit: true
network_emit: false
Exemple complet dans config.example.yml.
Format des logs
Source A (HTTP)
{
"src_ip": "192.168.1.1",
"src_port": 8080,
"dst_ip": "10.0.0.1",
"dst_port": 80,
"timestamp": 1704110400000000000,
"method": "GET",
"path": "/api/test",
"header_host": "example.com"
}
Source B (Réseau)
{
"src_ip": "192.168.1.1",
"src_port": 8080,
"dst_ip": "10.0.0.1",
"dst_port": 443,
"ja3": "abc123def456",
"ja4": "xyz789"
}
Log corrélé (sortie)
Version 1.0.3+ - Structure JSON plate (flat) :
{
"timestamp": "2024-01-01T12:00:00Z",
"src_ip": "192.168.1.1",
"src_port": 8080,
"dst_ip": "10.0.0.1",
"dst_port": 80,
"correlated": true,
"method": "GET",
"path": "/api/test",
"ja3": "abc123def456",
"ja4": "xyz789"
}
Tous les champs des sources A et B sont fusionnés au même niveau. Les champs de corrélation (src_ip, src_port, dst_ip, dst_port, correlated, orphan_side) sont toujours présents, et tous les autres champs des logs sources sont ajoutés directement à la racine.
Schema ClickHouse
Le service utilise deux tables ClickHouse :
Table brute (http_logs_raw)
Table d'ingestion qui stocke le log corrélé brut au format JSON :
CREATE TABLE http_logs_raw
(
raw_json String
)
ENGINE = MergeTree
ORDER BY tuple();
Format d'insertion : Le service envoie chaque log corrélé sérialisé en JSON dans la colonne raw_json :
INSERT INTO http_logs_raw (raw_json) FORMAT JSONEachRow
{"raw_json":"{\"timestamp\":\"2024-01-01T12:00:00Z\",\"src_ip\":\"192.168.1.1\",\"correlated\":true,...}"}
Table enrichie (http_logs)
Vue matérialisée qui extrait les champs du JSON pour l'analyse :
CREATE TABLE http_logs
(
raw_json String,
-- champs de base
time_str String DEFAULT JSONExtractString(raw_json, 'time'),
timestamp_str String DEFAULT JSONExtractString(raw_json, 'timestamp'),
time DateTime DEFAULT parseDateTimeBestEffort(time_str),
log_date Date DEFAULT toDate(time),
src_ip IPv4 DEFAULT toIPv4(JSONExtractString(raw_json, 'src_ip')),
src_port UInt16 DEFAULT toUInt16(JSONExtractUInt(raw_json, 'src_port')),
dst_ip IPv4 DEFAULT toIPv4(JSONExtractString(raw_json, 'dst_ip')),
dst_port UInt16 DEFAULT toUInt16(JSONExtractUInt(raw_json, 'dst_port')),
correlated UInt8 DEFAULT JSONExtractBool(raw_json, 'correlated'),
keepalives UInt16 DEFAULT toUInt16(JSONExtractUInt(raw_json, 'keepalives')),
method LowCardinality(String) DEFAULT JSONExtractString(raw_json, 'method'),
scheme LowCardinality(String) DEFAULT JSONExtractString(raw_json, 'scheme'),
host LowCardinality(String) DEFAULT JSONExtractString(raw_json, 'host'),
path String DEFAULT JSONExtractString(raw_json, 'path'),
query String DEFAULT JSONExtractString(raw_json, 'query'),
http_version LowCardinality(String) DEFAULT JSONExtractString(raw_json, 'http_version'),
orphan_side LowCardinality(String) DEFAULT JSONExtractString(raw_json, 'orphan_side'),
-- champs « presque toujours là »
a_timestamp UInt64 DEFAULT JSONExtractUInt(raw_json, 'a_timestamp'),
b_timestamp UInt64 DEFAULT JSONExtractUInt(raw_json, 'b_timestamp'),
conn_id String DEFAULT JSONExtractString(raw_json, 'conn_id'),
ip_meta_df UInt8 DEFAULT JSONExtractBool(raw_json, 'ip_meta_df'),
ip_meta_id UInt32 DEFAULT JSONExtractUInt(raw_json, 'ip_meta_id'),
ip_meta_total_length UInt32 DEFAULT JSONExtractUInt(raw_json, 'ip_meta_total_length'),
ip_meta_ttl UInt8 DEFAULT JSONExtractUInt(raw_json, 'ip_meta_ttl'),
tcp_meta_options LowCardinality(String) DEFAULT JSONExtractString(raw_json, 'tcp_meta_options'),
tcp_meta_window_size UInt32 DEFAULT JSONExtractUInt(raw_json, 'tcp_meta_window_size'),
syn_to_clienthello_ms Int32 DEFAULT toInt32(JSONExtractInt(raw_json, 'syn_to_clienthello_ms')),
tls_version LowCardinality(String) DEFAULT JSONExtractString(raw_json, 'tls_version'),
tls_sni LowCardinality(String) DEFAULT JSONExtractString(raw_json, 'tls_sni'),
ja3 String DEFAULT JSONExtractString(raw_json, 'ja3'),
ja3_hash String DEFAULT JSONExtractString(raw_json, 'ja3_hash'),
ja4 String DEFAULT JSONExtractString(raw_json, 'ja4'),
-- tous les autres champs JSON (headers dynamiques etc.)
extra JSON DEFAULT raw_json
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(log_date)
ORDER BY (log_date, dst_ip, src_ip, time);
Tests
# Via Docker
./test.sh
# Local
go test ./...
go test -cover ./...
go test -coverprofile=coverage.out ./...
go tool cover -html=coverage.out
Signaux
| Signal | Comportement |
|---|---|
SIGINT |
Arrêt gracieux |
SIGTERM |
Arrêt gracieux |
Lors de l'arrêt gracieux :
- Fermeture des sockets Unix
- Flush des buffers
- Émission des événements en attente
- Fermeture des connexions ClickHouse
Logs internes
Les logs internes sont envoyés vers stderr :
# Docker
docker logs -f logcorrelator
# Systemd
journalctl -u logcorrelator -f
Structure du projet
.
├── cmd/logcorrelator/ # Point d'entrée
├── internal/
│ ├── adapters/
│ │ ├── inbound/unixsocket/
│ │ └── outbound/
│ │ ├── clickhouse/
│ │ ├── file/
│ │ └── multi/
│ ├── app/ # Orchestration
│ ├── config/ # Configuration
│ ├── domain/ # Domaine (corrélation)
│ ├── observability/ # Logging
│ └── ports/ # Interfaces
├── config.example.conf # Exemple de config
├── Dockerfile # Build multi-stage
├── build.sh # Script de build
├── test.sh # Script de tests
└── logcorrelator.service # Unité systemd
License
MIT