第 72 章
成本优化全攻略:模型路由 / 缓存 / Batch 的组合策略与 ROI 计算
第七十二章:负载测试与性能调优:Claude API 的压测方法与瓶颈定位
72.1 为什么 Claude API 的压测与传统 API 不同
对 Claude API 进行压测,面临与传统 RESTful API 压测完全不同的挑战体系。
传统 API 压测的典型目标:
- 服务器在并发 N 个请求时的 QPS(每秒请求数)
- 平均响应时间和 P99 延迟
- CPU、内存、数据库连接的资源瓶颈
Claude API 压测的独特挑战:
- 响应时间高度可变(从 2 秒到 120 秒,取决于输出长度和模型)
- 核心约束不是服务器资源,而是 token 速率限制(TPM/RPM)
- 流式响应(Streaming)需要专门的测试方案
- 费用直接与流量挂钩(压测会产生真实账单!)
- 上下文长度对延迟有非线性影响
理解这些差异,是设计有意义的 Claude API 压测的起点。
72.2 速率限制体系理解
Anthropic 的速率限制维度
Anthropic 对 API 使用施加多维度的速率限制:
# Claude API 速率限制层次(以 claude-sonnet-4-5 为例,实际值因账户层级而异)
RATE_LIMITS = {
"RPM": {
"name": "每分钟请求数",
"description": "每分钟可发送的 API 请求总数",
"tier_1": 50, # 基础层
"tier_2": 1000, # 构建层
"tier_3": 2000, # 扩展层
"tier_4": 4000, # 生产层
},
"TPM": {
"name": "每分钟 Token 数",
"description": "每分钟可处理的输入+输出 token 总数",
"tier_1": 40_000,
"tier_2": 400_000,
"tier_3": 800_000,
"tier_4": 4_000_000,
},
"TPD": {
"name": "每日 Token 数",
"description": "每日可处理的 token 上限",
"tier_1": 1_000_000,
"tier_2": 10_000_000,
"tier_3": "unlimited",
}
}
速率限制响应头解析
理解速率限制状态需要实时监控响应头:
class RateLimitMonitor:
"""
实时监控 Claude API 速率限制状态
"""
def parse_rate_limit_headers(self, headers: dict) -> dict:
"""
解析 Anthropic API 返回的速率限制相关响应头
"""
return {
# 请求限制
"requests_limit": int(headers.get('anthropic-ratelimit-requests-limit', 0)),
"requests_remaining": int(headers.get('anthropic-ratelimit-requests-remaining', 0)),
"requests_reset": headers.get('anthropic-ratelimit-requests-reset', ''),
# Token 限制
"tokens_limit": int(headers.get('anthropic-ratelimit-tokens-limit', 0)),
"tokens_remaining": int(headers.get('anthropic-ratelimit-tokens-remaining', 0)),
"tokens_reset": headers.get('anthropic-ratelimit-tokens-reset', ''),
# 输入 Token 限制(单独计)
"input_tokens_limit": int(headers.get('anthropic-ratelimit-input-tokens-limit', 0)),
"input_tokens_remaining": int(headers.get('anthropic-ratelimit-input-tokens-remaining', 0)),
# 输出 Token 限制
"output_tokens_limit": int(headers.get('anthropic-ratelimit-output-tokens-limit', 0)),
"output_tokens_remaining": int(headers.get('anthropic-ratelimit-output-tokens-remaining', 0)),
}
def estimate_utilization(self, headers_data: dict) -> dict:
"""估算当前速率限制使用率"""
return {
"request_utilization": (
1 - headers_data["requests_remaining"] / max(headers_data["requests_limit"], 1)
),
"token_utilization": (
1 - headers_data["tokens_remaining"] / max(headers_data["tokens_limit"], 1)
)
}
72.3 使用 Locust 进行 Claude API 压测
基础 Locust 压测脚本
Locust 是 Python 生态中最成熟的负载测试框架,适合 Claude API 的复杂压测场景:
# locustfile.py
import time
import random
from locust import HttpUser, task, between, events
from locust.exception import StopUser
import anthropic
import json
# 不使用 Locust 的 HTTP 客户端,直接使用 Anthropic SDK
# 但通过 Locust 的统计框架记录指标
class ClaudeAPIUser(HttpUser):
"""
Claude API 负载测试用户
注意:重写了 HTTP 请求逻辑,直接使用 anthropic SDK
"""
wait_time = between(1, 3) # 请求间隔 1-3 秒
# 不同复杂度的测试用例
TEST_PROMPTS = {
"simple": [
"用一句话总结人工智能的概念",
"解释什么是机器学习",
"Python 和 Java 的主要区别是什么?",
],
"medium": [
"请解释微服务架构的优缺点,并给出适用场景",
"描述 REST API 设计的最佳实践,包括安全性考量",
"解释 CAP 理论及其在分布式系统中的应用",
],
"complex": [
"""请分析以下代码的时间复杂度和空间复杂度,并提出优化建议:
def find_pairs(arr, target):
result = []
for i in range(len(arr)):
for j in range(i+1, len(arr)):
if arr[i] + arr[j] == target:
result.append((arr[i], arr[j]))
return result
""",
]
}
def on_start(self):
"""用户开始时初始化 Claude 客户端"""
import os
self.claude_client = anthropic.Anthropic(
api_key=os.environ["ANTHROPIC_API_KEY"],
timeout=anthropic.Timeout(
connect=5.0,
read=60.0
)
)
@task(5)
def simple_query(self):
"""简单查询 - 权重 5(执行频率最高)"""
prompt = random.choice(self.TEST_PROMPTS["simple"])
self._make_claude_request(prompt, "simple", max_tokens=256)
@task(3)
def medium_query(self):
"""中等查询 - 权重 3"""
prompt = random.choice(self.TEST_PROMPTS["medium"])
self._make_claude_request(prompt, "medium", max_tokens=1024)
@task(1)
def complex_query(self):
"""复杂查询 - 权重 1(执行频率最低,但 token 消耗最高)"""
prompt = random.choice(self.TEST_PROMPTS["complex"])
self._make_claude_request(prompt, "complex", max_tokens=2048)
def _make_claude_request(
self,
prompt: str,
task_type: str,
max_tokens: int
):
start_time = time.time()
try:
response = self.claude_client.messages.create(
model="claude-haiku-3-5", # 压测使用 Haiku 降低成本
max_tokens=max_tokens,
messages=[{
"role": "user",
"content": prompt
}]
)
elapsed = int((time.time() - start_time) * 1000)
# 手动触发 Locust 成功事件
events.request.fire(
request_type="Claude API",
name=f"/{task_type}",
response_time=elapsed,
response_length=len(response.content[0].text),
context={
"input_tokens": response.usage.input_tokens,
"output_tokens": response.usage.output_tokens
},
exception=None
)
except anthropic.RateLimitError as e:
elapsed = int((time.time() - start_time) * 1000)
events.request.fire(
request_type="Claude API",
name=f"/{task_type}",
response_time=elapsed,
response_length=0,
exception=e
)
# 速率限制时暂停当前用户
time.sleep(5)
except Exception as e:
elapsed = int((time.time() - start_time) * 1000)
events.request.fire(
request_type="Claude API",
name=f"/{task_type}",
response_time=elapsed,
response_length=0,
exception=e
)
自定义指标收集
# custom_metrics.py - 在 Locust 中收集 Claude 专有指标
from locust import events
from collections import defaultdict
import threading
class ClaudeMetricsCollector:
"""收集 Claude API 特有的性能指标"""
def __init__(self):
self.lock = threading.Lock()
self.metrics = defaultdict(list)
def record(
self,
task_type: str,
input_tokens: int,
output_tokens: int,
ttfb_ms: float, # Time to First Byte
total_latency_ms: float,
rate_limited: bool = False
):
with self.lock:
self.metrics[task_type].append({
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"ttfb_ms": ttfb_ms,
"total_latency_ms": total_latency_ms,
"tokens_per_second": output_tokens / (total_latency_ms / 1000),
"rate_limited": rate_limited
})
def get_summary(self) -> dict:
summary = {}
for task_type, records in self.metrics.items():
if not records:
continue
total_latencies = [r["total_latency_ms"] for r in records]
ttfbs = [r["ttfb_ms"] for r in records]
tps = [r["tokens_per_second"] for r in records]
total_latencies.sort()
n = len(total_latencies)
summary[task_type] = {
"total_requests": n,
"avg_input_tokens": sum(r["input_tokens"] for r in records) / n,
"avg_output_tokens": sum(r["output_tokens"] for r in records) / n,
"avg_latency_ms": sum(total_latencies) / n,
"p50_latency_ms": total_latencies[int(n * 0.5)],
"p95_latency_ms": total_latencies[int(n * 0.95)],
"p99_latency_ms": total_latencies[int(n * 0.99)],
"avg_ttfb_ms": sum(ttfbs) / n,
"avg_tokens_per_second": sum(tps) / n,
"rate_limited_pct": sum(1 for r in records if r["rate_limited"]) / n * 100
}
return summary
metrics_collector = ClaudeMetricsCollector()
72.4 使用 k6 进行压测
k6 是另一个流行的负载测试工具,使用 JavaScript 编写测试脚本:
// k6_claude_load_test.js
import http from 'k6/http';
import { sleep, check } from 'k6';
import { Rate, Trend, Counter } from 'k6/metrics';
// 自定义指标
const rateLimitedRequests = new Rate('rate_limited');
const inputTokens = new Trend('input_tokens');
const outputTokens = new Trend('output_tokens');
const tokenCost = new Counter('token_cost_cents');
export const options = {
stages: [
{ duration: '2m', target: 5 }, // 预热:2 分钟内逐步增加到 5 个并发用户
{ duration: '5m', target: 10 }, // 稳定:10 个并发用户运行 5 分钟
{ duration: '2m', target: 20 }, // 压力:增加到 20 个并发用户
{ duration: '3m', target: 20 }, // 持续:在 20 并发下运行 3 分钟
{ duration: '2m', target: 0 }, // 降温:逐步降至 0
],
thresholds: {
http_req_duration: ['p(95)<10000'], // 95% 请求在 10 秒内完成
rate_limited: ['rate<0.05'], // 速率限制率低于 5%
http_req_failed: ['rate<0.01'], // 失败率低于 1%
},
};
const ANTHROPIC_API_KEY = __ENV.ANTHROPIC_API_KEY;
const PROMPTS = [
"Explain the concept of recursion in programming",
"What are the SOLID principles in software design?",
"Describe the difference between SQL and NoSQL databases",
"What is containerization and why is it useful?",
];
export default function() {
const prompt = PROMPTS[Math.floor(Math.random() * PROMPTS.length)];
const payload = JSON.stringify({
model: "claude-haiku-3-5",
max_tokens: 512,
messages: [{
role: "user",
content: prompt
}]
});
const params = {
headers: {
'Content-Type': 'application/json',
'x-api-key': ANTHROPIC_API_KEY,
'anthropic-version': '2023-06-01',
},
timeout: '60s',
};
const res = http.post(
'https://api.anthropic.com/v1/messages',
payload,
params
);
// 检查是否被速率限制
if (res.status === 429) {
rateLimitedRequests.add(1);
sleep(5); // 等待后重试
return;
}
rateLimitedRequests.add(0);
const success = check(res, {
'status is 200': (r) => r.status === 200,
'has content': (r) => {
try {
const body = JSON.parse(r.body);
return body.content && body.content.length > 0;
} catch {
return false;
}
}
});
if (success) {
try {
const body = JSON.parse(res.body);
const usage = body.usage;
inputTokens.add(usage.input_tokens);
outputTokens.add(usage.output_tokens);
// 记录成本(claude-haiku-3-5 的近似价格)
const costCents = (usage.input_tokens / 1e6 * 0.8 +
usage.output_tokens / 1e6 * 4.0) * 100;
tokenCost.add(costCents);
} catch (e) {
console.error('Failed to parse response:', e);
}
}
sleep(Math.random() * 2 + 1); // 1-3 秒随机等待
}
72.5 性能瓶颈定位方法论
五类常见瓶颈
瓶颈类型 1:TPM 速率限制
def diagnose_tpm_bottleneck(metrics: list) -> dict:
"""
判断瓶颈是否来自 TPM 速率限制
特征:
- 429 错误集中在特定时间窗口
- 请求延迟分布出现双峰(正常请求 + 等待速率恢复的请求)
"""
rate_limited = [m for m in metrics if m.get("error_code") == "429"]
rate_limit_rate = len(rate_limited) / len(metrics)
if rate_limit_rate > 0.05: # 超过 5% 被速率限制
return {
"bottleneck": "TPM_RATE_LIMIT",
"severity": "HIGH",
"rate_limited_pct": rate_limit_rate * 100,
"recommendations": [
"减少并发请求数",
"实现请求队列和令牌桶限流",
"检查是否有 max_tokens 设置过高导致 TPM 超限",
"考虑升级到更高的速率限制层级"
]
}
瓶颈类型 2:上下文长度过大
def analyze_context_length_impact(metrics: list) -> dict:
"""
分析上下文长度对延迟的影响
"""
# 按输入 token 数分组
groups = {
"short": [m for m in metrics if m["input_tokens"] < 1000],
"medium": [m for m in metrics if 1000 <= m["input_tokens"] < 10000],
"long": [m for m in metrics if m["input_tokens"] >= 10000]
}
analysis = {}
for group_name, group_metrics in groups.items():
if not group_metrics:
continue
latencies = [m["latency_ms"] for m in group_metrics]
analysis[group_name] = {
"count": len(group_metrics),
"avg_input_tokens": sum(m["input_tokens"] for m in group_metrics) / len(group_metrics),
"avg_latency_ms": sum(latencies) / len(latencies),
"p95_latency_ms": sorted(latencies)[int(len(latencies) * 0.95)]
}
# 计算延迟随上下文增长的倍数
if "short" in analysis and "long" in analysis:
latency_multiplier = (
analysis["long"]["avg_latency_ms"] /
analysis["short"]["avg_latency_ms"]
)
if latency_multiplier > 3:
return {
"bottleneck": "CONTEXT_LENGTH",
"latency_multiplier": round(latency_multiplier, 1),
"recommendations": [
"使用 Prompt Cache 缓存重复的长前缀",
"实现上下文压缩(摘要历史对话)",
"审查是否在 prompt 中包含了不必要的信息"
]
}
return analysis
瓶颈类型 3:并发模型选择不当
def analyze_model_performance_tradeoff(results_by_model: dict) -> dict:
"""
比较不同模型在并发场景下的性能表现
"""
comparison = {}
for model, metrics in results_by_model.items():
latencies = [m["latency_ms"] for m in metrics]
costs = [m["cost_usd"] for m in metrics]
quality_scores = [m.get("quality_score", 0) for m in metrics]
comparison[model] = {
"p95_latency_ms": sorted(latencies)[int(len(latencies) * 0.95)],
"avg_cost_per_request": sum(costs) / len(costs),
"avg_quality_score": sum(quality_scores) / len(quality_scores) if quality_scores[0] else None,
"cost_efficiency": (sum(quality_scores) / len(quality_scores)) / (sum(costs) / len(costs))
if quality_scores[0] else None
}
return comparison
压测结果分析仪表板
class LoadTestAnalyzer:
"""压测结果综合分析器"""
def generate_report(self, test_results: dict) -> str:
report = []
report.append("=" * 60)
report.append("Claude API 负载测试报告")
report.append("=" * 60)
# 1. 总体性能概览
report.append("\n## 总体性能指标")
overall = test_results.get("overall", {})
report.append(f"总请求数: {overall.get('total_requests', 0):,}")
report.append(f"成功率: {overall.get('success_rate', 0):.1%}")
report.append(f"平均延迟: {overall.get('avg_latency_ms', 0):.0f}ms")
report.append(f"P95 延迟: {overall.get('p95_latency_ms', 0):.0f}ms")
report.append(f"P99 延迟: {overall.get('p99_latency_ms', 0):.0f}ms")
report.append(f"速率限制率: {overall.get('rate_limit_rate', 0):.1%}")
# 2. Token 消耗统计
report.append("\n## Token 消耗统计")
token_stats = test_results.get("token_stats", {})
report.append(f"总输入 tokens: {token_stats.get('total_input', 0):,}")
report.append(f"总输出 tokens: {token_stats.get('total_output', 0):,}")
report.append(f"总预估成本: ${token_stats.get('estimated_cost', 0):.4f}")
# 3. 瓶颈识别
report.append("\n## 瓶颈分析")
bottlenecks = test_results.get("bottlenecks", [])
if bottlenecks:
for bottleneck in bottlenecks:
report.append(f"- [{bottleneck['severity']}] {bottleneck['type']}")
for rec in bottleneck.get('recommendations', []):
report.append(f" → {rec}")
else:
report.append("未发现明显瓶颈")
return "\n".join(report)
72.6 压测的成本控制
降低压测成本的策略
压测 Claude API 会产生真实费用,需要有意识地控制成本:
class CostControlledLoadTest:
"""
带成本控制的负载测试
"""
def __init__(
self,
max_cost_usd: float = 10.0, # 最大允许花费
test_model: str = "claude-haiku-3-5", # 优先用最便宜的模型
max_output_tokens: int = 256 # 限制输出长度
):
self.max_cost = max_cost_usd
self.test_model = test_model
self.max_output_tokens = max_output_tokens
self.current_cost = 0.0
# claude-haiku-3-5 价格(美元/百万 tokens)
self.pricing = {"input": 0.8, "output": 4.0}
def check_budget(self, estimated_tokens: tuple) -> bool:
"""检查是否还有预算继续测试"""
input_tokens, output_tokens = estimated_tokens
estimated_cost = (
input_tokens / 1e6 * self.pricing["input"] +
output_tokens / 1e6 * self.pricing["output"]
)
if self.current_cost + estimated_cost > self.max_cost:
logger.warning(
f"Budget limit approached: "
f"${self.current_cost:.4f} + ${estimated_cost:.4f} > ${self.max_cost}"
)
return False
return True
def record_cost(self, input_tokens: int, output_tokens: int):
"""记录实际花费"""
cost = (
input_tokens / 1e6 * self.pricing["input"] +
output_tokens / 1e6 * self.pricing["output"]
)
self.current_cost += cost
if self.current_cost > self.max_cost * 0.8:
logger.warning(f"80% of test budget consumed: ${self.current_cost:.4f}")
def get_cost_summary(self) -> dict:
return {
"budget_usd": self.max_cost,
"spent_usd": round(self.current_cost, 4),
"remaining_usd": round(self.max_cost - self.current_cost, 4),
"utilization_pct": self.current_cost / self.max_cost * 100
}
72.7 生产级性能优化建议
基于压测结果,常见的性能优化路径:
1. 连接复用
# 使用持久化客户端,避免每次请求重建连接
# 共享单个 Anthropic 客户端实例,而非为每个请求创建新实例
client = anthropic.Anthropic() # 在应用启动时创建,全局共享
2. 并发请求批处理
import asyncio
async def batch_process(prompts: list, concurrency: int = 10) -> list:
"""
并发处理批量请求,但控制并发数以避免速率限制
"""
semaphore = asyncio.Semaphore(concurrency)
client = anthropic.AsyncAnthropic()
async def process_one(prompt: str) -> str:
async with semaphore:
response = await client.messages.create(
model="claude-haiku-3-5",
max_tokens=512,
messages=[{"role": "user", "content": prompt}]
)
return response.content[0].text
tasks = [process_one(p) for p in prompts]
return await asyncio.gather(*tasks, return_exceptions=True)
3. Prompt Cache 的性能收益 对于包含长系统提示的场景,Prompt Cache 不仅降低成本,也降低 TTFB:
# 开启 Prompt Cache
response = client.messages.create(
model="claude-sonnet-4-5",
max_tokens=1024,
system=[{
"type": "text",
"text": long_system_prompt,
"cache_control": {"type": "ephemeral"} # 缓存系统提示
}],
messages=[{"role": "user", "content": user_query}]
)
小结
Claude API 的压测需要针对其独特特性设计专门的测试方案:以 token 速率限制为核心约束、以延迟和 TTFB 为关键指标、以成本控制为必要约束。
Locust 和 k6 都是可行的压测框架,区别在于 Locust 对 Python 开发者更友好,k6 在 CI/CD 集成上更成熟。瓶颈定位应按照 TPM 速率限制、上下文长度、模型选择的顺序逐步排查。
最终,压测不只是发现性能极限的手段,更是理解系统在真实负载下行为的工程实践。只有基于真实数据的容量规划,才能为产品的用户增长做好充分的技术准备。