Moduł 6: Uczenie maszynowe i MLOps


🎯 Cele modułu

  • Zarządzanie eksperymentami i zasobami ML w Azure
  • Implementacja potoków CI/CD dla rozwiązań AI
  • Automatyzacja MLOps workflows
  • Monitoring i maintenance modeli w production

Sesja 16: Zarządzanie eksperymentami i zasobami ML (23.10.2025)

🧪 ML Experiment Management

Azure Machine Learning Workspace Setup

from azure.ai.ml import MLClient
from azure.ai.ml.entities import Workspace, ComputeInstance, Environment
from azure.identity import DefaultAzureCredential

def setup_ml_workspace():
    """Initialize Azure ML workspace dla ML experiments"""
    
    # Create ML client
    ml_client = MLClient(
        credential=DefaultAzureCredential(),
        subscription_id="your-subscription-id",
        resource_group_name="your-resource-group",
        workspace_name="your-workspace"
    )
    
    # Define workspace configuration
    workspace_config = Workspace(
        name="ai-workshop-workspace",
        description="Workspace dla AI Workshop ML experiments",
        tags={"project": "ai-workshop", "environment": "development"}
    )
    
    return ml_client, workspace_config

def create_compute_resources(ml_client):
    """Setup compute resources dla ML training"""
    
    # CPU compute dla lightweight training
    cpu_compute = ComputeInstance(
        name="cpu-compute-instance",
        size="Standard_DS3_v2",
        idle_time_before_shutdown_minutes=30
    )
    
    # GPU compute dla deep learning
    gpu_compute = ComputeInstance(
        name="gpu-compute-instance", 
        size="Standard_NC6s_v3",
        idle_time_before_shutdown_minutes=15
    )
    
    # Create compute instances
    ml_client.compute.begin_create_or_update(cpu_compute)
    ml_client.compute.begin_create_or_update(gpu_compute)
    
    return {"cpu": "cpu-compute-instance", "gpu": "gpu-compute-instance"}

Experiment Tracking i Versioning

from azure.ai.ml.entities import Model, Environment
import mlflow
import joblib

class MLExperimentTracker:
    def __init__(self, ml_client, experiment_name):
        self.ml_client = ml_client
        self.experiment_name = experiment_name
        mlflow.set_experiment(experiment_name)
    
    def run_training_experiment(self, model_config, training_data, validation_data):
        """Run ML training experiment z comprehensive tracking"""
        
        with mlflow.start_run() as run:
            # Log parameters
            mlflow.log_params(model_config)
            
            # Log dataset info
            mlflow.log_param("training_samples", len(training_data))
            mlflow.log_param("validation_samples", len(validation_data))
            
            # Train model
            model = self._train_model(model_config, training_data)
            
            # Evaluate model
            metrics = self._evaluate_model(model, validation_data)
            
            # Log metrics
            mlflow.log_metrics(metrics)
            
            # Log model artifacts
            model_path = "model"
            joblib.dump(model, model_path)
            mlflow.log_artifact(model_path)
            
            # Register model w Azure ML
            registered_model = self._register_model(model, run.info.run_id, metrics)
            
            return {
                "run_id": run.info.run_id,
                "model_id": registered_model.id,
                "metrics": metrics,
                "artifacts": [model_path]
            }
    
    def _train_model(self, config, training_data):
        """Train ML model based na configuration"""
        
        from sklearn.ensemble import RandomForestClassifier
        from sklearn.linear_model import LogisticRegression
        from sklearn.svm import SVC
        
        # Model selection based na config
        if config["model_type"] == "random_forest":
            model = RandomForestClassifier(
                n_estimators=config.get("n_estimators", 100),
                max_depth=config.get("max_depth", None),
                random_state=42
            )
        elif config["model_type"] == "logistic_regression":
            model = LogisticRegression(
                C=config.get("C", 1.0),
                max_iter=config.get("max_iter", 1000),
                random_state=42
            )
        elif config["model_type"] == "svm":
            model = SVC(
                C=config.get("C", 1.0),
                kernel=config.get("kernel", "rbf"),
                random_state=42
            )
        
        # Train model
        X_train, y_train = training_data
        model.fit(X_train, y_train)
        
        return model
    
    def _evaluate_model(self, model, validation_data):
        """Comprehensive model evaluation"""
        
        from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
        
        X_val, y_val = validation_data
        y_pred = model.predict(X_val)
        
        metrics = {
            "accuracy": accuracy_score(y_val, y_pred),
            "precision": precision_score(y_val, y_pred, average='weighted'),
            "recall": recall_score(y_val, y_pred, average='weighted'),
            "f1_score": f1_score(y_val, y_pred, average='weighted')
        }
        
        return metrics

📊 Resource Management i Cost Optimization

Azure ML Resource Monitoring

from azure.ai.ml.entities import ComputeInstance
from azure.monitor.query import LogsQueryClient
import pandas as pd

class MLResourceManager:
    def __init__(self, ml_client, monitoring_client):
        self.ml_client = ml_client
        self.monitoring_client = monitoring_client
    
    def monitor_compute_usage(self, timespan_hours=24):
        """Monitor compute resource utilization"""
        
        query = f"""
        AzureDiagnostics
        | where TimeGenerated > ago({timespan_hours}h)
        | where ResourceProvider == "MICROSOFT.MACHINELEARNINGSERVICES"
        | where Category == "ComputeInstanceEvent"
        | summarize 
            TotalRuntime = sum(todouble(DurationMs))/1000/60,
            ActiveHours = dcount(bin(TimeGenerated, 1h)),
            CostEstimate = TotalRuntime * 0.50  // Example cost per minute
        by ResourceId
        """
        
        results = self.monitoring_client.query_workspace(
            workspace_id="your-workspace-id",
            query=query
        )
        
        return pd.DataFrame(results.tables[0].rows, columns=[col.name for col in results.tables[0].columns])
    
    def optimize_compute_costs(self):
        """Implement cost optimization strategies"""
        
        recommendations = []
        
        # Check dla idle compute instances
        compute_instances = self.ml_client.compute.list()
        
        for compute in compute_instances:
            if isinstance(compute, ComputeInstance):
                if compute.state == "Running":
                    # Check last activity
                    last_activity = self._get_last_activity(compute.name)
                    if last_activity > 2:  # hours
                        recommendations.append({
                            "resource": compute.name,
                            "action": "consider_shutdown",
                            "potential_savings": self._calculate_savings(compute),
                            "last_activity_hours": last_activity
                        })
        
        return recommendations
    
    def setup_auto_shutdown_policies(self):
        """Configure automatic shutdown policies"""
        
        # Setup compute instance auto-shutdown
        compute_config = {
            "idle_time_before_shutdown_minutes": 30,
            "enable_node_public_ip": False,
            "auto_shutdown": {
                "enabled": True,
                "idle_timeout_minutes": 30,
                "schedule": {
                    "weekdays": {"start_time": "09:00", "end_time": "18:00"},
                    "weekends": {"enabled": False}
                }
            }
        }
        
        return compute_config

Sesja 17: Potoki CI/CD dla rozwiązań AI (28.10.2025)

🔄 MLOps Pipeline Architecture

CI/CD dla Machine Learning

ML CI/CD PIPELINE STAGES:

1. CODE COMMIT → 2. AUTOMATED TESTING → 3. MODEL TRAINING → 4. MODEL VALIDATION → 5. DEPLOYMENT

STAGE DETAILS:

1. CODE COMMIT:
   - Version control dla code, data, i models
   - Automated quality checks (linting, testing)
   - Dependency management i security scanning

2. AUTOMATED TESTING:
   - Unit tests dla ML code
   - Data validation tests
   - Model smoke tests
   - Integration testing

3. MODEL TRAINING:
   - Automated training triggers
   - Hyperparameter optimization
   - Cross-validation i performance evaluation
   - Model comparison i selection

4. MODEL VALIDATION:
   - Performance benchmarking
   - Bias i fairness testing
   - A/B testing setup
   - Stakeholder approval workflows

5. DEPLOYMENT:
   - Model packaging i containerization
   - Blue-green deployment strategies
   - Canary releases dla gradual rollout
   - Rollback mechanisms dla issues

Azure DevOps dla ML Projects

# azure-pipelines.yml dla ML project

trigger:
  branches:
    include:
    - main
    - develop
  paths:
    include:
    - src/
    - data/
    - models/

variables:
  azureServiceConnection: 'azure-ml-service-connection'
  workspaceName: 'ai-workshop-workspace'
  experimentName: 'production-model-training'

stages:
- stage: DataValidation
  displayName: 'Data Quality Validation'
  jobs:
  - job: ValidateData
    displayName: 'Validate Training Data'
    pool:
      vmImage: 'ubuntu-latest'
    steps:
    - task: UsePythonVersion@0
      inputs:
        versionSpec: '3.9'
    
    - script: |
        pip install -r requirements.txt
        python scripts/validate_data.py --data-path $(dataPath)
      displayName: 'Run Data Validation'
    
    - task: PublishTestResults@2
      inputs:
        testResultsFiles: 'data-validation-results.xml'
        testRunTitle: 'Data Quality Tests'

- stage: ModelTraining
  displayName: 'Model Training & Evaluation'
  dependsOn: DataValidation
  condition: succeeded()
  jobs:
  - job: TrainModel
    displayName: 'Train i Evaluate Model'
    pool:
      vmImage: 'ubuntu-latest'
    steps:
    - task: AzureCLI@2
      displayName: 'Train Model w Azure ML'
      inputs:
        azureSubscription: $(azureServiceConnection)
        scriptType: 'bash'
        scriptLocation: 'inlineScript'
        inlineScript: |
          az ml job create --file training-job.yml --workspace-name $(workspaceName)
    
    - script: |
        python scripts/evaluate_model.py --experiment $(experimentName)
      displayName: 'Model Evaluation'
    
    - task: PublishBuildArtifacts@1
      inputs:
        pathToPublish: 'outputs/'
        artifactName: 'model-artifacts'

- stage: ModelDeployment
  displayName: 'Model Deployment'
  dependsOn: ModelTraining
  condition: and(succeeded(), eq(variables['Build.SourceBranch'], 'refs/heads/main'))
  jobs:
  - deployment: DeployToStaging
    displayName: 'Deploy to Staging'
    environment: 'staging'
    strategy:
      runOnce:
        deploy:
          steps:
          - task: AzureCLI@2
            displayName: 'Deploy Model Endpoint'
            inputs:
              azureSubscription: $(azureServiceConnection)
              scriptType: 'bash'
              scriptLocation: 'inlineScript'
              inlineScript: |
                az ml online-endpoint create --file staging-endpoint.yml
                az ml online-deployment create --file staging-deployment.yml

Model Deployment Strategies

from azure.ai.ml.entities import ManagedOnlineEndpoint, ManagedOnlineDeployment, Model, Environment

class ModelDeploymentManager:
    def __init__(self, ml_client):
        self.ml_client = ml_client
    
    def deploy_model_endpoint(self, model_name, model_version, deployment_config):
        """Deploy trained model jako online endpoint"""
        
        # Define endpoint
        endpoint = ManagedOnlineEndpoint(
            name=f"{model_name}-endpoint",
            description=f"Endpoint dla {model_name} model",
            auth_mode="key",
            tags={"model": model_name, "version": model_version}
        )
        
        # Create endpoint
        endpoint_result = self.ml_client.online_endpoints.begin_create_or_update(endpoint)
        
        # Define deployment
        deployment = ManagedOnlineDeployment(
            name="primary",
            endpoint_name=endpoint.name,
            model=f"{model_name}:{model_version}",
            instance_type="Standard_DS3_v2",
            instance_count=deployment_config.get("instance_count", 1),
            environment_variables=deployment_config.get("env_vars", {}),
            request_settings={
                "request_timeout_ms": 60000,
                "max_concurrent_requests_per_instance": 1
            },
            liveness_probe={
                "initial_delay": 10,
                "period": 10,
                "timeout": 2,
                "success_threshold": 1,
                "failure_threshold": 30
            }
        )
        
        # Create deployment
        deployment_result = self.ml_client.online_deployments.begin_create_or_update(deployment)
        
        return {
            "endpoint_name": endpoint.name,
            "deployment_name": deployment.name,
            "status": "deployed",
            "scoring_uri": endpoint_result.result().scoring_uri
        }
    
    def blue_green_deployment(self, endpoint_name, new_model_version):
        """Implement blue-green deployment strategy"""
        
        # Get current deployment (blue)
        current_deployments = list(self.ml_client.online_deployments.list(endpoint_name))
        blue_deployment = current_deployments[0] if current_deployments else None
        
        # Create green deployment
        green_deployment = ManagedOnlineDeployment(
            name="green",
            endpoint_name=endpoint_name,
            model=f"model:{new_model_version}",
            instance_type="Standard_DS3_v2",
            instance_count=1
        )
        
        # Deploy green
        self.ml_client.online_deployments.begin_create_or_update(green_deployment)
        
        # Test green deployment
        test_results = self._test_deployment(endpoint_name, "green")
        
        if test_results["success"]:
            # Switch traffic to green
            self._update_traffic_allocation(endpoint_name, {"green": 100, "blue": 0})
            
            # Delete blue deployment after verification
            if blue_deployment:
                self.ml_client.online_deployments.begin_delete(
                    endpoint_name, blue_deployment.name
                )
            
            return {"status": "success", "active_deployment": "green"}
        else:
            # Rollback - delete failed green deployment
            self.ml_client.online_deployments.begin_delete(endpoint_name, "green")
            return {"status": "rollback", "error": test_results["error"]}

📈 Model Performance Monitoring

Production Model Monitoring

from azure.monitor.query import LogsQueryClient, MetricsQueryClient
import numpy as np
from datetime import datetime, timedelta

class ModelPerformanceMonitor:
    def __init__(self, workspace_id, endpoint_name):
        self.logs_client = LogsQueryClient(DefaultAzureCredential())
        self.metrics_client = MetricsQueryClient(DefaultAzureCredential())
        self.workspace_id = workspace_id
        self.endpoint_name = endpoint_name
    
    def monitor_model_drift(self, baseline_data, current_timespan_hours=24):
        """Detect model drift by comparing current predictions z baseline"""
        
        # Get recent predictions
        query = f"""
        AmlOnlineEndpointTrafficLog
        | where TimeGenerated > ago({current_timespan_hours}h)
        | where EndpointName == "{self.endpoint_name}"
        | project TimeGenerated, RequestPayload, ResponsePayload
        | limit 1000
        """
        
        results = self.logs_client.query_workspace(self.workspace_id, query)
        current_predictions = self._extract_predictions(results)
        
        # Calculate drift metrics
        drift_score = self._calculate_psi(baseline_data, current_predictions)
        
        if drift_score > 0.2:  # Threshold dla significant drift
            return {
                "drift_detected": True,
                "drift_score": drift_score,
                "recommendation": "Retrain model z recent data"
            }
        
        return {"drift_detected": False, "drift_score": drift_score}
    
    def _calculate_psi(self, baseline, current):
        """Calculate Population Stability Index (PSI)"""
        
        # Bin the data
        baseline_bins = np.histogram(baseline, bins=10)[0]
        current_bins = np.histogram(current, bins=10)[0]
        
        # Calculate percentages
        baseline_pct = baseline_bins / len(baseline)
        current_pct = current_bins / len(current)
        
        # Avoid division by zero
        baseline_pct = np.where(baseline_pct == 0, 0.0001, baseline_pct)
        current_pct = np.where(current_pct == 0, 0.0001, current_pct)
        
        # Calculate PSI
        psi = np.sum((current_pct - baseline_pct) * np.log(current_pct / baseline_pct))
        
        return psi
    
    def setup_alerting(self, alert_rules):
        """Setup automated alerting dla model performance"""
        
        alert_configs = []
        
        for rule in alert_rules:
            if rule["metric"] == "accuracy_drop":
                alert_config = {
                    "name": f"{self.endpoint_name}-accuracy-alert",
                    "condition": "accuracy < 0.85",
                    "frequency": "PT5M",  # Check every 5 minutes
                    "action": ["email", "slack"],
                    "severity": "high"
                }
            elif rule["metric"] == "latency_increase":
                alert_config = {
                    "name": f"{self.endpoint_name}-latency-alert", 
                    "condition": "avg_response_time > 2000ms",
                    "frequency": "PT1M",
                    "action": ["email"],
                    "severity": "medium"
                }
            elif rule["metric"] == "error_rate":
                alert_config = {
                    "name": f"{self.endpoint_name}-error-alert",
                    "condition": "error_rate > 5%",
                    "frequency": "PT1M", 
                    "action": ["email", "slack", "pagerduty"],
                    "severity": "critical"
                }
            
            alert_configs.append(alert_config)
        
        return alert_configs

Sesja 18: Wdrażanie modeli i automatyzacja MLOps (30.10.2025)

🚀 Production ML Pipeline Implementation

Complete MLOps Workflow

from azure.ai.ml import Input, Output, command
from azure.ai.ml.entities import Job, Pipeline
from azure.ai.ml.dsl import pipeline

@pipeline(description="Complete MLOps pipeline dla production ML")
def ml_production_pipeline(
    training_data: Input(type="uri_folder"),
    model_name: str,
    deployment_target: str
):
    """End-to-end ML pipeline z training do deployment"""
    
    # Step 1: Data validation i preprocessing
    data_prep_step = command(
        name="data_preparation",
        display_name="Data Preparation i Validation",
        code="./src/data_preparation",
        command="python prep_data.py --input ${{inputs.training_data}} --output ${{outputs.processed_data}}",
        environment="azureml:sklearn-env:1",
        inputs={"training_data": training_data},
        outputs={"processed_data": Output(type="uri_folder")}
    )
    
    # Step 2: Model training z hyperparameter tuning
    training_step = command(
        name="model_training",
        display_name="Model Training i Hyperparameter Tuning",
        code="./src/training",
        command="python train_model.py --data ${{inputs.processed_data}} --model-output ${{outputs.trained_model}}",
        environment="azureml:sklearn-env:1",
        inputs={"processed_data": data_prep_step.outputs.processed_data},
        outputs={"trained_model": Output(type="mlflow_model")},
        compute="gpu-compute-cluster"
    )
    
    # Step 3: Model evaluation i validation
    evaluation_step = command(
        name="model_evaluation", 
        display_name="Model Evaluation i Quality Gates",
        code="./src/evaluation",
        command="python evaluate_model.py --model ${{inputs.trained_model}} --test-data ${{inputs.test_data}} --metrics-output ${{outputs.evaluation_metrics}}",
        environment="azureml:sklearn-env:1",
        inputs={
            "trained_model": training_step.outputs.trained_model,
            "test_data": data_prep_step.outputs.processed_data
        },
        outputs={"evaluation_metrics": Output(type="uri_file")}
    )
    
    # Step 4: Model registration (conditional on performance)
    registration_step = command(
        name="model_registration",
        display_name="Conditional Model Registration", 
        code="./src/registration",
        command="python register_model.py --model ${{inputs.trained_model}} --metrics ${{inputs.evaluation_metrics}} --model-name {model_name}",
        environment="azureml:sklearn-env:1",
        inputs={
            "trained_model": training_step.outputs.trained_model,
            "evaluation_metrics": evaluation_step.outputs.evaluation_metrics
        }
    )
    
    # Step 5: Deployment (conditional on registration success)
    deployment_step = command(
        name="model_deployment",
        display_name="Model Deployment to Production",
        code="./src/deployment", 
        command="python deploy_model.py --model-name {model_name} --target {deployment_target}",
        environment="azureml:sklearn-env:1"
    )
    
    # Define step dependencies
    registration_step.inputs.trained_model = training_step.outputs.trained_model
    registration_step.inputs.evaluation_metrics = evaluation_step.outputs.evaluation_metrics
    
    return {
        "trained_model": training_step.outputs.trained_model,
        "evaluation_metrics": evaluation_step.outputs.evaluation_metrics
    }

Infrastructure jako Code (IaC)

# infrastructure/ml-workspace.bicep

param workspaceName string
param location string = resourceGroup().location
param storageAccountName string
param keyVaultName string
param appInsightsName string

// Storage Account dla ML workspace
resource storageAccount 'Microsoft.Storage/storageAccounts@2023-01-01' = {
  name: storageAccountName
  location: location
  sku: {
    name: 'Standard_LRS'
  }
  kind: 'StorageV2'
  properties: {
    supportsHttpsTrafficOnly: true
    encryption: {
      services: {
        file: {
          enabled: true
        }
        blob: {
          enabled: true  
        }
      }
      keySource: 'Microsoft.Storage'
    }
  }
}

// Key Vault dla secrets management
resource keyVault 'Microsoft.KeyVault/vaults@2023-07-01' = {
  name: keyVaultName
  location: location
  properties: {
    tenantId: subscription().tenantId
    sku: {
      family: 'A'
      name: 'standard'
    }
    accessPolicies: []
    enableSoftDelete: true
    softDeleteRetentionInDays: 7
  }
}

// Application Insights dla monitoring
resource appInsights 'Microsoft.Insights/components@2020-02-02' = {
  name: appInsightsName
  location: location
  kind: 'web'
  properties: {
    Application_Type: 'web'
    WorkspaceResourceId: logAnalyticsWorkspace.id
  }
}

// Azure ML Workspace
resource mlWorkspace 'Microsoft.MachineLearningServices/workspaces@2023-10-01' = {
  name: workspaceName
  location: location
  identity: {
    type: 'SystemAssigned'
  }
  properties: {
    storageAccount: storageAccount.id
    keyVault: keyVault.id
    applicationInsights: appInsights.id
    hbiWorkspace: false
    publicNetworkAccess: 'Enabled'
  }
}

🔍 Automated Testing dla ML Systems

ML-Specific Testing Framework

import pytest
import numpy as np
from sklearn.metrics import accuracy_score
import joblib

class MLModelTests:
    def __init__(self, model_path, test_data_path):
        self.model = joblib.load(model_path)
        self.test_data = self._load_test_data(test_data_path)
    
    def test_model_accuracy_threshold(self):
        """Test model meets minimum accuracy requirements"""
        
        X_test, y_test = self.test_data
        predictions = self.model.predict(X_test)
        accuracy = accuracy_score(y_test, predictions)
        
        assert accuracy >= 0.85, f"Model accuracy {accuracy:.3f} below threshold 0.85"
    
    def test_prediction_consistency(self):
        """Test model produces consistent predictions"""
        
        X_sample = self.test_data[0][:10]  # Sample of test data
        
        # Multiple predictions dla same input
        predictions_1 = self.model.predict(X_sample)
        predictions_2 = self.model.predict(X_sample)
        
        # Should be identical dla deterministic models
        np.testing.assert_array_equal(
            predictions_1, predictions_2,
            "Model predictions are not consistent"
        )
    
    def test_input_validation(self):
        """Test model handles invalid inputs gracefully"""
        
        # Test z empty input
        with pytest.raises((ValueError, AttributeError)):
            self.model.predict(np.array([]))
        
        # Test z wrong shape
        with pytest.raises((ValueError, AttributeError)):
            wrong_shape_input = np.random.rand(5, 999)  # Wrong feature count
            self.model.predict(wrong_shape_input)
    
    def test_prediction_latency(self):
        """Test model prediction latency requirements"""
        
        import time
        X_sample = self.test_data[0][:1]
        
        start_time = time.time()
        prediction = self.model.predict(X_sample)
        end_time = time.time()
        
        latency_ms = (end_time - start_time) * 1000
        
        assert latency_ms < 100, f"Prediction latency {latency_ms:.2f}ms exceeds 100ms limit"
    
    def test_memory_usage(self):
        """Test model memory footprint"""
        
        import psutil
        import os
        
        process = psutil.Process(os.getpid())
        initial_memory = process.memory_info().rss / 1024 / 1024  # MB
        
        # Load i use model
        X_test = self.test_data[0]
        predictions = self.model.predict(X_test)
        
        final_memory = process.memory_info().rss / 1024 / 1024  # MB
        memory_increase = final_memory - initial_memory
        
        assert memory_increase < 500, f"Memory usage increase {memory_increase:.2f}MB exceeds 500MB limit"

# Example test execution
def run_ml_tests():
    """Run comprehensive ML model tests"""
    
    test_suite = MLModelTests("models/latest_model.pkl", "data/test_data.npz")
    
    # Run all tests
    test_results = {}
    
    try:
        test_suite.test_model_accuracy_threshold()
        test_results["accuracy"] = "PASS"
    except AssertionError as e:
        test_results["accuracy"] = f"FAIL: {str(e)}"
    
    try:
        test_suite.test_prediction_consistency()
        test_results["consistency"] = "PASS"
    except AssertionError as e:
        test_results["consistency"] = f"FAIL: {str(e)}"
    
    try:
        test_suite.test_prediction_latency()
        test_results["latency"] = "PASS"
    except AssertionError as e:
        test_results["latency"] = f"FAIL: {str(e)}"
    
    return test_results

Sesja 19: Warsztaty - Wdrażanie modeli i automatyzacja MLOps (30.10.2025)

🏗️ Complete MLOps Implementation Workshop

Project: End-to-End ML System

WORKSHOP DELIVERABLE: PRODUCTION-READY ML SYSTEM

SYSTEM REQUIREMENTS:
- Automated training pipeline triggered by data changes
- A/B testing dla model variants
- Real-time monitoring i alerting
- Automated rollback w case of issues
- Comprehensive logging dla audit purposes

TECHNICAL STACK:
- Azure Machine Learning dla training i deployment
- Azure DevOps dla CI/CD pipelines
- Azure Monitor dla observability
- Azure Key Vault dla secrets management
- Azure Storage dla data i artifact management

TIMELINE (4 hours hands-on):
1. Infrastructure setup (60 min)
2. Pipeline development (90 min)  
3. Deployment automation (60 min)
4. Monitoring i testing (30 min)

Implementation Steps

Step 1: Infrastructure Setup (60 min)

# Deploy infrastructure using Bicep templates
az deployment group create \
  --resource-group ai-workshop-rg \
  --template-file infrastructure/ml-workspace.bicep \
  --parameters workspaceName=ai-workshop-ml location=eastus

# Configure compute resources
az ml compute create \
  --name cpu-cluster \
  --type AmlCompute \
  --min-instances 0 \
  --max-instances 4 \
  --size Standard_DS3_v2

az ml compute create \
  --name gpu-cluster \
  --type AmlCompute \
  --min-instances 0 \
  --max-instances 2 \
  --size Standard_NC6s_v3

Step 2: Pipeline Development (90 min)

# ml_pipeline.py - Complete training pipeline

from azure.ai.ml import MLClient, command, Input, Output
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.entities import Environment, Model

@pipeline(description="Production ML Pipeline")
def production_ml_pipeline(
    training_data: Input(type="uri_folder"),
    model_name: str = "production-classifier",
    performance_threshold: float = 0.85
):
    # Data validation step
    data_validation = command(
        name="validate_data",
        display_name="Data Quality Validation",
        code="./src/validation",
        command="python validate_data.py --input ${{inputs.data}} --output ${{outputs.validation_report}}",
        environment="azureml:sklearn-1.0:1",
        inputs={"data": training_data},
        outputs={"validation_report": Output(type="uri_file")},
        compute="cpu-cluster"
    )
    
    # Feature engineering step
    feature_engineering = command(
        name="feature_engineering",
        display_name="Feature Engineering",
        code="./src/features", 
        command="python engineer_features.py --input ${{inputs.data}} --output ${{outputs.features}}",
        environment="azureml:sklearn-1.0:1",
        inputs={"data": training_data},
        outputs={"features": Output(type="uri_folder")},
        compute="cpu-cluster"
    )
    
    # Model training step
    model_training = command(
        name="train_model",
        display_name="Model Training",
        code="./src/training",
        command="python train.py --features ${{inputs.features}} --model-output ${{outputs.model}} --model-name {model_name}",
        environment="azureml:sklearn-1.0:1",
        inputs={"features": feature_engineering.outputs.features},
        outputs={"model": Output(type="mlflow_model")},
        compute="gpu-cluster"
    )
    
    # Model evaluation step
    model_evaluation = command(
        name="evaluate_model",
        display_name="Model Evaluation",
        code="./src/evaluation",
        command="python evaluate.py --model ${{inputs.model}} --test-data ${{inputs.features}} --threshold {performance_threshold} --metrics-output ${{outputs.metrics}}",
        environment="azureml:sklearn-1.0:1",
        inputs={
            "model": model_training.outputs.model,
            "features": feature_engineering.outputs.features
        },
        outputs={"metrics": Output(type="uri_file")},
        compute="cpu-cluster"
    )
    
    # Conditional deployment step
    model_deployment = command(
        name="deploy_model",
        display_name="Model Deployment",
        code="./src/deployment",
        command="python deploy.py --model ${{inputs.model}} --metrics ${{inputs.metrics}} --model-name {model_name}",
        environment="azureml:sklearn-1.0:1",
        inputs={
            "model": model_training.outputs.model,
            "metrics": model_evaluation.outputs.metrics
        },
        compute="cpu-cluster"
    )
    
    return {
        "trained_model": model_training.outputs.model,
        "evaluation_metrics": model_evaluation.outputs.metrics
    }

# Execute pipeline
def run_production_pipeline():
    ml_client = MLClient.from_config()
    
    # Create pipeline job
    pipeline_job = ml_client.jobs.create_or_update(
        production_ml_pipeline(
            training_data=Input(
                type="uri_folder",
                path="azureml://datastores/workspaceblobstore/paths/training-data/"
            ),
            model_name="production-classifier-v1",
            performance_threshold=0.85
        )
    )
    
    return pipeline_job

Step 3: Deployment Automation (60 min)

# deployment_automation.py

class AutomatedDeploymentManager:
    def __init__(self, ml_client):
        self.ml_client = ml_client
        self.deployment_strategies = {
            "blue_green": self._blue_green_deployment,
            "canary": self._canary_deployment,
            "rolling": self._rolling_deployment
        }
    
    def deploy_with_strategy(self, model_name, model_version, strategy="blue_green"):
        """Deploy model using specified strategy"""
        
        deployment_func = self.deployment_strategies.get(strategy)
        if not deployment_func:
            raise ValueError(f"Unknown deployment strategy: {strategy}")
        
        return deployment_func(model_name, model_version)
    
    def _blue_green_deployment(self, model_name, model_version):
        """Blue-green deployment implementation"""
        
        endpoint_name = f"{model_name}-endpoint"
        
        # Check if endpoint exists
        try:
            existing_endpoint = self.ml_client.online_endpoints.get(endpoint_name)
            current_deployment = "blue" if "green" in [d.name for d in existing_endpoint.deployments] else "green"
            new_deployment = "green" if current_deployment == "blue" else "blue"
        except Exception:
            # First deployment
            current_deployment = None
            new_deployment = "blue"
        
        # Create new deployment
        new_deployment_config = self._create_deployment_config(
            endpoint_name, new_deployment, model_name, model_version
        )
        
        # Deploy new version
        deployment_result = self.ml_client.online_deployments.begin_create_or_update(
            new_deployment_config
        ).result()
        
        # Test new deployment
        test_results = self._run_deployment_tests(endpoint_name, new_deployment)
        
        if test_results["success"]:
            # Switch traffic
            self._switch_traffic(endpoint_name, new_deployment, 100)
            
            # Clean up old deployment
            if current_deployment:
                self._cleanup_old_deployment(endpoint_name, current_deployment)
            
            return {"status": "success", "active_deployment": new_deployment}
        else:
            # Rollback
            self._cleanup_failed_deployment(endpoint_name, new_deployment)
            return {"status": "failed", "error": test_results["error"]}
    
    def _run_deployment_tests(self, endpoint_name, deployment_name):
        """Run comprehensive tests on new deployment"""
        
        import requests
        import time
        
        # Get endpoint scoring URI
        endpoint = self.ml_client.online_endpoints.get(endpoint_name)
        scoring_uri = endpoint.scoring_uri
        
        # Test scenarios
        test_cases = [
            {"input": [1, 2, 3, 4, 5], "expected_type": "array"},
            {"input": [0.1, 0.2, 0.3], "expected_type": "array"},
            {"input": "invalid", "expected_error": True}
        ]
        
        for i, test_case in enumerate(test_cases):
            try:
                # Make test request
                response = requests.post(
                    scoring_uri,
                    json={"data": test_case["input"]},
                    headers={"Authorization": f"Bearer {self._get_auth_token()}"},
                    timeout=30
                )
                
                if test_case.get("expected_error"):
                    assert response.status_code != 200, f"Test {i}: Expected error but got success"
                else:
                    assert response.status_code == 200, f"Test {i}: Request failed z status {response.status_code}"
                    
                    result = response.json()
                    assert isinstance(result, list), f"Test {i}: Expected array response"
                
                time.sleep(1)  # Rate limiting
                
            except Exception as e:
                return {"success": False, "error": f"Test {i} failed: {str(e)}"}
        
        return {"success": True, "tests_passed": len(test_cases)}

Step 4: Monitoring Setup (30 min)

# monitoring_setup.py

class MLOpsMonitoring:
    def __init__(self, workspace_name, endpoint_name):
        self.workspace_name = workspace_name
        self.endpoint_name = endpoint_name
        self.alert_manager = AlertManager()
    
    def setup_comprehensive_monitoring(self):
        """Setup complete monitoring dla ML system"""
        
        monitoring_config = {
            "model_performance": {
                "metrics": ["accuracy", "precision", "recall", "f1_score"],
                "thresholds": {"accuracy": 0.85, "f1_score": 0.80},
                "evaluation_frequency": "daily"
            },
            "system_performance": {
                "metrics": ["latency", "throughput", "error_rate"],
                "thresholds": {"latency_ms": 2000, "error_rate_pct": 5},
                "evaluation_frequency": "realtime"
            },
            "data_quality": {
                "metrics": ["completeness", "consistency", "drift"],
                "thresholds": {"drift_score": 0.2, "completeness_pct": 95},
                "evaluation_frequency": "hourly"
            }
        }
        
        # Setup alerts dla each category
        for category, config in monitoring_config.items():
            self._setup_category_alerts(category, config)
        
        return monitoring_config
    
    def _setup_category_alerts(self, category, config):
        """Setup alerts dla specific monitoring category"""
        
        for metric in config["metrics"]:
            if metric in config["thresholds"]:
                alert_rule = {
                    "name": f"{self.endpoint_name}-{metric}-alert",
                    "metric": metric,
                    "threshold": config["thresholds"][metric],
                    "frequency": config["evaluation_frequency"],
                    "severity": self._determine_severity(metric),
                    "actions": ["email", "teams", "pagerduty"]
                }
                
                self.alert_manager.create_alert_rule(alert_rule)

🏠 Final Module Project

Capstone: Complete MLOps Implementation

Requirements:

  1. Full ML Pipeline z automated training, validation, i deployment
  2. Monitoring Dashboard z real-time metrics i alerts
  3. Documentation covering architecture, operations, i troubleshooting
  4. Demo showing end-to-end workflow z sample data

Deliverables:

  • Working Azure ML workspace z deployed model
  • CI/CD pipeline w Azure DevOps
  • Monitoring setup z custom dashboards
  • Incident response playbook
  • Performance optimization recommendations

Assessment Criteria:

  • Technical implementation quality (40%)
  • Documentation completeness (25%)
  • Monitoring i observability (20%)
  • Demo presentation (15%)

💡 Wskazówka

Każda sesja to 2 godziny intensywnej nauki z praktycznymi ćwiczeniami. Materiały można przeglądać w dowolnym tempie.

📈 Postęp

Śledź swój postęp w nauce AI i przygotowaniu do certyfikacji Azure AI-102. Każdy moduł buduje na poprzednim.