#!/usr/bin/env python3 """ socket_consumer.py - Unix socket consumer for mod_reqin_log This script creates a Unix domain socket server that receives JSON log lines from the mod_reqin_log Apache module. It is primarily used for testing and development purposes. Usage: python3 socket_consumer.py [socket_path] Example: python3 socket_consumer.py /var/run/mod_reqin_log.sock """ import socket import os import sys import json import signal import argparse from datetime import datetime # Default socket path DEFAULT_SOCKET_PATH = "/tmp/mod_reqin_log.sock" # Global flag for graceful shutdown shutdown_requested = False def signal_handler(signum, frame): """Handle shutdown signals gracefully.""" global shutdown_requested shutdown_requested = True print("\nShutdown requested, finishing current operations...") def parse_args(): """Parse command line arguments.""" parser = argparse.ArgumentParser( description="Unix socket consumer for mod_reqin_log" ) parser.add_argument( "socket_path", nargs="?", default=DEFAULT_SOCKET_PATH, help=f"Path to Unix socket (default: {DEFAULT_SOCKET_PATH})" ) parser.add_argument( "-q", "--quiet", action="store_true", help="Suppress log output" ) parser.add_argument( "-o", "--output", type=str, help="Write logs to file instead of stdout" ) parser.add_argument( "--validate-json", action="store_true", help="Validate JSON and pretty-print" ) return parser.parse_args() def create_socket(socket_path): """Create and bind Unix domain socket.""" # Remove existing socket file if os.path.exists(socket_path): os.remove(socket_path) # Create socket server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server.bind(socket_path) server.listen(5) # Set permissions (allow Apache to connect) os.chmod(socket_path, 0o666) return server def process_log_line(line, validate_json=False, output_file=None): """Process a single log line.""" line = line.strip() if not line: return if validate_json: try: log_entry = json.loads(line) line = json.dumps(log_entry, indent=2) except json.JSONDecodeError as e: line = f"[INVALID JSON] {line}\nError: {e}" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") output = f"[{timestamp}] {line}" if output_file: output_file.write(output + "\n") output_file.flush() else: print(output) def handle_client(conn, validate_json=False, output_file=None): """Handle a client connection.""" data = b"" try: while not shutdown_requested: chunk = conn.recv(4096) if not chunk: break data += chunk # Process complete lines while b"\n" in data: newline_pos = data.index(b"\n") line = data[:newline_pos].decode("utf-8", errors="replace") data = data[newline_pos + 1:] process_log_line(line, validate_json, output_file) except Exception as e: print(f"Error handling client: {e}", file=sys.stderr) finally: # Process any remaining data if data: line = data.decode("utf-8", errors="replace") process_log_line(line, validate_json, output_file) conn.close() def main(): """Main entry point.""" args = parse_args() # Setup signal handlers signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) output_file = None if args.output: output_file = open(args.output, "a") try: # Create socket server = create_socket(args.socket_path) print(f"Listening on {args.socket_path}", file=sys.stderr) if not args.quiet: print(f"Waiting for connections... (Ctrl+C to stop)", file=sys.stderr) # Accept connections while not shutdown_requested: try: server.settimeout(1.0) try: conn, addr = server.accept() except socket.timeout: continue # Handle client in same thread for simplicity handle_client(conn, args.validate_json, output_file) except Exception as e: if not shutdown_requested: print(f"Accept error: {e}", file=sys.stderr) except Exception as e: print(f"Fatal error: {e}", file=sys.stderr) return 1 finally: # Cleanup if os.path.exists(args.socket_path): os.remove(args.socket_path) if output_file: output_file.close() print("Socket consumer stopped.", file=sys.stderr) return 0 if __name__ == "__main__": sys.exit(main())