feat(dashboard): rebuild SOC dashboard + fix ClickHouse SQL

Complete rewrite of the SOC dashboard using FastAPI + Jinja2 + htmx + Chart.js + Tailwind CSS.
Replaces the old React/Vite frontend with server-rendered templates.

Dashboard pages:
- Overview: KPIs, timeline chart, threat distribution, top IPs
- Detections: paginated/filterable anomaly table
- Scores: ml_all_scores with AE error & XGB prob columns
- Traffic: HTTP logs with method/host filters
- IP Investigation: full deep-dive (scores, features, HTTP logs, classify)
- Classification: SOC feedback form + history
- Features: AI + thesis feature stats
- Models: scoring stats + model metadata

API: 9 JSON endpoints with parameterized queries, sort whitelists

SQL fixes:
- 05_aggregation_tables: add deduplicate_merge_projection_mode
- 11_views: fix nested aggregate (argMax inside sum)
- 12_thesis_features: remove invalid 'let' bindings, fix groupArrayIf type

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
toto
2026-04-08 03:21:05 +02:00
parent 228ad7026a
commit b735bab5a5
120 changed files with 1444 additions and 24933 deletions

View File

@ -0,0 +1,507 @@
"""JSON API endpoints for the JA4 SOC Dashboard."""
from __future__ import annotations
import json
import logging
import os
from pathlib import Path
from typing import Any
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
from backend.config import DB_PROCESSING, DB_LOGS, safe_identifier
from backend.database import query, query_scalar, execute
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Every route registered on this router is served under the /api prefix.
router = APIRouter(prefix="/api")
# Pre-validate DB identifiers at import time.
# Both names are interpolated directly into SQL text (f-strings) by the
# endpoints in this module, so they are passed through safe_identifier()
# once here instead of on every request.
_DB = safe_identifier(DB_PROCESSING)
_DB_LOGS = safe_identifier(DB_LOGS)
# Whitelists for sort/order to prevent SQL injection
_DETECTION_SORT_COLS = {
"detected_at", "src_ip", "ja4", "host", "anomaly_score",
"threat_level", "recurrence", "hits", "hit_velocity",
"fuzzing_index", "post_ratio", "campaign_id",
}
_SCORE_SORT_COLS = {
"detected_at", "window_start", "src_ip", "ja4", "host",
"anomaly_score", "raw_anomaly_score", "threat_level",
"hits", "hit_velocity", "xgb_prob", "ae_recon_error",
}
_TRAFFIC_SORT_COLS = {
"time", "src_ip", "method", "host", "path", "http_version",
"header_user_agent", "ja4", "src_country_code",
}
_ORDER_VALUES = {"ASC", "DESC"}
def _validate_sort(value: str, whitelist: set[str], default: str) -> str:
return value if value in whitelist else default
def _validate_order(value: str) -> str:
    """Normalise a sort direction, falling back to ``"DESC"``.

    The comparison is case-insensitive: the upper-cased value is returned
    when it is in ``_ORDER_VALUES``; any other input yields ``"DESC"``.
    """
    direction = value.upper()
    if direction in _ORDER_VALUES:
        return direction
    return "DESC"
# ---------------------------------------------------------------------------
# GET /api/overview
# ---------------------------------------------------------------------------
@router.get("/overview")
async def overview() -> dict[str, Any]:
    """Return the last-24-hours summary for the dashboard overview.

    Eight ClickHouse queries are issued one after another against
    ``ml_detected_anomalies``, ``ml_all_scores`` and ``http_logs``.

    Returns:
        A dict with:
        ``detections_24h``, ``scored_24h``, ``traffic_24h``, ``unique_ips``
        (scalars; 0 when the query yields a falsy value),
        ``critical_count`` / ``high_count`` (read from the threat
        distribution; 0 when that level is absent),
        ``threat_distribution`` (rows of ``threat_level``/``cnt``),
        ``top_ips`` (at most 10 rows, ordered by detection count),
        ``timeline`` (hourly buckets, ``hour`` converted to ``str``),
        ``models`` (rows of ``model_name``/``scored``).

    Raises:
        HTTPException: status 500 when any query raises; the original
            exception is chained as ``__cause__``.
    """
    # NOTE(review): the query helpers are called without ``await``; if they
    # perform blocking I/O they will stall the event loop — confirm.
    last_day = "WHERE detected_at >= now() - INTERVAL 1 DAY"
    anomalies = f"{_DB}.ml_detected_anomalies"
    all_scores = f"{_DB}.ml_all_scores"
    try:
        detections_24h = query_scalar(
            f"SELECT count() FROM {anomalies} {last_day}"
        ) or 0
        scored_24h = query_scalar(
            f"SELECT count() FROM {all_scores} {last_day}"
        ) or 0
        threat_distribution = query(
            "SELECT threat_level, count() AS cnt "
            f"FROM {all_scores} {last_day} "
            "GROUP BY threat_level"
        )
        # Critical / high counts are derived from the distribution rather
        # than queried separately.
        by_level = {row["threat_level"]: row["cnt"] for row in threat_distribution}
        unique_ips = query_scalar(
            f"SELECT uniq(src_ip) FROM {anomalies} {last_day}"
        ) or 0
        top_ips = query(
            "SELECT toString(src_ip) AS src_ip, count() AS cnt, "
            "max(anomaly_score) AS worst_score, "
            "any(threat_level) AS threat_level, "
            "any(asn_org) AS asn_org, any(country_code) AS country_code "
            f"FROM {anomalies} {last_day} "
            "GROUP BY src_ip ORDER BY cnt DESC LIMIT 10"
        )
        timeline = query(
            "SELECT toStartOfHour(detected_at) AS hour, count() AS cnt "
            f"FROM {anomalies} {last_day} "
            "GROUP BY hour ORDER BY hour"
        )
        traffic_24h = query_scalar(
            f"SELECT count() FROM {_DB_LOGS}.http_logs "
            "WHERE time >= now() - INTERVAL 1 DAY"
        ) or 0
        # Named ``model_counts`` so the local does not shadow the ``models``
        # endpoint defined in this module.
        model_counts = query(
            "SELECT model_name, count() AS scored "
            f"FROM {all_scores} {last_day} "
            "GROUP BY model_name"
        )
        return {
            "detections_24h": detections_24h,
            "scored_24h": scored_24h,
            "traffic_24h": traffic_24h,
            "unique_ips": unique_ips,
            "critical_count": by_level.get("CRITICAL", 0),
            "high_count": by_level.get("HIGH", 0),
            "threat_distribution": threat_distribution,
            "top_ips": top_ips,
            "timeline": [
                {"hour": str(row["hour"]), "cnt": row["cnt"]} for row in timeline
            ],
            "models": model_counts,
        }
    except Exception as exc:
        logger.exception("overview query failed")
        # NOTE(review): ``detail`` forwards the raw exception text to the
        # client; kept for compatibility with existing consumers.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
# ---------------------------------------------------------------------------
# GET /api/detections
# ---------------------------------------------------------------------------
@router.get("/detections")
async def detections(
    page: int = Query(1, ge=1),
    per_page: int = Query(50, ge=1, le=500),
    sort: str = Query("detected_at"),
    order: str = Query("DESC"),
    threat_level: str | None = Query(None),
    search: str | None = Query(None),
) -> dict[str, Any]:
    """Return one page of ``ml_detected_anomalies`` rows from the last 30 days.

    Args:
        page: 1-based page number.
        per_page: Page size (1-500).
        sort: Sort column; replaced by ``detected_at`` unless whitelisted.
        order: ``ASC`` or ``DESC`` (case-insensitive); anything else is ``DESC``.
        threat_level: Optional exact match on ``threat_level``.
        search: Optional substring matched with ``LIKE`` against the
            stringified source IP and the host.

    Returns:
        ``{"data", "total", "page", "per_page", "pages"}`` where ``pages``
        is the ceiling of ``total / per_page`` and never less than 1.

    Raises:
        HTTPException: status 500 when a query raises; the original
            exception is chained as ``__cause__``.
    """
    sort = _validate_sort(sort, _DETECTION_SORT_COLS, "detected_at")
    order = _validate_order(order)
    offset = (page - 1) * per_page

    conditions = ["detected_at >= now() - INTERVAL 30 DAY"]
    params: dict[str, Any] = {}
    if threat_level:
        conditions.append("threat_level = {tl:String}")
        params["tl"] = threat_level
    if search:
        conditions.append(
            "(toString(src_ip) LIKE {search:String} OR host LIKE {search:String})"
        )
        params["search"] = f"%{search}%"
    where = " AND ".join(conditions)
    table = f"{_DB}.ml_detected_anomalies"

    try:
        total = query_scalar(
            f"SELECT count() FROM {table} WHERE {where}",
            params,
        ) or 0
        if offset < total:
            rows = query(
                f"SELECT *, toString(src_ip) AS src_ip_str "
                f"FROM {table} "
                f"WHERE {where} ORDER BY {sort} {order} "
                "LIMIT {lim:UInt32} OFFSET {off:UInt32}",
                {**params, "lim": per_page, "off": offset},
            )
        else:
            # The requested page lies past the last row. Skipping the query
            # returns the same empty page and keeps very large ``page``
            # values from overflowing the UInt32 OFFSET parameter.
            rows = []
        return {
            "data": rows,
            "total": total,
            "page": page,
            "per_page": per_page,
            # Ceiling division without floats.
            "pages": max(1, -(-total // per_page)),
        }
    except Exception as exc:
        logger.exception("detections query failed")
        raise HTTPException(status_code=500, detail=str(exc)) from exc
# ---------------------------------------------------------------------------
# GET /api/scores
# ---------------------------------------------------------------------------
@router.get("/scores")
async def scores(
    page: int = Query(1, ge=1),
    per_page: int = Query(50, ge=1, le=500),
    sort: str = Query("detected_at"),
    order: str = Query("DESC"),
    threat_level: str | None = Query(None),
    search: str | None = Query(None),
) -> dict[str, Any]:
    """Return one page of ``ml_all_scores`` rows from the last 3 days.

    Args:
        page: 1-based page number.
        per_page: Page size (1-500).
        sort: Sort column; replaced by ``detected_at`` unless whitelisted.
        order: ``ASC`` or ``DESC`` (case-insensitive); anything else is ``DESC``.
        threat_level: Optional exact match on ``threat_level``.
        search: Optional substring matched with ``LIKE`` against the
            stringified source IP and the host.

    Returns:
        ``{"data", "total", "page", "per_page", "pages"}`` where ``pages``
        is the ceiling of ``total / per_page`` and never less than 1.

    Raises:
        HTTPException: status 500 when a query raises; the original
            exception is chained as ``__cause__``.
    """
    sort = _validate_sort(sort, _SCORE_SORT_COLS, "detected_at")
    order = _validate_order(order)
    offset = (page - 1) * per_page

    conditions = ["detected_at >= now() - INTERVAL 3 DAY"]
    params: dict[str, Any] = {}
    if threat_level:
        conditions.append("threat_level = {tl:String}")
        params["tl"] = threat_level
    if search:
        conditions.append(
            "(toString(src_ip) LIKE {search:String} OR host LIKE {search:String})"
        )
        params["search"] = f"%{search}%"
    where = " AND ".join(conditions)
    table = f"{_DB}.ml_all_scores"

    try:
        total = query_scalar(
            f"SELECT count() FROM {table} WHERE {where}",
            params,
        ) or 0
        if offset < total:
            rows = query(
                f"SELECT *, toString(src_ip) AS src_ip_str "
                f"FROM {table} "
                f"WHERE {where} ORDER BY {sort} {order} "
                "LIMIT {lim:UInt32} OFFSET {off:UInt32}",
                {**params, "lim": per_page, "off": offset},
            )
        else:
            # The requested page lies past the last row. Skipping the query
            # returns the same empty page and keeps very large ``page``
            # values from overflowing the UInt32 OFFSET parameter.
            rows = []
        return {
            "data": rows,
            "total": total,
            "page": page,
            "per_page": per_page,
            # Ceiling division without floats.
            "pages": max(1, -(-total // per_page)),
        }
    except Exception as exc:
        logger.exception("scores query failed")
        raise HTTPException(status_code=500, detail=str(exc)) from exc
# ---------------------------------------------------------------------------
# GET /api/traffic
# ---------------------------------------------------------------------------
@router.get("/traffic")
async def traffic(
    page: int = Query(1, ge=1),
    per_page: int = Query(50, ge=1, le=500),
    sort: str = Query("time"),
    order: str = Query("DESC"),
    method: str | None = Query(None),
    host: str | None = Query(None),
    status: str | None = Query(None),
) -> dict[str, Any]:
    """Return one page of ``http_logs`` rows from the last day.

    Args:
        page: 1-based page number.
        per_page: Page size (1-500).
        sort: Sort column; replaced by ``time`` unless whitelisted.
        order: ``ASC`` or ``DESC`` (case-insensitive); anything else is ``DESC``.
        method: Optional exact match on ``method``.
        host: Optional substring matched with ``LIKE`` against ``host``.
        status: Optional exact match applied to ``http_version`` (see the
            review note in the body).

    Returns:
        ``{"data", "total", "page", "per_page", "pages"}`` where ``pages``
        is the ceiling of ``total / per_page`` and never less than 1.

    Raises:
        HTTPException: status 500 when a query raises; the original
            exception is chained as ``__cause__``.
    """
    sort = _validate_sort(sort, _TRAFFIC_SORT_COLS, "time")
    order = _validate_order(order)
    offset = (page - 1) * per_page

    conditions = ["time >= now() - INTERVAL 1 DAY"]
    params: dict[str, Any] = {}
    if method:
        conditions.append("method = {method:String}")
        params["method"] = method
    if host:
        conditions.append("host LIKE {host:String}")
        params["host"] = f"%{host}%"
    if status is not None:
        # NOTE(review): the ``status`` parameter filters on ``http_version``,
        # not on a response-status column — confirm this mapping is intended.
        conditions.append("http_version = {status:String}")
        params["status"] = status
    where = " AND ".join(conditions)
    table = f"{_DB_LOGS}.http_logs"

    try:
        total = query_scalar(
            f"SELECT count() FROM {table} WHERE {where}",
            params,
        ) or 0
        if offset < total:
            rows = query(
                f"SELECT time, toString(src_ip) AS src_ip, method, host, path, "
                f"http_version, header_user_agent, ja4, src_country_code "
                f"FROM {table} "
                f"WHERE {where} ORDER BY {sort} {order} "
                "LIMIT {lim:UInt32} OFFSET {off:UInt32}",
                {**params, "lim": per_page, "off": offset},
            )
        else:
            # The requested page lies past the last row. Skipping the query
            # returns the same empty page and keeps very large ``page``
            # values from overflowing the UInt32 OFFSET parameter.
            rows = []
        return {
            "data": rows,
            "total": total,
            "page": page,
            "per_page": per_page,
            # Ceiling division without floats.
            "pages": max(1, -(-total // per_page)),
        }
    except Exception as exc:
        logger.exception("traffic query failed")
        raise HTTPException(status_code=500, detail=str(exc)) from exc
# ---------------------------------------------------------------------------
# GET /api/ip/{ip}
# ---------------------------------------------------------------------------
@router.get("/ip/{ip}")
async def ip_detail(ip: str) -> dict[str, Any]:
    """Return everything recorded about a single source IP.

    Args:
        ip: IPv4 or IPv6 address as text. IPv4-mapped IPv6 addresses
            (``::ffff:a.b.c.d``) are reduced to their IPv4 form.

    Returns:
        A dict with ``ip`` (echoed as received), ``detections`` (30 days),
        ``scores`` (3 days), ``http_logs`` (last day, at most 100 rows),
        ``ai_features`` and ``recurrence``. The last two are best-effort and
        stay empty when their views cannot be queried.

    Raises:
        HTTPException: status 422 when *ip* is not a valid address; status
            500 when one of the mandatory queries raises (original
            exception chained as ``__cause__``).
    """
    # Local import: standard library only, keeps this handler self-contained.
    import ipaddress

    # Validate up front so malformed input is rejected as a client error
    # instead of surfacing as a database failure.
    try:
        parsed = ipaddress.ip_address(ip.strip())
    except ValueError as exc:
        raise HTTPException(status_code=422, detail="invalid IP address") from exc

    # Reduce IPv4-mapped addresses (::ffff:a.b.c.d) to plain IPv4.
    mapped = getattr(parsed, "ipv4_mapped", None)
    if mapped is not None:
        parsed = mapped
    params = {"ip": str(parsed)}

    try:
        detections = query(
            f"SELECT *, toString(src_ip) AS src_ip_str "
            f"FROM {_DB}.ml_detected_anomalies "
            "WHERE src_ip = toIPv6({ip:String}) "
            "AND detected_at >= now() - INTERVAL 30 DAY "
            "ORDER BY detected_at DESC",
            params,
        )
        all_scores = query(
            f"SELECT *, toString(src_ip) AS src_ip_str "
            f"FROM {_DB}.ml_all_scores "
            "WHERE src_ip = toIPv6({ip:String}) "
            "AND detected_at >= now() - INTERVAL 3 DAY "
            "ORDER BY detected_at DESC",
            params,
        )
        http_logs: list[dict] = []
        if parsed.version == 4:
            # Only IPv4 addresses are looked up here: toIPv4OrZero() turns
            # anything else into 0.0.0.0, which would match unrelated rows.
            http_logs = query(
                f"SELECT time, method, host, path, http_version, header_user_agent, ja4 "
                f"FROM {_DB_LOGS}.http_logs "
                "WHERE src_ip = toIPv4OrZero({ip:String}) "
                "AND time >= now() - INTERVAL 1 DAY "
                "ORDER BY time DESC LIMIT 100",
                params,
            )

        # The two view lookups are optional: a failure is logged at debug
        # level and the corresponding list stays empty.
        ai_features: list[dict] = []
        try:
            ai_features = query(
                f"SELECT * FROM {_DB}.view_ai_features_1h "
                "WHERE src_ip = toIPv6({ip:String}) LIMIT 1",
                params,
            )
        except Exception:
            logger.debug("view_ai_features_1h unavailable for %s", ip)

        recurrence: list[dict] = []
        try:
            recurrence = query(
                f"SELECT * FROM {_DB}.view_ip_recurrence "
                "WHERE src_ip = toIPv6({ip:String})",
                params,
            )
        except Exception:
            logger.debug("view_ip_recurrence unavailable for %s", ip)

        return {
            "ip": ip,
            "detections": detections,
            "scores": all_scores,
            "http_logs": http_logs,
            "ai_features": ai_features,
            "recurrence": recurrence,
        }
    except Exception as exc:
        logger.exception("ip detail query failed for %s", ip)
        raise HTTPException(status_code=500, detail=str(exc)) from exc
# ---------------------------------------------------------------------------
# GET /api/features
# ---------------------------------------------------------------------------
@router.get("/features")
async def features() -> dict[str, Any]:
    """Return aggregate statistics for the AI and thesis feature views.

    Each view is queried independently and on a best-effort basis: when a
    query raises, a debug message is logged and that entry stays an empty
    dict.

    Returns:
        ``{"ai_features": {...}, "thesis_features": {...}}`` where each
        value is the first result row (row count plus averages of hits, hit
        velocity, fuzzing index and POST ratio) or ``{}``.
    """
    aggregates = (
        "SELECT count() AS total, "
        "avg(hits) AS avg_hits, "
        "avg(hit_velocity) AS avg_hit_velocity, "
        "avg(fuzzing_index) AS avg_fuzzing_index, "
        "avg(post_ratio) AS avg_post_ratio "
    )
    # (result key, view name, message logged when the view cannot be read)
    sources = (
        ("ai_features", "view_ai_features_1h", "view_ai_features_1h not available"),
        (
            "thesis_features",
            "view_thesis_features_1h",
            "view_thesis_features_1h not available",
        ),
    )

    result: dict[str, Any] = {"ai_features": {}, "thesis_features": {}}
    for key, view, unavailable_msg in sources:
        try:
            stats = query(f"{aggregates}FROM {_DB}.{view}")
            if stats:
                result[key] = stats[0]
        except Exception:
            logger.debug(unavailable_msg)
    return result
# ---------------------------------------------------------------------------
# GET /api/models
# ---------------------------------------------------------------------------
# Directory scanned for ``*.json`` model metadata files.
_MODEL_DIR = Path("/data/models")


@router.get("/models")
async def models() -> dict[str, Any]:
    """Return model metadata from disk plus recent scoring statistics.

    Metadata comes from every ``*.json`` file in ``_MODEL_DIR`` (sorted by
    path); files that cannot be read or parsed are skipped with a warning.
    Scoring statistics cover the last 7 days of ``ml_all_scores`` and are
    best-effort: when the query raises, the list is returned empty.

    Returns:
        ``{"models": [...], "scoring_stats": [...]}``.
    """
    model_info: list[dict[str, Any]] = []
    if _MODEL_DIR.is_dir():
        for meta_path in sorted(_MODEL_DIR.glob("*.json")):
            try:
                # Hand raw bytes to json.loads so decoding follows the JSON
                # encoding rules instead of the process locale.
                model_info.append(json.loads(meta_path.read_bytes()))
            except Exception as exc:
                # Best-effort: one bad file must not hide the others.
                logger.warning(
                    "Could not read model metadata %s: %s", meta_path, exc
                )

    # Also fetch latest scoring stats from ClickHouse.
    scoring_stats: list[dict] = []
    try:
        scoring_stats = query(
            "SELECT model_name, count() AS scored, "
            "min(detected_at) AS first_seen, max(detected_at) AS last_seen "
            f"FROM {_DB}.ml_all_scores "
            "WHERE detected_at >= now() - INTERVAL 7 DAY "
            "GROUP BY model_name"
        )
    except Exception:
        logger.debug("could not fetch model scoring stats")
    return {"models": model_info, "scoring_stats": scoring_stats}
# ---------------------------------------------------------------------------
# POST /api/classify — SOC analyst feedback
# ---------------------------------------------------------------------------
class ClassifyRequest(BaseModel):
    # Request body for POST /api/classify.

    # Address being classified, as text.
    src_ip: str
    # Expected to be one of: bot | legitimate | suspicious.
    classification: str
    # Free-text analyst note; defaults to an empty string.
    comment: str = ""


# Labels accepted in ClassifyRequest.classification.
_VALID_CLASSIFICATIONS = {
    "bot",
    "legitimate",
    "suspicious",
}
# Set to True after the CREATE TABLE statement has been issued once.
_feedback_table_ensured = False


def _ensure_feedback_table() -> None:
    """Create the ``soc_feedback`` table on first use.

    The ``CREATE TABLE IF NOT EXISTS`` statement is sent at most once per
    process. The flag is only set after ``execute`` returns, so a failed
    attempt is retried on the next call.
    """
    global _feedback_table_ensured
    if not _feedback_table_ensured:
        ddl = (
            f"CREATE TABLE IF NOT EXISTS {_DB}.soc_feedback ("
            " created_at DateTime DEFAULT now(),"
            " src_ip IPv6,"
            " classification LowCardinality(String),"
            " comment String"
            ") ENGINE = MergeTree() ORDER BY (src_ip, created_at)"
        )
        execute(ddl)
        _feedback_table_ensured = True
@router.post("/classify")
async def classify(body: ClassifyRequest) -> dict[str, Any]:
    """Store one SOC analyst classification for a source IP.

    Args:
        body: Address, label and optional comment.

    Returns:
        ``{"status": "ok", "src_ip": ..., "classification": ...}`` echoing
        the submitted values.

    Raises:
        HTTPException: status 422 when the label is not in
            ``_VALID_CLASSIFICATIONS`` or ``src_ip`` is not a valid IPv4/IPv6
            address; status 500 when creating the table or inserting the
            row raises (original exception chained as ``__cause__``).
    """
    # Local import: standard library only, keeps this handler self-contained.
    import ipaddress

    if body.classification not in _VALID_CLASSIFICATIONS:
        raise HTTPException(
            status_code=422,
            # sorted() keeps the message stable; a set's repr order is not.
            detail=f"classification must be one of {sorted(_VALID_CLASSIFICATIONS)}",
        )
    # Reject malformed addresses here so they are reported as a client error
    # instead of failing inside the INSERT.
    try:
        ipaddress.ip_address(body.src_ip)
    except ValueError as exc:
        raise HTTPException(
            status_code=422,
            detail="src_ip must be a valid IPv4 or IPv6 address",
        ) from exc

    try:
        _ensure_feedback_table()
        execute(
            f"INSERT INTO {_DB}.soc_feedback (src_ip, classification, comment) VALUES "
            "(toIPv6({ip:String}), {cls:String}, {cmt:String})",
            {"ip": body.src_ip, "cls": body.classification, "cmt": body.comment},
        )
        return {
            "status": "ok",
            "src_ip": body.src_ip,
            "classification": body.classification,
        }
    except Exception as exc:
        logger.exception("classify insert failed")
        raise HTTPException(status_code=500, detail=str(exc)) from exc
# ---------------------------------------------------------------------------
# GET /api/classifications — recent SOC feedback
# ---------------------------------------------------------------------------
@router.get("/classifications")
async def classifications() -> dict[str, Any]:
    """Return the 50 most recent SOC feedback rows, newest first.

    Best-effort: when creating the table or running the query raises, the
    failure is logged with its traceback and ``{"data": []}`` is returned
    instead of an error response.
    """
    try:
        _ensure_feedback_table()
        recent = query(
            "SELECT created_at, toString(src_ip) AS src_ip, classification, comment "
            f"FROM {_DB}.soc_feedback "
            "ORDER BY created_at DESC LIMIT 50"
        )
    except Exception:
        logger.exception("classifications query failed")
        return {"data": []}
    return {"data": recent}