- MSRM/TPI/roughness/anomalies: revert z-score (x-mean)/std to std normalization x/std to preserve contrast and visibility of linear features (paths, ditches, trenches) - MSRM: adaptive scales based on resolution, archaeological weight combination - TPI: extend from 2 to 4 scales (3m/15m/50m/200m) with weighted combination - Hillshade: 8 directions instead of 4, altitude 35° instead of 30° - LRM: adaptive sigma based on resolution - Openness: doubled radius (100m instead of 50m) - Roughness: multi-scale (3m fine + 15m broad) instead of single 5x5 window - Anomalies: uses MSRM multi-scale relief instead of single LRM 15m - Wavelet: 8 adaptive scales, std normalization, archaeological weights - Remove svf (Sky-View Factor) and local_dominance visualizations - Add AVIF format support (default), quality 98 - Add multi-resolution support (-r 0.5,0.2) - Improve Ctrl+C handling for immediate process termination - Update rendering.py descriptions for all modified visualizations Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1278 lines
49 KiB
Python
1278 lines
49 KiB
Python
"""Terrain visualization functions for LiDAR archaeological analysis.
|
||
|
||
Each function takes (dem_file, basename, vis_dir, resolution) as explicit
|
||
parameters and returns the path to the output GeoTIFF file, or None on error.
|
||
|
||
When a SharedDEM object is provided via the `shared` parameter, pre-computed
|
||
data (gradient, NaN mask, LRM) is reused across visualizations to avoid
|
||
redundant I/O and computation.
|
||
"""
|
||
|
||
import logging
|
||
import time
|
||
import warnings
|
||
from pathlib import Path
|
||
|
||
import numpy as np
|
||
import rasterio
|
||
from scipy.ndimage import generic_filter
|
||
from scipy.stats import binned_statistic_2d
|
||
|
||
from .gpu import HAS_GPU, to_gpu, to_cpu, xp_gaussian_filter, xp_uniform_filter, xp_minimum_filter, xp_maximum_filter, gpu_cleanup
|
||
|
||
logger = logging.getLogger("lidar")
|
||
|
||
# Use CuPy array module when available
|
||
if HAS_GPU:
|
||
import cupy as cp
|
||
xp = cp
|
||
else:
|
||
xp = np
|
||
|
||
|
||
class SharedDEM:
|
||
"""Pre-computed DEM data shared across all visualizations.
|
||
|
||
Reads the DEM once and lazily computes on first access:
|
||
- NaN mask and filled DEM (avoids 20+ calls to _fill_nans)
|
||
- Gradient components (shared by hillshade, slope, aspect, curvature)
|
||
- LRM at 15m kernel (shared by lrm + anomalies)
|
||
|
||
Attributes are computed lazily on first access to avoid computing
|
||
data that is never used (e.g. LRM when only hillshade needs generation).
|
||
"""
|
||
|
||
def __init__(self, dem_file, resolution):
|
||
dem_np, transform, crs = _read_dem(dem_file)
|
||
self.dem_file = dem_file
|
||
self.resolution = resolution
|
||
self.transform = transform
|
||
self.crs = crs
|
||
self.nan_mask = np.isnan(dem_np)
|
||
self.dem_np = dem_np.astype(np.float32)
|
||
|
||
# Lazy caches — computed on first access
|
||
self._filled = None
|
||
self._gradient = None # (dy, dx, slope_rad, slope_deg, aspect)
|
||
self._lrm_15 = None
|
||
|
||
# GPU lazy caches
|
||
self._filled_gpu = None
|
||
self._dem_gpu = None
|
||
|
||
@property
|
||
def filled(self):
|
||
"""Filled DEM (NaN interpolated) — computed lazily."""
|
||
if self._filled is None:
|
||
logger.debug(" → Calcul filled DEM (interpolation NaN)...")
|
||
self._filled, _ = _fill_nans(self.dem_np)
|
||
return self._filled
|
||
|
||
@property
|
||
def dy(self):
|
||
self._ensure_gradient()
|
||
return self._gradient[0]
|
||
|
||
@property
|
||
def dx(self):
|
||
self._ensure_gradient()
|
||
return self._gradient[1]
|
||
|
||
@property
|
||
def slope_rad(self):
|
||
self._ensure_gradient()
|
||
return self._gradient[2]
|
||
|
||
@property
|
||
def slope_deg(self):
|
||
self._ensure_gradient()
|
||
return self._gradient[3]
|
||
|
||
@property
|
||
def aspect(self):
|
||
self._ensure_gradient()
|
||
return self._gradient[4]
|
||
|
||
@property
|
||
def lrm_15(self):
|
||
"""LRM at 15m kernel — computed lazily."""
|
||
if self._lrm_15 is None:
|
||
logger.debug(" → Calcul LRM 15m...")
|
||
sigma_15 = 15.0 / self.resolution
|
||
local_mean_15 = _filter_nanaware_from_filled(self, xp_gaussian_filter, sigma=sigma_15)
|
||
self._lrm_15 = self.dem_np - local_mean_15
|
||
self._lrm_15[self.nan_mask] = np.nan
|
||
return self._lrm_15
|
||
|
||
def _ensure_gradient(self):
|
||
"""Compute gradient components lazily on first access."""
|
||
if self._gradient is None:
|
||
logger.debug(" → Calcul gradient...")
|
||
dy = np.gradient(self.filled, self.resolution, axis=0)
|
||
dx = np.gradient(self.filled, self.resolution, axis=1)
|
||
slope_rad = np.arctan(np.sqrt(dx**2 + dy**2))
|
||
slope_deg = np.degrees(slope_rad)
|
||
aspect = np.mod(np.degrees(np.arctan2(dy, dx)), 360)
|
||
self._gradient = (dy, dx, slope_rad, slope_deg, aspect)
|
||
|
||
@property
|
||
def filled_gpu(self):
|
||
"""Lazy GPU copy of the filled DEM."""
|
||
if self._filled_gpu is None and HAS_GPU:
|
||
self._filled_gpu = to_gpu(self.filled)
|
||
return self._filled_gpu
|
||
|
||
@property
|
||
def dem_gpu(self):
|
||
"""Lazy GPU copy of the DEM."""
|
||
if self._dem_gpu is None and HAS_GPU:
|
||
self._dem_gpu = to_gpu(self.dem_np)
|
||
return self._dem_gpu
|
||
|
||
|
||
def _filter_nanaware_from_filled(shared, filter_func, *args, **kwargs):
|
||
"""Apply filter on pre-filled DEM data (skips expensive _fill_nans).
|
||
|
||
Uses the SharedDEM.filled array directly, then restores NaN mask.
|
||
If GPU is available, uses the lazy GPU copy to avoid CPU↔GPU transfers.
|
||
"""
|
||
if HAS_GPU and shared.filled_gpu is not None:
|
||
filled_gpu = to_gpu(shared.filled)
|
||
result_gpu = filter_func(filled_gpu, *args, **kwargs)
|
||
result = to_cpu(result_gpu)
|
||
gpu_cleanup()
|
||
else:
|
||
result = filter_func(shared.filled, *args, **kwargs)
|
||
result[shared.nan_mask] = np.nan
|
||
return result
|
||
|
||
|
||
def _save_tif(output_path, data, transform, crs, dtype='float32', count=1, nodata=None, nan_mask=None):
|
||
"""Helper to save a 2D or 3D array as GeoTIFF.
|
||
|
||
Args:
|
||
nan_mask: Optional boolean mask (True=NaN) to apply before saving.
|
||
Restores NaN zones in gradient-derived products that were
|
||
computed on the filled DEM.
|
||
"""
|
||
if nan_mask is not None:
|
||
data = np.array(data, dtype=dtype, copy=True)
|
||
data[nan_mask] = np.nan
|
||
|
||
# Auto-detect nodata for float types with NaN
|
||
if nodata is None and dtype.startswith('float') and np.any(np.isnan(data)):
|
||
nodata = float('nan')
|
||
|
||
if data.ndim == 2:
|
||
height, width = data.shape
|
||
with rasterio.open(
|
||
output_path, 'w', driver='GTiff',
|
||
height=height, width=width, count=count,
|
||
dtype=dtype, crs=crs, transform=transform,
|
||
compress='deflate', nodata=nodata
|
||
) as dst:
|
||
dst.write(data.astype(dtype), 1)
|
||
elif data.ndim == 3:
|
||
bands, height, width = data.shape
|
||
with rasterio.open(
|
||
output_path, 'w', driver='GTiff',
|
||
height=height, width=width, count=bands,
|
||
dtype=dtype, crs=crs, transform=transform,
|
||
compress='deflate', nodata=nodata
|
||
) as dst:
|
||
for i in range(bands):
|
||
dst.write(data[i].astype(dtype), i + 1)
|
||
|
||
|
||
def _read_dem(dem_file):
|
||
"""Read DEM file and return (data, transform, crs)."""
|
||
with rasterio.open(dem_file) as src:
|
||
return src.read(1), src.transform, src.crs
|
||
|
||
|
||
def _fill_nans(arr):
|
||
"""Fill NaN values using nearest-neighbor interpolation.
|
||
|
||
Returns (filled_array, nan_mask) so the caller can restore NaN after filtering.
|
||
"""
|
||
from scipy.interpolate import NearestNDInterpolator
|
||
nan_mask = np.isnan(arr)
|
||
if not np.any(nan_mask):
|
||
return arr, nan_mask
|
||
valid = ~nan_mask
|
||
y_coords, x_coords = np.where(valid)
|
||
if len(y_coords) == 0:
|
||
return np.zeros_like(arr), nan_mask
|
||
z_values = arr[valid]
|
||
interp = NearestNDInterpolator(
|
||
np.column_stack((y_coords, x_coords)), z_values
|
||
)
|
||
y_missing, x_missing = np.where(nan_mask)
|
||
filled = arr.copy()
|
||
filled[y_missing, x_missing] = interp(y_missing, x_missing)
|
||
return filled, nan_mask
|
||
|
||
|
||
def _filter_nanaware(arr, filter_func, *args, use_gpu=True, **kwargs):
|
||
"""Apply a filter to an array while preserving NaN zones.
|
||
|
||
1. Fill NaN with nearest-neighbor interpolation
|
||
2. Apply the filter
|
||
3. Restore original NaN mask on the result
|
||
|
||
Args:
|
||
arr: Input array (numpy or cupy).
|
||
filter_func: Function that takes (array, *args, **kwargs) and returns filtered array.
|
||
use_gpu: If True, apply filter on GPU (send filled array to GPU first).
|
||
Returns:
|
||
Filtered array with original NaN positions preserved.
|
||
"""
|
||
is_gpu_arr = HAS_GPU and isinstance(arr, cp.ndarray)
|
||
arr_np = to_cpu(arr) if is_gpu_arr else arr
|
||
|
||
filled, nan_mask = _fill_nans(arr_np)
|
||
|
||
if use_gpu and HAS_GPU:
|
||
filled_gpu = to_gpu(filled)
|
||
result_gpu = filter_func(filled_gpu, *args, **kwargs)
|
||
result = to_cpu(result_gpu)
|
||
else:
|
||
result = filter_func(filled, *args, **kwargs)
|
||
|
||
result[nan_mask] = np.nan
|
||
return result
|
||
|
||
|
||
# ============================================================
|
||
# Core terrain visualizations
|
||
# ============================================================
|
||
|
||
def generate_hillshade(dem_file, basename, vis_dir, resolution, shared=None):
|
||
"""Generate multi-directional hillshade with contrast enhancement — GPU if available.
|
||
|
||
Combines 8-direction hillshade with slope shading for balanced illumination.
|
||
Applies percentile normalization and gamma correction to restore
|
||
contrast lost by averaging multiple azimuths.
|
||
"""
|
||
gpu_tag = " [GPU]" if HAS_GPU else ""
|
||
logger.info(f" → Hillshade multidirectionnel{gpu_tag}...")
|
||
t0 = time.time()
|
||
output = vis_dir / f"{basename}_hillshade_multi.tif"
|
||
|
||
try:
|
||
if shared:
|
||
transform = shared.transform
|
||
crs = shared.crs
|
||
dem = to_gpu(shared.dem_np)
|
||
dy = to_gpu(shared.dy) if HAS_GPU else shared.dy
|
||
dx = to_gpu(shared.dx) if HAS_GPU else shared.dx
|
||
slope = to_gpu(shared.slope_rad) if HAS_GPU else shared.slope_rad
|
||
aspect = xp.arctan2(dy, dx)
|
||
sin_slope = xp.sin(slope)
|
||
cos_slope = xp.cos(slope)
|
||
else:
|
||
dem_np, transform, crs = _read_dem(dem_file)
|
||
dem = to_gpu(dem_np)
|
||
dy, dx = xp.gradient(dem)
|
||
slope = xp.arctan(xp.sqrt(dx**2 + dy**2))
|
||
aspect = xp.arctan2(dy, dx)
|
||
sin_slope = xp.sin(slope)
|
||
cos_slope = xp.cos(slope)
|
||
|
||
# 8 azimuths for balanced illumination (eliminates directional bias)
|
||
azimuts = [0, 45, 90, 135, 180, 225, 270, 315]
|
||
altitude = 35 # Higher altitude for better micro-relief detection
|
||
hillshades = []
|
||
|
||
alt_rad = xp.radians(xp.array(altitude))
|
||
sin_alt = xp.sin(alt_rad)
|
||
cos_alt = xp.cos(alt_rad)
|
||
|
||
for az in azimuts:
|
||
az_rad = xp.radians(xp.array(az))
|
||
hs = sin_alt * sin_slope + cos_alt * cos_slope * xp.cos(az_rad - aspect)
|
||
hillshades.append(xp.clip(hs, 0, 1))
|
||
|
||
combined_hillshade = xp.mean(xp.array(hillshades), axis=0)
|
||
slope_shaded = cos_slope
|
||
combined = 0.7 * combined_hillshade + 0.3 * slope_shaded
|
||
|
||
# Contrast enhancement: percentile stretch + gamma
|
||
combined_np = to_cpu(combined)
|
||
nan_mask = shared.nan_mask if shared else np.isnan(to_cpu(dem_np) if HAS_GPU else dem_np)
|
||
valid = combined_np[~nan_mask]
|
||
if len(valid) > 0:
|
||
p2, p98 = np.percentile(valid, 2), np.percentile(valid, 98)
|
||
if p98 - p2 > 0.01:
|
||
combined_np = np.clip((combined_np - p2) / (p98 - p2), 0, 1)
|
||
# Gamma correction to enhance shadows
|
||
gamma = 0.8
|
||
combined_np = np.power(combined_np, gamma)
|
||
|
||
_save_tif(output, combined_np.astype(np.float32), transform, crs, nan_mask=nan_mask)
|
||
logger.info(f" ✓ Hillshade terminé ({time.time()-t0:.1f}s){gpu_tag}")
|
||
return output
|
||
except Exception as e:
|
||
logger.error(f" ✗ Erreur hillshade: {e}", exc_info=True)
|
||
return None
|
||
|
||
|
||
def generate_slope(dem_file, basename, vis_dir, resolution, shared=None):
|
||
"""Generate slope map (degrees) — GPU if available."""
|
||
gpu_tag = " [GPU]" if HAS_GPU else ""
|
||
logger.info(f" → Pente (Slope){gpu_tag}...")
|
||
t0 = time.time()
|
||
output = vis_dir / f"{basename}_slope.tif"
|
||
|
||
try:
|
||
if shared:
|
||
transform = shared.transform
|
||
crs = shared.crs
|
||
slope = shared.slope_deg
|
||
nan_mask = shared.nan_mask
|
||
if HAS_GPU:
|
||
slope = to_gpu(slope)
|
||
else:
|
||
dem_np, transform, crs = _read_dem(dem_file)
|
||
dem = to_gpu(dem_np)
|
||
dy, dx = xp.gradient(dem)
|
||
slope = xp.arctan(xp.sqrt(dx**2 + dy**2)) * 180 / xp.pi
|
||
nan_mask = np.isnan(dem_np)
|
||
_save_tif(output, to_cpu(slope) if HAS_GPU else slope, transform, crs, nan_mask=nan_mask)
|
||
logger.info(f" ✓ Pente terminée ({time.time()-t0:.1f}s){gpu_tag}")
|
||
return output
|
||
except Exception as e:
|
||
logger.error(f" ✗ Erreur slope: {e}", exc_info=True)
|
||
return None
|
||
|
||
|
||
def generate_aspect(dem_file, basename, vis_dir, resolution, shared=None):
|
||
"""Generate aspect (slope orientation) map — GPU if available."""
|
||
gpu_tag = " [GPU]" if HAS_GPU else ""
|
||
logger.info(f" → Aspect (Orientation){gpu_tag}...")
|
||
t0 = time.time()
|
||
output = vis_dir / f"{basename}_aspect.tif"
|
||
|
||
try:
|
||
if shared:
|
||
transform = shared.transform
|
||
crs = shared.crs
|
||
aspect = shared.aspect
|
||
nan_mask = shared.nan_mask
|
||
if HAS_GPU:
|
||
aspect = to_gpu(aspect)
|
||
else:
|
||
dem_np, transform, crs = _read_dem(dem_file)
|
||
dem = to_gpu(dem_np)
|
||
dy, dx = xp.gradient(dem)
|
||
aspect = xp.arctan2(dy, dx) * 180 / xp.pi
|
||
aspect = xp.mod(aspect, 360)
|
||
nan_mask = np.isnan(dem_np)
|
||
_save_tif(output, to_cpu(aspect) if HAS_GPU else aspect, transform, crs, nan_mask=nan_mask)
|
||
logger.info(f" ✓ Aspect terminé ({time.time()-t0:.1f}s){gpu_tag}")
|
||
return output
|
||
except Exception as e:
|
||
logger.error(f" ✗ Erreur aspect: {e}", exc_info=True)
|
||
return None
|
||
|
||
|
||
def generate_curvature(dem_file, basename, vis_dir, resolution, shared=None):
|
||
"""Generate curvature (terrain concavity/convexity) map — GPU if available."""
|
||
gpu_tag = " [GPU]" if HAS_GPU else ""
|
||
logger.info(f" → Courbure (Curvature){gpu_tag}...")
|
||
t0 = time.time()
|
||
output = vis_dir / f"{basename}_curvature.tif"
|
||
|
||
try:
|
||
if shared:
|
||
transform = shared.transform
|
||
crs = shared.crs
|
||
dx = shared.dx
|
||
dy = shared.dy
|
||
nan_mask = shared.nan_mask
|
||
if HAS_GPU:
|
||
dx = to_gpu(dx)
|
||
dy = to_gpu(dy)
|
||
else:
|
||
dem_np, transform, crs = _read_dem(dem_file)
|
||
dem = to_gpu(dem_np)
|
||
dy, dx = xp.gradient(dem)
|
||
nan_mask = np.isnan(dem_np)
|
||
d2z_dx2 = xp.gradient(dx, axis=1)
|
||
d2z_dy2 = xp.gradient(dy, axis=0)
|
||
curvature = (d2z_dx2 + d2z_dy2) / 2
|
||
_save_tif(output, to_cpu(curvature), transform, crs, nan_mask=nan_mask)
|
||
logger.info(f" ✓ Courbure terminée ({time.time()-t0:.1f}s){gpu_tag}")
|
||
return output
|
||
except Exception as e:
|
||
logger.error(f" ✗ Erreur curvature: {e}", exc_info=True)
|
||
return None
|
||
|
||
|
||
|
||
# ============================================================
|
||
# GPU-accelerated visualizations
|
||
# ============================================================
|
||
|
||
def generate_lrm(dem_file, basename, vis_dir, resolution, shared=None):
|
||
"""Local Relief Model - deviation from local mean (GPU if available).
|
||
|
||
Kernel sigma adapts to resolution: finer kernel at higher resolution
|
||
to capture micro-relief details. At 0.5m/px: 15m, at 0.2m/px: ~5m.
|
||
"""
|
||
gpu_tag = " [GPU]" if HAS_GPU else ""
|
||
logger.info(f" → Local Relief Model{gpu_tag}...")
|
||
t0 = time.time()
|
||
output = vis_dir / f"{basename}_lrm.tif"
|
||
|
||
try:
|
||
if shared:
|
||
transform = shared.transform
|
||
crs = shared.crs
|
||
lrm = shared.lrm_15.copy()
|
||
else:
|
||
dem_np, transform, crs = _read_dem(dem_file)
|
||
nan_mask = np.isnan(dem_np)
|
||
# Adapt sigma to resolution: standard 15m at 0.5m, finer at higher res
|
||
sigma_m = max(5.0, 15.0 * 0.5 / resolution)
|
||
logger.info(f" LRM sigma={sigma_m:.1f}m (résolution {resolution}m/px)")
|
||
local_mean = _filter_nanaware(dem_np, xp_gaussian_filter, sigma=sigma_m / resolution)
|
||
lrm = dem_np - local_mean
|
||
lrm[nan_mask] = np.nan
|
||
_save_tif(output, lrm.astype(np.float32), transform, crs)
|
||
logger.info(f" ✓ LRM terminé ({time.time()-t0:.1f}s){gpu_tag}")
|
||
return output
|
||
except Exception as e:
|
||
logger.error(f" ✗ Erreur LRM: {e}", exc_info=True)
|
||
return None
|
||
|
||
|
||
def generate_svf(dem_file, basename, vis_dir, resolution, shared=None):
|
||
"""Sky-View Factor - ray-tracing on 16 azimuths (GPU if available).
|
||
|
||
For each pixel, trace rays in N directions, find the max horizon
|
||
angle in each direction, then SVF = (1/N) * sum(cos²(horizon_angle)).
|
||
Valleys/crevices have low SVF (obstructed sky), ridges/peaks have high SVF.
|
||
"""
|
||
gpu_tag = " [GPU]" if HAS_GPU else ""
|
||
logger.info(f" → Sky-View Factor (ray-tracing){gpu_tag}...")
|
||
t0 = time.time()
|
||
output = vis_dir / f"{basename}_svf.tif"
|
||
|
||
try:
|
||
if shared:
|
||
transform = shared.transform
|
||
crs = shared.crs
|
||
dem_np = shared.dem_np
|
||
rows, cols = dem_np.shape
|
||
res = resolution
|
||
dem = to_gpu(shared.filled) if HAS_GPU else shared.filled
|
||
nan_mask = shared.nan_mask
|
||
else:
|
||
dem_np, transform, crs = _read_dem(dem_file)
|
||
rows, cols = dem_np.shape
|
||
res = resolution
|
||
nan_mask = np.isnan(dem_np)
|
||
filled, _ = _fill_nans(dem_np)
|
||
dem = to_gpu(filled) if HAS_GPU else filled
|
||
|
||
n_dirs = 16
|
||
angles = np.linspace(0, 2 * np.pi, n_dirs, endpoint=False)
|
||
dx_dir = np.cos(angles)
|
||
dy_dir = np.sin(angles)
|
||
max_dist = int(100 / res)
|
||
|
||
padded = xp.pad(dem, max_dist, mode='constant', constant_values=xp.nan)
|
||
svf = xp.zeros_like(dem)
|
||
|
||
for d_idx in range(n_dirs):
|
||
ddx, ddy = dx_dir[d_idx], dy_dir[d_idx]
|
||
horizon = xp.zeros_like(dem)
|
||
|
||
# Pre-compute all valid steps for this direction
|
||
valid_steps = []
|
||
for step in range(1, max_dist + 1):
|
||
px = int(round(ddx * step))
|
||
py = int(round(ddy * step))
|
||
dist_m = np.sqrt((ddx * step * res) ** 2 + (ddy * step * res) ** 2)
|
||
if dist_m < res * 0.5:
|
||
continue
|
||
valid_steps.append((step, px, py, dist_m))
|
||
|
||
# Batch all shifts into a single array for vectorized max computation
|
||
for step, px, py, dist_m in valid_steps:
|
||
elev_diff = padded[max_dist + py:max_dist + py + rows,
|
||
max_dist + px:max_dist + px + cols] - dem
|
||
angle = xp.arctan2(elev_diff, dist_m)
|
||
horizon = xp.where(xp.isnan(angle), horizon,
|
||
xp.maximum(horizon, xp.nan_to_num(angle, nan=0)))
|
||
|
||
svf += xp.cos(xp.pi / 2 - horizon) ** 2
|
||
|
||
svf /= n_dirs
|
||
svf_np = to_cpu(svf).astype(np.float32)
|
||
svf_np[nan_mask] = np.nan
|
||
_save_tif(output, svf_np, transform, crs)
|
||
logger.info(f" ✓ SVF terminé ({time.time()-t0:.1f}s){gpu_tag}")
|
||
return output
|
||
except Exception as e:
|
||
logger.error(f" ✗ Erreur SVF: {e}", exc_info=True)
|
||
return None
|
||
|
||
|
||
def generate_openness(dem_file, basename, vis_dir, resolution, positive=True, shared=None):
|
||
"""Positive/Negative Openness - true zenith/nadir angle computation (GPU if available).
|
||
|
||
For each pixel, in 8 directions (N, NE, E, SE, S, SW, W, NW):
|
||
- Positive openness: max zenith angle (angle from vertical to highest visible terrain)
|
||
- Negative openness: max nadir angle (angle from vertical down to lowest terrain)
|
||
Result is averaged across all 8 directions.
|
||
Ray radius adapts to resolution: 100m for better detection of large enclosures.
|
||
"""
|
||
name = "positive_openness" if positive else "negative_openness"
|
||
gpu_tag = " [GPU]" if HAS_GPU else ""
|
||
logger.info(f" → {name.replace('_', ' ').title()} (ray-tracing){gpu_tag}...")
|
||
t0 = time.time()
|
||
output = vis_dir / f"{basename}_{name}.tif"
|
||
|
||
try:
|
||
if shared:
|
||
transform = shared.transform
|
||
crs = shared.crs
|
||
dem_np = shared.dem_np
|
||
rows, cols = dem_np.shape
|
||
res = resolution
|
||
dem = to_gpu(shared.filled) if HAS_GPU else shared.filled
|
||
nan_mask = shared.nan_mask
|
||
else:
|
||
dem_np, transform, crs = _read_dem(dem_file)
|
||
rows, cols = dem_np.shape
|
||
res = resolution
|
||
nan_mask = np.isnan(dem_np)
|
||
filled, _ = _fill_nans(dem_np)
|
||
dem = to_gpu(filled) if HAS_GPU else filled
|
||
|
||
n_dirs = 8
|
||
angles = np.linspace(0, 2 * np.pi, n_dirs, endpoint=False)
|
||
dx_dir = np.cos(angles)
|
||
dy_dir = np.sin(angles)
|
||
max_dist = int(100 / res)
|
||
|
||
padded = xp.pad(dem, max_dist, mode='constant', constant_values=xp.nan)
|
||
openness_sum = xp.zeros_like(dem)
|
||
|
||
for d_idx in range(n_dirs):
|
||
ddx, ddy = dx_dir[d_idx], dy_dir[d_idx]
|
||
max_angle = xp.zeros_like(dem)
|
||
|
||
for step in range(1, max_dist + 1):
|
||
px = int(round(ddx * step))
|
||
py = int(round(ddy * step))
|
||
dist_m = np.sqrt((ddx * step * res) ** 2 + (ddy * step * res) ** 2)
|
||
if dist_m < res * 0.5:
|
||
continue
|
||
|
||
elev_diff = padded[max_dist + py:max_dist + py + rows,
|
||
max_dist + px:max_dist + px + cols] - dem
|
||
|
||
if positive:
|
||
angle = xp.arctan2(xp.maximum(elev_diff, 0), dist_m)
|
||
else:
|
||
angle = xp.arctan2(xp.maximum(-elev_diff, 0), dist_m)
|
||
|
||
max_angle = xp.where(xp.isnan(angle), max_angle,
|
||
xp.maximum(max_angle, xp.nan_to_num(angle, nan=0)))
|
||
|
||
openness_sum += max_angle
|
||
|
||
openness_result = to_cpu(xp.degrees(openness_sum / n_dirs)).astype(np.float32)
|
||
openness_result[nan_mask] = np.nan
|
||
_save_tif(output, openness_result, transform, crs)
|
||
logger.info(f" ✓ {name} terminé ({time.time()-t0:.1f}s){gpu_tag}")
|
||
return output
|
||
except Exception as e:
|
||
logger.error(f" ✗ Erreur openness: {e}", exc_info=True)
|
||
return None
|
||
|
||
|
||
def generate_local_dominance(dem_file, basename, vis_dir, resolution, shared=None,
|
||
radius=15, pmin=2, pmax=98):
|
||
"""Local Dominance — proportion of neighborhood below center point.
|
||
|
||
LD = (dem - local_min) / (local_max - local_min + epsilon)
|
||
|
||
High values = locally dominant (peak, ridge)
|
||
Low values = locally recessed (valley, pit)
|
||
|
||
Uses minimum/maximum filters on the filled DEM, then restores NaN mask.
|
||
Complements openness by measuring local height position rather than angular extent.
|
||
"""
|
||
gpu_tag = " [GPU]" if HAS_GPU else ""
|
||
logger.info(f" → Dominance Locale (rayon {radius}m){gpu_tag}...")
|
||
t0 = time.time()
|
||
output = vis_dir / f"{basename}_local_dominance.tif"
|
||
|
||
try:
|
||
if shared:
|
||
transform = shared.transform
|
||
crs = shared.crs
|
||
nan_mask = shared.nan_mask
|
||
dem_np = shared.dem_np
|
||
else:
|
||
dem_np, transform, crs = _read_dem(dem_file)
|
||
nan_mask = np.isnan(dem_np)
|
||
|
||
radius_px = max(1, int(radius / resolution))
|
||
if radius_px % 2 == 0:
|
||
radius_px += 1
|
||
|
||
local_min = _filter_nanaware_from_filled(
|
||
shared, xp_minimum_filter, size=radius_px
|
||
) if shared else _filter_nanaware(
|
||
dem_np, xp_minimum_filter, size=radius_px
|
||
)
|
||
|
||
local_max_data = _filter_nanaware_from_filled(
|
||
shared, xp_maximum_filter, size=radius_px
|
||
) if shared else _filter_nanaware(
|
||
dem_np, xp_maximum_filter, size=radius_px
|
||
)
|
||
|
||
# Local dominance ratio
|
||
epsilon = 0.01 # Avoid division by zero on flat terrain
|
||
local_range = local_max_data - local_min + epsilon
|
||
dominance = (dem_np - local_min) / local_range
|
||
dominance = np.clip(dominance, 0, 1)
|
||
dominance[nan_mask] = np.nan
|
||
|
||
_save_tif(output, dominance.astype(np.float32), transform, crs, nan_mask=nan_mask)
|
||
logger.info(f" ✓ Dominance Locale terminée ({time.time()-t0:.1f}s){gpu_tag}")
|
||
return output
|
||
except Exception as e:
|
||
logger.error(f" ✗ Erreur local_dominance: {e}", exc_info=True)
|
||
return None
|
||
|
||
|
||
def generate_mslrm(dem_file, basename, vis_dir, resolution, shared=None):
|
||
"""Multi-Scale Relief Model (MSRM) - LRM at adaptive scales combined (GPU if available).
|
||
|
||
Scales adapt to resolution. Std normalization per scale.
|
||
Weighted combination favoring archaeologically relevant scales (5-25m).
|
||
"""
|
||
gpu_tag = " [GPU]" if HAS_GPU else ""
|
||
logger.info(f" → Multi-Scale Relief Model (MSRM){gpu_tag}...")
|
||
t0 = time.time()
|
||
output = vis_dir / f"{basename}_mslrm.tif"
|
||
|
||
try:
|
||
if shared:
|
||
transform = shared.transform
|
||
crs = shared.crs
|
||
dem_np = shared.dem_np
|
||
nan_mask = shared.nan_mask
|
||
else:
|
||
dem_np, transform, crs = _read_dem(dem_file)
|
||
nan_mask = np.isnan(dem_np)
|
||
|
||
# Adaptive scales: finer at higher resolution
|
||
min_scale = max(2.0, resolution * 4)
|
||
candidate_scales = [2, 5, 10, 20, 50, 100, 200]
|
||
sigmas = [s for s in candidate_scales if s >= min_scale]
|
||
|
||
# Archaeological weights: favor 5-25m range (ditches, enclosures, tumulus)
|
||
scale_weights = {
|
||
2: 0.8, 5: 2.0, 10: 1.8, 20: 1.5, 50: 1.0, 100: 0.6, 200: 0.4,
|
||
}
|
||
weights = np.array([scale_weights.get(s, 1.0) for s in sigmas])
|
||
|
||
logger.info(f" MSRM échelles: {sigmas}m")
|
||
lrm_stack = []
|
||
|
||
for sigma in sigmas:
|
||
sigma_px = sigma / resolution
|
||
if shared:
|
||
local_mean = _filter_nanaware_from_filled(shared, xp_gaussian_filter, sigma=sigma_px)
|
||
else:
|
||
local_mean = _filter_nanaware(dem_np, xp_gaussian_filter, sigma=sigma_px)
|
||
lrm = dem_np - local_mean
|
||
lrm[nan_mask] = np.nan
|
||
# Std normalization: x / std — preserves sign and contrast better than z-score
|
||
valid_lrm = lrm[~nan_mask]
|
||
lrm_std = max(np.nanstd(valid_lrm), 0.01) if len(valid_lrm) > 0 else 0.01
|
||
lrm = lrm / lrm_std
|
||
lrm_stack.append(lrm.astype(np.float32))
|
||
|
||
# Weighted combination
|
||
lrm_array = np.array(lrm_stack)
|
||
weights_3d = weights[:, np.newaxis, np.newaxis]
|
||
with np.errstate(invalid='ignore', divide='ignore'):
|
||
with warnings.catch_warnings():
|
||
warnings.filterwarnings('ignore', message='Mean of empty slice')
|
||
mslrm = np.sqrt(np.nansum((lrm_array ** 2) * weights_3d, axis=0) / np.sum(weights))
|
||
mslrm[nan_mask] = np.nan
|
||
_save_tif(output, mslrm.astype(np.float32), transform, crs)
|
||
logger.info(f" ✓ MSRM terminé ({time.time()-t0:.1f}s){gpu_tag}")
|
||
return output
|
||
except Exception as e:
|
||
logger.error(f" ✗ Erreur MSRM: {e}", exc_info=True)
|
||
return None
|
||
|
||
|
||
def generate_tpi(dem_file, basename, vis_dir, resolution, shared=None):
|
||
"""Multi-Scale Topographic Position Index (GPU if available).
|
||
|
||
TPI = elevation - mean(neighborhood).
|
||
Computed at 4 scales with std normalization and weighted combination.
|
||
Weights favor fine and medium scales (archaeologically relevant).
|
||
"""
|
||
gpu_tag = " [GPU]" if HAS_GPU else ""
|
||
logger.info(f" → TPI multi-échelle{gpu_tag}...")
|
||
t0 = time.time()
|
||
output = vis_dir / f"{basename}_tpi.tif"
|
||
|
||
try:
|
||
if shared:
|
||
transform = shared.transform
|
||
crs = shared.crs
|
||
dem_np = shared.dem_np
|
||
nan_mask = shared.nan_mask
|
||
else:
|
||
dem_np, transform, crs = _read_dem(dem_file)
|
||
nan_mask = np.isnan(dem_np)
|
||
|
||
# 4 scales: fine (3m), medium (15m), broad (50m), landscape (200m)
|
||
scales_m = [3, 15, 50, 200]
|
||
weights = [1.5, 2.0, 1.2, 0.5] # Favor medium scales (ditches, enclosures)
|
||
|
||
tpi_stack = []
|
||
for scale_m, weight in zip(scales_m, weights):
|
||
size = max(3, int(scale_m / resolution))
|
||
if size % 2 == 0:
|
||
size += 1
|
||
if shared:
|
||
local_mean = _filter_nanaware_from_filled(shared, xp_uniform_filter, size=size)
|
||
else:
|
||
local_mean = _filter_nanaware(dem_np, xp_uniform_filter, size=size)
|
||
tpi = dem_np - local_mean
|
||
tpi[nan_mask] = np.nan
|
||
# Std normalization — preserves sign and contrast better than z-score
|
||
valid = tpi[~nan_mask]
|
||
tpi_std = max(np.nanstd(valid), 0.01) if len(valid) > 0 else 0.01
|
||
tpi = tpi / tpi_std
|
||
tpi_stack.append(tpi.astype(np.float32))
|
||
|
||
# Weighted combination
|
||
tpi_array = np.array(tpi_stack)
|
||
weights_3d = np.array(weights)[:, np.newaxis, np.newaxis]
|
||
with np.errstate(invalid='ignore', divide='ignore'):
|
||
with warnings.catch_warnings():
|
||
warnings.filterwarnings('ignore', message='Mean of empty slice')
|
||
tpi_combined = np.nansum(tpi_array * weights_3d, axis=0) / np.sum(weights)
|
||
tpi_combined[nan_mask] = np.nan
|
||
|
||
_save_tif(output, tpi_combined.astype(np.float32), transform, crs)
|
||
logger.info(f" ✓ TPI terminé ({time.time()-t0:.1f}s){gpu_tag}")
|
||
return output
|
||
except Exception as e:
|
||
logger.error(f" ✗ Erreur TPI: {e}", exc_info=True)
|
||
return None
|
||
|
||
|
||
|
||
|
||
# ============================================================
|
||
# SAILORE
|
||
# ============================================================
|
||
|
||
def generate_sailore(dem_file, basename, vis_dir, resolution, shared=None):
|
||
"""SAILORE - Self-Adaptive Improved Local Relief Model (GPU if available).
|
||
|
||
Kernel size adapts to local slope: flat areas get larger kernels,
|
||
steep areas get smaller kernels. Scales adapt to resolution.
|
||
"""
|
||
gpu_tag = " [GPU]" if HAS_GPU else ""
|
||
logger.info(f" → SAILORE (LRM adaptatif){gpu_tag}...")
|
||
t0 = time.time()
|
||
output = vis_dir / f"{basename}_sailore.tif"
|
||
|
||
try:
|
||
if shared:
|
||
transform = shared.transform
|
||
crs = shared.crs
|
||
dem_np = shared.dem_np
|
||
nan_mask = shared.nan_mask
|
||
slope_deg = shared.slope_deg
|
||
else:
|
||
dem_np, transform, crs = _read_dem(dem_file)
|
||
nan_mask = np.isnan(dem_np)
|
||
gy, gx = np.gradient(dem_np, resolution)
|
||
slope = np.arctan(np.sqrt(gx**2 + gy**2))
|
||
slope_deg = np.degrees(slope)
|
||
slope_deg[nan_mask] = np.nan
|
||
|
||
# Adaptive scales: finer at higher resolution
|
||
sigma_min_m = max(1.0, 2.0 * 0.5 / resolution) # 2m at 0.5, ~5m at 0.2
|
||
sigma_mid_m = max(5.0, 13.5 * 0.5 / resolution) # 13.5m at 0.5, ~33m at 0.2
|
||
sigma_max_m = max(5.0, 25.0 * 0.5 / resolution) # 25m at 0.5, ~62m at 0.2
|
||
sigma_min = sigma_min_m / resolution
|
||
sigma_max = sigma_max_m / resolution
|
||
sigma_mid = (sigma_min + sigma_max) / 2
|
||
slope_norm = np.clip(slope_deg / 30.0, 0, 1)
|
||
|
||
if shared:
|
||
lrm_fine = dem_np - _filter_nanaware_from_filled(shared, xp_gaussian_filter, sigma=sigma_min)
|
||
else:
|
||
lrm_fine = dem_np - _filter_nanaware(dem_np, xp_gaussian_filter, sigma=sigma_min)
|
||
lrm_fine[nan_mask] = np.nan
|
||
|
||
if shared:
|
||
lrm_medium = dem_np - _filter_nanaware_from_filled(shared, xp_gaussian_filter, sigma=(sigma_min + sigma_max) / 2)
|
||
else:
|
||
lrm_medium = dem_np - _filter_nanaware(dem_np, xp_gaussian_filter, sigma=(sigma_min + sigma_max) / 2)
|
||
lrm_medium[nan_mask] = np.nan
|
||
|
||
if shared:
|
||
lrm_coarse = dem_np - _filter_nanaware_from_filled(shared, xp_gaussian_filter, sigma=sigma_max)
|
||
else:
|
||
lrm_coarse = dem_np - _filter_nanaware(dem_np, xp_gaussian_filter, sigma=sigma_max)
|
||
lrm_coarse[nan_mask] = np.nan
|
||
|
||
w_fine = slope_norm
|
||
w_medium = 1 - 2 * np.abs(slope_norm - 0.5)
|
||
w_coarse = 1 - slope_norm
|
||
w_total = w_fine + w_medium + w_coarse
|
||
w_total[w_total == 0] = 1
|
||
|
||
sailore = (w_fine * lrm_fine + w_medium * lrm_medium + w_coarse * lrm_coarse) / w_total
|
||
sailore[nan_mask] = np.nan
|
||
|
||
_save_tif(output, sailore.astype(np.float32), transform, crs)
|
||
logger.info(f" ✓ SAILORE terminé ({time.time()-t0:.1f}s){gpu_tag}")
|
||
return output
|
||
except Exception as e:
|
||
logger.error(f" ✗ Erreur SAILORE: {e}", exc_info=True)
|
||
return None
|
||
|
||
|
||
# ============================================================
|
||
# Roughness
|
||
# ============================================================
|
||
|
||
def generate_roughness(dem_file, basename, vis_dir, resolution, shared=None):
|
||
"""Surface roughness - multi-scale standard deviation (GPU-accelerated).
|
||
|
||
Combines fine (3m) and broad (15m) roughness for better detection
|
||
of archaeological features at multiple scales.
|
||
"""
|
||
gpu_tag = " [GPU]" if HAS_GPU else ""
|
||
logger.info(f" → Rugosité de surface{gpu_tag}...")
|
||
t0 = time.time()
|
||
output = vis_dir / f"{basename}_roughness.tif"
|
||
|
||
try:
|
||
if shared:
|
||
transform = shared.transform
|
||
crs = shared.crs
|
||
dem_np = shared.dem_np
|
||
nan_mask = shared.nan_mask
|
||
else:
|
||
dem_np, transform, crs = _read_dem(dem_file)
|
||
nan_mask = np.isnan(dem_np)
|
||
|
||
# Fine roughness (3m window)
|
||
fine_size = max(3, int(3 / resolution))
|
||
if fine_size % 2 == 0:
|
||
fine_size += 1
|
||
|
||
if shared:
|
||
fine_mean = _filter_nanaware_from_filled(shared, xp_uniform_filter, size=fine_size)
|
||
fine_mean_sq = _filter_nanaware(shared.filled.astype(np.float64)**2, xp_uniform_filter, size=fine_size)
|
||
fine_mean_sq[shared.nan_mask] = np.nan
|
||
else:
|
||
fine_mean = _filter_nanaware(dem_np.astype(np.float64), xp_uniform_filter, size=fine_size)
|
||
fine_mean_sq = _filter_nanaware(dem_np.astype(np.float64)**2, xp_uniform_filter, size=fine_size)
|
||
roughness_fine = np.sqrt(np.maximum(fine_mean_sq - fine_mean * fine_mean, 0))
|
||
roughness_fine[nan_mask] = np.nan
|
||
|
||
# Broad roughness (15m window)
|
||
broad_size = max(3, int(15 / resolution))
|
||
if broad_size % 2 == 0:
|
||
broad_size += 1
|
||
|
||
if shared:
|
||
broad_mean = _filter_nanaware_from_filled(shared, xp_uniform_filter, size=broad_size)
|
||
broad_mean_sq = _filter_nanaware(shared.filled.astype(np.float64)**2, xp_uniform_filter, size=broad_size)
|
||
broad_mean_sq[shared.nan_mask] = np.nan
|
||
else:
|
||
broad_mean = _filter_nanaware(dem_np.astype(np.float64), xp_uniform_filter, size=broad_size)
|
||
broad_mean_sq = _filter_nanaware(dem_np.astype(np.float64)**2, xp_uniform_filter, size=broad_size)
|
||
roughness_broad = np.sqrt(np.maximum(broad_mean_sq - broad_mean * broad_mean, 0))
|
||
roughness_broad[nan_mask] = np.nan
|
||
|
||
# Std normalization per scale then weighted combination
|
||
fine_valid = roughness_fine[~nan_mask]
|
||
broad_valid = roughness_broad[~nan_mask]
|
||
fine_std = max(np.nanstd(fine_valid), 0.01) if len(fine_valid) > 0 else 0.01
|
||
broad_std = max(np.nanstd(broad_valid), 0.01) if len(broad_valid) > 0 else 0.01
|
||
|
||
roughness = 0.7 * roughness_fine / fine_std + 0.3 * roughness_broad / broad_std
|
||
roughness[nan_mask] = np.nan
|
||
|
||
roughness = to_cpu(roughness)
|
||
_save_tif(output, roughness, transform, crs)
|
||
logger.info(f" ✓ Rugosité terminée ({time.time()-t0:.1f}s){gpu_tag}")
|
||
return output
|
||
except Exception as e:
|
||
logger.error(f" ✗ Erreur rugosité: {e}", exc_info=True)
|
||
return None
|
||
|
||
|
||
# ============================================================
|
||
# Anomalies
|
||
# ============================================================
|
||
|
||
def generate_anomalies(dem_file, basename, vis_dir, resolution, shared=None):
|
||
"""Statistical anomaly detection - std-normalized multi-scale relief + Local Moran's I — GPU if available.
|
||
|
||
Uses MSRM (multi-scale LRM) instead of single-scale LRM for better detection
|
||
of anomalies at all scales.
|
||
"""
|
||
gpu_tag = " [GPU]" if HAS_GPU else ""
|
||
logger.info(f" → Détection anomalies statistiques{gpu_tag}...")
|
||
t0 = time.time()
|
||
output = vis_dir / f"{basename}_anomalies.tif"
|
||
|
||
try:
|
||
if shared:
|
||
transform = shared.transform
|
||
crs = shared.crs
|
||
dem_np = shared.dem_np
|
||
nan_mask = shared.nan_mask
|
||
else:
|
||
dem_np, transform, crs = _read_dem(dem_file)
|
||
nan_mask = np.isnan(dem_np)
|
||
|
||
# Multi-scale LRM: compute MSRM-like combined relief
|
||
min_scale = max(2.0, resolution * 4)
|
||
candidate_scales = [2, 5, 10, 20, 50, 100]
|
||
sigmas = [s for s in candidate_scales if s >= min_scale]
|
||
lrm_stack = []
|
||
|
||
for sigma in sigmas:
|
||
sigma_px = sigma / resolution
|
||
if shared:
|
||
local_mean = _filter_nanaware_from_filled(shared, xp_gaussian_filter, sigma=sigma_px)
|
||
else:
|
||
local_mean = _filter_nanaware(dem_np, xp_gaussian_filter, sigma=sigma_px)
|
||
lrm = dem_np - local_mean
|
||
lrm[nan_mask] = np.nan
|
||
# Std normalization — preserves contrast better than z-score
|
||
valid_lrm = lrm[~nan_mask]
|
||
lrm_std = max(np.nanstd(valid_lrm), 0.01) if len(valid_lrm) > 0 else 0.01
|
||
lrm_norm = lrm / lrm_std
|
||
else:
|
||
lrm_norm = lrm
|
||
lrm_stack.append(lrm_norm.astype(np.float32))
|
||
|
||
# Weighted RMS combination (favor 5-25m scales)
|
||
scale_weights = {2: 0.8, 5: 2.0, 10: 1.8, 20: 1.5, 50: 1.0, 100: 0.6}
|
||
weights = np.array([scale_weights.get(s, 1.0) for s in sigmas])
|
||
lrm_array = np.array(lrm_stack)
|
||
weights_3d = weights[:, np.newaxis, np.newaxis]
|
||
with np.errstate(invalid='ignore', divide='ignore'):
|
||
with warnings.catch_warnings():
|
||
warnings.filterwarnings('ignore', message='Mean of empty slice')
|
||
msrm = np.sqrt(np.nansum((lrm_array ** 2) * weights_3d, axis=0) / np.sum(weights))
|
||
msrm[nan_mask] = np.nan
|
||
|
||
# Std normalization of MSRM — preserves contrast better than z-score
|
||
valid_msrm = msrm[~nan_mask]
|
||
msrm_std = max(np.nanstd(valid_msrm), 0.01) if len(valid_msrm) > 0 else 0.01
|
||
z_score = msrm / msrm_std
|
||
|
||
# Local Moran's I for spatial clustering
|
||
window = max(3, int(10 / resolution))
|
||
if window % 2 == 0:
|
||
window += 1
|
||
|
||
if shared:
|
||
local_mean_z = _filter_nanaware_from_filled(shared, xp_uniform_filter, size=window)
|
||
else:
|
||
local_mean_z = _filter_nanaware(z_score, xp_uniform_filter, size=window)
|
||
z_mean_global = np.nanmean(z_score[~nan_mask]) if np.any(~nan_mask) else 0
|
||
z_std_global = max(np.nanstd(z_score[~nan_mask]), 0.01) if np.any(~nan_mask) else 0.01
|
||
morans_i = z_score * (local_mean_z - z_mean_global) / z_std_global
|
||
anomaly_score = np.abs(z_score) * np.sign(morans_i)
|
||
anomaly_score[nan_mask] = np.nan
|
||
|
||
_save_tif(output, anomaly_score.astype(np.float32), transform, crs)
|
||
logger.info(f" ✓ Anomalies terminé ({time.time()-t0:.1f}s){gpu_tag}")
|
||
return output
|
||
except Exception as e:
|
||
logger.error(f" ✗ Erreur anomalies: {e}", exc_info=True)
|
||
return None
|
||
|
||
|
||
# ============================================================
|
||
# Wavelet
|
||
# ============================================================
|
||
|
||
def generate_wavelet(dem_file, basename, vis_dir, resolution, shared=None):
|
||
"""Mexican Hat wavelet multi-scale analysis (GPU if available).
|
||
|
||
CWT 2D at multiple scales adapted to resolution.
|
||
- At 0.5m/px: [1, 2, 5, 10, 20, 50, 100]m
|
||
- At 0.2m/px: [0.5, 1, 2, 5, 10, 20, 50, 100]m
|
||
- Higher resolution = more fine scales available
|
||
|
||
Uses std normalization per scale and weighted combination
|
||
with emphasis on archaeologically relevant scales (2-50m).
|
||
"""
|
||
gpu_tag = " [GPU]" if HAS_GPU else ""
|
||
logger.info(f" → Ondelette Mexican Hat multi-échelle{gpu_tag}...")
|
||
t0 = time.time()
|
||
output = vis_dir / f"{basename}_wavelet.tif"
|
||
|
||
try:
|
||
if shared:
|
||
transform = shared.transform
|
||
crs = shared.crs
|
||
dem_np = shared.dem_np
|
||
nan_mask = shared.nan_mask
|
||
filled = shared.filled.astype(np.float64)
|
||
else:
|
||
dem_np, transform, crs = _read_dem(dem_file)
|
||
nan_mask = np.isnan(dem_np)
|
||
filled, _ = _fill_nans(dem_np.astype(np.float64))
|
||
|
||
# Adapt scales to resolution: finer scales available at higher resolution
|
||
min_scale = max(resolution * 2, 1.0)
|
||
candidate_scales = [0.5, 1, 2, 5, 10, 20, 50, 100]
|
||
scales = [s for s in candidate_scales if s >= min_scale]
|
||
|
||
# Weights favor archaeological scales (2-50m: ditches, enclosures, tumulus)
|
||
scale_weights = {
|
||
0.5: 0.6, # Fine texture
|
||
1.0: 0.8, # Micro-relief
|
||
2.0: 1.5, # Small ditches, paths — key scale
|
||
5.0: 2.0, # Fossés, small enclosures — key archaeological scale
|
||
10.0: 1.8, # Medium structures
|
||
20.0: 1.5, # Large enclosures, tumulus
|
||
50.0: 1.0, # Very large enclosures
|
||
100.0: 0.6, # Landscape-level features
|
||
}
|
||
weights = np.array([scale_weights.get(s, 1.0) for s in scales])
|
||
|
||
logger.info(f" Échelles CWT: {scales}m (résolution {resolution}m/px)")
|
||
wavelet_stack = []
|
||
|
||
for scale_m in scales:
|
||
sigma_px = scale_m / resolution
|
||
if HAS_GPU:
|
||
from cupyx.scipy.ndimage import gaussian_laplace as gpu_gaussian_laplace
|
||
response = -gpu_gaussian_laplace(to_gpu(filled), sigma=sigma_px)
|
||
response = to_cpu(response)
|
||
else:
|
||
from scipy.ndimage import gaussian_laplace
|
||
response = -gaussian_laplace(filled, sigma=sigma_px)
|
||
response[nan_mask] = np.nan
|
||
|
||
# Std normalization: scale by standard deviation to make scales comparable
|
||
valid = response[~nan_mask]
|
||
std_val = max(np.nanstd(valid), 0.01) if len(valid) > 0 else 0.01
|
||
response = response / std_val
|
||
wavelet_stack.append(response)
|
||
|
||
# Weighted RMS: sqrt(sum(w * x²) / sum(w))
|
||
# Preserves contrast at key archaeological scales
|
||
stack = np.array(wavelet_stack)
|
||
weights_3d = weights[:, np.newaxis, np.newaxis]
|
||
with np.errstate(invalid='ignore', divide='ignore'):
|
||
with warnings.catch_warnings():
|
||
warnings.filterwarnings('ignore', message='Mean of empty slice')
|
||
combined = np.sqrt(np.nansum((stack ** 2) * weights_3d, axis=0) / np.sum(weights))
|
||
combined[nan_mask] = np.nan
|
||
|
||
_save_tif(output, combined.astype(np.float32), transform, crs)
|
||
logger.info(f" ✓ Ondelette terminée ({time.time()-t0:.1f}s){gpu_tag}")
|
||
return output
|
||
except Exception as e:
|
||
logger.error(f" ✗ Erreur ondelette: {e}", exc_info=True)
|
||
return None
|
||
|
||
|
||
# ============================================================
|
||
# Flow accumulation
|
||
# ============================================================
|
||
|
||
def _d8_accumulate_numba(flow_dir, nodata_mask, rows, cols):
|
||
"""JIT-compiled D8 flow accumulation loop.
|
||
|
||
Uses numba for ~100x speedup over pure Python loop.
|
||
Falls back to pure Python if numba is unavailable.
|
||
"""
|
||
try:
|
||
from numba import njit
|
||
|
||
@njit(cache=True)
|
||
def _accumulate(flow_dir, nodata_mask, rows, cols):
|
||
dx8 = np.array([1, 1, 0, -1, -1, -1, 0, 1], dtype=np.int8)
|
||
dy8 = np.array([0, 1, 1, 1, 0, -1, -1, -1], dtype=np.int8)
|
||
|
||
flow_acc = np.ones((rows, cols), dtype=np.float32)
|
||
|
||
# Sort cells by elevation (high to low) — walk downhill
|
||
# We use the fact that flow_dir already encodes steepest descent
|
||
# Process from highest to lowest elevation
|
||
for r in range(rows):
|
||
for c in range(cols):
|
||
if nodata_mask[r, c]:
|
||
flow_acc[r, c] = 0.0
|
||
continue
|
||
|
||
# Iterative accumulation: process cells in top-down order
|
||
# Multiple passes until convergence
|
||
for _pass in range(10):
|
||
changed = 0
|
||
for r in range(rows):
|
||
for c in range(cols):
|
||
if nodata_mask[r, c]:
|
||
continue
|
||
d = flow_dir[r, c]
|
||
if d < 0:
|
||
continue
|
||
nr = r + dy8[d]
|
||
nc = c + dx8[d]
|
||
if 0 <= nr < rows and 0 <= nc < cols and not nodata_mask[nr, nc]:
|
||
old_acc = flow_acc[nr, nc]
|
||
flow_acc[nr, nc] += flow_acc[r, c]
|
||
if flow_acc[nr, nc] != old_acc:
|
||
changed += 1
|
||
if changed == 0:
|
||
break
|
||
|
||
return flow_acc
|
||
|
||
return _accumulate(flow_dir, nodata_mask, rows, cols)
|
||
|
||
except ImportError:
|
||
# Fallback: pure Python
|
||
return None
|
||
|
||
|
||
def _priority_flood(dem, nodata_mask):
|
||
"""Priority-flood algorithm for sink filling (Wang & Liu 2006).
|
||
|
||
O(n log n) compared to 50 iterations of minimum_filter.
|
||
Fills pits so water can flow downhill.
|
||
"""
|
||
import heapq
|
||
|
||
rows, cols = dem.shape
|
||
filled = dem.copy()
|
||
closed = nodata_mask.copy()
|
||
open_queue = []
|
||
|
||
# Initialize border cells
|
||
for r in range(rows):
|
||
for c in [0, cols - 1]:
|
||
if not closed[r, c]:
|
||
heapq.heappush(open_queue, (filled[r, c], r, c))
|
||
closed[r, c] = True
|
||
for c in range(1, cols - 1):
|
||
for r in [0, rows - 1]:
|
||
if not closed[r, c]:
|
||
heapq.heappush(open_queue, (filled[r, c], r, c))
|
||
closed[r, c] = True
|
||
|
||
dx8 = [1, 1, 0, -1, -1, -1, 0, 1]
|
||
dy8 = [0, 1, 1, 1, 0, -1, -1, -1]
|
||
|
||
while open_queue:
|
||
elev, r, c = heapq.heappop(open_queue)
|
||
for d in range(8):
|
||
nr, nc = r + dy8[d], c + dx8[d]
|
||
if 0 <= nr < rows and 0 <= nc < cols and not closed[nr, nc]:
|
||
if filled[nr, nc] < elev:
|
||
filled[nr, nc] = elev # Fill the pit
|
||
closed[nr, nc] = True
|
||
heapq.heappush(open_queue, (filled[nr, nc], nr, nc))
|
||
|
||
return filled
|
||
|
||
|
||
def generate_flow(dem_file, basename, vis_dir, resolution, shared=None):
|
||
"""Flow accumulation using D8 algorithm — priority-flood sink filling, accumulation via numba."""
|
||
gpu_tag = " [GPU]" if HAS_GPU else ""
|
||
logger.info(f" → Accumulation de flux D8{gpu_tag}...")
|
||
t0 = time.time()
|
||
output = vis_dir / f"{basename}_flow.tif"
|
||
|
||
try:
|
||
if shared:
|
||
transform = shared.transform
|
||
crs = shared.crs
|
||
dem_np = shared.dem_np
|
||
nodata_mask = shared.nan_mask
|
||
else:
|
||
dem_np, transform, crs = _read_dem(dem_file)
|
||
nodata_mask = np.isnan(dem_np)
|
||
|
||
rows, cols = dem_np.shape
|
||
|
||
# Sink filling — priority-flood (O(n log n), faster than 50× minimum_filter)
|
||
dem_filled_np = _priority_flood(dem_np, nodata_mask)
|
||
|
||
# D8 slope — vectorized
|
||
dx8 = np.array([1, 1, 0, -1, -1, -1, 0, 1], dtype=np.int32)
|
||
dy8 = np.array([0, 1, 1, 1, 0, -1, -1, -1], dtype=np.int32)
|
||
dist8 = np.array([1.0, np.sqrt(2), 1.0, np.sqrt(2), 1.0, np.sqrt(2), 1.0, np.sqrt(2)])
|
||
|
||
flow_dir = np.full((rows, cols), -1, dtype=np.int8)
|
||
max_slope = np.zeros((rows, cols), dtype=np.float64)
|
||
|
||
padded = np.pad(dem_filled_np, 1, mode='constant',
|
||
constant_values=np.nanmax(dem_filled_np[~np.isnan(dem_filled_np)]) + 10000)
|
||
|
||
for d in range(8):
|
||
nx = 1 + dx8[d]
|
||
ny = 1 + dy8[d]
|
||
neighbor_elev = padded[ny:ny + rows, nx:nx + cols]
|
||
slope = (dem_filled_np - neighbor_elev) / (dist8[d] * resolution)
|
||
slope[nodata_mask] = -1
|
||
better = slope > max_slope
|
||
flow_dir[better] = d
|
||
max_slope[better] = slope[better]
|
||
|
||
# D8 accumulation — try numba first, fallback to Python
|
||
result = _d8_accumulate_numba(flow_dir, nodata_mask.astype(np.bool_), rows, cols)
|
||
|
||
if result is not None:
|
||
flow_acc = result
|
||
logger.info(f" Accumulation D8 via numba")
|
||
else:
|
||
# Pure Python fallback (slow for large DEMs)
|
||
logger.info(f" Accumulation D8 via Python (installez numba pour accélérer)")
|
||
flat_dem = dem_filled_np[~nodata_mask].flatten()
|
||
valid_indices = np.where(~nodata_mask.flatten())[0]
|
||
sort_order = valid_indices[np.argsort(-flat_dem)]
|
||
|
||
flow_acc = np.ones((rows, cols), dtype=np.float32)
|
||
flow_acc[nodata_mask] = 0
|
||
|
||
for idx in sort_order:
|
||
r, c = divmod(idx, cols)
|
||
d = flow_dir[r, c]
|
||
if d < 0:
|
||
continue
|
||
nr, nc = r + dy8[d], c + dx8[d]
|
||
if 0 <= nr < rows and 0 <= nc < cols and not nodata_mask[nr, nc]:
|
||
flow_acc[nr, nc] += flow_acc[r, c]
|
||
|
||
flow_log = np.log1p(flow_acc)
|
||
_save_tif(output, flow_log, transform, crs)
|
||
logger.info(f" ✓ Flux terminé ({time.time()-t0:.1f}s){gpu_tag}")
|
||
return output
|
||
except Exception as e:
|
||
logger.error(f" ✗ Erreur flux: {e}", exc_info=True)
|
||
return None |