"""JSON API endpoints for the JA4 SOC Dashboard.""" from __future__ import annotations import json import logging import os import re from collections import defaultdict from pathlib import Path from typing import Any from fastapi import APIRouter, HTTPException, Query from pydantic import BaseModel from backend.config import DB_PROCESSING, DB_LOGS, safe_identifier from backend.database import query, query_scalar, execute logger = logging.getLogger(__name__) router = APIRouter(prefix="/api") # Pre-validate DB identifiers at import time _DB = safe_identifier(DB_PROCESSING) _DB_LOGS = safe_identifier(DB_LOGS) # Regex pour extraire les features SHAP/ExIFFI depuis le champ reason # Format: "SHAP: feat1(+0.123) | feat2(-0.456)" ou "ExIFFI: ..." _SHAP_RE = re.compile(r"(?:SHAP|ExIFFI):\s*(.+?)(?:\s*\|\s*Threat|$)") _FEAT_RE = re.compile(r"(\w+)\(([+-]?\d+\.\d+)\)") def _aggregate_shap_importance(reasons: list[str]) -> list[dict]: """Agrège les valeurs SHAP/ExIFFI extraites des champs reason.""" totals: dict[str, float] = defaultdict(float) counts: dict[str, int] = defaultdict(int) for reason in reasons: m = _SHAP_RE.search(reason or "") if not m: continue for feat_match in _FEAT_RE.finditer(m.group(1)): name = feat_match.group(1) val = abs(float(feat_match.group(2))) totals[name] += val counts[name] += 1 if not totals: return [] return sorted( [{"name": k, "importance": round(totals[k] / counts[k], 4), "occurrences": counts[k]} for k in totals], key=lambda x: -x["importance"], ) # Whitelists for sort/order to prevent SQL injection _DETECTION_SORT_COLS = { "detected_at", "src_ip", "ja4", "host", "anomaly_score", "threat_level", "recurrence", "hits", "hit_velocity", "fuzzing_index", "post_ratio", "campaign_id", "asn_org", "country_code", "bot_name", "browser_family", } _SCORE_SORT_COLS = { "detected_at", "window_start", "src_ip", "ja4", "host", "anomaly_score", "raw_anomaly_score", "threat_level", "hits", "hit_velocity", "xgb_prob", "ae_recon_error", "asn_org", 
"country_code", "browser_family", } _TRAFFIC_SORT_COLS = { "time", "src_ip", "method", "host", "path", "http_version", "header_user_agent", "ja4", "src_country_code", } _ORDER_VALUES = {"ASC", "DESC"} def _validate_sort(value: str, whitelist: set[str], default: str) -> str: return value if value in whitelist else default def _validate_order(value: str) -> str: return value.upper() if value.upper() in _ORDER_VALUES else "DESC" # --------------------------------------------------------------------------- # GET /api/overview # --------------------------------------------------------------------------- @router.get("/overview") async def overview() -> dict[str, Any]: try: detections_24h = query_scalar( f"SELECT count() FROM {_DB}.ml_detected_anomalies " "WHERE detected_at >= now() - INTERVAL 1 DAY" ) or 0 scored_24h = query_scalar( f"SELECT count() FROM {_DB}.ml_all_scores " "WHERE detected_at >= now() - INTERVAL 1 DAY" ) or 0 threat_distribution = query( f"SELECT threat_level, count() AS cnt " f"FROM {_DB}.ml_all_scores " "WHERE detected_at >= now() - INTERVAL 1 DAY " "GROUP BY threat_level" ) # Compute critical / high counts from distribution threat_map = {r["threat_level"]: r["cnt"] for r in threat_distribution} critical_count = threat_map.get("CRITICAL", 0) high_count = threat_map.get("HIGH", 0) unique_ips = query_scalar( f"SELECT uniq(src_ip) FROM {_DB}.ml_detected_anomalies " "WHERE detected_at >= now() - INTERVAL 1 DAY" ) or 0 top_ips = query( f"SELECT toString(src_ip) AS src_ip, count() AS cnt, " f"max(anomaly_score) AS worst_score, " f"any(threat_level) AS threat_level, " f"any(asn_org) AS asn_org, any(country_code) AS country_code " f"FROM {_DB}.ml_detected_anomalies " "WHERE detected_at >= now() - INTERVAL 1 DAY " "GROUP BY src_ip ORDER BY cnt DESC LIMIT 10" ) timeline = query( f"SELECT toStartOfHour(detected_at) AS hour, count() AS cnt " f"FROM {_DB}.ml_detected_anomalies " "WHERE detected_at >= now() - INTERVAL 1 DAY " "GROUP BY hour ORDER BY hour" ) 
traffic_24h = query_scalar( f"SELECT count() FROM {_DB_LOGS}.http_logs " "WHERE time >= now() - INTERVAL 1 DAY" ) or 0 models = query( f"SELECT model_name, count() AS scored " f"FROM {_DB}.ml_all_scores " "WHERE detected_at >= now() - INTERVAL 1 DAY " "GROUP BY model_name" ) browser_stats = query( f"SELECT browser_family, count() AS cnt " f"FROM {_DB}.ml_all_scores " "WHERE detected_at >= now() - INTERVAL 1 DAY " "AND browser_family != '' " "GROUP BY browser_family ORDER BY cnt DESC" ) legitimate_browsers = query_scalar( f"SELECT count() FROM {_DB}.ml_all_scores " "WHERE detected_at >= now() - INTERVAL 1 DAY " "AND threat_level = 'LEGITIMATE_BROWSER'" ) or 0 return { "detections_24h": detections_24h, "scored_24h": scored_24h, "traffic_24h": traffic_24h, "unique_ips": unique_ips, "critical_count": critical_count, "high_count": high_count, "legitimate_browsers": legitimate_browsers, "browser_stats": browser_stats, "threat_distribution": threat_distribution, "top_ips": top_ips, "timeline": [{"hour": str(r["hour"]), "cnt": r["cnt"]} for r in timeline], "models": models, } except Exception as exc: logger.exception("overview query failed") raise HTTPException(status_code=500, detail=str(exc)) # --------------------------------------------------------------------------- # GET /api/detections # --------------------------------------------------------------------------- @router.get("/detections") async def detections( page: int = Query(1, ge=1), per_page: int = Query(50, ge=1, le=500), sort: str = Query("detected_at"), order: str = Query("DESC"), threat_level: str | None = Query(None), search: str | None = Query(None), asn_org: str | None = Query(None), country_code: str | None = Query(None), ja4: str | None = Query(None), bot_name: str | None = Query(None), browser_family: str | None = Query(None), ) -> dict[str, Any]: sort = _validate_sort(sort, _DETECTION_SORT_COLS, "detected_at") order = _validate_order(order) offset = (page - 1) * per_page where_clauses = ["detected_at 
>= now() - INTERVAL 30 DAY"] params: dict[str, Any] = {} if threat_level: where_clauses.append("threat_level = {tl:String}") params["tl"] = threat_level if search: where_clauses.append( "(toString(src_ip) LIKE {search:String} OR host LIKE {search:String})" ) params["search"] = f"%{search}%" if asn_org: where_clauses.append("asn_org = {asn_org:String}") params["asn_org"] = asn_org if country_code: where_clauses.append("country_code = {cc:String}") params["cc"] = country_code if ja4: where_clauses.append("ja4 = {ja4:String}") params["ja4"] = ja4 if bot_name: where_clauses.append("bot_name = {bn:String}") params["bn"] = bot_name if browser_family: where_clauses.append("browser_family = {bf:String}") params["bf"] = browser_family where = " AND ".join(where_clauses) try: total = query_scalar( f"SELECT count() FROM {_DB}.ml_detected_anomalies WHERE {where}", params, ) rows = query( f"SELECT *, toString(src_ip) AS src_ip_str " f"FROM {_DB}.ml_detected_anomalies " f"WHERE {where} ORDER BY {sort} {order} " f"LIMIT {{lim:UInt32}} OFFSET {{off:UInt32}}", {**params, "lim": per_page, "off": offset}, ) return { "data": rows, "total": total or 0, "page": page, "per_page": per_page, "pages": max(1, -(-((total or 0)) // per_page)), } except Exception as exc: logger.exception("detections query failed") raise HTTPException(status_code=500, detail=str(exc)) # --------------------------------------------------------------------------- # GET /api/scores # --------------------------------------------------------------------------- @router.get("/scores") async def scores( page: int = Query(1, ge=1), per_page: int = Query(50, ge=1, le=500), sort: str = Query("detected_at"), order: str = Query("DESC"), threat_level: str | None = Query(None), search: str | None = Query(None), asn_org: str | None = Query(None), country_code: str | None = Query(None), ja4: str | None = Query(None), browser_family: str | None = Query(None), ) -> dict[str, Any]: sort = _validate_sort(sort, _SCORE_SORT_COLS, 
"detected_at") order = _validate_order(order) offset = (page - 1) * per_page where_clauses = ["detected_at >= now() - INTERVAL 3 DAY"] params: dict[str, Any] = {} if threat_level: where_clauses.append("threat_level = {tl:String}") params["tl"] = threat_level if search: where_clauses.append( "(toString(src_ip) LIKE {search:String} OR host LIKE {search:String})" ) params["search"] = f"%{search}%" if asn_org: where_clauses.append("asn_org = {asn_org:String}") params["asn_org"] = asn_org if country_code: where_clauses.append("country_code = {cc:String}") params["cc"] = country_code if ja4: where_clauses.append("ja4 = {ja4:String}") params["ja4"] = ja4 if browser_family: where_clauses.append("browser_family = {bf:String}") params["bf"] = browser_family where = " AND ".join(where_clauses) try: total = query_scalar( f"SELECT count() FROM {_DB}.ml_all_scores WHERE {where}", params, ) rows = query( f"SELECT *, toString(src_ip) AS src_ip_str " f"FROM {_DB}.ml_all_scores " f"WHERE {where} ORDER BY {sort} {order} " f"LIMIT {{lim:UInt32}} OFFSET {{off:UInt32}}", {**params, "lim": per_page, "off": offset}, ) return { "data": rows, "total": total or 0, "page": page, "per_page": per_page, "pages": max(1, -(-((total or 0)) // per_page)), } except Exception as exc: logger.exception("scores query failed") raise HTTPException(status_code=500, detail=str(exc)) # --------------------------------------------------------------------------- # GET /api/traffic # --------------------------------------------------------------------------- @router.get("/traffic") async def traffic( page: int = Query(1, ge=1), per_page: int = Query(50, ge=1, le=500), sort: str = Query("time"), order: str = Query("DESC"), method: str | None = Query(None), host: str | None = Query(None), http_version: str | None = Query(None), status: int | None = Query(None, description="Not implemented — reserved"), search: str | None = Query(None), ) -> dict[str, Any]: sort = _validate_sort(sort, _TRAFFIC_SORT_COLS, "time") 
order = _validate_order(order) offset = (page - 1) * per_page where_clauses = ["time >= now() - INTERVAL 1 DAY"] params: dict[str, Any] = {} if method: where_clauses.append("method = {method:String}") params["method"] = method if host: where_clauses.append("host LIKE {host:String}") params["host"] = f"%{host}%" if http_version is not None: where_clauses.append("http_version = {http_version:String}") params["http_version"] = http_version if search: where_clauses.append( "(toString(src_ip) LIKE {search:String} " "OR path LIKE {search:String} " "OR header_user_agent LIKE {search:String})" ) params["search"] = f"%{search}%" where = " AND ".join(where_clauses) try: total = query_scalar( f"SELECT count() FROM {_DB_LOGS}.http_logs WHERE {where}", params, ) rows = query( f"SELECT time, toString(src_ip) AS src_ip, method, host, path, " f"http_version, header_user_agent, ja4, src_country_code " f"FROM {_DB_LOGS}.http_logs " f"WHERE {where} ORDER BY {sort} {order} " f"LIMIT {{lim:UInt32}} OFFSET {{off:UInt32}}", {**params, "lim": per_page, "off": offset}, ) return { "data": rows, "total": total or 0, "page": page, "per_page": per_page, "pages": max(1, -(-((total or 0)) // per_page)), } except Exception as exc: logger.exception("traffic query failed") raise HTTPException(status_code=500, detail=str(exc)) # --------------------------------------------------------------------------- # GET /api/ip/{ip} # --------------------------------------------------------------------------- @router.get("/ip/{ip}") async def ip_detail(ip: str) -> dict[str, Any]: # Strip ::ffff: prefix for IPv4-mapped addresses clean_ip = ip.replace("::ffff:", "") params = {"ip": clean_ip} try: detections = query( f"SELECT *, toString(src_ip) AS src_ip_str " f"FROM {_DB}.ml_detected_anomalies " "WHERE src_ip = toIPv6({ip:String}) " "AND detected_at >= now() - INTERVAL 30 DAY " "ORDER BY detected_at DESC", params, ) all_scores = query( f"SELECT *, toString(src_ip) AS src_ip_str " f"FROM {_DB}.ml_all_scores " 
"WHERE src_ip = toIPv6({ip:String}) " "AND detected_at >= now() - INTERVAL 3 DAY " "ORDER BY detected_at DESC", params, ) http_logs = query( f"SELECT time, method, host, path, http_version, header_user_agent, ja4 " f"FROM {_DB_LOGS}.http_logs " "WHERE src_ip = toIPv4OrZero({ip:String}) " "AND time >= now() - INTERVAL 1 DAY " "ORDER BY time DESC LIMIT 100", params, ) ai_features: list[dict] = [] try: ai_features = query( f"SELECT * FROM {_DB}.view_ai_features_1h " "WHERE src_ip = toIPv6({ip:String}) LIMIT 1", params, ) except Exception: logger.debug("view_ai_features_1h unavailable for %s", ip) recurrence: list[dict] = [] try: recurrence = query( f"SELECT * FROM {_DB}.view_ip_recurrence " "WHERE src_ip = toIPv6({ip:String})", params, ) except Exception: logger.debug("view_ip_recurrence unavailable for %s", ip) return { "ip": ip, "detections": detections, "scores": all_scores, "http_logs": http_logs, "ai_features": ai_features, "recurrence": recurrence, } except Exception as exc: logger.exception("ip detail query failed for %s", ip) raise HTTPException(status_code=500, detail=str(exc)) # --------------------------------------------------------------------------- # GET /api/features # --------------------------------------------------------------------------- @router.get("/features") async def features() -> dict[str, Any]: result: dict[str, Any] = { "ai_features": {}, "thesis_features": {}, "human_profile": {}, "bot_profile": {}, "feature_importance": [], } _feat_cols = ( "avg(hits) AS avg_hits, avg(hit_velocity) AS avg_velocity, " "avg(fuzzing_index) AS avg_fuzz, avg(post_ratio) AS avg_post, " "avg(asset_ratio) AS avg_asset, avg(direct_access_ratio) AS avg_direct, " "avg(temporal_entropy) AS avg_entropy, avg(path_diversity_ratio) AS avg_path_div, " "avg(modern_browser_score) AS avg_browser, avg(header_count) AS avg_headers, " "avg(src_port_density) AS avg_port_density, avg(distinct_ja4_count) AS avg_ja4_count" ) try: ai_stats = query( f"SELECT count() AS total, 
{_feat_cols} FROM {_DB}.view_ai_features_1h" ) if ai_stats: result["ai_features"] = ai_stats[0] except Exception: logger.debug("view_ai_features_1h not available") try: thesis_stats = query( f"SELECT count() AS total, {_feat_cols} FROM {_DB}.view_thesis_features_1h" ) if thesis_stats: result["thesis_features"] = thesis_stats[0] except Exception: logger.debug("view_thesis_features_1h not available") # ISP (residential) vs bot feature profiles for radar comparison try: human = query( f"SELECT {_feat_cols} FROM {_DB}.view_ai_features_1h " "WHERE asn_label = 'isp'" ) if human: result["human_profile"] = human[0] except Exception: pass try: bot = query( f"SELECT {_feat_cols} FROM {_DB}.view_ai_features_1h " "WHERE asn_label IN ('datacenter', 'hosting')" ) if bot: result["bot_profile"] = bot[0] except Exception: pass # Feature variance (importance proxy — fallback si SHAP indisponible) try: variance_rows = query( f"SELECT " f"varPop(hit_velocity) AS v_velocity, " f"varPop(fuzzing_index) AS v_fuzz, " f"varPop(post_ratio) AS v_post, " f"varPop(asset_ratio) AS v_asset, " f"varPop(direct_access_ratio) AS v_direct, " f"varPop(temporal_entropy) AS v_entropy, " f"varPop(path_diversity_ratio) AS v_path_div, " f"varPop(src_port_density) AS v_port_density " f"FROM {_DB}.view_ai_features_1h" ) if variance_rows: row = variance_rows[0] result["feature_importance"] = [ {"name": k.replace("v_", ""), "variance": v} for k, v in sorted(row.items(), key=lambda x: -(x[1] or 0)) ] except Exception: pass # SHAP/ExIFFI — importance réelle extraite des anomalies détectées try: reason_rows = query( f"SELECT reason FROM {_DB}.ml_detected_anomalies " "WHERE reason LIKE '%SHAP:%' OR reason LIKE '%ExIFFI:%' " "ORDER BY detected_at DESC LIMIT 500" ) if reason_rows: shap_importance = _aggregate_shap_importance( [r["reason"] for r in reason_rows] ) if shap_importance: result["shap_importance"] = shap_importance except Exception: logger.debug("SHAP importance extraction unavailable") return result # 
# ---------------------------------------------------------------------------
# GET /api/geo — Geographic & ASN breakdown
# ---------------------------------------------------------------------------
@router.get("/geo")
async def geo() -> dict[str, Any]:
    """Session counts per (country, ASN label) and the 50 busiest ASNs.

    Reads ``view_ai_features_1h``. On any query error the failure is logged
    and empty lists are returned instead of an HTTP error.
    """
    source = f"{_DB}.view_ai_features_1h"
    by_country_sql = (
        "SELECT country_code, asn_label, "
        "count() AS sessions, sum(hits) AS total_hits "
        f"FROM {source} "
        "WHERE country_code != '' "
        "GROUP BY country_code, asn_label ORDER BY sessions DESC"
    )
    by_asn_sql = (
        "SELECT asn_org, asn_label, country_code, "
        "count() AS sessions, sum(hits) AS total_hits, "
        "avg(hit_velocity) AS avg_velocity, avg(fuzzing_index) AS avg_fuzz "
        f"FROM {source} "
        "WHERE asn_org != '' "
        "GROUP BY asn_org, asn_label, country_code ORDER BY sessions DESC LIMIT 50"
    )
    try:
        countries = query(by_country_sql)
        asns = query(by_asn_sql)
    except Exception:
        logger.exception("geo query failed")
        return {"countries": [], "asns": []}
    return {"countries": countries, "asns": asns}


# ---------------------------------------------------------------------------
# GET /api/fingerprints — JA4 fingerprint analysis
# ---------------------------------------------------------------------------
@router.get("/fingerprints")
async def fingerprints() -> dict[str, Any]:
    """Top 100 (JA4, ASN label) pairs by session count, plus JA4 ↔ bot-name pairs.

    Reads ``view_ai_features_1h``; rows with an empty JA4 are excluded. On
    any query error the failure is logged and empty lists are returned.
    """
    source = f"{_DB}.view_ai_features_1h"
    stats_sql = (
        "SELECT ja4, asn_label, "
        "count() AS sessions, sum(hits) AS total_hits, "
        "avg(hit_velocity) AS avg_velocity, "
        "avg(fuzzing_index) AS avg_fuzz, "
        "avg(modern_browser_score) AS avg_browser_score "
        f"FROM {source} "
        "WHERE ja4 != '' "
        "GROUP BY ja4, asn_label ORDER BY sessions DESC LIMIT 100"
    )
    bots_sql = (
        "SELECT ja4, bot_name, count() AS sessions "
        f"FROM {source} "
        "WHERE bot_name != '' AND ja4 != '' "
        "GROUP BY ja4, bot_name ORDER BY sessions DESC"
    )
    try:
        ja4_stats = query(stats_sql)
        bot_ja4 = query(bots_sql)
    except Exception:
        logger.exception("fingerprints query failed")
        return {"ja4_stats": [], "bot_ja4": []}
    return {"ja4_stats": ja4_stats, "bot_ja4": bot_ja4}
# ---------------------------------------------------------------------------
# GET /api/browsers — Browser family distribution from JA4 fingerprints
# ---------------------------------------------------------------------------
@router.get("/browsers")
async def browsers() -> dict[str, Any]:
    """Browser identification via JA4 TLS fingerprint → browser_family dictionary.

    Returns the per-family session distribution, the number of sessions that
    matched neither a browser family nor a bot name, and the most common JA4
    fingerprints of each family. Falls back to empty results on failure.
    """
    try:
        distribution = query(
            f"SELECT browser_family, count() AS sessions, "
            f"uniqExact(src_ip) AS unique_ips, sum(hits) AS total_hits "
            f"FROM {_DB}.view_ai_features_1h "
            "WHERE browser_family != '' "
            "GROUP BY browser_family ORDER BY sessions DESC"
        )
        # Sessions matching neither a known browser nor a known bot
        unknown = query_scalar(
            f"SELECT count() FROM {_DB}.view_ai_features_1h "
            "WHERE browser_family = '' AND bot_name = ''"
        )
        # Top JA4 per browser family. LIMIT ... BY keeps 5 rows for *each*
        # family; a plain global LIMIT after ORDER BY browser_family would hand
        # every slot to the alphabetically-first families. The trailing
        # LIMIT 50 remains an overall cap on the response size.
        top_ja4 = query(
            f"SELECT browser_family, ja4, count() AS sessions "
            f"FROM {_DB}.view_ai_features_1h "
            "WHERE browser_family != '' "
            "GROUP BY browser_family, ja4 ORDER BY browser_family, sessions DESC "
            "LIMIT 5 BY browser_family "
            "LIMIT 50"
        )
        return {
            "distribution": distribution,
            "unknown_sessions": unknown or 0,
            "top_ja4_by_browser": top_ja4,
        }
    except Exception:
        logger.exception("browsers query failed")
        return {"distribution": [], "unknown_sessions": 0, "top_ja4_by_browser": []}


# ---------------------------------------------------------------------------
# GET /api/behavior — Feature scatter + distributions
# ---------------------------------------------------------------------------
_BEHAVIOR_FEATURES = [
    "hit_velocity", "fuzzing_index", "post_ratio", "asset_ratio",
    "direct_access_ratio", "temporal_entropy", "path_diversity_ratio",
    "modern_browser_score", "header_count", "is_ua_rotating",
    "distinct_ja4_count", "src_port_density",
]

# Subset of features for which /api/behavior returns histogram buckets.
_HISTOGRAM_FEATURES = (
    "hit_velocity", "fuzzing_index", "post_ratio", "asset_ratio",
    "temporal_entropy", "path_diversity_ratio",
)


@router.get("/behavior")
async def behavior() -> dict[str, Any]:
    """Scatter data (top 500 sessions by hits) and per-feature histograms.

    Histogram buckets are the feature value rounded to 2 decimals. Falls back
    to empty results on failure.
    """
    cols = ", ".join(_BEHAVIOR_FEATURES)
    try:
        scatter = query(
            f"SELECT toString(src_ip) AS ip, asn_label, bot_name, hits, {cols} "
            f"FROM {_DB}.view_ai_features_1h "
            "ORDER BY hits DESC LIMIT 500"
        )
        distributions: dict[str, list] = {}
        for feat in _HISTOGRAM_FEATURES:
            distributions[feat] = query(
                f"SELECT round({feat}, 2) AS bucket, count() AS cnt "
                f"FROM {_DB}.view_ai_features_1h "
                f"GROUP BY bucket ORDER BY bucket"
            )
        return {"scatter": scatter, "distributions": distributions}
    except Exception:
        logger.exception("behavior query failed")
        return {"scatter": [], "distributions": {}}


# ---------------------------------------------------------------------------
# GET /api/heatmap — Temporal heatmap (hour × day)
# ---------------------------------------------------------------------------
@router.get("/heatmap")
async def heatmap() -> dict[str, Any]:
    """Request counts per (day-of-week, hour) over the last 7 days of HTTP logs.

    ``dow`` is ClickHouse ``toDayOfWeek`` minus one (0-based). Falls back to an
    empty list on failure.
    """
    try:
        cells = query(
            f"SELECT toDayOfWeek(time) - 1 AS dow, toHour(time) AS hour, count() AS cnt "
            f"FROM {_DB_LOGS}.http_logs "
            "WHERE time >= now() - INTERVAL 7 DAY "
            "GROUP BY dow, hour ORDER BY dow, hour"
        )
        return {"cells": cells}
    except Exception:
        logger.exception("heatmap query failed")
        return {"cells": []}


# ---------------------------------------------------------------------------
# GET /api/ip/{ip}/radar — Radar comparison vs ISP baseline
# ---------------------------------------------------------------------------
_RADAR_FEATURES = [
    "hit_velocity", "fuzzing_index", "post_ratio", "asset_ratio",
    "direct_access_ratio", "temporal_entropy", "path_diversity_ratio",
    "modern_browser_score",
]


@router.get("/ip/{ip}/radar")
async def ip_radar(ip: str) -> dict[str, Any]:
    """Radar-chart values for one IP vs the ISP and datacenter/hosting averages.

    Falls back to empty value maps on failure.
    """
    # Remove only a *leading* IPv4-mapped prefix (case-insensitive);
    # str.replace would also corrupt IPv6 addresses containing "::ffff:".
    clean_ip = ip[7:] if ip[:7].lower() == "::ffff:" else ip
    cols_avg = ", ".join(f"avg({f}) AS {f}" for f in _RADAR_FEATURES)
    try:
        # NOTE(review): LIMIT 1 without ORDER BY returns an arbitrary row if the
        # view holds several windows for this IP — confirm whether the latest
        # window should be selected.
        ip_data = query(
            f"SELECT {', '.join(_RADAR_FEATURES)} "
            f"FROM {_DB}.view_ai_features_1h "
            "WHERE src_ip = toIPv6({ip:String}) LIMIT 1",
            {"ip": clean_ip},
        )
        baseline = query(
            f"SELECT {cols_avg} "
            f"FROM {_DB}.view_ai_features_1h "
            "WHERE asn_label = 'isp'"
        )
        bot_avg = query(
            f"SELECT {cols_avg} "
            f"FROM {_DB}.view_ai_features_1h "
            "WHERE asn_label IN ('datacenter', 'hosting')"
        )
        return {
            "features": _RADAR_FEATURES,
            "ip_values": ip_data[0] if ip_data else {},
            "human_baseline": baseline[0] if baseline else {},
            "bot_baseline": bot_avg[0] if bot_avg else {},
        }
    except Exception:
        logger.exception("ip radar query failed for %s", ip)
        return {"features": _RADAR_FEATURES, "ip_values": {},
                "human_baseline": {}, "bot_baseline": {}}


# ---------------------------------------------------------------------------
# GET /api/models
# ---------------------------------------------------------------------------
_MODEL_DIR = Path("/data/models")


@router.get("/models")
async def models() -> dict[str, Any]:
    """Model metadata (``/data/models/*.json``) plus 7-day scoring stats per model.

    Unreadable metadata files are skipped with a warning; a failing stats
    query yields an empty ``scoring_stats`` list.
    """
    model_info: list[dict[str, Any]] = []
    if _MODEL_DIR.is_dir():
        for p in sorted(_MODEL_DIR.glob("*.json")):
            try:
                # Explicit encoding: JSON is UTF-8 and the platform default may not be.
                model_info.append(json.loads(p.read_text(encoding="utf-8")))
            except Exception:
                logger.warning("Could not read model metadata %s", p)
    # Also fetch latest scoring stats from ClickHouse
    scoring_stats: list[dict] = []
    try:
        scoring_stats = query(
            f"SELECT model_name, count() AS scored, "
            f"min(detected_at) AS first_seen, max(detected_at) AS last_seen "
            f"FROM {_DB}.ml_all_scores "
            "WHERE detected_at >= now() - INTERVAL 7 DAY "
            "GROUP BY model_name"
        )
    except Exception:
        logger.debug("could not fetch model scoring stats")
    return {"models": model_info, "scoring_stats": scoring_stats}


# ---------------------------------------------------------------------------
# GET /api/models/timeline — Scoring volume over time per model
# ---------------------------------------------------------------------------
@router.get("/models/timeline")
async def models_timeline() -> dict[str, Any]:
    """Hourly scoring volume per model over the last 7 days."""
    try:
        rows = query(
            f"SELECT toStartOfHour(detected_at) AS hour, "
            f"model_name, count() AS cnt, "
            f"avg(anomaly_score) AS avg_score, "
            f"countIf(threat_level IN ('HIGH','CRITICAL')) AS anomalies "
            f"FROM {_DB}.ml_all_scores "
            "WHERE detected_at >= now() - INTERVAL 7 DAY "
            "GROUP BY hour, model_name "
            "ORDER BY hour"
        )
        return {"timeline": rows}
    except Exception:
        logger.exception("models timeline query failed")
        return {"timeline": []}


# ---------------------------------------------------------------------------
# GET /api/models/threats — Threat breakdown per model
# ---------------------------------------------------------------------------
@router.get("/models/threats")
async def models_threats() -> dict[str, Any]:
    """Threat-level breakdown per model over the last 7 days."""
    try:
        rows = query(
            f"SELECT model_name, threat_level, count() AS cnt "
            f"FROM {_DB}.ml_all_scores "
            "WHERE detected_at >= now() - INTERVAL 7 DAY "
            "GROUP BY model_name, threat_level "
            "ORDER BY model_name, cnt DESC"
        )
        return {"threats": rows}
    except Exception:
        logger.exception("models threats query failed")
        return {"threats": []}


# ---------------------------------------------------------------------------
# GET /api/classify/stats — Classification summary stats
# ---------------------------------------------------------------------------
@router.get("/classify/stats")
async def classify_stats() -> dict[str, Any]:
    """SOC classification counts per label and their total."""
    try:
        _ensure_feedback_table()
        rows = query(
            f"SELECT classification, count() AS cnt "
            f"FROM {_DB}.soc_feedback "
            "GROUP BY classification"
        )
        total = sum(r["cnt"] for r in rows)
        return {"stats": rows, "total": total}
    except Exception:
        # Still best-effort for the UI, but no longer silent.
        logger.exception("classify stats query failed")
        return {"stats": [], "total": 0}


# ---------------------------------------------------------------------------
# GET /api/classify/suggested — Top unclassified IPs
# ---------------------------------------------------------------------------
@router.get("/classify/suggested")
async def classify_suggested() -> dict[str, Any]:
    """Up to 20 IPs detected in the last 3 days that have no SOC feedback yet.

    Ordered by ``worst_score`` (max anomaly score) descending.
    """
    try:
        _ensure_feedback_table()
        # Anti-join via NOT IN: with ClickHouse's default join_use_nulls=0 a
        # LEFT JOIN fills unmatched right-side columns with their type default
        # (``::`` for IPv6), never NULL, so ``f.src_ip IS NULL`` matched nothing.
        # threat_level is taken from the row holding worst_score; a plain
        # max() on the string would rank levels alphabetically, not by severity.
        rows = query(
            "SELECT toString(d.src_ip) AS src_ip, "
            "max(d.anomaly_score) AS worst_score, "
            "argMax(d.threat_level, d.anomaly_score) AS threat_level, "
            "count() AS detection_count, "
            "any(d.ja4) AS ja4, any(d.host) AS host, "
            "any(d.asn_org) AS asn_org, any(d.country_code) AS country_code "
            f"FROM {_DB}.ml_detected_anomalies AS d "
            "WHERE d.detected_at >= now() - INTERVAL 3 DAY "
            f"AND d.src_ip NOT IN (SELECT src_ip FROM {_DB}.soc_feedback) "
            "GROUP BY d.src_ip "
            "ORDER BY worst_score DESC "
            "LIMIT 20"
        )
        return {"suggested": rows}
    except Exception:
        logger.exception("classify suggested query failed")
        return {"suggested": []}


# ---------------------------------------------------------------------------
# POST /api/classify — SOC analyst feedback
# ---------------------------------------------------------------------------
class ClassifyRequest(BaseModel):
    """Body of ``POST /api/classify``."""

    src_ip: str  # IPv4 or IPv6 address (validated by the endpoint)
    classification: str  # true_positive | false_positive | suspicious
    comment: str = ""  # free-text analyst note


_VALID_CLASSIFICATIONS = {"true_positive", "false_positive", "suspicious"}

# Set once CREATE TABLE IF NOT EXISTS has succeeded in this process.
_feedback_table_ensured = False


def _ensure_feedback_table() -> None:
    """Create ``{_DB}.soc_feedback`` if needed (at most once per process)."""
    global _feedback_table_ensured
    if _feedback_table_ensured:
        return
    execute(
        f"CREATE TABLE IF NOT EXISTS {_DB}.soc_feedback ("
        "  created_at DateTime DEFAULT now(),"
        "  src_ip IPv6,"
        "  classification LowCardinality(String),"
        "  comment String"
        ") ENGINE = MergeTree() ORDER BY (src_ip, created_at)"
    )
    _feedback_table_ensured = True


@router.post("/classify")
async def classify(body: ClassifyRequest) -> dict[str, Any]:
    """Store one SOC analyst classification for an IP.

    Raises:
        HTTPException: 422 for an unknown classification or an unparsable
            ``src_ip``; 500 with the error message if the insert fails.
    """
    import ipaddress

    if body.classification not in _VALID_CLASSIFICATIONS:
        raise HTTPException(
            status_code=422,
            detail=f"classification must be one of {_VALID_CLASSIFICATIONS}",
        )
    # Reject garbage before it reaches toIPv6(), which would either fail with a
    # 500 or store a meaningless address depending on the server version.
    try:
        ipaddress.ip_address(body.src_ip)
    except ValueError:
        raise HTTPException(
            status_code=422,
            detail="src_ip must be a valid IPv4 or IPv6 address",
        ) from None
    try:
        _ensure_feedback_table()
        execute(
            f"INSERT INTO {_DB}.soc_feedback (src_ip, classification, comment) VALUES "
            "(toIPv6({ip:String}), {cls:String}, {cmt:String})",
            {"ip": body.src_ip, "cls": body.classification, "cmt": body.comment},
        )
        return {"status": "ok", "src_ip": body.src_ip, "classification": body.classification}
    except Exception as exc:
        logger.exception("classify insert failed")
        raise HTTPException(status_code=500, detail=str(exc)) from exc
# ---------------------------------------------------------------------------
# GET /api/classifications — recent SOC feedback
# ---------------------------------------------------------------------------
@router.get("/classifications")
async def classifications() -> dict[str, Any]:
    """Return the 50 most recent SOC analyst classifications, newest first.

    Ensures the feedback table exists first. Any error is logged and an
    empty list is returned.
    """
    recent_sql = (
        "SELECT created_at, toString(src_ip) AS src_ip, classification, comment "
        f"FROM {_DB}.soc_feedback "
        "ORDER BY created_at DESC LIMIT 50"
    )
    try:
        _ensure_feedback_table()
        recent = query(recent_sql)
    except Exception:
        logger.exception("classifications query failed")
        return {"data": []}
    return {"data": recent}


# ---------------------------------------------------------------------------
# GET /api/campaigns — HDBSCAN bot campaign clusters
# ---------------------------------------------------------------------------
@router.get("/campaigns")
async def campaigns(days: int = 7) -> dict[str, Any]:
    """Bot campaigns found by HDBSCAN clustering, largest first (max 50).

    ``days`` is clamped to ``[1, 90]``. Rows with ``campaign_id < 0`` are not
    campaigns and are excluded. Any error is logged and an empty list returned.
    """
    window = min(max(days, 1), 90)
    columns = ", ".join((
        "campaign_id",
        "count() AS members",
        "min(detected_at) AS first_seen",
        "max(detected_at) AS last_seen",
        "avg(anomaly_score) AS avg_score",
        "max(anomaly_score) AS max_score",
        "uniqExact(src_ip) AS unique_ips",
        "groupUniqArray(10)(ja4) AS ja4_list",
        "groupUniqArray(5)(asn_org) AS asn_list",
        "groupUniqArray(5)(country_code) AS countries",
        "avg(hits) AS avg_hits",
        "avg(hit_velocity) AS avg_velocity",
        "avg(fuzzing_index) AS avg_fuzzing",
        "avg(post_ratio) AS avg_post_ratio",
    ))
    campaigns_sql = (
        f"SELECT {columns} "
        f"FROM {_DB}.ml_detected_anomalies "
        "WHERE campaign_id >= 0 "
        "AND detected_at >= now() - INTERVAL {days:UInt16} DAY "
        "GROUP BY campaign_id "
        "ORDER BY members DESC LIMIT 50"
    )
    try:
        clusters = query(campaigns_sql, {"days": window})
    except Exception:
        logger.exception("campaigns query failed")
        return {"campaigns": []}
    return {"campaigns": clusters}
# ---------------------------------------------------------------------------
# GET /api/campaigns/graph — Network graph data (IPs linked by shared JA4)
# ---------------------------------------------------------------------------
@router.get("/campaigns/graph")
async def campaigns_graph(days: int = 7) -> dict[str, Any]:
    """Network-graph data: nodes are campaign member IPs, edges link IPs of the
    same campaign that share a JA4 fingerprint.

    ``days`` is clamped to ``[1, 90]``. At most 500 nodes and 2000 edges are
    returned. Any error is logged and empty lists are returned.
    """
    days = max(1, min(days, 90))
    try:
        # Nodes: one row per (IP, campaign) with its main attributes.
        # NOTE(review): an IP clustered into two campaigns in the window yields
        # two nodes with the same id — confirm the graph frontend tolerates it.
        nodes = query(
            f"SELECT toString(src_ip) AS id, "
            f"campaign_id AS group, "
            f"any(ja4) AS ja4, any(asn_org) AS asn_org, "
            f"any(country_code) AS country, any(threat_level) AS threat, "
            f"any(browser_family) AS browser_family, "
            f"sum(hits) AS total_hits, "
            f"min(anomaly_score) AS worst_score "
            f"FROM {_DB}.ml_detected_anomalies "
            "WHERE campaign_id >= 0 "
            "AND detected_at >= now() - INTERVAL {days:UInt16} DAY "
            "GROUP BY src_ip, campaign_id "
            "ORDER BY campaign_id, worst_score ASC "
            "LIMIT 500",
            {"days": days},
        )
        # Edges: IP pairs of the same campaign sharing a non-empty JA4.
        # Both join inputs are de-duplicated and time-filtered *before* the
        # join. Joining the raw table emitted one edge per pair of detection
        # rows, so duplicates filled the LIMIT. It also built the hash table from
        # the whole, unfiltered right side. Empty JA4s are excluded because a
        # missing fingerprint is not a shared one.
        members = (
            "SELECT DISTINCT src_ip, ja4, campaign_id "
            f"FROM {_DB}.ml_detected_anomalies "
            "WHERE campaign_id >= 0 AND ja4 != '' "
            "AND detected_at >= now() - INTERVAL {days:UInt16} DAY"
        )
        edges = query(
            "SELECT "
            "toString(a.src_ip) AS source, "
            "toString(b.src_ip) AS target, "
            "a.ja4 AS shared_ja4 "
            f"FROM ({members}) AS a "
            f"INNER JOIN ({members}) AS b "
            "ON a.ja4 = b.ja4 AND a.campaign_id = b.campaign_id "
            # Keep each unordered pair once and drop self-loops.
            "WHERE a.src_ip < b.src_ip "
            "LIMIT 2000",
            {"days": days},
        )
        return {"nodes": nodes, "edges": edges}
    except Exception:
        logger.exception("campaigns graph query failed")
        return {"nodes": [], "edges": []}


# ---------------------------------------------------------------------------
# GET /api/campaigns/scatter — Scatter plot: one bubble per campaign
# ---------------------------------------------------------------------------
@router.get("/campaigns/scatter")
async def campaigns_scatter(days: int = 7) -> dict[str, Any]:
    """Scatter-plot data: average score vs. average velocity, one row per campaign.

    ``days`` is clamped to ``[1, 90]``; at most 100 campaigns, ordered by
    ascending average score. Any error is logged and an empty list returned.
    """
    days = max(1, min(days, 90))
    try:
        rows = query(
            f"SELECT campaign_id, "
            f"avg(anomaly_score) AS avg_score, "
            f"avg(hit_velocity) AS avg_velocity, "
            f"sum(hits) AS total_hits, "
            f"uniqExact(src_ip) AS unique_ips, "
            f"groupUniqArray(5)(ja4) AS ja4_list, "
            f"groupUniqArray(3)(asn_org) AS asn_list, "
            f"groupUniqArray(3)(country_code) AS country_list, "
            f"any(threat_level) AS threat, "
            f"min(detected_at) AS first_seen, "
            f"max(detected_at) AS last_seen "
            f"FROM {_DB}.ml_detected_anomalies "
            "WHERE campaign_id >= 0 "
            "AND detected_at >= now() - INTERVAL {days:UInt16} DAY "
            "GROUP BY campaign_id "
            "ORDER BY avg_score ASC LIMIT 100",
            {"days": days},
        )
        return {"data": rows}
    except Exception:
        logger.exception("campaigns scatter query failed")
        return {"data": []}


# ---------------------------------------------------------------------------
# GET /api/campaigns/{cid} — Campaign detail (member IPs + features)
# ---------------------------------------------------------------------------
@router.get("/campaigns/{cid}")
async def campaign_detail(cid: int, days: int = 7) -> dict[str, Any]:
    """Detail of one campaign: member rows, aggregated profile, hourly timeline.

    ``days`` is clamped to ``[1, 90]``. Any error is logged and empty
    sections are returned.
    """
    days = max(1, min(days, 90))
    try:
        # Up to 200 member detections, ordered by ascending anomaly score.
        members = query(
            f"SELECT toString(src_ip) AS src_ip, ja4, host, "
            f"anomaly_score, raw_anomaly_score, threat_level, "
            f"hits, hit_velocity, fuzzing_index, post_ratio, "
            f"port_exhaustion_ratio, orphan_ratio, "
            f"asn_org, asn_number, country_code, "
            f"browser_family, bot_name, detected_at, reason "
            f"FROM {_DB}.ml_detected_anomalies "
            "WHERE campaign_id = {cid:Int32} "
            "AND detected_at >= now() - INTERVAL {days:UInt16} DAY "
            "ORDER BY anomaly_score ASC LIMIT 200",
            {"cid": cid, "days": days},
        )
        # Aggregated campaign profile
        profile = query(
            f"SELECT "
            f"avg(hits) AS avg_hits, avg(hit_velocity) AS avg_velocity, "
            f"avg(fuzzing_index) AS avg_fuzzing, avg(post_ratio) AS avg_post_ratio, "
            f"avg(port_exhaustion_ratio) AS avg_port_exhaustion, "
            f"avg(orphan_ratio) AS avg_orphan, "
            f"avg(anomaly_score) AS avg_score, max(anomaly_score) AS max_score, "
            f"uniqExact(src_ip) AS unique_ips, uniqExact(ja4) AS unique_ja4, "
            f"uniqExact(host) AS unique_hosts, uniqExact(asn_org) AS unique_asns, "
            f"groupUniqArray(20)(ja4) AS ja4_list, "
            f"groupUniqArray(10)(asn_org) AS asn_list, "
            f"groupUniqArray(10)(country_code) AS country_list, "
            f"groupUniqArray(10)(host) AS host_list, "
            f"min(detected_at) AS first_seen, max(detected_at) AS last_seen "
            f"FROM {_DB}.ml_detected_anomalies "
            "WHERE campaign_id = {cid:Int32} "
            "AND detected_at >= now() - INTERVAL {days:UInt16} DAY",
            {"cid": cid, "days": days},
        )
        # Hourly campaign timeline
        timeline = query(
            f"SELECT toStartOfHour(detected_at) AS hour, "
            f"count() AS detections, uniqExact(src_ip) AS active_ips "
            f"FROM {_DB}.ml_detected_anomalies "
            "WHERE campaign_id = {cid:Int32} "
            "AND detected_at >= now() - INTERVAL {days:UInt16} DAY "
            "GROUP BY hour ORDER BY hour",
            {"cid": cid, "days": days},
        )
        return {
            "campaign_id": cid,
            "members": members,
            "profile": profile[0] if profile else {},
            "timeline": timeline,
        }
    except Exception:
        logger.exception("campaign detail query failed for %s", cid)
        return {"campaign_id": cid, "members": [], "profile": {}, "timeline": []}


# ---------------------------------------------------------------------------
# GET /api/brute-force — Form brute-force detection
# ---------------------------------------------------------------------------
@router.get("/brute-force")
async def brute_force() -> dict[str, Any]:
    """Brute-force / credential-stuffing detections from view_form_bruteforce_detected.

    Top 100 rows by POST count; any error is logged and an empty list returned.
    """
    try:
        rows = query(
            f"SELECT toString(src_ip) AS src_ip, host, "
            f"post_count, distinct_paths, first_seen, last_seen "
            f"FROM {_DB}.view_form_bruteforce_detected "
            "ORDER BY post_count DESC LIMIT 100"
        )
        return {"data": rows}
    except Exception:
        logger.exception("brute-force query failed")
        return {"data": []}


# ---------------------------------------------------------------------------
# GET /api/ja4-rotation — JA4 fingerprint rotation detection
# ---------------------------------------------------------------------------
@router.get("/ja4-rotation")
async def ja4_rotation() -> dict[str, Any]:
    """List IPs cycling through several JA4 fingerprints (possible evasion).

    Reads the 100 rows of ``view_host_ip_ja4_rotation`` with the highest
    ``distinct_ja4``. A failing query is logged and yields ``{"data": []}``.
    """
    sql = (
        f"SELECT toString(src_ip) AS src_ip, host, "
        f"distinct_ja4, ja4_list, total_hits, window_start "
        f"FROM {_DB}.view_host_ip_ja4_rotation "
        "ORDER BY distinct_ja4 DESC LIMIT 100"
    )
    try:
        return {"data": query(sql)}
    except Exception:
        logger.exception("ja4-rotation query failed")
        return {"data": []}


# ---------------------------------------------------------------------------
# GET /api/recurrence — Persistent threat IPs
# ---------------------------------------------------------------------------
@router.get("/recurrence")
async def recurrence() -> dict[str, Any]:
    """List IPs detected across several time windows (recurring threats).

    Reads the top 100 rows of ``view_ip_recurrence`` ordered by
    ``recurrence`` then ``worst_score`` (both descending). A failing query is
    logged and yields ``{"data": []}``.
    """
    sql = (
        f"SELECT toString(src_ip) AS src_ip, "
        f"recurrence, worst_score, worst_threat, "
        f"first_seen, last_seen, "
        f"top_ja4, top_host "
        f"FROM {_DB}.view_ip_recurrence "
        "ORDER BY recurrence DESC, worst_score DESC LIMIT 100"
    )
    try:
        return {"data": query(sql)}
    except Exception:
        logger.exception("recurrence query failed")
        return {"data": []}


# ---------------------------------------------------------------------------
# GET /api/cascade/{ip} — Resource cascade for headless detection
# ---------------------------------------------------------------------------
@router.get("/cascade/{ip}")
async def cascade(ip: str) -> dict[str, Any]:
    """Return the sub-resource cascade rows for one IP (headless-browser signal).

    Every ``::ffff:`` occurrence is removed from ``ip`` before it is bound and
    converted server-side with ``toIPv6``. At most 50 rows are returned,
    newest window first; a failing query is logged and yields ``{"data": []}``.
    """
    params = {"ip": ip.replace("::ffff:", "")}
    sql = (
        f"SELECT toString(src_ip) AS src_ip, host, "
        f"page_count, avg_sub_delay_ms, stddev_sub_delay_ms, "
        f"max_sub_resources, window_start "
        f"FROM {_DB}.view_resource_cascade_1h "
        "WHERE src_ip = toIPv6({ip:String}) "
        "ORDER BY window_start DESC LIMIT 50"
    )
    try:
        return {"data": query(sql, params)}
    except Exception:
        logger.exception("cascade query failed for %s", ip)
        return {"data": []}


# ---------------------------------------------------------------------------
# GET /api/alerts — Live alert feed (recent HIGH/CRITICAL)
# ---------------------------------------------------------------------------
@router.get("/alerts")
async def alerts(
    limit: int = Query(20, ge=1, le=100),
) -> dict[str, Any]:
    """Return the live alert feed: the newest detections from the last day.

    ``limit`` (1..100) caps the number of rows; it is bound as ``UInt32``.

    NOTE(review): the feed is meant to show CRITICAL / HIGH / KNOWN_BOT
    alerts, but the query applies no ``threat_level`` filter, so every
    detection of the last 24 h is eligible. Confirm whether a filter is
    intended.
    """
    sql = (
        f"SELECT detected_at, toString(src_ip) AS src_ip, "
        f"anomaly_score, threat_level, ja4, host, "
        f"asn_org, country_code, bot_name, campaign_id, "
        f"hits, hit_velocity, reason "
        f"FROM {_DB}.ml_detected_anomalies "
        "WHERE detected_at >= now() - INTERVAL 1 DAY "
        "ORDER BY detected_at DESC "
        "LIMIT {lim:UInt32}"
    )
    try:
        return {"alerts": query(sql, {"lim": limit})}
    except Exception:
        logger.exception("alerts query failed")
        return {"alerts": []}


# ---------------------------------------------------------------------------
# GET /api/timeline-detail — Hourly threat-level breakdown
# ---------------------------------------------------------------------------
@router.get("/timeline-detail")
async def timeline_detail() -> dict[str, Any]:
    """Return the last day's detections per hour, split by threat level.

    Each timeline entry is ``{"hour": <str>, <threat_level>: <count>, ...}``,
    in ascending hour order. A failing query is logged and yields
    ``{"timeline": []}``.
    """
    sql = (
        f"SELECT toStartOfHour(detected_at) AS hour, "
        f"threat_level, count() AS cnt "
        f"FROM {_DB}.ml_detected_anomalies "
        "WHERE detected_at >= now() - INTERVAL 1 DAY "
        "GROUP BY hour, threat_level "
        "ORDER BY hour"
    )
    try:
        # Pivot (hour, level, count) rows into one dict per hour; insertion
        # order keeps the SQL's hour ordering.
        by_hour: dict[str, dict] = {}
        for row in query(sql):
            hour_key = str(row["hour"])
            by_hour.setdefault(hour_key, {"hour": hour_key})[row["threat_level"]] = row["cnt"]
        return {"timeline": list(by_hour.values())}
    except Exception:
        logger.exception("timeline-detail query failed")
        return {"timeline": []}


# ---------------------------------------------------------------------------
# GET /api/ua-rotation — User-Agent rotation detection
# ---------------------------------------------------------------------------
@router.get("/ua-rotation")
async def ua_rotation() -> dict[str, Any]:
    """Return IPs rotating through several User-Agents (possible evasion).

    Reads the top 100 rows of ``view_dashboard_user_agents`` by
    ``distinct_ua_count``; returns ``{"data": []}`` when the query fails.
    """
    try:
        rows = query(
            f"SELECT toString(src_ip) AS src_ip, ja4, "
            f"distinct_ua_count, ua_samples, total_requests, window_start "
            f"FROM {_DB}.view_dashboard_user_agents "
            "ORDER BY distinct_ua_count DESC LIMIT 100"
        )
        return {"data": rows}
    except Exception:
        logger.exception("ua-rotation query failed")
        return {"data": []}


# ---------------------------------------------------------------------------
# GET /api/ja4/{fingerprint} — JA4 fingerprint investigation detail
# ---------------------------------------------------------------------------
@router.get("/ja4/{fingerprint:path}")
async def ja4_detail(fingerprint: str) -> dict[str, Any]:
    """Return a full investigation view for one JA4 fingerprint.

    Args:
        fingerprint: JA4 string. The route uses a ``path`` converter, so the
            value may contain ``/``. It is bound as a ``String`` parameter.

    Returns:
        ``profile`` (aggregate row or ``{}``), ``detections``, ``scores``,
        ``timeline``, ``threats``, ``http_logs`` and ``ai_features``.
        ``ai_features`` is ``[]`` when ``view_ai_features_1h`` cannot be
        queried.

    Raises:
        HTTPException: 500 if any of the main queries fails.
    """
    params = {"ja4": fingerprint}
    # Look-back windows differ per query: detections/profile/threats 7 days,
    # scores/timeline 3 days, raw HTTP logs 1 day.
    try:
        # Detections (anomalies) that used this fingerprint.
        detections = query(
            f"SELECT toString(src_ip) AS src_ip, anomaly_score, "
            f"raw_anomaly_score, threat_level, hits, hit_velocity, "
            f"host, asn_org, country_code, browser_family, bot_name, "
            f"detected_at, campaign_id "
            f"FROM {_DB}.ml_detected_anomalies "
            "WHERE ja4 = {ja4:String} "
            "AND detected_at >= now() - INTERVAL 7 DAY "
            "ORDER BY detected_at DESC LIMIT 500",
            params,
        )
        # ML scores for this fingerprint.
        all_scores = query(
            f"SELECT toString(src_ip) AS src_ip, anomaly_score, "
            f"raw_anomaly_score, ae_recon_error, xgb_prob, "
            f"threat_level, model_name, host, hits, "
            f"asn_org, country_code, browser_family, detected_at "
            f"FROM {_DB}.ml_all_scores "
            "WHERE ja4 = {ja4:String} "
            "AND detected_at >= now() - INTERVAL 3 DAY "
            "ORDER BY detected_at DESC LIMIT 500",
            params,
        )
        # Aggregated profile (a single row).
        profile = query(
            f"SELECT "
            f"count() AS total_sessions, "
            f"uniqExact(src_ip) AS unique_ips, "
            f"uniqExact(host) AS unique_hosts, "
            f"uniqExact(asn_org) AS unique_asns, "
            f"avg(anomaly_score) AS avg_score, "
            f"max(anomaly_score) AS max_score, "
            f"avg(hits) AS avg_hits, "
            f"avg(hit_velocity) AS avg_velocity, "
            f"sum(hits) AS total_hits, "
            f"groupUniqArray(20)(toString(src_ip)) AS ip_sample, "
            f"groupUniqArray(10)(host) AS host_list, "
            f"groupUniqArray(10)(asn_org) AS asn_list, "
            f"groupUniqArray(10)(country_code) AS country_list, "
            f"groupUniqArray(5)(browser_family) AS browser_list, "
            f"groupUniqArray(5)(bot_name) AS bot_names, "
            f"min(detected_at) AS first_seen, max(detected_at) AS last_seen, "
            f"countIf(threat_level IN ('HIGH','CRITICAL')) AS threat_count, "
            f"countIf(threat_level = 'KNOWN_BOT') AS known_bot_count, "
            f"countIf(browser_family != '') AS browser_count "
            f"FROM {_DB}.ml_all_scores "
            "WHERE ja4 = {ja4:String} "
            "AND detected_at >= now() - INTERVAL 7 DAY",
            params,
        )
        # Hourly timeline.
        timeline = query(
            f"SELECT toStartOfHour(detected_at) AS hour, "
            f"count() AS sessions, uniqExact(src_ip) AS active_ips, "
            f"avg(anomaly_score) AS avg_score "
            f"FROM {_DB}.ml_all_scores "
            "WHERE ja4 = {ja4:String} "
            "AND detected_at >= now() - INTERVAL 3 DAY "
            "GROUP BY hour ORDER BY hour",
            params,
        )
        # Threat-level breakdown.
        threats = query(
            f"SELECT threat_level, count() AS cnt "
            f"FROM {_DB}.ml_all_scores "
            "WHERE ja4 = {ja4:String} "
            "AND detected_at >= now() - INTERVAL 7 DAY "
            "GROUP BY threat_level ORDER BY cnt DESC",
            params,
        )
        # Raw HTTP traffic.
        http_logs = query(
            f"SELECT time, toString(src_ip) AS src_ip, method, host, path, "
            f"http_version, header_user_agent "
            f"FROM {_DB_LOGS}.http_logs "
            "WHERE ja4 = {ja4:String} "
            "AND time >= now() - INTERVAL 1 DAY "
            "ORDER BY time DESC LIMIT 200",
            params,
        )
        # AI features for this fingerprint (best effort: the view may not exist).
        ai_features: list[dict] = []
        try:
            ai_features = query(
                f"SELECT * FROM {_DB}.view_ai_features_1h "
                "WHERE ja4 = {ja4:String} LIMIT 20",
                params,
            )
        except Exception:
            logger.debug("view_ai_features_1h unavailable for ja4=%s", fingerprint)
        return {
            "ja4": fingerprint,
            "profile": profile[0] if profile else {},
            "detections": detections,
            "scores": all_scores,
            "timeline": timeline,
            "threats": threats,
            "http_logs": http_logs,
            "ai_features": ai_features,
        }
    except Exception as exc:
        logger.exception("ja4 detail query failed for %s", fingerprint)
        raise HTTPException(status_code=500, detail=str(exc))


# ---------------------------------------------------------------------------
# GET /api/cluster/{cid} — Enhanced cluster investigation
# ---------------------------------------------------------------------------
@router.get("/cluster/{cid}")
async def cluster_detail(cid: int) -> dict[str, Any]:
    """Return a full investigation view for one cluster over the last 7 days.

    Args:
        cid: Campaign/cluster identifier (bound as ``Int32``).

    Returns:
        ``profile`` (aggregate row or ``{}``), ``members``, ``timeline`` and
        per-JA4 / per-ASN / per-host / per-threat-level breakdowns.

    Raises:
        HTTPException: 500 if any query fails.
    """
    params = {"cid": cid}
    try:
        # Enriched aggregate profile (a single row).
        profile = query(
            f"SELECT "
            f"count() AS total_members, "
            f"uniqExact(src_ip) AS unique_ips, "
            f"uniqExact(ja4) AS unique_ja4, "
            f"uniqExact(host) AS unique_hosts, "
            f"uniqExact(asn_org) AS unique_asns, "
            f"avg(anomaly_score) AS avg_score, max(anomaly_score) AS max_score, "
            f"min(anomaly_score) AS min_score, "
            f"avg(hits) AS avg_hits, sum(hits) AS total_hits, "
            f"avg(hit_velocity) AS avg_velocity, "
            f"avg(fuzzing_index) AS avg_fuzzing, "
            f"avg(post_ratio) AS avg_post_ratio, "
            f"groupUniqArray(30)(toString(src_ip)) AS ip_list, "
            f"groupUniqArray(20)(ja4) AS ja4_list, "
            f"groupUniqArray(10)(host) AS host_list, "
            f"groupUniqArray(10)(asn_org) AS asn_list, "
            f"groupUniqArray(10)(country_code) AS country_list, "
            f"groupUniqArray(5)(browser_family) AS browser_list, "
            f"groupUniqArray(5)(bot_name) AS bot_names, "
            f"min(detected_at) AS first_seen, max(detected_at) AS last_seen, "
            f"countIf(threat_level IN ('HIGH','CRITICAL')) AS threat_count, "
            f"countIf(threat_level = 'KNOWN_BOT') AS known_bot_count "
            f"FROM {_DB}.ml_detected_anomalies "
            "WHERE campaign_id = {cid:Int32} "
            "AND detected_at >= now() - INTERVAL 7 DAY",
            params,
        )
        # Detailed members.
        members = query(
            f"SELECT toString(src_ip) AS src_ip, ja4, host, "
            f"anomaly_score, raw_anomaly_score, threat_level, "
            f"hits, hit_velocity, fuzzing_index, post_ratio, "
            f"asn_org, asn_number, country_code, "
            f"browser_family, bot_name, detected_at, reason "
            f"FROM {_DB}.ml_detected_anomalies "
            "WHERE campaign_id = {cid:Int32} "
            "AND detected_at >= now() - INTERVAL 7 DAY "
            "ORDER BY anomaly_score ASC LIMIT 500",
            params,
        )
        # Hourly timeline.
        timeline = query(
            f"SELECT toStartOfHour(detected_at) AS hour, "
            f"count() AS detections, uniqExact(src_ip) AS active_ips, "
            f"avg(anomaly_score) AS avg_score "
            f"FROM {_DB}.ml_detected_anomalies "
            "WHERE campaign_id = {cid:Int32} "
            "AND detected_at >= now() - INTERVAL 7 DAY "
            "GROUP BY hour ORDER BY hour",
            params,
        )
        # Breakdown by JA4 (signature convergence).
        ja4_breakdown = query(
            f"SELECT ja4, count() AS sessions, "
            f"uniqExact(src_ip) AS unique_ips, "
            f"avg(anomaly_score) AS avg_score "
            f"FROM {_DB}.ml_detected_anomalies "
            "WHERE campaign_id = {cid:Int32} "
            "AND detected_at >= now() - INTERVAL 7 DAY "
            "GROUP BY ja4 ORDER BY sessions DESC",
            params,
        )
        # Breakdown by ASN (infrastructure).
        asn_breakdown = query(
            f"SELECT asn_org, count() AS sessions, "
            f"uniqExact(src_ip) AS unique_ips "
            f"FROM {_DB}.ml_detected_anomalies "
            "WHERE campaign_id = {cid:Int32} "
            "AND detected_at >= now() - INTERVAL 7 DAY "
            "GROUP BY asn_org ORDER BY sessions DESC",
            params,
        )
        # Breakdown by targeted host.
        host_breakdown = query(
            f"SELECT host, count() AS sessions, "
            f"avg(anomaly_score) AS avg_score "
            f"FROM {_DB}.ml_detected_anomalies "
            "WHERE campaign_id = {cid:Int32} "
            "AND detected_at >= now() - INTERVAL 7 DAY "
            "GROUP BY host ORDER BY sessions DESC",
            params,
        )
        # Threat-level breakdown.
        threats = query(
            f"SELECT threat_level, count() AS cnt "
            f"FROM {_DB}.ml_detected_anomalies "
            "WHERE campaign_id = {cid:Int32} "
            "AND detected_at >= now() - INTERVAL 7 DAY "
            "GROUP BY threat_level ORDER BY cnt DESC",
            params,
        )
        return {
            "campaign_id": cid,
            "profile": profile[0] if profile else {},
            "members": members,
            "timeline": timeline,
            "ja4_breakdown": ja4_breakdown,
            "asn_breakdown": asn_breakdown,
            "host_breakdown": host_breakdown,
            "threats": threats,
        }
    except Exception as exc:
        logger.exception("cluster detail query failed for %s", cid)
        raise HTTPException(status_code=500, detail=str(exc))


# ═══════════════════════════════════════════════════════════════════════════════
# Reference lists (CSV files / ClickHouse dictionaries)
# ═══════════════════════════════════════════════════════════════════════════════

@router.get("/dictionaries")
async def dictionaries_meta():
    """Return metadata for every ClickHouse dictionary of the processing DB.

    Raises:
        HTTPException: 500 if the ``system.dictionaries`` query fails.
    """
    try:
        # The database name is bound as a parameter rather than spliced into
        # a quoted SQL literal, consistent with the rest of the module.
        rows = query(
            "SELECT name, type, status, element_count, "
            " arrayStringConcat(attribute.names, ', ') AS attributes "
            "FROM system.dictionaries "
            "WHERE database = {db:String} "
            "ORDER BY name",
            {"db": _DB},
        )
        return {"dictionaries": rows}
    except Exception as exc:
        logger.exception("dictionaries meta query failed")
        raise HTTPException(status_code=500, detail=str(exc))


# Per-list whitelist of columns accepted by the `sort` query parameter.
_REFLIST_SORT = {
    "bot_ip": {"prefix", "bot_name"},
    "bot_ja4": {"ja4", "bot_name"},
    "browser_ja4": {"ja4", "browser_family", "tls_library"},
    "browser_h2": {"h2_fingerprint", "browser_family"},
    "asn_reputation": {"src_asn", "label"},
    "iplocate_asn": {"asn", "country_code", "name", "network"},
    "anubis_ip_rules": {"prefix", "bot_name", "action", "category"},
    "anubis_asn_rules": {"asn", "bot_name", "action", "category"},
}

# Per-list SQL expressions searched by the `search` query parameter
# (numeric columns are wrapped in toString()).
_REFLIST_SEARCH_COLS: dict[str, list[str]] = {
    "bot_ip": ["prefix", "bot_name"],
    "bot_ja4": ["ja4", "bot_name"],
    "browser_ja4": ["ja4", "browser_family", "tls_library", "context"],
    "browser_h2": ["h2_fingerprint", "browser_family"],
    "asn_reputation": ["toString(src_asn)", "label"],
    "iplocate_asn": ["network", "toString(asn)", "country_code", "name"],
    "anubis_ip_rules": ["prefix", "bot_name", "action", "category"],
    "anubis_asn_rules": ["toString(asn)", "bot_name", "action", "category"],
}

# Base SELECT for each list; endpoints wrap it as a subquery.
_REFLIST_QUERIES: dict[str, str] = {
    "bot_ip": f"SELECT prefix, bot_name FROM dictionary('{_DB}.dict_bot_ip')",
    "bot_ja4": f"SELECT ja4, bot_name FROM dictionary('{_DB}.dict_bot_ja4')",
    "browser_ja4": (
        f"SELECT ja4, browser_family, tls_library, context "
        f"FROM dictionary('{_DB}.dict_browser_ja4')"
    ),
    "browser_h2": (
        f"SELECT h2_fingerprint, browser_family "
        f"FROM dictionary('{_DB}.dict_browser_h2') ORDER BY browser_family"
    ),
    "asn_reputation": (
        f"SELECT src_asn, label FROM dictionary('{_DB}.dict_asn_reputation')"
    ),
    "iplocate_asn": (
        f"SELECT network, asn, country_code, name "
        f"FROM dictionary('{_DB}.dict_iplocate_asn')"
    ),
    "anubis_ip_rules": (
        f"SELECT prefix, bot_name, action, category FROM {_DB}.anubis_ip_rules"
    ),
    "anubis_asn_rules": (
        f"SELECT asn, bot_name, action, category FROM {_DB}.anubis_asn_rules"
    ),
}


@router.get("/reflist/{name}")
async def reflist(
    name: str,
    limit: int = Query(default=200, ge=1, le=10000),
    offset: int = Query(default=0, ge=0),
    sort: str = Query(default=""),
    order: str = Query(default="ASC"),
    search: str = Query(default=""),
):
    """Return one page of a reference list / dictionary.

    Args:
        name: Key of ``_REFLIST_QUERIES``; unknown names give a 404.
        limit: Page size (1..10000).
        offset: Number of rows to skip.
        sort: Sort column; ignored unless whitelisted in ``_REFLIST_SORT``.
        order: ``"DESC"`` (case-insensitive) sorts descending, anything else
            ascending.
        search: Literal substring matched against the list's search columns.

    Returns:
        ``{"name", "total", "limit", "offset", "rows"}``; ``total`` counts the
        rows matching ``search`` before pagination.

    Raises:
        HTTPException: 404 for an unknown list, 500 if a query fails.
    """
    if name not in _REFLIST_QUERIES:
        raise HTTPException(status_code=404, detail=f"Unknown reflist: {name}")
    base_q = _REFLIST_QUERIES[name]

    order_clause = ""
    if sort and sort in _REFLIST_SORT.get(name, set()):
        direction = "DESC" if order.upper() == "DESC" else "ASC"
        order_clause = f" ORDER BY {sort} {direction}"

    where_clause = ""
    params: dict = {}
    cols = _REFLIST_SEARCH_COLS.get(name, [])
    if search and cols:
        # position() is a literal substring test, so user-typed '%' or '_'
        # are matched as-is instead of acting as LIKE wildcards (a search for
        # "_" would otherwise match every row).
        params["_q"] = search
        conditions = " OR ".join(f"position({c}, {{_q:String}}) > 0" for c in cols)
        where_clause = f" WHERE ({conditions})"

    try:
        wrapped = f"SELECT * FROM ({base_q}){where_clause}"
        total_row = query(f"SELECT count() AS total FROM ({wrapped})", params or None)
        total = total_row[0]["total"] if total_row else 0

        data_q = f"{wrapped}{order_clause} LIMIT {int(limit)} OFFSET {int(offset)}"
        rows = query(data_q, params or None)
        return {"name": name, "total": total, "limit": limit, "offset": offset, "rows": rows}
    except Exception as exc:
        logger.exception("reflist query failed for %s", name)
        raise HTTPException(status_code=500, detail=str(exc))


# Breakdown used by /reflist/{name}/stats: (GROUP BY column, trailing LIMIT
# clause or ""). Lists absent from this map (e.g. browser_h2) get no breakdown.
_REFLIST_BREAKDOWN: dict[str, tuple[str, str]] = {
    "bot_ip": ("bot_name", " LIMIT 20"),
    "bot_ja4": ("bot_name", " LIMIT 20"),
    "browser_ja4": ("browser_family", " LIMIT 20"),
    "asn_reputation": ("label", ""),
    "iplocate_asn": ("country_code", " LIMIT 20"),
    "anubis_ip_rules": ("action", ""),
    "anubis_asn_rules": ("action", ""),
}


@router.get("/reflist/{name}/stats")
async def reflist_stats(name: str):
    """Return the row count and a per-category breakdown of a reference list.

    Args:
        name: Key of ``_REFLIST_QUERIES``; unknown names give a 404.

    Returns:
        ``{"name", "total", "breakdown"}``; ``breakdown`` holds
        ``{<column>, "cnt"}`` rows ordered by ``cnt`` descending, or ``[]``
        for lists without a configured breakdown.

    Raises:
        HTTPException: 404 for an unknown list, 500 if a query fails.
    """
    if name not in _REFLIST_QUERIES:
        raise HTTPException(status_code=404, detail=f"Unknown reflist: {name}")
    base_q = _REFLIST_QUERIES[name]
    try:
        total_row = query(f"SELECT count() AS total FROM ({base_q})")
        total = total_row[0]["total"] if total_row else 0

        agg: list = []
        breakdown = _REFLIST_BREAKDOWN.get(name)
        if breakdown is not None:
            # Column names come from the constant table above, never from input.
            col, limit_clause = breakdown
            agg = query(
                f"SELECT {col}, count() AS cnt FROM ({base_q}) "
                f"GROUP BY {col} ORDER BY cnt DESC{limit_clause}"
            )
        return {"name": name, "total": total, "breakdown": agg}
    except Exception as exc:
        logger.exception("reflist stats query failed for %s", name)
        raise HTTPException(status_code=500, detail=str(exc))


@router.get("/fleet")
async def fleet() -> dict[str, Any]:
    """Return JA4×ASN fleet detections from the last 7 days (§5.2).

    At most 100 rows, ordered by ``fleet_score`` descending.

    Raises:
        HTTPException: 500 if the query fails (the error is logged).
    """
    try:
        rows = query(
            f"SELECT detected_at, community_id, fleet_score, n_ips, ja4_set, asn_set, ip_sample "
            f"FROM {_DB}.fleet_detections "
            f"WHERE detected_at >= now() - INTERVAL 7 DAY "
            f"ORDER BY fleet_score DESC "
            f"LIMIT 100"
        )
    except Exception as exc:
        logger.exception("fleet query failed")
        raise HTTPException(status_code=500, detail=str(exc))
    return {"fleets": rows}
@router.get("/health")
async def health_metrics() -> dict[str, Any]:
    """Return ML pipeline health metrics for the last 7 days (step 9).

    Returns:
        ``metrics``: up to 500 ``ml_performance_metrics`` rows, newest first.
        ``latest_by_model``: the most recent row for each ``model_name``.
        ``avg_anomaly_rate`` / ``avg_latency_ms``: means over ``metrics``
        (0 when there are no rows).

    Raises:
        HTTPException: 500 if the query fails (the error is logged).
    """
    try:
        rows = query(
            f"SELECT cycle_at, model_name, total_sessions, correlated_rate, anomaly_rate, "
            f" critical_count, high_count, drift_rate, drift_alert, cycle_latency_ms, "
            f" features_valid, features_total, baseline_size, meta_learner_active "
            f"FROM {_DB}.ml_performance_metrics "
            f"WHERE cycle_at >= now() - INTERVAL 7 DAY "
            f"ORDER BY cycle_at DESC "
            f"LIMIT 500"
        )
    except Exception as exc:
        logger.exception("health metrics query failed")
        raise HTTPException(status_code=500, detail=str(exc))

    # Rows are ordered newest-first, so the first row seen for a model is its
    # latest one. A plain `{r["model_name"]: r for r in rows}` would keep the
    # last row seen, i.e. the oldest.
    latest: dict[str, dict] = {}
    for row in rows:
        latest.setdefault(row["model_name"], row)

    if rows:
        avg_anomaly = sum(r["anomaly_rate"] for r in rows) / len(rows)
        avg_latency = sum(r["cycle_latency_ms"] for r in rows) / len(rows)
    else:
        avg_anomaly = 0
        avg_latency = 0

    return {
        "metrics": rows,
        "latest_by_model": latest,
        "avg_anomaly_rate": round(avg_anomaly, 4),
        "avg_latency_ms": round(avg_latency),
    }


# (label, countIf condition) for each H2 WINDOW_UPDATE signal, in display order.
_H2_WINDOW_SIGNALS: tuple[tuple[str, str], ...] = (
    ("Chrome (WU≈15663105)", "h2_window_chrome > 0"),
    ("Firefox (WU≈12517377)", "h2_window_firefox > 0"),
    ("Safari (WU≈10485760)", "h2_window_safari > 0"),
    ("Absent (outil/curl)", "h2_window_absent > 0"),
    (
        "Autre / go net/http",
        "h2_settings_known > 0 AND h2_window_chrome = 0 AND h2_window_firefox = 0 "
        "AND h2_window_safari = 0 AND h2_window_absent = 0",
    ),
)


@router.get("/browser-signatures")
async def browser_signatures() -> dict[str, Any]:
    """Analyse passive browser signatures (browser_matcher, §4).

    Uses the H2 columns of ``view_ai_features_1h``: ``h2_dict_family``,
    ``h2_window_{chrome,firefox,safari,absent}``, ``tls_h2_family_mismatch``,
    ``h2_order_*`` and ``h2_settings_known``.

    Every section is best effort: if its query fails (e.g. the view does not
    exist yet) the failure is logged at DEBUG level and the section keeps its
    empty default.
    """
    result: dict[str, Any] = {
        "stats": {},
        "h2_families": [],
        "h2_window_signals": [],
        "mismatch_by_family": [],
        "top_mismatches": [],
    }

    # ── Global counters ───────────────────────────────────────────────────
    try:
        stats = query(
            f"SELECT "
            f" count() AS total_sessions, "
            f" countIf(h2_settings_known > 0) AS sessions_with_h2, "
            f" countIf(h2_dict_family != '') AS sessions_matched, "
            f" countIf(tls_h2_family_mismatch > 0) AS sessions_mismatch, "
            f" round(100.0 * countIf(h2_dict_family != '') / greatest(countIf(h2_settings_known > 0), 1), 1) AS match_rate, "
            f" round(100.0 * countIf(tls_h2_family_mismatch > 0) / greatest(countIf(h2_settings_known > 0), 1), 1) AS mismatch_rate, "
            f" countIf(h2_priority_present > 0) AS sessions_with_priority "
            f"FROM {_DB}.view_ai_features_1h"
        )
        if stats:
            result["stats"] = stats[0]
    except Exception:
        logger.debug(
            "view_ai_features_1h unavailable for /api/browser-signatures stats",
            exc_info=True,
        )

    # ── H2 family distribution (dict_browser_h2) ──────────────────────────
    try:
        result["h2_families"] = query(
            f"SELECT h2_dict_family AS family, count() AS sessions "
            f"FROM {_DB}.view_ai_features_1h "
            f"WHERE h2_dict_family != '' "
            f"GROUP BY family ORDER BY sessions DESC"
        )
    except Exception:
        logger.debug("browser-signatures h2 families query failed", exc_info=True)

    # ── H2 WINDOW_UPDATE signal breakdown ─────────────────────────────────
    # A single scan with one countIf() per signal, pivoted in Python. This
    # reads the view once instead of five times (UNION ALL) and keeps the
    # output in a fixed order, which UNION ALL does not guarantee.
    try:
        select_list = ", ".join(
            f"countIf({condition}) AS s{idx}"
            for idx, (_, condition) in enumerate(_H2_WINDOW_SIGNALS)
        )
        wu_rows = query(f"SELECT {select_list} FROM {_DB}.view_ai_features_1h")
        counts = wu_rows[0] if wu_rows else {}
        result["h2_window_signals"] = [
            {"signal": label, "sessions": counts.get(f"s{idx}", 0)}
            for idx, (label, _) in enumerate(_H2_WINDOW_SIGNALS)
        ]
    except Exception:
        logger.debug("browser-signatures window-update query failed", exc_info=True)

    # ── TLS↔H2 mismatch per JA4 family ────────────────────────────────────
    try:
        result["mismatch_by_family"] = query(
            f"SELECT browser_family AS ja4_family, "
            f" count() AS total, "
            f" countIf(tls_h2_family_mismatch > 0) AS mismatches, "
            f" round(100.0 * countIf(tls_h2_family_mismatch > 0) / count(), 1) AS mismatch_pct "
            f"FROM {_DB}.view_ai_features_1h "
            f"WHERE browser_family != '' "
            f"GROUP BY ja4_family ORDER BY mismatches DESC"
        )
    except Exception:
        logger.debug("browser-signatures mismatch query failed", exc_info=True)

    # ── Top 20 suspicious sessions (confirmed TLS↔H2 mismatch) ────────────
    try:
        result["top_mismatches"] = query(
            f"SELECT "
            f" replaceRegexpOne(toString(src_ip), '^::ffff:', '') AS ip, "
            f" ja4, "
            f" browser_family AS ja4_family, "
            f" h2_dict_family, "
            f" h2_window_update_value AS wu_value, "
            f" hits, "
            f" h2_pseudo_ord_raw AS pseudo_order, "
            f" fingerprint_coherence_score AS coherence, "
            f" h2_header_table_size, "
            f" h2_enable_push, "
            f" h2_max_concurrent_streams, "
            f" h2_initial_window_size, "
            f" h2_max_frame_size, "
            f" h2_max_header_list_size, "
            f" h2_enable_connect_protocol "
            f"FROM {_DB}.view_ai_features_1h "
            f"WHERE tls_h2_family_mismatch > 0 "
            f"ORDER BY hits DESC "
            f"LIMIT 20"
        )
    except Exception:
        logger.debug("browser-signatures top mismatches query failed", exc_info=True)

    return result


# ---------------------------------------------------------------------------
# GET    /api/browser-signatures/entries — list managed H2 fingerprints
# POST   /api/browser-signatures/entries — add an H2 fingerprint
# DELETE /api/browser-signatures/entries — remove an H2 fingerprint
# ---------------------------------------------------------------------------

class BrowserH2Entry(BaseModel):
    """New H2 fingerprint to record in ``browser_h2_signatures``."""

    h2_fingerprint: str
    browser_family: str
    confidence: float = 1.0
    notes: str = ""


# Families accepted by the POST endpoint.
_VALID_BROWSER_FAMILIES = {"Chrome", "Firefox", "Safari", "Edge", "Other"}


@router.get("/browser-signatures/entries")
async def browser_sig_entries() -> dict[str, Any]:
    """Return the contents of the ``browser_h2_signatures`` table.

    If the table is missing (migration 06 not applied), fall back to the CSV
    dictionary ``dict_browser_h2``. That source has no confidence/notes, so
    constant values are returned for them and the response is marked
    ``readonly``.

    Raises:
        HTTPException: 500 if both the table and the fallback fail.
    """
    # Preferred source: the structured table (post-migration 06).
    try:
        rows = query(
            f"SELECT h2_fingerprint, browser_family, confidence, notes "
            f"FROM {_DB}.browser_h2_signatures "
            f"ORDER BY browser_family, confidence DESC"
        )
        return {"entries": rows, "total": len(rows), "source": "table"}
    except Exception:
        logger.debug("browser_h2_signatures unavailable, using dict fallback", exc_info=True)

    # Fallback: CSV dictionary (pre-migration 06).
    try:
        rows = query(
            f"SELECT h2_fingerprint, browser_family, "
            f"toFloat32(1.0) AS confidence, '' AS notes "
            f"FROM dictionary('{_DB}.dict_browser_h2') "
            f"ORDER BY browser_family"
        )
        return {"entries": rows, "total": len(rows), "source": "dict_csv", "readonly": True}
    except Exception as exc:
        logger.exception("browser_h2 entries fallback failed")
        raise HTTPException(status_code=500, detail=str(exc))


@router.post("/browser-signatures/entries", status_code=201)
async def browser_sig_add(body: BrowserH2Entry) -> dict[str, Any]:
    """Insert an H2 fingerprint into ``browser_h2_signatures`` and reload the dict.

    Raises:
        HTTPException: 422 for an empty fingerprint, an unknown family or a
            confidence outside [0, 1]; 500 if the insert fails. A failed
            dictionary reload is only logged as a warning.
    """
    fingerprint = body.h2_fingerprint.strip()
    if not fingerprint:
        raise HTTPException(status_code=422, detail="h2_fingerprint ne peut pas être vide")
    if body.browser_family not in _VALID_BROWSER_FAMILIES:
        # Sorted so the message does not depend on set iteration order, which
        # changes between processes (string hash randomisation).
        raise HTTPException(
            status_code=422,
            detail=(
                "browser_family doit être l'un de "
                f"{', '.join(sorted(_VALID_BROWSER_FAMILIES))}"
            ),
        )
    if not 0.0 <= body.confidence <= 1.0:
        raise HTTPException(status_code=422, detail="confidence doit être entre 0.0 et 1.0")
    try:
        execute(
            f"INSERT INTO {_DB}.browser_h2_signatures "
            "(h2_fingerprint, browser_family, confidence, notes) VALUES "
            "({fp:String}, {fam:String}, {conf:Float32}, {notes:String})",
            {
                "fp": fingerprint,
                "fam": body.browser_family,
                "conf": body.confidence,
                "notes": body.notes,
            },
        )
        # Force a dictionary reload so the new entry is visible to lookups.
        try:
            execute(f"SYSTEM RELOAD DICTIONARY {_DB}.dict_browser_h2")
        except Exception:
            logger.warning("dict_browser_h2 reload failed (migration 06 peut-être non appliquée)")
        return {"status": "ok", "h2_fingerprint": fingerprint}
    except Exception as exc:
        logger.exception("browser_h2_signatures insert failed")
        raise HTTPException(status_code=500, detail=str(exc))


@router.delete("/browser-signatures/entries")
async def browser_sig_delete(fingerprint: str = Query(...)) -> dict[str, Any]:
    """Delete an H2 fingerprint from ``browser_h2_signatures`` and reload the dict.

    Raises:
        HTTPException: 422 for an empty fingerprint, 500 if the delete fails.
            A failed dictionary reload is only logged as a warning.
    """
    cleaned = fingerprint.strip()
    if not cleaned:
        raise HTTPException(status_code=422, detail="fingerprint ne peut pas être vide")
    try:
        # NOTE(review): ALTER TABLE … DELETE is a ClickHouse mutation, which
        # runs asynchronously by default; the reload below may still see the
        # row unless `mutations_sync` is enabled for this query — confirm.
        execute(
            f"ALTER TABLE {_DB}.browser_h2_signatures DELETE "
            "WHERE h2_fingerprint = {fp:String}",
            {"fp": cleaned},
        )
        try:
            execute(f"SYSTEM RELOAD DICTIONARY {_DB}.dict_browser_h2")
        except Exception:
            logger.warning("dict_browser_h2 reload failed")
        return {"status": "ok", "deleted": cleaned}
    except Exception as exc:
        logger.exception("browser_h2_signatures delete failed")
        raise HTTPException(status_code=500, detail=str(exc))


# ---------------------------------------------------------------------------
# GET /api/fingerprint-discovery — Extract and group fingerprints from real
# traffic to propose browser signatures
# ---------------------------------------------------------------------------

def _group_profiles_by_family(profiles: list[dict]) -> list[dict[str, Any]]:
    """Aggregate per-JA4 profiles into per-browser-family summaries.

    The family is ``dict_family`` when non-empty, else ``ua_family``, else
    ``"Inconnu"``. Groups are returned sorted by ``total_hits``, highest first.

    NOTE: ``unique_ips`` sums per-JA4 distinct counts, so an IP seen with
    several JA4s of the same family is counted once per JA4.
    """
    groups: dict[str, dict[str, Any]] = {}
    for p in profiles:
        # Family priority: dictionary match > User-Agent heuristic.
        family = p.get("dict_family") or p.get("ua_family") or "Inconnu"
        g = groups.setdefault(
            family,
            {
                "family": family,
                "ja4_count": 0,
                "total_hits": 0,
                "unique_ips": 0,
                "has_h2": False,
                "has_sec_ch_ua": False,
                "has_sec_fetch": False,
            },
        )
        g["ja4_count"] += 1
        g["total_hits"] += p.get("total_hits", 0)
        g["unique_ips"] += p.get("unique_ips", 0)
        if p.get("h2_fp"):
            g["has_h2"] = True
        # A header counts as present for the family once any of its JA4s
        # sends it on more than half of its requests.
        if (p.get("pct_sec_ch_ua") or 0) > 50:
            g["has_sec_ch_ua"] = True
        if (p.get("pct_sec_fetch") or 0) > 50:
            g["has_sec_fetch"] = True
    return sorted(groups.values(), key=lambda g: g["total_hits"], reverse=True)


@router.get("/fingerprint-discovery")
async def fingerprint_discovery(
    days: int = Query(default=7, ge=1, le=30),
    min_hits: int = Query(default=10, ge=1, le=100000),
    limit: int = Query(default=300, ge=10, le=1000),
) -> dict[str, Any]:
    """Discover fingerprint profiles from ``http_logs``.

    Groups traffic by JA4 and aggregates User-Agent, HTTP header, H2 and TLS
    data to help propose browser signatures.

    Args:
        days: Look-back in days on ``log_date`` (1..30).
        min_hits: Minimum request count for a JA4 to be kept.
        limit: Maximum number of JA4 profiles (10..1000).

    Returns:
        ``profiles`` (one row per JA4, by hits descending), ``groups`` (see
        ``_group_profiles_by_family``) and ``meta``.

    Raises:
        HTTPException: 500 if the query fails.
    """
    try:
        profiles = query(
            f"SELECT "
            f" ja4, "
            # ── Browser family from the User-Agent (majority vote) ──
            f" topK(1)("
            f" multiIf("
            f" position(header_user_agent, 'Edg/') > 0, 'Edge', "
            f" position(header_user_agent, 'OPR/') > 0, 'Opera', "
            f" position(header_user_agent, 'Chrome/') > 0 AND "
            f" position(header_user_agent, 'Safari/') > 0, 'Chrome', "
            f" position(header_user_agent, 'Firefox/') > 0, 'Firefox', "
            f" position(header_user_agent, 'Safari/') > 0, 'Safari', "
            f" position(lower(header_user_agent), 'bot') > 0 OR "
            f" position(lower(header_user_agent), 'crawl') > 0 OR "
            f" position(lower(header_user_agent), 'spider') > 0, 'Bot', "
            f" header_user_agent = '', 'Vide', "
            f" 'Autre'"
            f" )"
            f" )[1] AS ua_family, "
            # ── Volume ──
            f" count() AS total_hits, "
            f" uniqExact(src_ip) AS unique_ips, "
            f" uniqExact(header_user_agent) AS distinct_uas, "
            # ── User-Agent samples (top 3) ──
            f" topK(3)(header_user_agent) AS ua_samples, "
            # ── TLS ──
            f" any(tls_version) AS tls_version, "
            f" any(tls_alpn) AS tls_alpn, "
            # ── H2 ──
            f" anyIf(h2_fingerprint, h2_fingerprint != '') AS h2_fp, "
            f" anyIf(h2_settings_fp, h2_settings_fp != '') AS h2_settings, "
            f" max(h2_window_update) AS h2_wu, "
            f" anyIf(h2_pseudo_order, h2_pseudo_order != '') AS h2_pseudo, "
            # ── Header presence rates (%) ──
            f" round(countIf(header_sec_ch_ua != '') * 100.0 / count(), 1) "
            f" AS pct_sec_ch_ua, "
            f" round(countIf(header_sec_fetch_mode != '') * 100.0 / count(), 1) "
            f" AS pct_sec_fetch, "
            f" round(countIf(header_accept_language != '') * 100.0 / count(), 1) "
            f" AS pct_accept_lang, "
            f" round(countIf(position(header_accept_encoding, 'zstd') > 0) "
            f" * 100.0 / count(), 1) AS pct_zstd, "
            f" round(countIf(position(header_accept_encoding, 'br') > 0) "
            f" * 100.0 / count(), 1) AS pct_brotli, "
            f" round(countIf(position(header_accept_encoding, 'gzip') > 0) "
            f" * 100.0 / count(), 1) AS pct_gzip, "
            f" round(countIf(header_x_forwarded_for != '') * 100.0 / count(), 1) "
            f" AS pct_xff, "
            # ── Sec-CH-UA details ──
            f" anyIf(header_sec_ch_ua, header_sec_ch_ua != '') AS sec_ch_ua_sample, "
            f" anyIf(header_sec_ch_ua_platform, header_sec_ch_ua_platform != '') "
            f" AS platform_sample, "
            f" anyIf(header_sec_ch_ua_mobile, header_sec_ch_ua_mobile != '') "
            f" AS mobile_sample, "
            # ── Dominant Accept-Encoding ──
            f" topK(1)(header_accept_encoding)[1] AS accept_enc_main, "
            # ── Dictionary lookup ──
            f" dictGetOrDefault('{_DB}.dict_browser_ja4', 'browser_family', "
            f" tuple(ja4), '') AS dict_family "
            # ── Source ──
            f"FROM {_DB_LOGS}.http_logs "
            "WHERE ja4 != '' AND log_date >= today() - {days:UInt32} "
            "GROUP BY ja4 "
            "HAVING count() >= {min_hits:UInt32} "
            "ORDER BY total_hits DESC "
            "LIMIT {lim:UInt32}",
            {"days": days, "min_hits": min_hits, "lim": limit},
        )
    except Exception as exc:
        logger.exception("fingerprint-discovery query failed")
        raise HTTPException(status_code=500, detail=str(exc))

    groups_sorted = _group_profiles_by_family(profiles)
    return {
        "profiles": profiles,
        "groups": groups_sorted,
        "meta": {
            "total_ja4": len(profiles),
            "total_groups": len(groups_sorted),
            "days": days,
            "min_hits": min_hits,
        },
    }