feat(dashboard): add JA4 fingerprint and cluster investigation pages

- /ja4/{fingerprint} page: 8 KPIs, timeline, threat pie, IP scores
  table, ASN/geo charts, HTTP logs, AI features — full JA4 investigation
- /cluster/{cid} page: 8 KPIs, timeline, threat/JA4/ASN/host charts,
  member table with bulk classify — full campaign investigation
- /api/ja4/{fingerprint} and /api/cluster/{cid} API endpoints
- fmtJA4 links now navigate to /ja4/ investigation page
- campaigns.html: 'Ouvrir' button links to /cluster/{cid} full page
- Fix: double-brace {{param}} in non-f-string queries → single {param}
  (was causing HTTP 500 on all parameterized ClickHouse queries)
- 50 routes total, all tests pass, 0 JS console errors

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
toto
2026-04-09 14:05:52 +02:00
parent 70188b508c
commit 702c0d5edb
6 changed files with 778 additions and 5 deletions

View File

@ -1027,7 +1027,7 @@ async def campaign_detail(cid: int) -> dict[str, Any]:
f"asn_org, asn_number, country_code, "
f"browser_family, bot_name, detected_at, reason "
f"FROM {_DB}.ml_detected_anomalies "
"WHERE campaign_id = {{cid:Int32}} "
"WHERE campaign_id = {cid:Int32} "
"AND detected_at >= now() - INTERVAL 7 DAY "
"ORDER BY anomaly_score ASC LIMIT 200",
{"cid": cid},
@ -1048,7 +1048,7 @@ async def campaign_detail(cid: int) -> dict[str, Any]:
f"groupUniqArray(10)(host) AS host_list, "
f"min(detected_at) AS first_seen, max(detected_at) AS last_seen "
f"FROM {_DB}.ml_detected_anomalies "
"WHERE campaign_id = {{cid:Int32}} "
"WHERE campaign_id = {cid:Int32} "
"AND detected_at >= now() - INTERVAL 7 DAY",
{"cid": cid},
)
@ -1057,7 +1057,7 @@ async def campaign_detail(cid: int) -> dict[str, Any]:
f"SELECT toStartOfHour(detected_at) AS hour, "
f"count() AS detections, uniqExact(src_ip) AS active_ips "
f"FROM {_DB}.ml_detected_anomalies "
"WHERE campaign_id = {{cid:Int32}} "
"WHERE campaign_id = {cid:Int32} "
"AND detected_at >= now() - INTERVAL 7 DAY "
"GROUP BY hour ORDER BY hour",
{"cid": cid},
@ -1226,3 +1226,247 @@ async def ua_rotation() -> dict[str, Any]:
except Exception as exc:
logger.exception("ua-rotation query failed")
return {"data": []}
# ---------------------------------------------------------------------------
# GET /api/ja4/{fingerprint} — JA4 fingerprint investigation detail
# ---------------------------------------------------------------------------
@router.get("/ja4/{fingerprint:path}")
async def ja4_detail(fingerprint: str) -> dict[str, Any]:
    """Return the investigation payload for a single JA4 fingerprint.

    Runs a series of ClickHouse queries filtered on ``ja4 = fingerprint`` and
    bundles their results into one response.

    Args:
        fingerprint: JA4 fingerprint taken from the URL path. It is only ever
            sent to ClickHouse as the bound ``ja4`` query parameter.

    Returns:
        A dict with the keys ``ja4`` (the requested fingerprint), ``profile``
        (first row of the aggregate query, or ``{}`` when it returns no
        rows), ``detections``, ``scores``, ``timeline``, ``threats``,
        ``http_logs`` and ``ai_features`` (``[]`` when the optional view
        query fails).

    Raises:
        HTTPException: status 500 when any query other than the optional
            ``view_ai_features_1h`` lookup fails.
    """
    # NOTE(review): `query` is called without `await` inside an async handler;
    # if it performs blocking I/O it will stall the event loop — confirm.
    params = {"ja4": fingerprint}
    try:
        # `{ja4:String}` is a query-parameter placeholder resolved from
        # `params`, so it must live in a plain (non-f) string literal: inside
        # an f-string Python would try to format a local named `ja4`.

        # Detections carrying this fingerprint (last 7 days, newest first).
        detections = query(
            "SELECT toString(src_ip) AS src_ip, anomaly_score, "
            "raw_anomaly_score, threat_level, hits, hit_velocity, "
            "host, asn_org, country_code, browser_family, bot_name, "
            "detected_at, campaign_id "
            f"FROM {_DB}.ml_detected_anomalies "
            "WHERE ja4 = {ja4:String} "
            "AND detected_at >= now() - INTERVAL 7 DAY "
            "ORDER BY detected_at DESC LIMIT 500",
            params,
        )
        # ML scores recorded for this fingerprint (last 3 days).
        all_scores = query(
            "SELECT toString(src_ip) AS src_ip, anomaly_score, "
            "raw_anomaly_score, ae_recon_error, xgb_prob, "
            "threat_level, model_name, host, hits, "
            "asn_org, country_code, browser_family, detected_at "
            f"FROM {_DB}.ml_all_scores "
            "WHERE ja4 = {ja4:String} "
            "AND detected_at >= now() - INTERVAL 3 DAY "
            "ORDER BY detected_at DESC LIMIT 500",
            params,
        )
        # Aggregated profile over the last 7 days (single-row result).
        profile = query(
            "SELECT "
            "count() AS total_sessions, "
            "uniqExact(src_ip) AS unique_ips, "
            "uniqExact(host) AS unique_hosts, "
            "uniqExact(asn_org) AS unique_asns, "
            "avg(anomaly_score) AS avg_score, "
            "max(anomaly_score) AS max_score, "
            "avg(hits) AS avg_hits, "
            "avg(hit_velocity) AS avg_velocity, "
            "sum(hits) AS total_hits, "
            "groupUniqArray(20)(toString(src_ip)) AS ip_sample, "
            "groupUniqArray(10)(host) AS host_list, "
            "groupUniqArray(10)(asn_org) AS asn_list, "
            "groupUniqArray(10)(country_code) AS country_list, "
            "groupUniqArray(5)(browser_family) AS browser_list, "
            "groupUniqArray(5)(bot_name) AS bot_names, "
            "min(detected_at) AS first_seen, max(detected_at) AS last_seen, "
            "countIf(threat_level IN ('HIGH','CRITICAL')) AS threat_count, "
            "countIf(threat_level = 'KNOWN_BOT') AS known_bot_count, "
            "countIf(browser_family != '') AS browser_count "
            f"FROM {_DB}.ml_all_scores "
            "WHERE ja4 = {ja4:String} "
            "AND detected_at >= now() - INTERVAL 7 DAY",
            params,
        )
        # Hourly timeline (last 3 days).
        timeline = query(
            "SELECT toStartOfHour(detected_at) AS hour, "
            "count() AS sessions, uniqExact(src_ip) AS active_ips, "
            "avg(anomaly_score) AS avg_score "
            f"FROM {_DB}.ml_all_scores "
            "WHERE ja4 = {ja4:String} "
            "AND detected_at >= now() - INTERVAL 3 DAY "
            "GROUP BY hour ORDER BY hour",
            params,
        )
        # Row count per threat level (last 7 days).
        threats = query(
            "SELECT threat_level, count() AS cnt "
            f"FROM {_DB}.ml_all_scores "
            "WHERE ja4 = {ja4:String} "
            "AND detected_at >= now() - INTERVAL 7 DAY "
            "GROUP BY threat_level ORDER BY cnt DESC",
            params,
        )
        # Raw HTTP traffic for this fingerprint (last day, newest first).
        http_logs = query(
            "SELECT time, toString(src_ip) AS src_ip, method, host, path, "
            "http_version, header_user_agent "
            f"FROM {_DB_LOGS}.http_logs "
            "WHERE ja4 = {ja4:String} "
            "AND time >= now() - INTERVAL 1 DAY "
            "ORDER BY time DESC LIMIT 200",
            params,
        )
        # AI features are best-effort: a failure here must not fail the whole
        # page, so it is handled separately and falls back to an empty list.
        ai_features: list[dict] = []
        try:
            ai_features = query(
                f"SELECT * FROM {_DB}.view_ai_features_1h "
                "WHERE ja4 = {ja4:String} LIMIT 20",
                params,
            )
        except Exception:
            # Keep the traceback at debug level so the cause of the failure
            # is still discoverable when debugging.
            logger.debug(
                "view_ai_features_1h unavailable for ja4=%s",
                fingerprint,
                exc_info=True,
            )
        return {
            "ja4": fingerprint,
            "profile": profile[0] if profile else {},
            "detections": detections,
            "scores": all_scores,
            "timeline": timeline,
            "threats": threats,
            "http_logs": http_logs,
            "ai_features": ai_features,
        }
    except Exception as exc:
        logger.exception("ja4 detail query failed for %s", fingerprint)
        # NOTE(review): `detail=str(exc)` exposes the raw backend error text
        # to the API client — confirm this is acceptable for this dashboard.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
# ---------------------------------------------------------------------------
# GET /api/cluster/{cid} — Enhanced cluster investigation
# ---------------------------------------------------------------------------
@router.get("/cluster/{cid}")
async def cluster_detail(cid: int) -> dict[str, Any]:
    """Return the investigation payload for a single cluster (campaign).

    Runs a series of ClickHouse queries against ``ml_detected_anomalies``,
    each filtered on ``campaign_id = cid`` and the last 7 days, and bundles
    their results into one response.

    Args:
        cid: Campaign/cluster identifier taken from the URL path. It is sent
            to ClickHouse as the bound ``cid`` query parameter (``Int32``).

    Returns:
        A dict with the keys ``campaign_id`` (the requested id), ``profile``
        (first row of the aggregate query, or ``{}`` when it returns no
        rows), ``members``, ``timeline``, ``ja4_breakdown``,
        ``asn_breakdown``, ``host_breakdown`` and ``threats``.

    Raises:
        HTTPException: status 500 when any of the queries fails.
    """
    # NOTE(review): `query` is called without `await` inside an async handler;
    # if it performs blocking I/O it will stall the event loop — confirm.
    params = {"cid": cid}
    try:
        # `{cid:Int32}` is a query-parameter placeholder resolved from
        # `params`, so it must live in a plain (non-f) string literal: inside
        # an f-string Python would try to format the local `cid` with the
        # format spec "Int32".

        # Enriched aggregate profile (single-row result).
        profile = query(
            "SELECT "
            "count() AS total_members, "
            "uniqExact(src_ip) AS unique_ips, "
            "uniqExact(ja4) AS unique_ja4, "
            "uniqExact(host) AS unique_hosts, "
            "uniqExact(asn_org) AS unique_asns, "
            "avg(anomaly_score) AS avg_score, max(anomaly_score) AS max_score, "
            "min(anomaly_score) AS min_score, "
            "avg(hits) AS avg_hits, sum(hits) AS total_hits, "
            "avg(hit_velocity) AS avg_velocity, "
            "avg(fuzzing_index) AS avg_fuzzing, "
            "avg(post_ratio) AS avg_post_ratio, "
            "groupUniqArray(30)(toString(src_ip)) AS ip_list, "
            "groupUniqArray(20)(ja4) AS ja4_list, "
            "groupUniqArray(10)(host) AS host_list, "
            "groupUniqArray(10)(asn_org) AS asn_list, "
            "groupUniqArray(10)(country_code) AS country_list, "
            "groupUniqArray(5)(browser_family) AS browser_list, "
            "groupUniqArray(5)(bot_name) AS bot_names, "
            "min(detected_at) AS first_seen, max(detected_at) AS last_seen, "
            "countIf(threat_level IN ('HIGH','CRITICAL')) AS threat_count, "
            "countIf(threat_level = 'KNOWN_BOT') AS known_bot_count "
            f"FROM {_DB}.ml_detected_anomalies "
            "WHERE campaign_id = {cid:Int32} "
            "AND detected_at >= now() - INTERVAL 7 DAY",
            params,
        )
        # Detailed member rows, ordered by ascending anomaly_score, capped at 500.
        members = query(
            "SELECT toString(src_ip) AS src_ip, ja4, host, "
            "anomaly_score, raw_anomaly_score, threat_level, "
            "hits, hit_velocity, fuzzing_index, post_ratio, "
            "asn_org, asn_number, country_code, "
            "browser_family, bot_name, detected_at, reason "
            f"FROM {_DB}.ml_detected_anomalies "
            "WHERE campaign_id = {cid:Int32} "
            "AND detected_at >= now() - INTERVAL 7 DAY "
            "ORDER BY anomaly_score ASC LIMIT 500",
            params,
        )
        # Hourly timeline.
        timeline = query(
            "SELECT toStartOfHour(detected_at) AS hour, "
            "count() AS detections, uniqExact(src_ip) AS active_ips, "
            "avg(anomaly_score) AS avg_score "
            f"FROM {_DB}.ml_detected_anomalies "
            "WHERE campaign_id = {cid:Int32} "
            "AND detected_at >= now() - INTERVAL 7 DAY "
            "GROUP BY hour ORDER BY hour",
            params,
        )
        # NOTE(review): the three breakdown queries below have no LIMIT, so
        # their row count grows with the cluster's distinct JA4/ASN/host
        # values — confirm that is intended.

        # Breakdown per JA4 (signature convergence).
        ja4_breakdown = query(
            "SELECT ja4, count() AS sessions, "
            "uniqExact(src_ip) AS unique_ips, "
            "avg(anomaly_score) AS avg_score "
            f"FROM {_DB}.ml_detected_anomalies "
            "WHERE campaign_id = {cid:Int32} "
            "AND detected_at >= now() - INTERVAL 7 DAY "
            "GROUP BY ja4 ORDER BY sessions DESC",
            params,
        )
        # Breakdown per ASN (infrastructure).
        asn_breakdown = query(
            "SELECT asn_org, count() AS sessions, "
            "uniqExact(src_ip) AS unique_ips "
            f"FROM {_DB}.ml_detected_anomalies "
            "WHERE campaign_id = {cid:Int32} "
            "AND detected_at >= now() - INTERVAL 7 DAY "
            "GROUP BY asn_org ORDER BY sessions DESC",
            params,
        )
        # Breakdown per targeted host.
        host_breakdown = query(
            "SELECT host, count() AS sessions, "
            "avg(anomaly_score) AS avg_score "
            f"FROM {_DB}.ml_detected_anomalies "
            "WHERE campaign_id = {cid:Int32} "
            "AND detected_at >= now() - INTERVAL 7 DAY "
            "GROUP BY host ORDER BY sessions DESC",
            params,
        )
        # Row count per threat level.
        threats = query(
            "SELECT threat_level, count() AS cnt "
            f"FROM {_DB}.ml_detected_anomalies "
            "WHERE campaign_id = {cid:Int32} "
            "AND detected_at >= now() - INTERVAL 7 DAY "
            "GROUP BY threat_level ORDER BY cnt DESC",
            params,
        )
        return {
            "campaign_id": cid,
            "profile": profile[0] if profile else {},
            "members": members,
            "timeline": timeline,
            "ja4_breakdown": ja4_breakdown,
            "asn_breakdown": asn_breakdown,
            "host_breakdown": host_breakdown,
            "threats": threats,
        }
    except Exception as exc:
        logger.exception("cluster detail query failed for %s", cid)
        # NOTE(review): `detail=str(exc)` exposes the raw backend error text
        # to the API client — confirm this is acceptable for this dashboard.
        raise HTTPException(status_code=500, detail=str(exc)) from exc