Files
dashboard/backend/routes/fingerprints.py
SOC Analyst 533072a157 fix: suppression de tous les LIMIT hardcodés dans les requêtes SQL
Supprime les LIMIT arbitraires qui tronquaient silencieusement les résultats:

- analysis.py   : LIMIT 5, 10, 100, 500 (pays ASN, top pays, UAs)
- variability.py: LIMIT 10, 20 (JA4s, pays, ASNs, hosts, UAs)
- fingerprints.py: LIMIT 10, 20, 100 (IPs, UAs, JA4 spoofing)
- entities.py   : LIMIT 100 (IPs associées)
- tcp_spoofing.py: LIMIT 10, 12, 15 (distributions TTL/MSS/window)
- heatmap.py    : LIMIT 15
- search.py     : LIMIT 5 (suggestions de recherche)

Conservés: LIMIT 1 (lookup d'un seul enregistrement) et
LIMIT %(limit)s / OFFSET (pagination contrôlée par le frontend).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-19 18:10:55 +01:00

829 lines
33 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Endpoints pour l'analyse des fingerprints JA4 et User-Agents
Objectifs:
- Détecter le spoofing JA4 (fingerprint TLS qui prétend être un navigateur mais
dont les User-Agents, les headers HTTP ou les métriques comportementales trahissent
une origine bot/script)
- Construire une matrice JA4 × User-Agent pour visualiser les associations suspectes
- Analyser la distribution des User-Agents pour identifier les rotateurs et les bots
qui usurpent des UA de navigateurs légitimes
"""
from fastapi import APIRouter, HTTPException, Query
from typing import Optional
import re
from ..database import db
router = APIRouter(prefix="/api/fingerprints", tags=["fingerprints"])
# ─── Helpers ──────────────────────────────────────────────────────────────────
# Patterns indiquant clairement un bot/script sans simulation de navigateur
_BOT_PATTERNS = re.compile(
r"bot|crawler|spider|scraper|python|curl|wget|go-http|java/|axios|"
r"libwww|httpclient|okhttp|requests|aiohttp|httpx|playwright|puppeteer|"
r"selenium|headless|phantomjs",
re.IGNORECASE,
)
# Navigateurs légitimes communs — un JA4 de type "browser" devrait venir avec ces UAs
_BROWSER_PATTERNS = re.compile(
r"mozilla|chrome|safari|firefox|edge|opera|trident",
re.IGNORECASE,
)
def _classify_ua(ua: str) -> str:
"""Retourne 'bot', 'browser', ou 'script'"""
if not ua:
return "empty"
if _BOT_PATTERNS.search(ua):
return "bot"
if _BROWSER_PATTERNS.search(ua):
return "browser"
return "script"
# =============================================================================
# ENDPOINT 1 — Détection de spoofing JA4
# =============================================================================
@router.get("/spoofing")
async def get_ja4_spoofing(
hours: int = Query(24, ge=1, le=168, description="Fenêtre temporelle"),
min_detections: int = Query(10, ge=1, description="Nombre minimum de détections"),
limit: int = Query(50, ge=1, le=200),
):
"""
Identifie les JA4 fingerprints suspects de spoofing navigateur.
Un JA4 est considéré suspect quand:
- Il présente un taux élevé de ua_ch_mismatch (header UA ≠ Client Hints)
- Son modern_browser_score est élevé mais les UAs associés sont des bots/scripts
- Il apparaît avec un taux élevé de sni_host_mismatch ou alpn_http_mismatch
- is_rare_ja4 = true avec un volume important
Retourne un score de confiance de spoofing [0-100] pour chaque JA4.
"""
try:
# Agrégation par JA4 avec tous les indicateurs de spoofing
query = """
SELECT
ja4,
count() AS total_detections,
uniq(src_ip) AS unique_ips,
-- Indicateurs de mismatch
countIf(ua_ch_mismatch = true) AS ua_ch_mismatch_count,
round(countIf(ua_ch_mismatch = true) * 100.0 / count(), 2) AS ua_ch_mismatch_pct,
countIf(sni_host_mismatch = true) AS sni_mismatch_count,
round(countIf(sni_host_mismatch = true) * 100.0 / count(), 2) AS sni_mismatch_pct,
countIf(alpn_http_mismatch = true) AS alpn_mismatch_count,
round(countIf(alpn_http_mismatch = true) * 100.0 / count(), 2) AS alpn_mismatch_pct,
-- Indicateurs comportementaux
avg(modern_browser_score) AS avg_browser_score,
countIf(is_rare_ja4 = true) AS rare_ja4_count,
round(countIf(is_rare_ja4 = true) * 100.0 / count(), 2) AS rare_ja4_pct,
countIf(is_ua_rotating = true) AS ua_rotating_count,
round(countIf(is_ua_rotating = true) * 100.0 / count(), 2) AS ua_rotating_pct,
-- Métriques TLS/TCP
countIf(is_alpn_missing = true) AS alpn_missing_count,
avg(distinct_ja4_count) AS avg_distinct_ja4_per_ip,
-- Répartition threat levels
countIf(threat_level = 'CRITICAL') AS critical_count,
countIf(threat_level = 'HIGH') AS high_count,
-- Botnet indicators
avg(ja4_asn_concentration) AS avg_asn_concentration,
avg(ja4_country_concentration) AS avg_country_concentration,
argMax(threat_level, detected_at) AS last_threat_level
FROM ml_detected_anomalies
WHERE detected_at >= now() - INTERVAL %(hours)s HOUR
AND ja4 != '' AND ja4 IS NOT NULL
GROUP BY ja4
HAVING total_detections >= %(min_detections)s
ORDER BY ua_ch_mismatch_pct DESC, total_detections DESC
LIMIT %(limit)s
"""
result = db.query(query, {
"hours": hours,
"min_detections": min_detections,
"limit": limit,
})
# Fetch top UA per JA4 from view_dashboard_user_agents
ja4_list = [str(r[0]) for r in result.result_rows if r[0]]
ua_by_ja4: dict = {}
if ja4_list:
ja4_sql = ", ".join(f"'{j}'" for j in ja4_list[:100])
ua_q = f"""
SELECT ja4, groupArray(5)(ua) AS top_uas
FROM (
SELECT ja4, arrayJoin(user_agents) AS ua, sum(requests) AS cnt
FROM view_dashboard_user_agents
WHERE ja4 IN ({ja4_sql})
AND hour >= now() - INTERVAL {hours} HOUR
AND ua != ''
GROUP BY ja4, ua
ORDER BY ja4, cnt DESC
)
GROUP BY ja4
"""
try:
ua_res = db.query(ua_q)
for ua_row in ua_res.result_rows:
j4 = str(ua_row[0])
if ua_row[1]:
ua_by_ja4[j4] = list(ua_row[1])
except Exception:
pass
items = []
for row in result.result_rows:
ja4 = str(row[0])
ua_ch_mismatch_pct = float(row[4] or 0)
sni_mismatch_pct = float(row[6] or 0)
alpn_mismatch_pct = float(row[8] or 0)
avg_browser_score = float(row[9] or 0)
rare_ja4_pct = float(row[11] or 0)
ua_rotating_pct = float(row[13] or 0)
alpn_missing_count = int(row[14] or 0)
total = int(row[1] or 1)
top_uas = ua_by_ja4.get(ja4, [])
ua_classes = [_classify_ua(u) for u in top_uas]
has_bot_ua = any(c == "bot" for c in ua_classes)
has_browser_ua = any(c == "browser" for c in ua_classes)
# Spoofing confidence score [0-100]:
# UA/CH mismatch est le signal le plus fort (poids 40)
# Browser UA avec score navigateur élevé mais indicateurs bot (poids 25)
# SNI/ALPN mismatches (poids 15)
# is_rare_ja4 avec gros volume (poids 10)
# UA rotating (poids 10)
spoof_score = min(100, round(
ua_ch_mismatch_pct * 0.40
+ (avg_browser_score * 25 / 100 if has_bot_ua else 0)
+ sni_mismatch_pct * 0.10
+ alpn_mismatch_pct * 0.05
+ rare_ja4_pct * 0.10
+ ua_rotating_pct * 0.10
+ (10 if alpn_missing_count > total * 0.3 else 0)
))
# Classification du JA4
if spoof_score >= 60:
classification = "spoofed_browser"
elif has_bot_ua and avg_browser_score < 30:
classification = "known_bot"
elif has_browser_ua and ua_ch_mismatch_pct < 10:
classification = "legitimate_browser"
else:
classification = "suspicious"
items.append({
"ja4": ja4,
"classification": classification,
"spoofing_score": spoof_score,
"total_detections": int(row[1] or 0),
"unique_ips": int(row[2] or 0),
"indicators": {
"ua_ch_mismatch_pct": ua_ch_mismatch_pct,
"sni_mismatch_pct": sni_mismatch_pct,
"alpn_mismatch_pct": alpn_mismatch_pct,
"avg_browser_score": round(avg_browser_score, 1),
"rare_ja4_pct": rare_ja4_pct,
"ua_rotating_pct": ua_rotating_pct,
"alpn_missing_count": alpn_missing_count,
"avg_asn_concentration": round(float(row[18] or 0), 3),
"avg_country_concentration": round(float(row[19] or 0), 3),
},
"top_user_agents": [
{"ua": u, "type": _classify_ua(u)} for u in top_uas
],
"threat_breakdown": {
"critical": int(row[16] or 0),
"high": int(row[17] or 0),
"last_level": str(row[20] or "LOW"),
},
})
# Trier: spoofed_browser d'abord, puis par score
items.sort(key=lambda x: (-x["spoofing_score"], -x["total_detections"]))
return {
"items": items,
"total": len(items),
"period_hours": hours,
"summary": {
"spoofed_browser": sum(1 for i in items if i["classification"] == "spoofed_browser"),
"known_bot": sum(1 for i in items if i["classification"] == "known_bot"),
"suspicious": sum(1 for i in items if i["classification"] == "suspicious"),
"legitimate_browser": sum(1 for i in items if i["classification"] == "legitimate_browser"),
},
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")
# =============================================================================
# ENDPOINT 2 — Matrice JA4 × User-Agent
# =============================================================================
@router.get("/ja4-ua-matrix")
async def get_ja4_ua_matrix(
hours: int = Query(24, ge=1, le=168),
min_ips: int = Query(3, ge=1, description="Nombre minimum d'IPs pour inclure un JA4"),
limit: int = Query(30, ge=1, le=100),
):
"""
Matrice JA4 × User-Agent.
Pour chaque JA4:
- Top User-Agents associés (depuis view_dashboard_entities)
- Taux de ua_ch_mismatch
- Classification UA (bot / browser / script)
- Indicateur de spoofing si browser_score élevé + UA non-navigateur
"""
try:
# Stats JA4 depuis ml_detected_anomalies
stats_query = """
SELECT
ja4,
uniq(src_ip) AS unique_ips,
count() AS total_detections,
round(countIf(ua_ch_mismatch = true) * 100.0 / count(), 2) AS ua_ch_mismatch_pct,
avg(modern_browser_score) AS avg_browser_score,
countIf(is_rare_ja4 = true) AS rare_count,
countIf(is_ua_rotating = true) AS rotating_count,
argMax(threat_level, detected_at) AS last_threat
FROM ml_detected_anomalies
WHERE detected_at >= now() - INTERVAL %(hours)s HOUR
AND ja4 != '' AND ja4 IS NOT NULL
GROUP BY ja4
HAVING unique_ips >= %(min_ips)s
ORDER BY ua_ch_mismatch_pct DESC, unique_ips DESC
LIMIT %(limit)s
"""
stats_res = db.query(stats_query, {"hours": hours, "min_ips": min_ips, "limit": limit})
ja4_list = [str(r[0]) for r in stats_res.result_rows]
if not ja4_list:
return {"items": [], "total": 0, "period_hours": hours}
# UAs par JA4 depuis view_dashboard_user_agents
ja4_sql = ", ".join(f"'{j}'" for j in ja4_list)
ua_query = f"""
SELECT
ja4,
ua,
sum(requests) AS cnt
FROM view_dashboard_user_agents
ARRAY JOIN user_agents AS ua
WHERE ja4 IN ({ja4_sql})
AND hour >= now() - INTERVAL {hours} HOUR
AND ua != ''
GROUP BY ja4, ua
ORDER BY ja4, cnt DESC
"""
ua_by_ja4: dict = {}
try:
ua_res = db.query(ua_query)
for row in ua_res.result_rows:
j4 = str(row[0])
if j4 not in ua_by_ja4:
ua_by_ja4[j4] = []
if len(ua_by_ja4[j4]) < 8:
ua_by_ja4[j4].append({"ua": str(row[1]), "count": int(row[2] or 0)})
except Exception:
pass
items = []
for row in stats_res.result_rows:
ja4 = str(row[0])
unique_ips = int(row[1] or 0)
ua_ch_mismatch_pct = float(row[3] or 0)
avg_browser_score = float(row[4] or 0)
top_uas = ua_by_ja4.get(ja4, [])
ua_total = sum(u["count"] for u in top_uas) or 1
classified_uas = []
for u in top_uas:
ua_type = _classify_ua(u["ua"])
classified_uas.append({
"ua": u["ua"],
"count": u["count"],
"pct": round(u["count"] * 100 / ua_total, 1),
"type": ua_type,
})
bot_pct = sum(u["pct"] for u in classified_uas if u["type"] == "bot")
browser_pct = sum(u["pct"] for u in classified_uas if u["type"] == "browser")
# Spoofing flag: JA4 ressemble à un navigateur (browser_score élevé)
# mais les UAs sont des bots/scripts
is_spoofing = avg_browser_score > 50 and bot_pct > 30 and ua_ch_mismatch_pct > 20
items.append({
"ja4": ja4,
"unique_ips": unique_ips,
"total_detections": int(row[2] or 0),
"ua_ch_mismatch_pct": ua_ch_mismatch_pct,
"avg_browser_score": round(avg_browser_score, 1),
"rare_count": int(row[5] or 0),
"rotating_count": int(row[6] or 0),
"last_threat": str(row[7] or "LOW"),
"user_agents": classified_uas,
"ua_summary": {
"bot_pct": round(bot_pct, 1),
"browser_pct": round(browser_pct, 1),
"script_pct": round(100 - bot_pct - browser_pct, 1),
"total_distinct": len(top_uas),
},
"is_spoofing_suspect": is_spoofing,
})
return {
"items": items,
"total": len(items),
"period_hours": hours,
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")
# =============================================================================
# ENDPOINT 3 — Analyse globale des User-Agents
# =============================================================================
@router.get("/ua-analysis")
async def get_ua_analysis(
hours: int = Query(24, ge=1, le=168),
limit: int = Query(50, ge=1, le=200),
):
"""
Analyse globale des User-Agents dans les détections.
Identifie:
- UAs de type bot/script
- UAs browser légitimes vs UAs browser utilisés par des bots (via ua_ch_mismatch)
- UAs rares/suspects qui tournent (is_ua_rotating)
- Distribution JA4 par UA pour détecter les UAs multi-fingerprints (rotation)
"""
try:
# Top UAs globaux depuis view_dashboard_user_agents
ua_global_query = """
SELECT
ua,
sum(requests) AS ip_count
FROM view_dashboard_user_agents
ARRAY JOIN user_agents AS ua
WHERE hour >= now() - INTERVAL %(hours)s HOUR
AND ua != ''
GROUP BY ua
ORDER BY ip_count DESC
LIMIT %(limit)s
"""
ua_global_res = db.query(ua_global_query, {"hours": hours, "limit": limit})
top_uas = [str(r[0]) for r in ua_global_res.result_rows]
# Pour chaque UA, chercher ses JA4 via view_dashboard_user_agents
ua_sql = ", ".join(f"'{u.replace(chr(39), chr(39)*2)}'" for u in top_uas[:50]) if top_uas else "''"
ja4_per_ua_query = f"""
SELECT
ua,
uniq(ja4) AS unique_ja4s,
groupUniqArray(3)(ja4) AS sample_ja4s
FROM view_dashboard_user_agents
ARRAY JOIN user_agents AS ua
WHERE ua IN ({ua_sql})
AND hour >= now() - INTERVAL {hours} HOUR
AND ua != ''
AND ja4 != ''
GROUP BY ua
"""
ja4_by_ua: dict = {}
try:
ja4_res = db.query(ja4_per_ua_query)
for r in ja4_res.result_rows:
ja4_by_ua[str(r[0])] = {
"unique_ja4s": int(r[1] or 0),
"sample_ja4s": list(r[2] or []),
}
except Exception:
pass
# IPs avec is_ua_rotating depuis ml_detected_anomalies
rotating_query = """
SELECT
replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS clean_ip,
avg(ua_ch_mismatch) AS avg_ua_ch_mismatch
FROM ml_detected_anomalies
WHERE detected_at >= now() - INTERVAL %(hours)s HOUR
AND is_ua_rotating = true
GROUP BY clean_ip
ORDER BY avg_ua_ch_mismatch DESC
"""
rotating_ips: list = []
try:
rot_res = db.query(rotating_query, {"hours": hours})
rotating_ips = [str(r[0]) for r in rot_res.result_rows]
except Exception:
pass
# Construire la réponse
items = []
for row in ua_global_res.result_rows:
ua = str(row[0])
ip_count = int(row[1] or 0)
ua_type = _classify_ua(ua)
ja4_info = ja4_by_ua.get(ua, {"unique_ja4s": 0, "sample_ja4s": []})
# UA multi-JA4 est suspect: un vrai navigateur a généralement 1-2 JA4
multi_ja4_flag = ja4_info["unique_ja4s"] > 3
items.append({
"user_agent": ua,
"type": ua_type,
"ip_count": ip_count,
"unique_ja4_count": ja4_info["unique_ja4s"],
"sample_ja4s": ja4_info["sample_ja4s"],
"is_multi_ja4_suspect": multi_ja4_flag,
"risk_flags": _build_ua_risk_flags(ua, ua_type, ja4_info["unique_ja4s"], ip_count),
})
# IPs avec rotation d'UA
ua_rotating_stats = {
"rotating_ip_count": len(rotating_ips),
"sample_rotating_ips": rotating_ips[:10],
}
return {
"items": items,
"total": len(items),
"period_hours": hours,
"ua_rotating_stats": ua_rotating_stats,
"summary": {
"bot_count": sum(1 for i in items if i["type"] == "bot"),
"browser_count": sum(1 for i in items if i["type"] == "browser"),
"script_count": sum(1 for i in items if i["type"] == "script"),
"multi_ja4_suspect_count": sum(1 for i in items if i["is_multi_ja4_suspect"]),
},
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")
def _build_ua_risk_flags(ua: str, ua_type: str, unique_ja4s: int, ip_count: int) -> list:
flags = []
if ua_type == "bot":
flags.append("ua_bot_signature")
elif ua_type == "script":
flags.append("ua_script_library")
if unique_ja4s > 5:
flags.append("ja4_rotation_suspect")
if unique_ja4s > 3 and ua_type == "browser":
flags.append("browser_ua_multi_fingerprint")
if ip_count > 100:
flags.append("high_volume")
return flags
# =============================================================================
# ENDPOINT 4 — JA4 d'un IP spécifique: analyse de cohérence UA/JA4
# =============================================================================
@router.get("/ip/{ip}/coherence")
async def get_ip_fingerprint_coherence(ip: str):
"""
Analyse la cohérence JA4/UA pour une IP spécifique.
Répond à la question: "Cette IP spoofait-elle son fingerprint?"
Calcule un score de cohérence basé sur:
- Correspondance entre JA4 (TLS client fingerprint) et User-Agent
- ua_ch_mismatch (User-Agent vs Client Hints)
- modern_browser_score vs type d'UA réel
- Nombre de JA4 distincts utilisés (rotation)
- sni_host_mismatch, alpn_http_mismatch
"""
try:
# Données depuis ml_detected_anomalies
ml_query = """
SELECT
ja4,
ua_ch_mismatch,
modern_browser_score,
sni_host_mismatch,
alpn_http_mismatch,
is_alpn_missing,
is_rare_ja4,
is_ua_rotating,
distinct_ja4_count,
header_count,
has_accept_language,
has_cookie,
has_referer,
header_order_shared_count,
detected_at,
threat_level,
window_mss_ratio,
tcp_jitter_variance,
multiplexing_efficiency
FROM ml_detected_anomalies
WHERE src_ip = %(ip)s
ORDER BY detected_at DESC
"""
ml_res = db.query(ml_query, {"ip": ip})
if not ml_res.result_rows:
raise HTTPException(status_code=404, detail="IP non trouvée dans les détections")
# User-agents réels depuis view_dashboard_user_agents
ua_query = """
SELECT ua, sum(requests) AS cnt
FROM view_dashboard_user_agents
ARRAY JOIN user_agents AS ua
WHERE toString(src_ip) = %(ip)s
AND hour >= now() - INTERVAL 72 HOUR
AND ua != ''
GROUP BY ua ORDER BY cnt DESC
"""
ua_res = db.query(ua_query, {"ip": ip})
top_uas = [{"ua": str(r[0]), "count": int(r[1] or 0), "type": _classify_ua(str(r[0]))}
for r in ua_res.result_rows]
# Agréger les indicateurs de la dernière session
rows = ml_res.result_rows
latest = rows[0]
total_rows = len(rows)
ua_ch_mismatch_count = sum(1 for r in rows if r[1])
sni_mismatch_count = sum(1 for r in rows if r[3])
alpn_mismatch_count = sum(1 for r in rows if r[4])
is_rare_count = sum(1 for r in rows if r[6])
is_rotating = any(r[7] for r in rows)
distinct_ja4s = {str(r[0]) for r in rows if r[0]}
avg_browser_score = sum(int(r[2] or 0) for r in rows) / total_rows
# UA analysis
has_browser_ua = any(u["type"] == "browser" for u in top_uas)
has_bot_ua = any(u["type"] == "bot" for u in top_uas)
primary_ua_type = top_uas[0]["type"] if top_uas else "empty"
# Calcul du score de spoofing
spoof_score = min(100, round(
(ua_ch_mismatch_count / total_rows * 100) * 0.40
+ (avg_browser_score * 0.20 if has_bot_ua else 0)
+ (sni_mismatch_count / total_rows * 100) * 0.10
+ (alpn_mismatch_count / total_rows * 100) * 0.05
+ (len(distinct_ja4s) * 5 if len(distinct_ja4s) > 2 else 0)
+ (15 if is_rotating else 0)
+ (10 if is_rare_count > total_rows * 0.5 else 0)
))
# Verdict
if spoof_score >= 70:
verdict = "high_confidence_spoofing"
elif spoof_score >= 40:
verdict = "suspicious_spoofing"
elif has_bot_ua and avg_browser_score < 20:
verdict = "known_bot_no_spoofing"
elif has_browser_ua and spoof_score < 20:
verdict = "legitimate_browser"
else:
verdict = "inconclusive"
# Explication humaine
explanation = []
if ua_ch_mismatch_count > total_rows * 0.3:
explanation.append(f"UA-Client-Hints mismatch sur {round(ua_ch_mismatch_count*100/total_rows)}% des requêtes")
if has_bot_ua and avg_browser_score > 40:
explanation.append(f"JA4 ressemble à un navigateur (score {round(avg_browser_score)}/100) mais UA est de type bot")
if len(distinct_ja4s) > 2:
explanation.append(f"{len(distinct_ja4s)} JA4 distincts utilisés → rotation de fingerprint")
if is_rotating:
explanation.append("is_ua_rotating détecté → rotation d'User-Agent confirmée")
if sni_mismatch_count > 0:
explanation.append(f"SNI ≠ Host header sur {sni_mismatch_count}/{total_rows} requêtes")
if not explanation:
explanation.append("Aucun indicateur de spoofing majeur détecté")
return {
"ip": ip,
"verdict": verdict,
"spoofing_score": spoof_score,
"explanation": explanation,
"indicators": {
"ua_ch_mismatch_rate": round(ua_ch_mismatch_count / total_rows * 100, 1),
"sni_mismatch_rate": round(sni_mismatch_count / total_rows * 100, 1),
"alpn_mismatch_rate": round(alpn_mismatch_count / total_rows * 100, 1),
"avg_browser_score": round(avg_browser_score, 1),
"distinct_ja4_count": len(distinct_ja4s),
"is_ua_rotating": is_rotating,
"rare_ja4_rate": round(is_rare_count / total_rows * 100, 1),
},
"fingerprints": {
"ja4_list": list(distinct_ja4s),
"latest_ja4": str(latest[0] or ""),
},
"user_agents": top_uas,
"latest_detection": {
"detected_at": latest[14].isoformat() if latest[14] else "",
"threat_level": str(latest[15] or "LOW"),
"modern_browser_score": int(latest[2] or 0),
"header_count": int(latest[9] or 0),
"has_accept_language": bool(latest[10]),
"has_cookie": bool(latest[11]),
"has_referer": bool(latest[12]),
"header_order_shared_count": int(latest[13] or 0),
},
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")
# =============================================================================
# ENDPOINT 5 — JA4 légitimes (baseline / whitelist)
# =============================================================================
@router.get("/legitimate-ja4")
async def get_legitimate_ja4(
hours: int = Query(168, ge=24, le=720, description="Fenêtre pour établir la baseline"),
min_ips: int = Query(50, ge=5, description="Nombre minimum d'IPs pour qualifier un JA4 de légitime"),
):
"""
Établit une baseline des JA4 fingerprints légitimes.
Un JA4 est considéré légitime si:
- Il est utilisé par un grand nombre d'IPs distinctes (> min_ips)
- Son taux de ua_ch_mismatch est faible (< 5%)
- Son modern_browser_score est élevé (> 60)
- Il n'est PAS is_rare_ja4
- Ses UAs sont dominés par des navigateurs connus
Utile comme whitelist pour réduire les faux positifs.
"""
try:
query = """
SELECT
ja4,
uniq(src_ip) AS unique_ips,
count() AS total_detections,
round(countIf(ua_ch_mismatch = true) * 100.0 / count(), 2) AS ua_ch_mismatch_pct,
avg(modern_browser_score) AS avg_browser_score,
countIf(is_rare_ja4 = true) AS rare_count,
round(countIf(threat_level = 'CRITICAL') * 100.0 / count(), 2) AS critical_pct,
round(countIf(threat_level = 'HIGH') * 100.0 / count(), 2) AS high_pct
FROM ml_detected_anomalies
WHERE detected_at >= now() - INTERVAL %(hours)s HOUR
AND ja4 != '' AND ja4 IS NOT NULL
GROUP BY ja4
HAVING unique_ips >= %(min_ips)s
AND ua_ch_mismatch_pct < 5.0
AND avg_browser_score > 60
AND rare_count = 0
ORDER BY unique_ips DESC
"""
result = db.query(query, {"hours": hours, "min_ips": min_ips})
items = [
{
"ja4": str(row[0]),
"unique_ips": int(row[1] or 0),
"total_detections": int(row[2] or 0),
"ua_ch_mismatch_pct": float(row[3] or 0),
"avg_browser_score": round(float(row[4] or 0), 1),
"critical_pct": float(row[6] or 0),
"high_pct": float(row[7] or 0),
"legitimacy_confidence": min(100, round(
(1 - float(row[3] or 0) / 100) * 40
+ float(row[4] or 0) * 0.40
+ min(int(row[1] or 0) / min_ips, 1) * 20
)),
}
for row in result.result_rows
]
return {
"items": items,
"total": len(items),
"period_hours": hours,
"note": "Ces JA4 sont candidats à une whitelist. Vérifier manuellement avant de whitelister.",
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")
# =============================================================================
# ENDPOINT — Corrélation JA4 × ASN / Pays (C5)
# Détecte les JA4 fortement concentrés sur un seul ASN ou pays
# → signal de botnet ciblé ou d'infrastructure de test/attaque partagée
# =============================================================================
@router.get("/asn-correlation")
async def get_ja4_asn_correlation(
min_concentration: float = Query(0.7, ge=0.0, le=1.0, description="Seuil min de concentration ASN ou pays"),
min_ips: int = Query(5, ge=1, description="Nombre minimum d'IPs par JA4"),
limit: int = Query(50, ge=1, le=200),
):
"""
Identifie les JA4 fingerprints fortement concentrés sur un seul ASN ou pays.
Un JA4 avec asn_concentration ≥ 0.7 signifie que ≥70% des IPs utilisant ce fingerprint
proviennent du même ASN → infrastructure de bot partagée ou datacenter suspect.
"""
try:
# Two-pass: first aggregate per (ja4, asn) to get IP counts per ASN,
# then aggregate per ja4 to compute concentration ratio
sql = """
SELECT
ja4,
sum(ips_per_combo) AS unique_ips,
uniq(src_asn) AS unique_asns,
uniq(src_country_code) AS unique_countries,
toString(argMax(src_asn, ips_per_combo)) AS top_asn_number,
argMax(asn_name, ips_per_combo) AS top_asn_name,
argMax(src_country_code, country_ips) AS dominant_country,
sum(total_hits) AS total_hits,
round(max(ips_per_combo) / greatest(sum(ips_per_combo), 1), 3) AS asn_concentration,
round(max(country_ips) / greatest(sum(ips_per_combo), 1), 3) AS country_concentration
FROM (
SELECT
ja4,
src_asn,
src_country_code,
any(src_as_name) AS asn_name,
uniq(src_ip) AS ips_per_combo,
uniq(src_ip) AS country_ips,
sum(hits) AS total_hits
FROM mabase_prod.agg_host_ip_ja4_1h
WHERE window_start >= now() - INTERVAL 24 HOUR
AND ja4 != ''
GROUP BY ja4, src_asn, src_country_code
)
GROUP BY ja4
HAVING unique_ips >= %(min_ips)s
AND (asn_concentration >= %(min_conc)s OR country_concentration >= %(min_conc)s)
ORDER BY asn_concentration DESC, unique_ips DESC
LIMIT %(limit)s
"""
result = db.query(sql, {"min_ips": min_ips, "min_conc": min_concentration, "limit": limit})
items = []
for row in result.result_rows:
ja4 = str(row[0])
unique_ips = int(row[1])
unique_asns = int(row[2])
unique_countries = int(row[3])
top_asn_number = str(row[4] or "")
top_asn_name = str(row[5] or "")
dominant_country = str(row[6] or "")
total_hits = int(row[7] or 0)
asn_concentration = float(row[8] or 0)
country_concentration = float(row[9] or 0)
if asn_concentration >= 0.85:
corr_type, risk = "asn_monopoly", "high"
elif asn_concentration >= min_concentration:
corr_type, risk = "asn_dominant", "medium"
elif country_concentration >= min_concentration:
corr_type, risk = "geo_targeted", "medium"
else:
corr_type, risk = "distributed", "low"
items.append({
"ja4": ja4,
"unique_ips": unique_ips,
"unique_asns": unique_asns,
"unique_countries": unique_countries,
"top_asn_name": top_asn_name,
"top_asn_number": top_asn_number,
"dominant_country": dominant_country,
"total_hits": total_hits,
"asn_concentration": asn_concentration,
"country_concentration":country_concentration,
"correlation_type": corr_type,
"risk": risk,
})
return {"items": items, "total": len(items)}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")