Files
logcorrelator/internal/observability/metrics_server.go
toto e9dcd8ea51 feat: observability, IP filtering, stdout/clickhouse fixes (v1.1.11)
- feat(observability): metrics server with /metrics and /health endpoints
- feat(observability): correlation metrics (events, success/failed, reasons, buffers)
- feat(correlation): IP exclusion filter (exact IPs and CIDR ranges)
- feat(correlation): pending orphan delay for late-arriving B events
- fix(stdout): sink is now a no-op for data; JSON must never appear on stdout
- fix(clickhouse): all flush errors were silently discarded, now properly logged
- fix(clickhouse): buffer overflow with DropOnOverflow now logged at WARN
- fix(clickhouse): retry attempts logged at WARN with attempt/delay/error context
- feat(clickhouse): connection success logged at INFO, batch sends at DEBUG
- feat(clickhouse): SetLogger() for external logger injection
- test(stdout): assert stdout remains empty for correlated and orphan logs
- chore(rpm): bump version to 1.1.11, update changelog
- docs: README and architecture.yml updated

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-05 11:40:54 +01:00

129 lines
2.8 KiB
Go

package observability
import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"sync"
"time"
)
// MetricsServer exposes correlation metrics via HTTP.
type MetricsServer struct {
mu sync.Mutex
server *http.Server
listener net.Listener
metricsFunc func() MetricsSnapshot
running bool
}
// NewMetricsServer creates a new metrics HTTP server.
func NewMetricsServer(addr string, metricsFunc func() MetricsSnapshot) (*MetricsServer, error) {
if metricsFunc == nil {
return nil, fmt.Errorf("metricsFunc cannot be nil")
}
ms := &MetricsServer{
metricsFunc: metricsFunc,
}
mux := http.NewServeMux()
mux.HandleFunc("/metrics", ms.handleMetrics)
mux.HandleFunc("/health", ms.handleHealth)
ms.server = &http.Server{
Addr: addr,
Handler: mux,
ReadTimeout: 5 * time.Second,
WriteTimeout: 10 * time.Second,
}
return ms, nil
}
// Start begins listening on the configured address.
func (ms *MetricsServer) Start() error {
ms.mu.Lock()
defer ms.mu.Unlock()
if ms.running {
return nil
}
listener, err := net.Listen("tcp", ms.server.Addr)
if err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}
ms.listener = listener
ms.running = true
go func() {
if err := ms.server.Serve(listener); err != nil && err != http.ErrServerClosed {
// Server error or closed
}
}()
return nil
}
// Stop gracefully stops the metrics server.
func (ms *MetricsServer) Stop(ctx context.Context) error {
ms.mu.Lock()
defer ms.mu.Unlock()
if !ms.running {
return nil
}
ms.running = false
return ms.server.Shutdown(ctx)
}
// handleMetrics returns the correlation metrics as JSON.
func (ms *MetricsServer) handleMetrics(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
metrics := ms.metricsFunc()
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(metrics); err != nil {
http.Error(w, "Failed to encode metrics", http.StatusInternalServerError)
return
}
}
// handleHealth returns a simple health check response.
func (ms *MetricsServer) handleHealth(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, `{"status":"healthy"}`)
}
// IsRunning returns true if the server is running.
func (ms *MetricsServer) IsRunning() bool {
ms.mu.Lock()
defer ms.mu.Unlock()
return ms.running
}
// Addr returns the listening address.
func (ms *MetricsServer) Addr() string {
ms.mu.Lock()
defer ms.mu.Unlock()
if ms.listener == nil {
return ""
}
return ms.listener.Addr().String()
}