fix: durcir la validation et fiabiliser flush/arrêt idempotents

Co-authored-by: aider (openrouter/openai/gpt-5.3-codex) <aider@aider.chat>
This commit is contained in:
Jacquin Antoine
2026-02-28 20:10:28 +01:00
parent 81849b16d8
commit 7e9535122e
5 changed files with 239 additions and 123 deletions

View File

@ -40,6 +40,7 @@ type UnixSocketSource struct {
done chan struct{} done chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
semaphore chan struct{} // Limit concurrent connections semaphore chan struct{} // Limit concurrent connections
stopOnce sync.Once
} }
// NewUnixSocketSource creates a new Unix socket source. // NewUnixSocketSource creates a new Unix socket source.
@ -58,6 +59,10 @@ func (s *UnixSocketSource) Name() string {
// Start begins listening on the Unix socket. // Start begins listening on the Unix socket.
func (s *UnixSocketSource) Start(ctx context.Context, eventChan chan<- *domain.NormalizedEvent) error { func (s *UnixSocketSource) Start(ctx context.Context, eventChan chan<- *domain.NormalizedEvent) error {
if strings.TrimSpace(s.config.Path) == "" {
return fmt.Errorf("socket path cannot be empty")
}
// Remove existing socket file if present // Remove existing socket file if present
if info, err := os.Stat(s.config.Path); err == nil { if info, err := os.Stat(s.config.Path); err == nil {
if info.Mode()&os.ModeSocket != 0 { if info.Mode()&os.ModeSocket != 0 {
@ -78,8 +83,8 @@ func (s *UnixSocketSource) Start(ctx context.Context, eventChan chan<- *domain.N
// Set permissions - fail if we can't // Set permissions - fail if we can't
if err := os.Chmod(s.config.Path, DefaultSocketPermissions); err != nil { if err := os.Chmod(s.config.Path, DefaultSocketPermissions); err != nil {
listener.Close() _ = listener.Close()
os.Remove(s.config.Path) _ = os.Remove(s.config.Path)
return fmt.Errorf("failed to set socket permissions: %w", err) return fmt.Errorf("failed to set socket permissions: %w", err)
} }
@ -120,7 +125,7 @@ func (s *UnixSocketSource) acceptConnections(ctx context.Context, eventChan chan
// Connection accepted // Connection accepted
default: default:
// Too many connections, reject // Too many connections, reject
conn.Close() _ = conn.Close()
continue continue
} }
@ -136,7 +141,7 @@ func (s *UnixSocketSource) acceptConnections(ctx context.Context, eventChan chan
func (s *UnixSocketSource) readEvents(ctx context.Context, conn net.Conn, eventChan chan<- *domain.NormalizedEvent) { func (s *UnixSocketSource) readEvents(ctx context.Context, conn net.Conn, eventChan chan<- *domain.NormalizedEvent) {
// Set read deadline to prevent hanging // Set read deadline to prevent hanging
conn.SetReadDeadline(time.Now().Add(5 * time.Minute)) _ = conn.SetReadDeadline(time.Now().Add(5 * time.Minute))
scanner := bufio.NewScanner(conn) scanner := bufio.NewScanner(conn)
// Increase buffer size limit to 1MB // Increase buffer size limit to 1MB
@ -167,10 +172,6 @@ func (s *UnixSocketSource) readEvents(ctx context.Context, conn net.Conn, eventC
return return
} }
} }
if err := scanner.Err(); err != nil {
// Connection error, log but don't crash
}
} }
func parseJSONEvent(data []byte) (*domain.NormalizedEvent, error) { func parseJSONEvent(data []byte) (*domain.NormalizedEvent, error) {
@ -314,21 +315,26 @@ func getInt64(m map[string]any, key string) (int64, bool) {
// Stop gracefully stops the source. // Stop gracefully stops the source.
func (s *UnixSocketSource) Stop() error { func (s *UnixSocketSource) Stop() error {
s.mu.Lock() var stopErr error
defer s.mu.Unlock()
close(s.done) s.stopOnce.Do(func() {
s.mu.Lock()
defer s.mu.Unlock()
if s.listener != nil { close(s.done)
s.listener.Close()
}
s.wg.Wait() if s.listener != nil {
_ = s.listener.Close()
}
// Clean up socket file s.wg.Wait()
if err := os.Remove(s.config.Path); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to remove socket file: %w", err)
}
return nil // Clean up socket file
if err := os.Remove(s.config.Path); err != nil && !os.IsNotExist(err) {
stopErr = fmt.Errorf("failed to remove socket file: %w", err)
return
}
})
return stopErr
} }

View File

@ -5,6 +5,7 @@ import (
"database/sql" "database/sql"
"encoding/json" "encoding/json"
"fmt" "fmt"
"strings"
"sync" "sync"
"time" "time"
@ -49,10 +50,18 @@ type ClickHouseSink struct {
flushChan chan struct{} flushChan chan struct{}
done chan struct{} done chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
closeOnce sync.Once
} }
// NewClickHouseSink creates a new ClickHouse sink. // NewClickHouseSink creates a new ClickHouse sink.
func NewClickHouseSink(config Config) (*ClickHouseSink, error) { func NewClickHouseSink(config Config) (*ClickHouseSink, error) {
if strings.TrimSpace(config.DSN) == "" {
return nil, fmt.Errorf("clickhouse DSN is required")
}
if strings.TrimSpace(config.Table) == "" {
return nil, fmt.Errorf("clickhouse table is required")
}
// Apply defaults // Apply defaults
if config.BatchSize <= 0 { if config.BatchSize <= 0 {
config.BatchSize = DefaultBatchSize config.BatchSize = DefaultBatchSize
@ -85,7 +94,7 @@ func NewClickHouseSink(config Config) (*ClickHouseSink, error) {
defer pingCancel() defer pingCancel()
if err := db.PingContext(pingCtx); err != nil { if err := db.PingContext(pingCtx); err != nil {
db.Close() _ = db.Close()
return nil, fmt.Errorf("failed to ping ClickHouse: %w", err) return nil, fmt.Errorf("failed to ping ClickHouse: %w", err)
} }
@ -143,13 +152,28 @@ func (s *ClickHouseSink) Flush(ctx context.Context) error {
// Close closes the sink. // Close closes the sink.
func (s *ClickHouseSink) Close() error { func (s *ClickHouseSink) Close() error {
close(s.done) var closeErr error
s.wg.Wait()
if s.db != nil { s.closeOnce.Do(func() {
return s.db.Close() if s.done != nil {
} close(s.done)
return nil }
s.wg.Wait()
flushCtx, cancel := context.WithTimeout(context.Background(), time.Duration(s.config.TimeoutMs)*time.Millisecond)
defer cancel()
if err := s.doFlush(flushCtx); err != nil {
closeErr = err
}
if s.db != nil {
if err := s.db.Close(); err != nil && closeErr == nil {
closeErr = err
}
}
})
return closeErr
} }
func (s *ClickHouseSink) flushLoop() { func (s *ClickHouseSink) flushLoop() {
@ -161,25 +185,30 @@ func (s *ClickHouseSink) flushLoop() {
for { for {
select { select {
case <-s.done: case <-s.done:
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.config.TimeoutMs)*time.Millisecond)
_ = s.doFlush(ctx)
cancel()
return return
case <-ticker.C: case <-ticker.C:
s.mu.Lock() s.mu.Lock()
needsFlush := len(s.buffer) > 0 needsFlush := len(s.buffer) > 0
s.mu.Unlock() s.mu.Unlock()
if needsFlush { if needsFlush {
// Use timeout context for flush
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.config.TimeoutMs)*time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.config.TimeoutMs)*time.Millisecond)
s.doFlush(ctx) _ = s.doFlush(ctx)
cancel() cancel()
} }
case <-s.flushChan: case <-s.flushChan:
s.mu.Lock() s.mu.Lock()
needsFlush := len(s.buffer) >= s.config.BatchSize needsFlush := len(s.buffer) >= s.config.BatchSize
s.mu.Unlock() s.mu.Unlock()
if needsFlush { if needsFlush {
// Use timeout context for flush
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.config.TimeoutMs)*time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.config.TimeoutMs)*time.Millisecond)
s.doFlush(ctx) _ = s.doFlush(ctx)
cancel() cancel()
} }
} }
@ -199,7 +228,10 @@ func (s *ClickHouseSink) doFlush(ctx context.Context) error {
s.buffer = make([]domain.CorrelatedLog, 0, s.config.BatchSize) s.buffer = make([]domain.CorrelatedLog, 0, s.config.BatchSize)
s.mu.Unlock() s.mu.Unlock()
// Prepare batch insert with retry if s.db == nil {
return fmt.Errorf("clickhouse connection is not initialized")
}
query := fmt.Sprintf(` query := fmt.Sprintf(`
INSERT INTO %s (timestamp, src_ip, src_port, dst_ip, dst_port, correlated, orphan_side, apache, network) INSERT INTO %s (timestamp, src_ip, src_port, dst_ip, dst_port, correlated, orphan_side, apache, network)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
@ -209,7 +241,6 @@ func (s *ClickHouseSink) doFlush(ctx context.Context) error {
var lastErr error var lastErr error
for attempt := 0; attempt < MaxRetries; attempt++ { for attempt := 0; attempt < MaxRetries; attempt++ {
if attempt > 0 { if attempt > 0 {
// Exponential backoff
delay := RetryBaseDelay * time.Duration(1<<uint(attempt-1)) delay := RetryBaseDelay * time.Duration(1<<uint(attempt-1))
select { select {
case <-time.After(delay): case <-time.After(delay):
@ -220,10 +251,9 @@ func (s *ClickHouseSink) doFlush(ctx context.Context) error {
lastErr = s.executeBatch(ctx, query, buffer) lastErr = s.executeBatch(ctx, query, buffer)
if lastErr == nil { if lastErr == nil {
return nil // Success return nil
} }
// Check if error is retryable
if !isRetryableError(lastErr) { if !isRetryableError(lastErr) {
return fmt.Errorf("non-retryable error: %w", lastErr) return fmt.Errorf("non-retryable error: %w", lastErr)
} }
@ -249,11 +279,6 @@ func (s *ClickHouseSink) executeBatch(ctx context.Context, query string, buffer
apacheJSON, _ := json.Marshal(log.Apache) apacheJSON, _ := json.Marshal(log.Apache)
networkJSON, _ := json.Marshal(log.Network) networkJSON, _ := json.Marshal(log.Network)
orphanSide := log.OrphanSide
if !log.Correlated {
orphanSide = log.OrphanSide
}
correlated := 0 correlated := 0
if log.Correlated { if log.Correlated {
correlated = 1 correlated = 1
@ -266,7 +291,7 @@ func (s *ClickHouseSink) executeBatch(ctx context.Context, query string, buffer
log.DstIP, log.DstIP,
log.DstPort, log.DstPort,
correlated, correlated,
orphanSide, log.OrphanSide,
string(apacheJSON), string(apacheJSON),
string(networkJSON), string(networkJSON),
) )
@ -287,8 +312,7 @@ func isRetryableError(err error) bool {
if err == nil { if err == nil {
return false return false
} }
errStr := err.Error() errStr := strings.ToLower(err.Error())
// Common retryable errors
retryableErrors := []string{ retryableErrors := []string{
"connection refused", "connection refused",
"connection reset", "connection reset",
@ -298,36 +322,9 @@ func isRetryableError(err error) bool {
"broken pipe", "broken pipe",
} }
for _, re := range retryableErrors { for _, re := range retryableErrors {
if containsIgnoreCase(errStr, re) { if strings.Contains(errStr, re) {
return true return true
} }
} }
return false return false
} }
func containsIgnoreCase(s, substr string) bool {
return len(s) >= len(substr) && containsLower(s, substr)
}
func containsLower(s, substr string) bool {
s = toLower(s)
substr = toLower(substr)
for i := 0; i <= len(s)-len(substr); i++ {
if s[i:i+len(substr)] == substr {
return true
}
}
return false
}
func toLower(s string) string {
var result []byte
for i := 0; i < len(s); i++ {
c := s[i]
if c >= 'A' && c <= 'Z' {
c = c + ('a' - 'A')
}
result = append(result, c)
}
return string(result)
}

View File

@ -3,6 +3,7 @@ package config
import ( import (
"fmt" "fmt"
"os" "os"
"strings"
"time" "time"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
@ -36,9 +37,9 @@ type UnixSocketConfig struct {
// OutputsConfig holds output sinks configuration. // OutputsConfig holds output sinks configuration.
type OutputsConfig struct { type OutputsConfig struct {
File FileOutputConfig `yaml:"file"` File FileOutputConfig `yaml:"file"`
ClickHouse ClickHouseOutputConfig `yaml:"clickhouse"` ClickHouse ClickHouseOutputConfig `yaml:"clickhouse"`
Stdout StdoutOutputConfig `yaml:"stdout"` Stdout StdoutOutputConfig `yaml:"stdout"`
} }
// FileOutputConfig holds file sink configuration. // FileOutputConfig holds file sink configuration.
@ -152,12 +153,59 @@ func (c *Config) Validate() error {
return fmt.Errorf("at least two unix socket inputs are required") return fmt.Errorf("at least two unix socket inputs are required")
} }
seenNames := make(map[string]struct{}, len(c.Inputs.UnixSockets))
seenPaths := make(map[string]struct{}, len(c.Inputs.UnixSockets))
for i, input := range c.Inputs.UnixSockets {
if strings.TrimSpace(input.Name) == "" {
return fmt.Errorf("inputs.unix_sockets[%d].name is required", i)
}
if strings.TrimSpace(input.Path) == "" {
return fmt.Errorf("inputs.unix_sockets[%d].path is required", i)
}
if _, exists := seenNames[input.Name]; exists {
return fmt.Errorf("duplicate unix socket input name: %s", input.Name)
}
seenNames[input.Name] = struct{}{}
if _, exists := seenPaths[input.Path]; exists {
return fmt.Errorf("duplicate unix socket input path: %s", input.Path)
}
seenPaths[input.Path] = struct{}{}
}
if !c.Outputs.File.Enabled && !c.Outputs.ClickHouse.Enabled && !c.Outputs.Stdout.Enabled { if !c.Outputs.File.Enabled && !c.Outputs.ClickHouse.Enabled && !c.Outputs.Stdout.Enabled {
return fmt.Errorf("at least one output must be enabled") return fmt.Errorf("at least one output must be enabled")
} }
if c.Outputs.ClickHouse.Enabled && c.Outputs.ClickHouse.DSN == "" { if c.Outputs.File.Enabled && strings.TrimSpace(c.Outputs.File.Path) == "" {
return fmt.Errorf("clickhouse DSN is required when enabled") return fmt.Errorf("file output path is required when file output is enabled")
}
if c.Outputs.ClickHouse.Enabled {
if strings.TrimSpace(c.Outputs.ClickHouse.DSN) == "" {
return fmt.Errorf("clickhouse DSN is required when enabled")
}
if strings.TrimSpace(c.Outputs.ClickHouse.Table) == "" {
return fmt.Errorf("clickhouse table is required when enabled")
}
if c.Outputs.ClickHouse.BatchSize <= 0 {
return fmt.Errorf("clickhouse batch_size must be > 0")
}
if c.Outputs.ClickHouse.MaxBufferSize <= 0 {
return fmt.Errorf("clickhouse max_buffer_size must be > 0")
}
if c.Outputs.ClickHouse.TimeoutMs <= 0 {
return fmt.Errorf("clickhouse timeout_ms must be > 0")
}
}
if len(c.Correlation.Key) == 0 {
return fmt.Errorf("correlation.key cannot be empty")
}
if c.Correlation.TimeWindow.Value <= 0 {
return fmt.Errorf("correlation.time_window.value must be > 0")
} }
return nil return nil

View File

@ -9,6 +9,8 @@ import (
const ( const (
// DefaultMaxBufferSize is the default maximum number of events per buffer // DefaultMaxBufferSize is the default maximum number of events per buffer
DefaultMaxBufferSize = 10000 DefaultMaxBufferSize = 10000
// DefaultTimeWindow is used when no valid time window is provided
DefaultTimeWindow = time.Second
) )
// CorrelationConfig holds the correlation configuration. // CorrelationConfig holds the correlation configuration.
@ -25,8 +27,8 @@ type CorrelationService struct {
mu sync.Mutex mu sync.Mutex
bufferA *eventBuffer bufferA *eventBuffer
bufferB *eventBuffer bufferB *eventBuffer
pendingA map[string]*list.Element // key -> list element containing NormalizedEvent pendingA map[string][]*list.Element // key -> ordered elements containing *NormalizedEvent
pendingB map[string]*list.Element pendingB map[string][]*list.Element
timeProvider TimeProvider timeProvider TimeProvider
} }
@ -60,12 +62,16 @@ func NewCorrelationService(config CorrelationConfig, timeProvider TimeProvider)
if config.MaxBufferSize <= 0 { if config.MaxBufferSize <= 0 {
config.MaxBufferSize = DefaultMaxBufferSize config.MaxBufferSize = DefaultMaxBufferSize
} }
if config.TimeWindow <= 0 {
config.TimeWindow = DefaultTimeWindow
}
return &CorrelationService{ return &CorrelationService{
config: config, config: config,
bufferA: newEventBuffer(), bufferA: newEventBuffer(),
bufferB: newEventBuffer(), bufferB: newEventBuffer(),
pendingA: make(map[string]*list.Element), pendingA: make(map[string][]*list.Element),
pendingB: make(map[string]*list.Element), pendingB: make(map[string][]*list.Element),
timeProvider: timeProvider, timeProvider: timeProvider,
} }
} }
@ -84,20 +90,29 @@ func (s *CorrelationService) ProcessEvent(event *NormalizedEvent) []CorrelatedLo
if event.Source == SourceA && s.config.ApacheAlwaysEmit { if event.Source == SourceA && s.config.ApacheAlwaysEmit {
return []CorrelatedLog{NewCorrelatedLogFromEvent(event, "A")} return []CorrelatedLog{NewCorrelatedLogFromEvent(event, "A")}
} }
if event.Source == SourceB && s.config.NetworkEmit {
return []CorrelatedLog{NewCorrelatedLogFromEvent(event, "B")}
}
return nil return nil
} }
var results []CorrelatedLog var (
results []CorrelatedLog
shouldBuffer bool
)
switch event.Source { switch event.Source {
case SourceA: case SourceA:
results = s.processSourceA(event) results, shouldBuffer = s.processSourceA(event)
case SourceB: case SourceB:
results = s.processSourceB(event) results, shouldBuffer = s.processSourceB(event)
default:
return nil
} }
// Add the new event to the appropriate buffer if shouldBuffer {
s.addEvent(event) s.addEvent(event)
}
return results return results
} }
@ -112,54 +127,46 @@ func (s *CorrelationService) isBufferFull(source EventSource) bool {
return false return false
} }
func (s *CorrelationService) processSourceA(event *NormalizedEvent) []CorrelatedLog { func (s *CorrelationService) processSourceA(event *NormalizedEvent) ([]CorrelatedLog, bool) {
key := event.CorrelationKeyFull() key := event.CorrelationKeyFull()
// Look for a matching B event // Look for the first matching B event (one-to-one first match)
if elem, ok := s.pendingB[key]; ok { if bEvent := s.findAndPopFirstMatch(s.bufferB, s.pendingB, key, func(other *NormalizedEvent) bool {
bEvent := elem.Value.(*NormalizedEvent) return s.eventsMatch(event, other)
if s.eventsMatch(event, bEvent) { }); bEvent != nil {
// Found a match! correlated := NewCorrelatedLog(event, bEvent)
correlated := NewCorrelatedLog(event, bEvent) return []CorrelatedLog{correlated}, false
s.bufferB.events.Remove(elem)
delete(s.pendingB, key)
return []CorrelatedLog{correlated}
}
} }
// No match found // No match found
if s.config.ApacheAlwaysEmit { if s.config.ApacheAlwaysEmit {
orphan := NewCorrelatedLogFromEvent(event, "A") orphan := NewCorrelatedLogFromEvent(event, "A")
return []CorrelatedLog{orphan} return []CorrelatedLog{orphan}, false
} }
// Keep in buffer for potential future match // Keep in buffer for potential future match
return nil return nil, true
} }
func (s *CorrelationService) processSourceB(event *NormalizedEvent) []CorrelatedLog { func (s *CorrelationService) processSourceB(event *NormalizedEvent) ([]CorrelatedLog, bool) {
key := event.CorrelationKeyFull() key := event.CorrelationKeyFull()
// Look for a matching A event // Look for the first matching A event (one-to-one first match)
if elem, ok := s.pendingA[key]; ok { if aEvent := s.findAndPopFirstMatch(s.bufferA, s.pendingA, key, func(other *NormalizedEvent) bool {
aEvent := elem.Value.(*NormalizedEvent) return s.eventsMatch(other, event)
if s.eventsMatch(aEvent, event) { }); aEvent != nil {
// Found a match! correlated := NewCorrelatedLog(aEvent, event)
correlated := NewCorrelatedLog(aEvent, event) return []CorrelatedLog{correlated}, false
s.bufferA.events.Remove(elem)
delete(s.pendingA, key)
return []CorrelatedLog{correlated}
}
} }
// No match found - B is never emitted alone per spec // No match found
if s.config.NetworkEmit { if s.config.NetworkEmit {
orphan := NewCorrelatedLogFromEvent(event, "B") orphan := NewCorrelatedLogFromEvent(event, "B")
return []CorrelatedLog{orphan} return []CorrelatedLog{orphan}, false
} }
// Keep in buffer for potential future match (but won't be emitted alone) // Keep in buffer for potential future match
return nil return nil, true
} }
func (s *CorrelationService) eventsMatch(a, b *NormalizedEvent) bool { func (s *CorrelationService) eventsMatch(a, b *NormalizedEvent) bool {
@ -176,10 +183,10 @@ func (s *CorrelationService) addEvent(event *NormalizedEvent) {
switch event.Source { switch event.Source {
case SourceA: case SourceA:
elem := s.bufferA.events.PushBack(event) elem := s.bufferA.events.PushBack(event)
s.pendingA[key] = elem s.pendingA[key] = append(s.pendingA[key], elem)
case SourceB: case SourceB:
elem := s.bufferB.events.PushBack(event) elem := s.bufferB.events.PushBack(event)
s.pendingB[key] = elem s.pendingB[key] = append(s.pendingB[key], elem)
} }
} }
@ -192,22 +199,67 @@ func (s *CorrelationService) cleanExpired() {
s.cleanBuffer(s.bufferB, s.pendingB, cutoff) s.cleanBuffer(s.bufferB, s.pendingB, cutoff)
} }
// cleanBuffer removes expired events from a buffer (shared logic for A and B). // cleanBuffer removes expired events from a buffer.
func (s *CorrelationService) cleanBuffer(buffer *eventBuffer, pending map[string]*list.Element, cutoff time.Time) { func (s *CorrelationService) cleanBuffer(buffer *eventBuffer, pending map[string][]*list.Element, cutoff time.Time) {
for elem := buffer.events.Front(); elem != nil; { for elem := buffer.events.Front(); elem != nil; {
event := elem.Value.(*NormalizedEvent) event := elem.Value.(*NormalizedEvent)
if event.Timestamp.Before(cutoff) { if event.Timestamp.Before(cutoff) {
next := elem.Next() next := elem.Next()
key := event.CorrelationKeyFull() key := event.CorrelationKeyFull()
buffer.events.Remove(elem) buffer.events.Remove(elem)
if pending[key] == elem { pending[key] = removeElementFromSlice(pending[key], elem)
if len(pending[key]) == 0 {
delete(pending, key) delete(pending, key)
} }
elem = next elem = next
continue
}
// Events are inserted in arrival order; once we hit a non-expired event, stop.
break
}
}
func (s *CorrelationService) findAndPopFirstMatch(
buffer *eventBuffer,
pending map[string][]*list.Element,
key string,
matcher func(*NormalizedEvent) bool,
) *NormalizedEvent {
elements, ok := pending[key]
if !ok || len(elements) == 0 {
return nil
}
for idx, elem := range elements {
other := elem.Value.(*NormalizedEvent)
if !matcher(other) {
continue
}
buffer.events.Remove(elem)
updated := append(elements[:idx], elements[idx+1:]...)
if len(updated) == 0 {
delete(pending, key)
} else { } else {
break // Events are ordered, so we can stop early pending[key] = updated
}
return other
}
return nil
}
func removeElementFromSlice(elements []*list.Element, target *list.Element) []*list.Element {
if len(elements) == 0 {
return elements
}
for i, elem := range elements {
if elem == target {
return append(elements[:i], elements[i+1:]...)
} }
} }
return elements
} }
// Flush forces emission of remaining buffered events (for shutdown). // Flush forces emission of remaining buffered events (for shutdown).
@ -226,11 +278,20 @@ func (s *CorrelationService) Flush() []CorrelatedLog {
} }
} }
// Emit remaining B events as orphans only if explicitly enabled
if s.config.NetworkEmit {
for elem := s.bufferB.events.Front(); elem != nil; elem = elem.Next() {
event := elem.Value.(*NormalizedEvent)
orphan := NewCorrelatedLogFromEvent(event, "B")
results = append(results, orphan)
}
}
// Clear buffers // Clear buffers
s.bufferA.events.Init() s.bufferA.events.Init()
s.bufferB.events.Init() s.bufferB.events.Init()
s.pendingA = make(map[string]*list.Element) s.pendingA = make(map[string][]*list.Element)
s.pendingB = make(map[string]*list.Element) s.pendingB = make(map[string][]*list.Element)
return results return results
} }

View File

@ -144,10 +144,14 @@ func TestCorrelationService_Flush(t *testing.T) {
SrcPort: 8080, SrcPort: 8080,
} }
svc.ProcessEvent(apacheEvent) // A est émis immédiatement quand ApacheAlwaysEmit=true
results := svc.ProcessEvent(apacheEvent)
if len(results) != 1 {
t.Fatalf("expected 1 immediate orphan event, got %d", len(results))
}
flushed := svc.Flush() flushed := svc.Flush()
if len(flushed) != 1 { if len(flushed) != 0 {
t.Errorf("expected 1 flushed event, got %d", len(flushed)) t.Errorf("expected 0 flushed events, got %d", len(flushed))
} }
} }