feat: implement thesis §5 advanced detection techniques as ClickHouse MVs

New aggregation tables + materialized views:
- agg_path_sequences_1h + MV (§5.1 Path Sequence Entropy)
- agg_request_timing_1h + MV (§5.3 Request Cadence Fingerprint)
- agg_ip_behavior_1h + MV (§5.5 JA4 Drift + §5.8 Cross-Domain)
- agg_resource_cascade_1h + MV (§5.4 Resource Dependency Tree)

New analytical views:
- view_thesis_features_1h: unified view exposing all computable features
  (path_transition_entropy, cadence_cv, burst_ratio, pause_ratio,
   ja4_drift_ratio, host_diversity, host_sweep_speed,
   host_coverage_uniformity)
- view_resource_cascade_1h: root_to_first_asset_delay, asset_load_stddev

Documented future techniques (not feasible as MV):
- §5.2 Bipartite Fleet Graph (needs Python networkx)
- §5.6 DNS Shadow Analysis (needs sentinel UDP/53 extension)
- §5.7 Compression Ratio Invariant (needs mod_reqin_log extension)

Updated: deploy_schema.sh, verify_mvs.py (sections 8-10)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
toto
2026-04-08 01:42:52 +02:00
parent 0ccd417a02
commit 6d02f21c1e
3 changed files with 635 additions and 0 deletions

View File

@ -0,0 +1,534 @@
-- =============================================================================
-- 12_thesis_features.sql — Techniques avancées de détection (Thèse §5)
--
-- Implémente les techniques originales décrites dans :
-- docs/THESIS_HTTP_Traffic_Detection.md
--
-- Chaque section crée une table d'agrégation + MV + vue analytique.
-- Les features calculées sont exposées dans view_thesis_features_1h,
-- joinable avec view_ai_features_1h sur (window_start, src_ip, ja4, host).
-- =============================================================================
-- =============================================================================
-- §5.1 — Entropie de séquence de chemins (Path Sequence Entropy)
--
-- Principe : stocker les séquences ordonnées de chemins par session et calculer
-- l'entropie de transition de Markov d'ordre 1 sur les chemins normalisés à
-- profondeur 2 (ex: /shop/product/*).
--
-- Signal :
-- - Humain : entropie élevée (transitions variées, non-déterministes)
-- - Crawler : entropie faible (transitions prévisibles, séquentielles)
-- - Scanner : entropie nulle (même chemin répété)
-- =============================================================================
CREATE TABLE IF NOT EXISTS ja4_processing.agg_path_sequences_1h
(
window_start DateTime,
src_ip IPv6,
ja4 LowCardinality(String),
host LowCardinality(String),
-- Séquences (unix_timestamp, path) — triées par timestamp à la lecture
path_sequence AggregateFunction(groupArray(100), Tuple(UInt32, String))
)
ENGINE = AggregatingMergeTree()
PARTITION BY toDate(window_start)
ORDER BY (window_start, src_ip, ja4, host)
TTL window_start + INTERVAL 7 DAY
SETTINGS ttl_only_drop_parts = 1;
DROP VIEW IF EXISTS ja4_processing.mv_agg_path_sequences_1h;
CREATE MATERIALIZED VIEW ja4_processing.mv_agg_path_sequences_1h
TO ja4_processing.agg_path_sequences_1h AS
SELECT
toStartOfHour(time) AS window_start,
toIPv6(src_ip) AS src_ip,
ja4,
host,
groupArrayState(100)(
tuple(toUInt32(toUnixTimestamp(time)), path)
) AS path_sequence
FROM ja4_logs.http_logs
GROUP BY window_start, src_ip, ja4, host;
-- =============================================================================
-- §5.3 — Fingerprinting par timing inter-requêtes (Request Cadence Fingerprint)
--
-- Principe : stocker les timestamps nanoseconde de chaque requête et calculer
-- le coefficient de variation (CV) des intervalles inter-requêtes, ainsi que
-- le ratio burst/pause.
--
-- Signal :
-- - Humain : CV ≈ 1.53.0 (intervalles irréguliers)
-- - Bot régulier : CV ≈ 0.010.3 (sleep-based, quasi-constant)
-- - Bot avec jitter : CV ≈ 0.30.8 (aléatoire mais borné)
-- =============================================================================
CREATE TABLE IF NOT EXISTS ja4_processing.agg_request_timing_1h
(
window_start DateTime,
src_ip IPv6,
ja4 LowCardinality(String),
host LowCardinality(String),
-- Timestamps nanoseconde (a_timestamp de mod_reqin_log)
request_times AggregateFunction(groupArray(500), UInt64)
)
ENGINE = AggregatingMergeTree()
PARTITION BY toDate(window_start)
ORDER BY (window_start, src_ip, ja4, host)
TTL window_start + INTERVAL 7 DAY
SETTINGS ttl_only_drop_parts = 1;
DROP VIEW IF EXISTS ja4_processing.mv_agg_request_timing_1h;
CREATE MATERIALIZED VIEW ja4_processing.mv_agg_request_timing_1h
TO ja4_processing.agg_request_timing_1h AS
SELECT
toStartOfHour(time) AS window_start,
toIPv6(src_ip) AS src_ip,
ja4,
host,
-- a_timestamp = nanoseconde depuis epoch (mod_reqin_log)
-- Filtre les orphelins B-only (a_timestamp = 0)
groupArrayIfState(500)(a_timestamp, a_timestamp > 0) AS request_times
FROM ja4_logs.http_logs
GROUP BY window_start, src_ip, ja4, host;
-- =============================================================================
-- §5.5 — Dérive de fingerprint TLS intra-session (Intra-Session JA4 Drift)
-- §5.8 — Empreinte comportementale cross-domaine (Cross-Domain Session Linking)
--
-- Ces deux techniques nécessitent une agrégation par (window, src_ip) sans
-- décomposition par ja4/host.
-- §5.5 : séquence temporelle de JA4 par IP → drift_ratio
-- §5.8 : distribution des hits par host → host_diversity, sweep_speed,
-- coverage_uniformity
-- =============================================================================
CREATE TABLE IF NOT EXISTS ja4_processing.agg_ip_behavior_1h
(
window_start DateTime,
src_ip IPv6,
-- §5.5 : séquences (unix_timestamp, ja4) pour détection de drift
ja4_sequence AggregateFunction(groupArray(200), Tuple(UInt32, String)),
-- §5.8 : distribution des hits par host (sumMap agrège par clé)
host_hits_keys AggregateFunction(sumMap, Array(String), Array(UInt64)),
-- §5.8 : nombre de hosts distincts
host_count AggregateFunction(uniq, String),
-- §5.8 : métriques temporelles
total_hits SimpleAggregateFunction(sum, UInt64),
first_seen SimpleAggregateFunction(min, DateTime),
last_seen SimpleAggregateFunction(max, DateTime)
)
ENGINE = AggregatingMergeTree()
PARTITION BY toDate(window_start)
ORDER BY (window_start, src_ip)
TTL window_start + INTERVAL 7 DAY
SETTINGS ttl_only_drop_parts = 1;
DROP VIEW IF EXISTS ja4_processing.mv_agg_ip_behavior_1h;
CREATE MATERIALIZED VIEW ja4_processing.mv_agg_ip_behavior_1h
TO ja4_processing.agg_ip_behavior_1h AS
SELECT
toStartOfHour(time) AS window_start,
toIPv6(src_ip) AS src_ip,
groupArrayState(200)(
tuple(toUInt32(toUnixTimestamp(time)), ja4)
) AS ja4_sequence,
sumMapState([toString(host)], [toUInt64(1)]) AS host_hits_keys,
uniqState(toString(host)) AS host_count,
count() AS total_hits,
min(time) AS first_seen,
max(time) AS last_seen
FROM ja4_logs.http_logs
GROUP BY window_start, src_ip;
-- =============================================================================
-- §5.4 — Détection de navigation synthétique (Resource Dependency Tree)
--
-- Principe : stocker les tuples (timestamp, is_asset) par session pour
-- mesurer le délai HTML→premier asset et la simultanéité des assets.
--
-- Signal :
-- - Navigateur réel : cascade naturelle (50200ms HTML→CSS→JS)
-- - Playwright : chargement quasi-simultané (<10ms)
-- - Scraper avec assets : séquentiel sans cascade hiérarchique
-- =============================================================================
CREATE TABLE IF NOT EXISTS ja4_processing.agg_resource_cascade_1h
(
window_start DateTime,
src_ip IPv6,
ja4 LowCardinality(String),
host LowCardinality(String),
-- Tuples (unix_timestamp, is_asset) pour analyse de cascade
resource_loads AggregateFunction(groupArray(200), Tuple(UInt32, UInt8))
)
ENGINE = AggregatingMergeTree()
PARTITION BY toDate(window_start)
ORDER BY (window_start, src_ip, ja4, host)
TTL window_start + INTERVAL 7 DAY
SETTINGS ttl_only_drop_parts = 1;
DROP VIEW IF EXISTS ja4_processing.mv_agg_resource_cascade_1h;
CREATE MATERIALIZED VIEW ja4_processing.mv_agg_resource_cascade_1h
TO ja4_processing.agg_resource_cascade_1h AS
SELECT
toStartOfHour(time) AS window_start,
toIPv6(src_ip) AS src_ip,
ja4,
host,
groupArrayState(200)(
tuple(
toUInt32(toUnixTimestamp(time)),
-- Classification : 1 = asset statique, 0 = document/API
toUInt8(match(path, '(?i)\\.(css|js|png|jpg|jpeg|gif|svg|ico|woff2?|ttf|eot|webp|avif)$'))
)
) AS resource_loads
FROM ja4_logs.http_logs
GROUP BY window_start, src_ip, ja4, host;
-- =============================================================================
-- view_thesis_features_1h — Vue unifiée des features avancées
--
-- Joint les 4 tables d'agrégation ci-dessus pour exposer toutes les features
-- de la thèse §5 dans une seule vue, joinable avec view_ai_features_1h.
-- =============================================================================
CREATE OR REPLACE VIEW ja4_processing.view_thesis_features_1h AS
WITH
-- ── §5.1 : Extraire et trier les séquences de chemins ────────────────────────
path_raw AS (
SELECT
window_start, src_ip, ja4, host,
groupArrayMerge(100)(path_sequence) AS raw_tuples
FROM ja4_processing.agg_path_sequences_1h
WHERE window_start >= now() - INTERVAL 24 HOUR
GROUP BY window_start, src_ip, ja4, host
),
path_entropy AS (
SELECT
window_start, src_ip, ja4, host,
-- Normaliser les chemins à profondeur 2 (/shop/product/123 → /shop/product)
arrayMap(
t -> concat('/', arrayStringConcat(
arraySlice(splitByChar('/', t.2), 2, 2), '/'
)),
arraySort(x -> x.1, raw_tuples)
) AS norm_paths,
length(raw_tuples) AS request_count
FROM path_raw
),
path_bigrams AS (
SELECT
window_start, src_ip, ja4, host,
request_count,
arrayMap(
(a, b) -> concat(a, '->', b),
arraySlice(norm_paths, 1, length(norm_paths) - 1),
arraySlice(norm_paths, 2)
) AS bigrams
FROM path_entropy
WHERE request_count >= 3
),
path_features AS (
SELECT
window_start, src_ip, ja4, host,
request_count AS path_request_count,
-- Entropie de Shannon normalisée [0, 1] des transitions de chemins
if(
length(arrayDistinct(bigrams)) > 1,
-arrayReduce('sum', arrayMap(
bg -> (toFloat64(arrayCount(x -> x = bg, bigrams)) / toFloat64(length(bigrams)))
* log2(toFloat64(arrayCount(x -> x = bg, bigrams)) / toFloat64(length(bigrams))),
arrayDistinct(bigrams)
)) / log2(toFloat64(length(arrayDistinct(bigrams)))),
0.0
) AS path_transition_entropy
FROM path_bigrams
),
-- ── §5.3 : Cadence inter-requêtes ───────────────────────────────────────────
timing_raw AS (
SELECT
window_start, src_ip, ja4, host,
arraySort(groupArrayIfMerge(500)(request_times)) AS sorted_times
FROM ja4_processing.agg_request_timing_1h
WHERE window_start >= now() - INTERVAL 24 HOUR
GROUP BY window_start, src_ip, ja4, host
),
timing_deltas AS (
SELECT
window_start, src_ip, ja4, host,
-- Intervalles en millisecondes entre requêtes consécutives
arrayMap(
(a, b) -> toFloat64(b - a) / 1000000.0,
arraySlice(sorted_times, 1, length(sorted_times) - 1),
arraySlice(sorted_times, 2)
) AS deltas_ms
FROM timing_raw
WHERE length(sorted_times) >= 3
),
cadence_features AS (
SELECT
window_start, src_ip, ja4, host,
length(deltas_ms) + 1 AS cadence_request_count,
-- Coefficient de variation : σ/μ (humain ≈ 1.53.0 ; bot ≈ 0.010.3)
if(
arrayReduce('avg', deltas_ms) > 0,
sqrt(arrayReduce('varPop', deltas_ms))
/ arrayReduce('avg', deltas_ms),
0.0
) AS cadence_cv,
-- Ratio burst/pause : fraction de Δt < 100ms (burst) vs Δt > 5000ms (pause)
if(
length(deltas_ms) > 0,
toFloat64(arrayCount(x -> x < 100.0, deltas_ms))
/ toFloat64(length(deltas_ms)),
0.0
) AS burst_ratio,
if(
length(deltas_ms) > 0,
toFloat64(arrayCount(x -> x > 5000.0, deltas_ms))
/ toFloat64(length(deltas_ms)),
0.0
) AS pause_ratio
FROM timing_deltas
),
-- ── §5.5 : Dérive JA4 intra-session ────────────────────────────────────────
drift_raw AS (
SELECT
window_start, src_ip,
groupArrayMerge(200)(ja4_sequence) AS raw_ja4_tuples,
uniqMerge(host_count) AS n_hosts,
sum(total_hits) AS ip_total_hits,
min(first_seen) AS ip_first_seen,
max(last_seen) AS ip_last_seen,
sumMapMerge(host_hits_keys) AS host_hits_merged
FROM ja4_processing.agg_ip_behavior_1h
WHERE window_start >= now() - INTERVAL 24 HOUR
GROUP BY window_start, src_ip
),
drift_segments AS (
SELECT
window_start, src_ip,
n_hosts, ip_total_hits, ip_first_seen, ip_last_seen,
host_hits_merged,
-- Extraire la séquence de JA4 triée par temps
arrayMap(t -> t.2, arraySort(x -> x.1, raw_ja4_tuples)) AS ja4_seq,
-- Segmenter en fenêtres de 10 minutes : identifier le JA4 dominant par segment
-- Simplification : compter les transitions JA4 consécutives
length(raw_ja4_tuples) AS seq_len
FROM drift_raw
),
ja4_drift_features AS (
SELECT
window_start, src_ip,
n_hosts, ip_total_hits, ip_first_seen, ip_last_seen,
host_hits_merged,
-- Drift ratio = transitions consécutives / (len - 1)
-- Transition = ja4_seq[i] != ja4_seq[i-1]
if(
length(ja4_seq) > 1,
toFloat64(arrayCount(
(a, b) -> a != b,
arraySlice(ja4_seq, 1, length(ja4_seq) - 1),
arraySlice(ja4_seq, 2)
)) / toFloat64(length(ja4_seq) - 1),
0.0
) AS ja4_drift_ratio,
length(arrayDistinct(ja4_seq)) AS ja4_distinct_in_session
FROM drift_segments
WHERE seq_len >= 2
),
-- ── §5.8 : Cross-Domain Session Linking ──────────────────────────────────────
cross_domain_features AS (
SELECT
window_start, src_ip,
ja4_drift_ratio,
ja4_distinct_in_session,
-- Host diversity : nombre de hosts distincts visités
n_hosts AS host_diversity,
-- Host sweep speed : hosts / durée en secondes
if(
dateDiff('second', ip_first_seen, ip_last_seen) > 0,
toFloat64(n_hosts) / toFloat64(dateDiff('second', ip_first_seen, ip_last_seen)),
0.0
) AS host_sweep_speed,
-- Host coverage uniformity : 1 - σ(hits_per_host) / μ(hits_per_host)
-- Valeur proche de 1 = distribution uniforme (scanner)
-- Valeur proche de 0 = concentré sur 1-2 hosts (humain)
if(
length(host_hits_merged.2) > 1
AND arrayReduce('avg', arrayMap(x -> toFloat64(x), host_hits_merged.2)) > 0,
1.0 - least(1.0,
sqrt(arrayReduce('varPop', arrayMap(x -> toFloat64(x), host_hits_merged.2)))
/ arrayReduce('avg', arrayMap(x -> toFloat64(x), host_hits_merged.2))
),
0.0
) AS host_coverage_uniformity
FROM ja4_drift_features
)
-- ── Jointure finale : features §5.1/§5.3 par (window, ip, ja4, host)
-- enrichies des features §5.5/§5.8 par (window, ip)
SELECT
p.window_start,
p.src_ip,
p.ja4,
p.host,
-- §5.1 Path Sequence Entropy
p.path_transition_entropy,
p.path_request_count,
-- §5.3 Request Cadence Fingerprint
c.cadence_cv,
c.burst_ratio,
c.pause_ratio,
c.cadence_request_count,
-- §5.5 Intra-Session JA4 Drift
d.ja4_drift_ratio,
d.ja4_distinct_in_session,
-- §5.8 Cross-Domain Session Linking
d.host_diversity,
d.host_sweep_speed,
d.host_coverage_uniformity
FROM path_features p
LEFT JOIN cadence_features c
ON p.window_start = c.window_start
AND p.src_ip = c.src_ip
AND p.ja4 = c.ja4
AND p.host = c.host
LEFT JOIN cross_domain_features d
ON p.window_start = d.window_start
AND p.src_ip = d.src_ip;
-- =============================================================================
-- §5.4 — Vue resource_cascade (Resource Dependency Tree)
--
-- Calcule le délai moyen entre le premier document et le premier asset,
-- et l'écart-type des timestamps des assets (simultanéité).
-- =============================================================================
CREATE OR REPLACE VIEW ja4_processing.view_resource_cascade_1h AS
WITH
cascade_raw AS (
SELECT
window_start, src_ip, ja4, host,
arraySort(x -> x.1, groupArrayMerge(200)(resource_loads)) AS sorted_loads
FROM ja4_processing.agg_resource_cascade_1h
WHERE window_start >= now() - INTERVAL 24 HOUR
GROUP BY window_start, src_ip, ja4, host
HAVING length(sorted_loads) >= 3
),
cascade_split AS (
SELECT
window_start, src_ip, ja4, host,
-- Timestamps des documents (is_asset = 0)
arrayFilter(x -> x.2 = 0, sorted_loads) AS docs,
-- Timestamps des assets (is_asset = 1)
arrayFilter(x -> x.2 = 1, sorted_loads) AS assets
FROM cascade_raw
)
SELECT
window_start, src_ip, ja4, host,
length(docs) AS doc_count,
length(assets) AS asset_count,
-- Délai moyen premier document → premier asset (secondes)
-- Navigateur réel : 0.050.2s ; Playwright : <0.01s ; Scraper : >1s ou 0
if(
length(docs) > 0 AND length(assets) > 0,
toFloat64(assets[1].1 - docs[1].1),
-1.0
) AS root_to_first_asset_delay,
-- Simultanéité des assets : écart-type des timestamps des assets
-- Navigateur : faible (batch parallèle) ; Scraper : élevé (séquentiel)
if(
length(assets) >= 2,
sqrt(arrayReduce('varPop',
arrayMap(x -> toFloat64(x.1), assets)
)),
-1.0
) AS asset_load_stddev
FROM cascade_split
WHERE length(docs) > 0 OR length(assets) > 0;
-- =============================================================================
-- §5.2 — Graphe bipartite JA4×ASN (Bipartite Bot Fleet Detection)
--
-- IMPOSSIBLE EN MV PURE : nécessite un algorithme de détection de communautés
-- (Louvain / Label Propagation) sur un graphe bipartite, ce qui dépasse les
-- capacités du SQL analytique.
--
-- PLAN D'IMPLÉMENTATION FUTURE (Python) :
-- 1. Requête : SELECT ja4, toString(src_asn), count(DISTINCT src_ip) AS edge_weight
-- FROM ja4_processing.agg_host_ip_ja4_1h
-- WHERE window_start >= now() - INTERVAL 24 HOUR
-- GROUP BY ja4, src_asn HAVING edge_weight >= 3
-- 2. Construction du graphe bipartite G = (JA4 ASN, E) avec networkx
-- 3. Projection sur les JA4 : G_ja4 = bipartite.weighted_projected_graph(G, ja4_nodes)
-- 4. Détection de communautés : communities = community.louvain_communities(G_ja4)
-- 5. Pour chaque communauté :
-- fleet_score = len(community) * nx.density(G.subgraph(community)) / log(n_asn + 1)
-- 6. Écriture dans une table ja4_processing.fleet_detection_results
--
-- Dépendances : networkx >= 3.0, python-louvain
-- =============================================================================
-- =============================================================================
-- §5.6 — Corrélation DNS passive (DNS Shadow Analysis)
--
-- IMPOSSIBLE ACTUELLEMENT : ja4sentinel ne capture pas les paquets DNS (UDP/53).
--
-- PLAN D'IMPLÉMENTATION FUTURE :
-- 1. Étendre ja4sentinel (capture.go) :
-- - Ajouter un BPF filter pour UDP port 53
-- - Parser les réponses DNS (paquet → query_name, response_ip, ttl)
-- - Émettre un nouveau type d'événement : dns_event
-- 2. Nouvelle table ClickHouse : ja4_logs.dns_logs (time, src_ip, query_name,
-- response_ip, dns_ttl, query_type)
-- 3. MV d'agrégation : agg_dns_http_correlation_1h
-- - Jointure dns_logs × http_logs par (src_ip, host ≈ query_name)
-- - Feature : dns_shadow_ratio = count(http) / nullif(count(dns), 0)
-- 4. Ajout à view_thesis_features_1h
--
-- Effort estimé : modification de ja4sentinel (Go) + nouveau pipeline de corrélation
-- =============================================================================
-- =============================================================================
-- §5.7 — Invariant de ratio de compression (Compression Ratio Invariant)
--
-- IMPOSSIBLE ACTUELLEMENT : mod_reqin_log ne capture pas les tailles de réponse
-- pré/post-compression.
--
-- PLAN D'IMPLÉMENTATION FUTURE :
-- 1. Étendre mod_reqin_log (mod_reqin_log.c) :
-- - Capturer r->bytes_sent (taille compressée envoyée)
-- - Capturer la taille non-compressée via output filter ou r->clength
-- - Ajouter response_bytes_compressed, response_bytes_raw au JSON
-- 2. Propager dans http_logs : 2 nouvelles colonnes UInt32
-- 3. Features calculables :
-- - compression_ratio = response_bytes_compressed / response_bytes_raw
-- - compression_ratio_variance = varPop(compression_ratio) par session
-- - Un bot qui ne décompresse pas = ratio constant indépendant du contenu
-- 4. Ajout à l'agrégation existante ou nouvelle table
--
-- Effort estimé : modification C de mod_reqin_log + extension du schéma
-- =============================================================================

View File

@ -47,6 +47,7 @@ SQL_FILES=(
09_audit_table.sql
10_perf_indexes.sql
11_views.sql
12_thesis_features.sql
)
for f in "${SQL_FILES[@]}"; do