Session 8: Image Analysis and Object Recognition

Azure Computer Vision Services

🎯 Session goals

  • Implement the Azure Computer Vision API
  • Build custom models for object recognition
  • OCR and text analysis in images
  • Create applications with visual analysis

👁️ Computer Vision basics

Core Computer Vision tasks

Image classification:

  • Single label - one object per image
  • Multi-label - multiple objects per image
  • Hierarchical classification - categories and subcategories

Object detection:

  • Bounding boxes around objects
  • Classification + localization
  • Real-time processing

Segmentation:

  • Semantic segmentation - per-pixel classification
  • Instance segmentation - distinguishing individual object instances
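Object detection quality is usually judged by comparing predicted and ground-truth boxes; the standard metric is Intersection-over-Union (IoU). A minimal sketch, using the same (x, y, w, h) rectangle convention that the Azure object-detection results in this session use (the function name is illustrative, not part of any SDK):

```python
def iou(box_a, box_b):
    """Intersection-over-Union of two boxes given as (x, y, w, h) tuples."""
    ax, ay, aw, ah = box_a
    bx, by, bw, bh = box_b
    # Overlap rectangle: top-left is the max of the origins,
    # bottom-right is the min of the far corners
    iw = min(ax + aw, bx + bw) - max(ax, bx)
    ih = min(ay + ah, by + bh) - max(ay, by)
    if iw <= 0 or ih <= 0:
        return 0.0  # boxes do not overlap
    inter = iw * ih
    return inter / (aw * ah + bw * bh - inter)
```

An IoU of 1.0 means a perfect match, 0.0 means no overlap; detection benchmarks typically count a prediction as correct above a threshold such as 0.5.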

Azure Computer Vision Services

from azure.cognitiveservices.vision.computervision import ComputerVisionClient
from azure.cognitiveservices.vision.computervision.models import VisualFeatureTypes
from msrest.authentication import CognitiveServicesCredentials
import time

class AzureVisionAnalyzer:
    def __init__(self, subscription_key, endpoint):
        self.client = ComputerVisionClient(
            endpoint,
            CognitiveServicesCredentials(subscription_key)
        )
    
    def analyze_image_comprehensive(self, image_url):
        """Comprehensive image analysis"""
        
        # Full analysis with all available visual features
        analysis = self.client.analyze_image(
            image_url,
            visual_features=[
                VisualFeatureTypes.tags,
                VisualFeatureTypes.description,
                VisualFeatureTypes.faces,
                VisualFeatureTypes.image_type,
                VisualFeatureTypes.color,
                VisualFeatureTypes.adult,
                VisualFeatureTypes.categories,
                VisualFeatureTypes.objects,
                VisualFeatureTypes.brands
            ]
        )
        
        # Format the results
        results = {
            "description": {
                "captions": [caption.text for caption in analysis.description.captions],
                "confidence": analysis.description.captions[0].confidence if analysis.description.captions else 0
            },
            "tags": [{"name": tag.name, "confidence": tag.confidence} for tag in analysis.tags],
            "categories": [{"name": cat.name, "score": cat.score} for cat in analysis.categories],
            "objects": [],
            "faces": [],
            "color": {
                "dominant_color_foreground": analysis.color.dominant_color_foreground,
                "dominant_color_background": analysis.color.dominant_color_background,
                "accent_color": analysis.color.accent_color
            },
            "image_type": {
                "clip_art_type": analysis.image_type.clip_art_type,
                "line_drawing_type": analysis.image_type.line_drawing_type
            }
        }
        
        # Process detected objects
        if analysis.objects:
            for obj in analysis.objects:
                results["objects"].append({
                    "object": obj.object_property,
                    "confidence": obj.confidence,
                    "rectangle": {
                        "x": obj.rectangle.x,
                        "y": obj.rectangle.y,
                        "w": obj.rectangle.w,
                        "h": obj.rectangle.h
                    }
                })
        
        # Process detected faces (note: newer service versions no longer return
        # age/gender estimates, so these fields may come back empty)
        if analysis.faces:
            for face in analysis.faces:
                results["faces"].append({
                    "age": face.age,
                    "gender": face.gender,
                    "rectangle": {
                        "left": face.face_rectangle.left,
                        "top": face.face_rectangle.top,
                        "width": face.face_rectangle.width,
                        "height": face.face_rectangle.height
                    }
                })
        
        return results
    
    def extract_text_from_image(self, image_url):
        """OCR - extract text from an image"""
        
        # Start the Read operation
        read_response = self.client.read(image_url, raw=True)
        
        # Get the Operation-Location header
        operation_location_remote = read_response.headers["Operation-Location"]
        operation_id = operation_location_remote.split("/")[-1]
        
        # Wait for the operation to finish
        import time
        while True:
            read_result = self.client.get_read_result(operation_id)
            if read_result.status not in ['notStarted', 'running']:
                break
            time.sleep(1)
        
        # Collect the extracted text
        extracted_text = []
        if read_result.status == 'succeeded':
            for text_result in read_result.analyze_result.read_results:
                for line in text_result.lines:
                    extracted_text.append({
                        "text": line.text,
                        "bounding_box": [round(coord, 2) for coord in line.bounding_box],
                        "confidence": getattr(line, 'confidence', None)
                    })
        
        return {
            "status": read_result.status,
            "extracted_text": extracted_text,
            "full_text": " ".join([item["text"] for item in extracted_text])
        }
    
    def analyze_document_layout(self, image_url):
        """Analyze the layout of a document"""
        
        # Vision analysis focused on document elements
        analysis = self.client.analyze_image(
            image_url,
            visual_features=[VisualFeatureTypes.objects, VisualFeatureTypes.tags]
        )
        
        # OCR for the text content
        ocr_result = self.extract_text_from_image(image_url)
        
        # Combine the results
        document_analysis = {
            "layout_elements": [obj.object_property for obj in analysis.objects] if analysis.objects else [],
            "text_content": ocr_result["extracted_text"],
            "document_type": self._classify_document_type(analysis, ocr_result),
            "confidence": self._calculate_overall_confidence(analysis, ocr_result)
        }
        
        return document_analysis
    
    def _classify_document_type(self, vision_analysis, ocr_result):
        """Classify the document type based on the analysis"""
        
        text_content = ocr_result["full_text"].lower()
        
        # Simple keyword-based classification
        if any(word in text_content for word in ["invoice", "bill", "payment", "faktura"]):
            return "invoice"
        elif any(word in text_content for word in ["contract", "agreement", "umowa"]):
            return "contract"
        elif any(word in text_content for word in ["report", "summary", "raport"]):
            return "report"
        else:
            return "general_document"
    
    def _calculate_overall_confidence(self, vision_analysis, ocr_result):
        """Compute the overall confidence of the analysis"""
        
        confidences = []
        
        # Confidence from the vision analysis (the description attribute is
        # only populated when it was among the requested visual features)
        description = getattr(vision_analysis, 'description', None)
        if description and description.captions:
            confidences.append(description.captions[0].confidence)
        
        # Confidence from OCR (if available)
        ocr_confidences = [item["confidence"] for item in ocr_result["extracted_text"] if item.get("confidence")]
        if ocr_confidences:
            confidences.append(sum(ocr_confidences) / len(ocr_confidences))
        
        return sum(confidences) / len(confidences) if confidences else 0.5
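The Read-operation loop in extract_text_from_image polls with no upper bound, so a stuck operation would hang the caller. A minimal bounded-polling sketch; `wait_for_completion` and `get_status` are illustrative names, not part of the Azure SDK (`get_status` stands in for a call like `client.get_read_result(operation_id).status`):

```python
import time

def wait_for_completion(get_status, pending=("notStarted", "running"),
                        timeout_s=30.0, interval_s=1.0):
    """Poll get_status() until the status leaves `pending` or the timeout expires.

    Returns the final status string; raises TimeoutError if the deadline passes.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        status = get_status()
        if status not in pending:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"operation still '{status}' after {timeout_s}s")
        time.sleep(interval_s)
```

In extract_text_from_image this would replace the open-ended while True loop.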
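The keyword matching in _classify_document_type can also be expressed as a data-driven table, which makes adding a document type a one-line change. A sketch using the same keyword lists (the names here are illustrative); since Python 3.7 dicts preserve insertion order, the first matching category wins exactly like the if/elif chain above:

```python
DOCUMENT_KEYWORDS = {
    "invoice":  ["invoice", "bill", "payment", "faktura"],
    "contract": ["contract", "agreement", "umowa"],
    "report":   ["report", "summary", "raport"],
}

def classify_document(text, keyword_map=DOCUMENT_KEYWORDS, default="general_document"):
    """Return the first document type whose keywords appear in the text."""
    lowered = text.lower()
    for doc_type, keywords in keyword_map.items():
        if any(word in lowered for word in keywords):
            return doc_type
    return default
```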

🔧 Custom Vision Models

Building custom classification models

from azure.cognitiveservices.vision.customvision.training import CustomVisionTrainingClient
from azure.cognitiveservices.vision.customvision.prediction import CustomVisionPredictionClient
from azure.cognitiveservices.vision.customvision.training.models import ImageFileCreateEntry
from msrest.authentication import ApiKeyCredentials
import time  # used while polling training status

class CustomVisionTrainer:
    def __init__(self, training_key, prediction_key, endpoint):
        self.trainer = CustomVisionTrainingClient(
            ApiKeyCredentials(in_headers={"Training-key": training_key}),
            endpoint=endpoint
        )
        self.predictor = CustomVisionPredictionClient(
            ApiKeyCredentials(in_headers={"Prediction-key": prediction_key}),
            endpoint=endpoint
        )
        
    def create_classification_project(self, project_name, domain_type="General"):
        """Create a classification project"""
        
        # Create the project
        project = self.trainer.create_project(
            name=project_name,
            description=f"Custom classification project: {project_name}",
            domain_id=self._get_domain_id(domain_type)
        )
        
        print(f"✅ Project created: {project.name} (ID: {project.id})")
        
        return project
    
    def _get_domain_id(self, domain_type):
        """Look up the ID of a classification domain by name (e.g. "General")."""
        domains = self.trainer.get_domains()
        return next(d.id for d in domains if d.type == "Classification" and d.name == domain_type)
    
    def create_tags_and_upload_images(self, project_id, training_data):
        """Create tags and upload training images"""
        
        # Create tags
        created_tags = {}
        for tag_name in training_data.keys():
            tag = self.trainer.create_tag(project_id, tag_name)
            created_tags[tag_name] = tag
            print(f"📌 Tag created: {tag_name}")
        
        # Upload images for each tag
        for tag_name, image_paths in training_data.items():
            tag = created_tags[tag_name]
            
            # Prepare images for upload
            image_list = []
            for image_path in image_paths:
                with open(image_path, "rb") as image_contents:
                    image_list.append(ImageFileCreateEntry(
                        name=f"{tag_name}_{len(image_list)}.jpg",
                        contents=image_contents.read(),
                        tag_ids=[tag.id]
                    ))
            
            # Upload the image batch
            upload_result = self.trainer.create_images_from_files(
                project_id, 
                images=image_list
            )
            
            successful_uploads = len([img for img in upload_result.images if img.status == "OK"])
            print(f"✅ Uploaded {successful_uploads}/{len(image_list)} images for tag '{tag_name}'")
        
        return created_tags
    
    def train_and_evaluate_model(self, project_id):
        """Train and evaluate the model"""
        
        print("🚀 Starting training...")
        
        # Start training
        iteration = self.trainer.train_project(project_id)
        
        # Wait for training to finish
        while iteration.status != "Completed":
            iteration = self.trainer.get_iteration(project_id, iteration.id)
            print(f"Training status: {iteration.status}")
            time.sleep(10)
        
        print("✅ Training complete!")
        
        # Publish the iteration (substitute your own prediction resource ID)
        self.trainer.publish_iteration(
            project_id,
            iteration.id,
            "production",
            prediction_resource_id="/subscriptions/.../resourceGroups/.../providers/Microsoft.CognitiveServices/accounts/..."
        )
        
        # Evaluate performance
        performance = self.trainer.get_iteration_performance(project_id, iteration.id)
        
        evaluation_results = {
            "precision": performance.precision,
            "recall": performance.recall,
            "average_precision": performance.average_precision,
            "per_tag_performance": []
        }
        
        # Per-tag performance
        for tag_performance in performance.per_tag_performance:
            evaluation_results["per_tag_performance"].append({
                "tag_name": tag_performance.name,
                "precision": tag_performance.precision,
                "recall": tag_performance.recall,
                "average_precision": tag_performance.average_precision
            })
        
        print(f"📊 Model Performance:")
        print(f"   Precision: {performance.precision:.3f}")
        print(f"   Recall: {performance.recall:.3f}")
        print(f"   Average Precision: {performance.average_precision:.3f}")
        
        return evaluation_results, iteration
    
    def predict_image(self, project_id, iteration_name, image_path):
        """Predict the class of a new image"""
        
        with open(image_path, "rb") as image_contents:
            results = self.predictor.classify_image(
                project_id,
                iteration_name,
                image_contents.read()
            )
        
        predictions = []
        for prediction in results.predictions:
            predictions.append({
                "tag": prediction.tag_name,
                "probability": round(prediction.probability, 4),
                "confidence": "High" if prediction.probability > 0.8 else "Medium" if prediction.probability > 0.5 else "Low"
            })
        
        # Sort by probability
        predictions.sort(key=lambda x: x["probability"], reverse=True)
        
        return {
            "predictions": predictions,
            "top_prediction": predictions[0] if predictions else None,
            "image_analyzed": image_path
        }
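The nested conditional in predict_image that maps a probability to "High"/"Medium"/"Low" is easier to read and unit-test as a small helper. A sketch with the same thresholds (the function name is illustrative):

```python
def confidence_bucket(probability):
    """Map a prediction probability to a coarse confidence label."""
    if probability > 0.8:
        return "High"
    if probability > 0.5:
        return "Medium"
    return "Low"
```

With this helper, the dictionary entry in predict_image becomes `"confidence": confidence_bucket(prediction.probability)`.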

🛠️ Practical applications

Product quality inspection system

from datetime import datetime

class ProductQualityInspector:
    def __init__(self, vision_client, custom_vision_client):
        self.vision_client = vision_client
        self.custom_client = custom_vision_client
        self.quality_standards = {
            "defect_threshold": 0.7,
            "minimum_confidence": 0.6
        }
    
    async def inspect_product(self, image_path, product_type):
        """Inspect the quality of a product"""
        
        inspection_results = {
            "product_type": product_type,
            "image_path": image_path,
            "timestamp": datetime.utcnow().isoformat(),
            "quality_score": 0,
            "defects_detected": [],
            "recommendation": ""
        }
        
        try:
            # General image analysis
            # (note: analyze_image_comprehensive expects an image URL; for local
            # files the SDK provides analyze_image_in_stream)
            general_analysis = self.vision_client.analyze_image_comprehensive(image_path)
            
            # Custom defect analysis (if a model for this product type exists)
            if hasattr(self.custom_client, f"{product_type}_defect_model"):
                defect_analysis = self.custom_client.predict_image(
                    f"{product_type}_project_id",
                    "production",
                    image_path
                )
                
                # Analyze defects
                for prediction in defect_analysis["predictions"]:
                    if "defect" in prediction["tag"].lower() and prediction["probability"] > self.quality_standards["defect_threshold"]:
                        inspection_results["defects_detected"].append({
                            "type": prediction["tag"],
                            "severity": prediction["probability"],
                            "location": "detected"  # in a real system this would be a bounding box
                        })
            
            # Compute the overall score
            base_score = 1.0
            for defect in inspection_results["defects_detected"]:
                base_score -= defect["severity"] * 0.3  # penalty for each defect
            
            inspection_results["quality_score"] = max(0, base_score)
            
            # Recommendation
            if inspection_results["quality_score"] > 0.8:
                inspection_results["recommendation"] = "ACCEPT - High quality"
            elif inspection_results["quality_score"] > 0.6:
                inspection_results["recommendation"] = "REVIEW - Needs review"
            else:
                inspection_results["recommendation"] = "REJECT - Low quality"
            
            return inspection_results
            
        except Exception as e:
            inspection_results["error"] = f"Inspection error: {str(e)}"
            inspection_results["recommendation"] = "ERROR - Requires manual inspection"
            
            return inspection_results
    
    def generate_quality_report(self, inspection_results_batch):
        """Generate a quality report for a batch of inspections"""
        
        if not inspection_results_batch:
            return {"error": "No inspection results provided"}
        
        total_inspected = len(inspection_results_batch)
        accepted = len([r for r in inspection_results_batch if r["recommendation"].startswith("ACCEPT")])
        rejected = len([r for r in inspection_results_batch if r["recommendation"].startswith("REJECT")])
        needs_review = total_inspected - accepted - rejected
        
        # Defect analysis
        all_defects = []
        for result in inspection_results_batch:
            all_defects.extend(result.get("defects_detected", []))
        
        defect_types = {}
        for defect in all_defects:
            defect_type = defect["type"]
            if defect_type not in defect_types:
                defect_types[defect_type] = {"count": 0, "avg_severity": 0}
            defect_types[defect_type]["count"] += 1
            defect_types[defect_type]["avg_severity"] += defect["severity"]
        
        # Compute average severity per defect type
        for defect_type, stats in defect_types.items():
            stats["avg_severity"] = stats["avg_severity"] / stats["count"]
        
        quality_report = {
            "summary": {
                "total_inspected": total_inspected,
                "accepted": accepted,
                "rejected": rejected,
                "needs_review": needs_review,
                "acceptance_rate": (accepted / total_inspected) * 100,
                "rejection_rate": (rejected / total_inspected) * 100
            },
            "defect_analysis": defect_types,
            "recommendations": self._generate_quality_recommendations(defect_types, inspection_results_batch)
        }
        
        return quality_report
    
    def _generate_quality_recommendations(self, defect_types, inspection_results):
        """Generate recommendations based on the quality analysis"""
        
        recommendations = []
        
        # Recommendations based on defect frequency
        for defect_type, stats in defect_types.items():
            if stats["count"] > len(inspection_results) * 0.1:  # more than 10% of products
                recommendations.append({
                    "priority": "High",
                    "issue": f"Frequent {defect_type} defects",
                    "recommendation": f"Review production process for {defect_type} prevention",
                    "affected_percentage": (stats["count"] / len(inspection_results)) * 100
                })
        
        # Recommendations based on severity
        high_severity_defects = [dt for dt, stats in defect_types.items() if stats["avg_severity"] > 0.8]
        if high_severity_defects:
            recommendations.append({
                "priority": "Critical",
                "issue": f"High severity defects: {', '.join(high_severity_defects)}",
                "recommendation": "Immediate process review and corrective actions required"
            })
        
        return recommendations
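The scoring rule in inspect_product (start from 1.0, subtract 0.3 × severity per defect, clamp at 0) and the recommendation thresholds can be factored into small pure functions, which makes them trivial to unit-test independently of the Azure clients. A sketch mirroring that logic (function names are illustrative):

```python
def quality_score(defect_severities, penalty=0.3):
    """Start at 1.0, subtract penalty * severity per defect, floor at 0."""
    score = 1.0
    for severity in defect_severities:
        score -= severity * penalty
    return max(0.0, score)

def recommend(score):
    """Map a quality score to the verdicts used by inspect_product."""
    if score > 0.8:
        return "ACCEPT"
    if score > 0.6:
        return "REVIEW"
    return "REJECT"
```

Separating the arithmetic from the I/O also lets you tune the penalty and thresholds against historical inspection data without touching the inspection pipeline.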

✅ Hands-on exercises

Exercise 1: Basic Image Analysis (30 min)

  1. Configure Azure Computer Vision
  2. Analyze different image types (photos, documents, diagrams)
  3. Test all available API features
  4. Compare results across content types

Exercise 2: OCR Implementation (30 min)

  1. Implement text extraction from images
  2. Test with different languages and fonts
  3. Handle multi-page documents
  4. Add text validation and post-processing

Exercise 3: Custom Vision Model (45 min)

  1. Create a custom classification model (min. 3 classes)
  2. Prepare a training set (min. 10 images per class)
  3. Train and evaluate the model
  4. Test it on new images

Exercise 4: Quality Control System (15 min)

  1. Implement a quality control system
  2. Use a custom model for defect detection
  3. Build an automated workflow
  4. Generate a quality report

📊 Success metrics

  • OCR accuracy > 95% for legible text
  • Object detection > 90% precision for main objects
  • Custom model > 85% accuracy after training
  • Processing speed < 3 seconds per image

📚 Additional materials

💡 Tip

Each session is two hours of intensive learning with hands-on exercises. The materials can be reviewed at your own pace.

📈 Progress

Track your progress in learning AI and preparing for the Azure AI-102 certification. Each module builds on the previous one.