Files
dashboard/backend/services/reputation_ip.py
SOC Analyst 05d21ae8fb feat: Réputation IP depuis bases publiques (sans clé API)
- Nouveau service backend/services/reputation_ip.py
  - IP-API.com: Géolocalisation + détection Proxy/Hosting
  - IPinfo.io: ASN + Organisation
  - Agrégation des sources avec score de menace 0-100
  - Niveaux: clean/low/medium/high/critical

- Nouvelle route API GET /api/reputation/ip/:ip
  - Validation IPv4
  - Version complète et summary
  - Timeout 10s par source

- Nouveau composant frontend ReputationPanel.tsx
  - Badge de niveau de menace (code couleur)
  - 4 badges détection: Proxy 🌐, Hosting ☁️, VPN 🔒, Tor 🧅
  - Infos géographiques: pays, ville, ASN, organisation
  - Liste des avertissements
  - Sources et timestamp

- Intégration dans InvestigationView
  - Panel affiché en premier (avant Graph de corrélations)
  - Chargement asynchrone au montage du composant

- Dépendance: httpx==0.26.0 (requêtes HTTP async)

Testé avec 141.98.11.209 (Lithuania, AS209605) → 🟢 CLEAN (0/100)
Aucun proxy/hosting/VPN/Tor détecté

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
2026-03-15 18:15:01 +01:00

313 lines
10 KiB
Python

"""
Services de réputation IP - Bases de données publiques sans clé API
"""
import httpx
from typing import Optional, Dict, Any
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
# Timeout pour les requêtes HTTP
HTTP_TIMEOUT = 10.0
class IPReputationService:
"""
Service de réputation IP utilisant des bases de données publiques gratuites
"""
def __init__(self):
self.http_client = httpx.AsyncClient(timeout=HTTP_TIMEOUT)
# Sources de réputation (sans clé API)
self.sources = {
'ip_api': 'http://ip-api.com/json/{ip}',
'ipinfo': 'https://ipinfo.io/{ip}/json',
}
async def get_reputation(self, ip: str) -> Dict[str, Any]:
"""
Récupère la réputation d'une IP depuis toutes les sources disponibles
Args:
ip: Adresse IP à vérifier
Returns:
Dict avec les informations de réputation agrégées
"""
results = {
'ip': ip,
'timestamp': datetime.utcnow().isoformat(),
'sources': {},
'aggregated': {
'is_proxy': False,
'is_hosting': False,
'is_vpn': False,
'is_tor': False,
'threat_score': 0,
'threat_level': 'unknown',
'country': None,
'asn': None,
'org': None,
'warnings': []
}
}
# Interroge chaque source
for source_name, url_template in self.sources.items():
try:
url = url_template.format(ip=ip)
response = await self.http_client.get(url)
if response.status_code == 200:
data = response.json()
results['sources'][source_name] = self._parse_source_data(source_name, data)
else:
logger.warning(f"Source {source_name} returned status {response.status_code}")
results['sources'][source_name] = {'error': f'Status {response.status_code}'}
except httpx.TimeoutException:
logger.warning(f"Timeout for source {source_name}")
results['sources'][source_name] = {'error': 'Timeout'}
except Exception as e:
logger.error(f"Error fetching from {source_name}: {str(e)}")
results['sources'][source_name] = {'error': str(e)}
# Agrège les résultats
results['aggregated'] = self._aggregate_results(results['sources'])
return results
def _parse_source_data(self, source: str, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Parse les données d'une source spécifique
"""
if source == 'ip_api':
return self._parse_ip_api(data)
elif source == 'ipinfo':
return self._parse_ipinfo(data)
return data
def _parse_ip_api(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Parse les données de IP-API.com
Response example:
{
"status": "success",
"country": "France",
"countryCode": "FR",
"region": "IDF",
"regionName": "Île-de-France",
"city": "Paris",
"zip": "75001",
"lat": 48.8534,
"lon": 2.3488,
"timezone": "Europe/Paris",
"isp": "OVH SAS",
"org": "OVH SAS",
"as": "AS16276 OVH SAS",
"asname": "OVH",
"mobile": false,
"proxy": false,
"hosting": true,
"query": "51.15.0.1"
}
"""
if data.get('status') != 'success':
return {'error': data.get('message', 'Unknown error')}
# Extraire l'ASN
asn_full = data.get('as', '')
asn_number = None
asn_org = None
if asn_full:
parts = asn_full.split(' ', 1)
if len(parts) >= 1:
asn_number = parts[0].replace('AS', '')
if len(parts) >= 2:
asn_org = parts[1]
return {
'country': data.get('country'),
'country_code': data.get('countryCode'),
'region': data.get('regionName'),
'city': data.get('city'),
'isp': data.get('isp'),
'org': data.get('org'),
'asn': asn_number,
'asn_org': asn_org,
'is_proxy': data.get('proxy', False),
'is_hosting': data.get('hosting', False),
'is_mobile': data.get('mobile', False),
'timezone': data.get('timezone'),
'lat': data.get('lat'),
'lon': data.get('lon'),
'query': data.get('query')
}
def _parse_ipinfo(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Parse les données de IPinfo.io
Response example:
{
"ip": "51.15.0.1",
"city": "Paris",
"region": "Île-de-France",
"country": "FR",
"loc": "48.8534,2.3488",
"org": "AS16276 OVH SAS",
"postal": "75001",
"timezone": "Europe/Paris",
"readme": "https://ipinfo.io/missingauth"
}
"""
# Extraire l'ASN
org_full = data.get('org', '')
asn_number = None
asn_org = None
if org_full:
parts = org_full.split(' ', 1)
if len(parts) >= 1:
asn_number = parts[0].replace('AS', '')
if len(parts) >= 2:
asn_org = parts[1]
# Extraire lat/lon
loc = data.get('loc', '')
lat = None
lon = None
if loc:
coords = loc.split(',')
if len(coords) == 2:
lat = float(coords[0])
lon = float(coords[1])
return {
'ip': data.get('ip'),
'city': data.get('city'),
'region': data.get('region'),
'country': data.get('country'),
'postal': data.get('postal'),
'timezone': data.get('timezone'),
'asn': asn_number,
'asn_org': asn_org,
'org': data.get('org'),
'lat': lat,
'lon': lon
}
def _aggregate_results(self, sources: Dict[str, Any]) -> Dict[str, Any]:
"""
Agrège les résultats de toutes les sources
Logique d'agrégation:
- is_proxy: true si au moins une source le détecte
- is_hosting: true si au moins une source le détecte
- threat_score: basé sur les détections proxy/hosting/vpn/tor
- threat_level: low/medium/high/critical basé sur le score
"""
aggregated = {
'is_proxy': False,
'is_hosting': False,
'is_vpn': False,
'is_tor': False,
'threat_score': 0,
'threat_level': 'unknown',
'country': None,
'country_code': None,
'asn': None,
'asn_org': None,
'org': None,
'city': None,
'warnings': []
}
threat_score = 0
for source_name, source_data in sources.items():
if 'error' in source_data:
continue
# Proxy detection
if source_data.get('is_proxy'):
aggregated['is_proxy'] = True
threat_score += 30
aggregated['warnings'].append(f'{source_name}: Proxy détecté')
# Hosting detection
if source_data.get('is_hosting'):
aggregated['is_hosting'] = True
threat_score += 20
aggregated['warnings'].append(f'{source_name}: Hébergement cloud/datacenter')
# VPN detection (si disponible)
if source_data.get('is_vpn'):
aggregated['is_vpn'] = True
threat_score += 40
aggregated['warnings'].append(f'{source_name}: VPN détecté')
# Tor detection (si disponible)
if source_data.get('is_tor'):
aggregated['is_tor'] = True
threat_score += 50
aggregated['warnings'].append(f'{source_name}: Exit node Tor détecté')
# Infos géographiques (prend la première disponible)
if not aggregated['country'] and source_data.get('country'):
aggregated['country'] = source_data.get('country')
if not aggregated['country_code'] and source_data.get('country_code'):
aggregated['country_code'] = source_data.get('country_code')
# ASN (prend la première disponible)
if not aggregated['asn'] and source_data.get('asn'):
aggregated['asn'] = source_data.get('asn')
if not aggregated['asn_org'] and source_data.get('asn_org'):
aggregated['asn_org'] = source_data.get('asn_org')
# Organisation/ISP
if not aggregated['org'] and source_data.get('org'):
aggregated['org'] = source_data.get('org')
# Ville
if not aggregated['city'] and source_data.get('city'):
aggregated['city'] = source_data.get('city')
# Calcul du niveau de menace
aggregated['threat_score'] = min(100, threat_score)
if threat_score >= 80:
aggregated['threat_level'] = 'critical'
elif threat_score >= 60:
aggregated['threat_level'] = 'high'
elif threat_score >= 40:
aggregated['threat_level'] = 'medium'
elif threat_score >= 20:
aggregated['threat_level'] = 'low'
else:
aggregated['threat_level'] = 'clean'
return aggregated
async def close(self):
"""Ferme le client HTTP"""
await self.http_client.aclose()
# Singleton pour réutiliser le service
_reputation_service: Optional[IPReputationService] = None
def get_reputation_service() -> IPReputationService:
"""Retourne l'instance singleton du service de réputation"""
global _reputation_service
if _reputation_service is None:
_reputation_service = IPReputationService()
return _reputation_service