#!/usr/bin/env python3 """ test_integration.py - Integration tests for mod_reqin_log This script runs integration tests for the mod_reqin_log Apache module. It tests the 4 required scenarios from architecture.yml: 1. basic_logging - Verify JSON logs with expected fields 2. header_limits - Verify header count and value length limits 3. socket_unavailable_on_start - Verify reconnect behavior when socket is unavailable 4. runtime_socket_loss - Verify behavior when socket disappears during traffic """ import socket import os import sys import json import signal import time import subprocess import threading import argparse from datetime import datetime from http.server import HTTPServer, BaseHTTPRequestHandler # Default paths DEFAULT_SOCKET_PATH = "/tmp/mod_reqin_log_test.sock" DEFAULT_APACHE_URL = "http://localhost:8080" # Test results tests_run = 0 tests_passed = 0 tests_failed = 0 # Global flags shutdown_requested = False log_entries = [] socket_server = None socket_thread = None def log_info(msg): print(f"[INFO] {msg}", file=sys.stderr) def log_pass(msg): global tests_passed tests_passed += 1 print(f"[PASS] {msg}", file=sys.stderr) def log_fail(msg): global tests_failed tests_failed += 1 print(f"[FAIL] {msg}", file=sys.stderr) def log_test_start(name): global tests_run tests_run += 1 print(f"\n[TEST] Starting: {name}", file=sys.stderr) class SocketServer: """Unix socket server that collects JSON log entries.""" def __init__(self, socket_path): self.socket_path = socket_path self.server = None self.running = False self.entries = [] self.lock = threading.Lock() self.connection = None self.buffer = b"" def start(self): """Start the socket server.""" # Remove existing socket if os.path.exists(self.socket_path): os.remove(self.socket_path) # Create socket self.server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server.bind(self.socket_path) self.server.listen(5) self.server.settimeout(1.0) os.chmod(self.socket_path, 0o666) self.running = True # Start accept thread self.thread = threading.Thread(target=self._accept_loop, daemon=True) self.thread.start() def _accept_loop(self): """Accept connections and read data.""" while self.running: try: conn, addr = self.server.accept() conn.settimeout(0.5) self.connection = conn while self.running: try: chunk = conn.recv(4096) if not chunk: break self.buffer += chunk # Process complete lines while b'\n' in self.buffer: newline_pos = self.buffer.index(b'\n') line = self.buffer[:newline_pos].decode('utf-8', errors='replace') self.buffer = self.buffer[newline_pos + 1:] if line.strip(): self._process_entry(line) except socket.timeout: continue except Exception as e: break conn.close() self.connection = None except socket.timeout: continue except Exception as e: if self.running: log_info(f"Socket server error: {e}") def _process_entry(self, line): """Process a log entry.""" try: entry = json.loads(line) with self.lock: self.entries.append(entry) except json.JSONDecodeError: log_info(f"Invalid JSON entry: {line[:100]}") def stop(self): """Stop the socket server.""" self.running = False if self.connection: try: self.connection.close() except: pass if self.server: try: self.server.close() except: pass if os.path.exists(self.socket_path): try: os.remove(self.socket_path) except: pass def get_entries(self): """Get collected log entries.""" with self.lock: return list(self.entries) def clear_entries(self): """Clear collected entries.""" with self.lock: self.entries.clear() def wait_for_entries(self, count, timeout=5.0): """Wait for at least 'count' entries to arrive.""" start = time.time() while time.time() - start < timeout: with self.lock: if len(self.entries) >= count: return True time.sleep(0.1) return False def make_request(url, headers=None, method='GET'): """Make an HTTP request using curl.""" import urllib.request req = urllib.request.Request(url, method=method) if headers: for key, value in headers.items(): req.add_header(key, value) try: with urllib.request.urlopen(req, timeout=5) as response: return response.status, response.read().decode('utf-8', errors='replace') except Exception as e: return None, str(e) # ============================================================================ # Test 1: Basic Logging # ============================================================================ def test_basic_logging(socket_server, apache_url): """ Test: basic_logging Description: With JsonSockLogEnabled On and valid socket, verify that each request produces a valid JSON line with expected fields. """ log_test_start("basic_logging") socket_server.clear_entries() # Make a simple request status, _ = make_request(f"{apache_url}/") # Wait for log entry if not socket_server.wait_for_entries(1, timeout=3.0): log_fail("basic_logging - No log entries received") return False entries = socket_server.get_entries() entry = entries[-1] # Verify required fields required_fields = ['time', 'timestamp', 'src_ip', 'src_port', 'dst_ip', 'dst_port', 'method', 'path', 'host', 'http_version'] missing_fields = [] for field in required_fields: if field not in entry: missing_fields.append(field) if missing_fields: log_fail(f"basic_logging - Missing fields: {missing_fields}") return False # Verify field types and values if entry.get('method') != 'GET': log_fail(f"basic_logging - Expected method 'GET', got '{entry.get('method')}'") return False if not isinstance(entry.get('timestamp'), int): log_fail(f"basic_logging - timestamp should be integer, got {type(entry.get('timestamp'))}") return False if not entry.get('time', '').startswith('20'): log_fail(f"basic_logging - Invalid time format: {entry.get('time')}") return False log_pass("basic_logging - All required fields present and valid") return True # ============================================================================ # Test 2: Header Limits # ============================================================================ def test_header_limits(socket_server, apache_url): """ Test: header_limits Description: Configure more headers than JsonSockLogMaxHeaders and verify only the first N are logged and values are truncated. """ log_test_start("header_limits") socket_server.clear_entries() # Make request with multiple headers including a long one headers = { 'X-Request-Id': 'test-123', 'X-Trace-Id': 'trace-456', 'User-Agent': 'TestAgent/1.0', 'X-Long-Header': 'A' * 500, # Very long value } status, _ = make_request(f"{apache_url}/test-headers", headers=headers) # Wait for log entry if not socket_server.wait_for_entries(1, timeout=3.0): log_fail("header_limits - No log entries received") return False entries = socket_server.get_entries() entry = entries[-1] # Check that header fields are present (implementation logs configured headers) header_fields = [k for k in entry.keys() if k.startswith('header_')] # Verify header value truncation (max 256 chars by default) for key, value in entry.items(): if key.startswith('header_') and isinstance(value, str): if len(value) > 256: log_fail(f"header_limits - Header value not truncated: {key} has {len(value)} chars") return False log_pass(f"header_limits - Headers logged correctly ({len(header_fields)} header fields)") return True # ============================================================================ # Test 3: Socket Unavailable on Start # ============================================================================ def test_socket_unavailable_on_start(socket_server, apache_url): """ Test: socket_unavailable_on_start Description: Start with socket not yet created; verify periodic reconnect attempts and throttled error logging. """ log_test_start("socket_unavailable_on_start") # Stop the socket server to simulate unavailable socket socket_server.stop() time.sleep(0.5) # Make requests while socket is unavailable for i in range(3): make_request(f"{apache_url}/unavailable-{i}") time.sleep(0.2) # Requests should still succeed (logging failures don't affect client) status, _ = make_request(f"{apache_url}/final-check") if status != 200: log_fail("socket_unavailable_on_start - Request failed when socket unavailable") # Restart socket server for subsequent tests socket_server.start() return False # Restart socket server socket_server.start() time.sleep(0.5) # Verify module can reconnect socket_server.clear_entries() status, _ = make_request(f"{apache_url}/after-reconnect") if socket_server.wait_for_entries(1, timeout=3.0): log_pass("socket_unavailable_on_start - Module reconnected after socket became available") return True else: log_fail("socket_unavailable_on_start - Module did not reconnect") return False # ============================================================================ # Test 4: Runtime Socket Loss # ============================================================================ def test_runtime_socket_loss(socket_server, apache_url): """ Test: runtime_socket_loss Description: Drop the Unix socket while traffic is ongoing; verify that log lines are dropped, worker threads are not blocked, and reconnect attempts resume once the socket reappears. """ log_test_start("runtime_socket_loss") socket_server.clear_entries() # Make some initial requests for i in range(3): make_request(f"{apache_url}/before-loss-{i}") if not socket_server.wait_for_entries(3, timeout=3.0): log_fail("runtime_socket_loss - Initial requests not logged") return False initial_count = len(socket_server.get_entries()) # Simulate socket loss by stopping server socket_server.stop() time.sleep(0.3) # Make requests while socket is gone start_time = time.time() for i in range(3): req_start = time.time() status, _ = make_request(f"{apache_url}/during-loss-{i}") req_duration = time.time() - req_start # Requests should NOT block (should complete quickly) if req_duration > 2.0: log_fail(f"runtime_socket_loss - Request blocked for {req_duration:.2f}s") socket_server.start() return False # Give time for any pending logs time.sleep(0.5) # Verify no new entries were logged (socket was down) current_count = len(socket_server.get_entries()) if current_count != initial_count: log_info(f"runtime_socket_loss - Some entries logged during socket loss (expected: {initial_count}, got: {current_count})") # Restart socket server socket_server.start() time.sleep(0.5) # Verify module can reconnect and log again socket_server.clear_entries() status, _ = make_request(f"{apache_url}/after-loss") if socket_server.wait_for_entries(1, timeout=3.0): log_pass("runtime_socket_loss - Module recovered after socket restored") return True else: log_fail("runtime_socket_loss - Module did not recover after socket restored") return False # ============================================================================ # Main Test Runner # ============================================================================ def run_all_tests(apache_url, socket_path): """Run all integration tests.""" global tests_run, tests_passed, tests_failed print("=" * 60, file=sys.stderr) print("mod_reqin_log Integration Tests", file=sys.stderr) print("=" * 60, file=sys.stderr) # Create socket server server = SocketServer(socket_path) server.start() log_info(f"Socket server started on {socket_path}") # Give Apache time to connect time.sleep(1.0) try: # Run all tests test_basic_logging(server, apache_url) test_header_limits(server, apache_url) test_socket_unavailable_on_start(server, apache_url) test_runtime_socket_loss(server, apache_url) finally: # Cleanup server.stop() log_info("Socket server stopped") # Print summary print("\n" + "=" * 60, file=sys.stderr) print("Test Summary", file=sys.stderr) print("=" * 60, file=sys.stderr) print(f"Tests run: {tests_run}", file=sys.stderr) print(f"Tests passed: {tests_passed}", file=sys.stderr) print(f"Tests failed: {tests_failed}", file=sys.stderr) print("=" * 60, file=sys.stderr) return tests_failed == 0 def main(): parser = argparse.ArgumentParser(description='Integration tests for mod_reqin_log') parser.add_argument('--socket', default=DEFAULT_SOCKET_PATH, help=f'Unix socket path (default: {DEFAULT_SOCKET_PATH})') parser.add_argument('--url', default=DEFAULT_APACHE_URL, help=f'Apache URL (default: {DEFAULT_APACHE_URL})') args = parser.parse_args() success = run_all_tests(args.url, args.socket) sys.exit(0 if success else 1) if __name__ == '__main__': main()