第 32 章

Environments API:容器化执行环境配置与持久化工作空间

第三十二章:Claude.ai API 集成:通过官方接口扩展 Managed Agent 能力

32.1 从 Claude.ai 到 API 的能力迁移

Claude.ai 的 Managed Agents 提供了极低的上手门槛,但它有一条清晰的能力边界:你只能使用平台提供的工具,无法接入自己的数据系统,也无法自动化大批量任务。

当用户需要突破这条边界时,有两条路:

  1. 完全迁移到 API:放弃 Claude.ai 界面,通过 Anthropic API 自建所有功能
  2. 混合架构:保留 Claude.ai 的托管便利性,同时通过 API 扩展关键能力

本章重点介绍混合架构模式,以及如何将 Claude API 的全部能力整合到企业级 Agent 系统中。

API 能力全景

Anthropic Claude API 核心能力

消息与对话
├── 基础文本生成(Messages API)
├── 流式输出(Streaming)
├── 多轮对话管理
└── 系统提示控制

工具调用(Tool Use)
├── 函数调用(Function Calling)
├── 并行工具调用
├── 工具结果反馈
└── 强制工具使用(tool_choice)

多模态
├── 图像输入(Vision)
├── 文档处理(PDF)
└── 计算机控制(Computer Use)

高级功能
├── 扩展上下文(200K tokens)
├── Batch API(异步批量处理)
├── 缓存提示词(Prompt Caching)
└── 模型选择(Opus/Sonnet/Haiku)

32.2 Anthropic Python SDK 深度使用

安装与初始化

pip install anthropic
import anthropic
import os

# 方式一:环境变量(推荐)
client = anthropic.Anthropic()  # 自动读取 ANTHROPIC_API_KEY

# 方式二:显式传入
client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))

# 方式三:企业级配置(自定义超时、重试)
client = anthropic.Anthropic(
    api_key=os.getenv("ANTHROPIC_API_KEY"),
    timeout=60.0,         # 单次请求超时(秒)
    max_retries=3,        # 自动重试次数
    default_headers={     # 自定义请求头(用于追踪)
        "X-Request-Source": "my-agent-v2"
    }
)

基础调用模式

from anthropic import Anthropic

client = Anthropic()

# 最基础的调用
response = client.messages.create(
    model="claude-opus-4-5",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "解释量子纠缠"}
    ]
)

print(response.content[0].text)
print(f"输入 tokens: {response.usage.input_tokens}")
print(f"输出 tokens: {response.usage.output_tokens}")
print(f"停止原因: {response.stop_reason}")

流式输出

对于长回复,流式输出能显著提升用户体验:

def stream_response(prompt: str) -> str:
    """流式输出,边生成边打印"""
    full_text = ""
    
    with client.messages.stream(
        model="claude-opus-4-5",
        max_tokens=2048,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        for text_chunk in stream.text_stream:
            print(text_chunk, end="", flush=True)
            full_text += text_chunk
    
    print()  # 换行
    return full_text


# 异步流式输出(适合 FastAPI、asyncio 场景)
import anthropic

async def async_stream_response(prompt: str):
    """异步流式输出生成器,适用于 SSE"""
    async with anthropic.AsyncAnthropic() as async_client:
        async with async_client.messages.stream(
            model="claude-opus-4-5",
            max_tokens=2048,
            messages=[{"role": "user", "content": prompt}]
        ) as stream:
            async for text_chunk in stream.text_stream:
                yield text_chunk

在 FastAPI 中实现 SSE 流式接口

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic

app = FastAPI()
client = anthropic.AsyncAnthropic()

@app.post("/chat/stream")
async def chat_stream(request: dict):
    """Server-Sent Events 流式接口"""
    
    async def generate():
        async with client.messages.stream(
            model="claude-opus-4-5",
            max_tokens=2048,
            system=request.get("system", ""),
            messages=request["messages"]
        ) as stream:
            async for text in stream.text_stream:
                # SSE 格式:data: <content>\n\n
                yield f"data: {text}\n\n"
        
        # 发送结束信号
        yield "data: [DONE]\n\n"
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no"  # 禁用 nginx 缓冲
        }
    )

32.3 工具调用的企业级模式

并行工具调用

Claude 支持在单次回复中并行调用多个工具,显著减少延迟:

import anthropic
import json
import asyncio
from concurrent.futures import ThreadPoolExecutor

client = anthropic.Anthropic()

# 定义多个工具
tools = [
    {
        "name": "get_user_info",
        "description": "获取用户基本信息",
        "input_schema": {
            "type": "object",
            "properties": {
                "user_id": {"type": "string", "description": "用户ID"}
            },
            "required": ["user_id"]
        }
    },
    {
        "name": "get_user_orders",
        "description": "获取用户的订单列表",
        "input_schema": {
            "type": "object",
            "properties": {
                "user_id": {"type": "string"},
                "limit": {"type": "integer", "default": 10}
            },
            "required": ["user_id"]
        }
    },
    {
        "name": "get_user_activity",
        "description": "获取用户最近的活动记录",
        "input_schema": {
            "type": "object",
            "properties": {
                "user_id": {"type": "string"},
                "days": {"type": "integer", "default": 7}
            },
            "required": ["user_id"]
        }
    }
]

# 模拟工具执行函数
def execute_tool(tool_name: str, tool_input: dict) -> str:
    """执行工具调用(真实场景中调用实际 API 或数据库)"""
    if tool_name == "get_user_info":
        return json.dumps({
            "user_id": tool_input["user_id"],
            "name": "张伟",
            "email": "[email protected]",
            "plan": "Professional",
            "created_at": "2024-01-15"
        })
    elif tool_name == "get_user_orders":
        return json.dumps({
            "orders": [
                {"id": "ord_001", "amount": 1299, "status": "delivered"},
                {"id": "ord_002", "amount": 299, "status": "processing"}
            ],
            "total": 2
        })
    elif tool_name == "get_user_activity":
        return json.dumps({
            "logins": 12,
            "api_calls": 4521,
            "last_active": "2025-04-27"
        })
    return json.dumps({"error": "Unknown tool"})


def run_agent_with_parallel_tools(user_message: str) -> str:
    """支持并行工具调用的 Agent"""
    messages = [{"role": "user", "content": user_message}]
    
    while True:
        response = client.messages.create(
            model="claude-opus-4-5",
            max_tokens=2048,
            tools=tools,
            messages=messages
        )
        
        if response.stop_reason != "tool_use":
            # 提取最终文本
            return next(
                (block.text for block in response.content if hasattr(block, "text")), 
                ""
            )
        
        # 收集所有工具调用
        tool_uses = [block for block in response.content if block.type == "tool_use"]
        
        # 并行执行工具(使用线程池处理 I/O 密集型工具)
        with ThreadPoolExecutor(max_workers=len(tool_uses)) as executor:
            futures = {
                executor.submit(execute_tool, tu.name, tu.input): tu
                for tu in tool_uses
            }
            
            tool_results = []
            for future, tool_use in futures.items():
                result = future.result()
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": tool_use.id,
                    "content": result
                })
        
        # 继续对话
        messages.append({"role": "assistant", "content": response.content})
        messages.append({"role": "user", "content": tool_results})


# 测试
result = run_agent_with_parallel_tools("分析用户 user_123 的账户状态和使用情况")
print(result)

强制工具使用(tool_choice)

# 强制 Claude 使用特定工具
response = client.messages.create(
    model="claude-opus-4-5",
    max_tokens=1024,
    tools=tools,
    tool_choice={"type": "tool", "name": "get_user_info"},  # 强制使用此工具
    messages=[{"role": "user", "content": "查询用户信息"}]
)

# 强制使用任意工具(至少调用一个)
response = client.messages.create(
    model="claude-opus-4-5",
    max_tokens=1024,
    tools=tools,
    tool_choice={"type": "any"},  # 必须调用至少一个工具
    messages=[{"role": "user", "content": "帮我查数据"}]
)

32.4 Prompt Caching:大幅降低成本

对于包含大量固定内容的调用(长系统提示、固定文档),Prompt Caching 可以将成本降低 90%:

def build_cached_request(
    static_content: str,
    user_message: str,
    system: str = ""
) -> dict:
    """
    构建带缓存标记的请求
    static_content: 会被缓存的长文档内容
    """
    return {
        "model": "claude-opus-4-5",
        "max_tokens": 2048,
        "system": [
            {
                "type": "text",
                "text": system or "你是一位专业助手。"
            },
            {
                "type": "text",
                "text": static_content,
                "cache_control": {"type": "ephemeral"}  # 标记为可缓存
            }
        ],
        "messages": [
            {"role": "user", "content": user_message}
        ]
    }


# 典型场景:长文档 QA
long_document = "..." * 5000  # 假设是一个 50K 字符的文档

# 第一次调用:写入缓存(cache miss)
response1 = client.messages.create(
    **build_cached_request(
        static_content=long_document,
        user_message="文档中提到的主要观点是什么?"
    )
)
print(f"缓存写入: {response1.usage.cache_creation_input_tokens} tokens")

# 第二次调用:命中缓存(cache hit),成本大幅降低
response2 = client.messages.create(
    **build_cached_request(
        static_content=long_document,
        user_message="文档中有哪些数据支撑了这些观点?"
    )
)
print(f"缓存读取: {response2.usage.cache_read_input_tokens} tokens")
print(f"(缓存读取成本约为普通 token 的 10%)")

Prompt Caching 的使用条件:

32.5 Batch API:异步批量处理

对于大批量任务(数百到数千个请求),Batch API 提供异步处理,成本降低 50%:

import anthropic
import json

client = anthropic.Anthropic()

def batch_process_documents(documents: list[dict]) -> str:
    """
    批量处理文档
    返回 batch_id,用于后续查询结果
    """
    requests = []
    for i, doc in enumerate(documents):
        requests.append({
            "custom_id": f"doc_{i}_{doc.get('id', i)}",
            "params": {
                "model": "claude-haiku-4-5",  # Batch API 推荐用 Haiku 控制成本
                "max_tokens": 512,
                "messages": [{
                    "role": "user",
                    "content": f"对以下文档生成一段摘要(不超过100字):\n\n{doc['content']}"
                }]
            }
        })
    
    batch = client.beta.messages.batches.create(requests=requests)
    print(f"Batch 已提交,ID: {batch.id}")
    print(f"状态: {batch.processing_status}")
    return batch.id


def check_batch_status(batch_id: str) -> dict:
    """检查 Batch 处理状态"""
    batch = client.beta.messages.batches.retrieve(batch_id)
    return {
        "id": batch.id,
        "status": batch.processing_status,
        "request_counts": {
            "total": batch.request_counts.processing + 
                     batch.request_counts.succeeded + 
                     batch.request_counts.errored,
            "succeeded": batch.request_counts.succeeded,
            "errored": batch.request_counts.errored,
            "processing": batch.request_counts.processing
        }
    }


def collect_batch_results(batch_id: str) -> dict[str, str]:
    """收集 Batch 处理结果(需要 Batch 状态为 ended)"""
    results = {}
    
    for result in client.beta.messages.batches.results(batch_id):
        if result.result.type == "succeeded":
            doc_id = result.custom_id
            text = result.result.message.content[0].text
            results[doc_id] = text
        else:
            results[result.custom_id] = f"ERROR: {result.result.error.type}"
    
    return results


# 完整使用示例
import time

documents = [
    {"id": "001", "content": "人工智能(AI)是计算机科学的一个分支..."},
    {"id": "002", "content": "量子计算利用量子力学原理处理信息..."},
    # ... 更多文档
]

# 提交批量任务
batch_id = batch_process_documents(documents)

# 轮询等待完成(生产环境应使用 webhook 或定时任务)
while True:
    status = check_batch_status(batch_id)
    print(f"状态: {status['status']}, 进度: {status['request_counts']}")
    
    if status["status"] == "ended":
        break
    
    time.sleep(30)  # 每30秒检查一次

# 收集结果
results = collect_batch_results(batch_id)
for doc_id, summary in results.items():
    print(f"\n{doc_id}: {summary}")

32.6 企业级可靠性工程

限流与重试策略

import anthropic
from anthropic import RateLimitError, APIStatusError
import time
import random
from functools import wraps

def with_exponential_backoff(max_retries: int = 5, base_delay: float = 1.0):
    """指数退避重试装饰器"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except RateLimitError as e:
                    if attempt == max_retries - 1:
                        raise
                    # 从响应头读取建议的等待时间
                    retry_after = float(e.response.headers.get("retry-after", base_delay))
                    # 加入随机抖动避免惊群效应
                    jitter = random.uniform(0, 1)
                    wait = min(retry_after + jitter, 60.0)
                    print(f"限流,{wait:.1f}秒后重试(第{attempt+1}次)")
                    time.sleep(wait)
                except APIStatusError as e:
                    if e.status_code >= 500 and attempt < max_retries - 1:
                        wait = base_delay * (2 ** attempt) + random.uniform(0, 1)
                        print(f"服务器错误 {e.status_code},{wait:.1f}秒后重试")
                        time.sleep(wait)
                    else:
                        raise
        return wrapper
    return decorator


@with_exponential_backoff(max_retries=5)
def reliable_create(client: anthropic.Anthropic, **kwargs):
    """带重试的可靠 API 调用"""
    return client.messages.create(**kwargs)

Token 预算控制

class TokenBudgetManager:
    """Token 预算管理器,防止超预算"""
    
    def __init__(self, daily_budget: int = 1_000_000):
        self.daily_budget = daily_budget
        self.used_today = 0
        self.last_reset = time.time()
    
    def _maybe_reset(self):
        """每24小时重置计数"""
        if time.time() - self.last_reset > 86400:
            self.used_today = 0
            self.last_reset = time.time()
    
    def check_budget(self, estimated_tokens: int) -> bool:
        """检查是否有足够预算"""
        self._maybe_reset()
        return self.used_today + estimated_tokens <= self.daily_budget
    
    def record_usage(self, usage: anthropic.types.Usage):
        """记录实际用量"""
        self._maybe_reset()
        self.used_today += usage.input_tokens + usage.output_tokens
    
    def remaining(self) -> int:
        self._maybe_reset()
        return max(0, self.daily_budget - self.used_today)


budget_manager = TokenBudgetManager(daily_budget=500_000)

def budget_aware_call(client: anthropic.Anthropic, 
                       estimated_tokens: int = 5000,
                       **kwargs) -> anthropic.types.Message:
    """带预算检查的 API 调用"""
    if not budget_manager.check_budget(estimated_tokens):
        raise RuntimeError(
            f"Daily token budget exceeded. "
            f"Remaining: {budget_manager.remaining()} tokens"
        )
    
    response = client.messages.create(**kwargs)
    budget_manager.record_usage(response.usage)
    return response

多模型路由

from enum import Enum

class ModelTier(Enum):
    FAST = "claude-haiku-4-5"       # 快速廉价,适合简单任务
    BALANCED = "claude-sonnet-4-5"   # 平衡性能和成本
    POWERFUL = "claude-opus-4-5"     # 最强能力,用于复杂任务

def route_to_model(
    task_complexity: str,
    requires_reasoning: bool = False,
    max_cost_per_call: float = 0.10  # 美元
) -> str:
    """根据任务特征路由到合适的模型"""
    
    if task_complexity == "simple" and not requires_reasoning:
        return ModelTier.FAST.value
    elif task_complexity == "complex" or requires_reasoning:
        return ModelTier.POWERFUL.value
    else:
        return ModelTier.BALANCED.value


class AdaptiveAgent:
    """自适应模型选择的 Agent"""
    
    COMPLEXITY_INDICATORS = {
        "complex": ["分析", "设计", "优化", "比较", "推理", "策略", "架构"],
        "simple": ["翻译", "摘要", "格式化", "分类", "提取"]
    }
    
    def __init__(self):
        self.client = anthropic.Anthropic()
    
    def _detect_complexity(self, message: str) -> str:
        """简单的规则基础复杂度检测"""
        for indicator in self.COMPLEXITY_INDICATORS["complex"]:
            if indicator in message:
                return "complex"
        for indicator in self.COMPLEXITY_INDICATORS["simple"]:
            if indicator in message:
                return "simple"
        return "medium"
    
    def chat(self, message: str) -> str:
        complexity = self._detect_complexity(message)
        model = route_to_model(complexity)
        
        response = self.client.messages.create(
            model=model,
            max_tokens=2048,
            messages=[{"role": "user", "content": message}]
        )
        
        print(f"[Model Router] 使用 {model}(复杂度: {complexity})")
        return response.content[0].text

32.7 监控与可观测性

import time
import logging
from dataclasses import dataclass, field
from typing import Optional

logger = logging.getLogger(__name__)

@dataclass
class APICallMetrics:
    """单次 API 调用的指标"""
    model: str
    input_tokens: int
    output_tokens: int
    latency_ms: float
    success: bool
    error_type: Optional[str] = None
    user_id: Optional[str] = None
    task_type: Optional[str] = None

class MetricsCollector:
    """API 调用指标收集器"""
    
    def __init__(self):
        self.calls: list[APICallMetrics] = []
    
    def record(self, metrics: APICallMetrics):
        self.calls.append(metrics)
        logger.info(
            "claude_api_call",
            extra={
                "model": metrics.model,
                "input_tokens": metrics.input_tokens,
                "output_tokens": metrics.output_tokens,
                "latency_ms": metrics.latency_ms,
                "success": metrics.success,
                "cost_usd": self._estimate_cost(metrics)
            }
        )
    
    def _estimate_cost(self, m: APICallMetrics) -> float:
        """粗略估算调用成本(美元)"""
        pricing = {
            "claude-opus-4-5": (0.015, 0.075),   # (input/1K, output/1K)
            "claude-sonnet-4-5": (0.003, 0.015),
            "claude-haiku-4-5": (0.00025, 0.00125)
        }
        inp, out = pricing.get(m.model, (0.01, 0.05))
        return m.input_tokens / 1000 * inp + m.output_tokens / 1000 * out


collector = MetricsCollector()

def monitored_call(client: anthropic.Anthropic, 
                    user_id: str = "", task_type: str = "", **kwargs):
    """带监控的 API 调用"""
    start = time.time()
    success = True
    error_type = None
    response = None
    
    try:
        response = client.messages.create(**kwargs)
        return response
    except Exception as e:
        success = False
        error_type = type(e).__name__
        raise
    finally:
        latency = (time.time() - start) * 1000
        
        if response:
            collector.record(APICallMetrics(
                model=kwargs.get("model", "unknown"),
                input_tokens=response.usage.input_tokens,
                output_tokens=response.usage.output_tokens,
                latency_ms=latency,
                success=success,
                error_type=error_type,
                user_id=user_id,
                task_type=task_type
            ))

小结

Claude API 提供了比 Claude.ai Managed Agents 更强大但也更需要工程投入的能力集合。两者并非互斥,而是互补的:

核心 API 能力总结:

掌握这些 API 能力,结合前面章节的 Memory Tool、Context Editing、RAG 和 Compaction 技术,你已经具备了构建生产级 Claude Agent 系统的完整技术栈。

本章评分
4.8  / 5  (3 评分)

💬 留言讨论