""" Services de réputation IP - Bases de données publiques sans clé API """ import httpx from typing import Optional, Dict, Any from datetime import datetime import logging logger = logging.getLogger(__name__) # Timeout pour les requêtes HTTP HTTP_TIMEOUT = 10.0 class IPReputationService: """ Service de réputation IP utilisant des bases de données publiques gratuites """ def __init__(self): self.http_client = httpx.AsyncClient(timeout=HTTP_TIMEOUT) # Sources de réputation (sans clé API) self.sources = { 'ip_api': 'http://ip-api.com/json/{ip}', 'ipinfo': 'https://ipinfo.io/{ip}/json', } async def get_reputation(self, ip: str) -> Dict[str, Any]: """ Récupère la réputation d'une IP depuis toutes les sources disponibles Args: ip: Adresse IP à vérifier Returns: Dict avec les informations de réputation agrégées """ results = { 'ip': ip, 'timestamp': datetime.utcnow().isoformat(), 'sources': {}, 'aggregated': { 'is_proxy': False, 'is_hosting': False, 'is_vpn': False, 'is_tor': False, 'threat_score': 0, 'threat_level': 'unknown', 'country': None, 'asn': None, 'org': None, 'warnings': [] } } # Interroge chaque source for source_name, url_template in self.sources.items(): try: url = url_template.format(ip=ip) response = await self.http_client.get(url) if response.status_code == 200: data = response.json() results['sources'][source_name] = self._parse_source_data(source_name, data) else: logger.warning(f"Source {source_name} returned status {response.status_code}") results['sources'][source_name] = {'error': f'Status {response.status_code}'} except httpx.TimeoutException: logger.warning(f"Timeout for source {source_name}") results['sources'][source_name] = {'error': 'Timeout'} except Exception as e: logger.error(f"Error fetching from {source_name}: {str(e)}") results['sources'][source_name] = {'error': str(e)} # Agrège les résultats results['aggregated'] = self._aggregate_results(results['sources']) return results def _parse_source_data(self, source: str, data: Dict[str, Any]) -> Dict[str, Any]: """ Parse les données d'une source spécifique """ if source == 'ip_api': return self._parse_ip_api(data) elif source == 'ipinfo': return self._parse_ipinfo(data) return data def _parse_ip_api(self, data: Dict[str, Any]) -> Dict[str, Any]: """ Parse les données de IP-API.com Response example: { "status": "success", "country": "France", "countryCode": "FR", "region": "IDF", "regionName": "Île-de-France", "city": "Paris", "zip": "75001", "lat": 48.8534, "lon": 2.3488, "timezone": "Europe/Paris", "isp": "OVH SAS", "org": "OVH SAS", "as": "AS16276 OVH SAS", "asname": "OVH", "mobile": false, "proxy": false, "hosting": true, "query": "51.15.0.1" } """ if data.get('status') != 'success': return {'error': data.get('message', 'Unknown error')} # Extraire l'ASN asn_full = data.get('as', '') asn_number = None asn_org = None if asn_full: parts = asn_full.split(' ', 1) if len(parts) >= 1: asn_number = parts[0].replace('AS', '') if len(parts) >= 2: asn_org = parts[1] return { 'country': data.get('country'), 'country_code': data.get('countryCode'), 'region': data.get('regionName'), 'city': data.get('city'), 'isp': data.get('isp'), 'org': data.get('org'), 'asn': asn_number, 'asn_org': asn_org, 'is_proxy': data.get('proxy', False), 'is_hosting': data.get('hosting', False), 'is_mobile': data.get('mobile', False), 'timezone': data.get('timezone'), 'lat': data.get('lat'), 'lon': data.get('lon'), 'query': data.get('query') } def _parse_ipinfo(self, data: Dict[str, Any]) -> Dict[str, Any]: """ Parse les données de IPinfo.io Response example: { "ip": "51.15.0.1", "city": "Paris", "region": "Île-de-France", "country": "FR", "loc": "48.8534,2.3488", "org": "AS16276 OVH SAS", "postal": "75001", "timezone": "Europe/Paris", "readme": "https://ipinfo.io/missingauth" } """ # Extraire l'ASN org_full = data.get('org', '') asn_number = None asn_org = None if org_full: parts = org_full.split(' ', 1) if len(parts) >= 1: asn_number = parts[0].replace('AS', '') if len(parts) >= 2: asn_org = parts[1] # Extraire lat/lon loc = data.get('loc', '') lat = None lon = None if loc: coords = loc.split(',') if len(coords) == 2: lat = float(coords[0]) lon = float(coords[1]) return { 'ip': data.get('ip'), 'city': data.get('city'), 'region': data.get('region'), 'country': data.get('country'), 'postal': data.get('postal'), 'timezone': data.get('timezone'), 'asn': asn_number, 'asn_org': asn_org, 'org': data.get('org'), 'lat': lat, 'lon': lon } def _aggregate_results(self, sources: Dict[str, Any]) -> Dict[str, Any]: """ Agrège les résultats de toutes les sources Logique d'agrégation: - is_proxy: true si au moins une source le détecte - is_hosting: true si au moins une source le détecte - threat_score: basé sur les détections proxy/hosting/vpn/tor - threat_level: low/medium/high/critical basé sur le score """ aggregated = { 'is_proxy': False, 'is_hosting': False, 'is_vpn': False, 'is_tor': False, 'threat_score': 0, 'threat_level': 'unknown', 'country': None, 'country_code': None, 'asn': None, 'asn_org': None, 'org': None, 'city': None, 'warnings': [] } threat_score = 0 for source_name, source_data in sources.items(): if 'error' in source_data: continue # Proxy detection if source_data.get('is_proxy'): aggregated['is_proxy'] = True threat_score += 30 aggregated['warnings'].append(f'{source_name}: Proxy détecté') # Hosting detection if source_data.get('is_hosting'): aggregated['is_hosting'] = True threat_score += 20 aggregated['warnings'].append(f'{source_name}: Hébergement cloud/datacenter') # VPN detection (si disponible) if source_data.get('is_vpn'): aggregated['is_vpn'] = True threat_score += 40 aggregated['warnings'].append(f'{source_name}: VPN détecté') # Tor detection (si disponible) if source_data.get('is_tor'): aggregated['is_tor'] = True threat_score += 50 aggregated['warnings'].append(f'{source_name}: Exit node Tor détecté') # Infos géographiques (prend la première disponible) if not aggregated['country'] and source_data.get('country'): aggregated['country'] = source_data.get('country') if not aggregated['country_code'] and source_data.get('country_code'): aggregated['country_code'] = source_data.get('country_code') # ASN (prend la première disponible) if not aggregated['asn'] and source_data.get('asn'): aggregated['asn'] = source_data.get('asn') if not aggregated['asn_org'] and source_data.get('asn_org'): aggregated['asn_org'] = source_data.get('asn_org') # Organisation/ISP if not aggregated['org'] and source_data.get('org'): aggregated['org'] = source_data.get('org') # Ville if not aggregated['city'] and source_data.get('city'): aggregated['city'] = source_data.get('city') # Calcul du niveau de menace aggregated['threat_score'] = min(100, threat_score) if threat_score >= 80: aggregated['threat_level'] = 'critical' elif threat_score >= 60: aggregated['threat_level'] = 'high' elif threat_score >= 40: aggregated['threat_level'] = 'medium' elif threat_score >= 20: aggregated['threat_level'] = 'low' else: aggregated['threat_level'] = 'clean' return aggregated async def close(self): """Ferme le client HTTP""" await self.http_client.aclose() # Singleton pour réutiliser le service _reputation_service: Optional[IPReputationService] = None def get_reputation_service() -> IPReputationService: """Retourne l'instance singleton du service de réputation""" global _reputation_service if _reputation_service is None: _reputation_service = IPReputationService() return _reputation_service