Initial commit: Bot Detector Dashboard for SOC Incident Response

🛡️ Dashboard complet pour l'analyse et la classification des menaces

Fonctionnalités principales:
- Visualisation des détections en temps réel (24h)
- Investigation multi-entités (IP, JA4, ASN, Host, User-Agent)
- Analyse de corrélation pour classification SOC
- Clustering automatique par subnet/JA4/UA
- Export des classifications pour ML

Composants:
- Backend: FastAPI (Python) + ClickHouse
- Frontend: React + TypeScript + TailwindCSS
- 6 routes API: metrics, detections, variability, attributes, analysis, entities
- 7 types d'entités investigables

Documentation ajoutée:
- NAVIGATION_GRAPH.md: Graph complet de navigation
- SOC_OPTIMIZATION_PROPOSAL.md: Proposition d'optimisation pour SOC
  • Réduction de 7 à 2 clics pour classification
  • Nouvelle vue /incidents clusterisée
  • Panel latéral d'investigation
  • Quick Search (Cmd+K)
  • Timeline interactive
  • Graph de corrélations

Sécurité:
- .gitignore configuré (exclut .env, secrets, node_modules)
- Credentials dans .env (à ne pas committer)

⚠️ Audit sécurité réalisé - Voir recommandations dans SOC_OPTIMIZATION_PROPOSAL.md

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
SOC Analyst
2026-03-14 21:33:55 +01:00
commit a61828d1e7
55 changed files with 11189 additions and 0 deletions

View File

@ -0,0 +1 @@
# Routes package

691
backend/routes/analysis.py Normal file
View File

@ -0,0 +1,691 @@
"""
Endpoints pour l'analyse de corrélations et la classification SOC
"""
from fastapi import APIRouter, HTTPException, Query
from typing import Optional, List
from datetime import datetime
import ipaddress
import json
from ..database import db
from ..models import (
SubnetAnalysis, CountryAnalysis, CountryData, JA4Analysis, JA4SubnetData,
UserAgentAnalysis, UserAgentData, CorrelationIndicators,
ClassificationRecommendation, ClassificationLabel,
ClassificationCreate, Classification, ClassificationsListResponse
)
router = APIRouter(prefix="/api/analysis", tags=["analysis"])
# =============================================================================
# ANALYSE SUBNET / ASN
# =============================================================================
@router.get("/{ip}/subnet", response_model=SubnetAnalysis)
async def analyze_subnet(ip: str):
"""
Analyse les IPs du même subnet et ASN
"""
try:
# Calculer le subnet /24
ip_obj = ipaddress.ip_address(ip)
subnet = ipaddress.ip_network(f"{ip}/24", strict=False)
subnet_str = str(subnet)
# Récupérer les infos ASN pour cette IP
asn_query = """
SELECT asn_number, asn_org
FROM ml_detected_anomalies
WHERE src_ip = %(ip)s
ORDER BY detected_at DESC
LIMIT 1
"""
asn_result = db.query(asn_query, {"ip": ip})
if not asn_result.result_rows:
# Fallback: utiliser données par défaut
asn_number = "0"
asn_org = "Unknown"
else:
asn_number = str(asn_result.result_rows[0][0] or "0")
asn_org = asn_result.result_rows[0][1] or "Unknown"
# IPs du même subnet /24
subnet_ips_query = """
SELECT DISTINCT src_ip
FROM ml_detected_anomalies
WHERE toIPv4(src_ip) >= toIPv4(%(subnet_start)s)
AND toIPv4(src_ip) <= toIPv4(%(subnet_end)s)
AND detected_at >= now() - INTERVAL 24 HOUR
ORDER BY src_ip
"""
subnet_result = db.query(subnet_ips_query, {
"subnet_start": str(subnet.network_address),
"subnet_end": str(subnet.broadcast_address)
})
subnet_ips = [str(row[0]) for row in subnet_result.result_rows]
# Total IPs du même ASN
if asn_number != "0":
asn_total_query = """
SELECT uniq(src_ip)
FROM ml_detected_anomalies
WHERE asn_number = %(asn_number)s
AND detected_at >= now() - INTERVAL 24 HOUR
"""
asn_total_result = db.query(asn_total_query, {"asn_number": asn_number})
asn_total = asn_total_result.result_rows[0][0] if asn_total_result.result_rows else 0
else:
asn_total = 0
return SubnetAnalysis(
ip=ip,
subnet=subnet_str,
ips_in_subnet=subnet_ips,
total_in_subnet=len(subnet_ips),
asn_number=asn_number,
asn_org=asn_org,
total_in_asn=asn_total,
alert=len(subnet_ips) > 10
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")
@router.get("/{ip}/country", response_model=dict)
async def analyze_ip_country(ip: str):
"""
Analyse le pays d'une IP spécifique et la répartition des autres pays du même ASN
"""
try:
# Pays de l'IP
ip_country_query = """
SELECT country_code, asn_number
FROM ml_detected_anomalies
WHERE src_ip = %(ip)s
ORDER BY detected_at DESC
LIMIT 1
"""
ip_result = db.query(ip_country_query, {"ip": ip})
if not ip_result.result_rows:
return {"ip_country": None, "asn_countries": []}
ip_country_code = ip_result.result_rows[0][0]
asn_number = ip_result.result_rows[0][1]
# Noms des pays
country_names = {
"CN": "China", "US": "United States", "DE": "Germany",
"FR": "France", "RU": "Russia", "GB": "United Kingdom",
"NL": "Netherlands", "IN": "India", "BR": "Brazil",
"JP": "Japan", "KR": "South Korea", "IT": "Italy",
"ES": "Spain", "CA": "Canada", "AU": "Australia"
}
# Répartition des autres pays du même ASN
asn_countries_query = """
SELECT
country_code,
count() AS count
FROM ml_detected_anomalies
WHERE asn_number = %(asn_number)s
AND detected_at >= now() - INTERVAL 24 HOUR
GROUP BY country_code
ORDER BY count DESC
LIMIT 10
"""
asn_result = db.query(asn_countries_query, {"asn_number": asn_number})
total = sum(row[1] for row in asn_result.result_rows)
asn_countries = [
{
"code": row[0],
"name": country_names.get(row[0], row[0]),
"count": row[1],
"percentage": round((row[1] / total * 100), 2) if total > 0 else 0.0
}
for row in asn_result.result_rows
]
return {
"ip_country": {
"code": ip_country_code,
"name": country_names.get(ip_country_code, ip_country_code)
},
"asn_countries": asn_countries
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")
# =============================================================================
# ANALYSE PAYS
# =============================================================================
@router.get("/country", response_model=CountryAnalysis)
async def analyze_country(days: int = Query(1, ge=1, le=30)):
"""
Analyse la distribution des pays
"""
try:
# Top pays
top_query = """
SELECT
country_code,
count() AS count
FROM ml_detected_anomalies
WHERE detected_at >= now() - INTERVAL %(days)s DAY
AND country_code != '' AND country_code IS NOT NULL
GROUP BY country_code
ORDER BY count DESC
LIMIT 10
"""
top_result = db.query(top_query, {"days": days})
# Calculer le total pour le pourcentage
total = sum(row[1] for row in top_result.result_rows)
# Noms des pays (mapping simple)
country_names = {
"CN": "China", "US": "United States", "DE": "Germany",
"FR": "France", "RU": "Russia", "GB": "United Kingdom",
"NL": "Netherlands", "IN": "India", "BR": "Brazil",
"JP": "Japan", "KR": "South Korea", "IT": "Italy",
"ES": "Spain", "CA": "Canada", "AU": "Australia"
}
top_countries = [
CountryData(
code=row[0],
name=country_names.get(row[0], row[0]),
count=row[1],
percentage=round((row[1] / total * 100), 2) if total > 0 else 0.0
)
for row in top_result.result_rows
]
# Baseline (7 derniers jours)
baseline_query = """
SELECT
country_code,
count() AS count
FROM ml_detected_anomalies
WHERE detected_at >= now() - INTERVAL 7 DAY
AND country_code != '' AND country_code IS NOT NULL
GROUP BY country_code
ORDER BY count DESC
LIMIT 5
"""
baseline_result = db.query(baseline_query)
baseline_total = sum(row[1] for row in baseline_result.result_rows)
baseline = {
row[0]: round((row[1] / baseline_total * 100), 2) if baseline_total > 0 else 0.0
for row in baseline_result.result_rows
}
# Détecter pays surreprésenté
alert_country = None
for country in top_countries:
baseline_pct = baseline.get(country.code, 0)
if baseline_pct > 0 and country.percentage > baseline_pct * 2 and country.percentage > 30:
alert_country = country.code
break
return CountryAnalysis(
top_countries=top_countries,
baseline=baseline,
alert_country=alert_country
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")
# =============================================================================
# ANALYSE JA4
# =============================================================================
@router.get("/{ip}/ja4", response_model=JA4Analysis)
async def analyze_ja4(ip: str):
"""
Analyse le JA4 fingerprint
"""
try:
# JA4 de cette IP
ja4_query = """
SELECT ja4
FROM ml_detected_anomalies
WHERE src_ip = %(ip)s
AND ja4 != '' AND ja4 IS NOT NULL
ORDER BY detected_at DESC
LIMIT 1
"""
ja4_result = db.query(ja4_query, {"ip": ip})
if not ja4_result.result_rows:
return JA4Analysis(
ja4="",
shared_ips_count=0,
top_subnets=[],
other_ja4_for_ip=[]
)
ja4 = ja4_result.result_rows[0][0]
# IPs avec le même JA4
shared_query = """
SELECT uniq(src_ip)
FROM ml_detected_anomalies
WHERE ja4 = %(ja4)s
AND detected_at >= now() - INTERVAL 24 HOUR
"""
shared_result = db.query(shared_query, {"ja4": ja4})
shared_count = shared_result.result_rows[0][0] if shared_result.result_rows else 0
# Top subnets pour ce JA4 - Simplifié
subnets_query = """
SELECT
src_ip,
count() AS count
FROM ml_detected_anomalies
WHERE ja4 = %(ja4)s
AND detected_at >= now() - INTERVAL 24 HOUR
GROUP BY src_ip
ORDER BY count DESC
LIMIT 100
"""
subnets_result = db.query(subnets_query, {"ja4": ja4})
# Grouper par subnet /24
from collections import defaultdict
subnet_counts = defaultdict(int)
for row in subnets_result.result_rows:
ip_addr = row[0]
parts = ip_addr.split('.')
if len(parts) == 4:
subnet = f"{parts[0]}.{parts[1]}.{parts[2]}.0/24"
subnet_counts[subnet] += row[1]
top_subnets = [
JA4SubnetData(subnet=subnet, count=count)
for subnet, count in sorted(subnet_counts.items(), key=lambda x: x[1], reverse=True)[:10]
]
# Autres JA4 pour cette IP
other_ja4_query = """
SELECT DISTINCT ja4
FROM ml_detected_anomalies
WHERE src_ip = %(ip)s
AND ja4 != '' AND ja4 IS NOT NULL
AND ja4 != %(current_ja4)s
"""
other_result = db.query(other_ja4_query, {"ip": ip, "current_ja4": ja4})
other_ja4 = [row[0] for row in other_result.result_rows]
return JA4Analysis(
ja4=ja4,
shared_ips_count=shared_count,
top_subnets=top_subnets,
other_ja4_for_ip=other_ja4
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")
# =============================================================================
# ANALYSE USER-AGENTS
# =============================================================================
@router.get("/{ip}/user-agents", response_model=UserAgentAnalysis)
async def analyze_user_agents(ip: str):
"""
Analyse les User-Agents
"""
try:
# User-Agents pour cette IP (depuis http_logs)
ip_ua_query = """
SELECT
header_user_agent AS ua,
count() AS count
FROM mabase_prod.http_logs
WHERE src_ip = %(ip)s
AND header_user_agent != '' AND header_user_agent IS NOT NULL
AND time >= now() - INTERVAL 24 HOUR
GROUP BY ua
ORDER BY count DESC
LIMIT 10
"""
ip_ua_result = db.query(ip_ua_query, {"ip": ip})
# Classification des UAs
def classify_ua(ua: str) -> str:
ua_lower = ua.lower()
if any(bot in ua_lower for bot in ['bot', 'crawler', 'spider', 'curl', 'wget', 'python', 'requests', 'scrapy']):
return 'bot'
if any(script in ua_lower for script in ['python', 'java', 'php', 'ruby', 'perl', 'node']):
return 'script'
if not ua or ua.strip() == '':
return 'script'
return 'normal'
# Calculer le total
total_count = sum(row[1] for row in ip_ua_result.result_rows)
ip_user_agents = [
UserAgentData(
value=row[0],
count=row[1],
percentage=round((row[1] / total_count * 100), 2) if total_count > 0 else 0.0,
classification=classify_ua(row[0])
)
for row in ip_ua_result.result_rows
]
# Pour les UAs du JA4, on retourne les mêmes pour l'instant
ja4_user_agents = ip_user_agents
# Pourcentage de bots
bot_count = sum(ua.count for ua in ip_user_agents if ua.classification in ['bot', 'script'])
bot_percentage = (bot_count / total_count * 100) if total_count > 0 else 0
return UserAgentAnalysis(
ip_user_agents=ip_user_agents,
ja4_user_agents=ja4_user_agents,
bot_percentage=bot_percentage,
alert=bot_percentage > 20
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")
# =============================================================================
# RECOMMANDATION DE CLASSIFICATION
# =============================================================================
@router.get("/{ip}/recommendation", response_model=ClassificationRecommendation)
async def get_classification_recommendation(ip: str):
"""
Génère une recommandation de classification basée sur les corrélations
"""
try:
# Récupérer les analyses
try:
subnet_analysis = await analyze_subnet(ip)
except:
subnet_analysis = None
try:
country_analysis = await analyze_country(1)
except:
country_analysis = None
try:
ja4_analysis = await analyze_ja4(ip)
except:
ja4_analysis = None
try:
ua_analysis = await analyze_user_agents(ip)
except:
ua_analysis = None
# Indicateurs par défaut
indicators = CorrelationIndicators(
subnet_ips_count=subnet_analysis.total_in_subnet if subnet_analysis else 0,
asn_ips_count=subnet_analysis.total_in_asn if subnet_analysis else 0,
country_percentage=0.0,
ja4_shared_ips=ja4_analysis.shared_ips_count if ja4_analysis else 0,
user_agents_count=len(ua_analysis.ja4_user_agents) if ua_analysis else 0,
bot_ua_percentage=ua_analysis.bot_percentage if ua_analysis else 0.0
)
# Score de confiance
score = 0.0
reasons = []
tags = []
# Subnet > 10 IPs
if subnet_analysis and subnet_analysis.total_in_subnet > 10:
score += 0.25
reasons.append(f"{subnet_analysis.total_in_subnet} IPs du même subnet")
tags.append("distributed")
# JA4 partagé > 50 IPs
if ja4_analysis and ja4_analysis.shared_ips_count > 50:
score += 0.25
reasons.append(f"{ja4_analysis.shared_ips_count} IPs avec même JA4")
tags.append("ja4-rotation")
# Bot UA > 20%
if ua_analysis and ua_analysis.bot_percentage > 20:
score += 0.25
reasons.append(f"{ua_analysis.bot_percentage:.0f}% UAs bots/scripts")
tags.append("bot-ua")
# Pays surreprésenté
if country_analysis and country_analysis.alert_country:
score += 0.15
reasons.append(f"Pays {country_analysis.alert_country} surreprésenté")
tags.append(f"country-{country_analysis.alert_country.lower()}")
# ASN hosting
if subnet_analysis:
hosting_keywords = ["ovh", "amazon", "aws", "google", "azure", "digitalocean", "linode", "vultr", "china169", "chinamobile"]
if any(kw in (subnet_analysis.asn_org or "").lower() for kw in hosting_keywords):
score += 0.10
tags.append("hosting-asn")
# Déterminer label
if score >= 0.7:
label = ClassificationLabel.MALICIOUS
tags.append("campaign")
elif score >= 0.4:
label = ClassificationLabel.SUSPICIOUS
else:
label = ClassificationLabel.LEGITIMATE
reason = " | ".join(reasons) if reasons else "Aucun indicateur fort"
return ClassificationRecommendation(
label=label,
confidence=min(score, 1.0),
indicators=indicators,
suggested_tags=tags,
reason=reason
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")
# =============================================================================
# CLASSIFICATIONS CRUD
# =============================================================================
@router.post("/classifications", response_model=Classification)
async def create_classification(data: ClassificationCreate):
"""
Crée une classification pour une IP ou un JA4
"""
try:
# Validation: soit ip, soit ja4 doit être fourni
if not data.ip and not data.ja4:
raise HTTPException(status_code=400, detail="IP ou JA4 requis")
query = """
INSERT INTO mabase_prod.classifications
(ip, ja4, label, tags, comment, confidence, features, analyst, created_at)
VALUES
(%(ip)s, %(ja4)s, %(label)s, %(tags)s, %(comment)s, %(confidence)s, %(features)s, %(analyst)s, now())
"""
db.query(query, {
"ip": data.ip or "",
"ja4": data.ja4 or "",
"label": data.label.value,
"tags": data.tags,
"comment": data.comment,
"confidence": data.confidence,
"features": json.dumps(data.features),
"analyst": data.analyst
})
# Récupérer la classification créée
where_clause = "ip = %(entity)s" if data.ip else "ja4 = %(entity)s"
select_query = f"""
SELECT ip, ja4, label, tags, comment, confidence, features, analyst, created_at
FROM mabase_prod.classifications
WHERE {where_clause}
ORDER BY created_at DESC
LIMIT 1
"""
result = db.query(select_query, {"entity": data.ip or data.ja4})
if not result.result_rows:
raise HTTPException(status_code=404, detail="Classification non trouvée")
row = result.result_rows[0]
return Classification(
ip=row[0] or None,
ja4=row[1] or None,
label=ClassificationLabel(row[2]),
tags=row[3],
comment=row[4],
confidence=row[5],
features=json.loads(row[6]) if row[6] else {},
analyst=row[7],
created_at=row[8]
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")
@router.get("/classifications", response_model=ClassificationsListResponse)
async def list_classifications(
ip: Optional[str] = Query(None, description="Filtrer par IP"),
ja4: Optional[str] = Query(None, description="Filtrer par JA4"),
label: Optional[str] = Query(None, description="Filtrer par label"),
limit: int = Query(100, ge=1, le=1000)
):
"""
Liste les classifications
"""
try:
where_clauses = ["1=1"]
params = {"limit": limit}
if ip:
where_clauses.append("ip = %(ip)s")
params["ip"] = ip
if ja4:
where_clauses.append("ja4 = %(ja4)s")
params["ja4"] = ja4
if label:
where_clauses.append("label = %(label)s")
params["label"] = label
where_clause = " AND ".join(where_clauses)
query = f"""
SELECT ip, ja4, label, tags, comment, confidence, features, analyst, created_at
FROM mabase_prod.classifications
WHERE {where_clause}
ORDER BY created_at DESC
LIMIT %(limit)s
"""
result = db.query(query, params)
classifications = [
Classification(
ip=row[0] or None,
ja4=row[1] or None,
label=ClassificationLabel(row[2]),
tags=row[3],
comment=row[4],
confidence=row[5],
features=json.loads(row[6]) if row[6] else {},
analyst=row[7],
created_at=row[8]
)
for row in result.result_rows
]
# Total
count_query = f"""
SELECT count()
FROM mabase_prod.classifications
WHERE {where_clause}
"""
count_result = db.query(count_query, params)
total = count_result.result_rows[0][0] if count_result.result_rows else 0
return ClassificationsListResponse(
items=classifications,
total=total
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")
@router.get("/classifications/stats")
async def get_classification_stats():
"""
Statistiques des classifications
"""
try:
stats_query = """
SELECT
label,
count() AS total,
uniq(ip) AS unique_ips,
avg(confidence) AS avg_confidence
FROM mabase_prod.classifications
GROUP BY label
ORDER BY total DESC
"""
result = db.query(stats_query)
stats = [
{
"label": row[0],
"total": row[1],
"unique_ips": row[2],
"avg_confidence": float(row[3]) if row[3] else 0.0
}
for row in result.result_rows
]
return {"stats": stats}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")

View File

@ -0,0 +1,92 @@
"""
Endpoints pour la liste des attributs uniques
"""
from fastapi import APIRouter, HTTPException, Query
from ..database import db
from ..models import AttributeListResponse, AttributeListItem
router = APIRouter(prefix="/api/attributes", tags=["attributes"])
@router.get("/{attr_type}", response_model=AttributeListResponse)
async def get_attributes(
attr_type: str,
limit: int = Query(100, ge=1, le=1000, description="Nombre maximum de résultats")
):
"""
Récupère la liste des valeurs uniques pour un type d'attribut
"""
try:
# Mapping des types vers les colonnes
type_column_map = {
"ip": "src_ip",
"ja4": "ja4",
"country": "country_code",
"asn": "asn_number",
"host": "host",
"threat_level": "threat_level",
"model_name": "model_name",
"asn_org": "asn_org"
}
if attr_type not in type_column_map:
raise HTTPException(
status_code=400,
detail=f"Type invalide. Types supportés: {', '.join(type_column_map.keys())}"
)
column = type_column_map[attr_type]
# Requête de base
base_query = f"""
SELECT
{column} AS value,
count() AS count
FROM ml_detected_anomalies
WHERE detected_at >= now() - INTERVAL 24 HOUR
"""
# Ajout du filtre pour exclure les valeurs vides/nulles
# Gestion spéciale pour les types IPv6/IPv4 qui ne peuvent pas être comparés à ''
if attr_type == "ip":
# Pour les adresses IP, on convertit en string et on filtre
query = f"""
SELECT value, count FROM (
SELECT toString({column}) AS value, count() AS count
FROM ml_detected_anomalies
WHERE detected_at >= now() - INTERVAL 24 HOUR
GROUP BY {column}
)
WHERE value != '' AND value IS NOT NULL
ORDER BY count DESC
LIMIT %(limit)s
"""
else:
query = f"""
{base_query}
AND {column} != '' AND {column} IS NOT NULL
GROUP BY value
ORDER BY count DESC
LIMIT %(limit)s
"""
result = db.query(query, {"limit": limit})
items = [
AttributeListItem(
value=str(row[0]),
count=row[1]
)
for row in result.result_rows
]
return AttributeListResponse(
type=attr_type,
items=items,
total=len(items)
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")

View File

@ -0,0 +1,294 @@
"""
Endpoints pour la liste des détections
"""
from fastapi import APIRouter, HTTPException, Query
from typing import Optional, List
from ..database import db
from ..models import DetectionsListResponse, Detection
router = APIRouter(prefix="/api/detections", tags=["detections"])
@router.get("", response_model=DetectionsListResponse)
async def get_detections(
page: int = Query(1, ge=1, description="Numéro de page"),
page_size: int = Query(25, ge=1, le=100, description="Nombre de lignes par page"),
threat_level: Optional[str] = Query(None, description="Filtrer par niveau de menace"),
model_name: Optional[str] = Query(None, description="Filtrer par modèle"),
country_code: Optional[str] = Query(None, description="Filtrer par pays"),
asn_number: Optional[str] = Query(None, description="Filtrer par ASN"),
search: Optional[str] = Query(None, description="Recherche texte (IP, JA4, Host)"),
sort_by: str = Query("detected_at", description="Trier par"),
sort_order: str = Query("DESC", description="Ordre (ASC/DESC)")
):
"""
Récupère la liste des détections avec pagination et filtres
"""
try:
# Construction de la requête
where_clauses = ["detected_at >= now() - INTERVAL 24 HOUR"]
params = {}
if threat_level:
where_clauses.append("threat_level = %(threat_level)s")
params["threat_level"] = threat_level
if model_name:
where_clauses.append("model_name = %(model_name)s")
params["model_name"] = model_name
if country_code:
where_clauses.append("country_code = %(country_code)s")
params["country_code"] = country_code.upper()
if asn_number:
where_clauses.append("asn_number = %(asn_number)s")
params["asn_number"] = asn_number
if search:
where_clauses.append(
"(src_ip ILIKE %(search)s OR ja4 ILIKE %(search)s OR host ILIKE %(search)s)"
)
params["search"] = f"%{search}%"
where_clause = " AND ".join(where_clauses)
# Requête de comptage
count_query = f"""
SELECT count()
FROM ml_detected_anomalies
WHERE {where_clause}
"""
count_result = db.query(count_query, params)
total = count_result.result_rows[0][0] if count_result.result_rows else 0
# Requête principale
offset = (page - 1) * page_size
# Validation du tri
valid_sort_columns = [
"detected_at", "src_ip", "threat_level", "anomaly_score",
"asn_number", "country_code", "hits", "hit_velocity"
]
if sort_by not in valid_sort_columns:
sort_by = "detected_at"
sort_order = "DESC" if sort_order.upper() == "DESC" else "ASC"
main_query = f"""
SELECT
detected_at,
src_ip,
ja4,
host,
bot_name,
anomaly_score,
threat_level,
model_name,
recurrence,
asn_number,
asn_org,
asn_detail,
asn_domain,
country_code,
asn_label,
hits,
hit_velocity,
fuzzing_index,
post_ratio,
reason
FROM ml_detected_anomalies
WHERE {where_clause}
ORDER BY {sort_by} {sort_order}
LIMIT %(limit)s OFFSET %(offset)s
"""
params["limit"] = page_size
params["offset"] = offset
result = db.query(main_query, params)
detections = [
Detection(
detected_at=row[0],
src_ip=str(row[1]),
ja4=row[2] or "",
host=row[3] or "",
bot_name=row[4] or "",
anomaly_score=float(row[5]) if row[5] else 0.0,
threat_level=row[6] or "LOW",
model_name=row[7] or "",
recurrence=row[8] or 0,
asn_number=str(row[9]) if row[9] else "",
asn_org=row[10] or "",
asn_detail=row[11] or "",
asn_domain=row[12] or "",
country_code=row[13] or "",
asn_label=row[14] or "",
hits=row[15] or 0,
hit_velocity=float(row[16]) if row[16] else 0.0,
fuzzing_index=float(row[17]) if row[17] else 0.0,
post_ratio=float(row[18]) if row[18] else 0.0,
reason=row[19] or ""
)
for row in result.result_rows
]
total_pages = (total + page_size - 1) // page_size
return DetectionsListResponse(
items=detections,
total=total,
page=page,
page_size=page_size,
total_pages=total_pages
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erreur lors de la récupération des détections: {str(e)}")
@router.get("/{detection_id}")
async def get_detection_details(detection_id: str):
"""
Récupère les détails d'une détection spécifique
detection_id peut être une IP ou un identifiant
"""
try:
query = """
SELECT
detected_at,
src_ip,
ja4,
host,
bot_name,
anomaly_score,
threat_level,
model_name,
recurrence,
asn_number,
asn_org,
asn_detail,
asn_domain,
country_code,
asn_label,
hits,
hit_velocity,
fuzzing_index,
post_ratio,
port_exhaustion_ratio,
orphan_ratio,
tcp_jitter_variance,
tcp_shared_count,
true_window_size,
window_mss_ratio,
alpn_http_mismatch,
is_alpn_missing,
sni_host_mismatch,
header_count,
has_accept_language,
has_cookie,
has_referer,
modern_browser_score,
ua_ch_mismatch,
header_order_shared_count,
ip_id_zero_ratio,
request_size_variance,
multiplexing_efficiency,
mss_mobile_mismatch,
correlated,
reason,
asset_ratio,
direct_access_ratio,
is_ua_rotating,
distinct_ja4_count,
src_port_density,
ja4_asn_concentration,
ja4_country_concentration,
is_rare_ja4
FROM ml_detected_anomalies
WHERE src_ip = %(ip)s
ORDER BY detected_at DESC
LIMIT 1
"""
result = db.query(query, {"ip": detection_id})
if not result.result_rows:
raise HTTPException(status_code=404, detail="Détection non trouvée")
row = result.result_rows[0]
return {
"detected_at": row[0],
"src_ip": str(row[1]),
"ja4": row[2] or "",
"host": row[3] or "",
"bot_name": row[4] or "",
"anomaly_score": float(row[5]) if row[5] else 0.0,
"threat_level": row[6] or "LOW",
"model_name": row[7] or "",
"recurrence": row[8] or 0,
"asn": {
"number": str(row[9]) if row[9] else "",
"org": row[10] or "",
"detail": row[11] or "",
"domain": row[12] or "",
"label": row[14] or ""
},
"country": {
"code": row[13] or "",
},
"metrics": {
"hits": row[15] or 0,
"hit_velocity": float(row[16]) if row[16] else 0.0,
"fuzzing_index": float(row[17]) if row[17] else 0.0,
"post_ratio": float(row[18]) if row[18] else 0.0,
"port_exhaustion_ratio": float(row[19]) if row[19] else 0.0,
"orphan_ratio": float(row[20]) if row[20] else 0.0,
},
"tcp": {
"jitter_variance": float(row[21]) if row[21] else 0.0,
"shared_count": row[22] or 0,
"true_window_size": row[23] or 0,
"window_mss_ratio": float(row[24]) if row[24] else 0.0,
},
"tls": {
"alpn_http_mismatch": bool(row[25]) if row[25] is not None else False,
"is_alpn_missing": bool(row[26]) if row[26] is not None else False,
"sni_host_mismatch": bool(row[27]) if row[27] is not None else False,
},
"headers": {
"count": row[28] or 0,
"has_accept_language": bool(row[29]) if row[29] is not None else False,
"has_cookie": bool(row[30]) if row[30] is not None else False,
"has_referer": bool(row[31]) if row[31] is not None else False,
"modern_browser_score": row[32] or 0,
"ua_ch_mismatch": bool(row[33]) if row[33] is not None else False,
"header_order_shared_count": row[34] or 0,
},
"behavior": {
"ip_id_zero_ratio": float(row[35]) if row[35] else 0.0,
"request_size_variance": float(row[36]) if row[36] else 0.0,
"multiplexing_efficiency": float(row[37]) if row[37] else 0.0,
"mss_mobile_mismatch": bool(row[38]) if row[38] is not None else False,
"correlated": bool(row[39]) if row[39] is not None else False,
},
"advanced": {
"asset_ratio": float(row[41]) if row[41] else 0.0,
"direct_access_ratio": float(row[42]) if row[42] else 0.0,
"is_ua_rotating": bool(row[43]) if row[43] is not None else False,
"distinct_ja4_count": row[44] or 0,
"src_port_density": float(row[45]) if row[45] else 0.0,
"ja4_asn_concentration": float(row[46]) if row[46] else 0.0,
"ja4_country_concentration": float(row[47]) if row[47] else 0.0,
"is_rare_ja4": bool(row[48]) if row[48] is not None else False,
},
"reason": row[40] or ""
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")

337
backend/routes/entities.py Normal file
View File

@ -0,0 +1,337 @@
"""
Routes pour l'investigation d'entités (IP, JA4, User-Agent, Client-Header, Host, Path, Query-Param)
"""
from fastapi import APIRouter, HTTPException, Query
from typing import Optional, List, Dict, Any
from datetime import datetime
import json
from ..database import db
from ..models import (
EntityInvestigation,
EntityStats,
EntityRelatedAttributes,
EntityAttributeValue
)
router = APIRouter(prefix="/api/entities", tags=["Entities"])
# NOTE: the redundant self-assignment ``db = db`` was removed — ``db`` is
# already imported at module level and the rebinding had no effect.
# Supported entity types: keys are the values accepted in the URL, values are
# the entity_type stored in the ClickHouse entities view (identical here).
ENTITY_TYPES = {
    'ip': 'ip',
    'ja4': 'ja4',
    'user_agent': 'user_agent',
    'client_header': 'client_header',
    'host': 'host',
    'path': 'path',
    'query_param': 'query_param'
}
def get_entity_stats(entity_type: str, entity_value: str, hours: int = 24) -> Optional[EntityStats]:
    """
    Fetch aggregate stats for one entity over the last *hours* hours.

    Returns None when the entity has no rows in the window.
    """
    sql = """
        SELECT
            entity_type,
            entity_value,
            sum(requests) as total_requests,
            sum(unique_ips) as unique_ips,
            min(log_date) as first_seen,
            max(log_date) as last_seen
        FROM mabase_prod.view_dashboard_entities
        WHERE entity_type = %(entity_type)s
          AND entity_value = %(entity_value)s
          AND log_date >= now() - INTERVAL %(hours)s HOUR
        GROUP BY entity_type, entity_value
    """
    rows = db.connect().query(sql, {
        'entity_type': entity_type,
        'entity_value': entity_value,
        'hours': hours
    }).result_rows
    if not rows:
        return None
    etype, evalue, total_requests, unique_ips, first_seen, last_seen = rows[0]
    return EntityStats(
        entity_type=etype,
        entity_value=evalue,
        total_requests=total_requests,
        unique_ips=unique_ips,
        first_seen=first_seen,
        last_seen=last_seen
    )
def get_related_attributes(entity_type: str, entity_value: str, hours: int = 24) -> EntityRelatedAttributes:
    """
    Collect the attributes correlated with one entity over the last *hours* hours.

    Runs five independent scalar subqueries against the entities view to gather
    the distinct IPs, JA4 fingerprints, hosts, ASNs and countries seen together
    with the entity; empty strings and empty arrays are filtered out in SQL.
    """
    # Each subquery repeats the same entity/time filter and is evaluated as a
    # scalar column of a single one-row result set. asns/countries are stored
    # as arrays in the view, hence groupUniqArrayArray to flatten-deduplicate.
    query = """
    SELECT
        (SELECT groupUniqArray(toString(src_ip)) FROM mabase_prod.view_dashboard_entities WHERE entity_type = %(entity_type)s AND entity_value = %(entity_value)s AND log_date >= now() - INTERVAL %(hours)s HOUR) as ips,
        (SELECT groupUniqArray(ja4) FROM mabase_prod.view_dashboard_entities WHERE entity_type = %(entity_type)s AND entity_value = %(entity_value)s AND log_date >= now() - INTERVAL %(hours)s HOUR AND ja4 != '') as ja4s,
        (SELECT groupUniqArray(host) FROM mabase_prod.view_dashboard_entities WHERE entity_type = %(entity_type)s AND entity_value = %(entity_value)s AND log_date >= now() - INTERVAL %(hours)s HOUR AND host != '') as hosts,
        (SELECT groupUniqArrayArray(asns) FROM mabase_prod.view_dashboard_entities WHERE entity_type = %(entity_type)s AND entity_value = %(entity_value)s AND log_date >= now() - INTERVAL %(hours)s HOUR AND notEmpty(asns)) as asns,
        (SELECT groupUniqArrayArray(countries) FROM mabase_prod.view_dashboard_entities WHERE entity_type = %(entity_type)s AND entity_value = %(entity_value)s AND log_date >= now() - INTERVAL %(hours)s HOUR AND notEmpty(countries)) as countries
    """
    result = db.connect().query(query, {
        'entity_type': entity_type,
        'entity_value': entity_value,
        'hours': hours
    })
    # No rows, or one row whose columns are all empty -> empty attribute sets.
    if not result.result_rows or not any(result.result_rows[0]):
        return EntityRelatedAttributes(
            ips=[],
            ja4s=[],
            hosts=[],
            asns=[],
            countries=[]
        )
    row = result.result_rows[0]
    # Defensive post-filtering: drop falsy entries and normalise IPs to str.
    return EntityRelatedAttributes(
        ips=[str(ip) for ip in (row[0] or []) if ip],
        ja4s=[ja4 for ja4 in (row[1] or []) if ja4],
        hosts=[host for host in (row[2] or []) if host],
        asns=[asn for asn in (row[3] or []) if asn],
        countries=[country for country in (row[4] or []) if country]
    )
def get_array_values(entity_type: str, entity_value: str, array_field: str, hours: int = 24) -> List[EntityAttributeValue]:
    """Flatten an Array column (user_agents, client_headers, ...) for one entity.

    Returns the distinct values with their counts and percentage of the total,
    capped at 100 rows.

    NOTE(review): `array_field` is interpolated into the SQL text; callers in
    this module only pass hard-coded column names — keep it that way.
    """
    query = f"""
        SELECT
            value,
            count() as count,
            round(count * 100.0 / sum(count) OVER (), 2) as percentage
        FROM (
            SELECT
                arrayJoin({array_field}) as value
            FROM mabase_prod.view_dashboard_entities
            WHERE entity_type = %(entity_type)s
              AND entity_value = %(entity_value)s
              AND log_date >= now() - INTERVAL %(hours)s HOUR
              AND notEmpty({array_field})
        )
        GROUP BY value
        ORDER BY count DESC
        LIMIT 100
    """
    rs = db.connect().query(query, {
        'entity_type': entity_type,
        'entity_value': entity_value,
        'hours': hours,
    })
    values: List[EntityAttributeValue] = []
    for value, count, percentage in rs.result_rows:
        values.append(EntityAttributeValue(value=value, count=count, percentage=percentage))
    return values
@router.get("/{entity_type}/{entity_value:path}", response_model=EntityInvestigation)
async def get_entity_investigation(
    entity_type: str,
    entity_value: str,
    hours: int = Query(default=24, ge=1, le=720, description="Fenêtre temporelle en heures")
):
    """
    Full investigation view for one entity.

    - **entity_type**: entity type (ip, ja4, user_agent, client_header, host, path, query_param)
    - **entity_value**: the entity value
    - **hours**: look-back window (default: 24h)

    The response bundles the general stats, the correlated attributes
    (IPs, JA4s, hosts, ASNs, countries) and the flattened array fields
    (User-Agents, Client-Headers, paths, query params).
    """
    # Reject unknown entity types before touching the database.
    if entity_type not in ENTITY_TYPES:
        raise HTTPException(
            status_code=400,
            detail=f"Type d'entité invalide. Types supportés: {', '.join(ENTITY_TYPES.keys())}"
        )
    stats = get_entity_stats(entity_type, entity_value, hours)
    if not stats:
        raise HTTPException(status_code=404, detail="Entité non trouvée")
    # All four Array-typed fields go through the same flattening helper.
    arrays = {
        field: get_array_values(entity_type, entity_value, field, hours)
        for field in ("user_agents", "client_headers", "paths", "query_params")
    }
    return EntityInvestigation(
        stats=stats,
        related=get_related_attributes(entity_type, entity_value, hours),
        user_agents=arrays["user_agents"],
        client_headers=arrays["client_headers"],
        paths=arrays["paths"],
        query_params=arrays["query_params"]
    )
@router.get("/{entity_type}/{entity_value:path}/related")
async def get_entity_related(
    entity_type: str,
    entity_value: str,
    hours: int = Query(default=24, ge=1, le=720)
):
    """Return only the correlated attributes (IPs, JA4s, hosts, ASNs, countries)."""
    if entity_type not in ENTITY_TYPES:
        raise HTTPException(
            status_code=400,
            detail=f"Type d'entité invalide. Types supportés: {', '.join(ENTITY_TYPES.keys())}"
        )
    payload = {
        "entity_type": entity_type,
        "entity_value": entity_value,
        "hours": hours,
    }
    payload["related"] = get_related_attributes(entity_type, entity_value, hours)
    return payload
@router.get("/{entity_type}/{entity_value:path}/user_agents")
async def get_entity_user_agents(
    entity_type: str,
    entity_value: str,
    hours: int = Query(default=24, ge=1, le=720)
):
    """Return the User-Agents observed for one entity."""
    if entity_type not in ENTITY_TYPES:
        raise HTTPException(status_code=400, detail="Type d'entité invalide")
    items = get_array_values(entity_type, entity_value, 'user_agents', hours)
    payload = {"entity_type": entity_type, "entity_value": entity_value}
    payload["user_agents"] = items
    payload["total"] = len(items)
    return payload
@router.get("/{entity_type}/{entity_value:path}/client_headers")
async def get_entity_client_headers(
    entity_type: str,
    entity_value: str,
    hours: int = Query(default=24, ge=1, le=720)
):
    """Return the Client-Headers observed for one entity."""
    if entity_type not in ENTITY_TYPES:
        raise HTTPException(status_code=400, detail="Type d'entité invalide")
    items = get_array_values(entity_type, entity_value, 'client_headers', hours)
    payload = {"entity_type": entity_type, "entity_value": entity_value}
    payload["client_headers"] = items
    payload["total"] = len(items)
    return payload
@router.get("/{entity_type}/{entity_value:path}/paths")
async def get_entity_paths(
    entity_type: str,
    entity_value: str,
    hours: int = Query(default=24, ge=1, le=720)
):
    """Return the URL paths observed for one entity."""
    if entity_type not in ENTITY_TYPES:
        raise HTTPException(status_code=400, detail="Type d'entité invalide")
    items = get_array_values(entity_type, entity_value, 'paths', hours)
    payload = {"entity_type": entity_type, "entity_value": entity_value}
    payload["paths"] = items
    payload["total"] = len(items)
    return payload
@router.get("/{entity_type}/{entity_value:path}/query_params")
async def get_entity_query_params(
    entity_type: str,
    entity_value: str,
    hours: int = Query(default=24, ge=1, le=720)
):
    """Return the query parameters observed for one entity."""
    if entity_type not in ENTITY_TYPES:
        raise HTTPException(status_code=400, detail="Type d'entité invalide")
    items = get_array_values(entity_type, entity_value, 'query_params', hours)
    payload = {"entity_type": entity_type, "entity_value": entity_value}
    payload["query_params"] = items
    payload["total"] = len(items)
    return payload
@router.get("/types")
async def get_entity_types():
    """List the supported entity types, each with a short description."""
    descriptions = {
        "ip": "Adresse IP source",
        "ja4": "Fingerprint JA4 TLS",
        "user_agent": "User-Agent HTTP",
        "client_header": "Client Header HTTP",
        "host": "Host HTTP",
        "path": "Path URL",
        "query_param": "Paramètres de query (noms concaténés)"
    }
    return {
        "entity_types": list(ENTITY_TYPES.values()),
        "descriptions": descriptions,
    }

122
backend/routes/metrics.py Normal file
View File

@ -0,0 +1,122 @@
"""
Endpoints pour les métriques du dashboard
"""
from fastapi import APIRouter, HTTPException
from ..database import db
from ..models import MetricsResponse, MetricsSummary, TimeSeriesPoint
router = APIRouter(prefix="/api/metrics", tags=["metrics"])
@router.get("", response_model=MetricsResponse)
async def get_metrics():
    """
    Global dashboard metrics over the last 24 hours.

    Returns:
        MetricsResponse with a summary (per-threat-level counts, known bots vs
        anomalies, unique IPs), an hourly time series, and the threat-level
        distribution.

    Raises:
        HTTPException 404 when no detection rows exist in the window,
        HTTPException 500 on database errors.
    """
    try:
        # Aggregated counters over the 24h window.
        summary_query = """
            SELECT
                count() AS total_detections,
                countIf(threat_level = 'CRITICAL') AS critical_count,
                countIf(threat_level = 'HIGH') AS high_count,
                countIf(threat_level = 'MEDIUM') AS medium_count,
                countIf(threat_level = 'LOW') AS low_count,
                countIf(bot_name != '') AS known_bots_count,
                countIf(bot_name = '') AS anomalies_count,
                uniq(src_ip) AS unique_ips
            FROM ml_detected_anomalies
            WHERE detected_at >= now() - INTERVAL 24 HOUR
        """
        summary_result = db.query(summary_query)
        summary_row = summary_result.result_rows[0] if summary_result.result_rows else None
        if not summary_row:
            raise HTTPException(status_code=404, detail="Aucune donnée disponible")
        summary = MetricsSummary(
            total_detections=summary_row[0],
            critical_count=summary_row[1],
            high_count=summary_row[2],
            medium_count=summary_row[3],
            low_count=summary_row[4],
            known_bots_count=summary_row[5],
            anomalies_count=summary_row[6],
            unique_ips=summary_row[7]
        )
        # Hourly time series for the 24h chart.
        timeseries_query = """
            SELECT
                toStartOfHour(detected_at) AS hour,
                count() AS total,
                countIf(threat_level = 'CRITICAL') AS critical,
                countIf(threat_level = 'HIGH') AS high,
                countIf(threat_level = 'MEDIUM') AS medium,
                countIf(threat_level = 'LOW') AS low
            FROM ml_detected_anomalies
            WHERE detected_at >= now() - INTERVAL 24 HOUR
            GROUP BY hour
            ORDER BY hour
        """
        timeseries_result = db.query(timeseries_query)
        timeseries = [
            TimeSeriesPoint(
                hour=row[0],
                total=row[1],
                critical=row[2],
                high=row[3],
                medium=row[4],
                low=row[5]
            )
            for row in timeseries_result.result_rows
        ]
        # Distribution reuses the summary counters: no extra query needed.
        threat_distribution = {
            "CRITICAL": summary.critical_count,
            "HIGH": summary.high_count,
            "MEDIUM": summary.medium_count,
            "LOW": summary.low_count
        }
        return MetricsResponse(
            summary=summary,
            timeseries=timeseries,
            threat_distribution=threat_distribution
        )
    except HTTPException:
        # Bug fix: HTTPException subclasses Exception, so the 404 raised above
        # was previously swallowed by the generic handler below and surfaced
        # as a 500. Re-raise it untouched (matches the other routers).
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Erreur lors de la récupération des métriques: {str(e)}")
@router.get("/threats")
async def get_threat_distribution():
    """Breakdown of detections per threat level over the last 24 hours."""
    try:
        query = """
            SELECT
                threat_level,
                count() AS count,
                round(count() * 100.0 / sum(count()) OVER (), 2) AS percentage
            FROM ml_detected_anomalies
            WHERE detected_at >= now() - INTERVAL 24 HOUR
            GROUP BY threat_level
            ORDER BY count DESC
        """
        items = []
        for level, cnt, pct in db.query(query).result_rows:
            items.append({"threat_level": level, "count": cnt, "percentage": pct})
        return {"items": items}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")

View File

@ -0,0 +1,629 @@
"""
Endpoints pour la variabilité des attributs
"""
from fastapi import APIRouter, HTTPException, Query
from typing import Optional
from ..database import db
from ..models import (
VariabilityResponse, VariabilityAttributes, AttributeValue, Insight,
UserAgentsResponse, UserAgentValue
)
router = APIRouter(prefix="/api/variability", tags=["variability"])
# =============================================================================
# ROUTES SPÉCIFIQUES (doivent être avant les routes génériques)
# =============================================================================
@router.get("/{attr_type}/{value:path}/ips", response_model=dict)
async def get_associated_ips(
    attr_type: str,
    value: str,
    limit: int = Query(100, ge=1, le=1000, description="Nombre maximum d'IPs")
):
    """Return the source IPs associated with an attribute value (last 24h)."""
    try:
        # Whitelist: attribute type -> column. Only these names may reach the
        # f-string SQL below.
        type_column_map = {
            "ip": "src_ip",
            "ja4": "ja4",
            "country": "country_code",
            "asn": "asn_number",
            "host": "host",
        }
        if attr_type not in type_column_map:
            raise HTTPException(
                status_code=400,
                detail=f"Type invalide. Types supportés: {', '.join(type_column_map.keys())}"
            )
        column = type_column_map[attr_type]
        ip_rows = db.query(
            f"""
            SELECT DISTINCT src_ip
            FROM ml_detected_anomalies
            WHERE {column} = %(value)s
              AND detected_at >= now() - INTERVAL 24 HOUR
            ORDER BY src_ip
            LIMIT %(limit)s
            """,
            {"value": value, "limit": limit},
        ).result_rows
        ips = [str(r[0]) for r in ip_rows]
        # Distinct-IP total, not capped by LIMIT.
        total_rows = db.query(
            f"""
            SELECT uniq(src_ip) AS total
            FROM ml_detected_anomalies
            WHERE {column} = %(value)s
              AND detected_at >= now() - INTERVAL 24 HOUR
            """,
            {"value": value},
        ).result_rows
        total = total_rows[0][0] if total_rows else 0
        return {
            "type": attr_type,
            "value": value,
            "ips": ips,
            "total": total,
            "showing": len(ips),
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")
@router.get("/{attr_type}/{value:path}/attributes", response_model=dict)
async def get_associated_attributes(
    attr_type: str,
    value: str,
    target_attr: str = Query(..., description="Type d'attribut à récupérer (user_agents, ja4, countries, asns, hosts)"),
    limit: int = Query(50, ge=1, le=500, description="Nombre maximum de résultats")
):
    """
    List the values of one attribute type associated with another attribute
    (e.g. the JA4 fingerprints seen for a given country), with per-value
    counts and percentages over the last 24 hours.
    """
    try:
        # Whitelist mapping: source attribute type -> ClickHouse column.
        # Only these values may ever reach the f-string SQL below.
        type_column_map = {
            "ip": "src_ip",
            "ja4": "ja4",
            "country": "country_code",
            "asn": "asn_number",
            "host": "host",
        }
        # Target attribute -> column. "''" is a sentinel meaning "not stored
        # in this table" (ml_detected_anomalies has no user_agent column).
        target_column_map = {
            "user_agents": "''",  # no user_agent column — handled as empty below
            "ja4": "ja4",
            "countries": "country_code",
            "asns": "asn_number",
            "hosts": "host",
        }
        if attr_type not in type_column_map:
            raise HTTPException(status_code=400, detail=f"Type '{attr_type}' invalide")
        if target_attr not in target_column_map:
            raise HTTPException(
                status_code=400,
                detail=f"Attribut cible invalide. Supportés: {', '.join(target_column_map.keys())}"
            )
        column = type_column_map[attr_type]
        target_column = target_column_map[target_attr]
        # Sentinel hit: skip the query entirely and return an empty result set.
        if target_column == "''":
            return {"type": attr_type, "value": value, "target": target_attr, "items": [], "total": 0}
        query = f"""
        SELECT
            {target_column} AS value,
            count() AS count,
            round(count() * 100.0 / sum(count()) OVER (), 2) AS percentage
        FROM ml_detected_anomalies
        WHERE {column} = %(value)s
          AND {target_column} != '' AND {target_column} IS NOT NULL
          AND detected_at >= now() - INTERVAL 24 HOUR
        GROUP BY value
        ORDER BY count DESC
        LIMIT %(limit)s
        """
        result = db.query(query, {"value": value, "limit": limit})
        items = [
            {
                "value": str(row[0]),
                "count": row[1],
                "percentage": round(float(row[2]), 2) if row[2] else 0.0
            }
            for row in result.result_rows
        ]
        # Distinct target-value total, not capped by LIMIT.
        count_query = f"""
        SELECT uniq({target_column}) AS total
        FROM ml_detected_anomalies
        WHERE {column} = %(value)s
          AND {target_column} != '' AND {target_column} IS NOT NULL
          AND detected_at >= now() - INTERVAL 24 HOUR
        """
        count_result = db.query(count_query, {"value": value})
        total = count_result.result_rows[0][0] if count_result.result_rows else 0
        return {
            "type": attr_type,
            "value": value,
            "target": target_attr,
            "items": items,
            "total": total,
            "showing": len(items)
        }
    except HTTPException:
        # Propagate explicit 4xx responses untouched.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")
@router.get("/{attr_type}/{value:path}/user_agents", response_model=UserAgentsResponse)
async def get_user_agents(
    attr_type: str,
    value: str,
    limit: int = Query(100, ge=1, le=500, description="Nombre maximum de user-agents")
):
    """
    List the User-Agents associated with an attribute value (IP, JA4,
    country, ...), read from the materialized view
    view_dashboard_user_agents over the last 24 hours.
    """
    try:
        # Whitelist mapping: attribute type -> view column. NOTE: the column
        # names differ from ml_detected_anomalies (src_country_code, src_asn).
        type_column_map = {
            "ip": "src_ip",
            "ja4": "ja4",
            "country": "src_country_code",
            "asn": "src_asn",
            "host": "host",
        }
        if attr_type not in type_column_map:
            raise HTTPException(
                status_code=400,
                detail=f"Type invalide. Types supportés: {', '.join(type_column_map.keys())}"
            )
        column = type_column_map[attr_type]
        # user_agents is an Array column: ARRAY JOIN flattens it to one row
        # per (hour, user_agent) before aggregation.
        # NOTE(review): `count * 100.0 / sum(count) OVER ()` reuses the
        # SELECT alias `count` — a ClickHouse-specific extension; confirm the
        # percentage base is the one intended.
        query = f"""
        SELECT
            ua AS user_agent,
            sum(requests) AS count,
            round(count * 100.0 / sum(count) OVER (), 2) AS percentage,
            min(hour) AS first_seen,
            max(hour) AS last_seen
        FROM mabase_prod.view_dashboard_user_agents
        ARRAY JOIN user_agents AS ua
        WHERE {column} = %(value)s
          AND hour >= now() - INTERVAL 24 HOUR
        GROUP BY user_agent
        ORDER BY count DESC
        LIMIT %(limit)s
        """
        result = db.query(query, {"value": value, "limit": limit})
        user_agents = [
            UserAgentValue(
                value=str(row[0]),
                count=row[1] or 0,
                percentage=round(float(row[2]), 2) if row[2] else 0.0,
                first_seen=row[3] if len(row) > 3 and row[3] else None,
                last_seen=row[4] if len(row) > 4 and row[4] else None,
            )
            for row in result.result_rows
        ]
        # Distinct User-Agent total, not capped by LIMIT.
        count_query = f"""
        SELECT uniq(ua) AS total
        FROM mabase_prod.view_dashboard_user_agents
        ARRAY JOIN user_agents AS ua
        WHERE {column} = %(value)s
          AND hour >= now() - INTERVAL 24 HOUR
        """
        count_result = db.query(count_query, {"value": value})
        total = count_result.result_rows[0][0] if count_result.result_rows else 0
        return {
            "type": attr_type,
            "value": value,
            "user_agents": user_agents,
            "total": total,
            "showing": len(user_agents)
        }
    except HTTPException:
        # Propagate explicit 4xx responses untouched.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")
# =============================================================================
# ROUTE GÉNÉRIQUE (doit être en dernier)
# =============================================================================
def get_attribute_value(row, count_idx: int, percentage_idx: int,
                        first_seen_idx: Optional[int] = None,
                        last_seen_idx: Optional[int] = None,
                        threat_idx: Optional[int] = None,
                        unique_ips_idx: Optional[int] = None) -> AttributeValue:
    """Build an AttributeValue from a raw ClickHouse result row.

    Optional indexes yield None when absent or out of range for the row.
    """
    def _cell(idx):
        # Safe positional access into the row.
        if idx is None or len(row) <= idx:
            return None
        return row[idx]

    pct = row[percentage_idx]
    threats = _cell(threat_idx)
    return AttributeValue(
        value=str(row[0]),
        count=row[count_idx] or 0,
        percentage=round(float(pct), 2) if pct else 0.0,
        first_seen=_cell(first_seen_idx),
        last_seen=_cell(last_seen_idx),
        threat_levels=_parse_threat_levels(threats) if threats else None,
        unique_ips=_cell(unique_ips_idx),
        primary_threat=_get_primary_threat(threats) if threats else None,
    )
def _parse_threat_levels(threat_str: str) -> dict:
"""Parse une chaîne de type 'CRITICAL:5,HIGH:10' en dict"""
if not threat_str:
return {}
result = {}
for part in str(threat_str).split(','):
if ':' in part:
level, count = part.strip().split(':')
result[level.strip()] = int(count.strip())
return result
def _get_primary_threat(threat_str: str) -> str:
"""Retourne le niveau de menace principal"""
if not threat_str:
return ""
levels_order = ["CRITICAL", "HIGH", "MEDIUM", "LOW"]
for level in levels_order:
if level in str(threat_str):
return level
return ""
def _generate_insights(attr_type: str, value: str, attributes: VariabilityAttributes,
                       total_detections: int, unique_ips: int) -> list:
    """Derive SOC-oriented insights from the aggregated variability data.

    Pure heuristics over the already-aggregated attribute lists: 'warning'
    entries flag likely bot behaviour, 'info' entries add context. `value` is
    kept in the signature for interface stability even though the current
    heuristics do not use it (the original computed an unused local from it;
    that dead code is removed here).
    """
    insights = []
    # Several distinct User-Agents behind one entity suggests UA rotation.
    if len(attributes.user_agents) > 1:
        insights.append(Insight(
            type="warning",
            message=f"{len(attributes.user_agents)} User-Agents différents → Possible rotation/obfuscation"
        ))
    # Several TLS fingerprints likewise suggests fingerprint rotation.
    if len(attributes.ja4) > 1:
        insights.append(Insight(
            type="warning",
            message=f"{len(attributes.ja4)} JA4 fingerprints différents → Possible rotation de fingerprint"
        ))
    # Many source IPs behind a non-IP selection hints at distributed infra.
    if attr_type != "ip" and unique_ips > 10:
        insights.append(Insight(
            type="info",
            message=f"{unique_ips} IPs différentes associées → Possible infrastructure distribuée"
        ))
    # A single ASN whose label matches a known hosting/cloud provider.
    if len(attributes.asns) == 1 and attributes.asns[0].value:
        hosting_keywords = ["ovh", "amazon", "aws", "google", "azure", "digitalocean", "linode", "vultr"]
        if any(kw in (attributes.asns[0].value or "").lower() for kw in hosting_keywords):
            insights.append(Insight(
                type="warning",
                message="ASN de type hosting/cloud → Souvent utilisé pour des bots"
            ))
    # Wide geographic spread.
    if len(attributes.countries) > 5:
        insights.append(Insight(
            type="info",
            message=f"Présent dans {len(attributes.countries)} pays → Distribution géographique large"
        ))
    # Threat-level proportions (CRITICAL takes precedence over HIGH).
    critical_count = 0
    high_count = 0
    for tl in attributes.threat_levels:
        if tl.value == "CRITICAL":
            critical_count = tl.count
        elif tl.value == "HIGH":
            high_count = tl.count
    if critical_count > total_detections * 0.3:
        insights.append(Insight(
            type="warning",
            message=f"{round(critical_count * 100 / total_detections)}% de détections CRITICAL → Menace sévère"
        ))
    elif high_count > total_detections * 0.5:
        insights.append(Insight(
            type="info",
            message=f"{round(high_count * 100 / total_detections)}% de détections HIGH → Activité suspecte"
        ))
    return insights
@router.get("/{attr_type}/{value:path}", response_model=VariabilityResponse)
async def get_variability(attr_type: str, value: str):
    """
    Variability report for one attribute value over the last 24 hours.

    attr_type: ip, ja4, country, asn, host, user_agent
    value: the value to investigate

    Aggregates the distribution of every correlated attribute (User-Agents,
    JA4, countries, ASNs, hosts, threat levels, model names) plus derived
    insights. Raises 404 when no detection matches, 400 for an unknown type.

    Change vs original: an unused local (`ua_query`, a more complex variant of
    the user-agent query that was built but never executed) has been removed.
    """
    try:
        # Whitelist: attribute type -> ClickHouse column; the column name is
        # interpolated in the SQL below, so it must come from this map only.
        type_column_map = {
            "ip": "src_ip",
            "ja4": "ja4",
            "country": "country_code",
            "asn": "asn_number",
            "host": "host",
            "user_agent": "header_user_agent"
        }
        if attr_type not in type_column_map:
            raise HTTPException(
                status_code=400,
                detail=f"Type invalide. Types supportés: {', '.join(type_column_map.keys())}"
            )
        column = type_column_map[attr_type]
        # Base sub-select shared by every aggregation below.
        # NOTE(review): user_agent is hard-coded to '' (the table has no such
        # column), so the user_agents section of the response is always empty
        # for this data source — confirm whether that is intended.
        base_query = f"""
        SELECT *
        FROM (
            SELECT
                detected_at,
                src_ip,
                ja4,
                host,
                '' AS user_agent,
                country_code,
                asn_number,
                asn_org,
                threat_level,
                model_name,
                anomaly_score
            FROM ml_detected_anomalies
            WHERE {column} = %(value)s
              AND detected_at >= now() - INTERVAL 24 HOUR
        )
        """
        # Global stats; also serves as the existence check (404 below).
        stats_query = f"""
        SELECT
            count() AS total_detections,
            uniq(src_ip) AS unique_ips,
            min(detected_at) AS first_seen,
            max(detected_at) AS last_seen
        FROM ml_detected_anomalies
        WHERE {column} = %(value)s
          AND detected_at >= now() - INTERVAL 24 HOUR
        """
        stats_result = db.query(stats_query, {"value": value})
        if not stats_result.result_rows or stats_result.result_rows[0][0] == 0:
            raise HTTPException(status_code=404, detail="Aucune donnée trouvée")
        stats_row = stats_result.result_rows[0]
        total_detections = stats_row[0]
        unique_ips = stats_row[1]
        first_seen = stats_row[2]
        last_seen = stats_row[3]
        # User-Agents (always empty today — see NOTE on base_query).
        ua_query = f"""
        SELECT
            user_agent,
            count() AS count,
            round(count() * 100.0 / (SELECT count() FROM ({base_query}) WHERE user_agent != '' AND user_agent IS NOT NULL), 2) AS percentage,
            min(detected_at) AS first_seen,
            max(detected_at) AS last_seen
        FROM ({base_query})
        WHERE user_agent != '' AND user_agent IS NOT NULL
        GROUP BY user_agent
        ORDER BY count DESC
        LIMIT 10
        """
        ua_result = db.query(ua_query, {"value": value})
        user_agents = [get_attribute_value(row, 1, 2, 3, 4) for row in ua_result.result_rows]
        # JA4 fingerprints.
        ja4_query = f"""
        SELECT
            ja4,
            count() AS count,
            round(count() * 100.0 / (SELECT count() FROM ({base_query})), 2) AS percentage,
            min(detected_at) AS first_seen,
            max(detected_at) AS last_seen
        FROM ({base_query})
        WHERE ja4 != '' AND ja4 IS NOT NULL
        GROUP BY ja4
        ORDER BY count DESC
        LIMIT 10
        """
        ja4_result = db.query(ja4_query, {"value": value})
        ja4s = [get_attribute_value(row, 1, 2, 3, 4) for row in ja4_result.result_rows]
        # Countries.
        country_query = f"""
        SELECT
            country_code,
            count() AS count,
            round(count() * 100.0 / (SELECT count() FROM ({base_query})), 2) AS percentage
        FROM ({base_query})
        WHERE country_code != '' AND country_code IS NOT NULL
        GROUP BY country_code
        ORDER BY count DESC
        LIMIT 10
        """
        country_result = db.query(country_query, {"value": value})
        countries = [get_attribute_value(row, 1, 2) for row in country_result.result_rows]
        # ASNs, displayed as "AS<number> - <org>".
        # NOTE(review): asn_number is compared against '' and '0' strings —
        # presumably stored as a string column; confirm against the schema.
        asn_query = f"""
        SELECT
            concat('AS', toString(asn_number), ' - ', asn_org) AS asn_display,
            asn_number,
            count() AS count,
            round(count() * 100.0 / (SELECT count() FROM ({base_query})), 2) AS percentage
        FROM ({base_query})
        WHERE asn_number != '' AND asn_number IS NOT NULL AND asn_number != '0'
        GROUP BY asn_display, asn_number
        ORDER BY count DESC
        LIMIT 10
        """
        asn_result = db.query(asn_query, {"value": value})
        asns = [
            AttributeValue(
                value=str(row[0]),
                count=row[2] or 0,
                percentage=round(float(row[3]), 2) if row[3] else 0.0
            )
            for row in asn_result.result_rows
        ]
        # Hosts.
        host_query = f"""
        SELECT
            host,
            count() AS count,
            round(count() * 100.0 / (SELECT count() FROM ({base_query})), 2) AS percentage
        FROM ({base_query})
        WHERE host != '' AND host IS NOT NULL
        GROUP BY host
        ORDER BY count DESC
        LIMIT 10
        """
        host_result = db.query(host_query, {"value": value})
        hosts = [get_attribute_value(row, 1, 2) for row in host_result.result_rows]
        # Threat levels, ordered by severity rather than count.
        threat_query = f"""
        SELECT
            threat_level,
            count() AS count,
            round(count() * 100.0 / (SELECT count() FROM ({base_query})), 2) AS percentage
        FROM ({base_query})
        WHERE threat_level != '' AND threat_level IS NOT NULL
        GROUP BY threat_level
        ORDER BY
            CASE threat_level
                WHEN 'CRITICAL' THEN 1
                WHEN 'HIGH' THEN 2
                WHEN 'MEDIUM' THEN 3
                WHEN 'LOW' THEN 4
                ELSE 5
            END
        """
        threat_result = db.query(threat_query, {"value": value})
        threat_levels = [get_attribute_value(row, 1, 2) for row in threat_result.result_rows]
        # Detection model names.
        model_query = f"""
        SELECT
            model_name,
            count() AS count,
            round(count() * 100.0 / (SELECT count() FROM ({base_query})), 2) AS percentage
        FROM ({base_query})
        WHERE model_name != '' AND model_name IS NOT NULL
        GROUP BY model_name
        ORDER BY count DESC
        """
        model_result = db.query(model_query, {"value": value})
        model_names = [get_attribute_value(row, 1, 2) for row in model_result.result_rows]
        # Assemble the response.
        attributes = VariabilityAttributes(
            user_agents=user_agents,
            ja4=ja4s,
            countries=countries,
            asns=asns,
            hosts=hosts,
            threat_levels=threat_levels,
            model_names=model_names
        )
        insights = _generate_insights(attr_type, value, attributes, total_detections, unique_ips)
        return VariabilityResponse(
            type=attr_type,
            value=value,
            total_detections=total_detections,
            unique_ips=unique_ips,
            date_range={
                "first_seen": first_seen,
                "last_seen": last_seen
            },
            attributes=attributes,
            insights=insights
        )
    except HTTPException:
        # Propagate explicit 4xx responses untouched.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")