第 20 章

Advisor Tool:Executor + Advisor 双模型单次 API 实现高质量长程任务

第二十章:并行工具调用:同步执行多工具的协调策略

20.1 并行工具调用的价值

在处理复杂任务时,顺序执行工具会产生不必要的延迟。如果用户问"帮我比较三个城市明天的天气并推荐穿衣建议",顺序调用三次天气 API 会让等待时间翻三倍。并行工具调用允许 Claude 在单次响应中请求多个工具,这些工具可以同时执行,显著减少总延迟。

Claude 支持在单个 stop_reason: tool_use 响应中包含多个 tool_use block。开发者检测到多个工具调用时,应并发执行它们,然后将所有结果一次性注入消息历史。

并行调用适用的场景:

20.2 并行调用的消息格式

单次响应多工具调用

Claude 可以在一次响应中请求多个工具:

import anthropic
import json

client = anthropic.Anthropic()

# Claude 在一次响应中请求了三个工具
response = client.messages.create(
    model="claude-opus-4-5",
    max_tokens=4096,
    tools=[weather_tool, restaurant_tool, events_tool],
    messages=[{
        "role": "user",
        "content": "查询上海明天的天气、推荐三家餐厅、列出明天的文化活动"
    }]
)

# response.content 可能包含多个 tool_use blocks
print(f"Stop reason: {response.stop_reason}")  # "tool_use"
print(f"Number of tool calls: {sum(1 for b in response.content if b.type == 'tool_use')}")

for block in response.content:
    if block.type == "tool_use":
        print(f"  - {block.name}: {json.dumps(block.input, ensure_ascii=False)}")

并行执行与结果注入

关键点:所有工具的结果必须在同一轮消息中注入,而不是分多次:

import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple
import anthropic

async def execute_tools_parallel(
    tool_calls: List[anthropic.types.ToolUseBlock],
    tool_executor
) -> List[dict]:
    """并行执行多个工具调用"""
    
    async def execute_single(tool_use_block):
        """在线程池中执行单个工具"""
        loop = asyncio.get_event_loop()
        try:
            result = await loop.run_in_executor(
                None,  # 使用默认线程池
                lambda: tool_executor.execute(
                    tool_use_block.name, 
                    tool_use_block.input
                )
            )
            return {
                "type": "tool_result",
                "tool_use_id": tool_use_block.id,
                "content": json.dumps(result, ensure_ascii=False, default=str)
            }
        except Exception as e:
            return {
                "type": "tool_result",
                "tool_use_id": tool_use_block.id,
                "content": f"错误: {str(e)}",
                "is_error": True
            }
    
    # 并行执行所有工具
    tasks = [execute_single(block) for block in tool_calls]
    results = await asyncio.gather(*tasks)
    return list(results)


async def run_parallel_agent(user_message: str, tools: list, executor) -> str:
    """运行支持并行工具调用的 Agent"""
    
    client = anthropic.Anthropic()
    messages = [{"role": "user", "content": user_message}]
    
    while True:
        response = client.messages.create(
            model="claude-opus-4-5",
            max_tokens=4096,
            tools=tools,
            messages=messages
        )
        
        if response.stop_reason == "end_turn":
            return ' '.join(b.text for b in response.content if b.type == "text")
        
        if response.stop_reason == "tool_use":
            # 收集所有工具调用
            tool_calls = [b for b in response.content if b.type == "tool_use"]
            
            print(f"并行执行 {len(tool_calls)} 个工具调用...")
            
            # 并行执行
            tool_results = await execute_tools_parallel(tool_calls, executor)
            
            # 一次性注入所有结果
            messages.append({"role": "assistant", "content": response.content})
            messages.append({"role": "user", "content": tool_results})
        else:
            break
    
    return "任务完成"

20.3 同步版本的并行执行

如果不使用 asyncio,可以用 ThreadPoolExecutor 实现并行:

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

class ParallelToolExecutor:
    """支持并行执行的工具执行器"""
    
    def __init__(self, max_workers: int = 10):
        self.tools = {}
        self.tool_definitions = []
        self.max_workers = max_workers
    
    def register(self, tool_definition: dict, func):
        self.tools[tool_definition["name"]] = func
        self.tool_definitions.append(tool_definition)
        return self
    
    def execute_parallel(self, tool_calls: list) -> list:
        """并行执行多个工具调用,返回所有结果"""
        results = [None] * len(tool_calls)  # 保持顺序
        
        def execute_one(index_and_call):
            index, tool_call = index_and_call
            start_time = time.time()
            try:
                if hasattr(tool_call, 'name'):
                    # anthropic ToolUseBlock 对象
                    name = tool_call.name
                    inputs = tool_call.input
                    tool_use_id = tool_call.id
                else:
                    name = tool_call["name"]
                    inputs = tool_call["input"]
                    tool_use_id = tool_call["id"]
                
                if name not in self.tools:
                    raise ValueError(f"未知工具: {name}")
                
                result = self.tools[name](**inputs)
                elapsed = time.time() - start_time
                
                print(f"  [OK] {name} 完成,耗时 {elapsed:.2f}s")
                
                return index, {
                    "type": "tool_result",
                    "tool_use_id": tool_use_id,
                    "content": json.dumps(result, ensure_ascii=False, default=str)
                }
            except Exception as e:
                elapsed = time.time() - start_time
                print(f"  [ERROR] {name} 失败,耗时 {elapsed:.2f}s: {e}")
                return index, {
                    "type": "tool_result",
                    "tool_use_id": tool_use_id,
                    "content": str(e),
                    "is_error": True
                }
        
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [
                executor.submit(execute_one, (i, call))
                for i, call in enumerate(tool_calls)
            ]
            
            for future in as_completed(futures):
                index, result = future.result()
                results[index] = result
        
        return results
    
    def run_agent(self, user_message: str, system: str = "") -> str:
        """运行完整的并行工具调用 Agent"""
        client = anthropic.Anthropic()
        messages = [{"role": "user", "content": user_message}]
        
        create_kwargs = {
            "model": "claude-opus-4-5",
            "max_tokens": 4096,
            "tools": self.tool_definitions,
            "messages": messages
        }
        if system:
            create_kwargs["system"] = system
        
        iteration = 0
        while iteration < 15:
            iteration += 1
            response = client.messages.create(**create_kwargs)
            
            if response.stop_reason == "end_turn":
                return ' '.join(b.text for b in response.content if b.type == "text")
            
            if response.stop_reason == "tool_use":
                tool_calls = [b for b in response.content if b.type == "tool_use"]
                
                start = time.time()
                print(f"\n轮次 {iteration}:并行执行 {len(tool_calls)} 个工具")
                
                # 并行执行
                tool_results = self.execute_parallel(tool_calls)
                
                total_time = time.time() - start
                print(f"全部完成,总耗时 {total_time:.2f}s")
                
                messages.append({"role": "assistant", "content": response.content})
                messages.append({"role": "user", "content": tool_results})
                create_kwargs["messages"] = messages
            else:
                break
        
        return "达到最大迭代次数"

20.4 控制并行调用的工具设计

通过工具描述引导 Claude 在合适的时机使用并行调用:

# 明确告诉 Claude 这些工具可以并行调用
parallelizable_tools = [
    {
        "name": "get_stock_price",
        "description": """获取单个股票的当前价格和涨跌信息。
注意:当需要查询多只股票时,可以同时调用此工具多次(并行调用),
而不需要等待每次结果再继续。""",
        "input_schema": {
            "type": "object",
            "properties": {
                "symbol": {
                    "type": "string",
                    "description": "股票代码,如 'AAPL'、'TSLA'、'000001.SZ'"
                }
            },
            "required": ["symbol"]
        }
    },
    {
        "name": "get_company_info",
        "description": """获取公司基本信息(成立时间、主营业务、员工人数等)。
此工具独立于 get_stock_price,可以同时调用。""",
        "input_schema": {
            "type": "object",
            "properties": {
                "symbol": {"type": "string", "description": "股票代码"}
            },
            "required": ["symbol"]
        }
    }
]

不应并行的工具:依赖链处理

某些工具存在数据依赖,必须顺序执行:

sequential_tools = [
    {
        "name": "create_order",
        "description": """创建订单,返回 order_id。
重要:必须在调用 process_payment 和 send_confirmation 之前调用此工具,
因为这两个工具需要 order_id。""",
        "input_schema": {
            "type": "object",
            "properties": {
                "product_id": {"type": "string"},
                "quantity": {"type": "integer", "minimum": 1},
                "user_id": {"type": "string"}
            },
            "required": ["product_id", "quantity", "user_id"]
        }
    },
    {
        "name": "process_payment",
        "description": """处理订单支付。
依赖:需要先用 create_order 创建订单获取 order_id。
注意:process_payment 和 send_confirmation 在有了 order_id 之后可以并行调用。""",
        "input_schema": {
            "type": "object",
            "properties": {
                "order_id": {"type": "string", "description": "由 create_order 返回"},
                "payment_method": {"type": "string", "enum": ["credit_card", "alipay", "wechat_pay"]}
            },
            "required": ["order_id", "payment_method"]
        }
    },
    {
        "name": "send_confirmation",
        "description": """发送订单确认邮件。
依赖:需要先用 create_order 创建订单获取 order_id。
可以与 process_payment 并行执行。""",
        "input_schema": {
            "type": "object",
            "properties": {
                "order_id": {"type": "string"},
                "email": {"type": "string"}
            },
            "required": ["order_id", "email"]
        }
    }
]

20.5 并行调用的超时与降级策略

单工具超时控制

import signal
from contextlib import contextmanager
from typing import Optional

@contextmanager
def timeout(seconds: int):
    """Unix 系统的超时上下文管理器"""
    def handler(signum, frame):
        raise TimeoutError(f"工具执行超时({seconds}秒)")
    
    signal.signal(signal.SIGALRM, handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)


class TimeoutParallelExecutor(ParallelToolExecutor):
    """带超时控制的并行执行器"""
    
    def __init__(self, max_workers: int = 10, tool_timeout_seconds: int = 30):
        super().__init__(max_workers)
        self.tool_timeout = tool_timeout_seconds
    
    def execute_with_timeout(self, tool_name: str, inputs: dict, 
                              tool_use_id: str) -> dict:
        """带超时的单工具执行"""
        from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout
        
        with ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(self.tools[tool_name], **inputs)
            try:
                result = future.result(timeout=self.tool_timeout)
                return {
                    "type": "tool_result",
                    "tool_use_id": tool_use_id,
                    "content": json.dumps(result, ensure_ascii=False, default=str)
                }
            except FuturesTimeout:
                return {
                    "type": "tool_result",
                    "tool_use_id": tool_use_id,
                    "content": f"工具 {tool_name} 执行超时({self.tool_timeout}秒)",
                    "is_error": True
                }

降级策略:部分失败处理

def handle_partial_failures(tool_results: list, critical_tools: set) -> tuple:
    """
    处理部分工具失败的情况
    返回:(should_continue, enhanced_results)
    """
    failed_tools = []
    
    for result in tool_results:
        if result.get("is_error"):
            tool_id = result["tool_use_id"]
            failed_tools.append(tool_id)
    
    # 检查是否有关键工具失败
    # 注:实际使用时需要建立 tool_use_id 到 tool_name 的映射
    if failed_tools:
        print(f"警告:{len(failed_tools)} 个工具执行失败")
        # 可以选择:
        # 1. 继续并让 Claude 基于部分结果给出答案
        # 2. 抛出异常终止
        # 3. 重试失败的工具
    
    return True, tool_results  # 这里选择继续

20.6 实战案例:竞争对手分析 Agent

def build_competitor_analysis_agent():
    """构建并行分析多个竞争对手的 Agent"""
    
    executor = ParallelToolExecutor(max_workers=10)
    
    def fetch_company_website(url: str) -> dict:
        """抓取公司官网信息(模拟)"""
        return {
            "url": url,
            "title": f"{url} 的主页标题",
            "description": "公司主营业务描述",
            "product_count": 42
        }
    
    def get_crunchbase_data(company_name: str) -> dict:
        """获取 Crunchbase 融资信息(模拟)"""
        return {
            "company": company_name,
            "total_funding": "$50M",
            "last_round": "Series B",
            "investors": ["a16z", "Sequoia"]
        }
    
    def analyze_app_store(app_name: str) -> dict:
        """分析 App Store 评分(模拟)"""
        return {
            "app": app_name,
            "rating": 4.3,
            "reviews_count": 12500,
            "top_complaint": "加载速度慢"
        }
    
    executor.register({
        "name": "fetch_company_website",
        "description": "抓取指定公司官网的产品信息。可以对多个公司并行调用。",
        "input_schema": {
            "type": "object",
            "properties": {"url": {"type": "string"}},
            "required": ["url"]
        }
    }, fetch_company_website)
    
    executor.register({
        "name": "get_crunchbase_data",
        "description": "获取公司的融资历史和投资人信息。可以对多个公司并行调用。",
        "input_schema": {
            "type": "object",
            "properties": {"company_name": {"type": "string"}},
            "required": ["company_name"]
        }
    }, get_crunchbase_data)
    
    executor.register({
        "name": "analyze_app_store",
        "description": "分析应用在 App Store 的用户评价。可以对多个应用并行调用。",
        "input_schema": {
            "type": "object",
            "properties": {"app_name": {"type": "string"}},
            "required": ["app_name"]
        }
    }, analyze_app_store)
    
    return executor


# 使用
executor = build_competitor_analysis_agent()
result = executor.run_agent(
    user_message="""请帮我分析以下三家 AI 写作工具的竞争情况:
    Jasper (jasper.ai)、Copy.ai (copy.ai)、Writesonic (writesonic.com)。
    
    对每家公司:
    1. 抓取官网信息
    2. 查询融资情况
    3. 分析 App Store 评价
    
    最后给出综合对比报告。""",
    system="你是一个专业的市场分析师,善于从多维度收集和分析竞争对手信息。"
)

print(result)

20.7 性能监控与优化

import time
from dataclasses import dataclass, field
from typing import Dict, List

@dataclass
class ToolCallMetrics:
    """工具调用性能指标"""
    tool_name: str
    start_time: float
    end_time: float = 0
    success: bool = True
    error_msg: str = ""
    
    @property
    def duration(self) -> float:
        return self.end_time - self.start_time

class MonitoredParallelExecutor(ParallelToolExecutor):
    """带性能监控的并行执行器"""
    
    def __init__(self, max_workers: int = 10):
        super().__init__(max_workers)
        self.metrics: List[ToolCallMetrics] = []
    
    def execute_parallel(self, tool_calls: list) -> list:
        """带监控的并行执行"""
        results = []
        metrics_batch = []
        
        def monitored_execute(index_and_call):
            index, tool_call = index_and_call
            name = tool_call.name if hasattr(tool_call, 'name') else tool_call["name"]
            tool_use_id = tool_call.id if hasattr(tool_call, 'id') else tool_call["id"]
            inputs = tool_call.input if hasattr(tool_call, 'input') else tool_call["input"]
            
            metric = ToolCallMetrics(tool_name=name, start_time=time.time())
            
            try:
                result_data = self.tools[name](**inputs)
                metric.end_time = time.time()
                metric.success = True
                
                return index, metric, {
                    "type": "tool_result",
                    "tool_use_id": tool_use_id,
                    "content": json.dumps(result_data, ensure_ascii=False, default=str)
                }
            except Exception as e:
                metric.end_time = time.time()
                metric.success = False
                metric.error_msg = str(e)
                
                return index, metric, {
                    "type": "tool_result",
                    "tool_use_id": tool_use_id,
                    "content": str(e),
                    "is_error": True
                }
        
        ordered_results = [None] * len(tool_calls)
        
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [
                executor.submit(monitored_execute, (i, call))
                for i, call in enumerate(tool_calls)
            ]
            for future in as_completed(futures):
                index, metric, result = future.result()
                ordered_results[index] = result
                self.metrics.append(metric)
        
        return ordered_results
    
    def print_performance_report(self):
        """打印性能报告"""
        if not self.metrics:
            print("没有工具调用记录")
            return
        
        print("\n=== 工具调用性能报告 ===")
        print(f"{'工具名称':<30} {'耗时(s)':<10} {'状态':<10}")
        print("-" * 55)
        
        by_tool: Dict[str, List[float]] = {}
        for m in self.metrics:
            if m.tool_name not in by_tool:
                by_tool[m.tool_name] = []
            by_tool[m.tool_name].append(m.duration)
        
        for tool_name, durations in by_tool.items():
            avg = sum(durations) / len(durations)
            max_d = max(durations)
            calls = len(durations)
            print(f"{tool_name:<30} avg={avg:.2f}s max={max_d:.2f}s calls={calls}")
        
        success_count = sum(1 for m in self.metrics if m.success)
        total = len(self.metrics)
        print(f"\n成功率: {success_count}/{total} ({100*success_count/total:.1f}%)")

小结

并行工具调用是将 Claude 的工具使用能力从"逐步执行"提升到"高效协作"的关键技术。核心要点:

  1. Claude 自然支持在单次响应中返回多个 tool_use block
  2. 开发者需要并行执行这些工具,并在同一轮消息中注入所有结果
  3. 工具描述应明确指出哪些工具可以并行调用,哪些有依赖顺序
  4. 生产系统需要处理部分失败、超时和性能监控

下一章将介绍 Computer Use——让 Claude 直接操作 GUI 界面的完整方案。

本章评分
4.8  / 5  (14 评分)

💬 留言讨论