-- =============================================================================
-- 05_aggregation_tables.sql — Behavioral aggregation tables + MVs
-- Source: bot_detector/deploy_views.sql sections 2-5
--
-- Contents (in execution order):
--   1. Four file-backed dictionaries in ja4_processing (bot IP prefixes,
--      bot JA4, ASN reputation, browser JA4).
--   2. ja4_processing.agg_host_ip_ja4_1h      + mv_agg_host_ip_ja4_1h
--   3. ja4_processing.agg_header_fingerprint_1h + mv_agg_header_fingerprint_1h
--
-- Both materialized views read from ja4_logs.http_logs and bucket rows with
-- toStartOfHour(time). Tables are created with IF NOT EXISTS; dictionaries and
-- views are dropped and recreated on every run.
-- =============================================================================

-- -----------------------------------------------------------------------------
-- Bot reputation dictionaries (in-RAM for fast lookup)
-- CSV files must be placed at: /var/lib/clickhouse/user_files/
-- Every dictionary is reloaded from its CSV with LIFETIME(MIN 300 MAX 300).
-- -----------------------------------------------------------------------------

-- IP prefix -> bot name (IP_TRIE layout, keyed by the `prefix` string).
DROP DICTIONARY IF EXISTS ja4_processing.dict_bot_ip;
CREATE DICTIONARY ja4_processing.dict_bot_ip
(
    prefix   String,
    bot_name String
)
PRIMARY KEY prefix
SOURCE(FILE(path '/var/lib/clickhouse/user_files/bot_ip.csv' format 'CSV'))
LAYOUT(IP_TRIE())
LIFETIME(MIN 300 MAX 300);

-- JA4 fingerprint -> bot name (String key, hence COMPLEX_KEY_HASHED).
DROP DICTIONARY IF EXISTS ja4_processing.dict_bot_ja4;
CREATE DICTIONARY ja4_processing.dict_bot_ja4
(
    ja4      String,
    bot_name String
)
PRIMARY KEY ja4
SOURCE(FILE(path '/var/lib/clickhouse/user_files/bot_ja4.csv' format 'CSV'))
LAYOUT(COMPLEX_KEY_HASHED())
LIFETIME(MIN 300 MAX 300);

-- ASN -> reputation label (UInt64 key, HASHED layout).
DROP DICTIONARY IF EXISTS ja4_processing.dict_asn_reputation;
CREATE DICTIONARY ja4_processing.dict_asn_reputation
(
    src_asn UInt64,
    label   String
)
PRIMARY KEY src_asn
SOURCE(FILE(path '/var/lib/clickhouse/user_files/asn_reputation.csv' format 'CSV'))
LAYOUT(HASHED())
LIFETIME(MIN 300 MAX 300);

-- JA4 fingerprint -> browser family / TLS library / context.
DROP DICTIONARY IF EXISTS ja4_processing.dict_browser_ja4;
CREATE DICTIONARY ja4_processing.dict_browser_ja4
(
    ja4            String,
    browser_family String,
    tls_library    String,
    context        String
)
PRIMARY KEY ja4
SOURCE(FILE(path '/var/lib/clickhouse/user_files/browser_ja4.csv' format 'CSV'))
LAYOUT(COMPLEX_KEY_HASHED())
LIFETIME(MIN 300 MAX 300);

-- -----------------------------------------------------------------------------
-- agg_host_ip_ja4_1h — behavioral aggregation (L4/L5/L7)
--
-- One logical row per (window_start, src_ip, ja4, host). SimpleAggregateFunction
-- columns can be read directly; AggregateFunction columns hold states and must
-- be read with the matching -Merge combinator (uniqMerge, varPopMerge, ...).
--
-- NOTE(review): src_asn is a plain column that the MV groups on, but it is not
-- part of the sorting key. Rows that differ only by src_asn are presumably
-- collapsed on merge with one of the values kept — confirm this is acceptable.
-- -----------------------------------------------------------------------------
CREATE TABLE IF NOT EXISTS ja4_processing.agg_host_ip_ja4_1h
(
    window_start            DateTime,
    src_ip                  IPv6,
    ja4                     String,
    host                    String,
    src_asn                 UInt32,
    src_country_code        SimpleAggregateFunction(any, String),
    src_as_name             SimpleAggregateFunction(any, String),
    src_org                 SimpleAggregateFunction(any, String),
    src_domain              SimpleAggregateFunction(any, String),
    first_seen              SimpleAggregateFunction(min, DateTime),
    last_seen               SimpleAggregateFunction(max, DateTime),
    hits                    SimpleAggregateFunction(sum, UInt64),
    count_post              SimpleAggregateFunction(sum, UInt64),
    uniq_paths              AggregateFunction(uniq, String),
    uniq_query_params       AggregateFunction(uniq, String),
    tcp_fp_raw              SimpleAggregateFunction(any, String),
    tcp_jitter_variance     AggregateFunction(varPop, Float64),
    tcp_win_raw             SimpleAggregateFunction(any, UInt32),
    tcp_scale_raw           SimpleAggregateFunction(any, UInt32),
    tcp_mss_raw             SimpleAggregateFunction(any, UInt32),
    tcp_ttl_raw             SimpleAggregateFunction(any, UInt32),
    http_ver_raw            SimpleAggregateFunction(any, String),
    tls_alpn_raw            SimpleAggregateFunction(any, String),
    tls_sni_raw             SimpleAggregateFunction(any, String),
    first_ua                SimpleAggregateFunction(any, String),
    correlated_raw          SimpleAggregateFunction(max, UInt8),
    unique_src_ports        AggregateFunction(uniq, UInt16),
    unique_conn_id          AggregateFunction(uniq, String),
    max_keepalives          SimpleAggregateFunction(max, UInt32),
    orphan_count            SimpleAggregateFunction(sum, UInt64),
    ip_id_zero_count        SimpleAggregateFunction(sum, UInt64),
    total_ip_length_var     AggregateFunction(varPop, Float64),
    mss_1460_count          SimpleAggregateFunction(sum, UInt64),
    count_assets            SimpleAggregateFunction(sum, UInt64),
    count_no_referer        SimpleAggregateFunction(sum, UInt64),
    uniq_ua                 AggregateFunction(uniq, String),
    max_requests_per_sec    SimpleAggregateFunction(max, UInt32),
    url_depth_variance      AggregateFunction(varPop, Float64),
    count_anomalous_payload SimpleAggregateFunction(sum, UInt64),

    -- B features
    uniq_ja3                AggregateFunction(uniq, String),
    avg_syn_ms              AggregateFunction(avg, Float64),
    tls12_count             SimpleAggregateFunction(sum, UInt64),
    count_head              SimpleAggregateFunction(sum, UInt64),
    count_no_sec_fetch      SimpleAggregateFunction(sum, UInt64),
    count_generic_accept    SimpleAggregateFunction(sum, UInt64),
    count_http10            SimpleAggregateFunction(sum, UInt64),
    ip_df_var               AggregateFunction(varPop, Float64),

    -- TTL features (L4 fingerprint / OS)
    avg_ttl                 AggregateFunction(avgIf, Float64, UInt8),
    ttl_var                 AggregateFunction(varPopIf, Float64, UInt8),
    count_no_wscale         SimpleAggregateFunction(sum, UInt64),
    count_correlated        SimpleAggregateFunction(sum, UInt64),

    -- HTTP features
    count_no_accept_enc     SimpleAggregateFunction(sum, UInt64),
    count_http_scheme       SimpleAggregateFunction(sum, UInt64),

    -- P1: new detection features
    count_xff               SimpleAggregateFunction(sum, UInt64),
    count_unusual_ct        SimpleAggregateFunction(sum, UInt64),
    count_non_std_port      SimpleAggregateFunction(sum, UInt64),
    count_login_post        SimpleAggregateFunction(sum, UInt64),

    -- Projection for per-IP investigation queries:
    -- the table ORDER BY (window_start, src_ip, ...) suits heatmap-style reads
    -- but is inefficient for WHERE src_ip = X because src_ip is not the leading
    -- key column. This projection stores the same rows sorted by (src_ip, ...)
    -- so that filters on src_ip can be served from it.
    PROJECTION proj_by_ip
    (
        SELECT *
        ORDER BY (src_ip, window_start, ja4, host)
    )
)
ENGINE = AggregatingMergeTree()
ORDER BY (window_start, src_ip, ja4, host)
-- 'rebuild' keeps proj_by_ip in merged parts. The previous value 'drop' removed
-- the projection on every merge, so only freshly inserted parts could use it.
-- Rebuilding makes merges heavier. Because of IF NOT EXISTS, an already
-- existing table keeps its old setting until it is altered or recreated.
SETTINGS deduplicate_merge_projection_mode = 'rebuild';

-- -----------------------------------------------------------------------------
-- mv_agg_host_ip_ja4_1h — feeds agg_host_ip_ja4_1h from http_logs
--
-- Groups each inserted block by (hour, src_ip, ja4, host, src_asn). Counters
-- are emitted as sum(IF(cond, 1, 0)); distinct counts, variances and averages
-- are emitted as -State values matching the AggregateFunction columns.
-- -----------------------------------------------------------------------------
DROP VIEW IF EXISTS ja4_processing.mv_agg_host_ip_ja4_1h;
CREATE MATERIALIZED VIEW ja4_processing.mv_agg_host_ip_ja4_1h
TO ja4_processing.agg_host_ip_ja4_1h
AS
SELECT
    -- Grouping keys
    toStartOfHour(src.time) AS window_start,
    toIPv6(src.src_ip) AS src_ip,
    src.ja4,
    src.host,
    src.src_asn,

    -- Source enrichment (any value within the group)
    any(src.src_country_code) AS src_country_code,
    any(src.src_as_name) AS src_as_name,
    any(src.src_org) AS src_org,
    any(src.src_domain) AS src_domain,

    -- Volume and time span
    min(src.time) AS first_seen,
    max(src.time) AS last_seen,
    count() AS hits,
    sum(IF(src.method = 'POST', 1, 0)) AS count_post,
    uniqState(src.path) AS uniq_paths,
    uniqState(src.query) AS uniq_query_params,

    -- TCP/TLS fingerprint material: hash of window size, MSS, window scale and options
    any(toString(cityHash64(concat(toString(src.tcp_meta_window_size), toString(src.tcp_meta_mss), toString(src.tcp_meta_window_scale), src.tcp_meta_options)))) AS tcp_fp_raw,
    varPopState(toFloat64(src.syn_to_clienthello_ms)) AS tcp_jitter_variance,
    any(src.tcp_meta_window_size) AS tcp_win_raw,
    any(src.tcp_meta_window_scale) AS tcp_scale_raw,
    any(src.tcp_meta_mss) AS tcp_mss_raw,
    any(src.ip_meta_ttl) AS tcp_ttl_raw,
    any(src.http_version) AS http_ver_raw,
    any(src.tls_alpn) AS tls_alpn_raw,
    any(src.tls_sni) AS tls_sni_raw,
    any(src.header_user_agent) AS first_ua,
    max(toUInt8(src.correlated)) AS correlated_raw,

    -- Connection behavior
    uniqState(toUInt16(src.src_port)) AS unique_src_ports,
    uniqState(src.conn_id) AS unique_conn_id,
    max(toUInt32(src.keepalives)) AS max_keepalives,
    sum(IF(src.orphan_side = 'A' OR src.correlated = 0, 1, 0)) AS orphan_count,
    sum(IF(src.ip_meta_id == 0, 1, 0)) AS ip_id_zero_count,
    varPopState(toFloat64(src.ip_meta_total_length)) AS total_ip_length_var,
    sum(IF(src.tcp_meta_mss == 1460, 1, 0)) AS mss_1460_count,

    -- Requests whose path ends in a static-asset extension. The backslash is
    -- doubled so the regex engine receives `\.` (a literal dot) regardless of
    -- how the SQL string parser treats a lone `\.` escape.
    sum(IF(match(src.path, '(?i)\\.(png|jpg|jpeg|gif|css|js|ico|woff2|svg|eot)$'), 1, 0)) AS count_assets,
    sum(IF(position(src.client_headers, 'Referer') = 0, 1, 0)) AS count_no_referer,
    uniqState(src.header_user_agent) AS uniq_ua,

    -- TODO(P0): compute via a per-second subquery (not possible in a single GROUP BY).
    -- Until then this column is always 0.
    0 AS max_requests_per_sec,

    -- length(path with every '/' doubled) - length(path) = number of '/' in path
    varPopState(toFloat64(length(replaceAll(src.path, '/', '//')) - length(src.path))) AS url_depth_variance,
    sum(IF(src.ip_meta_total_length < 60 OR src.ip_meta_total_length > 1500, 1, 0)) AS count_anomalous_payload,

    -- B features
    uniqState(src.ja3) AS uniq_ja3,
    avgState(toFloat64(src.syn_to_clienthello_ms)) AS avg_syn_ms,
    sum(IF(src.tls_version = '1.2', 1, 0)) AS tls12_count,
    sum(IF(src.method = 'HEAD', 1, 0)) AS count_head,
    sum(IF(length(src.header_sec_fetch_site) = 0, 1, 0)) AS count_no_sec_fetch,
    sum(IF(length(src.header_accept) < 5, 1, 0)) AS count_generic_accept,
    sum(IF(src.http_version = 'HTTP/1.0', 1, 0)) AS count_http10,
    varPopState(toFloat64(src.ip_meta_df)) AS ip_df_var,

    -- TTL features: rows with ip_meta_ttl = 0 are excluded from both states
    avgIfState(toFloat64(src.ip_meta_ttl), src.ip_meta_ttl > 0) AS avg_ttl,
    varPopIfState(toFloat64(src.ip_meta_ttl), src.ip_meta_ttl > 0) AS ttl_var,
    sum(IF(src.tcp_meta_window_scale = 0 AND src.correlated = 1, 1, 0)) AS count_no_wscale,
    sum(toUInt64(src.correlated)) AS count_correlated,

    -- HTTP features
    sum(IF(length(src.header_accept_encoding) = 0, 1, 0)) AS count_no_accept_enc,
    sum(IF(src.scheme = 'http', 1, 0)) AS count_http_scheme,

    -- P1: new features
    sum(IF(length(src.header_x_forwarded_for) > 0, 1, 0)) AS count_xff,
    sum(IF(src.method = 'POST' AND length(src.header_content_type) > 0 AND NOT match(src.header_content_type, '(?i)(form-urlencoded|multipart|json|xml|text/plain|grpc|protobuf)'), 1, 0)) AS count_unusual_ct,
    sum(IF(src.dst_port NOT IN (80, 443, 8080, 8443), 1, 0)) AS count_non_std_port,
    sum(IF(src.method = 'POST' AND match(src.path, '(?i)(login|signin|auth|token|session|wp-login|connect|oauth)'), 1, 0)) AS count_login_post
FROM ja4_logs.http_logs AS src
GROUP BY
    window_start,
    src_ip,
    ja4,
    host,
    src_asn;

-- -----------------------------------------------------------------------------
-- agg_header_fingerprint_1h — header fingerprint aggregation (L7)
--
-- One logical row per (window_start, src_ip); every column is a
-- SimpleAggregateFunction, so values can be read without -Merge combinators.
-- -----------------------------------------------------------------------------
CREATE TABLE IF NOT EXISTS ja4_processing.agg_header_fingerprint_1h
(
    window_start           DateTime,
    src_ip                 IPv6,
    header_order_hash      SimpleAggregateFunction(any, String),
    header_count           SimpleAggregateFunction(max, UInt16),
    has_accept_language    SimpleAggregateFunction(max, UInt8),
    has_cookie             SimpleAggregateFunction(max, UInt8),
    has_referer            SimpleAggregateFunction(max, UInt8),
    modern_browser_score   SimpleAggregateFunction(max, UInt8),
    has_sec_ch_ua          SimpleAggregateFunction(max, UInt8),
    ua_ch_mismatch         SimpleAggregateFunction(max, UInt8),
    sec_ch_mobile_mismatch SimpleAggregateFunction(max, UInt8),
    sec_fetch_mode         SimpleAggregateFunction(any, String),
    sec_fetch_dest         SimpleAggregateFunction(any, String)
)
ENGINE = AggregatingMergeTree()
ORDER BY (window_start, src_ip);

-- -----------------------------------------------------------------------------
-- mv_agg_header_fingerprint_1h — feeds agg_header_fingerprint_1h from http_logs
-- -----------------------------------------------------------------------------
DROP VIEW IF EXISTS ja4_processing.mv_agg_header_fingerprint_1h;
CREATE MATERIALIZED VIEW ja4_processing.mv_agg_header_fingerprint_1h
TO ja4_processing.agg_header_fingerprint_1h
AS
SELECT
    toStartOfHour(src.time) AS window_start,
    toIPv6(src.src_ip) AS src_ip,
    any(toString(cityHash64(src.client_headers))) AS header_order_hash,

    -- Number of commas in client_headers, plus one.
    -- NOTE(review): this yields 1 for an empty client_headers — confirm intended.
    max(toUInt16(length(src.client_headers) - length(replaceAll(src.client_headers, ',', '')) + 1)) AS header_count,

    -- Presence flags: substring search in client_headers
    max(toUInt8(if(position(src.client_headers, 'Accept-Language') > 0, 1, 0))) AS has_accept_language,
    max(toUInt8(if(position(src.client_headers, 'Cookie') > 0, 1, 0))) AS has_cookie,
    max(toUInt8(if(position(src.client_headers, 'Referer') > 0, 1, 0))) AS has_referer,

    -- 100 when Sec-CH-UA is present, else 70 when Sec-Fetch-Site is present, else 0
    max(toUInt8(if(length(src.header_sec_ch_ua) > 0, 100, if(length(src.header_sec_fetch_site) > 0, 70, 0)))) AS modern_browser_score,
    max(toUInt8(if(length(src.header_sec_ch_ua) > 0, 1, 0))) AS has_sec_ch_ua,

    -- User-Agent says Windows / iPhone but the platform client hint does not
    -- say Windows / iOS.
    -- NOTE(review): this also fires when header_sec_ch_ua_platform is empty,
    -- i.e. for clients that send no client hints at all — confirm intended.
    max(toUInt8(if(
        (position(src.header_user_agent, 'Windows') > 0 AND position(src.header_sec_ch_ua_platform, 'Windows') == 0)
        OR (position(src.header_user_agent, 'iPhone') > 0 AND position(src.header_sec_ch_ua_platform, 'iOS') == 0),
        1, 0))) AS ua_ch_mismatch,

    -- Mobile hint contradicts platform hint: "?1" with Windows, or "?0" with Android
    max(toUInt8(if(
        (src.header_sec_ch_ua_mobile = '?1' AND position(src.header_sec_ch_ua_platform, 'Windows') > 0)
        OR (src.header_sec_ch_ua_mobile = '?0' AND position(src.header_sec_ch_ua_platform, 'Android') > 0),
        1, 0))) AS sec_ch_mobile_mismatch,

    any(src.header_sec_fetch_mode) AS sec_fetch_mode,
    any(src.header_sec_fetch_dest) AS sec_fetch_dest
FROM ja4_logs.http_logs AS src
GROUP BY
    window_start,
    src.src_ip;