Files
ja4-platform/scripts/generate_asn_data.py
toto 5c5bca71d1 feat: rewrite ASN classification with PeeringDB + expanded heuristics
Major improvements to generate_asn_data.py:
- Add PeeringDB network data source (34K networks with info_type)
- Add new categories: education, government, enterprise
- Rename 'human' label to 'isp' across all consumers
- Expand keyword heuristics (ISP, datacenter, hosting, CDN, education, gov)
- Add hard-coded lists for education, government, enterprise ASNs
- Support both --output-dir and --output-asn/--output-ipasn CLI interfaces
- Add --no-peeringdb flag for offline use

Results: unknown dropped from 86% to 57%, ISP coverage 21.8K ASNs,
education 3.1K, enterprise 5.7K, government 520.

Updated consumers:
- bot_detector.py: 'human' -> 'isp' for baseline selection
- dashboard api.py: 'human' -> 'isp' in SQL queries
- run-tests.sh: 'human' -> 'isp' in integration test assertions
- update-csv-data.sh: updated label description comment

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-08 16:02:07 +02:00

815 lines
28 KiB
Python

#!/usr/bin/env python3
"""
generate_asn_data.py — Download public IPtoASN + PeeringDB data and generate:
1. asn_reputation.csv — ASN number + reputation label (no header)
2. iplocate-ip-to-asn.csv — CIDR,ASN,country,AS name (no header)
Data sources:
- https://iptoasn.com/data/ip2asn-v4.tsv.gz (IP ranges → ASN mapping)
- https://www.peeringdb.com/api/net (ASN → network type)
Labels: isp, datacenter, hosting, cdn, enterprise, education, government, unknown
Usage:
python3 generate_asn_data.py --output-dir ./data
python3 generate_asn_data.py --output-dir ./data --no-download
python3 generate_asn_data.py --output-dir ./data --no-peeringdb
python3 generate_asn_data.py --output-asn out/asn.csv --output-ipasn out/ip.csv
"""
import argparse
import gzip
import ipaddress
import json
import os
import sys
import urllib.request
IPTOASN_URL = "https://iptoasn.com/data/ip2asn-v4.tsv.gz"
PEERINGDB_URL = "https://www.peeringdb.com/api/net?limit=0&depth=0"
CACHED_TSV = "ip2asn-v4.tsv"
CACHED_PEERINGDB = "peeringdb_nets.json"
# ---------------------------------------------------------------------------
# Hard-coded well-known ASN lists (highest classification priority)
# ---------------------------------------------------------------------------
KNOWN_ISP_ASNS: set[int] = {
# France
3215, 12322, 15557, 5410, 6799, 29169, 2027,
# Germany
3320, 5432, 6805, 6830, 8422, 31334, 8881, 9145,
# UK
2856, 5607, 8913, 6871, 13285, 20712, 25577,
# Netherlands
1136, 15542, 33915, 50266, 15435,
# Spain
3352, 12357, 12715, 12479, 12338,
# Italy
3269, 12874, 30722, 1267, 12797,
# US — residential / consumer ISPs
7922, 7018, 701, 20115, 209, 6389, 22773, 10796, 11351, 11427,
11426, 20001, 22394, 5650, 6128, 10507, 12271, 19108, 26801,
33363, 33588, 33651, 33652, 33657, 33659, 33660, 33662, 33668,
7843, 11025, 12083, 20057, 23005, 26827, 33491, 33650, 5769,
6167, 11404, 14265, 21508, 22561, 30036,
# Canada
577, 812, 6327, 852, 855, 6539, 21949, 5645, 6453,
# Japan
2516, 4713, 17676, 2519, 2497, 9605, 4685, 7679, 9824,
# South Korea
4766, 9318, 3786, 38091, 9848, 4659,
# Australia
1221, 4764, 4804, 7545, 9443, 18291, 24429,
# India
9829, 45609, 55836, 24560, 17488, 9498, 18101, 45820,
# Brazil
4230, 7738, 8167, 16735, 26599, 27699, 28573, 53006, 53089,
18881, 22085, 28343,
# Russia
8359, 12389, 25513, 8402, 3216, 31133, 42610,
# China
4134, 4837, 4808, 4812, 9808, 9394, 56040, 56041, 56042,
# Mexico
8151, 6503, 11888, 17072, 32098,
# Turkey
9121, 34984, 47331, 16135,
# Poland
5617, 12912, 5588, 8374, 21021,
# Sweden / Nordics
3301, 1257, 2119, 8473, 12552, 44034, 2116, 29518,
# Switzerland
3303,
# Belgium
6848, 12392,
# Portugal
3243, 2860, 8657,
# Ireland
5466, 15502,
# Southeast Asia
4773, 7552, 45899, 9299, 4818, 18403, 17974, 23969, 9534,
24203, 7470,
# Middle East
5384, 8781, 39891, 42961, 12880, 44244, 50710,
# Africa
36903, 37105, 36874, 36992, 37453, 29571, 33771, 37492,
# Argentina
7303, 10318, 11664, 22927,
# Colombia
10620, 13489, 14080, 27831,
# New Zealand
9790, 4771, 24127, 9500,
}
KNOWN_DATACENTER_ASNS: set[int] = {
# AWS
16509, 14618, 7224,
# Google Cloud
396982, 36492, 36384, 15169,
# Microsoft Azure
8075, 8068, 8069, 12076,
# Oracle Cloud
31898,
# IBM Cloud / SoftLayer
36351,
# Alibaba Cloud
45102,
# Tencent Cloud
132203,
# OVH / OVHcloud
16276,
# Hetzner
24940, 213230,
# DigitalOcean
14061, 393406,
# Linode / Akamai Connected Cloud
63949, 22040,
# Vultr / Choopa
20473,
# Scaleway / Online SAS
12876,
# Contabo
209083, 40021,
# IONOS / 1&1
8560,
# Rackspace
33070, 19994, 27357,
# Equinix Metal
54825,
# Kamatera
36007,
# UpCloud
202053,
# Cherry Servers
59642,
# Leaseweb
28753, 60781,
# Psychz Networks
40676,
# ColoCrossing
36352,
# QuadraNet
8100,
# Zenlayer
21859,
# Yandex Cloud
200350, 208722,
# Huawei Cloud
136907,
# NTT Global
2914,
# Cogent
174,
# GTT
3257,
# Zayo
6461,
}
KNOWN_CDN_ASNS: set[int] = {
13335, # Cloudflare
20940, # Akamai
54113, # Fastly
15133, # Edgecast / Verizon Digital Media / Edgio
16625, # Akamai
22822, # Limelight Networks
30148, # Sucuri
209242, # Cloudflare WARP
132892, # Cloudflare APAC
202623, # StackPath
35994, # Akamai
23454, # Akamai
23455, # Akamai
6185, # Apple CDN (via Akamai)
714, # Apple
2906, # Netflix Open Connect
36183, # Netflix
40027, # Netflix
55095, # Cloudflare CN
394536, # Fastly
395973, # Fastly
36040, # Google CDN
15395, # MaxCDN
30081, # CacheFly
60068, # CDN77
200325, # BunnyCDN
}
KNOWN_HOSTING_ASNS: set[int] = {
26496, # GoDaddy
46606, # Unified Layer / Bluehost
197695, # Reg.ru
47583, # Hostinger
32244, # Liquid Web
19871, # Network Solutions
40034, # Confluence Networks
26347, # DreamHost
29802, # HIVELOCITY
36024, # HostNOC
21844, # ThePlanet / SoftLayer
33182, # HostDime
32475, # SingleHop
18450, # WebNX
53831, # Squarespace
15830, # Telecity / Equinix EU
25369, # Hydra Communications
46475, # Limestone Networks
13768, # Peer 1 Network
29854, # Westhost
36137, # ServerMania
20454, # SecuredServers
62567, # DigitalOcean (hosting arm)
398101, # GoDaddy hosting
19969, # Joe's Datacenter
395003, # WPEngine
22612, # Namecheap
46664, # VolumeDrive
55286, # B2 Net Solutions / ServerMania
35916, # MultaCom
23470, # ReliableSite
}
KNOWN_EDUCATION_ASNS: set[int] = {
786, # JANET (UK academic network)
21320, # GEANT (European research)
11537, # Internet2 (US research)
2200, # RENATER (France research)
680, # DFN (Germany research)
137, # GARR (Italy research)
766, # RedIRIS (Spain research)
7575, # AARNet (Australia research)
513, # CERN
1741, # FUNET (Finland research)
2852, # CESNET (Czech research)
2603, # NORDUnet (Nordics research)
1930, # FCCN (Portugal research)
2381, # WIDE Project (Japan research)
1103, # SURFNET (Netherlands research)
2018, # AFRINIC
7660, # APNIC research
2500, # WIDE (Japan)
4556, # HKIX (Hong Kong academic)
24151, # CNGI-CERNET2 (China education)
4538, # CERNET (China education)
4837, # CERNET2 backbone
3, # MIT
10578, # Stanford
27, # University of Maryland
46, # UC Berkeley
104, # Carnegie Mellon
224, # Uninett (Norway research)
2852, # CESNET
2547, # Belnet (Belgium research)
1754, # DESY (Germany)
5765, # SANET (Slovakia academic)
9264, # KAIST (South Korea)
}
KNOWN_GOVERNMENT_ASNS: set[int] = {
3354, # US DoD Network Information Center
27064, # DoD Education Activity
721, # DoD / DISA
749, # DoD / DISA
568, # US Postal Service
3541, # Headquarters USAISC
6983, # US Internal Revenue Service
19551, # Incapsula Fed (US gov CDN)
394671, # US Dept of Veterans Affairs
7046, # US DoD
10796, # US Census Bureau
2572, # UK MoD
25180, # Bundesverwaltung (Swiss gov)
553, # BelWue (partially government)
559, # SWITCH (Swiss gov/edu)
}
KNOWN_ENTERPRISE_ASNS: set[int] = {
8068, # Microsoft Corp
36459, # GitHub
32934, # Facebook / Meta
13414, # Twitter / X
54115, # Dropbox
8003, # Uber
14413, # Linkedin
40428, # Salesforce
30148, # Sucuri (enterprise security)
394699, # Zoom
19551, # Shopify
62229, # Spotify
394406, # Slack
}
# ---------------------------------------------------------------------------
# PeeringDB info_type → label mapping
# ---------------------------------------------------------------------------
PEERINGDB_TYPE_MAP: dict[str, str] = {
"Cable/DSL/ISP": "isp",
"NSP": "isp",
"Content": "cdn",
"Enterprise": "enterprise",
"Educational/Research": "education",
"Government": "government",
"Non-Profit": "enterprise",
"Route Server": "unknown",
"Route Collector": "unknown",
}
def download_peeringdb(output_dir: str) -> dict[int, str]:
"""Download PeeringDB network list, return {asn: label} mapping."""
cache_path = os.path.join(output_dir, CACHED_PEERINGDB)
print(f"[peeringdb] Fetching {PEERINGDB_URL} ...")
data = None
try:
req = urllib.request.Request(PEERINGDB_URL, headers={
"User-Agent": "generate_asn_data/2.0",
"Accept": "application/json",
})
with urllib.request.urlopen(req, timeout=60) as resp:
raw = resp.read()
print(f"[peeringdb] Downloaded {len(raw):,} bytes")
data = json.loads(raw)
with open(cache_path, "wb") as f:
f.write(raw)
except Exception as e:
print(f"[warning] PeeringDB download failed: {e}", file=sys.stderr)
if os.path.exists(cache_path):
print(f"[info] Using cached {cache_path}")
with open(cache_path, "r", encoding="utf-8") as f:
data = json.load(f)
else:
print("[warning] No PeeringDB data available, continuing without it.",
file=sys.stderr)
return {}
if not data or "data" not in data:
print("[warning] PeeringDB response has no 'data' key", file=sys.stderr)
return {}
pdb_labels: dict[int, str] = {}
nets = data["data"]
print(f"[peeringdb] Processing {len(nets):,} networks ...")
for net in nets:
asn = net.get("asn", 0)
if not asn or asn <= 0:
continue
info_type = (net.get("info_type") or "").strip()
name = (net.get("name") or "").lower()
# Direct type mapping
if info_type in PEERINGDB_TYPE_MAP:
pdb_labels[asn] = PEERINGDB_TYPE_MAP[info_type]
elif info_type == "Network Services":
# Check keywords to distinguish hosting/datacenter/isp
if _match_keywords(name, HOSTING_KEYWORDS):
pdb_labels[asn] = "hosting"
elif _match_keywords(name, DATACENTER_KEYWORDS):
pdb_labels[asn] = "datacenter"
elif _match_keywords(name, ISP_KEYWORDS):
pdb_labels[asn] = "isp"
else:
pdb_labels[asn] = "enterprise"
elif info_type == "":
# No type set — try keyword classification on name
label = _classify_by_keywords(name)
if label:
pdb_labels[asn] = label
# else: unknown types like Route Server — skip
stats: dict[str, int] = {}
for v in pdb_labels.values():
stats[v] = stats.get(v, 0) + 1
print(f"[peeringdb] Mapped {len(pdb_labels):,} ASNs: {stats}")
return pdb_labels
# ---------------------------------------------------------------------------
# Keyword-based heuristic classification (expanded)
# ---------------------------------------------------------------------------
ISP_KEYWORDS = [
# Generic ISP terms
"telecom", "telcom", "telekom", "telefonica", "telecomunicacoes",
"telecomunicaciones", "telekommunikasjon", "telekommunikation",
"mobile", "broadband", "fiber", "fibre", "cable", "wireless",
"residential", "communications", "comm ", " isp", "netcom",
"internet service", "subscriber", "dsl", "adsl", "vdsl", "ftth",
"fttb", "fttp", "dial-up", "dialup", "cellular", "lte", "5g network",
"wimax", "satellite", " tel ", "telco", "ptcl", "bsnl",
"telecable", "telemedia", "telnet", "telprom",
"internet provider", "internet access",
# Specific operators — Global
"airtel", "jio", "reliance", "vodafone", "orange", "bouygues",
"proximus", "swisscom", "telenor", "telia", "elisa", "dna oy",
"rogers", "bell canada", "shaw", "telus", "optus", "tpg ",
"internode", "comcast", "charter", "spectrum", "cox comm",
"verizon", "at&t", "centurylink", "lumen", "frontier comm",
"mediacom", "windstream", "consolidated comm", "t-mobile",
"sprint", "cricket", "boost mobile", "virgin media",
"bt group", "sky broadband", "sky uk", "plusnet", "talktalk",
"ee limited", "three uk", "hutchison", "deutsche telekom",
"1&1 versatel", "freenet", "unitymedia", "kabel deutschland",
"kpn", "ziggo", "xs4all", "movistar", "masmovil", "yoigo",
"tim s.p.a", "fastweb", "iliad", "free sas", "sfr",
"numericable", "kddi", "ntt ", "softbank bb", "so-net",
"biglobe", "kt corporation", "sk broadband", "lg uplus",
"china telecom", "china unicom", "china mobile",
"chunghwa telecom", "taiwan mobile", "far eastone",
"pldt", "globe telecom", "true internet", "ais ", "dtac",
"unifi", "maxis", "singtel", "starhub", "m1 limited",
"viettel", "vnpt", "indosat", "telkomsel", "xl axiata",
"turkcell", "turk telekom", "superonline",
"rostelecom", "mts ", "beeline", "megafon",
"etisalat", "du telecom", "stc ", "zain",
"mtn ", "safaricom", "airtel africa", "rain ",
"telmex", "claro", "vivo ", "oi s.a",
# Additional global operators
"telstra", "spark nz", "chorus nz", "vodacom", "digicel",
"flow ", "liberty cablevision", "totalplay", "izzi ",
"megacable", "axtel", "tigo ", "entel", "bitel", "movilnet",
"cantv", "antel", "copel telecom", "algar telecom", "rede sul",
"surf telecom", "net servicos", "porto seguro",
"dialog axiata", "sri lanka telecom", "banglalink", "grameenphone",
"robi axiata", "ncell", "nepal telecom",
"omantel", "ooredoo", "mobily", "batelco", "viva bahrain",
"nawras", "sure telecom", "airlink", "cool ideas", "vumatel",
"telkom sa", "web africa", "afrihost", "zte corporation",
"smile telecom", "busyinternet", "camtel", "orange cameroun",
"maroc telecom", "inwi", "djezzy", "mobilis", "tunisie telecom",
"ethio telecom", "liquid telecom", "neotel", "cell c",
"a1 telekom", "magenta telekom", "drei austria", "cablecom",
"salt mobile", "sunrise", "post luxembourg", "tango ",
"nova croatia", "ht ", "vivacom", "bulsatcom", "cosmote",
"nova greece", "forthnet", "wind hellas", "cyta",
"telekom srbija", "vip mobile", "telenor serbia",
"telekom slovenije", "a1 slovenia",
"o2 czech", "t-mobile czech", "vodafone czech",
"orange polska", "polkomtel", "play mobile",
"netia", "inea ", "vectra",
"bite ", "tele2", "tet ", "eltel",
"starman", "telia eesti", "elisa eesti",
"lattelecom", "baltcom",
"kyivstar", "lifecell", "ukrtelecom",
"moldtelecom", "orange moldova",
"kazakhtelecom", "beeline kz", "tele2 kz",
"uztelecom", "ucell",
"bakcell", "azercell", "azerconnect",
"veon", "silknet", "magti",
]
DATACENTER_KEYWORDS = [
"cloud", "amazon", "aws", "google cloud", "gcp",
"microsoft azure", "azure", "digitalocean", "digital ocean",
"linode", "vultr", "hetzner", "ovhcloud", "ovh sas",
"scaleway", "contabo", "kamatera", "upcloud",
"oracle cloud", "alibaba cloud", "tencent cloud",
"ibm cloud", "softlayer", "rackspace", "equinix",
"leaseweb", "choopa", "data center", "datacenter", "data centre",
"colocation", "colo ", "baremetal", "bare metal",
"infrastructure", "iaas", "paas",
# Additional cloud/datacenter providers
"serverel", "packet", "metal ", "nexon", "netmagic",
"phoenixnap", "tierpoint", "cyrusone", "coresite",
"datapipe", "internap", "peak 10", "zayo ", "coreweave",
"lambda cloud", "paperspace", "vast.ai", "fluidstack",
"cloudfloor", "netriplex", "hyperoptic", "serverspace",
"selectel", "aruba cloud", "worldstream", "datacamp",
"m247", "datapacket", "clouvider", "hostkey",
"tzulo", "ramnode", "hostens", "serverius", "servers.com",
"nocix", "fdcservers", "reliablesite",
"quadranet", "cogent ", "colocrossing",
]
HOSTING_KEYWORDS = [
"hosting", "host ", "hoster", "webhost", "web host",
"server farm", "vps", "virtual private",
"dedicated server", "shared hosting", "managed hosting",
"reseller", "cpanel", "plesk", "wordpress host",
"godaddy", "namecheap", "hostinger", "bluehost",
"siteground", "a2 hosting", "dreamhost", "hostgator",
"ionos", "squarespace", "wix", "wpengine", "wp engine",
"register.com", "domain registr", "registrar",
"strato ag", "mittwald", "hosteurope", "all-inkl",
"infomaniak", "gandi", "one.com", "loopia",
"fastcomet", "cloudways", "kinsta", "flywheel",
"netlify", "vercel", "render.com", "railway",
"heroku", "webhosting",
]
CDN_KEYWORDS = [
"cloudflare", "akamai", "fastly", "cdn ",
"content delivery", "edgecast", "limelight",
"stackpath", "sucuri", "keycdn", "bunnycdn", "bunny cdn",
"jsdelivr", "cachefly", "imperva", "incapsula",
"g-core", "gcorelabs", "cdn77", "cdn network",
"medianova", "chinacache", "wangsu", "quantil",
"azion", "section.io", "beluga cdn",
]
EDUCATION_KEYWORDS = [
"university", "universit", "universite", "universidad", "universidade",
"universita", "universitaet", "univ ", "univ.", "univers",
"college", "academic", "academia", "research",
"education", "educational", "school", "institute",
"polytechnic", "hochschule", "fachhochschule",
"nren", "ren ", ".ren", "research network",
"wissenschaft", "forschung",
"campus", "student",
"ecole ", "école", "escuela", "escola",
"fakultat", "fakultet",
"技術", "大学", "学院", "研究", "科学",
"교육", "대학",
"กรม", "มหาวิทยาลัย",
]
GOVERNMENT_KEYWORDS = [
"government", "gouvernement", "gobierno", "governo", "regierung",
"gouv", "gov.", ".gov", "gob.",
"ministry", "ministere", "ministerio", "ministerium", "ministero",
"military", "army", "navy", "defense", "defence", "defensa",
"police", "polizei", "polizia",
"federal", "fedral", "estado", "etat",
"municipal", "prefecture", "region ",
"national agency", "state of ",
"bundeswehr", "armed forces",
"customs", "immigration",
"justic", "judiciary",
]
# Enterprise keywords are lowest priority — many ISPs have these suffixes
ENTERPRISE_KEYWORDS = [
"corporation", "corp.", "corporate",
" inc.", " inc,",
" ltd.", " ltd,",
"gmbh", "s.a.", "s.r.l",
" group", "holdings",
"financial", "banking", "insurance",
"pharmaceutical", "logistics",
"manufacturing", "industrial",
"automotive", "energy",
"retail ", "media group",
]
def _match_keywords(name_lower: str, keywords: list[str]) -> bool:
"""Check if any keyword matches in the lowered name."""
return any(kw in name_lower for kw in keywords)
def _classify_by_keywords(name_lower: str) -> str:
"""Classify by keyword heuristics. Returns label or empty string."""
if name_lower in ("not routed", "none", "", "-"):
return "unknown"
if _match_keywords(name_lower, CDN_KEYWORDS):
return "cdn"
if _match_keywords(name_lower, EDUCATION_KEYWORDS):
return "education"
if _match_keywords(name_lower, GOVERNMENT_KEYWORDS):
return "government"
if _match_keywords(name_lower, HOSTING_KEYWORDS):
return "hosting"
if _match_keywords(name_lower, DATACENTER_KEYWORDS):
return "datacenter"
if _match_keywords(name_lower, ISP_KEYWORDS):
return "isp"
if _match_keywords(name_lower, ENTERPRISE_KEYWORDS):
return "enterprise"
return ""
def classify_asn(asn_number: int, as_name: str,
peeringdb: dict[int, str]) -> str:
"""Classify an ASN into a reputation category.
Priority: hard-coded lists > PeeringDB > keyword heuristics.
"""
# 1. Hard-coded lookups (highest priority)
if asn_number in KNOWN_CDN_ASNS:
return "cdn"
if asn_number in KNOWN_HOSTING_ASNS:
return "hosting"
if asn_number in KNOWN_DATACENTER_ASNS:
return "datacenter"
if asn_number in KNOWN_EDUCATION_ASNS:
return "education"
if asn_number in KNOWN_GOVERNMENT_ASNS:
return "government"
if asn_number in KNOWN_ENTERPRISE_ASNS:
return "enterprise"
if asn_number in KNOWN_ISP_ASNS:
return "isp"
# 2. PeeringDB classification
if asn_number in peeringdb:
return peeringdb[asn_number]
# 3. Keyword heuristics on AS name
name_lower = as_name.lower()
label = _classify_by_keywords(name_lower)
return label if label else "unknown"
def ranges_to_cidrs(start_ip: str, end_ip: str):
"""Convert an IP range to a list of CIDR networks."""
try:
first = ipaddress.IPv4Address(start_ip)
last = ipaddress.IPv4Address(end_ip)
if first > last:
return []
return list(ipaddress.summarize_address_range(first, last))
except (ipaddress.AddressValueError, ValueError, TypeError):
return []
def download_iptoasn(output_dir: str) -> str:
"""Download and decompress ip2asn-v4.tsv.gz, return path to TSV."""
tsv_path = os.path.join(output_dir, CACHED_TSV)
print(f"[download] Fetching {IPTOASN_URL} ...")
try:
req = urllib.request.Request(IPTOASN_URL, headers={
"User-Agent": "generate_asn_data/2.0"
})
with urllib.request.urlopen(req, timeout=120) as resp:
compressed = resp.read()
print(f"[download] Downloaded {len(compressed):,} bytes compressed")
raw = gzip.decompress(compressed)
print(f"[download] Decompressed to {len(raw):,} bytes")
with open(tsv_path, "wb") as f:
f.write(raw)
print(f"[download] Saved to {tsv_path}")
except Exception as e:
print(f"[error] Download failed: {e}", file=sys.stderr)
if os.path.exists(tsv_path):
print(f"[info] Using cached {tsv_path}")
else:
print("[fatal] No cached TSV available, aborting.", file=sys.stderr)
sys.exit(1)
return tsv_path
def parse_tsv(tsv_path: str):
"""
Parse ip2asn-v4.tsv.
Format: range_start \\t range_end \\t AS_number \\t country_code \\t AS_description
Returns list of (start_ip, end_ip, asn, country, as_name) tuples.
"""
entries = []
with open(tsv_path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
parts = line.split("\t")
if len(parts) < 5:
continue
start_ip, end_ip, asn_str, country, as_name = (
parts[0], parts[1], parts[2], parts[3], parts[4]
)
try:
asn = int(asn_str)
except ValueError:
continue
if asn == 0:
continue
entries.append((start_ip, end_ip, asn, country, as_name))
return entries
def main():
parser = argparse.ArgumentParser(
description="Generate ASN reputation and IP-to-ASN CSVs from iptoasn.com + PeeringDB"
)
parser.add_argument(
"--output-dir", default=None,
help="Directory for output CSV files (writes asn_reputation.csv and iplocate-ip-to-asn.csv)"
)
parser.add_argument(
"--output-asn", default=None,
help="Explicit path for asn_reputation.csv output"
)
parser.add_argument(
"--output-ipasn", default=None,
help="Explicit path for iplocate-ip-to-asn.csv output"
)
parser.add_argument(
"--no-download", action="store_true",
help="Skip iptoasn.com download, use cached ip2asn-v4.tsv"
)
parser.add_argument(
"--no-peeringdb", action="store_true",
help="Skip PeeringDB download, classify using keyword heuristics only"
)
args = parser.parse_args()
# Resolve output paths — support both --output-dir and --output-asn/--output-ipasn
if args.output_asn and args.output_ipasn:
rep_path = args.output_asn
ip_path = args.output_ipasn
work_dir = os.path.dirname(rep_path) or "."
elif args.output_dir:
work_dir = args.output_dir
rep_path = os.path.join(work_dir, "asn_reputation.csv")
ip_path = os.path.join(work_dir, "iplocate-ip-to-asn.csv")
else:
work_dir = "."
rep_path = os.path.join(work_dir, "asn_reputation.csv")
ip_path = os.path.join(work_dir, "iplocate-ip-to-asn.csv")
os.makedirs(work_dir, exist_ok=True)
# Step 1: Get IP-to-ASN TSV data
tsv_path = os.path.join(work_dir, CACHED_TSV)
if not args.no_download:
tsv_path = download_iptoasn(work_dir)
elif not os.path.exists(tsv_path):
print(f"[fatal] --no-download specified but {tsv_path} not found",
file=sys.stderr)
sys.exit(1)
# Step 2: Get PeeringDB data
peeringdb: dict[int, str] = {}
if not args.no_peeringdb:
peeringdb = download_peeringdb(work_dir)
# Step 3: Parse TSV
print("[parse] Reading TSV ...")
entries = parse_tsv(tsv_path)
print(f"[parse] {len(entries):,} IP range entries")
# Step 4: Build ASN info map {asn: (country, as_name)}
asn_info: dict[int, tuple[str, str]] = {}
for _, _, asn, country, as_name in entries:
if asn not in asn_info:
asn_info[asn] = (country, as_name)
# Step 5: Classify all ASNs
asn_labels: dict[int, str] = {}
for asn, (country, as_name) in asn_info.items():
asn_labels[asn] = classify_asn(asn, as_name, peeringdb)
# Include ASNs that are in PeeringDB but not in iptoasn (they have no
# IP ranges but still get a label in asn_reputation.csv)
for asn, label in peeringdb.items():
if asn not in asn_labels:
asn_labels[asn] = label
# Step 6: Write asn_reputation.csv (sorted by ASN, no header)
sorted_asns = sorted(asn_labels.keys())
os.makedirs(os.path.dirname(rep_path) or ".", exist_ok=True)
with open(rep_path, "w") as f:
for asn in sorted_asns:
f.write(f"{asn},{asn_labels[asn]}\n")
print(f"[output] Wrote {len(sorted_asns):,} ASNs to {rep_path}")
# Step 7: Convert ranges to CIDRs and write iplocate-ip-to-asn.csv
seen_cidrs: set[str] = set()
cidr_rows: list[tuple[ipaddress.IPv4Network, int, str, str]] = []
print("[cidr] Converting IP ranges to CIDR notation ...")
for start_ip, end_ip, asn, country, as_name in entries:
cidrs = ranges_to_cidrs(start_ip, end_ip)
for cidr in cidrs:
key = f"{cidr},{asn}"
if key not in seen_cidrs:
seen_cidrs.add(key)
cidr_rows.append((cidr, asn, country, as_name))
cidr_rows.sort(key=lambda r: (r[0].network_address, r[0].prefixlen))
os.makedirs(os.path.dirname(ip_path) or ".", exist_ok=True)
with open(ip_path, "w") as f:
for cidr, asn, country, as_name in cidr_rows:
safe_name = as_name.replace(",", " ")
f.write(f"{cidr},{asn},{country},{safe_name}\n")
cidr_count = len(cidr_rows)
print(f"[output] Wrote {cidr_count:,} CIDRs to {ip_path}")
# Step 8: Stats
stats: dict[str, int] = {}
for label in asn_labels.values():
stats[label] = stats.get(label, 0) + 1
all_labels = ["isp", "datacenter", "hosting", "cdn", "enterprise",
"education", "government", "unknown"]
print("\n=== Summary ===")
print(f"Total ASNs: {len(sorted_asns):,}")
for label in all_labels:
count = stats.get(label, 0)
pct = 100.0 * count / len(sorted_asns) if sorted_asns else 0
print(f" {label:12s}: {count:>6,} ({pct:5.1f}%)")
print(f"Total CIDRs: {cidr_count:,}")
# Warn if unknown is still too high
unk_pct = 100.0 * stats.get("unknown", 0) / len(sorted_asns) if sorted_asns else 0
if unk_pct > 50:
print(f"\n[warning] {unk_pct:.1f}% of ASNs are still 'unknown'.", file=sys.stderr)
if not peeringdb:
print("[hint] Try running without --no-peeringdb to use PeeringDB data.",
file=sys.stderr)
if __name__ == "__main__":
main()