Usage & Cost API:用量报告 / 成本追踪 / Datadog 等可观测平台集成
第六十五章:使用量监控与成本分配:团队级别的 Token 预算管理
65.1 为什么成本可见性是企业级采用的关键
当企业开始将 Claude 从试点项目推向全面生产部署时,一个几乎必然出现的管理挑战是:谁在用,用了多少,花了多少钱?
在个人或小团队场景下,这个问题并不紧迫。每月的 API 账单一目了然,超支了调整一下用法即可。但当使用者扩展到数十个团队、数百个工程师、数十个产品线时,成本可见性就变成了一个系统性问题。
没有成本可见性的后果是显著的:
- 财务团队无法将 AI 支出归因到具体业务单元,导致成本中心分配混乱
- 工程团队缺乏优化动机,因为他们感受不到过度使用的代价
- 平台团队无法提前预警超支风险,预算管控失去主动性
- 产品决策缺乏数据支撑,无法判断某个功能的 AI 成本是否合理
本章将系统介绍如何利用 Anthropic 提供的管理 API、自建监控基础设施,以及预算分配机制,在企业环境中实现细粒度的 token 用量追踪与成本归因。
65.2 Anthropic 管理 API:使用量数据的权威来源
管理 API 概览
Anthropic 提供了专门的管理端 API(Admin API),允许具有管理员权限的账号查询组织层面的使用数据。这些 API 与标准的对话 API 使用不同的认证机制,通常需要专门的 Admin API Key。
核心使用量端点:
GET /v1/usage
GET /v1/usage/monthly
GET /v1/organizations/{org_id}/usage
GET /v1/projects/{project_id}/usage
GET /v1/workspaces/{workspace_id}/usage
一个典型的用量查询请求:
import anthropic
import os
from datetime import datetime, timedelta
client = anthropic.Anthropic(
api_key=os.environ["ANTHROPIC_ADMIN_API_KEY"]
)
# 查询过去 30 天的使用量
end_date = datetime.now()
start_date = end_date - timedelta(days=30)
usage_data = client.admin.usage.list(
start_time=start_date.isoformat(),
end_time=end_date.isoformat(),
granularity="daily",
group_by=["workspace", "model"]
)
for record in usage_data.data:
print(f"日期: {record.date}")
print(f"工作区: {record.workspace_id}")
print(f"模型: {record.model}")
print(f"输入 tokens: {record.input_tokens:,}")
print(f"输出 tokens: {record.output_tokens:,}")
print(f"请求次数: {record.request_count:,}")
print("---")
工作区与项目层级
Anthropic 的账户结构支持多层级组织:
组织 (Organization)
└── 工作区 (Workspace)
└── API Keys
└── 项目 (Project) [部分层级支持]
最佳实践是按业务单元或团队创建独立工作区,并为每个工作区分配独立的 API Key 集合。这样可以在查询使用量时直接按工作区维度聚合,无需额外的 key-to-team 映射逻辑。
# 按工作区查询使用量并生成报告
def generate_workspace_cost_report(month: str) -> dict:
"""
month: 格式 "2024-03"
返回每个工作区的成本明细
"""
workspace_costs = {}
# 定义各模型单价(美元/百万 tokens)
pricing = {
"claude-opus-4-5": {"input": 15.0, "output": 75.0},
"claude-sonnet-4-5": {"input": 3.0, "output": 15.0},
"claude-haiku-3-5": {"input": 0.8, "output": 4.0},
}
workspaces = client.admin.workspaces.list()
for workspace in workspaces.data:
usage = client.admin.usage.list(
workspace_id=workspace.id,
start_time=f"{month}-01T00:00:00Z",
end_time=f"{month}-31T23:59:59Z",
granularity="monthly"
)
total_cost = 0.0
model_breakdown = {}
for record in usage.data:
model = record.model
if model in pricing:
input_cost = (record.input_tokens / 1_000_000) * pricing[model]["input"]
output_cost = (record.output_tokens / 1_000_000) * pricing[model]["output"]
cost = input_cost + output_cost
model_breakdown[model] = {
"input_tokens": record.input_tokens,
"output_tokens": record.output_tokens,
"cost_usd": round(cost, 4)
}
total_cost += cost
workspace_costs[workspace.name] = {
"total_cost_usd": round(total_cost, 2),
"by_model": model_breakdown
}
return workspace_costs
65.3 自建监控基础设施:超越 API 原生能力
为什么需要自建监控
Anthropic 管理 API 提供的是账单级别的汇总数据,粒度通常到天、工作区和模型维度。但企业往往需要更细粒度的追踪:
- 具体到某个功能、某个 endpoint、某个用户的用量
- 实时告警(如某团队当天用量超过预算的 80%)
- 与内部成本分摊系统集成
- 历史趋势分析与预测
- 按请求类型(如"文档摘要"vs"代码审查")分类成本
这些需求需要在应用层自建监控逻辑。
中间件拦截模式
最优雅的自建监控方案是在 API 调用链路中插入中间件,在不修改业务代码的前提下捕获每次请求的 token 使用量:
import time
import uuid
from functools import wraps
from dataclasses import dataclass, field
from typing import Optional, Callable
import anthropic
@dataclass
class UsageRecord:
request_id: str
timestamp: float
team_id: str
feature_name: str
model: str
input_tokens: int
output_tokens: int
cache_read_tokens: int = 0
cache_write_tokens: int = 0
latency_ms: float = 0.0
success: bool = True
error_code: Optional[str] = None
class ClaudeMonitoringWrapper:
"""对 Anthropic 客户端的监控包装器"""
def __init__(
self,
client: anthropic.Anthropic,
team_id: str,
metrics_sink: Callable[[UsageRecord], None]
):
self.client = client
self.team_id = team_id
self.metrics_sink = metrics_sink
def messages_create(
self,
feature_name: str,
**kwargs
):
request_id = str(uuid.uuid4())
start_time = time.time()
try:
response = self.client.messages.create(**kwargs)
latency_ms = (time.time() - start_time) * 1000
usage = response.usage
record = UsageRecord(
request_id=request_id,
timestamp=start_time,
team_id=self.team_id,
feature_name=feature_name,
model=response.model,
input_tokens=usage.input_tokens,
output_tokens=usage.output_tokens,
cache_read_tokens=getattr(usage, 'cache_read_input_tokens', 0),
cache_write_tokens=getattr(usage, 'cache_creation_input_tokens', 0),
latency_ms=latency_ms,
success=True
)
self.metrics_sink(record)
return response
except anthropic.APIError as e:
latency_ms = (time.time() - start_time) * 1000
record = UsageRecord(
request_id=request_id,
timestamp=start_time,
team_id=self.team_id,
feature_name=feature_name,
model=kwargs.get("model", "unknown"),
input_tokens=0,
output_tokens=0,
latency_ms=latency_ms,
success=False,
error_code=str(e.status_code)
)
self.metrics_sink(record)
raise
指标存储与查询
对于中小规模部署,推荐使用 ClickHouse 存储 token 使用记录,因为它对时序聚合查询有极高的性能:
-- ClickHouse 建表
CREATE TABLE claude_usage (
request_id UUID,
timestamp DateTime64(3),
team_id LowCardinality(String),
feature_name LowCardinality(String),
model LowCardinality(String),
input_tokens UInt32,
output_tokens UInt32,
cache_read_tokens UInt32,
cache_write_tokens UInt32,
latency_ms Float32,
success UInt8,
error_code Nullable(String)
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (team_id, timestamp);
-- 查询本月各团队成本
SELECT
team_id,
feature_name,
sum(input_tokens) as total_input,
sum(output_tokens) as total_output,
-- claude-sonnet-4-5 价格
round(sum(input_tokens) / 1e6 * 3.0 + sum(output_tokens) / 1e6 * 15.0, 2) as cost_usd
FROM claude_usage
WHERE timestamp >= toStartOfMonth(now())
AND model = 'claude-sonnet-4-5'
GROUP BY team_id, feature_name
ORDER BY cost_usd DESC;
65.4 Token 预算管理:从监控到控制
分级预算模型
企业级 token 预算应支持多层次的配额体系:
组织级预算 (月度总预算)
├── 团队 A 预算 (月度分配)
│ ├── 功能 A1 每日限额
│ └── 功能 A2 每日限额
├── 团队 B 预算
└── 团队 C 预算(弹性超配,但触发告警)
以下是基于 Redis 实现的轻量级预算管控系统:
import redis
import json
from datetime import datetime
from typing import Tuple
class TokenBudgetManager:
"""
基于 Redis 的 Token 预算管控器
支持:月度/日度预算、硬限制/软告警、实时消耗追踪
"""
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
def set_budget(
self,
team_id: str,
monthly_tokens: int,
daily_tokens: Optional[int] = None,
alert_threshold: float = 0.8
):
"""设置团队预算"""
budget = {
"monthly_tokens": monthly_tokens,
"daily_tokens": daily_tokens or (monthly_tokens // 30),
"alert_threshold": alert_threshold
}
self.redis.hset(f"budget:config:{team_id}", mapping=budget)
def check_and_consume(
self,
team_id: str,
estimated_tokens: int
) -> Tuple[bool, str]:
"""
检查预算并预扣费
返回: (is_allowed, reason)
"""
now = datetime.now()
month_key = f"budget:usage:{team_id}:{now.strftime('%Y-%m')}"
day_key = f"budget:usage:{team_id}:{now.strftime('%Y-%m-%d')}"
# 获取预算配置
config = self.redis.hgetall(f"budget:config:{team_id}")
if not config:
return True, "no_budget_configured"
monthly_limit = int(config[b"monthly_tokens"])
daily_limit = int(config[b"daily_tokens"])
# 检查月度预算
current_month = int(self.redis.get(month_key) or 0)
if current_month + estimated_tokens > monthly_limit:
return False, f"monthly_budget_exceeded: {current_month}/{monthly_limit}"
# 检查日度预算
current_day = int(self.redis.get(day_key) or 0)
if current_day + estimated_tokens > daily_limit:
return False, f"daily_budget_exceeded: {current_day}/{daily_limit}"
# 预扣费(使用 pipeline 保证原子性)
pipe = self.redis.pipeline()
pipe.incrby(month_key, estimated_tokens)
pipe.expire(month_key, 35 * 24 * 3600) # 35天过期
pipe.incrby(day_key, estimated_tokens)
pipe.expire(day_key, 25 * 3600) # 25小时过期
pipe.execute()
# 检查告警阈值
alert_threshold = float(config.get(b"alert_threshold", 0.8))
if (current_month + estimated_tokens) / monthly_limit > alert_threshold:
self._trigger_alert(team_id, "monthly", current_month, monthly_limit)
return True, "ok"
def reconcile_actual_usage(
self,
team_id: str,
estimated_tokens: int,
actual_tokens: int
):
"""请求完成后,用实际用量修正预扣费"""
diff = actual_tokens - estimated_tokens
if diff == 0:
return
now = datetime.now()
month_key = f"budget:usage:{team_id}:{now.strftime('%Y-%m')}"
day_key = f"budget:usage:{team_id}:{now.strftime('%Y-%m-%d')}"
pipe = self.redis.pipeline()
pipe.incrby(month_key, diff)
pipe.incrby(day_key, diff)
pipe.execute()
def _trigger_alert(self, team_id: str, period: str, used: int, limit: int):
"""触发预算告警(集成 Slack/PagerDuty 等)"""
# 实现告警发送逻辑
pass
Prompt 层面的 Token 估算
在请求发出之前预估 token 用量,是实现预扣费机制的前提。由于 Claude 使用与 GPT 相似的 BPE tokenizer,可以用 tiktoken 做近似估算:
import tiktoken
def estimate_tokens(messages: list, model: str = "claude-sonnet-4-5") -> int:
"""
估算 token 用量(输入部分)
注意:这是近似值,实际值会有 ±10% 的偏差
"""
# 使用 cl100k_base 作为近似 tokenizer
enc = tiktoken.get_encoding("cl100k_base")
total_tokens = 0
for msg in messages:
# 消息结构开销
total_tokens += 4
if isinstance(msg.get("content"), str):
total_tokens += len(enc.encode(msg["content"]))
elif isinstance(msg.get("content"), list):
for block in msg["content"]:
if block.get("type") == "text":
total_tokens += len(enc.encode(block["text"]))
# 添加 15% 的安全余量
return int(total_tokens * 1.15)
65.5 成本分配报告:与财务系统集成
成本分配的三种模式
企业通常采用以下三种模式之一来分配 AI 使用成本:
1. 直接归因模式(Showback) 各团队看到自己的实际用量和对应成本,但成本仍由中央预算承担。用于提高成本意识,不涉及实际资金转移。
2. 内部转账模式(Chargeback) 按照实际使用量将成本从中央 IT 预算转移到业务单元预算。需要与财务系统(如 SAP、Oracle)集成。
3. 配额制模式(Budget Allocation) 每个团队获得固定的 token 配额,超出后需要申请额外配额或自动降级到更便宜的模型。
自动化月度报告
import pandas as pd
from jinja2 import Template
def generate_monthly_cost_report(month: str) -> str:
"""
生成月度成本分配报告 HTML
"""
# 从 ClickHouse 获取数据
query = f"""
SELECT
team_id,
feature_name,
model,
sum(input_tokens) as input_tokens,
sum(output_tokens) as output_tokens,
countIf(success = 1) as success_requests,
countIf(success = 0) as error_requests,
avg(latency_ms) as avg_latency_ms
FROM claude_usage
WHERE toYYYYMM(timestamp) = toYYYYMM(toDate('{month}-01'))
GROUP BY team_id, feature_name, model
ORDER BY team_id, feature_name
"""
df = pd.read_sql(query, clickhouse_conn)
# 计算成本
pricing = {
"claude-opus-4-5": (15.0, 75.0),
"claude-sonnet-4-5": (3.0, 15.0),
"claude-haiku-3-5": (0.8, 4.0),
}
def calc_cost(row):
in_price, out_price = pricing.get(row["model"], (3.0, 15.0))
return (row["input_tokens"] / 1e6 * in_price +
row["output_tokens"] / 1e6 * out_price)
df["cost_usd"] = df.apply(calc_cost, axis=1)
# 按团队汇总
team_summary = df.groupby("team_id").agg(
total_cost=("cost_usd", "sum"),
total_requests=("success_requests", "sum"),
total_errors=("error_requests", "sum")
).reset_index()
return render_report_template(df, team_summary, month)
65.6 模型路由优化:降低成本的工程手段
基于复杂度的模型选择
并非所有请求都需要最强的模型。通过智能路由,可以在不牺牲质量的前提下显著降低成本:
class ModelRouter:
"""
基于请求复杂度的智能模型路由器
简单任务 → Haiku (成本最低)
中等任务 → Sonnet (平衡)
复杂任务 → Opus (最强)
"""
def select_model(self, request_context: dict) -> str:
complexity = self._estimate_complexity(request_context)
if complexity < 0.3:
return "claude-haiku-3-5"
elif complexity < 0.7:
return "claude-sonnet-4-5"
else:
return "claude-opus-4-5"
def _estimate_complexity(self, ctx: dict) -> float:
score = 0.0
# 输入长度
total_chars = sum(len(m.get("content", "")) for m in ctx.get("messages", []))
if total_chars > 10000:
score += 0.3
elif total_chars > 3000:
score += 0.15
# 任务类型
task_type = ctx.get("task_type", "")
complex_tasks = {"code_review", "architecture_design", "legal_analysis"}
simple_tasks = {"translation", "summarization", "classification"}
if task_type in complex_tasks:
score += 0.4
elif task_type in simple_tasks:
score += 0.0
else:
score += 0.2
# 工具使用
if ctx.get("tools"):
score += 0.2
return min(score, 1.0)
Prompt 缓存的成本收益分析
对于包含大量重复前缀(如系统提示、长文档上下文)的请求,Claude 的 Prompt Cache 功能可以将输入 token 成本降低最高 90%:
def analyze_cache_opportunity(prompt_logs: list) -> dict:
"""
分析历史请求中的缓存潜力
"""
from collections import Counter
# 提取系统提示前缀
system_prefixes = Counter()
for log in prompt_logs:
system = log.get("system", "")
if len(system) > 500:
# 取前 2000 字符作为前缀指纹
prefix = system[:2000]
system_prefixes[prefix] += 1
cacheable_requests = sum(
count for prefix, count in system_prefixes.items()
if count > 5 # 出现超过 5 次才有缓存价值
)
cache_rate = cacheable_requests / len(prompt_logs)
estimated_savings = cache_rate * 0.9 # 缓存命中节省 90% 输入成本
return {
"cacheable_request_rate": round(cache_rate, 2),
"estimated_cost_reduction": f"{estimated_savings:.0%}",
"top_cacheable_prefixes": [
prefix[:100] + "..."
for prefix, _ in system_prefixes.most_common(3)
]
}
65.7 Grafana 监控看板设计
关键指标体系
企业级 Claude 使用量监控看板应涵盖以下指标维度:
成本维度
- 当月累计成本 vs 预算(进度条)
- 各团队成本排名(条形图)
- 成本按天趋势(时序图)
- 成本按模型分布(饼图)
性能维度
- P50/P90/P99 延迟趋势
- 请求成功率
- 错误类型分布(429 速率限制、500 服务错误等)
- 并发请求数趋势
效率维度
- 缓存命中率
- 平均输入/输出 token 比
- 各功能的 cost-per-request
# Grafana Dashboard as Code (部分)
panels:
- title: "月度成本进度"
type: gauge
targets:
- expr: |
sum(claude_usage_cost_usd{period="current_month"})
/ claude_budget_monthly_usd * 100
fieldConfig:
thresholds:
- value: 80
color: yellow
- value: 95
color: red
- title: "各团队 Token 消耗(今日)"
type: bargauge
targets:
- expr: |
sum by (team_id) (
increase(claude_input_tokens_total[1d])
+ increase(claude_output_tokens_total[1d])
)
65.8 最佳实践与常见陷阱
最佳实践
1. 从第一天就建立成本追踪习惯 不要等到账单超支才开始关注。即使在原型阶段,也应该记录每次请求的 token 用量。
2. 为每个请求打上业务标签 团队、功能、环境(dev/staging/prod)、用户 ID(匿名化后)。标签越丰富,后续的成本归因越精确。
3. 设置多层次告警,不只是最终超支告警 建议在 50%、75%、90%、100% 处各设置一次告警,让团队有足够的响应窗口。
4. 区分 dev 和 prod 预算 开发环境的用量往往很不规律,混入生产数据会扭曲成本分析。
5. 定期做成本复盘 每月召集各团队做成本回顾,分享降本最佳实践,建立成本优化文化。
常见陷阱
陷阱 1:只看总量,不看效率 某个功能的 token 总量高不代表有问题,关键是 cost-per-value 是否合理。
陷阱 2:过度限制反而影响生产力 预算管控太严格,频繁触发限额,会导致工程师绕过监控系统(如使用个人 API Key)。
陷阱 3:忽略缓存命中率 Prompt 缓存可以大幅降低成本,但需要主动设计和监控,否则其优化空间会被浪费。
陷阱 4:估算偏差累积 如果 token 估算系统性偏低,预扣费机制会持续低估实际消耗,导致月末突然超支。
小结
企业级 token 预算管理是一个涉及监控基础设施、成本归因逻辑、预算控制机制和组织流程的系统性工程。核心原则是:让每一分 AI 成本都能被追踪、归因和优化。
从 Anthropic Admin API 获取权威账单数据,在应用层自建细粒度监控,通过 Redis 实现实时预算管控,最终以 Grafana 看板实现全面可见性——这四个层次共同构成了企业级 Claude 成本管理体系的基础。随着 AI 使用规模的扩大,这套体系将成为控制 AI TCO(总拥有成本)的核心工具。