#!/usr/bin/env python3 """ fetch_rules.py — Récupère TOUTES les règles Anubis depuis GitHub et les insère dans ClickHouse. Sources : - data/bots/**/*.yaml (bots pathologiques, IA, IRC) - data/crawlers/*.yaml (crawlers légitimes et clouds) - data/clients/*.yaml (clients IA agissant pour utilisateurs) - data/common/*.yaml (règles communes : IPs privées, etc.) - data/botPolicies.yaml (règles ASN et pays inline) Usage (depuis le container dashboard_web) : python /tmp/fetch_rules.py Variables d'environnement : CLICKHOUSE_HOST, CLICKHOUSE_DB, CLICKHOUSE_DB_PROCESSING, CLICKHOUSE_USER, CLICKHOUSE_PASSWORD """ import json import os import re import sys import urllib.request import urllib.error try: import yaml except ImportError: print("[ERREUR] pyyaml manquant.", file=sys.stderr) sys.exit(1) try: import clickhouse_connect except ImportError: print("[ERREUR] clickhouse-connect manquant.", file=sys.stderr) sys.exit(1) # ────────────────────────────────────────────────────────────────────────────── # Config # ────────────────────────────────────────────────────────────────────────────── GITHUB_API = "https://api.github.com/repos/TecharoHQ/anubis/contents" GITHUB_RAW = "https://raw.githubusercontent.com/TecharoHQ/anubis/main" # Répertoires à parcourir — ORDER CRITIQUE pour REGEXP_TREE : # Dans REGEXP_TREE (root-level rules), la règle avec l'ID le plus bas gagne quand plusieurs matchent. # → Les règles SPÉCIFIQUES doivent être chargées en PREMIER (IDs bas) pour gagner sur les catch-alls. # → Les catch-alls (ai-robots-txt, ai-catchall) doivent être chargés en DERNIER (IDs hauts). # # Au sein de chaque répertoire, les fichiers sont triés EN ORDRE ALPHABÉTIQUE INVERSÉ # pour que les règles spécifiques (noms longs) aient des IDs plus bas que les catch-alls (ai.yaml). DIRECTORIES = [ ("data/clients", "clients"), # Règles AI clients avec IP (openai-chatgpt-user, etc.) ("data/bots/irc-bots", "bots/irc-bots"), # Bots IRC spécifiques ("data/crawlers", "crawlers"), # Crawlers spécifiques + clouds ("data/common", "common"), # IPs privées, routes communes ("data/bots", "bots"), # Catch-alls larges (ai-robots-txt, ai-catchall) — LAST ] # Fichier de politique principal (règles ASN + pays inline) BOT_POLICIES_PATH = "data/botPolicies.yaml" # UA_PARENT_OVERRIDE : mapping nom_règle → nom_parent pour forcer la hiérarchie REGEXP_TREE. # Conservé vide intentionnellement : l'ordre de chargement (spécifique avant catch-all) # garantit la priorité sans hiérarchie parent_id explicite. # Populer ce dict si une règle doit hériter d'une autre via parent_id dans REGEXP_TREE. UA_PARENT_OVERRIDE: dict[str, str] = {} # ────────────────────────────────────────────────────────────────────────────── # HTTP helpers # ────────────────────────────────────────────────────────────────────────────── def _fetch_url(url: str, timeout: int = 15) -> str | None: try: with urllib.request.urlopen(url, timeout=timeout) as resp: return resp.read().decode("utf-8") except urllib.error.URLError as e: print(f"[WARN] {url}: {e}", file=sys.stderr) return None def fetch_yaml_url(url: str) -> list | dict | None: content = _fetch_url(url) if content: return yaml.safe_load(content) return None def list_yaml_files(api_path: str) -> list[str]: """ Retourne la liste des raw URLs des fichiers .yaml/.yml dans api_path via l'API GitHub. Les fichiers sont triés en ordre ALPHABÉTIQUE INVERSÉ pour que les règles spécifiques (noms longs, ex: openai-chatgpt-user.yaml) aient un ID inférieur aux catch-alls (ai.yaml). """ content = _fetch_url(f"{GITHUB_API}/{api_path}") if not content: return [] try: entries = json.loads(content) except json.JSONDecodeError: return [] files = [ entry for entry in entries if entry.get("type") == "file" and entry.get("name", "").endswith((".yaml", ".yml")) ] # Tri inverse : les noms longs (spécifiques) avant les noms courts (catch-alls) files.sort(key=lambda e: e["name"], reverse=True) return [f["download_url"] for f in files] # ────────────────────────────────────────────────────────────────────────────── # Extraction des patterns UA depuis les expressions CEL-like # ────────────────────────────────────────────────────────────────────────────── def _extract_ua_from_all(conditions: list) -> str | None: """Extrait une regex UA depuis une expression 'all' (ex: yandexbot userAgent.matches).""" for cond in conditions: if not isinstance(cond, str): continue m = re.search(r'userAgent\.matches\("(.+?)"\)', cond) if m: return m.group(1).replace("\\\\", "\\") return None def _extract_ua_from_any(conditions: list) -> str | None: """ Extrait une regex UA depuis une expression 'any' avec userAgent.contains(...) Exemple : aggressive-brazilian-scrapers.yaml Retourne une regex en OR : MSIE|Trident|... """ patterns = [] for cond in conditions: if not isinstance(cond, str): continue m = re.search(r'userAgent\.contains\("(.+?)"\)', cond) if m: patterns.append(re.escape(m.group(1))) if patterns: return "|".join(patterns) return None def extract_ua_regex(rule: dict) -> str | None: """Extrait la regex User-Agent depuis toutes les formes possibles.""" # Forme directe if ua := rule.get("user_agent_regex"): return ua.strip() expr = rule.get("expression") if not expr: return None # Expression scalaire (CEL string) if isinstance(expr, str): m = re.search(r'userAgent\.matches\("(.+?)"\)', expr) if m: return m.group(1).replace("\\\\", "\\") m = re.search(r'userAgent\.contains\("(.+?)"\)', expr) if m: return re.escape(m.group(1)) return None # Expression structurée dict if isinstance(expr, dict): if ua := _extract_ua_from_all(expr.get("all", [])): return ua if ua := _extract_ua_from_any(expr.get("any", [])): return ua return None # ────────────────────────────────────────────────────────────────────────────── # Parse des fichiers YAML # ────────────────────────────────────────────────────────────────────────────── def parse_file( url: str, category: str, ua_name_to_id: dict, ua_id_counter_ref: list, # [int] — compteur mutable partagé entre appels rule_id_counter_ref: list, # [int] — idem ) -> tuple[list[dict], list[dict]]: """ Parse un fichier YAML Anubis. Retourne (ua_rules, ip_rules). Note : ua_name_to_id est maintenu pour supporter la hiérarchie parent_id dans REGEXP_TREE (via UA_PARENT_OVERRIDE). Tant que UA_PARENT_OVERRIDE est vide, parent_id vaut toujours 0 et ua_name_to_id n'est pas consulté en pratique. """ data = fetch_yaml_url(url) if not data or not isinstance(data, list): return [], [] ua_rules, ip_rules = [], [] for rule in data: if not isinstance(rule, dict): continue # Ignorer les imports (références à d'autres fichiers) if "import" in rule: continue name = rule.get("name", "").strip() action = rule.get("action", "").strip() if not name or not action: continue remote_addrs = [str(c).strip() for c in rule.get("remote_addresses", []) if c] has_ip = bool(remote_addrs) rule_id = rule_id_counter_ref[0] rule_id_counter_ref[0] += 1 # ── User-Agent regex ───────────────────────────────────────────────── ua_regex = extract_ua_regex(rule) if ua_regex: parent_name = UA_PARENT_OVERRIDE.get(name) parent_id = ua_name_to_id.get(parent_name, 0) if parent_name else 0 uid = ua_id_counter_ref[0] ua_id_counter_ref[0] += 1 ua_name_to_id[name] = uid ua_rules.append({ "id": uid, "parent_id": parent_id, "regexp": ua_regex, "bot_name": name, "action": action, "has_ip": "1" if has_ip else "0", "rule_id": str(rule_id), "category": category, }) # ── IP/CIDR ranges ─────────────────────────────────────────────────── has_ua = bool(ua_regex) for cidr in remote_addrs: ip_rules.append({ "prefix": cidr, "bot_name": name, "action": action, "rule_id": rule_id, "has_ua": 1 if has_ua else 0, "category": category, }) return ua_rules, ip_rules def parse_bot_policies_inline(url: str) -> tuple[list[dict], list[dict]]: """ Parse botPolicies.yaml pour les règles inline avec geoip.countries et asns.match. Retourne (asn_rules, country_rules). """ data = fetch_yaml_url(url) if not data or not isinstance(data, dict): return [], [] asn_rules: list[dict] = [] country_rules: list[dict] = [] for rule in data.get("bots", []): if not isinstance(rule, dict): continue if "import" in rule: continue name = rule.get("name", "").strip() action = rule.get("action", "").strip() if not name or not action: continue # ASN rules asns = rule.get("asns", {}) if isinstance(asns, dict): for asn in asns.get("match", []): asn_rules.append({ "asn": int(asn), "bot_name": name, "action": action, "category": "policies", }) # Country rules geoip = rule.get("geoip", {}) if isinstance(geoip, dict): for cc in geoip.get("countries", []): country_rules.append({ "country_code": str(cc).upper(), "bot_name": name, "action": action, "category": "policies", }) return asn_rules, country_rules # ────────────────────────────────────────────────────────────────────────────── # Collecte de toutes les règles # ────────────────────────────────────────────────────────────────────────────── def collect_all_rules() -> tuple[list, list, list, list]: """Retourne (ua_rules, ip_rules, asn_rules, country_rules).""" ua_name_to_id: dict[str, int] = {} ua_id_counter_ref: list[int] = [1] rule_id_counter: list[int] = [1] all_ua: list[dict] = [] all_ip: list[dict] = [] for api_path, category in DIRECTORIES: print(f"[INFO] Parcours de {api_path} ({category})…") file_urls = list_yaml_files(api_path) print(f" {len(file_urls)} fichiers trouvés") for url in file_urls: ua, ip = parse_file(url, category, ua_name_to_id, ua_id_counter_ref, rule_id_counter) all_ua.extend(ua) all_ip.extend(ip) # Règles ASN + pays depuis botPolicies.yaml print(f"[INFO] Lecture de botPolicies.yaml…") policies_url = f"{GITHUB_RAW}/{BOT_POLICIES_PATH}" asn_rules, country_rules = parse_bot_policies_inline(policies_url) return all_ua, all_ip, asn_rules, country_rules # ────────────────────────────────────────────────────────────────────────────── # ClickHouse # ────────────────────────────────────────────────────────────────────────────── def get_ch_client(): return clickhouse_connect.get_client( host=os.environ.get("CLICKHOUSE_HOST", "clickhouse"), database=os.environ.get("CLICKHOUSE_DB_PROCESSING", os.environ.get("CLICKHOUSE_DB", "ja4_processing")), username=os.environ.get("CLICKHOUSE_USER", "admin"), password=os.environ.get("CLICKHOUSE_PASSWORD", ""), ) DB_PROC = os.environ.get("CLICKHOUSE_DB_PROCESSING", os.environ.get("CLICKHOUSE_DB", "ja4_processing")) def insert_ua_rules(client, rules: list[dict]) -> None: if not rules: print("[INFO] Aucune règle UA.") return client.command(f"TRUNCATE TABLE {DB_PROC}.anubis_ua_rules") # REGEXP_TREE format : id, parent_id, regexp, keys[], values[] # keys = ['bot_name', 'action', 'has_ip', 'rule_id', 'category'] data = [ [ r["id"], r["parent_id"], r["regexp"], ["bot_name", "action", "has_ip", "rule_id", "category"], [r["bot_name"], r["action"], r["has_ip"], r["rule_id"], r["category"]], ] for r in rules ] client.insert(f"{DB_PROC}.anubis_ua_rules", data, column_names=["id", "parent_id", "regexp", "keys", "values"]) print(f"[OK] {len(rules)} règles UA insérées.") def insert_ip_rules(client, rules: list[dict]) -> None: if not rules: print("[INFO] Aucune règle IP.") return client.command(f"TRUNCATE TABLE {DB_PROC}.anubis_ip_rules") data = [ [r["prefix"], r["bot_name"], r["action"], r["rule_id"], r["has_ua"], r["category"]] for r in rules ] client.insert(f"{DB_PROC}.anubis_ip_rules", data, column_names=["prefix", "bot_name", "action", "rule_id", "has_ua", "category"]) print(f"[OK] {len(rules)} règles IP insérées.") def insert_asn_rules(client, rules: list[dict]) -> None: if not rules: print("[INFO] Aucune règle ASN.") return client.command(f"TRUNCATE TABLE {DB_PROC}.anubis_asn_rules") data = [[r["asn"], r["bot_name"], r["action"], r["category"]] for r in rules] client.insert(f"{DB_PROC}.anubis_asn_rules", data, column_names=["asn", "bot_name", "action", "category"]) print(f"[OK] {len(rules)} règles ASN insérées.") def insert_country_rules(client, rules: list[dict]) -> None: if not rules: print("[INFO] Aucune règle pays.") return client.command(f"TRUNCATE TABLE {DB_PROC}.anubis_country_rules") data = [[r["country_code"], r["bot_name"], r["action"], r["category"]] for r in rules] client.insert(f"{DB_PROC}.anubis_country_rules", data, column_names=["country_code", "bot_name", "action", "category"]) print(f"[OK] {len(rules)} règles pays insérées.") def reload_dicts(client) -> None: dicts = [ f"{DB_PROC}.dict_anubis_ua", f"{DB_PROC}.dict_anubis_ip", f"{DB_PROC}.dict_anubis_asn", f"{DB_PROC}.dict_anubis_country", ] for d in dicts: try: client.command(f"SYSTEM RELOAD DICTIONARY {d}") print(f"[OK] {d} rechargé.") except Exception as e: print(f"[WARN] Rechargement {d}: {e}", file=sys.stderr) # ────────────────────────────────────────────────────────────────────────────── # Rapport # ────────────────────────────────────────────────────────────────────────────── def print_summary(ua_rules, ip_rules, asn_rules, country_rules): print("\n── Règles UA ──") by_cat: dict[str, list] = {} for r in ua_rules: by_cat.setdefault(r["category"], []).append(r) for cat, rules in sorted(by_cat.items()): print(f" [{cat}] {len(rules)} règle(s)") for r in rules[:5]: has = " [+IP]" if r["has_ip"] == "1" else "" par = f" [parent={r['parent_id']}]" if r["parent_id"] else "" print(f" [{r['action']:9s}] {r['bot_name']}{has}{par}: {r['regexp'][:50]}") if len(rules) > 5: print(f" … et {len(rules) - 5} autres") print(f"\n── Règles IP : {len(ip_rules)} CIDRs ──") by_bot: dict[str, list] = {} for r in ip_rules: by_bot.setdefault(r["bot_name"], []).append(r) for bot, rs in sorted(by_bot.items())[:15]: print(f" [{rs[0]['action']:9s}] {bot}: {len(rs)} CIDRs (cat={rs[0]['category']}, has_ua={rs[0]['has_ua']})") if len(by_bot) > 15: print(f" … et {len(by_bot) - 15} autres bots") if asn_rules: print(f"\n── Règles ASN : {len(asn_rules)} ──") for r in asn_rules: print(f" [{r['action']:9s}] ASN {r['asn']}: {r['bot_name']}") if country_rules: print(f"\n── Règles pays : {len(country_rules)} ──") for r in country_rules: print(f" [{r['action']:9s}] {r['country_code']}: {r['bot_name']}") # ────────────────────────────────────────────────────────────────────────────── # Main # ────────────────────────────────────────────────────────────────────────────── def main() -> None: print("[INFO] Collecte des règles Anubis depuis GitHub…") ua_rules, ip_rules, asn_rules, country_rules = collect_all_rules() total = len(ua_rules) + len(ip_rules) + len(asn_rules) + len(country_rules) print(f"\n[INFO] {len(ua_rules)} règles UA, {len(ip_rules)} CIDRs IP, " f"{len(asn_rules)} ASN, {len(country_rules)} pays (total={total})") if total == 0: print("[ERREUR] Aucune règle récupérée.", file=sys.stderr) sys.exit(1) print_summary(ua_rules, ip_rules, asn_rules, country_rules) print("\n[INFO] Connexion à ClickHouse…") client = get_ch_client() insert_ua_rules(client, ua_rules) insert_ip_rules(client, ip_rules) insert_asn_rules(client, asn_rules) insert_country_rules(client, country_rules) reload_dicts(client) print("\n[OK] Règles Anubis chargées avec succès.") if __name__ == "__main__": main()