-- ==============================================================================
-- 1. 1-MINUTE ROLLUP (real time)
-- ==============================================================================
-- Per-minute aggregate states, filled by mv_traffic_1m from http_logs.
-- Every AggregateFunction column must be read with the matching -Merge
-- combinator (countMerge, uniqMerge, avgMerge, varPopMerge, countIfMerge).
--
-- Every dimension the view groups on is part of the sorting key:
-- AggregatingMergeTree collapses rows that share a sorting key, so a dimension
-- left out of ORDER BY would be merged away in the background.
--
-- NOTE(review): IF NOT EXISTS leaves an already-created table untouched; an
-- existing deployment needs a migration to pick up ja3_hash and the wider key.
CREATE TABLE IF NOT EXISTS mabase_prod.agg_traffic_1m
(
    minute                    DateTime,
    host                      LowCardinality(String),
    src_ip                    IPv4,
    ja4                       String,
    -- Carried at this level because mv_traffic_1h groups on it.
    ja3_hash                  String,
    header_user_agent         String,

    -- Basic metrics
    hits                      AggregateFunction(count, UInt64),
    uniq_paths                AggregateFunction(uniq, String),

    -- Network metrics
    avg_syn_to_clienthello_ms AggregateFunction(avg, Int32),
    -- Variance is used to spot automation (0 = perfectly regular client).
    var_syn_to_clienthello_ms AggregateFunction(varPop, Int32),

    -- Header anomalies
    avg_headers_count         AggregateFunction(avg, Float64),  -- average number of headers
    spoofing_ua_tls           AggregateFunction(countIf, UInt8), -- User-Agent vs TLS mismatch
    spoofing_ua_alpn          AggregateFunction(countIf, UInt8), -- User-Agent vs ALPN mismatch
    spoofing_os_ttl           AggregateFunction(countIf, UInt8), -- Windows User-Agent with a Linux-like TTL (<= 64)
    missing_human_headers     AggregateFunction(countIf, UInt8),

    -- Application-level anomalies
    suspicious_methods        AggregateFunction(countIf, UInt8), -- bursts of PUT/DELETE/OPTIONS/TRACE
    suspicious_queries        AggregateFunction(countIf, UInt8)  -- suspicious (very long or heavily encoded) query strings
)
ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(minute)
ORDER BY (host, ja4, src_ip, minute, ja3_hash, header_user_agent);

-- Feeds agg_traffic_1m from every block inserted into http_logs.
-- NOTE(review): assumes http_logs exposes ja3_hash (the hourly and daily
-- rollups already depend on that dimension) — confirm against the source schema.
-- NOTE(review): the state types declared on the target table must match the
-- argument types here exactly; syn_to_clienthello_ms is presumably Int32 and
-- path a String — confirm.
CREATE MATERIALIZED VIEW IF NOT EXISTS mabase_prod.mv_traffic_1m
TO mabase_prod.agg_traffic_1m
AS
SELECT
    toStartOfMinute(time) AS minute,
    host,
    src_ip,
    ja4,
    ja3_hash,
    header_user_agent,

    -- Standard counters
    countState()     AS hits,
    uniqState(path)  AS uniq_paths,

    -- Network: mean and population variance of the SYN -> ClientHello delay
    -- (a variance close to 0 suggests automation).
    avgState(syn_to_clienthello_ms)    AS avg_syn_to_clienthello_ms,
    varPopState(syn_to_clienthello_ms) AS var_syn_to_clienthello_ms,

    -- Header count = number of commas in client_headers + 1.
    -- The length arithmetic yields an integer, so it is cast to Float64 to
    -- match the AggregateFunction(avg, Float64) column of the target table.
    avgState(
        toFloat64(length(client_headers) - length(replaceAll(client_headers, ',', '')) + 1)
    ) AS avg_headers_count,

    -- Inconsistencies (spoofing)
    -- Simplified example: claims to be Chrome but the JA4 does not start with t13d.
    countIfState(
        header_user_agent ILIKE '%Chrome%' AND ja4 NOT ILIKE 't13d%'
    ) AS spoofing_ua_tls,
    countIfState(
        header_user_agent ILIKE '%Chrome%' AND tls_alpn NOT ILIKE '%h2%'
    ) AS spoofing_ua_alpn,
    countIfState(
        header_user_agent ILIKE '%Windows%' AND ip_meta_ttl <= 64
    ) AS spoofing_os_ttl,
    countIfState(
        header_accept_language = '' OR header_sec_ch_ua = ''
    ) AS missing_human_headers,

    -- Behavioural anomalies
    countIfState(method IN ('PUT', 'DELETE', 'OPTIONS', 'TRACE')) AS suspicious_methods,
    countIfState(
        length(query) > 200 OR match(query, '(%[0-9A-Fa-f]{2}){5,}')
    ) AS suspicious_queries
FROM mabase_prod.http_logs
GROUP BY
    minute,
    host,
    src_ip,
    ja4,
    ja3_hash,
    header_user_agent;

-- ==============================================================================
-- 2. 1-HOUR ROLLUP (cascading)
-- ==============================================================================
-- Hourly aggregate states, fed from the 1-minute rollup.
-- header_user_agent is part of the sorting key so that background merges keep
-- one row per user agent instead of collapsing them.
CREATE TABLE IF NOT EXISTS mabase_prod.agg_traffic_1h
(
    hour                  DateTime,
    host                  LowCardinality(String),
    ja4                   String,
    ja3_hash              String,
    header_user_agent     String,
    hits                  AggregateFunction(count, UInt64),
    uniq_paths            AggregateFunction(uniq, String),
    missing_human_headers AggregateFunction(countIf, UInt8),
    uniq_ips              AggregateFunction(uniq, IPv4)
)
ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (host, ja4, ja3_hash, hour, header_user_agent);

-- Re-aggregates the 1-minute states per hour; src_ip stops being a dimension
-- here and is folded into the uniq_ips state instead.
CREATE MATERIALIZED VIEW IF NOT EXISTS mabase_prod.mv_traffic_1h
TO mabase_prod.agg_traffic_1h
AS
SELECT
    toStartOfHour(minute) AS hour,
    host,
    ja4,
    ja3_hash,
    header_user_agent,
    countMergeState(hits)                    AS hits,
    uniqMergeState(uniq_paths)               AS uniq_paths,
    countIfMergeState(missing_human_headers) AS missing_human_headers,
    uniqState(src_ip)                        AS uniq_ips
FROM mabase_prod.agg_traffic_1m
GROUP BY
    hour,
    host,
    ja4,
    ja3_hash,
    header_user_agent;

-- ==============================================================================
-- 3. 1-DAY ROLLUP (cascading)
-- ==============================================================================
-- Daily aggregate states, fed from the hourly rollup.
-- header_user_agent is part of the sorting key for the same reason as in the
-- hourly table.
CREATE TABLE IF NOT EXISTS mabase_prod.agg_traffic_1d
(
    day                   Date,
    host                  LowCardinality(String),
    ja4                   String,
    ja3_hash              String,
    header_user_agent     String,
    hits                  AggregateFunction(count, UInt64),
    uniq_ips              AggregateFunction(uniq, IPv4),
    uniq_paths            AggregateFunction(uniq, String),
    missing_human_headers AggregateFunction(countIf, UInt8)
)
ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(day)
ORDER BY (host, ja4, ja3_hash, day, header_user_agent);

-- Re-aggregates the hourly states per calendar day.
CREATE MATERIALIZED VIEW IF NOT EXISTS mabase_prod.mv_traffic_1d
TO mabase_prod.agg_traffic_1d
AS
SELECT
    toDate(hour) AS day,
    host,
    ja4,
    ja3_hash,
    header_user_agent,
    countMergeState(hits)                    AS hits,
    uniqMergeState(uniq_ips)                 AS uniq_ips,
    uniqMergeState(uniq_paths)               AS uniq_paths,
    countIfMergeState(missing_human_headers) AS missing_human_headers
FROM mabase_prod.agg_traffic_1h
GROUP BY
    day,
    host,
    ja4,
    ja3_hash,
    header_user_agent;

-- ==============================================================================
-- 4. NOVELTY DETECTION (first seen)
-- ==============================================================================
-- One row per (host, ja4, header fingerprint) holding first/last-seen states
-- and a hit counter. No PARTITION BY: the table is a single partition.
CREATE TABLE IF NOT EXISTS mabase_prod.agg_novelty
(
    host       LowCardinality(String),
    ja4        String,
    http_fp    UInt64,
    first_seen AggregateFunction(min, DateTime),
    last_seen  AggregateFunction(max, DateTime),
    total_hits AggregateFunction(count, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY (host, ja4, http_fp);

-- http_fp is the cityHash64 of the raw client_headers string.
-- NOTE(review): min/max states are declared over DateTime, so http_logs.time
-- is presumably a DateTime (not DateTime64) — confirm.
CREATE MATERIALIZED VIEW IF NOT EXISTS mabase_prod.mv_novelty
TO mabase_prod.agg_novelty
AS
SELECT
    host,
    ja4,
    cityHash64(client_headers) AS http_fp,
    minState(time)             AS first_seen,
    maxState(time)             AS last_seen,
    countState()               AS total_hits
FROM mabase_prod.http_logs
GROUP BY
    host,
    ja4,
    http_fp;

-- ==============================================================================
-- 5. LONG-TERM BASELINE (refreshable materialized view)
-- ==============================================================================
SET allow_experimental_refreshable_materialized_view = 1;

-- Target table of the refreshable view: one row per JA4 fingerprint.
CREATE TABLE IF NOT EXISTS mabase_prod.tbl_baseline_ja4_7d
(
    ja4               String,
    p99_hits_per_hour Float64
)
ENGINE = MergeTree()
ORDER BY ja4;

-- Recomputed once a day: 99th percentile of hourly hit counts per JA4 over the
-- trailing 7 days of the hourly rollup.
CREATE MATERIALIZED VIEW IF NOT EXISTS mabase_prod.baseline_ja4_7d
REFRESH EVERY 1 DAY
TO mabase_prod.tbl_baseline_ja4_7d
AS
SELECT
    ja4,
    quantile(0.99)(hourly_hits) AS p99_hits_per_hour
FROM
(
    SELECT
        ja4,
        hour,
        countMerge(hits) AS hourly_hits
    FROM mabase_prod.agg_traffic_1h
    WHERE hour >= now() - INTERVAL 7 DAY
    GROUP BY
        ja4,
        hour
)
GROUP BY ja4;

-- ==============================================================================
-- Aggregated view: live threat score over the last 5 minutes
-- ==============================================================================
SELECT
    src_ip,
    ja4,

    -- 1. Merged (finalised) values.
    -- The accented alias is backtick-quoted: unquoted identifiers must be ASCII.
    countMerge(hits)                                 AS requetes_live,
    uniqMerge(uniq_paths)                            AS `urls_scannées`,
    round(varPopMerge(var_syn_to_clienthello_ms), 2) AS variance_ms,
    round(avgMerge(avg_headers_count), 1)            AS nb_moyen_headers,

    -- 2. Multi-layer risk score
    (
        -- A. Behavioural weight
        (countMerge(hits) * 0.1)
        + (uniqMerge(uniq_paths) * 5.0)
        + if(countIfMerge(suspicious_methods) > 10, 40.0, 0.0)
        + if(countIfMerge(suspicious_queries) > 0, 40.0, 0.0)

        -- B. Application poverty (fewer than 5 headers on average = suspect)
        + if(avgMerge(avg_headers_count) < 5.0, 30.0, 0.0)
        + if(countIfMerge(missing_human_headers) > 0, 40.0, 0.0)

        -- C. Inconsistency weight (outright spoofing)
        + if(countIfMerge(spoofing_ua_tls) > 0, 50.0, 0.0)
        + if(countIfMerge(spoofing_ua_alpn) > 0, 40.0, 0.0)
        + if(countIfMerge(spoofing_os_ttl) > 0, 50.0, 0.0)

        -- D. Network / automation weight.
        -- Heuristic: human traffic has noisy timing, a datacenter bot has a
        -- variance close to zero. Requires more than 5 hits so that a handful
        -- of samples does not trigger it.
        + if(varPopMerge(var_syn_to_clienthello_ms) < 2.0 AND countMerge(hits) > 5, 30.0, 0.0)
    ) AS final_threat_score
FROM mabase_prod.agg_traffic_1m
WHERE minute >= now() - INTERVAL 5 MINUTE
GROUP BY
    src_ip,
    ja4
-- Keep only the most dangerous traffic.
HAVING final_threat_score > 80
ORDER BY final_threat_score DESC;