Sesja 19: Integracja AI z systemami biznesowymi
Enterprise AI Integration Patterns
🎯 Cele sesji
- Implementacja enterprise AI integration patterns
- Azure Integration Services dla AI workloads
- Event-driven architecture z AI processing
- API design i security dla AI services
🔗 Enterprise AI Integration Patterns
Common Integration Architectures
INTEGRATION PATTERNS:
1. API-FIRST INTEGRATION:
[BUSINESS SYSTEM] ↔ [AI API GATEWAY] ↔ [AI SERVICES]
✓ Loose coupling
✓ Scalable i maintainable
✓ Technology agnostic
2. EVENT-DRIVEN INTEGRATION:
[BUSINESS EVENTS] → [EVENT HUB] → [AI PROCESSORS] → [RESULTS STORE]
✓ Real-time processing
✓ Asynchronous handling
✓ High throughput
3. BATCH INTEGRATION:
[SCHEDULED JOBS] → [DATA EXTRACT] → [AI PROCESSING] → [RESULTS IMPORT]
✓ Large volume processing
✓ Cost-effective
✓ Predictable resource usage
4. EMBEDDED INTEGRATION:
[APPLICATION] + [EMBEDDED AI MODELS] → [REAL-TIME DECISIONS]
✓ Low latency
✓ Offline capability
✓ Data privacy
Azure Integration Services dla AI
import asyncio
import json
import logging
import uuid
from datetime import datetime, timezone

import azure.functions as func
from azure.eventhub import EventHubProducerClient, EventData
from azure.servicebus import ServiceBusClient, ServiceBusMessage
from azure.storage.queue import QueueClient
class EnterpriseAIOrchestrator:
    """Dispatch business events to AI services and publish the outcome.

    The orchestrator owns a Service Bus client, an Event Hub producer client
    and a dictionary of AI service wrappers keyed by ``"document_ai"``,
    ``"language_ai"``, ``"vision_ai"`` and ``"custom_models"``. The wrapper
    classes and several ``_``-prefixed helpers called below are not defined in
    this listing and are expected to be provided elsewhere.
    """

    # Supported event type -> name of the coroutine method that handles it.
    # Names are resolved with getattr() at dispatch time, so a handler is only
    # looked up when its event type actually arrives.
    _EVENT_HANDLERS = {
        "document_uploaded": "_process_document_upload",
        "customer_inquiry": "_process_customer_inquiry",
        "transaction_completed": "_process_transaction",
        "support_ticket_created": "_process_support_ticket",
    }

    def __init__(self, config):
        """Create the messaging clients and AI service wrappers.

        Args:
            config: Mapping with the keys ``servicebus_connection``,
                ``eventhub_connection``, ``document_ai``, ``language_ai``,
                ``vision_ai`` and ``custom_models``.
        """
        self.servicebus_client = ServiceBusClient.from_connection_string(
            config["servicebus_connection"]
        )
        self.eventhub_client = EventHubProducerClient.from_connection_string(
            config["eventhub_connection"]
        )
        self.ai_services = {
            "document_ai": DocumentAIService(config["document_ai"]),
            "language_ai": LanguageAIService(config["language_ai"]),
            "vision_ai": VisionAIService(config["vision_ai"]),
            "custom_models": CustomModelService(config["custom_models"]),
        }

    @staticmethod
    def _utc_timestamp():
        """Return the current time as a timezone-aware UTC ISO-8601 string."""
        return datetime.now(timezone.utc).isoformat()

    async def process_business_event(self, event_data):
        """Process an incoming business event with the matching AI handler.

        Args:
            event_data: Mapping with ``event_type``, ``payload`` and an
                optional ``correlation_id``. A missing or empty correlation
                id is replaced with a freshly generated UUID4 string.

        Returns:
            The handler's result dict on success. If anything inside the
            dispatch/publish step raises, an error dict with the keys
            ``event_type``, ``correlation_id``, ``status`` (``"error"``),
            ``error`` and ``timestamp`` is passed to
            ``_handle_processing_error`` and returned instead.
        """
        event_type = event_data.get("event_type")
        payload = event_data.get("payload")
        # `or` (rather than a .get default) generates an id only when needed
        # and also covers an explicit null/empty correlation_id.
        correlation_id = event_data.get("correlation_id") or str(uuid.uuid4())
        print(f"📨 Processing event: {event_type} (ID: {correlation_id})")
        try:
            handler_name = self._EVENT_HANDLERS.get(event_type)
            if handler_name is None:
                raise ValueError(f"Unsupported event type: {event_type}")
            if payload is None:
                # Fail with a descriptive message instead of the TypeError
                # a handler would raise when subscripting None.
                raise ValueError(f"Event '{event_type}' has no payload")
            result = await getattr(self, handler_name)(payload, correlation_id)
            # Send results back to business system
            await self._publish_ai_results(result, correlation_id)
            return result
        except Exception as e:
            # Top-level boundary: convert any failure into an error result
            # so the caller always receives a dict.
            error_result = {
                "event_type": event_type,
                "correlation_id": correlation_id,
                "status": "error",
                "error": str(e),
                "timestamp": self._utc_timestamp(),
            }
            await self._handle_processing_error(error_result)
            return error_result

    async def _process_document_upload(self, payload, correlation_id):
        """Analyze an uploaded document and return a ``document_processed`` result.

        Args:
            payload: Mapping with required ``document_url`` and
                ``document_id``, and optional ``document_type`` (default
                ``"unknown"``) and ``business_context`` (default ``{}``).
            correlation_id: Identifier echoed back in the result.

        Raises:
            KeyError: If ``document_url`` or ``document_id`` is missing.
        """
        # Read both required keys up front so a malformed event fails before
        # any AI service is called.
        document_url = payload["document_url"]
        document_id = payload["document_id"]
        document_type = payload.get("document_type", "unknown")
        business_context = payload.get("business_context", {})
        print(f"📄 Processing document: {document_type}")
        # Route to the appropriate analysis based on document type
        if document_type == "contract":
            analysis_result = await self._analyze_contract(document_url, business_context)
        elif document_type == "invoice":
            analysis_result = await self._analyze_invoice(document_url, business_context)
        elif document_type == "compliance_document":
            analysis_result = await self._analyze_compliance_document(document_url, business_context)
        else:
            analysis_result = await self._analyze_general_document(document_url, business_context)
        # Business validation and enrichment
        validated_result = await self._validate_and_enrich_analysis(
            analysis_result, business_context
        )
        return {
            "event_type": "document_processed",
            "correlation_id": correlation_id,
            "document_id": document_id,
            "analysis_result": validated_result,
            "processing_timestamp": self._utc_timestamp(),
            "status": "completed",
        }

    async def _analyze_contract(self, document_url, business_context):
        """Run the four-step contract analysis and combine the results.

        The steps run sequentially because each one consumes output of an
        earlier one: structure extraction, legal entity recognition, risk
        assessment, compliance check.

        Returns:
            Dict with the per-step results plus ``overall_confidence``,
            ``recommended_actions`` and ``processing_pipeline`` (the ordered
            step names).
        """
        # (step name, step result) pairs, in execution order
        analysis_steps = []
        # Step 1: Document structure extraction
        print("🔍 Extracting contract structure...")
        structure_analysis = await self.ai_services["document_ai"].extract_structure(
            document_url,
            document_type="contract"
        )
        analysis_steps.append(("structure_extraction", structure_analysis))
        extracted_text = structure_analysis["extracted_text"]
        # Step 2: Legal entity recognition
        print("⚖️ Identifying legal entities...")
        entity_analysis = await self.ai_services["language_ai"].extract_legal_entities(
            extracted_text
        )
        analysis_steps.append(("entity_recognition", entity_analysis))
        # Step 3: Risk assessment using custom model
        print("🚨 Assessing contract risks...")
        risk_analysis = await self.ai_services["custom_models"].assess_contract_risk(
            extracted_text,
            entity_analysis["entities"]
        )
        analysis_steps.append(("risk_assessment", risk_analysis))
        # Step 4: Compliance checking
        print("📋 Checking compliance requirements...")
        compliance_analysis = await self._check_contract_compliance(
            structure_analysis, entity_analysis, business_context
        )
        analysis_steps.append(("compliance_check", compliance_analysis))
        # Combine all results
        return {
            "document_type": "contract",
            "structure": structure_analysis,
            "entities": entity_analysis,
            "risk_assessment": risk_analysis,
            "compliance": compliance_analysis,
            "overall_confidence": self._calculate_combined_confidence(analysis_steps),
            "recommended_actions": self._generate_contract_recommendations(analysis_steps),
            "processing_pipeline": [name for name, _ in analysis_steps],
        }

    async def _process_customer_inquiry(self, payload, correlation_id):
        """Analyze a customer inquiry, decide routing and optionally auto-respond.

        Args:
            payload: Mapping with required ``inquiry_text`` and optional
                ``customer_id`` and ``channel`` (default ``"email"``).
            correlation_id: Identifier echoed back in the result.

        Returns:
            A ``customer_inquiry_processed`` dict whose ``processing_result``
            holds the intent analysis, the routing decision, and either an
            automated response or escalation details.
        """
        inquiry_text = payload["inquiry_text"]
        customer_id = payload.get("customer_id")
        channel = payload.get("channel", "email")  # email, chat, phone
        processing_result = {
            "inquiry_analysis": {},
            "routing_decision": {},
            "automated_response": {},
            "escalation_needed": False,
        }
        # Step 1: Intent and sentiment analysis
        print("🧠 Analyzing customer intent...")
        intent_analysis = await self.ai_services["language_ai"].analyze_customer_intent(
            inquiry_text,
            customer_context={"id": customer_id, "channel": channel}
        )
        processing_result["inquiry_analysis"] = intent_analysis
        # Step 2: Intelligent routing decision
        print("🎯 Determining routing...")
        routing_decision = await self._determine_inquiry_routing(
            intent_analysis, customer_id
        )
        processing_result["routing_decision"] = routing_decision
        # Step 3: Generate automated response (if applicable).
        # A routing decision without an explicit "can_auto_respond" is
        # treated as "escalate", so a malformed decision goes to a human
        # instead of failing the whole event.
        if routing_decision.get("can_auto_respond"):
            print("🤖 Generating automated response...")
            automated_response = await self._generate_customer_response(
                inquiry_text, intent_analysis, customer_id
            )
            processing_result["automated_response"] = automated_response
        else:
            processing_result["escalation_needed"] = True
            processing_result["escalation_reason"] = routing_decision.get("escalation_reason")
        return {
            "event_type": "customer_inquiry_processed",
            "correlation_id": correlation_id,
            "customer_id": customer_id,
            "processing_result": processing_result,
            "timestamp": self._utc_timestamp(),
        }
# Azure Functions integration for event processing.
# NOTE(review): in the Azure Functions Python v2 programming model these
# decorators are normally methods of a `func.FunctionApp()` instance
# (e.g. `@app.service_bus_queue_trigger(...)`) - confirm that the
# module-level `func.function_name` / `func.service_bus_queue_trigger`
# used here resolve in the deployed SDK version.
@func.function_name("BusinessEventProcessor")
@func.service_bus_queue_trigger(
    arg_name="msg",
    queue_name="business-events",
    connection="ServiceBusConnection"
)
async def business_event_processor(msg: func.ServiceBusMessage):
    """Process one business event message with the AI orchestrator.

    The message body is decoded as UTF-8 JSON and handed to
    ``EnterpriseAIOrchestrator.process_business_event``.

    Args:
        msg: The Service Bus message that triggered the function.

    Returns:
        The dict returned by the orchestrator (a handler result, or an error
        result whose ``status`` is ``"error"``).

    Raises:
        Exception: Any error from parsing, configuration or orchestration is
            logged, the message is forwarded to the dead letter helper, and
            the original exception is re-raised.
    """
    try:
        # Parse event data
        event_data = json.loads(msg.get_body().decode('utf-8'))
        # Initialize AI orchestrator
        config = get_integration_config()
        orchestrator = EnterpriseAIOrchestrator(config)
        # Process event
        result = await orchestrator.process_business_event(event_data)
        # The orchestrator reports handler failures through the returned dict
        # rather than by raising, so inspect it before logging success. The
        # correlation id is read from the result because the orchestrator
        # may have generated one that the incoming event did not carry.
        correlation_id = result.get("correlation_id")
        if result.get("status") == "error":
            logging.warning(
                "Event %s finished with error: %s",
                correlation_id,
                result.get("error"),
            )
        else:
            logging.info(f"Successfully processed event {correlation_id}")
        return result
    except Exception as e:
        # logging.exception keeps the traceback alongside the message.
        logging.exception(f"Error processing business event: {str(e)}")
        # Send to dead letter queue for manual review. A failure here must
        # not replace the original exception, so it is logged separately.
        try:
            await send_to_dead_letter_queue(msg, str(e))
        except Exception:
            logging.exception("Failed to send message to dead letter queue")
        raise
🏢 Enterprise Architecture Workshop
Workshop Project: Integrated AI Business Solution
Scenario: GlobalCorp needs AI-powered document processing system
Requirements:
- 1000+ documents daily (contracts, invoices, reports)
- 80% automation rate z human oversight
- Integration z SAP, Salesforce, SharePoint
- Compliance z SOX, GDPR, industry regulations
- <30 second processing time per document
Architecture Implementation:
class GlobalCorpAISystem:
    """Workshop skeleton of the GlobalCorp document-processing system.

    Groups the five architectural layers in ``integration_layers`` and wires
    up the external integrations. The layer classes and most ``_setup_*`` /
    ``_configure_*`` helpers are not defined in this listing.
    """

    def __init__(self):
        """Instantiate one object per architectural layer."""
        self.integration_layers = {
            "presentation": WebAPILayer(),
            "orchestration": BusinessLogicOrchestrator(),
            "ai_services": AIServicesLayer(),
            "data": DataAccessLayer(),
            "external": ExternalSystemsIntegration(),
        }

    async def initialize_system(self, config):
        """Initialize the complete system architecture.

        Args:
            config: Mapping with the keys ``sap``, ``salesforce`` and
                ``sharepoint``, each passed to the matching setup helper.
        """
        # Setup integration endpoints
        await self._setup_sap_integration(config["sap"])
        await self._setup_salesforce_integration(config["salesforce"])
        await self._setup_sharepoint_integration(config["sharepoint"])
        # Configure AI services
        await self._configure_ai_pipeline()
        # Setup monitoring and compliance
        await self._setup_monitoring_and_compliance()
        print("🏗️ GlobalCorp AI System initialized")

    async def _setup_sap_integration(self, sap_config):
        """Build the SAP integration descriptor and test connectivity.

        Args:
            sap_config: Mapping with a ``base_url`` entry; a trailing slash
                is tolerated.

        Returns:
            The integration descriptor (connection type, authentication,
            endpoints, data mapping). It is returned whether or not the
            connectivity test succeeded; the outcome is only printed.
        """
        # Strip a trailing slash so the endpoints never contain "//".
        base_url = sap_config["base_url"].rstrip("/")
        sap_integration = {
            "connection_type": "REST_API",
            "authentication": "OAuth2",
            "endpoints": {
                "invoice_posting": f"{base_url}/sap/bc/rest/invoices",
                "vendor_lookup": f"{base_url}/sap/bc/rest/vendors",
                "approval_workflow": f"{base_url}/sap/bc/rest/workflows",
            },
            "data_mapping": {
                "ai_invoice_data": "sap_invoice_format",
                "vendor_entities": "sap_vendor_master",
                "approval_routing": "sap_workflow_triggers",
            },
        }
        # Test SAP connectivity
        connection_test = await self._test_sap_connection(sap_integration)
        # .get() keeps a partial test result (missing "success" or "error")
        # from raising KeyError while reporting the outcome.
        if connection_test.get("success"):
            print("✅ SAP integration configured")
        else:
            print(f"❌ SAP integration failed: {connection_test.get('error', 'unknown error')}")
        return sap_integration
✅ Zadania praktyczne
Zadanie 1: API Gateway Design (45 min)
- Zaprojektuj AI API gateway dla enterprise
- Implement authentication i rate limiting
- Add request/response transformation
- Setup monitoring i logging
Zadanie 2: Event-Driven Processing (45 min)
- Implement event-driven AI processing
- Setup Azure Service Bus integration
- Add error handling i dead letter queues
- Create monitoring dashboard
Zadanie 3: Legacy System Integration (20 min)
- Design integration z legacy ERP system
- Implement data transformation layers
- Add compliance i audit logging
- Test end-to-end workflow
Zadanie 4: Security Implementation (10 min)
- Implement enterprise security patterns
- Add API authentication i authorization
- Set up data encryption in transit and at rest
- Configure compliance monitoring
📊 Metryki sukcesu
- Integration reliability > 99.5%
- Data transformation accuracy > 99.9%
- Processing latency < 2 seconds end-to-end
- Security compliance - 100% audit requirements met