"""Helpers for running SQL against ClickHouse through one shared client."""

from __future__ import annotations

import ipaddress
import logging
from typing import Any

import clickhouse_connect
from clickhouse_connect.driver.client import Client

from backend.config import (
    CLICKHOUSE_HOST,
    CLICKHOUSE_PORT,
    CLICKHOUSE_USER,
    CLICKHOUSE_PASSWORD,
)

logger = logging.getLogger(__name__)

# Module-level cache for the client; populated on the first get_client() call.
_client: Client | None = None


def get_client() -> Client:
    """Return the shared ClickHouse client, creating it on first use.

    The client is built from the ``CLICKHOUSE_*`` settings imported from
    ``backend.config`` and cached in the module-level ``_client`` variable,
    so later calls return the same object.
    """
    global _client
    if _client is None:
        _client = clickhouse_connect.get_client(
            host=CLICKHOUSE_HOST,
            port=CLICKHOUSE_PORT,
            username=CLICKHOUSE_USER,
            password=CLICKHOUSE_PASSWORD,
        )
        logger.info(
            "Connected to ClickHouse at %s:%s", CLICKHOUSE_HOST, CLICKHOUSE_PORT
        )
    return _client


def _normalise_value(v: Any) -> Any:
    """Convert ClickHouse-specific types to JSON-friendly Python types.

    * ``IPv4Address`` / ``IPv6Address`` objects become their string form.
    * ``bytes`` are first tried as a packed IPv6 address; if the stdlib
      rejects them, the hex representation of the bytes is returned.
    * Any other value is returned unchanged.
    """
    if isinstance(v, (ipaddress.IPv4Address, ipaddress.IPv6Address)):
        return str(v)
    if isinstance(v, bytes):
        try:
            return str(ipaddress.IPv6Address(v))
        except ipaddress.AddressValueError:
            # Not a valid packed IPv6 address; fall back to a hex dump.
            return v.hex()
    return v


def query(sql: str, params: dict | None = None) -> list[dict[str, Any]]:
    """Execute *sql* and return the result as a list of row dicts.

    Args:
        sql: The statement to run.
        params: Optional query parameters; an empty dict is passed to the
            client when this is ``None`` or empty.

    Returns:
        One dict per result row, keyed by column name, with every value
        passed through ``_normalise_value``.
    """
    result = get_client().query(sql, parameters=params or {})
    columns = result.column_names
    return [
        {col: _normalise_value(val) for col, val in zip(columns, row)}
        for row in result.result_rows
    ]


def query_scalar(sql: str, params: dict | None = None) -> Any:
    """Execute *sql* and return the first column of the first row.

    Args:
        sql: The statement to run.
        params: Optional query parameters; an empty dict is passed to the
            client when this is ``None`` or empty.

    Returns:
        The normalised value, or ``None`` when the query returned no rows.
    """
    result = get_client().query(sql, parameters=params or {})
    if not result.result_rows:
        return None
    return _normalise_value(result.result_rows[0][0])


def execute(sql: str, params: dict | None = None) -> None:
    """Execute a DDL / DML statement; any value the client returns is discarded.

    Args:
        sql: The statement to run.
        params: Optional query parameters; an empty dict is passed to the
            client when this is ``None`` or empty.
    """
    get_client().command(sql, parameters=params or {})