feat(scripts): add reload-prod-logs.sh for prod→dev data sync

Exports http_logs from prod ClickHouse via HTTP API, imports into dev
with dynamic date shifting (max(time) → now() by default).

Features:
- Batch export in Native format (200K rows/batch, ~10s each)
- Auto date shift: prod max(time) aligned to current time
- --shift N: manual override (seconds)
- --days N: filter to last N days only
- --cron: silent mode for scheduled runs
- Staging table approach: export → staging → INSERT SELECT with shift → cleanup

Tested: 3,054,122 rows imported in ~3 minutes, dates 2026-04-03→2026-04-09.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
toto
2026-04-09 15:41:38 +02:00
parent 8180f4af04
commit d8ca804a55
2 changed files with 227 additions and 1 deletions

View File

@ -11,7 +11,8 @@
build-correlator test-correlator rpm-correlator \
build-bot-detector test-bot-detector \
build-dashboard test-dashboard \
test-ja4common-python
test-ja4common-python \
reload-prod-logs
# --- Root -------------------------------------------------------------------
@ -139,3 +140,7 @@ test-integration-keep:
test-integration-down:
cd tests/integration && docker compose down -v --remove-orphans
# ── Dev data ─────────────────────────────────────────────────────────────────
reload-prod-logs:
./scripts/reload-prod-logs.sh

221
scripts/reload-prod-logs.sh Executable file
View File

@ -0,0 +1,221 @@
#!/usr/bin/env bash
# =============================================================================
# reload-prod-logs.sh — Exporte les http_logs de prod et les réimporte dans la
# DB de dev avec un décalage de date dynamique.
#
# Usage:
# ./scripts/reload-prod-logs.sh # décalage auto (max(time) → now)
# ./scripts/reload-prod-logs.sh --shift 3600 # décalage de 3600 secondes
# ./scripts/reload-prod-logs.sh --days 7 # n'exporte que les N derniers jours
# ./scripts/reload-prod-logs.sh --cron # mode silencieux (pour crontab)
#
# Variables d'environnement :
# PROD_CH_HOST Host ClickHouse de prod (défaut: test-sdv-anubis.sdv.fr)
# PROD_CH_PORT Port HTTP de prod (défaut: 8123)
# PROD_CH_USER Utilisateur prod (défaut: admin)
# PROD_CH_PASSWORD Mot de passe prod (défaut: SuperPassword123!)
# PROD_CH_DB Base de données prod (défaut: mabase_prod)
# DEV_CONTAINER Nom du conteneur ClickHouse dev (défaut: integration-clickhouse-1)
# DEV_DB_LOGS Base de données dev logs (défaut: ja4_logs)
# BATCH_SIZE Nombre de lignes par batch (défaut: 200000)
# =============================================================================
set -euo pipefail
# ── Configuration ────────────────────────────────────────────────────────────
PROD_CH_HOST="${PROD_CH_HOST:-test-sdv-anubis.sdv.fr}"
PROD_CH_PORT="${PROD_CH_PORT:-8123}"
PROD_CH_USER="${PROD_CH_USER:-admin}"
PROD_CH_PASSWORD="${PROD_CH_PASSWORD:-SuperPassword123!}"
PROD_CH_DB="${PROD_CH_DB:-mabase_prod}"
DEV_CONTAINER="${DEV_CONTAINER:-integration-clickhouse-1}"
DEV_DB_LOGS="${DEV_DB_LOGS:-ja4_logs}"
BATCH_SIZE="${BATCH_SIZE:-200000}"
SHIFT_SECONDS=""
DAYS_FILTER=""
CRON_MODE=false
# ── Parsing des arguments ────────────────────────────────────────────────────
while [[ $# -gt 0 ]]; do
case "$1" in
--shift) SHIFT_SECONDS="$2"; shift 2 ;;
--days) DAYS_FILTER="$2"; shift 2 ;;
--cron) CRON_MODE=true; shift ;;
-h|--help)
head -15 "$0" | grep -E '^#' | sed 's/^# \?//'
exit 0
;;
*) echo "Option inconnue : $1"; exit 1 ;;
esac
done
TMPDIR="${TMPDIR:-/tmp}"
EXPORT_FILE="${TMPDIR}/prod_http_logs_export.native"
LOG_PREFIX="[reload-prod-logs]"
log() {
if [ "$CRON_MODE" = false ]; then
echo "${LOG_PREFIX} $(date '+%H:%M:%S') $*"
fi
}
err() {
echo "${LOG_PREFIX} ERREUR: $*" >&2
exit 1
}
# ── Fonctions utilitaires ────────────────────────────────────────────────────
prod_query() {
# Exécute une requête sur le CH de prod et retourne le résultat
curl -sf \
-H "X-ClickHouse-User: ${PROD_CH_USER}" \
-H "X-ClickHouse-Key: ${PROD_CH_PASSWORD}" \
"http://${PROD_CH_HOST}:${PROD_CH_PORT}/" \
--data-binary "$1"
}
prod_query_to_file() {
# Exécute une requête sur le CH de prod et écrit la sortie dans un fichier
curl -sf \
-H "X-ClickHouse-User: ${PROD_CH_USER}" \
-H "X-ClickHouse-Key: ${PROD_CH_PASSWORD}" \
"http://${PROD_CH_HOST}:${PROD_CH_PORT}/" \
--data-binary "$1" \
-o "$2"
}
dev_query() {
# Exécute une requête sur le CH de dev
docker exec -i "${DEV_CONTAINER}" clickhouse-client --query "$1"
}
dev_insert_native() {
# Insère des données au format Native dans le CH de dev
docker exec -i "${DEV_CONTAINER}" clickhouse-client \
--query "INSERT INTO ${DEV_DB_LOGS}._staging_prod_import FORMAT Native" < "$1"
}
# ── Vérifications ────────────────────────────────────────────────────────────
log "Vérification de la connexion à prod (${PROD_CH_HOST})…"
PROD_COUNT=$(prod_query "SELECT count() FROM ${PROD_CH_DB}.http_logs") \
|| err "Impossible de contacter le ClickHouse de prod"
log " Prod : ${PROD_COUNT} lignes dans http_logs"
log "Vérification du conteneur dev (${DEV_CONTAINER})…"
DEV_COUNT=$(dev_query "SELECT count() FROM ${DEV_DB_LOGS}.http_logs") \
|| err "Impossible de contacter le ClickHouse de dev"
log " Dev : ${DEV_COUNT} lignes dans http_logs"
# ── Calcul du décalage de date ───────────────────────────────────────────────
if [ -z "${SHIFT_SECONDS}" ]; then
# Calcul automatique : on décale pour que max(time) de prod = now()
PROD_MAX_TS=$(prod_query "SELECT toUnixTimestamp(max(time)) FROM ${PROD_CH_DB}.http_logs")
NOW_TS=$(date +%s)
SHIFT_SECONDS=$(( NOW_TS - PROD_MAX_TS ))
log "Décalage auto : ${SHIFT_SECONDS}s (prod max → now)"
fi
log "Décalage appliqué : ${SHIFT_SECONDS} secondes ($(( SHIFT_SECONDS / 3600 ))h)"
# ── Filtre temporel ──────────────────────────────────────────────────────────
WHERE_CLAUSE=""
if [ -n "${DAYS_FILTER}" ]; then
WHERE_CLAUSE="WHERE time >= now() - toIntervalDay(${DAYS_FILTER})"
log "Filtre : ${DAYS_FILTER} derniers jours"
fi
# ── Création de la table de staging ──────────────────────────────────────────
log "Création de la table de staging…"
dev_query "DROP TABLE IF EXISTS ${DEV_DB_LOGS}._staging_prod_import"
dev_query "CREATE TABLE ${DEV_DB_LOGS}._staging_prod_import AS ${DEV_DB_LOGS}.http_logs ENGINE = MergeTree() ORDER BY (time, src_ip) SETTINGS index_granularity = 8192"
# ── Export par batch ─────────────────────────────────────────────────────────
# Compter les lignes à exporter
EXPORT_COUNT=$(prod_query "SELECT count() FROM ${PROD_CH_DB}.http_logs ${WHERE_CLAUSE}")
log "Export de ${EXPORT_COUNT} lignes en batches de ${BATCH_SIZE}"
NUM_BATCHES=$(( (EXPORT_COUNT + BATCH_SIZE - 1) / BATCH_SIZE ))
TOTAL_IMPORTED=0
for (( i=0; i<NUM_BATCHES; i++ )); do
OFFSET=$(( i * BATCH_SIZE ))
log " Batch $((i+1))/${NUM_BATCHES} (offset=${OFFSET})…"
# Toutes les colonnes dans l'ordre du schéma
QUERY="SELECT * FROM ${PROD_CH_DB}.http_logs ${WHERE_CLAUSE} ORDER BY time, src_ip LIMIT ${BATCH_SIZE} OFFSET ${OFFSET} FORMAT Native"
prod_query_to_file "${QUERY}" "${EXPORT_FILE}" \
|| err "Export batch $((i+1)) échoué"
FILE_SIZE=$(stat -c%s "${EXPORT_FILE}" 2>/dev/null || stat -f%z "${EXPORT_FILE}" 2>/dev/null || echo 0)
if [ "${FILE_SIZE}" -eq 0 ]; then
log " Batch vide, arrêt."
break
fi
dev_insert_native "${EXPORT_FILE}" \
|| err "Import batch $((i+1)) échoué"
BATCH_ROWS=$(dev_query "SELECT count() FROM ${DEV_DB_LOGS}._staging_prod_import" )
BATCH_IMPORTED=$(( BATCH_ROWS - TOTAL_IMPORTED ))
TOTAL_IMPORTED=${BATCH_ROWS}
log "${BATCH_IMPORTED} lignes importées (total staging: ${TOTAL_IMPORTED})"
rm -f "${EXPORT_FILE}"
done
log "Staging terminé : ${TOTAL_IMPORTED} lignes."
# ── Insertion avec décalage de date ──────────────────────────────────────────
log "Nettoyage de la table cible (données de prod précédentes)…"
# On identifie les données de prod importées par le décalage — on tronque tout
# car les données de dev sont générées par le traffic seeder / intégration
dev_query "TRUNCATE TABLE ${DEV_DB_LOGS}.http_logs"
log "Insertion avec décalage de ${SHIFT_SECONDS}s dans ${DEV_DB_LOGS}.http_logs…"
dev_query "
INSERT INTO ${DEV_DB_LOGS}.http_logs
SELECT
time + toIntervalSecond(${SHIFT_SECONDS}) AS time,
toDate(time + toIntervalSecond(${SHIFT_SECONDS})) AS log_date,
src_ip, src_port, dst_ip, dst_port,
src_asn, src_country_code, src_as_name, src_org, src_domain,
method, scheme, host, path, query, http_version,
orphan_side, correlated, keepalives,
a_timestamp, b_timestamp, conn_id,
ip_meta_df, ip_meta_id, ip_meta_total_length, ip_meta_ttl,
tcp_meta_options, tcp_meta_window_size, tcp_meta_mss, tcp_meta_window_scale,
syn_to_clienthello_ms,
tls_version, tls_sni, tls_alpn, ja3, ja3_hash, ja4,
client_headers,
header_user_agent, header_accept, header_accept_encoding, header_accept_language,
header_content_type, header_x_request_id, header_x_trace_id, header_x_forwarded_for,
header_sec_ch_ua, header_sec_ch_ua_mobile, header_sec_ch_ua_platform,
header_sec_fetch_dest, header_sec_fetch_mode, header_sec_fetch_site,
anubis_bot_name, anubis_bot_action, anubis_bot_category
FROM ${DEV_DB_LOGS}._staging_prod_import
"
FINAL_COUNT=$(dev_query "SELECT count() FROM ${DEV_DB_LOGS}.http_logs")
FINAL_MIN=$(dev_query "SELECT min(time) FROM ${DEV_DB_LOGS}.http_logs")
FINAL_MAX=$(dev_query "SELECT max(time) FROM ${DEV_DB_LOGS}.http_logs")
log "Nettoyage de la table de staging…"
dev_query "DROP TABLE IF EXISTS ${DEV_DB_LOGS}._staging_prod_import"
rm -f "${EXPORT_FILE}"
# ── Résultat ─────────────────────────────────────────────────────────────────
log "════════════════════════════════════════════════════"
log " Import terminé : ${FINAL_COUNT} lignes dans ${DEV_DB_LOGS}.http_logs"
log " Plage : ${FINAL_MIN}${FINAL_MAX}"
log " Décalage appliqué : ${SHIFT_SECONDS}s"
log "════════════════════════════════════════════════════"