- Positions d'axes fixes (data_left/bottom/width/height_frac) pour alignement pixel-parfait entre terrain et ortho/topo - aspect='equal' au lieu de 'auto' pour conserver les proportions géographiques - Colorbar descriptive pour les visualisations RGB (ortho/topo) - Comblage des petits trous DTM (< 1m) via rasterio.fill.fillnodata - Suppression de la visualisation "dépressions" - Hillshade composite: 0.7*hillshade + 0.3*cos(slope) - D8 flow accumulation accéléré par numba JIT (fallback Python) - Flag --keep-tif pour conserver les TIFF intermédiaires - --force supprime aussi les TIF existants avant régénération - ETA affiché pendant la génération des visualisations - Répertoires temp dans temp/ pour traitement parallèle Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
93 lines
3.0 KiB
Python
93 lines
3.0 KiB
Python
"""Tests for GPU helper module."""
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
|
|
def test_has_gpu_attribute():
|
|
"""HAS_GPU must be a boolean."""
|
|
from lidar_pipeline.gpu import HAS_GPU
|
|
assert isinstance(HAS_GPU, bool)
|
|
|
|
|
|
def test_to_gpu_returns_array():
|
|
"""to_gpu returns a float32 array with correct values."""
|
|
from lidar_pipeline.gpu import to_gpu, to_cpu, HAS_GPU
|
|
arr = np.array([1.0, 2.0, 3.0])
|
|
result = to_gpu(arr)
|
|
# to_gpu converts to float32 to reduce GPU memory usage
|
|
assert result.dtype == np.float32
|
|
# Always bring back to CPU for comparison
|
|
np.testing.assert_array_equal(to_cpu(result), [1.0, 2.0, 3.0])
|
|
|
|
|
|
def test_to_cpu_noop_numpy():
|
|
"""to_cpu on a numpy array is a no-op."""
|
|
from lidar_pipeline.gpu import to_cpu
|
|
arr = np.array([1.0, 2.0])
|
|
result = to_cpu(arr)
|
|
assert result is arr
|
|
|
|
|
|
def test_xp_gaussian_filter():
|
|
"""xp_gaussian_filter blurs a point source correctly."""
|
|
from lidar_pipeline.gpu import xp_gaussian_filter
|
|
arr = np.zeros((50, 50), dtype=np.float64)
|
|
arr[25, 25] = 1.0
|
|
result = xp_gaussian_filter(arr, sigma=3)
|
|
assert result.shape == (50, 50)
|
|
# Center should still be the highest value
|
|
center_val = float(np.asarray(result)[25, 25])
|
|
corner_val = float(np.asarray(result)[0, 0])
|
|
assert center_val > corner_val
|
|
assert center_val > 0.01 # Not all energy is lost
|
|
|
|
|
|
def test_xp_uniform_filter_cpu():
|
|
"""xp_uniform_filter works on CPU arrays."""
|
|
from lidar_pipeline.gpu import xp_uniform_filter
|
|
arr = np.ones((50, 50), dtype=np.float64)
|
|
arr[25, 25] = 100.0
|
|
result = xp_uniform_filter(arr, size=5)
|
|
# Mean should be close to 1 everywhere except near center
|
|
assert result.shape == (50, 50)
|
|
assert result[0, 0] == pytest.approx(1.0, abs=0.01)
|
|
|
|
|
|
def test_xp_minimum_filter_cpu():
|
|
"""xp_minimum_filter works on CPU arrays."""
|
|
from lidar_pipeline.gpu import xp_minimum_filter
|
|
arr = np.ones((50, 50), dtype=np.float64)
|
|
arr[25, 25] = 0.0
|
|
result = xp_minimum_filter(arr, size=3)
|
|
assert result.shape == (50, 50)
|
|
# Around the minimum, values should be 0
|
|
assert result[25, 25] == 0.0
|
|
assert result[24, 25] == 0.0
|
|
|
|
|
|
def test_log_gpu_status(caplog):
|
|
"""log_gpu_status emits a log message."""
|
|
import logging
|
|
from lidar_pipeline.gpu import log_gpu_status
|
|
with caplog.at_level(logging.INFO, logger="lidar"):
|
|
log_gpu_status()
|
|
assert any("GPU" in r.message or "CPU" in r.message for r in caplog.records)
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
not pytest.importorskip("cupy", reason="CuPy not available"),
|
|
reason="Requires GPU + CuPy"
|
|
)
|
|
def test_to_gpu_roundtrip():
|
|
"""to_gpu -> to_cpu preserves data when GPU is available."""
|
|
import cupy as cp
|
|
from lidar_pipeline.gpu import to_gpu, to_cpu, HAS_GPU
|
|
if not HAS_GPU:
|
|
pytest.skip("No GPU available")
|
|
arr = np.array([1.0, 2.0, 3.0], dtype=np.float32)
|
|
gpu_arr = to_gpu(arr)
|
|
assert isinstance(gpu_arr, cp.ndarray)
|
|
result = to_cpu(gpu_arr)
|
|
assert isinstance(result, np.ndarray)
|
|
np.testing.assert_array_almost_equal(result, [1.0, 2.0, 3.0]) |