fix: correction de race conditions et amélioration de la robustesse
Some checks failed
Build RPM Package / Build RPM Packages (CentOS 7, Rocky 8/9/10) (push) Has been cancelled

- Correction d'une race condition dans tlsparse grâce à un mutex par ConnectionFlow
- Correction d'une fuite mémoire du buffer HelloBuffer
- Ajout de la rotation des fichiers de logs (100MB, 3 backups)
- Implémentation d'une queue asynchrone avec reconnexion à backoff exponentiel (socket UNIX)
- Validation des filtres BPF (caractères, longueur, parenthèses)
- Augmentation du snapLen pcap de 1600 à 65535 bytes
- Permissions des fichiers sécurisées (0600)
- Ajout de 46 tests unitaires (capture, output, logging)
- go test -race passe sans erreur

Tests: go test -race ./... ✓
Build: go build ./... ✓
Lint: go vet ./... ✓

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
Jacquin Antoine
2026-02-28 21:15:45 +01:00
parent d14d6d6bf0
commit fec500ba46
9 changed files with 1127 additions and 510 deletions

View File

@ -7,12 +7,33 @@ import (
"io"
"net"
"os"
"path/filepath"
"sync"
"time"
"ja4sentinel/api"
)
// Default settings shared by the socket and file writers.
const (
	// DefaultDialTimeout bounds how long a socket connection attempt may take.
	DefaultDialTimeout = 5 * time.Second

	// DefaultWriteTimeout bounds how long a single socket write may take.
	DefaultWriteTimeout = 5 * time.Second

	// DefaultMaxReconnectAttempts is the default reconnection-attempt limit
	// given to socket writers.
	DefaultMaxReconnectAttempts = 3

	// DefaultReconnectBackoff is the first delay used when backing off
	// between reconnection attempts.
	DefaultReconnectBackoff = 100 * time.Millisecond

	// DefaultMaxReconnectBackoff is the ceiling the backoff delay may grow to.
	DefaultMaxReconnectBackoff = 2 * time.Second

	// DefaultQueueSize is the capacity of the asynchronous write queue.
	DefaultQueueSize = 1000

	// DefaultMaxFileSize is the size in bytes (100 MiB) at which a log file
	// is rotated.
	DefaultMaxFileSize = 100 << 20

	// DefaultMaxBackups is how many rotated log files are kept.
	DefaultMaxBackups = 3
)
// StdoutWriter writes log records to stdout
type StdoutWriter struct {
encoder *json.Encoder
@ -38,31 +59,115 @@ func (w *StdoutWriter) Close() error {
return nil
}
// FileWriter writes log records to a file and supports size-based rotation.
//
// Each field is declared exactly once; the pre-rotation declarations of
// file, encoder and mutex that were left next to the new ones made the type
// invalid (duplicate field names).
type FileWriter struct {
	file        *os.File      // currently open log file
	encoder     *json.Encoder // JSON encoder bound to file
	mutex       sync.Mutex    // serializes writes to the file
	path        string        // path of the active log file
	maxSize     int64         // size in bytes at which the file is rotated
	maxBackups  int           // number of rotated files to keep
	currentSize int64         // bytes in the active file, as tracked by the writer
}
// NewFileWriter creates a file writer for path with the default rotation
// settings (DefaultMaxFileSize and DefaultMaxBackups).
//
// All the work is delegated to NewFileWriterWithConfig; the file must not be
// opened here as well, otherwise a second descriptor would be leaked.
func NewFileWriter(path string) (*FileWriter, error) {
	return NewFileWriterWithConfig(path, DefaultMaxFileSize, DefaultMaxBackups)
}
// NewFileWriterWithConfig creates a file writer for path with a custom
// rotation policy.
//
// maxSize is the size in bytes at which the file is rotated and maxBackups
// the number of rotated files to keep.
//
// The parent directory is created when missing (mode 0755). The log file is
// opened in append mode and created with mode 0600 (owner read/write only)
// when it does not exist yet; the mode passed to os.OpenFile only applies at
// creation, so an existing file keeps its permissions. The size of the
// existing file seeds currentSize so that rotation keeps working across
// restarts.
//
// It returns an error when the directory cannot be created or the file
// cannot be opened or inspected.
func NewFileWriterWithConfig(path string, maxSize int64, maxBackups int) (*FileWriter, error) {
	dir := filepath.Dir(path)
	if err := os.MkdirAll(dir, 0755); err != nil {
		return nil, fmt.Errorf("failed to create directory %s: %w", dir, err)
	}

	file, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
	if err != nil {
		return nil, fmt.Errorf("failed to open file %s: %w", path, err)
	}

	info, err := file.Stat()
	if err != nil {
		// Best-effort cleanup: the Stat failure is the error worth reporting.
		_ = file.Close()
		return nil, fmt.Errorf("failed to stat file: %w", err)
	}

	return &FileWriter{
		file:        file,
		encoder:     json.NewEncoder(file),
		path:        path,
		maxSize:     maxSize,
		maxBackups:  maxBackups,
		currentSize: info.Size(),
	}, nil
}
// rotate closes the active log file, shifts the numbered backups
// (path.1 -> path.2 -> ... up to path.<maxBackups>), moves the active file to
// path.1 and opens a fresh, empty file at path.
//
// It does not take w.mutex itself; the caller is expected to hold it.
//
// Rotation is written so that a failed attempt can simply be retried by the
// next call:
//   - an error while closing the old handle (for instance because an earlier,
//     failed rotation already closed it) does not abort the rotation;
//   - a log file that disappeared is recreated instead of being reported as
//     a truncate failure;
//   - with maxBackups <= 0 no backup is kept and the file is truncated.
//
// It returns an error when the file can neither be moved aside nor
// truncated, or when the new file cannot be opened. In that case w.file is
// left nil until a later rotation succeeds.
func (w *FileWriter) rotate() error {
	if w.file != nil {
		// Deliberately ignored: the data has already been handed to the OS,
		// and failing here would leave the writer stuck on a dead handle.
		_ = w.file.Close()
		w.file = nil
	}

	truncate := w.maxBackups <= 0
	if !truncate {
		// Shift from the oldest backup down so nothing is overwritten early.
		for i := w.maxBackups; i > 1; i-- {
			from := fmt.Sprintf("%s.%d", w.path, i-1)
			to := fmt.Sprintf("%s.%d", w.path, i)
			_ = os.Rename(from, to) // best effort: the backup may not exist yet
		}
		if err := os.Rename(w.path, fmt.Sprintf("%s.1", w.path)); err != nil {
			// Could not move the file aside: fall back to emptying it.
			truncate = true
		}
	}
	if truncate {
		if err := os.Truncate(w.path, 0); err != nil && !os.IsNotExist(err) {
			return fmt.Errorf("failed to truncate file: %w", err)
		}
	}

	newFile, err := os.OpenFile(w.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
	if err != nil {
		return fmt.Errorf("failed to open new file: %w", err)
	}

	w.file = newFile
	w.encoder = json.NewEncoder(newFile)
	w.currentSize = 0
	return nil
}
// Write appends rec to the log file as one JSON document followed by a
// newline, rotating the file first when it has reached maxSize.
//
// The record is marshalled before the lock is taken so that an unencodable
// record neither triggers a rotation nor holds up other writers. The number
// of bytes actually written is always added to currentSize, even when the
// write fails part-way, so the size used for rotation stays accurate.
//
// It returns an error when the record cannot be marshalled, when rotation
// fails, or when the write fails.
func (w *FileWriter) Write(rec api.LogRecord) error {
	data, err := json.Marshal(rec)
	if err != nil {
		return fmt.Errorf("failed to marshal record: %w", err)
	}
	data = append(data, '\n')

	w.mutex.Lock()
	defer w.mutex.Unlock()

	if w.currentSize >= w.maxSize {
		if err := w.rotate(); err != nil {
			return fmt.Errorf("failed to rotate file: %w", err)
		}
	}

	n, err := w.file.Write(data)
	w.currentSize += int64(n) // count partial writes too
	if err != nil {
		return fmt.Errorf("failed to write to file: %w", err)
	}
	return nil
}
// Close closes the file
@ -75,24 +180,49 @@ func (w *FileWriter) Close() error {
return nil
}
// UnixSocketWriter writes log records to a UNIX socket. Records are queued
// and delivered by a background goroutine that reconnects when needed.
//
// Each field is declared exactly once; the pre-queue declarations of
// socketPath, conn, mutex, dialTimeout and writeTimeout that were left next
// to the new ones made the type invalid (duplicate field names).
type UnixSocketWriter struct {
	socketPath       string        // filesystem path of the UNIX socket
	conn             net.Conn      // current connection, nil while disconnected
	mutex            sync.Mutex    // guards conn and isClosed
	dialTimeout      time.Duration // timeout for connection attempts
	writeTimeout     time.Duration // deadline applied to each socket write
	maxReconnects    int           // consecutive failures tolerated before backing off
	reconnectBackoff time.Duration // initial backoff delay
	maxBackoff       time.Duration // upper bound for the backoff delay
	queue            chan []byte   // encoded records waiting to be sent
	queueClose       chan struct{} // closed to ask the queue processor to stop
	queueDone        chan struct{} // closed by the queue processor when it exits
	closeOnce        sync.Once     // makes Close idempotent
	isClosed         bool          // set once the writer has been closed
	pendingWrites    [][]byte      // records whose delivery failed, kept for retry
	pendingMu        sync.Mutex    // guards pendingWrites
}
// NewUnixSocketWriter creates a UNIX socket writer for socketPath with the
// default settings: DefaultDialTimeout, DefaultWriteTimeout and
// DefaultQueueSize. It delegates to NewUnixSocketWriterWithConfig and returns
// whatever that constructor returns.
func NewUnixSocketWriter(socketPath string) (*UnixSocketWriter, error) {
return NewUnixSocketWriterWithConfig(socketPath, DefaultDialTimeout, DefaultWriteTimeout, DefaultQueueSize)
}
// NewUnixSocketWriterWithConfig creates a new UNIX socket writer with custom configuration
func NewUnixSocketWriterWithConfig(socketPath string, dialTimeout, writeTimeout time.Duration, queueSize int) (*UnixSocketWriter, error) {
w := &UnixSocketWriter{
socketPath: socketPath,
dialTimeout: 2 * time.Second,
writeTimeout: 2 * time.Second,
socketPath: socketPath,
dialTimeout: dialTimeout,
writeTimeout: writeTimeout,
maxReconnects: DefaultMaxReconnectAttempts,
reconnectBackoff: DefaultReconnectBackoff,
maxBackoff: DefaultMaxReconnectBackoff,
queue: make(chan []byte, queueSize),
queueClose: make(chan struct{}),
queueDone: make(chan struct{}),
pendingWrites: make([][]byte, 0),
}
// Try to connect (socket may not exist yet)
// Start the queue processor
go w.processQueue()
// Try initial connection (socket may not exist yet - that's okay)
conn, err := net.DialTimeout("unix", socketPath, w.dialTimeout)
if err == nil {
w.conn = conn
@ -101,8 +231,75 @@ func NewUnixSocketWriter(socketPath string) (*UnixSocketWriter, error) {
return w, nil
}
// Write writes a log record to the UNIX socket
func (w *UnixSocketWriter) Write(rec api.LogRecord) error {
// processQueue is the background worker that takes encoded records from
// w.queue and sends them through writeWithReconnect.
//
// A record that cannot be sent is parked in w.pendingWrites (at most
// DefaultQueueSize records; beyond that it is dropped) and retried after the
// next successful write. Once more than w.maxReconnects writes have failed
// in a row, the worker waits between attempts, doubling the delay from
// w.reconnectBackoff up to w.maxBackoff.
//
// The worker stops when w.queueClose or w.queue is closed. A stop request
// interrupts a backoff wait immediately. On a stop request the worker makes
// one last delivery attempt for the parked records and for the records still
// buffered in w.queue, giving up at the first failure so shutdown stays
// bounded. w.queueDone is closed when the worker returns.
func (w *UnixSocketWriter) processQueue() {
	defer close(w.queueDone)

	// shutdown performs the final delivery attempt described above.
	shutdown := func() {
		w.flushPendingData()

		w.pendingMu.Lock()
		stalled := len(w.pendingWrites) > 0
		w.pendingMu.Unlock()
		if stalled {
			// The socket is unreachable; trying the queue too would only
			// delay shutdown by one timeout per record.
			return
		}

		for {
			select {
			case data, ok := <-w.queue:
				if !ok {
					return
				}
				if err := w.writeWithReconnect(data); err != nil {
					return
				}
			default:
				return
			}
		}
	}

	backoff := w.reconnectBackoff
	consecutiveFailures := 0

	for {
		select {
		case <-w.queueClose:
			shutdown()
			return

		case data, ok := <-w.queue:
			if !ok {
				// Queue closed: nothing more will arrive.
				w.flushPendingData()
				return
			}

			if err := w.writeWithReconnect(data); err == nil {
				consecutiveFailures = 0
				backoff = w.reconnectBackoff
				// The connection works again: retry what was parked.
				w.flushPendingData()
				continue
			}

			consecutiveFailures++
			w.pendingMu.Lock()
			if len(w.pendingWrites) < DefaultQueueSize {
				w.pendingWrites = append(w.pendingWrites, data)
			}
			w.pendingMu.Unlock()

			if consecutiveFailures <= w.maxReconnects {
				continue
			}

			// Back off, but stay responsive to a stop request.
			timer := time.NewTimer(backoff)
			select {
			case <-timer.C:
			case <-w.queueClose:
				timer.Stop()
				shutdown()
				return
			}
			backoff *= 2
			if backoff > w.maxBackoff {
				backoff = w.maxBackoff
			}
		}
	}
}
// flushPendingData tries to deliver, in order, every record parked in
// w.pendingWrites.
//
// The parked records are taken out under w.pendingMu and then written
// without holding the lock. At the first write failure the failing record
// and all records after it are put back, ahead of anything parked in the
// meantime, so that nothing from the batch is lost and ordering is kept.
// The list stays bounded by DefaultQueueSize; when it would overflow, the
// newest records are the ones dropped.
func (w *UnixSocketWriter) flushPendingData() {
	w.pendingMu.Lock()
	pending := w.pendingWrites
	w.pendingWrites = nil
	w.pendingMu.Unlock()

	for i, data := range pending {
		if err := w.writeWithReconnect(data); err == nil {
			continue
		}

		unsent := pending[i:]
		w.pendingMu.Lock()
		merged := make([][]byte, 0, len(unsent)+len(w.pendingWrites))
		merged = append(merged, unsent...)
		merged = append(merged, w.pendingWrites...)
		if len(merged) > DefaultQueueSize {
			merged = merged[:DefaultQueueSize]
		}
		w.pendingWrites = merged
		w.pendingMu.Unlock()
		return
	}
}
// writeWithReconnect attempts to write data with reconnection logic
func (w *UnixSocketWriter) writeWithReconnect(data []byte) error {
w.mutex.Lock()
defer w.mutex.Unlock()
@ -122,48 +319,77 @@ func (w *UnixSocketWriter) Write(rec api.LogRecord) error {
return err
}
if err := w.conn.SetWriteDeadline(time.Now().Add(w.writeTimeout)); err != nil {
return fmt.Errorf("failed to set write deadline: %w", err)
}
if _, err := w.conn.Write(data); err == nil {
return nil
}
// Connection failed, try to reconnect
_ = w.conn.Close()
w.conn = nil
if err := ensureConn(); err != nil {
return fmt.Errorf("failed to reconnect: %w", err)
}
if err := w.conn.SetWriteDeadline(time.Now().Add(w.writeTimeout)); err != nil {
_ = w.conn.Close()
w.conn = nil
return fmt.Errorf("failed to set write deadline after reconnect: %w", err)
}
if _, err := w.conn.Write(data); err != nil {
_ = w.conn.Close()
w.conn = nil
return fmt.Errorf("failed to write after reconnect: %w", err)
}
return nil
}
// Write writes a log record to the UNIX socket (non-blocking with queue)
func (w *UnixSocketWriter) Write(rec api.LogRecord) error {
w.mutex.Lock()
if w.isClosed {
w.mutex.Unlock()
return fmt.Errorf("writer is closed")
}
w.mutex.Unlock()
data, err := json.Marshal(rec)
if err != nil {
return fmt.Errorf("failed to marshal record: %w", err)
}
data = append(data, '\n')
if err := w.conn.SetWriteDeadline(time.Now().Add(w.writeTimeout)); err != nil {
return fmt.Errorf("failed to set write deadline: %w", err)
}
if _, err = w.conn.Write(data); err == nil {
select {
case w.queue <- data:
return nil
default:
// Queue is full, drop the message (could also block or return error)
return fmt.Errorf("write queue is full, dropping message")
}
_ = w.conn.Close()
w.conn = nil
if errConn := ensureConn(); errConn != nil {
return fmt.Errorf("failed to write to socket and reconnect failed: %w", errConn)
}
if errDeadline := w.conn.SetWriteDeadline(time.Now().Add(w.writeTimeout)); errDeadline != nil {
_ = w.conn.Close()
w.conn = nil
return fmt.Errorf("failed to set write deadline after reconnect: %w", errDeadline)
}
if _, errRetry := w.conn.Write(data); errRetry != nil {
_ = w.conn.Close()
w.conn = nil
return fmt.Errorf("failed to write to socket after reconnect: %w", errRetry)
}
return nil
}
// Close stops the queue processor and closes the socket connection. It is
// safe to call more than once; only the first call does any work and only
// that call can return an error (the one reported when closing the
// connection).
//
// Ordering matters here:
//   - isClosed is set first, under w.mutex, so new writes are refused before
//     the processor is stopped;
//   - w.mutex is NOT held while waiting on w.queueDone, because the
//     processor takes w.mutex for every socket write and would deadlock;
//   - w.queue is intentionally never closed: a writer that raced with Close
//     could otherwise send on a closed channel and panic. Stopping is
//     signalled through w.queueClose only.
func (w *UnixSocketWriter) Close() error {
	var closeErr error

	w.closeOnce.Do(func() {
		w.mutex.Lock()
		w.isClosed = true
		w.mutex.Unlock()

		close(w.queueClose)
		<-w.queueDone

		w.mutex.Lock()
		defer w.mutex.Unlock()
		if w.conn != nil {
			closeErr = w.conn.Close()
			w.conn = nil
		}
	})

	return closeErr
}