Files
mod_reqin_log/tests/integration/test_integration.py
Jacquin Antoine e44059865b Security: fix critical vulnerabilities and harden module
Security fixes:
#1 Buffer overflow: Validate socket path length against sun_path limit
    - Add MAX_SOCKET_PATH_LEN constant
    - Reject paths >= 108 bytes before snprintf

#2,#3 NULL pointer dereference: Add NULL checks
    - r->connection->local_ip: use conditional append
    - r->protocol: fallback to "UNKNOWN" if NULL

#4 Sensitive headers blacklist: Prevent credential leakage
    - Add DEFAULT_SENSITIVE_HEADERS[] blacklist
    - Block: Authorization, Cookie, Set-Cookie, X-Api-Key, etc.
    - Log skipped headers at DEBUG level only

#5 Memory exhaustion DoS: Add MAX_JSON_SIZE limit (64KB)
    - Check buffer size before adding headers
    - Truncate header list if limit reached

#6 Socket permissions: Change 0o666 → 0o660
    - Owner and group only (not world-writable)
    - Apache user must be in socket's group

#7 Race condition: Add mutex for FD access in worker/event MPMs
    - apr_thread_mutex_t protects socket_fd
    - FD_MUTEX_LOCK/UNLOCK macros
    - Created in reqin_log_create_server_conf()

#8 Timestamp overflow: Document 2262 limitation
    - Add comment explaining apr_time_t limits
    - Safe until ~2262 (uint64 nanoseconds)

#9 Error logging verbosity: Reduce information disclosure
    - APLOG_ERR: Generic messages only
    - APLOG_DEBUG: Detailed error information

#10 Socket path security: Move from /tmp to /var/run
    - Update socket_consumer.py, test scripts
    - Use environment variable MOD_REQIN_LOG_SOCKET
    - More secure default location

Files modified:
- src/mod_reqin_log.c: All security fixes
- scripts/socket_consumer.py: Permissions, path
- scripts/run_integration_tests.sh: Path security
- scripts/test_unix_socket.sh: Path security
- tests/integration/test_integration.py: Path security

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
2026-02-26 23:37:30 +01:00

458 lines
15 KiB
Python

#!/usr/bin/env python3
"""
test_integration.py - Integration tests for mod_reqin_log
This script runs integration tests for the mod_reqin_log Apache module.
It tests the 4 required scenarios from architecture.yml:
1. basic_logging - Verify JSON logs with expected fields
2. header_limits - Verify header count and value length limits
3. socket_unavailable_on_start - Verify reconnect behavior when socket is unavailable
4. runtime_socket_loss - Verify behavior when socket disappears during traffic
"""
import socket
import os
import sys
import json
import signal
import time
import subprocess
import threading
import argparse
from datetime import datetime
from http.server import HTTPServer, BaseHTTPRequestHandler
# Default paths
# Use /var/run for production (more secure than /tmp)
# Socket path is overridable via MOD_REQIN_LOG_SOCKET (security fix #10:
# default moved out of world-writable /tmp).
DEFAULT_SOCKET_PATH = os.environ.get("MOD_REQIN_LOG_SOCKET", "/var/run/mod_reqin_log_test.sock")
# Base URL of the Apache instance under test.
DEFAULT_APACHE_URL = "http://localhost:8080"
# Test results
# Global pass/fail counters, updated by the log_* helpers below.
tests_run = 0
tests_passed = 0
tests_failed = 0
# Global flags
# NOTE(review): shutdown_requested, log_entries, socket_server and
# socket_thread are never referenced elsewhere in this file — they look like
# leftovers from an earlier design; confirm before removing.
shutdown_requested = False
log_entries = []
socket_server = None
socket_thread = None
def log_info(msg):
    """Write an informational message to stderr."""
    sys.stderr.write(f"[INFO] {msg}\n")
def log_pass(msg):
    """Bump the global pass counter and report the result on stderr."""
    global tests_passed
    tests_passed += 1
    sys.stderr.write(f"[PASS] {msg}\n")
def log_fail(msg):
    """Bump the global fail counter and report the result on stderr."""
    global tests_failed
    tests_failed += 1
    sys.stderr.write(f"[FAIL] {msg}\n")
def log_test_start(name):
    """Announce the start of a test and bump the global run counter."""
    global tests_run
    tests_run += 1
    sys.stderr.write(f"\n[TEST] Starting: {name}\n")
class SocketServer:
    """Unix socket server that collects JSON log entries.

    Plays the role of the mod_reqin_log consumer: it accepts one connection
    at a time, splits the incoming byte stream on newlines, parses each line
    as JSON and stores the resulting dicts under a lock so tests can poll
    them with wait_for_entries()/get_entries().
    """

    def __init__(self, socket_path):
        self.socket_path = socket_path      # filesystem path of the Unix socket
        self.server = None                  # listening socket
        self.running = False                # loop-control flag read by the accept thread
        self.entries = []                   # parsed JSON entries (guarded by self.lock)
        self.lock = threading.Lock()
        self.connection = None              # currently accepted client connection, if any
        self.buffer = b""                   # unparsed bytes from the current connection

    def start(self):
        """Create, bind and listen on the Unix socket; spawn the accept thread."""
        # Remove a stale socket file left over from a previous run.
        if os.path.exists(self.socket_path):
            os.remove(self.socket_path)
        # Create socket
        self.server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server.bind(self.socket_path)
        self.server.listen(5)
        # Short accept timeout so the loop can notice self.running changes.
        self.server.settimeout(1.0)
        # NOTE(review): 0o666 is world-writable; security fix #6 tightened the
        # production consumer to 0o660.  Kept permissive here so the test
        # Apache can connect regardless of group membership — confirm this is
        # intentional for the test harness.
        os.chmod(self.socket_path, 0o666)
        self.running = True
        # Start accept thread (daemon so it never blocks interpreter exit).
        self.thread = threading.Thread(target=self._accept_loop, daemon=True)
        self.thread.start()

    def _accept_loop(self):
        """Accept connections serially and feed received bytes to the parser."""
        while self.running:
            try:
                conn, addr = self.server.accept()
                conn.settimeout(0.5)
                self.connection = conn
                # FIX: discard any partial line left over from a previous
                # connection; otherwise stale bytes would be prepended to the
                # first line of this connection and corrupt it.
                self.buffer = b""
                while self.running:
                    try:
                        chunk = conn.recv(4096)
                        if not chunk:
                            break  # peer closed the connection
                        self.buffer += chunk
                        # Process every complete newline-terminated line.
                        while b'\n' in self.buffer:
                            newline_pos = self.buffer.index(b'\n')
                            line = self.buffer[:newline_pos].decode('utf-8', errors='replace')
                            self.buffer = self.buffer[newline_pos + 1:]
                            if line.strip():
                                self._process_entry(line)
                    except socket.timeout:
                        continue
                    except Exception as e:
                        break
                conn.close()
                self.connection = None
            except socket.timeout:
                continue  # no pending connection; re-check self.running
            except Exception as e:
                if self.running:
                    log_info(f"Socket server error: {e}")

    def _process_entry(self, line):
        """Parse one JSON line and append the resulting entry."""
        try:
            entry = json.loads(line)
            with self.lock:
                self.entries.append(entry)
        except json.JSONDecodeError:
            log_info(f"Invalid JSON entry: {line[:100]}")

    def stop(self):
        """Stop the server: close sockets and remove the socket file (best effort)."""
        self.running = False
        if self.connection:
            try:
                self.connection.close()
            except:
                pass
        if self.server:
            try:
                self.server.close()
            except:
                pass
        if os.path.exists(self.socket_path):
            try:
                os.remove(self.socket_path)
            except:
                pass

    def get_entries(self):
        """Return a snapshot copy of the collected log entries."""
        with self.lock:
            return list(self.entries)

    def clear_entries(self):
        """Discard all collected entries."""
        with self.lock:
            self.entries.clear()

    def wait_for_entries(self, count, timeout=5.0):
        """Poll until at least `count` entries arrived; False on timeout."""
        start = time.time()
        while time.time() - start < timeout:
            with self.lock:
                if len(self.entries) >= count:
                    return True
            time.sleep(0.1)
        return False
def make_request(url, headers=None, method='GET'):
    """Issue an HTTP request via urllib and return (status, body).

    Any failure (connection refused, timeout, HTTP error, decode problem)
    is swallowed and reported as (None, <error string>) so callers never
    have to handle exceptions.
    """
    import urllib.request
    request = urllib.request.Request(url, method=method)
    for name, value in (headers or {}).items():
        request.add_header(name, value)
    try:
        with urllib.request.urlopen(request, timeout=5) as response:
            body = response.read().decode('utf-8', errors='replace')
            return response.status, body
    except Exception as exc:
        return None, str(exc)
# ============================================================================
# Test 1: Basic Logging
# ============================================================================
def test_basic_logging(socket_server, apache_url):
    """
    Test: basic_logging
    With JsonSockLogEnabled On and a reachable socket, every request must
    produce one valid JSON line carrying the expected set of fields.
    """
    log_test_start("basic_logging")
    socket_server.clear_entries()
    # Fire a single plain GET at the document root.
    status, _ = make_request(f"{apache_url}/")
    if not socket_server.wait_for_entries(1, timeout=3.0):
        log_fail("basic_logging - No log entries received")
        return False
    entry = socket_server.get_entries()[-1]
    # All of these keys must be present in the logged JSON object.
    required_fields = ['time', 'timestamp', 'src_ip', 'src_port', 'dst_ip',
                       'dst_port', 'method', 'path', 'host', 'http_version']
    missing_fields = [field for field in required_fields if field not in entry]
    if missing_fields:
        log_fail(f"basic_logging - Missing fields: {missing_fields}")
        return False
    # Spot-check field values and types.
    if entry.get('method') != 'GET':
        log_fail(f"basic_logging - Expected method 'GET', got '{entry.get('method')}'")
        return False
    if not isinstance(entry.get('timestamp'), int):
        log_fail(f"basic_logging - timestamp should be integer, got {type(entry.get('timestamp'))}")
        return False
    if not entry.get('time', '').startswith('20'):
        log_fail(f"basic_logging - Invalid time format: {entry.get('time')}")
        return False
    log_pass("basic_logging - All required fields present and valid")
    return True
# ============================================================================
# Test 2: Header Limits
# ============================================================================
def test_header_limits(socket_server, apache_url):
    """
    Test: header_limits
    Send more headers than JsonSockLogMaxHeaders allows, one with an
    oversized value, and verify that logged values are truncated.
    """
    log_test_start("header_limits")
    socket_server.clear_entries()
    # One of these values is 500 chars — well past the truncation limit.
    request_headers = {
        'X-Request-Id': 'test-123',
        'X-Trace-Id': 'trace-456',
        'User-Agent': 'TestAgent/1.0',
        'X-Long-Header': 'A' * 500,
    }
    status, _ = make_request(f"{apache_url}/test-headers", headers=request_headers)
    if not socket_server.wait_for_entries(1, timeout=3.0):
        log_fail("header_limits - No log entries received")
        return False
    entry = socket_server.get_entries()[-1]
    header_fields = [key for key in entry.keys() if key.startswith('header_')]
    # No logged header value may exceed the 256-char default truncation limit.
    for key, value in entry.items():
        if key.startswith('header_') and isinstance(value, str) and len(value) > 256:
            log_fail(f"header_limits - Header value not truncated: {key} has {len(value)} chars")
            return False
    log_pass(f"header_limits - Headers logged correctly ({len(header_fields)} header fields)")
    return True
# ============================================================================
# Test 3: Socket Unavailable on Start
# ============================================================================
def test_socket_unavailable_on_start(socket_server, apache_url):
    """
    Test: socket_unavailable_on_start
    With the consumer socket absent, requests must still succeed; once the
    socket appears, the module must reconnect and resume logging.
    """
    log_test_start("socket_unavailable_on_start")
    # Tear the consumer down to simulate a socket that never came up.
    socket_server.stop()
    time.sleep(0.5)
    # Traffic during the outage — logging failures must be invisible to clients.
    for i in range(3):
        make_request(f"{apache_url}/unavailable-{i}")
        time.sleep(0.2)
    status, _ = make_request(f"{apache_url}/final-check")
    if status != 200:
        log_fail("socket_unavailable_on_start - Request failed when socket unavailable")
        # Restore the consumer so later tests still have a socket.
        socket_server.start()
        return False
    # Bring the consumer back and check the module reconnects.
    socket_server.start()
    time.sleep(0.5)
    socket_server.clear_entries()
    status, _ = make_request(f"{apache_url}/after-reconnect")
    if not socket_server.wait_for_entries(1, timeout=3.0):
        log_fail("socket_unavailable_on_start - Module did not reconnect")
        return False
    log_pass("socket_unavailable_on_start - Module reconnected after socket became available")
    return True
# ============================================================================
# Test 4: Runtime Socket Loss
# ============================================================================
def test_runtime_socket_loss(socket_server, apache_url):
    """
    Test: runtime_socket_loss
    Description: Drop the Unix socket while traffic is ongoing; verify that log lines
                 are dropped, worker threads are not blocked, and reconnect attempts
                 resume once the socket reappears.
    """
    log_test_start("runtime_socket_loss")
    socket_server.clear_entries()
    # Establish a baseline of successfully logged requests.
    for i in range(3):
        make_request(f"{apache_url}/before-loss-{i}")
    if not socket_server.wait_for_entries(3, timeout=3.0):
        log_fail("runtime_socket_loss - Initial requests not logged")
        return False
    initial_count = len(socket_server.get_entries())
    # Simulate socket loss by stopping the consumer mid-traffic.
    socket_server.stop()
    time.sleep(0.3)
    # Traffic during the outage: each request must complete quickly,
    # proving the module drops log lines instead of blocking workers.
    # (Removed unused start_time/status locals from the original.)
    for i in range(3):
        req_start = time.time()
        make_request(f"{apache_url}/during-loss-{i}")
        req_duration = time.time() - req_start
        if req_duration > 2.0:
            log_fail(f"runtime_socket_loss - Request blocked for {req_duration:.2f}s")
            socket_server.start()  # restore the socket before bailing out
            return False
    # Give any queued log lines a chance to arrive (there should be none).
    time.sleep(0.5)
    current_count = len(socket_server.get_entries())
    if current_count != initial_count:
        # Informational only — not a failure condition.
        log_info(f"runtime_socket_loss - Some entries logged during socket loss (expected: {initial_count}, got: {current_count})")
    # Restore the socket and verify the module reconnects and logs again.
    socket_server.start()
    time.sleep(0.5)
    socket_server.clear_entries()
    status, _ = make_request(f"{apache_url}/after-loss")
    if socket_server.wait_for_entries(1, timeout=3.0):
        log_pass("runtime_socket_loss - Module recovered after socket restored")
        return True
    log_fail("runtime_socket_loss - Module did not recover after socket restored")
    return False
# ============================================================================
# Main Test Runner
# ============================================================================
def run_all_tests(apache_url, socket_path):
    """Run every integration scenario and print a summary.

    Returns True when all tests passed (tests_failed == 0).
    """
    banner = "=" * 60
    print(banner, file=sys.stderr)
    print("mod_reqin_log Integration Tests", file=sys.stderr)
    print(banner, file=sys.stderr)
    # Stand up the log-collecting socket server the module will write to.
    server = SocketServer(socket_path)
    server.start()
    log_info(f"Socket server started on {socket_path}")
    # Give Apache a moment to establish its connection.
    time.sleep(1.0)
    try:
        for scenario in (test_basic_logging,
                         test_header_limits,
                         test_socket_unavailable_on_start,
                         test_runtime_socket_loss):
            scenario(server, apache_url)
    finally:
        server.stop()
        log_info("Socket server stopped")
    # Summary from the global counters updated by the log_* helpers.
    print("\n" + banner, file=sys.stderr)
    print("Test Summary", file=sys.stderr)
    print(banner, file=sys.stderr)
    print(f"Tests run: {tests_run}", file=sys.stderr)
    print(f"Tests passed: {tests_passed}", file=sys.stderr)
    print(f"Tests failed: {tests_failed}", file=sys.stderr)
    print(banner, file=sys.stderr)
    return tests_failed == 0
def main():
    """Parse command-line options and exit 0 on all-pass, 1 otherwise."""
    parser = argparse.ArgumentParser(description='Integration tests for mod_reqin_log')
    parser.add_argument('--socket', default=DEFAULT_SOCKET_PATH,
                        help=f'Unix socket path (default: {DEFAULT_SOCKET_PATH})')
    parser.add_argument('--url', default=DEFAULT_APACHE_URL,
                        help=f'Apache URL (default: {DEFAULT_APACHE_URL})')
    args = parser.parse_args()
    sys.exit(0 if run_all_tests(args.url, args.socket) else 1)
if __name__ == '__main__':
    # Script entry point: run the full suite when executed directly.
    main()