- base.html: collapsible sidebar navigation, doc tooltip system, JS helpers (fmtNum, fmtPct, fmtDuration, ecGrid, buildTable, docHTML) - overview.html: SOC command center with stacked timeline, live alerts, campaigns panel, browser donut, 6 KPIs - detections.html: threat color dots, raw score column, click-to-navigate rows - network.html: JA4 rotation, brute-force, persistent threats tables, 6 KPIs - ip_detail.html: ASN/country KPIs, AE/XGB/campaign columns, enriched features - scores/traffic/features/models/classify: page_title blocks + doc tooltips - api.py: 9 new endpoints (campaigns, brute-force, ja4-rotation, recurrence, cascade, alerts, timeline-detail, ua-rotation) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
990 lines
36 KiB
Python
990 lines
36 KiB
Python
"""JSON API endpoints for the JA4 SOC Dashboard."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import os
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
from fastapi import APIRouter, HTTPException, Query
|
||
from pydantic import BaseModel
|
||
|
||
from backend.config import DB_PROCESSING, DB_LOGS, safe_identifier
|
||
from backend.database import query, query_scalar, execute
|
||
|
||
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Every route declared on this router is served under the /api prefix.
router = APIRouter(prefix="/api")

# Pre-validate DB identifiers at import time.
# The results are interpolated directly into SQL text by the handlers in this
# module, so they must never carry unchecked input.
# NOTE(review): safe_identifier is defined in backend.config; presumably it
# rejects or escapes unsafe names -- confirm there.
_DB = safe_identifier(DB_PROCESSING)
_DB_LOGS = safe_identifier(DB_LOGS)
|
||
|
||
# Whitelists for sort/order to prevent SQL injection.
# ORDER BY targets cannot be bound as query parameters, so a requested column
# is only interpolated into SQL when it appears in one of these sets.

# Columns accepted by the detections listing.
_DETECTION_SORT_COLS = {
    "detected_at",
    "src_ip",
    "ja4",
    "host",
    "anomaly_score",
    "threat_level",
    "recurrence",
    "hits",
    "hit_velocity",
    "fuzzing_index",
    "post_ratio",
    "campaign_id",
    "asn_org",
    "country_code",
    "bot_name",
    "browser_family",
}

# Columns accepted by the scores listing.
_SCORE_SORT_COLS = {
    "detected_at",
    "window_start",
    "src_ip",
    "ja4",
    "host",
    "anomaly_score",
    "raw_anomaly_score",
    "threat_level",
    "hits",
    "hit_velocity",
    "xgb_prob",
    "ae_recon_error",
    "asn_org",
    "country_code",
    "browser_family",
}

# Columns accepted by the raw traffic listing.
_TRAFFIC_SORT_COLS = {
    "time",
    "src_ip",
    "method",
    "host",
    "path",
    "http_version",
    "header_user_agent",
    "ja4",
    "src_country_code",
}

# The only sort directions that may reach the SQL text.
_ORDER_VALUES = {"ASC", "DESC"}
|
||
|
||
|
||
def _validate_sort(value: str, whitelist: set[str], default: str) -> str:
    """Return *value* if it is an allowed sort column, otherwise *default*.

    Args:
        value: Column name requested by the client.
        whitelist: Column names that may be interpolated into ORDER BY.
        default: Column to fall back to when *value* is not whitelisted.
    """
    if value in whitelist:
        return value
    return default
|
||
|
||
|
||
def _validate_order(value: str) -> str:
    """Normalise a sort direction to ``"ASC"`` or ``"DESC"``.

    The comparison is case-insensitive; anything that is not a recognised
    direction falls back to ``"DESC"``.
    """
    direction = value.upper()
    if direction in _ORDER_VALUES:
        return direction
    return "DESC"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# GET /api/overview
|
||
# ---------------------------------------------------------------------------
|
||
@router.get("/overview")
async def overview() -> dict[str, Any]:
    """Return the 24-hour summary used by the dashboard overview page.

    Runs a series of aggregate queries over ``ml_detected_anomalies``,
    ``ml_all_scores`` and ``http_logs``, each restricted to the last day,
    and packs the results into one JSON object.

    Returns:
        A dict with the keys ``detections_24h``, ``scored_24h``,
        ``traffic_24h``, ``unique_ips``, ``critical_count``, ``high_count``,
        ``legitimate_browsers``, ``browser_stats``, ``threat_distribution``,
        ``top_ips``, ``timeline`` and ``models``.

    Raises:
        HTTPException: 500 when any query fails; the underlying error is
            chained as ``__cause__``.
    """
    try:
        detections_24h = query_scalar(
            f"SELECT count() FROM {_DB}.ml_detected_anomalies "
            "WHERE detected_at >= now() - INTERVAL 1 DAY"
        ) or 0

        scored_24h = query_scalar(
            f"SELECT count() FROM {_DB}.ml_all_scores "
            "WHERE detected_at >= now() - INTERVAL 1 DAY"
        ) or 0

        threat_distribution = query(
            f"SELECT threat_level, count() AS cnt "
            f"FROM {_DB}.ml_all_scores "
            "WHERE detected_at >= now() - INTERVAL 1 DAY "
            "GROUP BY threat_level"
        )

        # Derive the CRITICAL / HIGH KPIs from the distribution instead of
        # issuing two more count queries.
        threat_map = {r["threat_level"]: r["cnt"] for r in threat_distribution}
        critical_count = threat_map.get("CRITICAL", 0)
        high_count = threat_map.get("HIGH", 0)

        unique_ips = query_scalar(
            f"SELECT uniq(src_ip) FROM {_DB}.ml_detected_anomalies "
            "WHERE detected_at >= now() - INTERVAL 1 DAY"
        ) or 0

        top_ips = query(
            f"SELECT toString(src_ip) AS src_ip, count() AS cnt, "
            f"max(anomaly_score) AS worst_score, "
            f"any(threat_level) AS threat_level, "
            f"any(asn_org) AS asn_org, any(country_code) AS country_code "
            f"FROM {_DB}.ml_detected_anomalies "
            "WHERE detected_at >= now() - INTERVAL 1 DAY "
            "GROUP BY src_ip ORDER BY cnt DESC LIMIT 10"
        )

        timeline = query(
            f"SELECT toStartOfHour(detected_at) AS hour, count() AS cnt "
            f"FROM {_DB}.ml_detected_anomalies "
            "WHERE detected_at >= now() - INTERVAL 1 DAY "
            "GROUP BY hour ORDER BY hour"
        )

        traffic_24h = query_scalar(
            f"SELECT count() FROM {_DB_LOGS}.http_logs "
            "WHERE time >= now() - INTERVAL 1 DAY"
        ) or 0

        # Named model_stats so the local does not read like the `models`
        # endpoint; the response key is still "models".
        model_stats = query(
            f"SELECT model_name, count() AS scored "
            f"FROM {_DB}.ml_all_scores "
            "WHERE detected_at >= now() - INTERVAL 1 DAY "
            "GROUP BY model_name"
        )

        browser_stats = query(
            f"SELECT browser_family, count() AS cnt "
            f"FROM {_DB}.ml_all_scores "
            "WHERE detected_at >= now() - INTERVAL 1 DAY "
            "AND browser_family != '' "
            "GROUP BY browser_family ORDER BY cnt DESC"
        )

        legitimate_browsers = query_scalar(
            f"SELECT count() FROM {_DB}.ml_all_scores "
            "WHERE detected_at >= now() - INTERVAL 1 DAY "
            "AND threat_level = 'LEGITIMATE_BROWSER'"
        ) or 0

        return {
            "detections_24h": detections_24h,
            "scored_24h": scored_24h,
            "traffic_24h": traffic_24h,
            "unique_ips": unique_ips,
            "critical_count": critical_count,
            "high_count": high_count,
            "legitimate_browsers": legitimate_browsers,
            "browser_stats": browser_stats,
            "threat_distribution": threat_distribution,
            "top_ips": top_ips,
            # Hour buckets are stringified before being returned.
            "timeline": [{"hour": str(r["hour"]), "cnt": r["cnt"]} for r in timeline],
            "models": model_stats,
        }
    except Exception as exc:
        logger.exception("overview query failed")
        # NOTE(review): detail=str(exc) forwards the raw backend error text to
        # the client; kept for compatibility with existing consumers.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# GET /api/detections
|
||
# ---------------------------------------------------------------------------
|
||
@router.get("/detections")
async def detections(
    page: int = Query(1, ge=1),
    per_page: int = Query(50, ge=1, le=500),
    sort: str = Query("detected_at"),
    order: str = Query("DESC"),
    threat_level: str | None = Query(None),
    search: str | None = Query(None),
    asn_org: str | None = Query(None),
    country_code: str | None = Query(None),
    ja4: str | None = Query(None),
    bot_name: str | None = Query(None),
    browser_family: str | None = Query(None),
) -> dict[str, Any]:
    """Return one page of detected anomalies from the last 30 days.

    Args:
        page: 1-based page number.
        per_page: Page size (1-500).
        sort: Sort column; values outside ``_DETECTION_SORT_COLS`` fall back
            to ``detected_at``.
        order: ``ASC`` or ``DESC`` (case-insensitive); anything else becomes
            ``DESC``.
        threat_level, asn_org, country_code, ja4, bot_name, browser_family:
            Optional exact-match filters; empty values are ignored.
        search: Optional substring matched against the source IP and host.

    Returns:
        ``{"data", "total", "page", "per_page", "pages"}``; ``pages`` is at
        least 1.

    Raises:
        HTTPException: 500 when a query fails; the underlying error is
            chained as ``__cause__``.
    """
    # sort/order are interpolated into the SQL text, so both are forced onto
    # whitelisted values first. All filter values are bound as parameters.
    sort = _validate_sort(sort, _DETECTION_SORT_COLS, "detected_at")
    order = _validate_order(order)
    offset = (page - 1) * per_page

    where_clauses = ["detected_at >= now() - INTERVAL 30 DAY"]
    params: dict[str, Any] = {}

    if threat_level:
        where_clauses.append("threat_level = {tl:String}")
        params["tl"] = threat_level

    if search:
        where_clauses.append(
            "(toString(src_ip) LIKE {search:String} OR host LIKE {search:String})"
        )
        params["search"] = f"%{search}%"

    # Remaining exact-match filters: (value, column, placeholder name).
    for value, column, key in (
        (asn_org, "asn_org", "asn_org"),
        (country_code, "country_code", "cc"),
        (ja4, "ja4", "ja4"),
        (bot_name, "bot_name", "bn"),
        (browser_family, "browser_family", "bf"),
    ):
        if value:
            where_clauses.append(f"{column} = {{{key}:String}}")
            params[key] = value

    where = " AND ".join(where_clauses)

    try:
        total = query_scalar(
            f"SELECT count() FROM {_DB}.ml_detected_anomalies WHERE {where}",
            params,
        ) or 0

        rows = query(
            f"SELECT *, toString(src_ip) AS src_ip_str "
            f"FROM {_DB}.ml_detected_anomalies "
            f"WHERE {where} ORDER BY {sort} {order} "
            f"LIMIT {{lim:UInt32}} OFFSET {{off:UInt32}}",
            {**params, "lim": per_page, "off": offset},
        )

        return {
            "data": rows,
            "total": total,
            "page": page,
            "per_page": per_page,
            # Ceiling division; an empty result still reports one page.
            "pages": max(1, -(-total // per_page)),
        }
    except Exception as exc:
        logger.exception("detections query failed")
        raise HTTPException(status_code=500, detail=str(exc)) from exc
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# GET /api/scores
|
||
# ---------------------------------------------------------------------------
|
||
@router.get("/scores")
async def scores(
    page: int = Query(1, ge=1),
    per_page: int = Query(50, ge=1, le=500),
    sort: str = Query("detected_at"),
    order: str = Query("DESC"),
    threat_level: str | None = Query(None),
    search: str | None = Query(None),
    asn_org: str | None = Query(None),
    country_code: str | None = Query(None),
    ja4: str | None = Query(None),
    browser_family: str | None = Query(None),
) -> dict[str, Any]:
    """Return one page of rows from ``ml_all_scores`` for the last 3 days.

    Args:
        page: 1-based page number.
        per_page: Page size (1-500).
        sort: Sort column; values outside ``_SCORE_SORT_COLS`` fall back to
            ``detected_at``.
        order: ``ASC`` or ``DESC`` (case-insensitive); anything else becomes
            ``DESC``.
        threat_level, asn_org, country_code, ja4, browser_family: Optional
            exact-match filters; empty values are ignored.
        search: Optional substring matched against the source IP and host.

    Returns:
        ``{"data", "total", "page", "per_page", "pages"}``; ``pages`` is at
        least 1.

    Raises:
        HTTPException: 500 when a query fails; the underlying error is
            chained as ``__cause__``.
    """
    # sort/order end up in the SQL text, so both are whitelisted first.
    sort = _validate_sort(sort, _SCORE_SORT_COLS, "detected_at")
    order = _validate_order(order)
    offset = (page - 1) * per_page

    where_clauses = ["detected_at >= now() - INTERVAL 3 DAY"]
    params: dict[str, Any] = {}

    if threat_level:
        where_clauses.append("threat_level = {tl:String}")
        params["tl"] = threat_level

    if search:
        where_clauses.append(
            "(toString(src_ip) LIKE {search:String} OR host LIKE {search:String})"
        )
        params["search"] = f"%{search}%"

    # Remaining exact-match filters: (value, column, placeholder name).
    for value, column, key in (
        (asn_org, "asn_org", "asn_org"),
        (country_code, "country_code", "cc"),
        (ja4, "ja4", "ja4"),
        (browser_family, "browser_family", "bf"),
    ):
        if value:
            where_clauses.append(f"{column} = {{{key}:String}}")
            params[key] = value

    where = " AND ".join(where_clauses)

    try:
        total = query_scalar(
            f"SELECT count() FROM {_DB}.ml_all_scores WHERE {where}",
            params,
        ) or 0

        rows = query(
            f"SELECT *, toString(src_ip) AS src_ip_str "
            f"FROM {_DB}.ml_all_scores "
            f"WHERE {where} ORDER BY {sort} {order} "
            f"LIMIT {{lim:UInt32}} OFFSET {{off:UInt32}}",
            {**params, "lim": per_page, "off": offset},
        )

        return {
            "data": rows,
            "total": total,
            "page": page,
            "per_page": per_page,
            # Ceiling division; an empty result still reports one page.
            "pages": max(1, -(-total // per_page)),
        }
    except Exception as exc:
        logger.exception("scores query failed")
        raise HTTPException(status_code=500, detail=str(exc)) from exc
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# GET /api/traffic
|
||
# ---------------------------------------------------------------------------
|
||
@router.get("/traffic")
async def traffic(
    page: int = Query(1, ge=1),
    per_page: int = Query(50, ge=1, le=500),
    sort: str = Query("time"),
    order: str = Query("DESC"),
    method: str | None = Query(None),
    host: str | None = Query(None),
    http_version: str | None = Query(None),
) -> dict[str, Any]:
    """Return one page of raw HTTP log rows from the last day.

    Args:
        page: 1-based page number.
        per_page: Page size (1-500).
        sort: Sort column; values outside ``_TRAFFIC_SORT_COLS`` fall back to
            ``time``.
        order: ``ASC`` or ``DESC`` (case-insensitive); anything else becomes
            ``DESC``.
        method: Optional exact-match filter on the HTTP method.
        host: Optional substring filter on the host.
        http_version: Optional exact-match filter. Unlike the other filters
            it is applied whenever it is not ``None``, so an empty string
            matches rows whose version is empty.

    Returns:
        ``{"data", "total", "page", "per_page", "pages"}``; ``pages`` is at
        least 1.

    Raises:
        HTTPException: 500 when a query fails; the underlying error is
            chained as ``__cause__``.
    """
    # sort/order end up in the SQL text, so both are whitelisted first.
    sort = _validate_sort(sort, _TRAFFIC_SORT_COLS, "time")
    order = _validate_order(order)
    offset = (page - 1) * per_page

    where_clauses = ["time >= now() - INTERVAL 1 DAY"]
    params: dict[str, Any] = {}

    if method:
        where_clauses.append("method = {method:String}")
        params["method"] = method

    if host:
        where_clauses.append("host LIKE {host:String}")
        params["host"] = f"%{host}%"

    if http_version is not None:
        where_clauses.append("http_version = {http_version:String}")
        params["http_version"] = http_version

    where = " AND ".join(where_clauses)

    try:
        total = query_scalar(
            f"SELECT count() FROM {_DB_LOGS}.http_logs WHERE {where}",
            params,
        ) or 0

        rows = query(
            f"SELECT time, toString(src_ip) AS src_ip, method, host, path, "
            f"http_version, header_user_agent, ja4, src_country_code "
            f"FROM {_DB_LOGS}.http_logs "
            f"WHERE {where} ORDER BY {sort} {order} "
            f"LIMIT {{lim:UInt32}} OFFSET {{off:UInt32}}",
            {**params, "lim": per_page, "off": offset},
        )

        return {
            "data": rows,
            "total": total,
            "page": page,
            "per_page": per_page,
            # Ceiling division; an empty result still reports one page.
            "pages": max(1, -(-total // per_page)),
        }
    except Exception as exc:
        logger.exception("traffic query failed")
        raise HTTPException(status_code=500, detail=str(exc)) from exc
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# GET /api/ip/{ip}
|
||
# ---------------------------------------------------------------------------
|
||
@router.get("/ip/{ip}")
async def ip_detail(ip: str) -> dict[str, Any]:
    """Return everything the dashboard shows about a single source IP.

    Args:
        ip: Address taken from the URL path. A leading ``::ffff:`` is removed
            when the rest is a dotted IPv4 address.

    Returns:
        A dict with ``ip`` (as received), ``detections`` (last 30 days),
        ``scores`` (last 3 days), ``http_logs`` (last day, at most 100 rows),
        ``ai_features`` and ``recurrence``. The last two are empty lists when
        their views cannot be queried.

    Raises:
        HTTPException: 500 when one of the three main queries fails; the
            underlying error is chained as ``__cause__``.
    """
    # Strip the IPv4-mapped prefix only when it really is a prefix of a
    # dotted-quad address. A blanket str.replace would also rewrite genuine
    # IPv6 addresses containing "::ffff:" (e.g. "2001:db8::ffff:1").
    clean_ip = ip.removeprefix("::ffff:") if "." in ip else ip
    params = {"ip": clean_ip}
    try:
        detection_rows = query(
            f"SELECT *, toString(src_ip) AS src_ip_str "
            f"FROM {_DB}.ml_detected_anomalies "
            "WHERE src_ip = toIPv6({ip:String}) "
            "AND detected_at >= now() - INTERVAL 30 DAY "
            "ORDER BY detected_at DESC",
            params,
        )

        all_scores = query(
            f"SELECT *, toString(src_ip) AS src_ip_str "
            f"FROM {_DB}.ml_all_scores "
            "WHERE src_ip = toIPv6({ip:String}) "
            "AND detected_at >= now() - INTERVAL 3 DAY "
            "ORDER BY detected_at DESC",
            params,
        )

        # The raw log table is matched with toIPv4OrZero, unlike the ML
        # tables above, which are matched with toIPv6.
        http_logs = query(
            f"SELECT time, method, host, path, http_version, header_user_agent, ja4 "
            f"FROM {_DB_LOGS}.http_logs "
            "WHERE src_ip = toIPv4OrZero({ip:String}) "
            "AND time >= now() - INTERVAL 1 DAY "
            "ORDER BY time DESC LIMIT 100",
            params,
        )

        # The two view lookups are best-effort: a failure is logged at debug
        # level and leaves the corresponding list empty.
        ai_features: list[dict] = []
        try:
            ai_features = query(
                f"SELECT * FROM {_DB}.view_ai_features_1h "
                "WHERE src_ip = toIPv6({ip:String}) LIMIT 1",
                params,
            )
        except Exception:
            logger.debug("view_ai_features_1h unavailable for %s", ip)

        recurrence_rows: list[dict] = []
        try:
            recurrence_rows = query(
                f"SELECT * FROM {_DB}.view_ip_recurrence "
                "WHERE src_ip = toIPv6({ip:String})",
                params,
            )
        except Exception:
            logger.debug("view_ip_recurrence unavailable for %s", ip)

        return {
            "ip": ip,
            "detections": detection_rows,
            "scores": all_scores,
            "http_logs": http_logs,
            "ai_features": ai_features,
            "recurrence": recurrence_rows,
        }
    except Exception as exc:
        logger.exception("ip detail query failed for %s", ip)
        raise HTTPException(status_code=500, detail=str(exc)) from exc
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# GET /api/features
|
||
# ---------------------------------------------------------------------------
|
||
@router.get("/features")
async def features() -> dict[str, Any]:
    """Return aggregate feature statistics for the features page.

    Every section is queried independently and is best-effort: when a query
    fails, the failure is logged at debug level and that section keeps its
    empty default, so this endpoint never raises for a backend error.

    Returns:
        A dict with ``ai_features`` and ``thesis_features`` (row count plus
        per-feature averages of each view), ``human_profile`` and
        ``bot_profile`` (averages for ``asn_label = 'isp'`` and for
        ``asn_label IN ('datacenter', 'hosting')``), and
        ``feature_importance`` (population variance per feature, largest
        first).
    """
    result: dict[str, Any] = {
        "ai_features": {}, "thesis_features": {},
        "human_profile": {}, "bot_profile": {},
        "feature_importance": [],
    }
    # Shared SELECT list of per-feature averages.
    feat_cols = (
        "avg(hits) AS avg_hits, avg(hit_velocity) AS avg_velocity, "
        "avg(fuzzing_index) AS avg_fuzz, avg(post_ratio) AS avg_post, "
        "avg(asset_ratio) AS avg_asset, avg(direct_access_ratio) AS avg_direct, "
        "avg(temporal_entropy) AS avg_entropy, avg(path_diversity_ratio) AS avg_path_div, "
        "avg(modern_browser_score) AS avg_browser, avg(header_count) AS avg_headers, "
        "avg(src_port_density) AS avg_port_density, avg(distinct_ja4_count) AS avg_ja4_count"
    )
    try:
        ai_stats = query(
            f"SELECT count() AS total, {feat_cols} FROM {_DB}.view_ai_features_1h"
        )
        if ai_stats:
            result["ai_features"] = ai_stats[0]
    except Exception:
        logger.debug("view_ai_features_1h not available")

    try:
        thesis_stats = query(
            f"SELECT count() AS total, {feat_cols} FROM {_DB}.view_thesis_features_1h"
        )
        if thesis_stats:
            result["thesis_features"] = thesis_stats[0]
    except Exception:
        logger.debug("view_thesis_features_1h not available")

    # ISP (residential) vs bot feature profiles for radar comparison
    try:
        human = query(
            f"SELECT {feat_cols} FROM {_DB}.view_ai_features_1h "
            "WHERE asn_label = 'isp'"
        )
        if human:
            result["human_profile"] = human[0]
    except Exception:
        # Still best-effort, but no longer silent.
        logger.debug("human profile query failed", exc_info=True)

    try:
        bot = query(
            f"SELECT {feat_cols} FROM {_DB}.view_ai_features_1h "
            "WHERE asn_label IN ('datacenter', 'hosting')"
        )
        if bot:
            result["bot_profile"] = bot[0]
    except Exception:
        logger.debug("bot profile query failed", exc_info=True)

    # Feature variance (importance proxy)
    try:
        variance_rows = query(
            f"SELECT "
            f"varPop(hit_velocity) AS v_velocity, "
            f"varPop(fuzzing_index) AS v_fuzz, "
            f"varPop(post_ratio) AS v_post, "
            f"varPop(asset_ratio) AS v_asset, "
            f"varPop(direct_access_ratio) AS v_direct, "
            f"varPop(temporal_entropy) AS v_entropy, "
            f"varPop(path_diversity_ratio) AS v_path_div, "
            f"varPop(src_port_density) AS v_port_density "
            f"FROM {_DB}.view_ai_features_1h"
        )
        if variance_rows:
            row = variance_rows[0]
            result["feature_importance"] = [
                # Drop only the leading "v_" alias marker, not every
                # occurrence of "v_" inside the name.
                {"name": k.removeprefix("v_"), "variance": v}
                # A missing variance sorts as 0.
                for k, v in sorted(row.items(), key=lambda x: -(x[1] or 0))
            ]
    except Exception:
        logger.debug("feature variance query failed", exc_info=True)

    return result
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# GET /api/geo — Geographic & ASN breakdown
|
||
# ---------------------------------------------------------------------------
|
||
@router.get("/geo")
async def geo() -> dict[str, Any]:
    """Break sessions in ``view_ai_features_1h`` down by country and by ASN.

    Returns:
        ``{"countries": [...], "asns": [...]}``. ``asns`` holds at most 50
        rows. Both lists are empty when a query fails; the failure is logged
        and not raised.
    """
    by_country_sql = (
        f"SELECT country_code, asn_label, "
        f"count() AS sessions, sum(hits) AS total_hits "
        f"FROM {_DB}.view_ai_features_1h "
        "WHERE country_code != '' "
        "GROUP BY country_code, asn_label ORDER BY sessions DESC"
    )
    by_asn_sql = (
        f"SELECT asn_org, asn_label, country_code, "
        f"count() AS sessions, sum(hits) AS total_hits, "
        f"avg(hit_velocity) AS avg_velocity, avg(fuzzing_index) AS avg_fuzz "
        f"FROM {_DB}.view_ai_features_1h "
        "WHERE asn_org != '' "
        "GROUP BY asn_org, asn_label, country_code ORDER BY sessions DESC LIMIT 50"
    )
    try:
        # Dict values are evaluated left to right: countries, then ASNs.
        return {"countries": query(by_country_sql), "asns": query(by_asn_sql)}
    except Exception:
        logger.exception("geo query failed")
        return {"countries": [], "asns": []}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# GET /api/fingerprints — JA4 fingerprint analysis
|
||
# ---------------------------------------------------------------------------
|
||
@router.get("/fingerprints")
async def fingerprints() -> dict[str, Any]:
    """Summarise JA4 fingerprints seen in ``view_ai_features_1h``.

    Returns:
        ``{"ja4_stats": [...], "bot_ja4": [...]}``. ``ja4_stats`` holds the
        100 largest (ja4, asn_label) groups by session count; ``bot_ja4``
        holds session counts per (ja4, bot_name) for rows with a bot name.
        Both lists are empty when a query fails; the failure is logged and
        not raised.
    """
    stats_sql = (
        f"SELECT ja4, asn_label, "
        f"count() AS sessions, sum(hits) AS total_hits, "
        f"avg(hit_velocity) AS avg_velocity, "
        f"avg(fuzzing_index) AS avg_fuzz, "
        f"avg(modern_browser_score) AS avg_browser_score "
        f"FROM {_DB}.view_ai_features_1h "
        "WHERE ja4 != '' "
        "GROUP BY ja4, asn_label ORDER BY sessions DESC LIMIT 100"
    )
    bot_sql = (
        f"SELECT ja4, bot_name, count() AS sessions "
        f"FROM {_DB}.view_ai_features_1h "
        "WHERE bot_name != '' AND ja4 != '' "
        "GROUP BY ja4, bot_name ORDER BY sessions DESC"
    )
    try:
        return {"ja4_stats": query(stats_sql), "bot_ja4": query(bot_sql)}
    except Exception:
        logger.exception("fingerprints query failed")
        return {"ja4_stats": [], "bot_ja4": []}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# GET /api/browsers — Browser family distribution from JA4 fingerprints
|
||
# ---------------------------------------------------------------------------
|
||
@router.get("/browsers")
async def browsers() -> dict[str, Any]:
    """Report the browser-family distribution found in ``view_ai_features_1h``.

    Returns:
        A dict with ``distribution`` (sessions, unique IPs and hits per
        browser family), ``unknown_sessions`` (rows with neither a browser
        family nor a bot name) and ``top_ja4_by_browser`` (at most 50
        (browser_family, ja4) groups). On failure the error is logged and
        empty values are returned.
    """
    distribution_sql = (
        f"SELECT browser_family, count() AS sessions, "
        f"uniqExact(src_ip) AS unique_ips, sum(hits) AS total_hits "
        f"FROM {_DB}.view_ai_features_1h "
        "WHERE browser_family != '' "
        "GROUP BY browser_family ORDER BY sessions DESC"
    )
    # Sessions matching no browser family and no known bot.
    unknown_sql = (
        f"SELECT count() FROM {_DB}.view_ai_features_1h "
        "WHERE browser_family = '' AND bot_name = ''"
    )
    # (browser_family, ja4) groups, ordered by family then session count.
    top_ja4_sql = (
        f"SELECT browser_family, ja4, count() AS sessions "
        f"FROM {_DB}.view_ai_features_1h "
        "WHERE browser_family != '' "
        "GROUP BY browser_family, ja4 ORDER BY browser_family, sessions DESC "
        "LIMIT 50"
    )
    try:
        per_family = query(distribution_sql)
        unmatched = query_scalar(unknown_sql)
        ja4_by_family = query(top_ja4_sql)
    except Exception:
        logger.exception("browsers query failed")
        return {"distribution": [], "unknown_sessions": 0, "top_ja4_by_browser": []}
    return {
        "distribution": per_family,
        "unknown_sessions": unmatched or 0,
        "top_ja4_by_browser": ja4_by_family,
    }
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# GET /api/behavior — Feature scatter + distributions
|
||
# ---------------------------------------------------------------------------
|
||
# Feature columns returned for every point of the behaviour scatter plot.
_BEHAVIOR_FEATURES = [
    "hit_velocity",
    "fuzzing_index",
    "post_ratio",
    "asset_ratio",
    "direct_access_ratio",
    "temporal_entropy",
    "path_diversity_ratio",
    "modern_browser_score",
    "header_count",
    "is_ua_rotating",
    "distinct_ja4_count",
    "src_port_density",
]


@router.get("/behavior")
async def behavior() -> dict[str, Any]:
    """Return scatter points and per-feature histograms for the behaviour page.

    Returns:
        ``{"scatter": [...], "distributions": {...}}``. ``scatter`` holds the
        500 rows of ``view_ai_features_1h`` with the most hits;
        ``distributions`` maps six feature names to (bucket, cnt) rows, with
        values rounded to two decimals. Both are empty when a query fails;
        the failure is logged and not raised.
    """
    feature_list = ", ".join(_BEHAVIOR_FEATURES)
    scatter_sql = (
        f"SELECT toString(src_ip) AS ip, asn_label, bot_name, hits, {feature_list} "
        f"FROM {_DB}.view_ai_features_1h "
        "ORDER BY hits DESC LIMIT 500"
    )
    # Features for which a histogram is built, one query each.
    histogram_features = (
        "hit_velocity", "fuzzing_index", "post_ratio",
        "asset_ratio", "temporal_entropy", "path_diversity_ratio",
    )
    try:
        points = query(scatter_sql)
        histograms: dict[str, list] = {
            feat: query(
                f"SELECT round({feat}, 2) AS bucket, count() AS cnt "
                f"FROM {_DB}.view_ai_features_1h "
                f"GROUP BY bucket ORDER BY bucket"
            )
            for feat in histogram_features
        }
        return {"scatter": points, "distributions": histograms}
    except Exception:
        logger.exception("behavior query failed")
        return {"scatter": [], "distributions": {}}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# GET /api/heatmap — Temporal heatmap (hour × day)
|
||
# ---------------------------------------------------------------------------
|
||
@router.get("/heatmap")
async def heatmap() -> dict[str, Any]:
    """Count HTTP log rows of the last 7 days per (day of week, hour) cell.

    Returns:
        ``{"cells": [...]}`` where each row has ``dow``
        (``toDayOfWeek(time) - 1``), ``hour`` and ``cnt``. The list is empty
        when the query fails; the failure is logged and not raised.
    """
    cells_sql = (
        f"SELECT toDayOfWeek(time) - 1 AS dow, toHour(time) AS hour, count() AS cnt "
        f"FROM {_DB_LOGS}.http_logs "
        "WHERE time >= now() - INTERVAL 7 DAY "
        "GROUP BY dow, hour ORDER BY dow, hour"
    )
    try:
        return {"cells": query(cells_sql)}
    except Exception:
        logger.exception("heatmap query failed")
        return {"cells": []}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# GET /api/ip/{ip}/radar — Radar comparison vs ISP baseline
|
||
# ---------------------------------------------------------------------------
|
||
# Features plotted on the per-IP radar chart, in axis order.
_RADAR_FEATURES = [
    "hit_velocity", "fuzzing_index", "post_ratio", "asset_ratio",
    "direct_access_ratio", "temporal_entropy", "path_diversity_ratio",
    "modern_browser_score",
]


@router.get("/ip/{ip}/radar")
async def ip_radar(ip: str) -> dict[str, Any]:
    """Compare one IP's features with the ISP and datacenter/hosting averages.

    Args:
        ip: Address taken from the URL path. A leading ``::ffff:`` is removed
            when the rest is a dotted IPv4 address.

    Returns:
        A dict with ``features`` (the axis names), ``ip_values`` (first
        matching row of ``view_ai_features_1h``), ``human_baseline``
        (averages for ``asn_label = 'isp'``) and ``bot_baseline`` (averages
        for ``asn_label IN ('datacenter', 'hosting')``). The three value
        dicts are empty when nothing matches or a query fails; failures are
        logged and not raised.
    """
    # Strip the IPv4-mapped prefix only when it really is a prefix of a
    # dotted-quad address. A blanket str.replace would also rewrite genuine
    # IPv6 addresses containing "::ffff:" (e.g. "2001:db8::ffff:1").
    clean_ip = ip.removeprefix("::ffff:") if "." in ip else ip
    cols_avg = ", ".join(f"avg({f}) AS {f}" for f in _RADAR_FEATURES)
    try:
        ip_data = query(
            f"SELECT {', '.join(_RADAR_FEATURES)} "
            f"FROM {_DB}.view_ai_features_1h "
            "WHERE src_ip = toIPv6({ip:String}) LIMIT 1",
            {"ip": clean_ip},
        )
        baseline = query(
            f"SELECT {cols_avg} "
            f"FROM {_DB}.view_ai_features_1h "
            "WHERE asn_label = 'isp'"
        )
        bot_avg = query(
            f"SELECT {cols_avg} "
            f"FROM {_DB}.view_ai_features_1h "
            "WHERE asn_label IN ('datacenter', 'hosting')"
        )
        return {
            "features": _RADAR_FEATURES,
            "ip_values": ip_data[0] if ip_data else {},
            "human_baseline": baseline[0] if baseline else {},
            "bot_baseline": bot_avg[0] if bot_avg else {},
        }
    except Exception:
        logger.exception("ip radar query failed for %s", ip)
        return {"features": _RADAR_FEATURES, "ip_values": {},
                "human_baseline": {}, "bot_baseline": {}}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# GET /api/models
|
||
# ---------------------------------------------------------------------------
|
||
# Directory scanned for per-model metadata files (*.json).
_MODEL_DIR = Path("/data/models")


@router.get("/models")
async def models() -> dict[str, Any]:
    """Return model metadata from disk plus recent scoring statistics.

    Returns:
        ``{"models": [...], "scoring_stats": [...]}``. ``models`` holds the
        parsed content of every readable ``*.json`` file in ``_MODEL_DIR``,
        in sorted path order; unreadable files are skipped with a warning.
        ``scoring_stats`` holds per-model counts and first/last detection
        times over the last 7 days, or an empty list when that query fails.
    """
    model_info: list[dict[str, Any]] = []

    if _MODEL_DIR.is_dir():
        for p in sorted(_MODEL_DIR.glob("*.json")):
            try:
                # Decode explicitly as UTF-8 rather than the locale encoding,
                # which may differ between hosts.
                data = json.loads(p.read_text(encoding="utf-8"))
                model_info.append(data)
            except Exception:
                # exc_info keeps the reason (I/O error, bad JSON, ...) in
                # the log.
                logger.warning("Could not read model metadata %s", p, exc_info=True)

    # Also fetch latest scoring stats from ClickHouse
    scoring_stats: list[dict] = []
    try:
        scoring_stats = query(
            f"SELECT model_name, count() AS scored, "
            f"min(detected_at) AS first_seen, max(detected_at) AS last_seen "
            f"FROM {_DB}.ml_all_scores "
            "WHERE detected_at >= now() - INTERVAL 7 DAY "
            "GROUP BY model_name"
        )
    except Exception:
        logger.debug("could not fetch model scoring stats")

    return {"models": model_info, "scoring_stats": scoring_stats}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# POST /api/classify — SOC analyst feedback
|
||
# ---------------------------------------------------------------------------
|
||
# Request body of POST /api/classify: one analyst verdict about one source IP.
# Documented with comments rather than a class docstring so the generated
# schema is not altered.
class ClassifyRequest(BaseModel):
    # Address the verdict applies to; the handler hands it to toIPv6().
    src_ip: str
    # Verdict label; the handler rejects values outside _VALID_CLASSIFICATIONS.
    classification: str  # bot | legitimate | suspicious
    # Optional free-text note; defaults to an empty string.
    comment: str = ""
|
||
|
||
|
||
# Verdict labels accepted by POST /api/classify.
_VALID_CLASSIFICATIONS = {"bot", "legitimate", "suspicious"}
# Set to True after the first successful CREATE TABLE in this process.
_feedback_table_ensured = False


def _ensure_feedback_table() -> None:
    """Create the ``soc_feedback`` table on first use.

    The DDL uses ``IF NOT EXISTS`` and is executed at most once per process
    after it has succeeded; if ``execute`` raises, the flag stays unset and
    the next call tries again.
    """
    global _feedback_table_ensured
    if not _feedback_table_ensured:
        ddl = (
            f"CREATE TABLE IF NOT EXISTS {_DB}.soc_feedback ("
            " created_at DateTime DEFAULT now(),"
            " src_ip IPv6,"
            " classification LowCardinality(String),"
            " comment String"
            ") ENGINE = MergeTree() ORDER BY (src_ip, created_at)"
        )
        execute(ddl)
        _feedback_table_ensured = True
|
||
|
||
|
||
@router.post("/classify")
async def classify(body: ClassifyRequest) -> dict[str, Any]:
    """Store one analyst verdict for a source IP in ``soc_feedback``.

    Args:
        body: The IP, the verdict label and an optional comment.

    Returns:
        ``{"status": "ok", "src_ip": ..., "classification": ...}`` echoing
        the stored values.

    Raises:
        HTTPException: 422 when the label is not one of
            ``_VALID_CLASSIFICATIONS``; 500 when creating the table or
            inserting the row fails (the underlying error is chained as
            ``__cause__``).
    """
    if body.classification not in _VALID_CLASSIFICATIONS:
        # Sort the labels so the message is identical across processes; a
        # raw set repr has no stable order.
        allowed = ", ".join(sorted(_VALID_CLASSIFICATIONS))
        raise HTTPException(
            status_code=422,
            detail=f"classification must be one of {allowed}",
        )
    try:
        _ensure_feedback_table()
        execute(
            f"INSERT INTO {_DB}.soc_feedback (src_ip, classification, comment) VALUES "
            "(toIPv6({ip:String}), {cls:String}, {cmt:String})",
            {"ip": body.src_ip, "cls": body.classification, "cmt": body.comment},
        )
        return {"status": "ok", "src_ip": body.src_ip, "classification": body.classification}
    except Exception as exc:
        logger.exception("classify insert failed")
        raise HTTPException(status_code=500, detail=str(exc)) from exc
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# GET /api/classifications — recent SOC feedback
|
||
# ---------------------------------------------------------------------------
|
||
@router.get("/classifications")
async def classifications() -> dict[str, Any]:
    """Return the 50 most recent analyst verdicts from ``soc_feedback``.

    The table is created first if needed. Returns ``{"data": [...]}``; the
    list is empty when anything fails, and the failure is logged rather than
    raised.
    """
    try:
        _ensure_feedback_table()
        feedback = query(
            f"SELECT created_at, toString(src_ip) AS src_ip, classification, comment "
            f"FROM {_DB}.soc_feedback "
            "ORDER BY created_at DESC LIMIT 50"
        )
    except Exception:
        logger.exception("classifications query failed")
        return {"data": []}
    return {"data": feedback}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# GET /api/campaigns — HDBSCAN bot campaign clusters
|
||
# ---------------------------------------------------------------------------
|
||
@router.get("/campaigns")
async def campaigns() -> dict[str, Any]:
    """Bot campaigns detected by HDBSCAN clustering.

    Aggregates the last 7 days of ``ml_detected_anomalies`` per
    ``campaign_id`` (ignoring ``''`` and ``'0'``) and returns the 50 largest
    campaigns as ``{"campaigns": [...]}``. The list is empty when the query
    fails; the failure is logged and not raised.
    """
    campaign_sql = (
        f"SELECT campaign_id, "
        f"count() AS members, "
        f"min(detected_at) AS first_seen, max(detected_at) AS last_seen, "
        f"avg(anomaly_score) AS avg_score, "
        f"max(anomaly_score) AS max_score, "
        f"uniqExact(src_ip) AS unique_ips, "
        f"groupUniqArray(10)(ja4) AS ja4_list, "
        f"groupUniqArray(5)(asn_org) AS asn_list, "
        f"groupUniqArray(5)(country_code) AS countries "
        f"FROM {_DB}.ml_detected_anomalies "
        "WHERE campaign_id != '' AND campaign_id != '0' "
        "AND detected_at >= now() - INTERVAL 7 DAY "
        "GROUP BY campaign_id "
        "ORDER BY members DESC LIMIT 50"
    )
    try:
        return {"campaigns": query(campaign_sql)}
    except Exception:
        logger.exception("campaigns query failed")
        return {"campaigns": []}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# GET /api/brute-force — Form brute-force detection
|
||
# ---------------------------------------------------------------------------
|
||
@router.get("/brute-force")
async def brute_force() -> dict[str, Any]:
    """Brute-force / credential-stuffing detection via view_form_bruteforce_detected.

    Returns ``{"data": [...]}`` with the 100 rows that have the highest
    ``post_count``. The list is empty when the query fails; the failure is
    logged and not raised.
    """
    brute_sql = (
        f"SELECT toString(src_ip) AS src_ip, host, "
        f"post_count, distinct_paths, first_seen, last_seen "
        f"FROM {_DB}.view_form_bruteforce_detected "
        "ORDER BY post_count DESC LIMIT 100"
    )
    try:
        return {"data": query(brute_sql)}
    except Exception:
        logger.exception("brute-force query failed")
        return {"data": []}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# GET /api/ja4-rotation — JA4 fingerprint rotation detection
|
||
# ---------------------------------------------------------------------------
|
||
@router.get("/ja4-rotation")
async def ja4_rotation() -> dict[str, Any]:
    """IPs showing JA4 fingerprint rotation (potential evasion).

    Returns ``{"data": [...]}`` with the 100 rows of
    ``view_host_ip_ja4_rotation`` that have the highest ``distinct_ja4``.
    The list is empty when the query fails; the failure is logged and not
    raised.
    """
    rotation_sql = (
        f"SELECT toString(src_ip) AS src_ip, host, "
        f"distinct_ja4, ja4_list, total_hits, window_start "
        f"FROM {_DB}.view_host_ip_ja4_rotation "
        "ORDER BY distinct_ja4 DESC LIMIT 100"
    )
    try:
        return {"data": query(rotation_sql)}
    except Exception:
        logger.exception("ja4-rotation query failed")
        return {"data": []}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# GET /api/recurrence — Persistent threat IPs
|
||
# ---------------------------------------------------------------------------
|
||
@router.get("/recurrence")
async def recurrence() -> dict[str, Any]:
    """Recurring IPs detected across several time windows.

    Returns ``{"data": [...]}`` with the 100 rows of ``view_ip_recurrence``
    ranked by ``recurrence`` and then ``worst_score``, both descending. The
    list is empty when the query fails; the failure is logged and not raised.
    """
    recurrence_sql = (
        f"SELECT toString(src_ip) AS src_ip, "
        f"recurrence, worst_score, worst_threat, "
        f"first_seen, last_seen, "
        f"top_ja4, top_host "
        f"FROM {_DB}.view_ip_recurrence "
        "ORDER BY recurrence DESC, worst_score DESC LIMIT 100"
    )
    try:
        return {"data": query(recurrence_sql)}
    except Exception:
        logger.exception("recurrence query failed")
        return {"data": []}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# GET /api/cascade/{ip} — Resource cascade for headless detection
|
||
# ---------------------------------------------------------------------------
|
||
@router.get("/cascade/{ip}")
async def cascade(ip: str) -> dict[str, Any]:
    """Resource cascade (headless-browser detection) for one IP.

    Args:
        ip: Address taken from the URL path. A leading ``::ffff:`` is removed
            when the rest is a dotted IPv4 address.

    Returns:
        ``{"data": [...]}`` with up to 50 rows of
        ``view_resource_cascade_1h`` for that IP, newest window first. The
        list is empty when the query fails; the failure is logged and not
        raised.
    """
    # Strip the IPv4-mapped prefix only when it really is a prefix of a
    # dotted-quad address. A blanket str.replace would also rewrite genuine
    # IPv6 addresses containing "::ffff:" (e.g. "2001:db8::ffff:1").
    clean_ip = ip.removeprefix("::ffff:") if "." in ip else ip
    try:
        rows = query(
            f"SELECT toString(src_ip) AS src_ip, host, "
            f"page_count, avg_sub_delay_ms, stddev_sub_delay_ms, "
            f"max_sub_resources, window_start "
            f"FROM {_DB}.view_resource_cascade_1h "
            "WHERE src_ip = toIPv6({ip:String}) "
            "ORDER BY window_start DESC LIMIT 50",
            {"ip": clean_ip},
        )
        return {"data": rows}
    except Exception:
        logger.exception("cascade query failed for %s", ip)
        return {"data": []}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# GET /api/alerts — Live alert feed (recent HIGH/CRITICAL)
|
||
# ---------------------------------------------------------------------------
|
||
@router.get("/alerts")
async def alerts(
    limit: int = Query(20, ge=1, le=100),
) -> dict[str, Any]:
    """Live alert feed: the most recent detections of the last day.

    NOTE(review): this endpoint has been described as a CRITICAL / HIGH /
    KNOWN_BOT feed, but the query applies no ``threat_level`` filter, so
    every detection is returned -- confirm which behaviour is intended.

    Args:
        limit: Maximum number of rows to return (1-100).

    Returns:
        ``{"alerts": [...]}``, newest first. The list is empty when the
        query fails; the failure is logged and not raised.
    """
    alert_sql = (
        f"SELECT detected_at, toString(src_ip) AS src_ip, "
        f"anomaly_score, threat_level, ja4, host, "
        f"asn_org, country_code, bot_name, campaign_id, "
        f"hits, hit_velocity, reason "
        f"FROM {_DB}.ml_detected_anomalies "
        "WHERE detected_at >= now() - INTERVAL 1 DAY "
        "ORDER BY detected_at DESC "
        f"LIMIT {{lim:UInt32}}"
    )
    try:
        return {"alerts": query(alert_sql, {"lim": limit})}
    except Exception:
        logger.exception("alerts query failed")
        return {"alerts": []}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# GET /api/timeline-detail — Hourly threat-level breakdown
|
||
# ---------------------------------------------------------------------------
|
||
@router.get("/timeline-detail")
async def timeline_detail() -> dict[str, Any]:
    """Hourly timeline of the last day, broken down by threat level.

    Returns:
        ``{"timeline": [...]}`` with one dict per hour, in query order. Each
        dict has an ``hour`` key (stringified) plus one key per threat level
        seen in that hour, mapped to its count. The list is empty when the
        query fails; the failure is logged and not raised.
    """
    timeline_sql = (
        f"SELECT toStartOfHour(detected_at) AS hour, "
        f"threat_level, count() AS cnt "
        f"FROM {_DB}.ml_detected_anomalies "
        "WHERE detected_at >= now() - INTERVAL 1 DAY "
        "GROUP BY hour, threat_level "
        "ORDER BY hour"
    )
    try:
        # Pivot (hour, threat_level, cnt) rows into one dict per hour.
        by_hour: dict[str, dict] = {}
        for record in query(timeline_sql):
            label = str(record["hour"])
            bucket = by_hour.setdefault(label, {"hour": label})
            bucket[record["threat_level"]] = record["cnt"]
        return {"timeline": list(by_hour.values())}
    except Exception:
        logger.exception("timeline-detail query failed")
        return {"timeline": []}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# GET /api/ua-rotation — User-Agent rotation detection
|
||
# ---------------------------------------------------------------------------
|
||
@router.get("/ua-rotation")
async def ua_rotation() -> dict[str, Any]:
    """IPs showing User-Agent rotation (potential evasion).

    Returns ``{"data": [...]}`` with the 100 rows of
    ``view_dashboard_user_agents`` that have the highest
    ``distinct_ua_count``. The list is empty when the query fails; the
    failure is logged and not raised.
    """
    ua_sql = (
        f"SELECT toString(src_ip) AS src_ip, ja4, "
        f"distinct_ua_count, ua_samples, total_requests, window_start "
        f"FROM {_DB}.view_dashboard_user_agents "
        "ORDER BY distinct_ua_count DESC LIMIT 100"
    )
    try:
        return {"data": query(ua_sql)}
    except Exception:
        logger.exception("ua-rotation query failed")
        return {"data": []}
|