Session 18: Workshop - Model Deployment and MLOps Automation

Production-ready ML systems

🎯 Workshop goals

  • Implement a complete MLOps pipeline, from training to production
  • Automate the deployment and monitoring of ML models
  • Blue-green deployment strategies for ML models
  • Real-time monitoring and automated rollback procedures

🏗️ Workshop project: End-to-End MLOps System

Business scenario

GlobalTech Corporation needs a production-ready ML system for:

  • Predictive maintenance - predicting machine failures
  • Fraud detection - detecting suspicious transactions
  • Customer churn prediction - identifying customers at risk of churning

Technical requirements (captured in the config sketch after this list):

  • Automated retraining when performance degrades
  • A/B testing for model variants
  • <100ms inference latency
  • 99.9% availability SLA
  • Comprehensive audit trail
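
To make these requirements actionable in the pipeline code, they can be captured in a single configuration object. A minimal sketch, with illustrative (hypothetical) names and values:

# Hypothetical quality-gate / SLA configuration derived from the requirements above
PRODUCTION_REQUIREMENTS = {
    "retraining": {
        "trigger_metric": "f1_score",
        "min_acceptable_value": 0.82   # retrain when performance drops below this value
    },
    "ab_testing": {
        "traffic_split": {"champion": 90, "challenger": 10}
    },
    "slo": {
        "max_inference_latency_ms": 100,
        "availability_percent": 99.9
    },
    "audit": {
        "log_every_prediction": True,
        "retention_days": 365
    }
}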

💻 Implementing the complete system

MLOps pipeline architecture

from azure.ai.ml import MLClient, command, Input, Output
from azure.ai.ml.entities import Environment, Model, ManagedOnlineEndpoint, ManagedOnlineDeployment
from azure.ai.ml.dsl import pipeline
import asyncio
import json
from datetime import datetime, timedelta

class ProductionMLOpsSystem:
    def __init__(self, ml_client, config):
        self.ml_client = ml_client
        self.config = config
        self.deployment_strategies = {
            "blue_green": self._blue_green_deployment,
            "canary": self._canary_deployment,
            "rolling": self._rolling_deployment
        }
        
    @pipeline(description="Production ML Pipeline with Automated Quality Gates")
    def create_production_pipeline(
        self,
        training_data: Input(type="uri_folder"),
        model_name: str,
        performance_threshold: float = 0.85,
        deployment_strategy: str = "blue_green"
    ):
        """Complete MLOps pipeline z automated deployment"""
        
        # Step 1: Data validation and preprocessing
        data_validation = command(
            name="validate_data",
            display_name="Data Quality Validation",
            code="./src/data_validation",
            command="""
            python validate_data.py 
                --input-data ${{inputs.training_data}} 
                --validation-report ${{outputs.validation_report}}
                --quality-threshold 0.95
            """,
            environment="azureml://registries/azureml/environments/sklearn-1.5/versions/1",
            inputs={"training_data": training_data},
            outputs={"validation_report": Output(type="uri_file")},
            compute="cpu-cluster"
        )
        
        # Step 2: Feature engineering with drift detection
        feature_engineering = command(
            name="feature_engineering",
            display_name="Feature Engineering with Drift Detection",
            code="./src/feature_engineering",
            command="""
            python engineer_features.py
                --raw-data ${{inputs.training_data}}
                --baseline-features ${{inputs.baseline_features}}
                --engineered-features ${{outputs.features}}
                --drift-report ${{outputs.drift_report}}
            """,
            environment="azureml://registries/azureml/environments/sklearn-1.5/versions/1",
            inputs={
                "training_data": training_data,
                "baseline_features": Input(
                    type="uri_folder", 
                    path="azureml://datastores/workspaceblobstore/paths/baseline-features/"
                )
            },
            outputs={
                "features": Output(type="uri_folder"),
                "drift_report": Output(type="uri_file")
            },
            compute="cpu-cluster"
        )
        
        # Step 3: Model training with hyperparameter optimization
        model_training = command(
            name="train_model",
            display_name="Model Training with HPO",
            code="./src/training",
            command="""
            python train_with_hpo.py
                --features ${{inputs.features}}
                --model-output ${{outputs.trained_model}}
                --experiment-tracking ${{outputs.experiment_results}}
                --model-name ${{inputs.model_name}}
                --hpo-trials 20
            """,
            environment="azureml://registries/azureml/environments/sklearn-1.5/versions/1",
            inputs={"features": feature_engineering.outputs.features},
            outputs={
                "trained_model": Output(type="mlflow_model"),
                "experiment_results": Output(type="uri_file")
            },
            compute="gpu-cluster"
        )
        
        # Step 4: Comprehensive model evaluation
        model_evaluation = command(
            name="evaluate_model",
            display_name="Model Evaluation with Bias Testing",
            code="./src/evaluation", 
            command="""
            python evaluate_model.py
                --model ${{inputs.trained_model}}
                --test-features ${{inputs.features}}
                --baseline-performance ${{inputs.baseline_metrics}}
                --evaluation-results ${{outputs.evaluation_metrics}}
                --bias-report ${{outputs.bias_analysis}}
                --performance-threshold ${{inputs.performance_threshold}}
            """,
            environment="azureml://registries/azureml/environments/sklearn-1.5/versions/1",
            inputs={
                "trained_model": model_training.outputs.trained_model,
                "features": feature_engineering.outputs.features,
                "baseline_metrics": Input(
                    type="uri_file",
                    path="azureml://datastores/workspaceblobstore/paths/baseline-metrics.json"
                ),
                "performance_threshold": performance_threshold
            },
            outputs={
                "evaluation_metrics": Output(type="uri_file"),
                "bias_analysis": Output(type="uri_file")
            },
            compute="cpu-cluster"
        )
        
        # Step 5: Automated model registration with approval gates
        model_registration = command(
            name="register_model",
            display_name="Conditional Model Registration",
            code="./src/registration",
            command="""
            python register_model.py
                --model ${{inputs.trained_model}}
                --evaluation-metrics ${{inputs.evaluation_metrics}}
                --bias-analysis ${{inputs.bias_analysis}}
                --model-name ${{inputs.model_name}}
                --performance-threshold ${{inputs.performance_threshold}}
                --registration-result ${{outputs.registration_status}}
            """,
            environment="azureml://registries/azureml/environments/sklearn-1.5/versions/1",
            inputs={
                "trained_model": model_training.outputs.trained_model,
                "evaluation_metrics": model_evaluation.outputs.evaluation_metrics,
                "bias_analysis": model_evaluation.outputs.bias_analysis
            },
            outputs={"registration_status": Output(type="uri_file")},
            compute="cpu-cluster"
        )
        
        # Step 6: Automated deployment with strategy selection
        model_deployment = command(
            name="deploy_model",
            display_name="Automated Model Deployment",
            code="./src/deployment",
            command="""
            python deploy_model.py
                --registered-model ${{inputs.model_name}}
                --deployment-strategy ${{inputs.deployment_strategy}}
                --registration-status ${{inputs.registration_status}}
                --deployment-result ${{outputs.deployment_status}}
                --endpoint-config ${{inputs.deployment_config}}
            """,
            environment="azureml://registries/azureml/environments/sklearn-1.5/versions/1",
            inputs={
                "registration_status": model_registration.outputs.registration_status,
                "deployment_config": Input(
                    type="uri_file",
                    path="azureml://datastores/workspaceblobstore/paths/deployment-config.json"
                ),
                "model_name": model_name,
                "deployment_strategy": deployment_strategy
            },
            outputs={"deployment_status": Output(type="uri_file")},
            compute="cpu-cluster"
        )
        
        return {
            "trained_model": model_training.outputs.trained_model,
            "evaluation_metrics": model_evaluation.outputs.evaluation_metrics,
            "deployment_status": model_deployment.outputs.deployment_status
        }
    
    async def execute_production_pipeline(self, pipeline_config):
        """Wykonanie production pipeline"""
        
        print("🚀 Starting production MLOps pipeline...")
        
        # Create pipeline job
        pipeline_job = self.ml_client.jobs.create_or_update(
            self.create_production_pipeline(
                training_data=Input(
                    type="uri_folder",
                    path=pipeline_config["training_data_path"]
                ),
                model_name=pipeline_config["model_name"],
                performance_threshold=pipeline_config.get("performance_threshold", 0.85),
                deployment_strategy=pipeline_config.get("deployment_strategy", "blue_green")
            )
        )
        
        print(f"✅ Pipeline submitted: {pipeline_job.name}")
        
        # Monitor pipeline execution
        pipeline_result = await self._monitor_pipeline_execution(pipeline_job.name)
        
        return pipeline_result
    
    async def _monitor_pipeline_execution(self, pipeline_name):
        """Monitor pipeline execution with real-time updates"""
        
        while True:
            # Get pipeline status
            pipeline_job = self.ml_client.jobs.get(pipeline_name)
            
            print(f"📊 Pipeline status: {pipeline_job.status}")
            
            if pipeline_job.status == "Completed":
                print("✅ Pipeline completed successfully!")
                
                # Get pipeline outputs
                outputs = pipeline_job.outputs
                
                return {
                    "status": "completed",
                    "pipeline_name": pipeline_name,
                    "outputs": outputs,
                    "completion_time": datetime.utcnow().isoformat()
                }
                
            elif pipeline_job.status == "Failed":
                print(f"❌ Pipeline failed: {pipeline_job.error}")
                
                return {
                    "status": "failed",
                    "pipeline_name": pipeline_name,
                    "error": str(pipeline_job.error),
                    "failure_time": datetime.utcnow().isoformat()
                }
                
            elif pipeline_job.status in ["Running", "Preparing", "Queued"]:
                print("⏳ Pipeline still running...")
                await asyncio.sleep(60)  # Check every minute
                
            else:
                print(f"🔄 Pipeline status: {pipeline_job.status}")
                await asyncio.sleep(30)
    
    async def setup_automated_monitoring(self, model_endpoint_name):
        """Setup comprehensive monitoring dla production model"""
        
        monitoring_config = {
            "performance_monitoring": {
                "metrics": ["accuracy", "precision", "recall", "f1_score", "auc"],
                "thresholds": {
                    "accuracy": 0.85,
                    "precision": 0.80, 
                    "recall": 0.80,
                    "f1_score": 0.82
                },
                "evaluation_frequency": "daily",
                "alert_channels": ["email", "teams", "slack"]
            },
            "operational_monitoring": {
                "metrics": ["request_rate", "response_time", "error_rate", "availability"],
                "thresholds": {
                    "response_time_ms": 100,
                    "error_rate_percent": 1.0,
                    "availability_percent": 99.9
                },
                "evaluation_frequency": "real-time",
                "alert_channels": ["email", "pagerduty"]
            },
            "data_drift_monitoring": {
                "baseline_period": "last_30_days",
                "drift_threshold": 0.1,
                "evaluation_frequency": "daily",
                "features_to_monitor": "all",
                "alert_channels": ["email", "teams"]
            },
            "business_metrics": {
                "cost_per_prediction": 0.001,  # $0.001 per prediction
                "daily_prediction_volume": 100000,
                "user_satisfaction_score": 4.0  # out of 5
            }
        }
        
        # Implement monitoring logic
        monitoring_system = ModelMonitoringSystem(
            model_endpoint_name, 
            monitoring_config
        )
        
        await monitoring_system.initialize()
        
        print(f"📊 Monitoring configured for endpoint: {model_endpoint_name}")
        
        return monitoring_config

class ModelMonitoringSystem:
    def __init__(self, endpoint_name, config):
        self.endpoint_name = endpoint_name
        self.config = config
        self.alert_manager = AlertManager()
        
    async def initialize(self):
        """Initialize monitoring system"""
        
        # Setup performance monitoring
        await self._setup_performance_monitoring()
        
        # Setup operational monitoring  
        await self._setup_operational_monitoring()
        
        # Setup data drift monitoring
        await self._setup_drift_monitoring()
        
        # Setup alerting rules
        await self._configure_alerting()
        
        print("✅ Monitoring system initialized")
        
    async def _setup_performance_monitoring(self):
        """Setup model performance monitoring"""
        
        performance_config = self.config["performance_monitoring"]
        
        # Create a scheduled job for performance evaluation
        performance_job = {
            "name": f"{self.endpoint_name}-performance-monitor",
            "schedule": self._convert_frequency_to_cron(performance_config["evaluation_frequency"]),
            "script": "scripts/monitor_model_performance.py",
            "parameters": {
                "endpoint_name": self.endpoint_name,
                "metrics": performance_config["metrics"],
                "thresholds": performance_config["thresholds"]
            }
        }
        
        print("📈 Performance monitoring configured")
        
        return performance_job
    
    async def _setup_drift_monitoring(self):
        """Setup data drift monitoring"""
        
        drift_config = self.config["data_drift_monitoring"]
        
        # Create drift detection job
        drift_job = {
            "name": f"{self.endpoint_name}-drift-monitor",
            "schedule": self._convert_frequency_to_cron(drift_config["evaluation_frequency"]),
            "script": "scripts/detect_data_drift.py",
            "parameters": {
                "endpoint_name": self.endpoint_name,
                "baseline_period": drift_config["baseline_period"],
                "drift_threshold": drift_config["drift_threshold"]
            }
        }
        
        print("📊 Data drift monitoring configured")
        
        return drift_job
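
ModelMonitoringSystem references an AlertManager and a _convert_frequency_to_cron helper that are not shown above. A minimal sketch of what they could look like (both are assumptions, not part of the original code); _convert_frequency_to_cron could simply delegate to the lookup below:

class AlertManager:
    """Minimal alert dispatcher sketch - a real implementation would integrate
    with e-mail, Teams, Slack or PagerDuty."""

    async def send_alert(self, channel, message):
        # Placeholder: just log the alert; real channel integrations go here
        print(f"[ALERT][{channel}] {message}")


# Illustrative mapping from evaluation frequency to a cron schedule
FREQUENCY_TO_CRON = {
    "real-time": "* * * * *",   # evaluated continuously; cron only as a fallback
    "hourly": "0 * * * *",
    "daily": "0 2 * * *",       # 02:00 UTC every day
    "weekly": "0 2 * * 1"
}

def convert_frequency_to_cron(frequency: str) -> str:
    """Map a human-readable evaluation frequency to a cron expression."""
    return FREQUENCY_TO_CRON.get(frequency, "0 2 * * *")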

🛠️ Hands-on workshop (120 min)

Step-by-step implementation

Step 1: Infrastructure Setup (30 min)

# Setup complete MLOps infrastructure
async def setup_mlops_infrastructure():
    """Setup complete infrastructure dla MLOps"""
    
    infrastructure_config = {
        "resource_group": "rg-mlops-workshop",
        "workspace_name": "mlops-workspace",
        "compute_clusters": [
            {
                "name": "cpu-cluster",
                "size": "Standard_DS3_v2", 
                "min_nodes": 0,
                "max_nodes": 4
            },
            {
                "name": "gpu-cluster",
                "size": "Standard_NC6s_v3",
                "min_nodes": 0,
                "max_nodes": 2
            }
        ],
        "datastores": [
            "training-data",
            "model-artifacts", 
            "monitoring-data"
        ],
        "environments": [
            "sklearn-production",
            "pytorch-training",
            "monitoring-env"
        ]
    }
    
    # Deploy infrastructure using Azure CLI commands
    setup_commands = [
        f"az group create --name {infrastructure_config['resource_group']} --location eastus",
        f"az ml workspace create --name {infrastructure_config['workspace_name']} --resource-group {infrastructure_config['resource_group']}",
        "az ml compute create --name cpu-cluster --type AmlCompute --size Standard_DS3_v2 --min-instances 0 --max-instances 4",
        "az ml compute create --name gpu-cluster --type AmlCompute --size Standard_NC6s_v3 --min-instances 0 --max-instances 2"
    ]
    
    print("🏗️ Setting up MLOps infrastructure...")
    
    for command in setup_commands:
        print(f"Executing: {command}")
        # In a real run these commands would actually be executed (see the sketch below)
        
    print("✅ Infrastructure setup completed")
    
    return infrastructure_config

# Workshop setup
workshop_config = {
    "subscription_id": "your-subscription-id",
    "resource_group": "rg-mlops-workshop", 
    "workspace_name": "mlops-workspace",
    "model_name": "churn-prediction-model",
    "training_data_path": "azureml://datastores/workspaceblobstore/paths/churn-data/"
}

# Initialize MLOps system
from azure.identity import DefaultAzureCredential

ml_client = MLClient.from_config(credential=DefaultAzureCredential())
mlops_system = ProductionMLOpsSystem(ml_client, workshop_config)

# Setup infrastructure (top-level await assumes a notebook; in a script use asyncio.run(setup_mlops_infrastructure()))
infrastructure = await setup_mlops_infrastructure()
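
The setup loop above only prints the CLI commands. A minimal sketch of actually executing them (assuming the Azure CLI is installed and you are already logged in via az login):

import shlex
import subprocess

def run_cli_commands(commands):
    """Execute a list of Azure CLI commands, stopping on the first failure."""
    for cmd in commands:
        print(f"Executing: {cmd}")
        result = subprocess.run(shlex.split(cmd), capture_output=True, text=True)
        if result.returncode != 0:
            raise RuntimeError(f"Command failed: {cmd}\n{result.stderr}")
        print(result.stdout)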

Step 2: Pipeline Development (45 min)

# Implementation of the training and evaluation scripts

# src/data_validation/validate_data.py
class DataQualityValidator:
    def __init__(self):
        self.quality_checks = [
            self._check_completeness,
            self._check_consistency,
            self._check_validity,
            self._check_distribution_stability
        ]
    
    def validate_dataset(self, data_path, quality_threshold=0.95):
        """Comprehensive data quality validation"""
        
        import pandas as pd
        
        # Load dataset
        df = pd.read_csv(data_path + "/training_data.csv")
        
        validation_results = {
            "total_records": len(df),
            "quality_score": 0,
            "checks_passed": 0,
            "total_checks": len(self.quality_checks),
            "issues": [],
            "recommendations": []
        }
        
        # Run all quality checks
        for check in self.quality_checks:
            try:
                check_result = check(df)
                
                if check_result["passed"]:
                    validation_results["checks_passed"] += 1
                else:
                    validation_results["issues"].extend(check_result["issues"])
                    validation_results["recommendations"].extend(check_result["recommendations"])
                    
            except Exception as e:
                validation_results["issues"].append(f"Check failed: {str(e)}")
        
        # Calculate overall quality score
        validation_results["quality_score"] = validation_results["checks_passed"] / validation_results["total_checks"]
        
        # Determine if data passes quality threshold
        validation_results["passes_threshold"] = validation_results["quality_score"] >= quality_threshold
        
        if not validation_results["passes_threshold"]:
            raise ValueError(f"Data quality score {validation_results['quality_score']:.2f} below threshold {quality_threshold}")
        
        return validation_results
    
    def _check_completeness(self, df):
        """Check for missing values and completeness"""
        
        missing_percentages = df.isnull().sum() / len(df)
        high_missing_cols = missing_percentages[missing_percentages > 0.1].index.tolist()
        
        return {
            "passed": len(high_missing_cols) == 0,
            "issues": [f"Column '{col}' has {missing_percentages[col]:.1%} missing values" for col in high_missing_cols],
            "recommendations": ["Consider imputation strategies dla high-missing columns"] if high_missing_cols else []
        }
    
    def _check_distribution_stability(self, df):
        """Check for distribution shifts in key features"""
        
        # Simplified check - in practice this would be compared against the baseline distribution
        numeric_cols = df.select_dtypes(include=['number']).columns
        
        distribution_issues = []
        
        for col in numeric_cols:
            # Check for outliers
            Q1 = df[col].quantile(0.25)
            Q3 = df[col].quantile(0.75)
            IQR = Q3 - Q1
            
            outlier_count = len(df[(df[col] < Q1 - 1.5*IQR) | (df[col] > Q3 + 1.5*IQR)])
            outlier_percentage = outlier_count / len(df)
            
            if outlier_percentage > 0.05:  # >5% outliers
                distribution_issues.append(f"Column '{col}' has {outlier_percentage:.1%} outliers")
        
        return {
            "passed": len(distribution_issues) == 0,
            "issues": distribution_issues,
            "recommendations": ["Review data collection process dla outlier handling"] if distribution_issues else []
        }
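
    # The remaining checks listed in self.quality_checks are not shown in the
    # original material; the two sketches below are illustrative assumptions.
    def _check_consistency(self, df):
        """Check for duplicate records (sketch)."""
        issues = []
        duplicate_count = int(df.duplicated().sum())
        if duplicate_count > 0:
            issues.append(f"Dataset contains {duplicate_count} duplicate rows")
        
        return {
            "passed": len(issues) == 0,
            "issues": issues,
            "recommendations": ["Deduplicate records before training"] if issues else []
        }
    
    def _check_validity(self, df):
        """Check for infinite values and constant columns (sketch)."""
        import numpy as np
        
        issues = []
        for col in df.select_dtypes(include=['number']).columns:
            if np.isinf(df[col]).any():
                issues.append(f"Column '{col}' contains infinite values")
            if df[col].nunique(dropna=True) <= 1:
                issues.append(f"Column '{col}' is constant and carries no signal")
        
        return {
            "passed": len(issues) == 0,
            "issues": issues,
            "recommendations": ["Review feature extraction for invalid values"] if issues else []
        }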

# src/training/train_with_hpo.py  
class HyperparameterOptimizer:
    def __init__(self):
        self.optimization_methods = {
            "grid_search": self._grid_search_optimization,
            "random_search": self._random_search_optimization,
            "bayesian": self._bayesian_optimization
        }
    
    def optimize_model(self, X_train, y_train, X_val, y_val, trials=20):
        """Hyperparameter optimization z MLflow tracking"""
        
        import mlflow
        from sklearn.ensemble import RandomForestClassifier
        from sklearn.model_selection import RandomizedSearchCV
        from scipy.stats import randint
        
        # Define hyperparameter search space
        param_distributions = {
            'n_estimators': randint(50, 200),
            'max_depth': [None, 10, 20, 30],
            'min_samples_split': randint(2, 20),
            'min_samples_leaf': randint(1, 10),
            'max_features': ['sqrt', 'log2', None]  # 'auto' is not accepted in scikit-learn >= 1.3
        }
        
        best_results = {"score": 0, "model": None, "params": {}}
        
        with mlflow.start_run(run_name="hyperparameter_optimization"):
            
            # Log search configuration
            mlflow.log_params({
                "optimization_method": "random_search",
                "n_trials": trials,
                "search_space": str(param_distributions)
            })
            
            # Perform hyperparameter search
            base_model = RandomForestClassifier(random_state=42)
            
            random_search = RandomizedSearchCV(
                base_model,
                param_distributions,
                n_iter=trials,
                cv=5,
                scoring='f1_weighted',
                random_state=42,
                n_jobs=-1
            )
            
            # Fit the search
            print("🔍 Starting hyperparameter optimization...")
            random_search.fit(X_train, y_train)
            
            # Get best model
            best_model = random_search.best_estimator_
            
            # Evaluate on validation set
            val_predictions = best_model.predict(X_val)
            
            from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
            
            val_metrics = {
                "val_accuracy": accuracy_score(y_val, val_predictions),
                "val_f1": f1_score(y_val, val_predictions, average='weighted'),
                "val_precision": precision_score(y_val, val_predictions, average='weighted'),
                "val_recall": recall_score(y_val, val_predictions, average='weighted')
            }
            
            # Log results
            mlflow.log_params(random_search.best_params_)
            mlflow.log_metrics(val_metrics)
            mlflow.log_metric("cv_score", random_search.best_score_)
            
            # Save model
            mlflow.sklearn.log_model(best_model, "optimized_model")
            
            print(f"✅ Optimization completed - Best CV Score: {random_search.best_score_:.3f}")
            print(f"📊 Validation F1: {val_metrics['val_f1']:.3f}")
            
            return {
                "best_model": best_model,
                "best_params": random_search.best_params_,
                "cv_score": random_search.best_score_,
                "validation_metrics": val_metrics
            }
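
A minimal usage sketch for the optimizer above, assuming the engineered features have already been materialized locally (the churn_features.csv file and the churn target column are illustrative):

import pandas as pd
from sklearn.model_selection import train_test_split

# Illustrative input file and target column
df = pd.read_csv("churn_features.csv")
X = df.drop(columns=["churn"])
y = df["churn"]

X_train, X_val, y_train, y_val = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

optimizer = HyperparameterOptimizer()
results = optimizer.optimize_model(X_train, y_train, X_val, y_val, trials=20)
print(results["best_params"], results["validation_metrics"])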

Step 3: Automated Deployment (30 min)

from azure.ai.ml.entities import ManagedOnlineEndpoint, ManagedOnlineDeployment

class AutomatedDeploymentManager:
    def __init__(self, ml_client):
        self.ml_client = ml_client
        
    async def deploy_with_quality_gates(self, model_name, model_version, 
                                       deployment_config):
        """Deploy model z automated quality gates"""
        
        endpoint_name = f"{model_name}-endpoint"
        
        try:
            # Step 1: Pre-deployment validation
            print("🔍 Running pre-deployment validation...")
            validation_result = await self._validate_model_for_deployment(
                model_name, model_version
            )
            
            if not validation_result["passed"]:
                raise Exception(f"Model validation failed: {validation_result['issues']}")
            
            # Step 2: Create/update endpoint
            print("🔧 Configuring endpoint...")
            endpoint = await self._ensure_endpoint_exists(endpoint_name)
            
            # Step 3: Deploy using selected strategy
            deployment_strategy = deployment_config.get("strategy", "blue_green")
            
            if deployment_strategy == "blue_green":
                deployment_result = await self._blue_green_deployment(
                    endpoint_name, model_name, model_version
                )
            elif deployment_strategy == "canary":
                deployment_result = await self._canary_deployment(
                    endpoint_name, model_name, model_version, 
                    deployment_config.get("canary_percentage", 10)
                )
            else:
                raise ValueError(f"Unsupported deployment strategy: {deployment_strategy}")
            
            # Step 4: Post-deployment verification
            print("✅ Running post-deployment tests...")
            verification_result = await self._verify_deployment(
                endpoint_name, deployment_result["deployment_name"]
            )
            
            if verification_result["success"]:
                print("🎉 Deployment successful and verified!")
                
                # Setup monitoring
                await self._setup_deployment_monitoring(
                    endpoint_name, deployment_result["deployment_name"]
                )
                
                return {
                    "status": "success",
                    "endpoint_name": endpoint_name,
                    "deployment_name": deployment_result["deployment_name"],
                    "verification": verification_result
                }
            else:
                # Automatic rollback
                print("❌ Verification failed, initiating rollback...")
                await self._rollback_deployment(endpoint_name, deployment_result["deployment_name"])
                
                return {
                    "status": "failed_verification",
                    "error": verification_result["error"],
                    "rollback_completed": True
                }
                
        except Exception as e:
            print(f"❌ Deployment failed: {str(e)}")
            return {
                "status": "failed",
                "error": str(e)
            }
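
    # _ensure_endpoint_exists is referenced above but not shown in the original
    # material; the sketch below is an assumption using ManagedOnlineEndpoint.
    async def _ensure_endpoint_exists(self, endpoint_name):
        """Return the online endpoint, creating it with key auth if missing (sketch)."""
        try:
            return self.ml_client.online_endpoints.get(endpoint_name)
        except Exception:
            endpoint = ManagedOnlineEndpoint(
                name=endpoint_name,
                description="Endpoint managed by AutomatedDeploymentManager",
                auth_mode="key"
            )
            return self.ml_client.online_endpoints.begin_create_or_update(endpoint).result()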
    
    async def _blue_green_deployment(self, endpoint_name, model_name, model_version):
        """Blue-green deployment implementation"""
        
        # Determine the current and the new deployment color
        current_deployments = list(self.ml_client.online_deployments.list(endpoint_name))
        
        if current_deployments:
            current_color = current_deployments[0].name
            new_color = "green" if current_color == "blue" else "blue"
        else:
            current_color = None
            new_color = "blue"
        
        print(f"🔵🟢 Blue-Green: Current={current_color}, New={new_color}")
        
        # Create new deployment
        new_deployment = ManagedOnlineDeployment(
            name=new_color,
            endpoint_name=endpoint_name,
            model=f"{model_name}:{model_version}",
            instance_type="Standard_DS3_v2",
            instance_count=1,
            environment_variables={
                "DEPLOYMENT_COLOR": new_color,
                "MODEL_VERSION": model_version
            }
        )
        
        # Deploy
        deployment_poller = self.ml_client.online_deployments.begin_create_or_update(new_deployment)
        deployment_result = deployment_poller.result()
        
        print(f"✅ {new_color} deployment created")
        
        return {
            "deployment_name": new_color,
            "previous_deployment": current_color,
            "endpoint_name": endpoint_name
        }
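
    # _canary_deployment is referenced in deploy_with_quality_gates but not shown
    # in the original material; the sketch below is an assumption that routes a
    # small percentage of traffic to the new deployment via the endpoint traffic map.
    async def _canary_deployment(self, endpoint_name, model_name, model_version,
                                 canary_percentage=10):
        """Deploy a canary next to the current deployment and shift partial traffic (sketch)."""
        canary_name = "canary"
        
        canary_deployment = ManagedOnlineDeployment(
            name=canary_name,
            endpoint_name=endpoint_name,
            model=f"{model_name}:{model_version}",
            instance_type="Standard_DS3_v2",
            instance_count=1
        )
        self.ml_client.online_deployments.begin_create_or_update(canary_deployment).result()
        
        # Send canary_percentage of the traffic to the new deployment,
        # the rest stays on the existing deployment(s)
        endpoint = self.ml_client.online_endpoints.get(endpoint_name)
        existing = [d.name for d in self.ml_client.online_deployments.list(endpoint_name)
                    if d.name != canary_name]
        if existing:
            endpoint.traffic = {
                existing[0]: 100 - canary_percentage,
                canary_name: canary_percentage
            }
        else:
            endpoint.traffic = {canary_name: 100}
        self.ml_client.online_endpoints.begin_create_or_update(endpoint).result()
        
        print(f"🐤 Canary deployment receiving {canary_percentage}% of traffic")
        
        return {
            "deployment_name": canary_name,
            "endpoint_name": endpoint_name
        }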
    
    async def _verify_deployment(self, endpoint_name, deployment_name):
        """Comprehensive deployment verification"""
        
        verification_tests = [
            self._test_basic_functionality,
            self._test_performance_requirements,
            self._test_error_handling,
            self._test_load_capacity
        ]
        
        verification_results = {
            "success": True,
            "tests_passed": 0,
            "total_tests": len(verification_tests),
            "test_results": []
        }
        
        for test_func in verification_tests:
            try:
                test_result = await test_func(endpoint_name, deployment_name)
                
                verification_results["test_results"].append(test_result)
                
                if test_result["passed"]:
                    verification_results["tests_passed"] += 1
                    print(f"✅ {test_result['test_name']}: PASSED")
                else:
                    verification_results["success"] = False
                    print(f"❌ {test_result['test_name']}: FAILED - {test_result['reason']}")
                    
            except Exception as e:
                verification_results["success"] = False
                verification_results["test_results"].append({
                    "test_name": test_func.__name__,
                    "passed": False,
                    "error": str(e)
                })
                print(f"❌ {test_func.__name__}: ERROR - {str(e)}")
        
        verification_results["success_rate"] = verification_results["tests_passed"] / verification_results["total_tests"]
        
        return verification_results
    
    async def _test_basic_functionality(self, endpoint_name, deployment_name):
        """Test basic model functionality"""
        
        # Get endpoint scoring URI
        endpoint = self.ml_client.online_endpoints.get(endpoint_name)
        scoring_uri = endpoint.scoring_uri
        
        # Test data
        test_payload = {
            "data": [
                [25, 50000, 2, 1, 0.8],  # Sample customer data
                [45, 80000, 5, 0, 0.3],
                [35, 60000, 3, 1, 0.6]
            ]
        }
        
        try:
            import requests
            
            response = requests.post(
                scoring_uri,
                json=test_payload,
                headers={
                    "Authorization": f"Bearer {self._get_auth_token()}",
                    "Content-Type": "application/json"
                },
                timeout=10
            )
            
            if response.status_code == 200:
                predictions = response.json()
                
                # Validate response format
                if isinstance(predictions, list) and len(predictions) == 3:
                    return {
                        "test_name": "basic_functionality",
                        "passed": True,
                        "predictions": predictions,
                        "response_time_ms": response.elapsed.total_seconds() * 1000
                    }
                else:
                    return {
                        "test_name": "basic_functionality", 
                        "passed": False,
                        "reason": f"Invalid response format: {predictions}"
                    }
            else:
                return {
                    "test_name": "basic_functionality",
                    "passed": False,
                    "reason": f"HTTP {response.status_code}: {response.text}"
                }
                
        except Exception as e:
            return {
                "test_name": "basic_functionality",
                "passed": False,
                "error": str(e)
            }
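
A minimal sketch of driving the deployment manager from the workshop script (the model name and version below are illustrative):

deployment_manager = AutomatedDeploymentManager(ml_client)

deployment_result = asyncio.run(
    deployment_manager.deploy_with_quality_gates(
        model_name="churn-prediction-model",
        model_version="1",
        deployment_config={"strategy": "blue_green"}
    )
)
print(f"Deployment status: {deployment_result['status']}")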

Step 4: Monitoring Implementation (15 min)

class RealTimeMonitoring:
    def __init__(self, endpoint_name):
        self.endpoint_name = endpoint_name
        self.metrics_collector = MetricsCollector()
        
    async def setup_comprehensive_monitoring(self):
        """Setup complete monitoring system"""
        
        monitoring_components = [
            "performance_metrics",
            "operational_metrics", 
            "business_metrics",
            "data_drift_detection",
            "model_explainability"
        ]
        
        for component in monitoring_components:
            await self._setup_monitoring_component(component)
            
        print("📊 Comprehensive monitoring system active")
        
        return {"status": "monitoring_active", "components": monitoring_components}
    
    async def _setup_monitoring_component(self, component_type):
        """Setup specific monitoring component"""
        
        if component_type == "performance_metrics":
            # Setup performance tracking
            await self._setup_performance_tracking()
        elif component_type == "data_drift_detection":
            # Setup drift detection
            await self._setup_drift_detection()
        # ... other components
        
        print(f"✅ {component_type} monitoring configured")

✅ Workshop assignments

Main assignment: Complete MLOps System (90 min)

Implementation:

  1. Infrastructure setup (30 min) - Azure resources, compute, storage
  2. Pipeline development (45 min) - training, validation, deployment
  3. Monitoring setup (15 min) - comprehensive observability

Additional assignments

Assignment 1: Advanced Deployment (20 min)

  • Canary deployment implementation
  • Automated rollback triggers
  • Multi-region deployment

Assignment 2: Cost Optimization (15 min)

  • Resource auto-scaling
  • Cost monitoring and alerting
  • Efficiency improvements

Assignment 3: Compliance and Audit (15 min)

  • Audit trail implementation
  • Compliance reporting
  • Security scanning integration

📊 Assessment criteria

Technical Implementation (60 points)

  • Complete MLOps pipeline (25 pts)
  • Automated deployment (20 pts)
  • Monitoring system (15 pts)

Operational Excellence (25 points)

  • Error handling and recovery (10 pts)
  • Performance optimization (10 pts)
  • Documentation (5 pts)

Innovation (15 points)

  • Advanced features implementation (15 pts)

🏆 Workshop outcomes

After completing the workshop, participants will have:

  1. Production MLOps pipeline - fully automated
  2. Deployment strategies - blue-green and canary implementations
  3. Monitoring system - comprehensive observability
  4. Enterprise experience - production-ready skills

💡 Tip

Each session is 2 hours of intensive learning with hands-on exercises. The materials can be reviewed at your own pace.

📈 Progress

Track your progress in learning AI and preparing for the Azure AI-102 certification. Each module builds on the previous one.