ja4-platform/services/dashboard/backend/routes/reputation.py
toto d469e39da7 feat: ja4-platform monorepo — 5 services unified, tests & RPM builds standardized
Services:
- ja4sentinel: TLS/JA4 fingerprint capture daemon (Go, libpcap)
- logcorrelator: JA4 log correlation engine (Go, ClickHouse)
- mod_reqin_log: Apache module (C, JSON request logging)
- bot_detector: ML bot detection pipeline (Python)
- dashboard: FastAPI/Streamlit analytics UI (Python)

Shared libraries:
- shared/go/ja4common: logger, config, shutdown, ipfilter (Go module)
- shared/python/ja4_common: ClickHouseClient, ClickHouseSettings (Python package)
- shared/clickhouse/: canonical SQL migrations (10 files)

Build & packaging:
- Unified 3-stage Dockerfile.package for Go RPMs (el8/el9/el10)
- go.work workspace linking sentinel, correlator, ja4common
- Makefile with test-all, build-all, rpm-* targets

Fixes applied:
- go.work: 1.21 → 1.24.6 (required by sentinel)
- correlator Dockerfiles: golang:1.21 → golang:1.24
- replace directives in go.mod for ja4common local path
- pyproject.toml: setuptools.backends → setuptools.build_meta
- Removed static libpcap linking (unavailable on Rocky 9)
- Fixed data races in output/writers_test.go (sync.Mutex + atomic.Int32)
- Rewrote corrupted test files (logger_test.go × 2)

Test coverage:
- correlator: 67.1% total (unixsocket 80.5%, config 91.7%, app 83.3%, multi 87.7%, stdout 100%)
- sentinel: all 10 packages pass (api, capture, config, fingerprint, ipfilter, logging, output, tlsparse)

Documentation:
- README.md + docs/ (architecture, development, 5 services, shared libs, DB schema & migrations)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-07 16:42:59 +02:00


"""
Routes for IP reputation lookups (public databases).
"""
import re
from typing import Any, Dict

from fastapi import APIRouter, HTTPException, Path

from ..services.reputation_ip import get_reputation_service

router = APIRouter(prefix="/api/reputation", tags=["Reputation"])

# IPv4 validation pattern: four dot-separated groups of 1 to 3 ASCII digits.
# re.ASCII keeps \d from matching non-ASCII Unicode digits.
IP_PATTERN = re.compile(r'^(\d{1,3}\.){3}\d{1,3}$', re.ASCII)


def is_valid_ipv4(ip: str) -> bool:
    """Return True if the string is a valid dotted-quad IPv4 address."""
    # fullmatch (rather than match) rejects a trailing newline, which `$` alone would allow
    if not IP_PATTERN.fullmatch(ip):
        return False
    # Check that every octet is between 0 and 255
    for part in ip.split('.'):
        try:
            num = int(part)
        except ValueError:
            return False
        if num < 0 or num > 255:
            return False
    return True


@router.get("/ip/{ip_address}", summary="Réputation complète d'une IP")
async def get_ip_reputation(
    ip_address: str = Path(..., description="Adresse IP à vérifier")
) -> Dict[str, Any]:
    """
    Fetch the reputation of an IP address from public databases.

    Sources used (no API key required):
    - IP-API.com: geolocation + proxy/hosting detection
    - IPinfo.io: ASN + organisation

    Returns:
        Dict with:
        - ip: the IP address that was checked
        - timestamp: time of the check
        - sources: per-source details
        - aggregated: aggregated results
            - is_proxy: bool
            - is_hosting: bool
            - is_vpn: bool
            - is_tor: bool
            - threat_score: 0-100
            - threat_level: clean/low/medium/high/critical
            - country: country
            - asn: ASN number
            - asn_org: ASN organisation
            - org: ISP/organisation
            - warnings: list of alerts
    """
    # Validate the IP address
    if not is_valid_ipv4(ip_address):
        raise HTTPException(
            status_code=400,
            detail=f"Adresse IP invalide: {ip_address}. Format attendu: x.x.x.x"
        )
    try:
        # Get the reputation service
        reputation_service = get_reputation_service()
        # Query the sources
        results = await reputation_service.get_reputation(ip_address)
        return results
    except Exception as e:
        # Any failure in the service is reported to the client as a 500
        raise HTTPException(
            status_code=500,
            detail=f"Erreur lors de la vérification de réputation: {str(e)}"
        ) from e


@router.get("/ip/{ip_address}/summary", summary="Réputation simplifiée d'une IP")
async def get_ip_reputation_summary(
    ip_address: str = Path(..., description="Adresse IP à vérifier")
) -> Dict[str, Any]:
    """
    Simplified IP reputation (essential fields only).

    Useful for quick display in tables.
    """
    if not is_valid_ipv4(ip_address):
        raise HTTPException(
            status_code=400,
            detail=f"Adresse IP invalide: {ip_address}"
        )
    try:
        reputation_service = get_reputation_service()
        full_results = await reputation_service.get_reputation(ip_address)
        # Return only the essential fields from the aggregated results
        aggregated = full_results.get('aggregated', {})
        return {
            'ip': ip_address,
            'threat_level': aggregated.get('threat_level', 'unknown'),
            'threat_score': aggregated.get('threat_score', 0),
            'is_proxy': aggregated.get('is_proxy', False),
            'is_hosting': aggregated.get('is_hosting', False),
            'country': aggregated.get('country'),
            'country_code': aggregated.get('country_code'),
            'asn': aggregated.get('asn'),
            'org': aggregated.get('org'),
            'warnings_count': len(aggregated.get('warnings', []))
        }
    except Exception as e:
        # Any failure in the service is reported to the client as a 500
        raise HTTPException(
            status_code=500,
            detail=f"Erreur: {str(e)}"
        ) from e
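
The `is_valid_ipv4` helper in this file checks the dotted-quad shape with a regex and then range-checks each octet by hand. As a point of comparison only, the same check could be sketched with the standard library's `ipaddress` module. The function name `is_valid_ipv4_stdlib` is hypothetical and is not part of this file. One behavioural difference to be aware of: recent Python versions reject octets with leading zeros (such as `01.2.3.4`), which the regex-based helper accepts.

```python
import ipaddress


def is_valid_ipv4_stdlib(ip: str) -> bool:
    """Hypothetical alternative to is_valid_ipv4 (an assumption, not in the original file)."""
    try:
        # IPv4Address raises AddressValueError (a ValueError subclass) on bad input
        ipaddress.IPv4Address(ip)
    except ValueError:
        return False
    return True


if __name__ == "__main__":
    print(is_valid_ipv4_stdlib("8.8.8.8"))    # True
    print(is_valid_ipv4_stdlib("256.1.1.1"))  # False: octet out of range
    print(is_valid_ipv4_stdlib("1.2.3"))      # False: only three octets
```

Delegating to `ipaddress` removes the hand-written range check and would make a later IPv6 extension (via `ipaddress.ip_address`) a smaller change, at the cost of the leading-zero difference noted above.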