Files
ja4sentinel/internal/output/writers.go
toto e166fdab2e
Some checks failed
Build RPM Package / Build RPM Packages (CentOS 7, Rocky 8/9/10) (push) Has been cancelled
feature: 1.1.18
+- FEATURE: Add comprehensive metrics for capture and TLS parser monitoring
+- Capture metrics: packets_received, packets_sent, packets_dropped (atomic counters)
+- Parser metrics: retransmit_count, gap_detected_count, buffer_exceeded_count, segment_exceeded_count
+- New GetStats() method on Capture interface for capture statistics
+- New GetMetrics() method on Parser interface for parser statistics
+- Add DefaultMaxHelloSegments constant (100) to prevent memory leaks from fragmented handshakes
+- Add Segments field to ConnectionFlow for per-flow segment tracking
+- Increase DefaultMaxTrackedFlows from 50000 to 100000 for high-traffic scenarios
+- Improve TCP reassembly: better handling of retransmissions and sequence gaps
+- Memory leak prevention: limit segments per flow and buffer size
+- Aggressive flow cleanup: clean up JA4_DONE flows when approaching flow limit
+- Lock ordering fix: release flow.mu before acquiring p.mu to avoid deadlocks
+- Exclude IPv6 link-local addresses (fe80::) from local IP detection
+- Improve error logging with detailed connection and TLS extension information
+- Add capture diagnostics logging (interface, link_type, local_ips, bpf_filter)
+- Fix false positive retransmission counter when SYN packet is missed
+- Fix gap handling: reset sequence tracking instead of dropping flow
+- Fix extractTLSExtensions: return error details with basic TLS info for debugging
2026-03-09 16:38:40 +01:00

645 lines
16 KiB
Go

// Package output provides writers for ja4sentinel log records
package output
import (
"encoding/json"
"fmt"
"io"
"net"
"os"
"path/filepath"
"sync"
"time"
"ja4sentinel/api"
)
// Default tuning values shared by the writers in this package.
const (
	// DefaultDialTimeout bounds how long a socket connection attempt may take.
	DefaultDialTimeout = 5 * time.Second
	// DefaultWriteTimeout bounds how long a single socket write may take.
	DefaultWriteTimeout = 5 * time.Second
	// DefaultMaxReconnectAttempts is the number of consecutive failures
	// tolerated before the queue processor starts backing off.
	DefaultMaxReconnectAttempts = 3
	// DefaultReconnectBackoff is the first (shortest) backoff delay.
	DefaultReconnectBackoff = time.Second / 10
	// DefaultMaxReconnectBackoff is the ceiling for the backoff delay.
	DefaultMaxReconnectBackoff = 2 * time.Second
	// DefaultQueueSize is the capacity of the asynchronous write queue.
	DefaultQueueSize = 1000
	// DefaultMaxFileSize is the file size, in bytes, at which a log file is
	// rotated (100 MiB).
	DefaultMaxFileSize = 100 << 20
	// DefaultMaxBackups is the number of rotated files that are kept.
	DefaultMaxBackups = 3
)
// StdoutWriter emits each log record as one JSON document on standard output.
type StdoutWriter struct {
	encoder *json.Encoder
	mutex   sync.Mutex
}

// NewStdoutWriter returns a StdoutWriter whose encoder targets os.Stdout.
func NewStdoutWriter() *StdoutWriter {
	sw := new(StdoutWriter)
	sw.encoder = json.NewEncoder(os.Stdout)
	return sw
}

// Write JSON-encodes rec to stdout. Calls are serialized by the writer's
// mutex so concurrent records do not interleave.
func (w *StdoutWriter) Write(rec api.LogRecord) error {
	w.mutex.Lock()
	defer w.mutex.Unlock()

	if err := w.encoder.Encode(rec); err != nil {
		return err
	}
	return nil
}

// Close is a no-op: stdout is not owned by the writer. It always returns nil.
func (w *StdoutWriter) Close() error {
	return nil
}
// FileWriter writes log records to a file with rotation support
type FileWriter struct {
	// file is the currently open log file; it is replaced by rotate and Reopen.
	file *os.File
	// encoder is a JSON encoder bound to file and is re-created whenever file
	// is replaced. NOTE(review): Write in this file marshals records directly
	// and does not go through encoder.
	encoder *json.Encoder
	// mutex is held by Write, Close and Reopen for their whole duration.
	mutex sync.Mutex
	// path is the location of the live log file; backups are path.1, path.2, ...
	path string
	// maxSize is the size in bytes at which Write triggers a rotation.
	maxSize int64
	// maxBackups is the highest backup suffix that rotate shifts files into.
	maxBackups int
	// currentSize tracks the bytes in the live file: seeded from Stat at
	// construction, increased by Write, and reset to 0 by rotate and Reopen.
	currentSize int64
	// errorCallback, when non-nil, is invoked by reportError.
	errorCallback ErrorCallback
	// failuresMu guards failures.
	failuresMu sync.Mutex
	// failures counts the errors passed to errorCallback; it is only
	// incremented when a callback is configured.
	failures int
}
// FileWriterOption mutates a FileWriter while it is being constructed.
type FileWriterOption func(*FileWriter)

// WithFileErrorCallback returns an option that installs cb as the callback
// the FileWriter invokes when it reports an error.
func WithFileErrorCallback(cb ErrorCallback) FileWriterOption {
	install := func(fw *FileWriter) {
		fw.errorCallback = cb
	}
	return install
}
// NewFileWriter opens (or creates) the log file at path using the package
// defaults for rotation size and backup count.
func NewFileWriter(path string) (*FileWriter, error) {
	fw, err := NewFileWriterWithConfig(path, DefaultMaxFileSize, DefaultMaxBackups)
	return fw, err
}
// NewFileWriterWithConfig opens (or creates) the log file at path and returns
// a FileWriter that rotates at maxSize bytes and shifts backups up to
// maxBackups. The parent directory is created with mode 0755 if missing; the
// file itself is opened in append mode with mode 0600. Options are applied
// after the writer's fields have been populated.
func NewFileWriterWithConfig(path string, maxSize int64, maxBackups int, opts ...FileWriterOption) (*FileWriter, error) {
	parent := filepath.Dir(path)
	if mkErr := os.MkdirAll(parent, 0755); mkErr != nil {
		return nil, fmt.Errorf("failed to create directory %s: %w", parent, mkErr)
	}

	// Owner read/write only.
	const openFlags = os.O_APPEND | os.O_CREATE | os.O_WRONLY
	f, openErr := os.OpenFile(path, openFlags, 0600)
	if openErr != nil {
		return nil, fmt.Errorf("failed to open file %s: %w", path, openErr)
	}

	// The existing size seeds the rotation accounting.
	st, statErr := f.Stat()
	if statErr != nil {
		f.Close()
		return nil, fmt.Errorf("failed to stat file: %w", statErr)
	}

	fw := new(FileWriter)
	fw.file = f
	fw.encoder = json.NewEncoder(f)
	fw.path = path
	fw.maxSize = maxSize
	fw.maxBackups = maxBackups
	fw.currentSize = st.Size()

	for i := range opts {
		opts[i](fw)
	}
	return fw, nil
}
// rotate archives the live log file and opens a fresh one at w.path.
//
// Existing backups are shifted up by one suffix (path.1 -> path.2, ...) up to
// w.maxBackups, the live file becomes path.1, and a new empty file is opened.
// If the live file cannot be renamed it is truncated in place instead.
//
// The function is safe to retry after a failure: the closed handle is cleared
// so a later call does not fail on a double close, and when the live file no
// longer exists (for example because an earlier attempt already renamed it
// but could not reopen) the archive step is skipped so backups are not
// shifted a second time.
//
// The caller must hold w.mutex.
func (w *FileWriter) rotate() error {
	if w.file != nil {
		closeErr := w.file.Close()
		// The handle is unusable after Close regardless of the result.
		w.file = nil
		if closeErr != nil {
			return fmt.Errorf("failed to close file: %w", closeErr)
		}
	}

	// Only archive when there is (or may be) a live file to archive.
	if _, statErr := os.Stat(w.path); !os.IsNotExist(statErr) {
		// Rotate existing backups, oldest first.
		for i := w.maxBackups; i > 1; i-- {
			oldPath := fmt.Sprintf("%s.%d", w.path, i-1)
			newPath := fmt.Sprintf("%s.%d", w.path, i)
			_ = os.Rename(oldPath, newPath) // best effort: the backup may not exist
		}

		// Move current file to .1; if that fails, fall back to truncating.
		backupPath := fmt.Sprintf("%s.1", w.path)
		if err := os.Rename(w.path, backupPath); err != nil {
			if err := os.Truncate(w.path, 0); err != nil {
				return fmt.Errorf("failed to truncate file: %w", err)
			}
		}
	}

	newFile, err := os.OpenFile(w.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
	if err != nil {
		return fmt.Errorf("failed to open new file: %w", err)
	}
	w.file = newFile
	w.encoder = json.NewEncoder(newFile)
	w.currentSize = 0
	return nil
}
// Write appends rec to the log file as a single JSON line.
//
// The record is marshalled before the mutex is taken so that encoding does
// not extend the critical section. If the tracked size has reached maxSize
// the file is rotated first. Rotation and write failures are passed to
// reportError and also returned; marshal failures are only returned.
//
// Bytes that reached the file are always added to currentSize, including the
// bytes of a partial write that ended in an error, so the rotation threshold
// keeps matching the real file size.
func (w *FileWriter) Write(rec api.LogRecord) error {
	data, err := json.Marshal(rec)
	if err != nil {
		return fmt.Errorf("failed to marshal record: %w", err)
	}
	data = append(data, '\n')

	w.mutex.Lock()
	defer w.mutex.Unlock()

	// Check if rotation is needed.
	if w.currentSize >= w.maxSize {
		if err := w.rotate(); err != nil {
			wrapped := fmt.Errorf("failed to rotate file: %w", err)
			w.reportError(wrapped)
			return wrapped
		}
	}

	n, err := w.file.Write(data)
	w.currentSize += int64(n) // count partial writes too
	if err != nil {
		wrapped := fmt.Errorf("failed to write to file: %w", err)
		w.reportError(wrapped)
		return wrapped
	}
	return nil
}
// reportError forwards err to the configured callback together with the
// file path and the running count of reported errors. It does nothing, and
// does not count the error, when no callback is set.
func (w *FileWriter) reportError(err error) {
	if w.errorCallback == nil {
		return
	}

	w.failuresMu.Lock()
	w.failures++
	count := w.failures
	w.failuresMu.Unlock()

	// Invoke the callback outside failuresMu.
	w.errorCallback(w.path, err, count)
}
// Close closes the underlying file, if there is one, and returns the result
// of that close. It returns nil when no file is set.
func (w *FileWriter) Close() error {
	w.mutex.Lock()
	defer w.mutex.Unlock()

	if w.file == nil {
		return nil
	}
	return w.file.Close()
}
// Reopen switches the writer to a freshly opened handle on w.path, which is
// what an external log rotation (logrotate) needs after it has moved the old
// file away.
//
// The new file is opened before the old handle is released, so a failure to
// open leaves the writer on its previous, still usable handle. Because the
// file is opened in append mode and may already contain data, currentSize is
// taken from Stat rather than assumed to be zero.
//
// An error closing the old handle is returned, but by then the new handle is
// already installed and the writer remains usable.
func (w *FileWriter) Reopen() error {
	w.mutex.Lock()
	defer w.mutex.Unlock()

	newFile, err := os.OpenFile(w.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
	if err != nil {
		return fmt.Errorf("failed to reopen file %s: %w", w.path, err)
	}
	info, err := newFile.Stat()
	if err != nil {
		_ = newFile.Close() // the stat error is the one worth reporting
		return fmt.Errorf("failed to stat reopened file %s: %w", w.path, err)
	}

	old := w.file
	w.file = newFile
	w.encoder = json.NewEncoder(newFile)
	w.currentSize = info.Size()

	if old != nil {
		if err := old.Close(); err != nil {
			return fmt.Errorf("failed to close file during reopen: %w", err)
		}
	}
	return nil
}
// ErrorCallback is a function type for reporting writer errors.
// It is used by both UnixSocketWriter and FileWriter in this file:
// socketPath receives the socket path or, for FileWriter, the log file path;
// err is the error being reported; attempt is the writer's running failure
// count at the time of the report.
type ErrorCallback func(socketPath string, err error, attempt int)
// UnixSocketWriter writes log records to a UNIX socket with reconnection logic
// No internal logging - only LogRecord JSON data is sent to the socket
type UnixSocketWriter struct {
	// socketPath is the filesystem path that is dialed.
	socketPath string
	// conn is the current connection, or nil when not connected.
	conn net.Conn
	// mutex guards conn and isClosed; Write also holds it while enqueueing.
	mutex sync.Mutex
	// dialTimeout and writeTimeout bound each dial and each write.
	dialTimeout  time.Duration
	writeTimeout time.Duration
	// maxReconnects is the consecutive-failure count above which
	// processQueue starts sleeping between attempts.
	maxReconnects int
	// reconnectBackoff is the initial sleep; maxBackoff is its upper bound.
	reconnectBackoff time.Duration
	maxBackoff       time.Duration
	// queue carries serialized records from Write to processQueue.
	queue chan []byte
	// queueClose is closed by Close to tell processQueue to stop.
	queueClose chan struct{}
	// queueDone is closed by processQueue when it returns.
	queueDone chan struct{}
	// closeOnce makes the shutdown sequence in Close run a single time.
	closeOnce sync.Once
	// isClosed is set by Close; Write rejects records once it is true.
	isClosed bool
	// pendingWrites holds records whose write failed and that are kept for
	// a later retry; guarded by pendingMu.
	pendingWrites [][]byte
	pendingMu     sync.Mutex
	// errorCallback, when non-nil, is invoked by reportError.
	errorCallback ErrorCallback
	// consecutiveFailures counts failed writes since the last success;
	// guarded by failuresMu.
	consecutiveFailures int
	failuresMu          sync.Mutex
	// NOTE(review): networkType is not assigned anywhere in the code visible
	// here; the dial calls pass "unixgram" literally.
	networkType string // "unix" for STREAM, "unixgram" for DGRAM
}
// NewUnixSocketWriter builds a UnixSocketWriter for socketPath using the
// package defaults for dial timeout, write timeout and queue size.
func NewUnixSocketWriter(socketPath string) (*UnixSocketWriter, error) {
	sw, err := NewUnixSocketWriterWithConfig(socketPath, DefaultDialTimeout, DefaultWriteTimeout, DefaultQueueSize)
	return sw, err
}
// UnixSocketWriterOption mutates a UnixSocketWriter while it is being
// constructed.
type UnixSocketWriterOption func(*UnixSocketWriter)

// WithErrorCallback returns an option that installs cb as the callback the
// UnixSocketWriter invokes when it reports an error.
func WithErrorCallback(cb ErrorCallback) UnixSocketWriterOption {
	install := func(sw *UnixSocketWriter) {
		sw.errorCallback = cb
	}
	return install
}
// NewUnixSocketWriterWithConfig creates a UNIX socket writer with custom
// timeouts and queue size, applies opts, and starts the background queue
// processor.
//
// A non-positive queueSize falls back to DefaultQueueSize: a negative
// capacity would make channel creation panic, and a zero capacity would give
// an unbuffered queue on which the non-blocking send in Write almost never
// succeeds.
//
// An initial connection is attempted but its failure is not an error, because
// the socket may not exist yet; the writer connects lazily on the first
// write. The returned error is always nil in the current implementation.
func NewUnixSocketWriterWithConfig(socketPath string, dialTimeout, writeTimeout time.Duration, queueSize int, opts ...UnixSocketWriterOption) (*UnixSocketWriter, error) {
	if queueSize <= 0 {
		queueSize = DefaultQueueSize
	}

	w := &UnixSocketWriter{
		socketPath:       socketPath,
		dialTimeout:      dialTimeout,
		writeTimeout:     writeTimeout,
		maxReconnects:    DefaultMaxReconnectAttempts,
		reconnectBackoff: DefaultReconnectBackoff,
		maxBackoff:       DefaultMaxReconnectBackoff,
		queue:            make(chan []byte, queueSize),
		queueClose:       make(chan struct{}),
		queueDone:        make(chan struct{}),
		pendingWrites:    make([][]byte, 0),
	}

	// Apply options.
	for _, opt := range opts {
		opt(w)
	}

	// Try the initial connection silently, before the processor goroutine
	// exists, so conn is fully set up by the time anything else can see it.
	// Use unixgram (DGRAM) for connectionless UDP-like socket communication.
	if conn, err := net.DialTimeout("unixgram", socketPath, w.dialTimeout); err == nil {
		w.conn = conn
	}

	// Start the queue processor.
	go w.processQueue()

	return w, nil
}
// processQueue is the background loop that takes serialized records off
// w.queue and writes them to the socket.
//
// On a failed write the failure counter is incremented, the error is reported
// through reportError, and the record is parked in pendingWrites (up to
// DefaultQueueSize entries) for a later retry. Once the consecutive failure
// count exceeds maxReconnects the loop backs off, doubling the delay up to
// maxBackoff. On a successful write the counter and the backoff are reset and
// parked records are flushed.
//
// The backoff wait is interrupted by the close signal so Close is not held up
// by a sleeping processor. On shutdown the records still sitting in the queue
// are written out instead of being discarded.
//
// queueDone is closed when the loop returns.
func (w *UnixSocketWriter) processQueue() {
	defer close(w.queueDone)

	backoff := w.reconnectBackoff
	for {
		select {
		case data, ok := <-w.queue:
			if !ok {
				// Channel closed: retry whatever is still parked and stop.
				w.flushPendingData()
				return
			}

			err := w.writeWithReconnect(data)
			if err == nil {
				w.failuresMu.Lock()
				w.consecutiveFailures = 0
				w.failuresMu.Unlock()
				backoff = w.reconnectBackoff
				// The socket works again: try to flush parked records.
				w.flushPendingData()
				continue
			}

			w.failuresMu.Lock()
			w.consecutiveFailures++
			failures := w.consecutiveFailures
			w.failuresMu.Unlock()

			// Report error via callback if configured.
			w.reportError(err, failures)

			// Park the record for retry.
			w.pendingMu.Lock()
			if len(w.pendingWrites) < DefaultQueueSize {
				w.pendingWrites = append(w.pendingWrites, data)
			}
			w.pendingMu.Unlock()

			// Exponential backoff, abandoned early if Close is called.
			if failures > w.maxReconnects {
				timer := time.NewTimer(backoff)
				select {
				case <-timer.C:
				case <-w.queueClose:
					timer.Stop()
					w.drainQueueOnClose()
					return
				}
				backoff *= 2
				if backoff > w.maxBackoff {
					backoff = w.maxBackoff
				}
			}

		case <-w.queueClose:
			w.drainQueueOnClose()
			return
		}
	}
}

// drainQueueOnClose performs the shutdown flush: parked records first, then
// every record still buffered in the queue. It stops at the first failed
// write, reporting it, so shutdown is not prolonged against a dead socket.
func (w *UnixSocketWriter) drainQueueOnClose() {
	w.flushPendingData()
	for {
		select {
		case data, ok := <-w.queue:
			if !ok {
				return
			}
			if err := w.writeWithReconnect(data); err != nil {
				w.failuresMu.Lock()
				w.consecutiveFailures++
				failures := w.consecutiveFailures
				w.failuresMu.Unlock()
				w.reportError(err, failures)
				return
			}
		default:
			// Queue is empty.
			return
		}
	}
}
// reportError hands err and the attempt number to the configured callback,
// along with the socket path. Without a callback it is a no-op.
func (w *UnixSocketWriter) reportError(err error, attempt int) {
	if w.errorCallback == nil {
		return
	}
	w.errorCallback(w.socketPath, err, attempt)
}
// flushPendingData retries the records parked in pendingWrites, in order.
//
// The parked list is taken over under pendingMu and written outside the lock.
// At the first failed write, that record and every record after it are put
// back at the front of pendingWrites for the next flush attempt, so a failure
// never discards the untried remainder. The list stays capped at
// DefaultQueueSize entries; when the cap is exceeded the newest entries are
// the ones dropped.
func (w *UnixSocketWriter) flushPendingData() {
	w.pendingMu.Lock()
	pending := w.pendingWrites
	w.pendingWrites = make([][]byte, 0)
	w.pendingMu.Unlock()

	for i, data := range pending {
		if err := w.writeWithReconnect(data); err == nil {
			continue
		}

		// Keep the failed record and everything not yet attempted.
		unsent := pending[i:]
		w.pendingMu.Lock()
		merged := make([][]byte, 0, len(unsent)+len(w.pendingWrites))
		merged = append(merged, unsent...)
		merged = append(merged, w.pendingWrites...)
		if len(merged) > DefaultQueueSize {
			merged = merged[:DefaultQueueSize]
		}
		w.pendingWrites = merged
		w.pendingMu.Unlock()
		return
	}
}
// writeWithReconnect sends data over the socket, reconnecting once if the
// current connection turns out to be unusable.
//
// It holds w.mutex for the whole operation. If there is no connection it
// dials one first and returns the dial error on failure. If either setting
// the write deadline or the write itself fails on the current connection,
// that connection is closed and discarded, a new one is dialed, and the write
// is attempted a second time. A failure at any step of the second attempt is
// returned, and the connection is discarded so the next call starts with a
// fresh dial.
func (w *UnixSocketWriter) writeWithReconnect(data []byte) error {
	w.mutex.Lock()
	defer w.mutex.Unlock()

	// connect dials the socket unless a connection is already in place.
	connect := func() error {
		if w.conn != nil {
			return nil
		}
		// Use unixgram (DGRAM) for connectionless UDP-like socket communication.
		conn, err := net.DialTimeout("unixgram", w.socketPath, w.dialTimeout)
		if err != nil {
			return fmt.Errorf("failed to connect to socket %s: %w", w.socketPath, err)
		}
		w.conn = conn
		return nil
	}
	// discard drops the current connection; its close error carries no
	// information the caller could act on.
	discard := func() {
		_ = w.conn.Close()
		w.conn = nil
	}

	if err := connect(); err != nil {
		return err
	}

	// First attempt on the existing (or just dialed) connection. A deadline
	// failure is treated like a write failure so a bad connection is always
	// replaced rather than kept.
	if err := w.conn.SetWriteDeadline(time.Now().Add(w.writeTimeout)); err == nil {
		if _, err := w.conn.Write(data); err == nil {
			return nil
		}
	}

	// Connection failed, try to reconnect.
	discard()
	if err := connect(); err != nil {
		return fmt.Errorf("failed to reconnect: %w", err)
	}
	if err := w.conn.SetWriteDeadline(time.Now().Add(w.writeTimeout)); err != nil {
		discard()
		return fmt.Errorf("failed to set write deadline after reconnect: %w", err)
	}
	if _, err := w.conn.Write(data); err != nil {
		discard()
		return fmt.Errorf("failed to write after reconnect: %w", err)
	}
	return nil
}
// Write serializes rec as one JSON line and offers it to the send queue
// without waiting for queue space.
//
// Serialization happens before the mutex is taken. The mutex is then held
// across both the isClosed check and the channel send, so Close cannot slip
// in between the two and a send on a closed channel cannot occur.
//
// It returns an error when marshalling fails, when the writer has been
// closed, or when the queue is full (the record is dropped in that case).
func (w *UnixSocketWriter) Write(rec api.LogRecord) error {
	payload, err := json.Marshal(rec)
	if err != nil {
		return fmt.Errorf("failed to marshal record: %w", err)
	}
	payload = append(payload, '\n')

	w.mutex.Lock()
	defer w.mutex.Unlock()

	if !w.isClosed {
		select {
		case w.queue <- payload:
			return nil
		default:
			return fmt.Errorf("write queue is full, dropping message")
		}
	}
	return fmt.Errorf("writer is closed")
}
// Close stops the queue processor and closes the socket connection.
//
// The shutdown sequence runs once; later calls return nil. The order matters:
// isClosed is set under the mutex first, so a concurrent Write sees the flag
// and returns instead of sending on a channel that is about to be closed.
// The processor is then signalled and awaited, and only after that is the
// queue channel closed and the connection released.
//
// The error from closing the connection, if any, is returned by the call
// that performed the shutdown.
func (w *UnixSocketWriter) Close() error {
	var closeErr error
	w.closeOnce.Do(func() {
		w.mutex.Lock()
		w.isClosed = true
		w.mutex.Unlock()

		close(w.queueClose)
		<-w.queueDone
		close(w.queue)

		w.mutex.Lock()
		if w.conn != nil {
			closeErr = w.conn.Close()
			w.conn = nil
		}
		w.mutex.Unlock()
	})
	return closeErr
}
// MultiWriter fans each record out to a list of underlying writers.
type MultiWriter struct {
	writers []api.Writer
	mutex   sync.Mutex
}

// NewMultiWriter returns a MultiWriter with an empty, non-nil writer list.
func NewMultiWriter() *MultiWriter {
	mw := new(MultiWriter)
	mw.writers = []api.Writer{}
	return mw
}
// Write passes rec to every registered writer, in registration order, while
// holding the MultiWriter's mutex. A failing writer does not stop the others.
// The returned error is the one from the last writer that failed, or nil if
// all succeeded.
func (mw *MultiWriter) Write(rec api.LogRecord) error {
	mw.mutex.Lock()
	defer mw.mutex.Unlock()

	var result error
	for i := range mw.writers {
		err := mw.writers[i].Write(rec)
		if err == nil {
			continue
		}
		result = err
	}
	return result
}
// Add appends writer to the list of writers that receive every record.
func (mw *MultiWriter) Add(writer api.Writer) {
	mw.mutex.Lock()
	mw.writers = append(mw.writers, writer)
	mw.mutex.Unlock()
}
// CloseAll calls Close on every registered writer that implements io.Closer.
// All closers are attempted even if one fails; the returned error is the one
// from the last closer that failed, or nil.
func (mw *MultiWriter) CloseAll() error {
	mw.mutex.Lock()
	defer mw.mutex.Unlock()

	var result error
	for i := range mw.writers {
		closer, ok := mw.writers[i].(io.Closer)
		if !ok {
			continue
		}
		if err := closer.Close(); err != nil {
			result = err
		}
	}
	return result
}
// Reopen calls Reopen on every registered writer that implements
// api.Reopenable. All of them are attempted even if one fails; the returned
// error is the one from the last writer that failed, or nil.
func (mw *MultiWriter) Reopen() error {
	mw.mutex.Lock()
	defer mw.mutex.Unlock()

	var result error
	for i := range mw.writers {
		target, ok := mw.writers[i].(api.Reopenable)
		if !ok {
			continue
		}
		if err := target.Reopen(); err != nil {
			result = err
		}
	}
	return result
}
// BuilderImpl constructs output writers from application configuration.
type BuilderImpl struct {
	errorCallback ErrorCallback
}

// NewBuilder returns a builder with no error callback configured.
func NewBuilder() *BuilderImpl {
	return new(BuilderImpl)
}

// WithErrorCallback stores cb so that it is attached to the unix_socket and
// file writers this builder creates afterwards. It returns the same builder
// to allow chaining.
func (b *BuilderImpl) WithErrorCallback(cb ErrorCallback) *BuilderImpl {
	b.errorCallback = cb
	return b
}
// NewFromConfig builds one writer per enabled entry in cfg.Outputs and
// returns them combined in a MultiWriter.
//
// Supported output types are "stdout", "file" (requires the "path"
// parameter) and "unix_socket" (requires the "socket_path" parameter; its
// queue size is AsyncBuffer when positive, otherwise DefaultQueueSize). The
// builder's error callback, if set, is attached to file and unix_socket
// writers. When no output ends up enabled, a stdout writer is used.
//
// An unknown type, a missing required parameter or a constructor failure
// aborts the build. In that case every writer created so far is closed before
// the error is returned, so no file handle or socket goroutine is leaked.
func (b *BuilderImpl) NewFromConfig(cfg api.AppConfig) (api.Writer, error) {
	multiWriter := NewMultiWriter()

	// fail releases the writers built so far and returns err.
	fail := func(err error) (api.Writer, error) {
		_ = multiWriter.CloseAll() // best effort: the build error is the one to report
		return nil, err
	}

	for _, outputCfg := range cfg.Outputs {
		if !outputCfg.Enabled {
			continue
		}

		var writer api.Writer
		var err error

		switch outputCfg.Type {
		case "stdout":
			writer = NewStdoutWriter()

		case "file":
			path := outputCfg.Params["path"]
			if path == "" {
				return fail(fmt.Errorf("file output requires 'path' parameter"))
			}
			// Build options list for file writer.
			var fileOpts []FileWriterOption
			if b.errorCallback != nil {
				fileOpts = append(fileOpts, WithFileErrorCallback(b.errorCallback))
			}
			writer, err = NewFileWriterWithConfig(path, DefaultMaxFileSize, DefaultMaxBackups, fileOpts...)
			if err != nil {
				return fail(err)
			}

		case "unix_socket":
			socketPath := outputCfg.Params["socket_path"]
			if socketPath == "" {
				return fail(fmt.Errorf("unix_socket output requires 'socket_path' parameter"))
			}
			// Queue size: AsyncBuffer if specified, otherwise the default.
			queueSize := DefaultQueueSize
			if outputCfg.AsyncBuffer > 0 {
				queueSize = outputCfg.AsyncBuffer
			}
			// Build options list.
			var opts []UnixSocketWriterOption
			if b.errorCallback != nil {
				opts = append(opts, WithErrorCallback(b.errorCallback))
			}
			writer, err = NewUnixSocketWriterWithConfig(socketPath, DefaultDialTimeout, DefaultWriteTimeout, queueSize, opts...)
			if err != nil {
				return fail(err)
			}

		default:
			return fail(fmt.Errorf("unknown output type: %s", outputCfg.Type))
		}

		multiWriter.Add(writer)
	}

	// If no outputs configured, default to stdout.
	if len(multiWriter.writers) == 0 {
		multiWriter.Add(NewStdoutWriter())
	}
	return multiWriter, nil
}