fix: correct CampaignsView, analysis.py IPv4 split, entities date filter

- CampaignsView: update ClusterData interface to match real API response
  (severity/unique_ips/score instead of threat_level/total_ips/confidence_range)
  Fix fetch to use data.items, rewrite ClusterCard and BehavioralTab
  Remove unused getClassificationColor and THREAT_ORDER constants
- analysis.py: fix "'IPv4Address' object has no attribute 'split'" error at line 322
  Convert the row value with str() before calling .split('.')
- entities.py: fix Date vs DateTime comparison — log_date is a Date column,
  comparing against now()-INTERVAL HOUR caused yesterday's entries to be excluded
  Use toDate(now() - INTERVAL X HOUR) for correct Date-level comparison

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
SOC Analyst
2026-03-15 23:10:35 +01:00
parent 8d35b91642
commit 1455e04303
50 changed files with 5442 additions and 7325 deletions

View File

@ -318,7 +318,7 @@ async def analyze_ja4(ip: str):
from collections import defaultdict
subnet_counts = defaultdict(int)
for row in subnets_result.result_rows:
ip_addr = row[0]
ip_addr = str(row[0])
parts = ip_addr.split('.')
if len(parts) == 4:
subnet = f"{parts[0]}.{parts[1]}.{parts[2]}.0/24"

View File

@ -45,7 +45,7 @@ def get_entity_stats(entity_type: str, entity_value: str, hours: int = 24) -> Op
FROM mabase_prod.view_dashboard_entities
WHERE entity_type = %(entity_type)s
AND entity_value = %(entity_value)s
AND log_date >= now() - INTERVAL %(hours)s HOUR
AND log_date >= toDate(now() - INTERVAL %(hours)s HOUR)
GROUP BY entity_type, entity_value
"""
@ -76,11 +76,11 @@ def get_related_attributes(entity_type: str, entity_value: str, hours: int = 24)
# Requête pour agréger tous les attributs associés
query = """
SELECT
(SELECT groupUniqArray(toString(src_ip)) FROM mabase_prod.view_dashboard_entities WHERE entity_type = %(entity_type)s AND entity_value = %(entity_value)s AND log_date >= now() - INTERVAL %(hours)s HOUR) as ips,
(SELECT groupUniqArray(ja4) FROM mabase_prod.view_dashboard_entities WHERE entity_type = %(entity_type)s AND entity_value = %(entity_value)s AND log_date >= now() - INTERVAL %(hours)s HOUR AND ja4 != '') as ja4s,
(SELECT groupUniqArray(host) FROM mabase_prod.view_dashboard_entities WHERE entity_type = %(entity_type)s AND entity_value = %(entity_value)s AND log_date >= now() - INTERVAL %(hours)s HOUR AND host != '') as hosts,
(SELECT groupUniqArrayArray(asns) FROM mabase_prod.view_dashboard_entities WHERE entity_type = %(entity_type)s AND entity_value = %(entity_value)s AND log_date >= now() - INTERVAL %(hours)s HOUR AND notEmpty(asns)) as asns,
(SELECT groupUniqArrayArray(countries) FROM mabase_prod.view_dashboard_entities WHERE entity_type = %(entity_type)s AND entity_value = %(entity_value)s AND log_date >= now() - INTERVAL %(hours)s HOUR AND notEmpty(countries)) as countries
(SELECT groupUniqArray(toString(src_ip)) FROM mabase_prod.view_dashboard_entities WHERE entity_type = %(entity_type)s AND entity_value = %(entity_value)s AND log_date >= toDate(now() - INTERVAL %(hours)s HOUR)) as ips,
(SELECT groupUniqArray(ja4) FROM mabase_prod.view_dashboard_entities WHERE entity_type = %(entity_type)s AND entity_value = %(entity_value)s AND log_date >= toDate(now() - INTERVAL %(hours)s HOUR) AND ja4 != '') as ja4s,
(SELECT groupUniqArray(host) FROM mabase_prod.view_dashboard_entities WHERE entity_type = %(entity_type)s AND entity_value = %(entity_value)s AND log_date >= toDate(now() - INTERVAL %(hours)s HOUR) AND host != '') as hosts,
(SELECT groupUniqArrayArray(asns) FROM mabase_prod.view_dashboard_entities WHERE entity_type = %(entity_type)s AND entity_value = %(entity_value)s AND log_date >= toDate(now() - INTERVAL %(hours)s HOUR) AND notEmpty(asns)) as asns,
(SELECT groupUniqArrayArray(countries) FROM mabase_prod.view_dashboard_entities WHERE entity_type = %(entity_type)s AND entity_value = %(entity_value)s AND log_date >= toDate(now() - INTERVAL %(hours)s HOUR) AND notEmpty(countries)) as countries
"""
result = db.connect().query(query, {
@ -123,7 +123,7 @@ def get_array_values(entity_type: str, entity_value: str, array_field: str, hour
FROM mabase_prod.view_dashboard_entities
WHERE entity_type = %(entity_type)s
AND entity_value = %(entity_value)s
AND log_date >= now() - INTERVAL %(hours)s HOUR
AND log_date >= toDate(now() - INTERVAL %(hours)s HOUR)
AND notEmpty({array_field})
)
GROUP BY value
@ -193,7 +193,7 @@ async def get_subnet_investigation(
arrayJoin(user_agents) AS user_agent
FROM view_dashboard_entities
WHERE entity_type = 'ip'
AND log_date >= now() - INTERVAL %(hours)s HOUR
AND log_date >= toDate(now() - INTERVAL %(hours)s HOUR)
AND splitByChar('.', entity_value)[1] = %(subnet_prefix)s
AND splitByChar('.', entity_value)[2] = %(subnet_mask)s
AND splitByChar('.', entity_value)[3] = %(subnet_third)s

View File

@ -0,0 +1,737 @@
"""
Endpoints pour l'analyse des fingerprints JA4 et User-Agents
Objectifs:
- Détecter le spoofing JA4 (fingerprint TLS qui prétend être un navigateur mais
dont les User-Agents, les headers HTTP ou les métriques comportementales trahissent
une origine bot/script)
- Construire une matrice JA4 × User-Agent pour visualiser les associations suspectes
- Analyser la distribution des User-Agents pour identifier les rotateurs et les bots
qui usurpent des UA de navigateurs légitimes
"""
import logging
import re
from typing import Optional

from fastapi import APIRouter, HTTPException, Query

from ..database import db
# Router grouping the fingerprint-analysis endpoints under the /api/fingerprints prefix.
router = APIRouter(prefix="/api/fingerprints", tags=["fingerprints"])
# ─── Helpers ──────────────────────────────────────────────────────────────────
# Patterns indiquant clairement un bot/script sans simulation de navigateur
_BOT_PATTERNS = re.compile(
r"bot|crawler|spider|scraper|python|curl|wget|go-http|java/|axios|"
r"libwww|httpclient|okhttp|requests|aiohttp|httpx|playwright|puppeteer|"
r"selenium|headless|phantomjs",
re.IGNORECASE,
)
# Navigateurs légitimes communs — un JA4 de type "browser" devrait venir avec ces UAs
_BROWSER_PATTERNS = re.compile(
r"mozilla|chrome|safari|firefox|edge|opera|trident",
re.IGNORECASE,
)
def _classify_ua(ua: str) -> str:
"""Retourne 'bot', 'browser', ou 'script'"""
if not ua:
return "empty"
if _BOT_PATTERNS.search(ua):
return "bot"
if _BROWSER_PATTERNS.search(ua):
return "browser"
return "script"
# =============================================================================
# ENDPOINT 1 — Détection de spoofing JA4
# =============================================================================
@router.get("/spoofing")
async def get_ja4_spoofing(
    hours: int = Query(24, ge=1, le=168, description="Fenêtre temporelle"),
    min_detections: int = Query(10, ge=1, description="Nombre minimum de détections"),
    limit: int = Query(50, ge=1, le=200),
):
    """Rank JA4 fingerprints by how likely they are to be spoofing a browser.

    Aggregates ``ml_detected_anomalies`` per JA4 over the last ``hours`` hours,
    keeps JA4s with at least ``min_detections`` detections, enriches each one with
    its top User-Agents from ``view_dashboard_user_agents`` and computes a
    spoofing score in [0, 100] from:

      - UA / Client-Hints mismatch rate (weight 0.40)
      - average ``modern_browser_score`` when a bot UA is present (up to 25 points)
      - SNI mismatch rate (0.10) and ALPN mismatch rate (0.05)
      - rare-JA4 rate (0.10) and UA-rotation rate (0.10)
      - +10 when ALPN is missing on more than 30% of detections

    Returns a dict with ``items`` (sorted by score, then detections, descending),
    ``total``, ``period_hours`` and a per-classification ``summary``.

    Raises:
        HTTPException: 500 when the main aggregation query (or post-processing) fails.
    """
    try:
        # Per-JA4 aggregation carrying every spoofing indicator.
        # NOTE: the column order below is relied upon by the row[...] indexes further down.
        query = """
            SELECT
                ja4,
                count() AS total_detections,
                uniq(src_ip) AS unique_ips,
                -- Indicateurs de mismatch
                countIf(ua_ch_mismatch = true) AS ua_ch_mismatch_count,
                round(countIf(ua_ch_mismatch = true) * 100.0 / count(), 2) AS ua_ch_mismatch_pct,
                countIf(sni_host_mismatch = true) AS sni_mismatch_count,
                round(countIf(sni_host_mismatch = true) * 100.0 / count(), 2) AS sni_mismatch_pct,
                countIf(alpn_http_mismatch = true) AS alpn_mismatch_count,
                round(countIf(alpn_http_mismatch = true) * 100.0 / count(), 2) AS alpn_mismatch_pct,
                -- Indicateurs comportementaux
                avg(modern_browser_score) AS avg_browser_score,
                countIf(is_rare_ja4 = true) AS rare_ja4_count,
                round(countIf(is_rare_ja4 = true) * 100.0 / count(), 2) AS rare_ja4_pct,
                countIf(is_ua_rotating = true) AS ua_rotating_count,
                round(countIf(is_ua_rotating = true) * 100.0 / count(), 2) AS ua_rotating_pct,
                -- Métriques TLS/TCP
                countIf(is_alpn_missing = true) AS alpn_missing_count,
                avg(distinct_ja4_count) AS avg_distinct_ja4_per_ip,
                -- Répartition threat levels
                countIf(threat_level = 'CRITICAL') AS critical_count,
                countIf(threat_level = 'HIGH') AS high_count,
                -- Botnet indicators
                avg(ja4_asn_concentration) AS avg_asn_concentration,
                avg(ja4_country_concentration) AS avg_country_concentration,
                argMax(threat_level, detected_at) AS last_threat_level
            FROM ml_detected_anomalies
            WHERE detected_at >= now() - INTERVAL %(hours)s HOUR
              AND ja4 != '' AND ja4 IS NOT NULL
            GROUP BY ja4
            HAVING total_detections >= %(min_detections)s
            ORDER BY ua_ch_mismatch_pct DESC, total_detections DESC
            LIMIT %(limit)s
        """
        result = db.query(query, {
            "hours": hours,
            "min_detections": min_detections,
            "limit": limit,
        })

        # Top 5 UAs per JA4 (best-effort enrichment).
        ja4_list = [str(r[0]) for r in result.result_rows if r[0]]
        ua_by_ja4: dict = {}
        if ja4_list:
            # The JA4 list is bound as a parameter rather than spliced into the SQL
            # text, and it is no longer truncated to 100 entries (limit allows 200).
            # NOTE(review): assumes db.query binds a tuple as "(a, b, ...)" the way
            # clickhouse-connect does -- confirm against the db wrapper.
            ua_q = """
                SELECT ja4, groupArray(5)(ua) AS top_uas
                FROM (
                    SELECT ja4, arrayJoin(user_agents) AS ua, sum(requests) AS cnt
                    FROM view_dashboard_user_agents
                    WHERE ja4 IN %(ja4s)s
                      AND hour >= now() - INTERVAL %(hours)s HOUR
                      AND ua != ''
                    GROUP BY ja4, ua
                    ORDER BY ja4, cnt DESC
                )
                GROUP BY ja4
            """
            try:
                ua_res = db.query(ua_q, {"ja4s": tuple(ja4_list), "hours": hours})
                for ua_row in ua_res.result_rows:
                    if ua_row[1]:
                        ua_by_ja4[str(ua_row[0])] = list(ua_row[1])
            except Exception:
                # Enrichment stays best-effort, but the failure is no longer invisible.
                logging.getLogger(__name__).warning(
                    "UA enrichment failed for /spoofing", exc_info=True
                )

        items = []
        summary = {
            "spoofed_browser": 0,
            "known_bot": 0,
            "suspicious": 0,
            "legitimate_browser": 0,
        }
        for row in result.result_rows:
            ja4 = str(row[0])
            ua_ch_mismatch_pct = float(row[4] or 0)
            sni_mismatch_pct = float(row[6] or 0)
            alpn_mismatch_pct = float(row[8] or 0)
            avg_browser_score = float(row[9] or 0)
            rare_ja4_pct = float(row[11] or 0)
            ua_rotating_pct = float(row[13] or 0)
            alpn_missing_count = int(row[14] or 0)
            total = int(row[1] or 1)

            # Classify each UA once and reuse the result for flags and output.
            top_uas = ua_by_ja4.get(ja4, [])
            ua_classes = [_classify_ua(u) for u in top_uas]
            has_bot_ua = "bot" in ua_classes
            has_browser_ua = "browser" in ua_classes

            # Spoofing confidence score [0-100], see docstring for the weights.
            spoof_score = min(100, round(
                ua_ch_mismatch_pct * 0.40
                + (avg_browser_score * 25 / 100 if has_bot_ua else 0)
                + sni_mismatch_pct * 0.10
                + alpn_mismatch_pct * 0.05
                + rare_ja4_pct * 0.10
                + ua_rotating_pct * 0.10
                + (10 if alpn_missing_count > total * 0.3 else 0)
            ))

            if spoof_score >= 60:
                classification = "spoofed_browser"
            elif has_bot_ua and avg_browser_score < 30:
                classification = "known_bot"
            elif has_browser_ua and ua_ch_mismatch_pct < 10:
                classification = "legitimate_browser"
            else:
                classification = "suspicious"
            summary[classification] += 1

            items.append({
                "ja4": ja4,
                "classification": classification,
                "spoofing_score": spoof_score,
                "total_detections": int(row[1] or 0),
                "unique_ips": int(row[2] or 0),
                "indicators": {
                    "ua_ch_mismatch_pct": ua_ch_mismatch_pct,
                    "sni_mismatch_pct": sni_mismatch_pct,
                    "alpn_mismatch_pct": alpn_mismatch_pct,
                    "avg_browser_score": round(avg_browser_score, 1),
                    "rare_ja4_pct": rare_ja4_pct,
                    "ua_rotating_pct": ua_rotating_pct,
                    "alpn_missing_count": alpn_missing_count,
                    "avg_asn_concentration": round(float(row[18] or 0), 3),
                    "avg_country_concentration": round(float(row[19] or 0), 3),
                },
                "top_user_agents": [
                    {"ua": u, "type": ua_class}
                    for u, ua_class in zip(top_uas, ua_classes)
                ],
                "threat_breakdown": {
                    "critical": int(row[16] or 0),
                    "high": int(row[17] or 0),
                    "last_level": str(row[20] or "LOW"),
                },
            })

        # Highest score first (which puts "spoofed_browser" entries on top), then volume.
        items.sort(key=lambda x: (-x["spoofing_score"], -x["total_detections"]))
        return {
            "items": items,
            "total": len(items),
            "period_hours": hours,
            "summary": summary,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")
# =============================================================================
# ENDPOINT 2 — Matrice JA4 × User-Agent
# =============================================================================
@router.get("/ja4-ua-matrix")
async def get_ja4_ua_matrix(
    hours: int = Query(24, ge=1, le=168),
    min_ips: int = Query(3, ge=1, description="Nombre minimum d'IPs pour inclure un JA4"),
    limit: int = Query(30, ge=1, le=100),
):
    """Build the JA4 x User-Agent matrix.

    For every JA4 seen on at least ``min_ips`` distinct IPs during the last
    ``hours`` hours, returns:
      - per-JA4 statistics from ``ml_detected_anomalies`` (mismatch rate,
        average browser score, rare / rotating counts, latest threat level)
      - up to 8 of its most requested User-Agents (from
        ``view_dashboard_user_agents``) with their share and classification
      - ``ua_summary`` percentages, computed over those returned UAs only
      - ``is_spoofing_suspect``: browser score > 50, bot share > 30% and
        UA/CH mismatch > 20%

    Raises:
        HTTPException: 500 when the statistics query (or post-processing) fails.
    """
    try:
        # Per-JA4 statistics; column order is relied upon by the row[...] indexes below.
        stats_query = """
            SELECT
                ja4,
                uniq(src_ip) AS unique_ips,
                count() AS total_detections,
                round(countIf(ua_ch_mismatch = true) * 100.0 / count(), 2) AS ua_ch_mismatch_pct,
                avg(modern_browser_score) AS avg_browser_score,
                countIf(is_rare_ja4 = true) AS rare_count,
                countIf(is_ua_rotating = true) AS rotating_count,
                argMax(threat_level, detected_at) AS last_threat
            FROM ml_detected_anomalies
            WHERE detected_at >= now() - INTERVAL %(hours)s HOUR
              AND ja4 != '' AND ja4 IS NOT NULL
            GROUP BY ja4
            HAVING unique_ips >= %(min_ips)s
            ORDER BY ua_ch_mismatch_pct DESC, unique_ips DESC
            LIMIT %(limit)s
        """
        stats_res = db.query(stats_query, {"hours": hours, "min_ips": min_ips, "limit": limit})
        ja4_list = [str(r[0]) for r in stats_res.result_rows]
        if not ja4_list:
            return {"items": [], "total": 0, "period_hours": hours}

        # UAs per JA4. Values are bound as parameters instead of being spliced
        # into the SQL text.
        # NOTE(review): assumes db.query binds a tuple as "(a, b, ...)" the way
        # clickhouse-connect does -- confirm against the db wrapper.
        ua_query = """
            SELECT
                ja4,
                ua,
                sum(requests) AS cnt
            FROM view_dashboard_user_agents
            ARRAY JOIN user_agents AS ua
            WHERE ja4 IN %(ja4s)s
              AND hour >= now() - INTERVAL %(hours)s HOUR
              AND ua != ''
            GROUP BY ja4, ua
            ORDER BY ja4, cnt DESC
        """
        ua_by_ja4: dict = {}
        try:
            ua_res = db.query(ua_query, {"ja4s": tuple(ja4_list), "hours": hours})
            for row in ua_res.result_rows:
                # Rows arrive ordered by count per JA4; keep only the first 8.
                bucket = ua_by_ja4.setdefault(str(row[0]), [])
                if len(bucket) < 8:
                    bucket.append({"ua": str(row[1]), "count": int(row[2] or 0)})
        except Exception:
            # Enrichment stays best-effort, but the failure is no longer invisible.
            logging.getLogger(__name__).warning(
                "UA enrichment failed for /ja4-ua-matrix", exc_info=True
            )

        items = []
        for row in stats_res.result_rows:
            ja4 = str(row[0])
            unique_ips = int(row[1] or 0)
            ua_ch_mismatch_pct = float(row[3] or 0)
            avg_browser_score = float(row[4] or 0)
            top_uas = ua_by_ja4.get(ja4, [])
            # "or 1" avoids a division by zero when no UA was found.
            ua_total = sum(u["count"] for u in top_uas) or 1

            classified_uas = [
                {
                    "ua": u["ua"],
                    "count": u["count"],
                    "pct": round(u["count"] * 100 / ua_total, 1),
                    "type": _classify_ua(u["ua"]),
                }
                for u in top_uas
            ]
            bot_pct = sum(u["pct"] for u in classified_uas if u["type"] == "bot")
            browser_pct = sum(u["pct"] for u in classified_uas if u["type"] == "browser")
            # Sum the remaining UAs explicitly: "100 - bot - browser" reported 100%
            # scripts for a JA4 with no UA at all and could go negative on rounding.
            script_pct = sum(
                u["pct"] for u in classified_uas if u["type"] not in ("bot", "browser")
            )

            # Spoofing flag: the JA4 looks like a browser (high browser score)
            # while the UAs are dominated by bots.
            is_spoofing = avg_browser_score > 50 and bot_pct > 30 and ua_ch_mismatch_pct > 20

            items.append({
                "ja4": ja4,
                "unique_ips": unique_ips,
                "total_detections": int(row[2] or 0),
                "ua_ch_mismatch_pct": ua_ch_mismatch_pct,
                "avg_browser_score": round(avg_browser_score, 1),
                "rare_count": int(row[5] or 0),
                "rotating_count": int(row[6] or 0),
                "last_threat": str(row[7] or "LOW"),
                "user_agents": classified_uas,
                "ua_summary": {
                    "bot_pct": round(bot_pct, 1),
                    "browser_pct": round(browser_pct, 1),
                    "script_pct": round(script_pct, 1),
                    # Capped at 8 because only the top 8 UAs are kept per JA4.
                    "total_distinct": len(top_uas),
                },
                "is_spoofing_suspect": is_spoofing,
            })

        return {
            "items": items,
            "total": len(items),
            "period_hours": hours,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")
# =============================================================================
# ENDPOINT 3 — Analyse globale des User-Agents
# =============================================================================
@router.get("/ua-analysis")
async def get_ua_analysis(
    hours: int = Query(24, ge=1, le=168),
    limit: int = Query(50, ge=1, le=200),
):
    """Global analysis of the User-Agents seen during the last ``hours`` hours.

    For the ``limit`` most requested User-Agents, returns:
      - the UA classification (bot / browser / script)
      - how many distinct JA4 fingerprints were seen with it, plus up to 3 samples
      - ``is_multi_ja4_suspect`` when more than 3 distinct JA4s share the UA
      - ``risk_flags`` from ``_build_ua_risk_flags``

    ``ua_rotating_stats`` lists up to 100 IPs flagged ``is_ua_rotating`` in
    ``ml_detected_anomalies`` (10 of them returned as samples).

    NOTE: ``ip_count`` in the response is ``sum(requests)`` from the view, i.e. a
    request volume; the key name is kept for API compatibility.

    Raises:
        HTTPException: 500 when the main UA query (or post-processing) fails.
    """
    log = logging.getLogger(__name__)
    try:
        # Top UAs overall.
        ua_global_query = """
            SELECT
                ua,
                sum(requests) AS ip_count
            FROM view_dashboard_user_agents
            ARRAY JOIN user_agents AS ua
            WHERE hour >= now() - INTERVAL %(hours)s HOUR
              AND ua != ''
            GROUP BY ua
            ORDER BY ip_count DESC
            LIMIT %(limit)s
        """
        ua_global_res = db.query(ua_global_query, {"hours": hours, "limit": limit})
        top_uas = [str(r[0]) for r in ua_global_res.result_rows]

        # JA4 distribution per UA (best-effort enrichment).
        ja4_by_ua: dict = {}
        if top_uas:
            # User-Agents are attacker-controlled: bind them as a parameter instead
            # of embedding them in the SQL text. Doubling single quotes alone does
            # not neutralise a backslash-escaped quote in a ClickHouse string literal.
            # The list is also no longer truncated to 50 (limit allows 200).
            # NOTE(review): assumes db.query binds a tuple as "(a, b, ...)" with
            # proper string escaping, as clickhouse-connect does -- confirm.
            ja4_per_ua_query = """
                SELECT
                    ua,
                    uniq(ja4) AS unique_ja4s,
                    groupUniqArray(3)(ja4) AS sample_ja4s
                FROM view_dashboard_user_agents
                ARRAY JOIN user_agents AS ua
                WHERE ua IN %(uas)s
                  AND hour >= now() - INTERVAL %(hours)s HOUR
                  AND ua != ''
                  AND ja4 != ''
                GROUP BY ua
            """
            try:
                ja4_res = db.query(ja4_per_ua_query, {"uas": tuple(top_uas), "hours": hours})
                for r in ja4_res.result_rows:
                    ja4_by_ua[str(r[0])] = {
                        "unique_ja4s": int(r[1] or 0),
                        "sample_ja4s": list(r[2] or []),
                    }
            except Exception:
                # Enrichment stays best-effort, but the failure is no longer invisible.
                log.warning("JA4-per-UA enrichment failed for /ua-analysis", exc_info=True)

        # IPs flagged is_ua_rotating (best-effort).
        rotating_query = """
            SELECT
                replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS clean_ip,
                avg(ua_ch_mismatch) AS avg_ua_ch_mismatch
            FROM ml_detected_anomalies
            WHERE detected_at >= now() - INTERVAL %(hours)s HOUR
              AND is_ua_rotating = true
            GROUP BY clean_ip
            ORDER BY avg_ua_ch_mismatch DESC
            LIMIT 100
        """
        rotating_ips: list = []
        try:
            rot_res = db.query(rotating_query, {"hours": hours})
            rotating_ips = [str(r[0]) for r in rot_res.result_rows]
        except Exception:
            log.warning("Rotating-IP lookup failed for /ua-analysis", exc_info=True)

        # Build the response.
        no_ja4_info = {"unique_ja4s": 0, "sample_ja4s": []}
        items = []
        for row in ua_global_res.result_rows:
            ua = str(row[0])
            ip_count = int(row[1] or 0)
            ua_type = _classify_ua(ua)
            ja4_info = ja4_by_ua.get(ua, no_ja4_info)
            unique_ja4s = ja4_info["unique_ja4s"]
            items.append({
                "user_agent": ua,
                "type": ua_type,
                "ip_count": ip_count,
                "unique_ja4_count": unique_ja4s,
                "sample_ja4s": ja4_info["sample_ja4s"],
                # A UA shared by many JA4s is suspect: a real browser usually has 1-2.
                "is_multi_ja4_suspect": unique_ja4s > 3,
                "risk_flags": _build_ua_risk_flags(ua, ua_type, unique_ja4s, ip_count),
            })

        return {
            "items": items,
            "total": len(items),
            "period_hours": hours,
            "ua_rotating_stats": {
                "rotating_ip_count": len(rotating_ips),
                "sample_rotating_ips": rotating_ips[:10],
            },
            "summary": {
                "bot_count": sum(1 for i in items if i["type"] == "bot"),
                "browser_count": sum(1 for i in items if i["type"] == "browser"),
                "script_count": sum(1 for i in items if i["type"] == "script"),
                "multi_ja4_suspect_count": sum(1 for i in items if i["is_multi_ja4_suspect"]),
            },
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")
def _build_ua_risk_flags(ua: str, ua_type: str, unique_ja4s: int, ip_count: int) -> list:
flags = []
if ua_type == "bot":
flags.append("ua_bot_signature")
elif ua_type == "script":
flags.append("ua_script_library")
if unique_ja4s > 5:
flags.append("ja4_rotation_suspect")
if unique_ja4s > 3 and ua_type == "browser":
flags.append("browser_ua_multi_fingerprint")
if ip_count > 100:
flags.append("high_volume")
return flags
# =============================================================================
# ENDPOINT 4 — JA4 d'un IP spécifique: analyse de cohérence UA/JA4
# =============================================================================
@router.get("/ip/{ip}/coherence")
async def get_ip_fingerprint_coherence(ip: str):
    """Assess whether a single IP appears to be spoofing its fingerprint.

    Reads the 20 most recent rows for ``ip`` in ``ml_detected_anomalies`` and the
    IP's top 10 User-Agents of the last 72 hours, then derives:
      - a spoofing score in [0, 100] (UA/Client-Hints mismatch, browser score
        combined with a bot UA, SNI/ALPN mismatches, number of distinct JA4s,
        UA rotation, rare JA4)
      - a verdict label and a list of human-readable explanations
      - the indicators, fingerprints and latest detection details

    Raises:
        HTTPException: 404 when the IP has no detection, 500 on any other failure.
    """
    try:
        # Most recent detections for this IP; the column order is relied upon
        # by the positional indexes used below.
        detections_sql = """
            SELECT
                ja4,
                ua_ch_mismatch,
                modern_browser_score,
                sni_host_mismatch,
                alpn_http_mismatch,
                is_alpn_missing,
                is_rare_ja4,
                is_ua_rotating,
                distinct_ja4_count,
                header_count,
                has_accept_language,
                has_cookie,
                has_referer,
                header_order_shared_count,
                detected_at,
                threat_level,
                window_mss_ratio,
                tcp_jitter_variance,
                multiplexing_efficiency
            FROM ml_detected_anomalies
            WHERE src_ip = %(ip)s
            ORDER BY detected_at DESC
            LIMIT 20
        """
        detections = db.query(detections_sql, {"ip": ip}).result_rows
        if not detections:
            raise HTTPException(status_code=404, detail="IP non trouvée dans les détections")

        # Actual User-Agents observed for this IP.
        ua_sql = """
            SELECT ua, sum(requests) AS cnt
            FROM view_dashboard_user_agents
            ARRAY JOIN user_agents AS ua
            WHERE toString(src_ip) = %(ip)s
              AND hour >= now() - INTERVAL 72 HOUR
              AND ua != ''
            GROUP BY ua ORDER BY cnt DESC LIMIT 10
        """
        top_uas = []
        for ua_row in db.query(ua_sql, {"ip": ip}).result_rows:
            ua_text = str(ua_row[0])
            top_uas.append({
                "ua": ua_text,
                "count": int(ua_row[1] or 0),
                "type": _classify_ua(ua_text),
            })

        def _hits(column: int) -> int:
            """Count the detections whose value in ``column`` is truthy."""
            return sum(1 for d in detections if d[column])

        # Aggregate the indicators over the fetched detections.
        newest = detections[0]
        n_rows = len(detections)
        ua_ch_hits = _hits(1)
        sni_hits = _hits(3)
        alpn_hits = _hits(4)
        rare_hits = _hits(6)
        rotating = any(d[7] for d in detections)
        ja4_seen = {str(d[0]) for d in detections if d[0]}
        avg_score = sum(int(d[2] or 0) for d in detections) / n_rows

        ua_kinds = [entry["type"] for entry in top_uas]
        browser_ua_seen = "browser" in ua_kinds
        bot_ua_seen = "bot" in ua_kinds

        # Rates as percentages of the fetched detections.
        ua_ch_rate = ua_ch_hits / n_rows * 100
        sni_rate = sni_hits / n_rows * 100
        alpn_rate = alpn_hits / n_rows * 100

        # Spoofing score, capped at 100.
        spoof_score = min(100, round(
            ua_ch_rate * 0.40
            + (avg_score * 0.20 if bot_ua_seen else 0)
            + sni_rate * 0.10
            + alpn_rate * 0.05
            + (len(ja4_seen) * 5 if len(ja4_seen) > 2 else 0)
            + (15 if rotating else 0)
            + (10 if rare_hits > n_rows * 0.5 else 0)
        ))

        # Verdict: the first matching rule wins.
        verdict_rules = (
            (spoof_score >= 70, "high_confidence_spoofing"),
            (spoof_score >= 40, "suspicious_spoofing"),
            (bot_ua_seen and avg_score < 20, "known_bot_no_spoofing"),
            (browser_ua_seen and spoof_score < 20, "legitimate_browser"),
        )
        verdict = next((label for matched, label in verdict_rules if matched), "inconclusive")

        # Human-readable explanation, one entry per triggered indicator.
        explanation_rules = (
            (
                ua_ch_hits > n_rows * 0.3,
                f"UA-Client-Hints mismatch sur {round(ua_ch_hits*100/n_rows)}% des requêtes",
            ),
            (
                bot_ua_seen and avg_score > 40,
                f"JA4 ressemble à un navigateur (score {round(avg_score)}/100) mais UA est de type bot",
            ),
            (
                len(ja4_seen) > 2,
                f"{len(ja4_seen)} JA4 distincts utilisés → rotation de fingerprint",
            ),
            (
                rotating,
                "is_ua_rotating détecté → rotation d'User-Agent confirmée",
            ),
            (
                sni_hits > 0,
                f"SNI ≠ Host header sur {sni_hits}/{n_rows} requêtes",
            ),
        )
        explanation = [text for triggered, text in explanation_rules if triggered]
        if not explanation:
            explanation.append("Aucun indicateur de spoofing majeur détecté")

        indicators = {
            "ua_ch_mismatch_rate": round(ua_ch_rate, 1),
            "sni_mismatch_rate": round(sni_rate, 1),
            "alpn_mismatch_rate": round(alpn_rate, 1),
            "avg_browser_score": round(avg_score, 1),
            "distinct_ja4_count": len(ja4_seen),
            "is_ua_rotating": rotating,
            "rare_ja4_rate": round(rare_hits / n_rows * 100, 1),
        }
        latest_detection = {
            "detected_at": newest[14].isoformat() if newest[14] else "",
            "threat_level": str(newest[15] or "LOW"),
            "modern_browser_score": int(newest[2] or 0),
            "header_count": int(newest[9] or 0),
            "has_accept_language": bool(newest[10]),
            "has_cookie": bool(newest[11]),
            "has_referer": bool(newest[12]),
            "header_order_shared_count": int(newest[13] or 0),
        }
        return {
            "ip": ip,
            "verdict": verdict,
            "spoofing_score": spoof_score,
            "explanation": explanation,
            "indicators": indicators,
            "fingerprints": {
                "ja4_list": list(ja4_seen),
                "latest_ja4": str(newest[0] or ""),
            },
            "user_agents": top_uas,
            "latest_detection": latest_detection,
        }
    except HTTPException:
        # Let the 404 above propagate unchanged.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")
# =============================================================================
# ENDPOINT 5 — JA4 légitimes (baseline / whitelist)
# =============================================================================
@router.get("/legitimate-ja4")
async def get_legitimate_ja4(
    hours: int = Query(168, ge=24, le=720, description="Fenêtre pour établir la baseline"),
    min_ips: int = Query(50, ge=5, description="Nombre minimum d'IPs pour qualifier un JA4 de légitime"),
    limit: int = Query(100, ge=1, le=500, description="Nombre maximum de JA4 retournés"),
):
    """Build a baseline of JA4 fingerprints that look legitimate.

    A JA4 qualifies when, over the last ``hours`` hours in ``ml_detected_anomalies``:
      - it is used by at least ``min_ips`` distinct IPs
      - its ``ua_ch_mismatch`` rate is below 5%
      - its average ``modern_browser_score`` is above 60
      - none of its detections is flagged ``is_rare_ja4``

    At most ``limit`` JA4s are returned (default 100, the previously hard-coded
    value), ordered by number of distinct IPs. The result is meant as a list of
    whitelist candidates to review manually.

    Raises:
        HTTPException: 500 when the query (or post-processing) fails.
    """
    try:
        # Column order is relied upon by the row[...] indexes below.
        query = """
            SELECT
                ja4,
                uniq(src_ip) AS unique_ips,
                count() AS total_detections,
                round(countIf(ua_ch_mismatch = true) * 100.0 / count(), 2) AS ua_ch_mismatch_pct,
                avg(modern_browser_score) AS avg_browser_score,
                countIf(is_rare_ja4 = true) AS rare_count,
                round(countIf(threat_level = 'CRITICAL') * 100.0 / count(), 2) AS critical_pct,
                round(countIf(threat_level = 'HIGH') * 100.0 / count(), 2) AS high_pct
            FROM ml_detected_anomalies
            WHERE detected_at >= now() - INTERVAL %(hours)s HOUR
              AND ja4 != '' AND ja4 IS NOT NULL
            GROUP BY ja4
            HAVING unique_ips >= %(min_ips)s
               AND ua_ch_mismatch_pct < 5.0
               AND avg_browser_score > 60
               AND rare_count = 0
            ORDER BY unique_ips DESC
            LIMIT %(limit)s
        """
        result = db.query(query, {"hours": hours, "min_ips": min_ips, "limit": limit})

        items = []
        for row in result.result_rows:
            unique_ips = int(row[1] or 0)
            mismatch_pct = float(row[3] or 0)
            browser_score = float(row[4] or 0)
            # Confidence = low mismatch (40) + browser score (40) + IP volume (20).
            # NOTE: because of the HAVING clause every returned row has
            # unique_ips >= min_ips, so the volume term is always 20 here.
            confidence = min(100, round(
                (1 - mismatch_pct / 100) * 40
                + browser_score * 0.40
                + min(unique_ips / min_ips, 1) * 20
            ))
            items.append({
                "ja4": str(row[0]),
                "unique_ips": unique_ips,
                "total_detections": int(row[2] or 0),
                "ua_ch_mismatch_pct": mismatch_pct,
                "avg_browser_score": round(browser_score, 1),
                "critical_pct": float(row[6] or 0),
                "high_pct": float(row[7] or 0),
                "legitimacy_confidence": confidence,
            })

        return {
            "items": items,
            "total": len(items),
            "period_hours": hours,
            "note": "Ces JA4 sont candidats à une whitelist. Vérifier manuellement avant de whitelister.",
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")

View File

@ -81,25 +81,94 @@ async def get_incident_clusters(
result = db.query(cluster_query, {"hours": hours, "limit": limit})
# Collect sample IPs to fetch real UA and trend data in bulk
sample_ips = [row[10] for row in result.result_rows if row[10]]
subnets_list = [row[0] for row in result.result_rows]
# Fetch real primary UA per sample IP from view_dashboard_entities
ua_by_ip: dict = {}
if sample_ips:
ip_list_sql = ", ".join(f"'{ip}'" for ip in sample_ips[:50])
ua_query = f"""
SELECT entity_value, arrayElement(user_agents, 1) AS top_ua
FROM view_dashboard_entities
WHERE entity_type = 'ip'
AND entity_value IN ({ip_list_sql})
AND notEmpty(user_agents)
GROUP BY entity_value, top_ua
ORDER BY entity_value
"""
try:
ua_result = db.query(ua_query)
for ua_row in ua_result.result_rows:
if ua_row[0] not in ua_by_ip and ua_row[1]:
ua_by_ip[str(ua_row[0])] = str(ua_row[1])
except Exception:
pass # UA enrichment is best-effort
# Compute real trend: compare current window vs previous window of same duration
trend_query = """
WITH cleaned AS (
SELECT
replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS clean_ip,
detected_at,
concat(
splitByChar('.', clean_ip)[1], '.',
splitByChar('.', clean_ip)[2], '.',
splitByChar('.', clean_ip)[3], '.0/24'
) AS subnet
FROM ml_detected_anomalies
),
current_window AS (
SELECT subnet, count() AS cnt
FROM cleaned
WHERE detected_at >= now() - INTERVAL %(hours)s HOUR
GROUP BY subnet
),
prev_window AS (
SELECT subnet, count() AS cnt
FROM cleaned
WHERE detected_at >= now() - INTERVAL %(hours2)s HOUR
AND detected_at < now() - INTERVAL %(hours)s HOUR
GROUP BY subnet
)
SELECT c.subnet, c.cnt AS current_cnt, p.cnt AS prev_cnt
FROM current_window c
LEFT JOIN prev_window p ON c.subnet = p.subnet
"""
trend_by_subnet: dict = {}
try:
trend_result = db.query(trend_query, {"hours": hours, "hours2": hours * 2})
for tr in trend_result.result_rows:
subnet_key = tr[0]
curr = tr[1] or 0
prev = tr[2] or 0
if prev == 0:
trend_by_subnet[subnet_key] = ("new", 100)
else:
pct = round(((curr - prev) / prev) * 100)
trend_by_subnet[subnet_key] = ("up" if pct >= 0 else "down", abs(pct))
except Exception:
pass
clusters = []
for row in result.result_rows:
# Calcul du score de risque
subnet = row[0]
threat_level = row[8] or 'LOW'
unique_ips = row[2] or 1
avg_score = abs(row[9] or 0)
# Score based on threat level and other factors
sample_ip = row[10] if row[10] else subnet.split('/')[0]
critical_count = 1 if threat_level == 'CRITICAL' else 0
high_count = 1 if threat_level == 'HIGH' else 0
risk_score = min(100, round(
(critical_count * 30) +
(high_count * 20) +
(unique_ips * 5) +
(critical_count * 30) +
(high_count * 20) +
(unique_ips * 5) +
(avg_score * 100)
))
# Détermination de la sévérité
if critical_count > 0 or risk_score >= 80:
severity = "CRITICAL"
elif high_count > (row[1] or 1) * 0.3 or risk_score >= 60:
@ -108,31 +177,27 @@ async def get_incident_clusters(
severity = "MEDIUM"
else:
severity = "LOW"
# Calcul de la tendance
trend = "up"
trend_percentage = 23
trend_dir, trend_pct = trend_by_subnet.get(subnet, ("stable", 0))
primary_ua = ua_by_ip.get(sample_ip, "")
clusters.append({
"id": f"INC-{datetime.now().strftime('%Y%m%d')}-{len(clusters)+1:03d}",
"score": risk_score,
"severity": severity,
"total_detections": row[1],
"unique_ips": row[2],
"subnet": row[0],
"sample_ip": row[10] if row[10] else row[0].split('/')[0],
"subnet": subnet,
"sample_ip": sample_ip,
"ja4": row[5] or "",
"primary_ua": "python-requests",
"primary_target": "Unknown",
"countries": [{
"code": row[6] or "XX",
"percentage": 100
}],
"primary_ua": primary_ua,
"primary_target": row[3].strftime('%H:%M') if row[3] else "Unknown",
"countries": [{"code": row[6] or "XX", "percentage": 100}],
"asn": str(row[7]) if row[7] else "",
"first_seen": row[3].isoformat() if row[3] else "",
"last_seen": row[4].isoformat() if row[4] else "",
"trend": trend,
"trend_percentage": trend_percentage
"trend": trend_dir,
"trend_percentage": trend_pct,
})
return {

View File

@ -103,7 +103,7 @@ async def get_associated_attributes(
# Mapping des attributs cibles
target_column_map = {
"user_agents": "''", # Pas de user_agent
"user_agents": None, # handled separately via view_dashboard_entities
"ja4": "ja4",
"countries": "country_code",
"asns": "asn_number",
@ -122,9 +122,33 @@ async def get_associated_attributes(
column = type_column_map[attr_type]
target_column = target_column_map[target_attr]
# Pour user_agent, retourne liste vide
if target_column == "''":
return {"type": attr_type, "value": value, "target": target_attr, "items": [], "total": 0}
# Pour user_agents: requête via view_dashboard_user_agents
# Colonnes: src_ip, ja4, hour, log_date, user_agents, requests
if target_column is None:
if attr_type == "ip":
ua_where = "toString(src_ip) = %(value)s"
elif attr_type == "ja4":
ua_where = "ja4 = %(value)s"
else:
# country/asn/host: pivot via ml_detected_anomalies
ua_where = f"""toString(src_ip) IN (
SELECT DISTINCT replaceRegexpAll(toString(src_ip), '^::ffff:', '')
FROM ml_detected_anomalies
WHERE {column} = %(value)s AND detected_at >= now() - INTERVAL 24 HOUR
)"""
ua_q = f"""
SELECT ua AS value, sum(requests) AS count,
round(sum(requests) * 100.0 / sum(sum(requests)) OVER (), 2) AS percentage
FROM view_dashboard_user_agents
ARRAY JOIN user_agents AS ua
WHERE {ua_where}
AND hour >= now() - INTERVAL 24 HOUR AND ua != ''
GROUP BY value ORDER BY count DESC LIMIT %(limit)s
"""
ua_result = db.query(ua_q, {"value": value, "limit": limit})
items = [{"value": str(r[0]), "count": r[1], "percentage": round(float(r[2]), 2) if r[2] else 0.0}
for r in ua_result.result_rows]
return {"type": attr_type, "value": value, "target": target_attr, "items": items, "total": len(items), "showing": len(items)}
query = f"""
SELECT
@ -193,8 +217,8 @@ async def get_user_agents(
type_column_map = {
"ip": "src_ip",
"ja4": "ja4",
"country": "src_country_code",
"asn": "src_asn",
"country": "country_code",
"asn": "asn_number",
"host": "host",
}
@ -206,25 +230,51 @@ async def get_user_agents(
column = type_column_map[attr_type]
# Requête sur la vue materialisée
# user_agents est un Array, on utilise arrayJoin pour l'aplatir
# view_dashboard_user_agents colonnes: src_ip, ja4, hour, log_date, user_agents, requests
if attr_type == "ip":
where = "toString(src_ip) = %(value)s"
params: dict = {"value": value, "limit": limit}
elif attr_type == "ja4":
where = "ja4 = %(value)s"
params = {"value": value, "limit": limit}
else:
# country / asn / host: pivot via ml_detected_anomalies → IPs connus → vue par src_ip
ml_col = {"country": "country_code", "asn": "asn_number", "host": "host"}[attr_type]
where = f"""toString(src_ip) IN (
SELECT DISTINCT replaceRegexpAll(toString(src_ip), '^::ffff:', '')
FROM ml_detected_anomalies
WHERE {ml_col} = %(value)s
AND detected_at >= now() - INTERVAL 24 HOUR
)"""
params = {"value": value, "limit": limit}
query = f"""
SELECT
ua AS user_agent,
sum(requests) AS count,
round(count * 100.0 / sum(count) OVER (), 2) AS percentage,
min(hour) AS first_seen,
max(hour) AS last_seen
FROM mabase_prod.view_dashboard_user_agents
round(sum(requests) * 100.0 / sum(sum(requests)) OVER (), 2) AS percentage,
min(log_date) AS first_seen,
max(log_date) AS last_seen
FROM view_dashboard_user_agents
ARRAY JOIN user_agents AS ua
WHERE {column} = %(value)s
WHERE {where}
AND hour >= now() - INTERVAL 24 HOUR
AND ua != ''
GROUP BY user_agent
ORDER BY count DESC
LIMIT %(limit)s
"""
result = db.query(query, params)
result = db.query(query, {"value": value, "limit": limit})
count_query = f"""
SELECT uniqExact(ua) AS total
FROM view_dashboard_user_agents
ARRAY JOIN user_agents AS ua
WHERE {where}
AND hour >= now() - INTERVAL 24 HOUR
AND ua != ''
"""
count_result = db.query(count_query, params)
user_agents = [
UserAgentValue(
@ -237,16 +287,6 @@ async def get_user_agents(
for row in result.result_rows
]
# Compter le total
count_query = f"""
SELECT uniq(ua) AS total
FROM mabase_prod.view_dashboard_user_agents
ARRAY JOIN user_agents AS ua
WHERE {column} = %(value)s
AND hour >= now() - INTERVAL 24 HOUR
"""
count_result = db.query(count_query, {"value": value})
total = count_result.result_rows[0][0] if count_result.result_rows else 0
return {
@ -451,38 +491,41 @@ async def get_variability(attr_type: str, value: str):
first_seen = stats_row[2]
last_seen = stats_row[3]
# User-Agents
ua_query = f"""
SELECT
user_agent,
count() AS count,
round(count() * 100.0 / sum(count()) OVER (), 2) AS percentage,
min(detected_at) AS first_seen,
max(detected_at) AS last_seen,
groupArray((threat_level, 1)) AS threats
FROM ({base_query})
WHERE user_agent != '' AND user_agent IS NOT NULL
GROUP BY user_agent
ORDER BY count DESC
LIMIT 10
"""
# Simplified query without complex threat parsing
# User-Agents via view_dashboard_user_agents (source principale pour les UAs)
# Colonnes disponibles: src_ip, ja4, hour, log_date, user_agents, requests
if attr_type == "ip":
_ua_where = "toString(src_ip) = %(value)s"
_ua_params: dict = {"value": value}
elif attr_type == "ja4":
_ua_where = "ja4 = %(value)s"
_ua_params = {"value": value}
else:
# country / asn / host: pivot via ml_detected_anomalies → IPs
_ua_where = f"""toString(src_ip) IN (
SELECT DISTINCT replaceRegexpAll(toString(src_ip), '^::ffff:', '')
FROM ml_detected_anomalies
WHERE {column} = %(value)s AND detected_at >= now() - INTERVAL 24 HOUR
)"""
_ua_params = {"value": value}
ua_query_simple = f"""
SELECT
user_agent,
count() AS count,
round(count() * 100.0 / (SELECT count() FROM ({base_query}) WHERE user_agent != '' AND user_agent IS NOT NULL), 2) AS percentage,
min(detected_at) AS first_seen,
max(detected_at) AS last_seen
FROM ({base_query})
WHERE user_agent != '' AND user_agent IS NOT NULL
ua AS user_agent,
sum(requests) AS count,
round(sum(requests) * 100.0 / sum(sum(requests)) OVER (), 2) AS percentage,
min(log_date) AS first_seen,
max(log_date) AS last_seen
FROM view_dashboard_user_agents
ARRAY JOIN user_agents AS ua
WHERE {_ua_where}
AND hour >= now() - INTERVAL 24 HOUR
AND ua != ''
GROUP BY user_agent
ORDER BY count DESC
LIMIT 10
"""
ua_result = db.query(ua_query_simple, {"value": value})
ua_result = db.query(ua_query_simple, _ua_params)
user_agents = [get_attribute_value(row, 1, 2, 3, 4) for row in ua_result.result_rows]
# JA4 fingerprints