Building a RAG Pipeline with LangChain and ChromaDB
By the end of this tutorial you will have a working Retrieval-Augmented Generation (RAG) pipeline that ingests your own documents, stores their embeddings in ChromaDB, and answers natural-language questions with cited sources — all streamed token-by-token to the terminal.
What You'll Learn
- Why RAG exists and when to use it over fine-tuning
- How to chunk and embed documents with LangChain text splitters
- How to persist and query a local ChromaDB vector store
- How to wire a retrieval chain that cites its sources
- How to stream responses and handle rate-limit retries
- How to count tokens and estimate API costs before every call
- How to version your prompts in external files
- How to evaluate retrieval quality with a simple hit-rate metric
Prerequisites
- Python 3.10 or higher installed
- An OpenAI API key (or an Anthropic key — we show both)
- Basic familiarity with the OpenAI Python SDK (openai>=1.0)
- pip and venv available on your machine
- A folder of .txt or .pdf documents you want to query (we provide a sample)
1. Project Setup
1.1 Create a Virtual Environment
$ python -m venv .venv
$ source .venv/bin/activate # Windows: .venv\Scripts\activate
1.2 Install Dependencies
Pin every version so the tutorial stays reproducible:
$ pip install \
langchain==0.2.16 \
langchain-openai==0.1.23 \
langchain-anthropic==0.1.23 \
langchain-community==0.2.16 \
chromadb==0.5.5 \
tiktoken==0.7.0 \
pypdf==4.3.1 \
tenacity==8.5.0 \
python-dotenv==1.0.1
Run pip freeze > requirements.txt after installation to lock your environment for collaborators.
1.3 Configure Environment Variables
Committing an API key to Git — even for a second — can result in immediate compromise. Always load secrets from the environment.
Create a .env file at the project root:
OPENAI_API_KEY=sk-... # replace with your key
ANTHROPIC_API_KEY=sk-ant-... # optional: only needed for the Anthropic tab
CHROMA_PERSIST_DIR=./chroma_db
Add .env to .gitignore immediately:
$ echo ".env" >> .gitignore
1.4 Project Structure
rag-demo/
├── .env
├── .gitignore
├── requirements.txt
├── docs/ ← drop your .txt / .pdf files here
│ └── sample.txt
├── prompts/
│ └── rag_prompt.txt ← versioned prompt template
├── src/
│ ├── ingest.py
│ ├── prompt_loader.py
│ ├── token_utils.py
│ ├── retry_utils.py
│ └── rag_chain.py
├── main.py
└── evaluate.py
Create the folders:
$ mkdir -p docs prompts src
2. Understanding RAG
Before writing code, here's the mental model you need:
User Question
│
▼
[Embeddings Model] ──► Query Vector
│
▼
[ChromaDB Vector Store]
│
top-k similar chunks
│
▼
[LLM] ◄── Prompt = Question + Chunks
│
▼
Grounded Answer + Sources
RAG solves the knowledge cut-off and hallucination problems by grounding the LLM in your documents at inference time — no fine-tuning required.
Fine-tuning bakes knowledge into weights permanently; you'd need to retrain whenever documents change. RAG lets you update the knowledge base by re-ingesting documents, which takes seconds.
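To make that loop concrete, here is a toy sketch of the retrieve-then-prompt flow in plain Python. The word-overlap score is a deliberately crude stand-in for embedding similarity, and the final LLM call is omitted:

```python
import re

def score(query: str, chunk: str) -> float:
    """Crude stand-in for cosine similarity between embeddings."""
    q = set(re.findall(r"\w+", query.lower()))
    c = set(re.findall(r"\w+", chunk.lower()))
    return len(q & c) / (len(q) or 1)

def retrieve(query: str, chunks: list[str], k: int = 2) -> list[str]:
    """Return the top-k chunks most similar to the query."""
    return sorted(chunks, key=lambda ch: score(query, ch), reverse=True)[:k]

def build_prompt(query: str, context: list[str]) -> str:
    """Ground the question in the retrieved chunks: the heart of RAG."""
    return "Context:\n" + "\n---\n".join(context) + f"\n\nQuestion: {query}"

chunks = [
    "LangChain composes LLM chains with the pipe operator.",
    "ChromaDB stores embeddings and retrieves them by similarity.",
    "Bananas are rich in potassium.",
]
question = "How does ChromaDB retrieve embeddings?"
print(build_prompt(question, retrieve(question, chunks)))
```

Real embeddings capture meaning rather than shared words, but the shape of the pipeline — score, rank, take top-k, inject into the prompt — is exactly what the LangChain chain below automates.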
3. Document Ingestion
3.1 Add a Sample Document
$ cat > docs/sample.txt << 'EOF'
LangChain is an open-source framework for building LLM-powered applications.
It provides abstractions for chains, agents, memory, and retrieval.
ChromaDB is an AI-native open-source vector database that stores and retrieves
embeddings with millisecond latency. Together they form the backbone of many
production RAG systems. LangChain 0.2 introduced a stable LCEL (LangChain
Expression Language) interface that composes chains using the pipe operator (|).
EOF
3.2 Load and Split Documents
Create src/ingest.py:
import os
from dotenv import load_dotenv
from langchain_community.document_loaders import TextLoader, PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
load_dotenv()
DOCS_DIR = "docs"
CHROMA_DIR = os.getenv("CHROMA_PERSIST_DIR", "./chroma_db")
def load_documents(directory: str):
"""Load .txt and .pdf files from a directory."""
docs = []
for fname in os.listdir(directory):
path = os.path.join(directory, fname)
if fname.endswith(".txt"):
docs.extend(TextLoader(path, encoding="utf-8").load())
elif fname.endswith(".pdf"):
docs.extend(PyPDFLoader(path).load())
print(f"Loaded {len(docs)} document(s) from '{directory}'")
return docs
def split_documents(docs, chunk_size=500, chunk_overlap=50):
"""Split documents into overlapping chunks for better retrieval."""
splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len,
)
chunks = splitter.split_documents(docs)
print(f"Split into {len(chunks)} chunk(s)")
return chunks
def build_vectorstore(chunks):
"""Embed chunks and persist them to ChromaDB."""
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Chroma.from_documents(
documents=chunks,
embedding=embeddings,
persist_directory=CHROMA_DIR,
collection_name="rag_demo",
)
print(f"Vector store persisted at '{CHROMA_DIR}'")
return vectorstore
if __name__ == "__main__":
raw_docs = load_documents(DOCS_DIR)
chunks = split_documents(raw_docs)
build_vectorstore(chunks)
What this does: RecursiveCharacterTextSplitter tries to split on paragraph breaks first, then sentences, then words — keeping semantic coherence. A chunk_overlap of 50 characters ensures context isn't lost at boundaries.
Run ingestion:
$ python src/ingest.py
Expected output:
Loaded 1 document(s) from 'docs'
Split into 3 chunk(s)
Vector store persisted at './chroma_db'
Chunk-size tuning: too large (>1000 chars) → less precise retrieval; too small (<100 chars) → chunks lose context. Start at 400–600 characters and tune based on your documents.
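To see what overlap buys you, here is a deliberately naive fixed-window splitter (the real RecursiveCharacterTextSplitter additionally prefers paragraph and sentence boundaries):

```python
def split_with_overlap(text: str, chunk_size: int = 20, chunk_overlap: int = 5) -> list[str]:
    """Fixed-window splitter: each chunk starts chunk_size - chunk_overlap
    characters after the previous one, so boundary text appears in both."""
    step = chunk_size - chunk_overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

text = "LangChain splits documents into overlapping chunks."
for chunk in split_with_overlap(text):
    print(repr(chunk))
```

Note how the last few characters of each chunk reappear at the start of the next one — that repetition is what keeps a sentence that straddles a boundary retrievable from at least one chunk.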
4. Prompt Versioning
Store your prompt in a file — never inline it in Python. This lets you A/B test prompts without touching code. Create prompts/rag_prompt.txt with the following template:
You are a helpful assistant. Use ONLY the context below to answer the question.
If the answer is not contained in the context, say "I don't have enough information."
Always cite the source filename at the end of your answer.
Context:
{context}
Question: {question}
Answer:
Load it in Python (this becomes src/prompt_loader.py):
from pathlib import Path
def load_prompt(name: str) -> str:
"""Load a prompt template from the prompts/ directory."""
path = Path("prompts") / name
return path.read_text(encoding="utf-8")
Commit prompts/ to Git and tag releases (e.g., rag_prompt_v2.txt). Link prompt version to evaluation scores in your experiment tracker.
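One way to wire that up: a hypothetical load_versioned_prompt helper (not part of this tutorial's pipeline) that prefers a versioned file and falls back to the base prompt:

```python
from pathlib import Path

def load_versioned_prompt(name: str, version: int, prompts_dir: str = "prompts") -> str:
    """Prefer prompts/<name>_v<version>.txt; fall back to prompts/<name>.txt.
    Keeping each version as a committed file makes A/B tests reproducible."""
    versioned = Path(prompts_dir) / f"{name}_v{version}.txt"
    if versioned.exists():
        return versioned.read_text(encoding="utf-8")
    return (Path(prompts_dir) / f"{name}.txt").read_text(encoding="utf-8")
```

Because the version lives in the filename, `git log prompts/` gives you a full audit trail of every prompt change alongside the evaluation scores it produced.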
5. Building the Retrieval Chain
5.1 Token Counting and Cost Estimation
Always estimate cost before sending a request — especially during development. Create src/token_utils.py:
import tiktoken
PRICE_PER_1K = {
    "gpt-4o": {"input": 0.005, "output": 0.015},
    "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
    "text-embedding-3-small": {"input": 0.00002, "output": 0.0},
}
def count_tokens(text: str, model: str = "gpt-4o-mini") -> int:
enc = tiktoken.encoding_for_model(model)
return len(enc.encode(text))
def estimate_cost(prompt: str, model: str = "gpt-4o-mini") -> float:
n_tokens = count_tokens(prompt, model)
price = PRICE_PER_1K.get(model, {}).get("input", 0)
cost_usd = (n_tokens / 1000) * price
print(f" ↳ ~{n_tokens} tokens | estimated input cost: ${cost_usd:.6f}")
return cost_usd
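A quick sanity check on the arithmetic, using the 312-token request from the sample run later in this tutorial and the gpt-4o-mini input price from the table above:

```python
# gpt-4o-mini input price used in this tutorial's table; always check
# current pricing before relying on these numbers.
tokens = 312
price_per_1k_usd = 0.00015
cost = tokens / 1000 * price_per_1k_usd
print(f"${cost:.6f}")  # $0.000047 — matches the sample run in section 6
```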
5.2 Rate-Limit Retry Decorator
Create src/retry_utils.py:
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
)
from openai import RateLimitError
@retry(retry=retry_if_exception_type(RateLimitError),
wait=wait_exponential(multiplier=1, min=2, max=60),
stop=stop_after_attempt(5))
def call_with_retry(chain, inputs: dict):
"""Invoke a LangChain chain with automatic exponential back-off."""
return chain.invoke(inputs)
This decorator retries up to 5 times with exponential back-off (2 s → 4 s → 8 s … up to 60 s) whenever OpenAI returns a 429.
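If you want to verify that schedule, here is a rough reimplementation of the wait computation (tenacity's internals may differ slightly between versions; this mirrors the documented multiplier * 2^n behaviour clamped to [min, max]):

```python
def backoff_schedule(multiplier: int = 1, min_s: int = 2, max_s: int = 60, attempts: int = 5) -> list[int]:
    """Waits before retries 2..attempts: multiplier * 2**n, clamped to [min_s, max_s]."""
    return [max(min_s, min(max_s, multiplier * 2 ** n)) for n in range(1, attempts)]

print(backoff_schedule())  # [2, 4, 8, 16]
```

With more attempts the waits would keep doubling until they hit the 60-second ceiling, which caps how long a single retry can stall your pipeline.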
5.3 The RAG Chain
Create src/rag_chain.py using your preferred provider:
- OpenAI
- Anthropic
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from src.prompt_loader import load_prompt
from src.token_utils import estimate_cost
load_dotenv()
CHROMA_DIR = os.getenv("CHROMA_PERSIST_DIR", "./chroma_db")
def format_docs(docs) -> str:
parts = []
for d in docs:
source = d.metadata.get("source", "unknown")
parts.append(f"[{source}]\n{d.page_content}")
return "\n\n---\n\n".join(parts)
def build_rag_chain(model: str = "gpt-4o-mini", top_k: int = 4):
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Chroma(
persist_directory=CHROMA_DIR,
embedding_function=embeddings,
collection_name="rag_demo",
)
retriever = vectorstore.as_retriever(search_kwargs={"k": top_k})
template = load_prompt("rag_prompt.txt")
prompt = PromptTemplate.from_template(template)
llm = ChatOpenAI(
model=model,
temperature=0,
streaming=True, # ← enable streaming
)
chain = (
{"context": retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
return chain, retriever
import os
from dotenv import load_dotenv
from langchain_anthropic import ChatAnthropic
from langchain_openai import OpenAIEmbeddings # still use OpenAI for embeddings
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from src.prompt_loader import load_prompt
load_dotenv()
CHROMA_DIR = os.getenv("CHROMA_PERSIST_DIR", "./chroma_db")
def format_docs(docs) -> str:
parts = []
for d in docs:
source = d.metadata.get("source", "unknown")
parts.append(f"[{source}]\n{d.page_content}")
return "\n\n---\n\n".join(parts)
def build_rag_chain_anthropic(model: str = "claude-3-5-haiku-20241022", top_k: int = 4):
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Chroma(
persist_directory=CHROMA_DIR,
embedding_function=embeddings,
collection_name="rag_demo",
)
retriever = vectorstore.as_retriever(search_kwargs={"k": top_k})
template = load_prompt("rag_prompt.txt")
prompt = PromptTemplate.from_template(template)
llm = ChatAnthropic(model=model, temperature=0, streaming=True)
chain = (
{"context": retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
return chain, retriever
What this does: The LCEL pipe (|) composes a DAG: the retriever fetches the top-k chunks, format_docs serialises them with source citations, the prompt template injects them, and the LLM streams the answer token by token.
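If the pipe still feels magical, its core semantics fit in a few lines of plain Python. This toy Step class (not a real LCEL Runnable, which adds streaming, batching, and async on top) shows how | builds a composed pipeline:

```python
class Step:
    """Minimal stand-in for an LCEL Runnable: a | b yields a new Step
    that feeds a's output into b."""
    def __init__(self, fn):
        self.fn = fn
    def invoke(self, x):
        return self.fn(x)
    def __or__(self, other):
        return Step(lambda x: other.invoke(self.invoke(x)))

retrieve = Step(lambda q: f"[docs] {q}")
prompt = Step(lambda ctx: f"Answer using: {ctx}")
chain = retrieve | prompt
print(chain.invoke("what is LCEL?"))  # Answer using: [docs] what is LCEL?
```

Python calls `__or__` when it sees |, so the chain definition reads left-to-right in the same order the data flows.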
6. Streaming the Answer
Create main.py:
import sys
from src.rag_chain import build_rag_chain
from src.token_utils import estimate_cost
from src.retry_utils import call_with_retry
def main():
question = " ".join(sys.argv[1:]) or "What is LangChain?"
print(f"\n🔍 Question: {question}\n")
chain, retriever = build_rag_chain(model="gpt-4o-mini", top_k=4)
# Retrieve and cost-estimate before streaming
docs = retriever.invoke(question)
context_text = "\n".join(d.page_content for d in docs)
estimate_cost(context_text + question, model="gpt-4o-mini")
print("💬 Answer (streaming):\n")
for token in chain.stream(question):
print(token, end="", flush=True)
print("\n\n📄 Sources:")
for d in docs:
print(f" • {d.metadata.get('source', 'unknown')}")
if __name__ == "__main__":
main()
Run it:
$ python main.py "What is LCEL in LangChain?"
Expected output:
🔍 Question: What is LCEL in LangChain?
↳ ~312 tokens | estimated input cost: $0.000047
💬 Answer (streaming):
LCEL (LangChain Expression Language) is a stable interface introduced in
LangChain 0.2 that composes chains using the pipe operator (|). It allows
you to chain components declaratively... [source: docs/sample.txt]
📄 Sources:
• docs/sample.txt
When serving RAG over HTTP, use FastAPI's StreamingResponse with an async for token in chain.astream(question) generator to forward tokens to the browser in real time.
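The FastAPI side is out of scope here, but the generator pattern itself can be sketched with a stubbed astream (fake_astream and token_stream are illustrative names, not LangChain APIs):

```python
import asyncio

async def fake_astream(question: str):
    """Stub for chain.astream: yields tokens one at a time."""
    for tok in ["LCEL ", "composes ", "chains."]:
        await asyncio.sleep(0)  # a real chain awaits network I/O here
        yield tok

async def token_stream(question: str):
    """The async generator you would hand to FastAPI's StreamingResponse."""
    async for tok in fake_astream(question):
        yield tok

async def main() -> str:
    out = "".join([t async for t in token_stream("What is LCEL?")])
    print(out)
    return out

result = asyncio.run(main())
```

Because each token is yielded as soon as it arrives, the browser starts rendering the answer before generation finishes — the same effect `chain.stream` gives you in the terminal.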
7. Evaluating Retrieval Quality
A pipeline that retrieves wrong chunks will hallucinate even with a perfect LLM. Measure hit rate — the fraction of questions for which the correct chunk appears in the top-k results.
"""
Simple hit-rate evaluator.
Requires a gold_set.json file:
[{"question": "...", "expected_source": "docs/sample.txt"}, ...]
"""
import json
from src.rag_chain import build_rag_chain
def hit_rate(gold_path: str = "gold_set.json", top_k: int = 4) -> float:
with open(gold_path) as f:
gold = json.load(f)
_, retriever = build_rag_chain(top_k=top_k)
hits = 0
for item in gold:
docs = retriever.invoke(item["question"])
sources = [d.metadata.get("source", "") for d in docs]
if any(item["expected_source"] in s for s in sources):
hits += 1
rate = hits / len(gold)
print(f"Hit rate @{top_k}: {rate:.2%} ({hits}/{len(gold)})")
return rate
if __name__ == "__main__":
hit_rate()
Create a minimal gold_set.json:
[
{
"question": "What pipe operator does LCEL use?",
"expected_source": "docs/sample.txt"
}
]
$ python evaluate.py
Hit rate @4: 100.00% (1/1)
For production, consider RAGAS, which measures faithfulness, answer relevancy, and context precision automatically using an LLM-as-judge approach.
8. Troubleshooting
🛑 `AuthenticationError: Incorrect API key`
- Confirm .env is in the project root, not inside src/.
- Verify load_dotenv() is called before any LangChain import that reads env vars.
- Check for invisible characters: cat -A .env | grep OPENAI.
🛑 `InvalidRequestError: This model's maximum context length is 16385 tokens`
Your retrieved chunks plus the prompt exceed the model's context window.
- Reduce top_k from 4 to 2.
- Reduce chunk_size in the splitter.
- Switch to gpt-4o (128k context) for large-document use cases.
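A blunt but effective guard is to trim retrieved chunks to a token budget before building the prompt. This sketch uses a rough chars-to-tokens heuristic (about 4 characters per English token); use tiktoken for exact counts:

```python
def fit_to_budget(chunks: list[str], budget_tokens: int, tokens_per_char: float = 0.25) -> list[str]:
    """Keep chunks in retrieval order, dropping the tail once the rough
    token estimate would exceed the budget."""
    kept, used = [], 0
    for c in chunks:
        est = int(len(c) * tokens_per_char)
        if used + est > budget_tokens:
            break
        kept.append(c)
        used += est
    return kept

# Three 400-char chunks (~100 tokens each) against a 250-token budget:
print(len(fit_to_budget(["a" * 400, "b" * 400, "c" * 400], budget_tokens=250)))  # 2
```

Dropping from the tail keeps the highest-ranked chunks, which the retriever already ordered by similarity.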
🛑 `chromadb.errors.UniqueConstraintError` on re-ingestion
ChromaDB already has documents with the same IDs. Either:
- Delete the persist directory (rm -rf ./chroma_db) and re-run ingestion, or
- Use Chroma.from_documents(..., ids=[...]) with content-hashed IDs for idempotent upserts.
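A sketch of the content-hashed-ID approach using hashlib (the chunk_id helper is illustrative, not a LangChain API):

```python
import hashlib

def chunk_id(text: str, source: str) -> str:
    """Deterministic ID derived from chunk content and source file, so
    re-running ingestion produces the same IDs instead of colliding."""
    return hashlib.sha256(f"{source}:{text}".encode("utf-8")).hexdigest()[:16]

# Usage inside your ingestion step (chunks as produced by the splitter):
# ids = [chunk_id(c.page_content, c.metadata.get("source", "")) for c in chunks]
# Chroma.from_documents(documents=chunks, embedding=embeddings, ids=ids, ...)
print(chunk_id("LangChain is a framework.", "docs/sample.txt"))
```

Identical content always hashes to the same ID, so re-ingesting unchanged documents is a no-op rather than an error.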
🛑 Answers are vague or hallucinated
- Print the retrieved chunks: for d in retriever.invoke(q): print(d.page_content).
- If chunks look irrelevant, your chunk_size is too large; split more aggressively.
- If chunks are correct but the answer is wrong, tighten the prompt: add "Do NOT speculate. Only use the provided context."
🛑 `RateLimitError` even with retry logic
- Check your OpenAI tier — free-tier accounts have very low RPM (requests per minute).
- Add time.sleep(1) between batch embedding calls during ingestion.
- Consider batching documents: embeddings.embed_documents(texts, chunk_size=100).
9. Next Steps
You now have a working RAG pipeline. Here's where to take it next:
| Topic | Link |
|---|---|
| Add conversational memory (multi-turn RAG) | Advanced RAG |
| Serve your pipeline as a REST API | API Gateway & Rate Limiting |
| Re-rank retrieved chunks with a cross-encoder | Advanced RAG |
| Evaluate with RAGAS metrics | Automated Metrics |
| Hybrid search (BM25 + vector) | Advanced RAG |
📋 Complete Code
"""
RAG Pipeline — LangChain 0.2 + ChromaDB
Run: python main.py "Your question here"
"""
import os
import sys
from pathlib import Path
from dotenv import load_dotenv
from langchain_community.document_loaders import TextLoader, PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from openai import RateLimitError
import tiktoken
load_dotenv()
DOCS_DIR = "docs"
CHROMA_DIR = os.getenv("CHROMA_PERSIST_DIR", "./chroma_db")
PROMPT_FILE = Path("prompts/rag_prompt.txt")
# ── Token utilities ──────────────────────────────────────────────────────────
def count_tokens(text: str, model: str = "gpt-4o-mini") -> int:
enc = tiktoken.encoding_for_model(model)
return len(enc.encode(text))
def estimate_cost(prompt: str, model: str = "gpt-4o-mini") -> None:
price_map = {"gpt-4o-mini": 0.00015, "gpt-4o": 0.005}
n = count_tokens(prompt, model)
cost = (n / 1000) * price_map.get(model, 0)
print(f" ↳ ~{n} tokens | estimated input cost: ${cost:.6f}")
# ── Ingestion ────────────────────────────────────────────────────────────────
def ingest():
docs = []
for fname in os.listdir(DOCS_DIR):
path = os.path.join(DOCS_DIR, fname)
if fname.endswith(".txt"):
docs.extend(TextLoader(path, encoding="utf-8").load())
elif fname.endswith(".pdf"):
docs.extend(PyPDFLoader(path).load())
splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
chunks = splitter.split_documents(docs)
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Chroma.from_documents(
documents=chunks,
embedding=embeddings,
persist_directory=CHROMA_DIR,
collection_name="rag_demo",
)
print(f"Ingested {len(chunks)} chunks into '{CHROMA_DIR}'")
return vectorstore
# ── Chain ────────────────────────────────────────────────────────────────────
def format_docs(docs) -> str:
return "\n\n---\n\n".join(
f"[{d.metadata.get('source','?')}]\n{d.page_content}" for d in docs
)
def build_chain(model="gpt-4o-mini", top_k=4):
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Chroma(
persist_directory=CHROMA_DIR,
embedding_function=embeddings,
collection_name="rag_demo",
)
retriever = vectorstore.as_retriever(search_kwargs={"k": top_k})
template = PROMPT_FILE.read_text(encoding="utf-8")
prompt = PromptTemplate.from_template(template)
llm = ChatOpenAI(model=model, temperature=0, streaming=True)
chain = (
{"context": retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
return chain, retriever
# ── Retry wrapper ─────────────────────────────────────────────────────────────
@retry(retry=retry_if_exception_type(RateLimitError),
wait=wait_exponential(multiplier=1, min=2, max=60),
stop=stop_after_attempt(5))
def ask(chain, retriever, question: str):
docs = retriever.invoke(question)
context_text = " ".join(d.page_content for d in docs)
estimate_cost(context_text + question)
print(f"\n💬 Answer (streaming):\n")
for token in chain.stream(question):
print(token, end="", flush=True)
print("\n\n📄 Sources:")
for d in docs:
print(f" • {d.metadata.get('source', 'unknown')}")
# ── Entry point ───────────────────────────────────────────────────────────────
if __name__ == "__main__":
if not os.path.exists(CHROMA_DIR):
print("No vector store found — running ingestion first…")
ingest()
question = " ".join(sys.argv[1:]) or "What is LangChain?"
print(f"\n🔍 Question: {question}")
chain, retriever = build_chain()
ask(chain, retriever, question)