diff --git a/backend/main.py b/backend/main.py index d964b93..4c6bb69 100644 --- a/backend/main.py +++ b/backend/main.py @@ -12,7 +12,7 @@ import os from .config import settings from .database import db -from .routes import metrics, detections, variability, attributes, analysis, entities, incidents, audit +from .routes import metrics, detections, variability, attributes, analysis, entities, incidents, audit, reputation # Configuration logging logging.basicConfig( @@ -72,6 +72,7 @@ app.include_router(analysis.router) app.include_router(entities.router) app.include_router(incidents.router) app.include_router(audit.router) +app.include_router(reputation.router) # Route pour servir le frontend diff --git a/backend/routes/reputation.py b/backend/routes/reputation.py new file mode 100644 index 0000000..267782c --- /dev/null +++ b/backend/routes/reputation.py @@ -0,0 +1,125 @@ +""" +Routes pour la réputation IP (bases de données publiques) +""" +from fastapi import APIRouter, HTTPException, Path +from typing import Dict, Any +import re + +from ..services.reputation_ip import get_reputation_service + +router = APIRouter(prefix="/api/reputation", tags=["Reputation"]) + +# Pattern de validation d'IP (IPv4) +IP_PATTERN = re.compile(r'^(\d{1,3}\.){3}\d{1,3}$') + + +def is_valid_ipv4(ip: str) -> bool: + """Valide qu'une chaîne est une adresse IPv4 valide""" + if not IP_PATTERN.match(ip): + return False + + # Vérifie que chaque octet est entre 0 et 255 + parts = ip.split('.') + for part in parts: + try: + num = int(part) + if num < 0 or num > 255: + return False + except ValueError: + return False + + return True + + +@router.get("/ip/{ip_address}") +async def get_ip_reputation( + ip_address: str = Path(..., description="Adresse IP à vérifier") +) -> Dict[str, Any]: + """ + Récupère la réputation d'une adresse IP depuis les bases de données publiques + + Sources utilisées (sans clé API): + - IP-API.com: Géolocalisation + Proxy/Hosting detection + - IPinfo.io: ASN + Organisation + + Returns: + Dict avec: + - ip: Adresse IP vérifiée + - timestamp: Date de la vérification + - sources: Détails par source + - aggregated: Résultats agrégés + - is_proxy: bool + - is_hosting: bool + - is_vpn: bool + - is_tor: bool + - threat_score: 0-100 + - threat_level: clean/low/medium/high/critical + - country: Pays + - asn: Numéro ASN + - asn_org: Organisation ASN + - org: ISP/Organisation + - warnings: Liste des alertes + """ + # Valide l'adresse IP + if not is_valid_ipv4(ip_address): + raise HTTPException( + status_code=400, + detail=f"Adresse IP invalide: {ip_address}. Format attendu: x.x.x.x" + ) + + try: + # Récupère le service de réputation + reputation_service = get_reputation_service() + + # Interroge les sources + results = await reputation_service.get_reputation(ip_address) + + return results + + except Exception as e: + raise HTTPException( + status_code=500, + detail=f"Erreur lors de la vérification de réputation: {str(e)}" + ) + + +@router.get("/ip/{ip_address}/summary") +async def get_ip_reputation_summary( + ip_address: str = Path(..., description="Adresse IP à vérifier") +) -> Dict[str, Any]: + """ + Version simplifiée de la réputation IP (juste les infos essentielles) + + Utile pour affichage rapide dans les tableaux + """ + if not is_valid_ipv4(ip_address): + raise HTTPException( + status_code=400, + detail=f"Adresse IP invalide: {ip_address}" + ) + + try: + reputation_service = get_reputation_service() + full_results = await reputation_service.get_reputation(ip_address) + + # Retourne juste l'essentiel + aggregated = full_results.get('aggregated', {}) + + return { + 'ip': ip_address, + 'threat_level': aggregated.get('threat_level', 'unknown'), + 'threat_score': aggregated.get('threat_score', 0), + 'is_proxy': aggregated.get('is_proxy', False), + 'is_hosting': aggregated.get('is_hosting', False), + 'country': aggregated.get('country'), + 'country_code': aggregated.get('country_code'), + 'asn': aggregated.get('asn'), + 'org': aggregated.get('org'), + 'warnings_count': len(aggregated.get('warnings', [])) + } + + except Exception as e: + raise HTTPException( + status_code=500, + detail=f"Erreur: {str(e)}" + ) diff --git a/backend/services/__init__.py b/backend/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/services/reputation_ip.py b/backend/services/reputation_ip.py new file mode 100644 index 0000000..758c4f1 --- /dev/null +++ b/backend/services/reputation_ip.py @@ -0,0 +1,312 @@ +""" +Services de réputation IP - Bases de données publiques sans clé API +""" +import httpx +from typing import Optional, Dict, Any +from datetime import datetime +import logging + +logger = logging.getLogger(__name__) + +# Timeout pour les requêtes HTTP +HTTP_TIMEOUT = 10.0 + + +class IPReputationService: + """ + Service de réputation IP utilisant des bases de données publiques gratuites + """ + + def __init__(self): + self.http_client = httpx.AsyncClient(timeout=HTTP_TIMEOUT) + + # Sources de réputation (sans clé API) + self.sources = { + 'ip_api': 'http://ip-api.com/json/{ip}', + 'ipinfo': 'https://ipinfo.io/{ip}/json', + } + + async def get_reputation(self, ip: str) -> Dict[str, Any]: + """ + Récupère la réputation d'une IP depuis toutes les sources disponibles + + Args: + ip: Adresse IP à vérifier + + Returns: + Dict avec les informations de réputation agrégées + """ + results = { + 'ip': ip, + 'timestamp': datetime.utcnow().isoformat(), + 'sources': {}, + 'aggregated': { + 'is_proxy': False, + 'is_hosting': False, + 'is_vpn': False, + 'is_tor': False, + 'threat_score': 0, + 'threat_level': 'unknown', + 'country': None, + 'asn': None, + 'org': None, + 'warnings': [] + } + } + + # Interroge chaque source + for source_name, url_template in self.sources.items(): + try: + url = url_template.format(ip=ip) + response = await self.http_client.get(url) + + if response.status_code == 200: + data = response.json() + results['sources'][source_name] = self._parse_source_data(source_name, data) + else: + logger.warning(f"Source {source_name} returned status {response.status_code}") + results['sources'][source_name] = {'error': f'Status {response.status_code}'} + + except httpx.TimeoutException: + logger.warning(f"Timeout for source {source_name}") + results['sources'][source_name] = {'error': 'Timeout'} + except Exception as e: + logger.error(f"Error fetching from {source_name}: {str(e)}") + results['sources'][source_name] = {'error': str(e)} + + # Agrège les résultats + results['aggregated'] = self._aggregate_results(results['sources']) + + return results + + def _parse_source_data(self, source: str, data: Dict[str, Any]) -> Dict[str, Any]: + """ + Parse les données d'une source spécifique + """ + if source == 'ip_api': + return self._parse_ip_api(data) + elif source == 'ipinfo': + return self._parse_ipinfo(data) + return data + + def _parse_ip_api(self, data: Dict[str, Any]) -> Dict[str, Any]: + """ + Parse les données de IP-API.com + + Response example: + { + "status": "success", + "country": "France", + "countryCode": "FR", + "region": "IDF", + "regionName": "Île-de-France", + "city": "Paris", + "zip": "75001", + "lat": 48.8534, + "lon": 2.3488, + "timezone": "Europe/Paris", + "isp": "OVH SAS", + "org": "OVH SAS", + "as": "AS16276 OVH SAS", + "asname": "OVH", + "mobile": false, + "proxy": false, + "hosting": true, + "query": "51.15.0.1" + } + """ + if data.get('status') != 'success': + return {'error': data.get('message', 'Unknown error')} + + # Extraire l'ASN + asn_full = data.get('as', '') + asn_number = None + asn_org = None + + if asn_full: + parts = asn_full.split(' ', 1) + if len(parts) >= 1: + asn_number = parts[0].replace('AS', '') + if len(parts) >= 2: + asn_org = parts[1] + + return { + 'country': data.get('country'), + 'country_code': data.get('countryCode'), + 'region': data.get('regionName'), + 'city': data.get('city'), + 'isp': data.get('isp'), + 'org': data.get('org'), + 'asn': asn_number, + 'asn_org': asn_org, + 'is_proxy': data.get('proxy', False), + 'is_hosting': data.get('hosting', False), + 'is_mobile': data.get('mobile', False), + 'timezone': data.get('timezone'), + 'lat': data.get('lat'), + 'lon': data.get('lon'), + 'query': data.get('query') + } + + def _parse_ipinfo(self, data: Dict[str, Any]) -> Dict[str, Any]: + """ + Parse les données de IPinfo.io + + Response example: + { + "ip": "51.15.0.1", + "city": "Paris", + "region": "Île-de-France", + "country": "FR", + "loc": "48.8534,2.3488", + "org": "AS16276 OVH SAS", + "postal": "75001", + "timezone": "Europe/Paris", + "readme": "https://ipinfo.io/missingauth" + } + """ + # Extraire l'ASN + org_full = data.get('org', '') + asn_number = None + asn_org = None + + if org_full: + parts = org_full.split(' ', 1) + if len(parts) >= 1: + asn_number = parts[0].replace('AS', '') + if len(parts) >= 2: + asn_org = parts[1] + + # Extraire lat/lon + loc = data.get('loc', '') + lat = None + lon = None + if loc: + coords = loc.split(',') + if len(coords) == 2: + lat = float(coords[0]) + lon = float(coords[1]) + + return { + 'ip': data.get('ip'), + 'city': data.get('city'), + 'region': data.get('region'), + 'country': data.get('country'), + 'postal': data.get('postal'), + 'timezone': data.get('timezone'), + 'asn': asn_number, + 'asn_org': asn_org, + 'org': data.get('org'), + 'lat': lat, + 'lon': lon + } + + def _aggregate_results(self, sources: Dict[str, Any]) -> Dict[str, Any]: + """ + Agrège les résultats de toutes les sources + + Logique d'agrégation: + - is_proxy: true si au moins une source le détecte + - is_hosting: true si au moins une source le détecte + - threat_score: basé sur les détections proxy/hosting/vpn/tor + - threat_level: low/medium/high/critical basé sur le score + """ + aggregated = { + 'is_proxy': False, + 'is_hosting': False, + 'is_vpn': False, + 'is_tor': False, + 'threat_score': 0, + 'threat_level': 'unknown', + 'country': None, + 'country_code': None, + 'asn': None, + 'asn_org': None, + 'org': None, + 'city': None, + 'warnings': [] + } + + threat_score = 0 + + for source_name, source_data in sources.items(): + if 'error' in source_data: + continue + + # Proxy detection + if source_data.get('is_proxy'): + aggregated['is_proxy'] = True + threat_score += 30 + aggregated['warnings'].append(f'{source_name}: Proxy détecté') + + # Hosting detection + if source_data.get('is_hosting'): + aggregated['is_hosting'] = True + threat_score += 20 + aggregated['warnings'].append(f'{source_name}: Hébergement cloud/datacenter') + + # VPN detection (si disponible) + if source_data.get('is_vpn'): + aggregated['is_vpn'] = True + threat_score += 40 + aggregated['warnings'].append(f'{source_name}: VPN détecté') + + # Tor detection (si disponible) + if source_data.get('is_tor'): + aggregated['is_tor'] = True + threat_score += 50 + aggregated['warnings'].append(f'{source_name}: Exit node Tor détecté') + + # Infos géographiques (prend la première disponible) + if not aggregated['country'] and source_data.get('country'): + aggregated['country'] = source_data.get('country') + + if not aggregated['country_code'] and source_data.get('country_code'): + aggregated['country_code'] = source_data.get('country_code') + + # ASN (prend la première disponible) + if not aggregated['asn'] and source_data.get('asn'): + aggregated['asn'] = source_data.get('asn') + + if not aggregated['asn_org'] and source_data.get('asn_org'): + aggregated['asn_org'] = source_data.get('asn_org') + + # Organisation/ISP + if not aggregated['org'] and source_data.get('org'): + aggregated['org'] = source_data.get('org') + + # Ville + if not aggregated['city'] and source_data.get('city'): + aggregated['city'] = source_data.get('city') + + # Calcul du niveau de menace + aggregated['threat_score'] = min(100, threat_score) + + if threat_score >= 80: + aggregated['threat_level'] = 'critical' + elif threat_score >= 60: + aggregated['threat_level'] = 'high' + elif threat_score >= 40: + aggregated['threat_level'] = 'medium' + elif threat_score >= 20: + aggregated['threat_level'] = 'low' + else: + aggregated['threat_level'] = 'clean' + + return aggregated + + async def close(self): + """Ferme le client HTTP""" + await self.http_client.aclose() + + +# Singleton pour réutiliser le service +_reputation_service: Optional[IPReputationService] = None + + +def get_reputation_service() -> IPReputationService: + """Retourne l'instance singleton du service de réputation""" + global _reputation_service + if _reputation_service is None: + _reputation_service = IPReputationService() + return _reputation_service diff --git a/frontend/src/components/InvestigationView.tsx b/frontend/src/components/InvestigationView.tsx index b7b974e..632b812 100644 --- a/frontend/src/components/InvestigationView.tsx +++ b/frontend/src/components/InvestigationView.tsx @@ -6,6 +6,7 @@ import { UserAgentAnalysis } from './analysis/UserAgentAnalysis'; import { CorrelationSummary } from './analysis/CorrelationSummary'; import { CorrelationGraph } from './CorrelationGraph'; import { InteractiveTimeline } from './InteractiveTimeline'; +import { ReputationPanel } from './ReputationPanel'; export function InvestigationView() { const { ip } = useParams<{ ip: string }>(); @@ -46,6 +47,12 @@ export function InvestigationView() { {/* Panels d'analyse */}