feat: Réputation IP depuis bases publiques (sans clé API)

- Nouveau service backend/services/reputation_ip.py
  - IP-API.com: Géolocalisation + détection Proxy/Hosting
  - IPinfo.io: ASN + Organisation
  - Agrégation des sources avec score de menace 0-100
  - Niveaux: clean/low/medium/high/critical

- Nouvelle route API GET /api/reputation/ip/:ip
  - Validation IPv4
  - Version complète et summary
  - Timeout 10s par source

- Nouveau composant frontend ReputationPanel.tsx
  - Badge de niveau de menace (code couleur)
  - 4 badges détection: Proxy 🌐, Hosting ☁️, VPN 🔒, Tor 🧅
  - Infos géographiques: pays, ville, ASN, organisation
  - Liste des avertissements
  - Sources et timestamp

- Intégration dans InvestigationView
  - Panel affiché en premier (avant Graph de corrélations)
  - Chargement asynchrone au montage du composant

- Dépendance: httpx==0.26.0 (requêtes HTTP async)

Testé avec 141.98.11.209 (Lithuania, AS209605) → 🟢 CLEAN (0/100)
Aucun proxy/hosting/VPN/Tor détecté

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
SOC Analyst
2026-03-15 18:15:01 +01:00
parent 776aa52241
commit 05d21ae8fb
7 changed files with 662 additions and 1 deletions

View File

@ -12,7 +12,7 @@ import os
from .config import settings
from .database import db
from .routes import metrics, detections, variability, attributes, analysis, entities, incidents, audit
from .routes import metrics, detections, variability, attributes, analysis, entities, incidents, audit, reputation
# Configuration logging
logging.basicConfig(
@ -72,6 +72,7 @@ app.include_router(analysis.router)
app.include_router(entities.router)
app.include_router(incidents.router)
app.include_router(audit.router)
app.include_router(reputation.router)
# Route pour servir le frontend

View File

@ -0,0 +1,125 @@
"""
Routes pour la réputation IP (bases de données publiques)
"""
from fastapi import APIRouter, HTTPException, Path
from typing import Dict, Any
import re
from ..services.reputation_ip import get_reputation_service
router = APIRouter(prefix="/api/reputation", tags=["Reputation"])
# Pattern de validation d'IP (IPv4)
IP_PATTERN = re.compile(r'^(\d{1,3}\.){3}\d{1,3}$')
def is_valid_ipv4(ip: str) -> bool:
"""Valide qu'une chaîne est une adresse IPv4 valide"""
if not IP_PATTERN.match(ip):
return False
# Vérifie que chaque octet est entre 0 et 255
parts = ip.split('.')
for part in parts:
try:
num = int(part)
if num < 0 or num > 255:
return False
except ValueError:
return False
return True
@router.get("/ip/{ip_address}")
async def get_ip_reputation(
ip_address: str = Path(..., description="Adresse IP à vérifier")
) -> Dict[str, Any]:
"""
Récupère la réputation d'une adresse IP depuis les bases de données publiques
Sources utilisées (sans clé API):
- IP-API.com: Géolocalisation + Proxy/Hosting detection
- IPinfo.io: ASN + Organisation
Returns:
Dict avec:
- ip: Adresse IP vérifiée
- timestamp: Date de la vérification
- sources: Détails par source
- aggregated: Résultats agrégés
- is_proxy: bool
- is_hosting: bool
- is_vpn: bool
- is_tor: bool
- threat_score: 0-100
- threat_level: clean/low/medium/high/critical
- country: Pays
- asn: Numéro ASN
- asn_org: Organisation ASN
- org: ISP/Organisation
- warnings: Liste des alertes
"""
# Valide l'adresse IP
if not is_valid_ipv4(ip_address):
raise HTTPException(
status_code=400,
detail=f"Adresse IP invalide: {ip_address}. Format attendu: x.x.x.x"
)
try:
# Récupère le service de réputation
reputation_service = get_reputation_service()
# Interroge les sources
results = await reputation_service.get_reputation(ip_address)
return results
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Erreur lors de la vérification de réputation: {str(e)}"
)
@router.get("/ip/{ip_address}/summary")
async def get_ip_reputation_summary(
ip_address: str = Path(..., description="Adresse IP à vérifier")
) -> Dict[str, Any]:
"""
Version simplifiée de la réputation IP (juste les infos essentielles)
Utile pour affichage rapide dans les tableaux
"""
if not is_valid_ipv4(ip_address):
raise HTTPException(
status_code=400,
detail=f"Adresse IP invalide: {ip_address}"
)
try:
reputation_service = get_reputation_service()
full_results = await reputation_service.get_reputation(ip_address)
# Retourne juste l'essentiel
aggregated = full_results.get('aggregated', {})
return {
'ip': ip_address,
'threat_level': aggregated.get('threat_level', 'unknown'),
'threat_score': aggregated.get('threat_score', 0),
'is_proxy': aggregated.get('is_proxy', False),
'is_hosting': aggregated.get('is_hosting', False),
'country': aggregated.get('country'),
'country_code': aggregated.get('country_code'),
'asn': aggregated.get('asn'),
'org': aggregated.get('org'),
'warnings_count': len(aggregated.get('warnings', []))
}
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Erreur: {str(e)}"
)

View File

View File

@ -0,0 +1,312 @@
"""
Services de réputation IP - Bases de données publiques sans clé API
"""
import httpx
from typing import Optional, Dict, Any
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
# Timeout pour les requêtes HTTP
HTTP_TIMEOUT = 10.0
class IPReputationService:
"""
Service de réputation IP utilisant des bases de données publiques gratuites
"""
def __init__(self):
self.http_client = httpx.AsyncClient(timeout=HTTP_TIMEOUT)
# Sources de réputation (sans clé API)
self.sources = {
'ip_api': 'http://ip-api.com/json/{ip}',
'ipinfo': 'https://ipinfo.io/{ip}/json',
}
async def get_reputation(self, ip: str) -> Dict[str, Any]:
"""
Récupère la réputation d'une IP depuis toutes les sources disponibles
Args:
ip: Adresse IP à vérifier
Returns:
Dict avec les informations de réputation agrégées
"""
results = {
'ip': ip,
'timestamp': datetime.utcnow().isoformat(),
'sources': {},
'aggregated': {
'is_proxy': False,
'is_hosting': False,
'is_vpn': False,
'is_tor': False,
'threat_score': 0,
'threat_level': 'unknown',
'country': None,
'asn': None,
'org': None,
'warnings': []
}
}
# Interroge chaque source
for source_name, url_template in self.sources.items():
try:
url = url_template.format(ip=ip)
response = await self.http_client.get(url)
if response.status_code == 200:
data = response.json()
results['sources'][source_name] = self._parse_source_data(source_name, data)
else:
logger.warning(f"Source {source_name} returned status {response.status_code}")
results['sources'][source_name] = {'error': f'Status {response.status_code}'}
except httpx.TimeoutException:
logger.warning(f"Timeout for source {source_name}")
results['sources'][source_name] = {'error': 'Timeout'}
except Exception as e:
logger.error(f"Error fetching from {source_name}: {str(e)}")
results['sources'][source_name] = {'error': str(e)}
# Agrège les résultats
results['aggregated'] = self._aggregate_results(results['sources'])
return results
def _parse_source_data(self, source: str, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Parse les données d'une source spécifique
"""
if source == 'ip_api':
return self._parse_ip_api(data)
elif source == 'ipinfo':
return self._parse_ipinfo(data)
return data
def _parse_ip_api(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Parse les données de IP-API.com
Response example:
{
"status": "success",
"country": "France",
"countryCode": "FR",
"region": "IDF",
"regionName": "Île-de-France",
"city": "Paris",
"zip": "75001",
"lat": 48.8534,
"lon": 2.3488,
"timezone": "Europe/Paris",
"isp": "OVH SAS",
"org": "OVH SAS",
"as": "AS16276 OVH SAS",
"asname": "OVH",
"mobile": false,
"proxy": false,
"hosting": true,
"query": "51.15.0.1"
}
"""
if data.get('status') != 'success':
return {'error': data.get('message', 'Unknown error')}
# Extraire l'ASN
asn_full = data.get('as', '')
asn_number = None
asn_org = None
if asn_full:
parts = asn_full.split(' ', 1)
if len(parts) >= 1:
asn_number = parts[0].replace('AS', '')
if len(parts) >= 2:
asn_org = parts[1]
return {
'country': data.get('country'),
'country_code': data.get('countryCode'),
'region': data.get('regionName'),
'city': data.get('city'),
'isp': data.get('isp'),
'org': data.get('org'),
'asn': asn_number,
'asn_org': asn_org,
'is_proxy': data.get('proxy', False),
'is_hosting': data.get('hosting', False),
'is_mobile': data.get('mobile', False),
'timezone': data.get('timezone'),
'lat': data.get('lat'),
'lon': data.get('lon'),
'query': data.get('query')
}
def _parse_ipinfo(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Parse les données de IPinfo.io
Response example:
{
"ip": "51.15.0.1",
"city": "Paris",
"region": "Île-de-France",
"country": "FR",
"loc": "48.8534,2.3488",
"org": "AS16276 OVH SAS",
"postal": "75001",
"timezone": "Europe/Paris",
"readme": "https://ipinfo.io/missingauth"
}
"""
# Extraire l'ASN
org_full = data.get('org', '')
asn_number = None
asn_org = None
if org_full:
parts = org_full.split(' ', 1)
if len(parts) >= 1:
asn_number = parts[0].replace('AS', '')
if len(parts) >= 2:
asn_org = parts[1]
# Extraire lat/lon
loc = data.get('loc', '')
lat = None
lon = None
if loc:
coords = loc.split(',')
if len(coords) == 2:
lat = float(coords[0])
lon = float(coords[1])
return {
'ip': data.get('ip'),
'city': data.get('city'),
'region': data.get('region'),
'country': data.get('country'),
'postal': data.get('postal'),
'timezone': data.get('timezone'),
'asn': asn_number,
'asn_org': asn_org,
'org': data.get('org'),
'lat': lat,
'lon': lon
}
def _aggregate_results(self, sources: Dict[str, Any]) -> Dict[str, Any]:
"""
Agrège les résultats de toutes les sources
Logique d'agrégation:
- is_proxy: true si au moins une source le détecte
- is_hosting: true si au moins une source le détecte
- threat_score: basé sur les détections proxy/hosting/vpn/tor
- threat_level: low/medium/high/critical basé sur le score
"""
aggregated = {
'is_proxy': False,
'is_hosting': False,
'is_vpn': False,
'is_tor': False,
'threat_score': 0,
'threat_level': 'unknown',
'country': None,
'country_code': None,
'asn': None,
'asn_org': None,
'org': None,
'city': None,
'warnings': []
}
threat_score = 0
for source_name, source_data in sources.items():
if 'error' in source_data:
continue
# Proxy detection
if source_data.get('is_proxy'):
aggregated['is_proxy'] = True
threat_score += 30
aggregated['warnings'].append(f'{source_name}: Proxy détecté')
# Hosting detection
if source_data.get('is_hosting'):
aggregated['is_hosting'] = True
threat_score += 20
aggregated['warnings'].append(f'{source_name}: Hébergement cloud/datacenter')
# VPN detection (si disponible)
if source_data.get('is_vpn'):
aggregated['is_vpn'] = True
threat_score += 40
aggregated['warnings'].append(f'{source_name}: VPN détecté')
# Tor detection (si disponible)
if source_data.get('is_tor'):
aggregated['is_tor'] = True
threat_score += 50
aggregated['warnings'].append(f'{source_name}: Exit node Tor détecté')
# Infos géographiques (prend la première disponible)
if not aggregated['country'] and source_data.get('country'):
aggregated['country'] = source_data.get('country')
if not aggregated['country_code'] and source_data.get('country_code'):
aggregated['country_code'] = source_data.get('country_code')
# ASN (prend la première disponible)
if not aggregated['asn'] and source_data.get('asn'):
aggregated['asn'] = source_data.get('asn')
if not aggregated['asn_org'] and source_data.get('asn_org'):
aggregated['asn_org'] = source_data.get('asn_org')
# Organisation/ISP
if not aggregated['org'] and source_data.get('org'):
aggregated['org'] = source_data.get('org')
# Ville
if not aggregated['city'] and source_data.get('city'):
aggregated['city'] = source_data.get('city')
# Calcul du niveau de menace
aggregated['threat_score'] = min(100, threat_score)
if threat_score >= 80:
aggregated['threat_level'] = 'critical'
elif threat_score >= 60:
aggregated['threat_level'] = 'high'
elif threat_score >= 40:
aggregated['threat_level'] = 'medium'
elif threat_score >= 20:
aggregated['threat_level'] = 'low'
else:
aggregated['threat_level'] = 'clean'
return aggregated
async def close(self):
"""Ferme le client HTTP"""
await self.http_client.aclose()
# Singleton pour réutiliser le service
_reputation_service: Optional[IPReputationService] = None
def get_reputation_service() -> IPReputationService:
"""Retourne l'instance singleton du service de réputation"""
global _reputation_service
if _reputation_service is None:
_reputation_service = IPReputationService()
return _reputation_service

View File

@ -6,6 +6,7 @@ import { UserAgentAnalysis } from './analysis/UserAgentAnalysis';
import { CorrelationSummary } from './analysis/CorrelationSummary';
import { CorrelationGraph } from './CorrelationGraph';
import { InteractiveTimeline } from './InteractiveTimeline';
import { ReputationPanel } from './ReputationPanel';
export function InvestigationView() {
const { ip } = useParams<{ ip: string }>();
@ -46,6 +47,12 @@ export function InvestigationView() {
{/* Panels d'analyse */}
<div className="space-y-6">
{/* NOUVEAU: Réputation IP */}
<div className="bg-background-secondary rounded-lg p-6">
<h3 className="text-lg font-medium text-text-primary mb-4">🌍 Réputation IP (Bases publiques)</h3>
<ReputationPanel ip={ip || ''} />
</div>
{/* NOUVEAU: Graph de corrélations */}
<div className="bg-background-secondary rounded-lg p-6">
<h3 className="text-lg font-medium text-text-primary mb-4">🕸 Graph de Corrélations</h3>

View File

@ -0,0 +1,215 @@
import { useEffect, useState } from 'react';
interface ReputationData {
ip: string;
timestamp: string;
sources: {
[key: string]: {
[key: string]: any;
};
};
aggregated: {
is_proxy: boolean;
is_hosting: boolean;
is_vpn: boolean;
is_tor: boolean;
threat_score: number;
threat_level: 'unknown' | 'clean' | 'low' | 'medium' | 'high' | 'critical';
country: string | null;
country_code: string | null;
asn: string | null;
asn_org: string | null;
org: string | null;
city: string | null;
warnings: string[];
};
}
interface ReputationPanelProps {
ip: string;
}
export function ReputationPanel({ ip }: ReputationPanelProps) {
const [reputation, setReputation] = useState<ReputationData | null>(null);
const [loading, setLoading] = useState(false);
const [error, setError] = useState<string | null>(null);
useEffect(() => {
const fetchReputation = async () => {
setLoading(true);
setError(null);
try {
const response = await fetch(`/api/reputation/ip/${encodeURIComponent(ip)}`);
if (!response.ok) {
throw new Error(`Erreur HTTP: ${response.status}`);
}
const data = await response.json();
setReputation(data);
} catch (err) {
setError(err instanceof Error ? err.message : 'Erreur inconnue');
} finally {
setLoading(false);
}
};
if (ip) {
fetchReputation();
}
}, [ip]);
const getThreatLevelColor = (level: string) => {
switch (level) {
case 'critical': return 'text-red-500 bg-red-500/10 border-red-500';
case 'high': return 'text-orange-500 bg-orange-500/10 border-orange-500';
case 'medium': return 'text-yellow-500 bg-yellow-500/10 border-yellow-500';
case 'low': return 'text-blue-500 bg-blue-500/10 border-blue-500';
case 'clean': return 'text-green-500 bg-green-500/10 border-green-500';
default: return 'text-gray-500 bg-gray-500/10 border-gray-500';
}
};
const getThreatLevelLabel = (level: string) => {
const labels: { [key: string]: string } = {
'critical': '🔴 Critique',
'high': '🟠 Élevé',
'medium': '🟡 Moyen',
'low': '🔵 Faible',
'clean': '🟢 Propre',
'unknown': '⚪ Inconnu'
};
return labels[level] || level;
};
if (loading) {
return (
<div className="p-4 text-center text-text-secondary">
Vérification de la réputation...
</div>
);
}
if (error) {
return (
<div className="p-4 text-center text-red-500">
Erreur: {error}
</div>
);
}
if (!reputation) {
return null;
}
const { aggregated } = reputation;
return (
<div className="space-y-4">
{/* Threat Level Badge */}
<div className={`p-4 rounded-lg border ${getThreatLevelColor(aggregated.threat_level)}`}>
<div className="flex items-center justify-between">
<div>
<div className="text-sm font-medium mb-1">Niveau de menace</div>
<div className="text-2xl font-bold">{getThreatLevelLabel(aggregated.threat_level)}</div>
</div>
<div className="text-right">
<div className="text-sm font-medium mb-1">Score</div>
<div className="text-2xl font-bold">{aggregated.threat_score}/100</div>
</div>
</div>
{/* Progress bar */}
<div className="mt-3 w-full bg-background-secondary rounded-full h-2">
<div
className={`h-2 rounded-full transition-all ${
aggregated.threat_score >= 80 ? 'bg-red-500' :
aggregated.threat_score >= 60 ? 'bg-orange-500' :
aggregated.threat_score >= 40 ? 'bg-yellow-500' :
aggregated.threat_score >= 20 ? 'bg-blue-500' :
'bg-green-500'
}`}
style={{ width: `${aggregated.threat_score}%` }}
/>
</div>
</div>
{/* Detection Badges */}
<div className="grid grid-cols-2 gap-2">
<DetectionBadge
label="Proxy"
detected={aggregated.is_proxy}
icon="🌐"
/>
<DetectionBadge
label="Hosting"
detected={aggregated.is_hosting}
icon="☁️"
/>
<DetectionBadge
label="VPN"
detected={aggregated.is_vpn}
icon="🔒"
/>
<DetectionBadge
label="Tor"
detected={aggregated.is_tor}
icon="🧅"
/>
</div>
{/* Info Grid */}
<div className="grid grid-cols-2 gap-3">
<InfoField label="Pays" value={aggregated.country || '-'} />
<InfoField label="Ville" value={aggregated.city || '-'} />
<InfoField label="ASN" value={aggregated.asn ? `AS${aggregated.asn}` : '-'} />
<InfoField label="Organisation" value={aggregated.org || aggregated.asn_org || '-'} />
</div>
{/* Warnings */}
{aggregated.warnings.length > 0 && (
<div className="bg-orange-500/10 border border-orange-500 rounded-lg p-3">
<div className="text-sm font-medium text-orange-500 mb-2">
Avertissements ({aggregated.warnings.length})
</div>
<ul className="space-y-1">
{aggregated.warnings.map((warning, index) => (
<li key={index} className="text-xs text-text-secondary">
{warning}
</li>
))}
</ul>
</div>
)}
{/* Sources */}
<div className="text-xs text-text-secondary text-center">
Sources: {Object.keys(reputation.sources).join(', ')} {new Date(reputation.timestamp).toLocaleString('fr-FR')}
</div>
</div>
);
}
function DetectionBadge({ label, detected, icon }: { label: string; detected: boolean; icon: string }) {
return (
<div className={`p-2 rounded-lg border text-center ${
detected
? 'bg-red-500/10 border-red-500 text-red-500'
: 'bg-background-card border-background-card text-text-secondary'
}`}>
<div className="text-lg mb-1">{icon}</div>
<div className="text-xs font-medium">{label}</div>
<div className={`text-xs font-bold ${detected ? 'text-red-500' : 'text-text-secondary'}`}>
{detected ? 'DÉTECTÉ' : 'Non'}
</div>
</div>
);
}
function InfoField({ label, value }: { label: string; value: string }) {
return (
<div className="bg-background-card rounded-lg p-2">
<div className="text-xs text-text-secondary mb-1">{label}</div>
<div className="text-sm text-text-primary font-medium truncate" title={value}>
{value}
</div>
</div>
);
}

View File

@ -4,3 +4,4 @@ clickhouse-connect==0.8.0
pydantic==2.5.0
pydantic-settings==2.1.0
python-dotenv==1.0.0
httpx==0.26.0