Add run-e2e-test.sh with CLI parameters (--hits, --http-ratio, --dns, --tls, --src-ips, --keep-analysis, --up) for configurable traffic generation. Traffic runs from VM endpoints with multiple source IPs (alias IPs on eth0) to produce distinct sessions for the ML pipeline. Fix curl TLS flags (--tlsv1.2 instead of --tls-v1-2), skip redundant local verification in distributed mode, and fix dashboard is_available() cache that never retried after ClickHouse recovery. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
122 lines
3.7 KiB
Python
122 lines
3.7 KiB
Python
from __future__ import annotations
|
|
|
|
import ipaddress
|
|
import logging
|
|
from typing import Any
|
|
|
|
import clickhouse_connect
|
|
from clickhouse_connect.driver.client import Client
|
|
from clickhouse_connect.driver.exceptions import DatabaseError
|
|
|
|
from backend.config import CLICKHOUSE_HOST, CLICKHOUSE_PORT, CLICKHOUSE_USER, CLICKHOUSE_PASSWORD
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Process-wide shared ClickHouse client; stays None until get_client() builds it.
_client: Client | None = None
# Last known reachability of ClickHouse:
#   None  -> not tested yet
#   True  -> a client was obtained successfully
#   False -> the most recent attempt failed
_available: bool | None = None
|
|
|
|
|
|
class ClickHouseUnavailable(Exception):
    """Signal that the ClickHouse server could not be reached.

    The query helpers in this module raise it in place of the underlying
    driver / connection error, which is kept as ``__cause__``.
    """
|
|
|
|
|
|
def is_available() -> bool:
    """Report whether ClickHouse currently answers at the configured host/port.

    Each call performs a live check instead of trusting the cached flag: the
    shared client is (re)obtained through ``get_client()`` and then pinged.
    This notices both a recovery after an earlier failure and an outage that
    starts after a client was already created.

    Returns:
        True when a client could be obtained and the ping succeeded, False
        otherwise. On failure the cached client is dropped and the module is
        flagged unavailable, so the next call reconnects from scratch.
    """
    global _client, _available
    try:
        # A previous failure may have left a dead client behind; discard it so
        # get_client() builds a fresh connection.
        if _available is False:
            _client = None
        client = get_client()
        # A cached client object says nothing about the server's current
        # state, so confirm with a ping. Per the clickhouse-connect docs,
        # ping() returns False rather than raising when the server is down.
        if not client.ping():
            raise ConnectionError("ping failed")
    except Exception as exc:  # any failure here just means "not available"
        _available = False
        _client = None
        # Include the cause so the log explains *why* the check failed.
        logger.warning(
            "ClickHouse unavailable at %s:%s (%s)",
            CLICKHOUSE_HOST,
            CLICKHOUSE_PORT,
            exc,
        )
        return False
    _available = True
    return True
|
|
|
|
|
|
def get_client() -> Client:
    """Hand out the shared ClickHouse client, creating it on first use.

    The client is built from the ``CLICKHOUSE_*`` settings and stored in the
    module-level ``_client``. If the connection attempt raises, nothing is
    stored, so the following call tries again. Every successful return also
    sets the availability flag to True.
    """
    global _client, _available

    # Fast path: reuse the client that is already cached.
    if _client is not None:
        _available = True
        return _client

    _client = clickhouse_connect.get_client(
        host=CLICKHOUSE_HOST,
        port=CLICKHOUSE_PORT,
        username=CLICKHOUSE_USER,
        password=CLICKHOUSE_PASSWORD,
    )
    logger.info("Connected to ClickHouse at %s:%s", CLICKHOUSE_HOST, CLICKHOUSE_PORT)
    _available = True
    return _client
|
|
|
|
|
|
def _mark_unavailable() -> None:
    """Drop the cached client and flag ClickHouse as unreachable."""
    global _client, _available
    _client, _available = None, False
|
|
|
|
|
|
def _normalise_value(v: Any) -> Any:
|
|
"""Convert ClickHouse-specific types to JSON-friendly Python types."""
|
|
if isinstance(v, (ipaddress.IPv4Address, ipaddress.IPv6Address)):
|
|
return str(v)
|
|
if isinstance(v, bytes):
|
|
try:
|
|
return str(ipaddress.IPv6Address(v))
|
|
except Exception:
|
|
return v.hex()
|
|
return v
|
|
|
|
|
|
def query(sql: str, params: dict | None = None) -> list[dict[str, Any]]:
    """Run *sql* and return its rows as a list of ``{column: value}`` dicts.

    *params* is passed to the driver as query parameters (an empty dict when
    omitted). Each cell goes through ``_normalise_value`` first.

    Raises ClickHouseUnavailable when the driver reports a database,
    connection or OS-level error; the shared client is reset first.
    """
    try:
        response = get_client().query(sql, parameters=params or {})
        names = response.column_names
        return [
            {name: _normalise_value(cell) for name, cell in zip(names, record)}
            for record in response.result_rows
        ]
    except (DatabaseError, ConnectionError, OSError) as err:
        # Forget the client so the next call reconnects.
        _mark_unavailable()
        raise ClickHouseUnavailable(str(err)) from err
|
|
|
|
|
|
def query_scalar(sql: str, params: dict | None = None) -> Any:
    """Run *sql* and return the first column of the first row.

    The value is passed through ``_normalise_value``. ``None`` is returned
    when the query yields no rows.

    Raises ClickHouseUnavailable when the driver reports a database,
    connection or OS-level error; the shared client is reset first.
    """
    try:
        response = get_client().query(sql, parameters=params or {})
        if not response.result_rows:
            return None
        return _normalise_value(response.result_rows[0][0])
    except (DatabaseError, ConnectionError, OSError) as err:
        # Forget the client so the next call reconnects.
        _mark_unavailable()
        raise ClickHouseUnavailable(str(err)) from err
|
|
|
|
|
|
def execute(sql: str, params: dict | None = None) -> None:
    """Run a DDL / DML statement whose result is not needed.

    *params* is passed to the driver as command parameters (an empty dict
    when omitted).

    Raises ClickHouseUnavailable when the driver reports a database,
    connection or OS-level error; the shared client is reset first.
    """
    try:
        get_client().command(sql, parameters=params or {})
    except (DatabaseError, ConnectionError, OSError) as err:
        # Forget the client so the next call reconnects.
        _mark_unavailable()
        raise ClickHouseUnavailable(str(err)) from err
|