"""DTM generation from classified LiDAR point clouds. Handles ground classification via PDAL/SMRF and DTM rasterisation using scipy binned_statistic_2d with gap-filling interpolation. """ import json import logging import subprocess from pathlib import Path import numpy as np import rasterio from rasterio.transform import from_bounds from scipy import ndimage as scipy_ndimage from scipy.ndimage import distance_transform_edt, gaussian_filter from scipy.stats import binned_statistic_2d from scipy import interpolate logger = logging.getLogger("lidar") def create_smrf_pipeline(input_laz, output_las): """Create a PDAL pipeline JSON for SMRF ground classification. Includes a filter for ReturnNumber/NumberOfReturns >= 1 to handle LiDAR HD files that may contain points with invalid return numbers. Args: input_laz: Path to input LAZ/LAS file. output_las: Path to output classified LAS file. Returns: JSON string of the PDAL pipeline. """ pipeline = { "pipeline": [ str(input_laz), { "type": "filters.range", "limits": "ReturnNumber[1:],NumberOfReturns[1:]" }, { "type": "filters.smrf", "ignore": "Classification[7:7]", "slope": 1.0, "window": 16.0, "threshold": 0.5, "scalar": 1.25 }, { "type": "filters.range", "limits": "Classification[2:2]" }, { "type": "writers.las", "filename": str(output_las), "extra_dims": "all" } ] } return json.dumps(pipeline) def classify_ground(laz_file, temp_dir): """Classify ground points using PDAL SMRF filter. Args: laz_file: Path to input LAZ/LAS file. temp_dir: Directory for temporary files (pipeline.json, ground.las). Returns: Path to classified ground LAS file, or None on failure. """ import laspy # noqa: ensure available output_las = temp_dir / f"{laz_file.stem}_ground.las" if output_las.exists(): logger.info(" Classification déjà effectuée — fichier existant réutilisé") return output_las pipeline_json = create_smrf_pipeline(laz_file, output_las) pipeline_file = temp_dir / "pipeline.json" with open(pipeline_file, 'w') as f: f.write(pipeline_json) try: subprocess.run( ["pdal", "pipeline", str(pipeline_file)], capture_output=True, check=True ) logger.info(" ✓ Classification sol terminée") return output_las except subprocess.CalledProcessError as e: logger.error(f" ✗ Erreur classification PDAL: {e.stderr.decode()}") return None def create_dtm_fast(las_file, basename, dtm_dir, resolution): """Create DTM using fast binning method with gap filling. Args: las_file: Path to classified ground LAS file. basename: Base name for output file. dtm_dir: Directory for output DTM GeoTIFF. resolution: Grid resolution in meters per pixel. Returns: Path to output DTM GeoTIFF, or None on failure. """ import laspy logger.info(" → Génération DTM...") try: las = laspy.read(str(las_file)) min_x, max_x = float(las.header.min[0]), float(las.header.max[0]) min_y, max_y = float(las.header.min[1]), float(las.header.max[1]) width = int(np.ceil((max_x - min_x) / resolution)) height = int(np.ceil((max_y - min_y) / resolution)) logger.debug(f" Bounds: X[{min_x:.1f}, {max_x:.1f}] Y[{min_y:.1f}, {max_y:.1f}]") logger.debug(f" Grid: {width}x{height} pixels ({len(las.points):,} points)") logger.info(f" Rasterisation {width}x{height} ({len(las.points):,} points)...") stat = binned_statistic_2d( las.x, las.y, las.z, statistic='mean', bins=[width, height], range=[[min_x, max_x], [min_y, max_y]] ) dtm = stat.statistic.T dtm = dtm[::-1, :] # Flip Y so north is at top # Fill gaps using interpolation mask = np.isnan(dtm) if np.any(mask): logger.info(" Remplissage des zones vides...") dist_to_nearest = distance_transform_edt(mask) max_fill_distance = max(20, int(10 / resolution)) y_coords, x_coords = np.where(~mask) z_values = dtm[~mask] if len(y_coords) > 100: interp = interpolate.NearestNDInterpolator( np.column_stack((y_coords, x_coords)), z_values ) y_missing, x_missing = np.where(mask) dtm[y_missing, x_missing] = interp(y_missing, x_missing) dtm_smooth = gaussian_filter(dtm, sigma=2.0) fill_mask = mask & (dist_to_nearest <= max_fill_distance) dtm[fill_mask] = dtm_smooth[fill_mask] far_mask = mask & (dist_to_nearest > max_fill_distance) dtm[far_mask] = np.nan # Save as GeoTIFF output_tif = dtm_dir / f"{basename}_dtm.tif" transform = from_bounds(min_x, min_y, max_x, max_y, width, height) with rasterio.open( output_tif, 'w', driver='GTiff', height=height, width=width, count=1, dtype='float32', crs='EPSG:2154', transform=transform, compress='lzw' ) as dst: dst.write(dtm.astype('float32'), 1) logger.info(f" ✓ DTM créé: {output_tif.name}") return output_tif except Exception as e: logger.error(f" ✗ Erreur DTM: {e}", exc_info=True) return None