Sesja 9: Warsztaty - Implementacja systemu transkrypcji i rozpoznawania obrazów

Multimodalny system AI w praktyce

🎯 Cele warsztatów

  • Implementacja kompletnego systemu transkrypcji audio/wideo
  • Integracja rozpoznawania obrazów z przetwarzaniem mowy
  • Budowa multimodalnej aplikacji AI
  • Deployment i testowanie w środowisku production-like

🛠️ Projekt warsztatowy

System multimodalnej analizy treści

Scenariusz biznesowy: Firma organizuje regularne spotkania, webinary i prezentacje. Potrzebuje systemu do automatycznego:

  • Transkrypcji nagrań audio/wideo
  • Rozpoznawania obiektów i tekstu w prezentacjach
  • Generowania podsumowań i insights
  • Przeszukiwania w archiwum treści

Architektura systemu:

[UPLOAD AUDIO/VIDEO] → [CONTENT ROUTER] → [SPECIALIZED PROCESSORS]
                                              ↓
[SPEECH PROCESSOR] ← [ORCHESTRATOR] → [VISION PROCESSOR]
        ↓              ↓                    ↓
[TEXT ANALYTICS] ← [RESULT FUSION] → [OCR PROCESSOR]
                        ↓
[STRUCTURED OUTPUT] → [API/UI] → [USER RESULTS]

💻 Implementacja end-to-end

Główna klasa orchestratora

import asyncio
import tempfile
import uuid
from datetime import datetime
from typing import Dict, List, Optional, Any
import json

class MultimodalContentProcessor:
    def __init__(self, config):
        self.speech_processor = AzureSpeechProcessor(
            config["speech_key"], 
            config["speech_region"]
        )
        self.vision_analyzer = AzureVisionAnalyzer(
            config["vision_key"],
            config["vision_endpoint"]
        )
        self.processing_stats = {
            "total_processed": 0,
            "successful": 0,
            "failed": 0,
            "average_processing_time": 0
        }
    
    async def process_multimedia_content(self, content_info: Dict) -> Dict:
        """Główna metoda przetwarzania treści multimodalnych"""
        
        processing_id = str(uuid.uuid4())
        start_time = datetime.utcnow()
        
        result = {
            "processing_id": processing_id,
            "content_type": content_info.get("type", "unknown"),
            "start_time": start_time.isoformat(),
            "status": "processing",
            "results": {},
            "metadata": content_info.get("metadata", {})
        }
        
        try:
            content_type = content_info["type"]
            content_path = content_info["path"]
            
            if content_type == "video":
                result["results"] = await self._process_video_content(content_path)
            elif content_type == "audio":
                result["results"] = await self._process_audio_content(content_path)
            elif content_type == "image":
                result["results"] = await self._process_image_content(content_path)
            elif content_type == "document_with_images":
                result["results"] = await self._process_document_with_images(content_path)
            else:
                raise ValueError(f"Unsupported content type: {content_type}")
            
            # Finalizacja
            result["status"] = "completed"
            result["end_time"] = datetime.utcnow().isoformat()
            result["processing_duration"] = (
                datetime.utcnow() - start_time
            ).total_seconds()
            
            # Statystyki
            self.processing_stats["total_processed"] += 1
            self.processing_stats["successful"] += 1
            self._update_average_processing_time(result["processing_duration"])
            
            print(f"✅ Processing completed for {processing_id}")
            
        except Exception as e:
            result["status"] = "failed"
            result["error"] = str(e)
            result["end_time"] = datetime.utcnow().isoformat()
            
            self.processing_stats["total_processed"] += 1
            self.processing_stats["failed"] += 1
            
            print(f"❌ Processing failed for {processing_id}: {str(e)}")
        
        return result
    
    async def _process_video_content(self, video_path: str) -> Dict:
        """Przetwarzanie treści wideo"""
        
        results = {
            "audio_analysis": {},
            "visual_analysis": {},
            "synchronized_insights": {}
        }
        
        # Krok 1: Ekstrakcja audio z wideo
        audio_path = await self._extract_audio_from_video(video_path)
        
        # Krok 2: Transkrypcja audio
        print("🎙️ Transcribing audio...")
        transcription_result = await self._transcribe_audio_file(audio_path)
        results["audio_analysis"] = transcription_result
        
        # Krok 3: Ekstrakcja kluczowych klatek
        print("🖼️ Extracting key frames...")
        key_frames = await self._extract_key_frames(video_path, frame_count=10)
        
        # Krok 4: Analiza każdej klatki
        print("👁️ Analyzing frames...")
        frame_analyses = []
        for i, frame_path in enumerate(key_frames):
            frame_analysis = self.vision_analyzer.analyze_image_comprehensive(frame_path)
            frame_analysis["timestamp"] = i * (transcription_result.get("duration", 60) / len(key_frames))
            frame_analyses.append(frame_analysis)
        
        results["visual_analysis"] = {
            "frames_analyzed": len(frame_analyses),
            "frame_analyses": frame_analyses
        }
        
        # Krok 5: Synchronizacja insights
        print("🔗 Synchronizing insights...")
        synchronized = await self._synchronize_audio_visual_insights(
            transcription_result, 
            frame_analyses
        )
        results["synchronized_insights"] = synchronized
        
        return results
    
    async def _transcribe_audio_file(self, audio_path: str) -> Dict:
        """Transkrypcja pliku audio z diaryzacją mówców"""
        
        # Konfiguracja dla batch transcription
        transcription_config = {
            "audio_path": audio_path,
            "language": "pl-PL",
            "enable_speaker_diarization": True,
            "enable_word_timestamps": True,
            "enable_automatic_punctuation": True
        }
        
        # Symulacja batch transcription (w rzeczywistości używałby REST API)
        # To jest uproszczona implementacja dla warsztatów
        
        transcription_result = {
            "transcript_segments": [
                {
                    "speaker_id": "Speaker_0",
                    "start_time": 0.0,
                    "end_time": 15.5,
                    "text": "Witam wszystkich na dzisiejszej prezentacji o sztucznej inteligencji.",
                    "confidence": 0.95
                },
                {
                    "speaker_id": "Speaker_1", 
                    "start_time": 16.0,
                    "end_time": 28.3,
                    "text": "Dziękuję za wprowadzenie. Zacznijmy od podstawowych definicji AI.",
                    "confidence": 0.92
                }
            ],
            "speaker_analysis": {
                "total_speakers": 2,
                "speaking_time": {
                    "Speaker_0": 45.2,  # seconds
                    "Speaker_1": 78.8
                },
                "word_counts": {
                    "Speaker_0": 156,
                    "Speaker_1": 267
                }
            },
            "full_transcript": "Witam wszystkich na dzisiejszej prezentacji...",
            "duration": 124.0,  # seconds
            "confidence": 0.94
        }
        
        return transcription_result
    
    async def _extract_key_frames(self, video_path: str, frame_count: int = 10) -> List[str]:
        """Ekstrakcja kluczowych klatek z wideo"""
        
        # W rzeczywistości używałby OpenCV lub FFmpeg
        # To jest symulacja dla warsztatów
        
        key_frames = []
        
        for i in range(frame_count):
            # Symulacja ekstrakcji klatki
            frame_path = f"/tmp/frame_{i:03d}.jpg"
            # Tutaj byłaby logika ekstrakcji klatki
            key_frames.append(frame_path)
        
        print(f"📷 Extracted {len(key_frames)} key frames")
        
        return key_frames
    
    async def _synchronize_audio_visual_insights(self, audio_data: Dict, 
                                               frame_analyses: List[Dict]) -> Dict:
        """Synchronizacja insights z audio i video"""
        
        synchronized_timeline = []
        
        # Dla każdej klatki znajdź odpowiadający fragment audio
        for frame_analysis in frame_analyses:
            timestamp = frame_analysis["timestamp"]
            
            # Znajdź segment audio dla tego czasu
            relevant_audio_segment = None
            for segment in audio_data["transcript_segments"]:
                if segment["start_time"] <= timestamp <= segment["end_time"]:
                    relevant_audio_segment = segment
                    break
            
            # Stwórz synchroniczną analizę
            timeline_entry = {
                "timestamp": timestamp,
                "visual_content": {
                    "description": frame_analysis.get("description", {}),
                    "objects": frame_analysis.get("objects", []),
                    "text_in_image": frame_analysis.get("extracted_text", "")
                },
                "audio_content": {
                    "speaker": relevant_audio_segment["speaker_id"] if relevant_audio_segment else None,
                    "text": relevant_audio_segment["text"] if relevant_audio_segment else "",
                    "confidence": relevant_audio_segment["confidence"] if relevant_audio_segment else 0
                },
                "insights": self._generate_multimodal_insights(
                    frame_analysis, relevant_audio_segment
                )
            }
            
            synchronized_timeline.append(timeline_entry)
        
        # Generowanie podsumowania
        summary = self._generate_content_summary(synchronized_timeline)
        
        return {
            "timeline": synchronized_timeline,
            "summary": summary,
            "key_topics": self._extract_key_topics(audio_data, frame_analyses),
            "action_items": self._extract_action_items(audio_data)
        }
    
    def _generate_multimodal_insights(self, frame_data: Dict, audio_data: Optional[Dict]) -> Dict:
        """Generowanie insights z danych multimodalnych"""
        
        insights = {
            "correlation_strength": 0.5,
            "content_alignment": "unknown",
            "key_observations": []
        }
        
        if not audio_data:
            return insights
        
        # Analiza zgodności treści wizualnej z audio
        visual_elements = [obj["object"] for obj in frame_data.get("objects", [])]
        audio_text = audio_data["text"].lower()
        
        # Sprawdź korelacje
        correlations = []
        for visual_element in visual_elements:
            if visual_element.lower() in audio_text:
                correlations.append({
                    "type": "direct_mention",
                    "visual": visual_element,
                    "audio_context": audio_text
                })
        
        if correlations:
            insights["correlation_strength"] = 0.8
            insights["content_alignment"] = "high"
            insights["key_observations"].append("Strong alignment between visual content and speech")
        
        return insights
    
    def _extract_key_topics(self, audio_data: Dict, frame_analyses: List[Dict]) -> List[str]:
        """Wydobywanie kluczowych tematów z całej treści"""
        
        # Kombinacja tematów z audio i video
        topics = set()
        
        # Tematy z transkrypcji
        full_transcript = audio_data.get("full_transcript", "")
        audio_keywords = self._extract_keywords_from_text(full_transcript)
        topics.update(audio_keywords)
        
        # Tematy z analizy wizualnej
        for frame in frame_analyses:
            visual_tags = [tag["name"] for tag in frame.get("tags", []) if tag["confidence"] > 0.7]
            topics.update(visual_tags)
        
        # Sortuj według częstości występowania
        sorted_topics = list(topics)[:10]  # Top 10
        
        return sorted_topics
    
    def _extract_keywords_from_text(self, text: str) -> List[str]:
        """Proste wydobywanie słów kluczowych"""
        
        # W rzeczywistości użyłby Azure AI Language lub NLP library
        import re
        
        words = re.findall(r'\b\w+\b', text.lower())
        
        # Usuń stop words (uproszczone)
        stop_words = {"i", "a", "to", "jest", "w", "z", "na", "o", "do", "że", "się"}
        keywords = [word for word in words if word not in stop_words and len(word) > 3]
        
        # Policz częstość
        from collections import Counter
        word_counts = Counter(keywords)
        
        # Zwróć najczęściej występujące
        return [word for word, count in word_counts.most_common(10)]

🎯 Warsztat praktyczny (120 min)

Implementacja krok po kroku

Krok 1: Setup i konfiguracja (20 min)

# Konfiguracja środowiska
workshop_config = {
    "speech_key": "YOUR_SPEECH_KEY",
    "speech_region": "eastus",
    "vision_key": "YOUR_VISION_KEY", 
    "vision_endpoint": "https://your-vision.cognitiveservices.azure.com/",
    "storage_connection": "YOUR_STORAGE_CONNECTION"
}

# Inicjalizacja systemu
processor = MultimodalContentProcessor(workshop_config)

# Test połączeń
async def test_services():
    """Test dostępności wszystkich usług"""
    
    results = {}
    
    # Test Speech Service
    try:
        test_audio = "Hello, this is a test"
        audio_data = processor.speech_processor.text_to_speech(test_audio)
        results["speech_service"] = "✅ Connected"
    except Exception as e:
        results["speech_service"] = f"❌ Error: {str(e)}"
    
    # Test Vision Service  
    try:
        # Test z przykładowym obrazem
        test_url = "https://example.com/test-image.jpg"
        analysis = processor.vision_analyzer.analyze_image_comprehensive(test_url)
        results["vision_service"] = "✅ Connected"
    except Exception as e:
        results["vision_service"] = f"❌ Error: {str(e)}"
    
    return results

# Uruchomienie testów
test_results = await test_services()
print("🔧 Service connectivity test:")
for service, status in test_results.items():
    print(f"  {service}: {status}")

Krok 2: Przetwarzanie przykładowych plików (40 min)

# Przykładowe pliki testowe
test_files = [
    {
        "type": "video",
        "path": "sample_presentation.mp4",
        "metadata": {
            "title": "AI in Business Presentation",
            "duration_estimate": "15 minutes",
            "speaker": "John Doe"
        }
    },
    {
        "type": "audio", 
        "path": "interview_recording.wav",
        "metadata": {
            "title": "Technical Interview",
            "participants": ["Interviewer", "Candidate"],
            "topic": "Machine Learning Engineer"
        }
    },
    {
        "type": "image",
        "path": "technical_diagram.png", 
        "metadata": {
            "title": "System Architecture Diagram",
            "source": "Technical Documentation"
        }
    }
]

# Przetwarzanie każdego pliku
processing_results = []

for file_info in test_files:
    print(f"\n🔄 Processing: {file_info['metadata']['title']}")
    
    result = await processor.process_multimedia_content(file_info)
    processing_results.append(result)
    
    # Wyświetl kluczowe wyniki
    if result["status"] == "completed":
        print(f"✅ Completed in {result['processing_duration']:.1f}s")
        
        # Pokaż kluczowe insights
        if "synchronized_insights" in result["results"]:
            insights = result["results"]["synchronized_insights"]
            print(f"📊 Key topics: {', '.join(insights.get('key_topics', [])[:3])}")
            print(f"🎯 Action items: {len(insights.get('action_items', []))}")
        
    else:
        print(f"❌ Failed: {result.get('error', 'Unknown error')}")

Krok 3: Integracja i API (30 min)

from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.responses import JSONResponse
import aiofiles
import os

app = FastAPI(title="Multimodal Content Processor API")

# Global processor instance
global_processor = None

@app.on_event("startup")
async def startup_event():
    """Inicjalizacja przy starcie aplikacji"""
    global global_processor
    
    config = {
        "speech_key": os.getenv("AZURE_SPEECH_KEY"),
        "speech_region": os.getenv("AZURE_SPEECH_REGION"),
        "vision_key": os.getenv("AZURE_VISION_KEY"),
        "vision_endpoint": os.getenv("AZURE_VISION_ENDPOINT")
    }
    
    global_processor = MultimodalContentProcessor(config)
    print("🚀 Multimodal processor initialized")

@app.post("/process-content/")
async def process_uploaded_content(
    file: UploadFile = File(...),
    content_type: str = "auto-detect"
):
    """Endpoint dla przesyłania i przetwarzania treści"""
    
    if not global_processor:
        raise HTTPException(status_code=500, detail="Processor not initialized")
    
    try:
        # Zapisz przesłany plik
        temp_file_path = f"/tmp/{file.filename}"
        async with aiofiles.open(temp_file_path, 'wb') as f:
            content = await file.read()
            await f.write(content)
        
        # Wykryj typ treści jeśli auto-detect
        if content_type == "auto-detect":
            content_type = detect_content_type(file.filename)
        
        # Przygotuj info dla processora
        content_info = {
            "type": content_type,
            "path": temp_file_path,
            "metadata": {
                "filename": file.filename,
                "size": len(content),
                "upload_time": datetime.utcnow().isoformat()
            }
        }
        
        # Przetwarzanie
        result = await global_processor.process_multimedia_content(content_info)
        
        # Cleanup
        os.remove(temp_file_path)
        
        return JSONResponse(content=result)
        
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Processing failed: {str(e)}")

@app.get("/processing-stats/")
async def get_processing_stats():
    """Endpoint dla statystyk przetwarzania"""
    
    if not global_processor:
        raise HTTPException(status_code=500, detail="Processor not initialized")
    
    stats = global_processor.processing_stats.copy()
    
    # Dodaj dodatkowe metryki
    if stats["total_processed"] > 0:
        stats["success_rate"] = (stats["successful"] / stats["total_processed"]) * 100
        stats["failure_rate"] = (stats["failed"] / stats["total_processed"]) * 100
    
    return JSONResponse(content=stats)

def detect_content_type(filename: str) -> str:
    """Auto-detection typu treści na podstawie rozszerzenia"""
    
    extension = filename.lower().split('.')[-1]
    
    video_extensions = ["mp4", "avi", "mov", "mkv"]
    audio_extensions = ["wav", "mp3", "m4a", "flac"]
    image_extensions = ["jpg", "jpeg", "png", "bmp", "gif"]
    
    if extension in video_extensions:
        return "video"
    elif extension in audio_extensions:
        return "audio"
    elif extension in image_extensions:
        return "image"
    else:
        return "unknown"

# Uruchomienie serwera
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Krok 4: Frontend interface (20 min)

<!DOCTYPE html>
<html>
<head>
    <title>Multimodal Content Processor</title>
    <meta charset="utf-8">
    <style>
        body {
            font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif;
            max-width: 1200px;
            margin: 0 auto;
            padding: 20px;
            background-color: #f5f5f5;
        }
        .container {
            background: white;
            border-radius: 12px;
            padding: 30px;
            box-shadow: 0 4px 6px rgba(0,0,0,0.1);
        }
        .upload-area {
            border: 2px dashed #e2e8f0;
            border-radius: 8px;
            padding: 40px;
            text-align: center;
            margin: 20px 0;
            transition: all 0.3s ease;
        }
        .upload-area:hover {
            border-color: #3b82f6;
            background-color: #f8fafc;
        }
        .upload-area.dragging {
            border-color: #10b981;
            background-color: #f0fdf4;
        }
        .results-section {
            margin-top: 30px;
            padding: 20px;
            background-color: #f8fafc;
            border-radius: 8px;
            border-left: 4px solid #3b82f6;
        }
        .processing-indicator {
            display: none;
            text-align: center;
            padding: 20px;
        }
        .spinner {
            border: 3px solid #f3f3f3;
            border-top: 3px solid #3b82f6;
            border-radius: 50%;
            width: 30px;
            height: 30px;
            animation: spin 1s linear infinite;
            margin: 0 auto;
        }
        @keyframes spin {
            0% { transform: rotate(0deg); }
            100% { transform: rotate(360deg); }
        }
    </style>
</head>
<body>
    <div class="container">
        <h1>🎭 Multimodal Content Processor</h1>
        <p>Upload audio, video, or images for AI-powered analysis</p>
        
        <div class="upload-area" id="uploadArea">
            <p>📁 Drag & drop files here or <strong>click to browse</strong></p>
            <p style="color: #6b7280; font-size: 14px;">
                Supported: MP4, MP3, WAV, JPG, PNG (max 50MB)
            </p>
            <input type="file" id="fileInput" style="display: none;" 
                   accept=".mp4,.mp3,.wav,.jpg,.jpeg,.png">
        </div>
        
        <div class="processing-indicator" id="processingIndicator">
            <div class="spinner"></div>
            <p>Processing your content...</p>
        </div>
        
        <div class="results-section" id="resultsSection" style="display: none;">
            <h3>📊 Analysis Results</h3>
            <div id="resultsContent"></div>
        </div>
    </div>
    
    <script>
        const uploadArea = document.getElementById('uploadArea');
        const fileInput = document.getElementById('fileInput');
        const processingIndicator = document.getElementById('processingIndicator');
        const resultsSection = document.getElementById('resultsSection');
        const resultsContent = document.getElementById('resultsContent');
        
        // Upload area click handler
        uploadArea.addEventListener('click', () => fileInput.click());
        
        // File selection handler
        fileInput.addEventListener('change', handleFileSelection);
        
        // Drag and drop handlers
        uploadArea.addEventListener('dragover', handleDragOver);
        uploadArea.addEventListener('dragleave', handleDragLeave);
        uploadArea.addEventListener('drop', handleDrop);
        
        function handleDragOver(e) {
            e.preventDefault();
            uploadArea.classList.add('dragging');
        }
        
        function handleDragLeave(e) {
            e.preventDefault();
            uploadArea.classList.remove('dragging');
        }
        
        function handleDrop(e) {
            e.preventDefault();
            uploadArea.classList.remove('dragging');
            
            const files = e.dataTransfer.files;
            if (files.length > 0) {
                processFile(files[0]);
            }
        }
        
        function handleFileSelection(e) {
            const file = e.target.files[0];
            if (file) {
                processFile(file);
            }
        }
        
        async function processFile(file) {
            // Show processing indicator
            processingIndicator.style.display = 'block';
            resultsSection.style.display = 'none';
            
            const formData = new FormData();
            formData.append('file', file);
            formData.append('content_type', 'auto-detect');
            
            try {
                const response = await fetch('/process-content/', {
                    method: 'POST',
                    body: formData
                });
                
                const result = await response.json();
                
                if (response.ok) {
                    displayResults(result);
                } else {
                    displayError(result.detail || 'Processing failed');
                }
                
            } catch (error) {
                displayError('Upload failed: ' + error.message);
            } finally {
                processingIndicator.style.display = 'none';
            }
        }
        
        function displayResults(result) {
            let html = `<h4>✅ Processing Completed</h4>`;
            html += `<p><strong>Processing ID:</strong> ${result.processing_id}</p>`;
            html += `<p><strong>Duration:</strong> ${result.processing_duration.toFixed(1)}s</p>`;
            
            if (result.results.audio_analysis) {
                html += `<h5>🎙️ Audio Analysis</h5>`;
                const audio = result.results.audio_analysis;
                html += `<p><strong>Duration:</strong> ${audio.duration}s</p>`;
                html += `<p><strong>Speakers:</strong> ${audio.speaker_analysis.total_speakers}</p>`;
                html += `<p><strong>Transcript:</strong> ${audio.full_transcript.substring(0, 200)}...</p>`;
            }
            
            if (result.results.visual_analysis) {
                html += `<h5>👁️ Visual Analysis</h5>`;
                const visual = result.results.visual_analysis;
                html += `<p><strong>Frames analyzed:</strong> ${visual.frames_analyzed}</p>`;
            }
            
            if (result.results.synchronized_insights) {
                html += `<h5>🔗 Key Insights</h5>`;
                const insights = result.results.synchronized_insights;
                html += `<p><strong>Key topics:</strong> ${insights.key_topics.join(', ')}</p>`;
                html += `<p><strong>Action items:</strong> ${insights.action_items.length}</p>`;
            }
            
            resultsContent.innerHTML = html;
            resultsSection.style.display = 'block';
        }
        
        function displayError(errorMessage) {
            resultsContent.innerHTML = `<p style="color: red;">❌ Error: ${errorMessage}</p>`;
            resultsSection.style.display = 'block';
        }
    </script>
</body>
</html>

Krok 5: Testing i deployment (30 min)

class SystemTester:
    def __init__(self, processor):
        self.processor = processor
        
    async def run_comprehensive_tests(self):
        """Kompleksowe testowanie systemu"""
        
        test_scenarios = [
            {
                "name": "Single speaker audio",
                "file": "single_speaker_test.wav",
                "expected_speakers": 1,
                "min_confidence": 0.8
            },
            {
                "name": "Multi-speaker conversation", 
                "file": "conversation_test.wav",
                "expected_speakers": 2,
                "min_confidence": 0.7
            },
            {
                "name": "Presentation with slides",
                "file": "presentation_test.mp4", 
                "expected_visual_elements": ["text", "diagrams"],
                "expected_speakers": 1
            },
            {
                "name": "Technical diagram",
                "file": "diagram_test.png",
                "expected_text_extraction": True,
                "min_ocr_confidence": 0.8
            }
        ]
        
        test_results = []
        
        for scenario in test_scenarios:
            print(f"\n🧪 Testing: {scenario['name']}")
            
            try:
                content_info = {
                    "type": self._detect_type_from_filename(scenario["file"]),
                    "path": f"test_data/{scenario['file']}",
                    "metadata": {"test_scenario": scenario["name"]}
                }
                
                result = await self.processor.process_multimedia_content(content_info)
                
                # Walidacja wyników
                validation = self._validate_test_result(result, scenario)
                
                test_results.append({
                    "scenario": scenario["name"],
                    "status": "passed" if validation["passed"] else "failed",
                    "details": validation,
                    "processing_time": result.get("processing_duration", 0)
                })
                
                status_emoji = "✅" if validation["passed"] else "❌"
                print(f"{status_emoji} {scenario['name']}: {validation['summary']}")
                
            except Exception as e:
                test_results.append({
                    "scenario": scenario["name"],
                    "status": "error",
                    "error": str(e)
                })
                print(f"❌ {scenario['name']}: Error - {str(e)}")
        
        # Podsumowanie testów
        passed_tests = len([t for t in test_results if t["status"] == "passed"])
        total_tests = len(test_results)
        
        print(f"\n📊 Test Summary: {passed_tests}/{total_tests} tests passed")
        
        return test_results
    
    def _validate_test_result(self, result, scenario):
        """Walidacja wyników testu"""
        
        validation = {"passed": True, "issues": [], "summary": ""}
        
        if result["status"] != "completed":
            validation["passed"] = False
            validation["issues"].append("Processing failed")
            validation["summary"] = "Processing failed"
            return validation
        
        # Sprawdź specific requirements dla scenariusza
        if "expected_speakers" in scenario:
            audio_analysis = result["results"].get("audio_analysis", {})
            speaker_count = audio_analysis.get("speaker_analysis", {}).get("total_speakers", 0)
            
            if speaker_count != scenario["expected_speakers"]:
                validation["passed"] = False
                validation["issues"].append(f"Expected {scenario['expected_speakers']} speakers, got {speaker_count}")
        
        if "min_confidence" in scenario:
            confidence = result["results"].get("audio_analysis", {}).get("confidence", 0)
            if confidence < scenario["min_confidence"]:
                validation["passed"] = False
                validation["issues"].append(f"Confidence {confidence:.2f} below threshold {scenario['min_confidence']}")
        
        # Set summary
        if validation["passed"]:
            validation["summary"] = "All requirements met"
        else:
            validation["summary"] = f"{len(validation['issues'])} issues found"
        
        return validation

✅ Zadania warsztatowe

Zadanie główne: Multimodal System (90 min)

  1. Setup environment (20 min) - konfiguracja Azure services
  2. Core implementation (40 min) - processor classes i API
  3. Integration testing (20 min) - testowanie z przykładowymi plikami
  4. UI development (10 min) - simple web interface

Zadania dodatkowe

Zadanie 1: Performance optimization (20 min)

  • Parallel processing dla multiple files
  • Caching mechanizm dla results
  • Async optimization

Zadanie 2: Advanced features (25 min)

  • Real-time processing capabilities
  • Custom model integration
  • Multi-language support

Zadanie 3: Production deployment (15 min)

  • Docker containerization
  • Azure deployment configuration
  • Monitoring i alerting setup

📊 Kryteria oceny

Technical implementation (60 punktów)

  • Działający system transkrypcji (20 pkt)
  • Integration z vision services (20 pkt)
  • API i interface (20 pkt)

Code quality (20 punktów)

  • Error handling i logging (10 pkt)
  • Documentation i code structure (10 pkt)

Innovation (20 punktów)

  • Dodatkowe funkcje (10 pkt)
  • UI/UX improvements (10 pkt)

🏆 Rezultat warsztatów

Po ukończeniu uczestnicy będą mieli:

  1. Działający multimodal system - pełna implementacja
  2. Practical experience z Azure Cognitive Services
  3. Production-ready code - gotowy do wdrożenia
  4. Portfolio project - demonstracja umiejętności

📚 Materiały dodatkowe

💡 Wskazówka

Każda sesja to 2 godziny intensywnej nauki z praktycznymi ćwiczeniami. Materiały można przeglądać w dowolnym tempie.

📈 Postęp

Śledź swój postęp w nauce AI i przygotowaniu do certyfikacji Azure AI-102. Każdy moduł buduje na poprzednim.