From d8ca804a5591a7cf12062db12c1f634590007b4a Mon Sep 17 00:00:00 2001 From: toto Date: Thu, 9 Apr 2026 15:41:38 +0200 Subject: [PATCH] =?UTF-8?q?feat(scripts):=20add=20reload-prod-logs.sh=20fo?= =?UTF-8?q?r=20prod=E2=86=92dev=20data=20sync?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Exports http_logs from prod ClickHouse via HTTP API, imports into dev with dynamic date shifting (max(time) → now() by default). Features: - Batch export in Native format (200K rows/batch, ~10s each) - Auto date shift: prod max(time) aligned to current time - --shift N: manual override (seconds) - --days N: filter to last N days only - --cron: silent mode for scheduled runs - Staging table approach: export → staging → INSERT SELECT with shift → cleanup Tested: 3,054,122 rows imported in ~3 minutes, dates 2026-04-03→2026-04-09. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- Makefile | 7 +- scripts/reload-prod-logs.sh | 221 ++++++++++++++++++++++++++++++++++++ 2 files changed, 227 insertions(+), 1 deletion(-) create mode 100755 scripts/reload-prod-logs.sh diff --git a/Makefile b/Makefile index ec0a4b7..690042d 100644 --- a/Makefile +++ b/Makefile @@ -11,7 +11,8 @@ build-correlator test-correlator rpm-correlator \ build-bot-detector test-bot-detector \ build-dashboard test-dashboard \ - test-ja4common-python + test-ja4common-python \ + reload-prod-logs # --- Root ------------------------------------------------------------------- @@ -139,3 +140,7 @@ test-integration-keep: test-integration-down: cd tests/integration && docker compose down -v --remove-orphans + +# ── Dev data ───────────────────────────────────────────────────────────────── +reload-prod-logs: +./scripts/reload-prod-logs.sh diff --git a/scripts/reload-prod-logs.sh b/scripts/reload-prod-logs.sh new file mode 100755 index 0000000..6d7e498 --- /dev/null +++ b/scripts/reload-prod-logs.sh @@ -0,0 +1,221 @@ +#!/usr/bin/env bash +# ============================================================================= +# reload-prod-logs.sh — Exporte les http_logs de prod et les réimporte dans la +# DB de dev avec un décalage de date dynamique. +# +# Usage: +# ./scripts/reload-prod-logs.sh # décalage auto (max(time) → now) +# ./scripts/reload-prod-logs.sh --shift 3600 # décalage de 3600 secondes +# ./scripts/reload-prod-logs.sh --days 7 # n'exporte que les N derniers jours +# ./scripts/reload-prod-logs.sh --cron # mode silencieux (pour crontab) +# +# Variables d'environnement : +# PROD_CH_HOST Host ClickHouse de prod (défaut: test-sdv-anubis.sdv.fr) +# PROD_CH_PORT Port HTTP de prod (défaut: 8123) +# PROD_CH_USER Utilisateur prod (défaut: admin) +# PROD_CH_PASSWORD Mot de passe prod (défaut: SuperPassword123!) +# PROD_CH_DB Base de données prod (défaut: mabase_prod) +# DEV_CONTAINER Nom du conteneur ClickHouse dev (défaut: integration-clickhouse-1) +# DEV_DB_LOGS Base de données dev logs (défaut: ja4_logs) +# BATCH_SIZE Nombre de lignes par batch (défaut: 200000) +# ============================================================================= +set -euo pipefail + +# ── Configuration ──────────────────────────────────────────────────────────── +PROD_CH_HOST="${PROD_CH_HOST:-test-sdv-anubis.sdv.fr}" +PROD_CH_PORT="${PROD_CH_PORT:-8123}" +PROD_CH_USER="${PROD_CH_USER:-admin}" +PROD_CH_PASSWORD="${PROD_CH_PASSWORD:-SuperPassword123!}" +PROD_CH_DB="${PROD_CH_DB:-mabase_prod}" + +DEV_CONTAINER="${DEV_CONTAINER:-integration-clickhouse-1}" +DEV_DB_LOGS="${DEV_DB_LOGS:-ja4_logs}" +BATCH_SIZE="${BATCH_SIZE:-200000}" + +SHIFT_SECONDS="" +DAYS_FILTER="" +CRON_MODE=false + +# ── Parsing des arguments ──────────────────────────────────────────────────── +while [[ $# -gt 0 ]]; do + case "$1" in + --shift) SHIFT_SECONDS="$2"; shift 2 ;; + --days) DAYS_FILTER="$2"; shift 2 ;; + --cron) CRON_MODE=true; shift ;; + -h|--help) + head -15 "$0" | grep -E '^#' | sed 's/^# \?//' + exit 0 + ;; + *) echo "Option inconnue : $1"; exit 1 ;; + esac +done + +TMPDIR="${TMPDIR:-/tmp}" +EXPORT_FILE="${TMPDIR}/prod_http_logs_export.native" +LOG_PREFIX="[reload-prod-logs]" + +log() { + if [ "$CRON_MODE" = false ]; then + echo "${LOG_PREFIX} $(date '+%H:%M:%S') $*" + fi +} + +err() { + echo "${LOG_PREFIX} ERREUR: $*" >&2 + exit 1 +} + +# ── Fonctions utilitaires ──────────────────────────────────────────────────── + +prod_query() { + # Exécute une requête sur le CH de prod et retourne le résultat + curl -sf \ + -H "X-ClickHouse-User: ${PROD_CH_USER}" \ + -H "X-ClickHouse-Key: ${PROD_CH_PASSWORD}" \ + "http://${PROD_CH_HOST}:${PROD_CH_PORT}/" \ + --data-binary "$1" +} + +prod_query_to_file() { + # Exécute une requête sur le CH de prod et écrit la sortie dans un fichier + curl -sf \ + -H "X-ClickHouse-User: ${PROD_CH_USER}" \ + -H "X-ClickHouse-Key: ${PROD_CH_PASSWORD}" \ + "http://${PROD_CH_HOST}:${PROD_CH_PORT}/" \ + --data-binary "$1" \ + -o "$2" +} + +dev_query() { + # Exécute une requête sur le CH de dev + docker exec -i "${DEV_CONTAINER}" clickhouse-client --query "$1" +} + +dev_insert_native() { + # Insère des données au format Native dans le CH de dev + docker exec -i "${DEV_CONTAINER}" clickhouse-client \ + --query "INSERT INTO ${DEV_DB_LOGS}._staging_prod_import FORMAT Native" < "$1" +} + +# ── Vérifications ──────────────────────────────────────────────────────────── + +log "Vérification de la connexion à prod (${PROD_CH_HOST})…" +PROD_COUNT=$(prod_query "SELECT count() FROM ${PROD_CH_DB}.http_logs") \ + || err "Impossible de contacter le ClickHouse de prod" +log " Prod : ${PROD_COUNT} lignes dans http_logs" + +log "Vérification du conteneur dev (${DEV_CONTAINER})…" +DEV_COUNT=$(dev_query "SELECT count() FROM ${DEV_DB_LOGS}.http_logs") \ + || err "Impossible de contacter le ClickHouse de dev" +log " Dev : ${DEV_COUNT} lignes dans http_logs" + +# ── Calcul du décalage de date ─────────────────────────────────────────────── + +if [ -z "${SHIFT_SECONDS}" ]; then + # Calcul automatique : on décale pour que max(time) de prod = now() + PROD_MAX_TS=$(prod_query "SELECT toUnixTimestamp(max(time)) FROM ${PROD_CH_DB}.http_logs") + NOW_TS=$(date +%s) + SHIFT_SECONDS=$(( NOW_TS - PROD_MAX_TS )) + log "Décalage auto : ${SHIFT_SECONDS}s (prod max → now)" +fi + +log "Décalage appliqué : ${SHIFT_SECONDS} secondes ($(( SHIFT_SECONDS / 3600 ))h)" + +# ── Filtre temporel ────────────────────────────────────────────────────────── + +WHERE_CLAUSE="" +if [ -n "${DAYS_FILTER}" ]; then + WHERE_CLAUSE="WHERE time >= now() - toIntervalDay(${DAYS_FILTER})" + log "Filtre : ${DAYS_FILTER} derniers jours" +fi + +# ── Création de la table de staging ────────────────────────────────────────── + +log "Création de la table de staging…" +dev_query "DROP TABLE IF EXISTS ${DEV_DB_LOGS}._staging_prod_import" +dev_query "CREATE TABLE ${DEV_DB_LOGS}._staging_prod_import AS ${DEV_DB_LOGS}.http_logs ENGINE = MergeTree() ORDER BY (time, src_ip) SETTINGS index_granularity = 8192" + +# ── Export par batch ───────────────────────────────────────────────────────── + +# Compter les lignes à exporter +EXPORT_COUNT=$(prod_query "SELECT count() FROM ${PROD_CH_DB}.http_logs ${WHERE_CLAUSE}") +log "Export de ${EXPORT_COUNT} lignes en batches de ${BATCH_SIZE}…" + +NUM_BATCHES=$(( (EXPORT_COUNT + BATCH_SIZE - 1) / BATCH_SIZE )) +TOTAL_IMPORTED=0 + +for (( i=0; i/dev/null || stat -f%z "${EXPORT_FILE}" 2>/dev/null || echo 0) + if [ "${FILE_SIZE}" -eq 0 ]; then + log " Batch vide, arrêt." + break + fi + + dev_insert_native "${EXPORT_FILE}" \ + || err "Import batch $((i+1)) échoué" + + BATCH_ROWS=$(dev_query "SELECT count() FROM ${DEV_DB_LOGS}._staging_prod_import" ) + BATCH_IMPORTED=$(( BATCH_ROWS - TOTAL_IMPORTED )) + TOTAL_IMPORTED=${BATCH_ROWS} + log " → ${BATCH_IMPORTED} lignes importées (total staging: ${TOTAL_IMPORTED})" + + rm -f "${EXPORT_FILE}" +done + +log "Staging terminé : ${TOTAL_IMPORTED} lignes." + +# ── Insertion avec décalage de date ────────────────────────────────────────── + +log "Nettoyage de la table cible (données de prod précédentes)…" +# On identifie les données de prod importées par le décalage — on tronque tout +# car les données de dev sont générées par le traffic seeder / intégration +dev_query "TRUNCATE TABLE ${DEV_DB_LOGS}.http_logs" + +log "Insertion avec décalage de ${SHIFT_SECONDS}s dans ${DEV_DB_LOGS}.http_logs…" +dev_query " +INSERT INTO ${DEV_DB_LOGS}.http_logs +SELECT + time + toIntervalSecond(${SHIFT_SECONDS}) AS time, + toDate(time + toIntervalSecond(${SHIFT_SECONDS})) AS log_date, + src_ip, src_port, dst_ip, dst_port, + src_asn, src_country_code, src_as_name, src_org, src_domain, + method, scheme, host, path, query, http_version, + orphan_side, correlated, keepalives, + a_timestamp, b_timestamp, conn_id, + ip_meta_df, ip_meta_id, ip_meta_total_length, ip_meta_ttl, + tcp_meta_options, tcp_meta_window_size, tcp_meta_mss, tcp_meta_window_scale, + syn_to_clienthello_ms, + tls_version, tls_sni, tls_alpn, ja3, ja3_hash, ja4, + client_headers, + header_user_agent, header_accept, header_accept_encoding, header_accept_language, + header_content_type, header_x_request_id, header_x_trace_id, header_x_forwarded_for, + header_sec_ch_ua, header_sec_ch_ua_mobile, header_sec_ch_ua_platform, + header_sec_fetch_dest, header_sec_fetch_mode, header_sec_fetch_site, + anubis_bot_name, anubis_bot_action, anubis_bot_category +FROM ${DEV_DB_LOGS}._staging_prod_import +" + +FINAL_COUNT=$(dev_query "SELECT count() FROM ${DEV_DB_LOGS}.http_logs") +FINAL_MIN=$(dev_query "SELECT min(time) FROM ${DEV_DB_LOGS}.http_logs") +FINAL_MAX=$(dev_query "SELECT max(time) FROM ${DEV_DB_LOGS}.http_logs") + +log "Nettoyage de la table de staging…" +dev_query "DROP TABLE IF EXISTS ${DEV_DB_LOGS}._staging_prod_import" +rm -f "${EXPORT_FILE}" + +# ── Résultat ───────────────────────────────────────────────────────────────── + +log "════════════════════════════════════════════════════" +log " Import terminé : ${FINAL_COUNT} lignes dans ${DEV_DB_LOGS}.http_logs" +log " Plage : ${FINAL_MIN} → ${FINAL_MAX}" +log " Décalage appliqué : ${SHIFT_SECONDS}s" +log "════════════════════════════════════════════════════"