// Package output provides writers for ja4sentinel log records package output import ( "encoding/json" "fmt" "io" "net" "os" "path/filepath" "sync" "time" "ja4sentinel/api" ) // Socket configuration constants const ( // DefaultDialTimeout is the default timeout for socket connections DefaultDialTimeout = 5 * time.Second // DefaultWriteTimeout is the default timeout for socket writes DefaultWriteTimeout = 5 * time.Second // DefaultMaxReconnectAttempts is the maximum number of reconnection attempts DefaultMaxReconnectAttempts = 3 // DefaultReconnectBackoff is the initial backoff duration for reconnection DefaultReconnectBackoff = 100 * time.Millisecond // DefaultMaxReconnectBackoff is the maximum backoff duration DefaultMaxReconnectBackoff = 2 * time.Second // DefaultQueueSize is the size of the write queue for async writes DefaultQueueSize = 1000 // DefaultMaxFileSize is the default maximum file size in bytes before rotation (100MB) DefaultMaxFileSize = 100 * 1024 * 1024 // DefaultMaxBackups is the default number of backup files to keep DefaultMaxBackups = 3 ) // StdoutWriter writes log records to stdout type StdoutWriter struct { encoder *json.Encoder mutex sync.Mutex } // NewStdoutWriter creates a new stdout writer func NewStdoutWriter() *StdoutWriter { return &StdoutWriter{ encoder: json.NewEncoder(os.Stdout), } } // Write writes a log record to stdout func (w *StdoutWriter) Write(rec api.LogRecord) error { w.mutex.Lock() defer w.mutex.Unlock() return w.encoder.Encode(rec) } // Close closes the writer (no-op for stdout) func (w *StdoutWriter) Close() error { return nil } // FileWriter writes log records to a file with rotation support type FileWriter struct { file *os.File encoder *json.Encoder mutex sync.Mutex path string maxSize int64 maxBackups int currentSize int64 errorCallback ErrorCallback failuresMu sync.Mutex failures int } // FileWriterOption is a function type for configuring FileWriter type FileWriterOption func(*FileWriter) // WithFileErrorCallback sets an error callback for file write errors func WithFileErrorCallback(cb ErrorCallback) FileWriterOption { return func(w *FileWriter) { w.errorCallback = cb } } // NewFileWriter creates a new file writer with rotation func NewFileWriter(path string) (*FileWriter, error) { return NewFileWriterWithConfig(path, DefaultMaxFileSize, DefaultMaxBackups) } // NewFileWriterWithConfig creates a new file writer with custom rotation config func NewFileWriterWithConfig(path string, maxSize int64, maxBackups int, opts ...FileWriterOption) (*FileWriter, error) { // Create directory if it doesn't exist dir := filepath.Dir(path) if err := os.MkdirAll(dir, 0755); err != nil { return nil, fmt.Errorf("failed to create directory %s: %w", dir, err) } // Open file with secure permissions (owner read/write only) file, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600) if err != nil { return nil, fmt.Errorf("failed to open file %s: %w", path, err) } // Get current file size info, err := file.Stat() if err != nil { file.Close() return nil, fmt.Errorf("failed to stat file: %w", err) } w := &FileWriter{ file: file, encoder: json.NewEncoder(file), path: path, maxSize: maxSize, maxBackups: maxBackups, currentSize: info.Size(), } // Apply options (for error callback) for _, opt := range opts { opt(w) } return w, nil } // rotate rotates the log file if it exceeds the max size func (w *FileWriter) rotate() error { if err := w.file.Close(); err != nil { return fmt.Errorf("failed to close file: %w", err) } // Rotate existing backups for i := w.maxBackups; i > 1; i-- { oldPath := fmt.Sprintf("%s.%d", w.path, i-1) newPath := fmt.Sprintf("%s.%d", w.path, i) os.Rename(oldPath, newPath) // Ignore errors - file may not exist } // Move current file to .1 backupPath := fmt.Sprintf("%s.1", w.path) if err := os.Rename(w.path, backupPath); err != nil { // If rename fails, just truncate if err := os.Truncate(w.path, 0); err != nil { return fmt.Errorf("failed to truncate file: %w", err) } } // Open new file newFile, err := os.OpenFile(w.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600) if err != nil { return fmt.Errorf("failed to open new file: %w", err) } w.file = newFile w.encoder = json.NewEncoder(newFile) w.currentSize = 0 return nil } // Write writes a log record to the file func (w *FileWriter) Write(rec api.LogRecord) error { w.mutex.Lock() defer w.mutex.Unlock() // Check if rotation is needed if w.currentSize >= w.maxSize { if err := w.rotate(); err != nil { w.reportError(fmt.Errorf("failed to rotate file: %w", err)) return fmt.Errorf("failed to rotate file: %w", err) } } // Encode to buffer first to get size data, err := json.Marshal(rec) if err != nil { return fmt.Errorf("failed to marshal record: %w", err) } data = append(data, '\n') // Write to file n, err := w.file.Write(data) if err != nil { w.reportError(fmt.Errorf("failed to write to file: %w", err)) return fmt.Errorf("failed to write to file: %w", err) } w.currentSize += int64(n) return nil } // reportError reports a file write error via the configured callback func (w *FileWriter) reportError(err error) { if w.errorCallback != nil { w.failuresMu.Lock() w.failures++ failures := w.failures w.failuresMu.Unlock() w.errorCallback(w.path, err, failures) } } // Close closes the file func (w *FileWriter) Close() error { w.mutex.Lock() defer w.mutex.Unlock() if w.file != nil { return w.file.Close() } return nil } // Reopen reopens the log file (for logrotate support) func (w *FileWriter) Reopen() error { w.mutex.Lock() defer w.mutex.Unlock() if err := w.file.Close(); err != nil { return fmt.Errorf("failed to close file during reopen: %w", err) } // Open new file newFile, err := os.OpenFile(w.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600) if err != nil { return fmt.Errorf("failed to reopen file %s: %w", w.path, err) } w.file = newFile w.encoder = json.NewEncoder(newFile) w.currentSize = 0 return nil } // ErrorCallback is a function type for reporting socket connection errors type ErrorCallback func(socketPath string, err error, attempt int) // UnixSocketWriter writes log records to a UNIX socket with reconnection logic // No internal logging - only LogRecord JSON data is sent to the socket type UnixSocketWriter struct { socketPath string conn net.Conn mutex sync.Mutex dialTimeout time.Duration writeTimeout time.Duration maxReconnects int reconnectBackoff time.Duration maxBackoff time.Duration queue chan []byte queueClose chan struct{} queueDone chan struct{} closeOnce sync.Once isClosed bool pendingWrites [][]byte pendingMu sync.Mutex errorCallback ErrorCallback consecutiveFailures int failuresMu sync.Mutex } // NewUnixSocketWriter creates a new UNIX socket writer with reconnection logic func NewUnixSocketWriter(socketPath string) (*UnixSocketWriter, error) { return NewUnixSocketWriterWithConfig(socketPath, DefaultDialTimeout, DefaultWriteTimeout, DefaultQueueSize) } // UnixSocketWriterOption is a function type for configuring UnixSocketWriter type UnixSocketWriterOption func(*UnixSocketWriter) // WithErrorCallback sets an error callback for socket connection errors func WithErrorCallback(cb ErrorCallback) UnixSocketWriterOption { return func(w *UnixSocketWriter) { w.errorCallback = cb } } // NewUnixSocketWriterWithConfig creates a new UNIX socket writer with custom configuration func NewUnixSocketWriterWithConfig(socketPath string, dialTimeout, writeTimeout time.Duration, queueSize int, opts ...UnixSocketWriterOption) (*UnixSocketWriter, error) { w := &UnixSocketWriter{ socketPath: socketPath, dialTimeout: dialTimeout, writeTimeout: writeTimeout, maxReconnects: DefaultMaxReconnectAttempts, reconnectBackoff: DefaultReconnectBackoff, maxBackoff: DefaultMaxReconnectBackoff, queue: make(chan []byte, queueSize), queueClose: make(chan struct{}), queueDone: make(chan struct{}), pendingWrites: make([][]byte, 0), } // Apply options for _, opt := range opts { opt(w) } // Start the queue processor go w.processQueue() // Try initial connection silently (socket may not exist yet - that's okay) conn, err := net.DialTimeout("unix", socketPath, w.dialTimeout) if err == nil { w.conn = conn } return w, nil } // processQueue handles queued writes with reconnection logic func (w *UnixSocketWriter) processQueue() { defer close(w.queueDone) backoff := w.reconnectBackoff for { select { case data, ok := <-w.queue: if !ok { // Channel closed, drain remaining data w.flushPendingData() return } if err := w.writeWithReconnect(data); err != nil { w.failuresMu.Lock() w.consecutiveFailures++ failures := w.consecutiveFailures w.failuresMu.Unlock() // Report error via callback if configured w.reportError(err, failures) // Queue for retry w.pendingMu.Lock() if len(w.pendingWrites) < DefaultQueueSize { w.pendingWrites = append(w.pendingWrites, data) } w.pendingMu.Unlock() // Exponential backoff if failures > w.maxReconnects { time.Sleep(backoff) backoff *= 2 if backoff > w.maxBackoff { backoff = w.maxBackoff } } } else { w.failuresMu.Lock() w.consecutiveFailures = 0 w.failuresMu.Unlock() backoff = w.reconnectBackoff // Try to flush pending data w.flushPendingData() } case <-w.queueClose: w.flushPendingData() return } } } // reportError reports a socket connection error via the configured callback func (w *UnixSocketWriter) reportError(err error, attempt int) { if w.errorCallback != nil { w.errorCallback(w.socketPath, err, attempt) } } // flushPendingData attempts to write any pending data func (w *UnixSocketWriter) flushPendingData() { w.pendingMu.Lock() pending := w.pendingWrites w.pendingWrites = make([][]byte, 0) w.pendingMu.Unlock() for _, data := range pending { if err := w.writeWithReconnect(data); err != nil { // Put it back for next flush attempt w.pendingMu.Lock() if len(w.pendingWrites) < DefaultQueueSize { w.pendingWrites = append(w.pendingWrites, data) } w.pendingMu.Unlock() break } } } // writeWithReconnect attempts to write data with reconnection logic func (w *UnixSocketWriter) writeWithReconnect(data []byte) error { w.mutex.Lock() defer w.mutex.Unlock() ensureConn := func() error { if w.conn != nil { return nil } conn, err := net.DialTimeout("unix", w.socketPath, w.dialTimeout) if err != nil { return fmt.Errorf("failed to connect to socket %s: %w", w.socketPath, err) } w.conn = conn return nil } if err := ensureConn(); err != nil { return err } if err := w.conn.SetWriteDeadline(time.Now().Add(w.writeTimeout)); err != nil { return fmt.Errorf("failed to set write deadline: %w", err) } if _, err := w.conn.Write(data); err == nil { return nil } // Connection failed, try to reconnect _ = w.conn.Close() w.conn = nil if err := ensureConn(); err != nil { return fmt.Errorf("failed to reconnect: %w", err) } if err := w.conn.SetWriteDeadline(time.Now().Add(w.writeTimeout)); err != nil { _ = w.conn.Close() w.conn = nil return fmt.Errorf("failed to set write deadline after reconnect: %w", err) } if _, err := w.conn.Write(data); err != nil { _ = w.conn.Close() w.conn = nil return fmt.Errorf("failed to write after reconnect: %w", err) } return nil } // Write writes a log record to the UNIX socket (non-blocking with queue) func (w *UnixSocketWriter) Write(rec api.LogRecord) error { w.mutex.Lock() if w.isClosed { w.mutex.Unlock() return fmt.Errorf("writer is closed") } w.mutex.Unlock() data, err := json.Marshal(rec) if err != nil { return fmt.Errorf("failed to marshal record: %w", err) } data = append(data, '\n') select { case w.queue <- data: return nil default: // Queue is full, drop the message (could also block or return error) return fmt.Errorf("write queue is full, dropping message") } } // Close closes the UNIX socket connection and stops the queue processor func (w *UnixSocketWriter) Close() error { w.closeOnce.Do(func() { close(w.queueClose) <-w.queueDone close(w.queue) w.mutex.Lock() defer w.mutex.Unlock() w.isClosed = true if w.conn != nil { w.conn.Close() w.conn = nil } }) return nil } // MultiWriter combines multiple writers type MultiWriter struct { writers []api.Writer mutex sync.Mutex } // NewMultiWriter creates a new multi-writer func NewMultiWriter() *MultiWriter { return &MultiWriter{ writers: make([]api.Writer, 0), } } // Write writes a log record to all writers func (mw *MultiWriter) Write(rec api.LogRecord) error { mw.mutex.Lock() defer mw.mutex.Unlock() var lastErr error for _, w := range mw.writers { if err := w.Write(rec); err != nil { lastErr = err } } return lastErr } // Add adds a writer to the multi-writer func (mw *MultiWriter) Add(writer api.Writer) { mw.mutex.Lock() defer mw.mutex.Unlock() mw.writers = append(mw.writers, writer) } // CloseAll closes all writers func (mw *MultiWriter) CloseAll() error { mw.mutex.Lock() defer mw.mutex.Unlock() var lastErr error for _, w := range mw.writers { if closer, ok := w.(io.Closer); ok { if err := closer.Close(); err != nil { lastErr = err } } } return lastErr } // Reopen reopens all writers that support log rotation func (mw *MultiWriter) Reopen() error { mw.mutex.Lock() defer mw.mutex.Unlock() var lastErr error for _, w := range mw.writers { if reopenable, ok := w.(api.Reopenable); ok { if err := reopenable.Reopen(); err != nil { lastErr = err } } } return lastErr } // BuilderImpl implements the api.Builder interface type BuilderImpl struct { errorCallback ErrorCallback } // NewBuilder creates a new output builder func NewBuilder() *BuilderImpl { return &BuilderImpl{} } // WithErrorCallback sets an error callback for all unix_socket and file writers created by this builder func (b *BuilderImpl) WithErrorCallback(cb ErrorCallback) *BuilderImpl { b.errorCallback = cb return b } // NewFromConfig constructs writers from AppConfig // Uses AsyncBuffer from OutputConfig if specified, otherwise uses DefaultQueueSize func (b *BuilderImpl) NewFromConfig(cfg api.AppConfig) (api.Writer, error) { multiWriter := NewMultiWriter() for _, outputCfg := range cfg.Outputs { if !outputCfg.Enabled { continue } var writer api.Writer var err error // Determine queue size: use AsyncBuffer if specified, otherwise default queueSize := DefaultQueueSize if outputCfg.AsyncBuffer > 0 { queueSize = outputCfg.AsyncBuffer } switch outputCfg.Type { case "stdout": writer = NewStdoutWriter() case "file": path := outputCfg.Params["path"] if path == "" { return nil, fmt.Errorf("file output requires 'path' parameter") } // Build options list for file writer var fileOpts []FileWriterOption if b.errorCallback != nil { fileOpts = append(fileOpts, WithFileErrorCallback(b.errorCallback)) } writer, err = NewFileWriterWithConfig(path, DefaultMaxFileSize, DefaultMaxBackups, fileOpts...) if err != nil { return nil, err } case "unix_socket": socketPath := outputCfg.Params["socket_path"] if socketPath == "" { return nil, fmt.Errorf("unix_socket output requires 'socket_path' parameter") } // Build options list var opts []UnixSocketWriterOption if b.errorCallback != nil { opts = append(opts, WithErrorCallback(b.errorCallback)) } writer, err = NewUnixSocketWriterWithConfig(socketPath, DefaultDialTimeout, DefaultWriteTimeout, queueSize, opts...) if err != nil { return nil, err } default: return nil, fmt.Errorf("unknown output type: %s", outputCfg.Type) } multiWriter.Add(writer) } // If no outputs configured, default to stdout if len(multiWriter.writers) == 0 { multiWriter.Add(NewStdoutWriter()) } return multiWriter, nil }