Pipeline LiDAR optimisé: suppression classification sémantique, ajout flèche nord vectorielle

- Suppression module classification sémantique (non fonctionnel)
- Suppression section rapports (vue synthétique)
- Ajout flèche du nord vectorielle noire (coin supérieur droit, au-dessus légende)
- Pipeline simplifié à 3 étapes: classification sol, génération DTM, visualisations
- Prétraitement ReturnNumber pour fichiers LAZ corrompus
- Orientation nord garantie sur toutes les cartes

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jacquin Antoine
2026-05-09 00:46:50 +02:00
parent e642cde7bc
commit 57b3b78593
3 changed files with 86 additions and 410 deletions

View File

@ -30,8 +30,7 @@ RUN pip3 install --no-cache-dir \
# Copy scripts
COPY process_lidar.py /usr/local/bin/
COPY semantic_classifier.py /usr/local/bin/
RUN chmod +x /usr/local/bin/process_lidar.py /usr/local/bin/semantic_classifier.py
RUN chmod +x /usr/local/bin/process_lidar.py
# Create directories with correct permissions
RUN mkdir -p /data/output /data/input && \

View File

@ -48,9 +48,8 @@ class LidarArchaeoPipeline:
self.dtm_dir = self.output_dir / "DTM"
self.vis_dir = self.output_dir / "visualisations"
self.report_dir = self.output_dir / "rapports"
for d in [self.dtm_dir, self.vis_dir, self.report_dir]:
for d in [self.dtm_dir, self.vis_dir]:
d.mkdir(exist_ok=True)
print(f"✓ Pipeline initialisé (Python pur)")
@ -110,7 +109,53 @@ class LidarArchaeoPipeline:
print(f" ✓ Classification déjà effectuée")
return output_las
pipeline_json = self.create_pipeline_json(laz_file, output_las)
# First, try to fix ReturnNumber values if needed
fixed_las = self.temp_dir / f"{laz_file.stem}_fixed.las"
# Create preprocessing pipeline to fix ReturnNumber=0 issue using expression
fix_pipeline = [
{
"type": "readers.las",
"filename": str(laz_file)
},
{
"type": "filters.expression",
"expression": "ReturnNumber = MAX(ReturnNumber, 1)"
},
{
"type": "filters.expression",
"expression": "NumberOfReturns = MAX(NumberOfReturns, 1)"
},
{
"type": "writers.las",
"filename": str(fixed_las),
"extra_dims": "all"
}
]
try:
# Try with fixed data first
fix_json = json.dumps(fix_pipeline)
fix_pipe_file = self.temp_dir / "fix_pipeline.json"
with open(fix_pipe_file, 'w') as f:
f.write(fix_json)
import subprocess
result = subprocess.run(['pdal', 'pipeline', str(fix_pipe_file)],
capture_output=True, text=True)
if result.returncode == 0:
print(f" → Correction ReturnNumber effectuée")
input_for_smrf = fixed_las
else:
print(f" → Erreur correction, utilisation fichier original")
input_for_smrf = laz_file
if fixed_las.exists():
fixed_las.unlink()
except Exception as e:
print(f" → Erreur prétraitement: {e}")
input_for_smrf = laz_file
pipeline_json = self.create_pipeline_json(input_for_smrf, output_las)
pipeline_file = self.temp_dir / "pipeline.json"
with open(pipeline_file, 'w') as f:
@ -670,8 +715,8 @@ class LidarArchaeoPipeline:
# Create figure with white background for JPEG
fig, ax = plt.subplots(figsize=(18, 12), facecolor='white')
# Display data
im = ax.imshow(data, cmap=cmap, aspect='equal')
# Display data with north at the top
im = ax.imshow(data, cmap=cmap, aspect='equal', origin='upper')
# Enhanced colorbar with explicit values
cbar = plt.colorbar(im, ax=ax, pad=0.03, shrink=0.75, aspect=30)
@ -691,11 +736,42 @@ class LidarArchaeoPipeline:
verticalalignment='top', bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
ax.axis('off')
# Add vector north arrow above the colorbar on the right side (black lines)
from matplotlib.patches import FancyArrow, Polygon
from mpl_toolkits.axes_grid1.inset_locator import inset_axes
# Create inset axes positioned above the colorbar on the right
# The colorbar is typically at the right, so we place north arrow above it
north_ax = inset_axes(ax, width="5%", height="8%", loc='upper right',
bbox_to_anchor=(-0.02, 0.15, 1, 1), bbox_transform=ax.transAxes)
north_ax.set_xlim(0, 1)
north_ax.set_ylim(0, 1)
north_ax.axis('off')
# Draw vector arrow pointing north (black lines)
# Main arrow shaft
north_ax.plot([0.5, 0.5], [0.15, 0.65], color='black', linewidth=2.5, zorder=10)
# Arrow head (triangle outline)
arrow_head_outline = [[0.5, 0.25], [0.35, 0.45], [0.5, 0.65], [0.65, 0.45]]
for i in range(len(arrow_head_outline) - 1):
north_ax.plot([arrow_head_outline[i][0], arrow_head_outline[i+1][0]],
[arrow_head_outline[i][1], arrow_head_outline[i+1][1]],
color='black', linewidth=2.5, zorder=10)
# Fill the arrow head
north_ax.add_patch(Polygon([[0.5, 0.25], [0.35, 0.45], [0.5, 0.65], [0.65, 0.45]],
closed=True, facecolor='black', edgecolor='black', zorder=9))
# Add "N" label above the arrow
north_ax.text(0.5, 0.92, 'N', ha='center', va='top',
fontsize=14, fontweight='bold', color='black', zorder=11)
fig.patch.set_facecolor('white')
plt.tight_layout()
# Save as JPEG
plt.savefig(jpg_file, dpi=150, bbox_inches='tight', facecolor='white', format='jpg')
plt.savefig(jpg_file, dpi=150, bbox_inches='tight', pad_inches=0.1, facecolor='white', format='jpg')
plt.close()
# Delete the source TIFF file to save space
@ -735,94 +811,8 @@ class LidarArchaeoPipeline:
return vis_results
def create_overview(self, basename):
"""Create overview image with all visualizations (JPEG)"""
patterns = {
'Hillshade': '*_hillshade_multi.jpg',
'Pente': '*_slope.jpg',
'Aspect': '*_aspect.jpg',
'Courbure': '*_curvature.jpg',
'Éclairage': '*_solar.jpg',
'Sky-View Factor': '*_svf.jpg',
'Local Relief': '*_lrm.jpg',
'Pos. Openness': '*_positive_openness.jpg',
'Neg. Openness': '*_negative_openness.jpg'
}
images = {}
for name, pattern in patterns.items():
files = list(self.vis_dir.glob(pattern))
if files:
images[name] = str(files[0])
if len(images) < 2:
return None
# 3x3 grid for 9 visualizations
fig, axes = plt.subplots(3, 3, figsize=(24, 18), facecolor='white')
axes = axes.flatten()
for idx, (name, img_path) in enumerate(images.items()):
if idx >= 9:
break
img = plt.imread(img_path)
axes[idx].imshow(img)
axes[idx].set_title(name, fontsize=11, fontweight='bold')
axes[idx].axis('off')
# Hide unused subplots
for idx in range(len(images), 9):
axes[idx].axis('off')
# Title
fig.suptitle(
f"Analyse Archéologique LiDAR - {basename}",
fontsize=18,
fontweight='bold',
y=0.98
)
plt.tight_layout()
output = self.report_dir / f"{basename}_overview.jpg"
plt.savefig(output, dpi=120, bbox_inches='tight', facecolor='white', format='jpg')
plt.close()
return output
# ============ Complete Pipeline ============
def run_semantic_classification(self, dtm_file, basename):
"""Run semantic classification on DTM"""
print(f"\n[4/4] Classification Sémantique Automatique...")
try:
# Import semantic classifier
from semantic_classifier import ArchaeoSemanticClassifier
# Create output subdirectory for semantic results
semantic_dir = self.output_dir / "semantic"
semantic_dir.mkdir(exist_ok=True)
# Run classification
classifier = ArchaeoSemanticClassifier(dtm_file, semantic_dir)
results = classifier.process(basename)
print(f" ✓ Classification sémantique terminée")
print(f" → Carte: {results['tif'].name}")
print(f" → Visualisation: {results['jpg'].name}")
stats_file = Path(results['tif']).parent / f"{Path(results['tif']).stem.replace('_semantic', '')}_statistics.json"
print(f" → Statistiques: {stats_file.name}")
return results
except ImportError as e:
print(f" ✗ Module non disponible: {e}")
return None
except Exception as e:
print(f" ✗ Erreur classification: {e}")
import traceback
traceback.print_exc()
return None
def process_file(self, laz_file):
"""Process a single LAZ file"""
basename = laz_file.stem
@ -831,42 +821,23 @@ class LidarArchaeoPipeline:
print(f"{'='*60}")
# Step 1: Ground classification
print(f"\n[1/4] Classification du sol...")
print(f"\n[1/3] Classification du sol...")
las_file = self.classify_ground(laz_file)
if not las_file:
print(f" ✗ Échec classification")
return False
# Step 2: Generate DTM
print(f"\n[2/4] Génération DTM...")
print(f"\n[2/3] Génération DTM...")
dtm_file = self.create_dtm_fast(las_file, basename)
if not dtm_file:
print(f" ✗ Échec DTM")
return False
# Step 3: Visualizations
print(f"\n[3/4] Visualisations archéologiques...")
print(f"\n[3/3] Visualisations archéologiques...")
vis = self.generate_all_visualizations(dtm_file, basename)
# Overview
overview = self.create_overview(basename)
if overview:
print(f"\n ✓ Vue synthétique: {overview}")
# Step 4: Semantic Classification (NEW!)
semantic_results = self.run_semantic_classification(dtm_file, basename)
print(f"\n{basename} traité avec succès !")
return True
# Overview
overview = self.create_overview(basename)
if overview:
print(f"\n ✓ Vue synthétique: {overview}")
# Step 4: Semantic Classification (NEW!)
semantic_results = self.run_semantic_classification(dtm_file, basename)
print(f"\n{basename} traité avec succès !")
return True
@ -908,7 +879,6 @@ class LidarArchaeoPipeline:
print(f"\nRésultats dans: {self.output_dir}")
print(f" • DTM: {self.dtm_dir}")
print(f" • Visualisations JPEG: {self.vis_dir}")
print(f" • Rapports: {self.report_dir}")
# Clean up temporary files to save space
print(f"\nNettoyage des fichiers temporaires...")

View File

@ -1,293 +0,0 @@
#!/usr/bin/env python3
"""
Module de Classification Sémantique Simplifié pour LiDAR Archéologique
Approche robuste avec K-Means pour classification automatique
"""
import numpy as np
import rasterio
from rasterio.transform import from_bounds
from sklearn.cluster import KMeans
from scipy import ndimage
import json
from pathlib import Path
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
class ArchaeoSemanticClassifier:
"""Classification sémantique automatique robuste"""
def __init__(self, dtm_file, output_dir):
self.dtm_file = Path(dtm_file)
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
# Load DTM
with rasterio.open(self.dtm_file) as src:
self.dem = src.read(1)
self.transform = src.transform
self.crs = src.crs
self.height, self.width = self.dem.shape
print(f"✓ Classifieur initialisé (DTM: {self.width}x{self.height})")
def extract_features_simple(self):
"""Extraction simplifiée des caractéristiques"""
print(" → Extraction caractéristiques...")
# Calculate gradients
dy, dx = np.gradient(self.dem)
# Slope
slope = np.arctan(np.sqrt(dx**2 + dy**2)) * 180 / np.pi
# Aspect
aspect = np.arctan2(-dy, dx) * 180 / np.pi
aspect = np.mod(aspect, 360)
# Curvature
dz_dx = np.gradient(dx, axis=1)
dz_dy = np.gradient(dy, axis=0)
curvature = (dz_dx + dz_dy) / 2
# Local Relief
from scipy.ndimage import uniform_filter
local_mean = uniform_filter(self.dem, size=int(15/0.5))
local_relief = self.dem - local_mean
return {
'elevation': self.dem,
'slope': slope,
'aspect': aspect,
'curvature': curvature,
'local_relief': local_relief
}
def classify_kmeans(self, n_clusters=6):
"""Classification K-Means robuste"""
print(" → Classification K-Means...")
features = self.extract_features_simple()
# Normalize each feature to 0-1
normalized_features = {}
for name, data in features.items():
min_val = np.percentile(data, 2)
max_val = np.percentile(data, 98)
normalized = np.clip((data - min_val) / (max_val - min_val + 1e-6), 0, 1)
normalized_features[name] = normalized
# Stack features for clustering
feature_stack = np.stack([
normalized_features['elevation'].flatten(),
normalized_features['slope'].flatten(),
normalized_features['curvature'].flatten(),
normalized_features['local_relief'].flatten()
], axis=1)
# Remove NaN values
valid_mask = ~np.isnan(feature_stack).any(axis=1)
feature_stack = feature_stack[valid_mask]
# K-Means clustering with random_state for reproducibility
kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10, max_iter=300)
labels_flat = kmeans.fit_predict(feature_stack)
# Create full resolution labels
full_labels = np.zeros(self.height * self.width, dtype=int)
full_indices = np.where(valid_mask)[0]
full_labels[full_indices] = labels_flat + 1 # +1 to shift from 0-based
full_labels = full_labels.reshape(self.height, self.width)
# Interpret clusters
self._interpret_clusters(kmeans.cluster_centers_, features)
return full_labels
def _interpret_clusters(self, centers, features):
"""Interprète les clusters selon les centroïdes"""
interpretations = {}
# Centers are in normalized feature space
# Features order: elevation, slope, curvature, local_relief
for i, center in enumerate(centers):
elev = center[0]
slope = center[1]
curve = center[2]
relief = center[3]
# Interpret based on feature values
if relief > 0.7:
category = "ÉLEVÉE (Tumulus possible)"
elif relief < -0.3:
category = "ENFONCÉ (Fossé, cavité)"
elif abs(curve) > 0.5:
if curve > 0:
category = "CONVEX (Bosse, monticule)"
else:
category = "CONCAVE (Creux, dépression)"
elif slope > 0.6:
category = "PENTE FORTE (Talus, mur)"
elif slope < 0.2 and elev < 0.3:
category = "PLAT (Zone plane)"
else:
category = "TOPOGRAPHIE MIXTE"
interpretations[i + 1] = category # +1 because labels are 1-indexed
print(f" Interprétation des {len(centers)} clusters :")
for i, interp in interpretations.items():
print(f" Classe {i}: {interp}")
def generate_semantic_map(self, labels):
"""Génère une carte sémantique colorée"""
print(" → Génération carte sémantique...")
# Create color map for semantic classes
# 0: Background, 1-6: Different semantic classes
colors = {
0: [200, 200, 200], # Gray: Background/Unknown
1: [139, 69, 19], # Brown: Linear/Walls
2: [128, 0, 128], # Purple: Circular/Mounds
3: [255, 140, 0], # Orange: Elevated
4: [0, 200, 255], # Cyan: Depressed
5: [220, 220, 0], # Yellow: Slope
6: [0, 128, 0] # Green: Vegetation/Natural
}
# Create RGB image
rgb = np.zeros((self.height, self.width, 3), dtype=np.uint8)
for class_id, color in colors.items():
mask = labels == class_id
for c in range(3):
rgb[:, :, c][mask] = color[c]
return rgb
def process(self, basename):
"""Pipeline complet de classification sémantique"""
print(f"\n{'='*60}")
print(f" CLASSIFICATION SÉMANTIQUE - {basename}")
print(f"{'='*60}")
# Run K-Means classification
labels = self.classify_kmeans(n_clusters=6)
# Generate semantic map
rgb = self.generate_semantic_map(labels)
# Save semantic classification
output_tif = self.output_dir / f"{basename}_semantic.tif"
with rasterio.open(
output_tif,
'w',
driver='GTiff',
height=self.height,
width=self.width,
count=1,
dtype='uint8',
crs=self.crs,
transform=self.transform,
compress='lzw'
) as dst:
dst.write(labels, 1)
# Save visualization
output_jpg = self.output_dir / f"{basename}_semantic.jpg"
plt.figure(figsize=(16, 12), facecolor='white')
plt.imshow(rgb)
# Create legend
legend_elements = [
plt.Rectangle((0, 0), 1, 1, facecolor=np.array(c)/255, edgecolor='black', label=label)
for label, c in [
("Inconnu/Fond", [200, 200, 200]),
("Linéaire (murs)", [139, 69, 19]),
("Circulaire (tumulus)", [128, 0, 128]),
("Élevé (monticules)", [255, 140, 0]),
("Enfoncé (fossés)", [0, 200, 255]),
("Pente forte (talus)", [220, 220, 0]),
("Naturel", [0, 128, 0])
]
]
plt.legend(handles=legend_elements, loc='upper right', fontsize=11)
plt.title(f"Classification Sémantique LiDAR - {basename}\n",
fontsize=14, fontweight='bold', pad=15)
plt.axis('off')
plt.tight_layout()
plt.savefig(output_jpg, dpi=150, bbox_inches='tight', format='jpg')
plt.close()
# Generate statistics
stats = self._generate_stats(labels, basename)
print(f"\n✓ Classification terminée !")
print(f" • Carte sémantique: {output_tif.name}")
print(f" • Visualisation: {output_jpg.name}")
print(f" • Statistiques: {self.output_dir / f'{basename}_statistics.json'}")
return {
'labels': labels,
'tif': output_tif,
'jpg': output_jpg,
'stats': stats
}
def _generate_stats(self, labels, basename):
"""Génère les statistiques de classification"""
print(" → Génération statistiques...")
total_pixels = labels.size
stats = {}
class_names = {
0: "Inconnu/Fond",
1: "Linéaire",
2: "Circulaire",
3: "Élevée",
4: "Enfoncée",
5: "Pente forte",
6: "Naturel"
}
for class_id in range(7):
count = np.sum(labels == class_id)
percentage = (count / total_pixels) * 100
if count > 0:
stats[class_id] = {
'name': class_names[class_id],
'count': int(count),
'percentage': float(percentage)
}
# Save as JSON
stats_file = self.output_dir / f"{basename}_statistics.json"
with open(stats_file, 'w') as f:
json.dump(stats, f, indent=2, default=lambda x: float(x) if isinstance(x, (np.floating, np.integer)) else x)
# Print summary
print(f"\n 📊 Statistiques :")
for class_id, info in stats.items():
print(f" {info['name']}: {info['count']:.0f} px ({info['percentage']:.1f}%)")
return stats
def main():
import sys
if len(sys.argv) < 2:
print("Usage: python semantic_classifier.py <dtm_file.tif> [output_dir]")
sys.exit(1)
dtm_file = sys.argv[1]
output_dir = sys.argv[2] if len(sys.argv) > 2 else "semantic_output"
classifier = ArchaeoSemanticClassifier(dtm_file, output_dir)
classifier.process(Path(dtm_file).stem)
if __name__ == "__main__":
main()