- Ajout classification automatique du sol (SMRF/PMF/CSF) avec détection
heuristique (ratio retours uniques > 0.6 → PMF urbain, sinon SMRF)
- Pré-traitement PDAL recommandé avant classification: ELM + outlier
removal (cell=5.0, threshold=2.0 adapté au calcaire rocailleux)
- Options CLI: --ground-classification {auto,smrf,pmf,csf} et
--force-classification pour forcer la reclassification
- Fix double logging (logger.propagate = False)
- Fix --force non transmis dans run.sh (réécriture parsing arguments)
- Fix warning numpy 'partition will ignore mask': conversion MaskedArray
en ndarray avant np.percentile()
- Ajout liblaszip8 + lazrs pour support LAZ dans Docker et laspy
- Tests unitaires pour PMF, CSF et auto-détection
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
243 lines
11 KiB
Python
243 lines
11 KiB
Python
"""Tests for DTM module."""
|
|
|
|
import json
|
|
import numpy as np
|
|
import pytest
|
|
from pathlib import Path
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
|
|
class TestSMRFPipeline:
|
|
def test_pipeline_json_valid(self):
|
|
"""create_smrf_pipeline produces valid JSON with expected stages."""
|
|
from lidar_pipeline.dtm import create_smrf_pipeline
|
|
result = create_smrf_pipeline("/data/input/test.laz", "/data/output/test_ground.las")
|
|
pipeline = json.loads(result)
|
|
|
|
assert "pipeline" in pipeline
|
|
stages = pipeline["pipeline"]
|
|
|
|
# Should have: reader, range filter (ReturnNumber), assign, ELM, outlier, SMRF, range filter (Classification), writer
|
|
stage_types = [s.get("type") if isinstance(s, dict) else None for s in stages]
|
|
|
|
# First stage is the filename string (reader)
|
|
assert isinstance(stages[0], str)
|
|
assert "test.laz" in stages[0]
|
|
|
|
# Must contain preprocessing steps
|
|
assert "filters.assign" in stage_types
|
|
assert "filters.elm" in stage_types
|
|
assert "filters.outlier" in stage_types
|
|
|
|
# Must contain SMRF filter
|
|
assert "filters.smrf" in stage_types
|
|
|
|
# Must contain ReturnNumber filter
|
|
range_stages = [s for s in stages if isinstance(s, dict) and s.get("type") == "filters.range"]
|
|
assert len(range_stages) >= 1
|
|
# At least one should filter ReturnNumber
|
|
assert any("ReturnNumber" in str(s.get("limits", "")) for s in range_stages)
|
|
|
|
def test_pipeline_elm_parameters(self):
|
|
"""ELM filter has terrain-adapted parameters."""
|
|
from lidar_pipeline.dtm import create_smrf_pipeline
|
|
result = create_smrf_pipeline("/input/a.laz", "/output/a_ground.las")
|
|
pipeline = json.loads(result)
|
|
|
|
elm_stage = [s for s in pipeline["pipeline"] if isinstance(s, dict) and s.get("type") == "filters.elm"][0]
|
|
assert elm_stage["cell"] == 5.0
|
|
assert elm_stage["threshold"] == 2.0
|
|
|
|
def test_pipeline_outlier_parameters(self):
|
|
"""Outlier filter uses statistical method."""
|
|
from lidar_pipeline.dtm import create_smrf_pipeline
|
|
result = create_smrf_pipeline("/input/a.laz", "/output/a_ground.las")
|
|
pipeline = json.loads(result)
|
|
|
|
outlier_stage = [s for s in pipeline["pipeline"] if isinstance(s, dict) and s.get("type") == "filters.outlier"][0]
|
|
assert outlier_stage["method"] == "statistical"
|
|
assert outlier_stage["mean_k"] == 8
|
|
assert outlier_stage["multiplier"] == 3.0
|
|
|
|
def test_pipeline_output_path(self):
|
|
"""Pipeline output path is set correctly."""
|
|
from lidar_pipeline.dtm import create_smrf_pipeline
|
|
result = create_smrf_pipeline("/input/a.laz", "/output/a_ground.las")
|
|
pipeline = json.loads(result)
|
|
# Last stage should be writer with correct output path
|
|
writer = [s for s in pipeline["pipeline"] if isinstance(s, dict) and s.get("type") == "writers.las"][0]
|
|
assert writer["filename"] == "/output/a_ground.las"
|
|
|
|
|
|
class TestPMFPipeline:
|
|
def test_pipeline_json_valid(self):
|
|
"""create_pmf_pipeline produces valid JSON with PMF filter."""
|
|
from lidar_pipeline.dtm import create_pmf_pipeline
|
|
result = create_pmf_pipeline("/data/input/test.laz", "/data/output/test_ground.las")
|
|
pipeline = json.loads(result)
|
|
|
|
assert "pipeline" in pipeline
|
|
stages = pipeline["pipeline"]
|
|
stage_types = [s.get("type") if isinstance(s, dict) else None for s in stages]
|
|
|
|
# Must contain PMF filter
|
|
assert "filters.pmf" in stage_types
|
|
|
|
# Must contain ReturnNumber filter
|
|
range_stages = [s for s in stages if isinstance(s, dict) and s.get("type") == "filters.range"]
|
|
assert any("ReturnNumber" in str(s.get("limits", "")) for s in range_stages)
|
|
|
|
def test_pmf_parameters(self):
|
|
"""PMF pipeline has expected parameters."""
|
|
from lidar_pipeline.dtm import create_pmf_pipeline
|
|
result = create_pmf_pipeline("/input/a.laz", "/output/a_ground.las")
|
|
pipeline = json.loads(result)
|
|
|
|
pmf_stage = [s for s in pipeline["pipeline"] if isinstance(s, dict) and s.get("type") == "filters.pmf"][0]
|
|
assert pmf_stage["max_window"] == 33
|
|
assert pmf_stage["slope"] == 0.15
|
|
|
|
|
|
class TestCSFPipeline:
|
|
def test_pipeline_json_valid(self):
|
|
"""create_csf_pipeline produces valid JSON with CSF filter."""
|
|
from lidar_pipeline.dtm import create_csf_pipeline
|
|
result = create_csf_pipeline("/data/input/test.laz", "/data/output/test_ground.las")
|
|
pipeline = json.loads(result)
|
|
|
|
assert "pipeline" in pipeline
|
|
stages = pipeline["pipeline"]
|
|
stage_types = [s.get("type") if isinstance(s, dict) else None for s in stages]
|
|
|
|
# Must contain CSF filter
|
|
assert "filters.csf" in stage_types
|
|
|
|
# Must contain ReturnNumber filter
|
|
range_stages = [s for s in stages if isinstance(s, dict) and s.get("type") == "filters.range"]
|
|
assert any("ReturnNumber" in str(s.get("limits", "")) for s in range_stages)
|
|
|
|
def test_csf_parameters(self):
|
|
"""CSF pipeline has expected parameters."""
|
|
from lidar_pipeline.dtm import create_csf_pipeline
|
|
result = create_csf_pipeline("/input/a.laz", "/output/a_ground.las")
|
|
pipeline = json.loads(result)
|
|
|
|
csf_stage = [s for s in pipeline["pipeline"] if isinstance(s, dict) and s.get("type") == "filters.csf"][0]
|
|
assert csf_stage["resolution"] == 0.5
|
|
assert csf_stage["rigidness"] == 3
|
|
assert csf_stage["smooth"] is True
|
|
assert "hdiff" not in csf_stage # hdiff is not a valid PDAL CSF parameter
|
|
|
|
|
|
class TestDetectGroundMethod:
|
|
def _make_mock_las(self, num_returns, z_values):
|
|
"""Create a mock laspy object with specified NumberOfReturns and z."""
|
|
mock_las = MagicMock()
|
|
mock_las.NumberOfReturns = np.array(num_returns)
|
|
mock_las.z = np.array(z_values)
|
|
mock_las.points = MagicMock()
|
|
mock_las.points.__len__ = lambda self: len(num_returns)
|
|
return mock_las
|
|
|
|
@patch('lidar_pipeline.dtm.laspy')
|
|
def test_urban_terrain_returns_pmf(self, mock_laspy):
|
|
"""High single-return ratio (>0.6) should select PMF."""
|
|
from lidar_pipeline.dtm import detect_ground_method
|
|
|
|
# 70% single returns = urban
|
|
n = 10000
|
|
num_returns = np.ones(n, dtype=int)
|
|
num_returns[:int(n * 0.3)] = 2 # 30% multi-return
|
|
z_values = np.random.normal(100, 5, n) # Low variance = flat terrain
|
|
|
|
mock_laspy.read.return_value = self._make_mock_las(num_returns, z_values)
|
|
|
|
result = detect_ground_method(Path("/data/input/test.laz"))
|
|
assert result == 'pmf'
|
|
|
|
@patch('lidar_pipeline.dtm.laspy')
|
|
def test_natural_terrain_returns_smrf(self, mock_laspy):
|
|
"""Low single-return ratio and moderate variance should select SMRF."""
|
|
from lidar_pipeline.dtm import detect_ground_method
|
|
|
|
# 40% single returns, moderate variance
|
|
n = 10000
|
|
num_returns = np.ones(n, dtype=int)
|
|
num_returns[:int(n * 0.6)] = 2 # 60% multi-return (forest)
|
|
z_values = np.random.normal(100, 15, n) # Moderate variance
|
|
|
|
mock_laspy.read.return_value = self._make_mock_las(num_returns, z_values)
|
|
|
|
result = detect_ground_method(Path("/data/input/test.laz"))
|
|
assert result == 'smrf'
|
|
|
|
@patch('lidar_pipeline.dtm.laspy')
|
|
def test_mountainous_terrain_still_defaults_to_smrf(self, mock_laspy):
|
|
"""High variance terrain defaults to SMRF in auto mode (CSF only via --ground-classification csf)."""
|
|
from lidar_pipeline.dtm import detect_ground_method
|
|
|
|
# Moderate single-return ratio but very high height variance
|
|
n = 10000
|
|
num_returns = np.ones(n, dtype=int)
|
|
num_returns[:int(n * 0.5)] = 2
|
|
z_values = np.random.normal(100, 50, n) # Very high variance = mountainous
|
|
|
|
mock_laspy.read.return_value = self._make_mock_las(num_returns, z_values)
|
|
|
|
result = detect_ground_method(Path("/data/input/test.laz"))
|
|
assert result == 'smrf'
|
|
|
|
|
|
class TestClassifyGroundMethod:
|
|
@patch('lidar_pipeline.dtm.subprocess')
|
|
@patch('lidar_pipeline.dtm.laspy')
|
|
def test_classify_ground_auto_calls_detect(self, mock_laspy, mock_subprocess):
|
|
"""classify_ground with method='auto' should call detect_ground_method."""
|
|
from lidar_pipeline.dtm import classify_ground
|
|
|
|
# Mock detect_ground_method to return 'pmf'
|
|
with patch('lidar_pipeline.dtm.detect_ground_method', return_value='pmf') as mock_detect:
|
|
mock_subprocess.run.return_value = MagicMock(returncode=0)
|
|
result = classify_ground(Path("/data/input/test.laz"), Path("/tmp"), method='auto')
|
|
|
|
mock_detect.assert_called_once()
|
|
|
|
@patch('lidar_pipeline.dtm.subprocess')
|
|
@patch('lidar_pipeline.dtm.laspy')
|
|
def test_classify_ground_smrf_uses_smrf_pipeline(self, mock_laspy, mock_subprocess):
|
|
"""classify_ground with method='smrf' should create SMRF pipeline."""
|
|
from lidar_pipeline.dtm import classify_ground, _create_ground_pipeline
|
|
|
|
mock_subprocess.run.return_value = MagicMock(returncode=0)
|
|
|
|
with patch('lidar_pipeline.dtm.detect_ground_method'):
|
|
# Create temp dir
|
|
import tempfile
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
result = classify_ground(Path("/data/input/test.laz"), Path(tmpdir), method='smrf')
|
|
|
|
# Check the pipeline JSON was written with SMRF
|
|
pipeline_file = Path(tmpdir) / "pipeline_smrf.json"
|
|
if pipeline_file.exists():
|
|
pipeline = json.loads(pipeline_file.read_text())
|
|
stage_types = [s.get("type") if isinstance(s, dict) else None for s in pipeline["pipeline"]]
|
|
assert "filters.smrf" in stage_types
|
|
|
|
@patch('lidar_pipeline.dtm.subprocess')
|
|
@patch('lidar_pipeline.dtm.laspy')
|
|
def test_classify_ground_pmf_uses_pmf_pipeline(self, mock_laspy, mock_subprocess):
|
|
"""classify_ground with method='pmf' should create PMF pipeline."""
|
|
from lidar_pipeline.dtm import classify_ground
|
|
|
|
mock_subprocess.run.return_value = MagicMock(returncode=0)
|
|
|
|
with patch('lidar_pipeline.dtm.detect_ground_method'):
|
|
import tempfile
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
result = classify_ground(Path("/data/input/test.laz"), Path(tmpdir), method='pmf')
|
|
|
|
pipeline_file = Path(tmpdir) / "pipeline_pmf.json"
|
|
if pipeline_file.exists():
|
|
pipeline = json.loads(pipeline_file.read_text())
|
|
stage_types = [s.get("type") if isinstance(s, dict) else None for s in pipeline["pipeline"]]
|
|
assert "filters.pmf" in stage_types |