Files
dashboard/backend/routes/reputation.py
SOC Analyst 05d21ae8fb feat: Réputation IP depuis bases publiques (sans clé API)
- Nouveau service backend/services/reputation_ip.py
  - IP-API.com: Géolocalisation + détection Proxy/Hosting
  - IPinfo.io: ASN + Organisation
  - Agrégation des sources avec score de menace 0-100
  - Niveaux: clean/low/medium/high/critical

- Nouvelle route API GET /api/reputation/ip/:ip
  - Validation IPv4
  - Version complète et summary
  - Timeout 10s par source

- Nouveau composant frontend ReputationPanel.tsx
  - Badge de niveau de menace (code couleur)
  - 4 badges détection: Proxy 🌐, Hosting ☁️, VPN 🔒, Tor 🧅
  - Infos géographiques: pays, ville, ASN, organisation
  - Liste des avertissements
  - Sources et timestamp

- Intégration dans InvestigationView
  - Panel affiché en premier (avant Graph de corrélations)
  - Chargement asynchrone au montage du composant

- Dépendance: httpx==0.26.0 (requêtes HTTP async)

Testé avec 141.98.11.209 (Lithuania, AS209605) → 🟢 CLEAN (0/100)
Aucun proxy/hosting/VPN/Tor détecté

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
2026-03-15 18:15:01 +01:00

126 lines
3.8 KiB
Python

"""
Routes pour la réputation IP (bases de données publiques)
"""
from fastapi import APIRouter, HTTPException, Path
from typing import Dict, Any
import re
from ..services.reputation_ip import get_reputation_service
router = APIRouter(prefix="/api/reputation", tags=["Reputation"])
# Pattern de validation d'IP (IPv4)
IP_PATTERN = re.compile(r'^(\d{1,3}\.){3}\d{1,3}$')
def is_valid_ipv4(ip: str) -> bool:
"""Valide qu'une chaîne est une adresse IPv4 valide"""
if not IP_PATTERN.match(ip):
return False
# Vérifie que chaque octet est entre 0 et 255
parts = ip.split('.')
for part in parts:
try:
num = int(part)
if num < 0 or num > 255:
return False
except ValueError:
return False
return True
@router.get("/ip/{ip_address}")
async def get_ip_reputation(
ip_address: str = Path(..., description="Adresse IP à vérifier")
) -> Dict[str, Any]:
"""
Récupère la réputation d'une adresse IP depuis les bases de données publiques
Sources utilisées (sans clé API):
- IP-API.com: Géolocalisation + Proxy/Hosting detection
- IPinfo.io: ASN + Organisation
Returns:
Dict avec:
- ip: Adresse IP vérifiée
- timestamp: Date de la vérification
- sources: Détails par source
- aggregated: Résultats agrégés
- is_proxy: bool
- is_hosting: bool
- is_vpn: bool
- is_tor: bool
- threat_score: 0-100
- threat_level: clean/low/medium/high/critical
- country: Pays
- asn: Numéro ASN
- asn_org: Organisation ASN
- org: ISP/Organisation
- warnings: Liste des alertes
"""
# Valide l'adresse IP
if not is_valid_ipv4(ip_address):
raise HTTPException(
status_code=400,
detail=f"Adresse IP invalide: {ip_address}. Format attendu: x.x.x.x"
)
try:
# Récupère le service de réputation
reputation_service = get_reputation_service()
# Interroge les sources
results = await reputation_service.get_reputation(ip_address)
return results
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Erreur lors de la vérification de réputation: {str(e)}"
)
@router.get("/ip/{ip_address}/summary")
async def get_ip_reputation_summary(
ip_address: str = Path(..., description="Adresse IP à vérifier")
) -> Dict[str, Any]:
"""
Version simplifiée de la réputation IP (juste les infos essentielles)
Utile pour affichage rapide dans les tableaux
"""
if not is_valid_ipv4(ip_address):
raise HTTPException(
status_code=400,
detail=f"Adresse IP invalide: {ip_address}"
)
try:
reputation_service = get_reputation_service()
full_results = await reputation_service.get_reputation(ip_address)
# Retourne juste l'essentiel
aggregated = full_results.get('aggregated', {})
return {
'ip': ip_address,
'threat_level': aggregated.get('threat_level', 'unknown'),
'threat_score': aggregated.get('threat_score', 0),
'is_proxy': aggregated.get('is_proxy', False),
'is_hosting': aggregated.get('is_hosting', False),
'country': aggregated.get('country'),
'country_code': aggregated.get('country_code'),
'asn': aggregated.get('asn'),
'org': aggregated.get('org'),
'warnings_count': len(aggregated.get('warnings', []))
}
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Erreur: {str(e)}"
)