release: version 1.1.2 - Add error callback mechanism and comprehensive test suite
Some checks failed
Build RPM Package / Build RPM Packages (CentOS 7, Rocky 8/9/10) (push) Has been cancelled

Features:
- Add ErrorCallback type for UNIX socket connection error reporting
- Add WithErrorCallback option for UnixSocketWriter configuration
- Add BuilderImpl.WithErrorCallback() for propagating callbacks
- Add consecutive failure tracking in processQueue

Testing (50+ new tests):
- Add integration tests for full pipeline (capture → tlsparse → fingerprint → output)
- Add tests for FileWriter.rotate() and Reopen() log rotation
- Add tests for cleanupExpiredFlows() and cleanupLoop() in TLS parser
- Add tests for extractSNIFromPayload() and extractJA4Hash() helpers
- Add tests for config load error paths (invalid YAML, permission denied)
- Add tests for capture.Run() error conditions
- Add tests for signal handling documentation

Documentation:
- Update architecture.yml with new fields (LogLevel, TLSClientHello extensions)
- Update architecture.yml with Close() methods for Capture and Parser interfaces
- Update RPM spec changelog

Cleanup:
- Remove empty internal/api/ directory

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
Jacquin Antoine
2026-03-02 23:24:56 +01:00
parent 6e5addd6d4
commit 23f3012fb1
10 changed files with 2058 additions and 10 deletions

View File

@ -202,6 +202,9 @@ func (w *FileWriter) Reopen() error {
return nil
}
// ErrorCallback is a function type for reporting socket connection errors
type ErrorCallback func(socketPath string, err error, attempt int)
// UnixSocketWriter writes log records to a UNIX socket with reconnection logic
// No internal logging - only LogRecord JSON data is sent to the socket
type UnixSocketWriter struct {
@ -220,6 +223,9 @@ type UnixSocketWriter struct {
isClosed bool
pendingWrites [][]byte
pendingMu sync.Mutex
errorCallback ErrorCallback
consecutiveFailures int
failuresMu sync.Mutex
}
// NewUnixSocketWriter creates a new UNIX socket writer with reconnection logic
@ -227,8 +233,18 @@ func NewUnixSocketWriter(socketPath string) (*UnixSocketWriter, error) {
return NewUnixSocketWriterWithConfig(socketPath, DefaultDialTimeout, DefaultWriteTimeout, DefaultQueueSize)
}
// UnixSocketWriterOption is a function type for configuring UnixSocketWriter
type UnixSocketWriterOption func(*UnixSocketWriter)
// WithErrorCallback sets an error callback for socket connection errors
func WithErrorCallback(cb ErrorCallback) UnixSocketWriterOption {
return func(w *UnixSocketWriter) {
w.errorCallback = cb
}
}
// NewUnixSocketWriterWithConfig creates a new UNIX socket writer with custom configuration
func NewUnixSocketWriterWithConfig(socketPath string, dialTimeout, writeTimeout time.Duration, queueSize int) (*UnixSocketWriter, error) {
func NewUnixSocketWriterWithConfig(socketPath string, dialTimeout, writeTimeout time.Duration, queueSize int, opts ...UnixSocketWriterOption) (*UnixSocketWriter, error) {
w := &UnixSocketWriter{
socketPath: socketPath,
dialTimeout: dialTimeout,
@ -242,6 +258,11 @@ func NewUnixSocketWriterWithConfig(socketPath string, dialTimeout, writeTimeout
pendingWrites: make([][]byte, 0),
}
// Apply options
for _, opt := range opts {
opt(w)
}
// Start the queue processor
go w.processQueue()
@ -259,7 +280,6 @@ func (w *UnixSocketWriter) processQueue() {
defer close(w.queueDone)
backoff := w.reconnectBackoff
consecutiveFailures := 0
for {
select {
@ -271,7 +291,14 @@ func (w *UnixSocketWriter) processQueue() {
}
if err := w.writeWithReconnect(data); err != nil {
consecutiveFailures++
w.failuresMu.Lock()
w.consecutiveFailures++
failures := w.consecutiveFailures
w.failuresMu.Unlock()
// Report error via callback if configured
w.reportError(err, failures)
// Queue for retry
w.pendingMu.Lock()
if len(w.pendingWrites) < DefaultQueueSize {
@ -280,7 +307,7 @@ func (w *UnixSocketWriter) processQueue() {
w.pendingMu.Unlock()
// Exponential backoff
if consecutiveFailures > w.maxReconnects {
if failures > w.maxReconnects {
time.Sleep(backoff)
backoff *= 2
if backoff > w.maxBackoff {
@ -288,7 +315,9 @@ func (w *UnixSocketWriter) processQueue() {
}
}
} else {
consecutiveFailures = 0
w.failuresMu.Lock()
w.consecutiveFailures = 0
w.failuresMu.Unlock()
backoff = w.reconnectBackoff
// Try to flush pending data
w.flushPendingData()
@ -301,6 +330,13 @@ func (w *UnixSocketWriter) processQueue() {
}
}
// reportError reports a socket connection error via the configured callback
func (w *UnixSocketWriter) reportError(err error, attempt int) {
if w.errorCallback != nil {
w.errorCallback(w.socketPath, err, attempt)
}
}
// flushPendingData attempts to write any pending data
func (w *UnixSocketWriter) flushPendingData() {
w.pendingMu.Lock()
@ -486,13 +522,21 @@ func (mw *MultiWriter) Reopen() error {
}
// BuilderImpl implements the api.Builder interface
type BuilderImpl struct{}
type BuilderImpl struct {
errorCallback ErrorCallback
}
// NewBuilder creates a new output builder
func NewBuilder() *BuilderImpl {
return &BuilderImpl{}
}
// WithErrorCallback sets an error callback for all unix_socket writers created by this builder
func (b *BuilderImpl) WithErrorCallback(cb ErrorCallback) *BuilderImpl {
b.errorCallback = cb
return b
}
// NewFromConfig constructs writers from AppConfig
// Uses AsyncBuffer from OutputConfig if specified, otherwise uses DefaultQueueSize
func (b *BuilderImpl) NewFromConfig(cfg api.AppConfig) (api.Writer, error) {
@ -529,7 +573,12 @@ func (b *BuilderImpl) NewFromConfig(cfg api.AppConfig) (api.Writer, error) {
if socketPath == "" {
return nil, fmt.Errorf("unix_socket output requires 'socket_path' parameter")
}
writer, err = NewUnixSocketWriterWithConfig(socketPath, DefaultDialTimeout, DefaultWriteTimeout, queueSize)
// Build options list
var opts []UnixSocketWriterOption
if b.errorCallback != nil {
opts = append(opts, WithErrorCallback(b.errorCallback))
}
writer, err = NewUnixSocketWriterWithConfig(socketPath, DefaultDialTimeout, DefaultWriteTimeout, queueSize, opts...)
if err != nil {
return nil, err
}

View File

@ -3,6 +3,7 @@ package output
import (
"bytes"
"encoding/json"
"net"
"os"
"path/filepath"
"testing"
@ -501,3 +502,560 @@ func TestLogRecordOptionalFieldsOmitted(t *testing.T) {
func contains(s, substr string) bool {
return bytes.Contains([]byte(s), []byte(substr))
}
// TestUnixSocketWriter_ErrorCallback tests that errors are reported via callback
func TestUnixSocketWriter_ErrorCallback(t *testing.T) {
tmpDir := t.TempDir()
socketPath := filepath.Join(tmpDir, "nonexistent.sock")
// Track callback invocations
var errorCalls []struct {
path string
err error
attempt int
}
callback := func(path string, err error, attempt int) {
errorCalls = append(errorCalls, struct {
path string
err error
attempt int
}{path, err, attempt})
}
w, err := NewUnixSocketWriterWithConfig(
socketPath,
100*time.Millisecond,
100*time.Millisecond,
10,
WithErrorCallback(callback),
)
if err != nil {
t.Fatalf("NewUnixSocketWriterWithConfig() error = %v", err)
}
defer w.Close()
rec := api.LogRecord{
SrcIP: "192.168.1.1",
SrcPort: 12345,
JA4: "test",
}
// Write should queue the message
err = w.Write(rec)
if err != nil {
t.Errorf("Write() unexpected error = %v", err)
}
// Wait for queue processor to attempt write and trigger callback
time.Sleep(500 * time.Millisecond)
// Callback should have been invoked at least once
if len(errorCalls) == 0 {
t.Error("ErrorCallback was not invoked")
} else {
// Verify callback parameters
lastCall := errorCalls[len(errorCalls)-1]
if lastCall.path != socketPath {
t.Errorf("Callback path = %v, want %v", lastCall.path, socketPath)
}
if lastCall.err == nil {
t.Error("Callback err should not be nil")
}
if lastCall.attempt < 1 {
t.Errorf("Callback attempt = %d, want >= 1", lastCall.attempt)
}
}
}
// TestBuilder_WithErrorCallback tests that the builder propagates error callbacks
func TestBuilder_WithErrorCallback(t *testing.T) {
tmpDir := t.TempDir()
socketPath := filepath.Join(tmpDir, "test.sock")
callback := func(path string, err error, attempt int) {
// Callback tracked for verification
}
builder := NewBuilder().WithErrorCallback(callback)
config := api.AppConfig{
Core: api.Config{
Interface: "eth0",
ListenPorts: []uint16{443},
},
Outputs: []api.OutputConfig{
{
Type: "unix_socket",
Enabled: true,
AsyncBuffer: 100,
Params: map[string]string{"socket_path": socketPath},
},
},
}
writer, err := builder.NewFromConfig(config)
if err != nil {
t.Fatalf("NewFromConfig() error = %v", err)
}
// Verify writer is a MultiWriter
mw, ok := writer.(*MultiWriter)
if !ok {
t.Fatal("Writer is not a MultiWriter")
}
// Verify the UnixSocketWriter has the callback set
if len(mw.writers) != 1 {
t.Fatalf("Expected 1 writer, got %d", len(mw.writers))
}
unixWriter, ok := mw.writers[0].(*UnixSocketWriter)
if !ok {
t.Fatal("Writer is not a UnixSocketWriter")
}
if unixWriter.errorCallback == nil {
t.Error("UnixSocketWriter.errorCallback is nil")
}
_ = writer
}
// TestUnixSocketWriter_NoCallback tests that writer works without callback
func TestUnixSocketWriter_NoCallback(t *testing.T) {
tmpDir := t.TempDir()
socketPath := filepath.Join(tmpDir, "nonexistent.sock")
// Create writer without callback
w, err := NewUnixSocketWriter(socketPath)
if err != nil {
t.Fatalf("NewUnixSocketWriter() error = %v", err)
}
defer w.Close()
rec := api.LogRecord{
SrcIP: "192.168.1.1",
SrcPort: 12345,
JA4: "test",
}
// Write should not panic even without callback
err = w.Write(rec)
if err != nil {
t.Logf("Write() error (expected) = %v", err)
}
// Give queue processor time to run
time.Sleep(100 * time.Millisecond)
// Should not panic
}
// TestUnixSocketWriter_CallbackResetOnSuccess tests that failure counter resets on success
func TestUnixSocketWriter_CallbackResetOnSuccess(t *testing.T) {
tmpDir := t.TempDir()
socketPath := filepath.Join(tmpDir, "test.sock")
// Create a real socket
listener, err := net.Listen("unix", socketPath)
if err != nil {
t.Fatalf("Failed to create socket: %v", err)
}
defer listener.Close()
// Start a goroutine to accept and read connections
done := make(chan struct{})
go func() {
for {
conn, err := listener.Accept()
if err != nil {
select {
case <-done:
return
default:
}
continue
}
// Read and discard data
buf := make([]byte, 1024)
conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
conn.Read(buf)
conn.Close()
}
}()
defer close(done)
var errorCalls int
callback := func(path string, err error, attempt int) {
errorCalls++
}
w, err := NewUnixSocketWriterWithConfig(
socketPath,
100*time.Millisecond,
100*time.Millisecond,
10,
WithErrorCallback(callback),
)
if err != nil {
t.Fatalf("NewUnixSocketWriterWithConfig() error = %v", err)
}
defer w.Close()
// Write successfully
rec := api.LogRecord{
SrcIP: "192.168.1.1",
SrcPort: 12345,
JA4: "test",
}
err = w.Write(rec)
if err != nil {
t.Errorf("Write() error = %v", err)
}
// Wait for write to complete
time.Sleep(200 * time.Millisecond)
// Callback should not have been called since connection succeeded
if errorCalls > 0 {
t.Errorf("ErrorCallback called %d times, want 0 for successful connection", errorCalls)
}
}
// TestFileWriter_Reopen tests the Reopen method for logrotate support
func TestFileWriter_Reopen(t *testing.T) {
tmpDir := t.TempDir()
testFile := filepath.Join(tmpDir, "test.log")
w, err := NewFileWriter(testFile)
if err != nil {
t.Fatalf("NewFileWriter() error = %v", err)
}
// Write initial data
rec1 := api.LogRecord{
SrcIP: "192.168.1.1",
SrcPort: 12345,
JA4: "test1",
}
err = w.Write(rec1)
if err != nil {
t.Errorf("Write() error = %v", err)
}
// Reopen the file (for logrotate - file is typically moved externally)
err = w.Reopen()
if err != nil {
t.Errorf("Reopen() error = %v", err)
}
// Write more data after reopen
rec2 := api.LogRecord{
SrcIP: "192.168.1.2",
SrcPort: 54321,
JA4: "test2",
}
err = w.Write(rec2)
if err != nil {
t.Errorf("Write() after reopen error = %v", err)
}
// Close and verify
if err := w.Close(); err != nil {
t.Errorf("Close() error = %v", err)
}
// Read the file - should contain both records (Reopen uses O_APPEND)
data, err := os.ReadFile(testFile)
if err != nil {
t.Fatalf("Failed to read file: %v", err)
}
// Parse JSON lines
lines := bytes.Split(bytes.TrimSpace(data), []byte("\n"))
if len(lines) != 2 {
t.Fatalf("Expected 2 lines, got %d", len(lines))
}
// Verify second record
var got api.LogRecord
if err := json.Unmarshal(lines[1], &got); err != nil {
t.Errorf("Invalid JSON on line 2: %v", err)
}
if got.SrcIP != rec2.SrcIP {
t.Errorf("SrcIP = %v, want %v", got.SrcIP, rec2.SrcIP)
}
}
// TestFileWriter_Rotate tests the log rotation functionality
func TestFileWriter_Rotate(t *testing.T) {
tmpDir := t.TempDir()
testFile := filepath.Join(tmpDir, "test.log")
// Create writer with very small max size to trigger rotation
// Minimum useful size is ~100 bytes for a log record
w, err := NewFileWriterWithConfig(testFile, 200, 3)
if err != nil {
t.Fatalf("NewFileWriterWithConfig() error = %v", err)
}
// Write multiple records to trigger rotation
records := []api.LogRecord{
{SrcIP: "192.168.1.1", SrcPort: 1111, JA4: "record1"},
{SrcIP: "192.168.1.2", SrcPort: 2222, JA4: "record2"},
{SrcIP: "192.168.1.3", SrcPort: 3333, JA4: "record3"},
{SrcIP: "192.168.1.4", SrcPort: 4444, JA4: "record4"},
}
for i, rec := range records {
err = w.Write(rec)
if err != nil {
t.Errorf("Write() record %d error = %v", i, err)
}
}
if err := w.Close(); err != nil {
t.Errorf("Close() error = %v", err)
}
// Check that rotation occurred (backup file should exist)
backupFile := testFile + ".1"
if _, err := os.Stat(backupFile); os.IsNotExist(err) {
t.Log("Note: Rotation may not have occurred if total data < maxSize")
}
// Verify main file exists and has content
if _, err := os.Stat(testFile); os.IsNotExist(err) {
t.Errorf("Main file %s does not exist", testFile)
}
}
// TestFileWriter_Rotate_MaxBackups tests that old backups are cleaned up
func TestFileWriter_Rotate_MaxBackups(t *testing.T) {
tmpDir := t.TempDir()
testFile := filepath.Join(tmpDir, "test.log")
// Create writer with small max size and only 2 backups
w, err := NewFileWriterWithConfig(testFile, 150, 2)
if err != nil {
t.Fatalf("NewFileWriterWithConfig() error = %v", err)
}
// Write enough records to trigger multiple rotations
for i := 0; i < 10; i++ {
rec := api.LogRecord{
SrcIP: "192.168.1.1",
SrcPort: uint16(1000 + i),
JA4: "test",
}
err = w.Write(rec)
if err != nil {
t.Errorf("Write() error = %v", err)
}
}
if err := w.Close(); err != nil {
t.Errorf("Close() error = %v", err)
}
// Count backup files
backupCount := 0
for i := 1; i <= 5; i++ {
backupPath := testFile + "." + string(rune('0'+i))
if _, err := os.Stat(backupPath); err == nil {
backupCount++
}
}
// Should have at most 2 backups
if backupCount > 2 {
t.Errorf("Too many backup files: %d, want <= 2", backupCount)
}
}
// TestFileWriter_Reopen_Error tests Reopen after external file removal
func TestFileWriter_Reopen_Error(t *testing.T) {
tmpDir := t.TempDir()
testFile := filepath.Join(tmpDir, "test.log")
w, err := NewFileWriter(testFile)
if err != nil {
t.Fatalf("NewFileWriter() error = %v", err)
}
// Write initial data
rec := api.LogRecord{
SrcIP: "192.168.1.1",
SrcPort: 12345,
JA4: "test",
}
err = w.Write(rec)
if err != nil {
t.Errorf("Write() error = %v", err)
}
// Remove the file externally (simulating logrotate move)
os.Remove(testFile)
// Reopen should succeed - it will create a new file
err = w.Reopen()
if err != nil {
t.Errorf("Reopen() should succeed after file removal, error = %v", err)
}
if err := w.Close(); err != nil {
t.Errorf("Close() error = %v", err)
}
}
// TestFileWriter_NewFileWriterWithConfig tests custom configuration
func TestFileWriter_NewFileWriterWithConfig(t *testing.T) {
tmpDir := t.TempDir()
testFile := filepath.Join(tmpDir, "test.log")
// Test with custom max size and backups
w, err := NewFileWriterWithConfig(testFile, 50*1024*1024, 5)
if err != nil {
t.Fatalf("NewFileWriterWithConfig() error = %v", err)
}
defer w.Close()
if w.maxSize != 50*1024*1024 {
t.Errorf("maxSize = %d, want %d", w.maxSize, 50*1024*1024)
}
if w.maxBackups != 5 {
t.Errorf("maxBackups = %d, want 5", w.maxBackups)
}
}
// TestFileWriter_NewFileWriterWithConfig_InvalidPath tests error handling
func TestFileWriter_NewFileWriterWithConfig_InvalidPath(t *testing.T) {
// Try to create file in a path that should fail (e.g., /proc which is read-only)
_, err := NewFileWriterWithConfig("/proc/test/test.log", 1024, 3)
if err == nil {
t.Error("NewFileWriterWithConfig() with invalid path should return error")
}
}
// TestMultiWriter_Reopen tests Reopen on MultiWriter
func TestMultiWriter_Reopen(t *testing.T) {
tmpDir := t.TempDir()
testFile := filepath.Join(tmpDir, "test.log")
mw := NewMultiWriter()
// Add a FileWriter (which is Reopenable)
fw, err := NewFileWriter(testFile)
if err != nil {
t.Fatalf("NewFileWriter() error = %v", err)
}
mw.Add(fw)
// Add a StdoutWriter (which is NOT Reopenable)
mw.Add(NewStdoutWriter())
// Write initial data
rec := api.LogRecord{
SrcIP: "192.168.1.1",
SrcPort: 12345,
JA4: "test",
}
err = mw.Write(rec)
if err != nil {
t.Errorf("Write() error = %v", err)
}
// Reopen should work (FileWriter is reopenable, StdoutWriter is skipped)
err = mw.Reopen()
if err != nil {
t.Errorf("Reopen() error = %v", err)
}
// Write after reopen
rec2 := api.LogRecord{
SrcIP: "192.168.1.2",
SrcPort: 54321,
JA4: "test2",
}
err = mw.Write(rec2)
if err != nil {
t.Errorf("Write() after reopen error = %v", err)
}
if err := mw.CloseAll(); err != nil {
t.Errorf("CloseAll() error = %v", err)
}
}
// TestUnixSocketWriter_QueueFull tests behavior when queue is full
func TestUnixSocketWriter_QueueFull(t *testing.T) {
tmpDir := t.TempDir()
socketPath := filepath.Join(tmpDir, "test.sock")
// Create writer with very small queue
w, err := NewUnixSocketWriterWithConfig(socketPath, 10*time.Millisecond, 10*time.Millisecond, 2)
if err != nil {
t.Fatalf("NewUnixSocketWriterWithConfig() error = %v", err)
}
defer w.Close()
// Fill the queue with records
for i := 0; i < 10; i++ {
rec := api.LogRecord{
SrcIP: "192.168.1.1",
SrcPort: uint16(1000 + i),
JA4: "test",
}
_ = w.Write(rec) // May succeed or fail depending on queue state
}
// Should not panic - queue full messages are dropped
}
// TestUnixSocketWriter_ReconnectBackoff tests exponential backoff behavior
func TestUnixSocketWriter_ReconnectBackoff(t *testing.T) {
tmpDir := t.TempDir()
socketPath := filepath.Join(tmpDir, "nonexistent.sock")
var errorCount int
callback := func(path string, err error, attempt int) {
errorCount++
}
w, err := NewUnixSocketWriterWithConfig(
socketPath,
10*time.Millisecond,
10*time.Millisecond,
5,
WithErrorCallback(callback),
)
if err != nil {
t.Fatalf("NewUnixSocketWriterWithConfig() error = %v", err)
}
defer w.Close()
// Write multiple records to trigger reconnection attempts
for i := 0; i < 3; i++ {
rec := api.LogRecord{
SrcIP: "192.168.1.1",
SrcPort: uint16(1000 + i),
JA4: "test",
}
_ = w.Write(rec)
}
// Wait for queue processor to attempt writes
time.Sleep(500 * time.Millisecond)
// Should have attempted reconnection
if errorCount == 0 {
t.Error("Expected at least one error callback for nonexistent socket")
}
}