Files
lidar_rendu/lidar_pipeline/dtm.py
Jacquin Antoine ac56ba8084 Add multi-resolution support and remove PDF generation
- --resolution now accepts comma-separated values (e.g. 0.5,0.2)
- Additional resolutions get suffixed output dirs: basename_r0p2/
- DTM files are named basename_dtm_r0p2.tif for extra resolutions
- Ground classification is done once and shared across resolutions
- PDF report generation removed per user request
- Fix --file argument to accept full filenames with extensions

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-14 21:29:45 +02:00

504 lines
18 KiB
Python

"""DTM generation from classified LiDAR point clouds.
Handles ground classification via PDAL (SMRF or CSF) and DTM rasterisation
using scipy binned_statistic_2d. Zones without LiDAR data remain as NaN.
"""
import json
import logging
import subprocess
from pathlib import Path
import numpy as np
import rasterio
from rasterio.transform import from_bounds
from scipy.stats import binned_statistic_2d
logger = logging.getLogger("lidar")
def _create_ground_pipeline(input_laz, output_las, method):
"""Create a PDAL pipeline JSON for ground classification.
All methods include a ReturnNumber/NumberOfReturns >= 1 filter to handle
LiDAR HD files that may contain points with invalid return numbers.
Pre-processing steps (PDAL recommended workflow):
1. Reset Classification to 0
2. ELM (Extended Local Minimum) — mark low outliers as noise (Classification=7)
3. Statistical outlier removal
4. Ground classification (SMRF or CSF)
5. Extract ground points (Classification=2)
Args:
input_laz: Path to input LAZ/LAS file.
output_las: Path to output classified LAS file.
method: Ground classification method ('smrf' or 'csf').
Returns:
JSON string of the PDAL pipeline.
"""
# Common ReturnNumber filter for LiDAR HD compatibility
return_filter = {
"type": "filters.range",
"limits": "ReturnNumber[1:],NumberOfReturns[1:]"
}
# Reset Classification to 0 before preprocessing
reset_classification = {
"type": "filters.assign",
"assignment": "Classification[:]=0"
}
# ELM (Extended Local Minimum) — mark low outliers as noise
# Parameters tuned for rocky limestone terrain with low vegetation:
# - cell=5.0m: fine resolution to capture rocky relief
# - threshold=2.0m: high threshold to avoid marking rock outcrops as noise
elm_filter = {
"type": "filters.elm",
"cell": 5.0,
"threshold": 2.0
}
# Statistical outlier removal
outlier_filter = {
"type": "filters.outlier",
"method": "statistical",
"mean_k": 8,
"multiplier": 3.0
}
# Classification filter (ground points only)
ground_filter = {
"type": "filters.range",
"limits": "Classification[2:2]"
}
# Method-specific ground classification filter
if method == 'smrf':
ground_step = {
"type": "filters.smrf",
"ignore": "Classification[7:7]",
"slope": 1.0,
"window": 16.0,
"threshold": 0.5,
"scalar": 1.25
}
elif method == 'csf':
ground_step = {
"type": "filters.csf",
"resolution": 0.5,
"rigidness": 3,
"smooth": True,
"threshold": 0.5
}
else:
raise ValueError(f"Méthode de classification inconnue: {method}")
pipeline = {
"pipeline": [
str(input_laz),
return_filter,
reset_classification,
elm_filter,
outlier_filter,
ground_step,
ground_filter,
{
"type": "writers.las",
"filename": str(output_las),
"extra_dims": "all"
}
]
}
return json.dumps(pipeline)
def create_smrf_pipeline(input_laz, output_las):
"""Create a PDAL pipeline JSON for SMRF ground classification."""
return _create_ground_pipeline(input_laz, output_las, 'smrf')
def create_csf_pipeline(input_laz, output_las):
"""Create a PDAL pipeline JSON for CSF ground classification."""
return _create_ground_pipeline(input_laz, output_las, 'csf')
def validate_laz(laz_file):
"""Quick integrity check for a LAZ/LAS file.
Tries laspy first (fast header read), then PDAL as fallback for COPC files
that laspy cannot read. Also checks that the file contains points.
Returns:
True if file is readable and contains points, False otherwise.
"""
# Try laspy first (fast)
import laspy
try:
with laspy.open(str(laz_file)) as f:
header = f.header
point_count = header.point_count
if point_count == 0:
logger.error(f" ✗ Fichier vide (0 points): {laz_file.name}")
logger.error(f" → Re-télécharger depuis https://ign.fr/lidar-hd")
return False
return True
except Exception:
pass
# Fallback: try PDAL (handles COPC v1.1 that laspy can't read)
import subprocess
try:
result = subprocess.run(
["pdal", "info", str(laz_file), "--summary"],
capture_output=True, text=True, timeout=30
)
if result.returncode == 0:
# Check point count from PDAL info output
import json as _json
try:
info = _json.loads(result.stdout)
count = info.get('summary', {}).get('num_points', 0)
if count == 0:
logger.error(f" ✗ Fichier vide (0 points PDAL): {laz_file.name}")
logger.error(f" → Re-télécharger depuis https://ign.fr/lidar-hd")
return False
except Exception:
pass # Can't parse — assume valid
return True
logger.error(f" ✗ Fichier illisible: {laz_file.name}")
logger.error(f" PDAL: {result.stderr.strip()[:200]}")
except (subprocess.TimeoutExpired, FileNotFoundError):
logger.error(f" ✗ Impossible de vérifier le fichier: {laz_file.name}")
logger.error(f" → Re-télécharger depuis https://ign.fr/lidar-hd")
return False
def _read_with_pdal(laz_file):
"""Read a LAZ/LAS file via PDAL when laspy fails (e.g. COPC v1.1).
Returns a laspy.LasData object, or None on failure.
"""
import subprocess
import tempfile
import os
try:
# Convert COPC to LAS via PDAL, then read with laspy
with tempfile.NamedTemporaryFile(suffix='.las', delete=False) as tmp:
tmp_path = tmp.name
pipeline = json.dumps({
"pipeline": [
str(laz_file),
{"type": "writers.las", "filename": tmp_path}
]
})
result = subprocess.run(
["pdal", "pipeline", "--stdin"],
input=pipeline, capture_output=True, text=True, timeout=300
)
if result.returncode != 0:
logger.warning(f" PDAL conversion échouée: {result.stderr[:200]}")
try:
os.unlink(tmp_path)
except Exception:
pass
return None
import laspy
las = laspy.read(tmp_path)
try:
os.unlink(tmp_path)
except Exception:
pass
if len(las.points) == 0:
logger.warning(f" PDAL: conversion réussie mais 0 points")
return None
return las
except Exception as e:
logger.warning(f" PDAL fallback échoué: {e}")
return None
def detect_ground_method(laz_file):
"""Detect the best ground classification method based on point cloud statistics.
Auto-selects between SMRF and CSF:
- SMRF: fast, robust for most natural terrain (PDAL recommended default)
- CSF: cloth simulation, better for complex/urban terrain
Falls back to SMRF if the file cannot be read or attributes are missing.
Args:
laz_file: Path to input LAZ/LAS file.
Returns:
String: 'smrf' or 'csf'
"""
import laspy
# Try laspy first, then PDAL for COPC files
las = None
try:
las = laspy.read(str(laz_file))
except Exception as e:
logger.warning(f" laspy: {e}")
logger.info(f" → Lecture via PDAL pour auto-détection...")
las = _read_with_pdal(laz_file)
if las is None:
logger.info(f" → Méthode: SMRF (défaut — lecture impossible)")
return 'smrf'
total_points = len(las.points)
if total_points == 0:
logger.warning(f" Nuage vide (0 points) — méthode par défaut: SMRF")
return 'smrf'
z = np.array(las.z)
# Height variance (always available)
z_std = float(np.std(z))
z_range = float(np.max(z) - np.min(z))
# Try to get NumberOfReturns (may not exist in all point formats)
single_return_ratio = 0.0
try:
num_returns = np.array(las.NumberOfReturns)
single_return_count = int(np.sum(num_returns == 1))
single_return_ratio = single_return_count / total_points if total_points > 0 else 0
except AttributeError:
logger.debug(" NumberOfReturns non disponible — utilisation de la variance Z uniquement")
logger.info(f" Analyse du nuage: {total_points:,} points, "
f"ratio_retours_uniques={single_return_ratio:.2f}, "
f"écart_Z={z_std:.1f}m, amplitude_Z={z_range:.1f}m")
# Decision logic:
# - High single-return ratio (>0.6) → urban (buildings, roads) → CSF (cloth simulation)
# - High elevation variance (>30m) → complex/mountainous terrain → CSF
# - Default → SMRF (fast, robust for most natural terrain)
if single_return_ratio > 0.6:
method = 'csf'
reason = f"ratio retours uniques={single_return_ratio:.2f} > 0.6 → milieu urbain"
elif z_std > 30:
method = 'csf'
reason = f"écart_Z={z_std:.1f}m > 30m → terrain complexe"
else:
method = 'smrf'
reason = f"terrain naturel standard"
logger.info(f" → Méthode: {method.upper()} ({reason})")
return method
def classify_ground(laz_file, temp_dir, method='auto', force=False):
"""Classify ground points using PDAL ground classification filter.
Args:
laz_file: Path to input LAZ/LAS file.
temp_dir: Directory for temporary files (pipeline.json, ground.las).
method: Ground classification method ('auto', 'smrf', or 'csf').
force: If True, reclassify even if output file already exists.
Returns:
Path to classified ground LAS file, or None on failure.
"""
import laspy # noqa: ensure available
# Auto-detect method if requested
if method == 'auto':
method = detect_ground_method(laz_file)
logger.info(f" Classification sol: {method.upper()} (auto)")
else:
logger.info(f" Classification sol: {method.upper()} (forcé)")
# Use shared basename extraction function
from .pipeline import _file_basename
laz_base = _file_basename(laz_file)
output_las = temp_dir / f"{laz_base}_ground_{method}.las"
if output_las.exists() and not force:
logger.info(f" Classification {method.upper()} déjà effectuée — fichier existant réutilisé")
return output_las
if force and output_las.exists():
logger.info(f" Reclassification forcée — suppression de {output_las.name}")
output_las.unlink()
pipeline_json = _create_ground_pipeline(laz_file, output_las, method)
pipeline_file = temp_dir / f"pipeline_{method}.json"
with open(pipeline_file, 'w') as f:
f.write(pipeline_json)
try:
subprocess.run(
["pdal", "pipeline", str(pipeline_file)],
capture_output=True, check=True
)
# Verify that ground file has points
if output_las.exists() and output_las.stat().st_size < 100:
logger.error(f" ✗ Fichier ground vide (taille < 100 octets)")
output_las.unlink(missing_ok=True)
return None
logger.info(f" ✓ Classification sol {method.upper()} terminée")
return output_las
except subprocess.CalledProcessError as e:
error_msg = e.stderr.decode() if e.stderr else str(e)
logger.warning(f" ✗ Erreur classification PDAL ({method.upper()}): {error_msg}")
# Try repairing file with laspy if PDAL fails on EVLR/VLR
if 'VLR' in error_msg or 'Invalid' in error_msg:
logger.info(f" → Tentative de réparation du fichier avec laspy...")
repaired_las = temp_dir / f"{laz_base}_repaired.las"
if _repair_laz_with_laspy(laz_file, repaired_las):
# Retry PDAL pipeline with repaired file
pipeline_json = _create_ground_pipeline(repaired_las, output_las, method)
with open(pipeline_file, 'w') as f:
f.write(pipeline_json)
try:
subprocess.run(
["pdal", "pipeline", str(pipeline_file)],
capture_output=True, check=True
)
logger.info(f" ✓ Classification sol {method.upper()} terminée (fichier réparé)")
return output_las
except subprocess.CalledProcessError as e2:
error_msg2 = e2.stderr.decode() if e2.stderr else str(e2)
logger.error(f" ✗ Échec classification même après réparation: {error_msg2}")
else:
logger.error(f" ✗ Impossible de réparer le fichier")
return None
def _repair_laz_with_laspy(input_laz, output_las):
"""Try to repair a corrupt LAZ file by re-reading with laspy and saving as LAS.
Works around PDAL errors like 'Invalid Extended VLR size' by stripping
problematic VLR/EVLR metadata during re-save.
Args:
input_laz: Path to corrupt LAZ/LAS file.
output_las: Path for repaired LAS output.
Returns:
True if repair succeeded, False otherwise.
"""
import laspy
try:
las = laspy.read(str(input_laz))
las.write(str(output_las))
logger.info(f" ✓ Fichier réparé via laspy ({len(las.points):,} points)")
return True
except Exception as e:
logger.warning(f" ✗ Réparation laspy échouée: {e}")
return False
def create_dtm_fast(las_file, basename, dtm_dir, resolution, force=False, output_suffix=""):
"""Create DTM using fast binning method with gap filling.
Args:
las_file: Path to classified ground LAS file.
basename: Base name for output file.
dtm_dir: Directory for output DTM GeoTIFF.
resolution: Grid resolution in meters per pixel.
force: If True, regenerate even if DTM already exists.
output_suffix: Suffix for output filename (e.g. '_r0p2' for additional resolutions).
Returns:
Path to output DTM GeoTIFF, or None on failure.
"""
output_tif = dtm_dir / f"{basename}_dtm{output_suffix}.tif"
if output_tif.exists() and not force:
logger.info(f" DTM déjà existant — fichier réutilisé: {output_tif.name}")
return output_tif
import laspy
logger.info(" → Génération DTM...")
try:
las = laspy.read(str(las_file))
except Exception as e:
# laspy can't read COPC v1.1 — try PDAL conversion
logger.warning(f" laspy: {e}")
logger.info(f" → Conversion via PDAL pour lecture COPC...")
las = _read_with_pdal(las_file)
if las is None:
logger.error(f" ✗ Impossible de lire {las_file.name}")
return None
if len(las.points) == 0:
logger.error(f" ✗ Fichier vide (0 points): {las_file.name}")
return None
try:
min_x, max_x = float(las.header.min[0]), float(las.header.max[0])
min_y, max_y = float(las.header.min[1]), float(las.header.max[1])
width = int(np.ceil((max_x - min_x) / resolution))
height = int(np.ceil((max_y - min_y) / resolution))
logger.debug(f" Bounds: X[{min_x:.1f}, {max_x:.1f}] Y[{min_y:.1f}, {max_y:.1f}]")
logger.debug(f" Grid: {width}x{height} pixels ({len(las.points):,} points)")
logger.info(f" Rasterisation {width}x{height} ({len(las.points):,} points)...")
stat = binned_statistic_2d(
las.x, las.y, las.z,
statistic='mean',
bins=[width, height],
range=[[min_x, max_x], [min_y, max_y]]
)
dtm = stat.statistic.T
dtm = dtm[::-1, :] # Flip Y so north is at top
# Fill small gaps (< 1m from existing data) while keeping large gaps as NaN
nan_count = np.count_nonzero(np.isnan(dtm))
if nan_count > 0:
total = dtm.size
nan_pct = 100.0 * nan_count / total
logger.info(f" {nan_count:,} pixels sans données ({nan_pct:.1f}%)")
max_gap_pixels = max(1, int(1.0 / resolution))
from rasterio.fill import fillnodata
valid_mask = ~np.isnan(dtm)
dtm_filled = fillnodata(dtm, mask=valid_mask, max_search_distance=max_gap_pixels)
small_gap_mask = np.isnan(dtm) & ~np.isnan(dtm_filled)
filled_count = np.count_nonzero(small_gap_mask)
if filled_count > 0:
dtm = np.where(small_gap_mask, dtm_filled, dtm)
logger.info(f" {filled_count:,} petits trous comblés (< {max_gap_pixels}px)")
remaining = np.count_nonzero(np.isnan(dtm))
logger.info(f" {remaining:,} pixels restent sans données (grands écarts)")
# Save as GeoTIFF
output_tif = dtm_dir / f"{basename}_dtm.tif"
transform = from_bounds(min_x, min_y, max_x, max_y, width, height)
with rasterio.open(
output_tif, 'w',
driver='GTiff', height=height, width=width,
count=1, dtype='float32',
crs='EPSG:2154', transform=transform,
nodata=float('nan'),
compress='lzw'
) as dst:
dst.write(dtm.astype('float32'), 1)
logger.info(f" ✓ DTM créé: {output_tif.name}")
return output_tif
except Exception as e:
logger.error(f" ✗ Erreur DTM: {e}", exc_info=True)
return None