第 80 章

案例三:多语言内容生产流水线(Batch + Structured Outputs + Connector 自动化)

第八十章:构建企业级 RAG 系统:从 POC 到百万日活的架构演进

80.1 企业级 RAG 的全景视角

检索增强生成(Retrieval-Augmented Generation,RAG)是目前企业 AI 应用中最主流的架构模式。与纯 LLM 问答相比,RAG 的价值在于:将企业私有知识库与 LLM 的生成能力结合,实现基于最新、最准确的企业内部信息进行问答。

本章以一家假设的企业——拥有 50 万份内部文档、日活用户 100 万的大型企业——为背景,展示从 0 到生产的完整架构演进路径。

80.1.1 系统架构全景(ASCII 图)

┌─────────────────────────────────────────────────────────────────┐
│                     企业级 RAG 系统架构                           │
├────────────────────────┬────────────────────────────────────────┤
│     离线管道(索引构建)  │          在线管道(查询服务)             │
│                        │                                        │
│  文档源                 │  用户查询                               │
│  ┌──────────────┐      │  ┌─────────────────────────────────┐  │
│  │ SharePoint   │      │  │  API Gateway                    │  │
│  │ Confluence   │      │  │  (认证/限流/日志)                 │  │
│  │ S3/OSS       │──┐   │  └─────────────┬───────────────────┘  │
│  │ 数据库        │  │   │                ↓                       │
│  └──────────────┘  │   │  ┌─────────────────────────────────┐  │
│                    ↓   │  │  查询理解层                       │  │
│  ┌──────────────────┐  │  │  - 意图分类                      │  │
│  │ 文档预处理        │  │  │  - 查询改写/扩展                  │  │
│  │ - 格式解析        │  │  │  - 语言检测                      │  │
│  │ - 文本清洗        │  │  └─────────────┬───────────────────┘  │
│  │ - 元数据提取      │  │                ↓                       │
│  └───────┬──────────┘  │  ┌─────────────────────────────────┐  │
│          ↓             │  │  混合检索层                       │  │
│  ┌──────────────────┐  │  │  ┌──────────┐  ┌─────────────┐  │  │
│  │  分块策略         │  │  │  │向量检索   │  │关键词检索    │  │  │
│  │ (Chunking)       │  │  │  │(Semantic) │  │(BM25/ES)    │  │  │
│  │ - 固定大小        │  │  │  └────┬─────┘  └──────┬──────┘  │  │
│  │ - 语义边界        │  │  │       └────────┬───────┘         │  │
│  │ - 递归分割        │  │  │                ↓                  │  │
│  └───────┬──────────┘  │  │       RRF 融合排序                 │  │
│          ↓             │  └─────────────┬───────────────────┘  │
│  ┌──────────────────┐  │                ↓                       │
│  │  嵌入计算         │  │  ┌─────────────────────────────────┐  │
│  │ - Cohere/OpenAI  │  │  │  重排序层 (Reranker)             │  │
│  │ - BGE/E5         │  │  │  Cross-encoder 精排               │  │
│  └───────┬──────────┘  │  └─────────────┬───────────────────┘  │
│          ↓             │                ↓                       │
│  ┌──────────────────┐  │  ┌─────────────────────────────────┐  │
│  │  向量数据库写入    │  │  │  生成层 (Claude)                 │  │
│  │ - Pinecone       │  │  │  - 上下文组装                    │  │
│  │ - Qdrant         │  │  │  - 引用注入                      │  │
│  │ - pgvector       │  │  │  - 答案生成                      │  │
│  └──────────────────┘  │  └─────────────┬───────────────────┘  │
│                        │                ↓                       │
│                        │  ┌─────────────────────────────────┐  │
│                        │  │  后处理层                        │  │
│                        │  │  - 引用验证                     │  │
│                        │  │  - 格式化输出                   │  │
│                        │  │  - 置信度评估                   │  │
│                        │  └─────────────────────────────────┘  │
└────────────────────────┴────────────────────────────────────────┘
                              ↓
          ┌─────────────────────────────────────┐
          │       监控与可观测性栈                 │
          │  Prometheus + Grafana + Jaeger        │
          └─────────────────────────────────────┘

80.2 第一阶段:POC(0-100 用户)

80.2.1 POC 的目标与约束

POC 阶段的核心目标是验证技术可行性,而不是构建可扩展的系统。典型的 POC 约束:

80.2.2 分块策略(Chunking Strategy)

分块是 RAG 效果最关键的决定因素之一。错误的分块策略会导致:检索到的上下文信息不完整、或者包含大量无关内容。

from anthropic import Anthropic
import re

client = Anthropic()

class DocumentChunker:
    """
    三种核心分块策略
    """
    
    def fixed_size_chunking(
        self,
        text: str,
        chunk_size: int = 512,
        overlap: int = 50
    ) -> list:
        """
        固定大小分块(按字符数)
        优点:简单、可预测
        缺点:可能在句子中间截断
        """
        chunks = []
        start = 0
        
        while start < len(text):
            end = start + chunk_size
            chunk = text[start:end]
            
            # 尝试在句子边界截断
            if end < len(text):
                last_period = max(
                    chunk.rfind('。'),
                    chunk.rfind('.'),
                    chunk.rfind('\n')
                )
                if last_period > chunk_size * 0.7:  # 至少保留70%内容
                    chunk = chunk[:last_period + 1]
                    end = start + last_period + 1
            
            chunks.append({
                "text": chunk,
                "start_char": start,
                "end_char": start + len(chunk),
                "chunk_type": "fixed_size"
            })
            
            start = end - overlap  # 重叠以保持上下文连续性
        
        return chunks
    
    def semantic_chunking(self, text: str) -> list:
        """
        语义边界分块(按段落/章节)
        优点:保持语义完整性
        缺点:chunk 大小不均匀
        """
        # 按段落分割
        paragraphs = re.split(r'\n{2,}', text)
        chunks = []
        current_chunk = ""
        
        for para in paragraphs:
            para = para.strip()
            if not para:
                continue
            
            # 如果当前 chunk 加上新段落不超过限制,则合并
            if len(current_chunk) + len(para) < 800:
                current_chunk += ("\n\n" if current_chunk else "") + para
            else:
                if current_chunk:
                    chunks.append({"text": current_chunk, "chunk_type": "semantic"})
                current_chunk = para
        
        if current_chunk:
            chunks.append({"text": current_chunk, "chunk_type": "semantic"})
        
        return chunks
    
    def recursive_character_splitting(
        self,
        text: str,
        chunk_size: int = 512,
        separators: list = None
    ) -> list:
        """
        递归字符分割(LangChain RecursiveCharacterTextSplitter 思路)
        优点:在多级分隔符间智能选择
        """
        if separators is None:
            separators = ["\n\n", "\n", "。", ".", " ", ""]
        
        def _split(text, separators):
            if len(text) <= chunk_size:
                return [text]
            
            separator = separators[0] if separators else ""
            
            if separator:
                splits = text.split(separator)
            else:
                splits = list(text)
            
            chunks = []
            current = ""
            
            for split in splits:
                if len(current) + len(split) + len(separator) <= chunk_size:
                    current += (separator if current else "") + split
                else:
                    if current:
                        chunks.append(current)
                    if len(split) > chunk_size and len(separators) > 1:
                        chunks.extend(_split(split, separators[1:]))
                    else:
                        current = split
            
            if current:
                chunks.append(current)
            
            return chunks
        
        raw_chunks = _split(text, separators)
        return [{"text": c, "chunk_type": "recursive"} for c in raw_chunks if c.strip()]

80.2.3 POC 完整实现

import json
from typing import Optional

class SimplePOCRAG:
    """POC 阶段的简化 RAG 实现"""
    
    def __init__(self, embedding_fn, vector_store):
        """
        embedding_fn: 计算文本嵌入的函数
        vector_store: 支持 upsert/query 操作的向量存储
        """
        self.embed = embedding_fn
        self.store = vector_store
        self.chunker = DocumentChunker()
    
    def index_document(self, doc_id: str, text: str, metadata: dict = None):
        """索引单个文档"""
        chunks = self.chunker.semantic_chunking(text)
        
        for i, chunk in enumerate(chunks):
            chunk_id = f"{doc_id}_{i}"
            embedding = self.embed(chunk["text"])
            
            self.store.upsert(
                id=chunk_id,
                vector=embedding,
                metadata={
                    "text": chunk["text"],
                    "doc_id": doc_id,
                    "chunk_index": i,
                    **(metadata or {})
                }
            )
    
    def query(self, question: str, top_k: int = 5) -> dict:
        """查询问答"""
        # 1. 检索相关文档
        query_embedding = self.embed(question)
        results = self.store.query(vector=query_embedding, top_k=top_k)
        
        if not results:
            return {"answer": "未找到相关信息", "sources": []}
        
        # 2. 组装上下文
        context = "\n\n---\n\n".join([
            f"[来源 {i+1}]: {r['metadata']['text']}"
            for i, r in enumerate(results)
        ])
        
        # 3. 调用 Claude 生成答案
        response = client.messages.create(
            model="claude-opus-4-5",
            max_tokens=1000,
            system="""你是企业知识库助手。基于提供的文档片段回答问题。
            
回答规则:
1. 只基于提供的文档内容回答
2. 如果文档中没有相关信息,明确说明
3. 引用来源(使用[来源N]格式)
4. 回答要准确、简洁""",
            messages=[{
                "role": "user",
                "content": f"相关文档:\n{context}\n\n问题:{question}"
            }]
        )
        
        return {
            "answer": response.content[0].text,
            "sources": [r["metadata"]["doc_id"] for r in results],
            "context_used": len(results)
        }

80.3 第二阶段:生产化(100-10,000 用户)

单一向量检索对于精确匹配(如 "订单号 ORD-12345")效果差,而 BM25 关键词检索对语义理解能力有限。生产系统需要两者结合:

class HybridRetriever:
    """
    混合检索器:向量检索 + BM25 关键词检索 + RRF 融合
    """
    
    def __init__(self, vector_store, bm25_index):
        self.vector_store = vector_store
        self.bm25 = bm25_index
    
    def retrieve(self, query: str, top_k: int = 20) -> list:
        """
        混合检索,返回 top_k 个结果
        """
        # 1. 向量检索
        query_embedding = compute_embedding(query)
        vector_results = self.vector_store.query(
            vector=query_embedding,
            top_k=top_k
        )
        
        # 2. BM25 关键词检索
        keyword_results = self.bm25.search(query, top_k=top_k)
        
        # 3. RRF (Reciprocal Rank Fusion) 融合
        return self._rrf_fusion(
            [vector_results, keyword_results],
            k=60  # RRF 常数
        )
    
    def _rrf_fusion(self, result_lists: list, k: int = 60) -> list:
        """
        Reciprocal Rank Fusion 算法
        为每个文档计算融合分数:sum(1 / (k + rank))
        """
        scores = {}
        
        for result_list in result_lists:
            for rank, result in enumerate(result_list):
                doc_id = result.get("id")
                if doc_id not in scores:
                    scores[doc_id] = {"score": 0.0, "data": result}
                scores[doc_id]["score"] += 1.0 / (k + rank + 1)
        
        # 按 RRF 分数排序
        sorted_results = sorted(
            scores.values(),
            key=lambda x: x["score"],
            reverse=True
        )
        
        return [r["data"] for r in sorted_results]

80.3.2 重排序(Reranking)

检索到的候选集需要经过 Cross-encoder 进行精排,以提升最终上下文的质量:

class CrossEncoderReranker:
    """
    Cross-encoder 重排序器
    相比双塔向量模型,Cross-encoder 同时处理查询和文档,精度更高
    """
    
    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
        from sentence_transformers import CrossEncoder
        self.model = CrossEncoder(model_name)
    
    def rerank(self, query: str, candidates: list, top_k: int = 5) -> list:
        """
        对候选集进行重排序
        candidates: [{"text": "...", "metadata": {...}}]
        """
        if not candidates:
            return []
        
        # 构建 (query, passage) 对
        pairs = [(query, c["text"]) for c in candidates]
        
        # 批量评分
        scores = self.model.predict(pairs)
        
        # 按分数排序
        scored = list(zip(scores, candidates))
        scored.sort(key=lambda x: x[0], reverse=True)
        
        return [
            {**candidate, "rerank_score": float(score)}
            for score, candidate in scored[:top_k]
        ]

80.3.3 查询改写与扩展

def rewrite_query(original_query: str, conversation_history: list = None) -> dict:
    """
    查询改写:将口语化、上下文依赖的查询转化为独立的检索查询
    """
    history_text = ""
    if conversation_history:
        history_text = "对话历史:\n" + "\n".join([
            f"用户:{turn['user']}\n助手:{turn['assistant'][:100]}..."
            for turn in conversation_history[-3:]
        ])
    
    rewrite_prompt = f"""请将以下用户查询改写为更适合文档检索的形式。

{history_text}

原始查询:{original_query}

改写要求:
1. 消除指代词("它"、"这个"、"上面提到的")
2. 补充上下文(如果查询依赖对话历史)
3. 生成 3 个语义相似但措辞不同的查询变体(用于查询扩展)
4. 识别查询中的关键实体和概念

以 JSON 格式输出:
{{
  "rewritten_query": "改写后的查询",
  "query_variants": ["变体1", "变体2", "变体3"],
  "key_entities": ["实体1", "实体2"],
  "query_type": "factual|procedural|analytical|comparative"
}}"""
    
    response = client.messages.create(
        model="claude-haiku-4-5",
        max_tokens=400,
        messages=[{"role": "user", "content": rewrite_prompt}]
    )
    
    try:
        return json.loads(response.content[0].text)
    except:
        return {
            "rewritten_query": original_query,
            "query_variants": [original_query],
            "key_entities": [],
            "query_type": "factual"
        }

80.4 第三阶段:规模化(10,000-100 万日活)

80.4.1 向量数据库选型

特性 Pinecone Qdrant pgvector
部署模式 全托管云服务 自托管/云托管 PostgreSQL 扩展
规模上限 亿级向量 亿级(水平扩展) 百万级(单机)
延迟(p99) <50ms <20ms(自托管) <100ms
全文检索 不支持 支持(内置 payload 过滤) 支持(PostgreSQL)
成本 高(按用量) 中(自托管基础设施) 低(已有 PG 环境)
适用场景 快速启动,无运维能力 高性能自托管 已有 PG 技术栈

80.4.2 分布式索引构建

from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import Queue
import threading

class DistributedIndexBuilder:
    """
    分布式文档索引构建器
    支持并发处理大量文档
    """
    
    def __init__(
        self,
        vector_store,
        embedding_fn,
        max_workers: int = 10,
        batch_size: int = 100
    ):
        self.vector_store = vector_store
        self.embed = embedding_fn
        self.max_workers = max_workers
        self.batch_size = batch_size
        self.chunker = DocumentChunker()
        self._progress = {"processed": 0, "failed": 0, "total": 0}
        self._lock = threading.Lock()
    
    def build_index(self, documents: list) -> dict:
        """
        批量构建索引
        documents: [{"id": "...", "text": "...", "metadata": {...}}]
        """
        self._progress["total"] = len(documents)
        
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {
                executor.submit(self._process_document, doc): doc["id"]
                for doc in documents
            }
            
            for future in as_completed(futures):
                doc_id = futures[future]
                try:
                    future.result()
                    with self._lock:
                        self._progress["processed"] += 1
                except Exception as e:
                    print(f"Failed to index {doc_id}: {e}")
                    with self._lock:
                        self._progress["failed"] += 1
        
        return {
            "success": self._progress["processed"],
            "failed": self._progress["failed"],
            "total": self._progress["total"]
        }
    
    def _process_document(self, doc: dict):
        """处理单个文档"""
        chunks = self.chunker.recursive_character_splitting(doc["text"])
        
        # 批量计算嵌入
        texts = [c["text"] for c in chunks]
        embeddings = [self.embed(t) for t in texts]  # 生产环境应批量调用
        
        # 批量写入向量存储
        items = [
            {
                "id": f"{doc['id']}_{i}",
                "vector": emb,
                "metadata": {
                    "text": chunk["text"],
                    "doc_id": doc["id"],
                    "chunk_index": i,
                    **doc.get("metadata", {})
                }
            }
            for i, (chunk, emb) in enumerate(zip(chunks, embeddings))
        ]
        
        self.vector_store.upsert_batch(items)

80.4.3 缓存策略

import hashlib
from functools import lru_cache

class RAGCache:
    """
    多层缓存策略
    - L1: 内存缓存(LRU)
    - L2: Redis 缓存(分布式)
    - L3: 语义缓存(相似查询复用)
    """
    
    def __init__(self, redis_client=None, semantic_threshold: float = 0.95):
        self.redis = redis_client
        self.semantic_threshold = semantic_threshold
        self._query_cache = {}  # 内存缓存
    
    def get_cache_key(self, query: str) -> str:
        return hashlib.md5(query.encode()).hexdigest()
    
    def get(self, query: str) -> Optional[dict]:
        """多层缓存查询"""
        key = self.get_cache_key(query)
        
        # L1: 内存缓存
        if key in self._query_cache:
            return self._query_cache[key]
        
        # L2: Redis 缓存
        if self.redis:
            cached = self.redis.get(f"rag:{key}")
            if cached:
                result = json.loads(cached)
                self._query_cache[key] = result  # 回填 L1
                return result
        
        return None
    
    def set(self, query: str, result: dict, ttl: int = 3600):
        """写入缓存"""
        key = self.get_cache_key(query)
        
        # L1
        self._query_cache[key] = result
        
        # L2
        if self.redis:
            self.redis.setex(f"rag:{key}", ttl, json.dumps(result))

80.5 监控与可观测性

80.5.1 关键指标体系

RAG_METRICS = {
    "retrieval_quality": {
        "recall_at_k": "检索召回率(前K个结果包含相关文档的比例)",
        "mrr": "平均倒数排名(Mean Reciprocal Rank)",
        "ndcg": "归一化折损累积增益"
    },
    "generation_quality": {
        "answer_faithfulness": "答案忠实度(是否基于检索内容)",
        "answer_relevance": "答案相关性(是否回答了问题)",
        "context_precision": "上下文精确率(检索内容有多少真正被使用)"
    },
    "system_performance": {
        "retrieval_latency_p50": "检索延迟 P50(目标:<100ms)",
        "retrieval_latency_p99": "检索延迟 P99(目标:<500ms)",
        "generation_latency_p50": "生成延迟 P50(目标:<2s)",
        "total_latency_p99": "总延迟 P99(目标:<5s)",
        "error_rate": "错误率(目标:<0.1%)"
    },
    "business_metrics": {
        "answer_acceptance_rate": "答案采纳率(用户不追问的比例)",
        "citation_click_rate": "引用点击率(用户查看原文的比例)",
        "session_resolution_rate": "会话解决率"
    }
}

80.5.2 RAG 评估流水线

class RAGEvaluator:
    """使用 Claude 作为评估器"""
    
    def evaluate_answer(
        self,
        question: str,
        retrieved_context: str,
        generated_answer: str
    ) -> dict:
        """
        评估 RAG 答案质量
        """
        eval_prompt = f"""请评估以下 RAG 系统的输出质量。

问题:{question}

检索到的上下文:
{retrieved_context[:3000]}

生成的答案:
{generated_answer}

请评估以下维度(1-5分):

1. 忠实度(Faithfulness):答案是否完全基于提供的上下文,没有编造信息?
2. 相关性(Relevance):答案是否直接回答了问题?
3. 完整性(Completeness):答案是否涵盖了问题的所有方面?
4. 精确性(Precision):上下文中的哪些信息被实际使用?(0=完全没用到,1=全部用到)

以 JSON 格式输出:
{{
  "faithfulness": {{"score": 1-5, "explanation": "..."}},
  "relevance": {{"score": 1-5, "explanation": "..."}},
  "completeness": {{"score": 1-5, "explanation": "..."}},
  "context_precision": {{"score": 0.0-1.0, "explanation": "..."}},
  "overall_quality": "excellent/good/acceptable/poor",
  "hallucination_detected": true/false,
  "hallucinated_content": "如果有,列出编造的内容"
}}"""
        
        response = client.messages.create(
            model="claude-opus-4-5",
            max_tokens=1000,
            messages=[{"role": "user", "content": eval_prompt}]
        )
        
        try:
            return json.loads(response.content[0].text)
        except:
            return {"raw_evaluation": response.content[0].text}

80.6 成本优化策略

80.6.1 Token 成本优化

COST_OPTIMIZATION_STRATEGIES = {
    "query_routing": {
        "description": "根据查询复杂度路由到不同模型",
        "implementation": """
# 简单查询 → Claude Haiku(~5倍便宜)
# 复杂推理 → Claude Opus
        
def route_model(query: str, complexity_score: float) -> str:
    if complexity_score < 0.3:
        return "claude-haiku-4-5"
    elif complexity_score < 0.7:
        return "claude-sonnet-4-5"
    else:
        return "claude-opus-4-5"
"""
    },
    "context_compression": {
        "description": "压缩检索到的上下文,减少输入 Token",
        "implementation": """
def compress_context(retrieved_chunks: list, query: str) -> str:
    # 只保留与查询最相关的句子
    # 可节省 30-50% 的上下文 Token
    pass
"""
    },
    "semantic_caching": {
        "description": "缓存相似查询的结果",
        "estimated_savings": "重复查询率高的场景可节省 40-60% API 调用"
    },
    "batching": {
        "description": "批量处理嵌入计算请求",
        "note": "嵌入 API 批量调用通常有折扣"
    }
}

80.6.2 规模化成本预测

def estimate_monthly_cost(
    daily_active_users: int,
    avg_queries_per_user: int,
    avg_context_tokens: int,
    avg_output_tokens: int
) -> dict:
    """
    估算月度 RAG 系统成本
    """
    monthly_queries = daily_active_users * avg_queries_per_user * 30
    
    # Embedding 成本(以 text-embedding-3-small 为例)
    embedding_cost_per_1k = 0.00002  # $0.00002 / 1K tokens
    embedding_tokens_per_query = 100  # 平均每次查询的嵌入计算 tokens
    monthly_embedding_cost = (monthly_queries * embedding_tokens_per_query / 1000) * embedding_cost_per_1k
    
    # Claude 生成成本(以 claude-sonnet 为例)
    input_cost_per_1k = 0.003   # $3 / 1M input tokens
    output_cost_per_1k = 0.015  # $15 / 1M output tokens
    
    monthly_input_tokens = monthly_queries * avg_context_tokens
    monthly_output_tokens = monthly_queries * avg_output_tokens
    
    monthly_llm_cost = (
        (monthly_input_tokens / 1000) * input_cost_per_1k +
        (monthly_output_tokens / 1000) * output_cost_per_1k
    )
    
    return {
        "daily_active_users": daily_active_users,
        "monthly_queries": monthly_queries,
        "monthly_embedding_cost_usd": round(monthly_embedding_cost, 2),
        "monthly_llm_cost_usd": round(monthly_llm_cost, 2),
        "monthly_total_cost_usd": round(monthly_embedding_cost + monthly_llm_cost, 2),
        "cost_per_query_usd": round((monthly_embedding_cost + monthly_llm_cost) / monthly_queries, 5)
    }

# 百万日活场景估算
print(estimate_monthly_cost(
    daily_active_users=1_000_000,
    avg_queries_per_user=3,
    avg_context_tokens=2000,
    avg_output_tokens=300
))
# 预计输出:
# monthly_queries: 90,000,000
# monthly_llm_cost: ~$810,000(优化前)
# 通过缓存和模型路由可降至 ~$200,000-300,000

80.7 架构演进总结

POC 阶段(0-100 用户)
├── 单机部署
├── 简单语义分块
├── 纯向量检索
├── 无缓存
└── 响应时间 5-15 秒可接受

生产化阶段(100-10,000 用户)
├── 混合检索(向量 + BM25)
├── Cross-encoder 重排序
├── 查询改写与扩展
├── Redis 查询缓存
└── 响应时间 < 3 秒

规模化阶段(10,000-100 万日活)
├── 分布式向量数据库(Pinecone/Qdrant 集群)
├── 多层缓存(L1/L2/语义缓存)
├── 查询路由(模型分级)
├── 异步处理管道
├── 全面的监控告警
└── 响应时间 P99 < 2 秒

百万日活以上
├── 多地域部署(降低延迟)
├── 模型蒸馏(将高成本推理转移到本地模型)
├── 实时增量索引(文档更新即时反映)
├── 个性化检索(基于用户画像)
└── A/B 测试框架(持续优化各模块)

小结:全书终章

本章作为全书的终章,以企业级 RAG 系统为载体,综合了书中各个主题:Prompt 工程(查询改写、答案生成)、评估体系(RAG Evaluator)、多语言支持(分块与检索的语言感知)、以及系统架构设计(从 POC 到百万日活)。

核心设计原则总结

  1. 分阶段演进:不要试图在 POC 阶段就构建完整的生产系统。从最简单的可工作版本开始,数据驱动地添加复杂性。

  2. 检索质量是核心:RAG 系统 80% 的质量问题来自检索,而非生成。在优化 Prompt 之前,先确保检索召回了正确的文档。

  3. 可观测性先于优化:没有指标就没有改进方向。先构建完整的指标体系,再进行系统优化。

  4. 成本意识贯穿始终:在百万日活规模,每百分之一的成本优化都价值数万美元/月。查询路由、缓存和上下文压缩是三大核心成本控制手段。

  5. 人在回路(Human in the Loop):AI 不是目的,而是手段。最好的 RAG 系统会在置信度不足时诚实地说"我不知道",并引导用户找到人类专家。

构建 AI 系统是一场马拉松,而非短跑。本书提供的框架、代码和思维方式,是这场旅程的起点,而非终点。

本章评分
4.8  / 5  (3 评分)

💬 留言讨论