案例三:多语言内容生产流水线(Batch + Structured Outputs + Connector 自动化)
第八十章:构建企业级 RAG 系统:从 POC 到百万日活的架构演进
80.1 企业级 RAG 的全景视角
检索增强生成(Retrieval-Augmented Generation,RAG)是目前企业 AI 应用中最主流的架构模式。与纯 LLM 问答相比,RAG 的价值在于:将企业私有知识库与 LLM 的生成能力结合,实现基于最新、最准确的企业内部信息进行问答。
本章以一家假设的企业——拥有 50 万份内部文档、日活用户 100 万的大型企业——为背景,展示从 0 到生产的完整架构演进路径。
80.1.1 系统架构全景(ASCII 图)
┌─────────────────────────────────────────────────────────────────┐
│ 企业级 RAG 系统架构 │
├────────────────────────┬────────────────────────────────────────┤
│ 离线管道(索引构建) │ 在线管道(查询服务) │
│ │ │
│ 文档源 │ 用户查询 │
│ ┌──────────────┐ │ ┌─────────────────────────────────┐ │
│ │ SharePoint │ │ │ API Gateway │ │
│ │ Confluence │ │ │ (认证/限流/日志) │ │
│ │ S3/OSS │──┐ │ └─────────────┬───────────────────┘ │
│ │ 数据库 │ │ │ ↓ │
│ └──────────────┘ │ │ ┌─────────────────────────────────┐ │
│ ↓ │ │ 查询理解层 │ │
│ ┌──────────────────┐ │ │ - 意图分类 │ │
│ │ 文档预处理 │ │ │ - 查询改写/扩展 │ │
│ │ - 格式解析 │ │ │ - 语言检测 │ │
│ │ - 文本清洗 │ │ └─────────────┬───────────────────┘ │
│ │ - 元数据提取 │ │ ↓ │
│ └───────┬──────────┘ │ ┌─────────────────────────────────┐ │
│ ↓ │ │ 混合检索层 │ │
│ ┌──────────────────┐ │ │ ┌──────────┐ ┌─────────────┐ │ │
│ │ 分块策略 │ │ │ │向量检索 │ │关键词检索 │ │ │
│ │ (Chunking) │ │ │ │(Semantic) │ │(BM25/ES) │ │ │
│ │ - 固定大小 │ │ │ └────┬─────┘ └──────┬──────┘ │ │
│ │ - 语义边界 │ │ │ └────────┬───────┘ │ │
│ │ - 递归分割 │ │ │ ↓ │ │
│ └───────┬──────────┘ │ │ RRF 融合排序 │ │
│ ↓ │ └─────────────┬───────────────────┘ │
│ ┌──────────────────┐ │ ↓ │
│ │ 嵌入计算 │ │ ┌─────────────────────────────────┐ │
│ │ - Cohere/OpenAI │ │ │ 重排序层 (Reranker) │ │
│ │ - BGE/E5 │ │ │ Cross-encoder 精排 │ │
│ └───────┬──────────┘ │ └─────────────┬───────────────────┘ │
│ ↓ │ ↓ │
│ ┌──────────────────┐ │ ┌─────────────────────────────────┐ │
│ │ 向量数据库写入 │ │ │ 生成层 (Claude) │ │
│ │ - Pinecone │ │ │ - 上下文组装 │ │
│ │ - Qdrant │ │ │ - 引用注入 │ │
│ │ - pgvector │ │ │ - 答案生成 │ │
│ └──────────────────┘ │ └─────────────┬───────────────────┘ │
│ │ ↓ │
│ │ ┌─────────────────────────────────┐ │
│ │ │ 后处理层 │ │
│ │ │ - 引用验证 │ │
│ │ │ - 格式化输出 │ │
│ │ │ - 置信度评估 │ │
│ │ └─────────────────────────────────┘ │
└────────────────────────┴────────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ 监控与可观测性栈 │
│ Prometheus + Grafana + Jaeger │
└─────────────────────────────────────┘
80.2 第一阶段:POC(0-100 用户)
80.2.1 POC 的目标与约束
POC 阶段的核心目标是验证技术可行性,而不是构建可扩展的系统。典型的 POC 约束:
- 单机部署
- 文档量 < 10,000
- 响应时间要求宽松(< 10 秒可接受)
- 无高可用要求
80.2.2 分块策略(Chunking Strategy)
分块是 RAG 效果最关键的决定因素之一。错误的分块策略会导致:检索到的上下文信息不完整、或者包含大量无关内容。
from anthropic import Anthropic
import re
client = Anthropic()
class DocumentChunker:
"""
三种核心分块策略
"""
def fixed_size_chunking(
self,
text: str,
chunk_size: int = 512,
overlap: int = 50
) -> list:
"""
固定大小分块(按字符数)
优点:简单、可预测
缺点:可能在句子中间截断
"""
chunks = []
start = 0
while start < len(text):
end = start + chunk_size
chunk = text[start:end]
# 尝试在句子边界截断
if end < len(text):
last_period = max(
chunk.rfind('。'),
chunk.rfind('.'),
chunk.rfind('\n')
)
if last_period > chunk_size * 0.7: # 至少保留70%内容
chunk = chunk[:last_period + 1]
end = start + last_period + 1
chunks.append({
"text": chunk,
"start_char": start,
"end_char": start + len(chunk),
"chunk_type": "fixed_size"
})
start = end - overlap # 重叠以保持上下文连续性
return chunks
def semantic_chunking(self, text: str) -> list:
"""
语义边界分块(按段落/章节)
优点:保持语义完整性
缺点:chunk 大小不均匀
"""
# 按段落分割
paragraphs = re.split(r'\n{2,}', text)
chunks = []
current_chunk = ""
for para in paragraphs:
para = para.strip()
if not para:
continue
# 如果当前 chunk 加上新段落不超过限制,则合并
if len(current_chunk) + len(para) < 800:
current_chunk += ("\n\n" if current_chunk else "") + para
else:
if current_chunk:
chunks.append({"text": current_chunk, "chunk_type": "semantic"})
current_chunk = para
if current_chunk:
chunks.append({"text": current_chunk, "chunk_type": "semantic"})
return chunks
def recursive_character_splitting(
self,
text: str,
chunk_size: int = 512,
separators: list = None
) -> list:
"""
递归字符分割(LangChain RecursiveCharacterTextSplitter 思路)
优点:在多级分隔符间智能选择
"""
if separators is None:
separators = ["\n\n", "\n", "。", ".", " ", ""]
def _split(text, separators):
if len(text) <= chunk_size:
return [text]
separator = separators[0] if separators else ""
if separator:
splits = text.split(separator)
else:
splits = list(text)
chunks = []
current = ""
for split in splits:
if len(current) + len(split) + len(separator) <= chunk_size:
current += (separator if current else "") + split
else:
if current:
chunks.append(current)
if len(split) > chunk_size and len(separators) > 1:
chunks.extend(_split(split, separators[1:]))
else:
current = split
if current:
chunks.append(current)
return chunks
raw_chunks = _split(text, separators)
return [{"text": c, "chunk_type": "recursive"} for c in raw_chunks if c.strip()]
80.2.3 POC 完整实现
import json
from typing import Optional
class SimplePOCRAG:
"""POC 阶段的简化 RAG 实现"""
def __init__(self, embedding_fn, vector_store):
"""
embedding_fn: 计算文本嵌入的函数
vector_store: 支持 upsert/query 操作的向量存储
"""
self.embed = embedding_fn
self.store = vector_store
self.chunker = DocumentChunker()
def index_document(self, doc_id: str, text: str, metadata: dict = None):
"""索引单个文档"""
chunks = self.chunker.semantic_chunking(text)
for i, chunk in enumerate(chunks):
chunk_id = f"{doc_id}_{i}"
embedding = self.embed(chunk["text"])
self.store.upsert(
id=chunk_id,
vector=embedding,
metadata={
"text": chunk["text"],
"doc_id": doc_id,
"chunk_index": i,
**(metadata or {})
}
)
def query(self, question: str, top_k: int = 5) -> dict:
"""查询问答"""
# 1. 检索相关文档
query_embedding = self.embed(question)
results = self.store.query(vector=query_embedding, top_k=top_k)
if not results:
return {"answer": "未找到相关信息", "sources": []}
# 2. 组装上下文
context = "\n\n---\n\n".join([
f"[来源 {i+1}]: {r['metadata']['text']}"
for i, r in enumerate(results)
])
# 3. 调用 Claude 生成答案
response = client.messages.create(
model="claude-opus-4-5",
max_tokens=1000,
system="""你是企业知识库助手。基于提供的文档片段回答问题。
回答规则:
1. 只基于提供的文档内容回答
2. 如果文档中没有相关信息,明确说明
3. 引用来源(使用[来源N]格式)
4. 回答要准确、简洁""",
messages=[{
"role": "user",
"content": f"相关文档:\n{context}\n\n问题:{question}"
}]
)
return {
"answer": response.content[0].text,
"sources": [r["metadata"]["doc_id"] for r in results],
"context_used": len(results)
}
80.3 第二阶段:生产化(100-10,000 用户)
80.3.1 混合检索(Hybrid Search)
单一向量检索对于精确匹配(如 "订单号 ORD-12345")效果差,而 BM25 关键词检索对语义理解能力有限。生产系统需要两者结合:
class HybridRetriever:
"""
混合检索器:向量检索 + BM25 关键词检索 + RRF 融合
"""
def __init__(self, vector_store, bm25_index):
self.vector_store = vector_store
self.bm25 = bm25_index
def retrieve(self, query: str, top_k: int = 20) -> list:
"""
混合检索,返回 top_k 个结果
"""
# 1. 向量检索
query_embedding = compute_embedding(query)
vector_results = self.vector_store.query(
vector=query_embedding,
top_k=top_k
)
# 2. BM25 关键词检索
keyword_results = self.bm25.search(query, top_k=top_k)
# 3. RRF (Reciprocal Rank Fusion) 融合
return self._rrf_fusion(
[vector_results, keyword_results],
k=60 # RRF 常数
)
def _rrf_fusion(self, result_lists: list, k: int = 60) -> list:
"""
Reciprocal Rank Fusion 算法
为每个文档计算融合分数:sum(1 / (k + rank))
"""
scores = {}
for result_list in result_lists:
for rank, result in enumerate(result_list):
doc_id = result.get("id")
if doc_id not in scores:
scores[doc_id] = {"score": 0.0, "data": result}
scores[doc_id]["score"] += 1.0 / (k + rank + 1)
# 按 RRF 分数排序
sorted_results = sorted(
scores.values(),
key=lambda x: x["score"],
reverse=True
)
return [r["data"] for r in sorted_results]
80.3.2 重排序(Reranking)
检索到的候选集需要经过 Cross-encoder 进行精排,以提升最终上下文的质量:
class CrossEncoderReranker:
"""
Cross-encoder 重排序器
相比双塔向量模型,Cross-encoder 同时处理查询和文档,精度更高
"""
def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
from sentence_transformers import CrossEncoder
self.model = CrossEncoder(model_name)
def rerank(self, query: str, candidates: list, top_k: int = 5) -> list:
"""
对候选集进行重排序
candidates: [{"text": "...", "metadata": {...}}]
"""
if not candidates:
return []
# 构建 (query, passage) 对
pairs = [(query, c["text"]) for c in candidates]
# 批量评分
scores = self.model.predict(pairs)
# 按分数排序
scored = list(zip(scores, candidates))
scored.sort(key=lambda x: x[0], reverse=True)
return [
{**candidate, "rerank_score": float(score)}
for score, candidate in scored[:top_k]
]
80.3.3 查询改写与扩展
def rewrite_query(original_query: str, conversation_history: list = None) -> dict:
"""
查询改写:将口语化、上下文依赖的查询转化为独立的检索查询
"""
history_text = ""
if conversation_history:
history_text = "对话历史:\n" + "\n".join([
f"用户:{turn['user']}\n助手:{turn['assistant'][:100]}..."
for turn in conversation_history[-3:]
])
rewrite_prompt = f"""请将以下用户查询改写为更适合文档检索的形式。
{history_text}
原始查询:{original_query}
改写要求:
1. 消除指代词("它"、"这个"、"上面提到的")
2. 补充上下文(如果查询依赖对话历史)
3. 生成 3 个语义相似但措辞不同的查询变体(用于查询扩展)
4. 识别查询中的关键实体和概念
以 JSON 格式输出:
{{
"rewritten_query": "改写后的查询",
"query_variants": ["变体1", "变体2", "变体3"],
"key_entities": ["实体1", "实体2"],
"query_type": "factual|procedural|analytical|comparative"
}}"""
response = client.messages.create(
model="claude-haiku-4-5",
max_tokens=400,
messages=[{"role": "user", "content": rewrite_prompt}]
)
try:
return json.loads(response.content[0].text)
except:
return {
"rewritten_query": original_query,
"query_variants": [original_query],
"key_entities": [],
"query_type": "factual"
}
80.4 第三阶段:规模化(10,000-100 万日活)
80.4.1 向量数据库选型
| 特性 | Pinecone | Qdrant | pgvector |
|---|---|---|---|
| 部署模式 | 全托管云服务 | 自托管/云托管 | PostgreSQL 扩展 |
| 规模上限 | 亿级向量 | 亿级(水平扩展) | 百万级(单机) |
| 延迟(p99) | <50ms | <20ms(自托管) | <100ms |
| 全文检索 | 不支持 | 支持(内置 payload 过滤) | 支持(PostgreSQL) |
| 成本 | 高(按用量) | 中(自托管基础设施) | 低(已有 PG 环境) |
| 适用场景 | 快速启动,无运维能力 | 高性能自托管 | 已有 PG 技术栈 |
80.4.2 分布式索引构建
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import Queue
import threading
class DistributedIndexBuilder:
"""
分布式文档索引构建器
支持并发处理大量文档
"""
def __init__(
self,
vector_store,
embedding_fn,
max_workers: int = 10,
batch_size: int = 100
):
self.vector_store = vector_store
self.embed = embedding_fn
self.max_workers = max_workers
self.batch_size = batch_size
self.chunker = DocumentChunker()
self._progress = {"processed": 0, "failed": 0, "total": 0}
self._lock = threading.Lock()
def build_index(self, documents: list) -> dict:
"""
批量构建索引
documents: [{"id": "...", "text": "...", "metadata": {...}}]
"""
self._progress["total"] = len(documents)
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = {
executor.submit(self._process_document, doc): doc["id"]
for doc in documents
}
for future in as_completed(futures):
doc_id = futures[future]
try:
future.result()
with self._lock:
self._progress["processed"] += 1
except Exception as e:
print(f"Failed to index {doc_id}: {e}")
with self._lock:
self._progress["failed"] += 1
return {
"success": self._progress["processed"],
"failed": self._progress["failed"],
"total": self._progress["total"]
}
def _process_document(self, doc: dict):
"""处理单个文档"""
chunks = self.chunker.recursive_character_splitting(doc["text"])
# 批量计算嵌入
texts = [c["text"] for c in chunks]
embeddings = [self.embed(t) for t in texts] # 生产环境应批量调用
# 批量写入向量存储
items = [
{
"id": f"{doc['id']}_{i}",
"vector": emb,
"metadata": {
"text": chunk["text"],
"doc_id": doc["id"],
"chunk_index": i,
**doc.get("metadata", {})
}
}
for i, (chunk, emb) in enumerate(zip(chunks, embeddings))
]
self.vector_store.upsert_batch(items)
80.4.3 缓存策略
import hashlib
from functools import lru_cache
class RAGCache:
"""
多层缓存策略
- L1: 内存缓存(LRU)
- L2: Redis 缓存(分布式)
- L3: 语义缓存(相似查询复用)
"""
def __init__(self, redis_client=None, semantic_threshold: float = 0.95):
self.redis = redis_client
self.semantic_threshold = semantic_threshold
self._query_cache = {} # 内存缓存
def get_cache_key(self, query: str) -> str:
return hashlib.md5(query.encode()).hexdigest()
def get(self, query: str) -> Optional[dict]:
"""多层缓存查询"""
key = self.get_cache_key(query)
# L1: 内存缓存
if key in self._query_cache:
return self._query_cache[key]
# L2: Redis 缓存
if self.redis:
cached = self.redis.get(f"rag:{key}")
if cached:
result = json.loads(cached)
self._query_cache[key] = result # 回填 L1
return result
return None
def set(self, query: str, result: dict, ttl: int = 3600):
"""写入缓存"""
key = self.get_cache_key(query)
# L1
self._query_cache[key] = result
# L2
if self.redis:
self.redis.setex(f"rag:{key}", ttl, json.dumps(result))
80.5 监控与可观测性
80.5.1 关键指标体系
RAG_METRICS = {
"retrieval_quality": {
"recall_at_k": "检索召回率(前K个结果包含相关文档的比例)",
"mrr": "平均倒数排名(Mean Reciprocal Rank)",
"ndcg": "归一化折损累积增益"
},
"generation_quality": {
"answer_faithfulness": "答案忠实度(是否基于检索内容)",
"answer_relevance": "答案相关性(是否回答了问题)",
"context_precision": "上下文精确率(检索内容有多少真正被使用)"
},
"system_performance": {
"retrieval_latency_p50": "检索延迟 P50(目标:<100ms)",
"retrieval_latency_p99": "检索延迟 P99(目标:<500ms)",
"generation_latency_p50": "生成延迟 P50(目标:<2s)",
"total_latency_p99": "总延迟 P99(目标:<5s)",
"error_rate": "错误率(目标:<0.1%)"
},
"business_metrics": {
"answer_acceptance_rate": "答案采纳率(用户不追问的比例)",
"citation_click_rate": "引用点击率(用户查看原文的比例)",
"session_resolution_rate": "会话解决率"
}
}
80.5.2 RAG 评估流水线
class RAGEvaluator:
"""使用 Claude 作为评估器"""
def evaluate_answer(
self,
question: str,
retrieved_context: str,
generated_answer: str
) -> dict:
"""
评估 RAG 答案质量
"""
eval_prompt = f"""请评估以下 RAG 系统的输出质量。
问题:{question}
检索到的上下文:
{retrieved_context[:3000]}
生成的答案:
{generated_answer}
请评估以下维度(1-5分):
1. 忠实度(Faithfulness):答案是否完全基于提供的上下文,没有编造信息?
2. 相关性(Relevance):答案是否直接回答了问题?
3. 完整性(Completeness):答案是否涵盖了问题的所有方面?
4. 精确性(Precision):上下文中的哪些信息被实际使用?(0=完全没用到,1=全部用到)
以 JSON 格式输出:
{{
"faithfulness": {{"score": 1-5, "explanation": "..."}},
"relevance": {{"score": 1-5, "explanation": "..."}},
"completeness": {{"score": 1-5, "explanation": "..."}},
"context_precision": {{"score": 0.0-1.0, "explanation": "..."}},
"overall_quality": "excellent/good/acceptable/poor",
"hallucination_detected": true/false,
"hallucinated_content": "如果有,列出编造的内容"
}}"""
response = client.messages.create(
model="claude-opus-4-5",
max_tokens=1000,
messages=[{"role": "user", "content": eval_prompt}]
)
try:
return json.loads(response.content[0].text)
except:
return {"raw_evaluation": response.content[0].text}
80.6 成本优化策略
80.6.1 Token 成本优化
COST_OPTIMIZATION_STRATEGIES = {
"query_routing": {
"description": "根据查询复杂度路由到不同模型",
"implementation": """
# 简单查询 → Claude Haiku(~5倍便宜)
# 复杂推理 → Claude Opus
def route_model(query: str, complexity_score: float) -> str:
if complexity_score < 0.3:
return "claude-haiku-4-5"
elif complexity_score < 0.7:
return "claude-sonnet-4-5"
else:
return "claude-opus-4-5"
"""
},
"context_compression": {
"description": "压缩检索到的上下文,减少输入 Token",
"implementation": """
def compress_context(retrieved_chunks: list, query: str) -> str:
# 只保留与查询最相关的句子
# 可节省 30-50% 的上下文 Token
pass
"""
},
"semantic_caching": {
"description": "缓存相似查询的结果",
"estimated_savings": "重复查询率高的场景可节省 40-60% API 调用"
},
"batching": {
"description": "批量处理嵌入计算请求",
"note": "嵌入 API 批量调用通常有折扣"
}
}
80.6.2 规模化成本预测
def estimate_monthly_cost(
daily_active_users: int,
avg_queries_per_user: int,
avg_context_tokens: int,
avg_output_tokens: int
) -> dict:
"""
估算月度 RAG 系统成本
"""
monthly_queries = daily_active_users * avg_queries_per_user * 30
# Embedding 成本(以 text-embedding-3-small 为例)
embedding_cost_per_1k = 0.00002 # $0.00002 / 1K tokens
embedding_tokens_per_query = 100 # 平均每次查询的嵌入计算 tokens
monthly_embedding_cost = (monthly_queries * embedding_tokens_per_query / 1000) * embedding_cost_per_1k
# Claude 生成成本(以 claude-sonnet 为例)
input_cost_per_1k = 0.003 # $3 / 1M input tokens
output_cost_per_1k = 0.015 # $15 / 1M output tokens
monthly_input_tokens = monthly_queries * avg_context_tokens
monthly_output_tokens = monthly_queries * avg_output_tokens
monthly_llm_cost = (
(monthly_input_tokens / 1000) * input_cost_per_1k +
(monthly_output_tokens / 1000) * output_cost_per_1k
)
return {
"daily_active_users": daily_active_users,
"monthly_queries": monthly_queries,
"monthly_embedding_cost_usd": round(monthly_embedding_cost, 2),
"monthly_llm_cost_usd": round(monthly_llm_cost, 2),
"monthly_total_cost_usd": round(monthly_embedding_cost + monthly_llm_cost, 2),
"cost_per_query_usd": round((monthly_embedding_cost + monthly_llm_cost) / monthly_queries, 5)
}
# 百万日活场景估算
print(estimate_monthly_cost(
daily_active_users=1_000_000,
avg_queries_per_user=3,
avg_context_tokens=2000,
avg_output_tokens=300
))
# 预计输出:
# monthly_queries: 90,000,000
# monthly_llm_cost: ~$810,000(优化前)
# 通过缓存和模型路由可降至 ~$200,000-300,000
80.7 架构演进总结
POC 阶段(0-100 用户)
├── 单机部署
├── 简单语义分块
├── 纯向量检索
├── 无缓存
└── 响应时间 5-15 秒可接受
生产化阶段(100-10,000 用户)
├── 混合检索(向量 + BM25)
├── Cross-encoder 重排序
├── 查询改写与扩展
├── Redis 查询缓存
└── 响应时间 < 3 秒
规模化阶段(10,000-100 万日活)
├── 分布式向量数据库(Pinecone/Qdrant 集群)
├── 多层缓存(L1/L2/语义缓存)
├── 查询路由(模型分级)
├── 异步处理管道
├── 全面的监控告警
└── 响应时间 P99 < 2 秒
百万日活以上
├── 多地域部署(降低延迟)
├── 模型蒸馏(将高成本推理转移到本地模型)
├── 实时增量索引(文档更新即时反映)
├── 个性化检索(基于用户画像)
└── A/B 测试框架(持续优化各模块)
小结:全书终章
本章作为全书的终章,以企业级 RAG 系统为载体,综合了书中各个主题:Prompt 工程(查询改写、答案生成)、评估体系(RAG Evaluator)、多语言支持(分块与检索的语言感知)、以及系统架构设计(从 POC 到百万日活)。
核心设计原则总结:
-
分阶段演进:不要试图在 POC 阶段就构建完整的生产系统。从最简单的可工作版本开始,数据驱动地添加复杂性。
-
检索质量是核心:RAG 系统 80% 的质量问题来自检索,而非生成。在优化 Prompt 之前,先确保检索召回了正确的文档。
-
可观测性先于优化:没有指标就没有改进方向。先构建完整的指标体系,再进行系统优化。
-
成本意识贯穿始终:在百万日活规模,每百分之一的成本优化都价值数万美元/月。查询路由、缓存和上下文压缩是三大核心成本控制手段。
-
人在回路(Human in the Loop):AI 不是目的,而是手段。最好的 RAG 系统会在置信度不足时诚实地说"我不知道",并引导用户找到人类专家。
构建 AI 系统是一场马拉松,而非短跑。本书提供的框架、代码和思维方式,是这场旅程的起点,而非终点。