Sesja 15: Warsztaty - RAG i asystenci AI

Praktyczna implementacja end-to-end

🎯 Cele warsztatów

  • Implementacja kompletnego systemu RAG (Retrieval Augmented Generation)
  • Budowa inteligentnych asystentów AI z pamięcią konwersacji
  • Integracja z bazami wiedzy i dokumentami
  • Optymalizacja jakości odpowiedzi i relevance

🔗 Retrieval Augmented Generation (RAG)

Architektura systemu RAG

ZAPYTANIE → RETRIEVAL → CONTEXT INJECTION → LLM → ODPOWIEDŹ
    ↓           ↓            ↓             ↓         ↓
EMBEDDING → VECTOR DB → PROMPT BUILDING → GPT-4 → VALIDATION

Kluczowe komponenty:

  1. Document Processing - przetwarzanie i indeksowanie dokumentów
  2. Vector Storage - przechowywanie embeddingów
  3. Retrieval System - wyszukiwanie relevantnych fragmentów
  4. Context Integration - łączenie kontekstu z zapytaniem
  5. Response Generation - generowanie odpowiedzi przez LLM

💻 Implementacja production-ready RAG

import asyncio
from typing import List, Dict, Optional, Tuple
from langchain.document_loaders import PyPDFLoader, TextLoader, WebBaseLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Pinecone, FAISS
from langchain.llms import AzureOpenAI
from langchain.chains import RetrievalQA, ConversationChain
from langchain.memory import ConversationBufferWindowMemory
import pinecone

class ProductionRAGSystem:
    def __init__(self, config: Dict):
        self.config = config
        self.embeddings = OpenAIEmbeddings(
            openai_api_key=config["openai_api_key"],
            deployment=config["embedding_deployment"]
        )
        
        self.llm = AzureOpenAI(
            deployment_name=config["llm_deployment"],
            openai_api_key=config["openai_api_key"],
            openai_api_base=config["openai_api_base"],
            openai_api_version=config["openai_api_version"]
        )
        
        self.vector_store = None
        self.conversation_memory = ConversationBufferWindowMemory(
            k=10,  # Pamiętaj ostatnie 10 wymian
            return_messages=True
        )
        
    async def ingest_documents(self, document_sources: List[Dict]) -> Dict:
        """Przetwarzanie i indeksowanie dokumentów"""
        
        print("🔄 Starting document ingestion...")
        
        all_documents = []
        ingestion_stats = {
            "total_sources": len(document_sources),
            "successful_loads": 0,
            "total_chunks": 0,
            "failed_sources": []
        }
        
        # Przetwarzanie różnych typów źródeł
        for source in document_sources:
            try:
                documents = await self._load_documents(source)
                all_documents.extend(documents)
                ingestion_stats["successful_loads"] += 1
                print(f"✅ Loaded {len(documents)} documents from {source['type']}")
                
            except Exception as e:
                print(f"❌ Failed to load from {source['type']}: {str(e)}")
                ingestion_stats["failed_sources"].append({
                    "source": source,
                    "error": str(e)
                })
        
        if not all_documents:
            raise ValueError("No documents were successfully loaded")
        
        # Chunking dokumentów
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
            separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""]
        )
        
        document_chunks = text_splitter.split_documents(all_documents)
        ingestion_stats["total_chunks"] = len(document_chunks)
        
        print(f"📄 Created {len(document_chunks)} chunks from {len(all_documents)} documents")
        
        # Tworzenie vector store
        if self.config.get("use_pinecone", False):
            self.vector_store = await self._create_pinecone_index(document_chunks)
        else:
            self.vector_store = FAISS.from_documents(document_chunks, self.embeddings)
            
            # Zapisz lokalnie dla persistence
            self.vector_store.save_local("faiss_index")
            
        print("✅ Vector store created successfully")
        
        return ingestion_stats
    
    async def _load_documents(self, source: Dict) -> List:
        """Ładowanie dokumentów z różnych źródeł"""
        
        source_type = source["type"]
        source_path = source["path"]
        
        if source_type == "pdf":
            loader = PyPDFLoader(source_path)
        elif source_type == "text":
            loader = TextLoader(source_path, encoding="utf-8")
        elif source_type == "web":
            loader = WebBaseLoader(source_path)
        else:
            raise ValueError(f"Unsupported source type: {source_type}")
        
        documents = loader.load()
        
        # Dodaj metadata
        for doc in documents:
            doc.metadata.update({
                "source_type": source_type,
                "source_path": source_path,
                "ingestion_timestamp": datetime.utcnow().isoformat()
            })
        
        return documents
    
    async def _create_pinecone_index(self, documents: List) -> pinecone.Index:
        """Tworzenie indeksu Pinecone"""
        
        # Inicjalizacja Pinecone
        pinecone.init(
            api_key=self.config["pinecone_api_key"],
            environment=self.config["pinecone_environment"]
        )
        
        index_name = self.config["pinecone_index_name"]
        
        # Sprawdź czy indeks istnieje
        if index_name not in pinecone.list_indexes():
            # Stwórz nowy indeks
            pinecone.create_index(
                name=index_name,
                dimension=1536,  # OpenAI embeddings dimension
                metric="cosine"
            )
            
        # Połącz z indeksem
        vector_store = Pinecone.from_documents(
            documents,
            self.embeddings,
            index_name=index_name
        )
        
        return vector_store
    
    async def intelligent_search(self, query: str, 
                                search_options: Optional[Dict] = None) -> Dict:
        """Inteligentne wyszukiwanie z kontekstem"""
        
        if not self.vector_store:
            raise ValueError("Vector store not initialized. Run ingest_documents first.")
        
        options = search_options or {}
        k = options.get("k", 4)  # Number of documents to retrieve
        
        # Wyszukiwanie relevantnych dokumentów
        relevant_docs = self.vector_store.similarity_search(query, k=k)
        
        # Przygotowanie kontekstu
        context_chunks = []
        sources = []
        
        for doc in relevant_docs:
            context_chunks.append(doc.page_content)
            sources.append({
                "source": doc.metadata.get("source_path", "Unknown"),
                "content_preview": doc.page_content[:200] + "..."
            })
        
        combined_context = "\n\n".join(context_chunks)
        
        # Generowanie odpowiedzi z kontekstem
        enhanced_prompt = self._build_rag_prompt(query, combined_context)
        
        try:
            response = await self.llm.agenerate([enhanced_prompt])
            answer = response.generations[0][0].text.strip()
            
            return {
                "query": query,
                "answer": answer,
                "context_used": combined_context,
                "sources": sources,
                "confidence": self._calculate_confidence(relevant_docs, answer)
            }
            
        except Exception as e:
            return {
                "query": query,
                "error": f"Failed to generate response: {str(e)}",
                "sources": sources
            }
    
    def _build_rag_prompt(self, query: str, context: str) -> str:
        """Budowanie prompta RAG"""
        
        prompt_template = f"""
Jesteś ekspertem-asystentem AI. Odpowiadaj na pytania na podstawie dostarczonego kontekstu.

WAŻNE ZASADY:
1. Użyj TYLKO informacji z dostarczonego kontekstu
2. Jeśli kontekst nie zawiera odpowiedzi, powiedz to jasno
3. Cytuj konkretne fragmenty z kontekstu gdy to możliwe
4. Bądź precyzyjny i faktograficzny

KONTEKST:
{context}

PYTANIE: {query}

ODPOWIEDŹ:"""
        
        return prompt_template
    
    def _calculate_confidence(self, documents: List, answer: str) -> float:
        """Obliczanie poziomu pewności odpowiedzi"""
        
        # Prosty scoring na podstawie jakości źródeł
        base_confidence = 0.5
        
        # Bonus za liczbę relevantnych dokumentów
        doc_bonus = min(len(documents) * 0.1, 0.3)
        
        # Bonus za długość odpowiedzi (więcej szczegółów = wyższa pewność)
        length_bonus = min(len(answer) / 1000, 0.2)
        
        # Sprawdź czy odpowiedź zawiera konkretne fakty
        fact_indicators = ["zgodnie z", "według", "jak wskazuje", "dane pokazują"]
        fact_bonus = 0.1 if any(indicator in answer.lower() for indicator in fact_indicators) else 0
        
        total_confidence = min(base_confidence + doc_bonus + length_bonus + fact_bonus, 1.0)
        
        return round(total_confidence, 2)

class IntelligentAssistant:
    def __init__(self, rag_system: ProductionRAGSystem, 
                 assistant_config: Dict):
        self.rag_system = rag_system
        self.config = assistant_config
        self.conversation_history = []
        self.persona = assistant_config.get("persona", self._default_persona())
        
    def _default_persona(self) -> str:
        return """
Jesteś pomocnym ekspertem-asystentem AI. Twoje cechy:
- Profesjonalny ale przyjazny ton
- Precyzyjne, oparte na faktach odpowiedzi  
- Przyznawanie się do ograniczeń wiedzy
- Zadawanie pytań doprecyzujących gdy potrzeba
- Dostarczanie praktycznych, działających rozwiązań
"""
    
    async def process_user_message(self, user_input: str, 
                                  session_id: str = "default") -> Dict:
        """Przetwarzanie wiadomości użytkownika"""
        
        # Pobieranie kontekstu z RAG
        rag_result = await self.rag_system.intelligent_search(user_input)
        
        # Budowanie historii konwersacji
        conversation_context = self._build_conversation_context(session_id)
        
        # Tworzenie enhanced prompta
        enhanced_prompt = self._create_assistant_prompt(
            user_input=user_input,
            rag_context=rag_result.get("context_used", ""),
            conversation_history=conversation_context
        )
        
        try:
            # Generowanie odpowiedzi
            response = await self.rag_system.llm.agenerate([enhanced_prompt])
            assistant_response = response.generations[0][0].text.strip()
            
            # Aktualizacja historii
            self._update_conversation_history(
                session_id, user_input, assistant_response
            )
            
            return {
                "user_input": user_input,
                "assistant_response": assistant_response,
                "sources_used": rag_result.get("sources", []),
                "confidence": rag_result.get("confidence", 0.5),
                "session_id": session_id,
                "context_relevance": self._assess_context_relevance(
                    user_input, rag_result.get("context_used", "")
                )
            }
            
        except Exception as e:
            return {
                "user_input": user_input,
                "error": f"Failed to process message: {str(e)}",
                "session_id": session_id
            }
    
    def _create_assistant_prompt(self, user_input: str, 
                               rag_context: str, 
                               conversation_history: str) -> str:
        """Tworzenie prompta dla asystenta"""
        
        prompt = f"""
{self.persona}

HISTORIA KONWERSACJI:
{conversation_history}

DOSTĘPNA WIEDZA:
{rag_context}

AKTUALNE PYTANIE UŻYTKOWNIKA:
{user_input}

Odpowiedz w sposób pomocny, wykorzystując dostępną wiedzę i uwzględniając kontekst rozmowy.
Jeśli dostępna wiedza nie wystarczy, powiedz o tym użytkownikowi i zasugeruj alternatywne źródła informacji.

ODPOWIEDŹ:"""
        
        return prompt

🛠️ Praktyczny warsztat

Projekt: Korporacyjny Asystent Wiedzy (120 min)

Scenariusz biznesowy: Firma potrzebuje inteligentnego asystenta, który pomoże pracownikom znajdować informacje w firmowej bazie wiedzy (dokumenty, procedures, FAQ).

Krok 1: Przygotowanie danych (30 min)

# Setup projektu
workshop_config = {
    "openai_api_key": "your-key",
    "openai_api_base": "https://your-resource.openai.azure.com/",
    "openai_api_version": "2024-02-01", 
    "embedding_deployment": "text-embedding-ada-002",
    "llm_deployment": "gpt-4-turbo",
    "use_pinecone": False  # Używamy FAISS dla prostoty
}

# Przykładowe źródła danych
document_sources = [
    {
        "type": "pdf",
        "path": "company_handbook.pdf"
    },
    {
        "type": "text", 
        "path": "hr_policies.txt"
    },
    {
        "type": "web",
        "path": "https://company-wiki.example.com/procedures"
    }
]

Krok 2: Implementacja RAG (45 min)

async def main():
    # Inicjalizacja systemu
    rag_system = ProductionRAGSystem(workshop_config)
    
    print("📚 Ingesting company documents...")
    stats = await rag_system.ingest_documents(document_sources)
    print(f"✅ Processed {stats['total_chunks']} document chunks")
    
    # Testowanie wyszukiwania
    test_queries = [
        "What is our vacation policy?",
        "How do I submit expenses?", 
        "What are the security guidelines for remote work?",
        "Who should I contact for IT support?"
    ]
    
    print("\n🔍 Testing RAG system...")
    for query in test_queries:
        result = await rag_system.intelligent_search(query)
        print(f"\nQ: {query}")
        print(f"A: {result['answer'][:200]}...")
        print(f"Confidence: {result['confidence']}")
        print(f"Sources: {len(result['sources'])}")

if __name__ == "__main__":
    asyncio.run(main())

Krok 3: Asystent z pamięcią (30 min)

# Konfiguracja asystenta
assistant_config = {
    "persona": """
Jesteś profesjonalnym asystentem HR w firmie technologicznej.
Pomagasz pracownikom w sprawach związanych z politykami firmy,
procedurami i codziennymi pytaniami organizacyjnymi.
""" 
}

assistant = IntelligentAssistant(rag_system, assistant_config)

# Simulacja konwersacji
conversation_test = [
    "What's our policy on working from home?",
    "Can you give me more details about the equipment allowance?",
    "How do I apply for this benefit?",
    "Thank you, that's very helpful!"
]

print("\n💬 Testing conversational assistant...")
session_id = "employee_123"

for user_message in conversation_test:
    response = await assistant.process_user_message(user_message, session_id)
    
    print(f"\n👤 User: {user_message}")
    print(f"🤖 Assistant: {response['assistant_response']}")
    print(f"📊 Confidence: {response['confidence']}")

Krok 4: Interface użytkownika (15 min)

Prosty Streamlit interface:

import streamlit as st

def create_assistant_interface():
    st.title("🤖 Company Knowledge Assistant")
    st.write("Ask me anything about company policies and procedures!")
    
    # Initialize session state
    if "conversation_history" not in st.session_state:
        st.session_state.conversation_history = []
    
    # Chat interface
    user_input = st.text_input("Your question:", key="user_input")
    
    if st.button("Ask Assistant"):
        if user_input:
            # Process with assistant
            with st.spinner("Thinking..."):
                response = await assistant.process_user_message(
                    user_input, 
                    session_id="streamlit_user"
                )
            
            # Display response
            st.session_state.conversation_history.append({
                "user": user_input,
                "assistant": response["assistant_response"],
                "confidence": response["confidence"]
            })
    
    # Show conversation history
    for exchange in st.session_state.conversation_history:
        st.write(f"**You:** {exchange['user']}")
        st.write(f"**Assistant:** {exchange['assistant']}")
        st.write(f"*Confidence: {exchange['confidence']}*")
        st.write("---")

if __name__ == "__main__":
    create_assistant_interface()

🎯 Zadania warsztatowe

Zadanie główne: Kompletny RAG System (90 min)

Implementacja:

  1. Document Processing (30 min) - ingestion różnych formatów
  2. Search Optimization (30 min) - tuning retrieval parameters
  3. Assistant Development (30 min) - conversational capabilities

Zadania dodatkowe

Zadanie 1: Advanced Retrieval (20 min)

  • Implementacja hybrid search (semantic + keyword)
  • Reranking wyników wyszukiwania
  • Multi-query expansion

Zadanie 2: Response Quality (20 min)

  • Fact-checking mechanizm
  • Citation generation
  • Confidence scoring improvement

Zadanie 3: Memory Management (20 min)

  • Persistent conversation memory
  • Context compression dla długich rozmów
  • User personalization

📊 Kryteria oceny

Funkcjonalność (50 punktów)

  • RAG system działa z wieloma typami dokumentów (15 pkt)
  • Asystent ma pamięć konwersacji (15 pkt)
  • Search quality i relevance (20 pkt)

Jakość techniczna (30 punktów)

  • Code quality i dokumentacja (10 pkt)
  • Error handling i monitoring (10 pkt)
  • Performance optimization (10 pkt)

Dodatkowe funkcje (20 punktów)

  • Advanced search features (5 pkt)
  • UI/UX implementation (5 pkt)
  • Quality assurance mechanisms (10 pkt)

🏆 Rezultat warsztatów

Po ukończeniu uczestnicy będą mieli:

  1. Działający RAG system - production-ready implementation
  2. Intelligent Assistant - z conversation memory
  3. Practical Experience - hands-on z najnowszymi technikami
  4. Reusable Framework - kod do wykorzystania w projektach

📚 Zasoby dodatkowe

💡 Wskazówka

Każda sesja to 2 godziny intensywnej nauki z praktycznymi ćwiczeniami. Materiały można przeglądać w dowolnym tempie.

📈 Postęp

Śledź swój postęp w nauce AI i przygotowaniu do certyfikacji Azure AI-102. Każdy moduł buduje na poprzednim.