11 Commits
ok1 ... v1.0.1

Author SHA1 Message Date
02218b2cfc Upgrade PDAL to 2.10 via conda-forge, add COPC v1.1 support
- Dockerfile: install PDAL 2.10.1 from conda-forge (was 2.3 from apt)
  Ubuntu 22.04's PDAL 2.3 cannot read COPC v1.1 files from IGN LiDAR HD
- dtm.py: add _read_with_pdal() fallback for COPC files that laspy can't read
- dtm.py: validate_laz() now tries PDAL when laspy fails
- dtm.py: create_dtm_fast() and detect_ground_method() use PDAL fallback
- ign.py: auto-retry at lower zoom on 404 errors
- pipeline.py: check DTM resolution mismatch and regenerate if needed
- pipeline.py: propagate actual DTM resolution to visualizations
- pipeline.py: add --init to docker run for proper Ctrl+C signal handling
- Remove RRIM and Multi-Hillshade RGB visualizations

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-14 19:01:05 +02:00
7ac08f75dc Add LAZ integrity check to skip corrupted files early
Validate file readability before PDAL classification. Corrupted/truncated
files are detected instantly via laspy header read and skipped with a clear
error message pointing to re-download, instead of wasting time on PDAL
and repair attempts that will fail anyway.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-14 17:59:32 +02:00
f988ddb76d Auto-retry IGN tiles at lower zoom on 404
When zoom 20 tiles are unavailable (rural areas), fall back to zoom 19, 18,
etc. down to 15. Breaks out immediately on first-tile 404 to avoid wasting
requests at unsupported zoom levels.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-14 02:21:47 +02:00
e2bd6b2536 Remove RRIM and Multi-Hillshade RGB, fix DTM resolution reuse bug, add --init to docker run
- Remove generate_rrim, generate_multi_hillshade, _compute_openness_both
- Remove corresponding VIZ_STEPS entries, COLORMAPS, RGB_LEGENDS, and tests
- Fix DTM resolution mismatch: existing DTM at different resolution is now
  regenerated instead of silently reused
- Propagate actual DTM resolution to visualizations and rendering
- Add --init to docker run commands for proper signal handling on Ctrl+C
- Add .playwright-mcp/ to .gitignore

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-14 02:19:42 +02:00
bf17ca4662 Remove ETA display from visualization progress logs 2026-05-14 01:18:52 +02:00
08dbc73869 Fix lambda signatures for rrim and multi_hillshade to accept shared kwarg
Same issue as pos_open/neg_open — lambdas in VIZ_STEPS need shared=None
parameter since the pipeline passes shared=shared to all visualization functions.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-14 01:07:48 +02:00
8c2065801b Add unit tests for RRIM, Multi-Hillshade RGB, and Local Dominance
TestRRIM: TIF generation, 3-band RGB output, uint8 range, no NaN
TestMultiHillshade: TIF generation, 3-band RGB output, uint8 range
TestLocalDominance: TIF generation, values in [0,1], NaN mask preservation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-14 01:05:55 +02:00
7f6b816ed6 Add RRIM, Multi-Hillshade RGB, and Local Dominance visualizations
Three new visualizations complementing existing SVF/openness/LRM/MSRM:

- RRIM (Red Relief Image Map): RGB composite combining positive openness
  (R), inverted slope (G), negative openness (B). Uses ray-tracing
  to compute both openness values in a single pass.

- Multi-Hillshade RGB: 3 azimuths (315°, 135°, 45°) mapped to R/G/B
  channels with slope blending. Color reveals structure orientation.

- Local Dominance: (dem - local_min) / (local_max - local_min) using
  min/max filters. Measures local height position — complements openness.

Also adds:
- _compute_openness_both() helper for shared ray-tracing (used by RRIM)
- xp_maximum_filter() in gpu.py (GPU/CPU abstraction)
- Entries in COLORMAPS, RGB_LEGENDS, VIZ_STEPS, and is_rgb detection
- All NaN handling follows existing patterns (nan_mask restoration)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-14 01:03:47 +02:00
1cf8e1752f Remove PMF, fix NaN in gradient visualizations, fix pos_open/neg_open shared param
- Remove PMF from ground classification options (PDAL recommends SMRF over PMF)
- Auto-detection now uses CSF for urban/complex terrain instead of PMF
- Add z_std > 30m heuristic to auto-select CSF for complex terrain
- Fix pos_open/neg_open lambda missing 'shared' parameter (NameError in workers)
- Fix NaN mask not restored in hillshade, slope, aspect, curvature
  (gradient-based products computed on filled DEM lost NaN transparency)
- Add nan_mask parameter to _save_tif for centralized NaN restoration
- DTM TIF kept by default (no longer deleted after WebP conversion)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-14 00:50:45 +02:00
eac482874d Fix bugs and improve pipeline flexibility
- Fix gpu_cleanup import missing in visualizations.py (NameError in workers)
- Fix t_pdf referenced before assignment when PDF is skipped
- Skip classification+DTM when DTM exists regardless of --force
- --force now only regenerates WebP/PDF, not classification/DTM
- --force-classification forces reclassification when needed
- Add laspy repair fallback for corrupt LAZ files (EVLR errors)
- Keep DTM TIF by default for reuse (--no-keep-tif to delete)
- Increase space between image and bottom cartouche (0.12→0.19)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-14 00:08:25 +02:00
5b74322077 Skip ground classification when DTM already exists
If the DTM .tif exists and --force is not set, skip both ground
classification and DTM generation entirely. Previously, the pipeline
would spend 3+ minutes reclassifying ground even when the DTM was
already present and would be reused anyway.

Also includes: SharedDEM cache, enhanced WebP cartouche (compass rose,
adaptive scale bar, enriched info bar), removed COG/viewer, UTF-8
fix for parallel workers, skip logic for DTM and PDF.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-13 23:41:21 +02:00
14 changed files with 1018 additions and 1076 deletions

1
.gitignore vendored
View File

@ -45,3 +45,4 @@ htmlcov/
# Éventuels fichiers de cache matplotlib
matplotlibrc
.playwright-mcp/

View File

@ -4,7 +4,7 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
## Project Overview
LiDAR archaeological processing pipeline that generates 17 terrain visualizations from LAZ/LAS point clouds. Runs in Docker with optional NVIDIA GPU acceleration (CuPy). Designed for French LiDAR HD data in Lambert 93 (EPSG:2154).
LiDAR archaeological processing pipeline that generates 20 terrain visualizations from LAZ/LAS point clouds. Runs in Docker with optional NVIDIA GPU acceleration (CuPy). Designed for French LiDAR HD data in Lambert 93 (EPSG:2154).
## Commands
@ -16,10 +16,8 @@ All commands run inside Docker. Use `./run.sh` as the primary interface.
./run.sh -g -r 0.2 # High resolution (0.2m/px)
./run.sh --test # Run unit tests
./run.sh -g --file LHD_FXX_1000_6882_PTS_LAMB93_IGN69.copc # Single file
./run.sh --ground-classification pmf # Force PMF ground classification
./run.sh -g --keep-tif # Keep intermediate TIFF files
./run.sh -g --no-viewer # Skip web viewer generation
./run.sh serve # Start web map server
./run.sh --ground-classification csf # Force CSF ground classification (complex terrain)
./run.sh -g --keep-tif # Keep TIFF files (allows WebP regeneration without recalculating DTM)
./run.sh # Print help (no args)
```
@ -34,34 +32,46 @@ docker run --rm --gpus all -v $(pwd)/input:/data/input:ro -v $(pwd)/output:/data
### Module responsibilities
- **`cli.py`** — argparse + logging setup. Entry point via `python -m lidar_pipeline`.
- **`pipeline.py`** — `LidarArchaeoPipeline` orchestrator. `VIZ_STEPS` registry maps names to generate functions. `FilePrefixFilter` for parallel logging.
- **`dtm.py`** — PDAL ground classification (SMRF/PMF/CSF + auto-detection) and DTM generation via scipy `binned_statistic_2d`.
- **`visualizations.py`** — 15 `generate_*` functions + 2 IGN overlay lambdas. All take `(dem_file, basename, vis_dir, resolution)` and return a TIF path or None.
- **`pipeline.py`** — `LidarArchaeoPipeline` orchestrator. `VIZ_STEPS` registry maps names to generate functions. `FilePrefixFilter` for parallel logging. Creates `SharedDEM` once per file and passes it to all visualizations.
- **`dtm.py`** — PDAL ground classification (SMRF/CSF + auto-detection) and DTM generation via scipy `binned_statistic_2d`.
- **`visualizations.py`** — 15 `generate_*` functions + 2 IGN overlay lambdas. All take `(dem_file, basename, vis_dir, resolution, shared=None)` and return a TIF path or None. `SharedDEM` class pre-computes gradient, NaN mask, LRM to avoid redundant I/O and computation.
- **`gpu.py`** — CuPy/numpy abstraction: `HAS_GPU`, `to_gpu()`, `to_cpu()`, `xp_gaussian_filter()`, `xp_uniform_filter()`, `xp_minimum_filter()`, `gpu_cleanup()`. Falls back to CPU gracefully.
- **`ign.py`** — IGN WMTS tile download + overlay generation for orthophoto and topographic maps.
- **`rendering.py`** — `COLORMAPS` dict maps filename keywords to (cmap, title, legend, description). `tif_to_png()` converts TIF→WebP with legend/scale/north arrow. `convert_to_cog()` converts TIF→Cloud Optimized GeoTIFF. `generate_cog_metadata()` creates metadata JSON for web viewer. `generate_pdf_report()` creates A3 PDF.
- **`viewer.py`** — Generates MapLibre GL JS HTML viewer with layer controls, opacity sliders, and IGN/OSM basemaps.
- **`server.py`** — TiTiler-based Starlette server for serving COG tiles and viewer HTML. Entry point via `python -m lidar_pipeline.server`.
- **`rendering.py`** — `COLORMAPS` dict maps filename keywords to (cmap, title, legend, description). `tif_to_png()` converts TIF→WebP with legend/scale/north arrow. `generate_pdf_report()` creates A3 PDF.
### SharedDEM optimization
`SharedDEM` pre-computes once per file:
- DEM data (single I/O read)
- NaN mask + filled DEM (single `_fill_nans` call, avoiding ~20 redundant calls)
- Gradient components (dy, dx, slope, aspect) shared by hillshade, slope, aspect, curvature
- LRM at 15m kernel (shared by lrm + anomalies)
`_filter_nanaware_from_filled()` applies filters on the pre-filled DEM, skipping the expensive `_fill_nans` interpolation.
### Adding a visualization
Three places must be updated:
1. `visualizations.py` — add `generate_X(dem_file, basename, vis_dir, resolution)` function
1. `visualizations.py` — add `generate_X(dem_file, basename, vis_dir, resolution, shared=None)` function
2. `pipeline.py` `VIZ_STEPS` — add `('name', generate_X)` entry
3. `rendering.py` `COLORMAPS` — add entry keyed by the output filename keyword
### Ground classification
Auto-detection in `dtm.py` `detect_ground_method()`:
- Single-return ratio > 0.6 → PMF (urban terrain)
- Single-return ratio > 0.6 → CSF (urban terrain, cloth simulation)
- Height std > 30m → CSF (complex/mountainous terrain)
- Default → SMRF (natural terrain)
Override with `--ground-classification {auto,smrf,pmf,csf}`.
Override with `--ground-classification {auto,smrf,csf}`.
### NaN handling
DTM small gaps (< 1m from existing data) are filled using `rasterio.fill.fillnodata`. Large gaps remain as NaN. Visualization functions use `_fill_nans()` and `_filter_nanaware()` to avoid NaN propagation through filters.
DTM small gaps (< 1m from existing data) are filled using `rasterio.fill.fillnodata`. Large gaps remain as NaN. `SharedDEM` fills NaN once; `_filter_nanaware_from_filled()` applies filters on the pre-filled array and restores the NaN mask.
### Flow accumulation
Uses priority-flood algorithm (Wang & Liu 2006) for sink filling, which is O(n log n) instead of iterative minimum_filter. D8 accumulation uses numba JIT; falls back to pure Python if numba unavailable.
### Parallel processing
@ -72,7 +82,6 @@ Uses `ProcessPoolExecutor` with `'spawn'` start method (required for CUDA). Each
- **Language**: UI messages and comments in French. Code identifiers in English.
- **Logging**: Use `logger = logging.getLogger("lidar")`. Prefix per-file logs via `_file_filter.basename`.
- **GPU pattern**: `arr_gpu = to_gpu(arr)` compute `result = to_cpu(arr_gpu)` `gpu_cleanup()` between visualizations.
- **Output format**: Visualizations saved as WebP (not PNG). TIFF intermediates deleted unless `--keep-tif` or viewer enabled. COGs generated for web viewer by default. PDF reports use `PILImage.open().convert('RGB')`.
- **Web viewer**: MapLibre GL JS + TiTiler. COGs served as raster tiles. `./run.sh serve` starts server on port 8000.
- **Flow accumulation**: Uses numba JIT for D8 accumulation loop. Falls back to pure Python if numba unavailable.
- **Output format**: Visualizations saved as WebP. TIFF intermediates deleted by default. Use `--keep-tif` to keep DTM+TIF for WebP regeneration with `--force`. No COGs or viewer.
- **Compression**: TIF intermediates use `deflate` compression (faster than LZW for float32 data).
- **Tests**: Run only inside Docker via `./run.sh --test`. Synthetic DEM fixture in `tests/conftest.py`.

View File

@ -3,17 +3,21 @@ FROM nvidia/cuda:12.4.0-devel-ubuntu22.04
ENV DEBIAN_FRONTEND=noninteractive
ENV TZ=Europe/Paris
# Install PDAL and system packages
# Install system packages + Miniforge for PDAL >= 2.5 (Ubuntu 22.04 ships PDAL 2.3 which can't read COPC v1.1)
RUN apt-get update && apt-get install -y --no-install-recommends \
pdal \
liblaszip8 \
gdal-bin \
python3-gdal \
python3-pip \
python3-dev \
build-essential \
wget \
&& rm -rf /var/lib/apt/lists/*
&& rm -rf /var/lib/apt/lists/* \
&& wget -q https://github.com/conda-forge/miniforge/releases/latest/download/Miniforge3-Linux-x86_64.sh -O /tmp/miniforge.sh \
&& bash /tmp/miniforge.sh -b -p /opt/conda \
&& rm /tmp/miniforge.sh \
&& /opt/conda/bin/conda install -y -c conda-forge pdal \
&& ln -sf /opt/conda/bin/pdal /usr/local/bin/pdal \
&& /opt/conda/bin/conda clean -afy
WORKDIR /app

View File

@ -23,6 +23,12 @@ def setup_logging(verbose=False, debug=False):
verbose: If True, include timestamps and level names.
debug: If True, set level to DEBUG and add file:line info.
"""
# Ensure UTF-8 output for French messages (é, è, ê, etc.)
if hasattr(sys.stdout, 'reconfigure'):
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
if hasattr(sys.stderr, 'reconfigure'):
sys.stderr.reconfigure(encoding='utf-8', errors='replace')
if debug:
level = logging.DEBUG
fmt = "%(asctime)s.%(msecs)03d %(levelname)-5s [%(filename)s:%(lineno)d] %(message)s"
@ -114,18 +120,13 @@ Exemples:
parser.add_argument(
"--keep-tif",
action="store_true",
help="Conserver les fichiers TIFF intermédiaires (sinon supprimés après conversion WebP)"
)
parser.add_argument(
"--no-viewer",
action="store_true",
help="Ne pas générer le viewer web (COGs + HTML MapLibre)"
help="Conserver les fichiers TIFF (DTM + visualisations) pour pouvoir régénérer les WebP sans recalculer"
)
parser.add_argument(
"--ground-classification",
choices=["auto", "smrf", "pmf", "csf"],
choices=["auto", "smrf", "csf"],
default="auto",
help="Méthode de classification du sol : auto (détection), smrf, pmf, csf (défaut: auto)"
help="Méthode de classification du sol : auto (détection), smrf, csf (défaut: auto)"
)
parser.add_argument(
"--file",
@ -176,7 +177,6 @@ Exemples:
ground_method=args.ground_classification,
force_classify=args.force_classification,
keep_tif=args.keep_tif,
no_viewer=args.no_viewer
)
# If --file is specified, process only matching files

View File

@ -1,6 +1,6 @@
"""DTM generation from classified LiDAR point clouds.
Handles ground classification via PDAL/SMRF and DTM rasterisation
Handles ground classification via PDAL (SMRF or CSF) and DTM rasterisation
using scipy binned_statistic_2d. Zones without LiDAR data remain as NaN.
"""
@ -27,13 +27,13 @@ def _create_ground_pipeline(input_laz, output_las, method):
1. Reset Classification to 0
2. ELM (Extended Local Minimum) — mark low outliers as noise (Classification=7)
3. Statistical outlier removal
4. Ground classification (SMRF/PMF/CSF)
4. Ground classification (SMRF or CSF)
5. Extract ground points (Classification=2)
Args:
input_laz: Path to input LAZ/LAS file.
output_las: Path to output classified LAS file.
method: Ground classification method ('smrf', 'pmf', or 'csf').
method: Ground classification method ('smrf' or 'csf').
Returns:
JSON string of the PDAL pipeline.
@ -84,15 +84,6 @@ def _create_ground_pipeline(input_laz, output_las, method):
"threshold": 0.5,
"scalar": 1.25
}
elif method == 'pmf':
ground_step = {
"type": "filters.pmf",
"max_window": 33,
"slope": 0.15,
"max_distance": 2.5,
"initial_distance": 0.15,
"cell_size": 1.0
}
elif method == 'csf':
ground_step = {
"type": "filters.csf",
@ -128,22 +119,100 @@ def create_smrf_pipeline(input_laz, output_las):
return _create_ground_pipeline(input_laz, output_las, 'smrf')
def create_pmf_pipeline(input_laz, output_las):
"""Create a PDAL pipeline JSON for PMF ground classification."""
return _create_ground_pipeline(input_laz, output_las, 'pmf')
def create_csf_pipeline(input_laz, output_las):
"""Create a PDAL pipeline JSON for CSF ground classification."""
return _create_ground_pipeline(input_laz, output_las, 'csf')
def validate_laz(laz_file):
"""Quick integrity check for a LAZ/LAS file.
Tries laspy first (fast header read), then PDAL as fallback for COPC files
that laspy cannot read.
Returns:
True if file is readable, False otherwise.
"""
# Try laspy first (fast)
import laspy
try:
with laspy.open(str(laz_file)) as f:
header = f.header
_ = header.point_count
return True
except Exception:
pass
# Fallback: try PDAL (handles COPC v1.1 that laspy can't read)
import subprocess
try:
result = subprocess.run(
["pdal", "info", str(laz_file), "--summary"],
capture_output=True, text=True, timeout=30
)
if result.returncode == 0:
return True
logger.error(f" ✗ Fichier illisible: {laz_file.name}")
logger.error(f" PDAL: {result.stderr.strip()[:200]}")
except (subprocess.TimeoutExpired, FileNotFoundError):
logger.error(f" ✗ Impossible de vérifier le fichier: {laz_file.name}")
logger.error(f" → Re-télécharger depuis https://ign.fr/lidar-hd")
return False
def _read_with_pdal(laz_file):
"""Read a LAZ/LAS file via PDAL when laspy fails (e.g. COPC v1.1).
Returns a laspy.LasData object, or None on failure.
"""
import subprocess
import tempfile
import os
try:
# Convert COPC to LAS via PDAL, then read with laspy
with tempfile.NamedTemporaryFile(suffix='.las', delete=False) as tmp:
tmp_path = tmp.name
pipeline = json.dumps({
"pipeline": [
str(laz_file),
{"type": "writers.las", "filename": tmp_path}
]
})
result = subprocess.run(
["pdal", "pipeline", "--stdin"],
input=pipeline, capture_output=True, text=True, timeout=300
)
if result.returncode != 0:
logger.warning(f" PDAL conversion échouée: {result.stderr[:200]}")
try:
os.unlink(tmp_path)
except Exception:
pass
return None
import laspy
las = laspy.read(tmp_path)
try:
os.unlink(tmp_path)
except Exception:
pass
return las
except Exception as e:
logger.warning(f" PDAL fallback échoué: {e}")
return None
def detect_ground_method(laz_file):
"""Detect the best ground classification method based on point cloud statistics.
Auto-selects between SMRF (natural terrain) and PMF (urban) only.
CSF is available only via --ground-classification csf (slow but robust
on complex terrain).
Auto-selects between SMRF and CSF:
- SMRF: fast, robust for most natural terrain (PDAL recommended default)
- CSF: cloth simulation, better for complex/urban terrain
Falls back to SMRF if the file cannot be read or attributes are missing.
@ -151,14 +220,20 @@ def detect_ground_method(laz_file):
laz_file: Path to input LAZ/LAS file.
Returns:
String: 'smrf', 'pmf', or 'csf'
String: 'smrf' or 'csf'
"""
import laspy
# Try laspy first, then PDAL for COPC files
las = None
try:
las = laspy.read(str(laz_file))
except Exception as e:
logger.warning(f" Impossible de lire le nuage pour auto-détection: {e}")
logger.warning(f" laspy: {e}")
logger.info(f" → Lecture via PDAL pour auto-détection...")
las = _read_with_pdal(laz_file)
if las is None:
logger.info(f" → Méthode: SMRF (défaut — lecture impossible)")
return 'smrf'
@ -182,13 +257,16 @@ def detect_ground_method(laz_file):
f"ratio_retours_uniques={single_return_ratio:.2f}, "
f"écart_Z={z_std:.1f}m, amplitude_Z={z_range:.1f}m")
# Decision logic (auto selects between SMRF and PMF only):
# - High single-return ratio (>0.6) → urban (buildings, roads) → PMF
# Decision logic:
# - High single-return ratio (>0.6) → urban (buildings, roads) → CSF (cloth simulation)
# - High elevation variance (>30m) → complex/mountainous terrain → CSF
# - Default → SMRF (fast, robust for most natural terrain)
# Note: CSF is available only via --ground-classification csf (slow but robust on complex terrain)
if single_return_ratio > 0.6:
method = 'pmf'
method = 'csf'
reason = f"ratio retours uniques={single_return_ratio:.2f} > 0.6 → milieu urbain"
elif z_std > 30:
method = 'csf'
reason = f"écart_Z={z_std:.1f}m > 30m → terrain complexe"
else:
method = 'smrf'
reason = f"terrain naturel standard"
@ -203,7 +281,7 @@ def classify_ground(laz_file, temp_dir, method='auto', force=False):
Args:
laz_file: Path to input LAZ/LAS file.
temp_dir: Directory for temporary files (pipeline.json, ground.las).
method: Ground classification method ('auto', 'smrf', 'pmf', or 'csf').
method: Ground classification method ('auto', 'smrf', or 'csf').
force: If True, reclassify even if output file already exists.
Returns:
@ -246,11 +324,58 @@ def classify_ground(laz_file, temp_dir, method='auto', force=False):
logger.info(f" ✓ Classification sol {method.upper()} terminée")
return output_las
except subprocess.CalledProcessError as e:
logger.error(f" ✗ Erreur classification PDAL ({method.upper()}): {e.stderr.decode()}")
error_msg = e.stderr.decode() if e.stderr else str(e)
logger.warning(f" ✗ Erreur classification PDAL ({method.upper()}): {error_msg}")
# Try repairing file with laspy if PDAL fails on EVLR/VLR
if 'VLR' in error_msg or 'Invalid' in error_msg:
logger.info(f" → Tentative de réparation du fichier avec laspy...")
repaired_las = temp_dir / f"{laz_base}_repaired.las"
if _repair_laz_with_laspy(laz_file, repaired_las):
# Retry PDAL pipeline with repaired file
pipeline_json = _create_ground_pipeline(repaired_las, output_las, method)
with open(pipeline_file, 'w') as f:
f.write(pipeline_json)
try:
subprocess.run(
["pdal", "pipeline", str(pipeline_file)],
capture_output=True, check=True
)
logger.info(f" ✓ Classification sol {method.upper()} terminée (fichier réparé)")
return output_las
except subprocess.CalledProcessError as e2:
error_msg2 = e2.stderr.decode() if e2.stderr else str(e2)
logger.error(f" ✗ Échec classification même après réparation: {error_msg2}")
else:
logger.error(f" ✗ Impossible de réparer le fichier")
return None
def create_dtm_fast(las_file, basename, dtm_dir, resolution):
def _repair_laz_with_laspy(input_laz, output_las):
"""Try to repair a corrupt LAZ file by re-reading with laspy and saving as LAS.
Works around PDAL errors like 'Invalid Extended VLR size' by stripping
problematic VLR/EVLR metadata during re-save.
Args:
input_laz: Path to corrupt LAZ/LAS file.
output_las: Path for repaired LAS output.
Returns:
True if repair succeeded, False otherwise.
"""
import laspy
try:
las = laspy.read(str(input_laz))
las.write(str(output_las))
logger.info(f" ✓ Fichier réparé via laspy ({len(las.points):,} points)")
return True
except Exception as e:
logger.warning(f" ✗ Réparation laspy échouée: {e}")
return False
def create_dtm_fast(las_file, basename, dtm_dir, resolution, force=False):
"""Create DTM using fast binning method with gap filling.
Args:
@ -258,16 +383,33 @@ def create_dtm_fast(las_file, basename, dtm_dir, resolution):
basename: Base name for output file.
dtm_dir: Directory for output DTM GeoTIFF.
resolution: Grid resolution in meters per pixel.
force: If True, regenerate even if DTM already exists.
Returns:
Path to output DTM GeoTIFF, or None on failure.
"""
output_tif = dtm_dir / f"{basename}_dtm.tif"
if output_tif.exists() and not force:
logger.info(f" DTM déjà existant — fichier réutilisé: {output_tif.name}")
return output_tif
import laspy
logger.info(" → Génération DTM...")
try:
las = laspy.read(str(las_file))
except Exception as e:
# laspy can't read COPC v1.1 — try PDAL conversion
logger.warning(f" laspy: {e}")
logger.info(f" → Conversion via PDAL pour lecture COPC...")
las = _read_with_pdal(las_file)
if las is None:
logger.error(f" ✗ Impossible de lire {las_file.name}")
return None
try:
min_x, max_x = float(las.header.min[0]), float(las.header.max[0])
min_y, max_y = float(las.header.min[1]), float(las.header.max[1])

View File

@ -110,6 +110,16 @@ def xp_minimum_filter(arr, footprint=None, size=None):
return ndimage.minimum_filter(arr, footprint=footprint, size=size)
def xp_maximum_filter(arr, footprint=None, size=None):
"""Maximum filter — uses GPU if array is on GPU, CPU otherwise."""
if _cp is not None and isinstance(arr, _cp.ndarray):
try:
return _cp_ndimage.maximum_filter(arr, footprint=footprint, size=size)
except Exception:
arr = to_cpu(arr)
return ndimage.maximum_filter(arr, footprint=footprint, size=size)
def gpu_cleanup():
"""Free GPU memory. Call between visualizations to prevent OOM."""
if _cp is not None:

View File

@ -76,6 +76,8 @@ def _lat_lon_to_px(lat, lon, zoom, tile_size=256):
def download_ign_tiles(min_x, max_x, min_y, max_y, layer, zoom_level=15):
"""Download IGN WMTS tiles for the given bounds using Web Mercator (PM).
If the first tile returns 404, automatically retries at lower zoom levels.
Args:
min_x, max_x, min_y, max_y: Bounds in Lambert 93.
layer: IGN WMTS layer name.
@ -106,76 +108,100 @@ def download_ign_tiles(min_x, max_x, min_y, max_y, layer, zoom_level=15):
tile_matrix_set = "PM"
tile_size = 256
col_min, row_min = _lat_lon_to_tile(nw_lat, nw_lon, zoom_level)
col_max, row_max = _lat_lon_to_tile(se_lat, se_lon, zoom_level)
# Try downloading at the requested zoom level; fall back to lower zooms on 404
min_zoom = 15
for zoom in range(zoom_level, min_zoom - 1, -1):
col_min, row_min = _lat_lon_to_tile(nw_lat, nw_lon, zoom)
col_max, row_max = _lat_lon_to_tile(se_lat, se_lon, zoom)
nw_px_x, nw_px_y = _lat_lon_to_px(nw_lat, nw_lon, zoom_level)
se_px_x, se_px_y = _lat_lon_to_px(se_lat, se_lon, zoom_level)
nw_px_x, nw_px_y = _lat_lon_to_px(nw_lat, nw_lon, zoom)
se_px_x, se_px_y = _lat_lon_to_px(se_lat, se_lon, zoom)
out_width = int(se_px_x - nw_px_x)
out_height = int(se_px_y - nw_px_y)
out_width = int(se_px_x - nw_px_x)
out_height = int(se_px_y - nw_px_y)
if out_width <= 0 or out_height <= 0 or out_width > 10000 or out_height > 10000:
logger.warning(f" Image IGN trop grande: {out_width}x{out_height}px — abandon")
return None
if out_width <= 0 or out_height <= 0 or out_width > 10000 or out_height > 10000:
logger.warning(f" Image IGN trop grande: {out_width}x{out_height}px — zoom {zoom} abandon")
continue
total_tiles = (col_max - col_min + 1) * (row_max - row_min + 1)
logger.info(f" Zoom {zoom_level}: {total_tiles} tuiles à télécharger ({out_width}x{out_height}px)")
total_tiles = (col_max - col_min + 1) * (row_max - row_min + 1)
if zoom < zoom_level:
logger.info(f" ↓ Retry zoom {zoom_level}{zoom}: {total_tiles} tuiles ({out_width}x{out_height}px)")
else:
logger.info(f" Zoom {zoom}: {total_tiles} tuiles à télécharger ({out_width}x{out_height}px)")
composite = np.full((out_height, out_width, 3), 255, dtype=np.uint8)
composite = np.full((out_height, out_width, 3), 255, dtype=np.uint8)
tiles_downloaded = 0
fmt = "image/png" if 'PLAN' in layer else "image/jpeg"
tiles_downloaded = 0
tiles_404 = 0
fmt = "image/png" if 'PLAN' in layer else "image/jpeg"
for col in range(col_min, col_max + 1):
for row in range(row_min, row_max + 1):
url = (
f"{wmts_url}?SERVICE=WMTS&VERSION=1.0.0&REQUEST=GetTile"
f"&LAYER={layer}&STYLE=normal"
f"&TILEMATRIXSET={tile_matrix_set}"
f"&TILEMATRIX={zoom_level}&TILECOL={col}&TILEROW={row}"
f"&FORMAT={fmt}"
)
for col in range(col_min, col_max + 1):
for row in range(row_min, row_max + 1):
url = (
f"{wmts_url}?SERVICE=WMTS&VERSION=1.0.0&REQUEST=GetTile"
f"&LAYER={layer}&STYLE=normal"
f"&TILEMATRIXSET={tile_matrix_set}"
f"&TILEMATRIX={zoom}&TILECOL={col}&TILEROW={row}"
f"&FORMAT={fmt}"
)
try:
req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
with urllib.request.urlopen(req, timeout=10) as response:
tile_data = response.read()
tile_img = PILImage.open(io.BytesIO(tile_data)).convert('RGB')
tile_arr = np.array(tile_img)
try:
req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
with urllib.request.urlopen(req, timeout=10) as response:
tile_data = response.read()
tile_img = PILImage.open(io.BytesIO(tile_data)).convert('RGB')
tile_arr = np.array(tile_img)
tile_origin_x = col * tile_size
tile_origin_y = row * tile_size
tile_origin_x = col * tile_size
tile_origin_y = row * tile_size
px_x = int(tile_origin_x - nw_px_x)
px_y = int(tile_origin_y - nw_px_y)
px_x = int(tile_origin_x - nw_px_x)
px_y = int(tile_origin_y - nw_px_y)
x_off = max(0, -px_x)
y_off = max(0, -px_y)
dst_x_start = max(0, px_x)
dst_y_start = max(0, px_y)
dst_x_end = min(out_width, px_x + tile_size)
dst_y_end = min(out_height, px_y + tile_size)
x_off = max(0, -px_x)
y_off = max(0, -px_y)
dst_x_start = max(0, px_x)
dst_y_start = max(0, px_y)
dst_x_end = min(out_width, px_x + tile_size)
dst_y_end = min(out_height, px_y + tile_size)
src_x = x_off
src_y = y_off
src_w = dst_x_end - dst_x_start
src_h = dst_y_end - dst_y_start
src_x = x_off
src_y = y_off
src_w = dst_x_end - dst_x_start
src_h = dst_y_end - dst_y_start
if src_w > 0 and src_h > 0 and tile_arr.shape[0] >= src_y + src_h and tile_arr.shape[1] >= src_x + src_w:
composite[dst_y_start:dst_y_end, dst_x_start:dst_x_end] = \
tile_arr[src_y:src_y+src_h, src_x:src_x+src_w]
tiles_downloaded += 1
if src_w > 0 and src_h > 0 and tile_arr.shape[0] >= src_y + src_h and tile_arr.shape[1] >= src_x + src_w:
composite[dst_y_start:dst_y_end, dst_x_start:dst_x_end] = \
tile_arr[src_y:src_y+src_h, src_x:src_x+src_w]
tiles_downloaded += 1
except Exception as e:
if tiles_downloaded == 0 and col == col_min and row == row_min:
logger.error(f" ✗ Erreur tuile IGN: {e}")
except urllib.error.HTTPError as e:
if e.code == 404:
tiles_404 += 1
# If the very first tile is 404, this zoom level is unavailable
if col == col_min and row == row_min:
logger.info(f" Zoom {zoom} non disponible (404) — essai zoom inférieur")
break
continue
except Exception:
continue
else:
continue
# Only reach here if inner loop broke (first tile 404)
break
logger.info(f"{tiles_downloaded} tuiles IGN téléchargées ({layer})")
if tiles_downloaded == 0:
return None
return composite
if tiles_404 > 0 and tiles_downloaded == 0:
# No tiles at this zoom, try lower
continue
logger.info(f"{tiles_downloaded} tuiles IGN téléchargées ({layer})")
if tiles_downloaded == 0:
continue
return composite
logger.error(" ✗ Aucun zoom disponible pour cette zone")
return None
def generate_ign_overlay(dem_file, basename, vis_dir, resolution, layer, title, legend_label, description, out_suffix):

View File

@ -12,6 +12,7 @@ import multiprocessing
import shutil
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
import subprocess
@ -55,16 +56,16 @@ _file_filter = FilePrefixFilter()
from .dtm import classify_ground, create_dtm_fast
from .visualizations import (
SharedDEM,
generate_hillshade, generate_slope, generate_aspect, generate_curvature,
generate_lrm, generate_svf, generate_openness,
generate_mslrm, generate_tpi, generate_sailore,
generate_roughness, generate_anomalies, generate_wavelet,
generate_flow,
generate_flow, generate_local_dominance,
)
from .gpu import gpu_cleanup
from .ign import generate_ign_overlay
from .rendering import tif_to_png, generate_pdf_report, convert_to_cog, generate_cog_metadata
from .viewer import generate_viewer
from .rendering import tif_to_png, generate_pdf_report
# Ordered list of visualization steps.
@ -77,8 +78,8 @@ VIZ_STEPS = [
('curvature', generate_curvature),
('svf', generate_svf),
('lrm', generate_lrm),
('pos_open', lambda d, b, v, r: generate_openness(d, b, v, r, positive=True)),
('neg_open', lambda d, b, v, r: generate_openness(d, b, v, r, positive=False)),
('pos_open', lambda d, b, v, r, shared=None: generate_openness(d, b, v, r, positive=True, shared=shared)),
('neg_open', lambda d, b, v, r, shared=None: generate_openness(d, b, v, r, positive=False, shared=shared)),
('mslrm', generate_mslrm),
('tpi', generate_tpi),
('sailore', generate_sailore),
@ -86,6 +87,7 @@ VIZ_STEPS = [
('anomalies', generate_anomalies),
('wavelet', generate_wavelet),
('flow', generate_flow),
('local_dominance', generate_local_dominance),
('ortho', lambda d, b, v, r: generate_ign_overlay(
d, b, v, r,
layer='ORTHOIMAGERY.ORTHOPHOTOS',
@ -106,7 +108,7 @@ VIZ_STEPS = [
class LidarArchaeoPipeline:
"""Orchestrates the LiDAR archaeological analysis pipeline."""
def __init__(self, input_dir, output_dir, resolution=0.5, workers=1, force=False, ground_method='auto', force_classify=False, keep_tif=False, no_viewer=False):
def __init__(self, input_dir, output_dir, resolution=0.5, workers=1, force=False, ground_method='auto', force_classify=False, keep_tif=False):
self.input_dir = Path(input_dir)
self.output_dir = Path(output_dir)
self.resolution = resolution
@ -115,7 +117,6 @@ class LidarArchaeoPipeline:
self.ground_method = ground_method
self.force_classify = force_classify
self.keep_tif = keep_tif
self.no_viewer = no_viewer
self.temp_dir = self.output_dir / "temp"
if not self.input_dir.exists():
@ -140,7 +141,6 @@ class LidarArchaeoPipeline:
logger.info(f" Classification sol : {self.ground_method}")
logger.info(f" Force classif.: {'OUI' if self.force_classify else 'non'}")
logger.info(f" Keep TIFF : {'OUI' if self.keep_tif else 'non'}")
logger.info(f" Viewer web : {'non' if self.no_viewer else 'OUI'}")
def find_laz_files(self):
"""Find all LAZ/LAS files in input directory."""
@ -162,20 +162,28 @@ class LidarArchaeoPipeline:
return False
return True
def generate_all_visualizations(self, dtm_file, basename):
def generate_all_visualizations(self, dtm_file, basename, resolution=None):
"""Generate all archaeological visualizations for one DTM file.
Returns a dict of {name: tif_path} for successful generations.
Args:
resolution: Actual resolution from DTM geotransform. If None, uses self.resolution.
"""
if resolution is None:
resolution = self.resolution
logger.info(" Génération visualisations:")
# Create per-file subdirectory
file_vis_dir = self.vis_dir / basename
file_vis_dir.mkdir(exist_ok=True)
# Pre-compute shared DEM data (gradient, NaN mask, LRM) once for all visualizations
logger.info(" Pré-calcul données partagées (gradient, LRM)...")
t_shared = time.time()
shared = SharedDEM(dtm_file, resolution)
logger.info(f" ✓ Données partagées prêtes ({time.time()-t_shared:.1f}s)")
vis_results = {}
total = len(VIZ_STEPS)
elapsed_times = []
for idx, (name, func) in enumerate(VIZ_STEPS, 1):
# When --force, delete existing TIF to ensure clean regeneration
@ -216,17 +224,15 @@ class LidarArchaeoPipeline:
logger.info(f" [{idx}/{total}] {name}...")
t0 = time.time()
try:
result = func(dtm_file, basename, file_vis_dir, self.resolution)
# IGN overlays don't use SharedDEM (they download external data)
if name in ('ortho', 'topo'):
result = func(dtm_file, basename, file_vis_dir, resolution)
else:
result = func(dtm_file, basename, file_vis_dir, resolution, shared=shared)
vis_results[name] = result
elapsed = time.time() - t0
elapsed_times.append(elapsed)
if result:
eta = ""
if len(elapsed_times) > 1:
avg_time = sum(elapsed_times) / len(elapsed_times)
remaining = (total - idx) * avg_time
eta = f" — ETA: {remaining:.0f}s"
logger.info(f" [{idx}/{total}] ✓ {name} ({elapsed:.1f}s){eta}")
logger.info(f" [{idx}/{total}] ✓ {name} ({elapsed:.1f}s)")
else:
logger.warning(f" [{idx}/{total}] ✗ {name} — no output ({elapsed:.1f}s)")
except Exception as e:
@ -237,20 +243,23 @@ class LidarArchaeoPipeline:
gpu_cleanup()
# Convert to WebP (only newly generated TIFs, not skipped ones)
# Also generate COGs for web viewer if enabled
logger.info(" Conversion images WebP:")
cog_dir = file_vis_dir / "cog"
if not self.no_viewer:
cog_dir.mkdir(exist_ok=True)
source_info = {
'method': self.ground_method,
'date': datetime.now().strftime('%Y-%m-%d'),
'basename': basename,
}
for name, tif_file in vis_results.items():
if tif_file and isinstance(tif_file, Path) and tif_file.suffix == '.tif' and tif_file.exists():
# Generate COG before WebP conversion (which may delete the TIF)
if not self.no_viewer:
convert_to_cog(tif_file, cog_dir)
webp_file = tif_to_png(tif_file, file_vis_dir, self.resolution, keep_tif=self.keep_tif or not self.no_viewer)
webp_file = tif_to_png(tif_file, file_vis_dir, resolution, keep_tif=self.keep_tif, source_info=source_info)
if webp_file:
logger.info(f"{webp_file.name}")
# Clean up remaining TIF files unless --keep-tif
if not self.keep_tif:
for tif in file_vis_dir.glob("*.tif"):
tif.unlink(missing_ok=True)
return vis_results
def process_file(self, laz_file):
@ -263,60 +272,75 @@ class LidarArchaeoPipeline:
logger.info(f"FICHIER : {basename}")
logger.info("=" * 60)
# Step 1: Ground classification
logger.info("[1/6] Classification du sol...")
t1 = time.time()
las_file = classify_ground(laz_file, self.temp_dir, method=self.ground_method, force=self.force_classify)
t_classif = time.time() - t1
if not las_file:
logger.error(f" ✗ Échec classification ({t_classif:.1f}s)")
# Validate file integrity before any processing
from .dtm import validate_laz
if not validate_laz(laz_file):
return False
logger.info(f" ✓ Classification terminée ({t_classif:.1f}s)")
# Step 2: Generate DTM
logger.info("[2/6] Génération DTM...")
t2 = time.time()
dtm_file = create_dtm_fast(las_file, basename, self.dtm_dir, self.resolution)
t_dtm = time.time() - t2
if not dtm_file:
logger.error(f" ✗ Échec DTM ({t_dtm:.1f}s)")
return False
logger.info(f" ✓ DTM terminé ({t_dtm:.1f}s)")
# Skip ground classification + DTM if DTM already exists with matching resolution
# --force only affects visualizations/PDF, not classification/DTM
# Use --force-classification to force reclassification
dtm_path = self.dtm_dir / f"{basename}_dtm.tif"
if dtm_path.exists():
# Check that existing DTM resolution matches requested resolution
import rasterio
try:
with rasterio.open(dtm_path) as src:
existing_res = abs(src.transform.a)
if abs(existing_res - self.resolution) > 0.01:
logger.info(f"[1/5] DTM existant à {existing_res}m/px — résolution demandée {self.resolution}m/px → régénération")
dtm_path.unlink()
else:
logger.info(f"[1/5] Classification du sol — sautée (DTM existant à {existing_res}m/px)")
logger.info("[2/5] Génération DTM — sautée (DTM existant)")
dtm_file = dtm_path
t_classif = 0
t_dtm = 0
except Exception:
logger.warning(f"Impossible de lire le DTM existant — régénération")
dtm_path.unlink()
# Step 3: Visualizations
logger.info("[3/6] Visualisations archéologiques...")
self.generate_all_visualizations(dtm_file, basename)
if not dtm_path.exists():
# Step 1: Ground classification
logger.info("[1/5] Classification du sol...")
t1 = time.time()
las_file = classify_ground(laz_file, self.temp_dir, method=self.ground_method, force=self.force_classify)
t_classif = time.time() - t1
if not las_file:
logger.error(f" ✗ Échec classification ({t_classif:.1f}s)")
return False
logger.info(f" ✓ Classification terminée ({t_classif:.1f}s)")
# Step 2: Generate DTM
logger.info("[2/5] Génération DTM...")
t2 = time.time()
dtm_file = create_dtm_fast(las_file, basename, self.dtm_dir, self.resolution)
t_dtm = time.time() - t2
if not dtm_file:
logger.error(f" ✗ Échec DTM ({t_dtm:.1f}s)")
return False
logger.info(f" ✓ DTM terminé ({t_dtm:.1f}s)")
# Step 3: Visualizations — use actual resolution from DTM
import rasterio
with rasterio.open(dtm_file) as src:
actual_res = abs(src.transform.a)
if abs(actual_res - self.resolution) > 0.01:
logger.info(f" Résolution DTM: {actual_res}m/px (demandée: {self.resolution}m/px)")
self.generate_all_visualizations(dtm_file, basename, actual_res)
# Step 4: PDF report
t_pdf = 0
file_vis_dir = self.vis_dir / basename
logger.info("[4/6] Rapport PDF A3...")
t4 = time.time()
generate_pdf_report(basename, file_vis_dir, self.pdf_dir, self.resolution)
t_pdf = time.time() - t4
logger.info(f" ✓ Rapport PDF terminé ({t_pdf:.1f}s)")
# Step 5: COGs for web viewer
logger.info("[5/6] Génération métadonnées viewer web...")
t5 = time.time()
if not self.no_viewer:
# Convert DTM to COG as well
dtm_cog_dir = self.dtm_dir / "cog"
dtm_cog_dir.mkdir(exist_ok=True)
for dtm_file in sorted(self.dtm_dir.glob(f"{basename}_dtm.tif")):
convert_to_cog(dtm_file, dtm_cog_dir)
generate_cog_metadata(self.vis_dir, basename)
t_cog = time.time() - t5
logger.info(f" ✓ Métadonnées viewer web terminées ({t_cog:.1f}s)")
# Step 6: Web viewer
if not self.no_viewer:
logger.info("[6/6] Génération viewer web...")
t6 = time.time()
generate_viewer(basename, file_vis_dir, self.vis_dir)
t_viewer = time.time() - t6
logger.info(f" ✓ Viewer web terminé ({t_viewer:.1f}s)")
pdf_file = self.pdf_dir / f"{basename}_rapport.pdf"
if pdf_file.exists() and not self.force:
logger.info(f"[4/5] Rapport PDF déjà existant — ignoré: {pdf_file.name}")
else:
logger.info("[6/6] Viewer web: ignoré (--no-viewer)")
logger.info("[4/5] Rapport PDF A3...")
t4 = time.time()
generate_pdf_report(basename, file_vis_dir, self.pdf_dir, actual_res)
t_pdf = time.time() - t4
logger.info(f" ✓ Rapport PDF terminé ({t_pdf:.1f}s)")
t_total = time.time() - t_start
logger.info(f"{basename} terminé en {t_total:.1f}s")
@ -349,7 +373,7 @@ class LidarArchaeoPipeline:
logger.info(f"Fichiers: {len(files)}")
with ProcessPoolExecutor(max_workers=self.workers) as executor:
future_to_file = {
executor.submit(_process_file_standalone, str(laz_file), str(self.input_dir), str(self.output_dir), self.resolution, self.force, self.ground_method, self.force_classify, self.keep_tif, self.no_viewer): laz_file
executor.submit(_process_file_standalone, str(laz_file), str(self.input_dir), str(self.output_dir), self.resolution, self.force, self.ground_method, self.force_classify, self.keep_tif): laz_file
for laz_file in files
}
done = 0
@ -411,7 +435,7 @@ class LidarArchaeoPipeline:
logger.warning(f" Note: Impossible de supprimer les fichiers temporaires: {e}")
def _process_file_standalone(laz_file_str, input_dir, output_dir, resolution, force=False, ground_method='auto', force_classify=False, keep_tif=False, no_viewer=False):
def _process_file_standalone(laz_file_str, input_dir, output_dir, resolution, force=False, ground_method='auto', force_classify=False, keep_tif=False):
"""Standalone function for multiprocessing — creates its own pipeline instance.
Each worker gets its own temp directory to avoid file conflicts.
@ -419,6 +443,11 @@ def _process_file_standalone(laz_file_str, input_dir, output_dir, resolution, fo
# Configure logging in worker process (spawn doesn't inherit parent config)
import logging
import sys
# Ensure UTF-8 output — spawn workers may default to ASCII
if hasattr(sys.stdout, 'reconfigure'):
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
if hasattr(sys.stderr, 'reconfigure'):
sys.stderr.reconfigure(encoding='utf-8', errors='replace')
worker_logger = logging.getLogger("lidar")
if not worker_logger.handlers:
handler = logging.StreamHandler(sys.stdout)
@ -427,7 +456,7 @@ def _process_file_standalone(laz_file_str, input_dir, output_dir, resolution, fo
worker_logger.addHandler(handler)
worker_logger.addFilter(_file_filter)
pipeline = LidarArchaeoPipeline(input_dir, output_dir, resolution=resolution, workers=1, force=force, ground_method=ground_method, force_classify=force_classify, keep_tif=keep_tif, no_viewer=no_viewer)
pipeline = LidarArchaeoPipeline(input_dir, output_dir, resolution=resolution, workers=1, force=force, ground_method=ground_method, force_classify=force_classify, keep_tif=keep_tif)
basename = _file_basename(laz_file_str)
pipeline.temp_dir = pipeline.output_dir / "temp" / basename
pipeline.temp_dir.mkdir(exist_ok=True)

View File

@ -8,6 +8,7 @@ Contains:
import logging
import time
from datetime import datetime
from pathlib import Path
import numpy as np
@ -155,6 +156,14 @@ COLORMAPS = {
'vmin_mode': 'fixed', 'vmin_val': 0,
'vmax_mode': 'percentile', 'vmax_pct': 98,
},
'local_dominance': {
'cmap': 'RdYlBu_r',
'title': 'Dominance Locale (position relative dans le voisinage)',
'legend': 'Proportion du voisinage sous le point central\nRouge = Point dominant (sommet, crête)\nBleu = Point encaissé (fossé, vallée)\nRayon: 15m',
'description': 'Mesure la saillie locale — complémentaire de l\'openness',
'vmin_mode': 'percentile', 'vmin_pct': 2,
'vmax_mode': 'percentile', 'vmax_pct': 98,
},
}
# RGB entries (ortho/topo) are handled specially
@ -238,7 +247,31 @@ def _apply_colormap(data, tif_file):
return data, 'terrain', title, 'Altitude normalisée', '', False
def tif_to_png(tif_file, vis_dir, resolution, keep_tif=False):
def _nice_scale(extent_m):
"""Choose a nice round scale distance that fits well in the image.
Returns (scale_m, label) where label is like '100 m' or '500 m' or '1 km'.
"""
nice_scales = [50, 100, 200, 500, 1000, 2000, 5000, 10000]
# Pick the largest scale that is <= 15% of the extent
for s in nice_scales:
if s <= extent_m * 0.15:
continue
# s is the first one > 15% — take the one below
break
else:
s = nice_scales[-1]
# Actually pick the largest scale <= 20% of extent
chosen = nice_scales[0]
for s in nice_scales:
if s <= extent_m * 0.20:
chosen = s
if chosen >= 1000:
return chosen, f"{chosen // 1000} km"
return chosen, f"{chosen} m"
def tif_to_png(tif_file, vis_dir, resolution, keep_tif=False, source_info=None):
"""Convert GeoTIFF to visualization WebP with GPS coordinates, legend, and scale bar.
Args:
@ -257,7 +290,7 @@ def tif_to_png(tif_file, vis_dir, resolution, keep_tif=False):
try:
with rasterio.open(tif_file) as src:
is_rgb = src.count >= 3 and ('ortho' in str(tif_file) or 'topo' in str(tif_file))
is_rgb = src.count >= 3 and any(k in str(tif_file) for k in ('ortho', 'topo'))
if is_rgb:
data = src.read([1, 2, 3])
@ -362,9 +395,9 @@ def tif_to_png(tif_file, vis_dir, resolution, keep_tif=False):
# Fixed data area position — identical for ALL visualization types
# This ensures overlay/superposition works across all WebP images
data_left = 0.08
data_bottom = 0.12
data_bottom = 0.19
data_width_frac = 0.74
data_height_frac = 0.78
data_height_frac = 0.71
ax = fig.add_axes([data_left, data_bottom, data_width_frac, data_height_frac])
if is_rgba or is_rgb:
@ -440,20 +473,32 @@ def tif_to_png(tif_file, vis_dir, resolution, keep_tif=False):
spine.set_color('black')
spine.set_linewidth(0.8)
# North arrow
north_ax = inset_axes(ax, width="4%", height="7%", loc='upper right',
bbox_to_anchor=(-0.03, 0.12, 1, 1), bbox_transform=ax.transAxes)
north_ax.set_xlim(0, 1)
north_ax.set_ylim(0, 1)
# North arrow — compass rose style
north_ax = inset_axes(ax, width="5%", height="9%", loc='upper right',
bbox_to_anchor=(-0.03, 0.08, 1, 1), bbox_transform=ax.transAxes)
north_ax.set_xlim(-1.2, 1.2)
north_ax.set_ylim(-0.5, 1.5)
north_ax.axis('off')
north_ax.plot([0.5, 0.5], [0.1, 0.65], color='black', linewidth=2.5, zorder=10)
north_ax.add_patch(MplPolygon([[0.5, 0.2], [0.35, 0.4], [0.5, 0.65], [0.65, 0.4]],
closed=True, facecolor='black', edgecolor='black', zorder=9))
north_ax.text(0.5, 0.95, 'N', ha='center', va='top',
fontsize=13, fontweight='bold', color='black', zorder=11)
north_ax.set_aspect('equal')
# N arrow
north_ax.annotate('N', xy=(0, 1.3), fontsize=11, fontweight='bold',
ha='center', va='bottom', color='#b22222')
north_ax.plot([0, 0], [0.0, 1.0], color='#b22222', linewidth=2.0, zorder=10)
north_ax.add_patch(MplPolygon([[0, 0.3], [-0.2, 0.7], [0, 1.0], [0.2, 0.7]],
closed=True, facecolor='#b22222', edgecolor='#b22222', zorder=9))
# Cardinal ticks
for angle, label in [(90, ''), (0, 'E'), (180, 'O'), (270, 'S')]:
rad = np.radians(angle)
r_text = 1.25
north_ax.plot([0.85*np.cos(rad), 1.05*np.cos(rad)],
[0.85*np.sin(rad), 1.05*np.sin(rad)],
color='#555555', linewidth=0.8, zorder=5)
if label:
north_ax.text(r_text*np.cos(rad), r_text*np.sin(rad), label,
fontsize=7, ha='center', va='center', color='#555555')
# Bottom info bar — fixed position
info_ax = fig.add_axes([data_left, 0.02, data_width_frac + cbar_width + 0.02, 0.07])
# Bottom info bar — enriched with source, method, date
info_ax = fig.add_axes([data_left, 0.015, data_width_frac + cbar_width + 0.02, 0.09])
info_ax.axis('off')
extent_km_x = (max_x - min_x) / 1000
@ -465,42 +510,73 @@ def tif_to_png(tif_file, vis_dir, resolution, keep_tif=False):
alt_min = float(np.nanmin(valid_data)) if len(valid_data) > 0 else 0
alt_max = float(np.nanmax(valid_data)) if len(valid_data) > 0 else 0
# Build info lines
line1_parts = []
if gps_coords:
nw_lat, nw_lon = gps_coords['NW']
se_lat, se_lon = gps_coords['SE']
info_text = (
f"GPS: {nw_lat:.5f}N {nw_lon:.5f}E - {se_lat:.5f}N {se_lon:.5f}E | "
f"EPSG:2154 | Res: {resolution}m/px | "
f"Emprise: {extent_km_x:.1f}x{extent_km_y:.1f}km"
)
line1_parts.append(f"GPS: {nw_lat:.5f}°N {nw_lon:.5f}°E — {se_lat:.5f}°N {se_lon:.5f}°E")
else:
info_text = (
f"EPSG:2154 | X: {min_x:.0f}-{max_x:.0f} Y: {min_y:.0f}-{max_y:.0f} | "
f"Res: {resolution}m/px | Emprise: {extent_km_x:.1f}x{extent_km_y:.1f}km"
)
line1_parts.append(f"X: {min_x:.0f}{max_x:.0f} Y: {min_y:.0f}{max_y:.0f}")
line1_parts.append(f"EPSG:2154")
line1_parts.append(f"Res: {resolution}m/px")
line1_parts.append(f"Emprise: {extent_km_x:.1f}×{extent_km_y:.1f}km")
if not is_rgb:
line1_parts.append(f"Alt: {alt_min:.1f}{alt_max:.1f}m")
info_ax.text(0.01, 0.5, info_text,
transform=info_ax.transAxes, fontsize=8.5,
line2_parts = []
line2_parts.append("Source: LiDAR HD IGN")
if source_info:
if source_info.get('method'):
line2_parts.append(f"Classif.: {source_info['method'].upper()}")
if source_info.get('date'):
line2_parts.append(f"Date: {source_info['date']}")
else:
line2_parts.append(datetime.now().strftime("Date: %Y-%m-%d"))
info_text_line1 = " | ".join(line1_parts)
info_text_line2 = " | ".join(line2_parts)
info_ax.text(0.01, 0.7, info_text_line1,
transform=info_ax.transAxes, fontsize=8,
verticalalignment='center', family='monospace',
bbox=dict(boxstyle='round,pad=0.3', facecolor='#f0f0f0',
bbox=dict(boxstyle='round,pad=0.2', facecolor='#f0f0f0',
edgecolor='#aaaaaa', alpha=0.95))
info_ax.text(0.01, 0.2, info_text_line2,
transform=info_ax.transAxes, fontsize=7.5,
verticalalignment='center', family='monospace',
color='#444444',
bbox=dict(boxstyle='round,pad=0.2', facecolor='#f8f8f8',
edgecolor='#cccccc', alpha=0.9))
# Scale bar
scale_m = 100
# Scale bar — adaptive with alternating black/white segments
extent_m_x = max_x - min_x
scale_m, scale_label = _nice_scale(extent_m_x)
pixels_per_meter = 1.0 / pixel_size_x
scale_px = int(scale_m * pixels_per_meter)
scale_bar_frac = scale_px / width
bar_left = 0.80
bar_bottom = 0.15
bar_width_frac = min(scale_bar_frac, 0.15)
bar_height = 0.35
n_segments = 5
segment_px = scale_px / n_segments
bar_bottom_y = 0.55
bar_top_y = 0.85
bar_height = bar_top_y - bar_bottom_y
scale_start_x = 0.80
info_ax.add_patch(RectPatch((bar_left, bar_bottom), bar_width_frac, bar_height,
facecolor='black', edgecolor='black', linewidth=0.5,
transform=info_ax.transAxes, clip_on=False))
info_ax.text(bar_left + bar_width_frac / 2, bar_bottom + bar_height + 0.12,
f"{scale_m} m", ha='center', va='bottom', fontsize=9, fontweight='bold',
transform=info_ax.transAxes)
for seg_i in range(n_segments):
color = 'black' if seg_i % 2 == 0 else 'white'
seg_left = scale_start_x + seg_i * segment_px / width
seg_width_frac = segment_px / width
info_ax.add_patch(RectPatch((seg_left, bar_bottom_y), seg_width_frac, bar_height,
facecolor=color, edgecolor='black', linewidth=0.5,
transform=info_ax.transAxes, clip_on=False))
info_ax.text(scale_start_x + scale_px / (2 * width), bar_top_y + 0.12,
f"{scale_label}", ha='center', va='bottom', fontsize=8, fontweight='bold',
transform=info_ax.transAxes)
# Scale end ticks
info_ax.plot([scale_start_x, scale_start_x], [bar_bottom_y - 0.05, bar_top_y + 0.05],
color='black', linewidth=1, transform=info_ax.transAxes, clip_on=False)
info_ax.plot([scale_start_x + scale_px / width, scale_start_x + scale_px / width],
[bar_bottom_y - 0.05, bar_top_y + 0.05],
color='black', linewidth=1, transform=info_ax.transAxes, clip_on=False)
fig.patch.set_facecolor('white')
@ -527,145 +603,6 @@ def tif_to_png(tif_file, vis_dir, resolution, keep_tif=False):
return None
def convert_to_cog(tif_file, cog_dir):
"""Convert a GeoTIFF to Cloud Optimized GeoTIFF for web map serving.
Args:
tif_file: Path to input GeoTIFF.
cog_dir: Directory for output COG file.
Returns:
Path to output COG file, or None on failure.
"""
cog_file = cog_dir / tif_file.name
if cog_file.exists():
logger.debug(f" COG déjà existant: {cog_file.name}")
return cog_file
try:
from rio_cogeo.cogeo import cog_translate
from rio_cogeo.profiles import cog_profiles
with rasterio.open(tif_file) as src:
is_rgb = src.count >= 3
# Use deflate compression profile
profile = dict(cog_profiles["deflate"]) # Make a mutable copy
if not is_rgb:
# Single-band terrain data: keep float32 for continuous values
profile.update(dtype="float32")
cog_translate(str(tif_file), str(cog_file), profile, quiet=True)
logger.debug(f" COG créé: {cog_file.name}")
return cog_file
except ImportError:
logger.warning(" rio-cogeo non disponible — COG non généré")
return None
except Exception as e:
logger.warning(f" Erreur conversion COG: {e}")
return None
def generate_cog_metadata(vis_dir, basename):
"""Generate metadata JSON for all visualization layers.
Scans the COG directory for files and reads their geographic bounds.
Creates a metadata.json with WGS84 bounds and layer info for the web viewer.
Args:
vis_dir: Directory containing COG files (vis_dir/basename/cog/).
basename: Base name for the LiDAR tile.
Returns:
Path to metadata.json, or None on failure.
"""
import json
cog_dir = vis_dir / basename / "cog"
if not cog_dir.exists():
return None
# Find the DTM to get geographic bounds
# The COG files inherit bounds from the DTM, so we can read any COG
cog_files = sorted(cog_dir.glob("*.tif"))
if not cog_files:
return None
# Read bounds from first COG file
try:
ref_cog = cog_files[0]
with rasterio.open(ref_cog) as src:
bounds = src.bounds
crs = src.crs
if crs and HAS_WARP:
l93_xs = [bounds.left, bounds.right]
l93_ys = [bounds.top, bounds.bottom]
lons, lats = warp_transform(crs, 'EPSG:4326', l93_xs, l93_ys)
bounds_wgs84 = {
'west': float(min(lons)),
'south': float(min(lats)),
'east': float(max(lons)),
'north': float(max(lats)),
}
else:
# Fallback: use Lambert 93 bounds directly
bounds_wgs84 = {
'west': float(bounds.left),
'south': float(bounds.bottom),
'east': float(bounds.right),
'north': float(bounds.top),
}
bounds_l93 = {
'min_x': float(bounds.left),
'min_y': float(bounds.bottom),
'max_x': float(bounds.right),
'max_y': float(bounds.top),
}
resolution = float(abs(src.transform.a))
except Exception as e:
logger.warning(f" Erreur lecture bounds COG: {e}")
return None
# Build layer list with titles
layers = []
for cog_file in cog_files:
name = cog_file.stem.replace(basename + "_", "")
# Find matching title from COLORMAPS or RGB_LEGENDS
title = name.replace("_", " ").title()
for key in sorted(COLORMAPS.keys(), key=len, reverse=True):
if key in name:
title = COLORMAPS[key]['title']
break
for key in RGB_LEGENDS:
if key in name:
title = RGB_LEGENDS[key]['title']
break
layers.append({
'name': name,
'title': title,
'file': cog_file.name,
})
metadata = {
'basename': basename,
'bounds_l93': bounds_l93,
'bounds_wgs84': bounds_wgs84,
'resolution': resolution,
'layers': layers,
}
metadata_file = vis_dir / basename / "metadata.json"
with open(metadata_file, 'w') as f:
json.dump(metadata, f, indent=2, ensure_ascii=False)
logger.info(f" Métadonnées: {len(layers)} couches, bounds={bounds_wgs84}")
return metadata_file
def generate_pdf_report(basename, vis_dir, pdf_dir, resolution):
"""Generate A3 PDF report for a LiDAR file with all visualizations.

View File

@ -1,117 +0,0 @@
"""TiTiler-based web server for serving LiDAR COG visualizations.
Provides:
- COG tile serving via TiTiler API
- Static file serving for viewer HTML
- CORS headers for local development
Usage:
python -m lidar_pipeline.server /path/to/output [--port 8000]
"""
import logging
import sys
from pathlib import Path
logger = logging.getLogger("lidar")
def create_app(output_dir):
"""Create the FastAPI application with TiTiler and static file serving.
Args:
output_dir: Path to the output directory containing visualisations/ and viewer/.
Returns:
FastAPI application instance.
"""
from fastapi import FastAPI
from fastapi.responses import HTMLResponse, FileResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from titiler.core.factory import TilerFactory
output_dir = Path(output_dir).resolve()
# TiTiler COG endpoint
cog = TilerFactory(router_prefix="/cog")
app = FastAPI(title="LiDAR Archéologique", docs_url="/docs")
# CORS for local development
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
# Mount TiTiler routes
app.include_router(cog.router, prefix="/cog")
@app.get("/")
async def index():
"""Landing page with links."""
return HTMLResponse(
'<html><head><title>LiDAR Server</title>'
'<style>body{font-family:sans-serif;max-width:600px;margin:40px auto;padding:0 20px}'
'h1{color:#1a1a2e}a{color:#4fc3f7}</style></head>'
'<body>'
'<h1>Serveur LiDAR Archéologique</h1>'
'<p><a href="/viewer">Visualisations</a></p>'
'<p>TiTiler API: <a href="/cog/">/cog/</a></p>'
'</body></html>'
)
@app.get("/viewer/{basename}")
@app.get("/viewer")
async def serve_viewer(basename: str = ""):
"""Serve viewer HTML files."""
if not basename:
# List available viewers
viewer_dir = output_dir / 'visualisations' / 'viewer'
if viewer_dir.exists():
viewers = sorted(viewer_dir.glob('*.html'))
if viewers:
links = ''.join(
f'<li><a href="/viewer/{f.stem}">{f.stem}</a></li>'
for f in viewers
)
return HTMLResponse(
f'<html><head><title>LiDAR Viewer</title>'
f'<style>body{{font-family:sans-serif;max-width:600px;margin:40px auto;padding:0 20px}}'
f'h1{{color:#1a1a2e}}li{{margin:8px 0}}a{{color:#4fc3f7}}</style></head>'
f'<body><h1>Visualisations LiDAR</h1><ul>{links}</ul></body></html>'
)
return HTMLResponse('<html><body><p>Aucun viewer disponible</p></body></html>')
viewer_file = output_dir / 'visualisations' / 'viewer' / f"{basename}.html"
if viewer_file.exists():
return FileResponse(str(viewer_file), media_type='text/html')
return JSONResponse({'error': f'Viewer not found: {basename}'}, status_code=404)
return app
def main():
"""Entry point for the LiDAR web server."""
import argparse
import uvicorn
parser = argparse.ArgumentParser(description='Serveur cartographique LiDAR')
parser.add_argument('output_dir', help='Répertoire de sortie contenant les visualisations')
parser.add_argument('--host', default='0.0.0.0', help='Hôte (défaut: 0.0.0.0)')
parser.add_argument('--port', type=int, default=8000, help='Port (défaut: 8000)')
args = parser.parse_args()
output_dir = Path(args.output_dir)
if not output_dir.exists():
logger.error(f"Répertoire introuvable: {output_dir}")
sys.exit(1)
app = create_app(output_dir)
print(f"Serveur LiDAR Archéologique")
print(f" Viewer: http://{args.host}:{args.port}/viewer")
print(f" TiTiler: http://{args.host}:{args.port}/cog/")
print(f" Répertoire: {output_dir}")
uvicorn.run(app, host=args.host, port=args.port, log_level='info')
if __name__ == '__main__':
main()

View File

@ -198,4 +198,35 @@ class TestFlow:
data = src.read(1)
# log1p(x) >= 0 for x >= 0
valid = data[~np.isnan(data)]
assert np.nanmin(valid) >= 0
assert np.nanmin(valid) >= 0
class TestLocalDominance:
def test_generates_tif(self, synthetic_dem, tmp_output_dir):
from lidar_pipeline.visualizations import generate_local_dominance
result = generate_local_dominance(synthetic_dem, "test", tmp_output_dir, 5.0)
assert result is not None
assert result.exists()
assert result.suffix == ".tif"
def test_dominance_values_0_1(self, synthetic_dem, tmp_output_dir):
import rasterio
from lidar_pipeline.visualizations import generate_local_dominance
result = generate_local_dominance(synthetic_dem, "test", tmp_output_dir, 5.0)
with rasterio.open(result) as src:
data = src.read(1)
valid = data[~np.isnan(data)]
assert np.nanmin(valid) >= 0, "Local dominance should be >= 0"
assert np.nanmax(valid) <= 1, "Local dominance should be <= 1"
def test_dominance_nan_mask_preserved(self, synthetic_dem, tmp_output_dir):
"""Check that NaN zones from original DEM are preserved."""
import rasterio
from lidar_pipeline.visualizations import generate_local_dominance
result = generate_local_dominance(synthetic_dem, "test", tmp_output_dir, 5.0)
# The synthetic DEM has no NaN, so this just verifies the output is valid
with rasterio.open(result) as src:
data = src.read(1)
# Shape should match input
assert data.shape[0] > 0
assert data.shape[1] > 0

View File

@ -1,416 +0,0 @@
"""Web map viewer generator for LiDAR visualizations.
Generates a self-contained HTML file with MapLibre GL JS that displays
all visualization layers with opacity controls and IGN/OSM basemaps.
Served by TiTiler for COG tile access.
"""
import json
import logging
from pathlib import Path
logger = logging.getLogger("lidar")
# Layer ordering for the viewer panel (archaeological priority)
LAYER_ORDER = [
'hillshade_multi',
'slope',
'aspect',
'curvature',
'svf',
'lrm',
'positive_openness',
'negative_openness',
'mslrm',
'tpi',
'sailore',
'roughness',
'anomalies',
'wavelet',
'flow',
'ortho',
'topo',
]
# Colormaps for TiTiler rendering of single-band COGs
LAYER_COLORMAPS = {
'hillshade_multi': 'gray',
'slope': 'inferno',
'aspect': 'turbo',
'curvature': 'RdYlBu_r',
'svf': 'viridis',
'lrm': 'RdBu_r',
'positive_openness': 'YlOrBr',
'negative_openness': 'GnBu_r',
'mslrm': 'RdBu_r',
'tpi': 'BrBG',
'sailore': 'seismic',
'roughness': 'magma',
'anomalies': 'coolwarm',
'wavelet': 'cividis',
'flow': 'Blues',
}
def generate_viewer(basename, vis_dir, output_vis_dir, titiler_url='http://localhost:8000'):
"""Generate a MapLibre GL JS viewer HTML file for the LiDAR tile.
Args:
basename: Base name for the LiDAR tile.
vis_dir: Per-file visualization directory (vis_dir/basename/).
output_vis_dir: Parent visualization directory for viewer output.
titiler_url: Base URL of the TiTiler server.
Returns:
Path to the generated HTML file, or None on failure.
"""
# Read metadata.json
metadata_file = vis_dir / "metadata.json"
if not metadata_file.exists():
logger.error(f" Métadonnées manquantes: {metadata_file}")
return None
with open(metadata_file) as f:
metadata = json.load(f)
bounds_wgs84 = metadata['bounds_wgs84']
resolution = metadata.get('resolution', 0.5)
# Determine which files are RGB (ortho/topo)
layers = []
for layer in metadata['layers']:
is_rgb = 'ortho' in layer['name'] or 'topo' in layer['name']
layers.append({
'name': layer['name'],
'title': layer['title'],
'file': layer['file'],
'is_rgb': is_rgb,
})
# Sort layers by archaeological priority
def layer_sort_key(l):
name = l['name']
for i, key in enumerate(LAYER_ORDER):
if key in name:
return i
return len(LAYER_ORDER)
layers.sort(key=layer_sort_key)
# COG path prefix for TiTiler (absolute path inside Docker container)
cog_path_prefix = f'/data/output/visualisations/{basename}/cog'
# Build layer controls HTML
layer_controls = []
for layer in layers:
checked = 'checked' if layer['name'] == 'hillshade_multi' else ''
initial_opacity = 100 if layer['name'] == 'hillshade_multi' else 0
layer_type = 'RGB' if layer['is_rgb'] else 'DEM'
layer_controls.append(
f' <div class="layer-control">\n'
f' <label>\n'
f' <input type="checkbox" data-layer="{layer["name"]}" {checked}>\n'
f' <span class="layer-title">{layer["title"]}</span>\n'
f' <span class="layer-type">{layer_type}</span>\n'
f' </label>\n'
f' <input type="range" class="opacity-slider" data-layer="{layer["name"]}" '
f'min="0" max="100" value="{initial_opacity}">\n'
f' </div>'
)
layer_controls_html = '\n'.join(layer_controls)
# Serialize data for JS
bounds_json = json.dumps(bounds_wgs84)
layers_json = json.dumps(layers, ensure_ascii=False)
colormaps_json = json.dumps(LAYER_COLORMAPS)
center_lng = (bounds_wgs84['west'] + bounds_wgs84['east']) / 2
center_lat = (bounds_wgs84['south'] + bounds_wgs84['north']) / 2
# Build the full HTML using string concatenation to avoid f-string issues with {z}/{x}/{y}
html_parts = []
html_parts.append(f'''<!DOCTYPE html>
<html lang="fr">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>LiDAR Archéologique — {basename}</title>
<link rel="stylesheet" href="https://unpkg.com/maplibre-gl@4.7.1/dist/maplibre-gl.css">
<script src="https://unpkg.com/maplibre-gl@4.7.1/dist/maplibre-gl.js"></script>
<style>
* {{ margin: 0; padding: 0; box-sizing: border-box; }}
body {{ font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; }}
#map {{ position: absolute; top: 0; left: 280px; right: 0; bottom: 0; }}
#panel {{
position: absolute; top: 0; left: 0; width: 280px; bottom: 0;
background: #1a1a2e; color: #e0e0e0; overflow-y: auto;
z-index: 10; box-shadow: 2px 0 8px rgba(0,0,0,0.3);
}}
#panel h1 {{
font-size: 14px; padding: 12px 14px; background: #16213e;
border-bottom: 1px solid #0f3460; letter-spacing: 0.5px;
}}
#panel h2 {{
font-size: 11px; padding: 8px 14px; color: #a0a0a0;
text-transform: uppercase; letter-spacing: 1px;
border-bottom: 1px solid #2a2a4a;
}}
.layer-group {{ padding: 4px 0; border-bottom: 1px solid #2a2a4a; }}
.layer-control {{
padding: 6px 14px;
transition: background 0.15s;
}}
.layer-control:hover {{ background: #252550; }}
.layer-control label {{
display: flex; align-items: center; gap: 8px;
font-size: 13px; cursor: pointer; margin-bottom: 4px;
}}
.layer-control input[type="checkbox"] {{
width: 16px; height: 16px; accent-color: #4fc3f7;
}}
.layer-control .layer-title {{ flex: 1; }}
.layer-control .layer-type {{
font-size: 10px; color: #888; text-transform: uppercase;
}}
.opacity-slider {{
width: 100%; height: 4px; margin: 2px 0 0 24px;
-webkit-appearance: none; appearance: none;
background: #333; border-radius: 2px; outline: none;
}}
.opacity-slider::-webkit-slider-thumb {{
-webkit-appearance: none; appearance: none;
width: 14px; height: 14px; border-radius: 50%;
background: #4fc3f7; cursor: pointer;
}}
.basemap-section {{
padding: 10px 14px; border-bottom: 1px solid #2a2a4a;
}}
.basemap-section h3 {{
font-size: 11px; color: #a0a0a0; text-transform: uppercase;
letter-spacing: 1px; margin-bottom: 8px;
}}
.basemap-btn {{
display: inline-block; padding: 5px 10px; margin: 2px;
background: #2a2a4a; color: #ccc; border: 1px solid #444;
border-radius: 4px; cursor: pointer; font-size: 12px;
transition: all 0.15s;
}}
.basemap-btn:hover {{ background: #3a3a6a; }}
.basemap-btn.active {{ background: #4fc3f7; color: #000; border-color: #4fc3f7; }}
#coords {{
position: absolute; bottom: 8px; left: 288px;
background: rgba(26,26,46,0.9); color: #4fc3f7;
padding: 4px 10px; border-radius: 4px; font-size: 12px;
font-family: monospace; z-index: 5;
}}
</style>
</head>
<body>
<div id="panel">
<h1>LiDAR Archéologique</h1>
<div style="padding: 6px 14px; font-size: 12px; color: #888;">
{basename}<br>
<span style="font-size: 10px;">Res: {resolution}m/px | EPSG:2154</span>
</div>
<div class="basemap-section">
<h3>Fond de carte</h3>
<button class="basemap-btn active" onclick="setBasemap('osm')">OSM</button>
<button class="basemap-btn" onclick="setBasemap('ign-photo')">Photo IGN</button>
<button class="basemap-btn" onclick="setBasemap('ign-map')">Carte IGN</button>
</div>
<h2>Visualisations</h2>
<div class="layer-group" id="layers">
{layer_controls_html}
</div>
</div>
<div id="map"></div>
<div id="coords"></div>
<script>
const TITILER_URL = '{titiler_url}';
const BASENAME = '{basename}';
const BOUNDS = {bounds_json};
const LAYERS = {layers_json};
const COG_PATH_PREFIX = '{cog_path_prefix}';
const LAYER_COLORMAPS = {colormaps_json};''')
# JavaScript code — use regular string to avoid f-string issues with {z}/{x}/{y}
html_parts.append(r'''
// Map initialization
const map = new maplibregl.Map({
container: 'map',
style: {
version: 8,
sources: {},
layers: []
},
center: [' + f'{center_lng}, {center_lat}' + r'],
zoom: 13,
maxZoom: 19,
minZoom: 8,
});
// Basemap layers
const basemapStyles = {
'osm': {
type: 'raster',
tiles: ['https://tile.openstreetmap.org/{z}/{x}/{y}.png'],
tileSize: 256,
attribution: '&copy; OpenStreetMap',
maxzoom: 19
},
'ign-photo': {
type: 'raster',
tiles: ['https://data.geopf.fr/wmts?SERVICE=WMTS&VERSION=1.0.0&REQUEST=GetTile&LAYER=ORTHOIMAGERY.ORTHOPHOTOS&STYLE=normal&TILEMATRIXSET=PM&TILEMATRIX={z}&TILECOL={x}&TILEROW={y}&FORMAT=image/jpeg'],
tileSize: 256,
attribution: '&copy; IGN',
maxzoom: 20
},
'ign-map': {
type: 'raster',
tiles: ['https://data.geopf.fr/wmts?SERVICE=WMTS&VERSION=1.0.0&REQUEST=GetTile&LAYER=GEOGRAPHICALGRIDSYSTEMS.PLANIGNV2&STYLE=normal&TILEMATRIXSET=PM&TILEMATRIX={z}&TILECOL={x}&TILEROW={y}&FORMAT=image/png'],
tileSize: 256,
attribution: '&copy; IGN',
maxzoom: 19
}
};
let currentBasemap = 'osm';
function setBasemap(name) {
if (map.getLayer('basemap')) map.removeLayer('basemap');
if (map.getSource('basemap')) map.removeSource('basemap');
currentBasemap = name;
const style = basemapStyles[name];
map.addSource('basemap', style);
map.addLayer({ id: 'basemap', type: 'raster', source: 'basemap' });
document.querySelectorAll('.basemap-btn').forEach(btn => {
const names = {'osm': 'OSM', 'ign-photo': 'Photo IGN', 'ign-map': 'Carte IGN'};
btn.classList.toggle('active', btn.textContent.trim() === names[name]);
});
// Re-add data layers on top of basemap
LAYERS.forEach(layer => {
const layerId = 'layer-' + layer.name;
if (map.getLayer(layerId)) {
map.moveLayer(layerId, 'basemap');
}
});
}
// Add visualization layers
function addVisualizationLayer(layerInfo) {
const name = layerInfo.name;
const isRgb = layerInfo.is_rgb;
const cogUrl = COG_PATH_PREFIX + '/' + layerInfo.file;
let tileUrl;
if (isRgb) {
tileUrl = TITILER_URL + '/cog/tiles/{z}/{x}/{y}.png?url=' +
encodeURIComponent(cogUrl) + '&bidx=1&bidx=2&bidx=3';
} else {
const cmap = LAYER_COLORMAPS[name] || 'viridis';
tileUrl = TITILER_URL + '/cog/tiles/{z}/{x}/{y}.png?url=' +
encodeURIComponent(cogUrl) + '&colormap_name=' + cmap + '&rescale=-1,1';
}
const sourceId = 'source-' + name;
const layerId = 'layer-' + name;
map.addSource(sourceId, {
type: 'raster',
tiles: [tileUrl],
tileSize: 256,
minzoom: 8,
maxzoom: 19,
});
map.addLayer({
id: layerId,
type: 'raster',
source: sourceId,
paint: {
'raster-opacity': name === 'hillshade_multi' ? 1.0 : 0.0,
'raster-resampling': 'bilinear',
}
});
}
// Layer control event handlers
document.querySelectorAll('.layer-control input[type="checkbox"]').forEach(cb => {
cb.addEventListener('change', () => {
const layerId = 'layer-' + cb.dataset.layer;
if (map.getLayer(layerId)) {
map.setLayoutProperty(layerId, 'visibility',
cb.checked ? 'visible' : 'none');
}
// When checking a layer, set its opacity to 70% if it was 0
const slider = document.querySelector('.opacity-slider[data-layer="' + cb.dataset.layer + '"]');
if (cb.checked && slider && parseInt(slider.value) === 0) {
slider.value = 70;
const layerId = 'layer-' + cb.dataset.layer;
if (map.getLayer(layerId)) {
map.setPaintProperty(layerId, 'raster-opacity', 0.7);
}}
});
});
document.querySelectorAll('.opacity-slider').forEach(slider => {
slider.addEventListener('input', () => {
const layerId = 'layer-' + slider.dataset.layer;
if (map.getLayer(layerId)) {
map.setPaintProperty(layerId, 'raster-opacity', slider.value / 100);
}
});
});
// Coordinate display
map.on('mousemove', (e) => {
const { lng, lat } = e.lngLat;
document.getElementById('coords').textContent =
lat.toFixed(6) + 'N ' + lng.toFixed(6) + 'E';
});
// Fit map to bounds on load
map.on('load', () => {
setBasemap('osm');
// Add all visualization layers
LAYERS.forEach(layer => {
addVisualizationLayer(layer);
});
// Initially hide all except hillshade
LAYERS.forEach(layer => {
if (layer.name !== 'hillshade_multi') {
const layerId = 'layer-' + layer.name;
if (map.getLayer(layerId)) {
map.setLayoutProperty(layerId, 'visibility', 'none');
}
}
});
// Fit bounds
map.fitBounds([[BOUNDS.west, BOUNDS.south], [BOUNDS.east, BOUNDS.north]], { padding: 20 });
});
</script>
</body>
</html>''')
html = ''.join(html_parts)
# Write viewer HTML
viewer_dir = output_vis_dir / 'viewer'
viewer_dir.mkdir(exist_ok=True)
viewer_file = viewer_dir / f"{basename}.html"
with open(viewer_file, 'w', encoding='utf-8') as f:
f.write(html)
logger.info(f" Viewer: {viewer_file}")
return viewer_file

View File

@ -2,6 +2,10 @@
Each function takes (dem_file, basename, vis_dir, resolution) as explicit
parameters and returns the path to the output GeoTIFF file, or None on error.
When a SharedDEM object is provided via the `shared` parameter, pre-computed
data (gradient, NaN mask, LRM) is reused across visualizations to avoid
redundant I/O and computation.
"""
import logging
@ -14,7 +18,7 @@ import rasterio
from scipy.ndimage import generic_filter
from scipy.stats import binned_statistic_2d
from .gpu import HAS_GPU, to_gpu, to_cpu, xp_gaussian_filter, xp_uniform_filter, xp_minimum_filter
from .gpu import HAS_GPU, to_gpu, to_cpu, xp_gaussian_filter, xp_uniform_filter, xp_minimum_filter, xp_maximum_filter, gpu_cleanup
logger = logging.getLogger("lidar")
@ -26,8 +30,88 @@ else:
xp = np
def _save_tif(output_path, data, transform, crs, dtype='float32', count=1, nodata=None):
"""Helper to save a 2D or 3D array as GeoTIFF."""
class SharedDEM:
"""Pre-computed DEM data shared across all visualizations.
Reads the DEM once and pre-computes:
- NaN mask and filled DEM (avoids 20+ calls to _fill_nans)
- Gradient components (shared by hillshade, slope, aspect, curvature)
- LRM at 15m kernel (shared by lrm + anomalies)
"""
def __init__(self, dem_file, resolution):
dem_np, transform, crs = _read_dem(dem_file)
self.dem_file = dem_file
self.resolution = resolution
self.transform = transform
self.crs = crs
self.nan_mask = np.isnan(dem_np)
self.dem_np = dem_np.astype(np.float32)
# Pre-fill NaNs once (saves ~20 calls to NearestNDInterpolator)
self.filled, _ = _fill_nans(self.dem_np)
# Initialize GPU lazy caches before any filter calls
self._filled_gpu = None
self._dem_gpu = None
# Pre-compute gradient (shared by hillshade, slope, aspect, curvature)
self.dy = np.gradient(self.filled, resolution, axis=0)
self.dx = np.gradient(self.filled, resolution, axis=1)
self.slope_rad = np.arctan(np.sqrt(self.dx**2 + self.dy**2))
self.slope_deg = np.degrees(self.slope_rad)
self.aspect = np.mod(np.degrees(np.arctan2(self.dy, self.dx)), 360)
# Pre-compute LRM at 15m (shared by lrm + anomalies)
sigma_15 = 15.0 / resolution
local_mean_15 = _filter_nanaware_from_filled(self, xp_gaussian_filter, sigma=sigma_15)
self.lrm_15 = self.dem_np - local_mean_15
self.lrm_15[self.nan_mask] = np.nan
@property
def filled_gpu(self):
"""Lazy GPU copy of the filled DEM."""
if self._filled_gpu is None and HAS_GPU:
self._filled_gpu = to_gpu(self.filled)
return self._filled_gpu
@property
def dem_gpu(self):
"""Lazy GPU copy of the DEM."""
if self._dem_gpu is None and HAS_GPU:
self._dem_gpu = to_gpu(self.dem_np)
return self._dem_gpu
def _filter_nanaware_from_filled(shared, filter_func, *args, **kwargs):
"""Apply filter on pre-filled DEM data (skips expensive _fill_nans).
Uses the SharedDEM.filled array directly, then restores NaN mask.
If GPU is available, uses the lazy GPU copy to avoid CPU↔GPU transfers.
"""
if HAS_GPU and shared.filled_gpu is not None:
filled_gpu = to_gpu(shared.filled)
result_gpu = filter_func(filled_gpu, *args, **kwargs)
result = to_cpu(result_gpu)
gpu_cleanup()
else:
result = filter_func(shared.filled, *args, **kwargs)
result[shared.nan_mask] = np.nan
return result
def _save_tif(output_path, data, transform, crs, dtype='float32', count=1, nodata=None, nan_mask=None):
"""Helper to save a 2D or 3D array as GeoTIFF.
Args:
nan_mask: Optional boolean mask (True=NaN) to apply before saving.
Restores NaN zones in gradient-derived products that were
computed on the filled DEM.
"""
if nan_mask is not None:
data = np.array(data, dtype=dtype, copy=True)
data[nan_mask] = np.nan
# Auto-detect nodata for float types with NaN
if nodata is None and dtype.startswith('float') and np.any(np.isnan(data)):
nodata = float('nan')
@ -38,7 +122,7 @@ def _save_tif(output_path, data, transform, crs, dtype='float32', count=1, nodat
output_path, 'w', driver='GTiff',
height=height, width=width, count=count,
dtype=dtype, crs=crs, transform=transform,
compress='lzw', nodata=nodata
compress='deflate', nodata=nodata
) as dst:
dst.write(data.astype(dtype), 1)
elif data.ndim == 3:
@ -47,7 +131,7 @@ def _save_tif(output_path, data, transform, crs, dtype='float32', count=1, nodat
output_path, 'w', driver='GTiff',
height=height, width=width, count=bands,
dtype=dtype, crs=crs, transform=transform,
compress='lzw', nodata=nodata
compress='deflate', nodata=nodata
) as dst:
for i in range(bands):
dst.write(data[i].astype(dtype), i + 1)
@ -116,12 +200,12 @@ def _filter_nanaware(arr, filter_func, *args, use_gpu=True, **kwargs):
# Core terrain visualizations
# ============================================================
def generate_hillshade(dem_file, basename, vis_dir, resolution):
"""Generate multi-directional hillshade with slope shading — GPU if available.
def generate_hillshade(dem_file, basename, vis_dir, resolution, shared=None):
"""Generate multi-directional hillshade with contrast enhancement — GPU if available.
Combines 4-direction hillshade (NW, NE, SW, SE) with slope shading
for improved micro-relief visibility on flat terrain.
Result = 0.7 * hillshade + 0.3 * cos(slope).
Combines 4-direction hillshade (NW, NE, SW, SE) with slope shading.
Applies percentile normalization and gamma correction to restore
contrast lost by averaging multiple azimuths.
"""
gpu_tag = " [GPU]" if HAS_GPU else ""
logger.info(f" → Hillshade multidirectionnel{gpu_tag}...")
@ -129,19 +213,29 @@ def generate_hillshade(dem_file, basename, vis_dir, resolution):
output = vis_dir / f"{basename}_hillshade_multi.tif"
try:
dem_np, transform, crs = _read_dem(dem_file)
dem = to_gpu(dem_np)
dy, dx = xp.gradient(dem)
if shared:
transform = shared.transform
crs = shared.crs
dem = to_gpu(shared.dem_np)
dy = to_gpu(shared.dy) if HAS_GPU else shared.dy
dx = to_gpu(shared.dx) if HAS_GPU else shared.dx
slope = to_gpu(shared.slope_rad) if HAS_GPU else shared.slope_rad
aspect = xp.arctan2(dy, dx)
sin_slope = xp.sin(slope)
cos_slope = xp.cos(slope)
else:
dem_np, transform, crs = _read_dem(dem_file)
dem = to_gpu(dem_np)
dy, dx = xp.gradient(dem)
slope = xp.arctan(xp.sqrt(dx**2 + dy**2))
aspect = xp.arctan2(dy, dx)
sin_slope = xp.sin(slope)
cos_slope = xp.cos(slope)
azimuts = [315, 45, 225, 135]
altitude = 30
hillshades = []
slope = xp.arctan(xp.sqrt(dx**2 + dy**2))
aspect = xp.arctan2(dy, dx)
sin_slope = xp.sin(slope)
cos_slope = xp.cos(slope)
alt_rad = xp.radians(xp.array(altitude))
sin_alt = xp.sin(alt_rad)
cos_alt = xp.cos(alt_rad)
@ -152,10 +246,23 @@ def generate_hillshade(dem_file, basename, vis_dir, resolution):
hillshades.append(xp.clip(hs, 0, 1))
combined_hillshade = xp.mean(xp.array(hillshades), axis=0)
# Blend with slope shading for better micro-relief on flat terrain
slope_shaded = cos_slope # bright on flat, dark on steep
slope_shaded = cos_slope
combined = 0.7 * combined_hillshade + 0.3 * slope_shaded
_save_tif(output, to_cpu(combined), transform, crs)
# Contrast enhancement: percentile stretch + gamma
# Averaging 4 azimuths flattens contrast — this restores it
combined_np = to_cpu(combined)
nan_mask = shared.nan_mask if shared else np.isnan(to_cpu(dem_np) if HAS_GPU else dem_np)
valid = combined_np[~nan_mask]
if len(valid) > 0:
p2, p98 = np.percentile(valid, 2), np.percentile(valid, 98)
if p98 - p2 > 0.01:
combined_np = np.clip((combined_np - p2) / (p98 - p2), 0, 1)
# Gamma correction to enhance shadows
gamma = 0.8
combined_np = np.power(combined_np, gamma)
_save_tif(output, combined_np.astype(np.float32), transform, crs, nan_mask=nan_mask)
logger.info(f" ✓ Hillshade terminé ({time.time()-t0:.1f}s){gpu_tag}")
return output
except Exception as e:
@ -163,7 +270,7 @@ def generate_hillshade(dem_file, basename, vis_dir, resolution):
return None
def generate_slope(dem_file, basename, vis_dir, resolution):
def generate_slope(dem_file, basename, vis_dir, resolution, shared=None):
"""Generate slope map (degrees) — GPU if available."""
gpu_tag = " [GPU]" if HAS_GPU else ""
logger.info(f" → Pente (Slope){gpu_tag}...")
@ -171,11 +278,20 @@ def generate_slope(dem_file, basename, vis_dir, resolution):
output = vis_dir / f"{basename}_slope.tif"
try:
dem_np, transform, crs = _read_dem(dem_file)
dem = to_gpu(dem_np)
dy, dx = xp.gradient(dem)
slope = xp.arctan(xp.sqrt(dx**2 + dy**2)) * 180 / xp.pi
_save_tif(output, to_cpu(slope), transform, crs)
if shared:
transform = shared.transform
crs = shared.crs
slope = shared.slope_deg
nan_mask = shared.nan_mask
if HAS_GPU:
slope = to_gpu(slope)
else:
dem_np, transform, crs = _read_dem(dem_file)
dem = to_gpu(dem_np)
dy, dx = xp.gradient(dem)
slope = xp.arctan(xp.sqrt(dx**2 + dy**2)) * 180 / xp.pi
nan_mask = np.isnan(dem_np)
_save_tif(output, to_cpu(slope) if HAS_GPU else slope, transform, crs, nan_mask=nan_mask)
logger.info(f" ✓ Pente terminée ({time.time()-t0:.1f}s){gpu_tag}")
return output
except Exception as e:
@ -183,7 +299,7 @@ def generate_slope(dem_file, basename, vis_dir, resolution):
return None
def generate_aspect(dem_file, basename, vis_dir, resolution):
def generate_aspect(dem_file, basename, vis_dir, resolution, shared=None):
"""Generate aspect (slope orientation) map — GPU if available."""
gpu_tag = " [GPU]" if HAS_GPU else ""
logger.info(f" → Aspect (Orientation){gpu_tag}...")
@ -191,12 +307,21 @@ def generate_aspect(dem_file, basename, vis_dir, resolution):
output = vis_dir / f"{basename}_aspect.tif"
try:
dem_np, transform, crs = _read_dem(dem_file)
dem = to_gpu(dem_np)
dy, dx = xp.gradient(dem)
aspect = xp.arctan2(dy, dx) * 180 / xp.pi
aspect = xp.mod(aspect, 360)
_save_tif(output, to_cpu(aspect), transform, crs)
if shared:
transform = shared.transform
crs = shared.crs
aspect = shared.aspect
nan_mask = shared.nan_mask
if HAS_GPU:
aspect = to_gpu(aspect)
else:
dem_np, transform, crs = _read_dem(dem_file)
dem = to_gpu(dem_np)
dy, dx = xp.gradient(dem)
aspect = xp.arctan2(dy, dx) * 180 / xp.pi
aspect = xp.mod(aspect, 360)
nan_mask = np.isnan(dem_np)
_save_tif(output, to_cpu(aspect) if HAS_GPU else aspect, transform, crs, nan_mask=nan_mask)
logger.info(f" ✓ Aspect terminé ({time.time()-t0:.1f}s){gpu_tag}")
return output
except Exception as e:
@ -204,7 +329,7 @@ def generate_aspect(dem_file, basename, vis_dir, resolution):
return None
def generate_curvature(dem_file, basename, vis_dir, resolution):
def generate_curvature(dem_file, basename, vis_dir, resolution, shared=None):
"""Generate curvature (terrain concavity/convexity) map — GPU if available."""
gpu_tag = " [GPU]" if HAS_GPU else ""
logger.info(f" → Courbure (Curvature){gpu_tag}...")
@ -212,14 +337,24 @@ def generate_curvature(dem_file, basename, vis_dir, resolution):
output = vis_dir / f"{basename}_curvature.tif"
try:
dem_np, transform, crs = _read_dem(dem_file)
dem = to_gpu(dem_np)
dz_dx = xp.gradient(dem, axis=1)
dz_dy = xp.gradient(dem, axis=0)
d2z_dx2 = xp.gradient(dz_dx, axis=1)
d2z_dy2 = xp.gradient(dz_dy, axis=0)
if shared:
transform = shared.transform
crs = shared.crs
dx = shared.dx
dy = shared.dy
nan_mask = shared.nan_mask
if HAS_GPU:
dx = to_gpu(dx)
dy = to_gpu(dy)
else:
dem_np, transform, crs = _read_dem(dem_file)
dem = to_gpu(dem_np)
dy, dx = xp.gradient(dem)
nan_mask = np.isnan(dem_np)
d2z_dx2 = xp.gradient(dx, axis=1)
d2z_dy2 = xp.gradient(dy, axis=0)
curvature = (d2z_dx2 + d2z_dy2) / 2
_save_tif(output, to_cpu(curvature), transform, crs)
_save_tif(output, to_cpu(curvature), transform, crs, nan_mask=nan_mask)
logger.info(f" ✓ Courbure terminée ({time.time()-t0:.1f}s){gpu_tag}")
return output
except Exception as e:
@ -232,7 +367,7 @@ def generate_curvature(dem_file, basename, vis_dir, resolution):
# GPU-accelerated visualizations
# ============================================================
def generate_lrm(dem_file, basename, vis_dir, resolution):
def generate_lrm(dem_file, basename, vis_dir, resolution, shared=None):
"""Local Relief Model - deviation from local mean (GPU if available)."""
gpu_tag = " [GPU]" if HAS_GPU else ""
logger.info(f" → Local Relief Model{gpu_tag}...")
@ -240,11 +375,16 @@ def generate_lrm(dem_file, basename, vis_dir, resolution):
output = vis_dir / f"{basename}_lrm.tif"
try:
dem_np, transform, crs = _read_dem(dem_file)
nan_mask = np.isnan(dem_np)
local_mean = _filter_nanaware(dem_np, xp_gaussian_filter, sigma=15/resolution)
lrm = dem_np - local_mean
lrm[nan_mask] = np.nan
if shared:
transform = shared.transform
crs = shared.crs
lrm = shared.lrm_15.copy()
else:
dem_np, transform, crs = _read_dem(dem_file)
nan_mask = np.isnan(dem_np)
local_mean = _filter_nanaware(dem_np, xp_gaussian_filter, sigma=15/resolution)
lrm = dem_np - local_mean
lrm[nan_mask] = np.nan
_save_tif(output, lrm.astype(np.float32), transform, crs)
logger.info(f" ✓ LRM terminé ({time.time()-t0:.1f}s){gpu_tag}")
return output
@ -253,7 +393,7 @@ def generate_lrm(dem_file, basename, vis_dir, resolution):
return None
def generate_svf(dem_file, basename, vis_dir, resolution):
def generate_svf(dem_file, basename, vis_dir, resolution, shared=None):
"""Sky-View Factor - ray-tracing on 16 azimuths (GPU if available).
For each pixel, trace rays in N directions, find the max horizon
@ -266,23 +406,33 @@ def generate_svf(dem_file, basename, vis_dir, resolution):
output = vis_dir / f"{basename}_svf.tif"
try:
dem_np, transform, crs = _read_dem(dem_file)
rows, cols = dem_np.shape
res = resolution
dem = to_gpu(dem_np)
if shared:
transform = shared.transform
crs = shared.crs
dem_np = shared.dem_np
rows, cols = dem_np.shape
res = resolution
dem = to_gpu(shared.filled) if HAS_GPU else shared.filled
nan_mask = shared.nan_mask
else:
dem_np, transform, crs = _read_dem(dem_file)
rows, cols = dem_np.shape
res = resolution
nan_mask = np.isnan(dem_np)
filled, _ = _fill_nans(dem_np)
dem = to_gpu(filled) if HAS_GPU else filled
n_dirs = 16
angles = np.linspace(0, 2 * np.pi, n_dirs, endpoint=False)
dx = np.cos(angles)
dy = np.sin(angles)
dx_dir = np.cos(angles)
dy_dir = np.sin(angles)
max_dist = int(50 / res)
padded = xp.pad(dem, max_dist, mode='constant', constant_values=xp.nan)
svf = xp.zeros_like(dem)
for d_idx in range(n_dirs):
ddx, ddy = dx[d_idx], dy[d_idx]
ddx, ddy = dx_dir[d_idx], dy_dir[d_idx]
horizon = xp.zeros_like(dem)
# Pre-compute all valid steps for this direction
@ -307,6 +457,7 @@ def generate_svf(dem_file, basename, vis_dir, resolution):
svf /= n_dirs
svf_np = to_cpu(svf).astype(np.float32)
svf_np[nan_mask] = np.nan
_save_tif(output, svf_np, transform, crs)
logger.info(f" ✓ SVF terminé ({time.time()-t0:.1f}s){gpu_tag}")
return output
@ -315,7 +466,7 @@ def generate_svf(dem_file, basename, vis_dir, resolution):
return None
def generate_openness(dem_file, basename, vis_dir, resolution, positive=True):
def generate_openness(dem_file, basename, vis_dir, resolution, positive=True, shared=None):
"""Positive/Negative Openness - true zenith/nadir angle computation (GPU if available).
For each pixel, in 8 directions (N, NE, E, SE, S, SW, W, NW):
@ -330,23 +481,33 @@ def generate_openness(dem_file, basename, vis_dir, resolution, positive=True):
output = vis_dir / f"{basename}_{name}.tif"
try:
dem_np, transform, crs = _read_dem(dem_file)
rows, cols = dem_np.shape
res = resolution
dem = to_gpu(dem_np)
if shared:
transform = shared.transform
crs = shared.crs
dem_np = shared.dem_np
rows, cols = dem_np.shape
res = resolution
dem = to_gpu(shared.filled) if HAS_GPU else shared.filled
nan_mask = shared.nan_mask
else:
dem_np, transform, crs = _read_dem(dem_file)
rows, cols = dem_np.shape
res = resolution
nan_mask = np.isnan(dem_np)
filled, _ = _fill_nans(dem_np)
dem = to_gpu(filled) if HAS_GPU else filled
n_dirs = 8
angles = np.linspace(0, 2 * np.pi, n_dirs, endpoint=False)
dx = np.cos(angles)
dy = np.sin(angles)
dx_dir = np.cos(angles)
dy_dir = np.sin(angles)
max_dist = int(50 / res)
padded = xp.pad(dem, max_dist, mode='constant', constant_values=xp.nan)
openness_sum = xp.zeros_like(dem)
for d_idx in range(n_dirs):
ddx, ddy = dx[d_idx], dy[d_idx]
ddx, ddy = dx_dir[d_idx], dy_dir[d_idx]
max_angle = xp.zeros_like(dem)
for step in range(1, max_dist + 1):
@ -370,6 +531,7 @@ def generate_openness(dem_file, basename, vis_dir, resolution, positive=True):
openness_sum += max_angle
openness_result = to_cpu(xp.degrees(openness_sum / n_dirs)).astype(np.float32)
openness_result[nan_mask] = np.nan
_save_tif(output, openness_result, transform, crs)
logger.info(f"{name} terminé ({time.time()-t0:.1f}s){gpu_tag}")
return output
@ -378,7 +540,65 @@ def generate_openness(dem_file, basename, vis_dir, resolution, positive=True):
return None
def generate_mslrm(dem_file, basename, vis_dir, resolution):
def generate_local_dominance(dem_file, basename, vis_dir, resolution, shared=None,
radius=15, pmin=2, pmax=98):
"""Local Dominance — proportion of neighborhood below center point.
LD = (dem - local_min) / (local_max - local_min + epsilon)
High values = locally dominant (peak, ridge)
Low values = locally recessed (valley, pit)
Uses minimum/maximum filters on the filled DEM, then restores NaN mask.
Complements openness by measuring local height position rather than angular extent.
"""
gpu_tag = " [GPU]" if HAS_GPU else ""
logger.info(f" → Dominance Locale (rayon {radius}m){gpu_tag}...")
t0 = time.time()
output = vis_dir / f"{basename}_local_dominance.tif"
try:
if shared:
transform = shared.transform
crs = shared.crs
nan_mask = shared.nan_mask
dem_np = shared.dem_np
else:
dem_np, transform, crs = _read_dem(dem_file)
nan_mask = np.isnan(dem_np)
radius_px = max(1, int(radius / resolution))
if radius_px % 2 == 0:
radius_px += 1
local_min = _filter_nanaware_from_filled(
shared, xp_minimum_filter, size=radius_px
) if shared else _filter_nanaware(
dem_np, xp_minimum_filter, size=radius_px
)
local_max_data = _filter_nanaware_from_filled(
shared, xp_maximum_filter, size=radius_px
) if shared else _filter_nanaware(
dem_np, xp_maximum_filter, size=radius_px
)
# Local dominance ratio
epsilon = 0.01 # Avoid division by zero on flat terrain
local_range = local_max_data - local_min + epsilon
dominance = (dem_np - local_min) / local_range
dominance = np.clip(dominance, 0, 1)
dominance[nan_mask] = np.nan
_save_tif(output, dominance.astype(np.float32), transform, crs, nan_mask=nan_mask)
logger.info(f" ✓ Dominance Locale terminée ({time.time()-t0:.1f}s){gpu_tag}")
return output
except Exception as e:
logger.error(f" ✗ Erreur local_dominance: {e}", exc_info=True)
return None
def generate_mslrm(dem_file, basename, vis_dir, resolution, shared=None):
"""Multi-Scale Relief Model (MSRM) - LRM at 5 scales combined (GPU if available)."""
gpu_tag = " [GPU]" if HAS_GPU else ""
logger.info(f" → Multi-Scale Relief Model (MSRM){gpu_tag}...")
@ -386,15 +606,24 @@ def generate_mslrm(dem_file, basename, vis_dir, resolution):
output = vis_dir / f"{basename}_mslrm.tif"
try:
dem_np, transform, crs = _read_dem(dem_file)
nan_mask = np.isnan(dem_np)
if shared:
transform = shared.transform
crs = shared.crs
dem_np = shared.dem_np
nan_mask = shared.nan_mask
else:
dem_np, transform, crs = _read_dem(dem_file)
nan_mask = np.isnan(dem_np)
sigmas = [5, 10, 25, 50, 100]
lrm_stack = []
for sigma in sigmas:
sigma_px = sigma / resolution
local_mean = _filter_nanaware(dem_np, xp_gaussian_filter, sigma=sigma_px)
if shared:
local_mean = _filter_nanaware_from_filled(shared, xp_gaussian_filter, sigma=sigma_px)
else:
local_mean = _filter_nanaware(dem_np, xp_gaussian_filter, sigma=sigma_px)
lrm = dem_np - local_mean
lrm[nan_mask] = np.nan
valid_lrm = lrm[~nan_mask]
@ -416,7 +645,7 @@ def generate_mslrm(dem_file, basename, vis_dir, resolution):
return None
def generate_tpi(dem_file, basename, vis_dir, resolution):
def generate_tpi(dem_file, basename, vis_dir, resolution, shared=None):
"""Multi-Scale Topographic Position Index (GPU if available).
TPI = elevation - mean(neighborhood).
@ -428,20 +657,32 @@ def generate_tpi(dem_file, basename, vis_dir, resolution):
output = vis_dir / f"{basename}_tpi.tif"
try:
dem_np, transform, crs = _read_dem(dem_file)
nan_mask = np.isnan(dem_np)
if shared:
transform = shared.transform
crs = shared.crs
dem_np = shared.dem_np
nan_mask = shared.nan_mask
else:
dem_np, transform, crs = _read_dem(dem_file)
nan_mask = np.isnan(dem_np)
fine_size = int(5 / resolution)
if fine_size % 2 == 0:
fine_size += 1
fine_mean = _filter_nanaware(dem_np, xp_uniform_filter, size=fine_size)
if shared:
fine_mean = _filter_nanaware_from_filled(shared, xp_uniform_filter, size=fine_size)
else:
fine_mean = _filter_nanaware(dem_np, xp_uniform_filter, size=fine_size)
tpi_fine = dem_np - fine_mean
tpi_fine[nan_mask] = np.nan
broad_size = int(100 / resolution)
if broad_size % 2 == 0:
broad_size += 1
broad_mean = _filter_nanaware(dem_np, xp_uniform_filter, size=broad_size)
if shared:
broad_mean = _filter_nanaware_from_filled(shared, xp_uniform_filter, size=broad_size)
else:
broad_mean = _filter_nanaware(dem_np, xp_uniform_filter, size=broad_size)
tpi_broad = dem_np - broad_mean
tpi_broad[nan_mask] = np.nan
@ -464,7 +705,7 @@ def generate_tpi(dem_file, basename, vis_dir, resolution):
# SAILORE
# ============================================================
def generate_sailore(dem_file, basename, vis_dir, resolution):
def generate_sailore(dem_file, basename, vis_dir, resolution, shared=None):
"""SAILORE - Self-Adaptive Improved Local Relief Model (GPU if available).
Kernel size adapts to local slope: flat areas get larger kernels,
@ -476,23 +717,40 @@ def generate_sailore(dem_file, basename, vis_dir, resolution):
output = vis_dir / f"{basename}_sailore.tif"
try:
dem_np, transform, crs = _read_dem(dem_file)
nan_mask = np.isnan(dem_np)
gy, gx = np.gradient(dem_np, resolution)
slope = np.arctan(np.sqrt(gx**2 + gy**2))
slope_deg = np.degrees(slope)
slope_deg[nan_mask] = np.nan
if shared:
transform = shared.transform
crs = shared.crs
dem_np = shared.dem_np
nan_mask = shared.nan_mask
slope_deg = shared.slope_deg
else:
dem_np, transform, crs = _read_dem(dem_file)
nan_mask = np.isnan(dem_np)
gy, gx = np.gradient(dem_np, resolution)
slope = np.arctan(np.sqrt(gx**2 + gy**2))
slope_deg = np.degrees(slope)
slope_deg[nan_mask] = np.nan
sigma_min = 2.0 / resolution
sigma_max = 25.0 / resolution
slope_norm = np.clip(slope_deg / 30.0, 0, 1)
lrm_fine = dem_np - _filter_nanaware(dem_np, xp_gaussian_filter, sigma=sigma_min)
if shared:
lrm_fine = dem_np - _filter_nanaware_from_filled(shared, xp_gaussian_filter, sigma=sigma_min)
else:
lrm_fine = dem_np - _filter_nanaware(dem_np, xp_gaussian_filter, sigma=sigma_min)
lrm_fine[nan_mask] = np.nan
lrm_medium = dem_np - _filter_nanaware(dem_np, xp_gaussian_filter, sigma=(sigma_min + sigma_max) / 2)
if shared:
lrm_medium = dem_np - _filter_nanaware_from_filled(shared, xp_gaussian_filter, sigma=(sigma_min + sigma_max) / 2)
else:
lrm_medium = dem_np - _filter_nanaware(dem_np, xp_gaussian_filter, sigma=(sigma_min + sigma_max) / 2)
lrm_medium[nan_mask] = np.nan
lrm_coarse = dem_np - _filter_nanaware(dem_np, xp_gaussian_filter, sigma=sigma_max)
if shared:
lrm_coarse = dem_np - _filter_nanaware_from_filled(shared, xp_gaussian_filter, sigma=sigma_max)
else:
lrm_coarse = dem_np - _filter_nanaware(dem_np, xp_gaussian_filter, sigma=sigma_max)
lrm_coarse[nan_mask] = np.nan
w_fine = slope_norm
@ -516,7 +774,7 @@ def generate_sailore(dem_file, basename, vis_dir, resolution):
# Roughness
# ============================================================
def generate_roughness(dem_file, basename, vis_dir, resolution):
def generate_roughness(dem_file, basename, vis_dir, resolution, shared=None):
"""Surface roughness - standard deviation of elevation in a window (GPU-accelerated)."""
gpu_tag = " [GPU]" if HAS_GPU else ""
logger.info(f" → Rugosité de surface{gpu_tag}...")
@ -524,15 +782,28 @@ def generate_roughness(dem_file, basename, vis_dir, resolution):
output = vis_dir / f"{basename}_roughness.tif"
try:
dem_np, transform, crs = _read_dem(dem_file)
nan_mask = np.isnan(dem_np)
if shared:
transform = shared.transform
crs = shared.crs
dem_np = shared.dem_np
nan_mask = shared.nan_mask
else:
dem_np, transform, crs = _read_dem(dem_file)
nan_mask = np.isnan(dem_np)
window_size = int(5 / resolution)
if window_size % 2 == 0:
window_size += 1
# Vectorized std: sqrt(E[X²] - (E[X])²) via uniform_filter (NaN-aware)
local_mean = _filter_nanaware(dem_np.astype(np.float64), xp_uniform_filter, size=window_size)
local_mean_sq = _filter_nanaware(dem_np.astype(np.float64)**2, xp_uniform_filter, size=window_size)
if shared:
local_mean = _filter_nanaware_from_filled(shared, xp_uniform_filter, size=window_size)
# For local_mean_sq, we need to filter filled², not filled
local_mean_sq = _filter_nanaware(shared.filled.astype(np.float64)**2, xp_uniform_filter, size=window_size)
local_mean_sq[shared.nan_mask] = np.nan
else:
local_mean = _filter_nanaware(dem_np.astype(np.float64), xp_uniform_filter, size=window_size)
local_mean_sq = _filter_nanaware(dem_np.astype(np.float64)**2, xp_uniform_filter, size=window_size)
roughness = np.sqrt(np.maximum(local_mean_sq - local_mean * local_mean, 0))
roughness[nan_mask] = np.nan
@ -549,7 +820,7 @@ def generate_roughness(dem_file, basename, vis_dir, resolution):
# Anomalies
# ============================================================
def generate_anomalies(dem_file, basename, vis_dir, resolution):
def generate_anomalies(dem_file, basename, vis_dir, resolution, shared=None):
"""Statistical anomaly detection - z-score of local relief + Local Moran's I — GPU if available."""
gpu_tag = " [GPU]" if HAS_GPU else ""
logger.info(f" → Détection anomalies statistiques{gpu_tag}...")
@ -557,12 +828,19 @@ def generate_anomalies(dem_file, basename, vis_dir, resolution):
output = vis_dir / f"{basename}_anomalies.tif"
try:
dem_np, transform, crs = _read_dem(dem_file)
nan_mask = np.isnan(dem_np)
if shared:
transform = shared.transform
crs = shared.crs
dem_np = shared.dem_np
nan_mask = shared.nan_mask
lrm = shared.lrm_15.copy()
else:
dem_np, transform, crs = _read_dem(dem_file)
nan_mask = np.isnan(dem_np)
lrm_mean_val = _filter_nanaware(dem_np, xp_gaussian_filter, sigma=15 / resolution)
lrm = dem_np - lrm_mean_val
lrm[nan_mask] = np.nan
lrm_mean_val = _filter_nanaware(dem_np, xp_gaussian_filter, sigma=15 / resolution)
lrm = dem_np - lrm_mean_val
lrm[nan_mask] = np.nan
valid_lrm = lrm[~nan_mask]
lrm_mean = np.nanmean(valid_lrm) if len(valid_lrm) > 0 else 0
lrm_std = max(np.nanstd(valid_lrm), 0.01) if len(valid_lrm) > 0 else 0.01
@ -572,7 +850,10 @@ def generate_anomalies(dem_file, basename, vis_dir, resolution):
if window % 2 == 0:
window += 1
local_mean = _filter_nanaware(z_score, xp_uniform_filter, size=window)
if shared:
local_mean = _filter_nanaware_from_filled(shared, xp_uniform_filter, size=window)
else:
local_mean = _filter_nanaware(z_score, xp_uniform_filter, size=window)
z_mean = np.nanmean(valid_lrm) if len(valid_lrm) > 0 else 0
z_std = max(np.nanstd(z_score[~nan_mask]), 0.01) if np.any(~nan_mask) else 0.01
morans_i = z_score * (local_mean - z_mean) / z_std
@ -591,7 +872,7 @@ def generate_anomalies(dem_file, basename, vis_dir, resolution):
# Wavelet
# ============================================================
def generate_wavelet(dem_file, basename, vis_dir, resolution):
def generate_wavelet(dem_file, basename, vis_dir, resolution, shared=None):
"""Mexican Hat wavelet multi-scale analysis (GPU if available).
CWT 2D at multiple scales to detect circular features.
@ -602,15 +883,22 @@ def generate_wavelet(dem_file, basename, vis_dir, resolution):
output = vis_dir / f"{basename}_wavelet.tif"
try:
dem_np, transform, crs = _read_dem(dem_file)
nan_mask = np.isnan(dem_np)
if shared:
transform = shared.transform
crs = shared.crs
dem_np = shared.dem_np
nan_mask = shared.nan_mask
filled = shared.filled.astype(np.float64)
else:
dem_np, transform, crs = _read_dem(dem_file)
nan_mask = np.isnan(dem_np)
filled, _ = _fill_nans(dem_np.astype(np.float64))
scales = [2, 5, 10, 20, 50]
wavelet_stack = []
for scale_m in scales:
sigma_px = scale_m / resolution
filled, _ = _fill_nans(dem_np.astype(np.float64))
if HAS_GPU:
from cupyx.scipy.ndimage import gaussian_laplace as gpu_gaussian_laplace
response = -gpu_gaussian_laplace(to_gpu(filled), sigma=sigma_px)
@ -697,36 +985,68 @@ def _d8_accumulate_numba(flow_dir, nodata_mask, rows, cols):
return None
def generate_flow(dem_file, basename, vis_dir, resolution):
"""Flow accumulation using D8 algorithm sink filling on GPU, accumulation via numba."""
def _priority_flood(dem, nodata_mask):
"""Priority-flood algorithm for sink filling (Wang & Liu 2006).
O(n log n) compared to 50 iterations of minimum_filter.
Fills pits so water can flow downhill.
"""
import heapq
rows, cols = dem.shape
filled = dem.copy()
closed = nodata_mask.copy()
open_queue = []
# Initialize border cells
for r in range(rows):
for c in [0, cols - 1]:
if not closed[r, c]:
heapq.heappush(open_queue, (filled[r, c], r, c))
closed[r, c] = True
for c in range(1, cols - 1):
for r in [0, rows - 1]:
if not closed[r, c]:
heapq.heappush(open_queue, (filled[r, c], r, c))
closed[r, c] = True
dx8 = [1, 1, 0, -1, -1, -1, 0, 1]
dy8 = [0, 1, 1, 1, 0, -1, -1, -1]
while open_queue:
elev, r, c = heapq.heappop(open_queue)
for d in range(8):
nr, nc = r + dy8[d], c + dx8[d]
if 0 <= nr < rows and 0 <= nc < cols and not closed[nr, nc]:
if filled[nr, nc] < elev:
filled[nr, nc] = elev # Fill the pit
closed[nr, nc] = True
heapq.heappush(open_queue, (filled[nr, nc], nr, nc))
return filled
def generate_flow(dem_file, basename, vis_dir, resolution, shared=None):
"""Flow accumulation using D8 algorithm — priority-flood sink filling, accumulation via numba."""
gpu_tag = " [GPU]" if HAS_GPU else ""
logger.info(f" → Accumulation de flux D8{gpu_tag}...")
t0 = time.time()
output = vis_dir / f"{basename}_flow.tif"
try:
dem_np, transform, crs = _read_dem(dem_file)
if shared:
transform = shared.transform
crs = shared.crs
dem_np = shared.dem_np
nodata_mask = shared.nan_mask
else:
dem_np, transform, crs = _read_dem(dem_file)
nodata_mask = np.isnan(dem_np)
rows, cols = dem_np.shape
nodata_mask = np.isnan(dem_np)
# Sink filling — GPU-accelerated
dem_gpu = to_gpu(dem_np)
nodata_mask_gpu = xp.isnan(dem_gpu)
dem_filled = xp.copy(dem_gpu)
dem_filled[nodata_mask_gpu] = xp.nanmax(dem_gpu) + 1000
from scipy.ndimage import generate_binary_structure
struct = generate_binary_structure(2, 2)
for _ in range(50):
neighbor_min = xp_minimum_filter(dem_filled, footprint=struct)
sinks = (dem_filled < neighbor_min) & ~nodata_mask_gpu
if not xp.any(sinks):
break
dem_filled = xp.where(sinks, neighbor_min, dem_filled)
dem_filled[nodata_mask_gpu] = xp.nan
dem_filled_np = to_cpu(dem_filled)
# Sink filling — priority-flood (O(n log n), faster than 50× minimum_filter)
dem_filled_np = _priority_flood(dem_np, nodata_mask)
# D8 slope — vectorized
dx8 = np.array([1, 1, 0, -1, -1, -1, 0, 1], dtype=np.int32)

60
run.sh
View File

@ -9,15 +9,13 @@
# -v Mode verbeux (timestamps + niveaux)
# --debug Mode debug (détails internes fichier:ligne)
# -f / --force Régénérer tous les fichiers même si existants
# --keep-tif Conserver les fichiers TIFF intermédiaires
# --no-viewer Ne pas générer le viewer web (COGs + HTML)
# --keep-tif Conserver les fichiers TIFF pour régénérer les WebP
# --force-classification
# Reclassifier le sol même si le fichier .las existe déjà
# --ground-classification {auto,smrf,pmf,csf}
# --ground-classification {auto,smrf,csf}
# Méthode de classification du sol (défaut: auto)
# --file NOM... Traiter un ou plusieurs fichiers LAZ spécifiques
# --test Exécuter les tests unitaires
# serve Démarrer le serveur cartographique
# -h Afficher l'aide complète
set -e
@ -27,33 +25,11 @@ INPUT_DIR="${SCRIPT_DIR}/input"
OUTPUT_DIR="${SCRIPT_DIR}/output"
IMAGE_NAME="lidar-lidar"
# Serve command — start the web map server
if [ "$1" = "serve" ]; then
# Build l'image si elle n'existe pas
if ! docker image inspect "$IMAGE_NAME" >/dev/null 2>&1; then
echo "Build de l'image Docker..."
docker build -t "$IMAGE_NAME" "$SCRIPT_DIR"
fi
mkdir -p "$OUTPUT_DIR"
echo "============================================"
echo " Serveur cartographique LiDAR"
echo "============================================"
echo " Viewer: http://localhost:8000/viewer"
echo " TiTiler: http://localhost:8000/cog/"
echo "============================================"
docker run --rm -p 8000:8000 \
-v "${OUTPUT_DIR}:/data/output" \
"$IMAGE_NAME" \
python3 -m lidar_pipeline.server /data/output
exit 0
fi
# Afficher l'aide si aucun argument
if [ $# -eq 0 ]; then
echo "Pipeline LiDAR Archéologique"
echo ""
echo "Usage: $0 [options]"
echo " $0 serve # Démarrer le serveur cartographique"
echo ""
echo "Options:"
echo " -r RESOLUTION Résolution en m/px (défaut: 0.5)"
@ -64,13 +40,11 @@ if [ $# -eq 0 ]; then
echo " -f / --force Régénérer tous les fichiers même si les WebP existent"
echo " --force-classification"
echo " Reclassifier le sol même si le fichier .las existe"
echo " --keep-tif Conserver les fichiers TIFF intermédiaires"
echo " --no-viewer Ne pas générer le viewer web"
echo " --ground-classification {auto,smrf,pmf,csf}"
echo " --keep-tif Conserver les TIFF pour régénérer les WebP"
echo " --ground-classification {auto,smrf,csf}"
echo " Méthode de classification du sol (défaut: auto)"
echo " --file NOM... Traiter un ou plusieurs fichiers LAZ (nom complet sans .laz/.las)"
echo " --test Exécuter les tests unitaires"
echo " serve Démarrer le serveur cartographique"
echo " -h Afficher cette aide"
echo ""
echo "Exemples:"
@ -78,11 +52,10 @@ if [ $# -eq 0 ]; then
echo " $0 -g -w 4 # GPU + 4 workers"
echo " $0 -g -v # GPU + verbeux"
echo " $0 -g -r 0.2 # Haute résolution"
echo " $0 -g --force # Tout régénérer (WebP + classification)"
echo " $0 -g --force # Régénérer WebP (DTM conservé si --keep-tif)"
echo " $0 -g --force-classification # Reclassifier le sol seulement"
echo " $0 -g --ground-classification pmf # Forcer PMF"
echo " $0 -g --ground-classification csf # Forcer CSF (terrain complexe)"
echo " $0 -g --file LHD_...IGN69.copc # Un fichier"
echo " $0 serve # Démarrer le serveur web"
exit 0
fi
@ -95,7 +68,6 @@ FILE_ARGS=""
GROUND_METHOD=""
FORCE_CLASSIFY_FLAG=""
KEEP_TIF_FLAG=""
NO_VIEWER_FLAG=""
# Parse arguments manually (more robust than getopts for mixed short/long options)
while [ $# -gt 0 ]; do
@ -109,7 +81,6 @@ while [ $# -gt 0 ]; do
--force) FORCE_FLAG="--force"; shift ;;
--force-classification) FORCE_CLASSIFY_FLAG="--force-classification"; shift ;;
--keep-tif) KEEP_TIF_FLAG="--keep-tif"; shift ;;
--no-viewer) NO_VIEWER_FLAG="--no-viewer"; shift ;;
--ground-classification) GROUND_METHOD="$2"; shift 2 ;;
--ground-classification=*) GROUND_METHOD="${1#--ground-classification=}"; shift ;;
--file) shift; while [ $# -gt 0 ] && [[ ! "$1" =~ ^- ]]; do FILE_ARGS="$FILE_ARGS $1"; shift; done ;;
@ -118,7 +89,6 @@ while [ $# -gt 0 ]; do
echo "Pipeline LiDAR Archéologique"
echo ""
echo "Usage: $0 [options]"
echo " $0 serve # Démarrer le serveur cartographique"
echo ""
echo "Options:"
echo " -r RESOLUTION Résolution en m/px (défaut: 0.5)"
@ -129,13 +99,11 @@ while [ $# -gt 0 ]; do
echo " -f / --force Régénérer tous les fichiers même si les WebP existent"
echo " --force-classification"
echo " Reclassifier le sol même si le fichier .las existe"
echo " --keep-tif Conserver les fichiers TIFF intermédiaires"
echo " --no-viewer Ne pas générer le viewer web"
echo " --ground-classification {auto,smrf,pmf,csf}"
echo " --keep-tif Conserver les TIFF pour régénérer les WebP"
echo " --ground-classification {auto,smrf,csf}"
echo " Méthode de classification du sol (défaut: auto)"
echo " --file NOM... Traiter un ou plusieurs fichiers LAZ (nom complet sans .laz/.las)"
echo " --test Exécuter les tests unitaires"
echo " serve Démarrer le serveur cartographique"
echo " -h Afficher cette aide"
echo ""
echo "Exemples:"
@ -143,11 +111,10 @@ while [ $# -gt 0 ]; do
echo " $0 -g -w 4 # GPU + 4 workers"
echo " $0 -g -v # GPU + verbeux"
echo " $0 -g -r 0.2 # Haute résolution"
echo " $0 -g --force # Tout régénérer (WebP + classification)"
echo " $0 -g --force # Régénérer WebP (DTM conservé si --keep-tif)"
echo " $0 -g --force-classification # Reclassifier le sol seulement"
echo " $0 -g --ground-classification pmf # Forcer PMF"
echo " $0 -g --ground-classification csf # Forcer CSF (terrain complexe)"
echo " $0 -g --file LHD_...IGN69.copc # Un fichier"
echo " $0 serve # Démarrer le serveur web"
exit 0
;;
*) echo "Option invalide: $1" >&2; exit 1 ;;
@ -167,7 +134,7 @@ if [[ " $* " == *" --test "* ]]; then
echo "============================================"
echo " Tests unitaires LiDAR Pipeline"
echo "============================================"
docker run --rm $GPU_FLAG \
docker run --rm --init $GPU_FLAG \
"$IMAGE_NAME" \
python3 -m pytest -v --pyargs lidar_pipeline.tests
exit $?
@ -193,14 +160,13 @@ echo " Verbeux : $([ -n "$VERBOSE_FLAG" ] && echo 'OUI' || echo 'non')"
echo " Force : $([ -n "$FORCE_FLAG" ] && echo 'OUI' || echo 'non')"
echo " Force classif.: $([ -n "$FORCE_CLASSIFY_FLAG" ] && echo 'OUI' || echo 'non')"
echo " Keep TIFF : $([ -n "$KEEP_TIF_FLAG" ] && echo 'OUI' || echo 'non')"
echo " Viewer web : $([ -n "$NO_VIEWER_FLAG" ] && echo 'non' || echo 'OUI')"
echo " Classification sol : $([ -n "$GROUND_METHOD" ] && echo "$GROUND_METHOD" || echo 'auto')"
if [ -n "$FILE_ARGS" ]; then
echo " Fichiers :${FILE_ARGS}"
fi
echo "============================================"
CMD_ARGS="-o /data/output -r $RESOLUTION -w $WORKERS $VERBOSE_FLAG $FORCE_FLAG $FORCE_CLASSIFY_FLAG $KEEP_TIF_FLAG $NO_VIEWER_FLAG"
CMD_ARGS="-o /data/output -r $RESOLUTION -w $WORKERS $VERBOSE_FLAG $FORCE_FLAG $FORCE_CLASSIFY_FLAG $KEEP_TIF_FLAG"
if [ -n "$GROUND_METHOD" ]; then
CMD_ARGS="$CMD_ARGS --ground-classification $GROUND_METHOD"
fi
@ -208,7 +174,7 @@ if [ -n "$FILE_ARGS" ]; then
CMD_ARGS="$CMD_ARGS --file $FILE_ARGS"
fi
docker run --rm $GPU_FLAG \
docker run --rm --init $GPU_FLAG \
--user 1000:1000 \
-v "${INPUT_DIR}:/data/input:ro" \
-v "${OUTPUT_DIR}:/data/output" \