Files
dashboard/backend/routes/ml_features.py
SOC Analyst e2bc4a47cd feat: ajout de 7 nouveaux dashboards d'analyse avancée
- 🔥 Brute Force & Credential Stuffing (view_form_bruteforce_detected)
- 🧬 TCP/OS Spoofing (view_tcp_spoofing_detected, 86K détections)
- 📡 Header Fingerprint Clustering (agg_header_fingerprint_1h, 1374 clusters)
- ⏱️ Heatmap Temporelle (agg_host_ip_ja4_1h, pic à 20h)
- 🌍 Botnets Distribués / JA4 spread (view_host_ja4_anomalies)
- 🔄 Rotation JA4 & Persistance (view_host_ip_ja4_rotation + view_ip_recurrence)
- 🤖 Features ML / Radar (view_ai_features_1h, radar SVG + scatter plot)

Backend: 7 nouveaux router FastAPI avec requêtes ClickHouse optimisées
Frontend: 7 nouveaux composants React + navigation 'Analyse Avancée' dans la sidebar
Fixes: alias fuzzing_index → max_fuzzing (ORDER BY ClickHouse), normalisation IPs ::ffff:

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-15 23:57:27 +01:00

158 lines
6.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Endpoints pour les features ML / IA (scores d'anomalies, radar, scatter)
"""
from fastapi import APIRouter, HTTPException, Query
from ..database import db
router = APIRouter(prefix="/api/ml", tags=["ml_features"])
def _attack_type(fuzzing_index: float, hit_velocity: float,
is_fake_nav: int, ua_ch_mismatch: int) -> str:
if fuzzing_index > 50:
return "brute_force"
if hit_velocity > 1.0:
return "flood"
if is_fake_nav:
return "scraper"
if ua_ch_mismatch:
return "spoofing"
return "scanner"
@router.get("/top-anomalies")
async def get_top_anomalies(limit: int = Query(50, ge=1, le=500)):
"""Top IPs anomales déduplicées par IP (max fuzzing_index), triées par fuzzing_index DESC."""
try:
sql = """
SELECT
replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS ip,
any(ja4) AS ja4,
any(host) AS host,
max(hits) AS hits,
max(fuzzing_index) AS max_fuzzing,
max(hit_velocity) AS hit_velocity,
max(temporal_entropy) AS temporal_entropy,
max(is_fake_navigation) AS is_fake_navigation,
max(ua_ch_mismatch) AS ua_ch_mismatch,
max(sni_host_mismatch) AS sni_host_mismatch,
max(is_ua_rotating) AS is_ua_rotating,
max(path_diversity_ratio) AS path_diversity_ratio,
max(anomalous_payload_ratio) AS anomalous_payload_ratio,
any(asn_label) AS asn_label,
any(bot_name) AS bot_name
FROM mabase_prod.view_ai_features_1h
GROUP BY src_ip
ORDER BY 5 DESC
LIMIT %(limit)s
"""
result = db.query(sql, {"limit": limit})
items = []
for row in result.result_rows:
fuzzing = float(row[4] or 0)
velocity = float(row[5] or 0)
fake_nav = int(row[7] or 0)
ua_mm = int(row[8] or 0)
items.append({
"ip": str(row[0]),
"ja4": str(row[1]),
"host": str(row[2]),
"hits": int(row[3] or 0),
"fuzzing_index": fuzzing,
"hit_velocity": velocity,
"temporal_entropy": float(row[6] or 0),
"is_fake_navigation": fake_nav,
"ua_ch_mismatch": ua_mm,
"sni_host_mismatch": int(row[9] or 0),
"is_ua_rotating": int(row[10] or 0),
"path_diversity_ratio": float(row[11] or 0),
"anomalous_payload_ratio":float(row[12] or 0),
"asn_label": str(row[13] or ""),
"bot_name": str(row[14] or ""),
"attack_type": _attack_type(fuzzing, velocity, fake_nav, ua_mm),
})
return {"items": items}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/ip/{ip}/radar")
async def get_ip_radar(ip: str):
"""Scores radar pour une IP spécifique (8 dimensions d'anomalie)."""
try:
sql = """
SELECT
avg(fuzzing_index) AS fuzzing_index,
avg(hit_velocity) AS hit_velocity,
avg(is_fake_navigation) AS is_fake_navigation,
avg(ua_ch_mismatch) AS ua_ch_mismatch,
avg(sni_host_mismatch) AS sni_host_mismatch,
avg(orphan_ratio) AS orphan_ratio,
avg(path_diversity_ratio) AS path_diversity_ratio,
avg(anomalous_payload_ratio) AS anomalous_payload_ratio
FROM mabase_prod.view_ai_features_1h
WHERE replaceRegexpAll(toString(src_ip), '^::ffff:', '') = %(ip)s
"""
result = db.query(sql, {"ip": ip})
if not result.result_rows:
raise HTTPException(status_code=404, detail="IP not found")
row = result.result_rows[0]
def _f(v) -> float:
return float(v or 0)
return {
"ip": ip,
"fuzzing_score": min(100.0, _f(row[0])),
"velocity_score": min(100.0, _f(row[1]) * 100),
"fake_nav_score": _f(row[2]) * 100,
"ua_mismatch_score": _f(row[3]) * 100,
"sni_mismatch_score": _f(row[4]) * 100,
"orphan_score": min(100.0, _f(row[5]) * 100),
"path_repetition_score": max(0.0, 100 - _f(row[6]) * 100),
"payload_anomaly_score": min(100.0, _f(row[7]) * 100),
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/scatter")
async def get_ml_scatter(limit: int = Query(200, ge=1, le=1000)):
"""Points pour scatter plot (fuzzing_index × hit_velocity), dédupliqués par IP."""
try:
sql = """
SELECT
replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS ip,
any(ja4) AS ja4,
max(fuzzing_index) AS max_fuzzing,
max(hit_velocity) AS hit_velocity,
max(hits) AS hits,
max(is_fake_navigation) AS is_fake_navigation,
max(ua_ch_mismatch) AS ua_ch_mismatch
FROM mabase_prod.view_ai_features_1h
GROUP BY src_ip
ORDER BY 3 DESC
LIMIT %(limit)s
"""
result = db.query(sql, {"limit": limit})
points = []
for row in result.result_rows:
fuzzing = float(row[2] or 0)
velocity = float(row[3] or 0)
fake_nav = int(row[5] or 0)
ua_mm = int(row[6] or 0)
points.append({
"ip": str(row[0]),
"ja4": str(row[1]),
"fuzzing_index":fuzzing,
"hit_velocity": velocity,
"hits": int(row[4] or 0),
"attack_type": _attack_type(fuzzing, velocity, fake_nav, ua_mm),
})
return {"points": points}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))