Files
logcorrelator/architecture.yml
toto 58b23ccc1e
Some checks failed
Build and Test / test (push) Has been cancelled
Build and Test / build (push) Has been cancelled
Build and Test / docker (push) Has been cancelled
docs: update ClickHouse schema (http_logs_raw + http_logs)
- README.md: documenter les deux tables (raw + enrichie)
- architecture.yml: décrire le schema complet avec colonnes matérialisées
- Table http_logs_raw: ingestion JSON brut (colonne raw_json unique)
- Table http_logs: extraction des champs via DEFAULT JSONExtract*

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
2026-03-03 11:53:13 +01:00

625 lines
20 KiB
YAML
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

service:
name: logcorrelator
context: http-network-correlation
language: go
pattern: hexagonal
description: >
logcorrelator est un service système (lancé par systemd) écrit en Go, chargé
de recevoir deux flux de logs JSON via des sockets Unix, de corréler les
événements HTTP applicatifs (source A, typiquement Apache ou reverse proxy)
avec des événements réseau (source B, métadonnées IP/TCP, JA3/JA4, etc.)
sur la base de la combinaison strictement définie src_ip + src_port, avec
une fenêtre temporelle configurable. Le service supporte les connexions
HTTP Keep-Alive : un log réseau peut être corrélé à plusieurs logs HTTP
successifs (stratégie 1àN). La rétention en mémoire est bornée par des
tailles de caches configurables et un TTL dynamique pour la source B. Le
service émet toujours les événements A même lorsquaucun événement B nest
disponible, német jamais de logs B seuls, et pousse les résultats vers
ClickHouse et/ou un fichier local.
runtime:
deployment:
unit_type: systemd
description: >
logcorrelator est livré sous forme de binaire autonome, exécuté comme un
service systemd. L'unité systemd assure le démarrage automatique au boot,
le redémarrage en cas de crash, et une intégration standard dans l'écosystème
Linux.
binary_path: /usr/bin/logcorrelator
config_path: /etc/logcorrelator/logcorrelator.yml
user: logcorrelator
group: logcorrelator
restart: on-failure
systemd_unit:
path: /etc/systemd/system/logcorrelator.service
content_example: |
[Unit]
Description=logcorrelator service
After=network.target
[Service]
Type=simple
User=logcorrelator
Group=logcorrelator
ExecStart=/usr/bin/logcorrelator -config /etc/logcorrelator/logcorrelator.yml
ExecReload=/bin/kill -HUP $MAINPID
Restart=on-failure
RestartSec=5
[Install]
WantedBy=multi-user.target
os:
supported:
- rocky-linux-8
- rocky-linux-9
- almalinux-10
- autres-linux-recentes
logs:
stdout_stderr: journald
structured: true
description: >
Les logs internes du service (erreurs, messages dinformation) sont envoyés
vers stdout/stderr et collectés par journald. Ils sont structurés et ne
contiennent pas de données personnelles.
signals:
graceful_shutdown:
- SIGINT
- SIGTERM
reload:
- SIGHUP
description: >
SIGINT/SIGTERM : arrêt propre (arrêt des sockets, vidage des buffers, fermeture
des sinks). SIGHUP : réouverture des fichiers de sortie (utile pour la
rotation des logs via logrotate) sans arrêter le service.
packaging:
description: >
logcorrelator est distribué sous forme de packages .rpm (Rocky Linux, AlmaLinux,
RHEL), construits intégralement dans des conteneurs. Le changelog RPM est mis
à jour à chaque changement de version.
Tous les numéros de version doivent être cohérents entre le spec RPM, le Makefile
(PKG_VERSION), le CHANGELOG.md et les tags git.
formats:
- rpm
target_distros:
- rocky-linux-8
- rocky-linux-9
- almalinux-10
- rhel-8
- rhel-9
- rhel-10
rpm:
tool: fpm
changelog:
source: git # ou CHANGELOG.md
description: >
À chaque build, un script génère un fichier de changelog RPM à partir de
lhistorique (tags/commits) et le passe à fpm (option --rpm-changelog).
contents:
- path: /usr/bin/logcorrelator
type: binary
- path: /etc/logcorrelator/logcorrelator.yml
type: config
directives: "%config(noreplace)"
- path: /etc/logcorrelator/logcorrelator.yml.example
type: doc
description: Fichier d'exemple toujours mis à jour par le RPM.
- path: /etc/systemd/system/logcorrelator.service
type: systemd_unit
- path: /etc/logrotate.d/logcorrelator
type: logrotate_script
logrotate_example: |
/var/log/logcorrelator/*.log {
daily
rotate 7
compress
delaycompress
missingok
notifempty
create 0640 logcorrelator logcorrelator
postrotate
systemctl reload logcorrelator > /dev/null 2>/dev/null || true
endscript
}
config:
format: yaml
location: /etc/logcorrelator/logcorrelator.yml
reload_strategy: signal_sighup_for_files
description: >
Toute la configuration est centralisée dans un fichier YAML lisible. Le RPM
fournit aussi un fichier dexemple mis à jour à chaque version.
example: |
# /etc/logcorrelator/logcorrelator.yml
log:
level: INFO # DEBUG, INFO, WARN, ERROR
inputs:
unix_sockets:
# Source HTTP (A) : logs applicatifs en JSON, 1 datagramme = 1 log.
- name: http
path: /var/run/logcorrelator/http.sock
socket_permissions: "0666"
socket_type: dgram
max_datagram_bytes: 65535
# Source réseau (B) : logs IP/TCP/JA3... en JSON, 1 datagramme = 1 log.
- name: network
path: /var/run/logcorrelator/network.sock
socket_permissions: "0666"
socket_type: dgram
max_datagram_bytes: 65535
outputs:
file:
enabled: true
path: /var/log/logcorrelator/correlated.log
format: json_lines
clickhouse:
enabled: true
dsn: clickhouse://user:pass@localhost:9000/db
table: http_logs_raw
batch_size: 500
flush_interval_ms: 200
max_buffer_size: 5000
drop_on_overflow: true
async_insert: true
timeout_ms: 1000
stdout:
enabled: false
level: INFO # DEBUG: tous les logs, INFO: seulement corrélés, ERROR: aucun
correlation:
# Fenêtre de corrélation : si le log HTTP arrive avant le réseau, il attend
# au plus cette durée (sauf éviction du cache HTTP).
time_window:
value: 1
unit: s
orphan_policy:
apache_always_emit: true
network_emit: false
matching:
mode: one_to_many # KeepAlive : un B peut corréler plusieurs A.
buffers:
# Tailles max des caches en mémoire (en nombre de logs).
max_http_items: 10000
max_network_items: 20000
ttl:
# Durée de vie standard dun log réseau (B) en mémoire. Chaque corrélation
# réussie avec un A réinitialise ce TTL.
network_ttl_s: 30
inputs:
description: >
Deux flux de logs JSON via sockets Unix datagram (SOCK_DGRAM). Chaque datagramme
contient un JSON complet.
unix_sockets:
- name: http_source
id: A
description: >
Source A, logs HTTP applicatifs (Apache, reverse proxy, etc.). Schéma JSON
variable, champ timestamp obligatoire, headers dynamiques (header_*).
path: /var/run/logcorrelator/http.socket
permissions: "0666"
protocol: unix
socket_type: dgram
mode: datagram
format: json
framing: message
max_datagram_bytes: 65535
retry_on_error: true
- name: network_source
id: B
description: >
Source B, logs réseau (métadonnées IP/TCP, JA3/JA4, etc.). Seuls src_ip
et src_port sont requis pour la corrélation.
path: /var/run/logcorrelator/network.socket
permissions: "0666"
protocol: unix
socket_type: dgram
mode: datagram
format: json
framing: message
max_datagram_bytes: 65535
retry_on_error: true
outputs:
description: >
Les logs corrélés sont envoyés vers un ou plusieurs sinks (MultiSink).
sinks:
file:
enabled: true
description: >
Sink fichier local. Un JSON par ligne. Rotation gérée par logrotate,
réouverture du fichier sur SIGHUP.
path: /var/log/logcorrelator/correlated.log
format: json_lines
rotate_managed_by: external_logrotate
clickhouse:
enabled: true
description: >
Sink principal pour l'archivage et l'analyse quasi temps réel. Inserts
batch asynchrones, drop en cas de saturation.
dsn: clickhouse://user:pass@host:9000/db
table: http_logs_raw
batch_size: 500
flush_interval_ms: 200
max_buffer_size: 5000
drop_on_overflow: true
async_insert: true
timeout_ms: 1000
stdout:
enabled: false
level: INFO # DEBUG: tous les logs, INFO: seulement corrélés, ERROR: aucun
description: >
Sink optionnel pour les tests/développement.
Le niveau de log filtre la sortie : DEBUG émet tout (y compris orphelins),
INFO émet uniquement les logs corrélés, ERROR n'émet rien.
correlation:
description: >
Corrélation stricte basée sur src_ip + src_port et une fenêtre temporelle
configurable. Aucun autre champ nest utilisé pour la décision de corrélation.
key:
- src_ip
- src_port
time_window:
value: 1
unit: s
description: >
Fenêtre de temps appliquée aux timestamps de A et B. Si B narrive pas dans
ce délai, A est émis comme orphelin.
retention_limits:
max_http_items: 10000
max_network_items: 20000
description: >
Limites des caches. Si max_http_items est atteint, le plus ancien A est
évincé et émis orphelin. Si max_network_items est atteint, le plus ancien B
est supprimé silencieusement.
ttl_management:
network_ttl_s: 30
description: >
TTL des logs réseau. Chaque fois quun B est corrélé à un A (KeepAlive),
son TTL est remis à cette valeur.
timestamp_source:
apache: field_timestamp
network: reception_time
orphan_policy:
apache_always_emit: true
network_emit: false
matching:
mode: one_to_many
description: >
Stratégie 1àN : un log réseau peut être utilisé pour plusieurs logs HTTP
successifs tant quil na pas expiré ni été évincé.
schema:
description: >
Schémas variables pour A et B. Quelques champs seulement sont obligatoires
pour la corrélation, les autres sont acceptés sans modification de code.
source_A:
description: >
Logs HTTP applicatifs au format JSON.
required_fields:
- name: src_ip
type: string
- name: src_port
type: int
- name: timestamp
type: int64
unit: ns
optional_fields:
- name: time
type: string
- name: dst_ip
type: string
- name: dst_port
type: int
- name: method
type: string
- name: path
type: string
- name: host
type: string
- name: http_version
type: string
dynamic_fields:
- pattern: header_*
target_map: headers
- pattern: "*"
target_map: extra
source_B:
description: Logs réseau JSON (IP/TCP, JA3/JA4...).
required_fields:
- name: src_ip
type: string
- name: src_port
type: int
optional_fields:
- name: dst_ip
type: string
- name: dst_port
type: int
dynamic_fields:
- pattern: "*"
target_map: extra
normalized_event:
description: >
Représentation interne unifiée des événements A/B.
fields:
- name: source
type: enum("A","B")
- name: timestamp
type: time.Time
- name: src_ip
type: string
- name: src_port
type: int
- name: dst_ip
type: string
optional: true
- name: dst_port
type: int
optional: true
- name: headers
type: map[string]string
optional: true
- name: extra
type: map[string]any
correlated_log:
description: >
Structure du log corrélé émis vers les sinks.
fields:
- name: timestamp
type: time.Time
- name: src_ip
type: string
- name: src_port
type: int
- name: dst_ip
type: string
optional: true
- name: dst_port
type: int
optional: true
- name: correlated
type: bool
- name: orphan_side
type: string
- name: "*"
type: map[string]any
clickhouse_schema:
strategy: external_ddls
description: >
La table ClickHouse est gérée en dehors du service. Deux tables sont utilisées :
http_logs_raw (table d'ingestion avec le JSON brut) et http_logs (table enrichie
avec extraction des champs via des colonnes matérialisées).
tables:
- name: http_logs_raw
description: >
Table d'ingestion brute. Une seule colonne raw_json contient le log corrélé
complet sérialisé en JSON. Le service insère via INSERT INTO http_logs_raw (raw_json).
engine: MergeTree
order_by: tuple()
columns:
- name: raw_json
type: String
insert_format: >
INSERT INTO http_logs_raw (raw_json) FORMAT JSONEachRow
{"raw_json":"{...log corrélé sérialisé en JSON...}"}
- name: http_logs
description: >
Table enrichie avec extraction des champs du JSON brut via des expressions DEFAULT.
Partitionnée par mois, optimisée pour les requêtes analytiques.
engine: MergeTree
partition_by: toYYYYMM(log_date)
order_by: (log_date, dst_ip, src_ip, time)
columns:
- name: raw_json
type: String
- name: time_str
type: String
default: JSONExtractString(raw_json, 'time')
- name: timestamp_str
type: String
default: JSONExtractString(raw_json, 'timestamp')
- name: time
type: DateTime
default: parseDateTimeBestEffort(time_str)
- name: log_date
type: Date
default: toDate(time)
- name: src_ip
type: IPv4
default: toIPv4(JSONExtractString(raw_json, 'src_ip'))
- name: src_port
type: UInt16
default: toUInt16(JSONExtractUInt(raw_json, 'src_port'))
- name: dst_ip
type: IPv4
default: toIPv4(JSONExtractString(raw_json, 'dst_ip'))
- name: dst_port
type: UInt16
default: toUInt16(JSONExtractUInt(raw_json, 'dst_port'))
- name: correlated
type: UInt8
default: JSONExtractBool(raw_json, 'correlated')
- name: keepalives
type: UInt16
default: toUInt16(JSONExtractUInt(raw_json, 'keepalives'))
- name: method
type: LowCardinality(String)
default: JSONExtractString(raw_json, 'method')
- name: scheme
type: LowCardinality(String)
default: JSONExtractString(raw_json, 'scheme')
- name: host
type: LowCardinality(String)
default: JSONExtractString(raw_json, 'host')
- name: path
type: String
default: JSONExtractString(raw_json, 'path')
- name: query
type: String
default: JSONExtractString(raw_json, 'query')
- name: http_version
type: LowCardinality(String)
default: JSONExtractString(raw_json, 'http_version')
- name: orphan_side
type: LowCardinality(String)
default: JSONExtractString(raw_json, 'orphan_side')
- name: a_timestamp
type: UInt64
default: JSONExtractUInt(raw_json, 'a_timestamp')
- name: b_timestamp
type: UInt64
default: JSONExtractUInt(raw_json, 'b_timestamp')
- name: conn_id
type: String
default: JSONExtractString(raw_json, 'conn_id')
- name: ip_meta_df
type: UInt8
default: JSONExtractBool(raw_json, 'ip_meta_df')
- name: ip_meta_id
type: UInt32
default: JSONExtractUInt(raw_json, 'ip_meta_id')
- name: ip_meta_total_length
type: UInt32
default: JSONExtractUInt(raw_json, 'ip_meta_total_length')
- name: ip_meta_ttl
type: UInt8
default: JSONExtractUInt(raw_json, 'ip_meta_ttl')
- name: tcp_meta_options
type: LowCardinality(String)
default: JSONExtractString(raw_json, 'tcp_meta_options')
- name: tcp_meta_window_size
type: UInt32
default: JSONExtractUInt(raw_json, 'tcp_meta_window_size')
- name: syn_to_clienthello_ms
type: Int32
default: toInt32(JSONExtractInt(raw_json, 'syn_to_clienthello_ms'))
- name: tls_version
type: LowCardinality(String)
default: JSONExtractString(raw_json, 'tls_version')
- name: tls_sni
type: LowCardinality(String)
default: JSONExtractString(raw_json, 'tls_sni')
- name: ja3
type: String
default: JSONExtractString(raw_json, 'ja3')
- name: ja3_hash
type: String
default: JSONExtractString(raw_json, 'ja3_hash')
- name: ja4
type: String
default: JSONExtractString(raw_json, 'ja4')
- name: extra
type: JSON
default: raw_json
architecture:
description: >
Architecture hexagonale : domaine de corrélation indépendant, ports abstraits
pour les sources/sinks, adaptateurs pour sockets Unix, fichier et ClickHouse,
couche application dorchestration, et modules infra (config, observabilité).
modules:
- name: cmd/logcorrelator
type: entrypoint
responsibilities:
- Chargement de la configuration YAML.
- Initialisation des adaptateurs d'entrée/sortie.
- Création du CorrelationService.
- Démarrage de lorchestrateur.
- Gestion des signaux (SIGINT, SIGTERM, SIGHUP).
- name: internal/domain
type: domain
responsibilities:
- Modèles NormalizedEvent et CorrelatedLog.
- CorrelationService (fenêtre, TTL, buffers bornés, 1àN, orphelins).
- name: internal/ports
type: ports
responsibilities:
- EventSource, CorrelatedLogSink, CorrelationProcessor.
- name: internal/app
type: application
responsibilities:
- Orchestrator : EventSource → CorrelationService → MultiSink.
- name: internal/adapters/inbound/unixsocket
type: adapter_inbound
responsibilities:
- Lecture Unix datagram (SOCK_DGRAM) et parsing JSON → NormalizedEvent.
- name: internal/adapters/outbound/file
type: adapter_outbound
responsibilities:
- Écriture JSON lines.
- Réouverture du fichier sur SIGHUP.
- name: internal/adapters/outbound/clickhouse
type: adapter_outbound
responsibilities:
- Bufferisation + inserts batch, gestion du drop_on_overflow.
- name: internal/adapters/outbound/multi
type: adapter_outbound
responsibilities:
- Fanout vers plusieurs sinks.
- name: internal/config
type: infrastructure
responsibilities:
- Chargement/validation de la configuration YAML.
- name: internal/observability
type: infrastructure
responsibilities:
- Logging interne, métriques (tailles des caches, évictions, erreurs datagram).
testing:
unit:
description: >
Tests unitaires tabledriven, couverture cible ≥ 80 %, focale sur la logique
de corrélation, les caches et les sinks.
coverage_minimum: 0.8
focus:
- CorrelationService (fenêtre, TTL, évictions, 1àN)
- Parsing A/B → NormalizedEvent (datagrammes)
- ClickHouseSink (batching, overflow)
- FileSink (réouverture sur SIGHUP)
- MultiSink
integration:
description: >
Tests dintégration validant le flux complet A+B → corrélation → sinks,
avec sockets Unix datagram simulées, ClickHouse mocké et scénarios KeepAlive.
docker:
description: >
Build, tests et packaging RPM sont exécutés intégralement dans des conteneurs
via un multistage build.
build_pipeline:
multi_stage: true
stages:
- name: test_and_compile
base: golang:latest
description: >
go test ./... (échec si couverture < 80 %), puis compilation dun binaire
statique (CGO_ENABLED=0, GOOS=linux, GOARCH=amd64).
- name: rpm_builder
base: ruby:alpine
description: >
Installation de fpm, git et outils RPM. Génération du changelog RPM à
partir de lhistorique. Construction des .rpm pour les différentes
distributions.
- name: output_export
base: scratch
description: >
Étape minimale pour exposer les paquets RPM produits (docker build --output).