
Building a RAG System with LangChain

Learn to build a complete RAG pipeline from scratch: document loading, chunking, embedding, vector storage, retrieval, and generation with LangChain.

35 min read · RAG · LangChain · Pipeline · Document Processing


Now that you understand the concepts behind RAG, embeddings, and vector databases, let's put it all together and build a complete, end-to-end RAG system using LangChain.

By the end of this lesson, you'll have a working RAG pipeline that can answer questions about any document.

The Complete RAG Pipeline

A RAG system has 6 key stages:

1. Load Documents (PDF, web, text, etc.)
2. Split into Chunks (preserve context)
3. Create Embeddings (semantic vectors)
4. Store in Vector DB (enable fast search)
5. Retrieve Relevant Chunks (similarity search)
6. Generate Answer (LLM + context)

Let's build each component step by step.

Setup and Installation

bash
# Install required packages
pip install langchain langchain-openai langchain-community
pip install chromadb
pip install pypdf  # For PDF support
pip install tiktoken  # For token counting
pip install beautifulsoup4  # For the web page loader used later
python
# Import core dependencies
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain_community.document_loaders import (
    PyPDFLoader,
    TextLoader,
    DirectoryLoader
)
from langchain.schema import Document
import os

# Set API key
os.environ["OPENAI_API_KEY"] = "your-api-key-here"

Step 1: Document Loading

LangChain provides loaders for various document types:

Load PDF Documents

python
from langchain_community.document_loaders import PyPDFLoader

def load_pdf(file_path: str):
    """Load a PDF file"""
    loader = PyPDFLoader(file_path)
    documents = loader.load()

    print(f"Loaded {len(documents)} pages from {file_path}")
    print(f"First page preview: {documents[0].page_content[:200]}...")

    return documents

# Example
docs = load_pdf("company_handbook.pdf")

# Each document has:
# - page_content: The actual text
# - metadata: {'source': 'handbook.pdf', 'page': 0}

Load Multiple PDFs from Directory

python
from langchain_community.document_loaders import DirectoryLoader

def load_pdf_directory(directory_path: str):
    """Load all PDFs from a directory"""
    loader = DirectoryLoader(
        directory_path,
        glob="**/*.pdf",  # Recursive PDF search
        loader_cls=PyPDFLoader,
        show_progress=True
    )
    documents = loader.load()

    print(f"Loaded {len(documents)} pages from directory")
    return documents

# Example
all_docs = load_pdf_directory("./company_docs/")

Load Text Files

python
from langchain_community.document_loaders import TextLoader

def load_text_file(file_path: str):
    """Load a text file"""
    loader = TextLoader(file_path, encoding="utf-8")
    documents = loader.load()
    return documents

# Example
text_docs = load_text_file("policy.txt")

Load from Web

python
from langchain_community.document_loaders import WebBaseLoader

def load_web_page(url: str):
    """Load content from a web page"""
    loader = WebBaseLoader(url)
    documents = loader.load()
    return documents

# Example
web_docs = load_web_page("https://example.com/documentation")

Custom Document Creation

python
from langchain.schema import Document

def create_custom_documents():
    """Create documents manually"""
    documents = [
        Document(
            page_content="Our company was founded in 2020.",
            metadata={"source": "about_us", "section": "history"}
        ),
        Document(
            page_content="We offer a 30-day money-back guarantee.",
            metadata={"source": "policy", "section": "refunds"}
        ),
        Document(
            page_content="Customer support is available 24/7.",
            metadata={"source": "support", "section": "contact"}
        )
    ]
    return documents

custom_docs = create_custom_documents()

Pro Tip: Always check the metadata after loading. It helps with source attribution and filtering during retrieval.

Step 2: Text Splitting (Chunking)

Documents need to be split into smaller chunks for effective retrieval.

Chunking Definition: The process of dividing large documents into smaller, semantically meaningful segments (chunks) that fit within LLM context windows while preserving context through overlapping text between adjacent chunks.

Why Chunking Matters

python
# ❌ Bad: Entire document as one chunk
# - Too large for context window
# - Diluted relevance
# - Poor retrieval accuracy

# ✅ Good: Smart chunking
# - Manageable size (500-1000 tokens)
# - Preserves context
# - Better retrieval precision
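
To see what overlap buys you, here's a minimal stdlib-only sketch of character-based chunking. It's a simplification of what RecursiveCharacterTextSplitter does (it ignores separators and just slides a fixed window):

```python
def chunk_text(text: str, chunk_size: int, overlap: int) -> list[str]:
    """Slide a window of `chunk_size` characters, stepping back `overlap`
    characters each time so adjacent chunks share context."""
    step = chunk_size - overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

chunks = chunk_text("0123456789" * 3, chunk_size=10, overlap=2)
# Each chunk begins with the last 2 characters of the previous chunk,
# so a sentence cut at a boundary still appears whole in one of them.
```

The real splitter adds one crucial refinement: it prefers to cut at the separators listed above (paragraphs, then lines, then spaces) rather than mid-word.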

RecursiveCharacterTextSplitter

python
from langchain.text_splitter import RecursiveCharacterTextSplitter

def create_text_splitter(chunk_size=1000, chunk_overlap=200):
    """
    Create a text splitter with smart defaults

    Args:
        chunk_size: Target size of each chunk (in characters)
        chunk_overlap: Overlap between chunks to preserve context
    """
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        separators=["\n\n", "\n", " ", ""]  # Try to split on paragraphs first
    )
    return text_splitter

# Split documents
text_splitter = create_text_splitter()
chunks = text_splitter.split_documents(docs)

print(f"Original documents: {len(docs)}")
print(f"After splitting: {len(chunks)} chunks")
print(f"\nFirst chunk:\n{chunks[0].page_content}")
print(f"\nMetadata: {chunks[0].metadata}")

Token-Based Splitting

python
from langchain.text_splitter import TokenTextSplitter

def create_token_splitter(chunk_size=500, chunk_overlap=50):
    """
    Split by tokens (more accurate for LLM context limits)
    """
    text_splitter = TokenTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap
    )
    return text_splitter

# Example
token_splitter = create_token_splitter()
token_chunks = token_splitter.split_documents(docs)
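
The idea behind token-based splitting can be sketched in plain Python. Here whitespace-separated words stand in for tokens; the real TokenTextSplitter counts tiktoken BPE tokens, which is what actually matters for context limits:

```python
def split_by_tokens(text: str, chunk_size: int, overlap: int) -> list[str]:
    """Chunk on 'tokens' instead of characters. Whitespace words are a
    crude stand-in for real BPE tokens."""
    tokens = text.split()
    step = chunk_size - overlap
    return [" ".join(tokens[i:i + chunk_size]) for i in range(0, len(tokens), step)]

words = " ".join(f"word{i}" for i in range(20))
token_style_chunks = split_by_tokens(words, chunk_size=8, overlap=2)
```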

Semantic Splitting

python
from langchain.text_splitter import CharacterTextSplitter

def create_semantic_splitter():
    """
    Split by semantic boundaries (paragraphs, sections)
    """
    text_splitter = CharacterTextSplitter(
        separator="\n\n",  # Split on double newlines (paragraphs)
        chunk_size=1000,
        chunk_overlap=100,
        length_function=len,
    )
    return text_splitter

semantic_splitter = create_semantic_splitter()
semantic_chunks = semantic_splitter.split_documents(docs)

Chunking Guidelines:

  • Size: 500-1000 tokens per chunk, roughly 2,000-4,000 characters for English text (balance between context and precision)
  • Overlap: 10-20% overlap to avoid cutting sentences
  • Boundaries: Prefer natural breaks (paragraphs, sections)
  • Metadata: Preserve source information in each chunk

Step 3: Create Embeddings

python
from langchain_openai import OpenAIEmbeddings

def create_embeddings():
    """Initialize embedding model"""
    embeddings = OpenAIEmbeddings(
        model="text-embedding-3-small",  # Cost-effective
        # model="text-embedding-3-large",  # Higher accuracy
    )
    return embeddings

# Create embeddings instance
embeddings = create_embeddings()

# Test single embedding
test_text = "What is the refund policy?"
test_embedding = embeddings.embed_query(test_text)
print(f"Embedding dimension: {len(test_embedding)}")  # 1536

# Batch embed documents (more efficient)
texts = ["Text 1", "Text 2", "Text 3"]
doc_embeddings = embeddings.embed_documents(texts)
print(f"Embedded {len(doc_embeddings)} documents")
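
"Semantic similarity" between these vectors is usually cosine similarity. A stdlib sketch of the math (OpenAI embeddings come back normalized to unit length, so in practice this reduces to a dot product):

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """cos(theta) = (a . b) / (|a| * |b|).
    1.0 = same direction (same meaning), ~0.0 = unrelated."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)
```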

Step 4: Vector Store Setup

python
from langchain_community.vectorstores import Chroma

def create_vector_store(chunks, embeddings, persist_directory="./chroma_db"):
    """
    Create and persist a vector store

    Args:
        chunks: List of Document objects
        embeddings: Embedding model
        persist_directory: Where to save the database
    """
    vectorstore = Chroma.from_documents(
        documents=chunks,
        embedding=embeddings,
        persist_directory=persist_directory
    )

    print(f"Created vector store with {vectorstore._collection.count()} documents")
    return vectorstore

# Create vector store
vectorstore = create_vector_store(chunks, embeddings)

# Test similarity search
query = "What is the refund policy?"
results = vectorstore.similarity_search(query, k=3)

print(f"\nQuery: {query}")
print(f"Found {len(results)} relevant chunks:\n")
for i, doc in enumerate(results, 1):
    print(f"{i}. {doc.page_content[:200]}...")
    print(f"   Source: {doc.metadata.get('source', 'N/A')}\n")
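
Conceptually, similarity_search just scores every stored vector against the query and keeps the top k. A brute-force stdlib sketch of that idea (Chroma avoids the full scan with an approximate index, but the result is the same):

```python
def top_k(query_vec: list[float], doc_vecs: list[list[float]], k: int) -> list[int]:
    """Rank document indices by dot-product score against the query
    (for normalized vectors, dot product equals cosine similarity)."""
    def score(vec):
        return sum(q * d for q, d in zip(query_vec, vec))
    ranked = sorted(range(len(doc_vecs)), key=lambda i: score(doc_vecs[i]), reverse=True)
    return ranked[:k]

# Toy 2-d "embeddings": doc 0 points the same way as the query
doc_vecs = [[1.0, 0.0], [0.6, 0.8], [0.0, 1.0]]
best = top_k([1.0, 0.0], doc_vecs, k=2)
```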

Loading Existing Vector Store

python
def load_vector_store(persist_directory="./chroma_db", embeddings=None):
    """Load a previously created vector store"""
    if embeddings is None:
        embeddings = create_embeddings()

    vectorstore = Chroma(
        persist_directory=persist_directory,
        embedding_function=embeddings
    )

    print(f"Loaded vector store with {vectorstore._collection.count()} documents")
    return vectorstore

# Load existing store
existing_vectorstore = load_vector_store()

Step 5: Retrieval

Retriever Definition: A component that searches the vector store for relevant documents based on a query, returning the top-k most similar chunks to be used as context for the LLM's response generation.

Basic Retriever

python
def create_retriever(vectorstore, search_kwargs=None):
    """
    Create a retriever from vector store

    Args:
        vectorstore: The vector store
        search_kwargs: Search parameters (e.g., {'k': 5})
    """
    if search_kwargs is None:
        search_kwargs = {"k": 4}  # Return top 4 results

    retriever = vectorstore.as_retriever(
        search_type="similarity",
        search_kwargs=search_kwargs
    )

    return retriever

# Create retriever
retriever = create_retriever(vectorstore)

# Test retrieval
query = "How do I return a product?"
docs = retriever.invoke(query)  # .invoke() replaces the deprecated get_relevant_documents()

print(f"Retrieved {len(docs)} documents:")
for doc in docs:
    print(f"- {doc.page_content[:100]}...")

Advanced Retrieval: MMR (Maximum Marginal Relevance)

MMR (Maximum Marginal Relevance) Definition: A retrieval strategy that balances relevance and diversity by selecting documents that are both similar to the query and different from already-selected documents, avoiding redundant results.

python
def create_mmr_retriever(vectorstore, k=4, fetch_k=20):
    """
    Create MMR retriever for diverse results

    MMR balances relevance and diversity to avoid redundant results
    """
    retriever = vectorstore.as_retriever(
        search_type="mmr",
        search_kwargs={
            "k": k,          # Number of documents to return
            "fetch_k": fetch_k  # Fetch more, then select diverse k
        }
    )
    return retriever

# MMR retriever
mmr_retriever = create_mmr_retriever(vectorstore)
diverse_docs = mmr_retriever.invoke("refund policy")
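
Conceptually, MMR re-scores each candidate as lambda * sim(query, doc) - (1 - lambda) * max sim(doc, already selected). A simplified stdlib sketch (LangChain's implementation works on the embedding vectors; here the similarities are precomputed toy values):

```python
def mmr_select(query_sims: list[float], doc_sims: list[list[float]],
               k: int, lambda_mult: float = 0.5) -> list[int]:
    """query_sims[i]: sim(query, doc i); doc_sims[i][j]: sim(doc i, doc j).
    Greedily pick docs that are relevant AND unlike those already picked."""
    selected, candidates = [], list(range(len(query_sims)))
    while candidates and len(selected) < k:
        def mmr_score(i):
            redundancy = max((doc_sims[i][j] for j in selected), default=0.0)
            return lambda_mult * query_sims[i] - (1 - lambda_mult) * redundancy
        best = max(candidates, key=mmr_score)
        selected.append(best)
        candidates.remove(best)
    return selected

# Docs 0 and 1 are near-duplicates; MMR skips doc 1 in favor of the distinct doc 2
query_sims = [0.9, 0.85, 0.3]
doc_sims = [[1.0, 0.95, 0.1],
            [0.95, 1.0, 0.1],
            [0.1, 0.1, 1.0]]
picks = mmr_select(query_sims, doc_sims, k=2)
```

With lambda_mult = 1.0 this degenerates to plain similarity ranking; lower values trade relevance for diversity.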

Retrieval with Score Threshold

python
def create_threshold_retriever(vectorstore, score_threshold=0.7):
    """
    Only return documents above a similarity threshold
    """
    retriever = vectorstore.as_retriever(
        search_type="similarity_score_threshold",
        search_kwargs={"score_threshold": score_threshold}
    )
    return retriever

# Threshold retriever
threshold_retriever = create_threshold_retriever(vectorstore, score_threshold=0.75)
high_quality_docs = threshold_retriever.invoke("shipping information")

Step 6: Generation (RAG Chain)

Basic QA Chain

python
from langchain.chains import RetrievalQA
from langchain_openai import ChatOpenAI

def create_qa_chain(retriever):
    """
    Create a question-answering chain

    Args:
        retriever: The document retriever
    """
    llm = ChatOpenAI(
        model="gpt-4",
        temperature=0  # Deterministic for factual answers
    )

    # RetrievalQA combines a retriever and an LLM: it first retrieves
    # relevant documents, then generates an answer grounded in that context.

    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",  # "stuff" = put all docs in prompt
        retriever=retriever,
        return_source_documents=True  # Include sources in response
    )

    return qa_chain

# Create chain
qa_chain = create_qa_chain(retriever)

# Ask questions!
def ask_question(chain, question):
    """Ask a question and get answer with sources"""
    result = chain.invoke({"query": question})

    print(f"Question: {question}\n")
    print(f"Answer: {result['result']}\n")
    print("Sources:")
    for i, doc in enumerate(result['source_documents'], 1):
        source = doc.metadata.get('source', 'Unknown')
        page = doc.metadata.get('page', 'N/A')
        print(f"{i}. {source} (page {page})")
        print(f"   Preview: {doc.page_content[:150]}...\n")

# Example
ask_question(qa_chain, "What is your refund policy?")
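
The "stuff" chain type really does just concatenate ("stuff") every retrieved chunk into one prompt. Roughly, assuming the default prompt shape:

```python
def stuff_prompt(chunks: list[str], question: str) -> str:
    """Join all retrieved chunks into one context block, append the
    question -- this is essentially all the 'stuff' strategy does."""
    context = "\n\n".join(chunks)
    return (f"Use the following context to answer.\n\n"
            f"Context:\n{context}\n\nQuestion: {question}\n\nAnswer:")

prompt = stuff_prompt(
    ["We offer a 30-day money-back guarantee.",
     "Refunds are processed within 5 business days."],
    "What is your refund policy?",
)
```

The tradeoff: with too many or too-large chunks, the stuffed prompt can exceed the model's context window, which is exactly why chunk size and k matter.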

Advanced: Custom Prompt Chain

python
from langchain.prompts import PromptTemplate
from langchain.chains import RetrievalQA

def create_custom_qa_chain(retriever):
    """
    Create QA chain with custom prompt
    """
    # Custom prompt template
    prompt_template = """You are a helpful customer service assistant. Use the following pieces of context to answer the question at the end.

If you don't know the answer based on the context, just say "I don't have that information in our documentation." Don't try to make up an answer.

Always cite the source of your information when possible.

Context:
{context}

Question: {question}

Helpful Answer:"""

    PROMPT = PromptTemplate(
        template=prompt_template,
        input_variables=["context", "question"]
    )

    llm = ChatOpenAI(model="gpt-4", temperature=0)

    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=retriever,
        return_source_documents=True,
        chain_type_kwargs={"prompt": PROMPT}
    )

    return qa_chain

# Use custom chain
custom_chain = create_custom_qa_chain(retriever)
ask_question(custom_chain, "Do you offer international shipping?")

Complete RAG System: Putting It All Together

python
import os
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain_community.document_loaders import DirectoryLoader, PyPDFLoader
from langchain.prompts import PromptTemplate

class RAGSystem:
    """
    Complete RAG system for document Q&A
    """

    def __init__(
        self,
        api_key: str,
        persist_directory: str = "./chroma_db",
        chunk_size: int = 1000,
        chunk_overlap: int = 200
    ):
        os.environ["OPENAI_API_KEY"] = api_key

        self.embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap
        )
        self.persist_directory = persist_directory
        self.vectorstore = None
        self.qa_chain = None

    def load_documents(self, source_path: str, source_type: str = "pdf"):
        """
        Load documents from various sources

        Args:
            source_path: Path to file or directory
            source_type: "pdf", "text", or "directory"
        """
        print(f"Loading documents from {source_path}...")

        if source_type == "pdf":
            loader = PyPDFLoader(source_path)
        elif source_type == "directory":
            loader = DirectoryLoader(
                source_path,
                glob="**/*.pdf",
                loader_cls=PyPDFLoader
            )
        else:
            raise ValueError(f"Unsupported source type: {source_type}")

        documents = loader.load()
        print(f"Loaded {len(documents)} documents")

        return documents

    def process_documents(self, documents):
        """
        Split documents into chunks
        """
        print("Splitting documents into chunks...")
        chunks = self.text_splitter.split_documents(documents)
        print(f"Created {len(chunks)} chunks")
        return chunks

    def create_vectorstore(self, chunks):
        """
        Create and persist vector store
        """
        print("Creating vector store...")
        self.vectorstore = Chroma.from_documents(
            documents=chunks,
            embedding=self.embeddings,
            persist_directory=self.persist_directory
        )
        print(f"Vector store created with {self.vectorstore._collection.count()} chunks")

    def load_vectorstore(self):
        """
        Load existing vector store
        """
        print("Loading existing vector store...")
        self.vectorstore = Chroma(
            persist_directory=self.persist_directory,
            embedding_function=self.embeddings
        )
        print(f"Loaded vector store with {self.vectorstore._collection.count()} chunks")

    def setup_qa_chain(self, temperature=0, k=4):
        """
        Setup the QA chain
        """
        if self.vectorstore is None:
            raise ValueError("Vector store not initialized. Load or create first.")

        print("Setting up QA chain...")

        # Custom prompt
        prompt_template = """Use the following pieces of context to answer the question. If you don't know the answer, say so.

Context:
{context}

Question: {question}

Answer:"""

        PROMPT = PromptTemplate(
            template=prompt_template,
            input_variables=["context", "question"]
        )

        llm = ChatOpenAI(model="gpt-4", temperature=temperature)

        retriever = self.vectorstore.as_retriever(
            search_kwargs={"k": k}
        )

        self.qa_chain = RetrievalQA.from_chain_type(
            llm=llm,
            chain_type="stuff",
            retriever=retriever,
            return_source_documents=True,
            chain_type_kwargs={"prompt": PROMPT}
        )

        print("QA chain ready!")

    def query(self, question: str):
        """
        Ask a question
        """
        if self.qa_chain is None:
            raise ValueError("QA chain not setup. Call setup_qa_chain() first.")

        result = self.qa_chain.invoke({"query": question})

        return {
            "answer": result["result"],
            "sources": [
                {
                    "content": doc.page_content,
                    "metadata": doc.metadata
                }
                for doc in result["source_documents"]
            ]
        }

    def ingest_and_setup(self, source_path: str, source_type: str = "directory"):
        """
        Complete pipeline: load, process, store, and setup
        """
        # 1. Load documents
        documents = self.load_documents(source_path, source_type)

        # 2. Split into chunks
        chunks = self.process_documents(documents)

        # 3. Create vector store
        self.create_vectorstore(chunks)

        # 4. Setup QA chain
        self.setup_qa_chain()

        print("\n✅ RAG system ready!")


# ==================== USAGE ====================

# Initialize RAG system
rag = RAGSystem(
    api_key="your-api-key",
    persist_directory="./my_knowledge_base"
)

# Option 1: First time - ingest documents
rag.ingest_and_setup(
    source_path="./company_docs/",
    source_type="directory"
)

# Option 2: Later - load existing vector store
# rag.load_vectorstore()
# rag.setup_qa_chain()

# Ask questions
questions = [
    "What is the company's refund policy?",
    "How long does shipping take?",
    "Do you offer customer support?",
]

for question in questions:
    print(f"\n{'='*60}")
    print(f"Q: {question}")
    print('='*60)

    result = rag.query(question)

    print(f"\nA: {result['answer']}\n")
    print("Sources:")
    for i, source in enumerate(result['sources'], 1):
        metadata = source['metadata']
        print(f"{i}. {metadata.get('source', 'Unknown')} (page {metadata.get('page', 'N/A')})")

Complete System: This RAGSystem class provides a production-ready foundation. You can easily extend it with additional features like streaming, async queries, or multiple vector stores.

Optimization Techniques

1. Caching

python
from functools import lru_cache

@lru_cache(maxsize=100)
def cached_query(question: str):
    """Cache recent queries to avoid redundant processing"""
    return rag.query(question)

2. Streaming Responses

python
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

def create_streaming_chain(retriever):
    """Stream responses for better UX"""
    llm = ChatOpenAI(
        model="gpt-4",
        temperature=0,
        streaming=True,
        callbacks=[StreamingStdOutCallbackHandler()]
    )

    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        retriever=retriever,
        return_source_documents=True
    )

    return qa_chain

3. Async Processing

python
import asyncio
from langchain_openai import ChatOpenAI

async def async_query(chain, question):
    """Process queries asynchronously"""
    result = await chain.ainvoke({"query": question})
    return result

# Multiple queries in parallel
async def process_multiple_queries(questions):
    tasks = [async_query(qa_chain, q) for q in questions]
    results = await asyncio.gather(*tasks)
    return results

# Usage
questions = ["Question 1", "Question 2", "Question 3"]
results = asyncio.run(process_multiple_queries(questions))

Common Pitfalls and Solutions

1. Chunk Size Too Large/Small

python
# ❌ Problem: Large chunks = diluted relevance
# (Note: chunk size is set on the splitter, not passed to split_documents)
splitter = RecursiveCharacterTextSplitter(chunk_size=5000)
chunks = splitter.split_documents(docs)

# ✅ Solution: Optimal chunk size with overlap
splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
chunks = splitter.split_documents(docs)

2. No Source Attribution

python
# ❌ Problem: Can't verify answers
qa_chain = RetrievalQA.from_chain_type(llm, retriever=retriever)

# ✅ Solution: Always return sources
qa_chain = RetrievalQA.from_chain_type(
    llm,
    retriever=retriever,
    return_source_documents=True  # ✅
)

3. Poor Retrieval

python
# ❌ Problem: Only retrieving 1 document
retriever = vectorstore.as_retriever(search_kwargs={"k": 1})

# ✅ Solution: Retrieve more for context
retriever = vectorstore.as_retriever(search_kwargs={"k": 4})

Summary

You've built a complete RAG system! The key components are:

  1. Document Loading: Support multiple formats (PDF, text, web)
  2. Chunking: Split intelligently with overlap
  3. Embeddings: Convert text to semantic vectors
  4. Vector Store: Enable fast similarity search
  5. Retrieval: Find relevant context
  6. Generation: LLM creates answers from context

This foundation scales from prototypes to production systems.