#!/usr/bin/env bash
# ip-check.sh — Check the status of one or more IPs through the Bot Detector
# Dashboard API.
#
# Usage:
#   ./ip-check.sh <ip>
#   ./ip-check.sh <ip1> <ip2> ...
#   ./ip-check.sh <ip1>,<ip2>,...
#   ./ip-check.sh <cidr>          (e.g. 192.168.1.0/24, at most 1024 addresses)
#   ./ip-check.sh -f <file>       (one IP/CIDR per line, '#' comments ignored)
#   ./ip-check.sh --api <url> ...
#   ./ip-check.sh --verbose       full JSON (default: condensed summary)
#   ./ip-check.sh --pretty        pretty-print the JSON (jq)
#   ./ip-check.sh --parallel <n>
#   ./ip-check.sh --endpoint investigation|reputation|reputation-summary
#   ./ip-check.sh --help
#
# Requires: bash >= 4 (mapfile), curl, python3, and jq when --pretty is used.
# Diagnostics go to stderr; the resulting JSON document goes to stdout.

set -euo pipefail

# ── Defaults ──────────────────────────────────────────────────────────────────
readonly DEFAULT_API_BASE="http://test-sdv-anubis.sdv.fr:8000"
API_BASE="$DEFAULT_API_BASE"
PARALLEL=5
ENDPOINT="investigation"
PRETTY=false
VERBOSE=false
INPUT_FILE=""
IPS=()          # raw positional tokens (IP, CIDR or comma-separated list)
ALL_IPS=()      # candidate addresses after CIDR expansion
LARGE_CIDRS=()  # CIDRs rejected because they hold more than 1024 addresses
VALID_IPS=()    # validated, de-duplicated IPv4 addresses
TMPDIR_WORK=""  # scratch directory holding one <ip>.json per query

# ── Colours ───────────────────────────────────────────────────────────────────
# ANSI-C quoting stores the real escape bytes, so messages can be printed with
# printf '%s' and backslashes inside user-supplied text are never interpreted.
RED=$'\033[0;31m'
YELLOW=$'\033[1;33m'
GREEN=$'\033[0;32m'
CYAN=$'\033[0;36m'
BOLD=$'\033[1m'
DIM=$'\033[2m'
RESET=$'\033[0m'

# Print the help text on stdout and exit successfully.
usage() {
  cat <<EOF
${BOLD}ip-check.sh${RESET} — Bot Detector IP Status Checker

${BOLD}Usage:${RESET}
  $0 [options] <ip|cidr> [<ip|cidr> ...]

${BOLD}Options:${RESET}
  --api <url>        URL de base de l'API [défaut: ${DEFAULT_API_BASE}]
  --parallel <n>     Requêtes simultanées [défaut: 5]
  --endpoint <nom>   investigation | reputation | reputation-summary [défaut: investigation]
  --verbose, -v      JSON complet (défaut: résumé condensé)
  --pretty           Formater le JSON avec jq
  -f <fichier>       Lire les IPs/CIDRs depuis un fichier
  --help, -h         Afficher cette aide

${BOLD}Exemples:${RESET}
  $0 1.2.3.4
  $0 1.2.3.4 5.6.7.8 9.10.11.12
  $0 192.168.0.0/24
  $0 --verbose --pretty 1.2.3.4
  $0 --endpoint reputation --parallel 10 10.0.0.0/28
  $0 -f ips.txt --api http://monserveur:8000
EOF
  exit 0
}

# Print an error / informational message on stderr.
err() { printf '%s[ERREUR]%s %s\n' "$RED" "$RESET" "$*" >&2; }
info() { printf '%s[INFO]%s %s\n' "$CYAN" "$RESET" "$*" >&2; }

# ── Dependencies ──────────────────────────────────────────────────────────────
# Abort with a message when a required external tool is missing.
# Globals: PRETTY (read)
check_deps() {
  command -v curl &>/dev/null || {
    err "curl requis : apt install curl"
    exit 1
  }
  command -v python3 &>/dev/null || {
    err "python3 requis pour les CIDRs."
    exit 1
  }
  if [[ "$PRETTY" == true ]] && ! command -v jq &>/dev/null; then
    err "--pretty nécessite jq : apt install jq"
    exit 1
  fi
}

# ── CIDR expansion ────────────────────────────────────────────────────────────
# Arguments: $1 - CIDR notation (host bits may be set; strict=False)
# Outputs:   one address per line; "__LARGE_CIDR__<cidr>" when the network
#            holds more than 1024 addresses; the argument itself, unchanged,
#            when it cannot be parsed (it is rejected later by validation).
expand_cidr() {
  python3 - "$1" <<'PYEOF'
import sys, ipaddress

arg = sys.argv[1]
try:
    net = ipaddress.ip_network(arg, strict=False)
    if net.num_addresses > 1024:
        print(f"__LARGE_CIDR__{arg}", flush=True)
    elif net.num_addresses == 1:
        print(str(net.network_address), flush=True)
    else:
        for ip in net.hosts():
            print(str(ip), flush=True)
except ValueError:
    print(arg, flush=True)
PYEOF
}

# ── IPv4 validation ───────────────────────────────────────────────────────────
# Reads candidate addresses on stdin (one per line) and writes one line per
# candidate on stdout: "OK<TAB>addr" or "KO<TAB>addr". A single interpreter
# validates the whole list instead of one python3 process per address.
# Works on bytes so a token that is not valid text is reported, not fatal.
filter_valid_ipv4() {
  python3 -c '
import ipaddress, sys

out = sys.stdout.buffer
for raw in sys.stdin.buffer:
    raw = raw.rstrip(b"\n")
    try:
        ipaddress.IPv4Address(raw.decode("ascii"))
    except ValueError:
        out.write(b"KO\t" + raw + b"\n")
    else:
        out.write(b"OK\t" + raw + b"\n")
'
}

# ── URL per endpoint ──────────────────────────────────────────────────────────
# Arguments: $1 - IPv4 address
# Globals:   API_BASE, ENDPOINT (read)
# Outputs:   the request URL on stdout
build_url() {
  local ip="$1"
  case "$ENDPOINT" in
    investigation) printf '%s\n' "${API_BASE}/api/investigation/${ip}/summary" ;;
    reputation) printf '%s\n' "${API_BASE}/api/reputation/ip/${ip}" ;;
    reputation-summary) printf '%s\n' "${API_BASE}/api/reputation/ip/${ip}/summary" ;;
    *)
      err "Endpoint inconnu: $ENDPOINT"
      exit 1
      ;;
  esac
}

# ── Error payload ─────────────────────────────────────────────────────────────
# Emit a well-formed JSON error object on stdout.
# Arguments: $1 - ip, $2 - error message, $3 - url,
#            $4 - optional raw response body; embedded as parsed JSON when it
#                 is valid JSON, otherwise as a plain string.
# The body travels over stdin so its size is not limited by argv.
json_error() {
  local has_detail=0
  if (($# >= 4)); then
    has_detail=1
  fi
  printf '%s' "${4-}" | python3 -c '
import json, sys

ip, message, url, has_detail = sys.argv[1:5]
payload = {"ip": ip, "error": message, "url": url}
if has_detail == "1":
    raw = sys.stdin.buffer.read().decode("utf-8", "replace")
    try:
        payload["detail"] = json.loads(raw)
    except ValueError:
        payload["detail"] = raw
print(json.dumps(payload))
' "$1" "$2" "$3" "$has_detail"
}

# ── HTTP request ──────────────────────────────────────────────────────────────
# Query the API for one address and print a JSON document on stdout: the
# response body on HTTP 200, an error object otherwise.
# Arguments: $1 - IPv4 address
query_ip() {
  local ip="$1"
  local url raw http_code body
  url=$(build_url "$ip")
  # curl appends "\n__HTTP_CODE__<status>" after the body (--write-out).
  raw=$(curl -s -w '\n__HTTP_CODE__%{http_code}' --max-time 15 "$url" 2>/dev/null) || {
    json_error "$ip" "timeout ou connexion refusée" "$url"
    return
  }
  # Split on the LAST marker so a body that happens to contain the marker
  # text cannot corrupt the status code.
  http_code="${raw##*__HTTP_CODE__}"
  body="${raw%__HTTP_CODE__*}"
  body="${body%$'\n'}"
  if [[ "$http_code" == "200" ]]; then
    printf '%s\n' "$body"
  else
    json_error "$ip" "HTTP ${http_code}" "$url" "$body"
  fi
}

# ── Arguments ─────────────────────────────────────────────────────────────────
# Abort unless the option in $1 is followed by a value.
# Arguments: $1 - option name, $2 - number of remaining arguments (option included)
require_value() {
  if (($2 < 2)); then
    err "L'option $1 nécessite une valeur."
    exit 1
  fi
}

# Parse the command line into the global settings and the IPS array, then
# validate --endpoint and --parallel before any request is made.
parse_args() {
  while (($# > 0)); do
    case "$1" in
      --help | -h) usage ;;
      --api)
        require_value "$1" "$#"
        API_BASE="${2%/}"
        shift
        ;;
      --parallel)
        require_value "$1" "$#"
        PARALLEL="$2"
        shift
        ;;
      --endpoint)
        require_value "$1" "$#"
        ENDPOINT="$2"
        shift
        ;;
      --verbose | -v) VERBOSE=true ;;
      --pretty) PRETTY=true ;;
      -f)
        require_value "$1" "$#"
        INPUT_FILE="$2"
        shift
        ;;
      -*)
        err "Option inconnue: $1 (--help pour l'aide)"
        exit 1
        ;;
      *) IPS+=("$1") ;;
    esac
    shift
  done

  case "$ENDPOINT" in
    investigation | reputation | reputation-summary) ;;
    *)
      err "Endpoint inconnu: $ENDPOINT"
      exit 1
      ;;
  esac

  # A zero or non-numeric batch size would break the batching loop.
  if [[ ! "$PARALLEL" =~ ^[1-9][0-9]*$ ]]; then
    err "--parallel doit être un entier strictement positif: $PARALLEL"
    exit 1
  fi
}

# ── IP collection ─────────────────────────────────────────────────────────────
# Split a token on commas; expand CIDR parts, append the rest as-is.
# Arguments: $1 - token (IP, CIDR, or comma-separated list of both)
# Globals:   ALL_IPS, LARGE_CIDRS (appended)
process_token() {
  local token="$1"
  local -a parts=()
  local part expanded
  IFS=',' read -ra parts <<<"$token"
  # ${arr[@]+...} keeps "set -u" quiet on empty arrays with bash < 4.4.
  for part in ${parts[@]+"${parts[@]}"}; do
    part="${part//[[:space:]]/}"
    [[ -n "$part" ]] || continue
    if [[ "$part" == */* ]]; then
      while IFS= read -r expanded; do
        if [[ "$expanded" == __LARGE_CIDR__* ]]; then
          LARGE_CIDRS+=("${expanded#__LARGE_CIDR__}")
        else
          ALL_IPS+=("$expanded")
        fi
      done < <(expand_cidr "$part")
    else
      ALL_IPS+=("$part")
    fi
  done
}

# Gather candidates from the command line and from the -f file, report
# oversized CIDRs, then validate and de-duplicate into VALID_IPS.
collect_ips() {
  local token line cidr verdict ip

  for token in ${IPS[@]+"${IPS[@]}"}; do
    process_token "$token"
  done

  if [[ -n "$INPUT_FILE" ]]; then
    [[ -f "$INPUT_FILE" ]] || {
      err "Fichier introuvable: $INPUT_FILE"
      exit 1
    }
    # "|| [[ -n $line ]]" keeps a final line that lacks a trailing newline.
    while IFS= read -r line || [[ -n "$line" ]]; do
      line="${line%%#*}" # strip comments
      line="${line//[[:space:]]/}"
      [[ -n "$line" ]] || continue
      process_token "$line"
    done <"$INPUT_FILE"
  fi

  if ((${#ALL_IPS[@]} == 0 && ${#LARGE_CIDRS[@]} == 0)); then
    err "Aucune IP fournie. Utilisez --help pour l'aide."
    exit 1
  fi

  for cidr in ${LARGE_CIDRS[@]+"${LARGE_CIDRS[@]}"}; do
    err "CIDR trop grand (>1024 hôtes), ignoré: $cidr — affinez le masque."
  done

  if ((${#ALL_IPS[@]} > 0)); then
    while IFS=$'\t' read -r verdict ip; do
      if [[ "$verdict" == OK ]]; then
        VALID_IPS+=("$ip")
      else
        err "IP invalide ignorée: $ip"
      fi
    done < <(printf '%s\n' "${ALL_IPS[@]}" | filter_valid_ipv4)
  fi

  if ((${#VALID_IPS[@]} == 0)); then
    err "Aucune IP valide à traiter."
    exit 1
  fi

  mapfile -t VALID_IPS < <(printf '%s\n' "${VALID_IPS[@]}" | sort -u)
}

# ── Parallel requests ─────────────────────────────────────────────────────────
# Query every address given as argument concurrently and wait for all of
# them; each result lands in ${TMPDIR_WORK}/<ip>.json.
run_batch() {
  local ip
  for ip in "$@"; do
    query_ip "$ip" >"${TMPDIR_WORK}/${ip}.json" &
  done
  wait
}

# ── Assemble + condense unless verbose ────────────────────────────────────────
# Arguments: $1 - directory of *.json results, $2 - endpoint, $3 - "true"|"false"
# Outputs:   {"count": N, "results": [...]} on stdout
assemble_results() {
  python3 - "$1" "$2" "$3" <<'PYEOF'
import glob, json, os, sys

directory, endpoint = sys.argv[1], sys.argv[2]
verbose_flag = sys.argv[3] == "true"


def section(d, key):
    """Return d[key] when it is an object, else {} (tolerates null/missing)."""
    value = d.get(key)
    return value if isinstance(value, dict) else {}


def summarize_investigation(d):
    """Condensed summary of the investigation endpoint."""
    if not isinstance(d, dict) or "error" in d:
        return d
    ml = section(d, "ml")
    bf = section(d, "bruteforce")
    tcp = section(d, "tcp_spoofing")
    rot = section(d, "ja4_rotation")
    per = section(d, "persistence")

    flags = []
    if bf.get("active"):
        flags.append("BRUTEFORCE")
    if tcp.get("detected"):
        flags.append("TCP_SPOOF" + ("/BOT_TOOL" if tcp.get("is_bot_tool") else ""))
    if rot.get("rotating"):
        flags.append(f"JA4_ROTATION(x{rot.get('distinct_ja4_count', 0)})")
    if per.get("persistent"):
        flags.append(f"PERSISTENT(x{per.get('recurrence', 0)})")

    return {
        "ip": d.get("ip"),
        "risk_score": d.get("risk_score"),
        "threat_level": ml.get("threat_level") or per.get("worst_threat_level") or "—",
        "attack_type": ml.get("attack_type") or "—",
        "total_detections": ml.get("total_detections", 0),
        "distinct_hosts": ml.get("distinct_hosts", 0),
        "distinct_ja4": ml.get("distinct_ja4", 0),
        "ml_max_score": ml.get("max_score", 0),
        "bruteforce": bf.get("active", False),
        "bf_hits": bf.get("total_hits", 0) if bf.get("active") else None,
        "tcp_spoof": tcp.get("detected", False),
        "tcp_suspected_os": tcp.get("suspected_os") if tcp.get("detected") else None,
        "tcp_declared_os": tcp.get("declared_os") if tcp.get("detected") else None,
        "tcp_confidence": tcp.get("confidence") if tcp.get("detected") else None,
        "ja4_rotating": rot.get("rotating", False),
        "ja4_count": rot.get("distinct_ja4_count", 0) if rot.get("rotating") else None,
        "persistent": per.get("persistent", False),
        "recurrence": per.get("recurrence", 0) if per.get("persistent") else None,
        "first_seen": per.get("first_seen") if per.get("persistent") else None,
        "last_seen": per.get("last_seen") if per.get("persistent") else None,
        "flags": flags,
    }


def summarize_reputation(d):
    """Condensed summary of the reputation endpoint."""
    if not isinstance(d, dict) or "error" in d:
        return d
    agg = section(d, "aggregated")

    flags = []
    if agg.get("is_proxy"):
        flags.append("PROXY")
    if agg.get("is_hosting"):
        flags.append("HOSTING")
    if agg.get("is_vpn"):
        flags.append("VPN")
    if agg.get("is_tor"):
        flags.append("TOR")

    return {
        "ip": d.get("ip"),
        "threat_level": agg.get("threat_level", "—"),
        "threat_score": agg.get("threat_score", 0),
        "country": agg.get("country"),
        "country_code": agg.get("country_code"),
        "asn": agg.get("asn"),
        "org": agg.get("org") or agg.get("asn_org"),
        "is_proxy": agg.get("is_proxy", False),
        "is_hosting": agg.get("is_hosting", False),
        "is_vpn": agg.get("is_vpn", False),
        "is_tor": agg.get("is_tor", False),
        "flags": flags,
        "warnings": agg.get("warnings", []),
    }


results = []
for path in sorted(glob.glob(os.path.join(directory, "*.json"))):
    # Read once and close the handle; the text is reused for the error report.
    with open(path, encoding="utf-8", errors="replace") as fh:
        raw = fh.read()
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        # The file is named <ip>.json, so the address can still be reported.
        ip = os.path.basename(path)[: -len(".json")]
        results.append({"ip": ip, "error": "invalid JSON", "raw": raw})
        continue

    if verbose_flag:
        results.append(data)
    elif endpoint == "investigation":
        results.append(summarize_investigation(data))
    elif endpoint == "reputation":
        results.append(summarize_reputation(data))
    else:
        results.append(data)  # reputation-summary is already compact

print(json.dumps({"count": len(results), "results": results}))
PYEOF
}

# ── Entry point ───────────────────────────────────────────────────────────────
main() {
  parse_args "$@"
  check_deps
  collect_ips

  info "Endpoint : ${ENDPOINT} → ${API_BASE}"
  info "IPs : ${#VALID_IPS[@]} à vérifier (parallélisme: ${PARALLEL})"
  if [[ "$VERBOSE" == true ]]; then
    info "Mode : verbose (JSON complet)"
  else
    info "Mode : résumé (--verbose pour le JSON complet)"
  fi

  TMPDIR_WORK=$(mktemp -d) || {
    err "Impossible de créer le répertoire temporaire."
    exit 1
  }
  # ${VAR:?} makes the cleanup refuse to run with an empty path.
  trap 'rm -rf -- "${TMPDIR_WORK:?}"' EXIT

  # Process the addresses in slices of PARALLEL concurrent requests.
  local total=${#VALID_IPS[@]}
  local offset=0
  while ((offset < total)); do
    run_batch "${VALID_IPS[@]:$offset:$PARALLEL}"
    offset=$((offset + PARALLEL))
  done

  local final_json
  final_json=$(assemble_results "$TMPDIR_WORK" "$ENDPOINT" "$VERBOSE")

  # ── Output ──────────────────────────────────────────────────────────────────
  if [[ "$PRETTY" == true ]]; then
    printf '%s\n' "$final_json" | jq .
  else
    printf '%s\n' "$final_json"
  fi
}

main "$@"