Session 16: Managing ML Experiments and Resources
Azure Machine Learning for production ML
🎯 Session goals
- Set up an Azure Machine Learning workspace
- Track and manage ML experiments
- Manage models and their versions
- Optimize costs and compute resources
🧪 Azure Machine Learning Workspace
Architecture and components
An Azure ML workspace is the central hub for all machine learning operations in Azure.
Key components (each maps to an operations group on MLClient, as sketched below):
- Compute - compute resources for training and inference
- Datastores - secure access to data
- Experiments - tracking of experiments and metrics
- Models - a model registry with versioning
- Endpoints - model deployment to production
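A minimal sketch of how those components surface in the Python SDK v2 - it assumes an already-provisioned workspace, and the identifiers below are placeholders:

```python
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

# Connect to an existing workspace (placeholder identifiers)
ml_client = MLClient(
    DefaultAzureCredential(),
    subscription_id="<subscription-id>",
    resource_group_name="<resource-group>",
    workspace_name="<workspace-name>",
)

print([c.name for c in ml_client.compute.list()])            # Compute
print([d.name for d in ml_client.datastores.list()])         # Datastores
print([j.display_name for j in ml_client.jobs.list()])       # Experiments (jobs)
print([m.name for m in ml_client.models.list()])             # Models
print([e.name for e in ml_client.online_endpoints.list()])   # Endpoints
```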
Setup workspace
```python
from azure.ai.ml import MLClient
from azure.ai.ml.entities import Workspace, ComputeInstance, AmlCompute
from azure.identity import DefaultAzureCredential


class AzureMLWorkspaceManager:
    def __init__(self, subscription_id, resource_group, workspace_name):
        self.credential = DefaultAzureCredential()
        self.subscription_id = subscription_id
        self.resource_group = resource_group
        self.workspace_name = workspace_name

        # Initialize ML client
        self.ml_client = MLClient(
            credential=self.credential,
            subscription_id=subscription_id,
            resource_group_name=resource_group,
            workspace_name=workspace_name
        )

    def create_workspace(self, location="eastus"):
        """Create the Azure ML workspace."""
        workspace = Workspace(
            name=self.workspace_name,
            location=location,
            display_name="AI Workshop ML Workspace",
            description="Workspace for AI Workshop ML experiments and models",
            hbi_workspace=False,  # High Business Impact workspace disabled
            tags={
                "project": "ai-workshop",
                "environment": "development",
                "cost_center": "training"
            }
        )

        try:
            # Create the workspace and wait for the long-running operation
            workspace_result = self.ml_client.workspaces.begin_create(workspace).result()
            print(f"✅ Workspace {self.workspace_name} created successfully")
            return workspace_result
        except Exception as e:
            if "already exists" in str(e):
                print(f"📋 Workspace {self.workspace_name} already exists")
                return self.ml_client.workspaces.get(self.workspace_name)
            print(f"❌ Error creating workspace: {str(e)}")
            raise

    def setup_compute_resources(self):
        """Configure compute resources."""
        compute_configs = []

        # CPU compute instance for development
        cpu_compute = ComputeInstance(
            name="dev-cpu-instance",
            size="Standard_DS3_v2",
            idle_time_before_shutdown_minutes=30,
            description="Development instance for data science work"
        )
        compute_configs.append(("compute_instance", cpu_compute))

        # CPU compute cluster for training
        cpu_cluster = AmlCompute(
            name="cpu-cluster",
            type="amlcompute",
            size="Standard_DS3_v2",
            min_instances=0,
            max_instances=4,
            idle_time_before_scale_down=120,
            description="CPU cluster for ML training jobs"
        )
        compute_configs.append(("aml_compute", cpu_cluster))

        # GPU cluster for deep learning (optional)
        gpu_cluster = AmlCompute(
            name="gpu-cluster",
            type="amlcompute",
            size="Standard_NC6s_v3",
            min_instances=0,
            max_instances=2,
            idle_time_before_scale_down=120,
            description="GPU cluster for deep learning workloads"
        )
        compute_configs.append(("aml_compute", gpu_cluster))

        # Create compute resources; begin_create_or_update handles both
        # compute instances and AML compute clusters
        created_resources = []
        for compute_type, config in compute_configs:
            try:
                self.ml_client.compute.begin_create_or_update(config).result()
                created_resources.append({
                    "name": config.name,
                    "type": compute_type,
                    "status": "created",
                    "size": config.size
                })
                print(f"✅ Created compute resource: {config.name}")
            except Exception as e:
                print(f"❌ Failed to create {config.name}: {str(e)}")
                created_resources.append({
                    "name": config.name,
                    "type": compute_type,
                    "status": "failed",
                    "error": str(e)
                })

        return created_resources
```
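A short usage sketch - the subscription, resource group, and workspace names below are placeholders:

```python
# Placeholder identifiers - substitute your own values
manager = AzureMLWorkspaceManager(
    subscription_id="<subscription-id>",
    resource_group="rg-ai-workshop",
    workspace_name="mlw-ai-workshop"
)
manager.create_workspace(location="westeurope")
for resource in manager.setup_compute_resources():
    print(resource["name"], resource["status"])
```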
📊 Experiment Tracking
MLflow integration
```python
import mlflow
import mlflow.sklearn
from azure.ai.ml.entities import Model
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report


class MLExperimentTracker:
    def __init__(self, ml_client, experiment_name):
        self.ml_client = ml_client
        self.experiment_name = experiment_name

        # Point MLflow at the workspace tracking server
        workspace = ml_client.workspaces.get(ml_client.workspace_name)
        mlflow.set_tracking_uri(workspace.mlflow_tracking_uri)
        mlflow.set_experiment(experiment_name)

    def run_training_experiment(self, dataset_path, model_config, experiment_metadata=None):
        """Run a training experiment with full tracking."""
        with mlflow.start_run(run_name=f"training_{model_config['model_type']}") as run:
            # Log experiment metadata
            if experiment_metadata:
                mlflow.set_tags(experiment_metadata)

            # Log hyperparameters
            mlflow.log_params(model_config)

            try:
                # Load and prepare data
                print("📊 Loading and preparing data...")
                data = pd.read_csv(dataset_path)
                X, y = self._prepare_features_target(data, model_config)

                # Split data
                X_train, X_test, y_train, y_test = train_test_split(
                    X, y, test_size=0.2, random_state=42, stratify=y
                )

                # Log data statistics
                mlflow.log_metrics({
                    "train_samples": len(X_train),
                    "test_samples": len(X_test),
                    "features_count": X.shape[1],
                    "classes_count": len(y.unique())
                })

                # Train model
                print("🤖 Training model...")
                model = self._train_model(X_train, y_train, model_config)

                # Evaluate model
                print("📈 Evaluating model...")
                metrics = self._evaluate_model(model, X_test, y_test)

                # Log metrics
                mlflow.log_metrics(metrics)

                # Log model artifacts
                model_path = "trained_model"
                mlflow.sklearn.log_model(
                    model,
                    model_path,
                    registered_model_name=f"{self.experiment_name}_model"
                )

                # Save additional artifacts
                self._save_experiment_artifacts(model, X_test, y_test, metrics)

                # Register the model only if performance clears the threshold
                if metrics.get("accuracy", 0) > model_config.get("min_accuracy", 0.8):
                    registered_model = self._register_model(model, run.info.run_id, metrics)
                    mlflow.log_param("model_registered", True)
                    mlflow.log_param("registered_model_name", registered_model["name"])
                else:
                    mlflow.log_param("model_registered", False)
                    mlflow.log_param("registration_reason", "Accuracy below threshold")

                return {
                    "run_id": run.info.run_id,
                    "status": "success",
                    "metrics": metrics,
                    "model_path": f"runs:/{run.info.run_id}/{model_path}"
                }

            except Exception as e:
                # Tags are mutable and not length-limited like params
                mlflow.set_tag("error", str(e))
                print(f"❌ Experiment failed: {str(e)}")
                return {
                    "run_id": run.info.run_id,
                    "status": "failed",
                    "error": str(e)
                }

    def _train_model(self, X_train, y_train, config):
        """Train a model based on the configuration."""
        model_type = config["model_type"]

        if model_type == "random_forest":
            model = RandomForestClassifier(
                n_estimators=config.get("n_estimators", 100),
                max_depth=config.get("max_depth", None),
                min_samples_split=config.get("min_samples_split", 2),
                random_state=42,
                n_jobs=-1
            )
        elif model_type == "logistic_regression":
            from sklearn.linear_model import LogisticRegression
            model = LogisticRegression(
                C=config.get("C", 1.0),
                max_iter=config.get("max_iter", 1000),
                random_state=42
            )
        else:
            raise ValueError(f"Unsupported model type: {model_type}")

        # Train with timing
        import time
        start_time = time.time()
        model.fit(X_train, y_train)
        training_time = time.time() - start_time

        # Log training time
        mlflow.log_metric("training_time_seconds", training_time)

        return model

    def _evaluate_model(self, model, X_test, y_test):
        """Comprehensive model evaluation"""
        # Predictions
        y_pred = model.predict(X_test)
        y_pred_proba = model.predict_proba(X_test) if hasattr(model, 'predict_proba') else None

        # Basic metrics
        accuracy = accuracy_score(y_test, y_pred)

        # Detailed classification metrics
        from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix
        precision = precision_score(y_test, y_pred, average='weighted')
        recall = recall_score(y_test, y_pred, average='weighted')
        f1 = f1_score(y_test, y_pred, average='weighted')

        metrics = {
            "accuracy": accuracy,
            "precision": precision,
            "recall": recall,
            "f1_score": f1
        }

        # Log confusion matrix as artifact
        cm = confusion_matrix(y_test, y_pred)
        self._log_confusion_matrix(cm, y_test)

        # Feature importance (if available)
        if hasattr(model, 'feature_importances_'):
            self._log_feature_importance(model.feature_importances_)

        return metrics
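
    # --- Helper sketches ---------------------------------------------------
    # The class references the four helpers below without defining them.
    # These are minimal, assumed implementations (not from the original
    # material) - adjust them to your dataset and artifact conventions.

    def _prepare_features_target(self, data, config):
        """Split a DataFrame into features X and target y.

        Assumes config may carry a 'target_column' key and falls back to
        the last column otherwise (an assumption, not from the original).
        """
        target_column = config.get("target_column", data.columns[-1])
        return data.drop(columns=[target_column]), data[target_column]

    def _log_confusion_matrix(self, cm, y_test):
        """Log the confusion matrix as a JSON artifact of the run."""
        labels = sorted(pd.Series(y_test).unique().tolist())
        mlflow.log_dict(
            {"labels": [str(label) for label in labels], "matrix": cm.tolist()},
            "confusion_matrix.json"
        )

    def _log_feature_importance(self, importances):
        """Log feature importances as a JSON artifact of the run."""
        mlflow.log_dict(
            {"feature_importances": [float(v) for v in importances]},
            "feature_importance.json"
        )

    def _save_experiment_artifacts(self, model, X_test, y_test, metrics):
        """Persist an evaluation report alongside the run."""
        report = classification_report(y_test, model.predict(X_test), output_dict=True)
        mlflow.log_dict(
            {"metrics": metrics, "classification_report": report},
            "evaluation_report.json"
        )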

    def _register_model(self, model, run_id, metrics):
        """Register the model in the Azure ML model registry."""
        model_name = f"{self.experiment_name}_model"

        # Create model entity
        model_entity = Model(
            name=model_name,
            path=f"runs:/{run_id}/trained_model",
            description=f"Model trained in experiment {self.experiment_name}",
            tags={
                "accuracy": str(metrics.get("accuracy", 0)),
                "f1_score": str(metrics.get("f1_score", 0)),
                "experiment": self.experiment_name,
                "framework": "scikit-learn"
            }
        )

        # Register model
        registered_model = self.ml_client.models.create_or_update(model_entity)

        print(f"✅ Model registered: {registered_model.name} v{registered_model.version}")

        return {
            "name": registered_model.name,
            "version": registered_model.version,
            "id": registered_model.id
        }

    def compare_experiments(self, experiment_names=None):
        """Compare results across experiments."""
        if experiment_names is None:
            experiment_names = [self.experiment_name]

        comparison_data = []

        for exp_name in experiment_names:
            experiment = mlflow.get_experiment_by_name(exp_name)
            if experiment:
                runs = mlflow.search_runs(
                    experiment_ids=[experiment.experiment_id],
                    order_by=["metrics.accuracy DESC"]
                )

                for _, run in runs.head(5).iterrows():  # Top 5 runs
                    comparison_data.append({
                        "experiment": exp_name,
                        "run_id": run["run_id"],
                        "accuracy": run.get("metrics.accuracy", 0),
                        "f1_score": run.get("metrics.f1_score", 0),
                        "model_type": run.get("params.model_type", "unknown"),
                        "training_time": run.get("metrics.training_time_seconds", 0)
                    })

        # Create comparison DataFrame (guard against no matching runs)
        comparison_df = pd.DataFrame(comparison_data)
        if comparison_df.empty:
            return {"error": "No runs found for the given experiments"}, comparison_df

        # Generate comparison report
        report = {
            "best_accuracy": comparison_df.loc[comparison_df["accuracy"].idxmax()].to_dict(),
            "best_f1": comparison_df.loc[comparison_df["f1_score"].idxmax()].to_dict(),
            "fastest_training": comparison_df.loc[comparison_df["training_time"].idxmin()].to_dict(),
            "summary_stats": comparison_df.describe().to_dict()
        }

        return report, comparison_df
```
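A short usage sketch - it assumes the `ml_client` from the workspace setup above, and the dataset path, experiment name, and config values are illustrative placeholders (the `target_column` key is consumed by the assumed `_prepare_features_target` helper):

```python
tracker = MLExperimentTracker(ml_client, experiment_name="churn-baseline")

result = tracker.run_training_experiment(
    dataset_path="data/churn.csv",      # hypothetical dataset
    model_config={
        "model_type": "random_forest",
        "n_estimators": 200,
        "max_depth": 10,
        "target_column": "churned",     # assumption, see helper sketch
        "min_accuracy": 0.85
    },
    experiment_metadata={"owner": "workshop", "dataset": "churn-v1"}
)
print(result["status"], result.get("metrics"))

report, runs_df = tracker.compare_experiments()
```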
💰 Resource Management and Cost Optimization
Cost monitoring and optimization
```python
from azure.identity import DefaultAzureCredential
from azure.mgmt.consumption import ConsumptionManagementClient
import pandas as pd
from datetime import datetime, timedelta


class MLResourceOptimizer:
    def __init__(self, ml_client, subscription_id):
        self.ml_client = ml_client
        self.subscription_id = subscription_id
        self.consumption_client = ConsumptionManagementClient(
            credential=DefaultAzureCredential(),
            subscription_id=subscription_id
        )

    def analyze_compute_usage(self, days_back=7):
        """Analyze compute resource utilization."""
        usage_stats = []

        # Get all compute resources
        compute_resources = list(self.ml_client.compute.list())

        for compute in compute_resources:
            try:
                # Get usage statistics
                usage_info = self._get_compute_usage_stats(compute.name, days_back)

                usage_stats.append({
                    "resource_name": compute.name,
                    "resource_type": compute.type,
                    "size": getattr(compute, 'size', 'unknown'),
                    "current_state": getattr(compute, 'state', 'unknown'),
                    "utilization_percent": usage_info.get("utilization", 0),
                    "total_runtime_hours": usage_info.get("runtime_hours", 0),
                    "estimated_cost": usage_info.get("estimated_cost", 0),
                    "idle_time_hours": usage_info.get("idle_hours", 0)
                })
            except Exception as e:
                print(f"⚠️ Could not get usage stats for {compute.name}: {str(e)}")

        return pd.DataFrame(usage_stats)
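
    # _get_compute_usage_stats is referenced above but not defined in the
    # original material. The placeholder below only returns neutral
    # defaults; a real implementation would query Azure Monitor metrics
    # and the Consumption API for the given window.
    def _get_compute_usage_stats(self, compute_name, days_back):
        """Placeholder: return a usage summary for one compute resource."""
        return {
            "utilization": 0.0,      # avg CPU/GPU utilization percent
            "runtime_hours": 0.0,    # hours the resource was running
            "idle_hours": 0.0,       # hours running but idle
            "estimated_cost": 0.0    # estimated USD cost over the window
        }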

    def get_optimization_recommendations(self, usage_stats_df):
        """Generate optimization recommendations."""
        recommendations = []

        for _, resource in usage_stats_df.iterrows():
            # Recommend shutdown for low utilization
            if resource["utilization_percent"] < 20 and resource["current_state"] == "Running":
                recommendations.append({
                    "resource": resource["resource_name"],
                    "type": "shutdown",
                    "reason": f"Low utilization ({resource['utilization_percent']:.1f}%)",
                    "potential_savings": resource["estimated_cost"] * 0.8,
                    "priority": "high"
                })

            # Recommend downsizing for consistently low usage
            if (resource["utilization_percent"] < 40 and
                    resource["total_runtime_hours"] > 10 and
                    "Standard_D" in resource["size"]):
                recommendations.append({
                    "resource": resource["resource_name"],
                    "type": "downsize",
                    "reason": f"Consistent low utilization ({resource['utilization_percent']:.1f}%)",
                    "suggested_size": self._suggest_smaller_size(resource["size"]),
                    "potential_savings": resource["estimated_cost"] * 0.3,
                    "priority": "medium"
                })

            # Recommend scheduling for batch workloads
            if resource["idle_time_hours"] > resource["total_runtime_hours"] * 2:
                recommendations.append({
                    "resource": resource["resource_name"],
                    "type": "schedule",
                    "reason": (f"High idle time ({resource['idle_time_hours']:.1f}h idle "
                               f"vs {resource['total_runtime_hours']:.1f}h active)"),
                    "suggestion": "Implement automated start/stop scheduling",
                    "potential_savings": resource["estimated_cost"] * 0.5,
                    "priority": "medium"
                })

        return sorted(recommendations, key=lambda x: x["potential_savings"], reverse=True)
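
    # _suggest_smaller_size is referenced above but not defined in the
    # original; this is a hypothetical one-step downsize map - extend it
    # for the VM families you actually use.
    def _suggest_smaller_size(self, current_size):
        downsize_map = {
            "Standard_DS4_v2": "Standard_DS3_v2",
            "Standard_DS3_v2": "Standard_DS2_v2",
            "Standard_DS2_v2": "Standard_DS1_v2"
        }
        return downsize_map.get(current_size, current_size)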

    def implement_cost_controls(self, budget_limit_usd=1000):
        """Implement cost controls."""
        # Note: _setup_budget_alerts, _implement_cost_tagging and
        # _setup_scheduled_scaling are referenced but not shown in this material
        cost_controls = {
            "auto_shutdown_policies": self._setup_auto_shutdown(),
            "budget_alerts": self._setup_budget_alerts(budget_limit_usd),
            "resource_tagging": self._implement_cost_tagging(),
            "scheduled_scaling": self._setup_scheduled_scaling()
        }

        return cost_controls

    def _setup_auto_shutdown(self):
        """Configure automatic shutdown for compute instances."""
        shutdown_policies = []

        # Case-insensitive match, since the SDK reports the type in lowercase
        compute_instances = [
            compute for compute in self.ml_client.compute.list()
            if compute.type and compute.type.lower() == "computeinstance"
        ]

        for instance in compute_instances:
            try:
                # Update the compute instance with an auto-shutdown policy
                instance.idle_time_before_shutdown_minutes = 30
                instance.description = f"{instance.description} - Auto-shutdown: 30min idle"

                # Apply update
                self.ml_client.compute.begin_create_or_update(instance)

                shutdown_policies.append({
                    "resource": instance.name,
                    "policy": "auto_shutdown_30min",
                    "status": "applied"
                })
            except Exception as e:
                shutdown_policies.append({
                    "resource": instance.name,
                    "policy": "auto_shutdown_30min",
                    "status": "failed",
                    "error": str(e)
                })

        return shutdown_policies

    def generate_cost_report(self, period_days=30):
        """Generate a cost report."""
        end_date = datetime.now()
        start_date = end_date - timedelta(days=period_days)

        report = {
            "report_period": f"{start_date.date()} to {end_date.date()}",
            "total_cost_usd": 0,
            "cost_breakdown": {},
            "top_cost_drivers": [],
            "optimization_impact": {},
            "recommendations": []
        }

        try:
            # Get usage data for the period
            usage_data = self.analyze_compute_usage(period_days)

            # Calculate total costs
            total_cost = usage_data["estimated_cost"].sum()
            report["total_cost_usd"] = round(total_cost, 2)

            # Cost breakdown by resource type
            cost_by_type = usage_data.groupby("resource_type")["estimated_cost"].sum().to_dict()
            report["cost_breakdown"] = cost_by_type

            # Top cost drivers
            top_costs = usage_data.nlargest(5, "estimated_cost")[["resource_name", "estimated_cost"]].to_dict("records")
            report["top_cost_drivers"] = top_costs

            # Get optimization recommendations
            recommendations = self.get_optimization_recommendations(usage_data)
            report["recommendations"] = recommendations[:10]  # Top 10

            # Calculate potential savings
            potential_savings = sum(rec.get("potential_savings", 0) for rec in recommendations)
            report["optimization_impact"]["potential_monthly_savings"] = round(potential_savings, 2)
            report["optimization_impact"]["savings_percentage"] = (
                round((potential_savings / total_cost) * 100, 1) if total_cost > 0 else 0
            )

        except Exception as e:
            report["error"] = f"Error generating cost report: {str(e)}"

        return report
```
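A quick usage sketch - it assumes the `ml_client` from the workspace setup above, and the subscription ID is a placeholder:

```python
optimizer = MLResourceOptimizer(ml_client, subscription_id="<subscription-id>")

usage_df = optimizer.analyze_compute_usage(days_back=7)
for rec in optimizer.get_optimization_recommendations(usage_df)[:3]:
    print(rec["resource"], rec["type"], rec["reason"])

print(optimizer.generate_cost_report(period_days=30))
```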
✅ Hands-on tasks
Task 1: Workspace Setup (30 min)
- Create an Azure ML workspace
- Configure compute resources (CPU and GPU clusters)
- Set up datastores for training data
- Test connectivity and availability
Task 2: Experiment Tracking (45 min)
- Implement ML experiment tracking
- Train models with different hyperparameters
- Compare results across experiments
- Register the best model in the registry
Task 3: Cost Optimization (30 min)
- Analyze compute resource costs
- Implement auto-shutdown policies
- Build a cost monitoring dashboard
- Prepare an optimization plan
Task 4: Resource Monitoring (15 min)
- Configure alerts for high utilization
- Set up automated reporting
- Implement cost budgets
- Test the notification system
📊 Success metrics
- Resource utilization > 70% for active workloads
- Cost optimization - 30% reduction in idle-resource spend
- Experiment tracking - 100% of experiments logged with metrics
- Model registry - systematic versioning and deployment