Case Study 3: Multilingual Content Production Pipeline (Batch + Structured Outputs + Connector Automation)
Chapter 80: Building an Enterprise RAG System: Architecture Evolution from POC to Millions of Daily Active Users
80.1 The Full Picture of Enterprise RAG
Retrieval-Augmented Generation (RAG) is the dominant architecture pattern in enterprise AI applications today. Compared to pure LLM Q&A, RAG's value lies in combining a company's private knowledge base with the generation capabilities of an LLM โ enabling responses grounded in the most current and accurate internal information.
This chapter uses a hypothetical enterprise โ 500,000 internal documents, 1 million daily active users โ as the backdrop to demonstrate a complete architectural evolution from zero to production.
80.1.1 Full System Architecture (ASCII Diagram)
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Enterprise RAG System Architecture โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโฌโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ OFFLINE PIPELINE โ ONLINE PIPELINE (Query Service) โ
โ (Index Construction) โ โ
โ โ User Query โ
โ Document Sources โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โโโโโโโโโโโโโโโโโโโ โ โ API Gateway โ โ
โ โ SharePoint โ โ โ (Auth / Rate Limiting / Logs) โ โ
โ โ Confluence โโโ โ โโโโโโโโโโโโโโโโโโฌโโโโโโโโโโโโโโโโ โ
โ โ S3 / Object โ โ โ โ โ
โ โ Database โ โ โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โโโโโโโโโโโโโโโโโโโ โ โ โ Query Understanding Layer โ โ
โ โ โ โ - Intent classification โ โ
โ โโโโโโโโโโโโโโโโโโโโโ โ โ - Query rewriting / expansion โ โ
โ โ Document โ โ โ - Language detection โ โ
โ โ Pre-processing โ โ โโโโโโโโโโโโโโโโโโฌโโโโโโโโโโโโโโโโ โ
โ โ - Format parsing โ โ โ โ
โ โ - Text cleaning โ โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ - Metadata โ โ โ Hybrid Retrieval Layer โ โ
โ โโโโโโโโโโฌโโโโโโโโโโโ โ โ โโโโโโโโโโโโโโ โโโโโโโโโโโโโโ โ โ
โ โ โ โ โ Vector โ โ Keyword โ โ โ
โ โโโโโโโโโโโโโโโโโโโโโ โ โ โ Search โ โ (BM25/ES) โ โ โ
โ โ Chunking Strategy โ โ โ โโโโโโโฌโโโโโโโ โโโโโโโฌโโโโโโโ โ โ
โ โ - Fixed size โ โ โ โโโโโโโโโโฌโโโโโโโ โ โ
โ โ - Semantic โ โ โ RRF Fusion โ โ
โ โ - Recursive โ โ โโโโโโโโโโโโโโโโโโฌโโโโโโโโโโโโโโโโ โ
โ โโโโโโโโโโฌโโโโโโโโโโโ โ โ โ
โ โ โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โโโโโโโโโโโโโโโโโโโโโ โ โ Reranking Layer โ โ
โ โ Embedding โ โ โ Cross-encoder precision rerank โ โ
โ โ - Cohere / OpenAI โ โ โโโโโโโโโโโโโโโโโโฌโโโโโโโโโโโโโโโโ โ
โ โ - BGE / E5 โ โ โ โ
โ โโโโโโโโโโฌโโโโโโโโโโโ โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ โ โ Generation Layer (Claude) โ โ
โ โโโโโโโโโโโโโโโโโโโโโ โ โ - Context assembly โ โ
โ โ Vector DB Write โ โ โ - Citation injection โ โ
โ โ - Pinecone โ โ โ - Answer generation โ โ
โ โ - Qdrant โ โ โโโโโโโโโโโโโโโโโโฌโโโโโโโโโโโโโโโโ โ
โ โ - pgvector โ โ โ โ
โ โโโโโโโโโโโโโโโโโโโโโ โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ โ Post-processing Layer โ โ
โ โ โ - Citation validation โ โ
โ โ โ - Output formatting โ โ
โ โ โ - Confidence scoring โ โ
โ โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโดโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Observability Stack โ
โ Prometheus + Grafana + Jaeger โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
80.2 Phase 1: POC (0โ100 Users)
80.2.1 POC Goals and Constraints
The POC phase's core goal is validating technical feasibility, not building a scalable system. Typical constraints: single-machine deployment, fewer than 10,000 documents, relaxed response time requirements (<10 seconds acceptable), no high-availability requirements.
80.2.2 Chunking Strategy
Chunking is one of the most critical determinants of RAG quality. Wrong chunking leads to either incomplete retrieved context or irrelevant content dominating the prompt.
import re
class DocumentChunker:
def fixed_size_chunking(self, text: str, chunk_size: int = 512, overlap: int = 50) -> list:
"""Fixed-size chunking. Simple and predictable but may split mid-sentence."""
chunks = []
start = 0
while start < len(text):
end = start + chunk_size
chunk = text[start:end]
if end < len(text):
last_period = max(chunk.rfind('.'), chunk.rfind('\n'), chunk.rfind('!'))
if last_period > chunk_size * 0.7:
chunk = chunk[:last_period + 1]
end = start + last_period + 1
chunks.append({"text": chunk, "start_char": start, "chunk_type": "fixed_size"})
start = end - overlap
return chunks
def semantic_chunking(self, text: str, max_size: int = 800) -> list:
"""Paragraph-boundary chunking. Preserves semantic integrity."""
paragraphs = re.split(r'\n{2,}', text)
chunks = []
current_chunk = ""
for para in paragraphs:
para = para.strip()
if not para:
continue
if len(current_chunk) + len(para) < max_size:
current_chunk += ("\n\n" if current_chunk else "") + para
else:
if current_chunk:
chunks.append({"text": current_chunk, "chunk_type": "semantic"})
current_chunk = para
if current_chunk:
chunks.append({"text": current_chunk, "chunk_type": "semantic"})
return chunks
def recursive_character_splitting(self, text: str, chunk_size: int = 512,
separators: list = None) -> list:
"""Recursive splitting (inspired by LangChain). Intelligently chooses split level."""
if separators is None:
separators = ["\n\n", "\n", ". ", " ", ""]
def _split(text, seps):
if len(text) <= chunk_size:
return [text]
sep = seps[0] if seps else ""
splits = text.split(sep) if sep else list(text)
chunks, current = [], ""
for s in splits:
if len(current) + len(s) + len(sep) <= chunk_size:
current += (sep if current else "") + s
else:
if current:
chunks.append(current)
current = s if len(s) <= chunk_size else ""
if len(s) > chunk_size and len(seps) > 1:
chunks.extend(_split(s, seps[1:]))
if current:
chunks.append(current)
return chunks
return [{"text": c, "chunk_type": "recursive"} for c in _split(text, separators) if c.strip()]
80.2.3 Minimal POC Implementation
from anthropic import Anthropic
import json
client = Anthropic()
class SimplePOCRAG:
def __init__(self, embedding_fn, vector_store):
self.embed = embedding_fn
self.store = vector_store
self.chunker = DocumentChunker()
def index_document(self, doc_id: str, text: str, metadata: dict = None):
chunks = self.chunker.semantic_chunking(text)
for i, chunk in enumerate(chunks):
self.store.upsert(
id=f"{doc_id}_{i}",
vector=self.embed(chunk["text"]),
metadata={"text": chunk["text"], "doc_id": doc_id, "chunk_index": i,
**(metadata or {})}
)
def query(self, question: str, top_k: int = 5) -> dict:
results = self.store.query(vector=self.embed(question), top_k=top_k)
if not results:
return {"answer": "No relevant information found.", "sources": []}
context = "\n\n---\n\n".join([
f"[Source {i+1}]: {r['metadata']['text']}"
for i, r in enumerate(results)
])
response = client.messages.create(
model="claude-opus-4-5",
max_tokens=1000,
system="""You are an enterprise knowledge base assistant. Answer questions based on the provided document excerpts.
Rules:
1. Only answer based on provided document content
2. If the documents don't contain relevant information, say so explicitly
3. Cite sources using [Source N] format
4. Be accurate and concise""",
messages=[{"role": "user", "content": f"Documents:\n{context}\n\nQuestion: {question}"}]
)
return {
"answer": response.content[0].text,
"sources": [r["metadata"]["doc_id"] for r in results]
}
80.3 Phase 2: Production (100โ10,000 Users)
80.3.1 Hybrid Search
Pure vector search struggles with exact-match queries (like order numbers). BM25 keyword search has limited semantic understanding. Production systems need both:
class HybridRetriever:
"""Vector search + BM25 + RRF fusion."""
def __init__(self, vector_store, bm25_index):
self.vector_store = vector_store
self.bm25 = bm25_index
def retrieve(self, query: str, top_k: int = 20) -> list:
query_embedding = compute_embedding(query)
vector_results = self.vector_store.query(vector=query_embedding, top_k=top_k)
keyword_results = self.bm25.search(query, top_k=top_k)
return self._rrf_fusion([vector_results, keyword_results])
def _rrf_fusion(self, result_lists: list, k: int = 60) -> list:
"""Reciprocal Rank Fusion: score = sum(1 / (k + rank))"""
scores = {}
for result_list in result_lists:
for rank, result in enumerate(result_list):
doc_id = result.get("id")
if doc_id not in scores:
scores[doc_id] = {"score": 0.0, "data": result}
scores[doc_id]["score"] += 1.0 / (k + rank + 1)
return [r["data"] for r in sorted(scores.values(), key=lambda x: x["score"], reverse=True)]
80.3.2 Cross-Encoder Reranking
class CrossEncoderReranker:
"""Cross-encoder reranker. Higher accuracy than bi-encoder at the cost of speed."""
def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
from sentence_transformers import CrossEncoder
self.model = CrossEncoder(model_name)
def rerank(self, query: str, candidates: list, top_k: int = 5) -> list:
pairs = [(query, c["text"]) for c in candidates]
scores = self.model.predict(pairs)
scored = sorted(zip(scores, candidates), key=lambda x: x[0], reverse=True)
return [{**c, "rerank_score": float(s)} for s, c in scored[:top_k]]
80.3.3 Query Rewriting and Expansion
def rewrite_query(original_query: str, conversation_history: list = None) -> dict:
"""Rewrite queries to remove anaphora and dependency on conversation context."""
history_text = ""
if conversation_history:
history_text = "Conversation history:\n" + "\n".join([
f"User: {turn['user']}\nAssistant: {turn['assistant'][:100]}..."
for turn in conversation_history[-3:]
])
rewrite_prompt = f"""Rewrite the following user query for document retrieval.
{history_text}
Original query: {original_query}
Requirements:
1. Resolve pronouns (e.g., "it", "that", "the aforementioned")
2. Add context if the query depends on conversation history
3. Generate 3 semantically similar query variants for query expansion
4. Identify key entities and concepts
Output as JSON:
{{
"rewritten_query": "standalone rewritten query",
"query_variants": ["variant1", "variant2", "variant3"],
"key_entities": ["entity1", "entity2"],
"query_type": "factual|procedural|analytical|comparative"
}}"""
response = client.messages.create(
model="claude-haiku-4-5",
max_tokens=400,
messages=[{"role": "user", "content": rewrite_prompt}]
)
try:
return json.loads(response.content[0].text)
except:
return {"rewritten_query": original_query, "query_variants": [original_query],
"key_entities": [], "query_type": "factual"}
80.4 Phase 3: Scale (10,000โ1 Million DAU)
80.4.1 Vector Database Selection
| Feature | Pinecone | Qdrant | pgvector |
|---|---|---|---|
| Deployment | Fully managed cloud | Self-hosted / cloud | PostgreSQL extension |
| Scale ceiling | Billions of vectors | Billions (horizontal scale) | Millions (single node) |
| p99 latency | <50ms | <20ms (self-hosted) | <100ms |
| Full-text search | No | Yes (payload filtering) | Yes (PostgreSQL) |
| Cost | High (usage-based) | Medium (infrastructure) | Low (if PG already exists) |
| Best for | Fast start, no ops capacity | High-performance self-hosting | Existing PG stack |
80.4.2 Distributed Index Construction
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
class DistributedIndexBuilder:
def __init__(self, vector_store, embedding_fn, max_workers: int = 10):
self.vector_store = vector_store
self.embed = embedding_fn
self.max_workers = max_workers
self.chunker = DocumentChunker()
self._progress = {"processed": 0, "failed": 0, "total": 0}
self._lock = threading.Lock()
def build_index(self, documents: list) -> dict:
self._progress["total"] = len(documents)
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = {executor.submit(self._process_document, doc): doc["id"]
for doc in documents}
for future in as_completed(futures):
doc_id = futures[future]
try:
future.result()
with self._lock:
self._progress["processed"] += 1
except Exception as e:
print(f"Failed to index {doc_id}: {e}")
with self._lock:
self._progress["failed"] += 1
return dict(self._progress)
def _process_document(self, doc: dict):
chunks = self.chunker.recursive_character_splitting(doc["text"])
items = [
{"id": f"{doc['id']}_{i}",
"vector": self.embed(chunk["text"]),
"metadata": {"text": chunk["text"], "doc_id": doc["id"],
"chunk_index": i, **doc.get("metadata", {})}}
for i, chunk in enumerate(chunks)
]
self.vector_store.upsert_batch(items)
80.4.3 Multi-Layer Caching
import hashlib, json
from typing import Optional
class RAGCache:
"""
L1: In-memory LRU cache
L2: Redis distributed cache
L3: Semantic cache (reuse results for similar queries)
"""
def __init__(self, redis_client=None, semantic_threshold: float = 0.95):
self.redis = redis_client
self.semantic_threshold = semantic_threshold
self._memory_cache = {}
def get_cache_key(self, query: str) -> str:
return hashlib.md5(query.encode()).hexdigest()
def get(self, query: str) -> Optional[dict]:
key = self.get_cache_key(query)
if key in self._memory_cache:
return self._memory_cache[key]
if self.redis:
cached = self.redis.get(f"rag:{key}")
if cached:
result = json.loads(cached)
self._memory_cache[key] = result
return result
return None
def set(self, query: str, result: dict, ttl: int = 3600):
key = self.get_cache_key(query)
self._memory_cache[key] = result
if self.redis:
self.redis.setex(f"rag:{key}", ttl, json.dumps(result))
80.5 Monitoring and Observability
80.5.1 Key Metrics Framework
RAG_METRICS = {
"retrieval_quality": {
"recall_at_k": "Proportion of top-K results containing relevant documents",
"mrr": "Mean Reciprocal Rank",
"ndcg": "Normalized Discounted Cumulative Gain"
},
"generation_quality": {
"answer_faithfulness": "Is the answer grounded in retrieved content?",
"answer_relevance": "Does the answer address the question?",
"context_precision": "What fraction of retrieved context was actually used?"
},
"system_performance": {
"retrieval_latency_p50": "Target: <100ms",
"retrieval_latency_p99": "Target: <500ms",
"generation_latency_p50": "Target: <2s",
"total_latency_p99": "Target: <5s",
"error_rate": "Target: <0.1%"
},
"business_metrics": {
"answer_acceptance_rate": "Users who don't ask follow-up questions",
"citation_click_rate": "Users who view the source document",
"session_resolution_rate": "Sessions resolved without escalation"
}
}
80.5.2 RAG Evaluation Pipeline
class RAGEvaluator:
"""Uses Claude as the evaluator."""
def evaluate_answer(self, question: str, retrieved_context: str,
generated_answer: str) -> dict:
eval_prompt = f"""Evaluate the quality of this RAG system output.
Question: {question}
Retrieved context:
{retrieved_context[:3000]}
Generated answer:
{generated_answer}
Score the following dimensions (1-5):
1. Faithfulness: Is the answer entirely grounded in the context, with no fabrication?
2. Relevance: Does the answer directly address the question?
3. Completeness: Does the answer cover all aspects of the question?
4. Context Precision: What fraction of the context was actually used? (0.0-1.0)
Output as JSON:
{{
"faithfulness": {{"score": 1-5, "explanation": "..."}},
"relevance": {{"score": 1-5, "explanation": "..."}},
"completeness": {{"score": 1-5, "explanation": "..."}},
"context_precision": {{"score": 0.0-1.0, "explanation": "..."}},
"overall_quality": "excellent/good/acceptable/poor",
"hallucination_detected": true/false,
"hallucinated_content": "list any fabricated content here"
}}"""
response = client.messages.create(
model="claude-opus-4-5",
max_tokens=1000,
messages=[{"role": "user", "content": eval_prompt}]
)
try:
return json.loads(response.content[0].text)
except:
return {"raw_evaluation": response.content[0].text}
80.6 Cost Optimization at Scale
80.6.1 Model Routing
def route_model(query_complexity_score: float) -> str:
"""Route queries to the right model based on complexity."""
if query_complexity_score < 0.3:
return "claude-haiku-4-5" # ~5x cheaper, handles simple factual queries
elif query_complexity_score < 0.7:
return "claude-sonnet-4-5" # balanced cost/quality
else:
return "claude-opus-4-5" # reserved for complex reasoning tasks
80.6.2 Monthly Cost Projection at Scale
def estimate_monthly_cost(daily_active_users, avg_queries_per_user,
avg_context_tokens, avg_output_tokens) -> dict:
monthly_queries = daily_active_users * avg_queries_per_user * 30
# Embedding cost
embedding_cost = (monthly_queries * 100 / 1_000_000) * 0.02 # $0.02/1M tokens
# Claude generation cost (claude-sonnet example)
input_cost = (monthly_queries * avg_context_tokens / 1_000_000) * 3.0 # $3/1M
output_cost = (monthly_queries * avg_output_tokens / 1_000_000) * 15.0 # $15/1M
total = embedding_cost + input_cost + output_cost
return {
"monthly_queries": monthly_queries,
"monthly_cost_usd": round(total, 2),
"cost_per_query_usd": round(total / monthly_queries, 5),
"optimization_note": "Semantic caching + model routing can reduce this by 60-70%"
}
# 1 million DAU estimate
print(estimate_monthly_cost(1_000_000, 3, 2000, 300))
# monthly_queries: 90,000,000
# monthly_cost_usd: ~$837,000 (before optimization)
# After caching + routing: ~$250,000-300,000
80.7 Architecture Evolution Summary
POC Phase (0โ100 users)
โโโ Single-machine deployment
โโโ Simple semantic chunking
โโโ Pure vector search
โโโ No caching
โโโ 5โ15 second response time acceptable
Production Phase (100โ10,000 users)
โโโ Hybrid retrieval (vector + BM25)
โโโ Cross-encoder reranking
โโโ Query rewriting and expansion
โโโ Redis query caching
โโโ Response time < 3 seconds
Scale Phase (10,000โ1M DAU)
โโโ Distributed vector database (Pinecone / Qdrant cluster)
โโโ Multi-layer caching (L1 / L2 / semantic)
โโโ Query routing (model tiering)
โโโ Async processing pipeline
โโโ Comprehensive monitoring and alerting
โโโ p99 response time < 2 seconds
Beyond 1M DAU
โโโ Multi-region deployment (latency reduction)
โโโ Model distillation (shift expensive inference to local models)
โโโ Real-time incremental indexing (document updates reflected instantly)
โโโ Personalized retrieval (user-profile-aware)
โโโ A/B testing framework (continuous module optimization)
Closing Words: The Final Chapter
This final chapter, using an enterprise RAG system as its vehicle, synthesizes the major themes of this book: prompt engineering (query rewriting, answer generation), evaluation frameworks (RAG Evaluator), multilingual support (language-aware chunking and retrieval), and system architecture design (from POC to millions of daily active users).
Summary of core design principles:
1. Evolve in phases. Do not attempt to build the full production system at POC stage. Start with the simplest working version, then add complexity driven by data.
2. Retrieval quality is the core. 80% of RAG quality problems originate in retrieval, not generation. Before optimizing prompts, ensure retrieval is surfacing the right documents.
3. Observability before optimization. Without metrics there is no direction for improvement. Build the complete metrics framework first, then optimize.
4. Maintain cost consciousness throughout. At 1 million DAU scale, each 1% cost reduction is worth tens of thousands of dollars per month. Query routing, caching, and context compression are the three primary cost control levers.
5. Keep humans in the loop. AI is a means, not an end. The best RAG systems honestly say "I don't know" when confidence is insufficient, and guide users toward human experts.
Building AI systems is a marathon, not a sprint. The frameworks, code, and ways of thinking this book provides are the starting point of that journey, not the finish line.