Sesja 13: Wdrażanie dużych modeli językowych

Praktyczne deployment i konfiguracja LLM

🎯 Cele sesji

  • Konfiguracja Azure OpenAI Service dla enterprise
  • Deployment różnych modeli LLM (GPT-4/5, Embeddings)
  • Optymalizacja wydajności i kosztów
  • Best practices dla production deployments

🧠 Architektura dużych modeli językowych

Podstawy LLM

Large Language Models to sieci neuronowe wytrenowane na ogromnych zbiorach tekstów, zdolne do generowania i rozumienia języka naturalnego.

Kluczowe komponenty:

  • Tokenizacja - podział tekstu na tokeny
  • Embeddingi - reprezentacja tokenów w przestrzeni wektorowej
  • Attention mechanism - mechanizm uwagi łączący tokeny
  • Transformer layers - warstwy przetwarzające informacje

Przegląd architektury Transformer

TEKST WEJŚCIOWY → TOKENIZACJA → EMBEDDING → WARSTWY ATTENTION → GENEROWANIE

Proces przetwarzania:

  1. Input Processing - tokenizacja i embedding
  2. Multi-Head Attention - analiza relacji między tokenami
  3. Feed-Forward Networks - nieliniowe transformacje
  4. Layer Normalization - stabilizacja treningu
  5. Output Generation - generowanie następnych tokenów

🔧 Azure OpenAI Service

Konfiguracja i deployment

import openai
from azure.identity import DefaultAzureCredential
from azure.ai.ml import MLClient
import os

class AzureOpenAIManager:
    def __init__(self, subscription_id, resource_group, workspace_name):
        self.credential = DefaultAzureCredential()
        self.ml_client = MLClient(
            credential=self.credential,
            subscription_id=subscription_id,
            resource_group_name=resource_group,
            workspace_name=workspace_name
        )
        
        # Konfiguracja Azure OpenAI
        openai.api_type = "azure"
        openai.api_base = os.getenv("AZURE_OPENAI_ENDPOINT")
        openai.api_version = "2024-02-01"
        openai.api_key = os.getenv("AZURE_OPENAI_KEY")
    
    def deploy_language_model(self, model_name, deployment_name, capacity=10):
        """Deployment modelu językowego w Azure OpenAI"""
        
        deployment_config = {
            "model": {
                "format": "OpenAI",
                "name": model_name,  # np. "gpt-4", "gpt-35-turbo"
                "version": "latest"
            },
            "scale_settings": {
                "scale_type": "Standard",
                "capacity": capacity  # TPM (Tokens Per Minute)
            }
        }
        
        print(f"Deploying {model_name} as {deployment_name}...")
        
        # W rzeczywistości deployment odbywa się przez Azure Portal lub CLI
        # To jest przykład konfiguracji
        
        return {
            "deployment_name": deployment_name,
            "model": model_name,
            "capacity": capacity,
            "status": "deployed",
            "endpoint": f"https://{deployment_name}.openai.azure.com/"
        }
    
    def test_model_deployment(self, deployment_name, test_prompts):
        """Testowanie wdrożonego modelu"""
        
        results = []
        
        for prompt in test_prompts:
            try:
                response = openai.ChatCompletion.create(
                    engine=deployment_name,
                    messages=[
                        {"role": "system", "content": "Jesteś pomocnym asystentem AI."},
                        {"role": "user", "content": prompt}
                    ],
                    max_tokens=150,
                    temperature=0.7
                )
                
                results.append({
                    "prompt": prompt,
                    "response": response.choices[0].message.content,
                    "tokens_used": response.usage.total_tokens,
                    "status": "success"
                })
                
            except Exception as e:
                results.append({
                    "prompt": prompt,
                    "error": str(e),
                    "status": "error"
                })
        
        return results

Wybór odpowiedniego modelu

class ModelSelector:
    def __init__(self):
        self.model_capabilities = {
            "gpt-4-turbo": {
                "max_tokens": 128000,
                "strengths": ["complex reasoning", "code generation", "analysis"],
                "best_for": ["technical documentation", "complex analysis", "creative writing"],
                "cost_tier": "high",
                "latency": "medium"
            },
            "gpt-35-turbo": {
                "max_tokens": 16385,
                "strengths": ["fast response", "cost effective", "general purpose"],
                "best_for": ["chatbots", "simple qa", "content generation"],
                "cost_tier": "low", 
                "latency": "low"
            },
            "gpt-4-vision": {
                "max_tokens": 128000,
                "strengths": ["image understanding", "multimodal", "visual analysis"],
                "best_for": ["document analysis", "image description", "visual qa"],
                "cost_tier": "high",
                "latency": "medium"
            },
            "text-embedding-ada-002": {
                "max_tokens": 8192,
                "strengths": ["semantic similarity", "search", "clustering"],
                "best_for": ["semantic search", "recommendations", "similarity"],
                "cost_tier": "very_low",
                "latency": "very_low"
            }
        }
    
    def recommend_model(self, use_case_requirements):
        """Rekomendacja modelu na podstawie wymagań"""
        
        requirements = use_case_requirements
        recommendations = []
        
        for model, capabilities in self.model_capabilities.items():
            score = 0
            reasons = []
            
            # Ocena na podstawie przypadku użycia
            if requirements.get("task_complexity") == "high" and "complex reasoning" in capabilities["strengths"]:
                score += 3
                reasons.append("excellent for complex reasoning")
            
            if requirements.get("response_time") == "fast" and capabilities["latency"] == "low":
                score += 2
                reasons.append("fast response times")
            
            if requirements.get("cost_sensitivity") == "high" and capabilities["cost_tier"] in ["low", "very_low"]:
                score += 2
                reasons.append("cost effective")
            
            if requirements.get("context_length") and requirements["context_length"] <= capabilities["max_tokens"]:
                score += 1
                reasons.append("supports required context length")
            
            # Sprawdź czy model obsługuje wymagane funkcje
            required_features = requirements.get("features", [])
            for feature in required_features:
                if feature in capabilities["strengths"]:
                    score += 2
                    reasons.append(f"supports {feature}")
            
            if score > 0:
                recommendations.append({
                    "model": model,
                    "score": score,
                    "reasons": reasons,
                    "capabilities": capabilities
                })
        
        # Sortuj według score
        recommendations.sort(key=lambda x: x["score"], reverse=True)
        
        return recommendations
    
    def calculate_cost_estimate(self, model_name, monthly_usage):
        """Oszacowanie miesięcznych kosztów"""
        
        # Przykładowe ceny (mogą się zmieniać)
        pricing = {
            "gpt-4-turbo": {"input": 0.01, "output": 0.03},  # per 1K tokens
            "gpt-35-turbo": {"input": 0.0005, "output": 0.0015},
            "gpt-4-vision": {"input": 0.01, "output": 0.03},
            "text-embedding-ada-002": {"input": 0.0001, "output": 0}
        }
        
        if model_name not in pricing:
            return {"error": "Model pricing not available"}
        
        model_pricing = pricing[model_name]
        
        # Oszacowanie na podstawie użycia
        input_tokens = monthly_usage.get("input_tokens", 0)
        output_tokens = monthly_usage.get("output_tokens", 0)
        
        input_cost = (input_tokens / 1000) * model_pricing["input"]
        output_cost = (output_tokens / 1000) * model_pricing["output"]
        
        return {
            "model": model_name,
            "monthly_cost": input_cost + output_cost,
            "breakdown": {
                "input_cost": input_cost,
                "output_cost": output_cost,
                "input_tokens": input_tokens,
                "output_tokens": output_tokens
            }
        }

⚡ Optymalizacja wydajności

Strategie optymalizacji

import time
import asyncio
from typing import List, Dict, Optional

class LLMOptimizer:
    def __init__(self, deployment_name):
        self.deployment_name = deployment_name
        self.response_cache = {}
        self.token_usage_tracker = {}
        self.performance_metrics = {
            "total_requests": 0,
            "cache_hits": 0,
            "average_latency": 0,
            "total_tokens": 0
        }
    
    async def optimized_completion(self, messages, **kwargs):
        """Optymalizowane wywołanie API z cache i monitoring"""
        
        # Generowanie klucza cache
        cache_key = self._generate_cache_key(messages, kwargs)
        
        # Sprawdź cache
        if cache_key in self.response_cache:
            self.performance_metrics["cache_hits"] += 1
            return self.response_cache[cache_key]
        
        # Optymalizacja parametrów
        optimized_params = self._optimize_parameters(messages, kwargs)
        
        # Wywołanie API z monitoring
        start_time = time.time()
        
        try:
            response = await self._make_api_call_with_retry(messages, optimized_params)
            
            # Monitoring wydajności
            latency = (time.time() - start_time) * 1000
            self._update_performance_metrics(response, latency)
            
            # Cache odpowiedzi
            if response and optimized_params.get("temperature", 0.7) < 0.3:
                # Cache tylko dla deterministycznych odpowiedzi
                self.response_cache[cache_key] = response
            
            return response
            
        except Exception as e:
            print(f"API call failed: {str(e)}")
            raise
    
    def _optimize_parameters(self, messages, kwargs):
        """Dynamiczna optymalizacja parametrów API"""
        
        # Analiza długości wejścia
        total_length = sum(len(msg["content"]) for msg in messages)
        
        optimized = kwargs.copy()
        
        # Optymalizacja max_tokens
        if total_length < 500:
            optimized["max_tokens"] = min(kwargs.get("max_tokens", 1000), 200)
        elif total_length > 3000:
            optimized["max_tokens"] = min(kwargs.get("max_tokens", 1000), 500)
        
        # Optymalizacja temperature dla różnych zadań
        if any("analyze" in msg["content"].lower() for msg in messages):
            optimized["temperature"] = 0.1  # Więcej determinizmu dla analizy
        elif any("create" in msg["content"].lower() for msg in messages):
            optimized["temperature"] = 0.8  # Więcej kreatywności
        
        # Optymalizacja presence_penalty
        if "conversation" in str(messages).lower():
            optimized["presence_penalty"] = 0.6  # Zmniejsz powtarzanie
        
        return optimized
    
    async def batch_process(self, requests: List[Dict], batch_size: int = 5):
        """Przetwarzanie wsadowe zapytań"""
        
        results = []
        
        # Podziel na batche
        for i in range(0, len(requests), batch_size):
            batch = requests[i:i + batch_size]
            
            # Przetwarzaj batch równolegle
            batch_tasks = [
                self.optimized_completion(
                    messages=req["messages"],
                    **req.get("parameters", {})
                )
                for req in batch
            ]
            
            batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)
            results.extend(batch_results)
            
            # Rate limiting - pauza między batchami
            await asyncio.sleep(1)
        
        return results
    
    def get_performance_report(self):
        """Raport wydajności systemu"""
        
        metrics = self.performance_metrics.copy()
        
        # Oblicz dodatkowe metryki
        if metrics["total_requests"] > 0:
            cache_hit_rate = (metrics["cache_hits"] / metrics["total_requests"]) * 100
            metrics["cache_hit_rate"] = f"{cache_hit_rate:.1f}%"
        
        # Tokeny na request
        if metrics["total_requests"] > 0:
            metrics["avg_tokens_per_request"] = metrics["total_tokens"] / metrics["total_requests"]
        
        return {
            "performance_metrics": metrics,
            "optimization_recommendations": self._get_optimization_recommendations()
        }
    
    def _get_optimization_recommendations(self):
        """Rekomendacje optymalizacji na podstawie metryk"""
        
        recommendations = []
        
        metrics = self.performance_metrics
        
        # Rekomendacje na podstawie cache hit rate
        if metrics["total_requests"] > 100:
            cache_rate = (metrics["cache_hits"] / metrics["total_requests"]) * 100
            if cache_rate < 20:
                recommendations.append(
                    "Consider reducing temperature for more deterministic responses to improve cache hit rate"
                )
        
        # Rekomendacje na podstawie średniej latencji
        if metrics["average_latency"] > 5000:  # 5 sekund
            recommendations.append("High latency detected - consider using smaller model or reducing max_tokens")
        
        # Rekomendacje na podstawie użycia tokenów
        if metrics["total_requests"] > 50:
            avg_tokens = metrics["total_tokens"] / metrics["total_requests"]
            if avg_tokens > 1000:
                recommendations.append("High token usage - consider prompt optimization or response length limits")
        
        return recommendations

🛡️ Production best practices

Monitorowanie i obsługa błędów

import logging
from azure.monitor.opentelemetry import configure_azure_monitor
import opentelemetry.trace as trace

class ProductionLLMService:
    def __init__(self, deployment_name, monitoring_connection_string=None):
        self.deployment_name = deployment_name
        
        # Konfiguracja monitoring
        if monitoring_connection_string:
            configure_azure_monitor(connection_string=monitoring_connection_string)
            self.tracer = trace.get_tracer(__name__)
        
        # Konfiguracja logowania
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
        
    async def safe_completion(self, messages, max_retries=3, **kwargs):
        """Bezpieczne wywołanie API z retry logic i monitoring"""
        
        with self.tracer.start_as_current_span("llm_completion") as span:
            span.set_attribute("deployment", self.deployment_name)
            span.set_attribute("message_count", len(messages))
            
            last_error = None
            
            for attempt in range(max_retries):
                try:
                    # Walidacja wejścia
                    self._validate_input(messages, kwargs)
                    
                    # Wywołanie API
                    response = await openai.ChatCompletion.acreate(
                        engine=self.deployment_name,
                        messages=messages,
                        **kwargs
                    )
                    
                    # Walidacja odpowiedzi
                    validated_response = self._validate_response(response)
                    
                    # Metryki sukcesu
                    span.set_attribute("success", True)
                    span.set_attribute("tokens_used", response.usage.total_tokens)
                    
                    self.logger.info(
                        f"LLM completion successful - tokens: {response.usage.total_tokens}"
                    )
                    
                    return validated_response
                    
                except openai.error.RateLimitError as e:
                    wait_time = 2 ** attempt  # Exponential backoff
                    self.logger.warning(f"Rate limit hit, waiting {wait_time}s (attempt {attempt + 1})")
                    await asyncio.sleep(wait_time)
                    last_error = e
                    
                except openai.error.ServiceUnavailableError as e:
                    wait_time = 5 * (attempt + 1)
                    self.logger.warning(f"Service unavailable, waiting {wait_time}s (attempt {attempt + 1})")
                    await asyncio.sleep(wait_time)
                    last_error = e
                    
                except Exception as e:
                    self.logger.error(f"Unexpected error in LLM completion: {str(e)}")
                    last_error = e
                    if attempt == max_retries - 1:  # Last attempt
                        break
                    await asyncio.sleep(1)
            
            # Wszystkie próby zakończone niepowodzeniem
            span.set_attribute("success", False)
            span.set_attribute("error", str(last_error))
            
            self.logger.error(f"LLM completion failed after {max_retries} attempts: {str(last_error)}")
            raise last_error
    
    def _validate_input(self, messages, kwargs):
        """Walidacja parametrów wejściowych"""
        
        # Sprawdź strukturę messages
        if not isinstance(messages, list) or len(messages) == 0:
            raise ValueError("Messages must be non-empty list")
        
        for msg in messages:
            if "role" not in msg or "content" not in msg:
                raise ValueError("Each message must have 'role' and 'content'")
            
            if msg["role"] not in ["system", "user", "assistant"]:
                raise ValueError("Invalid message role")
        
        # Sprawdź parametry
        max_tokens = kwargs.get("max_tokens", 1000)
        if max_tokens > 4000:
            self.logger.warning(f"High max_tokens value: {max_tokens}")
        
        temperature = kwargs.get("temperature", 0.7)
        if not 0 <= temperature <= 2:
            raise ValueError("Temperature must be between 0 and 2")
    
    def _validate_response(self, response):
        """Walidacja odpowiedzi API"""
        
        if not response or not response.choices:
            raise ValueError("Empty response from API")
        
        choice = response.choices[0]
        if not choice.message or not choice.message.content:
            raise ValueError("No content in API response")
        
        # Sprawdź finish_reason
        if choice.finish_reason == "content_filter":
            raise ValueError("Response blocked by content filter")
        
        return response
    
    async def health_check(self):
        """Sprawdzenie zdrowia usługi"""
        
        try:
            test_response = await self.safe_completion(
                messages=[{"role": "user", "content": "Test"}],
                max_tokens=5,
                temperature=0
            )
            
            return {
                "status": "healthy",
                "deployment": self.deployment_name,
                "response_time_ms": getattr(test_response, 'response_ms', None)
            }
            
        except Exception as e:
            return {
                "status": "unhealthy",
                "deployment": self.deployment_name,
                "error": str(e)
            }

✅ Zadania praktyczne

Zadanie 1: Deployment modeli (45 min)

  1. Skonfiguruj Azure OpenAI Service
  2. Wdróż GPT-4 Turbo i GPT-3.5 Turbo
  3. Przetestuj oba modele z różnymi promptami
  4. Porównaj wydajność i koszty

Zadanie 2: Optymalizacja performance (30 min)

  1. Zaimplementuj system cache dla odpowiedzi
  2. Dodaj monitoring użycia tokenów
  3. Skonfiguruj automatyczne retry logic
  4. Przetestuj pod obciążeniem

Zadanie 3: Production monitoring (30 min)

  1. Skonfiguruj Application Insights
  2. Zaimplementuj custom metryki
  3. Stwórz dashboard monitoringu
  4. Skonfiguruj alerty

Zadanie 4: Cost optimization (15 min)

  1. Przeanalizuj patter użycia
  2. Zoptymalizuj parametry modeli
  3. Zaimplementuj budżetowe alarmy
  4. Przygotuj raport ROI

📊 Metryki sukcesu

  • Dostępność systemu > 99.5%
  • Średnia latencja < 3 sekundy
  • Cache hit rate > 30%
  • Cost per request optymalizacja o 20%

📚 Materiały dodatkowe

💡 Wskazówka

Każda sesja to 2 godziny intensywnej nauki z praktycznymi ćwiczeniami. Materiały można przeglądać w dowolnym tempie.

📈 Postęp

Śledź swój postęp w nauce AI i przygotowaniu do certyfikacji Azure AI-102. Każdy moduł buduje na poprzednim.