Files
lidar_rendu/lidar_pipeline/dtm.py
Jacquin Antoine ad762e682d Suppression éclairage solaire, GPU accéléré, --file multi, tests unitaires
- Suppression de generate_solar (éclairage solaire) des visualisations
- Accélération GPU de hillshade, slope, aspect, curvature, depressions,
  anomalies, roughness, texture GLCM, flow (sink filling)
- Nettoyage mémoire GPU entre visualisations (gpu_cleanup)
- Correction OOM texture GLCM: calcul entropie bin par bin au lieu d'un
  tableau 3D massif sur GPU
- Correction bug: xp_minimum_filter manquant dans imports visualizations
- Option --file accepte plusieurs noms complets sans extension
- run.sh affiche l'aide si appelé sans arguments
- Option --test pour exécuter les tests unitaires dans Docker
- Filtre ReturnNumber>=1 intégré dans le pipeline PDAL (plus d'erreur SMRF)
- 60 tests unitaires: GPU, visualisations, rendering, DTM, pipeline, CLI
- Ajout pytest au Dockerfile

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-10 00:57:39 +02:00

187 lines
5.7 KiB
Python

"""DTM generation from classified LiDAR point clouds.
Handles ground classification via PDAL/SMRF and DTM rasterisation
using scipy binned_statistic_2d with gap-filling interpolation.
"""
import json
import logging
import subprocess
from pathlib import Path
import numpy as np
import rasterio
from rasterio.transform import from_bounds
from scipy import ndimage as scipy_ndimage
from scipy.ndimage import distance_transform_edt, gaussian_filter
from scipy.stats import binned_statistic_2d
from scipy import interpolate
logger = logging.getLogger("lidar")
def create_smrf_pipeline(input_laz, output_las):
"""Create a PDAL pipeline JSON for SMRF ground classification.
Includes a filter for ReturnNumber/NumberOfReturns >= 1 to handle
LiDAR HD files that may contain points with invalid return numbers.
Args:
input_laz: Path to input LAZ/LAS file.
output_las: Path to output classified LAS file.
Returns:
JSON string of the PDAL pipeline.
"""
pipeline = {
"pipeline": [
str(input_laz),
{
"type": "filters.range",
"limits": "ReturnNumber[1:],NumberOfReturns[1:]"
},
{
"type": "filters.smrf",
"ignore": "Classification[7:7]",
"slope": 1.0,
"window": 16.0,
"threshold": 0.5,
"scalar": 1.25
},
{
"type": "filters.range",
"limits": "Classification[2:2]"
},
{
"type": "writers.las",
"filename": str(output_las),
"extra_dims": "all"
}
]
}
return json.dumps(pipeline)
def classify_ground(laz_file, temp_dir):
"""Classify ground points using PDAL SMRF filter.
Args:
laz_file: Path to input LAZ/LAS file.
temp_dir: Directory for temporary files (pipeline.json, ground.las).
Returns:
Path to classified ground LAS file, or None on failure.
"""
import laspy # noqa: ensure available
output_las = temp_dir / f"{laz_file.stem}_ground.las"
if output_las.exists():
logger.info(" Classification déjà effectuée — fichier existant réutilisé")
return output_las
pipeline_json = create_smrf_pipeline(laz_file, output_las)
pipeline_file = temp_dir / "pipeline.json"
with open(pipeline_file, 'w') as f:
f.write(pipeline_json)
try:
subprocess.run(
["pdal", "pipeline", str(pipeline_file)],
capture_output=True, check=True
)
logger.info(" ✓ Classification sol terminée")
return output_las
except subprocess.CalledProcessError as e:
logger.error(f" ✗ Erreur classification PDAL: {e.stderr.decode()}")
return None
def create_dtm_fast(las_file, basename, dtm_dir, resolution):
"""Create DTM using fast binning method with gap filling.
Args:
las_file: Path to classified ground LAS file.
basename: Base name for output file.
dtm_dir: Directory for output DTM GeoTIFF.
resolution: Grid resolution in meters per pixel.
Returns:
Path to output DTM GeoTIFF, or None on failure.
"""
import laspy
logger.info(" → Génération DTM...")
try:
las = laspy.read(str(las_file))
min_x, max_x = float(las.header.min[0]), float(las.header.max[0])
min_y, max_y = float(las.header.min[1]), float(las.header.max[1])
width = int(np.ceil((max_x - min_x) / resolution))
height = int(np.ceil((max_y - min_y) / resolution))
logger.debug(f" Bounds: X[{min_x:.1f}, {max_x:.1f}] Y[{min_y:.1f}, {max_y:.1f}]")
logger.debug(f" Grid: {width}x{height} pixels ({len(las.points):,} points)")
logger.info(f" Rasterisation {width}x{height} ({len(las.points):,} points)...")
stat = binned_statistic_2d(
las.x, las.y, las.z,
statistic='mean',
bins=[width, height],
range=[[min_x, max_x], [min_y, max_y]]
)
dtm = stat.statistic.T
dtm = dtm[::-1, :] # Flip Y so north is at top
# Fill gaps using interpolation
mask = np.isnan(dtm)
if np.any(mask):
logger.info(" Remplissage des zones vides...")
dist_to_nearest = distance_transform_edt(mask)
max_fill_distance = max(20, int(10 / resolution))
y_coords, x_coords = np.where(~mask)
z_values = dtm[~mask]
if len(y_coords) > 100:
interp = interpolate.NearestNDInterpolator(
np.column_stack((y_coords, x_coords)),
z_values
)
y_missing, x_missing = np.where(mask)
dtm[y_missing, x_missing] = interp(y_missing, x_missing)
dtm_smooth = gaussian_filter(dtm, sigma=2.0)
fill_mask = mask & (dist_to_nearest <= max_fill_distance)
dtm[fill_mask] = dtm_smooth[fill_mask]
far_mask = mask & (dist_to_nearest > max_fill_distance)
dtm[far_mask] = np.nan
# Save as GeoTIFF
output_tif = dtm_dir / f"{basename}_dtm.tif"
transform = from_bounds(min_x, min_y, max_x, max_y, width, height)
with rasterio.open(
output_tif, 'w',
driver='GTiff', height=height, width=width,
count=1, dtype='float32',
crs='EPSG:2154', transform=transform,
compress='lzw'
) as dst:
dst.write(dtm.astype('float32'), 1)
logger.info(f" ✓ DTM créé: {output_tif.name}")
return output_tif
except Exception as e:
logger.error(f" ✗ Erreur DTM: {e}", exc_info=True)
return None