Chapter 20: Parallel Tool Calls: Coordination Strategies for Simultaneous Multi-Tool Execution

20.1 The Value of Parallel Tool Calls

When handling complex tasks, executing tools sequentially adds unnecessary latency. If a user asks "compare tomorrow's weather in three cities and recommend what to wear," calling the weather API three times in sequence roughly triples the wait. With parallel tool calls, Claude requests multiple tools in a single response and those tools can run simultaneously, so total latency approaches that of the slowest call rather than the sum of all calls.

Claude can return multiple tool_use blocks in a single response whose stop_reason is tool_use. When a response contains several tool calls, developers should execute them concurrently and add all of the results to the message history together, in one user message.
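To make the latency claim concrete, here is a minimal sketch that simulates three weather lookups. The function fake_weather_lookup and its 0.2-second delay are stand-ins invented for illustration; no real weather API is involved.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_weather_lookup(city: str) -> dict:
    """Hypothetical stand-in for a weather API call; sleeps to simulate network latency."""
    time.sleep(0.2)
    return {"city": city, "forecast": "sunny"}

def run_sequential(cities):
    # Each lookup waits for the previous one to finish
    return [fake_weather_lookup(c) for c in cities]

def run_parallel(cities):
    # One worker per city, so all lookups wait on the "network" at the same time
    with ThreadPoolExecutor(max_workers=len(cities)) as pool:
        return list(pool.map(fake_weather_lookup, cities))

def timed(fn, cities):
    """Return (results, elapsed_seconds) for fn(cities)."""
    start = time.perf_counter()
    results = fn(cities)
    return results, time.perf_counter() - start

if __name__ == "__main__":
    cities = ["Tokyo", "Paris", "Lima"]
    _, seq = timed(run_sequential, cities)
    _, par = timed(run_parallel, cities)
    print(f"sequential: {seq:.2f}s, parallel: {par:.2f}s")
```

With these simulated delays the sequential run takes about the sum of the three waits, while the parallel run takes about as long as one of them.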

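Parallel calls suit scenarios in which the tool calls are independent of one another, such as the three weather lookups in the example above.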

20.2 Message Format for Parallel Calls

Multiple Tool Calls in a Single Response

Claude can request multiple tools in one response:

import anthropic
import json

client = anthropic.Anthropic()

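# weather_tool, restaurant_tool, and events_tool are tool definition dicts
# (name, description, input_schema) assumed to be defined elsewhere
response = client.messages.create(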
    model="claude-opus-4-5",
    max_tokens=4096,
    tools=[weather_tool, restaurant_tool, events_tool],
    messages=[{
        "role": "user",
        "content": "Check the weather for tomorrow, recommend three restaurants, and list cultural events happening tomorrow"
    }]
)

print(f"Stop reason: {response.stop_reason}")  # "tool_use"
print(f"Number of tool calls: {sum(1 for b in response.content if b.type == 'tool_use')}")

for block in response.content:
    if block.type == "tool_use":
        print(f"  - {block.name}: {json.dumps(block.input)}")

Parallel Execution and Result Injection

Critical: all tool results must be returned together in a single user message, not spread across multiple rounds:

import asyncio
import json
from typing import List
import anthropic

async def execute_tools_parallel(
    tool_calls: List[anthropic.types.ToolUseBlock],
    tool_executor
) -> List[dict]:
    """Execute multiple tool calls in parallel"""
    
    async def execute_single(tool_use_block):
        # get_running_loop() is the supported way to get the loop inside a coroutine
        loop = asyncio.get_running_loop()
        try:
            result = await loop.run_in_executor(
                None,
                lambda: tool_executor.execute(
                    tool_use_block.name, 
                    tool_use_block.input
                )
            )
            return {
                "type": "tool_result",
                "tool_use_id": tool_use_block.id,
                "content": json.dumps(result, default=str)
            }
        except Exception as e:
            return {
                "type": "tool_result",
                "tool_use_id": tool_use_block.id,
                "content": f"Error: {str(e)}",
                "is_error": True
            }
    
    tasks = [execute_single(block) for block in tool_calls]
    results = await asyncio.gather(*tasks)
    return list(results)


async def run_parallel_agent(user_message: str, tools: list, executor) -> str:
    """Run an agent with parallel tool call support"""
    
    # Use the async client so the API call does not block the event loop
    client = anthropic.AsyncAnthropic()
    messages = [{"role": "user", "content": user_message}]
    
    while True:
        response = await client.messages.create(
            model="claude-opus-4-5",
            max_tokens=4096,
            tools=tools,
            messages=messages
        )
        
        if response.stop_reason == "end_turn":
            return ' '.join(b.text for b in response.content if b.type == "text")
        
        if response.stop_reason == "tool_use":
            tool_calls = [b for b in response.content if b.type == "tool_use"]
            print(f"Executing {len(tool_calls)} tool calls in parallel...")
            
            tool_results = await execute_tools_parallel(tool_calls, executor)
            
            # Inject all results at once
            messages.append({"role": "assistant", "content": response.content})
            messages.append({"role": "user", "content": tool_results})
        else:
            # Any other stop reason (for example "max_tokens") ends the run early
            return f"Stopped with stop_reason={response.stop_reason}"
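The single-message rule can be checked locally before the next request is sent. The sketch below is one possible helper, not part of the SDK: build_tool_result_message is a hypothetical name, and the IDs in the usage example are made up.

```python
def build_tool_result_message(tool_use_ids, tool_results):
    """Return one user message holding a tool_result for every tool_use id.

    Raises ValueError if a result is missing or unexpected, so the mismatch
    is caught here rather than surfacing later as a rejected request.
    """
    result_ids = [r["tool_use_id"] for r in tool_results]
    missing = [i for i in tool_use_ids if i not in result_ids]
    extra = [i for i in result_ids if i not in tool_use_ids]
    if missing or extra:
        raise ValueError(f"missing results: {missing}, unexpected results: {extra}")
    # Keep the results in the same order as the tool_use blocks
    by_id = {r["tool_use_id"]: r for r in tool_results}
    return {"role": "user", "content": [by_id[i] for i in tool_use_ids]}


if __name__ == "__main__":
    # Made-up IDs for illustration; real ones come from each block.id
    results = [
        {"type": "tool_result", "tool_use_id": "toolu_b", "content": "18C"},
        {"type": "tool_result", "tool_use_id": "toolu_a", "content": "21C"},
    ]
    print(build_tool_result_message(["toolu_a", "toolu_b"], results))
```

In the agent loop above, the first argument would be [b.id for b in tool_calls] and the returned dict would be appended to messages in place of the hand-built user message.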

20.3 Synchronous Parallel Execution

Without asyncio, ThreadPoolExecutor from concurrent.futures provides the same concurrency for I/O-bound tools:

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import json

class ParallelToolExecutor:
    """Tool executor supporting parallel execution"""
    
    def __init__(self, max_workers: int = 10):
        self.tools = {}
        self.tool_definitions = []
        self.max_workers = max_workers
    
    def register(self, tool_definition: dict, func):
        self.tools[tool_definition["name"]] = func
        self.tool_definitions.append(tool_definition)
        return self
    
    def execute_parallel(self, tool_calls: list) -> list:
        """Execute multiple tool calls in parallel; preserve order in results"""
        results = [None] * len(tool_calls)
        
        def execute_one(index_and_call):
            index, tool_call = index_and_call
            start_time = time.time()
            
            name = tool_call.name if hasattr(tool_call, 'name') else tool_call["name"]
            inputs = tool_call.input if hasattr(tool_call, 'input') else tool_call["input"]
            tool_use_id = tool_call.id if hasattr(tool_call, 'id') else tool_call["id"]
            
            try:
                if name not in self.tools:
                    raise ValueError(f"Unknown tool: {name}")
                
                result = self.tools[name](**inputs)
                elapsed = time.time() - start_time
                print(f"  [OK] {name} completed in {elapsed:.2f}s")
                
                return index, {
                    "type": "tool_result",
                    "tool_use_id": tool_use_id,
                    "content": json.dumps(result, default=str)
                }
            except Exception as e:
                elapsed = time.time() - start_time
                print(f"  [ERROR] {name} failed in {elapsed:.2f}s: {e}")
                return index, {
                    "type": "tool_result",
                    "tool_use_id": tool_use_id,
                    "content": str(e),
                    "is_error": True
                }
        
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [
                executor.submit(execute_one, (i, call))
                for i, call in enumerate(tool_calls)
            ]
            for future in as_completed(futures):
                index, result = future.result()
                results[index] = result
        
        return results
    
    def run_agent(self, user_message: str, system: str = "") -> str:
        """Run a full parallel tool-call agent"""
        import anthropic
        client = anthropic.Anthropic()
        messages = [{"role": "user", "content": user_message}]
        
        create_kwargs = {
            "model": "claude-opus-4-5",
            "max_tokens": 4096,
            "tools": self.tool_definitions,
            "messages": messages
        }
        if system:
            create_kwargs["system"] = system
        
        iteration = 0
        while iteration < 15:
            iteration += 1
            response = client.messages.create(**create_kwargs)
            
            if response.stop_reason == "end_turn":
                return ' '.join(b.text for b in response.content if b.type == "text")
            
            if response.stop_reason == "tool_use":
                tool_calls = [b for b in response.content if b.type == "tool_use"]
                
                start = time.time()
                print(f"\nIteration {iteration}: executing {len(tool_calls)} tools in parallel")
                tool_results = self.execute_parallel(tool_calls)
                print(f"All done in {time.time() - start:.2f}s")
                
                messages.append({"role": "assistant", "content": response.content})
                messages.append({"role": "user", "content": tool_results})
                create_kwargs["messages"] = messages
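            else:
                # Any other stop reason (for example "max_tokens") ends the run early
                return f"Stopped with stop_reason={response.stop_reason}"
        
        return "Maximum iterations reached"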
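execute_parallel tracks each call's index because as_completed yields futures in completion order, not submission order. As an alternative sketch (not the implementation used in this chapter), ThreadPoolExecutor.map returns results in input order on its own. The slow_echo tool is hypothetical and exists only to make later items finish first.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_echo(item):
    """Hypothetical tool: sleeps for item[0] seconds, then returns item[1]."""
    delay, label = item
    time.sleep(delay)
    return label

def run_in_input_order(items, max_workers=4):
    # map() yields results in the order of `items`, regardless of which call finishes first
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(slow_echo, items))

if __name__ == "__main__":
    # "second" finishes first, but the output order still matches the input order
    print(run_in_input_order([(0.3, "first"), (0.1, "second"), (0.2, "third")]))
```

One trade-off: iterating over map() re-raises the first exception a call produced, so each tool wrapper should catch its own errors and return an is_error result, as execute_one does.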

20.4 Tool Design to Guide Parallel Calls

Use tool descriptions to signal to Claude when parallel calls are appropriate:

parallelizable_tools = [
    {
        "name": "get_stock_price",
        "description": """Get the current price and change information for a single stock.
Note: When querying multiple stocks, you can call this tool multiple times simultaneously 
(parallel calls) rather than waiting for each result before continuing.""",
        "input_schema": {
            "type": "object",
            "properties": {
                "symbol": {
                    "type": "string",
                    "description": "Stock ticker symbol, e.g. 'AAPL', 'TSLA', 'MSFT'"
                }
            },
            "required": ["symbol"]
        }
    },
    {
        "name": "get_company_info",
        "description": """Get basic company information (founded date, core business, headcount, etc.).
This tool is independent of get_stock_price and can be called simultaneously with it.""",
        "input_schema": {
            "type": "object",
            "properties": {
                "symbol": {"type": "string", "description": "Stock ticker symbol"}
            },
            "required": ["symbol"]
        }
    }
]

Sequential Tools: Handling Dependency Chains

Some tools have data dependencies and must execute in order:

sequential_tools = [
    {
        "name": "create_order",
        "description": """Create an order and return an order_id.
IMPORTANT: This tool must be called before process_payment and send_confirmation, 
as those tools require the order_id. However, process_payment and send_confirmation 
can be called in parallel with each other once the order_id is available.""",
        "input_schema": {
            "type": "object",
            "properties": {
                "product_id": {"type": "string"},
                "quantity": {"type": "integer", "minimum": 1},
                "user_id": {"type": "string"}
            },
            "required": ["product_id", "quantity", "user_id"]
        }
    },
    {
        "name": "process_payment",
        "description": """Process payment for an order.
Dependency: Requires an order_id from create_order.
Can be called in parallel with send_confirmation once order_id is available.""",
        "input_schema": {
            "type": "object",
            "properties": {
                "order_id": {"type": "string", "description": "Returned by create_order"},
                "payment_method": {"type": "string", "enum": ["credit_card", "paypal", "bank_transfer"]}
            },
            "required": ["order_id", "payment_method"]
        }
    },
    {
        "name": "send_confirmation",
        "description": """Send an order confirmation email.
Dependency: Requires an order_id from create_order.
Can be called in parallel with process_payment.""",
        "input_schema": {
            "type": "object",
            "properties": {
                "order_id": {"type": "string"},
                "email": {"type": "string"}
            },
            "required": ["order_id", "email"]
        }
    }
]
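The ordering these descriptions imply can be written down as a dependency map and grouped into "waves" that may each run in parallel. In the agent loop Claude is expected to choose this order itself across turns; the sketch below only makes the intended order explicit, for example when testing tool descriptions. plan_waves and the dependency dictionary are hypothetical helpers, not part of any SDK.

```python
def plan_waves(dependencies):
    """Group tools into waves; every tool in a wave depends only on earlier waves.

    `dependencies` maps a tool name to the set of tools whose output it needs.
    """
    remaining = {name: set(deps) for name, deps in dependencies.items()}
    done, waves = set(), []
    while remaining:
        # A tool is ready once everything it depends on has already run
        ready = sorted(n for n, deps in remaining.items() if deps <= done)
        if not ready:
            raise ValueError(f"circular or unknown dependency among: {sorted(remaining)}")
        waves.append(ready)
        done.update(ready)
        for name in ready:
            del remaining[name]
    return waves


if __name__ == "__main__":
    order_dependencies = {
        "create_order": set(),
        "process_payment": {"create_order"},
        "send_confirmation": {"create_order"},
    }
    print(plan_waves(order_dependencies))
    # [['create_order'], ['process_payment', 'send_confirmation']]
```

The first wave corresponds to one assistant turn containing only create_order; the second corresponds to a later turn in which both dependent tools can be requested together.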

20.5 Timeout and Degradation Strategies

Per-Tool Timeout Control

from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout

class TimeoutParallelExecutor(ParallelToolExecutor):
    """Parallel executor with per-tool timeout control"""
    
    def __init__(self, max_workers: int = 10, tool_timeout_seconds: int = 30):
        super().__init__(max_workers)
        self.tool_timeout = tool_timeout_seconds
    
    def execute_with_timeout(self, tool_name: str, inputs: dict, 
                              tool_use_id: str) -> dict:
        """Execute a single tool with timeout"""
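        # Do not use "with" here: leaving the block calls shutdown(wait=True),
        # which would wait for the tool to finish and defeat the timeout.
        executor = ThreadPoolExecutor(max_workers=1)
        future = executor.submit(self.tools[tool_name], **inputs)
        try:
            result = future.result(timeout=self.tool_timeout)
            return {
                "type": "tool_result",
                "tool_use_id": tool_use_id,
                "content": json.dumps(result, default=str)
            }
        except FuturesTimeout:
            return {
                "type": "tool_result",
                "tool_use_id": tool_use_id,
                "content": f"Tool {tool_name} timed out after {self.tool_timeout}s",
                "is_error": True
            }
        finally:
            # Return immediately; a timed-out tool keeps running in its background thread
            executor.shutdown(wait=False)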
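execute_with_timeout handles one call at a time. When several calls run together, one option is a shared deadline, so the whole batch is bounded by a single timeout. The sketch below is a standalone illustration under that assumption; run_with_deadline and its (tool_use_id, func, kwargs) call format are hypothetical and independent of the classes above.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout

def run_with_deadline(calls, timeout_seconds):
    """Run (tool_use_id, func, kwargs) calls concurrently under one shared deadline."""
    pool = ThreadPoolExecutor(max_workers=max(1, len(calls)))
    deadline = time.monotonic() + timeout_seconds
    futures = [(tool_use_id, pool.submit(func, **kwargs))
               for tool_use_id, func, kwargs in calls]
    results = []
    try:
        for tool_use_id, future in futures:
            # Each wait gets only the time left until the shared deadline
            remaining = max(0.0, deadline - time.monotonic())
            try:
                content, is_error = str(future.result(timeout=remaining)), False
            except FuturesTimeout:
                content, is_error = f"Timed out after {timeout_seconds}s", True
            except Exception as exc:
                content, is_error = f"Error: {exc}", True
            block = {"type": "tool_result", "tool_use_id": tool_use_id, "content": content}
            if is_error:
                block["is_error"] = True
            results.append(block)
    finally:
        # Do not block on stragglers; their threads finish in the background
        pool.shutdown(wait=False)
    return results
```

Python threads cannot be forcibly stopped, so a timed-out tool continues to run until it returns. Tools that call external services should also set their own network timeouts.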

Graceful Degradation: Handling Partial Failures

def handle_partial_failures(tool_results: list) -> tuple:
    """
    Handle scenarios where some tools fail
    Returns: (should_continue, enhanced_results)
    """
    failed = [r for r in tool_results if r.get("is_error")]
    succeeded = [r for r in tool_results if not r.get("is_error")]
    
    if failed:
        print(f"Warning: {len(failed)} tool(s) failed, {len(succeeded)} succeeded")
        # Strategies:
        # 1. Continue with partial results (Claude sees the error text and can work around missing data)
        # 2. Raise exception to abort
        # 3. Retry failed tools
    
    return True, tool_results  # Strategy 1: continue
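Strategy 3 (retrying failed tools) could look like the following sketch. retry_failed and the run_one callable are hypothetical helpers introduced for illustration; run_one is assumed to take one tool call and return a tool_result dict.

```python
def retry_failed(tool_calls, tool_results, run_one, max_retries=2):
    """Re-run only the calls whose result has is_error set, up to max_retries times.

    tool_calls:   list of dicts with "id", "name", "input"
    tool_results: list of tool_result dicts, in the same order as tool_calls
    run_one:      callable(tool_call) -> tool_result dict (hypothetical helper)
    """
    results = list(tool_results)  # copy so the caller's list is not modified
    for _ in range(max_retries):
        failed = [i for i, r in enumerate(results) if r.get("is_error")]
        if not failed:
            break
        for i in failed:
            results[i] = run_one(tool_calls[i])
    return results
```

Retries are only safe for idempotent tools. A lookup such as get_stock_price can be repeated freely, whereas blindly retrying something like create_order risks creating duplicates.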

20.6 Real-World Case: Competitor Analysis Agent

def build_competitor_analysis_agent():
    """Build an agent that analyzes multiple competitors in parallel"""
    
    executor = ParallelToolExecutor(max_workers=10)
    
    def fetch_company_website(url: str) -> dict:
        # Simulated implementation
        return {
            "url": url,
            "title": f"Homepage of {url}",
            "description": "Core business description",
            "product_count": 42
        }
    
    def get_funding_data(company_name: str) -> dict:
        # Simulated implementation
        return {
            "company": company_name,
            "total_funding": "$50M",
            "last_round": "Series B",
            "investors": ["a16z", "Sequoia"]
        }
    
    def analyze_app_store(app_name: str) -> dict:
        # Simulated implementation
        return {
            "app": app_name,
            "rating": 4.3,
            "reviews_count": 12500,
            "top_complaint": "Slow loading"
        }
    
    executor.register({
        "name": "fetch_company_website",
        "description": "Fetch product info from a company's website. Can be called in parallel for multiple companies.",
        "input_schema": {
            "type": "object",
            "properties": {"url": {"type": "string"}},
            "required": ["url"]
        }
    }, fetch_company_website)
    
    executor.register({
        "name": "get_funding_data",
        "description": "Get funding history and investor information for a company. Can be called in parallel.",
        "input_schema": {
            "type": "object",
            "properties": {"company_name": {"type": "string"}},
            "required": ["company_name"]
        }
    }, get_funding_data)
    
    executor.register({
        "name": "analyze_app_store",
        "description": "Analyze user reviews for an app on the App Store. Can be called in parallel.",
        "input_schema": {
            "type": "object",
            "properties": {"app_name": {"type": "string"}},
            "required": ["app_name"]
        }
    }, analyze_app_store)
    
    return executor


executor = build_competitor_analysis_agent()
result = executor.run_agent(
    user_message="""Please analyze three AI writing tools: 
    Jasper (jasper.ai), Copy.ai (copy.ai), Writesonic (writesonic.com).
    
    For each company: fetch website info, get funding data, and analyze App Store reviews.
    Then provide a comprehensive comparison report.""",
    system="You are a professional market analyst skilled at gathering and analyzing competitor data from multiple dimensions."
)
print(result)

20.7 Performance Monitoring and Optimization

from dataclasses import dataclass
from typing import Dict, List

@dataclass
class ToolCallMetrics:
    tool_name: str
    start_time: float
    end_time: float = 0
    success: bool = True
    error_msg: str = ""
    
    @property
    def duration(self) -> float:
        return self.end_time - self.start_time

class MonitoredParallelExecutor(ParallelToolExecutor):
    """Parallel executor with performance monitoring"""
    
    def __init__(self, max_workers: int = 10):
        super().__init__(max_workers)
        self.metrics: List[ToolCallMetrics] = []
    
    def print_performance_report(self):
        if not self.metrics:
            print("No tool call records")
            return
        
        print("\n=== Tool Call Performance Report ===")
        
        by_tool: Dict[str, List[float]] = {}
        for m in self.metrics:
            by_tool.setdefault(m.tool_name, []).append(m.duration)
        
        for tool_name, durations in by_tool.items():
            avg = sum(durations) / len(durations)
            max_d = max(durations)
            print(f"{tool_name:<35} avg={avg:.2f}s  max={max_d:.2f}s  calls={len(durations)}")
        
        success_count = sum(1 for m in self.metrics if m.success)
        total = len(self.metrics)
        print(f"\nSuccess rate: {success_count}/{total} ({100*success_count/total:.1f}%)")
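As written, MonitoredParallelExecutor never appends to self.metrics, so the report would always print "No tool call records". One way to fill the list is to wrap each tool invocation in a timing helper. The sketch below is an assumption about how that could be done, not the chapter's implementation; timed_call is a hypothetical name, and CallRecord is a minimal stand-in for ToolCallMetrics so the example runs on its own.

```python
import time
from dataclasses import dataclass

@dataclass
class CallRecord:
    """Minimal stand-in for ToolCallMetrics so this sketch is self-contained."""
    tool_name: str
    start_time: float
    end_time: float = 0.0
    success: bool = True
    error_msg: str = ""

def timed_call(records, tool_name, func, **kwargs):
    """Call func(**kwargs), append a CallRecord to records, and re-raise any error."""
    record = CallRecord(tool_name=tool_name, start_time=time.perf_counter())
    try:
        return func(**kwargs)
    except Exception as exc:
        record.success = False
        record.error_msg = str(exc)
        raise
    finally:
        # Runs on success and on failure, so every call is recorded
        record.end_time = time.perf_counter()
        records.append(record)
```

Inside a subclass, the tool invocation could become timed_call(self.metrics, name, self.tools[name], **inputs). In CPython, list.append is safe to call from multiple worker threads, which is why no lock appears here.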

Summary

Parallel tool calls move Claude's tool usage from step-by-step execution to concurrent execution, which cuts latency whenever the calls are independent. The core points are:

  1. Claude natively supports returning multiple tool_use blocks in a single response
  2. Developers should execute those tools in parallel and must return all results together in a single user message
  3. Tool descriptions should explicitly indicate which tools can be called in parallel and which have dependency ordering
  4. Production systems need to handle partial failures, timeouts, and performance monitoring

The next chapter introduces Computer Use, which lets Claude operate graphical user interfaces directly.
