Some checks failed
Build RPM Package / Build RPM Packages (CentOS 7, Rocky 8/9/10) (push) Has been cancelled
Features: - Add ErrorCallback type for UNIX socket connection error reporting - Add WithErrorCallback option for UnixSocketWriter configuration - Add BuilderImpl.WithErrorCallback() for propagating callbacks - Add consecutive failure tracking in processQueue Testing (50+ new tests): - Add integration tests for full pipeline (capture → tlsparse → fingerprint → output) - Add tests for FileWriter.rotate() and Reopen() log rotation - Add tests for cleanupExpiredFlows() and cleanupLoop() in TLS parser - Add tests for extractSNIFromPayload() and extractJA4Hash() helpers - Add tests for config load error paths (invalid YAML, permission denied) - Add tests for capture.Run() error conditions - Add tests for signal handling documentation Documentation: - Update architecture.yml with new fields (LogLevel, TLSClientHello extensions) - Update architecture.yml with Close() methods for Capture and Parser interfaces - Update RPM spec changelog Cleanup: - Remove empty internal/api/ directory Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
599 lines
15 KiB
Go
599 lines
15 KiB
Go
// Package output provides writers for ja4sentinel log records
|
|
package output
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
|
|
"ja4sentinel/api"
|
|
)
|
|
|
|
// Default tuning values used by the socket and file writers.
const (
	// DefaultDialTimeout bounds a single attempt to connect to the socket.
	DefaultDialTimeout = 5 * time.Second
	// DefaultWriteTimeout bounds a single write on the socket.
	DefaultWriteTimeout = 5 * time.Second
	// DefaultMaxReconnectAttempts is the number of consecutive delivery
	// failures tolerated before the queue processor starts backing off.
	DefaultMaxReconnectAttempts = 3
	// DefaultReconnectBackoff is the first backoff delay; it doubles on each
	// further failure.
	DefaultReconnectBackoff = 100 * time.Millisecond
	// DefaultMaxReconnectBackoff caps the backoff delay.
	DefaultMaxReconnectBackoff = 2 * time.Second
	// DefaultQueueSize is the capacity of the asynchronous write queue.
	DefaultQueueSize = 1000
	// DefaultMaxFileSize is the file size in bytes that triggers a rotation
	// (100 MiB).
	DefaultMaxFileSize = 100 << 20
	// DefaultMaxBackups is the number of rotated files that are kept.
	DefaultMaxBackups = 3
)
|
|
|
|
// StdoutWriter emits log records on standard output as JSON.
type StdoutWriter struct {
	// mutex serializes the Encode calls made by Write.
	mutex sync.Mutex
	// encoder is the JSON encoder that NewStdoutWriter binds to os.Stdout.
	encoder *json.Encoder
}
|
|
|
|
// NewStdoutWriter creates a new stdout writer
|
|
func NewStdoutWriter() *StdoutWriter {
|
|
return &StdoutWriter{
|
|
encoder: json.NewEncoder(os.Stdout),
|
|
}
|
|
}
|
|
|
|
// Write writes a log record to stdout
|
|
func (w *StdoutWriter) Write(rec api.LogRecord) error {
|
|
w.mutex.Lock()
|
|
defer w.mutex.Unlock()
|
|
return w.encoder.Encode(rec)
|
|
}
|
|
|
|
// Close closes the writer (no-op for stdout)
|
|
func (w *StdoutWriter) Close() error {
|
|
return nil
|
|
}
|
|
|
|
// FileWriter appends log records to a file and rotates it by size.
type FileWriter struct {
	// mutex serializes Write, Close and Reopen.
	mutex sync.Mutex

	// path is the location of the active log file; rotated copies are named
	// path.1, path.2, and so on.
	path string
	// file is the handle of the active log file.
	file *os.File
	// encoder is a JSON encoder bound to file; it is replaced whenever file is.
	encoder *json.Encoder

	// maxSize is the size in bytes at which Write rotates before writing.
	maxSize int64
	// maxBackups is the highest numbered backup that rotate shifts files into.
	maxBackups int
	// currentSize is the number of bytes accounted to the active file.
	currentSize int64
}
|
|
|
|
// NewFileWriter creates a new file writer with rotation
|
|
func NewFileWriter(path string) (*FileWriter, error) {
|
|
return NewFileWriterWithConfig(path, DefaultMaxFileSize, DefaultMaxBackups)
|
|
}
|
|
|
|
// NewFileWriterWithConfig creates a new file writer with custom rotation config
|
|
func NewFileWriterWithConfig(path string, maxSize int64, maxBackups int) (*FileWriter, error) {
|
|
// Create directory if it doesn't exist
|
|
dir := filepath.Dir(path)
|
|
if err := os.MkdirAll(dir, 0755); err != nil {
|
|
return nil, fmt.Errorf("failed to create directory %s: %w", dir, err)
|
|
}
|
|
|
|
// Open file with secure permissions (owner read/write only)
|
|
file, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to open file %s: %w", path, err)
|
|
}
|
|
|
|
// Get current file size
|
|
info, err := file.Stat()
|
|
if err != nil {
|
|
file.Close()
|
|
return nil, fmt.Errorf("failed to stat file: %w", err)
|
|
}
|
|
|
|
return &FileWriter{
|
|
file: file,
|
|
encoder: json.NewEncoder(file),
|
|
path: path,
|
|
maxSize: maxSize,
|
|
maxBackups: maxBackups,
|
|
currentSize: info.Size(),
|
|
}, nil
|
|
}
|
|
|
|
// rotate rotates the log file if it exceeds the max size
|
|
func (w *FileWriter) rotate() error {
|
|
if err := w.file.Close(); err != nil {
|
|
return fmt.Errorf("failed to close file: %w", err)
|
|
}
|
|
|
|
// Rotate existing backups
|
|
for i := w.maxBackups; i > 1; i-- {
|
|
oldPath := fmt.Sprintf("%s.%d", w.path, i-1)
|
|
newPath := fmt.Sprintf("%s.%d", w.path, i)
|
|
os.Rename(oldPath, newPath) // Ignore errors - file may not exist
|
|
}
|
|
|
|
// Move current file to .1
|
|
backupPath := fmt.Sprintf("%s.1", w.path)
|
|
if err := os.Rename(w.path, backupPath); err != nil {
|
|
// If rename fails, just truncate
|
|
if err := os.Truncate(w.path, 0); err != nil {
|
|
return fmt.Errorf("failed to truncate file: %w", err)
|
|
}
|
|
}
|
|
|
|
// Open new file
|
|
newFile, err := os.OpenFile(w.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to open new file: %w", err)
|
|
}
|
|
|
|
w.file = newFile
|
|
w.encoder = json.NewEncoder(newFile)
|
|
w.currentSize = 0
|
|
|
|
return nil
|
|
}
|
|
|
|
// Write writes a log record to the file
|
|
func (w *FileWriter) Write(rec api.LogRecord) error {
|
|
w.mutex.Lock()
|
|
defer w.mutex.Unlock()
|
|
|
|
// Check if rotation is needed
|
|
if w.currentSize >= w.maxSize {
|
|
if err := w.rotate(); err != nil {
|
|
return fmt.Errorf("failed to rotate file: %w", err)
|
|
}
|
|
}
|
|
|
|
// Encode to buffer first to get size
|
|
data, err := json.Marshal(rec)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal record: %w", err)
|
|
}
|
|
data = append(data, '\n')
|
|
|
|
// Write to file
|
|
n, err := w.file.Write(data)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to write to file: %w", err)
|
|
}
|
|
w.currentSize += int64(n)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Close closes the file
|
|
func (w *FileWriter) Close() error {
|
|
w.mutex.Lock()
|
|
defer w.mutex.Unlock()
|
|
if w.file != nil {
|
|
return w.file.Close()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Reopen reopens the log file (for logrotate support)
|
|
func (w *FileWriter) Reopen() error {
|
|
w.mutex.Lock()
|
|
defer w.mutex.Unlock()
|
|
|
|
if err := w.file.Close(); err != nil {
|
|
return fmt.Errorf("failed to close file during reopen: %w", err)
|
|
}
|
|
|
|
// Open new file
|
|
newFile, err := os.OpenFile(w.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to reopen file %s: %w", w.path, err)
|
|
}
|
|
|
|
w.file = newFile
|
|
w.encoder = json.NewEncoder(newFile)
|
|
w.currentSize = 0
|
|
|
|
return nil
|
|
}
|
|
|
|
// ErrorCallback receives delivery failures of a UnixSocketWriter: the socket
// path, the error, and the current count of consecutive failures.
type ErrorCallback func(socketPath string, err error, attempt int)
|
|
|
|
// UnixSocketWriter writes log records to a UNIX socket with reconnection logic
|
|
// No internal logging - only LogRecord JSON data is sent to the socket
|
|
type UnixSocketWriter struct {
|
|
socketPath string
|
|
conn net.Conn
|
|
mutex sync.Mutex
|
|
dialTimeout time.Duration
|
|
writeTimeout time.Duration
|
|
maxReconnects int
|
|
reconnectBackoff time.Duration
|
|
maxBackoff time.Duration
|
|
queue chan []byte
|
|
queueClose chan struct{}
|
|
queueDone chan struct{}
|
|
closeOnce sync.Once
|
|
isClosed bool
|
|
pendingWrites [][]byte
|
|
pendingMu sync.Mutex
|
|
errorCallback ErrorCallback
|
|
consecutiveFailures int
|
|
failuresMu sync.Mutex
|
|
}
|
|
|
|
// NewUnixSocketWriter creates a new UNIX socket writer with reconnection logic
|
|
func NewUnixSocketWriter(socketPath string) (*UnixSocketWriter, error) {
|
|
return NewUnixSocketWriterWithConfig(socketPath, DefaultDialTimeout, DefaultWriteTimeout, DefaultQueueSize)
|
|
}
|
|
|
|
// UnixSocketWriterOption is a function type for configuring UnixSocketWriter
|
|
type UnixSocketWriterOption func(*UnixSocketWriter)
|
|
|
|
// WithErrorCallback sets an error callback for socket connection errors
|
|
func WithErrorCallback(cb ErrorCallback) UnixSocketWriterOption {
|
|
return func(w *UnixSocketWriter) {
|
|
w.errorCallback = cb
|
|
}
|
|
}
|
|
|
|
// NewUnixSocketWriterWithConfig creates a new UNIX socket writer with custom configuration
|
|
func NewUnixSocketWriterWithConfig(socketPath string, dialTimeout, writeTimeout time.Duration, queueSize int, opts ...UnixSocketWriterOption) (*UnixSocketWriter, error) {
|
|
w := &UnixSocketWriter{
|
|
socketPath: socketPath,
|
|
dialTimeout: dialTimeout,
|
|
writeTimeout: writeTimeout,
|
|
maxReconnects: DefaultMaxReconnectAttempts,
|
|
reconnectBackoff: DefaultReconnectBackoff,
|
|
maxBackoff: DefaultMaxReconnectBackoff,
|
|
queue: make(chan []byte, queueSize),
|
|
queueClose: make(chan struct{}),
|
|
queueDone: make(chan struct{}),
|
|
pendingWrites: make([][]byte, 0),
|
|
}
|
|
|
|
// Apply options
|
|
for _, opt := range opts {
|
|
opt(w)
|
|
}
|
|
|
|
// Start the queue processor
|
|
go w.processQueue()
|
|
|
|
// Try initial connection silently (socket may not exist yet - that's okay)
|
|
conn, err := net.DialTimeout("unix", socketPath, w.dialTimeout)
|
|
if err == nil {
|
|
w.conn = conn
|
|
}
|
|
|
|
return w, nil
|
|
}
|
|
|
|
// processQueue handles queued writes with reconnection logic
|
|
func (w *UnixSocketWriter) processQueue() {
|
|
defer close(w.queueDone)
|
|
|
|
backoff := w.reconnectBackoff
|
|
|
|
for {
|
|
select {
|
|
case data, ok := <-w.queue:
|
|
if !ok {
|
|
// Channel closed, drain remaining data
|
|
w.flushPendingData()
|
|
return
|
|
}
|
|
|
|
if err := w.writeWithReconnect(data); err != nil {
|
|
w.failuresMu.Lock()
|
|
w.consecutiveFailures++
|
|
failures := w.consecutiveFailures
|
|
w.failuresMu.Unlock()
|
|
|
|
// Report error via callback if configured
|
|
w.reportError(err, failures)
|
|
|
|
// Queue for retry
|
|
w.pendingMu.Lock()
|
|
if len(w.pendingWrites) < DefaultQueueSize {
|
|
w.pendingWrites = append(w.pendingWrites, data)
|
|
}
|
|
w.pendingMu.Unlock()
|
|
|
|
// Exponential backoff
|
|
if failures > w.maxReconnects {
|
|
time.Sleep(backoff)
|
|
backoff *= 2
|
|
if backoff > w.maxBackoff {
|
|
backoff = w.maxBackoff
|
|
}
|
|
}
|
|
} else {
|
|
w.failuresMu.Lock()
|
|
w.consecutiveFailures = 0
|
|
w.failuresMu.Unlock()
|
|
backoff = w.reconnectBackoff
|
|
// Try to flush pending data
|
|
w.flushPendingData()
|
|
}
|
|
|
|
case <-w.queueClose:
|
|
w.flushPendingData()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// reportError reports a socket connection error via the configured callback
|
|
func (w *UnixSocketWriter) reportError(err error, attempt int) {
|
|
if w.errorCallback != nil {
|
|
w.errorCallback(w.socketPath, err, attempt)
|
|
}
|
|
}
|
|
|
|
// flushPendingData attempts to write any pending data
|
|
func (w *UnixSocketWriter) flushPendingData() {
|
|
w.pendingMu.Lock()
|
|
pending := w.pendingWrites
|
|
w.pendingWrites = make([][]byte, 0)
|
|
w.pendingMu.Unlock()
|
|
|
|
for _, data := range pending {
|
|
if err := w.writeWithReconnect(data); err != nil {
|
|
// Put it back for next flush attempt
|
|
w.pendingMu.Lock()
|
|
if len(w.pendingWrites) < DefaultQueueSize {
|
|
w.pendingWrites = append(w.pendingWrites, data)
|
|
}
|
|
w.pendingMu.Unlock()
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// writeWithReconnect attempts to write data with reconnection logic
|
|
func (w *UnixSocketWriter) writeWithReconnect(data []byte) error {
|
|
w.mutex.Lock()
|
|
defer w.mutex.Unlock()
|
|
|
|
ensureConn := func() error {
|
|
if w.conn != nil {
|
|
return nil
|
|
}
|
|
conn, err := net.DialTimeout("unix", w.socketPath, w.dialTimeout)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to connect to socket %s: %w", w.socketPath, err)
|
|
}
|
|
w.conn = conn
|
|
return nil
|
|
}
|
|
|
|
if err := ensureConn(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := w.conn.SetWriteDeadline(time.Now().Add(w.writeTimeout)); err != nil {
|
|
return fmt.Errorf("failed to set write deadline: %w", err)
|
|
}
|
|
|
|
if _, err := w.conn.Write(data); err == nil {
|
|
return nil
|
|
}
|
|
|
|
// Connection failed, try to reconnect
|
|
_ = w.conn.Close()
|
|
w.conn = nil
|
|
|
|
if err := ensureConn(); err != nil {
|
|
return fmt.Errorf("failed to reconnect: %w", err)
|
|
}
|
|
|
|
if err := w.conn.SetWriteDeadline(time.Now().Add(w.writeTimeout)); err != nil {
|
|
_ = w.conn.Close()
|
|
w.conn = nil
|
|
return fmt.Errorf("failed to set write deadline after reconnect: %w", err)
|
|
}
|
|
|
|
if _, err := w.conn.Write(data); err != nil {
|
|
_ = w.conn.Close()
|
|
w.conn = nil
|
|
return fmt.Errorf("failed to write after reconnect: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Write writes a log record to the UNIX socket (non-blocking with queue)
|
|
func (w *UnixSocketWriter) Write(rec api.LogRecord) error {
|
|
w.mutex.Lock()
|
|
if w.isClosed {
|
|
w.mutex.Unlock()
|
|
return fmt.Errorf("writer is closed")
|
|
}
|
|
w.mutex.Unlock()
|
|
|
|
data, err := json.Marshal(rec)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal record: %w", err)
|
|
}
|
|
data = append(data, '\n')
|
|
|
|
select {
|
|
case w.queue <- data:
|
|
return nil
|
|
default:
|
|
// Queue is full, drop the message (could also block or return error)
|
|
return fmt.Errorf("write queue is full, dropping message")
|
|
}
|
|
}
|
|
|
|
// Close closes the UNIX socket connection and stops the queue processor
|
|
func (w *UnixSocketWriter) Close() error {
|
|
w.closeOnce.Do(func() {
|
|
close(w.queueClose)
|
|
<-w.queueDone
|
|
close(w.queue)
|
|
|
|
w.mutex.Lock()
|
|
defer w.mutex.Unlock()
|
|
|
|
w.isClosed = true
|
|
if w.conn != nil {
|
|
w.conn.Close()
|
|
w.conn = nil
|
|
}
|
|
})
|
|
return nil
|
|
}
|
|
|
|
// MultiWriter combines multiple writers
|
|
type MultiWriter struct {
|
|
writers []api.Writer
|
|
mutex sync.Mutex
|
|
}
|
|
|
|
// NewMultiWriter creates a new multi-writer
|
|
func NewMultiWriter() *MultiWriter {
|
|
return &MultiWriter{
|
|
writers: make([]api.Writer, 0),
|
|
}
|
|
}
|
|
|
|
// Write writes a log record to all writers
|
|
func (mw *MultiWriter) Write(rec api.LogRecord) error {
|
|
mw.mutex.Lock()
|
|
defer mw.mutex.Unlock()
|
|
|
|
var lastErr error
|
|
for _, w := range mw.writers {
|
|
if err := w.Write(rec); err != nil {
|
|
lastErr = err
|
|
}
|
|
}
|
|
|
|
return lastErr
|
|
}
|
|
|
|
// Add adds a writer to the multi-writer
|
|
func (mw *MultiWriter) Add(writer api.Writer) {
|
|
mw.mutex.Lock()
|
|
defer mw.mutex.Unlock()
|
|
mw.writers = append(mw.writers, writer)
|
|
}
|
|
|
|
// CloseAll closes all writers
|
|
func (mw *MultiWriter) CloseAll() error {
|
|
mw.mutex.Lock()
|
|
defer mw.mutex.Unlock()
|
|
|
|
var lastErr error
|
|
for _, w := range mw.writers {
|
|
if closer, ok := w.(io.Closer); ok {
|
|
if err := closer.Close(); err != nil {
|
|
lastErr = err
|
|
}
|
|
}
|
|
}
|
|
|
|
return lastErr
|
|
}
|
|
|
|
// Reopen reopens all writers that support log rotation
|
|
func (mw *MultiWriter) Reopen() error {
|
|
mw.mutex.Lock()
|
|
defer mw.mutex.Unlock()
|
|
|
|
var lastErr error
|
|
for _, w := range mw.writers {
|
|
if reopenable, ok := w.(api.Reopenable); ok {
|
|
if err := reopenable.Reopen(); err != nil {
|
|
lastErr = err
|
|
}
|
|
}
|
|
}
|
|
|
|
return lastErr
|
|
}
|
|
|
|
// BuilderImpl implements the api.Builder interface
|
|
type BuilderImpl struct {
|
|
errorCallback ErrorCallback
|
|
}
|
|
|
|
// NewBuilder creates a new output builder
|
|
func NewBuilder() *BuilderImpl {
|
|
return &BuilderImpl{}
|
|
}
|
|
|
|
// WithErrorCallback sets an error callback for all unix_socket writers created by this builder
|
|
func (b *BuilderImpl) WithErrorCallback(cb ErrorCallback) *BuilderImpl {
|
|
b.errorCallback = cb
|
|
return b
|
|
}
|
|
|
|
// NewFromConfig constructs writers from AppConfig
|
|
// Uses AsyncBuffer from OutputConfig if specified, otherwise uses DefaultQueueSize
|
|
func (b *BuilderImpl) NewFromConfig(cfg api.AppConfig) (api.Writer, error) {
|
|
multiWriter := NewMultiWriter()
|
|
|
|
for _, outputCfg := range cfg.Outputs {
|
|
if !outputCfg.Enabled {
|
|
continue
|
|
}
|
|
|
|
var writer api.Writer
|
|
var err error
|
|
|
|
// Determine queue size: use AsyncBuffer if specified, otherwise default
|
|
queueSize := DefaultQueueSize
|
|
if outputCfg.AsyncBuffer > 0 {
|
|
queueSize = outputCfg.AsyncBuffer
|
|
}
|
|
|
|
switch outputCfg.Type {
|
|
case "stdout":
|
|
writer = NewStdoutWriter()
|
|
case "file":
|
|
path := outputCfg.Params["path"]
|
|
if path == "" {
|
|
return nil, fmt.Errorf("file output requires 'path' parameter")
|
|
}
|
|
writer, err = NewFileWriter(path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
case "unix_socket":
|
|
socketPath := outputCfg.Params["socket_path"]
|
|
if socketPath == "" {
|
|
return nil, fmt.Errorf("unix_socket output requires 'socket_path' parameter")
|
|
}
|
|
// Build options list
|
|
var opts []UnixSocketWriterOption
|
|
if b.errorCallback != nil {
|
|
opts = append(opts, WithErrorCallback(b.errorCallback))
|
|
}
|
|
writer, err = NewUnixSocketWriterWithConfig(socketPath, DefaultDialTimeout, DefaultWriteTimeout, queueSize, opts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
default:
|
|
return nil, fmt.Errorf("unknown output type: %s", outputCfg.Type)
|
|
}
|
|
|
|
multiWriter.Add(writer)
|
|
}
|
|
|
|
// If no outputs configured, default to stdout
|
|
if len(multiWriter.writers) == 0 {
|
|
multiWriter.Add(NewStdoutWriter())
|
|
}
|
|
|
|
return multiWriter, nil
|
|
}
|