Project: Document Q&A System
It's time to build a real, production-ready RAG application! In this project, you'll create a document Q&A system that lets users upload PDFs, ask questions, and get accurate answers with source citations.
This is the kind of system companies use for:
- Internal knowledge bases
- Customer support chatbots
- Legal document analysis
- Research paper Q&A
Project Overview
What We're Building
A web application with:
- PDF Upload: Users can upload multiple PDF documents
- Intelligent Chunking: Automatic document processing
- Semantic Search: Find relevant information quickly
- AI Answers: Get accurate answers with GPT-4
- Source Citations: Every answer includes sources
- Chat History: Track conversation context
- Web Interface: Beautiful, user-friendly UI
<Callout type="info">
**Semantic Search Definition:** Search based on the meaning and context of queries rather than exact keyword matching, using embeddings to find conceptually similar content even when different words are used.
</Callout>
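A minimal sketch of the idea: embed a query and two candidate sentences, then compare them with cosine similarity. The sentences are illustrative, and numpy is assumed to be available (it ships as a dependency of chromadb).
from langchain_openai import OpenAIEmbeddings
import numpy as np

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

def cosine_similarity(a, b) -> float:
    """Cosine similarity between two embedding vectors"""
    a, b = np.array(a), np.array(b)
    return float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))

query = embeddings.embed_query("How do I get my money back?")
refund = embeddings.embed_query("Refunds are issued within 30 days of purchase.")
weather = embeddings.embed_query("Tomorrow will be sunny with light winds.")

# The refund sentence scores higher even though it shares no keywords with the query
print(cosine_similarity(query, refund))
print(cosine_similarity(query, weather))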
Tech Stack
# Core RAG
- LangChain (orchestration)
- OpenAI (embeddings + LLM)
- ChromaDB (vector store)
# Web Interface
- Streamlit (UI framework)
- Python 3.8+
# Document Processing
- PyPDF (PDF parsing)
- tiktoken (token counting)
Project Structure
document-qa-system/
├── app.py # Main Streamlit app
├── rag_system.py # Core RAG logic
├── utils.py # Helper functions
├── requirements.txt # Dependencies
├── .env # API keys (don't commit!)
├── data/ # Uploaded documents
│ └── uploads/
└── chroma_db/ # Vector database (auto-created)
Complete Implementation
1. requirements.txt
langchain==0.1.0
langchain-openai==0.0.2
langchain-community==0.0.10
chromadb==0.4.22
streamlit==1.29.0
pypdf==3.17.4
tiktoken==0.5.2
python-dotenv==1.0.0
openai==1.7.2
Install dependencies:
pip install -r requirements.txt
2. .env (Environment Variables)
# .env file (create this, don't commit to git!)
OPENAI_API_KEY=your-openai-api-key-here
3. utils.py (Helper Functions)
"""
Utility functions for document processing and validation
"""
import os
import hashlib
from typing import List
import tiktoken
from langchain.schema import Document
<Callout type="info">
**File Hashing Definition:** Creating a unique fingerprint (hash) of file content using algorithms like MD5, enabling duplicate detection by comparing hashes rather than entire file contents.
</Callout>
def get_file_hash(file_bytes: bytes) -> str:
"""
Generate hash of file content for duplicate detection
Args:
file_bytes: File content as bytes
Returns:
MD5 hash string
"""
return hashlib.md5(file_bytes).hexdigest()
def count_tokens(text: str, model: str = "gpt-4") -> int:
"""
Count tokens in text using tiktoken
Args:
text: Input text
model: Model name for encoding
Returns:
Number of tokens
"""
try:
encoding = tiktoken.encoding_for_model(model)
except KeyError:
encoding = tiktoken.get_encoding("cl100k_base")
return len(encoding.encode(text))
def estimate_cost(num_tokens: int, model: str = "gpt-4") -> float:
"""
Estimate API cost based on token count
Args:
num_tokens: Number of tokens
model: Model name
Returns:
Estimated cost in USD
"""
# Pricing as of 2024 (check latest prices)
prices = {
"gpt-4": {"input": 0.03, "output": 0.06}, # per 1K tokens
"gpt-3.5-turbo": {"input": 0.0015, "output": 0.002},
"text-embedding-3-small": {"input": 0.00002, "output": 0},
}
if model not in prices:
return 0.0
# Rough estimate (assuming equal input/output)
avg_price = (prices[model]["input"] + prices[model]["output"]) / 2
return (num_tokens / 1000) * avg_price
def format_documents(docs: List[Document]) -> str:
"""
Format documents for display
Args:
docs: List of Document objects
Returns:
Formatted string
"""
formatted = []
for i, doc in enumerate(docs, 1):
source = doc.metadata.get("source", "Unknown")
page = doc.metadata.get("page", "N/A")
formatted.append(f"{i}. **{source}** (Page {page})")
formatted.append(f" {doc.page_content[:200]}...")
formatted.append("")
return "\n".join(formatted)
def validate_api_key(api_key: str) -> bool:
"""
Validate OpenAI API key format
Args:
api_key: API key to validate
Returns:
True if valid format
"""
return api_key.startswith("sk-") and len(api_key) > 20
def create_directory_if_not_exists(directory: str):
"""Create directory if it doesn't exist"""
if not os.path.exists(directory):
os.makedirs(directory)
4. rag_system.py (Core RAG Logic)
"""
RAG System for Document Q&A
"""
import os
from typing import List, Dict, Optional
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain_community.document_loaders import PyPDFLoader
from langchain.prompts import PromptTemplate
from langchain.schema import Document
import tempfile
class DocumentQASystem:
"""
Production-ready RAG system for document Q&A
"""
def __init__(
self,
api_key: str,
persist_directory: str = "./chroma_db",
chunk_size: int = 1000,
chunk_overlap: int = 200,
model_name: str = "gpt-4"
):
"""
Initialize the RAG system
Args:
api_key: OpenAI API key
persist_directory: Directory for vector store
chunk_size: Size of text chunks
chunk_overlap: Overlap between chunks
model_name: LLM model to use
"""
os.environ["OPENAI_API_KEY"] = api_key
self.embeddings = OpenAIEmbeddings(
model="text-embedding-3-small"
)
<Callout type="info">
**RecursiveCharacterTextSplitter Definition:** A text splitter that intelligently divides documents by trying multiple separators (paragraphs, sentences, words) in order, preserving semantic structure while respecting size limits.
</Callout>
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", " ", ""]
)
self.persist_directory = persist_directory
self.model_name = model_name
self.vectorstore = None
self.qa_chain = None
self.documents = []
def load_pdf(self, file_path: str) -> List[Document]:
"""
Load a PDF file
Args:
file_path: Path to PDF file
Returns:
List of Document objects
"""
loader = PyPDFLoader(file_path)
documents = loader.load()
return documents
def load_pdfs_from_bytes(self, files_data: List[tuple]) -> List[Document]:
"""
Load PDFs from uploaded file bytes
Args:
files_data: List of (filename, file_bytes) tuples
Returns:
List of Document objects
"""
all_documents = []
<Callout type="info">
**Temporary File Definition:** A file created in a temporary location for short-term use during processing, automatically cleaned up after use to avoid disk space accumulation and security issues.
</Callout>
for filename, file_bytes in files_data:
# Create temporary file
with tempfile.NamedTemporaryFile(
delete=False,
suffix=".pdf"
) as tmp_file:
tmp_file.write(file_bytes)
tmp_path = tmp_file.name
try:
# Load PDF
documents = self.load_pdf(tmp_path)
# Update metadata with filename
for doc in documents:
doc.metadata["source"] = filename
all_documents.extend(documents)
finally:
# Clean up temp file
os.unlink(tmp_path)
return all_documents
def process_documents(self, documents: List[Document]) -> List[Document]:
"""
Split documents into chunks
Args:
documents: List of Document objects
Returns:
List of chunked Document objects
"""
chunks = self.text_splitter.split_documents(documents)
self.documents = chunks
return chunks
def create_vectorstore(self, chunks: List[Document]):
"""
Create vector store from chunks
Args:
chunks: List of Document chunks
"""
self.vectorstore = Chroma.from_documents(
documents=chunks,
embedding=self.embeddings,
persist_directory=self.persist_directory
)
def load_vectorstore(self):
"""Load existing vector store"""
if os.path.exists(self.persist_directory):
self.vectorstore = Chroma(
persist_directory=self.persist_directory,
embedding_function=self.embeddings
)
return True
return False
def setup_qa_chain(
self,
temperature: float = 0,
k: int = 4,
search_type: str = "similarity"
):
"""
Setup the QA chain
Args:
temperature: LLM temperature (0 = deterministic)
k: Number of documents to retrieve
search_type: "similarity" or "mmr"
"""
if self.vectorstore is None:
raise ValueError("Vector store not initialized")
# Custom prompt for better answers
prompt_template = """You are a helpful AI assistant that answers questions based on the provided context.
Use the following pieces of context to answer the question at the end. If you don't know the answer based on the context provided, say "I don't have enough information to answer that question based on the provided documents."
Always cite your sources by mentioning the document name and page number when possible.
Context:
{context}
Question: {question}
Detailed Answer:"""
PROMPT = PromptTemplate(
template=prompt_template,
input_variables=["context", "question"]
)
llm = ChatOpenAI(
model=self.model_name,
temperature=temperature
)
retriever = self.vectorstore.as_retriever(
search_type=search_type,
search_kwargs={"k": k}
)
self.qa_chain = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=retriever,
return_source_documents=True,
chain_type_kwargs={"prompt": PROMPT}
)
def query(self, question: str) -> Dict:
"""
Ask a question
Args:
question: User question
Returns:
Dict with answer and sources
"""
if self.qa_chain is None:
raise ValueError("QA chain not setup")
result = self.qa_chain.invoke({"query": question})
return {
"answer": result["result"],
"source_documents": result["source_documents"]
}
def ingest_documents(
self,
files_data: List[tuple],
progress_callback=None
) -> Dict:
"""
Complete ingestion pipeline
Args:
files_data: List of (filename, file_bytes) tuples
progress_callback: Optional callback for progress updates
Returns:
Dict with ingestion statistics
"""
if progress_callback:
progress_callback("Loading PDFs...")
# Load documents
documents = self.load_pdfs_from_bytes(files_data)
if progress_callback:
progress_callback(f"Loaded {len(documents)} pages")
# Split into chunks
if progress_callback:
progress_callback("Splitting into chunks...")
chunks = self.process_documents(documents)
if progress_callback:
progress_callback(f"Created {len(chunks)} chunks")
# Create vector store
if progress_callback:
progress_callback("Creating embeddings and vector store...")
self.create_vectorstore(chunks)
if progress_callback:
progress_callback("Setting up QA chain...")
# Setup QA chain
self.setup_qa_chain()
if progress_callback:
progress_callback("Done!")
return {
"num_documents": len(documents),
"num_chunks": len(chunks),
"files_processed": [f[0] for f in files_data]
}
def get_stats(self) -> Dict:
"""Get system statistics"""
return {
"num_chunks": len(self.documents) if self.documents else 0,
"vectorstore_count": (
self.vectorstore._collection.count()
if self.vectorstore else 0
),
"model": self.model_name
}
5. app.py (Streamlit Web Interface)
"""
Streamlit Web Interface for Document Q&A System
"""
import streamlit as st
import os
from dotenv import load_dotenv
from rag_system import DocumentQASystem
from utils import (
get_file_hash,
count_tokens,
estimate_cost,
validate_api_key,
create_directory_if_not_exists
)
# Load environment variables
load_dotenv()
# Page config
st.set_page_config(
page_title="Document Q&A System",
page_icon="📚",
layout="wide"
)
# Custom CSS
st.markdown("""
<style>
.main-header {
font-size: 2.5rem;
font-weight: bold;
margin-bottom: 1rem;
}
.sub-header {
font-size: 1.2rem;
color: #666;
margin-bottom: 2rem;
}
.source-box {
background-color: #f0f2f6;
padding: 1rem;
border-radius: 0.5rem;
margin: 0.5rem 0;
}
.stat-box {
background-color: #e8f4f8;
padding: 1rem;
border-radius: 0.5rem;
text-align: center;
}
</style>
""", unsafe_allow_html=True)
# Initialize session state
<Callout type="info">
**Session State Definition:** A mechanism in web frameworks like Streamlit to persist data across user interactions and page reruns, maintaining application state between requests without database storage.
</Callout>
if "rag_system" not in st.session_state:
st.session_state.rag_system = None
if "chat_history" not in st.session_state:
st.session_state.chat_history = []
if "uploaded_files" not in st.session_state:
st.session_state.uploaded_files = set()
def initialize_rag_system(api_key: str):
"""Initialize RAG system"""
create_directory_if_not_exists("./chroma_db")
create_directory_if_not_exists("./data/uploads")
rag = DocumentQASystem(
api_key=api_key,
persist_directory="./chroma_db"
)
return rag
def main():
"""Main application"""
# Header
st.markdown('<div class="main-header">📚 Document Q&A System</div>', unsafe_allow_html=True)
st.markdown(
'<div class="sub-header">Upload documents, ask questions, get AI-powered answers with sources</div>',
unsafe_allow_html=True
)
# Sidebar
with st.sidebar:
st.header("⚙️ Configuration")
# API Key
api_key = st.text_input(
"OpenAI API Key",
type="password",
value=os.getenv("OPENAI_API_KEY", ""),
help="Enter your OpenAI API key"
)
if api_key and validate_api_key(api_key):
st.success("✅ API Key Valid")
# Initialize RAG system
if st.session_state.rag_system is None:
st.session_state.rag_system = initialize_rag_system(api_key)
else:
st.warning("⚠️ Please enter a valid API key")
st.stop()
st.divider()
# Settings
st.header("🎛️ Settings")
model = st.selectbox(
"Model",
["gpt-4", "gpt-3.5-turbo"],
help="Choose the LLM model"
)
k = st.slider(
"Documents to Retrieve",
min_value=1,
max_value=10,
value=4,
help="Number of relevant chunks to retrieve"
)
search_type = st.selectbox(
"Search Type",
["similarity", "mmr"],
help="Similarity or MMR (diverse results)"
)
st.divider()
# Stats
if st.session_state.rag_system:
stats = st.session_state.rag_system.get_stats()
st.header("📊 Statistics")
st.metric("Documents Processed", len(st.session_state.uploaded_files))
st.metric("Total Chunks", stats["num_chunks"])
st.metric("Model", stats["model"])
# Main area - Tabs
tab1, tab2, tab3 = st.tabs(["📤 Upload Documents", "💬 Ask Questions", "📜 Chat History"])
# TAB 1: Upload Documents
with tab1:
st.header("Upload PDF Documents")
uploaded_files = st.file_uploader(
"Choose PDF files",
type=["pdf"],
accept_multiple_files=True,
help="Upload one or more PDF documents"
)
if uploaded_files:
st.write(f"**{len(uploaded_files)} file(s) selected:**")
files_data = []
total_size = 0
for uploaded_file in uploaded_files:
file_bytes = uploaded_file.read()
file_hash = get_file_hash(file_bytes)
# Check for duplicates
if file_hash in st.session_state.uploaded_files:
st.warning(f"⚠️ {uploaded_file.name} already uploaded (duplicate)")
continue
files_data.append((uploaded_file.name, file_bytes))
total_size += len(file_bytes)
st.write(f"✅ {uploaded_file.name} ({len(file_bytes) / 1024:.1f} KB)")
st.write(f"**Total size:** {total_size / 1024:.1f} KB")
if st.button("🚀 Process Documents", type="primary"):
if not files_data:
st.error("No new files to process")
else:
# Progress bar
progress_bar = st.progress(0)
status_text = st.empty()
def progress_callback(message):
status_text.text(message)
try:
# Ingest documents
with st.spinner("Processing documents..."):
result = st.session_state.rag_system.ingest_documents(
files_data,
progress_callback=progress_callback
)
# Update uploaded files
for filename, file_bytes in files_data:
file_hash = get_file_hash(file_bytes)
st.session_state.uploaded_files.add(file_hash)
progress_bar.progress(100)
st.success(f"""
✅ **Successfully processed!**
- Documents: {result['num_documents']} pages
- Chunks: {result['num_chunks']}
- Files: {', '.join(result['files_processed'])}
""")
except Exception as e:
st.error(f"❌ Error processing documents: {str(e)}")
# TAB 2: Ask Questions
with tab2:
st.header("Ask Questions About Your Documents")
if not st.session_state.rag_system or not st.session_state.uploaded_files:
st.info("👆 Please upload documents first in the 'Upload Documents' tab")
else:
# Question input
question = st.text_input(
    "Your Question:",
    value=st.session_state.get("example_question", ""),
    placeholder="e.g., What is the refund policy?",
    help="Ask any question about your uploaded documents"
)
if st.button("🔍 Get Answer", type="primary") and question:
with st.spinner("Searching documents and generating answer..."):
try:
# Update RAG settings
st.session_state.rag_system.model_name = model
st.session_state.rag_system.setup_qa_chain(
k=k,
search_type=search_type
)
# Get answer
result = st.session_state.rag_system.query(question)
# Display answer
st.markdown("### 💡 Answer")
st.write(result["answer"])
# Display sources
st.markdown("### 📚 Sources")
for i, doc in enumerate(result["source_documents"], 1):
with st.expander(
f"Source {i}: {doc.metadata.get('source', 'Unknown')} "
f"(Page {doc.metadata.get('page', 'N/A')})"
):
st.write(doc.page_content)
# Estimate cost
total_tokens = count_tokens(
question + result["answer"] +
" ".join([d.page_content for d in result["source_documents"]])
)
cost = estimate_cost(total_tokens, model)
st.caption(f"Estimated cost: ${cost:.4f} | Tokens: {total_tokens:,}")
# Add to history
st.session_state.chat_history.append({
"question": question,
"answer": result["answer"],
"sources": result["source_documents"]
})
except Exception as e:
st.error(f"❌ Error: {str(e)}")
# Quick examples (clicking a button pre-fills the question box above)
st.markdown("### 💡 Example Questions")
col1, col2, col3 = st.columns(3)
with col1:
    if st.button("What are the main topics?"):
        st.session_state.example_question = "What are the main topics discussed in these documents?"
        st.rerun()
with col2:
    if st.button("Summarize key points"):
        st.session_state.example_question = "Can you summarize the key points from these documents?"
        st.rerun()
with col3:
    if st.button("What is mentioned about..."):
        st.session_state.example_question = "What information is provided about [specific topic]?"
        st.rerun()
# TAB 3: Chat History
with tab3:
st.header("Chat History")
if not st.session_state.chat_history:
st.info("No questions asked yet. Go to 'Ask Questions' tab to start!")
else:
for i, item in enumerate(reversed(st.session_state.chat_history), 1):
with st.expander(f"Q{len(st.session_state.chat_history) - i + 1}: {item['question']}"):
st.markdown("**Answer:**")
st.write(item["answer"])
st.markdown("**Sources:**")
for j, doc in enumerate(item["sources"], 1):
st.caption(
f"{j}. {doc.metadata.get('source', 'Unknown')} "
f"(Page {doc.metadata.get('page', 'N/A')})"
)
if st.button("🗑️ Clear History"):
st.session_state.chat_history = []
st.rerun()
if __name__ == "__main__":
main()
Running the Application
1. Setup
# Create project directory
mkdir document-qa-system
cd document-qa-system
# Create virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install dependencies
pip install -r requirements.txt
# Create .env file
echo "OPENAI_API_KEY=your-api-key-here" > .env
2. Run the App
streamlit run app.py
The app will open in your browser at http://localhost:8501.
Usage Guide
Step 1: Enter API Key
- In the sidebar, enter your OpenAI API key
- Or set it in the .env file
Step 2: Upload Documents
- Go to "Upload Documents" tab
- Select one or more PDF files
- Click "Process Documents"
- Wait for processing to complete
Step 3: Ask Questions
- Go to "Ask Questions" tab
- Type your question
- Click "Get Answer"
- View answer and sources
Step 4: Review History
- Go to "Chat History" tab
- Review all previous questions and answers
- Clear history if needed
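The same pipeline also works headlessly. A minimal sketch driving the DocumentQASystem class from rag_system.py directly, without the UI (sample.pdf and the question are placeholders):
from rag_system import DocumentQASystem

rag = DocumentQASystem(api_key="sk-...", persist_directory="./chroma_db")

# Ingest a local PDF (sample.pdf is a placeholder path)
docs = rag.load_pdf("sample.pdf")
chunks = rag.process_documents(docs)
rag.create_vectorstore(chunks)
rag.setup_qa_chain(k=4)

result = rag.query("What is this document about?")
print(result["answer"])
for doc in result["source_documents"]:
    print(doc.metadata.get("source"), doc.metadata.get("page"))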
Advanced Features
1. Document Filtering
# Add to rag_system.py
def query_with_filter(self, question: str, source_filter: str = None) -> Dict:
    """Query with optional filtering by source document"""
    search_kwargs = {"k": 4}
    if source_filter:
        # Restrict retrieval to chunks whose metadata matches this filename
        search_kwargs["filter"] = {"source": source_filter}
    retriever = self.vectorstore.as_retriever(search_kwargs=search_kwargs)
    # Build a temporary QA chain around the filtered retriever
    chain = RetrievalQA.from_chain_type(
        llm=ChatOpenAI(model=self.model_name, temperature=0),
        chain_type="stuff",
        retriever=retriever,
        return_source_documents=True
    )
    result = chain.invoke({"query": question})
    return {
        "answer": result["result"],
        "source_documents": result["source_documents"]
    }
2. Multi-language Support
# Add language detection (requires: pip install langdetect)
from langdetect import detect

def detect_language(text: str) -> str:
    """Detect the language of a text (returns an ISO 639-1 code, e.g. "es")"""
    return detect(text)

# Adjust the prompt based on the question's language
if detect_language(question) == "es":
    prompt_template = """Responde en español..."""
3. Export Q&A to PDF
# Add to app.py (requires: pip install reportlab)
from reportlab.pdfgen import canvas

def export_qa_to_pdf(chat_history, filename="qa_export.pdf"):
    """Export chat history to PDF"""
    c = canvas.Canvas(filename)
    y = 800
    for item in chat_history:
        # Start a fresh page when we run out of vertical space
        if y < 80:
            c.showPage()
            y = 800
        c.drawString(50, y, f"Q: {item['question']}")
        y -= 20
        c.drawString(50, y, f"A: {item['answer']}")
        y -= 40
    c.save()

# Add button in UI
if st.button("📥 Export to PDF"):
    export_qa_to_pdf(st.session_state.chat_history)
    st.success("Exported to qa_export.pdf")
Production Deployment
Deploy to Streamlit Cloud
- Push code to GitHub
- Go to share.streamlit.io
- Connect repository
- Add secrets (API keys) in dashboard
- Deploy!
Environment Variables
# .streamlit/secrets.toml (for Streamlit Cloud)
OPENAI_API_KEY = "sk-..."
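On Streamlit Cloud there is no local .env file, so the app should read st.secrets as well. A small sketch of the adjustment to app.py (the exact fallback order is a design choice):
# Prefer Streamlit secrets when deployed, fall back to the local environment
api_key_default = st.secrets.get("OPENAI_API_KEY", os.getenv("OPENAI_API_KEY", ""))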
Docker Deployment
# Dockerfile
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
EXPOSE 8501
CMD ["streamlit", "run", "app.py", "--server.port=8501", "--server.address=0.0.0.0"]
# Build and run
docker build -t document-qa .
docker run -p 8501:8501 -e OPENAI_API_KEY=your-key document-qa
Best Practices
1. Error Handling
import logging

logger = logging.getLogger(__name__)

try:
    result = rag_system.query(question)
except Exception as e:
    logger.error(f"Query failed: {e}")
    st.error("Sorry, something went wrong. Please try again.")
2. Rate Limiting
import time
from functools import wraps
def rate_limit(max_calls=10, time_window=60):
"""Limit API calls"""
calls = []
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
now = time.time()
calls[:] = [c for c in calls if c > now - time_window]
if len(calls) >= max_calls:
raise Exception("Rate limit exceeded")
calls.append(now)
return func(*args, **kwargs)
return wrapper
return decorator
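To apply it, decorate whatever function actually hits the API; a hypothetical wrapper around the query method:
@rate_limit(max_calls=10, time_window=60)
def ask_question(question: str):
    return st.session_state.rag_system.query(question)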
3. Caching
@st.cache_data(ttl=3600)
def cached_query(question: str, _rag_system):
    """Cache query results for 1 hour"""
    # The leading underscore tells Streamlit not to hash this argument
    return _rag_system.query(question)
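st.cache_data builds its cache key by hashing the function's arguments; prefixing a parameter with an underscore (here the unhashable DocumentQASystem instance) excludes it from that hash, so repeated questions return the stored answer without another API call.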
Summary
You've built a production-ready document Q&A system with:
- PDF Upload: Multi-file support with duplicate detection
- Smart Processing: Intelligent chunking and embedding
- Semantic Search: Vector-based retrieval
- AI Answers: GPT-4 powered responses
- Source Citations: Transparent, verifiable answers
- Web Interface: Beautiful, user-friendly UI
- Chat History: Track conversations
This project demonstrates all core RAG concepts in a real application that you can deploy and use today!