-- ============================================================================= -- 06_ml_tables.sql — ML detection results tables -- Source: bot_detector/deploy_views.sql sections 6-6b + deploy_schema.sql items 11-12 -- -- Optimisations de performance : -- - ml_detected_anomalies : PARTITION BY date → élagage de partitions sur -- les requêtes temporelles (WHERE detected_at >= now() - INTERVAL N DAY) -- - INDEX idx_detected_at (minmax) → skip des granules hors plage temporelle -- - INDEX idx_threat_level (set) → skip pour les filtres par niveau de menace -- - ml_all_scores : PARTITION BY date + INDEX identiques -- ============================================================================= -- ----------------------------------------------------------------------------- -- ml_detected_anomalies — anomaly detections above threat threshold -- -- Déduplication : ReplacingMergeTree(detected_at) sur ORDER BY (src_ip) -- → conserve la détection la plus récente par IP. -- PARTITION BY : élagage journalier (les requêtes 24h/7j ignorent les vieilles -- partitions sans lire aucune donnée). -- INDEX idx_detected_at : skip des granules 8192 lignes hors de la plage -- temporelle demandée (minmax = min/max par granule). -- INDEX idx_threat_level : skip pour countIf(threat_level = 'CRITICAL') etc. -- ----------------------------------------------------------------------------- CREATE TABLE IF NOT EXISTS ja4_processing.ml_detected_anomalies ( detected_at DateTime, src_ip IPv6, ja4 String, host String, bot_name String, anomaly_score Float32, threat_level String, model_name String, recurrence UInt32, asn_number String, asn_org String, asn_detail String, asn_domain String, country_code String, asn_label String, hits UInt64, hit_velocity Float32, fuzzing_index Float32, post_ratio Float32, port_exhaustion_ratio Float32, max_keepalives UInt32, orphan_ratio Float32, tcp_jitter_variance Float32, tcp_shared_count UInt32, true_window_size UInt64, window_mss_ratio Float32, alpn_http_mismatch UInt8, is_alpn_missing UInt8, sni_host_mismatch UInt8, header_count UInt16, has_accept_language UInt8, has_cookie UInt8, has_referer UInt8, modern_browser_score UInt8, is_headless UInt8, ua_ch_mismatch UInt8, header_order_shared_count UInt32, ip_id_zero_ratio Float32, request_size_variance Float32, multiplexing_efficiency Float32, mss_mobile_mismatch UInt8, correlated UInt8, reason String, asset_ratio Float32, direct_access_ratio Float32, is_ua_rotating UInt8, distinct_ja4_count UInt32, src_port_density Float32, ja4_asn_concentration Float32, ja4_country_concentration Float32, is_rare_ja4 UInt8, header_order_confidence Float32, distinct_header_orders UInt32, temporal_entropy Float32, path_diversity_ratio Float32, url_depth_variance Float32, anomalous_payload_ratio Float32, -- v11 additions campaign_id Int32 DEFAULT -1, raw_anomaly_score Float32 DEFAULT 0, -- Anubis enrichment (deploy_schema.sql item 11) anubis_bot_name LowCardinality(String) DEFAULT '', anubis_bot_action LowCardinality(String) DEFAULT '', anubis_bot_category LowCardinality(String) DEFAULT '', -- Index de saut : skip des granules hors plage temporelle INDEX idx_detected_at detected_at TYPE minmax GRANULARITY 4, -- Index de saut : skip pour les filtres sur threat_level (CRITICAL/HIGH/...) INDEX idx_threat_level threat_level TYPE set(8) GRANULARITY 4, -- Index de saut : skip pour les filtres bot_name != '' INDEX idx_bot_name bot_name TYPE bloom_filter() GRANULARITY 4 ) ENGINE = ReplacingMergeTree(detected_at) PARTITION BY toYYYYMMDD(detected_at) ORDER BY (src_ip) TTL detected_at + INTERVAL 30 DAY SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1; -- supprime la partition entière à expiration (plus efficace) -- ----------------------------------------------------------------------------- -- ml_all_scores — all classifications (no threshold, for observability) -- -- PARTITION BY date : TTL de 3 jours → les partitions expirées sont supprimées -- entièrement sans avoir à lire chaque granule (ttl_only_drop_parts). -- INDEX idx_detected_at : idem ml_detected_anomalies. -- ----------------------------------------------------------------------------- CREATE TABLE IF NOT EXISTS ja4_processing.ml_all_scores ( detected_at DateTime, window_start DateTime, src_ip IPv6, ja4 String, host String, bot_name String, anomaly_score Float32, raw_anomaly_score Float32, threat_level String, model_name String, correlated UInt8, asn_number String, asn_org String, country_code String, asn_label String, hits UInt64, hit_velocity Float32, fuzzing_index Float32, post_ratio Float32, campaign_id Int32, -- Anubis enrichment (deploy_schema.sql item 12) anubis_bot_name LowCardinality(String) DEFAULT '', anubis_bot_action LowCardinality(String) DEFAULT '', anubis_bot_category LowCardinality(String) DEFAULT '', INDEX idx_detected_at detected_at TYPE minmax GRANULARITY 4, INDEX idx_threat_level threat_level TYPE set(8) GRANULARITY 4 ) ENGINE = ReplacingMergeTree(detected_at) PARTITION BY toYYYYMMDD(window_start) ORDER BY (window_start, src_ip, ja4, host, model_name) TTL window_start + INTERVAL 3 DAY SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1; -- ----------------------------------------------------------------------------- -- view_ip_recurrence — récurrence des détections par IP -- -- Agrège ml_detected_anomalies (ORDER BY src_ip) pour obtenir le profil -- de récurrence de chaque IP détectée. -- -- Optimisation : avec PARTITION BY toYYYYMMDD(detected_at) (ajouté en P1), -- chaque GROUP BY src_ip bénéficie de l'élagage de partitions si la vue est -- filtrée par date en amont (les routes filtrent généralement sur 30 jours max). -- Le ORDER BY (src_ip) garantit que le GROUP BY src_ip lit des données -- contiguës en mémoire (co-localisation des lignes d'une même IP). -- ----------------------------------------------------------------------------- CREATE OR REPLACE VIEW ja4_processing.view_ip_recurrence AS SELECT src_ip, count() AS recurrence, min(detected_at) AS first_seen, max(detected_at) AS last_seen, min(anomaly_score) AS worst_score, argMin(threat_level, anomaly_score) AS worst_threat_level FROM ja4_processing.ml_detected_anomalies -- Filtre temporel aligné sur le TTL de la table (30 jours) -- Évite de scanner les partitions expirées non encore supprimées par le TTL WHERE detected_at >= now() - INTERVAL 30 DAY GROUP BY src_ip;