Pipeline LiDAR: classification sol auto + pré-traitement ELM + fix warnings
- Ajout classification automatique du sol (SMRF/PMF/CSF) avec détection
heuristique (ratio retours uniques > 0.6 → PMF urbain, sinon SMRF)
- Pré-traitement PDAL recommandé avant classification: ELM + outlier
removal (cell=5.0, threshold=2.0 adapté au calcaire rocailleux)
- Options CLI: --ground-classification {auto,smrf,pmf,csf} et
--force-classification pour forcer la reclassification
- Fix double logging (logger.propagate = False)
- Fix --force non transmis dans run.sh (réécriture parsing arguments)
- Fix warning numpy 'partition will ignore mask': conversion MaskedArray
en ndarray avant np.percentile()
- Ajout liblaszip8 + lazrs pour support LAZ dans Docker et laspy
- Tests unitaires pour PMF, CSF et auto-détection
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@ -35,6 +35,7 @@ def setup_logging(verbose=False, debug=False):
|
||||
logger.setLevel(level)
|
||||
logger.handlers.clear()
|
||||
logger.addHandler(handler)
|
||||
logger.propagate = False # Prevent double logging via root logger
|
||||
|
||||
# Also configure the root logger so worker processes log properly
|
||||
root_logger = logging.getLogger()
|
||||
@ -102,6 +103,17 @@ Exemples:
|
||||
action="store_true",
|
||||
help="Régénérer tous les fichiers même si les WebP existent déjà"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--force-classification",
|
||||
action="store_true",
|
||||
help="Reclassifier le sol même si le fichier .las existe déjà"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--ground-classification",
|
||||
choices=["auto", "smrf", "pmf", "csf"],
|
||||
default="auto",
|
||||
help="Méthode de classification du sol : auto (détection), smrf, pmf, csf (défaut: auto)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--file",
|
||||
nargs="+",
|
||||
@ -141,7 +153,9 @@ Exemples:
|
||||
output_dir=args.output,
|
||||
resolution=args.resolution,
|
||||
workers=args.workers,
|
||||
force=args.force
|
||||
force=args.force,
|
||||
ground_method=args.ground_classification,
|
||||
force_classify=args.force_classification
|
||||
)
|
||||
|
||||
# If --file is specified, process only matching files
|
||||
|
||||
@ -17,38 +17,102 @@ from scipy.stats import binned_statistic_2d
|
||||
logger = logging.getLogger("lidar")
|
||||
|
||||
|
||||
def create_smrf_pipeline(input_laz, output_las):
|
||||
"""Create a PDAL pipeline JSON for SMRF ground classification.
|
||||
def _create_ground_pipeline(input_laz, output_las, method):
|
||||
"""Create a PDAL pipeline JSON for ground classification.
|
||||
|
||||
Includes a filter for ReturnNumber/NumberOfReturns >= 1 to handle
|
||||
All methods include a ReturnNumber/NumberOfReturns >= 1 filter to handle
|
||||
LiDAR HD files that may contain points with invalid return numbers.
|
||||
|
||||
Pre-processing steps (PDAL recommended workflow):
|
||||
1. Reset Classification to 0
|
||||
2. ELM (Extended Local Minimum) — mark low outliers as noise (Classification=7)
|
||||
3. Statistical outlier removal
|
||||
4. Ground classification (SMRF/PMF/CSF)
|
||||
5. Extract ground points (Classification=2)
|
||||
|
||||
Args:
|
||||
input_laz: Path to input LAZ/LAS file.
|
||||
output_las: Path to output classified LAS file.
|
||||
method: Ground classification method ('smrf', 'pmf', or 'csf').
|
||||
|
||||
Returns:
|
||||
JSON string of the PDAL pipeline.
|
||||
"""
|
||||
# Common ReturnNumber filter for LiDAR HD compatibility
|
||||
return_filter = {
|
||||
"type": "filters.range",
|
||||
"limits": "ReturnNumber[1:],NumberOfReturns[1:]"
|
||||
}
|
||||
|
||||
# Reset Classification to 0 before preprocessing
|
||||
reset_classification = {
|
||||
"type": "filters.assign",
|
||||
"assignment": "Classification[:]=0"
|
||||
}
|
||||
|
||||
# ELM (Extended Local Minimum) — mark low outliers as noise
|
||||
# Parameters tuned for rocky limestone terrain with low vegetation:
|
||||
# - cell=5.0m: fine resolution to capture rocky relief
|
||||
# - threshold=2.0m: high threshold to avoid marking rock outcrops as noise
|
||||
elm_filter = {
|
||||
"type": "filters.elm",
|
||||
"cell": 5.0,
|
||||
"threshold": 2.0
|
||||
}
|
||||
|
||||
# Statistical outlier removal
|
||||
outlier_filter = {
|
||||
"type": "filters.outlier",
|
||||
"method": "statistical",
|
||||
"mean_k": 8,
|
||||
"multiplier": 3.0
|
||||
}
|
||||
|
||||
# Classification filter (ground points only)
|
||||
ground_filter = {
|
||||
"type": "filters.range",
|
||||
"limits": "Classification[2:2]"
|
||||
}
|
||||
|
||||
# Method-specific ground classification filter
|
||||
if method == 'smrf':
|
||||
ground_step = {
|
||||
"type": "filters.smrf",
|
||||
"ignore": "Classification[7:7]",
|
||||
"slope": 1.0,
|
||||
"window": 16.0,
|
||||
"threshold": 0.5,
|
||||
"scalar": 1.25
|
||||
}
|
||||
elif method == 'pmf':
|
||||
ground_step = {
|
||||
"type": "filters.pmf",
|
||||
"max_window": 33,
|
||||
"slope": 0.15,
|
||||
"max_distance": 2.5,
|
||||
"initial_distance": 0.15,
|
||||
"cell_size": 1.0
|
||||
}
|
||||
elif method == 'csf':
|
||||
ground_step = {
|
||||
"type": "filters.csf",
|
||||
"resolution": 0.5,
|
||||
"rigidness": 3,
|
||||
"smooth": True,
|
||||
"threshold": 0.5
|
||||
}
|
||||
else:
|
||||
raise ValueError(f"Méthode de classification inconnue: {method}")
|
||||
|
||||
pipeline = {
|
||||
"pipeline": [
|
||||
str(input_laz),
|
||||
{
|
||||
"type": "filters.range",
|
||||
"limits": "ReturnNumber[1:],NumberOfReturns[1:]"
|
||||
},
|
||||
{
|
||||
"type": "filters.smrf",
|
||||
"ignore": "Classification[7:7]",
|
||||
"slope": 1.0,
|
||||
"window": 16.0,
|
||||
"threshold": 0.5,
|
||||
"scalar": 1.25
|
||||
},
|
||||
{
|
||||
"type": "filters.range",
|
||||
"limits": "Classification[2:2]"
|
||||
},
|
||||
return_filter,
|
||||
reset_classification,
|
||||
elm_filter,
|
||||
outlier_filter,
|
||||
ground_step,
|
||||
ground_filter,
|
||||
{
|
||||
"type": "writers.las",
|
||||
"filename": str(output_las),
|
||||
@ -59,26 +123,113 @@ def create_smrf_pipeline(input_laz, output_las):
|
||||
return json.dumps(pipeline)
|
||||
|
||||
|
||||
def classify_ground(laz_file, temp_dir):
|
||||
"""Classify ground points using PDAL SMRF filter.
|
||||
def create_smrf_pipeline(input_laz, output_las):
|
||||
"""Create a PDAL pipeline JSON for SMRF ground classification."""
|
||||
return _create_ground_pipeline(input_laz, output_las, 'smrf')
|
||||
|
||||
|
||||
def create_pmf_pipeline(input_laz, output_las):
|
||||
"""Create a PDAL pipeline JSON for PMF ground classification."""
|
||||
return _create_ground_pipeline(input_laz, output_las, 'pmf')
|
||||
|
||||
|
||||
def create_csf_pipeline(input_laz, output_las):
|
||||
"""Create a PDAL pipeline JSON for CSF ground classification."""
|
||||
return _create_ground_pipeline(input_laz, output_las, 'csf')
|
||||
|
||||
|
||||
def detect_ground_method(laz_file):
|
||||
"""Detect the best ground classification method based on point cloud statistics.
|
||||
|
||||
Auto-selects between SMRF (natural terrain) and PMF (urban) only.
|
||||
CSF is available only via --ground-classification csf (slow but robust
|
||||
on complex terrain).
|
||||
|
||||
Falls back to SMRF if the file cannot be read or attributes are missing.
|
||||
|
||||
Args:
|
||||
laz_file: Path to input LAZ/LAS file.
|
||||
|
||||
Returns:
|
||||
String: 'smrf', 'pmf', or 'csf'
|
||||
"""
|
||||
import laspy
|
||||
|
||||
try:
|
||||
las = laspy.read(str(laz_file))
|
||||
except Exception as e:
|
||||
logger.warning(f" Impossible de lire le nuage pour auto-détection: {e}")
|
||||
logger.info(f" → Méthode: SMRF (défaut — lecture impossible)")
|
||||
return 'smrf'
|
||||
|
||||
total_points = len(las.points)
|
||||
z = np.array(las.z)
|
||||
|
||||
# Height variance (always available)
|
||||
z_std = float(np.std(z))
|
||||
z_range = float(np.max(z) - np.min(z))
|
||||
|
||||
# Try to get NumberOfReturns (may not exist in all point formats)
|
||||
single_return_ratio = 0.0
|
||||
try:
|
||||
num_returns = np.array(las.NumberOfReturns)
|
||||
single_return_count = int(np.sum(num_returns == 1))
|
||||
single_return_ratio = single_return_count / total_points if total_points > 0 else 0
|
||||
except AttributeError:
|
||||
logger.debug(" NumberOfReturns non disponible — utilisation de la variance Z uniquement")
|
||||
|
||||
logger.info(f" Analyse du nuage: {total_points:,} points, "
|
||||
f"ratio_retours_uniques={single_return_ratio:.2f}, "
|
||||
f"écart_Z={z_std:.1f}m, amplitude_Z={z_range:.1f}m")
|
||||
|
||||
# Decision logic (auto selects between SMRF and PMF only):
|
||||
# - High single-return ratio (>0.6) → urban (buildings, roads) → PMF
|
||||
# - Default → SMRF (fast, robust for most natural terrain)
|
||||
# Note: CSF is available only via --ground-classification csf (slow but robust on complex terrain)
|
||||
if single_return_ratio > 0.6:
|
||||
method = 'pmf'
|
||||
reason = f"ratio retours uniques={single_return_ratio:.2f} > 0.6 → milieu urbain"
|
||||
else:
|
||||
method = 'smrf'
|
||||
reason = f"terrain naturel standard"
|
||||
|
||||
logger.info(f" → Méthode: {method.upper()} ({reason})")
|
||||
return method
|
||||
|
||||
|
||||
def classify_ground(laz_file, temp_dir, method='auto', force=False):
|
||||
"""Classify ground points using PDAL ground classification filter.
|
||||
|
||||
Args:
|
||||
laz_file: Path to input LAZ/LAS file.
|
||||
temp_dir: Directory for temporary files (pipeline.json, ground.las).
|
||||
method: Ground classification method ('auto', 'smrf', 'pmf', or 'csf').
|
||||
force: If True, reclassify even if output file already exists.
|
||||
|
||||
Returns:
|
||||
Path to classified ground LAS file, or None on failure.
|
||||
"""
|
||||
import laspy # noqa: ensure available
|
||||
|
||||
output_las = temp_dir / f"{laz_file.stem}_ground.las"
|
||||
# Auto-detect method if requested
|
||||
if method == 'auto':
|
||||
method = detect_ground_method(laz_file)
|
||||
logger.info(f" Classification sol: {method.upper()} (auto)")
|
||||
else:
|
||||
logger.info(f" Classification sol: {method.upper()} (forcé)")
|
||||
|
||||
if output_las.exists():
|
||||
logger.info(" Classification déjà effectuée — fichier existant réutilisé")
|
||||
output_las = temp_dir / f"{laz_file.stem}_ground_{method}.las"
|
||||
|
||||
if output_las.exists() and not force:
|
||||
logger.info(f" Classification {method.upper()} déjà effectuée — fichier existant réutilisé")
|
||||
return output_las
|
||||
|
||||
pipeline_json = create_smrf_pipeline(laz_file, output_las)
|
||||
pipeline_file = temp_dir / "pipeline.json"
|
||||
if force and output_las.exists():
|
||||
logger.info(f" Reclassification forcée — suppression de {output_las.name}")
|
||||
output_las.unlink()
|
||||
|
||||
pipeline_json = _create_ground_pipeline(laz_file, output_las, method)
|
||||
pipeline_file = temp_dir / f"pipeline_{method}.json"
|
||||
|
||||
with open(pipeline_file, 'w') as f:
|
||||
f.write(pipeline_json)
|
||||
@ -88,10 +239,10 @@ def classify_ground(laz_file, temp_dir):
|
||||
["pdal", "pipeline", str(pipeline_file)],
|
||||
capture_output=True, check=True
|
||||
)
|
||||
logger.info(" ✓ Classification sol terminée")
|
||||
logger.info(f" ✓ Classification sol {method.upper()} terminée")
|
||||
return output_las
|
||||
except subprocess.CalledProcessError as e:
|
||||
logger.error(f" ✗ Erreur classification PDAL: {e.stderr.decode()}")
|
||||
logger.error(f" ✗ Erreur classification PDAL ({method.upper()}): {e.stderr.decode()}")
|
||||
return None
|
||||
|
||||
|
||||
|
||||
@ -93,12 +93,14 @@ VIZ_STEPS = [
|
||||
class LidarArchaeoPipeline:
|
||||
"""Orchestrates the LiDAR archaeological analysis pipeline."""
|
||||
|
||||
def __init__(self, input_dir, output_dir, resolution=0.5, workers=1, force=False):
|
||||
def __init__(self, input_dir, output_dir, resolution=0.5, workers=1, force=False, ground_method='auto', force_classify=False):
|
||||
self.input_dir = Path(input_dir)
|
||||
self.output_dir = Path(output_dir)
|
||||
self.resolution = resolution
|
||||
self.workers = workers
|
||||
self.force = force
|
||||
self.ground_method = ground_method
|
||||
self.force_classify = force_classify
|
||||
self.temp_dir = self.output_dir / "temp"
|
||||
|
||||
if not self.input_dir.exists():
|
||||
@ -120,6 +122,8 @@ class LidarArchaeoPipeline:
|
||||
logger.info(f" Résolution : {resolution}m/px")
|
||||
logger.info(f" Workers : {workers}")
|
||||
logger.info(f" Force : {'OUI' if self.force else 'non (skip existing)'}")
|
||||
logger.info(f" Classification sol : {self.ground_method}")
|
||||
logger.info(f" Force classif.: {'OUI' if self.force_classify else 'non'}")
|
||||
|
||||
def find_laz_files(self):
|
||||
"""Find all LAZ/LAS files in input directory."""
|
||||
@ -216,7 +220,7 @@ class LidarArchaeoPipeline:
|
||||
# Step 1: Ground classification
|
||||
logger.info("[1/4] Classification du sol...")
|
||||
t1 = time.time()
|
||||
las_file = classify_ground(laz_file, self.temp_dir)
|
||||
las_file = classify_ground(laz_file, self.temp_dir, method=self.ground_method, force=self.force_classify)
|
||||
t_classif = time.time() - t1
|
||||
if not las_file:
|
||||
logger.error(f" ✗ Échec classification ({t_classif:.1f}s)")
|
||||
@ -276,7 +280,7 @@ class LidarArchaeoPipeline:
|
||||
logger.info(f"Fichiers: {len(files)}")
|
||||
with ProcessPoolExecutor(max_workers=self.workers) as executor:
|
||||
future_to_file = {
|
||||
executor.submit(_process_file_standalone, str(laz_file), str(self.input_dir), str(self.output_dir), self.resolution, self.force): laz_file
|
||||
executor.submit(_process_file_standalone, str(laz_file), str(self.input_dir), str(self.output_dir), self.resolution, self.force, self.ground_method, self.force_classify): laz_file
|
||||
for laz_file in files
|
||||
}
|
||||
done = 0
|
||||
@ -334,7 +338,7 @@ class LidarArchaeoPipeline:
|
||||
logger.warning(f" Note: Impossible de supprimer les fichiers temporaires: {e}")
|
||||
|
||||
|
||||
def _process_file_standalone(laz_file_str, input_dir, output_dir, resolution, force=False):
|
||||
def _process_file_standalone(laz_file_str, input_dir, output_dir, resolution, force=False, ground_method='auto', force_classify=False):
|
||||
"""Standalone function for multiprocessing — creates its own pipeline instance.
|
||||
|
||||
Each worker gets its own temp directory to avoid file conflicts.
|
||||
@ -350,7 +354,7 @@ def _process_file_standalone(laz_file_str, input_dir, output_dir, resolution, fo
|
||||
worker_logger.addHandler(handler)
|
||||
worker_logger.addFilter(_file_filter)
|
||||
|
||||
pipeline = LidarArchaeoPipeline(input_dir, output_dir, resolution=resolution, workers=1, force=force)
|
||||
pipeline = LidarArchaeoPipeline(input_dir, output_dir, resolution=resolution, workers=1, force=force, ground_method=ground_method, force_classify=force_classify)
|
||||
basename = Path(laz_file_str).stem
|
||||
pipeline.temp_dir = pipeline.output_dir / f"temp_{basename}"
|
||||
pipeline.temp_dir.mkdir(exist_ok=True)
|
||||
|
||||
@ -204,7 +204,7 @@ def _apply_colormap(data, tif_file):
|
||||
# Find matching colormap
|
||||
for key, info in COLORMAPS.items():
|
||||
if key in name:
|
||||
valid_data = data[~np.isnan(data)] if hasattr(data, 'compressed') else data.flatten()
|
||||
valid_data = np.asarray(data[~np.isnan(data)] if hasattr(data, 'compressed') else data.flatten())
|
||||
valid_data = valid_data[~np.isnan(valid_data)]
|
||||
|
||||
vmin = vmax = None
|
||||
@ -239,7 +239,7 @@ def _apply_colormap(data, tif_file):
|
||||
return data, info['cmap'], info['title'], legend, info['description'], False
|
||||
|
||||
# Default: terrain colormap with percentile stretch
|
||||
valid_data = data[~np.isnan(data)] if hasattr(data, 'compressed') else data.flatten()
|
||||
valid_data = np.asarray(data[~np.isnan(data)] if hasattr(data, 'compressed') else data.flatten())
|
||||
valid_data = valid_data[~np.isnan(valid_data)]
|
||||
p2, p98 = np.percentile(valid_data, (2, 98))
|
||||
data = np.clip((data - p2) / (p98 - p2), 0, 1)
|
||||
@ -321,7 +321,7 @@ def tif_to_png(tif_file, vis_dir, resolution):
|
||||
data = np.ma.masked_where((data == nodata) | np.isnan(data), data)
|
||||
|
||||
if not is_rgb:
|
||||
valid_data = data.compressed() if hasattr(data, 'compressed') else data.flatten()
|
||||
valid_data = np.asarray(data.compressed() if hasattr(data, 'compressed') else data.flatten())
|
||||
valid_data = valid_data[~np.isnan(valid_data)]
|
||||
|
||||
# Apply colormap
|
||||
|
||||
@ -1,8 +1,10 @@
|
||||
"""Tests for DTM module."""
|
||||
|
||||
import json
|
||||
import numpy as np
|
||||
import pytest
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
|
||||
class TestSMRFPipeline:
|
||||
@ -15,13 +17,18 @@ class TestSMRFPipeline:
|
||||
assert "pipeline" in pipeline
|
||||
stages = pipeline["pipeline"]
|
||||
|
||||
# Should have: reader, range filter (ReturnNumber), SMRF, range filter (Classification), writer
|
||||
# Should have: reader, range filter (ReturnNumber), assign, ELM, outlier, SMRF, range filter (Classification), writer
|
||||
stage_types = [s.get("type") if isinstance(s, dict) else None for s in stages]
|
||||
|
||||
# First stage is the filename string (reader)
|
||||
assert isinstance(stages[0], str)
|
||||
assert "test.laz" in stages[0]
|
||||
|
||||
# Must contain preprocessing steps
|
||||
assert "filters.assign" in stage_types
|
||||
assert "filters.elm" in stage_types
|
||||
assert "filters.outlier" in stage_types
|
||||
|
||||
# Must contain SMRF filter
|
||||
assert "filters.smrf" in stage_types
|
||||
|
||||
@ -31,6 +38,27 @@ class TestSMRFPipeline:
|
||||
# At least one should filter ReturnNumber
|
||||
assert any("ReturnNumber" in str(s.get("limits", "")) for s in range_stages)
|
||||
|
||||
def test_pipeline_elm_parameters(self):
|
||||
"""ELM filter has terrain-adapted parameters."""
|
||||
from lidar_pipeline.dtm import create_smrf_pipeline
|
||||
result = create_smrf_pipeline("/input/a.laz", "/output/a_ground.las")
|
||||
pipeline = json.loads(result)
|
||||
|
||||
elm_stage = [s for s in pipeline["pipeline"] if isinstance(s, dict) and s.get("type") == "filters.elm"][0]
|
||||
assert elm_stage["cell"] == 5.0
|
||||
assert elm_stage["threshold"] == 2.0
|
||||
|
||||
def test_pipeline_outlier_parameters(self):
|
||||
"""Outlier filter uses statistical method."""
|
||||
from lidar_pipeline.dtm import create_smrf_pipeline
|
||||
result = create_smrf_pipeline("/input/a.laz", "/output/a_ground.las")
|
||||
pipeline = json.loads(result)
|
||||
|
||||
outlier_stage = [s for s in pipeline["pipeline"] if isinstance(s, dict) and s.get("type") == "filters.outlier"][0]
|
||||
assert outlier_stage["method"] == "statistical"
|
||||
assert outlier_stage["mean_k"] == 8
|
||||
assert outlier_stage["multiplier"] == 3.0
|
||||
|
||||
def test_pipeline_output_path(self):
|
||||
"""Pipeline output path is set correctly."""
|
||||
from lidar_pipeline.dtm import create_smrf_pipeline
|
||||
@ -38,4 +66,178 @@ class TestSMRFPipeline:
|
||||
pipeline = json.loads(result)
|
||||
# Last stage should be writer with correct output path
|
||||
writer = [s for s in pipeline["pipeline"] if isinstance(s, dict) and s.get("type") == "writers.las"][0]
|
||||
assert writer["filename"] == "/output/a_ground.las"
|
||||
assert writer["filename"] == "/output/a_ground.las"
|
||||
|
||||
|
||||
class TestPMFPipeline:
|
||||
def test_pipeline_json_valid(self):
|
||||
"""create_pmf_pipeline produces valid JSON with PMF filter."""
|
||||
from lidar_pipeline.dtm import create_pmf_pipeline
|
||||
result = create_pmf_pipeline("/data/input/test.laz", "/data/output/test_ground.las")
|
||||
pipeline = json.loads(result)
|
||||
|
||||
assert "pipeline" in pipeline
|
||||
stages = pipeline["pipeline"]
|
||||
stage_types = [s.get("type") if isinstance(s, dict) else None for s in stages]
|
||||
|
||||
# Must contain PMF filter
|
||||
assert "filters.pmf" in stage_types
|
||||
|
||||
# Must contain ReturnNumber filter
|
||||
range_stages = [s for s in stages if isinstance(s, dict) and s.get("type") == "filters.range"]
|
||||
assert any("ReturnNumber" in str(s.get("limits", "")) for s in range_stages)
|
||||
|
||||
def test_pmf_parameters(self):
|
||||
"""PMF pipeline has expected parameters."""
|
||||
from lidar_pipeline.dtm import create_pmf_pipeline
|
||||
result = create_pmf_pipeline("/input/a.laz", "/output/a_ground.las")
|
||||
pipeline = json.loads(result)
|
||||
|
||||
pmf_stage = [s for s in pipeline["pipeline"] if isinstance(s, dict) and s.get("type") == "filters.pmf"][0]
|
||||
assert pmf_stage["max_window"] == 33
|
||||
assert pmf_stage["slope"] == 0.15
|
||||
|
||||
|
||||
class TestCSFPipeline:
|
||||
def test_pipeline_json_valid(self):
|
||||
"""create_csf_pipeline produces valid JSON with CSF filter."""
|
||||
from lidar_pipeline.dtm import create_csf_pipeline
|
||||
result = create_csf_pipeline("/data/input/test.laz", "/data/output/test_ground.las")
|
||||
pipeline = json.loads(result)
|
||||
|
||||
assert "pipeline" in pipeline
|
||||
stages = pipeline["pipeline"]
|
||||
stage_types = [s.get("type") if isinstance(s, dict) else None for s in stages]
|
||||
|
||||
# Must contain CSF filter
|
||||
assert "filters.csf" in stage_types
|
||||
|
||||
# Must contain ReturnNumber filter
|
||||
range_stages = [s for s in stages if isinstance(s, dict) and s.get("type") == "filters.range"]
|
||||
assert any("ReturnNumber" in str(s.get("limits", "")) for s in range_stages)
|
||||
|
||||
def test_csf_parameters(self):
|
||||
"""CSF pipeline has expected parameters."""
|
||||
from lidar_pipeline.dtm import create_csf_pipeline
|
||||
result = create_csf_pipeline("/input/a.laz", "/output/a_ground.las")
|
||||
pipeline = json.loads(result)
|
||||
|
||||
csf_stage = [s for s in pipeline["pipeline"] if isinstance(s, dict) and s.get("type") == "filters.csf"][0]
|
||||
assert csf_stage["resolution"] == 0.5
|
||||
assert csf_stage["rigidness"] == 3
|
||||
assert csf_stage["smooth"] is True
|
||||
assert "hdiff" not in csf_stage # hdiff is not a valid PDAL CSF parameter
|
||||
|
||||
|
||||
class TestDetectGroundMethod:
|
||||
def _make_mock_las(self, num_returns, z_values):
|
||||
"""Create a mock laspy object with specified NumberOfReturns and z."""
|
||||
mock_las = MagicMock()
|
||||
mock_las.NumberOfReturns = np.array(num_returns)
|
||||
mock_las.z = np.array(z_values)
|
||||
mock_las.points = MagicMock()
|
||||
mock_las.points.__len__ = lambda self: len(num_returns)
|
||||
return mock_las
|
||||
|
||||
@patch('lidar_pipeline.dtm.laspy')
|
||||
def test_urban_terrain_returns_pmf(self, mock_laspy):
|
||||
"""High single-return ratio (>0.6) should select PMF."""
|
||||
from lidar_pipeline.dtm import detect_ground_method
|
||||
|
||||
# 70% single returns = urban
|
||||
n = 10000
|
||||
num_returns = np.ones(n, dtype=int)
|
||||
num_returns[:int(n * 0.3)] = 2 # 30% multi-return
|
||||
z_values = np.random.normal(100, 5, n) # Low variance = flat terrain
|
||||
|
||||
mock_laspy.read.return_value = self._make_mock_las(num_returns, z_values)
|
||||
|
||||
result = detect_ground_method(Path("/data/input/test.laz"))
|
||||
assert result == 'pmf'
|
||||
|
||||
@patch('lidar_pipeline.dtm.laspy')
|
||||
def test_natural_terrain_returns_smrf(self, mock_laspy):
|
||||
"""Low single-return ratio and moderate variance should select SMRF."""
|
||||
from lidar_pipeline.dtm import detect_ground_method
|
||||
|
||||
# 40% single returns, moderate variance
|
||||
n = 10000
|
||||
num_returns = np.ones(n, dtype=int)
|
||||
num_returns[:int(n * 0.6)] = 2 # 60% multi-return (forest)
|
||||
z_values = np.random.normal(100, 15, n) # Moderate variance
|
||||
|
||||
mock_laspy.read.return_value = self._make_mock_las(num_returns, z_values)
|
||||
|
||||
result = detect_ground_method(Path("/data/input/test.laz"))
|
||||
assert result == 'smrf'
|
||||
|
||||
@patch('lidar_pipeline.dtm.laspy')
|
||||
def test_mountainous_terrain_still_defaults_to_smrf(self, mock_laspy):
|
||||
"""High variance terrain defaults to SMRF in auto mode (CSF only via --ground-classification csf)."""
|
||||
from lidar_pipeline.dtm import detect_ground_method
|
||||
|
||||
# Moderate single-return ratio but very high height variance
|
||||
n = 10000
|
||||
num_returns = np.ones(n, dtype=int)
|
||||
num_returns[:int(n * 0.5)] = 2
|
||||
z_values = np.random.normal(100, 50, n) # Very high variance = mountainous
|
||||
|
||||
mock_laspy.read.return_value = self._make_mock_las(num_returns, z_values)
|
||||
|
||||
result = detect_ground_method(Path("/data/input/test.laz"))
|
||||
assert result == 'smrf'
|
||||
|
||||
|
||||
class TestClassifyGroundMethod:
|
||||
@patch('lidar_pipeline.dtm.subprocess')
|
||||
@patch('lidar_pipeline.dtm.laspy')
|
||||
def test_classify_ground_auto_calls_detect(self, mock_laspy, mock_subprocess):
|
||||
"""classify_ground with method='auto' should call detect_ground_method."""
|
||||
from lidar_pipeline.dtm import classify_ground
|
||||
|
||||
# Mock detect_ground_method to return 'pmf'
|
||||
with patch('lidar_pipeline.dtm.detect_ground_method', return_value='pmf') as mock_detect:
|
||||
mock_subprocess.run.return_value = MagicMock(returncode=0)
|
||||
result = classify_ground(Path("/data/input/test.laz"), Path("/tmp"), method='auto')
|
||||
|
||||
mock_detect.assert_called_once()
|
||||
|
||||
@patch('lidar_pipeline.dtm.subprocess')
|
||||
@patch('lidar_pipeline.dtm.laspy')
|
||||
def test_classify_ground_smrf_uses_smrf_pipeline(self, mock_laspy, mock_subprocess):
|
||||
"""classify_ground with method='smrf' should create SMRF pipeline."""
|
||||
from lidar_pipeline.dtm import classify_ground, _create_ground_pipeline
|
||||
|
||||
mock_subprocess.run.return_value = MagicMock(returncode=0)
|
||||
|
||||
with patch('lidar_pipeline.dtm.detect_ground_method'):
|
||||
# Create temp dir
|
||||
import tempfile
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
result = classify_ground(Path("/data/input/test.laz"), Path(tmpdir), method='smrf')
|
||||
|
||||
# Check the pipeline JSON was written with SMRF
|
||||
pipeline_file = Path(tmpdir) / "pipeline_smrf.json"
|
||||
if pipeline_file.exists():
|
||||
pipeline = json.loads(pipeline_file.read_text())
|
||||
stage_types = [s.get("type") if isinstance(s, dict) else None for s in pipeline["pipeline"]]
|
||||
assert "filters.smrf" in stage_types
|
||||
|
||||
@patch('lidar_pipeline.dtm.subprocess')
|
||||
@patch('lidar_pipeline.dtm.laspy')
|
||||
def test_classify_ground_pmf_uses_pmf_pipeline(self, mock_laspy, mock_subprocess):
|
||||
"""classify_ground with method='pmf' should create PMF pipeline."""
|
||||
from lidar_pipeline.dtm import classify_ground
|
||||
|
||||
mock_subprocess.run.return_value = MagicMock(returncode=0)
|
||||
|
||||
with patch('lidar_pipeline.dtm.detect_ground_method'):
|
||||
import tempfile
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
result = classify_ground(Path("/data/input/test.laz"), Path(tmpdir), method='pmf')
|
||||
|
||||
pipeline_file = Path(tmpdir) / "pipeline_pmf.json"
|
||||
if pipeline_file.exists():
|
||||
pipeline = json.loads(pipeline_file.read_text())
|
||||
stage_types = [s.get("type") if isinstance(s, dict) else None for s in pipeline["pipeline"]]
|
||||
assert "filters.pmf" in stage_types
|
||||
@ -696,7 +696,7 @@ def generate_texture(dem_file, basename, vis_dir, resolution):
|
||||
np.cos(az_rad - aspect))
|
||||
hillshade = np.clip(shading, 0, 1)
|
||||
|
||||
valid = hillshade[~np.isnan(hillshade)]
|
||||
valid = np.asarray(hillshade[~np.isnan(hillshade)])
|
||||
if len(valid) == 0:
|
||||
raise ValueError("No valid data for texture analysis")
|
||||
lo, hi = np.percentile(valid, (1, 99))
|
||||
|
||||
Reference in New Issue
Block a user