feat: split ClickHouse into dual configurable databases (ja4_logs / ja4_processing)

Architecture:
- ja4_logs: raw log ingestion (http_logs_raw, http_logs, mv_http_logs)
- ja4_processing: analytics, aggregation, ML, dictionaries, audit

Configuration (env vars):
- CLICKHOUSE_DB_LOGS (default: ja4_logs)
- CLICKHOUSE_DB_PROCESSING (default: ja4_processing)

Changes:
- SQL migrations (10 files): all mabase_prod refs → ja4_logs or ja4_processing
  with correct cross-database references (MVs, views, dicts)
- deploy_schema.sh: substitutes DB names from env vars at deploy time
- Python shared settings: added CLICKHOUSE_DB_LOGS + CLICKHOUSE_DB_PROCESSING
- Dashboard routes (19 files): replaced ~80 hardcoded mabase_prod refs
  with settings.CLICKHOUSE_DB_LOGS / settings.CLICKHOUSE_DB_PROCESSING
- Bot-detector: DB → CLICKHOUSE_DB_PROCESSING, fetch_rules.py configurable
- Correlator: DSN example updated to ja4_logs
- Docker-compose + .env files: new env vars with defaults
- All documentation updated (14 markdown files)

All tests pass: sentinel 10/10, correlator 67.1%, bot-detector 11, dashboard 20, ja4_common 18

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
toto
2026-04-07 19:10:35 +02:00
parent b6391afbeb
commit 9f3e0621e5
46 changed files with 638 additions and 549 deletions

View File

@ -1,2 +1,2 @@
# correlator configuration — DO NOT COMMIT real values
LOGCORRELATOR_CLICKHOUSE_DSN=clickhouse://data_writer:ChangeMe@clickhouse:9000/mabase_prod
LOGCORRELATOR_CLICKHOUSE_DSN=clickhouse://data_writer:ChangeMe@clickhouse:9000/ja4_logs

View File

@ -232,25 +232,25 @@ http_logs_raw ← inserts du service (raw_json String)
```sql
-- data_writer : INSERT sur http_logs_raw uniquement (compte du service)
GRANT INSERT ON mabase_prod.http_logs_raw TO data_writer;
GRANT SELECT ON mabase_prod.http_logs_raw TO data_writer;
GRANT INSERT ON ja4_logs.http_logs_raw TO data_writer;
GRANT SELECT ON ja4_logs.http_logs_raw TO data_writer;
-- analyst : lecture sur la table parsée
GRANT SELECT ON mabase_prod.http_logs TO analyst;
GRANT SELECT ON ja4_logs.http_logs TO analyst;
```
### Vérification de l'ingestion
```sql
-- Données brutes reçues
SELECT count(*), min(ingest_time), max(ingest_time) FROM mabase_prod.http_logs_raw;
SELECT count(*), min(ingest_time), max(ingest_time) FROM ja4_logs.http_logs_raw;
-- Données parsées par la vue matérialisée
SELECT count(*), min(time), max(time) FROM mabase_prod.http_logs;
SELECT count(*), min(time), max(time) FROM ja4_logs.http_logs;
-- Derniers logs corrélés
SELECT time, src_ip, dst_ip, method, host, path, ja4
FROM mabase_prod.http_logs
FROM ja4_logs.http_logs
WHERE correlated = 1
ORDER BY time DESC LIMIT 10;
```
@ -397,7 +397,7 @@ python3 scripts/test-correlation-advanced.py --all
### ClickHouse : erreurs d'insertion
- **`No such column`** : vérifier que la table `http_logs_raw` utilise la colonne unique `raw_json` (pas de colonnes séparées)
- **`ACCESS_DENIED`** : `GRANT INSERT ON mabase_prod.http_logs_raw TO data_writer;`
- **`ACCESS_DENIED`** : `GRANT INSERT ON ja4_logs.http_logs_raw TO data_writer;`
- Les erreurs de flush sont loggées en ERROR dans les logs du service
### Vue matérialisée vide
@ -405,9 +405,9 @@ python3 scripts/test-correlation-advanced.py --all
Si `http_logs_raw` a des données mais `http_logs` est vide :
```sql
-- Vérifier la vue
SHOW CREATE TABLE mabase_prod.mv_http_logs;
SHOW CREATE TABLE ja4_logs.mv_http_logs;
-- Vérifier les permissions (la MV s'exécute sous le compte du service)
GRANT SELECT ON mabase_prod.http_logs_raw TO data_writer;
GRANT SELECT ON ja4_logs.http_logs_raw TO data_writer;
```
### Sockets Unix : permission denied

View File

@ -533,7 +533,7 @@ schema:
clickhouse_schema:
strategy: external_ddls
database: mabase_prod
database: ja4_processing
description: >
La table ClickHouse est gérée en dehors du service. Le service insère dans une
table RAW avec une seule colonne raw_json contenant le log corrélé complet
@ -556,7 +556,7 @@ clickhouse_schema:
type: DateTime
default: now()
insert_format: |
INSERT INTO mabase_prod.http_logs_raw (raw_json) VALUES
INSERT INTO ja4_processing.http_logs_raw (raw_json) VALUES
('{...log corrélé sérialisé en JSON...}')
notes: >
Le service utilise l'API native clickhouse-go/v2 (PrepareBatch + Append + Send).

View File

@ -25,8 +25,8 @@ outputs:
clickhouse:
enabled: false
dsn: clickhouse://user:pass@localhost:9000/db
table: correlated_logs_http_network
dsn: clickhouse://user:pass@localhost:9000/ja4_logs
table: http_logs_raw
batch_size: 500
flush_interval_ms: 200
max_buffer_size: 5000

View File

@ -96,7 +96,7 @@ Cette section détaille les vues d'agrégation et de détection pour identifier
```sql
-- Quel host est associé à cette IP/JA4 ?
SELECT src_ip, ja4, host, total_hits, unique_paths, user_agent
FROM mabase_prod.view_host_identification
FROM ja4_processing.view_host_identification
WHERE src_ip = '1.2.3.4'
ORDER BY total_hits DESC;
```
@ -105,7 +105,7 @@ ORDER BY total_hits DESC;
```sql
-- Ce JA4 est-il utilisé par plusieurs hosts différents ?
SELECT ja4, hosts, unique_hosts, unique_ips
FROM mabase_prod.view_host_ja4_anomalies
FROM ja4_processing.view_host_ja4_anomalies
HAVING unique_hosts >= 3;
-- Interprétation : 1 JA4 sur 3+ hosts = botnet cloné probable
```
@ -114,7 +114,7 @@ HAVING unique_hosts >= 3;
```sql
-- Cette IP change-t-elle de JA4 fréquemment ?
SELECT src_ip, ja4s, unique_ja4s
FROM mabase_prod.view_host_ip_ja4_rotation
FROM ja4_processing.view_host_ip_ja4_rotation
HAVING unique_ja4s >= 5;
-- Interprétation : 1 IP avec 5+ JA4 différents = fingerprint spoofing
```
@ -131,7 +131,7 @@ HAVING unique_ja4s >= 5;
```sql
-- Détecter les tentatives de brute force sur les login
SELECT window, src_ip, ja4, host, path, attempts, attempts_per_minute
FROM mabase_prod.view_bruteforce_post_detected
FROM ja4_processing.view_bruteforce_post_detected
WHERE host = 'api.example.com'
ORDER BY attempts DESC;
@ -147,7 +147,7 @@ ORDER BY attempts DESC;
```sql
-- Détecter les requêtes avec query params hautement variables
SELECT window, src_ip, ja4, host, path, requests, unique_query_patterns
FROM mabase_prod.view_form_bruteforce_detected
FROM ja4_processing.view_form_bruteforce_detected
HAVING requests >= 20 AND unique_query_patterns >= 10;
-- Interprétation : 20+ requêtes avec 10+ patterns query différents
@ -178,7 +178,7 @@ Exemple : `"Accept,Accept-Encoding,Sec-CH-UA,Sec-Fetch-Dest,User-Agent"`
```sql
-- Navigateurs "modernes" avec headers manquants
SELECT src_ip, ja4, header_user_agent, modern_browser_score, header_count
FROM mabase_prod.view_header_missing_modern_headers
FROM ja4_processing.view_header_missing_modern_headers
WHERE header_user_agent ILIKE '%Chrome%';
-- Threshold : score < 70 pour Chrome/Firefox = suspect
@ -189,7 +189,7 @@ WHERE header_user_agent ILIKE '%Chrome%';
```sql
-- Même User-Agent avec ordre de headers différent
SELECT header_user_agent, ja4, unique_hashes, unique_ips
FROM mabase_prod.view_header_ua_order_mismatch
FROM ja4_processing.view_header_ua_order_mismatch
HAVING unique_hashes > 1;
-- Interprétation : 1 UA avec 2+ ordres de headers = spoofing ou outil custom
@ -199,7 +199,7 @@ HAVING unique_hashes > 1;
```sql
-- Clients avec trop peu de headers
SELECT src_ip, ja4, header_count, header_user_agent
FROM mabase_prod.view_header_minimalist_count
FROM ja4_processing.view_header_minimalist_count
WHERE header_count < 6;
-- Threshold : < 6 headers = bot scripté (curl, Python requests, etc.)
@ -209,7 +209,7 @@ WHERE header_count < 6;
```sql
-- Chrome sans Sec-CH-UA (impossible pour un vrai Chrome)
SELECT src_ip, ja4, header_user_agent
FROM mabase_prod.view_header_sec_ch_missing
FROM ja4_processing.view_header_sec_ch_missing
WHERE header_user_agent ILIKE '%Chrome/%';
```
@ -217,7 +217,7 @@ WHERE header_user_agent ILIKE '%Chrome/%';
```sql
-- Même ordre de headers sur 10+ IPs différentes
SELECT header_order_hash, header_user_agent, unique_ips, total_hits
FROM mabase_prod.view_header_known_bot_signature
FROM ja4_processing.view_header_known_bot_signature
HAVING unique_ips >= 10;
-- Interprétation : 1 signature sur 10+ IPs = cluster de bots clonés
@ -243,7 +243,7 @@ ALPN (Application-Layer Protocol Negotiation) est une extension TLS qui négocie
```sql
-- Clients déclarant h2 mais parlant HTTP/1.1
SELECT src_ip, ja4, declared_alpn, actual_http_version, mismatches, mismatch_pct
FROM mabase_prod.view_alpn_mismatch_detected
FROM ja4_processing.view_alpn_mismatch_detected
HAVING mismatch_pct >= 80;
-- Threshold : ≥5 requêtes avec ≥80% d'incohérence
@ -262,7 +262,7 @@ HAVING mismatch_pct >= 80;
```sql
-- IPs dépassant 50 requêtes/minute
SELECT minute, src_ip, ja4, requests_per_min, unique_paths
FROM mabase_prod.view_rate_limit_exceeded
FROM ja4_processing.view_rate_limit_exceeded
ORDER BY requests_per_min DESC;
-- Threshold : > 50 req/min = trafic automatisé
@ -277,7 +277,7 @@ ORDER BY requests_per_min DESC;
```sql
-- Pics soudains de trafic
SELECT window, src_ip, ja4, burst_count
FROM mabase_prod.view_burst_detected
FROM ja4_processing.view_burst_detected
HAVING burst_count > 20;
-- Threshold : > 20 requêtes en 10 secondes = burst suspect
@ -294,7 +294,7 @@ HAVING burst_count > 20;
```sql
-- Détection de scanning de paths sensibles
SELECT window, src_ip, ja4, host, sensitive_hits, sensitive_ratio
FROM mabase_prod.view_path_scan_detected
FROM ja4_processing.view_path_scan_detected
HAVING sensitive_hits >= 5;
-- Paths surveillés : admin, backup, config, .env, .git, wp-admin,
@ -331,7 +331,7 @@ HAVING sensitive_hits >= 5;
-- Toutes les tentatives d'injection
SELECT window, src_ip, ja4, host, path,
sqli_attempts, xss_attempts, traversal_attempts
FROM mabase_prod.view_payload_attacks_detected
FROM ja4_processing.view_payload_attacks_detected
ORDER BY sqli_attempts DESC, xss_attempts DESC, traversal_attempts DESC;
-- Threshold : ≥1 tentative = alerte (zero tolerance)
@ -351,7 +351,7 @@ Un vrai navigateur a un fingerprint TLS unique. Un bot déployé sur 100 machine
```sql
-- JA4 partagé par 20+ IPs différentes
SELECT ja4, ja3_hash, unique_ips, unique_asns, unique_countries, total_hits
FROM mabase_prod.view_ja4_botnet_suspected
FROM ja4_processing.view_ja4_botnet_suspected
HAVING unique_ips >= 20;
-- Threshold : ≥20 IPs avec le même JA4 = botnet cloné
@ -380,7 +380,7 @@ Mesure le ratio d'événements non-corrélés (orphelins). Un trafic légitime a
```sql
-- Trafic avec >80% d'événements non-corrélés
SELECT hour, src_ip, ja4, host, correlated, orphans, orphan_pct
FROM mabase_prod.view_high_orphan_ratio
FROM ja4_processing.view_high_orphan_ratio
ORDER BY orphan_pct DESC;
-- Threshold : orphan_pct > 80% = trafic suspect
@ -440,13 +440,13 @@ Un IP bloquée par erreur retrouvera un score normal après expiration du TTL.
```sql
WITH threats AS (
SELECT src_ip, ja4, 'bruteforce' AS type, sum(attempts) AS score
FROM mabase_prod.view_bruteforce_post_detected GROUP BY src_ip, ja4
FROM ja4_processing.view_bruteforce_post_detected GROUP BY src_ip, ja4
UNION ALL
SELECT src_ip, ja4, 'path_scan', sum(sensitive_hits)
FROM mabase_prod.view_path_scan_detected GROUP BY src_ip, ja4
FROM ja4_processing.view_path_scan_detected GROUP BY src_ip, ja4
UNION ALL
SELECT src_ip, ja4, 'payload', sum(sqli_attempts + xss_attempts)
FROM mabase_prod.view_payload_attacks_detected GROUP BY src_ip, ja4
FROM ja4_processing.view_payload_attacks_detected GROUP BY src_ip, ja4
)
SELECT src_ip, ja4, sum(score) AS total_score, groupArray(type) AS threat_types
FROM threats
@ -462,7 +462,7 @@ SELECT
host,
countMerge(hits) AS requests,
uniqMerge(uniq_paths) AS unique_paths
FROM mabase_prod.agg_host_ip_ja4_1h
FROM ja4_processing.agg_host_ip_ja4_1h
WHERE src_ip = '1.2.3.4'
AND hour >= now() - INTERVAL 24 HOUR
GROUP BY hour, host
@ -476,7 +476,7 @@ SELECT
any(first_ua) AS user_agent,
groupArray(DISTINCT host) AS hosts,
sum(countMerge(hits)) AS total_requests
FROM mabase_prod.agg_host_ip_ja4_1h
FROM ja4_processing.agg_host_ip_ja4_1h
WHERE hour >= now() - INTERVAL 1 HOUR
GROUP BY ja4
ORDER BY total_requests DESC
@ -496,13 +496,13 @@ clickhouse-client --multiquery < sql/hosts.sql
### Vérification
```sql
-- Compter les enregistrements
SELECT count(*) FROM mabase_prod.agg_host_ip_ja4_1h;
SELECT count(*) FROM mabase_prod.agg_header_fingerprint_1h;
SELECT count(*) FROM ja4_processing.agg_host_ip_ja4_1h;
SELECT count(*) FROM ja4_processing.agg_header_fingerprint_1h;
-- Tester les vues
SELECT * FROM mabase_prod.view_host_identification LIMIT 10;
SELECT * FROM mabase_prod.view_bruteforce_post_detected LIMIT 10;
SELECT * FROM mabase_prod.view_payload_attacks_detected LIMIT 10;
SELECT * FROM ja4_processing.view_host_identification LIMIT 10;
SELECT * FROM ja4_processing.view_bruteforce_post_detected LIMIT 10;
SELECT * FROM ja4_processing.view_payload_attacks_detected LIMIT 10;
```
### Monitoring
@ -510,12 +510,12 @@ SELECT * FROM mabase_prod.view_payload_attacks_detected LIMIT 10;
-- Vues les plus actives (dernière heure)
SELECT
'bruteforce_post' AS view_name, count() AS alerts
FROM mabase_prod.view_bruteforce_post_detected
FROM ja4_processing.view_bruteforce_post_detected
UNION ALL
SELECT 'path_scan', count() FROM mabase_prod.view_path_scan_detected
SELECT 'path_scan', count() FROM ja4_processing.view_path_scan_detected
UNION ALL
SELECT 'payload_attacks', count() FROM mabase_prod.view_payload_attacks_detected
SELECT 'payload_attacks', count() FROM ja4_processing.view_payload_attacks_detected
UNION ALL
SELECT 'ja4_botnet', count() FROM mabase_prod.view_ja4_botnet_suspected
SELECT 'ja4_botnet', count() FROM ja4_processing.view_ja4_botnet_suspected
ORDER BY alerts DESC;
```

View File

@ -1,6 +1,6 @@
DROP TABLE IF EXISTS mabase_prod.ref_bot_networks;
DROP TABLE IF EXISTS ja4_processing.ref_bot_networks;
CREATE TABLE mabase_prod.ref_bot_networks (
CREATE TABLE ja4_processing.ref_bot_networks (
-- On utilise IPv6CIDR car il accepte aussi les IPv4 au format ::ffff:1.2.3.4/120
network IPv6CIDR,
bot_name LowCardinality(String),
@ -11,11 +11,11 @@ ORDER BY (network, bot_name);
-- Création de la table lisant le fichier des IPs
CREATE TABLE mabase_prod.bot_ip (
CREATE TABLE ja4_processing.bot_ip (
ip String
) ENGINE = File(CSV, 'bot_ip.csv');
-- Création de la table lisant le fichier des signatures JA4
CREATE TABLE mabase_prod.bot_ja4 (
CREATE TABLE ja4_processing.bot_ja4 (
ja4 String
) ENGINE = File(CSV, 'bot_ja4.csv');

View File

@ -11,13 +11,14 @@
-- -----------------------------------------------------------------------------
-- Base de données
-- -----------------------------------------------------------------------------
CREATE DATABASE IF NOT EXISTS mabase_prod;
CREATE DATABASE IF NOT EXISTS ja4_logs;
CREATE DATABASE IF NOT EXISTS ja4_processing;
-- -----------------------------------------------------------------------------
-- Table brute : cible directe des inserts du service
-- Le service n'insère que dans cette table (colonne raw_json).
-- -----------------------------------------------------------------------------
CREATE TABLE IF NOT EXISTS mabase_prod.http_logs_raw
CREATE TABLE IF NOT EXISTS ja4_logs.http_logs_raw
(
`raw_json` String CODEC(ZSTD(3)),
`ingest_time` DateTime DEFAULT now()
@ -34,7 +35,7 @@ SETTINGS
-- Table parsée : alimentée automatiquement par la vue matérialisée
-- -----------------------------------------------------------------------------
CREATE TABLE mabase_prod.http_logs
CREATE TABLE ja4_logs.http_logs
(
-- Temporel
`time` DateTime,
@ -118,10 +119,10 @@ SETTINGS
-- -----------------------------------------------------------------------------
-- Vue matérialisée : parse le JSON de http_logs_raw vers http_logs
-- -----------------------------------------------------------------------------
DROP VIEW IF EXISTS mabase_prod.mv_http_logs;
DROP VIEW IF EXISTS ja4_logs.mv_http_logs;
CREATE MATERIALIZED VIEW IF NOT EXISTS mabase_prod.mv_http_logs
TO mabase_prod.http_logs
CREATE MATERIALIZED VIEW IF NOT EXISTS ja4_logs.mv_http_logs
TO ja4_logs.http_logs
AS
SELECT
parseDateTimeBestEffort(coalesce(JSONExtractString(raw_json, 'time'), '1970-01-01T00:00:00Z')) AS time,
@ -133,31 +134,31 @@ SELECT
toUInt16(coalesce(JSONExtractUInt(raw_json, 'dst_port'), 0)) AS dst_port,
dictGetOrDefault(
'mabase_prod.dict_iplocate_asn',
'ja4_processing.dict_iplocate_asn',
'asn',
IPv4ToIPv6(IPv4StringToNum(toString(src_ip))),
toUInt32(0)
) AS src_asn,
dictGetOrDefault(
'mabase_prod.dict_iplocate_asn',
'ja4_processing.dict_iplocate_asn',
'country_code',
IPv4ToIPv6(IPv4StringToNum(toString(src_ip))),
''
) AS src_country_code,
dictGetOrDefault(
'mabase_prod.dict_iplocate_asn',
'ja4_processing.dict_iplocate_asn',
'name',
IPv4ToIPv6(IPv4StringToNum(toString(src_ip))),
''
) AS src_as_name,
dictGetOrDefault(
'mabase_prod.dict_iplocate_asn',
'ja4_processing.dict_iplocate_asn',
'org',
IPv4ToIPv6(IPv4StringToNum(toString(src_ip))),
''
) AS src_org,
dictGetOrDefault(
'mabase_prod.dict_iplocate_asn',
'ja4_processing.dict_iplocate_asn',
'domain',
IPv4ToIPv6(IPv4StringToNum(toString(src_ip))),
''
@ -211,7 +212,7 @@ SELECT
coalesce(JSONExtractString(raw_json, 'header_Sec-Fetch-Mode'), '') AS header_sec_fetch_mode,
coalesce(JSONExtractString(raw_json, 'header_Sec-Fetch-Site'), '') AS header_sec_fetch_site
FROM mabase_prod.http_logs_raw;
FROM ja4_logs.http_logs_raw;
-- -----------------------------------------------------------------------------
-- Utilisateurs et permissions
@ -220,15 +221,15 @@ CREATE USER IF NOT EXISTS data_writer IDENTIFIED WITH plaintext_password BY 'Cha
CREATE USER IF NOT EXISTS analyst IDENTIFIED WITH plaintext_password BY 'ChangeMe';
-- data_writer : INSERT uniquement sur la table brute
GRANT INSERT ON mabase_prod.http_logs_raw TO data_writer;
GRANT SELECT ON mabase_prod.http_logs_raw TO data_writer;
GRANT INSERT ON ja4_logs.http_logs_raw TO data_writer;
GRANT SELECT ON ja4_logs.http_logs_raw TO data_writer;
-- analyst : lecture sur la table parsée
GRANT SELECT ON mabase_prod.http_logs TO analyst;
GRANT SELECT ON ja4_logs.http_logs TO analyst;
-- -----------------------------------------------------------------------------
-- Vérifications post-installation
-- -----------------------------------------------------------------------------
-- SELECT count(*), min(ingest_time), max(ingest_time) FROM mabase_prod.http_logs_raw;
-- SELECT count(*), min(time), max(time) FROM mabase_prod.http_logs;
-- SELECT time, src_ip, dst_ip, method, host, path, ja4 FROM mabase_prod.http_logs ORDER BY time DESC LIMIT 10;
-- SELECT count(*), min(ingest_time), max(ingest_time) FROM ja4_logs.http_logs_raw;
-- SELECT count(*), min(time), max(time) FROM ja4_logs.http_logs;
-- SELECT time, src_ip, dst_ip, method, host, path, ja4 FROM ja4_logs.http_logs ORDER BY time DESC LIMIT 10;

View File

@ -1,6 +1,6 @@
DROP DICTIONARY IF EXISTS mabase_prod.dict_iplocate_asn;
DROP DICTIONARY IF EXISTS ja4_processing.dict_iplocate_asn;
CREATE DICTIONARY IF NOT EXISTS mabase_prod.dict_iplocate_asn
CREATE DICTIONARY IF NOT EXISTS ja4_processing.dict_iplocate_asn
(
network String,
asn UInt32,
@ -17,10 +17,10 @@ LIFETIME(MIN 3600 MAX 7200);
-- Suppression si existe pour reconfiguration
DROP TABLE IF EXISTS mabase_prod.ref_bot_networks;
DROP TABLE IF EXISTS ja4_processing.ref_bot_networks;
-- Table optimisée pour le filtrage binaire de CIDR
CREATE TABLE mabase_prod.ref_bot_networks (
CREATE TABLE ja4_processing.ref_bot_networks (
network IPv6CIDR, -- Gère nativement '1.2.3.0/24' et '2001:db8::/32'
bot_name LowCardinality(String),
is_legitimate UInt8, -- 1 = Whitelist, 0 = Blacklist