Chapter 69

Case Study: An Intelligent Knowledge Base Assistant (RAG + Hermes)

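This chapter walks through a complete enterprise project: an internal document Q&A system built on Hermes Agent and a vector database. It turns the theory of the previous 68 chapters into runnable code. We cover every step from requirements analysis to production deployment, and we candidly record the five pitfalls we hit during real development.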


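69.1 Requirements Analysis

69.1.1 Business Scenario

Client: a manufacturing company with 5,000+ employees
Pain points

Goal: build an internal knowledge base Q&A system where employees ask questions in natural language and get accurate answers, with source citations, within seconds.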

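69.1.2 Requirements Matrix

| Requirement | Description | Priority |
|---|---|---|
| Natural-language Q&A | Questions in Chinese, English, or a mix of both | P0 |
| Source citations | Every answer must cite the source document and page number | P0 |
| Freshness | Document updates take effect in the system within 30 minutes | P0 |
| Multi-turn dialogue | Follow-up questions that use the conversation context | P1 |
| Access control | Each department can access only the documents it is authorized for | P1 |
| Document types | PDF, Word, Excel, PPT, Markdown | P1 |
| Response time | P95 < 5 s | P1 |
| Concurrency | 200 concurrent users | P2 |
| Audit trail | Every query is auditable | P2 |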
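
The P95 < 5 s target can be checked directly against recorded latencies, for example the `response_time_ms` values written to the audit log. Below is a minimal sketch using the nearest-rank percentile definition. The sample latencies and the 5000 ms budget are illustrative assumptions, not project data.

```python
import math


def percentile_nearest_rank(samples: list[float], pct: float) -> float:
    """Nearest-rank percentile: the smallest sample with at least pct% of samples <= it."""
    if not samples:
        raise ValueError("no samples")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(samples)
    rank = math.ceil(pct * len(ordered) / 100)  # 1-based rank
    return ordered[rank - 1]


def meets_latency_slo(latencies_ms: list[float], budget_ms: float = 5000, pct: float = 95) -> bool:
    """True if the pct-th percentile latency is within the budget."""
    return percentile_nearest_rank(latencies_ms, pct) <= budget_ms


# Hypothetical samples: 19 fast requests and one slow outlier
latencies_ms = [1200.0] * 19 + [9000.0]
print(percentile_nearest_rank(latencies_ms, 95))
print(meets_latency_slo(latencies_ms))
```

With 20 samples, P95 is the 19th-smallest value. A single slow request therefore does not break the target, but two would.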

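69.2 Architecture Design

69.2.1 Overall Architecture

User query
    │
    ▼
┌──────────────────────────────────────────────────────────────────────────────┐
│                              Hermes Agent Layer                              │
│  ┌──────────────────────┐ ┌──────────────────────┐ ┌──────────────────────┐  │
│  │ Query understanding  │ │ Tool orchestration   │ │ Answer synthesis     │  │
│  │ - Intent detection   │ │ - Retrieval strategy │ │ - Citation tagging   │  │
│  │ - Query rewriting    │ │ - Multi-tool combos  │ │ - Formatting         │  │
│  └──────────────────────┘ └──────────────────────┘ └──────────────────────┘  │
└──────────────────────────────────────────────────────────────────────────────┘
      │                 │                  │
      ▼                 ▼                  ▼
┌───────────┐  ┌─────────────────┐  ┌──────────────┐
│ Qdrant    │  │ Full-text index │  │ Doc metadata │
│ vector DB │  │ (ES)            │  │ (PostgreSQL) │
└───────────┘  └─────────────────┘  └──────────────┘
      ▲                 ▲
      │                 │
┌──────────────────────────────────────────────────────────────────────────────┐
│                         Document Processing Pipeline                         │
│  Upload → Parse → Chunk → Embed → Store → Index update                       │
└──────────────────────────────────────────────────────────────────────────────┘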

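69.2.2 Technology Choices

| Component | Choice | Rationale |
|---|---|---|
| Agent framework | Hermes Agent | Precise tool calling, multi-step reasoning |
| Vector database | Qdrant | Open source, high performance, payload filtering |
| Full-text search | Elasticsearch | Hybrid retrieval; handles exact matches |
| Embedding model | text-embedding-3-large | Good quality on both Chinese and English |
| Document parsing | Unstructured.io | Many formats, layout-aware |
| Metadata store | PostgreSQL | Document versioning, access control |
| API gateway | FastAPI | High performance, async support |
| Deployment | Kubernetes | Elastic autoscaling |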

69.3 Complete Implementation

69.3.1 Document Processing Pipeline

# pipeline/document_processor.py

import asyncio
import hashlib
import re
from dataclasses import dataclass
from typing import Optional

import unstructured.partition.auto as auto_partition
from unstructured.documents.elements import Element

# Sentence boundaries: CJK sentence-ending punctuation (no trailing space needed),
# or ASCII . ! ? followed by whitespace
_SENTENCE_END = re.compile(r"(?<=[。！？])|(?<=[.!?])\s+")
# Rough token estimate: each CJK character or each ASCII word/number counts as one unit
_TOKEN_UNIT = re.compile(r"[\u4e00-\u9fff]|[A-Za-z0-9]+")


def approx_token_count(text: str) -> int:
    """Approximate token count that also works for Chinese text, which has no spaces"""
    return len(_TOKEN_UNIT.findall(text))


@dataclass
class DocumentChunk:
    """A chunk of a document"""
    chunk_id: str
    doc_id: str
    doc_name: str
    page_number: int
    chunk_index: int
    content: str
    content_type: str  # text / table / image_caption

    # Metadata
    department: str
    access_level: str
    created_at: str
    updated_at: str

    # Embedding vector (filled in later)
    embedding: Optional[list[float]] = None

    @property
    def word_count(self) -> int:
        return approx_token_count(self.content)


class DocumentProcessor:
    """Document processor: parses documents and splits them into chunks"""

    def __init__(self, chunking_config: dict):
        self.chunk_size = chunking_config.get("chunk_size", 512)  # approximate tokens
        self.chunk_overlap = chunking_config.get("chunk_overlap", 64)  # approximate tokens
        self.min_chunk_size = chunking_config.get("min_chunk_size", 50)  # characters

    async def process_document(
        self,
        file_path: str,
        doc_metadata: dict
    ) -> list[DocumentChunk]:
        """Process a single document and return its chunks"""

        # 1. Parse the document (Unstructured.io handles many formats).
        #    hi_res parsing is CPU-heavy and blocking, so run it in a worker thread
        elements = await asyncio.to_thread(
            auto_partition.partition,
            filename=file_path,
            languages=["chi_sim", "eng"],  # Chinese + English
            strategy="hi_res",             # high-accuracy mode (detects tables and images)
            include_page_breaks=True,
        )

        # 2. Group elements by page
        grouped = self._group_elements(elements)

        # 3. Build chunks
        chunks = []
        chunk_index = 0

        for page_num, page_elements in grouped.items():
            page_text = "\n".join(e.text for e in page_elements if e.text)

            if len(page_text) < self.min_chunk_size:
                continue

            content_type = self._detect_content_type(page_elements)

            # Sliding-window chunking
            for chunk_text in self._sliding_window_chunk(page_text):
                chunk_id = hashlib.sha256(
                    f"{doc_metadata['doc_id']}:{chunk_index}:{chunk_text[:50]}".encode()
                ).hexdigest()[:16]

                chunks.append(DocumentChunk(
                    chunk_id=chunk_id,
                    doc_id=doc_metadata["doc_id"],
                    doc_name=doc_metadata["doc_name"],
                    page_number=page_num,
                    chunk_index=chunk_index,
                    content=chunk_text,
                    content_type=content_type,
                    department=doc_metadata.get("department", "general"),
                    access_level=doc_metadata.get("access_level", "public"),
                    created_at=doc_metadata.get("created_at", ""),
                    updated_at=doc_metadata.get("updated_at", ""),
                ))
                chunk_index += 1

        return chunks

    def _sliding_window_chunk(self, text: str) -> list[str]:
        """
        Sliding-window chunking.
        Strategy: split at sentence boundaries first to keep chunks semantically whole,
        then pack sentences up to chunk_size, carrying up to chunk_overlap tokens forward.
        """
        sentences = [s.strip() for s in _SENTENCE_END.split(text) if s.strip()]

        chunks = []
        current_chunk = []
        current_size = 0

        for sentence in sentences:
            sentence_size = approx_token_count(sentence)

            if current_size + sentence_size > self.chunk_size and current_chunk:
                chunks.append(" ".join(current_chunk))

                # Keep trailing sentences as the overlap
                overlap_sentences = []
                overlap_size = 0
                for s in reversed(current_chunk):
                    overlap_size += approx_token_count(s)
                    if overlap_size > self.chunk_overlap:
                        break
                    overlap_sentences.insert(0, s)

                current_chunk = overlap_sentences
                current_size = sum(approx_token_count(s) for s in current_chunk)

            current_chunk.append(sentence)
            current_size += sentence_size

        if current_chunk:
            chunks.append(" ".join(current_chunk))

        return chunks

    def _group_elements(self, elements: list[Element]) -> dict[int, list[Element]]:
        """Group document elements by page"""
        grouped = {}
        current_page = 1

        for element in elements:
            # Unstructured inserts a PageBreak element at every page boundary
            if element.__class__.__name__ == "PageBreak":
                current_page += 1
            else:
                grouped.setdefault(current_page, []).append(element)

        return grouped

    def _detect_content_type(self, elements: list) -> str:
        """Content type: 'table' if Table elements make up more than 30% of the page"""
        type_counts = {}
        for e in elements:
            t = e.__class__.__name__
            type_counts[t] = type_counts.get(t, 0) + 1

        if type_counts.get("Table", 0) > len(elements) * 0.3:
            return "table"
        return "text"
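
The following standalone version of the same greedy sentence-packing idea shows how the overlap carries sentences across chunk boundaries. It is a minimal sketch. It approximates tokens by counting each CJK character and each ASCII word as one unit, which is an assumption: the embedding model's real tokenizer will give different counts. The tiny `chunk_size` and `overlap` values in the test exist only to keep the output readable.

```python
import re

# CJK sentence enders split with no space required; ASCII . ! ? need trailing whitespace
SENTENCE_END = re.compile(r"(?<=[。！？])|(?<=[.!?])\s+")
# Approximation: one unit per CJK character or per ASCII word/number
TOKEN = re.compile(r"[\u4e00-\u9fff]|[A-Za-z0-9]+")


def approx_tokens(text: str) -> int:
    return len(TOKEN.findall(text))


def sliding_window_chunks(text: str, chunk_size: int, overlap: int) -> list[str]:
    """Greedily pack sentences into chunks of about chunk_size units, carrying <= overlap units forward."""
    sentences = [s.strip() for s in SENTENCE_END.split(text) if s.strip()]
    chunks: list[str] = []
    current: list[str] = []
    current_size = 0
    for sentence in sentences:
        size = approx_tokens(sentence)
        if current and current_size + size > chunk_size:
            chunks.append(" ".join(current))
            # Carry trailing sentences forward while they fit in the overlap budget
            carried: list[str] = []
            carried_size = 0
            for s in reversed(current):
                if carried_size + approx_tokens(s) > overlap:
                    break
                carried_size += approx_tokens(s)
                carried.insert(0, s)
            current, current_size = carried, carried_size
        current.append(sentence)
        current_size += size
    if current:
        chunks.append(" ".join(current))
    return chunks


for chunk in sliding_window_chunks("One two three. Four five six. Seven eight nine.", 6, 3):
    print(chunk)
```

The middle sentence appears in both chunks. A single sentence longer than `chunk_size` still becomes its own oversized chunk, which is one reason tables need separate handling (see Pitfall 1).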

69.3.2 Embedding and Vector Storage

# pipeline/vector_store.py

import asyncio
from typing import Optional

from openai import AsyncOpenAI
from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance, VectorParams, PointStruct, Filter,
    FieldCondition, MatchValue, MatchAny,
)

from pipeline.document_processor import DocumentChunk


class VectorStore:
    """Qdrant vector store manager"""

    COLLECTION_NAME = "enterprise_kb"
    VECTOR_DIM = 3072  # output dimension of text-embedding-3-large

    def __init__(self, qdrant_url: str, openai_api_key: str):
        self.qdrant = QdrantClient(url=qdrant_url)
        self.openai = AsyncOpenAI(api_key=openai_api_key)
        self._ensure_collection()

    def _ensure_collection(self):
        """Make sure the collection exists (idempotent)"""
        collections = [c.name for c in self.qdrant.get_collections().collections]

        if self.COLLECTION_NAME not in collections:
            self.qdrant.create_collection(
                collection_name=self.COLLECTION_NAME,
                vectors_config=VectorParams(
                    size=self.VECTOR_DIM,
                    distance=Distance.COSINE,
                ),
            )

            # Payload indexes used for filtering (department, access level, document ID)
            for field in ("department", "access_level", "doc_id"):
                self.qdrant.create_payload_index(
                    collection_name=self.COLLECTION_NAME,
                    field_name=field,
                    field_schema="keyword",
                )

    async def embed_chunks(self, chunks: list[DocumentChunk]) -> list[DocumentChunk]:
        """Generate embeddings in batches (limit concurrency to avoid rate limits)"""

        BATCH_SIZE = 100  # inputs per embeddings request (a chosen batch size)
        semaphore = asyncio.Semaphore(3)  # at most 3 concurrent requests

        async def embed_batch(batch: list[DocumentChunk]) -> list[DocumentChunk]:
            async with semaphore:
                texts = [chunk.content for chunk in batch]
                response = await self.openai.embeddings.create(
                    input=texts,
                    model="text-embedding-3-large",
                )
                for i, chunk in enumerate(batch):
                    chunk.embedding = response.data[i].embedding
                return batch

        # Process in batches
        batches = [chunks[i:i+BATCH_SIZE] for i in range(0, len(chunks), BATCH_SIZE)]
        results = await asyncio.gather(*[embed_batch(batch) for batch in batches])

        return [chunk for batch in results for chunk in batch]

    def upsert_chunks(self, chunks: list[DocumentChunk]):
        """Write chunks to Qdrant (upsert also updates existing points)"""

        points = [
            PointStruct(
                # All 16 hex chars -> unsigned 64-bit int; only 8 hex chars (32 bits) would collide at scale
                id=int(chunk.chunk_id, 16),
                vector=chunk.embedding,
                payload={
                    "chunk_id": chunk.chunk_id,
                    "doc_id": chunk.doc_id,
                    "doc_name": chunk.doc_name,
                    "page_number": chunk.page_number,
                    "chunk_index": chunk.chunk_index,
                    "content": chunk.content,
                    "content_type": chunk.content_type,
                    "department": chunk.department,
                    "access_level": chunk.access_level,
                    "updated_at": chunk.updated_at,
                }
            )
            for chunk in chunks
            if chunk.embedding is not None
        ]

        # Batched writes
        BATCH_SIZE = 100
        for i in range(0, len(points), BATCH_SIZE):
            self.qdrant.upsert(
                collection_name=self.COLLECTION_NAME,
                points=points[i:i+BATCH_SIZE],
            )

    async def search(
        self,
        query: str,
        top_k: int = 10,
        department_filter: Optional[str] = None,
        access_levels: Optional[list[str]] = None,
        score_threshold: float = 0.7,
    ) -> list[dict]:
        """Semantic search"""

        # Embed the query
        response = await self.openai.embeddings.create(
            input=[query],
            model="text-embedding-3-large",
        )
        query_vector = response.data[0].embedding

        # Build filter conditions (access control)
        filters = []
        if department_filter:
            filters.append(FieldCondition(
                key="department",
                match=MatchValue(value=department_filter)
            ))
        if access_levels:
            # MatchAny: the point's access_level must be one of the user's levels
            filters.append(FieldCondition(
                key="access_level",
                match=MatchAny(any=access_levels)
            ))

        query_filter = Filter(must=filters) if filters else None

        # Run the search
        results = self.qdrant.search(
            collection_name=self.COLLECTION_NAME,
            query_vector=query_vector,
            query_filter=query_filter,
            limit=top_k,
            score_threshold=score_threshold,
            with_payload=True,
        )

        return [
            {
                "score": r.score,
                "content": r.payload["content"],
                "doc_name": r.payload["doc_name"],
                "doc_id": r.payload["doc_id"],
                "page_number": r.payload["page_number"],
                "department": r.payload["department"],
                "updated_at": r.payload.get("updated_at", ""),  # used by freshness boosting
            }
            for r in results
        ]
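
Qdrant point IDs must be unsigned 64-bit integers or UUIDs. Truncating the hash too aggressively invites collisions: with 32-bit IDs, the birthday bound gives roughly a 50% chance of at least one collision after about 77,000 chunks. The sketch below is a hedged alternative. It derives a deterministic UUID from the `chunk_id` string with `uuid.uuid5`. The namespace choice is an assumption; any fixed UUID works as long as it never changes.

```python
import math
import uuid

# Assumption: any constant namespace works, as long as it never changes
CHUNK_NAMESPACE = uuid.NAMESPACE_URL


def point_id_for_chunk(chunk_id: str) -> str:
    """Deterministic UUID string for a chunk: re-upserting the same chunk overwrites the same point."""
    return str(uuid.uuid5(CHUNK_NAMESPACE, chunk_id))


def birthday_collision_probability(n_items: int, id_bits: int) -> float:
    """Approximate P(at least one collision) for n random IDs drawn from 2**id_bits values."""
    return 1 - math.exp(-n_items * (n_items - 1) / (2 * 2 ** id_bits))


print(point_id_for_chunk("3f2a9c1b7e4d0a56"))
print(round(birthday_collision_probability(77_000, 32), 2))
print(birthday_collision_probability(77_000, 64))
```

In `upsert_chunks` this string would supply each `PointStruct`'s `id`. The chosen scheme must be used consistently everywhere points are written or deleted.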

69.3.3 Hermes Agent Tool Definitions

# agent/tools.py

import json

from pipeline.vector_store import VectorStore


class KnowledgeBaseTools:
    """Tool set for the knowledge base agent.

    _keyword_search and _get_doc_content (Elasticsearch / PostgreSQL lookups) are not shown.
    """

    def __init__(
        self,
        vector_store: VectorStore,
        full_text_search,  # Elasticsearch client
        db_session,        # PostgreSQL session
    ):
        self.vector_store = vector_store
        self.fts = full_text_search
        self.db = db_session

    def get_tool_definitions(self) -> list[dict]:
        """Return the tool definitions in the Hermes API format (name / description / input_schema)"""
        return [
            {
                "name": "semantic_search",
                "description": "Semantic search over the knowledge base to find semantically related document passages. Best for open-ended questions and conceptual queries.",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "Search query, phrased in natural language"
                        },
                        "top_k": {
                            "type": "integer",
                            "description": "Number of results to return (default 5, max 20)",
                            "default": 5
                        },
                        "department": {
                            "type": "string",
                            "description": "Optional: restrict the search to one department's documents"
                        },
                    },
                    "required": ["query"]
                }
            },
            {
                "name": "keyword_search",
                "description": "Exact keyword search over the knowledge base. Best for precise information such as product models, specification parameters, and document numbers.",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "keywords": {
                            "type": "string",
                            "description": "Search keywords (supports AND/OR/NOT logic)"
                        },
                        "doc_type": {
                            "type": "string",
                            "enum": ["manual", "standard", "policy", "training", "all"],
                            "description": "Document type filter"
                        },
                    },
                    "required": ["keywords"]
                }
            },
            {
                "name": "get_document_content",
                "description": "Get the full content of a specific document (or a given page range)",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "doc_id": {"type": "string", "description": "Document ID"},
                        "page_start": {"type": "integer", "description": "Start page (1-based)"},
                        "page_end": {"type": "integer", "description": "End page"},
                    },
                    "required": ["doc_id"]
                }
            },
            {
                "name": "get_document_list",
                "description": "List the documents in a given category of the knowledge base",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "category": {"type": "string", "description": "Document category"},
                        "updated_after": {"type": "string", "description": "Only documents updated after this date (ISO format)"},
                    },
                }
            },
        ]

    async def execute_tool(self, tool_name: str, tool_args: dict, user_context: dict) -> str:
        """Execute a tool call"""

        # Inject the user's permission context. Access levels always come from the
        # authenticated user, never from arguments supplied by the model
        department = user_context.get("department")
        access_levels = user_context.get("access_levels", ["public"])

        if tool_name == "semantic_search":
            results = await self.vector_store.search(
                query=tool_args["query"],
                top_k=min(tool_args.get("top_k", 5), 20),
                department_filter=tool_args.get("department") or department,
                access_levels=access_levels,
            )
            return json.dumps(results, ensure_ascii=False, indent=2)

        elif tool_name == "keyword_search":
            results = await self._keyword_search(
                keywords=tool_args["keywords"],
                doc_type=tool_args.get("doc_type", "all"),
                access_levels=access_levels,
            )
            return json.dumps(results, ensure_ascii=False, indent=2)

        elif tool_name == "get_document_content":
            content = await self._get_doc_content(
                doc_id=tool_args["doc_id"],
                page_start=tool_args.get("page_start", 1),
                page_end=tool_args.get("page_end"),
                access_levels=access_levels,
            )
            return content

        elif tool_name == "get_document_list":
            # _get_document_list is a hypothetical helper (not shown), analogous to _keyword_search
            documents = await self._get_document_list(
                category=tool_args.get("category"),
                updated_after=tool_args.get("updated_after"),
                access_levels=access_levels,
            )
            return json.dumps(documents, ensure_ascii=False, indent=2)

        else:
            return json.dumps({"error": f"Unknown tool: {tool_name}"})
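
The key design rule in `execute_tool` is that permission scope comes from the authenticated `user_context`, never from the arguments the model supplies. The pure-Python sketch below isolates that rule: the model may only narrow the search, not widen it. The `allowed_departments` field is a hypothetical extension of `user_context` and is not part of the original schema.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class SearchScope:
    department: Optional[str]
    access_levels: tuple[str, ...]


def build_search_scope(tool_args: dict, user_context: dict) -> SearchScope:
    """Combine model-supplied arguments with server-side permissions.

    A requested department is honoured only if the user may read it; access levels
    always come from user_context, so any access level the model asks for is ignored.
    """
    # Hypothetical field: departments this user may read (defaults to their own department)
    allowed = set(user_context.get("allowed_departments") or [user_context.get("department")])
    requested = tool_args.get("department")
    department = requested if requested in allowed else user_context.get("department")
    return SearchScope(
        department=department,
        access_levels=tuple(user_context.get("access_levels", ["public"])),
    )


ctx = {"department": "QA", "access_levels": ["public", "internal"]}
print(build_search_scope({"department": "Finance", "access_levels": ["secret"]}, ctx))
```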

69.3.4 Hermes Agent Control Loop

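# agent/hermes_kb_agent.py

import json
from typing import Optional

import anthropic

from agent.tools import KnowledgeBaseTools


class HermesKnowledgeAgent:
    """
    Knowledge base Q&A agent built on the Hermes Agent framework.

    Note: Anthropic Claude is used as the underlying LLM here;
    in the actual deployment, swap in the Hermes API endpoint.
    """

    SYSTEM_PROMPT = """You are the company's internal knowledge base assistant. Your job is to help employees quickly find work-related information.

Working principles:
1. Answer only from the content of documents in the knowledge base; do not rely on memory or speculation
2. Every key piece of information must cite its source (document name + page number)
3. If the knowledge base has no relevant information, tell the user plainly: "This information is not yet in the knowledge base"
4. For technical specifications and safety procedures, always remind the user that the latest version of the document takes precedence
5. Keep answers concise and professional; prefer structured formats such as tables and lists

Security constraints (must never be violated):
- Do not access or disclose access-protected documents belonging to other departments
- Do not perform any operation other than reading documents
- Do not modify or delete any content
"""

    def __init__(self, tools: KnowledgeBaseTools, model: str = "claude-opus-4-5"):
        # Async client, so the event loop is not blocked while waiting for the model
        self.client = anthropic.AsyncAnthropic()
        self.tools_handler = tools
        self.model = model
        self.tool_defs = tools.get_tool_definitions()

    async def answer(
        self,
        user_query: str,
        user_context: dict,
        conversation_history: Optional[list[dict]] = None,
        max_steps: int = 10,
    ) -> dict:
        """
        Answer a user question; returns the answer with source citations
        """

        # Copy, so the caller's history list is not mutated by the tool-use turns below
        messages = list(conversation_history or [])
        messages.append({"role": "user", "content": user_query})

        tool_calls_log = []

        for step in range(max_steps):
            # Call Hermes/Claude
            response = await self.client.messages.create(
                model=self.model,
                max_tokens=4096,
                system=self.SYSTEM_PROMPT,
                tools=self.tool_defs,
                messages=messages,
            )

            # Check why the model stopped
            if response.stop_reason == "end_turn":
                # The agent has finished its answer
                final_answer = next(
                    (block.text for block in response.content if block.type == "text"),
                    "Unable to generate an answer"
                )
                return {
                    "answer": final_answer,
                    "tool_calls": tool_calls_log,
                    "steps": step + 1,
                    "sources": self._extract_sources(tool_calls_log),
                }

            elif response.stop_reason == "tool_use":
                # Handle tool calls
                tool_results = []

                for block in response.content:
                    if block.type == "tool_use":
                        # Execute the tool (with permission control)
                        result = await self.tools_handler.execute_tool(
                            tool_name=block.name,
                            tool_args=block.input,
                            user_context=user_context,
                        )

                        # Log the call together with its result (_extract_sources reads "result")
                        tool_calls_log.append({
                            "step": step,
                            "tool": block.name,
                            "args": block.input,
                            "result": result,
                        })

                        tool_results.append({
                            "type": "tool_result",
                            "tool_use_id": block.id,
                            "content": result,
                        })

                # Append the assistant turn and the tool results to the message history
                messages.append({"role": "assistant", "content": response.content})
                messages.append({"role": "user", "content": tool_results})

            else:
                # Any other stop reason (e.g. max_tokens): stop looping
                break

        return {
            "answer": "Answer generation exceeded the maximum number of steps; please simplify your question",
            "tool_calls": tool_calls_log,
            "steps": max_steps,
            "sources": [],
        }

    def _extract_sources(self, tool_calls: list[dict]) -> list[dict]:
        """Extract source citations from the logged tool results"""
        sources = []
        seen = set()

        for call in tool_calls:
            if call["tool"] in ["semantic_search", "keyword_search"]:
                try:
                    results = json.loads(call.get("result", "[]"))
                except json.JSONDecodeError:
                    continue
                if not isinstance(results, list):
                    continue  # e.g. an {"error": ...} object
                for r in results:
                    key = f"{r.get('doc_id')}:{r.get('page_number')}"
                    if key not in seen:
                        sources.append({
                            "doc_name": r.get("doc_name"),
                            "doc_id": r.get("doc_id"),
                            "page": r.get("page_number"),
                        })
                        seen.add(key)

        return sources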

69.3.5 API Layer

# api/main.py

import time
from typing import Optional

from fastapi import FastAPI, Depends
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

from agent.hermes_kb_agent import HermesKnowledgeAgent

# get_current_user, get_agent, get_audit_log, generate_session_id,
# get_session_history and save_session_history are defined elsewhere (not shown)

app = FastAPI(title="Enterprise Knowledge Base API", version="1.0.0")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://internal.company.com"],
    allow_credentials=True,
    allow_methods=["POST", "GET"],
    allow_headers=["Authorization", "Content-Type"],
)

class QueryRequest(BaseModel):
    query: str
    session_id: Optional[str] = None
    department_filter: Optional[str] = None

class QueryResponse(BaseModel):
    answer: str
    sources: list[dict]
    session_id: str
    steps_taken: int
    response_time_ms: int

@app.post("/api/v1/query", response_model=QueryResponse)
async def query_knowledge_base(
    request: QueryRequest,
    current_user: dict = Depends(get_current_user),  # JWT authentication
    agent: HermesKnowledgeAgent = Depends(get_agent),
    audit_log = Depends(get_audit_log),
):
    start_time = time.time()

    # Build the user context (permission information)
    user_context = {
        "user_id": current_user["user_id"],
        "department": current_user["department"],
        "access_levels": current_user["access_levels"],
    }

    # Load the session history
    session_id = request.session_id or generate_session_id()
    history = await get_session_history(session_id)

    # Call the agent
    result = await agent.answer(
        user_query=request.query,
        user_context=user_context,
        conversation_history=history,
    )

    # Write the audit log entry
    await audit_log.append({
        "event_type": "KB_QUERY",
        "user_id": current_user["user_id"],
        "session_id": session_id,
        "query": request.query,
        "sources_cited": [s["doc_id"] for s in result["sources"]],
        "steps_taken": result["steps"],
        "response_time_ms": int((time.time() - start_time) * 1000),
    })

    # Update the session history (only the user question and the final answer are stored)
    await save_session_history(session_id, history + [
        {"role": "user", "content": request.query},
        {"role": "assistant", "content": result["answer"]},
    ])

    return QueryResponse(
        answer=result["answer"],
        sources=result["sources"],
        session_id=session_id,
        steps_taken=result["steps"],
        response_time_ms=int((time.time() - start_time) * 1000),
    )
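
The endpoint relies on `generate_session_id`, `get_session_history` and `save_session_history`, which the chapter does not show. Below is a hypothetical in-memory version, suitable only for local testing. A deployment with several replicas would need a shared store such as Redis. Capping the stored history at 20 messages is an assumption made to keep prompts bounded.

```python
import asyncio
import uuid

MAX_HISTORY_MESSAGES = 20  # assumption: keep only the most recent messages

# Hypothetical in-process store: lost on restart and not shared between replicas
_sessions: dict[str, list[dict]] = {}


def generate_session_id() -> str:
    return uuid.uuid4().hex


async def get_session_history(session_id: str) -> list[dict]:
    # Return a copy so callers cannot mutate the stored history in place
    return list(_sessions.get(session_id, []))


async def save_session_history(session_id: str, history: list[dict]) -> None:
    # Keep only the tail so the conversation sent to the model stays bounded
    _sessions[session_id] = list(history[-MAX_HISTORY_MESSAGES:])


async def _demo() -> None:
    sid = generate_session_id()
    await save_session_history(sid, [{"role": "user", "content": "hi"}])
    print(await get_session_history(sid))


asyncio.run(_demo())
```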

69.4 Performance Optimization

69.4.1 Hybrid Retrieval

import asyncio
from typing import Optional

import cohere

from pipeline.vector_store import VectorStore


class HybridRetriever:
    """
    Hybrid retrieval: vector search + full-text search + reranking.
    15-25% better than vector search alone (measured on this project's data).
    """

    def __init__(self, vector_store: VectorStore, es_client):
        self.vector_store = vector_store
        self.es = es_client
        # Async Cohere client; the API key comes from the environment (CO_API_KEY)
        # or a proxy, never hard-coded
        self.co = cohere.AsyncClient()

    async def retrieve(
        self,
        query: str,
        top_k: int = 10,
        user_context: Optional[dict] = None,
        alpha: float = 0.6,  # vector-search weight: 0.6 = 60% vector + 40% full-text
    ) -> list[dict]:
        """Hybrid retrieval, filtered by the user's access levels"""
        access_levels = (user_context or {}).get("access_levels", ["public"])

        # Run vector search and full-text search in parallel
        vector_results, fts_results = await asyncio.gather(
            self.vector_store.search(query, top_k=top_k * 2, access_levels=access_levels),
            # _full_text_search (not shown) must apply the same access-level filter
            self._full_text_search(query, top_k=top_k * 2, access_levels=access_levels),
        )

        # Fuse the two ranked lists with RRF (Reciprocal Rank Fusion)
        fused = self._reciprocal_rank_fusion(
            [vector_results, fts_results],
            alpha=alpha,
        )

        # Reranking: reorder the top candidates with a more precise cross-encoder
        reranked = await self._rerank(query, fused[:top_k * 2])

        return reranked[:top_k]

    def _reciprocal_rank_fusion(
        self,
        result_lists: list[list[dict]],
        k: int = 60,
        alpha: float = 0.6,
    ) -> list[dict]:
        """
        Weighted RRF: score(d) = sum_i w_i / (k + rank_i(d)), ranks starting at 1,
        with weights [alpha, 1 - alpha] for [vector, full-text].
        Results are keyed by doc_id + page_number, so hits on the same page are merged.
        """
        weights = [alpha, 1 - alpha]  # vector search weighted above full-text search
        scores = {}

        for results, weight in zip(result_lists, weights):
            for rank, result in enumerate(results, start=1):
                doc_key = f"{result['doc_id']}:{result.get('page_number', 0)}"
                rrf_score = weight / (k + rank)

                if doc_key in scores:
                    scores[doc_key]["rrf_score"] += rrf_score
                else:
                    scores[doc_key] = {**result, "rrf_score": rrf_score}

        return sorted(scores.values(), key=lambda x: x["rrf_score"], reverse=True)

    async def _rerank(self, query: str, candidates: list[dict]) -> list[dict]:
        """
        Rerank with a cross-encoder (Cohere Rerank API here, or a locally deployed BGE-reranker)
        """
        if not candidates:
            return []

        rerank_results = await self.co.rerank(
            model="rerank-multilingual-v3.0",
            query=query,
            documents=[c["content"] for c in candidates],
            top_n=len(candidates),
        )

        # Reorder the candidates by rerank score
        reranked = []
        for r in rerank_results.results:
            candidate = candidates[r.index]
            candidate["rerank_score"] = r.relevance_score
            reranked.append(candidate)

        return sorted(reranked, key=lambda x: x["rerank_score"], reverse=True)
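
A worked example makes the weighted RRF formula concrete. Take k = 60 and alpha = 0.6. A chunk ranked 1st by vector search and 3rd by full-text search scores 0.6/61 + 0.4/63 ≈ 0.0162. A chunk that is 1st in full-text search only scores 0.4/61 ≈ 0.0066. The standalone function below mirrors `_reciprocal_rank_fusion` but keys results by a plain ID string. The IDs are made up for illustration.

```python
def weighted_rrf(ranked_lists: list[list[str]], weights: list[float], k: int = 60) -> list[tuple[str, float]]:
    """Weighted Reciprocal Rank Fusion: score(d) = sum_i w_i / (k + rank_i(d)), ranks starting at 1."""
    if len(ranked_lists) != len(weights):
        raise ValueError("need one weight per ranked list")
    scores: dict[str, float] = {}
    for results, weight in zip(ranked_lists, weights):
        for rank, doc_id in enumerate(results, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + weight / (k + rank)
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)


vector_hits = ["chunk-a", "chunk-b", "chunk-c"]    # hypothetical IDs, best first
fulltext_hits = ["chunk-d", "chunk-c", "chunk-a"]
for doc_id, score in weighted_rrf([vector_hits, fulltext_hits], [0.6, 0.4]):
    print(doc_id, round(score, 5))
```

The fused order is a, c, b, d. Chunks found by both retrievers float to the top. RRF uses only ranks, never raw scores, so it can merge cosine similarities and BM25 scores that live on different scales.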

69.4.2 Query Rewriting

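import json


class QueryRewriter:
    """
    Query rewriting: rephrase the user's natural-language question into forms better suited to retrieval.
    In our tests this raised retrieval recall by 12%.
    """

    async def rewrite(self, original_query: str, chat_history: list[dict]) -> list[str]:
        """
        Generate several rewritten versions to broaden retrieval coverage.
        Returns: [original query, variant 1, variant 2, ...]
        """
        history_text = json.dumps(chat_history[-3:], ensure_ascii=False) if chat_history else "none"

        prompt = f"""
You are a search query optimization expert. Given a user question, generate 3 different search query variants
that describe the same question from different angles, to improve document retrieval coverage.

Original question: {original_query}

Conversation history (if any):
{history_text}

Output the 3 query variants, one per line, without numbering:
"""
        # call_llm: helper that sends a prompt to the LLM and returns its text (not shown)
        response = await call_llm(prompt)
        variants = [line.strip() for line in response.split("\n") if line.strip()]

        return [original_query] + variants[:3]  # original query + at most 3 variants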
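
The one-variant-per-line parsing in `rewrite` assumes the model obeys "without numbering". In practice, models sometimes add `1.` or `-` prefixes or echo the original question. Below is a hedged sketch of a more defensive parser. The prefix patterns it strips are assumptions about typical model output, not a documented format.

```python
import re

# Assumed list prefixes: "1." "2)" "3、" "4:" or bullet markers "-", "*", "•"
_LIST_PREFIX = re.compile(r"^\s*(?:\d+[.)、:：]|[-*•])\s*")


def parse_query_variants(original_query: str, llm_output: str, max_variants: int = 3) -> list[str]:
    """Return [original_query] followed by up to max_variants distinct, de-numbered variants."""
    seen = {original_query.strip().lower()}
    variants: list[str] = []
    for line in llm_output.splitlines():
        candidate = _LIST_PREFIX.sub("", line).strip().strip('"“”')
        key = candidate.lower()
        if candidate and key not in seen:  # skip blanks, repeats, and echoes of the original
            seen.add(key)
            variants.append(candidate)
        if len(variants) == max_variants:
            break
    return [original_query] + variants


output = "1. A-100 rated voltage\n- A-100 voltage spec\n\nWhat is the rated voltage of A-100?\n2) A-100 rated voltage\n3. A-100 input power"
print(parse_query_variants("What is the rated voltage of A-100?", output))
```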

69.5 Deployment

# k8s/hermes-kb-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: hermes-kb-agent
  namespace: knowledge-base
spec:
  replicas: 3
  selector:
    matchLabels:
      app: hermes-kb-agent
  template:
    metadata:
      labels:
        app: hermes-kb-agent
    spec:
      containers:
      - name: agent
        image: company-registry/hermes-kb-agent:v1.2.0
        resources:
          requests:
            memory: "1Gi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "2000m"
        env:
        - name: QDRANT_URL
          value: "http://qdrant-service:6333"
        - name: ES_URL
          value: "http://elasticsearch:9200"
        envFrom:
        - secretRef:
            name: hermes-kb-secrets  # managed via External Secrets Operator
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 10
          periodSeconds: 5
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: hermes-kb-hpa
  namespace: knowledge-base  # must match the Deployment's namespace for scaleTargetRef to resolve
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: hermes-kb-agent
  minReplicas: 3
  maxReplicas: 20
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70

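69.6 Lessons Learned (5 Real Problems)

Pitfall 1: Chunking breaks table structure

Problem: the sliding window cut tables in PDFs apart, so retrieval returned fragments of tables and the answers were incomplete.

Example

Original table:
| Model | Rated voltage | Rated current | Operating temp. |
|-------|---------------|---------------|-----------------|
| A-100 | 220V          | 10A           | -20~50℃         |
| B-200 | 380V          | 20A           | -10~60℃         |

Chunks after the table was cut:
Chunk 1: | Model | Rated voltage | Rated current | Operating temp. |
Chunk 2: | A-100 | 220V          | 10A           | -20~50℃         |
Chunk 3: | B-200 | 380V          | 20A           | -10~60℃         |

Problem: no chunk means anything on its own. The header chunk has no values, and the value chunks have no column names.

Solution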

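def smart_chunk_with_table_awareness(elements: list, chunk_size: int = 512) -> list[str]:
    """Table-aware chunking: a table is never split; it becomes a chunk of its own"""
    chunks = []
    current_chunk = []

    for element in elements:
        if element.__class__.__name__ == "Table":
            # Flush pending text first so it does not merge with the table
            if current_chunk:
                chunks.append("\n".join(current_chunk))
                current_chunk = []
            # Prefer the HTML rendering when Unstructured provides it (keeps rows/columns)
            table_html = getattr(element.metadata, "text_as_html", None)
            chunks.append(table_html or element.text)  # the whole table as one chunk
        elif element.text:
            current_chunk.append(element.text)
            # Rough size check; split() undercounts Chinese text, which has no spaces
            if len(" ".join(current_chunk).split()) > chunk_size:
                chunks.append("\n".join(current_chunk))
                current_chunk = []

    # Flush any text left after the last element
    if current_chunk:
        chunks.append("\n".join(current_chunk))

    return chunks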

Lesson: chunking must be content-aware; never cut blindly by token count.
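
Keeping a table whole works until a single table exceeds the chunk budget. One possible refinement splits large tables by rows and repeats the header row in every piece, so each chunk still carries its column names. This is an assumption, not the approach used in this project. The sketch handles pipe-delimited tables like the example above.

```python
def split_table_with_header(table_text: str, max_rows: int) -> list[str]:
    """Split a pipe-delimited table into pieces of at most max_rows data rows,
    repeating the header (and its separator row, if present) in every piece."""
    if max_rows < 1:
        raise ValueError("max_rows must be >= 1")
    lines = [line for line in table_text.strip().splitlines() if line.strip()]
    if len(lines) < 2:
        return [table_text.strip()]
    header, body = [lines[0]], lines[1:]
    # A Markdown-style separator row (e.g. |-----|:---:|) belongs with the header
    if set(body[0].replace("|", "").strip()) <= set("-: "):
        header.append(body[0])
        body = body[1:]
    pieces = [
        "\n".join(header + body[i:i + max_rows])
        for i in range(0, len(body), max_rows)
    ]
    return pieces or ["\n".join(header)]


table = "| Model | Voltage |\n|---|---|\n| A-100 | 220V |\n| B-200 | 380V |\n| C-300 | 110V |"
for piece in split_table_with_header(table, max_rows=2):
    print(piece, end="\n\n")
```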


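Pitfall 2: Poor embedding quality on mixed Chinese-English documents

Problem: the documents contain a lot of mixed Chinese-English text (e.g. "A-100型号的Operating Temperature是-20~50°C", meaning "the Operating Temperature of model A-100 is -20~50°C"), and retrieval precision dropped noticeably.

Root cause: the embedding model's semantic representations of mixed Chinese-English input vary in quality.

Solution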

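import dataclasses
import hashlib
import re


def normalize_mixed_language_text(text: str) -> str:
    """
    Normalize spacing in mixed Chinese-English text.
    Strategy: insert a space at every Chinese/Latin boundary to help the model tokenize correctly.
    """
    # Insert a space between a Chinese character and a Latin letter/digit, in either order
    text = re.sub(r'([\u4e00-\u9fff])([A-Za-z0-9])', r'\1 \2', text)
    text = re.sub(r'([A-Za-z0-9])([\u4e00-\u9fff])', r'\1 \2', text)
    return text

# Bilingual indexing: for chunks that are not predominantly Chinese, also index a Chinese translation.
# is_predominantly_chinese and translator.to_chinese are helpers not shown here.
async def create_bilingual_chunks(chunk: DocumentChunk, translator) -> list[DocumentChunk]:
    """Return the chunk plus, for non-Chinese content, a Chinese-translated copy"""
    if not is_predominantly_chinese(chunk.content):
        zh_translation = await translator.to_chinese(chunk.content)
        zh_chunk = dataclasses.replace(
            chunk,
            content=zh_translation,
            # New 16-hex-char ID so the translation does not overwrite the original point in Qdrant
            chunk_id=hashlib.sha256(f"{chunk.chunk_id}:zh".encode()).hexdigest()[:16],
            embedding=None,  # must be re-embedded from the translated text
        )
        return [chunk, zh_chunk]
    return [chunk]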
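
`create_bilingual_chunks` calls an `is_predominantly_chinese` helper that the chapter does not define. A hypothetical version is sketched below. It measures the share of CJK Unified Ideographs among "units", where one unit is one ideograph or one run of Latin letters. Both that weighting and the 0.5 threshold are assumptions to tune on real documents.

```python
import re

_CJK = re.compile(r"[\u4e00-\u9fff]")
_LATIN_WORD = re.compile(r"[A-Za-z]+")


def is_predominantly_chinese(text: str, threshold: float = 0.5) -> bool:
    """True if CJK ideographs make up at least `threshold` of the units in text.

    Assumption: each ideograph and each run of Latin letters counts as one unit,
    so an English word weighs about as much as one Chinese character. Digits and
    punctuation are ignored.
    """
    cjk = len(_CJK.findall(text))
    latin = len(_LATIN_WORD.findall(text))
    total = cjk + latin
    if total == 0:
        return False
    return cjk / total >= threshold


print(is_predominantly_chinese("产品安全手册"))
print(is_predominantly_chinese("Operating Temperature range"))
```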

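Pitfall 3: The agent "invents" document citations

Problem: the agent sometimes fabricated citations. For example, it claimed a fact came from page 23 of the "Product Safety Manual" when no such document existed.

Root cause: when the retrieved results are not relevant enough, the LLM tends to "fill in" a plausible-looking source.

Solution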

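import json
import re

SYSTEM_PROMPT_ADDITION = """
Important rule: citations must use exactly this format:
[Source: {doc_name returned by the tool}, p. {page_number returned by the tool}]

Strictly forbidden:
- Citing any document that was not retrieved through a tool
- Making up document names or page numbers
- Speculatively extrapolating beyond the retrieved results

If the retrieved results are not enough to answer the question, say so explicitly:
"The documents currently in the knowledge base do not contain complete information about XX.
Related content found: [list what was found]
Suggestion: [which department to contact]"
"""

# Matches citations such as "[Source: Product Safety Manual, p. 23]"
CITATION_PATTERN = re.compile(r"\[Source: (.+?), p\. (\d+)\]")


def validate_citations(answer: str, tool_call_results: list[dict]) -> str:
    """Check that every citation in the answer points to a document/page a search tool actually returned"""
    valid_sources = set()
    for call in tool_call_results:
        if call.get("tool") not in ("semantic_search", "keyword_search"):
            continue  # other tools (e.g. get_document_content) do not return JSON result lists
        try:
            results = json.loads(call.get("result") or "[]")
        except json.JSONDecodeError:
            continue
        if isinstance(results, list):
            valid_sources.update(f"{r.get('doc_name')}:{r.get('page_number')}" for r in results)

    # Extract the citations from the answer
    citations = CITATION_PATTERN.findall(answer)

    invalid = [c for c in citations if f"{c[0]}:{c[1]}" not in valid_sources]

    if invalid:
        # Replace unverifiable citations and append a warning
        for doc, page in invalid:
            answer = answer.replace(f"[Source: {doc}, p. {page}]", "[citation could not be verified]")
        answer += "\n\n⚠️ Note: some citations could not be verified; please check the original documents."

    return answer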

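Pitfall 4: Popular documents "drown out" long-tail documents

Problem: some frequently cited documents (such as the "Company Rules and Regulations") appeared near the top of almost every result list, pushing the genuinely relevant technical documents further down.

Solution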

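from datetime import datetime, timezone


def apply_diversity_penalty(results: list[dict], max_from_same_doc: int = 2) -> list[dict]:
    """
    Diversity cap: keep at most max_from_same_doc results per document,
    so that answers draw on a variety of sources
    """
    doc_count = {}
    filtered = []

    for result in results:
        doc_id = result["doc_id"]
        count = doc_count.get(doc_id, 0)

        if count < max_from_same_doc:
            filtered.append(result)
            doc_count[doc_id] = count + 1

    return filtered

def apply_freshness_boost(results: list[dict], boost_factor: float = 0.1) -> list[dict]:
    """
    Freshness boost: recently updated documents score slightly higher
    (linear decay from boost_factor today to 0 after 365 days).
    Counteracts outdated documents ranking too high.
    """
    now = datetime.now(timezone.utc)

    for result in results:
        raw = result.get("updated_at") or "2020-01-01"  # missing or empty date: treat as old
        updated_at = datetime.fromisoformat(raw.replace("Z", "+00:00"))
        if updated_at.tzinfo is None:
            updated_at = updated_at.replace(tzinfo=timezone.utc)  # assume naive timestamps are UTC
        days_old = max(0, (now - updated_at).days)
        freshness_boost = max(0.0, boost_factor * (1 - days_old / 365))
        result["score"] = result.get("score", 0) + freshness_boost

    return sorted(results, key=lambda x: x["score"], reverse=True)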
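
The boost decays linearly. A document updated today gains the full `boost_factor` (0.1). One updated about six months ago gains roughly half of that, and anything older than a year gains nothing. The variant below takes `now` as a parameter, which makes the arithmetic easy to verify. It also tolerates empty or timezone-aware timestamps. Treating naive timestamps as UTC is an assumption.

```python
from datetime import datetime, timezone


def freshness_boost(updated_at: str, now: datetime, boost_factor: float = 0.1) -> float:
    """Linear freshness boost: boost_factor for today, 0 after 365 days or if the date is missing."""
    if not updated_at:
        return 0.0
    ts = datetime.fromisoformat(updated_at.replace("Z", "+00:00"))
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)  # assumption: naive timestamps are UTC
    days_old = max(0, (now - ts).days)  # future dates count as "today"
    return max(0.0, boost_factor * (1 - days_old / 365))


now = datetime(2025, 1, 1, tzinfo=timezone.utc)
print(freshness_boost("2025-01-01", now))             # updated today: full boost
print(freshness_boost("2024-07-05T00:00:00Z", now))   # 180 days old: about half
print(freshness_boost("2023-06-01", now))             # over a year old: no boost
```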

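Pitfall 5: Qdrant connection pool exhausted under high concurrency

Problem: in a load test with 200 concurrent users, the Qdrant client raised "ConnectionPool exhausted" errors and P99 latency spiked to over 30 seconds.

Solution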

import asyncio
from contextlib import asynccontextmanager

from fastapi import FastAPI
from qdrant_client import AsyncQdrantClient


class QdrantConnectionPool:
    """Async Qdrant access with bounded concurrency: one shared client, at most pool_size requests in flight"""

    def __init__(self, url: str, pool_size: int = 20):
        self._semaphore = asyncio.Semaphore(pool_size)
        self._client = AsyncQdrantClient(url=url, timeout=30)
        self.pool_size = pool_size

    @asynccontextmanager
    async def acquire(self):
        """Acquire the client (semaphore with a 5 s wait timeout)"""
        try:
            await asyncio.wait_for(self._semaphore.acquire(), timeout=5.0)
        except asyncio.TimeoutError:
            raise RuntimeError("Qdrant connection pool exhausted. Try again later.") from None
        # Only the wait for a slot is timed out; errors from the caller's request propagate unchanged
        try:
            yield self._client
        finally:
            self._semaphore.release()

    async def search_with_retry(self, *args, max_retries: int = 3, **kwargs) -> list:
        """Search with retries"""
        for attempt in range(max_retries):
            try:
                async with self.acquire() as client:
                    return await client.search(*args, **kwargs)
            except Exception:
                if attempt == max_retries - 1:
                    raise
                await asyncio.sleep(0.5 * 2 ** attempt)  # exponential backoff: 0.5 s, 1 s, 2 s, ...
        return []

    async def close(self):
        await self._client.close()

# Initialize the pool at FastAPI startup; register with FastAPI(..., lifespan=lifespan)
@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.qdrant_pool = QdrantConnectionPool(
        url=settings.QDRANT_URL,  # settings: the application's config object (not shown)
        pool_size=50,             # tune to the load
    )
    yield
    # Clean up on shutdown
    await app.state.qdrant_pool.close()
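
`QdrantConnectionPool` is really a concurrency limiter around one shared client. The semaphore caps how many requests are in flight at once, and callers beyond the cap wait (up to the timeout) instead of piling onto the HTTP connection pool. The self-contained sketch below checks that behaviour with a fake client. `FakeSearchClient` and the timings are stand-ins, not Qdrant APIs.

```python
import asyncio


class FakeSearchClient:
    """Stand-in for a search client that records its peak number of concurrent calls."""

    def __init__(self) -> None:
        self.in_flight = 0
        self.peak = 0

    async def search(self) -> str:
        self.in_flight += 1
        self.peak = max(self.peak, self.in_flight)
        await asyncio.sleep(0.01)  # simulated network latency
        self.in_flight -= 1
        return "ok"


async def run_with_limit(client: FakeSearchClient, n_requests: int, limit: int) -> list[str]:
    semaphore = asyncio.Semaphore(limit)

    async def one_call() -> str:
        async with semaphore:  # at most `limit` calls inside this block at once
            return await client.search()

    return await asyncio.gather(*(one_call() for _ in range(n_requests)))


client = FakeSearchClient()
results = asyncio.run(run_with_limit(client, n_requests=200, limit=20))
print(len(results), client.peak)
```

All 200 requests complete, but no more than 20 ever run at the same time. The same shape in production keeps the client's own connection pool from being exhausted.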

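Chapter Summary

This chapter used a complete enterprise knowledge base project to put RAG + Hermes Agent best practices into practice:

  1. Document processing: Unstructured.io parses multi-format documents; sliding-window plus table-aware chunking
  2. Vector storage: Qdrant + OpenAI embeddings, with department and access-level filtering
  3. Hybrid retrieval: vector search + full-text search + RRF fusion + Cohere reranking (15-25% improvement)
  4. Agent design: tool permissions bound to the user context; citation validation to counter hallucination
  5. Five pitfalls: table chunking, mixed Chinese-English text, hallucinated citations, popular documents drowning out others, connection pool exhaustion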

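Exercises

  1. An employee asks a cross-department question: they only have access to department A, but the best answer is in department B's documents. How should the system respond? It must not leak department B's data, yet it should still help the user find the right contact person.
  2. Query rewriting (generating 3 variants) adds LLM call cost. When is that extra cost worth paying? How would you design an adaptive strategy that skips rewriting for simple queries and rewrites only complex ones?
  3. Reranking is an effective way to improve precision, but it adds latency. Given the P95 < 5 s response-time target, how would you trade off precision against latency?
  4. If documents are updated very frequently (hundreds of updates per day), how would you design an incremental update strategy that keeps the index fresh without disrupting the query service?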