Files
dashboard/backend/routes/entities.py
SOC Analyst ee2b24b277 fix: Subnet investigation - Récupération des user-agents depuis view_dashboard_entities
- Utilisation de 2 requêtes séparées + fusion en Python
- 1ère requête: ml_detected_anomalies pour les détections récentes
- 2ème requête: view_dashboard_entities avec IN clause pour les user-agents
- La clause IN permet d'utiliser l'index ClickHouse (splitByChar ne l'utilise pas)
- PREWHERE optimise les performances de requête

Problème résolu:
- unique_ua était toujours à 0 car la jointure LEFT JOIN ne fonctionnait pas
- La solution avec IN clause fonctionne car elle utilise l'index sur entity_value

Testé avec 141.98.11.0/24:
- 5 IPs, 8 détections, 65 user-agents uniques
- 141.98.11.209: 68 user-agents différents

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
2026-03-15 19:41:48 +01:00

521 lines
17 KiB
Python

"""
Routes pour l'investigation d'entités (IP, JA4, User-Agent, Client-Header, Host, Path, Query-Param)
"""
from fastapi import APIRouter, HTTPException, Query
from typing import Optional, List, Dict, Any
from datetime import datetime
import json
from ..database import db
from ..models import (
EntityInvestigation,
EntityStats,
EntityRelatedAttributes,
EntityAttributeValue
)
# Router for the entity-investigation endpoints; every route declared on it
# is served under the /api/entities prefix. The imported `db` handle is used
# as-is (the former `db = db` self-assignment was a no-op and has been dropped).
router = APIRouter(prefix="/api/entities", tags=["Entities"])
# Supported entity types. The mapping is an identity mapping (each key maps
# to itself); insertion order is the order in which the types are listed.
ENTITY_TYPES = {
    name: name
    for name in (
        'ip',
        'ja4',
        'user_agent',
        'client_header',
        'host',
        'path',
        'query_param',
    )
}
def get_entity_stats(entity_type: str, entity_value: str, hours: int = 24) -> Optional[EntityStats]:
    """
    Fetch the aggregated statistics of a single entity.

    Rows of ``mabase_prod.view_dashboard_entities`` matching the entity type
    and value over the last ``hours`` hours are grouped into one row holding
    the summed request count, the summed ``unique_ips`` column and the
    earliest / latest ``log_date``.

    Returns:
        An ``EntityStats`` built from that row, or ``None`` when the query
        returns no row.
    """
    sql = """
SELECT
entity_type,
entity_value,
sum(requests) as total_requests,
sum(unique_ips) as unique_ips,
min(log_date) as first_seen,
max(log_date) as last_seen
FROM mabase_prod.view_dashboard_entities
WHERE entity_type = %(entity_type)s
AND entity_value = %(entity_value)s
AND log_date >= now() - INTERVAL %(hours)s HOUR
GROUP BY entity_type, entity_value
"""
    params = {
        'entity_type': entity_type,
        'entity_value': entity_value,
        'hours': hours
    }
    rows = db.connect().query(sql, params).result_rows
    if not rows:
        return None

    # Column order of the SELECT above, paired with the EntityStats fields.
    field_names = (
        'entity_type',
        'entity_value',
        'total_requests',
        'unique_ips',
        'first_seen',
        'last_seen',
    )
    return EntityStats(**dict(zip(field_names, rows[0])))
def get_related_attributes(entity_type: str, entity_value: str, hours: int = 24) -> EntityRelatedAttributes:
    """
    Collect the attributes seen together with an entity.

    One query made of five scalar sub-selects gathers, over the last
    ``hours`` hours, the distinct source IPs, JA4 fingerprints, hosts, ASNs
    and countries recorded for the entity in
    ``mabase_prod.view_dashboard_entities``.

    Returns:
        An ``EntityRelatedAttributes`` whose lists contain only truthy
        values; all five lists are empty when nothing was found.
    """
    # Single query aggregating every related attribute
    sql = """
SELECT
(SELECT groupUniqArray(toString(src_ip)) FROM mabase_prod.view_dashboard_entities WHERE entity_type = %(entity_type)s AND entity_value = %(entity_value)s AND log_date >= now() - INTERVAL %(hours)s HOUR) as ips,
(SELECT groupUniqArray(ja4) FROM mabase_prod.view_dashboard_entities WHERE entity_type = %(entity_type)s AND entity_value = %(entity_value)s AND log_date >= now() - INTERVAL %(hours)s HOUR AND ja4 != '') as ja4s,
(SELECT groupUniqArray(host) FROM mabase_prod.view_dashboard_entities WHERE entity_type = %(entity_type)s AND entity_value = %(entity_value)s AND log_date >= now() - INTERVAL %(hours)s HOUR AND host != '') as hosts,
(SELECT groupUniqArrayArray(asns) FROM mabase_prod.view_dashboard_entities WHERE entity_type = %(entity_type)s AND entity_value = %(entity_value)s AND log_date >= now() - INTERVAL %(hours)s HOUR AND notEmpty(asns)) as asns,
(SELECT groupUniqArrayArray(countries) FROM mabase_prod.view_dashboard_entities WHERE entity_type = %(entity_type)s AND entity_value = %(entity_value)s AND log_date >= now() - INTERVAL %(hours)s HOUR AND notEmpty(countries)) as countries
"""
    params = {
        'entity_type': entity_type,
        'entity_value': entity_value,
        'hours': hours
    }
    rows = db.connect().query(sql, params).result_rows

    if rows and any(rows[0]):
        first = rows[0]

        def non_empty(index):
            # Treat a missing column as an empty list and drop falsy entries.
            return [item for item in (first[index] or []) if item]

        return EntityRelatedAttributes(
            ips=[str(address) for address in non_empty(0)],
            ja4s=non_empty(1),
            hosts=non_empty(2),
            asns=non_empty(3),
            countries=non_empty(4)
        )

    # No row, or every column empty: return an all-empty result.
    return EntityRelatedAttributes(ips=[], ja4s=[], hosts=[], asns=[], countries=[])
def get_array_values(entity_type: str, entity_value: str, array_field: str, hours: int = 24) -> "List[EntityAttributeValue]":
    """
    Return the most frequent values of an Array column for one entity.

    The arrays stored in column ``array_field`` of
    ``mabase_prod.view_dashboard_entities`` (this module passes
    ``user_agents``, ``client_headers``, ``paths`` and ``query_params``) are
    flattened with ``arrayJoin`` for the rows matching the entity over the
    last ``hours`` hours. Values are then counted and the 100 most frequent
    are returned, each with its share (in percent, two decimals) of the
    counted values.

    Args:
        entity_type: Entity type to filter on.
        entity_value: Entity value to filter on.
        array_field: Name of the Array column to expand.
        hours: Size of the look-back window, in hours.

    Returns:
        One ``EntityAttributeValue`` per distinct value, ordered by
        descending count.

    Raises:
        ValueError: If ``array_field`` is not a bare ASCII identifier.
    """
    # A column name cannot be bound as a query parameter, so it is
    # interpolated into the SQL text below. Refuse anything that is not a
    # plain identifier so the interpolation can never inject SQL.
    if not (array_field.isascii() and array_field.isidentifier()):
        raise ValueError(f"Invalid array field name: {array_field!r}")

    query = f"""
SELECT
value,
count() as count,
round(count * 100.0 / sum(count) OVER (), 2) as percentage
FROM (
SELECT
arrayJoin({array_field}) as value
FROM mabase_prod.view_dashboard_entities
WHERE entity_type = %(entity_type)s
AND entity_value = %(entity_value)s
AND log_date >= now() - INTERVAL %(hours)s HOUR
AND notEmpty({array_field})
)
GROUP BY value
ORDER BY count DESC
LIMIT 100
"""
    result = db.connect().query(query, {
        'entity_type': entity_type,
        'entity_value': entity_value,
        'hours': hours
    })
    return [
        EntityAttributeValue(value=value, count=count, percentage=percentage)
        for value, count, percentage in result.result_rows
    ]
def _subnet_octets(subnet: str) -> List[str]:
    """
    Return the first three octets of ``subnet`` as strings.

    Anything after a ``/`` (the prefix length) is ignored. Raises
    ``ValueError`` when fewer than three dot-separated parts are present or
    when one of the first three is not a decimal number in 0..255.
    """
    base_ip = subnet.partition('/')[0]
    octets = base_ip.split('.')[:3]
    if len(octets) < 3:
        raise ValueError(f"not enough octets in {subnet!r}")
    for octet in octets:
        if not (octet.isascii() and octet.isdigit() and int(octet) <= 255):
            raise ValueError(f"invalid octet {octet!r} in {subnet!r}")
    return octets


@router.get("/subnet/{subnet:path}")
async def get_subnet_investigation(
    subnet: str,
    hours: int = Query(default=24, ge=1, le=720)
):
    """
    Return every detected IP of a /24 subnet together with its statistics.

    Detections come from ``ml_detected_anomalies``; user-agent counts come
    from ``view_dashboard_entities``. Whatever prefix length is given, the
    filter is always built from the first three octets of ``subnet``.

    Args:
        subnet: Subnet in dotted notation, e.g. ``141.98.11.0/24``.
        hours: Look-back window in hours, applied to every query.

    Returns:
        ``{"stats": {...}, "ips": [...]}`` with subnet-wide statistics and
        up to 100 IPs ordered by descending detection count.

    Raises:
        HTTPException: 400 if ``subnet`` is malformed, 404 if no detection
            matches the subnet, 500 on any other error.
    """
    # Validate before querying: a malformed value used to raise IndexError
    # inside the try block below and was reported as a 500.
    try:
        first_octet, second_octet, third_octet = _subnet_octets(subnet)
    except ValueError as exc:
        raise HTTPException(
            status_code=400,
            detail="Subnet invalide (format attendu: a.b.c.0/24)"
        ) from exc

    # Parameters shared by the detection queries. The key names are the
    # placeholders used in the SQL text and must stay as they are.
    subnet_params = {
        "subnet_prefix": first_octet,
        "subnet_mask": second_octet,
        "subnet_third": third_octet,
        "hours": hours
    }

    try:
        # Subnet-wide stats: ml_detected_anomalies + view_dashboard_entities for UAs
        stats_query = """
WITH cleaned_ips AS (
SELECT
replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS clean_ip,
detected_at,
ja4,
host,
country_code,
asn_number
FROM ml_detected_anomalies
WHERE detected_at >= now() - INTERVAL %(hours)s HOUR
),
subnet_filter AS (
SELECT *
FROM cleaned_ips
WHERE splitByChar('.', clean_ip)[1] = %(subnet_prefix)s
AND splitByChar('.', clean_ip)[2] = %(subnet_mask)s
AND splitByChar('.', clean_ip)[3] = %(subnet_third)s
),
-- Récupérer les user-agents depuis view_dashboard_entities
ua_data AS (
SELECT
entity_value AS ip,
arrayJoin(user_agents) AS user_agent
FROM view_dashboard_entities
WHERE entity_type = 'ip'
AND log_date >= now() - INTERVAL %(hours)s HOUR
AND splitByChar('.', entity_value)[1] = %(subnet_prefix)s
AND splitByChar('.', entity_value)[2] = %(subnet_mask)s
AND splitByChar('.', entity_value)[3] = %(subnet_third)s
)
SELECT
%(subnet)s AS subnet,
uniq(clean_ip) AS total_ips,
count() AS total_detections,
uniq(ja4) AS unique_ja4,
(SELECT uniq(user_agent) FROM ua_data) AS unique_ua,
uniq(host) AS unique_hosts,
argMax(country_code, detected_at) AS primary_country,
argMax(asn_number, detected_at) AS primary_asn,
min(detected_at) AS first_seen,
max(detected_at) AS last_seen
FROM subnet_filter
"""
        stats_result = db.query(stats_query, {"subnet": subnet, **subnet_params})
        # Column 1 is total_ips: zero means nothing was detected in this subnet.
        if not stats_result.result_rows or stats_result.result_rows[0][1] == 0:
            raise HTTPException(status_code=404, detail="Subnet non trouvé")
        stats_row = stats_result.result_rows[0]
        stats = {
            "subnet": subnet,
            "total_ips": stats_row[1] or 0,
            "total_detections": stats_row[2] or 0,
            "unique_ja4": stats_row[3] or 0,
            "unique_ua": stats_row[4] or 0,
            "unique_hosts": stats_row[5] or 0,
            "primary_country": stats_row[6] or "XX",
            "primary_asn": str(stats_row[7]) if stats_row[7] else "?",
            "first_seen": stats_row[8].isoformat() if stats_row[8] else "",
            "last_seen": stats_row[9].isoformat() if stats_row[9] else ""
        }

        # Per-IP details: two separate queries merged in Python
        ips_query = """
WITH cleaned_ips AS (
SELECT
replaceRegexpAll(toString(src_ip), '^::ffff:', '') AS clean_ip,
detected_at,
ja4,
country_code,
asn_number,
threat_level,
anomaly_score
FROM ml_detected_anomalies
WHERE detected_at >= now() - INTERVAL %(hours)s HOUR
),
subnet_filter AS (
SELECT *
FROM cleaned_ips
WHERE splitByChar('.', clean_ip)[1] = %(subnet_prefix)s
AND splitByChar('.', clean_ip)[2] = %(subnet_mask)s
AND splitByChar('.', clean_ip)[3] = %(subnet_third)s
)
SELECT
clean_ip AS ip,
count() AS total_detections,
uniq(ja4) AS unique_ja4,
argMax(country_code, detected_at) AS primary_country,
argMax(asn_number, detected_at) AS primary_asn,
argMax(threat_level, detected_at) AS threat_level,
avg(anomaly_score) AS avg_score,
min(detected_at) AS first_seen,
max(detected_at) AS last_seen
FROM subnet_filter
GROUP BY ip
ORDER BY total_detections DESC
LIMIT 100
"""
        # First query: the detected IPs of the subnet
        ips_result = db.query(ips_query, dict(subnet_params))

        # IPs to look up in the user-agent query
        ip_list = [str(row[0]) for row in ips_result.result_rows]

        # Second query: user-agents per IP, filtered with an IN clause
        unique_ua_dict = {}
        if ip_list:
            # Build the IN list as quoted literals; backslashes and quotes
            # are escaped so a stored value can never break out of its literal.
            ip_values = ', '.join(
                "'" + ip.replace("\\", "\\\\").replace("'", "\\'") + "'"
                for ip in ip_list
            )
            # Use the same look-back window as the other queries (this query
            # previously used a fixed 30 days, so per-IP counts could exceed
            # the subnet-wide total). `hours` is range-checked by Query();
            # int() keeps the interpolation strictly numeric.
            ua_query = f"""
SELECT
entity_value AS ip,
uniq(arrayJoin(user_agents)) AS unique_ua
FROM view_dashboard_entities
PREWHERE entity_type = 'ip'
WHERE entity_value IN ({ip_values})
AND log_date >= now() - INTERVAL {int(hours)} HOUR
GROUP BY entity_value
"""
            ua_result = db.query(ua_query, {})
            unique_ua_dict = {str(row[0]): row[1] for row in ua_result.result_rows}

        # Merge detections and user-agent counts
        ips = [
            {
                "ip": str(row[0]),
                "total_detections": row[1],
                "unique_ja4": row[2],
                # Same str() key as the one used to build the IN list.
                "unique_ua": unique_ua_dict.get(str(row[0]), 0),
                "primary_country": row[3] or "XX",
                "primary_asn": str(row[4]) if row[4] else "?",
                "threat_level": row[5] or "LOW",
                "avg_score": abs(row[6] or 0),
                "first_seen": row[7].isoformat() if row[7] else "",
                "last_seen": row[8].isoformat() if row[8] else ""
            }
            for row in ips_result.result_rows
        ]
        return {
            "stats": stats,
            "ips": ips
        }
    except HTTPException:
        # Let deliberate HTTP errors (e.g. the 404 above) through untouched.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Erreur: {e}") from e
def _require_entity_type(entity_type: str, *, list_supported: bool = False) -> None:
    """
    Raise HTTP 400 unless ``entity_type`` is a key of ``ENTITY_TYPES``.

    With ``list_supported=True`` the error detail also enumerates the
    supported types.
    """
    if entity_type in ENTITY_TYPES:
        return
    if list_supported:
        detail = f"Type d'entité invalide. Types supportés: {', '.join(ENTITY_TYPES.keys())}"
    else:
        detail = "Type d'entité invalide"
    raise HTTPException(status_code=400, detail=detail)


# NOTE: route order matters. Routes are tried in declaration order and the
# `path` convertor also matches "/", so the catch-all
# "/{entity_type}/{entity_value:path}" must be declared AFTER the
# sub-resource routes below; otherwise it swallows ".../related",
# ".../user_agents", etc. as part of entity_value. The remaining ambiguity is
# inherent: an entity value that itself ends with one of these suffixes is
# routed to the sub-resource handler.


@router.get("/{entity_type}/{entity_value:path}/related")
async def get_entity_related(
    entity_type: str,
    entity_value: str,
    hours: int = Query(default=24, ge=1, le=720)
):
    """
    Return only the attributes related to an entity.

    Raises:
        HTTPException: 400 if ``entity_type`` is not supported.
    """
    _require_entity_type(entity_type, list_supported=True)
    related = get_related_attributes(entity_type, entity_value, hours)
    return {
        "entity_type": entity_type,
        "entity_value": entity_value,
        "hours": hours,
        "related": related
    }


@router.get("/{entity_type}/{entity_value:path}/user_agents")
async def get_entity_user_agents(
    entity_type: str,
    entity_value: str,
    hours: int = Query(default=24, ge=1, le=720)
):
    """
    Return the User-Agents associated with an entity.

    Raises:
        HTTPException: 400 if ``entity_type`` is not supported.
    """
    _require_entity_type(entity_type)
    user_agents = get_array_values(entity_type, entity_value, 'user_agents', hours)
    return {
        "entity_type": entity_type,
        "entity_value": entity_value,
        "user_agents": user_agents,
        "total": len(user_agents)
    }


@router.get("/{entity_type}/{entity_value:path}/client_headers")
async def get_entity_client_headers(
    entity_type: str,
    entity_value: str,
    hours: int = Query(default=24, ge=1, le=720)
):
    """
    Return the Client-Headers associated with an entity.

    Raises:
        HTTPException: 400 if ``entity_type`` is not supported.
    """
    _require_entity_type(entity_type)
    client_headers = get_array_values(entity_type, entity_value, 'client_headers', hours)
    return {
        "entity_type": entity_type,
        "entity_value": entity_value,
        "client_headers": client_headers,
        "total": len(client_headers)
    }


@router.get("/{entity_type}/{entity_value:path}/paths")
async def get_entity_paths(
    entity_type: str,
    entity_value: str,
    hours: int = Query(default=24, ge=1, le=720)
):
    """
    Return the Paths associated with an entity.

    Raises:
        HTTPException: 400 if ``entity_type`` is not supported.
    """
    _require_entity_type(entity_type)
    paths = get_array_values(entity_type, entity_value, 'paths', hours)
    return {
        "entity_type": entity_type,
        "entity_value": entity_value,
        "paths": paths,
        "total": len(paths)
    }


@router.get("/{entity_type}/{entity_value:path}/query_params")
async def get_entity_query_params(
    entity_type: str,
    entity_value: str,
    hours: int = Query(default=24, ge=1, le=720)
):
    """
    Return the Query-Params associated with an entity.

    Raises:
        HTTPException: 400 if ``entity_type`` is not supported.
    """
    _require_entity_type(entity_type)
    query_params = get_array_values(entity_type, entity_value, 'query_params', hours)
    return {
        "entity_type": entity_type,
        "entity_value": entity_value,
        "query_params": query_params,
        "total": len(query_params)
    }


@router.get("/{entity_type}/{entity_value:path}", response_model=EntityInvestigation)
async def get_entity_investigation(
    entity_type: str,
    entity_value: str,
    hours: int = Query(default=24, ge=1, le=720, description="Fenêtre temporelle en heures")
):
    """
    Full investigation of a given entity.

    - **entity_type**: Entity type (ip, ja4, user_agent, client_header, host, path, query_param)
    - **entity_value**: Entity value
    - **hours**: Time window (default: 24h)

    Returns:
    - General stats
    - Related attributes (IPs, JA4, Hosts, ASNs, Countries)
    - User-Agents
    - Client-Headers
    - Paths
    - Query-Params

    Raises:
        HTTPException: 400 if ``entity_type`` is not supported, 404 if no
            statistics exist for the entity in the time window.
    """
    # Validate the entity type
    _require_entity_type(entity_type, list_supported=True)

    # General stats; without them the entity is considered unknown
    stats = get_entity_stats(entity_type, entity_value, hours)
    if not stats:
        raise HTTPException(status_code=404, detail="Entité non trouvée")

    return EntityInvestigation(
        stats=stats,
        related=get_related_attributes(entity_type, entity_value, hours),
        user_agents=get_array_values(entity_type, entity_value, 'user_agents', hours),
        client_headers=get_array_values(entity_type, entity_value, 'client_headers', hours),
        paths=get_array_values(entity_type, entity_value, 'paths', hours),
        query_params=get_array_values(entity_type, entity_value, 'query_params', hours)
    )
@router.get("/types")
async def get_entity_types():
    """
    Return the supported entity types together with a short description of each.
    """
    # User-facing descriptions, keyed by entity type.
    descriptions = {
        "ip": "Adresse IP source",
        "ja4": "Fingerprint JA4 TLS",
        "user_agent": "User-Agent HTTP",
        "client_header": "Client Header",
        "host": "Host HTTP",
        "path": "Path URL",
        "query_param": "Query Param"
    }
    supported = [*ENTITY_TYPES.values()]
    return {"entity_types": supported, "descriptions": descriptions}