- Inversion axe Y du DTM pour orientation nord correcte - Fallback filters.range pour fichiers LAZ avec ReturnNumber=0 - Flèche nord vectorielle noire au-dessus de la légende - 9/9 fichiers traités avec succès Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
954 lines
34 KiB
Python
Executable File
954 lines
34 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Pipeline LiDAR pour détection archéologique - Version Python pur
|
|
Visualisations générées avec numpy/rasterio/matplotlib
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import subprocess
|
|
from pathlib import Path
|
|
import numpy as np
|
|
import rasterio
|
|
from rasterio.transform import from_bounds
|
|
import matplotlib
|
|
matplotlib.use('Agg')
|
|
import matplotlib.pyplot as plt
|
|
from matplotlib import rcParams
|
|
from scipy import ndimage
|
|
from scipy.stats import binned_statistic_2d
|
|
from tqdm import tqdm
|
|
|
|
try:
|
|
import laspy
|
|
except ImportError as e:
|
|
print(f"Erreur import: {e}")
|
|
sys.exit(1)
|
|
|
|
rcParams['figure.dpi'] = 150
|
|
rcParams['savefig.dpi'] = 300
|
|
rcParams['font.size'] = 10
|
|
|
|
|
|
class LidarArchaeoPipeline:
|
|
"""Pipeline Python pur pour analyse archéologique LiDAR"""
|
|
|
|
def __init__(self, input_dir, output_dir, resolution=0.5):
|
|
self.input_dir = Path(input_dir)
|
|
self.output_dir = Path(output_dir)
|
|
self.resolution = resolution
|
|
self.temp_dir = self.output_dir / "temp"
|
|
|
|
if not self.input_dir.exists():
|
|
raise ValueError(f"Répertoire introuvable: {self.input_dir}")
|
|
|
|
self.output_dir.mkdir(parents=True, exist_ok=True)
|
|
self.temp_dir.mkdir(exist_ok=True)
|
|
|
|
self.dtm_dir = self.output_dir / "DTM"
|
|
self.vis_dir = self.output_dir / "visualisations"
|
|
|
|
for d in [self.dtm_dir, self.vis_dir]:
|
|
d.mkdir(exist_ok=True)
|
|
|
|
print(f"✓ Pipeline initialisé (Python pur)")
|
|
print(f" Entrée : {self.input_dir}")
|
|
print(f" Sortie : {self.output_dir}")
|
|
print(f" Résolution : {resolution}m/px")
|
|
|
|
def find_laz_files(self):
|
|
"""Trouve tous les fichiers LAZ/LAS"""
|
|
files = list(self.input_dir.glob("*.laz")) + list(self.input_dir.glob("*.las"))
|
|
print(f"✓ {len(files)} fichier(s) LiDAR trouvé(s)")
|
|
return sorted(files)
|
|
|
|
def check_tools(self):
|
|
"""Vérifie que les outils sont disponibles"""
|
|
for name, cmd in [('pdal', 'pdal --version'), ('gdal', 'gdalinfo --version')]:
|
|
try:
|
|
subprocess.run(cmd.split(), capture_output=True, check=True)
|
|
print(f" ✓ {name}")
|
|
except (subprocess.CalledProcessError, FileNotFoundError):
|
|
return False
|
|
return True
|
|
|
|
# ============ STEP 1: Classification ============
|
|
|
|
def create_pipeline_json(self, input_laz, output_las):
|
|
"""Create PDAL pipeline for ground classification"""
|
|
pipeline = {
|
|
"pipeline": [
|
|
str(input_laz),
|
|
{
|
|
"type": "filters.smrf",
|
|
"ignore": "Classification[7:7]",
|
|
"slope": 1.0,
|
|
"window": 16.0,
|
|
"threshold": 0.5,
|
|
"scalar": 1.25
|
|
},
|
|
{
|
|
"type": "filters.range",
|
|
"limits": "Classification[2:2]"
|
|
},
|
|
{
|
|
"type": "writers.las",
|
|
"filename": str(output_las),
|
|
"extra_dims": "all"
|
|
}
|
|
]
|
|
}
|
|
return json.dumps(pipeline)
|
|
|
|
def classify_ground(self, laz_file):
|
|
"""Classify ground points using SMRF filter"""
|
|
output_las = self.temp_dir / f"{laz_file.stem}_ground.las"
|
|
|
|
if output_las.exists():
|
|
print(f" ✓ Classification déjà effectuée")
|
|
return output_las
|
|
|
|
pipeline_json = self.create_pipeline_json(laz_file, output_las)
|
|
pipeline_file = self.temp_dir / "pipeline.json"
|
|
|
|
with open(pipeline_file, 'w') as f:
|
|
f.write(pipeline_json)
|
|
|
|
try:
|
|
subprocess.run(
|
|
["pdal", "pipeline", str(pipeline_file)],
|
|
capture_output=True,
|
|
check=True
|
|
)
|
|
print(f" ✓ Classification sol terminée")
|
|
return output_las
|
|
except subprocess.CalledProcessError as e:
|
|
error_msg = e.stderr.decode()
|
|
print(f" ✗ Erreur PDAL: {error_msg}")
|
|
|
|
# If error is about ReturnNumber=0, try filtering those points
|
|
if "ReturnNumber" in error_msg and "NumberOfReturns" in error_msg:
|
|
print(f" → Tentative de filtrage des points ReturnNumber=0...")
|
|
|
|
# Use filters.range to keep only valid points (ReturnNumber >= 1)
|
|
filtered_pipeline = [
|
|
{
|
|
"type": "readers.las",
|
|
"filename": str(laz_file)
|
|
},
|
|
{
|
|
"type": "filters.range",
|
|
"limits": "ReturnNumber[1:],NumberOfReturns[1:]"
|
|
},
|
|
{
|
|
"type": "filters.smrf",
|
|
"scalar": 1.25
|
|
},
|
|
{
|
|
"type": "filters.range",
|
|
"limits": "Classification[2:2]"
|
|
},
|
|
{
|
|
"type": "writers.las",
|
|
"filename": str(output_las),
|
|
"extra_dims": "all"
|
|
}
|
|
]
|
|
|
|
filtered_json = json.dumps(filtered_pipeline)
|
|
with open(pipeline_file, 'w') as f:
|
|
f.write(filtered_json)
|
|
|
|
try:
|
|
subprocess.run(
|
|
["pdal", "pipeline", str(pipeline_file)],
|
|
capture_output=True,
|
|
check=True
|
|
)
|
|
print(f" ✓ Classification sol terminée (points filtrés)")
|
|
return output_las
|
|
except subprocess.CalledProcessError as e2:
|
|
print(f" ✗ Erreur même avec filtrage: {e2.stderr.decode()}")
|
|
return None
|
|
|
|
return None
|
|
|
|
def run_pdal_pipeline(self, pipeline_json, output_file):
|
|
"""Exécute un pipeline PDAL"""
|
|
pipeline_file = self.temp_dir / "temp_pipeline.json"
|
|
with open(pipeline_file, 'w') as f:
|
|
f.write(pipeline_json)
|
|
|
|
try:
|
|
subprocess.run(
|
|
["pdal", "pipeline", str(pipeline_file)],
|
|
capture_output=True,
|
|
check=True
|
|
)
|
|
print(f" ✓ Pipeline terminé")
|
|
return output_file
|
|
except subprocess.CalledProcessError as e:
|
|
print(f" ✗ Erreur PDAL: {e.stderr.decode()}")
|
|
return None
|
|
|
|
# ============ STEP 2: DTM Generation ============
|
|
|
|
def create_dtm_fast(self, las_file, basename):
|
|
"""Create DTM using fast binning method with gap filling"""
|
|
print(f" → Génération DTM...")
|
|
|
|
try:
|
|
import laspy
|
|
from scipy.ndimage import gaussian_filter
|
|
|
|
# Read LAZ/LAS file
|
|
las = laspy.read(str(las_file))
|
|
|
|
# Get bounds
|
|
min_x, max_x = float(las.header.min[0]), float(las.header.max[0])
|
|
min_y, max_y = float(las.header.min[1]), float(las.header.max[1])
|
|
|
|
# Calculate grid dimensions
|
|
width = int(np.ceil((max_x - min_x) / self.resolution))
|
|
height = int(np.ceil((max_y - min_y) / self.resolution))
|
|
|
|
print(f" Bounds: X[{min_x:.1f}, {max_x:.1f}] Y[{min_y:.1f}, {max_y:.1f}]")
|
|
print(f" Grid: {width}x{height} pixels ({len(las.points):,} points)")
|
|
|
|
# Fast binning method
|
|
print(f" Rasterisation rapide...")
|
|
|
|
stat = binned_statistic_2d(
|
|
las.x, las.y, las.z,
|
|
statistic='mean',
|
|
bins=[width, height],
|
|
range=[[min_x, max_x], [min_y, max_y]]
|
|
)
|
|
|
|
dtm = stat.statistic.T
|
|
|
|
# Flip Y axis so north is at the top (Y decreases from top to bottom in image)
|
|
dtm = dtm[::-1, :]
|
|
|
|
# Fill gaps using interpolation
|
|
print(f" Remplissage des zones vides...")
|
|
from scipy.ndimage import distance_transform_edt
|
|
|
|
# Create mask of empty cells
|
|
mask = np.isnan(dtm)
|
|
|
|
if np.any(mask):
|
|
# Use inpainting to fill gaps
|
|
from scipy import interpolate
|
|
|
|
# Get coordinates of valid points
|
|
y_coords, x_coords = np.where(~mask)
|
|
z_values = dtm[~mask]
|
|
|
|
# Create interpolation function
|
|
if len(y_coords) > 100: # Only interpolate if enough points
|
|
# Create grid for interpolation
|
|
y_all, x_all = np.mgrid[0:dtm.shape[0], 0:dtm.shape[1]]
|
|
|
|
# Use nearest neighbor interpolation for speed
|
|
interp = interpolate.NearestNDInterpolator(
|
|
np.column_stack((y_coords, x_coords)),
|
|
z_values
|
|
)
|
|
|
|
# Fill missing values
|
|
dtm_filled = interp(y_all, x_all)
|
|
|
|
# Apply slight smoothing to blend filled areas
|
|
dtm_filled = gaussian_filter(dtm_filled, sigma=0.5)
|
|
|
|
# Replace NaN values with filled values
|
|
dtm[mask] = dtm_filled[mask]
|
|
|
|
# Save as GeoTIFF
|
|
output_tif = self.dtm_dir / f"{basename}_dtm.tif"
|
|
transform = from_bounds(min_x, min_y, max_x, max_y, width, height)
|
|
|
|
with rasterio.open(
|
|
output_tif,
|
|
'w',
|
|
driver='GTiff',
|
|
height=height,
|
|
width=width,
|
|
count=1,
|
|
dtype='float32',
|
|
crs='EPSG:2154',
|
|
transform=transform,
|
|
compress='lzw'
|
|
) as dst:
|
|
dst.write(dtm.astype('float32'), 1)
|
|
|
|
print(f" ✓ DTM créé: {output_tif.name}")
|
|
return output_tif
|
|
|
|
except Exception as e:
|
|
print(f" ✗ Erreur DTM: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return None
|
|
|
|
# ============ STEP 3: Visualizations (Python) ============
|
|
|
|
def generate_hillshade(self, dem_file, basename):
|
|
"""Generate hillshade using numpy"""
|
|
print(f" → Hillshade multidirectionnel...")
|
|
|
|
output = self.vis_dir / f"{basename}_hillshade_multi.tif"
|
|
|
|
try:
|
|
with rasterio.open(dem_file) as src:
|
|
dem = src.read(1)
|
|
transform = src.transform
|
|
crs = src.crs
|
|
|
|
# Calculate gradients
|
|
dy, dx = np.gradient(dem)
|
|
|
|
# Multi-directional hillshade (NW, NE, SW, SE)
|
|
azimuts = [315, 45, 225, 135]
|
|
altitude = 30
|
|
hillshades = []
|
|
|
|
for az in azimuts:
|
|
az_rad = np.radians(az)
|
|
alt_rad = np.radians(altitude)
|
|
|
|
# Calculate slope and aspect
|
|
slope = np.arctan(np.sqrt(dx**2 + dy**2))
|
|
aspect = np.arctan2(-dy, dx)
|
|
|
|
# Hillshade formula
|
|
hs = np.sin(alt_rad) * np.sin(slope) + \
|
|
np.cos(alt_rad) * np.cos(slope) * np.cos(az_rad - aspect)
|
|
hillshades.append(np.clip(hs, 0, 1))
|
|
|
|
# Combine all directions
|
|
combined = np.mean(hillshades, axis=0)
|
|
|
|
# Save
|
|
with rasterio.open(
|
|
output,
|
|
'w',
|
|
driver='GTiff',
|
|
height=combined.shape[0],
|
|
width=combined.shape[1],
|
|
count=1,
|
|
dtype='float32',
|
|
crs=crs,
|
|
transform=transform,
|
|
compress='lzw'
|
|
) as dst:
|
|
dst.write(combined.astype('float32'), 1)
|
|
|
|
return output
|
|
except Exception as e:
|
|
print(f" ✗ Erreur hillshade: {e}")
|
|
return None
|
|
|
|
def generate_slope(self, dem_file, basename):
|
|
"""Generate slope map"""
|
|
print(f" → Pente (Slope)...")
|
|
|
|
output = self.vis_dir / f"{basename}_slope.tif"
|
|
|
|
try:
|
|
with rasterio.open(dem_file) as src:
|
|
dem = src.read(1)
|
|
transform = src.transform
|
|
crs = src.crs
|
|
|
|
# Calculate slope
|
|
dy, dx = np.gradient(dem)
|
|
slope = np.arctan(np.sqrt(dx**2 + dy**2)) * 180 / np.pi
|
|
|
|
# Save
|
|
with rasterio.open(
|
|
output,
|
|
'w',
|
|
driver='GTiff',
|
|
height=slope.shape[0],
|
|
width=slope.shape[1],
|
|
count=1,
|
|
dtype='float32',
|
|
crs=crs,
|
|
transform=transform,
|
|
compress='lzw'
|
|
) as dst:
|
|
dst.write(slope.astype('float32'), 1)
|
|
|
|
return output
|
|
except Exception as e:
|
|
print(f" ✗ Erreur slope: {e}")
|
|
return None
|
|
|
|
def generate_aspect(self, dem_file, basename):
|
|
"""Generate aspect (orientation of slopes) map"""
|
|
print(f" → Aspect (Orientation)...")
|
|
|
|
output = self.vis_dir / f"{basename}_aspect.tif"
|
|
|
|
try:
|
|
with rasterio.open(dem_file) as src:
|
|
dem = src.read(1)
|
|
transform = src.transform
|
|
crs = src.crs
|
|
|
|
# Calculate aspect (direction of slope)
|
|
dy, dx = np.gradient(dem)
|
|
aspect = np.arctan2(-dy, dx) * 180 / np.pi
|
|
aspect = np.mod(aspect, 360) # Convert to 0-360
|
|
|
|
# Save
|
|
with rasterio.open(
|
|
output,
|
|
'w',
|
|
driver='GTiff',
|
|
height=aspect.shape[0],
|
|
width=aspect.shape[1],
|
|
count=1,
|
|
dtype='float32',
|
|
crs=crs,
|
|
transform=transform,
|
|
compress='lzw'
|
|
) as dst:
|
|
dst.write(aspect.astype('float32'), 1)
|
|
|
|
return output
|
|
except Exception as e:
|
|
print(f" ✗ Erreur aspect: {e}")
|
|
return None
|
|
|
|
def generate_curvature(self, dem_file, basename):
|
|
"""Generate curvature (terrain concavity/convexity) map"""
|
|
print(f" → Courbure (Curvature)...")
|
|
|
|
output = self.vis_dir / f"{basename}_curvature.tif"
|
|
|
|
try:
|
|
with rasterio.open(dem_file) as src:
|
|
dem = src.read(1)
|
|
transform = src.transform
|
|
crs = src.crs
|
|
|
|
# Calculate curvature using Laplacian
|
|
dz_dx = np.gradient(dem, axis=1)
|
|
dz_dy = np.gradient(dem, axis=0)
|
|
d2z_dx2 = np.gradient(dz_dx, axis=1)
|
|
d2z_dy2 = np.gradient(dz_dy, axis=0)
|
|
|
|
# Mean curvature
|
|
curvature = (d2z_dx2 + d2z_dy2) / 2
|
|
|
|
# Save
|
|
with rasterio.open(
|
|
output,
|
|
'w',
|
|
driver='GTiff',
|
|
height=curvature.shape[0],
|
|
width=curvature.shape[1],
|
|
count=1,
|
|
dtype='float32',
|
|
crs=crs,
|
|
transform=transform,
|
|
compress='lzw'
|
|
) as dst:
|
|
dst.write(curvature.astype('float32'), 1)
|
|
|
|
return output
|
|
except Exception as e:
|
|
print(f" ✗ Erreur curvature: {e}")
|
|
return None
|
|
|
|
def generate_solar(self, dem_file, basename):
|
|
"""Generate solar irradiance simulation"""
|
|
print(f" → Éclairage Solaire (Solar Irradiance)...")
|
|
|
|
output = self.vis_dir / f"{basename}_solar.tif"
|
|
|
|
try:
|
|
with rasterio.open(dem_file) as src:
|
|
dem = src.read(1)
|
|
transform = src.transform
|
|
crs = src.crs
|
|
|
|
# Calculate gradients
|
|
dy, dx = np.gradient(dem)
|
|
|
|
# Calculate slope and aspect
|
|
slope = np.arctan(np.sqrt(dx**2 + dy**2))
|
|
aspect = np.arctan2(-dy, dx)
|
|
|
|
# Solar irradiance (morning sun - azimuth 90, altitude 30)
|
|
az_rad = np.radians(90)
|
|
alt_rad = np.radians(30)
|
|
|
|
# Solar radiation formula
|
|
solar = np.sin(alt_rad) * np.sin(slope) + \
|
|
np.cos(alt_rad) * np.cos(slope) * np.cos(az_rad - aspect)
|
|
|
|
# Clip negative values (shadows)
|
|
solar = np.clip(solar, 0, 1)
|
|
|
|
# Save
|
|
with rasterio.open(
|
|
output,
|
|
'w',
|
|
driver='GTiff',
|
|
height=solar.shape[0],
|
|
width=solar.shape[1],
|
|
count=1,
|
|
dtype='float32',
|
|
crs=crs,
|
|
transform=transform,
|
|
compress='lzw'
|
|
) as dst:
|
|
dst.write(solar.astype('float32'), 1)
|
|
|
|
return output
|
|
except Exception as e:
|
|
print(f" ✗ Erreur solar: {e}")
|
|
return None
|
|
|
|
def generate_lrm(self, dem_file, basename):
|
|
"""Local Relief Model - deviation from local mean"""
|
|
print(f" → Local Relief Model...")
|
|
|
|
output = self.vis_dir / f"{basename}_lrm.tif"
|
|
|
|
try:
|
|
with rasterio.open(dem_file) as src:
|
|
dem = src.read(1)
|
|
transform = src.transform
|
|
crs = src.crs
|
|
|
|
# Calculate local mean with gaussian filter
|
|
from scipy.ndimage import gaussian_filter
|
|
local_mean = gaussian_filter(dem, sigma=15/self.resolution)
|
|
|
|
# Deviation from mean
|
|
lrm = dem - local_mean
|
|
|
|
# Save
|
|
with rasterio.open(
|
|
output,
|
|
'w',
|
|
driver='GTiff',
|
|
height=lrm.shape[0],
|
|
width=lrm.shape[1],
|
|
count=1,
|
|
dtype='float32',
|
|
crs=crs,
|
|
transform=transform,
|
|
compress='lzw'
|
|
) as dst:
|
|
dst.write(lrm.astype('float32'), 1)
|
|
|
|
return output
|
|
except Exception as e:
|
|
print(f" ✗ Erreur LRM: {e}")
|
|
return None
|
|
|
|
def generate_svf(self, dem_file, basename):
|
|
"""Sky-View Factor approximation"""
|
|
print(f" → Sky-View Factor...")
|
|
|
|
output = self.vis_dir / f"{basename}_svf.tif"
|
|
|
|
try:
|
|
with rasterio.open(dem_file) as src:
|
|
dem = src.read(1)
|
|
transform = src.transform
|
|
crs = src.crs
|
|
|
|
# Simplified SVF using relief inverted
|
|
# Normalize DEM
|
|
dem_norm = (dem - np.nanmin(dem)) / (np.nanmax(dem) - np.nanmin(dem))
|
|
|
|
# Apply inverted relief (approximation of SVF)
|
|
svf = 1 - dem_norm
|
|
|
|
# Save
|
|
with rasterio.open(
|
|
output,
|
|
'w',
|
|
driver='GTiff',
|
|
height=svf.shape[0],
|
|
width=svf.shape[1],
|
|
count=1,
|
|
dtype='float32',
|
|
crs=crs,
|
|
transform=transform,
|
|
compress='lzw'
|
|
) as dst:
|
|
dst.write(svf.astype('float32'), 1)
|
|
|
|
return output
|
|
except Exception as e:
|
|
print(f" ✗ Erreur SVF: {e}")
|
|
return None
|
|
|
|
def generate_openness(self, dem_file, basename, positive=True):
|
|
"""Positive/Negative Openness approximation"""
|
|
name = "positive_openness" if positive else "negative_openness"
|
|
print(f" → {name.replace('_', ' ').title()}...")
|
|
|
|
output = self.vis_dir / f"{basename}_{name}.tif"
|
|
|
|
try:
|
|
with rasterio.open(dem_file) as src:
|
|
dem = src.read(1)
|
|
transform = src.transform
|
|
crs = src.crs
|
|
|
|
if positive:
|
|
# Positive openness: high points
|
|
from scipy.ndimage import maximum_filter
|
|
openness = maximum_filter(dem, size=int(10/self.resolution))
|
|
else:
|
|
# Negative openness: low points (cavities!)
|
|
from scipy.ndimage import minimum_filter
|
|
openness = minimum_filter(dem, size=int(10/self.resolution))
|
|
|
|
# Calculate difference
|
|
result = openness - dem if positive else dem - openness
|
|
|
|
# Save
|
|
with rasterio.open(
|
|
output,
|
|
'w',
|
|
driver='GTiff',
|
|
height=result.shape[0],
|
|
width=result.shape[1],
|
|
count=1,
|
|
dtype='float32',
|
|
crs=crs,
|
|
transform=transform,
|
|
compress='lzw'
|
|
) as dst:
|
|
dst.write(result.astype('float32'), 1)
|
|
|
|
return output
|
|
except Exception as e:
|
|
print(f" ✗ Erreur openness: {e}")
|
|
return None
|
|
|
|
# ============ STEP 4: PNG Conversion ============
|
|
|
|
def tif_to_png(self, tif_file):
|
|
"""Convert GeoTIFF to visualization JPEG with explicit legend"""
|
|
if not tif_file or not tif_file.exists():
|
|
return None
|
|
|
|
jpg_file = self.vis_dir / f"{tif_file.stem}.jpg"
|
|
|
|
try:
|
|
with rasterio.open(tif_file) as src:
|
|
data = src.read(1)
|
|
nodata = src.nodata
|
|
|
|
if nodata is not None:
|
|
data = np.ma.masked_where((data == nodata) | np.isnan(data), data)
|
|
|
|
# Get valid data for normalization
|
|
valid_data = data.compressed() if hasattr(data, 'compressed') else data.flatten()
|
|
valid_data = valid_data[~np.isnan(valid_data)]
|
|
|
|
# Determine colormap, normalization, and legend info
|
|
if 'hillshade' in str(tif_file):
|
|
cmap = 'gray'
|
|
vmin, vmax = 0, 1
|
|
data = np.clip(data, vmin, vmax)
|
|
title = "Hillshade Multidirectionnel"
|
|
legend_label = "Illumination (0-1)"
|
|
description = "Ombres → Structures, murs, terrasses visibles"
|
|
elif 'slope' in str(tif_file):
|
|
cmap = 'inferno'
|
|
vmin, vmax = 0, np.percentile(valid_data, 95)
|
|
data = np.clip((data - vmin) / (vmax - vmin), 0, 1)
|
|
title = "Pente (Slope)"
|
|
legend_label = f"Pente (°)\nMin: {vmin:.1f}° | Max: {vmax:.1f}°"
|
|
description = "Orange/Clair = Forte pente (murs, talus) | Foncé = Faible pente"
|
|
elif 'aspect' in str(tif_file):
|
|
cmap = 'hsv' # Cyclic colormap for directions
|
|
# Aspect is 0-360, normalize to 0-1
|
|
vmin, vmax = 0, 360
|
|
data = np.clip((data - vmin) / (vmax - vmin), 0, 1)
|
|
title = "Aspect (Orientation)"
|
|
legend_label = "Orientation (° du Nord)\nN=0°, E=90°, S=180°, O=270°"
|
|
description = "Couleur = Direction de la pente (utile pour orientation bâtiments)"
|
|
elif 'curvature' in str(tif_file):
|
|
cmap = 'RdYlBu_r' # Diverging for positive/negative curvature
|
|
vmax = max(abs(np.percentile(valid_data, 5)), abs(np.percentile(valid_data, 95)), 0.001)
|
|
vmin, vmax = -vmax, vmax
|
|
data = np.clip((data - vmin) / (vmax - vmin), 0, 1)
|
|
title = "Courbure (Curvature)"
|
|
legend_label = f"Courbure\nRouge = Convexe (bosse)\nBleu = Concave (creux)"
|
|
description = "⭐ Excellent pour fossés, levées, terrasses, talus"
|
|
elif 'solar' in str(tif_file):
|
|
cmap = 'YlOrBr' # Sun-like colormap
|
|
vmin, vmax = 0, 1
|
|
data = np.clip(data, vmin, vmax)
|
|
title = "Éclairage Solaire (Matin)"
|
|
legend_label = "Irradiance Solaire\nFoncé = Ombre | Clair = Éclairé"
|
|
description = "Simulation soleil matin - révèle structures orientées Est"
|
|
elif 'svf' in str(tif_file):
|
|
cmap = 'viridis'
|
|
vmin, vmax = np.percentile(valid_data, (5, 95))
|
|
data = np.clip((data - vmin) / (vmax - vmin), 0, 1)
|
|
title = "Sky-View Factor"
|
|
legend_label = "Ouverture vers le ciel\nFoncé = Cuvette | Clair = Sommet"
|
|
description = "Excellent pour structures, tumulus, fondations"
|
|
elif 'lrm' in str(tif_file):
|
|
cmap = 'RdBu_r'
|
|
vmax = max(abs(np.percentile(valid_data, 2)), abs(np.percentile(valid_data, 98)), 0.1)
|
|
vmin, vmax = -vmax, vmax
|
|
data = np.clip((data - vmin) / (vmax - vmin), 0, 1)
|
|
title = "Local Relief Model (LRM)"
|
|
legend_label = f"Relief local (m)\nRouge = +{vmax:.1f}m (élévation)\nBleu = {vmin:.1f}m (dépression)"
|
|
description = "Rouge = Surélévation | Bleu = Creux, fossé"
|
|
elif 'positive' in str(tif_file):
|
|
cmap = 'YlOrBr'
|
|
vmin, vmax = np.percentile(valid_data, (10, 98))
|
|
data = np.clip((data - vmin) / (vmax - vmin), 0, 1)
|
|
title = "Positive Openness"
|
|
legend_label = f"Ouverture positive (m)\nMin: {vmin:.1f}m | Max: {vmax:.1f}m"
|
|
description = "Orange = Zones élevées (tumulus, collines)"
|
|
elif 'negative' in str(tif_file):
|
|
cmap = 'GnBu_r'
|
|
vmin, vmax = np.percentile(valid_data, (10, 98))
|
|
data = np.clip((data - vmin) / (vmax - vmin), 0, 1)
|
|
title = "Negative Openness"
|
|
legend_label = f"Ouverture négative (m)\nMin: {vmin:.1f}m | Max: {vmax:.1f}m"
|
|
description = "Vert/Bleu = Zones basses (cavités, fossés, souterrains)"
|
|
else:
|
|
cmap = 'terrain'
|
|
p2, p98 = np.percentile(valid_data, (2, 98))
|
|
data = np.clip((data - p2) / (p98 - p2), 0, 1)
|
|
title = tif_file.stem.replace('_', ' ').title()
|
|
legend_label = "Altitude normalisée"
|
|
description = ""
|
|
|
|
# Create figure with white background for JPEG
|
|
fig, ax = plt.subplots(figsize=(18, 12), facecolor='white')
|
|
|
|
# Display data with north at the top
|
|
im = ax.imshow(data, cmap=cmap, aspect='equal', origin='upper')
|
|
|
|
# Enhanced colorbar with explicit values
|
|
cbar = plt.colorbar(im, ax=ax, pad=0.03, shrink=0.75, aspect=30)
|
|
cbar.ax.tick_params(labelsize=10, width=1.5)
|
|
cbar.outline.set_linewidth(1.5)
|
|
|
|
# Add colorbar label
|
|
cbar.set_label(legend_label, fontsize=11, fontweight='bold')
|
|
|
|
# Enhanced title with description
|
|
full_title = f"{title}\n{description}"
|
|
ax.set_title(full_title, fontsize=16, fontweight='bold', pad=15)
|
|
|
|
# Add coordinates info
|
|
ax.text(0.02, 0.98, f"Résolution: {self.resolution}m/px",
|
|
transform=ax.transAxes, fontsize=9,
|
|
verticalalignment='top', bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
|
|
|
|
ax.axis('off')
|
|
|
|
# Add vector north arrow above the colorbar on the right side (black lines)
|
|
from matplotlib.patches import FancyArrow, Polygon
|
|
from mpl_toolkits.axes_grid1.inset_locator import inset_axes
|
|
|
|
# Create inset axes positioned above the colorbar on the right
|
|
# The colorbar is typically at the right, so we place north arrow above it
|
|
north_ax = inset_axes(ax, width="5%", height="8%", loc='upper right',
|
|
bbox_to_anchor=(-0.02, 0.15, 1, 1), bbox_transform=ax.transAxes)
|
|
|
|
north_ax.set_xlim(0, 1)
|
|
north_ax.set_ylim(0, 1)
|
|
north_ax.axis('off')
|
|
|
|
# Draw vector arrow pointing north (black lines)
|
|
# Main arrow shaft
|
|
north_ax.plot([0.5, 0.5], [0.15, 0.65], color='black', linewidth=2.5, zorder=10)
|
|
# Arrow head (triangle outline)
|
|
arrow_head_outline = [[0.5, 0.25], [0.35, 0.45], [0.5, 0.65], [0.65, 0.45]]
|
|
for i in range(len(arrow_head_outline) - 1):
|
|
north_ax.plot([arrow_head_outline[i][0], arrow_head_outline[i+1][0]],
|
|
[arrow_head_outline[i][1], arrow_head_outline[i+1][1]],
|
|
color='black', linewidth=2.5, zorder=10)
|
|
# Fill the arrow head
|
|
north_ax.add_patch(Polygon([[0.5, 0.25], [0.35, 0.45], [0.5, 0.65], [0.65, 0.45]],
|
|
closed=True, facecolor='black', edgecolor='black', zorder=9))
|
|
|
|
# Add "N" label above the arrow
|
|
north_ax.text(0.5, 0.92, 'N', ha='center', va='top',
|
|
fontsize=14, fontweight='bold', color='black', zorder=11)
|
|
|
|
fig.patch.set_facecolor('white')
|
|
|
|
plt.tight_layout()
|
|
# Save as JPEG
|
|
plt.savefig(jpg_file, dpi=150, bbox_inches='tight', pad_inches=0.1, facecolor='white', format='jpg')
|
|
plt.close()
|
|
|
|
# Delete the source TIFF file to save space
|
|
tif_file.unlink()
|
|
|
|
return jpg_file
|
|
|
|
except Exception as e:
|
|
print(f" ✗ Erreur conversion JPEG: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return None
|
|
|
|
def generate_all_visualizations(self, dtm_file, basename):
|
|
"""Generate all archaeological visualizations"""
|
|
print(f"\n Génération visualisations:")
|
|
|
|
vis_results = {}
|
|
|
|
# Generate rasters (existing + new)
|
|
vis_results['hillshade'] = self.generate_hillshade(dtm_file, basename)
|
|
vis_results['slope'] = self.generate_slope(dtm_file, basename)
|
|
vis_results['aspect'] = self.generate_aspect(dtm_file, basename)
|
|
vis_results['curvature'] = self.generate_curvature(dtm_file, basename)
|
|
vis_results['solar'] = self.generate_solar(dtm_file, basename)
|
|
vis_results['svf'] = self.generate_svf(dtm_file, basename)
|
|
vis_results['lrm'] = self.generate_lrm(dtm_file, basename)
|
|
vis_results['pos_open'] = self.generate_openness(dtm_file, basename, positive=True)
|
|
vis_results['neg_open'] = self.generate_openness(dtm_file, basename, positive=False)
|
|
|
|
# Convert to JPEG
|
|
print(f"\n Conversion images JPEG:")
|
|
for name, tif_file in vis_results.items():
|
|
jpg_file = self.tif_to_png(tif_file)
|
|
if jpg_file:
|
|
print(f" ✓ {jpg_file.name}")
|
|
|
|
return vis_results
|
|
|
|
# ============ Complete Pipeline ============
|
|
|
|
def process_file(self, laz_file):
|
|
"""Process a single LAZ file"""
|
|
basename = laz_file.stem
|
|
print(f"\n{'='*60}")
|
|
print(f" FICHIER : {basename}")
|
|
print(f"{'='*60}")
|
|
|
|
# Step 1: Ground classification
|
|
print(f"\n[1/3] Classification du sol...")
|
|
las_file = self.classify_ground(laz_file)
|
|
if not las_file:
|
|
print(f" ✗ Échec classification")
|
|
return False
|
|
|
|
# Step 2: Generate DTM
|
|
print(f"\n[2/3] Génération DTM...")
|
|
dtm_file = self.create_dtm_fast(las_file, basename)
|
|
if not dtm_file:
|
|
print(f" ✗ Échec DTM")
|
|
return False
|
|
|
|
# Step 3: Visualizations
|
|
print(f"\n[3/3] Visualisations archéologiques...")
|
|
vis = self.generate_all_visualizations(dtm_file, basename)
|
|
|
|
print(f"\n✓ {basename} traité avec succès !")
|
|
return True
|
|
|
|
def process_all(self):
|
|
"""Process all LAZ files in input directory"""
|
|
files = self.find_laz_files()
|
|
|
|
if not files:
|
|
print("Aucun fichier LAZ/LAS trouvé !")
|
|
return
|
|
|
|
print(f"\n{'='*60}")
|
|
print(f" PIPELINE ARCHÉOLOGIQUE LiDAR")
|
|
print(f"{'='*60}")
|
|
|
|
# Check tools
|
|
print(f"\nVérification des outils...")
|
|
if not self.check_tools():
|
|
print("Erreur: Certains outils ne sont pas disponibles")
|
|
return
|
|
|
|
results = {}
|
|
for laz_file in files:
|
|
try:
|
|
results[laz_file.name] = self.process_file(laz_file)
|
|
except Exception as e:
|
|
print(f"\n✗ Erreur traitement: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
results[laz_file.name] = False
|
|
|
|
# Summary
|
|
print(f"\n{'='*60}")
|
|
print(f" RÉSUMÉ")
|
|
print(f"{'='*60}")
|
|
success_count = sum(results.values())
|
|
print(f"✓ {success_count}/{len(results)} fichiers traités avec succès")
|
|
|
|
print(f"\nRésultats dans: {self.output_dir}")
|
|
print(f" • DTM: {self.dtm_dir}")
|
|
print(f" • Visualisations JPEG: {self.vis_dir}")
|
|
|
|
# Clean up temporary files to save space
|
|
print(f"\nNettoyage des fichiers temporaires...")
|
|
try:
|
|
import shutil
|
|
if self.temp_dir.exists():
|
|
shutil.rmtree(self.temp_dir)
|
|
print(f" ✓ Fichiers temporaires supprimés ({self.temp_dir})")
|
|
except Exception as e:
|
|
print(f" Note: Impossible de supprimer les fichiers temporaires")
|
|
|
|
|
|
def main():
|
|
import argparse
|
|
|
|
parser = argparse.ArgumentParser(
|
|
description="Pipeline LiDAR pour détection archéologique (Python pur)"
|
|
)
|
|
parser.add_argument(
|
|
"input",
|
|
help="Dossier contenant les fichiers LAZ/LAS"
|
|
)
|
|
parser.add_argument(
|
|
"-o", "--output",
|
|
default="/data/output",
|
|
help="Dossier de sortie"
|
|
)
|
|
parser.add_argument(
|
|
"-r", "--resolution",
|
|
type=float,
|
|
default=0.5,
|
|
help="Résolution en mètres"
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
try:
|
|
pipeline = LidarArchaeoPipeline(
|
|
input_dir=args.input,
|
|
output_dir=args.output,
|
|
resolution=args.resolution
|
|
)
|
|
pipeline.process_all()
|
|
except Exception as e:
|
|
print(f"Erreur: {e}")
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|