#!/usr/bin/env python3 """ fetch_rules.py — Récupère TOUTES les règles Anubis depuis GitHub et les insère dans ClickHouse. Sources : - data/bots/**/*.yaml (bots pathologiques, IA, IRC) - data/crawlers/*.yaml (crawlers légitimes et clouds) - data/clients/*.yaml (clients IA agissant pour utilisateurs) - data/common/*.yaml (règles communes : IPs privées, etc.) - data/botPolicies.yaml (règles ASN et pays inline) Usage (depuis le container dashboard_web) : python /tmp/fetch_rules.py Variables d'environnement : CLICKHOUSE_HOST, CLICKHOUSE_DB, CLICKHOUSE_DB_PROCESSING, CLICKHOUSE_USER, CLICKHOUSE_PASSWORD """ import json import os import sys import urllib.request import urllib.error try: import yaml except ImportError: print("[ERREUR] pyyaml manquant.", file=sys.stderr) sys.exit(1) try: import clickhouse_connect except ImportError: print("[ERREUR] clickhouse-connect manquant.", file=sys.stderr) sys.exit(1) # ────────────────────────────────────────────────────────────────────────────── # Config # ────────────────────────────────────────────────────────────────────────────── GITHUB_API = "https://api.github.com/repos/TecharoHQ/anubis/contents" GITHUB_RAW = "https://raw.githubusercontent.com/TecharoHQ/anubis/main" # Répertoires à parcourir — ORDER CRITIQUE pour REGEXP_TREE : # Dans REGEXP_TREE (root-level rules), la règle avec l'ID le plus bas gagne quand plusieurs matchent. # → Les règles SPÉCIFIQUES doivent être chargées en PREMIER (IDs bas) pour gagner sur les catch-alls. # → Les catch-alls (ai-robots-txt, ai-catchall) doivent être chargés en DERNIER (IDs hauts). # # Au sein de chaque répertoire, les fichiers sont triés EN ORDRE ALPHABÉTIQUE INVERSÉ # pour que les règles spécifiques (noms longs) aient des IDs plus bas que les catch-alls (ai.yaml). DIRECTORIES = [ ("data/clients", "clients"), # Règles AI clients avec IP (openai-chatgpt-user, etc.) ("data/bots/irc-bots", "bots/irc-bots"), # Bots IRC spécifiques ("data/crawlers", "crawlers"), # Crawlers spécifiques + clouds ("data/common", "common"), # IPs privées, routes communes ("data/bots", "bots"), # Catch-alls larges (ai-robots-txt, ai-catchall) — LAST ] # Fichier de politique principal (règles ASN inline) BOT_POLICIES_PATH = "data/botPolicies.yaml" # ────────────────────────────────────────────────────────────────────────────── # HTTP helpers # ────────────────────────────────────────────────────────────────────────────── def _fetch_url(url: str, timeout: int = 15) -> str | None: """Télécharge le contenu d'une URL en texte UTF-8. Retourne None en cas d'erreur réseau.""" try: with urllib.request.urlopen(url, timeout=timeout) as resp: return resp.read().decode("utf-8") except urllib.error.URLError as e: print(f"[WARN] {url}: {e}", file=sys.stderr) return None def fetch_yaml_url(url: str) -> list | dict | None: """Télécharge et désérialise un fichier YAML depuis une URL. Retourne None si inaccessible.""" content = _fetch_url(url) if content: return yaml.safe_load(content) return None def list_yaml_files(api_path: str) -> list[str]: """ Retourne la liste des raw URLs des fichiers .yaml/.yml dans api_path via l'API GitHub. Les fichiers sont triés en ordre ALPHABÉTIQUE INVERSÉ pour que les règles spécifiques (noms longs, ex: openai-chatgpt-user.yaml) aient un ID inférieur aux catch-alls (ai.yaml). """ content = _fetch_url(f"{GITHUB_API}/{api_path}") if not content: return [] try: entries = json.loads(content) except json.JSONDecodeError: return [] files = [ entry for entry in entries if entry.get("type") == "file" and entry.get("name", "").endswith((".yaml", ".yml")) ] # Tri inverse : les noms longs (spécifiques) avant les noms courts (catch-alls) files.sort(key=lambda e: e["name"], reverse=True) return [f["download_url"] for f in files] # ────────────────────────────────────────────────────────────────────────────── # Parse des fichiers YAML # ────────────────────────────────────────────────────────────────────────────── def parse_file( url: str, category: str, ua_name_to_id: dict, ua_id_counter_ref: list, rule_id_counter_ref: list, ) -> tuple[list[dict], list[dict]]: """ Parse un fichier YAML Anubis. Retourne ([], ip_rules) — les règles UA ne sont plus collectées. """ data = fetch_yaml_url(url) if not data or not isinstance(data, list): return [], [] ip_rules = [] for rule in data: if not isinstance(rule, dict): continue if "import" in rule: continue name = rule.get("name", "").strip() action = rule.get("action", "").strip() if not name or not action: continue remote_addrs = [str(c).strip() for c in rule.get("remote_addresses", []) if c] rule_id = rule_id_counter_ref[0] rule_id_counter_ref[0] += 1 # ── IP/CIDR ranges ─────────────────────────────────────────────────── for cidr in remote_addrs: ip_rules.append({ "prefix": cidr, "bot_name": name, "action": action, "rule_id": rule_id, "has_ua": 0, "category": category, }) return [], ip_rules def parse_bot_policies_inline(url: str) -> tuple[list[dict], list[dict]]: """ Parse botPolicies.yaml pour les règles ASN inline. Retourne (asn_rules, []) — les règles pays ne sont plus collectées. """ data = fetch_yaml_url(url) if not data or not isinstance(data, dict): return [], [] asn_rules: list[dict] = [] for rule in data.get("bots", []): if not isinstance(rule, dict): continue if "import" in rule: continue name = rule.get("name", "").strip() action = rule.get("action", "").strip() if not name or not action: continue # ASN rules asns = rule.get("asns", {}) if isinstance(asns, dict): for asn in asns.get("match", []): asn_rules.append({ "asn": int(asn), "bot_name": name, "action": action, "category": "policies", }) return asn_rules, [] # ────────────────────────────────────────────────────────────────────────────── # Collecte de toutes les règles # ────────────────────────────────────────────────────────────────────────────── def collect_all_rules() -> tuple[list, list]: """Retourne (ip_rules, asn_rules). Les règles UA et pays ne sont plus collectées.""" ua_name_to_id: dict[str, int] = {} ua_id_counter_ref: list[int] = [1] rule_id_counter: list[int] = [1] all_ip: list[dict] = [] for api_path, category in DIRECTORIES: print(f"[INFO] Parcours de {api_path} ({category})…") file_urls = list_yaml_files(api_path) print(f" {len(file_urls)} fichiers trouvés") for url in file_urls: _ua, ip = parse_file(url, category, ua_name_to_id, ua_id_counter_ref, rule_id_counter) all_ip.extend(ip) # Règles ASN depuis botPolicies.yaml print(f"[INFO] Lecture de botPolicies.yaml…") policies_url = f"{GITHUB_RAW}/{BOT_POLICIES_PATH}" asn_rules, _country_rules = parse_bot_policies_inline(policies_url) return all_ip, asn_rules # ────────────────────────────────────────────────────────────────────────────── # ClickHouse # ────────────────────────────────────────────────────────────────────────────── def get_ch_client(): """Crée et retourne un client ClickHouse configuré depuis les variables d'environnement.""" return clickhouse_connect.get_client( host=os.environ.get("CLICKHOUSE_HOST", "clickhouse"), database=os.environ.get("CLICKHOUSE_DB_PROCESSING", os.environ.get("CLICKHOUSE_DB", "ja4_processing")), username=os.environ.get("CLICKHOUSE_USER", "admin"), password=os.environ.get("CLICKHOUSE_PASSWORD", ""), ) DB_PROC = os.environ.get("CLICKHOUSE_DB_PROCESSING", os.environ.get("CLICKHOUSE_DB", "ja4_processing")) def insert_ip_rules(client, rules: list[dict]) -> None: """Tronque et remplace la table anubis_ip_rules avec les règles CIDR/IP fournies.""" if not rules: print("[INFO] Aucune règle IP.") return client.command(f"TRUNCATE TABLE {DB_PROC}.anubis_ip_rules") data = [ [r["prefix"], r["bot_name"], r["action"], r["rule_id"], r["has_ua"], r["category"]] for r in rules ] client.insert(f"{DB_PROC}.anubis_ip_rules", data, column_names=["prefix", "bot_name", "action", "rule_id", "has_ua", "category"]) print(f"[OK] {len(rules)} règles IP insérées.") def insert_asn_rules(client, rules: list[dict]) -> None: """Tronque et remplace la table anubis_asn_rules avec les règles ASN fournies.""" if not rules: print("[INFO] Aucune règle ASN.") return client.command(f"TRUNCATE TABLE {DB_PROC}.anubis_asn_rules") data = [[r["asn"], r["bot_name"], r["action"], r["category"]] for r in rules] client.insert(f"{DB_PROC}.anubis_asn_rules", data, column_names=["asn", "bot_name", "action", "category"]) print(f"[OK] {len(rules)} règles ASN insérées.") def reload_dicts(client) -> None: """Recharge les dictionnaires ClickHouse Anubis IP et ASN après mise à jour.""" dicts = [ f"{DB_PROC}.dict_anubis_ip", f"{DB_PROC}.dict_anubis_asn", ] for d in dicts: try: client.command(f"SYSTEM RELOAD DICTIONARY {d}") print(f"[OK] {d} rechargé.") except Exception as e: print(f"[WARN] Rechargement {d}: {e}", file=sys.stderr) # ────────────────────────────────────────────────────────────────────────────── # Rapport # ────────────────────────────────────────────────────────────────────────────── def print_summary(ip_rules, asn_rules): """Affiche un résumé lisible des règles collectées (IP, ASN) sur la sortie standard.""" print(f"\n── Règles IP : {len(ip_rules)} CIDRs ──") by_bot: dict[str, list] = {} for r in ip_rules: by_bot.setdefault(r["bot_name"], []).append(r) for bot, rs in sorted(by_bot.items())[:15]: print(f" [{rs[0]['action']:9s}] {bot}: {len(rs)} CIDRs (cat={rs[0]['category']})") if len(by_bot) > 15: print(f" … et {len(by_bot) - 15} autres bots") if asn_rules: print(f"\n── Règles ASN : {len(asn_rules)} ──") for r in asn_rules: print(f" [{r['action']:9s}] ASN {r['asn']}: {r['bot_name']}") # ────────────────────────────────────────────────────────────────────────────── # Main # ────────────────────────────────────────────────────────────────────────────── def main() -> None: """Point d'entrée principal : collecte les règles Anubis IP/CIDR et ASN, puis les charge dans ClickHouse.""" print("[INFO] Collecte des règles Anubis depuis GitHub (IP/CIDR + ASN uniquement)…") ip_rules, asn_rules = collect_all_rules() total = len(ip_rules) + len(asn_rules) print(f"\n[INFO] {len(ip_rules)} CIDRs IP, {len(asn_rules)} ASN (total={total})") if total == 0: print("[ERREUR] Aucune règle récupérée.", file=sys.stderr) sys.exit(1) print_summary(ip_rules, asn_rules) print("\n[INFO] Connexion à ClickHouse…") client = get_ch_client() insert_ip_rules(client, ip_rules) insert_asn_rules(client, asn_rules) reload_dicts(client) print("\n[OK] Règles Anubis chargées avec succès.") if __name__ == "__main__": main()