Files
lidar_rendu/lidar_pipeline/cli.py
Jacquin Antoine 1cf8e1752f Remove PMF, fix NaN in gradient visualizations, fix pos_open/neg_open shared param
- Remove PMF from ground classification options (PDAL recommends SMRF over PMF)
- Auto-detection now uses CSF for urban/complex terrain instead of PMF
- Add z_std > 30m heuristic to auto-select CSF for complex terrain
- Fix pos_open/neg_open lambda missing 'shared' parameter (NameError in workers)
- Fix NaN mask not restored in hillshade, slope, aspect, curvature
  (gradient-based products computed on filled DEM lost NaN transparency)
- Add nan_mask parameter to _save_tif for centralized NaN restoration
- DTM TIF kept by default (no longer deleted after WebP conversion)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-14 00:50:45 +02:00

243 lines
8.3 KiB
Python

"""Command-line interface for the LiDAR archaeological pipeline.
Handles argument parsing, logging configuration, and entry point.
"""
import argparse
import logging
import os
import shutil
import signal
import sys
from .pipeline import LidarArchaeoPipeline
from .gpu import log_gpu_status
# Logger shared by this module under the name "lidar"; its level, handler and
# propagation are (re)configured by setup_logging().
logger = logging.getLogger("lidar")
def setup_logging(verbose=False, debug=False):
    """Install level, format and stdout handler on the 'lidar' and root loggers.

    Args:
        verbose: When True (and ``debug`` is False), prefix each message
            with a timestamp and the level name.
        debug: When True, log at DEBUG level and prefix each message with a
            millisecond timestamp, the level name and ``file:line``.
            Takes precedence over ``verbose``.

    Returns:
        The configured 'lidar' logger.
    """
    # Messages are in French (é, è, ê, ...): force UTF-8 on both standard
    # streams when the stream object supports reconfiguration.
    for stream in (sys.stdout, sys.stderr):
        if hasattr(stream, 'reconfigure'):
            stream.reconfigure(encoding='utf-8', errors='replace')

    # Only debug mode lowers the threshold; verbose merely changes the format.
    level = logging.DEBUG if debug else logging.INFO
    if debug:
        fmt = "%(asctime)s.%(msecs)03d %(levelname)-5s [%(filename)s:%(lineno)d] %(message)s"
    elif verbose:
        fmt = "%(asctime)s %(levelname)-5s %(message)s"
    else:
        fmt = "%(message)s"

    def _stdout_handler():
        """Return a fresh stdout handler using the selected format."""
        new_handler = logging.StreamHandler(sys.stdout)
        new_handler.setFormatter(logging.Formatter(fmt, datefmt="%H:%M:%S"))
        return new_handler

    # The 'lidar' logger gets exactly one handler and does not propagate,
    # so records are not emitted a second time through the root logger.
    lidar_handler = _stdout_handler()
    logger.setLevel(level)
    logger.handlers.clear()
    logger.addHandler(lidar_handler)
    logger.propagate = False

    # The root logger is configured as well (intended for worker processes);
    # a handler is only added when the root logger has none yet.
    root_logger = logging.getLogger()
    root_logger.setLevel(level)
    if not root_logger.handlers:
        root_logger.addHandler(_stdout_handler())

    return logger
def _build_parser():
    """Build the argument parser describing the pipeline command line."""
    parser = argparse.ArgumentParser(
        description="Pipeline LiDAR pour détection archéologique",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""\
Exemples:
Traitement standard:
python -m lidar_pipeline /data/input -o /data/output
Haute résolution avec accélération GPU:
python -m lidar_pipeline /data/input -o /data/output -r 0.2 -g
Mode verbeux (timestamps):
python -m lidar_pipeline /data/input -o /data/output -v
Mode debug (détails internes):
python -m lidar_pipeline /data/input -o /data/output --debug
Forcer la régénération de tous les fichiers:
python -m lidar_pipeline /data/input -o /data/output --force
Traiter un seul fichier (pour tests):
python -m lidar_pipeline /data/input -o /data/output --file LHD_FXX_1000_6881_PTS_LAMB93_IGN69.copc
Traitement parallèle (4 workers):
python -m lidar_pipeline /data/input -o /data/output -w 4
"""
    )
    parser.add_argument(
        "input",
        help="Dossier contenant les fichiers LAZ/LAS"
    )
    parser.add_argument(
        "-o", "--output",
        default="/data/output",
        help="Dossier de sortie (défaut: /data/output)"
    )
    parser.add_argument(
        "-r", "--resolution",
        type=float,
        default=0.5,
        help="Résolution en mètres par pixel (défaut: 0.5)"
    )
    parser.add_argument(
        "-w", "--workers",
        type=int,
        default=1,
        help="Nombre de workers pour traitement parallèle (défaut: 1)"
    )
    parser.add_argument(
        "-f", "--force",
        action="store_true",
        help="Régénérer tous les fichiers même si les WebP existent déjà"
    )
    parser.add_argument(
        "--force-classification",
        action="store_true",
        help="Reclassifier le sol même si le fichier .las existe déjà"
    )
    parser.add_argument(
        "--keep-tif",
        action="store_true",
        help="Conserver les fichiers TIFF (DTM + visualisations) pour pouvoir régénérer les WebP sans recalculer"
    )
    parser.add_argument(
        "--ground-classification",
        choices=["auto", "smrf", "csf"],
        default="auto",
        help="Méthode de classification du sol : auto (détection), smrf, csf (défaut: auto)"
    )
    parser.add_argument(
        "--file",
        nargs="+",
        type=str,
        default=None,
        help="Traiter un ou plusieurs fichiers LAZ/LAS (nom complet sans extension, ex: LHD_FXX_1000_6882_PTS_LAMB93_IGN69.copc)"
    )
    parser.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="Mode verbeux : affiche les timestamps et niveaux"
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        help="Mode debug : affiche les détails internes (fichier:ligne)"
    )
    return parser


def _select_input_files(input_dir, patterns):
    """Resolve ``--file`` patterns to files located directly in *input_dir*.

    Each pattern is a file name without its extension, with or without the
    ``.copc`` part (e.g. ``LHD_FXX_1000_6882_PTS_LAMB93_IGN69.copc`` or
    ``LHD_FXX_1000_6882_PTS_LAMB93_IGN69``). It is tried with the suffixes
    ``.laz``, ``.las``, ``.copc.laz`` and ``.copc.las``. A warning is logged
    for every pattern that matches nothing.

    Args:
        input_dir: ``pathlib.Path`` of the directory to search.
        patterns: Iterable of pattern strings taken from ``--file``.

    Returns:
        List of unique matching paths, in first-match order.
    """
    suffixes = (".laz", ".las", ".copc.laz", ".copc.las")
    # A dict is used as an insertion-ordered set: it removes duplicates both
    # within one pattern and across patterns while keeping first-seen order.
    selected = {}
    for pattern in patterns:
        matches = [
            path
            for suffix in suffixes
            for path in input_dir.glob(f"{pattern}{suffix}")
        ]
        if not matches:
            logger.warning(f"Aucun fichier trouvé pour: {pattern}")
            continue
        selected.update(dict.fromkeys(matches))
    return list(selected)


def _remove_temp_dirs(pipeline):
    """Delete the pipeline's temporary directories, logging instead of failing."""
    logger.info("Nettoyage des fichiers temporaires...")
    try:
        if pipeline.temp_dir.exists():
            shutil.rmtree(pipeline.temp_dir)
        temp_base = pipeline.output_dir / "temp"
        if temp_base.exists():
            shutil.rmtree(temp_base)
        logger.info(" ✓ Fichiers temporaires supprimés")
    except Exception as e:
        # Best effort: leftover temporary files must not fail the run.
        logger.warning(f" Note: Impossible de supprimer les fichiers temporaires: {e}")


def main():
    """Entry point for the LiDAR archaeological pipeline.

    Parses ``sys.argv``, configures logging, installs the PDAL clean-up
    handlers, then either processes the files selected with ``--file`` one
    by one or delegates to ``pipeline.process_all()``.

    Raises:
        SystemExit: status 2 on invalid arguments (including a resolution
            that is not a finite, strictly positive number), status 1 when
            no file matches the ``--file`` patterns or when the pipeline
            raises an exception.
    """
    parser = _build_parser()
    args = parser.parse_args()

    # Reject unusable resolutions up front instead of failing later inside
    # the pipeline. The chained comparison is also False for NaN.
    if not 0 < args.resolution < float("inf"):
        parser.error(
            f"argument -r/--resolution: la résolution doit être un nombre fini strictement positif (reçu: {args.resolution})"
        )

    # Configure logging before any other output
    setup_logging(verbose=args.verbose, debug=args.debug)

    # Add file prefix filter for parallel processing
    from .pipeline import _file_filter
    logger.addFilter(_file_filter)

    banner = "=" * 60
    logger.info(banner)
    logger.info("Pipeline LiDAR Archéologique")
    logger.info(banner)

    # Kill orphan PDAL processes on interrupt, termination or normal exit
    signal.signal(signal.SIGINT, _kill_orphan_pdal)
    signal.signal(signal.SIGTERM, _kill_orphan_pdal)
    import atexit
    atexit.register(_kill_orphan_pdal)

    log_gpu_status()

    try:
        pipeline = LidarArchaeoPipeline(
            input_dir=args.input,
            output_dir=args.output,
            resolution=args.resolution,
            workers=args.workers,
            force=args.force,
            ground_method=args.ground_classification,
            force_classify=args.force_classification,
            keep_tif=args.keep_tif,
        )

        if not args.file:
            pipeline.process_all()
            return

        # --file given: process only the matching files, one after another
        from pathlib import Path
        unique_files = _select_input_files(Path(args.input), args.file)
        if not unique_files:
            logger.error("Aucun fichier trouvé pour les motifs spécifiés")
            # SystemExit is not an Exception, so it bypasses the handler below
            sys.exit(1)

        logger.info(f"Traitement de {len(unique_files)} fichier(s) sélectionné(s)")
        for laz_file in unique_files:
            logger.info(f"{laz_file.name}")
        for laz_file in unique_files:
            pipeline.process_file(laz_file)

        # Clean up temporary files
        _remove_temp_dirs(pipeline)
    except Exception as e:
        logger.error(f"Erreur fatale: {e}", exc_info=True)
        sys.exit(1)
def _kill_orphan_pdal(signum=None, frame=None):
    """Best-effort kill of leftover PDAL processes on interrupt or exit.

    The signature is compatible with both ``signal.signal`` handlers
    (called with ``signum`` and ``frame``) and ``atexit`` callbacks (called
    without arguments).

    Args:
        signum: Signal number when invoked as a signal handler, else None.
        frame: Current stack frame passed by the signal machinery (unused).

    Raises:
        SystemExit: only when invoked as a signal handler; the status is
            ``128 + signum`` (130 for SIGINT, 143 for SIGTERM).
    """
    import subprocess
    try:
        # NOTE(review): `pkill -f` matches the pattern against the full
        # command line of every process it may signal, so this is not limited
        # to PDAL processes started by this run — confirm this is acceptable.
        subprocess.run(["pkill", "-f", "pdal"], capture_output=True, timeout=5)
    except Exception as exc:
        # Deliberately best effort (pkill may be missing or time out), but
        # leave a trace for debugging instead of swallowing silently.
        logger.debug("Nettoyage des processus PDAL impossible: %s", exc)
    if signum is not None:
        logger.info("Interruption — nettoyage des processus PDAL")
        # Conventional "terminated by signal N" exit status
        sys.exit(128 + signum)