Files
dashboard/backend/routes/rotation.py
SOC Analyst d4c3512572 feat: 6 améliorations SOC — synthèse IP, baseline, sophistication, chasse proactive, badge ASN, 2 nouveaux onglets rotation
- investigation_summary.py: nouveau endpoint GET /api/investigation/{ip}/summary
  agrège 6 sources (ML, bruteforce, TCP spoofing, JA4 rotation, persistance, timeline 24h)
  en un score de risque 0-100 avec signaux détaillés
- InvestigationView.tsx: widget IPActivitySummary avec jauge Risk Score SVG,
  badges multi-sources et mini-timeline 24h barres
- metrics.py: endpoint GET /api/metrics/baseline — comparaison 24h vs hier
  (total détections, IPs uniques, alertes CRITICAL) avec % de variation
- IncidentsView.tsx: widget baseline avec ▲▼ sur le dashboard principal
- rotation.py: endpoints /sophistication et /proactive-hunt
  Score sophistication = JOIN 3 tables (rotation×10 + récurrence×20 + log(bf+1)×5)
  Chasse proactive = IPs récurrentes sous le seuil ML (abs(score) < 0.5)
- RotationView.tsx: onglets 🏆 Sophistication et 🕵️ Chasse proactive
  avec tier APT-like/Advanced/Automated/Basic et boutons investigation
- detections.py: LEFT JOIN asn_reputation, badge coloré rouge/orange/vert
  selon label (bot/scanner → score 0.05, human → 0.9)
- models.py: ajout champs asn_score et asn_rep_label dans Detection

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-16 00:43:27 +01:00

214 lines
7.7 KiB
Python

"""
Endpoints pour la détection de la rotation de fingerprints JA4 et des menaces persistantes
"""
import math
from fastapi import APIRouter, HTTPException, Query
from ..database import db
router = APIRouter(prefix="/api/rotation", tags=["rotation"])
@router.get("/ja4-rotators")
async def get_ja4_rotators(limit: int = Query(50, ge=1, le=500)):
    """Return the IPs that rotate through the most distinct JA4 fingerprints.

    Rows are read from ``mabase_prod.view_host_ip_ja4_rotation``, ordered by
    distinct JA4 count (descending) and truncated to ``limit`` rows. Each item
    carries an ``evasion_score`` worth 15 points per distinct fingerprint,
    capped at 100.

    Raises:
        HTTPException: status 500, with the exception text as detail, if the
            query or the row conversion fails.
    """
    query = """
SELECT
replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS ip,
distinct_ja4_count,
total_hits
FROM mabase_prod.view_host_ip_ja4_rotation
ORDER BY distinct_ja4_count DESC
LIMIT %(limit)s
"""
    try:
        fetched = db.query(query, {"limit": limit})
        rotators = [
            {
                "ip": str(src_ip),
                "distinct_ja4_count": int(ja4_count),
                "total_hits": int(hits),
                # 15 points per distinct fingerprint, saturating at 100.
                "evasion_score": min(100, int(ja4_count) * 15),
            }
            for src_ip, ja4_count, hits in fetched.result_rows
        ]
        return {"items": rotators, "total": len(rotators)}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@router.get("/persistent-threats")
async def get_persistent_threats(limit: int = Query(100, ge=1, le=1000)):
    """Return persistent threats ordered by their persistence score.

    Rows are read from ``mabase_prod.view_ip_recurrence``. The SQL orders by
    ``least(100, recurrence * 20 + worst_score * 50)`` and the same expression
    is recomputed in Python as ``persistence_score`` for each returned item.

    Raises:
        HTTPException: status 500, with the exception text as detail, if the
            query or the row conversion fails.
    """
    query = """
SELECT
replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS ip,
recurrence,
worst_score,
worst_threat_level,
first_seen,
last_seen
FROM mabase_prod.view_ip_recurrence
ORDER BY (least(100, recurrence * 20 + worst_score * 50)) DESC
LIMIT %(limit)s
"""

    def to_item(row):
        """Convert one result row into the JSON-ready dictionary."""
        src_ip, seen_count, score, level, first_seen, last_seen = row
        recurrence = int(seen_count)
        # A NULL (or zero) score is treated as 0.0.
        worst_score = float(score or 0)
        return {
            "ip": str(src_ip),
            "recurrence": recurrence,
            "worst_score": worst_score,
            "worst_threat_level": str(level or ""),
            "first_seen": str(first_seen),
            "last_seen": str(last_seen),
            # Mirrors the ORDER BY expression of the query above.
            "persistence_score": min(100, recurrence * 20 + worst_score * 50),
        }

    try:
        fetched = db.query(query, {"limit": limit})
        threats = [to_item(row) for row in fetched.result_rows]
        return {"items": threats, "total": len(threats)}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@router.get("/ip/{ip}/ja4-history")
async def get_ip_ja4_history(ip: str):
    """Return the JA4 fingerprints used by a given IP.

    Aggregates ``mabase_prod.agg_host_ip_ja4_1h`` per fingerprint for the
    requested address (compared after stripping a leading ``::ffff:``), giving
    the summed hits and the earliest/latest ``window_start``, ordered by hits
    descending.

    Raises:
        HTTPException: status 500, with the exception text as detail, if the
            query or the row conversion fails.
    """
    query = """
SELECT
ja4,
sum(hits) AS hits,
min(window_start) AS first_seen,
max(window_start) AS last_seen
FROM mabase_prod.agg_host_ip_ja4_1h
WHERE replaceRegexpAll(toString(src_ip), '^::ffff:', '') = %(ip)s
GROUP BY ja4
ORDER BY hits DESC
"""
    try:
        fetched = db.query(query, {"ip": ip})
        history = []
        for fingerprint, hit_count, first_seen, last_seen in fetched.result_rows:
            history.append(
                {
                    "ja4": str(fingerprint),
                    "hits": int(hit_count),
                    "first_seen": str(first_seen),
                    "last_seen": str(last_seen),
                }
            )
        return {"ip": ip, "ja4_history": history, "total": len(history)}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@router.get("/sophistication")
async def get_sophistication(limit: int = Query(50, ge=1, le=500)):
    """Return a per-IP adversary sophistication score.

    The score combines three signals, each read with its own query:
    JA4 rotation (10 points per distinct fingerprint), recurrence (20 points
    per occurrence) and brute-force volume (``log(hits + 1) * 5``, capped at
    30). The total is capped at 100 and mapped to a tier: above 80
    "APT-like", above 50 "Advanced", above 20 "Automated", otherwise "Basic".

    Only IPs present in the rotation view are scored; the other two signals
    default to 0 when the IP is absent from their view. The ``limit`` is
    applied after sorting by the rounded score, highest first.

    Raises:
        HTTPException: status 500, with the exception text as detail, if any
            query or the scoring fails.
    """
    # Checked in order; the first floor that the score strictly exceeds wins.
    tier_floors = ((80, "APT-like"), (50, "Advanced"), (20, "Automated"))
    try:
        # The views are queried separately and merged in Python rather than
        # joined in SQL.
        rotation_rows = db.query("""
SELECT
replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS ip,
distinct_ja4_count
FROM mabase_prod.view_host_ip_ja4_rotation
""").result_rows
        ja4_by_ip = {str(addr): int(count) for addr, count in rotation_rows}

        recurrence_rows = db.query("""
SELECT
replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS ip,
recurrence
FROM mabase_prod.view_ip_recurrence
""").result_rows
        recurrence_by_ip = {str(addr): int(count) for addr, count in recurrence_rows}

        bruteforce_rows = db.query("""
SELECT
replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS ip,
sum(hits) AS total_hits
FROM mabase_prod.view_form_bruteforce_detected
GROUP BY ip
""").result_rows
        bruteforce_by_ip = {str(addr): int(count) for addr, count in bruteforce_rows}

        # The rotation view drives the result set.
        ranked = []
        for ip, ja4_count in ja4_by_ip.items():
            recurrence = recurrence_by_ip.get(ip, 0)
            bf_hits = bruteforce_by_ip.get(ip, 0)
            # Logarithmic so that raw brute-force volume cannot dominate.
            bf_points = min(30.0, math.log(bf_hits + 1) * 5)
            score = min(100.0, ja4_count * 10 + recurrence * 20 + bf_points)
            tier = next(
                (label for floor, label in tier_floors if score > floor),
                "Basic",
            )
            ranked.append(
                {
                    "ip": ip,
                    "ja4_rotation_count": ja4_count,
                    "recurrence": recurrence,
                    "bruteforce_hits": bf_hits,
                    "sophistication_score": round(score, 1),
                    "tier": tier,
                }
            )

        top = sorted(
            ranked,
            key=lambda entry: entry["sophistication_score"],
            reverse=True,
        )[:limit]
        return {"items": top, "total": len(top)}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@router.get("/proactive-hunt")
async def get_proactive_hunt(
    min_recurrence: int = Query(2, ge=1, description="Récurrence minimale"),
    min_days: int = Query(2, ge=0, description="Jours d'activité minimum"),
    limit: int = Query(50, ge=1, le=500),
):
    """Return recurring IPs that stay below the normal detection threshold.

    Rows are read from ``mabase_prod.view_ip_recurrence`` and kept when
    ``recurrence >= min_recurrence``, ``abs(worst_score) < 0.5`` and the span
    between ``first_seen`` and ``last_seen`` is at least ``min_days`` days.
    Results are ordered by recurrence (descending) then worst score
    (ascending) and truncated to ``limit``.

    Each item gets a ``risk_assessment`` label: "Évadeur potentiel" when
    ``recurrence / (abs(worst_score) + 0.1)`` exceeds 10, otherwise
    "Persistant modéré".

    Args:
        min_recurrence: Minimum recurrence for an IP to be reported.
        min_days: Minimum number of days between first and last sighting.
        limit: Maximum number of items returned.

    Raises:
        HTTPException: status 500, with the exception text as detail, if the
            query or the row conversion fails.
    """
    sql = """
SELECT
replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS ip,
recurrence,
worst_score,
worst_threat_level,
first_seen,
last_seen,
dateDiff('day', first_seen, last_seen) AS days_active
FROM mabase_prod.view_ip_recurrence
WHERE recurrence >= %(min_recurrence)s
AND abs(worst_score) < 0.5
AND dateDiff('day', first_seen, last_seen) >= %(min_days)s
ORDER BY recurrence DESC, worst_score ASC
LIMIT %(limit)s
"""
    try:
        result = db.query(
            sql,
            {
                "min_recurrence": min_recurrence,
                "min_days": min_days,
                "limit": limit,
            },
        )
        items = []
        for row in result.result_rows:
            recurrence = int(row[1])
            worst_score = float(row[2] or 0)
            days_active = int(row[6] or 0)
            # The SQL filter works on abs(worst_score), so negative scores
            # reach this point. Using the magnitude keeps the denominator at
            # 0.1 or more: no ZeroDivisionError at worst_score == -0.1 and no
            # negative ratio for scores below -0.1.
            ratio = recurrence / (abs(worst_score) + 0.1)
            risk = "Évadeur potentiel" if ratio > 10 else "Persistant modéré"
            items.append(
                {
                    "ip": str(row[0]),
                    "recurrence": recurrence,
                    "worst_score": round(worst_score, 4),
                    "worst_threat_level": str(row[3] or ""),
                    "first_seen": str(row[4]),
                    "last_seen": str(row[5]),
                    "days_active": days_active,
                    "risk_assessment": risk,
                }
            )
        return {"items": items, "total": len(items)}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc