第 49 章
负载均衡与横向扩展
第49章:负载均衡与横向扩展
导语
当 Hermes Agent 从实验室走向生产环境,单实例部署迅速成为瓶颈。随着并发请求量上升,响应延迟攀升,错误率开始出现,系统稳定性岌岌可危。本章系统讲解如何通过 Nginx/HAProxy 反向代理、无状态化设计、Redis 共享记忆层等技术手段,将 Hermes Agent 扩展为高可用的分布式服务。
49.1 为什么 Agent 扩展比普通 Web 服务更难
传统 Web 服务的扩展相对简单:无状态 HTTP 服务可以任意复制,负载均衡器将请求均匀分发即可。但 Hermes Agent 存在几个关键差异:
1. 会话状态的持久性
Agent 在执行多步任务时,维护着大量中间状态:
- 当前执行步骤(第几轮 ReAct 循环)
- 工具调用历史与结果
- 对话上下文(消息列表)
- 用户偏好与授权令牌
如果请求 A 在实例 1 上建立会话,请求 B 被转发到实例 2,则实例 2 找不到任何上下文,任务将从零开始或直接失败。
2. 长连接与流式响应
Hermes Agent 通常使用 SSE(Server-Sent Events)或 WebSocket 向客户端流式返回 Token。这要求负载均衡器在整个 Agent 执行期间(可能长达数分钟)保持连接稳定。
3. 工具调用的副作用
Agent 调用工具(如写文件、发邮件、查数据库)时,这些操作不可简单重试。如果实例崩溃导致任务被重新调度到另一实例,可能产生重复操作。
flowchart TD
Client[客户端] --> LB[负载均衡器]
LB --> A1[Agent 实例 1]
LB --> A2[Agent 实例 2]
LB --> A3[Agent 实例 3]
A1 --> Redis[(Redis 共享记忆)]
A2 --> Redis
A3 --> Redis
A1 --> Tools[工具层]
A2 --> Tools
A3 --> Tools
Redis --> PG[(PostgreSQL 持久化)]
49.2 Nginx 反向代理配置
基础配置
# /etc/nginx/conf.d/hermes-agent.conf
upstream hermes_backend {
# 会话亲和性:基于 session_id cookie
ip_hash;
server 10.0.1.10:8000 weight=1 max_fails=3 fail_timeout=30s;
server 10.0.1.11:8000 weight=1 max_fails=3 fail_timeout=30s;
server 10.0.1.12:8000 weight=1 max_fails=3 fail_timeout=30s;
keepalive 32;
}
server {
listen 80;
server_name agent.example.com;
# SSE 长连接优化
proxy_read_timeout 300s;
proxy_send_timeout 300s;
proxy_connect_timeout 10s;
# 禁用缓冲(SSE 必需)
proxy_buffering off;
proxy_cache off;
location /api/agent/ {
proxy_pass http://hermes_backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
# 会话亲和性 Cookie
proxy_set_header Cookie $http_cookie;
}
# 健康检查端点
location /health {
proxy_pass http://hermes_backend/health;
access_log off;
}
}
基于 Cookie 的会话亲和性(更精确)
upstream hermes_backend {
# 使用 sticky 模块实现 Cookie 亲和性
sticky cookie srv_id expires=1h domain=.example.com path=/;
server 10.0.1.10:8000;
server 10.0.1.11:8000;
server 10.0.1.12:8000;
}
注意:
sticky指令需要 Nginx Plus 或第三方模块nginx-sticky-module-ng。
49.3 HAProxy 配置(推荐用于生产)
HAProxy 对长连接和健康检查的支持比开源 Nginx 更成熟:
# /etc/haproxy/haproxy.cfg
global
maxconn 50000
log /dev/log local0
chroot /var/lib/haproxy
stats socket /run/haproxy/admin.sock mode 660 level admin
user haproxy
group haproxy
daemon
defaults
log global
mode http
option httplog
option dontlognull
timeout connect 5s
timeout client 300s # Agent 长任务需要足够长的超时
timeout server 300s
timeout tunnel 1h # WebSocket/SSE 隧道超时
frontend hermes_frontend
bind *:80
bind *:443 ssl crt /etc/ssl/certs/agent.pem
# 基于 session_id 头部的亲和性
use_backend hermes_backend
# 捕获 session_id 用于日志
http-request capture req.hdr(X-Session-Id) len 64
backend hermes_backend
balance leastconn # 最少连接,适合长任务
# Cookie 亲和性
cookie SERVERID insert indirect nocache
# 健康检查
option httpchk GET /health
http-check expect status 200
server agent1 10.0.1.10:8000 check cookie agent1 inter 5s rise 2 fall 3
server agent2 10.0.1.11:8000 check cookie agent2 inter 5s rise 2 fall 3
server agent3 10.0.1.12:8000 check cookie agent3 inter 5s rise 2 fall 3
# 统计页面
listen stats
bind *:8404
stats enable
stats uri /stats
stats refresh 30s
stats auth admin:your_password
49.4 无状态化设计方案
解决 Session 亲和性问题的根本方法是无状态化:将所有状态外移到共享存储,使每个请求可以被任意实例处理。
状态分类与存储策略
| 状态类型 | 大小 | 更新频率 | 推荐存储 |
|---|---|---|---|
| 对话历史(消息列表) | 中(~10KB) | 每轮更新 | Redis(TTL 24h) |
| Agent 执行状态 | 小(~1KB) | 每步更新 | Redis |
| 工具调用结果缓存 | 可变(1KB~10MB) | 只写 | Redis + S3 |
| 长期记忆/知识库 | 大(MB~GB) | 低频 | PostgreSQL / Qdrant |
| 用户偏好设置 | 小(~1KB) | 低频 | PostgreSQL |
无状态 Agent 实现
# stateless_agent.py
import json
import uuid
from typing import Optional
import redis.asyncio as aioredis
from hermes import HermesAgent, AgentConfig
from dataclasses import dataclass, asdict
@dataclass
class AgentSession:
"""可序列化的 Agent 会话状态"""
session_id: str
messages: list
current_step: int
tool_results: dict
metadata: dict
def to_json(self) -> str:
return json.dumps(asdict(self), ensure_ascii=False)
@classmethod
def from_json(cls, data: str) -> 'AgentSession':
return cls(**json.loads(data))
class StatelessHermesAgent:
"""无状态 Hermes Agent,所有状态存储在 Redis"""
SESSION_TTL = 86400 # 24小时
SESSION_PREFIX = "hermes:session:"
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.redis_url = redis_url
self._redis: Optional[aioredis.Redis] = None
self.agent_config = AgentConfig(
model="NousResearch/Hermes-3-Llama-3.1-8B",
max_steps=20,
tools=["web_search", "code_exec", "file_read"]
)
async def _get_redis(self) -> aioredis.Redis:
if self._redis is None:
self._redis = await aioredis.from_url(
self.redis_url,
encoding="utf-8",
decode_responses=True
)
return self._redis
async def create_session(self, user_id: str) -> str:
"""创建新的 Agent 会话,返回 session_id"""
session_id = f"{user_id}:{uuid.uuid4().hex}"
session = AgentSession(
session_id=session_id,
messages=[],
current_step=0,
tool_results={},
metadata={"user_id": user_id, "created_at": str(__import__("time").time())}
)
redis = await self._get_redis()
key = f"{self.SESSION_PREFIX}{session_id}"
await redis.setex(key, self.SESSION_TTL, session.to_json())
return session_id
async def load_session(self, session_id: str) -> Optional[AgentSession]:
"""从 Redis 加载会话状态"""
redis = await self._get_redis()
key = f"{self.SESSION_PREFIX}{session_id}"
data = await redis.get(key)
if data is None:
return None
# 续期 TTL
await redis.expire(key, self.SESSION_TTL)
return AgentSession.from_json(data)
async def save_session(self, session: AgentSession) -> None:
"""保存会话状态到 Redis"""
redis = await self._get_redis()
key = f"{self.SESSION_PREFIX}{session.session_id}"
await redis.setex(key, self.SESSION_TTL, session.to_json())
async def run_step(self, session_id: str, user_message: str) -> dict:
"""
执行一步 Agent 推理。
无论哪个实例处理请求,都能从 Redis 恢复完整状态。
"""
# 1. 加载状态
session = await self.load_session(session_id)
if session is None:
raise ValueError(f"Session {session_id} not found")
# 2. 添加用户消息
session.messages.append({
"role": "user",
"content": user_message
})
# 3. 执行 Agent 推理(本地无状态调用)
agent = HermesAgent(self.agent_config)
result = await agent.step(
messages=session.messages,
tool_results=session.tool_results,
step_number=session.current_step
)
# 4. 更新状态
session.messages.append({
"role": "assistant",
"content": result.content
})
session.current_step += 1
if result.tool_calls:
for tc in result.tool_calls:
session.tool_results[tc.id] = tc.result
# 5. 持久化状态
await self.save_session(session)
return {
"session_id": session_id,
"step": session.current_step,
"content": result.content,
"tool_calls": result.tool_calls,
"is_final": result.is_final
}
async def delete_session(self, session_id: str) -> None:
"""清理会话"""
redis = await self._get_redis()
key = f"{self.SESSION_PREFIX}{session_id}"
await redis.delete(key)
# FastAPI 应用示例
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
import asyncio
app = FastAPI()
agent_service = StatelessHermesAgent(redis_url="redis://redis-cluster:6379")
@app.post("/api/sessions")
async def create_session(user_id: str):
session_id = await agent_service.create_session(user_id)
return {"session_id": session_id}
@app.post("/api/sessions/{session_id}/run")
async def run_agent(session_id: str, message: str):
try:
result = await agent_service.run_step(session_id, message)
return result
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
@app.get("/health")
async def health():
return {"status": "ok"}
49.5 Redis 共享记忆层设计
Redis 在这个架构中承担多重职责:
数据结构选择
# redis_memory.py
import redis.asyncio as aioredis
import json
from typing import Any, Optional
class HermesRedisMemory:
"""
Hermes Agent 的 Redis 共享记忆层
支持多实例共享、原子操作、TTL 管理
"""
def __init__(self, redis_url: str):
self.redis = aioredis.from_url(redis_url)
# ─── 消息历史(List 结构)───────────────────────────────
async def append_message(self, session_id: str, message: dict) -> int:
"""追加消息,返回当前消息数量"""
key = f"hermes:msg:{session_id}"
length = await self.redis.rpush(key, json.dumps(message, ensure_ascii=False))
await self.redis.expire(key, 86400)
return length
async def get_messages(self, session_id: str, limit: int = -1) -> list:
"""获取消息历史,limit=-1 表示全部"""
key = f"hermes:msg:{session_id}"
start = 0 if limit == -1 else max(0, -limit)
items = await self.redis.lrange(key, start, -1)
return [json.loads(item) for item in items]
async def trim_messages(self, session_id: str, keep_last: int = 50) -> None:
"""裁剪消息历史,防止无限增长"""
key = f"hermes:msg:{session_id}"
await self.redis.ltrim(key, -keep_last, -1)
# ─── 工具结果缓存(Hash 结构)──────────────────────────
async def cache_tool_result(
self,
tool_call_id: str,
result: Any,
ttl: int = 3600
) -> None:
"""缓存工具调用结果(幂等保护)"""
key = f"hermes:tool:{tool_call_id}"
await self.redis.setex(key, ttl, json.dumps(result, ensure_ascii=False))
async def get_tool_result(self, tool_call_id: str) -> Optional[Any]:
"""获取缓存的工具结果"""
key = f"hermes:tool:{tool_call_id}"
data = await self.redis.get(key)
return json.loads(data) if data else None
# ─── 分布式锁(防止并发冲突)──────────────────────────
async def acquire_session_lock(
self,
session_id: str,
timeout: int = 30
) -> bool:
"""
获取会话锁,防止同一会话被多个实例并发处理
返回 True 表示成功获取锁
"""
key = f"hermes:lock:{session_id}"
result = await self.redis.set(key, "1", nx=True, ex=timeout)
return result is True
async def release_session_lock(self, session_id: str) -> None:
"""释放会话锁"""
key = f"hermes:lock:{session_id}"
await self.redis.delete(key)
# ─── 全局统计计数器──────────────────────────────────────
async def increment_counter(self, metric: str) -> int:
"""原子递增计数器"""
key = f"hermes:counter:{metric}"
return await self.redis.incr(key)
async def get_active_sessions(self) -> int:
"""获取当前活跃会话数量"""
pattern = "hermes:msg:*"
keys = await self.redis.keys(pattern)
return len(keys)
Redis 集群配置(生产推荐)
# docker-compose.yml 中的 Redis Sentinel 配置
services:
redis-master:
image: redis:7-alpine
command: redis-server --appendonly yes --maxmemory 4gb --maxmemory-policy allkeys-lru
volumes:
- redis-data:/data
redis-replica-1:
image: redis:7-alpine
command: redis-server --replicaof redis-master 6379
depends_on:
- redis-master
redis-sentinel:
image: redis:7-alpine
command: >
redis-sentinel /etc/sentinel.conf
volumes:
- ./sentinel.conf:/etc/sentinel.conf
49.6 扩展瓶颈分析与解决方案
瓶颈识别
# bottleneck_analyzer.py
import time
import asyncio
from collections import defaultdict
from typing import Dict, List
import statistics
class BottleneckAnalyzer:
"""分析 Hermes Agent 集群的性能瓶颈"""
def __init__(self):
self.latencies: Dict[str, List[float]] = defaultdict(list)
self.error_counts: Dict[str, int] = defaultdict(int)
def record_latency(self, component: str, latency_ms: float):
self.latencies[component].append(latency_ms)
# 只保留最近 1000 条
if len(self.latencies[component]) > 1000:
self.latencies[component].pop(0)
def get_report(self) -> dict:
report = {}
for component, lats in self.latencies.items():
if lats:
report[component] = {
"p50": statistics.median(lats),
"p95": sorted(lats)[int(len(lats) * 0.95)],
"p99": sorted(lats)[int(len(lats) * 0.99)],
"mean": statistics.mean(lats),
"count": len(lats)
}
return report
def identify_bottleneck(self) -> str:
"""识别主要瓶颈"""
report = self.get_report()
bottlenecks = []
thresholds = {
"llm_inference": 5000, # LLM 推理 >5s
"redis_read": 10, # Redis 读取 >10ms
"redis_write": 20, # Redis 写入 >20ms
"tool_execution": 2000, # 工具执行 >2s
}
for component, threshold in thresholds.items():
if component in report:
if report[component]["p95"] > threshold:
bottlenecks.append(
f"{component}: P95={report[component]['p95']:.0f}ms (threshold={threshold}ms)"
)
return "\n".join(bottlenecks) if bottlenecks else "No bottlenecks detected"
常见瓶颈与解决方案
| 瓶颈位置 | 症状 | 解决方案 |
|---|---|---|
| LLM 推理层 | 高延迟、GPU 利用率 100% | 增加 GPU 节点、模型量化(AWQ/GPTQ)、请求批处理 |
| Redis 层 | Redis CPU 高、读写延迟上升 | Redis Cluster 分片、读写分离(Replica 分担读) |
| 网络 I/O | 工具调用慢、外部 API 超时 | 连接池复用、异步并发调用、本地缓存 |
| 上下文长度 | 长任务后期推理变慢 | 上下文压缩(摘要)、滑动窗口、外部记忆 |
| 工具并发 | 多工具调用串行等待 | 并行工具调用(asyncio.gather) |
49.7 动态扩缩容(Kubernetes HPA)
# hermes-agent-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: hermes-agent
spec:
replicas: 3
selector:
matchLabels:
app: hermes-agent
template:
metadata:
labels:
app: hermes-agent
spec:
containers:
- name: hermes-agent
image: yourorg/hermes-agent:latest
resources:
requests:
memory: "4Gi"
cpu: "2"
nvidia.com/gpu: "1"
limits:
memory: "8Gi"
cpu: "4"
nvidia.com/gpu: "1"
env:
- name: REDIS_URL
valueFrom:
secretKeyRef:
name: hermes-secrets
key: redis-url
readinessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 60
periodSeconds: 30
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: hermes-agent-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: hermes-agent
minReplicas: 2
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Pods
pods:
metric:
name: hermes_active_sessions
target:
type: AverageValue
averageValue: "50" # 每个实例最多 50 个并发会话
小结
本章覆盖了 Hermes Agent 生产环境横向扩展的完整方案:
- 理解挑战:Agent 的会话状态、长连接、工具副作用使扩展比普通 Web 服务复杂得多。
- Nginx/HAProxy:通过
ip_hash或sticky cookie实现会话亲和性,是快速可用的临时方案。 - 无状态化:将所有状态外移到 Redis 是长期正解,使任意实例都能处理任意请求。
- Redis 设计:合理选择数据结构(List/Hash/String)、设置 TTL、使用分布式锁防并发。
- 瓶颈识别:LLM 推理通常是最大瓶颈,其次是 Redis I/O 和工具调用延迟。
- Kubernetes HPA:结合自定义指标(活跃会话数)实现弹性扩缩容。
思考题
- 当 Redis 发生故障时,无状态化设计的 Agent 集群会有什么影响?如何设计降级方案?
- 对于需要长期记忆(跨会话)的 Agent,Redis TTL 策略如何制定?是否需要引入向量数据库?
- 如果工具调用是幂等的,是否可以完全取消 Session 亲和性?哪些类型的工具天然不幂等?
leastconn和ip_hash两种负载均衡策略各适合什么场景?对 Agent 服务的影响有何不同?