- feat(observability): metrics server with /metrics and /health endpoints - feat(observability): correlation metrics (events, success/failed, reasons, buffers) - feat(correlation): IP exclusion filter (exact IPs and CIDR ranges) - feat(correlation): pending orphan delay for late-arriving B events - fix(stdout): sink is now a no-op for data; JSON must never appear on stdout - fix(clickhouse): all flush errors were silently discarded, now properly logged - fix(clickhouse): buffer overflow with DropOnOverflow now logged at WARN - fix(clickhouse): retry attempts logged at WARN with attempt/delay/error context - feat(clickhouse): connection success logged at INFO, batch sends at DEBUG - feat(clickhouse): SetLogger() for external logger injection - test(stdout): assert stdout remains empty for correlated and orphan logs - chore(rpm): bump version to 1.1.11, update changelog - docs: README and architecture.yml updated Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
177 lines
6.1 KiB
Go
177 lines
6.1 KiB
Go
package observability
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
)
|
|
|
|
// CorrelationMetrics tracks correlation statistics for debugging and
// monitoring. Every counter is an atomic.Int64, so the zero value is ready to
// use and the methods in this file touch state only through atomic
// operations. Like any type holding atomic values, a CorrelationMetrics must
// not be copied after first use; share it by pointer.
type CorrelationMetrics struct {
	// NOTE(review): mu is not referenced by any method in this file; all
	// counters rely solely on atomics. Confirm whether it is used elsewhere
	// in the package before removing it.
	mu sync.RWMutex

	// Events received, one counter per source.
	eventsReceivedA, eventsReceivedB atomic.Int64

	// Correlation outcomes.
	correlationsSuccess, correlationsFailed atomic.Int64

	// Failure breakdown: one counter per reason string recognised by
	// RecordCorrelationFailed.
	failedNoMatchKey     atomic.Int64 // No event with same key in buffer
	failedTimeWindow     atomic.Int64 // Key found but outside time window
	failedBufferEviction atomic.Int64 // Event evicted due to buffer full
	failedTTLExpired     atomic.Int64 // B event TTL expired before match
	failedIPExcluded     atomic.Int64 // Event excluded by IP filter

	// Current buffer sizes, overwritten on every UpdateBufferSizes call.
	bufferASize, bufferBSize atomic.Int64

	// Orphan stats.
	orphansEmittedA, orphansEmittedB atomic.Int64
	// orphansPendingA is only ever incremented in this file (by
	// RecordPendingOrphan), so it is a cumulative count rather than the
	// number of orphans currently pending.
	orphansPendingA    atomic.Int64
	pendingOrphanMatch atomic.Int64 // B matched with pending orphan A

	// Keep-Alive stats.
	keepAliveResets atomic.Int64 // Number of TTL resets (one-to-many mode)
}
|
|
|
|
// NewCorrelationMetrics creates a new metrics tracker.
|
|
func NewCorrelationMetrics() *CorrelationMetrics {
|
|
return &CorrelationMetrics{}
|
|
}
|
|
|
|
// RecordEventReceived records an event received from a source.
|
|
func (m *CorrelationMetrics) RecordEventReceived(source string) {
|
|
if source == "A" {
|
|
m.eventsReceivedA.Add(1)
|
|
} else if source == "B" {
|
|
m.eventsReceivedB.Add(1)
|
|
}
|
|
}
|
|
|
|
// RecordCorrelationSuccess records a successful correlation.
|
|
func (m *CorrelationMetrics) RecordCorrelationSuccess() {
|
|
m.correlationsSuccess.Add(1)
|
|
}
|
|
|
|
// RecordCorrelationFailed records a failed correlation attempt with the reason.
|
|
func (m *CorrelationMetrics) RecordCorrelationFailed(reason string) {
|
|
m.correlationsFailed.Add(1)
|
|
switch reason {
|
|
case "no_match_key":
|
|
m.failedNoMatchKey.Add(1)
|
|
case "time_window":
|
|
m.failedTimeWindow.Add(1)
|
|
case "buffer_eviction":
|
|
m.failedBufferEviction.Add(1)
|
|
case "ttl_expired":
|
|
m.failedTTLExpired.Add(1)
|
|
case "ip_excluded":
|
|
m.failedIPExcluded.Add(1)
|
|
}
|
|
}
|
|
|
|
// RecordBufferEviction records an event evicted from buffer.
|
|
func (m *CorrelationMetrics) RecordBufferEviction(source string) {
|
|
// Can be used for additional tracking if needed
|
|
}
|
|
|
|
// RecordOrphanEmitted records an orphan event emitted.
|
|
func (m *CorrelationMetrics) RecordOrphanEmitted(source string) {
|
|
if source == "A" {
|
|
m.orphansEmittedA.Add(1)
|
|
} else if source == "B" {
|
|
m.orphansEmittedB.Add(1)
|
|
}
|
|
}
|
|
|
|
// RecordPendingOrphan records an A event added to pending orphans.
|
|
func (m *CorrelationMetrics) RecordPendingOrphan() {
|
|
m.orphansPendingA.Add(1)
|
|
}
|
|
|
|
// RecordPendingOrphanMatch records a B event matching a pending orphan A.
|
|
func (m *CorrelationMetrics) RecordPendingOrphanMatch() {
|
|
m.pendingOrphanMatch.Add(1)
|
|
}
|
|
|
|
// RecordKeepAliveReset records a TTL reset for Keep-Alive.
|
|
func (m *CorrelationMetrics) RecordKeepAliveReset() {
|
|
m.keepAliveResets.Add(1)
|
|
}
|
|
|
|
// UpdateBufferSizes updates the current buffer sizes.
|
|
func (m *CorrelationMetrics) UpdateBufferSizes(sizeA, sizeB int64) {
|
|
m.bufferASize.Store(sizeA)
|
|
m.bufferBSize.Store(sizeB)
|
|
}
|
|
|
|
// Snapshot returns a point-in-time snapshot of all metrics.
|
|
func (m *CorrelationMetrics) Snapshot() MetricsSnapshot {
|
|
return MetricsSnapshot{
|
|
EventsReceivedA: m.eventsReceivedA.Load(),
|
|
EventsReceivedB: m.eventsReceivedB.Load(),
|
|
CorrelationsSuccess: m.correlationsSuccess.Load(),
|
|
CorrelationsFailed: m.correlationsFailed.Load(),
|
|
FailedNoMatchKey: m.failedNoMatchKey.Load(),
|
|
FailedTimeWindow: m.failedTimeWindow.Load(),
|
|
FailedBufferEviction: m.failedBufferEviction.Load(),
|
|
FailedTTLExpired: m.failedTTLExpired.Load(),
|
|
FailedIPExcluded: m.failedIPExcluded.Load(),
|
|
BufferASize: m.bufferASize.Load(),
|
|
BufferBSize: m.bufferBSize.Load(),
|
|
OrphansEmittedA: m.orphansEmittedA.Load(),
|
|
OrphansEmittedB: m.orphansEmittedB.Load(),
|
|
OrphansPendingA: m.orphansPendingA.Load(),
|
|
PendingOrphanMatch: m.pendingOrphanMatch.Load(),
|
|
KeepAliveResets: m.keepAliveResets.Load(),
|
|
}
|
|
}
|
|
|
|
// MetricsSnapshot holds plain int64 copies of the values tracked by
// CorrelationMetrics, as produced by Snapshot. Field order and JSON tags
// define the JSON shape emitted by CorrelationMetrics.MarshalJSON.
type MetricsSnapshot struct {
	// Events received per source.
	EventsReceivedA int64 `json:"events_received_a"`
	EventsReceivedB int64 `json:"events_received_b"`

	// Correlation outcomes.
	CorrelationsSuccess int64 `json:"correlations_success"`
	CorrelationsFailed  int64 `json:"correlations_failed"`

	// Failure breakdown by reason.
	FailedNoMatchKey     int64 `json:"failed_no_match_key"`
	FailedTimeWindow     int64 `json:"failed_time_window"`
	FailedBufferEviction int64 `json:"failed_buffer_eviction"`
	FailedTTLExpired     int64 `json:"failed_ttl_expired"`
	FailedIPExcluded     int64 `json:"failed_ip_excluded"`

	// Buffer sizes as last reported.
	BufferASize int64 `json:"buffer_a_size"`
	BufferBSize int64 `json:"buffer_b_size"`

	// Orphan handling.
	OrphansEmittedA    int64 `json:"orphans_emitted_a"`
	OrphansEmittedB    int64 `json:"orphans_emitted_b"`
	OrphansPendingA    int64 `json:"orphans_pending_a"`
	PendingOrphanMatch int64 `json:"pending_orphan_match"`

	// Keep-Alive TTL resets.
	KeepAliveResets int64 `json:"keepalive_resets"`
}
|
|
|
|
// MarshalJSON implements json.Marshaler.
|
|
func (m *CorrelationMetrics) MarshalJSON() ([]byte, error) {
|
|
return json.Marshal(m.Snapshot())
|
|
}
|
|
|
|
// String returns a human-readable string of metrics.
|
|
func (m *CorrelationMetrics) String() string {
|
|
s := m.Snapshot()
|
|
var b strings.Builder
|
|
b.WriteString("Correlation Metrics:\n")
|
|
fmt.Fprintf(&b, " Events Received: A=%d B=%d Total=%d\n", s.EventsReceivedA, s.EventsReceivedB, s.EventsReceivedA+s.EventsReceivedB)
|
|
fmt.Fprintf(&b, " Correlations: Success=%d Failed=%d\n", s.CorrelationsSuccess, s.CorrelationsFailed)
|
|
fmt.Fprintf(&b, " Failure Reasons: no_match_key=%d time_window=%d buffer_eviction=%d ttl_expired=%d ip_excluded=%d\n",
|
|
s.FailedNoMatchKey, s.FailedTimeWindow, s.FailedBufferEviction, s.FailedTTLExpired, s.FailedIPExcluded)
|
|
fmt.Fprintf(&b, " Buffer Sizes: A=%d B=%d\n", s.BufferASize, s.BufferBSize)
|
|
fmt.Fprintf(&b, " Orphans: Emitted A=%d B=%d Pending A=%d\n", s.OrphansEmittedA, s.OrphansEmittedB, s.OrphansPendingA)
|
|
fmt.Fprintf(&b, " Pending Orphan Match: %d\n", s.PendingOrphanMatch)
|
|
fmt.Fprintf(&b, " Keep-Alive Resets: %d\n", s.KeepAliveResets)
|
|
return b.String()
|
|
}
|