第 33 章

Managed Agents 生产实践:错误恢复、监控告警与成本控制

第三十三章:Multi-Agent 编排:让多个 Claude 实例协同工作

33.1 为什么需要多 Agent 协同

单个 Claude 实例在处理复杂任务时面临三类瓶颈:上下文窗口限制并行处理能力不足专业化分工缺失。当任务需要同时分析十万行代码库、协调多个独立子任务、或者需要不同"角色"互相校验时,单实例架构就开始暴露其局限。

Multi-Agent 架构的核心思想是将一个复杂任务分解为多个子任务,每个子任务交给一个专门的 Claude 实例(称为 SubagentWorker)处理,由一个 Orchestrator(编排器)协调整体流程。这一模式在以下场景中尤为有效:

根据 Anthropic 的官方文档,Claude 在 Multi-Agent 系统中可以扮演两种角色:作为 Orchestrator(向其他 Agent 发出指令)或作为 Subagent(执行具体任务并返回结果)。同一个 Claude 实例在不同架构层级中可能同时扮演两种角色。

33.2 编排模式分类

33.2.1 中心化编排(Hub-and-Spoke)

最常见的模式。一个 Orchestrator 接收用户请求,将任务分配给多个 Subagent,收集结果后聚合返回。

用户
  │
  ▼
Orchestrator(主 Claude 实例)
  ├── Worker A(负责任务 1)
  ├── Worker B(负责任务 2)
  └── Worker C(负责任务 3)
        │
        └── 结果汇总 → 用户

适用场景:任务可以明确拆分、子任务相对独立、需要最终综合判断的场景。

33.2.2 流水线编排(Pipeline)

每个 Agent 处理上一个 Agent 的输出,形成串行处理链。适合有明确顺序依赖的任务。

输入 → Agent A(提取) → Agent B(分析) → Agent C(报告) → 输出

适用场景:数据处理流水线、多阶段文档生成、需要逐步精炼的内容创作。

33.2.3 分层编排(Hierarchical)

Orchestrator 下面有 Sub-orchestrator,Sub-orchestrator 再管理 Worker。用于超大规模任务。

Master Orchestrator
  ├── Sub-orchestrator A
  │     ├── Worker A1
  │     └── Worker A2
  └── Sub-orchestrator B
        ├── Worker B1
        └── Worker B2

33.2.4 对等协作(Peer-to-Peer)

多个 Claude 实例平等协作,互相发送消息,通过共识或投票机制决定最终输出。常用于需要多视角验证的场景(如代码安全审查、医疗建议生成)。

33.3 使用 Anthropic SDK 实现基础编排

33.3.1 Python 实现示例

import anthropic
import asyncio
from typing import List, Dict, Any

client = anthropic.Anthropic()

async def run_subagent(
    task: str,
    context: str,
    model: str = "claude-opus-4-5"
) -> str:
    """运行单个 Subagent 执行具体任务"""
    message = client.messages.create(
        model=model,
        max_tokens=4096,
        system=f"""你是一个专业的分析 Agent。
你的任务是专注处理分配给你的子任务,提供详细、准确的结果。
背景信息:{context}""",
        messages=[
            {"role": "user", "content": task}
        ]
    )
    return message.content[0].text


async def orchestrator(user_request: str) -> str:
    """
    Orchestrator:接收用户请求,分解任务,协调 Subagent,汇总结果
    """
    # 第一步:让 Orchestrator 分解任务
    decomposition_response = client.messages.create(
        model="claude-opus-4-5",
        max_tokens=2048,
        system="""你是一个任务编排器。
收到用户请求后,将其分解为 2-4 个独立的子任务。
以 JSON 格式返回,结构为:
{
  "subtasks": [
    {"id": "1", "task": "任务描述", "requires": []},
    {"id": "2", "task": "任务描述", "requires": ["1"]}
  ]
}
requires 字段表示依赖关系(需要先完成哪些任务)。""",
        messages=[
            {"role": "user", "content": f"请分解以下任务:{user_request}"}
        ]
    )
    
    import json
    decomposition = json.loads(decomposition_response.content[0].text)
    subtasks = decomposition["subtasks"]
    
    # 第二步:执行无依赖的任务(并行)
    results: Dict[str, str] = {}
    
    # 找出没有依赖的任务
    ready_tasks = [t for t in subtasks if not t.get("requires")]
    
    # 并行执行
    async def execute_task(task_info):
        context = f"原始用户请求:{user_request}"
        result = await run_subagent(task_info["task"], context)
        return task_info["id"], result
    
    while ready_tasks:
        # 并行执行所有就绪任务
        task_results = await asyncio.gather(
            *[execute_task(t) for t in ready_tasks]
        )
        for task_id, result in task_results:
            results[task_id] = result
        
        # 找出新的就绪任务(依赖已全部完成)
        completed_ids = set(results.keys())
        remaining = [t for t in subtasks if t["id"] not in completed_ids]
        ready_tasks = [
            t for t in remaining
            if all(dep in completed_ids for dep in t.get("requires", []))
        ]
    
    # 第三步:汇总所有结果
    synthesis_prompt = f"""
用户的原始请求:{user_request}

各子任务的执行结果:
{chr(10).join([f"子任务 {k}:{v}" for k, v in results.items()])}

请将以上结果综合整理,形成完整、连贯的最终回答。
"""
    
    final_response = client.messages.create(
        model="claude-opus-4-5",
        max_tokens=4096,
        messages=[{"role": "user", "content": synthesis_prompt}]
    )
    
    return final_response.content[0].text


# 使用示例
async def main():
    result = await orchestrator(
        "分析 GPT-4 和 Claude 3.5 Sonnet 在代码生成、推理和创意写作三个维度的优劣势,并给出选型建议"
    )
    print(result)

if __name__ == "__main__":
    asyncio.run(main())

33.3.2 TypeScript 实现示例

import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic();

interface SubTask {
  id: string;
  task: string;
  requires: string[];
}

interface TaskDecomposition {
  subtasks: SubTask[];
}

async function runSubagent(
  task: string,
  context: string,
  model: string = "claude-opus-4-5"
): Promise<string> {
  const message = await client.messages.create({
    model,
    max_tokens: 4096,
    system: `你是一个专业分析 Agent。专注处理分配的子任务,提供详细准确的结果。
背景:${context}`,
    messages: [{ role: "user", content: task }],
  });

  const block = message.content[0];
  return block.type === "text" ? block.text : "";
}

async function orchestrate(userRequest: string): Promise<string> {
  // 分解任务
  const decompositionMsg = await client.messages.create({
    model: "claude-opus-4-5",
    max_tokens: 2048,
    system: `你是任务编排器。将用户请求分解为独立子任务,以 JSON 格式返回:
{"subtasks": [{"id": "1", "task": "...", "requires": []}]}`,
    messages: [{ role: "user", content: `分解任务:${userRequest}` }],
  });

  const block = decompositionMsg.content[0];
  const decomposition: TaskDecomposition = JSON.parse(
    block.type === "text" ? block.text : "{}"
  );

  const results: Record<string, string> = {};
  const subtasks = decomposition.subtasks;

  // 按依赖顺序执行
  let remaining = [...subtasks];
  while (remaining.length > 0) {
    const completedIds = new Set(Object.keys(results));
    const ready = remaining.filter((t) =>
      t.requires.every((dep) => completedIds.has(dep))
    );

    if (ready.length === 0) break; // 避免死锁

    // 并行执行就绪任务
    const taskResults = await Promise.all(
      ready.map(async (t) => {
        const result = await runSubagent(
          t.task,
          `原始请求:${userRequest}`
        );
        return [t.id, result] as [string, string];
      })
    );

    for (const [id, result] of taskResults) {
      results[id] = result;
    }

    remaining = remaining.filter((t) => !(t.id in results));
  }

  // 汇总
  const synthesisPrompt = `
原始请求:${userRequest}

子任务结果:
${Object.entries(results)
  .map(([k, v]) => `任务 ${k}:${v}`)
  .join("\n\n")}

请综合以上结果,形成完整回答。
`;

  const finalMsg = await client.messages.create({
    model: "claude-opus-4-5",
    max_tokens: 4096,
    messages: [{ role: "user", content: synthesisPrompt }],
  });

  const finalBlock = finalMsg.content[0];
  return finalBlock.type === "text" ? finalBlock.text : "";
}

// 使用
orchestrate("对比分析三种主流 LLM API 的定价策略和适用场景").then(console.log);

33.4 消息传递机制与结果聚合

33.4.1 结构化消息格式

Agent 间传递消息时,使用结构化格式能极大降低解析错误率。推荐的消息信封格式:

{
  "message_id": "msg_20250101_001",
  "from": "orchestrator",
  "to": "worker_a",
  "task_id": "task_001",
  "type": "task_assignment",
  "payload": {
    "instruction": "分析以下代码段的时间复杂度",
    "data": "...",
    "constraints": {
      "max_tokens": 1024,
      "format": "json",
      "deadline_ms": 30000
    }
  },
  "metadata": {
    "priority": "high",
    "retry_count": 0,
    "parent_task": "task_000"
  }
}

33.4.2 结果聚合策略

不同场景需要不同的聚合策略:

投票聚合(Voting Aggregation):适合有明确对错的判断类任务。

def voting_aggregation(results: List[str], question: str) -> str:
    """让多个 Agent 投票,选择出现频率最高的答案"""
    vote_prompt = f"""
以下是多个 Agent 对同一问题的回答:
{chr(10).join([f"Agent {i+1}:{r}" for i, r in enumerate(results)])}

问题:{question}

请分析这些答案,找出共识,给出最终答案。如果存在分歧,说明分歧原因。
"""
    response = client.messages.create(
        model="claude-opus-4-5",
        max_tokens=2048,
        messages=[{"role": "user", "content": vote_prompt}]
    )
    return response.content[0].text

加权聚合(Weighted Aggregation):给不同专长的 Agent 分配不同权重。

def weighted_aggregation(
    results: List[Dict[str, Any]],
    weights: Dict[str, float]
) -> str:
    """
    results: [{"agent_id": "expert_a", "content": "...", "confidence": 0.9}]
    weights: {"expert_a": 0.6, "expert_b": 0.4}
    """
    weighted_prompt = "\n".join([
        f"Agent {r['agent_id']}(权重 {weights.get(r['agent_id'], 0.5):.1f},"
        f"置信度 {r.get('confidence', 0.5):.1f}):{r['content']}"
        for r in results
    ])
    
    response = client.messages.create(
        model="claude-opus-4-5",
        max_tokens=2048,
        messages=[{
            "role": "user",
            "content": f"综合以下加权意见,给出最终结论:\n{weighted_prompt}"
        }]
    )
    return response.content[0].text

33.5 工具调用在 Multi-Agent 中的传递

33.5.1 共享工具定义

在 Multi-Agent 系统中,所有 Agent 通常共享一套工具定义,但不同 Agent 可以访问不同工具子集:

# 工具注册表
TOOL_REGISTRY = {
    "web_search": {
        "name": "web_search",
        "description": "搜索互联网获取最新信息",
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "搜索关键词"},
                "max_results": {"type": "integer", "default": 5}
            },
            "required": ["query"]
        }
    },
    "code_executor": {
        "name": "code_executor",
        "description": "执行 Python 代码并返回结果",
        "input_schema": {
            "type": "object",
            "properties": {
                "code": {"type": "string"},
                "timeout": {"type": "integer", "default": 30}
            },
            "required": ["code"]
        }
    },
    "file_reader": {
        "name": "file_reader",
        "description": "读取本地文件内容",
        "input_schema": {
            "type": "object",
            "properties": {
                "path": {"type": "string"},
                "encoding": {"type": "string", "default": "utf-8"}
            },
            "required": ["path"]
        }
    }
}

# 不同角色获得不同工具权限
ROLE_TOOLS = {
    "researcher": ["web_search", "file_reader"],
    "coder": ["code_executor", "file_reader"],
    "orchestrator": ["web_search", "code_executor", "file_reader"]
}

def get_tools_for_role(role: str) -> List[dict]:
    tool_names = ROLE_TOOLS.get(role, [])
    return [TOOL_REGISTRY[name] for name in tool_names if name in TOOL_REGISTRY]

33.5.2 工具调用结果的跨 Agent 传递

当 Subagent 调用工具后,其结果需要传递给 Orchestrator 或其他 Agent:

async def run_agent_with_tools(
    task: str,
    tools: List[dict],
    tool_executor: callable
) -> Dict[str, Any]:
    """
    运行带工具调用能力的 Agent,处理完整的工具调用循环
    返回最终结果和所有工具调用记录
    """
    messages = [{"role": "user", "content": task}]
    tool_calls_log = []
    
    while True:
        response = client.messages.create(
            model="claude-opus-4-5",
            max_tokens=4096,
            tools=tools,
            messages=messages
        )
        
        if response.stop_reason == "end_turn":
            # Agent 完成任务
            final_text = next(
                (b.text for b in response.content if b.type == "text"), ""
            )
            return {
                "result": final_text,
                "tool_calls": tool_calls_log,
                "usage": response.usage.model_dump()
            }
        
        elif response.stop_reason == "tool_use":
            # 处理工具调用
            tool_use_blocks = [
                b for b in response.content if b.type == "tool_use"
            ]
            
            # 将 Assistant 消息加入历史
            messages.append({
                "role": "assistant",
                "content": response.content
            })
            
            # 执行工具并收集结果
            tool_results = []
            for tool_use in tool_use_blocks:
                tool_result = await tool_executor(
                    tool_use.name,
                    tool_use.input
                )
                tool_calls_log.append({
                    "tool": tool_use.name,
                    "input": tool_use.input,
                    "output": tool_result
                })
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": tool_use.id,
                    "content": str(tool_result)
                })
            
            messages.append({
                "role": "user",
                "content": tool_results
            })

33.6 状态管理与上下文共享

33.6.1 共享状态存储

Multi-Agent 系统需要一个共享状态存储,让各 Agent 可以读写公共信息:

import threading
from dataclasses import dataclass, field
from typing import Any, Optional
import json
import time

@dataclass
class SharedState:
    """线程安全的共享状态存储"""
    _lock: threading.Lock = field(default_factory=threading.Lock)
    _data: Dict[str, Any] = field(default_factory=dict)
    _history: List[Dict] = field(default_factory=list)
    
    def set(self, key: str, value: Any, agent_id: str = "unknown"):
        with self._lock:
            old_value = self._data.get(key)
            self._data[key] = value
            self._history.append({
                "timestamp": time.time(),
                "agent": agent_id,
                "action": "set",
                "key": key,
                "old_value": old_value,
                "new_value": value
            })
    
    def get(self, key: str, default: Any = None) -> Any:
        with self._lock:
            return self._data.get(key, default)
    
    def get_context_for_agent(self, relevant_keys: List[str]) -> str:
        """生成供 Agent 读取的上下文摘要"""
        with self._lock:
            context_items = {
                k: v for k, v in self._data.items()
                if k in relevant_keys
            }
        return json.dumps(context_items, ensure_ascii=False, indent=2)
    
    def get_audit_log(self) -> List[Dict]:
        with self._lock:
            return list(self._history)


# 使用共享状态
shared_state = SharedState()

async def research_agent(topic: str, state: SharedState) -> str:
    """研究 Agent:搜索资料并将关键发现存入共享状态"""
    result = await run_subagent(
        f"研究以下主题,列出 5 个关键事实:{topic}",
        "你是研究 Agent"
    )
    state.set(f"research_{topic}", result, agent_id="research_agent")
    return result

async def writer_agent(topic: str, state: SharedState) -> str:
    """写作 Agent:基于共享状态中的研究结果撰写文章"""
    research_context = state.get(f"research_{topic}", "")
    result = await run_subagent(
        f"基于以下研究结果,撰写一篇关于 {topic} 的文章:\n{research_context}",
        "你是写作 Agent"
    )
    state.set(f"article_{topic}", result, agent_id="writer_agent")
    return result

33.6.2 上下文压缩与传递

当 Agent 需要传递大量上下文时,使用压缩技术避免超出 token 限制:

async def compress_context(
    full_context: str,
    target_tokens: int = 2000
) -> str:
    """压缩长上下文,保留关键信息"""
    compress_prompt = f"""
以下是需要压缩的上下文(约 {len(full_context)} 字符):

{full_context}

请将其压缩到约 {target_tokens} token 的摘要,保留:
1. 所有关键决策和结论
2. 重要的数据和数字
3. 任务进度和待办事项
4. 错误和异常情况

以结构化格式输出。
"""
    response = client.messages.create(
        model="claude-haiku-4-5",  # 用较小模型做压缩,节省成本
        max_tokens=target_tokens,
        messages=[{"role": "user", "content": compress_prompt}]
    )
    return response.content[0].text

33.7 错误处理与容错机制

33.7.1 Subagent 失败重试

import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10)
)
async def resilient_subagent(task: str, context: str) -> str:
    """带重试机制的 Subagent"""
    try:
        return await run_subagent(task, context)
    except anthropic.RateLimitError:
        # 速率限制,等待后重试
        raise
    except anthropic.APIError as e:
        # API 错误,记录并重试
        print(f"API 错误:{e}")
        raise


async def safe_orchestrator(user_request: str) -> Dict[str, Any]:
    """带完整错误处理的编排器"""
    failed_tasks = []
    results = {}
    
    tasks = [
        {"id": "1", "task": "子任务 1"},
        {"id": "2", "task": "子任务 2"},
        {"id": "3", "task": "子任务 3"},
    ]
    
    async def execute_with_fallback(task_info):
        try:
            result = await resilient_subagent(
                task_info["task"],
                f"请求:{user_request}"
            )
            return task_info["id"], result, None
        except Exception as e:
            failed_tasks.append(task_info["id"])
            return task_info["id"], None, str(e)
    
    task_results = await asyncio.gather(
        *[execute_with_fallback(t) for t in tasks],
        return_exceptions=True
    )
    
    for item in task_results:
        if isinstance(item, Exception):
            continue
        task_id, result, error = item
        if result:
            results[task_id] = result
    
    return {
        "results": results,
        "failed_tasks": failed_tasks,
        "success_rate": len(results) / len(tasks)
    }

33.7.2 超时控制

async def timeout_subagent(
    task: str,
    context: str,
    timeout_seconds: float = 60.0
) -> Optional[str]:
    """带超时控制的 Subagent 执行"""
    try:
        result = await asyncio.wait_for(
            run_subagent(task, context),
            timeout=timeout_seconds
        )
        return result
    except asyncio.TimeoutError:
        print(f"任务超时({timeout_seconds}s):{task[:50]}...")
        return None

33.8 成本优化策略

33.8.1 模型分层使用

不是所有子任务都需要用最强的模型。根据任务复杂度选择合适的模型:

MODEL_TIERS = {
    "heavy": "claude-opus-4-5",       # 复杂推理、最终综合
    "medium": "claude-sonnet-4-5",    # 一般分析、内容生成
    "light": "claude-haiku-4-5"       # 简单提取、格式转换、压缩
}

def select_model_for_task(task_type: str) -> str:
    """根据任务类型选择合适的模型"""
    heavy_tasks = ["complex_analysis", "synthesis", "creative_writing"]
    light_tasks = ["extraction", "formatting", "classification", "compression"]
    
    if task_type in heavy_tasks:
        return MODEL_TIERS["heavy"]
    elif task_type in light_tasks:
        return MODEL_TIERS["light"]
    else:
        return MODEL_TIERS["medium"]

成本对比参考(以 1M tokens 计)

模型 输入 输出 适用场景
claude-opus-4-5 $15 $75 复杂推理、最终决策
claude-sonnet-4-5 $3 $15 通用任务
claude-haiku-4-5 $0.25 $1.25 简单处理、压缩

33.8.2 缓存 Subagent 结果

对相同或相似的子任务使用缓存,避免重复 API 调用:

import hashlib
from functools import lru_cache

class SubagentCache:
    def __init__(self, max_size: int = 100):
        self._cache: Dict[str, str] = {}
        self._max_size = max_size
    
    def _make_key(self, task: str, context: str) -> str:
        content = f"{task}|||{context}"
        return hashlib.md5(content.encode()).hexdigest()
    
    async def get_or_execute(
        self,
        task: str,
        context: str,
        ttl_seconds: int = 3600
    ) -> str:
        key = self._make_key(task, context)
        
        if key in self._cache:
            cached = self._cache[key]
            if time.time() - cached["timestamp"] < ttl_seconds:
                return cached["result"]
        
        result = await run_subagent(task, context)
        
        if len(self._cache) >= self._max_size:
            oldest_key = min(self._cache, key=lambda k: self._cache[k]["timestamp"])
            del self._cache[oldest_key]
        
        self._cache[key] = {
            "result": result,
            "timestamp": time.time()
        }
        return result

33.9 实战案例:自动化研究报告生成系统

import asyncio
import anthropic
from typing import Dict, List

client = anthropic.Anthropic()

async def generate_research_report(topic: str) -> str:
    """
    完整的多 Agent 研究报告生成流程:
    1. 大纲生成 Agent
    2. 并行章节研究 Agent(每个章节一个)
    3. 事实核查 Agent
    4. 综合撰写 Agent
    """
    
    # Step 1: 生成研究大纲
    outline_response = client.messages.create(
        model="claude-sonnet-4-5",
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": f"为'{topic}'生成一个包含 4 个章节的研究大纲,每章一句话描述。JSON 格式:{{\"chapters\": [\"章节描述\"]}}"
        }]
    )
    
    import json
    outline = json.loads(outline_response.content[0].text)
    chapters = outline["chapters"]
    
    print(f"大纲生成完成,共 {len(chapters)} 章")
    
    # Step 2: 并行研究各章节
    async def research_chapter(idx: int, chapter: str) -> tuple:
        response = client.messages.create(
            model="claude-haiku-4-5",  # 用轻量模型做研究
            max_tokens=2048,
            messages=[{
                "role": "user",
                "content": f"为研究报告写一章关于:{chapter}(主题:{topic})。约 300 字,包含具体数据和例子。"
            }]
        )
        return idx, response.content[0].text
    
    chapter_results = await asyncio.gather(
        *[research_chapter(i, ch) for i, ch in enumerate(chapters)]
    )
    
    chapter_contents = dict(chapter_results)
    print("各章节研究完成")
    
    # Step 3: 事实核查
    draft = "\n\n".join([
        f"## 第{i+1}章:{chapters[i]}\n{chapter_contents[i]}"
        for i in range(len(chapters))
    ])
    
    fact_check_response = client.messages.create(
        model="claude-sonnet-4-5",
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": f"检查以下草稿中的潜在事实错误或不确定性,列出需要注意的地方:\n\n{draft}"
        }]
    )
    
    fact_check_notes = fact_check_response.content[0].text
    
    # Step 4: 最终综合
    final_response = client.messages.create(
        model="claude-opus-4-5",
        max_tokens=8192,
        messages=[{
            "role": "user",
            "content": f"""
主题:{topic}

章节草稿:
{draft}

事实核查备注:
{fact_check_notes}

请综合以上内容,生成专业、连贯的研究报告。改正事实核查中发现的问题,优化行文风格。
"""
        }]
    )
    
    return final_response.content[0].text


# 运行示例
if __name__ == "__main__":
    report = asyncio.run(
        generate_research_report("大型语言模型在企业应用中的现状与趋势")
    )
    print(report)

33.10 Multi-Agent 系统的设计原则

在构建 Multi-Agent 系统时,以下几个设计原则能帮助你避免常见陷阱:

原则一:明确责任边界。每个 Agent 应该有明确、单一的职责。避免 Agent 互相依赖导致循环等待。

原则二:最小上下文传递。只向 Subagent 传递其完成任务所必需的信息,避免 token 浪费和信息混淆。

原则三:幂等性设计。同一个子任务被执行多次(重试场景)时,结果应该是确定性的或可合并的,避免副作用叠加。

原则四:显式依赖图。在代码中明确定义任务间的依赖关系(DAG),不依赖隐式的执行顺序假设。

原则五:保留审计追踪。记录每个 Agent 的输入、输出和工具调用,便于调试和回溯分析。

原则六:渐进式复杂度。从最简单的中心化编排开始,只在必要时引入分层和流水线结构,避免过度工程化。


小结

本章系统介绍了 Multi-Agent 编排的核心模式与实现技术。从中心化 Hub-and-Spoke 到流水线编排,从共享状态管理到跨 Agent 工具调用,Multi-Agent 架构的威力在于将复杂任务的拆解与并行执行发挥到极致。

关键要点:

  1. 选择适合任务特性的编排模式(中心化 vs 流水线 vs 层次化)
  2. 使用结构化消息格式降低 Agent 间的解析错误
  3. 实现完善的错误处理、重试和超时机制
  4. 通过模型分层(Opus/Sonnet/Haiku)控制成本
  5. 共享状态存储需要线程安全设计

下一章我们将深入 MCP(Model Context Protocol)协议,了解 Anthropic 为工具生态制定的标准规范。

本章评分
4.7  / 5  (3 评分)

💬 留言讨论