# Commit notes:
# - http_logs_raw: partition by toDate(ingest_time), order by ingest_time
# - http_logs: explicit columns (no DEFAULT), extracted by MV
# - mv_http_logs: full SELECT with JSONExtract* + coalesce for all fields
# - Add 17 HTTP header fields (User-Agent, Accept, Sec-CH-UA, etc.)
# - New ORDER BY: (time, src_ip, dst_ip, ja4)
# - architecture.yml: match new schema with MV query details
# Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
service:
  name: logcorrelator
  context: http-network-correlation
  language: go
  pattern: hexagonal
  description: >
    logcorrelator is a system service (started by systemd) written in Go. It
    receives two JSON log streams over Unix sockets and correlates
    application-level HTTP events (source A, typically Apache or a reverse
    proxy) with network events (source B: IP/TCP metadata, JA3/JA4, etc.)
    based on the strictly defined combination src_ip + src_port, within a
    configurable time window. The service supports HTTP Keep-Alive
    connections: one network log can be correlated with several successive
    HTTP logs (1-to-N strategy). In-memory retention is bounded by
    configurable cache sizes and a dynamic TTL for source B. The service
    always emits A events even when no B event is available, never emits B
    logs on their own, and pushes results to ClickHouse and/or a local file.
runtime:
  deployment:
    unit_type: systemd
    description: >
      logcorrelator ships as a self-contained binary, run as a systemd
      service. The systemd unit provides automatic start at boot, restart on
      crash, and standard integration into the Linux ecosystem.
    binary_path: /usr/bin/logcorrelator
    config_path: /etc/logcorrelator/logcorrelator.yml
    user: logcorrelator
    group: logcorrelator
    restart: on-failure
  systemd_unit:
    path: /etc/systemd/system/logcorrelator.service
    content_example: |
      [Unit]
      Description=logcorrelator service
      After=network.target

      [Service]
      Type=simple
      User=logcorrelator
      Group=logcorrelator
      ExecStart=/usr/bin/logcorrelator -config /etc/logcorrelator/logcorrelator.yml
      ExecReload=/bin/kill -HUP $MAINPID
      Restart=on-failure
      RestartSec=5

      [Install]
      WantedBy=multi-user.target
  os:
    supported:
      - rocky-linux-8
      - rocky-linux-9
      - almalinux-10
      - other-recent-linux-distributions
  logs:
    stdout_stderr: journald
    structured: true
    description: >
      The service's internal logs (errors, informational messages) go to
      stdout/stderr and are collected by journald. They are structured and
      contain no personal data.
  signals:
    graceful_shutdown:
      - SIGINT
      - SIGTERM
    reload:
      - SIGHUP
    description: >
      SIGINT/SIGTERM: clean shutdown (stop the sockets, flush the buffers,
      close the sinks). SIGHUP: reopen the output files (useful for log
      rotation via logrotate) without stopping the service.
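The SIGHUP reload described above can be sketched in Go as follows. This is a minimal illustration, not the service's actual code: the `fileSink` type, its methods and the `/tmp` path are assumptions; the point is that writes and the signal-triggered reopen must be serialized so logrotate can rename the old file safely.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"
)

// fileSink is a hypothetical minimal file sink guarded by a mutex so that
// writes and the SIGHUP-triggered reopen do not race.
type fileSink struct {
	mu   sync.Mutex
	path string
	f    *os.File
}

func openSink(path string) (*fileSink, error) {
	f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o640)
	if err != nil {
		return nil, err
	}
	return &fileSink{path: path, f: f}, nil
}

// Write appends one JSON line to the current file handle.
func (s *fileSink) Write(line string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	_, err := fmt.Fprintln(s.f, line)
	return err
}

// Reopen closes the current handle and opens the path again; after logrotate
// has renamed the old file, this makes the sink write to a fresh file.
func (s *fileSink) Reopen() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if err := s.f.Close(); err != nil {
		return err
	}
	f, err := os.OpenFile(s.path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o640)
	if err != nil {
		return err
	}
	s.f = f
	return nil
}

func main() {
	sink, err := openSink("/tmp/correlated.log")
	if err != nil {
		panic(err)
	}
	hup := make(chan os.Signal, 1)
	signal.Notify(hup, syscall.SIGHUP)
	go func() {
		for range hup {
			_ = sink.Reopen() // triggered by `systemctl reload` / logrotate postrotate
		}
	}()
	_ = sink.Write(`{"correlated":true}`)
}
```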
packaging:
  description: >
    logcorrelator is distributed as .rpm packages (Rocky Linux, AlmaLinux,
    RHEL), built entirely inside containers. The RPM changelog is updated on
    every version change. All version numbers must stay consistent across the
    RPM spec, the Makefile (PKG_VERSION), CHANGELOG.md and the git tags.
  formats:
    - rpm
  target_distros:
    - rocky-linux-8
    - rocky-linux-9
    - almalinux-10
    - rhel-8
    - rhel-9
    - rhel-10
  rpm:
    tool: fpm
    changelog:
      source: git  # or CHANGELOG.md
      description: >
        On every build, a script generates an RPM changelog file from the
        history (tags/commits) and passes it to fpm (--rpm-changelog option).
    contents:
      - path: /usr/bin/logcorrelator
        type: binary
      - path: /etc/logcorrelator/logcorrelator.yml
        type: config
        directives: "%config(noreplace)"
      - path: /etc/logcorrelator/logcorrelator.yml.example
        type: doc
        description: Example file, always refreshed by the RPM.
      - path: /etc/systemd/system/logcorrelator.service
        type: systemd_unit
      - path: /etc/logrotate.d/logcorrelator
        type: logrotate_script
    logrotate_example: |
      /var/log/logcorrelator/*.log {
          daily
          rotate 7
          compress
          delaycompress
          missingok
          notifempty
          create 0640 logcorrelator logcorrelator
          postrotate
              systemctl reload logcorrelator > /dev/null 2>/dev/null || true
          endscript
      }
config:
  format: yaml
  location: /etc/logcorrelator/logcorrelator.yml
  reload_strategy: signal_sighup_for_files
  description: >
    All configuration is centralized in one readable YAML file. The RPM also
    ships an example file refreshed on every release.
  example: |
    # /etc/logcorrelator/logcorrelator.yml

    log:
      level: INFO  # DEBUG, INFO, WARN, ERROR

    inputs:
      unix_sockets:
        # HTTP source (A): application logs as JSON, 1 datagram = 1 log.
        - name: http
          path: /var/run/logcorrelator/http.sock
          socket_permissions: "0666"
          socket_type: dgram
          max_datagram_bytes: 65535

        # Network source (B): IP/TCP/JA3... logs as JSON, 1 datagram = 1 log.
        - name: network
          path: /var/run/logcorrelator/network.sock
          socket_permissions: "0666"
          socket_type: dgram
          max_datagram_bytes: 65535

    outputs:
      file:
        enabled: true
        path: /var/log/logcorrelator/correlated.log
        format: json_lines

      clickhouse:
        enabled: true
        dsn: clickhouse://user:pass@localhost:9000/db
        table: http_logs_raw
        batch_size: 500
        flush_interval_ms: 200
        max_buffer_size: 5000
        drop_on_overflow: true
        async_insert: true
        timeout_ms: 1000

      stdout:
        enabled: false
        level: INFO  # DEBUG: all logs, INFO: correlated only, ERROR: none

    correlation:
      # Correlation window: if the HTTP log arrives before the network log,
      # it waits at most this long (unless evicted from the HTTP cache).
      time_window:
        value: 1
        unit: s

      orphan_policy:
        apache_always_emit: true
        network_emit: false

      matching:
        mode: one_to_many  # Keep-Alive: one B may correlate several A.

      buffers:
        # Maximum in-memory cache sizes (number of logs).
        max_http_items: 10000
        max_network_items: 20000

      ttl:
        # Standard lifetime of a network log (B) in memory. Each successful
        # correlation with an A resets this TTL.
        network_ttl_s: 30
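Loading this file would typically go through a YAML library (e.g. gopkg.in/yaml.v3); the sketch below only shows the defaulting and validation step that follows parsing. The struct and field names are assumptions mirroring the YAML keys above, and the defaults are the ones documented in this spec.

```go
package main

import (
	"errors"
	"fmt"
)

// CorrelationConfig and ClickHouseConfig mirror a subset of the YAML keys
// above; the struct and field names are assumptions, not the service's
// actual types.
type CorrelationConfig struct {
	TimeWindowS     int
	MaxHTTPItems    int
	MaxNetworkItems int
	NetworkTTLS     int
}

type ClickHouseConfig struct {
	Enabled        bool
	DSN            string
	BatchSize      int
	MaxBufferSize  int
	DropOnOverflow bool
}

// applyDefaults fills unset fields with the defaults documented above.
func (c *CorrelationConfig) applyDefaults() {
	if c.TimeWindowS == 0 {
		c.TimeWindowS = 1
	}
	if c.MaxHTTPItems == 0 {
		c.MaxHTTPItems = 10000
	}
	if c.MaxNetworkItems == 0 {
		c.MaxNetworkItems = 20000
	}
	if c.NetworkTTLS == 0 {
		c.NetworkTTLS = 30
	}
}

// validate rejects configurations the ClickHouse sink could not run with.
func (c *ClickHouseConfig) validate() error {
	if !c.Enabled {
		return nil
	}
	if c.DSN == "" {
		return errors.New("clickhouse: dsn is required")
	}
	if c.BatchSize <= 0 || c.BatchSize > c.MaxBufferSize {
		return errors.New("clickhouse: batch_size must be in (0, max_buffer_size]")
	}
	return nil
}

func main() {
	corr := CorrelationConfig{}
	corr.applyDefaults()
	fmt.Println(corr.TimeWindowS, corr.MaxHTTPItems, corr.MaxNetworkItems, corr.NetworkTTLS) // 1 10000 20000 30
}
```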
inputs:
  description: >
    Two JSON log streams over Unix datagram sockets (SOCK_DGRAM). Each
    datagram carries one complete JSON document.
  unix_sockets:
    - name: http_source
      id: A
      description: >
        Source A, application HTTP logs (Apache, reverse proxy, etc.).
        Variable JSON schema, mandatory timestamp field, dynamic headers
        (header_*).
      path: /var/run/logcorrelator/http.socket
      permissions: "0666"
      protocol: unix
      socket_type: dgram
      mode: datagram
      format: json
      framing: message
      max_datagram_bytes: 65535
      retry_on_error: true

    - name: network_source
      id: B
      description: >
        Source B, network logs (IP/TCP metadata, JA3/JA4, etc.). Only src_ip
        and src_port are required for correlation.
      path: /var/run/logcorrelator/network.socket
      permissions: "0666"
      protocol: unix
      socket_type: dgram
      mode: datagram
      format: json
      framing: message
      max_datagram_bytes: 65535
      retry_on_error: true
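The datagram framing above (one SOCK_DGRAM message = one JSON log, required src_ip/src_port) can be sketched like this. It is an illustration with assumed names (`NormalizedEvent` here is a cut-down version of the internal model, and `parseDatagram`/`listen` are hypothetical), not the real inbound adapter.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"net"
)

// NormalizedEvent is a cut-down, hypothetical version of the internal model.
type NormalizedEvent struct {
	Source  string // "A" or "B"
	SrcIP   string
	SrcPort int
	Extra   map[string]any
}

// parseDatagram turns one JSON datagram into a NormalizedEvent, enforcing
// the only fields the correlation needs: src_ip and src_port.
func parseDatagram(source string, data []byte) (NormalizedEvent, error) {
	var raw map[string]any
	if err := json.Unmarshal(data, &raw); err != nil {
		return NormalizedEvent{}, err
	}
	ip, _ := raw["src_ip"].(string)
	port, ok := raw["src_port"].(float64) // JSON numbers decode as float64
	if ip == "" || !ok {
		return NormalizedEvent{}, errors.New("missing src_ip/src_port")
	}
	delete(raw, "src_ip")
	delete(raw, "src_port")
	return NormalizedEvent{Source: source, SrcIP: ip, SrcPort: int(port), Extra: raw}, nil
}

// listen reads a SOCK_DGRAM Unix socket: one datagram = one complete JSON log.
func listen(source, path string, out chan<- NormalizedEvent) error {
	conn, err := net.ListenUnixgram("unixgram", &net.UnixAddr{Name: path, Net: "unixgram"})
	if err != nil {
		return err
	}
	defer conn.Close()
	buf := make([]byte, 65535) // matches max_datagram_bytes
	for {
		n, _, err := conn.ReadFromUnix(buf)
		if err != nil {
			return err
		}
		ev, perr := parseDatagram(source, buf[:n])
		if perr != nil {
			continue // malformed datagram; a real adapter would count these
		}
		out <- ev
	}
}

func main() {
	ev, err := parseDatagram("A", []byte(`{"src_ip":"10.0.0.1","src_port":42123,"method":"GET"}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(ev.SrcIP, ev.SrcPort, ev.Extra["method"]) // 10.0.0.1 42123 GET
}
```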
outputs:
  description: >
    Correlated logs are sent to one or more sinks (MultiSink).
  sinks:
    file:
      enabled: true
      description: >
        Local file sink. One JSON document per line. Rotation handled by
        logrotate; the file is reopened on SIGHUP.
      path: /var/log/logcorrelator/correlated.log
      format: json_lines
      rotate_managed_by: external_logrotate
    clickhouse:
      enabled: true
      description: >
        Main sink for archiving and near-real-time analytics. Asynchronous
        batch inserts; drops on saturation.
      dsn: clickhouse://user:pass@host:9000/db
      table: http_logs_raw
      batch_size: 500
      flush_interval_ms: 200
      max_buffer_size: 5000
      drop_on_overflow: true
      async_insert: true
      timeout_ms: 1000
    stdout:
      enabled: false
      level: INFO  # DEBUG: all logs, INFO: correlated only, ERROR: none
      description: >
        Optional sink for testing/development. The log level filters the
        output: DEBUG emits everything (orphans included), INFO emits only
        correlated logs, ERROR emits nothing.
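The batching and drop_on_overflow behaviour of the ClickHouse sink can be sketched with a bounded buffer: entries queue up to max_buffer_size, anything beyond is dropped, and a flush drains at most batch_size rows. The `batchBuffer` type and its methods are assumptions for illustration.

```go
package main

import "fmt"

// batchBuffer sketches the drop_on_overflow behaviour of the ClickHouse
// sink: rows queue up to max (max_buffer_size); beyond that, new rows are
// dropped, and a real sink would increment a metric.
type batchBuffer struct {
	max     int
	items   []string
	dropped int
}

// Enqueue reports whether the row was accepted.
func (b *batchBuffer) Enqueue(row string) bool {
	if len(b.items) >= b.max {
		b.dropped++
		return false
	}
	b.items = append(b.items, row)
	return true
}

// Flush drains up to batchSize rows, as one asynchronous INSERT batch would.
func (b *batchBuffer) Flush(batchSize int) []string {
	n := batchSize
	if n > len(b.items) {
		n = len(b.items)
	}
	batch := b.items[:n]
	b.items = b.items[n:]
	return batch
}

func main() {
	b := &batchBuffer{max: 2}
	fmt.Println(b.Enqueue("a"), b.Enqueue("b"), b.Enqueue("c")) // true true false
	fmt.Println(len(b.Flush(500)), b.dropped)                   // 2 1
}
```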
correlation:
  description: >
    Strict correlation based on src_ip + src_port and a configurable time
    window. No other field takes part in the correlation decision.
  key:
    - src_ip
    - src_port
  time_window:
    value: 1
    unit: s
    description: >
      Time window applied to the A and B timestamps. If B does not arrive
      within this window, A is emitted as an orphan.
  retention_limits:
    max_http_items: 10000
    max_network_items: 20000
    description: >
      Cache limits. When max_http_items is reached, the oldest A is evicted
      and emitted as an orphan. When max_network_items is reached, the oldest
      B is silently dropped.
  ttl_management:
    network_ttl_s: 30
    description: >
      TTL of the network logs. Every time a B is correlated with an A
      (Keep-Alive), its TTL is reset to this value.
  timestamp_source:
    apache: field_timestamp
    network: reception_time
  orphan_policy:
    apache_always_emit: true
    network_emit: false
  matching:
    mode: one_to_many
    description: >
      1-to-N strategy: one network log can serve several successive HTTP logs
      as long as it has neither expired nor been evicted.
schema:
  description: >
    Variable schemas for A and B. Only a few fields are mandatory for
    correlation; any other field is accepted without code changes.
  source_A:
    description: >
      Application HTTP logs in JSON format.
    required_fields:
      - name: src_ip
        type: string
      - name: src_port
        type: int
      - name: timestamp
        type: int64
        unit: ns
    optional_fields:
      - name: time
        type: string
      - name: dst_ip
        type: string
      - name: dst_port
        type: int
      - name: method
        type: string
      - name: path
        type: string
      - name: host
        type: string
      - name: http_version
        type: string
    dynamic_fields:
      - pattern: header_*
        target_map: headers
      - pattern: "*"
        target_map: extra
  source_B:
    description: Network logs in JSON (IP/TCP, JA3/JA4...).
    required_fields:
      - name: src_ip
        type: string
      - name: src_port
        type: int
    optional_fields:
      - name: dst_ip
        type: string
      - name: dst_port
        type: int
    dynamic_fields:
      - pattern: "*"
        target_map: extra
normalized_event:
  description: >
    Unified internal representation of A/B events.
  fields:
    - name: source
      type: enum("A","B")
    - name: timestamp
      type: time.Time
    - name: src_ip
      type: string
    - name: src_port
      type: int
    - name: dst_ip
      type: string
      optional: true
    - name: dst_port
      type: int
      optional: true
    - name: headers
      type: map[string]string
      optional: true
    - name: extra
      type: map[string]any
correlated_log:
  description: >
    Structure of the correlated log emitted to the sinks.
  fields:
    - name: timestamp
      type: time.Time
    - name: src_ip
      type: string
    - name: src_port
      type: int
    - name: dst_ip
      type: string
      optional: true
    - name: dst_port
      type: int
      optional: true
    - name: correlated
      type: bool
    - name: orphan_side
      type: string
    - name: "*"
      type: map[string]any
clickhouse_schema:
  strategy: external_ddls
  database: mabase_prod
  description: >
    The ClickHouse tables are managed outside the service. Two tables are
    used: http_logs_raw (daily-partitioned ingestion table) and http_logs
    (parsed table with explicit field extraction). A materialized view
    automatically moves data from the raw table to the parsed one.
  tables:
    - name: http_logs_raw
      description: >
        Raw ingestion table. A single raw_json column holds the complete
        correlated log serialized as JSON. Partitioned by day to optimize
        TTL handling.
      engine: MergeTree
      partition_by: toDate(ingest_time)
      order_by: ingest_time
      columns:
        - name: raw_json
          type: String
        - name: ingest_time
          type: DateTime
          default: now()
      insert_format: |
        INSERT INTO mabase_prod.http_logs_raw (raw_json) FORMAT JSONEachRow
        {"raw_json":"{...correlated log serialized as JSON...}"}
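The table description above corresponds to a DDL along these lines. This is a sketch derived from the listed engine, partition key, ordering key and columns; the DDLs themselves are managed externally, and any TTL clause is left out since only the partitioning is specified here.

```sql
CREATE TABLE mabase_prod.http_logs_raw
(
    raw_json    String,
    ingest_time DateTime DEFAULT now()
)
ENGINE = MergeTree
PARTITION BY toDate(ingest_time)
ORDER BY ingest_time;
```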
    - name: http_logs
      description: >
        Parsed table with every field extracted explicitly by the
        materialized view. Partitioned by log_date, optimized for analytical
        queries.
      engine: MergeTree
      partition_by: log_date
      order_by: (time, src_ip, dst_ip, ja4)
      columns:
        - name: time
          type: DateTime
        - name: log_date
          type: Date
          default: toDate(time)
        - name: src_ip
          type: IPv4
        - name: src_port
          type: UInt16
        - name: dst_ip
          type: IPv4
        - name: dst_port
          type: UInt16
        - name: method
          type: LowCardinality(String)
        - name: scheme
          type: LowCardinality(String)
        - name: host
          type: LowCardinality(String)
        - name: path
          type: String
        - name: query
          type: String
        - name: http_version
          type: LowCardinality(String)
        - name: orphan_side
          type: LowCardinality(String)
        - name: correlated
          type: UInt8
        - name: keepalives
          type: UInt16
        - name: a_timestamp
          type: UInt64
        - name: b_timestamp
          type: UInt64
        - name: conn_id
          type: String
        - name: ip_meta_df
          type: UInt8
        - name: ip_meta_id
          type: UInt32
        - name: ip_meta_total_length
          type: UInt32
        - name: ip_meta_ttl
          type: UInt8
        - name: tcp_meta_options
          type: LowCardinality(String)
        - name: tcp_meta_window_size
          type: UInt32
        - name: syn_to_clienthello_ms
          type: Int32
        - name: tls_version
          type: LowCardinality(String)
        - name: tls_sni
          type: LowCardinality(String)
        - name: ja3
          type: String
        - name: ja3_hash
          type: String
        - name: ja4
          type: String
        - name: header_user_agent
          type: String
        - name: header_accept
          type: String
        - name: header_accept_encoding
          type: String
        - name: header_accept_language
          type: String
        - name: header_x_request_id
          type: String
        - name: header_x_trace_id
          type: String
        - name: header_x_forwarded_for
          type: String
        - name: header_sec_ch_ua
          type: String
        - name: header_sec_ch_ua_mobile
          type: String
        - name: header_sec_ch_ua_platform
          type: String
        - name: header_sec_fetch_dest
          type: String
        - name: header_sec_fetch_mode
          type: String
        - name: header_sec_fetch_site
          type: String
    - name: mv_http_logs
      type: materialized_view
      description: >
        Materialized view that moves data from http_logs_raw to http_logs,
        extracting every field from the JSON via JSONExtract* with coalesce
        for default values.
      target: mabase_prod.http_logs
      query: |
        SELECT
            -- 1. Time
            parseDateTimeBestEffort(
                coalesce(JSONExtractString(raw_json, 'time'), '1970-01-01T00:00:00Z')
            ) AS time,
            toDate(time) AS log_date,

            -- 2. L3/L4 network
            toIPv4(coalesce(JSONExtractString(raw_json, 'src_ip'), '0.0.0.0')) AS src_ip,
            toUInt16(coalesce(JSONExtractUInt(raw_json, 'src_port'), 0)) AS src_port,
            toIPv4(coalesce(JSONExtractString(raw_json, 'dst_ip'), '0.0.0.0')) AS dst_ip,
            toUInt16(coalesce(JSONExtractUInt(raw_json, 'dst_port'), 0)) AS dst_port,

            -- 3. Basic HTTP
            coalesce(JSONExtractString(raw_json, 'method'), '') AS method,
            coalesce(JSONExtractString(raw_json, 'scheme'), '') AS scheme,
            coalesce(JSONExtractString(raw_json, 'host'), '') AS host,
            coalesce(JSONExtractString(raw_json, 'path'), '') AS path,
            coalesce(JSONExtractString(raw_json, 'query'), '') AS query,
            coalesce(JSONExtractString(raw_json, 'http_version'), '') AS http_version,
            coalesce(JSONExtractString(raw_json, 'orphan_side'), '') AS orphan_side,

            -- 4. Connection / correlation
            toUInt8(coalesce(JSONExtractBool(raw_json, 'correlated'), 0)) AS correlated,
            toUInt16(coalesce(JSONExtractUInt(raw_json, 'keepalives'), 0)) AS keepalives,
            coalesce(JSONExtractUInt(raw_json, 'a_timestamp'), 0) AS a_timestamp,
            coalesce(JSONExtractUInt(raw_json, 'b_timestamp'), 0) AS b_timestamp,
            coalesce(JSONExtractString(raw_json, 'conn_id'), '') AS conn_id,

            -- 5. IP/TCP
            toUInt8(coalesce(JSONExtractBool(raw_json, 'ip_meta_df'), 0)) AS ip_meta_df,
            coalesce(JSONExtractUInt(raw_json, 'ip_meta_id'), 0) AS ip_meta_id,
            coalesce(JSONExtractUInt(raw_json, 'ip_meta_total_length'), 0) AS ip_meta_total_length,
            coalesce(JSONExtractUInt(raw_json, 'ip_meta_ttl'), 0) AS ip_meta_ttl,
            coalesce(JSONExtractString(raw_json, 'tcp_meta_options'), '') AS tcp_meta_options,
            coalesce(JSONExtractUInt(raw_json, 'tcp_meta_window_size'), 0) AS tcp_meta_window_size,
            toInt32(coalesce(JSONExtractInt(raw_json, 'syn_to_clienthello_ms'), 0)) AS syn_to_clienthello_ms,

            -- 6. TLS / JA3/JA4
            coalesce(JSONExtractString(raw_json, 'tls_version'), '') AS tls_version,
            coalesce(JSONExtractString(raw_json, 'tls_sni'), '') AS tls_sni,
            coalesce(JSONExtractString(raw_json, 'ja3'), '') AS ja3,
            coalesce(JSONExtractString(raw_json, 'ja3_hash'), '') AS ja3_hash,
            coalesce(JSONExtractString(raw_json, 'ja4'), '') AS ja4,

            -- 7. HTTP headers
            coalesce(JSONExtractString(raw_json, 'header_User-Agent'), '') AS header_user_agent,
            coalesce(JSONExtractString(raw_json, 'header_Accept'), '') AS header_accept,
            coalesce(JSONExtractString(raw_json, 'header_Accept-Encoding'), '') AS header_accept_encoding,
            coalesce(JSONExtractString(raw_json, 'header_Accept-Language'), '') AS header_accept_language,
            coalesce(JSONExtractString(raw_json, 'header_X-Request-Id'), '') AS header_x_request_id,
            coalesce(JSONExtractString(raw_json, 'header_X-Trace-Id'), '') AS header_x_trace_id,
            coalesce(JSONExtractString(raw_json, 'header_X-Forwarded-For'), '') AS header_x_forwarded_for,
            coalesce(JSONExtractString(raw_json, 'header_Sec-CH-UA'), '') AS header_sec_ch_ua,
            coalesce(JSONExtractString(raw_json, 'header_Sec-CH-UA-Mobile'), '') AS header_sec_ch_ua_mobile,
            coalesce(JSONExtractString(raw_json, 'header_Sec-CH-UA-Platform'), '') AS header_sec_ch_ua_platform,
            coalesce(JSONExtractString(raw_json, 'header_Sec-Fetch-Dest'), '') AS header_sec_fetch_dest,
            coalesce(JSONExtractString(raw_json, 'header_Sec-Fetch-Mode'), '') AS header_sec_fetch_mode,
            coalesce(JSONExtractString(raw_json, 'header_Sec-Fetch-Site'), '') AS header_sec_fetch_site
        FROM mabase_prod.http_logs_raw;
  users:
    - name: data_writer
      description: User for log insertion (used by logcorrelator)
      grants:
        - INSERT(raw_json) ON mabase_prod.http_logs_raw
        - SELECT(raw_json) ON mabase_prod.http_logs_raw

    - name: analyst
      description: User for reading the parsed logs (BI, queries)
      grants:
        - SELECT ON mabase_prod.http_logs
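The grants above map onto ClickHouse statements along these lines. This is a sketch: the password placeholders and the `sha256_password` scheme are assumptions, and any quota/profile settings are out of scope here.

```sql
CREATE USER IF NOT EXISTS data_writer IDENTIFIED WITH sha256_password BY '...';
GRANT INSERT(raw_json), SELECT(raw_json) ON mabase_prod.http_logs_raw TO data_writer;

CREATE USER IF NOT EXISTS analyst IDENTIFIED WITH sha256_password BY '...';
GRANT SELECT ON mabase_prod.http_logs TO analyst;
```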
  migration:
    description: >
      Migration script moving existing data from the old http_logs_raw table
      into the new structure with the materialized view.
    sql: |
      -- Inserting into http_logs_raw triggers mv_http_logs, which populates
      -- http_logs (http_logs itself has no raw_json column). The source
      -- table name http_logs_raw_old is an assumption: the previous raw
      -- table must be renamed before the new DDLs are applied.
      INSERT INTO mabase_prod.http_logs_raw (raw_json)
      SELECT raw_json
      FROM mabase_prod.http_logs_raw_old;
architecture:
  description: >
    Hexagonal architecture: an independent correlation domain, abstract
    ports for the sources/sinks, adapters for Unix sockets, file and
    ClickHouse, an application layer for orchestration, and infrastructure
    modules (config, observability).
  modules:
    - name: cmd/logcorrelator
      type: entrypoint
      responsibilities:
        - Load the YAML configuration.
        - Initialize the input/output adapters.
        - Create the CorrelationService.
        - Start the orchestrator.
        - Handle signals (SIGINT, SIGTERM, SIGHUP).
    - name: internal/domain
      type: domain
      responsibilities:
        - NormalizedEvent and CorrelatedLog models.
        - CorrelationService (window, TTL, bounded buffers, 1-to-N, orphans).
    - name: internal/ports
      type: ports
      responsibilities:
        - EventSource, CorrelatedLogSink, CorrelationProcessor.
    - name: internal/app
      type: application
      responsibilities:
        - "Orchestrator: EventSource → CorrelationService → MultiSink."
    - name: internal/adapters/inbound/unixsocket
      type: adapter_inbound
      responsibilities:
        - Read Unix datagrams (SOCK_DGRAM) and parse JSON → NormalizedEvent.
    - name: internal/adapters/outbound/file
      type: adapter_outbound
      responsibilities:
        - Write JSON lines.
        - Reopen the file on SIGHUP.
    - name: internal/adapters/outbound/clickhouse
      type: adapter_outbound
      responsibilities:
        - Buffering + batch inserts, drop_on_overflow handling.
    - name: internal/adapters/outbound/multi
      type: adapter_outbound
      responsibilities:
        - Fan-out to several sinks.
    - name: internal/config
      type: infrastructure
      responsibilities:
        - Load/validate the YAML configuration.
    - name: internal/observability
      type: infrastructure
      responsibilities:
        - Internal logging, metrics (cache sizes, evictions, datagram errors).
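The hexagonal split above hinges on the port interfaces. The spec names EventSource, CorrelatedLogSink and CorrelationProcessor but not their method sets, so the signatures below are assumptions; the fan-out sink illustrates why a narrow outbound port keeps the multi adapter trivial.

```go
package main

import "fmt"

// NormalizedEvent and CorrelatedLog are cut-down stand-ins for the domain
// models; only the port names come from the spec, the method sets are
// assumptions.
type NormalizedEvent struct {
	Source  string // "A" or "B"
	SrcIP   string
	SrcPort int
}

type CorrelatedLog struct {
	SrcIP      string
	SrcPort    int
	Correlated bool
}

// EventSource is the inbound port: adapters (Unix sockets) push events
// into a channel owned by the orchestrator.
type EventSource interface {
	Run(out chan<- NormalizedEvent) error
}

// CorrelatedLogSink is the outbound port implemented by the file,
// ClickHouse, stdout and multi adapters.
type CorrelatedLogSink interface {
	Emit(log CorrelatedLog) error
	Close() error
}

// multiSink fans out to several sinks and keeps emitting after one fails,
// returning the first error encountered.
type multiSink struct{ sinks []CorrelatedLogSink }

func (m *multiSink) Emit(l CorrelatedLog) error {
	var first error
	for _, s := range m.sinks {
		if err := s.Emit(l); err != nil && first == nil {
			first = err
		}
	}
	return first
}

func (m *multiSink) Close() error {
	var first error
	for _, s := range m.sinks {
		if err := s.Close(); err != nil && first == nil {
			first = err
		}
	}
	return first
}

// memSink is an in-memory sink, useful for tests.
type memSink struct{ logs []CorrelatedLog }

func (s *memSink) Emit(l CorrelatedLog) error { s.logs = append(s.logs, l); return nil }
func (s *memSink) Close() error               { return nil }

func main() {
	a, b := &memSink{}, &memSink{}
	m := &multiSink{sinks: []CorrelatedLogSink{a, b}}
	_ = m.Emit(CorrelatedLog{SrcIP: "10.0.0.1", Correlated: true})
	fmt.Println(len(a.logs), len(b.logs)) // 1 1
}
```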
testing:
  unit:
    description: >
      Table-driven unit tests, target coverage ≥ 80%, focused on the
      correlation logic, the caches and the sinks.
    coverage_minimum: 0.8
    focus:
      - CorrelationService (window, TTL, evictions, 1-to-N)
      - A/B parsing → NormalizedEvent (datagrams)
      - ClickHouseSink (batching, overflow)
      - FileSink (reopen on SIGHUP)
      - MultiSink
  integration:
    description: >
      Integration tests validating the full A+B → correlation → sinks flow,
      with simulated Unix datagram sockets, a mocked ClickHouse and
      Keep-Alive scenarios.
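A table-driven case for the time-window check might look like this. The `withinWindow` helper is a hypothetical stand-in for the CorrelationService's window test, using int64 nanosecond timestamps as in source A's required timestamp field; in a real `_test.go` file the table would feed `t.Run` subtests.

```go
package main

import "fmt"

// withinWindow reproduces the time_window check: |ts_A - ts_B| <= window.
// Timestamps are int64 nanoseconds.
func withinWindow(aTS, bTS, windowNs int64) bool {
	d := aTS - bTS
	if d < 0 {
		d = -d
	}
	return d <= windowNs
}

func main() {
	// Table-driven style, as required for the unit tests.
	cases := []struct {
		name   string
		a, b   int64
		window int64
		want   bool
	}{
		{"B before A, inside window", 1_500_000_000, 1_000_000_000, 1_000_000_000, true},
		{"exactly on the edge", 0, 1_000_000_000, 1_000_000_000, true},
		{"outside the 1 s window", 0, 2_000_000_000, 1_000_000_000, false},
	}
	for _, c := range cases {
		got := withinWindow(c.a, c.b, c.window)
		fmt.Printf("%-26s got=%v want=%v\n", c.name, got, c.want)
	}
}
```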
docker:
  description: >
    Build, tests and RPM packaging all run inside containers via a
    multi-stage build.
  build_pipeline:
    multi_stage: true
    stages:
      - name: test_and_compile
        base: golang:latest
        description: >
          go test ./... (fails if coverage < 80%), then compilation of a
          static binary (CGO_ENABLED=0, GOOS=linux, GOARCH=amd64).
      - name: rpm_builder
        base: ruby:alpine
        description: >
          Install fpm, git and the RPM tooling. Generate the RPM changelog
          from the history. Build the .rpm packages for the target
          distributions.
      - name: output_export
        base: scratch
        description: >
          Minimal stage exposing the produced RPM packages
          (docker build --output).