diff --git a/sql/init.sql b/sql/init.sql
index 60e1bf1..524226d 100644
--- a/sql/init.sql
+++ b/sql/init.sql
@@ -30,6 +30,7 @@ SETTINGS index_granularity = 8192;
 -- -----------------------------------------------------------------------------
 -- Table parsée : alimentée automatiquement par la vue matérialisée
 -- -----------------------------------------------------------------------------
+
 CREATE TABLE IF NOT EXISTS mabase_prod.http_logs
 (
     -- Temporel
@@ -42,6 +43,13 @@ CREATE TABLE IF NOT EXISTS mabase_prod.http_logs
     `dst_ip` IPv4,
     `dst_port` UInt16,
 
+    -- IPLocate enrichment (filled by mv_http_logs via dict_iplocate_asn)
+    `src_asn` UInt32,
+    `src_country_code` LowCardinality(String),
+    `src_as_name` LowCardinality(String),
+    `src_org` LowCardinality(String),
+    `src_domain` LowCardinality(String),
+
     -- HTTP
     `method` LowCardinality(String),
     `scheme` LowCardinality(String),
@@ -101,62 +109,114 @@ PARTITION BY log_date
 ORDER BY (time, src_ip, dst_ip, ja4)
 SETTINGS index_granularity = 8192;
 
+-- Schema migration: CREATE TABLE IF NOT EXISTS leaves an already-deployed
+-- http_logs untouched, so the enrichment columns are added explicitly here.
+-- Each clause is a no-op when the column already exists (fresh installs).
+ALTER TABLE mabase_prod.http_logs
+    ADD COLUMN IF NOT EXISTS `src_asn` UInt32 AFTER `dst_port`,
+    ADD COLUMN IF NOT EXISTS `src_country_code` LowCardinality(String) AFTER `src_asn`,
+    ADD COLUMN IF NOT EXISTS `src_as_name` LowCardinality(String) AFTER `src_country_code`,
+    ADD COLUMN IF NOT EXISTS `src_org` LowCardinality(String) AFTER `src_as_name`,
+    ADD COLUMN IF NOT EXISTS `src_domain` LowCardinality(String) AFTER `src_org`;
+
 -- -----------------------------------------------------------------------------
 -- Vue matérialisée : parse le JSON de http_logs_raw vers http_logs
 -- -----------------------------------------------------------------------------
-DROP TABLE IF EXISTS mabase_prod.mv_http_logs;
+DROP VIEW IF EXISTS mabase_prod.mv_http_logs;
 
 CREATE MATERIALIZED VIEW IF NOT EXISTS mabase_prod.mv_http_logs
 TO mabase_prod.http_logs
-AS SELECT
+AS
+SELECT
     parseDateTimeBestEffort(coalesce(JSONExtractString(raw_json, 'time'), '1970-01-01T00:00:00Z')) AS time,
-    toDate(time) AS log_date,
-    toIPv4(coalesce(JSONExtractString(raw_json, 'src_ip'), '0.0.0.0')) AS src_ip,
-    toUInt16(coalesce(JSONExtractUInt(raw_json, 'src_port'), 0)) AS src_port,
-    toIPv4(coalesce(JSONExtractString(raw_json, 'dst_ip'), '0.0.0.0')) AS dst_ip,
-    toUInt16(coalesce(JSONExtractUInt(raw_json, 'dst_port'), 0)) AS dst_port,
-    coalesce(JSONExtractString(raw_json, 'method'), '') AS method,
-    coalesce(JSONExtractString(raw_json, 'scheme'), '') AS scheme,
-    coalesce(JSONExtractString(raw_json, 'host'), '') AS host,
-    coalesce(JSONExtractString(raw_json, 'path'), '') AS path,
-    coalesce(JSONExtractString(raw_json, 'query'), '') AS query,
-    coalesce(JSONExtractString(raw_json, 'http_version'), '') AS http_version,
-    coalesce(JSONExtractString(raw_json, 'orphan_side'), '') AS orphan_side,
-    toUInt8(coalesce(JSONExtractBool(raw_json, 'correlated'), 0)) AS correlated,
-    toUInt16(coalesce(JSONExtractUInt(raw_json, 'keepalives'), 0)) AS keepalives,
-    coalesce(JSONExtractUInt(raw_json, 'a_timestamp'), 0) AS a_timestamp,
-    coalesce(JSONExtractUInt(raw_json, 'b_timestamp'), 0) AS b_timestamp,
-    coalesce(JSONExtractString(raw_json, 'conn_id'), '') AS conn_id,
-    toUInt8(coalesce(JSONExtractBool(raw_json, 'ip_meta_df'), 0)) AS ip_meta_df,
-    toUInt16(coalesce(JSONExtractUInt(raw_json, 'ip_meta_id'), 0)) AS ip_meta_id,
-    toUInt16(coalesce(JSONExtractUInt(raw_json, 'ip_meta_total_length'), 0)) AS ip_meta_total_length,
-    toUInt8(coalesce(JSONExtractUInt(raw_json, 'ip_meta_ttl'), 0)) AS ip_meta_ttl,
-    coalesce(JSONExtractString(raw_json, 'tcp_meta_options'), '') AS tcp_meta_options,
-    toUInt32(coalesce(JSONExtractUInt(raw_json, 'tcp_meta_window_size'), 0)) AS tcp_meta_window_size,
-    toUInt16(coalesce(JSONExtractUInt(raw_json, 'tcp_meta_mss'), 0)) AS tcp_meta_mss,
-    toUInt8(coalesce(JSONExtractUInt(raw_json, 'tcp_meta_window_scale'), 0)) AS tcp_meta_window_scale,
-    toInt32(coalesce(JSONExtractInt(raw_json, 'syn_to_clienthello_ms'), 0)) AS syn_to_clienthello_ms,
-    coalesce(JSONExtractString(raw_json, 'tls_version'), '') AS tls_version,
-    coalesce(JSONExtractString(raw_json, 'tls_sni'), '') AS tls_sni,
-    coalesce(JSONExtractString(raw_json, 'tls_alpn'), '') AS tls_alpn,
-    coalesce(JSONExtractString(raw_json, 'ja3'), '') AS ja3,
-    coalesce(JSONExtractString(raw_json, 'ja3_hash'), '') AS ja3_hash,
-    coalesce(JSONExtractString(raw_json, 'ja4'), '') AS ja4,
-    coalesce(JSONExtractString(raw_json, 'client_headers'), '') AS client_headers,
-    coalesce(JSONExtractString(raw_json, 'header_User-Agent'), '') AS header_user_agent,
-    coalesce(JSONExtractString(raw_json, 'header_Accept'), '') AS header_accept,
-    coalesce(JSONExtractString(raw_json, 'header_Accept-Encoding'), '') AS header_accept_encoding,
-    coalesce(JSONExtractString(raw_json, 'header_Accept-Language'), '') AS header_accept_language,
-    coalesce(JSONExtractString(raw_json, 'header_Content-Type'), '') AS header_content_type,
-    coalesce(JSONExtractString(raw_json, 'header_X-Request-Id'), '') AS header_x_request_id,
-    coalesce(JSONExtractString(raw_json, 'header_X-Trace-Id'), '') AS header_x_trace_id,
-    coalesce(JSONExtractString(raw_json, 'header_X-Forwarded-For'), '') AS header_x_forwarded_for,
-    coalesce(JSONExtractString(raw_json, 'header_Sec-CH-UA'), '') AS header_sec_ch_ua,
-    coalesce(JSONExtractString(raw_json, 'header_Sec-CH-UA-Mobile'), '') AS header_sec_ch_ua_mobile,
-    coalesce(JSONExtractString(raw_json, 'header_Sec-CH-UA-Platform'), '') AS header_sec_ch_ua_platform,
-    coalesce(JSONExtractString(raw_json, 'header_Sec-Fetch-Dest'), '') AS header_sec_fetch_dest,
-    coalesce(JSONExtractString(raw_json, 'header_Sec-Fetch-Mode'), '') AS header_sec_fetch_mode,
-    coalesce(JSONExtractString(raw_json, 'header_Sec-Fetch-Site'), '') AS header_sec_fetch_site
+    toDate(time) AS log_date,
+
+    toIPv4(coalesce(JSONExtractString(raw_json, 'src_ip'), '0.0.0.0')) AS src_ip,
+    toUInt16(coalesce(JSONExtractUInt(raw_json, 'src_port'), 0)) AS src_port,
+    toIPv4(coalesce(JSONExtractString(raw_json, 'dst_ip'), '0.0.0.0')) AS dst_ip,
+    toUInt16(coalesce(JSONExtractUInt(raw_json, 'dst_port'), 0)) AS dst_port,
+
+    -- ASN enrichment: src_ip is converted to IPv6 binary form (IPv4ToIPv6) to
+    -- query the dictionary; addresses with no match fall back to 0 / ''.
+    dictGetOrDefault(
+        'mabase_prod.dict_iplocate_asn',
+        'asn',
+        IPv4ToIPv6(IPv4StringToNum(toString(src_ip))),
+        toUInt32(0)
+    ) AS src_asn,
+    dictGetOrDefault(
+        'mabase_prod.dict_iplocate_asn',
+        'country_code',
+        IPv4ToIPv6(IPv4StringToNum(toString(src_ip))),
+        ''
+    ) AS src_country_code,
+    dictGetOrDefault(
+        'mabase_prod.dict_iplocate_asn',
+        'name',
+        IPv4ToIPv6(IPv4StringToNum(toString(src_ip))),
+        ''
+    ) AS src_as_name,
+    dictGetOrDefault(
+        'mabase_prod.dict_iplocate_asn',
+        'org',
+        IPv4ToIPv6(IPv4StringToNum(toString(src_ip))),
+        ''
+    ) AS src_org,
+    dictGetOrDefault(
+        'mabase_prod.dict_iplocate_asn',
+        'domain',
+        IPv4ToIPv6(IPv4StringToNum(toString(src_ip))),
+        ''
+    ) AS src_domain,
+
+    coalesce(JSONExtractString(raw_json, 'method'), '') AS method,
+    coalesce(JSONExtractString(raw_json, 'scheme'), '') AS scheme,
+    coalesce(JSONExtractString(raw_json, 'host'), '') AS host,
+    coalesce(JSONExtractString(raw_json, 'path'), '') AS path,
+    coalesce(JSONExtractString(raw_json, 'query'), '') AS query,
+    coalesce(JSONExtractString(raw_json, 'http_version'), '') AS http_version,
+
+    coalesce(JSONExtractString(raw_json, 'orphan_side'), '') AS orphan_side,
+    toUInt8(coalesce(JSONExtractBool(raw_json, 'correlated'), 0)) AS correlated,
+    toUInt16(coalesce(JSONExtractUInt(raw_json, 'keepalives'), 0)) AS keepalives,
+    coalesce(JSONExtractUInt(raw_json, 'a_timestamp'), 0) AS a_timestamp,
+    coalesce(JSONExtractUInt(raw_json, 'b_timestamp'), 0) AS b_timestamp,
+    coalesce(JSONExtractString(raw_json, 'conn_id'), '') AS conn_id,
+
+    toUInt8(coalesce(JSONExtractBool(raw_json, 'ip_meta_df'), 0)) AS ip_meta_df,
+    toUInt16(coalesce(JSONExtractUInt(raw_json, 'ip_meta_id'), 0)) AS ip_meta_id,
+    toUInt16(coalesce(JSONExtractUInt(raw_json, 'ip_meta_total_length'), 0)) AS ip_meta_total_length,
+    toUInt8(coalesce(JSONExtractUInt(raw_json, 'ip_meta_ttl'), 0)) AS ip_meta_ttl,
+
+    coalesce(JSONExtractString(raw_json, 'tcp_meta_options'), '') AS tcp_meta_options,
+    toUInt32(coalesce(JSONExtractUInt(raw_json, 'tcp_meta_window_size'), 0)) AS tcp_meta_window_size,
+    toUInt16(coalesce(JSONExtractUInt(raw_json, 'tcp_meta_mss'), 0)) AS tcp_meta_mss,
+    toUInt8(coalesce(JSONExtractUInt(raw_json, 'tcp_meta_window_scale'), 0)) AS tcp_meta_window_scale,
+    toInt32(coalesce(JSONExtractInt(raw_json, 'syn_to_clienthello_ms'), 0)) AS syn_to_clienthello_ms,
+
+    coalesce(JSONExtractString(raw_json, 'tls_version'), '') AS tls_version,
+    coalesce(JSONExtractString(raw_json, 'tls_sni'), '') AS tls_sni,
+    coalesce(JSONExtractString(raw_json, 'tls_alpn'), '') AS tls_alpn,
+    coalesce(JSONExtractString(raw_json, 'ja3'), '') AS ja3,
+    coalesce(JSONExtractString(raw_json, 'ja3_hash'), '') AS ja3_hash,
+    coalesce(JSONExtractString(raw_json, 'ja4'), '') AS ja4,
+
+    coalesce(JSONExtractString(raw_json, 'client_headers'), '') AS client_headers,
+    coalesce(JSONExtractString(raw_json, 'header_User-Agent'), '') AS header_user_agent,
+    coalesce(JSONExtractString(raw_json, 'header_Accept'), '') AS header_accept,
+    coalesce(JSONExtractString(raw_json, 'header_Accept-Encoding'), '') AS header_accept_encoding,
+    coalesce(JSONExtractString(raw_json, 'header_Accept-Language'), '') AS header_accept_language,
+    coalesce(JSONExtractString(raw_json, 'header_Content-Type'), '') AS header_content_type,
+    coalesce(JSONExtractString(raw_json, 'header_X-Request-Id'), '') AS header_x_request_id,
+    coalesce(JSONExtractString(raw_json, 'header_X-Trace-Id'), '') AS header_x_trace_id,
+    coalesce(JSONExtractString(raw_json, 'header_X-Forwarded-For'), '') AS header_x_forwarded_for,
+    coalesce(JSONExtractString(raw_json, 'header_Sec-CH-UA'), '') AS header_sec_ch_ua,
+    coalesce(JSONExtractString(raw_json, 'header_Sec-CH-UA-Mobile'), '') AS header_sec_ch_ua_mobile,
+    coalesce(JSONExtractString(raw_json, 'header_Sec-CH-UA-Platform'), '') AS header_sec_ch_ua_platform,
+    coalesce(JSONExtractString(raw_json, 'header_Sec-Fetch-Dest'), '') AS header_sec_fetch_dest,
+    coalesce(JSONExtractString(raw_json, 'header_Sec-Fetch-Mode'), '') AS header_sec_fetch_mode,
+    coalesce(JSONExtractString(raw_json, 'header_Sec-Fetch-Site'), '') AS header_sec_fetch_site
+
 FROM mabase_prod.http_logs_raw;
 
 -- -----------------------------------------------------------------------------
diff --git a/sql/iptoasn.sql b/sql/iptoasn.sql
new file mode 100644
index 0000000..ba00c37
--- /dev/null
+++ b/sql/iptoasn.sql
@@ -0,0 +1,22 @@
+-- IP-to-ASN lookup dictionary loaded from the IPLocate CSV export; queried by
+-- mabase_prod.mv_http_logs through dictGetOrDefault to enrich src_ip.
+--
+-- CREATE OR REPLACE is used instead of DROP + CREATE so the definition is
+-- swapped in one statement and the dictionary is never missing while the
+-- materialized view depends on it.
+--
+-- LIFETIME: ClickHouse re-reads the file at a random time between 3600 s and
+-- 7200 s after the previous load.
+CREATE OR REPLACE DICTIONARY mabase_prod.dict_iplocate_asn
+(
+    network String,
+    asn UInt32,
+    country_code String,
+    name String,
+    org String,
+    domain String
+)
+PRIMARY KEY network
+SOURCE(FILE(path '/var/lib/clickhouse/user_files/iplocate-ip-to-asn.csv' format 'CSVWithNames'))
+LAYOUT(IP_TRIE())
+LIFETIME(MIN 3600 MAX 7200);
diff --git a/sql/mv1.sql b/sql/mv1.sql
new file mode 100644
index 0000000..30140c9
--- /dev/null
+++ b/sql/mv1.sql
@@ -0,0 +1,245 @@
+-- ==============================================================================
+-- 1. 1-MINUTE ROLLUP (real time)
+-- ==============================================================================
+-- Every dimension the materialized view groups by is part of ORDER BY:
+-- AggregatingMergeTree collapses rows sharing a sorting key, and a plain
+-- (non-aggregate) column left out of the key keeps an arbitrary value.
+CREATE TABLE IF NOT EXISTS mabase_prod.agg_traffic_1m (
+    minute DateTime,
+    host LowCardinality(String),
+    src_ip IPv4,
+    ja4 String,
+    ja3_hash String, -- needed by the hourly/daily cascades, which group by it
+    header_user_agent String,
+
+    -- Basic metrics
+    hits AggregateFunction(count, UInt64),
+    uniq_paths AggregateFunction(uniq, String),
+
+    -- Network metrics
+    avg_syn_to_clienthello_ms AggregateFunction(avg, Int32),
+    var_syn_to_clienthello_ms AggregateFunction(varPop, Int32), -- near-zero variance is scored as automation
+
+    -- Header anomalies
+    avg_headers_count AggregateFunction(avg, Float64), -- mean number of client headers
+    spoofing_ua_tls AggregateFunction(countIf, UInt8), -- User-Agent vs TLS fingerprint mismatch
+    spoofing_ua_alpn AggregateFunction(countIf, UInt8), -- User-Agent vs ALPN mismatch
+    spoofing_os_ttl AggregateFunction(countIf, UInt8), -- Windows User-Agent with TTL <= 64
+    missing_human_headers AggregateFunction(countIf, UInt8),
+
+    -- Application-level anomalies
+    suspicious_methods AggregateFunction(countIf, UInt8), -- PUT / DELETE / OPTIONS / TRACE
+    suspicious_queries AggregateFunction(countIf, UInt8) -- long or heavily percent-encoded query strings
+) ENGINE = AggregatingMergeTree()
+PARTITION BY toYYYYMM(minute)
+ORDER BY (host, ja4, src_ip, minute, ja3_hash, header_user_agent);
+
+CREATE MATERIALIZED VIEW IF NOT EXISTS mabase_prod.mv_traffic_1m
+TO mabase_prod.agg_traffic_1m
+AS SELECT
+    toStartOfMinute(time) AS minute,
+    host,
+    src_ip,
+    ja4,
+    ja3_hash,
+    header_user_agent,
+
+    -- Plain counters
+    countState() AS hits,
+    uniqState(path) AS uniq_paths,
+
+    -- Network: mean and population variance of the SYN -> ClientHello delay
+    avgState(syn_to_clienthello_ms) AS avg_syn_to_clienthello_ms,
+    varPopState(syn_to_clienthello_ms) AS var_syn_to_clienthello_ms,
+
+    -- Header count = number of commas + 1, or 0 for an empty list. The
+    -- toFloat64 cast is required: the arithmetic yields Int64, and an
+    -- avg(Int64) state cannot be written to AggregateFunction(avg, Float64).
+    avgState(toFloat64(if(
+        empty(client_headers),
+        0,
+        length(client_headers) - length(replaceAll(client_headers, ',', '')) + 1
+    ))) AS avg_headers_count,
+
+    -- Spoofing heuristics (simplified): a User-Agent claiming Chrome is
+    -- expected to come with a JA4 starting with 't13d' and with h2 in ALPN.
+    countIfState(
+        header_user_agent ILIKE '%Chrome%' AND ja4 NOT ILIKE 't13d%'
+    ) AS spoofing_ua_tls,
+    countIfState(
+        header_user_agent ILIKE '%Chrome%' AND tls_alpn NOT ILIKE '%h2%'
+    ) AS spoofing_ua_alpn,
+    countIfState(
+        header_user_agent ILIKE '%Windows%' AND ip_meta_ttl <= 64
+    ) AS spoofing_os_ttl,
+    countIfState(
+        header_accept_language = '' OR header_sec_ch_ua = ''
+    ) AS missing_human_headers,
+
+    -- Behavioural anomalies
+    countIfState(method IN ('PUT', 'DELETE', 'OPTIONS', 'TRACE')) AS suspicious_methods,
+    countIfState(length(query) > 200 OR match(query, '(%[0-9A-Fa-f]{2}){5,}')) AS suspicious_queries
+FROM mabase_prod.http_logs
+GROUP BY minute, host, src_ip, ja4, ja3_hash, header_user_agent;
+
+-- ==============================================================================
+-- 2. 1-HOUR ROLLUP (cascading from agg_traffic_1m)
+-- ==============================================================================
+CREATE TABLE IF NOT EXISTS mabase_prod.agg_traffic_1h (
+    hour DateTime,
+    host LowCardinality(String),
+    ja4 String,
+    ja3_hash String,
+    header_user_agent String,
+    hits AggregateFunction(count, UInt64),
+    uniq_paths AggregateFunction(uniq, String),
+    missing_human_headers AggregateFunction(countIf, UInt8),
+    uniq_ips AggregateFunction(uniq, IPv4)
+) ENGINE = AggregatingMergeTree()
+PARTITION BY toYYYYMM(hour)
+ORDER BY (host, ja4, ja3_hash, hour, header_user_agent);
+
+CREATE MATERIALIZED VIEW IF NOT EXISTS mabase_prod.mv_traffic_1h
+TO mabase_prod.agg_traffic_1h
+AS SELECT
+    toStartOfHour(minute) AS hour,
+    host,
+    ja4,
+    ja3_hash,
+    header_user_agent,
+    -- -MergeState folds the minute-level partial states into hourly ones
+    countMergeState(hits) AS hits,
+    uniqMergeState(uniq_paths) AS uniq_paths,
+    countIfMergeState(missing_human_headers) AS missing_human_headers,
+    uniqState(src_ip) AS uniq_ips
+FROM mabase_prod.agg_traffic_1m
+GROUP BY hour, host, ja4, ja3_hash, header_user_agent;
+
+-- ==============================================================================
+-- 3. 1-DAY ROLLUP (cascading from agg_traffic_1h)
+-- ==============================================================================
+CREATE TABLE IF NOT EXISTS mabase_prod.agg_traffic_1d (
+    day Date,
+    host LowCardinality(String),
+    ja4 String,
+    ja3_hash String,
+    header_user_agent String,
+    hits AggregateFunction(count, UInt64),
+    uniq_ips AggregateFunction(uniq, IPv4),
+    uniq_paths AggregateFunction(uniq, String),
+    missing_human_headers AggregateFunction(countIf, UInt8)
+) ENGINE = AggregatingMergeTree()
+PARTITION BY toYYYYMM(day)
+ORDER BY (host, ja4, ja3_hash, day, header_user_agent);
+
+CREATE MATERIALIZED VIEW IF NOT EXISTS mabase_prod.mv_traffic_1d
+TO mabase_prod.agg_traffic_1d
+AS SELECT
+    toDate(hour) AS day,
+    host,
+    ja4,
+    ja3_hash,
+    header_user_agent,
+    countMergeState(hits) AS hits,
+    uniqMergeState(uniq_ips) AS uniq_ips,
+    uniqMergeState(uniq_paths) AS uniq_paths,
+    countIfMergeState(missing_human_headers) AS missing_human_headers
+FROM mabase_prod.agg_traffic_1h
+GROUP BY day, host, ja4, ja3_hash, header_user_agent;
+
+-- ==============================================================================
+-- 4. NOVELTY DETECTION (first seen)
+-- ==============================================================================
+CREATE TABLE IF NOT EXISTS mabase_prod.agg_novelty (
+    host LowCardinality(String),
+    ja4 String,
+    http_fp UInt64,
+    first_seen AggregateFunction(min, DateTime),
+    last_seen AggregateFunction(max, DateTime),
+    total_hits AggregateFunction(count, UInt64)
+) ENGINE = AggregatingMergeTree()
+ORDER BY (host, ja4, http_fp);
+
+CREATE MATERIALIZED VIEW IF NOT EXISTS mabase_prod.mv_novelty
+TO mabase_prod.agg_novelty
+AS SELECT
+    host,
+    ja4,
+    cityHash64(client_headers) AS http_fp,
+    minState(time) AS first_seen,
+    maxState(time) AS last_seen,
+    countState() AS total_hits
+FROM mabase_prod.http_logs
+GROUP BY host, ja4, http_fp;
+
+-- ==============================================================================
+-- 5. LONG-TERM BASELINE (refreshable materialized view)
+-- ==============================================================================
+SET allow_experimental_refreshable_materialized_view = 1;
+
+CREATE TABLE IF NOT EXISTS mabase_prod.tbl_baseline_ja4_7d (
+    ja4 String,
+    p99_hits_per_hour Float64
+) ENGINE = MergeTree()
+ORDER BY ja4;
+
+-- Recomputed once a day: per-JA4 99th percentile of hourly hits over the
+-- trailing 7 days.
+CREATE MATERIALIZED VIEW IF NOT EXISTS mabase_prod.baseline_ja4_7d
+REFRESH EVERY 1 DAY
+TO mabase_prod.tbl_baseline_ja4_7d
+AS SELECT
+    ja4,
+    quantile(0.99)(hourly_hits) AS p99_hits_per_hour
+FROM (
+    SELECT
+        ja4,
+        hour,
+        countMerge(hits) AS hourly_hits
+    FROM mabase_prod.agg_traffic_1h
+    WHERE hour >= now() - INTERVAL 7 DAY
+    GROUP BY ja4, hour
+)
+GROUP BY ja4;
+
+-- ==============================================================================
+-- Ad-hoc query: live threat score per (src_ip, ja4) over the last 5 minutes
+-- ==============================================================================
+SELECT
+    src_ip,
+    ja4,
+
+    -- 1. Merged aggregate values
+    countMerge(hits) AS requetes_live,
+    -- Backticks are required: unquoted identifiers must be ASCII.
+    uniqMerge(uniq_paths) AS `urls_scannées`,
+    round(varPopMerge(var_syn_to_clienthello_ms), 2) AS variance_ms,
+    round(avgMerge(avg_headers_count), 1) AS nb_moyen_headers,
+
+    -- 2. Multi-layer risk score
+    (
+        -- A. Behavioural weight
+        (countMerge(hits) * 0.1)
+        + (uniqMerge(uniq_paths) * 5.0)
+        + if(countIfMerge(suspicious_methods) > 10, 40.0, 0.0)
+        + if(countIfMerge(suspicious_queries) > 0, 40.0, 0.0)
+
+        -- B. Sparse-header weight (fewer than 5 headers on average)
+        + if(avgMerge(avg_headers_count) < 5.0, 30.0, 0.0)
+        + if(countIfMerge(missing_human_headers) > 0, 40.0, 0.0)
+
+        -- C. Inconsistency (spoofing) weight
+        + if(countIfMerge(spoofing_ua_tls) > 0, 50.0, 0.0)
+        + if(countIfMerge(spoofing_ua_alpn) > 0, 40.0, 0.0)
+        + if(countIfMerge(spoofing_os_ttl) > 0, 50.0, 0.0)
+
+        -- D. Network / automation weight: near-zero handshake-delay variance
+        -- across more than 5 hits
+        + if(varPopMerge(var_syn_to_clienthello_ms) < 2.0 AND countMerge(hits) > 5, 30.0, 0.0)
+    ) AS final_threat_score
+FROM mabase_prod.agg_traffic_1m
+WHERE minute >= now() - INTERVAL 5 MINUTE
+GROUP BY src_ip, ja4
+-- Keep only the highest-risk traffic
+HAVING final_threat_score > 80
+ORDER BY final_threat_score DESC;