Files
mod_reqin_log/scripts/socket_consumer.py
Jacquin Antoine 66549acf5c Initial commit: mod_reqin_log Apache module
Features:
- JSON logging of HTTP requests to Unix domain socket
- Configurable HTTP headers logging (flat JSON structure)
- Header value truncation and count limits
- Automatic reconnect on socket disconnection
- Error reporting with throttling

Configuration directives:
- JsonSockLogEnabled: Enable/disable logging
- JsonSockLogSocket: Unix socket path
- JsonSockLogHeaders: List of headers to log
- JsonSockLogMaxHeaders: Maximum headers to log
- JsonSockLogMaxHeaderValueLen: Max header value length
- JsonSockLogReconnectInterval: Reconnect delay
- JsonSockLogErrorReportInterval: Error log throttle

Includes:
- Module source code (src/)
- Unit and integration tests (tests/, scripts/)
- Documentation (README.md, architecture.yml)
- Build configuration (CMakeLists.txt, Makefile)
- Packaging (deb/rpm)

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
2026-02-26 13:55:07 +01:00

186 lines
5.0 KiB
Python
Executable File

#!/usr/bin/env python3
"""
socket_consumer.py - Unix socket consumer for mod_reqin_log
This script creates a Unix domain socket server that receives JSON log lines
from the mod_reqin_log Apache module. It is primarily used for testing and
development purposes.
Usage:
python3 socket_consumer.py [socket_path]
Example:
python3 socket_consumer.py /var/run/mod_reqin_log.sock
"""
import socket
import os
import sys
import json
import signal
import argparse
from datetime import datetime
# Default socket path
DEFAULT_SOCKET_PATH = "/tmp/mod_reqin_log.sock"
# Global flag for graceful shutdown
shutdown_requested = False
def signal_handler(signum, frame):
"""Handle shutdown signals gracefully."""
global shutdown_requested
shutdown_requested = True
print("\nShutdown requested, finishing current operations...")
def parse_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(
description="Unix socket consumer for mod_reqin_log"
)
parser.add_argument(
"socket_path",
nargs="?",
default=DEFAULT_SOCKET_PATH,
help=f"Path to Unix socket (default: {DEFAULT_SOCKET_PATH})"
)
parser.add_argument(
"-q", "--quiet",
action="store_true",
help="Suppress log output"
)
parser.add_argument(
"-o", "--output",
type=str,
help="Write logs to file instead of stdout"
)
parser.add_argument(
"--validate-json",
action="store_true",
help="Validate JSON and pretty-print"
)
return parser.parse_args()
def create_socket(socket_path):
"""Create and bind Unix domain socket."""
# Remove existing socket file
if os.path.exists(socket_path):
os.remove(socket_path)
# Create socket
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(socket_path)
server.listen(5)
# Set permissions (allow Apache to connect)
os.chmod(socket_path, 0o666)
return server
def process_log_line(line, validate_json=False, output_file=None):
"""Process a single log line."""
line = line.strip()
if not line:
return
if validate_json:
try:
log_entry = json.loads(line)
line = json.dumps(log_entry, indent=2)
except json.JSONDecodeError as e:
line = f"[INVALID JSON] {line}\nError: {e}"
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
output = f"[{timestamp}] {line}"
if output_file:
output_file.write(output + "\n")
output_file.flush()
else:
print(output)
def handle_client(conn, validate_json=False, output_file=None):
"""Handle a client connection."""
data = b""
try:
while not shutdown_requested:
chunk = conn.recv(4096)
if not chunk:
break
data += chunk
# Process complete lines
while b"\n" in data:
newline_pos = data.index(b"\n")
line = data[:newline_pos].decode("utf-8", errors="replace")
data = data[newline_pos + 1:]
process_log_line(line, validate_json, output_file)
except Exception as e:
print(f"Error handling client: {e}", file=sys.stderr)
finally:
# Process any remaining data
if data:
line = data.decode("utf-8", errors="replace")
process_log_line(line, validate_json, output_file)
conn.close()
def main():
"""Main entry point."""
args = parse_args()
# Setup signal handlers
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
output_file = None
if args.output:
output_file = open(args.output, "a")
try:
# Create socket
server = create_socket(args.socket_path)
print(f"Listening on {args.socket_path}", file=sys.stderr)
if not args.quiet:
print(f"Waiting for connections... (Ctrl+C to stop)", file=sys.stderr)
# Accept connections
while not shutdown_requested:
try:
server.settimeout(1.0)
try:
conn, addr = server.accept()
except socket.timeout:
continue
# Handle client in same thread for simplicity
handle_client(conn, args.validate_json, output_file)
except Exception as e:
if not shutdown_requested:
print(f"Accept error: {e}", file=sys.stderr)
except Exception as e:
print(f"Fatal error: {e}", file=sys.stderr)
return 1
finally:
# Cleanup
if os.path.exists(args.socket_path):
os.remove(args.socket_path)
if output_file:
output_file.close()
print("Socket consumer stopped.", file=sys.stderr)
return 0
if __name__ == "__main__":
sys.exit(main())