""" Modèles de données pour l'API """ from pydantic import BaseModel, Field from typing import Optional, List, Dict, Any from datetime import datetime from enum import Enum class ThreatLevel(str, Enum): CRITICAL = "CRITICAL" HIGH = "HIGH" MEDIUM = "MEDIUM" LOW = "LOW" class ModelName(str, Enum): COMPLET = "Complet" APPLICATIF = "Applicatif" # ───────────────────────────────────────────────────────────────────────────── # MÉTRIQUES # ───────────────────────────────────────────────────────────────────────────── class MetricsSummary(BaseModel): total_detections: int critical_count: int high_count: int medium_count: int low_count: int known_bots_count: int anomalies_count: int unique_ips: int class TimeSeriesPoint(BaseModel): hour: datetime total: int critical: int high: int medium: int low: int class MetricsResponse(BaseModel): summary: MetricsSummary timeseries: List[TimeSeriesPoint] threat_distribution: Dict[str, int] # ───────────────────────────────────────────────────────────────────────────── # DÉTECTIONS # ───────────────────────────────────────────────────────────────────────────── class Detection(BaseModel): detected_at: datetime src_ip: str ja4: str host: str bot_name: str anomaly_score: float threat_level: str model_name: str recurrence: int asn_number: str asn_org: str asn_detail: str asn_domain: str country_code: str asn_label: str hits: int hit_velocity: float fuzzing_index: float post_ratio: float reason: str client_headers: str = "" asn_score: Optional[float] = None asn_rep_label: str = "" class DetectionsListResponse(BaseModel): items: List[Detection] total: int page: int page_size: int total_pages: int # ───────────────────────────────────────────────────────────────────────────── # VARIABILITÉ # ───────────────────────────────────────────────────────────────────────────── class AttributeValue(BaseModel): value: str count: int percentage: float first_seen: Optional[datetime] = None last_seen: Optional[datetime] = None threat_levels: Optional[Dict[str, int]] = None unique_ips: Optional[int] = None primary_threat: Optional[str] = None class VariabilityAttributes(BaseModel): user_agents: List[AttributeValue] = Field(default_factory=list) ja4: List[AttributeValue] = Field(default_factory=list) countries: List[AttributeValue] = Field(default_factory=list) asns: List[AttributeValue] = Field(default_factory=list) hosts: List[AttributeValue] = Field(default_factory=list) threat_levels: List[AttributeValue] = Field(default_factory=list) model_names: List[AttributeValue] = Field(default_factory=list) class Insight(BaseModel): type: str # "warning", "info", "success" message: str class VariabilityResponse(BaseModel): type: str value: str total_detections: int unique_ips: int date_range: Dict[str, datetime] attributes: VariabilityAttributes insights: List[Insight] = Field(default_factory=list) # ───────────────────────────────────────────────────────────────────────────── # ATTRIBUTS UNIQUES # ───────────────────────────────────────────────────────────────────────────── class AttributeListItem(BaseModel): value: str count: int class AttributeListResponse(BaseModel): type: str items: List[AttributeListItem] total: int # ───────────────────────────────────────────────────────────────────────────── # USER-AGENTS # ───────────────────────────────────────────────────────────────────────────── class UserAgentValue(BaseModel): value: str count: int percentage: float first_seen: Optional[datetime] = None last_seen: Optional[datetime] = None class UserAgentsResponse(BaseModel): type: str value: str user_agents: List[UserAgentValue] total: int showing: int # ───────────────────────────────────────────────────────────────────────────── # COMPARAISON # ───────────────────────────────────────────────────────────────────────────── class ComparisonMetric(BaseModel): name: str value1: Any value2: Any difference: str trend: str # "better", "worse", "same" class ComparisonEntity(BaseModel): type: str value: str total_detections: int unique_ips: int avg_score: float primary_threat: str class ComparisonResponse(BaseModel): entity1: ComparisonEntity entity2: ComparisonEntity metrics: List[ComparisonMetric] # ───────────────────────────────────────────────────────────────────────────── # CLASSIFICATIONS (SOC / ML) # ───────────────────────────────────────────────────────────────────────────── class ClassificationLabel(str, Enum): LEGITIMATE = "legitimate" SUSPICIOUS = "suspicious" MALICIOUS = "malicious" class ClassificationBase(BaseModel): ip: Optional[str] = None ja4: Optional[str] = None label: ClassificationLabel tags: List[str] = Field(default_factory=list) comment: str = "" confidence: float = Field(ge=0.0, le=1.0, default=0.5) analyst: str = "unknown" class ClassificationCreate(ClassificationBase): """Données pour créer une classification""" features: dict = Field(default_factory=dict) class Classification(ClassificationBase): """Classification complète avec métadonnées""" created_at: datetime features: dict = Field(default_factory=dict) class Config: from_attributes = True class ClassificationStats(BaseModel): """Statistiques de classification""" label: str total: int unique_ips: int avg_confidence: float class ClassificationsListResponse(BaseModel): """Réponse pour la liste des classifications""" items: List[Classification] total: int # ───────────────────────────────────────────────────────────────────────────── # ANALYSIS (CORRELATION) # ───────────────────────────────────────────────────────────────────────────── class SubnetAnalysis(BaseModel): """Analyse subnet/ASN""" ip: str subnet: str ips_in_subnet: List[str] total_in_subnet: int asn_number: str asn_org: str total_in_asn: int alert: bool # True si > 10 IPs du subnet class CountryData(BaseModel): """Données pour un pays""" code: str name: str count: int percentage: float class CountryAnalysis(BaseModel): """Analyse des pays""" top_countries: List[CountryData] baseline: dict # Pays habituels alert_country: Optional[str] = None # Pays surreprésenté class JA4SubnetData(BaseModel): """Subnet pour un JA4""" subnet: str count: int class JA4Analysis(BaseModel): """Analyse JA4""" ja4: str shared_ips_count: int top_subnets: List[JA4SubnetData] other_ja4_for_ip: List[str] class UserAgentData(BaseModel): """Données pour un User-Agent""" value: str count: int percentage: float classification: str # "normal", "bot", "script" class UserAgentAnalysis(BaseModel): """Analyse User-Agents""" ip_user_agents: List[UserAgentData] ja4_user_agents: List[UserAgentData] bot_percentage: float alert: bool # True si > 20% bots/scripts class CorrelationIndicators(BaseModel): """Indicateurs de corrélation""" subnet_ips_count: int asn_ips_count: int country_percentage: float ja4_shared_ips: int user_agents_count: int bot_ua_percentage: float class ClassificationRecommendation(BaseModel): """Recommandation de classification""" label: ClassificationLabel confidence: float indicators: CorrelationIndicators suggested_tags: List[str] reason: str # ───────────────────────────────────────────────────────────────────────────── # ENTITIES (UNIFIED VIEW) # ───────────────────────────────────────────────────────────────────────────── class EntityStats(BaseModel): """Statistiques pour une entité""" entity_type: str entity_value: str total_requests: int unique_ips: int first_seen: datetime last_seen: datetime class EntityRelatedAttributes(BaseModel): """Attributs associés à une entité""" ips: List[str] = Field(default_factory=list) ja4s: List[str] = Field(default_factory=list) hosts: List[str] = Field(default_factory=list) asns: List[str] = Field(default_factory=list) countries: List[str] = Field(default_factory=list) class EntityAttributeValue(BaseModel): """Valeur d'attribut avec count et percentage (pour les entities)""" value: str count: int percentage: float class EntityInvestigation(BaseModel): """Investigation complète pour une entité""" stats: EntityStats related: EntityRelatedAttributes user_agents: List[EntityAttributeValue] = Field(default_factory=list) client_headers: List[EntityAttributeValue] = Field(default_factory=list) paths: List[EntityAttributeValue] = Field(default_factory=list) query_params: List[EntityAttributeValue] = Field(default_factory=list)