#!/usr/bin/env python3
"""
verify_db.py — Exhaustive check of the data stored in ClickHouse after traffic generation.

Verifies that every expected data layer is present in ``<db-logs>.http_logs``:
  - L3/L4       : ip_meta_ttl, tcp_meta_mss, tcp_meta_window_size, src_port
  - TLS / JA4   : ja4 (including t12 / t13 prefixes), tls_sni, tls_version,
                  tls_alpn = 'h2'
  - L7 HTTP     : method, path, status_code, duration_ms, header_order_signature
  - Correlation : correlated=1 (L3+L7 merged) and correlated=0 (L7 only)
  - Keep-alives : keepalives > 1 (several requests on one TCP connection)
  - Sources     : number of distinct src_ip
and that the aggregation tables of ``<db-processing>`` are populated.

Queries go through ClickHouse's HTTP interface (default port 8123). Each
check prints one line as it runs; a summary with the OK / WARN / FAIL counts
is printed at the end. The exit status is 0 when no check FAILed (WARNs do
not fail the run), 1 otherwise.

Usage:
    python verify_db.py [--host clickhouse] [--port 8123] [--db-logs ja4_logs]
                        [--db-processing ja4_processing] [--min-rows 10] [--wait 5]
"""

import argparse
import http.client
import json
import sys
import time
import urllib.error
import urllib.parse
import urllib.request


# ---------------------------------------------------------------------------
# Lightweight ClickHouse client (HTTP interface, port 8123)
# ---------------------------------------------------------------------------

def build_url(host: str, port: int, query: str) -> str:
    """Return the ClickHouse HTTP URL that runs *query* with ``FORMAT JSON``."""
    return f"http://{host}:{port}/?query={urllib.parse.quote(query + ' FORMAT JSON')}"


def ch_query(host: str, port: int, query: str) -> list:
    """Send a SELECT to ClickHouse over HTTP and return its rows.

    Returns:
        The ``data`` list of the JSON response (one dict per row). On a
        transport, HTTP or decoding error, a single pseudo-row
        ``{"__error__": message}`` is returned instead of raising, so one
        failing query does not abort the whole verification run.
    """
    try:
        with urllib.request.urlopen(build_url(host, port, query), timeout=10) as resp:
            data = json.loads(resp.read())
    except urllib.error.HTTPError as e:
        # The server's error text is in the response body; surface it rather
        # than only the generic HTTP reason phrase.
        try:
            detail = e.read().decode("utf-8", errors="replace").strip()
        except OSError:
            detail = ""
        return [{"__error__": f"HTTP {e.code}: {detail or e.reason}"}]
    except (OSError, http.client.HTTPException, ValueError) as e:
        # OSError covers URLError, timeouts and refused connections;
        # ValueError covers malformed URLs/ports and invalid JSON.
        return [{"__error__": str(e)}]
    if not isinstance(data, dict):
        return [{"__error__": f"unexpected JSON response: {type(data).__name__}"}]
    return data.get("data", [])


def first_value(rows: list) -> str:
    """Return the first column of the first row of *rows* as a string.

    An ``__error__`` pseudo-row yields its error message; an empty result
    (no row at all, or a row without columns) yields ``"?"``.
    """
    if not rows:
        return "?"
    first = rows[0]
    if "__error__" in first:
        return str(first["__error__"])
    return str(next(iter(first.values()), "?"))


def ch_scalar(host: str, port: int, query: str) -> str:
    """Run *query* and return the first column of its first row as a string."""
    return first_value(ch_query(host, port, query))


# ---------------------------------------------------------------------------
# Checks
# ---------------------------------------------------------------------------

CHECK_OK = "✅"
CHECK_FAIL = "❌"
CHECK_WARN = "⚠️ "

# (icon, name, value) for every check of the current run, in execution order.
results: list = []


def parse_count(val: str) -> int:
    """Convert a scalar returned by ClickHouse to an int, or -1 if not numeric.

    Integer strings are parsed directly (no float round-trip, so no precision
    loss); decimal strings are truncated toward zero.
    """
    try:
        return int(val)
    except ValueError:
        pass
    try:
        return int(float(val))
    except (ValueError, OverflowError):
        # Error messages, "nan" and "inf" are not usable counts.
        return -1


def evaluate(n: int, *, expect_nonzero: bool = True, min_val: int = 1,
             warn_only: bool = False) -> tuple:
    """Return ``(ok, icon)`` for the numeric result *n* of a check.

    With *expect_nonzero* the check passes when ``n >= min_val``; otherwise it
    passes only when ``n == 0``. A failing check is reported as WARN instead
    of FAIL when *warn_only* is set.
    """
    ok = n >= min_val if expect_nonzero else n == 0
    if ok:
        return True, CHECK_OK
    return False, CHECK_WARN if warn_only else CHECK_FAIL


def format_row(icon: str, name: str, val: str) -> str:
    """Format one result line of the report."""
    return f"  {icon} {name:<50s} {val}"


def record(icon: str, name: str, val: str) -> None:
    """Append a result to ``results`` and print it under the current section."""
    results.append((icon, name, val))
    print(format_row(icon, name, val))


def info(name: str, val: str) -> None:
    """Record an informational metric.

    It is OK when *val* is numeric and WARN otherwise (e.g. a query error),
    so a failed query is never counted as a passed check.
    """
    try:
        float(val)
    except ValueError:
        record(CHECK_WARN, name, val)
    else:
        record(CHECK_OK, name, val)


def check(name: str, query: str, host: str, port: int, expect_nonzero: bool = True,
          min_val: int = 1, warn_only: bool = False) -> bool:
    """Run a scalar *query*, record and print its outcome.

    Args:
        name: Label shown in the report.
        query: SQL returning a single numeric value.
        host, port: ClickHouse HTTP endpoint.
        expect_nonzero: If True, pass when the value is ``>= min_val``;
            if False, pass only when the value is exactly 0.
        min_val: Threshold used when *expect_nonzero* is True.
        warn_only: Report a failure as WARN instead of FAIL.

    Returns:
        True when the check passed (a WARN counts as not passed).
    """
    val = ch_scalar(host, port, query)
    ok, icon = evaluate(parse_count(val), expect_nonzero=expect_nonzero,
                        min_val=min_val, warn_only=warn_only)
    record(icon, name, val)
    return ok


def summarize() -> bool:
    """Print the summary of ``results``; return True when nothing FAILed."""
    fails = sum(icon == CHECK_FAIL for icon, _, _ in results)
    warns = sum(icon == CHECK_WARN for icon, _, _ in results)
    total = len(results)
    passed = total - fails - warns

    print(f"\n{'='*65}")
    # Repeat only the problematic lines so they are visible without scrolling back.
    for icon, name, val in results:
        if icon != CHECK_OK:
            print(format_row(icon, name, val))
    print(f"\n{'='*65}")
    print(f"  Résultat : {passed} OK | {warns} WARN | {fails} FAIL (total {total})")
    print(f"{'='*65}\n")
    return fails == 0


def run_checks(host: str, http_port: int, db_logs: str, db_processing: str,
               min_rows: int) -> bool:
    """Run every check against *db_logs* / *db_processing* and print a report.

    Returns:
        True when no check FAILed (WARNs are tolerated).
    """
    results.clear()  # each run reports only its own checks
    logs = f"{db_logs}.http_logs"

    def count_where(condition: str) -> str:
        # Most checks count the http_logs rows matching a single condition.
        return f"SELECT count() FROM {logs} WHERE {condition}"

    print(f"\n{'='*65}")
    print(f"  Vérification ClickHouse — {db_logs} / {db_processing}")
    print(f"{'='*65}\n")

    # 1. Raw tables (before / after the materialized view)
    print("── 1. Tables brutes ─────────────────────────────────────────")
    check("http_logs_raw : lignes totales",
          f"SELECT count() FROM {db_logs}.http_logs_raw",
          host, http_port, min_val=min_rows)
    check("http_logs : lignes après MV",
          f"SELECT count() FROM {logs}",
          host, http_port, min_val=min_rows)

    # 2. L3/L4 metadata
    print("\n── 2. Métadonnées L3/L4 ─────────────────────────────────────")
    for name, condition in (
        ("ip_meta_ttl > 0 (TTL capturé)", "ip_meta_ttl > 0"),
        ("tcp_meta_mss > 0 (MSS capturé)", "tcp_meta_mss > 0"),
        ("tcp_meta_window_size > 0 (window_size capturé)", "tcp_meta_window_size > 0"),
        ("src_port renseigné (> 1000)", "src_port > 1000"),
    ):
        check(name, count_where(condition), host, http_port, min_val=1)

    # 3. TLS / JA4 — entries flagged True are optional (WARN, not FAIL).
    print("\n── 3. TLS / JA4 ─────────────────────────────────────────────")
    for name, condition, optional in (
        ("ja4 renseigné (fingerprint TLS)", "ja4 != ''", False),
        ("tls_sni renseigné (SNI extrait)", "tls_sni != ''", False),
        ("tls_version TLSv1.2 présente", "tls_version = 'TLSv1.2'", True),
        ("tls_version TLSv1.3 présente", "tls_version = 'TLSv1.3'", False),
        ("tls_alpn h2 (HTTP/2 négocié)", "tls_alpn = 'h2'", True),
        # The ja4 prefix encodes the TLS version: t12... or t13...
        ("ja4 TLS1.2 (commence par t12)", "startsWith(ja4, 't12')", True),
        ("ja4 TLS1.3 (commence par t13)", "startsWith(ja4, 't13')", False),
    ):
        check(name, count_where(condition), host, http_port,
              min_val=1, warn_only=optional)

    # 4. L7 HTTP layer
    print("\n── 4. Couche L7 HTTP ────────────────────────────────────────")
    for method in ("GET", "POST", "PUT", "DELETE", "HEAD", "OPTIONS", "PATCH"):
        check(f"méthode {method} présente", count_where(f"method = '{method}'"),
              host, http_port, min_val=1)
    for name, condition, optional in (
        ("path renseigné", "path != ''", False),
        ("status_code 200 présent", "status_code = 200", False),
        ("status_code 4xx/5xx présent (requêtes de test)",
         "status_code IN (400,404,405,500)", True),
        ("duration_ms > 0 (latence mesurée)", "duration_ms > 0", False),
        ("header_order_signature renseigné", "header_order_signature != ''", False),
    ):
        check(name, count_where(condition), host, http_port,
              min_val=1, warn_only=optional)

    # 5. L3 <-> L7 correlation
    print("\n── 5. Corrélation L3 ↔ L7 ───────────────────────────────────")
    check("correlated=1 (lignes L3+L7 fusionnées)", count_where("correlated = 1"),
          host, http_port, min_val=1)
    check("correlated=0 (HTTP sans corr. réseau, port 80)", count_where("correlated = 0"),
          host, http_port, min_val=1, warn_only=True)
    # Guard the empty-table case so the ratio never divides by zero.
    pct_corr = ch_scalar(
        host, http_port,
        f"SELECT if(count() = 0, 0, round(100*countIf(correlated=1)/count(), 1)) "
        f"FROM {logs}")
    info(f"taux corrélation ({pct_corr}%)", pct_corr)

    # 6. Keep-alives (several requests on one TCP connection)
    print("\n── 6. Keep-alives ────────────────────────────────────────────")
    check("keepalives > 1 (connexions multiplexées)", count_where("keepalives > 1"),
          host, http_port, min_val=1, warn_only=True)
    max_ka = ch_scalar(host, http_port, f"SELECT max(keepalives) FROM {logs}")
    info(f"max keepalives ({max_ka})", max_ka)

    # 7. Source IP diversity
    print("\n── 7. Diversité sources ─────────────────────────────────────")
    check("IPs sources distinctes >= 5",
          f"SELECT uniqExact(src_ip) FROM {logs}",
          host, http_port, min_val=5, warn_only=True)

    # 8. Processing (aggregation) tables
    print("\n── 8. Tables processing ─────────────────────────────────────")
    for tbl in ("agg_host_ip_ja4_1h", "agg_header_fingerprint_1h",
                "agg_ip_behavior_1h", "agg_request_timing_1h"):
        check(f"{tbl} peuplée",
              f"SELECT count() FROM {db_processing}.{tbl}",
              host, http_port, min_val=1, warn_only=True)

    return summarize()


def main():
    """Parse CLI arguments, optionally wait for MV flushes, run the checks and exit.

    Exit status is 0 when no check FAILed, 1 otherwise.
    """
    parser = argparse.ArgumentParser(description="Vérification ClickHouse post-trafic")
    parser.add_argument("--host", default="clickhouse", help="Hôte ClickHouse")
    parser.add_argument("--port", type=int, default=8123, help="Port HTTP ClickHouse (8123)")
    parser.add_argument("--db-logs", default="ja4_logs", help="Base de données logs")
    parser.add_argument("--db-processing", default="ja4_processing", help="Base processing")
    parser.add_argument("--min-rows", type=int, default=10, help="Minimum de lignes attendues")
    parser.add_argument("--wait", type=int, default=5,
                        help="Attendre N secondes avant de vérifier (flush MV)")
    args = parser.parse_args()

    if args.wait > 0:
        print(f"[verify_db] Attente {args.wait}s pour le flush des Materialized Views...")
        time.sleep(args.wait)

    ok = run_checks(args.host, args.port, args.db_logs, args.db_processing, args.min_rows)
    sys.exit(0 if ok else 1)


if __name__ == "__main__":
    main()