"""DTM generation from classified LiDAR point clouds. Handles ground classification via PDAL/SMRF and DTM rasterisation using scipy binned_statistic_2d. Zones without LiDAR data remain as NaN. """ import json import logging import subprocess from pathlib import Path import numpy as np import rasterio from rasterio.transform import from_bounds from scipy.stats import binned_statistic_2d logger = logging.getLogger("lidar") def _create_ground_pipeline(input_laz, output_las, method): """Create a PDAL pipeline JSON for ground classification. All methods include a ReturnNumber/NumberOfReturns >= 1 filter to handle LiDAR HD files that may contain points with invalid return numbers. Pre-processing steps (PDAL recommended workflow): 1. Reset Classification to 0 2. ELM (Extended Local Minimum) — mark low outliers as noise (Classification=7) 3. Statistical outlier removal 4. Ground classification (SMRF/PMF/CSF) 5. Extract ground points (Classification=2) Args: input_laz: Path to input LAZ/LAS file. output_las: Path to output classified LAS file. method: Ground classification method ('smrf', 'pmf', or 'csf'). Returns: JSON string of the PDAL pipeline. """ # Common ReturnNumber filter for LiDAR HD compatibility return_filter = { "type": "filters.range", "limits": "ReturnNumber[1:],NumberOfReturns[1:]" } # Reset Classification to 0 before preprocessing reset_classification = { "type": "filters.assign", "assignment": "Classification[:]=0" } # ELM (Extended Local Minimum) — mark low outliers as noise # Parameters tuned for rocky limestone terrain with low vegetation: # - cell=5.0m: fine resolution to capture rocky relief # - threshold=2.0m: high threshold to avoid marking rock outcrops as noise elm_filter = { "type": "filters.elm", "cell": 5.0, "threshold": 2.0 } # Statistical outlier removal outlier_filter = { "type": "filters.outlier", "method": "statistical", "mean_k": 8, "multiplier": 3.0 } # Classification filter (ground points only) ground_filter = { "type": "filters.range", "limits": "Classification[2:2]" } # Method-specific ground classification filter if method == 'smrf': ground_step = { "type": "filters.smrf", "ignore": "Classification[7:7]", "slope": 1.0, "window": 16.0, "threshold": 0.5, "scalar": 1.25 } elif method == 'pmf': ground_step = { "type": "filters.pmf", "max_window": 33, "slope": 0.15, "max_distance": 2.5, "initial_distance": 0.15, "cell_size": 1.0 } elif method == 'csf': ground_step = { "type": "filters.csf", "resolution": 0.5, "rigidness": 3, "smooth": True, "threshold": 0.5 } else: raise ValueError(f"Méthode de classification inconnue: {method}") pipeline = { "pipeline": [ str(input_laz), return_filter, reset_classification, elm_filter, outlier_filter, ground_step, ground_filter, { "type": "writers.las", "filename": str(output_las), "extra_dims": "all" } ] } return json.dumps(pipeline) def create_smrf_pipeline(input_laz, output_las): """Create a PDAL pipeline JSON for SMRF ground classification.""" return _create_ground_pipeline(input_laz, output_las, 'smrf') def create_pmf_pipeline(input_laz, output_las): """Create a PDAL pipeline JSON for PMF ground classification.""" return _create_ground_pipeline(input_laz, output_las, 'pmf') def create_csf_pipeline(input_laz, output_las): """Create a PDAL pipeline JSON for CSF ground classification.""" return _create_ground_pipeline(input_laz, output_las, 'csf') def detect_ground_method(laz_file): """Detect the best ground classification method based on point cloud statistics. Auto-selects between SMRF (natural terrain) and PMF (urban) only. CSF is available only via --ground-classification csf (slow but robust on complex terrain). Falls back to SMRF if the file cannot be read or attributes are missing. Args: laz_file: Path to input LAZ/LAS file. Returns: String: 'smrf', 'pmf', or 'csf' """ import laspy try: las = laspy.read(str(laz_file)) except Exception as e: logger.warning(f" Impossible de lire le nuage pour auto-détection: {e}") logger.info(f" → Méthode: SMRF (défaut — lecture impossible)") return 'smrf' total_points = len(las.points) z = np.array(las.z) # Height variance (always available) z_std = float(np.std(z)) z_range = float(np.max(z) - np.min(z)) # Try to get NumberOfReturns (may not exist in all point formats) single_return_ratio = 0.0 try: num_returns = np.array(las.NumberOfReturns) single_return_count = int(np.sum(num_returns == 1)) single_return_ratio = single_return_count / total_points if total_points > 0 else 0 except AttributeError: logger.debug(" NumberOfReturns non disponible — utilisation de la variance Z uniquement") logger.info(f" Analyse du nuage: {total_points:,} points, " f"ratio_retours_uniques={single_return_ratio:.2f}, " f"écart_Z={z_std:.1f}m, amplitude_Z={z_range:.1f}m") # Decision logic (auto selects between SMRF and PMF only): # - High single-return ratio (>0.6) → urban (buildings, roads) → PMF # - Default → SMRF (fast, robust for most natural terrain) # Note: CSF is available only via --ground-classification csf (slow but robust on complex terrain) if single_return_ratio > 0.6: method = 'pmf' reason = f"ratio retours uniques={single_return_ratio:.2f} > 0.6 → milieu urbain" else: method = 'smrf' reason = f"terrain naturel standard" logger.info(f" → Méthode: {method.upper()} ({reason})") return method def classify_ground(laz_file, temp_dir, method='auto', force=False): """Classify ground points using PDAL ground classification filter. Args: laz_file: Path to input LAZ/LAS file. temp_dir: Directory for temporary files (pipeline.json, ground.las). method: Ground classification method ('auto', 'smrf', 'pmf', or 'csf'). force: If True, reclassify even if output file already exists. Returns: Path to classified ground LAS file, or None on failure. """ import laspy # noqa: ensure available # Auto-detect method if requested if method == 'auto': method = detect_ground_method(laz_file) logger.info(f" Classification sol: {method.upper()} (auto)") else: logger.info(f" Classification sol: {method.upper()} (forcé)") # Use shared basename extraction function from .pipeline import _file_basename laz_base = _file_basename(laz_file) output_las = temp_dir / f"{laz_base}_ground_{method}.las" if output_las.exists() and not force: logger.info(f" Classification {method.upper()} déjà effectuée — fichier existant réutilisé") return output_las if force and output_las.exists(): logger.info(f" Reclassification forcée — suppression de {output_las.name}") output_las.unlink() pipeline_json = _create_ground_pipeline(laz_file, output_las, method) pipeline_file = temp_dir / f"pipeline_{method}.json" with open(pipeline_file, 'w') as f: f.write(pipeline_json) try: subprocess.run( ["pdal", "pipeline", str(pipeline_file)], capture_output=True, check=True ) logger.info(f" ✓ Classification sol {method.upper()} terminée") return output_las except subprocess.CalledProcessError as e: logger.error(f" ✗ Erreur classification PDAL ({method.upper()}): {e.stderr.decode()}") return None def create_dtm_fast(las_file, basename, dtm_dir, resolution): """Create DTM using fast binning method with gap filling. Args: las_file: Path to classified ground LAS file. basename: Base name for output file. dtm_dir: Directory for output DTM GeoTIFF. resolution: Grid resolution in meters per pixel. Returns: Path to output DTM GeoTIFF, or None on failure. """ import laspy logger.info(" → Génération DTM...") try: las = laspy.read(str(las_file)) min_x, max_x = float(las.header.min[0]), float(las.header.max[0]) min_y, max_y = float(las.header.min[1]), float(las.header.max[1]) width = int(np.ceil((max_x - min_x) / resolution)) height = int(np.ceil((max_y - min_y) / resolution)) logger.debug(f" Bounds: X[{min_x:.1f}, {max_x:.1f}] Y[{min_y:.1f}, {max_y:.1f}]") logger.debug(f" Grid: {width}x{height} pixels ({len(las.points):,} points)") logger.info(f" Rasterisation {width}x{height} ({len(las.points):,} points)...") stat = binned_statistic_2d( las.x, las.y, las.z, statistic='mean', bins=[width, height], range=[[min_x, max_x], [min_y, max_y]] ) dtm = stat.statistic.T dtm = dtm[::-1, :] # Flip Y so north is at top # Fill small gaps (< 1m from existing data) while keeping large gaps as NaN nan_count = np.count_nonzero(np.isnan(dtm)) if nan_count > 0: total = dtm.size nan_pct = 100.0 * nan_count / total logger.info(f" {nan_count:,} pixels sans données ({nan_pct:.1f}%)") max_gap_pixels = max(1, int(1.0 / resolution)) from rasterio.fill import fillnodata valid_mask = ~np.isnan(dtm) dtm_filled = fillnodata(dtm, mask=valid_mask, max_search_distance=max_gap_pixels) small_gap_mask = np.isnan(dtm) & ~np.isnan(dtm_filled) filled_count = np.count_nonzero(small_gap_mask) if filled_count > 0: dtm = np.where(small_gap_mask, dtm_filled, dtm) logger.info(f" {filled_count:,} petits trous comblés (< {max_gap_pixels}px)") remaining = np.count_nonzero(np.isnan(dtm)) logger.info(f" {remaining:,} pixels restent sans données (grands écarts)") # Save as GeoTIFF output_tif = dtm_dir / f"{basename}_dtm.tif" transform = from_bounds(min_x, min_y, max_x, max_y, width, height) with rasterio.open( output_tif, 'w', driver='GTiff', height=height, width=width, count=1, dtype='float32', crs='EPSG:2154', transform=transform, nodata=float('nan'), compress='lzw' ) as dst: dst.write(dtm.astype('float32'), 1) logger.info(f" ✓ DTM créé: {output_tif.name}") return output_tif except Exception as e: logger.error(f" ✗ Erreur DTM: {e}", exc_info=True) return None