Chapter 33: Multi-Agent Orchestration: Making Multiple Claude Instances Work Together

33.1 Why Multi-Agent Systems Matter

A single Claude instance faces three fundamental constraints when handling complex tasks: context window limits, inability to parallelize, and lack of role specialization. When a task requires simultaneously analyzing a hundred-thousand-line codebase, coordinating multiple independent subtasks, or having different "roles" cross-validate each other's work, single-instance architectures begin to show their limits.

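The core idea of Multi-Agent architecture is to decompose a complex task into subtasks, each handled by a specialized Claude instance (called a Subagent or Worker) and coordinated by an Orchestrator. The pattern fits the scenarios named above: analyzing very large codebases, coordinating independent subtasks, and having different roles cross-validate each other's work.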

According to Anthropic's official documentation, Claude can play two roles in Multi-Agent systems: as an Orchestrator (issuing instructions to other Agents) or as a Subagent (executing specific tasks and returning results). The same Claude instance may play both roles simultaneously at different levels of the architecture hierarchy.

33.2 Orchestration Pattern Taxonomy

33.2.1 Centralized Orchestration (Hub-and-Spoke)

The most common pattern. A single Orchestrator receives user requests, assigns tasks to multiple Subagents, collects results, and returns an aggregated response.

User
  │
  ▼
Orchestrator (primary Claude instance)
  ├── Worker A (handles task 1)
  ├── Worker B (handles task 2)
  └── Worker C (handles task 3)
        │
        └── Aggregated result → User

Best for: tasks that decompose cleanly, relatively independent subtasks, and scenarios requiring final holistic judgment.

33.2.2 Pipeline Orchestration

Each Agent processes the output of the previous Agent, forming a sequential processing chain. Suited for tasks with clear ordering dependencies.

Input → Agent A (extract) → Agent B (analyze) → Agent C (report) → Output

Best for: data processing pipelines, multi-stage document generation, content creation requiring iterative refinement.
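The chain above can be sketched as a list of async stage functions, each receiving the previous stage's output. This is a minimal sketch, not a definitive implementation: `run_pipeline` and the three stage functions are hypothetical names, and the stages are stand-ins for real model calls so the example runs without an API key.

```python
import asyncio
from typing import Awaitable, Callable, List

Stage = Callable[[str], Awaitable[str]]

async def run_pipeline(stages: List[Stage], initial_input: str) -> str:
    """Feed each stage's output into the next stage, in order."""
    current = initial_input
    for stage in stages:
        current = await stage(current)
    return current

# Hypothetical stages; in practice each would wrap a model call.
async def extract(text: str) -> str:
    return text.strip()

async def analyze(text: str) -> str:
    return f"{len(text.split())} words"

async def report(text: str) -> str:
    return f"Report: {text}"

if __name__ == "__main__":
    print(asyncio.run(run_pipeline([extract, analyze, report], "  a b c  ")))
```

Because every stage shares one signature, stages can be reordered or swapped without touching the runner.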

33.2.3 Hierarchical Orchestration

An Orchestrator manages Sub-orchestrators, which in turn manage Workers. Used for ultra-large-scale tasks where the problem space itself needs decomposition.
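One way to picture the hierarchy is as a task tree walked recursively: leaves go to Workers, and inner nodes act as Sub-orchestrators that merge their children's results. The sketch below assumes a hypothetical `TaskNode` type and a placeholder `echo_worker`; a real system would replace both the worker and the string-joining merge step with model calls.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable, List

@dataclass
class TaskNode:
    """A node in the task tree: leaves are Worker tasks, inner nodes are Sub-orchestrators."""
    name: str
    children: List["TaskNode"] = field(default_factory=list)

async def run_tree(node: TaskNode, worker: Callable[[str], Awaitable[str]]) -> str:
    if not node.children:
        return await worker(node.name)
    # A Sub-orchestrator runs its children concurrently, then merges their results.
    parts = await asyncio.gather(*(run_tree(child, worker) for child in node.children))
    return f"{node.name}[{', '.join(parts)}]"

async def echo_worker(task: str) -> str:
    # Hypothetical worker; stands in for a model call.
    return task.upper()
```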

33.2.4 Peer-to-Peer Collaboration

Multiple Claude instances collaborate as equals, exchanging messages and reaching decisions through consensus or voting mechanisms. Common for scenarios requiring multi-perspective validation (e.g., code security review, medical advice generation).

33.3 Implementing Basic Orchestration with the Anthropic SDK

33.3.1 Python Implementation

import anthropic
import asyncio
import json
from typing import List, Dict, Any

client = anthropic.Anthropic()

async def run_subagent(task: str, context: str, model: str = "claude-opus-4-5") -> str:
    """Run a single Subagent to execute a specific task."""
    # The client is synchronous, so run the call in a worker thread. Calling it
    # directly would block the event loop and prevent asyncio.gather from
    # overlapping Subagents.
    message = await asyncio.to_thread(
        client.messages.create,
        model=model,
        max_tokens=4096,
        system=f"You are a specialized analysis Agent. Focus on the assigned subtask and provide detailed, accurate results.\nContext: {context}",
        messages=[{"role": "user", "content": task}],
    )
    return message.content[0].text


async def orchestrator(user_request: str) -> str:
    """
    Orchestrator: receives user request, decomposes tasks,
    coordinates Subagents, and aggregates results.
    """
    # Step 1: Decompose the task
    decomposition_response = client.messages.create(
        model="claude-opus-4-5",
        max_tokens=2048,
        system="""You are a task orchestrator. Decompose the user request into 2-4 independent subtasks.
Return JSON: {"subtasks": [{"id": "1", "task": "description", "requires": []}]}
The requires field lists task IDs that must complete first.""",
        messages=[{"role": "user", "content": f"Decompose this task: {user_request}"}]
    )
    
    decomposition = json.loads(decomposition_response.content[0].text)
    subtasks = decomposition["subtasks"]
    
    # Step 2: Execute tasks respecting dependencies
    results: Dict[str, str] = {}
    
    async def execute_task(task_info):
        context = f"Original user request: {user_request}"
        result = await run_subagent(task_info["task"], context)
        return task_info["id"], result
    
    remaining = list(subtasks)
    while remaining:
        completed_ids = set(results.keys())
        ready = [t for t in remaining if all(dep in completed_ids for dep in t.get("requires", []))]
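        if not ready:
            # Nothing can run: the remaining tasks depend on a cycle or on an unknown ID.
            raise ValueError(f"Unresolvable dependencies: {[t['id'] for t in remaining]}")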
        
        task_results = await asyncio.gather(*[execute_task(t) for t in ready])
        for task_id, result in task_results:
            results[task_id] = result
        
        remaining = [t for t in remaining if t["id"] not in results]
    
    # Step 3: Synthesize all results
    synthesis_prompt = f"""
Original request: {user_request}

Subtask results:
{chr(10).join([f"Task {k}: {v}" for k, v in results.items()])}

Synthesize the above into a complete, coherent final answer.
"""
    
    final_response = client.messages.create(
        model="claude-opus-4-5",
        max_tokens=4096,
        messages=[{"role": "user", "content": synthesis_prompt}]
    )
    
    return final_response.content[0].text
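The orchestrator parses the decomposition reply with `json.loads`, which fails if the model wraps the JSON in explanatory prose or a code fence. A more tolerant parser might look like the sketch below. `extract_json` is a hypothetical helper, and the first-brace-to-last-brace slice is a simplifying assumption that breaks if the surrounding prose itself contains braces.

```python
import json

def extract_json(text: str) -> dict:
    """Parse the JSON object embedded in a model reply.

    Slices from the first '{' to the last '}' so that surrounding prose
    or code-fence markers are ignored.
    """
    start = text.find("{")
    end = text.rfind("}")
    if start == -1 or end <= start:
        raise ValueError("no JSON object found in reply")
    return json.loads(text[start:end + 1])
```

In the orchestrator, this would stand in for the direct `json.loads(...)` call on the decomposition reply.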

33.3.2 TypeScript Implementation

import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic();

interface SubTask {
  id: string;
  task: string;
  requires: string[];
}

async function orchestrate(userRequest: string): Promise<string> {
  // Decompose
  const decompositionMsg = await client.messages.create({
    model: "claude-opus-4-5",
    max_tokens: 2048,
    system: `You are a task orchestrator. Decompose into subtasks. Return JSON:
{"subtasks": [{"id": "1", "task": "...", "requires": []}]}`,
    messages: [{ role: "user", content: `Decompose: ${userRequest}` }],
  });

  const decomposition = JSON.parse(
    decompositionMsg.content[0].type === "text"
      ? decompositionMsg.content[0].text
      : "{}"
  );

  const results: Record<string, string> = {};
  // Fall back to an empty list when the reply had no "subtasks" field.
  let remaining: SubTask[] = [...(decomposition.subtasks ?? [])];

  while (remaining.length > 0) {
    const completedIds = new Set(Object.keys(results));
    const ready = remaining.filter((t) =>
      t.requires.every((dep) => completedIds.has(dep))
    );
    if (ready.length === 0) break;

    const taskResults = await Promise.all(
      ready.map(async (t) => {
        const msg = await client.messages.create({
          model: "claude-sonnet-4-5",
          max_tokens: 4096,
          messages: [{ role: "user", content: `Context: ${userRequest}\n\nTask: ${t.task}` }],
        });
        const block = msg.content[0];
        return [t.id, block.type === "text" ? block.text : ""] as [string, string];
      })
    );

    for (const [id, result] of taskResults) {
      results[id] = result;
    }
    remaining = remaining.filter((t) => !(t.id in results));
  }

  // Synthesize
  const synthesisMsg = await client.messages.create({
    model: "claude-opus-4-5",
    max_tokens: 4096,
    messages: [{
      role: "user",
      content: `Original request: ${userRequest}\n\nResults:\n${
        Object.entries(results).map(([k, v]) => `Task ${k}: ${v}`).join("\n\n")
      }\n\nSynthesize into a complete answer.`
    }],
  });

  const block = synthesisMsg.content[0];
  return block.type === "text" ? block.text : "";
}

33.4 Message Passing and Result Aggregation

33.4.1 Structured Message Format

A structured message format between Agents reduces parsing errors, because every receiver can validate incoming messages against the same known set of fields:

{
  "message_id": "msg_20250101_001",
  "from": "orchestrator",
  "to": "worker_a",
  "task_id": "task_001",
  "type": "task_assignment",
  "payload": {
    "instruction": "Analyze the time complexity of the following code segment",
    "data": "...",
    "constraints": {
      "max_tokens": 1024,
      "format": "json",
      "deadline_ms": 30000
    }
  },
  "metadata": {
    "priority": "high",
    "retry_count": 0,
    "parent_task": "task_000"
  }
}
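A receiver can check such a message before acting on it. The sketch below is one possible shape, assuming the field names from the example above. `AgentMessage` and `parse_message` are hypothetical names, and treating `metadata` as optional is an assumption.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict

REQUIRED_FIELDS = ("message_id", "from", "to", "task_id", "type", "payload")

@dataclass
class AgentMessage:
    message_id: str
    sender: str      # "from" in the JSON; renamed because `from` is a Python keyword
    recipient: str   # "to" in the JSON
    task_id: str
    type: str
    payload: Dict[str, Any]
    metadata: Dict[str, Any] = field(default_factory=dict)

def parse_message(raw: str) -> AgentMessage:
    """Parse and validate a message, failing fast on missing fields."""
    data = json.loads(raw)
    missing = [name for name in REQUIRED_FIELDS if name not in data]
    if missing:
        raise ValueError(f"missing fields: {missing}")
    return AgentMessage(
        message_id=data["message_id"],
        sender=data["from"],
        recipient=data["to"],
        task_id=data["task_id"],
        type=data["type"],
        payload=data["payload"],
        metadata=data.get("metadata", {}),
    )
```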

33.4.2 Result Aggregation Strategies

Voting Aggregation: Suitable for judgment tasks with clear correct answers. Multiple Agents answer independently, then a meta-Agent identifies consensus.

Weighted Aggregation: Assign different weights to Agents with different expertise levels. Higher-confidence, more specialized Agents have more influence on the final result.

Sequential Refinement: Each Agent improves on the previous Agent's output. The first Agent produces a draft, subsequent Agents critique and enhance it progressively.
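The first two strategies can be sketched without any model call. Note the simplification: the text describes a meta-Agent identifying consensus, whereas `majority_vote` below just counts exact-match answers. Both function names are hypothetical, and the default weight of 1.0 for unlisted Agents is an assumption.

```python
from collections import Counter, defaultdict
from typing import Dict, List, Tuple

def majority_vote(answers: List[str]) -> Tuple[str, float]:
    """Return the most common answer and the share of Agents that gave it."""
    if not answers:
        raise ValueError("no answers to aggregate")
    winner, count = Counter(answers).most_common(1)[0]
    return winner, count / len(answers)

def weighted_vote(answers: Dict[str, str], weights: Dict[str, float]) -> str:
    """Pick the answer with the highest total weight; unlisted Agents get weight 1.0."""
    totals: Dict[str, float] = defaultdict(float)
    for agent_id, answer in answers.items():
        totals[answer] += weights.get(agent_id, 1.0)
    return max(totals, key=totals.get)
```

A low agreement share from `majority_vote` is a useful signal that the question should be escalated rather than answered automatically.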

33.5 Tool Calls Across Agents

33.5.1 Shared Tool Registry

In Multi-Agent systems, all Agents typically share a tool definition registry, but different Agents can access different tool subsets:

TOOL_REGISTRY = {
    "web_search": {
        "name": "web_search",
        "description": "Search the internet for current information",
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {"type": "string"},
                "max_results": {"type": "integer", "default": 5}
            },
            "required": ["query"]
        }
    },
    "code_executor": {
        "name": "code_executor",
        "description": "Execute Python code and return results",
        "input_schema": {
            "type": "object",
            "properties": {
                "code": {"type": "string"},
                "timeout": {"type": "integer", "default": 30}
            },
            "required": ["code"]
        }
    }
}

ROLE_TOOLS = {
    "researcher": ["web_search"],
    "coder": ["code_executor"],
    "orchestrator": ["web_search", "code_executor"]
}
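Resolving a role to its tool definitions is then a single lookup. The sketch below uses trimmed copies of the registry and role map so that it runs on its own. `tools_for_role` is a hypothetical helper, and returning an empty list for unknown roles is an assumption (a stricter system might raise instead).

```python
from typing import Dict, List

# Trimmed copies of the registry and role map, kept small for the example.
TOOL_REGISTRY: Dict[str, dict] = {
    "web_search": {
        "name": "web_search",
        "description": "Search the internet for current information",
        "input_schema": {"type": "object",
                         "properties": {"query": {"type": "string"}},
                         "required": ["query"]},
    },
    "code_executor": {
        "name": "code_executor",
        "description": "Execute Python code and return results",
        "input_schema": {"type": "object",
                         "properties": {"code": {"type": "string"}},
                         "required": ["code"]},
    },
}

ROLE_TOOLS: Dict[str, List[str]] = {
    "researcher": ["web_search"],
    "coder": ["code_executor"],
    "orchestrator": ["web_search", "code_executor"],
}

def tools_for_role(role: str) -> List[dict]:
    """Return the tool definitions a role may use; unknown roles get none."""
    return [TOOL_REGISTRY[name] for name in ROLE_TOOLS.get(role, [])]
```

The returned list has the shape expected by the `tools` argument of a messages request.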

33.5.2 Full Tool-Use Loop per Agent

Each Subagent may need to call tools multiple times before completing its task. Implement the full tool-use loop for each Agent:

async def run_agent_with_tools(task: str, tools: list, tool_executor, max_turns: int = 10) -> dict:
    """Run an Agent with tool-calling capability through the full interaction loop."""
    messages = [{"role": "user", "content": task}]
    tool_calls_log = []

    # Cap the number of turns so a misbehaving Agent cannot loop forever.
    for _ in range(max_turns):
        # Run the synchronous SDK call in a worker thread to keep the event loop free.
        response = await asyncio.to_thread(
            client.messages.create,
            model="claude-opus-4-5",
            max_tokens=4096,
            tools=tools,
            messages=messages,
        )

        if response.stop_reason != "tool_use":
            # end_turn, max_tokens, and every other stop reason end the loop, so an
            # unexpected stop reason cannot cause the same request to be re-sent forever.
            final_text = next((b.text for b in response.content if b.type == "text"), "")
            return {
                "result": final_text,
                "tool_calls": tool_calls_log,
                "stop_reason": response.stop_reason,
            }

        # The model asked for tools: echo its turn, run each tool, return the results.
        tool_use_blocks = [b for b in response.content if b.type == "tool_use"]
        messages.append({"role": "assistant", "content": response.content})

        tool_results = []
        for tool_use in tool_use_blocks:
            result = await tool_executor(tool_use.name, tool_use.input)
            tool_calls_log.append({"tool": tool_use.name, "input": tool_use.input, "output": result})
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": tool_use.id,
                "content": str(result)
            })

        messages.append({"role": "user", "content": tool_results})

    raise RuntimeError(f"Agent did not finish within {max_turns} turns")
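The loop above takes a `tool_executor` callable but does not show one. A minimal sketch of such an executor follows, assuming each tool is an async handler keyed by name. `make_tool_executor` and `fake_search` are hypothetical, and returning errors as data (rather than raising) is a design assumption that lets the model see the failure and react to it.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

ToolHandler = Callable[[Dict[str, Any]], Awaitable[Any]]

def make_tool_executor(handlers: Dict[str, ToolHandler]):
    """Build an executor that dispatches by tool name and reports errors as data."""
    async def tool_executor(name: str, tool_input: Dict[str, Any]) -> Any:
        handler = handlers.get(name)
        if handler is None:
            return {"error": f"unknown tool: {name}"}
        try:
            return await handler(tool_input)
        except Exception as exc:
            # Return the failure instead of raising so the Agent loop keeps running.
            return {"error": f"{type(exc).__name__}: {exc}"}
    return tool_executor

async def fake_search(tool_input: Dict[str, Any]) -> Any:
    # Hypothetical handler; a real one would call a search backend.
    return [f"result for {tool_input['query']}"]
```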

33.6 State Management and Context Sharing

33.6.1 Thread-Safe Shared State

Multi-Agent systems need a shared state store that Agents can read from and write to:

import threading
from typing import Any, Dict, List
import time

class SharedState:
    """Thread-safe shared state storage for multi-agent systems."""
    
    def __init__(self):
        self._lock = threading.Lock()
        self._data: Dict[str, Any] = {}
        self._history: List[Dict] = []
    
    def set(self, key: str, value: Any, agent_id: str = "unknown"):
        with self._lock:
            self._data[key] = value
            self._history.append({
                "timestamp": time.time(),
                "agent": agent_id,
                "key": key,
                "value": value
            })
    
    def get(self, key: str, default: Any = None) -> Any:
        with self._lock:
            return self._data.get(key, default)
    
    def get_context(self, relevant_keys: List[str]) -> str:
        import json
        with self._lock:
            return json.dumps(
                {k: v for k, v in self._data.items() if k in relevant_keys},
                ensure_ascii=False, indent=2
            )

33.6.2 Context Compression

When Agents need to pass large amounts of context, compression prevents hitting token limits:

async def compress_context(full_context: str, target_tokens: int = 2000) -> str:
    """Compress long context using a lightweight model to preserve key information."""
    # Run the synchronous SDK call in a worker thread so the event loop stays free.
    response = await asyncio.to_thread(
        client.messages.create,
        model="claude-haiku-4-5",  # Use smaller model for compression to save cost
        max_tokens=target_tokens,  # Hard cap on the length of the compressed output
        messages=[{
            "role": "user",
            "content": f"Compress this context to ~{target_tokens} tokens, preserving: key decisions, important data/numbers, task progress, errors.\n\n{full_context}"
        }],
    )
    return response.content[0].text
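Compression costs an extra model call, so it is worth checking first whether the context is likely to exceed the budget. The sketch below uses a rough characters-per-token ratio. Both the 4.0 ratio and the helper names are assumptions for illustration, and an exact count would have to come from the provider's own token counting.

```python
def estimate_tokens(text: str, chars_per_token: float = 4.0) -> int:
    """Rough token estimate; the characters-per-token ratio is an assumed heuristic."""
    return int(len(text) / chars_per_token)

def needs_compression(text: str, budget_tokens: int = 2000) -> bool:
    """True when the estimated size exceeds the token budget."""
    return estimate_tokens(text) > budget_tokens
```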

33.7 Error Handling and Fault Tolerance

33.7.1 Retry with Exponential Backoff

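import anthropic
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

# Retry only plausibly transient errors; auth or validation failures will not succeed on retry.
TRANSIENT = (anthropic.APIConnectionError, anthropic.RateLimitError, anthropic.InternalServerError)

@retry(retry=retry_if_exception_type(TRANSIENT), stop=stop_after_attempt(3),
       wait=wait_exponential(multiplier=1, min=4, max=10))
async def resilient_subagent(task: str, context: str) -> str:
    """Subagent with automatic retry on transient failures."""
    return await run_subagent(task, context)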

33.7.2 Timeout Control

async def timeout_subagent(task: str, context: str, timeout: float = 60.0):
    """Subagent execution with timeout control."""
    try:
        return await asyncio.wait_for(run_subagent(task, context), timeout=timeout)
    except asyncio.TimeoutError:
        print(f"Task timed out after {timeout}s: {task[:50]}...")
        return None
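When several Subagents run in parallel, one failure should not discard the others' work. A possible approach, sketched below under stated assumptions, is to collect failures next to results with `asyncio.gather(..., return_exceptions=True)`. `run_all_tolerant` and `flaky_runner` are hypothetical names, and the runner stands in for a real Subagent call.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Tuple

async def run_all_tolerant(
    tasks: Dict[str, str],
    runner: Callable[[str], Awaitable[str]],
    timeout: float = 60.0,
) -> Tuple[Dict[str, str], Dict[str, str]]:
    """Run every task concurrently; return (results, failures) instead of raising."""
    async def guarded(task: str) -> str:
        return await asyncio.wait_for(runner(task), timeout=timeout)

    ids = list(tasks)
    outcomes = await asyncio.gather(
        *(guarded(tasks[task_id]) for task_id in ids), return_exceptions=True
    )
    results: Dict[str, str] = {}
    failures: Dict[str, str] = {}
    for task_id, outcome in zip(ids, outcomes):
        if isinstance(outcome, Exception):
            failures[task_id] = type(outcome).__name__  # record why the task failed
        else:
            results[task_id] = outcome
    return results, failures

async def flaky_runner(task: str) -> str:
    # Hypothetical runner: fails on "bad", stalls on "slow", succeeds otherwise.
    if task == "bad":
        raise RuntimeError("boom")
    if task == "slow":
        await asyncio.sleep(1)
    return task.upper()
```

The synthesis step can then decide whether the surviving results are enough or whether the failed task IDs should be retried.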

33.8 Cost Optimization

33.8.1 Model Tiering

Not every subtask requires the most powerful model. Match model capability to task complexity:

Model               Input (per 1M tokens)   Output (per 1M tokens)   Best For
claude-opus-4-5     $5                      $25                      Complex reasoning, final synthesis
claude-sonnet-4-5   $3                      $15                      General analysis, content generation
claude-haiku-4-5    $1                      $5                       Extraction, formatting, compression

MODEL_TIERS = {
    "heavy": "claude-opus-4-5",
    "medium": "claude-sonnet-4-5",
    "light": "claude-haiku-4-5"
}

def select_model(task_type: str) -> str:
    heavy = {"complex_analysis", "synthesis", "creative_writing"}
    light = {"extraction", "formatting", "classification", "compression"}
    if task_type in heavy:
        return MODEL_TIERS["heavy"]
    elif task_type in light:
        return MODEL_TIERS["light"]
    return MODEL_TIERS["medium"]
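To compare tiers for a given workload, the per-call cost can be computed from token counts and per-million-token prices. The helper below is a hypothetical sketch that takes prices as parameters instead of hard-coding them, since list prices change. The example figures use the Sonnet row of the table above.

```python
def estimate_cost_usd(
    input_tokens: int,
    output_tokens: int,
    input_price_per_mtok: float,
    output_price_per_mtok: float,
) -> float:
    """Cost of one call, given token counts and per-million-token prices in USD."""
    return (
        input_tokens * input_price_per_mtok
        + output_tokens * output_price_per_mtok
    ) / 1_000_000

# 1M input tokens and 200K output tokens at $3 / $15 per million tokens.
sonnet_cost = estimate_cost_usd(1_000_000, 200_000, 3.0, 15.0)  # 6.0
```

Output tokens are priced several times higher than input tokens, so capping `max_tokens` on Worker calls often saves more than trimming prompts.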

33.8.2 Result Caching

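Cache Subagent results for identical subtasks to avoid redundant API calls. The cache below keys on an exact hash of the task and context, so it only matches repeats that are byte-for-byte identical:

import hashlib
import time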

class SubagentCache:
    def __init__(self, ttl_seconds: int = 3600):
        self._cache: dict = {}
        self.ttl = ttl_seconds
    
    async def get_or_execute(self, task: str, context: str) -> str:
        key = hashlib.md5(f"{task}|||{context}".encode()).hexdigest()
        
        if key in self._cache:
            entry = self._cache[key]
            if time.time() - entry["timestamp"] < self.ttl:
                return entry["result"]
        
        result = await run_subagent(task, context)
        self._cache[key] = {"result": result, "timestamp": time.time()}
        return result

33.9 Real-World Example: Automated Research Report Generation

async def generate_research_report(topic: str) -> str:
    """
    Full multi-agent pipeline for research report generation:
    1. Outline Agent
    2. Parallel chapter research Agents
    3. Fact-check Agent
    4. Synthesis Agent
    """
    import json
    
    # Step 1: Generate outline
    outline_response = client.messages.create(
        model="claude-sonnet-4-5",
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": f"Generate a 4-chapter research outline for '{topic}'. JSON: {{\"chapters\": [\"description\"]}}"
        }]
    )
    outline = json.loads(outline_response.content[0].text)
    chapters = outline["chapters"]
    
    # Step 2: Parallel chapter research
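    async def research_chapter(idx: int, chapter: str):
        # Run the synchronous SDK call in a worker thread so that the chapters
        # requested via asyncio.gather overlap instead of running one by one.
        response = await asyncio.to_thread(
            client.messages.create,
            model="claude-haiku-4-5",
            max_tokens=2048,
            messages=[{
                "role": "user",
                "content": f"Write one chapter about: {chapter} (topic: {topic}). ~300 words with specific data and examples."
            }],
        )
        return idx, response.content[0].text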
    
    chapter_results = await asyncio.gather(
        *[research_chapter(i, ch) for i, ch in enumerate(chapters)]
    )
    chapter_contents = dict(chapter_results)
    
    # Step 3: Fact-check
    draft = "\n\n".join([
        f"## Chapter {i+1}: {chapters[i]}\n{chapter_contents[i]}"
        for i in range(len(chapters))
    ])
    
    fact_check = client.messages.create(
        model="claude-sonnet-4-5",
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": f"Identify potential factual errors or uncertain claims in this draft:\n\n{draft}"
        }]
    ).content[0].text
    
    # Step 4: Final synthesis
    final = client.messages.create(
        model="claude-opus-4-5",
        max_tokens=8192,
        messages=[{
            "role": "user",
            "content": f"Topic: {topic}\n\nDraft:\n{draft}\n\nFact-check notes:\n{fact_check}\n\nProduce a polished, coherent research report addressing all fact-check concerns."
        }]
    )
    
    return final.content[0].text

33.10 Design Principles for Multi-Agent Systems

Building effective Multi-Agent systems requires adherence to several foundational principles:

Clear Responsibility Boundaries: Each Agent should have a well-defined, singular responsibility. Avoid circular dependencies between Agents.

Minimal Context Transfer: Pass only the information strictly necessary for an Agent to complete its task. Excess context wastes tokens and introduces confusion.

Idempotent Design: When a subtask is retried, the result should be deterministic or safely mergeable. Avoid side effects that compound on repeated execution.

Explicit Dependency Graphs: Define task dependencies explicitly in code (as a DAG). Never rely on implicit execution order assumptions.

Audit Trails: Log every Agent's inputs, outputs, and tool calls for debugging and retrospective analysis.

Progressive Complexity: Start with the simplest centralized orchestration. Introduce hierarchical and pipeline structures only when genuinely necessary, and avoid over-engineering.
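The explicit-dependency principle can be enforced in code by validating the task graph before anything runs. The sketch below uses the standard-library `graphlib` module (Python 3.9+) and assumes subtasks in the `{"id": ..., "requires": [...]}` shape used earlier in the chapter. `execution_levels` is a hypothetical helper name.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List

def execution_levels(subtasks: List[dict]) -> List[List[str]]:
    """Group task IDs into levels; tasks within one level can run in parallel.

    Raises ValueError on a cycle or on a dependency that names an unknown task.
    """
    ids = {t["id"] for t in subtasks}
    graph: Dict[str, set] = {}
    for t in subtasks:
        deps = set(t.get("requires", []))
        unknown = deps - ids
        if unknown:
            raise ValueError(f"task {t['id']} depends on unknown tasks: {sorted(unknown)}")
        graph[t["id"]] = deps

    sorter = TopologicalSorter(graph)
    try:
        sorter.prepare()  # detects cycles before any task is scheduled
    except CycleError as exc:
        raise ValueError(f"dependency cycle: {exc.args[1]}") from exc

    levels: List[List[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        levels.append(ready)
        sorter.done(*ready)
    return levels
```

Running this check on the Orchestrator's decomposition output catches circular or dangling dependencies up front, before any Subagent call is paid for.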


Summary

This chapter introduced the core patterns and implementation techniques of Multi-Agent orchestration, from centralized Hub-and-Spoke and pipeline orchestration to shared state management and cross-Agent tool calls. The value of a Multi-Agent architecture comes from decomposing a task well and running its independent subtasks in parallel.

Key takeaways:

  1. Choose the orchestration pattern that fits your task's characteristics (centralized vs. pipeline vs. hierarchical)
  2. Use structured message formats to minimize inter-Agent parsing errors
  3. Implement comprehensive error handling, retry logic, and timeout mechanisms
  4. Control costs through model tiering (Opus/Sonnet/Haiku)
  5. Shared state storage requires thread-safe design

The next chapter dives into the MCP (Model Context Protocol), the standardized specification Anthropic established for the tool ecosystem.
