- feat(observability): metrics server with /metrics and /health endpoints - feat(observability): correlation metrics (events, success/failed, reasons, buffers) - feat(correlation): IP exclusion filter (exact IPs and CIDR ranges) - feat(correlation): pending orphan delay for late-arriving B events - fix(stdout): sink is now a no-op for data; JSON must never appear on stdout - fix(clickhouse): all flush errors were silently discarded, now properly logged - fix(clickhouse): buffer overflow with DropOnOverflow now logged at WARN - fix(clickhouse): retry attempts logged at WARN with attempt/delay/error context - feat(clickhouse): connection success logged at INFO, batch sends at DEBUG - feat(clickhouse): SetLogger() for external logger injection - test(stdout): assert stdout remains empty for correlated and orphan logs - chore(rpm): bump version to 1.1.11, update changelog - docs: README and architecture.yml updated Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
583 lines
18 KiB
Python
Executable File
583 lines
18 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
test-correlation-advanced.py - Advanced correlation testing tool
|
|
|
|
This script provides comprehensive testing for the logcorrelator service,
|
|
including various scenarios to debug correlation issues.
|
|
|
|
Usage:
|
|
python3 test-correlation-advanced.py [options]
|
|
|
|
Requirements:
|
|
- Python 3.6+
|
|
- requests library (for metrics): pip install requests
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import socket
|
|
import sys
|
|
import time
|
|
from datetime import datetime
|
|
from typing import Dict, Any, Optional, Tuple
|
|
|
|
try:
|
|
import requests
|
|
HAS_REQUESTS = True
|
|
except ImportError:
|
|
HAS_REQUESTS = False
|
|
|
|
|
|
class Colors:
|
|
"""ANSI color codes for terminal output."""
|
|
BLUE = '\033[0;34m'
|
|
GREEN = '\033[0;32m'
|
|
YELLOW = '\033[1;33m'
|
|
RED = '\033[0;31m'
|
|
NC = '\033[0m' # No Color
|
|
BOLD = '\033[1m'
|
|
|
|
|
|
def colorize(text: str, color: str) -> str:
|
|
"""Wrap text with ANSI color codes."""
|
|
return f"{color}{text}{Colors.NC}"
|
|
|
|
|
|
def info(text: str):
|
|
print(colorize(f"[INFO] ", Colors.BLUE) + text)
|
|
|
|
|
|
def success(text: str):
|
|
print(colorize(f"[OK] ", Colors.GREEN) + text)
|
|
|
|
|
|
def warn(text: str):
|
|
print(colorize(f"[WARN] ", Colors.YELLOW) + text)
|
|
|
|
|
|
def error(text: str):
|
|
print(colorize(f"[ERROR] ", Colors.RED) + text)
|
|
|
|
|
|
def debug(text: str, verbose: bool = False):
|
|
if verbose:
|
|
print(colorize(f"[DEBUG] ", Colors.BLUE) + text)
|
|
|
|
|
|
class CorrelationTester:
|
|
"""Main test class for correlation testing."""
|
|
|
|
def __init__(
|
|
self,
|
|
http_socket: str = "/var/run/logcorrelator/http.socket",
|
|
network_socket: str = "/var/run/logcorrelator/network.socket",
|
|
metrics_url: str = "http://localhost:8080/metrics",
|
|
verbose: bool = False,
|
|
skip_metrics: bool = False
|
|
):
|
|
self.http_socket = http_socket
|
|
self.network_socket = network_socket
|
|
self.metrics_url = metrics_url
|
|
self.verbose = verbose
|
|
self.skip_metrics = skip_metrics
|
|
self.http_sock: Optional[socket.socket] = None
|
|
self.network_sock: Optional[socket.socket] = None
|
|
|
|
def connect(self) -> bool:
|
|
"""Connect to Unix sockets."""
|
|
try:
|
|
# HTTP socket
|
|
self.http_sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
|
|
self.http_sock.connect(self.http_socket)
|
|
debug(f"Connected to HTTP socket: {self.http_socket}", self.verbose)
|
|
|
|
# Network socket
|
|
self.network_sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
|
|
self.network_sock.connect(self.network_socket)
|
|
debug(f"Connected to Network socket: {self.network_socket}", self.verbose)
|
|
|
|
return True
|
|
except FileNotFoundError as e:
|
|
error(f"Socket not found: {e}")
|
|
return False
|
|
except Exception as e:
|
|
error(f"Connection error: {e}")
|
|
return False
|
|
|
|
def close(self):
|
|
"""Close socket connections."""
|
|
if self.http_sock:
|
|
self.http_sock.close()
|
|
if self.network_sock:
|
|
self.network_sock.close()
|
|
|
|
def send_http_event(
|
|
self,
|
|
src_ip: str,
|
|
src_port: int,
|
|
timestamp: int,
|
|
method: str = "GET",
|
|
path: str = "/test",
|
|
host: str = "example.com",
|
|
extra_headers: Optional[Dict[str, str]] = None
|
|
) -> Dict[str, Any]:
|
|
"""Send an HTTP (source A) event."""
|
|
event = {
|
|
"src_ip": src_ip,
|
|
"src_port": src_port,
|
|
"dst_ip": "10.0.0.1",
|
|
"dst_port": 443,
|
|
"timestamp": timestamp,
|
|
"method": method,
|
|
"path": path,
|
|
"host": host,
|
|
"http_version": "HTTP/1.1",
|
|
"header_user_agent": "TestAgent/1.0",
|
|
"header_accept": "*/*"
|
|
}
|
|
|
|
if extra_headers:
|
|
for key, value in extra_headers.items():
|
|
event[f"header_{key}"] = value
|
|
|
|
json_data = json.dumps(event)
|
|
|
|
if self.http_sock:
|
|
self.http_sock.sendall(json_data.encode())
|
|
debug(f"Sent HTTP event: {src_ip}:{src_port} ts={timestamp}", self.verbose)
|
|
|
|
return event
|
|
|
|
def send_network_event(
|
|
self,
|
|
src_ip: str,
|
|
src_port: int,
|
|
timestamp: int,
|
|
ja3: str = "abc123",
|
|
ja4: str = "def456",
|
|
tls_version: str = "TLS1.3",
|
|
tls_sni: str = "example.com"
|
|
) -> Dict[str, Any]:
|
|
"""Send a Network (source B) event."""
|
|
event = {
|
|
"src_ip": src_ip,
|
|
"src_port": src_port,
|
|
"dst_ip": "10.0.0.1",
|
|
"dst_port": 443,
|
|
"timestamp": timestamp,
|
|
"ja3": ja3,
|
|
"ja4": ja4,
|
|
"tls_version": tls_version,
|
|
"tls_sni": tls_sni
|
|
}
|
|
|
|
json_data = json.dumps(event)
|
|
|
|
if self.network_sock:
|
|
self.network_sock.sendall(json_data.encode())
|
|
debug(f"Sent Network event: {src_ip}:{src_port} ts={timestamp}", self.verbose)
|
|
|
|
return event
|
|
|
|
def get_metrics(self) -> Dict[str, Any]:
|
|
"""Fetch metrics from the metrics server."""
|
|
if self.skip_metrics:
|
|
return {}
|
|
|
|
if not HAS_REQUESTS:
|
|
warn("requests library not installed, skipping metrics")
|
|
return {}
|
|
|
|
try:
|
|
response = requests.get(self.metrics_url, timeout=5)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except Exception as e:
|
|
warn(f"Failed to fetch metrics: {e}")
|
|
return {}
|
|
|
|
def print_metrics(self, metrics: Dict[str, Any], title: str = "Metrics"):
|
|
"""Print metrics in a formatted way."""
|
|
if not metrics:
|
|
return
|
|
|
|
print(f"\n{colorize(f'=== {title} ===', Colors.BOLD)}")
|
|
|
|
keys_to_show = [
|
|
("events_received_a", "Events A"),
|
|
("events_received_b", "Events B"),
|
|
("correlations_success", "Correlations"),
|
|
("correlations_failed", "Failures"),
|
|
("failed_no_match_key", " - No match key"),
|
|
("failed_time_window", " - Time window"),
|
|
("failed_buffer_eviction", " - Buffer eviction"),
|
|
("failed_ttl_expired", " - TTL expired"),
|
|
("buffer_a_size", "Buffer A size"),
|
|
("buffer_b_size", "Buffer B size"),
|
|
("orphans_emitted_a", "Orphans A"),
|
|
("orphans_emitted_b", "Orphans B"),
|
|
("pending_orphan_match", "Pending orphan matches"),
|
|
("keepalive_resets", "Keep-Alive resets"),
|
|
]
|
|
|
|
for key, label in keys_to_show:
|
|
if key in metrics:
|
|
print(f" {label}: {metrics[key]}")
|
|
|
|
def check_sockets(self) -> bool:
|
|
"""Check if sockets exist."""
|
|
import os
|
|
|
|
errors = 0
|
|
for name, path in [("HTTP", self.http_socket), ("Network", self.network_socket)]:
|
|
if not os.path.exists(path):
|
|
error(f"{name} socket not found: {path}")
|
|
errors += 1
|
|
elif not os.path.exists(path) or not os.path.stat(path).st_mode & 0o170000 == 0o140000:
|
|
# Check if it's a socket
|
|
try:
|
|
if not socket.getaddrinfo(path, None, socket.AF_UNIX):
|
|
error(f"{name} path exists but is not a socket: {path}")
|
|
errors += 1
|
|
except:
|
|
pass
|
|
else:
|
|
debug(f"{name} socket found: {path}", self.verbose)
|
|
|
|
return errors == 0
|
|
|
|
def run_basic_test(self, count: int = 10, delay_ms: int = 100) -> Tuple[bool, Dict[str, int]]:
|
|
"""
|
|
Run basic correlation test.
|
|
|
|
Sends N pairs of A+B events with matching src_ip:src_port and timestamps.
|
|
All should correlate successfully.
|
|
"""
|
|
info(f"Running basic correlation test with {count} pairs...")
|
|
|
|
# Get initial metrics
|
|
initial_metrics = self.get_metrics()
|
|
self.print_metrics(initial_metrics, "Initial Metrics")
|
|
|
|
initial_success = initial_metrics.get("correlations_success", 0)
|
|
initial_failed = initial_metrics.get("correlations_failed", 0)
|
|
initial_a = initial_metrics.get("events_received_a", 0)
|
|
initial_b = initial_metrics.get("events_received_b", 0)
|
|
|
|
# Send test events
|
|
print(f"\nSending {count} event pairs...")
|
|
|
|
base_timestamp = time.time_ns()
|
|
sent = 0
|
|
|
|
for i in range(1, count + 1):
|
|
src_ip = f"192.168.1.{(i % 254) + 1}"
|
|
src_port = 8000 + i
|
|
|
|
# Same timestamp for perfect correlation
|
|
timestamp = base_timestamp + (i * 1_000_000)
|
|
|
|
self.send_http_event(src_ip, src_port, timestamp)
|
|
self.send_network_event(src_ip, src_port, timestamp)
|
|
|
|
sent += 1
|
|
|
|
if delay_ms > 0:
|
|
time.sleep(delay_ms / 1000.0)
|
|
|
|
success(f"Sent {sent} event pairs")
|
|
|
|
# Wait for processing
|
|
info("Waiting for processing (2 seconds)...")
|
|
time.sleep(2)
|
|
|
|
# Get final metrics
|
|
final_metrics = self.get_metrics()
|
|
self.print_metrics(final_metrics, "Final Metrics")
|
|
|
|
# Calculate deltas
|
|
delta_success = final_metrics.get("correlations_success", 0) - initial_success
|
|
delta_failed = final_metrics.get("correlations_failed", 0) - initial_failed
|
|
delta_a = final_metrics.get("events_received_a", 0) - initial_a
|
|
delta_b = final_metrics.get("events_received_b", 0) - initial_b
|
|
|
|
results = {
|
|
"sent": sent,
|
|
"received_a": delta_a,
|
|
"received_b": delta_b,
|
|
"correlations": delta_success,
|
|
"failures": delta_failed
|
|
}
|
|
|
|
# Print results
|
|
print(f"\n{colorize('=== Results ===', Colors.BOLD)}")
|
|
print(f" Events A sent: {delta_a} (expected: {sent})")
|
|
print(f" Events B sent: {delta_b} (expected: {sent})")
|
|
print(f" Correlations: {delta_success}")
|
|
print(f" Failures: {delta_failed}")
|
|
|
|
# Validation
|
|
test_passed = True
|
|
|
|
if delta_a != sent:
|
|
error(f"Event A count mismatch: got {delta_a}, expected {sent}")
|
|
test_passed = False
|
|
|
|
if delta_b != sent:
|
|
error(f"Event B count mismatch: got {delta_b}, expected {sent}")
|
|
test_passed = False
|
|
|
|
if delta_success != sent:
|
|
error(f"Correlation count mismatch: got {delta_success}, expected {sent}")
|
|
test_passed = False
|
|
|
|
if delta_failed > 0:
|
|
warn(f"Unexpected correlation failures: {delta_failed}")
|
|
|
|
if test_passed:
|
|
success("All tests passed! Correlation is working correctly.")
|
|
else:
|
|
error("Some tests failed. Check logs for details.")
|
|
|
|
return test_passed, results
|
|
|
|
def run_time_window_test(self) -> bool:
|
|
"""Test time window expiration."""
|
|
info("Running time window test...")
|
|
|
|
src_ip = "192.168.100.1"
|
|
src_port = 9999
|
|
|
|
# Send A event
|
|
ts_a = time.time_ns()
|
|
self.send_http_event(src_ip, src_port, ts_a)
|
|
info(f"Sent A event at {ts_a}")
|
|
|
|
# Wait for time window to expire (default 10s)
|
|
info("Waiting 11 seconds (time window should expire)...")
|
|
time.sleep(11)
|
|
|
|
# Send B event
|
|
ts_b = time.time_ns()
|
|
self.send_network_event(src_ip, src_port, ts_b)
|
|
info(f"Sent B event at {ts_b}")
|
|
|
|
time_diff_sec = (ts_b - ts_a) / 1_000_000_000
|
|
info(f"Time difference: {time_diff_sec:.1f} seconds")
|
|
info("Expected: time_window failure (check metrics)")
|
|
|
|
return True
|
|
|
|
def run_different_ip_test(self) -> bool:
|
|
"""Test different IP (should not correlate)."""
|
|
info("Running different IP test...")
|
|
|
|
ts = time.time_ns()
|
|
|
|
# Send A with IP 192.168.200.1
|
|
self.send_http_event("192.168.200.1", 7777, ts)
|
|
info("Sent A event from 192.168.200.1:7777")
|
|
|
|
# Send B with different IP
|
|
self.send_network_event("192.168.200.2", 7777, ts)
|
|
info("Sent B event from 192.168.200.2:7777 (different IP)")
|
|
|
|
info("Expected: no_match_key failure (different src_ip)")
|
|
|
|
return True
|
|
|
|
def run_keepalive_test(self, count: int = 5) -> bool:
|
|
"""Test Keep-Alive mode (one B correlates with multiple A)."""
|
|
info(f"Running Keep-Alive test with {count} HTTP requests on same connection...")
|
|
|
|
src_ip = "192.168.50.1"
|
|
src_port = 6000
|
|
|
|
# Send one B event first (network/TCP connection)
|
|
ts_b = time.time_ns()
|
|
self.send_network_event(src_ip, src_port, ts_b)
|
|
info(f"Sent B event (connection): {src_ip}:{src_port}")
|
|
|
|
# Send multiple A events (HTTP requests) on same connection
|
|
for i in range(count):
|
|
ts_a = time.time_ns() + (i * 100_000_000) # 100ms apart
|
|
self.send_http_event(src_ip, src_port, ts_a, path=f"/request{i}")
|
|
info(f"Sent A event (request {i}): {src_ip}:{src_port}")
|
|
time.sleep(0.05) # 50ms delay
|
|
|
|
time.sleep(2) # Wait for processing
|
|
|
|
# Check metrics
|
|
metrics = self.get_metrics()
|
|
keepalive_resets = metrics.get("keepalive_resets", 0)
|
|
|
|
info(f"Keep-Alive resets: {keepalive_resets} (expected: {count - 1})")
|
|
|
|
if keepalive_resets >= count - 1:
|
|
success("Keep-Alive test passed!")
|
|
return True
|
|
else:
|
|
warn(f"Keep-Alive resets lower than expected. This may be normal depending on timing.")
|
|
return True
|
|
|
|
def run_all_tests(self) -> bool:
|
|
"""Run all test scenarios."""
|
|
results = []
|
|
|
|
# Basic test
|
|
passed, _ = self.run_basic_test(count=10)
|
|
results.append(("Basic correlation", passed))
|
|
|
|
print("\n" + "=" * 50 + "\n")
|
|
|
|
# Time window test
|
|
self.run_time_window_test()
|
|
results.append(("Time window", True)) # Informational
|
|
|
|
print("\n" + "=" * 50 + "\n")
|
|
|
|
# Different IP test
|
|
self.run_different_ip_test()
|
|
results.append(("Different IP", True)) # Informational
|
|
|
|
print("\n" + "=" * 50 + "\n")
|
|
|
|
# Keep-Alive test
|
|
self.run_keepalive_test()
|
|
results.append(("Keep-Alive", True))
|
|
|
|
# Summary
|
|
print(f"\n{colorize('=== Test Summary ===', Colors.BOLD)}")
|
|
for name, passed in results:
|
|
status = colorize("PASS", Colors.GREEN) if passed else colorize("FAIL", Colors.RED)
|
|
print(f" {name}: {status}")
|
|
|
|
return all(r[1] for r in results)
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="Advanced correlation testing tool for logcorrelator",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""
|
|
Examples:
|
|
# Run basic test with 20 pairs
|
|
python3 test-correlation-advanced.py -c 20
|
|
|
|
# Run all tests with verbose output
|
|
python3 test-correlation-advanced.py --all -v
|
|
|
|
# Test with custom socket paths
|
|
python3 test-correlation-advanced.py -H /tmp/http.sock -N /tmp/network.sock
|
|
|
|
# Skip metrics check
|
|
python3 test-correlation-advanced.py --skip-metrics
|
|
"""
|
|
)
|
|
|
|
parser.add_argument(
|
|
"-H", "--http-socket",
|
|
default="/var/run/logcorrelator/http.socket",
|
|
help="Path to HTTP Unix socket (default: /var/run/logcorrelator/http.socket)"
|
|
)
|
|
parser.add_argument(
|
|
"-N", "--network-socket",
|
|
default="/var/run/logcorrelator/network.socket",
|
|
help="Path to Network Unix socket (default: /var/run/logcorrelator/network.socket)"
|
|
)
|
|
parser.add_argument(
|
|
"-m", "--metrics-url",
|
|
default="http://localhost:8080/metrics",
|
|
help="Metrics server URL (default: http://localhost:8080/metrics)"
|
|
)
|
|
parser.add_argument(
|
|
"-c", "--count",
|
|
type=int,
|
|
default=10,
|
|
help="Number of test pairs to send (default: 10)"
|
|
)
|
|
parser.add_argument(
|
|
"-d", "--delay",
|
|
type=int,
|
|
default=100,
|
|
help="Delay between pairs in milliseconds (default: 100)"
|
|
)
|
|
parser.add_argument(
|
|
"-v", "--verbose",
|
|
action="store_true",
|
|
help="Enable verbose output"
|
|
)
|
|
parser.add_argument(
|
|
"--skip-metrics",
|
|
action="store_true",
|
|
help="Skip metrics check"
|
|
)
|
|
parser.add_argument(
|
|
"--all",
|
|
action="store_true",
|
|
help="Run all test scenarios"
|
|
)
|
|
parser.add_argument(
|
|
"--time-window",
|
|
action="store_true",
|
|
help="Run time window test only"
|
|
)
|
|
parser.add_argument(
|
|
"--different-ip",
|
|
action="store_true",
|
|
help="Run different IP test only"
|
|
)
|
|
parser.add_argument(
|
|
"--keepalive",
|
|
action="store_true",
|
|
help="Run Keep-Alive test only"
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Create tester
|
|
tester = CorrelationTester(
|
|
http_socket=args.http_socket,
|
|
network_socket=args.network_socket,
|
|
metrics_url=args.metrics_url,
|
|
verbose=args.verbose,
|
|
skip_metrics=args.skip_metrics
|
|
)
|
|
|
|
# Check sockets
|
|
if not tester.check_sockets():
|
|
error("Socket check failed. Is logcorrelator running?")
|
|
sys.exit(1)
|
|
|
|
success("Socket check passed")
|
|
|
|
# Connect
|
|
if not tester.connect():
|
|
error("Failed to connect to sockets")
|
|
sys.exit(1)
|
|
|
|
try:
|
|
if args.all:
|
|
success = tester.run_all_tests()
|
|
elif args.time_window:
|
|
tester.run_time_window_test()
|
|
success = True
|
|
elif args.different_ip:
|
|
tester.run_different_ip_test()
|
|
success = True
|
|
elif args.keepalive:
|
|
tester.run_keepalive_test()
|
|
success = True
|
|
else:
|
|
_, _ = tester.run_basic_test(count=args.count, delay_ms=args.delay)
|
|
success = True
|
|
|
|
sys.exit(0 if success else 1)
|
|
|
|
finally:
|
|
tester.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|