第 69 章
案例:智能知识库助手(RAG + Hermes)
第69章:案例:智能知识库助手(RAG + Hermes)
本章通过一个完整的企业级项目——基于 Hermes Agent 和向量数据库构建的内部文档问答系统,将前68章的理论落地为可运行的代码。从需求分析到生产部署,我们将完整走过每一个环节,并诚实地记录在真实开发过程中踩过的5个坑。
69.1 需求分析
69.1.1 业务场景
客户:一家拥有 5000+ 名员工的制造业企业
痛点:
- 内部文档体量巨大:产品手册、工艺规程、培训材料、合规文件共计 12 万+ 文档
- 员工找文档费时费力:平均每次查找需要 23 分钟
- 文档更新不及时:员工经常参考过期版本
- 跨部门知识孤岛:A 部门有答案,B 部门不知道
目标:构建企业内部知识库问答系统,让员工用自然语言提问,秒级获得准确答案(含来源引用)。
69.1.2 需求矩阵
| 需求 | 描述 | 优先级 |
|---|---|---|
| 自然语言问答 | 支持中英文混合提问 | P0 |
| 文档引用 | 答案必须标注来源文档和页码 | P0 |
| 实时性 | 文档更新后30分钟内在系统中生效 | P0 |
| 多轮对话 | 支持追问和上下文关联 | P1 |
| 权限控制 | 不同部门只能访问授权文档 | P1 |
| 文档类型 | 支持 PDF、Word、Excel、PPT、Markdown | P1 |
| 响应时间 | P95 < 5秒 | P1 |
| 并发 | 支持 200 并发用户 | P2 |
| 追溯日志 | 所有查询可审计 | P2 |
69.2 架构设计
69.2.1 整体架构
用户查询
│
▼
┌───────────────────────────────────────────────────────┐
│ Hermes Agent 层 │
│ ┌─────────────┐ ┌─────────────┐ ┌──────────────┐ │
│ │ 查询理解 │ │ 工具编排 │ │ 答案合成 │ │
│ │ - 意图识别 │ │ - 检索策略 │ │ - 引用标注 │ │
│ │ - 查询改写 │ │ - 多工具组合 │ │ - 格式化 │ │
│ └─────────────┘ └─────────────┘ └──────────────┘ │
└───────────────────────────────────────────────────────┘
│ │ │
▼ ▼ ▼
┌─────────┐ ┌──────────┐ ┌──────────────┐
│ Qdrant │ │ 全文索引 │ │ 文档元数据 │
│ 向量库 │ │ (ES) │ │ (PostgreSQL) │
└─────────┘ └──────────┘ └──────────────┘
▲ ▲
│ │
┌───────────────────────────────────────────────────────┐
│ 文档处理流水线 │
│ 文档上传 → 解析 → 分块 → 嵌入 → 存储 → 索引更新 │
└───────────────────────────────────────────────────────┘
69.2.2 技术选型
| 组件 | 选型 | 理由 |
|---|---|---|
| Agent 框架 | Hermes Agent | 工具调用精准,支持多步推理 |
| 向量数据库 | Qdrant | 开源、高性能、支持过滤 |
| 全文搜索 | Elasticsearch | 混合检索,处理精确匹配 |
| Embedding 模型 | text-embedding-3-large | 中英文效果好 |
| 文档解析 | Unstructured.io | 支持多种格式,布局感知 |
| 元数据存储 | PostgreSQL | 文档版本管理、权限控制 |
| API 网关 | FastAPI | 高性能,异步支持 |
| 部署 | Kubernetes | 弹性扩缩容 |
69.3 完整实现代码
69.3.1 文档处理流水线
# pipeline/document_processor.py
import asyncio
import hashlib
from pathlib import Path
from dataclasses import dataclass
from typing import Optional
import unstructured.partition.auto as auto_partition
from unstructured.documents.elements import Element
@dataclass
class DocumentChunk:
"""文档分块"""
chunk_id: str
doc_id: str
doc_name: str
page_number: int
chunk_index: int
content: str
content_type: str # text / table / image_caption
# 元数据
department: str
access_level: str
created_at: str
updated_at: str
# 向量(后续填充)
embedding: Optional[list[float]] = None
@property
def word_count(self) -> int:
return len(self.content.split())
class DocumentProcessor:
"""文档处理器:解析、分块、生成摘要"""
def __init__(self, chunking_config: dict):
self.chunk_size = chunking_config.get("chunk_size", 512) # tokens
self.chunk_overlap = chunking_config.get("chunk_overlap", 64)
self.min_chunk_size = chunking_config.get("min_chunk_size", 50)
async def process_document(
self,
file_path: str,
doc_metadata: dict
) -> list[DocumentChunk]:
"""处理单个文档,返回分块列表"""
# 1. 解析文档(Unstructured.io 处理多种格式)
elements = auto_partition.partition(
filename=file_path,
languages=["chi_sim", "eng"], # 支持中英文
strategy="hi_res", # 高精度模式(识别表格、图片)
include_page_breaks=True,
)
# 2. 按页面/章节分组
grouped = self._group_elements(elements)
# 3. 生成分块
chunks = []
chunk_index = 0
for page_num, page_elements in grouped.items():
page_text = "\n".join(e.text for e in page_elements if e.text)
if len(page_text) < self.min_chunk_size:
continue
# 滑动窗口分块
page_chunks = self._sliding_window_chunk(page_text)
for chunk_text in page_chunks:
chunk_id = hashlib.sha256(
f"{doc_metadata['doc_id']}:{chunk_index}:{chunk_text[:50]}".encode()
).hexdigest()[:16]
chunks.append(DocumentChunk(
chunk_id=chunk_id,
doc_id=doc_metadata["doc_id"],
doc_name=doc_metadata["doc_name"],
page_number=page_num,
chunk_index=chunk_index,
content=chunk_text,
content_type=self._detect_content_type(page_elements),
department=doc_metadata.get("department", "general"),
access_level=doc_metadata.get("access_level", "public"),
created_at=doc_metadata.get("created_at", ""),
updated_at=doc_metadata.get("updated_at", ""),
))
chunk_index += 1
return chunks
def _sliding_window_chunk(self, text: str) -> list[str]:
"""
滑动窗口分块
策略:优先在句号/换行处切分,保持语义完整性
"""
# 按句子切分
import re
sentences = re.split(r'(?<=[。!?.!?])\s+', text)
chunks = []
current_chunk = []
current_size = 0
for sentence in sentences:
sentence_size = len(sentence.split())
if current_size + sentence_size > self.chunk_size and current_chunk:
chunks.append(" ".join(current_chunk))
# 保留重叠部分
overlap_sentences = []
overlap_size = 0
for s in reversed(current_chunk):
overlap_size += len(s.split())
if overlap_size > self.chunk_overlap:
break
overlap_sentences.insert(0, s)
current_chunk = overlap_sentences
current_size = sum(len(s.split()) for s in current_chunk)
current_chunk.append(sentence)
current_size += sentence_size
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks
def _group_elements(self, elements: list[Element]) -> dict[int, list[Element]]:
"""按页面分组文档元素"""
grouped = {}
current_page = 1
for element in elements:
# Unstructured 在页面分隔处插入 PageBreak 元素
if element.__class__.__name__ == "PageBreak":
current_page += 1
else:
if current_page not in grouped:
grouped[current_page] = []
grouped[current_page].append(element)
return grouped
def _detect_content_type(self, elements: list) -> str:
"""检测内容类型"""
type_counts = {}
for e in elements:
t = e.__class__.__name__
type_counts[t] = type_counts.get(t, 0) + 1
if "Table" in type_counts and type_counts["Table"] > len(elements) * 0.3:
return "table"
return "text"
69.3.2 Embedding 与向量存储
# pipeline/vector_store.py
from qdrant_client import QdrantClient
from qdrant_client.models import (
Distance, VectorParams, PointStruct, Filter,
FieldCondition, MatchValue, Range
)
from openai import AsyncOpenAI
import asyncio
from typing import Optional
class VectorStore:
"""Qdrant 向量存储管理器"""
COLLECTION_NAME = "enterprise_kb"
VECTOR_DIM = 3072 # text-embedding-3-large 的维度
def __init__(self, qdrant_url: str, openai_api_key: str):
self.qdrant = QdrantClient(url=qdrant_url)
self.openai = AsyncOpenAI(api_key=openai_api_key)
self._ensure_collection()
def _ensure_collection(self):
"""确保向量集合存在(幂等操作)"""
collections = [c.name for c in self.qdrant.get_collections().collections]
if self.COLLECTION_NAME not in collections:
self.qdrant.create_collection(
collection_name=self.COLLECTION_NAME,
vectors_config=VectorParams(
size=self.VECTOR_DIM,
distance=Distance.COSINE,
),
)
# 创建用于过滤的索引(部门、访问级别)
self.qdrant.create_payload_index(
collection_name=self.COLLECTION_NAME,
field_name="department",
field_schema="keyword",
)
self.qdrant.create_payload_index(
collection_name=self.COLLECTION_NAME,
field_name="access_level",
field_schema="keyword",
)
self.qdrant.create_payload_index(
collection_name=self.COLLECTION_NAME,
field_name="doc_id",
field_schema="keyword",
)
async def embed_chunks(self, chunks: list[DocumentChunk]) -> list[DocumentChunk]:
"""批量生成 Embedding(控制并发避免 Rate Limit)"""
BATCH_SIZE = 100 # OpenAI 批量 embedding 限制
semaphore = asyncio.Semaphore(3) # 最多3个并发请求
async def embed_batch(batch: list[DocumentChunk]) -> list[DocumentChunk]:
async with semaphore:
texts = [chunk.content for chunk in batch]
response = await self.openai.embeddings.create(
input=texts,
model="text-embedding-3-large",
)
for i, chunk in enumerate(batch):
chunk.embedding = response.data[i].embedding
return batch
# 分批处理
batches = [chunks[i:i+BATCH_SIZE] for i in range(0, len(chunks), BATCH_SIZE)]
results = await asyncio.gather(*[embed_batch(batch) for batch in batches])
return [chunk for batch in results for chunk in batch]
def upsert_chunks(self, chunks: list[DocumentChunk]):
"""将分块写入 Qdrant(upsert 支持更新)"""
points = [
PointStruct(
id=int(chunk.chunk_id[:8], 16), # 将前8位hex转为int作为ID
vector=chunk.embedding,
payload={
"chunk_id": chunk.chunk_id,
"doc_id": chunk.doc_id,
"doc_name": chunk.doc_name,
"page_number": chunk.page_number,
"chunk_index": chunk.chunk_index,
"content": chunk.content,
"content_type": chunk.content_type,
"department": chunk.department,
"access_level": chunk.access_level,
"updated_at": chunk.updated_at,
}
)
for chunk in chunks
if chunk.embedding is not None
]
# 批量写入
BATCH_SIZE = 100
for i in range(0, len(points), BATCH_SIZE):
self.qdrant.upsert(
collection_name=self.COLLECTION_NAME,
points=points[i:i+BATCH_SIZE],
)
async def search(
self,
query: str,
top_k: int = 10,
department_filter: Optional[str] = None,
access_levels: Optional[list[str]] = None,
score_threshold: float = 0.7,
) -> list[dict]:
"""语义搜索"""
# 生成查询向量
response = await self.openai.embeddings.create(
input=[query],
model="text-embedding-3-large",
)
query_vector = response.data[0].embedding
# 构建过滤条件(权限控制)
filters = []
if department_filter:
filters.append(FieldCondition(
key="department",
match=MatchValue(value=department_filter)
))
if access_levels:
filters.append(FieldCondition(
key="access_level",
match=MatchValue(any=access_levels)
))
query_filter = Filter(must=filters) if filters else None
# 执行搜索
results = self.qdrant.search(
collection_name=self.COLLECTION_NAME,
query_vector=query_vector,
query_filter=query_filter,
limit=top_k,
score_threshold=score_threshold,
with_payload=True,
)
return [
{
"score": r.score,
"content": r.payload["content"],
"doc_name": r.payload["doc_name"],
"doc_id": r.payload["doc_id"],
"page_number": r.payload["page_number"],
"department": r.payload["department"],
}
for r in results
]
69.3.3 Hermes Agent 工具定义
# agent/tools.py
from typing import Optional
import json
import re
class KnowledgeBaseTools:
"""知识库 Agent 工具集"""
def __init__(
self,
vector_store: VectorStore,
full_text_search, # Elasticsearch 客户端
db_session, # PostgreSQL Session
):
self.vector_store = vector_store
self.fts = full_text_search
self.db = db_session
def get_tool_definitions(self) -> list[dict]:
"""返回 Hermes API 格式的工具定义"""
return [
{
"name": "semantic_search",
"description": "在知识库中进行语义搜索,找出语义相关的文档片段。适合开放式问题和概念查询。",
"input_schema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索查询,用自然语言描述"
},
"top_k": {
"type": "integer",
"description": "返回结果数量(默认5,最多20)",
"default": 5
},
"department": {
"type": "string",
"description": "可选:限定在特定部门文档中搜索"
},
},
"required": ["query"]
}
},
{
"name": "keyword_search",
"description": "在知识库中进行关键词精确搜索,适合搜索产品型号、规格参数、文档编号等精确信息。",
"input_schema": {
"type": "object",
"properties": {
"keywords": {
"type": "string",
"description": "搜索关键词(支持AND/OR/NOT逻辑)"
},
"doc_type": {
"type": "string",
"enum": ["manual", "standard", "policy", "training", "all"],
"description": "文档类型过滤"
},
},
"required": ["keywords"]
}
},
{
"name": "get_document_content",
"description": "获取特定文档的完整内容(或指定页范围)",
"input_schema": {
"type": "object",
"properties": {
"doc_id": {"type": "string", "description": "文档ID"},
"page_start": {"type": "integer", "description": "起始页(从1开始)"},
"page_end": {"type": "integer", "description": "结束页"},
},
"required": ["doc_id"]
}
},
{
"name": "get_document_list",
"description": "获取知识库中特定类别的文档列表",
"input_schema": {
"type": "object",
"properties": {
"category": {"type": "string", "description": "文档类别"},
"updated_after": {"type": "string", "description": "更新日期过滤(ISO格式)"},
},
}
},
]
async def execute_tool(self, tool_name: str, tool_args: dict, user_context: dict) -> str:
"""执行工具调用"""
# 注入用户权限上下文
department = user_context.get("department")
access_levels = user_context.get("access_levels", ["public"])
if tool_name == "semantic_search":
results = await self.vector_store.search(
query=tool_args["query"],
top_k=min(tool_args.get("top_k", 5), 20),
department_filter=tool_args.get("department") or department,
access_levels=access_levels,
)
return json.dumps(results, ensure_ascii=False, indent=2)
elif tool_name == "keyword_search":
results = await self._keyword_search(
keywords=tool_args["keywords"],
doc_type=tool_args.get("doc_type", "all"),
access_levels=access_levels,
)
return json.dumps(results, ensure_ascii=False, indent=2)
elif tool_name == "get_document_content":
content = await self._get_doc_content(
doc_id=tool_args["doc_id"],
page_start=tool_args.get("page_start", 1),
page_end=tool_args.get("page_end"),
access_levels=access_levels,
)
return content
else:
return json.dumps({"error": f"Unknown tool: {tool_name}"})
69.3.4 Hermes Agent 主控逻辑
# agent/hermes_kb_agent.py
import anthropic
import json
from typing import AsyncGenerator
class HermesKnowledgeAgent:
"""
基于 Hermes Agent 框架的知识库问答 Agent
注意:这里使用 Anthropic Claude 作为底层 LLM
实际部署中替换为 Hermes API 端点
"""
SYSTEM_PROMPT = """你是企业内部知识库助手,负责帮助员工快速找到工作相关的信息。
你的工作原则:
1. 只基于知识库中的文档内容回答,不要凭记忆或推测
2. 每个关键信息点必须标注来源(文档名称 + 页码)
3. 如果知识库中没有相关信息,明确告知用户"知识库中暂无此信息"
4. 对于技术规格、安全规程类问题,务必提醒用户以最新版文档为准
5. 保持回答简洁专业,优先使用表格、列表等结构化格式
安全约束(不可违反):
- 不得访问或透露其他部门的受权限保护文档
- 不得执行任何文档读取以外的操作
- 不得修改或删除任何内容
"""
def __init__(self, tools: KnowledgeBaseTools, model: str = "claude-opus-4-5"):
self.client = anthropic.Anthropic()
self.tools_handler = tools
self.model = model
self.tool_defs = tools.get_tool_definitions()
async def answer(
self,
user_query: str,
user_context: dict,
conversation_history: list[dict] = None,
max_steps: int = 10,
) -> dict:
"""
处理用户问题,返回带来源引用的答案
"""
messages = conversation_history or []
messages.append({"role": "user", "content": user_query})
tool_calls_log = []
for step in range(max_steps):
# 调用 Hermes/Claude
response = self.client.messages.create(
model=self.model,
max_tokens=4096,
system=self.SYSTEM_PROMPT,
tools=self.tool_defs,
messages=messages,
)
# 检查停止原因
if response.stop_reason == "end_turn":
# Agent 完成回答
final_answer = next(
(block.text for block in response.content if hasattr(block, "text")),
"无法生成回答"
)
return {
"answer": final_answer,
"tool_calls": tool_calls_log,
"steps": step + 1,
"sources": self._extract_sources(tool_calls_log),
}
elif response.stop_reason == "tool_use":
# 处理工具调用
tool_results = []
for block in response.content:
if block.type == "tool_use":
# 记录工具调用
tool_calls_log.append({
"step": step,
"tool": block.name,
"args": block.input,
})
# 执行工具(带权限控制)
result = await self.tools_handler.execute_tool(
tool_name=block.name,
tool_args=block.input,
user_context=user_context,
)
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": result,
})
# 将工具结果加入消息历史
messages.append({"role": "assistant", "content": response.content})
messages.append({"role": "user", "content": tool_results})
else:
break
return {
"answer": "回答生成超过最大步骤限制,请简化您的问题",
"tool_calls": tool_calls_log,
"steps": max_steps,
"sources": [],
}
def _extract_sources(self, tool_calls: list[dict]) -> list[dict]:
"""从工具调用结果中提取来源引用"""
sources = []
seen = set()
for call in tool_calls:
if call["tool"] in ["semantic_search", "keyword_search"]:
try:
results = json.loads(call.get("result", "[]"))
for r in results:
key = f"{r.get('doc_id')}:{r.get('page_number')}"
if key not in seen:
sources.append({
"doc_name": r.get("doc_name"),
"doc_id": r.get("doc_id"),
"page": r.get("page_number"),
})
seen.add(key)
except json.JSONDecodeError:
pass
return sources
69.3.5 API 层实现
# api/main.py
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional
import uvicorn
app = FastAPI(title="Enterprise Knowledge Base API", version="1.0.0")
app.add_middleware(
CORSMiddleware,
allow_origins=["https://internal.company.com"],
allow_credentials=True,
allow_methods=["POST", "GET"],
allow_headers=["Authorization", "Content-Type"],
)
class QueryRequest(BaseModel):
query: str
session_id: Optional[str] = None
department_filter: Optional[str] = None
class QueryResponse(BaseModel):
answer: str
sources: list[dict]
session_id: str
steps_taken: int
response_time_ms: int
@app.post("/api/v1/query", response_model=QueryResponse)
async def query_knowledge_base(
request: QueryRequest,
current_user: dict = Depends(get_current_user), # JWT 认证
agent: HermesKnowledgeAgent = Depends(get_agent),
audit_log = Depends(get_audit_log),
):
import time
start_time = time.time()
# 构建用户上下文(权限信息)
user_context = {
"user_id": current_user["user_id"],
"department": current_user["department"],
"access_levels": current_user["access_levels"],
}
# 获取会话历史
session_id = request.session_id or generate_session_id()
history = await get_session_history(session_id)
# 调用 Agent
result = await agent.answer(
user_query=request.query,
user_context=user_context,
conversation_history=history,
)
# 记录审计日志
await audit_log.append({
"event_type": "KB_QUERY",
"user_id": current_user["user_id"],
"session_id": session_id,
"query": request.query,
"sources_cited": [s["doc_id"] for s in result["sources"]],
"steps_taken": result["steps"],
"response_time_ms": int((time.time() - start_time) * 1000),
})
# 更新会话历史
await save_session_history(session_id, history + [
{"role": "user", "content": request.query},
{"role": "assistant", "content": result["answer"]},
])
return QueryResponse(
answer=result["answer"],
sources=result["sources"],
session_id=session_id,
steps_taken=result["steps"],
response_time_ms=int((time.time() - start_time) * 1000),
)
69.4 性能优化
69.4.1 混合检索(Hybrid Search)
class HybridRetriever:
"""
混合检索:向量搜索 + 全文搜索 + 重排序
效果比单一向量搜索提升 15-25%(实测数据)
"""
def __init__(self, vector_store: VectorStore, es_client):
self.vector_store = vector_store
self.es = es_client
async def retrieve(
self,
query: str,
top_k: int = 10,
user_context: dict = None,
alpha: float = 0.6, # 向量搜索权重,0.6 = 60%向量 + 40%全文
) -> list[dict]:
"""混合检索"""
# 并行执行向量搜索和全文搜索
vector_results, fts_results = await asyncio.gather(
self.vector_store.search(query, top_k=top_k * 2),
self._full_text_search(query, top_k=top_k * 2),
)
# 使用 RRF(Reciprocal Rank Fusion)融合结果
fused = self._reciprocal_rank_fusion(
[vector_results, fts_results],
alpha=alpha,
)
# 重排序(Reranking):用更精确的交叉编码器重新排序
reranked = await self._rerank(query, fused[:top_k * 2])
return reranked[:top_k]
def _reciprocal_rank_fusion(
self,
result_lists: list[list[dict]],
k: int = 60,
alpha: float = 0.6,
) -> list[dict]:
"""
RRF 融合:每个结果的得分 = sum(1 / (k + rank_in_list))
"""
weights = [alpha, 1 - alpha] # 向量搜索权重 > 全文搜索
scores = {}
for i, (results, weight) in enumerate(zip(result_lists, weights)):
for rank, result in enumerate(results):
doc_key = f"{result['doc_id']}:{result.get('page_number', 0)}"
rrf_score = weight * (1 / (k + rank + 1))
if doc_key in scores:
scores[doc_key]["rrf_score"] += rrf_score
else:
scores[doc_key] = {**result, "rrf_score": rrf_score}
return sorted(scores.values(), key=lambda x: x["rrf_score"], reverse=True)
async def _rerank(self, query: str, candidates: list[dict]) -> list[dict]:
"""
使用交叉编码器重排序(Cohere Rerank API 或本地部署 BGE-reranker)
"""
import cohere
co = cohere.Client() # 使用代理,避免直接暴露密钥
texts = [c["content"] for c in candidates]
rerank_results = co.rerank(
model="rerank-multilingual-v3.0",
query=query,
documents=texts,
top_n=len(candidates),
)
# 按重排序结果重新排列
reranked = []
for r in rerank_results.results:
candidate = candidates[r.index]
candidate["rerank_score"] = r.relevance_score
reranked.append(candidate)
return sorted(reranked, key=lambda x: x["rerank_score"], reverse=True)
69.4.2 查询改写优化
class QueryRewriter:
"""
查询改写:将用户的自然语言问题改写为更适合检索的形式
实测可提升 12% 的检索召回率
"""
async def rewrite(self, original_query: str, chat_history: list[dict]) -> list[str]:
"""
生成多个改写版本,提高检索覆盖率
返回:[原始查询, 改写版本1, 改写版本2, ...]
"""
prompt = f"""
你是一个搜索查询优化专家。给定一个用户问题,请生成3个不同的搜索查询变体,
这些变体应该从不同角度描述同一个问题,以提高文档检索覆盖率。
原始问题:{original_query}
对话历史(如果有):
{json.dumps(chat_history[-3:], ensure_ascii=False) if chat_history else "无"}
请输出3个查询变体(每行一个,不要编号):
"""
response = await call_llm(prompt)
variants = [line.strip() for line in response.split("\n") if line.strip()]
return [original_query] + variants[:3] # 原始查询 + 最多3个变体
69.5 部署方案
# k8s/hermes-kb-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: hermes-kb-agent
namespace: knowledge-base
spec:
replicas: 3
selector:
matchLabels:
app: hermes-kb-agent
template:
metadata:
labels:
app: hermes-kb-agent
spec:
containers:
- name: agent
image: company-registry/hermes-kb-agent:v1.2.0
resources:
requests:
memory: "1Gi"
cpu: "500m"
limits:
memory: "2Gi"
cpu: "2000m"
env:
- name: QDRANT_URL
value: "http://qdrant-service:6333"
- name: ES_URL
value: "http://elasticsearch:9200"
envFrom:
- secretRef:
name: hermes-kb-secrets # 通过 External Secrets Operator 管理
readinessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 10
periodSeconds: 5
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: hermes-kb-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: hermes-kb-agent
minReplicas: 3
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
69.6 踩坑记录(5 个真实问题)
坑 1:分块策略破坏表格结构
问题描述:PDF 中的表格被滑动窗口强行切断,导致检索到的是残缺的表格片段,回答不完整。
示例:
原始表格:
| 产品型号 | 额定电压 | 额定电流 | 适用温度 |
|---------|---------|---------|---------|
| A-100 | 220V | 10A | -20~50℃|
| B-200 | 380V | 20A | -10~60℃|
被切断后的分块:
分块1:| 产品型号 | 额定电压 | 额定电流 | 适用温度 |
分块2:| A-100 | 220V | 10A | -20~50℃|
分块3:| B-200 | 380V | 20A | -10~60℃|
问题:每个分块单独看都没有语义
解决方案:
def smart_chunk_with_table_awareness(elements: list) -> list[str]:
"""表格感知分块:表格作为整体不切断"""
chunks = []
current_chunk = []
in_table = False
table_content = []
for element in elements:
if element.__class__.__name__ == "Table":
# 表格作为整体处理
if current_chunk:
chunks.append("\n".join(current_chunk))
current_chunk = []
chunks.append(element.text) # 整张表格作为一个分块
else:
current_chunk.append(element.text)
if len(" ".join(current_chunk).split()) > CHUNK_SIZE:
chunks.append("\n".join(current_chunk))
current_chunk = []
return chunks
教训:分块策略必须是内容感知的,不能盲目按 token 数切割。
坑 2:中英混合文档的 Embedding 质量差
问题描述:文档中大量中英混合内容(如"A-100型号的Operating Temperature是-20~50°C"),检索精度显著下降。
根本原因:Embedding 模型在中英混合输入上的语义表示质量不稳定。
解决方案:
def normalize_mixed_language_text(text: str) -> str:
"""
统一中英混合文本的格式
策略:在中英切换处加入空格,帮助模型正确分词
"""
import re
# 在中文字符和英文字符之间插入空格
text = re.sub(r'([\u4e00-\u9fff])([A-Za-z0-9])', r'\1 \2', text)
text = re.sub(r'([A-Za-z0-9])([\u4e00-\u9fff])', r'\1 \2', text)
return text
# 同时为每个分块生成中文和英文描述(双语索引)
async def create_bilingual_chunks(chunk: DocumentChunk, translator) -> list[DocumentChunk]:
"""为非纯中文内容创建双语版本"""
if not is_predominantly_chinese(chunk.content):
zh_translation = await translator.to_chinese(chunk.content)
return [chunk, DocumentChunk(**{**chunk.__dict__, "content": zh_translation})]
return [chunk]
坑 3:Agent 回答时"创造"不存在的文档引用
问题描述:Agent 有时会编造文档引用,声称某信息来自"《产品安全手册》第23页",但这个文档根本不存在。
根本原因:检索结果相关性不够时,LLM 倾向于"补全"可信的来源。
解决方案:
SYSTEM_PROMPT_ADDITION = """
重要规则:引用文档时必须遵守以下格式:
【来源:{工具返回的doc_name},第{工具返回的page_number}页】
严禁:
- 引用任何未通过工具检索到的文档
- 自行编造文档名称或页码
- 对检索结果进行推测性延伸
如果检索结果不足以回答问题,请明确说明:
"根据当前知识库中的文档,无法找到关于XX的完整信息。
已找到的相关内容:[列出找到的内容]
建议:[向哪个部门咨询]"
"""
def validate_citations(answer: str, tool_call_results: list[dict]) -> str:
"""验证答案中的引用是否真实存在"""
valid_sources = {
f"{r.get('doc_name')}:{r.get('page_number')}"
for call in tool_call_results
for r in (json.loads(call.get("result", "[]")) if call.get("result") else [])
}
# 提取答案中的引用
citation_pattern = re.compile(r'【来源:(.+?),第(\d+)页】')
citations = citation_pattern.findall(answer)
invalid = [c for c in citations if f"{c[0]}:{c[1]}" not in valid_sources]
if invalid:
# 移除无效引用并添加警告
for doc, page in invalid:
answer = answer.replace(f"【来源:{doc},第{page}页】", "[引用验证失败]")
answer += "\n\n⚠️ 注意:部分引用无法验证,请以实际文档为准。"
return answer
坑 4:热门文档"淹没"长尾文档
问题描述:某些频繁被引用的文档(如《公司规章制度》)会出现在几乎所有查询结果的前列,导致真正相关的技术文档被排在后面。
解决方案:
def apply_diversity_penalty(results: list[dict], max_from_same_doc: int = 2) -> list[dict]:
"""
多样性惩罚:限制同一文档出现的次数
保证答案来源的多样性
"""
doc_count = {}
filtered = []
for result in results:
doc_id = result["doc_id"]
count = doc_count.get(doc_id, 0)
if count < max_from_same_doc:
filtered.append(result)
doc_count[doc_id] = count + 1
return filtered
def apply_freshness_boost(results: list[dict], boost_factor: float = 0.1) -> list[dict]:
"""
时效性加权:更新时间近的文档得分略高
解决过期文档排名靠前的问题
"""
from datetime import datetime
now = datetime.utcnow()
for result in results:
updated_at = datetime.fromisoformat(result.get("updated_at", "2020-01-01"))
days_old = (now - updated_at).days
freshness_boost = max(0, boost_factor * (1 - days_old / 365))
result["score"] = result.get("score", 0) + freshness_boost
return sorted(results, key=lambda x: x["score"], reverse=True)
坑 5:大并发下 Qdrant 连接池耗尽
问题描述:在 200 并发用户压测时,Qdrant 客户端出现 ConnectionPool exhausted 错误,P99 响应时间飙升至 30+ 秒。
解决方案:
from qdrant_client import AsyncQdrantClient
from contextlib import asynccontextmanager
import asyncio
class QdrantConnectionPool:
"""Qdrant 异步连接池"""
def __init__(self, url: str, pool_size: int = 20):
self._semaphore = asyncio.Semaphore(pool_size)
self._client = AsyncQdrantClient(url=url, timeout=30)
self.pool_size = pool_size
@asynccontextmanager
async def acquire(self):
"""获取客户端(带超时的信号量控制)"""
try:
await asyncio.wait_for(self._semaphore.acquire(), timeout=5.0)
try:
yield self._client
finally:
self._semaphore.release()
except asyncio.TimeoutError:
raise RuntimeError("Qdrant connection pool exhausted. Try again later.")
async def search_with_retry(self, *args, max_retries: int = 3, **kwargs) -> list:
"""带重试的搜索"""
for attempt in range(max_retries):
try:
async with self.acquire() as client:
return await client.search(*args, **kwargs)
except Exception as e:
if attempt == max_retries - 1:
raise
await asyncio.sleep(0.5 * (attempt + 1)) # 指数退避
return []
# 在 FastAPI 启动时初始化连接池
@asynccontextmanager
async def lifespan(app: FastAPI):
app.state.qdrant_pool = QdrantConnectionPool(
url=settings.QDRANT_URL,
pool_size=50, # 根据负载调整
)
yield
# 关闭时清理
本章小结
本章通过一个完整的企业知识库项目,将 RAG + Hermes Agent 的最佳实践落地:
- 文档处理:Unstructured.io 解析多格式文档,滑动窗口+表格感知分块
- 向量存储:Qdrant + OpenAI Embedding,支持部门和权限过滤
- 混合检索:向量搜索 + 全文搜索 + RRF 融合 + Cohere 重排序(提升 15-25%)
- Agent 设计:工具权限绑定用户上下文,引用验证防止幻觉
- 5个踩坑:表格分块、中英混合、幻觉引用、热门文档淹没、连接池耗尽
思考题
- 在这个系统中,如果一位员工问了一个跨部门的问题(他只有A部门权限,但最佳答案在B部门文档中),系统应该如何响应?既不能泄露B部门数据,又要帮助用户找到正确的联系人。
- 查询改写(生成3个变体)会增加 LLM 调用成本。在什么情况下这个额外成本是值得的?如何设计一个自适应策略(简单查询不改写,复杂查询才改写)?
- 重排序(Reranking)是提升精度的有效手段,但会增加延迟。对于 P95 < 5秒的响应时间目标,你会如何在精度和延迟之间做权衡?
- 如果文档更新非常频繁(每天有数百个文档更新),你会如何设计增量更新策略,在不影响查询服务的情况下保持索引的实时性?