- Nouveau service backend/services/reputation_ip.py - IP-API.com: Géolocalisation + détection Proxy/Hosting - IPinfo.io: ASN + Organisation - Agrégation des sources avec score de menace 0-100 - Niveaux: clean/low/medium/high/critical - Nouvelle route API GET /api/reputation/ip/:ip - Validation IPv4 - Version complète et summary - Timeout 10s par source - Nouveau composant frontend ReputationPanel.tsx - Badge de niveau de menace (code couleur) - 4 badges détection: Proxy 🌐, Hosting ☁️, VPN 🔒, Tor 🧅 - Infos géographiques: pays, ville, ASN, organisation - Liste des avertissements - Sources et timestamp - Intégration dans InvestigationView - Panel affiché en premier (avant Graph de corrélations) - Chargement asynchrone au montage du composant - Dépendance: httpx==0.26.0 (requêtes HTTP async) Testé avec 141.98.11.209 (Lithuania, AS209605) → 🟢 CLEAN (0/100) Aucun proxy/hosting/VPN/Tor détecté Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
313 lines
10 KiB
Python
313 lines
10 KiB
Python
"""
|
|
Services de réputation IP - Bases de données publiques sans clé API
|
|
"""
|
|
import logging
from datetime import datetime, timezone
from typing import Any, Dict, Optional

import httpx
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Timeout (seconds) applied to every outbound HTTP request.
HTTP_TIMEOUT: float = 10.0
|
|
|
|
|
|
class IPReputationService:
    """
    IP reputation service backed by free public databases (no API key).

    Sources:
      - ip-api.com: geolocation plus proxy/hosting detection
      - ipinfo.io:  ASN and organisation data

    Per-source results are aggregated into a 0-100 threat score and a
    threat level: clean / low / medium / high / critical.
    """

    def __init__(self):
        # One shared async client, reused across lookups; released in close().
        self.http_client = httpx.AsyncClient(timeout=HTTP_TIMEOUT)

        # Reputation sources queried on every lookup (all key-less tiers).
        self.sources = {
            'ip_api': 'http://ip-api.com/json/{ip}',
            'ipinfo': 'https://ipinfo.io/{ip}/json',
        }

    async def get_reputation(self, ip: str) -> Dict[str, Any]:
        """
        Query every configured source and aggregate the results for *ip*.

        Args:
            ip: IP address to look up.

        Returns:
            Dict with 'ip', 'timestamp' (timezone-aware UTC ISO-8601),
            per-source normalised data under 'sources', and the merged
            verdict under 'aggregated'. A failing source is recorded as
            {'error': ...} and never aborts the whole lookup.
        """
        results: Dict[str, Any] = {
            'ip': ip,
            # Aware UTC timestamp; datetime.utcnow() is deprecated and naive.
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'sources': {},
            # Filled by _aggregate_results() below.
            'aggregated': {},
        }

        for source_name, url_template in self.sources.items():
            try:
                response = await self.http_client.get(url_template.format(ip=ip))

                if response.status_code == 200:
                    results['sources'][source_name] = self._parse_source_data(
                        source_name, response.json()
                    )
                else:
                    logger.warning(
                        "Source %s returned status %s", source_name, response.status_code
                    )
                    results['sources'][source_name] = {
                        'error': f'Status {response.status_code}'
                    }

            except httpx.TimeoutException:
                logger.warning("Timeout for source %s", source_name)
                results['sources'][source_name] = {'error': 'Timeout'}
            except Exception as e:
                logger.error("Error fetching from %s: %s", source_name, e)
                results['sources'][source_name] = {'error': str(e)}

        results['aggregated'] = self._aggregate_results(results['sources'])
        return results

    def _parse_source_data(self, source: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Dispatch raw JSON from *source* to its dedicated parser (pass-through if unknown)."""
        parsers = {
            'ip_api': self._parse_ip_api,
            'ipinfo': self._parse_ipinfo,
        }
        parser = parsers.get(source)
        return parser(data) if parser else data

    @staticmethod
    def _extract_asn(value: str):
        """
        Split an 'AS<number> <organisation>' string into (asn_number, asn_org).

        The ASN is only extracted when the first token actually starts with
        'AS'; this avoids mistaking a plain organisation name for an ASN
        (e.g. an ipinfo 'org' field without an AS prefix). Missing pieces
        come back as None.
        """
        asn_number = None
        asn_org = None
        if value:
            parts = value.split(' ', 1)
            if parts[0].startswith('AS'):
                asn_number = parts[0][2:]
                if len(parts) == 2:
                    asn_org = parts[1]
        return asn_number, asn_org

    def _parse_ip_api(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Normalise an ip-api.com JSON payload.

        ip-api answers 'status': 'fail' (with a 'message') for private or
        reserved addresses; those are surfaced as {'error': ...}.

        NOTE(review): the free endpoint only includes 'proxy'/'hosting'/
        'mobile' when they are requested via the 'fields' query parameter —
        confirm the URL returns them, otherwise they silently default False.
        """
        if data.get('status') != 'success':
            return {'error': data.get('message', 'Unknown error')}

        # 'as' looks like "AS16276 OVH SAS".
        asn_number, asn_org = self._extract_asn(data.get('as', ''))

        return {
            'country': data.get('country'),
            'country_code': data.get('countryCode'),
            'region': data.get('regionName'),
            'city': data.get('city'),
            'isp': data.get('isp'),
            'org': data.get('org'),
            'asn': asn_number,
            'asn_org': asn_org,
            'is_proxy': data.get('proxy', False),
            'is_hosting': data.get('hosting', False),
            'is_mobile': data.get('mobile', False),
            'timezone': data.get('timezone'),
            'lat': data.get('lat'),
            'lon': data.get('lon'),
            'query': data.get('query'),
        }

    def _parse_ipinfo(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Normalise an ipinfo.io JSON payload.

        'org' arrives as "AS<number> <name>" and 'loc' as "lat,lon".
        Malformed coordinates are ignored (lat/lon stay None) instead of
        raising ValueError.
        """
        asn_number, asn_org = self._extract_asn(data.get('org', ''))

        lat = None
        lon = None
        loc = data.get('loc', '')
        if loc:
            coords = loc.split(',')
            if len(coords) == 2:
                try:
                    lat = float(coords[0])
                    lon = float(coords[1])
                except ValueError:
                    lat = None
                    lon = None

        return {
            'ip': data.get('ip'),
            'city': data.get('city'),
            'region': data.get('region'),
            'country': data.get('country'),
            'postal': data.get('postal'),
            'timezone': data.get('timezone'),
            'asn': asn_number,
            'asn_org': asn_org,
            'org': data.get('org'),
            'lat': lat,
            'lon': lon,
        }

    def _aggregate_results(self, sources: Dict[str, Any]) -> Dict[str, Any]:
        """
        Merge per-source results into a single reputation verdict.

        Aggregation rules:
          - boolean flags (proxy/hosting/vpn/tor) are OR-ed across sources;
          - each threat category is scored ONCE even when several sources
            report it (proxy 30, hosting 20, vpn 40, tor 50; capped at 100),
            so duplicate detections cannot inflate the level;
          - geo/ASN/org fields take the first non-empty value seen;
          - every detection appends a human-readable warning per source;
          - threat_level is derived from the capped score.
        """
        aggregated: Dict[str, Any] = {
            'is_proxy': False,
            'is_hosting': False,
            'is_vpn': False,
            'is_tor': False,
            'threat_score': 0,
            'threat_level': 'unknown',
            'country': None,
            'country_code': None,
            'asn': None,
            'asn_org': None,
            'org': None,
            'city': None,
            'warnings': [],
        }

        # (flag key, score weight, warning label) for each threat category.
        detections = [
            ('is_proxy', 30, 'Proxy détecté'),
            ('is_hosting', 20, 'Hébergement cloud/datacenter'),
            ('is_vpn', 40, 'VPN détecté'),
            ('is_tor', 50, 'Exit node Tor détecté'),
        ]
        # Descriptive fields filled on a first-non-empty-value-wins basis.
        fallback_fields = ['country', 'country_code', 'asn', 'asn_org', 'org', 'city']

        threat_score = 0

        for source_name, source_data in sources.items():
            if 'error' in source_data:
                continue

            for flag, score, label in detections:
                if source_data.get(flag):
                    if not aggregated[flag]:
                        # Score the category only on its first detection.
                        aggregated[flag] = True
                        threat_score += score
                    aggregated['warnings'].append(f'{source_name}: {label}')

            for field in fallback_fields:
                if not aggregated[field] and source_data.get(field):
                    aggregated[field] = source_data.get(field)

        aggregated['threat_score'] = min(100, threat_score)

        # Threat level from the capped score.
        score = aggregated['threat_score']
        if score >= 80:
            aggregated['threat_level'] = 'critical'
        elif score >= 60:
            aggregated['threat_level'] = 'high'
        elif score >= 40:
            aggregated['threat_level'] = 'medium'
        elif score >= 20:
            aggregated['threat_level'] = 'low'
        else:
            aggregated['threat_level'] = 'clean'

        return aggregated

    async def close(self):
        """Dispose of the shared HTTP client (call on application shutdown)."""
        await self.http_client.aclose()
|
|
|
|
|
|
# Module-level singleton so every caller shares one service (and thus one
# underlying HTTP client connection pool).
_reputation_service: Optional[IPReputationService] = None


def get_reputation_service() -> IPReputationService:
    """Lazily build and return the process-wide IPReputationService instance."""
    global _reputation_service
    if _reputation_service is not None:
        return _reputation_service
    _reputation_service = IPReputationService()
    return _reputation_service
|