Files
ja4-platform/services/dashboard/backend/routes/rotation.py
toto 2bfb4b7282 perf(dashboard): P2 — remplacer replaceRegexpAll dans les WHERE par IPv4MappedToIPv6
Problème : 8 clauses WHERE appliquaient une fonction sur la colonne src_ip :
  WHERE replaceRegexpAll(toString(src_ip), '^::ffff:', '') = %(ip)s
→ ClickHouse ne peut pas utiliser l'index de tri ou les skipping indexes
  quand une fonction est appliquée à la colonne filtrée.

Fix : transformer l'INPUT (le paramètre) plutôt que la colonne :
  WHERE src_ip = IPv4MappedToIPv6(toIPv4(%(ip)s))
→ src_ip reste intact → ClickHouse utilise les indexes (P1) et la
  projection proj_by_ip (P1) pour ces requêtes.

Fichiers modifiés :
  investigation_summary.py — 6 WHERE (ml_detected_anomalies, agg_host_ip_ja4_1h,
                              view_form_bruteforce_detected, view_host_ip_ja4_rotation,
                              view_ip_recurrence)
  ml_features.py           — 1 WHERE (view_ai_features_1h)
  rotation.py              — 1 WHERE (agg_host_ip_ja4_1h)

Note : les 27 autres occurrences de replaceRegexpAll dans les SELECT sont des
transformations d'affichage (IPv6→IPv4 pour l'UI) et ne bloquent pas les indexes.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-07 22:31:57 +02:00

219 lines
7.9 KiB
Python

"""
Endpoints pour la détection de la rotation de fingerprints JA4 et des menaces persistantes
"""
from fastapi import APIRouter, HTTPException, Query
from ..database import db
from ..config import settings
router = APIRouter(prefix="/api/rotation", tags=["rotation"])
@router.get("/ja4-rotators")
async def get_ja4_rotators(limit: int = Query(50, ge=1, le=500)):
"""IPs qui effectuent le plus de rotation de fingerprints JA4."""
try:
sql = f"""
SELECT
replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS ip,
distinct_ja4_count,
total_hits
FROM {settings.CLICKHOUSE_DB_PROCESSING}.view_host_ip_ja4_rotation
ORDER BY distinct_ja4_count DESC
LIMIT %(limit)s
"""
result = db.query(sql, {"limit": limit})
items = []
for row in result.result_rows:
distinct = int(row[1])
items.append({
"ip": str(row[0]),
"distinct_ja4_count":distinct,
"total_hits": int(row[2]),
"evasion_score": min(100, distinct * 15),
})
return {"items": items, "total": len(items)}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/persistent-threats")
async def get_persistent_threats(limit: int = Query(100, ge=1, le=1000)):
"""Menaces persistantes triées par score de persistance."""
try:
sql = f"""
SELECT
replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS ip,
recurrence,
worst_score,
worst_threat_level,
first_seen,
last_seen
FROM {settings.CLICKHOUSE_DB_PROCESSING}.view_ip_recurrence
ORDER BY (least(100, recurrence * 20 + worst_score * 50)) DESC
LIMIT %(limit)s
"""
result = db.query(sql, {"limit": limit})
items = []
for row in result.result_rows:
recurrence = int(row[1])
worst_score = float(row[2] or 0)
items.append({
"ip": str(row[0]),
"recurrence": recurrence,
"worst_score": worst_score,
"worst_threat_level":str(row[3] or ""),
"first_seen": str(row[4]),
"last_seen": str(row[5]),
"persistence_score": min(100, recurrence * 20 + worst_score * 50),
})
return {"items": items, "total": len(items)}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/ip/{ip}/ja4-history")
async def get_ip_ja4_history(ip: str):
"""Historique des JA4 utilisés par une IP donnée."""
try:
sql = f"""
SELECT
ja4,
sum(hits) AS hits,
min(window_start) AS first_seen,
max(window_start) AS last_seen
FROM {settings.CLICKHOUSE_DB_PROCESSING}.agg_host_ip_ja4_1h
WHERE src_ip = IPv4MappedToIPv6(toIPv4(%(ip)s))
GROUP BY ja4
ORDER BY hits DESC
"""
result = db.query(sql, {"ip": ip})
items = [
{
"ja4": str(row[0]),
"hits": int(row[1]),
"first_seen":str(row[2]),
"last_seen": str(row[3]),
}
for row in result.result_rows
]
return {"ip": ip, "ja4_history": items, "total": len(items)}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/sophistication")
async def get_sophistication(limit: int = Query(50, ge=1, le=500)):
"""Score de sophistication adversaire par IP (rotation JA4 + récurrence + bruteforce).
Single SQL JOIN query — aucun traitement Python sur 34K entrées.
"""
try:
sql = f"""
SELECT
r.ip,
r.distinct_ja4_count,
coalesce(rec.recurrence, 0) AS recurrence,
coalesce(bf.bruteforce_hits, 0) AS bruteforce_hits,
round(least(100.0,
r.distinct_ja4_count * 10
+ coalesce(rec.recurrence, 0) * 20
+ least(30.0, log(coalesce(bf.bruteforce_hits, 0) + 1) * 5)
), 1) AS sophistication_score
FROM (
SELECT
replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS ip,
distinct_ja4_count
FROM {settings.CLICKHOUSE_DB_PROCESSING}.view_host_ip_ja4_rotation
) r
LEFT JOIN (
SELECT
replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS ip,
count() AS recurrence
FROM {settings.CLICKHOUSE_DB_PROCESSING}.ml_detected_anomalies FINAL
GROUP BY ip
) rec ON r.ip = rec.ip
LEFT JOIN (
SELECT
replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS ip,
sum(hits) AS bruteforce_hits
FROM {settings.CLICKHOUSE_DB_PROCESSING}.view_form_bruteforce_detected
GROUP BY ip
) bf ON r.ip = bf.ip
ORDER BY sophistication_score DESC
LIMIT %(limit)s
"""
result = db.query(sql, {"limit": limit})
items = []
for row in result.result_rows:
score = float(row[4] or 0)
if score > 80:
tier = "APT-like"
elif score > 50:
tier = "Advanced"
elif score > 20:
tier = "Automated"
else:
tier = "Basic"
items.append({
"ip": str(row[0]),
"ja4_rotation_count": int(row[1] or 0),
"recurrence": int(row[2] or 0),
"bruteforce_hits": int(row[3] or 0),
"sophistication_score":score,
"tier": tier,
})
return {"items": items, "total": len(items)}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/proactive-hunt")
async def get_proactive_hunt(
min_recurrence: int = Query(2, ge=1, description="Récurrence minimale"),
min_days: int = Query(2, ge=0, description="Jours d'activité minimum"),
limit: int = Query(50, ge=1, le=500),
):
"""IPs volant sous le radar : récurrentes mais sous le seuil de détection normal."""
try:
sql = f"""
SELECT
replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS ip,
recurrence,
worst_score,
worst_threat_level,
first_seen,
last_seen,
dateDiff('day', first_seen, last_seen) AS days_active
FROM {settings.CLICKHOUSE_DB_PROCESSING}.view_ip_recurrence
WHERE recurrence >= %(min_recurrence)s
AND abs(worst_score) < 0.5
AND dateDiff('day', first_seen, last_seen) >= %(min_days)s
ORDER BY recurrence DESC, worst_score ASC
LIMIT %(limit)s
"""
result = db.query(sql, {
"min_recurrence": min_recurrence,
"min_days": min_days,
"limit": limit,
})
items = []
for row in result.result_rows:
recurrence = int(row[1])
worst_score = float(row[2] or 0)
days_active = int(row[6] or 0)
ratio = recurrence / (worst_score + 0.1)
risk = "Évadeur potentiel" if ratio > 10 else "Persistant modéré"
items.append({
"ip": str(row[0]),
"recurrence": recurrence,
"worst_score": round(worst_score, 4),
"worst_threat_level": str(row[3] or ""),
"first_seen": str(row[4]),
"last_seen": str(row[5]),
"days_active": days_active,
"risk_assessment": risk,
})
return {"items": items, "total": len(items)}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))