Refactor pipeline en modules + logging verbose/debug + options CLI

- Découpage du monolithe process_lidar.py (~2750 lignes) en package
  lidar_pipeline/ avec 9 modules (gpu, dtm, visualizations, ign,
  rendering, pipeline, cli, __init__, __main__)
- Logging configurable: -v (verbose avec timestamps) et --debug
  (détails internes fichier:ligne)
- Option --force pour régénérer tous les fichiers (par défaut skip
  les WebP existants)
- Option --file NOM pour traiter un seul fichier LAZ (tests rapides)
- ProcessPoolExecutor avec répertoires temporaires uniques par worker
- Suppression du code mort (geomorphons, hillshade_ne, nodata_mask)
- Aucun fichier TIFF résiduel après conversion WebP
- setup.py pour installation pip, stub process_lidar.py compatible

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jacquin Antoine
2026-05-10 00:15:29 +02:00
parent 54800cb516
commit f07e915f6d
14 changed files with 2544 additions and 2848 deletions

View File

@ -0,0 +1,21 @@
"""LiDAR Archaeological Pipeline — detection of archaeological structures from LiDAR data.
Usage:
python -m lidar_pipeline /path/to/input -o /path/to/output -r 0.5 -v
"""
# Lazy imports to avoid importing heavy dependencies at package import time
def __getattr__(name):
"""Lazy-load heavy submodules only when accessed."""
if name == 'LidarArchaeoPipeline':
from .pipeline import LidarArchaeoPipeline
return LidarArchaeoPipeline
if name == 'VIZ_STEPS':
from .pipeline import VIZ_STEPS
return VIZ_STEPS
if name == 'main':
from .cli import main
return main
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

View File

@ -0,0 +1,6 @@
"""Entry point for running the pipeline as: python -m lidar_pipeline"""
from .cli import main
if __name__ == "__main__":
main()

155
lidar_pipeline/cli.py Normal file
View File

@ -0,0 +1,155 @@
"""Command-line interface for the LiDAR archaeological pipeline.
Handles argument parsing, logging configuration, and entry point.
"""
import argparse
import logging
import sys
from .pipeline import LidarArchaeoPipeline
from .gpu import log_gpu_status
logger = logging.getLogger("lidar")
def setup_logging(verbose=False, debug=False):
"""Configure the 'lidar' logger.
Args:
verbose: If True, include timestamps and level names.
debug: If True, set level to DEBUG and add file:line info.
"""
if debug:
level = logging.DEBUG
fmt = "%(asctime)s.%(msecs)03d %(levelname)-5s [%(filename)s:%(lineno)d] %(message)s"
elif verbose:
level = logging.INFO
fmt = "%(asctime)s %(levelname)-5s %(message)s"
else:
level = logging.INFO
fmt = "%(message)s"
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter(fmt, datefmt="%H:%M:%S"))
logger.setLevel(level)
logger.handlers.clear()
logger.addHandler(handler)
return logger
def main():
"""Entry point for the LiDAR archaeological pipeline."""
parser = argparse.ArgumentParser(
description="Pipeline LiDAR pour détection archéologique",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""\
Exemples:
Traitement standard:
python -m lidar_pipeline /data/input -o /data/output
Haute résolution avec accélération GPU:
python -m lidar_pipeline /data/input -o /data/output -r 0.2 -g
Mode verbeux (timestamps):
python -m lidar_pipeline /data/input -o /data/output -v
Mode debug (détails internes):
python -m lidar_pipeline /data/input -o /data/output --debug
Forcer la régénération de tous les fichiers:
python -m lidar_pipeline /data/input -o /data/output --force
Traiter un seul fichier (pour tests):
python -m lidar_pipeline /data/input -o /data/output --file LHD_FXX_1000_6881_PTS_LAMB93_IGN69.copc.laz
Traitement parallèle (4 workers):
python -m lidar_pipeline /data/input -o /data/output -w 4
"""
)
parser.add_argument(
"input",
help="Dossier contenant les fichiers LAZ/LAS"
)
parser.add_argument(
"-o", "--output",
default="/data/output",
help="Dossier de sortie (défaut: /data/output)"
)
parser.add_argument(
"-r", "--resolution",
type=float,
default=0.5,
help="Résolution en mètres par pixel (défaut: 0.5)"
)
parser.add_argument(
"-w", "--workers",
type=int,
default=1,
help="Nombre de workers pour traitement parallèle (défaut: 1)"
)
parser.add_argument(
"-f", "--force",
action="store_true",
help="Régénérer tous les fichiers même si les WebP existent déjà"
)
parser.add_argument(
"--file",
type=str,
default=None,
help="Traiter un seul fichier LAZ/LAS (pour tests, par nom partiel ou complet)"
)
parser.add_argument(
"-v", "--verbose",
action="store_true",
help="Mode verbeux : affiche les timestamps et niveaux"
)
parser.add_argument(
"--debug",
action="store_true",
help="Mode debug : affiche les détails internes (fichier:ligne)"
)
args = parser.parse_args()
# Configure logging before any other output
setup_logging(verbose=args.verbose, debug=args.debug)
logger.info("=" * 60)
logger.info("Pipeline LiDAR Archéologique")
logger.info("=" * 60)
log_gpu_status()
try:
pipeline = LidarArchaeoPipeline(
input_dir=args.input,
output_dir=args.output,
resolution=args.resolution,
workers=args.workers,
force=args.force
)
# If --file is specified, process only that single file
if args.file:
from pathlib import Path
input_dir = Path(args.input)
# Find matching file
matches = list(input_dir.glob(f"*{args.file}*")) + list(input_dir.glob(f"*{args.file}*.laz")) + list(input_dir.glob(f"*{args.file}*.las"))
# Remove duplicates
matches = list(dict.fromkeys(matches))
if not matches:
logger.error(f"Aucun fichier trouvé pour: {args.file}")
sys.exit(1)
if len(matches) > 1:
logger.info(f"Plusieurs fichiers correspondent, utilisation du premier:")
for m in matches:
logger.info(f" {m.name}")
laz_file = matches[0]
logger.info(f"Traitement du fichier: {laz_file.name}")
pipeline.process_file(laz_file)
else:
pipeline.process_all()
except Exception as e:
logger.error(f"Erreur fatale: {e}", exc_info=True)
sys.exit(1)

209
lidar_pipeline/dtm.py Normal file
View File

@ -0,0 +1,209 @@
"""DTM generation from classified LiDAR point clouds.
Handles ground classification via PDAL/SMRF and DTM rasterisation
using scipy binned_statistic_2d with gap-filling interpolation.
"""
import json
import logging
import subprocess
from pathlib import Path
import numpy as np
import rasterio
from rasterio.transform import from_bounds
from scipy import ndimage as scipy_ndimage
from scipy.ndimage import distance_transform_edt, gaussian_filter
from scipy.stats import binned_statistic_2d
from scipy import interpolate
logger = logging.getLogger("lidar")
def create_smrf_pipeline(input_laz, output_las):
"""Create a PDAL pipeline JSON for SMRF ground classification.
Args:
input_laz: Path to input LAZ/LAS file.
output_las: Path to output classified LAS file.
Returns:
JSON string of the PDAL pipeline.
"""
pipeline = {
"pipeline": [
str(input_laz),
{
"type": "filters.smrf",
"ignore": "Classification[7:7]",
"slope": 1.0,
"window": 16.0,
"threshold": 0.5,
"scalar": 1.25
},
{
"type": "filters.range",
"limits": "Classification[2:2]"
},
{
"type": "writers.las",
"filename": str(output_las),
"extra_dims": "all"
}
]
}
return json.dumps(pipeline)
def classify_ground(laz_file, temp_dir):
"""Classify ground points using PDAL SMRF filter.
Args:
laz_file: Path to input LAZ/LAS file.
temp_dir: Directory for temporary files (pipeline.json, ground.las).
Returns:
Path to classified ground LAS file, or None on failure.
"""
import laspy # noqa: ensure available
output_las = temp_dir / f"{laz_file.stem}_ground.las"
if output_las.exists():
logger.info(" Classification déjà effectuée — fichier existant réutilisé")
return output_las
pipeline_json = create_smrf_pipeline(laz_file, output_las)
pipeline_file = temp_dir / "pipeline.json"
with open(pipeline_file, 'w') as f:
f.write(pipeline_json)
try:
subprocess.run(
["pdal", "pipeline", str(pipeline_file)],
capture_output=True, check=True
)
logger.info(" ✓ Classification sol terminée")
return output_las
except subprocess.CalledProcessError as e:
error_msg = e.stderr.decode()
logger.error(f" ✗ Erreur PDAL: {error_msg}")
# If error is about ReturnNumber=0, try filtering those points
if "ReturnNumber" in error_msg and "NumberOfReturns" in error_msg:
logger.info(" → Tentative de filtrage des points ReturnNumber=0...")
filtered_pipeline = [
{"type": "readers.las", "filename": str(laz_file)},
{"type": "filters.range", "limits": "ReturnNumber[1:],NumberOfReturns[1:]"},
{"type": "filters.smrf", "scalar": 1.25},
{"type": "filters.range", "limits": "Classification[2:2]"},
{"type": "writers.las", "filename": str(output_las), "extra_dims": "all"}
]
filtered_json = json.dumps(filtered_pipeline)
with open(pipeline_file, 'w') as f:
f.write(filtered_json)
try:
subprocess.run(
["pdal", "pipeline", str(pipeline_file)],
capture_output=True, check=True
)
logger.info(" ✓ Classification sol terminée (points filtrés)")
return output_las
except subprocess.CalledProcessError as e2:
logger.error(f" ✗ Erreur même avec filtrage: {e2.stderr.decode()}")
return None
return None
def create_dtm_fast(las_file, basename, dtm_dir, resolution):
"""Create DTM using fast binning method with gap filling.
Args:
las_file: Path to classified ground LAS file.
basename: Base name for output file.
dtm_dir: Directory for output DTM GeoTIFF.
resolution: Grid resolution in meters per pixel.
Returns:
Path to output DTM GeoTIFF, or None on failure.
"""
import laspy
logger.info(" → Génération DTM...")
try:
las = laspy.read(str(las_file))
min_x, max_x = float(las.header.min[0]), float(las.header.max[0])
min_y, max_y = float(las.header.min[1]), float(las.header.max[1])
width = int(np.ceil((max_x - min_x) / resolution))
height = int(np.ceil((max_y - min_y) / resolution))
logger.debug(f" Bounds: X[{min_x:.1f}, {max_x:.1f}] Y[{min_y:.1f}, {max_y:.1f}]")
logger.debug(f" Grid: {width}x{height} pixels ({len(las.points):,} points)")
logger.info(f" Rasterisation {width}x{height} ({len(las.points):,} points)...")
stat = binned_statistic_2d(
las.x, las.y, las.z,
statistic='mean',
bins=[width, height],
range=[[min_x, max_x], [min_y, max_y]]
)
dtm = stat.statistic.T
dtm = dtm[::-1, :] # Flip Y so north is at top
# Fill gaps using interpolation
mask = np.isnan(dtm)
if np.any(mask):
logger.info(" Remplissage des zones vides...")
dist_to_nearest = distance_transform_edt(mask)
max_fill_distance = max(20, int(10 / resolution))
y_coords, x_coords = np.where(~mask)
z_values = dtm[~mask]
if len(y_coords) > 100:
interp = interpolate.NearestNDInterpolator(
np.column_stack((y_coords, x_coords)),
z_values
)
y_missing, x_missing = np.where(mask)
dtm[y_missing, x_missing] = interp(y_missing, x_missing)
dtm_smooth = gaussian_filter(dtm, sigma=2.0)
fill_mask = mask & (dist_to_nearest <= max_fill_distance)
dtm[fill_mask] = dtm_smooth[fill_mask]
far_mask = mask & (dist_to_nearest > max_fill_distance)
dtm[far_mask] = np.nan
# Save as GeoTIFF
output_tif = dtm_dir / f"{basename}_dtm.tif"
transform = from_bounds(min_x, min_y, max_x, max_y, width, height)
with rasterio.open(
output_tif, 'w',
driver='GTiff', height=height, width=width,
count=1, dtype='float32',
crs='EPSG:2154', transform=transform,
compress='lzw'
) as dst:
dst.write(dtm.astype('float32'), 1)
logger.info(f" ✓ DTM créé: {output_tif.name}")
return output_tif
except Exception as e:
logger.error(f" ✗ Erreur DTM: {e}", exc_info=True)
return None

73
lidar_pipeline/gpu.py Normal file
View File

@ -0,0 +1,73 @@
"""GPU acceleration helpers for LiDAR pipeline.
Provides CuPy/numpy abstraction layer. If CuPy is available and a CUDA GPU
is detected, array operations are accelerated on the GPU. Otherwise, all
operations fall back to numpy/scipy on CPU.
"""
import logging
import numpy as np
from scipy import ndimage
logger = logging.getLogger("lidar")
# GPU detection - must happen at import time
HAS_GPU = False
_gpu_name = None
_gpu_mem_gb = 0
_xp = np # Default: CPU
try:
import cupy as cp
import cupyx.scipy.ndimage as cp_ndimage
_gpu_info = cp.cuda.runtime.getDeviceProperties(0)
_gpu_name = _gpu_info['name'].decode() if isinstance(_gpu_info['name'], bytes) else str(_gpu_info['name'])
_gpu_mem_gb = _gpu_info['totalGlobalMem'] // (1024 ** 3)
HAS_GPU = True
_xp = cp
except (ImportError, Exception):
pass
def log_gpu_status():
"""Log GPU detection result. Called after logging is configured."""
if HAS_GPU:
logger.info(f"GPU détectée: {_gpu_name} ({_gpu_mem_gb} Go VRAM)")
else:
logger.info("Pas de GPU — mode CPU uniquement")
def to_gpu(arr):
"""Send array to GPU if available, otherwise return as float64 numpy."""
if HAS_GPU:
return cp.asarray(arr.astype(np.float64))
return arr.astype(np.float64)
def to_cpu(arr):
"""Bring array back to CPU (numpy). No-op if already on CPU."""
if HAS_GPU and isinstance(arr, cp.ndarray):
return cp.asnumpy(arr)
return arr
def xp_gaussian_filter(arr, sigma):
"""Gaussian filter — uses GPU if array is on GPU, CPU otherwise."""
if HAS_GPU and isinstance(arr, cp.ndarray):
return cp_ndimage.gaussian_filter(arr, sigma)
return ndimage.gaussian_filter(arr, sigma)
def xp_uniform_filter(arr, size):
"""Uniform filter — uses GPU if array is on GPU, CPU otherwise."""
if HAS_GPU and isinstance(arr, cp.ndarray):
return cp_ndimage.uniform_filter(arr, size)
return ndimage.uniform_filter(arr, size)
def xp_minimum_filter(arr, footprint=None, size=None):
"""Minimum filter — uses GPU if array is on GPU, CPU otherwise."""
if HAS_GPU and isinstance(arr, cp.ndarray):
return cp_ndimage.minimum_filter(arr, footprint=footprint, size=size)
return ndimage.minimum_filter(arr, footprint=footprint, size=size)

209
lidar_pipeline/ign.py Normal file
View File

@ -0,0 +1,209 @@
"""IGN (French National Geographic Institute) tile download and overlay generation.
Downloads WMTS tiles from the IGN Geoportail and composites them into
GeoTIFF overlays matching the LiDAR DTM extent.
"""
import logging
import math
import time
from pathlib import Path
import numpy as np
import rasterio
try:
from rasterio.warp import transform as warp_transform
HAS_WARP = True
except ImportError:
HAS_WARP = False
logger = logging.getLogger("lidar")
def _lat_lon_to_tile(lat, lon, zoom):
"""Convert lat/lon to Web Mercator tile coordinates."""
n = 2 ** zoom
col = int((lon + 180) / 360 * n)
lat_rad = math.radians(lat)
row = int((1 - math.log(math.tan(lat_rad) + 1 / math.cos(lat_rad)) / math.pi) / 2 * n)
return col, row
def _lat_lon_to_px(lat, lon, zoom, tile_size=256):
"""Convert lat/lon to Web Mercator pixel coordinates."""
n = 2 ** zoom
px_x = (lon + 180) / 360 * n * tile_size
lat_rad = math.radians(lat)
px_y = (1 - math.log(math.tan(lat_rad) + 1 / math.cos(lat_rad)) / math.pi) / 2 * n * tile_size
return px_x, px_y
def download_ign_tiles(min_x, max_x, min_y, max_y, layer, zoom_level=15):
"""Download IGN WMTS tiles for the given bounds using Web Mercator (PM).
Args:
min_x, max_x, min_y, max_y: Bounds in Lambert 93.
layer: IGN WMTS layer name.
zoom_level: WMTS zoom level (default 15).
Returns:
numpy array (H, W, 3) uint8, or None on failure.
"""
import urllib.request
import io
from PIL import Image as PILImage
if not HAS_WARP:
logger.error(" ✗ rasterio.warp non disponible pour conversion de coordonnées")
return None
try:
l93_xs = [min_x, max_x]
l93_ys = [max_y, min_y]
lons, lats = warp_transform('EPSG:2154', 'EPSG:4326', l93_xs, l93_ys)
nw_lat, nw_lon = lats[0], lons[0]
se_lat, se_lon = lats[1], lons[1]
except Exception:
logger.error(" ✗ Impossible de convertir les coordonnées")
return None
wmts_url = "https://data.geopf.fr/wmts"
tile_matrix_set = "PM"
tile_size = 256
col_min, row_min = _lat_lon_to_tile(nw_lat, nw_lon, zoom_level)
col_max, row_max = _lat_lon_to_tile(se_lat, se_lon, zoom_level)
nw_px_x, nw_px_y = _lat_lon_to_px(nw_lat, nw_lon, zoom_level)
se_px_x, se_px_y = _lat_lon_to_px(se_lat, se_lon, zoom_level)
out_width = int(se_px_x - nw_px_x)
out_height = int(se_px_y - nw_px_y)
if out_width <= 0 or out_height <= 0 or out_width > 5000 or out_height > 5000:
return None
composite = np.full((out_height, out_width, 3), 255, dtype=np.uint8)
tiles_downloaded = 0
fmt = "image/png" if 'PLAN' in layer else "image/jpeg"
for col in range(col_min, col_max + 1):
for row in range(row_min, row_max + 1):
url = (
f"{wmts_url}?SERVICE=WMTS&VERSION=1.0.0&REQUEST=GetTile"
f"&LAYER={layer}&STYLE=normal"
f"&TILEMATRIXSET={tile_matrix_set}"
f"&TILEMATRIX={zoom_level}&TILECOL={col}&TILEROW={row}"
f"&FORMAT={fmt}"
)
try:
req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
with urllib.request.urlopen(req, timeout=10) as response:
tile_data = response.read()
tile_img = PILImage.open(io.BytesIO(tile_data)).convert('RGB')
tile_arr = np.array(tile_img)
tile_origin_x = col * tile_size
tile_origin_y = row * tile_size
px_x = int(tile_origin_x - nw_px_x)
px_y = int(tile_origin_y - nw_px_y)
x_off = max(0, -px_x)
y_off = max(0, -px_y)
dst_x_start = max(0, px_x)
dst_y_start = max(0, px_y)
dst_x_end = min(out_width, px_x + tile_size)
dst_y_end = min(out_height, px_y + tile_size)
src_x = x_off
src_y = y_off
src_w = dst_x_end - dst_x_start
src_h = dst_y_end - dst_y_start
if src_w > 0 and src_h > 0 and tile_arr.shape[0] >= src_y + src_h and tile_arr.shape[1] >= src_x + src_w:
composite[dst_y_start:dst_y_end, dst_x_start:dst_x_end] = \
tile_arr[src_y:src_y+src_h, src_x:src_x+src_w]
tiles_downloaded += 1
except Exception as e:
if tiles_downloaded == 0 and col == col_min and row == row_min:
logger.error(f" ✗ Erreur tuile IGN: {e}")
continue
logger.info(f"{tiles_downloaded} tuiles IGN téléchargées ({layer})")
if tiles_downloaded == 0:
return None
return composite
def generate_ign_overlay(dem_file, basename, vis_dir, resolution, layer, title, legend_label, description, out_suffix):
"""Generate an IGN basemap overlay visualization.
Args:
dem_file: Path to DTM GeoTIFF.
basename: Base name for output file.
vis_dir: Output directory for visualizations.
resolution: Grid resolution in m/px.
layer: IGN WMTS layer name.
title: Title for the overlay.
legend_label: Legend text.
description: Description text.
out_suffix: Suffix for output filename.
Returns:
Path to output GeoTIFF, or None on failure.
"""
logger.info(f"{title}...")
t0 = time.time()
output = vis_dir / f"{basename}_{out_suffix}.tif"
try:
with rasterio.open(dem_file) as src:
dem = src.read(1)
transform = src.transform
crs = src.crs
height, width = dem.shape
top_left_x = transform.c
top_left_y = transform.f
pixel_size_x = transform.a
pixel_size_y = abs(transform.e)
min_x = top_left_x
max_x = top_left_x + width * pixel_size_x
max_y = top_left_y
min_y = top_left_y - height * pixel_size_y
zoom = 15
result = download_ign_tiles(min_x, max_x, min_y, max_y, layer, zoom)
if result is None:
logger.error(" ✗ Impossible de télécharger les tuiles IGN")
return None
from PIL import Image as PILImage
ign_pil = PILImage.fromarray(result)
ign_resized = ign_pil.resize((width, height), PILImage.LANCZOS)
ign_arr = np.array(ign_resized)
with rasterio.open(
output, 'w',
driver='GTiff', height=height, width=width,
count=3, dtype='uint8',
crs=crs, transform=transform,
) as dst:
for i in range(3):
dst.write(ign_arr[:, :, i], i + 1)
logger.info(f"{title} terminé ({time.time()-t0:.1f}s)")
return output
except Exception as e:
logger.error(f" ✗ Erreur IGN overlay: {e}", exc_info=True)
return None

317
lidar_pipeline/pipeline.py Normal file
View File

@ -0,0 +1,317 @@
"""Pipeline orchestration for LiDAR archaeological analysis.
LidarArchaeoPipeline coordinates the full processing chain:
1. Ground classification (PDAL/SMRF)
2. DTM generation
3. Visualization generation (19 products)
4. Rendering (WebP + PDF report)
"""
import logging
import shutil
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
import subprocess
from .dtm import classify_ground, create_dtm_fast
from .visualizations import (
generate_hillshade, generate_slope, generate_aspect, generate_curvature,
generate_solar, generate_lrm, generate_svf, generate_openness,
generate_mslrm, generate_tpi, generate_depressions, generate_sailore,
generate_roughness, generate_anomalies, generate_wavelet, generate_texture,
generate_flow,
)
from .ign import generate_ign_overlay
from .rendering import tif_to_png, generate_pdf_report
logger = logging.getLogger("lidar")
# Ordered list of visualization steps.
# Each entry: (name, function_or_lambda)
# Adding a new visualization = add a generate_* function + register here.
VIZ_STEPS = [
('hillshade', generate_hillshade),
('slope', generate_slope),
('aspect', generate_aspect),
('curvature', generate_curvature),
('solar', generate_solar),
('svf', generate_svf),
('lrm', generate_lrm),
('pos_open', lambda d, b, v, r: generate_openness(d, b, v, r, positive=True)),
('neg_open', lambda d, b, v, r: generate_openness(d, b, v, r, positive=False)),
('mslrm', generate_mslrm),
('tpi', generate_tpi),
('depressions', generate_depressions),
('sailore', generate_sailore),
('roughness', generate_roughness),
('anomalies', generate_anomalies),
('wavelet', generate_wavelet),
('texture', generate_texture),
('flow', generate_flow),
('ortho', lambda d, b, v, r: generate_ign_overlay(
d, b, v, r,
layer='ORTHOIMAGERY.ORTHOPHOTOS',
title='Photographie Aérienne IGN',
legend_label='Orthophotographie\nImage aérienne',
description='Photographie aérienne IGN (Orthophoto)',
out_suffix='ortho')),
('topo', lambda d, b, v, r: generate_ign_overlay(
d, b, v, r,
layer='GEOGRAPHICALGRIDSYSTEMS.PLANIGNV2',
title='Carte Topographique IGN',
legend_label='Carte IGN\nPlan topographique',
description='Carte topographique IGN (Plan IGN)',
out_suffix='topo')),
]
class LidarArchaeoPipeline:
"""Orchestrates the LiDAR archaeological analysis pipeline."""
def __init__(self, input_dir, output_dir, resolution=0.5, workers=1, force=False):
self.input_dir = Path(input_dir)
self.output_dir = Path(output_dir)
self.resolution = resolution
self.workers = workers
self.force = force
self.temp_dir = self.output_dir / "temp"
if not self.input_dir.exists():
raise ValueError(f"Répertoire introuvable: {self.input_dir}")
self.output_dir.mkdir(parents=True, exist_ok=True)
self.temp_dir.mkdir(exist_ok=True)
self.dtm_dir = self.output_dir / "DTM"
self.vis_dir = self.output_dir / "visualisations"
self.pdf_dir = self.output_dir / "rapports"
for d in [self.dtm_dir, self.vis_dir, self.pdf_dir]:
d.mkdir(exist_ok=True)
logger.info("Pipeline initialisé")
logger.info(f" Entrée : {self.input_dir}")
logger.info(f" Sortie : {self.output_dir}")
logger.info(f" Résolution : {resolution}m/px")
logger.info(f" Workers : {workers}")
logger.info(f" Force : {'OUI' if self.force else 'non (skip existing)'}")
def find_laz_files(self):
"""Find all LAZ/LAS files in input directory."""
files = list(self.input_dir.glob("*.laz")) + list(self.input_dir.glob("*.las"))
logger.info(f"{len(files)} fichier(s) LiDAR trouvé(s)")
for f in sorted(files):
logger.debug(f" {f.name}")
return sorted(files)
def check_tools(self):
"""Check that required external tools are available."""
for name, cmd in [('pdal', 'pdal --version'), ('gdal', 'gdalinfo --version')]:
try:
result = subprocess.run(cmd.split(), capture_output=True, check=True, text=True)
version = result.stdout.strip().split('\n')[0]
logger.info(f"{name}: {version}")
except (subprocess.CalledProcessError, FileNotFoundError):
logger.error(f"{name} non disponible")
return False
return True
def generate_all_visualizations(self, dtm_file, basename):
"""Generate all archaeological visualizations for one DTM file.
Returns a dict of {name: tif_path} for successful generations.
"""
logger.info(" Génération visualisations:")
# Create per-file subdirectory
file_vis_dir = self.vis_dir / basename
file_vis_dir.mkdir(exist_ok=True)
vis_results = {}
total = len(VIZ_STEPS)
for idx, (name, func) in enumerate(VIZ_STEPS, 1):
# Check if output WebP already exists (skip unless --force)
if not self.force:
# Determine expected WebP filename from the viz name
# Special cases for openness and IGN overlays
if name == 'pos_open':
expected_webp = file_vis_dir / f"{basename}_positive_openness.webp"
elif name == 'neg_open':
expected_webp = file_vis_dir / f"{basename}_negative_openness.webp"
elif name == 'hillshade':
expected_webp = file_vis_dir / f"{basename}_hillshade_multi.webp"
elif name in ('ortho', 'topo'):
expected_webp = file_vis_dir / f"{basename}_{name}.webp"
else:
expected_webp = file_vis_dir / f"{basename}_{name}.webp"
if expected_webp.exists():
logger.info(f" [{idx}/{total}] {name}: déjà existant, ignoré")
vis_results[name] = expected_webp # Track as existing file
continue
logger.info(f" [{idx}/{total}] {name}...")
t0 = time.time()
try:
result = func(dtm_file, basename, file_vis_dir, self.resolution)
vis_results[name] = result
elapsed = time.time() - t0
if result:
logger.info(f" [{idx}/{total}] ✓ {name} ({elapsed:.1f}s)")
else:
logger.warning(f" [{idx}/{total}] ✗ {name} — no output ({elapsed:.1f}s)")
except Exception as e:
vis_results[name] = None
logger.error(f" [{idx}/{total}] ✗ {name}: {e}", exc_info=True)
# Convert to WebP (only newly generated TIFs, not skipped ones)
logger.info(" Conversion images WebP:")
for name, tif_file in vis_results.items():
if tif_file and isinstance(tif_file, Path) and tif_file.suffix == '.tif' and tif_file.exists():
webp_file = tif_to_png(tif_file, file_vis_dir, self.resolution)
if webp_file:
logger.info(f"{webp_file.name}")
return vis_results
def process_file(self, laz_file):
"""Process a single LAZ file through the full pipeline."""
basename = laz_file.stem
t_start = time.time()
logger.info("=" * 60)
logger.info(f"FICHIER : {basename}")
logger.info("=" * 60)
# Step 1: Ground classification
logger.info("[1/4] Classification du sol...")
t1 = time.time()
las_file = classify_ground(laz_file, self.temp_dir)
t_classif = time.time() - t1
if not las_file:
logger.error(f" ✗ Échec classification ({t_classif:.1f}s)")
return False
logger.info(f" ✓ Classification terminée ({t_classif:.1f}s)")
# Step 2: Generate DTM
logger.info("[2/4] Génération DTM...")
t2 = time.time()
dtm_file = create_dtm_fast(las_file, basename, self.dtm_dir, self.resolution)
t_dtm = time.time() - t2
if not dtm_file:
logger.error(f" ✗ Échec DTM ({t_dtm:.1f}s)")
return False
logger.info(f" ✓ DTM terminé ({t_dtm:.1f}s)")
# Step 3: Visualizations
logger.info("[3/4] Visualisations archéologiques...")
self.generate_all_visualizations(dtm_file, basename)
# Step 4: PDF report
file_vis_dir = self.vis_dir / basename
logger.info("[4/4] Rapport PDF A3...")
t4 = time.time()
generate_pdf_report(basename, file_vis_dir, self.pdf_dir, self.resolution)
t_pdf = time.time() - t4
logger.info(f" ✓ Rapport PDF terminé ({t_pdf:.1f}s)")
t_total = time.time() - t_start
logger.info(f"{basename} terminé en {t_total:.1f}s")
logger.debug(f" Détails: classification={t_classif:.1f}s, DTM={t_dtm:.1f}s, PDF={t_pdf:.1f}s")
return True
def process_all(self):
"""Process all LAZ files in input directory."""
files = self.find_laz_files()
if not files:
logger.error("Aucun fichier LAZ/LAS trouvé !")
return
logger.info("=" * 60)
logger.info("PIPELINE ARCHÉOLOGIQUE LiDAR")
logger.info("=" * 60)
logger.info("Vérification des outils...")
if not self.check_tools():
logger.error("Outils manquants — abandon")
return
results = {}
t_pipeline_start = time.time()
if self.workers > 1 and len(files) > 1:
logger.info(f"Traitement parallèle avec {self.workers} workers...")
logger.info(f"Fichiers: {len(files)}")
with ProcessPoolExecutor(max_workers=self.workers) as executor:
future_to_file = {
executor.submit(_process_file_standalone, str(laz_file), str(self.input_dir), str(self.output_dir), self.resolution, self.force): laz_file
for laz_file in files
}
for idx, future in enumerate(as_completed(future_to_file), 1):
laz_file = future_to_file[future]
try:
success = future.result()
results[laz_file.name] = success
status = "" if success else ""
logger.info(f" [{idx}/{len(files)}] {status} {laz_file.name}")
except Exception as e:
logger.error(f" [{idx}/{len(files)}] ✗ {laz_file.name}: {e}")
logger.debug(f" Traceback:", exc_info=True)
results[laz_file.name] = False
else:
total = len(files)
for idx, laz_file in enumerate(files, 1):
logger.info(f"--- Fichier {idx}/{total} ---")
try:
results[laz_file.name] = self.process_file(laz_file)
except Exception as e:
logger.error(f"✗ Erreur traitement {laz_file.name}: {e}")
logger.debug("Traceback:", exc_info=True)
results[laz_file.name] = False
# Summary
t_pipeline_total = time.time() - t_pipeline_start
success_count = sum(1 for v in results.values() if v)
fail_count = sum(1 for v in results.values() if not v)
logger.info("=" * 60)
logger.info("RÉSUMÉ")
logger.info("=" * 60)
logger.info(f" Succès : {success_count}/{len(results)}")
if fail_count:
logger.info(f" Échecs : {fail_count}/{len(results)}")
for name, ok in results.items():
if not ok:
logger.info(f"{name}")
logger.info(f" Durée totale : {t_pipeline_total:.1f}s ({t_pipeline_total/60:.1f}min)")
logger.info(f"\nRésultats dans: {self.output_dir}")
logger.info(f" • DTM : {self.dtm_dir}")
logger.info(f" • Visualisations: {self.vis_dir}")
logger.info(f" • Rapports PDF : {self.pdf_dir}")
# Clean up temporary files
logger.info("Nettoyage des fichiers temporaires...")
try:
if self.temp_dir.exists():
shutil.rmtree(self.temp_dir)
logger.info(" ✓ Fichiers temporaires supprimés")
except Exception as e:
logger.warning(f" Note: Impossible de supprimer les fichiers temporaires: {e}")
def _process_file_standalone(laz_file_str, input_dir, output_dir, resolution, force=False):
"""Standalone function for multiprocessing — creates its own pipeline instance.
Each worker gets its own temp directory to avoid file conflicts.
"""
pipeline = LidarArchaeoPipeline(input_dir, output_dir, resolution=resolution, workers=1, force=force)
basename = Path(laz_file_str).stem
pipeline.temp_dir = pipeline.output_dir / f"temp_{basename}"
pipeline.temp_dir.mkdir(exist_ok=True)
laz_file = Path(laz_file_str)
return pipeline.process_file(laz_file)

605
lidar_pipeline/rendering.py Normal file
View File

@ -0,0 +1,605 @@
"""Rendering module: colormap registry, GeoTIFF-to-WebP conversion, and PDF report generation.
Contains:
- COLORMAPS: registry mapping filename keywords to (cmap, title, legend, description)
- tif_to_png(): convert a GeoTIFF to a WebP visualization with legend, scale bar, north arrow
- generate_pdf_report(): generate an A3 PDF report with all visualizations
"""
import logging
import time
from pathlib import Path
import numpy as np
import rasterio
try:
from rasterio.warp import transform as warp_transform
HAS_WARP = True
except ImportError:
HAS_WARP = False
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from matplotlib import rcParams
from matplotlib.gridspec import GridSpec
from matplotlib.patches import Polygon as MplPolygon, Rectangle as RectPatch
from mpl_toolkits.axes_grid1.inset_locator import inset_axes
rcParams['figure.dpi'] = 150
rcParams['savefig.dpi'] = 300
rcParams['font.size'] = 10
logger = logging.getLogger("lidar")
# ============================================================
# Colormap registry
# ============================================================
# Each entry: keyword → (cmap, title, legend_label, description, vmin_mode, vmax_mode)
# vmin_mode/vmax_mode: 'percentile_X_Y' or '0_max_X' or 'symmetric_X_Y' or 'fixed_0_1'
# For RGB images (ortho/topo), special handling is done in tif_to_png.
COLORMAPS = {
'hillshade': {
'cmap': 'gray',
'title': 'Hillshade Multidirectionnel',
'legend': 'Illumination combinée de 6 directions\nBlanc = Face éclairée | Noir = Zone d\'ombre',
'description': 'Ombres portées révélant micro-relief (murs, fossés, terrasses)',
'vmin_mode': 'fixed', 'vmin_val': 0,
'vmax_mode': 'fixed', 'vmax_val': 1,
},
'slope': {
'cmap': 'inferno',
'title': 'Pente (Inclinaison du terrain)',
'legend': 'Inclinaison en degrés\nMin: {vmin:.1f}° | Max: {vmax:.1f}°\nClair = Forte pente | Sombre = Terrain plat',
'description': 'Murs, talus et bords ressortent en clair — terrain plat en sombre',
'vmin_mode': 'fixed', 'vmin_val': 0,
'vmax_mode': 'percentile', 'vmax_pct': 95,
},
'aspect': {
'cmap': 'hsv',
'title': 'Aspect (Direction des pentes)',
'legend': 'Direction vers laquelle le terrain descend\nRouge=Nord | Vert=Est | Cyan=Sud | Bleu=Ouest',
'description': 'Orientation des pentes — utile pour distinguer structures des formes naturelles',
'vmin_mode': 'fixed', 'vmin_val': 0,
'vmax_mode': 'fixed', 'vmax_val': 360,
},
'solar': {
'cmap': 'gray',
'title': 'Éclairage Solaire',
'legend': 'Illumination solaire (azimut 90°, altitude 30°)\nClair = Face éclairée | Sombre = Zone d\'ombre',
'description': 'Simulation de l\'éclairage solaire matinal',
'vmin_mode': 'fixed', 'vmin_val': 0,
'vmax_mode': 'fixed', 'vmax_val': 1,
},
'curvature': {
'cmap': 'RdYlBu_r',
'title': 'Courbure (Convexité/Concavité du terrain)',
'legend': 'Changement de pente (1/m)\nRouge = Convexe (sommet de mur, levée)\nBleu = Concave (fond de fossé, dépression)',
'description': 'Détecte les ruptures de pente — utile pour bords de terrasses et levées',
'vmin_mode': 'symmetric', 'sym_pct': (5, 95),
},
'svf': {
'cmap': 'viridis',
'title': 'Sky-View Factor (Ray-tracing 16 directions)',
'legend': 'Fraction de ciel visible depuis chaque point\nSombre = Encaissé (fossés, vallées, rues)\nClair = Dégagé (sommets, plateformes, plateaux)',
'description': 'Ray-tracing sur 16 azimuts — élimine l\'éclairage, détecte structures linéaires et enclos',
'vmin_mode': 'percentile', 'vmin_pct': 5,
'vmax_mode': 'percentile', 'vmax_pct': 95,
},
'mslrm': {
'cmap': 'RdBu_r',
'title': 'MSRM - Multi-Scale Relief Model (5 échelles)',
'legend': 'Relief combiné à 5 échelles (5m à 100m)\nRouge = Surélévation (mur, tumulus, levée)\nBleu = Dépression (fossé, douve, fossé)\n\nDifférence avec LRM:\nLRM = 1 échelle (15m)\nMSRM = 5 échelles combinées\nMSRM détecte du micro au macro',
'description': 'Combine LRM à 5 échelles — détecte structures de 5m à 100m simultanément',
'vmin_mode': 'symmetric', 'sym_pct': (2, 98),
},
'lrm': {
'cmap': 'RdBu_r',
'title': 'LRM - Local Relief Model (échelle unique 15m)',
'legend': 'Écart local par rapport au terrain moyen (m)\nRouge = Surélévation (+{vmax:.2f}m)\nBleu = Dépression ({vmin:.2f}m)\nNoyau gaussien unique de 15m',
'description': 'Micro-relief à 15m seulement — voir MSRM pour toutes les échelles',
'vmin_mode': 'symmetric', 'sym_pct': (2, 98),
},
'positive_openness': {
'cmap': 'YlOrBr',
'title': 'Openness Positive (ouverture vers le haut)',
'legend': 'Angle d\'ouverture vers le haut (deg)\nClair = Vue dégagée vers le ciel (sommets, plateaux)\nSombre = Vue bloquée (vallées encaissées)',
'description': 'Ray-tracing 8 directions — complémentaire de la négative pour détecter crêtes',
'vmin_mode': 'percentile', 'vmin_pct': 10,
'vmax_mode': 'percentile', 'vmax_pct': 98,
},
'negative_openness': {
'cmap': 'GnBu_r',
'title': 'Openness Negative (ouverture vers le bas)',
'legend': 'Angle d\'ouverture vers le bas (deg)\nClair = Surplomb (bords de fossé, grottes)\nSombre = Terrain plat (fonds de vallée)\nMeilleur détecteur de cavités et dolines',
'description': 'Ray-tracing 8 directions — détecte fossés, dolines, souterrains',
'vmin_mode': 'percentile', 'vmin_pct': 10,
'vmax_mode': 'percentile', 'vmax_pct': 98,
},
'tpi': {
'cmap': 'BrBG',
'title': 'TPI - Topographic Position Index (2 échelles)',
'legend': 'Position dans le paysage\nBrun/Sombre = Plus bas que le voisinage (fossé, vallée)\nVert/Clair = Plus haut que le voisinage (crête, plateau)\nCombine échelle fine 5m + large 100m',
'description': 'Identifie la position topographique — utile pour repérer crêtes vs vallées à grande échelle',
'vmin_mode': 'symmetric', 'sym_pct': (2, 98),
},
'depressions': {
'cmap': 'YlOrRd',
'title': 'Dépressions (Remplissage hydrologique)',
'legend': 'Profondeur des cuvettes détectées (m)\nTransparent = Pas de dépression\nJaune = Dépression légère | Rouge = Dépression profonde\nMax: {vmax:.2f}m',
'description': 'Simule le remplissage d\'eau — détecte dolines, sinkholes, cuvettes et zones inondables',
'vmin_mode': 'fixed', 'vmin_val': 0,
'vmax_mode': 'percentile', 'vmax_pct': 99,
},
'sailore': {
'cmap': 'seismic',
'title': 'SAILORE - LRM Auto-Adaptatif',
'legend': 'Relief local adaptatif (m)\nRouge = Surélévation | Bleu = Dépression\n\nDifférence avec LRM/MSRM:\nLRM = noyau fixe 15m\nMSRM = 5 noyaux fixes\nSAILORE = noyau adapté à la pente\nPlat=grand noyau | Pente=petit noyau',
'description': 'Noyau qui s\'adapte à la pente locale — terrain plat=grand noyau, pente=petit noyau',
'vmin_mode': 'symmetric', 'sym_pct': (2, 98),
},
'roughness': {
'cmap': 'magma',
'title': 'Rugosité de Surface (Écart-type local 5m)',
'legend': 'Irrégularité du terrain dans un voisinage de 5m\nSombre = Surface lisse (route, mur, sol plat)\nClair = Surface rugueuse (végétation, ruines, pierres)\nMax: {vmax:.2f}m',
'description': 'Mesure la variabilité locale — surfaces anthropiques lisses vs naturelles rugueuses',
'vmin_mode': 'fixed', 'vmin_val': 0,
'vmax_mode': 'percentile', 'vmax_pct': 97,
},
'anomalies': {
'cmap': 'coolwarm',
'title': 'Anomalies Statistiques (Z-score x Moran\'s I)',
'legend': 'Anomalies topographiques significatives\nRouge vif = Surélévation anormale (mur, tumulus)\nBleu vif = Dépression anormale (fossé, doline)\nBlanc/gris = Normal\n\nCombine z-score (intensité) et\nMoran\'s I (regroupement spatial)',
'description': 'Détecte uniquement les anomalies statistiquement significatives — filtre le bruit de fond',
'vmin_mode': 'symmetric', 'sym_pct': (5, 95),
},
'wavelet': {
'cmap': 'cividis',
'title': 'Ondelette Mexican Hat (CWT multi-échelle)',
'legend': 'Réponse de la transformée en ondelette à 5 échelles\nÉchelles: 2m, 5m, 10m, 20m, 50m\n\nClair = Structure détectée à cette échelle\nSombre = Pas de structure\n\nOptimisé pour formes circulaires:\ntumulus, enclos, fossés annulaires',
'description': 'Transformée en ondelette 2D — excellente pour détecter structures circulaires',
'vmin_mode': 'symmetric', 'sym_pct': (2, 98),
},
'texture': {
'cmap': 'inferno',
'title': 'Texture GLCM (Contraste + Entropie - Homogénéité)',
'legend': 'Analyse de la texture du relief (fenêtre 5m)\nClair = Texture hétérogène (labour, ruines, sol perturbé)\nSombre = Texture homogène (sol nu, route, zone plate)\n\nCombine contraste, entropie et homogénéité',
'description': 'Distingue surfaces anthropiques (labour, chemins) des naturelles',
'vmin_mode': 'symmetric', 'sym_pct': (2, 98),
},
'flow': {
'cmap': 'Blues',
'title': 'Accumulation de Flux Hydrologique (D8)',
'legend': 'Logarithme de l\'accumulation d\'eau\nBlanc = Pas de collecte (sommet, crête)\nBleu foncé = Collecte maximale (thalweg, fossé)\n\nSimule l\'écoulement de l\'eau de pluie\nDétecte fossés d\'enceinte, routes et drainage',
'description': 'Algorithme D8 — simule le cheminement de l\'eau pour détecter fossés et routes antiques',
'vmin_mode': 'fixed', 'vmin_val': 0,
'vmax_mode': 'percentile', 'vmax_pct': 98,
},
}
# RGB entries (ortho/topo) are handled specially
RGB_LEGENDS = {
'ortho': {
'title': 'Photographie Aérienne IGN',
'legend': 'Orthophotographie\nImage aérienne',
'description': 'Photographie aérienne IGN (Orthophoto)',
},
'topo': {
'title': 'Carte Topographique IGN',
'legend': 'Carte IGN\nPlan topographique',
'description': 'Carte topographique IGN (Plan IGN)',
},
}
def _apply_colormap(data, tif_file):
"""Apply the registered colormap normalization to data based on filename.
Returns (data, cmap, title, legend_label, description, is_rgb).
"""
name = str(tif_file).lower()
# Check for RGB first
for key in RGB_LEGENDS:
if key in name:
info = RGB_LEGENDS[key]
return data, None, info['title'], info['legend'], info['description'], True
# Find matching colormap
for key, info in COLORMAPS.items():
if key in name:
valid_data = data[~np.isnan(data)] if hasattr(data, 'compressed') else data.flatten()
valid_data = valid_data[~np.isnan(valid_data)]
vmin = vmax = None
# Compute vmin/vmax based on mode
if info['vmin_mode'] == 'fixed':
vmin = info['vmin_val']
elif info['vmin_mode'] == 'percentile':
vmin = np.percentile(valid_data, info['vmin_pct'])
elif info['vmin_mode'] == 'symmetric':
vmax_abs = max(abs(np.percentile(valid_data, info['sym_pct'][0])),
abs(np.percentile(valid_data, info['sym_pct'][1])), 0.001)
vmin = -vmax_abs
vmax = vmax_abs # symmetric mode sets both vmin and vmax
if vmax is None:
# Only compute vmax if not already set by symmetric mode
if info.get('vmax_mode') == 'fixed':
vmax = info['vmax_val']
elif info.get('vmax_mode') == 'percentile':
vmax = np.percentile(valid_data, info['vmax_pct'])
elif info.get('vmax_mode') == 'symmetric':
vmax_abs = max(abs(np.percentile(valid_data, info['sym_pct'][0])),
abs(np.percentile(valid_data, info['sym_pct'][1])), 0.001)
vmax = vmax_abs
# Apply normalization
if vmin is not None and vmax is not None:
data = np.clip((data - vmin) / max(vmax - vmin, 0.001), 0, 1)
legend = info['legend'].format(vmin=vmin or 0, vmax=vmax or 0)
return data, info['cmap'], info['title'], legend, info['description'], False
# Default: terrain colormap with percentile stretch
valid_data = data[~np.isnan(data)] if hasattr(data, 'compressed') else data.flatten()
valid_data = valid_data[~np.isnan(valid_data)]
p2, p98 = np.percentile(valid_data, (2, 98))
data = np.clip((data - p2) / (p98 - p2), 0, 1)
title = Path(tif_file).stem.replace('_', ' ').title()
return data, 'terrain', title, 'Altitude normalisée', '', False
def tif_to_png(tif_file, vis_dir, resolution):
"""Convert GeoTIFF to visualization WebP with GPS coordinates, legend, and scale bar.
Args:
tif_file: Path to input GeoTIFF.
vis_dir: Output directory for the WebP file.
resolution: Grid resolution in m/px.
Returns:
Path to output WebP file, or None on failure.
"""
if not tif_file or not tif_file.exists():
return None
webp_file = vis_dir / f"{tif_file.stem}.webp"
try:
with rasterio.open(tif_file) as src:
is_rgb = src.count >= 3 and ('ortho' in str(tif_file) or 'topo' in str(tif_file))
if is_rgb:
data = src.read([1, 2, 3])
data = np.moveaxis(data, 0, -1)
else:
data = src.read(1)
nodata = src.nodata
transform = src.transform
crs = src.crs
if is_rgb:
height, width, _ = data.shape
else:
height, width = data.shape
top_left_x = transform.c
top_left_y = transform.f
pixel_size_x = transform.a
pixel_size_y = abs(transform.e)
min_x = top_left_x
max_x = top_left_x + width * pixel_size_x
max_y = top_left_y
min_y = top_left_y - height * pixel_size_y
# GPS coordinates
gps_coords = {}
if HAS_WARP and crs is not None:
try:
l93_xs = [min_x, max_x, min_x, max_x]
l93_ys = [max_y, max_y, min_y, min_y]
lons, lats = warp_transform(crs, 'EPSG:4326', l93_xs, l93_ys)
gps_coords = {
'NW': (lats[0], lons[0]),
'NE': (lats[1], lons[1]),
'SW': (lats[2], lons[2]),
'SE': (lats[3], lons[3]),
}
n_ticks = 5
tick_l93_x = np.linspace(min_x, max_x, n_ticks)
tick_l93_y_bottom = np.full(n_ticks, min_y)
tick_lons, tick_lats = warp_transform(crs, 'EPSG:4326', tick_l93_x, tick_l93_y_bottom)
gps_coords['x_ticks'] = list(zip(tick_lons, tick_lats))
tick_l93_y = np.linspace(min_y, max_y, n_ticks)
tick_l93_x_left = np.full(n_ticks, min_x)
tick_lons_y, tick_lats_y = warp_transform(crs, 'EPSG:4326', tick_l93_x_left, tick_l93_y)
gps_coords['y_ticks'] = list(zip(tick_lons_y, tick_lats_y))
except Exception:
gps_coords = {}
if nodata is not None and not is_rgb:
data = np.ma.masked_where((data == nodata) | np.isnan(data), data)
if not is_rgb:
valid_data = data.compressed() if hasattr(data, 'compressed') else data.flatten()
valid_data = valid_data[~np.isnan(valid_data)]
# Apply colormap
data, cmap, title, legend_label, description, is_rgb_result = _apply_colormap(data, tif_file)
# Create figure
fig_width = 20
map_aspect = height / width
fig = plt.figure(figsize=(fig_width, fig_width * map_aspect * 0.7 + 2.5),
facecolor='white')
gs = GridSpec(2, 1, height_ratios=[1.0, 0.06],
hspace=0.04, figure=fig,
left=0.06, right=0.88, top=0.93, bottom=0.08)
ax = fig.add_subplot(gs[0])
if is_rgb:
im = ax.imshow(data, aspect='equal', origin='upper')
else:
im = ax.imshow(data, cmap=cmap, aspect='equal', origin='upper')
ax.set_title(f"{title}\n{description}", fontsize=15, fontweight='bold', pad=10)
if not is_rgb:
cbar = plt.colorbar(im, ax=ax, pad=0.02, shrink=0.85, aspect=30)
cbar.ax.tick_params(labelsize=9, width=1.5)
cbar.outline.set_linewidth(1.5)
cbar.set_label(legend_label, fontsize=10, fontweight='bold')
else:
ax.text(1.02, 0.5, legend_label, transform=ax.transAxes,
fontsize=10, fontweight='bold', rotation=90,
verticalalignment='center', horizontalalignment='left')
# GPS coordinate ticks
if gps_coords and 'x_ticks' in gps_coords:
x_pixel_positions = np.linspace(0, width - 1, len(gps_coords['x_ticks']))
x_labels = [f"{lon:.5f}E" for lon, lat in gps_coords['x_ticks']]
ax.set_xticks(x_pixel_positions)
ax.set_xticklabels(x_labels, fontsize=7, rotation=30)
ax.set_xlabel('Longitude', fontsize=9, fontweight='bold')
y_pixel_positions = np.linspace(0, height - 1, len(gps_coords['y_ticks']))
y_labels = [f"{lat:.5f}N" for lon, lat in gps_coords['y_ticks']]
ax.set_yticks(y_pixel_positions)
ax.set_yticklabels(y_labels, fontsize=7)
ax.set_ylabel('Latitude', fontsize=9, fontweight='bold')
else:
x_ticks_count = 5
x_positions = np.linspace(0, width - 1, x_ticks_count)
x_labels = [f"{(min_x + xp * pixel_size_x)/1000:.1f}" for xp in x_positions]
ax.set_xticks(x_positions)
ax.set_xticklabels(x_labels, fontsize=8)
ax.set_xlabel('Est (km) - Lambert 93', fontsize=9, fontweight='bold')
y_ticks_count = 5
y_positions = np.linspace(0, height - 1, y_ticks_count)
y_labels = [f"{(max_y - yp * pixel_size_y)/1000:.1f}" for yp in y_positions]
ax.set_yticks(y_positions)
ax.set_yticklabels(y_labels, fontsize=8)
ax.set_ylabel('Nord (km) - Lambert 93', fontsize=9, fontweight='bold')
ax.tick_params(axis='both', which='both', direction='out', length=3,
width=0.8, colors='black')
for spine in ax.spines.values():
spine.set_visible(True)
spine.set_color('black')
spine.set_linewidth(0.8)
# North arrow
north_ax = inset_axes(ax, width="4%", height="7%", loc='upper right',
bbox_to_anchor=(-0.03, 0.12, 1, 1), bbox_transform=ax.transAxes)
north_ax.set_xlim(0, 1)
north_ax.set_ylim(0, 1)
north_ax.axis('off')
north_ax.plot([0.5, 0.5], [0.1, 0.65], color='black', linewidth=2.5, zorder=10)
north_ax.add_patch(MplPolygon([[0.5, 0.2], [0.35, 0.4], [0.5, 0.65], [0.65, 0.4]],
closed=True, facecolor='black', edgecolor='black', zorder=9))
north_ax.text(0.5, 0.95, 'N', ha='center', va='top',
fontsize=13, fontweight='bold', color='black', zorder=11)
# Bottom info bar
info_ax = fig.add_subplot(gs[1])
info_ax.axis('off')
extent_km_x = (max_x - min_x) / 1000
extent_km_y = (max_y - min_y) / 1000
if is_rgb:
alt_min = alt_max = 0
else:
alt_min = float(np.nanmin(valid_data)) if len(valid_data) > 0 else 0
alt_max = float(np.nanmax(valid_data)) if len(valid_data) > 0 else 0
if gps_coords:
nw_lat, nw_lon = gps_coords['NW']
se_lat, se_lon = gps_coords['SE']
info_text = (
f"GPS: {nw_lat:.5f}N {nw_lon:.5f}E - {se_lat:.5f}N {se_lon:.5f}E | "
f"EPSG:2154 | Res: {resolution}m/px | "
f"Emprise: {extent_km_x:.1f}x{extent_km_y:.1f}km"
)
else:
info_text = (
f"EPSG:2154 | X: {min_x:.0f}-{max_x:.0f} Y: {min_y:.0f}-{max_y:.0f} | "
f"Res: {resolution}m/px | Emprise: {extent_km_x:.1f}x{extent_km_y:.1f}km"
)
info_ax.text(0.01, 0.5, info_text,
transform=info_ax.transAxes, fontsize=8.5,
verticalalignment='center', family='monospace',
bbox=dict(boxstyle='round,pad=0.3', facecolor='#f0f0f0',
edgecolor='#aaaaaa', alpha=0.95))
# Scale bar
scale_m = 100
pixels_per_meter = 1.0 / pixel_size_x
scale_px = int(scale_m * pixels_per_meter)
scale_bar_frac = scale_px / width
bar_left = 0.80
bar_bottom = 0.15
bar_width_frac = min(scale_bar_frac, 0.15)
bar_height = 0.35
info_ax.add_patch(RectPatch((bar_left, bar_bottom), bar_width_frac, bar_height,
facecolor='black', edgecolor='black', linewidth=0.5,
transform=info_ax.transAxes, clip_on=False))
info_ax.text(bar_left + bar_width_frac / 2, bar_bottom + bar_height + 0.12,
f"{scale_m} m", ha='center', va='bottom', fontsize=9, fontweight='bold',
transform=info_ax.transAxes)
fig.patch.set_facecolor('white')
# Save as PNG then convert to WebP
png_temp = vis_dir / f"{tif_file.stem}_temp.png"
plt.savefig(png_temp, dpi=150, bbox_inches='tight', pad_inches=0.15,
facecolor='white', format='png')
plt.close()
from PIL import Image as PILImage
img = PILImage.open(str(png_temp))
img.save(str(webp_file), format='WEBP', lossless=True)
png_temp.unlink()
# Delete source TIFF
tif_file.unlink()
return webp_file
except Exception as e:
logger.error(f" Erreur conversion WebP: {e}", exc_info=True)
return None
def generate_pdf_report(basename, vis_dir, pdf_dir, resolution):
"""Generate A3 PDF report for a LiDAR file with all visualizations.
Page 1: Mise en situation (ortho + topo IGN side by side)
Pages 2+: Other visualizations (2 per page)
Args:
basename: Base name for the report file.
vis_dir: Directory containing WebP visualization files.
pdf_dir: Directory for output PDF.
resolution: Grid resolution (used in info text).
Returns:
Path to PDF file, or None on failure.
"""
from matplotlib.backends.backend_pdf import PdfPages
pdf_file = pdf_dir / f"{basename}_rapport.pdf"
logger.info(f" → Génération rapport PDF A3: {pdf_file.name}")
t0 = time.time()
# Look for WebPs in per-file subdirectory first, then fallback to main dir
file_vis_dir = vis_dir / basename
if file_vis_dir.exists():
png_files = sorted(file_vis_dir.glob("*.webp"))
else:
png_files = sorted(vis_dir.glob(f"{basename}_*.webp"))
if not png_files:
logger.warning(f" ✗ Aucune image trouvée pour {basename}")
return None
# Categorize
situ_files = []
analysis_files = []
for f in png_files:
name = f.stem.lower()
if 'ortho' in name:
situ_files.insert(0, f)
elif 'topo' in name:
situ_files.append(f)
else:
analysis_files.append(f)
# Sort analysis files by archaeological priority
order = ['mslrm', 'svf', 'negative_openness',
'positive_openness', 'sailore', 'depressions', 'hillshade_multi',
'lrm', 'tpi', 'slope', 'curvature', 'aspect',
'roughness', 'anomalies', 'wavelet', 'texture', 'flow']
def sort_key(f):
name = f.stem.lower()
for i, key in enumerate(order):
if key in name:
return i
return len(order)
analysis_files.sort(key=sort_key)
a3_w, a3_h = 16.54, 11.69
try:
with PdfPages(str(pdf_file)) as pdf:
# Page 1: Mise en situation
if situ_files:
fig = plt.figure(figsize=(a3_w, a3_h), facecolor='white')
n_situ = len(situ_files)
if n_situ == 2:
gs = fig.add_gridspec(1, 2, wspace=0.05, left=0.03, right=0.97,
top=0.92, bottom=0.06)
else:
gs = fig.add_gridspec(1, max(n_situ, 1), wspace=0.05,
left=0.03, right=0.97, top=0.92, bottom=0.06)
fig.text(0.5, 0.97, f"Mise en situation - {basename}",
fontsize=20, fontweight='bold', ha='center', va='top')
for i, f in enumerate(situ_files):
ax = fig.add_subplot(gs[0, i])
img = plt.imread(str(f))
ax.imshow(img)
ax.axis('off')
title = f.stem.replace(basename + '_', '').replace('_', ' ').title()
ax.set_title(title, fontsize=12, fontweight='bold', pad=5)
pdf.savefig(fig, dpi=150)
plt.close(fig)
# Pages 2+: Analysis maps (2 per page)
for page_start in range(0, len(analysis_files), 2):
page_files = analysis_files[page_start:page_start + 2]
fig = plt.figure(figsize=(a3_w, a3_h), facecolor='white')
if len(page_files) == 2:
gs = fig.add_gridspec(1, 2, wspace=0.08, left=0.03, right=0.97,
top=0.93, bottom=0.05)
else:
gs = fig.add_gridspec(1, 1, left=0.05, right=0.95,
top=0.93, bottom=0.05)
for i, f in enumerate(page_files):
ax = fig.add_subplot(gs[0, i])
img = plt.imread(str(f))
ax.imshow(img)
ax.axis('off')
title = f.stem.replace(basename + '_', '').replace('_', ' ').title()
ax.set_title(title, fontsize=11, fontweight='bold', pad=3)
page_num = (page_start // 2) + 2
fig.text(0.99, 0.01, f"Page {page_num}", fontsize=8,
ha='right', va='bottom', color='gray')
pdf.savefig(fig, dpi=150)
plt.close(fig)
logger.info(f" ✓ Rapport PDF terminé ({time.time()-t0:.1f}s)")
return pdf_file
except Exception as e:
logger.error(f" ✗ Erreur PDF: {e}", exc_info=True)
return None

View File

@ -0,0 +1,740 @@
"""Terrain visualization functions for LiDAR archaeological analysis.
Each function takes (dem_file, basename, vis_dir, resolution) as explicit
parameters and returns the path to the output GeoTIFF file, or None on error.
"""
import logging
import time
from pathlib import Path
import numpy as np
import rasterio
from scipy import ndimage
from scipy.ndimage import generic_filter, gaussian_filter, uniform_filter, minimum_filter
from scipy.stats import binned_statistic_2d
from .gpu import HAS_GPU, to_gpu, to_cpu, xp_gaussian_filter, xp_uniform_filter
logger = logging.getLogger("lidar")
# Use CuPy array module when available
if HAS_GPU:
import cupy as cp
xp = cp
else:
xp = np
def _save_tif(output_path, data, transform, crs, dtype='float32', count=1):
"""Helper to save a 2D or 3D array as GeoTIFF."""
if data.ndim == 2:
height, width = data.shape
with rasterio.open(
output_path, 'w', driver='GTiff',
height=height, width=width, count=count,
dtype=dtype, crs=crs, transform=transform, compress='lzw'
) as dst:
dst.write(data.astype(dtype), 1)
elif data.ndim == 3:
bands, height, width = data.shape
with rasterio.open(
output_path, 'w', driver='GTiff',
height=height, width=width, count=bands,
dtype=dtype, crs=crs, transform=transform, compress='lzw'
) as dst:
for i in range(bands):
dst.write(data[i].astype(dtype), i + 1)
def _read_dem(dem_file):
"""Read DEM file and return (data, transform, crs)."""
with rasterio.open(dem_file) as src:
return src.read(1), src.transform, src.crs
# ============================================================
# Core terrain visualizations
# ============================================================
def generate_hillshade(dem_file, basename, vis_dir, resolution):
"""Generate multi-directional hillshade (NW, NE, SW, SE)."""
logger.info(" → Hillshade multidirectionnel...")
t0 = time.time()
output = vis_dir / f"{basename}_hillshade_multi.tif"
try:
dem, transform, crs = _read_dem(dem_file)
dy, dx = np.gradient(dem)
azimuts = [315, 45, 225, 135]
altitude = 30
hillshades = []
for az in azimuts:
az_rad = np.radians(az)
alt_rad = np.radians(altitude)
slope = np.arctan(np.sqrt(dx**2 + dy**2))
aspect = np.arctan2(dy, dx)
hs = np.sin(alt_rad) * np.sin(slope) + \
np.cos(alt_rad) * np.cos(slope) * np.cos(az_rad - aspect)
hillshades.append(np.clip(hs, 0, 1))
combined = np.mean(hillshades, axis=0)
_save_tif(output, combined, transform, crs)
logger.info(f" ✓ Hillshade terminé ({time.time()-t0:.1f}s)")
return output
except Exception as e:
logger.error(f" ✗ Erreur hillshade: {e}", exc_info=True)
return None
def generate_slope(dem_file, basename, vis_dir, resolution):
"""Generate slope map (degrees)."""
logger.info(" → Pente (Slope)...")
t0 = time.time()
output = vis_dir / f"{basename}_slope.tif"
try:
dem, transform, crs = _read_dem(dem_file)
dy, dx = np.gradient(dem)
slope = np.arctan(np.sqrt(dx**2 + dy**2)) * 180 / np.pi
_save_tif(output, slope, transform, crs)
logger.info(f" ✓ Pente terminée ({time.time()-t0:.1f}s)")
return output
except Exception as e:
logger.error(f" ✗ Erreur slope: {e}", exc_info=True)
return None
def generate_aspect(dem_file, basename, vis_dir, resolution):
"""Generate aspect (slope orientation) map."""
logger.info(" → Aspect (Orientation)...")
t0 = time.time()
output = vis_dir / f"{basename}_aspect.tif"
try:
dem, transform, crs = _read_dem(dem_file)
dy, dx = np.gradient(dem)
aspect = np.arctan2(dy, dx) * 180 / np.pi
aspect = np.mod(aspect, 360)
_save_tif(output, aspect, transform, crs)
logger.info(f" ✓ Aspect terminé ({time.time()-t0:.1f}s)")
return output
except Exception as e:
logger.error(f" ✗ Erreur aspect: {e}", exc_info=True)
return None
def generate_curvature(dem_file, basename, vis_dir, resolution):
"""Generate curvature (terrain concavity/convexity) map."""
logger.info(" → Courbure (Curvature)...")
t0 = time.time()
output = vis_dir / f"{basename}_curvature.tif"
try:
dem, transform, crs = _read_dem(dem_file)
dz_dx = np.gradient(dem, axis=1)
dz_dy = np.gradient(dem, axis=0)
d2z_dx2 = np.gradient(dz_dx, axis=1)
d2z_dy2 = np.gradient(dz_dy, axis=0)
curvature = (d2z_dx2 + d2z_dy2) / 2
_save_tif(output, curvature, transform, crs)
logger.info(f" ✓ Courbure terminée ({time.time()-t0:.1f}s)")
return output
except Exception as e:
logger.error(f" ✗ Erreur curvature: {e}", exc_info=True)
return None
def generate_solar(dem_file, basename, vis_dir, resolution):
"""Generate solar irradiance simulation."""
logger.info(" → Éclairage Solaire...")
t0 = time.time()
output = vis_dir / f"{basename}_solar.tif"
try:
dem, transform, crs = _read_dem(dem_file)
dy, dx = np.gradient(dem)
slope = np.arctan(np.sqrt(dx**2 + dy**2))
aspect = np.arctan2(dy, dx)
az_rad = np.radians(90)
alt_rad = np.radians(30)
solar = np.sin(alt_rad) * np.sin(slope) + \
np.cos(alt_rad) * np.cos(slope) * np.cos(az_rad - aspect)
solar = np.clip(solar, 0, 1)
_save_tif(output, solar, transform, crs)
logger.info(f" ✓ Solaire terminé ({time.time()-t0:.1f}s)")
return output
except Exception as e:
logger.error(f" ✗ Erreur solar: {e}", exc_info=True)
return None
# ============================================================
# GPU-accelerated visualizations
# ============================================================
def generate_lrm(dem_file, basename, vis_dir, resolution):
"""Local Relief Model - deviation from local mean (GPU if available)."""
gpu_tag = " [GPU]" if HAS_GPU else ""
logger.info(f" → Local Relief Model{gpu_tag}...")
t0 = time.time()
output = vis_dir / f"{basename}_lrm.tif"
try:
dem_np, transform, crs = _read_dem(dem_file)
dem = to_gpu(dem_np)
local_mean = xp_gaussian_filter(dem, sigma=15/resolution)
lrm = dem - local_mean
lrm_np = to_cpu(lrm).astype(np.float32)
_save_tif(output, lrm_np, transform, crs)
logger.info(f" ✓ LRM terminé ({time.time()-t0:.1f}s){gpu_tag}")
return output
except Exception as e:
logger.error(f" ✗ Erreur LRM: {e}", exc_info=True)
return None
def generate_svf(dem_file, basename, vis_dir, resolution):
"""Sky-View Factor - ray-tracing on 16 azimuths (GPU if available).
For each pixel, trace rays in N directions, find the max horizon
angle in each direction, then SVF = (1/N) * sum(cos²(horizon_angle)).
Valleys/crevices have low SVF (obstructed sky), ridges/peaks have high SVF.
"""
gpu_tag = " [GPU]" if HAS_GPU else ""
logger.info(f" → Sky-View Factor (ray-tracing){gpu_tag}...")
t0 = time.time()
output = vis_dir / f"{basename}_svf.tif"
try:
dem_np, transform, crs = _read_dem(dem_file)
rows, cols = dem_np.shape
res = resolution
dem = to_gpu(dem_np)
n_dirs = 16
angles = np.linspace(0, 2 * np.pi, n_dirs, endpoint=False)
dx = np.cos(angles)
dy = np.sin(angles)
max_dist = int(50 / res)
padded = xp.pad(dem, max_dist, mode='constant', constant_values=xp.nan)
svf = xp.zeros_like(dem)
for d_idx in range(n_dirs):
ddx, ddy = dx[d_idx], dy[d_idx]
horizon = xp.zeros_like(dem)
for step in range(1, max_dist + 1):
px = int(round(ddx * step))
py = int(round(ddy * step))
dist_m = np.sqrt((ddx * step * res) ** 2 + (ddy * step * res) ** 2)
if dist_m < res * 0.5:
continue
elev_diff = padded[max_dist + py:max_dist + py + rows,
max_dist + px:max_dist + px + cols] - dem
angle = xp.arctan2(elev_diff, dist_m)
horizon = xp.where(xp.isnan(angle), horizon,
xp.maximum(horizon, xp.nan_to_num(angle, nan=0)))
svf += xp.cos(xp.pi / 2 - horizon) ** 2
svf /= n_dirs
svf_np = to_cpu(svf).astype(np.float32)
_save_tif(output, svf_np, transform, crs)
logger.info(f" ✓ SVF terminé ({time.time()-t0:.1f}s){gpu_tag}")
return output
except Exception as e:
logger.error(f" ✗ Erreur SVF: {e}", exc_info=True)
return None
def generate_openness(dem_file, basename, vis_dir, resolution, positive=True):
"""Positive/Negative Openness - true zenith/nadir angle computation (GPU if available).
For each pixel, in 8 directions (N, NE, E, SE, S, SW, W, NW):
- Positive openness: max zenith angle (angle from vertical to highest visible terrain)
- Negative openness: max nadir angle (angle from vertical down to lowest terrain)
Result is averaged across all 8 directions.
"""
name = "positive_openness" if positive else "negative_openness"
gpu_tag = " [GPU]" if HAS_GPU else ""
logger.info(f"{name.replace('_', ' ').title()} (ray-tracing){gpu_tag}...")
t0 = time.time()
output = vis_dir / f"{basename}_{name}.tif"
try:
dem_np, transform, crs = _read_dem(dem_file)
rows, cols = dem_np.shape
res = resolution
dem = to_gpu(dem_np)
n_dirs = 8
angles = np.linspace(0, 2 * np.pi, n_dirs, endpoint=False)
dx = np.cos(angles)
dy = np.sin(angles)
max_dist = int(50 / res)
padded = xp.pad(dem, max_dist, mode='constant', constant_values=xp.nan)
openness_sum = xp.zeros_like(dem)
for d_idx in range(n_dirs):
ddx, ddy = dx[d_idx], dy[d_idx]
max_angle = xp.zeros_like(dem)
for step in range(1, max_dist + 1):
px = int(round(ddx * step))
py = int(round(ddy * step))
dist_m = np.sqrt((ddx * step * res) ** 2 + (ddy * step * res) ** 2)
if dist_m < res * 0.5:
continue
elev_diff = padded[max_dist + py:max_dist + py + rows,
max_dist + px:max_dist + px + cols] - dem
if positive:
angle = xp.arctan2(xp.maximum(elev_diff, 0), dist_m)
else:
angle = xp.arctan2(xp.maximum(-elev_diff, 0), dist_m)
max_angle = xp.where(xp.isnan(angle), max_angle,
xp.maximum(max_angle, xp.nan_to_num(angle, nan=0)))
openness_sum += max_angle
openness_result = to_cpu(xp.degrees(openness_sum / n_dirs)).astype(np.float32)
_save_tif(output, openness_result, transform, crs)
logger.info(f"{name} terminé ({time.time()-t0:.1f}s){gpu_tag}")
return output
except Exception as e:
logger.error(f" ✗ Erreur openness: {e}", exc_info=True)
return None
def generate_mslrm(dem_file, basename, vis_dir, resolution):
"""Multi-Scale Relief Model (MSRM) - LRM at 5 scales combined (GPU if available)."""
gpu_tag = " [GPU]" if HAS_GPU else ""
logger.info(f" → Multi-Scale Relief Model (MSRM){gpu_tag}...")
t0 = time.time()
output = vis_dir / f"{basename}_mslrm.tif"
try:
dem_np, transform, crs = _read_dem(dem_file)
dem = to_gpu(dem_np)
sigmas = [5, 10, 25, 50, 100]
lrm_stack = []
for sigma in sigmas:
sigma_px = sigma / resolution
local_mean = xp_gaussian_filter(dem, sigma=sigma_px)
lrm = dem - local_mean
lrm_norm = lrm / max(float(xp.nanstd(lrm)), 0.01)
lrm_stack.append(lrm_norm)
mslrm = xp.sqrt(xp.mean(xp.array(lrm_stack) ** 2, axis=0))
mslrm_np = to_cpu(mslrm).astype(np.float32)
_save_tif(output, mslrm_np, transform, crs)
logger.info(f" ✓ MSRM terminé ({time.time()-t0:.1f}s){gpu_tag}")
return output
except Exception as e:
logger.error(f" ✗ Erreur MSRM: {e}", exc_info=True)
return None
def generate_tpi(dem_file, basename, vis_dir, resolution):
"""Multi-Scale Topographic Position Index (GPU if available).
TPI = elevation - mean(neighborhood).
Computed at fine (5m) and broad (100m) scales.
"""
gpu_tag = " [GPU]" if HAS_GPU else ""
logger.info(f" → TPI multi-échelle{gpu_tag}...")
t0 = time.time()
output = vis_dir / f"{basename}_tpi.tif"
try:
dem_np, transform, crs = _read_dem(dem_file)
dem = to_gpu(dem_np)
fine_size = int(5 / resolution)
if fine_size % 2 == 0:
fine_size += 1
tpi_fine = dem - xp_uniform_filter(dem, size=fine_size)
broad_size = int(100 / resolution)
if broad_size % 2 == 0:
broad_size += 1
tpi_broad = dem - xp_uniform_filter(dem, size=broad_size)
fine_std = max(float(xp.nanstd(tpi_fine)), 0.01)
broad_std = max(float(xp.nanstd(tpi_broad)), 0.01)
tpi_combined = 0.6 * (tpi_fine / fine_std) + 0.4 * (tpi_broad / broad_std)
tpi_np = to_cpu(tpi_combined).astype(np.float32)
_save_tif(output, tpi_np, transform, crs)
logger.info(f" ✓ TPI terminé ({time.time()-t0:.1f}s){gpu_tag}")
return output
except Exception as e:
logger.error(f" ✗ Erreur TPI: {e}", exc_info=True)
return None
# ============================================================
# Depression / hydrology
# ============================================================
def generate_depressions(dem_file, basename, vis_dir, resolution):
"""Depression detection using hydrological sink filling."""
logger.info(" → Détection dépressions (hydrologique)...")
t0 = time.time()
output = vis_dir / f"{basename}_depressions.tif"
try:
dem, transform, crs = _read_dem(dem_file)
from scipy.ndimage import binary_dilation, generate_binary_structure
dem_filled = dem.copy()
nodata_mask = np.isnan(dem_filled)
dem_filled[nodata_mask] = np.nanmax(dem) + 1000
struct = generate_binary_structure(2, 2)
changed = True
iterations = 0
max_iter = 100
while changed and iterations < max_iter:
from scipy.ndimage import minimum_filter as scipy_min_filter
neighbor_min = scipy_min_filter(dem_filled, footprint=struct)
sinks = (dem_filled < neighbor_min) & ~nodata_mask
if not np.any(sinks):
break
new_dem = np.maximum(dem_filled, neighbor_min)
new_dem[nodata_mask] = np.nan
changed = np.any(new_dem != dem_filled)
dem_filled = new_dem
iterations += 1
depressions = dem_filled - dem
depressions[nodata_mask] = np.nan
depressions = np.where(depressions > 0.01, depressions, 0)
_save_tif(output, depressions, transform, crs)
logger.info(f" ✓ Dépressions terminé ({time.time()-t0:.1f}s)")
return output
except Exception as e:
logger.error(f" ✗ Erreur dépressions: {e}", exc_info=True)
return None
# ============================================================
# SAILORE
# ============================================================
def generate_sailore(dem_file, basename, vis_dir, resolution):
"""SAILORE - Self-Adaptive Improved Local Relief Model (GPU if available).
Kernel size adapts to local slope: flat areas get larger kernels,
steep areas get smaller kernels.
"""
gpu_tag = " [GPU]" if HAS_GPU else ""
logger.info(f" → SAILORE (LRM adaptatif){gpu_tag}...")
t0 = time.time()
output = vis_dir / f"{basename}_sailore.tif"
try:
dem_np, transform, crs = _read_dem(dem_file)
dem = to_gpu(dem_np)
gy, gx = xp.gradient(dem, resolution)
slope = xp.arctan(xp.sqrt(gx**2 + gy**2))
slope_deg = xp.degrees(slope)
sigma_min = 2.0 / resolution
sigma_max = 25.0 / resolution
slope_norm = xp.clip(slope_deg / 30.0, 0, 1)
adaptive_sigma = sigma_max - slope_norm * (sigma_max - sigma_min)
lrm_fine = dem - xp_gaussian_filter(dem, sigma=sigma_min)
lrm_medium = dem - xp_gaussian_filter(dem, sigma=(sigma_min + sigma_max) / 2)
lrm_coarse = dem - xp_gaussian_filter(dem, sigma=sigma_max)
w_fine = slope_norm
w_medium = 1 - 2 * xp.abs(slope_norm - 0.5)
w_coarse = 1 - slope_norm
w_total = w_fine + w_medium + w_coarse
w_total[w_total == 0] = 1
sailore = (w_fine * lrm_fine + w_medium * lrm_medium + w_coarse * lrm_coarse) / w_total
sailore_np = to_cpu(sailore).astype(np.float32)
_save_tif(output, sailore_np, transform, crs)
logger.info(f" ✓ SAILORE terminé ({time.time()-t0:.1f}s){gpu_tag}")
return output
except Exception as e:
logger.error(f" ✗ Erreur SAILORE: {e}", exc_info=True)
return None
# ============================================================
# Roughness
# ============================================================
def generate_roughness(dem_file, basename, vis_dir, resolution):
"""Surface roughness - standard deviation of elevation in a window."""
logger.info(" → Rugosité de surface...")
t0 = time.time()
output = vis_dir / f"{basename}_roughness.tif"
try:
dem, transform, crs = _read_dem(dem_file)
window_size = int(5 / resolution)
if window_size % 2 == 0:
window_size += 1
def std_filter(arr):
return np.nanstd(arr)
roughness = generic_filter(dem.astype(np.float64), std_filter,
size=window_size, mode='nearest')
_save_tif(output, roughness, transform, crs)
logger.info(f" ✓ Rugosité terminée ({time.time()-t0:.1f}s)")
return output
except Exception as e:
logger.error(f" ✗ Erreur rugosité: {e}", exc_info=True)
return None
# ============================================================
# Anomalies
# ============================================================
def generate_anomalies(dem_file, basename, vis_dir, resolution):
"""Statistical anomaly detection - z-score of local relief + Local Moran's I."""
logger.info(" → Détection anomalies statistiques...")
t0 = time.time()
output = vis_dir / f"{basename}_anomalies.tif"
try:
dem, transform, crs = _read_dem(dem_file)
lrm = dem - gaussian_filter(dem, sigma=15 / resolution)
lrm_mean = np.nanmean(lrm)
lrm_std = max(np.nanstd(lrm), 0.01)
z_score = (lrm - lrm_mean) / lrm_std
window = int(10 / resolution)
if window % 2 == 0:
window += 1
local_mean = uniform_filter(z_score, size=window)
morans_i = z_score * (local_mean - np.nanmean(z_score)) / np.nanstd(z_score)
anomaly_score = np.abs(z_score) * np.sign(morans_i)
_save_tif(output, anomaly_score, transform, crs)
logger.info(f" ✓ Anomalies terminé ({time.time()-t0:.1f}s)")
return output
except Exception as e:
logger.error(f" ✗ Erreur anomalies: {e}", exc_info=True)
return None
# ============================================================
# Wavelet
# ============================================================
def generate_wavelet(dem_file, basename, vis_dir, resolution):
"""Mexican Hat wavelet multi-scale analysis (GPU if available).
CWT 2D at multiple scales to detect circular features.
"""
gpu_tag = " [GPU]" if HAS_GPU else ""
logger.info(f" → Ondelette Mexican Hat multi-échelle{gpu_tag}...")
t0 = time.time()
output = vis_dir / f"{basename}_wavelet.tif"
try:
dem_np, transform, crs = _read_dem(dem_file)
dem = to_gpu(dem_np)
scales = [2, 5, 10, 20, 50]
wavelet_stack = []
for scale_m in scales:
sigma_px = scale_m / resolution
if HAS_GPU:
from cupyx.scipy.ndimage import gaussian_laplace as gpu_gaussian_laplace
response = -gpu_gaussian_laplace(dem, sigma=sigma_px)
else:
from scipy.ndimage import gaussian_laplace
response = to_gpu(-gaussian_laplace(to_cpu(dem).astype(np.float64), sigma=sigma_px))
response /= max(float(xp.nanstd(response)), 0.01)
wavelet_stack.append(response)
combined = xp.sqrt(xp.mean(xp.array(wavelet_stack) ** 2, axis=0))
combined_np = to_cpu(combined).astype(np.float32)
_save_tif(output, combined_np, transform, crs)
logger.info(f" ✓ Ondelette terminée ({time.time()-t0:.1f}s){gpu_tag}")
return output
except Exception as e:
logger.error(f" ✗ Erreur ondelette: {e}", exc_info=True)
return None
# ============================================================
# Texture GLCM
# ============================================================
def generate_texture(dem_file, basename, vis_dir, resolution):
"""GLCM texture analysis on hillshade - contrast, entropy, homogeneity."""
logger.info(" → Texture GLCM...")
t0 = time.time()
output = vis_dir / f"{basename}_texture.tif"
try:
dem, transform, crs = _read_dem(dem_file)
gy, gx = np.gradient(dem, resolution)
slope = np.arctan(np.sqrt(gx**2 + gy**2))
alt_rad = np.radians(45)
az_rad = np.radians(315)
shading = (np.sin(alt_rad) * np.cos(slope) +
np.cos(alt_rad) * np.sin(slope) *
np.cos(az_rad - np.arctan2(gy, gx)))
hillshade = np.clip(shading, 0, 1)
valid = hillshade[~np.isnan(hillshade)]
if len(valid) == 0:
raise ValueError("No valid data for texture analysis")
lo, hi = np.percentile(valid, (1, 99))
img = np.clip((hillshade - lo) / max(hi - lo, 0.001), 0, 1)
img_uint8 = (img * 255).astype(np.uint8)
window = int(5 / resolution)
if window % 2 == 0:
window += 1
def local_variance(arr):
return np.var(arr.astype(np.float64))
def local_entropy(arr):
hist, _ = np.histogram(arr.astype(np.float64), bins=16, range=(0, 256))
hist = hist / max(hist.sum(), 1)
hist = hist[hist > 0]
return -np.sum(hist * np.log2(hist))
def local_homogeneity(arr):
arr_f = arr.astype(np.float64)
return np.mean(1.0 / (1.0 + (arr_f - np.mean(arr_f)) ** 2))
contrast = generic_filter(img_uint8.astype(np.float64), local_variance,
size=window, mode='nearest')
entropy = generic_filter(img_uint8.astype(np.float64), local_entropy,
size=window, mode='nearest')
homogeneity = generic_filter(img_uint8.astype(np.float64), local_homogeneity,
size=window, mode='nearest')
def norm(arr):
valid_arr = arr[~np.isnan(arr)]
if len(valid_arr) == 0:
return arr
std_val = max(np.std(valid_arr), 0.01)
return (arr - np.mean(valid_arr)) / std_val
texture_combined = (0.4 * norm(contrast) +
0.4 * norm(entropy) -
0.2 * norm(homogeneity))
_save_tif(output, texture_combined, transform, crs)
logger.info(f" ✓ Texture terminée ({time.time()-t0:.1f}s)")
return output
except Exception as e:
logger.error(f" ✗ Erreur texture GLCM: {e}", exc_info=True)
return None
# ============================================================
# Flow accumulation
# ============================================================
def generate_flow(dem_file, basename, vis_dir, resolution):
"""Flow accumulation using D8 algorithm.
Identifies drainage patterns, ditches, and enclosure boundaries.
"""
logger.info(" → Accumulation de flux D8...")
t0 = time.time()
output = vis_dir / f"{basename}_flow.tif"
try:
dem, transform, crs = _read_dem(dem_file)
rows, cols = dem.shape
nodata_mask = np.isnan(dem)
from scipy.ndimage import minimum_filter as scipy_min_filter, generate_binary_structure
dem_filled = dem.copy()
dem_filled[nodata_mask] = np.nanmax(dem) + 1000
struct = generate_binary_structure(2, 2)
for _ in range(50):
neighbor_min = scipy_min_filter(dem_filled, footprint=struct)
sinks = (dem_filled < neighbor_min) & ~nodata_mask
if not np.any(sinks):
break
dem_filled = np.where(sinks, neighbor_min, dem_filled)
dem_filled[nodata_mask] = np.nan
dx8 = [1, 1, 0, -1, -1, -1, 0, 1]
dy8 = [0, 1, 1, 1, 0, -1, -1, -1]
dist8 = [1.0, np.sqrt(2), 1.0, np.sqrt(2), 1.0, np.sqrt(2), 1.0, np.sqrt(2)]
flow_dir = np.full((rows, cols), -1, dtype=np.int8)
max_slope = np.full((rows, cols), 0.0)
padded = np.pad(dem_filled, 1, mode='constant',
constant_values=np.nanmax(dem_filled) + 10000)
for d in range(8):
nx = 1 + dx8[d]
ny = 1 + dy8[d]
neighbor_elev = padded[ny:ny + rows, nx:nx + cols]
slope = (dem_filled - neighbor_elev) / (dist8[d] * resolution)
slope[nodata_mask] = -1
better = slope > max_slope
flow_dir[better] = d
max_slope[better] = slope[better]
flat_dem = dem_filled[~nodata_mask].flatten()
valid_indices = np.where(~nodata_mask.flatten())[0]
sort_order = valid_indices[np.argsort(-flat_dem)]
flow_acc = np.ones((rows, cols), dtype=np.float32)
flow_acc[nodata_mask] = 0
for idx in sort_order:
r, c = divmod(idx, cols)
d = flow_dir[r, c]
if d < 0:
continue
nr, nc = r + dy8[d], c + dx8[d]
if 0 <= nr < rows and 0 <= nc < cols and not nodata_mask[nr, nc]:
flow_acc[nr, nc] += flow_acc[r, c]
flow_log = np.log1p(flow_acc)
_save_tif(output, flow_log, transform, crs)
logger.info(f" ✓ Flux terminé ({time.time()-t0:.1f}s)")
return output
except Exception as e:
logger.error(f" ✗ Erreur flux: {e}", exc_info=True)
return None