Chapter 55

Multi-Agent Collaboration: Swarm Pattern

Chapter 55: Multi-Agent Collaboration: Swarm Pattern

Introduction

The Orchestrator pattern relies on a centralized master Agent to coordinate all workโ€”clear and intuitive by design, but it introduces single-point-of-failure and scaling bottleneck risks. Swarm offers an alternative philosophy: no central controller, each Agent makes decisions based on local information, and global coordination emerges through message passing and shared state. This decentralized design appears throughout natureโ€”the collective intelligence of ants, bees, and bird flocks is the natural inspiration for the Swarm pattern. This chapter explores Swarm's design principles, implementation challenges, and workarounds for the current Hermes architecture.


55.1 Swarm Decentralized Design Philosophy

Why Decentralization

Orchestrator's potential problems:

Problem Description Impact
Single point of failure Orchestrator crash stops the entire system Availability drops
Scaling bottleneck All tasks must route through the Orchestrator Throughput limited
Cognitive overload Orchestrator must know all Worker capabilities System prompt complexity
Added latency Every task incurs coordination overhead Response time increases

Swarm's core idea: Emergenceโ€”complex global behavior arises from simple local rules, without any central controller.

flowchart TD
    subgraph Orch["Orchestrator (Centralized)"]
        O[Orchestrator] --> W1[Worker 1]
        O --> W2[Worker 2]
        O --> W3[Worker 3]
        W1 --> O
        W2 --> O
        W3 --> O
    end
    
    subgraph Swarm["Swarm (Decentralized)"]
        S1[Agent 1] <--> S2[Agent 2]
        S2 <--> S3[Agent 3]
        S3 <--> S1
        S1 <--> SharedState[(Shared State)]
        S2 <--> SharedState
        S3 <--> SharedState
    end

Four Core Swarm Principles

  1. Local Perception: Each Agent sees only local information; no global view needed
  2. Simple Rules: Each Agent's behavior is simple, but collective complexity emerges
  3. Message Passing: Agents coordinate via messages; no shared memory needed
  4. Self-Organization: Task assignment is self-determined, not centrally mandated

55.2 Agent Message Passing Protocol

# swarm_messaging.py
import asyncio
import json
import time
import uuid
from dataclasses import dataclass, field
from typing import Optional, Any
from enum import Enum


class MessageType(Enum):
    BROADCAST = "broadcast"
    DIRECT = "direct"
    TASK_CLAIM = "task_claim"
    TASK_RESULT = "task_result"
    HEARTBEAT = "heartbeat"
    REQUEST_HELP = "request_help"


@dataclass
class SwarmMessage:
    msg_id: str = field(default_factory=lambda: uuid.uuid4().hex[:8])
    sender_id: str = ""
    receiver_id: Optional[str] = None
    msg_type: MessageType = MessageType.BROADCAST
    content: Any = None
    timestamp: float = field(default_factory=time.time)
    priority: int = 0


class MessageBus:
    """Swarm message bus implementing publish/subscribe routing."""
    
    def __init__(self):
        self._queues: dict[str, asyncio.Queue] = {}
        self._history: list[SwarmMessage] = []
    
    def register_agent(self, agent_id: str) -> asyncio.Queue:
        queue = asyncio.Queue(maxsize=100)
        self._queues[agent_id] = queue
        return queue
    
    async def publish(self, message: SwarmMessage) -> int:
        self._history.append(message)
        if len(self._history) > 1000:
            self._history.pop(0)
        
        delivered = 0
        if message.receiver_id is None:
            # Broadcast
            for agent_id, queue in self._queues.items():
                if agent_id != message.sender_id:
                    try:
                        queue.put_nowait(message)
                        delivered += 1
                    except asyncio.QueueFull:
                        pass
        elif message.receiver_id in self._queues:
            # Point-to-point
            try:
                self._queues[message.receiver_id].put_nowait(message)
                delivered = 1
            except asyncio.QueueFull:
                pass
        
        return delivered
    
    @property
    def active_agents(self) -> list:
        return list(self._queues.keys())

55.3 Shared State Management

# swarm_state.py
import asyncio
import time
from typing import Any, Optional
from dataclasses import dataclass, field


@dataclass
class TaskItem:
    task_id: str
    description: str
    required_capability: str
    priority: int = 0
    status: str = "available"
    claimed_by: Optional[str] = None
    result: Optional[str] = None
    created_at: float = field(default_factory=time.time)
    claimed_at: Optional[float] = None


class SharedSwarmState:
    """Thread-safe shared state using asyncio.Lock for concurrent access."""
    
    def __init__(self):
        self._lock = asyncio.Lock()
        self._tasks: dict[str, TaskItem] = {}
        self._agent_status: dict[str, dict] = {}
        self._global_context: dict[str, Any] = {}
    
    async def add_task(self, task: TaskItem) -> None:
        async with self._lock:
            self._tasks[task.task_id] = task
    
    async def claim_task(self, task_id: str, agent_id: str) -> bool:
        """Atomically claim a task. Returns True on success."""
        async with self._lock:
            task = self._tasks.get(task_id)
            if task is None or task.status != "available":
                return False
            task.status = "claimed"
            task.claimed_by = agent_id
            task.claimed_at = time.time()
            return True
    
    async def complete_task(self, task_id: str, agent_id: str, result: str) -> bool:
        async with self._lock:
            task = self._tasks.get(task_id)
            if task is None or task.claimed_by != agent_id:
                return False
            task.status = "completed"
            task.result = result
            return True
    
    async def fail_task(self, task_id: str, agent_id: str, error: str) -> None:
        async with self._lock:
            task = self._tasks.get(task_id)
            if task and task.claimed_by == agent_id:
                task.status = "failed"
                task.result = f"Error: {error}"
    
    async def get_available_tasks(self, capability: Optional[str] = None) -> list:
        async with self._lock:
            tasks = [t for t in self._tasks.values() if t.status == "available"]
            if capability:
                tasks = [t for t in tasks if t.required_capability == capability]
            return sorted(tasks, key=lambda t: (-t.priority, t.created_at))
    
    async def get_task_results(self) -> dict:
        async with self._lock:
            return {tid: t.result for tid, t in self._tasks.items()
                    if t.status == "completed" and t.result}
    
    async def register_agent(self, agent_id: str, capability: str) -> None:
        async with self._lock:
            self._agent_status[agent_id] = {
                "capability": capability, "last_seen": time.time(), "workload": 0
            }
    
    async def update_heartbeat(self, agent_id: str, workload: int = 0) -> None:
        async with self._lock:
            if agent_id in self._agent_status:
                self._agent_status[agent_id].update({"last_seen": time.time(), "workload": workload})
    
    async def set_context(self, key: str, value: Any) -> None:
        async with self._lock:
            self._global_context[key] = value
    
    async def get_context(self, key: str, default: Any = None) -> Any:
        async with self._lock:
            return self._global_context.get(key, default)

55.4 Swarm Agent Implementation

# swarm_agent.py
import asyncio
from openai import AsyncOpenAI
from swarm_messaging import MessageBus, SwarmMessage, MessageType
from swarm_state import SharedSwarmState, TaskItem


class SwarmAgent:
    """
    Decentralized Swarm Agent: autonomously decides which tasks to claim,
    coordinates with peers through messages and shared state.
    """
    
    CAPABILITY_PROMPTS = {
        "search": "You are an information search expert specializing in collecting and organizing online information.",
        "code": "You are a coding expert specializing in writing, debugging, and analyzing code.",
        "write": "You are a writing expert specializing in structured writing and content creation.",
        "analyze": "You are a data analysis expert specializing in processing and analyzing various data types."
    }
    
    def __init__(self, agent_id: str, capability: str, model: str,
                 client: AsyncOpenAI, message_bus: MessageBus,
                 shared_state: SharedSwarmState, max_concurrent: int = 2):
        self.agent_id = agent_id
        self.capability = capability
        self.model = model
        self.client = client
        self.bus = message_bus
        self.state = shared_state
        self.max_concurrent = max_concurrent
        self.inbox = message_bus.register_agent(agent_id)
        self.current_tasks: set = set()
        self.system_prompt = self.CAPABILITY_PROMPTS.get(capability, "You are a general AI Agent.")
    
    async def _execute_task(self, task: TaskItem) -> str:
        context = await self.state.get_context("task_context", {})
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": f"Task: {task.description}\n\nContext: {context}\n\nComplete this task thoroughly."}
            ],
            temperature=0.2,
            max_tokens=2048
        )
        return response.choices[0].message.content or "Task complete"
    
    async def _run_task(self, task: TaskItem) -> None:
        try:
            result = await self._execute_task(task)
            await self.state.complete_task(task.task_id, self.agent_id, result)
            await self.bus.publish(SwarmMessage(
                sender_id=self.agent_id,
                msg_type=MessageType.TASK_RESULT,
                content={"task_id": task.task_id, "preview": result[:100]}
            ))
            print(f"[{self.agent_id}] Completed task: {task.task_id}")
        except Exception as e:
            await self.state.fail_task(task.task_id, self.agent_id, str(e))
        finally:
            self.current_tasks.discard(task.task_id)
    
    async def run_loop(self, stop_event: asyncio.Event) -> None:
        await self.state.register_agent(self.agent_id, self.capability)
        print(f"[{self.agent_id}] Started, capability: {self.capability}")
        
        while not stop_event.is_set():
            try:
                # Process incoming messages
                while not self.inbox.empty():
                    msg = self.inbox.get_nowait()
                    if msg.msg_type == MessageType.REQUEST_HELP:
                        help_task_id = msg.content.get("task_id")
                        if (help_task_id and len(self.current_tasks) < self.max_concurrent
                                and await self.state.claim_task(help_task_id, self.agent_id)):
                            self.current_tasks.add(help_task_id)
                
                # Claim new tasks if capacity available
                if len(self.current_tasks) < self.max_concurrent:
                    available = await self.state.get_available_tasks(self.capability)
                    for task in available:
                        if task.task_id not in self.current_tasks:
                            if await self.state.claim_task(task.task_id, self.agent_id):
                                self.current_tasks.add(task.task_id)
                                print(f"[{self.agent_id}] Claimed: {task.task_id} - {task.description[:50]}")
                                asyncio.create_task(self._run_task(task))
                                break
                
                await self.state.update_heartbeat(self.agent_id, len(self.current_tasks))
                await asyncio.sleep(0.5)
                
            except Exception as e:
                print(f"[{self.agent_id}] Runtime error: {e}")
                await asyncio.sleep(1)
        
        print(f"[{self.agent_id}] Stopped")


async def run_swarm(task_list: list, num_per_type: int = 2):
    from openai import AsyncOpenAI
    import uuid
    
    client = AsyncOpenAI(base_url="http://localhost:8000/v1", api_key="not-needed")
    model = "NousResearch/Hermes-3-Llama-3.1-8B"
    bus = MessageBus()
    state = SharedSwarmState()
    
    await state.set_context("task_context", {"project": "AI Market Analysis"})
    
    agents = []
    for capability in ["search", "analyze", "write"]:
        for i in range(num_per_type):
            agents.append(SwarmAgent(
                agent_id=f"{capability}_{i+1}",
                capability=capability,
                model=model,
                client=client,
                message_bus=bus,
                shared_state=state
            ))
    
    for td in task_list:
        await state.add_task(TaskItem(
            task_id=uuid.uuid4().hex[:8],
            description=td["description"],
            required_capability=td["capability"],
            priority=td.get("priority", 0)
        ))
    
    print(f"\n[Swarm] Starting {len(agents)} agents for {len(task_list)} tasks")
    stop = asyncio.Event()
    agent_tasks = [asyncio.create_task(a.run_loop(stop)) for a in agents]
    
    import time
    start = time.time()
    while time.time() - start < 120:
        results = await state.get_task_results()
        if len(results) >= len(task_list):
            print("[Swarm] All tasks complete!")
            break
        await asyncio.sleep(5)
    
    stop.set()
    await asyncio.gather(*agent_tasks, return_exceptions=True)
    return await state.get_task_results()

55.5 Orchestrator vs. Swarm Selection Guide

Selection Dimension Orchestrator Swarm
Task type Structured, with clear dependencies Parallel, relatively independent
Scale Small (<10 Workers) Large (>10 Agents)
Consistency High (precise execution order needed) Low (eventual consistency OK)
Fault tolerance Lower (central coordinator) Higher (no central dependency)
Implementation complexity Low High
Debug difficulty Low (centralized logging) High (distributed tracing)
Latency Added coordination overhead Lower (direct execution)
Dynamic scaling Requires Orchestrator update Add new Agents directly

55.6 Hermes Current Limitations and Workarounds

Current Limitations

  1. No built-in message bus: No direct communication mechanism between Agents
  2. No shared context: Sub-Agent contexts are fully isolated
  3. No identity system: Named specialized Agents are planned but not yet implemented

Workarounds

Limitation Workaround
No message bus Redis Pub/Sub or RabbitMQ as external message bus
No shared context Redis Hash for shared state storage
No identity system Convention-based agent_id prefixes to distinguish Agent types
No coordination Redis List as shared task queue for decentralized assignment
# redis_swarm_bus.py
import redis.asyncio as aioredis
import json

class RedisSwarmBus:
    """Production-grade Swarm message bus using Redis Pub/Sub."""
    
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = aioredis.from_url(redis_url)
        self.PREFIX = "hermes:swarm:"
    
    async def publish(self, channel: str, message: dict) -> None:
        await self.redis.publish(f"{self.PREFIX}{channel}", json.dumps(message))
    
    async def claim_task(self, task_id: str, agent_id: str, timeout: int = 30) -> bool:
        """Atomic task claim using Redis SET NX."""
        key = f"{self.PREFIX}claim:{task_id}"
        result = await self.redis.set(key, agent_id, nx=True, ex=timeout)
        return result is True

Summary

This chapter provided an in-depth exploration of the Swarm decentralized multi-Agent pattern:

  1. Decentralization philosophy: Swarm eliminates single points of failure; each Agent decides autonomously, and complex coordination emerges from simple rules.
  2. Message passing protocol: asyncio.Queue-based message bus supports broadcast, point-to-point, and pub/sub communication.
  3. Shared state design: asyncio.Lock enables concurrent-safe task claiming, preventing race conditions.
  4. Orchestrator vs. Swarm: Swarm suits large-scale parallel tasks; Orchestrator suits structured dependency tasks.
  5. Workarounds: Redis Pub/Sub + shared task queue is the best path to implementing Swarm under the current Hermes architecture.

Review Questions

  1. How does Swarm handle task priority? If a high-priority task goes unclaimed for a long time, what mechanism should handle it?
  2. When an Agent crashes unexpectedly, how are its claimed-but-incomplete tasks recovered? What mechanism is needed?
  3. What concrete manifestations of "emergent behavior" exist in the AI Agent Swarm context? Is it possible to produce undesired emergent behaviors?
  4. If you introduce a "monitor Agent" in the Swarm to track overall system state, does this re-centralize the architecture?
Rate this chapter
4.6  / 5  (3 ratings)

๐Ÿ’ฌ Comments