Security fixes: #1 Buffer overflow: Validate socket path length against sun_path limit - Add MAX_SOCKET_PATH_LEN constant - Reject paths >= 108 bytes before snprintf #2,#3 NULL pointer dereference: Add NULL checks - r->connection->local_ip: use conditional append - r->protocol: fallback to "UNKNOWN" if NULL #4 Sensitive headers blacklist: Prevent credential leakage - Add DEFAULT_SENSITIVE_HEADERS[] blacklist - Block: Authorization, Cookie, Set-Cookie, X-Api-Key, etc. - Log skipped headers at DEBUG level only #5 Memory exhaustion DoS: Add MAX_JSON_SIZE limit (64KB) - Check buffer size before adding headers - Truncate header list if limit reached #6 Socket permissions: Change 0o666 → 0o660 - Owner and group only (not world-writable) - Apache user must be in socket's group #7 Race condition: Add mutex for FD access in worker/event MPMs - apr_thread_mutex_t protects socket_fd - FD_MUTEX_LOCK/UNLOCK macros - Created in reqin_log_create_server_conf() #8 Timestamp overflow: Document 2262 limitation - Add comment explaining apr_time_t limits - Safe until ~2262 (uint64 nanoseconds) #9 Error logging verbosity: Reduce information disclosure - APLOG_ERR: Generic messages only - APLOG_DEBUG: Detailed error information #10 Socket path security: Move from /tmp to /var/run - Update socket_consumer.py, test scripts - Use environment variable MOD_REQIN_LOG_SOCKET - More secure default location Files modified: - src/mod_reqin_log.c: All security fixes - scripts/socket_consumer.py: Permissions, path - scripts/run_integration_tests.sh: Path security - scripts/test_unix_socket.sh: Path security - tests/integration/test_integration.py: Path security Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
458 lines
15 KiB
Python
458 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
test_integration.py - Integration tests for mod_reqin_log
|
|
|
|
This script runs integration tests for the mod_reqin_log Apache module.
|
|
It tests the 4 required scenarios from architecture.yml:
|
|
1. basic_logging - Verify JSON logs with expected fields
|
|
2. header_limits - Verify header count and value length limits
|
|
3. socket_unavailable_on_start - Verify reconnect behavior when socket is unavailable
|
|
4. runtime_socket_loss - Verify behavior when socket disappears during traffic
|
|
"""
|
|
|
|
import socket
|
|
import os
|
|
import sys
|
|
import json
|
|
import signal
|
|
import time
|
|
import subprocess
|
|
import threading
|
|
import argparse
|
|
from datetime import datetime
|
|
from http.server import HTTPServer, BaseHTTPRequestHandler
|
|
|
|
# Default paths
# Use /var/run for production (more secure than /tmp)
# The MOD_REQIN_LOG_SOCKET environment variable, when set, takes precedence
# over the built-in socket path.
DEFAULT_SOCKET_PATH = os.environ.get("MOD_REQIN_LOG_SOCKET", "/var/run/mod_reqin_log_test.sock")
# Base URL that the test requests are sent to.
DEFAULT_APACHE_URL = "http://localhost:8080"

# Test results
# Counters incremented by log_test_start(), log_pass() and log_fail().
tests_run = 0
tests_passed = 0
tests_failed = 0

# Global flags
# NOTE(review): these four names do not appear to be read or written by any
# function in this file (the test functions use a parameter that merely
# shares the name `socket_server`) - confirm before removing them.
shutdown_requested = False
log_entries = []
socket_server = None
socket_thread = None
|
|
|
|
|
|
def log_info(msg):
    """Write *msg* to stderr with an ``[INFO]`` prefix."""
    line = f"[INFO] {msg}"
    print(line, file=sys.stderr)
|
|
|
|
|
|
def log_pass(msg):
    """Count one passed check and report it on stderr.

    Increments the module-level ``tests_passed`` counter, then prints
    *msg* with a ``[PASS]`` prefix.
    """
    global tests_passed
    tests_passed = tests_passed + 1
    line = f"[PASS] {msg}"
    print(line, file=sys.stderr)
|
|
|
|
|
|
def log_fail(msg):
    """Count one failed check and report it on stderr.

    Increments the module-level ``tests_failed`` counter, then prints
    *msg* with a ``[FAIL]`` prefix.
    """
    global tests_failed
    tests_failed = tests_failed + 1
    line = f"[FAIL] {msg}"
    print(line, file=sys.stderr)
|
|
|
|
|
|
def log_test_start(name):
    """Count one started test and announce it on stderr.

    Increments the module-level ``tests_run`` counter, then prints a
    ``[TEST] Starting:`` line (preceded by a blank line) naming the test.
    """
    global tests_run
    tests_run = tests_run + 1
    banner = f"\n[TEST] Starting: {name}"
    print(banner, file=sys.stderr)
|
|
|
|
|
|
class SocketServer:
    """Unix socket server that collects JSON log entries.

    The server listens on a Unix stream socket and, on a background
    thread, serves one client connection at a time. Incoming data is
    treated as newline-delimited JSON; every line that decodes
    successfully is stored and can be inspected through
    :meth:`get_entries` / :meth:`wait_for_entries`.
    """

    def __init__(self, socket_path):
        """Remember *socket_path*; nothing is bound until :meth:`start`."""
        self.socket_path = socket_path
        self.server = None
        self.running = False
        self.entries = []
        self.lock = threading.Lock()  # guards self.entries
        self.connection = None
        self.buffer = b""
        self.thread = None

    def start(self):
        """Bind the socket, start listening and launch the accept thread.

        Raises:
            OSError: if a stale socket file cannot be removed, or the
                socket cannot be bound, listened on or chmod-ed.
        """
        # Remove a socket file left behind by a previous run.
        try:
            os.remove(self.socket_path)
        except FileNotFoundError:
            pass

        server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server.bind(self.socket_path)
            server.listen(5)
            # Short timeout so the accept loop can notice stop() promptly.
            server.settimeout(1.0)
            # NOTE(review): world-accessible mode, presumably so the Apache
            # worker user can connect whatever account runs the tests;
            # consider 0o660 if Apache shares a group with the test runner.
            os.chmod(self.socket_path, 0o666)
        except OSError:
            # Do not leak the descriptor when setup fails half-way.
            server.close()
            raise

        self.server = server
        self.buffer = b""
        self.running = True

        # Start accept thread
        self.thread = threading.Thread(target=self._accept_loop, daemon=True)
        self.thread.start()

    def _accept_loop(self):
        """Accept connections and read data until the server is stopped."""
        while self.running:
            try:
                conn, _addr = self.server.accept()
                self._serve_connection(conn)
            except socket.timeout:
                continue
            except Exception as e:
                # Errors raised while shutting down are expected (the
                # sockets are closed underneath us) and are not reported.
                if self.running:
                    log_info(f"Socket server error: {e}")

    def _serve_connection(self, conn):
        """Read from *conn* until it closes, fails, or the server stops."""
        conn.settimeout(0.5)
        self.connection = conn
        # A partial line left by a previous client must not be glued onto
        # the first line of this one.
        self.buffer = b""
        try:
            while self.running:
                try:
                    chunk = conn.recv(4096)
                except socket.timeout:
                    continue
                except OSError:
                    # Connection reset, or closed by stop().
                    break
                if not chunk:
                    break  # peer closed the connection
                self.buffer += chunk
                self._drain_buffer()
        finally:
            conn.close()
            self.connection = None

    def _drain_buffer(self):
        """Process every complete line currently held in the buffer.

        Bytes after the last newline stay buffered until the rest of the
        line arrives. Blank lines are ignored.
        """
        *complete, self.buffer = self.buffer.split(b"\n")
        for raw in complete:
            line = raw.decode('utf-8', errors='replace')
            if line.strip():
                self._process_entry(line)

    def _process_entry(self, line):
        """Decode one JSON line and store it; report undecodable lines."""
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            log_info(f"Invalid JSON entry: {line[:100]}")
            return
        with self.lock:
            self.entries.append(entry)

    def stop(self):
        """Stop the socket server and remove the socket file.

        Closes the active connection and the listening socket, deletes
        the socket file, then waits (bounded) for the accept thread to
        exit so that a later :meth:`start` never runs alongside a stale
        thread. Safe to call on a server that was never started.
        """
        self.running = False

        # Snapshot the references: the accept thread may reset
        # self.connection concurrently.
        for sock in (self.connection, self.server):
            if sock is None:
                continue
            try:
                sock.close()
            except OSError:
                pass

        try:
            os.remove(self.socket_path)
        except OSError:
            # Already gone or not removable; stopping is best-effort.
            pass

        worker = self.thread
        if worker is not None and worker is not threading.current_thread():
            # The socket timeouts are 1.0s (accept) and 0.5s (recv), so the
            # loop observes running == False well within this bound.
            worker.join(timeout=2.0)
        self.thread = None

    def get_entries(self):
        """Return a copy of the collected log entries."""
        with self.lock:
            return list(self.entries)

    def clear_entries(self):
        """Discard all collected entries."""
        with self.lock:
            self.entries.clear()

    def wait_for_entries(self, count, timeout=5.0):
        """Wait until at least *count* entries have been collected.

        Args:
            count: Minimum number of stored entries required.
            timeout: Maximum time to wait, in seconds.

        Returns:
            True as soon as enough entries are present, False if the
            timeout elapses first. The entries are always checked at
            least once, even when *timeout* is zero.
        """
        # monotonic() is immune to wall-clock adjustments during the wait.
        deadline = time.monotonic() + timeout
        while True:
            with self.lock:
                if len(self.entries) >= count:
                    return True
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False
            time.sleep(min(0.1, remaining))
|
|
|
|
|
|
def make_request(url, headers=None, method='GET'):
    """Make an HTTP request with urllib and return ``(status, body)``.

    Args:
        url: Absolute URL to request.
        headers: Optional mapping of header names to values.
        method: HTTP method name.

    Returns:
        ``(status, body)`` where *status* is the integer HTTP status code
        and *body* the response text decoded as UTF-8 (undecodable bytes
        replaced). Error responses such as 404 or 500 are reported the
        same way. When no HTTP response was obtained at all (connection
        refused, timeout, ...), returns ``(None, message)`` with the text
        of the exception.
    """
    import urllib.error
    import urllib.request

    req = urllib.request.Request(url, headers=headers or {}, method=method)

    try:
        with urllib.request.urlopen(req, timeout=5) as response:
            return response.status, response.read().decode('utf-8', errors='replace')
    except urllib.error.HTTPError as e:
        # The server did answer, just not with a success status: keep the
        # real status code instead of collapsing it into None.
        try:
            body = e.read().decode('utf-8', errors='replace')
        except Exception:
            body = str(e)
        return e.code, body
    except Exception as e:
        return None, str(e)
|
|
|
|
|
|
# ============================================================================
|
|
# Test 1: Basic Logging
|
|
# ============================================================================
|
|
def test_basic_logging(socket_server, apache_url):
    """
    Test: basic_logging
    Description: With JsonSockLogEnabled On and valid socket, verify that each request
                 produces a valid JSON line with expected fields.

    Sends one GET request to the server root, waits up to three seconds
    for a log entry and validates the most recent one. Malformed entries
    are reported through log_fail() rather than raising, so one bad log
    line cannot abort the remaining tests.

    Args:
        socket_server: Collector exposing clear_entries(),
            wait_for_entries() and get_entries().
        apache_url: Base URL of the server under test.

    Returns:
        True if the entry passes every check, False otherwise.
    """
    log_test_start("basic_logging")

    socket_server.clear_entries()

    # Only the resulting log line matters here, not the HTTP response.
    make_request(f"{apache_url}/")

    # Wait for log entry
    if not socket_server.wait_for_entries(1, timeout=3.0):
        log_fail("basic_logging - No log entries received")
        return False

    entry = socket_server.get_entries()[-1]

    # Any valid JSON value can arrive on the socket; the field checks below
    # only make sense for a JSON object.
    if not isinstance(entry, dict):
        log_fail(f"basic_logging - Log entry is not a JSON object: {entry!r}")
        return False

    # Verify required fields
    required_fields = ['time', 'timestamp', 'src_ip', 'src_port', 'dst_ip',
                       'dst_port', 'method', 'path', 'host', 'http_version']
    missing_fields = [field for field in required_fields if field not in entry]
    if missing_fields:
        log_fail(f"basic_logging - Missing fields: {missing_fields}")
        return False

    # Verify field types and values
    if entry.get('method') != 'GET':
        log_fail(f"basic_logging - Expected method 'GET', got '{entry.get('method')}'")
        return False

    timestamp = entry.get('timestamp')
    # bool is a subclass of int in Python; a JSON true/false is not a timestamp.
    if isinstance(timestamp, bool) or not isinstance(timestamp, int):
        log_fail(f"basic_logging - timestamp should be integer, got {type(timestamp)}")
        return False

    time_value = entry.get('time')
    # Guard the type first: calling startswith() on a non-string would raise.
    if not isinstance(time_value, str) or not time_value.startswith('20'):
        log_fail(f"basic_logging - Invalid time format: {time_value}")
        return False

    log_pass("basic_logging - All required fields present and valid")
    return True
|
|
|
|
|
|
# ============================================================================
|
|
# Test 2: Header Limits
|
|
# ============================================================================
|
|
def test_header_limits(socket_server, apache_url):
    """
    Test: header_limits
    Description: Configure more headers than JsonSockLogMaxHeaders and verify only
                 the first N are logged and values are truncated.

    What this function checks: it sends one request carrying four custom
    headers (one with a 500-character value) and fails if any string
    field of the newest log entry whose name starts with ``header_`` is
    longer than 256 characters. The number of header fields is reported
    in the pass message but is not itself asserted.

    Returns:
        True if no logged header value exceeds 256 characters, False if
        one does or if no log entry arrives within three seconds.
    """
    log_test_start("header_limits")
    socket_server.clear_entries()

    # Several headers, one of them far longer than the truncation limit.
    request_headers = {
        'X-Request-Id': 'test-123',
        'X-Trace-Id': 'trace-456',
        'User-Agent': 'TestAgent/1.0',
        'X-Long-Header': 'A' * 500,
    }
    make_request(f"{apache_url}/test-headers", headers=request_headers)

    entry_arrived = socket_server.wait_for_entries(1, timeout=3.0)
    if not entry_arrived:
        log_fail("header_limits - No log entries received")
        return False

    entry = socket_server.get_entries()[-1]

    # The implementation logs configured headers as "header_*" fields.
    header_items = [(name, value) for name, value in entry.items()
                    if name.startswith('header_')]

    # Header values are expected to be truncated to at most 256 characters.
    for name, value in header_items:
        if isinstance(value, str) and len(value) > 256:
            log_fail(f"header_limits - Header value not truncated: {name} has {len(value)} chars")
            return False

    log_pass(f"header_limits - Headers logged correctly ({len(header_items)} header fields)")
    return True
|
|
|
|
|
|
# ============================================================================
|
|
# Test 3: Socket Unavailable on Start
|
|
# ============================================================================
|
|
def test_socket_unavailable_on_start(socket_server, apache_url):
    """
    Test: socket_unavailable_on_start
    Description: Start with socket not yet created; verify periodic reconnect attempts
                 and throttled error logging.

    What this function checks: the collector is stopped, a few requests
    are sent while it is down, and a final request must still return
    HTTP 200. The collector is then started again and one further
    request must produce a log entry within three seconds.

    Returns:
        True if requests succeed without the socket and logging resumes
        afterwards, False otherwise. The collector is running again when
        the function returns.
    """
    log_test_start("socket_unavailable_on_start")

    # Take the collector away to simulate an unavailable socket.
    socket_server.stop()
    time.sleep(0.5)

    # Traffic while nothing is listening.
    for attempt in range(3):
        make_request(f"{apache_url}/unavailable-{attempt}")
        time.sleep(0.2)

    # Logging failures must not affect the client-visible response.
    final_status, _ = make_request(f"{apache_url}/final-check")
    request_ok = final_status == 200
    if not request_ok:
        log_fail("socket_unavailable_on_start - Request failed when socket unavailable")

    # The collector is brought back in both outcomes so that subsequent
    # tests have something to talk to.
    socket_server.start()
    if not request_ok:
        return False
    time.sleep(0.5)

    # A fresh request must now be logged again.
    socket_server.clear_entries()
    make_request(f"{apache_url}/after-reconnect")

    if not socket_server.wait_for_entries(1, timeout=3.0):
        log_fail("socket_unavailable_on_start - Module did not reconnect")
        return False

    log_pass("socket_unavailable_on_start - Module reconnected after socket became available")
    return True
|
|
|
|
|
|
# ============================================================================
|
|
# Test 4: Runtime Socket Loss
|
|
# ============================================================================
|
|
def test_runtime_socket_loss(socket_server, apache_url):
    """
    Test: runtime_socket_loss
    Description: Drop the Unix socket while traffic is ongoing; verify that log lines
                 are dropped, worker threads are not blocked, and reconnect attempts
                 resume once the socket reappears.

    What this function checks: three requests are logged while the
    collector is up; after the collector is stopped, each of three
    further requests must complete in at most two seconds; after the
    collector is started again, one more request must be logged within
    three seconds. Entries that appear during the outage are only
    reported as information, not treated as a failure.

    Returns:
        True if all of the above hold, False otherwise.
    """
    log_test_start("runtime_socket_loss")
    socket_server.clear_entries()

    # Baseline traffic with the socket available.
    for index in range(3):
        make_request(f"{apache_url}/before-loss-{index}")

    baseline_logged = socket_server.wait_for_entries(3, timeout=3.0)
    if not baseline_logged:
        log_fail("runtime_socket_loss - Initial requests not logged")
        return False

    initial_count = len(socket_server.get_entries())

    # Simulate socket loss by stopping the collector.
    socket_server.stop()
    time.sleep(0.3)

    # Requests during the outage must not block: each one is timed.
    for index in range(3):
        began = time.time()
        make_request(f"{apache_url}/during-loss-{index}")
        elapsed = time.time() - began

        if elapsed > 2.0:
            log_fail(f"runtime_socket_loss - Request blocked for {elapsed:.2f}s")
            socket_server.start()
            return False

    # Give time for any pending logs
    time.sleep(0.5)

    # Nothing should have been collected while the socket was down.
    current_count = len(socket_server.get_entries())
    if current_count != initial_count:
        log_info(f"runtime_socket_loss - Some entries logged during socket loss (expected: {initial_count}, got: {current_count})")

    # Bring the collector back.
    socket_server.start()
    time.sleep(0.5)

    # Logging must resume for new traffic.
    socket_server.clear_entries()
    make_request(f"{apache_url}/after-loss")

    recovered = socket_server.wait_for_entries(1, timeout=3.0)
    if not recovered:
        log_fail("runtime_socket_loss - Module did not recover after socket restored")
        return False

    log_pass("runtime_socket_loss - Module recovered after socket restored")
    return True
|
|
|
|
|
|
# ============================================================================
|
|
# Main Test Runner
|
|
# ============================================================================
|
|
def run_all_tests(apache_url, socket_path):
    """Run all integration tests and print a summary to stderr.

    Starts a SocketServer on *socket_path*, waits one second, runs the
    four scenarios in order against *apache_url*, and always stops the
    server afterwards.

    Returns:
        True if the failure counter is zero after the run.
    """
    rule = "=" * 60

    print(rule, file=sys.stderr)
    print("mod_reqin_log Integration Tests", file=sys.stderr)
    print(rule, file=sys.stderr)

    # Create socket server
    server = SocketServer(socket_path)
    server.start()
    log_info(f"Socket server started on {socket_path}")

    # Give Apache time to connect
    time.sleep(1.0)

    scenarios = (
        test_basic_logging,
        test_header_limits,
        test_socket_unavailable_on_start,
        test_runtime_socket_loss,
    )
    try:
        for scenario in scenarios:
            scenario(server, apache_url)
    finally:
        # Cleanup happens even if a scenario raises.
        server.stop()
        log_info("Socket server stopped")

    # Print summary
    print("\n" + rule, file=sys.stderr)
    print("Test Summary", file=sys.stderr)
    print(rule, file=sys.stderr)
    print(f"Tests run: {tests_run}", file=sys.stderr)
    print(f"Tests passed: {tests_passed}", file=sys.stderr)
    print(f"Tests failed: {tests_failed}", file=sys.stderr)
    print(rule, file=sys.stderr)

    return tests_failed == 0
|
|
|
|
|
|
def main():
    """Parse the command line, run the test suite and exit.

    The process exit status is 0 when run_all_tests() reports success
    and 1 otherwise.
    """
    parser = argparse.ArgumentParser(description='Integration tests for mod_reqin_log')
    parser.add_argument(
        '--socket',
        default=DEFAULT_SOCKET_PATH,
        help=f'Unix socket path (default: {DEFAULT_SOCKET_PATH})',
    )
    parser.add_argument(
        '--url',
        default=DEFAULT_APACHE_URL,
        help=f'Apache URL (default: {DEFAULT_APACHE_URL})',
    )
    options = parser.parse_args()

    all_passed = run_all_tests(options.url, options.socket)
    exit_code = 0 if all_passed else 1
    sys.exit(exit_code)


# Run the suite only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
|