Sesja 15: Warsztaty - RAG i asystenci AI
Praktyczna implementacja end-to-end
🎯 Cele warsztatów
- Implementacja kompletnego systemu RAG (Retrieval Augmented Generation)
- Budowa inteligentnych asystentów AI z pamięcią konwersacji
- Integracja z bazami wiedzy i dokumentami
- Optymalizacja jakości odpowiedzi i relevance
🔗 Retrieval Augmented Generation (RAG)
Architektura systemu RAG
ZAPYTANIE → RETRIEVAL → CONTEXT INJECTION → LLM → ODPOWIEDŹ
↓ ↓ ↓ ↓ ↓
EMBEDDING → VECTOR DB → PROMPT BUILDING → GPT-4 → VALIDATION
Kluczowe komponenty:
- Document Processing - przetwarzanie i indeksowanie dokumentów
- Vector Storage - przechowywanie embeddingów
- Retrieval System - wyszukiwanie relevantnych fragmentów
- Context Integration - łączenie kontekstu z zapytaniem
- Response Generation - generowanie odpowiedzi przez LLM
💻 Implementacja production-ready RAG
import asyncio
from typing import List, Dict, Optional, Tuple
from langchain.document_loaders import PyPDFLoader, TextLoader, WebBaseLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Pinecone, FAISS
from langchain.llms import AzureOpenAI
from langchain.chains import RetrievalQA, ConversationChain
from langchain.memory import ConversationBufferWindowMemory
import pinecone
class ProductionRAGSystem:
def __init__(self, config: Dict):
self.config = config
self.embeddings = OpenAIEmbeddings(
openai_api_key=config["openai_api_key"],
deployment=config["embedding_deployment"]
)
self.llm = AzureOpenAI(
deployment_name=config["llm_deployment"],
openai_api_key=config["openai_api_key"],
openai_api_base=config["openai_api_base"],
openai_api_version=config["openai_api_version"]
)
self.vector_store = None
self.conversation_memory = ConversationBufferWindowMemory(
k=10, # Pamiętaj ostatnie 10 wymian
return_messages=True
)
async def ingest_documents(self, document_sources: List[Dict]) -> Dict:
"""Przetwarzanie i indeksowanie dokumentów"""
print("🔄 Starting document ingestion...")
all_documents = []
ingestion_stats = {
"total_sources": len(document_sources),
"successful_loads": 0,
"total_chunks": 0,
"failed_sources": []
}
# Przetwarzanie różnych typów źródeł
for source in document_sources:
try:
documents = await self._load_documents(source)
all_documents.extend(documents)
ingestion_stats["successful_loads"] += 1
print(f"✅ Loaded {len(documents)} documents from {source['type']}")
except Exception as e:
print(f"❌ Failed to load from {source['type']}: {str(e)}")
ingestion_stats["failed_sources"].append({
"source": source,
"error": str(e)
})
if not all_documents:
raise ValueError("No documents were successfully loaded")
# Chunking dokumentów
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
length_function=len,
separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""]
)
document_chunks = text_splitter.split_documents(all_documents)
ingestion_stats["total_chunks"] = len(document_chunks)
print(f"📄 Created {len(document_chunks)} chunks from {len(all_documents)} documents")
# Tworzenie vector store
if self.config.get("use_pinecone", False):
self.vector_store = await self._create_pinecone_index(document_chunks)
else:
self.vector_store = FAISS.from_documents(document_chunks, self.embeddings)
# Zapisz lokalnie dla persistence
self.vector_store.save_local("faiss_index")
print("✅ Vector store created successfully")
return ingestion_stats
async def _load_documents(self, source: Dict) -> List:
"""Ładowanie dokumentów z różnych źródeł"""
source_type = source["type"]
source_path = source["path"]
if source_type == "pdf":
loader = PyPDFLoader(source_path)
elif source_type == "text":
loader = TextLoader(source_path, encoding="utf-8")
elif source_type == "web":
loader = WebBaseLoader(source_path)
else:
raise ValueError(f"Unsupported source type: {source_type}")
documents = loader.load()
# Dodaj metadata
for doc in documents:
doc.metadata.update({
"source_type": source_type,
"source_path": source_path,
"ingestion_timestamp": datetime.utcnow().isoformat()
})
return documents
async def _create_pinecone_index(self, documents: List) -> pinecone.Index:
"""Tworzenie indeksu Pinecone"""
# Inicjalizacja Pinecone
pinecone.init(
api_key=self.config["pinecone_api_key"],
environment=self.config["pinecone_environment"]
)
index_name = self.config["pinecone_index_name"]
# Sprawdź czy indeks istnieje
if index_name not in pinecone.list_indexes():
# Stwórz nowy indeks
pinecone.create_index(
name=index_name,
dimension=1536, # OpenAI embeddings dimension
metric="cosine"
)
# Połącz z indeksem
vector_store = Pinecone.from_documents(
documents,
self.embeddings,
index_name=index_name
)
return vector_store
async def intelligent_search(self, query: str,
search_options: Optional[Dict] = None) -> Dict:
"""Inteligentne wyszukiwanie z kontekstem"""
if not self.vector_store:
raise ValueError("Vector store not initialized. Run ingest_documents first.")
options = search_options or {}
k = options.get("k", 4) # Number of documents to retrieve
# Wyszukiwanie relevantnych dokumentów
relevant_docs = self.vector_store.similarity_search(query, k=k)
# Przygotowanie kontekstu
context_chunks = []
sources = []
for doc in relevant_docs:
context_chunks.append(doc.page_content)
sources.append({
"source": doc.metadata.get("source_path", "Unknown"),
"content_preview": doc.page_content[:200] + "..."
})
combined_context = "\n\n".join(context_chunks)
# Generowanie odpowiedzi z kontekstem
enhanced_prompt = self._build_rag_prompt(query, combined_context)
try:
response = await self.llm.agenerate([enhanced_prompt])
answer = response.generations[0][0].text.strip()
return {
"query": query,
"answer": answer,
"context_used": combined_context,
"sources": sources,
"confidence": self._calculate_confidence(relevant_docs, answer)
}
except Exception as e:
return {
"query": query,
"error": f"Failed to generate response: {str(e)}",
"sources": sources
}
def _build_rag_prompt(self, query: str, context: str) -> str:
"""Budowanie prompta RAG"""
prompt_template = f"""
Jesteś ekspertem-asystentem AI. Odpowiadaj na pytania na podstawie dostarczonego kontekstu.
WAŻNE ZASADY:
1. Użyj TYLKO informacji z dostarczonego kontekstu
2. Jeśli kontekst nie zawiera odpowiedzi, powiedz to jasno
3. Cytuj konkretne fragmenty z kontekstu gdy to możliwe
4. Bądź precyzyjny i faktograficzny
KONTEKST:
{context}
PYTANIE: {query}
ODPOWIEDŹ:"""
return prompt_template
def _calculate_confidence(self, documents: List, answer: str) -> float:
"""Obliczanie poziomu pewności odpowiedzi"""
# Prosty scoring na podstawie jakości źródeł
base_confidence = 0.5
# Bonus za liczbę relevantnych dokumentów
doc_bonus = min(len(documents) * 0.1, 0.3)
# Bonus za długość odpowiedzi (więcej szczegółów = wyższa pewność)
length_bonus = min(len(answer) / 1000, 0.2)
# Sprawdź czy odpowiedź zawiera konkretne fakty
fact_indicators = ["zgodnie z", "według", "jak wskazuje", "dane pokazują"]
fact_bonus = 0.1 if any(indicator in answer.lower() for indicator in fact_indicators) else 0
total_confidence = min(base_confidence + doc_bonus + length_bonus + fact_bonus, 1.0)
return round(total_confidence, 2)
class IntelligentAssistant:
def __init__(self, rag_system: ProductionRAGSystem,
assistant_config: Dict):
self.rag_system = rag_system
self.config = assistant_config
self.conversation_history = []
self.persona = assistant_config.get("persona", self._default_persona())
def _default_persona(self) -> str:
return """
Jesteś pomocnym ekspertem-asystentem AI. Twoje cechy:
- Profesjonalny ale przyjazny ton
- Precyzyjne, oparte na faktach odpowiedzi
- Przyznawanie się do ograniczeń wiedzy
- Zadawanie pytań doprecyzujących gdy potrzeba
- Dostarczanie praktycznych, działających rozwiązań
"""
async def process_user_message(self, user_input: str,
session_id: str = "default") -> Dict:
"""Przetwarzanie wiadomości użytkownika"""
# Pobieranie kontekstu z RAG
rag_result = await self.rag_system.intelligent_search(user_input)
# Budowanie historii konwersacji
conversation_context = self._build_conversation_context(session_id)
# Tworzenie enhanced prompta
enhanced_prompt = self._create_assistant_prompt(
user_input=user_input,
rag_context=rag_result.get("context_used", ""),
conversation_history=conversation_context
)
try:
# Generowanie odpowiedzi
response = await self.rag_system.llm.agenerate([enhanced_prompt])
assistant_response = response.generations[0][0].text.strip()
# Aktualizacja historii
self._update_conversation_history(
session_id, user_input, assistant_response
)
return {
"user_input": user_input,
"assistant_response": assistant_response,
"sources_used": rag_result.get("sources", []),
"confidence": rag_result.get("confidence", 0.5),
"session_id": session_id,
"context_relevance": self._assess_context_relevance(
user_input, rag_result.get("context_used", "")
)
}
except Exception as e:
return {
"user_input": user_input,
"error": f"Failed to process message: {str(e)}",
"session_id": session_id
}
def _create_assistant_prompt(self, user_input: str,
rag_context: str,
conversation_history: str) -> str:
"""Tworzenie prompta dla asystenta"""
prompt = f"""
{self.persona}
HISTORIA KONWERSACJI:
{conversation_history}
DOSTĘPNA WIEDZA:
{rag_context}
AKTUALNE PYTANIE UŻYTKOWNIKA:
{user_input}
Odpowiedz w sposób pomocny, wykorzystując dostępną wiedzę i uwzględniając kontekst rozmowy.
Jeśli dostępna wiedza nie wystarczy, powiedz o tym użytkownikowi i zasugeruj alternatywne źródła informacji.
ODPOWIEDŹ:"""
return prompt
🛠️ Praktyczny warsztat
Projekt: Korporacyjny Asystent Wiedzy (120 min)
Scenariusz biznesowy: Firma potrzebuje inteligentnego asystenta, który pomoże pracownikom znajdować informacje w firmowej bazie wiedzy (dokumenty, procedures, FAQ).
Krok 1: Przygotowanie danych (30 min)
# Setup projektu
workshop_config = {
"openai_api_key": "your-key",
"openai_api_base": "https://your-resource.openai.azure.com/",
"openai_api_version": "2024-02-01",
"embedding_deployment": "text-embedding-ada-002",
"llm_deployment": "gpt-4-turbo",
"use_pinecone": False # Używamy FAISS dla prostoty
}
# Przykładowe źródła danych
document_sources = [
{
"type": "pdf",
"path": "company_handbook.pdf"
},
{
"type": "text",
"path": "hr_policies.txt"
},
{
"type": "web",
"path": "https://company-wiki.example.com/procedures"
}
]
Krok 2: Implementacja RAG (45 min)
async def main():
# Inicjalizacja systemu
rag_system = ProductionRAGSystem(workshop_config)
print("📚 Ingesting company documents...")
stats = await rag_system.ingest_documents(document_sources)
print(f"✅ Processed {stats['total_chunks']} document chunks")
# Testowanie wyszukiwania
test_queries = [
"What is our vacation policy?",
"How do I submit expenses?",
"What are the security guidelines for remote work?",
"Who should I contact for IT support?"
]
print("\n🔍 Testing RAG system...")
for query in test_queries:
result = await rag_system.intelligent_search(query)
print(f"\nQ: {query}")
print(f"A: {result['answer'][:200]}...")
print(f"Confidence: {result['confidence']}")
print(f"Sources: {len(result['sources'])}")
if __name__ == "__main__":
asyncio.run(main())
Krok 3: Asystent z pamięcią (30 min)
# Konfiguracja asystenta
assistant_config = {
"persona": """
Jesteś profesjonalnym asystentem HR w firmie technologicznej.
Pomagasz pracownikom w sprawach związanych z politykami firmy,
procedurami i codziennymi pytaniami organizacyjnymi.
"""
}
assistant = IntelligentAssistant(rag_system, assistant_config)
# Simulacja konwersacji
conversation_test = [
"What's our policy on working from home?",
"Can you give me more details about the equipment allowance?",
"How do I apply for this benefit?",
"Thank you, that's very helpful!"
]
print("\n💬 Testing conversational assistant...")
session_id = "employee_123"
for user_message in conversation_test:
response = await assistant.process_user_message(user_message, session_id)
print(f"\n👤 User: {user_message}")
print(f"🤖 Assistant: {response['assistant_response']}")
print(f"📊 Confidence: {response['confidence']}")
Krok 4: Interface użytkownika (15 min)
Prosty Streamlit interface:
import streamlit as st
def create_assistant_interface():
st.title("🤖 Company Knowledge Assistant")
st.write("Ask me anything about company policies and procedures!")
# Initialize session state
if "conversation_history" not in st.session_state:
st.session_state.conversation_history = []
# Chat interface
user_input = st.text_input("Your question:", key="user_input")
if st.button("Ask Assistant"):
if user_input:
# Process with assistant
with st.spinner("Thinking..."):
response = await assistant.process_user_message(
user_input,
session_id="streamlit_user"
)
# Display response
st.session_state.conversation_history.append({
"user": user_input,
"assistant": response["assistant_response"],
"confidence": response["confidence"]
})
# Show conversation history
for exchange in st.session_state.conversation_history:
st.write(f"**You:** {exchange['user']}")
st.write(f"**Assistant:** {exchange['assistant']}")
st.write(f"*Confidence: {exchange['confidence']}*")
st.write("---")
if __name__ == "__main__":
create_assistant_interface()
🎯 Zadania warsztatowe
Zadanie główne: Kompletny RAG System (90 min)
Implementacja:
- Document Processing (30 min) - ingestion różnych formatów
- Search Optimization (30 min) - tuning retrieval parameters
- Assistant Development (30 min) - conversational capabilities
Zadania dodatkowe
Zadanie 1: Advanced Retrieval (20 min)
- Implementacja hybrid search (semantic + keyword)
- Reranking wyników wyszukiwania
- Multi-query expansion
Zadanie 2: Response Quality (20 min)
- Fact-checking mechanizm
- Citation generation
- Confidence scoring improvement
Zadanie 3: Memory Management (20 min)
- Persistent conversation memory
- Context compression dla długich rozmów
- User personalization
📊 Kryteria oceny
Funkcjonalność (50 punktów)
- RAG system działa z wieloma typami dokumentów (15 pkt)
- Asystent ma pamięć konwersacji (15 pkt)
- Search quality i relevance (20 pkt)
Jakość techniczna (30 punktów)
- Code quality i dokumentacja (10 pkt)
- Error handling i monitoring (10 pkt)
- Performance optimization (10 pkt)
Dodatkowe funkcje (20 punktów)
- Advanced search features (5 pkt)
- UI/UX implementation (5 pkt)
- Quality assurance mechanisms (10 pkt)
🏆 Rezultat warsztatów
Po ukończeniu uczestnicy będą mieli:
- Działający RAG system - production-ready implementation
- Intelligent Assistant - z conversation memory
- Practical Experience - hands-on z najnowszymi technikami
- Reusable Framework - kod do wykorzystania w projektach