service: name: logcorrelator context: http-network-correlation language: go pattern: hexagonal description: > logcorrelator est un service système (lancé par systemd) écrit en Go, chargé de recevoir deux flux de logs JSON via des sockets Unix, de corréler les événements HTTP applicatifs (source A, typiquement Apache ou reverse proxy) avec des événements réseau (source B, métadonnées IP/TCP, JA3/JA4, etc.) sur la base de la combinaison strictement définie src_ip + src_port, avec une fenêtre temporelle configurable. Le service supporte les connexions HTTP Keep-Alive : un log réseau peut être corrélé à plusieurs logs HTTP successifs (stratégie 1‑à‑N). La rétention en mémoire est bornée par des tailles de caches configurables et un TTL dynamique pour la source B. Le service émet toujours les événements A même lorsqu’aucun événement B n’est disponible, n’émet jamais de logs B seuls, et pousse les résultats vers ClickHouse et/ou un fichier local. runtime: deployment: unit_type: systemd description: > logcorrelator est livré sous forme de binaire autonome, exécuté comme un service systemd. L'unité systemd assure le démarrage automatique au boot, le redémarrage en cas de crash, et une intégration standard dans l'écosystème Linux. binary_path: /usr/bin/logcorrelator config_path: /etc/logcorrelator/logcorrelator.yml user: logcorrelator group: logcorrelator restart: on-failure systemd_unit: path: /etc/systemd/system/logcorrelator.service content_example: | [Unit] Description=logcorrelator service After=network.target [Service] Type=simple User=logcorrelator Group=logcorrelator ExecStart=/usr/bin/logcorrelator -config /etc/logcorrelator/logcorrelator.yml ExecReload=/bin/kill -HUP $MAINPID Restart=on-failure RestartSec=5 [Install] WantedBy=multi-user.target os: supported: - rocky-linux-8 - rocky-linux-9 - almalinux-10 - autres-linux-recentes logs: stdout_stderr: journald structured: true description: > Les logs internes du service (erreurs, messages d’information) sont envoyés vers stdout/stderr et collectés par journald. Ils sont structurés et ne contiennent pas de données personnelles. signals: graceful_shutdown: - SIGINT - SIGTERM reload: - SIGHUP description: > SIGINT/SIGTERM : arrêt propre (arrêt des sockets, vidage des buffers, fermeture des sinks). SIGHUP : réouverture des fichiers de sortie (utile pour la rotation des logs via logrotate) sans arrêter le service. packaging: description: > logcorrelator est distribué sous forme de packages .rpm (Rocky Linux, AlmaLinux, RHEL), construits intégralement dans des conteneurs. Le changelog RPM est mis à jour à chaque changement de version. Tous les numéros de version doivent être cohérents entre le spec RPM, le Makefile (PKG_VERSION), le CHANGELOG.md et les tags git. formats: - rpm target_distros: - rocky-linux-8 - rocky-linux-9 - almalinux-10 - rhel-8 - rhel-9 - rhel-10 rpm: tool: fpm changelog: source: git # ou CHANGELOG.md description: > À chaque build, un script génère un fichier de changelog RPM à partir de l’historique (tags/commits) et le passe à fpm (option --rpm-changelog). contents: - path: /usr/bin/logcorrelator type: binary - path: /etc/logcorrelator/logcorrelator.yml type: config directives: "%config(noreplace)" - path: /etc/logcorrelator/logcorrelator.yml.example type: doc description: Fichier d'exemple toujours mis à jour par le RPM. - path: /etc/systemd/system/logcorrelator.service type: systemd_unit - path: /etc/logrotate.d/logcorrelator type: logrotate_script logrotate_example: | /var/log/logcorrelator/*.log { daily rotate 7 compress delaycompress missingok notifempty create 0640 logcorrelator logcorrelator postrotate systemctl reload logcorrelator > /dev/null 2>/dev/null || true endscript } config: format: yaml location: /etc/logcorrelator/logcorrelator.yml reload_strategy: signal_sighup_for_files description: > Toute la configuration est centralisée dans un fichier YAML lisible. Le RPM fournit aussi un fichier d’exemple mis à jour à chaque version. example: | # /etc/logcorrelator/logcorrelator.yml log: level: INFO # DEBUG, INFO, WARN, ERROR inputs: unix_sockets: # Source HTTP (A) : logs applicatifs en JSON, 1 datagramme = 1 log. - name: http path: /var/run/logcorrelator/http.sock socket_permissions: "0666" socket_type: dgram max_datagram_bytes: 65535 # Source réseau (B) : logs IP/TCP/JA3... en JSON, 1 datagramme = 1 log. - name: network path: /var/run/logcorrelator/network.sock socket_permissions: "0666" socket_type: dgram max_datagram_bytes: 65535 outputs: file: enabled: true path: /var/log/logcorrelator/correlated.log format: json_lines clickhouse: enabled: true dsn: clickhouse://user:pass@localhost:9000/db table: http_logs_raw batch_size: 500 flush_interval_ms: 200 max_buffer_size: 5000 drop_on_overflow: true async_insert: true timeout_ms: 1000 stdout: enabled: false level: INFO # DEBUG: tous les logs, INFO: seulement corrélés, ERROR: aucun correlation: # Fenêtre de corrélation : si le log HTTP arrive avant le réseau, il attend # au plus cette durée (sauf éviction du cache HTTP). time_window: value: 1 unit: s orphan_policy: apache_always_emit: true network_emit: false matching: mode: one_to_many # Keep‑Alive : un B peut corréler plusieurs A. buffers: # Tailles max des caches en mémoire (en nombre de logs). max_http_items: 10000 max_network_items: 20000 ttl: # Durée de vie standard d’un log réseau (B) en mémoire. Chaque corrélation # réussie avec un A réinitialise ce TTL. network_ttl_s: 30 inputs: description: > Deux flux de logs JSON via sockets Unix datagram (SOCK_DGRAM). Chaque datagramme contient un JSON complet. unix_sockets: - name: http_source id: A description: > Source A, logs HTTP applicatifs (Apache, reverse proxy, etc.). Schéma JSON variable, champ timestamp obligatoire, headers dynamiques (header_*). path: /var/run/logcorrelator/http.socket permissions: "0666" protocol: unix socket_type: dgram mode: datagram format: json framing: message max_datagram_bytes: 65535 retry_on_error: true - name: network_source id: B description: > Source B, logs réseau (métadonnées IP/TCP, JA3/JA4, etc.). Seuls src_ip et src_port sont requis pour la corrélation. path: /var/run/logcorrelator/network.socket permissions: "0666" protocol: unix socket_type: dgram mode: datagram format: json framing: message max_datagram_bytes: 65535 retry_on_error: true outputs: description: > Les logs corrélés sont envoyés vers un ou plusieurs sinks (MultiSink). sinks: file: enabled: true description: > Sink fichier local. Un JSON par ligne. Rotation gérée par logrotate, réouverture du fichier sur SIGHUP. path: /var/log/logcorrelator/correlated.log format: json_lines rotate_managed_by: external_logrotate clickhouse: enabled: true description: > Sink principal pour l'archivage et l'analyse quasi temps réel. Inserts batch asynchrones, drop en cas de saturation. dsn: clickhouse://user:pass@host:9000/db table: http_logs_raw batch_size: 500 flush_interval_ms: 200 max_buffer_size: 5000 drop_on_overflow: true async_insert: true timeout_ms: 1000 stdout: enabled: false level: INFO # DEBUG: tous les logs, INFO: seulement corrélés, ERROR: aucun description: > Sink optionnel pour les tests/développement. Le niveau de log filtre la sortie : DEBUG émet tout (y compris orphelins), INFO émet uniquement les logs corrélés, ERROR n'émet rien. correlation: description: > Corrélation stricte basée sur src_ip + src_port et une fenêtre temporelle configurable. Aucun autre champ n’est utilisé pour la décision de corrélation. key: - src_ip - src_port time_window: value: 1 unit: s description: > Fenêtre de temps appliquée aux timestamps de A et B. Si B n’arrive pas dans ce délai, A est émis comme orphelin. retention_limits: max_http_items: 10000 max_network_items: 20000 description: > Limites des caches. Si max_http_items est atteint, le plus ancien A est évincé et émis orphelin. Si max_network_items est atteint, le plus ancien B est supprimé silencieusement. ttl_management: network_ttl_s: 30 description: > TTL des logs réseau. Chaque fois qu’un B est corrélé à un A (Keep‑Alive), son TTL est remis à cette valeur. timestamp_source: apache: field_timestamp network: reception_time orphan_policy: apache_always_emit: true network_emit: false matching: mode: one_to_many description: > Stratégie 1‑à‑N : un log réseau peut être utilisé pour plusieurs logs HTTP successifs tant qu’il n’a pas expiré ni été évincé. schema: description: > Schémas variables pour A et B. Quelques champs seulement sont obligatoires pour la corrélation, les autres sont acceptés sans modification de code. source_A: description: > Logs HTTP applicatifs au format JSON. required_fields: - name: src_ip type: string - name: src_port type: int - name: timestamp type: int64 unit: ns optional_fields: - name: time type: string - name: dst_ip type: string - name: dst_port type: int - name: method type: string - name: path type: string - name: host type: string - name: http_version type: string dynamic_fields: - pattern: header_* target_map: headers - pattern: "*" target_map: extra source_B: description: Logs réseau JSON (IP/TCP, JA3/JA4...). required_fields: - name: src_ip type: string - name: src_port type: int optional_fields: - name: dst_ip type: string - name: dst_port type: int dynamic_fields: - pattern: "*" target_map: extra normalized_event: description: > Représentation interne unifiée des événements A/B. fields: - name: source type: enum("A","B") - name: timestamp type: time.Time - name: src_ip type: string - name: src_port type: int - name: dst_ip type: string optional: true - name: dst_port type: int optional: true - name: headers type: map[string]string optional: true - name: extra type: map[string]any correlated_log: description: > Structure du log corrélé émis vers les sinks. fields: - name: timestamp type: time.Time - name: src_ip type: string - name: src_port type: int - name: dst_ip type: string optional: true - name: dst_port type: int optional: true - name: correlated type: bool - name: orphan_side type: string - name: "*" type: map[string]any clickhouse_schema: strategy: external_ddls database: mabase_prod description: > La table ClickHouse est gérée en dehors du service. Deux tables sont utilisées : http_logs_raw (table d'ingestion avec TTL 1 jour) et http_logs (table enrichie avec extraction des champs via des colonnes matérialisées). Une vue matérialisée transfère automatiquement les données de RAW vers parsée. tables: - name: http_logs_raw description: > Table d'ingestion brute avec TTL. Une seule colonne raw_json contient le log corrélé complet sérialisé en JSON. TTL de 1 jour pour limiter le stockage. engine: MergeTree order_by: tuple() ttl: ingest_time + INTERVAL 1 DAY settings: ttl_only_drop_parts: 1 columns: - name: raw_json type: String - name: ingest_time type: DateTime default: now() insert_format: | INSERT INTO mabase_prod.http_logs_raw (raw_json) FORMAT JSONEachRow {"raw_json":"{...log corrélé sérialisé en JSON...}"} - name: http_logs description: > Table enrichie avec extraction des champs du JSON brut via des expressions DEFAULT. Partitionnée par mois, optimisée pour les requêtes analytiques. engine: MergeTree partition_by: toYYYYMM(log_date) order_by: (log_date, dst_ip, src_ip, time) columns: - name: raw_json type: String - name: time_str type: String default: JSONExtractString(raw_json, 'time') - name: timestamp_str type: String default: JSONExtractString(raw_json, 'timestamp') - name: time type: DateTime default: parseDateTimeBestEffort(time_str) - name: log_date type: Date default: toDate(time) - name: src_ip type: IPv4 default: toIPv4(JSONExtractString(raw_json, 'src_ip')) - name: src_port type: UInt16 default: toUInt16(JSONExtractUInt(raw_json, 'src_port')) - name: dst_ip type: IPv4 default: toIPv4(JSONExtractString(raw_json, 'dst_ip')) - name: dst_port type: UInt16 default: toUInt16(JSONExtractUInt(raw_json, 'dst_port')) - name: correlated type: UInt8 default: JSONExtractBool(raw_json, 'correlated') - name: keepalives type: UInt16 default: toUInt16(JSONExtractUInt(raw_json, 'keepalives')) - name: method type: LowCardinality(String) default: JSONExtractString(raw_json, 'method') - name: scheme type: LowCardinality(String) default: JSONExtractString(raw_json, 'scheme') - name: host type: LowCardinality(String) default: JSONExtractString(raw_json, 'host') - name: path type: String default: JSONExtractString(raw_json, 'path') - name: query type: String default: JSONExtractString(raw_json, 'query') - name: http_version type: LowCardinality(String) default: JSONExtractString(raw_json, 'http_version') - name: orphan_side type: LowCardinality(String) default: JSONExtractString(raw_json, 'orphan_side') - name: a_timestamp type: UInt64 default: JSONExtractUInt(raw_json, 'a_timestamp') - name: b_timestamp type: UInt64 default: JSONExtractUInt(raw_json, 'b_timestamp') - name: conn_id type: String default: JSONExtractString(raw_json, 'conn_id') - name: ip_meta_df type: UInt8 default: JSONExtractBool(raw_json, 'ip_meta_df') - name: ip_meta_id type: UInt32 default: JSONExtractUInt(raw_json, 'ip_meta_id') - name: ip_meta_total_length type: UInt32 default: JSONExtractUInt(raw_json, 'ip_meta_total_length') - name: ip_meta_ttl type: UInt8 default: JSONExtractUInt(raw_json, 'ip_meta_ttl') - name: tcp_meta_options type: LowCardinality(String) default: JSONExtractString(raw_json, 'tcp_meta_options') - name: tcp_meta_window_size type: UInt32 default: JSONExtractUInt(raw_json, 'tcp_meta_window_size') - name: syn_to_clienthello_ms type: Int32 default: toInt32(JSONExtractInt(raw_json, 'syn_to_clienthello_ms')) - name: tls_version type: LowCardinality(String) default: JSONExtractString(raw_json, 'tls_version') - name: tls_sni type: LowCardinality(String) default: JSONExtractString(raw_json, 'tls_sni') - name: ja3 type: String default: JSONExtractString(raw_json, 'ja3') - name: ja3_hash type: String default: JSONExtractString(raw_json, 'ja3_hash') - name: ja4 type: String default: JSONExtractString(raw_json, 'ja4') - name: extra type: JSON default: raw_json - name: mv_http_logs type: materialized_view description: > Vue matérialisée qui transfère automatiquement les données de http_logs_raw vers http_logs lors de chaque INSERT. target: mabase_prod.http_logs query: | SELECT raw_json FROM mabase_prod.http_logs_raw users: - name: data_writer description: Utilisateur pour l'insertion des logs (utilisé par logcorrelator) grants: - INSERT(raw_json) ON mabase_prod.http_logs_raw - SELECT(raw_json) ON mabase_prod.http_logs_raw - name: analyst description: Utilisateur pour la lecture des logs parsés (BI, requêtes) grants: - SELECT ON mabase_prod.http_logs migration: description: > Script de migration pour transférer les données existantes de l'ancienne table http_logs_raw vers la nouvelle structure avec vue matérialisée. sql: | INSERT INTO mabase_prod.http_logs (raw_json) SELECT raw_json FROM mabase_prod.http_logs_raw; architecture: description: > Architecture hexagonale : domaine de corrélation indépendant, ports abstraits pour les sources/sinks, adaptateurs pour sockets Unix, fichier et ClickHouse, couche application d’orchestration, et modules infra (config, observabilité). modules: - name: cmd/logcorrelator type: entrypoint responsibilities: - Chargement de la configuration YAML. - Initialisation des adaptateurs d'entrée/sortie. - Création du CorrelationService. - Démarrage de l’orchestrateur. - Gestion des signaux (SIGINT, SIGTERM, SIGHUP). - name: internal/domain type: domain responsibilities: - Modèles NormalizedEvent et CorrelatedLog. - CorrelationService (fenêtre, TTL, buffers bornés, 1‑à‑N, orphelins). - name: internal/ports type: ports responsibilities: - EventSource, CorrelatedLogSink, CorrelationProcessor. - name: internal/app type: application responsibilities: - Orchestrator : EventSource → CorrelationService → MultiSink. - name: internal/adapters/inbound/unixsocket type: adapter_inbound responsibilities: - Lecture Unix datagram (SOCK_DGRAM) et parsing JSON → NormalizedEvent. - name: internal/adapters/outbound/file type: adapter_outbound responsibilities: - Écriture JSON lines. - Réouverture du fichier sur SIGHUP. - name: internal/adapters/outbound/clickhouse type: adapter_outbound responsibilities: - Bufferisation + inserts batch, gestion du drop_on_overflow. - name: internal/adapters/outbound/multi type: adapter_outbound responsibilities: - Fan‑out vers plusieurs sinks. - name: internal/config type: infrastructure responsibilities: - Chargement/validation de la configuration YAML. - name: internal/observability type: infrastructure responsibilities: - Logging interne, métriques (tailles des caches, évictions, erreurs datagram). testing: unit: description: > Tests unitaires table‑driven, couverture cible ≥ 80 %, focale sur la logique de corrélation, les caches et les sinks. coverage_minimum: 0.8 focus: - CorrelationService (fenêtre, TTL, évictions, 1‑à‑N) - Parsing A/B → NormalizedEvent (datagrammes) - ClickHouseSink (batching, overflow) - FileSink (réouverture sur SIGHUP) - MultiSink integration: description: > Tests d’intégration validant le flux complet A+B → corrélation → sinks, avec sockets Unix datagram simulées, ClickHouse mocké et scénarios Keep‑Alive. docker: description: > Build, tests et packaging RPM sont exécutés intégralement dans des conteneurs via un multi‑stage build. build_pipeline: multi_stage: true stages: - name: test_and_compile base: golang:latest description: > go test ./... (échec si couverture < 80 %), puis compilation d’un binaire statique (CGO_ENABLED=0, GOOS=linux, GOARCH=amd64). - name: rpm_builder base: ruby:alpine description: > Installation de fpm, git et outils RPM. Génération du changelog RPM à partir de l’historique. Construction des .rpm pour les différentes distributions. - name: output_export base: scratch description: > Étape minimale pour exposer les paquets RPM produits (docker build --output).