Chapter 28: RAG Architecture and Claude: Retrieval-Augmented Generation in Engineering Practice
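28.1 The Essence of RAG: Knowledge Access Beyond the Context Window
Retrieval-Augmented Generation (RAG) addresses a fundamental limitation of large language models: training data has a cutoff date and the context window has a fixed capacity, while an enterprise knowledge base can keep growing without bound.
The core idea of RAG is this: instead of stuffing every document into the context (which is impractical), retrieve the most relevant passages at query time and send only those passages to the model together with the query. This lets Claude answer questions over a knowledge base of any size while keeping latency low and cost under control.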
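As a rough illustration of the retrieve-then-generate idea above, the sketch below ranks a tiny in-memory knowledge base and builds a prompt from the best matches only. The word-overlap scoring, the names `score` and `build_prompt`, and the sample passages are all illustrative assumptions; a real system would use the embedding-based retrieval covered later in this chapter.

```python
def score(query: str, passage: str) -> int:
    """Toy relevance score: number of distinct query words found in the passage."""
    query_words = set(query.lower().split())
    passage_words = set(passage.lower().split())
    return len(query_words & passage_words)


def build_prompt(query: str, passages: list[str], top_k: int = 2) -> str:
    """Keep only the top_k most relevant passages and place them before the question."""
    ranked = sorted(passages, key=lambda p: score(query, p), reverse=True)
    selected = ranked[:top_k]
    context = "\n".join(f"[{i}] {p}" for i, p in enumerate(selected, 1))
    return f"Context:\n{context}\n\nQuestion: {query}"


# Hypothetical sample data, for illustration only
knowledge_base = [
    "refunds are processed within 14 days of the request",
    "the mobile app supports offline mode",
    "refunds require the original order number",
]
prompt = build_prompt("how are refunds processed", knowledge_base)
print(prompt)
```

Only the two refund-related passages reach the prompt; the unrelated passage never consumes context tokens, which is the property that lets RAG scale to large corpora.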
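RAG vs. Fine-Tuning vs. Long Context
| Approach | Best for | Strengths | Weaknesses |
|---|---|---|---|
| RAG | Frequently updated knowledge bases, factual queries | Real-time updates, explainable, low cost | Retrieval quality depends on engineering |
| Fine-tuning | Fixed style, format, or domain knowledge | The model "internalizes" the knowledge | Costly to update, not real-time |
| Direct long-context injection | Few documents (<10) | Simple to implement | High token cost, diluted attention |
For most enterprise knowledge-base scenarios (thousands to hundreds of thousands of documents), RAG is the most practical engineering choice.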
28.2 RAG System Architecture at a Glance
┌────────────────────────────────────────────────────────────────────────────┐
│                           Offline indexing flow                            │
│                                                                            │
│  Raw documents → Loading → Chunking → Embedding → Vector DB storage        │
│  (PDF/MD/HTML)   (Loader)  (Chunker)  (Embedder)  (VectorDB)               │
└────────────────────────────────────────────────────────────────────────────┘
                                      │
                               Vector database
                                      │
┌────────────────────────────────────────────────────────────────────────────┐
│                             Online query flow                              │
│                                                                            │
│  User query → Rewrite → Embed → Vector search → Rerank → Context assembly  │
│               (optional)        (Top-K)         (Reranker)                 │
│                                                                  │         │
│                                                             Claude API     │
│                                                                  │         │
│                                                            Final answer    │
└────────────────────────────────────────────────────────────────────────────┘
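Core components
- Document loader (Loader): converts documents in various formats to plain text
- Chunker: splits long text into pieces suitable for embedding
- Embedding model (Embedder): converts text into vector representations
- Vector database (VectorDB): stores and retrieves vectors
- Reranker: re-scores the initial retrieval results for precision
- Context assembler: formats the retrieved results and injects them into the Claude request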
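The offline half of the component list above can be sketched as a set of narrow interfaces wired together by one function. This is only one possible decomposition: the `Chunk` dataclass, the method names (`load`, `split`, `embed`, `add`), and `index_document` are hypothetical names chosen for illustration, not an API from any library.

```python
from dataclasses import dataclass, field
from typing import Protocol


@dataclass
class Chunk:
    """A piece of text plus the metadata needed to cite it later."""
    content: str
    source: str = "unknown"
    metadata: dict = field(default_factory=dict)


class Loader(Protocol):
    def load(self, path: str) -> str: ...


class Chunker(Protocol):
    def split(self, text: str) -> list[str]: ...


class Embedder(Protocol):
    def embed(self, texts: list[str]) -> list[list[float]]: ...


class VectorDB(Protocol):
    def add(self, chunks: list[Chunk], vectors: list[list[float]]) -> None: ...


def index_document(path: str, loader: Loader, chunker: Chunker,
                   embedder: Embedder, db: VectorDB) -> int:
    """Offline flow: load -> chunk -> embed -> store. Returns the number of chunks stored."""
    text = loader.load(path)
    pieces = chunker.split(text)
    vectors = embedder.embed(pieces)
    db.add([Chunk(content=p, source=path) for p in pieces], vectors)
    return len(pieces)
```

Keeping each stage behind a small interface means a chunking strategy or embedding model can be swapped without touching the rest of the pipeline.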
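28.3 Chunking Strategy: The Key to RAG Engineering
Chunking is the most easily overlooked yet most impactful stage of a RAG system. Poor chunking leads to:
- Truncated meaning: a complete statement is split across two chunks, and neither is complete on its own
- Redundancy between chunks: adjacent chunks overlap heavily and waste retrieval capacity
- Chunks that are too short: retrieved passages lack enough context
- Chunks that are too long: embedding quality drops and a single chunk takes up too much of the context window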
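The failure modes listed above can be checked mechanically before indexing. The sketch below is one hedged way to do it: `chunk_report`, `shared_boundary`, and the default thresholds are assumptions made for this example, and the "unterminated" check (a chunk that does not end in sentence punctuation) is only a crude proxy for truncated meaning.

```python
from statistics import mean

SENTENCE_ENDINGS = ".!?。!?"


def shared_boundary(left: str, right: str) -> int:
    """Length of the longest suffix of `left` that is also a prefix of `right`."""
    for size in range(min(len(left), len(right)), 0, -1):
        if left[-size:] == right[:size]:
            return size
    return 0


def chunk_report(chunks: list[str], min_len: int = 100, max_len: int = 1500) -> dict:
    """Summarize length and overlap problems in a list of chunks."""
    lengths = [len(c) for c in chunks]
    total = sum(lengths)
    overlap = sum(shared_boundary(a, b) for a, b in zip(chunks, chunks[1:]))
    return {
        "count": len(chunks),
        "mean_length": mean(lengths) if lengths else 0.0,
        "too_short": sum(1 for n in lengths if n < min_len),
        "too_long": sum(1 for n in lengths if n > max_len),
        # Chunks that stop mid-sentence (proxy for truncated meaning)
        "unterminated": sum(1 for c in chunks
                            if c and c.rstrip()[-1:] not in SENTENCE_ENDINGS),
        # Share of all characters that are duplicated between neighbours
        "overlap_ratio": overlap / total if total else 0.0,
    }


report = chunk_report(["abcdef", "defghi", "xy."], min_len=4, max_len=5)
print(report)
```

Running such a report on a sample of real documents is a cheap way to compare chunking strategies before paying for embeddings.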
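Strategy 1: Fixed-size chunking (baseline)
def fixed_size_chunk(text: str, chunk_size: int = 512,
                     overlap: int = 64) -> list[str]:
    """
    Simplest fixed-size chunking.
    chunk_size: number of characters per chunk
    overlap: number of characters shared by adjacent chunks (preserves continuity)
    """
    if not 0 <= overlap < chunk_size:
        # Otherwise the window would never advance
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunks.append(text[start:end])
        if end >= len(text):
            break  # last chunk reached; avoid a trailing chunk made only of overlap
        start = end - overlap  # sliding window
    return chunks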
Strategy 2: Recursive chunking (recommended baseline)
def recursive_chunk(text: str,
                    chunk_size: int = 1000,
                    overlap: int = 100,
                    separators: list[str] | None = None) -> list[str]:
    """
    Recursively split on a hierarchy of separators, preferring natural boundaries.
    Separator priority: paragraph > line break > sentence > word > character.
    Note: overlap is applied only in the character-level fallback.
    """
    if separators is None:
        separators = ["\n\n", "\n", "。", "!", "?", ". ", "! ", "? ", " ", ""]

    def _split(text: str, seps: list[str]) -> list[str]:
        if not seps:
            return [text]
        sep = seps[0]
        remaining_seps = seps[1:]
        if sep == "":
            # Last separator: fall back to character-level windows
            step = max(1, chunk_size - overlap)
            return [text[i:i+chunk_size] for i in range(0, len(text), step)]
        parts = text.split(sep)
        chunks = []
        current = ""
        for part in parts:
            if len(current) + len(sep) + len(part) <= chunk_size:
                # Still fits: keep accumulating into the current chunk
                current = current + sep + part if current else part
            else:
                if current:
                    chunks.append(current)
                # A single part longer than chunk_size is split with finer separators
                if len(part) > chunk_size:
                    chunks.extend(_split(part, remaining_seps))
                    current = ""
                else:
                    current = part
        if current:
            chunks.append(current)
        return [c.strip() for c in chunks if c.strip()]

    return _split(text, separators)
Strategy 3: Semantic chunking (highest quality)
Semantic chunking computes the embedding similarity of adjacent sentences and breaks at semantic boundaries:
import re

import numpy as np
from sentence_transformers import SentenceTransformer

def semantic_chunk(text: str,
                   model_name: str = "BAAI/bge-m3",
                   threshold: float = 0.7,
                   min_chunk_size: int = 200,
                   max_chunk_size: int = 2000) -> list[str]:
    """
    Chunking based on semantic similarity.
    Breaks wherever the similarity of adjacent sentences falls below the threshold.
    """
    # Note: the model is loaded on every call; cache it when chunking many documents
    model = SentenceTransformer(model_name)
    # 1. Split into sentences
    sentences = re.split(r'(?<=[。!?.!?])\s*', text)
    sentences = [s.strip() for s in sentences if s.strip()]
    if len(sentences) <= 1:
        return [text]
    # 2. Embed all sentences
    embeddings = model.encode(sentences, batch_size=32)
    # 3. Cosine similarity of each adjacent sentence pair
    similarities = []
    for i in range(len(embeddings) - 1):
        sim = np.dot(embeddings[i], embeddings[i+1]) / (
            np.linalg.norm(embeddings[i]) * np.linalg.norm(embeddings[i+1])
        )
        similarities.append(float(sim))
    # 4. Break where similarity is low
    chunks = []
    current_sentences = [sentences[0]]
    current_size = len(sentences[0])
    for sentence, sim in zip(sentences[1:], similarities):
        should_break = (
            sim < threshold or  # semantic breakpoint
            current_size + len(sentence) > max_chunk_size  # maximum length exceeded
        )
        # Break only once the current chunk has reached the minimum size
        if should_break and current_size >= min_chunk_size:
            chunks.append(" ".join(current_sentences))
            current_sentences = [sentence]
            current_size = len(sentence)
        else:
            current_sentences.append(sentence)
            current_size += len(sentence)
    if current_sentences:
        chunks.append(" ".join(current_sentences))
    return chunks
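A fixed similarity threshold such as 0.7 is sensitive to the embedding model, because different models produce differently scaled similarities. One common alternative, which is not the method used above, is to break at the lowest-scoring fraction of sentence gaps. The sketch below shows that variant in plain Python; `breakpoints_by_percentile` and its nearest-rank percentile rule are assumptions made for illustration.

```python
import math


def breakpoints_by_percentile(similarities: list[float],
                              percentile: float = 20.0) -> list[int]:
    """
    Return the indices i where a chunk boundary should fall between
    sentence i and sentence i + 1, chosen as the gaps whose similarity
    lies at or below the given percentile (nearest-rank method).
    """
    if not similarities:
        return []
    ordered = sorted(similarities)
    rank = max(1, math.ceil(percentile * len(ordered) / 100))
    cutoff = ordered[rank - 1]
    return [i for i, s in enumerate(similarities) if s <= cutoff]


# Hypothetical adjacent-sentence similarities for an 11-sentence text
sims = [0.90, 0.85, 0.30, 0.88, 0.92, 0.40, 0.87, 0.91, 0.89, 0.86]
print(breakpoints_by_percentile(sims))  # boundaries after sentences 2 and 5
```

The percentile rule adapts to each document: a text with uniformly high similarities still gets split at its relatively weakest transitions.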
Strategy 4: Structured document chunking (Markdown/code)
import re

def markdown_chunk(text: str, max_chunk_size: int = 1500) -> list[dict]:
    """
    Structure-aware chunking for Markdown documents.
    Keeps the heading hierarchy as metadata, which helps locate results at retrieval time.
    Relies on recursive_chunk (Strategy 2) for oversized sections.
    """
    chunks = []
    # Split on heading lines (levels 1-4); the capture group keeps the headings
    sections = re.split(r'^(#{1,4}\s+.+)$', text, flags=re.MULTILINE)
    current_header_stack = []
    current_content = []

    def flush():
        """Save the accumulated content under the current heading path."""
        full_content = "\n".join(current_content).strip()
        if full_content:
            chunks.append({
                "content": full_content,
                "breadcrumb": " > ".join(h[1] for h in current_header_stack),
                "level": current_header_stack[-1][0] if current_header_stack else 0
            })

    for item in sections:
        header_match = re.match(r'^(#{1,4})\s+(.+)$', item)
        if header_match:
            # Save what was accumulated before this heading
            flush()
            current_content = [item]
            # Update the heading stack: drop headings at the same or deeper level
            level = len(header_match.group(1))
            title = header_match.group(2)
            current_header_stack = [h for h in current_header_stack if h[0] < level]
            current_header_stack.append((level, title))
        else:
            current_content.append(item)
    # Handle the final section
    flush()
    # Recursively split oversized chunks
    result = []
    for chunk in chunks:
        if len(chunk["content"]) > max_chunk_size:
            sub_chunks = recursive_chunk(chunk["content"], max_chunk_size)
            for i, sub in enumerate(sub_chunks):
                result.append({
                    **chunk,
                    "content": sub,
                    "sub_chunk_index": i
                })
        else:
            result.append(chunk)
    return result
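28.4 Choosing an Embedding Model
Retrieval quality varies significantly across embedding models:
| Model | Dimensions | Languages | Relative quality | Best for |
|---|---|---|---|---|
| BAAI/bge-m3 | 1024 | Multilingual (including Chinese and English) | High | General-purpose default |
| text-embedding-3-large | 3072 | Multilingual | Highest | High-quality production |
| text-embedding-3-small | 1536 | Multilingual | Medium | Cost-sensitive |
| BAAI/bge-small-zh | 512 | Chinese | Medium | Chinese-only scenarios |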
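Dimension count also drives index size, which is worth estimating before committing to a model. The following back-of-the-envelope sketch assumes uncompressed 32-bit floats and ignores index overhead and payload storage; `index_size_mb` and the figure of 100,000 chunks are illustrative assumptions.

```python
def index_size_mb(num_chunks: int, dimensions: int, bytes_per_value: int = 4) -> float:
    """Raw vector storage in MiB: one float32 (4 bytes) per dimension per chunk."""
    return num_chunks * dimensions * bytes_per_value / (1024 ** 2)


# Dimensions taken from the table above; chunk count is a hypothetical corpus size
for model, dims in [
    ("BAAI/bge-m3", 1024),
    ("text-embedding-3-large", 3072),
    ("text-embedding-3-small", 1536),
    ("BAAI/bge-small-zh", 512),
]:
    print(f"{model}: {index_size_mb(100_000, dims):.1f} MiB")
```

Under these assumptions, a 3072-dimensional model needs three times the raw vector storage of a 1024-dimensional one for the same corpus, so the quality gain has to justify the extra memory.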
from openai import OpenAI

def embed_with_openai(texts: list[str],
                      model: str = "text-embedding-3-small") -> list[list[float]]:
    """Embed texts with the OpenAI embeddings API (also works with OpenAI-compatible APIs)"""
    client = OpenAI()  # reads OPENAI_API_KEY from the environment
    # Send requests in batches. The API caps a single request at 2048 inputs;
    # a smaller batch of 100 keeps each request light.
    all_embeddings = []
    batch_size = 100
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i+batch_size]
        response = client.embeddings.create(model=model, input=batch)
        all_embeddings.extend([r.embedding for r in response.data])
    return all_embeddings
28.5 Retrieval and Reranking
Basic vector retrieval
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct, Filter, FieldCondition, MatchValue
import uuid

class RAGVectorStore:
    """Vector store dedicated to RAG"""
    def __init__(self, collection: str = "knowledge_base", vector_size: int = 1024):
        self.client = QdrantClient(host="localhost", port=6333)
        self.collection = collection
        # Create the collection on first use. vector_size must match the embedding
        # model (1024 for BAAI/bge-m3). Assumes a qdrant-client version that
        # provides collection_exists().
        if not self.client.collection_exists(collection):
            self.client.create_collection(
                collection_name=collection,
                vectors_config=VectorParams(size=vector_size, distance=Distance.COSINE),
            )

    def index_chunks(self, chunks: list[dict | str], embeddings: list[list[float]]):
        """Index chunks and their embeddings in batches"""
        points = []
        for chunk, embedding in zip(chunks, embeddings):
            # Accept plain strings as well as dicts
            if not isinstance(chunk, dict):
                chunk = {"content": chunk}
            points.append(PointStruct(
                id=str(uuid.uuid4()),
                vector=embedding,
                payload={
                    "content": chunk.get("content", ""),
                    "source": chunk.get("source", "unknown"),
                    "breadcrumb": chunk.get("breadcrumb", ""),
                    "metadata": {k: v for k, v in chunk.items() if k != "content"},
                }
            ))
        # Upload in batches
        batch_size = 100
        for i in range(0, len(points), batch_size):
            self.client.upsert(
                collection_name=self.collection,
                points=points[i:i+batch_size]
            )

    def retrieve(self, query_embedding: list[float],
                 top_k: int = 20,
                 source_filter: str | None = None) -> list[dict]:
        """Retrieve the most relevant chunks"""
        query_filter = None
        if source_filter:
            query_filter = Filter(
                must=[FieldCondition(
                    key="source",
                    match=MatchValue(value=source_filter)
                )]
            )
        # Note: newer qdrant-client releases deprecate search() in favor of query_points()
        results = self.client.search(
            collection_name=self.collection,
            query_vector=query_embedding,
            query_filter=query_filter,
            limit=top_k,
            with_payload=True
        )
        return [
            {
                "id": str(r.id),
                "content": r.payload["content"],
                "source": r.payload.get("source", ""),
                "breadcrumb": r.payload.get("breadcrumb", ""),
                "score": r.score
            }
            for r in results
        ]
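For unit tests or small prototypes, the same indexing and retrieval interface can be exercised without a running vector database. The sketch below is a hypothetical stand-in (`InMemoryVectorStore` is not part of any library) that performs exact cosine search over a Python list and mirrors the `index_chunks` / `retrieve` shape used above; it assumes chunks are dicts with a `content` key.

```python
import math
from typing import Optional


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity; returns 0.0 if either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


class InMemoryVectorStore:
    """Exact (brute-force) search over a list; suitable only for small test corpora."""

    def __init__(self) -> None:
        self._rows: list[tuple[dict, list[float]]] = []

    def index_chunks(self, chunks: list[dict], embeddings: list[list[float]]) -> None:
        for chunk, vector in zip(chunks, embeddings):
            self._rows.append((chunk, vector))

    def retrieve(self, query_embedding: list[float], top_k: int = 20,
                 source_filter: Optional[str] = None) -> list[dict]:
        scored = []
        for row_id, (chunk, vector) in enumerate(self._rows):
            if source_filter and chunk.get("source") != source_filter:
                continue  # same semantics as a "must match source" filter
            scored.append({
                "id": str(row_id),
                "content": chunk["content"],
                "source": chunk.get("source", ""),
                "breadcrumb": chunk.get("breadcrumb", ""),
                "score": cosine(query_embedding, vector),
            })
        scored.sort(key=lambda r: r["score"], reverse=True)
        return scored[:top_k]


store = InMemoryVectorStore()
store.index_chunks(
    [{"content": "a", "source": "x.md"},
     {"content": "b", "source": "y.md"},
     {"content": "c", "source": "x.md"}],
    [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]],
)
print([r["content"] for r in store.retrieve([1.0, 0.0], top_k=2)])
```

Because the search is exact, this stand-in also gives a ground truth against which an approximate index can be compared.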
Reranking
Vector retrieval has high recall but limited precision, so reranking re-scores the candidates with a cross-encoder:
from sentence_transformers import CrossEncoder

class Reranker:
    """Reranker based on a CrossEncoder"""
    def __init__(self, model_name: str = "BAAI/bge-reranker-v2-m3"):
        self.model = CrossEncoder(model_name)

    def rerank(self, query: str, candidates: list[dict],
               top_k: int = 5) -> list[dict]:
        """
        Re-score candidate chunks.
        A CrossEncoder encodes the query and the candidate together,
        which is more accurate than a two-tower (bi-encoder) model.
        """
        if not candidates:
            return []
        pairs = [(query, c["content"]) for c in candidates]
        scores = self.model.predict(pairs)
        # Sort by rerank score, highest first
        ranked = sorted(
            zip(candidates, scores),
            key=lambda x: x[1],
            reverse=True
        )
        return [
            {**candidate, "rerank_score": float(score)}
            for candidate, score in ranked[:top_k]
        ]
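The "high recall, limited precision" claim becomes concrete once both are measured at a cutoff k. The sketch below defines the two metrics in the usual way; the document IDs and the two orderings are made-up sample data, and `precision_at_k` / `recall_at_k` are names chosen for this example.

```python
def precision_at_k(ranked_ids: list[str], relevant_ids: set[str], k: int) -> float:
    """Fraction of the top-k results that are relevant."""
    top = ranked_ids[:k]
    if not top:
        return 0.0
    return sum(1 for doc_id in top if doc_id in relevant_ids) / len(top)


def recall_at_k(ranked_ids: list[str], relevant_ids: set[str], k: int) -> float:
    """Fraction of all relevant documents that appear in the top-k results."""
    if not relevant_ids:
        return 0.0
    return len(set(ranked_ids[:k]) & relevant_ids) / len(relevant_ids)


# Hypothetical example: two relevant documents among five retrieved candidates
relevant = {"d1", "d2"}
vector_order = ["d7", "d2", "d9", "d1", "d5"]    # order from vector search
reranked_order = ["d2", "d1", "d7", "d9", "d5"]  # order after reranking

print(recall_at_k(vector_order, relevant, 5))       # both relevant docs were retrieved
print(precision_at_k(vector_order, relevant, 2))    # but only one is in the top 2
print(precision_at_k(reranked_order, relevant, 2))  # reranking moves both to the top
```

This is why the pipeline retrieves a wide candidate set first (for recall) and passes only the reranked top few to the model (for precision).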
Hybrid Search
Hybrid search combines vector retrieval with keyword retrieval (BM25): vector search covers passages that are semantically similar but worded very differently, while BM25 catches exact keyword matches that embeddings may miss:
from rank_bm25 import BM25Okapi
import jieba  # Chinese word segmentation

class HybridRetriever:
    """Hybrid retriever: vector search + BM25 keyword search"""
    def __init__(self, chunks: list[str]):
        self.chunks = chunks
        # Build the BM25 index
        tokenized = [list(jieba.cut(c)) for c in chunks]
        self.bm25 = BM25Okapi(tokenized)

    def hybrid_search(self, query: str, query_embedding: list[float],
                      vector_results: list[dict],
                      alpha: float = 0.6,
                      top_k: int = 10) -> list[dict]:
        """
        Fuse vector and BM25 results with weighted RRF (Reciprocal Rank Fusion).
        alpha: weight of vector search (BM25 gets 1 - alpha)
        query_embedding is unused here because vector_results are precomputed.
        """
        # BM25 retrieval
        tokens = list(jieba.cut(query))
        bm25_scores = self.bm25.get_scores(tokens)
        bm25_top = sorted(
            enumerate(bm25_scores),
            key=lambda x: x[1],
            reverse=True
        )[:top_k * 2]
        # RRF fusion
        rrf_k = 60  # RRF constant
        scores = {}
        result_map = {}
        id_by_content = {}
        # Vector retrieval contribution
        for rank, result in enumerate(vector_results):
            doc_id = result["id"]
            result_map[doc_id] = result
            id_by_content.setdefault(result["content"], doc_id)
            scores[doc_id] = scores.get(doc_id, 0) + alpha / (rrf_k + rank + 1)
        # BM25 contribution (matched to vector results by content)
        for rank, (idx, bm25_score) in enumerate(bm25_top):
            if bm25_score <= 0:
                continue  # no keyword overlap with the query
            content = self.chunks[idx]
            doc_id = id_by_content.get(content)
            if doc_id is None:
                # Keyword-only hit: keep it under a synthetic id so that
                # BM25 can add results the vector search missed
                doc_id = f"bm25-{idx}"
                result_map[doc_id] = {"id": doc_id, "content": content,
                                      "source": "", "breadcrumb": "", "score": 0.0}
            scores[doc_id] = scores.get(doc_id, 0) + (1 - alpha) / (rrf_k + rank + 1)
        # Sort by fused score
        sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
        return [
            {**result_map[id_], "hybrid_score": scores[id_]}
            for id_ in sorted_ids[:top_k]
        ]
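Stripped of the BM25 and tokenization details, the fusion step itself is small. The sketch below isolates weighted Reciprocal Rank Fusion over plain lists of document IDs, so the arithmetic can be checked by hand; `reciprocal_rank_fusion` and the sample rankings are illustrative assumptions, with k = 60 as in the code above.

```python
from typing import Optional


def reciprocal_rank_fusion(rankings: list[list[str]],
                           weights: Optional[list[float]] = None,
                           k: int = 60) -> list[tuple[str, float]]:
    """
    Fuse several ranked lists of document ids.
    Each list contributes weight / (k + rank) per document, with rank starting at 1.
    Returns (doc_id, fused_score) pairs, best first.
    """
    if weights is None:
        weights = [1.0] * len(rankings)
    if len(weights) != len(rankings):
        raise ValueError("one weight per ranking is required")
    scores: dict[str, float] = {}
    for ranking, weight in zip(rankings, weights):
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + weight / (k + rank)
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)


vector_ranking = ["a", "b", "c"]  # hypothetical vector-search order
bm25_ranking = ["c", "a", "d"]    # hypothetical BM25 order
fused = reciprocal_rank_fusion([vector_ranking, bm25_ranking], weights=[0.6, 0.4])
print([doc_id for doc_id, _ in fused])
```

Because RRF uses ranks rather than raw scores, it needs no normalization between cosine similarities and BM25 scores, which live on very different scales.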
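28.6 Query Optimization
Query rewriting
Raw user queries are often a poor fit for vector retrieval: they may contain unresolved pronouns or abbreviations, or depend on earlier turns of the conversation:
import anthropic

def rewrite_query_for_retrieval(client: anthropic.Anthropic,
                                query: str,
                                conversation_history: list[dict] | None = None) -> list[str]:
    """
    Rewrite the user query into forms better suited to retrieval.
    Returns several query variants to increase recall.
    """
    context = ""
    if conversation_history:
        # Use the last few turns, truncated, as context
        last_few = conversation_history[-4:]
        context = "\n".join(f"{m['role']}: {m['content'][:200]}" for m in last_few)
    response = client.messages.create(
        model="claude-haiku-4-5",
        max_tokens=256,
        messages=[{
            "role": "user",
            "content": f"""Given the conversation context (if any) and the user query below, write 3 rewritten versions
for retrieving relevant documents from a knowledge base. Put each version on its own line, without numbering.
Conversation context:
{context}
User query: {query}
Requirements:
- Resolve references (replace "it", "this", and similar words with concrete nouns)
- Expand abbreviations and technical terms
- Produce query variants from different angles"""
        }]
    )
    rewrites = response.content[0].text.strip().split("\n")
    # Always keep the original query, followed by at most 3 rewrites
    return [query] + [r.strip() for r in rewrites if r.strip()][:3]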
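Models do not always follow the "no numbering" instruction, and rewrites can duplicate each other or the original query. A slightly more defensive parser, sketched below, strips list markers and removes duplicates before retrieval. `parse_rewrites` is a hypothetical helper and the sample model output is invented for the example.

```python
import re


def parse_rewrites(raw: str, original: str, limit: int = 3) -> list[str]:
    """
    Turn raw model output into a clean list of queries:
    the original query first, then up to `limit` distinct rewrites.
    """
    queries = [original]
    seen = {original.strip().lower()}
    for line in raw.splitlines():
        # Drop leading bullets ("-", "*", "•") or numbering ("1.", "2)", "3、")
        cleaned = re.sub(r"^\s*(?:[-*•]|\d+[.)、])\s*", "", line).strip()
        key = cleaned.lower()
        if cleaned and key not in seen:
            queries.append(cleaned)
            seen.add(key)
        if len(queries) == limit + 1:
            break
    return queries


# Invented model output with numbering, a duplicate, and a blank line
raw_output = (
    "1. refund policy steps\n"
    "- Refund policy steps\n"
    "\n"
    "2) how to request a refund\n"
    "return window"
)
print(parse_rewrites(raw_output, "refund steps"))
```

Removing duplicates matters for cost as well as quality, since every surviving variant triggers its own embedding and vector search.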
28.7 A Complete RAG Pipeline
import anthropic
from sentence_transformers import SentenceTransformer

# RAGVectorStore, Reranker, and rewrite_query_for_retrieval are defined in sections 28.5 and 28.6

class ClaudeRAGPipeline:
    """Complete RAG pipeline"""
    def __init__(self, collection: str = "knowledge_base"):
        self.claude = anthropic.Anthropic()
        self.embedder = SentenceTransformer("BAAI/bge-m3")
        self.vector_store = RAGVectorStore(collection)
        self.reranker = Reranker()

    def _format_context(self, chunks: list[dict]) -> str:
        """Format retrieval results as context that Claude can use and cite"""
        parts = []
        for i, chunk in enumerate(chunks, 1):
            source = chunk.get("source", "unknown")
            breadcrumb = chunk.get("breadcrumb", "")
            loc = f"{source} > {breadcrumb}" if breadcrumb else source
            parts.append(f"[Document {i}] Source: {loc}\n{chunk['content']}")
        return "\n\n---\n\n".join(parts)

    def query(self, user_question: str,
              conversation_history: list[dict] | None = None,
              top_k_retrieve: int = 20,
              top_k_rerank: int = 5) -> str:
        """
        Run the full RAG query flow:
        1. Query rewriting
        2. Vector retrieval
        3. Reranking
        4. Context formatting
        5. Claude answers with the injected context
        """
        # Step 1: query rewriting
        queries = rewrite_query_for_retrieval(
            self.claude, user_question, conversation_history
        )
        # Step 2: multi-query vector retrieval (merged and deduplicated)
        all_results = []
        seen_ids = set()
        per_query_k = max(1, top_k_retrieve // len(queries))
        for q in queries:
            q_embedding = self.embedder.encode(q).tolist()
            results = self.vector_store.retrieve(q_embedding, top_k=per_query_k)
            for r in results:
                if r["id"] not in seen_ids:
                    all_results.append(r)
                    seen_ids.add(r["id"])
        # Step 3: reranking against the original question
        reranked = self.reranker.rerank(user_question, all_results, top_k=top_k_rerank)
        # Step 4: format the context
        context = self._format_context(reranked)
        # Step 5: call Claude
        system = """You are a professional knowledge assistant.
Answer the user's question based on the document excerpts provided.
Requirements:
- Use only the provided document content as evidence
- If the documents contain no relevant information, state clearly: "The documents do not contain this information"
- Cite the source when quoting (for example, "According to Document 1...")
- Do not add information that does not appear in the documents"""
        messages_to_send = conversation_history.copy() if conversation_history else []
        messages_to_send.append({
            "role": "user",
            "content": f"<retrieved_documents>\n{context}\n</retrieved_documents>\n\n{user_question}"
        })
        response = self.claude.messages.create(
            model="claude-opus-4-5",
            max_tokens=2048,
            system=system,
            messages=messages_to_send
        )
        return response.content[0].text

# Usage example
pipeline = ClaudeRAGPipeline(collection="company_docs")
# Offline indexing (one-off)
docs = [
    {"content": "Our refund policy is...", "source": "policy.md", "breadcrumb": "Refund policy"},
    {"content": "Product feature list...", "source": "features.md", "breadcrumb": "Feature overview"}
]
embeddings = [pipeline.embedder.encode(d["content"]).tolist() for d in docs]
pipeline.vector_store.index_chunks(docs, embeddings)
# Online query
answer = pipeline.query("What is your refund process?")
print(answer)
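The pipeline above passes every reranked chunk to the model regardless of length. If chunks vary widely in size, a budget check before formatting keeps the injected context predictable. The sketch below is a minimal version under stated assumptions: `pack_context` is a hypothetical helper, and character count is used as a rough stand-in for tokens, which a production system would measure with a real tokenizer.

```python
def pack_context(chunks: list[dict], max_chars: int = 6000) -> list[dict]:
    """
    Greedily keep chunks, in ranked order, while the total content length
    stays within max_chars. A chunk that does not fit is skipped so that
    a shorter, lower-ranked chunk can still be included.
    """
    packed = []
    used = 0
    for chunk in chunks:
        size = len(chunk["content"])
        if used + size > max_chars:
            continue
        packed.append(chunk)
        used += size
    return packed


# Made-up reranked results, best first
ranked_chunks = [
    {"id": "1", "content": "x" * 5},
    {"id": "2", "content": "y" * 10},
    {"id": "3", "content": "z" * 3},
]
print([c["id"] for c in pack_context(ranked_chunks, max_chars=9)])
```

Skipping rather than stopping at the first oversized chunk is a design choice: it favors filling the budget over strictly preserving rank order.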
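Summary
RAG is the most practical architecture for combining Claude with an enterprise knowledge base. Engineering quality comes down to:
- Chunking strategy: recursive chunking suits general use, semantic chunking gives higher quality, and Markdown-aware chunking preserves structural information
- Embedding model: BAAI/bge-m3 is a strong choice for Chinese and English content, while text-embedding-3-large offers higher quality
- Retrieval optimization: hybrid search (vector retrieval + BM25) improves recall, and CrossEncoder reranking improves precision
- Query optimization: multi-query rewriting broadens semantic coverage
- Context injection: a structured format (with source and location information) helps Claude produce well-grounded answers
The next chapter opens Part 6, which explores the Managed Agents system on the Claude.ai platform: Projects, Artifacts, and agent lifecycle management.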