第 20 章
Advisor Tool:Executor + Advisor 双模型单次 API 实现高质量长程任务
第二十章:并行工具调用:同步执行多工具的协调策略
20.1 并行工具调用的价值
在处理复杂任务时,顺序执行工具会产生不必要的延迟。如果用户问"帮我比较三个城市明天的天气并推荐穿衣建议",顺序调用三次天气 API 会让等待时间翻三倍。并行工具调用允许 Claude 在单次响应中请求多个工具,这些工具可以同时执行,显著减少总延迟。
Claude 支持在单个 stop_reason: tool_use 响应中包含多个 tool_use block。开发者检测到多个工具调用时,应并发执行它们,然后将所有结果一次性注入消息历史。
并行调用适用的场景:
- 数据聚合:同时从多个数据源获取信息
- 批量操作:对多个实体执行相同的操作
- 独立子任务:将大任务分解为互不依赖的子任务并行处理
- 多视角分析:同时从不同角度分析同一问题
20.2 并行调用的消息格式
单次响应多工具调用
Claude 可以在一次响应中请求多个工具:
import anthropic
import json
client = anthropic.Anthropic()
# Claude 在一次响应中请求了三个工具
response = client.messages.create(
model="claude-opus-4-5",
max_tokens=4096,
tools=[weather_tool, restaurant_tool, events_tool],
messages=[{
"role": "user",
"content": "查询上海明天的天气、推荐三家餐厅、列出明天的文化活动"
}]
)
# response.content 可能包含多个 tool_use blocks
print(f"Stop reason: {response.stop_reason}") # "tool_use"
print(f"Number of tool calls: {sum(1 for b in response.content if b.type == 'tool_use')}")
for block in response.content:
if block.type == "tool_use":
print(f" - {block.name}: {json.dumps(block.input, ensure_ascii=False)}")
并行执行与结果注入
关键点:所有工具的结果必须在同一轮消息中注入,而不是分多次:
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple
import anthropic
async def execute_tools_parallel(
tool_calls: List[anthropic.types.ToolUseBlock],
tool_executor
) -> List[dict]:
"""并行执行多个工具调用"""
async def execute_single(tool_use_block):
"""在线程池中执行单个工具"""
loop = asyncio.get_event_loop()
try:
result = await loop.run_in_executor(
None, # 使用默认线程池
lambda: tool_executor.execute(
tool_use_block.name,
tool_use_block.input
)
)
return {
"type": "tool_result",
"tool_use_id": tool_use_block.id,
"content": json.dumps(result, ensure_ascii=False, default=str)
}
except Exception as e:
return {
"type": "tool_result",
"tool_use_id": tool_use_block.id,
"content": f"错误: {str(e)}",
"is_error": True
}
# 并行执行所有工具
tasks = [execute_single(block) for block in tool_calls]
results = await asyncio.gather(*tasks)
return list(results)
async def run_parallel_agent(user_message: str, tools: list, executor) -> str:
"""运行支持并行工具调用的 Agent"""
client = anthropic.Anthropic()
messages = [{"role": "user", "content": user_message}]
while True:
response = client.messages.create(
model="claude-opus-4-5",
max_tokens=4096,
tools=tools,
messages=messages
)
if response.stop_reason == "end_turn":
return ' '.join(b.text for b in response.content if b.type == "text")
if response.stop_reason == "tool_use":
# 收集所有工具调用
tool_calls = [b for b in response.content if b.type == "tool_use"]
print(f"并行执行 {len(tool_calls)} 个工具调用...")
# 并行执行
tool_results = await execute_tools_parallel(tool_calls, executor)
# 一次性注入所有结果
messages.append({"role": "assistant", "content": response.content})
messages.append({"role": "user", "content": tool_results})
else:
break
return "任务完成"
20.3 同步版本的并行执行
如果不使用 asyncio,可以用 ThreadPoolExecutor 实现并行:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
class ParallelToolExecutor:
"""支持并行执行的工具执行器"""
def __init__(self, max_workers: int = 10):
self.tools = {}
self.tool_definitions = []
self.max_workers = max_workers
def register(self, tool_definition: dict, func):
self.tools[tool_definition["name"]] = func
self.tool_definitions.append(tool_definition)
return self
def execute_parallel(self, tool_calls: list) -> list:
"""并行执行多个工具调用,返回所有结果"""
results = [None] * len(tool_calls) # 保持顺序
def execute_one(index_and_call):
index, tool_call = index_and_call
start_time = time.time()
try:
if hasattr(tool_call, 'name'):
# anthropic ToolUseBlock 对象
name = tool_call.name
inputs = tool_call.input
tool_use_id = tool_call.id
else:
name = tool_call["name"]
inputs = tool_call["input"]
tool_use_id = tool_call["id"]
if name not in self.tools:
raise ValueError(f"未知工具: {name}")
result = self.tools[name](**inputs)
elapsed = time.time() - start_time
print(f" [OK] {name} 完成,耗时 {elapsed:.2f}s")
return index, {
"type": "tool_result",
"tool_use_id": tool_use_id,
"content": json.dumps(result, ensure_ascii=False, default=str)
}
except Exception as e:
elapsed = time.time() - start_time
print(f" [ERROR] {name} 失败,耗时 {elapsed:.2f}s: {e}")
return index, {
"type": "tool_result",
"tool_use_id": tool_use_id,
"content": str(e),
"is_error": True
}
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = [
executor.submit(execute_one, (i, call))
for i, call in enumerate(tool_calls)
]
for future in as_completed(futures):
index, result = future.result()
results[index] = result
return results
def run_agent(self, user_message: str, system: str = "") -> str:
"""运行完整的并行工具调用 Agent"""
client = anthropic.Anthropic()
messages = [{"role": "user", "content": user_message}]
create_kwargs = {
"model": "claude-opus-4-5",
"max_tokens": 4096,
"tools": self.tool_definitions,
"messages": messages
}
if system:
create_kwargs["system"] = system
iteration = 0
while iteration < 15:
iteration += 1
response = client.messages.create(**create_kwargs)
if response.stop_reason == "end_turn":
return ' '.join(b.text for b in response.content if b.type == "text")
if response.stop_reason == "tool_use":
tool_calls = [b for b in response.content if b.type == "tool_use"]
start = time.time()
print(f"\n轮次 {iteration}:并行执行 {len(tool_calls)} 个工具")
# 并行执行
tool_results = self.execute_parallel(tool_calls)
total_time = time.time() - start
print(f"全部完成,总耗时 {total_time:.2f}s")
messages.append({"role": "assistant", "content": response.content})
messages.append({"role": "user", "content": tool_results})
create_kwargs["messages"] = messages
else:
break
return "达到最大迭代次数"
20.4 控制并行调用的工具设计
通过工具描述引导 Claude 在合适的时机使用并行调用:
# 明确告诉 Claude 这些工具可以并行调用
parallelizable_tools = [
{
"name": "get_stock_price",
"description": """获取单个股票的当前价格和涨跌信息。
注意:当需要查询多只股票时,可以同时调用此工具多次(并行调用),
而不需要等待每次结果再继续。""",
"input_schema": {
"type": "object",
"properties": {
"symbol": {
"type": "string",
"description": "股票代码,如 'AAPL'、'TSLA'、'000001.SZ'"
}
},
"required": ["symbol"]
}
},
{
"name": "get_company_info",
"description": """获取公司基本信息(成立时间、主营业务、员工人数等)。
此工具独立于 get_stock_price,可以同时调用。""",
"input_schema": {
"type": "object",
"properties": {
"symbol": {"type": "string", "description": "股票代码"}
},
"required": ["symbol"]
}
}
]
不应并行的工具:依赖链处理
某些工具存在数据依赖,必须顺序执行:
sequential_tools = [
{
"name": "create_order",
"description": """创建订单,返回 order_id。
重要:必须在调用 process_payment 和 send_confirmation 之前调用此工具,
因为这两个工具需要 order_id。""",
"input_schema": {
"type": "object",
"properties": {
"product_id": {"type": "string"},
"quantity": {"type": "integer", "minimum": 1},
"user_id": {"type": "string"}
},
"required": ["product_id", "quantity", "user_id"]
}
},
{
"name": "process_payment",
"description": """处理订单支付。
依赖:需要先用 create_order 创建订单获取 order_id。
注意:process_payment 和 send_confirmation 在有了 order_id 之后可以并行调用。""",
"input_schema": {
"type": "object",
"properties": {
"order_id": {"type": "string", "description": "由 create_order 返回"},
"payment_method": {"type": "string", "enum": ["credit_card", "alipay", "wechat_pay"]}
},
"required": ["order_id", "payment_method"]
}
},
{
"name": "send_confirmation",
"description": """发送订单确认邮件。
依赖:需要先用 create_order 创建订单获取 order_id。
可以与 process_payment 并行执行。""",
"input_schema": {
"type": "object",
"properties": {
"order_id": {"type": "string"},
"email": {"type": "string"}
},
"required": ["order_id", "email"]
}
}
]
20.5 并行调用的超时与降级策略
单工具超时控制
import signal
from contextlib import contextmanager
from typing import Optional
@contextmanager
def timeout(seconds: int):
"""Unix 系统的超时上下文管理器"""
def handler(signum, frame):
raise TimeoutError(f"工具执行超时({seconds}秒)")
signal.signal(signal.SIGALRM, handler)
signal.alarm(seconds)
try:
yield
finally:
signal.alarm(0)
class TimeoutParallelExecutor(ParallelToolExecutor):
"""带超时控制的并行执行器"""
def __init__(self, max_workers: int = 10, tool_timeout_seconds: int = 30):
super().__init__(max_workers)
self.tool_timeout = tool_timeout_seconds
def execute_with_timeout(self, tool_name: str, inputs: dict,
tool_use_id: str) -> dict:
"""带超时的单工具执行"""
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(self.tools[tool_name], **inputs)
try:
result = future.result(timeout=self.tool_timeout)
return {
"type": "tool_result",
"tool_use_id": tool_use_id,
"content": json.dumps(result, ensure_ascii=False, default=str)
}
except FuturesTimeout:
return {
"type": "tool_result",
"tool_use_id": tool_use_id,
"content": f"工具 {tool_name} 执行超时({self.tool_timeout}秒)",
"is_error": True
}
降级策略:部分失败处理
def handle_partial_failures(tool_results: list, critical_tools: set) -> tuple:
"""
处理部分工具失败的情况
返回:(should_continue, enhanced_results)
"""
failed_tools = []
for result in tool_results:
if result.get("is_error"):
tool_id = result["tool_use_id"]
failed_tools.append(tool_id)
# 检查是否有关键工具失败
# 注:实际使用时需要建立 tool_use_id 到 tool_name 的映射
if failed_tools:
print(f"警告:{len(failed_tools)} 个工具执行失败")
# 可以选择:
# 1. 继续并让 Claude 基于部分结果给出答案
# 2. 抛出异常终止
# 3. 重试失败的工具
return True, tool_results # 这里选择继续
20.6 实战案例:竞争对手分析 Agent
def build_competitor_analysis_agent():
"""构建并行分析多个竞争对手的 Agent"""
executor = ParallelToolExecutor(max_workers=10)
def fetch_company_website(url: str) -> dict:
"""抓取公司官网信息(模拟)"""
return {
"url": url,
"title": f"{url} 的主页标题",
"description": "公司主营业务描述",
"product_count": 42
}
def get_crunchbase_data(company_name: str) -> dict:
"""获取 Crunchbase 融资信息(模拟)"""
return {
"company": company_name,
"total_funding": "$50M",
"last_round": "Series B",
"investors": ["a16z", "Sequoia"]
}
def analyze_app_store(app_name: str) -> dict:
"""分析 App Store 评分(模拟)"""
return {
"app": app_name,
"rating": 4.3,
"reviews_count": 12500,
"top_complaint": "加载速度慢"
}
executor.register({
"name": "fetch_company_website",
"description": "抓取指定公司官网的产品信息。可以对多个公司并行调用。",
"input_schema": {
"type": "object",
"properties": {"url": {"type": "string"}},
"required": ["url"]
}
}, fetch_company_website)
executor.register({
"name": "get_crunchbase_data",
"description": "获取公司的融资历史和投资人信息。可以对多个公司并行调用。",
"input_schema": {
"type": "object",
"properties": {"company_name": {"type": "string"}},
"required": ["company_name"]
}
}, get_crunchbase_data)
executor.register({
"name": "analyze_app_store",
"description": "分析应用在 App Store 的用户评价。可以对多个应用并行调用。",
"input_schema": {
"type": "object",
"properties": {"app_name": {"type": "string"}},
"required": ["app_name"]
}
}, analyze_app_store)
return executor
# 使用
executor = build_competitor_analysis_agent()
result = executor.run_agent(
user_message="""请帮我分析以下三家 AI 写作工具的竞争情况:
Jasper (jasper.ai)、Copy.ai (copy.ai)、Writesonic (writesonic.com)。
对每家公司:
1. 抓取官网信息
2. 查询融资情况
3. 分析 App Store 评价
最后给出综合对比报告。""",
system="你是一个专业的市场分析师,善于从多维度收集和分析竞争对手信息。"
)
print(result)
20.7 性能监控与优化
import time
from dataclasses import dataclass, field
from typing import Dict, List
@dataclass
class ToolCallMetrics:
"""工具调用性能指标"""
tool_name: str
start_time: float
end_time: float = 0
success: bool = True
error_msg: str = ""
@property
def duration(self) -> float:
return self.end_time - self.start_time
class MonitoredParallelExecutor(ParallelToolExecutor):
"""带性能监控的并行执行器"""
def __init__(self, max_workers: int = 10):
super().__init__(max_workers)
self.metrics: List[ToolCallMetrics] = []
def execute_parallel(self, tool_calls: list) -> list:
"""带监控的并行执行"""
results = []
metrics_batch = []
def monitored_execute(index_and_call):
index, tool_call = index_and_call
name = tool_call.name if hasattr(tool_call, 'name') else tool_call["name"]
tool_use_id = tool_call.id if hasattr(tool_call, 'id') else tool_call["id"]
inputs = tool_call.input if hasattr(tool_call, 'input') else tool_call["input"]
metric = ToolCallMetrics(tool_name=name, start_time=time.time())
try:
result_data = self.tools[name](**inputs)
metric.end_time = time.time()
metric.success = True
return index, metric, {
"type": "tool_result",
"tool_use_id": tool_use_id,
"content": json.dumps(result_data, ensure_ascii=False, default=str)
}
except Exception as e:
metric.end_time = time.time()
metric.success = False
metric.error_msg = str(e)
return index, metric, {
"type": "tool_result",
"tool_use_id": tool_use_id,
"content": str(e),
"is_error": True
}
ordered_results = [None] * len(tool_calls)
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = [
executor.submit(monitored_execute, (i, call))
for i, call in enumerate(tool_calls)
]
for future in as_completed(futures):
index, metric, result = future.result()
ordered_results[index] = result
self.metrics.append(metric)
return ordered_results
def print_performance_report(self):
"""打印性能报告"""
if not self.metrics:
print("没有工具调用记录")
return
print("\n=== 工具调用性能报告 ===")
print(f"{'工具名称':<30} {'耗时(s)':<10} {'状态':<10}")
print("-" * 55)
by_tool: Dict[str, List[float]] = {}
for m in self.metrics:
if m.tool_name not in by_tool:
by_tool[m.tool_name] = []
by_tool[m.tool_name].append(m.duration)
for tool_name, durations in by_tool.items():
avg = sum(durations) / len(durations)
max_d = max(durations)
calls = len(durations)
print(f"{tool_name:<30} avg={avg:.2f}s max={max_d:.2f}s calls={calls}")
success_count = sum(1 for m in self.metrics if m.success)
total = len(self.metrics)
print(f"\n成功率: {success_count}/{total} ({100*success_count/total:.1f}%)")
小结
并行工具调用是将 Claude 的工具使用能力从"逐步执行"提升到"高效协作"的关键技术。核心要点:
- Claude 自然支持在单次响应中返回多个
tool_useblock - 开发者需要并行执行这些工具,并在同一轮消息中注入所有结果
- 工具描述应明确指出哪些工具可以并行调用,哪些有依赖顺序
- 生产系统需要处理部分失败、超时和性能监控
下一章将介绍 Computer Use——让 Claude 直接操作 GUI 界面的完整方案。