Files
logcorrelator/internal/observability/metrics.go
toto e9dcd8ea51 feat: observability, IP filtering, stdout/clickhouse fixes (v1.1.11)
- feat(observability): metrics server with /metrics and /health endpoints
- feat(observability): correlation metrics (events, success/failed, reasons, buffers)
- feat(correlation): IP exclusion filter (exact IPs and CIDR ranges)
- feat(correlation): pending orphan delay for late-arriving B events
- fix(stdout): sink is now a no-op for data; JSON must never appear on stdout
- fix(clickhouse): all flush errors were silently discarded, now properly logged
- fix(clickhouse): buffer overflow with DropOnOverflow now logged at WARN
- fix(clickhouse): retry attempts logged at WARN with attempt/delay/error context
- feat(clickhouse): connection success logged at INFO, batch sends at DEBUG
- feat(clickhouse): SetLogger() for external logger injection
- test(stdout): assert stdout remains empty for correlated and orphan logs
- chore(rpm): bump version to 1.1.11, update changelog
- docs: README and architecture.yml updated

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-05 11:40:54 +01:00

177 lines
6.1 KiB
Go

package observability
import (
"encoding/json"
"fmt"
"strings"
"sync"
"sync/atomic"
)
// CorrelationMetrics holds the counters and gauges that describe correlation
// activity, for debugging and monitoring.
//
// Every statistic is an atomic.Int64; the methods in this file update and read
// them through their atomic operations without taking mu.
type CorrelationMetrics struct {
	// mu is not used by any method visible in this file.
	// NOTE(review): confirm whether other code in the package relies on it.
	mu sync.RWMutex

	// Events received, per source.
	eventsReceivedA, eventsReceivedB atomic.Int64

	// Correlation outcome totals.
	correlationsSuccess, correlationsFailed atomic.Int64

	// Failed correlations, broken down by reason.
	failedNoMatchKey     atomic.Int64 // no event with the same key in the buffer
	failedTimeWindow     atomic.Int64 // key found but outside the time window
	failedBufferEviction atomic.Int64 // event evicted because the buffer was full
	failedTTLExpired     atomic.Int64 // B event TTL expired before a match
	failedIPExcluded     atomic.Int64 // event excluded by the IP filter

	// Buffer sizes.
	bufferASize, bufferBSize atomic.Int64

	// Orphan statistics.
	orphansEmittedA, orphansEmittedB atomic.Int64
	orphansPendingA                  atomic.Int64
	pendingOrphanMatch               atomic.Int64 // B matched with a pending orphan A

	// Keep-Alive statistics.
	keepAliveResets atomic.Int64 // number of TTL resets (one-to-many mode)
}
// NewCorrelationMetrics returns a pointer to a freshly allocated, zero-valued
// CorrelationMetrics. All counters start at zero.
func NewCorrelationMetrics() *CorrelationMetrics {
	return new(CorrelationMetrics)
}
// RecordEventReceived counts one event received from the given source.
//
// Source "A" increments the A counter and source "B" increments the B counter.
// Any other value is ignored.
func (m *CorrelationMetrics) RecordEventReceived(source string) {
	switch source {
	case "A":
		m.eventsReceivedA.Add(1)
	case "B":
		m.eventsReceivedB.Add(1)
	}
}
// RecordCorrelationSuccess counts one successful correlation by atomically
// incrementing the success counter.
func (m *CorrelationMetrics) RecordCorrelationSuccess() {
	successes := &m.correlationsSuccess
	successes.Add(1)
}
// RecordCorrelationFailed counts one failed correlation attempt and attributes
// it to the given reason.
//
// The total failure counter is always incremented. The per-reason counter is
// incremented only for the recognized reasons "no_match_key", "time_window",
// "buffer_eviction", "ttl_expired" and "ip_excluded"; any other reason
// contributes to the total alone.
func (m *CorrelationMetrics) RecordCorrelationFailed(reason string) {
	// Start with the total, which is bumped regardless of the reason.
	counter := &m.correlationsFailed
	counter.Add(1)

	// Rebind to the matching per-reason counter, if there is one.
	switch reason {
	case "no_match_key":
		counter = &m.failedNoMatchKey
	case "time_window":
		counter = &m.failedTimeWindow
	case "buffer_eviction":
		counter = &m.failedBufferEviction
	case "ttl_expired":
		counter = &m.failedTTLExpired
	case "ip_excluded":
		counter = &m.failedIPExcluded
	default:
		// Unrecognized reason: only the total is counted.
		return
	}
	counter.Add(1)
}
// RecordBufferEviction is the hook for reporting that an event from the given
// source was evicted from a buffer.
//
// It is currently a no-op: source is ignored and no counter changes. Within
// this file, evictions are counted only when RecordCorrelationFailed is called
// with the reason "buffer_eviction".
func (m *CorrelationMetrics) RecordBufferEviction(source string) {
// Intentionally empty; can be used for additional tracking if needed.
}
// RecordOrphanEmitted counts one orphan event emitted for the given source.
//
// Source "A" increments the A orphan counter and source "B" increments the B
// orphan counter. Any other value is ignored.
func (m *CorrelationMetrics) RecordOrphanEmitted(source string) {
	switch source {
	case "A":
		m.orphansEmittedA.Add(1)
	case "B":
		m.orphansEmittedB.Add(1)
	}
}
// RecordPendingOrphan counts one A event added to the pending orphans.
//
// NOTE(review): nothing in this file ever decrements this counter, so it reads
// as a running total rather than a current gauge — confirm intended semantics.
func (m *CorrelationMetrics) RecordPendingOrphan() {
	pending := &m.orphansPendingA
	pending.Add(1)
}
// RecordPendingOrphanMatch counts one B event that matched a pending orphan A
// by atomically incrementing the corresponding counter.
func (m *CorrelationMetrics) RecordPendingOrphanMatch() {
	matches := &m.pendingOrphanMatch
	matches.Add(1)
}
// RecordKeepAliveReset counts one Keep-Alive TTL reset by atomically
// incrementing the corresponding counter.
func (m *CorrelationMetrics) RecordKeepAliveReset() {
	resets := &m.keepAliveResets
	resets.Add(1)
}
// UpdateBufferSizes overwrites the recorded buffer sizes with sizeA and sizeB.
//
// The two values are written with separate atomic stores, A first and then B;
// the values are stored as given, without validation.
func (m *CorrelationMetrics) UpdateBufferSizes(sizeA, sizeB int64) {
	gaugeA, gaugeB := &m.bufferASize, &m.bufferBSize
	gaugeA.Store(sizeA)
	gaugeB.Store(sizeB)
}
// Snapshot copies the current value of every counter into a MetricsSnapshot
// and returns it.
//
// Each counter is read with its own atomic Load and no lock is held across the
// reads, so counters that are being updated concurrently are captured one at a
// time rather than all at a single instant.
func (m *CorrelationMetrics) Snapshot() MetricsSnapshot {
	var snap MetricsSnapshot

	// Events received.
	snap.EventsReceivedA = m.eventsReceivedA.Load()
	snap.EventsReceivedB = m.eventsReceivedB.Load()

	// Correlation results.
	snap.CorrelationsSuccess = m.correlationsSuccess.Load()
	snap.CorrelationsFailed = m.correlationsFailed.Load()

	// Failure reasons.
	snap.FailedNoMatchKey = m.failedNoMatchKey.Load()
	snap.FailedTimeWindow = m.failedTimeWindow.Load()
	snap.FailedBufferEviction = m.failedBufferEviction.Load()
	snap.FailedTTLExpired = m.failedTTLExpired.Load()
	snap.FailedIPExcluded = m.failedIPExcluded.Load()

	// Buffer sizes.
	snap.BufferASize = m.bufferASize.Load()
	snap.BufferBSize = m.bufferBSize.Load()

	// Orphans.
	snap.OrphansEmittedA = m.orphansEmittedA.Load()
	snap.OrphansEmittedB = m.orphansEmittedB.Load()
	snap.OrphansPendingA = m.orphansPendingA.Load()
	snap.PendingOrphanMatch = m.pendingOrphanMatch.Load()

	// Keep-Alive.
	snap.KeepAliveResets = m.keepAliveResets.Load()

	return snap
}
// MetricsSnapshot is a plain-value copy of the correlation counters, as
// produced by CorrelationMetrics.Snapshot. Every field carries a JSON tag, so
// the struct can be handed directly to encoding/json.
type MetricsSnapshot struct {
// Events received from source A and source B.
EventsReceivedA int64 `json:"events_received_a"`
EventsReceivedB int64 `json:"events_received_b"`
// Totals of successful and failed correlations.
CorrelationsSuccess int64 `json:"correlations_success"`
CorrelationsFailed int64 `json:"correlations_failed"`
// Failed correlations broken down by recorded reason.
FailedNoMatchKey int64 `json:"failed_no_match_key"`
FailedTimeWindow int64 `json:"failed_time_window"`
FailedBufferEviction int64 `json:"failed_buffer_eviction"`
FailedTTLExpired int64 `json:"failed_ttl_expired"`
FailedIPExcluded int64 `json:"failed_ip_excluded"`
// Buffer sizes as last stored via UpdateBufferSizes.
BufferASize int64 `json:"buffer_a_size"`
BufferBSize int64 `json:"buffer_b_size"`
// Orphan statistics: emitted per source, A events added to pending, and
// B events matched against a pending orphan A.
OrphansEmittedA int64 `json:"orphans_emitted_a"`
OrphansEmittedB int64 `json:"orphans_emitted_b"`
OrphansPendingA int64 `json:"orphans_pending_a"`
PendingOrphanMatch int64 `json:"pending_orphan_match"`
// Number of Keep-Alive TTL resets recorded.
KeepAliveResets int64 `json:"keepalive_resets"`
}
// MarshalJSON implements json.Marshaler on *CorrelationMetrics.
//
// It encodes the value returned by Snapshot, so the JSON keys are the ones
// declared by MetricsSnapshot's field tags. The error from json.Marshal is
// returned unchanged.
func (m *CorrelationMetrics) MarshalJSON() ([]byte, error) {
	snap := m.Snapshot()
	return json.Marshal(snap)
}
// String renders the current metrics as a multi-line, human-readable report.
//
// The report is built from a single Snapshot call: a "Correlation Metrics:"
// heading followed by one line per group (events received with their total,
// correlation results, failure reasons, buffer sizes, orphans, pending orphan
// matches and Keep-Alive resets). Every line ends with a newline.
func (m *CorrelationMetrics) String() string {
	snap := m.Snapshot()
	totalReceived := snap.EventsReceivedA + snap.EventsReceivedB

	sections := []string{
		"Correlation Metrics:\n",
		fmt.Sprintf(" Events Received: A=%d B=%d Total=%d\n", snap.EventsReceivedA, snap.EventsReceivedB, totalReceived),
		fmt.Sprintf(" Correlations: Success=%d Failed=%d\n", snap.CorrelationsSuccess, snap.CorrelationsFailed),
		fmt.Sprintf(" Failure Reasons: no_match_key=%d time_window=%d buffer_eviction=%d ttl_expired=%d ip_excluded=%d\n",
			snap.FailedNoMatchKey, snap.FailedTimeWindow, snap.FailedBufferEviction, snap.FailedTTLExpired, snap.FailedIPExcluded),
		fmt.Sprintf(" Buffer Sizes: A=%d B=%d\n", snap.BufferASize, snap.BufferBSize),
		fmt.Sprintf(" Orphans: Emitted A=%d B=%d Pending A=%d\n", snap.OrphansEmittedA, snap.OrphansEmittedB, snap.OrphansPendingA),
		fmt.Sprintf(" Pending Orphan Match: %d\n", snap.PendingOrphanMatch),
		fmt.Sprintf(" Keep-Alive Resets: %d\n", snap.KeepAliveResets),
	}
	return strings.Join(sections, "")
}