#!/usr/bin/env python3
"""
generate_asn_data.py — Download public IPtoASN database and generate:
  1. asn_reputation.csv      — ASN number + reputation label (no header)
  2. iplocate-ip-to-asn.csv  — CIDR,ASN,country,AS name (no header)

Data source: https://iptoasn.com/data/ip2asn-v4.tsv.gz (free, no registration)

Usage:
    python3 generate_asn_data.py --output-dir .
    python3 generate_asn_data.py --output-dir . --no-download  # reuse cached TSV
"""

import argparse
import gzip
import ipaddress
import io
import os
import sys
import urllib.request
from collections import Counter

IPTOASN_URL = "https://iptoasn.com/data/ip2asn-v4.tsv.gz"
CACHED_TSV = "ip2asn-v4.tsv"

# ---------------------------------------------------------------------------
# Hard-coded well-known ASN lists for accurate classification
# ---------------------------------------------------------------------------

# NOTE: a few ASNs appear under more than one heading below; the set literal
# de-duplicates them, so the repetition is harmless.
KNOWN_HUMAN_ASNS: set[int] = {
    # France
    3215, 12322, 15557, 5410, 6799, 29169, 2027,
    # Germany
    3320, 5432, 6805, 6830, 8422, 31334, 8881, 9145,
    # UK
    2856, 5607, 8913, 6871, 13285, 20712, 25577,
    # Netherlands
    1136, 15542, 33915, 50266, 15435,
    # Spain
    3352, 12357, 12715, 12479, 12338,
    # Italy
    3269, 12874, 30722, 1267, 12797,
    # US — residential / consumer ISPs
    7922, 7018, 701, 20115, 209, 6389, 22773, 10796, 11351, 11427,
    11426, 20001, 22394, 5650, 6128, 10507, 12271, 19108, 26801, 33363,
    33588, 33651, 33652, 33657, 33659, 33660, 33662, 33668, 7843, 11025,
    12083, 20057, 23005, 26827, 33491, 33650, 5769, 6167, 11404, 14265,
    21508, 22561, 30036,
    # Canada
    577, 812, 6327, 852, 855, 6539, 21949, 5645, 6453,
    # Japan
    2516, 4713, 17676, 2519, 2497, 9605, 4685, 7679, 9824,
    # South Korea
    4766, 9318, 3786, 38091, 9848, 4659,
    # Australia
    1221, 4764, 4804, 7545, 9443, 18291, 24429,
    # India
    9829, 45609, 55836, 24560, 17488, 9498, 18101, 45820,
    # Brazil
    4230, 7738, 8167, 16735, 26599, 27699, 28573, 53006, 53089, 18881,
    22085, 28343,
    # Russia
    8359, 12389, 25513, 8402, 3216, 31133, 42610,
    # China
    4134, 4837, 4808, 4812, 9808, 9394, 56040, 56041, 56042,
    # Mexico
    8151, 6503, 11888, 17072, 32098,
    # Turkey
    9121, 34984, 47331, 16135,
    # Poland
    5617, 12912, 6830, 5588, 8374, 21021,
    # Sweden / Nordics
    3301, 1257, 2119, 8473, 12552, 44034, 2116, 29518,
    # Switzerland
    3303, 6830,
    # Belgium
    5432, 6848, 12392,
    # Portugal
    3243, 2860, 8657,
    # Ireland
    5466, 15502, 6830,
    # Southeast Asia
    4773, 7552, 45899, 9299, 4818, 18403, 17974, 23969, 9534, 24203, 7470,
    # Middle East
    5384, 8781, 39891, 42961, 12880, 44244, 50710,
    # Africa
    36903, 37105, 36874, 36992, 37453, 29571, 33771, 37492,
    # Argentina
    7303, 10318, 11664, 22927,
    # Colombia
    10620, 13489, 14080, 27831,
    # New Zealand
    9790, 4771, 24127, 9500,
}

KNOWN_DATACENTER_ASNS: set[int] = {
    # AWS
    16509, 14618, 7224,
    # Google Cloud
    396982, 36492, 36384, 15169,
    # Microsoft Azure
    8075, 8068, 8069, 12076,
    # Oracle Cloud
    31898,
    # IBM Cloud / SoftLayer
    36351,
    # Alibaba Cloud
    45102,
    # Tencent Cloud
    132203,
    # OVH / OVHcloud
    16276,
    # Hetzner
    24940, 213230,
    # DigitalOcean
    14061, 393406,
    # Linode / Akamai Connected Cloud
    63949, 22040,
    # Vultr
    20473,
    # Scaleway / Online SAS
    12876,
    # Contabo
    209083, 40021,
    # IONOS / 1&1
    8560,
    # Rackspace
    33070, 19994, 27357,
    # Equinix Metal
    54825,
    # Kamatera
    36007,
    # UpCloud
    202053,
    # Cherry Servers
    59642,
    # Leaseweb
    28753, 60781,
    # Psychz Networks
    40676,
    # ColoCrossing
    36352,
    # QuadraNet
    8100,
    # Choopa (Vultr subsidiary)
    20473,
    # Zenlayer
    21859,
}

KNOWN_CDN_ASNS: set[int] = {
    13335,   # Cloudflare
    20940,   # Akamai
    54113,   # Fastly
    15133,   # Edgecast / Verizon Digital Media / Edgio
    16625,   # Akamai
    22822,   # Limelight Networks
    30148,   # Sucuri
    209242,  # Cloudflare WARP
    132892,  # Cloudflare APAC
    202623,  # StackPath
    35994,   # Akamai
    23454,   # Akamai
    23455,   # Akamai
    6185,    # Apple CDN (via Akamai)
    714,     # Apple
    2906,    # Netflix Open Connect
    36183,   # Netflix
    40027,   # Netflix
    55095,   # Cloudflare CN
    394536,  # Fastly
    395973,  # Fastly
}

KNOWN_HOSTING_ASNS: set[int] = {
    26496,   # GoDaddy
    46606,   # Unified Layer / Bluehost
    197695,  # Reg.ru
    47583,   # Hostinger
    32244,   # Liquid Web
    19871,   # Network Solutions
    40034,   # Confluence Networks
    26347,   # DreamHost
    36351,   # SoftLayer (also datacenter, but historically hosting)
    29802,   # HIVELOCITY
    36024,   # HostNOC
    21844,   # ThePlanet / SoftLayer
    33182,   # HostDime
    32475,   # SingleHop
    18450,   # WebNX
    53831,   # Squarespace
    15830,   # Telecity / Equinix EU
    25369,   # Hydra Communications
    46475,   # Limestone Networks
    13768,   # Peer 1 Network
    29854,   # Westhost
    36137,   # ServerMania
    20454,   # SecuredServers
    62567,   # DigitalOcean (hosting arm)
    398101,  # GoDaddy hosting
    19969,   # Joe's Datacenter
    395003,  # WPEngine
}

# ---------------------------------------------------------------------------
# Keyword-based heuristic classification
# ---------------------------------------------------------------------------
# Keywords are matched as lowercase substrings of the AS name. A leading or
# trailing space inside a keyword (e.g. "host ", " isp") acts as a crude word
# boundary; classify_asn() pads the name with spaces so that such keywords
# also match at the very start or end of the name.

HUMAN_KEYWORDS = [
    "telecom", "telcom", "telekom", "telefonica", "telecomunicacoes",
    "mobile", "broadband", "fiber", "fibre", "cable", "wireless",
    "residential", "communications", "comm ", " isp", "netcom",
    "internet service", "subscriber", "dsl", "adsl", "vdsl", "ftth", "fttb",
    "dial-up", "dialup", "cellular", "lte", "5g network", "wimax",
    "satellite", " tel ", "telco", "ptcl", "bsnl", "airtel", "jio",
    "reliance", "vodafone", "orange", "bouygues", "proximus", "swisscom",
    "telenor", "telia", "elisa", "dna oy", "rogers", "bell canada", "shaw",
    "telus", "optus", "tpg ", "internode", "comcast", "charter", "spectrum",
    "cox comm", "verizon", "at&t", "centurylink", "lumen", "frontier comm",
    "mediacom", "windstream", "consolidated comm", "t-mobile", "sprint",
    "cricket", "boost mobile", "virgin media", "bt group", "sky broadband",
    "sky uk", "plusnet", "talktalk", "ee limited", "three uk", "hutchison",
    "deutsche telekom", "1&1 versatel", "freenet", "unitymedia",
    "kabel deutschland", "kpn", "ziggo", "xs4all", "movistar", "masmovil",
    "yoigo", "tim s.p.a", "fastweb", "iliad", "free sas", "sfr",
    "numericable", "kddi", "ntt ", "softbank bb", "so-net", "biglobe",
    "kt corporation", "sk broadband", "lg uplus", "china telecom",
    "china unicom", "china mobile", "chunghwa telecom", "taiwan mobile",
    "far eastone", "pldt", "globe telecom", "true internet", "ais ", "dtac",
    "unifi", "maxis", "singtel", "starhub", "m1 limited", "viettel", "vnpt",
    "indosat", "telkomsel", "xl axiata", "turkcell", "turk telekom",
    "superonline", "rostelecom", "mts ", "beeline", "megafon", "etisalat",
    "du telecom", "stc ", "zain", "mtn ", "safaricom", "airtel africa",
    "rain ", "telmex", "claro", "vivo ", "oi s.a",
]

DATACENTER_KEYWORDS = [
    "cloud", "amazon", "aws", "google cloud", "gcp", "microsoft azure",
    "azure", "digitalocean", "digital ocean", "linode", "vultr", "hetzner",
    "ovhcloud", "ovh sas", "scaleway", "contabo", "kamatera", "upcloud",
    "oracle cloud", "alibaba cloud", "tencent cloud", "ibm cloud",
    "softlayer", "rackspace", "equinix", "leaseweb", "choopa", "data center",
    "datacenter", "colocation", "colo ", "baremetal", "bare metal",
    "infrastructure", "iaas", "paas",
]

HOSTING_KEYWORDS = [
    "hosting", "host ", "hoster", "webhost", "server farm", "vps",
    "virtual private", "dedicated server", "shared hosting",
    "managed hosting", "reseller", "cpanel", "plesk", "wordpress host",
    "godaddy", "namecheap", "hostinger", "bluehost", "siteground",
    "a2 hosting", "dreamhost", "hostgator", "ionos", "squarespace", "wix",
    "wpengine",
]

CDN_KEYWORDS = [
    "cloudflare", "akamai", "fastly", "cdn ", "content delivery", "edgecast",
    "limelight", "stackpath", "sucuri", "keycdn", "bunnycdn", "jsdelivr",
    "cachefly", "imperva", "incapsula",
]


def classify_asn(asn_number: int, as_name: str) -> str:
    """Classify an ASN into a reputation category.

    Args:
        asn_number: The autonomous system number.
        as_name: The AS description/name used for the keyword heuristics.

    Returns:
        One of "cdn", "hosting", "datacenter", "human" or "unknown".

    The hard-coded ASN sets are consulted first; only when the number is in
    none of them is the name matched against the keyword lists. In both
    stages the precedence is cdn > hosting > datacenter > human.
    """
    # Hard-coded lookups first (highest priority).
    if asn_number in KNOWN_CDN_ASNS:
        return "cdn"
    if asn_number in KNOWN_HOSTING_ASNS:
        return "hosting"
    if asn_number in KNOWN_DATACENTER_ASNS:
        return "datacenter"
    if asn_number in KNOWN_HUMAN_ASNS:
        return "human"

    name_lower = as_name.strip().lower()

    # Placeholder names ("Not routed", empty, ...) carry no signal.
    if name_lower in ("not routed", "none", "", "-"):
        return "unknown"

    # Pad with one space on each side so keywords that use a space as a word
    # boundary ("host ", "cdn ", " isp", ...) can match at the start or end
    # of the name, not only in the middle.
    padded = f" {name_lower} "

    # Most specific category first: "cloudflare" must win over "cloud", and
    # "hosting" over the generic datacenter terms.
    keyword_rules = (
        ("cdn", CDN_KEYWORDS),
        ("hosting", HOSTING_KEYWORDS),
        ("datacenter", DATACENTER_KEYWORDS),
        ("human", HUMAN_KEYWORDS),
    )
    for label, keywords in keyword_rules:
        if any(kw in padded for kw in keywords):
            return label

    return "unknown"


def ranges_to_cidrs(start_ip: str, end_ip: str) -> list[ipaddress.IPv4Network]:
    """Convert an inclusive IPv4 range to a list of CIDR networks.

    Args:
        start_ip: First address of the range, dotted-quad string.
        end_ip: Last address of the range, dotted-quad string.

    Returns:
        The networks produced by ipaddress.summarize_address_range for the
        range, or an empty list when either address is invalid or the range
        is reversed (start > end).
    """
    try:
        first = ipaddress.IPv4Address(start_ip)
        last = ipaddress.IPv4Address(end_ip)
        if first > last:
            return []
        return list(ipaddress.summarize_address_range(first, last))
    except (ipaddress.AddressValueError, ValueError, TypeError):
        return []


def download_iptoasn(output_dir: str) -> str:
    """Download and decompress ip2asn-v4.tsv.gz, return path to the TSV.

    Args:
        output_dir: Directory in which the decompressed TSV is cached.

    Returns:
        Path of the TSV file inside output_dir.

    On any failure the function falls back to a previously cached TSV if one
    exists; otherwise it prints a fatal message and exits with status 1.
    """
    tsv_path = os.path.join(output_dir, CACHED_TSV)
    # Stage the new data next to the cache and swap it in only when complete,
    # so a failed write can never leave a truncated file that the fallback
    # below would then present as a usable cache.
    tmp_path = tsv_path + ".tmp"

    print(f"[download] Fetching {IPTOASN_URL} ...")
    try:
        req = urllib.request.Request(
            IPTOASN_URL,
            headers={"User-Agent": "generate_asn_data/1.0"},
        )
        with urllib.request.urlopen(req, timeout=120) as resp:
            compressed = resp.read()
        print(f"[download] Downloaded {len(compressed):,} bytes compressed")

        raw = gzip.decompress(compressed)
        print(f"[download] Decompressed to {len(raw):,} bytes")

        with open(tmp_path, "wb") as f:
            f.write(raw)
        os.replace(tmp_path, tsv_path)
        print(f"[download] Saved to {tsv_path}")
    except Exception as e:
        # Deliberately broad: network, gzip and filesystem errors all get the
        # same best-effort treatment (report, then fall back to the cache).
        try:
            os.remove(tmp_path)
        except OSError:
            pass  # nothing was staged, or it cannot be removed; not fatal
        print(f"[error] Download failed: {e}", file=sys.stderr)
        if os.path.exists(tsv_path):
            print(f"[info] Using cached {tsv_path}")
        else:
            print("[fatal] No cached TSV available, aborting.", file=sys.stderr)
            sys.exit(1)

    return tsv_path


def parse_tsv(tsv_path: str) -> list[tuple[str, str, int, str, str]]:
    """Parse ip2asn-v4.tsv.

    Expected format, one tab-separated record per line:
        range_start, range_end, AS_number, country_code, AS_description

    Args:
        tsv_path: Path of the UTF-8 encoded TSV file.

    Returns:
        List of (start_ip, end_ip, asn, country, as_name) tuples. Blank
        lines, lines starting with "#", lines with fewer than five fields,
        lines whose ASN is not an integer and lines with ASN 0 are skipped.
    """
    entries = []
    with open(tsv_path, "r", encoding="utf-8") as f:
        for raw_line in f:
            # Strip only the line terminator: a full strip() would also eat a
            # trailing tab and thereby drop rows whose description is empty.
            line = raw_line.rstrip("\r\n")
            if not line.strip() or line.lstrip().startswith("#"):
                continue

            parts = line.split("\t")
            if len(parts) < 5:
                continue

            start_ip, end_ip, asn_str, country, as_name = (
                field.strip() for field in parts[:5]
            )
            try:
                asn = int(asn_str)
            except ValueError:
                continue
            if asn == 0:
                # ASN 0 rows are skipped, as in the original script
                # (presumably the dataset's unrouted ranges — TODO confirm).
                continue

            entries.append((start_ip, end_ip, asn, country, as_name))
    return entries


def _build_cidr_rows(entries):
    """Expand range entries into unique (cidr, asn, country, as_name) rows.

    Rows are de-duplicated on (cidr, asn), keeping the first occurrence, and
    returned sorted by network address, then prefix length.
    """
    seen: set[tuple[ipaddress.IPv4Network, int]] = set()
    rows: list[tuple[ipaddress.IPv4Network, int, str, str]] = []
    for start_ip, end_ip, asn, country, as_name in entries:
        for cidr in ranges_to_cidrs(start_ip, end_ip):
            key = (cidr, asn)
            if key not in seen:
                seen.add(key)
                rows.append((cidr, asn, country, as_name))
    rows.sort(key=lambda r: (r[0].network_address, r[0].prefixlen))
    return rows


def main():
    """Command-line entry point: fetch/parse the TSV and write both CSVs."""
    parser = argparse.ArgumentParser(
        description="Generate ASN reputation and IP-to-ASN CSVs from iptoasn.com data"
    )
    parser.add_argument(
        "--output-dir",
        default=".",
        help="Directory for output CSV files (default: current dir)",
    )
    parser.add_argument(
        "--no-download",
        action="store_true",
        help="Skip download, use cached ip2asn-v4.tsv in output-dir",
    )
    args = parser.parse_args()

    os.makedirs(args.output_dir, exist_ok=True)

    # Step 1: Get TSV data
    tsv_path = os.path.join(args.output_dir, CACHED_TSV)
    if not args.no_download:
        tsv_path = download_iptoasn(args.output_dir)
    elif not os.path.exists(tsv_path):
        print(f"[fatal] --no-download specified but {tsv_path} not found", file=sys.stderr)
        sys.exit(1)

    # Step 2: Parse
    print("[parse] Reading TSV ...")
    entries = parse_tsv(tsv_path)
    print(f"[parse] {len(entries):,} IP range entries")

    # Step 3: Build ASN info map {asn: (country, as_name)}.
    # The first occurrence of each ASN supplies its name/country.
    asn_info: dict[int, tuple[str, str]] = {}
    for _, _, asn, country, as_name in entries:
        if asn not in asn_info:
            asn_info[asn] = (country, as_name)

    # Step 4: Classify all ASNs
    asn_labels: dict[int, str] = {
        asn: classify_asn(asn, as_name)
        for asn, (_country, as_name) in asn_info.items()
    }

    # Step 5: Write asn_reputation.csv (sorted by ASN, no header)
    rep_path = os.path.join(args.output_dir, "asn_reputation.csv")
    sorted_asns = sorted(asn_labels)
    # Explicit UTF-8 keeps the output independent of the platform locale.
    with open(rep_path, "w", encoding="utf-8") as f:
        f.writelines(f"{asn},{asn_labels[asn]}\n" for asn in sorted_asns)
    print(f"[output] Wrote {len(sorted_asns):,} ASNs to {rep_path}")

    # Step 6: Convert ranges to CIDRs and write iplocate-ip-to-asn.csv
    ip_path = os.path.join(args.output_dir, "iplocate-ip-to-asn.csv")
    print("[cidr] Converting IP ranges to CIDR notation ...")
    cidr_rows = _build_cidr_rows(entries)

    # AS names are read as UTF-8 and may contain non-ASCII characters, so the
    # output must be written as UTF-8 too rather than in the locale encoding.
    with open(ip_path, "w", encoding="utf-8") as f:
        for cidr, asn, country, as_name in cidr_rows:
            # The output is a plain unquoted CSV, so commas inside AS names
            # are replaced with spaces rather than escaped.
            safe_name = as_name.replace(",", " ")
            f.write(f"{cidr},{asn},{country},{safe_name}\n")

    cidr_count = len(cidr_rows)
    print(f"[output] Wrote {cidr_count:,} CIDRs to {ip_path}")

    # Step 7: Stats
    stats = Counter(asn_labels.values())

    print("\n=== Summary ===")
    print(f"Total ASNs: {len(sorted_asns):,}")
    for label in ["human", "datacenter", "hosting", "cdn", "unknown"]:
        count = stats.get(label, 0)
        pct = 100.0 * count / len(sorted_asns) if sorted_asns else 0.0
        print(f"  {label:12s}: {count:>6,} ({pct:5.1f}%)")
    print(f"Total CIDRs: {cidr_count:,}")


if __name__ == "__main__":
    main()