Session 18: Workshop - Model Deployment and MLOps Automation
Production-ready ML systems
🎯 Workshop Goals
- Implement a complete MLOps pipeline, from training to production
- Automate deployment and monitoring of ML models
- Blue-green deployment strategies for ML models
- Real-time monitoring and automated rollback procedures
🏗️ Workshop Project: End-to-End MLOps System
Business Scenario
GlobalTech Corporation needs a production-ready ML system for:
- Predictive maintenance - predicting machine failures
- Fraud detection - flagging suspicious transactions
- Customer churn prediction - identifying customers at risk of leaving
Technical requirements (see the config sketch below):
- Automated retraining when performance degrades
- A/B testing dla model variants
- <100ms inference latency
- 99.9% availability SLA
- Comprehensive audit trail
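To make these targets enforceable in code, here is a minimal sketch of collecting them in a single configuration object that the later quality gates, verification tests, and monitors could import. The class and field names are illustrative assumptions, not part of the workshop codebase.
from dataclasses import dataclass, field

@dataclass(frozen=True)
class SLAConfig:
    """Illustrative container for the workshop's SLA targets (names are assumptions)."""
    max_inference_latency_ms: int = 100      # <100 ms inference latency
    min_availability_percent: float = 99.9   # 99.9% availability SLA
    retraining_metric: str = "f1_score"      # metric watched for automated retraining
    retraining_threshold: float = 0.85       # retrain when the metric drops below this
    ab_test_traffic_split: dict = field(default_factory=lambda: {"champion": 90, "challenger": 10})

SLA = SLAConfig()  # imported by quality gates, verification tests, and monitors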
💻 Implementing the Complete System
MLOps Pipeline Architecture
from azure.ai.ml import MLClient, command, Input, Output
from azure.ai.ml.entities import Environment, Model, ManagedOnlineEndpoint, ManagedOnlineDeployment
from azure.ai.ml.dsl import pipeline
import asyncio
import json
from datetime import datetime, timedelta
class ProductionMLOpsSystem:
def __init__(self, ml_client, config):
self.ml_client = ml_client
self.config = config
self.deployment_strategies = {
"blue_green": self._blue_green_deployment,
"canary": self._canary_deployment,
"rolling": self._rolling_deployment
}
@pipeline(description="Production ML Pipeline with Automated Quality Gates")
def create_production_pipeline(
self,
training_data: Input(type="uri_folder"),
model_name: str,
performance_threshold: float = 0.85,
deployment_strategy: str = "blue_green"
):
"""Complete MLOps pipeline z automated deployment"""
# Step 1: Data validation and preprocessing
data_validation = command(
name="validate_data",
display_name="Data Quality Validation",
code="./src/data_validation",
command="""
python validate_data.py
--input-data ${{inputs.training_data}}
--validation-report ${{outputs.validation_report}}
--quality-threshold 0.95
""",
environment="azureml://registries/azureml/environments/sklearn-1.5/versions/1",
inputs={"training_data": training_data},
outputs={"validation_report": Output(type="uri_file")},
compute="cpu-cluster"
)
# Step 2: Feature engineering with drift detection
feature_engineering = command(
name="feature_engineering",
display_name="Feature Engineering with Drift Detection",
code="./src/feature_engineering",
command="""
python engineer_features.py
--raw-data ${{inputs.training_data}}
--baseline-features ${{inputs.baseline_features}}
--engineered-features ${{outputs.features}}
--drift-report ${{outputs.drift_report}}
""",
environment="azureml://registries/azureml/environments/sklearn-1.5/versions/1",
inputs={
"training_data": training_data,
"baseline_features": Input(
type="uri_folder",
path="azureml://datastores/workspaceblobstore/paths/baseline-features/"
)
},
outputs={
"features": Output(type="uri_folder"),
"drift_report": Output(type="uri_file")
},
compute="cpu-cluster"
)
# Step 3: Model training with hyperparameter optimization
model_training = command(
name="train_model",
display_name="Model Training with HPO",
code="./src/training",
command="""
python train_with_hpo.py
--features ${{inputs.features}}
--model-output ${{outputs.trained_model}}
--experiment-tracking ${{outputs.experiment_results}}
--model-name {model_name}
--hpo-trials 20
""",
environment="azureml://registries/azureml/environments/sklearn-1.5/versions/1",
inputs={"features": feature_engineering.outputs.features},
outputs={
"trained_model": Output(type="mlflow_model"),
"experiment_results": Output(type="uri_file")
},
compute="gpu-cluster"
)
# Step 4: Comprehensive model evaluation
model_evaluation = command(
name="evaluate_model",
display_name="Model Evaluation with Bias Testing",
code="./src/evaluation",
command="""
python evaluate_model.py
--model ${{inputs.trained_model}}
--test-features ${{inputs.features}}
--baseline-performance ${{inputs.baseline_metrics}}
--evaluation-results ${{outputs.evaluation_metrics}}
--bias-report ${{outputs.bias_analysis}}
--performance-threshold {performance_threshold}
""",
environment="azureml://registries/azureml/environments/sklearn-1.5/versions/1",
inputs={
"trained_model": model_training.outputs.trained_model,
"features": feature_engineering.outputs.features,
"baseline_metrics": Input(
type="uri_file",
path="azureml://datastores/workspaceblobstore/paths/baseline-metrics.json"
)
},
outputs={
"evaluation_metrics": Output(type="uri_file"),
"bias_analysis": Output(type="uri_file")
},
compute="cpu-cluster"
)
# Step 5: Automated model registration with approval gates
model_registration = command(
name="register_model",
display_name="Conditional Model Registration",
code="./src/registration",
command="""
python register_model.py
--model ${{inputs.trained_model}}
--evaluation-metrics ${{inputs.evaluation_metrics}}
--bias-analysis ${{inputs.bias_analysis}}
--model-name {model_name}
--performance-threshold {performance_threshold}
--registration-result ${{outputs.registration_status}}
""",
environment="azureml://registries/azureml/environments/sklearn-1.5/versions/1",
inputs={
"trained_model": model_training.outputs.trained_model,
"evaluation_metrics": model_evaluation.outputs.evaluation_metrics,
"bias_analysis": model_evaluation.outputs.bias_analysis
},
outputs={"registration_status": Output(type="uri_file")},
compute="cpu-cluster"
)
# Step 6: Automated deployment with strategy selection
model_deployment = command(
name="deploy_model",
display_name="Automated Model Deployment",
code="./src/deployment",
command="""
python deploy_model.py
--registered-model {model_name}
--deployment-strategy {deployment_strategy}
--registration-status ${{inputs.registration_status}}
--deployment-result ${{outputs.deployment_status}}
--endpoint-config ${{inputs.deployment_config}}
""",
environment="azureml://registries/azureml/environments/sklearn-1.5/versions/1",
inputs={
"registration_status": model_registration.outputs.registration_status,
"deployment_config": Input(
type="uri_file",
path="azureml://datastores/workspaceblobstore/paths/deployment-config.json"
)
},
outputs={"deployment_status": Output(type="uri_file")},
compute="cpu-cluster"
)
return {
"trained_model": model_training.outputs.trained_model,
"evaluation_metrics": model_evaluation.outputs.evaluation_metrics,
"deployment_status": model_deployment.outputs.deployment_status
}
async def execute_production_pipeline(self, pipeline_config):
"""Wykonanie production pipeline"""
print("🚀 Starting production MLOps pipeline...")
# Create pipeline job
pipeline_job = self.ml_client.jobs.create_or_update(
self.create_production_pipeline(
training_data=Input(
type="uri_folder",
path=pipeline_config["training_data_path"]
),
model_name=pipeline_config["model_name"],
performance_threshold=pipeline_config.get("performance_threshold", 0.85),
deployment_strategy=pipeline_config.get("deployment_strategy", "blue_green")
)
)
print(f"✅ Pipeline submitted: {pipeline_job.name}")
# Monitor pipeline execution
pipeline_result = await self._monitor_pipeline_execution(pipeline_job.name)
return pipeline_result
async def _monitor_pipeline_execution(self, pipeline_name):
"""Monitor pipeline execution with real-time updates"""
while True:
# Get pipeline status
pipeline_job = self.ml_client.jobs.get(pipeline_name)
print(f"📊 Pipeline status: {pipeline_job.status}")
if pipeline_job.status == "Completed":
print("✅ Pipeline completed successfully!")
# Get pipeline outputs
outputs = pipeline_job.outputs
return {
"status": "completed",
"pipeline_name": pipeline_name,
"outputs": outputs,
"completion_time": datetime.utcnow().isoformat()
}
elif pipeline_job.status == "Failed":
print(f"❌ Pipeline failed: {pipeline_job.error}")
return {
"status": "failed",
"pipeline_name": pipeline_name,
"error": str(pipeline_job.error),
"failure_time": datetime.utcnow().isoformat()
}
elif pipeline_job.status in ["Running", "Preparing", "Queued"]:
print("⏳ Pipeline still running...")
await asyncio.sleep(60) # Check every minute
else:
print(f"🔄 Pipeline status: {pipeline_job.status}")
await asyncio.sleep(30)
async def setup_automated_monitoring(self, model_endpoint_name):
"""Setup comprehensive monitoring dla production model"""
monitoring_config = {
"performance_monitoring": {
"metrics": ["accuracy", "precision", "recall", "f1_score", "auc"],
"thresholds": {
"accuracy": 0.85,
"precision": 0.80,
"recall": 0.80,
"f1_score": 0.82
},
"evaluation_frequency": "daily",
"alert_channels": ["email", "teams", "slack"]
},
"operational_monitoring": {
"metrics": ["request_rate", "response_time", "error_rate", "availability"],
"thresholds": {
"response_time_ms": 100,
"error_rate_percent": 1.0,
"availability_percent": 99.9
},
"evaluation_frequency": "real-time",
"alert_channels": ["email", "pagerduty"]
},
"data_drift_monitoring": {
"baseline_period": "last_30_days",
"drift_threshold": 0.1,
"evaluation_frequency": "daily",
"features_to_monitor": "all",
"alert_channels": ["email", "teams"]
},
"business_metrics": {
"cost_per_prediction": 0.001, # $0.001 per prediction
"daily_prediction_volume": 100000,
"user_satisfaction_score": 4.0 # out of 5
}
}
# Implement monitoring logic
monitoring_system = ModelMonitoringSystem(
model_endpoint_name,
monitoring_config
)
await monitoring_system.initialize()
print(f"📊 Monitoring configured for endpoint: {model_endpoint_name}")
return monitoring_config
class ModelMonitoringSystem:
def __init__(self, endpoint_name, config):
self.endpoint_name = endpoint_name
self.config = config
self.alert_manager = AlertManager()
async def initialize(self):
"""Initialize monitoring system"""
# Setup performance monitoring
await self._setup_performance_monitoring()
# Setup operational monitoring
await self._setup_operational_monitoring()
# Setup data drift monitoring
await self._setup_drift_monitoring()
# Setup alerting rules
await self._configure_alerting()
print("✅ Monitoring system initialized")
async def _setup_performance_monitoring(self):
"""Setup model performance monitoring"""
performance_config = self.config["performance_monitoring"]
# Create a scheduled job for performance evaluation
performance_job = {
"name": f"{self.endpoint_name}-performance-monitor",
"schedule": self._convert_frequency_to_cron(performance_config["evaluation_frequency"]),
"script": "scripts/monitor_model_performance.py",
"parameters": {
"endpoint_name": self.endpoint_name,
"metrics": performance_config["metrics"],
"thresholds": performance_config["thresholds"]
}
}
print("📈 Performance monitoring configured")
return performance_job
async def _setup_drift_monitoring(self):
"""Setup data drift monitoring"""
drift_config = self.config["data_drift_monitoring"]
# Create drift detection job
drift_job = {
"name": f"{self.endpoint_name}-drift-monitor",
"schedule": self._convert_frequency_to_cron(drift_config["evaluation_frequency"]),
"script": "scripts/detect_data_drift.py",
"parameters": {
"endpoint_name": self.endpoint_name,
"baseline_period": drift_config["baseline_period"],
"drift_threshold": drift_config["drift_threshold"]
}
}
print("📊 Data drift monitoring configured")
return drift_job
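ModelMonitoringSystem relies on an AlertManager and a _convert_frequency_to_cron helper that are not shown in the snippet above. Below is a minimal sketch of both, meant to live in the same module/class; the cron mappings and the log-only alert transport are illustrative assumptions.
# Illustrative stubs for the helpers referenced above (not part of the Azure ML SDK)
class AlertManager:
    """Fan alerts out to the configured channels; this sketch only logs them."""
    def send(self, channel, message):
        print(f"[ALERT:{channel}] {message}")  # swap in e-mail/Teams/PagerDuty integrations as needed

# Intended as a method of ModelMonitoringSystem
def _convert_frequency_to_cron(self, frequency):
    """Map the human-readable frequencies used in the config to cron expressions (assumed time slots)."""
    mapping = {
        "real-time": "* * * * *",   # approximated as an every-minute schedule
        "hourly": "0 * * * *",
        "daily": "0 2 * * *",       # 02:00 UTC, an arbitrary assumed slot
        "weekly": "0 2 * * 1",
    }
    return mapping.get(frequency, "0 2 * * *")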
🛠️ Hands-on Workshop (120 min)
Step-by-step implementation
Step 1: Infrastructure Setup (30 min)
# Set up the complete MLOps infrastructure
async def setup_mlops_infrastructure():
"""Set up the complete infrastructure for MLOps"""
infrastructure_config = {
"resource_group": "rg-mlops-workshop",
"workspace_name": "mlops-workspace",
"compute_clusters": [
{
"name": "cpu-cluster",
"size": "Standard_DS3_v2",
"min_nodes": 0,
"max_nodes": 4
},
{
"name": "gpu-cluster",
"size": "Standard_NC6s_v3",
"min_nodes": 0,
"max_nodes": 2
}
],
"datastores": [
"training-data",
"model-artifacts",
"monitoring-data"
],
"environments": [
"sklearn-production",
"pytorch-training",
"monitoring-env"
]
}
# Deploy infrastructure using Azure CLI commands
setup_commands = [
f"az group create --name {infrastructure_config['resource_group']} --location eastus",
f"az ml workspace create --name {infrastructure_config['workspace_name']} --resource-group {infrastructure_config['resource_group']}",
"az ml compute create --name cpu-cluster --type AmlCompute --size Standard_DS3_v2 --min-instances 0 --max-instances 4",
"az ml compute create --name gpu-cluster --type AmlCompute --size Standard_NC6s_v3 --min-instances 0 --max-instances 2"
]
print("🏗️ Setting up MLOps infrastructure...")
for command in setup_commands:
print(f"Executing: {command}")
# In a real run these commands would be executed (see the subprocess sketch after this step)
print("✅ Infrastructure setup completed")
return infrastructure_config
# Workshop setup
workshop_config = {
"subscription_id": "your-subscription-id",
"resource_group": "rg-mlops-workshop",
"workspace_name": "mlops-workspace",
"model_name": "churn-prediction-model",
"training_data_path": "azureml://datastores/workspaceblobstore/paths/churn-data/"
}
# Initialize MLOps system
ml_client = MLClient.from_config()
mlops_system = ProductionMLOpsSystem(ml_client, workshop_config)
# Setup infrastructure
infrastructure = await setup_mlops_infrastructure()
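The loop in setup_mlops_infrastructure only prints the CLI commands. Here is a minimal sketch of actually executing them, assuming the Azure CLI with the ml extension is installed and you are logged in; error handling is intentionally simple.
import shlex
import subprocess

def run_az_commands(commands):
    """Run each Azure CLI command and stop on the first failure (illustrative helper)."""
    for cmd in commands:
        print(f"Executing: {cmd}")
        result = subprocess.run(shlex.split(cmd), capture_output=True, text=True)
        if result.returncode != 0:
            raise RuntimeError(f"Command failed: {cmd}\n{result.stderr}")
        print(result.stdout)

# run_az_commands(setup_commands)  # call this inside setup_mlops_infrastructure() instead of printing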
Step 2: Pipeline Development (45 min)
# Implementing the training and evaluation scripts
# src/data_validation/validate_data.py
class DataQualityValidator:
def __init__(self):
self.quality_checks = [
self._check_completeness,
self._check_consistency,
self._check_validity,
self._check_distribution_stability
]
def validate_dataset(self, data_path, quality_threshold=0.95):
"""Comprehensive data quality validation"""
import pandas as pd
# Load dataset
df = pd.read_csv(data_path + "/training_data.csv")
validation_results = {
"total_records": len(df),
"quality_score": 0,
"checks_passed": 0,
"total_checks": len(self.quality_checks),
"issues": [],
"recommendations": []
}
# Run all quality checks
for check in self.quality_checks:
try:
check_result = check(df)
if check_result["passed"]:
validation_results["checks_passed"] += 1
else:
validation_results["issues"].extend(check_result["issues"])
validation_results["recommendations"].extend(check_result["recommendations"])
except Exception as e:
validation_results["issues"].append(f"Check failed: {str(e)}")
# Calculate overall quality score
validation_results["quality_score"] = validation_results["checks_passed"] / validation_results["total_checks"]
# Determine if data passes quality threshold
validation_results["passes_threshold"] = validation_results["quality_score"] >= quality_threshold
if not validation_results["passes_threshold"]:
raise ValueError(f"Data quality score {validation_results['quality_score']:.2f} below threshold {quality_threshold}")
return validation_results
def _check_completeness(self, df):
"""Check for missing values and completeness"""
missing_percentages = df.isnull().sum() / len(df)
high_missing_cols = missing_percentages[missing_percentages > 0.1].index.tolist()
return {
"passed": len(high_missing_cols) == 0,
"issues": [f"Column '{col}' has {missing_percentages[col]:.1%} missing values" for col in high_missing_cols],
"recommendations": ["Consider imputation strategies dla high-missing columns"] if high_missing_cols else []
}
def _check_distribution_stability(self, df):
"""Check for distribution shifts in key features"""
# Simplified check - in practice this would compare against the baseline distribution
numeric_cols = df.select_dtypes(include=['number']).columns
distribution_issues = []
for col in numeric_cols:
# Check for outliers
Q1 = df[col].quantile(0.25)
Q3 = df[col].quantile(0.75)
IQR = Q3 - Q1
outlier_count = len(df[(df[col] < Q1 - 1.5*IQR) | (df[col] > Q3 + 1.5*IQR)])
outlier_percentage = outlier_count / len(df)
if outlier_percentage > 0.05: # >5% outliers
distribution_issues.append(f"Column '{col}' has {outlier_percentage:.1%} outliers")
return {
"passed": len(distribution_issues) == 0,
"issues": distribution_issues,
"recommendations": ["Review data collection process dla outlier handling"] if distribution_issues else []
}
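The validator registers _check_consistency and _check_validity, which are not shown above. A minimal sketch of what they could look like follows, intended to be added to DataQualityValidator; the duplicate-row tolerance and the finite-values rule are illustrative assumptions.
# Illustrative implementations of the remaining DataQualityValidator checks (assumed rules)
def _check_consistency(self, df):
    """Flag exact duplicate rows as a simple consistency signal."""
    duplicate_count = int(df.duplicated().sum())
    duplicate_ratio = duplicate_count / len(df)
    failed = duplicate_ratio > 0.01  # assumed tolerance: at most 1% duplicate rows
    return {
        "passed": not failed,
        "issues": [f"{duplicate_count} duplicate rows ({duplicate_ratio:.1%})"] if failed else [],
        "recommendations": ["Deduplicate records before training"] if failed else []
    }

def _check_validity(self, df):
    """Check that numeric columns contain only finite values (illustrative rule)."""
    import numpy as np
    numeric = df.select_dtypes(include=["number"])
    bad_cols = [c for c in numeric.columns if not np.isfinite(numeric[c].dropna()).all()]
    return {
        "passed": len(bad_cols) == 0,
        "issues": [f"Column '{c}' contains inf/-inf values" for c in bad_cols],
        "recommendations": ["Clip or drop non-finite values"] if bad_cols else []
    }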
# src/training/train_with_hpo.py
class HyperparameterOptimizer:
def __init__(self):
self.optimization_methods = {
"grid_search": self._grid_search_optimization,
"random_search": self._random_search_optimization,
"bayesian": self._bayesian_optimization
}
def optimize_model(self, X_train, y_train, X_val, y_val, trials=20):
"""Hyperparameter optimization z MLflow tracking"""
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import randint, uniform
# Define hyperparameter search space
param_distributions = {
'n_estimators': randint(50, 200),
'max_depth': [None, 10, 20, 30],
'min_samples_split': randint(2, 20),
'min_samples_leaf': randint(1, 10),
'max_features': ['sqrt', 'log2', None]
}
best_results = {"score": 0, "model": None, "params": {}}
with mlflow.start_run(run_name="hyperparameter_optimization"):
# Log search configuration
mlflow.log_params({
"optimization_method": "random_search",
"n_trials": trials,
"search_space": str(param_distributions)
})
# Perform hyperparameter search
base_model = RandomForestClassifier(random_state=42)
random_search = RandomizedSearchCV(
base_model,
param_distributions,
n_iter=trials,
cv=5,
scoring='f1_weighted',
random_state=42,
n_jobs=-1
)
# Fit the search
print("🔍 Starting hyperparameter optimization...")
random_search.fit(X_train, y_train)
# Get best model
best_model = random_search.best_estimator_
# Evaluate on validation set
val_predictions = best_model.predict(X_val)
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
val_metrics = {
"val_accuracy": accuracy_score(y_val, val_predictions),
"val_f1": f1_score(y_val, val_predictions, average='weighted'),
"val_precision": precision_score(y_val, val_predictions, average='weighted'),
"val_recall": recall_score(y_val, val_predictions, average='weighted')
}
# Log results
mlflow.log_params(random_search.best_params_)
mlflow.log_metrics(val_metrics)
mlflow.log_metric("cv_score", random_search.best_score_)
# Save model
mlflow.sklearn.log_model(best_model, "optimized_model")
print(f"✅ Optimization completed - Best CV Score: {random_search.best_score_:.3f}")
print(f"📊 Validation F1: {val_metrics['val_f1']:.3f}")
return {
"best_model": best_model,
"best_params": random_search.best_params_,
"cv_score": random_search.best_score_,
"validation_metrics": val_metrics
}
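A minimal usage sketch for the optimizer above, assuming the engineered features land in a single CSV with a binary "churn" target column; the file name and column name are assumptions for illustration.
import pandas as pd
from sklearn.model_selection import train_test_split

# Assumed layout: engineered features in one CSV with a binary "churn" label
df = pd.read_csv("features/engineered_features.csv")
X = df.drop(columns=["churn"])
y = df["churn"]
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)

optimizer = HyperparameterOptimizer()
result = optimizer.optimize_model(X_train, y_train, X_val, y_val, trials=20)
print(result["best_params"], result["validation_metrics"])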
Step 3: Automated Deployment (30 min)
class AutomatedDeploymentManager:
def __init__(self, ml_client):
self.ml_client = ml_client
async def deploy_with_quality_gates(self, model_name, model_version,
deployment_config):
"""Deploy model z automated quality gates"""
endpoint_name = f"{model_name}-endpoint"
try:
# Step 1: Pre-deployment validation
print("🔍 Running pre-deployment validation...")
validation_result = await self._validate_model_for_deployment(
model_name, model_version
)
if not validation_result["passed"]:
raise Exception(f"Model validation failed: {validation_result['issues']}")
# Step 2: Create/update endpoint
print("🔧 Configuring endpoint...")
endpoint = await self._ensure_endpoint_exists(endpoint_name)
# Step 3: Deploy using selected strategy
deployment_strategy = deployment_config.get("strategy", "blue_green")
if deployment_strategy == "blue_green":
deployment_result = await self._blue_green_deployment(
endpoint_name, model_name, model_version
)
elif deployment_strategy == "canary":
deployment_result = await self._canary_deployment(
endpoint_name, model_name, model_version,
deployment_config.get("canary_percentage", 10)
)
else:
raise ValueError(f"Unsupported deployment strategy: {deployment_strategy}")
# Step 4: Post-deployment verification
print("✅ Running post-deployment tests...")
verification_result = await self._verify_deployment(
endpoint_name, deployment_result["deployment_name"]
)
if verification_result["success"]:
print("🎉 Deployment successful and verified!")
# Setup monitoring
await self._setup_deployment_monitoring(
endpoint_name, deployment_result["deployment_name"]
)
return {
"status": "success",
"endpoint_name": endpoint_name,
"deployment_name": deployment_result["deployment_name"],
"verification": verification_result
}
else:
# Automatic rollback
print("❌ Verification failed, initiating rollback...")
await self._rollback_deployment(endpoint_name, deployment_result["deployment_name"])
return {
"status": "failed_verification",
"error": verification_result["error"],
"rollback_completed": True
}
except Exception as e:
print(f"❌ Deployment failed: {str(e)}")
return {
"status": "failed",
"error": str(e)
}
async def _blue_green_deployment(self, endpoint_name, model_name, model_version):
"""Blue-green deployment implementation"""
# Determine the current and new deployment colors
current_deployments = list(self.ml_client.online_deployments.list(endpoint_name))
if current_deployments:
current_color = current_deployments[0].name
new_color = "green" if current_color == "blue" else "blue"
else:
current_color = None
new_color = "blue"
print(f"🔵🟢 Blue-Green: Current={current_color}, New={new_color}")
# Create new deployment
new_deployment = ManagedOnlineDeployment(
name=new_color,
endpoint_name=endpoint_name,
model=f"{model_name}:{model_version}",
instance_type="Standard_DS3_v2",
instance_count=1,
environment_variables={
"DEPLOYMENT_COLOR": new_color,
"MODEL_VERSION": model_version
}
)
# Deploy
deployment_poller = self.ml_client.online_deployments.begin_create_or_update(new_deployment)
deployment_result = deployment_poller.result()
print(f"✅ {new_color} deployment created")
return {
"deployment_name": new_color,
"previous_deployment": current_color,
"endpoint_name": endpoint_name
}
async def _verify_deployment(self, endpoint_name, deployment_name):
"""Comprehensive deployment verification"""
verification_tests = [
self._test_basic_functionality,
self._test_performance_requirements,
self._test_error_handling,
self._test_load_capacity
]
verification_results = {
"success": True,
"tests_passed": 0,
"total_tests": len(verification_tests),
"test_results": []
}
for test_func in verification_tests:
try:
test_result = await test_func(endpoint_name, deployment_name)
verification_results["test_results"].append(test_result)
if test_result["passed"]:
verification_results["tests_passed"] += 1
print(f"✅ {test_result['test_name']}: PASSED")
else:
verification_results["success"] = False
print(f"❌ {test_result['test_name']}: FAILED - {test_result['reason']}")
except Exception as e:
verification_results["success"] = False
verification_results["test_results"].append({
"test_name": test_func.__name__,
"passed": False,
"error": str(e)
})
print(f"❌ {test_func.__name__}: ERROR - {str(e)}")
verification_results["success_rate"] = verification_results["tests_passed"] / verification_results["total_tests"]
return verification_results
async def _test_basic_functionality(self, endpoint_name, deployment_name):
"""Test basic model functionality"""
# Get endpoint scoring URI
endpoint = self.ml_client.online_endpoints.get(endpoint_name)
scoring_uri = endpoint.scoring_uri
# Test data
test_payload = {
"data": [
[25, 50000, 2, 1, 0.8], # Sample customer data
[45, 80000, 5, 0, 0.3],
[35, 60000, 3, 1, 0.6]
]
}
try:
import requests
response = requests.post(
scoring_uri,
json=test_payload,
headers={
"Authorization": f"Bearer {self._get_auth_token()}",
"Content-Type": "application/json"
},
timeout=10
)
if response.status_code == 200:
predictions = response.json()
# Validate response format
if isinstance(predictions, list) and len(predictions) == 3:
return {
"test_name": "basic_functionality",
"passed": True,
"predictions": predictions,
"response_time_ms": response.elapsed.total_seconds() * 1000
}
else:
return {
"test_name": "basic_functionality",
"passed": False,
"reason": f"Invalid response format: {predictions}"
}
else:
return {
"test_name": "basic_functionality",
"passed": False,
"reason": f"HTTP {response.status_code}: {response.text}"
}
except Exception as e:
return {
"test_name": "basic_functionality",
"passed": False,
"error": str(e)
}
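_verify_deployment also calls _test_performance_requirements, which is not shown. Below is a minimal sketch intended for AutomatedDeploymentManager, checking the <100 ms latency target from the requirements; it reuses the same _get_auth_token helper as the basic test (with key auth that helper could wrap ml_client.online_endpoints.get_keys(...)), and the sample count and p95 criterion are assumptions.
# Illustrative sketch of one of the remaining verification tests (assumed method of AutomatedDeploymentManager)
async def _test_performance_requirements(self, endpoint_name, deployment_name, samples=20):
    """Measure round-trip latency against the workshop's <100 ms target (assumed SLA check)."""
    import time
    import requests
    endpoint = self.ml_client.online_endpoints.get(endpoint_name)
    payload = {"data": [[25, 50000, 2, 1, 0.8]]}
    headers = {
        "Authorization": f"Bearer {self._get_auth_token()}",
        "Content-Type": "application/json",
        "azureml-model-deployment": deployment_name,  # route the probe to the new deployment
    }
    latencies = []
    for _ in range(samples):
        start = time.perf_counter()
        response = requests.post(endpoint.scoring_uri, json=payload, headers=headers, timeout=10)
        latencies.append((time.perf_counter() - start) * 1000)
        if response.status_code != 200:
            return {"test_name": "performance_requirements", "passed": False,
                    "reason": f"HTTP {response.status_code}"}
    p95 = sorted(latencies)[int(0.95 * len(latencies)) - 1]
    return {"test_name": "performance_requirements", "passed": p95 < 100,
            "p95_latency_ms": round(p95, 1),
            "reason": None if p95 < 100 else f"p95 latency {p95:.0f} ms exceeds 100 ms"}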
Step 4: Monitoring Implementation (15 min)
class RealTimeMonitoring:
def __init__(self, endpoint_name):
self.endpoint_name = endpoint_name
self.metrics_collector = MetricsCollector()
async def setup_comprehensive_monitoring(self):
"""Setup complete monitoring system"""
monitoring_components = [
"performance_metrics",
"operational_metrics",
"business_metrics",
"data_drift_detection",
"model_explainability"
]
for component in monitoring_components:
await self._setup_monitoring_component(component)
print("📊 Comprehensive monitoring system active")
return {"status": "monitoring_active", "components": monitoring_components}
async def _setup_monitoring_component(self, component_type):
"""Setup specific monitoring component"""
if component_type == "performance_metrics":
# Setup performance tracking
await self._setup_performance_tracking()
elif component_type == "data_drift_detection":
# Setup drift detection
await self._setup_drift_detection()
# ... other components
print(f"✅ {component_type} monitoring configured")
✅ Workshop Tasks
Main task: Complete MLOps System (90 min)
Implementation:
- Infrastructure setup (30 min) - Azure resources, compute, storage
- Pipeline development (45 min) - training, validation, deployment
- Monitoring setup (15 min) - comprehensive observability
Additional tasks
Task 1: Advanced Deployment (20 min)
- Canary deployment implementation (traffic-split sketch below)
- Automated rollback triggers
- Multi-region deployment
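For the canary item, a minimal sketch of shifting a small share of endpoint traffic to the new deployment with the azure-ai-ml SDK; the 10% split and the blue/green deployment names mirror the earlier example and are assumptions.
def set_canary_traffic(ml_client, endpoint_name, stable="blue", canary="green", canary_percent=10):
    """Route a small percentage of live traffic to the canary deployment."""
    endpoint = ml_client.online_endpoints.get(endpoint_name)
    endpoint.traffic = {stable: 100 - canary_percent, canary: canary_percent}
    ml_client.online_endpoints.begin_create_or_update(endpoint).result()
    print(f"Traffic split: {endpoint.traffic}")

# Promote the canary to 100% (or roll it back to 0%) once monitoring confirms it is healthy.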
Task 2: Cost Optimization (15 min)
- Resource auto-scaling
- Cost monitoring and alerting (see the cost-per-prediction sketch below)
- Efficiency improvements
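For cost monitoring, a minimal sketch of estimating cost per prediction from instance-hours and request volume, which can be compared against the $0.001 target in the monitoring config; the hourly rate used here is a placeholder assumption, not an Azure price.
def cost_per_prediction(instance_count, hours, hourly_rate_usd, prediction_count):
    """Rough cost-per-prediction estimate; hourly_rate_usd is an assumed placeholder."""
    compute_cost = instance_count * hours * hourly_rate_usd
    return compute_cost / max(prediction_count, 1)

# Example: 1 x Standard_DS3_v2 instance for 24 h at an assumed $0.30/h, 100,000 predictions/day
print(f"${cost_per_prediction(1, 24, 0.30, 100_000):.5f} per prediction")  # ≈ $0.00007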
Task 3: Compliance and Audit (15 min)
- Audit trail implementation (JSONL logging sketch below)
- Compliance reporting
- Security scanning integration
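For the audit trail, a minimal sketch that appends one JSON line per scoring request; the field names and log path are assumptions, and in production the records would go to durable storage such as a blob container.
import json
import uuid
from datetime import datetime, timezone

def log_audit_event(model_name, model_version, request_payload, prediction, path="audit_log.jsonl"):
    """Append a single audit record per prediction (illustrative schema)."""
    record = {
        "event_id": str(uuid.uuid4()),
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "model_name": model_name,
        "model_version": model_version,
        "request": request_payload,
        "prediction": prediction,
    }
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(record) + "\n")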
📊 Assessment Criteria
Technical Implementation (60 points)
- Complete MLOps pipeline (25 pts)
- Automated deployment (20 pts)
- Monitoring system (15 pts)
Operational Excellence (25 points)
- Error handling and recovery (10 pts)
- Performance optimization (10 pts)
- Documentation (5 pts)
Innovation (15 points)
- Advanced feature implementation (15 pts)
🏆 Workshop Outcomes
After completing the workshop, participants will have:
- Production MLOps pipeline - fully automated
- Deployment strategies - blue-green and canary implementations
- Monitoring system - comprehensive observability
- Enterprise experience - production-ready skills