Sesja 8: Analiza obrazów i rozpoznawanie obiektów
Azure Computer Vision Services
🎯 Cele sesji
- Implementacja Azure Computer Vision API
- Budowa custom models dla rozpoznawania obiektów
- OCR i analiza tekstu w obrazach
- Tworzenie aplikacji z analizą wizualną
👁️ Podstawy Computer Vision
Podstawowe zadania Computer Vision
Klasyfikacja obrazów:
- Pojedyncza etykieta - jeden obiekt na obraz
- Wielokrotne etykiety - wiele obiektów na obraz
- Hierarchiczna klasyfikacja - kategorie i podkategorie
Wykrywanie obiektów (Object Detection):
- Bounding boxes wokół obiektów
- Klasyfikacja + lokalizacja
- Real-time processing
Segmentacja:
- Semantic segmentation - klasyfikacja pikseli
- Instance segmentation - rozróżnienie instancji obiektów
Azure Computer Vision Services
from azure.cognitiveservices.vision.computervision import ComputerVisionClient
from azure.cognitiveservices.vision.computervision.models import VisualFeatureTypes
from msrest.authentication import CognitiveServicesCredentials
import io
class AzureVisionAnalyzer:
    """High-level wrapper around Azure Computer Vision.

    Provides a comprehensive single-call image analysis, OCR via the
    asynchronous Read API, and a simple keyword-based document classifier
    built on top of both.
    """

    def __init__(self, subscription_key, endpoint):
        """Create an authenticated ComputerVisionClient.

        Args:
            subscription_key: Azure Cognitive Services subscription key.
            endpoint: Resource endpoint URL of the Computer Vision resource.
        """
        self.client = ComputerVisionClient(
            endpoint,
            CognitiveServicesCredentials(subscription_key)
        )

    def analyze_image_comprehensive(self, image_url):
        """Run a full visual-feature analysis of an image.

        Args:
            image_url: Publicly reachable URL of the image to analyze.

        Returns:
            dict with keys: description, tags, categories, objects, faces,
            color, image_type — all plain Python values, no SDK objects.
        """
        # Request every visual feature the API offers in a single call.
        analysis = self.client.analyze_image(
            image_url,
            visual_features=[
                VisualFeatureTypes.tags,
                VisualFeatureTypes.description,
                VisualFeatureTypes.faces,
                VisualFeatureTypes.image_type,
                VisualFeatureTypes.color,
                VisualFeatureTypes.adult,
                VisualFeatureTypes.categories,
                VisualFeatureTypes.objects,
                VisualFeatureTypes.brands
            ]
        )

        # Flatten the SDK response into a plain, JSON-serializable dict.
        results = {
            "description": {
                "captions": [caption.text for caption in analysis.description.captions],
                # Captions are ordered by confidence; report the best one (0 if none).
                "confidence": analysis.description.captions[0].confidence if analysis.description.captions else 0
            },
            "tags": [{"name": tag.name, "confidence": tag.confidence} for tag in analysis.tags],
            "categories": [{"name": cat.name, "score": cat.score} for cat in analysis.categories],
            "objects": [],
            "faces": [],
            "color": {
                "dominant_color_foreground": analysis.color.dominant_color_foreground,
                "dominant_color_background": analysis.color.dominant_color_background,
                "accent_color": analysis.color.accent_color
            },
            "image_type": {
                "clip_art_type": analysis.image_type.clip_art_type,
                "line_drawing_type": analysis.image_type.line_drawing_type
            }
        }

        # Detected objects with their bounding rectangles.
        if analysis.objects:
            for obj in analysis.objects:
                results["objects"].append({
                    "object": obj.object_property,
                    "confidence": obj.confidence,
                    "rectangle": {
                        "x": obj.rectangle.x,
                        "y": obj.rectangle.y,
                        "w": obj.rectangle.w,
                        "h": obj.rectangle.h
                    }
                })

        # Detected faces with estimated attributes and location.
        if analysis.faces:
            for face in analysis.faces:
                results["faces"].append({
                    "age": face.age,
                    "gender": face.gender,
                    "rectangle": {
                        "left": face.face_rectangle.left,
                        "top": face.face_rectangle.top,
                        "width": face.face_rectangle.width,
                        "height": face.face_rectangle.height
                    }
                })
        return results

    def extract_text_from_image(self, image_url, max_wait_seconds=30):
        """OCR: extract text from an image using the asynchronous Read API.

        Args:
            image_url: URL of the image to read.
            max_wait_seconds: Upper bound on polling time. BUG FIX: the
                original loop could spin forever if the operation never
                completed; polling now stops after this many seconds and the
                last observed status is returned.

        Returns:
            dict with: status (Read operation status), extracted_text
            (per-line dicts), full_text (all lines joined with spaces).
        """
        import time

        # The Read API is asynchronous: start the operation, then poll.
        read_response = self.client.read(image_url, raw=True)

        # The operation id is the last path segment of the Operation-Location header.
        operation_location_remote = read_response.headers["Operation-Location"]
        operation_id = operation_location_remote.split("/")[-1]

        # Poll until the operation finishes or the deadline passes.
        deadline = time.monotonic() + max_wait_seconds
        while True:
            read_result = self.client.get_read_result(operation_id)
            if read_result.status not in ['notStarted', 'running']:
                break
            if time.monotonic() >= deadline:
                # Give up; the returned status will show the unfinished state.
                break
            time.sleep(1)

        # Collect recognized lines (only present on success).
        extracted_text = []
        if read_result.status == 'succeeded':
            for text_result in read_result.analyze_result.read_results:
                for line in text_result.lines:
                    extracted_text.append({
                        "text": line.text,
                        "bounding_box": [round(coord, 2) for coord in line.bounding_box],
                        # Not every SDK version exposes per-line confidence.
                        "confidence": getattr(line, 'confidence', None)
                    })
        return {
            "status": read_result.status,
            "extracted_text": extracted_text,
            "full_text": " ".join([item["text"] for item in extracted_text])
        }

    def analyze_document_layout(self, image_url):
        """Analyze a document image: layout objects + OCR + heuristic type.

        Returns:
            dict with layout_elements, text_content, document_type and an
            overall confidence estimate.
        """
        # Restricted feature set relevant to documents.
        analysis = self.client.analyze_image(
            image_url,
            visual_features=[VisualFeatureTypes.objects, VisualFeatureTypes.tags]
        )

        # OCR for the textual content.
        ocr_result = self.extract_text_from_image(image_url)

        # Combine both analyses into one summary.
        document_analysis = {
            "layout_elements": [obj.object_property for obj in analysis.objects] if analysis.objects else [],
            "text_content": ocr_result["extracted_text"],
            "document_type": self._classify_document_type(analysis, ocr_result),
            "confidence": self._calculate_overall_confidence(analysis, ocr_result)
        }
        return document_analysis

    def _classify_document_type(self, vision_analysis, ocr_result):
        """Classify the document type from OCR keywords (English/Polish)."""
        text_content = ocr_result["full_text"].lower()
        # Simple keyword-based classification.
        if any(word in text_content for word in ["invoice", "bill", "payment", "faktura"]):
            return "invoice"
        elif any(word in text_content for word in ["contract", "agreement", "umowa"]):
            return "contract"
        elif any(word in text_content for word in ["report", "summary", "raport"]):
            return "report"
        else:
            return "general_document"

    def _calculate_overall_confidence(self, vision_analysis, ocr_result):
        """Average the available confidences; 0.5 when none are available."""
        confidences = []
        # Confidence of the best image caption, when present.
        if hasattr(vision_analysis, 'description') and vision_analysis.description.captions:
            confidences.append(vision_analysis.description.captions[0].confidence)
        # Mean OCR line confidence; lines without a confidence are skipped.
        ocr_confidences = [item.get("confidence", 0.8) for item in ocr_result["extracted_text"] if item.get("confidence")]
        if ocr_confidences:
            confidences.append(sum(ocr_confidences) / len(ocr_confidences))
        return sum(confidences) / len(confidences) if confidences else 0.5
🔧 Custom Vision Models
Tworzenie custom modeli klasyfikacji
from azure.cognitiveservices.vision.customvision.training import CustomVisionTrainingClient
from azure.cognitiveservices.vision.customvision.prediction import CustomVisionPredictionClient
from azure.cognitiveservices.vision.customvision.training.models import ImageFileCreateEntry
from msrest.authentication import ApiKeyCredentials
class CustomVisionTrainer:
    """Helper for building, training, publishing and querying Azure Custom
    Vision classification models."""

    def __init__(self, training_key, prediction_key, endpoint):
        """Create authenticated training and prediction clients.

        Args:
            training_key: Custom Vision training resource key.
            prediction_key: Custom Vision prediction resource key.
            endpoint: Custom Vision endpoint URL (shared by both clients).
        """
        self.trainer = CustomVisionTrainingClient(
            ApiKeyCredentials(in_headers={"Training-key": training_key}),
            endpoint=endpoint
        )
        self.predictor = CustomVisionPredictionClient(
            ApiKeyCredentials(in_headers={"Prediction-key": prediction_key}),
            endpoint=endpoint
        )

    def create_classification_project(self, project_name, domain_type="General"):
        """Create a new classification project in the requested domain."""
        project = self.trainer.create_project(
            name=project_name,
            description=f"Custom classification project: {project_name}",
            domain_id=self._get_domain_id(domain_type)
        )
        print(f"✅ Projekt utworzony: {project.name} (ID: {project.id})")
        return project

    def _get_domain_id(self, domain_type):
        """Resolve a classification-domain name to its id.

        BUG FIX: this helper was called by create_classification_project but
        was never defined, so project creation raised AttributeError.

        Returns:
            The matching domain id, or None so the service uses its default
            domain when no classification domain with that name exists.
        """
        for domain in self.trainer.get_domains():
            if domain.type == "Classification" and domain.name == domain_type:
                return domain.id
        return None

    def create_tags_and_upload_images(self, project_id, training_data):
        """Create tags and upload training images.

        Args:
            project_id: Target Custom Vision project id.
            training_data: Mapping of tag name -> list of local image paths.

        Returns:
            dict of tag name -> created Tag object.
        """
        # Create one tag per class.
        created_tags = {}
        for tag_name in training_data.keys():
            tag = self.trainer.create_tag(project_id, tag_name)
            created_tags[tag_name] = tag
            print(f"📌 Tag utworzony: {tag_name}")

        # Upload the images for each tag as a single batch.
        for tag_name, image_paths in training_data.items():
            tag = created_tags[tag_name]
            image_list = []
            for image_path in image_paths:
                with open(image_path, "rb") as image_contents:
                    image_list.append(ImageFileCreateEntry(
                        name=f"{tag_name}_{len(image_list)}.jpg",
                        contents=image_contents.read(),
                        tag_ids=[tag.id]
                    ))
            upload_result = self.trainer.create_images_from_files(
                project_id,
                images=image_list
            )
            successful_uploads = len([img for img in upload_result.images if img.status == "OK"])
            print(f"✅ Uploaded {successful_uploads}/{len(image_list)} images for tag '{tag_name}'")
        return created_tags

    def train_and_evaluate_model(self, project_id, prediction_resource_id="/subscriptions/.../resourceGroups/.../providers/Microsoft.CognitiveServices/accounts/..."):
        """Train an iteration, publish it as "production" and report metrics.

        Args:
            project_id: Custom Vision project id.
            prediction_resource_id: ARM id of the prediction resource the
                iteration is published to. Parameterized (was hard-coded);
                the default keeps the original placeholder value.

        Returns:
            (evaluation_results dict, trained iteration) tuple.
        """
        # BUG FIX: `time` was used below but never imported in this module.
        import time

        print("🚀 Rozpoczynanie treningu...")
        iteration = self.trainer.train_project(project_id)

        # Poll until training finishes.
        while iteration.status != "Completed":
            iteration = self.trainer.get_iteration(project_id, iteration.id)
            print(f"Status treningu: {iteration.status}")
            time.sleep(10)
        print("✅ Trening zakończony!")

        # Publish under the well-known "production" name used by predict_image.
        self.trainer.publish_iteration(
            project_id,
            iteration.id,
            "production",
            prediction_resource_id=prediction_resource_id
        )

        # Collect overall and per-tag performance metrics.
        performance = self.trainer.get_iteration_performance(project_id, iteration.id)
        evaluation_results = {
            "precision": performance.precision,
            "recall": performance.recall,
            "average_precision": performance.average_precision,
            "per_tag_performance": []
        }
        for tag_performance in performance.per_tag_performance:
            evaluation_results["per_tag_performance"].append({
                "tag_name": tag_performance.name,
                "precision": tag_performance.precision,
                "recall": tag_performance.recall,
                "average_precision": tag_performance.average_precision
            })

        print(f"📊 Model Performance:")
        print(f" Precision: {performance.precision:.3f}")
        print(f" Recall: {performance.recall:.3f}")
        print(f" Average Precision: {performance.average_precision:.3f}")
        return evaluation_results, iteration

    def predict_image(self, project_id, iteration_name, image_path):
        """Classify a local image with a published iteration.

        Returns:
            dict with all predictions (sorted by probability, descending),
            the top prediction and the analyzed image path.
        """
        with open(image_path, "rb") as image_contents:
            results = self.predictor.classify_image(
                project_id,
                iteration_name,
                image_contents.read()
            )

        predictions = []
        for prediction in results.predictions:
            predictions.append({
                "tag": prediction.tag_name,
                "probability": round(prediction.probability, 4),
                # Coarse human-readable confidence bucket.
                "confidence": "High" if prediction.probability > 0.8 else "Medium" if prediction.probability > 0.5 else "Low"
            })
        predictions.sort(key=lambda x: x["probability"], reverse=True)

        return {
            "predictions": predictions,
            "top_prediction": predictions[0] if predictions else None,
            "image_analyzed": image_path
        }
🛠️ Praktyczne zastosowania
System jakości produktów
class ProductQualityInspector:
    """Product quality inspection pipeline combining general Computer Vision
    analysis with a custom defect-detection model, plus batch reporting."""

    def __init__(self, vision_client, custom_vision_client):
        """Args:
            vision_client: Object exposing analyze_image_comprehensive().
            custom_vision_client: Object exposing an awaitable predict_image().
        """
        self.vision_client = vision_client
        self.custom_client = custom_vision_client
        # Tunable inspection thresholds.
        self.quality_standards = {
            "defect_threshold": 0.7,     # min probability to count a defect
            "minimum_confidence": 0.6
        }

    async def inspect_product(self, image_path, product_type):
        """Inspect a single product image and return a verdict.

        Returns:
            dict with quality_score (0..1), defects_detected list and a
            recommendation string (ACCEPT / REVIEW / REJECT / ERROR ...).
        """
        # BUG FIX: `datetime` was used but never imported anywhere in this
        # module (NameError). Use a timezone-aware UTC timestamp.
        from datetime import datetime, timezone

        inspection_results = {
            "product_type": product_type,
            "image_path": image_path,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "quality_score": 0,
            "defects_detected": [],
            "recommendation": ""
        }
        try:
            # General image analysis (result currently informational only).
            general_analysis = self.vision_client.analyze_image_comprehensive(image_path)

            # Custom defect analysis, only when a per-product model exists.
            if hasattr(self.custom_client, f"{product_type}_defect_model"):
                defect_analysis = await self.custom_client.predict_image(
                    f"{product_type}_project_id",
                    "production",
                    image_path
                )
                # Keep only confident defect predictions.
                for prediction in defect_analysis["predictions"]:
                    if "defect" in prediction["tag"].lower() and prediction["probability"] > self.quality_standards["defect_threshold"]:
                        inspection_results["defects_detected"].append({
                            "type": prediction["tag"],
                            "severity": prediction["probability"],
                            "location": "detected"  # a real system would use the bounding box
                        })

            # Score starts at 1.0; each defect subtracts severity * 0.3.
            base_score = 1.0
            for defect in inspection_results["defects_detected"]:
                base_score -= defect["severity"] * 0.3
            inspection_results["quality_score"] = max(0, base_score)

            # Verdict thresholds: >0.8 accept, >0.6 review, else reject.
            if inspection_results["quality_score"] > 0.8:
                inspection_results["recommendation"] = "ACCEPT - Wysoka jakość"
            elif inspection_results["quality_score"] > 0.6:
                inspection_results["recommendation"] = "REVIEW - Wymaga przeglądu"
            else:
                inspection_results["recommendation"] = "REJECT - Niska jakość"
            return inspection_results
        except Exception as e:
            # Best-effort: never crash the pipeline; route to manual inspection.
            inspection_results["error"] = f"Błąd inspekcji: {str(e)}"
            inspection_results["recommendation"] = "ERROR - Wymaga manualnej inspekcji"
            return inspection_results

    def generate_quality_report(self, inspection_results_batch):
        """Aggregate a batch of inspection results into a quality report.

        Args:
            inspection_results_batch: List of dicts produced by inspect_product.

        Returns:
            dict with summary counters/rates, per-defect-type statistics and
            generated recommendations; error dict for an empty batch.
        """
        if not inspection_results_batch:
            return {"error": "No inspection results provided"}

        total_inspected = len(inspection_results_batch)
        accepted = len([r for r in inspection_results_batch if r["recommendation"].startswith("ACCEPT")])
        rejected = len([r for r in inspection_results_batch if r["recommendation"].startswith("REJECT")])
        # Everything else (REVIEW, ERROR) counts as needing review.
        needs_review = total_inspected - accepted - rejected

        # Collect all defects across the batch.
        all_defects = []
        for result in inspection_results_batch:
            all_defects.extend(result.get("defects_detected", []))

        # Per-type counts and average severity.
        defect_types = {}
        for defect in all_defects:
            defect_type = defect["type"]
            if defect_type not in defect_types:
                defect_types[defect_type] = {"count": 0, "avg_severity": 0}
            defect_types[defect_type]["count"] += 1
            defect_types[defect_type]["avg_severity"] += defect["severity"]
        for defect_type, stats in defect_types.items():
            stats["avg_severity"] = stats["avg_severity"] / stats["count"]

        quality_report = {
            "summary": {
                "total_inspected": total_inspected,
                "accepted": accepted,
                "rejected": rejected,
                "needs_review": needs_review,
                "acceptance_rate": (accepted / total_inspected) * 100,
                "rejection_rate": (rejected / total_inspected) * 100
            },
            "defect_analysis": defect_types,
            "recommendations": self._generate_quality_recommendations(defect_types, inspection_results_batch)
        }
        return quality_report

    def _generate_quality_recommendations(self, defect_types, inspection_results):
        """Derive actionable recommendations from defect statistics."""
        recommendations = []

        # Flag defect types affecting more than 10% of inspected products.
        for defect_type, stats in defect_types.items():
            if stats["count"] > len(inspection_results) * 0.1:
                recommendations.append({
                    "priority": "High",
                    "issue": f"Frequent {defect_type} defects",
                    "recommendation": f"Review production process for {defect_type} prevention",
                    "affected_percentage": (stats["count"] / len(inspection_results)) * 100
                })

        # Flag defect types whose average severity is critical (> 0.8).
        high_severity_defects = [dt for dt, stats in defect_types.items() if stats["avg_severity"] > 0.8]
        if high_severity_defects:
            recommendations.append({
                "priority": "Critical",
                "issue": f"High severity defects: {', '.join(high_severity_defects)}",
                "recommendation": "Immediate process review and corrective actions required"
            })
        return recommendations
✅ Zadania praktyczne
Zadanie 1: Basic Image Analysis (30 min)
- Skonfiguruj Azure Computer Vision
- Przeanalizuj różne typy obrazów (zdjęcia, dokumenty, diagramy)
- Przetestuj wszystkie dostępne funkcje API
- Porównaj wyniki dla różnych typów treści
Zadanie 2: OCR Implementation (30 min)
- Zaimplementuj wydobywanie tekstu z obrazów
- Przetestuj z różnymi językami i fontami
- Obsłuż dokumenty wielostronicowe
- Dodaj walidację i post-processing tekstu
Zadanie 3: Custom Vision Model (45 min)
- Stwórz custom model klasyfikacji (min. 3 klasy)
- Przygotuj zestaw treningowy (min. 10 obrazów/klasa)
- Wytrenuj i oceń model
- Przetestuj na nowych obrazach
Zadanie 4: Quality Control System (15 min)
- Zaimplementuj system kontroli jakości
- Użyj custom model do wykrywania defektów
- Stwórz automated workflow
- Wygeneruj raport jakości
📊 Metryki sukcesu
- OCR Accuracy > 95% dla czytelnego tekstu
- Object Detection > 90% precision dla głównych obiektów
- Custom Model > 85% accuracy po treningu
- Processing Speed < 3 sekundy per obraz