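Chapter 28: RAG Architecture and Claude: Engineering Practice for Retrieval-Augmented Generation

28.1 The Essence of RAG: Knowledge Access Beyond the Context Window

Retrieval-Augmented Generation (RAG) addresses a fundamental limitation of large language models: training data has a cutoff date and the context window has finite capacity, while an enterprise knowledge base can keep growing without bound.

The core idea of RAG: rather than stuffing every document into the context (which is impractical), retrieve the most relevant passages at query time and send only those passages to the model together with the query. This lets Claude answer questions over a knowledge base of arbitrary size while keeping latency low and cost under control.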
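
To make the retrieve-then-generate idea concrete, here is a minimal sketch. It assumes a toy bag-of-words similarity in place of a real embedding model, the sample knowledge base is made up, and the names `retrieve` and `build_prompt` are hypothetical. Only the control flow (score every chunk against the query, keep the top few, send only those) reflects the approach described above.

```python
import math
from collections import Counter


def bow_vector(text: str) -> Counter:
    """Toy stand-in for an embedding: lowercase word counts."""
    return Counter(text.lower().split())


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse word-count vectors."""
    dot = sum(a[w] * b[w] for w in a if w in b)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def retrieve(query: str, chunks: list[str], top_k: int = 2) -> list[str]:
    """Return the top_k chunks most similar to the query."""
    q = bow_vector(query)
    ranked = sorted(chunks, key=lambda c: cosine(q, bow_vector(c)), reverse=True)
    return ranked[:top_k]


def build_prompt(query: str, chunks: list[str]) -> str:
    """Send only the retrieved chunks, not the whole knowledge base."""
    context = "\n\n".join(chunks)
    return f"<retrieved_documents>\n{context}\n</retrieved_documents>\n\n{query}"


# Made-up sample data for illustration
knowledge_base = [
    "Refunds are processed within 14 days of the request.",
    "The dashboard supports dark mode and custom themes.",
    "Shipping is free for orders above 50 dollars.",
]
top = retrieve("how long do refunds take", knowledge_base, top_k=1)
prompt = build_prompt("how long do refunds take", top)
```

A production system would swap `bow_vector` for a real embedding model and the linear scan for a vector database, which is what the rest of this chapter covers.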

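RAG vs. Fine-Tuning vs. Long Context

| Approach | Best suited for | Strengths | Weaknesses |
|---|---|---|---|
| RAG | Frequently updated knowledge bases, factual queries | Real-time updates, explainable, low cost | Retrieval quality depends on engineering |
| Fine-tuning | Fixed style, format, or domain knowledge | The model "internalizes" the knowledge | Expensive to update, not real-time |
| Long context (direct injection) | Small document sets (<10 documents) | Simple to implement | High token cost, diluted attention |

For most enterprise knowledge base scenarios (thousands to hundreds of thousands of documents), RAG is the most practical engineering choice.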
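
A rough back-of-the-envelope comparison shows why direct injection stops scaling. The document counts and token sizes below are hypothetical placeholders, not measurements; substitute your own figures.

```python
def prompt_tokens_long_context(num_docs: int, tokens_per_doc: int) -> int:
    """With direct injection, every query carries every document."""
    return num_docs * tokens_per_doc


def prompt_tokens_rag(top_k: int, tokens_per_chunk: int) -> int:
    """With RAG, every query carries only the top_k retrieved chunks."""
    return top_k * tokens_per_chunk


# Hypothetical knowledge base: 5,000 documents of roughly 2,000 tokens each
long_ctx = prompt_tokens_long_context(5_000, 2_000)      # 10,000,000 tokens per query
rag = prompt_tokens_rag(top_k=5, tokens_per_chunk=400)   # 2,000 tokens per query
```

Under these assumed sizes the direct-injection prompt grows linearly with the knowledge base, while the RAG prompt stays the same size no matter how many documents are indexed.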

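28.2 RAG System Architecture Overview

┌──────────────────────────────────────────────────────────────────────┐
│                        Offline indexing flow                         │
│                                                                      │
│  Raw documents → Loading → Chunking → Embedding → Vector DB storage  │
│  (PDF/MD/HTML)   (Loader)  (Chunker)  (Embedder)  (VectorDB)         │
└──────────────────────────────────────────────────────────────────────┘
                                    │
                             Vector database
                                    │
┌──────────────────────────────────────────────────────────────────────┐
│                          Online query flow                           │
│                                                                      │
│  User query → Query rewrite → Embedding → Vector search → Reranking  │
│               (optional)                  (Top-K)         (Reranker) │
│                                                             │        │
│                                                     Context assembly │
│                                                             │        │
│                                                        Claude API    │
│                                                             │        │
│                                                       Final answer   │
└──────────────────────────────────────────────────────────────────────┘

Core Components

  1. Document loader (Loader): converts documents in various formats into plain text
  2. Chunker: splits long text into pieces suitable for embedding
  3. Embedding model (Embedder): converts text into vector representations
  4. Vector database (VectorDB): stores and retrieves vectors
  5. Reranker: re-scores the initial retrieval results for a finer ranking
  6. Context assembler: formats the retrieved results and injects them into the Claude prompt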

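28.3 Chunking Strategy: The Key to RAG Engineering

Chunking is the most easily overlooked yet most consequential stage of a RAG system. Poor chunking degrades everything downstream, because retrieval can only return the pieces the chunker produced.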

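Strategy 1: Fixed-Size Chunking (Baseline)

def fixed_size_chunk(text: str, chunk_size: int = 512,
                     overlap: int = 64) -> list[str]:
    """
    Simplest fixed-size chunking.
    chunk_size: number of characters per chunk
    overlap: number of characters shared by adjacent chunks
             (preserves continuity across chunk boundaries)
    """
    if not 0 <= overlap < chunk_size:
        # Otherwise the window would never advance
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunks.append(text[start:end])
        if end >= len(text):
            break  # the last chunk already reaches the end of the text
        start = end - overlap  # sliding window
    return chunks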
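
To see the effect of overlap, here is a small worked example. With a sliding window the stride is chunk_size - overlap, so a text of n characters (n > chunk_size) needs ceil((n - overlap) / stride) windows. The helper names below are illustrative; `sliding_chunks` is a compact variant that stops as soon as a window reaches the end of the text.

```python
import math


def sliding_chunks(text: str, chunk_size: int, overlap: int) -> list[str]:
    """Compact sliding-window chunker; stops once a window reaches the end."""
    stride = chunk_size - overlap
    chunks = []
    for start in range(0, len(text), stride):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break
    return chunks


def expected_chunk_count(n: int, chunk_size: int, overlap: int) -> int:
    """Number of windows needed to cover n characters."""
    if n <= chunk_size:
        return 1 if n > 0 else 0
    return math.ceil((n - overlap) / (chunk_size - overlap))


text = "abcdefghijklmnopqrstuvwxyz"  # 26 characters
chunks = sliding_chunks(text, chunk_size=10, overlap=3)
# chunks == ["abcdefghij", "hijklmnopq", "opqrstuvwx", "vwxyz"]
```

Each chunk repeats the last three characters of its predecessor, which is what keeps a sentence that straddles a boundary retrievable from either side.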

Strategy 2: Recursive Chunking (Recommended Baseline)

def recursive_chunk(text: str,
                    chunk_size: int = 1000,
                    overlap: int = 100,
                    separators: list[str] | None = None) -> list[str]:
    """
    Recursively chunk by a hierarchy of separators, breaking at natural boundaries first.
    Separator priority: paragraph > line break > sentence > word > character
    Note: overlap is applied only in the character-level fallback.
    """
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
    if separators is None:
        separators = ["\n\n", "\n", "。", "!", "?", ". ", "! ", "? ", " ", ""]

    def _split(text: str, seps: list[str]) -> list[str]:
        if not seps:
            return [text]

        sep = seps[0]
        remaining_seps = seps[1:]

        if sep == "":
            # Last separator: split at the character level
            return [text[i:i+chunk_size] for i in range(0, len(text), chunk_size - overlap)]

        parts = text.split(sep)

        chunks = []
        current = ""
        for part in parts:
            if len(current) + len(sep) + len(part) <= chunk_size:
                current = current + sep + part if current else part
            else:
                if current:
                    chunks.append(current)
                # If a single part exceeds chunk_size, split it recursively
                if len(part) > chunk_size:
                    chunks.extend(_split(part, remaining_seps))
                    current = ""
                else:
                    current = part

        if current:
            chunks.append(current)

        return [c.strip() for c in chunks if c.strip()]

    return _split(text, separators)

Strategy 3: Semantic Chunking (Highest Quality)

Semantic chunking computes the embedding similarity of adjacent sentences and breaks at semantic boundaries:

import re

import numpy as np
from sentence_transformers import SentenceTransformer

def semantic_chunk(text: str,
                   model_name: str = "BAAI/bge-m3",
                   threshold: float = 0.7,
                   min_chunk_size: int = 200,
                   max_chunk_size: int = 2000) -> list[str]:
    """
    Chunking based on semantic similarity.
    Breaks where the similarity between adjacent sentences falls below the threshold.
    """
    # Loading the model is slow; reuse one instance when chunking many documents
    model = SentenceTransformer(model_name)

    # 1. Split into sentences
    sentences = re.split(r'(?<=[。!?.!?])\s*', text)
    sentences = [s.strip() for s in sentences if s.strip()]

    if len(sentences) <= 1:
        return [text]

    # 2. Compute embeddings for all sentences
    embeddings = model.encode(sentences, batch_size=32)

    # 3. Compute cosine similarity between adjacent sentences
    similarities = []
    for i in range(len(embeddings) - 1):
        sim = np.dot(embeddings[i], embeddings[i+1]) / (
            np.linalg.norm(embeddings[i]) * np.linalg.norm(embeddings[i+1])
        )
        similarities.append(float(sim))

    # 4. Break where similarity is low
    chunks = []
    current_sentences = [sentences[0]]
    current_size = len(sentences[0])

    for sentence, sim in zip(sentences[1:], similarities):
        should_break = (
            sim < threshold or  # semantic breakpoint
            current_size + len(sentence) > max_chunk_size  # exceeds the maximum length
        )

        # Only break once the current chunk has reached the minimum size
        if should_break and current_size >= min_chunk_size:
            chunks.append(" ".join(current_sentences))
            current_sentences = [sentence]
            current_size = len(sentence)
        else:
            current_sentences.append(sentence)
            current_size += len(sentence)

    if current_sentences:
        chunks.append(" ".join(current_sentences))

    return chunks
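
The breakpoint step can be examined in isolation. The sketch below assumes sentence embeddings are already available as plain lists of floats (the tiny 2-D vectors are made up for illustration) and only shows how adjacent similarities turn into split positions. `find_breakpoints` and `group_sentences` are hypothetical helpers, and the minimum and maximum size rules are deliberately left out.

```python
import math


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two dense vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def find_breakpoints(embeddings: list[list[float]], threshold: float = 0.7) -> list[int]:
    """Return each index i at which a new chunk should start (sentence i)."""
    return [
        i + 1
        for i in range(len(embeddings) - 1)
        if cosine(embeddings[i], embeddings[i + 1]) < threshold
    ]


def group_sentences(sentences: list[str], breakpoints: list[int]) -> list[list[str]]:
    """Split the sentence list at the given start indices."""
    bounds = [0, *breakpoints, len(sentences)]
    return [sentences[s:e] for s, e in zip(bounds, bounds[1:])]


sentences = [
    "Refunds take 14 days.",
    "Refunds go to the original card.",
    "Dark mode is supported.",
]
embeddings = [[1.0, 0.1], [0.9, 0.2], [0.1, 1.0]]  # made-up vectors
breaks = find_breakpoints(embeddings)
groups = group_sentences(sentences, breaks)
```

The first two vectors point in nearly the same direction, so only the transition into the third sentence falls below the threshold and starts a new group.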

Strategy 4: Structured Document Chunking (Markdown/Code)

import re

def markdown_chunk(text: str, max_chunk_size: int = 1500) -> list[dict]:
    """
    Structure-aware chunking for Markdown documents.
    Keeps the heading hierarchy as metadata, which helps locate a chunk at retrieval time.
    Relies on recursive_chunk (defined above) to split oversized sections.
    """
    chunks = []

    # Split on headings (levels 1 to 4); the capture group keeps the heading lines
    sections = re.split(r'^(#{1,4}\s+.+)$', text, flags=re.MULTILINE)

    current_header_stack = []
    current_content = []

    for item in sections:
        header_match = re.match(r'^(#{1,4})\s+(.+)$', item)

        if header_match:
            # Save the content accumulated so far
            if current_content:
                full_content = "\n".join(current_content).strip()
                if full_content:
                    chunks.append({
                        "content": full_content,
                        "breadcrumb": " > ".join(h[1] for h in current_header_stack),
                        "level": current_header_stack[-1][0] if current_header_stack else 0
                    })
            current_content = [item]

            # Update the heading stack: drop headings at the same or a deeper level
            level = len(header_match.group(1))
            title = header_match.group(2)
            current_header_stack = [(h[0], h[1]) for h in current_header_stack if h[0] < level]
            current_header_stack.append((level, title))
        else:
            current_content.append(item)

    # Handle the final section
    if current_content:
        full_content = "\n".join(current_content).strip()
        if full_content:
            chunks.append({
                "content": full_content,
                "breadcrumb": " > ".join(h[1] for h in current_header_stack),
                "level": current_header_stack[-1][0] if current_header_stack else 0
            })

    # Recursively split chunks that are too long
    result = []
    for chunk in chunks:
        if len(chunk["content"]) > max_chunk_size:
            sub_chunks = recursive_chunk(chunk["content"], max_chunk_size)
            for i, sub in enumerate(sub_chunks):
                result.append({
                    **chunk,
                    "content": sub,
                    "sub_chunk_index": i
                })
        else:
            result.append(chunk)

    return result
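
One possible way to use the breadcrumb metadata is to prepend it to the chunk text before embedding, so that each chunk carries its position in the document. This is a sketch of an optional technique, not something the code above does; `to_embedding_text` is a hypothetical helper that operates on the dict shape returned by `markdown_chunk`, and the sample chunk is made up.

```python
def to_embedding_text(chunk: dict) -> str:
    """Prefix the heading path so the embedded text keeps its document position."""
    breadcrumb = chunk.get("breadcrumb", "")
    content = chunk["content"]
    return f"{breadcrumb}\n\n{content}" if breadcrumb else content


# Made-up chunk in the shape produced by markdown_chunk
chunk = {
    "content": "Requests must be filed within 30 days.",
    "breadcrumb": "Policies > Refunds",
    "level": 2,
}
text_for_embedding = to_embedding_text(chunk)
```

Whether this helps depends on the corpus; it is worth comparing retrieval quality with and without the prefix before adopting it.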

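28.4 Choosing an Embedding Model

Retrieval quality varies significantly across embedding models:

| Model | Dimensions | Languages | Performance | Best suited for |
|---|---|---|---|---|
| BAAI/bge-m3 | 1024 | Chinese and English | | General-purpose recommendation |
| text-embedding-3-large | 3072 | Multilingual | Highest | High-quality production |
| text-embedding-3-small | 1536 | Multilingual | | Cost-sensitive |
| BAAI/bge-small-zh | 512 | Chinese | | Chinese-only scenarios |

from openai import OpenAI

def embed_with_openai(texts: list[str],
                      model: str = "text-embedding-3-small") -> list[list[float]]:
    """Embed texts with the OpenAI embeddings API (also works with other OpenAI-compatible APIs)"""
    client = OpenAI()  # reads OPENAI_API_KEY from the environment

    # The API accepts up to 2048 inputs per request; a smaller batch of 100
    # keeps each request small
    all_embeddings = []
    batch_size = 100

    for i in range(0, len(texts), batch_size):
        batch = texts[i:i+batch_size]
        response = client.embeddings.create(model=model, input=batch)
        all_embeddings.extend([r.embedding for r in response.data])

    return all_embeddings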
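
Because retrieval quality varies between models, it helps to measure it on your own data before committing to one. Below is a minimal sketch of two common retrieval metrics, recall@k and mean reciprocal rank (MRR). The function names and the tiny labeled set are illustrative assumptions; in practice the ranked ids would come from running each candidate embedding model over a set of queries whose relevant chunks are known.

```python
def recall_at_k(ranked_ids: list[str], relevant_ids: set[str], k: int) -> float:
    """Fraction of the relevant ids that appear in the top k results."""
    if not relevant_ids:
        return 0.0
    hits = sum(1 for doc_id in ranked_ids[:k] if doc_id in relevant_ids)
    return hits / len(relevant_ids)


def reciprocal_rank(ranked_ids: list[str], relevant_ids: set[str]) -> float:
    """1 / rank of the first relevant result, or 0.0 if none is retrieved."""
    for rank, doc_id in enumerate(ranked_ids, start=1):
        if doc_id in relevant_ids:
            return 1.0 / rank
    return 0.0


def mean_reciprocal_rank(runs: list[tuple[list[str], set[str]]]) -> float:
    """Average reciprocal rank over (ranked_ids, relevant_ids) pairs."""
    if not runs:
        return 0.0
    return sum(reciprocal_rank(r, rel) for r, rel in runs) / len(runs)


# Made-up evaluation data: two queries with known relevant chunk ids
runs = [
    (["c3", "c1", "c7"], {"c1"}),        # first relevant hit at rank 2
    (["c2", "c9", "c4"], {"c2", "c4"}),  # first relevant hit at rank 1
]
mrr = mean_reciprocal_rank(runs)       # (0.5 + 1.0) / 2 = 0.75
r_at_2 = recall_at_k(*runs[1], k=2)    # only c2 is in the top 2, so 0.5
```

Running the same labeled queries through each candidate model and comparing these numbers gives a more reliable basis for the choice than a general-purpose leaderboard.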

28.5 Retrieval and Reranking

Basic Vector Retrieval

from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct, Filter, FieldCondition, MatchValue
import uuid

class RAGVectorStore:
    """Vector store dedicated to RAG"""

    def __init__(self, collection: str = "knowledge_base"):
        self.client = QdrantClient(host="localhost", port=6333)
        self.collection = collection

    def ensure_collection(self, vector_size: int):
        """Create the collection (cosine distance) if it does not exist yet"""
        if not self.client.collection_exists(self.collection):
            self.client.create_collection(
                collection_name=self.collection,
                vectors_config=VectorParams(size=vector_size, distance=Distance.COSINE),
            )

    def index_chunks(self, chunks: list[dict | str], embeddings: list[list[float]]):
        """Index chunks together with their embeddings, in batches"""
        if not chunks:
            return
        self.ensure_collection(len(embeddings[0]))

        points = []
        for chunk, embedding in zip(chunks, embeddings):
            # Accept plain strings as well as dicts
            data = chunk if isinstance(chunk, dict) else {"content": chunk}
            points.append(PointStruct(
                id=str(uuid.uuid4()),
                vector=embedding,
                payload={
                    "content": data.get("content", ""),
                    "source": data.get("source", "unknown"),
                    "breadcrumb": data.get("breadcrumb", ""),
                    "metadata": {k: v for k, v in data.items() if k != "content"},
                }
            ))

        # Upload in batches
        batch_size = 100
        for i in range(0, len(points), batch_size):
            self.client.upsert(
                collection_name=self.collection,
                points=points[i:i+batch_size]
            )

    def retrieve(self, query_embedding: list[float],
                 top_k: int = 20,
                 source_filter: str | None = None) -> list[dict]:
        """Retrieve the most relevant text chunks"""

        query_filter = None
        if source_filter:
            query_filter = Filter(
                must=[FieldCondition(
                    key="source",
                    match=MatchValue(value=source_filter)
                )]
            )

        # Note: recent qdrant-client releases deprecate search() in favor of query_points()
        results = self.client.search(
            collection_name=self.collection,
            query_vector=query_embedding,
            query_filter=query_filter,
            limit=top_k,
            with_payload=True
        )

        return [
            {
                "id": str(r.id),
                "content": r.payload["content"],
                "source": r.payload.get("source", ""),
                "breadcrumb": r.payload.get("breadcrumb", ""),
                "score": r.score
            }
            for r in results
        ]

Reranking

Vector retrieval has high recall but insufficient precision; reranking uses a cross-encoder to re-score the candidates:

from sentence_transformers import CrossEncoder

class Reranker:
    """Reranker based on a CrossEncoder"""

    def __init__(self, model_name: str = "BAAI/bge-reranker-v2-m3"):
        self.model = CrossEncoder(model_name)

    def rerank(self, query: str, candidates: list[dict],
               top_k: int = 5) -> list[dict]:
        """
        Re-score the candidate chunks.
        A CrossEncoder encodes the query and the candidate document together,
        which is more precise than a bi-encoder (two-tower) model.
        """
        if not candidates:
            return []

        pairs = [(query, c["content"]) for c in candidates]
        scores = self.model.predict(pairs)

        # Sort by rerank score, highest first
        ranked = sorted(
            zip(candidates, scores),
            key=lambda x: x[1],
            reverse=True
        )

        return [
            {**candidate, "rerank_score": float(score)}
            for candidate, score in ranked[:top_k]
        ]

Hybrid Retrieval

Hybrid retrieval combines vector search with keyword search (BM25): vector search handles queries that are semantically similar but worded differently, while BM25 catches exact term matches.

from rank_bm25 import BM25Okapi
import jieba  # Chinese word segmentation

class HybridRetriever:
    """Hybrid retriever: vector retrieval + BM25 keyword retrieval"""

    def __init__(self, chunks: list[str]):
        self.chunks = chunks
        # Build the BM25 index
        tokenized = [list(jieba.cut(c)) for c in chunks]
        self.bm25 = BM25Okapi(tokenized)

    def hybrid_search(self, query: str, query_embedding: list[float],
                       vector_results: list[dict],
                       alpha: float = 0.6,
                       top_k: int = 10) -> list[dict]:
        """
        Fuse vector and BM25 results with weighted RRF (Reciprocal Rank Fusion).
        alpha: weight of vector retrieval (1 - alpha is the BM25 weight)
        query_embedding is not used here; vector_results must already be retrieved.
        Note: BM25 hits that are absent from vector_results are dropped, so BM25
        only re-orders the vector candidates in this implementation.
        """
        # BM25 retrieval
        tokens = list(jieba.cut(query))
        bm25_scores = self.bm25.get_scores(tokens)
        bm25_top = sorted(
            enumerate(bm25_scores),
            key=lambda x: x[1],
            reverse=True
        )[:top_k * 2]

        # RRF fusion
        rrf_k = 60  # RRF constant
        scores = {}

        # Contribution from vector retrieval
        for rank, result in enumerate(vector_results):
            doc_id = result["id"]
            scores[doc_id] = scores.get(doc_id, 0) + alpha / (rrf_k + rank + 1)

        # Contribution from BM25 (ids are resolved by matching chunk content)
        content_to_id = {}
        for r in vector_results:
            content_to_id.setdefault(r["content"], r["id"])
        for rank, (idx, _) in enumerate(bm25_top):
            doc_id = content_to_id.get(self.chunks[idx])
            if doc_id is not None:
                scores[doc_id] = scores.get(doc_id, 0) + (1 - alpha) / (rrf_k + rank + 1)

        # Sort by fused score
        sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
        result_map = {r["id"]: r for r in vector_results}

        return [
            {**result_map[id_], "hybrid_score": scores[id_]}
            for id_ in sorted_ids[:top_k]
            if id_ in result_map
        ]

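28.6 Query Optimization

Query Rewriting

Raw user queries are often poorly suited to direct vector retrieval: they may rely on pronouns that refer to earlier turns, or use abbreviations and jargon that do not match the wording of the documents.

import anthropic

def rewrite_query_for_retrieval(client: anthropic.Anthropic,
                                  query: str,
                                  conversation_history: list[dict] | None = None) -> list[str]:
    """
    Rewrite the user query into forms better suited to retrieval.
    Returns several query variants to increase recall.
    """
    context = ""
    if conversation_history:
        # Only the last few turns, truncated, are needed to resolve references
        last_few = conversation_history[-4:]
        context = "\n".join(f"{m['role']}: {m['content'][:200]}" for m in last_few)

    response = client.messages.create(
        model="claude-haiku-4-5",
        max_tokens=256,
        messages=[{
            "role": "user",
            "content": f"""Given the conversation context below (if any) and the user query, produce 3 rewritten versions
for retrieving relevant documents from a knowledge base. Put each version on its own line, without numbering.
Keep the language of the original query.

Conversation context:
{context}

User query: {query}

Requirements:
- Resolve references (replace "it", "this", and similar words with the concrete noun)
- Expand abbreviations and technical terms
- Produce query variants from different angles"""
        }]
    )

    rewrites = response.content[0].text.strip().split("\n")
    # Always keep the original query, followed by at most 3 rewrites
    return [query] + [r.strip() for r in rewrites if r.strip()][:3]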
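
Models do not always follow a "no numbering" instruction, so it can help to clean the output before using it. A hedged sketch: `parse_rewrites` is a hypothetical helper that strips leading bullets or numbering, drops duplicates, and caps the number of variants. The sample output string is made up.

```python
import re


def parse_rewrites(raw: str, original: str, max_variants: int = 3) -> list[str]:
    """Turn raw model output into a deduplicated list of queries, original first."""
    queries = [original]
    seen = {original.strip().lower()}
    for line in raw.splitlines():
        # Remove leading bullets ("-", "*", "•") or numbering ("1.", "2)")
        cleaned = re.sub(r"^\s*(?:[-*•]|\d+[.)])\s*", "", line).strip()
        if not cleaned or cleaned.lower() in seen:
            continue
        seen.add(cleaned.lower())
        queries.append(cleaned)
        if len(queries) == max_variants + 1:
            break
    return queries


raw_output = "1. refund processing time\n- how to request a refund\n\nrefund processing time"
queries = parse_rewrites(raw_output, "how long does it take?")
```

Deduplicating here also saves embedding and retrieval calls later, since each surviving variant triggers its own vector search.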

28.7 Complete RAG Pipeline

import anthropic
from sentence_transformers import SentenceTransformer

# RAGVectorStore, Reranker and rewrite_query_for_retrieval are defined in the sections above

class ClaudeRAGPipeline:
    """Complete RAG pipeline"""

    def __init__(self, collection: str = "knowledge_base"):
        self.claude = anthropic.Anthropic()
        self.embedder = SentenceTransformer("BAAI/bge-m3")
        self.vector_store = RAGVectorStore(collection)
        self.reranker = Reranker()

    def _format_context(self, chunks: list[dict]) -> str:
        """Format the retrieval results into context Claude can read"""
        parts = []
        for i, chunk in enumerate(chunks, 1):
            source = chunk.get("source", "unknown")
            breadcrumb = chunk.get("breadcrumb", "")
            loc = f"{source} > {breadcrumb}" if breadcrumb else source
            parts.append(f"[Document {i}] Source: {loc}\n{chunk['content']}")
        return "\n\n---\n\n".join(parts)

    def query(self, user_question: str,
              conversation_history: list[dict] | None = None,
              top_k_retrieve: int = 20,
              top_k_rerank: int = 5) -> str:
        """
        Run the full RAG query flow:
        1. Query rewriting
        2. Multi-query vector retrieval
        3. Reranking
        4. Context formatting
        5. Claude call
        """

        # Step 1: query rewriting
        queries = rewrite_query_for_retrieval(
            self.claude, user_question, conversation_history
        )

        # Step 2: multi-query vector retrieval (merged and deduplicated)
        all_results = []
        seen_ids = set()
        per_query_k = max(1, top_k_retrieve // len(queries))
        for q in queries:
            q_embedding = self.embedder.encode(q).tolist()
            results = self.vector_store.retrieve(q_embedding, top_k=per_query_k)
            for r in results:
                if r["id"] not in seen_ids:
                    all_results.append(r)
                    seen_ids.add(r["id"])

        # Step 3: reranking against the original question
        reranked = self.reranker.rerank(user_question, all_results, top_k=top_k_rerank)

        # Step 4: format the context
        context = self._format_context(reranked)

        # Step 5: call Claude
        system = """You are a professional knowledge assistant.
Answer the user's question based on the provided document excerpts.

Requirements:
- Use only the provided document content as evidence
- If the documents contain no relevant information, state clearly: "The documents do not contain this information"
- Cite the source when quoting (e.g. "According to Document 1...")
- Do not add information that does not appear in the documents"""

        messages_to_send = conversation_history.copy() if conversation_history else []
        messages_to_send.append({
            "role": "user",
            "content": f"<retrieved_documents>\n{context}\n</retrieved_documents>\n\n{user_question}"
        })

        response = self.claude.messages.create(
            model="claude-opus-4-5",
            max_tokens=2048,
            system=system,
            messages=messages_to_send
        )

        return response.content[0].text


# Usage example (requires a running Qdrant instance at localhost:6333)
pipeline = ClaudeRAGPipeline(collection="company_docs")

# Offline indexing (one-time)
docs = [
    {"content": "Our refund policy is...", "source": "policy.md", "breadcrumb": "Refund Policy"},
    {"content": "Product feature list...", "source": "features.md", "breadcrumb": "Feature Overview"}
]
embeddings = [pipeline.embedder.encode(d["content"]).tolist() for d in docs]
pipeline.vector_store.index_chunks(docs, embeddings)

# Online query
answer = pipeline.query("What is your refund process?")
print(answer)
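
Reranked chunks can still exceed the space you want to spend on context. One simple guard, sketched under the assumption that a character budget is an acceptable proxy for tokens, is to keep chunks in rank order until the budget is used up. `fit_to_budget` and `max_chars` are hypothetical names, and the sample chunks are made up.

```python
def fit_to_budget(chunks: list[dict], max_chars: int) -> list[dict]:
    """Keep the highest-ranked chunks whose combined content fits in max_chars."""
    kept = []
    used = 0
    for chunk in chunks:  # chunks are assumed to be sorted best-first
        size = len(chunk["content"])
        if used + size > max_chars:
            break  # stop at the first chunk that does not fit, preserving rank order
        kept.append(chunk)
        used += size
    return kept


# Made-up reranked results
reranked = [
    {"content": "a" * 400, "source": "policy.md"},
    {"content": "b" * 500, "source": "faq.md"},
    {"content": "c" * 300, "source": "features.md"},
]
selected = fit_to_budget(reranked, max_chars=1000)
```

In a pipeline like the one above, a step of this kind would sit between reranking and context formatting. Stopping at the first chunk that does not fit, rather than skipping it and trying smaller ones, keeps the selection a strict prefix of the ranking.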

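Summary

RAG is the most practical architecture for combining Claude with an enterprise knowledge base. Engineering quality hinges on:

  1. Chunking strategy: recursive chunking suits general scenarios, semantic chunking yields higher quality, and Markdown-aware chunking preserves structural information
  2. Embedding model: BAAI/bge-m3 is a solid choice for Chinese-English bilingual content; text-embedding-3-large offers higher quality
  3. Retrieval optimization: hybrid vector + BM25 retrieval improves recall, and CrossEncoder reranking improves precision
  4. Query optimization: multi-query rewriting broadens semantic coverage
  5. Context injection: a structured format (with source and location information) helps Claude produce grounded answers

The next chapter begins Part 6, which explores the Managed Agents system on the Claude.ai platform: Projects, Artifacts, and Agent lifecycle management.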
