第 49 章

负载均衡与横向扩展

第49章:负载均衡与横向扩展

导语

当 Hermes Agent 从实验室走向生产环境,单实例部署迅速成为瓶颈。随着并发请求量上升,响应延迟攀升,错误率开始出现,系统稳定性岌岌可危。本章系统讲解如何通过 Nginx/HAProxy 反向代理、无状态化设计、Redis 共享记忆层等技术手段,将 Hermes Agent 扩展为高可用的分布式服务。


49.1 为什么 Agent 扩展比普通 Web 服务更难

传统 Web 服务的扩展相对简单:无状态 HTTP 服务可以任意复制,负载均衡器将请求均匀分发即可。但 Hermes Agent 存在几个关键差异:

1. 会话状态的持久性

Agent 在执行多步任务时,维护着大量中间状态:

如果请求 A 在实例 1 上建立会话,请求 B 被转发到实例 2,则实例 2 找不到任何上下文,任务将从零开始或直接失败。

2. 长连接与流式响应

Hermes Agent 通常使用 SSE(Server-Sent Events)或 WebSocket 向客户端流式返回 Token。这要求负载均衡器在整个 Agent 执行期间(可能长达数分钟)保持连接稳定。

3. 工具调用的副作用

Agent 调用工具(如写文件、发邮件、查数据库)时,这些操作不可简单重试。如果实例崩溃导致任务被重新调度到另一实例,可能产生重复操作。

flowchart TD
    Client[客户端] --> LB[负载均衡器]
    LB --> A1[Agent 实例 1]
    LB --> A2[Agent 实例 2]
    LB --> A3[Agent 实例 3]
    A1 --> Redis[(Redis 共享记忆)]
    A2 --> Redis
    A3 --> Redis
    A1 --> Tools[工具层]
    A2 --> Tools
    A3 --> Tools
    Redis --> PG[(PostgreSQL 持久化)]

49.2 Nginx 反向代理配置

基础配置

# /etc/nginx/conf.d/hermes-agent.conf

upstream hermes_backend {
    # 会话亲和性:基于 session_id cookie
    ip_hash;
    
    server 10.0.1.10:8000 weight=1 max_fails=3 fail_timeout=30s;
    server 10.0.1.11:8000 weight=1 max_fails=3 fail_timeout=30s;
    server 10.0.1.12:8000 weight=1 max_fails=3 fail_timeout=30s;
    
    keepalive 32;
}

server {
    listen 80;
    server_name agent.example.com;
    
    # SSE 长连接优化
    proxy_read_timeout 300s;
    proxy_send_timeout 300s;
    proxy_connect_timeout 10s;
    
    # 禁用缓冲(SSE 必需)
    proxy_buffering off;
    proxy_cache off;
    
    location /api/agent/ {
        proxy_pass http://hermes_backend;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        
        # 会话亲和性 Cookie
        proxy_set_header Cookie $http_cookie;
    }
    
    # 健康检查端点
    location /health {
        proxy_pass http://hermes_backend/health;
        access_log off;
    }
}
upstream hermes_backend {
    # 使用 sticky 模块实现 Cookie 亲和性
    sticky cookie srv_id expires=1h domain=.example.com path=/;
    
    server 10.0.1.10:8000;
    server 10.0.1.11:8000;
    server 10.0.1.12:8000;
}

注意:sticky 指令需要 Nginx Plus 或第三方模块 nginx-sticky-module-ng


49.3 HAProxy 配置(推荐用于生产)

HAProxy 对长连接和健康检查的支持比开源 Nginx 更成熟:

# /etc/haproxy/haproxy.cfg

global
    maxconn 50000
    log /dev/log local0
    chroot /var/lib/haproxy
    stats socket /run/haproxy/admin.sock mode 660 level admin
    user haproxy
    group haproxy
    daemon

defaults
    log     global
    mode    http
    option  httplog
    option  dontlognull
    timeout connect 5s
    timeout client  300s   # Agent 长任务需要足够长的超时
    timeout server  300s
    timeout tunnel  1h     # WebSocket/SSE 隧道超时

frontend hermes_frontend
    bind *:80
    bind *:443 ssl crt /etc/ssl/certs/agent.pem
    
    # 基于 session_id 头部的亲和性
    use_backend hermes_backend
    
    # 捕获 session_id 用于日志
    http-request capture req.hdr(X-Session-Id) len 64

backend hermes_backend
    balance leastconn    # 最少连接,适合长任务
    
    # Cookie 亲和性
    cookie SERVERID insert indirect nocache
    
    # 健康检查
    option httpchk GET /health
    http-check expect status 200
    
    server agent1 10.0.1.10:8000 check cookie agent1 inter 5s rise 2 fall 3
    server agent2 10.0.1.11:8000 check cookie agent2 inter 5s rise 2 fall 3
    server agent3 10.0.1.12:8000 check cookie agent3 inter 5s rise 2 fall 3

# 统计页面
listen stats
    bind *:8404
    stats enable
    stats uri /stats
    stats refresh 30s
    stats auth admin:your_password

49.4 无状态化设计方案

解决 Session 亲和性问题的根本方法是无状态化:将所有状态外移到共享存储,使每个请求可以被任意实例处理。

状态分类与存储策略

状态类型 大小 更新频率 推荐存储
对话历史(消息列表) 中(~10KB) 每轮更新 Redis(TTL 24h)
Agent 执行状态 小(~1KB) 每步更新 Redis
工具调用结果缓存 可变(1KB~10MB) 只写 Redis + S3
长期记忆/知识库 大(MB~GB) 低频 PostgreSQL / Qdrant
用户偏好设置 小(~1KB) 低频 PostgreSQL

无状态 Agent 实现

# stateless_agent.py
import json
import uuid
from typing import Optional
import redis.asyncio as aioredis
from hermes import HermesAgent, AgentConfig
from dataclasses import dataclass, asdict

@dataclass
class AgentSession:
    """可序列化的 Agent 会话状态"""
    session_id: str
    messages: list
    current_step: int
    tool_results: dict
    metadata: dict
    
    def to_json(self) -> str:
        return json.dumps(asdict(self), ensure_ascii=False)
    
    @classmethod
    def from_json(cls, data: str) -> 'AgentSession':
        return cls(**json.loads(data))


class StatelessHermesAgent:
    """无状态 Hermes Agent,所有状态存储在 Redis"""
    
    SESSION_TTL = 86400  # 24小时
    SESSION_PREFIX = "hermes:session:"
    
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_url = redis_url
        self._redis: Optional[aioredis.Redis] = None
        self.agent_config = AgentConfig(
            model="NousResearch/Hermes-3-Llama-3.1-8B",
            max_steps=20,
            tools=["web_search", "code_exec", "file_read"]
        )
    
    async def _get_redis(self) -> aioredis.Redis:
        if self._redis is None:
            self._redis = await aioredis.from_url(
                self.redis_url,
                encoding="utf-8",
                decode_responses=True
            )
        return self._redis
    
    async def create_session(self, user_id: str) -> str:
        """创建新的 Agent 会话,返回 session_id"""
        session_id = f"{user_id}:{uuid.uuid4().hex}"
        session = AgentSession(
            session_id=session_id,
            messages=[],
            current_step=0,
            tool_results={},
            metadata={"user_id": user_id, "created_at": str(__import__("time").time())}
        )
        
        redis = await self._get_redis()
        key = f"{self.SESSION_PREFIX}{session_id}"
        await redis.setex(key, self.SESSION_TTL, session.to_json())
        
        return session_id
    
    async def load_session(self, session_id: str) -> Optional[AgentSession]:
        """从 Redis 加载会话状态"""
        redis = await self._get_redis()
        key = f"{self.SESSION_PREFIX}{session_id}"
        data = await redis.get(key)
        
        if data is None:
            return None
        
        # 续期 TTL
        await redis.expire(key, self.SESSION_TTL)
        return AgentSession.from_json(data)
    
    async def save_session(self, session: AgentSession) -> None:
        """保存会话状态到 Redis"""
        redis = await self._get_redis()
        key = f"{self.SESSION_PREFIX}{session.session_id}"
        await redis.setex(key, self.SESSION_TTL, session.to_json())
    
    async def run_step(self, session_id: str, user_message: str) -> dict:
        """
        执行一步 Agent 推理。
        无论哪个实例处理请求,都能从 Redis 恢复完整状态。
        """
        # 1. 加载状态
        session = await self.load_session(session_id)
        if session is None:
            raise ValueError(f"Session {session_id} not found")
        
        # 2. 添加用户消息
        session.messages.append({
            "role": "user",
            "content": user_message
        })
        
        # 3. 执行 Agent 推理(本地无状态调用)
        agent = HermesAgent(self.agent_config)
        result = await agent.step(
            messages=session.messages,
            tool_results=session.tool_results,
            step_number=session.current_step
        )
        
        # 4. 更新状态
        session.messages.append({
            "role": "assistant",
            "content": result.content
        })
        session.current_step += 1
        if result.tool_calls:
            for tc in result.tool_calls:
                session.tool_results[tc.id] = tc.result
        
        # 5. 持久化状态
        await self.save_session(session)
        
        return {
            "session_id": session_id,
            "step": session.current_step,
            "content": result.content,
            "tool_calls": result.tool_calls,
            "is_final": result.is_final
        }
    
    async def delete_session(self, session_id: str) -> None:
        """清理会话"""
        redis = await self._get_redis()
        key = f"{self.SESSION_PREFIX}{session_id}"
        await redis.delete(key)


# FastAPI 应用示例
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()
agent_service = StatelessHermesAgent(redis_url="redis://redis-cluster:6379")

@app.post("/api/sessions")
async def create_session(user_id: str):
    session_id = await agent_service.create_session(user_id)
    return {"session_id": session_id}

@app.post("/api/sessions/{session_id}/run")
async def run_agent(session_id: str, message: str):
    try:
        result = await agent_service.run_step(session_id, message)
        return result
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e))

@app.get("/health")
async def health():
    return {"status": "ok"}

49.5 Redis 共享记忆层设计

Redis 在这个架构中承担多重职责:

数据结构选择

# redis_memory.py
import redis.asyncio as aioredis
import json
from typing import Any, Optional

class HermesRedisMemory:
    """
    Hermes Agent 的 Redis 共享记忆层
    支持多实例共享、原子操作、TTL 管理
    """
    
    def __init__(self, redis_url: str):
        self.redis = aioredis.from_url(redis_url)
    
    # ─── 消息历史(List 结构)───────────────────────────────
    
    async def append_message(self, session_id: str, message: dict) -> int:
        """追加消息,返回当前消息数量"""
        key = f"hermes:msg:{session_id}"
        length = await self.redis.rpush(key, json.dumps(message, ensure_ascii=False))
        await self.redis.expire(key, 86400)
        return length
    
    async def get_messages(self, session_id: str, limit: int = -1) -> list:
        """获取消息历史,limit=-1 表示全部"""
        key = f"hermes:msg:{session_id}"
        start = 0 if limit == -1 else max(0, -limit)
        items = await self.redis.lrange(key, start, -1)
        return [json.loads(item) for item in items]
    
    async def trim_messages(self, session_id: str, keep_last: int = 50) -> None:
        """裁剪消息历史,防止无限增长"""
        key = f"hermes:msg:{session_id}"
        await self.redis.ltrim(key, -keep_last, -1)
    
    # ─── 工具结果缓存(Hash 结构)──────────────────────────
    
    async def cache_tool_result(
        self, 
        tool_call_id: str, 
        result: Any, 
        ttl: int = 3600
    ) -> None:
        """缓存工具调用结果(幂等保护)"""
        key = f"hermes:tool:{tool_call_id}"
        await self.redis.setex(key, ttl, json.dumps(result, ensure_ascii=False))
    
    async def get_tool_result(self, tool_call_id: str) -> Optional[Any]:
        """获取缓存的工具结果"""
        key = f"hermes:tool:{tool_call_id}"
        data = await self.redis.get(key)
        return json.loads(data) if data else None
    
    # ─── 分布式锁(防止并发冲突)──────────────────────────
    
    async def acquire_session_lock(
        self, 
        session_id: str, 
        timeout: int = 30
    ) -> bool:
        """
        获取会话锁,防止同一会话被多个实例并发处理
        返回 True 表示成功获取锁
        """
        key = f"hermes:lock:{session_id}"
        result = await self.redis.set(key, "1", nx=True, ex=timeout)
        return result is True
    
    async def release_session_lock(self, session_id: str) -> None:
        """释放会话锁"""
        key = f"hermes:lock:{session_id}"
        await self.redis.delete(key)
    
    # ─── 全局统计计数器──────────────────────────────────────
    
    async def increment_counter(self, metric: str) -> int:
        """原子递增计数器"""
        key = f"hermes:counter:{metric}"
        return await self.redis.incr(key)
    
    async def get_active_sessions(self) -> int:
        """获取当前活跃会话数量"""
        pattern = "hermes:msg:*"
        keys = await self.redis.keys(pattern)
        return len(keys)

Redis 集群配置(生产推荐)

# docker-compose.yml 中的 Redis Sentinel 配置
services:
  redis-master:
    image: redis:7-alpine
    command: redis-server --appendonly yes --maxmemory 4gb --maxmemory-policy allkeys-lru
    volumes:
      - redis-data:/data
    
  redis-replica-1:
    image: redis:7-alpine
    command: redis-server --replicaof redis-master 6379
    depends_on:
      - redis-master
  
  redis-sentinel:
    image: redis:7-alpine
    command: >
      redis-sentinel /etc/sentinel.conf
    volumes:
      - ./sentinel.conf:/etc/sentinel.conf

49.6 扩展瓶颈分析与解决方案

瓶颈识别

# bottleneck_analyzer.py
import time
import asyncio
from collections import defaultdict
from typing import Dict, List
import statistics

class BottleneckAnalyzer:
    """分析 Hermes Agent 集群的性能瓶颈"""
    
    def __init__(self):
        self.latencies: Dict[str, List[float]] = defaultdict(list)
        self.error_counts: Dict[str, int] = defaultdict(int)
    
    def record_latency(self, component: str, latency_ms: float):
        self.latencies[component].append(latency_ms)
        # 只保留最近 1000 条
        if len(self.latencies[component]) > 1000:
            self.latencies[component].pop(0)
    
    def get_report(self) -> dict:
        report = {}
        for component, lats in self.latencies.items():
            if lats:
                report[component] = {
                    "p50": statistics.median(lats),
                    "p95": sorted(lats)[int(len(lats) * 0.95)],
                    "p99": sorted(lats)[int(len(lats) * 0.99)],
                    "mean": statistics.mean(lats),
                    "count": len(lats)
                }
        return report
    
    def identify_bottleneck(self) -> str:
        """识别主要瓶颈"""
        report = self.get_report()
        
        bottlenecks = []
        thresholds = {
            "llm_inference": 5000,    # LLM 推理 >5s
            "redis_read": 10,          # Redis 读取 >10ms
            "redis_write": 20,         # Redis 写入 >20ms
            "tool_execution": 2000,    # 工具执行 >2s
        }
        
        for component, threshold in thresholds.items():
            if component in report:
                if report[component]["p95"] > threshold:
                    bottlenecks.append(
                        f"{component}: P95={report[component]['p95']:.0f}ms (threshold={threshold}ms)"
                    )
        
        return "\n".join(bottlenecks) if bottlenecks else "No bottlenecks detected"

常见瓶颈与解决方案

瓶颈位置 症状 解决方案
LLM 推理层 高延迟、GPU 利用率 100% 增加 GPU 节点、模型量化(AWQ/GPTQ)、请求批处理
Redis 层 Redis CPU 高、读写延迟上升 Redis Cluster 分片、读写分离(Replica 分担读)
网络 I/O 工具调用慢、外部 API 超时 连接池复用、异步并发调用、本地缓存
上下文长度 长任务后期推理变慢 上下文压缩(摘要)、滑动窗口、外部记忆
工具并发 多工具调用串行等待 并行工具调用(asyncio.gather

49.7 动态扩缩容(Kubernetes HPA)

# hermes-agent-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: hermes-agent
spec:
  replicas: 3
  selector:
    matchLabels:
      app: hermes-agent
  template:
    metadata:
      labels:
        app: hermes-agent
    spec:
      containers:
      - name: hermes-agent
        image: yourorg/hermes-agent:latest
        resources:
          requests:
            memory: "4Gi"
            cpu: "2"
            nvidia.com/gpu: "1"
          limits:
            memory: "8Gi"
            cpu: "4"
            nvidia.com/gpu: "1"
        env:
        - name: REDIS_URL
          valueFrom:
            secretKeyRef:
              name: hermes-secrets
              key: redis-url
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 60
          periodSeconds: 30

---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: hermes-agent-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: hermes-agent
  minReplicas: 2
  maxReplicas: 20
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Pods
    pods:
      metric:
        name: hermes_active_sessions
      target:
        type: AverageValue
        averageValue: "50"  # 每个实例最多 50 个并发会话

小结

本章覆盖了 Hermes Agent 生产环境横向扩展的完整方案:

  1. 理解挑战:Agent 的会话状态、长连接、工具副作用使扩展比普通 Web 服务复杂得多。
  2. Nginx/HAProxy:通过 ip_hashsticky cookie 实现会话亲和性,是快速可用的临时方案。
  3. 无状态化:将所有状态外移到 Redis 是长期正解,使任意实例都能处理任意请求。
  4. Redis 设计:合理选择数据结构(List/Hash/String)、设置 TTL、使用分布式锁防并发。
  5. 瓶颈识别:LLM 推理通常是最大瓶颈,其次是 Redis I/O 和工具调用延迟。
  6. Kubernetes HPA:结合自定义指标(活跃会话数)实现弹性扩缩容。

思考题

  1. 当 Redis 发生故障时,无状态化设计的 Agent 集群会有什么影响?如何设计降级方案?
  2. 对于需要长期记忆(跨会话)的 Agent,Redis TTL 策略如何制定?是否需要引入向量数据库?
  3. 如果工具调用是幂等的,是否可以完全取消 Session 亲和性?哪些类型的工具天然不幂等?
  4. leastconnip_hash 两种负载均衡策略各适合什么场景?对 Agent 服务的影响有何不同?
本章评分
4.7  / 5  (3 评分)

💬 留言讨论