第 55 章
多 Agent 协作:Swarm 模式
第55章:多 Agent 协作:Swarm 模式
导语
Orchestrator 模式依赖一个中心化的主控 Agent 来协调所有工作,这在设计上清晰直观,但也带来了单点故障和扩展瓶颈的风险。Swarm(群体)模式提供了另一种哲学:没有中央控制器,每个 Agent 根据局部信息做出决策,通过消息传递和共享状态实现全局协调。这种去中心化设计在生物界随处可见——蚂蚁、蜜蜂、鸟群的集体智慧正是 Swarm 模式的自然灵感来源。本章深入探讨 Swarm 的设计理念、实现挑战,以及在 Hermes 当前架构下的变通方案。
55.1 Swarm 去中心化设计理念
为什么需要去中心化
Orchestrator 模式的潜在问题:
| 问题 | 描述 | 影响 |
|---|---|---|
| 单点故障 | Orchestrator 崩溃,整个系统停摆 | 可用性下降 |
| 扩展瓶颈 | 所有任务都必须通过 Orchestrator 调度 | 吞吐量受限 |
| 认知过载 | Orchestrator 需要了解所有 Worker 的能力 | 系统提示词复杂化 |
| 延迟增加 | 每个任务都需要额外的协调开销 | 响应时间变长 |
Swarm 的核心思想:涌现(Emergence)——复杂的全局行为由简单的局部规则产生,不需要任何中央控制器。
flowchart TD
subgraph Orchestrator模式["Orchestrator 模式(中心化)"]
O[Orchestrator] --> W1[Worker 1]
O --> W2[Worker 2]
O --> W3[Worker 3]
W1 --> O
W2 --> O
W3 --> O
end
subgraph Swarm模式["Swarm 模式(去中心化)"]
S1[Agent 1] <--> S2[Agent 2]
S2 <--> S3[Agent 3]
S3 <--> S1
S1 <--> SharedState[(共享状态)]
S2 <--> SharedState
S3 <--> SharedState
end
Swarm 的四个核心原则
- 局部感知(Local Perception):每个 Agent 只看到局部信息,不需要全局视角
- 简单规则(Simple Rules):每个 Agent 的行为规则简单,但集体涌现出复杂行为
- 消息传递(Message Passing):Agent 间通过消息协调,无需共享内存
- 自组织(Self-Organization):任务分配由 Agent 自行决定,而非中央指派
55.2 Agent 间消息传递协议
# swarm_messaging.py
"""
Swarm 模式的消息传递系统
支持广播、点对点、订阅/发布三种通信模式
"""
import asyncio
import json
import time
import uuid
from dataclasses import dataclass, field
from typing import Optional, Callable, Any
from enum import Enum
class MessageType(Enum):
BROADCAST = "broadcast" # 广播:发给所有 Agent
DIRECT = "direct" # 点对点:发给特定 Agent
TASK_CLAIM = "task_claim" # 声明领取某个任务
TASK_RESULT = "task_result" # 发布任务结果
HEARTBEAT = "heartbeat" # 心跳(Agent 存活信号)
REQUEST_HELP = "request_help" # 请求协助
@dataclass
class SwarmMessage:
"""Swarm Agent 间的消息格式"""
msg_id: str = field(default_factory=lambda: uuid.uuid4().hex[:8])
sender_id: str = ""
receiver_id: Optional[str] = None # None 表示广播
msg_type: MessageType = MessageType.BROADCAST
content: Any = None
timestamp: float = field(default_factory=time.time)
priority: int = 0 # 0=普通, 1=高, 2=紧急
def to_dict(self) -> dict:
return {
"msg_id": self.msg_id,
"sender_id": self.sender_id,
"receiver_id": self.receiver_id,
"msg_type": self.msg_type.value,
"content": self.content,
"timestamp": self.timestamp,
"priority": self.priority
}
class MessageBus:
"""
Swarm 消息总线
实现发布/订阅模式的消息路由
"""
def __init__(self):
# agent_id -> asyncio.Queue
self._queues: dict[str, asyncio.Queue] = {}
self._topic_subscribers: dict[str, list[str]] = {} # topic -> [agent_ids]
self._history: list[SwarmMessage] = []
self._max_history = 1000
def register_agent(self, agent_id: str) -> asyncio.Queue:
"""注册 Agent,返回其专属消息队列"""
queue = asyncio.Queue(maxsize=100)
self._queues[agent_id] = queue
print(f"[MessageBus] Agent {agent_id} 已注册")
return queue
def unregister_agent(self, agent_id: str) -> None:
self._queues.pop(agent_id, None)
async def publish(self, message: SwarmMessage) -> int:
"""
发布消息,返回接收到消息的 Agent 数量
"""
# 记录历史
self._history.append(message)
if len(self._history) > self._max_history:
self._history.pop(0)
delivered = 0
if message.msg_type == MessageType.BROADCAST or message.receiver_id is None:
# 广播:发给所有 Agent(除发送者)
for agent_id, queue in self._queues.items():
if agent_id != message.sender_id:
try:
queue.put_nowait(message)
delivered += 1
except asyncio.QueueFull:
print(f"[MessageBus] Agent {agent_id} 的消息队列已满,丢弃消息")
elif message.receiver_id in self._queues:
# 点对点:发给特定 Agent
try:
self._queues[message.receiver_id].put_nowait(message)
delivered = 1
except asyncio.QueueFull:
print(f"[MessageBus] Agent {message.receiver_id} 的消息队列已满")
return delivered
def get_history(self, since: float = 0, msg_type: Optional[MessageType] = None) -> list:
"""获取消息历史"""
msgs = [m for m in self._history if m.timestamp > since]
if msg_type:
msgs = [m for m in msgs if m.msg_type == msg_type]
return msgs
@property
def active_agents(self) -> list[str]:
return list(self._queues.keys())
55.3 共享状态管理
# swarm_state.py
"""
Swarm 共享状态管理
使用乐观锁实现并发安全的状态更新
"""
import asyncio
import json
import time
from typing import Any, Optional
from dataclasses import dataclass, field
import hashlib
@dataclass
class TaskItem:
"""共享任务队列中的任务"""
task_id: str
description: str
required_capability: str # 'search', 'code', 'write', 'analyze'
priority: int = 0
status: str = "available" # available/claimed/completed/failed
claimed_by: Optional[str] = None
result: Optional[str] = None
created_at: float = field(default_factory=time.time)
claimed_at: Optional[float] = None
class SharedSwarmState:
"""
Swarm 共享状态
所有 Agent 读写同一个共享状态空间
使用 asyncio.Lock 实现并发安全
"""
def __init__(self):
self._lock = asyncio.Lock()
self._tasks: dict[str, TaskItem] = {}
self._agent_status: dict[str, dict] = {} # agent_id -> {capability, last_seen, workload}
self._global_context: dict[str, Any] = {} # 全局上下文信息
self._version: int = 0
# ─── 任务管理 ─────────────────────────────────────────────
async def add_task(self, task: TaskItem) -> None:
async with self._lock:
self._tasks[task.task_id] = task
self._version += 1
async def claim_task(self, task_id: str, agent_id: str) -> bool:
"""
原子性地领取任务(避免竞争条件)
返回 True 表示成功领取
"""
async with self._lock:
task = self._tasks.get(task_id)
if task is None or task.status != "available":
return False
task.status = "claimed"
task.claimed_by = agent_id
task.claimed_at = time.time()
self._version += 1
return True
async def complete_task(self, task_id: str, agent_id: str, result: str) -> bool:
async with self._lock:
task = self._tasks.get(task_id)
if task is None or task.claimed_by != agent_id:
return False
task.status = "completed"
task.result = result
self._version += 1
return True
async def fail_task(self, task_id: str, agent_id: str, error: str) -> None:
async with self._lock:
task = self._tasks.get(task_id)
if task and task.claimed_by == agent_id:
task.status = "failed"
task.result = f"Error: {error}"
self._version += 1
async def get_available_tasks(self, capability: Optional[str] = None) -> list[TaskItem]:
"""获取可用任务,可按能力过滤"""
async with self._lock:
tasks = [t for t in self._tasks.values() if t.status == "available"]
if capability:
tasks = [t for t in tasks if t.required_capability == capability]
return sorted(tasks, key=lambda t: (-t.priority, t.created_at))
async def get_task_results(self) -> dict[str, str]:
"""获取所有已完成任务的结果"""
async with self._lock:
return {
tid: t.result
for tid, t in self._tasks.items()
if t.status == "completed" and t.result
}
# ─── Agent 状态管理 ────────────────────────────────────────
async def register_agent(self, agent_id: str, capability: str) -> None:
async with self._lock:
self._agent_status[agent_id] = {
"capability": capability,
"last_seen": time.time(),
"workload": 0,
"tasks_completed": 0
}
async def update_agent_heartbeat(self, agent_id: str, workload: int = 0) -> None:
async with self._lock:
if agent_id in self._agent_status:
self._agent_status[agent_id]["last_seen"] = time.time()
self._agent_status[agent_id]["workload"] = workload
async def get_active_agents(self, timeout: float = 30.0) -> list[dict]:
"""获取最近活跃的 Agent 列表"""
async with self._lock:
now = time.time()
return [
{"id": aid, **status}
for aid, status in self._agent_status.items()
if now - status["last_seen"] < timeout
]
# ─── 全局上下文 ────────────────────────────────────────────
async def set_context(self, key: str, value: Any) -> None:
async with self._lock:
self._global_context[key] = value
self._version += 1
async def get_context(self, key: str, default: Any = None) -> Any:
async with self._lock:
return self._global_context.get(key, default)
@property
async def version(self) -> int:
async with self._lock:
return self._version
55.4 Swarm Agent 实现
# swarm_agent.py
"""
Swarm Agent 实现
每个 Agent 自主决策,通过消息和共享状态协调
"""
import asyncio
import uuid
import time
from typing import Optional
from openai import AsyncOpenAI
from swarm_messaging import MessageBus, SwarmMessage, MessageType
from swarm_state import SharedSwarmState, TaskItem
class SwarmAgent:
"""
Swarm Agent:去中心化的自主 Agent
每个 Agent:
1. 持续监听共享任务队列
2. 根据自身能力领取适合的任务
3. 执行任务后广播结果
4. 监听其他 Agent 的帮助请求
"""
def __init__(
self,
agent_id: str,
capability: str, # 'search', 'code', 'write', 'analyze'
model: str,
client: AsyncOpenAI,
message_bus: MessageBus,
shared_state: SharedSwarmState,
max_concurrent_tasks: int = 2
):
self.agent_id = agent_id
self.capability = capability
self.model = model
self.client = client
self.bus = message_bus
self.state = shared_state
self.max_concurrent = max_concurrent_tasks
# 注册到消息总线
self.inbox = message_bus.register_agent(agent_id)
self.current_tasks: set[str] = set()
self.running = False
# 该 Agent 的系统提示词(按能力专业化)
self.system_prompt = self._build_system_prompt()
def _build_system_prompt(self) -> str:
prompts = {
"search": "你是一个信息搜索专家,擅长收集和整理互联网信息。",
"code": "你是一个代码专家,擅长编写、调试和分析代码。",
"write": "你是一个写作专家,擅长结构化写作和内容创作。",
"analyze": "你是一个数据分析专家,擅长处理和分析各类数据。"
}
return prompts.get(self.capability, "你是一个通用 AI Agent。")
async def _execute_task(self, task: TaskItem) -> str:
"""执行具体任务"""
# 获取相关的全局上下文
context = await self.state.get_context("task_context", {})
response = await self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": self.system_prompt},
{
"role": "user",
"content": f"""任务:{task.description}
全局上下文:{context}
请完成这个任务并返回详细结果。"""
}
],
temperature=0.2,
max_tokens=2048
)
return response.choices[0].message.content or "任务完成"
async def _handle_message(self, msg: SwarmMessage) -> None:
"""处理接收到的消息"""
if msg.msg_type == MessageType.REQUEST_HELP:
# 如果有余力,响应帮助请求
if len(self.current_tasks) < self.max_concurrent:
content = msg.content
help_task_id = content.get("task_id")
if help_task_id and await self.state.claim_task(help_task_id, self.agent_id):
print(f"[{self.agent_id}] 响应帮助请求,领取任务 {help_task_id}")
self.current_tasks.add(help_task_id)
task = (await self.state.get_available_tasks())
# 实际执行见 run_loop
elif msg.msg_type == MessageType.TASK_RESULT:
# 记录其他 Agent 完成的任务(可用于学习)
content = msg.content
print(f"[{self.agent_id}] 收到 {msg.sender_id} 的任务结果")
async def run_loop(self, stop_event: asyncio.Event) -> None:
"""
Agent 主运行循环
持续轮询任务队列并处理消息
"""
self.running = True
await self.state.register_agent(self.agent_id, self.capability)
print(f"[{self.agent_id}] 启动,能力:{self.capability}")
while not stop_event.is_set():
try:
# 1. 处理消息队列
while not self.inbox.empty():
msg = self.inbox.get_nowait()
await self._handle_message(msg)
# 2. 如果当前任务数未达上限,尝试领取新任务
if len(self.current_tasks) < self.max_concurrent:
available = await self.state.get_available_tasks(self.capability)
for task in available:
if task.task_id not in self.current_tasks:
if await self.state.claim_task(task.task_id, self.agent_id):
self.current_tasks.add(task.task_id)
print(f"[{self.agent_id}] 领取任务: {task.task_id} - {task.description[:50]}")
# 异步执行任务
asyncio.create_task(
self._run_task(task)
)
break
# 3. 发送心跳
await self.state.update_agent_heartbeat(
self.agent_id,
workload=len(self.current_tasks)
)
await asyncio.sleep(0.5) # 轮询间隔
except Exception as e:
print(f"[{self.agent_id}] 运行时错误: {e}")
await asyncio.sleep(1)
self.running = False
print(f"[{self.agent_id}] 停止运行")
async def _run_task(self, task: TaskItem) -> None:
"""异步执行单个任务"""
try:
result = await self._execute_task(task)
await self.state.complete_task(task.task_id, self.agent_id, result)
# 广播任务完成
await self.bus.publish(SwarmMessage(
sender_id=self.agent_id,
msg_type=MessageType.TASK_RESULT,
content={"task_id": task.task_id, "result_preview": result[:100]}
))
print(f"[{self.agent_id}] 完成任务: {task.task_id}")
except Exception as e:
await self.state.fail_task(task.task_id, self.agent_id, str(e))
print(f"[{self.agent_id}] 任务 {task.task_id} 失败: {e}")
finally:
self.current_tasks.discard(task.task_id)
55.5 Swarm 完整运行示例
# swarm_demo.py
"""
Swarm 模式完整演示
启动多个 Agent,并发处理任务队列
"""
import asyncio
import uuid
from openai import AsyncOpenAI
from swarm_messaging import MessageBus
from swarm_state import SharedSwarmState, TaskItem
from swarm_agent import SwarmAgent
async def run_swarm(task_list: list[dict], num_agents_per_type: int = 2):
"""
启动 Swarm 集群,处理任务列表
"""
client = AsyncOpenAI(base_url="http://localhost:8000/v1", api_key="not-needed")
model = "NousResearch/Hermes-3-Llama-3.1-8B"
# 初始化共享基础设施
bus = MessageBus()
state = SharedSwarmState()
# 设置全局上下文
await state.set_context("task_context", {"project": "AI市场分析", "language": "中文"})
# 创建 Agent 集群
agents = []
capabilities = ["search", "analyze", "write"]
for capability in capabilities:
for i in range(num_agents_per_type):
agent_id = f"{capability}_{i+1}"
agent = SwarmAgent(
agent_id=agent_id,
capability=capability,
model=model,
client=client,
message_bus=bus,
shared_state=state
)
agents.append(agent)
# 将任务添加到共享队列
for task_data in task_list:
task = TaskItem(
task_id=uuid.uuid4().hex[:8],
description=task_data["description"],
required_capability=task_data["capability"],
priority=task_data.get("priority", 0)
)
await state.add_task(task)
print(f"\n[Swarm] 启动 {len(agents)} 个 Agent,处理 {len(task_list)} 个任务")
# 启动所有 Agent 的运行循环
stop_event = asyncio.Event()
agent_tasks = [
asyncio.create_task(agent.run_loop(stop_event))
for agent in agents
]
# 等待所有任务完成(最多等待 120 秒)
start_time = asyncio.get_event_loop().time()
while (asyncio.get_event_loop().time() - start_time) < 120:
available = await state.get_available_tasks()
results = await state.get_task_results()
if len(results) >= len(task_list):
print(f"\n[Swarm] 所有任务完成!")
break
claimed = sum(1 for t in state._tasks.values() if t.status == "claimed")
completed = len(results)
print(f"[Swarm] 进度: {completed}/{len(task_list)} 完成, {claimed} 执行中, {len(available)} 待处理")
await asyncio.sleep(5)
# 停止所有 Agent
stop_event.set()
await asyncio.gather(*agent_tasks, return_exceptions=True)
# 汇总结果
final_results = await state.get_task_results()
print(f"\n[Swarm] 完成任务: {len(final_results)}/{len(task_list)}")
return final_results
async def main():
# 定义任务列表
tasks = [
{"description": "搜索 OpenAI 2024年最新产品和功能更新", "capability": "search", "priority": 1},
{"description": "搜索 Anthropic Claude 2024年的发展动态", "capability": "search", "priority": 1},
{"description": "搜索 Google Gemini 的最新进展", "capability": "search", "priority": 1},
{"description": "分析三大AI公司的市场份额变化趋势", "capability": "analyze", "priority": 0},
{"description": "分析各模型在代码、推理、创意写作三个维度的表现对比", "capability": "analyze", "priority": 0},
{"description": "撰写'2024年大模型竞争格局'分析报告执行摘要(500字)", "capability": "write", "priority": -1},
]
results = await run_swarm(tasks, num_agents_per_type=2)
print("\n" + "="*60)
print("Swarm 执行结果摘要")
print("="*60)
for task_id, result in results.items():
print(f"\n任务 {task_id}:\n{result[:200]}...")
if __name__ == "__main__":
asyncio.run(main())
55.6 与 Orchestrator 模式的选型对比
flowchart LR
Q1{任务是否\n高度结构化?}
Q1 -->|是| Q2{是否需要\n严格顺序控制?}
Q1 -->|否| Q3{是否要求\n高可用性?}
Q2 -->|是| ORC[Orchestrator 模式]
Q2 -->|否| Q4{任务数量\n是否很多?}
Q3 -->|是| SWARM[Swarm 模式]
Q3 -->|否| ORC
Q4 -->|是| SWARM
Q4 -->|否| ORC
| 选型维度 | Orchestrator | Swarm |
|---|---|---|
| 任务类型 | 结构化、有明确依赖关系 | 并行、相对独立 |
| 规模 | 小规模(< 10 个 Worker) | 大规模(> 10 个 Agent) |
| 一致性要求 | 高(需要精确控制执行顺序) | 低(最终一致即可) |
| 容错需求 | 低(有中央协调) | 高(无中央依赖) |
| 实现复杂度 | 低 | 高 |
| 调试难度 | 低(中心化日志) | 高(分布式追踪) |
| 延迟 | 有额外协调延迟 | 更低(直接执行) |
| 动态扩展 | 需要更新 Orchestrator | 直接添加新 Agent |
55.7 Hermes 当前限制与变通方案
当前限制
Hermes Agent 目前对 Swarm 的原生支持有限:
- 无内置消息总线:Agent 之间没有直接通信机制,需要手动实现
- 无共享上下文:子 Agent 的上下文完全隔离,不能直接读取彼此的记忆
- 无身份系统:计划中支持有身份的专用 Agent,但当前版本尚未实现
变通方案
| 限制 | 变通方案 |
|---|---|
| 无消息总线 | 使用 Redis Pub/Sub 或 RabbitMQ 实现外部消息总线 |
| 无共享上下文 | 使用 Redis Hash 实现共享状态存储 |
| 无身份系统 | 用约定好的 agent_id 前缀区分 Agent 类型 |
| 无协调机制 | 用共享任务队列(Redis List)实现去中心化任务分配 |
# redis_swarm_bus.py
"""使用 Redis 实现生产级 Swarm 消息总线"""
import redis.asyncio as aioredis
import json
class RedisSwarmBus:
"""基于 Redis Pub/Sub 的生产级消息总线"""
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.redis = aioredis.from_url(redis_url)
self.CHANNEL_PREFIX = "hermes:swarm:"
async def publish(self, channel: str, message: dict) -> None:
full_channel = f"{self.CHANNEL_PREFIX}{channel}"
await self.redis.publish(full_channel, json.dumps(message))
async def subscribe(self, agent_id: str, capabilities: list[str]):
"""订阅与 Agent 能力相关的频道"""
pubsub = self.redis.pubsub()
channels = [
f"{self.CHANNEL_PREFIX}broadcast",
f"{self.CHANNEL_PREFIX}agent:{agent_id}"
] + [f"{self.CHANNEL_PREFIX}capability:{cap}" for cap in capabilities]
await pubsub.subscribe(*channels)
return pubsub
async def claim_task(self, task_id: str, agent_id: str, timeout: int = 30) -> bool:
"""使用 Redis SET NX 实现原子性任务领取"""
key = f"hermes:swarm:claim:{task_id}"
result = await self.redis.set(key, agent_id, nx=True, ex=timeout)
return result is True
小结
本章深入探讨了 Swarm 去中心化多 Agent 模式:
- 去中心化哲学:Swarm 消除了单点故障,每个 Agent 自主决策,涌现出复杂协调行为。
- 消息传递协议:基于
asyncio.Queue的消息总线支持广播、点对点、订阅/发布三种模式。 - 共享状态设计:使用
asyncio.Lock实现并发安全的任务领取,避免竞争条件。 - 与 Orchestrator 对比:Swarm 适合大规模并行任务,Orchestrator 适合结构化依赖任务。
- 变通方案:Redis Pub/Sub + 共享任务队列是 Hermes 当前架构下实现 Swarm 的最佳路径。
思考题
- Swarm 模式如何处理任务的优先级?如果某个高优先级任务长时间没有被领取,如何处理?
- 当一个 Agent 意外崩溃时,它已经领取但未完成的任务如何恢复?需要什么机制?
- Swarm 中的"涌现行为"在 AI Agent 场景下有什么具体表现?是否有可能产生不期望的涌现行为?
- 如果需要在 Swarm 中引入一个"监控 Agent"来追踪系统整体状态,这是否会让系统重新变成中心化架构?