""" Endpoints pour la variabilité des attributs """ from fastapi import APIRouter, HTTPException, Query from typing import Optional from ..database import db from ..models import ( VariabilityResponse, VariabilityAttributes, AttributeValue, Insight, UserAgentsResponse, UserAgentValue ) router = APIRouter(prefix="/api/variability", tags=["variability"]) # ============================================================================= # ROUTES SPÉCIFIQUES (doivent être avant les routes génériques) # ============================================================================= @router.get("/{attr_type}/{value:path}/ips", response_model=dict) async def get_associated_ips( attr_type: str, value: str, limit: int = Query(100, ge=1, le=1000, description="Nombre maximum d'IPs") ): """ Récupère la liste des IPs associées à un attribut """ try: # Mapping des types vers les colonnes type_column_map = { "ip": "src_ip", "ja4": "ja4", "country": "country_code", "asn": "asn_number", "host": "host", } if attr_type not in type_column_map: raise HTTPException( status_code=400, detail=f"Type invalide. Types supportés: {', '.join(type_column_map.keys())}" ) column = type_column_map[attr_type] query = f""" SELECT src_ip, count() AS hit_count FROM ml_detected_anomalies WHERE {column} = %(value)s AND detected_at >= now() - INTERVAL 24 HOUR GROUP BY src_ip ORDER BY hit_count DESC LIMIT %(limit)s """ result = db.query(query, {"value": value, "limit": limit}) total_hits = sum(row[1] for row in result.result_rows) or 1 ips = [ {"ip": str(row[0]), "count": row[1], "percentage": round(row[1] * 100.0 / total_hits, 2)} for row in result.result_rows ] # Compter le total count_query = f""" SELECT uniq(src_ip) AS total FROM ml_detected_anomalies WHERE {column} = %(value)s AND detected_at >= now() - INTERVAL 24 HOUR """ count_result = db.query(count_query, {"value": value}) total = count_result.result_rows[0][0] if count_result.result_rows else 0 return { "type": attr_type, "value": value, "ips": ips, "total": total, "showing": len(ips) } except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}") @router.get("/{attr_type}/{value:path}/attributes", response_model=dict) async def get_associated_attributes( attr_type: str, value: str, target_attr: str = Query(..., description="Type d'attribut à récupérer (user_agents, ja4, countries, asns, hosts)"), limit: int = Query(50, ge=1, le=500, description="Nombre maximum de résultats") ): """ Récupère la liste des attributs associés (ex: User-Agents pour un pays) """ try: # Mapping des types vers les colonnes type_column_map = { "ip": "src_ip", "ja4": "ja4", "country": "country_code", "asn": "asn_number", "host": "host", } # Mapping des attributs cibles target_column_map = { "user_agents": None, # handled separately via view_dashboard_entities "ja4": "ja4", "countries": "country_code", "asns": "asn_number", "hosts": "host", } if attr_type not in type_column_map: raise HTTPException(status_code=400, detail=f"Type '{attr_type}' invalide") if target_attr not in target_column_map: raise HTTPException( status_code=400, detail=f"Attribut cible invalide. Supportés: {', '.join(target_column_map.keys())}" ) column = type_column_map[attr_type] target_column = target_column_map[target_attr] # Pour user_agents: requête via view_dashboard_user_agents # Colonnes: src_ip, ja4, hour, log_date, user_agents, requests if target_column is None: if attr_type == "ip": ua_where = "toString(src_ip) = %(value)s" elif attr_type == "ja4": ua_where = "ja4 = %(value)s" else: # country/asn/host: pivot via ml_detected_anomalies ua_where = f"""toString(src_ip) IN ( SELECT DISTINCT replaceRegexpAll(toString(src_ip), '^::ffff:', '') FROM ml_detected_anomalies WHERE {column} = %(value)s AND detected_at >= now() - INTERVAL 24 HOUR )""" ua_q = f""" SELECT ua AS value, sum(requests) AS count, round(sum(requests) * 100.0 / sum(sum(requests)) OVER (), 2) AS percentage FROM view_dashboard_user_agents ARRAY JOIN user_agents AS ua WHERE {ua_where} AND hour >= now() - INTERVAL 24 HOUR AND ua != '' GROUP BY value ORDER BY count DESC LIMIT %(limit)s """ ua_result = db.query(ua_q, {"value": value, "limit": limit}) items = [{"value": str(r[0]), "count": r[1], "percentage": round(float(r[2]), 2) if r[2] else 0.0} for r in ua_result.result_rows] return {"type": attr_type, "value": value, "target": target_attr, "items": items, "total": len(items), "showing": len(items)} query = f""" SELECT {target_column} AS value, count() AS count, round(count() * 100.0 / sum(count()) OVER (), 2) AS percentage FROM ml_detected_anomalies WHERE {column} = %(value)s AND {target_column} != '' AND {target_column} IS NOT NULL AND detected_at >= now() - INTERVAL 24 HOUR GROUP BY value ORDER BY count DESC LIMIT %(limit)s """ result = db.query(query, {"value": value, "limit": limit}) items = [ { "value": str(row[0]), "count": row[1], "percentage": round(float(row[2]), 2) if row[2] else 0.0 } for row in result.result_rows ] # Compter le total count_query = f""" SELECT uniq({target_column}) AS total FROM ml_detected_anomalies WHERE {column} = %(value)s AND {target_column} != '' AND {target_column} IS NOT NULL AND detected_at >= now() - INTERVAL 24 HOUR """ count_result = db.query(count_query, {"value": value}) total = count_result.result_rows[0][0] if count_result.result_rows else 0 return { "type": attr_type, "value": value, "target": target_attr, "items": items, "total": total, "showing": len(items) } except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}") @router.get("/{attr_type}/{value:path}/user_agents", response_model=UserAgentsResponse) async def get_user_agents( attr_type: str, value: str, limit: int = Query(100, ge=1, le=500, description="Nombre maximum de user-agents") ): """ Récupère la liste des User-Agents associés à un attribut (IP, JA4, pays, etc.) Les données sont récupérées depuis la vue materialisée view_dashboard_user_agents """ try: # Mapping des types vers les colonnes type_column_map = { "ip": "src_ip", "ja4": "ja4", "country": "country_code", "asn": "asn_number", "host": "host", } if attr_type not in type_column_map: raise HTTPException( status_code=400, detail=f"Type invalide. Types supportés: {', '.join(type_column_map.keys())}" ) column = type_column_map[attr_type] # view_dashboard_user_agents colonnes: src_ip, ja4, hour, log_date, user_agents, requests if attr_type == "ip": where = "toString(src_ip) = %(value)s" params: dict = {"value": value, "limit": limit} elif attr_type == "ja4": where = "ja4 = %(value)s" params = {"value": value, "limit": limit} else: # country / asn / host: pivot via ml_detected_anomalies → IPs connus → vue par src_ip ml_col = {"country": "country_code", "asn": "asn_number", "host": "host"}[attr_type] where = f"""toString(src_ip) IN ( SELECT DISTINCT replaceRegexpAll(toString(src_ip), '^::ffff:', '') FROM ml_detected_anomalies WHERE {ml_col} = %(value)s AND detected_at >= now() - INTERVAL 24 HOUR )""" params = {"value": value, "limit": limit} query = f""" SELECT ua AS user_agent, sum(requests) AS count, round(sum(requests) * 100.0 / sum(sum(requests)) OVER (), 2) AS percentage, min(log_date) AS first_seen, max(log_date) AS last_seen FROM view_dashboard_user_agents ARRAY JOIN user_agents AS ua WHERE {where} AND hour >= now() - INTERVAL 24 HOUR AND ua != '' GROUP BY user_agent ORDER BY count DESC LIMIT %(limit)s """ result = db.query(query, params) count_query = f""" SELECT uniqExact(ua) AS total FROM view_dashboard_user_agents ARRAY JOIN user_agents AS ua WHERE {where} AND hour >= now() - INTERVAL 24 HOUR AND ua != '' """ count_result = db.query(count_query, params) user_agents = [ UserAgentValue( value=str(row[0]), count=row[1] or 0, percentage=round(float(row[2]), 2) if row[2] else 0.0, first_seen=row[3] if len(row) > 3 and row[3] else None, last_seen=row[4] if len(row) > 4 and row[4] else None, ) for row in result.result_rows ] total = count_result.result_rows[0][0] if count_result.result_rows else 0 return { "type": attr_type, "value": value, "user_agents": user_agents, "total": total, "showing": len(user_agents) } except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}") # ============================================================================= # ROUTE GÉNÉRIQUE (doit être en dernier) # ============================================================================= def get_attribute_value(row, count_idx: int, percentage_idx: int, first_seen_idx: Optional[int] = None, last_seen_idx: Optional[int] = None, threat_idx: Optional[int] = None, unique_ips_idx: Optional[int] = None) -> AttributeValue: """Helper pour créer un AttributeValue depuis une ligne ClickHouse""" return AttributeValue( value=str(row[0]), count=row[count_idx] or 0, percentage=round(float(row[percentage_idx]), 2) if row[percentage_idx] else 0.0, first_seen=row[first_seen_idx] if first_seen_idx is not None and len(row) > first_seen_idx else None, last_seen=row[last_seen_idx] if last_seen_idx is not None and len(row) > last_seen_idx else None, threat_levels=_parse_threat_levels(row[threat_idx]) if threat_idx is not None and len(row) > threat_idx and row[threat_idx] else None, unique_ips=row[unique_ips_idx] if unique_ips_idx is not None and len(row) > unique_ips_idx else None, primary_threat=_get_primary_threat(row[threat_idx]) if threat_idx is not None and len(row) > threat_idx and row[threat_idx] else None ) def _parse_threat_levels(threat_str: str) -> dict: """Parse une chaîne de type 'CRITICAL:5,HIGH:10' en dict""" if not threat_str: return {} result = {} for part in str(threat_str).split(','): if ':' in part: level, count = part.strip().split(':') result[level.strip()] = int(count.strip()) return result def _get_primary_threat(threat_str: str) -> str: """Retourne le niveau de menace principal""" if not threat_str: return "" levels_order = ["CRITICAL", "HIGH", "MEDIUM", "LOW"] for level in levels_order: if level in str(threat_str): return level return "" def _generate_insights(attr_type: str, value: str, attributes: VariabilityAttributes, total_detections: int, unique_ips: int) -> list: """Génère des insights basés sur les données de variabilité""" insights = [] # User-Agent insights if len(attributes.user_agents) > 1: insights.append(Insight( type="warning", message=f"{len(attributes.user_agents)} User-Agents différents → Possible rotation/obfuscation" )) # JA4 insights if len(attributes.ja4) > 1: insights.append(Insight( type="warning", message=f"{len(attributes.ja4)} JA4 fingerprints différents → Possible rotation de fingerprint" )) # IP insights (pour les sélections non-IP) if attr_type != "ip" and unique_ips > 10: insights.append(Insight( type="info", message=f"{unique_ips} IPs différentes associées → Possible infrastructure distribuée" )) # ASN insights if len(attributes.asns) == 1 and attributes.asns[0].value: asn_label_lower = "" if attr_type == "asn": asn_label_lower = value.lower() # Vérifier si c'est un ASN de hosting/cloud hosting_keywords = ["ovh", "amazon", "aws", "google", "azure", "digitalocean", "linode", "vultr"] if any(kw in (attributes.asns[0].value or "").lower() for kw in hosting_keywords): insights.append(Insight( type="warning", message="ASN de type hosting/cloud → Souvent utilisé pour des bots" )) # Country insights if len(attributes.countries) > 5: insights.append(Insight( type="info", message=f"Présent dans {len(attributes.countries)} pays → Distribution géographique large" )) # Threat level insights critical_count = 0 high_count = 0 for tl in attributes.threat_levels: if tl.value == "CRITICAL": critical_count = tl.count elif tl.value == "HIGH": high_count = tl.count if critical_count > total_detections * 0.3: insights.append(Insight( type="warning", message=f"{round(critical_count * 100 / total_detections)}% de détections CRITICAL → Menace sévère" )) elif high_count > total_detections * 0.5: insights.append(Insight( type="info", message=f"{round(high_count * 100 / total_detections)}% de détections HIGH → Activité suspecte" )) return insights @router.get("/{attr_type}/{value:path}", response_model=VariabilityResponse) async def get_variability(attr_type: str, value: str): """ Récupère la variabilité des attributs associés à une valeur attr_type: ip, ja4, country, asn, host, user_agent value: la valeur à investiguer """ try: # Mapping des types vers les colonnes ClickHouse type_column_map = { "ip": "src_ip", "ja4": "ja4", "country": "country_code", "asn": "asn_number", "host": "host", "user_agent": "header_user_agent" } if attr_type not in type_column_map: raise HTTPException( status_code=400, detail=f"Type invalide. Types supportés: {', '.join(type_column_map.keys())}" ) column = type_column_map[attr_type] # Requête principale - Récupère toutes les détections pour cette valeur # On utilise toStartOfHour pour le timeseries et on évite header_user_agent si inexistant base_query = f""" SELECT * FROM ( SELECT detected_at, src_ip, ja4, host, '' AS user_agent, country_code, asn_number, asn_org, threat_level, model_name, anomaly_score FROM ml_detected_anomalies WHERE {column} = %(value)s AND detected_at >= now() - INTERVAL 24 HOUR ) """ # Stats globales stats_query = f""" SELECT count() AS total_detections, uniq(src_ip) AS unique_ips, min(detected_at) AS first_seen, max(detected_at) AS last_seen FROM ml_detected_anomalies WHERE {column} = %(value)s AND detected_at >= now() - INTERVAL 24 HOUR """ stats_result = db.query(stats_query, {"value": value}) if not stats_result.result_rows or stats_result.result_rows[0][0] == 0: raise HTTPException(status_code=404, detail="Aucune donnée trouvée") stats_row = stats_result.result_rows[0] total_detections = stats_row[0] unique_ips = stats_row[1] first_seen = stats_row[2] last_seen = stats_row[3] # User-Agents depuis http_logs pour des comptes exacts par requête # (view_dashboard_user_agents déduplique par heure, ce qui sous-compte les hits) _ua_params: dict = {"value": value} if attr_type == "ip": _ua_logs_where = "src_ip = toIPv4(%(value)s)" ua_query_simple = f""" SELECT header_user_agent AS user_agent, count() AS count, round(count() * 100.0 / ( SELECT count() FROM mabase_prod.http_logs WHERE {_ua_logs_where} AND time >= now() - INTERVAL 24 HOUR ), 2) AS percentage, min(time) AS first_seen, max(time) AS last_seen FROM mabase_prod.http_logs WHERE {_ua_logs_where} AND time >= now() - INTERVAL 24 HOUR AND header_user_agent != '' AND header_user_agent IS NOT NULL GROUP BY user_agent ORDER BY count DESC """ ua_result = db.query(ua_query_simple, _ua_params) user_agents = [get_attribute_value(row, 1, 2, 3, 4) for row in ua_result.result_rows] elif attr_type == "ja4": _ua_logs_where = "ja4 = %(value)s" ua_query_simple = f""" SELECT header_user_agent AS user_agent, count() AS count, round(count() * 100.0 / ( SELECT count() FROM mabase_prod.http_logs WHERE {_ua_logs_where} AND time >= now() - INTERVAL 24 HOUR ), 2) AS percentage, min(time) AS first_seen, max(time) AS last_seen FROM mabase_prod.http_logs WHERE {_ua_logs_where} AND time >= now() - INTERVAL 24 HOUR AND header_user_agent != '' AND header_user_agent IS NOT NULL GROUP BY user_agent ORDER BY count DESC LIMIT 20 """ ua_result = db.query(ua_query_simple, _ua_params) user_agents = [get_attribute_value(row, 1, 2, 3, 4) for row in ua_result.result_rows] else: # country / asn / host: pivot via ml_detected_anomalies → IPs, puis view UA _ua_where = f"""toString(src_ip) IN ( SELECT DISTINCT replaceRegexpAll(toString(src_ip), '^::ffff:', '') FROM ml_detected_anomalies WHERE {column} = %(value)s AND detected_at >= now() - INTERVAL 24 HOUR )""" ua_query_simple = f""" SELECT ua AS user_agent, sum(requests) AS count, round(sum(requests) * 100.0 / sum(sum(requests)) OVER (), 2) AS percentage, min(log_date) AS first_seen, max(log_date) AS last_seen FROM view_dashboard_user_agents ARRAY JOIN user_agents AS ua WHERE {_ua_where} AND hour >= now() - INTERVAL 24 HOUR AND ua != '' GROUP BY user_agent ORDER BY count DESC LIMIT 20 """ ua_result = db.query(ua_query_simple, _ua_params) user_agents = [get_attribute_value(row, 1, 2, 3, 4) for row in ua_result.result_rows] # JA4 fingerprints ja4_query = f""" SELECT ja4, count() AS count, round(count() * 100.0 / (SELECT count() FROM ({base_query})), 2) AS percentage, min(detected_at) AS first_seen, max(detected_at) AS last_seen FROM ({base_query}) WHERE ja4 != '' AND ja4 IS NOT NULL GROUP BY ja4 ORDER BY count DESC LIMIT 10 """ ja4_result = db.query(ja4_query, {"value": value}) ja4s = [get_attribute_value(row, 1, 2, 3, 4) for row in ja4_result.result_rows] # Pays country_query = f""" SELECT country_code, count() AS count, round(count() * 100.0 / (SELECT count() FROM ({base_query})), 2) AS percentage FROM ({base_query}) WHERE country_code != '' AND country_code IS NOT NULL GROUP BY country_code ORDER BY count DESC LIMIT 10 """ country_result = db.query(country_query, {"value": value}) countries = [get_attribute_value(row, 1, 2) for row in country_result.result_rows] # ASN asn_query = f""" SELECT concat('AS', toString(asn_number), ' - ', asn_org) AS asn_display, asn_number, count() AS count, round(count() * 100.0 / (SELECT count() FROM ({base_query})), 2) AS percentage FROM ({base_query}) WHERE asn_number != '' AND asn_number IS NOT NULL AND asn_number != '0' GROUP BY asn_display, asn_number ORDER BY count DESC LIMIT 10 """ asn_result = db.query(asn_query, {"value": value}) asns = [ AttributeValue( value=str(row[0]), count=row[2] or 0, percentage=round(float(row[3]), 2) if row[3] else 0.0 ) for row in asn_result.result_rows ] # Hosts host_query = f""" SELECT host, count() AS count, round(count() * 100.0 / (SELECT count() FROM ({base_query})), 2) AS percentage FROM ({base_query}) WHERE host != '' AND host IS NOT NULL GROUP BY host ORDER BY count DESC LIMIT 10 """ host_result = db.query(host_query, {"value": value}) hosts = [get_attribute_value(row, 1, 2) for row in host_result.result_rows] # Threat levels threat_query = f""" SELECT threat_level, count() AS count, round(count() * 100.0 / (SELECT count() FROM ({base_query})), 2) AS percentage FROM ({base_query}) WHERE threat_level != '' AND threat_level IS NOT NULL GROUP BY threat_level ORDER BY CASE threat_level WHEN 'CRITICAL' THEN 1 WHEN 'HIGH' THEN 2 WHEN 'MEDIUM' THEN 3 WHEN 'LOW' THEN 4 ELSE 5 END """ threat_result = db.query(threat_query, {"value": value}) threat_levels = [get_attribute_value(row, 1, 2) for row in threat_result.result_rows] # Model names model_query = f""" SELECT model_name, count() AS count, round(count() * 100.0 / (SELECT count() FROM ({base_query})), 2) AS percentage FROM ({base_query}) WHERE model_name != '' AND model_name IS NOT NULL GROUP BY model_name ORDER BY count DESC """ model_result = db.query(model_query, {"value": value}) model_names = [get_attribute_value(row, 1, 2) for row in model_result.result_rows] # Construire la réponse attributes = VariabilityAttributes( user_agents=user_agents, ja4=ja4s, countries=countries, asns=asns, hosts=hosts, threat_levels=threat_levels, model_names=model_names ) # Générer les insights insights = _generate_insights(attr_type, value, attributes, total_detections, unique_ips) return VariabilityResponse( type=attr_type, value=value, total_detections=total_detections, unique_ips=unique_ips, date_range={ "first_seen": first_seen, "last_seen": last_seen }, attributes=attributes, insights=insights ) except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=f"Erreur: {str(e)}")