#!/usr/bin/env python3
"""
verify_mvs.py — Verify that the ClickHouse materialized views are populated.

Assertions performed:
  1. http_logs_raw         : row count >= 5,000 (mod_reqin_log + logcorrelator work)
  2. http_logs             : row count >= 5,000 (MV mv_http_logs works)
  3. agg_host_ip_ja4_1h    : row count > 0 (MV mv_agg_host_ip_ja4_1h works)
  4. view_ai_features_1h   : row count > 0 (view fed by agg_host_ip_ja4_1h)
  5. ml_all_scores         : row count > 0 (bot_detector has run)
  6. ml_detected_anomalies : query succeeds (table is accessible)

Additional sections check the secondary views (informational only), the
thesis §5 aggregation tables and the thesis feature views.

Exit codes:
  0 = all tests pass
  1 = at least one test fails
"""

import math
import operator
import os
import sys
import time
from typing import Optional, Tuple

import clickhouse_connect

# --------------------------------------------------------------------------
# ClickHouse connection settings
# --------------------------------------------------------------------------

CLICKHOUSE_HOST = os.getenv("CLICKHOUSE_HOST", "clickhouse")
CLICKHOUSE_PORT = int(os.getenv("CLICKHOUSE_PORT", "8123"))
CLICKHOUSE_DB = os.getenv("CLICKHOUSE_DB", "ja4_processing")
CLICKHOUSE_USER = os.getenv("CLICKHOUSE_USER", "default")
CLICKHOUSE_PASS = os.getenv("CLICKHOUSE_PASSWORD", "")

# --------------------------------------------------------------------------
# Helpers
# --------------------------------------------------------------------------

PASS = "\033[92m✓\033[0m"
FAIL = "\033[91m✗\033[0m"
WARN = "\033[93m?\033[0m"

# Minimum number of rows expected in the raw and parsed HTTP log tables.
MIN_LOG_ROWS = 5000

# Comparison operators accepted by _check().
_COMPARATORS = {
    ">=": operator.ge,
    ">": operator.gt,
    "==": operator.eq,
}


def _count(client, query: str) -> int:
    """Run a single-value aggregate query and return its result as an int.

    A NULL value or an empty result set is reported as 0, so aggregates such
    as ``sum()`` do not raise when they return NULL.
    """
    rows = client.query(query).result_rows
    value = rows[0][0] if rows else None
    return 0 if value is None else int(value)


def _check(label: str, actual: int, op: str, expected: int) -> bool:
    """Evaluate ``actual <op> expected``, print the outcome and return it.

    Raises:
        ValueError: if ``op`` is not one of ``>=``, ``>`` or ``==``. A typo in
            the operator must not be reported as a data failure.
    """
    try:
        compare = _COMPARATORS[op]
    except KeyError:
        raise ValueError(f"unsupported comparison operator: {op!r}") from None
    ok = compare(actual, expected)
    symbol = PASS if ok else FAIL
    print(f" {symbol} {label:40s} {actual:>8,d} (attendu {op} {expected:,})")
    return ok


def _check_query(client, label: str, query: str, op: str, expected: int) -> bool:
    """Run a count query and check it; a failing query is a failed check.

    Catching the query error here lets the remaining checks still run and be
    reported, instead of aborting on the first missing table.
    """
    try:
        actual = _count(client, query)
    except Exception as exc:  # top-level verifier: report and keep going
        print(f" {FAIL} {label:40s} ERREUR : {exc}")
        return False
    return _check(label, actual, op, expected)


def _run_count_checks(client, checks) -> int:
    """Run ``(label, query, op, expected)`` checks; return how many failed."""
    return sum(
        0 if _check_query(client, label, query, op, expected) else 1
        for label, query, op, expected in checks
    )


def _as_float(value) -> Optional[float]:
    """Convert an aggregate value to float, mapping NULL and NaN to None."""
    if value is None:
        return None
    number = float(value)
    return None if math.isnan(number) else number


def _avg_row(client, query: str, width: int) -> Tuple[Optional[float], ...]:
    """Return the first ``width`` columns of the first result row as floats.

    An aggregate without GROUP BY normally yields one row even when no input
    row matched; ``avg()`` is then NaN (or NULL for Nullable columns). Those
    values, like a missing row, are returned as None, meaning "no data".
    """
    rows = client.query(query).result_rows
    if not rows:
        return (None,) * width
    return tuple(_as_float(value) for value in rows[0][:width])


def _wait_for_clickhouse(host: str, port: int, retries: int = 30) -> "clickhouse_connect.driver.client.Client":
    """Wait until ClickHouse is reachable and return a connected client.

    Makes up to ``retries`` attempts, sleeping 3 seconds between them.

    Raises:
        ValueError: if ``retries`` is smaller than 1.
        Exception: the error of the last attempt once all attempts failed.
    """
    if retries < 1:
        raise ValueError("retries must be >= 1")
    for attempt in range(retries):
        client = None
        try:
            client = clickhouse_connect.get_client(
                host=host,
                port=port,
                database=CLICKHOUSE_DB,
                username=CLICKHOUSE_USER,
                password=CLICKHOUSE_PASS,
                connect_timeout=3,
            )
            # ping() reports failure through its return value, not by raising.
            if not client.ping():
                raise ConnectionError("ping ClickHouse sans réponse")
            return client
        except Exception as exc:
            if client is not None:
                client.close()  # do not leak the client of a failed attempt
            if attempt >= retries - 1:
                raise
            print(f" [verifier] ClickHouse non prêt ({exc}), tentative {attempt + 1}/{retries}...")
            time.sleep(3)


def _check_pipeline(client) -> int:
    """Run sections 1 to 6 (ingestion, main MVs, bot_detector output).

    Returns the number of failed checks.
    """
    failures = 0

    # ------------------------------------------------------------------
    # 1. Raw data (mod_reqin_log -> logcorrelator ingestion)
    # ------------------------------------------------------------------
    print("── 1. Ingestion des logs HTTP ──────────────────────────────")
    failures += _run_count_checks(client, [
        ("http_logs_raw count",
         f"SELECT count(*) FROM {CLICKHOUSE_DB}.http_logs_raw",
         ">=", MIN_LOG_ROWS),
    ])

    # ------------------------------------------------------------------
    # 2. MV mv_http_logs — JSON parsing into http_logs
    # ------------------------------------------------------------------
    print("\n── 2. Vue matérialisée mv_http_logs ────────────────────────")
    failures += _run_count_checks(client, [
        ("http_logs count",
         f"SELECT count(*) FROM {CLICKHOUSE_DB}.http_logs",
         ">=", MIN_LOG_ROWS),
        # Parsed fields must not be empty.
        ("http_logs rows with method",
         f"SELECT count(*) FROM {CLICKHOUSE_DB}.http_logs WHERE method != ''",
         ">=", MIN_LOG_ROWS),
        ("http_logs rows with src_ip",
         f"SELECT count(*) FROM {CLICKHOUSE_DB}.http_logs WHERE src_ip != toIPv4('0.0.0.0')",
         ">=", MIN_LOG_ROWS),
    ])

    # ------------------------------------------------------------------
    # 3. MV mv_agg_host_ip_ja4_1h — behavioural aggregation
    # ------------------------------------------------------------------
    print("\n── 3. Vue matérialisée mv_agg_host_ip_ja4_1h ───────────────")
    failures += _run_count_checks(client, [
        ("agg_host_ip_ja4_1h count",
         f"SELECT count(*) FROM {CLICKHOUSE_DB}.agg_host_ip_ja4_1h",
         ">", 0),
        # The aggregated hits must account for the ingested requests.
        ("agg_host_ip_ja4_1h total hits",
         f"SELECT sum(hits) FROM {CLICKHOUSE_DB}.agg_host_ip_ja4_1h",
         ">=", MIN_LOG_ROWS),
    ])

    # ------------------------------------------------------------------
    # 4. View view_ai_features_1h — bot_detector input
    # ------------------------------------------------------------------
    print("\n── 4. Vue view_ai_features_1h ──────────────────────────────")
    failures += _run_count_checks(client, [
        ("view_ai_features_1h count",
         f"SELECT count(*) FROM {CLICKHOUSE_DB}.view_ai_features_1h",
         ">", 0),
    ])

    # ------------------------------------------------------------------
    # 5. bot_detector results — ml_all_scores
    # ------------------------------------------------------------------
    print("\n── 5. Résultats bot_detector (ml_all_scores) ───────────────")
    failures += _run_count_checks(client, [
        ("ml_all_scores count",
         f"SELECT count(*) FROM {CLICKHOUSE_DB}.ml_all_scores",
         ">", 0),
    ])

    # Score distribution: informational only, never counted as a failure.
    print("\n Distribution des threat_level :")
    try:
        result = client.query(
            f"SELECT threat_level, count(*) as n "
            f"FROM {CLICKHOUSE_DB}.ml_all_scores "
            f"GROUP BY threat_level ORDER BY n DESC"
        )
        for row in result.result_rows:
            # str() keeps the output identical for strings and avoids a
            # format error on NULL or non-string threat levels.
            print(f" {str(row[0]):20s} : {row[1]:,}")
    except Exception as exc:
        print(f" {WARN} {'threat_level':40s} ERREUR : {exc}")

    # ------------------------------------------------------------------
    # 6. Table ml_detected_anomalies — detected anomalies
    # ------------------------------------------------------------------
    print("\n── 6. Anomalies détectées (ml_detected_anomalies) ──────────")
    # The count may be 0; only an inaccessible table fails this check.
    failures += _run_count_checks(client, [
        ("ml_detected_anomalies count",
         f"SELECT count(*) FROM {CLICKHOUSE_DB}.ml_detected_anomalies",
         ">=", 0),
    ])
    print(" (peut être 0 si aucune session ne dépasse le seuil d'anomalie)")

    return failures


def _report_secondary_views(client) -> None:
    """Print row counts of the secondary views (section 7).

    Informational only: query errors are shown as warnings, not failures.
    """
    print("\n── 7. Vues/tables secondaires ──────────────────────────────")
    for view in ["agg_header_fingerprint_1h", "view_ip_recurrence",
                 "view_form_bruteforce_detected", "view_host_ip_ja4_rotation"]:
        try:
            n = _count(client, f"SELECT count(*) FROM {CLICKHOUSE_DB}.{view}")
            print(f" {PASS} {view:40s} {n:>8,d} lignes")
        except Exception as exc:
            print(f" {WARN} {view:40s} ERREUR : {exc}")


def _check_thesis_tables(client) -> int:
    """Check that each thesis §5 aggregation table has rows (section 8).

    Returns the number of failed checks.
    """
    print("\n── 8. Tables d'agrégation thèse §5 ─────────────────────────")
    thesis_tables = [
        ("agg_path_sequences_1h", "§5.1 Path Sequence Entropy"),
        ("agg_request_timing_1h", "§5.3 Request Cadence"),
        ("agg_ip_behavior_1h", "§5.5/§5.8 JA4 Drift + Cross-Domain"),
        ("agg_resource_cascade_1h", "§5.4 Resource Dependency Tree"),
    ]
    failures = 0
    for table, desc in thesis_tables:
        try:
            n = _count(client, f"SELECT count(*) FROM {CLICKHOUSE_DB}.{table}")
        except Exception as exc:
            print(f" {FAIL} {table:40s} ERREUR : {exc}")
            failures += 1
            continue
        if not _check(f"{table} ({desc})", n, ">", 0):
            failures += 1
    return failures


def _check_thesis_features(client) -> int:
    """Check view_thesis_features_1h and its feature ranges (section 9).

    Returns the number of failed checks. Any query error counts as one
    failure and ends the section.
    """
    print("\n── 9. Vue view_thesis_features_1h (thèse §5) ───────────────")
    failures = 0
    try:
        n = _count(client, f"SELECT count(*) FROM {CLICKHOUSE_DB}.view_thesis_features_1h")
        if not _check("view_thesis_features_1h count", n, ">", 0):
            failures += 1

        # §5.1 columns: a missing value falls back to -1, which fails the
        # range check (entropy is expected to be present).
        (avg_ent,) = _avg_row(
            client,
            f"SELECT avg(path_transition_entropy) AS avg_entropy "
            f"FROM {CLICKHOUSE_DB}.view_thesis_features_1h "
            f"WHERE path_transition_entropy >= 0",
            1,
        )
        if avg_ent is None:
            avg_ent = -1.0
        ok = 0 <= avg_ent <= 1.0
        print(f" {'✓' if ok else '✗'} §5.1 path_transition_entropy avg {avg_ent:.4f} (attendu [0, 1])")
        if not ok:
            failures += 1

        # §5.3 columns
        avg_cv, avg_burst = _avg_row(
            client,
            f"SELECT avg(cadence_cv) AS avg_cv, avg(burst_ratio) AS avg_burst "
            f"FROM {CLICKHOUSE_DB}.view_thesis_features_1h "
            f"WHERE cadence_cv IS NOT NULL",
            2,
        )
        if avg_cv is not None and avg_burst is not None:
            print(f" {PASS} §5.3 cadence_cv avg {avg_cv:.4f}")
            print(f" {PASS} §5.3 burst_ratio avg {avg_burst:.4f}")
        else:
            print(f" {WARN} §5.3 cadence features pas de données")

        # Newer §5.3 columns (lag1_autocorrelation, benford_deviation)
        avg_lag1, avg_benford = _avg_row(
            client,
            f"SELECT avg(lag1_autocorrelation) AS avg_lag1, avg(benford_deviation) AS avg_benford "
            f"FROM {CLICKHOUSE_DB}.view_thesis_features_1h "
            f"WHERE lag1_autocorrelation IS NOT NULL",
            2,
        )
        if avg_lag1 is not None and avg_benford is not None:
            ok_lag1 = -1.0 <= avg_lag1 <= 1.0
            ok_benford = avg_benford >= 0
            print(f" {'✓' if ok_lag1 else '✗'} §5.3 lag1_autocorrelation avg {avg_lag1:.4f} (attendu [-1, 1])")
            print(f" {'✓' if ok_benford else '✗'} §5.3 benford_deviation avg {avg_benford:.4f} (attendu >= 0)")
            if not ok_lag1:
                failures += 1
            if not ok_benford:
                failures += 1
        else:
            print(f" {WARN} §5.3 lag1/benford features pas de données")

        # §5.5 columns
        avg_drift, avg_hosts = _avg_row(
            client,
            f"SELECT avg(ja4_drift_ratio) AS avg_drift, "
            f" avg(host_diversity) AS avg_hosts "
            f"FROM {CLICKHOUSE_DB}.view_thesis_features_1h "
            f"WHERE ja4_drift_ratio IS NOT NULL",
            2,
        )
        if avg_drift is not None and avg_hosts is not None:
            ok_drift = 0 <= avg_drift <= 1.0
            print(f" {'✓' if ok_drift else '✗'} §5.5 ja4_drift_ratio avg {avg_drift:.4f} (attendu [0, 1])")
            print(f" {PASS} §5.8 host_diversity avg {avg_hosts:.2f}")
            if not ok_drift:
                failures += 1
        else:
            print(f" {WARN} §5.5/§5.8 drift/cross-domain pas de données")
    except Exception as exc:
        print(f" {FAIL} view_thesis_features_1h ERREUR : {exc}")
        failures += 1
    return failures


def _check_resource_cascade(client) -> int:
    """Check that view_resource_cascade_1h is queryable (section 10).

    The view may be empty; only a query error counts as a failure, so the
    exit code agrees with the FAIL marker printed in that case.
    """
    print("\n── 10. Vue view_resource_cascade_1h (thèse §5.4) ───────────")
    try:
        n = _count(client, f"SELECT count(*) FROM {CLICKHOUSE_DB}.view_resource_cascade_1h")
        _check("view_resource_cascade_1h count", n, ">=", 0)
        if n > 0:
            avg_delay, avg_stddev = _avg_row(
                client,
                f"SELECT avg(root_to_first_asset_delay), avg(asset_load_stddev) "
                f"FROM {CLICKHOUSE_DB}.view_resource_cascade_1h "
                f"WHERE root_to_first_asset_delay >= 0",
                2,
            )
            if avg_delay is not None and avg_stddev is not None:
                print(f" {PASS} §5.4 root_to_first_asset_delay avg {avg_delay:.2f}s")
                print(f" {PASS} §5.4 asset_load_stddev avg {avg_stddev:.2f}s")
        else:
            print(" (peut être 0 si pas de mix document/asset dans le trafic test)")
    except Exception as exc:
        print(f" {FAIL} view_resource_cascade_1h ERREUR : {exc}")
        return 1
    return 0


def main() -> None:
    """Entry point — run every assertion and exit with 0 (pass) or 1 (fail)."""
    print()
    print("=" * 65)
    print(" VÉRIFICATION DES VUES MATÉRIALISÉES CLICKHOUSE")
    print("=" * 65)

    # Connection
    print(f"\n[verifier] Connexion à ClickHouse ({CLICKHOUSE_HOST}:{CLICKHOUSE_PORT}/{CLICKHOUSE_DB})...")
    client = _wait_for_clickhouse(CLICKHOUSE_HOST, CLICKHOUSE_PORT)
    try:
        print(f" {PASS} Connexion établie\n")
        failures = _check_pipeline(client)
        _report_secondary_views(client)
        failures += _check_thesis_tables(client)
        failures += _check_thesis_features(client)
        failures += _check_resource_cascade(client)
    finally:
        client.close()

    # ------------------------------------------------------------------
    # Summary
    # ------------------------------------------------------------------
    print()
    print("=" * 65)
    if failures == 0:
        print(f" {PASS} TOUS LES TESTS PASSENT")
    else:
        print(f" {FAIL} {failures} TEST(S) ÉCHOUÉ(S)")
    print("=" * 65)
    print()

    sys.exit(0 if failures == 0 else 1)


if __name__ == "__main__":
    main()