
logcorrelator

HTTP and network log correlation service written in Go.

Description

logcorrelator receives two JSON log streams over Unix sockets:

  • Source A: application HTTP logs (Apache, reverse proxy)
  • Source B: network logs (IP/TCP metadata, JA3/JA4, etc.)

It correlates events on src_ip + src_port within a configurable time window, and writes the correlated logs to:

  • A local file (JSON lines)
  • ClickHouse (for analysis and archiving)

Architecture

┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│  Apache Source  │────▶│                  │────▶│   File Sink     │
│  (Unix Socket)  │     │  Correlation     │     │   (JSON lines)  │
└─────────────────┘     │  Service         │     └─────────────────┘
                        │                  │
┌─────────────────┐     │  - Buffers       │     ┌─────────────────┐
│  Network Source │────▶│  - Time Window   │────▶│  ClickHouse     │
│  (Unix Socket)  │     │  - Orphan Policy │     │  Sink           │
└─────────────────┘     └──────────────────┘     └─────────────────┘

Build (100% Docker)

The entire build and test pipeline runs inside Docker containers:

# Full build (binary + tests + RPM)
make package-rpm

# Tests only
make test

# Manual build with Docker
docker build --target builder -t logcorrelator-builder .
docker build --target runtime -t logcorrelator:latest .

Prerequisites

  • Docker 20.10+
  • Bash

Installation

From Docker

# Build the image
docker build --target runtime -t logcorrelator:latest .

# Run
docker run -d \
    --name logcorrelator \
    -v /var/run/logcorrelator:/var/run/logcorrelator \
    -v /var/log/logcorrelator:/var/log/logcorrelator \
    -v ./config.example.yml:/etc/logcorrelator/logcorrelator.yml \
    logcorrelator:latest

From the RPM packages

# Generate the packages
make package-rpm

# Install the RPM package (Rocky Linux 8/9/10)
sudo dnf install -y dist/rpm/el8/logcorrelator-1.1.6-1.el8.x86_64.rpm
sudo dnf install -y dist/rpm/el9/logcorrelator-1.1.6-1.el9.x86_64.rpm
sudo dnf install -y dist/rpm/el10/logcorrelator-1.1.6-1.el10.x86_64.rpm

# Enable and start the service
sudo systemctl enable logcorrelator
sudo systemctl start logcorrelator

# Check the status
sudo systemctl status logcorrelator

Manual build (without Docker)

# Prerequisite: Go 1.21+
go build -o logcorrelator ./cmd/logcorrelator

# Run
./logcorrelator -config config.example.yml

Configuration

Configuration uses a YAML file. See config.example.yml for a complete example.

# /etc/logcorrelator/logcorrelator.yml

log:
  level: INFO  # DEBUG, INFO, WARN, ERROR

inputs:
  unix_sockets:
    # HTTP source (A): application logs in JSON
    - name: http
      path: /var/run/logcorrelator/http.sock
      socket_permissions: "0666"
      socket_type: dgram

    # Network source (B): IP/TCP/JA3... logs in JSON
    - name: network
      path: /var/run/logcorrelator/network.sock
      socket_permissions: "0666"
      socket_type: dgram

outputs:
  file:
    enabled: true
    path: /var/log/logcorrelator/correlated.log
    format: json_lines

  clickhouse:
    enabled: true
    dsn: clickhouse://data_writer:password@localhost:9000/mabase_prod
    table: http_logs_raw
    batch_size: 500
    flush_interval_ms: 200
    max_buffer_size: 5000
    drop_on_overflow: true

correlation:
  time_window:
    value: 1
    unit: s
  orphan_policy:
    apache_always_emit: true
    network_emit: false
  matching:
    mode: one_to_many  # Keep-Alive: one B event can correlate with several A events
  buffers:
    max_http_items: 10000
    max_network_items: 20000
  ttl:
    network_ttl_s: 30
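The ClickHouse batching knobs above (max_buffer_size, drop_on_overflow) behave like a bounded queue that drops instead of blocking when full. A minimal Go sketch of that policy, with hypothetical names, not the actual sink code:

```go
package main

import "fmt"

// Buffer is a bounded event buffer mirroring max_buffer_size
// and drop_on_overflow from the ClickHouse sink config.
type Buffer struct {
	ch      chan string
	Dropped int
}

func NewBuffer(size int) *Buffer {
	return &Buffer{ch: make(chan string, size)}
}

// Push enqueues an event; when the buffer is full it drops the
// event (and counts the drop) instead of blocking the pipeline.
func (b *Buffer) Push(ev string) {
	select {
	case b.ch <- ev:
	default:
		b.Dropped++ // drop_on_overflow: true
	}
}

func main() {
	b := NewBuffer(2)
	for i := 0; i < 5; i++ {
		b.Push(fmt.Sprintf("event-%d", i))
	}
	fmt.Println(len(b.ch), b.Dropped) // 2 3
}
```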

ClickHouse DSN format

clickhouse://username:password@host:port/database

Example: clickhouse://data_writer:MyPassword@127.0.0.1:9000/mabase_prod

Common ports:

  • 9000: native port (recommended for the Go driver)
  • 8123: HTTP port (alternative)
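Since the DSN is ordinary URL syntax, it can be sanity-checked with Go's net/url package. A sketch for validation (ParseDSN is a hypothetical helper, not part of the service or the ClickHouse driver):

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// ParseDSN splits a clickhouse:// DSN of the form
// clickhouse://user:pass@host:port/database into its parts.
func ParseDSN(dsn string) (user, host, port, db string, err error) {
	u, err := url.Parse(dsn)
	if err != nil {
		return "", "", "", "", err
	}
	return u.User.Username(), u.Hostname(), u.Port(),
		strings.TrimPrefix(u.Path, "/"), nil
}

func main() {
	user, host, port, db, err := ParseDSN("clickhouse://data_writer:secret@127.0.0.1:9000/mabase_prod")
	if err != nil {
		panic(err)
	}
	fmt.Println(user, host, port, db) // data_writer 127.0.0.1 9000 mabase_prod
}
```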

Log formats

Source A (HTTP)

{
  "src_ip": "192.168.1.1",
  "src_port": 8080,
  "dst_ip": "10.0.0.1",
  "dst_port": 80,
  "timestamp": 1704110400000000000,
  "method": "GET",
  "path": "/api/test",
  "header_host": "example.com"
}

Source B (Network)

{
  "src_ip": "192.168.1.1",
  "src_port": 8080,
  "dst_ip": "10.0.0.1",
  "dst_port": 443,
  "ja3": "abc123def456",
  "ja4": "xyz789"
}

Correlated log (output)

Version 1.0.3+ - flat JSON structure:

{
  "timestamp": "2024-01-01T12:00:00Z",
  "src_ip": "192.168.1.1",
  "src_port": 8080,
  "dst_ip": "10.0.0.1",
  "dst_port": 80,
  "correlated": true,
  "method": "GET",
  "path": "/api/test",
  "ja3": "abc123def456",
  "ja4": "xyz789"
}

All fields from sources A and B are merged at the same level. The correlation fields (src_ip, src_port, dst_ip, dst_port, correlated, orphan_side) are always present, and every other field from the source logs is added directly at the root.
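The flat merge can be sketched as a top-level map merge. Which side wins on a field-name conflict is an assumption here (A wins), not documented behavior:

```go
package main

import "fmt"

// MergeFlat merges a network (B) event into an HTTP (A) event at
// the top level, as in the flat output format. A's fields win on
// conflict (illustrative choice).
func MergeFlat(a, b map[string]any) map[string]any {
	out := make(map[string]any, len(a)+len(b)+1)
	for k, v := range b {
		out[k] = v
	}
	for k, v := range a {
		out[k] = v
	}
	out["correlated"] = true
	return out
}

func main() {
	a := map[string]any{"src_ip": "192.168.1.1", "method": "GET", "path": "/api/test"}
	b := map[string]any{"src_ip": "192.168.1.1", "ja3": "abc123def456"}
	m := MergeFlat(a, b)
	fmt.Println(m["method"], m["ja3"], m["correlated"]) // GET abc123def456 true
}
```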

ClickHouse schema

The service uses two ClickHouse tables in the mabase_prod database:

Full setup

-- 1. Create the database
CREATE DATABASE IF NOT EXISTS mabase_prod;

-- 2. Raw table with a TTL (1 day retention)
DROP TABLE IF EXISTS mabase_prod.http_logs_raw;

CREATE TABLE mabase_prod.http_logs_raw
(
    raw_json    String,
    ingest_time DateTime DEFAULT now()
)
ENGINE = MergeTree
PARTITION BY toDate(ingest_time)
ORDER BY ingest_time
TTL ingest_time + INTERVAL 1 DAY;

-- 3. Parsed table
DROP TABLE IF EXISTS mabase_prod.http_logs;

CREATE TABLE mabase_prod.http_logs
(
    time                  DateTime,
    log_date              Date DEFAULT toDate(time),

    src_ip                IPv4,
    src_port              UInt16,
    dst_ip                IPv4,
    dst_port              UInt16,

    method                LowCardinality(String),
    scheme                LowCardinality(String),
    host                  LowCardinality(String),
    path                  String,
    query                 String,
    http_version          LowCardinality(String),
    orphan_side           LowCardinality(String),

    correlated            UInt8,
    keepalives            UInt16,
    a_timestamp           UInt64,
    b_timestamp           UInt64,
    conn_id               String,

    ip_meta_df            UInt8,
    ip_meta_id            UInt32,
    ip_meta_total_length  UInt32,
    ip_meta_ttl           UInt8,
    tcp_meta_options      LowCardinality(String),
    tcp_meta_window_size  UInt32,
    syn_to_clienthello_ms Int32,

    tls_version           LowCardinality(String),
    tls_sni               LowCardinality(String),
    ja3                   String,
    ja3_hash              String,
    ja4                   String,

    header_user_agent       String,
    header_accept           String,
    header_accept_encoding  String,
    header_accept_language  String,
    header_x_request_id     String,
    header_x_trace_id       String,
    header_x_forwarded_for  String,

    header_sec_ch_ua          String,
    header_sec_ch_ua_mobile   String,
    header_sec_ch_ua_platform String,
    header_sec_fetch_dest     String,
    header_sec_fetch_mode     String,
    header_sec_fetch_site     String
)
ENGINE = MergeTree
PARTITION BY log_date
ORDER BY (time, src_ip, dst_ip, ja4);

-- 4. Materialized view (RAW → logs)
DROP VIEW IF EXISTS mabase_prod.mv_http_logs;

CREATE MATERIALIZED VIEW mabase_prod.mv_http_logs
TO mabase_prod.http_logs
AS
SELECT
    -- 1. Time
    parseDateTimeBestEffort(
        coalesce(JSONExtractString(raw_json, 'time'), '1970-01-01T00:00:00Z')
    )                                                                           AS time,
    toDate(time)                                                                AS log_date,

    -- 2. Network L3/L4
    toIPv4(coalesce(JSONExtractString(raw_json, 'src_ip'), '0.0.0.0'))          AS src_ip,
    toUInt16(coalesce(JSONExtractUInt(raw_json, 'src_port'), 0))                AS src_port,
    toIPv4(coalesce(JSONExtractString(raw_json, 'dst_ip'), '0.0.0.0'))          AS dst_ip,
    toUInt16(coalesce(JSONExtractUInt(raw_json, 'dst_port'), 0))                AS dst_port,

    -- 3. Basic HTTP
    coalesce(JSONExtractString(raw_json, 'method'),        '')                  AS method,
    coalesce(JSONExtractString(raw_json, 'scheme'),        '')                  AS scheme,
    coalesce(JSONExtractString(raw_json, 'host'),          '')                  AS host,
    coalesce(JSONExtractString(raw_json, 'path'),          '')                  AS path,
    coalesce(JSONExtractString(raw_json, 'query'),         '')                  AS query,
    coalesce(JSONExtractString(raw_json, 'http_version'),  '')                  AS http_version,
    coalesce(JSONExtractString(raw_json, 'orphan_side'),   '')                  AS orphan_side,

    -- 4. Connection / correlation
    toUInt8(coalesce(JSONExtractBool(raw_json, 'correlated'), 0))               AS correlated,
    toUInt16(coalesce(JSONExtractUInt(raw_json, 'keepalives'), 0))              AS keepalives,
    coalesce(JSONExtractUInt(raw_json, 'a_timestamp'), 0)                       AS a_timestamp,
    coalesce(JSONExtractUInt(raw_json, 'b_timestamp'), 0)                       AS b_timestamp,
    coalesce(JSONExtractString(raw_json, 'conn_id'), '')                        AS conn_id,

    -- 5. IP/TCP
    toUInt8(coalesce(JSONExtractBool(raw_json, 'ip_meta_df'), 0))               AS ip_meta_df,
    coalesce(JSONExtractUInt(raw_json, 'ip_meta_id'), 0)                        AS ip_meta_id,
    coalesce(JSONExtractUInt(raw_json, 'ip_meta_total_length'), 0)              AS ip_meta_total_length,
    coalesce(JSONExtractUInt(raw_json, 'ip_meta_ttl'), 0)                       AS ip_meta_ttl,
    coalesce(JSONExtractString(raw_json, 'tcp_meta_options'), '')               AS tcp_meta_options,
    coalesce(JSONExtractUInt(raw_json, 'tcp_meta_window_size'), 0)              AS tcp_meta_window_size,
    toInt32(coalesce(JSONExtractInt(raw_json, 'syn_to_clienthello_ms'), 0))     AS syn_to_clienthello_ms,

    -- 6. TLS / JA3/JA4
    coalesce(JSONExtractString(raw_json, 'tls_version'), '')                    AS tls_version,
    coalesce(JSONExtractString(raw_json, 'tls_sni'), '')                        AS tls_sni,
    coalesce(JSONExtractString(raw_json, 'ja3'), '')                            AS ja3,
    coalesce(JSONExtractString(raw_json, 'ja3_hash'), '')                       AS ja3_hash,
    coalesce(JSONExtractString(raw_json, 'ja4'), '')                            AS ja4,

    -- 7. HTTP headers
    coalesce(JSONExtractString(raw_json, 'header_User-Agent'),       '')        AS header_user_agent,
    coalesce(JSONExtractString(raw_json, 'header_Accept'),           '')        AS header_accept,
    coalesce(JSONExtractString(raw_json, 'header_Accept-Encoding'),  '')        AS header_accept_encoding,
    coalesce(JSONExtractString(raw_json, 'header_Accept-Language'),  '')        AS header_accept_language,
    coalesce(JSONExtractString(raw_json, 'header_X-Request-Id'),     '')        AS header_x_request_id,
    coalesce(JSONExtractString(raw_json, 'header_X-Trace-Id'),       '')        AS header_x_trace_id,
    coalesce(JSONExtractString(raw_json, 'header_X-Forwarded-For'),  '')        AS header_x_forwarded_for,

    coalesce(JSONExtractString(raw_json, 'header_Sec-CH-UA'),        '')        AS header_sec_ch_ua,
    coalesce(JSONExtractString(raw_json, 'header_Sec-CH-UA-Mobile'), '')        AS header_sec_ch_ua_mobile,
    coalesce(JSONExtractString(raw_json, 'header_Sec-CH-UA-Platform'), '')      AS header_sec_ch_ua_platform,
    coalesce(JSONExtractString(raw_json, 'header_Sec-Fetch-Dest'),  '')         AS header_sec_fetch_dest,
    coalesce(JSONExtractString(raw_json, 'header_Sec-Fetch-Mode'), '')          AS header_sec_fetch_mode,
    coalesce(JSONExtractString(raw_json, 'header_Sec-Fetch-Site'), '')          AS header_sec_fetch_site

FROM mabase_prod.http_logs_raw;

Users and permissions

-- Create the users
CREATE USER IF NOT EXISTS data_writer IDENTIFIED BY 'YourInsertPassword';
CREATE USER IF NOT EXISTS analyst IDENTIFIED BY 'YourAnalystPassword';

-- Rights for data_writer (INSERT + SELECT for the MV)
GRANT INSERT(raw_json) ON mabase_prod.http_logs_raw TO data_writer;
GRANT SELECT(raw_json) ON mabase_prod.http_logs_raw TO data_writer;

-- Rights for analyst (read-only on the parsed logs)
GRANT SELECT ON mabase_prod.http_logs TO analyst;

Insert format

The service sends each correlated log serialized as JSON into the raw_json column:

INSERT INTO mabase_prod.http_logs_raw (raw_json) FORMAT JSONEachRow
{"raw_json":"{\"timestamp\":\"2024-01-01T12:00:00Z\",\"src_ip\":\"192.168.1.1\",\"correlated\":true,...}"}

Migrating existing data

If you already have rows in http_logs_raw from before the materialized view was created: a materialized view only processes rows inserted after its creation. To backfill http_logs, replay the view's transformation over the existing raw rows:

INSERT INTO mabase_prod.http_logs
SELECT
    -- same column expressions as in mv_http_logs above
    ...
FROM mabase_prod.http_logs_raw;

Sanity checks - verifying ingestion

After deploying the service, check that data is flowing correctly:

-- 1. Tables present
SELECT
    database,
    table,
    engine
FROM system.tables
WHERE database = currentDatabase()
  AND table IN ('http_logs_raw', 'http_logs', 'mv_http_logs');

-- 2. Materialized view definition
SHOW CREATE TABLE mv_http_logs;

-- 3. Check that raw inserts are arriving
SELECT
    count(*)  AS rows_raw,
    min(ingest_time) AS min_ingest,
    max(ingest_time) AS max_ingest
FROM http_logs_raw;

-- 4. Look at the latest raw logs
SELECT
    ingest_time,
    raw_json
FROM http_logs_raw
ORDER BY ingest_time DESC
LIMIT 5;

-- 5. Check that the MV feeds http_logs
SELECT
    count(*) AS rows_flat,
    min(time) AS min_time,
    max(time) AS max_time
FROM http_logs;

-- 6. Look at the latest parsed logs
SELECT
    time,
    src_ip,
    dst_ip,
    method,
    host,
    path,
    header_user_agent,
    tls_version,
    ja4
FROM http_logs
ORDER BY time DESC
LIMIT 10;

Interpretation:

  • If rows_raw > 0 but rows_flat = 0: the materialized view is not working (check the SELECT rights on http_logs_raw)
  • If both counts are > 0: ingestion and parsing are working correctly

Tests

# Via Docker
./test.sh

# Local
go test ./...
go test -cover ./...
go test -coverprofile=coverage.out ./...
go tool cover -html=coverage.out

Signals

Signal   Behavior
SIGINT   Graceful shutdown
SIGTERM  Graceful shutdown

During a graceful shutdown:

  1. Close the Unix sockets
  2. Flush the buffers
  3. Emit pending events
  4. Close the ClickHouse connections
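The sequence above can be sketched with os/signal; step names are placeholders, not the service's real API:

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// StopOrder returns the graceful-shutdown steps in the documented
// order (placeholder names, not real API calls).
func StopOrder() []string {
	return []string{
		"close unix sockets",
		"flush buffers",
		"emit pending events",
		"close clickhouse connections",
	}
}

func main() {
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	go func() { sig <- syscall.SIGTERM }() // simulated signal so the sketch terminates on its own

	fmt.Println("received", <-sig)
	for i, step := range StopOrder() {
		fmt.Printf("%d. %s\n", i+1, step)
	}
}
```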

Internal logs

Internal logs are written to stderr:

# Docker
docker logs -f logcorrelator

# Systemd
journalctl -u logcorrelator -f

Project structure

.
├── cmd/logcorrelator/           # Entry point
├── internal/
│   ├── adapters/
│   │   ├── inbound/unixsocket/  # HTTP and network sources
│   │   └── outbound/
│   │       ├── clickhouse/      # ClickHouse sink
│   │       ├── file/            # File sink
│   │       └── multi/           # Multi-sink
│   ├── app/                     # Orchestration
│   ├── config/                  # YAML configuration
│   ├── domain/                  # Domain (correlation)
│   ├── observability/           # Internal logging
│   └── ports/                   # Interfaces
├── config.example.yml           # Example config
├── Dockerfile                   # Multi-stage build
├── Dockerfile.package           # RPM packaging
├── Makefile                     # Build commands
├── architecture.yml             # Architecture specification
└── logcorrelator.service        # systemd unit

License

MIT

Troubleshooting

ClickHouse: insert errors

Error: No such column timestamp

  • Check that the destination table really is http_logs_raw (single raw_json column)
  • The service sends serialized JSON into raw_json, not separate columns

Error: ACCESS_DENIED

  • Check the data_writer user's rights:
    GRANT INSERT(raw_json) ON mabase_prod.http_logs_raw TO data_writer;
    GRANT SELECT(raw_json) ON mabase_prod.http_logs_raw TO data_writer;

Materialized view not working

Symptom: http_logs_raw has data, but http_logs is empty

  1. Check that the MV exists:

    SHOW CREATE TABLE mv_http_logs;

  2. Check the SELECT rights for data_writer on http_logs_raw

  3. Test manually by inserting a sample row into the raw table and checking that it shows up in http_logs:

    INSERT INTO mabase_prod.http_logs_raw (raw_json)
    VALUES ('{"time":"2024-01-01T12:00:00Z","src_ip":"192.168.1.1","src_port":8080,"correlated":true}');

    SELECT count(*) FROM mabase_prod.http_logs
    WHERE time = toDateTime('2024-01-01 12:00:00');

Unix sockets: permission denied

Error: permission denied on /var/run/logcorrelator/*.sock

  • Check that the sockets have 0666 permissions
  • Check that the logcorrelator user can read/write

systemd service won't start

  1. Check the logs:

    journalctl -u logcorrelator -n 50 --no-pager

  2. Check the configuration:

    cat /etc/logcorrelator/logcorrelator.yml

  3. Test manually:

    /usr/bin/logcorrelator -config /etc/logcorrelator/logcorrelator.yml