第 65 章

Usage & Cost API:用量报告 / 成本追踪 / Datadog 等可观测平台集成

第六十五章:使用量监控与成本分配:团队级别的 Token 预算管理

65.1 为什么成本可见性是企业级采用的关键

当企业开始将 Claude 从试点项目推向全面生产部署时,一个几乎必然出现的管理挑战是:谁在用,用了多少,花了多少钱?

在个人或小团队场景下,这个问题并不紧迫。每月的 API 账单一目了然,超支了调整一下用法即可。但当使用者扩展到数十个团队、数百个工程师、数十个产品线时,成本可见性就变成了一个系统性问题。

没有成本可见性的后果是显著的:

本章将系统介绍如何利用 Anthropic 提供的管理 API、自建监控基础设施,以及预算分配机制,在企业环境中实现细粒度的 token 用量追踪与成本归因。

65.2 Anthropic 管理 API:使用量数据的权威来源

管理 API 概览

Anthropic 提供了专门的管理端 API(Admin API),允许具有管理员权限的账号查询组织层面的使用数据。这些 API 与标准的对话 API 使用不同的认证机制,通常需要专门的 Admin API Key。

核心使用量端点:

GET /v1/usage
GET /v1/usage/monthly
GET /v1/organizations/{org_id}/usage
GET /v1/projects/{project_id}/usage
GET /v1/workspaces/{workspace_id}/usage

一个典型的用量查询请求:

import anthropic
import os
from datetime import datetime, timedelta

client = anthropic.Anthropic(
    api_key=os.environ["ANTHROPIC_ADMIN_API_KEY"]
)

# 查询过去 30 天的使用量
end_date = datetime.now()
start_date = end_date - timedelta(days=30)

usage_data = client.admin.usage.list(
    start_time=start_date.isoformat(),
    end_time=end_date.isoformat(),
    granularity="daily",
    group_by=["workspace", "model"]
)

for record in usage_data.data:
    print(f"日期: {record.date}")
    print(f"工作区: {record.workspace_id}")
    print(f"模型: {record.model}")
    print(f"输入 tokens: {record.input_tokens:,}")
    print(f"输出 tokens: {record.output_tokens:,}")
    print(f"请求次数: {record.request_count:,}")
    print("---")

工作区与项目层级

Anthropic 的账户结构支持多层级组织:

组织 (Organization)
  └── 工作区 (Workspace)
        └── API Keys
        └── 项目 (Project) [部分层级支持]

最佳实践是按业务单元或团队创建独立工作区,并为每个工作区分配独立的 API Key 集合。这样可以在查询使用量时直接按工作区维度聚合,无需额外的 key-to-team 映射逻辑。

# 按工作区查询使用量并生成报告
def generate_workspace_cost_report(month: str) -> dict:
    """
    month: 格式 "2024-03"
    返回每个工作区的成本明细
    """
    workspace_costs = {}
    
    # 定义各模型单价(美元/百万 tokens)
    pricing = {
        "claude-opus-4-5": {"input": 15.0, "output": 75.0},
        "claude-sonnet-4-5": {"input": 3.0, "output": 15.0},
        "claude-haiku-3-5": {"input": 0.8, "output": 4.0},
    }
    
    workspaces = client.admin.workspaces.list()
    
    for workspace in workspaces.data:
        usage = client.admin.usage.list(
            workspace_id=workspace.id,
            start_time=f"{month}-01T00:00:00Z",
            end_time=f"{month}-31T23:59:59Z",
            granularity="monthly"
        )
        
        total_cost = 0.0
        model_breakdown = {}
        
        for record in usage.data:
            model = record.model
            if model in pricing:
                input_cost = (record.input_tokens / 1_000_000) * pricing[model]["input"]
                output_cost = (record.output_tokens / 1_000_000) * pricing[model]["output"]
                cost = input_cost + output_cost
                
                model_breakdown[model] = {
                    "input_tokens": record.input_tokens,
                    "output_tokens": record.output_tokens,
                    "cost_usd": round(cost, 4)
                }
                total_cost += cost
        
        workspace_costs[workspace.name] = {
            "total_cost_usd": round(total_cost, 2),
            "by_model": model_breakdown
        }
    
    return workspace_costs

65.3 自建监控基础设施:超越 API 原生能力

为什么需要自建监控

Anthropic 管理 API 提供的是账单级别的汇总数据,粒度通常到天、工作区和模型维度。但企业往往需要更细粒度的追踪:

这些需求需要在应用层自建监控逻辑。

中间件拦截模式

最优雅的自建监控方案是在 API 调用链路中插入中间件,在不修改业务代码的前提下捕获每次请求的 token 使用量:

import time
import uuid
from functools import wraps
from dataclasses import dataclass, field
from typing import Optional, Callable
import anthropic


@dataclass
class UsageRecord:
    request_id: str
    timestamp: float
    team_id: str
    feature_name: str
    model: str
    input_tokens: int
    output_tokens: int
    cache_read_tokens: int = 0
    cache_write_tokens: int = 0
    latency_ms: float = 0.0
    success: bool = True
    error_code: Optional[str] = None


class ClaudeMonitoringWrapper:
    """对 Anthropic 客户端的监控包装器"""
    
    def __init__(
        self,
        client: anthropic.Anthropic,
        team_id: str,
        metrics_sink: Callable[[UsageRecord], None]
    ):
        self.client = client
        self.team_id = team_id
        self.metrics_sink = metrics_sink
    
    def messages_create(
        self,
        feature_name: str,
        **kwargs
    ):
        request_id = str(uuid.uuid4())
        start_time = time.time()
        
        try:
            response = self.client.messages.create(**kwargs)
            latency_ms = (time.time() - start_time) * 1000
            
            usage = response.usage
            record = UsageRecord(
                request_id=request_id,
                timestamp=start_time,
                team_id=self.team_id,
                feature_name=feature_name,
                model=response.model,
                input_tokens=usage.input_tokens,
                output_tokens=usage.output_tokens,
                cache_read_tokens=getattr(usage, 'cache_read_input_tokens', 0),
                cache_write_tokens=getattr(usage, 'cache_creation_input_tokens', 0),
                latency_ms=latency_ms,
                success=True
            )
            self.metrics_sink(record)
            return response
            
        except anthropic.APIError as e:
            latency_ms = (time.time() - start_time) * 1000
            record = UsageRecord(
                request_id=request_id,
                timestamp=start_time,
                team_id=self.team_id,
                feature_name=feature_name,
                model=kwargs.get("model", "unknown"),
                input_tokens=0,
                output_tokens=0,
                latency_ms=latency_ms,
                success=False,
                error_code=str(e.status_code)
            )
            self.metrics_sink(record)
            raise

指标存储与查询

对于中小规模部署,推荐使用 ClickHouse 存储 token 使用记录,因为它对时序聚合查询有极高的性能:

-- ClickHouse 建表
CREATE TABLE claude_usage (
    request_id UUID,
    timestamp DateTime64(3),
    team_id LowCardinality(String),
    feature_name LowCardinality(String),
    model LowCardinality(String),
    input_tokens UInt32,
    output_tokens UInt32,
    cache_read_tokens UInt32,
    cache_write_tokens UInt32,
    latency_ms Float32,
    success UInt8,
    error_code Nullable(String)
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (team_id, timestamp);

-- 查询本月各团队成本
SELECT
    team_id,
    feature_name,
    sum(input_tokens) as total_input,
    sum(output_tokens) as total_output,
    -- claude-sonnet-4-5 价格
    round(sum(input_tokens) / 1e6 * 3.0 + sum(output_tokens) / 1e6 * 15.0, 2) as cost_usd
FROM claude_usage
WHERE timestamp >= toStartOfMonth(now())
  AND model = 'claude-sonnet-4-5'
GROUP BY team_id, feature_name
ORDER BY cost_usd DESC;

65.4 Token 预算管理:从监控到控制

分级预算模型

企业级 token 预算应支持多层次的配额体系:

组织级预算 (月度总预算)
  ├── 团队 A 预算 (月度分配)
  │     ├── 功能 A1 每日限额
  │     └── 功能 A2 每日限额
  ├── 团队 B 预算
  └── 团队 C 预算(弹性超配,但触发告警)

以下是基于 Redis 实现的轻量级预算管控系统:

import redis
import json
from datetime import datetime
from typing import Tuple

class TokenBudgetManager:
    """
    基于 Redis 的 Token 预算管控器
    支持:月度/日度预算、硬限制/软告警、实时消耗追踪
    """
    
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
    
    def set_budget(
        self,
        team_id: str,
        monthly_tokens: int,
        daily_tokens: Optional[int] = None,
        alert_threshold: float = 0.8
    ):
        """设置团队预算"""
        budget = {
            "monthly_tokens": monthly_tokens,
            "daily_tokens": daily_tokens or (monthly_tokens // 30),
            "alert_threshold": alert_threshold
        }
        self.redis.hset(f"budget:config:{team_id}", mapping=budget)
    
    def check_and_consume(
        self,
        team_id: str,
        estimated_tokens: int
    ) -> Tuple[bool, str]:
        """
        检查预算并预扣费
        返回: (is_allowed, reason)
        """
        now = datetime.now()
        month_key = f"budget:usage:{team_id}:{now.strftime('%Y-%m')}"
        day_key = f"budget:usage:{team_id}:{now.strftime('%Y-%m-%d')}"
        
        # 获取预算配置
        config = self.redis.hgetall(f"budget:config:{team_id}")
        if not config:
            return True, "no_budget_configured"
        
        monthly_limit = int(config[b"monthly_tokens"])
        daily_limit = int(config[b"daily_tokens"])
        
        # 检查月度预算
        current_month = int(self.redis.get(month_key) or 0)
        if current_month + estimated_tokens > monthly_limit:
            return False, f"monthly_budget_exceeded: {current_month}/{monthly_limit}"
        
        # 检查日度预算
        current_day = int(self.redis.get(day_key) or 0)
        if current_day + estimated_tokens > daily_limit:
            return False, f"daily_budget_exceeded: {current_day}/{daily_limit}"
        
        # 预扣费(使用 pipeline 保证原子性)
        pipe = self.redis.pipeline()
        pipe.incrby(month_key, estimated_tokens)
        pipe.expire(month_key, 35 * 24 * 3600)  # 35天过期
        pipe.incrby(day_key, estimated_tokens)
        pipe.expire(day_key, 25 * 3600)  # 25小时过期
        pipe.execute()
        
        # 检查告警阈值
        alert_threshold = float(config.get(b"alert_threshold", 0.8))
        if (current_month + estimated_tokens) / monthly_limit > alert_threshold:
            self._trigger_alert(team_id, "monthly", current_month, monthly_limit)
        
        return True, "ok"
    
    def reconcile_actual_usage(
        self,
        team_id: str,
        estimated_tokens: int,
        actual_tokens: int
    ):
        """请求完成后,用实际用量修正预扣费"""
        diff = actual_tokens - estimated_tokens
        if diff == 0:
            return
            
        now = datetime.now()
        month_key = f"budget:usage:{team_id}:{now.strftime('%Y-%m')}"
        day_key = f"budget:usage:{team_id}:{now.strftime('%Y-%m-%d')}"
        
        pipe = self.redis.pipeline()
        pipe.incrby(month_key, diff)
        pipe.incrby(day_key, diff)
        pipe.execute()
    
    def _trigger_alert(self, team_id: str, period: str, used: int, limit: int):
        """触发预算告警(集成 Slack/PagerDuty 等)"""
        # 实现告警发送逻辑
        pass

Prompt 层面的 Token 估算

在请求发出之前预估 token 用量,是实现预扣费机制的前提。由于 Claude 使用与 GPT 相似的 BPE tokenizer,可以用 tiktoken 做近似估算:

import tiktoken

def estimate_tokens(messages: list, model: str = "claude-sonnet-4-5") -> int:
    """
    估算 token 用量(输入部分)
    注意:这是近似值,实际值会有 ±10% 的偏差
    """
    # 使用 cl100k_base 作为近似 tokenizer
    enc = tiktoken.get_encoding("cl100k_base")
    
    total_tokens = 0
    for msg in messages:
        # 消息结构开销
        total_tokens += 4
        
        if isinstance(msg.get("content"), str):
            total_tokens += len(enc.encode(msg["content"]))
        elif isinstance(msg.get("content"), list):
            for block in msg["content"]:
                if block.get("type") == "text":
                    total_tokens += len(enc.encode(block["text"]))
    
    # 添加 15% 的安全余量
    return int(total_tokens * 1.15)

65.5 成本分配报告:与财务系统集成

成本分配的三种模式

企业通常采用以下三种模式之一来分配 AI 使用成本:

1. 直接归因模式(Showback) 各团队看到自己的实际用量和对应成本,但成本仍由中央预算承担。用于提高成本意识,不涉及实际资金转移。

2. 内部转账模式(Chargeback) 按照实际使用量将成本从中央 IT 预算转移到业务单元预算。需要与财务系统(如 SAP、Oracle)集成。

3. 配额制模式(Budget Allocation) 每个团队获得固定的 token 配额,超出后需要申请额外配额或自动降级到更便宜的模型。

自动化月度报告

import pandas as pd
from jinja2 import Template

def generate_monthly_cost_report(month: str) -> str:
    """
    生成月度成本分配报告 HTML
    """
    # 从 ClickHouse 获取数据
    query = f"""
    SELECT
        team_id,
        feature_name,
        model,
        sum(input_tokens) as input_tokens,
        sum(output_tokens) as output_tokens,
        countIf(success = 1) as success_requests,
        countIf(success = 0) as error_requests,
        avg(latency_ms) as avg_latency_ms
    FROM claude_usage
    WHERE toYYYYMM(timestamp) = toYYYYMM(toDate('{month}-01'))
    GROUP BY team_id, feature_name, model
    ORDER BY team_id, feature_name
    """
    
    df = pd.read_sql(query, clickhouse_conn)
    
    # 计算成本
    pricing = {
        "claude-opus-4-5": (15.0, 75.0),
        "claude-sonnet-4-5": (3.0, 15.0),
        "claude-haiku-3-5": (0.8, 4.0),
    }
    
    def calc_cost(row):
        in_price, out_price = pricing.get(row["model"], (3.0, 15.0))
        return (row["input_tokens"] / 1e6 * in_price +
                row["output_tokens"] / 1e6 * out_price)
    
    df["cost_usd"] = df.apply(calc_cost, axis=1)
    
    # 按团队汇总
    team_summary = df.groupby("team_id").agg(
        total_cost=("cost_usd", "sum"),
        total_requests=("success_requests", "sum"),
        total_errors=("error_requests", "sum")
    ).reset_index()
    
    return render_report_template(df, team_summary, month)

65.6 模型路由优化:降低成本的工程手段

基于复杂度的模型选择

并非所有请求都需要最强的模型。通过智能路由,可以在不牺牲质量的前提下显著降低成本:

class ModelRouter:
    """
    基于请求复杂度的智能模型路由器
    简单任务 → Haiku (成本最低)
    中等任务 → Sonnet (平衡)
    复杂任务 → Opus (最强)
    """
    
    def select_model(self, request_context: dict) -> str:
        complexity = self._estimate_complexity(request_context)
        
        if complexity < 0.3:
            return "claude-haiku-3-5"
        elif complexity < 0.7:
            return "claude-sonnet-4-5"
        else:
            return "claude-opus-4-5"
    
    def _estimate_complexity(self, ctx: dict) -> float:
        score = 0.0
        
        # 输入长度
        total_chars = sum(len(m.get("content", "")) for m in ctx.get("messages", []))
        if total_chars > 10000:
            score += 0.3
        elif total_chars > 3000:
            score += 0.15
        
        # 任务类型
        task_type = ctx.get("task_type", "")
        complex_tasks = {"code_review", "architecture_design", "legal_analysis"}
        simple_tasks = {"translation", "summarization", "classification"}
        
        if task_type in complex_tasks:
            score += 0.4
        elif task_type in simple_tasks:
            score += 0.0
        else:
            score += 0.2
        
        # 工具使用
        if ctx.get("tools"):
            score += 0.2
        
        return min(score, 1.0)

Prompt 缓存的成本收益分析

对于包含大量重复前缀(如系统提示、长文档上下文)的请求,Claude 的 Prompt Cache 功能可以将输入 token 成本降低最高 90%:

def analyze_cache_opportunity(prompt_logs: list) -> dict:
    """
    分析历史请求中的缓存潜力
    """
    from collections import Counter
    
    # 提取系统提示前缀
    system_prefixes = Counter()
    for log in prompt_logs:
        system = log.get("system", "")
        if len(system) > 500:
            # 取前 2000 字符作为前缀指纹
            prefix = system[:2000]
            system_prefixes[prefix] += 1
    
    cacheable_requests = sum(
        count for prefix, count in system_prefixes.items()
        if count > 5  # 出现超过 5 次才有缓存价值
    )
    
    cache_rate = cacheable_requests / len(prompt_logs)
    estimated_savings = cache_rate * 0.9  # 缓存命中节省 90% 输入成本
    
    return {
        "cacheable_request_rate": round(cache_rate, 2),
        "estimated_cost_reduction": f"{estimated_savings:.0%}",
        "top_cacheable_prefixes": [
            prefix[:100] + "..." 
            for prefix, _ in system_prefixes.most_common(3)
        ]
    }

65.7 Grafana 监控看板设计

关键指标体系

企业级 Claude 使用量监控看板应涵盖以下指标维度:

成本维度

性能维度

效率维度

# Grafana Dashboard as Code (部分)
panels:
  - title: "月度成本进度"
    type: gauge
    targets:
      - expr: |
          sum(claude_usage_cost_usd{period="current_month"}) 
          / claude_budget_monthly_usd * 100
    fieldConfig:
      thresholds:
        - value: 80
          color: yellow
        - value: 95
          color: red

  - title: "各团队 Token 消耗(今日)"
    type: bargauge
    targets:
      - expr: |
          sum by (team_id) (
            increase(claude_input_tokens_total[1d])
            + increase(claude_output_tokens_total[1d])
          )

65.8 最佳实践与常见陷阱

最佳实践

1. 从第一天就建立成本追踪习惯 不要等到账单超支才开始关注。即使在原型阶段,也应该记录每次请求的 token 用量。

2. 为每个请求打上业务标签 团队、功能、环境(dev/staging/prod)、用户 ID(匿名化后)。标签越丰富,后续的成本归因越精确。

3. 设置多层次告警,不只是最终超支告警 建议在 50%、75%、90%、100% 处各设置一次告警,让团队有足够的响应窗口。

4. 区分 dev 和 prod 预算 开发环境的用量往往很不规律,混入生产数据会扭曲成本分析。

5. 定期做成本复盘 每月召集各团队做成本回顾,分享降本最佳实践,建立成本优化文化。

常见陷阱

陷阱 1:只看总量,不看效率 某个功能的 token 总量高不代表有问题,关键是 cost-per-value 是否合理。

陷阱 2:过度限制反而影响生产力 预算管控太严格,频繁触发限额,会导致工程师绕过监控系统(如使用个人 API Key)。

陷阱 3:忽略缓存命中率 Prompt 缓存可以大幅降低成本,但需要主动设计和监控,否则其优化空间会被浪费。

陷阱 4:估算偏差累积 如果 token 估算系统性偏低,预扣费机制会持续低估实际消耗,导致月末突然超支。


小结

企业级 token 预算管理是一个涉及监控基础设施、成本归因逻辑、预算控制机制和组织流程的系统性工程。核心原则是:让每一分 AI 成本都能被追踪、归因和优化

从 Anthropic Admin API 获取权威账单数据,在应用层自建细粒度监控,通过 Redis 实现实时预算管控,最终以 Grafana 看板实现全面可见性——这四个层次共同构成了企业级 Claude 成本管理体系的基础。随着 AI 使用规模的扩大,这套体系将成为控制 AI TCO(总拥有成本)的核心工具。

本章评分
4.9  / 5  (3 评分)

💬 留言讨论