Files
ja4-platform/tests/integration/verify_mvs.py
toto 51b8eb57a8 feat: port v14 schema fixes, migration, MV verifier, thesis from ja4/
deploy_views.sql (v13 → v14):
- CRITICAL: ml_detected_anomalies ORDER BY (src_ip) → (src_ip, ja4, host, model_name)
  ReplacingMergeTree was collapsing all detections to 1 row per IP on merge
- Add PARTITION BY toDate + ttl_only_drop_parts on all 4 data tables
- ml_all_scores TTL 3d → 7d; ml_detected_anomalies TTL 30d → 7d
- agg_host_ip_ja4_1h + agg_header_fingerprint_1h: add partition + TTL 7d
- view_ip_recurrence: add WHERE detected_at >= now() - 7 DAY (was full scan)
- Remove dead views: summary/timeseries/threat_dist/variability
- Add view_dashboard_entities (fixes HTTP 500 in clustering/incidents/fingerprints)
- Add view_dashboard_user_agents (fixes HTTP 500 in fingerprints/metrics)
- Add view_ai_features_24h (enables ENABLE_MULTIWINDOW in bot_detector)
- Mark max_requests_per_sec as DEPRECATED (always 0)

New files:
- correlator/sql/migrations/01_ttl_adjustments.sql: ALTER TABLE migration
- tests/integration/verify_mvs.py: MV pipeline verification assertions
- docs/THESIS_HTTP_Traffic_Detection.md: detection techniques thesis

All DB references use ja4_processing/ja4_logs (no mabase_prod).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-07 23:51:56 +02:00

196 lines
8.3 KiB
Python

#!/usr/bin/env python3
"""
verify_mvs.py — Verify that the ClickHouse materialized views are correctly populated.

Assertions performed:
  1. http_logs_raw         : row count >= 5,000 (mod_reqin_log + logcorrelator are working)
  2. http_logs             : row count >= 5,000 (MV mv_http_logs is working)
  3. agg_host_ip_ja4_1h    : row count > 0 (MV mv_agg_host_ip_ja4_1h is working)
  4. view_ai_features_1h   : row count > 0 (view fed by agg_host_ip_ja4_1h)
  5. ml_all_scores         : row count > 0 (bot_detector has run)
  6. ml_detected_anomalies : query succeeds (table is reachable)

Exit codes:
  0 = all tests pass
  1 = at least one test fails
"""
import os
import sys
import time
import clickhouse_connect
# --------------------------------------------------------------------------
# ClickHouse connection settings — each value can be overridden through the
# environment variable named in the lookup; the second argument is the default.
# --------------------------------------------------------------------------
CLICKHOUSE_HOST = os.environ.get("CLICKHOUSE_HOST", "clickhouse")
CLICKHOUSE_PORT = int(os.environ.get("CLICKHOUSE_PORT", "8123"))
CLICKHOUSE_DB = os.environ.get("CLICKHOUSE_DB", "ja4_processing")
CLICKHOUSE_USER = os.environ.get("CLICKHOUSE_USER", "default")
CLICKHOUSE_PASS = os.environ.get("CLICKHOUSE_PASSWORD", "")

# --------------------------------------------------------------------------
# Report helpers: status glyphs wrapped in ANSI color escape sequences
# --------------------------------------------------------------------------
PASS = "\033[92m✓\033[0m"
FAIL = "\033[91m✗\033[0m"
def _count(client, query: str) -> int:
    """Run a single-value aggregate query and return that value as an int.

    Despite its name, this helper is used for ``SELECT count(*)`` queries as
    well as for ``SELECT sum(...)`` queries.

    Args:
        client: Connected client exposing ``query(sql)``; the returned object
            must provide a ``result_rows`` sequence.
        query: SQL statement expected to yield one row with one numeric column.

    Returns:
        The first column of the first row converted to ``int``, or ``0`` when
        the result set is empty or the value is NULL.
    """
    rows = client.query(query).result_rows
    # An aggregate may come back as NULL (None), e.g. a sum over Nullable data
    # with no matching rows. Treat that, and an empty result set, as zero so
    # the caller reports a failed check instead of crashing on int(None) or
    # an IndexError.
    if not rows or rows[0][0] is None:
        return 0
    return int(rows[0][0])
def _check(label: str, actual: int, op: str, expected: int) -> bool:
    """Evaluate ``actual <op> expected``, print a one-line report, return the verdict.

    Args:
        label: Name of the check, printed left-aligned on 40 columns.
        actual: Measured value.
        op: Comparison operator, one of ``">="``, ``">"`` or ``"=="``.
            Any other value makes the check fail.
        expected: Reference value the measurement is compared against.

    Returns:
        ``True`` when the comparison holds, ``False`` otherwise.
    """
    comparisons = {
        ">=": lambda left, right: left >= right,
        ">": lambda left, right: left > right,
        "==": lambda left, right: left == right,
    }
    compare = comparisons.get(op)
    # An unsupported operator has no comparator and therefore never passes.
    passed = compare(actual, expected) if compare is not None else False
    marker = PASS if passed else FAIL
    print(f" {marker} {label:40s} {actual:>8,d} (attendu {op} {expected:,})")
    return passed
def _wait_for_clickhouse(host: str, port: int, retries: int = 30) -> "clickhouse_connect.driver.client.Client":
    """Wait until ClickHouse is reachable and return a connected client.

    Up to ``retries`` connection attempts are made, with a 3-second pause
    after each failed attempt except the last one.

    Args:
        host: ClickHouse host name.
        port: ClickHouse port passed to ``clickhouse_connect.get_client``.
        retries: Maximum number of attempts; must be at least 1.

    Returns:
        A client connected to ``CLICKHOUSE_DB`` using ``CLICKHOUSE_USER`` and
        ``CLICKHOUSE_PASS``.

    Raises:
        ValueError: If ``retries`` is lower than 1.
        Exception: The error raised by the last attempt once every retry has
            been used (a ``ConnectionError`` when only the ping failed).
    """
    if retries < 1:
        # Without any attempt the loop would fall through and return None.
        raise ValueError("retries doit être >= 1")
    for attempt in range(1, retries + 1):
        try:
            client = clickhouse_connect.get_client(
                host=host,
                port=port,
                database=CLICKHOUSE_DB,
                username=CLICKHOUSE_USER,
                password=CLICKHOUSE_PASS,
                connect_timeout=3,
            )
            # Per the clickhouse_connect documentation, ping() signals an
            # unreachable server through its boolean return value instead of
            # raising, so the result has to be checked explicitly.
            if not client.ping():
                raise ConnectionError("ping ClickHouse en échec")
            return client
        except Exception as exc:
            if attempt == retries:
                raise
            print(f" [verifier] ClickHouse non prêt ({exc}), tentative {attempt}/{retries}...")
            time.sleep(3)
def main() -> None:
    """Entry point: run every assertion, print a report and exit.

    Connects to ClickHouse, runs the counted checks of sections 1 to 5, then
    prints the informational sections 6 and 7, which never add to the failure
    count. The client is closed before the summary is printed, including when
    a query raises.

    Exits the process with status 0 when no counted check failed, 1 otherwise.
    """
    print()
    print("=" * 65)
    print(" VÉRIFICATION DES VUES MATÉRIALISÉES CLICKHOUSE")
    print("=" * 65)

    # Connection
    print(f"\n[verifier] Connexion à ClickHouse ({CLICKHOUSE_HOST}:{CLICKHOUSE_PORT}/{CLICKHOUSE_DB})...")
    client = _wait_for_clickhouse(CLICKHOUSE_HOST, CLICKHOUSE_PORT)
    print(f" {PASS} Connexion établie\n")

    failures = 0
    try:
        # --------------------------------------------------------------
        # 1. Raw data (ingestion mod_reqin_log → logcorrelator)
        # --------------------------------------------------------------
        print("── 1. Ingestion des logs HTTP ──────────────────────────────")
        n = _count(client, f"SELECT count(*) FROM {CLICKHOUSE_DB}.http_logs_raw")
        if not _check("http_logs_raw count", n, ">=", 5000):
            failures += 1

        # --------------------------------------------------------------
        # 2. MV mv_http_logs — JSON parsing → http_logs
        # --------------------------------------------------------------
        print("\n── 2. Vue matérialisée mv_http_logs ────────────────────────")
        n = _count(client, f"SELECT count(*) FROM {CLICKHOUSE_DB}.http_logs")
        if not _check("http_logs count", n, ">=", 5000):
            failures += 1

        # Make sure key fields are actually populated
        n_methods = _count(
            client,
            f"SELECT count(*) FROM {CLICKHOUSE_DB}.http_logs WHERE method != ''")
        if not _check("http_logs rows with method", n_methods, ">=", 5000):
            failures += 1
        n_ips = _count(
            client,
            f"SELECT count(*) FROM {CLICKHOUSE_DB}.http_logs WHERE src_ip != toIPv4('0.0.0.0')")
        if not _check("http_logs rows with src_ip", n_ips, ">=", 5000):
            failures += 1

        # --------------------------------------------------------------
        # 3. MV mv_agg_host_ip_ja4_1h — behavioral aggregation
        # --------------------------------------------------------------
        print("\n── 3. Vue matérialisée mv_agg_host_ip_ja4_1h ───────────────")
        n = _count(client, f"SELECT count(*) FROM {CLICKHOUSE_DB}.agg_host_ip_ja4_1h")
        if not _check("agg_host_ip_ja4_1h count", n, ">", 0):
            failures += 1

        # The summed hits must meet the same threshold as the log row counts
        total_hits = _count(
            client,
            f"SELECT sum(hits) FROM {CLICKHOUSE_DB}.agg_host_ip_ja4_1h")
        if not _check("agg_host_ip_ja4_1h total hits", total_hits, ">=", 5000):
            failures += 1

        # --------------------------------------------------------------
        # 4. View view_ai_features_1h — input of bot_detector
        # --------------------------------------------------------------
        print("\n── 4. Vue view_ai_features_1h ──────────────────────────────")
        n = _count(client, f"SELECT count(*) FROM {CLICKHOUSE_DB}.view_ai_features_1h")
        if not _check("view_ai_features_1h count", n, ">", 0):
            failures += 1

        # --------------------------------------------------------------
        # 5. bot_detector results — ml_all_scores
        # --------------------------------------------------------------
        print("\n── 5. Résultats bot_detector (ml_all_scores) ───────────────")
        n = _count(client, f"SELECT count(*) FROM {CLICKHOUSE_DB}.ml_all_scores")
        if not _check("ml_all_scores count", n, ">", 0):
            failures += 1

        # Score distribution (informational only)
        result = client.query(
            f"SELECT threat_level, count(*) as n "
            f"FROM {CLICKHOUSE_DB}.ml_all_scores "
            f"GROUP BY threat_level ORDER BY n DESC"
        )
        print("\n Distribution des threat_level :")
        for row in result.result_rows:
            print(f" {row[0]:20s} : {row[1]:,}")

        # --------------------------------------------------------------
        # 6. Table ml_detected_anomalies — detected anomalies
        # --------------------------------------------------------------
        print("\n── 6. Anomalies détectées (ml_detected_anomalies) ──────────")
        n = _count(client, f"SELECT count(*) FROM {CLICKHOUSE_DB}.ml_detected_anomalies")
        # Result deliberately ignored: ">= 0" always holds, the point is only
        # that the query succeeds (the count may be 0 below the threshold).
        _check("ml_detected_anomalies count", n, ">=", 0)
        print(" (peut être 0 si aucune session ne dépasse le seuil d'anomalie)")

        # --------------------------------------------------------------
        # 7. Secondary views/tables — reported, never counted as failures
        # --------------------------------------------------------------
        print("\n── 7. Vues/tables secondaires ──────────────────────────────")
        for view in ["agg_header_fingerprint_1h", "view_ip_recurrence",
                     "view_form_bruteforce_detected", "view_host_ip_ja4_rotation"]:
            try:
                n = _count(client, f"SELECT count(*) FROM {CLICKHOUSE_DB}.{view}")
                print(f" {PASS} {view:40s} {n:>8,d} lignes")
            except Exception as exc:
                # Best effort: a failing secondary view is only flagged with "?"
                print(f" \033[93m?\033[0m {view:40s} ERREUR : {exc}")
    finally:
        # Release the connection whether the checks completed or a query raised.
        client.close()

    # ------------------------------------------------------------------
    # Summary
    # ------------------------------------------------------------------
    print()
    print("=" * 65)
    if failures == 0:
        print(f" {PASS} TOUS LES TESTS PASSENT")
    else:
        print(f" {FAIL} {failures} TEST(S) ÉCHOUÉ(S)")
    print("=" * 65)
    print()
    sys.exit(0 if failures == 0 else 1)
# Run the verification only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()