Sesja 13: Wdrażanie dużych modeli językowych
Praktyczne deployment i konfiguracja LLM
🎯 Cele sesji
- Konfiguracja Azure OpenAI Service dla enterprise
- Deployment różnych modeli LLM (GPT-4/5, Embeddings)
- Optymalizacja wydajności i kosztów
- Best practices dla production deployments
🧠 Architektura dużych modeli językowych
Podstawy LLM
Large Language Models to sieci neuronowe wytrenowane na ogromnych zbiorach tekstów, zdolne do generowania i rozumienia języka naturalnego.
Kluczowe komponenty:
- Tokenizacja - podział tekstu na tokeny
- Embeddingi - reprezentacja tokenów w przestrzeni wektorowej
- Attention mechanism - mechanizm uwagi łączący tokeny
- Transformer layers - warstwy przetwarzające informacje
Przegląd architektury Transformer
TEKST WEJŚCIOWY → TOKENIZACJA → EMBEDDING → WARSTWY ATTENTION → GENEROWANIE
Proces przetwarzania:
- Input Processing - tokenizacja i embedding
- Multi-Head Attention - analiza relacji między tokenami
- Feed-Forward Networks - nieliniowe transformacje
- Layer Normalization - stabilizacja treningu
- Output Generation - generowanie następnych tokenów
🔧 Azure OpenAI Service
Konfiguracja i deployment
import openai
from azure.identity import DefaultAzureCredential
from azure.ai.ml import MLClient
import os
class AzureOpenAIManager:
def __init__(self, subscription_id, resource_group, workspace_name):
self.credential = DefaultAzureCredential()
self.ml_client = MLClient(
credential=self.credential,
subscription_id=subscription_id,
resource_group_name=resource_group,
workspace_name=workspace_name
)
# Konfiguracja Azure OpenAI
openai.api_type = "azure"
openai.api_base = os.getenv("AZURE_OPENAI_ENDPOINT")
openai.api_version = "2024-02-01"
openai.api_key = os.getenv("AZURE_OPENAI_KEY")
def deploy_language_model(self, model_name, deployment_name, capacity=10):
"""Deployment modelu językowego w Azure OpenAI"""
deployment_config = {
"model": {
"format": "OpenAI",
"name": model_name, # np. "gpt-4", "gpt-35-turbo"
"version": "latest"
},
"scale_settings": {
"scale_type": "Standard",
"capacity": capacity # TPM (Tokens Per Minute)
}
}
print(f"Deploying {model_name} as {deployment_name}...")
# W rzeczywistości deployment odbywa się przez Azure Portal lub CLI
# To jest przykład konfiguracji
return {
"deployment_name": deployment_name,
"model": model_name,
"capacity": capacity,
"status": "deployed",
"endpoint": f"https://{deployment_name}.openai.azure.com/"
}
def test_model_deployment(self, deployment_name, test_prompts):
"""Testowanie wdrożonego modelu"""
results = []
for prompt in test_prompts:
try:
response = openai.ChatCompletion.create(
engine=deployment_name,
messages=[
{"role": "system", "content": "Jesteś pomocnym asystentem AI."},
{"role": "user", "content": prompt}
],
max_tokens=150,
temperature=0.7
)
results.append({
"prompt": prompt,
"response": response.choices[0].message.content,
"tokens_used": response.usage.total_tokens,
"status": "success"
})
except Exception as e:
results.append({
"prompt": prompt,
"error": str(e),
"status": "error"
})
return results
Wybór odpowiedniego modelu
class ModelSelector:
def __init__(self):
self.model_capabilities = {
"gpt-4-turbo": {
"max_tokens": 128000,
"strengths": ["complex reasoning", "code generation", "analysis"],
"best_for": ["technical documentation", "complex analysis", "creative writing"],
"cost_tier": "high",
"latency": "medium"
},
"gpt-35-turbo": {
"max_tokens": 16385,
"strengths": ["fast response", "cost effective", "general purpose"],
"best_for": ["chatbots", "simple qa", "content generation"],
"cost_tier": "low",
"latency": "low"
},
"gpt-4-vision": {
"max_tokens": 128000,
"strengths": ["image understanding", "multimodal", "visual analysis"],
"best_for": ["document analysis", "image description", "visual qa"],
"cost_tier": "high",
"latency": "medium"
},
"text-embedding-ada-002": {
"max_tokens": 8192,
"strengths": ["semantic similarity", "search", "clustering"],
"best_for": ["semantic search", "recommendations", "similarity"],
"cost_tier": "very_low",
"latency": "very_low"
}
}
def recommend_model(self, use_case_requirements):
"""Rekomendacja modelu na podstawie wymagań"""
requirements = use_case_requirements
recommendations = []
for model, capabilities in self.model_capabilities.items():
score = 0
reasons = []
# Ocena na podstawie przypadku użycia
if requirements.get("task_complexity") == "high" and "complex reasoning" in capabilities["strengths"]:
score += 3
reasons.append("excellent for complex reasoning")
if requirements.get("response_time") == "fast" and capabilities["latency"] == "low":
score += 2
reasons.append("fast response times")
if requirements.get("cost_sensitivity") == "high" and capabilities["cost_tier"] in ["low", "very_low"]:
score += 2
reasons.append("cost effective")
if requirements.get("context_length") and requirements["context_length"] <= capabilities["max_tokens"]:
score += 1
reasons.append("supports required context length")
# Sprawdź czy model obsługuje wymagane funkcje
required_features = requirements.get("features", [])
for feature in required_features:
if feature in capabilities["strengths"]:
score += 2
reasons.append(f"supports {feature}")
if score > 0:
recommendations.append({
"model": model,
"score": score,
"reasons": reasons,
"capabilities": capabilities
})
# Sortuj według score
recommendations.sort(key=lambda x: x["score"], reverse=True)
return recommendations
def calculate_cost_estimate(self, model_name, monthly_usage):
"""Oszacowanie miesięcznych kosztów"""
# Przykładowe ceny (mogą się zmieniać)
pricing = {
"gpt-4-turbo": {"input": 0.01, "output": 0.03}, # per 1K tokens
"gpt-35-turbo": {"input": 0.0005, "output": 0.0015},
"gpt-4-vision": {"input": 0.01, "output": 0.03},
"text-embedding-ada-002": {"input": 0.0001, "output": 0}
}
if model_name not in pricing:
return {"error": "Model pricing not available"}
model_pricing = pricing[model_name]
# Oszacowanie na podstawie użycia
input_tokens = monthly_usage.get("input_tokens", 0)
output_tokens = monthly_usage.get("output_tokens", 0)
input_cost = (input_tokens / 1000) * model_pricing["input"]
output_cost = (output_tokens / 1000) * model_pricing["output"]
return {
"model": model_name,
"monthly_cost": input_cost + output_cost,
"breakdown": {
"input_cost": input_cost,
"output_cost": output_cost,
"input_tokens": input_tokens,
"output_tokens": output_tokens
}
}
⚡ Optymalizacja wydajności
Strategie optymalizacji
import time
import asyncio
from typing import List, Dict, Optional
class LLMOptimizer:
def __init__(self, deployment_name):
self.deployment_name = deployment_name
self.response_cache = {}
self.token_usage_tracker = {}
self.performance_metrics = {
"total_requests": 0,
"cache_hits": 0,
"average_latency": 0,
"total_tokens": 0
}
async def optimized_completion(self, messages, **kwargs):
"""Optymalizowane wywołanie API z cache i monitoring"""
# Generowanie klucza cache
cache_key = self._generate_cache_key(messages, kwargs)
# Sprawdź cache
if cache_key in self.response_cache:
self.performance_metrics["cache_hits"] += 1
return self.response_cache[cache_key]
# Optymalizacja parametrów
optimized_params = self._optimize_parameters(messages, kwargs)
# Wywołanie API z monitoring
start_time = time.time()
try:
response = await self._make_api_call_with_retry(messages, optimized_params)
# Monitoring wydajności
latency = (time.time() - start_time) * 1000
self._update_performance_metrics(response, latency)
# Cache odpowiedzi
if response and optimized_params.get("temperature", 0.7) < 0.3:
# Cache tylko dla deterministycznych odpowiedzi
self.response_cache[cache_key] = response
return response
except Exception as e:
print(f"API call failed: {str(e)}")
raise
def _optimize_parameters(self, messages, kwargs):
"""Dynamiczna optymalizacja parametrów API"""
# Analiza długości wejścia
total_length = sum(len(msg["content"]) for msg in messages)
optimized = kwargs.copy()
# Optymalizacja max_tokens
if total_length < 500:
optimized["max_tokens"] = min(kwargs.get("max_tokens", 1000), 200)
elif total_length > 3000:
optimized["max_tokens"] = min(kwargs.get("max_tokens", 1000), 500)
# Optymalizacja temperature dla różnych zadań
if any("analyze" in msg["content"].lower() for msg in messages):
optimized["temperature"] = 0.1 # Więcej determinizmu dla analizy
elif any("create" in msg["content"].lower() for msg in messages):
optimized["temperature"] = 0.8 # Więcej kreatywności
# Optymalizacja presence_penalty
if "conversation" in str(messages).lower():
optimized["presence_penalty"] = 0.6 # Zmniejsz powtarzanie
return optimized
async def batch_process(self, requests: List[Dict], batch_size: int = 5):
"""Przetwarzanie wsadowe zapytań"""
results = []
# Podziel na batche
for i in range(0, len(requests), batch_size):
batch = requests[i:i + batch_size]
# Przetwarzaj batch równolegle
batch_tasks = [
self.optimized_completion(
messages=req["messages"],
**req.get("parameters", {})
)
for req in batch
]
batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)
results.extend(batch_results)
# Rate limiting - pauza między batchami
await asyncio.sleep(1)
return results
def get_performance_report(self):
"""Raport wydajności systemu"""
metrics = self.performance_metrics.copy()
# Oblicz dodatkowe metryki
if metrics["total_requests"] > 0:
cache_hit_rate = (metrics["cache_hits"] / metrics["total_requests"]) * 100
metrics["cache_hit_rate"] = f"{cache_hit_rate:.1f}%"
# Tokeny na request
if metrics["total_requests"] > 0:
metrics["avg_tokens_per_request"] = metrics["total_tokens"] / metrics["total_requests"]
return {
"performance_metrics": metrics,
"optimization_recommendations": self._get_optimization_recommendations()
}
def _get_optimization_recommendations(self):
"""Rekomendacje optymalizacji na podstawie metryk"""
recommendations = []
metrics = self.performance_metrics
# Rekomendacje na podstawie cache hit rate
if metrics["total_requests"] > 100:
cache_rate = (metrics["cache_hits"] / metrics["total_requests"]) * 100
if cache_rate < 20:
recommendations.append(
"Consider reducing temperature for more deterministic responses to improve cache hit rate"
)
# Rekomendacje na podstawie średniej latencji
if metrics["average_latency"] > 5000: # 5 sekund
recommendations.append("High latency detected - consider using smaller model or reducing max_tokens")
# Rekomendacje na podstawie użycia tokenów
if metrics["total_requests"] > 50:
avg_tokens = metrics["total_tokens"] / metrics["total_requests"]
if avg_tokens > 1000:
recommendations.append("High token usage - consider prompt optimization or response length limits")
return recommendations
🛡️ Production best practices
Monitorowanie i obsługa błędów
import logging
from azure.monitor.opentelemetry import configure_azure_monitor
import opentelemetry.trace as trace
class ProductionLLMService:
def __init__(self, deployment_name, monitoring_connection_string=None):
self.deployment_name = deployment_name
# Konfiguracja monitoring
if monitoring_connection_string:
configure_azure_monitor(connection_string=monitoring_connection_string)
self.tracer = trace.get_tracer(__name__)
# Konfiguracja logowania
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
async def safe_completion(self, messages, max_retries=3, **kwargs):
"""Bezpieczne wywołanie API z retry logic i monitoring"""
with self.tracer.start_as_current_span("llm_completion") as span:
span.set_attribute("deployment", self.deployment_name)
span.set_attribute("message_count", len(messages))
last_error = None
for attempt in range(max_retries):
try:
# Walidacja wejścia
self._validate_input(messages, kwargs)
# Wywołanie API
response = await openai.ChatCompletion.acreate(
engine=self.deployment_name,
messages=messages,
**kwargs
)
# Walidacja odpowiedzi
validated_response = self._validate_response(response)
# Metryki sukcesu
span.set_attribute("success", True)
span.set_attribute("tokens_used", response.usage.total_tokens)
self.logger.info(
f"LLM completion successful - tokens: {response.usage.total_tokens}"
)
return validated_response
except openai.error.RateLimitError as e:
wait_time = 2 ** attempt # Exponential backoff
self.logger.warning(f"Rate limit hit, waiting {wait_time}s (attempt {attempt + 1})")
await asyncio.sleep(wait_time)
last_error = e
except openai.error.ServiceUnavailableError as e:
wait_time = 5 * (attempt + 1)
self.logger.warning(f"Service unavailable, waiting {wait_time}s (attempt {attempt + 1})")
await asyncio.sleep(wait_time)
last_error = e
except Exception as e:
self.logger.error(f"Unexpected error in LLM completion: {str(e)}")
last_error = e
if attempt == max_retries - 1: # Last attempt
break
await asyncio.sleep(1)
# Wszystkie próby zakończone niepowodzeniem
span.set_attribute("success", False)
span.set_attribute("error", str(last_error))
self.logger.error(f"LLM completion failed after {max_retries} attempts: {str(last_error)}")
raise last_error
def _validate_input(self, messages, kwargs):
"""Walidacja parametrów wejściowych"""
# Sprawdź strukturę messages
if not isinstance(messages, list) or len(messages) == 0:
raise ValueError("Messages must be non-empty list")
for msg in messages:
if "role" not in msg or "content" not in msg:
raise ValueError("Each message must have 'role' and 'content'")
if msg["role"] not in ["system", "user", "assistant"]:
raise ValueError("Invalid message role")
# Sprawdź parametry
max_tokens = kwargs.get("max_tokens", 1000)
if max_tokens > 4000:
self.logger.warning(f"High max_tokens value: {max_tokens}")
temperature = kwargs.get("temperature", 0.7)
if not 0 <= temperature <= 2:
raise ValueError("Temperature must be between 0 and 2")
def _validate_response(self, response):
"""Walidacja odpowiedzi API"""
if not response or not response.choices:
raise ValueError("Empty response from API")
choice = response.choices[0]
if not choice.message or not choice.message.content:
raise ValueError("No content in API response")
# Sprawdź finish_reason
if choice.finish_reason == "content_filter":
raise ValueError("Response blocked by content filter")
return response
async def health_check(self):
"""Sprawdzenie zdrowia usługi"""
try:
test_response = await self.safe_completion(
messages=[{"role": "user", "content": "Test"}],
max_tokens=5,
temperature=0
)
return {
"status": "healthy",
"deployment": self.deployment_name,
"response_time_ms": getattr(test_response, 'response_ms', None)
}
except Exception as e:
return {
"status": "unhealthy",
"deployment": self.deployment_name,
"error": str(e)
}
✅ Zadania praktyczne
Zadanie 1: Deployment modeli (45 min)
- Skonfiguruj Azure OpenAI Service
- Wdróż GPT-4 Turbo i GPT-3.5 Turbo
- Przetestuj oba modele z różnymi promptami
- Porównaj wydajność i koszty
Zadanie 2: Optymalizacja performance (30 min)
- Zaimplementuj system cache dla odpowiedzi
- Dodaj monitoring użycia tokenów
- Skonfiguruj automatyczne retry logic
- Przetestuj pod obciążeniem
Zadanie 3: Production monitoring (30 min)
- Skonfiguruj Application Insights
- Zaimplementuj custom metryki
- Stwórz dashboard monitoringu
- Skonfiguruj alerty
Zadanie 4: Cost optimization (15 min)
- Przeanalizuj patter użycia
- Zoptymalizuj parametry modeli
- Zaimplementuj budżetowe alarmy
- Przygotuj raport ROI
📊 Metryki sukcesu
- Dostępność systemu > 99.5%
- Średnia latencja < 3 sekundy
- Cache hit rate > 30%
- Cost per request optymalizacja o 20%