The demo works. You chunked your documents, embedded them, wired up semantic search, and Claude is returning answers grounded in your knowledge base. Your team is excited. Your stakeholders are impressed. And you are approximately 30% of the way to a production system.
I've seen this exact moment play out a dozen times. The demo handles the happy path — the questions your QA person thought to ask during the review meeting. Production handles everything else: the ambiguous queries, the documents that were updated yesterday but whose embeddings are stale, the user who pastes in a 2,000-word question, the edge case where three different knowledge base articles contradict each other. The gap between demo and production is not a feature list. It is a set of architectural decisions about how retrieval, ranking, context assembly, and answer generation work together under real conditions.
This chapter assembles the full pipeline — from multi-source retrieval through ranked context assembly to grounded generation — and shows you where the production-grade engineering decisions live. Most of this code builds directly on the chunking, embedding, and ranking patterns from Chapter 17. The difference is in how you orchestrate them.
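A production RAG system has five stages, and each one has a distinct job:
Ingest. Convert raw documents into chunks with embeddings, and keep them in sync as the documents change.
Retrieve. Find candidate chunks for a query by combining semantic and keyword search.
Rank. Rerank the candidates so the few that actually answer the question rise to the top.
Assemble. Format the top chunks into a structured, source-labeled context block.
Generate. Have Claude answer from that context, and only from that context.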
Each stage is a function. Each function can be tested, measured, and swapped independently. This modularity is the difference between a system you can debug and a system you stare at helplessly when answers go wrong.
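Because each stage is a plain function, you can measure it without touching its internals. A minimal sketch, assuming an in-process dictionary is good enough for timings (a production system would send them to a metrics backend); `timed_stage`, `STAGE_TIMINGS`, and `toy_assemble` are hypothetical names used only for illustration:

```python
import time
from functools import wraps

# Hypothetical per-stage latency log: stage name -> durations in seconds
STAGE_TIMINGS: dict[str, list[float]] = {}


def timed_stage(name: str):
    """Decorator that records how long each call to a pipeline stage takes."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return fn(*args, **kwargs)
            finally:
                # Record the duration even if the stage raised an exception
                STAGE_TIMINGS.setdefault(name, []).append(time.perf_counter() - start)
        return wrapper
    return decorator


@timed_stage("assemble")
def toy_assemble(query: str, chunks: list[str]) -> str:
    # Stand-in for a real stage, used only to show the decorator in action
    return "\n".join(chunks) + f"\nQuestion: {query}"
```

In the real pipeline you would decorate `retrieve`, `rerank`, `assemble_context`, and `generate_answer` the same way, so a slow or failing stage shows up on its own line in your metrics.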
import json
import os
import re
import sqlite3

import numpy as np
import voyageai
from anthropic import Anthropic
from dotenv import load_dotenv
from rank_bm25 import BM25Okapi

# Load API keys from a .env file
load_dotenv()
claude_client = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
voyage_client = voyageai.Client(api_key=os.getenv("VOYAGE_API_KEY"))
Every production RAG system runs through five stages: ingest, retrieve, rank, assemble, generate. When answer quality degrades, diagnose by checking each stage in order. The bug is almost always in the earliest stage you haven't tested recently.
Ingestion is where you convert raw documents into searchable chunks with embeddings. The critical production concern that demos ignore: your documents change. Policies get updated. Product specs evolve. New articles get published. If your embeddings don't reflect the current state of your documents, your retrieval system is answering from a stale snapshot.
def ingest_document(doc_id: str, text: str, db_path: str = "rag.db") -> int:
    """Chunk, embed, and store a document. Replaces any existing version."""
    # Chunk using the recursive strategy
    chunks = recursive_chunks(text, max_size=800)

    # Generate embeddings before touching the database, so a failed API call
    # leaves the previously stored version of the document intact.
    # (Very large documents may need batching to stay within per-request limits.)
    texts = [c["text"] for c in chunks]
    embeddings = (
        voyage_client.embed(texts, model="voyage-3", input_type="document").embeddings
        if texts else []
    )

    conn = sqlite3.connect(db_path)
    try:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS chunks "
            "(doc_id TEXT, chunk_index INTEGER, text TEXT, embedding TEXT)"
        )
        # One transaction: clear the old version and store the new one together
        with conn:
            conn.execute("DELETE FROM chunks WHERE doc_id = ?", (doc_id,))
            conn.executemany(
                "INSERT INTO chunks (doc_id, chunk_index, text, embedding) VALUES (?, ?, ?, ?)",
                [(doc_id, c["index"], c["text"], json.dumps(e))
                 for c, e in zip(chunks, embeddings)],
            )
    finally:
        conn.close()
    return len(chunks)
def recursive_chunks(text: str, max_size: int = 800) -> list[dict]:
    """Split on paragraphs, then on sentences if a paragraph exceeds max_size.

    max_size is measured in characters, not tokens (800 characters is very
    roughly 200 tokens of English prose). A single sentence longer than
    max_size is kept whole rather than cut mid-sentence.
    """
    paragraphs = text.split("\n\n")
    chunks = []
    for para in paragraphs:
        para = para.strip()
        if not para:
            continue
        if len(para) <= max_size:
            chunks.append({"index": len(chunks), "text": para})
        else:
            # Naive sentence split: break after every ". " (and at single newlines)
            sentences = para.replace(". ", ".\n").split("\n")
            current = ""
            for sentence in sentences:
                if len(current) + len(sentence) + 1 > max_size:
                    # Adding this sentence would overflow: flush the current chunk
                    if current:
                        chunks.append({"index": len(chunks), "text": current.strip()})
                    current = sentence
                else:
                    current = f"{current} {sentence}" if current else sentence
            if current:
                chunks.append({"index": len(chunks), "text": current.strip()})
    return chunks
The DELETE before INSERT pattern is intentional. When a document is re-ingested — because it was updated, corrected, or expanded — the old chunks and their stale embeddings must go. The alternative, appending new chunks alongside old ones, means your retrieval system returns both the current answer and the deprecated one, and Claude has to figure out which is right. Claude shouldn't have to do that. Your ingestion pipeline should ensure the index reflects the current truth.
Voyage AI charges per token embedded. A knowledge base of 10,000 documents averaging 2,000 tokens each comes to 20 million tokens; at Voyage's per-million-token pricing, a full re-embedding costs a few dollars at most. That's cheap enough to re-embed nightly. If your corpus is larger, track which documents changed since the last ingest and re-embed only those. A last_modified timestamp in your database makes this trivial.
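A sketch of the incremental approach, assuming a hypothetical `documents` table that records each document's `last_modified` time as an ISO-8601 UTC string. Because every timestamp uses the same format, plain string comparison matches chronological order. `docs_changed_since` is a hypothetical helper:

```python
import sqlite3


def docs_changed_since(conn: sqlite3.Connection, last_ingest: str) -> list[str]:
    """Return IDs of documents modified after the last ingest run.

    Assumes a hypothetical documents(doc_id, last_modified) table where
    last_modified is an ISO-8601 UTC string like "2024-06-02T09:00:00Z".
    """
    rows = conn.execute(
        "SELECT doc_id FROM documents WHERE last_modified > ? ORDER BY doc_id",
        (last_ingest,),
    ).fetchall()
    return [doc_id for (doc_id,) in rows]


# Example with an in-memory database and made-up documents
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE documents (doc_id TEXT PRIMARY KEY, last_modified TEXT)")
conn.executemany(
    "INSERT INTO documents VALUES (?, ?)",
    [
        ("refund-policy", "2024-06-02T09:00:00Z"),
        ("shipping-faq", "2024-05-20T14:30:00Z"),
        ("warranty", "2024-06-03T11:15:00Z"),
    ],
)
changed = docs_changed_since(conn, "2024-06-01T00:00:00Z")
```

Here `changed` holds `refund-policy` and `warranty`. Each of those IDs would go through `ingest_document`, and you would record the run's start time as the new `last_ingest`.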
A single retrieval method always has blind spots. Semantic search misses exact identifiers. Keyword search misses paraphrases. The production answer is to run both in parallel and merge the results.
def retrieve(query: str, db_path: str = "rag.db", top_k: int = 10) -> list[dict]:
    """Hybrid retrieval: combine semantic search and BM25 keyword matching."""
    conn = sqlite3.connect(db_path)
    rows = conn.execute("SELECT doc_id, chunk_index, text, embedding FROM chunks").fetchall()
    conn.close()
    if not rows:
        return []  # empty index: nothing to search, and BM25Okapi can't take an empty corpus

    chunks = [{"doc_id": r[0], "chunk_index": r[1], "text": r[2]} for r in rows]
    stored_embeddings = np.array([json.loads(r[3]) for r in rows])

    # Semantic search: cosine similarity between the query and every chunk
    query_embedding = np.array(voyage_client.embed(
        [query], model="voyage-3", input_type="query"
    ).embeddings[0])
    semantic_scores = stored_embeddings @ query_embedding / (
        np.linalg.norm(stored_embeddings, axis=1) * np.linalg.norm(query_embedding)
    )

    # BM25 keyword search (rebuilt on every query; cache the index for large corpora)
    tokenized_chunks = [tokenize(c["text"]) for c in chunks]
    bm25 = BM25Okapi(tokenized_chunks)
    bm25_scores_raw = bm25.get_scores(tokenize(query))

    # Normalize BM25 scores to the 0-1 range
    max_bm25 = bm25_scores_raw.max()
    bm25_scores = bm25_scores_raw / max_bm25 if max_bm25 > 0 else bm25_scores_raw

    # Merge: weighted combination (semantic gets more weight)
    combined = 0.7 * semantic_scores + 0.3 * bm25_scores

    # Sort and return top_k
    ranked_indices = np.argsort(combined)[::-1][:top_k]
    return [
        {**chunks[i], "retrieval_score": float(combined[i])}
        for i in ranked_indices
    ]
def tokenize(text: str) -> list[str]:
    """Lowercase, strip punctuation, and split on whitespace (for BM25)."""
    text = text.lower()
    text = re.sub(r"[^\w\s]", "", text)
    return text.split()
The 0.7/0.3 weighting between semantic and BM25 scores is a starting point, not gospel. I've seen optimal weights range from 0.5/0.5 (for highly technical domains with lots of specific identifiers) to 0.9/0.1 (for conversational domains where paraphrasing dominates). Tune the weights against your golden question set.
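A sketch of that tuning loop, assuming you have already computed, for each golden question, the cosine score and the normalized BM25 score of every chunk (the expensive part, done once), and you know which single chunk should answer it. Each candidate weight is scored by hit rate at k: the fraction of questions whose answering chunk lands in the top k. `hit_rate_at_k` and `best_alpha` are hypothetical names, and hit rate is just one reasonable metric:

```python
def hit_rate_at_k(
    semantic: list[list[float]],
    keyword: list[list[float]],
    relevant: list[int],
    alpha: float,
    k: int = 3,
) -> float:
    """Fraction of golden questions whose relevant chunk lands in the top k.

    semantic[q][i] and keyword[q][i] are the scores of chunk i for golden
    question q; relevant[q] is the index of the chunk that answers question q.
    """
    hits = 0
    for sem, kw, target in zip(semantic, keyword, relevant):
        combined = [alpha * s + (1 - alpha) * b for s, b in zip(sem, kw)]
        top = sorted(range(len(combined)), key=lambda i: combined[i], reverse=True)[:k]
        hits += target in top
    return hits / len(relevant)


def best_alpha(
    semantic: list[list[float]],
    keyword: list[list[float]],
    relevant: list[int],
    k: int = 3,
) -> float:
    """Grid-search the semantic weight from 0.5 to 0.9 in steps of 0.1."""
    grid = [0.5, 0.6, 0.7, 0.8, 0.9]
    # max() returns the first alpha among ties, i.e. the smallest one
    return max(grid, key=lambda a: hit_rate_at_k(semantic, keyword, relevant, a, k))
```

The weight found this way replaces the hard-coded 0.7 and 0.3 in `retrieve` (as `alpha` and `1 - alpha`). With only ten golden questions, treat small differences between neighboring weights as noise.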
The retriever returns ten candidates. The reranker cuts that down to the three to five that actually answer the question. This is where you convert recall (did I find a relevant document somewhere in my results?) into precision (is the most relevant document at the top?).
import logging

logger = logging.getLogger(__name__)


def rerank(query: str, candidates: list[dict], top_k: int = 3) -> list[dict]:
    """Rerank candidates using Claude for maximum relevance precision."""
    if len(candidates) <= top_k:
        return candidates

    docs_formatted = "\n\n".join(
        f"[Doc {i+1} | Source: {c['doc_id']}]\n{c['text'][:600]}"
        for i, c in enumerate(candidates)
    )

    response = claude_client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=2048,
        temperature=0,
        messages=[{
            "role": "user",
            "content": (
                f"Given the question: \"{query}\"\n\n"
                "Rank these documents by how directly each one answers the question. "
                "Return a JSON array of objects with 'doc_index' (1-based), "
                "'score' (0.0 to 1.0), and 'reason' (one sentence). "
                f"Return ONLY valid JSON, no markdown fencing.\n\n{docs_formatted}"
            ),
        }],
    )

    try:
        text = response.content[0].text
        # Handle code fencing in case Claude adds it anyway
        if "```" in text:
            text = text.split("```")[1]
            if text.startswith("json"):
                text = text[4:]
        rankings = json.loads(text.strip())
        rankings.sort(key=lambda x: x["score"], reverse=True)
        results = []
        for r in rankings[:top_k]:
            idx = int(r["doc_index"]) - 1
            # Reject indices outside 1..len(candidates); doc_index 0 would
            # otherwise silently select the last candidate
            if not 0 <= idx < len(candidates):
                raise IndexError(f"doc_index out of range: {r['doc_index']}")
            results.append({**candidates[idx], "rerank_score": r["score"],
                            "rerank_reason": r.get("reason", "")})
        return results
    except (ValueError, KeyError, IndexError, TypeError, AttributeError) as exc:
        # json.JSONDecodeError is a subclass of ValueError.
        # Fallback: return top_k by retrieval score, and log the event
        logger.warning("Rerank fallback for query %r: %s", query, exc)
        return candidates[:top_k]
The try/except fallback is not laziness — it's production engineering. Claude's reranking response is usually valid JSON, but "usually" is not "always." When the reranker fails, falling back to the retrieval-ordered candidates is better than crashing the entire pipeline. Log these fallback events and investigate them, but don't let a JSON parsing error turn into a user-facing failure.
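Pulling the parsing into a pure function makes the fallback path testable without calling the API. A sketch, assuming the same reply format the prompt requests (`doc_index` 1-based, `score` between 0 and 1); `parse_rankings` is a hypothetical helper that returns `None` whenever the caller should fall back, and it drops out-of-range or duplicate indices instead of trusting them:

```python
import json
import logging
from typing import Optional

logger = logging.getLogger("rag.rerank")


def parse_rankings(text: str, num_candidates: int) -> Optional[list[tuple[int, float]]]:
    """Parse the reranker's JSON reply into (0-based index, score) pairs.

    Returns None when the reply is unusable, so the caller can fall back
    to retrieval order and log the event.
    """
    # Strip optional ```json fencing around the payload
    if "```" in text:
        text = text.split("```")[1]
        if text.startswith("json"):
            text = text[4:]
    try:
        rankings = json.loads(text.strip())
        pairs = [(int(r["doc_index"]) - 1, float(r["score"])) for r in rankings]
    except (ValueError, KeyError, TypeError) as exc:
        logger.warning("Reranker reply unusable: %s", exc)
        return None
    # Keep only in-range, first-seen indices
    seen, valid = set(), []
    for idx, score in pairs:
        if 0 <= idx < num_candidates and idx not in seen:
            seen.add(idx)
            valid.append((idx, score))
    if not valid:
        return None
    return sorted(valid, key=lambda p: p[1], reverse=True)
```

A handful of unit tests against deliberately malformed replies (prose instead of JSON, an object instead of an array, an index of 0) is usually enough to keep this path honest.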
This is the stage most tutorials skip entirely, and it's where a surprising number of production bugs live. You have three ranked chunks. Now you need to assemble them into a context block that Claude can reason about effectively.
def assemble_context(query: str, ranked_chunks: list[dict]) -> str:
    """Format ranked chunks into a structured context block for Claude."""
    context_parts = []
    for i, chunk in enumerate(ranked_chunks, 1):
        source = chunk.get("doc_id", "unknown")
        context_parts.append(
            f"<source id=\"{i}\" document=\"{source}\">\n"
            f"{chunk['text']}\n"
            f"</source>"
        )
    context_block = "\n\n".join(context_parts)

    prompt = (
        "You are answering a question using ONLY the sources provided below. "
        "Do not use any knowledge outside these sources. If the sources do not "
        "contain enough information to answer the question fully, say so explicitly "
        "rather than guessing.\n\n"
        "When you reference information, cite the source by its ID number.\n\n"
        f"<sources>\n{context_block}\n</sources>\n\n"
        f"Question: {query}"
    )
    return prompt
Three decisions matter here.
Source attribution. Each chunk is wrapped in a <source> tag with an ID and the document name. This lets Claude cite its sources, and it lets your users verify answers against the original documents. Traceability is not optional in production — it's how your users build trust and how your team debugs bad answers.
Strict grounding instructions. The instructions at the top of the prompt tell Claude to answer only from the provided sources and to explicitly flag when the sources are insufficient. (In assemble_context they travel in the user message; you could equally move them into the system parameter.) Without them, Claude tends to fill gaps with its training data, which may be correct but is unverifiable and undermines the point of having a knowledge base.
XML structure. Using XML-style tags for context blocks gives Claude clear boundaries between sources. Compared to plain text separators, structured tags reduce the chance that Claude confuses content from one source with another.
Every token in the context block counts against Claude's context window and costs money. Three chunks of 800 tokens each add 2,400 tokens of context before the question and instructions. (Note that recursive_chunks measures max_size in characters, so its 800-character chunks are considerably smaller, very roughly 200 tokens of English prose.) Budget your context: for most use cases, three to five chunks of 400-600 tokens each hit the sweet spot between coverage and cost. Stuffing the context window with twenty chunks rarely improves answer quality and often degrades it, because the relevant passage has to compete with more noise.
The final stage sends the assembled prompt to Claude and returns the answer.
def generate_answer(prompt: str) -> dict:
    """Generate a grounded answer from Claude using the assembled context."""
    response = claude_client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=2048,
        temperature=0,
        messages=[{"role": "user", "content": prompt}],
    )
    return {
        "answer": response.content[0].text,
        "model": "claude-sonnet-4-20250514",
        "input_tokens": response.usage.input_tokens,
        "output_tokens": response.usage.output_tokens,
    }
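Temperature zero is deliberate. For RAG applications, you want consistent, grounded answers, not creative ones. Temperature alone doesn't stop Claude from drawing on training data (the grounding instructions do that), but a low temperature minimizes run-to-run variation: if two users ask the same question against the same knowledge base, they should get essentially the same answer. Even at temperature zero, outputs are not strictly guaranteed to be identical, so don't build checks that depend on exact string matches.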
Tracking input and output tokens in the return value is not vanity — it's cost management. A production RAG system handles hundreds or thousands of queries per day. If your average query costs $0.03 in API calls, that's $900/month at 30,000 queries. Knowing where those tokens go lets you optimize: tighter chunking, stricter top-k limits, shorter system prompts.
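To make that arithmetic concrete, here is a small sketch that turns usage numbers into dollars. Prices are parameters you supply from Anthropic's current pricing page; the $3 input / $15 output per million tokens used below is an assumption for illustration (Claude Sonnet 4's list prices at the time of writing), and the token counts are hypothetical. `query_cost` and `monthly_cost` are hypothetical helpers:

```python
def query_cost(input_tokens: int, output_tokens: int,
               input_usd_per_mtok: float, output_usd_per_mtok: float) -> float:
    """Dollar cost of one API call, given prices per million tokens."""
    return (input_tokens * input_usd_per_mtok
            + output_tokens * output_usd_per_mtok) / 1_000_000


def monthly_cost(cost_per_query: float, queries_per_month: int) -> float:
    """Project a monthly bill from an average per-query cost."""
    return cost_per_query * queries_per_month


# Assumed prices (USD per million tokens); replace with current list prices
INPUT_PRICE, OUTPUT_PRICE = 3.0, 15.0

# One query is two billed calls: the rerank request and the generation request
rerank_cost = query_cost(3_000, 300, INPUT_PRICE, OUTPUT_PRICE)
answer_cost = query_cost(2_500, 400, INPUT_PRICE, OUTPUT_PRICE)
per_query = rerank_cost + answer_cost
print(f"${per_query:.4f} per query, ${monthly_cost(per_query, 30_000):,.2f} per month")
```

With these assumed numbers the rerank call costs as much as the answer itself, which is why it's worth logging its usage alongside the generation call.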
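Here's how the stages compose into a single query function. Ingestion (stage 1) runs separately, whenever documents change; retrieval, ranking, assembly, and generation run on every query: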
def answer_question(query: str, db_path: str = "rag.db") -> dict:
    """Full RAG pipeline: retrieve, rank, assemble, generate."""
    # Stage 2: Retrieve candidates
    candidates = retrieve(query, db_path=db_path, top_k=10)
    if not candidates:
        return {
            "answer": "No relevant information found in the knowledge base.",
            "sources": [],
            "tokens": {"input": 0, "output": 0},
        }

    # Stage 3: Rerank to top 3
    ranked = rerank(query, candidates, top_k=3)

    # Stage 4: Assemble context
    prompt = assemble_context(query, ranked)

    # Stage 5: Generate grounded answer
    result = generate_answer(prompt)

    return {
        "answer": result["answer"],
        "sources": [
            {"document": c["doc_id"], "score": c.get("rerank_score", c.get("retrieval_score"))}
            for c in ranked
        ],
        "tokens": {
            "input": result["input_tokens"],
            "output": result["output_tokens"],
        },
    }
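The return structure includes the answer, the sources with their scores, and the token usage. This is the minimum viable response format for a production RAG endpoint: your frontend needs the answer to display and the sources for citation links, and you need the token counts for cost monitoring. Note that these counts cover only the generation call; the reranking call is a separate Claude request with its own token usage, so log that too if you want a complete per-query cost.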
A production RAG system is not a monolith — it is five modular stages that each do one job and can each be tested, measured, and replaced independently. When answer quality degrades, diagnose by checking each stage in sequence: ingest, retrieve, rank, assemble, generate.
Real-world applications rarely have a single knowledge base. You might have product documentation in one system, FAQs in another, and company policies in a third. The Model Context Protocol (MCP) gives you a clean abstraction for multi-source retrieval: each knowledge source becomes an MCP resource that your RAG client can query.
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def retrieve_from_mcp(server_script: str, resource_names: list[str]) -> dict[str, list[str]]:
    """Fetch knowledge from multiple MCP resources."""
    server_params = StdioServerParameters(
        command="python",
        args=[server_script],
    )
    results = {}
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            for name in resource_names:
                response = await session.read_resource(f"kb://{name}")
                # Concatenate the text parts of the resource (skip binary blobs)
                text = "".join(
                    block.text for block in response.contents
                    if hasattr(block, "text")
                )
                # One entry per non-empty line
                results[name] = [line.strip() for line in text.split("\n") if line.strip()]
    return results
The MCP server exposes each knowledge base as a named resource — kb://products, kb://faq, kb://policies. The RAG client reads from all of them, runs retrieval across the combined corpus, and assembles context with source labels so Claude can cite which knowledge base each piece of information came from.
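One way to connect this to the SQLite-backed pipeline, assuming you're content to re-ingest each resource as a single document: convert the fetched lines into documents keyed by their `kb://` URI and pass them through `ingest_document`, so the URI becomes the `doc_id` that appears in each `<source>` tag. `mcp_results_to_documents` and `kb_server.py` are hypothetical:

```python
def mcp_results_to_documents(results: dict[str, list[str]]) -> dict[str, str]:
    """Turn {resource_name: [lines]} into {doc_id: text} for ingest_document.

    Each resource becomes one document whose ID carries its source label
    (e.g. "kb://faq"). Lines are joined with blank lines, so a paragraph-based
    chunker treats each non-empty line as its own paragraph.
    """
    return {
        f"kb://{name}": "\n\n".join(lines)
        for name, lines in results.items()
        if lines  # skip resources that returned nothing
    }


# Usage (assumes a hypothetical kb_server.py exposing kb://products, etc.):
#   results = asyncio.run(retrieve_from_mcp("kb_server.py", ["products", "faq", "policies"]))
#   for doc_id, text in mcp_results_to_documents(results).items():
#       ingest_document(doc_id, text)
```

Joining on blank lines is a deliberate simplification: short FAQ entries become one chunk each, while long lines still go through the sentence-splitting path in `recursive_chunks`.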
This architecture grows by addition. When a new knowledge source appears (a new product line, a new policy document, a new API reference), you register it as an MCP resource and ingest its content. The retrieval pipeline doesn't change. The ranking pipeline doesn't change. The context assembly just includes one more source tag. That's the payoff of the modular Five-Stage Pipeline: you can extend the system without rewriting it.
Production systems fail. The question is not whether but how gracefully.
Empty retrieval. The query matches nothing in your knowledge base. Don't silently fall back to Claude's general knowledge — that defeats the purpose of RAG. Return a clear message: "This question is outside the scope of the current knowledge base." If you need a fallback, make it explicit to the user: "No matching documents found. Here's a general answer, but it may not reflect your organization's specific policies."
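As written, `retrieve` returns `top_k` candidates whenever the index holds any chunks at all, so the `if not candidates` check in `answer_question` only fires on an empty index. Detecting "matches nothing" in practice means applying a relevance floor to the scores. A sketch with a hypothetical `filter_relevant` helper; the 0.35 default is a placeholder, not a recommended value:

```python
OUT_OF_SCOPE_MESSAGE = "This question is outside the scope of the current knowledge base."


def filter_relevant(candidates: list[dict], min_score: float = 0.35) -> list[dict]:
    """Keep candidates whose combined retrieval score clears a relevance floor.

    The default threshold is an arbitrary placeholder; calibrate it against
    golden questions you know are in scope and out of scope.
    """
    return [c for c in candidates if c.get("retrieval_score", 0.0) >= min_score]
```

Calling it on the output of `retrieve` before the emptiness check turns weak matches into the explicit out-of-scope message instead of an answer built on irrelevant chunks.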
Contradictory sources. Two documents say different things about the same topic. This is common with policies, product specs, and any content that evolves over time. Your context assembly should include document metadata (version dates, source labels) so Claude can identify and call out the contradiction rather than silently averaging the two answers.
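One way to expose that metadata is to put it on the `<source>` tag itself. A sketch, assuming your document store can supply a last-updated date per chunk (the `updated` key is hypothetical; `ingest_document` as written doesn't store one). `format_source` is a hypothetical variant of the formatting inside `assemble_context`; it also escapes the text so a chunk containing markup can't break the tag structure:

```python
from xml.sax.saxutils import escape, quoteattr


def format_source(source_id: int, chunk: dict) -> str:
    """Render one chunk as a <source> tag carrying version metadata.

    The optional "updated" key (e.g. an ISO date) is omitted if missing.
    """
    attrs = f"id={quoteattr(str(source_id))} document={quoteattr(chunk['doc_id'])}"
    if chunk.get("updated"):
        attrs += f" updated={quoteattr(chunk['updated'])}"
    # Escape &, <, > so text like "</source>" inside a chunk stays inert
    return f"<source {attrs}>\n{escape(chunk['text'])}\n</source>"
```

Pair this with an instruction in the prompt, for example asking Claude to point out when sources disagree and to say which one is more recent.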
Stale embeddings. A document was updated but its embeddings were not regenerated. The old chunks are still being retrieved. Your ingest pipeline needs a mechanism for detecting document changes — a hash comparison, a modified timestamp, or a webhook from your content management system — and triggering re-embedding.
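A content hash is the most portable of these mechanisms because it doesn't depend on file timestamps or CMS features. A sketch, assuming you record each document's hash at ingest time (the `stored_hashes` mapping is hypothetical; in practice it might be a column next to your chunks). It also reports documents that disappeared, whose chunks need a `DELETE` so they stop being retrieved. `content_hash` and `find_changed` are hypothetical helpers:

```python
import hashlib


def content_hash(text: str) -> str:
    """Stable fingerprint of a document's text."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def find_changed(current: dict[str, str],
                 stored_hashes: dict[str, str]) -> tuple[list[str], list[str]]:
    """Compare current documents against hashes recorded at last ingest.

    Returns (to_reingest, to_delete): IDs that are new or whose text changed,
    and IDs that were ingested before but no longer exist.
    """
    to_reingest = sorted(
        doc_id for doc_id, text in current.items()
        if stored_hashes.get(doc_id) != content_hash(text)
    )
    to_delete = sorted(set(stored_hashes) - set(current))
    return to_reingest, to_delete
```

Each ID in `to_reingest` goes through `ingest_document`, which already replaces the old chunks; each ID in `to_delete` needs its chunks removed outright.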
Token budget overflow. A complex query triggers retrieval of many long chunks that exceed your context budget. Set hard limits in the assemble stage: if the total context exceeds a threshold (say, 4,000 tokens), truncate the lowest-ranked chunks first. Never truncate the highest-ranked chunk — that's the one most likely to contain the answer.
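A sketch of that rule, assuming chunks arrive ordered best-first (as `rerank` returns them) and assuming a rough estimate of about four characters per token for English text; swap in a real token count if you need precision. This version drops whole chunks rather than cutting text, and it always keeps the top-ranked chunk. `fit_to_budget` is a hypothetical helper:

```python
def fit_to_budget(ranked_chunks: list[dict], max_tokens: int = 4000) -> list[dict]:
    """Drop the lowest-ranked chunks until the context fits the token budget.

    Tokens are estimated at ~4 characters each (a rough heuristic).
    The top-ranked chunk is always kept, even if it alone exceeds the budget.
    """
    def estimate_tokens(text: str) -> int:
        return max(1, len(text) // 4)

    kept = list(ranked_chunks)
    total = sum(estimate_tokens(c["text"]) for c in kept)
    while len(kept) > 1 and total > max_tokens:
        dropped = kept.pop()  # list is ordered best-first, so pop the worst
        total -= estimate_tokens(dropped["text"])
    return kept
```

Running this between `rerank` and `assemble_context` keeps the budget enforcement in one place, and it's a natural spot to log a "budget exceeded" event whenever a chunk gets dropped.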
Take the code from this chapter and connect it to the chunks and embeddings you built in Chapter 17. Run your ten golden questions through the full pipeline. For each question, log which chunks were retrieved, which survived reranking, and whether the final answer was correct. This end-to-end trace is your debugging lifeline.
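The trace doesn't need special tooling. A minimal sketch, assuming you append one JSON object per golden question to a local JSON Lines file; `make_trace` and `append_jsonl` are hypothetical helpers, and `retrieved`/`reranked` are the chunk dicts returned by `retrieve` and `rerank`:

```python
import json
from datetime import datetime, timezone
from typing import Optional


def make_trace(question: str, retrieved: list[dict], reranked: list[dict],
               correct: Optional[bool] = None) -> dict:
    """Build one end-to-end trace record for a golden question."""
    return {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "question": question,
        # Identify chunks by [doc_id, chunk_index] to keep traces compact
        "retrieved": [[c["doc_id"], c["chunk_index"]] for c in retrieved],
        "reranked": [[c["doc_id"], c["chunk_index"]] for c in reranked],
        # Whether the final answer was right; often filled in after manual review
        "correct": correct,
    }


def append_jsonl(path: str, record: dict) -> None:
    """Append a record to a JSON Lines file, one JSON object per line."""
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(record) + "\n")
```

Comparing the `retrieved` and `reranked` lists for a wrong answer tells you immediately whether the right chunk was never found or was found and then discarded.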
Modify your context assembly to include source tags and instruct Claude to cite sources by ID. Verify that the citations in the answers actually point to the correct chunks. Source attribution is the simplest way to build user trust and catch retrieval errors.
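Part of that verification can be automated. A sketch, assuming citations appear as `[1]` or `[Source 1]`; the prompt in `assemble_context` doesn't fix a format, so telling Claude exactly which one to use keeps the pattern reliable. This only catches IDs that don't exist or sources that were never cited; whether a valid ID actually supports the claim still needs a human or model-graded check. `check_citations` is a hypothetical helper:

```python
import re

# Assumed citation formats: "[1]" or "[Source 1]" (case-insensitive)
CITATION_PATTERN = re.compile(r"\[(?:source\s*)?(\d+)\]", re.IGNORECASE)


def check_citations(answer: str, num_sources: int) -> dict:
    """Report which source IDs an answer cites and flag any that don't exist."""
    cited = sorted({int(m) for m in CITATION_PATTERN.findall(answer)})
    return {
        "cited": cited,
        "invalid": [i for i in cited if not 1 <= i <= num_sources],
        "uncited_sources": [i for i in range(1, num_sources + 1) if i not in cited],
    }
```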
Set up a mechanism to detect when your source documents change. For file-based knowledge bases, a hash comparison on a cron job works. When a change is detected, re-run the ingest function for that document. Verify that the old chunks are removed and the new ones are searchable.
Run 50 representative queries through the pipeline and calculate the average cost per query. Break it down: how many tokens go to context, how many to the question, how many to the reranking call. Identify the single change that would reduce cost the most without hurting answer quality — usually it's tighter chunk sizes or a lower top_k.
Log every query where the reranker falls back to retrieval ordering, every query that returns zero candidates, and every query where the token count exceeds your budget. Review this log weekly. The patterns in your failures tell you exactly where to invest engineering time next.
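A sketch of the weekly review, assuming each failure is logged as a JSON line with an `event` field; the event names below are hypothetical labels for the three failure types just described. `summarize_failures` is a hypothetical helper:

```python
import json
from collections import Counter

# Hypothetical event names for the three failure types above
FAILURE_EVENTS = {"rerank_fallback", "empty_retrieval", "budget_exceeded"}


def summarize_failures(jsonl_lines: list[str]) -> Counter:
    """Count failure events in a JSON Lines log (one {"event": ...} per line)."""
    counts = Counter()
    for line in jsonl_lines:
        if not line.strip():
            continue  # tolerate blank lines
        event = json.loads(line).get("event")
        if event in FAILURE_EVENTS:
            counts[event] += 1
    return counts


# Usage, assuming a hypothetical failures.jsonl:
#   with open("failures.jsonl", encoding="utf-8") as f:
#       print(summarize_failures(f.read().splitlines()).most_common())
```

`most_common()` puts the biggest failure category first, which is usually where the next round of engineering time should go.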
A production RAG system is not something you build and forget. It is something you operate — with ingestion freshness, retrieval accuracy, and cost efficiency as ongoing engineering obligations.