Module 6: Machine Learning and MLOps
🎯 Module Objectives
- Managing ML experiments and resources in Azure
- Implementing CI/CD pipelines for AI solutions
- Automating MLOps workflows
- Monitoring and maintenance of models in production
Session 16: Managing ML Experiments and Resources (23.10.2025)
🧪 ML Experiment Management
Azure Machine Learning Workspace Setup
from azure.ai.ml import MLClient
from azure.ai.ml.entities import Workspace, ComputeInstance, Environment
from azure.identity import DefaultAzureCredential
def setup_ml_workspace():
"""Initialize Azure ML workspace dla ML experiments"""
# Create ML client
ml_client = MLClient(
credential=DefaultAzureCredential(),
subscription_id="your-subscription-id",
resource_group_name="your-resource-group",
workspace_name="your-workspace"
)
# Define workspace configuration
workspace_config = Workspace(
name="ai-workshop-workspace",
description="Workspace dla AI Workshop ML experiments",
tags={"project": "ai-workshop", "environment": "development"}
)
return ml_client, workspace_config
def create_compute_resources(ml_client):
"""Setup compute resources dla ML training"""
# CPU compute dla lightweight training
cpu_compute = ComputeInstance(
name="cpu-compute-instance",
size="Standard_DS3_v2",
idle_time_before_shutdown_minutes=30
)
# GPU compute for deep learning
gpu_compute = ComputeInstance(
name="gpu-compute-instance",
size="Standard_NC6s_v3",
idle_time_before_shutdown_minutes=15
)
# Create compute instances
ml_client.compute.begin_create_or_update(cpu_compute)
ml_client.compute.begin_create_or_update(gpu_compute)
return {"cpu": "cpu-compute-instance", "gpu": "gpu-compute-instance"}
Experiment Tracking and Versioning
from azure.ai.ml.entities import Model, Environment
import mlflow
import joblib
class MLExperimentTracker:
def __init__(self, ml_client, experiment_name):
self.ml_client = ml_client
self.experiment_name = experiment_name
mlflow.set_experiment(experiment_name)
def run_training_experiment(self, model_config, training_data, validation_data):
"""Run ML training experiment z comprehensive tracking"""
with mlflow.start_run() as run:
# Log parameters
mlflow.log_params(model_config)
# Log dataset info
mlflow.log_param("training_samples", len(training_data))
mlflow.log_param("validation_samples", len(validation_data))
# Train model
model = self._train_model(model_config, training_data)
# Evaluate model
metrics = self._evaluate_model(model, validation_data)
# Log metrics
mlflow.log_metrics(metrics)
# Log model artifacts
model_path = "model"
joblib.dump(model, model_path)
mlflow.log_artifact(model_path)
# Register the model in Azure ML
registered_model = self._register_model(model, run.info.run_id, metrics)
return {
"run_id": run.info.run_id,
"model_id": registered_model.id,
"metrics": metrics,
"artifacts": [model_path]
}
def _train_model(self, config, training_data):
"""Train ML model based na configuration"""
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
# Model selection based on config
if config["model_type"] == "random_forest":
model = RandomForestClassifier(
n_estimators=config.get("n_estimators", 100),
max_depth=config.get("max_depth", None),
random_state=42
)
elif config["model_type"] == "logistic_regression":
model = LogisticRegression(
C=config.get("C", 1.0),
max_iter=config.get("max_iter", 1000),
random_state=42
)
elif config["model_type"] == "svm":
model = SVC(
C=config.get("C", 1.0),
kernel=config.get("kernel", "rbf"),
random_state=42
)
else:
raise ValueError(f"Unsupported model_type: {config['model_type']}")
# Train model
X_train, y_train = training_data
model.fit(X_train, y_train)
return model
def _evaluate_model(self, model, validation_data):
"""Comprehensive model evaluation"""
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
X_val, y_val = validation_data
y_pred = model.predict(X_val)
metrics = {
"accuracy": accuracy_score(y_val, y_pred),
"precision": precision_score(y_val, y_pred, average='weighted'),
"recall": recall_score(y_val, y_pred, average='weighted'),
"f1_score": f1_score(y_val, y_pred, average='weighted')
}
return metrics
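A usage sketch with synthetic data (assumes ml_client from the previous snippet; note that _register_model is called by run_training_experiment but not shown above, so it is assumed to register the logged model in the workspace):
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split

# Synthetic classification data for a quick experiment run
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

tracker = MLExperimentTracker(ml_client, experiment_name="ai-workshop-baseline")
result = tracker.run_training_experiment(
    model_config={"model_type": "random_forest", "n_estimators": 200},
    training_data=(X_train, y_train),
    validation_data=(X_val, y_val)
)
print(result["metrics"])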
📊 Resource Management and Cost Optimization
Azure ML Resource Monitoring
from azure.ai.ml.entities import ComputeInstance
from azure.monitor.query import LogsQueryClient
from datetime import timedelta
import pandas as pd
class MLResourceManager:
def __init__(self, ml_client, monitoring_client):
self.ml_client = ml_client
self.monitoring_client = monitoring_client
def monitor_compute_usage(self, timespan_hours=24):
"""Monitor compute resource utilization"""
query = f"""
AzureDiagnostics
| where TimeGenerated > ago({timespan_hours}h)
| where ResourceProvider == "MICROSOFT.MACHINELEARNINGSERVICES"
| where Category == "ComputeInstanceEvent"
| summarize
TotalRuntime = sum(todouble(DurationMs))/1000/60,
ActiveHours = dcount(bin(TimeGenerated, 1h)),
CostEstimate = TotalRuntime * 0.50 // Example cost per minute
by ResourceId
"""
results = self.monitoring_client.query_workspace(
workspace_id="your-workspace-id",
query=query,
timespan=timedelta(hours=timespan_hours)
)
return pd.DataFrame(results.tables[0].rows, columns=[col.name for col in results.tables[0].columns])
def optimize_compute_costs(self):
"""Implement cost optimization strategies"""
recommendations = []
# Check for idle compute instances
compute_instances = self.ml_client.compute.list()
for compute in compute_instances:
if isinstance(compute, ComputeInstance):
if compute.state == "Running":
# Check last activity
last_activity = self._get_last_activity(compute.name)
if last_activity > 2: # hours
recommendations.append({
"resource": compute.name,
"action": "consider_shutdown",
"potential_savings": self._calculate_savings(compute),
"last_activity_hours": last_activity
})
return recommendations
def setup_auto_shutdown_policies(self):
"""Configure automatic shutdown policies"""
# Setup compute instance auto-shutdown
compute_config = {
"idle_time_before_shutdown_minutes": 30,
"enable_node_public_ip": False,
"auto_shutdown": {
"enabled": True,
"idle_timeout_minutes": 30,
"schedule": {
"weekdays": {"start_time": "09:00", "end_time": "18:00"},
"weekends": {"enabled": False}
}
}
}
return compute_config
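A short sketch of wiring the manager up (the subscription, resource group, workspace, and workspace ID values are placeholders):
from azure.identity import DefaultAzureCredential
from azure.ai.ml import MLClient
from azure.monitor.query import LogsQueryClient

credential = DefaultAzureCredential()
ml_client = MLClient(credential, "your-subscription-id", "your-resource-group", "your-workspace")
logs_client = LogsQueryClient(credential)

manager = MLResourceManager(ml_client, logs_client)
usage_df = manager.monitor_compute_usage(timespan_hours=24)
for recommendation in manager.optimize_compute_costs():
    print(recommendation["resource"], recommendation["action"])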
Session 17: CI/CD Pipelines for AI Solutions (28.10.2025)
🔄 MLOps Pipeline Architecture
CI/CD for Machine Learning
ML CI/CD PIPELINE STAGES:
1. CODE COMMIT → 2. AUTOMATED TESTING → 3. MODEL TRAINING → 4. MODEL VALIDATION → 5. DEPLOYMENT
STAGE DETAILS:
1. CODE COMMIT:
- Version control for code, data, and models
- Automated quality checks (linting, testing)
- Dependency management and security scanning
2. AUTOMATED TESTING:
- Unit tests for ML code
- Data validation tests
- Model smoke tests
- Integration testing
3. MODEL TRAINING:
- Automated training triggers
- Hyperparameter optimization
- Cross-validation and performance evaluation
- Model comparison and selection
4. MODEL VALIDATION:
- Performance benchmarking
- Bias and fairness testing
- A/B testing setup
- Stakeholder approval workflows
5. DEPLOYMENT:
- Model packaging and containerization
- Blue-green deployment strategies
- Canary releases for gradual rollout
- Rollback mechanisms for issues
Azure DevOps for ML Projects
# azure-pipelines.yml for an ML project
trigger:
branches:
include:
- main
- develop
paths:
include:
- src/
- data/
- models/
variables:
azureServiceConnection: 'azure-ml-service-connection'
workspaceName: 'ai-workshop-workspace'
experimentName: 'production-model-training'
stages:
- stage: DataValidation
displayName: 'Data Quality Validation'
jobs:
- job: ValidateData
displayName: 'Validate Training Data'
pool:
vmImage: 'ubuntu-latest'
steps:
- task: UsePythonVersion@0
inputs:
versionSpec: '3.9'
- script: |
pip install -r requirements.txt
python scripts/validate_data.py --data-path $(dataPath)
displayName: 'Run Data Validation'
- task: PublishTestResults@2
inputs:
testResultsFiles: 'data-validation-results.xml'
testRunTitle: 'Data Quality Tests'
- stage: ModelTraining
displayName: 'Model Training & Evaluation'
dependsOn: DataValidation
condition: succeeded()
jobs:
- job: TrainModel
displayName: 'Train and Evaluate Model'
pool:
vmImage: 'ubuntu-latest'
steps:
- task: AzureCLI@2
displayName: 'Train Model in Azure ML'
inputs:
azureSubscription: $(azureServiceConnection)
scriptType: 'bash'
scriptLocation: 'inlineScript'
inlineScript: |
az ml job create --file training-job.yml --workspace-name $(workspaceName)
- script: |
python scripts/evaluate_model.py --experiment $(experimentName)
displayName: 'Model Evaluation'
- task: PublishBuildArtifacts@1
inputs:
pathToPublish: 'outputs/'
artifactName: 'model-artifacts'
- stage: ModelDeployment
displayName: 'Model Deployment'
dependsOn: ModelTraining
condition: and(succeeded(), eq(variables['Build.SourceBranch'], 'refs/heads/main'))
jobs:
- deployment: DeployToStaging
displayName: 'Deploy to Staging'
environment: 'staging'
strategy:
runOnce:
deploy:
steps:
- task: AzureCLI@2
displayName: 'Deploy Model Endpoint'
inputs:
azureSubscription: $(azureServiceConnection)
scriptType: 'bash'
scriptLocation: 'inlineScript'
inlineScript: |
az ml online-endpoint create --file staging-endpoint.yml
az ml online-deployment create --file staging-deployment.yml
Model Deployment Strategies
from azure.ai.ml.entities import ManagedOnlineEndpoint, ManagedOnlineDeployment, Model, Environment, OnlineRequestSettings, ProbeSettings
class ModelDeploymentManager:
def __init__(self, ml_client):
self.ml_client = ml_client
def deploy_model_endpoint(self, model_name, model_version, deployment_config):
"""Deploy trained model jako online endpoint"""
# Define endpoint
endpoint = ManagedOnlineEndpoint(
name=f"{model_name}-endpoint",
description=f"Endpoint dla {model_name} model",
auth_mode="key",
tags={"model": model_name, "version": model_version}
)
# Create endpoint
endpoint_result = self.ml_client.online_endpoints.begin_create_or_update(endpoint)
# Define deployment
deployment = ManagedOnlineDeployment(
name="primary",
endpoint_name=endpoint.name,
model=f"{model_name}:{model_version}",
instance_type="Standard_DS3_v2",
instance_count=deployment_config.get("instance_count", 1),
environment_variables=deployment_config.get("env_vars", {}),
request_settings=OnlineRequestSettings(
request_timeout_ms=60000,
max_concurrent_requests_per_instance=1
),
liveness_probe=ProbeSettings(
initial_delay=10,
period=10,
timeout=2,
success_threshold=1,
failure_threshold=30
)
)
# Create deployment
deployment_result = self.ml_client.online_deployments.begin_create_or_update(deployment)
return {
"endpoint_name": endpoint.name,
"deployment_name": deployment.name,
"status": "deployed",
"scoring_uri": endpoint_result.result().scoring_uri
}
def blue_green_deployment(self, endpoint_name, new_model_version):
"""Implement blue-green deployment strategy"""
# Get current deployment (blue)
current_deployments = list(self.ml_client.online_deployments.list(endpoint_name))
blue_deployment = current_deployments[0] if current_deployments else None
# Create green deployment
green_deployment = ManagedOnlineDeployment(
name="green",
endpoint_name=endpoint_name,
model=f"model:{new_model_version}",
instance_type="Standard_DS3_v2",
instance_count=1
)
# Deploy green
self.ml_client.online_deployments.begin_create_or_update(green_deployment)
# Test green deployment
test_results = self._test_deployment(endpoint_name, "green")
if test_results["success"]:
# Switch traffic to green
self._update_traffic_allocation(endpoint_name, {"green": 100, "blue": 0})
# Delete blue deployment after verification
if blue_deployment:
self.ml_client.online_deployments.begin_delete(
endpoint_name, blue_deployment.name
)
return {"status": "success", "active_deployment": "green"}
else:
# Rollback - delete failed green deployment
self.ml_client.online_deployments.begin_delete(endpoint_name, "green")
return {"status": "rollback", "error": test_results["error"]}
📈 Model Performance Monitoring
Production Model Monitoring
from azure.monitor.query import LogsQueryClient, MetricsQueryClient
from azure.identity import DefaultAzureCredential
import numpy as np
from datetime import datetime, timedelta
class ModelPerformanceMonitor:
def __init__(self, workspace_id, endpoint_name):
self.logs_client = LogsQueryClient(DefaultAzureCredential())
self.metrics_client = MetricsQueryClient(DefaultAzureCredential())
self.workspace_id = workspace_id
self.endpoint_name = endpoint_name
def monitor_model_drift(self, baseline_data, current_timespan_hours=24):
"""Detect model drift by comparing current predictions z baseline"""
# Get recent predictions
query = f"""
AmlOnlineEndpointTrafficLog
| where TimeGenerated > ago({current_timespan_hours}h)
| where EndpointName == "{self.endpoint_name}"
| project TimeGenerated, RequestPayload, ResponsePayload
| limit 1000
"""
results = self.logs_client.query_workspace(self.workspace_id, query, timespan=timedelta(hours=current_timespan_hours))
current_predictions = self._extract_predictions(results)
# Calculate drift metrics
drift_score = self._calculate_psi(baseline_data, current_predictions)
if drift_score > 0.2:  # Threshold for significant drift
return {
"drift_detected": True,
"drift_score": drift_score,
"recommendation": "Retrain model z recent data"
}
return {"drift_detected": False, "drift_score": drift_score}
def _calculate_psi(self, baseline, current):
"""Calculate Population Stability Index (PSI)"""
# Bin both distributions using the same bin edges (derived from the baseline)
bin_edges = np.histogram_bin_edges(baseline, bins=10)
baseline_bins = np.histogram(baseline, bins=bin_edges)[0]
current_bins = np.histogram(current, bins=bin_edges)[0]
# Convert counts to percentages
baseline_pct = baseline_bins / len(baseline)
current_pct = current_bins / len(current)
# Avoid division by zero
baseline_pct = np.where(baseline_pct == 0, 0.0001, baseline_pct)
current_pct = np.where(current_pct == 0, 0.0001, current_pct)
# Calculate PSI
psi = np.sum((current_pct - baseline_pct) * np.log(current_pct / baseline_pct))
return psi
def setup_alerting(self, alert_rules):
"""Setup automated alerting dla model performance"""
alert_configs = []
for rule in alert_rules:
if rule["metric"] == "accuracy_drop":
alert_config = {
"name": f"{self.endpoint_name}-accuracy-alert",
"condition": "accuracy < 0.85",
"frequency": "PT5M", # Check every 5 minutes
"action": ["email", "slack"],
"severity": "high"
}
elif rule["metric"] == "latency_increase":
alert_config = {
"name": f"{self.endpoint_name}-latency-alert",
"condition": "avg_response_time > 2000ms",
"frequency": "PT1M",
"action": ["email"],
"severity": "medium"
}
elif rule["metric"] == "error_rate":
alert_config = {
"name": f"{self.endpoint_name}-error-alert",
"condition": "error_rate > 5%",
"frequency": "PT1M",
"action": ["email", "slack", "pagerduty"],
"severity": "critical"
}
alert_configs.append(alert_config)
return alert_configs
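A drift-check sketch (the Log Analytics workspace ID, endpoint name, and baseline array are placeholders; in practice the baseline would come from the training-time prediction distribution):
import numpy as np

monitor = ModelPerformanceMonitor(
    workspace_id="your-log-analytics-workspace-id",
    endpoint_name="churn-classifier-endpoint"
)
# Illustrative baseline prediction distribution captured at training time
baseline_predictions = np.random.RandomState(42).beta(2, 5, size=5000)
drift_report = monitor.monitor_model_drift(baseline_predictions, current_timespan_hours=24)
print(drift_report)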
Session 18: Model Deployment and MLOps Automation (30.10.2025)
🚀 Production ML Pipeline Implementation
Complete MLOps Workflow
from azure.ai.ml import Input, Output, command
from azure.ai.ml.dsl import pipeline
@pipeline(description="Complete MLOps pipeline dla production ML")
def ml_production_pipeline(
training_data: Input(type="uri_folder"),
model_name: str,
deployment_target: str
):
"""End-to-end ML pipeline z training do deployment"""
# Step 1: Data validation i preprocessing
data_prep_step = command(
name="data_preparation",
display_name="Data Preparation i Validation",
code="./src/data_preparation",
command="python prep_data.py --input ${{inputs.training_data}} --output ${{outputs.processed_data}}",
environment="azureml:sklearn-env:1",
inputs={"training_data": training_data},
outputs={"processed_data": Output(type="uri_folder")}
)
# Step 2: Model training with hyperparameter tuning
training_step = command(
name="model_training",
display_name="Model Training and Hyperparameter Tuning",
code="./src/training",
command="python train_model.py --data ${{inputs.processed_data}} --model-output ${{outputs.trained_model}}",
environment="azureml:sklearn-env:1",
inputs={"processed_data": data_prep_step.outputs.processed_data},
outputs={"trained_model": Output(type="mlflow_model")},
compute="gpu-compute-cluster"
)
# Step 3: Model evaluation and validation
evaluation_step = command(
name="model_evaluation",
display_name="Model Evaluation and Quality Gates",
code="./src/evaluation",
command="python evaluate_model.py --model ${{inputs.trained_model}} --test-data ${{inputs.test_data}} --metrics-output ${{outputs.evaluation_metrics}}",
environment="azureml:sklearn-env:1",
inputs={
"trained_model": training_step.outputs.trained_model,
"test_data": data_prep_step.outputs.processed_data
},
outputs={"evaluation_metrics": Output(type="uri_file")}
)
# Step 4: Model registration (conditional on performance)
registration_step = command(
name="model_registration",
display_name="Conditional Model Registration",
code="./src/registration",
command="python register_model.py --model ${{inputs.trained_model}} --metrics ${{inputs.evaluation_metrics}} --model-name {model_name}",
environment="azureml:sklearn-env:1",
inputs={
"trained_model": training_step.outputs.trained_model,
"evaluation_metrics": evaluation_step.outputs.evaluation_metrics
}
)
# Step 5: Deployment (conditional on registration success)
deployment_step = command(
name="model_deployment",
display_name="Model Deployment to Production",
code="./src/deployment",
command="python deploy_model.py --model-name {model_name} --target {deployment_target}",
environment="azureml:sklearn-env:1"
)
# Define step dependencies
registration_step.inputs.trained_model = training_step.outputs.trained_model
registration_step.inputs.evaluation_metrics = evaluation_step.outputs.evaluation_metrics
return {
"trained_model": training_step.outputs.trained_model,
"evaluation_metrics": evaluation_step.outputs.evaluation_metrics
}
Infrastructure as Code (IaC)
// infrastructure/ml-workspace.bicep
param workspaceName string
param location string = resourceGroup().location
param storageAccountName string
param keyVaultName string
param appInsightsName string
param logAnalyticsWorkspaceId string
// Storage Account for the ML workspace
resource storageAccount 'Microsoft.Storage/storageAccounts@2023-01-01' = {
name: storageAccountName
location: location
sku: {
name: 'Standard_LRS'
}
kind: 'StorageV2'
properties: {
supportsHttpsTrafficOnly: true
encryption: {
services: {
file: {
enabled: true
}
blob: {
enabled: true
}
}
keySource: 'Microsoft.Storage'
}
}
}
// Key Vault for secrets management
resource keyVault 'Microsoft.KeyVault/vaults@2023-07-01' = {
name: keyVaultName
location: location
properties: {
tenantId: subscription().tenantId
sku: {
family: 'A'
name: 'standard'
}
accessPolicies: []
enableSoftDelete: true
softDeleteRetentionInDays: 7
}
}
// Application Insights for monitoring
resource appInsights 'Microsoft.Insights/components@2020-02-02' = {
name: appInsightsName
location: location
kind: 'web'
properties: {
Application_Type: 'web'
WorkspaceResourceId: logAnalyticsWorkspaceId
}
}
// Azure ML Workspace
resource mlWorkspace 'Microsoft.MachineLearningServices/workspaces@2023-10-01' = {
name: workspaceName
location: location
identity: {
type: 'SystemAssigned'
}
properties: {
storageAccount: storageAccount.id
keyVault: keyVault.id
applicationInsights: appInsights.id
hbiWorkspace: false
publicNetworkAccess: 'Enabled'
}
}
🔍 Automated Testing for ML Systems
ML-Specific Testing Framework
import pytest
import numpy as np
from sklearn.metrics import accuracy_score
import joblib
class MLModelTests:
def __init__(self, model_path, test_data_path):
self.model = joblib.load(model_path)
self.test_data = self._load_test_data(test_data_path)
def test_model_accuracy_threshold(self):
"""Test model meets minimum accuracy requirements"""
X_test, y_test = self.test_data
predictions = self.model.predict(X_test)
accuracy = accuracy_score(y_test, predictions)
assert accuracy >= 0.85, f"Model accuracy {accuracy:.3f} below threshold 0.85"
def test_prediction_consistency(self):
"""Test model produces consistent predictions"""
X_sample = self.test_data[0][:10] # Sample of test data
# Multiple predictions for the same input
predictions_1 = self.model.predict(X_sample)
predictions_2 = self.model.predict(X_sample)
# Should be identical for deterministic models
np.testing.assert_array_equal(
predictions_1, predictions_2,
"Model predictions are not consistent"
)
def test_input_validation(self):
"""Test model handles invalid inputs gracefully"""
# Test with empty input
with pytest.raises((ValueError, AttributeError)):
self.model.predict(np.array([]))
# Test with wrong shape
with pytest.raises((ValueError, AttributeError)):
wrong_shape_input = np.random.rand(5, 999) # Wrong feature count
self.model.predict(wrong_shape_input)
def test_prediction_latency(self):
"""Test model prediction latency requirements"""
import time
X_sample = self.test_data[0][:1]
start_time = time.time()
prediction = self.model.predict(X_sample)
end_time = time.time()
latency_ms = (end_time - start_time) * 1000
assert latency_ms < 100, f"Prediction latency {latency_ms:.2f}ms exceeds 100ms limit"
def test_memory_usage(self):
"""Test model memory footprint"""
import psutil
import os
process = psutil.Process(os.getpid())
initial_memory = process.memory_info().rss / 1024 / 1024 # MB
# Load and use the model
X_test = self.test_data[0]
predictions = self.model.predict(X_test)
final_memory = process.memory_info().rss / 1024 / 1024 # MB
memory_increase = final_memory - initial_memory
assert memory_increase < 500, f"Memory usage increase {memory_increase:.2f}MB exceeds 500MB limit"
# Example test execution
def run_ml_tests():
"""Run comprehensive ML model tests"""
test_suite = MLModelTests("models/latest_model.pkl", "data/test_data.npz")
# Run all tests
test_results = {}
try:
test_suite.test_model_accuracy_threshold()
test_results["accuracy"] = "PASS"
except AssertionError as e:
test_results["accuracy"] = f"FAIL: {str(e)}"
try:
test_suite.test_prediction_consistency()
test_results["consistency"] = "PASS"
except AssertionError as e:
test_results["consistency"] = f"FAIL: {str(e)}"
try:
test_suite.test_prediction_latency()
test_results["latency"] = "PASS"
except AssertionError as e:
test_results["latency"] = f"FAIL: {str(e)}"
return test_results
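Note that pytest does not collect test classes that define __init__, so MLModelTests is driven by run_ml_tests() above. If native pytest collection is preferred, a fixture-based sketch (artifact paths are placeholders) could look like this:
import pytest

@pytest.fixture(scope="module")
def model_tests():
    # Build the helper once per test module (placeholder artifact paths)
    return MLModelTests("models/latest_model.pkl", "data/test_data.npz")

def test_accuracy_threshold(model_tests):
    model_tests.test_model_accuracy_threshold()

def test_prediction_consistency(model_tests):
    model_tests.test_prediction_consistency()

def test_prediction_latency(model_tests):
    model_tests.test_prediction_latency()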
Session 19: Workshop - Model Deployment and MLOps Automation (30.10.2025)
🏗️ Complete MLOps Implementation Workshop
Project: End-to-End ML System
WORKSHOP DELIVERABLE: PRODUCTION-READY ML SYSTEM
SYSTEM REQUIREMENTS:
- Automated training pipeline triggered by data changes
- A/B testing for model variants (see the traffic-split sketch after this list)
- Real-time monitoring and alerting
- Automated rollback in case of issues
- Comprehensive logging for audit purposes
TECHNICAL STACK:
- Azure Machine Learning for training and deployment
- Azure DevOps for CI/CD pipelines
- Azure Monitor for observability
- Azure Key Vault for secrets management
- Azure Storage for data and artifact management
TIMELINE (4 hours hands-on):
1. Infrastructure setup (60 min)
2. Pipeline development (90 min)
3. Deployment automation (60 min)
4. Monitoring and testing (30 min)
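For the A/B testing requirement, traffic can be split between two deployments of the same managed endpoint. A minimal sketch using the Azure ML CLI (the endpoint and deployment names are illustrative):
# Route 90% of traffic to the current model and 10% to the candidate
az ml online-endpoint update \
--name churn-classifier-endpoint \
--traffic "blue=90 green=10"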
Implementation Steps
Step 1: Infrastructure Setup (60 min)
# Deploy infrastructure using Bicep templates
az deployment group create \
--resource-group ai-workshop-rg \
--template-file infrastructure/ml-workspace.bicep \
--parameters workspaceName=ai-workshop-ml location=eastus \
storageAccountName=aiworkshopmlsa keyVaultName=ai-workshop-kv \
appInsightsName=ai-workshop-appi logAnalyticsWorkspaceId=<log-analytics-resource-id>
# Configure compute resources
az ml compute create \
--name cpu-cluster \
--type AmlCompute \
--min-instances 0 \
--max-instances 4 \
--size Standard_DS3_v2
az ml compute create \
--name gpu-cluster \
--type AmlCompute \
--min-instances 0 \
--max-instances 2 \
--size Standard_NC6s_v3
Step 2: Pipeline Development (90 min)
# ml_pipeline.py - Complete training pipeline
from azure.ai.ml import MLClient, command, Input, Output
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.entities import Environment, Model
from azure.identity import DefaultAzureCredential
@pipeline(description="Production ML Pipeline")
def production_ml_pipeline(
training_data: Input(type="uri_folder"),
model_name: str = "production-classifier",
performance_threshold: float = 0.85
):
# Data validation step
data_validation = command(
name="validate_data",
display_name="Data Quality Validation",
code="./src/validation",
command="python validate_data.py --input ${{inputs.data}} --output ${{outputs.validation_report}}",
environment="azureml:sklearn-1.0:1",
inputs={"data": training_data},
outputs={"validation_report": Output(type="uri_file")},
compute="cpu-cluster"
)
# Feature engineering step
feature_engineering = command(
name="feature_engineering",
display_name="Feature Engineering",
code="./src/features",
command="python engineer_features.py --input ${{inputs.data}} --output ${{outputs.features}}",
environment="azureml:sklearn-1.0:1",
inputs={"data": training_data},
outputs={"features": Output(type="uri_folder")},
compute="cpu-cluster"
)
# Model training step
model_training = command(
name="train_model",
display_name="Model Training",
code="./src/training",
command="python train.py --features ${{inputs.features}} --model-output ${{outputs.model}} --model-name {model_name}",
environment="azureml:sklearn-1.0:1",
inputs={"features": feature_engineering.outputs.features},
outputs={"model": Output(type="mlflow_model")},
compute="gpu-cluster"
)
# Model evaluation step
model_evaluation = command(
name="evaluate_model",
display_name="Model Evaluation",
code="./src/evaluation",
command="python evaluate.py --model ${{inputs.model}} --test-data ${{inputs.features}} --threshold {performance_threshold} --metrics-output ${{outputs.metrics}}",
environment="azureml:sklearn-1.0:1",
inputs={
"model": model_training.outputs.model,
"features": feature_engineering.outputs.features
},
outputs={"metrics": Output(type="uri_file")},
compute="cpu-cluster"
)
# Conditional deployment step
model_deployment = command(
name="deploy_model",
display_name="Model Deployment",
code="./src/deployment",
command="python deploy.py --model ${{inputs.model}} --metrics ${{inputs.metrics}} --model-name {model_name}",
environment="azureml:sklearn-1.0:1",
inputs={
"model": model_training.outputs.model,
"metrics": model_evaluation.outputs.metrics
},
compute="cpu-cluster"
)
return {
"trained_model": model_training.outputs.model,
"evaluation_metrics": model_evaluation.outputs.metrics
}
# Execute pipeline
def run_production_pipeline():
ml_client = MLClient.from_config(credential=DefaultAzureCredential())
# Create pipeline job
pipeline_job = ml_client.jobs.create_or_update(
production_ml_pipeline(
training_data=Input(
type="uri_folder",
path="azureml://datastores/workspaceblobstore/paths/training-data/"
),
model_name="production-classifier-v1",
performance_threshold=0.85
)
)
return pipeline_job
Step 3: Deployment Automation (60 min)
# deployment_automation.py
class AutomatedDeploymentManager:
def __init__(self, ml_client):
self.ml_client = ml_client
self.deployment_strategies = {
"blue_green": self._blue_green_deployment,
"canary": self._canary_deployment,
"rolling": self._rolling_deployment
}
def deploy_with_strategy(self, model_name, model_version, strategy="blue_green"):
"""Deploy model using specified strategy"""
deployment_func = self.deployment_strategies.get(strategy)
if not deployment_func:
raise ValueError(f"Unknown deployment strategy: {strategy}")
return deployment_func(model_name, model_version)
def _blue_green_deployment(self, model_name, model_version):
"""Blue-green deployment implementation"""
endpoint_name = f"{model_name}-endpoint"
# Check whether the endpoint exists and which color is currently live
try:
self.ml_client.online_endpoints.get(endpoint_name)
deployment_names = [d.name for d in self.ml_client.online_deployments.list(endpoint_name)]
current_deployment = "green" if "green" in deployment_names else "blue"
new_deployment = "blue" if current_deployment == "green" else "green"
except Exception:
# First deployment
current_deployment = None
new_deployment = "blue"
# Create new deployment
new_deployment_config = self._create_deployment_config(
endpoint_name, new_deployment, model_name, model_version
)
# Deploy new version
deployment_result = self.ml_client.online_deployments.begin_create_or_update(
new_deployment_config
).result()
# Test new deployment
test_results = self._run_deployment_tests(endpoint_name, new_deployment)
if test_results["success"]:
# Switch traffic
self._switch_traffic(endpoint_name, new_deployment, 100)
# Clean up old deployment
if current_deployment:
self._cleanup_old_deployment(endpoint_name, current_deployment)
return {"status": "success", "active_deployment": new_deployment}
else:
# Rollback
self._cleanup_failed_deployment(endpoint_name, new_deployment)
return {"status": "failed", "error": test_results["error"]}
def _run_deployment_tests(self, endpoint_name, deployment_name):
"""Run comprehensive tests on new deployment"""
import requests
import time
# Get endpoint scoring URI
endpoint = self.ml_client.online_endpoints.get(endpoint_name)
scoring_uri = endpoint.scoring_uri
# Test scenarios
test_cases = [
{"input": [1, 2, 3, 4, 5], "expected_type": "array"},
{"input": [0.1, 0.2, 0.3], "expected_type": "array"},
{"input": "invalid", "expected_error": True}
]
for i, test_case in enumerate(test_cases):
try:
# Make test request
response = requests.post(
scoring_uri,
json={"data": test_case["input"]},
headers={"Authorization": f"Bearer {self._get_auth_token()}"},
timeout=30
)
if test_case.get("expected_error"):
assert response.status_code != 200, f"Test {i}: Expected error but got success"
else:
assert response.status_code == 200, f"Test {i}: Request failed with status {response.status_code}"
result = response.json()
assert isinstance(result, list), f"Test {i}: Expected array response"
time.sleep(1) # Rate limiting
except Exception as e:
return {"success": False, "error": f"Test {i} failed: {str(e)}"}
return {"success": True, "tests_passed": len(test_cases)}
Step 4: Monitoring Setup (30 min)
# monitoring_setup.py
class MLOpsMonitoring:
def __init__(self, workspace_name, endpoint_name):
self.workspace_name = workspace_name
self.endpoint_name = endpoint_name
self.alert_manager = AlertManager()
def setup_comprehensive_monitoring(self):
"""Setup complete monitoring dla ML system"""
monitoring_config = {
"model_performance": {
"metrics": ["accuracy", "precision", "recall", "f1_score"],
"thresholds": {"accuracy": 0.85, "f1_score": 0.80},
"evaluation_frequency": "daily"
},
"system_performance": {
"metrics": ["latency", "throughput", "error_rate"],
"thresholds": {"latency_ms": 2000, "error_rate_pct": 5},
"evaluation_frequency": "realtime"
},
"data_quality": {
"metrics": ["completeness", "consistency", "drift"],
"thresholds": {"drift_score": 0.2, "completeness_pct": 95},
"evaluation_frequency": "hourly"
}
}
# Set up alerts for each category
for category, config in monitoring_config.items():
self._setup_category_alerts(category, config)
return monitoring_config
def _setup_category_alerts(self, category, config):
"""Setup alerts dla specific monitoring category"""
for metric in config["metrics"]:
if metric in config["thresholds"]:
alert_rule = {
"name": f"{self.endpoint_name}-{metric}-alert",
"metric": metric,
"threshold": config["thresholds"][metric],
"frequency": config["evaluation_frequency"],
"severity": self._determine_severity(metric),
"actions": ["email", "teams", "pagerduty"]
}
self.alert_manager.create_alert_rule(alert_rule)
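A brief usage sketch (names are placeholders; AlertManager is assumed to be provided elsewhere, as in the class above):
monitoring = MLOpsMonitoring(
    workspace_name="ai-workshop-workspace",
    endpoint_name="churn-classifier-endpoint"
)
config = monitoring.setup_comprehensive_monitoring()
print(config["system_performance"]["thresholds"])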
🏠 Final Module Project
Capstone: Complete MLOps Implementation
Requirements:
- Full ML Pipeline with automated training, validation, and deployment
- Monitoring Dashboard with real-time metrics and alerts
- Documentation covering architecture, operations, and troubleshooting
- Demo showing the end-to-end workflow with sample data
Deliverables:
- Working Azure ML workspace with a deployed model
- CI/CD pipeline in Azure DevOps
- Monitoring setup with custom dashboards
- Incident response playbook
- Performance optimization recommendations
Assessment Criteria:
- Technical implementation quality (40%)
- Documentation completeness (25%)
- Monitoring and observability (20%)
- Demo presentation (15%)