Chapter 72: Load Testing and Performance Tuning: Stress Testing Methods and Bottleneck Identification for Claude API
72.1 Why Claude API Load Testing Differs from Traditional API Testing
Load testing the Claude API poses a different set of challenges from stress testing a traditional RESTful API.
Typical goals of traditional API load testing:
- QPS (queries per second) under N concurrent requests
- Average response time and P99 latency
- Resource bottlenecks in CPU, memory, and database connections
Unique challenges of Claude API load testing:
- Highly variable response times (2 seconds to 120+ seconds, depending on output length and model)
- The core constraint is not server resources but rate limits on requests and tokens (RPM/TPM)
- Streaming responses require specialized testing approaches
- Costs are directly tied to traffic (load testing generates real bills)
- Context length has nonlinear impact on latency
Understanding these differences is the starting point for designing meaningful Claude API load tests.
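Because response times vary so widely, the number of concurrent users needed to hit a target request rate depends heavily on average latency. A minimal sketch using Little's law (in-flight requests = arrival rate × mean latency) illustrates this; the function name and the example figures are illustrative assumptions, not measurements.

```python
import math


def required_concurrency(target_rpm: float, avg_latency_s: float) -> int:
    """Estimate in-flight requests needed to sustain target_rpm (Little's law)."""
    if target_rpm < 0 or avg_latency_s < 0:
        raise ValueError("inputs must be non-negative")
    # requests per second * seconds each request stays in flight
    return math.ceil(target_rpm / 60.0 * avg_latency_s)


# Hypothetical figures: the same 120 requests/minute needs far more
# concurrency when each response takes 30 s instead of 2 s.
fast = required_concurrency(120, 2.0)
slow = required_concurrency(120, 30.0)
print(fast, slow)
```

This is only a planning estimate: it ignores client think time and assumes latency does not itself grow with concurrency.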
72.2 Understanding the Rate Limiting System
Anthropic's Rate Limit Dimensions
Anthropic imposes multi-dimensional rate limits on API usage:
# Claude API rate limit tiers (example for claude-sonnet-4-5; actual values vary by tier)
RATE_LIMITS = {
    "RPM": {
        "name": "Requests Per Minute",
        "tier_1": 50,
        "tier_2": 1000,
        "tier_3": 2000,
        "tier_4": 4000,
    },
    "TPM": {
        "name": "Tokens Per Minute",
        "description": "Total input+output tokens per minute",
        "tier_1": 40_000,
        "tier_2": 400_000,
        "tier_3": 800_000,
        "tier_4": 4_000_000,
    },
    "TPD": {
        "name": "Tokens Per Day",
        "tier_1": 1_000_000,
        "tier_2": 10_000_000,
        "tier_3": "unlimited",
    },
}
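Which dimension binds first depends on how many tokens a typical request consumes. The sketch below, with a hypothetical helper name and the example tier 1 figures from the table above, estimates the effective request rate as the smaller of the RPM limit and the TPM limit divided by average tokens per request.

```python
def sustainable_rpm(rpm_limit: int, tpm_limit: int, avg_tokens_per_request: float) -> dict:
    """Return the effective requests/minute and which limit is binding."""
    if avg_tokens_per_request <= 0:
        raise ValueError("avg_tokens_per_request must be positive")
    # How many requests per minute the token budget alone would allow
    token_bound = tpm_limit / avg_tokens_per_request
    return {
        "effective_rpm": min(rpm_limit, token_bound),
        "binding_limit": "RPM" if rpm_limit <= token_bound else "TPM",
    }


# Example tier 1 values from the table (RPM 50, TPM 40,000)
heavy = sustainable_rpm(50, 40_000, avg_tokens_per_request=2000)
light = sustainable_rpm(50, 40_000, avg_tokens_per_request=500)
print(heavy, light)
```

With large requests the token limit caps throughput well below the nominal RPM, which is why load test plans should be sized in tokens as well as requests.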
Parsing Rate Limit Response Headers
Understanding real-time rate limit status requires monitoring response headers:
class RateLimitMonitor:
    def parse_rate_limit_headers(self, headers: dict) -> dict:
        """Extract rate limit fields from response headers (missing values default to 0)."""
        return {
            "requests_limit": int(headers.get('anthropic-ratelimit-requests-limit', 0)),
            "requests_remaining": int(headers.get('anthropic-ratelimit-requests-remaining', 0)),
            "requests_reset": headers.get('anthropic-ratelimit-requests-reset', ''),
            "tokens_limit": int(headers.get('anthropic-ratelimit-tokens-limit', 0)),
            "tokens_remaining": int(headers.get('anthropic-ratelimit-tokens-remaining', 0)),
            "tokens_reset": headers.get('anthropic-ratelimit-tokens-reset', ''),
            "input_tokens_limit": int(headers.get('anthropic-ratelimit-input-tokens-limit', 0)),
            "input_tokens_remaining": int(headers.get('anthropic-ratelimit-input-tokens-remaining', 0)),
            "output_tokens_limit": int(headers.get('anthropic-ratelimit-output-tokens-limit', 0)),
            "output_tokens_remaining": int(headers.get('anthropic-ratelimit-output-tokens-remaining', 0)),
        }

    def estimate_utilization(self, headers_data: dict) -> dict:
        """Fraction of each limit already consumed (0.0 = idle, 1.0 = exhausted)."""
        return {
            "request_utilization": (
                1 - headers_data["requests_remaining"] / max(headers_data["requests_limit"], 1)
            ),
            "token_utilization": (
                1 - headers_data["tokens_remaining"] / max(headers_data["tokens_limit"], 1)
            ),
        }
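The reset fields are kept as raw strings by the parser above. Assuming they carry RFC 3339 timestamps (an assumption worth confirming against the current API documentation), a small helper can turn them into a wait time. The helper name is hypothetical.

```python
from datetime import datetime, timezone
from typing import Optional


def seconds_until_reset(reset_value: str, now: Optional[datetime] = None) -> float:
    """Seconds from `now` until the timestamp in a *-reset header (0 if past or empty)."""
    if not reset_value:
        return 0.0
    # Older Python versions do not accept a trailing "Z" in fromisoformat
    reset_at = datetime.fromisoformat(reset_value.replace("Z", "+00:00"))
    if reset_at.tzinfo is None:
        reset_at = reset_at.replace(tzinfo=timezone.utc)  # assume UTC if unspecified
    now = now or datetime.now(timezone.utc)
    return max(0.0, (reset_at - now).total_seconds())


# Fixed "now" so the example is deterministic
fixed_now = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
print(seconds_until_reset("2025-01-01T12:00:30Z", now=fixed_now))
```

A load generator could sleep for this duration when utilization approaches 100% instead of waiting for a 429.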
72.3 Load Testing with Locust
Basic Locust Load Test Script
Locust is a mature, widely used load testing framework in the Python ecosystem, and it suits complex Claude API testing scenarios:
# locustfile.py
import os
import random
import time

import anthropic
from locust import User, task, between, events


class ClaudeAPIUser(User):
    """
    Claude API load test user.
    Note: Subclasses the plain User class rather than HttpUser, because requests
    go through the Anthropic SDK directly; results are still reported to
    Locust's statistics framework by firing the request event.
    """
    wait_time = between(1, 3)

    TEST_PROMPTS = {
        "simple": [
            "Summarize the concept of artificial intelligence in one sentence",
            "Explain what machine learning is",
            "What are the main differences between Python and Java?",
        ],
        "medium": [
            "Explain the pros and cons of microservices architecture with use cases",
            "Describe REST API design best practices including security considerations",
            "Explain the CAP theorem and its applications in distributed systems",
        ],
        "complex": [
            """Analyze the time and space complexity of the following code,
and suggest optimizations:

def find_pairs(arr, target):
    result = []
    for i in range(len(arr)):
        for j in range(i+1, len(arr)):
            if arr[i] + arr[j] == target:
                result.append((arr[i], arr[j]))
    return result
""",
        ],
    }

    def on_start(self):
        self.claude_client = anthropic.Anthropic(
            api_key=os.environ["ANTHROPIC_API_KEY"],
            # httpx-style timeout: 60s default (read/write/pool), 5s to connect.
            # A default is required unless all four values are given explicitly.
            timeout=anthropic.Timeout(60.0, connect=5.0),
            # Disable the SDK's automatic retries so 429s appear in the results
            max_retries=0,
        )

    @task(5)
    def simple_query(self):
        """Simple query - weight 5 (highest frequency)"""
        prompt = random.choice(self.TEST_PROMPTS["simple"])
        self._make_claude_request(prompt, "simple", max_tokens=256)

    @task(3)
    def medium_query(self):
        """Medium query - weight 3"""
        prompt = random.choice(self.TEST_PROMPTS["medium"])
        self._make_claude_request(prompt, "medium", max_tokens=1024)

    @task(1)
    def complex_query(self):
        """Complex query - weight 1 (lowest frequency, but highest token consumption)"""
        prompt = random.choice(self.TEST_PROMPTS["complex"])
        self._make_claude_request(prompt, "complex", max_tokens=2048)

    def _make_claude_request(self, prompt: str, task_type: str, max_tokens: int):
        start_time = time.time()
        try:
            response = self.claude_client.messages.create(
                model="claude-3-5-haiku-latest",  # Use Haiku for load testing to reduce costs
                max_tokens=max_tokens,
                messages=[{"role": "user", "content": prompt}]
            )
            elapsed = int((time.time() - start_time) * 1000)
            events.request.fire(
                request_type="Claude API",
                name=f"/{task_type}",
                response_time=elapsed,
                response_length=len(response.content[0].text),
                exception=None
            )
        except anthropic.RateLimitError as e:
            elapsed = int((time.time() - start_time) * 1000)
            events.request.fire(
                request_type="Claude API",
                name=f"/{task_type}",
                response_time=elapsed,
                response_length=0,
                exception=e
            )
            time.sleep(5)  # Back off before this simulated user sends again
        except Exception as e:
            elapsed = int((time.time() - start_time) * 1000)
            events.request.fire(
                request_type="Claude API",
                name=f"/{task_type}",
                response_time=elapsed,
                response_length=0,
                exception=e
            )
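The script above pauses for a fixed 5 seconds after a rate limit error. When many simulated users are throttled at once, a fixed pause makes them all retry together. One common alternative, sketched here as an assumption rather than as part of the original script, is exponential backoff with full jitter; `retry_after_s` stands in for a server-provided retry hint if one is available.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base_s: float = 1.0,
    cap_s: float = 60.0,
    retry_after_s: Optional[float] = None,
    rng: Optional[random.Random] = None,
) -> float:
    """Delay before retry number `attempt` (0-based), in seconds."""
    if retry_after_s is not None:
        return retry_after_s  # prefer an explicit hint from the server
    rng = rng or random
    # Full jitter: pick uniformly between 0 and the capped exponential ceiling
    ceiling = min(cap_s, base_s * (2 ** attempt))
    return rng.uniform(0.0, ceiling)


# Seeded generator so repeated runs produce the same sequence
demo_rng = random.Random(42)
delays = [backoff_delay(i, cap_s=8.0, rng=demo_rng) for i in range(5)]
print([round(d, 2) for d in delays])
```

In the Locust user, `time.sleep(5)` could be replaced by `time.sleep(backoff_delay(attempt))`, with `attempt` tracked per user and reset after a successful request.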
Custom Metrics Collection
# custom_metrics.py
from collections import defaultdict
import threading


class ClaudeMetricsCollector:
    def __init__(self):
        self.lock = threading.Lock()  # Locust users may record from multiple threads
        self.metrics = defaultdict(list)

    def record(
        self,
        task_type: str,
        input_tokens: int,
        output_tokens: int,
        ttfb_ms: float,
        total_latency_ms: float,
        rate_limited: bool = False
    ):
        with self.lock:
            self.metrics[task_type].append({
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
                "ttfb_ms": ttfb_ms,
                "total_latency_ms": total_latency_ms,
                # Guard against zero latency (e.g. a request that failed immediately)
                "tokens_per_second": (
                    output_tokens / (total_latency_ms / 1000)
                    if total_latency_ms > 0 else 0.0
                ),
                "rate_limited": rate_limited
            })

    def get_summary(self) -> dict:
        summary = {}
        with self.lock:
            snapshot = {k: list(v) for k, v in self.metrics.items()}
        for task_type, records in snapshot.items():
            if not records:
                continue
            n = len(records)
            total_latencies = sorted([r["total_latency_ms"] for r in records])
            summary[task_type] = {
                "total_requests": n,
                "avg_input_tokens": sum(r["input_tokens"] for r in records) / n,
                "avg_output_tokens": sum(r["output_tokens"] for r in records) / n,
                "avg_latency_ms": sum(total_latencies) / n,
                # int(n * q) is always <= n - 1 for q < 1, so indexing is safe
                "p50_latency_ms": total_latencies[int(n * 0.5)],
                "p95_latency_ms": total_latencies[int(n * 0.95)],
                "p99_latency_ms": total_latencies[int(n * 0.99)],
                "avg_ttfb_ms": sum(r["ttfb_ms"] for r in records) / n,
                "avg_tokens_per_second": sum(r["tokens_per_second"] for r in records) / n,
                "rate_limited_pct": sum(1 for r in records if r["rate_limited"]) / n * 100
            }
        return summary
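The collector expects a `ttfb_ms` value, which only makes sense for streaming responses. A minimal sketch of how it might be measured is shown below: it times any iterable of text chunks, so it can be exercised without network access. The function name and the injectable `clock` parameter are assumptions for illustration.

```python
import time
from typing import Callable, Iterable, Optional


def time_stream(
    chunks: Iterable[str],
    started_at: Optional[float] = None,
    clock: Callable[[], float] = time.perf_counter,
) -> dict:
    """Consume a stream of text chunks and report TTFB and total latency in ms."""
    # Pass started_at if the request was sent before this function was called
    start = started_at if started_at is not None else clock()
    first_at = None
    chars = 0
    for chunk in chunks:
        if first_at is None:
            first_at = clock()  # first chunk arrived
        chars += len(chunk)
    end = clock()
    ttfb = (first_at if first_at is not None else end) - start
    return {
        "ttfb_ms": ttfb * 1000,
        "total_latency_ms": (end - start) * 1000,
        "chars": chars,
    }


# Deterministic demo with a fake clock: start=0.0s, first chunk=0.25s, end=1.0s
ticks = iter([0.0, 0.25, 1.0])
print(time_stream(["Hel", "lo"], clock=lambda: next(ticks)))
```

With the Anthropic SDK, the chunks would plausibly come from the `text_stream` of a `client.messages.stream(...)` context, with `started_at` captured just before entering it; that wiring is not shown here and should be checked against the SDK version in use.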
72.4 Load Testing with k6
k6 is another popular load testing tool using JavaScript test scripts:
// k6_claude_load_test.js
import http from 'k6/http';
import { sleep, check } from 'k6';
import { Rate, Trend, Counter } from 'k6/metrics';

const rateLimitedRequests = new Rate('rate_limited');
const inputTokens = new Trend('input_tokens');
const outputTokens = new Trend('output_tokens');
const tokenCost = new Counter('token_cost_cents');

export const options = {
  // Each stage ramps linearly from the previous target to its own target
  stages: [
    { duration: '2m', target: 5 },   // Warm up: ramp to 5 users over 2 minutes
    { duration: '5m', target: 10 },  // Ramp: 5 to 10 concurrent users over 5 minutes
    { duration: '2m', target: 20 },  // Stress: ramp to 20 concurrent users
    { duration: '3m', target: 20 },  // Sustain: 20 concurrent users for 3 minutes
    { duration: '2m', target: 0 },   // Wind down: ramp to 0
  ],
  thresholds: {
    http_req_duration: ['p(95)<10000'],  // 95% of requests complete within 10s
    rate_limited: ['rate<0.05'],         // Rate limit rate below 5%
    http_req_failed: ['rate<0.01'],      // Failure rate below 1%
  },
};

const ANTHROPIC_API_KEY = __ENV.ANTHROPIC_API_KEY;

const PROMPTS = [
  "Explain the concept of recursion in programming",
  "What are the SOLID principles in software design?",
  "Describe the difference between SQL and NoSQL databases",
  "What is containerization and why is it useful?",
];

export default function() {
  const prompt = PROMPTS[Math.floor(Math.random() * PROMPTS.length)];
  const payload = JSON.stringify({
    model: "claude-3-5-haiku-latest",
    max_tokens: 512,
    messages: [{ role: "user", content: prompt }]
  });
  const params = {
    headers: {
      'Content-Type': 'application/json',
      'x-api-key': ANTHROPIC_API_KEY,
      'anthropic-version': '2023-06-01',
    },
    timeout: '60s',
  };

  const res = http.post('https://api.anthropic.com/v1/messages', payload, params);

  // Track rate-limited responses separately and back off before the next iteration
  if (res.status === 429) {
    rateLimitedRequests.add(1);
    sleep(5);
    return;
  }
  rateLimitedRequests.add(0);

  const success = check(res, {
    'status is 200': (r) => r.status === 200,
    'has content': (r) => {
      try {
        const body = JSON.parse(r.body);
        return body.content && body.content.length > 0;
      } catch {
        return false;
      }
    }
  });

  if (success) {
    try {
      const body = JSON.parse(res.body);
      const usage = body.usage;
      inputTokens.add(usage.input_tokens);
      outputTokens.add(usage.output_tokens);
      // Record cost (approximate Claude 3.5 Haiku pricing, USD per million tokens)
      const costCents = (usage.input_tokens / 1e6 * 0.8 +
                         usage.output_tokens / 1e6 * 4.0) * 100;
      tokenCost.add(costCents);
    } catch (e) {
      console.error('Failed to parse response:', e);
    }
  }

  sleep(Math.random() * 2 + 1);
}
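Before running a staged profile like the one above, it helps to estimate how many requests it will generate. The following sketch integrates the ramp profile into "virtual-user minutes", assuming each stage ramps linearly from the previous target. The 5-second average iteration time is a hypothetical figure (response latency plus the script's sleep), not a measurement.

```python
from typing import List, Tuple


def vu_minutes(stages: List[Tuple[float, int]], start_vus: int = 0) -> float:
    """Area under the ramp profile: sum of (average VUs in stage * stage minutes)."""
    total = 0.0
    current = start_vus
    for minutes, target in stages:
        total += (current + target) / 2 * minutes  # trapezoid for a linear ramp
        current = target
    return total


def estimated_requests(stages: List[Tuple[float, int]], avg_iteration_s: float) -> float:
    """Approximate total iterations, one request per iteration."""
    return vu_minutes(stages) * 60 / avg_iteration_s


# (duration in minutes, target VUs), mirroring the k6 options above
STAGES = [(2, 5), (5, 10), (2, 20), (3, 20), (2, 0)]
print(vu_minutes(STAGES), estimated_requests(STAGES, avg_iteration_s=5.0))
```

Multiplying the estimated request count by expected tokens per request gives a rough token and cost budget for the run.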
72.5 Performance Bottleneck Identification Methodology
Three Common Bottleneck Types
Bottleneck Type 1: TPM Rate Limiting
def diagnose_tpm_bottleneck(metrics: list) -> dict:
    """
    Determine whether the bottleneck is TPM rate limiting.
    Signs:
    - 429 errors concentrated in specific time windows
    - Bimodal latency distribution (normal requests + requests waiting for rate recovery)
    """
    if not metrics:
        return {"status": "no_data"}
    # Accept the status code as either an int (429) or a string ("429")
    rate_limited = [m for m in metrics if str(m.get("error_code")) == "429"]
    rate_limit_rate = len(rate_limited) / len(metrics)
    if rate_limit_rate > 0.05:
        return {
            "bottleneck": "TPM_RATE_LIMIT",
            "severity": "HIGH",
            "rate_limited_pct": rate_limit_rate * 100,
            "recommendations": [
                "Reduce concurrent request count",
                "Implement request queuing with token bucket rate limiting",
                "Check if max_tokens is set too high causing TPM overruns",
                "Consider upgrading to a higher rate limit tier"
            ]
        }
    return {"status": "no_rate_limit_bottleneck"}
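The recommendation to queue requests behind a token bucket can be sketched as follows. This is a minimal, single-threaded illustration with hypothetical names, not a production limiter; the `clock` parameter exists so the behavior can be tested deterministically.

```python
import time
from typing import Callable


class TokenBucket:
    """Token bucket: holds up to `capacity` tokens, refilled at a steady rate."""

    def __init__(self, capacity: float, refill_per_second: float,
                 clock: Callable[[], float] = time.monotonic):
        self.capacity = capacity
        self.refill_per_second = refill_per_second
        self._clock = clock
        self._tokens = capacity
        self._last = clock()

    def _refill(self) -> None:
        now = self._clock()
        earned = (now - self._last) * self.refill_per_second
        self._tokens = min(self.capacity, self._tokens + earned)
        self._last = now

    def try_acquire(self, cost: float) -> bool:
        """Spend `cost` tokens if available; return False without spending otherwise."""
        self._refill()
        if cost <= self._tokens:
            self._tokens -= cost
            return True
        return False

    def wait_time(self, cost: float) -> float:
        """Seconds until `cost` tokens would be available (0 if available now)."""
        self._refill()
        return max(0.0, (cost - self._tokens) / self.refill_per_second)


# Example sizing for a 40,000 TPM limit: refill 40,000 tokens over 60 seconds
bucket = TokenBucket(capacity=40_000, refill_per_second=40_000 / 60)
print(bucket.try_acquire(1_500))
```

A caller would estimate each request's cost as input tokens plus `max_tokens`, then sleep for `wait_time(cost)` when `try_acquire` returns False. Concurrent use would need a lock around the bucket state.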
Bottleneck Type 2: Excessive Context Length
def analyze_context_length_impact(metrics: list) -> dict:
    """Analyze the impact of context length on latency."""
    # Bucket requests by input size
    groups = {
        "short": [m for m in metrics if m["input_tokens"] < 1000],
        "medium": [m for m in metrics if 1000 <= m["input_tokens"] < 10000],
        "long": [m for m in metrics if m["input_tokens"] >= 10000]
    }
    analysis = {}
    for group_name, group_metrics in groups.items():
        if not group_metrics:
            continue
        latencies = [m["latency_ms"] for m in group_metrics]
        analysis[group_name] = {
            "count": len(group_metrics),
            "avg_input_tokens": sum(m["input_tokens"] for m in group_metrics) / len(group_metrics),
            "avg_latency_ms": sum(latencies) / len(latencies),
            "p95_latency_ms": sorted(latencies)[int(len(latencies) * 0.95)]
        }
    # Flag a bottleneck when long contexts are more than 3x slower than short ones
    if ("short" in analysis and "long" in analysis
            and analysis["short"]["avg_latency_ms"] > 0):
        latency_multiplier = (
            analysis["long"]["avg_latency_ms"] /
            analysis["short"]["avg_latency_ms"]
        )
        if latency_multiplier > 3:
            return {
                "bottleneck": "CONTEXT_LENGTH",
                "latency_multiplier": round(latency_multiplier, 1),
                "recommendations": [
                    "Use Prompt Cache to cache repeated long prefixes",
                    "Implement context compression (summarize conversation history)",
                    "Review whether prompts include unnecessary information"
                ]
            }
    return analysis
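Bucketing hides how latency changes within a bucket. As a complementary first-order view, a least-squares line through (input tokens, latency) gives an approximate cost in milliseconds per 1,000 input tokens. This is a hedged sketch: a straight line will not capture the nonlinear behavior mentioned earlier, so large residuals are themselves a signal worth inspecting. The function name is hypothetical, and the record keys follow the function above.

```python
from typing import List, Tuple


def latency_fit(metrics: List[dict]) -> Tuple[float, float]:
    """Least-squares fit: latency_ms ~ intercept + slope * (input_tokens / 1000)."""
    xs = [m["input_tokens"] / 1000 for m in metrics]
    ys = [m["latency_ms"] for m in metrics]
    n = len(xs)
    if n < 2:
        raise ValueError("need at least two data points")
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)
    if sxx == 0:
        raise ValueError("all requests have the same input size")
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    return slope, intercept


# Synthetic data lying exactly on latency = 500 + 100 * (tokens / 1000)
sample = [
    {"input_tokens": 1000, "latency_ms": 600},
    {"input_tokens": 2000, "latency_ms": 700},
    {"input_tokens": 4000, "latency_ms": 900},
]
slope, intercept = latency_fit(sample)
print(round(slope, 3), round(intercept, 3))
```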
Bottleneck Type 3: Suboptimal Model Selection
def analyze_model_performance_tradeoff(results_by_model: dict) -> dict:
    """Compare performance characteristics of different models under concurrent load."""
    comparison = {}
    for model, metrics in results_by_model.items():
        if not metrics:
            continue
        latencies = sorted([m["latency_ms"] for m in metrics])
        costs = [m["cost_usd"] for m in metrics]
        n = len(metrics)
        comparison[model] = {
            "avg_latency_ms": sum(latencies) / n,
            "p95_latency_ms": latencies[int(n * 0.95)],
            "avg_cost_per_request_usd": sum(costs) / n,
            "requests_analyzed": n
        }
    return comparison
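Once the comparison exists, one simple way to act on it is to pick the cheapest model whose P95 latency meets a target. The sketch below assumes the comparison structure returned above; the function name, model labels, and figures are hypothetical placeholders, and output quality is deliberately left out of the decision.

```python
from typing import Optional


def pick_model(comparison: dict, max_p95_ms: float) -> Optional[str]:
    """Cheapest model (by average cost per request) with P95 latency within the target."""
    eligible = {
        model: stats for model, stats in comparison.items()
        if stats["p95_latency_ms"] <= max_p95_ms
    }
    if not eligible:
        return None  # no model meets the latency target
    return min(eligible, key=lambda m: eligible[m]["avg_cost_per_request_usd"])


# Hypothetical results for two placeholder models
comparison = {
    "model-small": {"p95_latency_ms": 3000, "avg_cost_per_request_usd": 0.001},
    "model-large": {"p95_latency_ms": 9000, "avg_cost_per_request_usd": 0.012},
}
print(pick_model(comparison, max_p95_ms=5000))
```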
Load Test Results Analysis Dashboard
class LoadTestAnalyzer:
    def generate_report(self, test_results: dict) -> str:
        """Render aggregated test results as a plain-text report."""
        report = []
        report.append("=" * 60)
        report.append("Claude API Load Test Report")
        report.append("=" * 60)

        report.append("\n## Overall Performance Metrics")
        overall = test_results.get("overall", {})
        report.append(f"Total requests: {overall.get('total_requests', 0):,}")
        report.append(f"Success rate: {overall.get('success_rate', 0):.1%}")
        report.append(f"Average latency: {overall.get('avg_latency_ms', 0):.0f}ms")
        report.append(f"P95 latency: {overall.get('p95_latency_ms', 0):.0f}ms")
        report.append(f"P99 latency: {overall.get('p99_latency_ms', 0):.0f}ms")
        report.append(f"Rate limit rate: {overall.get('rate_limit_rate', 0):.1%}")

        report.append("\n## Token Consumption Statistics")
        token_stats = test_results.get("token_stats", {})
        report.append(f"Total input tokens: {token_stats.get('total_input', 0):,}")
        report.append(f"Total output tokens: {token_stats.get('total_output', 0):,}")
        report.append(f"Estimated cost: ${token_stats.get('estimated_cost', 0):.4f}")

        report.append("\n## Bottleneck Analysis")
        bottlenecks = test_results.get("bottlenecks", [])
        if bottlenecks:
            for bottleneck in bottlenecks:
                report.append(f"- [{bottleneck['severity']}] {bottleneck['type']}")
                for rec in bottleneck.get('recommendations', []):
                    report.append(f"  -> {rec}")
        else:
            report.append("No significant bottlenecks detected")
        return "\n".join(report)
72.6 Cost Control During Load Testing
Strategies to Reduce Testing Costs
Load testing the Claude API incurs real charges, so set an explicit budget before each run:
import logging


class CostControlledLoadTest:
    def __init__(
        self,
        max_cost_usd: float = 10.0,
        test_model: str = "claude-3-5-haiku-latest",
        max_output_tokens: int = 256
    ):
        self.max_cost = max_cost_usd
        self.test_model = test_model
        self.max_output_tokens = max_output_tokens
        self.current_cost = 0.0
        # USD per million tokens (approximate Claude 3.5 Haiku pricing)
        self.pricing = {"input": 0.8, "output": 4.0}

    def check_budget(self, estimated_tokens: tuple) -> bool:
        """Return True if a request of the estimated size still fits the budget."""
        input_tokens, output_tokens = estimated_tokens
        estimated_cost = (
            input_tokens / 1e6 * self.pricing["input"] +
            output_tokens / 1e6 * self.pricing["output"]
        )
        if self.current_cost + estimated_cost > self.max_cost:
            return False
        return True

    def record_cost(self, input_tokens: int, output_tokens: int):
        """Add the actual cost of a completed request and warn near the limit."""
        cost = (
            input_tokens / 1e6 * self.pricing["input"] +
            output_tokens / 1e6 * self.pricing["output"]
        )
        self.current_cost += cost
        if self.current_cost > self.max_cost * 0.8:
            logging.warning(f"80% of test budget consumed: ${self.current_cost:.4f}")

    def get_cost_summary(self) -> dict:
        return {
            "budget_usd": self.max_cost,
            "spent_usd": round(self.current_cost, 4),
            "remaining_usd": round(self.max_cost - self.current_cost, 4),
            "utilization_pct": self.current_cost / self.max_cost * 100
        }
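The budget guard above reacts during the run. A rough pre-flight estimate can also be made before any request is sent. The sketch below uses the same per-million-token prices as the class above; the function name and the request counts are illustrative assumptions.

```python
def estimate_test_cost(
    n_requests: int,
    avg_input_tokens: float,
    avg_output_tokens: float,
    input_price_per_mtok: float = 0.8,
    output_price_per_mtok: float = 4.0,
) -> float:
    """Estimated cost in USD for a planned run."""
    input_cost = n_requests * avg_input_tokens / 1e6 * input_price_per_mtok
    output_cost = n_requests * avg_output_tokens / 1e6 * output_price_per_mtok
    return input_cost + output_cost


# Hypothetical plan: 1,000 requests, ~100 input tokens, outputs capped at 256 tokens
# input: 100,000 tokens -> $0.08; output: 256,000 tokens -> $1.024
planned = estimate_test_cost(1000, avg_input_tokens=100, avg_output_tokens=256)
print(f"${planned:.3f}")
```

Using `max_tokens` as the output figure gives a conservative upper bound, since actual responses are often shorter.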
72.7 Production Performance Optimization Recommendations
Based on load test results, common optimization paths:
1. Connection Reuse
# Use a persistent client; avoid recreating the connection per request
# Share a single Anthropic client instance rather than creating new ones per request
client = anthropic.Anthropic() # Create once at app startup, share globally
2. Concurrent Request Batching
import asyncio

import anthropic


async def batch_process(prompts: list, concurrency: int = 10) -> list:
    """
    Concurrently process batch requests while controlling concurrency to avoid rate limits.
    Results are returned in prompt order; failed requests appear as exception objects.
    """
    semaphore = asyncio.Semaphore(concurrency)
    client = anthropic.AsyncAnthropic()

    async def process_one(prompt: str) -> str:
        async with semaphore:  # At most `concurrency` requests in flight
            response = await client.messages.create(
                model="claude-3-5-haiku-latest",
                max_tokens=512,
                messages=[{"role": "user", "content": prompt}]
            )
            return response.content[0].text

    tasks = [process_one(p) for p in prompts]
    return await asyncio.gather(*tasks, return_exceptions=True)
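Because `return_exceptions=True` mixes results and exception objects in one list, callers need to separate them before use. A small helper for this might look like the following; the helper name and the simulated coroutines are illustrative, and no API calls are made.

```python
import asyncio
from typing import Any, List, Tuple


def partition_results(results: List[Any]) -> Tuple[list, list]:
    """Split gather() output into (successes, failures)."""
    successes = [r for r in results if not isinstance(r, BaseException)]
    failures = [r for r in results if isinstance(r, BaseException)]
    return successes, failures


async def _demo() -> list:
    async def ok(value: str) -> str:
        return value

    async def fail() -> str:
        raise RuntimeError("simulated rate limit")

    # Same pattern as batch_process: exceptions are returned, not raised
    return await asyncio.gather(ok("a"), fail(), ok("b"), return_exceptions=True)


if __name__ == "__main__":
    done, errors = partition_results(asyncio.run(_demo()))
    print(done, [type(e).__name__ for e in errors])
```

Failed prompts can then be retried separately, ideally with a backoff, rather than rerunning the whole batch.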
3. Prompt Cache Performance Benefits
For scenarios with long system prompts, Prompt Cache reduces not only cost but also TTFB:
# Enable Prompt Cache for the system prompt
response = client.messages.create(
    model="claude-sonnet-4-5",
    max_tokens=1024,
    system=[{
        "type": "text",
        "text": long_system_prompt,
        "cache_control": {"type": "ephemeral"}  # Cache the system prompt
    }],
    messages=[{"role": "user", "content": user_query}]
)
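To confirm during a load test that caching is actually taking effect, the usage data returned with each response can be aggregated. The sketch below assumes each usage record exposes `input_tokens`, `cache_creation_input_tokens`, and `cache_read_input_tokens` as plain dictionary keys, and that `input_tokens` counts only uncached input; the helper name and the sample numbers are hypothetical.

```python
from typing import Iterable


def cache_hit_ratio(usages: Iterable[dict]) -> float:
    """Share of all input tokens that were served from the prompt cache."""
    read = created = uncached = 0
    for usage in usages:
        read += usage.get("cache_read_input_tokens") or 0
        created += usage.get("cache_creation_input_tokens") or 0
        uncached += usage.get("input_tokens") or 0
    total = read + created + uncached
    return read / total if total else 0.0


# Hypothetical run: first request writes a 2,000-token prefix, second reads it
sample_usages = [
    {"input_tokens": 50, "cache_creation_input_tokens": 2000, "cache_read_input_tokens": 0},
    {"input_tokens": 50, "cache_creation_input_tokens": 0, "cache_read_input_tokens": 2000},
]
print(round(cache_hit_ratio(sample_usages), 3))
```

A ratio that stays near zero under sustained load suggests the cached prefix is changing between requests or expiring before it is reused.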
Summary
Claude API load testing requires specially designed test plans tailored to its unique characteristics: token rate limits as the core constraint, latency and TTFB as key metrics, and cost control as a necessary constraint.
Both Locust and k6 are viable load testing frameworks. Locust is more Python-developer-friendly, while k6 is more mature for CI/CD integration. Bottleneck identification should follow the order: TPM rate limits, context length, then model selection.
Ultimately, load testing is not just a means of finding performance limits; it is an engineering practice for understanding how the system behaves under real load. Capacity planning grounded in measured data is what prepares a product to handle user growth.