-- =============================================================================
-- 12_thesis_features.sql — Advanced detection techniques (Thesis §5)
--
-- Implements the original techniques described in:
--   docs/THESIS_HTTP_Traffic_Detection.md
--
-- Each section creates an aggregation table + MV + analytical view.
-- The computed features are exposed in view_thesis_features_1h, which can be
-- joined with view_ai_features_1h on (window_start, src_ip, ja4, host).
-- =============================================================================

-- =============================================================================
-- §5.1 — Path Sequence Entropy
--
-- Principle: store the ordered sequences of paths per session and compute the
-- first-order Markov transition entropy over paths normalised to depth 2
-- (e.g. /shop/product/*).
--
-- Signal:
--   - Human:   high entropy (varied, non-deterministic transitions)
--   - Crawler: low entropy (predictable, sequential transitions)
--   - Scanner: zero entropy (same path repeated)
-- =============================================================================
CREATE TABLE IF NOT EXISTS ja4_processing.agg_path_sequences_1h
(
    window_start  DateTime,
    src_ip        IPv6,
    ja4           LowCardinality(String),
    host          LowCardinality(String),
    -- (unix_timestamp, path) pairs; they are sorted by timestamp at read time
    path_sequence AggregateFunction(groupArray(100), Tuple(UInt32, String))
)
ENGINE = AggregatingMergeTree()
PARTITION BY toDate(window_start)
ORDER BY (window_start, src_ip, ja4, host)
TTL window_start + INTERVAL 7 DAY
SETTINGS ttl_only_drop_parts = 1;

-- The MV is dropped and recreated so that re-running this file picks up any
-- change to its SELECT.
DROP VIEW IF EXISTS ja4_processing.mv_agg_path_sequences_1h;

CREATE MATERIALIZED VIEW ja4_processing.mv_agg_path_sequences_1h
TO ja4_processing.agg_path_sequences_1h
AS
SELECT
    toStartOfHour(time) AS window_start,
    toIPv6(src_ip)      AS src_ip,
    ja4,
    host,
    groupArrayState(100)(
        tuple(toUInt32(toUnixTimestamp(time)), path)
    )                   AS path_sequence
FROM ja4_logs.http_logs
GROUP BY
    window_start,
    src_ip,
    ja4,
    host;

-- =============================================================================
-- §5.3 — Request Cadence Fingerprint (inter-request timing)
--
-- Principle: store the nanosecond timestamp of every request and compute the
-- coefficient of variation (CV) of the inter-request intervals, together with
-- the burst/pause ratio.
--
-- Signal:
--   - Human:           CV ≈ 1.5–3.0  (irregular intervals)
--   - Regular bot:     CV ≈ 0.01–0.3 (sleep-based, near-constant)
--   - Bot with jitter: CV ≈ 0.3–0.8  (random but bounded)
-- =============================================================================
CREATE TABLE IF NOT EXISTS ja4_processing.agg_request_timing_1h
(
    window_start  DateTime,
    src_ip        IPv6,
    ja4           LowCardinality(String),
    host          LowCardinality(String),
    -- Nanosecond timestamps (a_timestamp from mod_reqin_log)
    request_times AggregateFunction(groupArrayIf(500), UInt64, UInt8)
)
ENGINE = AggregatingMergeTree()
PARTITION BY toDate(window_start)
ORDER BY (window_start, src_ip, ja4, host)
TTL window_start + INTERVAL 7 DAY
SETTINGS ttl_only_drop_parts = 1;

DROP VIEW IF EXISTS ja4_processing.mv_agg_request_timing_1h;

CREATE MATERIALIZED VIEW ja4_processing.mv_agg_request_timing_1h
TO ja4_processing.agg_request_timing_1h
AS
SELECT
    toStartOfHour(time) AS window_start,
    toIPv6(src_ip)      AS src_ip,
    ja4,
    host,
    -- a_timestamp = nanoseconds since epoch (mod_reqin_log).
    -- The condition drops B-only orphans (a_timestamp = 0).
    groupArrayIfState(500)(a_timestamp, a_timestamp > 0) AS request_times
FROM ja4_logs.http_logs
GROUP BY
    window_start,
    src_ip,
    ja4,
    host;

-- =============================================================================
-- §5.5 — Intra-Session JA4 Drift (TLS fingerprint drift within a session)
-- §5.8 — Cross-Domain Session Linking (cross-domain behavioural fingerprint)
--
-- Both techniques need an aggregation per (window, src_ip) that is NOT broken
-- down by ja4/host.
--   §5.5: time-ordered sequence of JA4 per IP → drift_ratio
--   §5.8: distribution of hits per host → host_diversity, sweep_speed,
--         coverage_uniformity
-- =============================================================================
CREATE TABLE IF NOT EXISTS ja4_processing.agg_ip_behavior_1h
(
    window_start   DateTime,
    src_ip         IPv6,
    -- §5.5: (unix_timestamp, ja4) pairs used for drift detection
    ja4_sequence   AggregateFunction(groupArray(200), Tuple(UInt32, String)),
    -- §5.8: distribution of hits per host (sumMap aggregates per key)
    host_hits_keys AggregateFunction(sumMap, Array(String), Array(UInt64)),
    -- §5.8: number of distinct hosts
    host_count     AggregateFunction(uniq, String),
    -- §5.8: temporal metrics
    total_hits     SimpleAggregateFunction(sum, UInt64),
    first_seen     SimpleAggregateFunction(min, DateTime),
    last_seen      SimpleAggregateFunction(max, DateTime)
)
ENGINE = AggregatingMergeTree()
PARTITION BY toDate(window_start)
ORDER BY (window_start, src_ip)
TTL window_start + INTERVAL 7 DAY
SETTINGS ttl_only_drop_parts = 1;

DROP VIEW IF EXISTS ja4_processing.mv_agg_ip_behavior_1h;

CREATE MATERIALIZED VIEW ja4_processing.mv_agg_ip_behavior_1h
TO ja4_processing.agg_ip_behavior_1h
AS
SELECT
    toStartOfHour(time) AS window_start,
    toIPv6(src_ip)      AS src_ip,
    groupArrayState(200)(
        tuple(toUInt32(toUnixTimestamp(time)), ja4)
    )                   AS ja4_sequence,
    -- One (host → 1) entry per request; merging sums the counters per host.
    sumMapState([toString(host)], [toUInt64(1)]) AS host_hits_keys,
    uniqState(toString(host))                    AS host_count,
    count()                                      AS total_hits,
    min(time)                                    AS first_seen,
    max(time)                                    AS last_seen
FROM ja4_logs.http_logs
GROUP BY
    window_start,
    src_ip;

-- =============================================================================
-- §5.4 — Synthetic navigation detection (Resource Dependency Tree)
--
-- Principle: store (timestamp, is_asset) tuples per session in order to
-- measure the HTML→first-asset delay and the simultaneity of the assets.
--
-- Signal:
--   - Real browser:        natural cascade (50–200 ms HTML→CSS→JS)
--   - Playwright:          near-simultaneous loading (under 10 ms)
--   - Scraper with assets: sequential, without a hierarchical cascade
-- =============================================================================
CREATE TABLE IF NOT EXISTS ja4_processing.agg_resource_cascade_1h
(
    window_start   DateTime,
    src_ip         IPv6,
    ja4            LowCardinality(String),
    host           LowCardinality(String),
    -- (unix_timestamp, is_asset) tuples used for the cascade analysis.
    -- The timestamp is a UInt32 Unix time, i.e. it has one-second resolution.
    resource_loads AggregateFunction(groupArray(200), Tuple(UInt32, UInt8))
)
ENGINE = AggregatingMergeTree()
PARTITION BY toDate(window_start)
ORDER BY (window_start, src_ip, ja4, host)
TTL window_start + INTERVAL 7 DAY
SETTINGS ttl_only_drop_parts = 1;

DROP VIEW IF EXISTS ja4_processing.mv_agg_resource_cascade_1h;

CREATE MATERIALIZED VIEW ja4_processing.mv_agg_resource_cascade_1h
TO ja4_processing.agg_resource_cascade_1h
AS
SELECT
    toStartOfHour(time) AS window_start,
    toIPv6(src_ip)      AS src_ip,
    ja4,
    host,
    groupArrayState(200)(
        tuple(
            toUInt32(toUnixTimestamp(time)),
            -- Classification: 1 = static asset, 0 = document/API.
            -- The query string / fragment is removed before the extension is
            -- tested, so that cache-busted URLs such as /app.js?v=3 are still
            -- recognised as assets. When path carries no '?' or '#', this is
            -- a no-op and the result equals a plain match on path.
            -- NOTE(review): assumes path may include a query string (another
            -- query in this file strips '?...' from path) — confirm.
            toUInt8(match(
                replaceRegexpOne(path, '[?#].*', ''),
                '(?i)\\.(css|js|png|jpg|jpeg|gif|svg|ico|woff2?|ttf|eot|webp|avif)$'
            ))
        )
    )                   AS resource_loads
FROM ja4_logs.http_logs
GROUP BY
    window_start,
    src_ip,
    ja4,
    host;

-- =============================================================================
-- §5.4 — resource_cascade view (Resource Dependency Tree)
--
-- Computes the delay between the first document and the first asset, and the
-- standard deviation of the asset timestamps (simultaneity).
-- Must be created BEFORE view_thesis_features_1h, which references it.
-- =============================================================================
-- Output columns, one row per (window_start, src_ip, ja4, host) over the last
-- 24 hours, restricted to groups with at least 3 recorded loads:
--   doc_count                 number of loads flagged is_asset = 0
--   asset_count               number of loads flagged is_asset = 1
--   root_to_first_asset_delay seconds from the first document to the first
--                             asset loaded at or after it; -1.0 = not available
--   asset_load_stddev         population std-dev of asset timestamps (seconds);
--                             -1.0 when fewer than 2 assets
--
-- NOTE(review): resource_loads stores whole Unix seconds (UInt32), so both
-- metrics have one-second resolution; sub-second thresholds cannot be
-- distinguished with the current schema.
CREATE OR REPLACE VIEW ja4_processing.view_resource_cascade_1h AS
WITH
-- Merge the partial aggregation states and order the loads chronologically.
cascade_raw AS (
    SELECT
        window_start,
        src_ip,
        ja4,
        host,
        arraySort(x -> x.1, groupArrayMerge(200)(resource_loads)) AS sorted_loads
    FROM ja4_processing.agg_resource_cascade_1h
    WHERE window_start >= now() - INTERVAL 24 HOUR
    GROUP BY
        window_start,
        src_ip,
        ja4,
        host
    HAVING length(sorted_loads) >= 3
),
-- Separate documents from assets; both arrays stay sorted by timestamp.
cascade_split AS (
    SELECT
        window_start,
        src_ip,
        ja4,
        host,
        -- Loads of documents (is_asset = 0)
        arrayFilter(x -> x.2 = 0, sorted_loads) AS docs,
        -- Loads of assets (is_asset = 1)
        arrayFilter(x -> x.2 = 1, sorted_loads) AS assets
    FROM cascade_raw
),
-- Keep only the assets that were requested at or after the first document.
-- An asset seen before any document cannot have been triggered by it, and
-- using it would yield a negative delay that can collide with the -1.0
-- "not available" sentinel.
cascade_anchored AS (
    SELECT
        window_start,
        src_ip,
        ja4,
        host,
        docs,
        assets,
        arrayFilter(a -> a.1 >= docs[1].1, assets) AS assets_after_first_doc
    FROM cascade_split
)
SELECT
    window_start,
    src_ip,
    ja4,
    host,
    length(docs)   AS doc_count,
    length(assets) AS asset_count,
    -- Delay first document → first subsequent asset, in seconds.
    -- Always >= 0 when available; -1.0 when there is no document or no asset
    -- at/after the first document.
    if(
        length(docs) > 0 AND length(assets_after_first_doc) > 0,
        toFloat64(assets_after_first_doc[1].1 - docs[1].1),
        -1.0
    ) AS root_to_first_asset_delay,
    -- Asset simultaneity: standard deviation of the asset timestamps.
    -- Browser: low (parallel batch); scraper: high (sequential).
    if(
        length(assets) >= 2,
        sqrt(arrayReduce(
            'varPop',
            arrayMap(x -> toFloat64(x.1), assets)
        )),
        -1.0
    ) AS asset_load_stddev
FROM cascade_anchored
WHERE length(docs) > 0
   OR length(assets) > 0;

-- =============================================================================
-- view_thesis_features_1h — Unified view of the advanced features
--
-- Joins the 4 aggregation tables above to expose every feature of thesis §5
-- in a single view that can be joined with view_ai_features_1h.
-- =============================================================================
-- One row per (window_start, src_ip, ja4, host) over the last 24 hours, driven
-- by the §5.1 path features (groups with at least 3 recorded requests).
-- All other feature sets are LEFT JOINed onto that base.
--
-- NOTE(review): with the ClickHouse default join_use_nulls = 0, unmatched
-- LEFT JOIN rows carry type defaults (0 / 0.0 / empty) rather than NULL.
-- Consumers should treat cadence_request_count = 0 and host_diversity = 0 as
-- "no matching row", not as a measured value.
CREATE OR REPLACE VIEW ja4_processing.view_thesis_features_1h AS
WITH
-- ── §5.1: extract and sort the path sequences ───────────────────────────────
path_raw AS (
    SELECT
        window_start,
        src_ip,
        ja4,
        host,
        groupArrayMerge(100)(path_sequence) AS raw_tuples
    FROM ja4_processing.agg_path_sequences_1h
    WHERE window_start >= now() - INTERVAL 24 HOUR
    GROUP BY
        window_start,
        src_ip,
        ja4,
        host
),
path_entropy AS (
    SELECT
        window_start,
        src_ip,
        ja4,
        host,
        -- Normalise paths to depth 2 (/shop/product/123 → /shop/product).
        -- The query string is stripped first, exactly as in jaccard_paths
        -- below, so that /search?q=a and /search?q=b count as the same path
        -- instead of inflating the transition entropy.
        arrayMap(
            t -> concat('/', arrayStringConcat(
                arraySlice(
                    splitByChar('/', replaceRegexpAll(t.2, '\\?.*', '')),
                    2, 2
                ),
                '/'
            )),
            arraySort(x -> x.1, raw_tuples)
        ) AS norm_paths,
        -- Bounded by the groupArray(100) limit of the aggregation state.
        length(raw_tuples) AS request_count
    FROM path_raw
),
-- Consecutive pairs of normalised paths, encoded as 'a->b'.
path_bigrams AS (
    SELECT
        window_start,
        src_ip,
        ja4,
        host,
        request_count,
        arrayMap(
            (a, b) -> concat(a, '->', b),
            arraySlice(norm_paths, 1, length(norm_paths) - 1),
            arraySlice(norm_paths, 2)
        ) AS bigrams
    FROM path_entropy
    WHERE request_count >= 3
),
path_features AS (
    SELECT
        window_start,
        src_ip,
        ja4,
        host,
        request_count AS path_request_count,
        -- Shannon entropy of the path transitions, normalised to [0, 1] by
        -- log2(number of distinct transitions); 0 when only one distinct
        -- transition exists.
        if(
            length(arrayDistinct(bigrams)) > 1,
            -arrayReduce('sum', arrayMap(
                bg -> (toFloat64(arrayCount(x -> x = bg, bigrams)) / toFloat64(length(bigrams)))
                    * log2(toFloat64(arrayCount(x -> x = bg, bigrams)) / toFloat64(length(bigrams))),
                arrayDistinct(bigrams)
            )) / log2(toFloat64(length(arrayDistinct(bigrams)))),
            0.0
        ) AS path_transition_entropy
    FROM path_bigrams
),

-- ── §5.3: inter-request cadence ─────────────────────────────────────────────
timing_raw AS (
    SELECT
        window_start,
        src_ip,
        ja4,
        host,
        arraySort(groupArrayIfMerge(500)(request_times)) AS sorted_times
    FROM ja4_processing.agg_request_timing_1h
    WHERE window_start >= now() - INTERVAL 24 HOUR
    GROUP BY
        window_start,
        src_ip,
        ja4,
        host
),
timing_deltas AS (
    SELECT
        window_start,
        src_ip,
        ja4,
        host,
        -- Intervals in milliseconds between consecutive requests
        -- (request_times are nanoseconds, hence the division by 1e6).
        arrayMap(
            (a, b) -> toFloat64(b - a) / 1000000.0,
            arraySlice(sorted_times, 1, length(sorted_times) - 1),
            arraySlice(sorted_times, 2)
        ) AS deltas_ms
    FROM timing_raw
    WHERE length(sorted_times) >= 3
),
cadence_features AS (
    SELECT
        window_start,
        src_ip,
        ja4,
        host,
        length(deltas_ms) + 1 AS cadence_request_count,
        -- Coefficient of variation: σ/μ (human ≈ 1.5–3.0; bot ≈ 0.01–0.3)
        if(
            arrayReduce('avg', deltas_ms) > 0,
            sqrt(arrayReduce('varPop', deltas_ms)) / arrayReduce('avg', deltas_ms),
            0.0
        ) AS cadence_cv,
        -- Burst ratio: fraction of intervals shorter than 100 ms
        if(
            length(deltas_ms) > 0,
            toFloat64(arrayCount(x -> x < 100.0, deltas_ms)) / toFloat64(length(deltas_ms)),
            0.0
        ) AS burst_ratio,
        -- Pause ratio: fraction of intervals longer than 5000 ms
        if(
            length(deltas_ms) > 0,
            toFloat64(arrayCount(x -> x > 5000.0, deltas_ms)) / toFloat64(length(deltas_ms)),
            0.0
        ) AS pause_ratio,
        -- Lag-1 autocorrelation ρ₁(Δt): human ≈ 0 (independent),
        -- bot with jitter ≈ 0.8+. Requires at least 4 intervals and a
        -- non-degenerate variance, otherwise 0.
        if(
            length(deltas_ms) >= 4 AND arrayReduce('varPop', deltas_ms) > 1e-9,
            (
                arrayReduce('avg', arrayMap(
                    (a, b) -> (a - arrayReduce('avg', deltas_ms)) * (b - arrayReduce('avg', deltas_ms)),
                    arraySlice(deltas_ms, 1, length(deltas_ms) - 1),
                    arraySlice(deltas_ms, 2)
                )) / arrayReduce('varPop', deltas_ms)
            ),
            0.0
        ) AS lag1_autocorrelation,
        -- Benford's law: χ²-style distance between the observed first-digit
        -- frequencies of the intervals and the expected distribution
        -- P(d) = log10(1 + 1/d) for d = 1..9. Intervals below 1 ms are
        -- clamped to 1 before the first digit is taken.
        if(
            length(deltas_ms) >= 10,
            arraySum(
                arrayMap(
                    d -> pow(
                        (toFloat64(arrayCount(
                            x -> x = d,
                            arrayMap(
                                v -> toUInt8(substring(toString(toUInt64(greatest(abs(v), 1))), 1, 1)),
                                deltas_ms
                            )
                        )) / toFloat64(length(deltas_ms)))
                        - [0.301, 0.176, 0.125, 0.097, 0.079, 0.067, 0.058, 0.051, 0.046][d],
                        2
                    ) / [0.301, 0.176, 0.125, 0.097, 0.079, 0.067, 0.058, 0.051, 0.046][d],
                    [1, 2, 3, 4, 5, 6, 7, 8, 9]
                )
            ),
            0.0
        ) AS benford_deviation
    FROM timing_deltas
),

-- ── §5.5: intra-session JA4 drift ───────────────────────────────────────────
drift_raw AS (
    SELECT
        window_start,
        src_ip,
        groupArrayMerge(200)(ja4_sequence) AS raw_ja4_tuples,
        uniqMerge(host_count)              AS n_hosts,
        sum(total_hits)                    AS ip_total_hits,
        min(first_seen)                    AS ip_first_seen,
        max(last_seen)                     AS ip_last_seen,
        sumMapMerge(host_hits_keys)        AS host_hits_merged
    FROM ja4_processing.agg_ip_behavior_1h
    WHERE window_start >= now() - INTERVAL 24 HOUR
    GROUP BY
        window_start,
        src_ip
),
drift_segments AS (
    SELECT
        window_start,
        src_ip,
        n_hosts,
        ip_total_hits,
        ip_first_seen,
        ip_last_seen,
        host_hits_merged,
        -- JA4 sequence ordered by time
        arrayMap(t -> t.2, arraySort(x -> x.1, raw_ja4_tuples)) AS ja4_seq,
        -- Simplification of the 10-minute segmentation idea (dominant JA4 per
        -- segment): only consecutive JA4 transitions are counted downstream.
        length(raw_ja4_tuples) AS seq_len
    FROM drift_raw
),
ja4_drift_features AS (
    SELECT
        window_start,
        src_ip,
        n_hosts,
        ip_total_hits,
        ip_first_seen,
        ip_last_seen,
        host_hits_merged,
        -- Drift ratio = consecutive transitions / (len - 1),
        -- where a transition is ja4_seq[i] != ja4_seq[i-1].
        if(
            length(ja4_seq) > 1,
            toFloat64(arrayCount(
                (a, b) -> a != b,
                arraySlice(ja4_seq, 1, length(ja4_seq) - 1),
                arraySlice(ja4_seq, 2)
            )) / toFloat64(length(ja4_seq) - 1),
            0.0
        ) AS ja4_drift_ratio,
        length(arrayDistinct(ja4_seq)) AS ja4_distinct_in_session
    FROM drift_segments
    WHERE seq_len >= 2
),

-- ── §5.8: Cross-Domain Session Linking ──────────────────────────────────────
cross_domain_features AS (
    SELECT
        window_start,
        src_ip,
        ja4_drift_ratio,
        ja4_distinct_in_session,
        -- Host diversity: number of distinct hosts visited
        n_hosts AS host_diversity,
        -- Host sweep speed: hosts / duration in seconds
        if(
            dateDiff('second', ip_first_seen, ip_last_seen) > 0,
            toFloat64(n_hosts) / toFloat64(dateDiff('second', ip_first_seen, ip_last_seen)),
            0.0
        ) AS host_sweep_speed,
        -- Host coverage uniformity: 1 - σ(hits_per_host) / μ(hits_per_host)
        --   close to 1 = uniform distribution (scanner)
        --   close to 0 = concentrated on 1-2 hosts (human)
        if(
            length(host_hits_merged.2) > 1
                AND arrayReduce('avg', arrayMap(x -> toFloat64(x), host_hits_merged.2)) > 0,
            1.0 - least(
                1.0,
                sqrt(arrayReduce('varPop', arrayMap(x -> toFloat64(x), host_hits_merged.2)))
                    / arrayReduce('avg', arrayMap(x -> toFloat64(x), host_hits_merged.2))
            ),
            0.0
        ) AS host_coverage_uniformity
    FROM ja4_drift_features
),

-- ── §5.8b: cross-domain path similarity ─────────────────────────────────────
-- Principle: a scanner requests the same paths (/admin, /wp-login.php, /.env)
-- on several distinct hosts. The feature is the fraction of an IP's normalised
-- paths that were seen on at least 2 distinct hosts.
-- High signal (above 0.5) = same path list on several sites → systematic
-- scanning.
jaccard_paths AS (
    SELECT
        hour_start     AS window_start,
        toIPv6(src_ip) AS src_ip,
        -- Fraction of normalised paths appearing on ≥ 2 distinct hosts
        toFloat64(countIf(distinct_hosts >= 2))
            / greatest(toFloat64(count()), 1.0) AS cross_domain_path_similarity
    FROM (
        SELECT
            -- Named hour_start (not "time") so that it cannot be confused
            -- with the source column of the same name used in the filter.
            toStartOfHour(time) AS hour_start,
            src_ip,
            -- Normalise the path to depth 2 (query parameters are ignored)
            arrayStringConcat(
                arraySlice(
                    splitByChar('/', replaceRegexpAll(path, '\\?.*', '')),
                    1, 3
                ),
                '/'
            ) AS path_norm,
            uniqExact(host) AS distinct_hosts
        FROM ja4_logs.http_logs
        -- Only hourly windows that start within the last 24 hours, matching
        -- the window_start filter of the other CTEs.
        WHERE toStartOfHour(time) >= now() - INTERVAL 24 HOUR
        GROUP BY
            hour_start,
            src_ip,
            path_norm
        HAVING distinct_hosts >= 1
    )
    GROUP BY
        window_start,
        src_ip
)

-- ── Final join: §5.1/§5.3 features per (window, ip, ja4, host),
--    enriched with the §5.5/§5.8 features per (window, ip)
--    and the §5.4 Resource Cascade features per (window, ip, ja4, host)
SELECT
    p.window_start,
    p.src_ip,
    p.ja4,
    p.host,
    -- §5.1 Path Sequence Entropy
    p.path_transition_entropy,
    p.path_request_count,
    -- §5.3 Request Cadence Fingerprint
    c.cadence_cv,
    c.burst_ratio,
    c.pause_ratio,
    c.lag1_autocorrelation,
    c.benford_deviation,
    c.cadence_request_count,
    -- §5.4 Resource Dependency Tree
    coalesce(rc.doc_count, 0)   AS doc_count,
    coalesce(rc.asset_count, 0) AS asset_count,
    -- The cascade view only emits rows with at least 3 loads, so
    -- doc_count + asset_count > 0 identifies a matched row whether unmatched
    -- rows come back as NULL (join_use_nulls = 1) or as type defaults
    -- (join_use_nulls = 0). Without this test an unmatched row would report
    -- 0.0 instead of the -1.0 "not available" sentinel under the default
    -- setting.
    if(
        coalesce(rc.doc_count, 0) + coalesce(rc.asset_count, 0) > 0,
        coalesce(rc.root_to_first_asset_delay, -1.0),
        -1.0
    ) AS root_to_first_asset_delay,
    if(
        coalesce(rc.doc_count, 0) + coalesce(rc.asset_count, 0) > 0,
        coalesce(rc.asset_load_stddev, -1.0),
        -1.0
    ) AS asset_load_stddev,
    -- §5.5 Intra-Session JA4 Drift
    d.ja4_drift_ratio,
    d.ja4_distinct_in_session,
    -- §5.8 Cross-Domain Session Linking
    d.host_diversity,
    d.host_sweep_speed,
    d.host_coverage_uniformity,
    -- §5.8b cross-domain path similarity (share of paths seen on ≥ 2 hosts)
    coalesce(jp.cross_domain_path_similarity, 0.0) AS cross_domain_path_similarity
FROM path_features AS p
LEFT JOIN cadence_features AS c
    ON  p.window_start = c.window_start
    AND p.src_ip = c.src_ip
    AND p.ja4 = c.ja4
    AND p.host = c.host
LEFT JOIN cross_domain_features AS d
    ON  p.window_start = d.window_start
    AND p.src_ip = d.src_ip
LEFT JOIN jaccard_paths AS jp
    ON  p.window_start = jp.window_start
    AND p.src_ip = jp.src_ip
LEFT JOIN ja4_processing.view_resource_cascade_1h AS rc
    ON  p.window_start = rc.window_start
    AND p.src_ip = rc.src_ip
    AND p.ja4 = rc.ja4
    AND p.host = rc.host;

-- =============================================================================
-- §5.2 — JA4×ASN bipartite graph (Bipartite Bot Fleet Detection)
--
-- NOT POSSIBLE AS A PURE MV: requires a community-detection algorithm
-- (Louvain / Label Propagation) on a bipartite graph, which is beyond what
-- analytical SQL can express.
--
-- FUTURE IMPLEMENTATION PLAN (Python):
--   1. Query: SELECT ja4, toString(src_asn), count(DISTINCT src_ip) AS edge_weight
--             FROM ja4_processing.agg_host_ip_ja4_1h
--             WHERE window_start >= now() - INTERVAL 24 HOUR
--             GROUP BY ja4, src_asn HAVING edge_weight >= 3
--   2. Build the bipartite graph G = (JA4 ∪ ASN, E) with networkx
--   3. Project onto JA4: G_ja4 = bipartite.weighted_projected_graph(G, ja4_nodes)
--   4. Community detection: communities = community.louvain_communities(G_ja4)
--   5. For each community:
--        fleet_score = len(community) * nx.density(G.subgraph(community)) / log(n_asn + 1)
--   6. Write the results to a table ja4_processing.fleet_detection_results
--
-- Dependencies: networkx >= 3.0, python-louvain
-- =============================================================================

-- =============================================================================
-- §5.6 — Passive DNS correlation (DNS Shadow Analysis)
--
-- NOT POSSIBLE AT PRESENT: ja4sentinel does not capture DNS packets (UDP/53).
--
-- FUTURE IMPLEMENTATION PLAN:
--   1. Extend ja4sentinel (capture.go):
--        - Add a BPF filter for UDP port 53
--        - Parse DNS responses (packet → query_name, response_ip, ttl)
--        - Emit a new event type: dns_event
--   2. New ClickHouse table: ja4_logs.dns_logs (time, src_ip, query_name,
--      response_ip, dns_ttl, query_type)
--   3. Aggregation MV: agg_dns_http_correlation_1h
--        - Join dns_logs × http_logs on (src_ip, host ≈ query_name)
--        - Feature: dns_shadow_ratio = count(http) / nullif(count(dns), 0)
--   4. Add to view_thesis_features_1h
--
-- Estimated effort: ja4sentinel change (Go) + new correlation pipeline
-- =============================================================================

-- =============================================================================
-- §5.7 — Compression Ratio Invariant
--
-- NOT POSSIBLE AT PRESENT: mod_reqin_log does not capture the response sizes
-- before/after compression.
--
-- FUTURE IMPLEMENTATION PLAN:
--   1. Extend mod_reqin_log (mod_reqin_log.c):
--        - Capture r->bytes_sent (compressed size sent)
--        - Capture the uncompressed size via an output filter or r->clength
--        - Add response_bytes_compressed, response_bytes_raw to the JSON
--   2. Propagate into http_logs: 2 new UInt32 columns
--   3. Features that become computable:
--        - compression_ratio = response_bytes_compressed / response_bytes_raw
--        - compression_ratio_variance = varPop(compression_ratio) per session
--        - A bot that does not decompress = constant ratio independent of content
--   4. Add to the existing aggregation or to a new table
--
-- Estimated effort: C change in mod_reqin_log + schema extension
-- =============================================================================