- Positions d'axes fixes (data_left/bottom/width/height_frac) pour alignement pixel-parfait entre terrain et ortho/topo - aspect='equal' au lieu de 'auto' pour conserver les proportions géographiques - Colorbar descriptive pour les visualisations RGB (ortho/topo) - Comblage des petits trous DTM (< 1m) via rasterio.fill.fillnodata - Suppression de la visualisation "dépressions" - Hillshade composite: 0.7*hillshade + 0.3*cos(slope) - D8 flow accumulation accéléré par numba JIT (fallback Python) - Flag --keep-tif pour conserver les TIFF intermédiaires - --force supprime aussi les TIF existants avant régénération - ETA affiché pendant la génération des visualisations - Répertoires temp dans temp/ pour traitement parallèle Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
330 lines
11 KiB
Python
330 lines
11 KiB
Python
"""DTM generation from classified LiDAR point clouds.
|
|
|
|
Handles ground classification via PDAL/SMRF and DTM rasterisation
|
|
using scipy binned_statistic_2d. Zones without LiDAR data remain as NaN.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
import numpy as np
|
|
import rasterio
|
|
from rasterio.transform import from_bounds
|
|
from scipy.stats import binned_statistic_2d
|
|
|
|
logger = logging.getLogger("lidar")
|
|
|
|
|
|
def _create_ground_pipeline(input_laz, output_las, method):
|
|
"""Create a PDAL pipeline JSON for ground classification.
|
|
|
|
All methods include a ReturnNumber/NumberOfReturns >= 1 filter to handle
|
|
LiDAR HD files that may contain points with invalid return numbers.
|
|
|
|
Pre-processing steps (PDAL recommended workflow):
|
|
1. Reset Classification to 0
|
|
2. ELM (Extended Local Minimum) — mark low outliers as noise (Classification=7)
|
|
3. Statistical outlier removal
|
|
4. Ground classification (SMRF/PMF/CSF)
|
|
5. Extract ground points (Classification=2)
|
|
|
|
Args:
|
|
input_laz: Path to input LAZ/LAS file.
|
|
output_las: Path to output classified LAS file.
|
|
method: Ground classification method ('smrf', 'pmf', or 'csf').
|
|
|
|
Returns:
|
|
JSON string of the PDAL pipeline.
|
|
"""
|
|
# Common ReturnNumber filter for LiDAR HD compatibility
|
|
return_filter = {
|
|
"type": "filters.range",
|
|
"limits": "ReturnNumber[1:],NumberOfReturns[1:]"
|
|
}
|
|
|
|
# Reset Classification to 0 before preprocessing
|
|
reset_classification = {
|
|
"type": "filters.assign",
|
|
"assignment": "Classification[:]=0"
|
|
}
|
|
|
|
# ELM (Extended Local Minimum) — mark low outliers as noise
|
|
# Parameters tuned for rocky limestone terrain with low vegetation:
|
|
# - cell=5.0m: fine resolution to capture rocky relief
|
|
# - threshold=2.0m: high threshold to avoid marking rock outcrops as noise
|
|
elm_filter = {
|
|
"type": "filters.elm",
|
|
"cell": 5.0,
|
|
"threshold": 2.0
|
|
}
|
|
|
|
# Statistical outlier removal
|
|
outlier_filter = {
|
|
"type": "filters.outlier",
|
|
"method": "statistical",
|
|
"mean_k": 8,
|
|
"multiplier": 3.0
|
|
}
|
|
|
|
# Classification filter (ground points only)
|
|
ground_filter = {
|
|
"type": "filters.range",
|
|
"limits": "Classification[2:2]"
|
|
}
|
|
|
|
# Method-specific ground classification filter
|
|
if method == 'smrf':
|
|
ground_step = {
|
|
"type": "filters.smrf",
|
|
"ignore": "Classification[7:7]",
|
|
"slope": 1.0,
|
|
"window": 16.0,
|
|
"threshold": 0.5,
|
|
"scalar": 1.25
|
|
}
|
|
elif method == 'pmf':
|
|
ground_step = {
|
|
"type": "filters.pmf",
|
|
"max_window": 33,
|
|
"slope": 0.15,
|
|
"max_distance": 2.5,
|
|
"initial_distance": 0.15,
|
|
"cell_size": 1.0
|
|
}
|
|
elif method == 'csf':
|
|
ground_step = {
|
|
"type": "filters.csf",
|
|
"resolution": 0.5,
|
|
"rigidness": 3,
|
|
"smooth": True,
|
|
"threshold": 0.5
|
|
}
|
|
else:
|
|
raise ValueError(f"Méthode de classification inconnue: {method}")
|
|
|
|
pipeline = {
|
|
"pipeline": [
|
|
str(input_laz),
|
|
return_filter,
|
|
reset_classification,
|
|
elm_filter,
|
|
outlier_filter,
|
|
ground_step,
|
|
ground_filter,
|
|
{
|
|
"type": "writers.las",
|
|
"filename": str(output_las),
|
|
"extra_dims": "all"
|
|
}
|
|
]
|
|
}
|
|
return json.dumps(pipeline)
|
|
|
|
|
|
def create_smrf_pipeline(input_laz, output_las):
|
|
"""Create a PDAL pipeline JSON for SMRF ground classification."""
|
|
return _create_ground_pipeline(input_laz, output_las, 'smrf')
|
|
|
|
|
|
def create_pmf_pipeline(input_laz, output_las):
|
|
"""Create a PDAL pipeline JSON for PMF ground classification."""
|
|
return _create_ground_pipeline(input_laz, output_las, 'pmf')
|
|
|
|
|
|
def create_csf_pipeline(input_laz, output_las):
|
|
"""Create a PDAL pipeline JSON for CSF ground classification."""
|
|
return _create_ground_pipeline(input_laz, output_las, 'csf')
|
|
|
|
|
|
def detect_ground_method(laz_file):
|
|
"""Detect the best ground classification method based on point cloud statistics.
|
|
|
|
Auto-selects between SMRF (natural terrain) and PMF (urban) only.
|
|
CSF is available only via --ground-classification csf (slow but robust
|
|
on complex terrain).
|
|
|
|
Falls back to SMRF if the file cannot be read or attributes are missing.
|
|
|
|
Args:
|
|
laz_file: Path to input LAZ/LAS file.
|
|
|
|
Returns:
|
|
String: 'smrf', 'pmf', or 'csf'
|
|
"""
|
|
import laspy
|
|
|
|
try:
|
|
las = laspy.read(str(laz_file))
|
|
except Exception as e:
|
|
logger.warning(f" Impossible de lire le nuage pour auto-détection: {e}")
|
|
logger.info(f" → Méthode: SMRF (défaut — lecture impossible)")
|
|
return 'smrf'
|
|
|
|
total_points = len(las.points)
|
|
z = np.array(las.z)
|
|
|
|
# Height variance (always available)
|
|
z_std = float(np.std(z))
|
|
z_range = float(np.max(z) - np.min(z))
|
|
|
|
# Try to get NumberOfReturns (may not exist in all point formats)
|
|
single_return_ratio = 0.0
|
|
try:
|
|
num_returns = np.array(las.NumberOfReturns)
|
|
single_return_count = int(np.sum(num_returns == 1))
|
|
single_return_ratio = single_return_count / total_points if total_points > 0 else 0
|
|
except AttributeError:
|
|
logger.debug(" NumberOfReturns non disponible — utilisation de la variance Z uniquement")
|
|
|
|
logger.info(f" Analyse du nuage: {total_points:,} points, "
|
|
f"ratio_retours_uniques={single_return_ratio:.2f}, "
|
|
f"écart_Z={z_std:.1f}m, amplitude_Z={z_range:.1f}m")
|
|
|
|
# Decision logic (auto selects between SMRF and PMF only):
|
|
# - High single-return ratio (>0.6) → urban (buildings, roads) → PMF
|
|
# - Default → SMRF (fast, robust for most natural terrain)
|
|
# Note: CSF is available only via --ground-classification csf (slow but robust on complex terrain)
|
|
if single_return_ratio > 0.6:
|
|
method = 'pmf'
|
|
reason = f"ratio retours uniques={single_return_ratio:.2f} > 0.6 → milieu urbain"
|
|
else:
|
|
method = 'smrf'
|
|
reason = f"terrain naturel standard"
|
|
|
|
logger.info(f" → Méthode: {method.upper()} ({reason})")
|
|
return method
|
|
|
|
|
|
def classify_ground(laz_file, temp_dir, method='auto', force=False):
|
|
"""Classify ground points using PDAL ground classification filter.
|
|
|
|
Args:
|
|
laz_file: Path to input LAZ/LAS file.
|
|
temp_dir: Directory for temporary files (pipeline.json, ground.las).
|
|
method: Ground classification method ('auto', 'smrf', 'pmf', or 'csf').
|
|
force: If True, reclassify even if output file already exists.
|
|
|
|
Returns:
|
|
Path to classified ground LAS file, or None on failure.
|
|
"""
|
|
import laspy # noqa: ensure available
|
|
|
|
# Auto-detect method if requested
|
|
if method == 'auto':
|
|
method = detect_ground_method(laz_file)
|
|
logger.info(f" Classification sol: {method.upper()} (auto)")
|
|
else:
|
|
logger.info(f" Classification sol: {method.upper()} (forcé)")
|
|
|
|
# Use shared basename extraction function
|
|
from .pipeline import _file_basename
|
|
laz_base = _file_basename(laz_file)
|
|
|
|
output_las = temp_dir / f"{laz_base}_ground_{method}.las"
|
|
|
|
if output_las.exists() and not force:
|
|
logger.info(f" Classification {method.upper()} déjà effectuée — fichier existant réutilisé")
|
|
return output_las
|
|
|
|
if force and output_las.exists():
|
|
logger.info(f" Reclassification forcée — suppression de {output_las.name}")
|
|
output_las.unlink()
|
|
|
|
pipeline_json = _create_ground_pipeline(laz_file, output_las, method)
|
|
pipeline_file = temp_dir / f"pipeline_{method}.json"
|
|
|
|
with open(pipeline_file, 'w') as f:
|
|
f.write(pipeline_json)
|
|
|
|
try:
|
|
subprocess.run(
|
|
["pdal", "pipeline", str(pipeline_file)],
|
|
capture_output=True, check=True
|
|
)
|
|
logger.info(f" ✓ Classification sol {method.upper()} terminée")
|
|
return output_las
|
|
except subprocess.CalledProcessError as e:
|
|
logger.error(f" ✗ Erreur classification PDAL ({method.upper()}): {e.stderr.decode()}")
|
|
return None
|
|
|
|
|
|
def create_dtm_fast(las_file, basename, dtm_dir, resolution):
|
|
"""Create DTM using fast binning method with gap filling.
|
|
|
|
Args:
|
|
las_file: Path to classified ground LAS file.
|
|
basename: Base name for output file.
|
|
dtm_dir: Directory for output DTM GeoTIFF.
|
|
resolution: Grid resolution in meters per pixel.
|
|
|
|
Returns:
|
|
Path to output DTM GeoTIFF, or None on failure.
|
|
"""
|
|
import laspy
|
|
|
|
logger.info(" → Génération DTM...")
|
|
|
|
try:
|
|
las = laspy.read(str(las_file))
|
|
|
|
min_x, max_x = float(las.header.min[0]), float(las.header.max[0])
|
|
min_y, max_y = float(las.header.min[1]), float(las.header.max[1])
|
|
|
|
width = int(np.ceil((max_x - min_x) / resolution))
|
|
height = int(np.ceil((max_y - min_y) / resolution))
|
|
|
|
logger.debug(f" Bounds: X[{min_x:.1f}, {max_x:.1f}] Y[{min_y:.1f}, {max_y:.1f}]")
|
|
logger.debug(f" Grid: {width}x{height} pixels ({len(las.points):,} points)")
|
|
logger.info(f" Rasterisation {width}x{height} ({len(las.points):,} points)...")
|
|
|
|
stat = binned_statistic_2d(
|
|
las.x, las.y, las.z,
|
|
statistic='mean',
|
|
bins=[width, height],
|
|
range=[[min_x, max_x], [min_y, max_y]]
|
|
)
|
|
|
|
dtm = stat.statistic.T
|
|
dtm = dtm[::-1, :] # Flip Y so north is at top
|
|
|
|
# Fill small gaps (< 1m from existing data) while keeping large gaps as NaN
|
|
nan_count = np.count_nonzero(np.isnan(dtm))
|
|
if nan_count > 0:
|
|
total = dtm.size
|
|
nan_pct = 100.0 * nan_count / total
|
|
logger.info(f" {nan_count:,} pixels sans données ({nan_pct:.1f}%)")
|
|
|
|
max_gap_pixels = max(1, int(1.0 / resolution))
|
|
from rasterio.fill import fillnodata
|
|
valid_mask = ~np.isnan(dtm)
|
|
dtm_filled = fillnodata(dtm, mask=valid_mask, max_search_distance=max_gap_pixels)
|
|
small_gap_mask = np.isnan(dtm) & ~np.isnan(dtm_filled)
|
|
filled_count = np.count_nonzero(small_gap_mask)
|
|
if filled_count > 0:
|
|
dtm = np.where(small_gap_mask, dtm_filled, dtm)
|
|
logger.info(f" {filled_count:,} petits trous comblés (< {max_gap_pixels}px)")
|
|
remaining = np.count_nonzero(np.isnan(dtm))
|
|
logger.info(f" {remaining:,} pixels restent sans données (grands écarts)")
|
|
|
|
# Save as GeoTIFF
|
|
output_tif = dtm_dir / f"{basename}_dtm.tif"
|
|
transform = from_bounds(min_x, min_y, max_x, max_y, width, height)
|
|
|
|
with rasterio.open(
|
|
output_tif, 'w',
|
|
driver='GTiff', height=height, width=width,
|
|
count=1, dtype='float32',
|
|
crs='EPSG:2154', transform=transform,
|
|
nodata=float('nan'),
|
|
compress='lzw'
|
|
) as dst:
|
|
dst.write(dtm.astype('float32'), 1)
|
|
|
|
logger.info(f" ✓ DTM créé: {output_tif.name}")
|
|
return output_tif
|
|
|
|
except Exception as e:
|
|
logger.error(f" ✗ Erreur DTM: {e}", exc_info=True)
|
|
return None |