🛡️ Dashboard complet pour l'analyse et la classification des menaces Fonctionnalités principales: - Visualisation des détections en temps réel (24h) - Investigation multi-entités (IP, JA4, ASN, Host, User-Agent) - Analyse de corrélation pour classification SOC - Clustering automatique par subnet/JA4/UA - Export des classifications pour ML Composants: - Backend: FastAPI (Python) + ClickHouse - Frontend: React + TypeScript + TailwindCSS - 6 routes API: metrics, detections, variability, attributes, analysis, entities - 7 types d'entités investigables Documentation ajoutée: - NAVIGATION_GRAPH.md: Graph complet de navigation - SOC_OPTIMIZATION_PROPOSAL.md: Proposition d'optimisation pour SOC • Réduction de 7 à 2 clics pour classification • Nouvelle vue /incidents clusterisée • Panel latéral d'investigation • Quick Search (Cmd+K) • Timeline interactive • Graph de corrélations Sécurité: - .gitignore configuré (exclut .env, secrets, node_modules) - Credentials dans .env (à ne pas committer) ⚠️ Audit sécurité réalisé - Voir recommandations dans SOC_OPTIMIZATION_PROPOSAL.md Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
57 lines
1.7 KiB
Python
57 lines
1.7 KiB
Python
"""
|
|
Connexion à ClickHouse
|
|
"""
|
|
import clickhouse_connect
|
|
from typing import Optional
|
|
from .config import settings
|
|
|
|
|
|
class ClickHouseClient:
|
|
"""Gestionnaire de connexion ClickHouse"""
|
|
|
|
def __init__(self):
|
|
self._client: Optional[clickhouse_connect.driver.client.Client] = None
|
|
|
|
def connect(self) -> clickhouse_connect.driver.client.Client:
|
|
"""Établit la connexion à ClickHouse"""
|
|
if self._client is None or not self._ping():
|
|
self._client = clickhouse_connect.get_client(
|
|
host=settings.CLICKHOUSE_HOST,
|
|
port=settings.CLICKHOUSE_PORT,
|
|
database=settings.CLICKHOUSE_DB,
|
|
user=settings.CLICKHOUSE_USER,
|
|
password=settings.CLICKHOUSE_PASSWORD,
|
|
connect_timeout=10
|
|
)
|
|
return self._client
|
|
|
|
def _ping(self) -> bool:
|
|
"""Vérifie si la connexion est active"""
|
|
try:
|
|
if self._client:
|
|
self._client.ping()
|
|
return True
|
|
except Exception:
|
|
pass
|
|
return False
|
|
|
|
def query(self, query: str, params: Optional[dict] = None):
|
|
"""Exécute une requête SELECT"""
|
|
client = self.connect()
|
|
return client.query(query, params)
|
|
|
|
def query_df(self, query: str, params: Optional[dict] = None):
|
|
"""Exécute une requête et retourne un DataFrame"""
|
|
client = self.connect()
|
|
return client.query_df(query, params)
|
|
|
|
def close(self):
|
|
"""Ferme la connexion"""
|
|
if self._client:
|
|
self._client.close()
|
|
self._client = None
|
|
|
|
|
|
# Instance globale
|
|
db = ClickHouseClient()
|