Session 20: Multimodal AI Solutions and Edge Applications
Edge AI and Multimodal Processing
🎯 Session Goals
- Implement multimodal AI systems (text + vision + audio)
- Azure IoT Edge with AI modules
- Real-time processing on edge devices
- Model optimization for edge deployment
🎭 Multimodal AI Applications
Understanding Multimodal AI
MULTIMODAL AI CAPABILITIES:
TEXT + VISION:
- Document analysis combining images and text
- Visual question answering
- Image captioning and description
- Chart and diagram interpretation
TEXT + AUDIO:
- Voice assistants with natural conversation
- Audio content analysis with transcription
- Sentiment analysis from voice tone
- Multi-language support
VISION + AUDIO:
- Video content analysis
- Real-time scene understanding
- Activity recognition in video
- Audio-visual synchronization
TEXT + VISION + AUDIO:
- Complete media understanding
- Interactive AI assistants
- Comprehensive content analysis
- Rich user experience interfaces
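Before diving into the full implementation below, it helps to see the core idea of late fusion in isolation: each modality produces its own result with a confidence score, and a fusion step merges them into one combined record. The sketch below is illustrative only; the ModalityResult structure and the simple confidence-averaging rule are assumptions, not part of the processor implemented later.

    from dataclasses import dataclass
    from typing import Dict, List

    @dataclass
    class ModalityResult:
        modality: str      # "text", "vision" or "audio"
        content: Dict      # model-specific payload (caption, transcript, entities, ...)
        confidence: float  # 0.0 - 1.0

    def late_fusion(results: List[ModalityResult]) -> Dict:
        """Merge per-modality results into one combined record (naive confidence average)."""
        return {
            "modalities": [r.modality for r in results],
            "combined_content": {r.modality: r.content for r in results},
            "combined_confidence": sum(r.confidence for r in results) / len(results),
        }

    # Example: fusing a vision caption with an audio transcript
    fused = late_fusion([
        ModalityResult("vision", {"caption": "a person at a desk"}, 0.91),
        ModalityResult("audio", {"transcript": "let's start the meeting"}, 0.84),
    ])
    print(fused["combined_confidence"])  # 0.875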
Multimodal AI Implementation
import cv2
import numpy as np
import asyncio
from concurrent.futures import ThreadPoolExecutor
import queue
import threading
from datetime import datetime
import json

class MultimodalAIProcessor:
    def __init__(self, ai_models_config):
        self.vision_model = self._load_vision_model(ai_models_config["vision"])
        self.audio_model = self._load_audio_model(ai_models_config["audio"])
        self.text_model = self._load_text_model(ai_models_config["text"])

        # Processing queues for real-time streams
        self.frame_queue = queue.Queue(maxsize=30)
        self.audio_queue = queue.Queue(maxsize=100)
        self.results_queue = queue.Queue()

        # Thread pool for concurrent processing
        self.executor = ThreadPoolExecutor(max_workers=6)

        # Processing statistics
        self.stats = {
            "frames_processed": 0,
            "audio_chunks_processed": 0,
            "multimodal_fusions": 0,
            "average_latency": 0,
            "error_count": 0
        }

    async def start_realtime_multimodal_processing(self):
        """Start the real-time multimodal processing system."""
        print("🚀 Starting real-time multimodal processing...")

        # Start capture threads
        video_capture_thread = threading.Thread(
            target=self._capture_video_stream,
            daemon=True
        )
        audio_capture_thread = threading.Thread(
            target=self._capture_audio_stream,
            daemon=True
        )
        video_capture_thread.start()
        audio_capture_thread.start()

        # Start processing workers
        processing_tasks = [
            asyncio.create_task(self._process_video_stream()),
            asyncio.create_task(self._process_audio_stream()),
            asyncio.create_task(self._fuse_multimodal_results()),
            asyncio.create_task(self._handle_processing_results())
        ]
        print("✅ All processing threads started")

        # Monitor system health
        monitoring_task = asyncio.create_task(self._monitor_system_health())

        # Run until stopped
        try:
            await asyncio.gather(*processing_tasks, monitoring_task)
        except KeyboardInterrupt:
            print("🛑 Stopping multimodal processing...")

        return {
            "status": "stopped",
            "final_stats": self.stats
        }

    async def _process_video_stream(self):
        """Process video frames in real time."""
        print("📹 Video processing thread started")
        while True:
            try:
                if not self.frame_queue.empty():
                    frame_data = self.frame_queue.get(timeout=1)

                    # Process the frame with the vision AI model
                    start_time = datetime.utcnow()
                    vision_result = await self._analyze_frame_async(frame_data)
                    processing_time = (datetime.utcnow() - start_time).total_seconds() * 1000

                    # Add metadata and queue the result
                    vision_result.update({
                        "timestamp": start_time.isoformat(),
                        "modality": "vision",
                        "processing_time_ms": processing_time,
                        "frame_id": frame_data["frame_id"]
                    })
                    self.results_queue.put(vision_result)
                    self.stats["frames_processed"] += 1

                await asyncio.sleep(0.1)  # Small delay between frames
            except queue.Empty:
                await asyncio.sleep(0.1)
            except Exception as e:
                self.stats["error_count"] += 1
                print(f"❌ Video processing error: {e}")
                await asyncio.sleep(1)

    async def _process_audio_stream(self):
        """Process audio chunks in real time."""
        print("🎙️ Audio processing thread started")
        audio_buffer = []
        while True:
            try:
                if not self.audio_queue.empty():
                    audio_chunk = self.audio_queue.get(timeout=1)
                    audio_buffer.append(audio_chunk)

                    # Process when the buffer reaches an optimal size (≈1 second of audio)
                    if len(audio_buffer) >= 16:
                        combined_audio = np.concatenate([chunk["data"] for chunk in audio_buffer])
                        start_time = datetime.utcnow()

                        # Process audio with the speech AI model
                        audio_result = await self._analyze_audio_async(combined_audio)
                        processing_time = (datetime.utcnow() - start_time).total_seconds() * 1000

                        # Add metadata
                        audio_result.update({
                            "timestamp": start_time.isoformat(),
                            "modality": "audio",
                            "processing_time_ms": processing_time,
                            "chunk_count": len(audio_buffer)
                        })
                        self.results_queue.put(audio_result)
                        self.stats["audio_chunks_processed"] += 1

                        # Clear the buffer
                        audio_buffer = []

                await asyncio.sleep(0.05)  # Audio is polled on a faster cycle
            except queue.Empty:
                await asyncio.sleep(0.1)
            except Exception as e:
                self.stats["error_count"] += 1
                print(f"❌ Audio processing error: {e}")
                await asyncio.sleep(1)

    async def _fuse_multimodal_results(self):
        """Combine results from different modalities."""
        print("🔗 Multimodal fusion thread started")
        result_buffer = {
            "vision": [],
            "audio": [],
            "text": []
        }
        while True:
            try:
                if not self.results_queue.empty():
                    result = self.results_queue.get(timeout=1)
                    modality = result["modality"]

                    # Add the result to the appropriate buffer
                    result_buffer[modality].append(result)

                    # Fuse results when we have recent data from multiple modalities
                    if (len(result_buffer["vision"]) > 0 and
                            len(result_buffer["audio"]) > 0):
                        # Check timestamp alignment (within 2 seconds)
                        latest_vision = result_buffer["vision"][-1]
                        latest_audio = result_buffer["audio"][-1]
                        vision_time = datetime.fromisoformat(latest_vision["timestamp"].replace('Z', '+00:00'))
                        audio_time = datetime.fromisoformat(latest_audio["timestamp"].replace('Z', '+00:00'))
                        time_diff = abs((vision_time - audio_time).total_seconds())

                        if time_diff <= 2.0:  # Within 2 seconds
                            # Create a fused understanding
                            fused_result = await self._create_multimodal_understanding(
                                latest_vision, latest_audio
                            )

                            # Pass on for decision making
                            await self._handle_fused_result(fused_result)
                            self.stats["multimodal_fusions"] += 1

                    # Keep only recent results in the buffer
                    result_buffer["vision"] = result_buffer["vision"][-5:]
                    result_buffer["audio"] = result_buffer["audio"][-5:]

                await asyncio.sleep(0.1)
            except queue.Empty:
                await asyncio.sleep(0.2)
            except Exception as e:
                self.stats["error_count"] += 1
                print(f"❌ Fusion processing error: {e}")
                await asyncio.sleep(1)

    async def _create_multimodal_understanding(self, vision_data, audio_data):
        """Create an integrated understanding from multiple modalities."""
        # Extract key information from each modality
        vision_objects = vision_data.get("detected_objects", [])
        vision_scene = vision_data.get("scene_description", "")
        audio_transcript = audio_data.get("transcript", "")
        audio_sentiment = audio_data.get("sentiment", {})

        # Use an LLM for cross-modal reasoning
        multimodal_prompt = f"""
        Analyze the following simultaneous information from different sources and provide integrated insights:

        VISUAL SCENE: {vision_scene}
        DETECTED OBJECTS: {', '.join([obj['name'] for obj in vision_objects])}
        AUDIO TRANSCRIPT: {audio_transcript}
        AUDIO SENTIMENT: {audio_sentiment.get('label', 'neutral')} (confidence: {audio_sentiment.get('confidence', 0):.2f})

        Provide integrated analysis:
        1. Correlation between visual and audio content
        2. Overall scene understanding
        3. Detected activities or events
        4. Confidence assessment for the integrated understanding
        5. Recommended actions based on the analysis

        Format the response as structured JSON.
        """

        # Send to the text model for analysis
        integrated_analysis = await self.text_model.generate_response(multimodal_prompt)

        return {
            "timestamp": datetime.utcnow().isoformat(),
            "input_modalities": ["vision", "audio"],
            "vision_input": vision_data,
            "audio_input": audio_data,
            "integrated_analysis": integrated_analysis["content"],
            "confidence": self._calculate_multimodal_confidence(vision_data, audio_data),
            "correlation_strength": self._assess_cross_modal_correlation(vision_data, audio_data)
        }
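A possible way to wire the processor together is sketched below. The structure of ai_models_config, and the loader and capture helpers it relies on (such as _load_vision_model or _capture_video_stream), is not defined in the listing above, so treat the keys, paths, and endpoint here as placeholder assumptions.

    import asyncio

    # Hypothetical configuration; the exact keys expected by the loader helpers
    # are an assumption, since _load_vision_model/_load_audio_model/_load_text_model
    # are not shown above.
    ai_models_config = {
        "vision": {"model_path": "models/vision.onnx"},
        "audio": {"model_path": "models/speech.onnx"},
        "text": {"deployment": "gpt-4o", "endpoint": "https://<your-resource>.openai.azure.com"},
    }

    async def main():
        processor = MultimodalAIProcessor(ai_models_config)
        summary = await processor.start_realtime_multimodal_processing()
        print(summary["final_stats"])

    if __name__ == "__main__":
        asyncio.run(main())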
🖥️ Edge Computing for AI
Azure IoT Edge with AI Modules
import asyncio
import json
import time
from datetime import datetime

import cv2
import numpy as np
import onnxruntime as ort
from azure.iot.device import Message
from azure.iot.device.aio import IoTHubDeviceClient  # async client, so connect()/send_message() can be awaited

class EdgeAIModule:
    def __init__(self, connection_string, model_configs):
        self.device_client = IoTHubDeviceClient.create_from_connection_string(connection_string)
        self.models = {}
        self.processing_queue = asyncio.Queue()

        # Load optimized models for the edge
        for model_name, config in model_configs.items():
            self.models[model_name] = ort.InferenceSession(config["model_path"])
            print(f"📱 Loaded edge model: {model_name}")

        self.edge_stats = {
            "total_inferences": 0,
            "average_latency": 0,
            "successful_cloud_sends": 0,
            "local_cache_hits": 0
        }

    async def start_edge_processing(self):
        """Start the edge AI processing loop."""
        await self.device_client.connect()
        print("🔗 Connected to IoT Hub")

        # Set up the handler for cloud-to-device messages
        self.device_client.on_message_received = self._handle_cloud_message

        # Start processing tasks
        processing_tasks = [
            asyncio.create_task(self._capture_sensor_data()),
            asyncio.create_task(self._process_ai_inference()),
            asyncio.create_task(self._manage_cloud_communication()),
            asyncio.create_task(self._monitor_edge_health())
        ]
        try:
            await asyncio.gather(*processing_tasks)
        except KeyboardInterrupt:
            print("🛑 Stopping edge processing...")
            await self.device_client.disconnect()

    async def _process_ai_inference(self):
        """Run AI inference on the edge device."""
        while True:
            try:
                # Get data from the processing queue
                sensor_data = await self.processing_queue.get()
                start_time = time.time()

                # Pick the appropriate model based on the data type
                if sensor_data["type"] == "image":
                    result = await self._run_vision_inference(sensor_data["data"])
                elif sensor_data["type"] == "audio":
                    result = await self._run_audio_inference(sensor_data["data"])
                elif sensor_data["type"] == "sensor_reading":
                    result = await self._run_sensor_inference(sensor_data["data"])
                else:
                    continue

                processing_latency = (time.time() - start_time) * 1000  # ms

                # Add edge processing metadata
                edge_result = {
                    "device_id": "edge-device-001",
                    "timestamp": datetime.utcnow().isoformat(),
                    "input_type": sensor_data["type"],
                    "ai_result": result,
                    "edge_latency_ms": processing_latency,
                    "confidence": result.get("confidence", 0),
                    "local_processing": True
                }

                # Update statistics
                self.edge_stats["total_inferences"] += 1
                self._update_average_latency(processing_latency)

                # Decide whether to send the result to the cloud
                if self._should_send_to_cloud(edge_result):
                    await self._send_to_cloud(edge_result)

                # Take local action if needed
                await self._handle_local_action(edge_result)
            except Exception as e:
                print(f"❌ Edge inference error: {e}")
                await asyncio.sleep(1)

    async def _run_vision_inference(self, image_data):
        """Run optimized vision inference on the edge."""
        # Preprocess the image for the edge model
        preprocessed_image = self._preprocess_image_for_edge(image_data)

        # Run inference
        if "object_detection" in self.models:
            # Object detection inference
            inputs = {self.models["object_detection"].get_inputs()[0].name: preprocessed_image}
            outputs = self.models["object_detection"].run(None, inputs)

            # Post-process results
            detected_objects = self._postprocess_detection_outputs(outputs)
            return {
                "task": "object_detection",
                "detected_objects": detected_objects,
                "confidence": self._calculate_detection_confidence(detected_objects)
            }

        return {"task": "vision", "result": "no_model_available"}

    def _preprocess_image_for_edge(self, image_data):
        """Optimize image preprocessing for edge devices."""
        # Convert to a numpy array if needed
        if isinstance(image_data, bytes):
            # Decode the image from bytes
            nparr = np.frombuffer(image_data, np.uint8)
            image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        else:
            image = image_data

        # Resize to the model's expected input size
        target_size = (640, 640)  # Common for edge object detection
        resized_image = cv2.resize(image, target_size)

        # Normalize
        normalized_image = resized_image.astype(np.float32) / 255.0

        # Add batch dimension
        batched_image = np.expand_dims(normalized_image, axis=0)

        # Transpose to the model format (NCHW)
        final_image = np.transpose(batched_image, (0, 3, 1, 2))
        return final_image

    def _should_send_to_cloud(self, edge_result):
        """Determine whether a result should be sent to the cloud."""
        # Send to the cloud if:
        # 1. High confidence detection (>0.8)
        # 2. Unusual/anomalous result
        # 3. Error condition
        # 4. Periodic sync (every 100 inferences)
        confidence = edge_result.get("confidence", 0)

        # High confidence results
        if confidence > 0.8:
            return True

        # Periodic sync
        if self.edge_stats["total_inferences"] % 100 == 0:
            return True

        # Error conditions
        if "error" in edge_result:
            return True

        return False

    async def _send_to_cloud(self, edge_result):
        """Send significant results to the cloud for further processing."""
        try:
            # Prepare the message for IoT Hub
            message_data = {
                "deviceId": "edge-device-001",
                "timestamp": edge_result["timestamp"],
                "aiResult": edge_result["ai_result"],
                "edgeLatency": edge_result["edge_latency_ms"],
                "confidence": edge_result["confidence"],
                "inputType": edge_result["input_type"]
            }
            message = Message(json.dumps(message_data))
            message.message_id = f"edge-ai-{int(time.time())}"
            message.correlation_id = "multimodal-processing"
            message.custom_properties["result_type"] = "ai_inference"
            message.custom_properties["confidence_level"] = "high" if edge_result["confidence"] > 0.8 else "medium"

            # Send to the cloud
            await self.device_client.send_message(message)
            self.edge_stats["successful_cloud_sends"] += 1
            print(f"☁️ Sent result to cloud (confidence: {edge_result['confidence']:.2f})")
        except Exception as e:
            print(f"❌ Failed to send to cloud: {e}")
Edge Model Optimization
import onnx
import numpy as np
import onnxruntime as ort
from onnxruntime.quantization import quantize_dynamic, QuantType

class EdgeModelOptimizer:
    def __init__(self):
        self.optimization_techniques = [
            "quantization",
            "pruning",
            "knowledge_distillation",
            "model_compression"
        ]

    def optimize_for_edge_deployment(self, model_path, target_device="cpu", optimization_level="balanced"):
        """Comprehensive edge optimization."""
        print("🔧 Optimizing model for edge deployment...")
        print(f"   Target device: {target_device}")
        print(f"   Optimization level: {optimization_level}")

        original_model = onnx.load(model_path)
        current_model_path = model_path
        optimizations_applied = []

        # Apply quantization
        if optimization_level in ["aggressive", "balanced"]:
            print("🔄 Applying quantization...")
            quantized_path = model_path.replace(".onnx", "_quantized.onnx")
            try:
                quantize_dynamic(
                    current_model_path,
                    quantized_path,
                    weight_type=QuantType.QUInt8
                )
                current_model_path = quantized_path
                optimizations_applied.append("int8_quantization")
                print("✅ Quantization applied")
            except Exception as e:
                print(f"⚠️ Quantization failed: {e}")

        # Model pruning (simplified example)
        if optimization_level == "aggressive":
            print("🔄 Applying model pruning...")
            # A real implementation would apply structured pruning here
            optimizations_applied.append("pruning")

        # Benchmark performance
        print("📊 Benchmarking optimized model...")
        performance_metrics = self._benchmark_edge_performance(
            original_path=model_path,
            optimized_path=current_model_path,
            target_device=target_device
        )

        optimization_summary = {
            "optimized_model_path": current_model_path,
            "optimizations_applied": optimizations_applied,
            "performance_improvement": performance_metrics,
            "model_size_reduction_percent": self._calculate_size_reduction_percent(
                model_path, current_model_path
            ),
            "target_device": target_device,
            "optimization_level": optimization_level
        }

        print("✅ Edge optimization completed:")
        print(f"   Size reduction: {optimization_summary['model_size_reduction_percent']:.1f}%")
        print(f"   Latency improvement: {performance_metrics.get('speedup_factor', 1):.2f}x")
        return optimization_summary

    def _benchmark_edge_performance(self, original_path, optimized_path, target_device):
        """Benchmark the performance improvement."""
        import time

        # Load both models
        original_session = ort.InferenceSession(original_path)
        optimized_session = ort.InferenceSession(optimized_path)

        # Generate test input (appropriate for the model)
        input_shape = original_session.get_inputs()[0].shape
        if len(input_shape) == 4:  # Image model (NCHW)
            test_input = np.random.randn(1, 3, 224, 224).astype(np.float32)
        else:  # Other model types
            test_input = np.random.randn(*input_shape).astype(np.float32)
        input_name = original_session.get_inputs()[0].name

        # Benchmark the original model (warm-up + measurement)
        for _ in range(5):  # Warm-up
            original_session.run(None, {input_name: test_input})
        original_times = []
        for _ in range(50):  # Measurement runs
            start = time.time()
            original_session.run(None, {input_name: test_input})
            original_times.append((time.time() - start) * 1000)  # ms

        # Benchmark the optimized model
        for _ in range(5):  # Warm-up
            optimized_session.run(None, {input_name: test_input})
        optimized_times = []
        for _ in range(50):  # Measurement runs
            start = time.time()
            optimized_session.run(None, {input_name: test_input})
            optimized_times.append((time.time() - start) * 1000)  # ms

        # Calculate performance metrics
        orig_avg = np.mean(original_times)
        opt_avg = np.mean(optimized_times)
        return {
            "original_avg_latency_ms": orig_avg,
            "optimized_avg_latency_ms": opt_avg,
            "speedup_factor": orig_avg / opt_avg,
            "latency_reduction_percent": ((orig_avg - opt_avg) / orig_avg) * 100,
            "original_std": np.std(original_times),
            "optimized_std": np.std(optimized_times)
        }
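The optimizer above calls _calculate_size_reduction_percent, which is not defined in the listing. One simple way to supply it, together with an example invocation, is sketched below: it compares on-disk model sizes via os.path.getsize. The subclass name and the model path are placeholders for illustration.

    import os

    class EdgeModelOptimizerWithSize(EdgeModelOptimizer):
        def _calculate_size_reduction_percent(self, original_path, optimized_path):
            """Compare on-disk model sizes before and after optimization."""
            original_size = os.path.getsize(original_path)
            optimized_size = os.path.getsize(optimized_path)
            return (original_size - optimized_size) / original_size * 100

    # Example usage (model path is a placeholder)
    optimizer = EdgeModelOptimizerWithSize()
    summary = optimizer.optimize_for_edge_deployment(
        "models/yolo.onnx",
        target_device="cpu",
        optimization_level="balanced",
    )
    print(summary["optimizations_applied"])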
✅ Practical Tasks
Task 1: Multimodal System (60 min)
- Implement a real-time multimodal processor
- Combine vision + audio + text analysis
- Build a fusion algorithm for combined understanding
- Test it with a live camera/microphone
Task 2: Edge Optimization (30 min)
- Optimize a model for edge deployment
- Implement ONNX quantization
- Benchmark the performance improvements
- Deploy to an edge device/simulator
Task 3: IoT Integration (20 min)
- Set up an Azure IoT Edge environment
- Deploy the AI module to an edge device
- Configure cloud communication
- Test bi-directional data flow
Task 4: Real-time Dashboard (10 min)
- Create a monitoring dashboard for edge devices
- Show real-time AI inference results
- Display performance metrics
- Add alerting for anomalies
📊 Assessment Criteria
Technical Implementation (50 points)
- Working multimodal system (20 pts)
- Edge optimization working (15 pts)
- IoT integration functional (15 pts)
Performance (30 points)
- Real-time processing capability (15 pts)
- Edge latency optimization (15 pts)
Innovation (20 points)
- Creative multimodal applications (10 pts)
- Advanced edge features (10 pts)
🏆 Session Outcome
After completing this session, participants will have:
- A multimodal AI system - text + vision + audio
- Edge deployment capability - models optimized for IoT
- Real-time processing - low-latency inference
- IoT integration experience - Azure IoT Edge expertise