Files
lidar_rendu/process_lidar.py
Jacquin Antoine d8502ff26e Pipeline LiDAR: 19 visualisations archéologiques, corrections SVF/Openness, légendes explicites
- Corriger SVF: ray-tracing 16 azimuts (inversion DEM remplacée)
- Corriger Openness: angle zénith/nadir sur 8 directions (min/max filter remplacé)
- Ajouter MSRM (LRM multi-échelle 5/10/25/50/100m)
- Ajouter TPI multi-échelle (5m + 100m)
- Ajouter détection dépressions (remplissage hydrologique)
- Ajouter SAILORE (LRM adaptatif f(pente))
- Ajouter rugosité de surface (écart-type local 5m)
- Ajouter anomalies statistiques (z-score + Moran's I)
- Ajouter ondelette Mexican Hat (CWT 5 échelles)
- Ajouter texture GLCM (contraste + entropie - homogénéité)
- Ajouter accumulation de flux (D8)
- Retirer geomorphons et VAT composite
- Répertoire par fichier LAZ dans visualisations/
- Support multi-CPU (-w/--workers)
- Légendes explicites avec différenciation entre techniques similaires
- Docker: uid/gid 1000:1000

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-09 21:13:20 +02:00

2694 lines
110 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Pipeline LiDAR pour détection archéologique - Version Python pur
Visualisations générées avec numpy/rasterio/matplotlib
"""
import os
import sys
import json
import subprocess
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
import numpy as np
import rasterio
from rasterio.transform import from_bounds
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from matplotlib import rcParams
from scipy import ndimage
from scipy.stats import binned_statistic_2d
from tqdm import tqdm
try:
import laspy
except ImportError as e:
print(f"Erreur import: {e}")
sys.exit(1)
try:
from rasterio.warp import transform as warp_transform
HAS_WARP = True
except ImportError:
HAS_WARP = False
rcParams['figure.dpi'] = 150
rcParams['savefig.dpi'] = 300
rcParams['font.size'] = 10
class LidarArchaeoPipeline:
"""Pipeline Python pur pour analyse archéologique LiDAR"""
def __init__(self, input_dir, output_dir, resolution=0.5, workers=1):
self.input_dir = Path(input_dir)
self.output_dir = Path(output_dir)
self.resolution = resolution
self.workers = workers
self.temp_dir = self.output_dir / "temp"
if not self.input_dir.exists():
raise ValueError(f"Répertoire introuvable: {self.input_dir}")
self.output_dir.mkdir(parents=True, exist_ok=True)
self.temp_dir.mkdir(exist_ok=True)
self.dtm_dir = self.output_dir / "DTM"
self.vis_dir = self.output_dir / "visualisations"
self.pdf_dir = self.output_dir / "rapports"
for d in [self.dtm_dir, self.vis_dir, self.pdf_dir]:
d.mkdir(exist_ok=True)
print(f"✓ Pipeline initialisé (Python pur)")
print(f" Entrée : {self.input_dir}")
print(f" Sortie : {self.output_dir}")
print(f" Résolution : {resolution}m/px")
print(f" Workers : {workers}")
def find_laz_files(self):
"""Trouve tous les fichiers LAZ/LAS"""
files = list(self.input_dir.glob("*.laz")) + list(self.input_dir.glob("*.las"))
print(f"{len(files)} fichier(s) LiDAR trouvé(s)")
return sorted(files)
def check_tools(self):
"""Vérifie que les outils sont disponibles"""
for name, cmd in [('pdal', 'pdal --version'), ('gdal', 'gdalinfo --version')]:
try:
subprocess.run(cmd.split(), capture_output=True, check=True)
print(f"{name}")
except (subprocess.CalledProcessError, FileNotFoundError):
return False
return True
# ============ STEP 1: Classification ============
def create_pipeline_json(self, input_laz, output_las):
"""Create PDAL pipeline for ground classification"""
pipeline = {
"pipeline": [
str(input_laz),
{
"type": "filters.smrf",
"ignore": "Classification[7:7]",
"slope": 1.0,
"window": 16.0,
"threshold": 0.5,
"scalar": 1.25
},
{
"type": "filters.range",
"limits": "Classification[2:2]"
},
{
"type": "writers.las",
"filename": str(output_las),
"extra_dims": "all"
}
]
}
return json.dumps(pipeline)
def classify_ground(self, laz_file):
"""Classify ground points using SMRF filter"""
output_las = self.temp_dir / f"{laz_file.stem}_ground.las"
if output_las.exists():
print(f" ✓ Classification déjà effectuée")
return output_las
pipeline_json = self.create_pipeline_json(laz_file, output_las)
pipeline_file = self.temp_dir / "pipeline.json"
with open(pipeline_file, 'w') as f:
f.write(pipeline_json)
try:
subprocess.run(
["pdal", "pipeline", str(pipeline_file)],
capture_output=True,
check=True
)
print(f" ✓ Classification sol terminée")
return output_las
except subprocess.CalledProcessError as e:
error_msg = e.stderr.decode()
print(f" ✗ Erreur PDAL: {error_msg}")
# If error is about ReturnNumber=0, try filtering those points
if "ReturnNumber" in error_msg and "NumberOfReturns" in error_msg:
print(f" → Tentative de filtrage des points ReturnNumber=0...")
# Use filters.range to keep only valid points (ReturnNumber >= 1)
filtered_pipeline = [
{
"type": "readers.las",
"filename": str(laz_file)
},
{
"type": "filters.range",
"limits": "ReturnNumber[1:],NumberOfReturns[1:]"
},
{
"type": "filters.smrf",
"scalar": 1.25
},
{
"type": "filters.range",
"limits": "Classification[2:2]"
},
{
"type": "writers.las",
"filename": str(output_las),
"extra_dims": "all"
}
]
filtered_json = json.dumps(filtered_pipeline)
with open(pipeline_file, 'w') as f:
f.write(filtered_json)
try:
subprocess.run(
["pdal", "pipeline", str(pipeline_file)],
capture_output=True,
check=True
)
print(f" ✓ Classification sol terminée (points filtrés)")
return output_las
except subprocess.CalledProcessError as e2:
print(f" ✗ Erreur même avec filtrage: {e2.stderr.decode()}")
return None
return None
def run_pdal_pipeline(self, pipeline_json, output_file):
"""Exécute un pipeline PDAL"""
pipeline_file = self.temp_dir / "temp_pipeline.json"
with open(pipeline_file, 'w') as f:
f.write(pipeline_json)
try:
subprocess.run(
["pdal", "pipeline", str(pipeline_file)],
capture_output=True,
check=True
)
print(f" ✓ Pipeline terminé")
return output_file
except subprocess.CalledProcessError as e:
print(f" ✗ Erreur PDAL: {e.stderr.decode()}")
return None
# ============ STEP 2: DTM Generation ============
def create_dtm_fast(self, las_file, basename):
"""Create DTM using fast binning method with gap filling"""
print(f" → Génération DTM...")
try:
import laspy
from scipy.ndimage import gaussian_filter
# Read LAZ/LAS file
las = laspy.read(str(las_file))
# Get bounds
min_x, max_x = float(las.header.min[0]), float(las.header.max[0])
min_y, max_y = float(las.header.min[1]), float(las.header.max[1])
# Calculate grid dimensions
width = int(np.ceil((max_x - min_x) / self.resolution))
height = int(np.ceil((max_y - min_y) / self.resolution))
print(f" Bounds: X[{min_x:.1f}, {max_x:.1f}] Y[{min_y:.1f}, {max_y:.1f}]")
print(f" Grid: {width}x{height} pixels ({len(las.points):,} points)")
# Fast binning method
print(f" Rasterisation rapide...")
stat = binned_statistic_2d(
las.x, las.y, las.z,
statistic='mean',
bins=[width, height],
range=[[min_x, max_x], [min_y, max_y]]
)
dtm = stat.statistic.T
# Flip Y axis so north is at the top (Y decreases from top to bottom in image)
dtm = dtm[::-1, :]
# Fill gaps using interpolation
print(f" Remplissage des zones vides...")
# Create mask of empty cells
mask = np.isnan(dtm)
if np.any(mask):
from scipy.ndimage import distance_transform_edt
# Calculate distance to nearest valid point for each cell
dist_to_nearest = distance_transform_edt(mask)
max_fill_distance = max(20, int(10 / self.resolution)) # Max 20px or 10m
# Use nearest-neighbor interpolation for speed (memory efficient)
# but apply strong Gaussian smoothing to remove artifacts
from scipy import interpolate
y_coords, x_coords = np.where(~mask)
z_values = dtm[~mask]
if len(y_coords) > 100:
# Nearest neighbor interpolation (fast, memory-efficient)
interp = interpolate.NearestNDInterpolator(
np.column_stack((y_coords, x_coords)),
z_values
)
# Only interpolate where needed (memory-efficient)
y_missing, x_missing = np.where(mask)
dtm[y_missing, x_missing] = interp(y_missing, x_missing)
# Apply Gaussian smoothing to entire DTM to remove blocky artifacts
# Stronger smoothing in areas that were interpolated
dtm_smooth = gaussian_filter(dtm, sigma=2.0)
# Only use smoothed values for interpolated areas, keep original for valid data
# Also only fill within max distance from valid data
fill_mask = mask & (dist_to_nearest <= max_fill_distance)
dtm[fill_mask] = dtm_smooth[fill_mask]
# For cells beyond max distance, leave as NaN
far_mask = mask & (dist_to_nearest > max_fill_distance)
dtm[far_mask] = np.nan
# Save as GeoTIFF
output_tif = self.dtm_dir / f"{basename}_dtm.tif"
transform = from_bounds(min_x, min_y, max_x, max_y, width, height)
with rasterio.open(
output_tif,
'w',
driver='GTiff',
height=height,
width=width,
count=1,
dtype='float32',
crs='EPSG:2154',
transform=transform,
compress='lzw'
) as dst:
dst.write(dtm.astype('float32'), 1)
print(f" ✓ DTM créé: {output_tif.name}")
return output_tif
except Exception as e:
print(f" ✗ Erreur DTM: {e}")
import traceback
traceback.print_exc()
return None
# ============ STEP 3: Visualizations (Python) ============
def generate_hillshade(self, dem_file, basename):
"""Generate hillshade using numpy"""
print(f" → Hillshade multidirectionnel...")
output = self.vis_dir / f"{basename}_hillshade_multi.tif"
try:
with rasterio.open(dem_file) as src:
dem = src.read(1)
transform = src.transform
crs = src.crs
# Calculate gradients
dy, dx = np.gradient(dem)
# Multi-directional hillshade (NW, NE, SW, SE)
azimuts = [315, 45, 225, 135]
altitude = 30
hillshades = []
for az in azimuts:
az_rad = np.radians(az)
alt_rad = np.radians(altitude)
# Calculate slope and aspect
slope = np.arctan(np.sqrt(dx**2 + dy**2))
aspect = np.arctan2(dy, dx)
# Hillshade formula
hs = np.sin(alt_rad) * np.sin(slope) + \
np.cos(alt_rad) * np.cos(slope) * np.cos(az_rad - aspect)
hillshades.append(np.clip(hs, 0, 1))
# Combine all directions
combined = np.mean(hillshades, axis=0)
# Save
with rasterio.open(
output,
'w',
driver='GTiff',
height=combined.shape[0],
width=combined.shape[1],
count=1,
dtype='float32',
crs=crs,
transform=transform,
compress='lzw'
) as dst:
dst.write(combined.astype('float32'), 1)
return output
except Exception as e:
print(f" ✗ Erreur hillshade: {e}")
return None
def generate_hillshade_ne(self, dem_file, basename):
"""Generate hillshade from NW (azimuth 315deg) like topographic maps"""
print(f" → Hillshade Nord-Est (carte topo)...")
output = self.vis_dir / f"{basename}_hillshade_ne.tif"
try:
with rasterio.open(dem_file) as src:
dem = src.read(1)
transform = src.transform
crs = src.crs
# Single-direction hillshade from NW (315°) - standard for topo maps
# Since DTM Y-axis is flipped (north at top, Y increases downward in image),
# use +dy in aspect calculation (not -dy)
azimuth = 315
altitude = 45
dy, dx = np.gradient(dem)
slope = np.arctan(np.sqrt(dx**2 + dy**2))
# Use +dy because DTM is already flipped (north at top)
aspect = np.arctan2(dy, dx)
az_rad = np.radians(azimuth)
alt_rad = np.radians(altitude)
hs = np.sin(alt_rad) * np.sin(slope) + \
np.cos(alt_rad) * np.cos(slope) * np.cos(az_rad - aspect)
hs = np.clip(hs, 0, 1)
# Save
with rasterio.open(
output,
'w',
driver='GTiff',
height=hs.shape[0],
width=hs.shape[1],
count=1,
dtype='float32',
crs=crs,
transform=transform,
compress='lzw'
) as dst:
dst.write(hs.astype('float32'), 1)
return output
except Exception as e:
print(f" ✗ Erreur hillshade NE: {e}")
return None
def generate_slope(self, dem_file, basename):
"""Generate slope map"""
print(f" → Pente (Slope)...")
output = self.vis_dir / f"{basename}_slope.tif"
try:
with rasterio.open(dem_file) as src:
dem = src.read(1)
transform = src.transform
crs = src.crs
# Calculate slope
dy, dx = np.gradient(dem)
slope = np.arctan(np.sqrt(dx**2 + dy**2)) * 180 / np.pi
# Save
with rasterio.open(
output,
'w',
driver='GTiff',
height=slope.shape[0],
width=slope.shape[1],
count=1,
dtype='float32',
crs=crs,
transform=transform,
compress='lzw'
) as dst:
dst.write(slope.astype('float32'), 1)
return output
except Exception as e:
print(f" ✗ Erreur slope: {e}")
return None
def generate_aspect(self, dem_file, basename):
"""Generate aspect (orientation of slopes) map"""
print(f" → Aspect (Orientation)...")
output = self.vis_dir / f"{basename}_aspect.tif"
try:
with rasterio.open(dem_file) as src:
dem = src.read(1)
transform = src.transform
crs = src.crs
# Calculate aspect (direction of slope)
dy, dx = np.gradient(dem)
aspect = np.arctan2(dy, dx) * 180 / np.pi
aspect = np.mod(aspect, 360) # Convert to 0-360
# Save
with rasterio.open(
output,
'w',
driver='GTiff',
height=aspect.shape[0],
width=aspect.shape[1],
count=1,
dtype='float32',
crs=crs,
transform=transform,
compress='lzw'
) as dst:
dst.write(aspect.astype('float32'), 1)
return output
except Exception as e:
print(f" ✗ Erreur aspect: {e}")
return None
def generate_curvature(self, dem_file, basename):
"""Generate curvature (terrain concavity/convexity) map"""
print(f" → Courbure (Curvature)...")
output = self.vis_dir / f"{basename}_curvature.tif"
try:
with rasterio.open(dem_file) as src:
dem = src.read(1)
transform = src.transform
crs = src.crs
# Calculate curvature using Laplacian
dz_dx = np.gradient(dem, axis=1)
dz_dy = np.gradient(dem, axis=0)
d2z_dx2 = np.gradient(dz_dx, axis=1)
d2z_dy2 = np.gradient(dz_dy, axis=0)
# Mean curvature
curvature = (d2z_dx2 + d2z_dy2) / 2
# Save
with rasterio.open(
output,
'w',
driver='GTiff',
height=curvature.shape[0],
width=curvature.shape[1],
count=1,
dtype='float32',
crs=crs,
transform=transform,
compress='lzw'
) as dst:
dst.write(curvature.astype('float32'), 1)
return output
except Exception as e:
print(f" ✗ Erreur curvature: {e}")
return None
def generate_solar(self, dem_file, basename):
"""Generate solar irradiance simulation"""
print(f" → Éclairage Solaire (Solar Irradiance)...")
output = self.vis_dir / f"{basename}_solar.tif"
try:
with rasterio.open(dem_file) as src:
dem = src.read(1)
transform = src.transform
crs = src.crs
# Calculate gradients
dy, dx = np.gradient(dem)
# Calculate slope and aspect
slope = np.arctan(np.sqrt(dx**2 + dy**2))
aspect = np.arctan2(dy, dx)
# Solar irradiance (morning sun - azimuth 90, altitude 30)
az_rad = np.radians(90)
alt_rad = np.radians(30)
# Solar radiation formula
solar = np.sin(alt_rad) * np.sin(slope) + \
np.cos(alt_rad) * np.cos(slope) * np.cos(az_rad - aspect)
# Clip negative values (shadows)
solar = np.clip(solar, 0, 1)
# Save
with rasterio.open(
output,
'w',
driver='GTiff',
height=solar.shape[0],
width=solar.shape[1],
count=1,
dtype='float32',
crs=crs,
transform=transform,
compress='lzw'
) as dst:
dst.write(solar.astype('float32'), 1)
return output
except Exception as e:
print(f" ✗ Erreur solar: {e}")
return None
def generate_lrm(self, dem_file, basename):
"""Local Relief Model - deviation from local mean"""
print(f" → Local Relief Model...")
output = self.vis_dir / f"{basename}_lrm.tif"
try:
with rasterio.open(dem_file) as src:
dem = src.read(1)
transform = src.transform
crs = src.crs
# Calculate local mean with gaussian filter
from scipy.ndimage import gaussian_filter
local_mean = gaussian_filter(dem, sigma=15/self.resolution)
# Deviation from mean
lrm = dem - local_mean
# Save
with rasterio.open(
output,
'w',
driver='GTiff',
height=lrm.shape[0],
width=lrm.shape[1],
count=1,
dtype='float32',
crs=crs,
transform=transform,
compress='lzw'
) as dst:
dst.write(lrm.astype('float32'), 1)
return output
except Exception as e:
print(f" ✗ Erreur LRM: {e}")
return None
def generate_svf(self, dem_file, basename):
"""Sky-View Factor - ray-tracing on 16 azimuths.
For each pixel, trace rays in N directions, find the max horizon
angle in each direction, then SVF = (1/N) * sum(cos²(horizon_angle)).
Valleys/crevices have low SVF (obstructed sky), ridges/peaks have high SVF."""
print(f" → Sky-View Factor (ray-tracing)...")
output = self.vis_dir / f"{basename}_svf.tif"
try:
with rasterio.open(dem_file) as src:
dem = src.read(1)
transform = src.transform
crs = src.crs
rows, cols = dem.shape
res = self.resolution # meters per pixel
# 16 azimuth directions (0°, 22.5°, 45°, ... 337.5°)
n_dirs = 16
angles = np.linspace(0, 2 * np.pi, n_dirs, endpoint=False)
dx = np.cos(angles)
dy = np.sin(angles)
# Maximum search distance (in pixels)
max_dist = int(50 / res) # 50m search radius
# Pad DEM with NaN to handle boundaries
padded = np.pad(dem.astype(np.float64), max_dist, mode='constant',
constant_values=np.nan)
svf = np.zeros_like(dem, dtype=np.float64)
for d_idx in range(n_dirs):
# Direction vector
ddx, ddy = dx[d_idx], dy[d_idx]
# For each step along the ray, compute the horizon angle
horizon = np.zeros_like(dem, dtype=np.float64)
for step in range(1, max_dist + 1):
# Offset in pixels (rounded to nearest integer)
px = int(round(ddx * step))
py = int(round(ddy * step))
# Distance in meters
dist_m = np.sqrt((ddx * step * res) ** 2 + (ddy * step * res) ** 2)
if dist_m < res * 0.5:
continue
# Elevation difference relative to center pixel
# Source is at (max_dist + row, max_dist + col) in padded array
# Target is at (max_dist + row + py, max_dist + col + px)
elev_diff = padded[max_dist + py:max_dist + py + rows,
max_dist + px:max_dist + px + cols] - dem
# Horizon angle = arctan(elev_diff / dist)
angle = np.arctan2(elev_diff, dist_m)
# Keep maximum horizon angle
horizon = np.where(np.isnan(angle), horizon,
np.maximum(horizon, np.nan_to_num(angle, nan=0)))
# SVF contribution: cos²(zenith) where zenith = pi/2 - horizon
# Higher horizon = less sky visible
svf += np.cos(np.pi / 2 - horizon) ** 2
svf /= n_dirs # Average over all directions
# Save
with rasterio.open(
output,
'w',
driver='GTiff',
height=svf.shape[0],
width=svf.shape[1],
count=1,
dtype='float32',
crs=crs,
transform=transform,
compress='lzw'
) as dst:
dst.write(svf.astype('float32'), 1)
return output
except Exception as e:
print(f" ✗ Erreur SVF: {e}")
return None
def generate_openness(self, dem_file, basename, positive=True):
"""Positive/Negative Openness - true zenith/nadir angle computation.
For each pixel, in 8 directions (N, NE, E, SE, S, SW, W, NW):
- Positive openness: max zenith angle (angle from vertical to highest visible terrain)
- Negative openness: max nadir angle (angle from vertical down to lowest terrain)
Result is averaged across all 8 directions."""
name = "positive_openness" if positive else "negative_openness"
print(f"{name.replace('_', ' ').title()} (ray-tracing)...")
output = self.vis_dir / f"{basename}_{name}.tif"
try:
with rasterio.open(dem_file) as src:
dem = src.read(1)
transform = src.transform
crs = src.crs
rows, cols = dem.shape
res = self.resolution
# 8 cardinal directions
n_dirs = 8
angles = np.linspace(0, 2 * np.pi, n_dirs, endpoint=False)
dx = np.cos(angles)
dy = np.sin(angles)
max_dist = int(50 / res) # 50m search radius
# Pad DEM with NaN to handle boundaries
padded = np.pad(dem.astype(np.float64), max_dist, mode='constant',
constant_values=np.nan)
openness_sum = np.zeros_like(dem, dtype=np.float64)
for d_idx in range(n_dirs):
ddx, ddy = dx[d_idx], dy[d_idx]
# For positive openness: find max upward angle (zenith)
# For negative openness: find max downward angle (nadir)
max_angle = np.zeros_like(dem, dtype=np.float64)
for step in range(1, max_dist + 1):
px = int(round(ddx * step))
py = int(round(ddy * step))
dist_m = np.sqrt((ddx * step * res) ** 2 + (ddy * step * res) ** 2)
if dist_m < res * 0.5:
continue
elev_diff = padded[max_dist + py:max_dist + py + rows,
max_dist + px:max_dist + px + cols] - dem
if positive:
# Positive openness: angle from vertical to terrain above
# Only consider points higher than center (elev_diff > 0)
angle = np.arctan2(np.maximum(elev_diff, 0), dist_m)
else:
# Negative openness: angle from vertical to terrain below
# Only consider points lower than center (elev_diff < 0)
angle = np.arctan2(np.maximum(-elev_diff, 0), dist_m)
max_angle = np.where(np.isnan(angle), max_angle,
np.maximum(max_angle, np.nan_to_num(angle, nan=0)))
openness_sum += max_angle
# Average across all directions (convert to degrees)
openness_result = np.degrees(openness_sum / n_dirs)
# Save
with rasterio.open(
output,
'w',
driver='GTiff',
height=openness_result.shape[0],
width=openness_result.shape[1],
count=1,
dtype='float32',
crs=crs,
transform=transform,
compress='lzw'
) as dst:
dst.write(openness_result.astype('float32'), 1)
return output
except Exception as e:
print(f" ✗ Erreur openness: {e}")
return None
def generate_mslrm(self, dem_file, basename):
"""Multi-Scale Relief Model (MSRM) - LRM computed at multiple scales
(sigma=5,10,25,50,100) and combined. Reveals features at all scales:
small (walls, ditches), medium (enclosures, tumulus), large (landscapes)."""
print(f" → Multi-Scale Relief Model (MSRM)...")
output = self.vis_dir / f"{basename}_mslrm.tif"
try:
with rasterio.open(dem_file) as src:
dem = src.read(1)
transform = src.transform
crs = src.crs
from scipy.ndimage import gaussian_filter
# Compute LRM at multiple scales
sigmas = [5, 10, 25, 50, 100]
lrm_stack = []
for sigma in sigmas:
sigma_px = sigma / self.resolution
local_mean = gaussian_filter(dem, sigma=sigma_px)
lrm = dem - local_mean
# Normalize each scale
lrm_norm = lrm / max(np.nanstd(lrm), 0.01)
lrm_stack.append(lrm_norm)
# Combine: RMS of normalized LRM at all scales
mslrm = np.sqrt(np.mean(np.array(lrm_stack) ** 2, axis=0))
with rasterio.open(
output,
'w',
driver='GTiff',
height=mslrm.shape[0],
width=mslrm.shape[1],
count=1,
dtype='float32',
crs=crs,
transform=transform,
compress='lzw'
) as dst:
dst.write(mslrm.astype('float32'), 1)
return output
except Exception as e:
print(f" ✗ Erreur MSRM: {e}")
return None
def generate_tpi(self, dem_file, basename):
"""Multi-Scale Topographic Position Index.
TPI = elevation - mean(neighborhood).
Computed at fine (5m) and broad (100m) scales to identify
ridges, valleys, and intermediate landforms."""
print(f" → TPI multi-echelle...")
output = self.vis_dir / f"{basename}_tpi.tif"
try:
with rasterio.open(dem_file) as src:
dem = src.read(1)
transform = src.transform
crs = src.crs
from scipy.ndimage import uniform_filter
# Fine-scale TPI (5m radius)
fine_size = int(5 / self.resolution)
if fine_size % 2 == 0:
fine_size += 1
tpi_fine = dem - uniform_filter(dem, size=fine_size)
# Broad-scale TPI (100m radius)
broad_size = int(100 / self.resolution)
if broad_size % 2 == 0:
broad_size += 1
tpi_broad = dem - uniform_filter(dem, size=broad_size)
# Combine: fine-scale weighted more for archaeological features
# Normalize each scale then weight: 0.6 fine + 0.4 broad
fine_std = max(np.nanstd(tpi_fine), 0.01)
broad_std = max(np.nanstd(tpi_broad), 0.01)
tpi_combined = 0.6 * (tpi_fine / fine_std) + 0.4 * (tpi_broad / broad_std)
with rasterio.open(
output,
'w',
driver='GTiff',
height=tpi_combined.shape[0],
width=tpi_combined.shape[1],
count=1,
dtype='float32',
crs=crs,
transform=transform,
compress='lzw'
) as dst:
dst.write(tpi_combined.astype('float32'), 1)
return output
except Exception as e:
print(f" ✗ Erreur TPI: {e}")
return None
def generate_vat(self, dem_file, basename):
"""VAT composite (Visualisation for Archaeological Topography).
Fuses hillshade (R), slope (G), and SVF (B) into an RGB composite
that reveals all micro-topographic features in a single image."""
print(f" → VAT composite...")
output = self.vis_dir / f"{basename}_vat.tif"
try:
# Generate required rasters if not already available
with rasterio.open(dem_file) as src:
dem = src.read(1)
transform = src.transform
crs = src.crs
from scipy.ndimage import gaussian_filter
rows, cols = dem.shape
# Hillshade (luminance) for R channel
gy, gx = np.gradient(dem, self.resolution)
slope = np.arctan(np.sqrt(gx**2 + gy**2))
# Illumination from NW (azimuth 315°, altitude 45°)
alt_rad = np.radians(45)
az_rad = np.radians(315)
shading = (np.sin(alt_rad) * np.cos(slope) +
np.cos(alt_rad) * np.sin(slope) *
np.cos(az_rad - np.arctan2(gy, gx)))
hillshade = np.clip(shading, 0, 1)
# Slope for G channel
slope_deg = np.degrees(slope)
slope_norm = np.clip(slope_deg / 45, 0, 1)
# SVF (simplified fast version for composite) for B channel
n_dirs_svf = 8
angles_svf = np.linspace(0, 2 * np.pi, n_dirs_svf, endpoint=False)
dx_svf = np.cos(angles_svf)
dy_svf = np.sin(angles_svf)
max_dist_svf = int(30 / self.resolution)
padded = np.pad(dem.astype(np.float64), max_dist_svf, mode='constant',
constant_values=np.nan)
svf = np.zeros_like(dem, dtype=np.float64)
for d_idx in range(n_dirs_svf):
horizon = np.zeros_like(dem, dtype=np.float64)
for step in range(1, max_dist_svf + 1):
px = int(round(dx_svf[d_idx] * step))
py = int(round(dy_svf[d_idx] * step))
dist_m = np.sqrt((dx_svf[d_idx] * step * self.resolution) ** 2 +
(dy_svf[d_idx] * step * self.resolution) ** 2)
if dist_m < self.resolution * 0.5:
continue
elev_diff = padded[max_dist_svf + py:max_dist_svf + py + rows,
max_dist_svf + px:max_dist_svf + px + cols] - dem
angle = np.arctan2(elev_diff, dist_m)
horizon = np.where(np.isnan(angle), horizon,
np.maximum(horizon, np.nan_to_num(angle, nan=0)))
svf += np.cos(np.pi / 2 - horizon) ** 2
svf /= n_dirs_svf
# Normalize each channel to 0-1 using percentile stretch
def stretch(arr, p_low=2, p_high=98):
valid = arr[~np.isnan(arr)]
if len(valid) == 0:
return arr
lo, hi = np.percentile(valid, (p_low, p_high))
if hi - lo < 1e-10:
return np.zeros_like(arr)
return np.clip((arr - lo) / (hi - lo), 0, 1)
r = stretch(hillshade, 1, 99)
g = stretch(slope_norm, 2, 95)
b = stretch(svf, 5, 95)
# Stack as RGB (3-band GeoTIFF)
rgb = np.stack([r, g, b], axis=0).astype('float32')
with rasterio.open(
output,
'w',
driver='GTiff',
height=rows,
width=cols,
count=3,
dtype='float32',
crs=crs,
transform=transform,
compress='lzw'
) as dst:
for i in range(3):
dst.write(rgb[i], i + 1)
return output
except Exception as e:
print(f" ✗ Erreur VAT: {e}")
return None
def generate_depressions(self, dem_file, basename):
"""Depression detection using hydrological sink filling.
Fills depressions in the DEM and computes the difference to identify
dolines, sinkholes, cavities, and enclosed basins."""
print(f" → Detection depressions (hydrologique)...")
output = self.vis_dir / f"{basename}_depressions.tif"
try:
with rasterio.open(dem_file) as src:
dem = src.read(1)
transform = src.transform
crs = src.crs
# Fill depressions using iterative approach (scipy)
# Priority-flood algorithm approximation
from scipy.ndimage import binary_dilation, generate_binary_structure
# Replace NaN with a very high value for filling
dem_filled = dem.copy()
nodata_mask = np.isnan(dem_filled)
dem_filled[nodata_mask] = np.nanmax(dem) + 1000
# Iterative depression filling
# A pixel is a sink if it's lower than all its neighbors
# Fill by raising sinks to the minimum of their neighbors
changed = True
iterations = 0
max_iter = 100
struct = generate_binary_structure(2, 2) # 8-connectivity
while changed and iterations < max_iter:
# Find local minima (sinks)
from scipy.ndimage import minimum_filter
neighbor_min = minimum_filter(dem_filled, footprint=struct)
# A pixel is a sink if it's lower than the minimum of its neighbors
sinks = (dem_filled < neighbor_min) & ~nodata_mask
if not np.any(sinks):
break
# Fill sinks to the level of the lowest neighbor
new_dem = np.maximum(dem_filled, neighbor_min)
new_dem[nodata_mask] = np.nan
changed = np.any(new_dem != dem_filled)
dem_filled = new_dem
iterations += 1
# Difference = depth of depressions
# Only keep positive differences (actual depressions)
depressions = dem_filled - dem
depressions[nodata_mask] = np.nan
# Only keep real depressions (positive depth)
depressions = np.where(depressions > 0.01, depressions, 0)
with rasterio.open(
output,
'w',
driver='GTiff',
height=depressions.shape[0],
width=depressions.shape[1],
count=1,
dtype='float32',
crs=crs,
transform=transform,
compress='lzw'
) as dst:
dst.write(depressions.astype('float32'), 1)
return output
except Exception as e:
print(f" ✗ Erreur depressions: {e}")
return None
def generate_sailore(self, dem_file, basename):
"""SAILORE - Self-Adaptive Improved Local Relief Model.
Kernel size adapts to local slope: flat areas get larger kernels,
steep areas get smaller kernels. Better than fixed LRM for heterogeneous terrain."""
print(f" → SAILORE (LRM adaptatif)...")
output = self.vis_dir / f"{basename}_sailore.tif"
try:
with rasterio.open(dem_file) as src:
dem = src.read(1)
transform = src.transform
crs = src.crs
from scipy.ndimage import gaussian_filter, uniform_filter
# Compute slope for adaptive kernel sizing
gy, gx = np.gradient(dem, self.resolution)
slope = np.arctan(np.sqrt(gx**2 + gy**2))
slope_deg = np.degrees(slope)
# Adaptive sigma: flat terrain (low slope) = large kernel, steep = small
# slope 0° -> sigma_max (25m), slope 30° -> sigma_min (2m)
sigma_min = 2.0 / self.resolution
sigma_max = 25.0 / self.resolution
# Normalize slope to 0-1 range
slope_norm = np.clip(slope_deg / 30.0, 0, 1)
# Invert: flat areas get high sigma, steep areas get low sigma
adaptive_sigma = sigma_max - slope_norm * (sigma_max - sigma_min)
# Compute local mean with adaptive Gaussian (approximated by blending)
# For efficiency, compute at 3 fixed scales and blend based on slope
lrm_fine = dem - gaussian_filter(dem, sigma=sigma_min)
lrm_medium = dem - gaussian_filter(dem, sigma=(sigma_min + sigma_max) / 2)
lrm_coarse = dem - gaussian_filter(dem, sigma=sigma_max)
# Blend based on slope: steep -> fine, flat -> coarse
w_fine = slope_norm
w_medium = 1 - 2 * np.abs(slope_norm - 0.5)
w_coarse = 1 - slope_norm
# Normalize weights
w_total = w_fine + w_medium + w_coarse
w_total[w_total == 0] = 1
sailore = (w_fine * lrm_fine + w_medium * lrm_medium + w_coarse * lrm_coarse) / w_total
with rasterio.open(
output,
'w',
driver='GTiff',
height=sailore.shape[0],
width=sailore.shape[1],
count=1,
dtype='float32',
crs=crs,
transform=transform,
compress='lzw'
) as dst:
dst.write(sailore.astype('float32'), 1)
return output
except Exception as e:
print(f" ✗ Erreur SAILORE: {e}")
return None
def generate_geomorphons(self, dem_file, basename):
"""Geomorphons - classification of terrain into 10 landform types
using ternary pattern of zenith/nadir angles in 8 directions.
Types: flat, peak, ridge, shoulder, spur, slope, hollow, footslope,
valley, pit."""
print(f" → Geomorphons (10 formes de terrain)...")
output = self.vis_dir / f"{basename}_geomorphons.tif"
try:
with rasterio.open(dem_file) as src:
dem = src.read(1)
transform = src.transform
crs = src.crs
rows, cols = dem.shape
# Flatness threshold in meters
flat_threshold = 1.0 # 1m difference considered flat
# 8 directions (N, NE, E, SE, S, SW, W, NW)
n_dirs = 8
angles_g = np.linspace(0, 2 * np.pi, n_dirs, endpoint=False)
dx_g = np.cos(angles_g)
dy_g = np.sin(angles_g)
# Lookup distance in pixels
lookup = int(10 / self.resolution) # 10m lookup distance
# Pad DEM
padded = np.pad(dem.astype(np.float64), lookup, mode='edge')
# For each direction, determine if higher (+1), flat (0), or lower (-1)
pattern = np.zeros((n_dirs, rows, cols), dtype=np.int8)
for d in range(n_dirs):
px = int(round(dx_g[d] * lookup))
py = int(round(dy_g[d] * lookup))
neighbor = padded[lookup + py:lookup + py + rows,
lookup + px:lookup + px + cols]
diff = neighbor - dem
pattern[d] = np.where(diff > flat_threshold, 1,
np.where(diff < -flat_threshold, -1, 0))
# Map ternary patterns to 10 landform types
# Using the standard geomorphon classification
# Count positive, negative, and flat directions
n_positive = np.sum(pattern == 1, axis=0)
n_negative = np.sum(pattern == -1, axis=0)
n_flat = np.sum(pattern == 0, axis=0)
# Classification rules (standard geomorphons)
geomorphons = np.full((rows, cols), 0, dtype=np.int8)
# 0=flat, 1=peak, 2=ridge, 3=shoulder, 4=spur,
# 5=slope, 6=hollow, 7=footslope, 8=valley, 9=pit
# Flat: all 8 directions are flat
geomorphons[n_flat == 8] = 0
# Peak: all directions look up (all positive)
geomorphons[n_positive == 8] = 1
# Pit: all directions look down (all negative)
geomorphons[n_negative == 8] = 9
# Ridge: at least 3 positive, no negative, not all positive
geomorphons[(n_positive >= 3) & (n_negative == 0) & (n_positive < 8)] = 2
# Valley: at least 3 negative, no positive, not all negative
geomorphons[(n_negative >= 3) & (n_positive == 0) & (n_negative < 8)] = 8
# Shoulder: mixed positive+flat, some negative
geomorphons[(n_positive >= 2) & (n_negative >= 1) & (n_positive > n_negative)] = 3
# Spur: some positive, more flat than negative
geomorphons[(n_positive >= 1) & (n_negative >= 1) & (n_positive > n_negative) &
(geomorphons == 0)] = 4
# Slope: mostly flat with some negative or positive
geomorphons[(n_flat >= 4) & (n_positive + n_negative <= 4) & (geomorphons == 0)] = 5
# Hollow: more negative than positive, some flat
geomorphons[(n_negative >= 1) & (n_positive >= 1) & (n_negative > n_positive) &
(geomorphons == 0)] = 6
# Footslope: mixed negative+flat, some positive
geomorphons[(n_negative >= 2) & (n_positive >= 1) & (n_negative > n_positive) &
(geomorphons == 0)] = 7
# Catch remaining as slope
geomorphons[geomorphons == 0] = 5
# Mask nodata
nodata_mask = np.isnan(dem)
geomorphons[nodata_mask] = 0
with rasterio.open(
output,
'w',
driver='GTiff',
height=rows,
width=cols,
count=1,
dtype='int8',
crs=crs,
transform=transform,
compress='lzw',
nodata=0
) as dst:
dst.write(geomorphons.astype('int8'), 1)
return output
except Exception as e:
print(f" ✗ Erreur Geomorphons: {e}")
return None
def generate_roughness(self, dem_file, basename):
"""Surface roughness - standard deviation of elevation in a window
plus roughness index (ratio of surface area to planar area).
Reveals anthropogenic surfaces (walls, paths) vs natural terrain."""
print(f" → Rugosite de surface...")
output = self.vis_dir / f"{basename}_roughness.tif"
try:
with rasterio.open(dem_file) as src:
dem = src.read(1)
transform = src.transform
crs = src.crs
from scipy.ndimage import generic_filter
# Standard deviation in 5m window
window_size = int(5 / self.resolution)
if window_size % 2 == 0:
window_size += 1
# Compute std deviation (surface roughness)
def std_filter(arr):
return np.nanstd(arr)
roughness = generic_filter(dem.astype(np.float64), std_filter,
size=window_size, mode='nearest')
with rasterio.open(
output,
'w',
driver='GTiff',
height=roughness.shape[0],
width=roughness.shape[1],
count=1,
dtype='float32',
crs=crs,
transform=transform,
compress='lzw'
) as dst:
dst.write(roughness.astype('float32'), 1)
return output
except Exception as e:
print(f" ✗ Erreur rugosite: {e}")
return None
def generate_anomalies(self, dem_file, basename):
"""Statistical anomaly detection - z-score of local relief
plus Local Moran's I for spatial autocorrelation.
Identifies statistically significant topographic anomalies."""
print(f" → Detection anomalies statistiques...")
output = self.vis_dir / f"{basename}_anomalies.tif"
try:
with rasterio.open(dem_file) as src:
dem = src.read(1)
transform = src.transform
crs = src.crs
# Z-score of LRM (local relief)
from scipy.ndimage import gaussian_filter
lrm = dem - gaussian_filter(dem, sigma=15 / self.resolution)
lrm_mean = np.nanmean(lrm)
lrm_std = max(np.nanstd(lrm), 0.01)
z_score = (lrm - lrm_mean) / lrm_std
# Local Moran's I for spatial clustering
# I_i = (x_i - x_mean) * sum(w_ij * (x_j - x_mean)) / (S0 * sigma²)
# Simplified: use local mean of z-score as spatial lag
from scipy.ndimage import uniform_filter
window = int(10 / self.resolution)
if window % 2 == 0:
window += 1
local_mean = uniform_filter(z_score, size=window)
local_var = uniform_filter(z_score ** 2, size=window) - local_mean ** 2
local_var = np.maximum(local_var, 0.01)
# Moran's I approximation: high positive = cluster, high negative = outlier
morans_i = z_score * (local_mean - np.nanmean(z_score)) / np.nanstd(z_score)
# Combine: absolute z-score (anomaly magnitude) * sign of Moran's I (cluster type)
# High positive = significant elevated cluster, high negative = significant depression cluster
anomaly_score = np.abs(z_score) * np.sign(morans_i)
with rasterio.open(
output,
'w',
driver='GTiff',
height=anomaly_score.shape[0],
width=anomaly_score.shape[1],
count=1,
dtype='float32',
crs=crs,
transform=transform,
compress='lzw'
) as dst:
dst.write(anomaly_score.astype('float32'), 1)
return output
except Exception as e:
print(f" ✗ Erreur anomalies: {e}")
return None
def generate_wavelet(self, dem_file, basename):
"""Mexican Hat wavelet (Ricker wavelet) multi-scale analysis.
Continuous Wavelet Transform at multiple scales to detect
circular/circular features like tumulus, ring ditches, enclosures."""
print(f" → Ondelette Mexican Hat multi-echelle...")
output = self.vis_dir / f"{basename}_wavelet.tif"
try:
with rasterio.open(dem_file) as src:
dem = src.read(1)
transform = src.transform
crs = src.crs
# Mexican Hat (Ricker) wavelet at multiple scales
# Uses scipy.ndimage.gaussian_laplace as 2D Mexican Hat approximation
scales = [2, 5, 10, 20, 50] # meters
wavelet_stack = []
for scale_m in scales:
# Create 1D Ricker wavelet and apply as 2D separable filter
sigma_px = scale_m / self.resolution
# 2D Mexican Hat = Laplacian of Gaussian
from scipy.ndimage import gaussian_laplace
response = -gaussian_laplace(dem.astype(np.float64), sigma=sigma_px)
# Normalize by scale
response /= max(np.nanstd(response), 0.01)
wavelet_stack.append(response)
# Combine: RMS of all scales
combined = np.sqrt(np.mean(np.array(wavelet_stack) ** 2, axis=0))
with rasterio.open(
output,
'w',
driver='GTiff',
height=combined.shape[0],
width=combined.shape[1],
count=1,
dtype='float32',
crs=crs,
transform=transform,
compress='lzw'
) as dst:
dst.write(combined.astype('float32'), 1)
return output
except Exception as e:
print(f" ✗ Erreur ondelette: {e}")
return None
def generate_texture(self, dem_file, basename):
"""GLCM texture analysis on hillshade - contrast, entropy, homogeneity.
Detects anthropogenic surfaces (plowing, paths, walls) vs natural terrain."""
print(f" → Texture GLCM...")
output = self.vis_dir / f"{basename}_texture.tif"
try:
with rasterio.open(dem_file) as src:
dem = src.read(1)
transform = src.transform
crs = src.crs
# First create a hillshade for texture analysis
gy, gx = np.gradient(dem, self.resolution)
slope = np.arctan(np.sqrt(gx**2 + gy**2))
alt_rad = np.radians(45)
az_rad = np.radians(315)
shading = (np.sin(alt_rad) * np.cos(slope) +
np.cos(alt_rad) * np.sin(slope) *
np.cos(az_rad - np.arctan2(gy, gx)))
hillshade = np.clip(shading, 0, 1)
# Quantize to 256 levels for GLCM
valid = hillshade[~np.isnan(hillshade)]
if len(valid) == 0:
raise ValueError("No valid data for texture analysis")
lo, hi = np.percentile(valid, (1, 99))
img = np.clip((hillshade - lo) / max(hi - lo, 0.001), 0, 1)
img_uint8 = (img * 255).astype(np.uint8)
# Compute GLCM texture using sklearn-style approach
# Simplified GLCM: local variance (proxy for contrast) and local entropy
from scipy.ndimage import generic_filter
window = int(5 / self.resolution)
if window % 2 == 0:
window += 1
# Local variance (contrast proxy)
def local_variance(arr):
arr_f = arr.astype(np.float64)
return np.var(arr_f)
contrast = generic_filter(img_uint8.astype(np.float64), local_variance,
size=window, mode='nearest')
# Local entropy approximation
def local_entropy(arr):
arr_f = arr.astype(np.float64)
hist, _ = np.histogram(arr_f, bins=16, range=(0, 256))
hist = hist / max(hist.sum(), 1)
hist = hist[hist > 0]
return -np.sum(hist * np.log2(hist))
entropy = generic_filter(img_uint8.astype(np.float64), local_entropy,
size=window, mode='nearest')
# Local homogeneity (inverse difference moment proxy)
def local_homogeneity(arr):
arr_f = arr.astype(np.float64)
mean_val = np.mean(arr_f)
return np.mean(1.0 / (1.0 + (arr_f - mean_val) ** 2))
homogeneity = generic_filter(img_uint8.astype(np.float64), local_homogeneity,
size=window, mode='nearest')
# Combine into composite texture index
# Normalize each component
def norm(arr):
valid = arr[~np.isnan(arr)]
if len(valid) == 0:
return arr
std = max(np.std(valid), 0.01)
return (arr - np.mean(valid)) / std
texture_combined = (0.4 * norm(contrast) +
0.4 * norm(entropy) -
0.2 * norm(homogeneity))
with rasterio.open(
output,
'w',
driver='GTiff',
height=texture_combined.shape[0],
width=texture_combined.shape[1],
count=1,
dtype='float32',
crs=crs,
transform=transform,
compress='lzw'
) as dst:
dst.write(texture_combined.astype('float32'), 1)
return output
except Exception as e:
print(f" ✗ Erreur texture GLCM: {e}")
return None
def generate_flow(self, dem_file, basename):
"""Flow accumulation using D8 algorithm.
Identifies drainage patterns, ditches, and enclosure boundaries.
Uses scipy for efficient pit-filling and flow routing."""
print(f" → Accumulation de flux D8...")
output = self.vis_dir / f"{basename}_flow.tif"
try:
with rasterio.open(dem_file) as src:
dem = src.read(1)
transform = src.transform
crs = src.crs
rows, cols = dem.shape
nodata_mask = np.isnan(dem)
# Fill pits first for proper flow routing
from scipy.ndimage import minimum_filter, generate_binary_structure
# Simple pit-filling: iterative depression filling
dem_filled = dem.copy()
dem_filled[nodata_mask] = np.nanmax(dem) + 1000
struct = generate_binary_structure(2, 2) # 8-connectivity
for _ in range(50):
neighbor_min = minimum_filter(dem_filled, footprint=struct)
sinks = (dem_filled < neighbor_min) & ~nodata_mask
if not np.any(sinks):
break
dem_filled = np.where(sinks, neighbor_min, dem_filled)
dem_filled[nodata_mask] = np.nan
# D8 flow direction: each cell flows to steepest descent neighbor
# 8 neighbors: E, SE, S, SW, W, NW, N, NE
dx8 = [1, 1, 0, -1, -1, -1, 0, 1]
dy8 = [0, 1, 1, 1, 0, -1, -1, -1]
# Distances (cardinal=1, diagonal=sqrt(2))
dist8 = [1.0, np.sqrt(2), 1.0, np.sqrt(2), 1.0, np.sqrt(2), 1.0, np.sqrt(2)]
# Compute flow direction for each cell
flow_dir = np.full((rows, cols), -1, dtype=np.int8)
max_slope = np.full((rows, cols), 0.0)
padded = np.pad(dem_filled, 1, mode='constant', constant_values=np.nanmax(dem_filled) + 10000)
for d in range(8):
# Neighbor elevation
nx = 1 + dx8[d]
ny = 1 + dy8[d]
neighbor_elev = padded[ny:ny + rows, nx:nx + cols]
# Slope to neighbor (positive = downhill)
slope = (dem_filled - neighbor_elev) / (dist8[d] * self.resolution)
slope[nodata_mask] = -1
# Update flow direction if this neighbor has steepest descent
better = slope > max_slope
flow_dir[better] = d
max_slope[better] = slope[better]
# Flow accumulation: count upstream cells
# Process cells from highest to lowest elevation
flat_dem = dem_filled[~nodata_mask].flatten()
valid_indices = np.where(~nodata_mask.flatten())[0]
# Sort by elevation (highest first = process upstream first)
sort_order = valid_indices[np.argsort(-flat_dem)]
flow_acc = np.ones((rows, cols), dtype=np.float32)
flow_acc[nodata_mask] = 0
# Accumulate flow
for idx in sort_order:
r, c = divmod(idx, cols)
d = flow_dir[r, c]
if d < 0:
continue
# Downstream cell
nr, nc = r + dy8[d], c + dx8[d]
if 0 <= nr < rows and 0 <= nc < cols and not nodata_mask[nr, nc]:
flow_acc[nr, nc] += flow_acc[r, c]
# Log-scale for visualization
flow_log = np.log1p(flow_acc)
with rasterio.open(
output,
'w',
driver='GTiff',
height=flow_log.shape[0],
width=flow_log.shape[1],
count=1,
dtype='float32',
crs=crs,
transform=transform,
compress='lzw'
) as dst:
dst.write(flow_log.astype('float32'), 1)
return output
except Exception as e:
print(f" ✗ Erreur flux: {e}")
return None
"""Detect cavities/depressions below ground level.
Uses local relief (difference between smoothed ground and actual terrain)
to highlight depressions, dolines, sinkholes and any below-ground features.
Surface features (mounds, walls) are suppressed so only depressions show."""
print(f" → Detection cavites (gouffres)...")
output = self.vis_dir / f"{basename}_cavity.tif"
try:
with rasterio.open(dem_file) as src:
dem = src.read(1)
transform = src.transform
crs = src.crs
from scipy.ndimage import uniform_filter, minimum_filter, gaussian_filter
from scipy.stats import binned_statistic_2d
# Method 1: Local Relief - compare terrain to smoothed reference
# Large window smoothing captures the regional ground level
# Small depressions (cavities, dolines) appear as negative values
window_large = int(100 / self.resolution) # 100m window
if window_large % 2 == 0:
window_large += 1
# Regional ground level (smoothed over large area)
regional_ground = uniform_filter(dem, size=window_large)
# Local relief = actual terrain minus regional ground
# Positive = above regional ground (hills, walls)
# Negative = below regional ground (cavities, dolines, sinkholes)
local_relief = dem - regional_ground
# Method 2: Minimum filter to detect sharp local depressions
# Compares each point to its minimum neighborhood
window_small = int(20 / self.resolution) | 1 # 20m window
local_min = minimum_filter(dem, size=window_small)
# Difference between local minimum and the actual terrain
# If terrain is much higher than local minimum, there's a deep pit nearby
pit_depth = dem - local_min
# Only keep negative values (terrain lower than neighbors = depression)
# This detects sharp, small depressions
# Combine both methods:
# Method 1 catches large gentle depressions (dolines)
# Method 2 catches sharp small depressions (sinkholes, wells)
# Take the most negative (deepest) signal
combined = np.minimum(local_relief, -pit_depth)
# Only show depressions (negative values = below ground)
# Set positive values (hills, walls) to 0 so they don't obscure cavities
combined = np.where(combined < 0, -combined, 0)
# Slight smoothing to reduce noise
combined = gaussian_filter(combined, sigma=1.0)
# Mask NaN areas
combined = np.where(np.isnan(dem), np.nan, combined)
# Save
with rasterio.open(
output,
'w',
driver='GTiff',
height=combined.shape[0],
width=combined.shape[1],
count=1,
dtype='float32',
crs=crs,
transform=transform,
compress='lzw'
) as dst:
dst.write(combined.astype('float32'), 1)
return output
except Exception as e:
print(f" ✗ Erreur cavity detection: {e}")
import traceback
traceback.print_exc()
return None
def generate_nodata_mask(self, dem_file, basename):
"""Generate a map showing areas without LiDAR coverage"""
print(f" → Carte couverture LiDAR...")
output = self.vis_dir / f"{basename}_nodata.tif"
try:
with rasterio.open(dem_file) as src:
dem = src.read(1)
transform = src.transform
crs = src.crs
# Create binary mask: 1 = has data, 0 = no data (NaN)
# Also create distance from data edge to show interpolated areas
from scipy.ndimage import distance_transform_edt
has_data = (~np.isnan(dem)).astype(np.float32)
no_data = np.isnan(dem).astype(np.float32)
# Distance to nearest data point (in pixels)
if np.any(no_data.astype(bool)):
dist_from_data = distance_transform_edt(no_data.astype(bool))
else:
dist_from_data = np.zeros_like(dem)
# Normalize: 0 = has data, increasing values = further from data
# Cap at a reasonable distance
max_dist = 50 # pixels
coverage = np.clip(dist_from_data / max_dist, 0, 1)
# Areas with data = 0, areas without data > 0
# For visualization, use negative values for data and positive for gaps
result = np.where(np.isnan(dem), coverage, -0.1) # -0.1 for data areas
# Save
with rasterio.open(
output,
'w',
driver='GTiff',
height=result.shape[0],
width=result.shape[1],
count=1,
dtype='float32',
crs=crs,
transform=transform,
compress='lzw'
) as dst:
dst.write(result.astype('float32'), 1)
return output
except Exception as e:
print(f" ✗ Erreur nodata mask: {e}")
import traceback
traceback.print_exc()
return None
def download_ign_tiles(self, min_x, max_x, min_y, max_y, layer, zoom_level=15):
"""Download IGN WMTS tiles for the given bounds using Web Mercator (PM).
Converts Lambert 93 bounds to lat/lon then to Web Mercator tile coordinates."""
import urllib.request
import io
import math
from PIL import Image as PILImage
# Convert Lambert 93 bounds to WGS84 lat/lon
try:
l93_xs = [min_x, max_x]
l93_ys = [max_y, min_y] # NW, SE
lons, lats = warp_transform('EPSG:2154', 'EPSG:4326', l93_xs, l93_ys)
nw_lat, nw_lon = lats[0], lons[0]
se_lat, se_lon = lats[1], lons[1]
except Exception:
print(f" ✗ Impossible de convertir les coordonnees")
return None
# IGN WMTS endpoint (free, no API key needed)
wmts_url = "https://data.geopf.fr/wmts"
tile_matrix_set = "PM" # Web Mercator (native cache)
tile_size = 256
# Web Mercator resolution at each zoom level (meters per pixel at equator)
# and tile calculation
n = 2 ** zoom_level # Number of tiles at this zoom level
# Calculate tile range from lat/lon bounds
# Web Mercator formulas
def lat_lon_to_tile(lat, lon, zoom):
n = 2 ** zoom
col = int((lon + 180) / 360 * n)
lat_rad = math.radians(lat)
row = int((1 - math.log(math.tan(lat_rad) + 1 / math.cos(lat_rad)) / math.pi) / 2 * n)
return col, row
col_min, row_min = lat_lon_to_tile(nw_lat, nw_lon, zoom_level)
col_max, row_max = lat_lon_to_tile(se_lat, se_lon, zoom_level)
# Calculate Web Mercator pixel coordinates for compositing
def lat_lon_to_px(lat, lon, zoom):
n = 2 ** zoom
px_x = (lon + 180) / 360 * n * tile_size
lat_rad = math.radians(lat)
px_y = (1 - math.log(math.tan(lat_rad) + 1 / math.cos(lat_rad)) / math.pi) / 2 * n * tile_size
return px_x, px_y
nw_px_x, nw_px_y = lat_lon_to_px(nw_lat, nw_lon, zoom_level)
se_px_x, se_px_y = lat_lon_to_px(se_lat, se_lon, zoom_level)
out_width = int(se_px_x - nw_px_x)
out_height = int(se_px_y - nw_px_y)
if out_width <= 0 or out_height <= 0 or out_width > 5000 or out_height > 5000:
return None
composite = np.full((out_height, out_width, 3), 255, dtype=np.uint8)
# Download and assemble tiles
tiles_downloaded = 0
fmt = "image/png" if 'PLAN' in layer else "image/jpeg"
for col in range(col_min, col_max + 1):
for row in range(row_min, row_max + 1):
url = (
f"{wmts_url}?SERVICE=WMTS&VERSION=1.0.0&REQUEST=GetTile"
f"&LAYER={layer}&STYLE=normal"
f"&TILEMATRIXSET={tile_matrix_set}"
f"&TILEMATRIX={zoom_level}&TILECOL={col}&TILEROW={row}"
f"&FORMAT={fmt}"
)
try:
req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
with urllib.request.urlopen(req, timeout=10) as response:
tile_data = response.read()
tile_img = PILImage.open(io.BytesIO(tile_data)).convert('RGB')
tile_arr = np.array(tile_img)
# Calculate tile position in Web Mercator pixels
tile_origin_x = col * tile_size
tile_origin_y = row * tile_size
# Position in composite image (relative to NW corner)
px_x = int(tile_origin_x - nw_px_x)
px_y = int(tile_origin_y - nw_px_y)
# Paste tile into composite
x_off = max(0, -px_x)
y_off = max(0, -px_y)
dst_x_start = max(0, px_x)
dst_y_start = max(0, px_y)
dst_x_end = min(out_width, px_x + tile_size)
dst_y_end = min(out_height, px_y + tile_size)
src_x = x_off
src_y = y_off
src_w = dst_x_end - dst_x_start
src_h = dst_y_end - dst_y_start
if src_w > 0 and src_h > 0 and tile_arr.shape[0] >= src_y + src_h and tile_arr.shape[1] >= src_x + src_w:
composite[dst_y_start:dst_y_end, dst_x_start:dst_x_end] = \
tile_arr[src_y:src_y+src_h, src_x:src_x+src_w]
tiles_downloaded += 1
except Exception as e:
if tiles_downloaded == 0 and col == col_min and row == row_min:
print(f" ✗ Erreur tuile IGN: {e}")
continue
print(f"{tiles_downloaded} tuiles IGN telechargees ({layer})")
if tiles_downloaded == 0:
return None
return composite
def generate_ign_overlay(self, dem_file, basename, layer, title, legend_label, description, out_suffix):
"""Generate an IGN basemap overlay visualization"""
print(f"{title}...")
output = self.vis_dir / f"{basename}_{out_suffix}.tif"
try:
with rasterio.open(dem_file) as src:
dem = src.read(1)
transform = src.transform
crs = src.crs
height, width = dem.shape
top_left_x = transform.c
top_left_y = transform.f
pixel_size_x = transform.a
pixel_size_y = abs(transform.e)
min_x = top_left_x
max_x = top_left_x + width * pixel_size_x
max_y = top_left_y
min_y = top_left_y - height * pixel_size_y
# Choose zoom level based on resolution
# At 0.5m/px, use zoom level 15 (3.19m/px IGN)
zoom = 15
result = self.download_ign_tiles(min_x, max_x, min_y, max_y, layer, zoom)
if result is None:
print(f" ✗ Impossible de telecharger les tuiles IGN")
return None
ign_img = result
# Resize IGN image to match our DTM dimensions
from PIL import Image as PILImage
ign_pil = PILImage.fromarray(ign_img)
ign_resized = ign_pil.resize((width, height), PILImage.LANCZOS)
ign_arr = np.array(ign_resized)
# Save as GeoTIFF
with rasterio.open(
output,
'w',
driver='GTiff',
height=height,
width=width,
count=3,
dtype='uint8',
crs=crs,
transform=transform,
) as dst:
for i in range(3):
dst.write(ign_arr[:, :, i], i + 1)
return output
except Exception as e:
print(f" ✗ Erreur IGN overlay: {e}")
import traceback
traceback.print_exc()
return None
# ============ STEP 4: PNG Conversion ============
def tif_to_png(self, tif_file):
"""Convert GeoTIFF to visualization PNG with GPS coordinates, external legend/scale"""
if not tif_file or not tif_file.exists():
return None
jpg_file = self.vis_dir / f"{tif_file.stem}.png"
try:
with rasterio.open(tif_file) as src:
# Check if this is a 3-band RGB image (ortho/topo)
is_rgb = src.count >= 3 and ('ortho' in str(tif_file) or 'topo' in str(tif_file))
if is_rgb:
# Read all 3 bands for RGB
data = src.read([1, 2, 3]) # Shape: (3, height, width)
data = np.moveaxis(data, 0, -1) # Shape: (height, width, 3)
else:
data = src.read(1)
nodata = src.nodata
transform = src.transform
crs = src.crs
# Get geographic bounds from transform
if is_rgb:
height, width, _ = data.shape
else:
height, width = data.shape
top_left_x = transform.c
top_left_y = transform.f
pixel_size_x = transform.a
pixel_size_y = abs(transform.e)
min_x = top_left_x
max_x = top_left_x + width * pixel_size_x
max_y = top_left_y
min_y = top_left_y - height * pixel_size_y
# Convert Lambert 93 corners to GPS (WGS84)
gps_coords = {}
if HAS_WARP and crs is not None:
try:
# Corner coordinates in Lambert 93
l93_xs = [min_x, max_x, min_x, max_x]
l93_ys = [max_y, max_y, min_y, min_y]
lons, lats = warp_transform(crs, 'EPSG:4326', l93_xs, l93_ys)
gps_coords = {
'NW': (lats[0], lons[0]),
'NE': (lats[1], lons[1]),
'SW': (lats[2], lons[2]),
'SE': (lats[3], lons[3]),
}
# Also compute tick positions along edges
n_ticks = 5
# Bottom edge (X axis): longitude ticks
tick_l93_x = np.linspace(min_x, max_x, n_ticks)
tick_l93_y_bottom = np.full(n_ticks, min_y)
tick_lons, tick_lats = warp_transform(crs, 'EPSG:4326', tick_l93_x, tick_l93_y_bottom)
gps_coords['x_ticks'] = list(zip(tick_lons, tick_lats))
# Left edge (Y axis): latitude ticks
tick_l93_y = np.linspace(min_y, max_y, n_ticks)
tick_l93_x_left = np.full(n_ticks, min_x)
tick_lons_y, tick_lats_y = warp_transform(crs, 'EPSG:4326', tick_l93_x_left, tick_l93_y)
gps_coords['y_ticks'] = list(zip(tick_lons_y, tick_lats_y))
except Exception:
gps_coords = {}
if nodata is not None and not is_rgb:
data = np.ma.masked_where((data == nodata) | np.isnan(data), data)
# For RGB images (ortho/topo), skip normalization and display directly
if is_rgb:
# RGB image: skip all normalization, display as-is
pass
else:
# Get valid data for normalization
valid_data = data.compressed() if hasattr(data, 'compressed') else data.flatten()
valid_data = valid_data[~np.isnan(valid_data)]
# Determine colormap, normalization, and legend info
if is_rgb:
# RGB images: no colormap needed, display directly
if 'ortho' in str(tif_file):
title = "Photographie Aerienne IGN"
legend_label = "Orthophotographie\nImage aerienne"
description = "Photographie aerienne IGN"
elif 'topo' in str(tif_file):
title = "Carte Topographique IGN"
legend_label = "Carte IGN\nPlan topographique"
description = "Carte topographique IGN (Plan IGN)"
else:
title = tif_file.stem.replace('_', ' ').title()
legend_label = "Image RGB"
description = ""
elif 'hillshade' in str(tif_file):
cmap = 'gray'
vmin, vmax = 0, 1
data = np.clip(data, vmin, vmax)
title = "Hillshade Multidirectionnel"
legend_label = "Illumination combinee de 6 directions\nBlanc = Face eclairee | Noir = Zone d'ombre"
description = "Ombres portees revelant micro-relief (murs, fosses, terrasses)"
elif 'slope' in str(tif_file):
cmap = 'inferno'
vmin, vmax = 0, np.percentile(valid_data, 95)
data = np.clip((data - vmin) / (vmax - vmin), 0, 1)
title = "Pente (Inclinaison du terrain)"
legend_label = f"Inclinaison en degres\nMin: {vmin:.1f} | Max: {vmax:.1f}\nClair = Forte pente | Sombre = Terrain plat"
description = "Murs, talus et bords ressortent en clair - terrain plat en sombre"
elif 'aspect' in str(tif_file):
cmap = 'hsv'
vmin, vmax = 0, 360
data = np.clip((data - vmin) / (vmax - vmin), 0, 1)
title = "Aspect (Direction des pentes)"
legend_label = "Direction vers laquelle le terrain descend\nRouge=Nord | Vert=Est | Cyan=Sud | Bleu=Ouest"
description = "Orientation des pentes - utile pour distinguer structures des formes naturelles"
elif 'curvature' in str(tif_file):
cmap = 'RdYlBu_r'
vmax = max(abs(np.percentile(valid_data, 5)), abs(np.percentile(valid_data, 95)), 0.001)
vmin, vmax = -vmax, vmax
data = np.clip((data - vmin) / (vmax - vmin), 0, 1)
title = "Courbure (Convexite/Concavite du terrain)"
legend_label = f"Changement de pente (1/m)\nRouge = Convexe (sommet de mur, levee)\nBleu = Concave (fond de fosse, depression)"
description = "Detecte les ruptures de pente - utile pour bords de terrasses et levees"
elif 'svf' in str(tif_file):
cmap = 'viridis'
vmin, vmax = np.percentile(valid_data, (5, 95))
data = np.clip((data - vmin) / (vmax - vmin), 0, 1)
title = "Sky-View Factor (Ray-tracing 16 directions)"
legend_label = "Fraction de ciel visible depuis chaque point\nSombre = Encaisse (fosses, vallees, rues)\nClair = Degage (sommets, plateformes, plateaux)"
description = "Ray-tracing sur 16 azimuts - elimine l'eclairage, detecte structures lineaires et enclos"
elif 'mslrm' in str(tif_file):
cmap = 'RdBu_r'
vmax = max(abs(np.percentile(valid_data, 2)), abs(np.percentile(valid_data, 98)), 0.1)
vmin, vmax = -vmax, vmax
data = np.clip((data - vmin) / (vmax - vmin), 0, 1)
title = "MSRM - Multi-Scale Relief Model (5 echelles)"
legend_label = f"Relief combine a 5 echelles (5m a 100m)\nRouge = Surelevation (mur, tumulus, levee)\nBleu = Depression (fosse, douve, fosse)\n\nDifference avec LRM:\nLRM = 1 echelle (15m)\nMSRM = 5 echelles combinees\nMSRM detecte du micro au macro"
description = "Combine LRM a 5 echelles - detecte structures de 5m a 100m simultanement"
elif 'lrm' in str(tif_file):
cmap = 'RdBu_r'
vmax = max(abs(np.percentile(valid_data, 2)), abs(np.percentile(valid_data, 98)), 0.1)
vmin, vmax = -vmax, vmax
data = np.clip((data - vmin) / (vmax - vmin), 0, 1)
title = "LRM - Local Relief Model (echelle unique 15m)"
legend_label = f"Ecart local par rapport au terrain moyen (m)\nRouge = Surelevation (+{vmax:.2f}m)\nBleu = Depression ({vmin:.2f}m)\nNoyau gaussien unique de 15m"
description = "Micro-relief a 15m seulement - voir MSRM pour toutes les echelles"
elif 'positive' in str(tif_file):
cmap = 'YlOrBr'
vmin, vmax = np.percentile(valid_data, (10, 98))
data = np.clip((data - vmin) / (vmax - vmin), 0, 1)
title = "Openness Positive (ouverture vers le haut)"
legend_label = f"Angle d'ouverture vers le haut (deg)\nClair = Vue degagee vers le ciel (sommets, plateaux)\nSombre = Vue bouchee (vallees encaissees)"
description = "Ray-tracing 8 directions - complementaire de la negative pour detecter cretes"
elif 'negative' in str(tif_file):
cmap = 'GnBu_r'
vmin, vmax = np.percentile(valid_data, (10, 98))
data = np.clip((data - vmin) / (vmax - vmin), 0, 1)
title = "Openness Negative (ouverture vers le bas)"
legend_label = f"Angle d'ouverture vers le bas (deg)\nClair = Surplomb (bords de fosse, grottes)\nSombre = Terrain plat (fonds de vallee)\nMeilleur detecteur de cavites et dolines"
description = "Ray-tracing 8 directions - detecte fosses, dolines, souterrains, dolines"
elif 'tpi' in str(tif_file):
cmap = 'BrBG'
vmax = max(abs(np.percentile(valid_data, 2)), abs(np.percentile(valid_data, 98)), 0.1)
vmin, vmax = -vmax, vmax
data = np.clip((data - vmin) / (vmax - vmin), 0, 1)
title = "TPI - Topographic Position Index (2 echelles)"
legend_label = "Position dans le paysage\nBrun/Sombre = Plus bas que le voisinage (fosse, vallee)\nVert/Clair = Plus haut que le voisinage (crete, plateau)\nCombine echelle fine 5m + large 100m"
description = "Identifie la position topographique - utile pour repeter cretes vs vallees a grande echelle"
elif 'depressions' in str(tif_file):
cmap = 'YlOrRd'
vmin, vmax = 0, max(np.percentile(valid_data, 99), 0.1)
data = np.clip((data - vmin) / (vmax - vmin), 0, 1)
title = "Depressions (Remplissage hydrologique)"
legend_label = f"Profondeur des cuvettes detectees (m)\nTransparent = Pas de depression\nJaune = Depression legere | Rouge = Depression profonde\nMax: {vmax:.2f}m"
description = "Simule le remplissage d'eau - detecte dolines, sinkholes, cuvettes et zones inondables"
elif 'sailore' in str(tif_file):
cmap = 'seismic'
vmax = max(abs(np.percentile(valid_data, 2)), abs(np.percentile(valid_data, 98)), 0.1)
vmin, vmax = -vmax, vmax
data = np.clip((data - vmin) / (vmax - vmin), 0, 1)
title = "SAILORE - LRM Auto-Adaptatif"
legend_label = f"Relief local adaptatif (m)\nRouge = Surelevation | Bleu = Depression\n\nDifference avec LRM/MSRM:\nLRM = noyau fixe 15m\nMSRM = 5 noyaux fixes\nSAILORE = noyau adapte a la pente\nPlat=grand noyau | Pente=petit noyau"
description = "Noyau qui s'adapte a la pente locale - terrain plat=grand noyau, pente=petit noyau"
elif 'roughness' in str(tif_file):
cmap = 'magma'
vmin, vmax = 0, np.percentile(valid_data, 97)
data = np.clip((data - vmin) / (vmax - vmin), 0, 1)
title = "Rugosite de Surface (Ecart-type local 5m)"
legend_label = f"Irregularite du terrain dans un voisinage de 5m\nSombre = Surface lisse (route, mur, sol plat)\nClair = Surface rugueuse (vegetation, ruines, pierres)\nMax: {vmax:.2f}m"
description = "Mesure la variabilite locale - surfaces anthropiques lisses vs naturelles rugueuses"
elif 'anomalies' in str(tif_file):
cmap = 'coolwarm'
vmax = max(abs(np.percentile(valid_data, 5)), abs(np.percentile(valid_data, 95)), 0.1)
vmin, vmax = -vmax, vmax
data = np.clip((data - vmin) / (vmax - vmin), 0, 1)
title = "Anomalies Statistiques (Z-score x Moran's I)"
legend_label = "Anomalies topographiques significatives\nRouge vif = Surelevation anormale (mur, tumulus)\nBleu vif = Depression anormale (fosse, doline)\nBlanc/gris = Normal\n\nCombine z-score (intensite) et\nMoran's I (regroupement spatial)"
description = "Detecte uniquement les anomalies statistiquement significatives - filtre le bruit de fond"
elif 'wavelet' in str(tif_file):
cmap = 'cividis'
vmax = max(abs(np.percentile(valid_data, 2)), abs(np.percentile(valid_data, 98)), 0.1)
vmin, vmax = -vmax, vmax
data = np.clip((data - vmin) / (vmax - vmin), 0, 1)
title = "Ondelette Mexican Hat (CWT multi-echelle)"
legend_label = "Reponse de la transformee en ondelette a 5 echelles\nEchelles: 2m, 5m, 10m, 20m, 50m\n\nClair = Structure detectee a cette echelle\nSombre = Pas de structure\n\nOptimise pour formes circulaires:\ntumulus, enclos, fosses annulaires"
description = "Transformee en ondelette 2D - excellente pour detecter structures circulaires"
elif 'texture' in str(tif_file):
cmap = 'inferno'
vmax = max(abs(np.percentile(valid_data, 2)), abs(np.percentile(valid_data, 98)), 0.1)
vmin, vmax = -vmax, vmax
data = np.clip((data - vmin) / (vmax - vmin), 0, 1)
title = "Texture GLCM (Contraste + Entropie - Homogeneite)"
legend_label = "Analyse de la texture du relief (fenetre 5m)\nClair = Texture heterogene (labour, ruines, sol perturbe)\nSombre = Texture homogene (sol nu, route, zone plate)\n\nCombine contraste, entropie et homogeneite"
description = "Distingue surfaces anthropiques (labour, chemins) des naturelles"
elif 'flow' in str(tif_file):
cmap = 'Blues'
vmin, vmax = 0, np.percentile(valid_data, 98)
data = np.clip((data - vmin) / max(vmax - vmin, 0.01), 0, 1)
title = "Accumulation de Flux Hydrologique (D8)"
legend_label = f"Logarithme de l'accumulation d'eau\nBlanc = Pas de collecte (sommet, crete)\nBleu fonce = Collecte maximale (thalweg, fosse)\n\nSimule l'ecoulement de l'eau de pluie\nDetecte fosses d'enceinte, routes et drainage"
description = "Algorithme D8 - simule le cheminement de l'eau pour detecter fosses et routes antiques"
elif 'ortho' in str(tif_file):
# Aerial photo - display RGB directly
title = "Photographie Aerienne IGN"
legend_label = "Orthophotographie\nImage aerienne"
description = "Photographie aerienne IGN"
# For 3-band RGB, display directly
if data.ndim == 3:
# Already RGB, just display
pass
else:
cmap = 'gray'
# Skip normalization for RGB
elif 'topo' in str(tif_file):
# Topographic map - display RGB directly
title = "Carte Topographique IGN"
legend_label = "Carte IGN\nPlan topographique"
description = "Carte topographique IGN (Plan IGN)"
# For 3-band RGB, display directly
if data.ndim == 3:
pass
else:
cmap = 'gray'
else:
cmap = 'terrain'
p2, p98 = np.percentile(valid_data, (2, 98))
data = np.clip((data - p2) / (p98 - p2), 0, 1)
title = tif_file.stem.replace('_', ' ').title()
legend_label = "Altitude normalisee"
description = ""
# ---- Create figure with external margins ----
# Layout: map with colorbar, info bar below
# Map aspect ratio determines the figure height dynamically
from matplotlib.gridspec import GridSpec
# Keep image width-based sizing, add room for colorbar and bottom info
fig_width = 20
# Map height based on data aspect ratio, plus margins for title and info
map_aspect = height / width
fig = plt.figure(figsize=(fig_width, fig_width * map_aspect * 0.7 + 2.5),
facecolor='white')
gs = GridSpec(2, 1, height_ratios=[1.0, 0.06],
hspace=0.04, figure=fig,
left=0.06, right=0.88, top=0.93, bottom=0.08)
# Map row (with title above)
ax = fig.add_subplot(gs[0])
# Display data - RGB or single-band
if is_rgb:
im = ax.imshow(data, aspect='equal', origin='upper')
else:
im = ax.imshow(data, cmap=cmap, aspect='equal', origin='upper')
# Title above the map
ax.set_title(f"{title}\n{description}", fontsize=15, fontweight='bold', pad=10)
# Colorbar on the right side (not for RGB images)
if not is_rgb:
cbar = plt.colorbar(im, ax=ax, pad=0.02, shrink=0.85, aspect=30)
cbar.ax.tick_params(labelsize=9, width=1.5)
cbar.outline.set_linewidth(1.5)
cbar.set_label(legend_label, fontsize=10, fontweight='bold')
else:
# For other RGB images (ortho/topo), add legend label as text on the right
ax.text(1.02, 0.5, legend_label, transform=ax.transAxes,
fontsize=10, fontweight='bold', rotation=90,
verticalalignment='center', horizontalalignment='left')
# ---- GPS coordinate tick labels ----
if gps_coords and 'x_ticks' in gps_coords:
# X-axis: longitude along bottom
x_pixel_positions = np.linspace(0, width - 1, len(gps_coords['x_ticks']))
x_labels = [f"{lon:.5f}E" for lon, lat in gps_coords['x_ticks']]
ax.set_xticks(x_pixel_positions)
ax.set_xticklabels(x_labels, fontsize=7, rotation=30)
ax.set_xlabel('Longitude', fontsize=9, fontweight='bold')
# Y-axis: latitude along left
y_pixel_positions = np.linspace(0, height - 1, len(gps_coords['y_ticks']))
y_labels = [f"{lat:.5f}N" for lon, lat in gps_coords['y_ticks']]
ax.set_yticks(y_pixel_positions)
ax.set_yticklabels(y_labels, fontsize=7)
ax.set_ylabel('Latitude', fontsize=9, fontweight='bold')
else:
# Fallback to Lambert 93 coordinates
x_ticks_count = 5
x_positions = np.linspace(0, width - 1, x_ticks_count)
x_labels = [f"{(min_x + xp * pixel_size_x)/1000:.1f}" for xp in x_positions]
ax.set_xticks(x_positions)
ax.set_xticklabels(x_labels, fontsize=8)
ax.set_xlabel('Est (km) - Lambert 93', fontsize=9, fontweight='bold')
y_ticks_count = 5
y_positions = np.linspace(0, height - 1, y_ticks_count)
y_labels = [f"{(max_y - yp * pixel_size_y)/1000:.1f}" for yp in y_positions]
ax.set_yticks(y_positions)
ax.set_yticklabels(y_labels, fontsize=8)
ax.set_ylabel('Nord (km) - Lambert 93', fontsize=9, fontweight='bold')
# Style axes
ax.tick_params(axis='both', which='both', direction='out', length=3,
width=0.8, colors='black')
for spine in ax.spines.values():
spine.set_visible(True)
spine.set_color('black')
spine.set_linewidth(0.8)
# ---- North arrow ----
from matplotlib.patches import Polygon as MplPolygon
from mpl_toolkits.axes_grid1.inset_locator import inset_axes
north_ax = inset_axes(ax, width="4%", height="7%", loc='upper right',
bbox_to_anchor=(-0.03, 0.12, 1, 1), bbox_transform=ax.transAxes)
north_ax.set_xlim(0, 1)
north_ax.set_ylim(0, 1)
north_ax.axis('off')
north_ax.plot([0.5, 0.5], [0.1, 0.65], color='black', linewidth=2.5, zorder=10)
north_ax.add_patch(MplPolygon([[0.5, 0.2], [0.35, 0.4], [0.5, 0.65], [0.65, 0.4]],
closed=True, facecolor='black', edgecolor='black', zorder=9))
north_ax.text(0.5, 0.95, 'N', ha='center', va='top',
fontsize=13, fontweight='bold', color='black', zorder=11)
# ---- Bottom cartouche: info + scale bar OUTSIDE the map ----
info_ax = fig.add_subplot(gs[1])
info_ax.axis('off')
# Altitude range (skip for RGB images)
if is_rgb:
alt_min = 0
alt_max = 0
else:
alt_min = np.nanmin(valid_data) if len(valid_data) > 0 else 0
alt_max = np.nanmax(valid_data) if len(valid_data) > 0 else 0
extent_km_x = (max_x - min_x) / 1000
extent_km_y = (max_y - min_y) / 1000
# Build info text (left side of bottom row)
if gps_coords:
nw_lat, nw_lon = gps_coords['NW']
se_lat, se_lon = gps_coords['SE']
info_text = (
f"GPS: {nw_lat:.5f}N {nw_lon:.5f}E - {se_lat:.5f}N {se_lon:.5f}E | "
f"EPSG:2154 | Res: {self.resolution}m/px | "
f"Emprise: {extent_km_x:.1f}x{extent_km_y:.1f}km"
)
else:
info_text = (
f"EPSG:2154 | X: {min_x:.0f}-{max_x:.0f} Y: {min_y:.0f}-{max_y:.0f} | "
f"Res: {self.resolution}m/px | Emprise: {extent_km_x:.1f}x{extent_km_y:.1f}km"
)
info_ax.text(0.01, 0.5, info_text,
transform=info_ax.transAxes, fontsize=8.5,
verticalalignment='center', family='monospace',
bbox=dict(boxstyle='round,pad=0.3', facecolor='#f0f0f0', edgecolor='#aaaaaa', alpha=0.95))
# Scale bar: always 100m (fixed scale)
# 100m at 0.5m/px = 200 pixels
scale_m = 100
scale_label = "100 m"
pixels_per_meter = 1.0 / pixel_size_x
scale_px = int(scale_m * pixels_per_meter)
# Convert pixel length to fraction of the axis width
# Scale bar as fraction of the info axis (which matches map width)
scale_bar_frac = scale_px / width
# Draw scale bar as simple black bar with text
from matplotlib.patches import Rectangle as RectPatch
bar_left = 0.80
bar_bottom = 0.15
bar_width_frac = min(scale_bar_frac, 0.15) # Cap at 15% of width
bar_height = 0.35
info_ax.add_patch(RectPatch((bar_left, bar_bottom), bar_width_frac, bar_height,
facecolor='black', edgecolor='black', linewidth=0.5,
transform=info_ax.transAxes, clip_on=False))
info_ax.text(bar_left + bar_width_frac / 2, bar_bottom + bar_height + 0.12,
scale_label, ha='center', va='bottom', fontsize=9, fontweight='bold',
transform=info_ax.transAxes)
fig.patch.set_facecolor('white')
plt.savefig(jpg_file, dpi=150, bbox_inches='tight', pad_inches=0.15,
facecolor='white', format='png')
plt.close()
# Delete the source TIFF file to save space
tif_file.unlink()
return jpg_file
except Exception as e:
print(f" Erreur conversion PNG: {e}")
import traceback
traceback.print_exc()
return None
def generate_all_visualizations(self, dtm_file, basename):
"""Generate all archaeological visualizations"""
print(f"\n Génération visualisations:")
# Create per-file subdirectory
file_vis_dir = self.vis_dir / basename
file_vis_dir.mkdir(exist_ok=True)
# Temporarily override vis_dir for this file
original_vis_dir = self.vis_dir
self.vis_dir = file_vis_dir
vis_results = {}
# Core visualizations
vis_results['hillshade'] = self.generate_hillshade(dtm_file, basename)
vis_results['slope'] = self.generate_slope(dtm_file, basename)
vis_results['aspect'] = self.generate_aspect(dtm_file, basename)
vis_results['curvature'] = self.generate_curvature(dtm_file, basename)
vis_results['svf'] = self.generate_svf(dtm_file, basename)
vis_results['lrm'] = self.generate_lrm(dtm_file, basename)
vis_results['pos_open'] = self.generate_openness(dtm_file, basename, positive=True)
vis_results['neg_open'] = self.generate_openness(dtm_file, basename, positive=False)
# Advanced visualizations
vis_results['mslrm'] = self.generate_mslrm(dtm_file, basename)
vis_results['tpi'] = self.generate_tpi(dtm_file, basename)
vis_results['depressions'] = self.generate_depressions(dtm_file, basename)
vis_results['sailore'] = self.generate_sailore(dtm_file, basename)
vis_results['roughness'] = self.generate_roughness(dtm_file, basename)
vis_results['anomalies'] = self.generate_anomalies(dtm_file, basename)
vis_results['wavelet'] = self.generate_wavelet(dtm_file, basename)
vis_results['texture'] = self.generate_texture(dtm_file, basename)
vis_results['flow'] = self.generate_flow(dtm_file, basename)
# IGN overlays
vis_results['ortho'] = self.generate_ign_overlay(
dtm_file, basename,
layer='ORTHOIMAGERY.ORTHOPHOTOS',
title='Photographie Aerienne IGN',
legend_label='Orthophotographie\nImage aerienne',
description='Photographie aerienne IGN (Orthophoto)',
out_suffix='ortho'
)
vis_results['topo'] = self.generate_ign_overlay(
dtm_file, basename,
layer='GEOGRAPHICALGRIDSYSTEMS.PLANIGNV2',
title='Carte Topographique IGN',
legend_label='Carte IGN\nPlan topographique',
description='Carte topographique IGN (Plan IGN)',
out_suffix='topo'
)
# Convert to PNG
print(f"\n Conversion images PNG:")
for name, tif_file in vis_results.items():
if tif_file:
jpg_file = self.tif_to_png(tif_file)
if jpg_file:
print(f"{jpg_file.name}")
# Restore original vis_dir
self.vis_dir = original_vis_dir
return vis_results
# ============ STEP 5: PDF Report ============
def generate_pdf_report(self, basename):
"""Generate A3 PDF report for a LiDAR file with all visualizations.
Page 1: Mise en situation (ortho + topo IGN side by side)
Pages 2+: Other visualizations (2 per page)"""
from matplotlib.backends.backend_pdf import PdfPages
pdf_file = self.pdf_dir / f"{basename}_rapport.pdf"
print(f" → Génération rapport PDF A3: {pdf_file.name}")
# Look for PNGs in per-file subdirectory first, then fallback to main dir
file_vis_dir = self.vis_dir / basename
if file_vis_dir.exists():
png_files = sorted(file_vis_dir.glob("*.png"))
else:
png_files = sorted(self.vis_dir.glob(f"{basename}_*.png"))
if not png_files:
print(f" ✗ Aucune image PNG trouvée pour {basename}")
return None
# Categorize files
situ_files = [] # Page 1: situation maps
analysis_files = [] # Pages 2+: analysis maps
for f in png_files:
name = f.stem.lower()
if 'ortho' in name:
situ_files.insert(0, f) # Ortho first
elif 'topo' in name:
situ_files.append(f) # Topo second
else:
analysis_files.append(f)
# Define display order for analysis maps
order = ['mslrm', 'svf', 'negative_openness',
'positive_openness', 'sailore', 'depressions', 'hillshade_multi',
'lrm', 'tpi', 'slope', 'curvature', 'aspect',
'roughness', 'anomalies', 'wavelet', 'texture', 'flow']
def sort_key(f):
name = f.stem.lower()
for i, key in enumerate(order):
if key in name:
return i
return len(order)
analysis_files.sort(key=sort_key)
# A3 landscape dimensions in inches (420mm x 297mm)
a3_w, a3_h = 16.54, 11.69
try:
with PdfPages(str(pdf_file)) as pdf:
# ============ PAGE 1: Mise en situation ============
if situ_files:
fig = plt.figure(figsize=(a3_w, a3_h), facecolor='white')
n_situ = len(situ_files)
if n_situ == 2:
# Side by side
gs = fig.add_gridspec(1, 2, wspace=0.05, left=0.03, right=0.97,
top=0.92, bottom=0.06)
else:
# Single or more
gs = fig.add_gridspec(1, max(n_situ, 1), wspace=0.05,
left=0.03, right=0.97, top=0.92, bottom=0.06)
fig.text(0.5, 0.97, f"Mise en situation - {basename}",
fontsize=20, fontweight='bold', ha='center', va='top')
for i, f in enumerate(situ_files):
ax = fig.add_subplot(gs[0, i])
img = plt.imread(str(f))
ax.imshow(img)
ax.axis('off')
# Add title from filename
title = f.stem.replace(basename + '_', '').replace('_', ' ').title()
ax.set_title(title, fontsize=12, fontweight='bold', pad=5)
pdf.savefig(fig, dpi=150)
plt.close(fig)
# ============ PAGES 2+: Analysis maps (2 per page) ============
# Group analysis files 2 per page
for page_start in range(0, len(analysis_files), 2):
page_files = analysis_files[page_start:page_start + 2]
fig = plt.figure(figsize=(a3_w, a3_h), facecolor='white')
if len(page_files) == 2:
gs = fig.add_gridspec(1, 2, wspace=0.08, left=0.03, right=0.97,
top=0.93, bottom=0.05)
else:
gs = fig.add_gridspec(1, 1, left=0.05, right=0.95,
top=0.93, bottom=0.05)
for i, f in enumerate(page_files):
ax = fig.add_subplot(gs[0, i])
img = plt.imread(str(f))
ax.imshow(img)
ax.axis('off')
# Title from filename
title = f.stem.replace(basename + '_', '').replace('_', ' ').title()
ax.set_title(title, fontsize=11, fontweight='bold', pad=3)
# Page number
page_num = (page_start // 2) + 2 # Start from page 2
fig.text(0.99, 0.01, f"Page {page_num}", fontsize=8,
ha='right', va='bottom', color='gray')
pdf.savefig(fig, dpi=150)
plt.close(fig)
print(f" ✓ Rapport PDF: {pdf_file.name}")
return pdf_file
except Exception as e:
print(f" ✗ Erreur PDF: {e}")
import traceback
traceback.print_exc()
return None
# ============ Complete Pipeline ============
def process_file(self, laz_file):
"""Process a single LAZ file"""
basename = laz_file.stem
print(f"\n{'='*60}")
print(f" FICHIER : {basename}")
print(f"{'='*60}")
# Step 1: Ground classification
print(f"\n[1/3] Classification du sol...")
las_file = self.classify_ground(laz_file)
if not las_file:
print(f" ✗ Échec classification")
return False
# Step 2: Generate DTM
print(f"\n[2/3] Génération DTM...")
dtm_file = self.create_dtm_fast(las_file, basename)
if not dtm_file:
print(f" ✗ Échec DTM")
return False
# Step 3: Visualizations
print(f"\n[3/4] Visualisations archéologiques...")
vis = self.generate_all_visualizations(dtm_file, basename)
# Step 4: PDF report
print(f"\n[4/4] Rapport PDF A3...")
self.generate_pdf_report(basename)
print(f"\n{basename} traité avec succès !")
return True
def process_all(self):
"""Process all LAZ files in input directory"""
files = self.find_laz_files()
if not files:
print("Aucun fichier LAZ/LAS trouvé !")
return
print(f"\n{'='*60}")
print(f" PIPELINE ARCHÉOLOGIQUE LiDAR")
print(f"{'='*60}")
# Check tools
print(f"\nVérification des outils...")
if not self.check_tools():
print("Erreur: Certains outils ne sont pas disponibles")
return
results = {}
if self.workers > 1 and len(files) > 1:
# Process multiple files in parallel
print(f"\n Traitement parallèle avec {self.workers} workers...")
with ProcessPoolExecutor(max_workers=self.workers) as executor:
future_to_file = {
executor.submit(self._process_file_standalone, str(laz_file), str(self.input_dir), str(self.output_dir), self.resolution): laz_file
for laz_file in files
}
for future in as_completed(future_to_file):
laz_file = future_to_file[future]
try:
success = future.result()
results[laz_file.name] = success
status = "" if success else ""
print(f" {status} {laz_file.name}")
except Exception as e:
print(f"\n✗ Erreur traitement {laz_file.name}: {e}")
results[laz_file.name] = False
else:
# Sequential processing
for laz_file in files:
try:
results[laz_file.name] = self.process_file(laz_file)
except Exception as e:
print(f"\n✗ Erreur traitement: {e}")
import traceback
traceback.print_exc()
results[laz_file.name] = False
# Summary
print(f"\n{'='*60}")
print(f" RÉSUMÉ")
print(f"{'='*60}")
success_count = sum(results.values())
print(f"{success_count}/{len(results)} fichiers traités avec succès")
print(f"\nRésultats dans: {self.output_dir}")
print(f" • DTM: {self.dtm_dir}")
print(f" • Visualisations PNG: {self.vis_dir}")
print(f" • Rapports PDF: {self.pdf_dir}")
# Clean up temporary files to save space
print(f"\nNettoyage des fichiers temporaires...")
try:
import shutil
if self.temp_dir.exists():
shutil.rmtree(self.temp_dir)
print(f" ✓ Fichiers temporaires supprimés ({self.temp_dir})")
except Exception as e:
print(f" Note: Impossible de supprimer les fichiers temporaires")
@staticmethod
def _process_file_standalone(laz_file_str, input_dir, output_dir, resolution):
"""Standalone function for multiprocessing - creates its own pipeline instance."""
pipeline = LidarArchaeoPipeline(input_dir, output_dir, resolution=resolution, workers=1)
laz_file = Path(laz_file_str)
return pipeline.process_file(laz_file)
def main():
import argparse
parser = argparse.ArgumentParser(
description="Pipeline LiDAR pour détection archéologique (Python pur)"
)
parser.add_argument(
"input",
help="Dossier contenant les fichiers LAZ/LAS"
)
parser.add_argument(
"-o", "--output",
default="/data/output",
help="Dossier de sortie"
)
parser.add_argument(
"-r", "--resolution",
type=float,
default=0.5,
help="Résolution en mètres"
)
parser.add_argument(
"-w", "--workers",
type=int,
default=1,
help="Nombre de workers pour traitement parallèle (défaut: 1)"
)
args = parser.parse_args()
try:
pipeline = LidarArchaeoPipeline(
input_dir=args.input,
output_dir=args.output,
resolution=args.resolution,
workers=args.workers
)
pipeline.process_all()
except Exception as e:
print(f"Erreur: {e}")
sys.exit(1)
if __name__ == "__main__":
main()