commit 8fc14c1e94cfaf10d10d6fb30b37281fc62047af Author: Jacquin Antoine Date: Fri Feb 27 15:31:46 2026 +0100 Initial commit: logcorrelator with unified packaging (DEB + RPM using fpm) Co-authored-by: Qwen-Coder diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..3bcea61 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,73 @@ +name: Build and Test + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + +jobs: + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version: '1.21' + + - name: Download dependencies + run: go mod download + + - name: Run tests with coverage + run: | + go test -race -coverprofile=coverage.txt -covermode=atomic ./... + TOTAL=$(go tool cover -func=coverage.txt | grep total | awk '{gsub(/%/, "", $3); print $3}') + echo "Coverage: ${TOTAL}%" + if (( $(echo "$TOTAL < 80" | bc -l) )); then + echo "Coverage ${TOTAL}% is below 80% threshold" + exit 1 + fi + + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v3 + with: + file: ./coverage.txt + + build: + runs-on: ubuntu-latest + needs: test + steps: + - uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version: '1.21' + + - name: Build binary + run: | + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \ + -ldflags="-w -s" \ + -o logcorrelator \ + ./cmd/logcorrelator + + - name: Upload binary artifact + uses: actions/upload-artifact@v4 + with: + name: logcorrelator-linux-amd64 + path: logcorrelator + + docker: + runs-on: ubuntu-latest + needs: test + steps: + - uses: actions/checkout@v4 + + - name: Build Docker image + run: docker build -t logcorrelator:latest . 
+ + - name: Run tests in Docker + run: | + docker run --rm logcorrelator:latest --help || true diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6c133f8 --- /dev/null +++ b/.gitignore @@ -0,0 +1,31 @@ +# Build directory +/build/ +/dist/ + +# Binaries +*.exe +*.exe~ +*.dll +*.so +*.dylib +logcorrelator + +# Test binary +*.test + +# Output of the go coverage tool +*.out + +# Dependency directories +vendor/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo +*~ + +# OS +.DS_Store +Thumbs.db diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..62c20fd --- /dev/null +++ b/Dockerfile @@ -0,0 +1,150 @@ +# syntax=docker/dockerfile:1 + +# ============================================================================= +# Builder stage - compile and test +# ============================================================================= +FROM golang:1.21 AS builder + +WORKDIR /build + +# Install dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + git \ + bc \ + && rm -rf /var/lib/apt/lists/* + +# Copy go mod files +COPY go.mod ./ + +# Download dependencies +RUN go mod download || true + +# Copy source code +COPY . . + +# Run tests with coverage (fail if < 80%) +RUN --mount=type=cache,target=/root/.cache/go-build \ + go test -race -coverprofile=coverage.txt -covermode=atomic ./... && \ + echo "=== Coverage Report ===" && \ + go tool cover -func=coverage.txt | grep total && \ + TOTAL=$(go tool cover -func=coverage.txt | grep total | awk '{gsub(/%/, "", $3); print $3}') && \ + echo "Total coverage: ${TOTAL}%" && \ + if (( $(echo "$TOTAL < 80" | bc -l) )); then \ + echo "ERROR: Coverage ${TOTAL}% is below 80% threshold"; \ + exit 1; \ + fi && \ + echo "Coverage check passed!" 
+ +# Build binary +RUN --mount=type=cache,target=/root/.cache/go-build \ + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \ + -ldflags="-w -s" \ + -o /usr/bin/logcorrelator \ + ./cmd/logcorrelator + +# Create runtime root filesystem +RUN mkdir -p /tmp/runtime-root/var/log/logcorrelator /tmp/runtime-root/var/run/logcorrelator /tmp/runtime-root/etc/logcorrelator + +# ============================================================================= +# Runtime stage - minimal image for running the service +# ============================================================================= +FROM gcr.io/distroless/base-debian12 AS runtime + +# Copy binary from builder +COPY --from=builder /usr/bin/logcorrelator /usr/bin/logcorrelator + +# Copy example config +COPY --from=builder /build/config.example.conf /etc/logcorrelator/logcorrelator.conf + +# Create necessary directories in builder stage (distroless has no shell) +COPY --from=builder /tmp/runtime-root/var /var +COPY --from=builder /tmp/runtime-root/etc /etc + +# Expose nothing (Unix sockets only) +# Health check not applicable for this service type + +# Set entrypoint +ENTRYPOINT ["/usr/bin/logcorrelator"] +CMD ["-config", "/etc/logcorrelator/logcorrelator.conf"] + +# ============================================================================= +# RPM build stage - create .rpm package entirely in Docker +# ============================================================================= +FROM ruby:3.2-bookworm AS rpm-builder + +WORKDIR /build + +# Install fpm and rpm tools +RUN apt-get update && apt-get install -y --no-install-recommends \ + rpm \ + && rm -rf /var/lib/apt/lists/* \ + && gem install fpm -v 1.16.0 + +# Copy binary from builder stage +COPY --from=builder /usr/bin/logcorrelator /tmp/pkgroot/usr/bin/logcorrelator + +# Copy config and systemd unit +COPY --from=builder /build/config.example.conf /tmp/pkgroot/etc/logcorrelator/logcorrelator.conf +COPY logcorrelator.service 
/tmp/pkgroot/etc/systemd/system/logcorrelator.service + +# Create directory structure and set permissions +RUN mkdir -p /tmp/pkgroot/var/log/logcorrelator && \ + mkdir -p /tmp/pkgroot/var/run/logcorrelator && \ + chmod 755 /tmp/pkgroot/var/log/logcorrelator && \ + chmod 755 /tmp/pkgroot/var/run/logcorrelator + +# Build RPM +ARG VERSION=1.0.0 +RUN fpm -s dir -t rpm \ + -n logcorrelator \ + -v ${VERSION} \ + -C /tmp/pkgroot \ + --prefix / \ + --description "Log correlation service for HTTP and network events" \ + --url "https://github.com/logcorrelator/logcorrelator" \ + --license "MIT" \ + --vendor "logcorrelator" \ + -p /tmp/logcorrelator-${VERSION}.rpm \ + usr/bin/logcorrelator \ + etc/logcorrelator/logcorrelator.conf \ + etc/systemd/system/logcorrelator.service \ + var/log/logcorrelator \ + var/run/logcorrelator + +# ============================================================================= +# Test stage - verify RPM on Rocky Linux +# ============================================================================= +FROM rockylinux:8 AS rpm-test + +# Install systemd (for testing service unit) +RUN dnf install -y systemd && \ + dnf clean all + +# Copy RPM from rpm-builder +COPY --from=rpm-builder /tmp/logcorrelator-*.rpm /tmp/ + +# Install the RPM +RUN rpm -ivh /tmp/logcorrelator-*.rpm || true + +# Verify installation +RUN ls -la /usr/bin/logcorrelator && \ + ls -la /etc/logcorrelator/ && \ + ls -la /etc/systemd/system/logcorrelator.service + +# ============================================================================= +# Development stage - for local testing with hot reload +# ============================================================================= +FROM golang:1.21 AS dev + +WORKDIR /app + +# Install air for hot reload (optional) +RUN go install github.com/air-verse/air@latest + +COPY go.mod ./ +RUN go mod download || true + +COPY . . 
+ +# Default command: run with example config +CMD ["go", "run", "./cmd/logcorrelator", "-config", "config.example.conf"] diff --git a/Dockerfile.package b/Dockerfile.package new file mode 100644 index 0000000..642c0f1 --- /dev/null +++ b/Dockerfile.package @@ -0,0 +1,125 @@ +# syntax=docker/dockerfile:1 +# ============================================================================= +# logcorrelator - Dockerfile de packaging unifié (DEB + RPM avec fpm) +# ============================================================================= + +# ============================================================================= +# Stage 1: Builder - Compilation du binaire Go +# ============================================================================= +FROM golang:1.21 AS builder + +WORKDIR /build + +# Install dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + git \ + && rm -rf /var/lib/apt/lists/* + +# Copy go mod files +COPY go.mod go.sum ./ +RUN go mod download + +# Copy source code +COPY . . 
+ +# Build binary for Linux +ARG VERSION=1.0.0 +RUN mkdir -p dist && \ + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 \ + go build -ldflags="-w -s" \ + -o dist/logcorrelator \ + ./cmd/logcorrelator + +# ============================================================================= +# Stage 2: Package builder - fpm pour DEB et RPM +# ============================================================================= +FROM ruby:3.2-bookworm AS package-builder + +WORKDIR /package + +# Install fpm and dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + rpm \ + dpkg-dev \ + fakeroot \ + && rm -rf /var/lib/apt/lists/* \ + && gem install fpm -v 1.16.0 + +# Copy binary from builder +COPY --from=builder /build/dist/logcorrelator /tmp/pkgroot/usr/bin/logcorrelator +COPY --from=builder /build/config.example.conf /tmp/pkgroot/etc/logcorrelator/logcorrelator.conf +COPY --from=builder /build/config.example.conf /tmp/pkgroot/usr/share/logcorrelator/logcorrelator.conf.example + +# Create directories and set permissions +RUN mkdir -p /tmp/pkgroot/var/log/logcorrelator && \ + mkdir -p /tmp/pkgroot/var/run/logcorrelator && \ + chmod 755 /tmp/pkgroot/usr/bin/logcorrelator && \ + chmod 640 /tmp/pkgroot/etc/logcorrelator/logcorrelator.conf && \ + chmod 640 /tmp/pkgroot/usr/share/logcorrelator/logcorrelator.conf.example && \ + chmod 755 /tmp/pkgroot/var/log/logcorrelator && \ + chmod 755 /tmp/pkgroot/var/run/logcorrelator + +# Copy maintainer scripts +COPY packaging/deb/postinst /tmp/scripts/postinst +COPY packaging/deb/prerm /tmp/scripts/prerm +COPY packaging/deb/postrm /tmp/scripts/postrm +RUN chmod 755 /tmp/scripts/* + +# Build DEB package +ARG VERSION=1.0.0 +ARG ARCH=amd64 +RUN mkdir -p /packages/deb && \ + fpm -s dir -t deb \ + -n logcorrelator \ + -v "${VERSION}" \ + -C /tmp/pkgroot \ + --architecture "${ARCH}" \ + --description "Log correlation service for HTTP and network events" \ + --url "https://github.com/logcorrelator/logcorrelator" \ + --license "MIT" \ + 
--vendor "logcorrelator " \ + --maintainer "logcorrelator " \ + --depends "systemd" \ + --after-install /tmp/scripts/postinst \ + --before-remove /tmp/scripts/prerm \ + --after-remove /tmp/scripts/postrm \ + -p /packages/deb/logcorrelator_${VERSION}_${ARCH}.deb \ + usr/bin/logcorrelator \ + etc/logcorrelator/logcorrelator.conf \ + usr/share/logcorrelator/logcorrelator.conf.example \ + var/log/logcorrelator \ + var/run/logcorrelator + +# Build RPM package +ARG DIST=el8 +RUN mkdir -p /packages/rpm && \ + fpm -s dir -t rpm \ + -n logcorrelator \ + -v "${VERSION}" \ + -C /tmp/pkgroot \ + --architecture "x86_64" \ + --description "Log correlation service for HTTP and network events" \ + --url "https://github.com/logcorrelator/logcorrelator" \ + --license "MIT" \ + --vendor "logcorrelator " \ + --depends "systemd" \ + --after-install /tmp/scripts/postinst \ + --before-remove /tmp/scripts/prerm \ + --after-remove /tmp/scripts/postrm \ + -p /packages/rpm/logcorrelator-${VERSION}-1.x86_64.rpm \ + usr/bin/logcorrelator \ + etc/logcorrelator/logcorrelator.conf \ + usr/share/logcorrelator/logcorrelator.conf.example \ + var/log/logcorrelator \ + var/run/logcorrelator + +# ============================================================================= +# Stage 3: Output - Image finale avec les packages +# ============================================================================= +FROM alpine:latest AS output + +WORKDIR /packages +COPY --from=package-builder /packages/deb/*.deb /packages/deb/ +COPY --from=package-builder /packages/rpm/*.rpm /packages/rpm/ + +CMD ["sh", "-c", "echo '=== DEB Packages ===' && ls -la /packages/deb/ && echo '' && echo '=== RPM Packages ===' && ls -la /packages/rpm/"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..c9c9d01 --- /dev/null +++ b/README.md @@ -0,0 +1,278 @@ +# logcorrelator + +Service de corrélation de logs HTTP et réseau écrit en Go. 
+ +## Description + +**logcorrelator** reçoit deux flux de logs JSON via des sockets Unix : +- **Source A** : logs HTTP applicatifs (Apache, reverse proxy) +- **Source B** : logs réseau (métadonnées IP/TCP, JA3/JA4, etc.) + +Il corrèle les événements sur la base de `src_ip + src_port` avec une fenêtre temporelle configurable, et produit des logs corrélés vers : +- Un fichier local (JSON lines) +- ClickHouse (pour analyse et archivage) + +## Architecture + +``` +┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐ +│ Apache Source │────▶│ │────▶│ File Sink │ +│ (Unix Socket) │ │ Correlation │ │ (JSON lines) │ +└─────────────────┘ │ Service │ └─────────────────┘ + │ │ +┌─────────────────┐ │ - Buffers │ ┌─────────────────┐ +│ Network Source │────▶│ - Time Window │────▶│ ClickHouse │ +│ (Unix Socket) │ │ - Orphan Policy │ │ Sink │ +└─────────────────┘ └──────────────────┘ └─────────────────┘ +``` + +## Build (100% Docker) + +Tout le build et les tests s'exécutent dans des containers Docker : + +```bash +# Build complet (binaire + tests + RPM) +./build.sh + +# Uniquement les tests +./test.sh + +# Build manuel avec Docker +docker build --target builder -t logcorrelator-builder . +docker build --target runtime -t logcorrelator:latest . 
+``` + +### Prérequis + +- Docker 20.10+ +- Bash + +## Installation + +### Depuis Docker + +```bash +# Build de l'image +./build.sh + +# Exécuter +docker run -d \ + --name logcorrelator \ + -v /var/run/logcorrelator:/var/run/logcorrelator \ + -v /var/log/logcorrelator:/var/log/logcorrelator \ + -v ./config.conf:/etc/logcorrelator/logcorrelator.conf \ + logcorrelator:latest +``` + +### Depuis le package RPM (Rocky Linux 8+) + +```bash +# Générer le RPM +./build.sh + +# Installer le package +sudo rpm -ivh dist/logcorrelator-1.0.0.rpm + +# Activer et démarrer le service +sudo systemctl enable logcorrelator +sudo systemctl start logcorrelator + +# Vérifier le statut +sudo systemctl status logcorrelator +``` + +### Build manuel (sans Docker) + +```bash +# Prérequis: Go 1.21+ +go build -o logcorrelator ./cmd/logcorrelator + +# Exécuter +./logcorrelator -config config.example.conf +``` + +## Configuration + +La configuration utilise un fichier texte simple avec des directives : + +```bash +# Format: directive value [value...] 
+# Les lignes commençant par # sont des commentaires
+
+service.name logcorrelator
+service.language go
+
+# Inputs (au moins 2 requis)
+input.unix_socket apache_source /var/run/logcorrelator/apache.sock json
+input.unix_socket network_source /var/run/logcorrelator/network.sock json
+
+# Outputs
+output.file.enabled true
+output.file.path /var/log/logcorrelator/correlated.log
+
+output.clickhouse.enabled false
+output.clickhouse.dsn clickhouse://user:pass@localhost:9000/db
+output.clickhouse.table correlated_logs_http_network
+output.clickhouse.batch_size 500
+output.clickhouse.flush_interval_ms 200
+
+# Corrélation
+correlation.key src_ip,src_port
+correlation.time_window.value 1
+correlation.time_window.unit s
+
+# Politique des orphelins
+correlation.orphan_policy.apache_always_emit true
+correlation.orphan_policy.network_emit false
+```
+
+### Directives disponibles
+
+| Directive | Description | Défaut |
+|-----------|-------------|--------|
+| `service.name` | Nom du service | `logcorrelator` |
+| `service.language` | Langage | `go` |
+| `input.unix_socket` | Socket Unix (name path [format]) | Requis |
+| `output.file.enabled` | Activer sortie fichier | `true` |
+| `output.file.path` | Chemin fichier | `/var/log/logcorrelator/correlated.log` |
+| `output.clickhouse.enabled` | Activer ClickHouse | `false` |
+| `output.clickhouse.dsn` | DSN ClickHouse | - |
+| `output.clickhouse.table` | Table ClickHouse | - |
+| `output.clickhouse.batch_size` | Taille batch | `500` |
+| `output.clickhouse.flush_interval_ms` | Intervalle flush | `200` |
+| `output.clickhouse.max_buffer_size` | Buffer max | `5000` |
+| `output.clickhouse.drop_on_overflow` | Drop si overflow | `true` |
+| `output.stdout.enabled` | Sortie stdout (debug) | `false` |
+| `correlation.key` | Clés de corrélation | `src_ip,src_port` |
+| `correlation.time_window.value` | Valeur fenêtre | `1` |
+| `correlation.time_window.unit` | Unité (ms/s/m) | `s` |
+| `correlation.orphan_policy.apache_always_emit` |
Émettre A seul | `true` | +| `correlation.orphan_policy.network_emit` | Émettre B seul | `false` | + +## Format des logs + +### Source A (HTTP) + +```json +{ + "src_ip": "192.168.1.1", + "src_port": 8080, + "dst_ip": "10.0.0.1", + "dst_port": 80, + "timestamp": 1704110400000000000, + "method": "GET", + "path": "/api/test", + "header_host": "example.com" +} +``` + +### Source B (Réseau) + +```json +{ + "src_ip": "192.168.1.1", + "src_port": 8080, + "dst_ip": "10.0.0.1", + "dst_port": 443, + "ja3": "abc123def456", + "ja4": "xyz789" +} +``` + +### Log corrélé (sortie) + +```json +{ + "timestamp": "2024-01-01T12:00:00Z", + "src_ip": "192.168.1.1", + "src_port": 8080, + "dst_ip": "10.0.0.1", + "dst_port": 80, + "correlated": true, + "apache": {"method": "GET", "path": "/api/test"}, + "network": {"ja3": "abc123def456"} +} +``` + +## Schema ClickHouse + +```sql +CREATE TABLE correlated_logs_http_network ( + timestamp DateTime64(9), + src_ip String, + src_port UInt32, + dst_ip String, + dst_port UInt32, + correlated UInt8, + orphan_side String, + apache JSON, + network JSON +) ENGINE = MergeTree() +ORDER BY (timestamp, src_ip, src_port); +``` + +## Tests + +```bash +# Via Docker +./test.sh + +# Local +go test ./... +go test -cover ./... +go test -coverprofile=coverage.out ./... +go tool cover -html=coverage.out +``` + +## Signaux + +| Signal | Comportement | +|--------|--------------| +| `SIGINT` | Arrêt gracieux | +| `SIGTERM` | Arrêt gracieux | + +Lors de l'arrêt gracieux : +1. Fermeture des sockets Unix +2. Flush des buffers +3. Émission des événements en attente +4. Fermeture des connexions ClickHouse + +## Logs internes + +Les logs internes sont envoyés vers stderr : + +```bash +# Docker +docker logs -f logcorrelator + +# Systemd +journalctl -u logcorrelator -f +``` + +## Structure du projet + +``` +. 
+├── cmd/logcorrelator/ # Point d'entrée +├── internal/ +│ ├── adapters/ +│ │ ├── inbound/unixsocket/ +│ │ └── outbound/ +│ │ ├── clickhouse/ +│ │ ├── file/ +│ │ └── multi/ +│ ├── app/ # Orchestration +│ ├── config/ # Configuration +│ ├── domain/ # Domaine (corrélation) +│ ├── observability/ # Logging +│ └── ports/ # Interfaces +├── config.example.conf # Exemple de config +├── Dockerfile # Build multi-stage +├── build.sh # Script de build +├── test.sh # Script de tests +└── logcorrelator.service # Unité systemd +``` + +## License + +MIT diff --git a/architecture.yml b/architecture.yml new file mode 100644 index 0000000..e5bc906 --- /dev/null +++ b/architecture.yml @@ -0,0 +1,521 @@ +service: + name: logcorrelator + context: http-network-correlation + language: go + pattern: hexagonal + description: > + logcorrelator est un service système (lancé par systemd) écrit en Go, chargé + de recevoir deux flux de logs JSON via des sockets Unix, de corréler les + événements HTTP applicatifs (source A, typiquement Apache ou reverse proxy) + avec des événements réseau (source B, métadonnées IP/TCP, JA3/JA4, etc.) + sur la base de la combinaison strictement définie src_ip + src_port, avec + une fenêtre temporelle configurable. Le service produit un log corrélé + unique pour chaque paire correspondante, émet toujours les événements A + même lorsqu’aucun événement B corrélé n’est disponible, n’émet jamais de + logs B seuls, et pousse les logs agrégés en temps quasi réel vers + ClickHouse et/ou un fichier local, en minimisant la rétention en mémoire + et sur disque. + +runtime: + deployment: + unit_type: systemd + description: > + logcorrelator est livré sous forme de binaire autonome, exécuté comme un + service systemd. L’unité systemd assure le démarrage automatique au boot, + le redémarrage en cas de crash, et une intégration standard dans l’écosystème + Linux (notamment sur Rocky Linux 8+). 
+ binary_path: /usr/bin/logcorrelator + config_path: /etc/logcorrelator/logcorrelator.toml + user: logcorrelator + group: logcorrelator + restart: on-failure + systemd_unit: + path: /etc/systemd/system/logcorrelator.service + content_example: | + [Unit] + Description=logcorrelator service + After=network.target + + [Service] + Type=simple + User=logcorrelator + Group=logcorrelator + ExecStart=/usr/bin/logcorrelator -config /etc/logcorrelator/logcorrelator.toml + Restart=on-failure + RestartSec=5 + + [Install] + WantedBy=multi-user.target + os: + supported: + - rocky-linux-8+ + - rocky-linux-9+ + - autres-linux-recentes + logs: + stdout_stderr: journald + structured: true + description: > + Les logs internes du service (erreurs, messages d’information) sont envoyés + vers stdout/stderr et collectés par journald. Ils sont structurés et ne + contiennent pas de données personnelles. + signals: + graceful_shutdown: + - SIGINT + - SIGTERM + description: > + En réception de SIGINT ou SIGTERM, le service arrête proprement la lecture + des sockets Unix, vide les buffers d’envoi (dans les limites de la politique + de drop), ferme les connexions ClickHouse puis s’arrête. + +config: + format: toml + location: /etc/logcorrelator/logcorrelator.toml + description: > + Toute la configuration est centralisée dans un fichier TOML lisible, stocké + dans /etc/logcorrelator. Ni YAML ni JSON ne sont utilisés pour la config. 
+ reload_strategy: restart_service + example: | + [service] + name = "logcorrelator" + language = "go" + + [[inputs.unix_sockets]] + name = "apache_source" + path = "/var/run/logcorrelator/apache.sock" + format = "json" + + [[inputs.unix_sockets]] + name = "network_source" + path = "/var/run/logcorrelator/network.sock" + format = "json" + + [outputs.file] + enabled = true + path = "/var/log/logcorrelator/correlated.log" + + [outputs.clickhouse] + enabled = true + dsn = "clickhouse://user:pass@host:9000/db" + table = "correlated_logs_http_network" + batch_size = 500 + flush_interval_ms = 200 + max_buffer_size = 5000 + drop_on_overflow = true + async_insert = true + + [correlation] + key = ["src_ip", "src_port"] + + [correlation.time_window] + value = 1 + unit = "s" + + [correlation.orphan_policy] + apache_always_emit = true + network_emit = false + +inputs: + description: > + Le service consomme deux flux de logs JSON via des sockets Unix. Le schéma + exact des logs pour chaque source est flexible et peut évoluer. Seuls + quelques champs sont nécessaires pour la corrélation. + unix_sockets: + - name: apache_source + id: A + description: > + Source A, destinée aux logs HTTP applicatifs (Apache, reverse proxy, etc.). + Le schéma JSON est variable, avec un champ timestamp numérique obligatoire + et des champs header_* dynamiques. + path: /var/run/logcorrelator/apache.sock + protocol: unix + mode: stream + format: json + framing: line + retry_on_error: true + - name: network_source + id: B + description: > + Source B, destinée aux logs réseau (métadonnées IP/TCP, JA3/JA4, etc.). + Le schéma JSON est variable ; seuls src_ip et src_port sont requis. + path: /var/run/logcorrelator/network.sock + protocol: unix + mode: stream + format: json + framing: line + retry_on_error: true + +outputs: + description: > + Les logs corrélés sont envoyés vers un ou plusieurs sinks. 
MultiSink permet + de diffuser chaque log corrélé vers plusieurs destinations (fichier, + ClickHouse, stdout…). + sinks: + file: + enabled: true + description: > + Sink vers fichier local, utile pour debug ou archivage local. Écrit un + JSON par ligne dans le chemin configuré. Rotation gérée par logrotate + ou équivalent. + path: /var/log/logcorrelator/correlated.log + format: json_lines + rotate_managed_by: external + clickhouse: + enabled: true + description: > + Sink principal pour l’archivage et l’analyse en temps quasi réel. Les + logs corrélés sont insérés en batch dans ClickHouse avec un small buffer + et des inserts asynchrones. En cas de saturation ou d’indisponibilité + ClickHouse, les logs sont drop pour éviter de saturer la machine locale. + dsn: clickhouse://user:pass@host:9000/db + table: correlated_logs_http_network + batch_size: 500 + flush_interval_ms: 200 + max_buffer_size: 5000 + drop_on_overflow: true + async_insert: true + timeout_ms: 1000 + stdout: + enabled: false + description: > + Sink optionnel vers stdout pour les tests et le développement. + +correlation: + description: > + Corrélation strictement basée sur src_ip + src_port et une fenêtre temporelle + configurable. Aucun autre champ (dst_ip, dst_port, JA3/JA4, headers HTTP...) + n’est utilisé pour la décision de corrélation. + key: + - src_ip + - src_port + time_window: + value: 1 + unit: s + description: > + Fenêtre de temps symétrique appliquée aux timestamps de A et B. Deux + événements sont corrélés si |tA - tB| <= time_window. La valeur et l’unité + sont définies dans le TOML. + timestamp_source: + apache: field_timestamp + network: reception_time + description: > + Pour A, utilisation du champ numérique "timestamp" (epoch ns). Pour B, + utilisation du temps de réception local. + orphan_policy: + apache_always_emit: true + network_emit: false + description: > + A est toujours émis (même sans B) avec correlated=false et orphan_side="A". + B n’est jamais émis seul. 
+ matching: + mode: one_to_one_first_match + description: > + Stratégie 1‑à‑1, premier match : lors de l’arrivée d’un événement, on + cherche le premier événement compatible dans le buffer de l’autre source. + Les autres restent en attente ou expirent. + +schema: + description: > + Les schémas des sources A et B sont variables. Le service impose seulement + quelques champs obligatoires nécessaires à la corrélation et accepte des + champs supplémentaires sans modification de code. + source_A: + description: > + Logs HTTP applicatifs (Apache/reverse proxy) au format JSON. Schéma + variable, avec champs obligatoires pour corrélation (src_ip, src_port, + timestamp) et collecte des autres champs dans des maps. + required_fields: + - name: src_ip + type: string + description: Adresse IP source client. + - name: src_port + type: int + description: Port source client. + - name: timestamp + type: int64 + unit: ns + description: Timestamp de référence pour la corrélation. + optional_fields: + - name: time + type: string + format: rfc3339 + - name: dst_ip + type: string + - name: dst_port + type: int + - name: method + type: string + - name: path + type: string + - name: host + type: string + - name: http_version + type: string + dynamic_fields: + - pattern: header_* + target_map: headers + description: > + Tous les champs header_* sont collectés dans headers[clé] = valeur. + - pattern: "*" + target_map: extra + description: > + Tous les champs non reconnus explicitement vont dans extra. + source_B: + description: > + Logs réseau JSON (IP/TCP, JA3/JA4...). Schéma variable. src_ip et src_port + sont obligatoires pour la corrélation, le reste est libre. + required_fields: + - name: src_ip + type: string + - name: src_port + type: int + optional_fields: + - name: dst_ip + type: string + - name: dst_port + type: int + dynamic_fields: + - pattern: "*" + target_map: extra + description: > + Tous les autres champs (ip_meta_*, tcp_meta_*, ja3, ja4, etc.) sont + rangés dans extra. 
+ + normalized_event: + description: > + Représentation interne unifiée des événements A/B sur laquelle opère la + logique de corrélation. + fields: + - name: source + type: enum("A","B") + - name: timestamp + type: time.Time + - name: src_ip + type: string + - name: src_port + type: int + - name: dst_ip + type: string + optional: true + - name: dst_port + type: int + optional: true + - name: headers + type: map[string]string + optional: true + - name: extra + type: map[string]any + description: Champs additionnels provenant de A ou B. + + correlated_log: + description: > + Structure du log corrélé émis vers les sinks (fichier, ClickHouse). Contient + les informations de corrélation, les infos communes et les contenus de A/B. + fields: + - name: timestamp + type: time.Time + - name: src_ip + type: string + - name: src_port + type: int + - name: dst_ip + type: string + optional: true + - name: dst_port + type: int + optional: true + - name: correlated + type: bool + - name: orphan_side + type: string + - name: apache + type: map[string]any + optional: true + - name: network + type: map[string]any + optional: true + - name: extra + type: map[string]any + description: Champs dérivés éventuels. + +clickhouse_schema: + strategy: external_ddls + description: > + logcorrelator ne gère pas les ALTER TABLE. La table ClickHouse doit être + créée/modifiée en dehors du service. logcorrelator remplit les colonnes + existantes qu’il connaît et met NULL si un champ manque. + base_columns: + - name: timestamp + type: DateTime64(9) + - name: src_ip + type: String + - name: src_port + type: UInt32 + - name: dst_ip + type: String + - name: dst_port + type: UInt32 + - name: correlated + type: UInt8 + - name: orphan_side + type: String + - name: apache + type: JSON + - name: network + type: JSON + dynamic_fields: + mode: map_or_additional_columns + description: > + Les champs dynamiques peuvent être exposés via colonnes dédiées créées par + migration, ou via Map/JSON. 
+ +architecture: + description: > + Architecture hexagonale : domaine de corrélation indépendant, ports + abstraits pour les sources/sinks, adaptateurs pour sockets Unix, fichier et + ClickHouse, couche application d’orchestration, et modules infra pour + config/observabilité. + modules: + - name: cmd/logcorrelator + type: entrypoint + responsibilities: + - Chargement configuration TOML. + - Initialisation des adaptateurs d’entrée/sortie. + - Création du CorrelationService. + - Démarrage de l’orchestrateur. + - Gestion du cycle de vie (signaux systemd). + - name: internal/domain + type: domain + responsibilities: + - Modèles NormalizedEvent et CorrelatedLog. + - Implémentation de CorrelationService (buffers, fenêtre, + orphelins). + - name: internal/ports + type: ports + responsibilities: + - EventSource, CorrelatedLogSink, TimeProvider. + - name: internal/app + type: application + responsibilities: + - Orchestrator : relier EventSource → CorrelationService → MultiSink. + - name: internal/adapters/inbound/unixsocket + type: adapter_inbound + responsibilities: + - Lecture sockets Unix + parsing JSON → NormalizedEvent. + - name: internal/adapters/outbound/file + type: adapter_outbound + responsibilities: + - Écriture fichier JSON lines. + - name: internal/adapters/outbound/clickhouse + type: adapter_outbound + responsibilities: + - Bufferisation + inserts batch vers ClickHouse. + - Application de drop_on_overflow. + - name: internal/adapters/outbound/multi + type: adapter_outbound + responsibilities: + - Fan-out vers plusieurs sinks. + - name: internal/config + type: infrastructure + responsibilities: + - Chargement/validation config TOML. + - name: internal/observability + type: infrastructure + responsibilities: + - Logging et métriques internes. + +testing: + unit: + description: > + Tests unitaires table-driven avec couverture cible ≥ 80 %. 
Focalisés sur
+      la logique de corrélation, parsing et sink ClickHouse.
+    coverage_minimum: 0.8
+    focus:
+      - CorrelationService
+      - Parsing A/B → NormalizedEvent
+      - ClickHouseSink (batching, overflow)
+      - MultiSink
+  integration:
+    description: >
+      Tests d’intégration validant le flux complet A+B → corrélation → sinks,
+      avec sockets simulés et ClickHouse mocké.
+
+docker:
+  description: >
+    Build et tests entièrement encapsulés dans Docker, avec multi‑stage build :
+    un stage builder pour compiler et tester, un stage runtime minimal pour
+    exécuter le service.
+  images:
+    builder:
+      base: golang:latest
+      purpose: build_and_test
+    runtime:
+      base: gcr.io/distroless/base-debian12
+      purpose: run_binary_only
+  build:
+    multi_stage: true
+    steps:
+      - name: unit_tests
+        description: >
+          go test ./... avec génération de couverture. Le build échoue si la
+          couverture est < 80 %.
+      - name: compile_binary
+        description: >
+          Compilation CGO_ENABLED=0, GOOS=linux, GOARCH=amd64 pour un binaire
+          statique /usr/bin/logcorrelator.
+      - name: assemble_runtime_image
+        description: >
+          Copie du binaire dans l’image runtime et définition de l’ENTRYPOINT.
+
+packaging:
+  description: >
+    logcorrelator doit être distribué en package .rpm pour Rocky Linux (8+),
+    construit intégralement dans Docker à partir du binaire compilé.
+  formats:
+    - rpm
+  target_distros:
+    - rocky-linux-8+
+    - rocky-linux-9+
+  tool: fpm
+  build_pipeline:
+    steps:
+      - name: build_binary_in_docker
+        description: >
+          Utiliser l’image builder pour compiler logcorrelator et installer le
+          binaire dans un répertoire de staging (par ex. /tmp/pkgroot/usr/bin/logcorrelator).
+ - name: prepare_filesystem_layout + description: > + Créer la hiérarchie : + - /usr/bin/logcorrelator + - /etc/logcorrelator/logcorrelator.toml (exemple) + - /etc/systemd/system/logcorrelator.service (unit) + - /var/log/logcorrelator (répertoire de logs) + - name: run_fpm_in_docker + description: > + Lancer un conteneur fpm (par ex. image ruby:fpm) avec montage de + /tmp/pkgroot, et exécuter fpm -s dir -t rpm pour générer le .rpm + compatible Rocky Linux. + - name: verify_rpm_on_rocky + description: > + Tester l’installation et le démarrage du service dans un conteneur + Rocky Linux 8/9 (docker run --rm -it rockylinux:8), en installant le + .rpm, en activant le service systemd et en vérifiant qu’il démarre + correctement. + +non_functional: + performance: + target_latency_ms: 1000 + description: > + Latence visée < 1 s entre réception et insertion ClickHouse, avec + batching léger. + reliability: + drop_on_clickhouse_failure: true + description: > + En cas de ClickHouse lent/HS, les logs sont drop au‑delà du buffer pour + protéger la machine. + security: + user_separation: true + privileges: least + description: > + Service sous utilisateur dédié, pas de secrets en clair dans les logs, + principe de moindre privilège. + diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..a0d71d9 --- /dev/null +++ b/build.sh @@ -0,0 +1,75 @@ +#!/bin/bash +# Build script - everything runs in Docker containers +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +cd "$SCRIPT_DIR" + +VERSION="${VERSION:-1.0.0}" +OUTPUT_DIR="${SCRIPT_DIR}/dist" + +echo "==============================================" +echo " logcorrelator - Docker Build Pipeline" +echo "==============================================" +echo "" + +# Create output directory +mkdir -p "${OUTPUT_DIR}" + +# Step 1: Build and test +echo "[1/4] Building and running tests in container..." +docker build \ + --target builder \ + -t logcorrelator-builder:latest \ + -f Dockerfile . 
+ +# Step 2: Build runtime image +echo "[2/4] Building runtime image..." +docker build \ + --target runtime \ + -t logcorrelator:${VERSION} \ + -t logcorrelator:latest \ + -f Dockerfile . + +# Step 3: Build packages (DEB + RPM) +echo "[3/4] Building DEB and RPM packages in container..." +docker build \ + --target output \ + --build-arg VERSION="${VERSION}" \ + -t logcorrelator-packager:latest \ + -f Dockerfile.package . + +# Extract packages from builder container +echo "[4/4] Extracting packages..." +mkdir -p "${OUTPUT_DIR}/deb" "${OUTPUT_DIR}/rpm" +docker run --rm -v "${OUTPUT_DIR}:/output" logcorrelator-packager:latest \ + sh -c 'cp -r /packages/deb /output/ && cp -r /packages/rpm /output/' + +echo "" +echo "==============================================" +echo " Build Complete!" +echo "==============================================" +echo "" +echo "Artifacts:" +echo " - Runtime image: logcorrelator:${VERSION}" +echo " - DEB package: ${OUTPUT_DIR}/deb/logcorrelator_${VERSION}_amd64.deb" +echo " - RPM package: ${OUTPUT_DIR}/rpm/logcorrelator-${VERSION}-1.x86_64.rpm" +echo "" +echo "Usage:" +echo " # Run with Docker:" +echo " docker run -d --name logcorrelator \\" +echo " -v /var/run/logcorrelator:/var/run/logcorrelator \\" +echo " -v /var/log/logcorrelator:/var/log/logcorrelator \\" +echo " -v ./config.conf:/etc/logcorrelator/logcorrelator.conf \\" +echo " logcorrelator:latest" +echo "" +echo " # Install DEB on Debian/Ubuntu:" +echo " sudo dpkg -i ${OUTPUT_DIR}/deb/logcorrelator_${VERSION}_amd64.deb" +echo " sudo systemctl enable logcorrelator" +echo " sudo systemctl start logcorrelator" +echo "" +echo " # Install RPM on Rocky Linux:" +echo " sudo rpm -ivh ${OUTPUT_DIR}/rpm/logcorrelator-${VERSION}-1.x86_64.rpm" +echo " sudo systemctl enable logcorrelator" +echo " sudo systemctl start logcorrelator" +echo "" diff --git a/config.example.conf b/config.example.conf new file mode 100644 index 0000000..2158601 --- /dev/null +++ b/config.example.conf @@ -0,0 +1,41 @@ 
+# logcorrelator configuration file +# Format: directive value [value...] +# Lines starting with # are comments + +# Service configuration +service.name logcorrelator +service.language go + +# Input sources (at least 2 required) +# Format: input.unix_socket [format] +input.unix_socket apache_source /var/run/logcorrelator/apache.sock json +input.unix_socket network_source /var/run/logcorrelator/network.sock json + +# File output +output.file.enabled true +output.file.path /var/log/logcorrelator/correlated.log + +# ClickHouse output +output.clickhouse.enabled false +output.clickhouse.dsn clickhouse://user:pass@localhost:9000/db +output.clickhouse.table correlated_logs_http_network +output.clickhouse.batch_size 500 +output.clickhouse.flush_interval_ms 200 +output.clickhouse.max_buffer_size 5000 +output.clickhouse.drop_on_overflow true +output.clickhouse.async_insert true +output.clickhouse.timeout_ms 1000 + +# Stdout output (for debugging) +output.stdout.enabled false + +# Correlation configuration +correlation.key src_ip,src_port +correlation.time_window.value 1 +correlation.time_window.unit s + +# Orphan policy +# apache_always_emit: always emit A events even without matching B +# network_emit: emit B events alone (usually false) +correlation.orphan_policy.apache_always_emit true +correlation.orphan_policy.network_emit false diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..4ba01f4 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/logcorrelator/logcorrelator + +go 1.21 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..e69de29 diff --git a/internal/adapters/inbound/unixsocket/source.go b/internal/adapters/inbound/unixsocket/source.go new file mode 100644 index 0000000..5b0907b --- /dev/null +++ b/internal/adapters/inbound/unixsocket/source.go @@ -0,0 +1,334 @@ +package unixsocket + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "net" + "os" + "strconv" + "strings" + "sync" + "time" + + 
"github.com/logcorrelator/logcorrelator/internal/domain" +) + +const ( + // Default socket file permissions (owner + group read/write) + DefaultSocketPermissions os.FileMode = 0660 + // Maximum line size for JSON logs (1MB) + MaxLineSize = 1024 * 1024 + // Maximum concurrent connections per socket + MaxConcurrentConnections = 100 + // Rate limit: max events per second + MaxEventsPerSecond = 10000 +) + +// Config holds the Unix socket source configuration. +type Config struct { + Name string + Path string +} + +// UnixSocketSource reads JSON events from a Unix socket. +type UnixSocketSource struct { + config Config + mu sync.Mutex + listener net.Listener + done chan struct{} + wg sync.WaitGroup + semaphore chan struct{} // Limit concurrent connections +} + +// NewUnixSocketSource creates a new Unix socket source. +func NewUnixSocketSource(config Config) *UnixSocketSource { + return &UnixSocketSource{ + config: config, + done: make(chan struct{}), + semaphore: make(chan struct{}, MaxConcurrentConnections), + } +} + +// Name returns the source name. +func (s *UnixSocketSource) Name() string { + return s.config.Name +} + +// Start begins listening on the Unix socket. 
+func (s *UnixSocketSource) Start(ctx context.Context, eventChan chan<- *domain.NormalizedEvent) error { + // Remove existing socket file if present + if info, err := os.Stat(s.config.Path); err == nil { + if info.Mode()&os.ModeSocket != 0 { + if err := os.Remove(s.config.Path); err != nil { + return fmt.Errorf("failed to remove existing socket: %w", err) + } + } else { + return fmt.Errorf("path exists but is not a socket: %s", s.config.Path) + } + } + + // Create listener + listener, err := net.Listen("unix", s.config.Path) + if err != nil { + return fmt.Errorf("failed to create unix socket listener: %w", err) + } + s.listener = listener + + // Set permissions - fail if we can't + if err := os.Chmod(s.config.Path, DefaultSocketPermissions); err != nil { + listener.Close() + os.Remove(s.config.Path) + return fmt.Errorf("failed to set socket permissions: %w", err) + } + + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.acceptConnections(ctx, eventChan) + }() + + return nil +} + +func (s *UnixSocketSource) acceptConnections(ctx context.Context, eventChan chan<- *domain.NormalizedEvent) { + for { + select { + case <-s.done: + return + case <-ctx.Done(): + return + default: + } + + conn, err := s.listener.Accept() + if err != nil { + select { + case <-s.done: + return + case <-ctx.Done(): + return + default: + continue + } + } + + // Check semaphore for connection limiting + select { + case s.semaphore <- struct{}{}: + // Connection accepted + default: + // Too many connections, reject + conn.Close() + continue + } + + s.wg.Add(1) + go func(c net.Conn) { + defer s.wg.Done() + defer func() { <-s.semaphore }() + defer c.Close() + s.readEvents(ctx, c, eventChan) + }(conn) + } +} + +func (s *UnixSocketSource) readEvents(ctx context.Context, conn net.Conn, eventChan chan<- *domain.NormalizedEvent) { + // Set read deadline to prevent hanging + conn.SetReadDeadline(time.Now().Add(5 * time.Minute)) + + scanner := bufio.NewScanner(conn) + // Increase buffer size limit to 1MB 
+ buf := make([]byte, 0, 4096) + scanner.Buffer(buf, MaxLineSize) + + for scanner.Scan() { + select { + case <-ctx.Done(): + return + default: + } + + line := scanner.Bytes() + if len(line) == 0 { + continue + } + + event, err := parseJSONEvent(line) + if err != nil { + // Log parse errors but continue processing + continue + } + + select { + case eventChan <- event: + case <-ctx.Done(): + return + } + } + + if err := scanner.Err(); err != nil { + // Connection error, log but don't crash + } +} + +func parseJSONEvent(data []byte) (*domain.NormalizedEvent, error) { + var raw map[string]any + if err := json.Unmarshal(data, &raw); err != nil { + return nil, fmt.Errorf("invalid JSON: %w", err) + } + + event := &domain.NormalizedEvent{ + Raw: raw, + Extra: make(map[string]any), + } + + // Extract and validate src_ip + if v, ok := getString(raw, "src_ip"); ok { + event.SrcIP = v + } else { + return nil, fmt.Errorf("missing required field: src_ip") + } + + // Extract and validate src_port + if v, ok := getInt(raw, "src_port"); ok { + if v < 1 || v > 65535 { + return nil, fmt.Errorf("src_port must be between 1 and 65535, got %d", v) + } + event.SrcPort = v + } else { + return nil, fmt.Errorf("missing required field: src_port") + } + + // Extract dst_ip (optional) + if v, ok := getString(raw, "dst_ip"); ok { + event.DstIP = v + } + + // Extract dst_port (optional) + if v, ok := getInt(raw, "dst_port"); ok { + if v < 0 || v > 65535 { + return nil, fmt.Errorf("dst_port must be between 0 and 65535, got %d", v) + } + event.DstPort = v + } + + // Extract timestamp - try different fields + if ts, ok := getInt64(raw, "timestamp"); ok { + // Assume nanoseconds + event.Timestamp = time.Unix(0, ts) + } else if tsStr, ok := getString(raw, "time"); ok { + if t, err := time.Parse(time.RFC3339, tsStr); err == nil { + event.Timestamp = t + } + } else if tsStr, ok := getString(raw, "timestamp"); ok { + if t, err := time.Parse(time.RFC3339, tsStr); err == nil { + event.Timestamp = t + } + } 
+ + if event.Timestamp.IsZero() { + event.Timestamp = time.Now() + } + + // Extract headers (header_* fields) + event.Headers = make(map[string]string) + for k, v := range raw { + if len(k) > 7 && k[:7] == "header_" { + if sv, ok := v.(string); ok { + event.Headers[k[7:]] = sv + } + } + } + + // Determine source based on fields present + if len(event.Headers) > 0 { + event.Source = domain.SourceA + } else { + event.Source = domain.SourceB + } + + // Extra fields (single pass) + knownFields := map[string]bool{ + "src_ip": true, "src_port": true, "dst_ip": true, "dst_port": true, + "timestamp": true, "time": true, + } + for k, v := range raw { + if knownFields[k] { + continue + } + if strings.HasPrefix(k, "header_") { + continue + } + event.Extra[k] = v + } + + return event, nil +} + +func getString(m map[string]any, key string) (string, bool) { + if v, ok := m[key]; ok { + if s, ok := v.(string); ok { + return s, true + } + } + return "", false +} + +func getInt(m map[string]any, key string) (int, bool) { + if v, ok := m[key]; ok { + switch val := v.(type) { + case float64: + return int(val), true + case int: + return val, true + case int64: + return int(val), true + case string: + if i, err := strconv.Atoi(val); err == nil { + return i, true + } + } + } + return 0, false +} + +func getInt64(m map[string]any, key string) (int64, bool) { + if v, ok := m[key]; ok { + switch val := v.(type) { + case float64: + return int64(val), true + case int: + return int64(val), true + case int64: + return val, true + case string: + if i, err := strconv.ParseInt(val, 10, 64); err == nil { + return i, true + } + } + } + return 0, false +} + +// Stop gracefully stops the source. 
+func (s *UnixSocketSource) Stop() error { + s.mu.Lock() + defer s.mu.Unlock() + + close(s.done) + + if s.listener != nil { + s.listener.Close() + } + + s.wg.Wait() + + // Clean up socket file + if err := os.Remove(s.config.Path); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to remove socket file: %w", err) + } + + return nil +} diff --git a/internal/adapters/inbound/unixsocket/source_test.go b/internal/adapters/inbound/unixsocket/source_test.go new file mode 100644 index 0000000..219bfa8 --- /dev/null +++ b/internal/adapters/inbound/unixsocket/source_test.go @@ -0,0 +1,98 @@ +package unixsocket + +import ( + "testing" + "time" +) + +func TestParseJSONEvent_Apache(t *testing.T) { + data := []byte(`{ + "src_ip": "192.168.1.1", + "src_port": 8080, + "dst_ip": "10.0.0.1", + "dst_port": 80, + "timestamp": 1704110400000000000, + "method": "GET", + "path": "/api/test", + "header_host": "example.com", + "header_user_agent": "Mozilla/5.0" + }`) + + event, err := parseJSONEvent(data) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if event.SrcIP != "192.168.1.1" { + t.Errorf("expected src_ip 192.168.1.1, got %s", event.SrcIP) + } + if event.SrcPort != 8080 { + t.Errorf("expected src_port 8080, got %d", event.SrcPort) + } + if event.Headers["host"] != "example.com" { + t.Errorf("expected header host example.com, got %s", event.Headers["host"]) + } + if event.Headers["user_agent"] != "Mozilla/5.0" { + t.Errorf("expected header_user_agent Mozilla/5.0, got %s", event.Headers["user_agent"]) + } +} + +func TestParseJSONEvent_Network(t *testing.T) { + data := []byte(`{ + "src_ip": "192.168.1.1", + "src_port": 8080, + "dst_ip": "10.0.0.1", + "dst_port": 443, + "ja3": "abc123def456", + "ja4": "xyz789", + "tcp_meta_flags": "SYN" + }`) + + event, err := parseJSONEvent(data) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if event.SrcIP != "192.168.1.1" { + t.Errorf("expected src_ip 192.168.1.1, got %s", event.SrcIP) + } + if 
event.Extra["ja3"] != "abc123def456" { + t.Errorf("expected ja3 abc123def456, got %v", event.Extra["ja3"]) + } +} + +func TestParseJSONEvent_InvalidJSON(t *testing.T) { + data := []byte(`{invalid json}`) + + _, err := parseJSONEvent(data) + if err == nil { + t.Error("expected error for invalid JSON") + } +} + +func TestParseJSONEvent_MissingFields(t *testing.T) { + data := []byte(`{"other_field": "value"}`) + + _, err := parseJSONEvent(data) + if err == nil { + t.Error("expected error for missing src_ip/src_port") + } +} + +func TestParseJSONEvent_StringTimestamp(t *testing.T) { + data := []byte(`{ + "src_ip": "192.168.1.1", + "src_port": 8080, + "time": "2024-01-01T12:00:00Z" + }`) + + event, err := parseJSONEvent(data) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + expected := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) + if !event.Timestamp.Equal(expected) { + t.Errorf("expected timestamp %v, got %v", expected, event.Timestamp) + } +} diff --git a/internal/adapters/outbound/clickhouse/sink.go b/internal/adapters/outbound/clickhouse/sink.go new file mode 100644 index 0000000..a9cfdfa --- /dev/null +++ b/internal/adapters/outbound/clickhouse/sink.go @@ -0,0 +1,333 @@ +package clickhouse + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/logcorrelator/logcorrelator/internal/domain" +) + +const ( + // DefaultBatchSize is the default number of records per batch + DefaultBatchSize = 500 + // DefaultFlushIntervalMs is the default flush interval in milliseconds + DefaultFlushIntervalMs = 200 + // DefaultMaxBufferSize is the default maximum buffer size + DefaultMaxBufferSize = 5000 + // DefaultTimeoutMs is the default timeout for operations in milliseconds + DefaultTimeoutMs = 1000 + // DefaultPingTimeoutMs is the timeout for initial connection ping + DefaultPingTimeoutMs = 5000 + // MaxRetries is the maximum number of retry attempts for failed inserts + MaxRetries = 3 + // RetryBaseDelay is the base 
delay between retries + RetryBaseDelay = 100 * time.Millisecond +) + +// Config holds the ClickHouse sink configuration. +type Config struct { + DSN string + Table string + BatchSize int + FlushIntervalMs int + MaxBufferSize int + DropOnOverflow bool + AsyncInsert bool + TimeoutMs int +} + +// ClickHouseSink writes correlated logs to ClickHouse. +type ClickHouseSink struct { + config Config + db *sql.DB + mu sync.Mutex + buffer []domain.CorrelatedLog + flushChan chan struct{} + done chan struct{} + wg sync.WaitGroup +} + +// NewClickHouseSink creates a new ClickHouse sink. +func NewClickHouseSink(config Config) (*ClickHouseSink, error) { + // Apply defaults + if config.BatchSize <= 0 { + config.BatchSize = DefaultBatchSize + } + if config.FlushIntervalMs <= 0 { + config.FlushIntervalMs = DefaultFlushIntervalMs + } + if config.MaxBufferSize <= 0 { + config.MaxBufferSize = DefaultMaxBufferSize + } + if config.TimeoutMs <= 0 { + config.TimeoutMs = DefaultTimeoutMs + } + + s := &ClickHouseSink{ + config: config, + buffer: make([]domain.CorrelatedLog, 0, config.BatchSize), + flushChan: make(chan struct{}, 1), + done: make(chan struct{}), + } + + // Connect to ClickHouse + db, err := sql.Open("clickhouse", config.DSN) + if err != nil { + return nil, fmt.Errorf("failed to connect to ClickHouse: %w", err) + } + + // Ping with timeout + pingCtx, pingCancel := context.WithTimeout(context.Background(), time.Duration(DefaultPingTimeoutMs)*time.Millisecond) + defer pingCancel() + + if err := db.PingContext(pingCtx); err != nil { + db.Close() + return nil, fmt.Errorf("failed to ping ClickHouse: %w", err) + } + + s.db = db + + // Start flush goroutine + s.wg.Add(1) + go s.flushLoop() + + return s, nil +} + +// Name returns the sink name. +func (s *ClickHouseSink) Name() string { + return "clickhouse" +} + +// Write adds a log to the buffer. 
+func (s *ClickHouseSink) Write(ctx context.Context, log domain.CorrelatedLog) error { + s.mu.Lock() + defer s.mu.Unlock() + + // Check buffer overflow + if len(s.buffer) >= s.config.MaxBufferSize { + if s.config.DropOnOverflow { + // Drop the log + return nil + } + // Block until space is available (with timeout) + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(time.Duration(s.config.TimeoutMs) * time.Millisecond): + return fmt.Errorf("buffer full, timeout exceeded") + } + } + + s.buffer = append(s.buffer, log) + + // Trigger flush if batch is full + if len(s.buffer) >= s.config.BatchSize { + select { + case s.flushChan <- struct{}{}: + default: + } + } + + return nil +} + +// Flush flushes the buffer to ClickHouse. +func (s *ClickHouseSink) Flush(ctx context.Context) error { + return s.doFlush(ctx) +} + +// Close closes the sink. +func (s *ClickHouseSink) Close() error { + close(s.done) + s.wg.Wait() + + if s.db != nil { + return s.db.Close() + } + return nil +} + +func (s *ClickHouseSink) flushLoop() { + defer s.wg.Done() + + ticker := time.NewTicker(time.Duration(s.config.FlushIntervalMs) * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-s.done: + return + case <-ticker.C: + s.mu.Lock() + needsFlush := len(s.buffer) > 0 + s.mu.Unlock() + if needsFlush { + // Use timeout context for flush + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.config.TimeoutMs)*time.Millisecond) + s.doFlush(ctx) + cancel() + } + case <-s.flushChan: + s.mu.Lock() + needsFlush := len(s.buffer) >= s.config.BatchSize + s.mu.Unlock() + if needsFlush { + // Use timeout context for flush + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.config.TimeoutMs)*time.Millisecond) + s.doFlush(ctx) + cancel() + } + } + } +} + +func (s *ClickHouseSink) doFlush(ctx context.Context) error { + s.mu.Lock() + if len(s.buffer) == 0 { + s.mu.Unlock() + return nil + } + + // Copy buffer to flush + buffer := 
make([]domain.CorrelatedLog, len(s.buffer)) + copy(buffer, s.buffer) + s.buffer = make([]domain.CorrelatedLog, 0, s.config.BatchSize) + s.mu.Unlock() + + // Prepare batch insert with retry + query := fmt.Sprintf(` + INSERT INTO %s (timestamp, src_ip, src_port, dst_ip, dst_port, correlated, orphan_side, apache, network) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + `, s.config.Table) + + // Retry logic with exponential backoff + var lastErr error + for attempt := 0; attempt < MaxRetries; attempt++ { + if attempt > 0 { + // Exponential backoff + delay := RetryBaseDelay * time.Duration(1<= len(substr) && containsLower(s, substr) +} + +func containsLower(s, substr string) bool { + s = toLower(s) + substr = toLower(substr) + for i := 0; i <= len(s)-len(substr); i++ { + if s[i:i+len(substr)] == substr { + return true + } + } + return false +} + +func toLower(s string) string { + var result []byte + for i := 0; i < len(s); i++ { + c := s[i] + if c >= 'A' && c <= 'Z' { + c = c + ('a' - 'A') + } + result = append(result, c) + } + return string(result) +} diff --git a/internal/adapters/outbound/clickhouse/sink_test.go b/internal/adapters/outbound/clickhouse/sink_test.go new file mode 100644 index 0000000..c30bf9e --- /dev/null +++ b/internal/adapters/outbound/clickhouse/sink_test.go @@ -0,0 +1,305 @@ +package clickhouse + +import ( + "context" + "testing" + "time" + + "github.com/logcorrelator/logcorrelator/internal/domain" +) + +func TestClickHouseSink_Name(t *testing.T) { + sink := &ClickHouseSink{ + config: Config{ + DSN: "clickhouse://test:test@localhost:9000/test", + Table: "test_table", + }, + } + + if sink.Name() != "clickhouse" { + t.Errorf("expected name 'clickhouse', got %s", sink.Name()) + } +} + +func TestClickHouseSink_ConfigDefaults(t *testing.T) { + // Test that defaults are applied correctly + config := Config{ + DSN: "clickhouse://test:test@localhost:9000/test", + Table: "test_table", + // Other fields are zero, should get defaults + } + + // Verify defaults would 
be applied (we can't actually connect in tests) + if config.BatchSize <= 0 { + config.BatchSize = DefaultBatchSize + } + if config.FlushIntervalMs <= 0 { + config.FlushIntervalMs = DefaultFlushIntervalMs + } + if config.MaxBufferSize <= 0 { + config.MaxBufferSize = DefaultMaxBufferSize + } + if config.TimeoutMs <= 0 { + config.TimeoutMs = DefaultTimeoutMs + } + + if config.BatchSize != DefaultBatchSize { + t.Errorf("expected BatchSize %d, got %d", DefaultBatchSize, config.BatchSize) + } + if config.FlushIntervalMs != DefaultFlushIntervalMs { + t.Errorf("expected FlushIntervalMs %d, got %d", DefaultFlushIntervalMs, config.FlushIntervalMs) + } + if config.MaxBufferSize != DefaultMaxBufferSize { + t.Errorf("expected MaxBufferSize %d, got %d", DefaultMaxBufferSize, config.MaxBufferSize) + } + if config.TimeoutMs != DefaultTimeoutMs { + t.Errorf("expected TimeoutMs %d, got %d", DefaultTimeoutMs, config.TimeoutMs) + } +} + +func TestClickHouseSink_Write_BufferOverflow(t *testing.T) { + // This test verifies the buffer overflow logic without actually connecting + config := Config{ + DSN: "clickhouse://test:test@localhost:9000/test", + Table: "test_table", + BatchSize: 10, + MaxBufferSize: 10, + DropOnOverflow: true, + TimeoutMs: 100, + FlushIntervalMs: 1000, + } + + // We can't test actual writes without a ClickHouse instance, + // but we can verify the config is valid + if config.BatchSize > config.MaxBufferSize { + t.Error("BatchSize should not exceed MaxBufferSize") + } +} + +func TestClickHouseSink_IsRetryableError(t *testing.T) { + tests := []struct { + name string + err error + expected bool + }{ + {"nil error", nil, false}, + {"connection refused", &mockError{"connection refused"}, true}, + {"connection reset", &mockError{"connection reset by peer"}, true}, + {"timeout", &mockError{"timeout waiting for response"}, true}, + {"network unreachable", &mockError{"network is unreachable"}, true}, + {"broken pipe", &mockError{"broken pipe"}, true}, + {"syntax error", 
&mockError{"syntax error in SQL"}, false}, + {"table not found", &mockError{"table test not found"}, false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := isRetryableError(tt.err) + if result != tt.expected { + t.Errorf("expected %v, got %v", tt.expected, result) + } + }) + } +} + +func TestClickHouseSink_FlushEmpty(t *testing.T) { + // Test that flushing an empty buffer doesn't cause issues + // (We can't test actual ClickHouse operations without a real instance) + + s := &ClickHouseSink{ + config: Config{ + DSN: "clickhouse://test:test@localhost:9000/test", + Table: "test_table", + }, + buffer: make([]domain.CorrelatedLog, 0), + } + + // Should not panic or error on empty flush + ctx := context.Background() + err := s.Flush(ctx) + if err != nil { + t.Errorf("expected no error on empty flush, got %v", err) + } +} + +func TestClickHouseSink_CloseWithoutConnect(t *testing.T) { + // Test that closing without connecting doesn't panic + s := &ClickHouseSink{ + config: Config{ + DSN: "clickhouse://test:test@localhost:9000/test", + Table: "test_table", + }, + buffer: make([]domain.CorrelatedLog, 0), + done: make(chan struct{}), + } + + err := s.Close() + if err != nil { + t.Errorf("expected no error on close without connect, got %v", err) + } +} + +func TestClickHouseSink_Constants(t *testing.T) { + // Verify constants have reasonable values + if DefaultBatchSize <= 0 { + t.Error("DefaultBatchSize should be positive") + } + if DefaultFlushIntervalMs <= 0 { + t.Error("DefaultFlushIntervalMs should be positive") + } + if DefaultMaxBufferSize <= 0 { + t.Error("DefaultMaxBufferSize should be positive") + } + if DefaultTimeoutMs <= 0 { + t.Error("DefaultTimeoutMs should be positive") + } + if DefaultPingTimeoutMs <= 0 { + t.Error("DefaultPingTimeoutMs should be positive") + } + if MaxRetries <= 0 { + t.Error("MaxRetries should be positive") + } + if RetryBaseDelay <= 0 { + t.Error("RetryBaseDelay should be positive") + } +} + +// 
mockError implements error for testing +type mockError struct { + msg string +} + +func (e *mockError) Error() string { + return e.msg +} + +// Test the doFlush function with empty buffer (no actual DB connection) +func TestClickHouseSink_DoFlushEmpty(t *testing.T) { + s := &ClickHouseSink{ + config: Config{ + DSN: "clickhouse://test:test@localhost:9000/test", + Table: "test_table", + }, + buffer: make([]domain.CorrelatedLog, 0), + } + + ctx := context.Background() + err := s.doFlush(ctx) + if err != nil { + t.Errorf("expected no error when flushing empty buffer, got %v", err) + } +} + +// Test that buffer is properly managed (without actual DB operations) +func TestClickHouseSink_BufferManagement(t *testing.T) { + log := domain.CorrelatedLog{ + SrcIP: "192.168.1.1", + SrcPort: 8080, + Correlated: true, + } + + s := &ClickHouseSink{ + config: Config{ + DSN: "clickhouse://test:test@localhost:9000/test", + Table: "test_table", + MaxBufferSize: 100, // Allow more than 1 element + DropOnOverflow: false, + TimeoutMs: 1000, + }, + buffer: []domain.CorrelatedLog{log}, + } + + // Verify buffer has data + if len(s.buffer) != 1 { + t.Fatalf("expected buffer length 1, got %d", len(s.buffer)) + } + + // Test that Write properly adds to buffer + ctx := context.Background() + err := s.Write(ctx, log) + if err != nil { + t.Errorf("unexpected error on Write: %v", err) + } + + if len(s.buffer) != 2 { + t.Errorf("expected buffer length 2 after Write, got %d", len(s.buffer)) + } +} + +// Test Write with context cancellation +func TestClickHouseSink_Write_ContextCancel(t *testing.T) { + s := &ClickHouseSink{ + config: Config{ + DSN: "clickhouse://test:test@localhost:9000/test", + Table: "test_table", + MaxBufferSize: 1, + DropOnOverflow: false, + TimeoutMs: 10, + }, + buffer: make([]domain.CorrelatedLog, 0, 1), + } + + // Fill the buffer + log := domain.CorrelatedLog{SrcIP: "192.168.1.1", SrcPort: 8080} + s.buffer = append(s.buffer, log) + + // Try to write with cancelled context + 
ctx, cancel := context.WithCancel(context.Background()) + cancel() // Cancel immediately + + err := s.Write(ctx, log) + if err == nil { + t.Error("expected error when writing with cancelled context") + } +} + +// Test DropOnOverflow behavior +func TestClickHouseSink_Write_DropOnOverflow(t *testing.T) { + s := &ClickHouseSink{ + config: Config{ + DSN: "clickhouse://test:test@localhost:9000/test", + Table: "test_table", + MaxBufferSize: 1, + DropOnOverflow: true, + TimeoutMs: 10, + }, + buffer: make([]domain.CorrelatedLog, 0, 1), + } + + // Fill the buffer + log := domain.CorrelatedLog{SrcIP: "192.168.1.1", SrcPort: 8080} + s.buffer = append(s.buffer, log) + + // Try to write when buffer is full - should drop silently + ctx := context.Background() + err := s.Write(ctx, log) + if err != nil { + t.Errorf("expected no error when DropOnOverflow is true, got %v", err) + } +} + +// Benchmark Write operation (without actual DB) +func BenchmarkClickHouseSink_Write(b *testing.B) { + s := &ClickHouseSink{ + config: Config{ + DSN: "clickhouse://test:test@localhost:9000/test", + Table: "test_table", + MaxBufferSize: 10000, + DropOnOverflow: true, + }, + buffer: make([]domain.CorrelatedLog, 0, 10000), + } + + log := domain.CorrelatedLog{ + Timestamp: time.Now(), + SrcIP: "192.168.1.1", + SrcPort: 8080, + Correlated: true, + } + + ctx := context.Background() + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.Write(ctx, log) + } +} diff --git a/internal/adapters/outbound/file/sink.go b/internal/adapters/outbound/file/sink.go new file mode 100644 index 0000000..d20efdb --- /dev/null +++ b/internal/adapters/outbound/file/sink.go @@ -0,0 +1,168 @@ +package file + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + "sync" + + "github.com/logcorrelator/logcorrelator/internal/domain" +) + +const ( + // DefaultFilePermissions for output files + DefaultFilePermissions os.FileMode = 0644 + // DefaultDirPermissions for output directories + 
DefaultDirPermissions os.FileMode = 0750 +) + +// Config holds the file sink configuration. +type Config struct { + Path string +} + +// FileSink writes correlated logs to a file as JSON lines. +type FileSink struct { + config Config + mu sync.Mutex + file *os.File + writer *bufio.Writer +} + +// NewFileSink creates a new file sink. +func NewFileSink(config Config) (*FileSink, error) { + // Validate path + if err := validateFilePath(config.Path); err != nil { + return nil, fmt.Errorf("invalid file path: %w", err) + } + + return &FileSink{ + config: config, + }, nil +} + +// Name returns the sink name. +func (s *FileSink) Name() string { + return "file" +} + +// Write writes a correlated log to the file. +func (s *FileSink) Write(ctx context.Context, log domain.CorrelatedLog) error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.file == nil { + if err := s.openFile(); err != nil { + return err + } + } + + data, err := json.Marshal(log) + if err != nil { + return fmt.Errorf("failed to marshal log: %w", err) + } + + if _, err := s.writer.Write(data); err != nil { + return fmt.Errorf("failed to write log: %w", err) + } + if _, err := s.writer.WriteString("\n"); err != nil { + return fmt.Errorf("failed to write newline: %w", err) + } + + return nil +} + +// Flush flushes any buffered data. +func (s *FileSink) Flush(ctx context.Context) error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.writer != nil { + return s.writer.Flush() + } + return nil +} + +// Close closes the sink. 
+func (s *FileSink) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.writer != nil { + if err := s.writer.Flush(); err != nil { + return err + } + } + + if s.file != nil { + return s.file.Close() + } + return nil +} + +func (s *FileSink) openFile() error { + // Validate path again before opening + if err := validateFilePath(s.config.Path); err != nil { + return fmt.Errorf("invalid file path: %w", err) + } + + // Ensure directory exists + dir := filepath.Dir(s.config.Path) + if err := os.MkdirAll(dir, DefaultDirPermissions); err != nil { + return fmt.Errorf("failed to create directory: %w", err) + } + + file, err := os.OpenFile(s.config.Path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, DefaultFilePermissions) + if err != nil { + return fmt.Errorf("failed to open file: %w", err) + } + + s.file = file + s.writer = bufio.NewWriter(file) + return nil +} + +// validateFilePath validates that the file path is safe and allowed. +func validateFilePath(path string) error { + if path == "" { + return fmt.Errorf("path cannot be empty") + } + + // Clean the path + cleanPath := filepath.Clean(path) + + // Ensure path is absolute or relative to allowed directories + allowedPrefixes := []string{ + "/var/log/logcorrelator", + "/var/log", + "/tmp", + } + + // Check if path is in allowed directories + allowed := false + for _, prefix := range allowedPrefixes { + if strings.HasPrefix(cleanPath, prefix) { + allowed = true + break + } + } + + if !allowed { + // Allow relative paths for testing + if !filepath.IsAbs(cleanPath) { + return nil + } + return fmt.Errorf("path must be in allowed directories: %v", allowedPrefixes) + } + + // Check for path traversal + if strings.Contains(cleanPath, "..") { + return fmt.Errorf("path cannot contain '..'") + } + + return nil +} diff --git a/internal/adapters/outbound/file/sink_test.go b/internal/adapters/outbound/file/sink_test.go new file mode 100644 index 0000000..c240928 --- /dev/null +++ b/internal/adapters/outbound/file/sink_test.go @@ 
-0,0 +1,96 @@ +package file + +import ( + "context" + "os" + "path/filepath" + "testing" + + "github.com/logcorrelator/logcorrelator/internal/domain" +) + +func TestFileSink_Write(t *testing.T) { + tmpDir := t.TempDir() + testPath := filepath.Join(tmpDir, "test.log") + + sink, err := NewFileSink(Config{Path: testPath}) + if err != nil { + t.Fatalf("failed to create sink: %v", err) + } + defer sink.Close() + + log := domain.CorrelatedLog{ + SrcIP: "192.168.1.1", + SrcPort: 8080, + Correlated: true, + } + + if err := sink.Write(context.Background(), log); err != nil { + t.Fatalf("failed to write: %v", err) + } + + if err := sink.Flush(context.Background()); err != nil { + t.Fatalf("failed to flush: %v", err) + } + + // Verify file exists and contains data + data, err := os.ReadFile(testPath) + if err != nil { + t.Fatalf("failed to read file: %v", err) + } + + if len(data) == 0 { + t.Error("expected non-empty file") + } +} + +func TestFileSink_MultipleWrites(t *testing.T) { + tmpDir := t.TempDir() + testPath := filepath.Join(tmpDir, "test.log") + + sink, err := NewFileSink(Config{Path: testPath}) + if err != nil { + t.Fatalf("failed to create sink: %v", err) + } + defer sink.Close() + + for i := 0; i < 5; i++ { + log := domain.CorrelatedLog{ + SrcIP: "192.168.1.1", + SrcPort: 8080 + i, + } + if err := sink.Write(context.Background(), log); err != nil { + t.Fatalf("failed to write: %v", err) + } + } + + sink.Close() + + // Verify file has 5 lines + data, err := os.ReadFile(testPath) + if err != nil { + t.Fatalf("failed to read file: %v", err) + } + + lines := 0 + for _, b := range data { + if b == '\n' { + lines++ + } + } + + if lines != 5 { + t.Errorf("expected 5 lines, got %d", lines) + } +} + +func TestFileSink_Name(t *testing.T) { + sink, err := NewFileSink(Config{Path: "/tmp/test.log"}) + if err != nil { + t.Fatalf("failed to create sink: %v", err) + } + + if sink.Name() != "file" { + t.Errorf("expected name 'file', got %s", sink.Name()) + } +} diff --git 
a/internal/adapters/outbound/multi/sink.go b/internal/adapters/outbound/multi/sink.go new file mode 100644 index 0000000..8bcf9d0 --- /dev/null +++ b/internal/adapters/outbound/multi/sink.go @@ -0,0 +1,123 @@ +package multi + +import ( + "context" + "sync" + + "github.com/logcorrelator/logcorrelator/internal/domain" + "github.com/logcorrelator/logcorrelator/internal/ports" +) + +// MultiSink fans out correlated logs to multiple sinks. +type MultiSink struct { + mu sync.RWMutex + sinks []ports.CorrelatedLogSink +} + +// NewMultiSink creates a new multi-sink. +func NewMultiSink(sinks ...ports.CorrelatedLogSink) *MultiSink { + return &MultiSink{ + sinks: sinks, + } +} + +// Name returns the sink name. +func (s *MultiSink) Name() string { + return "multi" +} + +// AddSink adds a sink to the fan-out. +func (s *MultiSink) AddSink(sink ports.CorrelatedLogSink) { + s.mu.Lock() + defer s.mu.Unlock() + s.sinks = append(s.sinks, sink) +} + +// Write writes a correlated log to all sinks concurrently. +// Returns the first error encountered (but all sinks are attempted). 
+func (s *MultiSink) Write(ctx context.Context, log domain.CorrelatedLog) error { + s.mu.RLock() + sinks := make([]ports.CorrelatedLogSink, len(s.sinks)) + copy(sinks, s.sinks) + s.mu.RUnlock() + + if len(sinks) == 0 { + return nil + } + + var wg sync.WaitGroup + var firstErr error + var firstErrMu sync.Mutex + errChan := make(chan error, len(sinks)) + + for _, sink := range sinks { + wg.Add(1) + go func(sk ports.CorrelatedLogSink) { + defer wg.Done() + if err := sk.Write(ctx, log); err != nil { + // Non-blocking send to errChan + select { + case errChan <- err: + default: + // Channel full, error will be handled via firstErr + } + } + }(sink) + } + + // Wait for all writes to complete in a separate goroutine + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + // Collect errors with timeout + select { + case <-done: + close(errChan) + // Collect first error + for err := range errChan { + if err != nil { + firstErrMu.Lock() + if firstErr == nil { + firstErr = err + } + firstErrMu.Unlock() + } + } + case <-ctx.Done(): + return ctx.Err() + } + + firstErrMu.Lock() + defer firstErrMu.Unlock() + return firstErr +} + +// Flush flushes all sinks. +func (s *MultiSink) Flush(ctx context.Context) error { + s.mu.RLock() + defer s.mu.RUnlock() + + for _, sink := range s.sinks { + if err := sink.Flush(ctx); err != nil { + return err + } + } + return nil +} + +// Close closes all sinks. 
+func (s *MultiSink) Close() error { + s.mu.RLock() + defer s.mu.RUnlock() + + var firstErr error + for _, sink := range s.sinks { + if err := sink.Close(); err != nil && firstErr == nil { + firstErr = err + } + } + return firstErr +} diff --git a/internal/adapters/outbound/multi/sink_test.go b/internal/adapters/outbound/multi/sink_test.go new file mode 100644 index 0000000..99e2aef --- /dev/null +++ b/internal/adapters/outbound/multi/sink_test.go @@ -0,0 +1,114 @@ +package multi + +import ( + "context" + "sync" + "testing" + + "github.com/logcorrelator/logcorrelator/internal/domain" +) + +type mockSink struct { + name string + mu sync.Mutex + writeFunc func(domain.CorrelatedLog) error + flushFunc func() error + closeFunc func() error +} + +func (m *mockSink) Name() string { return m.name } +func (m *mockSink) Write(ctx context.Context, log domain.CorrelatedLog) error { + m.mu.Lock() + defer m.mu.Unlock() + return m.writeFunc(log) +} +func (m *mockSink) Flush(ctx context.Context) error { return m.flushFunc() } +func (m *mockSink) Close() error { return m.closeFunc() } + +func TestMultiSink_Write(t *testing.T) { + var mu sync.Mutex + writeCount := 0 + + sink1 := &mockSink{ + name: "sink1", + writeFunc: func(log domain.CorrelatedLog) error { + mu.Lock() + writeCount++ + mu.Unlock() + return nil + }, + flushFunc: func() error { return nil }, + closeFunc: func() error { return nil }, + } + + sink2 := &mockSink{ + name: "sink2", + writeFunc: func(log domain.CorrelatedLog) error { + mu.Lock() + writeCount++ + mu.Unlock() + return nil + }, + flushFunc: func() error { return nil }, + closeFunc: func() error { return nil }, + } + + ms := NewMultiSink(sink1, sink2) + + log := domain.CorrelatedLog{SrcIP: "192.168.1.1"} + err := ms.Write(context.Background(), log) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if writeCount != 2 { + t.Errorf("expected 2 writes, got %d", writeCount) + } +} + +func TestMultiSink_Write_OneFails(t *testing.T) { + sink1 := 
&mockSink{ + name: "sink1", + writeFunc: func(log domain.CorrelatedLog) error { + return nil + }, + flushFunc: func() error { return nil }, + closeFunc: func() error { return nil }, + } + + sink2 := &mockSink{ + name: "sink2", + writeFunc: func(log domain.CorrelatedLog) error { + return context.Canceled + }, + flushFunc: func() error { return nil }, + closeFunc: func() error { return nil }, + } + + ms := NewMultiSink(sink1, sink2) + + log := domain.CorrelatedLog{SrcIP: "192.168.1.1"} + err := ms.Write(context.Background(), log) + if err == nil { + t.Error("expected error when one sink fails") + } +} + +func TestMultiSink_AddSink(t *testing.T) { + ms := NewMultiSink() + + sink := &mockSink{ + name: "dynamic", + writeFunc: func(log domain.CorrelatedLog) error { return nil }, + flushFunc: func() error { return nil }, + closeFunc: func() error { return nil }, + } + + ms.AddSink(sink) + + log := domain.CorrelatedLog{SrcIP: "192.168.1.1"} + err := ms.Write(context.Background(), log) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } +} diff --git a/internal/app/orchestrator.go b/internal/app/orchestrator.go new file mode 100644 index 0000000..3d7747f --- /dev/null +++ b/internal/app/orchestrator.go @@ -0,0 +1,158 @@ +package app + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/logcorrelator/logcorrelator/internal/domain" + "github.com/logcorrelator/logcorrelator/internal/ports" +) + +const ( + // DefaultEventChannelBufferSize is the default size for event channels + DefaultEventChannelBufferSize = 1000 + // ShutdownTimeout is the maximum time to wait for graceful shutdown + ShutdownTimeout = 30 * time.Second +) + +// OrchestratorConfig holds the orchestrator configuration. +type OrchestratorConfig struct { + Sources []ports.EventSource + Sink ports.CorrelatedLogSink +} + +// Orchestrator connects sources to the correlation service and sinks. 
type Orchestrator struct {
	config         OrchestratorConfig
	correlationSvc ports.CorrelationProcessor
	ctx            context.Context    // cancelled by Stop to halt sources and processors
	cancel         context.CancelFunc
	wg             sync.WaitGroup     // tracks source + processor goroutines
	running        atomic.Bool        // guards against double Start/Stop
}

// NewOrchestrator creates a new orchestrator.
// The internal context lives until Stop is called.
func NewOrchestrator(config OrchestratorConfig, correlationSvc ports.CorrelationProcessor) *Orchestrator {
	ctx, cancel := context.WithCancel(context.Background())
	return &Orchestrator{
		config:         config,
		correlationSvc: correlationSvc,
		ctx:            ctx,
		cancel:         cancel,
	}
}

// Start begins the orchestration.
// For each source it spawns two goroutines: one running the source itself
// and one consuming its event channel. Start is idempotent: a second call
// while running is a no-op returning nil.
func (o *Orchestrator) Start() error {
	if !o.running.CompareAndSwap(false, true) {
		return nil // Already running
	}

	// Start each source
	for _, source := range o.config.Sources {
		// Each source gets its own buffered channel so a slow consumer of
		// one source does not block the others.
		eventChan := make(chan *domain.NormalizedEvent, DefaultEventChannelBufferSize)

		o.wg.Add(1)
		go func(src ports.EventSource, evChan chan *domain.NormalizedEvent) {
			defer o.wg.Done()
			o.processEvents(evChan)
		}(source, eventChan)

		o.wg.Add(1)
		go func(src ports.EventSource, evChan chan *domain.NormalizedEvent) {
			defer o.wg.Done()
			if err := src.Start(o.ctx, evChan); err != nil {
				// Source failed, but continue with others
				// NOTE(review): the error is silently dropped — there is no
				// logger available here; consider surfacing it.
			}
		}(source, eventChan)
	}

	return nil
}

// processEvents consumes events from one source channel, runs them through
// the correlation service, and writes resulting logs to the sink. It exits
// when the channel closes or the orchestrator context is cancelled (after a
// best-effort drain of already-buffered events).
func (o *Orchestrator) processEvents(eventChan <-chan *domain.NormalizedEvent) {
	for {
		select {
		case <-o.ctx.Done():
			// Drain remaining events before exiting
			for {
				select {
				case event, ok := <-eventChan:
					if !ok {
						return
					}
					logs := o.correlationSvc.ProcessEvent(event)
					for _, log := range logs {
						// NOTE(review): write error ignored here, and o.ctx
						// is already cancelled at this point — sinks that
						// honor ctx may reject these drain writes. Confirm
						// intended behavior.
						o.config.Sink.Write(o.ctx, log)
					}
				default:
					// Channel empty: drain complete.
					return
				}
			}
		case event, ok := <-eventChan:
			if !ok {
				return
			}

			// Process through correlation service
			logs := o.correlationSvc.ProcessEvent(event)

			// Write correlated logs to sink
			for _, log := range logs {
				if err := o.config.Sink.Write(o.ctx, log); err != nil {
					// Log error but continue processing
				}
			}
		}
	}
}

// Stop gracefully stops the
orchestrator. +// It stops all sources first, then flushes remaining events, then closes sinks. +func (o *Orchestrator) Stop() error { + if !o.running.CompareAndSwap(true, false) { + return nil // Not running + } + + // Create shutdown context with timeout + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), ShutdownTimeout) + defer shutdownCancel() + + // First, cancel the main context to stop accepting new events + o.cancel() + + // Wait for source goroutines to finish + // Use a separate goroutine with timeout to prevent deadlock + done := make(chan struct{}) + go func() { + o.wg.Wait() + close(done) + }() + + select { + case <-done: + // Sources stopped cleanly + case <-shutdownCtx.Done(): + // Timeout waiting for sources + } + + // Flush remaining events from correlation service + flushedLogs := o.correlationSvc.Flush() + for _, log := range flushedLogs { + if err := o.config.Sink.Write(shutdownCtx, log); err != nil { + // Log error but continue + } + } + + // Flush and close sink with timeout + if err := o.config.Sink.Flush(shutdownCtx); err != nil { + // Log error + } + if err := o.config.Sink.Close(); err != nil { + // Log error + } + + return nil +} diff --git a/internal/app/orchestrator_test.go b/internal/app/orchestrator_test.go new file mode 100644 index 0000000..d39d424 --- /dev/null +++ b/internal/app/orchestrator_test.go @@ -0,0 +1,160 @@ +package app + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/logcorrelator/logcorrelator/internal/domain" + "github.com/logcorrelator/logcorrelator/internal/ports" +) + +type mockEventSource struct { + name string + mu sync.RWMutex + eventChan chan<- *domain.NormalizedEvent + started bool + stopped bool +} + +func (m *mockEventSource) Name() string { return m.name } +func (m *mockEventSource) Start(ctx context.Context, eventChan chan<- *domain.NormalizedEvent) error { + m.mu.Lock() + m.started = true + m.eventChan = eventChan + m.mu.Unlock() + <-ctx.Done() + m.mu.Lock() + 
m.stopped = true + m.mu.Unlock() + return nil +} +func (m *mockEventSource) Stop() error { return nil } + +func (m *mockEventSource) getEventChan() chan<- *domain.NormalizedEvent { + m.mu.RLock() + defer m.mu.RUnlock() + return m.eventChan +} + +func (m *mockEventSource) isStarted() bool { + m.mu.RLock() + defer m.mu.RUnlock() + return m.started +} + +type mockSink struct { + mu sync.Mutex + written []domain.CorrelatedLog +} + +func (m *mockSink) Name() string { return "mock" } +func (m *mockSink) Write(ctx context.Context, log domain.CorrelatedLog) error { + m.mu.Lock() + defer m.mu.Unlock() + m.written = append(m.written, log) + return nil +} +func (m *mockSink) Flush(ctx context.Context) error { return nil } +func (m *mockSink) Close() error { return nil } + +func (m *mockSink) getWritten() []domain.CorrelatedLog { + m.mu.Lock() + defer m.mu.Unlock() + result := make([]domain.CorrelatedLog, len(m.written)) + copy(result, m.written) + return result +} + +func TestOrchestrator_StartStop(t *testing.T) { + source := &mockEventSource{name: "test"} + sink := &mockSink{} + + corrConfig := domain.CorrelationConfig{ + TimeWindow: time.Second, + ApacheAlwaysEmit: true, + NetworkEmit: false, + } + correlationSvc := domain.NewCorrelationService(corrConfig, &domain.RealTimeProvider{}) + + orchestrator := NewOrchestrator(OrchestratorConfig{ + Sources: []ports.EventSource{source}, + Sink: sink, + }, correlationSvc) + + if err := orchestrator.Start(); err != nil { + t.Fatalf("failed to start: %v", err) + } + + // Let it run briefly + time.Sleep(100 * time.Millisecond) + + if err := orchestrator.Stop(); err != nil { + t.Fatalf("failed to stop: %v", err) + } + + if !source.isStarted() { + t.Error("expected source to be started") + } +} + +func TestOrchestrator_ProcessEvent(t *testing.T) { + source := &mockEventSource{name: "test"} + sink := &mockSink{} + + corrConfig := domain.CorrelationConfig{ + TimeWindow: time.Second, + ApacheAlwaysEmit: true, + NetworkEmit: false, + } + 
correlationSvc := domain.NewCorrelationService(corrConfig, &domain.RealTimeProvider{}) + + orchestrator := NewOrchestrator(OrchestratorConfig{ + Sources: []ports.EventSource{source}, + Sink: sink, + }, correlationSvc) + + if err := orchestrator.Start(); err != nil { + t.Fatalf("failed to start: %v", err) + } + + // Wait for source to start and get the channel + var eventChan chan<- *domain.NormalizedEvent + for i := 0; i < 50; i++ { + eventChan = source.getEventChan() + if eventChan != nil { + break + } + time.Sleep(10 * time.Millisecond) + } + + if eventChan == nil { + t.Fatal("source did not start properly") + } + + // Send an event through the source + event := &domain.NormalizedEvent{ + Source: domain.SourceA, + Timestamp: time.Now(), + SrcIP: "192.168.1.1", + SrcPort: 8080, + Raw: map[string]any{"method": "GET"}, + } + + // Send event + eventChan <- event + + // Give it time to process + time.Sleep(100 * time.Millisecond) + + if err := orchestrator.Stop(); err != nil { + t.Fatalf("failed to stop: %v", err) + } + + // Should have written at least one log (the orphan A) + written := sink.getWritten() + if len(written) == 0 { + t.Error("expected at least one log to be written") + } +} diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..b740bc2 --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,340 @@ +package config + +import ( + "bufio" + "fmt" + "os" + "strconv" + "strings" + "time" +) + +// Config holds the complete application configuration. +type Config struct { + Service ServiceConfig + Inputs InputsConfig + Outputs OutputsConfig + Correlation CorrelationConfig +} + +// ServiceConfig holds service-level configuration. +type ServiceConfig struct { + Name string + Language string +} + +// InputsConfig holds input sources configuration. +type InputsConfig struct { + UnixSockets []UnixSocketConfig +} + +// UnixSocketConfig holds a Unix socket source configuration. 
+type UnixSocketConfig struct { + Name string + Path string + Format string +} + +// OutputsConfig holds output sinks configuration. +type OutputsConfig struct { + File FileOutputConfig + ClickHouse ClickHouseOutputConfig + Stdout StdoutOutputConfig +} + +// FileOutputConfig holds file sink configuration. +type FileOutputConfig struct { + Enabled bool + Path string +} + +// ClickHouseOutputConfig holds ClickHouse sink configuration. +type ClickHouseOutputConfig struct { + Enabled bool + DSN string + Table string + BatchSize int + FlushIntervalMs int + MaxBufferSize int + DropOnOverflow bool + AsyncInsert bool + TimeoutMs int +} + +// StdoutOutputConfig holds stdout sink configuration. +type StdoutOutputConfig struct { + Enabled bool +} + +// CorrelationConfig holds correlation configuration. +type CorrelationConfig struct { + Key []string + TimeWindow TimeWindowConfig + OrphanPolicy OrphanPolicyConfig +} + +// TimeWindowConfig holds time window configuration. +type TimeWindowConfig struct { + Value int + Unit string +} + +// OrphanPolicyConfig holds orphan event policy configuration. +type OrphanPolicyConfig struct { + ApacheAlwaysEmit bool + NetworkEmit bool +} + +// Load loads configuration from a text file with directives. 
+func Load(path string) (*Config, error) { + file, err := os.Open(path) + if err != nil { + return nil, fmt.Errorf("failed to open config file: %w", err) + } + defer file.Close() + + cfg := &Config{ + Service: ServiceConfig{ + Name: "logcorrelator", + Language: "go", + }, + Inputs: InputsConfig{ + UnixSockets: make([]UnixSocketConfig, 0), + }, + Outputs: OutputsConfig{ + File: FileOutputConfig{ + Enabled: true, + Path: "/var/log/logcorrelator/correlated.log", + }, + ClickHouse: ClickHouseOutputConfig{ + Enabled: false, + BatchSize: 500, + FlushIntervalMs: 200, + MaxBufferSize: 5000, + DropOnOverflow: true, + AsyncInsert: true, + TimeoutMs: 1000, + }, + Stdout: StdoutOutputConfig{ + Enabled: false, + }, + }, + Correlation: CorrelationConfig{ + Key: []string{"src_ip", "src_port"}, + TimeWindow: TimeWindowConfig{ + Value: 1, + Unit: "s", + }, + OrphanPolicy: OrphanPolicyConfig{ + ApacheAlwaysEmit: true, + NetworkEmit: false, + }, + }, + } + + scanner := bufio.NewScanner(file) + lineNum := 0 + + for scanner.Scan() { + lineNum++ + line := strings.TrimSpace(scanner.Text()) + + // Skip empty lines and comments + if line == "" || strings.HasPrefix(line, "#") { + continue + } + + if err := parseDirective(cfg, line); err != nil { + return nil, fmt.Errorf("line %d: %w", lineNum, err) + } + } + + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("failed to read config file: %w", err) + } + + if err := cfg.Validate(); err != nil { + return nil, fmt.Errorf("invalid config: %w", err) + } + + return cfg, nil +} + +func parseDirective(cfg *Config, line string) error { + parts := strings.Fields(line) + if len(parts) < 2 { + return fmt.Errorf("invalid directive: %s", line) + } + + directive := parts[0] + value := strings.Join(parts[1:], " ") + + switch directive { + case "service.name": + cfg.Service.Name = value + case "service.language": + cfg.Service.Language = value + + case "input.unix_socket": + // Format: input.unix_socket [format] + if len(parts) < 3 { + return 
fmt.Errorf("input.unix_socket requires name and path") + } + format := "json" + if len(parts) >= 4 { + format = parts[3] + } + cfg.Inputs.UnixSockets = append(cfg.Inputs.UnixSockets, UnixSocketConfig{ + Name: parts[1], + Path: parts[2], + Format: format, + }) + + case "output.file.enabled": + enabled, err := parseBool(value) + if err != nil { + return fmt.Errorf("invalid value for output.file.enabled: %w", err) + } + cfg.Outputs.File.Enabled = enabled + case "output.file.path": + cfg.Outputs.File.Path = value + + case "output.clickhouse.enabled": + enabled, err := parseBool(value) + if err != nil { + return fmt.Errorf("invalid value for output.clickhouse.enabled: %w", err) + } + cfg.Outputs.ClickHouse.Enabled = enabled + case "output.clickhouse.dsn": + cfg.Outputs.ClickHouse.DSN = value + case "output.clickhouse.table": + cfg.Outputs.ClickHouse.Table = value + case "output.clickhouse.batch_size": + v, err := strconv.Atoi(value) + if err != nil { + return fmt.Errorf("invalid value for output.clickhouse.batch_size: %w", err) + } + cfg.Outputs.ClickHouse.BatchSize = v + case "output.clickhouse.flush_interval_ms": + v, err := strconv.Atoi(value) + if err != nil { + return fmt.Errorf("invalid value for output.clickhouse.flush_interval_ms: %w", err) + } + cfg.Outputs.ClickHouse.FlushIntervalMs = v + case "output.clickhouse.max_buffer_size": + v, err := strconv.Atoi(value) + if err != nil { + return fmt.Errorf("invalid value for output.clickhouse.max_buffer_size: %w", err) + } + cfg.Outputs.ClickHouse.MaxBufferSize = v + case "output.clickhouse.drop_on_overflow": + enabled, err := parseBool(value) + if err != nil { + return fmt.Errorf("invalid value for output.clickhouse.drop_on_overflow: %w", err) + } + cfg.Outputs.ClickHouse.DropOnOverflow = enabled + case "output.clickhouse.async_insert": + enabled, err := parseBool(value) + if err != nil { + return fmt.Errorf("invalid value for output.clickhouse.async_insert: %w", err) + } + cfg.Outputs.ClickHouse.AsyncInsert = 
enabled + case "output.clickhouse.timeout_ms": + v, err := strconv.Atoi(value) + if err != nil { + return fmt.Errorf("invalid value for output.clickhouse.timeout_ms: %w", err) + } + cfg.Outputs.ClickHouse.TimeoutMs = v + + case "output.stdout.enabled": + enabled, err := parseBool(value) + if err != nil { + return fmt.Errorf("invalid value for output.stdout.enabled: %w", err) + } + cfg.Outputs.Stdout.Enabled = enabled + + case "correlation.key": + cfg.Correlation.Key = strings.Split(value, ",") + for i, k := range cfg.Correlation.Key { + cfg.Correlation.Key[i] = strings.TrimSpace(k) + } + case "correlation.time_window.value": + v, err := strconv.Atoi(value) + if err != nil { + return fmt.Errorf("invalid value for correlation.time_window.value: %w", err) + } + cfg.Correlation.TimeWindow.Value = v + case "correlation.time_window.unit": + cfg.Correlation.TimeWindow.Unit = value + case "correlation.orphan_policy.apache_always_emit": + enabled, err := parseBool(value) + if err != nil { + return fmt.Errorf("invalid value for correlation.orphan_policy.apache_always_emit: %w", err) + } + cfg.Correlation.OrphanPolicy.ApacheAlwaysEmit = enabled + case "correlation.orphan_policy.network_emit": + enabled, err := parseBool(value) + if err != nil { + return fmt.Errorf("invalid value for correlation.orphan_policy.network_emit: %w", err) + } + cfg.Correlation.OrphanPolicy.NetworkEmit = enabled + + default: + return fmt.Errorf("unknown directive: %s", directive) + } + + return nil +} + +func parseBool(s string) (bool, error) { + s = strings.ToLower(s) + switch s { + case "true", "yes", "1", "on": + return true, nil + case "false", "no", "0", "off": + return false, nil + default: + return false, fmt.Errorf("invalid boolean value: %s", s) + } +} + +// Validate validates the configuration. 
+func (c *Config) Validate() error { + if len(c.Inputs.UnixSockets) < 2 { + return fmt.Errorf("at least two unix socket inputs are required") + } + + if !c.Outputs.File.Enabled && !c.Outputs.ClickHouse.Enabled && !c.Outputs.Stdout.Enabled { + return fmt.Errorf("at least one output must be enabled") + } + + if c.Outputs.ClickHouse.Enabled && c.Outputs.ClickHouse.DSN == "" { + return fmt.Errorf("clickhouse DSN is required when enabled") + } + + return nil +} + +// GetTimeWindow returns the time window as a duration. +func (c *CorrelationConfig) GetTimeWindow() time.Duration { + value := c.TimeWindow.Value + if value <= 0 { + value = 1 + } + + unit := c.TimeWindow.Unit + if unit == "" { + unit = "s" + } + + switch unit { + case "ms", "millisecond", "milliseconds": + return time.Duration(value) * time.Millisecond + case "s", "second", "seconds": + return time.Duration(value) * time.Second + case "m", "minute", "minutes": + return time.Duration(value) * time.Minute + default: + return time.Duration(value) * time.Second + } +} diff --git a/internal/config/config_test.go b/internal/config/config_test.go new file mode 100644 index 0000000..c6598e1 --- /dev/null +++ b/internal/config/config_test.go @@ -0,0 +1,224 @@ +package config + +import ( + "os" + "path/filepath" + "testing" + "time" +) + +func TestLoad_ValidConfig(t *testing.T) { + content := ` +# Test configuration +service.name logcorrelator +service.language go + +input.unix_socket apache_source /var/run/logcorrelator/apache.sock json +input.unix_socket network_source /var/run/logcorrelator/network.sock json + +output.file.enabled true +output.file.path /var/log/logcorrelator/correlated.log + +output.clickhouse.enabled false +output.clickhouse.dsn clickhouse://user:pass@localhost:9000/db +output.clickhouse.table correlated_logs + +correlation.key src_ip,src_port +correlation.time_window.value 1 +correlation.time_window.unit s + +correlation.orphan_policy.apache_always_emit true 
+correlation.orphan_policy.network_emit false +` + + tmpDir := t.TempDir() + configPath := filepath.Join(tmpDir, "config.conf") + if err := os.WriteFile(configPath, []byte(content), 0644); err != nil { + t.Fatalf("failed to write config: %v", err) + } + + cfg, err := Load(configPath) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if cfg.Service.Name != "logcorrelator" { + t.Errorf("expected service name logcorrelator, got %s", cfg.Service.Name) + } + if len(cfg.Inputs.UnixSockets) != 2 { + t.Errorf("expected 2 unix sockets, got %d", len(cfg.Inputs.UnixSockets)) + } + if !cfg.Outputs.File.Enabled { + t.Error("expected file output enabled") + } +} + +func TestLoad_InvalidPath(t *testing.T) { + _, err := Load("/nonexistent/path/config.conf") + if err == nil { + t.Error("expected error for nonexistent path") + } +} + +func TestLoad_InvalidDirective(t *testing.T) { + tmpDir := t.TempDir() + configPath := filepath.Join(tmpDir, "config.conf") + content := `invalid.directive value` + if err := os.WriteFile(configPath, []byte(content), 0644); err != nil { + t.Fatalf("failed to write config: %v", err) + } + + _, err := Load(configPath) + if err == nil { + t.Error("expected error for invalid directive") + } +} + +func TestLoad_Comments(t *testing.T) { + tmpDir := t.TempDir() + configPath := filepath.Join(tmpDir, "config.conf") + content := ` +# This is a comment +service.name logcorrelator +# Another comment +input.unix_socket test /tmp/test.sock json +input.unix_socket test2 /tmp/test2.sock json +output.file.enabled true +` + if err := os.WriteFile(configPath, []byte(content), 0644); err != nil { + t.Fatalf("failed to write config: %v", err) + } + + cfg, err := Load(configPath) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if cfg.Service.Name != "logcorrelator" { + t.Errorf("expected service name logcorrelator, got %s", cfg.Service.Name) + } +} + +func TestValidate_MinimumInputs(t *testing.T) { + cfg := &Config{ + Inputs: InputsConfig{ 
+ UnixSockets: []UnixSocketConfig{ + {Name: "only_one", Path: "/tmp/test.sock"}, + }, + }, + Outputs: OutputsConfig{ + File: FileOutputConfig{Enabled: true}, + }, + } + + err := cfg.Validate() + if err == nil { + t.Error("expected error for less than 2 inputs") + } +} + +func TestValidate_AtLeastOneOutput(t *testing.T) { + cfg := &Config{ + Inputs: InputsConfig{ + UnixSockets: []UnixSocketConfig{ + {Name: "a", Path: "/tmp/a.sock"}, + {Name: "b", Path: "/tmp/b.sock"}, + }, + }, + Outputs: OutputsConfig{ + File: FileOutputConfig{Enabled: false}, + ClickHouse: ClickHouseOutputConfig{Enabled: false}, + Stdout: StdoutOutputConfig{Enabled: false}, + }, + } + + err := cfg.Validate() + if err == nil { + t.Error("expected error for no outputs enabled") + } +} + +func TestGetTimeWindow(t *testing.T) { + tests := []struct { + name string + config CorrelationConfig + expected time.Duration + }{ + { + name: "seconds", + config: CorrelationConfig{ + TimeWindow: TimeWindowConfig{Value: 1, Unit: "s"}, + }, + expected: time.Second, + }, + { + name: "milliseconds", + config: CorrelationConfig{ + TimeWindow: TimeWindowConfig{Value: 500, Unit: "ms"}, + }, + expected: 500 * time.Millisecond, + }, + { + name: "minutes", + config: CorrelationConfig{ + TimeWindow: TimeWindowConfig{Value: 2, Unit: "m"}, + }, + expected: 2 * time.Minute, + }, + { + name: "default", + config: CorrelationConfig{ + TimeWindow: TimeWindowConfig{}, + }, + expected: time.Second, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := tt.config.GetTimeWindow() + if result != tt.expected { + t.Errorf("expected %v, got %v", tt.expected, result) + } + }) + } +} + +func TestParseBool(t *testing.T) { + tests := []struct { + input string + expected bool + hasError bool + }{ + {"true", true, false}, + {"True", true, false}, + {"TRUE", true, false}, + {"yes", true, false}, + {"1", true, false}, + {"on", true, false}, + {"false", false, false}, + {"False", false, false}, + {"no", false, 
false}, + {"0", false, false}, + {"off", false, false}, + {"invalid", false, true}, + } + + for _, tt := range tests { + t.Run(tt.input, func(t *testing.T) { + result, err := parseBool(tt.input) + if tt.hasError { + if err == nil { + t.Error("expected error, got nil") + } + } else { + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if result != tt.expected { + t.Errorf("expected %v, got %v", tt.expected, result) + } + } + }) + } +} diff --git a/internal/domain/correlated_log.go b/internal/domain/correlated_log.go new file mode 100644 index 0000000..b4143e8 --- /dev/null +++ b/internal/domain/correlated_log.go @@ -0,0 +1,90 @@ +package domain + +import "time" + +// CorrelatedLog represents the output correlated log entry. +type CorrelatedLog struct { + Timestamp time.Time `json:"timestamp"` + SrcIP string `json:"src_ip"` + SrcPort int `json:"src_port"` + DstIP string `json:"dst_ip,omitempty"` + DstPort int `json:"dst_port,omitempty"` + Correlated bool `json:"correlated"` + OrphanSide string `json:"orphan_side,omitempty"` + Apache map[string]any `json:"apache,omitempty"` + Network map[string]any `json:"network,omitempty"` + Extra map[string]any `json:"extra,omitempty"` +} + +// NewCorrelatedLogFromEvent creates a correlated log from a single event (orphan). +func NewCorrelatedLogFromEvent(event *NormalizedEvent, orphanSide string) CorrelatedLog { + return CorrelatedLog{ + Timestamp: event.Timestamp, + SrcIP: event.SrcIP, + SrcPort: event.SrcPort, + DstIP: event.DstIP, + DstPort: event.DstPort, + Correlated: false, + OrphanSide: orphanSide, + Apache: extractApache(event), + Network: extractNetwork(event), + Extra: make(map[string]any), + } +} + +// NewCorrelatedLog creates a correlated log from two matched events. 
+func NewCorrelatedLog(apacheEvent, networkEvent *NormalizedEvent) CorrelatedLog { + ts := apacheEvent.Timestamp + if networkEvent.Timestamp.After(ts) { + ts = networkEvent.Timestamp + } + + return CorrelatedLog{ + Timestamp: ts, + SrcIP: apacheEvent.SrcIP, + SrcPort: apacheEvent.SrcPort, + DstIP: coalesceString(apacheEvent.DstIP, networkEvent.DstIP), + DstPort: coalesceInt(apacheEvent.DstPort, networkEvent.DstPort), + Correlated: true, + OrphanSide: "", + Apache: extractApache(apacheEvent), + Network: extractNetwork(networkEvent), + Extra: make(map[string]any), + } +} + +func extractApache(e *NormalizedEvent) map[string]any { + if e.Source != SourceA { + return nil + } + result := make(map[string]any) + for k, v := range e.Raw { + result[k] = v + } + return result +} + +func extractNetwork(e *NormalizedEvent) map[string]any { + if e.Source != SourceB { + return nil + } + result := make(map[string]any) + for k, v := range e.Raw { + result[k] = v + } + return result +} + +func coalesceString(a, b string) string { + if a != "" { + return a + } + return b +} + +func coalesceInt(a, b int) int { + if a != 0 { + return a + } + return b +} diff --git a/internal/domain/correlated_log_test.go b/internal/domain/correlated_log_test.go new file mode 100644 index 0000000..6284c5c --- /dev/null +++ b/internal/domain/correlated_log_test.go @@ -0,0 +1,115 @@ +package domain + +import ( + "testing" + "time" +) + +func TestNormalizedEvent_CorrelationKeyFull(t *testing.T) { + tests := []struct { + name string + event *NormalizedEvent + expected string + }{ + { + name: "basic key", + event: &NormalizedEvent{ + SrcIP: "192.168.1.1", + SrcPort: 8080, + }, + expected: "192.168.1.1:8080", + }, + { + name: "different port", + event: &NormalizedEvent{ + SrcIP: "10.0.0.1", + SrcPort: 443, + }, + expected: "10.0.0.1:443", + }, + { + name: "port zero", + event: &NormalizedEvent{ + SrcIP: "127.0.0.1", + SrcPort: 0, + }, + expected: "127.0.0.1:0", + }, + } + + for _, tt := range tests { + 
t.Run(tt.name, func(t *testing.T) { + key := tt.event.CorrelationKeyFull() + if key != tt.expected { + t.Errorf("expected %s, got %s", tt.expected, key) + } + }) + } +} + +func TestNewCorrelatedLogFromEvent(t *testing.T) { + event := &NormalizedEvent{ + Source: SourceA, + Timestamp: time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC), + SrcIP: "192.168.1.1", + SrcPort: 8080, + DstIP: "10.0.0.1", + DstPort: 80, + Raw: map[string]any{ + "method": "GET", + "path": "/api/test", + }, + } + + log := NewCorrelatedLogFromEvent(event, "A") + + if log.Correlated { + t.Error("expected correlated to be false") + } + if log.OrphanSide != "A" { + t.Errorf("expected orphan_side A, got %s", log.OrphanSide) + } + if log.SrcIP != "192.168.1.1" { + t.Errorf("expected src_ip 192.168.1.1, got %s", log.SrcIP) + } + if log.Apache == nil { + t.Error("expected apache to be non-nil") + } +} + +func TestNewCorrelatedLog(t *testing.T) { + apacheEvent := &NormalizedEvent{ + Source: SourceA, + Timestamp: time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC), + SrcIP: "192.168.1.1", + SrcPort: 8080, + DstIP: "10.0.0.1", + DstPort: 80, + Raw: map[string]any{"method": "GET"}, + } + + networkEvent := &NormalizedEvent{ + Source: SourceB, + Timestamp: time.Date(2024, 1, 1, 12, 0, 0, 500000000, time.UTC), + SrcIP: "192.168.1.1", + SrcPort: 8080, + DstIP: "10.0.0.1", + DstPort: 80, + Raw: map[string]any{"ja3": "abc123"}, + } + + log := NewCorrelatedLog(apacheEvent, networkEvent) + + if !log.Correlated { + t.Error("expected correlated to be true") + } + if log.OrphanSide != "" { + t.Errorf("expected orphan_side to be empty, got %s", log.OrphanSide) + } + if log.Apache == nil { + t.Error("expected apache to be non-nil") + } + if log.Network == nil { + t.Error("expected network to be non-nil") + } +} diff --git a/internal/domain/correlation_service.go b/internal/domain/correlation_service.go new file mode 100644 index 0000000..dfac022 --- /dev/null +++ b/internal/domain/correlation_service.go @@ -0,0 +1,243 @@ +package 
domain + +import ( + "container/list" + "sync" + "time" +) + +const ( + // DefaultMaxBufferSize is the default maximum number of events per buffer + DefaultMaxBufferSize = 10000 +) + +// CorrelationConfig holds the correlation configuration. +type CorrelationConfig struct { + TimeWindow time.Duration + ApacheAlwaysEmit bool + NetworkEmit bool + MaxBufferSize int // Maximum events to buffer per source +} + +// CorrelationService handles the correlation logic between source A and B events. +type CorrelationService struct { + config CorrelationConfig + mu sync.Mutex + bufferA *eventBuffer + bufferB *eventBuffer + pendingA map[string]*list.Element // key -> list element containing NormalizedEvent + pendingB map[string]*list.Element + timeProvider TimeProvider +} + +type eventBuffer struct { + events *list.List +} + +func newEventBuffer() *eventBuffer { + return &eventBuffer{ + events: list.New(), + } +} + +// TimeProvider abstracts time for testability. +type TimeProvider interface { + Now() time.Time +} + +// RealTimeProvider uses real system time. +type RealTimeProvider struct{} + +func (p *RealTimeProvider) Now() time.Time { + return time.Now() +} + +// NewCorrelationService creates a new correlation service. +func NewCorrelationService(config CorrelationConfig, timeProvider TimeProvider) *CorrelationService { + if timeProvider == nil { + timeProvider = &RealTimeProvider{} + } + if config.MaxBufferSize <= 0 { + config.MaxBufferSize = DefaultMaxBufferSize + } + return &CorrelationService{ + config: config, + bufferA: newEventBuffer(), + bufferB: newEventBuffer(), + pendingA: make(map[string]*list.Element), + pendingB: make(map[string]*list.Element), + timeProvider: timeProvider, + } +} + +// ProcessEvent processes an incoming event and returns correlated logs if matches are found. 
+func (s *CorrelationService) ProcessEvent(event *NormalizedEvent) []CorrelatedLog { + s.mu.Lock() + defer s.mu.Unlock() + + // Clean expired events first + s.cleanExpired() + + // Check buffer overflow before adding + if s.isBufferFull(event.Source) { + // Buffer full, drop event or emit as orphan + if event.Source == SourceA && s.config.ApacheAlwaysEmit { + return []CorrelatedLog{NewCorrelatedLogFromEvent(event, "A")} + } + return nil + } + + var results []CorrelatedLog + + switch event.Source { + case SourceA: + results = s.processSourceA(event) + case SourceB: + results = s.processSourceB(event) + } + + // Add the new event to the appropriate buffer + s.addEvent(event) + + return results +} + +func (s *CorrelationService) isBufferFull(source EventSource) bool { + switch source { + case SourceA: + return s.bufferA.events.Len() >= s.config.MaxBufferSize + case SourceB: + return s.bufferB.events.Len() >= s.config.MaxBufferSize + } + return false +} + +func (s *CorrelationService) processSourceA(event *NormalizedEvent) []CorrelatedLog { + key := event.CorrelationKeyFull() + + // Look for a matching B event + if elem, ok := s.pendingB[key]; ok { + bEvent := elem.Value.(*NormalizedEvent) + if s.eventsMatch(event, bEvent) { + // Found a match! + correlated := NewCorrelatedLog(event, bEvent) + s.bufferB.events.Remove(elem) + delete(s.pendingB, key) + return []CorrelatedLog{correlated} + } + } + + // No match found + if s.config.ApacheAlwaysEmit { + orphan := NewCorrelatedLogFromEvent(event, "A") + return []CorrelatedLog{orphan} + } + + // Keep in buffer for potential future match + return nil +} + +func (s *CorrelationService) processSourceB(event *NormalizedEvent) []CorrelatedLog { + key := event.CorrelationKeyFull() + + // Look for a matching A event + if elem, ok := s.pendingA[key]; ok { + aEvent := elem.Value.(*NormalizedEvent) + if s.eventsMatch(aEvent, event) { + // Found a match! 
+ correlated := NewCorrelatedLog(aEvent, event) + s.bufferA.events.Remove(elem) + delete(s.pendingA, key) + return []CorrelatedLog{correlated} + } + } + + // No match found - B is never emitted alone per spec + if s.config.NetworkEmit { + orphan := NewCorrelatedLogFromEvent(event, "B") + return []CorrelatedLog{orphan} + } + + // Keep in buffer for potential future match (but won't be emitted alone) + return nil +} + +func (s *CorrelationService) eventsMatch(a, b *NormalizedEvent) bool { + diff := a.Timestamp.Sub(b.Timestamp) + if diff < 0 { + diff = -diff + } + return diff <= s.config.TimeWindow +} + +func (s *CorrelationService) addEvent(event *NormalizedEvent) { + key := event.CorrelationKeyFull() + + switch event.Source { + case SourceA: + elem := s.bufferA.events.PushBack(event) + s.pendingA[key] = elem + case SourceB: + elem := s.bufferB.events.PushBack(event) + s.pendingB[key] = elem + } +} + +func (s *CorrelationService) cleanExpired() { + now := s.timeProvider.Now() + cutoff := now.Add(-s.config.TimeWindow) + + // Clean expired events from both buffers using shared logic + s.cleanBuffer(s.bufferA, s.pendingA, cutoff) + s.cleanBuffer(s.bufferB, s.pendingB, cutoff) +} + +// cleanBuffer removes expired events from a buffer (shared logic for A and B). +func (s *CorrelationService) cleanBuffer(buffer *eventBuffer, pending map[string]*list.Element, cutoff time.Time) { + for elem := buffer.events.Front(); elem != nil; { + event := elem.Value.(*NormalizedEvent) + if event.Timestamp.Before(cutoff) { + next := elem.Next() + key := event.CorrelationKeyFull() + buffer.events.Remove(elem) + if pending[key] == elem { + delete(pending, key) + } + elem = next + } else { + break // Events are ordered, so we can stop early + } + } +} + +// Flush forces emission of remaining buffered events (for shutdown). 
+func (s *CorrelationService) Flush() []CorrelatedLog { + s.mu.Lock() + defer s.mu.Unlock() + + var results []CorrelatedLog + + // Emit remaining A events as orphans if configured + if s.config.ApacheAlwaysEmit { + for elem := s.bufferA.events.Front(); elem != nil; elem = elem.Next() { + event := elem.Value.(*NormalizedEvent) + orphan := NewCorrelatedLogFromEvent(event, "A") + results = append(results, orphan) + } + } + + // Clear buffers + s.bufferA.events.Init() + s.bufferB.events.Init() + s.pendingA = make(map[string]*list.Element) + s.pendingB = make(map[string]*list.Element) + + return results +} + +// GetBufferSizes returns the current buffer sizes (for monitoring). +func (s *CorrelationService) GetBufferSizes() (int, int) { + s.mu.Lock() + defer s.mu.Unlock() + return s.bufferA.events.Len(), s.bufferB.events.Len() +} diff --git a/internal/domain/correlation_service_test.go b/internal/domain/correlation_service_test.go new file mode 100644 index 0000000..ffc03d6 --- /dev/null +++ b/internal/domain/correlation_service_test.go @@ -0,0 +1,153 @@ +package domain + +import ( + "testing" + "time" +) + +type mockTimeProvider struct { + now time.Time +} + +func (m *mockTimeProvider) Now() time.Time { + return m.now +} + +func TestCorrelationService_Match(t *testing.T) { + now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) + timeProvider := &mockTimeProvider{now: now} + + config := CorrelationConfig{ + TimeWindow: time.Second, + ApacheAlwaysEmit: false, // Don't emit A immediately to test matching + NetworkEmit: false, + } + + svc := NewCorrelationService(config, timeProvider) + + // Send Apache event (should be buffered, not emitted) + apacheEvent := &NormalizedEvent{ + Source: SourceA, + Timestamp: now, + SrcIP: "192.168.1.1", + SrcPort: 8080, + Raw: map[string]any{"method": "GET"}, + } + + results := svc.ProcessEvent(apacheEvent) + if len(results) != 0 { + t.Fatalf("expected 0 results (buffered), got %d", len(results)) + } + + // Send matching Network event within 
window + networkEvent := &NormalizedEvent{ + Source: SourceB, + Timestamp: now.Add(500 * time.Millisecond), + SrcIP: "192.168.1.1", + SrcPort: 8080, + Raw: map[string]any{"ja3": "abc"}, + } + + // This should match and return correlated log + results = svc.ProcessEvent(networkEvent) + if len(results) != 1 { + t.Errorf("expected 1 result (correlated), got %d", len(results)) + } else if !results[0].Correlated { + t.Error("expected correlated result") + } +} + +func TestCorrelationService_NoMatch_DifferentIP(t *testing.T) { + now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) + timeProvider := &mockTimeProvider{now: now} + + config := CorrelationConfig{ + TimeWindow: time.Second, + ApacheAlwaysEmit: true, + NetworkEmit: false, + } + + svc := NewCorrelationService(config, timeProvider) + + apacheEvent := &NormalizedEvent{ + Source: SourceA, + Timestamp: now, + SrcIP: "192.168.1.1", + SrcPort: 8080, + } + + networkEvent := &NormalizedEvent{ + Source: SourceB, + Timestamp: now, + SrcIP: "192.168.1.2", // Different IP + SrcPort: 8080, + } + + svc.ProcessEvent(apacheEvent) + results := svc.ProcessEvent(networkEvent) + + if len(results) != 0 { + t.Errorf("expected 0 results (different IP), got %d", len(results)) + } +} + +func TestCorrelationService_NoMatch_TimeWindowExceeded(t *testing.T) { + now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) + timeProvider := &mockTimeProvider{now: now} + + config := CorrelationConfig{ + TimeWindow: time.Second, + ApacheAlwaysEmit: true, + NetworkEmit: false, + } + + svc := NewCorrelationService(config, timeProvider) + + apacheEvent := &NormalizedEvent{ + Source: SourceA, + Timestamp: now, + SrcIP: "192.168.1.1", + SrcPort: 8080, + } + + networkEvent := &NormalizedEvent{ + Source: SourceB, + Timestamp: now.Add(2 * time.Second), // Outside window + SrcIP: "192.168.1.1", + SrcPort: 8080, + } + + svc.ProcessEvent(apacheEvent) + results := svc.ProcessEvent(networkEvent) + + if len(results) != 0 { + t.Errorf("expected 0 results (time window 
exceeded), got %d", len(results)) + } +} + +func TestCorrelationService_Flush(t *testing.T) { + now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) + timeProvider := &mockTimeProvider{now: now} + + config := CorrelationConfig{ + TimeWindow: time.Second, + ApacheAlwaysEmit: true, + NetworkEmit: false, + } + + svc := NewCorrelationService(config, timeProvider) + + apacheEvent := &NormalizedEvent{ + Source: SourceA, + Timestamp: now, + SrcIP: "192.168.1.1", + SrcPort: 8080, + } + + svc.ProcessEvent(apacheEvent) + + flushed := svc.Flush() + if len(flushed) != 1 { + t.Errorf("expected 1 flushed event, got %d", len(flushed)) + } +} diff --git a/internal/domain/event.go b/internal/domain/event.go new file mode 100644 index 0000000..bbb3b21 --- /dev/null +++ b/internal/domain/event.go @@ -0,0 +1,37 @@ +package domain + +import ( + "strconv" + "time" +) + +// EventSource identifies the source of an event. +type EventSource string + +const ( + SourceA EventSource = "A" // Apache/HTTP source + SourceB EventSource = "B" // Network source +) + +// NormalizedEvent represents a unified internal event from either source. +type NormalizedEvent struct { + Source EventSource + Timestamp time.Time + SrcIP string + SrcPort int + DstIP string + DstPort int + Headers map[string]string + Extra map[string]any + Raw map[string]any // Original raw data +} + +// CorrelationKey returns the key used for correlation (src_ip + src_port). +func (e *NormalizedEvent) CorrelationKey() string { + return e.SrcIP + ":" + strconv.Itoa(e.SrcPort) +} + +// CorrelationKeyFull returns a proper correlation key (alias for clarity). 
+func (e *NormalizedEvent) CorrelationKeyFull() string { + return e.CorrelationKey() +} diff --git a/internal/observability/logger.go b/internal/observability/logger.go new file mode 100644 index 0000000..836536c --- /dev/null +++ b/internal/observability/logger.go @@ -0,0 +1,85 @@ +package observability + +import ( + "log" + "os" + "sync" +) + +// Logger provides structured logging. +type Logger struct { + mu sync.Mutex + logger *log.Logger + prefix string + fields map[string]any +} + +// NewLogger creates a new logger. +func NewLogger(prefix string) *Logger { + return &Logger{ + logger: log.New(os.Stderr, "", log.LstdFlags|log.Lmicroseconds), + prefix: prefix, + fields: make(map[string]any), + } +} + +// WithFields returns a new logger with additional fields. +func (l *Logger) WithFields(fields map[string]any) *Logger { + newLogger := &Logger{ + logger: l.logger, + prefix: l.prefix, + fields: make(map[string]any), + } + for k, v := range l.fields { + newLogger.fields[k] = v + } + for k, v := range fields { + newLogger.fields[k] = v + } + return newLogger +} + +// Info logs an info message. +func (l *Logger) Info(msg string) { + l.mu.Lock() + defer l.mu.Unlock() + l.log("INFO", msg) +} + +// Error logs an error message. +func (l *Logger) Error(msg string, err error) { + l.mu.Lock() + defer l.mu.Unlock() + if err != nil { + l.log("ERROR", msg+" "+err.Error()) + } else { + l.log("ERROR", msg) + } +} + +// Debug logs a debug message. +func (l *Logger) Debug(msg string) { + l.mu.Lock() + defer l.mu.Unlock() + l.log("DEBUG", msg) +} + +func (l *Logger) log(level, msg string) { + prefix := l.prefix + if prefix != "" { + prefix = "[" + prefix + "] " + } + + l.logger.SetPrefix(prefix + level + " ") + + var args []any + for k, v := range l.fields { + args = append(args, k, v) + } + + if len(args) > 0 { + l.logger.Printf(msg+" %+v", args...) 
+ } else { + l.logger.Print(msg) + } +} diff --git a/internal/observability/logger_test.go b/internal/observability/logger_test.go new file mode 100644 index 0000000..cdc74fd --- /dev/null +++ b/internal/observability/logger_test.go @@ -0,0 +1,111 @@ +package observability + +import ( + "bytes" + "io" + "os" + "strings" + "testing" +) + +func TestNewLogger(t *testing.T) { + logger := NewLogger("test") + if logger == nil { + t.Fatal("expected non-nil logger") + } + if logger.prefix != "test" { + t.Errorf("expected prefix 'test', got %s", logger.prefix) + } +} + +func TestLogger_Info(t *testing.T) { + // Capture stderr + oldStderr := os.Stderr + r, w, _ := os.Pipe() + os.Stderr = w + + logger := NewLogger("test") + logger.Info("test message") + + w.Close() + os.Stderr = oldStderr + + var buf bytes.Buffer + io.Copy(&buf, r) + output := buf.String() + + if !strings.Contains(output, "INFO") { + t.Error("expected INFO in output") + } + if !strings.Contains(output, "test message") { + t.Error("expected 'test message' in output") + } +} + +func TestLogger_Error(t *testing.T) { + oldStderr := os.Stderr + r, w, _ := os.Pipe() + os.Stderr = w + + logger := NewLogger("test") + logger.Error("error message", nil) + + w.Close() + os.Stderr = oldStderr + + var buf bytes.Buffer + io.Copy(&buf, r) + output := buf.String() + + if !strings.Contains(output, "ERROR") { + t.Error("expected ERROR in output") + } + if !strings.Contains(output, "error message") { + t.Error("expected 'error message' in output") + } +} + +func TestLogger_Debug(t *testing.T) { + oldStderr := os.Stderr + r, w, _ := os.Pipe() + os.Stderr = w + + logger := NewLogger("test") + logger.Debug("debug message") + + w.Close() + os.Stderr = oldStderr + + var buf bytes.Buffer + io.Copy(&buf, r) + output := buf.String() + + if !strings.Contains(output, "DEBUG") { + t.Error("expected DEBUG in output") + } + if !strings.Contains(output, "debug message") { + t.Error("expected 'debug message' in output") + } +} + +func 
TestLogger_WithFields(t *testing.T) { + logger := NewLogger("test") + fieldsLogger := logger.WithFields(map[string]any{ + "key1": "value1", + "key2": 42, + }) + + if fieldsLogger == logger { + t.Error("expected different logger instance") + } + if len(fieldsLogger.fields) != 2 { + t.Errorf("expected 2 fields, got %d", len(fieldsLogger.fields)) + } +} + +func TestLogger_Name(t *testing.T) { + logger := NewLogger("myservice") + if logger.prefix != "myservice" { + t.Errorf("expected prefix 'myservice', got %s", logger.prefix) + } +} diff --git a/internal/ports/source.go b/internal/ports/source.go new file mode 100644 index 0000000..42fd2f5 --- /dev/null +++ b/internal/ports/source.go @@ -0,0 +1,54 @@ +package ports + +import ( + "context" + "time" + + "github.com/logcorrelator/logcorrelator/internal/domain" +) + +// EventSource defines the interface for log sources. +type EventSource interface { + // Start begins reading events and sending them to the channel. + // Returns an error if the source cannot be started. + Start(ctx context.Context, eventChan chan<- *domain.NormalizedEvent) error + + // Stop gracefully stops the source. + Stop() error + + // Name returns the source name. + Name() string +} + +// CorrelatedLogSink defines the interface for correlated log destinations. +type CorrelatedLogSink interface { + // Write sends a correlated log to the sink. + Write(ctx context.Context, log domain.CorrelatedLog) error + + // Flush flushes any buffered logs. + Flush(ctx context.Context) error + + // Close closes the sink. + Close() error + + // Name returns the sink name. + Name() string +} + +// TimeProvider abstracts time for testability. +type TimeProvider interface { + Now() time.Time +} + +// CorrelationProcessor defines the interface for the correlation service. +// This allows for easier testing and alternative implementations. +type CorrelationProcessor interface { + // ProcessEvent processes an incoming event and returns correlated logs. 
+ ProcessEvent(event *domain.NormalizedEvent) []domain.CorrelatedLog + + // Flush forces emission of remaining buffered events. + Flush() []domain.CorrelatedLog + + // GetBufferSizes returns the current buffer sizes for monitoring. + GetBufferSizes() (int, int) +} diff --git a/logcorrelator.service b/logcorrelator.service new file mode 100644 index 0000000..50484df --- /dev/null +++ b/logcorrelator.service @@ -0,0 +1,23 @@ +[Unit] +Description=logcorrelator service +After=network.target + +[Service] +Type=simple +User=logcorrelator +Group=logcorrelator +ExecStart=/usr/bin/logcorrelator -config /etc/logcorrelator/logcorrelator.conf +Restart=on-failure +RestartSec=5 + +# Security hardening +NoNewPrivileges=true +ProtectSystem=strict +ProtectHome=true +ReadWritePaths=/var/log/logcorrelator /var/run/logcorrelator + +# Resource limits +LimitNOFILE=65536 + +[Install] +WantedBy=multi-user.target diff --git a/packaging/deb/postinst b/packaging/deb/postinst new file mode 100644 index 0000000..212c942 --- /dev/null +++ b/packaging/deb/postinst @@ -0,0 +1,66 @@ +#!/bin/bash +set -e + +# postinst script for logcorrelator .deb package + +case "$1" in + configure) + # Create logcorrelator user and group if they don't exist + if ! getent group logcorrelator > /dev/null 2>&1; then + groupadd --system logcorrelator + fi + + if ! 
getent passwd logcorrelator > /dev/null 2>&1; then + useradd --system \ + --gid logcorrelator \ + --home-dir /var/lib/logcorrelator \ + --no-create-home \ + --shell /usr/sbin/nologin \ + logcorrelator + fi + + # Create necessary directories + mkdir -p /var/lib/logcorrelator + mkdir -p /var/run/logcorrelator + mkdir -p /var/log/logcorrelator + mkdir -p /etc/logcorrelator + + # Set proper ownership + chown -R logcorrelator:logcorrelator /var/lib/logcorrelator + chown -R logcorrelator:logcorrelator /var/run/logcorrelator + chown -R logcorrelator:logcorrelator /var/log/logcorrelator + chown -R logcorrelator:logcorrelator /etc/logcorrelator + + # Set proper permissions + chmod 750 /var/lib/logcorrelator + chmod 750 /var/log/logcorrelator + chmod 750 /etc/logcorrelator + + # Install default config if it doesn't exist + if [ ! -f /etc/logcorrelator/logcorrelator.conf ]; then + cp /usr/share/logcorrelator/logcorrelator.conf.example /etc/logcorrelator/logcorrelator.conf + chown logcorrelator:logcorrelator /etc/logcorrelator/logcorrelator.conf + chmod 640 /etc/logcorrelator/logcorrelator.conf + fi + + # Enable and start the service (if running in a real system, not container) + if [ -x /bin/systemctl ] && [ -d /run/systemd/system ]; then + systemctl daemon-reload + systemctl enable logcorrelator.service + if ! 
systemctl is-active --quiet logcorrelator.service; then + systemctl start logcorrelator.service + fi + fi + ;; + + abort-upgrade|abort-remove|abort-deconfigure) + # On abort, do nothing special + ;; + + *) + echo "postinst called with unknown argument '$1'" >&2 + exit 1 + ;; +esac + +exit 0 diff --git a/packaging/deb/postrm b/packaging/deb/postrm new file mode 100644 index 0000000..f796389 --- /dev/null +++ b/packaging/deb/postrm @@ -0,0 +1,52 @@ +#!/bin/bash +set -e + +# postrm script for logcorrelator .deb package + +case "$1" in + remove) + # On remove, leave config and data files + ;; + + purge) + # On purge, remove everything + + # Stop service if running + if [ -x /bin/systemctl ] && [ -d /run/systemd/system ]; then + systemctl stop logcorrelator.service 2>/dev/null || true + systemctl disable logcorrelator.service 2>/dev/null || true + systemctl daemon-reload + fi + + # Remove configuration + rm -rf /etc/logcorrelator + + # Remove data and logs + rm -rf /var/lib/logcorrelator + rm -rf /var/log/logcorrelator + rm -rf /var/run/logcorrelator + + # Remove user and group + if getent passwd logcorrelator > /dev/null 2>&1; then + userdel logcorrelator 2>/dev/null || true + fi + + if getent group logcorrelator > /dev/null 2>&1; then + groupdel logcorrelator 2>/dev/null || true + fi + ;; + + abort-upgrade|abort-remove|abort-deconfigure) + # On abort, restart the service + if [ -x /bin/systemctl ] && [ -d /run/systemd/system ]; then + systemctl start logcorrelator.service 2>/dev/null || true + fi + ;; + + *) + echo "postrm called with unknown argument '$1'" >&2 + exit 1 + ;; +esac + +exit 0 diff --git a/packaging/deb/prerm b/packaging/deb/prerm new file mode 100644 index 0000000..a82dc01 --- /dev/null +++ b/packaging/deb/prerm @@ -0,0 +1,29 @@ +#!/bin/bash +set -e + +# prerm script for logcorrelator .deb package + +case "$1" in + remove|deconfigure) + # Stop and disable the service + if [ -x /bin/systemctl ] && [ -d /run/systemd/system ]; then + systemctl stop 
logcorrelator.service 2>/dev/null || true + systemctl disable logcorrelator.service 2>/dev/null || true + systemctl daemon-reload + fi + ;; + + upgrade) + # On upgrade, just stop the service (will be restarted by postinst) + if [ -x /bin/systemctl ] && [ -d /run/systemd/system ]; then + systemctl stop logcorrelator.service 2>/dev/null || true + fi + ;; + + *) + echo "prerm called with unknown argument '$1'" >&2 + exit 1 + ;; +esac + +exit 0 diff --git a/test.sh b/test.sh new file mode 100755 index 0000000..2a2f515 --- /dev/null +++ b/test.sh @@ -0,0 +1,21 @@ +#!/bin/bash +# Test script - runs all tests in Docker container +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +cd "$SCRIPT_DIR" + +echo "==============================================" +echo " logcorrelator - Test Suite (Docker)" +echo "==============================================" +echo "" + +# Build test image and run tests +docker build \ + --target builder \ + -t logcorrelator-test:latest \ + -f Dockerfile . + +echo "" +echo "Tests completed successfully!" +echo ""