Advisor Tool: Executor + Advisor Dual-Model Single API Call for High-Quality Long-Range Tasks
Chapter 20: Parallel Tool Calls: Coordination Strategies for Simultaneous Multi-Tool Execution
20.1 The Value of Parallel Tool Calls
When handling complex tasks, executing tools sequentially creates unnecessary latency. If a user asks "compare tomorrow's weather in three cities and recommend what to wear," calling the weather API three times in sequence triples the wait time. Parallel tool calls allow Claude to request multiple tools in a single response; those tools can then execute simultaneously, dramatically reducing total latency.
Claude supports returning multiple tool_use blocks in a single stop_reason: tool_use response. When developers detect multiple tool calls, they should execute them concurrently and inject all results into the message history at once.
Scenarios suited for parallel calls:
- Data aggregation: Fetch information from multiple sources simultaneously
- Batch operations: Perform the same operation on multiple entities
- Independent sub-tasks: Decompose a large task into mutually independent sub-tasks for parallel processing
- Multi-perspective analysis: Analyze the same problem from different angles simultaneously
20.2 Message Format for Parallel Calls
Multiple Tool Calls in a Single Response
Claude can request multiple tools in one response:
import anthropic
import json
client = anthropic.Anthropic()
response = client.messages.create(
model="claude-opus-4-5",
max_tokens=4096,
tools=[weather_tool, restaurant_tool, events_tool],
messages=[{
"role": "user",
"content": "Check the weather for tomorrow, recommend three restaurants, and list cultural events happening tomorrow"
}]
)
print(f"Stop reason: {response.stop_reason}") # "tool_use"
print(f"Number of tool calls: {sum(1 for b in response.content if b.type == 'tool_use')}")
for block in response.content:
if block.type == "tool_use":
print(f" - {block.name}: {json.dumps(block.input)}")
Parallel Execution and Result Injection
Critical: all tool results must be injected in the same turn, not across multiple rounds:
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import List
import anthropic
async def execute_tools_parallel(
tool_calls: List[anthropic.types.ToolUseBlock],
tool_executor
) -> List[dict]:
"""Execute multiple tool calls in parallel"""
async def execute_single(tool_use_block):
loop = asyncio.get_event_loop()
try:
result = await loop.run_in_executor(
None,
lambda: tool_executor.execute(
tool_use_block.name,
tool_use_block.input
)
)
return {
"type": "tool_result",
"tool_use_id": tool_use_block.id,
"content": json.dumps(result, default=str)
}
except Exception as e:
return {
"type": "tool_result",
"tool_use_id": tool_use_block.id,
"content": f"Error: {str(e)}",
"is_error": True
}
tasks = [execute_single(block) for block in tool_calls]
results = await asyncio.gather(*tasks)
return list(results)
async def run_parallel_agent(user_message: str, tools: list, executor) -> str:
"""Run an agent with parallel tool call support"""
client = anthropic.Anthropic()
messages = [{"role": "user", "content": user_message}]
while True:
response = client.messages.create(
model="claude-opus-4-5",
max_tokens=4096,
tools=tools,
messages=messages
)
if response.stop_reason == "end_turn":
return ' '.join(b.text for b in response.content if b.type == "text")
if response.stop_reason == "tool_use":
tool_calls = [b for b in response.content if b.type == "tool_use"]
print(f"Executing {len(tool_calls)} tool calls in parallel...")
tool_results = await execute_tools_parallel(tool_calls, executor)
# Inject all results at once
messages.append({"role": "assistant", "content": response.content})
messages.append({"role": "user", "content": tool_results})
else:
break
return "Task complete"
20.3 Synchronous Parallel Execution
If asyncio is not used, ThreadPoolExecutor can achieve parallelism:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import json
class ParallelToolExecutor:
"""Tool executor supporting parallel execution"""
def __init__(self, max_workers: int = 10):
self.tools = {}
self.tool_definitions = []
self.max_workers = max_workers
def register(self, tool_definition: dict, func):
self.tools[tool_definition["name"]] = func
self.tool_definitions.append(tool_definition)
return self
def execute_parallel(self, tool_calls: list) -> list:
"""Execute multiple tool calls in parallel; preserve order in results"""
results = [None] * len(tool_calls)
def execute_one(index_and_call):
index, tool_call = index_and_call
start_time = time.time()
name = tool_call.name if hasattr(tool_call, 'name') else tool_call["name"]
inputs = tool_call.input if hasattr(tool_call, 'input') else tool_call["input"]
tool_use_id = tool_call.id if hasattr(tool_call, 'id') else tool_call["id"]
try:
if name not in self.tools:
raise ValueError(f"Unknown tool: {name}")
result = self.tools[name](**inputs)
elapsed = time.time() - start_time
print(f" [OK] {name} completed in {elapsed:.2f}s")
return index, {
"type": "tool_result",
"tool_use_id": tool_use_id,
"content": json.dumps(result, default=str)
}
except Exception as e:
elapsed = time.time() - start_time
print(f" [ERROR] {name} failed in {elapsed:.2f}s: {e}")
return index, {
"type": "tool_result",
"tool_use_id": tool_use_id,
"content": str(e),
"is_error": True
}
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = [
executor.submit(execute_one, (i, call))
for i, call in enumerate(tool_calls)
]
for future in as_completed(futures):
index, result = future.result()
results[index] = result
return results
def run_agent(self, user_message: str, system: str = "") -> str:
"""Run a full parallel tool-call agent"""
import anthropic
client = anthropic.Anthropic()
messages = [{"role": "user", "content": user_message}]
create_kwargs = {
"model": "claude-opus-4-5",
"max_tokens": 4096,
"tools": self.tool_definitions,
"messages": messages
}
if system:
create_kwargs["system"] = system
iteration = 0
while iteration < 15:
iteration += 1
response = client.messages.create(**create_kwargs)
if response.stop_reason == "end_turn":
return ' '.join(b.text for b in response.content if b.type == "text")
if response.stop_reason == "tool_use":
tool_calls = [b for b in response.content if b.type == "tool_use"]
start = time.time()
print(f"\nIteration {iteration}: executing {len(tool_calls)} tools in parallel")
tool_results = self.execute_parallel(tool_calls)
print(f"All done in {time.time() - start:.2f}s")
messages.append({"role": "assistant", "content": response.content})
messages.append({"role": "user", "content": tool_results})
create_kwargs["messages"] = messages
else:
break
return "Maximum iterations reached"
20.4 Tool Design to Guide Parallel Calls
Use tool descriptions to signal to Claude when parallel calls are appropriate:
parallelizable_tools = [
{
"name": "get_stock_price",
"description": """Get the current price and change information for a single stock.
Note: When querying multiple stocks, you can call this tool multiple times simultaneously
(parallel calls) rather than waiting for each result before continuing.""",
"input_schema": {
"type": "object",
"properties": {
"symbol": {
"type": "string",
"description": "Stock ticker symbol, e.g. 'AAPL', 'TSLA', 'MSFT'"
}
},
"required": ["symbol"]
}
},
{
"name": "get_company_info",
"description": """Get basic company information (founded date, core business, headcount, etc.).
This tool is independent of get_stock_price and can be called simultaneously with it.""",
"input_schema": {
"type": "object",
"properties": {
"symbol": {"type": "string", "description": "Stock ticker symbol"}
},
"required": ["symbol"]
}
}
]
Sequential Tools: Handling Dependency Chains
Some tools have data dependencies and must execute in order:
sequential_tools = [
{
"name": "create_order",
"description": """Create an order and return an order_id.
IMPORTANT: This tool must be called before process_payment and send_confirmation,
as those tools require the order_id. However, process_payment and send_confirmation
can be called in parallel with each other once the order_id is available.""",
"input_schema": {
"type": "object",
"properties": {
"product_id": {"type": "string"},
"quantity": {"type": "integer", "minimum": 1},
"user_id": {"type": "string"}
},
"required": ["product_id", "quantity", "user_id"]
}
},
{
"name": "process_payment",
"description": """Process payment for an order.
Dependency: Requires an order_id from create_order.
Can be called in parallel with send_confirmation once order_id is available.""",
"input_schema": {
"type": "object",
"properties": {
"order_id": {"type": "string", "description": "Returned by create_order"},
"payment_method": {"type": "string", "enum": ["credit_card", "paypal", "bank_transfer"]}
},
"required": ["order_id", "payment_method"]
}
},
{
"name": "send_confirmation",
"description": """Send an order confirmation email.
Dependency: Requires an order_id from create_order.
Can be called in parallel with process_payment.""",
"input_schema": {
"type": "object",
"properties": {
"order_id": {"type": "string"},
"email": {"type": "string"}
},
"required": ["order_id", "email"]
}
}
]
20.5 Timeout and Degradation Strategies
Per-Tool Timeout Control
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout
class TimeoutParallelExecutor(ParallelToolExecutor):
"""Parallel executor with per-tool timeout control"""
def __init__(self, max_workers: int = 10, tool_timeout_seconds: int = 30):
super().__init__(max_workers)
self.tool_timeout = tool_timeout_seconds
def execute_with_timeout(self, tool_name: str, inputs: dict,
tool_use_id: str) -> dict:
"""Execute a single tool with timeout"""
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(self.tools[tool_name], **inputs)
try:
result = future.result(timeout=self.tool_timeout)
return {
"type": "tool_result",
"tool_use_id": tool_use_id,
"content": json.dumps(result, default=str)
}
except FuturesTimeout:
return {
"type": "tool_result",
"tool_use_id": tool_use_id,
"content": f"Tool {tool_name} timed out after {self.tool_timeout}s",
"is_error": True
}
Graceful Degradation: Handling Partial Failures
def handle_partial_failures(tool_results: list) -> tuple:
"""
Handle scenarios where some tools fail
Returns: (should_continue, enhanced_results)
"""
failed = [r for r in tool_results if r.get("is_error")]
succeeded = [r for r in tool_results if not r.get("is_error")]
if failed:
print(f"Warning: {len(failed)} tool(s) failed, {len(succeeded)} succeeded")
# Strategies:
# 1. Continue with partial results (Claude gracefully handles missing data)
# 2. Raise exception to abort
# 3. Retry failed tools
return True, tool_results # Strategy 1: continue
20.6 Real-World Case: Competitor Analysis Agent
def build_competitor_analysis_agent():
"""Build an agent that analyzes multiple competitors in parallel"""
executor = ParallelToolExecutor(max_workers=10)
def fetch_company_website(url: str) -> dict:
# Simulated implementation
return {
"url": url,
"title": f"Homepage of {url}",
"description": "Core business description",
"product_count": 42
}
def get_funding_data(company_name: str) -> dict:
# Simulated implementation
return {
"company": company_name,
"total_funding": "$50M",
"last_round": "Series B",
"investors": ["a16z", "Sequoia"]
}
def analyze_app_store(app_name: str) -> dict:
# Simulated implementation
return {
"app": app_name,
"rating": 4.3,
"reviews_count": 12500,
"top_complaint": "Slow loading"
}
executor.register({
"name": "fetch_company_website",
"description": "Fetch product info from a company's website. Can be called in parallel for multiple companies.",
"input_schema": {
"type": "object",
"properties": {"url": {"type": "string"}},
"required": ["url"]
}
}, fetch_company_website)
executor.register({
"name": "get_funding_data",
"description": "Get funding history and investor information for a company. Can be called in parallel.",
"input_schema": {
"type": "object",
"properties": {"company_name": {"type": "string"}},
"required": ["company_name"]
}
}, get_funding_data)
executor.register({
"name": "analyze_app_store",
"description": "Analyze user reviews for an app on the App Store. Can be called in parallel.",
"input_schema": {
"type": "object",
"properties": {"app_name": {"type": "string"}},
"required": ["app_name"]
}
}, analyze_app_store)
return executor
executor = build_competitor_analysis_agent()
result = executor.run_agent(
user_message="""Please analyze three AI writing tools:
Jasper (jasper.ai), Copy.ai (copy.ai), Writesonic (writesonic.com).
For each company: fetch website info, get funding data, and analyze App Store reviews.
Then provide a comprehensive comparison report.""",
system="You are a professional market analyst skilled at gathering and analyzing competitor data from multiple dimensions."
)
print(result)
20.7 Performance Monitoring and Optimization
from dataclasses import dataclass, field
from typing import Dict, List
@dataclass
class ToolCallMetrics:
tool_name: str
start_time: float
end_time: float = 0
success: bool = True
error_msg: str = ""
@property
def duration(self) -> float:
return self.end_time - self.start_time
class MonitoredParallelExecutor(ParallelToolExecutor):
"""Parallel executor with performance monitoring"""
def __init__(self, max_workers: int = 10):
super().__init__(max_workers)
self.metrics: List[ToolCallMetrics] = []
def print_performance_report(self):
if not self.metrics:
print("No tool call records")
return
print("\n=== Tool Call Performance Report ===")
by_tool: Dict[str, List[float]] = {}
for m in self.metrics:
by_tool.setdefault(m.tool_name, []).append(m.duration)
for tool_name, durations in by_tool.items():
avg = sum(durations) / len(durations)
max_d = max(durations)
print(f"{tool_name:<35} avg={avg:.2f}s max={max_d:.2f}s calls={len(durations)}")
success_count = sum(1 for m in self.metrics if m.success)
total = len(self.metrics)
print(f"\nSuccess rate: {success_count}/{total} ({100*success_count/total:.1f}%)")
Summary
Parallel tool calls are the key technology for elevating Claude's tool usage from "step-by-step execution" to "efficient collaboration." The core points are:
- Claude natively supports returning multiple
tool_useblocks in a single response - Developers must execute those tools in parallel and inject all results in the same message turn
- Tool descriptions should explicitly indicate which tools can be called in parallel and which have dependency ordering
- Production systems need to handle partial failures, timeouts, and performance monitoring
The next chapter introduces Computer Use — a complete solution for having Claude directly operate GUI interfaces.