From 58b23ccc1e8c71c075232433415e359ffc137b9b Mon Sep 17 00:00:00 2001 From: toto Date: Tue, 3 Mar 2026 11:53:13 +0100 Subject: [PATCH] docs: update ClickHouse schema (http_logs_raw + http_logs) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - README.md: documenter les deux tables (raw + enrichie) - architecture.yml: décrire le schema complet avec colonnes matérialisées - Table http_logs_raw: ingestion JSON brut (colonne raw_json unique) - Table http_logs: extraction des champs via DEFAULT JSONExtract* Co-authored-by: Qwen-Coder --- README.md | 85 ++++++++++++++++++++++---- architecture.yml | 154 +++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 203 insertions(+), 36 deletions(-) diff --git a/README.md b/README.md index 112c2c8..48e66c1 100644 --- a/README.md +++ b/README.md @@ -194,18 +194,81 @@ Tous les champs des sources A et B sont fusionnés au même niveau. Les champs d ## Schema ClickHouse +Le service utilise deux tables ClickHouse : + +### Table brute (`http_logs_raw`) + +Table d'ingestion qui stocke le log corrélé brut au format JSON : + ```sql -CREATE TABLE correlated_logs_http_network ( - timestamp DateTime64(9), - src_ip String, - src_port UInt32, - dst_ip String, - dst_port UInt32, - correlated UInt8, - orphan_side String, - fields JSON -) ENGINE = MergeTree() -ORDER BY (timestamp, src_ip, src_port); +CREATE TABLE http_logs_raw +( + raw_json String +) +ENGINE = MergeTree +ORDER BY tuple(); +``` + +**Format d'insertion :** Le service envoie chaque log corrélé sérialisé en JSON dans la colonne `raw_json` : + +```sql +INSERT INTO http_logs_raw (raw_json) FORMAT JSONEachRow +{"raw_json":"{\"timestamp\":\"2024-01-01T12:00:00Z\",\"src_ip\":\"192.168.1.1\",\"correlated\":true,...}"} +``` + +### Table enrichie (`http_logs`) + +Vue matérialisée qui extrait les champs du JSON pour l'analyse : + +```sql +CREATE TABLE http_logs +( + raw_json String, + + -- champs de base + time_str String DEFAULT JSONExtractString(raw_json, 'time'), + timestamp_str String DEFAULT JSONExtractString(raw_json, 'timestamp'), + time DateTime DEFAULT parseDateTimeBestEffort(time_str), + log_date Date DEFAULT toDate(time), + + src_ip IPv4 DEFAULT toIPv4(JSONExtractString(raw_json, 'src_ip')), + src_port UInt16 DEFAULT toUInt16(JSONExtractUInt(raw_json, 'src_port')), + dst_ip IPv4 DEFAULT toIPv4(JSONExtractString(raw_json, 'dst_ip')), + dst_port UInt16 DEFAULT toUInt16(JSONExtractUInt(raw_json, 'dst_port')), + + correlated UInt8 DEFAULT JSONExtractBool(raw_json, 'correlated'), + keepalives UInt16 DEFAULT toUInt16(JSONExtractUInt(raw_json, 'keepalives')), + method LowCardinality(String) DEFAULT JSONExtractString(raw_json, 'method'), + scheme LowCardinality(String) DEFAULT JSONExtractString(raw_json, 'scheme'), + host LowCardinality(String) DEFAULT JSONExtractString(raw_json, 'host'), + path String DEFAULT JSONExtractString(raw_json, 'path'), + query String DEFAULT JSONExtractString(raw_json, 'query'), + http_version LowCardinality(String) DEFAULT JSONExtractString(raw_json, 'http_version'), + orphan_side LowCardinality(String) DEFAULT JSONExtractString(raw_json, 'orphan_side'), + + -- champs « presque toujours là » + a_timestamp UInt64 DEFAULT JSONExtractUInt(raw_json, 'a_timestamp'), + b_timestamp UInt64 DEFAULT JSONExtractUInt(raw_json, 'b_timestamp'), + conn_id String DEFAULT JSONExtractString(raw_json, 'conn_id'), + ip_meta_df UInt8 DEFAULT JSONExtractBool(raw_json, 'ip_meta_df'), + ip_meta_id UInt32 DEFAULT JSONExtractUInt(raw_json, 'ip_meta_id'), + ip_meta_total_length UInt32 DEFAULT JSONExtractUInt(raw_json, 'ip_meta_total_length'), + ip_meta_ttl UInt8 DEFAULT JSONExtractUInt(raw_json, 'ip_meta_ttl'), + tcp_meta_options LowCardinality(String) DEFAULT JSONExtractString(raw_json, 'tcp_meta_options'), + tcp_meta_window_size UInt32 DEFAULT JSONExtractUInt(raw_json, 'tcp_meta_window_size'), + syn_to_clienthello_ms Int32 DEFAULT toInt32(JSONExtractInt(raw_json, 'syn_to_clienthello_ms')), + tls_version LowCardinality(String) DEFAULT JSONExtractString(raw_json, 'tls_version'), + tls_sni LowCardinality(String) DEFAULT JSONExtractString(raw_json, 'tls_sni'), + ja3 String DEFAULT JSONExtractString(raw_json, 'ja3'), + ja3_hash String DEFAULT JSONExtractString(raw_json, 'ja3_hash'), + ja4 String DEFAULT JSONExtractString(raw_json, 'ja4'), + + -- tous les autres champs JSON (headers dynamiques etc.) + extra JSON DEFAULT raw_json +) +ENGINE = MergeTree +PARTITION BY toYYYYMM(log_date) +ORDER BY (log_date, dst_ip, src_ip, time); ``` ## Tests diff --git a/architecture.yml b/architecture.yml index 91cbb5b..9f7024a 100644 --- a/architecture.yml +++ b/architecture.yml @@ -160,7 +160,7 @@ config: clickhouse: enabled: true dsn: clickhouse://user:pass@localhost:9000/db - table: correlated_logs_http_network + table: http_logs_raw batch_size: 500 flush_interval_ms: 200 max_buffer_size: 5000 @@ -246,10 +246,10 @@ outputs: clickhouse: enabled: true description: > - Sink principal pour l’archivage et l’analyse quasi temps réel. Inserts + Sink principal pour l'archivage et l'analyse quasi temps réel. Inserts batch asynchrones, drop en cas de saturation. dsn: clickhouse://user:pass@host:9000/db - table: correlated_logs_http_network + table: http_logs_raw batch_size: 500 flush_interval_ms: 200 max_buffer_size: 5000 @@ -402,28 +402,132 @@ schema: clickhouse_schema: strategy: external_ddls description: > - La table ClickHouse est gérée en dehors du service. logcorrelator remplit - les colonnes connues et met NULL si un champ manque. Tous les champs fusionnés - sont exposés dans une colonne JSON (fields). - base_columns: - - name: timestamp - type: DateTime64(9) - - name: src_ip - type: String - - name: src_port - type: UInt32 - - name: dst_ip - type: String - - name: dst_port - type: UInt32 - - name: correlated - type: UInt8 - - name: orphan_side - type: String - - name: fields - type: JSON - dynamic_fields: - mode: map_or_additional_columns + La table ClickHouse est gérée en dehors du service. Deux tables sont utilisées : + http_logs_raw (table d'ingestion avec le JSON brut) et http_logs (table enrichie + avec extraction des champs via des colonnes matérialisées). + tables: + - name: http_logs_raw + description: > + Table d'ingestion brute. Une seule colonne raw_json contient le log corrélé + complet sérialisé en JSON. Le service insère via INSERT INTO http_logs_raw (raw_json). + engine: MergeTree + order_by: tuple() + columns: + - name: raw_json + type: String + insert_format: > + INSERT INTO http_logs_raw (raw_json) FORMAT JSONEachRow + {"raw_json":"{...log corrélé sérialisé en JSON...}"} + + - name: http_logs + description: > + Table enrichie avec extraction des champs du JSON brut via des expressions DEFAULT. + Partitionnée par mois, optimisée pour les requêtes analytiques. + engine: MergeTree + partition_by: toYYYYMM(log_date) + order_by: (log_date, dst_ip, src_ip, time) + columns: + - name: raw_json + type: String + - name: time_str + type: String + default: JSONExtractString(raw_json, 'time') + - name: timestamp_str + type: String + default: JSONExtractString(raw_json, 'timestamp') + - name: time + type: DateTime + default: parseDateTimeBestEffort(time_str) + - name: log_date + type: Date + default: toDate(time) + - name: src_ip + type: IPv4 + default: toIPv4(JSONExtractString(raw_json, 'src_ip')) + - name: src_port + type: UInt16 + default: toUInt16(JSONExtractUInt(raw_json, 'src_port')) + - name: dst_ip + type: IPv4 + default: toIPv4(JSONExtractString(raw_json, 'dst_ip')) + - name: dst_port + type: UInt16 + default: toUInt16(JSONExtractUInt(raw_json, 'dst_port')) + - name: correlated + type: UInt8 + default: JSONExtractBool(raw_json, 'correlated') + - name: keepalives + type: UInt16 + default: toUInt16(JSONExtractUInt(raw_json, 'keepalives')) + - name: method + type: LowCardinality(String) + default: JSONExtractString(raw_json, 'method') + - name: scheme + type: LowCardinality(String) + default: JSONExtractString(raw_json, 'scheme') + - name: host + type: LowCardinality(String) + default: JSONExtractString(raw_json, 'host') + - name: path + type: String + default: JSONExtractString(raw_json, 'path') + - name: query + type: String + default: JSONExtractString(raw_json, 'query') + - name: http_version + type: LowCardinality(String) + default: JSONExtractString(raw_json, 'http_version') + - name: orphan_side + type: LowCardinality(String) + default: JSONExtractString(raw_json, 'orphan_side') + - name: a_timestamp + type: UInt64 + default: JSONExtractUInt(raw_json, 'a_timestamp') + - name: b_timestamp + type: UInt64 + default: JSONExtractUInt(raw_json, 'b_timestamp') + - name: conn_id + type: String + default: JSONExtractString(raw_json, 'conn_id') + - name: ip_meta_df + type: UInt8 + default: JSONExtractBool(raw_json, 'ip_meta_df') + - name: ip_meta_id + type: UInt32 + default: JSONExtractUInt(raw_json, 'ip_meta_id') + - name: ip_meta_total_length + type: UInt32 + default: JSONExtractUInt(raw_json, 'ip_meta_total_length') + - name: ip_meta_ttl + type: UInt8 + default: JSONExtractUInt(raw_json, 'ip_meta_ttl') + - name: tcp_meta_options + type: LowCardinality(String) + default: JSONExtractString(raw_json, 'tcp_meta_options') + - name: tcp_meta_window_size + type: UInt32 + default: JSONExtractUInt(raw_json, 'tcp_meta_window_size') + - name: syn_to_clienthello_ms + type: Int32 + default: toInt32(JSONExtractInt(raw_json, 'syn_to_clienthello_ms')) + - name: tls_version + type: LowCardinality(String) + default: JSONExtractString(raw_json, 'tls_version') + - name: tls_sni + type: LowCardinality(String) + default: JSONExtractString(raw_json, 'tls_sni') + - name: ja3 + type: String + default: JSONExtractString(raw_json, 'ja3') + - name: ja3_hash + type: String + default: JSONExtractString(raw_json, 'ja3_hash') + - name: ja4 + type: String + default: JSONExtractString(raw_json, 'ja4') + - name: extra + type: JSON + default: raw_json architecture: description: >