Files
lidar_rendu/lidar_pipeline/ign.py
Jacquin Antoine f988ddb76d Auto-retry IGN tiles at lower zoom on 404
When zoom 20 tiles are unavailable (rural areas), fall back to zoom 19, 18,
etc. down to 15. Breaks out immediately on first-tile 404 to avoid wasting
requests at unsupported zoom levels.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-14 02:21:47 +02:00

282 lines
10 KiB
Python

"""IGN (French National Geographic Institute) tile download and overlay generation.
Downloads WMTS tiles from the IGN Geoportail and composites them into
GeoTIFF overlays matching the LiDAR DTM extent.
"""
import logging
import math
import time
from pathlib import Path
import numpy as np
import rasterio
try:
from rasterio.warp import transform as warp_transform
HAS_WARP = True
except ImportError:
HAS_WARP = False
logger = logging.getLogger("lidar")
def _optimal_zoom_level(resolution, lat, layer):
"""Compute optimal WMTS zoom level for a given ground resolution.
Chooses the zoom level where the WMTS ground resolution is at least
as fine as the requested resolution, capped to avoid downloading
too many tiles.
Args:
resolution: Desired ground resolution in m/px (DTM resolution).
lat: Latitude in degrees (for Web Mercator resolution calculation).
layer: IGN WMTS layer name (different max zoom per layer).
Returns:
Integer zoom level.
"""
# Web Mercator ground resolution: m/px = (2π * R * cos(lat)) / (256 * 2^z)
# Solve for z: z = log2(2π * R * cos(lat) / (256 * resolution))
R = 6378137.0 # WGS84 equatorial radius
lat_rad = math.radians(lat)
numerator = 2 * math.pi * R * math.cos(lat_rad)
z_exact = math.log2(numerator / (256 * resolution))
zoom = max(15, min(20, int(math.ceil(z_exact))))
# Cap zoom per layer: orthophoto supports up to 20, topographic map up to 19
if 'ORTHO' in layer:
zoom = min(zoom, 20)
elif 'PLAN' in layer:
zoom = min(zoom, 19)
logger.debug(f" Zoom WMTS: {zoom} (résolution cible: {resolution}m/px)")
return zoom
def _lat_lon_to_tile(lat, lon, zoom):
"""Convert lat/lon to Web Mercator tile coordinates."""
n = 2 ** zoom
col = int((lon + 180) / 360 * n)
lat_rad = math.radians(lat)
row = int((1 - math.log(math.tan(lat_rad) + 1 / math.cos(lat_rad)) / math.pi) / 2 * n)
return col, row
def _lat_lon_to_px(lat, lon, zoom, tile_size=256):
"""Convert lat/lon to Web Mercator pixel coordinates."""
n = 2 ** zoom
px_x = (lon + 180) / 360 * n * tile_size
lat_rad = math.radians(lat)
px_y = (1 - math.log(math.tan(lat_rad) + 1 / math.cos(lat_rad)) / math.pi) / 2 * n * tile_size
return px_x, px_y
def download_ign_tiles(min_x, max_x, min_y, max_y, layer, zoom_level=15):
"""Download IGN WMTS tiles for the given bounds using Web Mercator (PM).
If the first tile returns 404, automatically retries at lower zoom levels.
Args:
min_x, max_x, min_y, max_y: Bounds in Lambert 93.
layer: IGN WMTS layer name.
zoom_level: WMTS zoom level (default 15).
Returns:
numpy array (H, W, 3) uint8, or None on failure.
"""
import urllib.request
import io
from PIL import Image as PILImage
if not HAS_WARP:
logger.error(" ✗ rasterio.warp non disponible pour conversion de coordonnées")
return None
try:
l93_xs = [min_x, max_x]
l93_ys = [max_y, min_y]
lons, lats = warp_transform('EPSG:2154', 'EPSG:4326', l93_xs, l93_ys)
nw_lat, nw_lon = lats[0], lons[0]
se_lat, se_lon = lats[1], lons[1]
except Exception:
logger.error(" ✗ Impossible de convertir les coordonnées")
return None
wmts_url = "https://data.geopf.fr/wmts"
tile_matrix_set = "PM"
tile_size = 256
# Try downloading at the requested zoom level; fall back to lower zooms on 404
min_zoom = 15
for zoom in range(zoom_level, min_zoom - 1, -1):
col_min, row_min = _lat_lon_to_tile(nw_lat, nw_lon, zoom)
col_max, row_max = _lat_lon_to_tile(se_lat, se_lon, zoom)
nw_px_x, nw_px_y = _lat_lon_to_px(nw_lat, nw_lon, zoom)
se_px_x, se_px_y = _lat_lon_to_px(se_lat, se_lon, zoom)
out_width = int(se_px_x - nw_px_x)
out_height = int(se_px_y - nw_px_y)
if out_width <= 0 or out_height <= 0 or out_width > 10000 or out_height > 10000:
logger.warning(f" Image IGN trop grande: {out_width}x{out_height}px — zoom {zoom} abandon")
continue
total_tiles = (col_max - col_min + 1) * (row_max - row_min + 1)
if zoom < zoom_level:
logger.info(f" ↓ Retry zoom {zoom_level}{zoom}: {total_tiles} tuiles ({out_width}x{out_height}px)")
else:
logger.info(f" Zoom {zoom}: {total_tiles} tuiles à télécharger ({out_width}x{out_height}px)")
composite = np.full((out_height, out_width, 3), 255, dtype=np.uint8)
tiles_downloaded = 0
tiles_404 = 0
fmt = "image/png" if 'PLAN' in layer else "image/jpeg"
for col in range(col_min, col_max + 1):
for row in range(row_min, row_max + 1):
url = (
f"{wmts_url}?SERVICE=WMTS&VERSION=1.0.0&REQUEST=GetTile"
f"&LAYER={layer}&STYLE=normal"
f"&TILEMATRIXSET={tile_matrix_set}"
f"&TILEMATRIX={zoom}&TILECOL={col}&TILEROW={row}"
f"&FORMAT={fmt}"
)
try:
req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
with urllib.request.urlopen(req, timeout=10) as response:
tile_data = response.read()
tile_img = PILImage.open(io.BytesIO(tile_data)).convert('RGB')
tile_arr = np.array(tile_img)
tile_origin_x = col * tile_size
tile_origin_y = row * tile_size
px_x = int(tile_origin_x - nw_px_x)
px_y = int(tile_origin_y - nw_px_y)
x_off = max(0, -px_x)
y_off = max(0, -px_y)
dst_x_start = max(0, px_x)
dst_y_start = max(0, px_y)
dst_x_end = min(out_width, px_x + tile_size)
dst_y_end = min(out_height, px_y + tile_size)
src_x = x_off
src_y = y_off
src_w = dst_x_end - dst_x_start
src_h = dst_y_end - dst_y_start
if src_w > 0 and src_h > 0 and tile_arr.shape[0] >= src_y + src_h and tile_arr.shape[1] >= src_x + src_w:
composite[dst_y_start:dst_y_end, dst_x_start:dst_x_end] = \
tile_arr[src_y:src_y+src_h, src_x:src_x+src_w]
tiles_downloaded += 1
except urllib.error.HTTPError as e:
if e.code == 404:
tiles_404 += 1
# If the very first tile is 404, this zoom level is unavailable
if col == col_min and row == row_min:
logger.info(f" Zoom {zoom} non disponible (404) — essai zoom inférieur")
break
continue
except Exception:
continue
else:
continue
# Only reach here if inner loop broke (first tile 404)
break
if tiles_404 > 0 and tiles_downloaded == 0:
# No tiles at this zoom, try lower
continue
logger.info(f"{tiles_downloaded} tuiles IGN téléchargées ({layer})")
if tiles_downloaded == 0:
continue
return composite
logger.error(" ✗ Aucun zoom disponible pour cette zone")
return None
def generate_ign_overlay(dem_file, basename, vis_dir, resolution, layer, title, legend_label, description, out_suffix):
"""Generate an IGN basemap overlay visualization.
Args:
dem_file: Path to DTM GeoTIFF.
basename: Base name for output file.
vis_dir: Output directory for visualizations.
resolution: Grid resolution in m/px.
layer: IGN WMTS layer name.
title: Title for the overlay.
legend_label: Legend text.
description: Description text.
out_suffix: Suffix for output filename.
Returns:
Path to output GeoTIFF, or None on failure.
"""
logger.info(f"{title}...")
t0 = time.time()
output = vis_dir / f"{basename}_{out_suffix}.tif"
try:
with rasterio.open(dem_file) as src:
dem = src.read(1)
transform = src.transform
crs = src.crs
height, width = dem.shape
top_left_x = transform.c
top_left_y = transform.f
pixel_size_x = transform.a
pixel_size_y = abs(transform.e)
min_x = top_left_x
max_x = top_left_x + width * pixel_size_x
max_y = top_left_y
min_y = top_left_y - height * pixel_size_y
# Compute center latitude for zoom calculation
center_lat = (max_y + min_y) / 2
center_lon = (min_x + max_x) / 2
try:
clons, clats = warp_transform('EPSG:2154', 'EPSG:4326', [center_lon], [center_lat])
center_lat_wgs84 = clats[0]
except Exception:
center_lat_wgs84 = 46.0 # fallback: center of France
zoom = _optimal_zoom_level(resolution, center_lat_wgs84, layer)
result = download_ign_tiles(min_x, max_x, min_y, max_y, layer, zoom)
if result is None:
logger.error(" ✗ Impossible de télécharger les tuiles IGN")
return None
from PIL import Image as PILImage
ign_pil = PILImage.fromarray(result)
ign_resized = ign_pil.resize((width, height), PILImage.LANCZOS)
ign_arr = np.array(ign_resized)
with rasterio.open(
output, 'w',
driver='GTiff', height=height, width=width,
count=3, dtype='uint8',
crs=crs, transform=transform,
) as dst:
for i in range(3):
dst.write(ign_arr[:, :, i], i + 1)
logger.info(f"{title} terminé ({time.time()-t0:.1f}s)")
return output
except Exception as e:
logger.error(f" ✗ Erreur IGN overlay: {e}", exc_info=True)
return None