Managed Agents in Production: Error Recovery, Monitoring and Cost Control
Chapter 33: Multi-Agent Orchestration: Making Multiple Claude Instances Work Together
33.1 Why Multi-Agent Systems Matter
A single Claude instance faces three fundamental constraints when handling complex tasks: context window limits, inability to parallelize, and lack of role specialization. When a task requires simultaneously analyzing a hundred-thousand-line codebase, coordinating multiple independent subtasks, or having different "roles" cross-validate each other's work, single-instance architectures begin to show their limits.
The core idea of Multi-Agent architecture is to decompose a complex task into subtasks, each handled by a specialized Claude instance (called a Subagent or Worker), coordinated by an Orchestrator. This pattern excels in scenarios such as:
- Large-scale code review: The Orchestrator splits a codebase into modules, multiple Workers analyze them in parallel, and results are aggregated into a report
- Complex research tasks: Different Workers handle data collection, fact-checking, and synthesis writing separately
- Pipeline processing: Data extraction → cleaning → analysis → visualization, with an independent Agent for each stage
- Adversarial validation: One Agent generates an answer, another specifically hunts for flaws and errors
According to Anthropic's official documentation, Claude can play two roles in Multi-Agent systems: as an Orchestrator (issuing instructions to other Agents) or as a Subagent (executing specific tasks and returning results). The same Claude instance may play both roles simultaneously at different levels of the architecture hierarchy.
33.2 Orchestration Pattern Taxonomy
33.2.1 Centralized Orchestration (Hub-and-Spoke)
The most common pattern. A single Orchestrator receives user requests, assigns tasks to multiple Subagents, collects results, and returns an aggregated response.
User
 │
 ▼
Orchestrator (primary Claude instance)
 ├── Worker A (handles task 1)
 ├── Worker B (handles task 2)
 ├── Worker C (handles task 3)
 │
 └── Aggregated result → User
Best for: tasks that decompose cleanly, relatively independent subtasks, and scenarios requiring final holistic judgment.
33.2.2 Pipeline Orchestration
Each Agent processes the output of the previous Agent, forming a sequential processing chain. Suited for tasks with clear ordering dependencies.
Input → Agent A (extract) → Agent B (analyze) → Agent C (report) → Output
Best for: data processing pipelines, multi-stage document generation, content creation requiring iterative refinement.
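The chaining idea can be sketched without any API calls. In this minimal sketch, `run_pipeline` and the three stage functions are hypothetical stand-ins introduced for illustration; in a real system each stage would wrap a call to a Claude instance.

```python
from typing import Callable, List

# A stage takes the previous stage's output and returns its own output.
Stage = Callable[[str], str]


def run_pipeline(stages: List[Stage], initial_input: str) -> str:
    """Feed each stage's output into the next stage, in order."""
    data = initial_input
    for stage in stages:
        data = stage(data)
    return data


# Hypothetical stand-in stages (assumptions, not part of any SDK).
def extract(text: str) -> str:
    return text.strip()


def analyze(text: str) -> str:
    return f"{len(text.split())} words"


def report(text: str) -> str:
    return f"Report: {text}"


pipeline_output = run_pipeline([extract, analyze, report], "  raw input data  ")
print(pipeline_output)
```

Because every stage shares one signature, stages can be reordered, removed, or tested in isolation without touching the runner.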
33.2.3 Hierarchical Orchestration
An Orchestrator manages Sub-orchestrators, which in turn manage Workers. Used for ultra-large-scale tasks where the problem space itself needs decomposition.
33.2.4 Peer-to-Peer Collaboration
Multiple Claude instances collaborate as equals, exchanging messages and reaching decisions through consensus or voting mechanisms. Common for scenarios requiring multi-perspective validation (e.g., code security review, medical advice generation).
33.3 Implementing Basic Orchestration with the Anthropic SDK
33.3.1 Python Implementation
import anthropic
import asyncio
import json
from typing import Dict

client = anthropic.Anthropic()


async def run_subagent(task: str, context: str, model: str = "claude-opus-4-5") -> str:
    """Run a single Subagent to execute a specific task."""
    # The client is synchronous, so the call runs in a worker thread.
    # Calling it directly would block the event loop, and asyncio.gather()
    # would then execute the Subagents one after another.
    message = await asyncio.to_thread(
        client.messages.create,
        model=model,
        max_tokens=4096,
        system=f"You are a specialized analysis Agent. Focus on the assigned subtask and provide detailed, accurate results.\nContext: {context}",
        messages=[{"role": "user", "content": task}],
    )
    return message.content[0].text


async def orchestrator(user_request: str) -> str:
    """
    Orchestrator: receives user request, decomposes tasks,
    coordinates Subagents, and aggregates results.
    """
    # Step 1: Decompose the task
    decomposition_response = await asyncio.to_thread(
        client.messages.create,
        model="claude-opus-4-5",
        max_tokens=2048,
        system="""You are a task orchestrator. Decompose the user request into 2-4 independent subtasks.
Return JSON: {"subtasks": [{"id": "1", "task": "description", "requires": []}]}
The requires field lists task IDs that must complete first.""",
        messages=[{"role": "user", "content": f"Decompose this task: {user_request}"}],
    )
    # json.loads raises ValueError if the model wraps the JSON in extra text
    decomposition = json.loads(decomposition_response.content[0].text)
    subtasks = decomposition["subtasks"]

    # Step 2: Execute tasks respecting dependencies
    results: Dict[str, str] = {}

    async def execute_task(task_info):
        context = f"Original user request: {user_request}"
        result = await run_subagent(task_info["task"], context)
        return task_info["id"], result

    remaining = list(subtasks)
    while remaining:
        completed_ids = set(results.keys())
        ready = [t for t in remaining if all(dep in completed_ids for dep in t.get("requires", []))]
        if not ready:
            # Circular or unsatisfiable dependencies: fail loudly rather than
            # synthesizing an answer from an incomplete set of results.
            stuck = [t["id"] for t in remaining]
            raise RuntimeError(f"Unresolvable dependencies for tasks: {stuck}")
        # All ready tasks run concurrently
        task_results = await asyncio.gather(*[execute_task(t) for t in ready])
        for task_id, result in task_results:
            results[task_id] = result
        remaining = [t for t in remaining if t["id"] not in results]

    # Step 3: Synthesize all results
    subtask_results = "\n".join(f"Task {k}: {v}" for k, v in results.items())
    synthesis_prompt = (
        f"Original request: {user_request}\n"
        f"Subtask results:\n{subtask_results}\n"
        "Synthesize the above into a complete, coherent final answer."
    )
    final_response = await asyncio.to_thread(
        client.messages.create,
        model="claude-opus-4-5",
        max_tokens=4096,
        messages=[{"role": "user", "content": synthesis_prompt}],
    )
    return final_response.content[0].text
33.3.2 TypeScript Implementation
import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic();

interface SubTask {
  id: string;
  task: string;
  requires: string[];
}

async function orchestrate(userRequest: string): Promise<string> {
  // Decompose
  const decompositionMsg = await client.messages.create({
    model: "claude-opus-4-5",
    max_tokens: 2048,
    system: `You are a task orchestrator. Decompose into subtasks. Return JSON:
{"subtasks": [{"id": "1", "task": "...", "requires": []}]}`,
    messages: [{ role: "user", content: `Decompose: ${userRequest}` }],
  });
  const decomposition = JSON.parse(
    decompositionMsg.content[0].type === "text"
      ? decompositionMsg.content[0].text
      : "{}"
  );

  const results: Record<string, string> = {};
  // Fall back to an empty list if the response contained no subtasks
  let remaining: SubTask[] = [...(decomposition.subtasks ?? [])];

  while (remaining.length > 0) {
    const completedIds = new Set(Object.keys(results));
    const ready = remaining.filter((t) =>
      (t.requires ?? []).every((dep) => completedIds.has(dep))
    );
    if (ready.length === 0) {
      // Circular or unsatisfiable dependencies: fail instead of silently dropping tasks
      throw new Error(
        `Unresolvable dependencies for tasks: ${remaining.map((t) => t.id).join(", ")}`
      );
    }

    // Run every ready task in parallel
    const taskResults = await Promise.all(
      ready.map(async (t) => {
        const msg = await client.messages.create({
          model: "claude-sonnet-4-5",
          max_tokens: 4096,
          messages: [{ role: "user", content: `Context: ${userRequest}\n\nTask: ${t.task}` }],
        });
        const block = msg.content[0];
        return [t.id, block.type === "text" ? block.text : ""] as [string, string];
      })
    );
    for (const [id, result] of taskResults) {
      results[id] = result;
    }
    remaining = remaining.filter((t) => !(t.id in results));
  }

  // Synthesize
  const synthesisMsg = await client.messages.create({
    model: "claude-opus-4-5",
    max_tokens: 4096,
    messages: [{
      role: "user",
      content: `Original request: ${userRequest}\n\nResults:\n${
        Object.entries(results).map(([k, v]) => `Task ${k}: ${v}`).join("\n\n")
      }\n\nSynthesize into a complete answer.`
    }],
  });
  const block = synthesisMsg.content[0];
  return block.type === "text" ? block.text : "";
}
33.4 Message Passing and Result Aggregation
33.4.1 Structured Message Format
Using a structured message format between Agents reduces parsing errors, because every message carries the same fields for routing, payload, and retry metadata:
{
  "message_id": "msg_20250101_001",
  "from": "orchestrator",
  "to": "worker_a",
  "task_id": "task_001",
  "type": "task_assignment",
  "payload": {
    "instruction": "Analyze the time complexity of the following code segment",
    "data": "...",
    "constraints": {
      "max_tokens": 1024,
      "format": "json",
      "deadline_ms": 30000
    }
  },
  "metadata": {
    "priority": "high",
    "retry_count": 0,
    "parent_task": "task_000"
  }
}
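One way a receiving Agent might validate such a message before acting on it is sketched below. The `AgentMessage` dataclass, the `REQUIRED_FIELDS` tuple, and `parse_message` are hypothetical names introduced for illustration, and treating `metadata` as optional is an assumption of this sketch.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict

# Fields this sketch treats as mandatory (an assumption; "metadata" is optional here).
REQUIRED_FIELDS = ("message_id", "from", "to", "task_id", "type", "payload")


@dataclass
class AgentMessage:
    message_id: str
    sender: str       # the "from" field ("from" is a Python keyword)
    recipient: str    # the "to" field
    task_id: str
    type: str
    payload: Dict[str, Any]
    metadata: Dict[str, Any] = field(default_factory=dict)


def parse_message(raw: str) -> AgentMessage:
    """Parse a JSON message and reject it early if required fields are missing."""
    data = json.loads(raw)
    if not isinstance(data, dict):
        raise ValueError("message must be a JSON object")
    missing = [name for name in REQUIRED_FIELDS if name not in data]
    if missing:
        raise ValueError(f"message is missing fields: {missing}")
    return AgentMessage(
        message_id=data["message_id"],
        sender=data["from"],
        recipient=data["to"],
        task_id=data["task_id"],
        type=data["type"],
        payload=data["payload"],
        metadata=data.get("metadata", {}),
    )
```

Rejecting a malformed message at the boundary keeps the failure close to its cause, instead of surfacing later as a missing key deep inside a Worker.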
33.4.2 Result Aggregation Strategies
Voting Aggregation: Suitable for judgment tasks with clear correct answers. Multiple Agents answer independently, then a meta-Agent identifies consensus.
Weighted Aggregation: Assign different weights to Agents with different expertise levels. Higher-confidence, more specialized Agents have more influence on the final result.
Sequential Refinement: Each Agent improves on the previous Agent's output. The first Agent produces a draft, subsequent Agents critique and enhance it progressively.
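The first two strategies can be sketched with plain counting in place of a meta-Agent. This is a simplified illustration, assuming answers are already normalized to comparable strings; `majority_vote`, `weighted_vote`, and the default weight of 1.0 are hypothetical choices for this sketch.

```python
from collections import Counter, defaultdict
from typing import Dict, List, Tuple


def majority_vote(answers: List[str]) -> Tuple[str, float]:
    """Return the most common answer and the fraction of Agents that gave it."""
    if not answers:
        raise ValueError("no answers to aggregate")
    winner, count = Counter(answers).most_common(1)[0]
    return winner, count / len(answers)


def weighted_vote(answers: Dict[str, str], weights: Dict[str, float]) -> str:
    """Sum each Agent's weight behind its answer; Agents without a weight count as 1.0."""
    if not answers:
        raise ValueError("no answers to aggregate")
    totals: Dict[str, float] = defaultdict(float)
    for agent_id, answer in answers.items():
        totals[answer] += weights.get(agent_id, 1.0)
    return max(totals, key=totals.get)


winner, agreement = majority_vote(["O(n)", "O(n log n)", "O(n)"])
expert_choice = weighted_vote(
    {"specialist": "yes", "generalist_1": "no", "generalist_2": "no"},
    {"specialist": 3.0},
)
```

The agreement fraction returned by `majority_vote` can serve as a signal for escalation: a low value suggests handing the disagreement to a stronger model rather than accepting the plurality answer.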
33.5 Tool Calls Across Agents
33.5.1 Shared Tool Registry
In Multi-Agent systems, all Agents typically share a tool definition registry, but different Agents can access different tool subsets:
TOOL_REGISTRY = {
    "web_search": {
        "name": "web_search",
        "description": "Search the internet for current information",
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {"type": "string"},
                "max_results": {"type": "integer", "default": 5}
            },
            "required": ["query"]
        }
    },
    "code_executor": {
        "name": "code_executor",
        "description": "Execute Python code and return results",
        "input_schema": {
            "type": "object",
            "properties": {
                "code": {"type": "string"},
                "timeout": {"type": "integer", "default": 30}
            },
            "required": ["code"]
        }
    }
}

# Which tools each role is allowed to use
ROLE_TOOLS = {
    "researcher": ["web_search"],
    "coder": ["code_executor"],
    "orchestrator": ["web_search", "code_executor"]
}
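Resolving a role to the list of tool definitions passed to the API could look like the sketch below. `tools_for_role` is a hypothetical helper, and the two-entry registry is a trimmed stand-in so the example runs on its own.

```python
from typing import Dict, List


def tools_for_role(
    role: str,
    registry: Dict[str, dict],
    role_tools: Dict[str, List[str]],
) -> List[dict]:
    """Return the tool definitions a role is allowed to use."""
    if role not in role_tools:
        raise KeyError(f"unknown role: {role}")
    # Indexing the registry directly surfaces a KeyError if a role
    # references a tool that was never registered.
    return [registry[name] for name in role_tools[role]]


# Trimmed stand-ins for the registry and role map (illustrative only).
demo_registry = {
    "web_search": {"name": "web_search", "description": "Search the internet"},
    "code_executor": {"name": "code_executor", "description": "Execute Python code"},
}
demo_roles = {
    "researcher": ["web_search"],
    "orchestrator": ["web_search", "code_executor"],
}

researcher_tools = tools_for_role("researcher", demo_registry, demo_roles)
```

Keeping the role map separate from the tool definitions means a tool's schema is written once, and access changes touch only the role map.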
33.5.2 Full Tool-Use Loop per Agent
Each Subagent may need to call tools multiple times before completing its task. Implement the full tool-use loop for each Agent:
async def run_agent_with_tools(task: str, tools: list, tool_executor) -> dict:
    """Run an Agent with tool-calling capability through the full interaction loop."""
    messages = [{"role": "user", "content": task}]
    tool_calls_log = []
    while True:
        # Run the synchronous SDK call in a thread so other Agents are not blocked
        response = await asyncio.to_thread(
            client.messages.create,
            model="claude-opus-4-5",
            max_tokens=4096,
            tools=tools,
            messages=messages,
        )
        if response.stop_reason == "tool_use":
            tool_use_blocks = [b for b in response.content if b.type == "tool_use"]
            # Echo the assistant turn back, then answer every tool call in one user turn
            messages.append({"role": "assistant", "content": response.content})
            tool_results = []
            for tool_use in tool_use_blocks:
                result = await tool_executor(tool_use.name, tool_use.input)
                tool_calls_log.append({"tool": tool_use.name, "input": tool_use.input, "output": result})
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": tool_use.id,
                    "content": str(result)
                })
            messages.append({"role": "user", "content": tool_results})
        else:
            # "end_turn", "max_tokens", or any other stop reason ends the loop.
            # Without this branch, an unexpected stop reason would resend the
            # same messages forever.
            final_text = next((b.text for b in response.content if b.type == "text"), "")
            return {
                "result": final_text,
                "tool_calls": tool_calls_log,
                "stop_reason": response.stop_reason,
            }
33.6 State Management and Context Sharing
33.6.1 Thread-Safe Shared State
Multi-Agent systems need a shared state store that Agents can read from and write to:
import json
import threading
import time
from typing import Any, Dict, List


class SharedState:
    """Thread-safe shared state storage for multi-agent systems."""

    def __init__(self):
        self._lock = threading.Lock()
        self._data: Dict[str, Any] = {}
        self._history: List[Dict] = []

    def set(self, key: str, value: Any, agent_id: str = "unknown"):
        with self._lock:
            self._data[key] = value
            # Record every write so changes can be traced back to an Agent
            self._history.append({
                "timestamp": time.time(),
                "agent": agent_id,
                "key": key,
                "value": value
            })

    def get(self, key: str, default: Any = None) -> Any:
        with self._lock:
            return self._data.get(key, default)

    def get_context(self, relevant_keys: List[str]) -> str:
        """Serialize only the requested keys as JSON for inclusion in a prompt."""
        with self._lock:
            return json.dumps(
                {k: v for k, v in self._data.items() if k in relevant_keys},
                ensure_ascii=False, indent=2
            )
33.6.2 Context Compression
When Agents need to pass large amounts of context, compression prevents hitting token limits:
async def compress_context(full_context: str, target_tokens: int = 2000) -> str:
    """Compress long context using a lightweight model to preserve key information."""
    # Run the synchronous SDK call in a thread so the event loop stays responsive
    response = await asyncio.to_thread(
        client.messages.create,
        model="claude-haiku-4-5",  # Use smaller model for compression to save cost
        max_tokens=target_tokens,  # Hard cap on the length of the compressed output
        messages=[{
            "role": "user",
            "content": f"Compress this context to ~{target_tokens} tokens, preserving: key decisions, important data/numbers, task progress, errors.\n\n{full_context}"
        }],
    )
    return response.content[0].text
33.7 Error Handling and Fault Tolerance
33.7.1 Retry with Exponential Backoff
from tenacity import retry, stop_after_attempt, wait_exponential


@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def resilient_subagent(task: str, context: str) -> str:
    """Subagent with automatic retry on transient failures."""
    return await run_subagent(task, context)
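To make the backoff mechanics visible, here is a dependency-free sketch of the same idea. It does not reproduce tenacity's exact wait schedule; `backoff_delays`, `retry_async`, and their default values are hypothetical, and the sketch retries on every exception for brevity.

```python
import asyncio
from typing import Awaitable, Callable, List, TypeVar

T = TypeVar("T")


def backoff_delays(attempts: int, base: float = 1.0, cap: float = 10.0) -> List[float]:
    """Delays slept between attempts: base * 2**n, never more than cap."""
    return [min(cap, base * (2 ** n)) for n in range(attempts - 1)]


async def retry_async(
    func: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base: float = 1.0,
    cap: float = 10.0,
) -> T:
    """Call func until it succeeds or the attempts are exhausted."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    delays = backoff_delays(attempts, base, cap)
    for attempt in range(attempts):
        try:
            return await func()
        except Exception:
            if attempt == attempts - 1:
                raise  # Out of attempts: surface the last error to the caller
            await asyncio.sleep(delays[attempt])
    raise AssertionError("unreachable")
```

In production code the `except` clause would normally be narrowed to transient failures such as rate limits and timeouts, so that permanent errors like an invalid request are not retried.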
33.7.2 Timeout Control
async def timeout_subagent(task: str, context: str, timeout: float = 60.0):
    """Subagent execution with timeout control. Returns None on timeout."""
    try:
        return await asyncio.wait_for(run_subagent(task, context), timeout=timeout)
    except asyncio.TimeoutError:
        print(f"Task timed out after {timeout}s: {task[:50]}...")
        return None
33.8 Cost Optimization
33.8.1 Model Tiering
Not every subtask requires the most powerful model. Match model capability to task complexity:
| Model | Input (USD per 1M tokens) | Output (USD per 1M tokens) | Best For |
|---|---|---|---|
| claude-opus-4-5 | $5 | $25 | Complex reasoning, final synthesis |
| claude-sonnet-4-5 | $3 | $15 | General analysis, content generation |
| claude-haiku-4-5 | $1 | $5 | Extraction, formatting, compression |
MODEL_TIERS = {
    "heavy": "claude-opus-4-5",
    "medium": "claude-sonnet-4-5",
    "light": "claude-haiku-4-5"
}


def select_model(task_type: str) -> str:
    """Map a task type to a model tier; unknown task types use the medium tier."""
    heavy = {"complex_analysis", "synthesis", "creative_writing"}
    light = {"extraction", "formatting", "classification", "compression"}
    if task_type in heavy:
        return MODEL_TIERS["heavy"]
    elif task_type in light:
        return MODEL_TIERS["light"]
    return MODEL_TIERS["medium"]
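The effect of tiering on a budget is simple arithmetic. The sketch below uses a hypothetical helper, `estimate_cost_usd`, that takes prices as parameters; the example call plugs in the Sonnet row from the table, and the token counts are made up for illustration.

```python
def estimate_cost_usd(
    input_tokens: int,
    output_tokens: int,
    input_price_per_mtok: float,
    output_price_per_mtok: float,
) -> float:
    """Cost of one call given per-million-token prices in USD."""
    input_cost = (input_tokens / 1_000_000) * input_price_per_mtok
    output_cost = (output_tokens / 1_000_000) * output_price_per_mtok
    return input_cost + output_cost


# 200,000 input tokens and 50,000 output tokens at $3 / $15 per 1M tokens:
# 0.2 * 3 + 0.05 * 15 = 0.60 + 0.75 = 1.35 USD
sonnet_cost = estimate_cost_usd(200_000, 50_000, 3.0, 15.0)
print(f"${sonnet_cost:.2f}")
```

Passing prices in as arguments, rather than hard-coding them, keeps the helper valid when pricing changes.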
33.8.2 Result Caching
Cache Subagent results for identical subtasks to avoid redundant API calls. The hash-based key matches only exact task and context strings; matching merely similar subtasks would require a different lookup, such as embedding similarity:
import hashlib
import time


class SubagentCache:
    def __init__(self, ttl_seconds: int = 3600):
        self._cache: dict = {}
        self.ttl = ttl_seconds

    async def get_or_execute(self, task: str, context: str) -> str:
        # MD5 serves only as a cache key here, not for security
        key = hashlib.md5(f"{task}|||{context}".encode()).hexdigest()
        if key in self._cache:
            entry = self._cache[key]
            # Serve the cached result only while it is younger than the TTL
            if time.time() - entry["timestamp"] < self.ttl:
                return entry["result"]
        result = await run_subagent(task, context)
        self._cache[key] = {"result": result, "timestamp": time.time()}
        return result
33.9 Real-World Example: Automated Research Report Generation
async def generate_research_report(topic: str) -> str:
    """
    Full multi-agent pipeline for research report generation:
    1. Outline Agent
    2. Parallel chapter research Agents
    3. Fact-check Agent
    4. Synthesis Agent
    """
    import json

    async def call_model(**kwargs):
        # The SDK client is synchronous; a worker thread keeps the event loop
        # free so the chapter Agents in Step 2 really run in parallel.
        return await asyncio.to_thread(client.messages.create, **kwargs)

    # Step 1: Generate outline
    outline_response = await call_model(
        model="claude-sonnet-4-5",
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": f"Generate a 4-chapter research outline for '{topic}'. JSON: {{\"chapters\": [\"description\"]}}"
        }],
    )
    outline = json.loads(outline_response.content[0].text)
    chapters = outline["chapters"]

    # Step 2: Parallel chapter research
    async def research_chapter(idx: int, chapter: str):
        response = await call_model(
            model="claude-haiku-4-5",
            max_tokens=2048,
            messages=[{
                "role": "user",
                "content": f"Write one chapter about: {chapter} (topic: {topic}). ~300 words with specific data and examples."
            }],
        )
        return idx, response.content[0].text

    chapter_results = await asyncio.gather(
        *[research_chapter(i, ch) for i, ch in enumerate(chapters)]
    )
    chapter_contents = dict(chapter_results)

    # Step 3: Fact-check
    draft = "\n\n".join([
        f"## Chapter {i+1}: {chapters[i]}\n{chapter_contents[i]}"
        for i in range(len(chapters))
    ])
    fact_check_response = await call_model(
        model="claude-sonnet-4-5",
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": f"Identify potential factual errors or uncertain claims in this draft:\n\n{draft}"
        }],
    )
    fact_check = fact_check_response.content[0].text

    # Step 4: Final synthesis
    final = await call_model(
        model="claude-opus-4-5",
        max_tokens=8192,
        messages=[{
            "role": "user",
            "content": f"Topic: {topic}\n\nDraft:\n{draft}\n\nFact-check notes:\n{fact_check}\n\nProduce a polished, coherent research report addressing all fact-check concerns."
        }],
    )
    return final.content[0].text
33.10 Design Principles for Multi-Agent Systems
Building effective Multi-Agent systems requires adherence to several foundational principles:
Clear Responsibility Boundaries: Each Agent should have a well-defined, singular responsibility. Avoid circular dependencies between Agents.
Minimal Context Transfer: Pass only the information strictly necessary for an Agent to complete its task. Excess context wastes tokens and introduces confusion.
Idempotent Design: When a subtask is retried, the result should be deterministic or safely mergeable. Avoid side effects that compound on repeated execution.
Explicit Dependency Graphs: Define task dependencies explicitly in code (as a DAG). Never rely on implicit execution order assumptions.
Audit Trails: Log every Agent's inputs, outputs, and tool calls for debugging and retrospective analysis.
Progressive Complexity: Start with the simplest centralized orchestration. Introduce hierarchical and pipeline structures only when genuinely necessary; avoid over-engineering.
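The explicit-dependency principle can be enforced with Python's standard-library `graphlib`. The sketch below assumes the subtask format used earlier in this chapter (an `id` plus a `requires` list); `execution_waves` is a hypothetical helper that groups tasks into batches that may run in parallel and rejects circular or unknown dependencies up front.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List, Set


def execution_waves(subtasks: List[dict]) -> List[List[str]]:
    """Group task IDs into waves; all tasks in one wave can run in parallel."""
    graph: Dict[str, Set[str]] = {
        t["id"]: set(t.get("requires", [])) for t in subtasks
    }
    # Reject dependencies on tasks that were never declared
    unknown = {dep for deps in graph.values() for dep in deps} - graph.keys()
    if unknown:
        raise ValueError(f"unknown dependencies: {sorted(unknown)}")

    sorter = TopologicalSorter(graph)
    sorter.prepare()  # Raises CycleError if the dependencies are circular
    waves: List[List[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)  # Mark the wave complete to unlock its dependents
    return waves


waves = execution_waves([
    {"id": "1", "task": "collect data", "requires": []},
    {"id": "2", "task": "collect sources", "requires": []},
    {"id": "3", "task": "write summary", "requires": ["1", "2"]},
])
print(waves)
```

Validating the graph before any Agent runs means a circular plan fails immediately, before any tokens are spent on subtasks whose results could never be combined.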
Summary
This chapter provided a systematic introduction to the core patterns and implementation techniques of Multi-Agent orchestration. From centralized Hub-and-Spoke to pipeline orchestration, from shared state management to cross-Agent tool calls, the power of Multi-Agent architecture lies in pushing task decomposition and parallel execution to their limits.
Key takeaways:
- Choose the orchestration pattern that fits your task's characteristics (centralized, pipeline, hierarchical, or peer-to-peer)
- Use structured message formats to minimize inter-Agent parsing errors
- Implement comprehensive error handling, retry logic, and timeout mechanisms
- Control costs through model tiering (Opus/Sonnet/Haiku)
- Shared state storage requires thread-safe design
The next chapter dives into the MCP (Model Context Protocol), the standardized specification Anthropic established for the tool ecosystem.