Complete rewrite of the SOC dashboard using FastAPI + Jinja2 + htmx + Chart.js + Tailwind CSS. Replaces the old React/Vite frontend with server-rendered templates. Dashboard pages: - Overview: KPIs, timeline chart, threat distribution, top IPs - Detections: paginated/filterable anomaly table - Scores: ml_all_scores with AE error & XGB prob columns - Traffic: HTTP logs with method/host filters - IP Investigation: full deep-dive (scores, features, HTTP logs, classify) - Classification: SOC feedback form + history - Features: AI + thesis feature stats - Models: scoring stats + model metadata API: 9 JSON endpoints with parameterized queries, sort whitelists SQL fixes: - 05_aggregation_tables: add deduplicate_merge_projection_mode - 11_views: fix nested aggregate (argMax inside sum) - 12_thesis_features: remove invalid 'let' bindings, fix groupArrayIf type Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
67 lines
2.0 KiB
Python
67 lines
2.0 KiB
Python
from __future__ import annotations
|
|
|
|
import ipaddress
|
|
import logging
|
|
from typing import Any
|
|
|
|
import clickhouse_connect
|
|
from clickhouse_connect.driver.client import Client
|
|
|
|
from backend.config import CLICKHOUSE_HOST, CLICKHOUSE_PORT, CLICKHOUSE_USER, CLICKHOUSE_PASSWORD
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_client: Client | None = None
|
|
|
|
|
|
def get_client() -> Client:
|
|
"""Return a lazily-initialised ClickHouse client (singleton)."""
|
|
global _client
|
|
if _client is None:
|
|
_client = clickhouse_connect.get_client(
|
|
host=CLICKHOUSE_HOST,
|
|
port=CLICKHOUSE_PORT,
|
|
username=CLICKHOUSE_USER,
|
|
password=CLICKHOUSE_PASSWORD,
|
|
)
|
|
logger.info("Connected to ClickHouse at %s:%s", CLICKHOUSE_HOST, CLICKHOUSE_PORT)
|
|
return _client
|
|
|
|
|
|
def _normalise_value(v: Any) -> Any:
|
|
"""Convert ClickHouse-specific types to JSON-friendly Python types."""
|
|
if isinstance(v, (ipaddress.IPv4Address, ipaddress.IPv6Address)):
|
|
return str(v)
|
|
if isinstance(v, bytes):
|
|
try:
|
|
return str(ipaddress.IPv6Address(v))
|
|
except Exception:
|
|
return v.hex()
|
|
return v
|
|
|
|
|
|
def query(sql: str, params: dict | None = None) -> list[dict[str, Any]]:
|
|
"""Execute *sql* and return a list of row-dicts."""
|
|
client = get_client()
|
|
result = client.query(sql, parameters=params or {})
|
|
columns = result.column_names
|
|
rows: list[dict[str, Any]] = []
|
|
for row in result.result_rows:
|
|
rows.append({col: _normalise_value(val) for col, val in zip(columns, row)})
|
|
return rows
|
|
|
|
|
|
def query_scalar(sql: str, params: dict | None = None) -> Any:
|
|
"""Execute *sql* and return the single scalar value."""
|
|
client = get_client()
|
|
result = client.query(sql, parameters=params or {})
|
|
if result.result_rows:
|
|
return _normalise_value(result.result_rows[0][0])
|
|
return None
|
|
|
|
|
|
def execute(sql: str, params: dict | None = None) -> None:
|
|
"""Execute a DDL / DML statement that returns no rows."""
|
|
client = get_client()
|
|
client.command(sql, parameters=params or {})
|