diff --git a/sql/mv1.sql b/sql/mv1.sql
index 30140c9..2d656e0 100644
--- a/sql/mv1.sql
+++ b/sql/mv1.sql
@@ -1,163 +1,77 @@
--- ==============================================================================
--- 1. ROLLUP 1 MINUTE (Temps Réel)
--- ==============================================================================
-CREATE TABLE IF NOT EXISTS mabase_prod.agg_traffic_1m (
-    minute DateTime,
-    host LowCardinality(String),
-    src_ip IPv4,
-    ja4 String,
-    header_user_agent String,
+-- ============================================================================
+-- PROJECT     : HTTP Threat Detection Engine
+-- DESCRIPTION : Rebuild of the aggregation, novelty and scoring views.
+-- DATE        : 2026-03-08
+-- ============================================================================
 
-    -- Métriques Basiques
-    hits AggregateFunction(count, UInt64),
-    uniq_paths AggregateFunction(uniq, String),
+-- ----------------------------------------------------------------------------
+-- 1. CLEAN-UP OF EXISTING OBJECTS (reverse dependency order)
+-- ----------------------------------------------------------------------------
+DROP VIEW IF EXISTS mabase_prod.live_threat_scores;
+DROP VIEW IF EXISTS mabase_prod.mv_baseline_update;
+-- Previous name of the baseline view: left in place it would keep refreshing the same target table.
+DROP VIEW IF EXISTS mabase_prod.baseline_ja4_7d;
+DROP VIEW IF EXISTS mabase_prod.mv_novelty;
+DROP VIEW IF EXISTS mabase_prod.mv_traffic_1d;
+DROP VIEW IF EXISTS mabase_prod.mv_traffic_1h;
+DROP VIEW IF EXISTS mabase_prod.mv_traffic_1m;
 
-    -- Métriques Réseau (Nouveau : Variance)
-    avg_syn_to_clienthello_ms AggregateFunction(avg, Int32),
-    var_syn_to_clienthello_ms AggregateFunction(varPop, Int32), -- Variance pour détecter la robotique (0 = Bot parfait)
+-- ----------------------------------------------------------------------------
+-- 2. REBUILD OF THE ROLLUP CHAIN (time-based aggregations)
+-- ----------------------------------------------------------------------------
 
-    -- Anomalies Headers (Nouveau)
-    avg_headers_count AggregateFunction(avg, Float64), -- Moyenne du nombre de headers
-    spoofing_ua_tls AggregateFunction(countIf, UInt8), -- Incohérence UA vs TLS
-    spoofing_ua_alpn AggregateFunction(countIf, UInt8), -- Incohérence UA vs ALPN
-    spoofing_os_ttl AggregateFunction(countIf, UInt8), -- Incohérence OS (Windows) vs TTL Linux (<= 64)
-    missing_human_headers AggregateFunction(countIf, UInt8),
-
-    -- Anomalies Applicatives
-    suspicious_methods AggregateFunction(countIf, UInt8), -- Rafales de PUT/DELETE/OPTIONS
-    suspicious_queries AggregateFunction(countIf, UInt8) -- Payloads suspects (très longs)
-
-) ENGINE = AggregatingMergeTree()
-PARTITION BY toYYYYMM(minute)
-ORDER BY (host, ja4, src_ip, minute);
-
-CREATE MATERIALIZED VIEW IF NOT EXISTS mabase_prod.mv_traffic_1m
+-- MV 1 minute: turns raw HTTP logs into technical metrics
+CREATE MATERIALIZED VIEW mabase_prod.mv_traffic_1m
 TO mabase_prod.agg_traffic_1m
 AS SELECT
     toStartOfMinute(time) AS minute,
-    host,
-    src_ip,
-    ja4,
-    header_user_agent,
-
-    -- Comptage standard
+    host, src_ip, src_asn, src_country_code, ja4, ja3_hash, header_user_agent,
     countState() AS hits,
     uniqState(path) AS uniq_paths,
-
-    -- Réseau : Moyenne et Variance Population (Variance proche de 0 = Automatisation/Bot)
     avgState(syn_to_clienthello_ms) AS avg_syn_to_clienthello_ms,
     varPopState(syn_to_clienthello_ms) AS var_syn_to_clienthello_ms,
-
-    -- Comptage de headers (On compte le nombre de virgules + 1 pour avoir le nombre d'éléments dans le header)
-    avgState((length(client_headers) - length(replaceAll(client_headers, ',', '')) + 1)) AS avg_headers_count,
-
-    -- Incohérences (Spoofing)
-    countIfState(
-        header_user_agent ILIKE '%Chrome%' AND ja4 NOT ILIKE 't13d%'
-        -- Exemple simplifié : Prétend être Chrome (tls 1.3) mais JA4 annonce autre chose
-    ) AS spoofing_ua_tls,
-
-    countIfState(
-        header_user_agent ILIKE '%Chrome%' AND tls_alpn NOT ILIKE '%h2%'
-    ) AS spoofing_ua_alpn,
-
-    countIfState(
-        header_user_agent ILIKE '%Windows%' AND ip_meta_ttl <= 64
-    ) AS spoofing_os_ttl,
-
-    countIfState(
-        header_accept_language = '' OR header_sec_ch_ua = ''
-    ) AS missing_human_headers,
-
-    -- Anomalies de comportement
+    avgState(toFloat64((length(client_headers) - length(replaceAll(client_headers, ',', ''))) + 1)) AS avg_headers_count,
+    countIfState((header_user_agent ILIKE '%Chrome%') AND (ja4 NOT ILIKE 't13d%')) AS spoofing_ua_tls,
+    countIfState((header_user_agent ILIKE '%Chrome%') AND (tls_alpn NOT ILIKE '%h2%')) AS spoofing_ua_alpn,
+    countIfState((header_user_agent ILIKE '%Windows%') AND (ip_meta_ttl <= 64)) AS spoofing_os_ttl,
+    countIfState((header_accept_language = '') OR (header_sec_ch_ua = '')) AS missing_human_headers,
     countIfState(method IN ('PUT', 'DELETE', 'OPTIONS', 'TRACE')) AS suspicious_methods,
-    countIfState(length(query) > 200 OR match(query, '(%[0-9A-Fa-f]{2}){5,}')) AS suspicious_queries
-
+    countIfState((length(query) > 200) OR match(query, '(%[0-9A-Fa-f]{2}){5,}')) AS suspicious_queries
 FROM mabase_prod.http_logs
-GROUP BY minute, host, src_ip, ja4, header_user_agent;
+GROUP BY minute, host, src_ip, src_asn, src_country_code, ja4, ja3_hash, header_user_agent;
 
-
--- ==============================================================================
--- 2. ROLLUP 1 HEURE (Cascading)
--- ==============================================================================
-
-CREATE TABLE IF NOT EXISTS mabase_prod.agg_traffic_1h (
-    hour DateTime,
-    host LowCardinality(String),
-    ja4 String,
-    ja3_hash String,
-    header_user_agent String,
-    hits AggregateFunction(count, UInt64),
-    uniq_paths AggregateFunction(uniq, String),
-    missing_human_headers AggregateFunction(countIf, UInt8),
-    uniq_ips AggregateFunction(uniq, IPv4)
-) ENGINE = AggregatingMergeTree()
-PARTITION BY toYYYYMM(hour)
-ORDER BY (host, ja4, ja3_hash, hour);
-
-CREATE MATERIALIZED VIEW IF NOT EXISTS mabase_prod.mv_traffic_1h
+-- MV 1 hour: second-level aggregation (cascading from the 1-minute rollup)
+CREATE MATERIALIZED VIEW mabase_prod.mv_traffic_1h
 TO mabase_prod.agg_traffic_1h
 AS SELECT
     toStartOfHour(minute) AS hour,
-    host,
-    ja4,
-    ja3_hash,
-    header_user_agent,
+    host, src_country_code, ja4, ja3_hash, header_user_agent,
     countMergeState(hits) AS hits,
     uniqMergeState(uniq_paths) AS uniq_paths,
     countIfMergeState(missing_human_headers) AS missing_human_headers,
     uniqState(src_ip) AS uniq_ips
 FROM mabase_prod.agg_traffic_1m
-GROUP BY hour, host, ja4, ja3_hash, header_user_agent;
+GROUP BY hour, host, src_country_code, ja4, ja3_hash, header_user_agent;
 
--- ==============================================================================
--- 3. ROLLUP 1 JOUR (Cascading)
--- ==============================================================================
-
-CREATE TABLE IF NOT EXISTS mabase_prod.agg_traffic_1d (
-    day Date,
-    host LowCardinality(String),
-    ja4 String,
-    ja3_hash String,
-    header_user_agent String,
-    hits AggregateFunction(count, UInt64),
-    uniq_ips AggregateFunction(uniq, IPv4),
-    uniq_paths AggregateFunction(uniq, String),
-    missing_human_headers AggregateFunction(countIf, UInt8)
-) ENGINE = AggregatingMergeTree()
-PARTITION BY toYYYYMM(day)
-ORDER BY (host, ja4, ja3_hash, day);
-
-CREATE MATERIALIZED VIEW IF NOT EXISTS mabase_prod.mv_traffic_1d
+-- MV 1 day: third-level aggregation used for archiving and baselines
+CREATE MATERIALIZED VIEW mabase_prod.mv_traffic_1d
 TO mabase_prod.agg_traffic_1d
 AS SELECT
     toDate(hour) AS day,
-    host,
-    ja4,
-    ja3_hash,
-    header_user_agent,
+    host, src_country_code, ja4, ja3_hash, header_user_agent,
     countMergeState(hits) AS hits,
     uniqMergeState(uniq_ips) AS uniq_ips,
     uniqMergeState(uniq_paths) AS uniq_paths,
     countIfMergeState(missing_human_headers) AS missing_human_headers
 FROM mabase_prod.agg_traffic_1h
-GROUP BY day, host, ja4, ja3_hash, header_user_agent;
+GROUP BY day, host, src_country_code, ja4, ja3_hash, header_user_agent;
 
--- ==============================================================================
--- 4. DÉTECTION DE NOUVEAUTÉ (First Seen)
--- ==============================================================================
+-- ----------------------------------------------------------------------------
+-- 3. REBUILD OF THE INTELLIGENCE BLOCKS (novelty & baseline)
+-- ----------------------------------------------------------------------------
 
-CREATE TABLE IF NOT EXISTS mabase_prod.agg_novelty (
-    host LowCardinality(String),
-    ja4 String,
-    http_fp UInt64,
-    first_seen AggregateFunction(min, DateTime),
-    last_seen AggregateFunction(max, DateTime),
-    total_hits AggregateFunction(count, UInt64)
-) ENGINE = AggregatingMergeTree()
-ORDER BY (host, ja4, http_fp);
-
-CREATE MATERIALIZED VIEW IF NOT EXISTS mabase_prod.mv_novelty
+-- MV Novelty: detection of new fingerprints (HTTP + TLS)
+CREATE MATERIALIZED VIEW mabase_prod.mv_novelty
 TO mabase_prod.agg_novelty
 AS SELECT
     host,
@@ -169,78 +83,75 @@ AS SELECT
 FROM mabase_prod.http_logs
 GROUP BY host, ja4, http_fp;
 
--- ==============================================================================
--- 5. BASELINE LONG TERME (Refreshable Materialized View)
--- ==============================================================================
-
+-- MV Baseline: per-JA4 statistics of normal hourly traffic over the last 7 days.
+-- This has to be a refreshable MV: a plain (insert-triggered) MV only sees the
+-- block being inserted, so the 7-day window and the quantile would be computed
+-- on a handful of rows and appended to the target table on every insert.
+-- The setting is only needed on releases where refreshable MVs are experimental.
+-- NOTE(review): the target table must expose avg_hits_per_hour and last_update;
+-- its DDL is no longer part of this file - confirm before deploying.
 SET allow_experimental_refreshable_materialized_view = 1;
 
-CREATE TABLE IF NOT EXISTS mabase_prod.tbl_baseline_ja4_7d (
-    ja4 String,
-    p99_hits_per_hour Float64
-) ENGINE = MergeTree()
-ORDER BY ja4;
-
-CREATE MATERIALIZED VIEW IF NOT EXISTS mabase_prod.baseline_ja4_7d
+CREATE MATERIALIZED VIEW mabase_prod.mv_baseline_update
 REFRESH EVERY 1 DAY
 TO mabase_prod.tbl_baseline_ja4_7d
 AS SELECT
     ja4,
-    quantile(0.99)(hourly_hits) AS p99_hits_per_hour
+    quantile(0.99)(hourly_hits) AS p99_hits_per_hour,
+    avg(hourly_hits) AS avg_hits_per_hour,
+    now() AS last_update
 FROM (
     SELECT
         ja4,
-        hour,
+        toStartOfHour(minute) AS hour,
         countMerge(hits) AS hourly_hits
-    FROM mabase_prod.agg_traffic_1h
-    WHERE hour >= now() - INTERVAL 7 DAY
+    FROM mabase_prod.agg_traffic_1m
+    WHERE minute >= now() - INTERVAL 7 DAY
     GROUP BY ja4, hour
 )
 GROUP BY ja4;
 
+-- ----------------------------------------------------------------------------
+-- 4. FINAL SCORING VIEW (real-time verdict)
+-- ----------------------------------------------------------------------------
 
--- ==============================================================================
--- vue aggregée
--- ==============================================================================
-
-
+-- One row per client tuple seen in the last 5 minutes; final_score is the sum
+-- of the weights of every signal that fired.
+CREATE VIEW mabase_prod.live_threat_scores AS
 SELECT
-    src_ip,
-    ja4,
-
-    -- 1. Récupération des valeurs fusionnées (Merge)
-    countMerge(hits) AS requetes_live,
-    uniqMerge(uniq_paths) AS urls_scannées,
-    round(varPopMerge(var_syn_to_clienthello_ms), 2) AS variance_ms,
-    round(avgMerge(avg_headers_count), 1) AS nb_moyen_headers,
-
-    -- 2. CALCUL DU SCORE DE RISQUE MULTICOUCHE
+    T1.src_ip,
+    T1.ja4,
+    T1.src_asn,
+    T1.src_country_code,
     (
-        -- A. Poids Comportemental
-        (countMerge(hits) * 0.1)
-        + (uniqMerge(uniq_paths) * 5.0)
-        + if(countIfMerge(suspicious_methods) > 10, 40.0, 0.0)
-        + if(countIfMerge(suspicious_queries) > 0, 40.0, 0.0)
-
-        -- B. Poids de la Pauvreté Applicative (Moins de 5 headers = Suspect)
-        + if(avgMerge(avg_headers_count) < 5.0, 30.0, 0.0)
-        + if(countIfMerge(missing_human_headers) > 0, 40.0, 0.0)
-
-        -- C. Poids de l'Incohérence (Le Spoofing absolu)
-        + if(countIfMerge(spoofing_ua_tls) > 0, 50.0, 0.0)
-        + if(countIfMerge(spoofing_ua_alpn) > 0, 40.0, 0.0)
-        + if(countIfMerge(spoofing_os_ttl) > 0, 50.0, 0.0)
-
-        -- D. Poids du Réseau et de l'Automatisation
-        -- Un humain a un réseau bruité. Un bot en datacenter a une variance proche de zéro.
-        + if(varPopMerge(var_syn_to_clienthello_ms) < 2.0 AND countMerge(hits) > 5, 30.0, 0.0)
-
-    ) AS final_threat_score
-
-FROM mabase_prod.agg_traffic_1m
-WHERE minute >= now() - INTERVAL 5 MINUTE
-GROUP BY src_ip, ja4
-
--- On isole le trafic très dangereux
-HAVING final_threat_score > 80
-ORDER BY final_threat_score DESC;
+        -- spoofing_* are written with countIfState, so they are merged with countIfMerge.
+        if(countIfMerge(T1.spoofing_ua_tls) > 0, 40, 0)
+        + if(countIfMerge(T1.spoofing_os_ttl) > 0, 40, 0)
+        -- Near-constant handshake timing hints at automation. A minimum hit count
+        -- is required because the variance of a single sample is always 0.
+        + if(varPopMerge(T1.var_syn_to_clienthello_ms) < 1.0 AND countMerge(T1.hits) > 5, 20, 0)
+        -- Fingerprint first seen less than 2 hours ago. Rows without a novelty
+        -- match carry the DateTime default (epoch) and are left out of the min.
+        + if(dateDiff('hour', minIf(N.first_seen, N.first_seen > toDateTime(0)), now()) < 2, 30, 0)
+        -- Volume above 3x the JA4 baseline. An unmatched LEFT JOIN yields 0, not
+        -- NULL, under the default join_use_nulls = 0, so coalesce() would never
+        -- fall back to 1000; test for a positive baseline instead.
+        + if(countMerge(T1.hits) > if(B.p99_hits_per_hour > 0, B.p99_hits_per_hour * 3, 1000), 50, 0)
+    ) AS final_score,
+    countMerge(T1.hits) AS current_hits,
+    B.p99_hits_per_hour AS historical_baseline
+FROM mabase_prod.agg_traffic_1m AS T1
+-- agg_novelty holds several partial states per (host, ja4): one per http_fp and
+-- per unmerged part. Collapse them first so the join cannot multiply T1 rows
+-- and inflate the merged hit counts.
+LEFT JOIN (
+    SELECT
+        host,
+        ja4,
+        minMerge(first_seen) AS first_seen
+    FROM mabase_prod.agg_novelty
+    GROUP BY host, ja4
+) AS N ON T1.ja4 = N.ja4 AND T1.host = N.host
+LEFT JOIN mabase_prod.tbl_baseline_ja4_7d AS B ON T1.ja4 = B.ja4
+WHERE T1.minute >= now() - INTERVAL 5 MINUTE
+GROUP BY T1.src_ip, T1.ja4, T1.src_asn, T1.src_country_code, B.p99_hits_per_hour;