fix: use unixgram (DGRAM) instead of unix (STREAM) for socket output
Some checks failed
Build RPM Package / Build RPM Packages (CentOS 7, Rocky 8/9/10) (push) Has been cancelled
Some checks failed
Build RPM Package / Build RPM Packages (CentOS 7, Rocky 8/9/10) (push) Has been cancelled
- Change net.DialTimeout from "unix" to "unixgram" - Fixes "protocol wrong type for socket" error - DGRAM sockets are connectionless, better suited for log shipping - Update test to use net.ListenUnixgram instead of net.Listen Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
@ -259,6 +259,7 @@ type UnixSocketWriter struct {
|
|||||||
errorCallback ErrorCallback
|
errorCallback ErrorCallback
|
||||||
consecutiveFailures int
|
consecutiveFailures int
|
||||||
failuresMu sync.Mutex
|
failuresMu sync.Mutex
|
||||||
|
networkType string // "unix" for STREAM, "unixgram" for DGRAM
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewUnixSocketWriter creates a new UNIX socket writer with reconnection logic
|
// NewUnixSocketWriter creates a new UNIX socket writer with reconnection logic
|
||||||
@ -300,7 +301,8 @@ func NewUnixSocketWriterWithConfig(socketPath string, dialTimeout, writeTimeout
|
|||||||
go w.processQueue()
|
go w.processQueue()
|
||||||
|
|
||||||
// Try initial connection silently (socket may not exist yet - that's okay)
|
// Try initial connection silently (socket may not exist yet - that's okay)
|
||||||
conn, err := net.DialTimeout("unix", socketPath, w.dialTimeout)
|
// Use unixgram (DGRAM) for connectionless UDP-like socket communication
|
||||||
|
conn, err := net.DialTimeout("unixgram", socketPath, w.dialTimeout)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
w.conn = conn
|
w.conn = conn
|
||||||
}
|
}
|
||||||
@ -399,7 +401,8 @@ func (w *UnixSocketWriter) writeWithReconnect(data []byte) error {
|
|||||||
if w.conn != nil {
|
if w.conn != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
conn, err := net.DialTimeout("unix", w.socketPath, w.dialTimeout)
|
// Use unixgram (DGRAM) for connectionless UDP-like socket communication
|
||||||
|
conn, err := net.DialTimeout("unixgram", w.socketPath, w.dialTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to connect to socket %s: %w", w.socketPath, err)
|
return fmt.Errorf("failed to connect to socket %s: %w", w.socketPath, err)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -657,31 +657,29 @@ func TestUnixSocketWriter_CallbackResetOnSuccess(t *testing.T) {
|
|||||||
tmpDir := t.TempDir()
|
tmpDir := t.TempDir()
|
||||||
socketPath := filepath.Join(tmpDir, "test.sock")
|
socketPath := filepath.Join(tmpDir, "test.sock")
|
||||||
|
|
||||||
// Create a real socket
|
// Create a real DGRAM (unixgram) socket
|
||||||
listener, err := net.Listen("unix", socketPath)
|
addr, err := net.ResolveUnixAddr("unixgram", socketPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to create socket: %v", err)
|
t.Fatalf("Failed to resolve unixgram address: %v", err)
|
||||||
|
}
|
||||||
|
listener, err := net.ListenUnixgram("unixgram", addr)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create unixgram socket: %v", err)
|
||||||
}
|
}
|
||||||
defer listener.Close()
|
defer listener.Close()
|
||||||
|
|
||||||
// Start a goroutine to accept and read connections
|
// Start a goroutine to read datagrams
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
|
buf := make([]byte, 65536)
|
||||||
for {
|
for {
|
||||||
conn, err := listener.Accept()
|
|
||||||
if err != nil {
|
|
||||||
select {
|
select {
|
||||||
case <-done:
|
case <-done:
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
continue
|
listener.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
|
||||||
}
|
_, _, _ = listener.ReadFrom(buf)
|
||||||
// Read and discard data
|
|
||||||
buf := make([]byte, 1024)
|
|
||||||
conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
|
|
||||||
conn.Read(buf)
|
|
||||||
conn.Close()
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
defer close(done)
|
defer close(done)
|
||||||
|
|||||||
Reference in New Issue
Block a user