Skip to main content

Building a RAG Pipeline with LangChain and ChromaDB

AI-Generated Content

AI-generated content may contain errors. Always verify against official sources.

By the end of this tutorial you will have a working Retrieval-Augmented Generation (RAG) pipeline that ingests your own documents, stores their embeddings in ChromaDB, and answers natural-language questions with cited sources — all streamed token-by-token to the terminal.

What You'll Learn

  • Why RAG exists and when to use it over fine-tuning
  • How to chunk and embed documents with LangChain text splitters
  • How to persist and query a local ChromaDB vector store
  • How to wire a retrieval chain that cites its sources
  • How to stream responses and handle rate-limit retries
  • How to count tokens and estimate API costs before every call
  • How to version your prompts in external files
  • How to evaluate retrieval quality with a simple hit-rate metric
Prerequisites
  • Python 3.10 or higher installed
  • An OpenAI API key (or an Anthropic key — we show both)
  • Basic familiarity with the OpenAI Python SDK (openai>=1.0)
  • pip and venv available on your machine
  • A folder of .txt or .pdf documents you want to query (we provide a sample)

1. Project Setup

1.1 Create a Virtual Environment

terminal
$ python -m venv .venv
$ source .venv/bin/activate # Windows: .venv\Scripts\activate

1.2 Install Dependencies

Pin every version so the tutorial stays reproducible:

terminal
$ pip install \
langchain==0.2.16 \
langchain-openai==0.1.23 \
langchain-anthropic==0.1.23 \
langchain-community==0.2.16 \
chromadb==0.5.5 \
tiktoken==0.7.0 \
pypdf==4.3.1 \
tenacity==8.5.0 \
python-dotenv==1.0.1
tip

Run pip freeze > requirements.txt after installation to lock your environment for collaborators.

1.3 Configure Environment Variables

Never Hardcode API Keys

Committing an API key to Git — even for a second — can result in immediate compromise. Always load secrets from the environment.

Create a .env file at the project root:

.env
OPENAI_API_KEY=sk-...          # replace with your key
ANTHROPIC_API_KEY=sk-ant-... # optional: only needed for the Anthropic tab
CHROMA_PERSIST_DIR=./chroma_db

Add .env to .gitignore immediately:

terminal
$ echo ".env" >> .gitignore

1.4 Project Structure

rag-demo/
├── .env
├── .gitignore
├── requirements.txt
├── docs/ ← drop your .txt / .pdf files here
│ └── sample.txt
├── prompts/
│ └── rag_prompt.txt ← versioned prompt template
├── main.py
└── evaluate.py

Create the folders:

terminal
$ mkdir -p docs prompts

2. Understanding RAG

Before writing code, here's the mental model you need:

User Question


[Embeddings Model] ──► Query Vector


[ChromaDB Vector Store]

top-k similar chunks


[LLM] ◄── Prompt = Question + Chunks


Grounded Answer + Sources

RAG solves the knowledge cut-off and hallucination problems by grounding the LLM in your documents at inference time — no fine-tuning required.

Why not fine-tuning?

Fine-tuning bakes knowledge into weights permanently; you'd need to retrain whenever documents change. RAG lets you update the knowledge base by re-ingesting documents, which takes seconds.


3. Document Ingestion

3.1 Add a Sample Document

terminal
$ cat > docs/sample.txt << 'EOF'
LangChain is an open-source framework for building LLM-powered applications.
It provides abstractions for chains, agents, memory, and retrieval.
ChromaDB is an AI-native open-source vector database that stores and retrieves
embeddings with millisecond latency. Together they form the backbone of many
production RAG systems. LangChain 0.2 introduced a stable LCEL (LangChain
Expression Language) interface that composes chains using the pipe operator (|).
EOF

3.2 Load and Split Documents

src/ingest.py
import os
from dotenv import load_dotenv
from langchain_community.document_loaders import TextLoader, PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma

load_dotenv()

DOCS_DIR = "docs"
CHROMA_DIR = os.getenv("CHROMA_PERSIST_DIR", "./chroma_db")

def load_documents(directory: str):
"""Load .txt and .pdf files from a directory."""
docs = []
for fname in os.listdir(directory):
path = os.path.join(directory, fname)
if fname.endswith(".txt"):
docs.extend(TextLoader(path, encoding="utf-8").load())
elif fname.endswith(".pdf"):
docs.extend(PyPDFLoader(path).load())
print(f"Loaded {len(docs)} document(s) from '{directory}'")
return docs

def split_documents(docs, chunk_size=500, chunk_overlap=50):
"""Split documents into overlapping chunks for better retrieval."""
splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len,
)
chunks = splitter.split_documents(docs)
print(f"Split into {len(chunks)} chunk(s)")
return chunks

def build_vectorstore(chunks):
"""Embed chunks and persist them to ChromaDB."""
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Chroma.from_documents(
documents=chunks,
embedding=embeddings,
persist_directory=CHROMA_DIR,
collection_name="rag_demo",
)
print(f"Vector store persisted at '{CHROMA_DIR}'")
return vectorstore

if __name__ == "__main__":
raw_docs = load_documents(DOCS_DIR)
chunks = split_documents(raw_docs)
build_vectorstore(chunks)

What this does: RecursiveCharacterTextSplitter tries to split on paragraph breaks first, then sentences, then words — keeping semantic coherence. A chunk_overlap of 50 characters ensures context isn't lost at boundaries.

Run ingestion:

terminal
$ python src/ingest.py

Expected output:

Loaded 1 document(s) from 'docs'
Split into 3 chunk(s)
Vector store persisted at './chroma_db'
Chunk Size Matters

Too large (>1000 chars) → less precise retrieval; too small (<100 chars) → chunks lose context. Start at 400–600 and tune based on your documents.


4. Prompt Versioning

Store your prompt in a file — never inline it in Python. This lets you A/B test prompts without touching code.

prompts/rag_prompt.txt
You are a helpful assistant. Use ONLY the context below to answer the question.
If the answer is not contained in the context, say "I don't have enough information."
Always cite the source filename at the end of your answer.

Context:
{context}

Question: {question}

Answer:

Load it in Python:

src/prompt_loader.py
from pathlib import Path

def load_prompt(name: str) -> str:
"""Load a prompt template from the prompts/ directory."""
path = Path("prompts") / name
return path.read_text(encoding="utf-8")
Prompt Versioning Strategy

Commit prompts/ to Git and tag releases (e.g., rag_prompt_v2.txt). Link prompt version to evaluation scores in your experiment tracker.


5. Building the Retrieval Chain

5.1 Token Counting and Cost Estimation

Always estimate cost before sending a request — especially during development:

src/token_utils.py
import tiktoken

PRICE_PER_1K = {
"gpt-4o": {"input": 0.005, "output": 0.015},
"gpt-4o-mini": {"input": 0.00015,"output": 0.0006},
"text-embedding-3-small":{"input": 0.00002,"output": 0.0},
}

def count_tokens(text: str, model: str = "gpt-4o-mini") -> int:
enc = tiktoken.encoding_for_model(model)
return len(enc.encode(text))

def estimate_cost(prompt: str, model: str = "gpt-4o-mini") -> float:
n_tokens = count_tokens(prompt, model)
price = PRICE_PER_1K.get(model, {}).get("input", 0)
cost_usd = (n_tokens / 1000) * price
print(f" ↳ ~{n_tokens} tokens | estimated input cost: ${cost_usd:.6f}")
return cost_usd

5.2 Rate-Limit Retry Decorator

src/retry_utils.py
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
)
from openai import RateLimitError

@retry(retry=retry_if_exception_type(RateLimitError),
wait=wait_exponential(multiplier=1, min=2, max=60),
stop=stop_after_attempt(5))
def call_with_retry(chain, inputs: dict):
"""Invoke a LangChain chain with automatic exponential back-off."""
return chain.invoke(inputs)

This decorator retries up to 5 times with exponential back-off (2 s → 4 s → 8 s … up to 60 s) whenever OpenAI returns a 429.

5.3 The RAG Chain

src/rag_chain.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

from src.prompt_loader import load_prompt
from src.token_utils import estimate_cost

load_dotenv()

CHROMA_DIR = os.getenv("CHROMA_PERSIST_DIR", "./chroma_db")

def format_docs(docs) -> str:
parts = []
for d in docs:
source = d.metadata.get("source", "unknown")
parts.append(f"[{source}]\n{d.page_content}")
return "\n\n---\n\n".join(parts)

def build_rag_chain(model: str = "gpt-4o-mini", top_k: int = 4):
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Chroma(
persist_directory=CHROMA_DIR,
embedding_function=embeddings,
collection_name="rag_demo",
)
retriever = vectorstore.as_retriever(search_kwargs={"k": top_k})

template = load_prompt("rag_prompt.txt")
prompt = PromptTemplate.from_template(template)

llm = ChatOpenAI(
model=model,
temperature=0,
streaming=True, # ← enable streaming
)

chain = (
{"context": retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
return chain, retriever

What this does: The LCEL pipe (|) composes a DAG: the retriever fetches the top-k chunks, format_docs serialises them with source citations, the prompt template injects them, and the LLM streams the answer character by character.


6. Streaming the Answer

main.py
import sys
from src.rag_chain import build_rag_chain
from src.token_utils import estimate_cost
from src.retry_utils import call_with_retry

def main():
question = " ".join(sys.argv[1:]) or "What is LangChain?"

print(f"\n🔍 Question: {question}\n")
chain, retriever = build_rag_chain(model="gpt-4o-mini", top_k=4)

# Retrieve and cost-estimate before streaming
docs = retriever.invoke(question)
context_text = "\n".join(d.page_content for d in docs)
estimate_cost(context_text + question, model="gpt-4o-mini")

print("💬 Answer (streaming):\n")
for token in chain.stream(question):
print(token, end="", flush=True)
print("\n\n📄 Sources:")
for d in docs:
print(f" • {d.metadata.get('source', 'unknown')}")

if __name__ == "__main__":
main()

Run it:

terminal
$ python main.py "What is LCEL in LangChain?"

Expected output:

🔍 Question: What is LCEL in LangChain?

↳ ~312 tokens | estimated input cost: $0.000047

💬 Answer (streaming):

LCEL (LangChain Expression Language) is a stable interface introduced in
LangChain 0.2 that composes chains using the pipe operator (|). It allows
you to chain components declaratively... [source: docs/sample.txt]

📄 Sources:
• docs/sample.txt
Streaming in Web Apps

When serving RAG over HTTP, use FastAPI's StreamingResponse with an async for token in chain.astream(question) generator to forward tokens to the browser in real time.


7. Evaluating Retrieval Quality

A pipeline that retrieves wrong chunks will hallucinate even with a perfect LLM. Measure hit rate — the fraction of questions for which the correct chunk appears in the top-k results.

evaluate.py
"""
Simple hit-rate evaluator.
Requires a gold_set.json file:
[{"question": "...", "expected_source": "docs/sample.txt"}, ...]
"""
import json
from src.rag_chain import build_rag_chain

def hit_rate(gold_path: str = "gold_set.json", top_k: int = 4) -> float:
with open(gold_path) as f:
gold = json.load(f)

_, retriever = build_rag_chain(top_k=top_k)
hits = 0
for item in gold:
docs = retriever.invoke(item["question"])
sources = [d.metadata.get("source", "") for d in docs]
if any(item["expected_source"] in s for s in sources):
hits += 1

rate = hits / len(gold)
print(f"Hit rate @{top_k}: {rate:.2%} ({hits}/{len(gold)})")
return rate

if __name__ == "__main__":
hit_rate()

Create a minimal gold set:

gold_set.json
[
{
"question": "What pipe operator does LCEL use?",
"expected_source": "docs/sample.txt"
}
]
terminal
$ python evaluate.py
Hit rate @4: 100.00% (1/1)
Scaling Evaluation

For production, consider RAGAS which measures faithfulness, answer relevancy, and context precision automatically using an LLM-as-judge approach.


8. Troubleshooting

🛑 `AuthenticationError: Incorrect API key`
  1. Confirm .env is in the project root, not inside src/.
  2. Verify load_dotenv() is called before any LangChain import that reads env vars.
  3. Check for invisible characters: cat -A .env | grep OPENAI.
🛑 `InvalidRequestError: This model's maximum context length is 16385 tokens`

Your retrieved chunks plus the prompt exceed the model's context window.

  • Reduce top_k from 4 to 2.
  • Reduce chunk_size in the splitter.
  • Switch to gpt-4o (128 k context) for large-document use cases.
🛑 `chromadb.errors.UniqueConstraintError` on re-ingestion

ChromaDB already has documents with the same IDs. Either:

  • Delete the persist directory: rm -rf ./chroma_db and re-run ingestion, or
  • Use Chroma.from_documents(..., ids=[...]) with content-hashed IDs for idempotent upserts.
🛑 Answers are vague or hallucinated
  1. Print the retrieved chunks: for d in retriever.invoke(q): print(d.page_content).
  2. If chunks look irrelevant, your chunk_size is too large — split more aggressively.
  3. If chunks are correct but the answer is wrong, tighten the prompt: add "Do NOT speculate. Only use the provided context."
🛑 `RateLimitError` even with retry logic
  • Check your OpenAI tier — free-tier accounts have very low RPM (requests per minute).
  • Add time.sleep(1) between batch embedding calls during ingestion.
  • Consider batching documents: embeddings.embed_documents(texts, chunk_size=100).

9. Next Steps

You now have a working RAG pipeline. Here's where to take it next:

TopicLink
Add conversational memory (multi-turn RAG)Advanced RAG
Serve your pipeline as a REST APIAPI Gateway & Rate Limiting
Re-rank retrieved chunks with a cross-encoderAdvanced RAG
Evaluate with RAGAS metricsAutomated Metrics
Hybrid search (BM25 + vector)Advanced RAG

📋 Complete Code
main.py
"""
RAG Pipeline — LangChain 0.2 + ChromaDB
Run: python main.py "Your question here"
"""
import os
import sys
from pathlib import Path
from dotenv import load_dotenv

from langchain_community.document_loaders import TextLoader, PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from openai import RateLimitError
import tiktoken

load_dotenv()

DOCS_DIR = "docs"
CHROMA_DIR = os.getenv("CHROMA_PERSIST_DIR", "./chroma_db")
PROMPT_FILE = Path("prompts/rag_prompt.txt")

# ── Token utilities ──────────────────────────────────────────────────────────
def count_tokens(text: str, model: str = "gpt-4o-mini") -> int:
enc = tiktoken.encoding_for_model(model)
return len(enc.encode(text))

def estimate_cost(prompt: str, model: str = "gpt-4o-mini") -> None:
price_map = {"gpt-4o-mini": 0.00015, "gpt-4o": 0.005}
n = count_tokens(prompt, model)
cost = (n / 1000) * price_map.get(model, 0)
print(f" ↳ ~{n} tokens | estimated input cost: ${cost:.6f}")

# ── Ingestion ────────────────────────────────────────────────────────────────
def ingest():
docs = []
for fname in os.listdir(DOCS_DIR):
path = os.path.join(DOCS_DIR, fname)
if fname.endswith(".txt"):
docs.extend(TextLoader(path, encoding="utf-8").load())
elif fname.endswith(".pdf"):
docs.extend(PyPDFLoader(path).load())

splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
chunks = splitter.split_documents(docs)

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Chroma.from_documents(
documents=chunks,
embedding=embeddings,
persist_directory=CHROMA_DIR,
collection_name="rag_demo",
)
print(f"Ingested {len(chunks)} chunks into '{CHROMA_DIR}'")
return vectorstore

# ── Chain ────────────────────────────────────────────────────────────────────
def format_docs(docs) -> str:
return "\n\n---\n\n".join(
f"[{d.metadata.get('source','?')}]\n{d.page_content}" for d in docs
)

def build_chain(model="gpt-4o-mini", top_k=4):
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Chroma(
persist_directory=CHROMA_DIR,
embedding_function=embeddings,
collection_name="rag_demo",
)
retriever = vectorstore.as_retriever(search_kwargs={"k": top_k})
template = PROMPT_FILE.read_text(encoding="utf-8")
prompt = PromptTemplate.from_template(template)
llm = ChatOpenAI(model=model, temperature=0, streaming=True)
chain = (
{"context": retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
return chain, retriever

# ── Retry wrapper ─────────────────────────────────────────────────────────────
@retry(retry=retry_if_exception_type(RateLimitError),
wait=wait_exponential(multiplier=1, min=2, max=60),
stop=stop_after_attempt(5))
def ask(chain, retriever, question: str):
docs = retriever.invoke(question)
context_text = " ".join(d.page_content for d in docs)
estimate_cost(context_text + question)
print(f"\n💬 Answer (streaming):\n")
for token in chain.stream(question):
print(token, end="", flush=True)
print("\n\n📄 Sources:")
for d in docs:
print(f" • {d.metadata.get('source', 'unknown')}")

# ── Entry point ───────────────────────────────────────────────────────────────
if __name__ == "__main__":
if not os.path.exists(CHROMA_DIR):
print("No vector store found — running ingestion first…")
ingest()

question = " ".join(sys.argv[1:]) or "What is LangChain?"
print(f"\n🔍 Question: {question}")
chain, retriever = build_chain()
ask(chain, retriever, question)