Chapter 69: Case Study: Intelligent Knowledge Base Assistant (RAG + Hermes)
This chapter takes the theory from the preceding chapters and lands it in running code through a complete enterprise project: an internal document Q&A system built on Hermes Agent and a vector database. We walk through every step from requirements analysis to production deployment, and document the five real pitfalls encountered along the way.
69.1 Requirements Analysis
69.1.1 Business Context
Client: A manufacturing enterprise with 5,000+ employees
Pain points:
- Massive internal document volume: product manuals, process standards, training materials, and compliance documents totaling 120,000+ files
- Time-consuming retrieval: employees spend an average of 23 minutes per search
- Stale documents: employees frequently reference outdated versions
- Cross-department knowledge silos: Department A has the answer; Department B doesn't know it exists
Goal: Build an internal knowledge base Q&A system so employees can ask questions in natural language and receive accurate answers, with source citations, in seconds.
69.1.2 Requirements Matrix
| Requirement | Description | Priority |
|---|---|---|
| Natural language Q&A | Support Chinese/English mixed queries | P0 |
| Document citations | Every answer must cite source doc and page | P0 |
| Real-time updates | Doc changes reflected within 30 minutes | P0 |
| Multi-turn dialog | Follow-up questions with context | P1 |
| Permission control | Departments access only authorized docs | P1 |
| File types | PDF, Word, Excel, PPT, Markdown | P1 |
| Response time | P95 < 5 seconds | P1 |
| Concurrency | 200 simultaneous users | P2 |
| Audit trail | All queries logged and auditable | P2 |
69.2 Architecture Design
69.2.1 System Architecture
                       User Query
                            │
                            ▼
┌──────────────────────────────────────────────────────┐
│                  Hermes Agent Layer                  │
│  ┌──────────────┐  ┌──────────────┐  ┌────────────┐  │
│  │ Query        │  │ Tool         │  │ Answer     │  │
│  │ Understanding│  │ Orchestration│  │ Synthesis  │  │
│  │ - Intent     │  │ - Strategy   │  │ - Citations│  │
│  │ - Rewriting  │  │ - Multi-tool │  │ - Format   │  │
│  └──────────────┘  └──────────────┘  └────────────┘  │
└──────────────────────────────────────────────────────┘
          │                 │                │
          ▼                 ▼                ▼
     ┌─────────┐      ┌──────────┐   ┌──────────────┐
     │ Qdrant  │      │Full-text │   │ Doc Metadata │
     │ Vector  │      │  (ES)    │   │ (PostgreSQL) │
     └─────────┘      └──────────┘   └──────────────┘
          ▲                 ▲
          │                 │
┌──────────────────────────────────────────────────────┐
│             Document Processing Pipeline             │
│    Upload → Parse → Chunk → Embed → Store → Index    │
└──────────────────────────────────────────────────────┘
69.2.2 Technology Choices
| Component | Choice | Rationale |
|---|---|---|
| Agent framework | Hermes Agent | Precise tool calling, multi-step reasoning |
| Vector DB | Qdrant | Open-source, high-performance, supports filtering |
| Full-text search | Elasticsearch | Hybrid retrieval, handles exact matches |
| Embedding model | text-embedding-3-large | Strong multilingual performance |
| Document parsing | Unstructured.io | Multi-format support, layout-aware |
| Metadata storage | PostgreSQL | Version management, permission control |
| API layer | FastAPI | High-performance, async support |
| Deployment | Kubernetes | Elastic scaling |
69.3 Complete Implementation
69.3.1 Document Processing Pipeline
# pipeline/document_processor.py
import asyncio, hashlib, re
from dataclasses import dataclass
from typing import Optional

import unstructured.partition.auto as auto_partition


@dataclass
class DocumentChunk:
    chunk_id: str
    doc_id: str
    doc_name: str
    page_number: int
    chunk_index: int
    content: str
    content_type: str  # text / table / image_caption
    department: str
    access_level: str
    created_at: str
    updated_at: str
    embedding: Optional[list[float]] = None

    @property
    def word_count(self) -> int:
        return len(self.content.split())


class DocumentProcessor:
    def __init__(self, chunk_size: int = 512, chunk_overlap: int = 64, min_chunk: int = 50):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.min_chunk = min_chunk  # minimum characters for a table or page text to be indexed

    async def process_document(self, file_path: str, metadata: dict) -> list[DocumentChunk]:
        # partition() is synchronous and slow with hi_res; run it in a worker
        # thread so it does not block the event loop.
        elements = await asyncio.to_thread(
            auto_partition.partition,
            filename=file_path,
            languages=["chi_sim", "eng"],
            strategy="hi_res",
            include_page_breaks=True,
        )
        grouped = self._group_by_page(elements)
        chunks, idx = [], 0
        for page_num, page_elements in grouped.items():
            # Table-aware chunking: keep tables whole
            for element in page_elements:
                if element.__class__.__name__ == "Table":
                    if element.text and len(element.text) >= self.min_chunk:
                        chunk_id = hashlib.sha256(
                            f"{metadata['doc_id']}:{idx}:{element.text[:50]}".encode()
                        ).hexdigest()[:16]
                        chunks.append(self._make_chunk(chunk_id, idx, page_num,
                                                       element.text, "table", metadata))
                        idx += 1
            # Text chunks with sliding window
            text = "\n".join(
                e.text for e in page_elements
                if e.__class__.__name__ != "Table" and getattr(e, "text", ""))
            if len(text) < self.min_chunk:
                continue
            for chunk_text in self._sliding_window(text):
                chunk_id = hashlib.sha256(
                    f"{metadata['doc_id']}:{idx}:{chunk_text[:50]}".encode()
                ).hexdigest()[:16]
                chunks.append(self._make_chunk(chunk_id, idx, page_num,
                                               chunk_text, "text", metadata))
                idx += 1
        return chunks

    def _make_chunk(self, chunk_id, idx, page, text, ctype, meta) -> DocumentChunk:
        return DocumentChunk(
            chunk_id=chunk_id, doc_id=meta["doc_id"], doc_name=meta["doc_name"],
            page_number=page, chunk_index=idx, content=text, content_type=ctype,
            department=meta.get("department", "general"),
            access_level=meta.get("access_level", "public"),
            created_at=meta.get("created_at", ""), updated_at=meta.get("updated_at", ""),
        )

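    def _sliding_window(self, text: str) -> list[str]:
        # Split after CJK sentence punctuation (which is not followed by whitespace)
        # or after Western punctuation followed by whitespace.
        sentences = [s for s in re.split(r'(?<=[。!?])|(?<=[.!?])\s+', text) if s.strip()]
        chunks, current, current_size = [], [], 0
        for sent in sentences:
            size = self._size(sent)
            if current_size + size > self.chunk_size and current:
                chunks.append(" ".join(current))
                # Carry trailing sentences into the next chunk as overlap
                overlap, overlap_size = [], 0
                for s in reversed(current):
                    overlap_size += self._size(s)
                    if overlap_size > self.chunk_overlap:
                        break
                    overlap.insert(0, s)
                current, current_size = overlap, sum(self._size(s) for s in overlap)
            current.append(sent)
            current_size += size
        if current:
            chunks.append(" ".join(current))
        return chunks

    @staticmethod
    def _size(text: str) -> int:
        # Whitespace splitting counts a whole Chinese sentence as one word,
        # so count each CJK character plus each remaining whitespace-delimited word.
        cjk = re.findall(r'[\u4e00-\u9fff]', text)
        rest = re.sub(r'[\u4e00-\u9fff]', ' ', text)
        return len(cjk) + len(rest.split())
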
    def _group_by_page(self, elements) -> dict:
        grouped, page = {}, 1
        for e in elements:
            if e.__class__.__name__ == "PageBreak":
                page += 1
            else:
                grouped.setdefault(page, []).append(e)
        return grouped
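The overlap behavior of the sliding window is easier to see on a tiny input. The following is a minimal standalone sketch, not the pipeline code itself: it takes pre-split sentences, measures size in whitespace-delimited words, and uses deliberately small values for `chunk_size` and `overlap`. The sample sentences are made up for illustration.

```python
def sliding_window(sentences: list[str], chunk_size: int, overlap: int) -> list[str]:
    """Pack whole sentences into chunks of at most chunk_size words.

    After a chunk is closed, trailing sentences totalling at most
    `overlap` words are repeated at the start of the next chunk.
    """
    chunks, current, size = [], [], 0
    for sent in sentences:
        n = len(sent.split())
        if current and size + n > chunk_size:
            chunks.append(" ".join(current))
            kept, kept_size = [], 0
            for s in reversed(current):
                kept_size += len(s.split())
                if kept_size > overlap:
                    break
                kept.insert(0, s)
            current, size = kept, sum(len(s.split()) for s in kept)
        current.append(sent)
        size += n
    if current:
        chunks.append(" ".join(current))
    return chunks


sentences = [
    "Check the voltage first.",       # 4 words
    "Then tighten the four bolts.",   # 5 words
    "Run the self test.",             # 4 words
    "Log the result in the system.",  # 6 words
]
chunks = sliding_window(sentences, chunk_size=10, overlap=5)
for c in chunks:
    print(c)
```

With these settings each chunk shares exactly one sentence with its neighbor, so a question whose answer straddles a chunk boundary can still be matched by a single chunk.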
69.3.2 Vector Store with Qdrant
# pipeline/vector_store.py
import asyncio
from typing import Optional

from openai import AsyncOpenAI
from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance, FieldCondition, Filter, MatchAny, MatchValue, PointStruct, VectorParams,
)

# Module path follows the file comment in section 69.3.1
from pipeline.document_processor import DocumentChunk


class VectorStore:
    COLLECTION = "enterprise_kb"
    DIM = 3072  # text-embedding-3-large

    def __init__(self, qdrant_url: str, openai_key: str):
        self.qdrant = QdrantClient(url=qdrant_url)
        self.oai = AsyncOpenAI(api_key=openai_key)
        self._ensure_collection()

    def _ensure_collection(self):
        existing = [c.name for c in self.qdrant.get_collections().collections]
        if self.COLLECTION not in existing:
            self.qdrant.create_collection(
                self.COLLECTION,
                vectors_config=VectorParams(size=self.DIM, distance=Distance.COSINE),
            )
            # Keyword indexes keep the permission filters cheap at query time
            for field in ["department", "access_level", "doc_id"]:
                self.qdrant.create_payload_index(self.COLLECTION, field, "keyword")

    async def embed_chunks(self, chunks: list[DocumentChunk]) -> list[DocumentChunk]:
        BATCH = 100
        sem = asyncio.Semaphore(3)  # at most 3 embedding requests in flight

        async def embed_batch(batch):
            async with sem:
                resp = await self.oai.embeddings.create(
                    input=[c.content for c in batch], model="text-embedding-3-large")
                for i, chunk in enumerate(batch):
                    chunk.embedding = resp.data[i].embedding
                return batch

        batches = [chunks[i:i+BATCH] for i in range(0, len(chunks), BATCH)]
        results = await asyncio.gather(*[embed_batch(b) for b in batches])
        return [c for batch in results for c in batch]

    def upsert_chunks(self, chunks: list[DocumentChunk]):
        points = [
            PointStruct(
                # chunk_id is 16 hex characters (64 bits), which fits Qdrant's
                # unsigned 64-bit point IDs. Using only the first 8 characters
                # (32 bits) would produce collisions at this corpus size, and a
                # colliding upsert silently overwrites another chunk.
                id=int(c.chunk_id, 16),
                vector=c.embedding,
                payload={
                    "chunk_id": c.chunk_id, "doc_id": c.doc_id, "doc_name": c.doc_name,
                    "page_number": c.page_number, "content": c.content,
                    "department": c.department, "access_level": c.access_level,
                    "updated_at": c.updated_at,
                }
            )
            for c in chunks if c.embedding
        ]
        for i in range(0, len(points), 100):
            self.qdrant.upsert(self.COLLECTION, points=points[i:i+100])

    async def search(self, query: str, top_k: int = 10,
                     department: Optional[str] = None,
                     access_levels: Optional[list[str]] = None,
                     threshold: float = 0.7) -> list[dict]:
        resp = await self.oai.embeddings.create(input=[query], model="text-embedding-3-large")
        vec = resp.data[0].embedding
        filters = []
        if department:
            filters.append(FieldCondition(key="department", match=MatchValue(value=department)))
        if access_levels:
            # MatchAny matches any value in the list; MatchValue accepts a single value only
            filters.append(FieldCondition(key="access_level", match=MatchAny(any=access_levels)))
        results = self.qdrant.search(
            self.COLLECTION, query_vector=vec,
            query_filter=Filter(must=filters) if filters else None,
            limit=top_k, score_threshold=threshold, with_payload=True,
        )
        return [{"score": r.score, "content": r.payload["content"],
                 "doc_name": r.payload["doc_name"], "doc_id": r.payload["doc_id"],
                 "page_number": r.payload["page_number"]} for r in results]
69.3.3 Hermes Agent with Tool Definitions
# agent/hermes_kb_agent.py
import anthropic, json, re
SYSTEM_PROMPT = """You are an enterprise internal knowledge base assistant.
Core principles:
1. Answer ONLY from knowledge base documents — never from memory or inference
2. Every key claim must cite its source: [Source: {doc_name}, Page {page}]
3. If the knowledge base lacks relevant information, say so explicitly
4. For technical specs and safety procedures, remind users to verify against the latest doc version
5. Use tables and bullet lists for structured information
Security constraints (inviolable):
- Never access or reveal documents from departments the user is not authorized for
- Never perform any action beyond document retrieval
- Never modify or delete any content
"""
class HermesKnowledgeAgent:
    TOOLS = [
        {
            "name": "semantic_search",
            "description": "Semantic search in the knowledge base. Best for open-ended questions.",
            "input_schema": {
                "type": "object",
                "properties": {
                    "query": {"type": "string"},
                    "top_k": {"type": "integer", "default": 5},
                    "department": {"type": "string"},
                },
                "required": ["query"],
            },
        },
        {
            "name": "keyword_search",
            "description": "Exact keyword search. Best for model numbers, spec values, doc IDs.",
            "input_schema": {
                "type": "object",
                "properties": {
                    "keywords": {"type": "string"},
                    "doc_type": {"type": "string",
                                 "enum": ["manual", "standard", "policy", "training", "all"]},
                },
                "required": ["keywords"],
            },
        },
        {
            "name": "get_document_content",
            "description": "Retrieve full content of a specific document or page range.",
            "input_schema": {
                "type": "object",
                "properties": {
                    "doc_id": {"type": "string"},
                    "page_start": {"type": "integer"},
                    "page_end": {"type": "integer"},
                },
                "required": ["doc_id"],
            },
        },
    ]

    def __init__(self, tools_handler, model: str = "claude-opus-4-5"):
        # Async client, so API calls inside answer() do not block the event loop
        self.client = anthropic.AsyncAnthropic()
        self.tools_handler = tools_handler
        self.model = model

    async def answer(self, query: str, user_context: dict,
                     history: list = None, max_steps: int = 10) -> dict:
        messages = (history or []) + [{"role": "user", "content": query}]
        tool_calls_log = []
        for step in range(max_steps):
            resp = await self.client.messages.create(
                model=self.model, max_tokens=4096,
                system=SYSTEM_PROMPT, tools=self.TOOLS, messages=messages,
            )
            if resp.stop_reason == "end_turn":
                answer_text = next(
                    (b.text for b in resp.content if hasattr(b, "text")), "No answer generated")
                answer_text = self._validate_citations(answer_text, tool_calls_log)
                return {"answer": answer_text, "tool_calls": tool_calls_log,
                        "steps": step + 1, "sources": self._extract_sources(tool_calls_log)}
            elif resp.stop_reason == "tool_use":
                tool_results = []
                for block in resp.content:
                    if block.type == "tool_use":
                        # Permissions come from user_context, never from model-supplied arguments
                        result = await self.tools_handler.execute_tool(
                            block.name, block.input, user_context)
                        tool_calls_log.append({"step": step, "tool": block.name,
                                               "args": block.input, "result": result})
                        tool_results.append({"type": "tool_result",
                                             "tool_use_id": block.id, "content": result})
                messages.append({"role": "assistant", "content": resp.content})
                messages.append({"role": "user", "content": tool_results})
            else:
                # Any other stop reason (for example "max_tokens"): stop here
                # rather than resend the identical request on every remaining step.
                return {"answer": f"Generation stopped early ({resp.stop_reason}).",
                        "tool_calls": tool_calls_log, "steps": step + 1, "sources": []}
        return {"answer": "Max steps exceeded. Please simplify your question.",
                "tool_calls": tool_calls_log, "steps": max_steps, "sources": []}

    def _validate_citations(self, answer: str, tool_logs: list) -> str:
        # Collect every doc_name:page pair that a tool call actually returned
        valid = set()
        for log in tool_logs:
            try:
                rows = json.loads(log.get("result") or "[]")
            except (json.JSONDecodeError, TypeError):
                continue  # non-JSON tool output carries no citable rows
            if not isinstance(rows, list):
                continue
            valid.update(f"{r.get('doc_name')}:{r.get('page_number')}"
                         for r in rows if isinstance(r, dict))
        pattern = re.compile(r'\[Source: (.+?), Page (\d+)\]')
        for doc, page in pattern.findall(answer):
            if f"{doc}:{page}" not in valid:
                answer = answer.replace(f"[Source: {doc}, Page {page}]", "[Citation unverified]")
        return answer

    def _extract_sources(self, logs: list) -> list[dict]:
        sources, seen = [], set()
        for log in logs:
            if log["tool"] in ["semantic_search", "keyword_search"]:
                try:
                    for r in json.loads(log.get("result", "[]")):
                        key = f"{r.get('doc_id')}:{r.get('page_number')}"
                        if key not in seen:
                            sources.append({"doc_name": r.get("doc_name"),
                                            "doc_id": r.get("doc_id"),
                                            "page": r.get("page_number")})
                            seen.add(key)
                except json.JSONDecodeError:
                    pass
        return sources
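The agent delegates every tool call to `tools_handler.execute_tool(name, args, user_context)`, which this chapter does not list. The following is a minimal sketch of what such a handler might look like for `semantic_search` only. The class name `ToolsHandler`, the injected `search_fn`, and the `user_context` keys `departments` and `access_levels` are all assumptions made for illustration. The point it shows is that the department filter is derived from the authenticated user, so a model-supplied `department` argument can narrow the search but never widen it.

```python
import asyncio
import json


class ToolsHandler:
    """Hypothetical handler; names and user_context keys are assumptions."""

    def __init__(self, search_fn):
        # search_fn(query, top_k, departments, access_levels) -> list[dict]
        self._search = search_fn

    async def execute_tool(self, name: str, args: dict, user_context: dict) -> str:
        if name != "semantic_search":
            return json.dumps({"error": f"unsupported tool: {name}"})
        allowed = user_context.get("departments", [])
        requested = args.get("department")
        # The model may ask for a department, but only one the user already has.
        if requested is not None and requested not in allowed:
            return json.dumps({"error": "not authorized for this department"})
        results = await self._search(
            query=args["query"],
            top_k=min(int(args.get("top_k", 5)), 20),  # cap model-chosen top_k
            departments=[requested] if requested else allowed,
            access_levels=user_context.get("access_levels", ["public"]),
        )
        # Tool results go back to the model as a JSON string
        return json.dumps(results, ensure_ascii=False)


async def fake_search(query, top_k, departments, access_levels):
    """Stand-in for the real retriever, with two made-up documents."""
    docs = [
        {"doc_name": "Welding Standard", "page_number": 4, "department": "production"},
        {"doc_name": "Payroll Policy", "page_number": 2, "department": "hr"},
    ]
    return [d for d in docs if d["department"] in departments][:top_k]


handler = ToolsHandler(fake_search)
user = {"departments": ["production"], "access_levels": ["public", "internal"]}
denied = asyncio.run(handler.execute_tool(
    "semantic_search", {"query": "salary bands", "department": "hr"}, user))
allowed = asyncio.run(handler.execute_tool(
    "semantic_search", {"query": "welding tolerances"}, user))
print(denied)
print(allowed)
```

Enforcing permissions in the handler rather than in the system prompt means a prompt injection inside a retrieved document cannot talk the agent into reading another department's files.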
69.3.4 Hybrid Retrieval
import asyncio

import cohere

# Module path follows the file comment in section 69.3.2
from pipeline.vector_store import VectorStore


class HybridRetriever:
    """Vector search + full-text search + RRF fusion + reranking (15–25% better than vector-only)"""

    def __init__(self, vector_store: VectorStore, es_client):
        self.vs = vector_store
        self.es = es_client

    async def retrieve(self, query: str, top_k: int = 10,
                       alpha: float = 0.6, user_context: dict = None) -> list[dict]:
        ctx = user_context or {}
        # Apply the caller's permissions to the vector search. The key names
        # "department" and "access_levels" are assumed here. _fts (the
        # Elasticsearch query, not shown in this listing) must enforce the same filter.
        vec_results, fts_results = await asyncio.gather(
            self.vs.search(query, top_k=top_k * 2,
                           department=ctx.get("department"),
                           access_levels=ctx.get("access_levels")),
            self._fts(query, top_k=top_k * 2),
        )
        fused = self._rrf([vec_results, fts_results], weights=[alpha, 1 - alpha])
        reranked = await self._rerank(query, fused[:top_k * 2])
        # Filter for diversity first, then truncate, so up to top_k results survive
        return self._diversity_filter(reranked, max_per_doc=2)[:top_k]

    def _rrf(self, lists: list, weights: list, k: int = 60) -> list[dict]:
        # Weighted Reciprocal Rank Fusion: each list contributes w / (k + rank)
        scores = {}
        for results, w in zip(lists, weights):
            for rank, r in enumerate(results):
                key = f"{r['doc_id']}:{r.get('page_number', 0)}"
                scores.setdefault(key, {**r, "rrf": 0})["rrf"] += w / (k + rank + 1)
        return sorted(scores.values(), key=lambda x: x["rrf"], reverse=True)

    async def _rerank(self, query: str, candidates: list) -> list:
        if not candidates:
            return []
        co = cohere.Client()
        # The client call is synchronous; run it in a worker thread
        results = await asyncio.to_thread(
            co.rerank, model="rerank-multilingual-v3.0", query=query,
            documents=[c["content"] for c in candidates],
            top_n=len(candidates))
        reranked = []
        for r in results.results:
            c = dict(candidates[r.index])
            c["rerank_score"] = r.relevance_score
            reranked.append(c)
        return sorted(reranked, key=lambda x: x["rerank_score"], reverse=True)

    def _diversity_filter(self, results: list, max_per_doc: int = 2) -> list:
        counts, out = {}, []
        for r in results:
            did = r["doc_id"]
            if counts.get(did, 0) < max_per_doc:
                out.append(r)
                counts[did] = counts.get(did, 0) + 1
        return out
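To make the fusion step concrete, here is a small worked example of weighted Reciprocal Rank Fusion using the same formula as `_rrf`, score = Σ w / (k + rank), with ranks starting at 1. It is a simplified sketch that fuses bare document IDs instead of result dicts; the IDs and the two ranked lists are invented for illustration.

```python
def rrf(ranked_lists: list[list[str]], weights: list[float], k: int = 60):
    """Weighted Reciprocal Rank Fusion over lists of document IDs."""
    scores: dict[str, float] = {}
    for results, w in zip(ranked_lists, weights):
        for rank, doc_id in enumerate(results):
            # enumerate() is 0-based, so rank + 1 is the 1-based position
            scores[doc_id] = scores.get(doc_id, 0.0) + w / (k + rank + 1)
    return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)


vector_hits = ["doc_a", "doc_b", "doc_c"]   # semantic ranking
keyword_hits = ["doc_c", "doc_a", "doc_d"]  # full-text ranking
fused = rrf([vector_hits, keyword_hits], weights=[0.6, 0.4])
for doc_id, score in fused:
    print(f"{doc_id}: {score:.5f}")
# doc_a: 0.6/61 + 0.4/62 ≈ 0.01629
# doc_c: 0.6/63 + 0.4/61 ≈ 0.01608
# doc_b: 0.6/62          ≈ 0.00968
# doc_d: 0.4/63          ≈ 0.00635
```

Documents found by both retrievers (`doc_a`, `doc_c`) outrank `doc_b`, even though `doc_b` sits second in the vector list. Because RRF uses ranks only, the cosine scores and BM25 scores never need to be put on a common scale.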
69.4 Deployment
# k8s/hermes-kb-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: hermes-kb-agent
  namespace: knowledge-base
spec:
  replicas: 3
  selector:                      # required by apps/v1; must match the pod template labels
    matchLabels:
      app: hermes-kb-agent       # label name chosen for this example
  template:
    metadata:
      labels:
        app: hermes-kb-agent
    spec:
      containers:
        - name: agent
          image: company-registry/hermes-kb-agent:v1.2.0
          resources:
            requests: { memory: "1Gi", cpu: "500m" }
            limits: { memory: "2Gi", cpu: "2000m" }
          readinessProbe:
            httpGet: { path: /health, port: 8000 }
            initialDelaySeconds: 10
          envFrom:
            - secretRef:
                name: hermes-kb-secrets  # Managed via External Secrets Operator
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: hermes-kb-hpa
  namespace: knowledge-base      # must be in the same namespace as the Deployment it scales
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: hermes-kb-agent
  minReplicas: 3
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target: { type: Utilization, averageUtilization: 70 }
69.5 Five Real Pitfalls
Pitfall 1: Chunking Destroys Table Structure
Problem: Sliding-window chunking split tables mid-row, returning meaningless fragments.
Original table:
| Model | Voltage | Current | Temperature |
|-------|---------|---------|-------------|
| A-100 | 220V | 10A | -20~50°C |
After blind chunking:
Chunk 1: | Model | Voltage | Current | Temperature |
Chunk 2: | A-100 | 220V | 10A | -20~50°C |
↑ Each chunk is semantically meaningless alone
Fix: Table-aware chunking that keeps each table as an atomic unit:
chunks, current_text_buffer = [], []
for element in page_elements:
    if element.__class__.__name__ == "Table":
        chunks.append(element.text)  # Whole table as one chunk
    else:
        current_text_buffer.append(element.text)  # Chunked later by the sliding window
Lesson: Chunking must be content-aware. Never blindly split by token count.
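Keeping a table whole works until a table is too long for one chunk. One possible extension, sketched below under the assumption that the table is available as markdown-style rows, is to split by row groups and repeat the header in every chunk, so no chunk is a bare row without column names. The function name `split_table` and the B-200 and C-300 rows are hypothetical additions to the example table above.

```python
def split_table(rows: list[str], max_rows: int) -> list[str]:
    """Split a markdown-style table into chunks of at most max_rows body rows.

    rows[0] is the header and rows[1] the separator line; both are
    repeated at the top of every chunk.
    """
    header, body = rows[:2], rows[2:]
    return [
        "\n".join(header + body[i:i + max_rows])
        for i in range(0, len(body), max_rows)
    ]


table = [
    "| Model | Voltage | Current | Temperature |",
    "|-------|---------|---------|-------------|",
    "| A-100 | 220V | 10A | -20~50°C |",
    "| B-200 | 380V | 16A | -10~60°C |",  # hypothetical row
    "| C-300 | 110V | 5A | 0~40°C |",     # hypothetical row
]
table_chunks = split_table(table, max_rows=2)
for chunk in table_chunks:
    print(chunk, end="\n\n")
```

Each resulting chunk can be embedded and retrieved on its own, because a row such as `| C-300 | 110V | 5A | 0~40°C |` always travels with the header that says which value is the voltage.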
Pitfall 2: Mixed Chinese-English Degrades Embedding Quality
Problem: Documents with mixed Chinese-English (e.g., "A-100型号的Operating Temperature是-20~50°C") showed poor retrieval accuracy.
Root cause: Embedding models produce unstable semantic representations for code-switched text.
Fix:
import re

def normalize_mixed_text(text: str) -> str:
    """Insert spaces at Chinese-English boundaries to aid tokenization."""
    # Chinese character followed by a Latin letter or digit
    text = re.sub(r'([\u4e00-\u9fff])([A-Za-z0-9])', r'\1 \2', text)
    # Latin letter or digit followed by a Chinese character
    text = re.sub(r'([A-Za-z0-9])([\u4e00-\u9fff])', r'\1 \2', text)
    return text
Also consider bilingual dual-indexing: store both original and translated versions.
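Boundary normalization only helps if chunks and queries go through the same function before embedding; otherwise the two sides are tokenized differently. The sketch below restates the normalization so that it runs on its own and applies it to the example sentence from this pitfall and to a made-up user query.

```python
import re

CJK = r'[\u4e00-\u9fff]'


def normalize_mixed_text(text: str) -> str:
    """Insert a space wherever a CJK character touches a Latin letter or digit."""
    text = re.sub(rf'({CJK})([A-Za-z0-9])', r'\1 \2', text)
    text = re.sub(rf'([A-Za-z0-9])({CJK})', r'\1 \2', text)
    return text


chunk = "A-100型号的Operating Temperature是-20~50°C"
query = "A-100的Operating Temperature"  # hypothetical user query

normalized_chunk = normalize_mixed_text(chunk)
normalized_query = normalize_mixed_text(query)
print(normalized_chunk)  # A-100 型号的 Operating Temperature 是-20~50°C
print(normalized_query)  # A-100 的 Operating Temperature
```

Note that `是-20` is left untouched because `-` is neither a letter nor a digit. The function is idempotent, so applying it twice by accident is harmless.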
Pitfall 3: Agent "Invents" Non-Existent Citations
Problem: The agent cited "Product Safety Manual, Page 23" when that document didn't exist.
Root cause: When retrieval results are weak, the LLM fills the gap with plausible-looking citations.
Fix:
import json, re

def validate_citations(answer: str, tool_logs: list) -> str:
    """Replace any citation not found in actual tool call results."""
    valid = set()
    for log in tool_logs:
        try:
            rows = json.loads(log.get("result") or "[]")
        except (json.JSONDecodeError, TypeError):
            continue  # non-JSON tool output carries no citable rows
        if not isinstance(rows, list):
            continue
        valid.update(f"{r.get('doc_name')}:{r.get('page_number')}"
                     for r in rows if isinstance(r, dict))
    pattern = re.compile(r'\[Source: (.+?), Page (\d+)\]')
    for doc, page in pattern.findall(answer):
        if f"{doc}:{page}" not in valid:
            answer = answer.replace(f"[Source: {doc}, Page {page}]", "[Citation unverified]")
    return answer
Also add an explicit system prompt rule: "Never cite any document not returned by a tool call."
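A short, self-contained run shows the effect of citation validation on a mixed answer. This sketch restates the check in compact form (using a regex substitution callback rather than string replacement) so that it runs on its own. The tool log and the answer text are invented for the example, reusing the "Product Safety Manual, Page 23" case described above.

```python
import json
import re

CITATION = re.compile(r'\[Source: (.+?), Page (\d+)\]')


def validate_citations(answer: str, tool_logs: list[dict]) -> str:
    """Keep citations backed by a tool result; mark all others as unverified."""
    valid = set()
    for log in tool_logs:
        try:
            rows = json.loads(log.get("result") or "[]")
        except json.JSONDecodeError:
            continue
        if isinstance(rows, list):
            valid.update((r.get("doc_name"), str(r.get("page_number")))
                         for r in rows if isinstance(r, dict))

    def check(match: re.Match) -> str:
        return match.group(0) if match.groups() in valid else "[Citation unverified]"

    return CITATION.sub(check, answer)


tool_logs = [{
    "tool": "semantic_search",
    "result": json.dumps([{"doc_name": "A-100 Manual", "page_number": 12}]),
}]
answer = ("Rated voltage is 220V [Source: A-100 Manual, Page 12]. "
          "Wear insulated gloves [Source: Product Safety Manual, Page 23].")
checked = validate_citations(answer, tool_logs)
print(checked)
```

The first citation survives because page 12 of the A-100 Manual was actually retrieved; the second is replaced because no tool call ever returned it. Marking rather than deleting keeps the gap visible to the user and to the audit log.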
Pitfall 4: Popular Documents Crowd Out Long-Tail Documents
Problem: Frequently cited documents like "Company Policy Manual" dominated the results of almost every query, burying technically relevant but less-cited documents.
Fix:
from datetime import datetime, timezone

def apply_diversity_filter(results: list, max_per_doc: int = 2) -> list:
    counts, out = {}, []
    for r in results:
        if counts.get(r["doc_id"], 0) < max_per_doc:
            out.append(r)
            counts[r["doc_id"]] = counts.get(r["doc_id"], 0) + 1
    return out

def apply_freshness_boost(results: list, boost: float = 0.1) -> list:
    now = datetime.now(timezone.utc)
    for r in results:
        # Fall back to a fixed old date when updated_at is missing or empty
        updated = datetime.fromisoformat(r.get("updated_at") or "2020-01-01")
        if updated.tzinfo is None:
            updated = updated.replace(tzinfo=timezone.utc)  # treat naive timestamps as UTC
        age_days = (now - updated).days
        # Linear decay: full boost for a document updated today, none after one year
        r["score"] = r.get("score", 0) + max(0, boost * (1 - age_days / 365))
    return sorted(results, key=lambda x: x["score"], reverse=True)
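The freshness boost is a linear decay: a document updated today gains the full `boost`, and the bonus falls to zero once the document is a year old. The worked example below uses a fixed reference date so the arithmetic is reproducible; the two documents and their scores are invented for illustration.

```python
from datetime import datetime, timezone


def freshness_bonus(updated_at: str, now: datetime, boost: float = 0.1) -> float:
    """Linear decay from `boost` (updated today) to 0 (a year old or more)."""
    updated = datetime.fromisoformat(updated_at)
    if updated.tzinfo is None:
        updated = updated.replace(tzinfo=timezone.utc)  # treat naive dates as UTC
    age_days = (now - updated).days
    return max(0.0, boost * (1 - age_days / 365))


now = datetime(2025, 1, 1, tzinfo=timezone.utc)  # fixed "today" for the example
results = [
    {"doc_name": "Policy Manual", "score": 0.80, "updated_at": "2023-06-01"},
    {"doc_name": "A-100 Process Standard", "score": 0.75, "updated_at": "2024-10-20"},
]
for r in results:
    r["boosted"] = r["score"] + freshness_bonus(r["updated_at"], now)
ranked = sorted(results, key=lambda r: r["boosted"], reverse=True)
for r in ranked:
    print(f'{r["doc_name"]}: {r["boosted"]:.2f}')
# A-100 Process Standard: 73 days old, bonus 0.1 * (1 - 73/365) = 0.08, total 0.83
# Policy Manual: more than a year old, bonus 0, total 0.80
```

A boost of 0.1 is enough to flip two results whose base scores differ by 0.05, so the value should be tuned against the typical score gap between adjacent results rather than picked in isolation.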
Pitfall 5: Qdrant Connection Pool Exhaustion Under Load
Problem: At 200 concurrent users, Qdrant clients returned ConnectionPool exhausted. P99 response time spiked to 30+ seconds.
Fix:
import asyncio
from contextlib import asynccontextmanager
from qdrant_client import AsyncQdrantClient

class QdrantConnectionPool:
    def __init__(self, url: str, pool_size: int = 50):
        self._sem = asyncio.Semaphore(pool_size)  # caps concurrent searches
        self._client = AsyncQdrantClient(url=url, timeout=30)

    @asynccontextmanager
    async def acquire(self):
        # Guard only the wait for a slot, so a timeout raised by the caller's
        # own query is not misreported as pool exhaustion.
        try:
            await asyncio.wait_for(self._sem.acquire(), timeout=5.0)
        except asyncio.TimeoutError:
            raise RuntimeError("Qdrant pool exhausted; try again later") from None
        try:
            yield self._client
        finally:
            self._sem.release()

    async def search_with_retry(self, *args, retries: int = 3, **kwargs):
        for attempt in range(retries):
            try:
                async with self.acquire() as client:
                    return await client.search(*args, **kwargs)
            except Exception:
                if attempt == retries - 1:
                    raise
                await asyncio.sleep(0.5 * (attempt + 1))  # linear backoff
        return []  # only reached when retries <= 0
Key tuning: set pool_size to 2–3× peak expected concurrent searches, not concurrent HTTP requests.
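The effect of the semaphore can be checked without a Qdrant server. The sketch below replaces the real search with a short `asyncio.sleep` and records the peak number of simulated searches in flight; `run_bounded` and `fake_search` are illustrative names, not part of the project code.

```python
import asyncio


async def run_bounded(n_requests: int, pool_size: int) -> int:
    """Run n_requests simulated searches through a semaphore.

    Returns the peak number of searches that were in flight at once.
    """
    sem = asyncio.Semaphore(pool_size)
    in_flight = peak = 0

    async def fake_search():
        nonlocal in_flight, peak
        async with sem:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for the network round trip
            in_flight -= 1

    await asyncio.gather(*(fake_search() for _ in range(n_requests)))
    return peak


peak = asyncio.run(run_bounded(n_requests=200, pool_size=50))
print(f"peak concurrent searches: {peak}")
```

With 200 requests and a pool of 50, no more than 50 searches ever reach the backend at once; the other 150 wait on the semaphore. Each wave takes about one round trip, so the last request waits roughly four round trips in total, which is why the pool must be sized against search concurrency and latency rather than raw HTTP request count.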
Chapter Summary
This chapter landed RAG + Hermes Agent best practices in a complete enterprise project:
- Document processing: Unstructured.io for multi-format parsing, table-aware chunking with sliding window
- Vector store: Qdrant + OpenAI Embeddings, with department and permission filtering
- Hybrid retrieval: Vector + full-text + RRF fusion + Cohere reranking (15–25% improvement)
- Agent design: Tool permissions bound to user context, citation validation to prevent hallucination
- Five pitfalls: Table chunking, mixed-language embeddings, hallucinated citations, document popularity bias, connection pool exhaustion
Discussion Questions
- If an employee asks a cross-department question (they have access to Department A only, but the best answer is in Department B documents), how should the system respond? It must not leak Department B data, but it should help the user find the right person to contact.
- Query rewriting (generating 3 variants) adds LLM call cost. When is that extra cost justified? How would you design an adaptive strategy—skip rewriting for simple queries, apply it only for complex ones?
- Reranking improves accuracy but adds latency. Given a P95 < 5-second SLA, how would you balance accuracy against latency? What's your decision framework?
- If documents update very frequently (hundreds per day), how would you design an incremental update strategy that keeps the index current without interrupting the query service?