-- =============================================================================
-- 04_mv_http_logs.sql — Canonical http_logs target table + mv_http_logs MV
--
-- Canonical version — supersedes services/correlator/sql/init.sql base version.
-- This version includes Anubis enrichment (anubis_bot_name, anubis_bot_action,
-- anubis_bot_category). Enrichment is resolved with the simplified priority
-- IP/CIDR > ASN; the UA and Country rules inherited from
-- bot_detector/anubis/mv_http_logs.sql have been removed.
-- =============================================================================

-- -----------------------------------------------------------------------------
-- http_logs — parsed destination table (populated by mv_http_logs)
-- Includes Anubis enrichment columns added by deploy_schema.sql items 9+
-- -----------------------------------------------------------------------------
CREATE TABLE IF NOT EXISTS ja4_logs.http_logs
(
    -- Temporal
    `time`                       DateTime,
    `log_date`                   Date DEFAULT toDate(time),

    -- Network
    `src_ip`                     IPv4,
    `src_port`                   UInt16,
    `dst_ip`                     IPv4,
    `dst_port`                   UInt16,

    -- IPLocate enrichment
    `src_asn`                    UInt32,
    `src_country_code`           LowCardinality(String),
    `src_as_name`                LowCardinality(String),
    `src_org`                    LowCardinality(String),
    `src_domain`                 LowCardinality(String),

    -- HTTP
    `method`                     LowCardinality(String),
    `scheme`                     LowCardinality(String),
    `host`                       LowCardinality(String),
    `path`                       String CODEC(ZSTD(3)),
    `query`                      String CODEC(ZSTD(3)),
    `http_version`               LowCardinality(String),

    -- Correlation
    `orphan_side`                LowCardinality(String),
    `correlated`                 UInt8,
    `keepalives`                 UInt16,
    `a_timestamp`                UInt64,
    `b_timestamp`                UInt64,
    `conn_id`                    String CODEC(ZSTD(3)),

    -- Response metadata (captured at log_transaction phase)
    `status_code`                UInt16 DEFAULT 0,
    `response_size`              UInt64 DEFAULT 0,
    `duration_ms`                UInt64 DEFAULT 0,

    -- Header fingerprinting
    `headers_raw`                String DEFAULT '' CODEC(ZSTD(3)),
    `header_order_signature`     String DEFAULT '' CODEC(ZSTD(3)),

    -- IP metadata
    `ip_meta_df`                 UInt8,
    `ip_meta_id`                 UInt16,
    `ip_meta_total_length`       UInt16,
    `ip_meta_ttl`                UInt8,

    -- TCP metadata
    `tcp_meta_options`           LowCardinality(String),
    `tcp_meta_window_size`       UInt32,
    `tcp_meta_mss`               UInt16,
    `tcp_meta_window_scale`      UInt8,
    `syn_to_clienthello_ms`      Int32,

    -- TLS / fingerprints
    `tls_version`                LowCardinality(String),
    `tls_sni`                    LowCardinality(String),
    `tls_alpn`                   LowCardinality(String),
    `ja3`                        String CODEC(ZSTD(3)),
    `ja3_hash`                   String CODEC(ZSTD(3)),
    `ja4`                        String CODEC(ZSTD(3)),

    -- HTTP headers
    `client_headers`             String CODEC(ZSTD(3)),
    `header_user_agent`          String CODEC(ZSTD(3)),
    `header_accept`              String CODEC(ZSTD(3)),
    `header_accept_encoding`     String CODEC(ZSTD(3)),
    `header_accept_language`     String CODEC(ZSTD(3)),
    `header_content_type`        String CODEC(ZSTD(3)),
    `header_x_request_id`        String CODEC(ZSTD(3)),
    `header_x_trace_id`          String CODEC(ZSTD(3)),
    `header_x_forwarded_for`     String CODEC(ZSTD(3)),
    `header_sec_ch_ua`           String CODEC(ZSTD(3)),
    `header_sec_ch_ua_mobile`    String CODEC(ZSTD(3)),
    `header_sec_ch_ua_platform`  String CODEC(ZSTD(3)),
    `header_sec_fetch_dest`      String CODEC(ZSTD(3)),
    `header_sec_fetch_mode`      String CODEC(ZSTD(3)),
    `header_sec_fetch_site`      String CODEC(ZSTD(3)),

    -- Anubis enrichment columns
    `anubis_bot_name`            LowCardinality(String) DEFAULT '',
    `anubis_bot_action`          LowCardinality(String) DEFAULT '',
    `anubis_bot_category`        LowCardinality(String) DEFAULT '',

    -- Passive HTTP/2 fingerprint (mod_reqin_log connection filter)
    `h2_fingerprint`             String DEFAULT '' CODEC(ZSTD(3)),
    `h2_settings_fp`             String DEFAULT '' CODEC(ZSTD(3)),
    `h2_window_update`           UInt32 DEFAULT 0,
    `h2_pseudo_order`            LowCardinality(String) DEFAULT '',
    `h2_has_priority`            UInt8 DEFAULT 0,
    `h2_settings_ack`            UInt8 DEFAULT 0,

    -- Individual HTTP/2 SETTINGS parameters (RFC 9113 §6.5.2)
    -- -1 = absent from the client preface (the client did not send this parameter)
    `h2_header_table_size`       Int32 DEFAULT -1,
    `h2_enable_push`             Int32 DEFAULT -1,
    `h2_max_concurrent_streams`  Int32 DEFAULT -1,
    `h2_initial_window_size`     Int64 DEFAULT -1,
    `h2_max_frame_size`          Int32 DEFAULT -1,
    `h2_max_header_list_size`    Int32 DEFAULT -1,
    `h2_enable_connect_protocol` Int32 DEFAULT -1,

    -- bloom_filter index on src_ip: queries with WHERE src_ip = X skip the
    -- granules that do not contain that IP (~90% of granules in practice).
    -- False-positive rate 1% (0.01): good size / efficiency trade-off.
    INDEX idx_src_ip src_ip TYPE bloom_filter(0.01) GRANULARITY 4,
    INDEX idx_ja4    ja4    TYPE bloom_filter(0.01) GRANULARITY 4
)
ENGINE = MergeTree
PARTITION BY log_date
ORDER BY (time, src_ip, dst_ip, ja4)
TTL log_date + INTERVAL 30 DAY
SETTINGS
    index_granularity = 8192,
    ttl_only_drop_parts = 1;

-- -----------------------------------------------------------------------------
-- mv_http_logs — canonical materialized view with Anubis enrichment
-- Parses each raw_json row of ja4_logs.http_logs_raw into ja4_logs.http_logs.
-- Anubis priority logic: (1) IP/CIDR > (2) ASN (UA and Country rules removed).
-- Uses WITH-clause aliases for the intermediate values shared by several columns.
--
-- NOTE(review): the view is dropped and re-created, so rows inserted into
-- http_logs_raw between the two statements are presumably not materialized —
-- confirm this script runs while ingestion is paused.
-- -----------------------------------------------------------------------------
DROP VIEW IF EXISTS ja4_logs.mv_http_logs;

CREATE MATERIALIZED VIEW ja4_logs.mv_http_logs
TO ja4_logs.http_logs
AS
WITH
    -- JSONExtractString yields '' (not NULL) for a missing key, so the empty
    -- string is mapped to NULL before falling back to 0.0.0.0; otherwise
    -- toIPv4('') would be evaluated and the insert could fail.
    toIPv4(coalesce(nullIf(JSONExtractString(raw_json, 'src_ip'), ''), '0.0.0.0')) AS _src_ip4,
    -- Dictionary lookups below are keyed on the IPv6-mapped source address.
    toIPv6(_src_ip4) AS _ip,
    toUInt32(dictGetOrDefault('ja4_processing.dict_iplocate_asn', 'asn', _ip, toUInt32(0))) AS _asn,
    dictGetOrDefault('ja4_processing.dict_iplocate_asn', 'country_code', _ip, '') AS _cc
SELECT
    -- Temporal: a missing/empty timestamp falls back to the epoch. Such rows get
    -- log_date 1970-01-01, which is already older than the table's 30-day TTL.
    parseDateTimeBestEffort(
        coalesce(nullIf(JSONExtractString(raw_json, 'time'), ''), '1970-01-01T00:00:00Z')
    ) AS time,
    toDate(time) AS log_date,

    -- Network + IPLocate enrichment
    _src_ip4 AS src_ip,
    toUInt16(coalesce(JSONExtractUInt(raw_json, 'src_port'), 0)) AS src_port,
    _asn AS src_asn,
    _cc AS src_country_code,
    toIPv4(coalesce(nullIf(JSONExtractString(raw_json, 'dst_ip'), ''), '0.0.0.0')) AS dst_ip,
    toUInt16(coalesce(JSONExtractUInt(raw_json, 'dst_port'), 0)) AS dst_port,
    dictGetOrDefault('ja4_processing.dict_iplocate_asn', 'name', _ip, '') AS src_as_name,
    '' AS src_org,
    '' AS src_domain,

    -- HTTP
    coalesce(JSONExtractString(raw_json, 'method'), '') AS method,
    coalesce(JSONExtractString(raw_json, 'scheme'), '') AS scheme,
    -- host falls back to the TLS SNI when the host field is empty
    coalesce(
        nullIf(JSONExtractString(raw_json, 'host'), ''),
        JSONExtractString(raw_json, 'tls_sni'),
        ''
    ) AS host,
    coalesce(JSONExtractString(raw_json, 'path'), '') AS path,
    -- query_string takes precedence; nullIf makes the 'query' fallback reachable
    -- when query_string is missing or empty.
    coalesce(
        nullIf(JSONExtractString(raw_json, 'query_string'), ''),
        JSONExtractString(raw_json, 'query'),
        ''
    ) AS query,
    coalesce(JSONExtractString(raw_json, 'http_version'), '') AS http_version,

    -- Correlation
    coalesce(JSONExtractString(raw_json, 'orphan_side'), '') AS orphan_side,
    toUInt8(coalesce(JSONExtractUInt(raw_json, 'correlated'), 0)) AS correlated,
    toUInt16(coalesce(JSONExtractUInt(raw_json, 'keepalives'), 0)) AS keepalives,
    coalesce(JSONExtractUInt(raw_json, 'a_timestamp'), 0) AS a_timestamp,
    coalesce(JSONExtractUInt(raw_json, 'b_timestamp'), 0) AS b_timestamp,
    coalesce(JSONExtractString(raw_json, 'conn_id'), '') AS conn_id,

    -- Response metadata
    toUInt16(coalesce(JSONExtractUInt(raw_json, 'status_code'), 0)) AS status_code,
    coalesce(JSONExtractUInt(raw_json, 'response_size'), 0) AS response_size,
    coalesce(JSONExtractUInt(raw_json, 'duration_ms'), 0) AS duration_ms,

    -- Header fingerprinting
    coalesce(JSONExtractString(raw_json, 'headers_raw'), '') AS headers_raw,
    coalesce(JSONExtractString(raw_json, 'header_order_signature'), '') AS header_order_signature,

    -- IP metadata
    toUInt8(coalesce(JSONExtractBool(raw_json, 'ip_meta_df'), 0)) AS ip_meta_df,
    toUInt16(coalesce(JSONExtractUInt(raw_json, 'ip_meta_id'), 0)) AS ip_meta_id,
    toUInt16(coalesce(JSONExtractUInt(raw_json, 'ip_meta_total_length'), 0)) AS ip_meta_total_length,
    toUInt8(coalesce(JSONExtractUInt(raw_json, 'ip_meta_ttl'), 0)) AS ip_meta_ttl,

    -- TCP metadata
    coalesce(JSONExtractString(raw_json, 'tcp_meta_options'), '') AS tcp_meta_options,
    toUInt32(coalesce(JSONExtractUInt(raw_json, 'tcp_meta_window_size'), 0)) AS tcp_meta_window_size,
    toUInt16(coalesce(JSONExtractUInt(raw_json, 'tcp_meta_mss'), 0)) AS tcp_meta_mss,
    toUInt8(coalesce(JSONExtractUInt(raw_json, 'tcp_meta_window_scale'), 0)) AS tcp_meta_window_scale,
    toInt32(coalesce(JSONExtractInt(raw_json, 'syn_to_clienthello_ms'), 0)) AS syn_to_clienthello_ms,

    -- TLS / fingerprints
    coalesce(JSONExtractString(raw_json, 'tls_version'), '') AS tls_version,
    coalesce(JSONExtractString(raw_json, 'tls_sni'), '') AS tls_sni,
    coalesce(JSONExtractString(raw_json, 'tls_alpn'), '') AS tls_alpn,
    coalesce(JSONExtractString(raw_json, 'ja3'), '') AS ja3,
    coalesce(JSONExtractString(raw_json, 'ja3_hash'), '') AS ja3_hash,
    coalesce(JSONExtractString(raw_json, 'ja4'), '') AS ja4,

    -- HTTP headers
    coalesce(JSONExtractString(raw_json, 'client_headers'), '') AS client_headers,
    coalesce(JSONExtractString(raw_json, 'header_User-Agent'), '') AS header_user_agent,
    coalesce(JSONExtractString(raw_json, 'header_Accept'), '') AS header_accept,
    coalesce(JSONExtractString(raw_json, 'header_Accept-Encoding'), '') AS header_accept_encoding,
    coalesce(JSONExtractString(raw_json, 'header_Accept-Language'), '') AS header_accept_language,
    coalesce(JSONExtractString(raw_json, 'header_Content-Type'), '') AS header_content_type,
    coalesce(JSONExtractString(raw_json, 'header_X-Request-Id'), '') AS header_x_request_id,
    coalesce(JSONExtractString(raw_json, 'header_X-Trace-Id'), '') AS header_x_trace_id,
    coalesce(JSONExtractString(raw_json, 'header_X-Forwarded-For'), '') AS header_x_forwarded_for,
    coalesce(JSONExtractString(raw_json, 'header_Sec-CH-UA'), '') AS header_sec_ch_ua,
    coalesce(JSONExtractString(raw_json, 'header_Sec-CH-UA-Mobile'), '') AS header_sec_ch_ua_mobile,
    coalesce(JSONExtractString(raw_json, 'header_Sec-CH-UA-Platform'), '') AS header_sec_ch_ua_platform,
    coalesce(JSONExtractString(raw_json, 'header_Sec-Fetch-Dest'), '') AS header_sec_fetch_dest,
    coalesce(JSONExtractString(raw_json, 'header_Sec-Fetch-Mode'), '') AS header_sec_fetch_mode,
    coalesce(JSONExtractString(raw_json, 'header_Sec-Fetch-Site'), '') AS header_sec_fetch_site,

    -- Anubis enrichment: IP/CIDR > ASN (simplified — UA and Country rules removed).
    -- Each attribute takes the IP dictionary value when non-empty, else the ASN one.
    coalesce(
        nullIf(dictGetOrDefault('ja4_processing.dict_anubis_ip', 'bot_name', _ip, ''), ''),
        nullIf(dictGetOrDefault('ja4_processing.dict_anubis_asn', 'bot_name', _asn, ''), ''),
        ''
    ) AS anubis_bot_name,
    coalesce(
        nullIf(dictGetOrDefault('ja4_processing.dict_anubis_ip', 'action', _ip, ''), ''),
        nullIf(dictGetOrDefault('ja4_processing.dict_anubis_asn', 'action', _asn, ''), ''),
        ''
    ) AS anubis_bot_action,
    coalesce(
        nullIf(dictGetOrDefault('ja4_processing.dict_anubis_ip', 'category', _ip, ''), ''),
        nullIf(dictGetOrDefault('ja4_processing.dict_anubis_asn', 'category', _asn, ''), ''),
        ''
    ) AS anubis_bot_category,

    -- Passive HTTP/2 fingerprint: fields emitted by mod_reqin_log when HTTP/2 is detected
    coalesce(JSONExtractString(raw_json, 'h2_fingerprint'), '') AS h2_fingerprint,
    coalesce(JSONExtractString(raw_json, 'h2_settings_fp'), '') AS h2_settings_fp,
    toUInt32(coalesce(JSONExtractUInt(raw_json, 'h2_window_update'), 0)) AS h2_window_update,
    coalesce(JSONExtractString(raw_json, 'h2_pseudo_order'), '') AS h2_pseudo_order,
    toUInt8(coalesce(JSONExtractUInt(raw_json, 'h2_has_priority'), 0)) AS h2_has_priority,
    toUInt8(coalesce(JSONExtractUInt(raw_json, 'h2_settings_ack'), 0)) AS h2_settings_ack,

    -- Individual HTTP/2 SETTINGS parameters (-1 = absent from the client preface).
    -- JSONHas distinguishes "key absent" from a legitimate value of 0.
    toInt32(if(JSONHas(raw_json, 'h2_header_table_size'),
               JSONExtractInt(raw_json, 'h2_header_table_size'), -1)) AS h2_header_table_size,
    toInt32(if(JSONHas(raw_json, 'h2_enable_push'),
               JSONExtractInt(raw_json, 'h2_enable_push'), -1)) AS h2_enable_push,
    toInt32(if(JSONHas(raw_json, 'h2_max_concurrent_streams'),
               JSONExtractInt(raw_json, 'h2_max_concurrent_streams'), -1)) AS h2_max_concurrent_streams,
    toInt64(if(JSONHas(raw_json, 'h2_initial_window_size'),
               JSONExtractInt(raw_json, 'h2_initial_window_size'), -1)) AS h2_initial_window_size,
    toInt32(if(JSONHas(raw_json, 'h2_max_frame_size'),
               JSONExtractInt(raw_json, 'h2_max_frame_size'), -1)) AS h2_max_frame_size,
    toInt32(if(JSONHas(raw_json, 'h2_max_header_list_size'),
               JSONExtractInt(raw_json, 'h2_max_header_list_size'), -1)) AS h2_max_header_list_size,
    toInt32(if(JSONHas(raw_json, 'h2_enable_connect_protocol'),
               JSONExtractInt(raw_json, 'h2_enable_connect_protocol'), -1)) AS h2_enable_connect_protocol
FROM ja4_logs.http_logs_raw;