Module 7: Advanced AI Scenarios


🎯 Module objectives

  • Integrating AI solutions with existing business systems
  • Implementing multimodal AI solutions (text + vision + audio)
  • Deploying AI on edge devices (edge computing)
  • Building comprehensive enterprise AI solutions

Session 20: Integrating AI with Business Systems (04.11.2025)

🔗 Enterprise AI Integration Patterns

Common Integration Architectures

INTEGRATION PATTERNS:

1. API-FIRST INTEGRATION:
[BUSINESS SYSTEM] ↔ [AI API GATEWAY] ↔ [AI SERVICES]
✓ Loose coupling
✓ Scalable and maintainable
✓ Technology agnostic

2. EVENT-DRIVEN INTEGRATION:
[BUSINESS EVENTS] → [EVENT HUB] → [AI PROCESSORS] → [RESULTS STORE]
✓ Real-time processing
✓ Asynchronous handling
✓ High throughput

3. BATCH INTEGRATION:
[SCHEDULED JOBS] → [DATA EXTRACT] → [AI PROCESSING] → [RESULTS IMPORT]
✓ Large volume processing
✓ Cost-effective
✓ Predictable resource usage

4. EMBEDDED INTEGRATION:
[APPLICATION] + [EMBEDDED AI MODELS] → [REAL-TIME DECISIONS]
✓ Low latency
✓ Offline capability
✓ Data privacy
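
A minimal sketch of the API-first pattern in Python: the business system only talks to a gateway route, never to the AI services directly. The gateway URL, the /ai/summarize route, the subscription-key header value and the response shape are assumptions for illustration, not part of any specific deployment.

import requests

# Hypothetical API Management gateway settings - replace with your own instance
GATEWAY_URL = "https://contoso-ai-gateway.azure-api.net"
SUBSCRIPTION_KEY = "<apim-subscription-key>"

def summarize_via_gateway(text: str) -> dict:
    """Call an AI summarization service exposed through the API gateway."""
    response = requests.post(
        f"{GATEWAY_URL}/ai/summarize",  # route assumed to be defined in API Management
        headers={"Ocp-Apim-Subscription-Key": SUBSCRIPTION_KEY},
        json={"text": text},
        timeout=30,
    )
    response.raise_for_status()
    return response.json()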

Azure Integration Services for AI

from azure.servicebus import ServiceBusClient, ServiceBusMessage
from azure.eventhub import EventHubProducerClient, EventData
import azure.functions as func
import json
import logging
from datetime import datetime

class AIIntegrationOrchestrator:
    def __init__(self, config):
        self.servicebus_client = ServiceBusClient.from_connection_string(
            config["servicebus_connection_string"]
        )
        self.eventhub_client = EventHubProducerClient.from_connection_string(
            config["eventhub_connection_string"]
        )
        self.ai_endpoints = config["ai_endpoints"]
    
    def process_business_event(self, event_data):
        """Process incoming business event z AI services"""
        
        event_type = event_data.get("event_type")
        payload = event_data.get("payload")
        
        if event_type == "document_uploaded":
            return self._process_document_event(payload)
        elif event_type == "customer_inquiry":
            return self._process_inquiry_event(payload)
        elif event_type == "transaction_completed":
            return self._process_transaction_event(payload)
        else:
            raise ValueError(f"Unknown event type: {event_type}")
    
    def _process_document_event(self, payload):
        """Process document upload event z AI analysis"""
        
        document_url = payload["document_url"]
        document_type = payload.get("document_type", "unknown")
        
        # Route to the appropriate AI service
        if document_type == "contract":
            analysis_result = self._analyze_contract(document_url)
        elif document_type == "invoice":
            analysis_result = self._analyze_invoice(document_url)
        else:
            analysis_result = self._analyze_general_document(document_url)
        
        # Send results back to the business system
        result_message = ServiceBusMessage(json.dumps({
            "document_id": payload["document_id"],
            "analysis_result": analysis_result,
            "processing_timestamp": datetime.utcnow().isoformat()
        }))
        
        with self.servicebus_client:
            with self.servicebus_client.get_queue_sender("document-analysis-results") as sender:
                sender.send_messages(result_message)
        
        return analysis_result
    
    def _analyze_contract(self, document_url):
        """Specialized contract analysis using AI"""
        
        # Use Azure Form Recognizer for structured extraction
        contract_data = self._extract_contract_fields(document_url)

        # Use Azure OpenAI for risk analysis
        risk_analysis = self._analyze_contract_risks(contract_data["content"])

        # Use Azure Language for entity extraction
        entities = self._extract_legal_entities(contract_data["content"])
        
        return {
            "contract_fields": contract_data,
            "risk_assessment": risk_analysis,
            "legal_entities": entities,
            "confidence_score": self._calculate_overall_confidence([
                contract_data["confidence"],
                risk_analysis["confidence"], 
                entities["confidence"]
            ])
        }

# Azure Functions integration example (Python v2 programming model)
app = func.FunctionApp()

@app.function_name("AIBusinessIntegration")
@app.service_bus_queue_trigger(
    arg_name="msg",
    queue_name="business-events",
    connection="ServiceBusConnectionString"
)
def business_event_processor(msg: func.ServiceBusMessage):
    """Azure Function for processing business events with AI"""
    
    try:
        # Parse incoming message
        event_data = json.loads(msg.get_body().decode('utf-8'))
        
        # Initialize AI orchestrator
        orchestrator = AIIntegrationOrchestrator(get_config())
        
        # Process event
        result = orchestrator.process_business_event(event_data)
        
        # Log success
        logging.info(f"Successfully processed event {event_data['event_id']}")
        
        return result
        
    except Exception as e:
        logging.error(f"Error processing event: {str(e)}")
        raise
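
The function above calls get_config(), which is not defined in this material. A minimal sketch, assuming the connection strings and endpoint map are provided through application settings (environment variables); the setting names are illustrative:

import os

def get_config():
    """Assumed helper: load integration settings from app settings / environment variables."""
    return {
        "servicebus_connection_string": os.environ["ServiceBusConnectionString"],
        "eventhub_connection_string": os.environ["EventHubConnectionString"],
        # json is imported at the top of this module
        "ai_endpoints": json.loads(os.environ.get("AI_ENDPOINTS", "{}")),
    }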

🏢 Enterprise AI Architecture Workshop

Workshop Project: Integrated AI Business Solution

PROJECT: INTELLIGENT DOCUMENT PROCESSING SYSTEM

BUSINESS SCENARIO:
A large organization receives 1000+ documents daily (contracts, invoices, reports)
Current process: manual review, data entry, routing
Target: 80% automation with human oversight for complex cases

SYSTEM COMPONENTS:

1. DOCUMENT INTAKE:
   - Multiple input channels (email, web portal, API)
   - Automatic classification and routing
   - Duplicate detection and handling (see the hashing sketch after this project outline)

2. AI PROCESSING PIPELINE:
   - OCR and text extraction
   - Document type classification
   - Key information extraction
   - Risk and compliance analysis

3. BUSINESS SYSTEM INTEGRATION:
   - ERP system data sync
   - CRM integration for customer documents
   - Workflow automation for approvals
   - Audit trail for compliance

4. HUMAN OVERSIGHT:
   - Review queue for low-confidence results
   - Approval workflows for high-value documents
   - Feedback system for continuous improvement
   - Exception handling procedures

IMPLEMENTATION ARCHITECTURE:

[DOCUMENT INPUTS] → [API GATEWAY]    → [AZURE FUNCTIONS] → [AI SERVICES]
        ↓                 ↓                   ↓                 ↓
 [BLOB STORAGE]   → [SERVICE BUS]    → [LOGIC APPS]      → [COSMOS DB]
        ↓                 ↓                   ↓                 ↓
   [ERP/CRM]      ← [POWER PLATFORM] ← [MONITORING]      ← [RESULTS API]


DELIVERABLES:
- Working prototype processing 3 document types
- Integration with a mock business system
- Monitoring dashboard with key metrics
- Documentation for the operations team
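
For the duplicate-detection requirement in the intake component, a simple content-hash check is usually enough as a first pass. A minimal sketch using only the standard library; in production the hash set would live in a database or cache rather than in memory:

import hashlib

seen_hashes = set()  # in production: a database table or distributed cache

def is_duplicate(document_bytes: bytes) -> bool:
    """Return True if an identical document has already been ingested."""
    digest = hashlib.sha256(document_bytes).hexdigest()
    if digest in seen_hashes:
        return True
    seen_hashes.add(digest)
    return False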

Session 21: Multimodal AI Solutions and Edge Applications (06.11.2025)

🎭 Multimodal AI Applications

Understanding Multimodal AI

MULTIMODAL AI CAPABILITIES:

TEXT + VISION:
- Document analysis with images and text
- Visual question answering
- Image captioning and description
- Chart and diagram interpretation

TEXT + AUDIO:
- Voice assistants with natural conversation
- Audio content analysis with transcription
- Sentiment analysis from voice tone
- Multi-language support

VISION + AUDIO:
- Video content analysis
- Real-time scene understanding
- Activity recognition in video
- Audio-visual synchronization

TEXT + VISION + AUDIO:
- Complete media understanding
- Interactive AI assistants
- Comprehensive content analysis
- Rich user experience interfaces
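
The code below covers the text + vision combination. For text + audio, a minimal sketch could pair the Azure Speech SDK with the Azure Language sentiment API; the function name, parameters and key/endpoint handling are assumptions for illustration:

import azure.cognitiveservices.speech as speechsdk
from azure.ai.textanalytics import TextAnalyticsClient
from azure.core.credentials import AzureKeyCredential

def analyze_spoken_feedback(audio_file, speech_key, speech_region,
                            language_endpoint, language_key):
    """Transcribe an audio file, then run sentiment analysis on the transcript."""
    # Speech-to-text (single utterance; long audio would need continuous recognition)
    speech_config = speechsdk.SpeechConfig(subscription=speech_key, region=speech_region)
    audio_config = speechsdk.audio.AudioConfig(filename=audio_file)
    recognizer = speechsdk.SpeechRecognizer(speech_config=speech_config, audio_config=audio_config)
    transcript = recognizer.recognize_once().text

    # Sentiment analysis on the transcript
    text_client = TextAnalyticsClient(language_endpoint, AzureKeyCredential(language_key))
    sentiment = text_client.analyze_sentiment([transcript])[0]

    return {
        "transcript": transcript,
        "sentiment": sentiment.sentiment,
        "scores": {
            "positive": sentiment.confidence_scores.positive,
            "neutral": sentiment.confidence_scores.neutral,
            "negative": sentiment.confidence_scores.negative,
        },
    }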

GPT-4 Vision Implementation

import base64
from openai import AzureOpenAI

class MultimodalAIProcessor:
    def __init__(self, azure_openai_config):
        self.client = AzureOpenAI(**azure_openai_config)
        self.vision_model = "gpt-4-vision-preview"
    
    def analyze_image_with_context(self, image_path, text_context, analysis_type="general"):
        """Analyze image z textual context using GPT-4 Vision"""
        
        # Encode image
        with open(image_path, "rb") as image_file:
            image_data = base64.b64encode(image_file.read()).decode('utf-8')
        
        # Construct multimodal prompt
        if analysis_type == "business_document":
            prompt = f"""
Analyze this business document image in the context of: {text_context}

Extract and analyze:
1. Document type and structure
2. Key information and data points
3. Quality and completeness assessment
4. Potential issues or red flags
5. Recommended actions

Provide a structured JSON response with confidence scores.
"""
        elif analysis_type == "technical_diagram":
            prompt = f"""
Analyze this technical diagram related to: {text_context}

Identify and explain:
1. Main components and their relationships
2. Data flow or process flow
3. Technical specifications visible
4. Potential improvements or issues
5. Implementation recommendations

Format the response as a technical analysis report.
"""
        else:
            prompt = f"""
Analyze this image in the context of: {text_context}

Describe the key content, notable details, and anything relevant to the given context.
Provide a structured JSON response with confidence scores.
"""
        
        # Make API call
        response = self.client.chat.completions.create(
            model=self.vision_model,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{image_data}",
                                "detail": "high"
                            }
                        }
                    ]
                }
            ],
            max_tokens=1500
        )
        
        return {
            "analysis": response.choices[0].message.content,
            "image_path": image_path,
            "context": text_context,
            "analysis_type": analysis_type
        }
    
    def process_video_content(self, video_path, analysis_objectives):
        """Process video content z multimodal analysis"""
        
        # Extract key frames
        key_frames = self._extract_key_frames(video_path, frame_count=10)
        
        # Extract audio and transcribe
        audio_transcription = self._transcribe_video_audio(video_path)
        
        # Analyze each frame with context
        frame_analyses = []
        for i, frame in enumerate(key_frames):
            timestamp = i * (len(audio_transcription) / len(key_frames))
            relevant_text = self._get_relevant_transcript(audio_transcription, timestamp)
            
            frame_analysis = self.analyze_image_with_context(
                frame, relevant_text, "video_frame"
            )
            frame_analyses.append(frame_analysis)
        
        # Synthesize complete video understanding
        complete_analysis = self._synthesize_video_analysis(
            frame_analyses, audio_transcription, analysis_objectives
        )
        
        return complete_analysis
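
process_video_content relies on several helpers that are not shown here. A minimal sketch of _extract_key_frames as a method of MultimodalAIProcessor, using OpenCV with uniform sampling rather than true scene detection; the saved-frame naming is an assumption:

    def _extract_key_frames(self, video_path, frame_count=10):
        """Sample frames evenly across the video and save them as JPEGs."""
        import cv2
        cap = cv2.VideoCapture(video_path)
        total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        frame_paths = []
        for idx in range(frame_count):
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx * total / frame_count))
            ok, frame = cap.read()
            if ok:
                path = f"{video_path}_frame_{idx}.jpg"
                cv2.imwrite(path, frame)
                frame_paths.append(path)
        cap.release()
        return frame_paths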

🖥️ Edge AI Implementation

Edge Computing for AI Workloads

EDGE AI ARCHITECTURE:

CLOUD SERVICES:
- Model training and optimization
- Central management and updates
- Analytics and insights aggregation
- Global coordination

EDGE DEVICES:
- Local model inference
- Real-time decision making
- Offline operation capability
- Data preprocessing and filtering

COMMUNICATION:
- Model synchronization
- Telemetry and monitoring data
- Critical alert escalation
- Batch data uploads

EDGE AI BENEFITS:
✓ Low latency responses
✓ Privacy-preserving processing
✓ Reduced bandwidth costs
✓ Offline operation capability
✓ Compliance with data residency requirements

Azure IoT Edge with AI Modules

# edge_ai_module.py for Azure IoT Edge

import asyncio
import json
from azure.iot.device import IoTHubDeviceClient, Message
import cv2
import numpy as np
import onnxruntime as ort

class EdgeAIModule:
    def __init__(self, connection_string, model_path):
        self.device_client = IoTHubDeviceClient.create_from_connection_string(connection_string)
        self.model_session = ort.InferenceSession(model_path)
        self.processing_stats = {
            "total_processed": 0,
            "average_latency": 0,
            "error_count": 0
        }
    
    async def start_processing(self):
        """Start edge AI processing loop"""
        
        await self.device_client.connect()
        
        # Setup message handlers
        self.device_client.on_message_received = self._handle_cloud_message
        
        # Start local processing
        while True:
            try:
                # Process local data (e.g., camera feed)
                local_data = await self._capture_local_data()
                
                if local_data:
                    result = await self._process_with_ai(local_data)
                    
                    # Send results to the cloud if significant
                    if self._is_significant_result(result):
                        await self._send_to_cloud(result)
                    
                    # Update local statistics
                    self._update_stats(result)
                
                await asyncio.sleep(1)  # Process every second
                
            except Exception as e:
                self.processing_stats["error_count"] += 1
                print(f"Error w processing: {e}")
                await asyncio.sleep(5)  # Wait before retry
    
    async def _process_with_ai(self, input_data):
        """Run AI inference on edge device"""
        
        import time
        start_time = time.time()
        
        # Preprocess input for the model
        preprocessed = self._preprocess_input(input_data)
        
        # Run inference
        inputs = {self.model_session.get_inputs()[0].name: preprocessed}
        outputs = self.model_session.run(None, inputs)
        
        # Post-process results
        result = self._postprocess_output(outputs[0])
        
        # Calculate latency
        latency = (time.time() - start_time) * 1000  # ms
        
        return {
            "result": result,
            "latency_ms": latency,
            "timestamp": time.time(),
            "confidence": self._calculate_confidence(outputs[0])
        }
    
    def _preprocess_input(self, input_data):
        """Preprocess input dla edge model"""
        
        if isinstance(input_data, np.ndarray):
            # Image processing
            if len(input_data.shape) == 3:  # Color image
                # Resize to the model's expected size
                resized = cv2.resize(input_data, (224, 224))
                # Normalize pixel values
                normalized = resized.astype(np.float32) / 255.0
                # Add batch dimension
                batched = np.expand_dims(normalized, axis=0)
                return batched
        
        elif isinstance(input_data, str):
            # Text processing - tokenization would go here
            # This is simplified for the example
            return np.array([[len(input_data)]], dtype=np.float32)
        
        return input_data
    
    async def _send_to_cloud(self, result):
        """Send significant results do cloud dla further processing"""
        
        message_data = {
            "device_id": "edge-device-001",
            "timestamp": result["timestamp"],
            "result": result["result"],
            "confidence": result["confidence"],
            "latency_ms": result["latency_ms"]
        }
        
        message = Message(json.dumps(message_data))
        message.message_id = f"edge-result-{int(result['timestamp'])}"
        message.correlation_id = "ai-processing"
        message.custom_properties["result_type"] = "ai_inference"
        
        await self.device_client.send_message(message)

# Deployment configuration for IoT Edge
edge_deployment_manifest = {
    "modulesContent": {
        "$edgeAgent": {
            "properties.desired": {
                "schemaVersion": "1.1",
                "runtime": {
                    "type": "docker",
                    "settings": {
                        "minDockerVersion": "v1.25"
                    }
                },
                "systemModules": {
                    "edgeAgent": {
                        "type": "docker",
                        "settings": {
                            "image": "mcr.microsoft.com/azureiotedge-agent:1.4",
                            "createOptions": "{}"
                        }
                    },
                    "edgeHub": {
                        "type": "docker",
                        "status": "running",
                        "restartPolicy": "always",
                        "settings": {
                            "image": "mcr.microsoft.com/azureiotedge-hub:1.4",
                            "createOptions": "{\"HostConfig\":{\"PortBindings\":{\"5671/tcp\":[{\"HostPort\":\"5671\"}],\"8883/tcp\":[{\"HostPort\":\"8883\"}],\"443/tcp\":[{\"HostPort\":\"443\"}]}}}"
                        }
                    }
                },
                "modules": {
                    "aiProcessingModule": {
                        "type": "docker",
                        "status": "running",
                        "restartPolicy": "always",
                        "settings": {
                            "image": "your-registry.azurecr.io/ai-edge-module:latest",
                            "createOptions": "{\"HostConfig\":{\"Binds\":[\"/dev/video0:/dev/video0\"],\"Privileged\":true}}"
                        },
                        "env": {
                            "MODEL_PATH": {"value": "/app/models/optimized_model.onnx"},
                            "CONFIDENCE_THRESHOLD": {"value": "0.7"}
                        }
                    }
                }
            }
        }
    }
}
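
The manifest above is a Python dict; to deploy it, it is typically serialized to JSON and pushed to a device, for example with the Azure CLI IoT extension. A minimal sketch (the hub name is a placeholder):

import json

with open("deployment.json", "w") as f:
    json.dump(edge_deployment_manifest, f, indent=2)

# Then, for example:
#   az iot edge set-modules --device-id edge-device-001 \
#       --hub-name <your-iot-hub> --content deployment.json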

📱 Mobile and Edge Device Integration

ONNX Model Optimization for Edge

import onnx
import onnxruntime as ort
import numpy as np
from onnxruntime.quantization import quantize_dynamic, QuantType

class EdgeModelOptimizer:
    def __init__(self):
        self.optimization_techniques = [
            "quantization",
            "pruning", 
            "knowledge_distillation",
            "tensor_compression"
        ]
    
    def optimize_for_edge(self, model_path, target_device="cpu"):
        """Optimize model dla edge deployment"""
        
        optimizations_applied = []
        
        # Load original model
        original_model = onnx.load(model_path)
        current_model_path = model_path
        
        # Apply quantization
        if target_device == "cpu":
            quantized_path = model_path.replace(".onnx", "_quantized.onnx")
            quantize_dynamic(current_model_path, quantized_path, weight_type=QuantType.QUInt8)
            current_model_path = quantized_path
            optimizations_applied.append("int8_quantization")
        
        # Measure performance improvement
        performance_metrics = self._benchmark_model_performance(
            original_path=model_path,
            optimized_path=current_model_path,
            target_device=target_device
        )
        
        return {
            "optimized_model_path": current_model_path,
            "optimizations_applied": optimizations_applied,
            "performance_improvement": performance_metrics,
            "model_size_reduction": self._calculate_size_reduction(model_path, current_model_path)
        }
    
    def _benchmark_model_performance(self, original_path, optimized_path, target_device):
        """Benchmark performance between original i optimized models"""
        
        import time
        
        # Load both models
        original_session = ort.InferenceSession(original_path)
        optimized_session = ort.InferenceSession(optimized_path)
        
        # Generate test input
        test_input = np.random.randn(1, 3, 224, 224).astype(np.float32)
        input_name = original_session.get_inputs()[0].name
        
        # Benchmark original model
        original_times = []
        for _ in range(100):
            start = time.time()
            original_session.run(None, {input_name: test_input})
            original_times.append((time.time() - start) * 1000)
        
        # Benchmark optimized model  
        optimized_times = []
        for _ in range(100):
            start = time.time()
            optimized_session.run(None, {input_name: test_input})
            optimized_times.append((time.time() - start) * 1000)
        
        return {
            "original_avg_latency_ms": np.mean(original_times),
            "optimized_avg_latency_ms": np.mean(optimized_times),
            "speedup_factor": np.mean(original_times) / np.mean(optimized_times),
            "latency_reduction_pct": (1 - np.mean(optimized_times) / np.mean(original_times)) * 100
        }
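
optimize_for_edge also calls _calculate_size_reduction, which is not shown above. A simple sketch based on on-disk file sizes, written as a method of EdgeModelOptimizer:

    def _calculate_size_reduction(self, original_path, optimized_path):
        """Report model size on disk before and after optimization."""
        import os
        original_mb = os.path.getsize(original_path) / (1024 * 1024)
        optimized_mb = os.path.getsize(optimized_path) / (1024 * 1024)
        return {
            "original_size_mb": round(original_mb, 2),
            "optimized_size_mb": round(optimized_mb, 2),
            "reduction_pct": round((1 - optimized_mb / original_mb) * 100, 1),
        }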

Real-time Multimodal Processing

import cv2
import time
import threading
import queue
import numpy as np
from concurrent.futures import ThreadPoolExecutor

class RealTimeMultimodalProcessor:
    def __init__(self, ai_models_config):
        self.vision_model = self._load_vision_model(ai_models_config["vision"])
        self.audio_model = self._load_audio_model(ai_models_config["audio"])
        self.text_model = self._load_text_model(ai_models_config["text"])
        
        self.frame_queue = queue.Queue(maxsize=10)
        self.audio_queue = queue.Queue(maxsize=50)
        self.results_queue = queue.Queue()
        
        self.executor = ThreadPoolExecutor(max_workers=4)
    
    def start_realtime_processing(self):
        """Start real-time multimodal processing"""
        
        # Start capture threads
        video_thread = threading.Thread(target=self._capture_video)
        audio_thread = threading.Thread(target=self._capture_audio)
        
        video_thread.start()
        audio_thread.start()
        
        # Start processing threads
        self.executor.submit(self._process_video_stream)
        self.executor.submit(self._process_audio_stream)
        self.executor.submit(self._fuse_multimodal_results)
        
        return {"status": "started", "threads": ["video", "audio", "processing"]}
    
    def _process_video_stream(self):
        """Process video frames w real-time"""
        
        while True:
            try:
                if not self.frame_queue.empty():
                    frame = self.frame_queue.get(timeout=1)
                    
                    # Run vision AI
                    vision_result = self._analyze_frame(frame)
                    
                    # Add timestamp and queue result
                    vision_result["timestamp"] = time.time()
                    vision_result["modality"] = "vision"
                    self.results_queue.put(vision_result)
                    
            except queue.Empty:
                continue
            except Exception as e:
                print(f"Video processing error: {e}")
    
    def _process_audio_stream(self):
        """Process audio chunks w real-time"""
        
        audio_buffer = []
        
        while True:
            try:
                if not self.audio_queue.empty():
                    audio_chunk = self.audio_queue.get(timeout=1)
                    audio_buffer.append(audio_chunk)
                    
                    # Process when buffer reaches optimal size
                    if len(audio_buffer) >= 16:  # ~1 second of audio
                        combined_audio = np.concatenate(audio_buffer)
                        
                        # Run audio AI
                        audio_result = self._analyze_audio(combined_audio)
                        
                        # Add timestamp and queue result
                        audio_result["timestamp"] = time.time()
                        audio_result["modality"] = "audio"
                        self.results_queue.put(audio_result)
                        
                        # Clear buffer
                        audio_buffer = []
                        
            except queue.Empty:
                continue
            except Exception as e:
                print(f"Audio processing error: {e}")
    
    def _fuse_multimodal_results(self):
        """Combine results from different modalities"""
        
        result_buffer = {"vision": [], "audio": [], "text": []}
        
        while True:
            try:
                if not self.results_queue.empty():
                    result = self.results_queue.get(timeout=1)
                    modality = result["modality"]
                    
                    # Add to the appropriate buffer
                    result_buffer[modality].append(result)
                    
                    # Fuse results when we have data from multiple modalities
                    if len(result_buffer["vision"]) > 0 and len(result_buffer["audio"]) > 0:
                        fused_result = self._create_fused_understanding(result_buffer)
                        
                        # Send the fused result for decision making
                        self._handle_fused_result(fused_result)
                        
                        # Clear processed results
                        result_buffer = {"vision": [], "audio": [], "text": []}
                        
            except queue.Empty:
                continue
            except Exception as e:
                print(f"Fusion processing error: {e}")

Session 22: Building a Comprehensive AI Solution (13.11.2025)

🏗️ Enterprise AI Solution Architecture

Complete System Design Workshop

CAPSTONE PROJECT: INTELLIGENT BUSINESS AUTOMATION PLATFORM

BUSINESS REQUIREMENTS:
- Multi-channel customer interaction (web, mobile, voice, email)
- Intelligent document processing and workflow automation
- Real-time analytics and decision support
- Compliance and audit trail maintenance
- Scalable architecture for future growth

TECHNICAL ARCHITECTURE:

FRONTEND LAYER:
- Web application (React/Next.js)
- Mobile apps (React Native)
- Voice interface (Azure Bot Framework)
- Admin dashboard (Power BI)

API GATEWAY LAYER:
- Azure API Management
- Authentication and authorization
- Rate limiting and throttling
- Request/response transformation

AI SERVICES LAYER:
- Azure OpenAI for conversational AI
- Azure Cognitive Services for document processing
- Custom ML models for domain-specific tasks
- Azure Bot Services for multi-channel interaction

BUSINESS LOGIC LAYER:
- Azure Functions for serverless processing
- Logic Apps for workflow automation
- Service Bus for messaging
- Azure SQL Database for transactional data

ANALYTICS LAYER:
- Azure Synapse for data warehousing
- Azure Analysis Services for OLAP
- Power BI for reporting and dashboards
- Azure Monitor for operational insights

INFRASTRUCTURE LAYER:
- Azure Kubernetes Service for containerized workloads
- Azure Container Registry for image management
- Azure Key Vault for secrets management
- Azure Virtual Network for security

Implementation Framework

from dataclasses import dataclass
from typing import Dict, List, Any
import asyncio
import json
import time

@dataclass
class AIProcessingRequest:
    request_id: str
    input_type: str  # "text", "image", "audio", "document"
    content: Any
    metadata: Dict[str, Any]
    priority: int = 1

@dataclass 
class AIProcessingResult:
    request_id: str
    results: Dict[str, Any]
    confidence_scores: Dict[str, float]
    processing_time_ms: float
    errors: List[str] = None

class ComprehensiveAIOrchestrator:
    def __init__(self, services_config):
        self.text_processor = TextAnalysisService(services_config["text"])
        self.vision_processor = VisionAnalysisService(services_config["vision"])
        self.audio_processor = AudioAnalysisService(services_config["audio"])
        self.document_processor = DocumentAnalysisService(services_config["document"])
        
        self.request_queue = asyncio.Queue(maxsize=1000)
        self.result_store = {}
        self.active_processors = 0
    
    async def process_request(self, request: AIProcessingRequest) -> AIProcessingResult:
        """Process AI request through appropriate services"""
        
        start_time = time.time()
        results = {}
        confidence_scores = {}
        errors = []
        
        try:
            if request.input_type == "text":
                text_result = await self.text_processor.analyze(request.content)
                results["text_analysis"] = text_result["analysis"]
                confidence_scores["text"] = text_result["confidence"]
                
            elif request.input_type == "image":
                vision_result = await self.vision_processor.analyze(request.content)
                results["vision_analysis"] = vision_result["analysis"]
                confidence_scores["vision"] = vision_result["confidence"]
                
            elif request.input_type == "audio":
                # First transcribe, then analyze text
                transcription = await self.audio_processor.transcribe(request.content)
                text_analysis = await self.text_processor.analyze(transcription["text"])
                
                results["transcription"] = transcription
                results["text_analysis"] = text_analysis["analysis"]
                confidence_scores["audio"] = transcription["confidence"]
                confidence_scores["text"] = text_analysis["confidence"]
                
            elif request.input_type == "document":
                # Multi-step document processing
                document_result = await self.document_processor.analyze(request.content)
                
                # Extract text and analyze
                if document_result["extracted_text"]:
                    text_analysis = await self.text_processor.analyze(document_result["extracted_text"])
                    results["text_analysis"] = text_analysis["analysis"]
                    confidence_scores["text"] = text_analysis["confidence"]
                
                # Analyze any images in the document
                if document_result["extracted_images"]:
                    image_analyses = []
                    for image in document_result["extracted_images"]:
                        img_result = await self.vision_processor.analyze(image)
                        image_analyses.append(img_result)
                    results["image_analyses"] = image_analyses
                
                results["document_structure"] = document_result["structure"]
                confidence_scores["document"] = document_result["confidence"]
                
            # Cross-modal analysis if multiple types of content
            if len(results) > 1:
                cross_modal_result = await self._perform_cross_modal_analysis(results)
                results["cross_modal_insights"] = cross_modal_result
                confidence_scores["cross_modal"] = cross_modal_result["confidence"]
                
        except Exception as e:
            errors.append(f"Processing error: {str(e)}")
        
        processing_time = (time.time() - start_time) * 1000
        
        return AIProcessingResult(
            request_id=request.request_id,
            results=results,
            confidence_scores=confidence_scores,
            processing_time_ms=processing_time,
            errors=errors if errors else None
        )
    
    async def _perform_cross_modal_analysis(self, individual_results):
        """Perform analysis across different modalities"""
        
        # Extract key insights from each modality
        text_insights = individual_results.get("text_analysis", {}).get("key_points", [])
        vision_insights = individual_results.get("vision_analysis", {}).get("objects", [])
        
        # Use an LLM for cross-modal reasoning
        cross_modal_prompt = f"""
Analyze the following multimodal information and provide integrated insights:

TEXT ANALYSIS INSIGHTS:
{json.dumps(text_insights, indent=2)}

VISION ANALYSIS INSIGHTS:
{json.dumps(vision_insights, indent=2)}

Provide:
1. Correlations between the text and visual content
2. Inconsistencies or conflicts between modalities
3. Combined understanding that wouldn't be possible with a single modality
4. Confidence assessment for the integrated analysis

Format the response as structured JSON.
"""
        
        cross_modal_response = await self.text_processor.generate_response(cross_modal_prompt)
        
        return {
            "integrated_insights": cross_modal_response["content"],
            "confidence": min(
                individual_results.get("text_analysis", {}).get("confidence", 0),
                individual_results.get("vision_analysis", {}).get("confidence", 0)
            ),
            "correlation_strength": self._assess_correlation_strength(individual_results)
        }
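
A minimal usage sketch for the orchestrator; the services_config dict and the TextAnalysisService-style wrapper classes are assumed to exist elsewhere in the project:

async def main():
    orchestrator = ComprehensiveAIOrchestrator(services_config)  # assumed config dict
    request = AIProcessingRequest(
        request_id="req-001",
        input_type="text",
        content="Customer reports repeated delays in invoice approvals.",
        metadata={"channel": "email"},
    )
    result = await orchestrator.process_request(request)
    print(result.results, result.confidence_scores, result.processing_time_ms)

# asyncio.run(main())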

🏠 Capstone Project

Final Implementation Challenge

Project Requirements:

  1. Complete multimodal AI system processing text, images, and audio
  2. Edge deployment with optimized models
  3. Enterprise integration with mock business systems
  4. Real-time monitoring and alerting
  5. Comprehensive documentation for operations

Technical Deliverables:

  • Working prototype with all modalities
  • Deployed edge components
  • Monitoring dashboard
  • Performance benchmarks
  • Scalability analysis

Business Deliverables:

  • ROI analysis for the proposed solution
  • Implementation roadmap for production
  • Risk assessment and mitigation strategies
  • Training plan for the operations team

💡 Advanced AI Patterns and Best Practices

Enterprise AI Success Factors

CRITICAL SUCCESS FACTORS:

1. BUSINESS ALIGNMENT:
   - Clear ROI metrics and measurement
   - Stakeholder buy-in and change management
   - Realistic expectations and timeline
   - Continuous value demonstration

2. TECHNICAL EXCELLENCE:
   - Robust architecture and scalability planning
   - Comprehensive testing and validation
   - Security and compliance by design
   - Operational excellence and monitoring

3. ORGANIZATIONAL READINESS:
   - AI literacy and training programs
   - Data governance and quality processes
   - Cross-functional collaboration
   - Continuous learning culture

4. RISK MANAGEMENT:
   - Ethical AI principles and governance
   - Bias detection and mitigation
   - Privacy and security protection
   - Regulatory compliance

💡 Tip

Each session is 2 hours of intensive learning with hands-on exercises. The materials can be reviewed at your own pace.

📈 Progress

Track your progress in learning AI and preparing for the Azure AI-102 certification. Each module builds on the previous one.