Session 16: Managing ML Experiments and Resources

Azure Machine Learning for production ML

🎯 Session goals

  • Set up an Azure Machine Learning workspace
  • Track and manage ML experiments
  • Manage models and model versions
  • Optimize costs and compute resources

🧪 Azure Machine Learning Workspace

Architecture and components

An Azure ML workspace is the central hub for all machine learning operations in Azure.

Key components:

  • Compute - compute resources for training and inference
  • Datastores - secure access to data
  • Experiments - experiment and metrics tracking
  • Models - a model registry with versioning
  • Endpoints - model deployment to production

Workspace setup

from azure.ai.ml import MLClient
from azure.ai.ml.entities import Workspace, ComputeInstance, AmlCompute
from azure.identity import DefaultAzureCredential
import os

class AzureMLWorkspaceManager:
    def __init__(self, subscription_id, resource_group, workspace_name):
        self.credential = DefaultAzureCredential()
        self.subscription_id = subscription_id
        self.resource_group = resource_group
        self.workspace_name = workspace_name
        
        # Initialize ML client
        self.ml_client = MLClient(
            credential=self.credential,
            subscription_id=subscription_id,
            resource_group_name=resource_group,
            workspace_name=workspace_name
        )
    
    def create_workspace(self, location="eastus"):
        """Tworzenie workspace Azure ML"""
        
        workspace = Workspace(
            name=self.workspace_name,
            location=location,
            display_name="AI Workshop ML Workspace",
            description="Workspace for AI Workshop ML experiments and models",
            hbi_workspace=False,  # not a High Business Impact (HBI) workspace
            tags={
                "project": "ai-workshop",
                "environment": "development", 
                "cost_center": "training"
            }
        )
        
        try:
            # Create workspace (begin_create returns a poller; wait for it)
            workspace_result = self.ml_client.workspaces.begin_create(workspace).result()
            print(f"✅ Workspace {self.workspace_name} created successfully")
            return workspace_result
            
        except Exception as e:
            if "already exists" in str(e):
                print(f"📋 Workspace {self.workspace_name} already exists")
                return self.ml_client.workspaces.get(self.workspace_name)
            else:
                print(f"❌ Error creating workspace: {str(e)}")
                raise
    
    def setup_compute_resources(self):
        """Konfiguracja zasobów obliczeniowych"""
        
        compute_configs = []
        
        # CPU compute instance for development
        cpu_compute = ComputeInstance(
            name="dev-cpu-instance",
            size="Standard_DS3_v2",
            idle_time_before_shutdown_minutes=30,
            description="Development instance for data science work"
        )
        compute_configs.append(("compute_instance", cpu_compute))
        
        # CPU compute cluster for training
        cpu_cluster = AmlCompute(
            name="cpu-cluster",
            type="amlcompute",
            size="Standard_DS3_v2",
            min_instances=0,
            max_instances=4,
            idle_time_before_scale_down=120,
            description="CPU cluster for ML training jobs"
        )
        compute_configs.append(("aml_compute", cpu_cluster))
        
        # GPU cluster for deep learning (optional)
        gpu_cluster = AmlCompute(
            name="gpu-cluster", 
            type="amlcompute",
            size="Standard_NC6s_v3",
            min_instances=0,
            max_instances=2,
            idle_time_before_scale_down=120,
            description="GPU cluster for deep learning workloads"
        )
        compute_configs.append(("aml_compute", gpu_cluster))
        
        # Create compute resources
        created_resources = []
        for compute_type, config in compute_configs:
            try:
                # begin_create_or_update returns a poller; wait so the
                # resource is provisioned before reporting success
                self.ml_client.compute.begin_create_or_update(config).result()
                
                created_resources.append({
                    "name": config.name,
                    "type": compute_type,
                    "status": "created",
                    "size": config.size
                })
                print(f"✅ Created compute resource: {config.name}")
                
            except Exception as e:
                print(f"❌ Failed to create {config.name}: {str(e)}")
                created_resources.append({
                    "name": config.name,
                    "type": compute_type, 
                    "status": "failed",
                    "error": str(e)
                })
        
        return created_resources
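
A minimal usage sketch for the manager above; the subscription ID, resource group, and workspace names are placeholders to replace with your own values:

# Example usage (identifiers below are placeholders)
manager = AzureMLWorkspaceManager(
    subscription_id="<subscription-id>",
    resource_group="rg-ai-workshop",
    workspace_name="mlw-ai-workshop"
)
manager.create_workspace(location="westeurope")
resources = manager.setup_compute_resources()
print(resources)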

📊 Experiment Tracking

MLflow integration

import mlflow
import mlflow.sklearn
from azure.ai.ml import command, Input, Output
from azure.ai.ml.entities import Environment, Model
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report
import joblib
import json

class MLExperimentTracker:
    def __init__(self, ml_client, experiment_name):
        self.ml_client = ml_client
        self.experiment_name = experiment_name
        
        # Configure MLflow tracking against the workspace
        # (requires the azureml-mlflow package)
        mlflow.set_tracking_uri(ml_client.workspaces.get().mlflow_tracking_uri)
        mlflow.set_experiment(experiment_name)
        
    def run_training_experiment(self, dataset_path, model_config, experiment_metadata=None):
        """Uruchomienie eksperymentu treningowego z pełnym trackingiem"""
        
        with mlflow.start_run(run_name=f"training_{model_config['model_type']}") as run:
            
            # Log experiment metadata
            if experiment_metadata:
                mlflow.set_tags(experiment_metadata)
            
            # Log hyperparameters
            mlflow.log_params(model_config)
            
            try:
                # Load and prepare data
                print("📊 Loading and preparing data...")
                data = pd.read_csv(dataset_path)
                X, y = self._prepare_features_target(data, model_config)
                
                # Split data
                X_train, X_test, y_train, y_test = train_test_split(
                    X, y, test_size=0.2, random_state=42, stratify=y
                )
                
                # Log data statistics
                mlflow.log_metrics({
                    "train_samples": len(X_train),
                    "test_samples": len(X_test),
                    "features_count": X.shape[1],
                    "classes_count": len(y.unique())
                })
                
                # Train model
                print("🤖 Training model...")
                model = self._train_model(X_train, y_train, model_config)
                
                # Evaluate model
                print("📈 Evaluating model...")
                metrics = self._evaluate_model(model, X_test, y_test)
                
                # Log metrics
                mlflow.log_metrics(metrics)
                
                # Log model artifacts; registration is handled separately in
                # _register_model once the accuracy gate below is passed, so
                # the model is not registered twice
                model_path = "trained_model"
                mlflow.sklearn.log_model(model, model_path)
                
                # Save additional artifacts
                self._save_experiment_artifacts(model, X_test, y_test, metrics)
                
                # Register model if performance is good enough
                if metrics.get("accuracy", 0) > model_config.get("min_accuracy", 0.8):
                    registered_model = self._register_model(model, run.info.run_id, metrics)
                    mlflow.log_param("model_registered", True)
                    mlflow.log_param("registered_model_name", registered_model["name"])
                else:
                    mlflow.log_param("model_registered", False)
                    mlflow.log_param("registration_reason", "Accuracy below threshold")
                
                return {
                    "run_id": run.info.run_id,
                    "status": "success",
                    "metrics": metrics,
                    "model_path": f"runs:/{run.info.run_id}/{model_path}"
                }
                
            except Exception as e:
                mlflow.log_param("error", str(e))
                print(f"❌ Experiment failed: {str(e)}")
                return {
                    "run_id": run.info.run_id,
                    "status": "failed", 
                    "error": str(e)
                }
    
    def _train_model(self, X_train, y_train, config):
        """Training model based na configuration"""
        
        model_type = config["model_type"]
        
        if model_type == "random_forest":
            model = RandomForestClassifier(
                n_estimators=config.get("n_estimators", 100),
                max_depth=config.get("max_depth", None),
                min_samples_split=config.get("min_samples_split", 2),
                random_state=42,
                n_jobs=-1
            )
        elif model_type == "logistic_regression":
            from sklearn.linear_model import LogisticRegression
            model = LogisticRegression(
                C=config.get("C", 1.0),
                max_iter=config.get("max_iter", 1000),
                random_state=42
            )
        else:
            raise ValueError(f"Unsupported model type: {model_type}")
        
        # Train with timing
        import time
        start_time = time.time()
        model.fit(X_train, y_train)
        training_time = time.time() - start_time
        
        # Log training time
        mlflow.log_metric("training_time_seconds", training_time)
        
        return model
    
    def _evaluate_model(self, model, X_test, y_test):
        """Comprehensive model evaluation"""
        
        # Predictions
        y_pred = model.predict(X_test)
        y_pred_proba = model.predict_proba(X_test) if hasattr(model, 'predict_proba') else None
        
        # Basic metrics
        accuracy = accuracy_score(y_test, y_pred)
        
        # Detailed classification metrics
        from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix
        
        precision = precision_score(y_test, y_pred, average='weighted')
        recall = recall_score(y_test, y_pred, average='weighted')
        f1 = f1_score(y_test, y_pred, average='weighted')
        
        metrics = {
            "accuracy": accuracy,
            "precision": precision,
            "recall": recall,
            "f1_score": f1
        }
        
        # Log confusion matrix as artifact
        cm = confusion_matrix(y_test, y_pred)
        self._log_confusion_matrix(cm, y_test)
        
        # Feature importance (if available)
        if hasattr(model, 'feature_importances_'):
            self._log_feature_importance(model.feature_importances_)
        
        return metrics
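
    # --- Helper sketches ---
    # The four helpers referenced above were not shown in the original
    # listing; the minimal implementations below are illustrative
    # assumptions, not the canonical versions.

    def _prepare_features_target(self, data, config):
        """Split a DataFrame into features and target.

        Assumes the target column name comes from config (default: "target").
        """
        target_column = config.get("target_column", "target")
        X = data.drop(columns=[target_column])
        y = data[target_column]
        return X, y

    def _log_confusion_matrix(self, cm, y_test):
        """Log the confusion matrix as a JSON artifact on the active run."""
        import os
        import tempfile
        labels = [str(label) for label in sorted(pd.Series(y_test).unique())]
        payload = {"labels": labels, "matrix": cm.tolist()}
        with tempfile.TemporaryDirectory() as tmp_dir:
            path = os.path.join(tmp_dir, "confusion_matrix.json")
            with open(path, "w") as f:
                json.dump(payload, f, indent=2)
            mlflow.log_artifact(path)

    def _log_feature_importance(self, importances):
        """Log per-feature importances as individual MLflow metrics."""
        for idx, importance in enumerate(importances):
            mlflow.log_metric(f"feature_{idx}_importance", float(importance))

    def _save_experiment_artifacts(self, model, X_test, y_test, metrics):
        """Persist the fitted model and a metrics summary as run artifacts."""
        import os
        import tempfile
        with tempfile.TemporaryDirectory() as tmp_dir:
            joblib.dump(model, os.path.join(tmp_dir, "model.joblib"))
            with open(os.path.join(tmp_dir, "metrics.json"), "w") as f:
                json.dump(metrics, f, indent=2)
            mlflow.log_artifacts(tmp_dir)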
    
    def _register_model(self, model, run_id, metrics):
        """Register model w Azure ML Model Registry"""
        
        model_name = f"{self.experiment_name}_model"
        
        # Create model entity
        model_entity = Model(
            name=model_name,
            path=f"runs:/{run_id}/trained_model",
            description=f"Model trained in experiment {self.experiment_name}",
            tags={
                "accuracy": str(metrics.get("accuracy", 0)),
                "f1_score": str(metrics.get("f1_score", 0)),
                "experiment": self.experiment_name,
                "framework": "scikit-learn"
            }
        )
        
        # Register model
        registered_model = self.ml_client.models.create_or_update(model_entity)
        
        print(f"✅ Model registered: {registered_model.name} v{registered_model.version}")
        
        return {
            "name": registered_model.name,
            "version": registered_model.version,
            "id": registered_model.id
        }
    
    def compare_experiments(self, experiment_names=None):
        """Porównanie wyników z różnych eksperymentów"""
        
        if experiment_names is None:
            experiment_names = [self.experiment_name]
        
        comparison_data = []
        
        for exp_name in experiment_names:
            experiment = mlflow.get_experiment_by_name(exp_name)
            if experiment:
                runs = mlflow.search_runs(
                    experiment_ids=[experiment.experiment_id],
                    order_by=["metrics.accuracy DESC"]
                )
                
                for _, run in runs.head(5).iterrows():  # Top 5 runs
                    comparison_data.append({
                        "experiment": exp_name,
                        "run_id": run["run_id"],
                        "accuracy": run.get("metrics.accuracy", 0),
                        "f1_score": run.get("metrics.f1_score", 0),
                        "model_type": run.get("params.model_type", "unknown"),
                        "training_time": run.get("metrics.training_time_seconds", 0)
                    })
        
        # Create comparison DataFrame
        comparison_df = pd.DataFrame(comparison_data)
        
        if comparison_df.empty:
            return {"error": "No runs found for the requested experiments"}, comparison_df
        
        # Generate comparison report
        report = {
            "best_accuracy": comparison_df.loc[comparison_df["accuracy"].idxmax()].to_dict(),
            "best_f1": comparison_df.loc[comparison_df["f1_score"].idxmax()].to_dict(),
            "fastest_training": comparison_df.loc[comparison_df["training_time"].idxmin()].to_dict(),
            "summary_stats": comparison_df.describe().to_dict()
        }
        
        return report, comparison_df
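
A short usage sketch for the tracker; it assumes an MLClient (here ml_client) like the one created in the workspace section, and the dataset path, target column, and hyperparameter values are illustrative placeholders:

# Example usage (paths and hyperparameters are placeholders)
tracker = MLExperimentTracker(ml_client, experiment_name="workshop-classifier")

model_config = {
    "model_type": "random_forest",
    "n_estimators": 200,
    "max_depth": 10,
    "min_accuracy": 0.85,
    "target_column": "label"
}

result = tracker.run_training_experiment(
    dataset_path="data/training.csv",
    model_config=model_config,
    experiment_metadata={"owner": "workshop", "dataset_version": "v1"}
)
print(result["status"], result.get("metrics"))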

💰 Resource Management and Cost Optimization

Cost monitoring and optimization

from azure.mgmt.consumption import ConsumptionManagementClient
from azure.identity import DefaultAzureCredential
import pandas as pd
from datetime import datetime, timedelta

class MLResourceOptimizer:
    def __init__(self, ml_client, subscription_id):
        self.ml_client = ml_client
        self.subscription_id = subscription_id
        self.consumption_client = ConsumptionManagementClient(
            credential=DefaultAzureCredential(),
            subscription_id=subscription_id
        )
    
    def analyze_compute_usage(self, days_back=7):
        """Analiza wykorzystania zasobów obliczeniowych"""
        
        usage_stats = []
        
        # Get all compute resources
        compute_resources = list(self.ml_client.compute.list())
        
        for compute in compute_resources:
            try:
                # Get usage statistics
                usage_info = self._get_compute_usage_stats(compute.name, days_back)
                
                usage_stats.append({
                    "resource_name": compute.name,
                    "resource_type": compute.type,
                    "size": getattr(compute, 'size', 'unknown'),
                    "current_state": getattr(compute, 'state', 'unknown'),
                    "utilization_percent": usage_info.get("utilization", 0),
                    "total_runtime_hours": usage_info.get("runtime_hours", 0),
                    "estimated_cost": usage_info.get("estimated_cost", 0),
                    "idle_time_hours": usage_info.get("idle_hours", 0)
                })
                
            except Exception as e:
                print(f"⚠️  Could not get usage stats for {compute.name}: {str(e)}")
        
        return pd.DataFrame(usage_stats)
    
    def get_optimization_recommendations(self, usage_stats_df):
        """Generowanie rekomendacji optymalizacyjnych"""
        
        recommendations = []
        
        for _, resource in usage_stats_df.iterrows():
            
            # Recommend shutdown for low utilization
            if resource["utilization_percent"] < 20 and resource["current_state"] == "Running":
                recommendations.append({
                    "resource": resource["resource_name"],
                    "type": "shutdown",
                    "reason": f"Low utilization ({resource['utilization_percent']:.1f}%)",
                    "potential_savings": resource["estimated_cost"] * 0.8,
                    "priority": "high"
                })
            
            # Recommend downsizing for consistently low usage
            if (resource["utilization_percent"] < 40 and 
                resource["total_runtime_hours"] > 10 and
                "Standard_D" in resource["size"]):
                
                recommendations.append({
                    "resource": resource["resource_name"],
                    "type": "downsize",
                    "reason": f"Consistent low utilization ({resource['utilization_percent']:.1f}%)",
                    "suggested_size": self._suggest_smaller_size(resource["size"]),
                    "potential_savings": resource["estimated_cost"] * 0.3,
                    "priority": "medium"
                })
            
            # Recommend scheduling for batch workloads
            if resource["idle_time_hours"] > resource["total_runtime_hours"] * 2:
                recommendations.append({
                    "resource": resource["resource_name"],
                    "type": "schedule",
                    "reason": f"High idle time ({resource['idle_time_hours']:.1f}h idle vs {resource['total_runtime_hours']:.1f}h active)",
                    "suggestion": "Implement automated start/stop scheduling",
                    "potential_savings": resource["estimated_cost"] * 0.5,
                    "priority": "medium"
                })
        
        return sorted(recommendations, key=lambda x: x["potential_savings"], reverse=True)
    
    def implement_cost_controls(self, budget_limit_usd=1000):
        """Implementacja kontroli kosztów"""
        
        cost_controls = {
            "auto_shutdown_policies": self._setup_auto_shutdown(),
            "budget_alerts": self._setup_budget_alerts(budget_limit_usd),
            "resource_tagging": self._implement_cost_tagging(),
            "scheduled_scaling": self._setup_scheduled_scaling()
        }
        
        return cost_controls
    
    def _setup_auto_shutdown(self):
        """Konfiguracja automatycznego wyłączania"""
        
        shutdown_policies = []
        
        compute_instances = [
            compute for compute in self.ml_client.compute.list() 
            if compute.type and compute.type.lower() == "computeinstance"
        ]
        
        for instance in compute_instances:
            try:
                # Update compute instance with auto-shutdown
                instance.idle_time_before_shutdown_minutes = 30
                instance.description = f"{instance.description} - Auto-shutdown: 30min idle"
                
                # Apply the update and wait for it to complete
                self.ml_client.compute.begin_create_or_update(instance).result()
                
                shutdown_policies.append({
                    "resource": instance.name,
                    "policy": "auto_shutdown_30min",
                    "status": "applied"
                })
                
            except Exception as e:
                shutdown_policies.append({
                    "resource": instance.name,
                    "policy": "auto_shutdown_30min", 
                    "status": "failed",
                    "error": str(e)
                })
        
        return shutdown_policies
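
    # --- Helper sketches ---
    # _get_compute_usage_stats and _suggest_smaller_size are referenced above
    # but were not shown; the stubs below are illustrative assumptions. A real
    # _get_compute_usage_stats would query Azure Monitor over the days_back
    # window for utilization and runtime data.

    # Rough, assumed pay-as-you-go hourly rates, for illustration only
    HOURLY_RATES_USD = {
        "Standard_DS3_v2": 0.29,
        "Standard_NC6s_v3": 3.06,
    }

    def _get_compute_usage_stats(self, compute_name, days_back):
        """Return usage stats for a compute resource (placeholder values)."""
        compute = self.ml_client.compute.get(compute_name)
        rate = self.HOURLY_RATES_USD.get(getattr(compute, "size", ""), 0.0)
        runtime_hours = 0.0  # replace with an Azure Monitor metrics query
        idle_hours = 0.0     # replace with an Azure Monitor metrics query
        return {
            "utilization": 0.0,
            "runtime_hours": runtime_hours,
            "idle_hours": idle_hours,
            "estimated_cost": runtime_hours * rate,
        }

    def _suggest_smaller_size(self, current_size):
        """Map a VM size to a smaller one in the same family (assumed mapping)."""
        downsize_map = {
            "Standard_DS4_v2": "Standard_DS3_v2",
            "Standard_DS3_v2": "Standard_DS2_v2",
        }
        return downsize_map.get(current_size, current_size)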
    
    def generate_cost_report(self, period_days=30):
        """Generowanie raportu kosztów"""
        
        end_date = datetime.now()
        start_date = end_date - timedelta(days=period_days)
        
        report = {
            "report_period": f"{start_date.date()} to {end_date.date()}",
            "total_cost_usd": 0,
            "cost_breakdown": {},
            "top_cost_drivers": [],
            "optimization_impact": {},
            "recommendations": []
        }
        
        try:
            # Get usage data for the period
            usage_data = self.analyze_compute_usage(period_days)
            
            # Calculate total costs
            total_cost = usage_data["estimated_cost"].sum()
            report["total_cost_usd"] = round(total_cost, 2)
            
            # Cost breakdown by resource type
            cost_by_type = usage_data.groupby("resource_type")["estimated_cost"].sum().to_dict()
            report["cost_breakdown"] = cost_by_type
            
            # Top cost drivers
            top_costs = usage_data.nlargest(5, "estimated_cost")[["resource_name", "estimated_cost"]].to_dict("records")
            report["top_cost_drivers"] = top_costs
            
            # Get optimization recommendations
            recommendations = self.get_optimization_recommendations(usage_data)
            report["recommendations"] = recommendations[:10]  # Top 10
            
            # Calculate potential savings
            potential_savings = sum(rec.get("potential_savings", 0) for rec in recommendations)
            report["optimization_impact"]["potential_monthly_savings"] = round(potential_savings, 2)
            report["optimization_impact"]["savings_percentage"] = round((potential_savings / total_cost) * 100, 1) if total_cost > 0 else 0
            
        except Exception as e:
            report["error"] = f"Error generating cost report: {str(e)}"
        
        return report
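
A usage sketch for the optimizer; the subscription ID is a placeholder and ml_client is the client created earlier:

import json

# Example usage (subscription ID is a placeholder)
optimizer = MLResourceOptimizer(ml_client, subscription_id="<subscription-id>")
usage_df = optimizer.analyze_compute_usage(days_back=7)
recommendations = optimizer.get_optimization_recommendations(usage_df)
report = optimizer.generate_cost_report(period_days=30)
print(json.dumps(report, indent=2, default=str))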

✅ Hands-on exercises

Exercise 1: Workspace setup (30 min)

  1. Create an Azure ML workspace
  2. Configure compute resources (CPU and GPU clusters)
  3. Set up datastores for training data (see the sketch below)
  4. Test connectivity and resource availability
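
A minimal sketch for step 3; the storage account and container names are placeholders, and it assumes the AzureBlobDatastore entity from azure-ai-ml:

from azure.ai.ml.entities import AzureBlobDatastore

# Register a blob container as a datastore (names are placeholders)
datastore = AzureBlobDatastore(
    name="training_data",
    description="Training data for workshop experiments",
    account_name="<storage-account-name>",
    container_name="training-data"
)
ml_client.datastores.create_or_update(datastore)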

Exercise 2: Experiment tracking (45 min)

  1. Implement tracking for an ML experiment
  2. Train models with different hyperparameters
  3. Compare results across experiments
  4. Register the best model in the registry

Exercise 3: Cost optimization (30 min)

  1. Analyze compute resource costs
  2. Implement auto-shutdown policies
  3. Build a cost monitoring dashboard
  4. Prepare an optimization plan

Exercise 4: Resource monitoring (15 min)

  1. Configure alerts for high utilization
  2. Set up automated reporting
  3. Implement cost budgets
  4. Test the notification system

📊 Success metrics

  • Resource utilization above 70% for active workloads
  • Cost optimization - a 30% reduction in spend on idle resources
  • Experiment tracking - 100% of experiments logged with metrics
  • Model registry - systematic versioning and deployment

📚 Additional materials

💡 Tip

Each session is two hours of intensive learning with hands-on exercises. You can review the materials at your own pace.

📈 Progress

Track your progress in learning AI and preparing for the Azure AI-102 certification. Each module builds on the previous one.