Day 06 - Perf

Learning Objectives

This module covers advanced performance engineering and reliability practices essential for enterprise geospatial systems operating at scale. By the end of this module, you will understand:

  • Performance measurement and profiling techniques for identifying bottlenecks in geospatial applications
  • Memory optimization strategies for processing large-scale spatial datasets efficiently
  • Concurrency and parallelism patterns optimized for spatial operations and I/O-bound workloads
  • Caching architectures with spatial awareness and geographic distribution strategies
  • Production observability including metrics, logging, tracing, and alerting for geospatial services
  • Reliability engineering patterns including circuit breakers, bulkheads, and graceful degradation
  • Capacity planning and auto-scaling for geospatial workloads with variable spatial distributions
  • Database performance optimization for spatial queries and large geographic datasets

Theoretical Foundation

Performance Characteristics of Geospatial Systems

Spatial Data Access Patterns: Geospatial applications exhibit unique performance characteristics:

  • Spatial locality: Queries often cluster around geographic regions
  • Multi-scale access: The same data is accessed at different zoom levels and resolutions
  • Temporal patterns: Usage follows human activity patterns (diurnal, seasonal)
  • Hotspot phenomena: Popular locations create uneven load distribution

Performance Bottlenecks in Spatial Systems:

  • I/O-bound operations: Database queries, file system access, network tile fetching
  • CPU-bound operations: Coordinate transformations, geometric computations, spatial indexing
  • Memory-bound operations: Large dataset processing, spatial joins, buffer operations
  • Network-bound operations: Tile serving, real-time location updates, distributed queries

Scaling Challenges:

  • Geographic load balancing: Distributing work based on spatial boundaries
  • Cache efficiency: Balancing hit rates with storage costs across geographic regions
  • Data locality: Minimizing network overhead for spatially related data
  • Query optimization: Spatial query plans with varying selectivity ratios

Site Reliability Engineering for Geospatial Services

Availability Requirements:

  • Consumer applications: 99.9% availability (8.77 hours/year downtime)
  • Enterprise systems: 99.95% availability (4.38 hours/year downtime)
  • Safety-critical systems: 99.99% availability (52.6 minutes/year downtime)
  • Global mapping services: 99.999% availability (5.26 minutes/year downtime)
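The downtime figures above follow directly from the availability targets; a few lines of arithmetic reproduce them (using the Julian year of 365.25 days, which matches the numbers quoted):

```python
HOURS_PER_YEAR = 365.25 * 24  # ~8766 hours (Julian year)

def downtime_per_year(availability: float) -> float:
    """Allowed downtime in hours/year for a given availability target."""
    return (1 - availability) * HOURS_PER_YEAR

for target in (0.999, 0.9995, 0.9999, 0.99999):
    hours = downtime_per_year(target)
    print(f"{target:.3%} -> {hours:.2f} h/year ({hours * 60:.1f} min/year)")
```

Each extra "nine" cuts the error budget by a factor of ten, which is why 99.999% systems need fundamentally different operational practices than 99.9% ones.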

Error Budgets and SLOs:

  • Latency SLOs: p95 < 100 ms for interactive mapping, p99 < 500 ms for routing
  • Throughput SLOs: 10,000+ QPS for tile serving, 1,000+ QPS for geocoding
  • Error rate SLOs: < 0.1% for critical-path operations, < 1% for non-critical features
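Checking recorded latencies against such percentile SLOs needs nothing beyond the standard library; this sketch uses the nearest-rank method, and the sample data is made up for illustration:

```python
import math

def percentile(samples, pct):
    """Nearest-rank percentile (pct in 0..100) of a non-empty sample list."""
    ordered = sorted(samples)
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]

latencies_ms = list(range(5, 105, 5))  # 20 hypothetical samples: 5, 10, ..., 100 ms
p95 = percentile(latencies_ms, 95)
p99 = percentile(latencies_ms, 99)
print(f"p95={p95} ms (SLO < 100), p99={p99} ms (SLO < 500)")
```

In production these percentiles would come from a metrics system rather than raw sample lists, but the definition is the same.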

Reliability Patterns:

  • Circuit breakers: Prevent cascade failures between geospatial services
  • Bulkhead isolation: Separate resource pools for different geographic regions
  • Graceful degradation: Fallback to cached or simplified data during outages
  • Chaos engineering: Systematic testing of failure scenarios in spatial systems
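To make the circuit-breaker idea concrete, here is a minimal sketch (class name, thresholds, and behavior are illustrative, not any particular library's API): after `max_failures` consecutive errors the breaker opens and rejects calls immediately until `reset_timeout` seconds have passed, then allows a trial call.

```python
import time

class CircuitBreaker:
    """Fail fast after repeated errors; allow a retry after a cooldown."""

    def __init__(self, max_failures: int = 5, reset_timeout: float = 30.0):
        self.max_failures = max_failures
        self.reset_timeout = reset_timeout
        self.failure_count = 0
        self.opened_at = None  # None means the circuit is closed

    def call(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_timeout:
                raise RuntimeError("circuit open: failing fast")
            self.opened_at = None  # cooldown elapsed: permit a trial call
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            if self.failure_count >= self.max_failures:
                self.opened_at = time.monotonic()
            raise
        self.failure_count = 0  # any success closes the circuit again
        return result
```

Failing fast while the downstream geocoder or tile store is unhealthy is what prevents one slow dependency from exhausting the caller's threads and cascading the outage.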

Performance Measurement and Profiling

Advanced Profiling Techniques

import asyncio
import cProfile
import logging
import pstats
import time
import tracemalloc
from contextlib import contextmanager
from dataclasses import dataclass
from functools import wraps
from typing import Any, Callable, Dict, List

import psutil

logger = logging.getLogger(__name__)

@dataclass
class PerformanceMetrics:
    """Comprehensive performance metrics for geospatial operations."""
    duration_seconds: float
    memory_peak_mb: float
    memory_current_mb: float
    cpu_percent: float
    disk_io_read_mb: float
    disk_io_write_mb: float
    network_bytes_sent: int
    network_bytes_recv: int
    spatial_operations_count: int
    cache_hit_rate: float

class GeospatialProfiler:
    """Advanced profiler for geospatial operations."""

    def __init__(self):
        self.metrics_history = []
        self.active_profiles = {}
        self.process = psutil.Process()

    @contextmanager
    def profile_operation(self, operation_name: str):
        """Profile a geospatial operation with comprehensive metrics."""
        # Start resource monitoring
        start_time = time.time()
        tracemalloc.start()

        # Baseline measurements
        baseline_memory = self.process.memory_info().rss / 1024 / 1024  # MB
        baseline_io = self.process.io_counters()
        baseline_net = psutil.net_io_counters()

        # CPU profiling
        profiler = cProfile.Profile()
        profiler.enable()

        try:
            yield self
        finally:
            # Stop profiling
            profiler.disable()

            # Collect final measurements
            end_time = time.time()
            final_memory = self.process.memory_info().rss / 1024 / 1024
            final_io = self.process.io_counters()
            final_net = psutil.net_io_counters()

            # Memory profiling
            current_mem, peak_mem = tracemalloc.get_traced_memory()
            tracemalloc.stop()

            # Calculate metrics
            metrics = PerformanceMetrics(
                duration_seconds=end_time - start_time,
                memory_peak_mb=peak_mem / 1024 / 1024,
                memory_current_mb=current_mem / 1024 / 1024,
                cpu_percent=self.process.cpu_percent(),
                disk_io_read_mb=(final_io.read_bytes - baseline_io.read_bytes) / 1024 / 1024,
                disk_io_write_mb=(final_io.write_bytes - baseline_io.write_bytes) / 1024 / 1024,
                network_bytes_sent=final_net.bytes_sent - baseline_net.bytes_sent,
                network_bytes_recv=final_net.bytes_recv - baseline_net.bytes_recv,
                spatial_operations_count=getattr(self, '_spatial_ops_count', 0),
                cache_hit_rate=getattr(self, '_cache_hit_rate', 0.0)
            )

            # Store profile results
            self.active_profiles[operation_name] = {
                'metrics': metrics,
                'profiler': profiler,
                'timestamp': end_time
            }

            self.metrics_history.append((operation_name, metrics))

    def get_hotspots(self, operation_name: str, top_n: int = 10) -> List[Dict]:
        """Get performance hotspots for an operation."""
        if operation_name not in self.active_profiles:
            return []

        profiler = self.active_profiles[operation_name]['profiler']
        stats = pstats.Stats(profiler)

        # Sort by cumulative time
        stats.sort_stats('cumulative')

        # Extract top functions (pstats.FunctionProfile fields; Python 3.9+)
        hotspots = []
        for func_name, profile in stats.get_stats_profile().func_profiles.items():
            if len(hotspots) >= top_n:
                break

            hotspots.append({
                'function': func_name,
                'filename': profile.file_name,
                'line_number': profile.line_number,
                'cumulative_time': profile.cumtime,
                'internal_time': profile.tottime,
                'call_count': profile.ncalls
            })

        return hotspots

    def generate_performance_report(self) -> Dict[str, Any]:
        """Generate comprehensive performance report."""
        if not self.metrics_history:
            return {}

        # Aggregate metrics
        total_operations = len(self.metrics_history)
        avg_duration = sum(m.duration_seconds for _, m in self.metrics_history) / total_operations
        max_memory = max(m.memory_peak_mb for _, m in self.metrics_history)
        total_disk_io = sum(m.disk_io_read_mb + m.disk_io_write_mb for _, m in self.metrics_history)

        # Identify bottlenecks
        slow_operations = [
            (name, metrics) for name, metrics in self.metrics_history 
            if metrics.duration_seconds > avg_duration * 2
        ]

        memory_intensive = [
            (name, metrics) for name, metrics in self.metrics_history
            if metrics.memory_peak_mb > max_memory * 0.8
        ]

        return {
            'summary': {
                'total_operations': total_operations,
                'average_duration_seconds': avg_duration,
                'max_memory_mb': max_memory,
                'total_disk_io_mb': total_disk_io,
                'overall_cache_hit_rate': sum(m.cache_hit_rate for _, m in self.metrics_history) / total_operations
            },
            'bottlenecks': {
                'slow_operations': [(name, m.duration_seconds) for name, m in slow_operations],
                'memory_intensive': [(name, m.memory_peak_mb) for name, m in memory_intensive]
            },
            'recommendations': self._generate_recommendations()
        }

    def _generate_recommendations(self) -> List[str]:
        """Generate performance optimization recommendations."""
        recommendations = []

        if not self.metrics_history:
            return recommendations

        avg_memory = sum(m.memory_peak_mb for _, m in self.metrics_history) / len(self.metrics_history)
        avg_cache_hit = sum(m.cache_hit_rate for _, m in self.metrics_history) / len(self.metrics_history)

        if avg_memory > 1000:  # > 1GB average
            recommendations.append("Consider implementing streaming/chunked processing for large datasets")

        if avg_cache_hit < 0.7:  # < 70% cache hit rate
            recommendations.append("Optimize caching strategy - consider larger cache size or better eviction policies")

        high_io_ops = [m for _, m in self.metrics_history if m.disk_io_read_mb + m.disk_io_write_mb > 100]
        if len(high_io_ops) > len(self.metrics_history) * 0.3:
            recommendations.append("High disk I/O detected - consider SSD storage or better I/O optimization")

        return recommendations

def performance_monitor(operation_type: str = "spatial_operation"):
    """Decorator for monitoring performance of spatial operations."""
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            profiler = GeospatialProfiler()
            with profiler.profile_operation(f"{operation_type}_{func.__name__}"):
                result = await func(*args, **kwargs)

            # Metrics are recorded when the context manager exits,
            # so they must be read after the with block
            metrics = profiler.metrics_history[-1][1]
            logger.info(f"Performance metrics for {func.__name__}", extra={
                'duration_seconds': metrics.duration_seconds,
                'memory_peak_mb': metrics.memory_peak_mb,
                'operation_type': operation_type
            })
            return result

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            profiler = GeospatialProfiler()
            with profiler.profile_operation(f"{operation_type}_{func.__name__}"):
                result = func(*args, **kwargs)

            metrics = profiler.metrics_history[-1][1]
            logger.info(f"Performance metrics for {func.__name__}", extra={
                'duration_seconds': metrics.duration_seconds,
                'memory_peak_mb': metrics.memory_peak_mb,
                'operation_type': operation_type
            })
            return result

        # Return appropriate wrapper based on function type
        return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper

    return decorator
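The sync/async dispatch at the end of the decorator is worth seeing in isolation. This stripped-down, stdlib-only sketch (a simple timer stands in for the full profiler) shows the same pattern: build both wrappers, then return the one matching the decorated function's type:

```python
import asyncio
import time
from functools import wraps

def timed(func):
    """Sketch of the dual sync/async wrapper pattern used above."""
    @wraps(func)
    async def async_wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = await func(*args, **kwargs)
        print(f"{func.__name__} took {time.perf_counter() - start:.4f}s")
        return result

    @wraps(func)
    def sync_wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        print(f"{func.__name__} took {time.perf_counter() - start:.4f}s")
        return result

    # Pick the wrapper that matches the decorated function's type
    return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper

@timed
def buffer_sync(x):
    return x * 2

@timed
async def buffer_async(x):
    await asyncio.sleep(0)
    return x * 2
```

A single wrapper cannot serve both cases: awaiting a plain function fails, and calling a coroutine function without awaiting it never runs the body.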

Memory Optimization Strategies

import gc
import gzip
import logging
import pickle
import weakref
from collections import OrderedDict
from contextlib import contextmanager
from typing import Any, Dict, Iterator, List, Optional

import rtree.index
from shapely.geometry.base import BaseGeometry as Geometry  # Shapely's abstract geometry base

logger = logging.getLogger(__name__)

class SpatialMemoryManager:
    """Advanced memory management for geospatial operations."""

    def __init__(self, max_memory_gb: float = 4.0):
        self.max_memory_bytes = max_memory_gb * 1024 * 1024 * 1024
        self.geometry_cache = SpatialLRUCache(max_size_mb=1000)
        self.compression_ratio = 0.3  # Estimated compression ratio
        self.weak_refs = weakref.WeakValueDictionary()

    @contextmanager
    def memory_limit_context(self, operation_name: str):
        """Context manager for memory-limited operations."""
        initial_memory = self._get_memory_usage()

        try:
            yield self
        finally:
            current_memory = self._get_memory_usage()
            memory_delta = current_memory - initial_memory

            if memory_delta > self.max_memory_bytes * 0.8:
                logger.warning(f"High memory usage in {operation_name}: {memory_delta / 1024 / 1024:.1f} MB")
                self._aggressive_cleanup()

    def optimize_geometry_collection(self, geometries: List[Geometry]) -> 'OptimizedGeometryCollection':
        """Optimize a collection of geometries for memory efficiency."""
        # Analyze geometry characteristics; only point/line types expose .coords
        # (accessing .coords on a Polygon raises in Shapely, so guard by type)
        total_coords = sum(
            len(geom.coords) if geom.geom_type in ('Point', 'LineString', 'LinearRing') else 0
            for geom in geometries
        )
        avg_complexity = total_coords / len(geometries) if geometries else 0

        if avg_complexity > 1000:  # High complexity geometries
            return self._create_compressed_collection(geometries)
        elif len(geometries) > 10000:  # Many simple geometries
            return self._create_indexed_collection(geometries)
        else:
            return self._create_standard_collection(geometries)

    def _create_compressed_collection(self, geometries: List[Geometry]) -> 'CompressedGeometryCollection':
        """Create memory-efficient compressed geometry collection."""
        compressed_data = []

        for geom in geometries:
            # Serialize and compress geometry
            wkb_data = geom.wkb
            compressed = gzip.compress(wkb_data)
            compressed_data.append(compressed)

        return CompressedGeometryCollection(compressed_data, self.compression_ratio)

    def _create_indexed_collection(self, geometries: List[Geometry]) -> 'IndexedGeometryCollection':
        """Create spatially-indexed geometry collection."""
        # Build spatial index with minimal memory footprint
        spatial_index = rtree.index.Index()
        geometry_store = {}

        for i, geom in enumerate(geometries):
            # Store only bounding box in index
            spatial_index.insert(i, geom.bounds)

            # Store geometry with weak reference
            geometry_store[i] = geom
            self.weak_refs[f"geom_{i}"] = geom

        return IndexedGeometryCollection(spatial_index, geometry_store)

    def stream_large_dataset(self, dataset_path: str, 
                           chunk_size: int = 10000) -> Iterator[List[Geometry]]:
        """Stream large datasets with memory management."""
        with open(dataset_path, 'rb') as f:
            while True:
                with self.memory_limit_context(f"stream_chunk"):
                    chunk = self._read_chunk(f, chunk_size)
                    if not chunk:
                        break

                    # Process chunk
                    processed_chunk = self._process_geometry_chunk(chunk)
                    yield processed_chunk

                    # Explicit cleanup
                    del chunk, processed_chunk
                    gc.collect()

    def _aggressive_cleanup(self):
        """Perform aggressive memory cleanup."""
        # Clear caches
        self.geometry_cache.clear()

        # Force garbage collection
        gc.collect()

        # Clear weak references
        self.weak_refs.clear()

        # Log memory recovery
        current_memory = self._get_memory_usage()
        logger.info(f"Memory cleanup completed. Current usage: {current_memory / 1024 / 1024:.1f} MB")

class SpatialLRUCache:
    """LRU cache optimized for spatial data."""

    def __init__(self, max_size_mb: int = 1000):
        self.max_size_bytes = max_size_mb * 1024 * 1024
        self.current_size = 0
        self.cache = OrderedDict()
        self.size_tracker = {}

    def get(self, key: str) -> Optional[Any]:
        """Get item from cache and move to end (most recently used)."""
        if key in self.cache:
            # Move to end
            value = self.cache.pop(key)
            self.cache[key] = value
            return value
        return None

    def put(self, key: str, value: Any, estimated_size: int = None):
        """Put item in cache with size tracking."""
        if estimated_size is None:
            estimated_size = self._estimate_size(value)

        # Remove existing item if present
        if key in self.cache:
            self.current_size -= self.size_tracker[key]
            del self.cache[key]
            del self.size_tracker[key]

        # Evict items if necessary
        while self.current_size + estimated_size > self.max_size_bytes and self.cache:
            self._evict_lru()

        # Add new item
        self.cache[key] = value
        self.size_tracker[key] = estimated_size
        self.current_size += estimated_size

    def _evict_lru(self):
        """Evict least recently used item."""
        if self.cache:
            key, _ = self.cache.popitem(last=False)  # pop the oldest entry (least recently used)
            size = self.size_tracker.pop(key)
            self.current_size -= size

    def _estimate_size(self, value: Any) -> int:
        """Estimate memory size of cached value."""
        try:
            return len(pickle.dumps(value))
        except Exception:
            # Fallback estimation
            if isinstance(value, (str, bytes)):
                return len(value)
            elif isinstance(value, (list, tuple)):
                return sum(self._estimate_size(item) for item in value[:10])  # Sample first 10
            else:
                return 1024  # Conservative estimate
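The size-bounded eviction loop in `put` is easiest to verify with a toy byte-budgeted version (a reimplementation for illustration, not the class above): touching an entry via `get` protects it, and inserts over budget evict from the cold end.

```python
from collections import OrderedDict

class ByteLRU:
    """Toy byte-budgeted LRU mirroring SpatialLRUCache's eviction loop."""

    def __init__(self, max_bytes: int):
        self.max_bytes = max_bytes
        self.current = 0
        self.cache = OrderedDict()  # key -> (value, size); order encodes recency

    def get(self, key):
        if key not in self.cache:
            return None
        self.cache.move_to_end(key)  # mark as most recently used
        return self.cache[key][0]

    def put(self, key, value, size):
        if key in self.cache:  # replacing: reclaim the old entry's budget
            self.current -= self.cache.pop(key)[1]
        while self.cache and self.current + size > self.max_bytes:
            _, (_, old_size) = self.cache.popitem(last=False)  # evict oldest
            self.current -= old_size
        self.cache[key] = (value, size)
        self.current += size

cache = ByteLRU(max_bytes=100)
cache.put("tile/1", b"a" * 40, 40)
cache.put("tile/2", b"b" * 40, 40)
cache.get("tile/1")                 # touch tile/1; tile/2 is now the LRU entry
cache.put("tile/3", b"c" * 40, 40)  # over budget -> tile/2 is evicted
```

For tile caches this recency behavior matters: a user panning back and forth keeps the active viewport's tiles hot while stale regions fall out first.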

Concurrency and Parallelism Optimization

import asyncio
import logging
import multiprocessing as mp
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from functools import partial
from typing import Any, AsyncIterator, Callable, List

from shapely.geometry.base import BaseGeometry as Geometry  # Shapely's abstract geometry base

logger = logging.getLogger(__name__)

class SpatialConcurrencyManager:
    """Optimized concurrency management for spatial operations."""

    def __init__(self):
        self.cpu_count = mp.cpu_count()
        self.io_thread_pool = ThreadPoolExecutor(max_workers=min(32, self.cpu_count * 4))
        self.cpu_process_pool = ProcessPoolExecutor(max_workers=self.cpu_count)
        self.semaphores = {
            'disk_io': asyncio.Semaphore(4),  # Limit concurrent disk operations
            'network_io': asyncio.Semaphore(20),  # Network operations
            'cpu_intensive': asyncio.Semaphore(self.cpu_count),  # CPU-bound operations
            'memory_intensive': asyncio.Semaphore(2)  # Memory-intensive operations
        }

    async def parallel_spatial_operation(self, 
                                       geometries: List[Geometry], 
                                       operation: Callable,
                                       chunk_size: int = None,
                                       operation_type: str = 'cpu_intensive') -> List[Any]:
        """Execute spatial operations in parallel with optimal batching."""

        if chunk_size is None:
            chunk_size = max(1, len(geometries) // (self.cpu_count * 2))

        # Split geometries into chunks
        chunks = [geometries[i:i + chunk_size] for i in range(0, len(geometries), chunk_size)]

        # Choose execution strategy based on operation type
        if operation_type == 'cpu_intensive':
            return await self._cpu_parallel_execution(chunks, operation)
        elif operation_type == 'io_bound':
            return await self._io_parallel_execution(chunks, operation)
        else:
            return await self._mixed_parallel_execution(chunks, operation)

    async def _cpu_parallel_execution(self, chunks: List[List[Geometry]], 
                                    operation: Callable) -> List[Any]:
        """Execute CPU-intensive operations using process pool."""
        loop = asyncio.get_event_loop()

        # Prepare serializable operation
        if hasattr(operation, '__self__'):
            # Handle bound methods by converting to function
            operation = partial(operation.__func__, operation.__self__)

        async def run_chunk(chunk: List[Geometry]) -> List[Any]:
            # Hold the semaphore for the whole executor call so the concurrency
            # limit bounds running work, not merely task creation
            async with self.semaphores['cpu_intensive']:
                return await loop.run_in_executor(
                    self.cpu_process_pool,
                    self._process_geometry_chunk,
                    chunk, operation
                )

        tasks = [asyncio.create_task(run_chunk(chunk)) for chunk in chunks]

        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Flatten results and handle exceptions
        flattened_results = []
        for result in results:
            if isinstance(result, Exception):
                logger.error(f"CPU operation failed: {result}")
                continue
            flattened_results.extend(result)

        return flattened_results

    async def _io_parallel_execution(self, chunks: List[List[Geometry]], 
                                   operation: Callable) -> List[Any]:
        """Execute I/O-bound operations using thread pool."""
        loop = asyncio.get_event_loop()

        async def run_chunk(chunk: List[Geometry]) -> List[Any]:
            # Hold the semaphore across the executor call so I/O concurrency
            # is actually bounded, not released at task creation
            async with self.semaphores['network_io']:
                return await loop.run_in_executor(
                    self.io_thread_pool,
                    self._process_geometry_chunk,
                    chunk, operation
                )

        tasks = [asyncio.create_task(run_chunk(chunk)) for chunk in chunks]

        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Flatten and filter results
        flattened_results = []
        for result in results:
            if isinstance(result, Exception):
                logger.error(f"I/O operation failed: {result}")
                continue
            flattened_results.extend(result)

        return flattened_results

    def _process_geometry_chunk(self, chunk: List[Geometry], operation: Callable) -> List[Any]:
        """Process a chunk of geometries with the given operation."""
        try:
            return [operation(geom) for geom in chunk]
        except Exception as e:
            logger.error(f"Chunk processing failed: {e}")
            return []

    async def adaptive_batch_processing(self, 
                                      data_stream: AsyncIterator[Geometry],
                                      processor: Callable,
                                      target_latency_ms: float = 100) -> AsyncIterator[Any]:
        """Adaptive batch processing that adjusts batch size based on performance."""

        batch_size = 10  # Initial batch size
        batch = []
        processing_times = []

        async for item in data_stream:
            batch.append(item)

            if len(batch) >= batch_size:
                # Process batch and measure time
                start_time = time.time()

                # parallel_spatial_operation already throttles via the same
                # semaphore; re-acquiring it here could deadlock, and the
                # chunk size must stay >= 1 even when batch_size is 1
                results = await self.parallel_spatial_operation(
                    batch, processor, chunk_size=max(1, batch_size // 2)
                )

                processing_time = (time.time() - start_time) * 1000  # ms
                processing_times.append(processing_time)

                # Yield results
                for result in results:
                    yield result

                # Adapt batch size based on performance
                batch_size = self._adapt_batch_size(
                    batch_size, processing_time, target_latency_ms, processing_times
                )

                batch = []

        # Process remaining items
        if batch:
            results = await self.parallel_spatial_operation(batch, processor)
            for result in results:
                yield result

    def _adapt_batch_size(self, current_size: int, last_time: float, 
                         target_time: float, history: List[float]) -> int:
        """Adapt batch size based on performance metrics."""
        if len(history) < 3:
            return current_size

        avg_time = sum(history[-3:]) / 3

        if avg_time > target_time * 1.2:  # Too slow
            return max(1, int(current_size * 0.8))
        elif avg_time < target_time * 0.5:  # Too fast, can increase
            return min(1000, int(current_size * 1.2))
        else:
            return current_size  # Good performance, keep current size
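The adaptation rule above is a small pure function, so it can be exercised directly: shrink by 20% when the rolling average of the last three batches overshoots the latency target by 20%, grow by 20% when it uses less than half the budget, otherwise hold steady.

```python
def adapt_batch_size(current, target_ms, history):
    """Standalone version of the _adapt_batch_size logic above."""
    if len(history) < 3:
        return current
    avg = sum(history[-3:]) / 3
    if avg > target_ms * 1.2:    # consistently too slow: shrink by 20%
        return max(1, int(current * 0.8))
    if avg < target_ms * 0.5:    # lots of headroom: grow by 20%
        return min(1000, int(current * 1.2))
    return current               # within the band: keep the current size

print(adapt_batch_size(100, 100, [150, 160, 170]))  # shrinks to 80
print(adapt_batch_size(100, 100, [30, 40, 20]))     # grows to 120
print(adapt_batch_size(100, 100, [90, 100, 110]))   # stays at 100
```

The dead band between 0.5x and 1.2x of the target prevents oscillation: without it, batch size would flip-flop on every measurement.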

class SpatialWorkloadBalancer:
    """Load balancer for spatial workloads across multiple workers."""

    def __init__(self, worker_count: int = None):
        self.worker_count = worker_count or mp.cpu_count()
        self.workers = []
        self.workload_stats = {}

    async def distribute_spatial_workload(self, 
                                        spatial_tasks: List[Dict],
                                        distribution_strategy: str = 'geographic') -> List[Any]:
        """Distribute spatial workload across workers."""

        if distribution_strategy == 'geographic':
            task_groups = self._group_by_geography(spatial_tasks)
        elif distribution_strategy == 'complexity':
            task_groups = self._group_by_complexity(spatial_tasks)
        else:
            task_groups = self._round_robin_distribution(spatial_tasks)

        # Execute task groups in parallel
        tasks = []
        for i, task_group in enumerate(task_groups):
            worker_id = i % self.worker_count
            task = self._execute_on_worker(worker_id, task_group)
            tasks.append(task)

        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Collect and flatten results
        all_results = []
        for result in results:
            if isinstance(result, Exception):
                logger.error(f"Worker task failed: {result}")
                continue
            all_results.extend(result)

        return all_results

    def _group_by_geography(self, tasks: List[Dict]) -> List[List[Dict]]:
        """Group tasks by geographic proximity."""
        # Implementation would use spatial clustering
        # For now, simple bbox-based grouping
        groups = [[] for _ in range(self.worker_count)]

        for i, task in enumerate(tasks):
            if 'bbox' in task:
                # Simple hash-based assignment based on bbox center
                center_x = (task['bbox'][0] + task['bbox'][2]) / 2
                center_y = (task['bbox'][1] + task['bbox'][3]) / 2
                group_id = hash((int(center_x * 1000), int(center_y * 1000))) % self.worker_count
                groups[group_id].append(task)
            else:
                groups[i % self.worker_count].append(task)

        return groups
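The bbox-center assignment in `_group_by_geography` can be checked in isolation. Because Python hashes integers deterministically (no hash randomization for ints), tasks covering the same area always land on the same worker within a process:

```python
def assign_worker(bbox, worker_count):
    """Same bbox-center hashing as _group_by_geography above."""
    center_x = (bbox[0] + bbox[2]) / 2
    center_y = (bbox[1] + bbox[3]) / 2
    # Quantize the center to ~3 decimal places before hashing
    return hash((int(center_x * 1000), int(center_y * 1000))) % worker_count

a = assign_worker((10.0, 50.0, 10.2, 50.2), worker_count=4)
b = assign_worker((10.0, 50.0, 10.2, 50.2), worker_count=4)
```

This pins identical work items to one worker, but note it does not guarantee that *nearby* bboxes co-locate; real geographic grouping would use spatial clustering, as the comment above notes.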

Advanced Caching Strategies

import asyncio
import hashlib
import json
import logging
import math
import time
from typing import Dict, Optional, Tuple

import redis.asyncio as redis  # async client: scan_iter/delete are awaited below
from shapely.geometry import shape

logger = logging.getLogger(__name__)

class SpatialCacheManager:
    """Multi-tier spatial caching with geographic awareness."""

    def __init__(self, redis_client: redis.Redis = None):
        self.redis_client = redis_client or redis.Redis(host='localhost', port=6379, db=0)
        self.local_cache = SpatialLRUCache(max_size_mb=500)
        self.cache_stats = {
            'hits': 0,
            'misses': 0,
            'evictions': 0,
            'memory_pressure_events': 0
        }

    async def get_spatial_data(self, bbox: Tuple[float, float, float, float],
                              zoom_level: int, layer: str) -> Optional[Dict]:
        """Get spatial data with multi-tier caching."""
        cache_key = self._generate_spatial_key(bbox, zoom_level, layer)

        # L1: Check local memory cache
        local_result = self.local_cache.get(cache_key)
        if local_result:
            self.cache_stats['hits'] += 1
            return local_result

        # L2: Check Redis cache
        redis_result = await self._get_from_redis(cache_key)
        if redis_result:
            # Store in local cache for faster future access
            self.local_cache.put(cache_key, redis_result)
            self.cache_stats['hits'] += 1
            return redis_result

        self.cache_stats['misses'] += 1
        return None

    async def set_spatial_data(self, bbox: Tuple[float, float, float, float],
                              zoom_level: int, layer: str, data: Dict,
                              ttl_seconds: int = 3600):
        """Set spatial data in multi-tier cache."""
        cache_key = self._generate_spatial_key(bbox, zoom_level, layer)

        # Optimize data for caching
        optimized_data = self._optimize_for_cache(data, zoom_level)

        # Store in both cache tiers
        self.local_cache.put(cache_key, optimized_data)
        await self._set_in_redis(cache_key, optimized_data, ttl_seconds)

    def _generate_spatial_key(self, bbox: Tuple[float, float, float, float],
                             zoom_level: int, layer: str) -> str:
        """Generate cache key for spatial data."""
        # Normalize bbox to reduce cache fragmentation
        normalized_bbox = self._normalize_bbox(bbox, zoom_level)

        key_data = {
            'bbox': normalized_bbox,
            'zoom': zoom_level,
            'layer': layer
        }

        # Create hash for consistent key generation
        key_string = json.dumps(key_data, sort_keys=True)
        return f"spatial:{hashlib.md5(key_string.encode()).hexdigest()}"

    def _normalize_bbox(self, bbox: Tuple[float, float, float, float],
                       zoom_level: int) -> Tuple[float, float, float, float]:
        """Normalize bbox to grid boundaries for better cache hit rates."""
        # Grid size based on zoom level
        grid_size = 1.0 / (2 ** max(0, zoom_level - 10))

        # Snap to grid
        min_x = math.floor(bbox[0] / grid_size) * grid_size
        min_y = math.floor(bbox[1] / grid_size) * grid_size
        max_x = math.ceil(bbox[2] / grid_size) * grid_size
        max_y = math.ceil(bbox[3] / grid_size) * grid_size

        return (min_x, min_y, max_x, max_y)

    def _optimize_for_cache(self, data: Dict, zoom_level: int) -> Dict:
        """Optimize spatial data for caching based on zoom level."""
        if 'features' not in data:
            return data

        optimized_features = []
        simplification_tolerance = self._get_simplification_tolerance(zoom_level)

        for feature in data['features']:
            if 'geometry' in feature:
                # Simplify geometry based on zoom level
                geom = shape(feature['geometry'])
                simplified = geom.simplify(simplification_tolerance, preserve_topology=True)

                optimized_feature = {
                    'type': 'Feature',
                    'geometry': simplified.__geo_interface__,
                    'properties': self._filter_properties(feature.get('properties', {}), zoom_level)
                }
                optimized_features.append(optimized_feature)

        return {
            'type': 'FeatureCollection',
            'features': optimized_features
        }

    async def invalidate_spatial_region(self, bbox: Tuple[float, float, float, float],
                                       layer: str = None):
        """Invalidate cache for a spatial region."""
        # Cache keys are opaque MD5 hashes, so a glob on the layer name cannot
        # match; scan every spatial key and test each one for intersection
        pattern = "spatial:*"

        # This is a simplified approach - production systems would use
        # spatial indexing of cache keys for efficient invalidation
        keys_to_invalidate = []
        async for key in self.redis_client.scan_iter(pattern):
            if await self._key_intersects_bbox(key, bbox):
                keys_to_invalidate.append(key)

        # Remove from both cache levels
        if keys_to_invalidate:
            await self.redis_client.delete(*keys_to_invalidate)
            for key in keys_to_invalidate:
                if key in self.local_cache.cache:
                    del self.local_cache.cache[key]
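The scan-based invalidation above touches every key in the cache. The "spatial indexing of cache keys" the comment alludes to can be as simple as registering each key under every grid cell its bbox touches, so invalidation only inspects the cells the region covers. A minimal in-memory sketch (class and method names here are illustrative, not part of the module):

```python
import math
from collections import defaultdict
from typing import Iterator, Set, Tuple

Bbox = Tuple[float, float, float, float]

class GridKeyIndex:
    """Sketch of a spatial index over cache keys: each key is filed
    under the grid cells its bbox touches, making region invalidation
    proportional to the cells covered rather than the total key count."""

    def __init__(self, cell_size: float = 1.0):
        self.cell_size = cell_size
        self.cells: defaultdict = defaultdict(set)  # (col, row) -> keys

    def _covered_cells(self, bbox: Bbox) -> Iterator[Tuple[int, int]]:
        # All integer cells intersecting the bbox, inclusive of edges.
        c0 = math.floor(bbox[0] / self.cell_size)
        r0 = math.floor(bbox[1] / self.cell_size)
        c1 = math.floor(bbox[2] / self.cell_size)
        r1 = math.floor(bbox[3] / self.cell_size)
        for col in range(c0, c1 + 1):
            for row in range(r0, r1 + 1):
                yield (col, row)

    def register(self, key: str, bbox: Bbox) -> None:
        for cell in self._covered_cells(bbox):
            self.cells[cell].add(key)

    def keys_in_region(self, bbox: Bbox) -> Set[str]:
        # Conservative: returns every key filed in a touched cell, which
        # is exactly what invalidation wants (over-invalidating is safe).
        hits: Set[str] = set()
        for cell in self._covered_cells(bbox):
            hits |= self.cells[cell]
        return hits
```

A Redis variant would keep one set per cell (e.g. `SADD` on registration) and turn invalidation into an `SUNION` over the covered cells followed by a single `DEL`.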

class SpatialPreloadManager:
    """Intelligent preloading of spatial data based on usage patterns."""

    def __init__(self, cache_manager: SpatialCacheManager):
        self.cache_manager = cache_manager
        self.usage_patterns = {}
        self.preload_queue = asyncio.Queue()
        self.preload_worker_running = False

    async def track_access_pattern(self, bbox: Tuple[float, float, float, float],
                                  zoom_level: int, layer: str):
        """Track spatial data access patterns for intelligent preloading."""
        pattern_key = f"{layer}:{zoom_level}"

        if pattern_key not in self.usage_patterns:
            self.usage_patterns[pattern_key] = {
                'access_count': 0,
                'bboxes': [],
                'last_access': time.time()
            }

        pattern = self.usage_patterns[pattern_key]
        pattern['access_count'] += 1
        pattern['bboxes'].append(bbox)
        pattern['last_access'] = time.time()

        # Keep only recent bboxes
        if len(pattern['bboxes']) > 1000:
            pattern['bboxes'] = pattern['bboxes'][-500:]

        # Trigger preloading for hot patterns
        if pattern['access_count'] % 10 == 0:
            await self._schedule_preload(pattern_key)

    async def _schedule_preload(self, pattern_key: str):
        """Schedule preloading based on access patterns."""
        if not self.preload_worker_running:
            asyncio.create_task(self._preload_worker())
            self.preload_worker_running = True

        pattern = self.usage_patterns[pattern_key]

        # Predict next likely access areas
        predicted_areas = self._predict_access_areas(pattern['bboxes'])

        for bbox, confidence in predicted_areas:
            if confidence > 0.7:  # High confidence predictions
                layer, zoom_str = pattern_key.split(':')
                zoom_level = int(zoom_str)

                await self.preload_queue.put({
                    'bbox': bbox,
                    'zoom_level': zoom_level,
                    'layer': layer,
                    'priority': confidence
                })

    def _predict_access_areas(self, recent_bboxes: List[Tuple]) -> List[Tuple[Tuple, float]]:
        """Predict likely next access areas based on spatial patterns."""
        if len(recent_bboxes) < 5:
            return []

        predictions = []

        # Simple spatial trend analysis
        recent = recent_bboxes[-10:]  # Last 10 accesses

        # Calculate movement vector
        if len(recent) >= 2:
            start_center = self._bbox_center(recent[0])
            end_center = self._bbox_center(recent[-1])

            dx = end_center[0] - start_center[0]
            dy = end_center[1] - start_center[1]

            # Predict next area based on movement
            if abs(dx) > 0.001 or abs(dy) > 0.001:  # Significant movement
                last_bbox = recent[-1]
                predicted_center = (
                    self._bbox_center(last_bbox)[0] + dx,
                    self._bbox_center(last_bbox)[1] + dy
                )

                bbox_size = (
                    last_bbox[2] - last_bbox[0],
                    last_bbox[3] - last_bbox[1]
                )

                predicted_bbox = (
                    predicted_center[0] - bbox_size[0]/2,
                    predicted_center[1] - bbox_size[1]/2,
                    predicted_center[0] + bbox_size[0]/2,
                    predicted_center[1] + bbox_size[1]/2
                )

                predictions.append((predicted_bbox, 0.8))

        return predictions
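The movement-vector step above can be exercised in isolation. This standalone sketch mirrors the extrapolation in `_predict_access_areas` (shift the last bbox by the net displacement of the viewport center over the window; the function name is illustrative):

```python
from typing import List, Optional, Tuple

Bbox = Tuple[float, float, float, float]

def predict_next_bbox(recent: List[Bbox]) -> Optional[Bbox]:
    """Linear extrapolation of a panning session: return the last bbox
    shifted by the movement of its center from the first to the last
    access, or None when there is no meaningful movement."""
    if len(recent) < 2:
        return None
    center = lambda b: ((b[0] + b[2]) / 2, (b[1] + b[3]) / 2)
    (sx, sy), (ex, ey) = center(recent[0]), center(recent[-1])
    dx, dy = ex - sx, ey - sy
    if abs(dx) <= 0.001 and abs(dy) <= 0.001:
        return None  # stationary viewport, nothing to predict
    last = recent[-1]
    return (last[0] + dx, last[1] + dy, last[2] + dx, last[3] + dy)
```

Panning east one degree per request, `predict_next_bbox([(0, 0, 1, 1), (1, 0, 2, 1), (2, 0, 3, 1)])` yields `(4.0, 0.0, 5.0, 1.0)`: the shift is the net displacement over the whole window, so the predictor deliberately leads the viewport, which is the right bias for cache warming.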

    async def _preload_worker(self):
        """Background worker for preloading spatial data."""
        while True:
            try:
                # Wait for preload tasks
                preload_task = await asyncio.wait_for(
                    self.preload_queue.get(), timeout=60.0
                )

                # Skip the fetch if the region is already cached

                existing_data = await self.cache_manager.get_spatial_data(
                    preload_task['bbox'],
                    preload_task['zoom_level'],
                    preload_task['layer']
                )

                if existing_data is None:
                    # Fetch and cache data
                    # This would call your actual data source
                    data = await self._fetch_spatial_data(
                        preload_task['bbox'],
                        preload_task['zoom_level'],
                        preload_task['layer']
                    )

                    if data:
                        await self.cache_manager.set_spatial_data(
                            preload_task['bbox'],
                            preload_task['zoom_level'],
                            preload_task['layer'],
                            data
                        )

            except asyncio.TimeoutError:
                # No preload tasks, worker can exit
                self.preload_worker_running = False
                break
            except Exception as e:
                logger.error(f"Preload worker error: {e}")
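The `_bbox_center` helper used in `_predict_access_areas` is not shown in this excerpt; a minimal version (module-level here for illustration — inside the class it would take `self`):

```python
from typing import Tuple

def bbox_center(bbox: Tuple[float, float, float, float]) -> Tuple[float, float]:
    """Midpoint of a (min_x, min_y, max_x, max_y) bounding box."""
    return ((bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2)
```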

Production Monitoring and Observability

Comprehensive Metrics Collection

import time
from contextlib import contextmanager

from prometheus_client import Counter, Histogram, Gauge
import structlog
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Prometheus metrics for spatial operations
SPATIAL_OPERATIONS_TOTAL = Counter(
    'spatial_operations_total',
    'Total number of spatial operations',
    ['operation_type', 'layer', 'result_status']
)

SPATIAL_QUERY_DURATION = Histogram(
    'spatial_query_duration_seconds',
    'Time spent on spatial queries',
    ['query_type', 'complexity_bucket'],
    buckets=[0.01, 0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0]
)

SPATIAL_CACHE_OPERATIONS = Counter(
    'spatial_cache_operations_total',
    'Cache operations',
    ['cache_type', 'operation', 'result']
)

SPATIAL_MEMORY_USAGE = Gauge(
    'spatial_memory_usage_bytes',
    'Memory usage for spatial operations',
    ['component']
)

SPATIAL_INDEX_SIZE = Gauge(
    'spatial_index_size_total',
    'Number of entries in spatial indices',
    ['index_type']
)
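Defining metrics is only half the job — they also have to be exposed for scraping. A self-contained sketch of the instrument-and-expose pattern used throughout this section (the metric names, payload, and port are illustrative, separate from the module-level metrics above):

```python
from prometheus_client import Counter, Histogram, generate_latest, start_http_server

# Illustrative metrics for a single operation type.
TILE_FETCHES = Counter(
    'demo_tile_fetches_total', 'Tile fetches', ['layer', 'status'])
FETCH_SECONDS = Histogram(
    'demo_tile_fetch_seconds', 'Tile fetch latency', ['layer'])

def fetch_tile(layer: str) -> bytes:
    with FETCH_SECONDS.labels(layer=layer).time():  # observes wall-clock time
        data = b'...tile bytes...'                  # stand-in payload
    TILE_FETCHES.labels(layer=layer, status='success').inc()
    return data

if __name__ == '__main__':
    start_http_server(9108)  # Prometheus scrapes http://host:9108/metrics
    fetch_tile('roads')
    print(generate_latest().decode())  # the text exposition format
```

`generate_latest()` renders the default registry in the exposition format, which is also what the HTTP server started by `start_http_server` serves.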

class SpatialObservabilityManager:
    """Comprehensive observability for spatial systems."""

    def __init__(self):
        self.logger = structlog.get_logger()
        self.tracer = trace.get_tracer(__name__)
        self.active_spans = {}

        # Setup distributed tracing
        trace.set_tracer_provider(TracerProvider())
        jaeger_exporter = JaegerExporter(
            agent_host_name="localhost",
            agent_port=6831,  # Jaeger agent Thrift/UDP port (14268 is the collector's HTTP port)
        )
        span_processor = BatchSpanProcessor(jaeger_exporter)
        trace.get_tracer_provider().add_span_processor(span_processor)

    @contextmanager
    def trace_spatial_operation(self, operation_name: str, 
                               bbox: Tuple = None, layer: str = None):
        """Trace spatial operations with distributed tracing."""
        with self.tracer.start_as_current_span(operation_name) as span:
            # Add spatial context to span
            if bbox:
                span.set_attribute("spatial.bbox.min_x", bbox[0])
                span.set_attribute("spatial.bbox.min_y", bbox[1])
                span.set_attribute("spatial.bbox.max_x", bbox[2])
                span.set_attribute("spatial.bbox.max_y", bbox[3])
                span.set_attribute("spatial.bbox.area", 
                                 (bbox[2] - bbox[0]) * (bbox[3] - bbox[1]))

            if layer:
                span.set_attribute("spatial.layer", layer)

            # Record operation start
            start_time = time.time()
            SPATIAL_OPERATIONS_TOTAL.labels(
                operation_type=operation_name,
                layer=layer or 'unknown',
                result_status='started'
            ).inc()

            try:
                yield span

                # Record successful completion
                SPATIAL_OPERATIONS_TOTAL.labels(
                    operation_type=operation_name,
                    layer=layer or 'unknown',
                    result_status='success'
                ).inc()

            except Exception as e:
                # Record error
                span.record_exception(e)
                span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))

                SPATIAL_OPERATIONS_TOTAL.labels(
                    operation_type=operation_name,
                    layer=layer or 'unknown',
                    result_status='error'
                ).inc()

                raise
            finally:
                # Record timing
                duration = time.time() - start_time
                complexity = self._classify_complexity(bbox, operation_name)

                SPATIAL_QUERY_DURATION.labels(
                    query_type=operation_name,
                    complexity_bucket=complexity
                ).observe(duration)

    def log_spatial_event(self, event_type: str, **context):
        """Log spatial events with structured logging."""
        self.logger.info(
            event_type,
            **context,
            timestamp=time.time()
        )

    def record_cache_operation(self, cache_type: str, operation: str, 
                             result: str, response_time: float = None):
        """Record cache operation metrics."""
        SPATIAL_CACHE_OPERATIONS.labels(
            cache_type=cache_type,
            operation=operation,
            result=result
        ).inc()

        if response_time is not None:
            SPATIAL_QUERY_DURATION.labels(
                query_type=f"cache_{operation}",
                complexity_bucket="simple"
            ).observe(response_time)

    def update_memory_metrics(self, component: str, memory_bytes: int):
        """Update memory usage metrics."""
        SPATIAL_MEMORY_USAGE.labels(component=component).set(memory_bytes)

    def update_index_metrics(self, index_type: str, entry_count: int):
        """Update spatial index metrics."""
        SPATIAL_INDEX_SIZE.labels(index_type=index_type).set(entry_count)

    def _classify_complexity(self, bbox: Tuple, operation: str) -> str:
        """Classify operation complexity for metrics."""
        if not bbox:
            return "unknown"

        area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])

        if area < 0.001:
            return "simple"
        elif area < 0.1:
            return "medium"
        else:
            return "complex"

class SpatialHealthChecker:
    """Health monitoring for spatial system components."""

    def __init__(self, spatial_service, cache_manager, database_pool):
        self.spatial_service = spatial_service
        self.cache_manager = cache_manager
        self.database_pool = database_pool
        self.health_history = []

    async def check_system_health(self) -> Dict[str, Any]:
        """Comprehensive system health check."""
        health_status = {
            'timestamp': time.time(),
            'overall_status': 'healthy',
            'components': {}
        }

        # Check spatial service health
        try:
            service_health = await self._check_spatial_service()
            health_status['components']['spatial_service'] = service_health
        except Exception as e:
            health_status['components']['spatial_service'] = {
                'status': 'unhealthy',
                'error': str(e)
            }
            health_status['overall_status'] = 'degraded'

        # Check cache health
        try:
            cache_health = await self._check_cache_health()
            health_status['components']['cache'] = cache_health
        except Exception as e:
            health_status['components']['cache'] = {
                'status': 'unhealthy',
                'error': str(e)
            }
            health_status['overall_status'] = 'degraded'

        # Check database health
        try:
            db_health = await self._check_database_health()
            health_status['components']['database'] = db_health
        except Exception as e:
            health_status['components']['database'] = {
                'status': 'unhealthy',
                'error': str(e)
            }
            health_status['overall_status'] = 'unhealthy'

        # Store health history
        self.health_history.append(health_status)
        if len(self.health_history) > 100:
            self.health_history = self.health_history[-50:]

        return health_status

    async def _check_spatial_service(self) -> Dict[str, Any]:
        """Check spatial service health."""
        start_time = time.time()

        # Test basic query
        test_bbox = (-122.5, 37.7, -122.4, 37.8)  # San Francisco area
        results = await self.spatial_service.query_bbox(*test_bbox)

        response_time = time.time() - start_time

        return {
            'status': 'healthy' if response_time < 5.0 else 'slow',
            'response_time_seconds': response_time,
            'result_count': len(results) if results else 0
        }

    async def _check_cache_health(self) -> Dict[str, Any]:
        """Check cache system health."""
        # Round-trip a small payload through the cache
        test_data = {'test': True, 'timestamp': time.time()}

        # Write test
        start_time = time.time()
        await self.cache_manager.set_spatial_data(
            (-1, -1, 1, 1), 10, "health_check", test_data, ttl_seconds=60
        )
        write_time = time.time() - start_time

        # Read test
        start_time = time.time()
        retrieved_data = await self.cache_manager.get_spatial_data(
            (-1, -1, 1, 1), 10, "health_check"
        )
        read_time = time.time() - start_time

        # Calculate cache hit rate
        hit_rate = (
            self.cache_manager.cache_stats['hits'] / 
            max(1, self.cache_manager.cache_stats['hits'] + self.cache_manager.cache_stats['misses'])
        )

        return {
            'status': 'healthy' if write_time < 0.1 and read_time < 0.05 else 'slow',
            'write_time_seconds': write_time,
            'read_time_seconds': read_time,
            'hit_rate': hit_rate,
            'data_consistency': retrieved_data == test_data
        }

    async def _check_database_health(self) -> Dict[str, Any]:
        """Check database health."""
        start_time = time.time()

        async with self.database_pool.acquire() as connection:
            # Test simple query
            await connection.fetch("SELECT 1 AS health_check")

        response_time = time.time() - start_time

        return {
            'status': 'healthy' if response_time < 1.0 else 'slow',
            'response_time_seconds': response_time,
            # Use asyncpg's public pool API rather than the private _holders internals
            'connection_pool_size': self.database_pool.get_size(),
            'active_connections': self.database_pool.get_size() - self.database_pool.get_idle_size()
        }
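Health checks are usually consumed by a load balancer or orchestrator, which only understands HTTP status codes. One common way to wire `check_system_health()` into a `/healthz` endpoint is a small mapping over `overall_status`; the policy below (keep serving while 'degraded', drop only when 'unhealthy') is a design choice for this sketch, not a standard:

```python
def health_to_http_status(health: dict) -> int:
    """Map an overall_status to an HTTP code for a /healthz endpoint.

    'degraded' still returns 200 so the instance keeps receiving traffic;
    only 'unhealthy' (e.g. database down) pulls it from rotation with 503.
    """
    return {'healthy': 200, 'degraded': 200, 'unhealthy': 503}.get(
        health.get('overall_status'), 500)
```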

Professional Development Exercises

Exercise 1: Build a Performance Analysis Dashboard

Create a comprehensive performance monitoring system:

- Real-time metrics visualization for spatial operations
- Performance regression detection and alerting
- Resource utilization tracking and capacity planning
- Automated performance benchmarking and reporting

Exercise 2: Implement Auto-Scaling for Spatial Workloads

Design an auto-scaling system that:

- Monitors spatial query patterns and load distribution
- Scales compute resources based on geographic demand
- Implements predictive scaling for known traffic patterns
- Balances cost optimization with performance requirements

Exercise 3: Create a Multi-Region Caching Strategy

Build a globally distributed caching system:

- Geographic cache distribution and replication
- Intelligent cache warming based on usage patterns
- Cross-region cache invalidation and consistency
- Performance optimization for global user bases

Exercise 4: Develop a Chaos Engineering Framework

Implement chaos engineering for spatial systems:

- Network partition simulation between geographic regions
- Database failover testing with spatial data consistency
- Cache failure scenarios and graceful degradation
- Load spike simulation and recovery testing

Industry Context and Real-World Applications

Production Performance Requirements

Google Maps Performance Standards:

- Tile serving: p99 < 50ms globally
- Search queries: p95 < 200ms
- Route calculation: p99 < 2 seconds
- 99.9% availability with global redundancy

Uber's Spatial Computing Scale:

- 15+ billion location updates daily
- Sub-second H3 spatial indexing
- Real-time driver matching algorithms
- 99.99% availability for safety-critical operations

Tesla's Autonomous Driving Requirements:

- Real-time HD map processing: < 10ms latency
- Sensor fusion with map data: 60+ FPS
- Neural network inference: < 5ms
- Zero tolerance for safety-critical failures

Enterprise Performance Optimization

Financial Services (Trading Systems):

- Geospatial market data: microsecond latency requirements
- Regulatory compliance with audit trails
- Multi-region disaster recovery
- Real-time risk calculations with geographic factors

Logistics and Supply Chain:

- Route optimization: millions of calculations per second
- Real-time package tracking at global scale
- Warehouse automation with spatial optimization
- Predictive analytics for demand forecasting

Smart Cities and IoT:

- Sensor data processing: millions of updates per minute
- Real-time traffic optimization
- Emergency response coordination
- Energy grid optimization with spatial factors

Resources

Performance Engineering

Monitoring and Observability

Database Performance

Industry Performance Practices

This module provides the foundation for building and operating high-performance, reliable geospatial systems that can scale to serve millions of users while maintaining strict performance and availability requirements.