Files
ja4-platform/shared/clickhouse/12_thesis_features.sql
toto a108814a56 feat: roadmap détection bots §2-9 — HTTP/2, cohérence, drift, flotte, Jaccard, ExIFFI, méta-learner, métriques
Étape 2 — Fingerprinting HTTP/2 dans le pipeline ML :
- Ajout du dictionnaire dict_browser_h2 (11 familles de navigateurs) dans 05_aggregation_tables.sql
- Ajout du CTE h2_agg et 4 features HTTP/2 dans 07_ai_features_view.sql :
  h2_settings_known, h2_pseudo_order_match, h2_ja4_coherence, h2_settings_rare
- Calcul du fingerprint_coherence_score (5 axes pondérés) dans la vue
- Ajout du 6e axe axis_h2_coherence dans browser.py (poids rééquilibrés)
- browser_h2.csv : 11 fingerprints Akamai → famille navigateur

Étape 3 — Pré-filtre de cohérence sur la baseline humaine :
- pipeline.py exclut les sessions avec fingerprint_coherence_score < seuil de la baseline d'entraînement
- FINGERPRINT_COHERENCE_THRESHOLD configurable via env (défaut 0.25)
- Log des sessions exclues pour analyse SOC

Étape 4 — Détection de drift améliorée :
- scoring.py : passage de 5 à 9 quantiles (p5…p95)
- Ajout de la divergence KL en complément du test KS
- Détection de drift adversarial (≥80% des features dérivent dans la même direction)
- Split temporel strict pour la validation

Étape 5 — Graphe bipartite JA4×ASN (§5.2) :
- fleet.py : détection de flottes via NetworkX + Louvain (imports optionnels)
- enrich_with_fleet_score() : ajout fleet_score + fleet_campaign_flag au DataFrame
- cycle.py : appel après preprocess_df avec log du nombre de sessions en flotte
- SQL migration 05_fleet_metrics_tables.sql : table fleet_detections (TTL 7j)
- Dashboard : /fleet + /api/fleet (communautés détectées) + template fleet.html

Étape 6 — Cross-domain Jaccard §5.8 :
- 12_thesis_features.sql : CTE jaccard_paths → cross_domain_path_similarity
- Signal : même chemins (/admin, /wp-login) sur plusieurs hosts = scanner

Étape 7 — ExIFFI + erreurs AE par feature :
- scoring.py : compute_exiffi_importance() par permutation, compute_ae_feature_errors()
- pipeline.py : calcul ExIFFI sur X_test, mapping index → dict pour anomalies
- build_reason() enrichi avec exiffi_top quand SHAP inactif

Étape 8 — Méta-learner pour la pondération de l'ensemble :
- scoring.py : classe MetaLearner (LogisticRegression, fallback poids fixes <1000 labels)
- Collecte des labels depuis le cycle courant (known_bots, légitimes, Anubis)
- pipeline.py : remplacement des poids fixes par MetaLearner.predict()

Étape 9 — Métriques de performance et monitoring :
- metrics.py : record_cycle_metrics() — taux anomalie, drift, corrélation, latence
- SQL migration 05_fleet_metrics_tables.sql : table ml_performance_metrics (TTL 90j)
- Dashboard : /health + /api/health + template health.html
- cycle.py : appel record_cycle_metrics en fin de cycle (Complet + Applicatif)

Tests : 36/36 bot-detector tests passent

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-10 00:11:35 +02:00

620 lines
24 KiB
SQL
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

-- =============================================================================
-- 12_thesis_features.sql — Techniques avancées de détection (Thèse §5)
--
-- Implémente les techniques originales décrites dans :
-- docs/THESIS_HTTP_Traffic_Detection.md
--
-- Chaque section crée une table d'agrégation + MV + vue analytique.
-- Les features calculées sont exposées dans view_thesis_features_1h,
-- joinable avec view_ai_features_1h sur (window_start, src_ip, ja4, host).
-- =============================================================================
-- =============================================================================
-- §5.1 — Entropie de séquence de chemins (Path Sequence Entropy)
--
-- Principe : stocker les séquences ordonnées de chemins par session et calculer
-- l'entropie de transition de Markov d'ordre 1 sur les chemins normalisés à
-- profondeur 2 (ex: /shop/product/*).
--
-- Signal :
-- - Humain : entropie élevée (transitions variées, non-déterministes)
-- - Crawler : entropie faible (transitions prévisibles, séquentielles)
-- - Scanner : entropie nulle (même chemin répété)
-- =============================================================================
CREATE TABLE IF NOT EXISTS ja4_processing.agg_path_sequences_1h
(
window_start DateTime,
src_ip IPv6,
ja4 LowCardinality(String),
host LowCardinality(String),
-- Séquences (unix_timestamp, path) — triées par timestamp à la lecture
path_sequence AggregateFunction(groupArray(100), Tuple(UInt32, String))
)
ENGINE = AggregatingMergeTree()
PARTITION BY toDate(window_start)
ORDER BY (window_start, src_ip, ja4, host)
TTL window_start + INTERVAL 7 DAY
SETTINGS ttl_only_drop_parts = 1;
DROP VIEW IF EXISTS ja4_processing.mv_agg_path_sequences_1h;
CREATE MATERIALIZED VIEW ja4_processing.mv_agg_path_sequences_1h
TO ja4_processing.agg_path_sequences_1h AS
SELECT
toStartOfHour(time) AS window_start,
toIPv6(src_ip) AS src_ip,
ja4,
host,
groupArrayState(100)(
tuple(toUInt32(toUnixTimestamp(time)), path)
) AS path_sequence
FROM ja4_logs.http_logs
GROUP BY window_start, src_ip, ja4, host;
-- =============================================================================
-- §5.3 — Fingerprinting par timing inter-requêtes (Request Cadence Fingerprint)
--
-- Principe : stocker les timestamps nanoseconde de chaque requête et calculer
-- le coefficient de variation (CV) des intervalles inter-requêtes, ainsi que
-- le ratio burst/pause.
--
-- Signal :
-- - Humain : CV ≈ 1.53.0 (intervalles irréguliers)
-- - Bot régulier : CV ≈ 0.010.3 (sleep-based, quasi-constant)
-- - Bot avec jitter : CV ≈ 0.30.8 (aléatoire mais borné)
-- =============================================================================
CREATE TABLE IF NOT EXISTS ja4_processing.agg_request_timing_1h
(
window_start DateTime,
src_ip IPv6,
ja4 LowCardinality(String),
host LowCardinality(String),
-- Timestamps nanoseconde (a_timestamp de mod_reqin_log)
request_times AggregateFunction(groupArrayIf(500), UInt64, UInt8)
)
ENGINE = AggregatingMergeTree()
PARTITION BY toDate(window_start)
ORDER BY (window_start, src_ip, ja4, host)
TTL window_start + INTERVAL 7 DAY
SETTINGS ttl_only_drop_parts = 1;
DROP VIEW IF EXISTS ja4_processing.mv_agg_request_timing_1h;
CREATE MATERIALIZED VIEW ja4_processing.mv_agg_request_timing_1h
TO ja4_processing.agg_request_timing_1h AS
SELECT
toStartOfHour(time) AS window_start,
toIPv6(src_ip) AS src_ip,
ja4,
host,
-- a_timestamp = nanoseconde depuis epoch (mod_reqin_log)
-- Filtre les orphelins B-only (a_timestamp = 0)
groupArrayIfState(500)(a_timestamp, a_timestamp > 0) AS request_times
FROM ja4_logs.http_logs
GROUP BY window_start, src_ip, ja4, host;
-- =============================================================================
-- §5.5 — Dérive de fingerprint TLS intra-session (Intra-Session JA4 Drift)
-- §5.8 — Empreinte comportementale cross-domaine (Cross-Domain Session Linking)
--
-- Ces deux techniques nécessitent une agrégation par (window, src_ip) sans
-- décomposition par ja4/host.
-- §5.5 : séquence temporelle de JA4 par IP → drift_ratio
-- §5.8 : distribution des hits par host → host_diversity, sweep_speed,
-- coverage_uniformity
-- =============================================================================
CREATE TABLE IF NOT EXISTS ja4_processing.agg_ip_behavior_1h
(
window_start DateTime,
src_ip IPv6,
-- §5.5 : séquences (unix_timestamp, ja4) pour détection de drift
ja4_sequence AggregateFunction(groupArray(200), Tuple(UInt32, String)),
-- §5.8 : distribution des hits par host (sumMap agrège par clé)
host_hits_keys AggregateFunction(sumMap, Array(String), Array(UInt64)),
-- §5.8 : nombre de hosts distincts
host_count AggregateFunction(uniq, String),
-- §5.8 : métriques temporelles
total_hits SimpleAggregateFunction(sum, UInt64),
first_seen SimpleAggregateFunction(min, DateTime),
last_seen SimpleAggregateFunction(max, DateTime)
)
ENGINE = AggregatingMergeTree()
PARTITION BY toDate(window_start)
ORDER BY (window_start, src_ip)
TTL window_start + INTERVAL 7 DAY
SETTINGS ttl_only_drop_parts = 1;
DROP VIEW IF EXISTS ja4_processing.mv_agg_ip_behavior_1h;
CREATE MATERIALIZED VIEW ja4_processing.mv_agg_ip_behavior_1h
TO ja4_processing.agg_ip_behavior_1h AS
SELECT
toStartOfHour(time) AS window_start,
toIPv6(src_ip) AS src_ip,
groupArrayState(200)(
tuple(toUInt32(toUnixTimestamp(time)), ja4)
) AS ja4_sequence,
sumMapState([toString(host)], [toUInt64(1)]) AS host_hits_keys,
uniqState(toString(host)) AS host_count,
count() AS total_hits,
min(time) AS first_seen,
max(time) AS last_seen
FROM ja4_logs.http_logs
GROUP BY window_start, src_ip;
-- =============================================================================
-- §5.4 — Détection de navigation synthétique (Resource Dependency Tree)
--
-- Principe : stocker les tuples (timestamp, is_asset) par session pour
-- mesurer le délai HTML→premier asset et la simultanéité des assets.
--
-- Signal :
-- - Navigateur réel : cascade naturelle (50200ms HTML→CSS→JS)
-- - Playwright : chargement quasi-simultané (<10ms)
-- - Scraper avec assets : séquentiel sans cascade hiérarchique
-- =============================================================================
CREATE TABLE IF NOT EXISTS ja4_processing.agg_resource_cascade_1h
(
window_start DateTime,
src_ip IPv6,
ja4 LowCardinality(String),
host LowCardinality(String),
-- Tuples (unix_timestamp, is_asset) pour analyse de cascade
resource_loads AggregateFunction(groupArray(200), Tuple(UInt32, UInt8))
)
ENGINE = AggregatingMergeTree()
PARTITION BY toDate(window_start)
ORDER BY (window_start, src_ip, ja4, host)
TTL window_start + INTERVAL 7 DAY
SETTINGS ttl_only_drop_parts = 1;
DROP VIEW IF EXISTS ja4_processing.mv_agg_resource_cascade_1h;
CREATE MATERIALIZED VIEW ja4_processing.mv_agg_resource_cascade_1h
TO ja4_processing.agg_resource_cascade_1h AS
SELECT
toStartOfHour(time) AS window_start,
toIPv6(src_ip) AS src_ip,
ja4,
host,
groupArrayState(200)(
tuple(
toUInt32(toUnixTimestamp(time)),
-- Classification : 1 = asset statique, 0 = document/API
toUInt8(match(path, '(?i)\\.(css|js|png|jpg|jpeg|gif|svg|ico|woff2?|ttf|eot|webp|avif)$'))
)
) AS resource_loads
FROM ja4_logs.http_logs
GROUP BY window_start, src_ip, ja4, host;
-- =============================================================================
-- §5.4 — Vue resource_cascade (Resource Dependency Tree)
--
-- Calcule le délai moyen entre le premier document et le premier asset,
-- et l'écart-type des timestamps des assets (simultanéité).
-- Doit être créée AVANT view_thesis_features_1h qui la référence.
-- =============================================================================
CREATE OR REPLACE VIEW ja4_processing.view_resource_cascade_1h AS
WITH
cascade_raw AS (
SELECT
window_start, src_ip, ja4, host,
arraySort(x -> x.1, groupArrayMerge(200)(resource_loads)) AS sorted_loads
FROM ja4_processing.agg_resource_cascade_1h
WHERE window_start >= now() - INTERVAL 24 HOUR
GROUP BY window_start, src_ip, ja4, host
HAVING length(sorted_loads) >= 3
),
cascade_split AS (
SELECT
window_start, src_ip, ja4, host,
-- Timestamps des documents (is_asset = 0)
arrayFilter(x -> x.2 = 0, sorted_loads) AS docs,
-- Timestamps des assets (is_asset = 1)
arrayFilter(x -> x.2 = 1, sorted_loads) AS assets
FROM cascade_raw
)
SELECT
window_start, src_ip, ja4, host,
length(docs) AS doc_count,
length(assets) AS asset_count,
-- Délai moyen premier document → premier asset (secondes)
-- Navigateur réel : 0.050.2s ; Playwright : <0.01s ; Scraper : >1s ou 0
if(
length(docs) > 0 AND length(assets) > 0,
toFloat64(assets[1].1 - docs[1].1),
-1.0
) AS root_to_first_asset_delay,
-- Simultanéité des assets : écart-type des timestamps des assets
-- Navigateur : faible (batch parallèle) ; Scraper : élevé (séquentiel)
if(
length(assets) >= 2,
sqrt(arrayReduce('varPop',
arrayMap(x -> toFloat64(x.1), assets)
)),
-1.0
) AS asset_load_stddev
FROM cascade_split
WHERE length(docs) > 0 OR length(assets) > 0;
-- =============================================================================
-- view_thesis_features_1h — Vue unifiée des features avancées
--
-- Joint les 4 tables d'agrégation ci-dessus pour exposer toutes les features
-- de la thèse §5 dans une seule vue, joinable avec view_ai_features_1h.
-- =============================================================================
CREATE OR REPLACE VIEW ja4_processing.view_thesis_features_1h AS
WITH
-- ── §5.1 : Extraire et trier les séquences de chemins ────────────────────────
path_raw AS (
SELECT
window_start, src_ip, ja4, host,
groupArrayMerge(100)(path_sequence) AS raw_tuples
FROM ja4_processing.agg_path_sequences_1h
WHERE window_start >= now() - INTERVAL 24 HOUR
GROUP BY window_start, src_ip, ja4, host
),
path_entropy AS (
SELECT
window_start, src_ip, ja4, host,
-- Normaliser les chemins à profondeur 2 (/shop/product/123 → /shop/product)
arrayMap(
t -> concat('/', arrayStringConcat(
arraySlice(splitByChar('/', t.2), 2, 2), '/'
)),
arraySort(x -> x.1, raw_tuples)
) AS norm_paths,
length(raw_tuples) AS request_count
FROM path_raw
),
path_bigrams AS (
SELECT
window_start, src_ip, ja4, host,
request_count,
arrayMap(
(a, b) -> concat(a, '->', b),
arraySlice(norm_paths, 1, length(norm_paths) - 1),
arraySlice(norm_paths, 2)
) AS bigrams
FROM path_entropy
WHERE request_count >= 3
),
path_features AS (
SELECT
window_start, src_ip, ja4, host,
request_count AS path_request_count,
-- Entropie de Shannon normalisée [0, 1] des transitions de chemins
if(
length(arrayDistinct(bigrams)) > 1,
-arrayReduce('sum', arrayMap(
bg -> (toFloat64(arrayCount(x -> x = bg, bigrams)) / toFloat64(length(bigrams)))
* log2(toFloat64(arrayCount(x -> x = bg, bigrams)) / toFloat64(length(bigrams))),
arrayDistinct(bigrams)
)) / log2(toFloat64(length(arrayDistinct(bigrams)))),
0.0
) AS path_transition_entropy
FROM path_bigrams
),
-- ── §5.3 : Cadence inter-requêtes ───────────────────────────────────────────
timing_raw AS (
SELECT
window_start, src_ip, ja4, host,
arraySort(groupArrayIfMerge(500)(request_times)) AS sorted_times
FROM ja4_processing.agg_request_timing_1h
WHERE window_start >= now() - INTERVAL 24 HOUR
GROUP BY window_start, src_ip, ja4, host
),
timing_deltas AS (
SELECT
window_start, src_ip, ja4, host,
-- Intervalles en millisecondes entre requêtes consécutives
arrayMap(
(a, b) -> toFloat64(b - a) / 1000000.0,
arraySlice(sorted_times, 1, length(sorted_times) - 1),
arraySlice(sorted_times, 2)
) AS deltas_ms
FROM timing_raw
WHERE length(sorted_times) >= 3
),
cadence_features AS (
SELECT
window_start, src_ip, ja4, host,
length(deltas_ms) + 1 AS cadence_request_count,
-- Coefficient de variation : σ/μ (humain ≈ 1.53.0 ; bot ≈ 0.010.3)
if(
arrayReduce('avg', deltas_ms) > 0,
sqrt(arrayReduce('varPop', deltas_ms))
/ arrayReduce('avg', deltas_ms),
0.0
) AS cadence_cv,
-- Ratio burst/pause : fraction de Δt < 100ms (burst) vs Δt > 5000ms (pause)
if(
length(deltas_ms) > 0,
toFloat64(arrayCount(x -> x < 100.0, deltas_ms))
/ toFloat64(length(deltas_ms)),
0.0
) AS burst_ratio,
if(
length(deltas_ms) > 0,
toFloat64(arrayCount(x -> x > 5000.0, deltas_ms))
/ toFloat64(length(deltas_ms)),
0.0
) AS pause_ratio,
-- Autocorrélation lag-1 : ρ₁(Δt) — humain ≈ 0 (indépendant), bot avec jitter ≈ 0.8+
if(
length(deltas_ms) >= 4 AND arrayReduce('varPop', deltas_ms) > 1e-9,
(
arrayReduce('avg',
arrayMap(
(a, b) -> (a - arrayReduce('avg', deltas_ms)) * (b - arrayReduce('avg', deltas_ms)),
arraySlice(deltas_ms, 1, length(deltas_ms) - 1),
arraySlice(deltas_ms, 2)
)
) / arrayReduce('varPop', deltas_ms)
),
0.0
) AS lag1_autocorrelation,
-- Loi de Benford : χ² entre premiers chiffres des Δt et distribution attendue
-- Benford P(d) = log10(1 + 1/d) pour d=1..9
if(
length(deltas_ms) >= 10,
arraySum(
arrayMap(
d -> pow(
(toFloat64(arrayCount(
x -> x = d,
arrayMap(v -> toUInt8(substring(toString(toUInt64(greatest(abs(v), 1))), 1, 1)), deltas_ms)
)) / toFloat64(length(deltas_ms)))
- [0.301, 0.176, 0.125, 0.097, 0.079, 0.067, 0.058, 0.051, 0.046][d],
2
) / [0.301, 0.176, 0.125, 0.097, 0.079, 0.067, 0.058, 0.051, 0.046][d],
[1, 2, 3, 4, 5, 6, 7, 8, 9]
)
),
0.0
) AS benford_deviation
FROM timing_deltas
),
-- ── §5.5 : Dérive JA4 intra-session ────────────────────────────────────────
drift_raw AS (
SELECT
window_start, src_ip,
groupArrayMerge(200)(ja4_sequence) AS raw_ja4_tuples,
uniqMerge(host_count) AS n_hosts,
sum(total_hits) AS ip_total_hits,
min(first_seen) AS ip_first_seen,
max(last_seen) AS ip_last_seen,
sumMapMerge(host_hits_keys) AS host_hits_merged
FROM ja4_processing.agg_ip_behavior_1h
WHERE window_start >= now() - INTERVAL 24 HOUR
GROUP BY window_start, src_ip
),
drift_segments AS (
SELECT
window_start, src_ip,
n_hosts, ip_total_hits, ip_first_seen, ip_last_seen,
host_hits_merged,
-- Extraire la séquence de JA4 triée par temps
arrayMap(t -> t.2, arraySort(x -> x.1, raw_ja4_tuples)) AS ja4_seq,
-- Segmenter en fenêtres de 10 minutes : identifier le JA4 dominant par segment
-- Simplification : compter les transitions JA4 consécutives
length(raw_ja4_tuples) AS seq_len
FROM drift_raw
),
ja4_drift_features AS (
SELECT
window_start, src_ip,
n_hosts, ip_total_hits, ip_first_seen, ip_last_seen,
host_hits_merged,
-- Drift ratio = transitions consécutives / (len - 1)
-- Transition = ja4_seq[i] != ja4_seq[i-1]
if(
length(ja4_seq) > 1,
toFloat64(arrayCount(
(a, b) -> a != b,
arraySlice(ja4_seq, 1, length(ja4_seq) - 1),
arraySlice(ja4_seq, 2)
)) / toFloat64(length(ja4_seq) - 1),
0.0
) AS ja4_drift_ratio,
length(arrayDistinct(ja4_seq)) AS ja4_distinct_in_session
FROM drift_segments
WHERE seq_len >= 2
),
-- ── §5.8 : Cross-Domain Session Linking ──────────────────────────────────────
cross_domain_features AS (
SELECT
window_start, src_ip,
ja4_drift_ratio,
ja4_distinct_in_session,
-- Host diversity : nombre de hosts distincts visités
n_hosts AS host_diversity,
-- Host sweep speed : hosts / durée en secondes
if(
dateDiff('second', ip_first_seen, ip_last_seen) > 0,
toFloat64(n_hosts) / toFloat64(dateDiff('second', ip_first_seen, ip_last_seen)),
0.0
) AS host_sweep_speed,
-- Host coverage uniformity : 1 - σ(hits_per_host) / μ(hits_per_host)
-- Valeur proche de 1 = distribution uniforme (scanner)
-- Valeur proche de 0 = concentré sur 1-2 hosts (humain)
if(
length(host_hits_merged.2) > 1
AND arrayReduce('avg', arrayMap(x -> toFloat64(x), host_hits_merged.2)) > 0,
1.0 - least(1.0,
sqrt(arrayReduce('varPop', arrayMap(x -> toFloat64(x), host_hits_merged.2)))
/ arrayReduce('avg', arrayMap(x -> toFloat64(x), host_hits_merged.2))
),
0.0
) AS host_coverage_uniformity
FROM ja4_drift_features
),
-- ── §5.8b : Similarité Jaccard cross-domaine ────────────────────────────────
-- Principe : un scanner visite les mêmes chemins (/admin, /wp-login.php, /.env)
-- sur plusieurs hosts distincts. Le coefficient de Jaccard mesure la proportion
-- de chemins partagés entre hosts.
-- Signal élevé (>0.5) = même liste de chemins sur plusieurs sites → scanning systématique.
jaccard_paths AS (
SELECT
toStartOfHour(time) AS window_start,
toIPv6(src_ip) AS src_ip,
-- Fraction de chemins normalisés apparaissant sur ≥2 hosts distincts
toFloat64(countIf(distinct_hosts >= 2)) / greatest(toFloat64(count()), 1.0)
AS cross_domain_path_similarity
FROM (
SELECT
toStartOfHour(time) AS time,
src_ip,
-- Normaliser le chemin à profondeur 2 (ignorer les paramètres de query)
arrayStringConcat(
arraySlice(
splitByChar('/', replaceRegexpAll(path, '\\?.*', '')),
1, 3
),
'/'
) AS path_norm,
uniqExact(host) AS distinct_hosts
FROM ja4_logs.http_logs
WHERE time >= now() - INTERVAL 24 HOUR
GROUP BY time, src_ip, path_norm
HAVING distinct_hosts >= 1
)
GROUP BY window_start, src_ip
)
-- ── Jointure finale : features §5.1/§5.3 par (window, ip, ja4, host)
-- enrichies des features §5.5/§5.8 par (window, ip)
-- et des features §5.4 Resource Cascade par (window, ip, ja4, host)
SELECT
p.window_start,
p.src_ip,
p.ja4,
p.host,
-- §5.1 Path Sequence Entropy
p.path_transition_entropy,
p.path_request_count,
-- §5.3 Request Cadence Fingerprint
c.cadence_cv,
c.burst_ratio,
c.pause_ratio,
c.lag1_autocorrelation,
c.benford_deviation,
c.cadence_request_count,
-- §5.4 Resource Dependency Tree
coalesce(rc.doc_count, 0) AS doc_count,
coalesce(rc.asset_count, 0) AS asset_count,
coalesce(rc.root_to_first_asset_delay, -1.0) AS root_to_first_asset_delay,
coalesce(rc.asset_load_stddev, -1.0) AS asset_load_stddev,
-- §5.5 Intra-Session JA4 Drift
d.ja4_drift_ratio,
d.ja4_distinct_in_session,
-- §5.8 Cross-Domain Session Linking
d.host_diversity,
d.host_sweep_speed,
d.host_coverage_uniformity,
-- §5.8b Jaccard cross-domaine (proportion de chemins partagés entre hosts)
coalesce(jp.cross_domain_path_similarity, 0.0) AS cross_domain_path_similarity
FROM path_features p
LEFT JOIN cadence_features c
ON p.window_start = c.window_start
AND p.src_ip = c.src_ip
AND p.ja4 = c.ja4
AND p.host = c.host
LEFT JOIN cross_domain_features d
ON p.window_start = d.window_start
AND p.src_ip = d.src_ip
LEFT JOIN jaccard_paths jp
ON p.window_start = jp.window_start
AND p.src_ip = jp.src_ip
LEFT JOIN ja4_processing.view_resource_cascade_1h rc
ON p.window_start = rc.window_start
AND p.src_ip = rc.src_ip
AND p.ja4 = rc.ja4
AND p.host = rc.host;
-- =============================================================================
-- §5.2 — Graphe bipartite JA4×ASN (Bipartite Bot Fleet Detection)
--
-- IMPOSSIBLE EN MV PURE : nécessite un algorithme de détection de communautés
-- (Louvain / Label Propagation) sur un graphe bipartite, ce qui dépasse les
-- capacités du SQL analytique.
--
-- PLAN D'IMPLÉMENTATION FUTURE (Python) :
-- 1. Requête : SELECT ja4, toString(src_asn), count(DISTINCT src_ip) AS edge_weight
-- FROM ja4_processing.agg_host_ip_ja4_1h
-- WHERE window_start >= now() - INTERVAL 24 HOUR
-- GROUP BY ja4, src_asn HAVING edge_weight >= 3
-- 2. Construction du graphe bipartite G = (JA4 ASN, E) avec networkx
-- 3. Projection sur les JA4 : G_ja4 = bipartite.weighted_projected_graph(G, ja4_nodes)
-- 4. Détection de communautés : communities = community.louvain_communities(G_ja4)
-- 5. Pour chaque communauté :
-- fleet_score = len(community) * nx.density(G.subgraph(community)) / log(n_asn + 1)
-- 6. Écriture dans une table ja4_processing.fleet_detection_results
--
-- Dépendances : networkx >= 3.0, python-louvain
-- =============================================================================
-- =============================================================================
-- §5.6 — Corrélation DNS passive (DNS Shadow Analysis)
--
-- IMPOSSIBLE ACTUELLEMENT : ja4sentinel ne capture pas les paquets DNS (UDP/53).
--
-- PLAN D'IMPLÉMENTATION FUTURE :
-- 1. Étendre ja4sentinel (capture.go) :
-- - Ajouter un BPF filter pour UDP port 53
-- - Parser les réponses DNS (paquet → query_name, response_ip, ttl)
-- - Émettre un nouveau type d'événement : dns_event
-- 2. Nouvelle table ClickHouse : ja4_logs.dns_logs (time, src_ip, query_name,
-- response_ip, dns_ttl, query_type)
-- 3. MV d'agrégation : agg_dns_http_correlation_1h
-- - Jointure dns_logs × http_logs par (src_ip, host ≈ query_name)
-- - Feature : dns_shadow_ratio = count(http) / nullif(count(dns), 0)
-- 4. Ajout à view_thesis_features_1h
--
-- Effort estimé : modification de ja4sentinel (Go) + nouveau pipeline de corrélation
-- =============================================================================
-- =============================================================================
-- §5.7 — Invariant de ratio de compression (Compression Ratio Invariant)
--
-- IMPOSSIBLE ACTUELLEMENT : mod_reqin_log ne capture pas les tailles de réponse
-- pré/post-compression.
--
-- PLAN D'IMPLÉMENTATION FUTURE :
-- 1. Étendre mod_reqin_log (mod_reqin_log.c) :
-- - Capturer r->bytes_sent (taille compressée envoyée)
-- - Capturer la taille non-compressée via output filter ou r->clength
-- - Ajouter response_bytes_compressed, response_bytes_raw au JSON
-- 2. Propager dans http_logs : 2 nouvelles colonnes UInt32
-- 3. Features calculables :
-- - compression_ratio = response_bytes_compressed / response_bytes_raw
-- - compression_ratio_variance = varPop(compression_ratio) par session
-- - Un bot qui ne décompresse pas = ratio constant indépendant du contenu
-- 4. Ajout à l'agrégation existante ou nouvelle table
--
-- Effort estimé : modification C de mod_reqin_log + extension du schéma
-- =============================================================================