Files
Jacquin Antoine f88b739992 feat(e2e): add distributed E2E test framework with parametric traffic generation
Add run-e2e-test.sh with CLI parameters (--hits, --http-ratio, --dns, --tls,
--src-ips, --keep-analysis, --up) for configurable traffic generation. Traffic
runs from VM endpoints with multiple source IPs (alias IPs on eth0) to produce
distinct sessions for the ML pipeline. Fix curl TLS flags (--tlsv1.2 instead
of --tls-v1-2), skip redundant local verification in distributed mode, and
fix dashboard is_available() cache that never retried after ClickHouse recovery.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-15 00:09:32 +02:00

122 lines
3.7 KiB
Python

from __future__ import annotations

import ipaddress
import logging
from typing import Any

import clickhouse_connect
from clickhouse_connect.driver.client import Client
from clickhouse_connect.driver.exceptions import DatabaseError, OperationalError

from backend.config import CLICKHOUSE_HOST, CLICKHOUSE_PORT, CLICKHOUSE_USER, CLICKHOUSE_PASSWORD
# Shared client singleton, created lazily by get_client() and set back to
# None whenever the connection is considered broken.
_client: Client | None = None

# Tri-state connectivity flag: None = never checked, then True / False.
_available: bool | None = None

logger = logging.getLogger(__name__)
class ClickHouseUnavailable(Exception):
    """Signal that a ClickHouse operation failed and no result is available.

    Raised when the server cannot be reached (the message is the text of the
    underlying driver/OS error).
    """
def is_available() -> bool:
    """Report whether a ClickHouse client can currently be obtained.

    A failure is never cached: when ClickHouse was previously marked
    unavailable, any leftover client is discarded and a new connection is
    attempted on this call.

    NOTE(review): when a client is already cached, get_client() returns it
    without contacting the server, so an outage that happens after a
    successful connect is only noticed once a query fails.

    Returns:
        True if get_client() succeeded; False otherwise (a warning is logged
        and the cached client is cleared).
    """
    global _client, _available
    try:
        if _available is False:
            # Force get_client() to build a brand-new client.
            _client = None
        get_client()
    except Exception:
        # Health-check boundary: any failure just means "not available".
        _client = None
        _available = False
        logger.warning("ClickHouse unavailable at %s:%s", CLICKHOUSE_HOST, CLICKHOUSE_PORT)
        return False
    _available = True
    return True
def get_client() -> Client:
    """Return the shared ClickHouse client, creating it on first use.

    If creating the client raises, the exception propagates and ``_client``
    is left as ``None``, so the next call simply tries again. A successful
    creation logs the connection and sets ``_available`` to True.
    """
    global _client, _available
    if _client is not None:
        return _client
    _client = clickhouse_connect.get_client(
        host=CLICKHOUSE_HOST,
        port=CLICKHOUSE_PORT,
        username=CLICKHOUSE_USER,
        password=CLICKHOUSE_PASSWORD,
    )
    logger.info("Connected to ClickHouse at %s:%s", CLICKHOUSE_HOST, CLICKHOUSE_PORT)
    _available = True
    return _client
def _mark_unavailable() -> None:
    """Forget the cached client and flag ClickHouse as down.

    Because ``_client`` becomes ``None``, the next get_client() call builds
    a fresh client.
    """
    global _available, _client
    _client, _available = None, False
def _normalise_value(v: Any) -> Any:
    """Convert ClickHouse-specific types to JSON-friendly Python types.

    * ``IPv4Address`` / ``IPv6Address`` objects become their string form.
    * ``bytes`` of exactly 16 bytes are read as a packed IPv6 address and
      rendered as a string; any other ``bytes`` value is hex-encoded.
    * ``list`` / ``tuple`` / ``dict`` values (e.g. from Array, Tuple or Map
      columns) are normalised element by element (dict keys included), so
      nested addresses or bytes do not leak through.
    * Everything else is returned unchanged.
    """
    if isinstance(v, (ipaddress.IPv4Address, ipaddress.IPv6Address)):
        return str(v)
    if isinstance(v, bytes):
        try:
            return str(ipaddress.IPv6Address(v))
        except ipaddress.AddressValueError:
            # Not a 16-byte packed address: fall back to a lossless hex dump.
            return v.hex()
    if isinstance(v, list):
        return [_normalise_value(item) for item in v]
    if isinstance(v, tuple):
        return tuple(_normalise_value(item) for item in v)
    if isinstance(v, dict):
        return {_normalise_value(k): _normalise_value(item) for k, item in v.items()}
    return v
def query(sql: str, params: dict | None = None) -> list[dict[str, Any]]:
    """Execute *sql* and return the result as a list of ``{column: value}`` dicts.

    Every value is passed through ``_normalise_value`` so it is JSON-friendly.

    Args:
        sql: Query text; may contain server-side parameter placeholders.
        params: Values for those placeholders (``None`` means no parameters).

    Raises:
        ClickHouseUnavailable: if the server is not reachable. In that case
            the cached client is dropped so the next call reconnects. Also
            raised, for backward compatibility, when the server rejects the
            statement; the working client is then kept.
    """
    try:
        client = get_client()
        result = client.query(sql, parameters=params or {})
        columns = result.column_names
        # Row materialisation stays inside the try so that any driver error
        # raised while reading the result is translated the same way.
        return [
            {col: _normalise_value(val) for col, val in zip(columns, row)}
            for row in result.result_rows
        ]
    except (OperationalError, OSError) as exc:
        # Transport-level failure (OSError also covers ConnectionError):
        # discard the client so the next call reconnects.
        _mark_unavailable()
        raise ClickHouseUnavailable(str(exc)) from exc
    except DatabaseError as exc:
        # The server answered with an error (bad SQL, unknown table, ...).
        # The connection is fine, so do not tear the client down or flip the
        # availability flag; keep raising the type callers already handle.
        raise ClickHouseUnavailable(str(exc)) from exc
def query_scalar(sql: str, params: dict | None = None) -> Any:
    """Execute *sql* and return the first column of the first row.

    The value is passed through ``_normalise_value``. Returns ``None`` when
    the query yields no rows.

    Args:
        sql: Query text; may contain server-side parameter placeholders.
        params: Values for those placeholders (``None`` means no parameters).

    Raises:
        ClickHouseUnavailable: if the server is not reachable. In that case
            the cached client is dropped so the next call reconnects. Also
            raised, for backward compatibility, when the server rejects the
            statement; the working client is then kept.
    """
    try:
        client = get_client()
        result = client.query(sql, parameters=params or {})
        rows = result.result_rows
        return _normalise_value(rows[0][0]) if rows else None
    except (OperationalError, OSError) as exc:
        # Transport-level failure (OSError also covers ConnectionError):
        # discard the client so the next call reconnects.
        _mark_unavailable()
        raise ClickHouseUnavailable(str(exc)) from exc
    except DatabaseError as exc:
        # The server answered with an error; the connection itself is healthy.
        raise ClickHouseUnavailable(str(exc)) from exc
def execute(sql: str, params: dict | None = None) -> None:
    """Execute a DDL / DML statement and discard any result.

    Args:
        sql: Statement text; may contain server-side parameter placeholders.
        params: Values for those placeholders (``None`` means no parameters).

    Raises:
        ClickHouseUnavailable: if the server is not reachable. In that case
            the cached client is dropped so the next call reconnects. Also
            raised, for backward compatibility, when the server rejects the
            statement; the working client is then kept.
    """
    try:
        get_client().command(sql, parameters=params or {})
    except (OperationalError, OSError) as exc:
        # Transport-level failure (OSError also covers ConnectionError):
        # discard the client so the next call reconnects.
        _mark_unavailable()
        raise ClickHouseUnavailable(str(exc)) from exc
    except DatabaseError as exc:
        # The server answered with an error; the connection itself is healthy.
        raise ClickHouseUnavailable(str(exc)) from exc