Building a RAG System with LangChain
Now that you understand the concepts behind RAG, embeddings, and vector databases, let's put it all together and build a complete, production-ready RAG system using LangChain.
By the end of this lesson, you'll have a working RAG pipeline that can answer questions about any document.
The Complete RAG Pipeline
A RAG system has 6 key stages:
1. Load Documents (PDF, web, text, etc.)
2. Split into Chunks (preserve context)
3. Create Embeddings (semantic vectors)
4. Store in Vector DB (enable fast search)
5. Retrieve Relevant Chunks (similarity search)
6. Generate Answer (LLM + context)
Let's build each component step by step.
Setup and Installation
# Install required packages
pip install langchain langchain-openai langchain-community
pip install chromadb
pip install pypdf # For PDF support
pip install tiktoken # For token counting
# Import core dependencies
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain_community.document_loaders import (
PyPDFLoader,
TextLoader,
DirectoryLoader
)
from langchain.schema import Document
import os
# Set API key
os.environ["OPENAI_API_KEY"] = "your-api-key-here"
Step 1: Document Loading
LangChain provides loaders for various document types:
Load PDF Documents
from langchain_community.document_loaders import PyPDFLoader
def load_pdf(file_path: str):
"""Load a PDF file"""
loader = PyPDFLoader(file_path)
documents = loader.load()
print(f"Loaded {len(documents)} pages from {file_path}")
print(f"First page preview: {documents[0].page_content[:200]}...")
return documents
# Example
docs = load_pdf("company_handbook.pdf")
# Each document has:
# - page_content: The actual text
# - metadata: {'source': 'handbook.pdf', 'page': 0}
Load Multiple PDFs from Directory
from langchain_community.document_loaders import DirectoryLoader
def load_pdf_directory(directory_path: str):
"""Load all PDFs from a directory"""
loader = DirectoryLoader(
directory_path,
glob="**/*.pdf", # Recursive PDF search
loader_cls=PyPDFLoader,
show_progress=True
)
documents = loader.load()
print(f"Loaded {len(documents)} pages from directory")
return documents
# Example
all_docs = load_pdf_directory("./company_docs/")
Load Text Files
from langchain_community.document_loaders import TextLoader
def load_text_file(file_path: str):
"""Load a text file"""
loader = TextLoader(file_path, encoding="utf-8")
documents = loader.load()
return documents
# Example
text_docs = load_text_file("policy.txt")
Load from Web
from langchain_community.document_loaders import WebBaseLoader
def load_web_page(url: str):
"""Load content from a web page"""
loader = WebBaseLoader(url)
documents = loader.load()
return documents
# Example
web_docs = load_web_page("https://example.com/documentation")
Custom Document Creation
from langchain.schema import Document
def create_custom_documents():
"""Create documents manually"""
documents = [
Document(
page_content="Our company was founded in 2020.",
metadata={"source": "about_us", "section": "history"}
),
Document(
page_content="We offer a 30-day money-back guarantee.",
metadata={"source": "policy", "section": "refunds"}
),
Document(
page_content="Customer support is available 24/7.",
metadata={"source": "support", "section": "contact"}
)
]
return documents
custom_docs = create_custom_documents()
Pro Tip: Always check the metadata after loading. It helps with source attribution and filtering during retrieval.
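For example, here's a quick sanity check on the PDF loaded above. PyPDFLoader sets source and page in the metadata; other loaders use their own keys, so adjust accordingly:
# Quick metadata sanity check on the loaded documents
for doc in docs[:3]:
    print(doc.metadata)                  # e.g. {'source': 'company_handbook.pdf', 'page': 0}
    print(doc.page_content[:80], "...")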
Step 2: Text Splitting (Chunking)
Documents need to be split into smaller chunks for effective retrieval.
Chunking Definition: The process of dividing large documents into smaller, semantically meaningful segments (chunks) that fit within LLM context windows while preserving context through overlapping text between adjacent chunks.
Why Chunking Matters
# ❌ Bad: Entire document as one chunk
# - Too large for context window
# - Diluted relevance
# - Poor retrieval accuracy
# ✅ Good: Smart chunking
# - Manageable size (500-1000 tokens)
# - Preserves context
# - Better retrieval precision
RecursiveCharacterTextSplitter
from langchain.text_splitter import RecursiveCharacterTextSplitter
def create_text_splitter(chunk_size=1000, chunk_overlap=200):
"""
Create a text splitter with smart defaults
Args:
chunk_size: Target size of each chunk (in characters)
chunk_overlap: Overlap between chunks to preserve context
"""
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len,
separators=["\n\n", "\n", " ", ""] # Try to split on paragraphs first
)
return text_splitter
# Split documents
text_splitter = create_text_splitter()
chunks = text_splitter.split_documents(docs)
print(f"Original documents: {len(docs)}")
print(f"After splitting: {len(chunks)} chunks")
print(f"\nFirst chunk:\n{chunks[0].page_content}")
print(f"\nMetadata: {chunks[0].metadata}")
Token-Based Splitting
from langchain.text_splitter import TokenTextSplitter
def create_token_splitter(chunk_size=500, chunk_overlap=50):
"""
Split by tokens (more accurate for LLM context limits)
"""
text_splitter = TokenTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap
)
return text_splitter
# Example
token_splitter = create_token_splitter()
token_chunks = token_splitter.split_documents(docs)
Semantic Splitting (Paragraph Boundaries)
from langchain.text_splitter import CharacterTextSplitter
def create_semantic_splitter():
    """
    Approximate semantic boundaries by splitting on paragraphs,
    since a paragraph usually carries one coherent idea
    """
text_splitter = CharacterTextSplitter(
separator="\n\n", # Split on double newlines (paragraphs)
chunk_size=1000,
chunk_overlap=100,
length_function=len,
)
return text_splitter
semantic_splitter = create_semantic_splitter()
semantic_chunks = semantic_splitter.split_documents(docs)
Chunking Guidelines:
- Size: roughly 500-1,000 tokens per chunk (balance between context and precision; note the character-based splitters above count characters, not tokens - see the check below)
- Overlap: 10-20% of the chunk size, so sentences aren't cut mid-thought at chunk boundaries
- Boundaries: Prefer natural breaks (paragraphs, sections)
- Metadata: Preserve source information in each chunk
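Because the character-based splitters measure length in characters, it's worth spot-checking chunk sizes in tokens with tiktoken (installed during setup). A minimal sketch using the chunks created earlier:
import tiktoken

# cl100k_base is the encoding used by the OpenAI models in this lesson
encoding = tiktoken.get_encoding("cl100k_base")
token_counts = [len(encoding.encode(chunk.page_content)) for chunk in chunks]
print(f"Chunks: {len(token_counts)}")
print(f"Tokens per chunk - min: {min(token_counts)}, "
      f"avg: {sum(token_counts) // len(token_counts)}, max: {max(token_counts)}")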
Step 3: Create Embeddings
from langchain_openai import OpenAIEmbeddings
def create_embeddings():
"""Initialize embedding model"""
embeddings = OpenAIEmbeddings(
model="text-embedding-3-small", # Cost-effective
# model="text-embedding-3-large", # Higher accuracy
)
return embeddings
# Create embeddings instance
embeddings = create_embeddings()
# Test single embedding
test_text = "What is the refund policy?"
test_embedding = embeddings.embed_query(test_text)
print(f"Embedding dimension: {len(test_embedding)}") # 1536
# Batch embed documents (more efficient)
texts = ["Text 1", "Text 2", "Text 3"]
doc_embeddings = embeddings.embed_documents(texts)
print(f"Embedded {len(doc_embeddings)} documents")
Step 4: Vector Store Setup
from langchain_community.vectorstores import Chroma
def create_vector_store(chunks, embeddings, persist_directory="./chroma_db"):
"""
Create and persist a vector store
Args:
chunks: List of Document objects
embeddings: Embedding model
persist_directory: Where to save the database
"""
vectorstore = Chroma.from_documents(
documents=chunks,
embedding=embeddings,
persist_directory=persist_directory
)
print(f"Created vector store with {vectorstore._collection.count()} documents")
return vectorstore
# Create vector store
vectorstore = create_vector_store(chunks, embeddings)
# Test similarity search
query = "What is the refund policy?"
results = vectorstore.similarity_search(query, k=3)
print(f"\nQuery: {query}")
print(f"Found {len(results)} relevant chunks:\n")
for i, doc in enumerate(results, 1):
print(f"{i}. {doc.page_content[:200]}...")
print(f" Source: {doc.metadata.get('source', 'N/A')}\n")
Loading Existing Vector Store
def load_vector_store(persist_directory="./chroma_db", embeddings=None):
"""Load a previously created vector store"""
if embeddings is None:
embeddings = create_embeddings()
vectorstore = Chroma(
persist_directory=persist_directory,
embedding_function=embeddings
)
print(f"Loaded vector store with {vectorstore._collection.count()} documents")
return vectorstore
# Load existing store
existing_vectorstore = load_vector_store()
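If new documents arrive later, you don't need to rebuild the store from scratch; you can add them to the loaded store. A short sketch (the file name below is just a placeholder for whatever new document you're ingesting):
# Add newly loaded and chunked documents to the existing store
new_docs = load_pdf("new_policy_update.pdf")  # placeholder file name
new_chunks = text_splitter.split_documents(new_docs)
existing_vectorstore.add_documents(new_chunks)
print(f"Store now holds {existing_vectorstore._collection.count()} chunks")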
Step 5: Retrieval
Retriever Definition: A component that searches the vector store for relevant documents based on a query, returning the top-k most similar chunks to be used as context for the LLM's response generation.
Basic Retriever
def create_retriever(vectorstore, search_kwargs=None):
"""
Create a retriever from vector store
Args:
vectorstore: The vector store
search_kwargs: Search parameters (e.g., {'k': 5})
"""
if search_kwargs is None:
search_kwargs = {"k": 4} # Return top 4 results
retriever = vectorstore.as_retriever(
search_type="similarity",
search_kwargs=search_kwargs
)
return retriever
# Create retriever
retriever = create_retriever(vectorstore)
# Test retrieval
query = "How do I return a product?"
docs = retriever.get_relevant_documents(query)
print(f"Retrieved {len(docs)} documents:")
for doc in docs:
print(f"- {doc.page_content[:100]}...")
Advanced Retrieval: MMR (Maximum Marginal Relevance)
MMR (Maximum Marginal Relevance) Definition: A retrieval strategy that balances relevance and diversity by selecting documents that are both similar to the query and different from already-selected documents, avoiding redundant results.
def create_mmr_retriever(vectorstore, k=4, fetch_k=20):
"""
Create MMR retriever for diverse results
MMR balances relevance and diversity to avoid redundant results
"""
retriever = vectorstore.as_retriever(
search_type="mmr",
search_kwargs={
"k": k, # Number of documents to return
"fetch_k": fetch_k # Fetch more, then select diverse k
}
)
return retriever
# MMR retriever
mmr_retriever = create_mmr_retriever(vectorstore)
diverse_docs = mmr_retriever.get_relevant_documents("refund policy")
Retrieval with Score Threshold
def create_threshold_retriever(vectorstore, score_threshold=0.7):
"""
Only return documents above a similarity threshold
"""
retriever = vectorstore.as_retriever(
search_type="similarity_score_threshold",
search_kwargs={"score_threshold": score_threshold}
)
return retriever
# Threshold retriever
threshold_retriever = create_threshold_retriever(vectorstore, score_threshold=0.75)
high_quality_docs = threshold_retriever.get_relevant_documents("shipping information")
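You can also combine similarity search with a metadata filter so retrieval only considers chunks from a particular source. The filter syntax below is Chroma-specific, and the "policy" value is just an example; use whatever source values exist in your own metadata:
def create_filtered_retriever(vectorstore, source: str, k: int = 4):
    """Restrict retrieval to chunks whose metadata matches the given source"""
    return vectorstore.as_retriever(
        search_kwargs={
            "k": k,
            "filter": {"source": source},  # Chroma metadata filter
        }
    )

# Example: only search chunks whose metadata has source == "policy"
policy_retriever = create_filtered_retriever(vectorstore, source="policy")
policy_docs = policy_retriever.get_relevant_documents("money-back guarantee")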
Step 6: Generation (RAG Chain)
Basic QA Chain
RetrievalQA Chain Definition: A LangChain component that combines a retriever and an LLM to answer questions by first retrieving relevant documents, then generating a response based on the retrieved context.
from langchain.chains import RetrievalQA
from langchain_openai import ChatOpenAI
def create_qa_chain(retriever):
    """
    Create a question-answering chain
    Args:
        retriever: The document retriever
    """
    llm = ChatOpenAI(
        model="gpt-4",
        temperature=0  # Deterministic for factual answers
    )
    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",  # "stuff" = put all retrieved docs into the prompt
        retriever=retriever,
        return_source_documents=True  # Include sources in the response
    )
    return qa_chain
# Create chain
qa_chain = create_qa_chain(retriever)
# Ask questions!
def ask_question(chain, question):
"""Ask a question and get answer with sources"""
result = chain.invoke({"query": question})
print(f"Question: {question}\n")
print(f"Answer: {result['result']}\n")
print("Sources:")
for i, doc in enumerate(result['source_documents'], 1):
source = doc.metadata.get('source', 'Unknown')
page = doc.metadata.get('page', 'N/A')
print(f"{i}. {source} (page {page})")
print(f" Preview: {doc.page_content[:150]}...\n")
# Example
ask_question(qa_chain, "What is your refund policy?")
Advanced: Custom Prompt Chain
from langchain.prompts import PromptTemplate
from langchain.chains import RetrievalQA
def create_custom_qa_chain(retriever):
"""
Create QA chain with custom prompt
"""
# Custom prompt template
prompt_template = """You are a helpful customer service assistant. Use the following pieces of context to answer the question at the end.
If you don't know the answer based on the context, just say "I don't have that information in our documentation." Don't try to make up an answer.
Always cite the source of your information when possible.
Context:
{context}
Question: {question}
Helpful Answer:"""
PROMPT = PromptTemplate(
template=prompt_template,
input_variables=["context", "question"]
)
llm = ChatOpenAI(model="gpt-4", temperature=0)
qa_chain = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=retriever,
return_source_documents=True,
chain_type_kwargs={"prompt": PROMPT}
)
return qa_chain
# Use custom chain
custom_chain = create_custom_qa_chain(retriever)
ask_question(custom_chain, "Do you offer international shipping?")
Complete RAG System: Putting It All Together
import os
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain_community.document_loaders import DirectoryLoader, PyPDFLoader
from langchain.prompts import PromptTemplate
class RAGSystem:
"""
Complete RAG system for document Q&A
"""
def __init__(
self,
api_key: str,
persist_directory: str = "./chroma_db",
chunk_size: int = 1000,
chunk_overlap: int = 200
):
os.environ["OPENAI_API_KEY"] = api_key
self.embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap
)
self.persist_directory = persist_directory
self.vectorstore = None
self.qa_chain = None
def load_documents(self, source_path: str, source_type: str = "pdf"):
"""
Load documents from various sources
Args:
source_path: Path to file or directory
source_type: "pdf", "text", or "directory"
"""
print(f"Loading documents from {source_path}...")
if source_type == "pdf":
loader = PyPDFLoader(source_path)
elif source_type == "directory":
loader = DirectoryLoader(
source_path,
glob="**/*.pdf",
loader_cls=PyPDFLoader
)
else:
raise ValueError(f"Unsupported source type: {source_type}")
documents = loader.load()
print(f"Loaded {len(documents)} documents")
return documents
def process_documents(self, documents):
"""
Split documents into chunks
"""
print("Splitting documents into chunks...")
chunks = self.text_splitter.split_documents(documents)
print(f"Created {len(chunks)} chunks")
return chunks
def create_vectorstore(self, chunks):
"""
Create and persist vector store
"""
print("Creating vector store...")
self.vectorstore = Chroma.from_documents(
documents=chunks,
embedding=self.embeddings,
persist_directory=self.persist_directory
)
print(f"Vector store created with {self.vectorstore._collection.count()} chunks")
def load_vectorstore(self):
"""
Load existing vector store
"""
print("Loading existing vector store...")
self.vectorstore = Chroma(
persist_directory=self.persist_directory,
embedding_function=self.embeddings
)
print(f"Loaded vector store with {self.vectorstore._collection.count()} chunks")
def setup_qa_chain(self, temperature=0, k=4):
"""
Setup the QA chain
"""
if self.vectorstore is None:
raise ValueError("Vector store not initialized. Load or create first.")
print("Setting up QA chain...")
# Custom prompt
prompt_template = """Use the following pieces of context to answer the question. If you don't know the answer, say so.
Context:
{context}
Question: {question}
Answer:"""
PROMPT = PromptTemplate(
template=prompt_template,
input_variables=["context", "question"]
)
llm = ChatOpenAI(model="gpt-4", temperature=temperature)
retriever = self.vectorstore.as_retriever(
search_kwargs={"k": k}
)
self.qa_chain = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=retriever,
return_source_documents=True,
chain_type_kwargs={"prompt": PROMPT}
)
print("QA chain ready!")
def query(self, question: str):
"""
Ask a question
"""
if self.qa_chain is None:
raise ValueError("QA chain not setup. Call setup_qa_chain() first.")
result = self.qa_chain.invoke({"query": question})
return {
"answer": result["result"],
"sources": [
{
"content": doc.page_content,
"metadata": doc.metadata
}
for doc in result["source_documents"]
]
}
def ingest_and_setup(self, source_path: str, source_type: str = "directory"):
"""
Complete pipeline: load, process, store, and setup
"""
# 1. Load documents
documents = self.load_documents(source_path, source_type)
# 2. Split into chunks
chunks = self.process_documents(documents)
# 3. Create vector store
self.create_vectorstore(chunks)
# 4. Setup QA chain
self.setup_qa_chain()
print("\n✅ RAG system ready!")
# ==================== USAGE ====================
# Initialize RAG system
rag = RAGSystem(
api_key="your-api-key",
persist_directory="./my_knowledge_base"
)
# Option 1: First time - ingest documents
rag.ingest_and_setup(
source_path="./company_docs/",
source_type="directory"
)
# Option 2: Later - load existing vector store
# rag.load_vectorstore()
# rag.setup_qa_chain()
# Ask questions
questions = [
"What is the company's refund policy?",
"How long does shipping take?",
"Do you offer customer support?",
]
for question in questions:
print(f"\n{'='*60}")
print(f"Q: {question}")
print('='*60)
result = rag.query(question)
print(f"\nA: {result['answer']}\n")
print("Sources:")
for i, source in enumerate(result['sources'], 1):
metadata = source['metadata']
print(f"{i}. {metadata.get('source', 'Unknown')} (page {metadata.get('page', 'N/A')})")
Complete System: This RAGSystem class provides a production-ready foundation. You can easily extend it with additional features like streaming, async queries, or multiple vector stores.
Optimization Techniques
1. Caching
from functools import lru_cache
@lru_cache(maxsize=100)
def cached_query(question: str):
    """Cache answers for repeated, identical questions to avoid redundant retrieval and LLM calls"""
    return rag.query(question)
2. Streaming Responses
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
def create_streaming_chain(retriever):
"""Stream responses for better UX"""
llm = ChatOpenAI(
model="gpt-4",
temperature=0,
streaming=True,
callbacks=[StreamingStdOutCallbackHandler()]
)
qa_chain = RetrievalQA.from_chain_type(
llm=llm,
retriever=retriever,
return_source_documents=True
)
return qa_chain
3. Async Processing
import asyncio
from langchain_openai import ChatOpenAI
async def async_query(chain, question):
"""Process queries asynchronously"""
result = await chain.ainvoke({"query": question})
return result
# Multiple queries in parallel
async def process_multiple_queries(questions):
tasks = [async_query(qa_chain, q) for q in questions]
results = await asyncio.gather(*tasks)
return results
# Usage
questions = ["Question 1", "Question 2", "Question 3"]
results = asyncio.run(process_multiple_queries(questions))
Common Pitfalls and Solutions
1. Chunk Size Too Large/Small
# ❌ Problem: oversized chunks dilute relevance
# (chunk size is set on the splitter, not passed to split_documents)
splitter = RecursiveCharacterTextSplitter(chunk_size=5000, chunk_overlap=0)
chunks = splitter.split_documents(docs)
# ✅ Solution: moderate chunk size with overlap
splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
chunks = splitter.split_documents(docs)
2. No Source Attribution
# ❌ Problem: Can't verify answers
qa_chain = RetrievalQA.from_chain_type(llm, retriever=retriever)
# ✅ Solution: Always return sources
qa_chain = RetrievalQA.from_chain_type(
llm,
retriever=retriever,
return_source_documents=True # ✅
)
3. Poor Retrieval
# ❌ Problem: Only retrieving 1 document
retriever = vectorstore.as_retriever(search_kwargs={"k": 1})
# ✅ Solution: Retrieve more for context
retriever = vectorstore.as_retriever(search_kwargs={"k": 4})
Summary
You've built a complete RAG system! The key components are:
- Document Loading: Support multiple formats (PDF, text, web)
- Chunking: Split intelligently with overlap
- Embeddings: Convert text to semantic vectors
- Vector Store: Enable fast similarity search
- Retrieval: Find relevant context
- Generation: LLM creates answers from context
This foundation scales from prototypes to production systems.