Chapter 69

Case Study: Intelligent Knowledge Base Assistant (RAG + Hermes)

Chapter 69: Case Study: Intelligent Knowledge Base Assistant (RAG + Hermes)

This chapter takes the theory from all preceding chapters and lands it in running code through a complete enterprise project: an internal document Q&A system built on Hermes Agent and a vector database. From requirements analysis to production deployment, we walk through every stepโ€”and honestly document the five real pitfalls encountered along the way.


69.1 Requirements Analysis

69.1.1 Business Context

Client: A manufacturing enterprise with 5,000+ employees
Pain points:

Goal: Build an internal knowledge base Q&A system so employees can ask questions in natural language and receive accurate answersโ€”with source citationsโ€”in seconds.

69.1.2 Requirements Matrix

Requirement Description Priority
Natural language Q&A Support Chinese/English mixed queries P0
Document citations Every answer must cite source doc and page P0
Real-time updates Doc changes reflected within 30 minutes P0
Multi-turn dialog Follow-up questions with context P1
Permission control Departments access only authorized docs P1
File types PDF, Word, Excel, PPT, Markdown P1
Response time P95 < 5 seconds P1
Concurrency 200 simultaneous users P2
Audit trail All queries logged and auditable P2

69.2 Architecture Design

69.2.1 System Architecture

User Query
    โ”‚
    โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                  Hermes Agent Layer                   โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”‚
โ”‚  โ”‚ Query        โ”‚  โ”‚ Tool         โ”‚  โ”‚ Answer     โ”‚  โ”‚
โ”‚  โ”‚ Understandingโ”‚  โ”‚ Orchestrationโ”‚  โ”‚ Synthesis  โ”‚  โ”‚
โ”‚  โ”‚ - Intent     โ”‚  โ”‚ - Strategy   โ”‚  โ”‚ - Citationsโ”‚  โ”‚
โ”‚  โ”‚ - Rewriting  โ”‚  โ”‚ - Multi-tool โ”‚  โ”‚ - Format   โ”‚  โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
    โ”‚              โ”‚              โ”‚
    โ–ผ              โ–ผ              โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ Qdrant  โ”‚  โ”‚Full-text โ”‚  โ”‚ Doc Metadata โ”‚
โ”‚ Vector  โ”‚  โ”‚ (ES)     โ”‚  โ”‚ (PostgreSQL) โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
    โ–ฒ              โ–ฒ
    โ”‚              โ”‚
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚              Document Processing Pipeline             โ”‚
โ”‚  Upload โ†’ Parse โ†’ Chunk โ†’ Embed โ†’ Store โ†’ Index     โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

69.2.2 Technology Choices

Component Choice Rationale
Agent framework Hermes Agent Precise tool calling, multi-step reasoning
Vector DB Qdrant Open-source, high-performance, supports filtering
Full-text search Elasticsearch Hybrid retrieval, handles exact matches
Embedding model text-embedding-3-large Strong multilingual performance
Document parsing Unstructured.io Multi-format support, layout-aware
Metadata storage PostgreSQL Version management, permission control
API layer FastAPI High-performance, async support
Deployment Kubernetes Elastic scaling

69.3 Complete Implementation

69.3.1 Document Processing Pipeline

# pipeline/document_processor.py

import asyncio, hashlib, re
from dataclasses import dataclass
from typing import Optional
import unstructured.partition.auto as auto_partition

@dataclass
class DocumentChunk:
    chunk_id: str
    doc_id: str
    doc_name: str
    page_number: int
    chunk_index: int
    content: str
    content_type: str        # text / table / image_caption
    department: str
    access_level: str
    created_at: str
    updated_at: str
    embedding: Optional[list[float]] = None

    @property
    def word_count(self) -> int:
        return len(self.content.split())

class DocumentProcessor:
    def __init__(self, chunk_size: int = 512, chunk_overlap: int = 64, min_chunk: int = 50):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.min_chunk = min_chunk

    async def process_document(self, file_path: str, metadata: dict) -> list[DocumentChunk]:
        elements = auto_partition.partition(
            filename=file_path,
            languages=["chi_sim", "eng"],
            strategy="hi_res",
            include_page_breaks=True,
        )
        grouped = self._group_by_page(elements)
        chunks, idx = [], 0

        for page_num, page_elements in grouped.items():
            # Table-aware chunking: keep tables whole
            for element in page_elements:
                if element.__class__.__name__ == "Table":
                    if element.text and len(element.text) >= self.min_chunk:
                        chunk_id = hashlib.sha256(
                            f"{metadata['doc_id']}:{idx}:{element.text[:50]}".encode()
                        ).hexdigest()[:16]
                        chunks.append(self._make_chunk(chunk_id, idx, page_num,
                                                        element.text, "table", metadata))
                        idx += 1

            # Text chunks with sliding window
            text = "\n".join(
                e.text for e in page_elements
                if e.__class__.__name__ != "Table" and getattr(e, "text", ""))
            if len(text) < self.min_chunk:
                continue

            for chunk_text in self._sliding_window(text):
                chunk_id = hashlib.sha256(
                    f"{metadata['doc_id']}:{idx}:{chunk_text[:50]}".encode()
                ).hexdigest()[:16]
                chunks.append(self._make_chunk(chunk_id, idx, page_num,
                                                chunk_text, "text", metadata))
                idx += 1

        return chunks

    def _make_chunk(self, chunk_id, idx, page, text, ctype, meta) -> DocumentChunk:
        return DocumentChunk(
            chunk_id=chunk_id, doc_id=meta["doc_id"], doc_name=meta["doc_name"],
            page_number=page, chunk_index=idx, content=text, content_type=ctype,
            department=meta.get("department", "general"),
            access_level=meta.get("access_level", "public"),
            created_at=meta.get("created_at", ""), updated_at=meta.get("updated_at", ""),
        )

    def _sliding_window(self, text: str) -> list[str]:
        sentences = re.split(r'(?<=[ใ€‚๏ผ๏ผŸ.!?])\s+', text)
        chunks, current, current_size = [], [], 0

        for sent in sentences:
            size = len(sent.split())
            if current_size + size > self.chunk_size and current:
                chunks.append(" ".join(current))
                # Keep overlap
                overlap, overlap_size = [], 0
                for s in reversed(current):
                    overlap_size += len(s.split())
                    if overlap_size > self.chunk_overlap: break
                    overlap.insert(0, s)
                current, current_size = overlap, sum(len(s.split()) for s in overlap)
            current.append(sent); current_size += size

        if current:
            chunks.append(" ".join(current))
        return chunks

    def _group_by_page(self, elements) -> dict:
        grouped, page = {}, 1
        for e in elements:
            if e.__class__.__name__ == "PageBreak":
                page += 1
            else:
                grouped.setdefault(page, []).append(e)
        return grouped

69.3.2 Vector Store with Qdrant

# pipeline/vector_store.py

from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct, Filter, FieldCondition, MatchValue
from openai import AsyncOpenAI
import asyncio
from typing import Optional

class VectorStore:
    COLLECTION = "enterprise_kb"
    DIM = 3072  # text-embedding-3-large

    def __init__(self, qdrant_url: str, openai_key: str):
        self.qdrant = QdrantClient(url=qdrant_url)
        self.oai = AsyncOpenAI(api_key=openai_key)
        self._ensure_collection()

    def _ensure_collection(self):
        existing = [c.name for c in self.qdrant.get_collections().collections]
        if self.COLLECTION not in existing:
            self.qdrant.create_collection(
                self.COLLECTION,
                vectors_config=VectorParams(size=self.DIM, distance=Distance.COSINE),
            )
            for field in ["department", "access_level", "doc_id"]:
                self.qdrant.create_payload_index(self.COLLECTION, field, "keyword")

    async def embed_chunks(self, chunks: list[DocumentChunk]) -> list[DocumentChunk]:
        BATCH = 100
        sem = asyncio.Semaphore(3)

        async def embed_batch(batch):
            async with sem:
                resp = await self.oai.embeddings.create(
                    input=[c.content for c in batch], model="text-embedding-3-large")
                for i, chunk in enumerate(batch):
                    chunk.embedding = resp.data[i].embedding
                return batch

        batches = [chunks[i:i+BATCH] for i in range(0, len(chunks), BATCH)]
        results = await asyncio.gather(*[embed_batch(b) for b in batches])
        return [c for batch in results for c in batch]

    def upsert_chunks(self, chunks: list[DocumentChunk]):
        points = [
            PointStruct(
                id=int(c.chunk_id[:8], 16),
                vector=c.embedding,
                payload={
                    "chunk_id": c.chunk_id, "doc_id": c.doc_id, "doc_name": c.doc_name,
                    "page_number": c.page_number, "content": c.content,
                    "department": c.department, "access_level": c.access_level,
                    "updated_at": c.updated_at,
                }
            )
            for c in chunks if c.embedding
        ]
        for i in range(0, len(points), 100):
            self.qdrant.upsert(self.COLLECTION, points=points[i:i+100])

    async def search(self, query: str, top_k: int = 10,
                     department: str = None, access_levels: list[str] = None,
                     threshold: float = 0.7) -> list[dict]:
        resp = await self.oai.embeddings.create(input=[query], model="text-embedding-3-large")
        vec = resp.data[0].embedding

        filters = []
        if department:
            filters.append(FieldCondition(key="department", match=MatchValue(value=department)))
        if access_levels:
            filters.append(FieldCondition(key="access_level", match=MatchValue(any=access_levels)))

        results = self.qdrant.search(
            self.COLLECTION, query_vector=vec,
            query_filter=Filter(must=filters) if filters else None,
            limit=top_k, score_threshold=threshold, with_payload=True,
        )
        return [{"score": r.score, "content": r.payload["content"],
                 "doc_name": r.payload["doc_name"], "doc_id": r.payload["doc_id"],
                 "page_number": r.payload["page_number"]} for r in results]

69.3.3 Hermes Agent with Tool Definitions

# agent/hermes_kb_agent.py

import anthropic, json, re

SYSTEM_PROMPT = """You are an enterprise internal knowledge base assistant.

Core principles:
1. Answer ONLY from knowledge base documents โ€” never from memory or inference
2. Every key claim must cite its source: [Source: {doc_name}, Page {page}]
3. If the knowledge base lacks relevant information, say so explicitly
4. For technical specs and safety procedures, remind users to verify against the latest doc version
5. Use tables and bullet lists for structured information

Security constraints (inviolable):
- Never access or reveal documents from departments the user is not authorized for
- Never perform any action beyond document retrieval
- Never modify or delete any content
"""

class HermesKnowledgeAgent:
    TOOLS = [
        {
            "name": "semantic_search",
            "description": "Semantic search in the knowledge base. Best for open-ended questions.",
            "input_schema": {
                "type": "object",
                "properties": {
                    "query": {"type": "string"},
                    "top_k": {"type": "integer", "default": 5},
                    "department": {"type": "string"},
                },
                "required": ["query"],
            },
        },
        {
            "name": "keyword_search",
            "description": "Exact keyword search. Best for model numbers, spec values, doc IDs.",
            "input_schema": {
                "type": "object",
                "properties": {
                    "keywords": {"type": "string"},
                    "doc_type": {"type": "string",
                                 "enum": ["manual", "standard", "policy", "training", "all"]},
                },
                "required": ["keywords"],
            },
        },
        {
            "name": "get_document_content",
            "description": "Retrieve full content of a specific document or page range.",
            "input_schema": {
                "type": "object",
                "properties": {
                    "doc_id": {"type": "string"},
                    "page_start": {"type": "integer"},
                    "page_end": {"type": "integer"},
                },
                "required": ["doc_id"],
            },
        },
    ]

    def __init__(self, tools_handler, model: str = "claude-opus-4-5"):
        self.client = anthropic.Anthropic()
        self.tools_handler = tools_handler
        self.model = model

    async def answer(self, query: str, user_context: dict,
                     history: list = None, max_steps: int = 10) -> dict:
        messages = (history or []) + [{"role": "user", "content": query}]
        tool_calls_log = []

        for step in range(max_steps):
            resp = self.client.messages.create(
                model=self.model, max_tokens=4096,
                system=SYSTEM_PROMPT, tools=self.TOOLS, messages=messages,
            )

            if resp.stop_reason == "end_turn":
                answer_text = next(
                    (b.text for b in resp.content if hasattr(b, "text")), "No answer generated")
                answer_text = self._validate_citations(answer_text, tool_calls_log)
                return {"answer": answer_text, "tool_calls": tool_calls_log,
                        "steps": step + 1, "sources": self._extract_sources(tool_calls_log)}

            elif resp.stop_reason == "tool_use":
                tool_results = []
                for block in resp.content:
                    if block.type == "tool_use":
                        result = await self.tools_handler.execute_tool(
                            block.name, block.input, user_context)
                        tool_calls_log.append({"step": step, "tool": block.name,
                                               "args": block.input, "result": result})
                        tool_results.append({"type": "tool_result",
                                             "tool_use_id": block.id, "content": result})
                messages.append({"role": "assistant", "content": resp.content})
                messages.append({"role": "user", "content": tool_results})

        return {"answer": "Max steps exceeded. Please simplify your question.",
                "tool_calls": tool_calls_log, "steps": max_steps, "sources": []}

    def _validate_citations(self, answer: str, tool_logs: list) -> str:
        valid = {
            f"{r.get('doc_name')}:{r.get('page_number')}"
            for log in tool_logs
            for r in (json.loads(log.get("result", "[]")) if log.get("result") else [])
            if isinstance(r, dict)
        }
        pattern = re.compile(r'\[Source: (.+?), Page (\d+)\]')
        for doc, page in pattern.findall(answer):
            if f"{doc}:{page}" not in valid:
                answer = answer.replace(f"[Source: {doc}, Page {page}]", "[Citation unverified]")
        return answer

    def _extract_sources(self, logs: list) -> list[dict]:
        sources, seen = [], set()
        for log in logs:
            if log["tool"] in ["semantic_search", "keyword_search"]:
                try:
                    for r in json.loads(log.get("result", "[]")):
                        key = f"{r.get('doc_id')}:{r.get('page_number')}"
                        if key not in seen:
                            sources.append({"doc_name": r.get("doc_name"),
                                           "doc_id": r.get("doc_id"),
                                           "page": r.get("page_number")})
                            seen.add(key)
                except json.JSONDecodeError:
                    pass
        return sources

69.3.4 Hybrid Retrieval

class HybridRetriever:
    """Vector search + full-text search + RRF fusion + reranking (15โ€“25% better than vector-only)"""

    def __init__(self, vector_store: VectorStore, es_client):
        self.vs = vector_store
        self.es = es_client

    async def retrieve(self, query: str, top_k: int = 10,
                       alpha: float = 0.6, user_context: dict = None) -> list[dict]:
        vec_results, fts_results = await asyncio.gather(
            self.vs.search(query, top_k=top_k * 2),
            self._fts(query, top_k=top_k * 2),
        )
        fused = self._rrf([vec_results, fts_results], weights=[alpha, 1 - alpha])
        reranked = await self._rerank(query, fused[:top_k * 2])
        return self._diversity_filter(reranked[:top_k], max_per_doc=2)

    def _rrf(self, lists: list, weights: list, k: int = 60) -> list[dict]:
        scores = {}
        for results, w in zip(lists, weights):
            for rank, r in enumerate(results):
                key = f"{r['doc_id']}:{r.get('page_number', 0)}"
                scores.setdefault(key, {**r, "rrf": 0})["rrf"] += w / (k + rank + 1)
        return sorted(scores.values(), key=lambda x: x["rrf"], reverse=True)

    async def _rerank(self, query: str, candidates: list) -> list:
        import cohere
        co = cohere.Client()
        results = co.rerank(model="rerank-multilingual-v3.0", query=query,
                            documents=[c["content"] for c in candidates],
                            top_n=len(candidates))
        reranked = []
        for r in results.results:
            c = dict(candidates[r.index]); c["rerank_score"] = r.relevance_score
            reranked.append(c)
        return sorted(reranked, key=lambda x: x["rerank_score"], reverse=True)

    def _diversity_filter(self, results: list, max_per_doc: int = 2) -> list:
        counts, out = {}, []
        for r in results:
            did = r["doc_id"]
            if counts.get(did, 0) < max_per_doc:
                out.append(r); counts[did] = counts.get(did, 0) + 1
        return out

69.4 Deployment

# k8s/hermes-kb-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: hermes-kb-agent
  namespace: knowledge-base
spec:
  replicas: 3
  template:
    spec:
      containers:
      - name: agent
        image: company-registry/hermes-kb-agent:v1.2.0
        resources:
          requests: { memory: "1Gi", cpu: "500m" }
          limits:   { memory: "2Gi", cpu: "2000m" }
        readinessProbe:
          httpGet: { path: /health, port: 8000 }
          initialDelaySeconds: 10
        envFrom:
        - secretRef:
            name: hermes-kb-secrets  # Managed via External Secrets Operator
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: hermes-kb-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: hermes-kb-agent
  minReplicas: 3
  maxReplicas: 20
  metrics:
  - type: Resource
    resource:
      name: cpu
      target: { type: Utilization, averageUtilization: 70 }

69.5 Five Real Pitfalls

Pitfall 1: Chunking Destroys Table Structure

Problem: Sliding-window chunking split tables mid-row, returning meaningless fragments.

Original table:
| Model | Voltage | Current | Temperature |
|-------|---------|---------|-------------|
| A-100 | 220V    | 10A     | -20~50ยฐC   |

After blind chunking:
Chunk 1: | Model | Voltage | Current | Temperature |
Chunk 2: | A-100 | 220V    | 10A     | -20~50ยฐC   |
โ†‘ Each chunk is semantically meaningless alone

Fix: Table-aware chunkingโ€”keep tables as atomic units:

for element in page_elements:
    if element.__class__.__name__ == "Table":
        chunks.append(element.text)  # Whole table as one chunk
    else:
        current_text_buffer.append(element.text)

Lesson: Chunking must be content-aware. Never blindly split by token count.


Pitfall 2: Mixed Chinese-English Degrades Embedding Quality

Problem: Documents with mixed Chinese-English (e.g., "A-100ๅž‹ๅท็š„Operating Temperatureๆ˜ฏ-20~50ยฐC") showed poor retrieval accuracy.

Root cause: Embedding models produce unstable semantic representations for code-switched text.

Fix:

def normalize_mixed_text(text: str) -> str:
    """Insert spaces at Chinese-English boundaries to aid tokenization."""
    text = re.sub(r'([\u4e00-\u9fff])([A-Za-z0-9])', r'\1 \2', text)
    text = re.sub(r'([A-Za-z0-9])([\u4e00-\u9fff])', r'\1 \2', text)
    return text

Also consider bilingual dual-indexing: store both original and translated versions.


Pitfall 3: Agent "Invents" Non-Existent Citations

Problem: The agent cited "Product Safety Manual, Page 23" when that document didn't exist.

Root cause: When retrieval results are weak, the LLM completes "plausible" looking citations.

Fix:

def validate_citations(answer: str, tool_logs: list) -> str:
    """Strip any citations not found in actual tool call results."""
    valid = {f"{r['doc_name']}:{r['page_number']}"
             for log in tool_logs for r in json.loads(log.get("result", "[]"))
             if isinstance(r, dict)}

    pattern = re.compile(r'\[Source: (.+?), Page (\d+)\]')
    for doc, page in pattern.findall(answer):
        if f"{doc}:{page}" not in valid:
            answer = answer.replace(f"[Source: {doc}, Page {page}]", "[Citation unverified]")
    return answer

Also add an explicit system prompt rule: "Never cite any document not returned by a tool call."


Problem: Frequently-cited documents like "Company Policy Manual" dominated almost every query's results, burying technically-relevant but less-cited documents.

Fix:

def apply_diversity_filter(results: list, max_per_doc: int = 2) -> list:
    counts, out = {}, []
    for r in results:
        if counts.get(r["doc_id"], 0) < max_per_doc:
            out.append(r); counts[r["doc_id"]] = counts.get(r["doc_id"], 0) + 1
    return out

def apply_freshness_boost(results: list, boost: float = 0.1) -> list:
    from datetime import datetime
    now = datetime.utcnow()
    for r in results:
        age_days = (now - datetime.fromisoformat(r.get("updated_at", "2020-01-01"))).days
        r["score"] = r.get("score", 0) + max(0, boost * (1 - age_days / 365))
    return sorted(results, key=lambda x: x["score"], reverse=True)

Pitfall 5: Qdrant Connection Pool Exhaustion Under Load

Problem: At 200 concurrent users, Qdrant clients returned ConnectionPool exhausted. P99 response time spiked to 30+ seconds.

Fix:

from contextlib import asynccontextmanager
from qdrant_client import AsyncQdrantClient

class QdrantConnectionPool:
    def __init__(self, url: str, pool_size: int = 50):
        self._sem = asyncio.Semaphore(pool_size)
        self._client = AsyncQdrantClient(url=url, timeout=30)

    @asynccontextmanager
    async def acquire(self):
        try:
            await asyncio.wait_for(self._sem.acquire(), timeout=5.0)
            try:
                yield self._client
            finally:
                self._sem.release()
        except asyncio.TimeoutError:
            raise RuntimeError("Qdrant pool exhausted โ€” try again later")

    async def search_with_retry(self, *args, retries: int = 3, **kwargs):
        for attempt in range(retries):
            try:
                async with self.acquire() as client:
                    return await client.search(*args, **kwargs)
            except Exception:
                if attempt == retries - 1: raise
                await asyncio.sleep(0.5 * (attempt + 1))
        return []

Key tuning: set pool_size to 2โ€“3ร— peak expected concurrent searches, not concurrent HTTP requests.


Chapter Summary

This chapter landed RAG + Hermes Agent best practices in a complete enterprise project:

  1. Document processing: Unstructured.io for multi-format parsing, table-aware chunking with sliding window
  2. Vector store: Qdrant + OpenAI Embeddings, with department and permission filtering
  3. Hybrid retrieval: Vector + full-text + RRF fusion + Cohere reranking (15โ€“25% improvement)
  4. Agent design: Tool permissions bound to user context, citation validation to prevent hallucination
  5. Five pitfalls: Table chunking, mixed-language embeddings, hallucinated citations, document popularity bias, connection pool exhaustion

Discussion Questions

  1. If an employee asks a cross-department question (they have access to Department A only, but the best answer is in Department B documents), how should the system respond? It must not leak Department B data, but it should help the user find the right person to contact.
  2. Query rewriting (generating 3 variants) adds LLM call cost. When is that extra cost justified? How would you design an adaptive strategyโ€”skip rewriting for simple queries, apply it only for complex ones?
  3. Reranking improves accuracy but adds latency. Given a P95 < 5-second SLA, how would you balance accuracy against latency? What's your decision framework?
  4. If documents update very frequently (hundreds per day), how would you design an incremental update strategy that keeps the index current without interrupting the query service?
Rate this chapter
4.7  / 5  (3 ratings)

๐Ÿ’ฌ Comments