Build optimizations implemented:
1. Makefile: Remove --no-cache flag
- Docker builds now use layer cache (incremental builds)
- Added DOCKER_BUILDKIT=1 for better performance
- Added buildx support for parallel builds
- New targets: docker-build-dev-no-test, package-rpm-sequential
2. Dockerfile: Add SKIP_TESTS argument
- SKIP_TESTS=true for faster production builds
- Tests still run in CI by default
- Added BuildKit cache mounts for:
- /go/pkg/mod (Go modules)
- /var/cache/apt (APT cache)
- /var/lib/apt/lists (APT lists)
3. Dockerfile.package: Factorize common RPM tools
- New stage: rpm-common-tools (shared across el8/el9/el10)
- fpm installed once, reused 3 times
- Common build script: /build-rpm.sh
- Reduced duplication from 300 lines to 60 lines per stage
4. Parallel RPM builds with buildx
- make package-rpm now uses buildx for parallel builds
- el8, el9, el10 built simultaneously
- Fallback: make package-rpm-sequential (if buildx fails)
Expected performance gains:
- Incremental build (code change only): 15-25 min → 3-5 min (-80%)
- Full build (no cache): 15-25 min → 8-12 min (-50%)
- RPM builds (parallel): 9 min → 4 min (-55%)
- Total typical workflow: ~20 min → ~5-7 min (-65%)
Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
logcorrelator
HTTP and network log correlation service written in Go.
Description
logcorrelator receives two JSON log streams over Unix sockets:
- Source A: application HTTP logs (Apache, reverse proxy)
- Source B: network logs (IP/TCP metadata, JA3/JA4, etc.)
It correlates events on src_ip + src_port within a configurable time window and writes the correlated logs to:
- A local file (JSON lines)
- ClickHouse (for analysis and archiving)
Architecture
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Apache Source │────▶│ │────▶│ File Sink │
│ (Unix Socket) │ │ Correlation │ │ (JSON lines) │
└─────────────────┘ │ Service │ └─────────────────┘
│ │
┌─────────────────┐ │ - Buffers │ ┌─────────────────┐
│ Network Source │────▶│ - Time Window │────▶│ ClickHouse │
│ (Unix Socket) │ │ - Orphan Policy │ │ Sink │
└─────────────────┘ └──────────────────┘ └─────────────────┘
Build (100% Docker)
All builds and tests run inside Docker containers:
# Full build (binary + tests + RPM)
make package-rpm
# Tests only
make test
# Manual build with Docker
docker build --target builder -t logcorrelator-builder .
docker build --target runtime -t logcorrelator:latest .
Prerequisites
- Docker 20.10+
- Bash
Installation
From Docker
# Build the image
docker build --target runtime -t logcorrelator:latest .
# Run
docker run -d \
--name logcorrelator \
-v /var/run/logcorrelator:/var/run/logcorrelator \
-v /var/log/logcorrelator:/var/log/logcorrelator \
-v ./config.example.yml:/etc/logcorrelator/logcorrelator.yml \
logcorrelator:latest
From RPM packages
# Generate the packages
make package-rpm
# Install the RPM package (Rocky Linux 8/9/10)
sudo dnf install -y dist/rpm/el8/logcorrelator-1.1.6-1.el8.x86_64.rpm
sudo dnf install -y dist/rpm/el9/logcorrelator-1.1.6-1.el9.x86_64.rpm
sudo dnf install -y dist/rpm/el10/logcorrelator-1.1.6-1.el10.x86_64.rpm
# Enable and start the service
sudo systemctl enable logcorrelator
sudo systemctl start logcorrelator
# Check the status
sudo systemctl status logcorrelator
Manual build (without Docker)
# Prerequisites: Go 1.21+
go build -o logcorrelator ./cmd/logcorrelator
# Run
./logcorrelator -config config.example.yml
Configuration
Configuration uses a YAML file. See config.example.yml for a complete example.
# /etc/logcorrelator/logcorrelator.yml
log:
  level: INFO  # DEBUG, INFO, WARN, ERROR

inputs:
  unix_sockets:
    # HTTP source (A): application logs in JSON
    - name: http
      path: /var/run/logcorrelator/http.sock
      socket_permissions: "0666"
      socket_type: dgram
    # Network source (B): IP/TCP/JA3... logs in JSON
    - name: network
      path: /var/run/logcorrelator/network.sock
      socket_permissions: "0666"
      socket_type: dgram

outputs:
  file:
    enabled: true
    path: /var/log/logcorrelator/correlated.log
    format: json_lines
  clickhouse:
    enabled: true
    dsn: clickhouse://data_writer:password@localhost:9000/mabase_prod
    table: http_logs_raw
    batch_size: 500
    flush_interval_ms: 200
    max_buffer_size: 5000
    drop_on_overflow: true

correlation:
  time_window:
    value: 1
    unit: s
  orphan_policy:
    apache_always_emit: true
    network_emit: false
  matching:
    mode: one_to_many  # Keep-Alive: one B event can correlate several A events

buffers:
  max_http_items: 10000
  max_network_items: 20000
  ttl:
    network_ttl_s: 30
ClickHouse DSN format
clickhouse://username:password@host:port/database
Example: clickhouse://data_writer:MyPassword@127.0.0.1:9000/mabase_prod
Common ports:
- 9000: native port (recommended for the Go driver)
- 8123: HTTP port (alternative)
Log formats
Source A (HTTP)
{
"src_ip": "192.168.1.1",
"src_port": 8080,
"dst_ip": "10.0.0.1",
"dst_port": 80,
"timestamp": 1704110400000000000,
"method": "GET",
"path": "/api/test",
"header_host": "example.com"
}
Source B (Network)
{
"src_ip": "192.168.1.1",
"src_port": 8080,
"dst_ip": "10.0.0.1",
"dst_port": 443,
"ja3": "abc123def456",
"ja4": "xyz789"
}
Correlated log (output)
Version 1.0.3+ - flat JSON structure:
{
"timestamp": "2024-01-01T12:00:00Z",
"src_ip": "192.168.1.1",
"src_port": 8080,
"dst_ip": "10.0.0.1",
"dst_port": 80,
"correlated": true,
"method": "GET",
"path": "/api/test",
"ja3": "abc123def456",
"ja4": "xyz789"
}
All fields from sources A and B are merged at the same level. The correlation fields (src_ip, src_port, dst_ip, dst_port, correlated, orphan_side) are always present, and all other fields from the source logs are added directly at the root.
ClickHouse schema
The service uses two ClickHouse tables in the mabase_prod database:
Full setup
-- 1. Create the database
CREATE DATABASE IF NOT EXISTS mabase_prod;
-- 2. Raw table with TTL (1-day retention)
DROP TABLE IF EXISTS mabase_prod.http_logs_raw;
CREATE TABLE mabase_prod.http_logs_raw
(
raw_json String,
ingest_time DateTime DEFAULT now()
)
ENGINE = MergeTree
PARTITION BY toDate(ingest_time)
ORDER BY ingest_time
TTL ingest_time + INTERVAL 1 DAY;
-- 3. Parsed table
DROP TABLE IF EXISTS mabase_prod.http_logs;
CREATE TABLE mabase_prod.http_logs
(
time DateTime,
log_date Date DEFAULT toDate(time),
src_ip IPv4,
src_port UInt16,
dst_ip IPv4,
dst_port UInt16,
method LowCardinality(String),
scheme LowCardinality(String),
host LowCardinality(String),
path String,
query String,
http_version LowCardinality(String),
orphan_side LowCardinality(String),
correlated UInt8,
keepalives UInt16,
a_timestamp UInt64,
b_timestamp UInt64,
conn_id String,
ip_meta_df UInt8,
ip_meta_id UInt32,
ip_meta_total_length UInt32,
ip_meta_ttl UInt8,
tcp_meta_options LowCardinality(String),
tcp_meta_window_size UInt32,
syn_to_clienthello_ms Int32,
tls_version LowCardinality(String),
tls_sni LowCardinality(String),
ja3 String,
ja3_hash String,
ja4 String,
header_user_agent String,
header_accept String,
header_accept_encoding String,
header_accept_language String,
header_x_request_id String,
header_x_trace_id String,
header_x_forwarded_for String,
header_sec_ch_ua String,
header_sec_ch_ua_mobile String,
header_sec_ch_ua_platform String,
header_sec_fetch_dest String,
header_sec_fetch_mode String,
header_sec_fetch_site String
)
ENGINE = MergeTree
PARTITION BY log_date
ORDER BY (time, src_ip, dst_ip, ja4);
-- 4. Materialized view (RAW → logs)
DROP VIEW IF EXISTS mabase_prod.mv_http_logs;
CREATE MATERIALIZED VIEW mabase_prod.mv_http_logs
TO mabase_prod.http_logs
AS
SELECT
-- 1. Time
parseDateTimeBestEffort(
coalesce(JSONExtractString(raw_json, 'time'), '1970-01-01T00:00:00Z')
) AS time,
toDate(time) AS log_date,
-- 2. L3/L4 network
toIPv4(coalesce(JSONExtractString(raw_json, 'src_ip'), '0.0.0.0')) AS src_ip,
toUInt16(coalesce(JSONExtractUInt(raw_json, 'src_port'), 0)) AS src_port,
toIPv4(coalesce(JSONExtractString(raw_json, 'dst_ip'), '0.0.0.0')) AS dst_ip,
toUInt16(coalesce(JSONExtractUInt(raw_json, 'dst_port'), 0)) AS dst_port,
-- 3. Basic HTTP
coalesce(JSONExtractString(raw_json, 'method'), '') AS method,
coalesce(JSONExtractString(raw_json, 'scheme'), '') AS scheme,
coalesce(JSONExtractString(raw_json, 'host'), '') AS host,
coalesce(JSONExtractString(raw_json, 'path'), '') AS path,
coalesce(JSONExtractString(raw_json, 'query'), '') AS query,
coalesce(JSONExtractString(raw_json, 'http_version'), '') AS http_version,
coalesce(JSONExtractString(raw_json, 'orphan_side'), '') AS orphan_side,
-- 4. Connection / correlation
toUInt8(coalesce(JSONExtractBool(raw_json, 'correlated'), 0)) AS correlated,
toUInt16(coalesce(JSONExtractUInt(raw_json, 'keepalives'), 0)) AS keepalives,
coalesce(JSONExtractUInt(raw_json, 'a_timestamp'), 0) AS a_timestamp,
coalesce(JSONExtractUInt(raw_json, 'b_timestamp'), 0) AS b_timestamp,
coalesce(JSONExtractString(raw_json, 'conn_id'), '') AS conn_id,
-- 5. IP/TCP
toUInt8(coalesce(JSONExtractBool(raw_json, 'ip_meta_df'), 0)) AS ip_meta_df,
coalesce(JSONExtractUInt(raw_json, 'ip_meta_id'), 0) AS ip_meta_id,
coalesce(JSONExtractUInt(raw_json, 'ip_meta_total_length'), 0) AS ip_meta_total_length,
coalesce(JSONExtractUInt(raw_json, 'ip_meta_ttl'), 0) AS ip_meta_ttl,
coalesce(JSONExtractString(raw_json, 'tcp_meta_options'), '') AS tcp_meta_options,
coalesce(JSONExtractUInt(raw_json, 'tcp_meta_window_size'), 0) AS tcp_meta_window_size,
toInt32(coalesce(JSONExtractInt(raw_json, 'syn_to_clienthello_ms'), 0)) AS syn_to_clienthello_ms,
-- 6. TLS / JA3/JA4
coalesce(JSONExtractString(raw_json, 'tls_version'), '') AS tls_version,
coalesce(JSONExtractString(raw_json, 'tls_sni'), '') AS tls_sni,
coalesce(JSONExtractString(raw_json, 'ja3'), '') AS ja3,
coalesce(JSONExtractString(raw_json, 'ja3_hash'), '') AS ja3_hash,
coalesce(JSONExtractString(raw_json, 'ja4'), '') AS ja4,
-- 7. HTTP headers
coalesce(JSONExtractString(raw_json, 'header_User-Agent'), '') AS header_user_agent,
coalesce(JSONExtractString(raw_json, 'header_Accept'), '') AS header_accept,
coalesce(JSONExtractString(raw_json, 'header_Accept-Encoding'), '') AS header_accept_encoding,
coalesce(JSONExtractString(raw_json, 'header_Accept-Language'), '') AS header_accept_language,
coalesce(JSONExtractString(raw_json, 'header_X-Request-Id'), '') AS header_x_request_id,
coalesce(JSONExtractString(raw_json, 'header_X-Trace-Id'), '') AS header_x_trace_id,
coalesce(JSONExtractString(raw_json, 'header_X-Forwarded-For'), '') AS header_x_forwarded_for,
coalesce(JSONExtractString(raw_json, 'header_Sec-CH-UA'), '') AS header_sec_ch_ua,
coalesce(JSONExtractString(raw_json, 'header_Sec-CH-UA-Mobile'), '') AS header_sec_ch_ua_mobile,
coalesce(JSONExtractString(raw_json, 'header_Sec-CH-UA-Platform'), '') AS header_sec_ch_ua_platform,
coalesce(JSONExtractString(raw_json, 'header_Sec-Fetch-Dest'), '') AS header_sec_fetch_dest,
coalesce(JSONExtractString(raw_json, 'header_Sec-Fetch-Mode'), '') AS header_sec_fetch_mode,
coalesce(JSONExtractString(raw_json, 'header_Sec-Fetch-Site'), '') AS header_sec_fetch_site
FROM mabase_prod.http_logs_raw;
Users and permissions
-- Create the users
CREATE USER IF NOT EXISTS data_writer IDENTIFIED BY 'YourInsertPassword';
CREATE USER IF NOT EXISTS analyst IDENTIFIED BY 'YourAnalystPassword';
-- Rights for data_writer (INSERT + SELECT for the MV)
GRANT INSERT(raw_json) ON mabase_prod.http_logs_raw TO data_writer;
GRANT SELECT(raw_json) ON mabase_prod.http_logs_raw TO data_writer;
-- Rights for analyst (read-only on the parsed logs)
GRANT SELECT ON mabase_prod.http_logs TO analyst;
Insertion format
The service sends each correlated log serialized as JSON in the raw_json column:
INSERT INTO mabase_prod.http_logs_raw (raw_json) FORMAT JSONEachRow
{"raw_json":"{\"timestamp\":\"2024-01-01T12:00:00Z\",\"src_ip\":\"192.168.1.1\",\"correlated\":true,...}"}
Migrating existing data
A materialized view only processes rows inserted after it was created. If http_logs_raw already contained data before the view existed, one way to backfill http_logs is to re-insert the raw rows so the view fires (note this duplicates the rows in http_logs_raw until the TTL expires them):
INSERT INTO mabase_prod.http_logs_raw (raw_json)
SELECT raw_json
FROM mabase_prod.http_logs_raw;
Sanity checks - verifying ingestion
After deploying the service, check that data is flowing correctly:
-- 1. Tables present
SELECT
database,
table,
engine
FROM system.tables
WHERE database = currentDatabase()
AND table IN ('http_logs_raw', 'http_logs', 'mv_http_logs');
-- 2. Materialized view definition
SHOW CREATE TABLE mv_http_logs;
-- 3. Check that raw inserts are arriving
SELECT
count(*) AS rows_raw,
min(ingest_time) AS min_ingest,
max(ingest_time) AS max_ingest
FROM http_logs_raw;
-- 4. View the latest raw logs
SELECT
ingest_time,
raw_json
FROM http_logs_raw
ORDER BY ingest_time DESC
LIMIT 5;
-- 5. Check that the MV feeds http_logs
SELECT
count(*) AS rows_flat,
min(time) AS min_time,
max(time) AS max_time
FROM http_logs;
-- 6. View the latest parsed logs
SELECT
time,
src_ip,
dst_ip,
method,
host,
path,
header_user_agent,
tls_version,
ja4
FROM http_logs
ORDER BY time DESC
LIMIT 10;
Interpretation:
- If rows_raw > 0 but rows_flat = 0: the materialized view is not working (check the SELECT rights on http_logs_raw)
- If both counts are > 0: ingestion and parsing are working correctly
Tests
# Via Docker
./test.sh
# Local
go test ./...
go test -cover ./...
go test -coverprofile=coverage.out ./...
go tool cover -html=coverage.out
Signals
| Signal | Behavior |
|---|---|
| SIGINT | Graceful shutdown |
| SIGTERM | Graceful shutdown |
During graceful shutdown:
- Unix sockets are closed
- Buffers are flushed
- Pending events are emitted
- ClickHouse connections are closed
Internal logs
Internal logs are written to stderr:
# Docker
docker logs -f logcorrelator
# Systemd
journalctl -u logcorrelator -f
Project layout
.
├── cmd/logcorrelator/ # Entry point
├── internal/
│ ├── adapters/
│ │ ├── inbound/unixsocket/ # HTTP and network sources
│ │ └── outbound/
│ │ ├── clickhouse/ # ClickHouse sink
│ │ ├── file/ # File sink
│ │ └── multi/ # Multi-sink
│ ├── app/ # Orchestration
│ ├── config/ # YAML configuration
│ ├── domain/ # Domain (correlation)
│ ├── observability/ # Internal logging
│ └── ports/ # Interfaces
├── config.example.yml # Example config
├── Dockerfile # Multi-stage build
├── Dockerfile.package # RPM packaging
├── Makefile # Build commands
├── architecture.yml # Architecture specification
└── logcorrelator.service # systemd unit
License
MIT
Troubleshooting
ClickHouse: insert errors
Error: No such column timestamp
- Check that the destination table is http_logs_raw (its only data column is raw_json)
- The service sends serialized JSON in raw_json, not separate columns
Error: ACCESS_DENIED
- Check the rights of the data_writer user:
GRANT INSERT(raw_json) ON mabase_prod.http_logs_raw TO data_writer;
GRANT SELECT(raw_json) ON mabase_prod.http_logs_raw TO data_writer;
Materialized view not working
Symptom: http_logs_raw has data, but http_logs is empty
- Check that the MV exists: SHOW CREATE TABLE mv_http_logs;
- Check the SELECT rights of data_writer on http_logs_raw
- Test the pipeline manually by inserting a raw row and confirming it appears in http_logs:
INSERT INTO mabase_prod.http_logs_raw (raw_json) VALUES ('{"time":"2024-01-01T12:00:00Z","src_ip":"192.168.1.1"}');
Unix sockets: permission denied
Error: permission denied on /var/run/logcorrelator/*.sock
- Check that the sockets have 0666 permissions
- Check that the logcorrelator user can read and write them
systemd service does not start
- Check the logs: journalctl -u logcorrelator -n 50 --no-pager
- Check the configuration: cat /etc/logcorrelator/logcorrelator.yml
- Test manually: /usr/bin/logcorrelator -config /etc/logcorrelator/logcorrelator.yml