Session 20: Multimodal AI Solutions and Edge Applications

Edge AI and multimodal processing

🎯 Session goals

  • Implementation of multimodal AI systems (text + vision + audio)
  • Azure IoT Edge with AI modules
  • Real-time processing on edge devices
  • Model optimization for edge deployment

🎭 Multimodal AI Applications

Understanding Multimodal AI

MULTIMODAL AI CAPABILITIES:

TEXT + VISION:
- Document analysis with images and text
- Visual question answering
- Image captioning and description
- Chart and diagram interpretation

TEXT + AUDIO:
- Voice assistants with natural conversation
- Audio content analysis with transcription
- Sentiment analysis from voice tone
- Multi-language support

VISION + AUDIO:
- Video content analysis
- Real-time scene understanding
- Activity recognition in video
- Audio-visual synchronization

TEXT + VISION + AUDIO:
- Complete media understanding
- Interactive AI assistants
- Comprehensive content analysis
- Rich user experience interfaces
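
The TEXT + VISION combination above, for example, maps onto a single chat completion request against a vision-capable model. A minimal sketch, assuming an Azure OpenAI resource with a gpt-4o deployment (the endpoint, key, API version and file name below are placeholders):

import base64
from openai import AzureOpenAI  # openai >= 1.x SDK

# Placeholder configuration - replace with your own endpoint, key and deployment name
client = AzureOpenAI(
    azure_endpoint="https://<your-resource>.openai.azure.com/",
    api_key="<your-api-key>",
    api_version="2024-06-01",
)

# Encode a local image so it can be sent inline as a data URL
with open("chart.png", "rb") as f:
    image_b64 = base64.b64encode(f.read()).decode("utf-8")

# Combined text + image request to a vision-capable deployment
response = client.chat.completions.create(
    model="<your-gpt-4o-deployment>",
    messages=[{
        "role": "user",
        "content": [
            {"type": "text", "text": "Describe this chart and summarize its key trend."},
            {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image_b64}"}},
        ],
    }],
)
print(response.choices[0].message.content)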

Multimodal AI Implementation

import cv2
import numpy as np
import asyncio
from concurrent.futures import ThreadPoolExecutor
import queue
import threading
from datetime import datetime
import json

class MultimodalAIProcessor:
    def __init__(self, ai_models_config):
        self.vision_model = self._load_vision_model(ai_models_config["vision"])
        self.audio_model = self._load_audio_model(ai_models_config["audio"])
        self.text_model = self._load_text_model(ai_models_config["text"])
        
        # Processing queues for real-time streams
        self.frame_queue = queue.Queue(maxsize=30)
        self.audio_queue = queue.Queue(maxsize=100)
        self.results_queue = queue.Queue()
        
        # Thread pool for concurrent processing
        self.executor = ThreadPoolExecutor(max_workers=6)
        
        # Processing statistics
        self.stats = {
            "frames_processed": 0,
            "audio_chunks_processed": 0,
            "multimodal_fusions": 0,
            "average_latency": 0,
            "error_count": 0
        }
    
    async def start_realtime_multimodal_processing(self):
        """Start real-time multimodal processing system"""
        
        print("🚀 Starting real-time multimodal processing...")
        
        # Start capture threads
        video_capture_thread = threading.Thread(
            target=self._capture_video_stream,
            daemon=True
        )
        audio_capture_thread = threading.Thread(
            target=self._capture_audio_stream,
            daemon=True
        )
        
        video_capture_thread.start()
        audio_capture_thread.start()
        
        # Start processing workers
        processing_tasks = [
            asyncio.create_task(self._process_video_stream()),
            asyncio.create_task(self._process_audio_stream()),
            asyncio.create_task(self._fuse_multimodal_results()),
            asyncio.create_task(self._handle_processing_results())
        ]
        
        print("✅ All processing threads started")
        
        # Monitor system health
        monitoring_task = asyncio.create_task(self._monitor_system_health())
        
        # Run until stopped
        try:
            await asyncio.gather(*processing_tasks, monitoring_task)
        except KeyboardInterrupt:
            print("🛑 Stopping multimodal processing...")
            
        return {
            "status": "stopped",
            "final_stats": self.stats
        }
    
    async def _process_video_stream(self):
        """Process video frames w real-time"""
        
        print("📹 Video processing thread started")
        
        while True:
            try:
                if not self.frame_queue.empty():
                    frame_data = self.frame_queue.get(timeout=1)
                    
                    # Process frame with vision AI
                    start_time = datetime.utcnow()
                    
                    vision_result = await self._analyze_frame_async(frame_data)
                    
                    processing_time = (datetime.utcnow() - start_time).total_seconds() * 1000
                    
                    # Add metadata and queue the result
                    vision_result.update({
                        "timestamp": start_time.isoformat(),
                        "modality": "vision",
                        "processing_time_ms": processing_time,
                        "frame_id": frame_data["frame_id"]
                    })
                    
                    self.results_queue.put(vision_result)
                    self.stats["frames_processed"] += 1
                    
                await asyncio.sleep(0.1)  # Small delay
                    
            except queue.Empty:
                await asyncio.sleep(0.1)
            except Exception as e:
                self.stats["error_count"] += 1
                print(f"❌ Video processing error: {e}")
                await asyncio.sleep(1)
    
    async def _process_audio_stream(self):
        """Process audio chunks w real-time"""
        
        print("🎙️ Audio processing thread started")
        
        audio_buffer = []
        
        while True:
            try:
                if not self.audio_queue.empty():
                    audio_chunk = self.audio_queue.get(timeout=1)
                    audio_buffer.append(audio_chunk)
                    
                    # Process when buffer reaches optimal size (≈1 second of audio)
                    if len(audio_buffer) >= 16:
                        combined_audio = np.concatenate([chunk["data"] for chunk in audio_buffer])
                        
                        start_time = datetime.utcnow()
                        
                        # Process audio with speech AI
                        audio_result = await self._analyze_audio_async(combined_audio)
                        
                        processing_time = (datetime.utcnow() - start_time).total_seconds() * 1000
                        
                        # Add metadata
                        audio_result.update({
                            "timestamp": start_time.isoformat(),
                            "modality": "audio",
                            "processing_time_ms": processing_time,
                            "chunk_count": len(audio_buffer)
                        })
                        
                        self.results_queue.put(audio_result)
                        self.stats["audio_chunks_processed"] += 1
                        
                        # Clear buffer
                        audio_buffer = []
                
                await asyncio.sleep(0.05)  # Faster audio processing
                        
            except queue.Empty:
                await asyncio.sleep(0.1)
            except Exception as e:
                self.stats["error_count"] += 1
                print(f"❌ Audio processing error: {e}")
                await asyncio.sleep(1)
    
    async def _fuse_multimodal_results(self):
        """Combine results from different modalities"""
        
        print("🔗 Multimodal fusion thread started")
        
        result_buffer = {
            "vision": [],
            "audio": [],
            "text": []
        }
        
        while True:
            try:
                if not self.results_queue.empty():
                    result = self.results_queue.get(timeout=1)
                    modality = result["modality"]
                    
                    # Add to appropriate buffer
                    result_buffer[modality].append(result)
                    
                    # Fuse results when we have recent data from multiple modalities
                    if (len(result_buffer["vision"]) > 0 and 
                        len(result_buffer["audio"]) > 0):
                        
                        # Check timestamp alignment (within 2 seconds)
                        latest_vision = result_buffer["vision"][-1]
                        latest_audio = result_buffer["audio"][-1]
                        
                        vision_time = datetime.fromisoformat(latest_vision["timestamp"].replace('Z', '+00:00'))
                        audio_time = datetime.fromisoformat(latest_audio["timestamp"].replace('Z', '+00:00'))
                        
                        time_diff = abs((vision_time - audio_time).total_seconds())
                        
                        if time_diff <= 2.0:  # Within 2 seconds
                            # Create fused understanding
                            fused_result = await self._create_multimodal_understanding(
                                latest_vision, latest_audio
                            )
                            
                            # Send for decision making
                            await self._handle_fused_result(fused_result)
                            
                            self.stats["multimodal_fusions"] += 1
                            
                            # Keep only recent results in the buffer
                            result_buffer["vision"] = result_buffer["vision"][-5:]
                            result_buffer["audio"] = result_buffer["audio"][-5:]
                
                await asyncio.sleep(0.1)
                        
            except queue.Empty:
                await asyncio.sleep(0.2)
            except Exception as e:
                self.stats["error_count"] += 1
                print(f"❌ Fusion processing error: {e}")
                await asyncio.sleep(1)
    
    async def _create_multimodal_understanding(self, vision_data, audio_data):
        """Create integrated understanding from multiple modalities"""
        
        # Extract key information from each modality
        vision_objects = vision_data.get("detected_objects", [])
        vision_scene = vision_data.get("scene_description", "")
        audio_transcript = audio_data.get("transcript", "")
        audio_sentiment = audio_data.get("sentiment", {})
        
        # Use LLM for cross-modal reasoning
        multimodal_prompt = f"""
Analyze the following simultaneous information from different sources and provide integrated insights:

VISUAL SCENE: {vision_scene}
DETECTED OBJECTS: {', '.join([obj['name'] for obj in vision_objects])}
AUDIO TRANSCRIPT: {audio_transcript}
AUDIO SENTIMENT: {audio_sentiment.get('label', 'neutral')} (confidence: {audio_sentiment.get('confidence', 0):.2f})

Provide integrated analysis:
1. Correlation between the visual and audio content
2. Overall scene understanding
3. Detected activities or events
4. Confidence assessment for the integrated understanding
5. Recommended actions based on the analysis

Format the response as structured JSON.
"""
        
        # Send to the text model for analysis
        integrated_analysis = await self.text_model.generate_response(multimodal_prompt)
        
        return {
            "timestamp": datetime.utcnow().isoformat(),
            "input_modalities": ["vision", "audio"],
            "vision_input": vision_data,
            "audio_input": audio_data,
            "integrated_analysis": integrated_analysis["content"],
            "confidence": self._calculate_multimodal_confidence(vision_data, audio_data),
            "correlation_strength": self._assess_cross_modal_correlation(vision_data, audio_data)
        }
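
The class above is an excerpt - helpers such as _capture_video_stream, _analyze_frame_async, _handle_fused_result, _calculate_multimodal_confidence and _assess_cross_modal_correlation are assumed to be implemented elsewhere. A minimal usage sketch (the model paths and endpoint below are placeholders):

# Hypothetical model configuration - adjust paths/endpoints to your deployment
ai_models_config = {
    "vision": {"model_path": "models/vision.onnx"},
    "audio": {"model_path": "models/speech.onnx"},
    "text": {"endpoint": "https://<your-openai-endpoint>", "deployment": "gpt-4o"}
}

processor = MultimodalAIProcessor(ai_models_config)
asyncio.run(processor.start_realtime_multimodal_processing())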

🖥️ Edge Computing for AI

Azure IoT Edge with AI Modules

import onnxruntime as ort
import json
import asyncio
import time
import numpy as np
from datetime import datetime
from azure.iot.device import Message
from azure.iot.device.aio import IoTHubDeviceClient  # async client: connect()/send_message() are awaited below

class EdgeAIModule:
    def __init__(self, connection_string, model_configs):
        self.device_client = IoTHubDeviceClient.create_from_connection_string(connection_string)
        self.models = {}
        self.processing_queue = asyncio.Queue()
        
        # Load optimized models for the edge
        for model_name, config in model_configs.items():
            self.models[model_name] = ort.InferenceSession(config["model_path"])
            print(f"📱 Loaded edge model: {model_name}")
        
        self.edge_stats = {
            "total_inferences": 0,
            "average_latency": 0,
            "successful_cloud_sends": 0,
            "local_cache_hits": 0
        }
    
    async def start_edge_processing(self):
        """Start edge AI processing loop"""
        
        await self.device_client.connect()
        print("🔗 Connected to IoT Hub")
        
        # Set up message handlers for cloud communication
        self.device_client.on_message_received = self._handle_cloud_message
        
        # Start processing tasks
        processing_tasks = [
            asyncio.create_task(self._capture_sensor_data()),
            asyncio.create_task(self._process_ai_inference()),
            asyncio.create_task(self._manage_cloud_communication()),
            asyncio.create_task(self._monitor_edge_health())
        ]
        
        try:
            await asyncio.gather(*processing_tasks)
        except KeyboardInterrupt:
            print("🛑 Stopping edge processing...")
            await self.device_client.disconnect()
    
    async def _process_ai_inference(self):
        """Process AI inference na edge device"""
        
        while True:
            try:
                # Get data from processing queue
                sensor_data = await self.processing_queue.get()
                
                start_time = time.time()
                
                # Determine the appropriate model based on data type
                if sensor_data["type"] == "image":
                    result = await self._run_vision_inference(sensor_data["data"])
                elif sensor_data["type"] == "audio":
                    result = await self._run_audio_inference(sensor_data["data"])
                elif sensor_data["type"] == "sensor_reading":
                    result = await self._run_sensor_inference(sensor_data["data"])
                else:
                    continue
                
                processing_latency = (time.time() - start_time) * 1000  # ms
                
                # Add edge processing metadata
                edge_result = {
                    "device_id": "edge-device-001",
                    "timestamp": datetime.utcnow().isoformat(),
                    "input_type": sensor_data["type"],
                    "ai_result": result,
                    "edge_latency_ms": processing_latency,
                    "confidence": result.get("confidence", 0),
                    "local_processing": True
                }
                
                # Update statistics
                self.edge_stats["total_inferences"] += 1
                self._update_average_latency(processing_latency)
                
                # Decide whether to send to the cloud
                if self._should_send_to_cloud(edge_result):
                    await self._send_to_cloud(edge_result)
                
                # Local action if needed
                await self._handle_local_action(edge_result)
                
            except Exception as e:
                print(f"❌ Edge inference error: {e}")
                await asyncio.sleep(1)
    
    async def _run_vision_inference(self, image_data):
        """Run optimized vision inference na edge"""
        
        # Preprocess image for the edge model
        preprocessed_image = self._preprocess_image_for_edge(image_data)
        
        # Run inference
        if "object_detection" in self.models:
            # Object detection inference
            inputs = {self.models["object_detection"].get_inputs()[0].name: preprocessed_image}
            outputs = self.models["object_detection"].run(None, inputs)
            
            # Post-process results
            detected_objects = self._postprocess_detection_outputs(outputs)
            
            return {
                "task": "object_detection",
                "detected_objects": detected_objects,
                "confidence": self._calculate_detection_confidence(detected_objects)
            }
        
        return {"task": "vision", "result": "no_model_available"}
    
    def _preprocess_image_for_edge(self, image_data):
        """Optimize image preprocessing dla edge devices"""
        
        # Convert to numpy array if needed
        if isinstance(image_data, bytes):
            # Decode image from bytes
            import cv2
            nparr = np.frombuffer(image_data, np.uint8)
            image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        else:
            image = image_data
        
        # Resize to the model's expected input size
        target_size = (640, 640)  # Common input size for edge object detection
        resized_image = cv2.resize(image, target_size)
        
        # Normalize
        normalized_image = resized_image.astype(np.float32) / 255.0
        
        # Add batch dimension
        batched_image = np.expand_dims(normalized_image, axis=0)
        
        # Transpose to the model's input format (NCHW)
        final_image = np.transpose(batched_image, (0, 3, 1, 2))
        
        return final_image
    
    def _should_send_to_cloud(self, edge_result):
        """Determine if result should be sent do cloud"""
        
        # Send to cloud if:
        # 1. High confidence detection (>0.8)
        # 2. Unusual/anomalous result
        # 3. Error condition
        # 4. Periodic sync (every 100 inferences)
        
        confidence = edge_result.get("confidence", 0)
        
        # High confidence results
        if confidence > 0.8:
            return True
        
        # Periodic sync
        if self.edge_stats["total_inferences"] % 100 == 0:
            return True
        
        # Error conditions
        if "error" in edge_result:
            return True
        
        return False
    
    async def _send_to_cloud(self, edge_result):
        """Send significant results do cloud dla further processing"""
        
        try:
            # Prepare message for IoT Hub
            message_data = {
                "deviceId": "edge-device-001",
                "timestamp": edge_result["timestamp"],
                "aiResult": edge_result["ai_result"],
                "edgeLatency": edge_result["edge_latency_ms"],
                "confidence": edge_result["confidence"],
                "inputType": edge_result["input_type"]
            }
            
            message = Message(json.dumps(message_data))
            message.message_id = f"edge-ai-{int(time.time())}"
            message.correlation_id = "multimodal-processing"
            message.custom_properties["result_type"] = "ai_inference"
            message.custom_properties["confidence_level"] = "high" if edge_result["confidence"] > 0.8 else "medium"
            
            # Send to cloud
            await self.device_client.send_message(message)
            
            self.edge_stats["successful_cloud_sends"] += 1
            print(f"☁️ Sent result do cloud (confidence: {edge_result['confidence']:.2f})")
            
        except Exception as e:
            print(f"❌ Failed to send to cloud: {e}")

Edge Model Optimization

import onnx
import onnxruntime as ort
import numpy as np
from onnxruntime.quantization import quantize_dynamic, QuantType
import torch

class EdgeModelOptimizer:
    def __init__(self):
        self.optimization_techniques = [
            "quantization",
            "pruning",
            "knowledge_distillation",
            "model_compression"
        ]
    
    def optimize_for_edge_deployment(self, model_path, target_device="cpu", optimization_level="balanced"):
        """Comprehensive edge optimization"""
        
        print(f"🔧 Optimizing model dla edge deployment...")
        print(f"   Target device: {target_device}")
        print(f"   Optimization level: {optimization_level}")
        
        original_model = onnx.load(model_path)
        current_model_path = model_path
        optimizations_applied = []
        
        # Apply quantization
        if optimization_level in ["aggressive", "balanced"]:
            print("🔄 Applying quantization...")
            quantized_path = model_path.replace(".onnx", "_quantized.onnx")
            
            try:
                quantize_dynamic(
                    current_model_path, 
                    quantized_path,
                    weight_type=QuantType.QUInt8
                )
                current_model_path = quantized_path
                optimizations_applied.append("int8_quantization")
                print("✅ Quantization applied")
                
            except Exception as e:
                print(f"⚠️ Quantization failed: {e}")
        
        # Model pruning (simplified example)
        if optimization_level == "aggressive":
            print("🔄 Applying model pruning...")
            # A real implementation would apply structured pruning here
            optimizations_applied.append("pruning")
        
        # Benchmark performance
        print("📊 Benchmarking optimized model...")
        performance_metrics = self._benchmark_edge_performance(
            original_path=model_path,
            optimized_path=current_model_path,
            target_device=target_device
        )
        
        optimization_summary = {
            "optimized_model_path": current_model_path,
            "optimizations_applied": optimizations_applied,
            "performance_improvement": performance_metrics,
            "model_size_reduction_percent": self._calculate_size_reduction_percent(
                model_path, current_model_path
            ),
            "target_device": target_device,
            "optimization_level": optimization_level
        }
        
        print(f"✅ Edge optimization completed:")
        print(f"   Size reduction: {optimization_summary['model_size_reduction_percent']:.1f}%")
        print(f"   Latency improvement: {performance_metrics.get('speedup_factor', 1):.2f}x")
        
        return optimization_summary
    
    def _benchmark_edge_performance(self, original_path, optimized_path, target_device):
        """Benchmark performance improvement"""
        
        import time
        
        # Load both models
        original_session = ort.InferenceSession(original_path)
        optimized_session = ort.InferenceSession(optimized_path)
        
        # Generate test input (appropriate for the model)
        input_shape = original_session.get_inputs()[0].shape
        if len(input_shape) == 4:  # Image model (NCHW)
            test_input = np.random.randn(1, 3, 224, 224).astype(np.float32)
        else:  # Other model types
            test_input = np.random.randn(*input_shape).astype(np.float32)
        
        input_name = original_session.get_inputs()[0].name
        
        # Benchmark original model (warm-up + measurement)
        for _ in range(5):  # Warm-up
            original_session.run(None, {input_name: test_input})
        
        original_times = []
        for _ in range(50):  # Measurement runs
            start = time.time()
            original_session.run(None, {input_name: test_input})
            original_times.append((time.time() - start) * 1000)  # ms
        
        # Benchmark optimized model
        for _ in range(5):  # Warm-up
            optimized_session.run(None, {input_name: test_input})
        
        optimized_times = []
        for _ in range(50):  # Measurement runs
            start = time.time()
            optimized_session.run(None, {input_name: test_input})
            optimized_times.append((time.time() - start) * 1000)  # ms
        
        # Calculate performance metrics
        orig_avg = np.mean(original_times)
        opt_avg = np.mean(optimized_times)
        
        return {
            "original_avg_latency_ms": orig_avg,
            "optimized_avg_latency_ms": opt_avg,
            "speedup_factor": orig_avg / opt_avg,
            "latency_reduction_percent": ((orig_avg - opt_avg) / orig_avg) * 100,
            "original_std": np.std(original_times),
            "optimized_std": np.std(optimized_times)
        }
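
The optimizer calls a _calculate_size_reduction_percent helper that is not shown above; a minimal sketch of that method (assuming both model files exist on disk), followed by a usage example with a placeholder model path:

    def _calculate_size_reduction_percent(self, original_path, optimized_path):
        """Estimate size reduction by comparing file sizes on disk."""
        import os
        original_size = os.path.getsize(original_path)
        optimized_size = os.path.getsize(optimized_path)
        return ((original_size - optimized_size) / original_size) * 100

# Example usage (the model path is a placeholder)
optimizer = EdgeModelOptimizer()
summary = optimizer.optimize_for_edge_deployment(
    model_path="models/object_detection.onnx",
    target_device="cpu",
    optimization_level="balanced"
)
print(summary["optimizations_applied"])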

✅ Practical exercises

Exercise 1: Multimodal System (60 min)

  1. Implement a real-time multimodal processor
  2. Combine vision + audio + text analysis
  3. Create a fusion algorithm for combined understanding
  4. Test with a live camera/microphone

Exercise 2: Edge Optimization (30 min)

  1. Optimize a model for edge deployment
  2. Implement ONNX quantization
  3. Benchmark the performance improvements
  4. Deploy to an edge device/simulator

Exercise 3: IoT Integration (20 min)

  1. Set up an Azure IoT Edge environment
  2. Deploy the AI module to an edge device
  3. Configure cloud communication
  4. Test bi-directional data flow

Exercise 4: Real-time Dashboard (10 min)

  1. Create a monitoring dashboard for edge devices
  2. Show real-time AI inference results
  3. Display performance metrics
  4. Add alerting for anomalies

📊 Assessment criteria

Technical Implementation (50 points)

  • Working multimodal system (20 pts)
  • Edge optimization working (15 pts)
  • IoT integration functional (15 pts)

Performance (30 points)

  • Real-time processing capability (15 pts)
  • Edge latency optimization (15 pts)

Innovation (20 points)

  • Creative multimodal applications (10 pts)
  • Advanced edge features (10 pts)

🏆 Session outcome

After completing this session, participants will have:

  1. Multimodal AI system - text + vision + audio
  2. Edge deployment capability - optimized for IoT
  3. Real-time processing - low-latency inference
  4. IoT integration experience - Azure IoT Edge expertise

📚 Additional materials

💡 Tip

Each session is 2 hours of intensive learning with hands-on exercises. The materials can be reviewed at your own pace.

📈 Progress

Track your progress in learning AI and preparing for the Azure AI-102 certification. Each module builds on the previous one.