# Correlator

The correlator (`logcorrelator`) is a Go daemon that joins HTTP events from [mod-reqin-log](mod-reqin-log.md) (source A) with TLS/network events from [sentinel](sentinel.md) (source B) into unified correlated log entries. It matches events on a `src_ip:src_port` key within a configurable time window, supports HTTP Keep-Alive connections, and writes results to ClickHouse, a file, and/or stdout.

## Correlation Algorithm

### Key Matching

Events are correlated by their **correlation key**: `src_ip:src_port`. A client's IP address and ephemeral source port together identify a single TCP connection while it is open. Matching on this pair therefore reliably joins the HTTP request (seen by Apache) with the TLS handshake (seen by sentinel) from the same connection.

### Time Window

Events must arrive within the configured time window (default: **10 seconds**) to be matched. The window absorbs:

- Processing latency between Apache and sentinel
- Packet capture buffering
- UNIX socket delivery ordering

### Keep-Alive Support

In `one_to_many` mode (the default), a single TLS handshake event (source B) can match **multiple** HTTP requests (source A) on the same TCP connection:

1. A source B event arrives and is buffered with a TTL (default: 120 s).
2. A source A event arrives with the same key: the events are correlated and the B event's TTL is reset.
3. The next A event on the same connection matches the same B event, and the TTL is reset again.
4. The connection closes: the B event expires once its TTL elapses.

Each A event within a Keep-Alive session receives an incrementing `keepalives` counter.

### Orphan Handling

- **Source A orphans** (HTTP without a TLS match): Emitted after `apache_emit_delay_ms` (default: 500 ms) with `correlated=false` and `orphan_side=A`.
- **Source B orphans** (TLS without an HTTP match): Not emitted by default (`network_emit: false`).
- **Buffer overflow**: The oldest events are rotated out and emitted as orphans.

### Field Merging

When two events are correlated:

- HTTP fields (method, path, headers, etc.) come from source A.
- TLS/network fields (JA4, JA3, IP/TCP metadata) come from source B.
- If both events carry the same field with different values, both values are kept, under `a_` and `b_` prefixes.

## Configuration Reference

Configuration is loaded from a YAML file (default: `/etc/logcorrelator/logcorrelator.yml`).

### Log Settings

| Name | Type | Default | Description |
|------|------|---------|-------------|
| `log.level` | string | `INFO` | Log level: `DEBUG`, `INFO`, `WARN`, `ERROR` |

### Input Settings

| Name | Type | Default | Description |
|------|------|---------|-------------|
| `inputs.unix_sockets[].name` | string | — | Human-readable source name (e.g., `http`, `network`) |
| `inputs.unix_sockets[].path` | string | — | UNIX socket path to listen on |
| `inputs.unix_sockets[].format` | string | `json` | Input format |
| `inputs.unix_sockets[].source_type` | string | — | Event source: `A` (HTTP), `B` (Network) |
| `inputs.unix_sockets[].socket_permissions` | string | `0666` | Socket file permissions (octal) |

### Output Settings

#### File Output

| Name | Type | Default | Description |
|------|------|---------|-------------|
| `outputs.file.enabled` | bool | `true` | Enable file output |
| `outputs.file.path` | string | `/var/log/logcorrelator/correlated.log` | Output file path |

#### ClickHouse Output

| Name | Type | Default | Description |
|------|------|---------|-------------|
| `outputs.clickhouse.enabled` | bool | `false` | Enable ClickHouse output |
| `outputs.clickhouse.dsn` | string | — | ClickHouse DSN (e.g., `clickhouse://user:pass@host:9000/db`) |
| `outputs.clickhouse.table` | string | — | Target table name |
| `outputs.clickhouse.batch_size` | int | `500` | Records per batch insert |
| `outputs.clickhouse.flush_interval_ms` | int | `200` | Flush interval (ms) |
| `outputs.clickhouse.max_buffer_size` | int | `5000` | Maximum in-memory buffer size (records) |
| `outputs.clickhouse.drop_on_overflow` | bool | `true` | Drop records when the buffer is full |
| `outputs.clickhouse.async_insert` | bool | `true` | Use ClickHouse async inserts |
| `outputs.clickhouse.timeout_ms` | int | `1000` | Operation timeout (ms) |

#### Stdout Output

| Name | Type | Default | Description |
|------|------|---------|-------------|
| `outputs.stdout.enabled` | bool | `false` | Enable stdout output |
| `outputs.stdout.level` | string | — | Output verbosity filter |

### Correlation Settings

| Name | Type | Default | Description |
|------|------|---------|-------------|
| `correlation.time_window.value` | int | `10` | Time window value |
| `correlation.time_window.unit` | string | `s` | Time window unit (`s`, `ms`) |
| `correlation.orphan_policy.apache_always_emit` | bool | `true` | Always emit A events, even without a B match |
| `correlation.orphan_policy.apache_emit_delay_ms` | int | `500` | Delay before emitting an orphan A event (ms) |
| `correlation.orphan_policy.network_emit` | bool | `false` | Emit B events that have no A match |
| `correlation.matching.mode` | string | `one_to_many` | Matching mode: `one_to_one` or `one_to_many` |
| `correlation.buffers.max_http_items` | int | `10000` | Maximum buffered HTTP (source A) events |
| `correlation.buffers.max_network_items` | int | `20000` | Maximum buffered network (source B) events |
| `correlation.ttl.network_ttl_s` | int | `120` | TTL for source B events (s) |
| `correlation.exclude_source_ips` | []string | `[]` | IPs or CIDRs to exclude from correlation |
| `correlation.include_dest_ports` | []int | `[]` | If non-empty, only correlate events on these destination ports |

### Metrics Settings

| Name | Type | Default | Description |
|------|------|---------|-------------|
| `metrics.enabled` | bool | `false` | Enable the metrics HTTP server |
| `metrics.addr` | string | `:8080` | Metrics server listen address |

## Input Events

### Source A (HTTP, from mod-reqin-log)

JSON fields: `time`, `src_ip`, `src_port`, `dst_ip`, `dst_port`, `method`, `scheme`, `host`, `path`, `query`, `http_version`, `client_headers`, `header_*`

### Source B (Network, from sentinel)

JSON fields: `src_ip`, `src_port`, `dst_ip`, `dst_port`, `ip_meta_*`, `tcp_meta_*`, `tls_version`, `tls_sni`, `tls_alpn`, `ja4`, `ja3`, `ja3_hash`, `conn_id`, `syn_to_clienthello_ms`, `timestamp`

## Output: CorrelatedLog JSON Schema

```json
{
  "timestamp": "2026-03-09T14:30:00Z",
  "src_ip": "203.0.113.42",
  "src_port": 52341,
  "dst_ip": "192.168.1.10",
  "dst_port": 443,
  "correlated": true,
  "method": "GET",
  "host": "example.com",
  "path": "/api/v1/users",
  "ja4": "t13d1516h2_8daaf6152771_b0da82dd1658",
  "ja3_hash": "e7d705a3286e19ea42f587b344ee6865",
  "ip_meta_ttl": 64,
  "tcp_meta_window_size": 65535,
  "tls_version": "1.3",
  "tls_sni": "example.com",
  "tls_alpn": "h2",
  "header_User-Agent": "Mozilla/5.0 ...",
  "keepalives": 3
}
```

Core fields are always present; additional fields are merged in from the raw A and B event data.

## ClickHouse Sink

- **Protocol**: ClickHouse native TCP (port 9000) via `clickhouse-go/v2`
- **Target table**: `http_logs_raw` (raw JSON is stored, then parsed by materialized views)
- **Batch inserts**: Records are buffered up to `batch_size` (default: 500) per insert
- **Flush interval**: A timer (default: 200 ms) flushes the batch even if it is not full
- **Retry behavior**: Up to 3 retries with exponential backoff (100 ms base)
- **Connection ping**: The connection is pinged on startup with a 5-second timeout
- **Buffer overflow**: Records are dropped when the buffer exceeds `max_buffer_size` (with `drop_on_overflow: true`)

## Metrics HTTP Server

When `metrics.enabled: true`, the daemon exposes:

| Endpoint | Description |
|----------|-------------|
| `GET /metrics` | Correlation metrics as JSON (events received, correlated, orphans, buffer sizes) |
| `GET /health` | Health check endpoint |

## systemd Service

```ini
[Unit]
Description=logcorrelator service
After=network.target

[Service]
Type=simple
User=logcorrelator
Group=logcorrelator
ExecStart=/usr/bin/logcorrelator -config /etc/logcorrelator/logcorrelator.yml
ExecReload=/bin/kill -HUP $MAINPID
Restart=on-failure
RestartSec=5
RuntimeDirectory=logcorrelator
RuntimeDirectoryMode=0755

# Security hardening
NoNewPrivileges=true
ProtectSystem=strict
ProtectHome=true
ReadWritePaths=/var/log/logcorrelator /etc/logcorrelator

# Resource limits
LimitNOFILE=65536
TimeoutStartSec=10
TimeoutStopSec=30

[Install]
WantedBy=multi-user.target
```

### Security Hardening

- Runs as the dedicated `logcorrelator` user and group
- `NoNewPrivileges=true`: prevents privilege escalation
- `ProtectSystem=strict`: the filesystem is read-only except for the paths in `ReadWritePaths`
- `ProtectHome=true`: no access to home directories
- `RuntimeDirectory=logcorrelator`: systemd creates the socket directory with the correct ownership

## RPM Package Contents

| Path | Description |
|------|-------------|
| `/usr/bin/logcorrelator` | Binary |
| `/etc/logcorrelator/logcorrelator.yml` | Configuration file |
| `/usr/lib/systemd/system/logcorrelator.service` | systemd unit |
| `/var/log/logcorrelator/` | Log directory |
| `/var/run/logcorrelator/` | Socket directory (RuntimeDirectory) |