"""GPU acceleration helpers for LiDAR pipeline. Provides CuPy/numpy abstraction layer. If CuPy is available and a CUDA GPU is detected, array operations are accelerated on the GPU. Otherwise, all operations fall back to numpy/scipy on CPU. GPU errors (e.g. in forked subprocesses) are caught gracefully and cause an automatic fallback to CPU for the current operation. """ import logging import numpy as np from scipy import ndimage logger = logging.getLogger("lidar") # GPU detection - must happen at import time HAS_GPU = False _gpu_name = None _gpu_mem_gb = 0 _xp = np # Default: CPU _cp = None # cupy module (or None) _cp_ndimage = None # cupyx.scipy.ndimage (or None) try: import cupy as _cupy import cupyx.scipy.ndimage as _cupy_ndimage _gpu_info = _cupy.cuda.runtime.getDeviceProperties(0) _gpu_name = _gpu_info['name'].decode() if isinstance(_gpu_info['name'], bytes) else str(_gpu_info['name']) _gpu_mem_gb = _gpu_info['totalGlobalMem'] // (1024 ** 3) HAS_GPU = True _xp = _cupy _cp = _cupy _cp_ndimage = _cupy_ndimage except (ImportError, Exception): pass def _gpu_available(): """Check if GPU is usable right now (may fail in forked subprocesses).""" if not HAS_GPU: return False try: _cp.cuda.runtime.getDevice() return True except Exception: return False def log_gpu_status(): """Log GPU detection result. Called after logging is configured.""" if _gpu_available(): logger.info(f"GPU détectée: {_gpu_name} ({_gpu_mem_gb} Go VRAM)") else: logger.info("Pas de GPU — mode CPU uniquement") def to_gpu(arr): """Send array to GPU if available, otherwise return as float32 numpy. Uses float32 to reduce GPU memory usage. Falls back to CPU if GPU is unavailable (e.g. in forked subprocess). """ if _gpu_available(): try: return _cp.asarray(arr.astype(np.float32)) except Exception: pass # Fall back to CPU return arr.astype(np.float32) def to_cpu(arr): """Bring array back to CPU (numpy). No-op if already on CPU.""" if _cp is not None and isinstance(arr, _cp.ndarray): try: return _cp.asnumpy(arr) except Exception: pass # Already on CPU or GPU error return arr def xp_gaussian_filter(arr, sigma): """Gaussian filter — uses GPU if array is on GPU, CPU otherwise.""" if _cp is not None and isinstance(arr, _cp.ndarray): try: return _cp_ndimage.gaussian_filter(arr, sigma) except Exception: arr = to_cpu(arr) return ndimage.gaussian_filter(arr, sigma) def xp_uniform_filter(arr, size): """Uniform filter — uses GPU if array is on GPU, CPU otherwise.""" if _cp is not None and isinstance(arr, _cp.ndarray): try: return _cp_ndimage.uniform_filter(arr, size) except Exception: arr = to_cpu(arr) return ndimage.uniform_filter(arr, size) def xp_minimum_filter(arr, footprint=None, size=None): """Minimum filter — uses GPU if array is on GPU, CPU otherwise.""" if _cp is not None and isinstance(arr, _cp.ndarray): try: return _cp_ndimage.minimum_filter(arr, footprint=footprint, size=size) except Exception: arr = to_cpu(arr) return ndimage.minimum_filter(arr, footprint=footprint, size=size) def xp_maximum_filter(arr, footprint=None, size=None): """Maximum filter — uses GPU if array is on GPU, CPU otherwise.""" if _cp is not None and isinstance(arr, _cp.ndarray): try: return _cp_ndimage.maximum_filter(arr, footprint=footprint, size=size) except Exception: arr = to_cpu(arr) return ndimage.maximum_filter(arr, footprint=footprint, size=size) def gpu_cleanup(): """Free GPU memory. Call between visualizations to prevent OOM.""" if _cp is not None: try: _cp.get_default_memory_pool().free_all_blocks() except Exception: pass