#!/usr/bin/env bash # ============================================================================= # import-prod-data.sh — Importe les données prod pré-extraites dans la DB de dev # avec un décalage de date dynamique (max(time) → now()). # # Les données doivent avoir été exportées au format Native dans le répertoire # scripts/data/prod-export/ (fichiers *.native). Utilisez reload-prod-logs.sh # pour effectuer l'extraction initiale depuis la prod. # # Usage: # ./scripts/import-prod-data.sh # décalage auto # ./scripts/import-prod-data.sh --shift 3600 # décalage manuel (secondes) # ./scripts/import-prod-data.sh --container my-ch # conteneur spécifique # ./scripts/import-prod-data.sh --no-truncate # conserver les données existantes # ./scripts/import-prod-data.sh --cron # mode silencieux # # Variables d'environnement : # DEV_CONTAINER Nom du conteneur ClickHouse (défaut: integration-clickhouse-1) # DEV_DB_LOGS Base de données logs (défaut: ja4_logs) # EXPORT_DIR Répertoire des fichiers Native (défaut: scripts/data/prod-export) # ============================================================================= set -euo pipefail SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" REPO_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)" # ── Configuration ──────────────────────────────────────────────────────────── DEV_CONTAINER="${DEV_CONTAINER:-integration-clickhouse-1}" DEV_DB_LOGS="${DEV_DB_LOGS:-ja4_logs}" EXPORT_DIR="${EXPORT_DIR:-${REPO_ROOT}/scripts/data/prod-export}" SHIFT_SECONDS="" NO_TRUNCATE=false CRON_MODE=false # ── Parsing des arguments ──────────────────────────────────────────────────── while [[ $# -gt 0 ]]; do case "$1" in --container) DEV_CONTAINER="$2"; shift 2 ;; --shift) SHIFT_SECONDS="$2"; shift 2 ;; --no-truncate) NO_TRUNCATE=true; shift ;; --cron) CRON_MODE=true; shift ;; -h|--help) sed -n '2,/^# =====/{ /^# =====/d; s/^# \?//p; }' "$0" exit 0 ;; *) echo "Option inconnue : $1"; exit 1 ;; esac done STAGING_TABLE="${DEV_DB_LOGS}._staging_prod_import" LOG_PREFIX="[import-prod]" log() { [ "${CRON_MODE}" = false ] && echo "${LOG_PREFIX} $(date '+%H:%M:%S') $*" || true; } err() { echo "${LOG_PREFIX} ERREUR: $*" >&2; exit 1; } # ── Requêteur CH ───────────────────────────────────────────────────────────── ch() { docker exec -i "${DEV_CONTAINER}" clickhouse-client --query "$1" } ch_insert_native() { docker exec -i "${DEV_CONTAINER}" clickhouse-client \ --query "INSERT INTO ${STAGING_TABLE} FORMAT Native" } # ── Vérifications ──────────────────────────────────────────────────────────── log "Vérification du conteneur ${DEV_CONTAINER}…" docker exec "${DEV_CONTAINER}" clickhouse-client --query "SELECT 1" > /dev/null 2>&1 \ || err "Conteneur ${DEV_CONTAINER} inaccessible" log "Vérification des fichiers d'export dans ${EXPORT_DIR}…" NATIVE_FILES=("${EXPORT_DIR}"/*.native) if [[ ! -f "${NATIVE_FILES[0]}" ]]; then err "Aucun fichier *.native trouvé dans ${EXPORT_DIR}. Lancez d'abord reload-prod-logs.sh." fi FILE_COUNT=${#NATIVE_FILES[@]} log " ${FILE_COUNT} fichier(s) Native trouvé(s)" # ── Création de la table de staging ────────────────────────────────────────── log "Création de la table de staging…" ch "DROP TABLE IF EXISTS ${STAGING_TABLE}" ch "CREATE TABLE ${STAGING_TABLE} AS ${DEV_DB_LOGS}.http_logs ENGINE = MergeTree() ORDER BY (time, src_ip) SETTINGS index_granularity = 8192" # ── Import des fichiers Native ─────────────────────────────────────────────── log "Import des ${FILE_COUNT} fichiers dans le staging…" TOTAL_IMPORTED=0 for f in "${NATIVE_FILES[@]}"; do BASENAME=$(basename "${f}") FILE_SIZE=$(stat -c%s "${f}" 2>/dev/null || stat -f%z "${f}" 2>/dev/null || echo 0) if [ "${FILE_SIZE}" -eq 0 ]; then log " ${BASENAME} — vide, ignoré" continue fi log " ${BASENAME} ($(( FILE_SIZE / 1024 / 1024 )) Mo)…" ch_insert_native < "${f}" \ || err "Import échoué pour ${BASENAME}" CURRENT=$(ch "SELECT count() FROM ${STAGING_TABLE}") BATCH_ROWS=$(( CURRENT - TOTAL_IMPORTED )) TOTAL_IMPORTED=${CURRENT} log " → ${BATCH_ROWS} lignes (total staging: ${TOTAL_IMPORTED})" done log "Staging terminé : ${TOTAL_IMPORTED} lignes." if [ "${TOTAL_IMPORTED}" -eq 0 ]; then ch "DROP TABLE IF EXISTS ${STAGING_TABLE}" err "Aucune donnée importée dans le staging" fi # ── Calcul du décalage de date ─────────────────────────────────────────────── if [ -z "${SHIFT_SECONDS}" ]; then STAGING_MAX_TS=$(ch "SELECT toUnixTimestamp(max(time)) FROM ${STAGING_TABLE}") NOW_TS=$(date +%s) SHIFT_SECONDS=$(( NOW_TS - STAGING_MAX_TS )) log "Décalage auto : ${SHIFT_SECONDS}s (max(staging) → now)" fi log "Décalage appliqué : ${SHIFT_SECONDS} secondes ($(( SHIFT_SECONDS / 3600 ))h $(( (SHIFT_SECONDS % 3600) / 60 ))min)" # ── Nettoyage de la table cible ────────────────────────────────────────────── if [ "${NO_TRUNCATE}" = false ]; then log "Nettoyage de ${DEV_DB_LOGS}.http_logs…" ch "TRUNCATE TABLE ${DEV_DB_LOGS}.http_logs" fi # ── Insertion avec décalage de date ────────────────────────────────────────── log "Insertion avec décalage dans ${DEV_DB_LOGS}.http_logs…" ch " INSERT INTO ${DEV_DB_LOGS}.http_logs SELECT time + toIntervalSecond(${SHIFT_SECONDS}) AS time, toDate(time + toIntervalSecond(${SHIFT_SECONDS})) AS log_date, src_ip, src_port, dst_ip, dst_port, src_asn, src_country_code, src_as_name, src_org, src_domain, method, scheme, host, path, query, http_version, orphan_side, correlated, keepalives, a_timestamp, b_timestamp, conn_id, ip_meta_df, ip_meta_id, ip_meta_total_length, ip_meta_ttl, tcp_meta_options, tcp_meta_window_size, tcp_meta_mss, tcp_meta_window_scale, syn_to_clienthello_ms, tls_version, tls_sni, tls_alpn, ja3, ja3_hash, ja4, client_headers, header_user_agent, header_accept, header_accept_encoding, header_accept_language, header_content_type, header_x_request_id, header_x_trace_id, header_x_forwarded_for, header_sec_ch_ua, header_sec_ch_ua_mobile, header_sec_ch_ua_platform, header_sec_fetch_dest, header_sec_fetch_mode, header_sec_fetch_site, anubis_bot_name, anubis_bot_action, anubis_bot_category, -- Colonnes h2 absentes des exports prod (ajoutées post-export) : valeurs vides par défaut '' AS h2_fingerprint, '' AS h2_settings_fp, 0 AS h2_window_update, '' AS h2_pseudo_order FROM ${STAGING_TABLE} " FINAL_COUNT=$(ch "SELECT count() FROM ${DEV_DB_LOGS}.http_logs") FINAL_MIN=$(ch "SELECT min(time) FROM ${DEV_DB_LOGS}.http_logs") FINAL_MAX=$(ch "SELECT max(time) FROM ${DEV_DB_LOGS}.http_logs") # ── Nettoyage ──────────────────────────────────────────────────────────────── log "Nettoyage de la table de staging…" ch "DROP TABLE IF EXISTS ${STAGING_TABLE}" # ── Résultat ───────────────────────────────────────────────────────────────── log "════════════════════════════════════════════════════" log " Import terminé : ${FINAL_COUNT} lignes" log " Plage : ${FINAL_MIN} → ${FINAL_MAX}" log " Décalage : ${SHIFT_SECONDS}s" log "════════════════════════════════════════════════════"