Managed Agents 生产实践:错误恢复、监控告警与成本控制
第三十三章:Multi-Agent 编排:让多个 Claude 实例协同工作
33.1 为什么需要多 Agent 协同
单个 Claude 实例在处理复杂任务时面临三类瓶颈:上下文窗口限制、并行处理能力不足与专业化分工缺失。当任务需要同时分析十万行代码库、协调多个独立子任务、或者需要不同"角色"互相校验时,单实例架构就开始暴露其局限。
Multi-Agent 架构的核心思想是将一个复杂任务分解为多个子任务,每个子任务交给一个专门的 Claude 实例(称为 Subagent 或 Worker)处理,由一个 Orchestrator(编排器)协调整体流程。这一模式在以下场景中尤为有效:
- 大规模代码审查:Orchestrator 将代码库分成模块,多个 Worker 并行分析,最后汇总报告
- 复杂研究任务:不同 Worker 分别负责资料收集、事实核查、综合撰写
- 流水线处理:数据提取 → 清洗 → 分析 → 可视化,每个阶段独立 Agent
- 对抗性验证:一个 Agent 生成答案,另一个专门寻找漏洞和错误
根据 Anthropic 的官方文档,Claude 在 Multi-Agent 系统中可以扮演两种角色:作为 Orchestrator(向其他 Agent 发出指令)或作为 Subagent(执行具体任务并返回结果)。同一个 Claude 实例在不同架构层级中可能同时扮演两种角色。
33.2 编排模式分类
33.2.1 中心化编排(Hub-and-Spoke)
最常见的模式。一个 Orchestrator 接收用户请求,将任务分配给多个 Subagent,收集结果后聚合返回。
用户
│
▼
Orchestrator(主 Claude 实例)
├── Worker A(负责任务 1)
├── Worker B(负责任务 2)
└── Worker C(负责任务 3)
│
└── 结果汇总 → 用户
适用场景:任务可以明确拆分、子任务相对独立、需要最终综合判断的场景。
33.2.2 流水线编排(Pipeline)
每个 Agent 处理上一个 Agent 的输出,形成串行处理链。适合有明确顺序依赖的任务。
输入 → Agent A(提取) → Agent B(分析) → Agent C(报告) → 输出
适用场景:数据处理流水线、多阶段文档生成、需要逐步精炼的内容创作。
33.2.3 分层编排(Hierarchical)
Orchestrator 下面有 Sub-orchestrator,Sub-orchestrator 再管理 Worker。用于超大规模任务。
Master Orchestrator
├── Sub-orchestrator A
│ ├── Worker A1
│ └── Worker A2
└── Sub-orchestrator B
├── Worker B1
└── Worker B2
33.2.4 对等协作(Peer-to-Peer)
多个 Claude 实例平等协作,互相发送消息,通过共识或投票机制决定最终输出。常用于需要多视角验证的场景(如代码安全审查、医疗建议生成)。
33.3 使用 Anthropic SDK 实现基础编排
33.3.1 Python 实现示例
import anthropic
import asyncio
from typing import List, Dict, Any
client = anthropic.Anthropic()
async def run_subagent(
task: str,
context: str,
model: str = "claude-opus-4-5"
) -> str:
"""运行单个 Subagent 执行具体任务"""
message = client.messages.create(
model=model,
max_tokens=4096,
system=f"""你是一个专业的分析 Agent。
你的任务是专注处理分配给你的子任务,提供详细、准确的结果。
背景信息:{context}""",
messages=[
{"role": "user", "content": task}
]
)
return message.content[0].text
async def orchestrator(user_request: str) -> str:
"""
Orchestrator:接收用户请求,分解任务,协调 Subagent,汇总结果
"""
# 第一步:让 Orchestrator 分解任务
decomposition_response = client.messages.create(
model="claude-opus-4-5",
max_tokens=2048,
system="""你是一个任务编排器。
收到用户请求后,将其分解为 2-4 个独立的子任务。
以 JSON 格式返回,结构为:
{
"subtasks": [
{"id": "1", "task": "任务描述", "requires": []},
{"id": "2", "task": "任务描述", "requires": ["1"]}
]
}
requires 字段表示依赖关系(需要先完成哪些任务)。""",
messages=[
{"role": "user", "content": f"请分解以下任务:{user_request}"}
]
)
import json
decomposition = json.loads(decomposition_response.content[0].text)
subtasks = decomposition["subtasks"]
# 第二步:执行无依赖的任务(并行)
results: Dict[str, str] = {}
# 找出没有依赖的任务
ready_tasks = [t for t in subtasks if not t.get("requires")]
# 并行执行
async def execute_task(task_info):
context = f"原始用户请求:{user_request}"
result = await run_subagent(task_info["task"], context)
return task_info["id"], result
while ready_tasks:
# 并行执行所有就绪任务
task_results = await asyncio.gather(
*[execute_task(t) for t in ready_tasks]
)
for task_id, result in task_results:
results[task_id] = result
# 找出新的就绪任务(依赖已全部完成)
completed_ids = set(results.keys())
remaining = [t for t in subtasks if t["id"] not in completed_ids]
ready_tasks = [
t for t in remaining
if all(dep in completed_ids for dep in t.get("requires", []))
]
# 第三步:汇总所有结果
synthesis_prompt = f"""
用户的原始请求:{user_request}
各子任务的执行结果:
{chr(10).join([f"子任务 {k}:{v}" for k, v in results.items()])}
请将以上结果综合整理,形成完整、连贯的最终回答。
"""
final_response = client.messages.create(
model="claude-opus-4-5",
max_tokens=4096,
messages=[{"role": "user", "content": synthesis_prompt}]
)
return final_response.content[0].text
# 使用示例
async def main():
result = await orchestrator(
"分析 GPT-4 和 Claude 3.5 Sonnet 在代码生成、推理和创意写作三个维度的优劣势,并给出选型建议"
)
print(result)
if __name__ == "__main__":
asyncio.run(main())
33.3.2 TypeScript 实现示例
import Anthropic from "@anthropic-ai/sdk";
const client = new Anthropic();
interface SubTask {
id: string;
task: string;
requires: string[];
}
interface TaskDecomposition {
subtasks: SubTask[];
}
async function runSubagent(
task: string,
context: string,
model: string = "claude-opus-4-5"
): Promise<string> {
const message = await client.messages.create({
model,
max_tokens: 4096,
system: `你是一个专业分析 Agent。专注处理分配的子任务,提供详细准确的结果。
背景:${context}`,
messages: [{ role: "user", content: task }],
});
const block = message.content[0];
return block.type === "text" ? block.text : "";
}
async function orchestrate(userRequest: string): Promise<string> {
// 分解任务
const decompositionMsg = await client.messages.create({
model: "claude-opus-4-5",
max_tokens: 2048,
system: `你是任务编排器。将用户请求分解为独立子任务,以 JSON 格式返回:
{"subtasks": [{"id": "1", "task": "...", "requires": []}]}`,
messages: [{ role: "user", content: `分解任务:${userRequest}` }],
});
const block = decompositionMsg.content[0];
const decomposition: TaskDecomposition = JSON.parse(
block.type === "text" ? block.text : "{}"
);
const results: Record<string, string> = {};
const subtasks = decomposition.subtasks;
// 按依赖顺序执行
let remaining = [...subtasks];
while (remaining.length > 0) {
const completedIds = new Set(Object.keys(results));
const ready = remaining.filter((t) =>
t.requires.every((dep) => completedIds.has(dep))
);
if (ready.length === 0) break; // 避免死锁
// 并行执行就绪任务
const taskResults = await Promise.all(
ready.map(async (t) => {
const result = await runSubagent(
t.task,
`原始请求:${userRequest}`
);
return [t.id, result] as [string, string];
})
);
for (const [id, result] of taskResults) {
results[id] = result;
}
remaining = remaining.filter((t) => !(t.id in results));
}
// 汇总
const synthesisPrompt = `
原始请求:${userRequest}
子任务结果:
${Object.entries(results)
.map(([k, v]) => `任务 ${k}:${v}`)
.join("\n\n")}
请综合以上结果,形成完整回答。
`;
const finalMsg = await client.messages.create({
model: "claude-opus-4-5",
max_tokens: 4096,
messages: [{ role: "user", content: synthesisPrompt }],
});
const finalBlock = finalMsg.content[0];
return finalBlock.type === "text" ? finalBlock.text : "";
}
// 使用
orchestrate("对比分析三种主流 LLM API 的定价策略和适用场景").then(console.log);
33.4 消息传递机制与结果聚合
33.4.1 结构化消息格式
Agent 间传递消息时,使用结构化格式能极大降低解析错误率。推荐的消息信封格式:
{
"message_id": "msg_20250101_001",
"from": "orchestrator",
"to": "worker_a",
"task_id": "task_001",
"type": "task_assignment",
"payload": {
"instruction": "分析以下代码段的时间复杂度",
"data": "...",
"constraints": {
"max_tokens": 1024,
"format": "json",
"deadline_ms": 30000
}
},
"metadata": {
"priority": "high",
"retry_count": 0,
"parent_task": "task_000"
}
}
33.4.2 结果聚合策略
不同场景需要不同的聚合策略:
投票聚合(Voting Aggregation):适合有明确对错的判断类任务。
def voting_aggregation(results: List[str], question: str) -> str:
"""让多个 Agent 投票,选择出现频率最高的答案"""
vote_prompt = f"""
以下是多个 Agent 对同一问题的回答:
{chr(10).join([f"Agent {i+1}:{r}" for i, r in enumerate(results)])}
问题:{question}
请分析这些答案,找出共识,给出最终答案。如果存在分歧,说明分歧原因。
"""
response = client.messages.create(
model="claude-opus-4-5",
max_tokens=2048,
messages=[{"role": "user", "content": vote_prompt}]
)
return response.content[0].text
加权聚合(Weighted Aggregation):给不同专长的 Agent 分配不同权重。
def weighted_aggregation(
results: List[Dict[str, Any]],
weights: Dict[str, float]
) -> str:
"""
results: [{"agent_id": "expert_a", "content": "...", "confidence": 0.9}]
weights: {"expert_a": 0.6, "expert_b": 0.4}
"""
weighted_prompt = "\n".join([
f"Agent {r['agent_id']}(权重 {weights.get(r['agent_id'], 0.5):.1f},"
f"置信度 {r.get('confidence', 0.5):.1f}):{r['content']}"
for r in results
])
response = client.messages.create(
model="claude-opus-4-5",
max_tokens=2048,
messages=[{
"role": "user",
"content": f"综合以下加权意见,给出最终结论:\n{weighted_prompt}"
}]
)
return response.content[0].text
33.5 工具调用在 Multi-Agent 中的传递
33.5.1 共享工具定义
在 Multi-Agent 系统中,所有 Agent 通常共享一套工具定义,但不同 Agent 可以访问不同工具子集:
# 工具注册表
TOOL_REGISTRY = {
"web_search": {
"name": "web_search",
"description": "搜索互联网获取最新信息",
"input_schema": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "搜索关键词"},
"max_results": {"type": "integer", "default": 5}
},
"required": ["query"]
}
},
"code_executor": {
"name": "code_executor",
"description": "执行 Python 代码并返回结果",
"input_schema": {
"type": "object",
"properties": {
"code": {"type": "string"},
"timeout": {"type": "integer", "default": 30}
},
"required": ["code"]
}
},
"file_reader": {
"name": "file_reader",
"description": "读取本地文件内容",
"input_schema": {
"type": "object",
"properties": {
"path": {"type": "string"},
"encoding": {"type": "string", "default": "utf-8"}
},
"required": ["path"]
}
}
}
# 不同角色获得不同工具权限
ROLE_TOOLS = {
"researcher": ["web_search", "file_reader"],
"coder": ["code_executor", "file_reader"],
"orchestrator": ["web_search", "code_executor", "file_reader"]
}
def get_tools_for_role(role: str) -> List[dict]:
tool_names = ROLE_TOOLS.get(role, [])
return [TOOL_REGISTRY[name] for name in tool_names if name in TOOL_REGISTRY]
33.5.2 工具调用结果的跨 Agent 传递
当 Subagent 调用工具后,其结果需要传递给 Orchestrator 或其他 Agent:
async def run_agent_with_tools(
task: str,
tools: List[dict],
tool_executor: callable
) -> Dict[str, Any]:
"""
运行带工具调用能力的 Agent,处理完整的工具调用循环
返回最终结果和所有工具调用记录
"""
messages = [{"role": "user", "content": task}]
tool_calls_log = []
while True:
response = client.messages.create(
model="claude-opus-4-5",
max_tokens=4096,
tools=tools,
messages=messages
)
if response.stop_reason == "end_turn":
# Agent 完成任务
final_text = next(
(b.text for b in response.content if b.type == "text"), ""
)
return {
"result": final_text,
"tool_calls": tool_calls_log,
"usage": response.usage.model_dump()
}
elif response.stop_reason == "tool_use":
# 处理工具调用
tool_use_blocks = [
b for b in response.content if b.type == "tool_use"
]
# 将 Assistant 消息加入历史
messages.append({
"role": "assistant",
"content": response.content
})
# 执行工具并收集结果
tool_results = []
for tool_use in tool_use_blocks:
tool_result = await tool_executor(
tool_use.name,
tool_use.input
)
tool_calls_log.append({
"tool": tool_use.name,
"input": tool_use.input,
"output": tool_result
})
tool_results.append({
"type": "tool_result",
"tool_use_id": tool_use.id,
"content": str(tool_result)
})
messages.append({
"role": "user",
"content": tool_results
})
33.6 状态管理与上下文共享
33.6.1 共享状态存储
Multi-Agent 系统需要一个共享状态存储,让各 Agent 可以读写公共信息:
import threading
from dataclasses import dataclass, field
from typing import Any, Optional
import json
import time
@dataclass
class SharedState:
"""线程安全的共享状态存储"""
_lock: threading.Lock = field(default_factory=threading.Lock)
_data: Dict[str, Any] = field(default_factory=dict)
_history: List[Dict] = field(default_factory=list)
def set(self, key: str, value: Any, agent_id: str = "unknown"):
with self._lock:
old_value = self._data.get(key)
self._data[key] = value
self._history.append({
"timestamp": time.time(),
"agent": agent_id,
"action": "set",
"key": key,
"old_value": old_value,
"new_value": value
})
def get(self, key: str, default: Any = None) -> Any:
with self._lock:
return self._data.get(key, default)
def get_context_for_agent(self, relevant_keys: List[str]) -> str:
"""生成供 Agent 读取的上下文摘要"""
with self._lock:
context_items = {
k: v for k, v in self._data.items()
if k in relevant_keys
}
return json.dumps(context_items, ensure_ascii=False, indent=2)
def get_audit_log(self) -> List[Dict]:
with self._lock:
return list(self._history)
# 使用共享状态
shared_state = SharedState()
async def research_agent(topic: str, state: SharedState) -> str:
"""研究 Agent:搜索资料并将关键发现存入共享状态"""
result = await run_subagent(
f"研究以下主题,列出 5 个关键事实:{topic}",
"你是研究 Agent"
)
state.set(f"research_{topic}", result, agent_id="research_agent")
return result
async def writer_agent(topic: str, state: SharedState) -> str:
"""写作 Agent:基于共享状态中的研究结果撰写文章"""
research_context = state.get(f"research_{topic}", "")
result = await run_subagent(
f"基于以下研究结果,撰写一篇关于 {topic} 的文章:\n{research_context}",
"你是写作 Agent"
)
state.set(f"article_{topic}", result, agent_id="writer_agent")
return result
33.6.2 上下文压缩与传递
当 Agent 需要传递大量上下文时,使用压缩技术避免超出 token 限制:
async def compress_context(
full_context: str,
target_tokens: int = 2000
) -> str:
"""压缩长上下文,保留关键信息"""
compress_prompt = f"""
以下是需要压缩的上下文(约 {len(full_context)} 字符):
{full_context}
请将其压缩到约 {target_tokens} token 的摘要,保留:
1. 所有关键决策和结论
2. 重要的数据和数字
3. 任务进度和待办事项
4. 错误和异常情况
以结构化格式输出。
"""
response = client.messages.create(
model="claude-haiku-4-5", # 用较小模型做压缩,节省成本
max_tokens=target_tokens,
messages=[{"role": "user", "content": compress_prompt}]
)
return response.content[0].text
33.7 错误处理与容错机制
33.7.1 Subagent 失败重试
import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
async def resilient_subagent(task: str, context: str) -> str:
"""带重试机制的 Subagent"""
try:
return await run_subagent(task, context)
except anthropic.RateLimitError:
# 速率限制,等待后重试
raise
except anthropic.APIError as e:
# API 错误,记录并重试
print(f"API 错误:{e}")
raise
async def safe_orchestrator(user_request: str) -> Dict[str, Any]:
"""带完整错误处理的编排器"""
failed_tasks = []
results = {}
tasks = [
{"id": "1", "task": "子任务 1"},
{"id": "2", "task": "子任务 2"},
{"id": "3", "task": "子任务 3"},
]
async def execute_with_fallback(task_info):
try:
result = await resilient_subagent(
task_info["task"],
f"请求:{user_request}"
)
return task_info["id"], result, None
except Exception as e:
failed_tasks.append(task_info["id"])
return task_info["id"], None, str(e)
task_results = await asyncio.gather(
*[execute_with_fallback(t) for t in tasks],
return_exceptions=True
)
for item in task_results:
if isinstance(item, Exception):
continue
task_id, result, error = item
if result:
results[task_id] = result
return {
"results": results,
"failed_tasks": failed_tasks,
"success_rate": len(results) / len(tasks)
}
33.7.2 超时控制
async def timeout_subagent(
task: str,
context: str,
timeout_seconds: float = 60.0
) -> Optional[str]:
"""带超时控制的 Subagent 执行"""
try:
result = await asyncio.wait_for(
run_subagent(task, context),
timeout=timeout_seconds
)
return result
except asyncio.TimeoutError:
print(f"任务超时({timeout_seconds}s):{task[:50]}...")
return None
33.8 成本优化策略
33.8.1 模型分层使用
不是所有子任务都需要用最强的模型。根据任务复杂度选择合适的模型:
MODEL_TIERS = {
"heavy": "claude-opus-4-5", # 复杂推理、最终综合
"medium": "claude-sonnet-4-5", # 一般分析、内容生成
"light": "claude-haiku-4-5" # 简单提取、格式转换、压缩
}
def select_model_for_task(task_type: str) -> str:
"""根据任务类型选择合适的模型"""
heavy_tasks = ["complex_analysis", "synthesis", "creative_writing"]
light_tasks = ["extraction", "formatting", "classification", "compression"]
if task_type in heavy_tasks:
return MODEL_TIERS["heavy"]
elif task_type in light_tasks:
return MODEL_TIERS["light"]
else:
return MODEL_TIERS["medium"]
成本对比参考(以 1M tokens 计):
| 模型 | 输入 | 输出 | 适用场景 |
|---|---|---|---|
| claude-opus-4-5 | $15 | $75 | 复杂推理、最终决策 |
| claude-sonnet-4-5 | $3 | $15 | 通用任务 |
| claude-haiku-4-5 | $0.25 | $1.25 | 简单处理、压缩 |
33.8.2 缓存 Subagent 结果
对相同或相似的子任务使用缓存,避免重复 API 调用:
import hashlib
from functools import lru_cache
class SubagentCache:
def __init__(self, max_size: int = 100):
self._cache: Dict[str, str] = {}
self._max_size = max_size
def _make_key(self, task: str, context: str) -> str:
content = f"{task}|||{context}"
return hashlib.md5(content.encode()).hexdigest()
async def get_or_execute(
self,
task: str,
context: str,
ttl_seconds: int = 3600
) -> str:
key = self._make_key(task, context)
if key in self._cache:
cached = self._cache[key]
if time.time() - cached["timestamp"] < ttl_seconds:
return cached["result"]
result = await run_subagent(task, context)
if len(self._cache) >= self._max_size:
oldest_key = min(self._cache, key=lambda k: self._cache[k]["timestamp"])
del self._cache[oldest_key]
self._cache[key] = {
"result": result,
"timestamp": time.time()
}
return result
33.9 实战案例:自动化研究报告生成系统
import asyncio
import anthropic
from typing import Dict, List
client = anthropic.Anthropic()
async def generate_research_report(topic: str) -> str:
"""
完整的多 Agent 研究报告生成流程:
1. 大纲生成 Agent
2. 并行章节研究 Agent(每个章节一个)
3. 事实核查 Agent
4. 综合撰写 Agent
"""
# Step 1: 生成研究大纲
outline_response = client.messages.create(
model="claude-sonnet-4-5",
max_tokens=1024,
messages=[{
"role": "user",
"content": f"为'{topic}'生成一个包含 4 个章节的研究大纲,每章一句话描述。JSON 格式:{{\"chapters\": [\"章节描述\"]}}"
}]
)
import json
outline = json.loads(outline_response.content[0].text)
chapters = outline["chapters"]
print(f"大纲生成完成,共 {len(chapters)} 章")
# Step 2: 并行研究各章节
async def research_chapter(idx: int, chapter: str) -> tuple:
response = client.messages.create(
model="claude-haiku-4-5", # 用轻量模型做研究
max_tokens=2048,
messages=[{
"role": "user",
"content": f"为研究报告写一章关于:{chapter}(主题:{topic})。约 300 字,包含具体数据和例子。"
}]
)
return idx, response.content[0].text
chapter_results = await asyncio.gather(
*[research_chapter(i, ch) for i, ch in enumerate(chapters)]
)
chapter_contents = dict(chapter_results)
print("各章节研究完成")
# Step 3: 事实核查
draft = "\n\n".join([
f"## 第{i+1}章:{chapters[i]}\n{chapter_contents[i]}"
for i in range(len(chapters))
])
fact_check_response = client.messages.create(
model="claude-sonnet-4-5",
max_tokens=1024,
messages=[{
"role": "user",
"content": f"检查以下草稿中的潜在事实错误或不确定性,列出需要注意的地方:\n\n{draft}"
}]
)
fact_check_notes = fact_check_response.content[0].text
# Step 4: 最终综合
final_response = client.messages.create(
model="claude-opus-4-5",
max_tokens=8192,
messages=[{
"role": "user",
"content": f"""
主题:{topic}
章节草稿:
{draft}
事实核查备注:
{fact_check_notes}
请综合以上内容,生成专业、连贯的研究报告。改正事实核查中发现的问题,优化行文风格。
"""
}]
)
return final_response.content[0].text
# 运行示例
if __name__ == "__main__":
report = asyncio.run(
generate_research_report("大型语言模型在企业应用中的现状与趋势")
)
print(report)
33.10 Multi-Agent 系统的设计原则
在构建 Multi-Agent 系统时,以下几个设计原则能帮助你避免常见陷阱:
原则一:明确责任边界。每个 Agent 应该有明确、单一的职责。避免 Agent 互相依赖导致循环等待。
原则二:最小上下文传递。只向 Subagent 传递其完成任务所必需的信息,避免 token 浪费和信息混淆。
原则三:幂等性设计。同一个子任务被执行多次(重试场景)时,结果应该是确定性的或可合并的,避免副作用叠加。
原则四:显式依赖图。在代码中明确定义任务间的依赖关系(DAG),不依赖隐式的执行顺序假设。
原则五:保留审计追踪。记录每个 Agent 的输入、输出和工具调用,便于调试和回溯分析。
原则六:渐进式复杂度。从最简单的中心化编排开始,只在必要时引入分层和流水线结构,避免过度工程化。
小结
本章系统介绍了 Multi-Agent 编排的核心模式与实现技术。从中心化 Hub-and-Spoke 到流水线编排,从共享状态管理到跨 Agent 工具调用,Multi-Agent 架构的威力在于将复杂任务的拆解与并行执行发挥到极致。
关键要点:
- 选择适合任务特性的编排模式(中心化 vs 流水线 vs 层次化)
- 使用结构化消息格式降低 Agent 间的解析错误
- 实现完善的错误处理、重试和超时机制
- 通过模型分层(Opus/Sonnet/Haiku)控制成本
- 共享状态存储需要线程安全设计
下一章我们将深入 MCP(Model Context Protocol)协议,了解 Anthropic 为工具生态制定的标准规范。