# python-ja4common

`ja4_common` is the shared Python library for the ja4-platform. It provides a singleton ClickHouse client and configuration settings, and is used by the bot-detector and dashboard services.

- **Package name:** `ja4-common`
- **Python version:** ≥ 3.11
- **Dependencies:**
  - `clickhouse-connect >= 0.8.0`
  - `pydantic-settings >= 2.1.0`
## ClickHouseSettings

A pydantic-settings model that reads configuration from environment variables and `.env` files.

### Fields
| Field | Type | Default | Env Variable | Description |
|---|---|---|---|---|
| `CLICKHOUSE_HOST` | `str` | `"clickhouse"` | `CLICKHOUSE_HOST` | ClickHouse server hostname |
| `CLICKHOUSE_PORT` | `int` | `8123` | `CLICKHOUSE_PORT` | ClickHouse HTTP API port |
| `CLICKHOUSE_DB` | `str` | `"mabase_prod"` | `CLICKHOUSE_DB` | Database name |
| `CLICKHOUSE_USER` | `str` | `"admin"` | `CLICKHOUSE_USER` | Username for authentication |
| `CLICKHOUSE_PASSWORD` | `str` | `""` | `CLICKHOUSE_PASSWORD` | Password for authentication |
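Values read from the environment are always strings, so they must be converted to each field's declared type (for example, `CLICKHOUSE_PORT` becomes an `int`). pydantic-settings handles this validation.

The block below is a rough, stdlib-only illustration of the same field set. `ClickHouseSettingsSketch` and `from_mapping` are hypothetical names and not part of `ja4_common`. pydantic's real validation is also stricter than this sketch.

```python
from dataclasses import dataclass, fields
from typing import Mapping


@dataclass(frozen=True)
class ClickHouseSettingsSketch:
    """Hypothetical stdlib mirror of the ClickHouseSettings fields (not the real model)."""

    CLICKHOUSE_HOST: str = "clickhouse"
    CLICKHOUSE_PORT: int = 8123
    CLICKHOUSE_DB: str = "mabase_prod"
    CLICKHOUSE_USER: str = "admin"
    CLICKHOUSE_PASSWORD: str = ""

    @classmethod
    def from_mapping(cls, env: Mapping[str, str]) -> "ClickHouseSettingsSketch":
        """Build an instance from string values, falling back to field defaults."""
        values = {}
        for f in fields(cls):
            if f.name in env:
                # Environment values are strings; convert each one to the type
                # of the field's default (int for the port, str otherwise).
                values[f.name] = type(f.default)(env[f.name])
        return cls(**values)


custom = ClickHouseSettingsSketch.from_mapping({"CLICKHOUSE_PORT": "9000"})
```

In this sketch, a non-numeric `CLICKHOUSE_PORT` raises `ValueError`. pydantic would report a validation error for the same input.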
### Configuration Sources

Settings are loaded in the following order of precedence:

- Environment variables (highest priority)
- A `.env` file in the current working directory
- Default values (lowest priority)

Environment variable names are case-sensitive (e.g., `CLICKHOUSE_HOST`, not `clickhouse_host`).
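The precedence rules work as a layered lookup: check the process environment first, then the values read from `.env`, then the field default.

The sketch below is a simplified model of that order. The function name `resolve` is hypothetical. This is not pydantic-settings' implementation, which also handles sources such as init arguments and secrets directories. It uses exact-key lookups to match the case-sensitive behaviour described above.

```python
from typing import Mapping


def resolve(
    name: str,
    default: str,
    environ: Mapping[str, str],
    dotenv: Mapping[str, str],
) -> str:
    """Return the value for `name` using env > .env > default precedence."""
    # Exact-key lookups, so "clickhouse_host" does not match "CLICKHOUSE_HOST".
    if name in environ:
        return environ[name]
    if name in dotenv:
        return dotenv[name]
    return default
```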
### Usage

```python
from ja4_common.settings import settings

print(settings.CLICKHOUSE_HOST)  # "clickhouse" or from env
print(settings.CLICKHOUSE_PORT)  # 8123 or from env
```
## ClickHouseClient

Wraps `clickhouse_connect` with automatic reconnection and a small API for queries, commands, inserts and shutdown.
### Methods

| Method | Signature | Description |
|---|---|---|
| `connect` | `connect() -> Client` | Returns the underlying `clickhouse_connect` client, creating or reconnecting as needed |
| `query` | `query(query: str, params: dict = None)` | Executes a SELECT query and returns the result set |
| `command` | `command(query: str, params: dict = None)` | Executes a DDL/DML command (CREATE, INSERT, etc.) |
| `insert` | `insert(table: str, data, column_names=None)` | Bulk-inserts data into a table |
| `close` | `close()` | Closes the connection and releases resources |
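### Auto-Reconnection

The `connect()` method reconnects automatically if the current connection is lost:

```python
import clickhouse_connect

from ja4_common.settings import settings


class ClickHouseClient:
    # Excerpt: __init__, _ping() and the other methods are omitted.

    def connect(self):
        # Create the client on first use, or rebuild it if the health check fails.
        if self._client is None or not self._ping():
            self._client = clickhouse_connect.get_client(
                host=settings.CLICKHOUSE_HOST,
                port=settings.CLICKHOUSE_PORT,
                database=settings.CLICKHOUSE_DB,
                user=settings.CLICKHOUSE_USER,
                password=settings.CLICKHOUSE_PASSWORD,
                connect_timeout=10,  # seconds
            )
        return self._client
```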
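The reconnect-on-failed-ping pattern can be exercised without a ClickHouse server by injecting the connection factory. The sketch below makes a few assumptions of its own:

- `ReconnectingHandle`, `FakeConnection` and `make_connection` are hypothetical names.
- The real `_ping()` helper is not shown in this document. Here it is assumed to call the client's `ping()` and to treat any exception as a dead connection.

```python
from typing import Callable, Optional, Protocol


class Connection(Protocol):
    def ping(self) -> bool: ...


class ReconnectingHandle:
    """Hypothetical stand-in for the reconnect logic in ClickHouseClient.connect()."""

    def __init__(self, factory: Callable[[], Connection]) -> None:
        self._factory = factory
        self._client: Optional[Connection] = None

    def _ping(self) -> bool:
        try:
            return self._client is not None and self._client.ping()
        except Exception:
            # Assumption: any error during the health check means a dead connection.
            return False

    def connect(self) -> Connection:
        if self._client is None or not self._ping():
            self._client = self._factory()
        return self._client


class FakeConnection:
    """Test double whose liveness can be toggled."""

    def __init__(self) -> None:
        self.alive = True

    def ping(self) -> bool:
        return self.alive


created = []


def make_connection() -> FakeConnection:
    conn = FakeConnection()
    created.append(conn)
    return conn


handle = ReconnectingHandle(make_connection)
first = handle.connect()
reused = handle.connect()       # healthy connection is reused
first.alive = False             # simulate a dropped connection
replacement = handle.connect()  # failed ping triggers a new connection
```

Injecting the factory keeps the reconnect decision testable in isolation. The real class builds its client directly with `clickhouse_connect.get_client`.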
### Usage Example

```python
from datetime import datetime

from ja4_common.clickhouse import get_client

client = get_client()

# SELECT query with a server-side bound parameter
result = client.query(
    "SELECT count() FROM http_logs WHERE src_ip = {ip:String}",
    {"ip": "203.0.113.42"},
)
print(result.result_rows)

# INSERT
client.insert(
    "audit_logs",
    [[datetime.now(), "analyst1", "investigate", "ip", "203.0.113.42"]],
    column_names=["timestamp", "user_name", "action", "entity_type", "entity_id"],
)

# Command
client.command("OPTIMIZE TABLE http_logs FINAL")
```
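`insert()` takes row data plus an explicit `column_names` list, as in the `audit_logs` example. When records start out as dictionaries, a small helper can keep every row aligned with that column list.

`rows_from_dicts` is a hypothetical helper, not part of `ja4_common`. It raises on missing keys rather than silently producing misaligned rows.

```python
from datetime import datetime
from typing import Any, Iterable, List, Mapping, Sequence


def rows_from_dicts(
    records: Iterable[Mapping[str, Any]], column_names: Sequence[str]
) -> List[List[Any]]:
    """Convert dict records to row lists ordered by column_names."""
    rows = []
    for record in records:
        missing = [c for c in column_names if c not in record]
        if missing:
            raise KeyError(f"record is missing columns: {missing}")
        rows.append([record[c] for c in column_names])
    return rows


COLUMNS = ["timestamp", "user_name", "action", "entity_type", "entity_id"]
records = [
    {
        "timestamp": datetime(2024, 1, 1, 12, 0),
        "user_name": "analyst1",
        "action": "investigate",
        "entity_type": "ip",
        "entity_id": "203.0.113.42",
    }
]
rows = rows_from_dicts(records, COLUMNS)
# client.insert("audit_logs", rows, column_names=COLUMNS)
```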
## `get_client()` Singleton

The `get_client()` function provides a module-level singleton `ClickHouseClient`:

```python
from ja4_common.clickhouse import get_client

# First call creates the client
client1 = get_client()

# Subsequent calls return the same instance
client2 = get_client()
assert client1 is client2
```
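### Implementation

```python
from typing import Optional

# ClickHouseClient is defined in the same module (ja4_common/clickhouse.py).
_client: Optional[ClickHouseClient] = None


def get_client() -> ClickHouseClient:
    global _client
    # Create the client lazily on the first call, then reuse it.
    if _client is None:
        _client = ClickHouseClient()
    return _client
```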
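The implementation above is fine for single-threaded start-up. However, two threads calling `get_client()` at the same moment can, in principle, both see `_client is None` and create separate instances. If a service may call it from worker threads, a lock-guarded variant is one option.

This is a sketch under that assumption, not the library's code. `_Client` is a placeholder standing in for `ClickHouseClient`.

```python
import threading
from typing import Optional


class _Client:
    """Placeholder for ClickHouseClient in this sketch."""


_client: Optional[_Client] = None
_lock = threading.Lock()


def get_client() -> _Client:
    global _client
    # Fast path: no locking once the instance exists.
    if _client is None:
        with _lock:
            # Re-check inside the lock: another thread may have created the
            # instance while this one was waiting.
            if _client is None:
                _client = _Client()
    return _client
```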
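## Using from a New Service

### 1. Add Dependency

In your service's `requirements.txt`:

```
ja4-common @ file:///app/shared/python/ja4_common
```

Or in `pyproject.toml`:

```toml
[project]
dependencies = [
    "ja4-common",
]
```

The bare `ja4-common` requirement carries no source location. It is only satisfied if the package is installed beforehand, for example by the `pip install` step in the Docker setup.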
### 2. Docker Setup

```dockerfile
# Copy shared library
COPY shared/python/ja4_common /app/shared/python/ja4_common
RUN pip install /app/shared/python/ja4_common

# Copy service code
COPY services/my-service /app/services/my-service
```
### 3. Use in Code

```python
from ja4_common.clickhouse import get_client
from ja4_common.settings import settings

# Access settings
print(f"Connecting to {settings.CLICKHOUSE_HOST}:{settings.CLICKHOUSE_PORT}")

# Use client
db = get_client()
result = db.query("SELECT count() FROM ml_detected_anomalies")
```
### 4. Environment Configuration

Create a `.env` file or set environment variables:

```
CLICKHOUSE_HOST=clickhouse.example.com
CLICKHOUSE_PORT=8123
CLICKHOUSE_DB=mabase_prod
CLICKHOUSE_USER=data_writer
CLICKHOUSE_PASSWORD=secret
```
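pydantic-settings reads the `.env` file itself. A minimal parser for the simple `KEY=VALUE` form shown above can still help when you want to see which keys a file defines.

`parse_dotenv` is a hypothetical helper. It handles only blank lines, `#` comments and one pair of surrounding quotes, and it does not implement the full dotenv syntax (such as `export` prefixes, multi-line values or variable expansion).

```python
from typing import Dict


def parse_dotenv(text: str) -> Dict[str, str]:
    """Parse simple KEY=VALUE lines; later keys override earlier ones."""
    values: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        # Skip blank lines, comments and lines without '='.
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        value = value.strip()
        # Drop one pair of matching surrounding quotes, if present.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "'\"":
            value = value[1:-1]
        values[key.strip()] = value
    return values


example = """
CLICKHOUSE_HOST=clickhouse.example.com
CLICKHOUSE_PORT=8123
# comment lines are ignored
CLICKHOUSE_PASSWORD="secret"
"""
parsed = parse_dotenv(example)
```

Avoid printing the parsed dictionary in logs, since it contains the password.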
## Testing: Mocking the Client

### Using `unittest.mock`

```python
from unittest.mock import MagicMock, patch

from ja4_common.clickhouse import ClickHouseClient


def test_my_service():
    # Mock restricted to ClickHouseClient's attributes, returning one canned row.
    mock_client = MagicMock(spec=ClickHouseClient)
    mock_client.query.return_value = MagicMock(result_rows=[(42,)])

    # Replace the module-level singleton so get_client() returns the mock.
    with patch("ja4_common.clickhouse._client", mock_client):
        from ja4_common.clickhouse import get_client

        client = get_client()
        result = client.query("SELECT count() FROM http_logs")
        assert result.result_rows == [(42,)]
```
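An alternative to patching the module-level `_client` is to pass the client into the code under test and hand it a small fake. This avoids depending on the singleton's internal variable name.

`FakeClickHouseClient` and `count_http_logs` are hypothetical. The fake mirrors only the method names and the `result_rows` attribute used in this document, not the full clickhouse_connect result type.

```python
from types import SimpleNamespace
from typing import Any, Dict, List, Optional, Sequence, Tuple


class FakeClickHouseClient:
    """Hypothetical in-memory stand-in with the same method names as ClickHouseClient."""

    def __init__(self, rows_by_query: Optional[Dict[str, List[Tuple[Any, ...]]]] = None) -> None:
        self.rows_by_query = rows_by_query or {}
        self.commands: List[Tuple[str, Optional[dict]]] = []
        self.inserts: List[Tuple[str, list, Optional[Sequence[str]]]] = []

    def query(self, query: str, params: Optional[dict] = None) -> SimpleNamespace:
        # Return canned rows in a result_rows attribute, as the examples read it.
        return SimpleNamespace(result_rows=self.rows_by_query.get(query, []))

    def command(self, query: str, params: Optional[dict] = None) -> None:
        self.commands.append((query, params))

    def insert(self, table: str, data, column_names: Optional[Sequence[str]] = None) -> None:
        self.inserts.append((table, list(data), column_names))

    def close(self) -> None:
        pass


def count_http_logs(client) -> int:
    """Hypothetical service function that receives its client as an argument."""
    return client.query("SELECT count() FROM http_logs").result_rows[0][0]
```

In production code, the same function would be called as `count_http_logs(get_client())`.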
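### Overriding Settings in Tests

```python
from ja4_common.settings import ClickHouseSettings

# Create custom settings for tests
test_settings = ClickHouseSettings(
    CLICKHOUSE_HOST="localhost",
    CLICKHOUSE_PORT=8123,
    CLICKHOUSE_DB="test_db",
    CLICKHOUSE_USER="test_user",
    CLICKHOUSE_PASSWORD="test_pass",
)
```

This creates an independent instance. Code that reads the module-level `settings` object, such as `ClickHouseClient.connect()`, keeps using the original values.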
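pydantic-settings reads environment variables when a `ClickHouseSettings` instance is created. That makes it possible to set them only for the duration of a test with `unittest.mock.patch.dict`. Instances created inside the block see the overrides, while objects created earlier (such as one built at import time) do not.

To run without `ja4_common` installed, the sketch uses a hypothetical `read_target()` function. It reads the same variable names directly from `os.environ`.

```python
import os
from typing import Tuple
from unittest.mock import patch


def read_target() -> Tuple[str, int]:
    """Hypothetical helper reading the same variables ClickHouseSettings uses."""
    host = os.environ.get("CLICKHOUSE_HOST", "clickhouse")
    port = int(os.environ.get("CLICKHOUSE_PORT", "8123"))
    return host, port


before = os.environ.get("CLICKHOUSE_HOST")
with patch.dict(os.environ, {"CLICKHOUSE_HOST": "localhost", "CLICKHOUSE_PORT": "18123"}):
    # Inside the block, anything reading os.environ sees the overrides,
    # including ClickHouseSettings() instances created here.
    during = read_target()
# patch.dict restores the original environment on exit.
after = os.environ.get("CLICKHOUSE_HOST")
```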
## Source Files

| File | Description |
|---|---|
| `ja4_common/settings.py` | `ClickHouseSettings` pydantic-settings model |
| `ja4_common/clickhouse.py` | `ClickHouseClient` class and `get_client()` singleton |
| `pyproject.toml` | Package metadata and dependencies |