Files
lidar_rendu/lidar_pipeline/cli.py
Jacquin Antoine e2845b9e6d Pipeline LiDAR: classification sol auto + pré-traitement ELM + fix warnings
- Ajout classification automatique du sol (SMRF/PMF/CSF) avec détection
  heuristique (ratio retours uniques > 0.6 → PMF urbain, sinon SMRF)
- Pré-traitement PDAL recommandé avant classification: ELM + outlier
  removal (cell=5.0, threshold=2.0 adapté au calcaire rocailleux)
- Options CLI: --ground-classification {auto,smrf,pmf,csf} et
  --force-classification pour forcer la reclassification
- Fix double logging (logger.propagate = False)
- Fix --force non transmis dans run.sh (réécriture parsing arguments)
- Fix warning numpy 'partition will ignore mask': conversion MaskedArray
  en ndarray avant np.percentile()
- Ajout liblaszip8 + lazrs pour support LAZ dans Docker et laspy
- Tests unitaires pour PMF, CSF et auto-détection

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-10 03:00:33 +02:00

194 lines
6.3 KiB
Python

"""Command-line interface for the LiDAR archaeological pipeline.
Handles argument parsing, logging configuration, and entry point.
"""
import argparse
import logging
import sys
from .pipeline import LidarArchaeoPipeline
from .gpu import log_gpu_status
logger = logging.getLogger("lidar")
def setup_logging(verbose=False, debug=False):
"""Configure the 'lidar' logger.
Args:
verbose: If True, include timestamps and level names.
debug: If True, set level to DEBUG and add file:line info.
"""
if debug:
level = logging.DEBUG
fmt = "%(asctime)s.%(msecs)03d %(levelname)-5s [%(filename)s:%(lineno)d] %(message)s"
elif verbose:
level = logging.INFO
fmt = "%(asctime)s %(levelname)-5s %(message)s"
else:
level = logging.INFO
fmt = "%(message)s"
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter(fmt, datefmt="%H:%M:%S"))
logger.setLevel(level)
logger.handlers.clear()
logger.addHandler(handler)
logger.propagate = False # Prevent double logging via root logger
# Also configure the root logger so worker processes log properly
root_logger = logging.getLogger()
root_logger.setLevel(level)
if not root_logger.handlers:
root_handler = logging.StreamHandler(sys.stdout)
root_handler.setFormatter(logging.Formatter(fmt, datefmt="%H:%M:%S"))
root_logger.addHandler(root_handler)
return logger
def main():
"""Entry point for the LiDAR archaeological pipeline."""
parser = argparse.ArgumentParser(
description="Pipeline LiDAR pour détection archéologique",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""\
Exemples:
Traitement standard:
python -m lidar_pipeline /data/input -o /data/output
Haute résolution avec accélération GPU:
python -m lidar_pipeline /data/input -o /data/output -r 0.2 -g
Mode verbeux (timestamps):
python -m lidar_pipeline /data/input -o /data/output -v
Mode debug (détails internes):
python -m lidar_pipeline /data/input -o /data/output --debug
Forcer la régénération de tous les fichiers:
python -m lidar_pipeline /data/input -o /data/output --force
Traiter un seul fichier (pour tests):
python -m lidar_pipeline /data/input -o /data/output --file LHD_FXX_1000_6881_PTS_LAMB93_IGN69.copc
Traitement parallèle (4 workers):
python -m lidar_pipeline /data/input -o /data/output -w 4
"""
)
parser.add_argument(
"input",
help="Dossier contenant les fichiers LAZ/LAS"
)
parser.add_argument(
"-o", "--output",
default="/data/output",
help="Dossier de sortie (défaut: /data/output)"
)
parser.add_argument(
"-r", "--resolution",
type=float,
default=0.5,
help="Résolution en mètres par pixel (défaut: 0.5)"
)
parser.add_argument(
"-w", "--workers",
type=int,
default=1,
help="Nombre de workers pour traitement parallèle (défaut: 1)"
)
parser.add_argument(
"-f", "--force",
action="store_true",
help="Régénérer tous les fichiers même si les WebP existent déjà"
)
parser.add_argument(
"--force-classification",
action="store_true",
help="Reclassifier le sol même si le fichier .las existe déjà"
)
parser.add_argument(
"--ground-classification",
choices=["auto", "smrf", "pmf", "csf"],
default="auto",
help="Méthode de classification du sol : auto (détection), smrf, pmf, csf (défaut: auto)"
)
parser.add_argument(
"--file",
nargs="+",
type=str,
default=None,
help="Traiter un ou plusieurs fichiers LAZ/LAS (nom complet sans extension, ex: LHD_FXX_1000_6882_PTS_LAMB93_IGN69.copc)"
)
parser.add_argument(
"-v", "--verbose",
action="store_true",
help="Mode verbeux : affiche les timestamps et niveaux"
)
parser.add_argument(
"--debug",
action="store_true",
help="Mode debug : affiche les détails internes (fichier:ligne)"
)
args = parser.parse_args()
# Configure logging before any other output
setup_logging(verbose=args.verbose, debug=args.debug)
# Add file prefix filter for parallel processing
from .pipeline import _file_filter
logger.addFilter(_file_filter)
logger.info("=" * 60)
logger.info("Pipeline LiDAR Archéologique")
logger.info("=" * 60)
log_gpu_status()
try:
pipeline = LidarArchaeoPipeline(
input_dir=args.input,
output_dir=args.output,
resolution=args.resolution,
workers=args.workers,
force=args.force,
ground_method=args.ground_classification,
force_classify=args.force_classification
)
# If --file is specified, process only matching files
if args.file:
from pathlib import Path
input_dir = Path(args.input)
# Each pattern is the full filename without extension (e.g. LHD_FXX_1000_6882_PTS_LAMB93_IGN69.copc)
selected_files = []
for pattern in args.file:
matches = list(input_dir.glob(f"{pattern}.laz")) + list(input_dir.glob(f"{pattern}.las"))
# Remove duplicates
matches = list(dict.fromkeys(matches))
if not matches:
logger.warning(f"Aucun fichier trouvé pour: {pattern}")
continue
selected_files.extend(matches)
# Remove duplicates across patterns
seen = set()
unique_files = []
for f in selected_files:
if f not in seen:
seen.add(f)
unique_files.append(f)
if not unique_files:
logger.error("Aucun fichier trouvé pour les motifs spécifiés")
sys.exit(1)
logger.info(f"Traitement de {len(unique_files)} fichier(s) sélectionné(s)")
for laz_file in unique_files:
logger.info(f"{laz_file.name}")
for laz_file in unique_files:
pipeline.process_file(laz_file)
else:
pipeline.process_all()
except Exception as e:
logger.error(f"Erreur fatale: {e}", exc_info=True)
sys.exit(1)