Chapter 55: Multi-Agent Collaboration: Swarm Pattern
Introduction
The Orchestrator pattern relies on a centralized master Agent to coordinate all work. The design is clear and intuitive, but the master Agent becomes a single point of failure and a scaling bottleneck. Swarm offers an alternative philosophy. There is no central controller, each Agent makes decisions based on local information, and global coordination emerges from message passing and shared state. The collective behavior of ants, bees, and bird flocks, where no individual directs the group, is the natural inspiration for this decentralized design. This chapter explores Swarm's design principles and implementation challenges, and presents workarounds for the current Hermes architecture.
55.1 Swarm Decentralized Design Philosophy
Why Decentralization
Potential problems with the Orchestrator pattern:
| Problem | Description | Impact |
|---|---|---|
| Single point of failure | Orchestrator crash stops the entire system | Availability drops |
| Scaling bottleneck | All tasks must route through the Orchestrator | Throughput limited |
| Cognitive overload | Orchestrator must know every Worker's capabilities | System prompt grows complex |
| Added latency | Every task incurs coordination overhead | Response time increases |
Swarm's core idea is emergence: complex global behavior arises from simple local rules, without any central controller.
```mermaid
flowchart TD
    subgraph Orch["Orchestrator (Centralized)"]
        O[Orchestrator] --> W1[Worker 1]
        O --> W2[Worker 2]
        O --> W3[Worker 3]
        W1 --> O
        W2 --> O
        W3 --> O
    end
    subgraph Swarm["Swarm (Decentralized)"]
        S1[Agent 1] <--> S2[Agent 2]
        S2 <--> S3[Agent 3]
        S3 <--> S1
        S1 <--> SharedState[(Shared State)]
        S2 <--> SharedState
        S3 <--> SharedState
    end
```
Four Core Swarm Principles
- Local Perception: Each Agent sees only local information; no global view needed
- Simple Rules: Each Agent's behavior is simple, but collective complexity emerges
- Message Passing: Agents coordinate mainly through messages. Shared state is kept to a small coordination store, such as the task board used for claiming.
- Self-Organization: Task assignment is self-determined, not centrally mandated
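Self-organization can be seen even without an LLM. The following toy discrete-time simulation uses plain Python and no Hermes APIs. The function name and the speed values are hypothetical. Every agent follows one local rule: when idle, pull the next task from the shared queue. No component assigns work, yet faster agents end up completing more tasks. Load balancing therefore emerges from the rule rather than from a scheduler.

```python
import heapq


def simulate_pull(task_count: int, agent_speeds: dict[str, float]) -> dict[str, int]:
    """Toy simulation: idle agents pull the next task; nobody assigns work.

    agent_speeds maps a hypothetical agent name to tasks completed per time unit.
    Returns how many tasks each agent completed.
    """
    completed = {name: 0 for name in agent_speeds}
    # Min-heap of (time the agent becomes idle, agent name); all start idle at t = 0.
    idle_at = [(0.0, name) for name in agent_speeds]
    heapq.heapify(idle_at)
    for _ in range(task_count):
        # Local rule: whichever agent becomes idle first takes the next task.
        now, name = heapq.heappop(idle_at)
        completed[name] += 1
        heapq.heappush(idle_at, (now + 1.0 / agent_speeds[name], name))
    return completed


if __name__ == "__main__":
    print(simulate_pull(12, {"fast_agent": 3.0, "slow_agent": 1.0}))
```

The simulation contains no assignment step. The distribution of work is a side effect of each agent acting on its own idle state.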
55.2 Agent Message Passing Protocol
```python
# swarm_messaging.py
import asyncio
import time
import uuid
from collections import deque
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional


class MessageType(Enum):
    BROADCAST = "broadcast"
    DIRECT = "direct"
    TASK_CLAIM = "task_claim"
    TASK_RESULT = "task_result"
    HEARTBEAT = "heartbeat"
    REQUEST_HELP = "request_help"


@dataclass
class SwarmMessage:
    msg_id: str = field(default_factory=lambda: uuid.uuid4().hex[:8])
    sender_id: str = ""
    receiver_id: Optional[str] = None  # None means broadcast to every other agent
    msg_type: MessageType = MessageType.BROADCAST
    content: Any = None
    timestamp: float = field(default_factory=time.time)
    priority: int = 0


class MessageBus:
    """In-process Swarm message bus with broadcast and point-to-point routing."""

    def __init__(self, max_history: int = 1000):
        self._queues: dict[str, asyncio.Queue] = {}
        # deque(maxlen=...) discards the oldest message in O(1), unlike list.pop(0)
        self._history: deque = deque(maxlen=max_history)

    def register_agent(self, agent_id: str) -> asyncio.Queue:
        queue = asyncio.Queue(maxsize=100)
        self._queues[agent_id] = queue
        return queue

    async def publish(self, message: SwarmMessage) -> int:
        """Deliver a message and return how many inboxes accepted it."""
        self._history.append(message)
        delivered = 0
        if message.receiver_id is None:
            # Broadcast: every registered agent except the sender
            for agent_id, queue in self._queues.items():
                if agent_id != message.sender_id:
                    try:
                        queue.put_nowait(message)
                        delivered += 1
                    except asyncio.QueueFull:
                        pass  # inbox full: the message is dropped for this agent
        elif message.receiver_id in self._queues:
            # Point-to-point
            try:
                self._queues[message.receiver_id].put_nowait(message)
                delivered = 1
            except asyncio.QueueFull:
                pass
        return delivered

    @property
    def active_agents(self) -> list[str]:
        return list(self._queues.keys())
```
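The MessageBus above routes by recipient only, so a message goes either to one agent or to everyone. Topic-based publish/subscribe is a common refinement. Agents receive only the topics they subscribed to, which keeps broadcasts from flooding every inbox. The following is a minimal sketch that assumes the same in-process asyncio.Queue inboxes. `TopicBus` and its methods are hypothetical names and are not part of Hermes or of the code above.

```python
import asyncio
from collections import defaultdict


class TopicBus:
    """Hypothetical topic-based pub/sub bus: agents subscribe to topics, publishers address topics."""

    def __init__(self, maxsize: int = 100):
        self._maxsize = maxsize
        self._inboxes: dict[str, asyncio.Queue] = {}
        self._subscribers: dict[str, set[str]] = defaultdict(set)

    def register(self, agent_id: str) -> asyncio.Queue:
        self._inboxes[agent_id] = asyncio.Queue(maxsize=self._maxsize)
        return self._inboxes[agent_id]

    def subscribe(self, agent_id: str, topic: str) -> None:
        self._subscribers[topic].add(agent_id)

    def publish(self, sender_id: str, topic: str, payload) -> int:
        """Put (topic, payload) into each subscriber's inbox; return the delivery count."""
        delivered = 0
        for agent_id in self._subscribers.get(topic, ()):
            if agent_id == sender_id:
                continue  # never echo a message back to its sender
            try:
                self._inboxes[agent_id].put_nowait((topic, payload))
                delivered += 1
            except asyncio.QueueFull:
                pass  # drop for a saturated subscriber, same policy as MessageBus
        return delivered


async def main() -> None:
    bus = TopicBus()
    inbox = bus.register("write_1")
    bus.register("search_1")
    bus.subscribe("write_1", "search.results")
    bus.publish("search_1", "search.results", {"task_id": "t1"})
    print(await inbox.get())


if __name__ == "__main__":
    asyncio.run(main())
```

`publish` stays synchronous here because `put_nowait` never blocks. A version with backpressure would instead `await queue.put(...)` and accept that a slow subscriber can stall the publisher.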
55.3 Shared State Management
```python
# swarm_state.py
import asyncio
import time
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class TaskItem:
    task_id: str
    description: str
    required_capability: str
    priority: int = 0
    status: str = "available"  # available -> claimed -> completed | failed
    claimed_by: Optional[str] = None
    result: Optional[str] = None
    created_at: float = field(default_factory=time.time)
    claimed_at: Optional[float] = None


class SharedSwarmState:
    """Shared task board and context, guarded by an asyncio.Lock.

    asyncio.Lock serializes coroutines on a single event loop. It is not a
    thread or process lock, so Agents in separate processes need an external
    store such as Redis (see 55.6).
    """

    def __init__(self):
        self._lock = asyncio.Lock()
        self._tasks: dict[str, TaskItem] = {}
        self._agent_status: dict[str, dict] = {}
        self._global_context: dict[str, Any] = {}

    async def add_task(self, task: TaskItem) -> None:
        async with self._lock:
            self._tasks[task.task_id] = task

    async def claim_task(self, task_id: str, agent_id: str) -> bool:
        """Atomically claim a task. Returns True on success."""
        async with self._lock:
            task = self._tasks.get(task_id)
            if task is None or task.status != "available":
                return False
            task.status = "claimed"
            task.claimed_by = agent_id
            task.claimed_at = time.time()
            return True

    async def complete_task(self, task_id: str, agent_id: str, result: str) -> bool:
        async with self._lock:
            task = self._tasks.get(task_id)
            if task is None or task.claimed_by != agent_id:
                return False  # only the claiming agent may complete the task
            task.status = "completed"
            task.result = result
            return True

    async def fail_task(self, task_id: str, agent_id: str, error: str) -> None:
        async with self._lock:
            task = self._tasks.get(task_id)
            if task and task.claimed_by == agent_id:
                task.status = "failed"
                task.result = f"Error: {error}"

    async def get_available_tasks(self, capability: Optional[str] = None) -> list[TaskItem]:
        """Available tasks, highest priority first, then oldest first."""
        async with self._lock:
            tasks = [t for t in self._tasks.values() if t.status == "available"]
            if capability:
                tasks = [t for t in tasks if t.required_capability == capability]
            return sorted(tasks, key=lambda t: (-t.priority, t.created_at))

    async def get_task_results(self) -> dict[str, str]:
        async with self._lock:
            return {tid: t.result for tid, t in self._tasks.items()
                    if t.status == "completed" and t.result}

    async def register_agent(self, agent_id: str, capability: str) -> None:
        async with self._lock:
            self._agent_status[agent_id] = {
                "capability": capability, "last_seen": time.time(), "workload": 0
            }

    async def update_heartbeat(self, agent_id: str, workload: int = 0) -> None:
        async with self._lock:
            if agent_id in self._agent_status:
                self._agent_status[agent_id].update({"last_seen": time.time(), "workload": workload})

    async def set_context(self, key: str, value: Any) -> None:
        async with self._lock:
            self._global_context[key] = value

    async def get_context(self, key: str, default: Any = None) -> Any:
        async with self._lock:
            return self._global_context.get(key, default)
```
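Inside `claim_task`, nothing is awaited between checking a task's status and updating it. On a single event loop, that claim is therefore already atomic. The lock becomes essential once the critical section awaits something, such as a round trip to an external store. The sketch below uses a hypothetical `ClaimBoard`, not the class above. It places `await asyncio.sleep(0)` between the check and the write to simulate that I/O, then races several agents for one task, once with the lock and once without.

```python
import asyncio


class ClaimBoard:
    """Hypothetical task board that awaits between the check and the write."""

    def __init__(self, use_lock: bool):
        self.use_lock = use_lock
        self._lock = asyncio.Lock()
        self.owner: dict[str, str] = {}

    async def _check_then_claim(self, task_id: str, agent_id: str) -> bool:
        if task_id in self.owner:
            return False
        # Stand-in for I/O (e.g. a network round trip): other coroutines run here.
        await asyncio.sleep(0)
        self.owner[task_id] = agent_id
        return True

    async def claim(self, task_id: str, agent_id: str) -> bool:
        if self.use_lock:
            async with self._lock:  # one coroutine at a time in check-then-write
                return await self._check_then_claim(task_id, agent_id)
        return await self._check_then_claim(task_id, agent_id)


async def race(use_lock: bool, n_agents: int = 5) -> int:
    """Let n_agents claim the same task concurrently; return how many 'won'."""
    board = ClaimBoard(use_lock)
    wins = await asyncio.gather(*(board.claim("t1", f"agent_{i}") for i in range(n_agents)))
    return sum(wins)


if __name__ == "__main__":
    print("without lock:", asyncio.run(race(use_lock=False)))
    print("with lock:   ", asyncio.run(race(use_lock=True)))
```

Without the lock, several agents pass the check before any of them writes, so more than one believes it owns `t1`. With the lock, exactly one claim succeeds.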
55.4 Swarm Agent Implementation
```python
# swarm_agent.py
import asyncio
import time
import uuid

from openai import AsyncOpenAI

from swarm_messaging import MessageBus, SwarmMessage, MessageType
from swarm_state import SharedSwarmState, TaskItem


class SwarmAgent:
    """
    Decentralized Swarm Agent: autonomously decides which tasks to claim and
    coordinates with peers through messages and shared state.
    """

    CAPABILITY_PROMPTS = {
        "search": "You are an information search expert specializing in collecting and organizing online information.",
        "code": "You are a coding expert specializing in writing, debugging, and analyzing code.",
        "write": "You are a writing expert specializing in structured writing and content creation.",
        "analyze": "You are a data analysis expert specializing in processing and analyzing various data types."
    }

    def __init__(self, agent_id: str, capability: str, model: str,
                 client: AsyncOpenAI, message_bus: MessageBus,
                 shared_state: SharedSwarmState, max_concurrent: int = 2):
        self.agent_id = agent_id
        self.capability = capability
        self.model = model
        self.client = client
        self.bus = message_bus
        self.state = shared_state
        self.max_concurrent = max_concurrent
        self.inbox = message_bus.register_agent(agent_id)
        self.current_tasks: set[str] = set()
        # The event loop keeps only weak references to tasks, so hold strong
        # references here until each background task finishes.
        self._background: set[asyncio.Task] = set()
        self.system_prompt = self.CAPABILITY_PROMPTS.get(capability, "You are a general AI Agent.")

    def _start(self, task: TaskItem) -> None:
        """Track a task this agent has already claimed and run it in the background."""
        self.current_tasks.add(task.task_id)
        job = asyncio.create_task(self._run_task(task))
        self._background.add(job)
        job.add_done_callback(self._background.discard)

    async def _execute_task(self, task: TaskItem) -> str:
        context = await self.state.get_context("task_context", {})
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": f"Task: {task.description}\n\nContext: {context}\n\nComplete this task thoroughly."}
            ],
            temperature=0.2,
            max_tokens=2048
        )
        return response.choices[0].message.content or "Task complete"

    async def _run_task(self, task: TaskItem) -> None:
        try:
            result = await self._execute_task(task)
            await self.state.complete_task(task.task_id, self.agent_id, result)
            await self.bus.publish(SwarmMessage(
                sender_id=self.agent_id,
                msg_type=MessageType.TASK_RESULT,
                content={"task_id": task.task_id, "preview": result[:100]}
            ))
            print(f"[{self.agent_id}] Completed task: {task.task_id}")
        except Exception as e:
            await self.state.fail_task(task.task_id, self.agent_id, str(e))
        finally:
            self.current_tasks.discard(task.task_id)

    async def run_loop(self, stop_event: asyncio.Event) -> None:
        await self.state.register_agent(self.agent_id, self.capability)
        print(f"[{self.agent_id}] Started, capability: {self.capability}")
        while not stop_event.is_set():
            try:
                # Process incoming messages
                while not self.inbox.empty():
                    msg = self.inbox.get_nowait()
                    if msg.msg_type == MessageType.REQUEST_HELP and isinstance(msg.content, dict):
                        help_task_id = msg.content.get("task_id")
                        if help_task_id and len(self.current_tasks) < self.max_concurrent:
                            # Look up the TaskItem before claiming so the claimed task is
                            # actually executed; a bare claim would leave it stuck as "claimed".
                            available = await self.state.get_available_tasks()
                            help_task = next((t for t in available if t.task_id == help_task_id), None)
                            if help_task and await self.state.claim_task(help_task_id, self.agent_id):
                                self._start(help_task)
                # Claim at most one new task matching this agent's capability per tick
                if len(self.current_tasks) < self.max_concurrent:
                    available = await self.state.get_available_tasks(self.capability)
                    for task in available:
                        if await self.state.claim_task(task.task_id, self.agent_id):
                            print(f"[{self.agent_id}] Claimed: {task.task_id} - {task.description[:50]}")
                            self._start(task)
                            break
                await self.state.update_heartbeat(self.agent_id, len(self.current_tasks))
                await asyncio.sleep(0.5)
            except Exception as e:
                print(f"[{self.agent_id}] Runtime error: {e}")
                await asyncio.sleep(1)
        print(f"[{self.agent_id}] Stopped")


async def run_swarm(task_list: list[dict], num_per_type: int = 2) -> dict:
    """Run search/analyze/write agents over task_list until all complete or 120 s pass.

    Each task_list entry: {"description": str, "capability": str, "priority": int (optional)}.
    """
    # OpenAI-compatible endpoint serving the Hermes model locally
    client = AsyncOpenAI(base_url="http://localhost:8000/v1", api_key="not-needed")
    model = "NousResearch/Hermes-3-Llama-3.1-8B"
    bus = MessageBus()
    state = SharedSwarmState()
    await state.set_context("task_context", {"project": "AI Market Analysis"})

    agents = []
    for capability in ["search", "analyze", "write"]:
        for i in range(num_per_type):
            agents.append(SwarmAgent(
                agent_id=f"{capability}_{i + 1}",
                capability=capability,
                model=model,
                client=client,
                message_bus=bus,
                shared_state=state
            ))

    for td in task_list:
        await state.add_task(TaskItem(
            task_id=uuid.uuid4().hex[:8],
            description=td["description"],
            required_capability=td["capability"],
            priority=td.get("priority", 0)
        ))

    print(f"\n[Swarm] Starting {len(agents)} agents for {len(task_list)} tasks")
    stop = asyncio.Event()
    agent_tasks = [asyncio.create_task(a.run_loop(stop)) for a in agents]

    start = time.monotonic()
    # Failed tasks never appear in get_task_results(), so a single failure
    # keeps this loop polling until the 120 s timeout.
    while time.monotonic() - start < 120:
        results = await state.get_task_results()
        if len(results) >= len(task_list):
            print("[Swarm] All tasks complete!")
            break
        await asyncio.sleep(5)

    stop.set()
    await asyncio.gather(*agent_tasks, return_exceptions=True)
    return await state.get_task_results()
```
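`run_swarm` needs a live model endpoint. To check the claiming logic on its own, the LLM call can be replaced by a stub. The following self-contained sketch is a simplified, hypothetical stand-in; `simulate_swarm` is not defined above. It keeps the same ideas: a lock-protected task board, and agents that claim only tasks matching their capability. It returns which agent finished each task.

```python
import asyncio


async def simulate_swarm(tasks: list[dict], agents: list[tuple[str, str]],
                         poll_interval: float = 0.01) -> dict[str, str]:
    """Capability-matched claiming with stub work; returns {task_id: agent_id}.

    tasks: [{"id": ..., "capability": ...}]; agents: [(agent_id, capability)].
    Every capability in tasks must be covered by some agent, or this never finishes.
    """
    board = {t["id"]: {"cap": t["capability"], "status": "available", "by": None}
             for t in tasks}
    lock = asyncio.Lock()
    stop = asyncio.Event()

    async def claim_next(agent_id: str, cap: str):
        async with lock:
            for task_id, t in board.items():
                if t["status"] == "available" and t["cap"] == cap:
                    t["status"], t["by"] = "claimed", agent_id
                    return task_id
        return None

    async def agent_loop(agent_id: str, cap: str) -> None:
        while not stop.is_set():
            task_id = await claim_next(agent_id, cap)
            if task_id is None:
                await asyncio.sleep(poll_interval)  # nothing claimable: poll again
                continue
            await asyncio.sleep(poll_interval)  # stub standing in for the LLM call
            async with lock:
                board[task_id]["status"] = "completed"

    runners = [asyncio.create_task(agent_loop(a, c)) for a, c in agents]
    while any(t["status"] != "completed" for t in board.values()):
        await asyncio.sleep(poll_interval)
    stop.set()
    await asyncio.gather(*runners)
    return {task_id: t["by"] for task_id, t in board.items()}


if __name__ == "__main__":
    demo_tasks = [{"id": f"t{i}", "capability": c}
                  for i, c in enumerate(["search", "analyze", "write", "search"])]
    demo_agents = [("search_1", "search"), ("search_2", "search"),
                   ("analyze_1", "analyze"), ("write_1", "write")]
    print(asyncio.run(simulate_swarm(demo_tasks, demo_agents)))
```

No step in the sketch hands a task to a specific agent. Each task ends up with an agent of the right capability only because agents filter what they claim.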
55.5 Orchestrator vs. Swarm Selection Guide
| Selection Dimension | Orchestrator | Swarm |
|---|---|---|
| Task type | Structured, with clear dependencies | Parallel, relatively independent |
| Scale | Smaller (roughly under 10 Workers) | Larger (roughly 10 or more Agents) |
| Consistency | High (precise execution order needed) | Low (eventual consistency OK) |
| Fault tolerance | Lower (central coordinator) | Higher (no central dependency) |
| Implementation complexity | Low | High |
| Debug difficulty | Low (centralized logging) | High (distributed tracing) |
| Latency | Added coordination overhead | Lower (direct execution) |
| Dynamic scaling | Requires Orchestrator update | Add new Agents directly |
55.6 Current Hermes Limitations and Workarounds
Current Limitations
- No built-in message bus: No direct communication mechanism between Agents
- No shared context: Sub-Agent contexts are fully isolated
- No identity system: Named specialized Agents are planned but not yet implemented
Workarounds
| Limitation | Workaround |
|---|---|
| No message bus | Redis Pub/Sub or RabbitMQ as external message bus |
| No shared context | Redis Hash for shared state storage |
| No identity system | Convention-based agent_id prefixes to distinguish Agent types |
| No coordination | Redis List as shared task queue for decentralized assignment |
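The identity workaround relies only on a naming convention. `run_swarm` already names agents `f"{capability}_{n}"`, so an agent's capability can be recovered from its ID. The following small sketch implements that convention; the helper names are hypothetical. Using `rpartition` lets a capability name itself contain underscores.

```python
from collections import defaultdict


def capability_of(agent_id: str) -> str:
    """Parse '<capability>_<n>' agent IDs, e.g. 'search_2' -> 'search'."""
    prefix, sep, suffix = agent_id.rpartition("_")
    if not sep or not prefix or not suffix.isdigit():
        raise ValueError(f"agent_id does not follow '<capability>_<n>': {agent_id!r}")
    return prefix


def group_by_capability(agent_ids: list[str]) -> dict[str, list[str]]:
    """Group agent IDs by their capability prefix, preserving input order."""
    groups: dict[str, list[str]] = defaultdict(list)
    for agent_id in agent_ids:
        groups[capability_of(agent_id)].append(agent_id)
    return dict(groups)


if __name__ == "__main__":
    print(group_by_capability(["search_1", "search_2", "write_1"]))
```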
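```python
# redis_swarm_bus.py
import json
from typing import AsyncIterator

import redis.asyncio as aioredis


class RedisSwarmBus:
    """Swarm message bus on Redis Pub/Sub, plus atomic task claims.

    Pub/Sub is fire-and-forget: messages published while an agent is not
    subscribed are lost, so durable task data belongs in Redis keys or lists.
    """
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = aioredis.from_url(redis_url)
        self.PREFIX = "hermes:swarm:"

    async def publish(self, channel: str, message: dict) -> None:
        await self.redis.publish(f"{self.PREFIX}{channel}", json.dumps(message))

    async def subscribe(self, channel: str) -> AsyncIterator[dict]:
        """Yield decoded messages published on a channel."""
        pubsub = self.redis.pubsub()
        await pubsub.subscribe(f"{self.PREFIX}{channel}")
        async for item in pubsub.listen():
            if item["type"] == "message":  # skip subscribe confirmations
                yield json.loads(item["data"])

    async def claim_task(self, task_id: str, agent_id: str, timeout: int = 30) -> bool:
        """Atomic task claim using Redis SET NX; the claim expires after `timeout` seconds."""
        key = f"{self.PREFIX}claim:{task_id}"
        result = await self.redis.set(key, agent_id, nx=True, ex=timeout)
        return bool(result)  # True when the key was set, None if it already existed
```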
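The `ex=timeout` argument turns each claim into a lease. If an agent crashes, its key expires and another agent can claim the task. The trade-off is that a task running longer than the lease can also be claimed twice, unless the owner renews the lease. The sketch below models SET NX EX semantics in memory, with an injectable clock, so the behavior can be tested without Redis. `LeaseTable` and `renew` are hypothetical names. A Redis version of `renew` would need the owner check and the expiry update to happen atomically, for example inside a Lua script.

```python
import time
from typing import Callable, Optional


class LeaseTable:
    """In-memory model of claim-with-expiry (like Redis SET key value NX EX ttl)."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._clock = clock
        self._leases: dict[str, tuple[str, float]] = {}  # task_id -> (owner, expires_at)

    def _owner(self, task_id: str) -> Optional[str]:
        lease = self._leases.get(task_id)
        if lease is None or lease[1] <= self._clock():
            return None  # never claimed, or the lease has expired
        return lease[0]

    def claim(self, task_id: str, agent_id: str, ttl: float) -> bool:
        """Succeed only if nobody currently holds an unexpired lease."""
        if self._owner(task_id) is not None:
            return False
        self._leases[task_id] = (agent_id, self._clock() + ttl)
        return True

    def renew(self, task_id: str, agent_id: str, ttl: float) -> bool:
        """Extend the lease, but only for its current, unexpired holder."""
        if self._owner(task_id) != agent_id:
            return False
        self._leases[task_id] = (agent_id, self._clock() + ttl)
        return True


if __name__ == "__main__":
    now = [0.0]  # fake clock for the demo
    leases = LeaseTable(clock=lambda: now[0])
    print(leases.claim("t1", "search_1", ttl=30))  # first claim succeeds
    print(leases.claim("t1", "search_2", ttl=30))  # rejected while the lease is live
    now[0] = 31.0                                   # search_1 crashed and never renewed
    print(leases.claim("t1", "search_2", ttl=30))  # lease expired: search_2 takes over
```

A long-running agent would call `renew` periodically, for example on each heartbeat tick, with a TTL comfortably longer than the renewal interval.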
Summary
This chapter provided an in-depth exploration of the Swarm decentralized multi-Agent pattern:
- Decentralization philosophy: Swarm removes the central coordinator as a single point of failure. Each Agent decides autonomously, and complex coordination emerges from simple rules.
- Message passing protocol: an asyncio.Queue-based message bus supports broadcast and point-to-point delivery.
- Shared state design: asyncio.Lock serializes task claims within one event loop, preventing two Agents from claiming the same task.
- Orchestrator vs. Swarm: Swarm suits large-scale parallel tasks; Orchestrator suits structured tasks with dependencies.
- Workarounds: Redis Pub/Sub plus a shared task queue is a practical path to implementing Swarm under the current Hermes architecture.
Review Questions
- How does Swarm handle task priority? If a high-priority task goes unclaimed for a long time, what mechanism should handle it?
- When an Agent crashes unexpectedly, how are its claimed-but-incomplete tasks recovered? What mechanism is needed?
- What concrete manifestations of "emergent behavior" exist in the AI Agent Swarm context? Is it possible to produce undesired emergent behaviors?
- If you introduce a "monitor Agent" in the Swarm to track overall system state, does this re-centralize the architecture?