view_ip_recurrence :
Ajout de WHERE detected_at >= now() - INTERVAL 30 DAY
→ Avec PARTITION BY (P1), ClickHouse élague les partitions hors de cette
plage avant même de lire les données. La vue ne scanne que les partitions
actives (au lieu des 30 partitions journalières complètes).
→ ORDER BY (src_ip) garantit que le GROUP BY src_ip lit des données
contiguës (aucune réorganisation mémoire).
rotation.py — supprimer FINAL sur ml_detected_anomalies :
FINAL force une déduplication complète du ReplacingMergeTree en mémoire
(équivalent à un DISTINCT sur toute la table) — une des opérations les plus
coûteuses dans ClickHouse.
Fix : remplacer le sous-SELECT FINAL par view_ip_recurrence (déjà agrégée
par src_ip, retourne recurrence directement sans FINAL).
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
220 lines
8.1 KiB
Python
220 lines
8.1 KiB
Python
"""
|
|
Endpoints pour la détection de la rotation de fingerprints JA4 et des menaces persistantes
|
|
"""
|
|
from fastapi import APIRouter, HTTPException, Query
|
|
|
|
from ..database import db
|
|
from ..config import settings
|
|
|
|
# Router for this module's endpoints: paths declared on it are relative to
# the "/api/rotation" prefix and carry the "rotation" tag.
router = APIRouter(prefix="/api/rotation", tags=["rotation"])
|
|
|
|
|
|
@router.get("/ja4-rotators")
async def get_ja4_rotators(limit: int = Query(50, ge=1, le=500)):
    """Return the IPs that rotate JA4 fingerprints the most.

    Rows are read from ``view_host_ip_ja4_rotation``, ordered by
    ``distinct_ja4_count`` descending and capped at ``limit``. Each item also
    carries an ``evasion_score``: 15 points per distinct JA4, capped at 100.

    Args:
        limit: Maximum number of rows to return (1 to 500, default 50).

    Returns:
        A dict with ``items`` (list of per-IP dicts) and ``total`` (its length).

    Raises:
        HTTPException: Status 500 carrying the error text if anything fails.
    """

    def _as_item(record):
        # Column order follows the SELECT: ip, distinct_ja4_count, total_hits.
        ja4_count = int(record[1])
        return {
            "ip": str(record[0]),
            "distinct_ja4_count": ja4_count,
            "total_hits": int(record[2]),
            "evasion_score": min(100, ja4_count * 15),
        }

    try:
        query = f"""
            SELECT
                replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS ip,
                distinct_ja4_count,
                total_hits
            FROM {settings.CLICKHOUSE_DB_PROCESSING}.view_host_ip_ja4_rotation
            ORDER BY distinct_ja4_count DESC
            LIMIT %(limit)s
        """
        rows = db.query(query, {"limit": limit}).result_rows
        items = [_as_item(record) for record in rows]
        return {"items": items, "total": len(items)}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
|
|
|
|
|
|
@router.get("/persistent-threats")
async def get_persistent_threats(limit: int = Query(100, ge=1, le=1000)):
    """Return persistent threats sorted by persistence score.

    Rows are read from ``view_ip_recurrence`` and ordered in SQL by
    ``least(100, recurrence * 20 + worst_score * 50)`` descending. The same
    formula is recomputed in Python and returned as ``persistence_score``.

    Args:
        limit: Maximum number of rows to return (1 to 1000, default 100).

    Returns:
        A dict with ``items`` (list of per-IP dicts) and ``total`` (its length).

    Raises:
        HTTPException: Status 500 carrying the error text if anything fails.
    """

    def _as_item(record):
        # Column order follows the SELECT: ip, recurrence, worst_score,
        # worst_threat_level, first_seen, last_seen.
        seen_count = int(record[1])
        peak_score = float(record[2] or 0)
        return {
            "ip": str(record[0]),
            "recurrence": seen_count,
            "worst_score": peak_score,
            "worst_threat_level": str(record[3] or ""),
            "first_seen": str(record[4]),
            "last_seen": str(record[5]),
            "persistence_score": min(100, seen_count * 20 + peak_score * 50),
        }

    try:
        query = f"""
            SELECT
                replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS ip,
                recurrence,
                worst_score,
                worst_threat_level,
                first_seen,
                last_seen
            FROM {settings.CLICKHOUSE_DB_PROCESSING}.view_ip_recurrence
            ORDER BY (least(100, recurrence * 20 + worst_score * 50)) DESC
            LIMIT %(limit)s
        """
        rows = db.query(query, {"limit": limit}).result_rows
        items = [_as_item(record) for record in rows]
        return {"items": items, "total": len(items)}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
|
|
|
|
|
|
@router.get("/ip/{ip}/ja4-history")
async def get_ip_ja4_history(ip: str):
    """Return the history of JA4 fingerprints used by a given IP.

    Hits from ``agg_host_ip_ja4_1h`` are summed per JA4, together with the
    earliest and latest ``window_start`` seen, ordered by hits descending.

    Args:
        ip: IPv4 or IPv6 address taken from the URL path.

    Returns:
        A dict with ``ip`` (echoed as received), ``ja4_history`` (list of
        per-JA4 dicts) and ``total`` (its length).

    Raises:
        HTTPException: Status 400 when ``ip`` is not a valid IP address;
            status 500 carrying the error text if the query fails.
    """
    import ipaddress  # function-scope import keeps this block self-contained

    # Validate before the try block below: its broad handler would otherwise
    # convert this 400 into a 500. Without this check a malformed address only
    # fails inside ClickHouse and surfaces as a 500 with raw DB error text.
    try:
        address = ipaddress.ip_address(ip)
    except ValueError:
        raise HTTPException(
            status_code=400, detail=f"Invalid IP address: {ip}"
        ) from None

    # src_ip is compared against an IPv6 value. IPv4 input keeps the original
    # mapping expression; IPv6 input is converted directly. Only one of these
    # two constant fragments is interpolated; the address itself remains a
    # bound query parameter.
    if address.version == 4:
        ip_expr = "IPv4MappedToIPv6(toIPv4(%(ip)s))"
    else:
        ip_expr = "toIPv6(%(ip)s)"

    try:
        sql = f"""
            SELECT
                ja4,
                sum(hits) AS hits,
                min(window_start) AS first_seen,
                max(window_start) AS last_seen
            FROM {settings.CLICKHOUSE_DB_PROCESSING}.agg_host_ip_ja4_1h
            WHERE src_ip = {ip_expr}
            GROUP BY ja4
            ORDER BY hits DESC
        """
        result = db.query(sql, {"ip": ip})
        items = [
            {
                "ja4": str(row[0]),
                "hits": int(row[1]),
                "first_seen": str(row[2]),
                "last_seen": str(row[3]),
            }
            for row in result.result_rows
        ]
        return {"ip": ip, "ja4_history": items, "total": len(items)}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
|
|
|
|
|
|
@router.get("/sophistication")
async def get_sophistication(limit: int = Query(50, ge=1, le=500)):
    """Return an adversary sophistication score per IP.

    The score combines JA4 rotation, recurrence and brute-force activity and
    is computed entirely in one SQL query with two LEFT JOINs, so Python does
    no per-row scoring over the full data set (the original note mentions 34K
    entries). In SQL the score is ``distinct_ja4_count * 10`` plus
    ``recurrence * 20`` plus ``least(30, log(bruteforce_hits + 1) * 5)``,
    capped at 100 and rounded to one decimal.

    Python only maps the score to a tier: above 80 "APT-like", above 50
    "Advanced", above 20 "Automated", otherwise "Basic".

    Args:
        limit: Maximum number of rows to return (1 to 500, default 50).

    Returns:
        A dict with ``items`` (list of per-IP dicts) and ``total`` (its length).

    Raises:
        HTTPException: Status 500 carrying the error text if anything fails.
    """
    # (exclusive lower bound, label), checked from the highest tier down.
    tier_floors = ((80, "APT-like"), (50, "Advanced"), (20, "Automated"))

    def _as_item(record):
        rating = float(record[4] or 0)
        label = next(
            (name for floor, name in tier_floors if rating > floor),
            "Basic",
        )
        return {
            "ip": str(record[0]),
            "ja4_rotation_count": int(record[1] or 0),
            "recurrence": int(record[2] or 0),
            "bruteforce_hits": int(record[3] or 0),
            "sophistication_score": rating,
            "tier": label,
        }

    try:
        processing_db = settings.CLICKHOUSE_DB_PROCESSING
        query = f"""
            SELECT
                r.ip,
                r.distinct_ja4_count,
                coalesce(rec.recurrence, 0) AS recurrence,
                coalesce(bf.bruteforce_hits, 0) AS bruteforce_hits,
                round(least(100.0,
                    r.distinct_ja4_count * 10
                    + coalesce(rec.recurrence, 0) * 20
                    + least(30.0, log(coalesce(bf.bruteforce_hits, 0) + 1) * 5)
                ), 1) AS sophistication_score
            FROM (
                SELECT
                    replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS ip,
                    distinct_ja4_count
                FROM {processing_db}.view_host_ip_ja4_rotation
            ) r
            LEFT JOIN (
                -- Utilise view_ip_recurrence (pré-agrégée) au lieu de ml_detected_anomalies FINAL
                -- FINAL force une déduplication complète du ReplacingMergeTree — très coûteux
                SELECT
                    replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS ip,
                    recurrence
                FROM {processing_db}.view_ip_recurrence
            ) rec ON r.ip = rec.ip
            LEFT JOIN (
                SELECT
                    replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS ip,
                    sum(hits) AS bruteforce_hits
                FROM {processing_db}.view_form_bruteforce_detected
                GROUP BY ip
            ) bf ON r.ip = bf.ip
            ORDER BY sophistication_score DESC
            LIMIT %(limit)s
        """
        rows = db.query(query, {"limit": limit}).result_rows
        items = [_as_item(record) for record in rows]
        return {"items": items, "total": len(items)}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
|
|
|
|
|
|
@router.get("/proactive-hunt")
async def get_proactive_hunt(
    min_recurrence: int = Query(2, ge=1, description="Récurrence minimale"),
    min_days: int = Query(2, ge=0, description="Jours d'activité minimum"),
    limit: int = Query(50, ge=1, le=500),
):
    """Return IPs flying under the radar: recurrent but with a low score.

    Rows come from ``view_ip_recurrence`` where ``recurrence`` is at least
    ``min_recurrence``, ``abs(worst_score)`` is below 0.5, and the span between
    ``first_seen`` and ``last_seen`` is at least ``min_days`` days. Results
    are ordered by recurrence descending, then worst_score ascending.

    Each item gets a ``risk_assessment`` from the ratio of recurrence to score
    magnitude: above 10 is "Évadeur potentiel", otherwise "Persistant modéré".

    Args:
        min_recurrence: Minimum recurrence for an IP to be returned.
        min_days: Minimum number of days between first and last sighting.
        limit: Maximum number of rows to return (1 to 500, default 50).

    Returns:
        A dict with ``items`` (list of per-IP dicts) and ``total`` (its length).

    Raises:
        HTTPException: Status 500 carrying the error text if anything fails.
    """
    try:
        sql = f"""
            SELECT
                replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS ip,
                recurrence,
                worst_score,
                worst_threat_level,
                first_seen,
                last_seen,
                dateDiff('day', first_seen, last_seen) AS days_active
            FROM {settings.CLICKHOUSE_DB_PROCESSING}.view_ip_recurrence
            WHERE recurrence >= %(min_recurrence)s
              AND abs(worst_score) < 0.5
              AND dateDiff('day', first_seen, last_seen) >= %(min_days)s
            ORDER BY recurrence DESC, worst_score ASC
            LIMIT %(limit)s
        """
        result = db.query(sql, {
            "min_recurrence": min_recurrence,
            "min_days": min_days,
            "limit": limit,
        })
        items = []
        for row in result.result_rows:
            recurrence = int(row[1])
            worst_score = float(row[2] or 0)
            days_active = int(row[6] or 0)
            # The SQL filter uses abs(worst_score), so negative scores reach
            # this point. Use the magnitude here too: the signed value makes
            # the denominator zero at -0.1 (ZeroDivisionError, failing the
            # whole response) and flips the ratio's sign below -0.1.
            ratio = recurrence / (abs(worst_score) + 0.1)
            risk = "Évadeur potentiel" if ratio > 10 else "Persistant modéré"
            items.append({
                "ip": str(row[0]),
                "recurrence": recurrence,
                "worst_score": round(worst_score, 4),
                "worst_threat_level": str(row[3] or ""),
                "first_seen": str(row[4]),
                "last_seen": str(row[5]),
                "days_active": days_active,
                "risk_assessment": risk,
            })
        return {"items": items, "total": len(items)}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
|