feat: use log_level parameter for Unix socket output
Some checks failed
Build RPM Package / Build RPM Packages (CentOS 7, Rocky 8/9/10) (push) Has been cancelled

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
Jacquin Antoine
2026-03-01 02:24:51 +01:00
parent 2a39f76ecd
commit 18d2978499
3 changed files with 50 additions and 64 deletions

View File

@ -231,7 +231,7 @@ outputs:
enabled: true enabled: true
params: params:
socket_path: /var/run/logcorrelator/network.socket socket_path: /var/run/logcorrelator/network.socket
# debug: true # Activer pour logger l'état de connexion et les données envoyées # log_level: debug # debug, info, warn, error (défaut: error)
``` ```
### Logging fichier + stdout ### Logging fichier + stdout

View File

@ -36,4 +36,4 @@ outputs:
# enabled: false # enabled: false
# params: # params:
# socket_path: /var/run/logcorrelator/network.socket # socket_path: /var/run/logcorrelator/network.socket
# debug: true # Enable debug logging for socket connection state # log_level: debug # debug, info, warn, error (default: error)

View File

@ -9,6 +9,7 @@ import (
"net" "net"
"os" "os"
"path/filepath" "path/filepath"
"strings"
"sync" "sync"
"time" "time"
@ -198,7 +199,7 @@ type UnixSocketWriter struct {
isClosed bool isClosed bool
pendingWrites [][]byte pendingWrites [][]byte
pendingMu sync.Mutex pendingMu sync.Mutex
debug bool logLevel string
logger *log.Logger logger *log.Logger
} }
@ -209,11 +210,11 @@ func NewUnixSocketWriter(socketPath string) (*UnixSocketWriter, error) {
// NewUnixSocketWriterWithConfig creates a new UNIX socket writer with custom configuration // NewUnixSocketWriterWithConfig creates a new UNIX socket writer with custom configuration
func NewUnixSocketWriterWithConfig(socketPath string, dialTimeout, writeTimeout time.Duration, queueSize int) (*UnixSocketWriter, error) { func NewUnixSocketWriterWithConfig(socketPath string, dialTimeout, writeTimeout time.Duration, queueSize int) (*UnixSocketWriter, error) {
return NewUnixSocketWriterWithConfigAndDebug(socketPath, dialTimeout, writeTimeout, queueSize, false, nil) return NewUnixSocketWriterWithConfigAndLogLevel(socketPath, dialTimeout, writeTimeout, queueSize, "error")
} }
// NewUnixSocketWriterWithConfigAndDebug creates a new UNIX socket writer with debug logging // NewUnixSocketWriterWithConfigAndLogLevel creates a new UNIX socket writer with log level configuration
func NewUnixSocketWriterWithConfigAndDebug(socketPath string, dialTimeout, writeTimeout time.Duration, queueSize int, debug bool, logger *log.Logger) (*UnixSocketWriter, error) { func NewUnixSocketWriterWithConfigAndLogLevel(socketPath string, dialTimeout, writeTimeout time.Duration, queueSize int, logLevel string) (*UnixSocketWriter, error) {
w := &UnixSocketWriter{ w := &UnixSocketWriter{
socketPath: socketPath, socketPath: socketPath,
dialTimeout: dialTimeout, dialTimeout: dialTimeout,
@ -225,8 +226,8 @@ func NewUnixSocketWriterWithConfigAndDebug(socketPath string, dialTimeout, write
queueClose: make(chan struct{}), queueClose: make(chan struct{}),
queueDone: make(chan struct{}), queueDone: make(chan struct{}),
pendingWrites: make([][]byte, 0), pendingWrites: make([][]byte, 0),
debug: debug, logLevel: strings.ToLower(logLevel),
logger: logger, logger: log.New(os.Stderr, "[UnixSocket] ", log.LstdFlags|log.Lmicroseconds),
} }
// Start the queue processor // Start the queue processor
@ -236,26 +237,39 @@ func NewUnixSocketWriterWithConfigAndDebug(socketPath string, dialTimeout, write
conn, err := net.DialTimeout("unix", socketPath, w.dialTimeout) conn, err := net.DialTimeout("unix", socketPath, w.dialTimeout)
if err == nil { if err == nil {
w.conn = conn w.conn = conn
if w.logger != nil { w.log("INFO", "connected to %s", socketPath)
if w.debug {
w.logger.Printf("[DEBUG] UnixSocketWriter: connected to %s", socketPath)
} else { } else {
w.logger.Printf("[INFO] UnixSocketWriter: connected to %s", socketPath) w.log("WARNING", "initial connection to %s failed: %v (will retry on write)", socketPath, err)
}
}
} else {
if w.logger != nil {
if w.debug {
w.logger.Printf("[DEBUG] UnixSocketWriter: initial connection to %s failed: %v (will retry on write)", socketPath, err)
} else {
w.logger.Printf("[WARNING] UnixSocketWriter: initial connection to %s failed: %v (will retry on write)", socketPath, err)
}
}
} }
return w, nil return w, nil
} }
// log emits a log message if the level is enabled
func (w *UnixSocketWriter) log(level, format string, args ...interface{}) {
if !w.isLogLevelEnabled(level) {
return
}
w.logger.Printf("[%s] UnixSocketWriter: "+format, append([]interface{}{level}, args...)...)
}
// isLogLevelEnabled checks if a log level should be emitted
func (w *UnixSocketWriter) isLogLevelEnabled(level string) bool {
level = strings.ToLower(level)
switch w.logLevel {
case "debug":
return true
case "info":
return level != "debug"
case "warn", "warning":
return level != "debug" && level != "info"
case "error":
return level == "error"
default:
return false
}
}
// processQueue handles queued writes with reconnection logic // processQueue handles queued writes with reconnection logic
func (w *UnixSocketWriter) processQueue() { func (w *UnixSocketWriter) processQueue() {
defer close(w.queueDone) defer close(w.queueDone)
@ -281,14 +295,10 @@ func (w *UnixSocketWriter) processQueue() {
} }
w.pendingMu.Unlock() w.pendingMu.Unlock()
if w.logger != nil { if consecutiveFailures >= w.maxReconnects {
if w.debug { w.log("ERROR", "max reconnection attempts reached for %s (failures: %d)", w.socketPath, consecutiveFailures)
w.logger.Printf("[DEBUG] UnixSocketWriter: write failed to %s: %v (failures: %d)", w.socketPath, err, consecutiveFailures)
} else if consecutiveFailures >= w.maxReconnects {
w.logger.Printf("[ERROR] UnixSocketWriter: max reconnection attempts reached for %s (failures: %d)", w.socketPath, consecutiveFailures)
} else if consecutiveFailures > 1 { } else if consecutiveFailures > 1 {
w.logger.Printf("[WARNING] UnixSocketWriter: write failed to %s: %v (attempt %d/%d)", w.socketPath, err, consecutiveFailures, w.maxReconnects) w.log("WARNING", "write failed to %s: %v (attempt %d/%d)", w.socketPath, err, consecutiveFailures, w.maxReconnects)
}
} }
// Exponential backoff // Exponential backoff
@ -300,9 +310,7 @@ func (w *UnixSocketWriter) processQueue() {
} }
} }
} else { } else {
if w.debug && w.logger != nil { w.log("DEBUG", "wrote %d bytes to %s", len(data), w.socketPath)
w.logger.Printf("[DEBUG] UnixSocketWriter: wrote %d bytes to %s", len(data), w.socketPath)
}
consecutiveFailures = 0 consecutiveFailures = 0
backoff = w.reconnectBackoff backoff = w.reconnectBackoff
// Try to flush pending data // Try to flush pending data
@ -350,20 +358,12 @@ func (w *UnixSocketWriter) writeWithReconnect(data []byte) error {
return fmt.Errorf("failed to connect to socket %s: %w", w.socketPath, err) return fmt.Errorf("failed to connect to socket %s: %w", w.socketPath, err)
} }
w.conn = conn w.conn = conn
if w.logger != nil { w.log("INFO", "connected to %s", w.socketPath)
if w.debug {
w.logger.Printf("[DEBUG] UnixSocketWriter: reconnected to %s", w.socketPath)
} else {
w.logger.Printf("[INFO] UnixSocketWriter: connected to %s", w.socketPath)
}
}
return nil return nil
} }
if err := ensureConn(); err != nil { if err := ensureConn(); err != nil {
if w.logger != nil { w.log("ERROR", "connection failed to %s: %v", w.socketPath, err)
w.logger.Printf("[ERROR] UnixSocketWriter: connection failed to %s: %v", w.socketPath, err)
}
return err return err
} }
@ -376,13 +376,7 @@ func (w *UnixSocketWriter) writeWithReconnect(data []byte) error {
} }
// Connection failed, try to reconnect // Connection failed, try to reconnect
if w.logger != nil { w.log("WARNING", "write failed, attempting reconnect to %s", w.socketPath)
if w.debug {
w.logger.Printf("[DEBUG] UnixSocketWriter: write failed, attempting reconnect to %s", w.socketPath)
} else {
w.logger.Printf("[WARNING] UnixSocketWriter: write failed, attempting reconnect to %s", w.socketPath)
}
}
_ = w.conn.Close() _ = w.conn.Close()
w.conn = nil w.conn = nil
@ -441,13 +435,7 @@ func (w *UnixSocketWriter) Close() error {
w.isClosed = true w.isClosed = true
if w.conn != nil { if w.conn != nil {
if w.logger != nil { w.log("INFO", "closing connection to %s", w.socketPath)
if w.debug {
w.logger.Printf("[DEBUG] UnixSocketWriter: closing connection to %s", w.socketPath)
} else {
w.logger.Printf("[INFO] UnixSocketWriter: closing connection to %s", w.socketPath)
}
}
w.conn.Close() w.conn.Close()
w.conn = nil w.conn = nil
} }
@ -544,14 +532,12 @@ func (b *BuilderImpl) NewFromConfig(cfg api.AppConfig) (api.Writer, error) {
if socketPath == "" { if socketPath == "" {
return nil, fmt.Errorf("unix_socket output requires 'socket_path' parameter") return nil, fmt.Errorf("unix_socket output requires 'socket_path' parameter")
} }
// Check for debug mode // Get log level (default: error)
debug := outputCfg.Params["debug"] == "true" || outputCfg.Params["debug"] == "1" logLevel := outputCfg.Params["log_level"]
if debug { if logLevel == "" {
logger := log.New(os.Stderr, "[UnixSocket] ", log.LstdFlags|log.Lmicroseconds) logLevel = "error"
writer, err = NewUnixSocketWriterWithConfigAndDebug(socketPath, DefaultDialTimeout, DefaultWriteTimeout, DefaultQueueSize, debug, logger)
} else {
writer, err = NewUnixSocketWriter(socketPath)
} }
writer, err = NewUnixSocketWriterWithConfigAndLogLevel(socketPath, DefaultDialTimeout, DefaultWriteTimeout, DefaultQueueSize, logLevel)
if err != nil { if err != nil {
return nil, err return nil, err
} }