package observability import ( "encoding/json" "fmt" "strings" "sync" "sync/atomic" ) // CorrelationMetrics tracks correlation statistics for debugging and monitoring. type CorrelationMetrics struct { mu sync.RWMutex // Events received eventsReceivedA atomic.Int64 eventsReceivedB atomic.Int64 // Correlation results correlationsSuccess atomic.Int64 correlationsFailed atomic.Int64 // Failure reasons failedNoMatchKey atomic.Int64 // No event with same key in buffer failedTimeWindow atomic.Int64 // Key found but outside time window failedBufferEviction atomic.Int64 // Event evicted due to buffer full failedTTLExpired atomic.Int64 // B event TTL expired before match failedIPExcluded atomic.Int64 // Event excluded by IP filter // Buffer stats bufferASize atomic.Int64 bufferBSize atomic.Int64 // Orphan stats orphansEmittedA atomic.Int64 orphansEmittedB atomic.Int64 orphansPendingA atomic.Int64 pendingOrphanMatch atomic.Int64 // B matched with pending orphan A // Keep-Alive stats keepAliveResets atomic.Int64 // Number of TTL resets (one-to-many mode) } // NewCorrelationMetrics creates a new metrics tracker. func NewCorrelationMetrics() *CorrelationMetrics { return &CorrelationMetrics{} } // RecordEventReceived records an event received from a source. func (m *CorrelationMetrics) RecordEventReceived(source string) { if source == "A" { m.eventsReceivedA.Add(1) } else if source == "B" { m.eventsReceivedB.Add(1) } } // RecordCorrelationSuccess records a successful correlation. func (m *CorrelationMetrics) RecordCorrelationSuccess() { m.correlationsSuccess.Add(1) } // RecordCorrelationFailed records a failed correlation attempt with the reason. func (m *CorrelationMetrics) RecordCorrelationFailed(reason string) { m.correlationsFailed.Add(1) switch reason { case "no_match_key": m.failedNoMatchKey.Add(1) case "time_window": m.failedTimeWindow.Add(1) case "buffer_eviction": m.failedBufferEviction.Add(1) case "ttl_expired": m.failedTTLExpired.Add(1) case "ip_excluded": m.failedIPExcluded.Add(1) } } // RecordBufferEviction records an event evicted from buffer. func (m *CorrelationMetrics) RecordBufferEviction(source string) { // Can be used for additional tracking if needed } // RecordOrphanEmitted records an orphan event emitted. func (m *CorrelationMetrics) RecordOrphanEmitted(source string) { if source == "A" { m.orphansEmittedA.Add(1) } else if source == "B" { m.orphansEmittedB.Add(1) } } // RecordPendingOrphan records an A event added to pending orphans. func (m *CorrelationMetrics) RecordPendingOrphan() { m.orphansPendingA.Add(1) } // RecordPendingOrphanMatch records a B event matching a pending orphan A. func (m *CorrelationMetrics) RecordPendingOrphanMatch() { m.pendingOrphanMatch.Add(1) } // RecordKeepAliveReset records a TTL reset for Keep-Alive. func (m *CorrelationMetrics) RecordKeepAliveReset() { m.keepAliveResets.Add(1) } // UpdateBufferSizes updates the current buffer sizes. func (m *CorrelationMetrics) UpdateBufferSizes(sizeA, sizeB int64) { m.bufferASize.Store(sizeA) m.bufferBSize.Store(sizeB) } // Snapshot returns a point-in-time snapshot of all metrics. func (m *CorrelationMetrics) Snapshot() MetricsSnapshot { return MetricsSnapshot{ EventsReceivedA: m.eventsReceivedA.Load(), EventsReceivedB: m.eventsReceivedB.Load(), CorrelationsSuccess: m.correlationsSuccess.Load(), CorrelationsFailed: m.correlationsFailed.Load(), FailedNoMatchKey: m.failedNoMatchKey.Load(), FailedTimeWindow: m.failedTimeWindow.Load(), FailedBufferEviction: m.failedBufferEviction.Load(), FailedTTLExpired: m.failedTTLExpired.Load(), FailedIPExcluded: m.failedIPExcluded.Load(), BufferASize: m.bufferASize.Load(), BufferBSize: m.bufferBSize.Load(), OrphansEmittedA: m.orphansEmittedA.Load(), OrphansEmittedB: m.orphansEmittedB.Load(), OrphansPendingA: m.orphansPendingA.Load(), PendingOrphanMatch: m.pendingOrphanMatch.Load(), KeepAliveResets: m.keepAliveResets.Load(), } } // MetricsSnapshot is a point-in-time snapshot of metrics. type MetricsSnapshot struct { EventsReceivedA int64 `json:"events_received_a"` EventsReceivedB int64 `json:"events_received_b"` CorrelationsSuccess int64 `json:"correlations_success"` CorrelationsFailed int64 `json:"correlations_failed"` FailedNoMatchKey int64 `json:"failed_no_match_key"` FailedTimeWindow int64 `json:"failed_time_window"` FailedBufferEviction int64 `json:"failed_buffer_eviction"` FailedTTLExpired int64 `json:"failed_ttl_expired"` FailedIPExcluded int64 `json:"failed_ip_excluded"` BufferASize int64 `json:"buffer_a_size"` BufferBSize int64 `json:"buffer_b_size"` OrphansEmittedA int64 `json:"orphans_emitted_a"` OrphansEmittedB int64 `json:"orphans_emitted_b"` OrphansPendingA int64 `json:"orphans_pending_a"` PendingOrphanMatch int64 `json:"pending_orphan_match"` KeepAliveResets int64 `json:"keepalive_resets"` } // MarshalJSON implements json.Marshaler. func (m *CorrelationMetrics) MarshalJSON() ([]byte, error) { return json.Marshal(m.Snapshot()) } // String returns a human-readable string of metrics. func (m *CorrelationMetrics) String() string { s := m.Snapshot() var b strings.Builder b.WriteString("Correlation Metrics:\n") fmt.Fprintf(&b, " Events Received: A=%d B=%d Total=%d\n", s.EventsReceivedA, s.EventsReceivedB, s.EventsReceivedA+s.EventsReceivedB) fmt.Fprintf(&b, " Correlations: Success=%d Failed=%d\n", s.CorrelationsSuccess, s.CorrelationsFailed) fmt.Fprintf(&b, " Failure Reasons: no_match_key=%d time_window=%d buffer_eviction=%d ttl_expired=%d ip_excluded=%d\n", s.FailedNoMatchKey, s.FailedTimeWindow, s.FailedBufferEviction, s.FailedTTLExpired, s.FailedIPExcluded) fmt.Fprintf(&b, " Buffer Sizes: A=%d B=%d\n", s.BufferASize, s.BufferBSize) fmt.Fprintf(&b, " Orphans: Emitted A=%d B=%d Pending A=%d\n", s.OrphansEmittedA, s.OrphansEmittedB, s.OrphansPendingA) fmt.Fprintf(&b, " Pending Orphan Match: %d\n", s.PendingOrphanMatch) fmt.Fprintf(&b, " Keep-Alive Resets: %d\n", s.KeepAliveResets) return b.String() }