Chapter 80: Building an Enterprise RAG System: Architecture Evolution from POC to Millions of Daily Active Users

80.1 The Full Picture of Enterprise RAG

Retrieval-Augmented Generation (RAG) is one of the most widely deployed architecture patterns in enterprise AI applications today. Compared with answering from an LLM's built-in knowledge alone, RAG combines a company's private knowledge base with the generation capabilities of an LLM, so that responses are grounded in current, accurate internal information.

This chapter uses a hypothetical enterprise with 500,000 internal documents and 1 million daily active users as the backdrop for a complete architectural evolution, from zero to production.

80.1.1 Full System Architecture (ASCII Diagram)

┌────────────────────────────────────────────────────────────────────┐
│                    Enterprise RAG System Architecture               │
├──────────────────────────┬─────────────────────────────────────────┤
│  OFFLINE PIPELINE        │  ONLINE PIPELINE (Query Service)        │
│  (Index Construction)    │                                         │
│                          │  User Query                             │
│  Document Sources        │  ┌────────────────────────────────┐    │
│  ┌─────────────────┐     │  │ API Gateway                    │    │
│  │ SharePoint      │     │  │ (Auth / Rate Limiting / Logs)  │    │
│  │ Confluence      │─┐   │  └────────────────┬───────────────┘    │
│  │ S3 / Object     │ │   │                   ↓                    │
│  │ Database        │ │   │  ┌────────────────────────────────┐    │
│  └─────────────────┘ │   │  │ Query Understanding Layer      │    │
│                      ↓   │  │ - Intent classification        │    │
│  ┌───────────────────┐   │  │ - Query rewriting / expansion  │    │
│  │ Document          │   │  │ - Language detection           │    │
│  │ Pre-processing    │   │  └────────────────┬───────────────┘    │
│  │ - Format parsing  │   │                   ↓                    │
│  │ - Text cleaning   │   │  ┌────────────────────────────────┐    │
│  │ - Metadata        │   │  │ Hybrid Retrieval Layer         │    │
│  └────────┬──────────┘   │  │ ┌────────────┐ ┌────────────┐  │    │
│           ↓              │  │ │ Vector     │ │ Keyword    │  │    │
│  ┌───────────────────┐   │  │ │ Search     │ │ (BM25/ES)  │  │    │
│  │ Chunking Strategy │   │  │ └─────┬──────┘ └─────┬──────┘  │    │
│  │ - Fixed size      │   │  │       └──────┬───────┘         │    │
│  │ - Semantic        │   │  │          RRF Fusion            │    │
│  │ - Recursive       │   │  └────────────────┬───────────────┘    │
│  └────────┬──────────┘   │                   ↓                    │
│           ↓              │  ┌────────────────────────────────┐    │
│  ┌───────────────────┐   │  │ Reranking Layer                │    │
│  │ Embedding         │   │  │ Cross-encoder precision rerank │    │
│  │ - Cohere / OpenAI │   │  └────────────────┬───────────────┘    │
│  │ - BGE / E5        │   │                   ↓                    │
│  └────────┬──────────┘   │  ┌────────────────────────────────┐    │
│           ↓              │  │ Generation Layer (Claude)      │    │
│  ┌───────────────────┐   │  │ - Context assembly             │    │
│  │ Vector DB Write   │   │  │ - Citation injection           │    │
│  │ - Pinecone        │   │  │ - Answer generation            │    │
│  │ - Qdrant          │   │  └────────────────┬───────────────┘    │
│  │ - pgvector        │   │                   ↓                    │
│  └───────────────────┘   │  ┌────────────────────────────────┐    │
│                          │  │ Post-processing Layer          │    │
│                          │  │ - Citation validation          │    │
│                          │  │ - Output formatting            │    │
│                          │  │ - Confidence scoring           │    │
│                          │  └────────────────────────────────┘    │
└──────────────────────────┴─────────────────────────────────────────┘
                                  ↓
           ┌──────────────────────────────────────────┐
           │  Observability Stack                      │
           │  Prometheus + Grafana + Jaeger            │
           └──────────────────────────────────────────┘

80.2 Phase 1: POC (0–100 Users)

80.2.1 POC Goals and Constraints

The POC phase's core goal is validating technical feasibility, not building a scalable system. Typical constraints: single-machine deployment, fewer than 10,000 documents, relaxed response time requirements (<10 seconds acceptable), no high-availability requirements.

80.2.2 Chunking Strategy

Chunking is one of the most critical determinants of RAG quality. Chunks that are too small or split mid-thought leave the retrieved context incomplete; chunks that are too large let irrelevant content dominate the prompt and dilute each chunk's embedding.

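import re

class DocumentChunker:
    """Three chunking strategies. All sizes are measured in characters, not tokens."""
    
    def fixed_size_chunking(self, text: str, chunk_size: int = 512, overlap: int = 50) -> list:
        """Fixed-size chunking. Simple and predictable but may split mid-sentence.

        If a sentence end or line break falls in the last 30% of a window, the chunk is
        trimmed back to it so that fewer sentences are cut in half.
        """
        if not 0 <= overlap < chunk_size:
            raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
        chunks = []
        start = 0
        while start < len(text):
            end = start + chunk_size
            chunk = text[start:end]
            if end < len(text):
                last_break = max(chunk.rfind('.'), chunk.rfind('!'),
                                 chunk.rfind('?'), chunk.rfind('\n'))
                if last_break > chunk_size * 0.7:
                    chunk = chunk[:last_break + 1]
                    end = start + last_break + 1
            chunks.append({"text": chunk, "start_char": start, "chunk_type": "fixed_size"})
            if end >= len(text):
                break  # reached the end; stepping back by `overlap` would only repeat the tail
            # Step back by `overlap` characters, but always make forward progress.
            start = max(end - overlap, start + 1)
        return chunks
    
    def semantic_chunking(self, text: str, max_size: int = 800) -> list:
        """Paragraph-boundary chunking. Preserves semantic integrity.

        Paragraphs are packed together up to max_size; a single paragraph longer than
        max_size is kept whole as its own (oversized) chunk.
        """
        paragraphs = re.split(r'\n{2,}', text)
        chunks = []
        current_chunk = ""
        for para in paragraphs:
            para = para.strip()
            if not para:
                continue
            # +2 accounts for the "\n\n" joiner between paragraphs.
            if len(current_chunk) + 2 + len(para) <= max_size:
                current_chunk += ("\n\n" if current_chunk else "") + para
            else:
                if current_chunk:
                    chunks.append({"text": current_chunk, "chunk_type": "semantic"})
                current_chunk = para
        if current_chunk:
            chunks.append({"text": current_chunk, "chunk_type": "semantic"})
        return chunks
    
    def recursive_character_splitting(self, text: str, chunk_size: int = 512,
                                       separators: list = None) -> list:
        """Recursive splitting (inspired by LangChain): try the coarsest separator first and
        fall back to finer ones only for pieces that are still too large."""
        if separators is None:
            separators = ["\n\n", "\n", ". ", " ", ""]
        
        def _split(text: str, seps: list) -> list:
            if len(text) <= chunk_size:
                return [text]
            sep = seps[0] if seps else ""
            # An empty separator means "split into individual characters".
            splits = text.split(sep) if sep else list(text)
            chunks, current = [], ""
            for s in splits:
                candidate = current + sep + s if current else s
                if len(candidate) <= chunk_size:
                    current = candidate
                    continue
                if current:
                    chunks.append(current)
                    current = ""
                if len(s) <= chunk_size:
                    current = s
                elif len(seps) > 1:
                    # Piece is still too large: retry it with the next, finer separator.
                    chunks.extend(_split(s, seps[1:]))
                else:
                    # No finer separator left: hard-split so that no text is dropped.
                    chunks.extend(s[i:i + chunk_size] for i in range(0, len(s), chunk_size))
            if current:
                chunks.append(current)
            return chunks
        
        return [{"text": c, "chunk_type": "recursive"} for c in _split(text, separators) if c.strip()]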
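Choosing between strategies is easier when you measure what each one produces on your own documents. The sketch below uses two simplified stand-in splitters (not the DocumentChunker methods themselves) and a hypothetical chunk_stats helper that reports chunk count, sizes, and how often a chunk ends on a sentence boundary.

```python
import re
from statistics import mean

def naive_fixed_chunks(text: str, size: int) -> list[str]:
    """Plain fixed-width slicing with no boundary handling (baseline for comparison)."""
    return [text[i:i + size] for i in range(0, len(text), size)]

def paragraph_chunks(text: str) -> list[str]:
    """One chunk per blank-line-separated paragraph."""
    return [p.strip() for p in re.split(r"\n{2,}", text) if p.strip()]

def chunk_stats(chunks: list[str]) -> dict:
    """Summarize a chunking result so strategies can be compared on the same corpus."""
    if not chunks:
        return {"count": 0, "mean_chars": 0.0, "max_chars": 0, "boundary_ratio": 0.0}
    # A chunk "ends cleanly" if its last non-space character closes a sentence.
    clean = [c.rstrip()[-1:] in {".", "!", "?"} for c in chunks]
    return {
        "count": len(chunks),
        "mean_chars": round(mean(len(c) for c in chunks), 1),
        "max_chars": max(len(c) for c in chunks),
        "boundary_ratio": sum(clean) / len(chunks),
    }

doc = (
    "Refunds are processed within 14 days. Contact finance for exceptions.\n\n"
    "Travel must be booked through the internal portal. Economy class is the default."
)
print(chunk_stats(naive_fixed_chunks(doc, 40)))  # 4 chunks, most cut mid-sentence
print(chunk_stats(paragraph_chunks(doc)))        # 2 chunks, each ending on a sentence
```

Running the same statistics over a sample of real documents gives a data-driven basis for picking chunk sizes and strategy.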

80.2.3 Minimal POC Implementation

from anthropic import Anthropic
import json

client = Anthropic()  # reads the API key from the ANTHROPIC_API_KEY environment variable

class SimplePOCRAG:
    """Minimal RAG loop: paragraph chunking, pure vector search, one Claude call per query.

    Assumed (vendor-neutral) interfaces:
      embedding_fn(text) -> list[float]
      vector_store.upsert(id=..., vector=..., metadata=...)
      vector_store.query(vector=..., top_k=...) -> [{"id", "score", "metadata"}, ...]
    """
    
    def __init__(self, embedding_fn, vector_store):
        self.embed = embedding_fn
        self.store = vector_store
        self.chunker = DocumentChunker()
    
    def index_document(self, doc_id: str, text: str, metadata: dict = None):
        chunks = self.chunker.semantic_chunking(text)
        for i, chunk in enumerate(chunks):
            self.store.upsert(
                id=f"{doc_id}_{i}",
                vector=self.embed(chunk["text"]),
                # Store the chunk text with the vector so it can be placed in the prompt later.
                metadata={"text": chunk["text"], "doc_id": doc_id, "chunk_index": i,
                          **(metadata or {})}
            )
    
    def query(self, question: str, top_k: int = 5) -> dict:
        results = self.store.query(vector=self.embed(question), top_k=top_k)
        if not results:
            return {"answer": "No relevant information found.", "sources": []}
        
        # Number each excerpt so the model can cite it as [Source N].
        context = "\n\n---\n\n".join([
            f"[Source {i+1}]: {r['metadata']['text']}"
            for i, r in enumerate(results)
        ])
        
        response = client.messages.create(
            model="claude-opus-4-5",
            max_tokens=1000,
            system="""You are an enterprise knowledge base assistant. Answer questions based on the provided document excerpts.

Rules:
1. Only answer based on provided document content
2. If the documents don't contain relevant information, say so explicitly
3. Cite sources using [Source N] format
4. Be accurate and concise""",
            messages=[{"role": "user", "content": f"Documents:\n{context}\n\nQuestion: {question}"}]
        )
        
        return {
            "answer": response.content[0].text,
            # Several chunks can come from one document; list each document once, in rank order.
            "sources": list(dict.fromkeys(r["metadata"]["doc_id"] for r in results))
        }
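To exercise SimplePOCRAG locally before choosing a vector database, any object with the assumed upsert()/query() shape will do. The sketch below is a hypothetical brute-force InMemoryVectorStore plus a hashed bag-of-words toy_embedding; the latter only rewards shared words, so treat it as a test stand-in, not a semantic embedding model.

```python
import hashlib
import math
import re

def toy_embedding(text: str, dim: int = 256) -> list[float]:
    """Hashed bag-of-words vector, L2-normalized. A test stand-in, NOT a semantic embedding."""
    vec = [0.0] * dim
    for token in re.findall(r"\w+", text.lower()):
        bucket = int(hashlib.sha1(token.encode()).hexdigest(), 16) % dim
        vec[bucket] += 1.0
    norm = math.sqrt(sum(v * v for v in vec)) or 1.0
    return [v / norm for v in vec]

class InMemoryVectorStore:
    """Brute-force cosine search with the upsert()/query() shape SimplePOCRAG assumes."""

    def __init__(self):
        self._rows: dict[str, tuple[list[float], dict]] = {}

    # `id` shadows the builtin on purpose: it matches the keyword SimplePOCRAG passes.
    def upsert(self, id: str, vector: list[float], metadata: dict) -> None:
        self._rows[id] = (vector, metadata)

    def query(self, vector: list[float], top_k: int = 5) -> list[dict]:
        # Vectors are pre-normalized, so the dot product equals cosine similarity.
        scored = [
            {"id": row_id, "score": sum(a * b for a, b in zip(vector, vec)), "metadata": meta}
            for row_id, (vec, meta) in self._rows.items()
        ]
        scored.sort(key=lambda r: r["score"], reverse=True)
        return scored[:top_k]

store = InMemoryVectorStore()
for doc_id, text in [("hr_0", "Annual leave is 25 days per year"),
                     ("it_0", "Reset your VPN password via the IT portal")]:
    store.upsert(id=doc_id, vector=toy_embedding(text), metadata={"text": text, "doc_id": doc_id})
best = store.query(vector=toy_embedding("how many days of annual leave"), top_k=1)[0]
print(best["id"])  # hr_0
```

With these in place, `SimplePOCRAG(embedding_fn=toy_embedding, vector_store=InMemoryVectorStore())` can index and retrieve end to end; only the Claude call needs network access.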

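80.3 Phase 2: Production (100–10,000 Users)

80.3.1 Hybrid Retrieval

Pure vector search struggles with exact-match queries such as order numbers or product codes, while BM25 keyword search matches exact terms but misses paraphrases and synonyms. Production systems therefore run both and merge the two ranked lists with Reciprocal Rank Fusion (RRF):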

class HybridRetriever:
    """Vector search + BM25 + RRF fusion."""
    
    def __init__(self, vector_store, bm25_index, embedding_fn):
        self.vector_store = vector_store
        self.bm25 = bm25_index
        self.embed = embedding_fn
    
    def retrieve(self, query: str, top_k: int = 20) -> list:
        # Both backends must return dicts with a shared "id" key so RRF can merge them.
        vector_results = self.vector_store.query(vector=self.embed(query), top_k=top_k)
        keyword_results = self.bm25.search(query, top_k=top_k)
        return self._rrf_fusion([vector_results, keyword_results])
    
    def _rrf_fusion(self, result_lists: list, k: int = 60) -> list:
        """Reciprocal Rank Fusion: score(d) = sum over lists of 1 / (k + rank_d), rank 1-based.

        RRF uses only ranks, so the incompatible score scales of cosine similarity and
        BM25 never have to be normalized against each other.
        """
        scores = {}
        for result_list in result_lists:
            for rank, result in enumerate(result_list, start=1):
                doc_id = result["id"]
                if doc_id not in scores:
                    scores[doc_id] = {"score": 0.0, "data": result}
                scores[doc_id]["score"] += 1.0 / (k + rank)
        ranked = sorted(scores.values(), key=lambda x: x["score"], reverse=True)
        return [{**r["data"], "rrf_score": r["score"]} for r in ranked]
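The bm25_index above is only assumed to expose search(query, top_k). A minimal sketch of such an index (a hypothetical SimpleBM25 implementing standard Okapi BM25, with k1 = 1.5 and b = 0.75 chosen as typical values) makes the fusion behavior concrete: for an order-number query, BM25 finds the exact match that a simulated vector ranking placed last, and RRF lifts it to first place because it appears in both lists.

```python
import math
import re
from collections import Counter

def tokenize(text: str) -> list[str]:
    # Lowercased word tokens; "ORD-88231" becomes ["ord", "88231"].
    return re.findall(r"\w+", text.lower())

class SimpleBM25:
    """Minimal Okapi BM25 exposing search(query, top_k) -> [{"id", "score", "text"}]."""

    def __init__(self, docs: dict[str, str], k1: float = 1.5, b: float = 0.75):
        self.docs, self.k1, self.b = docs, k1, b
        self.tf = {d: Counter(tokenize(t)) for d, t in docs.items()}
        self.doc_len = {d: sum(c.values()) for d, c in self.tf.items()}
        self.avgdl = sum(self.doc_len.values()) / max(len(docs), 1)
        df = Counter(term for counts in self.tf.values() for term in counts)
        n = len(docs)
        # IDF with +1 inside the log (as in Lucene) so scores stay non-negative.
        self.idf = {t: math.log(1 + (n - f + 0.5) / (f + 0.5)) for t, f in df.items()}

    def search(self, query: str, top_k: int = 20) -> list[dict]:
        terms = tokenize(query)
        results = []
        for doc_id, counts in self.tf.items():
            score = 0.0
            for term in terms:
                f = counts.get(term, 0)
                if f:
                    norm = f + self.k1 * (1 - self.b + self.b * self.doc_len[doc_id] / self.avgdl)
                    score += self.idf[term] * f * (self.k1 + 1) / norm
            if score > 0:
                results.append({"id": doc_id, "score": score, "text": self.docs[doc_id]})
        results.sort(key=lambda r: r["score"], reverse=True)
        return results[:top_k]

def rrf(result_lists: list[list[dict]], k: int = 60) -> list[tuple[str, float]]:
    """Reciprocal Rank Fusion with 1-based ranks, as in HybridRetriever._rrf_fusion."""
    scores: dict[str, float] = {}
    for results in result_lists:
        for rank, r in enumerate(results, start=1):
            scores[r["id"]] = scores.get(r["id"], 0.0) + 1.0 / (k + rank)
    return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)

docs = {
    "a": "Order ORD-88231 shipped on Monday",
    "b": "How to track an order shipment",
    "c": "Shipping policy for international orders",
}
bm25 = SimpleBM25(docs)
# Simulated vector ranking: semantically "order-like" docs first, the exact match last.
vector_results = [{"id": "b"}, {"id": "c"}, {"id": "a"}]
keyword_results = bm25.search("ORD-88231")
print([r["id"] for r in keyword_results])        # ['a']
print(rrf([vector_results, keyword_results])[0])  # 'a' first: 1/61 + 1/63 ≈ 0.0323
```

Document "a" gets 1/(60+3) from the vector list and 1/(60+1) from BM25, beating "b", which scores 1/61 from the vector list alone.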

80.3.2 Cross-Encoder Reranking

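class CrossEncoderReranker:
    """Cross-encoder reranker: scores each (query, passage) pair jointly.

    More accurate than bi-encoder similarity but slower, since every candidate needs its
    own forward pass; apply it only to the few dozen candidates from hybrid retrieval.
    """
    
    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
        from sentence_transformers import CrossEncoder
        self.model = CrossEncoder(model_name)
    
    @staticmethod
    def _text(candidate: dict) -> str:
        # Vector-store hits keep the text under "metadata"; BM25 hits may store it at top level.
        return candidate.get("text") or candidate.get("metadata", {}).get("text", "")
    
    def rerank(self, query: str, candidates: list, top_k: int = 5) -> list:
        if not candidates:
            return []
        pairs = [(query, self._text(c)) for c in candidates]
        scores = self.model.predict(pairs)
        scored = sorted(zip(scores, candidates), key=lambda x: x[0], reverse=True)
        return [{**c, "rerank_score": float(s)} for s, c in scored[:top_k]]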
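The post-processing layer in the architecture diagram includes confidence scoring, and the reranker's top score is one cheap signal for it. A minimal sketch, assuming candidates shaped like CrossEncoderReranker.rerank output: if even the best passage falls below a threshold, decline and hand off to a human instead of generating from weak context. The function name, threshold, and message are hypothetical.

```python
def answer_or_abstain(reranked: list[dict], min_score: float) -> dict:
    """Gate generation on reranker confidence.

    Expects candidates sorted by descending "rerank_score", as CrossEncoderReranker.rerank
    returns them. min_score is a model-specific threshold to calibrate on labeled queries,
    because cross-encoder outputs may be raw logits rather than probabilities.
    """
    if not reranked or reranked[0]["rerank_score"] < min_score:
        return {
            "action": "abstain",
            "message": "I couldn't find this in the knowledge base. "
                       "Please contact the owning team or a human expert.",
        }
    # Pass only passages that individually clear the threshold to the generation step.
    return {"action": "answer",
            "context": [c for c in reranked if c["rerank_score"] >= min_score]}

candidates = [{"id": "p1", "rerank_score": 2.3}, {"id": "p2", "rerank_score": -1.7}]
print(answer_or_abstain(candidates, min_score=0.0)["action"])      # answer
print(answer_or_abstain(candidates[1:], min_score=0.0)["action"])  # abstain
```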

80.3.3 Query Rewriting and Expansion

def rewrite_query(original_query: str, conversation_history: list = None) -> dict:
    """Rewrite a query into a standalone form (resolving pronouns and references to earlier
    turns) and generate variants for query expansion."""
    history_text = ""
    if conversation_history:
        # Use only the last 3 turns, with assistant replies truncated, to keep the prompt short.
        history_text = "Conversation history:\n" + "\n".join([
            f"User: {turn['user']}\nAssistant: {turn['assistant'][:100]}..."
            for turn in conversation_history[-3:]
        ])
    
    rewrite_prompt = f"""Rewrite the following user query for document retrieval.

{history_text}

Original query: {original_query}

Requirements:
1. Resolve pronouns and references (e.g., "it", "that", "the aforementioned")
2. Add context if the query depends on conversation history
3. Generate 3 semantically similar query variants for query expansion
4. Identify key entities and concepts

Output only a JSON object, with no surrounding text:
{{
  "rewritten_query": "standalone rewritten query",
  "query_variants": ["variant1", "variant2", "variant3"],
  "key_entities": ["entity1", "entity2"],
  "query_type": "factual|procedural|analytical|comparative"
}}"""
    
    response = client.messages.create(
        model="claude-haiku-4-5",  # a small, fast model is sufficient for this step
        max_tokens=400,
        messages=[{"role": "user", "content": rewrite_prompt}]
    )
    
    raw = response.content[0].text
    try:
        # Tolerate prose or code fences around the JSON object.
        return json.loads(raw[raw.find("{"):raw.rfind("}") + 1])
    except ValueError:
        # Fall back to the original query so retrieval can still proceed.
        return {"rewritten_query": original_query, "query_variants": [original_query],
                "key_entities": [], "query_type": "factual"}

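rewrite_query returns a standalone query plus variants, but the variants only help if retrieval uses them. One common approach, sketched here as an assumption: retrieve once per distinct query and fuse the ranked lists with RRF. `retrieve` is any callable returning dicts with an "id" key (for example, a hypothetical wrapper around HybridRetriever.retrieve); the stub index below stands in for a real backend.

```python
from typing import Callable

def multi_query_retrieve(rewrite: dict,
                         retrieve: Callable[[str, int], list[dict]],
                         top_k: int = 20, k: int = 60) -> list[dict]:
    """Retrieve for the rewritten query and each variant, then fuse the lists with RRF."""
    queries = [rewrite.get("rewritten_query", ""), *rewrite.get("query_variants", [])]
    queries = list(dict.fromkeys(q for q in queries if q))  # drop blanks/duplicates, keep order
    scores: dict[str, float] = {}
    first_seen: dict[str, dict] = {}
    for q in queries:
        for rank, hit in enumerate(retrieve(q, top_k), start=1):
            scores[hit["id"]] = scores.get(hit["id"], 0.0) + 1.0 / (k + rank)
            first_seen.setdefault(hit["id"], hit)
    ranked = sorted(scores, key=scores.get, reverse=True)
    return [first_seen[doc_id] for doc_id in ranked[:top_k]]

# Stub retriever standing in for a real search backend.
fake_index = {
    "vpn password reset": [{"id": "it_7"}, {"id": "it_2"}],
    "reset vpn credentials": [{"id": "it_2"}, {"id": "it_9"}],
}
rewrite = {"rewritten_query": "vpn password reset",
           "query_variants": ["reset vpn credentials", "vpn password reset"]}
hits = multi_query_retrieve(rewrite, lambda q, n: fake_index.get(q, [])[:n])
print([h["id"] for h in hits])  # ['it_2', 'it_7', 'it_9']
```

it_2 wins because both phrasings retrieve it, which is exactly the recall benefit query expansion is meant to provide.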
80.4 Phase 3: Scale (10,000–1 Million DAU)

80.4.1 Vector Database Selection

Feature           Pinecone                     Qdrant                         pgvector
----------------  ---------------------------  -----------------------------  --------------------------
Deployment        Fully managed cloud          Self-hosted / cloud            PostgreSQL extension
Scale ceiling     Billions of vectors          Billions (horizontal scale)    Millions (single node)
p99 latency       <50ms                        <20ms (self-hosted)            <100ms
Full-text search  No                           Yes (payload filtering)        Yes (PostgreSQL)
Cost              High (usage-based)           Medium (infrastructure)        Low (if PG already exists)
Best for          Fast start, no ops capacity  High-performance self-hosting  Existing PG stack

80.4.2 Distributed Index Construction

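from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
import threading

logger = logging.getLogger(__name__)

class DistributedIndexBuilder:
    """Concurrent index builder backed by a thread pool.

    Threads fit this workload because it is dominated by network I/O (embedding API calls
    and vector DB writes). Everything runs in one process; a fully distributed build would
    shard `documents` across several workers or machines, each running a builder like this.
    Assumed interfaces: embedding_fn(text) -> list[float] and
    vector_store.upsert_batch([{"id", "vector", "metadata"}, ...]).
    """
    
    def __init__(self, vector_store, embedding_fn, max_workers: int = 10):
        self.vector_store = vector_store
        self.embed = embedding_fn
        self.max_workers = max_workers
        self.chunker = DocumentChunker()
        self._progress = {"processed": 0, "failed": 0, "total": 0}
        self._lock = threading.Lock()
    
    def build_index(self, documents: list) -> dict:
        # Reset counters so repeated builds do not accumulate stale totals.
        self._progress = {"processed": 0, "failed": 0, "total": len(documents)}
        
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {executor.submit(self._process_document, doc): doc["id"]
                       for doc in documents}
            for future in as_completed(futures):
                doc_id = futures[future]
                try:
                    future.result()  # re-raises any exception from the worker thread
                    with self._lock:
                        self._progress["processed"] += 1
                except Exception:
                    # One bad document must not abort the whole build: log it and move on.
                    logger.exception("Failed to index %s", doc_id)
                    with self._lock:
                        self._progress["failed"] += 1
        
        return dict(self._progress)
    
    def _process_document(self, doc: dict):
        chunks = self.chunker.recursive_character_splitting(doc["text"])
        if not chunks:
            return  # empty or whitespace-only document: nothing to write
        items = [
            {"id": f"{doc['id']}_{i}",
             "vector": self.embed(chunk["text"]),
             "metadata": {"text": chunk["text"], "doc_id": doc["id"],
                          "chunk_index": i, **doc.get("metadata", {})}}
            for i, chunk in enumerate(chunks)
        ]
        self.vector_store.upsert_batch(items)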
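_process_document calls embedding_fn once per chunk, which costs one network round trip per chunk. Many embedding APIs accept a list of texts per request, so batching can cut round trips substantially. The sketch below assumes a hypothetical embed_batch(texts) -> vectors callable; batch_size=64 is a placeholder to adjust to your provider's per-request limits.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator

def batched(items: Iterable, size: int) -> Iterator[list]:
    """Yield lists of up to `size` items (itertools.batched only exists on Python 3.12+)."""
    it = iter(items)
    while batch := list(islice(it, size)):
        yield batch

def embed_in_batches(texts: list[str],
                     embed_batch: Callable[[list[str]], list[list[float]]],
                     batch_size: int = 64) -> list[list[float]]:
    """Embed texts with one call per batch instead of one call per chunk."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    vectors: list[list[float]] = []
    for batch in batched(texts, batch_size):
        out = embed_batch(batch)
        if len(out) != len(batch):  # guard against chunks and vectors silently misaligning
            raise ValueError(f"expected {len(batch)} vectors, got {len(out)}")
        vectors.extend(out)
    return vectors

calls = []
def fake_embed_batch(batch: list[str]) -> list[list[float]]:
    calls.append(len(batch))
    return [[float(len(t))] for t in batch]

vecs = embed_in_batches(["a", "bb", "ccc", "dddd", "eeeee"], fake_embed_batch, batch_size=2)
print(calls)  # [2, 2, 1]
```

Inside _process_document, the per-chunk `self.embed(...)` calls would become one `embed_in_batches([c["text"] for c in chunks], ...)` call, with the returned vectors zipped back onto the chunks in order.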

80.4.3 Multi-Layer Caching

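import hashlib
import json
import time
from collections import OrderedDict
from typing import Optional

class RAGCache:
    """
    L1: In-memory LRU cache (per process, bounded size, per-entry TTL)
    L2: Redis distributed cache (shared by all service instances)
    L3: Semantic cache (reuse results for similar queries); not implemented in this
        class, semantic_threshold is reserved for it
    """
    
    def __init__(self, redis_client=None, semantic_threshold: float = 0.95,
                 max_memory_items: int = 10_000):
        self.redis = redis_client
        self.semantic_threshold = semantic_threshold
        self.max_memory_items = max_memory_items
        self._memory_cache = OrderedDict()  # key -> (expires_at, result), oldest first
    
    def get_cache_key(self, query: str) -> str:
        # Normalize case and whitespace so trivially different queries share one entry.
        normalized = " ".join(query.lower().split())
        return hashlib.md5(normalized.encode()).hexdigest()  # non-cryptographic use
    
    def _memory_set(self, key: str, result: dict, ttl: int):
        self._memory_cache[key] = (time.monotonic() + ttl, result)
        self._memory_cache.move_to_end(key)
        while len(self._memory_cache) > self.max_memory_items:
            self._memory_cache.popitem(last=False)  # evict the least recently used entry
    
    def get(self, query: str) -> Optional[dict]:
        key = self.get_cache_key(query)
        entry = self._memory_cache.get(key)
        if entry is not None:
            expires_at, result = entry
            if time.monotonic() < expires_at:
                self._memory_cache.move_to_end(key)  # mark as recently used
                return result
            del self._memory_cache[key]  # expired: fall through to Redis
        if self.redis:
            cached = self.redis.get(f"rag:{key}")
            if cached:
                result = json.loads(cached)
                # Promote to L1 with a short lifetime; Redis holds the authoritative TTL.
                self._memory_set(key, result, ttl=300)
                return result
        return None
    
    def set(self, query: str, result: dict, ttl: int = 3600):
        key = self.get_cache_key(query)
        self._memory_set(key, result, ttl)
        if self.redis:
            self.redis.setex(f"rag:{key}", ttl, json.dumps(result))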
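RAGCache lists a semantic layer (L3) but does not implement it. A minimal sketch of that layer, under stated assumptions: a hypothetical SemanticCache embeds each query with a caller-supplied function and returns a stored result when cosine similarity to a cached query reaches the threshold (0.95, matching RAGCache's default). It scans linearly; at scale the stored vectors would live in an approximate-nearest-neighbor index.

```python
import math
from typing import Callable, Optional

class SemanticCache:
    """L3 cache sketch: reuse a result when a new query is close enough to a cached one."""

    def __init__(self, embed: Callable[[str], list[float]], threshold: float = 0.95):
        self.embed = embed
        self.threshold = threshold
        self._entries: list[tuple[list[float], dict]] = []

    @staticmethod
    def _cosine(a: list[float], b: list[float]) -> float:
        dot = sum(x * y for x, y in zip(a, b))
        norms = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
        return dot / norms if norms else 0.0

    def get(self, query: str) -> Optional[dict]:
        q = self.embed(query)
        best_score, best_result = 0.0, None
        for vec, result in self._entries:
            score = self._cosine(q, vec)
            if score > best_score:
                best_score, best_result = score, result
        return best_result if best_score >= self.threshold else None

    def set(self, query: str, result: dict) -> None:
        self._entries.append((self.embed(query), result))

# Fake 2-D "embeddings" so the example runs without a model.
fake_vectors = {
    "how many vacation days do i get": [1.0, 0.0],
    "vacation days per year?": [0.99, 0.05],
    "reset my vpn password": [0.0, 1.0],
}
cache = SemanticCache(embed=fake_vectors.__getitem__, threshold=0.95)
cache.set("how many vacation days do i get", {"answer": "25 days"})
print(cache.get("vacation days per year?"))  # {'answer': '25 days'}
print(cache.get("reset my vpn password"))    # None
```

The threshold is deliberately high: two queries can be near-duplicates in embedding space yet differ in a decisive detail, such as an order number, and a semantic hit would then return the wrong answer.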

80.5 Monitoring and Observability

80.5.1 Key Metrics Framework

RAG_METRICS = {
    "retrieval_quality": {
        "recall_at_k": "Fraction of all relevant documents that appear in the top-K results",
        "mrr": "Mean Reciprocal Rank: average of 1/rank of the first relevant result",
        "ndcg": "Normalized Discounted Cumulative Gain: rank-weighted relevance vs. the ideal ranking"
    },
    "generation_quality": {
        "answer_faithfulness": "Is the answer grounded in retrieved content?",
        "answer_relevance": "Does the answer address the question?",
        "context_precision": "What fraction of the retrieved context is relevant to the question?"
    },
    "system_performance": {
        "retrieval_latency_p50": "Target: <100ms",
        "retrieval_latency_p99": "Target: <500ms",
        "generation_latency_p50": "Target: <2s",
        "total_latency_p99": "Target: <5s",
        "error_rate": "Target: <0.1%"
    },
    "business_metrics": {
        "answer_acceptance_rate": "Share of answers not followed by a rephrased or follow-up question",
        "citation_click_rate": "Share of answers where the user opens a cited source document",
        "session_resolution_rate": "Share of sessions resolved without escalation to a human"
    }
}
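The three retrieval metrics can be computed directly from a ranked list of document IDs and a set of human-labeled relevant IDs. A minimal sketch with binary relevance (graded relevance would weight each hit by its label); the function names are illustrative:

```python
import math

def recall_at_k(ranked: list[str], relevant: set[str], k: int) -> float:
    """Fraction of all relevant documents that appear in the top-k results."""
    if not relevant:
        return 0.0
    return len(set(ranked[:k]) & relevant) / len(relevant)

def reciprocal_rank(ranked: list[str], relevant: set[str]) -> float:
    """1 / rank of the first relevant result (0 if none); MRR is its mean over queries."""
    for rank, doc_id in enumerate(ranked, start=1):
        if doc_id in relevant:
            return 1.0 / rank
    return 0.0

def ndcg_at_k(ranked: list[str], relevant: set[str], k: int) -> float:
    """Binary-relevance nDCG: DCG of the ranking divided by the DCG of an ideal ranking."""
    dcg = sum(1.0 / math.log2(i + 2) for i, d in enumerate(ranked[:k]) if d in relevant)
    ideal = sum(1.0 / math.log2(i + 2) for i in range(min(len(relevant), k)))
    return dcg / ideal if ideal else 0.0

ranked = ["d3", "d1", "d7", "d2"]  # system ranking for one query
relevant = {"d1", "d2"}            # human-labeled relevant documents
print(recall_at_k(ranked, relevant, k=3))          # 0.5
print(reciprocal_rank(ranked, relevant))           # 0.5
print(round(ndcg_at_k(ranked, relevant, k=3), 3))  # 0.387
```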

80.5.2 RAG Evaluation Pipeline

class RAGEvaluator:
    """LLM-as-judge evaluator that uses Claude to score RAG outputs."""
    
    def evaluate_answer(self, question: str, retrieved_context: str,
                        generated_answer: str) -> dict:
        # Context is truncated to 3,000 characters to bound evaluation cost; claims supported
        # only by the cut-off part may be wrongly flagged as unfaithful.
        eval_prompt = f"""Evaluate the quality of this RAG system output.

Question: {question}

Retrieved context:
{retrieved_context[:3000]}

Generated answer:
{generated_answer}

Score dimensions 1-3 on a 1-5 scale and dimension 4 on a 0.0-1.0 scale:
1. Faithfulness: Is the answer entirely grounded in the context, with no fabrication?
2. Relevance: Does the answer directly address the question?
3. Completeness: Does the answer cover all aspects of the question?
4. Context Precision: What fraction of the retrieved context is relevant to the question?

Output only a JSON object, with no surrounding text:
{{
  "faithfulness": {{"score": 1-5, "explanation": "..."}},
  "relevance": {{"score": 1-5, "explanation": "..."}},
  "completeness": {{"score": 1-5, "explanation": "..."}},
  "context_precision": {{"score": 0.0-1.0, "explanation": "..."}},
  "overall_quality": "excellent/good/acceptable/poor",
  "hallucination_detected": true/false,
  "hallucinated_content": ["each fabricated claim, or an empty list"]
}}"""
        
        response = client.messages.create(
            model="claude-opus-4-5",
            max_tokens=1000,
            messages=[{"role": "user", "content": eval_prompt}]
        )
        
        raw = response.content[0].text
        try:
            # Tolerate prose or code fences around the JSON object.
            return json.loads(raw[raw.find("{"):raw.rfind("}") + 1])
        except ValueError:
            return {"raw_evaluation": raw}
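Per-answer judgments become useful once aggregated over a fixed test set and tracked across releases. A minimal sketch of that step, with a hypothetical summarize_evaluations that consumes RAGEvaluator outputs and counts unparseable responses separately instead of dropping them silently:

```python
from statistics import mean

DIMENSIONS = ("faithfulness", "relevance", "completeness")

def summarize_evaluations(results: list[dict]) -> dict:
    """Aggregate RAGEvaluator.evaluate_answer outputs across a test set."""
    # Fallback results ({"raw_evaluation": ...}) carry no scores; count them separately.
    parsed = [r for r in results if all(d in r for d in DIMENSIONS)]
    summary = {"evaluated": len(parsed), "unparsed": len(results) - len(parsed)}
    if not parsed:
        return summary
    for dim in DIMENSIONS:
        # float() also accepts scores the judge returned as strings, e.g. "4".
        summary[f"mean_{dim}"] = mean(float(r[dim]["score"]) for r in parsed)
    summary["hallucination_rate"] = (
        sum(bool(r.get("hallucination_detected")) for r in parsed) / len(parsed)
    )
    return summary

sample = [
    {"faithfulness": {"score": 5}, "relevance": {"score": 4},
     "completeness": {"score": 3}, "hallucination_detected": False},
    {"faithfulness": {"score": 2}, "relevance": {"score": 4},
     "completeness": {"score": 5}, "hallucination_detected": True},
    {"raw_evaluation": "unparseable model output"},
]
print(summarize_evaluations(sample))
```

A rising "unparsed" count is itself a useful alert: it usually means the judge prompt or model changed behavior.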

80.6 Cost Optimization at Scale

80.6.1 Model Routing

def route_model(query_complexity_score: float) -> str:
    """Route a query to a model tier based on a complexity score in [0, 1].

    The 0.3 and 0.7 cut-offs are starting points to tune against evaluation results.
    """
    if query_complexity_score < 0.3:
        return "claude-haiku-4-5"      # ~5x cheaper per token than Opus 4.5; simple factual queries
    elif query_complexity_score < 0.7:
        return "claude-sonnet-4-5"     # balanced cost/quality
    else:
        return "claude-opus-4-5"       # reserved for complex reasoning tasks
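route_model takes a complexity score, but the chapter does not say where that score comes from. One inexpensive option, sketched here as an assumption, is a rule-based scorer (another would be a call to the smallest model). The features and weights below are illustrative, not tuned; in practice they would be fit on queries labeled with the cheapest model that answered them correctly.

```python
import re

def estimate_query_complexity(query: str) -> float:
    """Hypothetical heuristic returning a score in [0, 1] for route_model()."""
    q = query.lower()
    score = min(len(q.split()) / 40, 0.4)  # longer questions tend to need more reasoning
    if re.search(r"\b(compare|versus|vs\.?|difference|trade-?offs?)\b", q):
        score += 0.3  # comparative questions usually combine several sources
    if re.search(r"\b(why|explain|analy[sz]e|impact|recommend)\b", q):
        score += 0.2  # analytical intent
    if q.count("?") > 1 or " and " in q:
        score += 0.1  # multi-part question
    return min(score, 1.0)

simple = "What is the VPN address?"
complex_q = "Compare the 2023 and 2024 travel policies and explain why the per-diem changed"
print(estimate_query_complexity(simple))               # 0.125 -> Haiku tier
print(round(estimate_query_complexity(complex_q), 3))  # 0.925 -> Opus tier
```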

80.6.2 Monthly Cost Projection at Scale

def estimate_monthly_cost(daily_active_users: int, avg_queries_per_user: float,
                          avg_context_tokens: int, avg_output_tokens: int,
                          avg_query_tokens: int = 100) -> dict:
    monthly_queries = daily_active_users * avg_queries_per_user * 30
    
    # Query embedding cost at an assumed $0.02 per 1M tokens
    # (one-time document indexing is not included)
    embedding_cost = (monthly_queries * avg_query_tokens / 1_000_000) * 0.02
    
    # Claude generation cost at Sonnet list prices: $3 per 1M input, $15 per 1M output tokens
    input_cost = (monthly_queries * avg_context_tokens / 1_000_000) * 3.0
    output_cost = (monthly_queries * avg_output_tokens / 1_000_000) * 15.0
    
    total = embedding_cost + input_cost + output_cost
    
    return {
        "monthly_queries": monthly_queries,
        "monthly_cost_usd": round(total, 2),
        "cost_per_query_usd": round(total / monthly_queries, 5),
        "optimization_note": "Semantic caching + model routing can reduce this by an estimated 60-70%"
    }

# 1 million DAU, 3 queries per user per day, 2,000 context tokens, 300 output tokens
print(estimate_monthly_cost(1_000_000, 3, 2000, 300))
# monthly_queries: 90,000,000
# monthly_cost_usd: 945,180 (input $540,000 + output $405,000 + embeddings $180), before optimization
# cost_per_query_usd: 0.0105
# After caching + routing (60-70% reduction): roughly $280,000-380,000
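The 60-70% figure depends heavily on cache hit rate and routing mix, and the sketch below makes that dependence explicit. Per-token prices are Claude 4.5 list prices at the time of writing (Haiku $1/$5, Sonnet $3/$15, Opus $5/$25 per million input/output tokens) and should be checked against current pricing; the 30% hit rate and 60/30/10 routing mix are hypothetical. Embedding costs and the cost of running the router itself are ignored.

```python
# USD per 1M tokens as (input, output); list prices at the time of writing, verify before use.
PRICES = {
    "claude-haiku-4-5": (1.0, 5.0),
    "claude-sonnet-4-5": (3.0, 15.0),
    "claude-opus-4-5": (5.0, 25.0),
}

def generation_cost(monthly_queries: float, in_tokens: int, out_tokens: int,
                    cache_hit_rate: float, routing_mix: dict[str, float]) -> float:
    """Monthly LLM cost when cache hits skip generation and misses are split across models."""
    if abs(sum(routing_mix.values()) - 1.0) > 1e-9:
        raise ValueError("routing_mix shares must sum to 1")
    llm_queries = monthly_queries * (1 - cache_hit_rate)
    total = 0.0
    for model, share in routing_mix.items():
        price_in, price_out = PRICES[model]
        per_query = (in_tokens * price_in + out_tokens * price_out) / 1_000_000
        total += llm_queries * share * per_query
    return total

baseline = generation_cost(90_000_000, 2000, 300, 0.0, {"claude-sonnet-4-5": 1.0})
optimized = generation_cost(90_000_000, 2000, 300, 0.30,
                            {"claude-haiku-4-5": 0.6, "claude-sonnet-4-5": 0.3,
                             "claude-opus-4-5": 0.1})
print(round(baseline), round(optimized), f"{1 - optimized / baseline:.0%}")  # 945000 441000 53%
```

With these hypothetical inputs, generation cost falls from about $945,000 to about $441,000 per month (roughly 53%); reaching the 60-70% range would require a higher cache hit rate or a larger share of traffic on Haiku.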

80.7 Architecture Evolution Summary

POC Phase (0–100 users)
├── Single-machine deployment
├── Simple semantic chunking
├── Pure vector search
├── No caching
└── Response times under 10 seconds acceptable

Production Phase (100–10,000 users)
├── Hybrid retrieval (vector + BM25)
├── Cross-encoder reranking
├── Query rewriting and expansion
├── Redis query caching
└── Response time < 3 seconds

Scale Phase (10,000–1M DAU)
├── Distributed vector database (Pinecone / Qdrant cluster)
├── Multi-layer caching (L1 / L2 / semantic)
├── Query routing (model tiering)
├── Async processing pipeline
├── Comprehensive monitoring and alerting
└── p99 response time < 2 seconds

Beyond 1M DAU
├── Multi-region deployment (latency reduction)
├── Model distillation (shift expensive inference to local models)
├── Real-time incremental indexing (document updates reflected in near real time)
├── Personalized retrieval (user-profile-aware)
└── A/B testing framework (continuous module optimization)

Closing Words: The Final Chapter

This final chapter, using an enterprise RAG system as its vehicle, synthesizes the major themes of this book: prompt engineering (query rewriting, answer generation), evaluation frameworks (RAG Evaluator), multilingual support (language detection in the query understanding layer), and system architecture design (from POC to millions of daily active users).

Summary of core design principles:

1. Evolve in phases. Do not attempt to build the full production system at POC stage. Start with the simplest working version, then add complexity driven by data.

2. Retrieval quality is the core. In practice, most RAG quality problems originate in retrieval, not generation: if the right documents never reach the context, no prompt can recover them. Before optimizing prompts, ensure retrieval is surfacing the right documents.

3. Observability before optimization. Without metrics there is no direction for improvement. Build the complete metrics framework first, then optimize.

4. Maintain cost consciousness throughout. At 1 million DAU, each 1% cost reduction is worth on the order of $10,000 per month at the pre-optimization estimate in 80.6.2. Query routing, caching, and context compression are the three primary cost control levers.

5. Keep humans in the loop. AI is a means, not an end. The best RAG systems honestly say "I don't know" when confidence is insufficient, and guide users toward human experts.

Building AI systems is a marathon, not a sprint. The frameworks, code, and ways of thinking this book provides are the starting point of that journey, not the finish line.
