feat: add debug mode for Unix socket output
Some checks failed
Build RPM Package / Build RPM Packages (CentOS 7, Rocky 8/9/10) (push) Has been cancelled
Some checks failed
Build RPM Package / Build RPM Packages (CentOS 7, Rocky 8/9/10) (push) Has been cancelled
Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
@ -231,6 +231,7 @@ outputs:
|
||||
enabled: true
|
||||
params:
|
||||
socket_path: /var/run/logcorrelator/network.socket
|
||||
# debug: true # Activer pour logger l'état de connexion et les données envoyées
|
||||
```
|
||||
|
||||
### Logging fichier + stdout
|
||||
|
||||
@ -36,3 +36,4 @@ outputs:
|
||||
# enabled: false
|
||||
# params:
|
||||
# socket_path: /var/run/logcorrelator/network.socket
|
||||
# debug: true # Enable debug logging for socket connection state
|
||||
|
||||
@ -5,6 +5,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@ -197,6 +198,8 @@ type UnixSocketWriter struct {
|
||||
isClosed bool
|
||||
pendingWrites [][]byte
|
||||
pendingMu sync.Mutex
|
||||
debug bool
|
||||
logger *log.Logger
|
||||
}
|
||||
|
||||
// NewUnixSocketWriter creates a new UNIX socket writer with reconnection logic
|
||||
@ -206,6 +209,11 @@ func NewUnixSocketWriter(socketPath string) (*UnixSocketWriter, error) {
|
||||
|
||||
// NewUnixSocketWriterWithConfig creates a new UNIX socket writer with custom configuration
|
||||
func NewUnixSocketWriterWithConfig(socketPath string, dialTimeout, writeTimeout time.Duration, queueSize int) (*UnixSocketWriter, error) {
|
||||
return NewUnixSocketWriterWithConfigAndDebug(socketPath, dialTimeout, writeTimeout, queueSize, false, nil)
|
||||
}
|
||||
|
||||
// NewUnixSocketWriterWithConfigAndDebug creates a new UNIX socket writer with debug logging
|
||||
func NewUnixSocketWriterWithConfigAndDebug(socketPath string, dialTimeout, writeTimeout time.Duration, queueSize int, debug bool, logger *log.Logger) (*UnixSocketWriter, error) {
|
||||
w := &UnixSocketWriter{
|
||||
socketPath: socketPath,
|
||||
dialTimeout: dialTimeout,
|
||||
@ -217,6 +225,8 @@ func NewUnixSocketWriterWithConfig(socketPath string, dialTimeout, writeTimeout
|
||||
queueClose: make(chan struct{}),
|
||||
queueDone: make(chan struct{}),
|
||||
pendingWrites: make([][]byte, 0),
|
||||
debug: debug,
|
||||
logger: logger,
|
||||
}
|
||||
|
||||
// Start the queue processor
|
||||
@ -226,6 +236,13 @@ func NewUnixSocketWriterWithConfig(socketPath string, dialTimeout, writeTimeout
|
||||
conn, err := net.DialTimeout("unix", socketPath, w.dialTimeout)
|
||||
if err == nil {
|
||||
w.conn = conn
|
||||
if w.debug && w.logger != nil {
|
||||
w.logger.Printf("[DEBUG] UnixSocketWriter: connected to %s", socketPath)
|
||||
}
|
||||
} else {
|
||||
if w.debug && w.logger != nil {
|
||||
w.logger.Printf("[DEBUG] UnixSocketWriter: initial connection to %s failed: %v (will retry on write)", socketPath, err)
|
||||
}
|
||||
}
|
||||
|
||||
return w, nil
|
||||
@ -256,6 +273,10 @@ func (w *UnixSocketWriter) processQueue() {
|
||||
}
|
||||
w.pendingMu.Unlock()
|
||||
|
||||
if w.debug && w.logger != nil {
|
||||
w.logger.Printf("[DEBUG] UnixSocketWriter: write failed to %s: %v (failures: %d)", w.socketPath, err, consecutiveFailures)
|
||||
}
|
||||
|
||||
// Exponential backoff
|
||||
if consecutiveFailures > w.maxReconnects {
|
||||
time.Sleep(backoff)
|
||||
@ -265,6 +286,9 @@ func (w *UnixSocketWriter) processQueue() {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if w.debug && w.logger != nil {
|
||||
w.logger.Printf("[DEBUG] UnixSocketWriter: wrote %d bytes to %s", len(data), w.socketPath)
|
||||
}
|
||||
consecutiveFailures = 0
|
||||
backoff = w.reconnectBackoff
|
||||
// Try to flush pending data
|
||||
@ -312,10 +336,16 @@ func (w *UnixSocketWriter) writeWithReconnect(data []byte) error {
|
||||
return fmt.Errorf("failed to connect to socket %s: %w", w.socketPath, err)
|
||||
}
|
||||
w.conn = conn
|
||||
if w.debug && w.logger != nil {
|
||||
w.logger.Printf("[DEBUG] UnixSocketWriter: reconnected to %s", w.socketPath)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := ensureConn(); err != nil {
|
||||
if w.debug && w.logger != nil {
|
||||
w.logger.Printf("[DEBUG] UnixSocketWriter: connection failed to %s: %v", w.socketPath, err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
@ -328,6 +358,9 @@ func (w *UnixSocketWriter) writeWithReconnect(data []byte) error {
|
||||
}
|
||||
|
||||
// Connection failed, try to reconnect
|
||||
if w.debug && w.logger != nil {
|
||||
w.logger.Printf("[DEBUG] UnixSocketWriter: write failed, attempting reconnect to %s", w.socketPath)
|
||||
}
|
||||
_ = w.conn.Close()
|
||||
w.conn = nil
|
||||
|
||||
@ -386,6 +419,9 @@ func (w *UnixSocketWriter) Close() error {
|
||||
|
||||
w.isClosed = true
|
||||
if w.conn != nil {
|
||||
if w.debug && w.logger != nil {
|
||||
w.logger.Printf("[DEBUG] UnixSocketWriter: closing connection to %s", w.socketPath)
|
||||
}
|
||||
w.conn.Close()
|
||||
w.conn = nil
|
||||
}
|
||||
@ -482,7 +518,14 @@ func (b *BuilderImpl) NewFromConfig(cfg api.AppConfig) (api.Writer, error) {
|
||||
if socketPath == "" {
|
||||
return nil, fmt.Errorf("unix_socket output requires 'socket_path' parameter")
|
||||
}
|
||||
writer, err = NewUnixSocketWriter(socketPath)
|
||||
// Check for debug mode
|
||||
debug := outputCfg.Params["debug"] == "true" || outputCfg.Params["debug"] == "1"
|
||||
if debug {
|
||||
logger := log.New(os.Stderr, "[UnixSocket] ", log.LstdFlags|log.Lmicroseconds)
|
||||
writer, err = NewUnixSocketWriterWithConfigAndDebug(socketPath, DefaultDialTimeout, DefaultWriteTimeout, DefaultQueueSize, debug, logger)
|
||||
} else {
|
||||
writer, err = NewUnixSocketWriter(socketPath)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user