第 55 章

多 Agent 协作:Swarm 模式

第55章:多 Agent 协作:Swarm 模式

导语

Orchestrator 模式依赖一个中心化的主控 Agent 来协调所有工作,这在设计上清晰直观,但也带来了单点故障和扩展瓶颈的风险。Swarm(群体)模式提供了另一种哲学:没有中央控制器,每个 Agent 根据局部信息做出决策,通过消息传递和共享状态实现全局协调。这种去中心化设计在生物界随处可见——蚂蚁、蜜蜂、鸟群的集体智慧正是 Swarm 模式的自然灵感来源。本章深入探讨 Swarm 的设计理念、实现挑战,以及在 Hermes 当前架构下的变通方案。


55.1 Swarm 去中心化设计理念

为什么需要去中心化

Orchestrator 模式的潜在问题:

问题 描述 影响
单点故障 Orchestrator 崩溃,整个系统停摆 可用性下降
扩展瓶颈 所有任务都必须通过 Orchestrator 调度 吞吐量受限
认知过载 Orchestrator 需要了解所有 Worker 的能力 系统提示词复杂化
延迟增加 每个任务都需要额外的协调开销 响应时间变长

Swarm 的核心思想:涌现(Emergence)——复杂的全局行为由简单的局部规则产生,不需要任何中央控制器。

flowchart TD
    subgraph Orchestrator模式["Orchestrator 模式(中心化)"]
        O[Orchestrator] --> W1[Worker 1]
        O --> W2[Worker 2]
        O --> W3[Worker 3]
        W1 --> O
        W2 --> O
        W3 --> O
    end
    
    subgraph Swarm模式["Swarm 模式(去中心化)"]
        S1[Agent 1] <--> S2[Agent 2]
        S2 <--> S3[Agent 3]
        S3 <--> S1
        S1 <--> SharedState[(共享状态)]
        S2 <--> SharedState
        S3 <--> SharedState
    end

Swarm 的四个核心原则

  1. 局部感知(Local Perception):每个 Agent 只看到局部信息,不需要全局视角
  2. 简单规则(Simple Rules):每个 Agent 的行为规则简单,但集体涌现出复杂行为
  3. 消息传递(Message Passing):Agent 间通过消息协调,无需共享内存
  4. 自组织(Self-Organization):任务分配由 Agent 自行决定,而非中央指派

55.2 Agent 间消息传递协议

# swarm_messaging.py
"""
Swarm 模式的消息传递系统
支持广播、点对点、订阅/发布三种通信模式
"""

import asyncio
import json
import time
import uuid
from dataclasses import dataclass, field
from typing import Optional, Callable, Any
from enum import Enum

class MessageType(Enum):
    BROADCAST = "broadcast"          # 广播:发给所有 Agent
    DIRECT = "direct"                # 点对点:发给特定 Agent
    TASK_CLAIM = "task_claim"        # 声明领取某个任务
    TASK_RESULT = "task_result"      # 发布任务结果
    HEARTBEAT = "heartbeat"          # 心跳(Agent 存活信号)
    REQUEST_HELP = "request_help"    # 请求协助


@dataclass
class SwarmMessage:
    """Swarm Agent 间的消息格式"""
    msg_id: str = field(default_factory=lambda: uuid.uuid4().hex[:8])
    sender_id: str = ""
    receiver_id: Optional[str] = None  # None 表示广播
    msg_type: MessageType = MessageType.BROADCAST
    content: Any = None
    timestamp: float = field(default_factory=time.time)
    priority: int = 0   # 0=普通, 1=高, 2=紧急
    
    def to_dict(self) -> dict:
        return {
            "msg_id": self.msg_id,
            "sender_id": self.sender_id,
            "receiver_id": self.receiver_id,
            "msg_type": self.msg_type.value,
            "content": self.content,
            "timestamp": self.timestamp,
            "priority": self.priority
        }


class MessageBus:
    """
    Swarm 消息总线
    实现发布/订阅模式的消息路由
    """
    
    def __init__(self):
        # agent_id -> asyncio.Queue
        self._queues: dict[str, asyncio.Queue] = {}
        self._topic_subscribers: dict[str, list[str]] = {}  # topic -> [agent_ids]
        self._history: list[SwarmMessage] = []
        self._max_history = 1000
    
    def register_agent(self, agent_id: str) -> asyncio.Queue:
        """注册 Agent,返回其专属消息队列"""
        queue = asyncio.Queue(maxsize=100)
        self._queues[agent_id] = queue
        print(f"[MessageBus] Agent {agent_id} 已注册")
        return queue
    
    def unregister_agent(self, agent_id: str) -> None:
        self._queues.pop(agent_id, None)
    
    async def publish(self, message: SwarmMessage) -> int:
        """
        发布消息,返回接收到消息的 Agent 数量
        """
        # 记录历史
        self._history.append(message)
        if len(self._history) > self._max_history:
            self._history.pop(0)
        
        delivered = 0
        
        if message.msg_type == MessageType.BROADCAST or message.receiver_id is None:
            # 广播:发给所有 Agent(除发送者)
            for agent_id, queue in self._queues.items():
                if agent_id != message.sender_id:
                    try:
                        queue.put_nowait(message)
                        delivered += 1
                    except asyncio.QueueFull:
                        print(f"[MessageBus] Agent {agent_id} 的消息队列已满,丢弃消息")
        
        elif message.receiver_id in self._queues:
            # 点对点:发给特定 Agent
            try:
                self._queues[message.receiver_id].put_nowait(message)
                delivered = 1
            except asyncio.QueueFull:
                print(f"[MessageBus] Agent {message.receiver_id} 的消息队列已满")
        
        return delivered
    
    def get_history(self, since: float = 0, msg_type: Optional[MessageType] = None) -> list:
        """获取消息历史"""
        msgs = [m for m in self._history if m.timestamp > since]
        if msg_type:
            msgs = [m for m in msgs if m.msg_type == msg_type]
        return msgs
    
    @property
    def active_agents(self) -> list[str]:
        return list(self._queues.keys())

55.3 共享状态管理

# swarm_state.py
"""
Swarm 共享状态管理
使用乐观锁实现并发安全的状态更新
"""

import asyncio
import json
import time
from typing import Any, Optional
from dataclasses import dataclass, field
import hashlib


@dataclass
class TaskItem:
    """共享任务队列中的任务"""
    task_id: str
    description: str
    required_capability: str    # 'search', 'code', 'write', 'analyze'
    priority: int = 0
    status: str = "available"   # available/claimed/completed/failed
    claimed_by: Optional[str] = None
    result: Optional[str] = None
    created_at: float = field(default_factory=time.time)
    claimed_at: Optional[float] = None


class SharedSwarmState:
    """
    Swarm 共享状态
    所有 Agent 读写同一个共享状态空间
    使用 asyncio.Lock 实现并发安全
    """
    
    def __init__(self):
        self._lock = asyncio.Lock()
        self._tasks: dict[str, TaskItem] = {}
        self._agent_status: dict[str, dict] = {}  # agent_id -> {capability, last_seen, workload}
        self._global_context: dict[str, Any] = {}  # 全局上下文信息
        self._version: int = 0
    
    # ─── 任务管理 ─────────────────────────────────────────────
    
    async def add_task(self, task: TaskItem) -> None:
        async with self._lock:
            self._tasks[task.task_id] = task
            self._version += 1
    
    async def claim_task(self, task_id: str, agent_id: str) -> bool:
        """
        原子性地领取任务(避免竞争条件)
        返回 True 表示成功领取
        """
        async with self._lock:
            task = self._tasks.get(task_id)
            if task is None or task.status != "available":
                return False
            
            task.status = "claimed"
            task.claimed_by = agent_id
            task.claimed_at = time.time()
            self._version += 1
            return True
    
    async def complete_task(self, task_id: str, agent_id: str, result: str) -> bool:
        async with self._lock:
            task = self._tasks.get(task_id)
            if task is None or task.claimed_by != agent_id:
                return False
            
            task.status = "completed"
            task.result = result
            self._version += 1
            return True
    
    async def fail_task(self, task_id: str, agent_id: str, error: str) -> None:
        async with self._lock:
            task = self._tasks.get(task_id)
            if task and task.claimed_by == agent_id:
                task.status = "failed"
                task.result = f"Error: {error}"
                self._version += 1
    
    async def get_available_tasks(self, capability: Optional[str] = None) -> list[TaskItem]:
        """获取可用任务,可按能力过滤"""
        async with self._lock:
            tasks = [t for t in self._tasks.values() if t.status == "available"]
            if capability:
                tasks = [t for t in tasks if t.required_capability == capability]
            return sorted(tasks, key=lambda t: (-t.priority, t.created_at))
    
    async def get_task_results(self) -> dict[str, str]:
        """获取所有已完成任务的结果"""
        async with self._lock:
            return {
                tid: t.result
                for tid, t in self._tasks.items()
                if t.status == "completed" and t.result
            }
    
    # ─── Agent 状态管理 ────────────────────────────────────────
    
    async def register_agent(self, agent_id: str, capability: str) -> None:
        async with self._lock:
            self._agent_status[agent_id] = {
                "capability": capability,
                "last_seen": time.time(),
                "workload": 0,
                "tasks_completed": 0
            }
    
    async def update_agent_heartbeat(self, agent_id: str, workload: int = 0) -> None:
        async with self._lock:
            if agent_id in self._agent_status:
                self._agent_status[agent_id]["last_seen"] = time.time()
                self._agent_status[agent_id]["workload"] = workload
    
    async def get_active_agents(self, timeout: float = 30.0) -> list[dict]:
        """获取最近活跃的 Agent 列表"""
        async with self._lock:
            now = time.time()
            return [
                {"id": aid, **status}
                for aid, status in self._agent_status.items()
                if now - status["last_seen"] < timeout
            ]
    
    # ─── 全局上下文 ────────────────────────────────────────────
    
    async def set_context(self, key: str, value: Any) -> None:
        async with self._lock:
            self._global_context[key] = value
            self._version += 1
    
    async def get_context(self, key: str, default: Any = None) -> Any:
        async with self._lock:
            return self._global_context.get(key, default)
    
    @property
    async def version(self) -> int:
        async with self._lock:
            return self._version

55.4 Swarm Agent 实现

# swarm_agent.py
"""
Swarm Agent 实现
每个 Agent 自主决策,通过消息和共享状态协调
"""

import asyncio
import uuid
import time
from typing import Optional
from openai import AsyncOpenAI

from swarm_messaging import MessageBus, SwarmMessage, MessageType
from swarm_state import SharedSwarmState, TaskItem


class SwarmAgent:
    """
    Swarm Agent:去中心化的自主 Agent
    
    每个 Agent:
    1. 持续监听共享任务队列
    2. 根据自身能力领取适合的任务
    3. 执行任务后广播结果
    4. 监听其他 Agent 的帮助请求
    """
    
    def __init__(
        self,
        agent_id: str,
        capability: str,    # 'search', 'code', 'write', 'analyze'
        model: str,
        client: AsyncOpenAI,
        message_bus: MessageBus,
        shared_state: SharedSwarmState,
        max_concurrent_tasks: int = 2
    ):
        self.agent_id = agent_id
        self.capability = capability
        self.model = model
        self.client = client
        self.bus = message_bus
        self.state = shared_state
        self.max_concurrent = max_concurrent_tasks
        
        # 注册到消息总线
        self.inbox = message_bus.register_agent(agent_id)
        
        self.current_tasks: set[str] = set()
        self.running = False
        
        # 该 Agent 的系统提示词(按能力专业化)
        self.system_prompt = self._build_system_prompt()
    
    def _build_system_prompt(self) -> str:
        prompts = {
            "search": "你是一个信息搜索专家,擅长收集和整理互联网信息。",
            "code": "你是一个代码专家,擅长编写、调试和分析代码。",
            "write": "你是一个写作专家,擅长结构化写作和内容创作。",
            "analyze": "你是一个数据分析专家,擅长处理和分析各类数据。"
        }
        return prompts.get(self.capability, "你是一个通用 AI Agent。")
    
    async def _execute_task(self, task: TaskItem) -> str:
        """执行具体任务"""
        # 获取相关的全局上下文
        context = await self.state.get_context("task_context", {})
        
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self.system_prompt},
                {
                    "role": "user",
                    "content": f"""任务:{task.description}

全局上下文:{context}

请完成这个任务并返回详细结果。"""
                }
            ],
            temperature=0.2,
            max_tokens=2048
        )
        
        return response.choices[0].message.content or "任务完成"
    
    async def _handle_message(self, msg: SwarmMessage) -> None:
        """处理接收到的消息"""
        if msg.msg_type == MessageType.REQUEST_HELP:
            # 如果有余力,响应帮助请求
            if len(self.current_tasks) < self.max_concurrent:
                content = msg.content
                help_task_id = content.get("task_id")
                
                if help_task_id and await self.state.claim_task(help_task_id, self.agent_id):
                    print(f"[{self.agent_id}] 响应帮助请求,领取任务 {help_task_id}")
                    self.current_tasks.add(help_task_id)
                    
                    task = (await self.state.get_available_tasks())
                    # 实际执行见 run_loop
        
        elif msg.msg_type == MessageType.TASK_RESULT:
            # 记录其他 Agent 完成的任务(可用于学习)
            content = msg.content
            print(f"[{self.agent_id}] 收到 {msg.sender_id} 的任务结果")
    
    async def run_loop(self, stop_event: asyncio.Event) -> None:
        """
        Agent 主运行循环
        持续轮询任务队列并处理消息
        """
        self.running = True
        await self.state.register_agent(self.agent_id, self.capability)
        print(f"[{self.agent_id}] 启动,能力:{self.capability}")
        
        while not stop_event.is_set():
            try:
                # 1. 处理消息队列
                while not self.inbox.empty():
                    msg = self.inbox.get_nowait()
                    await self._handle_message(msg)
                
                # 2. 如果当前任务数未达上限,尝试领取新任务
                if len(self.current_tasks) < self.max_concurrent:
                    available = await self.state.get_available_tasks(self.capability)
                    
                    for task in available:
                        if task.task_id not in self.current_tasks:
                            if await self.state.claim_task(task.task_id, self.agent_id):
                                self.current_tasks.add(task.task_id)
                                print(f"[{self.agent_id}] 领取任务: {task.task_id} - {task.description[:50]}")
                                
                                # 异步执行任务
                                asyncio.create_task(
                                    self._run_task(task)
                                )
                                break
                
                # 3. 发送心跳
                await self.state.update_agent_heartbeat(
                    self.agent_id, 
                    workload=len(self.current_tasks)
                )
                
                await asyncio.sleep(0.5)  # 轮询间隔
                
            except Exception as e:
                print(f"[{self.agent_id}] 运行时错误: {e}")
                await asyncio.sleep(1)
        
        self.running = False
        print(f"[{self.agent_id}] 停止运行")
    
    async def _run_task(self, task: TaskItem) -> None:
        """异步执行单个任务"""
        try:
            result = await self._execute_task(task)
            await self.state.complete_task(task.task_id, self.agent_id, result)
            
            # 广播任务完成
            await self.bus.publish(SwarmMessage(
                sender_id=self.agent_id,
                msg_type=MessageType.TASK_RESULT,
                content={"task_id": task.task_id, "result_preview": result[:100]}
            ))
            
            print(f"[{self.agent_id}] 完成任务: {task.task_id}")
            
        except Exception as e:
            await self.state.fail_task(task.task_id, self.agent_id, str(e))
            print(f"[{self.agent_id}] 任务 {task.task_id} 失败: {e}")
        
        finally:
            self.current_tasks.discard(task.task_id)

55.5 Swarm 完整运行示例

# swarm_demo.py
"""
Swarm 模式完整演示
启动多个 Agent,并发处理任务队列
"""

import asyncio
import uuid
from openai import AsyncOpenAI
from swarm_messaging import MessageBus
from swarm_state import SharedSwarmState, TaskItem
from swarm_agent import SwarmAgent


async def run_swarm(task_list: list[dict], num_agents_per_type: int = 2):
    """
    启动 Swarm 集群,处理任务列表
    """
    client = AsyncOpenAI(base_url="http://localhost:8000/v1", api_key="not-needed")
    model = "NousResearch/Hermes-3-Llama-3.1-8B"
    
    # 初始化共享基础设施
    bus = MessageBus()
    state = SharedSwarmState()
    
    # 设置全局上下文
    await state.set_context("task_context", {"project": "AI市场分析", "language": "中文"})
    
    # 创建 Agent 集群
    agents = []
    capabilities = ["search", "analyze", "write"]
    
    for capability in capabilities:
        for i in range(num_agents_per_type):
            agent_id = f"{capability}_{i+1}"
            agent = SwarmAgent(
                agent_id=agent_id,
                capability=capability,
                model=model,
                client=client,
                message_bus=bus,
                shared_state=state
            )
            agents.append(agent)
    
    # 将任务添加到共享队列
    for task_data in task_list:
        task = TaskItem(
            task_id=uuid.uuid4().hex[:8],
            description=task_data["description"],
            required_capability=task_data["capability"],
            priority=task_data.get("priority", 0)
        )
        await state.add_task(task)
    
    print(f"\n[Swarm] 启动 {len(agents)} 个 Agent,处理 {len(task_list)} 个任务")
    
    # 启动所有 Agent 的运行循环
    stop_event = asyncio.Event()
    
    agent_tasks = [
        asyncio.create_task(agent.run_loop(stop_event))
        for agent in agents
    ]
    
    # 等待所有任务完成(最多等待 120 秒)
    start_time = asyncio.get_event_loop().time()
    while (asyncio.get_event_loop().time() - start_time) < 120:
        available = await state.get_available_tasks()
        results = await state.get_task_results()
        
        if len(results) >= len(task_list):
            print(f"\n[Swarm] 所有任务完成!")
            break
        
        claimed = sum(1 for t in state._tasks.values() if t.status == "claimed")
        completed = len(results)
        print(f"[Swarm] 进度: {completed}/{len(task_list)} 完成, {claimed} 执行中, {len(available)} 待处理")
        
        await asyncio.sleep(5)
    
    # 停止所有 Agent
    stop_event.set()
    await asyncio.gather(*agent_tasks, return_exceptions=True)
    
    # 汇总结果
    final_results = await state.get_task_results()
    print(f"\n[Swarm] 完成任务: {len(final_results)}/{len(task_list)}")
    
    return final_results


async def main():
    # 定义任务列表
    tasks = [
        {"description": "搜索 OpenAI 2024年最新产品和功能更新", "capability": "search", "priority": 1},
        {"description": "搜索 Anthropic Claude 2024年的发展动态", "capability": "search", "priority": 1},
        {"description": "搜索 Google Gemini 的最新进展", "capability": "search", "priority": 1},
        {"description": "分析三大AI公司的市场份额变化趋势", "capability": "analyze", "priority": 0},
        {"description": "分析各模型在代码、推理、创意写作三个维度的表现对比", "capability": "analyze", "priority": 0},
        {"description": "撰写'2024年大模型竞争格局'分析报告执行摘要(500字)", "capability": "write", "priority": -1},
    ]
    
    results = await run_swarm(tasks, num_agents_per_type=2)
    
    print("\n" + "="*60)
    print("Swarm 执行结果摘要")
    print("="*60)
    for task_id, result in results.items():
        print(f"\n任务 {task_id}:\n{result[:200]}...")


if __name__ == "__main__":
    asyncio.run(main())

55.6 与 Orchestrator 模式的选型对比

flowchart LR
    Q1{任务是否\n高度结构化?}
    Q1 -->|是| Q2{是否需要\n严格顺序控制?}
    Q1 -->|否| Q3{是否要求\n高可用性?}
    
    Q2 -->|是| ORC[Orchestrator 模式]
    Q2 -->|否| Q4{任务数量\n是否很多?}
    
    Q3 -->|是| SWARM[Swarm 模式]
    Q3 -->|否| ORC
    
    Q4 -->|是| SWARM
    Q4 -->|否| ORC
选型维度 Orchestrator Swarm
任务类型 结构化、有明确依赖关系 并行、相对独立
规模 小规模(< 10 个 Worker) 大规模(> 10 个 Agent)
一致性要求 高(需要精确控制执行顺序) 低(最终一致即可)
容错需求 低(有中央协调) 高(无中央依赖)
实现复杂度
调试难度 低(中心化日志) 高(分布式追踪)
延迟 有额外协调延迟 更低(直接执行)
动态扩展 需要更新 Orchestrator 直接添加新 Agent

55.7 Hermes 当前限制与变通方案

当前限制

Hermes Agent 目前对 Swarm 的原生支持有限:

  1. 无内置消息总线:Agent 之间没有直接通信机制,需要手动实现
  2. 无共享上下文:子 Agent 的上下文完全隔离,不能直接读取彼此的记忆
  3. 无身份系统:计划中支持有身份的专用 Agent,但当前版本尚未实现

变通方案

限制 变通方案
无消息总线 使用 Redis Pub/Sub 或 RabbitMQ 实现外部消息总线
无共享上下文 使用 Redis Hash 实现共享状态存储
无身份系统 用约定好的 agent_id 前缀区分 Agent 类型
无协调机制 用共享任务队列(Redis List)实现去中心化任务分配
# redis_swarm_bus.py
"""使用 Redis 实现生产级 Swarm 消息总线"""

import redis.asyncio as aioredis
import json

class RedisSwarmBus:
    """基于 Redis Pub/Sub 的生产级消息总线"""
    
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = aioredis.from_url(redis_url)
        self.CHANNEL_PREFIX = "hermes:swarm:"
    
    async def publish(self, channel: str, message: dict) -> None:
        full_channel = f"{self.CHANNEL_PREFIX}{channel}"
        await self.redis.publish(full_channel, json.dumps(message))
    
    async def subscribe(self, agent_id: str, capabilities: list[str]):
        """订阅与 Agent 能力相关的频道"""
        pubsub = self.redis.pubsub()
        channels = [
            f"{self.CHANNEL_PREFIX}broadcast",
            f"{self.CHANNEL_PREFIX}agent:{agent_id}"
        ] + [f"{self.CHANNEL_PREFIX}capability:{cap}" for cap in capabilities]
        
        await pubsub.subscribe(*channels)
        return pubsub
    
    async def claim_task(self, task_id: str, agent_id: str, timeout: int = 30) -> bool:
        """使用 Redis SET NX 实现原子性任务领取"""
        key = f"hermes:swarm:claim:{task_id}"
        result = await self.redis.set(key, agent_id, nx=True, ex=timeout)
        return result is True

小结

本章深入探讨了 Swarm 去中心化多 Agent 模式:

  1. 去中心化哲学:Swarm 消除了单点故障,每个 Agent 自主决策,涌现出复杂协调行为。
  2. 消息传递协议:基于 asyncio.Queue 的消息总线支持广播、点对点、订阅/发布三种模式。
  3. 共享状态设计:使用 asyncio.Lock 实现并发安全的任务领取,避免竞争条件。
  4. 与 Orchestrator 对比:Swarm 适合大规模并行任务,Orchestrator 适合结构化依赖任务。
  5. 变通方案:Redis Pub/Sub + 共享任务队列是 Hermes 当前架构下实现 Swarm 的最佳路径。

思考题

  1. Swarm 模式如何处理任务的优先级?如果某个高优先级任务长时间没有被领取,如何处理?
  2. 当一个 Agent 意外崩溃时,它已经领取但未完成的任务如何恢复?需要什么机制?
  3. Swarm 中的"涌现行为"在 AI Agent 场景下有什么具体表现?是否有可能产生不期望的涌现行为?
  4. 如果需要在 Swarm 中引入一个"监控 Agent"来追踪系统整体状态,这是否会让系统重新变成中心化架构?
本章评分
4.6  / 5  (3 评分)

💬 留言讨论