intermediate
RAG (Retrieval-Augmented Generation)

Project: Document Q&A System

Build a production-ready RAG application with PDF upload, question answering, source citations, and a web interface. Complete implementation included.

45 min read· RAG· Project· PDF· Q&A

It's time to build a real, production-ready RAG application! In this project, you'll create a document Q&A system that lets users upload PDFs, ask questions, and get accurate answers with source citations.

This is the kind of system companies use for:

  • Internal knowledge bases
  • Customer support chatbots
  • Legal document analysis
  • Research paper Q&A

Project Overview

What We're Building

A web application with:

  • PDF Upload: Users can upload multiple PDF documents
  • Intelligent Chunking: Automatic document processing
  • Semantic Search: Find relevant information quickly
  • AI Answers: Get accurate answers with GPT-4
  • Source Citations: Every answer includes sources
  • Chat History: Track conversation context
  • Web Interface: Beautiful, user-friendly UI

Semantic Search Definition: Search based on the meaning and context of queries rather than exact keyword matching, using embeddings to find conceptually similar content even when different words are used.
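
To make the definition concrete, here is a minimal sketch of ranking texts by cosine similarity. The three-dimensional vectors are made-up stand-ins for real embeddings (which have hundreds or thousands of dimensions), and the names `toy_embeddings` and `rank_by_similarity` are hypothetical, not part of the project code.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


# Hypothetical toy "embeddings": similar meanings point in similar directions
toy_embeddings = {
    "Refund policy": [0.8, 0.2, 0.1],
    "Office opening hours": [0.0, 0.2, 0.9],
}


def rank_by_similarity(query_vector, embeddings):
    """Return the texts ordered from most to least similar to the query."""
    scored = [
        (cosine_similarity(query_vector, vector), text)
        for text, vector in embeddings.items()
    ]
    return [text for _, text in sorted(scored, reverse=True)]


# Made-up vector for the query "How do I get my money back?"
query_vector = [0.9, 0.1, 0.0]
ranking = rank_by_similarity(query_vector, toy_embeddings)
print(ranking)  # → ['Refund policy', 'Office opening hours']
```

The query shares no words with "Refund policy", yet it ranks first because the vectors point in nearly the same direction. That is the behavior the vector store provides at scale.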

Tech Stack

txt
# Core RAG
- LangChain (orchestration)
- OpenAI (embeddings + LLM)
- ChromaDB (vector store)

# Web Interface
- Streamlit (UI framework)
- Python 3.8+

# Document Processing
- PyPDF (PDF parsing)
- tiktoken (token counting)

Project Structure

document-qa-system/
├── app.py                 # Main Streamlit app
├── rag_system.py          # Core RAG logic
├── utils.py               # Helper functions
├── requirements.txt       # Dependencies
├── .env                   # API keys (don't commit!)
├── data/                  # Uploaded documents
│   └── uploads/
└── chroma_db/            # Vector database (auto-created)

Complete Implementation

1. requirements.txt

txt
langchain==0.1.0
langchain-openai==0.0.2
langchain-community==0.0.10
chromadb==0.4.22
streamlit==1.29.0
pypdf==3.17.4
tiktoken==0.5.2
python-dotenv==1.0.0
openai==1.7.2

Install dependencies:

bash
pip install -r requirements.txt

2. .env (Environment Variables)

bash
# .env file (create this, don't commit to git!)
OPENAI_API_KEY=your-openai-api-key-here

3. utils.py (Helper Functions)

python
"""
Utility functions for document processing and validation
"""

import os
import hashlib
from typing import List
import tiktoken
from langchain.schema import Document


# File hashing: a hash (MD5 here) is a fixed-length fingerprint of a file's
# content. Comparing hashes detects duplicate uploads without comparing
# entire file contents.

def get_file_hash(file_bytes: bytes) -> str:
    """
    Generate hash of file content for duplicate detection

    Args:
        file_bytes: File content as bytes

    Returns:
        MD5 hash string
    """
    return hashlib.md5(file_bytes).hexdigest()


def count_tokens(text: str, model: str = "gpt-4") -> int:
    """
    Count tokens in text using tiktoken

    Args:
        text: Input text
        model: Model name for encoding

    Returns:
        Number of tokens
    """
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        encoding = tiktoken.get_encoding("cl100k_base")

    return len(encoding.encode(text))


def estimate_cost(num_tokens: int, model: str = "gpt-4") -> float:
    """
    Estimate API cost based on token count

    Args:
        num_tokens: Number of tokens
        model: Model name

    Returns:
        Estimated cost in USD
    """
    # Pricing as of 2024 (check latest prices)
    prices = {
        "gpt-4": {"input": 0.03, "output": 0.06},  # per 1K tokens
        "gpt-3.5-turbo": {"input": 0.0015, "output": 0.002},
        "text-embedding-3-small": {"input": 0.00002, "output": 0},
    }

    if model not in prices:
        return 0.0

    # Rough estimate (assuming equal input/output)
    avg_price = (prices[model]["input"] + prices[model]["output"]) / 2
    return (num_tokens / 1000) * avg_price


def format_documents(docs: List[Document]) -> str:
    """
    Format documents for display

    Args:
        docs: List of Document objects

    Returns:
        Formatted string
    """
    formatted = []
    for i, doc in enumerate(docs, 1):
        source = doc.metadata.get("source", "Unknown")
        page = doc.metadata.get("page", "N/A")
        formatted.append(f"{i}. **{source}** (Page {page})")
        formatted.append(f"   {doc.page_content[:200]}...")
        formatted.append("")

    return "\n".join(formatted)


def validate_api_key(api_key: str) -> bool:
    """
    Validate OpenAI API key format

    Args:
        api_key: API key to validate

    Returns:
        True if valid format
    """
    return api_key.startswith("sk-") and len(api_key) > 20


def create_directory_if_not_exists(directory: str) -> None:
    """Create directory (and any missing parents) if it doesn't exist"""
    os.makedirs(directory, exist_ok=True)
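
The hashing helper can be tried on its own, without an API key. The sketch below repeats `get_file_hash` so that it is self-contained; `seen_hashes` and `is_duplicate` are hypothetical names that mirror what the app later does with `st.session_state.uploaded_files`.

```python
import hashlib


def get_file_hash(file_bytes: bytes) -> str:
    """Same helper as in utils.py, repeated so the sketch runs on its own."""
    return hashlib.md5(file_bytes).hexdigest()


# Hypothetical in-memory registry of content hashes seen so far
seen_hashes = set()


def is_duplicate(file_bytes: bytes) -> bool:
    """Return True if identical content was seen before; otherwise remember it."""
    digest = get_file_hash(file_bytes)
    if digest in seen_hashes:
        return True
    seen_hashes.add(digest)
    return False


first = is_duplicate(b"%PDF-1.4 report")
second = is_duplicate(b"%PDF-1.4 report")      # same bytes, uploaded again
third = is_duplicate(b"%PDF-1.4 report v2")    # different content
print(first, second, third)  # → False True False
```

The check keys on content rather than filename, so a renamed copy of the same PDF is still caught. MD5 is adequate for spotting accidental duplicates but is not collision-resistant against deliberate attacks; `hashlib.sha256` is a drop-in alternative if that matters for your deployment.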

4. rag_system.py (Core RAG Logic)

python
"""
RAG System for Document Q&A
"""

import os
from typing import List, Dict, Optional
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain_community.document_loaders import PyPDFLoader
from langchain.prompts import PromptTemplate
from langchain.schema import Document
import tempfile


class DocumentQASystem:
    """
    Production-ready RAG system for document Q&A
    """

    def __init__(
        self,
        api_key: str,
        persist_directory: str = "./chroma_db",
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
        model_name: str = "gpt-4"
    ):
        """
        Initialize the RAG system

        Args:
            api_key: OpenAI API key
            persist_directory: Directory for vector store
            chunk_size: Size of text chunks
            chunk_overlap: Overlap between chunks
            model_name: LLM model to use
        """
        os.environ["OPENAI_API_KEY"] = api_key

        self.embeddings = OpenAIEmbeddings(
            model="text-embedding-3-small"
        )

        # RecursiveCharacterTextSplitter tries each separator in order
        # (paragraphs, lines, words, then single characters), so chunks
        # follow the document's structure while staying within chunk_size.

        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=["\n\n", "\n", " ", ""]
        )

        self.persist_directory = persist_directory
        self.model_name = model_name
        self.vectorstore = None
        self.qa_chain = None
        self.documents = []

    def load_pdf(self, file_path: str) -> List[Document]:
        """
        Load a PDF file

        Args:
            file_path: Path to PDF file

        Returns:
            List of Document objects
        """
        loader = PyPDFLoader(file_path)
        documents = loader.load()
        return documents

    def load_pdfs_from_bytes(self, files_data: List[tuple]) -> List[Document]:
        """
        Load PDFs from uploaded file bytes

        Args:
            files_data: List of (filename, file_bytes) tuples

        Returns:
            List of Document objects
        """
        all_documents = []

        # Temporary file: PyPDFLoader reads from a file path, so each upload
        # is written to a short-lived file and deleted again in the finally
        # block to avoid leaving uploaded content on disk.

        for filename, file_bytes in files_data:
            # Create temporary file
            with tempfile.NamedTemporaryFile(
                delete=False,
                suffix=".pdf"
            ) as tmp_file:
                tmp_file.write(file_bytes)
                tmp_path = tmp_file.name

            try:
                # Load PDF
                documents = self.load_pdf(tmp_path)

                # Update metadata with filename
                for doc in documents:
                    doc.metadata["source"] = filename

                all_documents.extend(documents)

            finally:
                # Clean up temp file
                os.unlink(tmp_path)

        return all_documents

    def process_documents(self, documents: List[Document]) -> List[Document]:
        """
        Split documents into chunks

        Args:
            documents: List of Document objects

        Returns:
            List of chunked Document objects
        """
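        chunks = self.text_splitter.split_documents(documents)
        # Accumulate across uploads so get_stats() counts every chunk,
        # not only those from the most recent batch
        self.documents.extend(chunks)
        return chunks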

    def create_vectorstore(self, chunks: List[Document]):
        """
        Create vector store from chunks

        Args:
            chunks: List of Document chunks
        """
        self.vectorstore = Chroma.from_documents(
            documents=chunks,
            embedding=self.embeddings,
            persist_directory=self.persist_directory
        )

    def load_vectorstore(self):
        """Load existing vector store"""
        if os.path.exists(self.persist_directory):
            self.vectorstore = Chroma(
                persist_directory=self.persist_directory,
                embedding_function=self.embeddings
            )
            return True
        return False

    def setup_qa_chain(
        self,
        temperature: float = 0,
        k: int = 4,
        search_type: str = "similarity"
    ):
        """
        Setup the QA chain

        Args:
            temperature: LLM temperature (0 = deterministic)
            k: Number of documents to retrieve
            search_type: "similarity" or "mmr"
        """
        if self.vectorstore is None:
            raise ValueError("Vector store not initialized")

        # Custom prompt for better answers
        prompt_template = """You are a helpful AI assistant that answers questions based on the provided context.

Use the following pieces of context to answer the question at the end. If you don't know the answer based on the context provided, say "I don't have enough information to answer that question based on the provided documents."

Always cite your sources by mentioning the document name and page number when possible.

Context:
{context}

Question: {question}

Detailed Answer:"""

        PROMPT = PromptTemplate(
            template=prompt_template,
            input_variables=["context", "question"]
        )

        llm = ChatOpenAI(
            model=self.model_name,
            temperature=temperature
        )

        retriever = self.vectorstore.as_retriever(
            search_type=search_type,
            search_kwargs={"k": k}
        )

        self.qa_chain = RetrievalQA.from_chain_type(
            llm=llm,
            chain_type="stuff",
            retriever=retriever,
            return_source_documents=True,
            chain_type_kwargs={"prompt": PROMPT}
        )

    def query(self, question: str) -> Dict:
        """
        Ask a question

        Args:
            question: User question

        Returns:
            Dict with answer and sources
        """
        if self.qa_chain is None:
            raise ValueError("QA chain not setup")

        result = self.qa_chain.invoke({"query": question})

        return {
            "answer": result["result"],
            "source_documents": result["source_documents"]
        }

    def ingest_documents(
        self,
        files_data: List[tuple],
        progress_callback=None
    ) -> Dict:
        """
        Complete ingestion pipeline

        Args:
            files_data: List of (filename, file_bytes) tuples
            progress_callback: Optional callback for progress updates

        Returns:
            Dict with ingestion statistics
        """
        if progress_callback:
            progress_callback("Loading PDFs...")

        # Load documents
        documents = self.load_pdfs_from_bytes(files_data)

        if progress_callback:
            progress_callback(f"Loaded {len(documents)} pages")

        # Split into chunks
        if progress_callback:
            progress_callback("Splitting into chunks...")

        chunks = self.process_documents(documents)

        if progress_callback:
            progress_callback(f"Created {len(chunks)} chunks")

        # Create vector store
        if progress_callback:
            progress_callback("Creating embeddings and vector store...")

        self.create_vectorstore(chunks)

        if progress_callback:
            progress_callback("Setting up QA chain...")

        # Setup QA chain
        self.setup_qa_chain()

        if progress_callback:
            progress_callback("Done!")

        return {
            "num_documents": len(documents),
            "num_chunks": len(chunks),
            "files_processed": [f[0] for f in files_data]
        }

    def get_stats(self) -> Dict:
        """Get system statistics"""
        return {
            "num_chunks": len(self.documents) if self.documents else 0,
            "vectorstore_count": (
                self.vectorstore._collection.count()
                if self.vectorstore else 0
            ),
            "model": self.model_name
        }
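
To see what `chunk_size` and `chunk_overlap` mean in practice, here is a deliberately simplified sketch. It uses a fixed-size sliding window rather than the separator-aware logic of `RecursiveCharacterTextSplitter`, and the function name `sliding_window_chunks` is hypothetical. Sizes are measured in characters, as they are in the splitter's default configuration.

```python
def sliding_window_chunks(text: str, chunk_size: int, chunk_overlap: int) -> list:
    """Split text into fixed-size chunks where neighbours share chunk_overlap characters."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")

    step = chunk_size - chunk_overlap  # how far the window advances each time
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the window already reached the end of the text
    return chunks


chunks = sliding_window_chunks("abcdefghijklmnopqrstuvwxyz", chunk_size=10, chunk_overlap=4)
print(chunks)  # → ['abcdefghij', 'ghijklmnop', 'mnopqrstuv', 'stuvwxyz']
```

Each chunk starts with the last four characters of the previous one. With the project defaults (1000 and 200), a sentence that straddles a chunk boundary is therefore likely to appear whole in at least one chunk.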

5. app.py (Streamlit Web Interface)

python
"""
Streamlit Web Interface for Document Q&A System
"""

import streamlit as st
import os
from dotenv import load_dotenv
from rag_system import DocumentQASystem
from utils import (
    get_file_hash,
    count_tokens,
    estimate_cost,
    validate_api_key,
    create_directory_if_not_exists
)

# Load environment variables
load_dotenv()

# Page config
st.set_page_config(
    page_title="Document Q&A System",
    page_icon="📚",
    layout="wide"
)

# Custom CSS
st.markdown("""
<style>
    .main-header {
        font-size: 2.5rem;
        font-weight: bold;
        margin-bottom: 1rem;
    }
    .sub-header {
        font-size: 1.2rem;
        color: #666;
        margin-bottom: 2rem;
    }
    .source-box {
        background-color: #f0f2f6;
        padding: 1rem;
        border-radius: 0.5rem;
        margin: 0.5rem 0;
    }
    .stat-box {
        background-color: #e8f4f8;
        padding: 1rem;
        border-radius: 0.5rem;
        text-align: center;
    }
</style>
""", unsafe_allow_html=True)

# Initialize session state
# Session state: Streamlit reruns this script on every interaction, and
# st.session_state keeps values (RAG system, chat history, uploaded file
# hashes) across those reruns without any database storage.

if "rag_system" not in st.session_state:
    st.session_state.rag_system = None
if "chat_history" not in st.session_state:
    st.session_state.chat_history = []
if "uploaded_files" not in st.session_state:
    st.session_state.uploaded_files = set()


def initialize_rag_system(api_key: str):
    """Initialize RAG system"""
    create_directory_if_not_exists("./chroma_db")
    create_directory_if_not_exists("./data/uploads")

    rag = DocumentQASystem(
        api_key=api_key,
        persist_directory="./chroma_db"
    )

    return rag


def main():
    """Main application"""

    # Header
    st.markdown('<div class="main-header">📚 Document Q&A System</div>', unsafe_allow_html=True)
    st.markdown(
        '<div class="sub-header">Upload documents, ask questions, get AI-powered answers with sources</div>',
        unsafe_allow_html=True
    )

    # Sidebar
    with st.sidebar:
        st.header("⚙️ Configuration")

        # API Key
        api_key = st.text_input(
            "OpenAI API Key",
            type="password",
            value=os.getenv("OPENAI_API_KEY", ""),
            help="Enter your OpenAI API key"
        )

        if api_key and validate_api_key(api_key):
            st.success("✅ API Key Valid")

            # Initialize RAG system
            if st.session_state.rag_system is None:
                st.session_state.rag_system = initialize_rag_system(api_key)

        else:
            st.warning("⚠️ Please enter a valid API key")
            st.stop()

        st.divider()

        # Settings
        st.header("🎛️ Settings")

        model = st.selectbox(
            "Model",
            ["gpt-4", "gpt-3.5-turbo"],
            help="Choose the LLM model"
        )

        k = st.slider(
            "Documents to Retrieve",
            min_value=1,
            max_value=10,
            value=4,
            help="Number of relevant chunks to retrieve"
        )

        search_type = st.selectbox(
            "Search Type",
            ["similarity", "mmr"],
            help="Similarity or MMR (diverse results)"
        )

        st.divider()

        # Stats
        if st.session_state.rag_system:
            stats = st.session_state.rag_system.get_stats()
            st.header("📊 Statistics")
            st.metric("Documents Processed", len(st.session_state.uploaded_files))
            st.metric("Total Chunks", stats["num_chunks"])
            st.metric("Model", stats["model"])

    # Main area - Tabs
    tab1, tab2, tab3 = st.tabs(["📤 Upload Documents", "💬 Ask Questions", "📜 Chat History"])

    # TAB 1: Upload Documents
    with tab1:
        st.header("Upload PDF Documents")

        uploaded_files = st.file_uploader(
            "Choose PDF files",
            type=["pdf"],
            accept_multiple_files=True,
            help="Upload one or more PDF documents"
        )

        if uploaded_files:
            st.write(f"**{len(uploaded_files)} file(s) selected:**")

            files_data = []
            total_size = 0

            for uploaded_file in uploaded_files:
                # getvalue() returns the full content regardless of read position
                file_bytes = uploaded_file.getvalue()
                file_hash = get_file_hash(file_bytes)

                # Check for duplicates
                if file_hash in st.session_state.uploaded_files:
                    st.warning(f"⚠️ {uploaded_file.name} already uploaded (duplicate)")
                    continue

                files_data.append((uploaded_file.name, file_bytes))
                total_size += len(file_bytes)

                st.write(f"✅ {uploaded_file.name} ({len(file_bytes) / 1024:.1f} KB)")

            st.write(f"**Total size:** {total_size / 1024:.1f} KB")

            if st.button("🚀 Process Documents", type="primary"):
                if not files_data:
                    st.error("No new files to process")
                else:
                    # Progress bar
                    progress_bar = st.progress(0)
                    status_text = st.empty()

                    def progress_callback(message):
                        status_text.text(message)

                    try:
                        # Ingest documents
                        with st.spinner("Processing documents..."):
                            result = st.session_state.rag_system.ingest_documents(
                                files_data,
                                progress_callback=progress_callback
                            )

                        # Update uploaded files
                        for filename, file_bytes in files_data:
                            file_hash = get_file_hash(file_bytes)
                            st.session_state.uploaded_files.add(file_hash)

                        progress_bar.progress(100)

                        st.success(f"""
                        ✅ **Successfully processed!**
                        - Documents: {result['num_documents']} pages
                        - Chunks: {result['num_chunks']}
                        - Files: {', '.join(result['files_processed'])}
                        """)

                    except Exception as e:
                        st.error(f"❌ Error processing documents: {str(e)}")

    # TAB 2: Ask Questions
    with tab2:
        st.header("Ask Questions About Your Documents")

        if not st.session_state.rag_system or not st.session_state.uploaded_files:
            st.info("👆 Please upload documents first in the 'Upload Documents' tab")
        else:
            # Question input (keyed so the example buttons below can fill it in)
            question = st.text_input(
                "Your Question:",
                key="question_input",
                placeholder="e.g., What is the refund policy?",
                help="Ask any question about your uploaded documents"
            )

            if st.button("🔍 Get Answer", type="primary") and question:
                with st.spinner("Searching documents and generating answer..."):
                    try:
                        # Update RAG settings
                        st.session_state.rag_system.model_name = model
                        st.session_state.rag_system.setup_qa_chain(
                            k=k,
                            search_type=search_type
                        )

                        # Get answer
                        result = st.session_state.rag_system.query(question)

                        # Display answer
                        st.markdown("### 💡 Answer")
                        st.write(result["answer"])

                        # Display sources
                        st.markdown("### 📚 Sources")
                        for i, doc in enumerate(result["source_documents"], 1):
                            with st.expander(
                                f"Source {i}: {doc.metadata.get('source', 'Unknown')} "
                                f"(Page {doc.metadata.get('page', 'N/A')})"
                            ):
                                st.write(doc.page_content)

                        # Estimate cost
                        total_tokens = count_tokens(
                            question + result["answer"] +
                            " ".join([d.page_content for d in result["source_documents"]])
                        )
                        cost = estimate_cost(total_tokens, model)

                        st.caption(f"Estimated cost: ${cost:.4f} | Tokens: {total_tokens:,}")

                        # Add to history
                        st.session_state.chat_history.append({
                            "question": question,
                            "answer": result["answer"],
                            "sources": result["source_documents"]
                        })

                    except Exception as e:
                        st.error(f"❌ Error: {str(e)}")

            # Quick examples
            st.markdown("### 💡 Example Questions")
            col1, col2, col3 = st.columns(3)

            def set_example_question(text: str):
                """Button callback: runs before the rerun, so it can fill the question box"""
                st.session_state.question_input = text

            with col1:
                st.button(
                    "What are the main topics?",
                    on_click=set_example_question,
                    args=("What are the main topics discussed in these documents?",)
                )

            with col2:
                st.button(
                    "Summarize key points",
                    on_click=set_example_question,
                    args=("Can you summarize the key points from these documents?",)
                )

            with col3:
                st.button(
                    "What is mentioned about...",
                    on_click=set_example_question,
                    args=("What information is provided about [specific topic]?",)
                )

    # TAB 3: Chat History
    with tab3:
        st.header("Chat History")

        if not st.session_state.chat_history:
            st.info("No questions asked yet. Go to 'Ask Questions' tab to start!")
        else:
            for i, item in enumerate(reversed(st.session_state.chat_history), 1):
                with st.expander(f"Q{len(st.session_state.chat_history) - i + 1}: {item['question']}"):
                    st.markdown("**Answer:**")
                    st.write(item["answer"])

                    st.markdown("**Sources:**")
                    for j, doc in enumerate(item["sources"], 1):
                        st.caption(
                            f"{j}. {doc.metadata.get('source', 'Unknown')} "
                            f"(Page {doc.metadata.get('page', 'N/A')})"
                        )

            if st.button("🗑️ Clear History"):
                st.session_state.chat_history = []
                st.rerun()


if __name__ == "__main__":
    main()
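
The "Search Type" setting offers `mmr` (maximal marginal relevance) as an alternative to plain similarity. The sketch below is an independent illustration of the idea, not LangChain's implementation: it repeatedly picks the candidate that is relevant to the query but least redundant with what has already been picked. The toy two-dimensional vectors and the function names are assumptions made for the demo.

```python
import math


def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))


def mmr_select(query, candidates, k, lambda_mult=0.5):
    """Return indices of k candidates chosen by maximal marginal relevance."""
    selected = []
    remaining = list(range(len(candidates)))

    def score(i):
        relevance = cosine(query, candidates[i])
        # Similarity to the closest already-selected item (0 if none selected yet)
        redundancy = max(
            (cosine(candidates[i], candidates[j]) for j in selected), default=0.0
        )
        return lambda_mult * relevance - (1 - lambda_mult) * redundancy

    while remaining and len(selected) < k:
        best = max(remaining, key=score)
        selected.append(best)
        remaining.remove(best)
    return selected


query = [1.0, 1.0]
candidates = [
    [1.0, 0.8],   # 0: most similar to the query
    [1.0, 0.75],  # 1: near-duplicate of candidate 0
    [0.7, 1.0],   # 2: slightly less similar, but covers a different direction
]

top_similarity = sorted(
    range(len(candidates)), key=lambda i: cosine(query, candidates[i]), reverse=True
)[:2]
top_mmr = mmr_select(query, candidates, k=2)
print(top_similarity, top_mmr)  # → [0, 1] [0, 2]
```

Plain similarity returns the two near-duplicates, while MMR swaps the second one for a chunk that adds new information. This can help when several uploaded PDFs contain overlapping passages.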

Running the Application

1. Setup

bash
# Create project directory
mkdir document-qa-system
cd document-qa-system

# Create virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install dependencies
pip install -r requirements.txt

# Create .env file
echo "OPENAI_API_KEY=your-api-key-here" > .env

2. Run the App

bash
streamlit run app.py

The app will open in your browser at http://localhost:8501

Usage Guide

Step 1: Enter API Key

  • In the sidebar, enter your OpenAI API key
  • Or set it in the .env file

Step 2: Upload Documents

  • Go to "Upload Documents" tab
  • Select one or more PDF files
  • Click "Process Documents"
  • Wait for processing to complete

Step 3: Ask Questions

  • Go to "Ask Questions" tab
  • Type your question
  • Click "Get Answer"
  • View answer and sources

Step 4: Review History

  • Go to "Chat History" tab
  • Review all previous questions and answers
  • Clear history if needed

Advanced Features

1. Document Filtering

python
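# Add to rag_system.py (as a method of DocumentQASystem)
def query_with_filter(self, question: str, source_filter: Optional[str] = None) -> Dict:
    """Query, optionally restricted to chunks from a single source file"""
    search_kwargs = {"k": 4}
    if source_filter:
        # Metadata filter: only retrieve chunks whose "source" matches
        search_kwargs["filter"] = {"source": source_filter}

    retriever = self.vectorstore.as_retriever(search_kwargs=search_kwargs)

    # Temporary QA chain around the filtered retriever
    # (uses the default prompt; pass chain_type_kwargs to reuse the custom one)
    chain = RetrievalQA.from_chain_type(
        llm=ChatOpenAI(model=self.model_name, temperature=0),
        chain_type="stuff",
        retriever=retriever,
        return_source_documents=True
    )
    result = chain.invoke({"query": question})

    return {
        "answer": result["result"],
        "source_documents": result["source_documents"]
    }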

2. Multi-language Support

python
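# Add language detection (requires: pip install langdetect)
from langdetect import detect
from langdetect.lang_detect_exception import LangDetectException

def detect_language(text: str) -> str:
    """Detect text language; fall back to English if detection fails"""
    try:
        return detect(text)
    except LangDetectException:
        # Raised for empty or feature-less input (e.g. only digits)
        return "en"

# Adjust prompt based on language (question is the user's input)
if detect_language(question) == "es":
    prompt_template = """Responde en español..."""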

3. Export Q&A to PDF

python
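# Add to app.py (requires: pip install reportlab)
from reportlab.lib.pagesizes import A4
from reportlab.lib.utils import simpleSplit
from reportlab.pdfgen import canvas

def export_qa_to_pdf(chat_history, filename="qa_export.pdf"):
    """Export chat history to PDF, wrapping long lines and adding pages as needed"""
    c = canvas.Canvas(filename, pagesize=A4)
    width, height = A4
    y = height - 50

    for item in chat_history:
        for text in (f"Q: {item['question']}", f"A: {item['answer']}"):
            # Wrap to the printable width (the canvas default font is Helvetica 12)
            for line in simpleSplit(text, "Helvetica", 12, width - 100):
                if y < 50:
                    # Current page is full: start a new one
                    c.showPage()
                    y = height - 50
                c.drawString(50, y, line)
                y -= 16
        y -= 24  # Gap between Q&A pairs

    c.save()

# Add button in UI
if st.button("📥 Export to PDF"):
    export_qa_to_pdf(st.session_state.chat_history)
    st.success("Exported to qa_export.pdf")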

Production Deployment

Deploy to Streamlit Cloud

  1. Push code to GitHub
  2. Go to share.streamlit.io
  3. Connect repository
  4. Add secrets (API keys) in dashboard
  5. Deploy!

Environment Variables

toml
# .streamlit/secrets.toml (for Streamlit Cloud)
OPENAI_API_KEY = "sk-..."

Docker Deployment

dockerfile
# Dockerfile
FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

EXPOSE 8501

CMD ["streamlit", "run", "app.py", "--server.port=8501", "--server.address=0.0.0.0"]

bash
# Build and run
docker build -t document-qa .
docker run -p 8501:8501 -e OPENAI_API_KEY=your-key document-qa

Best Practices

1. Error Handling

python
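import logging

logger = logging.getLogger(__name__)

try:
    result = rag_system.query(question)
except Exception as e:
    # Log the full traceback for debugging; show users a generic message
    logger.exception("Query failed: %s", e)
    st.error("Sorry, something went wrong. Please try again.")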

2. Rate Limiting

python
import time
from functools import wraps

def rate_limit(max_calls=10, time_window=60):
    """Limit API calls"""
    calls = []

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            now = time.time()
            calls[:] = [c for c in calls if c > now - time_window]

            if len(calls) >= max_calls:
                raise Exception("Rate limit exceeded")

            calls.append(now)
            return func(*args, **kwargs)
        return wrapper
    return decorator
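
To watch the limiter in action without calling any API, the sketch below repeats the decorator in compact form and applies it to a hypothetical stand-in function, `fake_query`. The limit of 3 calls per 60 seconds is chosen only for the demo, and `RuntimeError` is used here instead of a bare `Exception` so the caller can catch it specifically.

```python
import time
from functools import wraps


def rate_limit(max_calls=10, time_window=60):
    """Allow at most max_calls calls per time_window seconds."""
    calls = []  # timestamps of recent calls, shared by every caller of the function

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            now = time.time()
            # Forget calls that have fallen out of the window
            calls[:] = [c for c in calls if c > now - time_window]
            if len(calls) >= max_calls:
                raise RuntimeError("Rate limit exceeded")
            calls.append(now)
            return func(*args, **kwargs)
        return wrapper
    return decorator


@rate_limit(max_calls=3, time_window=60)
def fake_query(question: str) -> str:
    """Hypothetical stand-in for rag_system.query()."""
    return f"answer to: {question}"


results = []
for i in range(5):
    try:
        results.append(fake_query(f"q{i}"))
    except RuntimeError as exc:
        results.append(str(exc))

print(results)
```

The first three calls succeed and the last two are rejected. Note that the `calls` list lives at module level inside the closure, so in a Streamlit app the limit is shared across all user sessions served by the same process. For per-user limits, one option is to keep the timestamps in `st.session_state` instead.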

3. Caching

python
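@st.cache_data(ttl=3600)
def cached_query(question: str):
    """Cache answers for 1 hour, keyed by the question text"""
    # The cache is shared by all sessions and keyed only by the arguments:
    # call cached_query.clear() after ingesting new documents.
    return st.session_state.rag_system.query(question)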

Summary

You've built a production-ready document Q&A system with:

  1. PDF Upload: Multi-file support with duplicate detection
  2. Smart Processing: Intelligent chunking and embedding
  3. Semantic Search: Vector-based retrieval
  4. AI Answers: GPT-4 powered responses
  5. Source Citations: Transparent, verifiable answers
  6. Web Interface: Beautiful, user-friendly UI
  7. Chat History: Track conversations

This project demonstrates all core RAG concepts in a real application that you can deploy and use today!