第 32 章
Environments API:容器化执行环境配置与持久化工作空间
第三十二章:Claude.ai API 集成:通过官方接口扩展 Managed Agent 能力
32.1 从 Claude.ai 到 API 的能力迁移
Claude.ai 的 Managed Agents 提供了极低的上手门槛,但它有一条清晰的能力边界:你只能使用平台提供的工具,无法接入自己的数据系统,也无法自动化大批量任务。
当用户需要突破这条边界时,有两条路:
- 完全迁移到 API:放弃 Claude.ai 界面,通过 Anthropic API 自建所有功能
- 混合架构:保留 Claude.ai 的托管便利性,同时通过 API 扩展关键能力
本章重点介绍混合架构模式,以及如何将 Claude API 的全部能力整合到企业级 Agent 系统中。
API 能力全景
Anthropic Claude API 核心能力
消息与对话
├── 基础文本生成(Messages API)
├── 流式输出(Streaming)
├── 多轮对话管理
└── 系统提示控制
工具调用(Tool Use)
├── 函数调用(Function Calling)
├── 并行工具调用
├── 工具结果反馈
└── 强制工具使用(tool_choice)
多模态
├── 图像输入(Vision)
├── 文档处理(PDF)
└── 计算机控制(Computer Use)
高级功能
├── 扩展上下文(200K tokens)
├── Batch API(异步批量处理)
├── 缓存提示词(Prompt Caching)
└── 模型选择(Opus/Sonnet/Haiku)
32.2 Anthropic Python SDK 深度使用
安装与初始化
pip install anthropic
import anthropic
import os
# Option 1: environment variable (recommended).
client = anthropic.Anthropic()  # the SDK reads ANTHROPIC_API_KEY automatically
# Option 2: pass the key explicitly.
client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
# Option 3: production-style configuration (custom timeout and retries).
client = anthropic.Anthropic(
    api_key=os.getenv("ANTHROPIC_API_KEY"),
    timeout=60.0,  # per-request timeout, in seconds
    max_retries=3,  # number of automatic retries performed by the SDK itself
    default_headers={  # extra headers sent with every request (useful for tracing)
        "X-Request-Source": "my-agent-v2"
    }
)
基础调用模式
from anthropic import Anthropic
client = Anthropic()
# The most basic call: one user turn, no system prompt, no tools.
response = client.messages.create(
    model="claude-opus-4-5",
    max_tokens=1024,  # cap on the number of tokens generated for this reply
    messages=[
        {"role": "user", "content": "解释量子纠缠"}
    ]
)
# response.content is a list of content blocks; this line assumes the first
# block is a text block (true for a plain request without tools).
print(response.content[0].text)
print(f"输入 tokens: {response.usage.input_tokens}")
print(f"输出 tokens: {response.usage.output_tokens}")
print(f"停止原因: {response.stop_reason}")
流式输出
对于长回复,流式输出能显著提升用户体验:
def stream_response(prompt: str) -> str:
    """Stream Claude's reply to *prompt*, echoing each chunk as it arrives.

    Uses the module-level synchronous ``client``.

    Args:
        prompt: The user message to send.

    Returns:
        The complete generated text (all streamed chunks, in order).
    """
    chunks: list[str] = []
    with client.messages.stream(
        model="claude-opus-4-5",
        max_tokens=2048,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        for text_chunk in stream.text_stream:
            # flush so the user sees each chunk immediately.
            print(text_chunk, end="", flush=True)
            # Collect and join once at the end; repeated `str +=` is
            # quadratic in the number of chunks on non-CPython runtimes.
            chunks.append(text_chunk)
    print()  # finish the streamed line
    return "".join(chunks)
# 异步流式输出(适合 FastAPI、asyncio 场景)
import anthropic
async def async_stream_response(prompt: str, async_client: "anthropic.AsyncAnthropic | None" = None):
    """Async generator yielding Claude's reply as text chunks (e.g. for SSE).

    Args:
        prompt: The user message to send.
        async_client: Optional long-lived ``anthropic.AsyncAnthropic`` to
            reuse. When omitted, a temporary client is created for this call
            and closed when streaming finishes. Passing a shared client avoids
            building a new HTTP connection pool on every request.

    Yields:
        Text chunks in the order they are streamed.
    """
    owns_client = async_client is None
    if owns_client:
        async_client = anthropic.AsyncAnthropic()
    try:
        async with async_client.messages.stream(
            model="claude-opus-4-5",
            max_tokens=2048,
            messages=[{"role": "user", "content": prompt}],
        ) as stream:
            async for text_chunk in stream.text_stream:
                yield text_chunk
    finally:
        # Only close a client we created; a caller-supplied one stays open.
        if owns_client:
            await async_client.close()
在 FastAPI 中实现 SSE 流式接口
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic
app = FastAPI()
client = anthropic.AsyncAnthropic()


def _sse_data(payload: str) -> str:
    """Frame *payload* as one Server-Sent Event.

    In the SSE wire format a line break ends a field and a blank line ends the
    event, so a model chunk containing newlines must not be written after a
    single ``data:`` prefix. Each line gets its own ``data:`` field instead;
    SSE clients re-join consecutive data lines with ``\\n``, so the original
    text (including its newlines) is reconstructed exactly.
    """
    normalized = payload.replace("\r\n", "\n").replace("\r", "\n")
    return "".join(f"data: {line}\n" for line in normalized.split("\n")) + "\n"


@app.post("/chat/stream")
async def chat_stream(request: dict):
    """Server-Sent Events streaming endpoint.

    Expects a JSON body with ``messages`` (required) and an optional
    ``system`` string. Each streamed text chunk is sent as one SSE event,
    followed by a final ``data: [DONE]`` event.
    """
    async def generate():
        async with client.messages.stream(
            model="claude-opus-4-5",
            max_tokens=2048,
            system=request.get("system", ""),
            messages=request["messages"],
        ) as stream:
            async for text in stream.text_stream:
                yield _sse_data(text)
        # End-of-stream sentinel for the client.
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # disable nginx response buffering
        },
    )
32.3 工具调用的企业级模式
并行工具调用
Claude 可以在单次回复中同时发起多个工具调用请求(多个 tool_use 块);由客户端并行执行这些工具,即可显著减少延迟:
import anthropic
import json
import asyncio
from concurrent.futures import ThreadPoolExecutor
client = anthropic.Anthropic()
# Tool definitions: each entry has a name, a description for the model, and a
# JSON Schema (input_schema) describing the tool's arguments.
tools = [
    {
        "name": "get_user_info",
        "description": "获取用户基本信息",
        "input_schema": {
            "type": "object",
            "properties": {
                "user_id": {"type": "string", "description": "用户ID"}
            },
            "required": ["user_id"]
        }
    },
    {
        "name": "get_user_orders",
        "description": "获取用户的订单列表",
        "input_schema": {
            "type": "object",
            "properties": {
                "user_id": {"type": "string"},
                # In JSON Schema "default" is an annotation only; the tool
                # implementation must apply it when the argument is absent.
                "limit": {"type": "integer", "default": 10}
            },
            "required": ["user_id"]
        }
    },
    {
        "name": "get_user_activity",
        "description": "获取用户最近的活动记录",
        "input_schema": {
            "type": "object",
            "properties": {
                "user_id": {"type": "string"},
                "days": {"type": "integer", "default": 7}
            },
            "required": ["user_id"]
        }
    }
]
# 模拟工具执行函数
def execute_tool(tool_name: str, tool_input: dict) -> str:
    """Run a mock tool and return its result as a JSON string.

    In a real system each branch would query an actual API or database; here
    the results are canned sample data.

    Args:
        tool_name: Name of the tool Claude asked to call.
        tool_input: Parsed tool arguments (``get_user_info`` reads ``user_id``).

    Returns:
        JSON-encoded tool output, or ``{"error": "Unknown tool"}`` for an
        unrecognised tool name.
    """
    if tool_name == "get_user_info":
        return json.dumps({
            "user_id": tool_input["user_id"],
            "name": "张伟",
            # Placeholder address. The previous literal had been mangled
            # into "[email protected]" by an email-obfuscation filter.
            "email": "zhangwei@example.com",
            "plan": "Professional",
            "created_at": "2024-01-15",
        })
    if tool_name == "get_user_orders":
        return json.dumps({
            "orders": [
                {"id": "ord_001", "amount": 1299, "status": "delivered"},
                {"id": "ord_002", "amount": 299, "status": "processing"},
            ],
            "total": 2,
        })
    if tool_name == "get_user_activity":
        return json.dumps({
            "logins": 12,
            "api_calls": 4521,
            "last_active": "2025-04-27",
        })
    return json.dumps({"error": "Unknown tool"})
def _run_tool_safely(tool_use) -> tuple[str, bool]:
    """Execute one tool_use block; return (content, failed) instead of raising."""
    try:
        return execute_tool(tool_use.name, tool_use.input), False
    except Exception as exc:  # boundary: report the failure to the model
        return json.dumps({"error": f"{type(exc).__name__}: {exc}"}, ensure_ascii=False), True


def run_agent_with_parallel_tools(user_message: str, max_turns: int = 10) -> str:
    """Run a tool-using agent loop, executing each turn's tool calls in parallel.

    Uses the module-level ``client``, ``tools`` and ``execute_tool``.

    Args:
        user_message: The initial user request.
        max_turns: Maximum number of model calls. Guards against a model that
            keeps requesting tools forever.

    Returns:
        The concatenated text of the first response whose stop_reason is not
        "tool_use".

    Raises:
        RuntimeError: if Claude is still requesting tools after ``max_turns``
            model calls.
    """
    messages = [{"role": "user", "content": user_message}]
    for _ in range(max_turns):
        response = client.messages.create(
            model="claude-opus-4-5",
            max_tokens=2048,
            tools=tools,
            messages=messages,
        )
        if response.stop_reason != "tool_use":
            # A reply can contain several text blocks; return all of them.
            return "".join(
                block.text for block in response.content if block.type == "text"
            )

        # Claude may request several tools in one turn; run them concurrently
        # in a thread pool (suited to I/O-bound tools). executor.map keeps
        # results in request order.
        tool_uses = [block for block in response.content if block.type == "tool_use"]
        with ThreadPoolExecutor(max_workers=max(1, len(tool_uses))) as executor:
            outcomes = list(executor.map(_run_tool_safely, tool_uses))

        tool_results = []
        for tool_use, (content, failed) in zip(tool_uses, outcomes):
            result_block = {
                "type": "tool_result",
                "tool_use_id": tool_use.id,
                "content": content,
            }
            if failed:
                # Lets Claude see that the tool failed and react, rather than
                # the whole agent crashing.
                result_block["is_error"] = True
            tool_results.append(result_block)

        # Feed the assistant turn and the tool results back for the next turn.
        messages.append({"role": "assistant", "content": response.content})
        messages.append({"role": "user", "content": tool_results})

    raise RuntimeError(f"Agent did not finish within {max_turns} turns")
# Demo: a request that is likely to make Claude ask for several user tools at once.
result = run_agent_with_parallel_tools("分析用户 user_123 的账户状态和使用情况")
print(result)
强制工具使用(tool_choice)
# Force Claude to call one specific, named tool.
response = client.messages.create(
    model="claude-opus-4-5",
    max_tokens=1024,
    tools=tools,
    tool_choice={"type": "tool", "name": "get_user_info"},  # must call this tool
    messages=[{"role": "user", "content": "查询用户信息"}]
)
# Force Claude to call some tool (it chooses which; at least one call).
response = client.messages.create(
    model="claude-opus-4-5",
    max_tokens=1024,
    tools=tools,
    tool_choice={"type": "any"},  # must call at least one of the provided tools
    messages=[{"role": "user", "content": "帮我查数据"}]
)
32.4 Prompt Caching:大幅降低成本
对于包含大量固定内容的调用(长系统提示、固定文档),Prompt Caching 最多可将被缓存部分的输入 token 成本降低约 90%。缓存读取按基础输入价格的 10% 计费,而首次写入缓存比普通输入贵 25%:
def build_cached_request(
    static_content: str,
    user_message: str,
    system: str = "",
    *,
    model: str = "claude-opus-4-5",
    max_tokens: int = 2048,
) -> dict:
    """Build ``messages.create`` kwargs whose large static prefix is cacheable.

    The system prompt is split into two text blocks. The second one holds
    *static_content* and carries ``cache_control``, which marks the end of the
    prefix to cache. Only *user_message* should vary between calls.

    Args:
        static_content: Large, unchanging content (e.g. a long document) to cache.
        user_message: The per-call user question.
        system: Instruction text; defaults to "你是一位专业助手。" when empty.
        model: Model id to call (keyword-only).
        max_tokens: Output token cap (keyword-only).

    Returns:
        A dict suitable for ``client.messages.create(**request)``.
    """
    return {
        "model": model,
        "max_tokens": max_tokens,
        "system": [
            {"type": "text", "text": system or "你是一位专业助手。"},
            {
                "type": "text",
                "text": static_content,
                # Cache breakpoint: the prefix up to and including this block
                # is cached.
                "cache_control": {"type": "ephemeral"},
            },
        ],
        "messages": [{"role": "user", "content": user_message}],
    }
# Typical use case: Q&A over one long document.
# Placeholder content: "..." * 5000 is 15,000 characters; replace it with a real document.
long_document = "..." * 5000
# First call writes the cached prefix (cache miss). If the prefix is shorter
# than the model's minimum cacheable length, nothing is cached and both
# cache counters below report 0.
response1 = client.messages.create(
    **build_cached_request(
        static_content=long_document,
        user_message="文档中提到的主要观点是什么?"
    )
)
print(f"缓存写入: {response1.usage.cache_creation_input_tokens} tokens")
# Second call with the identical prefix reads it from the cache (cache hit),
# which is billed at a reduced rate.
response2 = client.messages.create(
    **build_cached_request(
        static_content=long_document,
        user_message="文档中有哪些数据支撑了这些观点?"
    )
)
print(f"缓存读取: {response2.usage.cache_read_input_tokens} tokens")
print(f"(缓存读取成本约为普通 token 的 10%)")
Prompt Caching 的使用条件:
- 被缓存的前缀至少需要 1024 tokens。这是多数模型的下限,部分模型要求更高,请以官方文档为准。长度不足时请求仍会成功,只是不会被缓存
- 缓存有效期约 5 分钟(每次命中会刷新)
- 缓存的内容在请求中的位置必须完全一致(包括所有前置内容)
- 支持系统提示、工具定义、和消息中的静态内容缓存
32.5 Batch API:异步批量处理
对于大批量任务(数百到数千个请求),Batch API 提供异步处理,成本降低 50%:
import anthropic
import json
client = anthropic.Anthropic()  # synchronous client used by the batch helpers in this section
def batch_process_documents(documents: list[dict]) -> str:
"""
批量处理文档
返回 batch_id,用于后续查询结果
"""
requests = []
for i, doc in enumerate(documents):
requests.append({
"custom_id": f"doc_{i}_{doc.get('id', i)}",
"params": {
"model": "claude-haiku-4-5", # Batch API 推荐用 Haiku 控制成本
"max_tokens": 512,
"messages": [{
"role": "user",
"content": f"对以下文档生成一段摘要(不超过100字):\n\n{doc['content']}"
}]
}
})
batch = client.beta.messages.batches.create(requests=requests)
print(f"Batch 已提交,ID: {batch.id}")
print(f"状态: {batch.processing_status}")
return batch.id
def check_batch_status(batch_id: str) -> dict:
    """Return a summary of a Message Batch's processing status.

    Args:
        batch_id: Id returned when the batch was created.

    Returns:
        A dict with ``id``, ``status`` (the batch's ``processing_status``) and
        ``request_counts``. The API reports canceled and expired requests in
        their own counters, so they are included in ``total`` and exposed
        separately.
    """
    batch = client.messages.batches.retrieve(batch_id)
    counts = batch.request_counts
    return {
        "id": batch.id,
        "status": batch.processing_status,
        "request_counts": {
            "total": (
                counts.processing
                + counts.succeeded
                + counts.errored
                + counts.canceled
                + counts.expired
            ),
            "succeeded": counts.succeeded,
            "errored": counts.errored,
            "processing": counts.processing,
            "canceled": counts.canceled,
            "expired": counts.expired,
        },
    }
def collect_batch_results(batch_id: str) -> dict[str, str]:
"""收集 Batch 处理结果(需要 Batch 状态为 ended)"""
results = {}
for result in client.beta.messages.batches.results(batch_id):
if result.result.type == "succeeded":
doc_id = result.custom_id
text = result.result.message.content[0].text
results[doc_id] = text
else:
results[result.custom_id] = f"ERROR: {result.result.error.type}"
return results
# 完整使用示例
import time
documents = [
    {"id": "001", "content": "人工智能(AI)是计算机科学的一个分支..."},
    {"id": "002", "content": "量子计算利用量子力学原理处理信息..."},
    # ... add more documents here
]

# Submit the batch; only its id is needed from here on.
batch_id = batch_process_documents(documents)

# Poll every 30 seconds until the batch reports "ended". This blocks the
# caller; in production run the polling from a scheduled or background job.
status = check_batch_status(batch_id)
print(f"状态: {status['status']}, 进度: {status['request_counts']}")
while status["status"] != "ended":
    time.sleep(30)
    status = check_batch_status(batch_id)
    print(f"状态: {status['status']}, 进度: {status['request_counts']}")

# Fetch the per-document results and print them.
results = collect_batch_results(batch_id)
for doc_id, summary in results.items():
    print(f"\n{doc_id}: {summary}")
32.6 企业级可靠性工程
限流与重试策略
import anthropic
from anthropic import RateLimitError, APIStatusError
import time
import random
from functools import wraps
def with_exponential_backoff(max_retries: int = 5, base_delay: float = 1.0):
"""指数退避重试装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except RateLimitError as e:
if attempt == max_retries - 1:
raise
# 从响应头读取建议的等待时间
retry_after = float(e.response.headers.get("retry-after", base_delay))
# 加入随机抖动避免惊群效应
jitter = random.uniform(0, 1)
wait = min(retry_after + jitter, 60.0)
print(f"限流,{wait:.1f}秒后重试(第{attempt+1}次)")
time.sleep(wait)
except APIStatusError as e:
if e.status_code >= 500 and attempt < max_retries - 1:
wait = base_delay * (2 ** attempt) + random.uniform(0, 1)
print(f"服务器错误 {e.status_code},{wait:.1f}秒后重试")
time.sleep(wait)
else:
raise
return wrapper
return decorator
@with_exponential_backoff(max_retries=5)
def reliable_create(client: anthropic.Anthropic, **kwargs):
    """Create a message via *client*, retried by the backoff decorator (5 attempts).

    All keyword arguments are forwarded unchanged to ``client.messages.create``.
    """
    create = client.messages.create
    return create(**kwargs)
Token 预算控制
class TokenBudgetManager:
    """Track token usage against a daily budget to prevent overspending.

    The counter resets once more than 24 hours (86400 s) have elapsed since
    the last reset. This is a rolling window anchored at construction time,
    not calendar midnight. No locking is done, so concurrent use is not
    synchronised.
    """

    def __init__(self, daily_budget: int = 1_000_000):
        self.daily_budget = daily_budget
        self.used_today = 0
        self.last_reset = time.time()

    def _maybe_reset(self):
        """Zero the counter if 24 hours have passed since the last reset."""
        if time.time() - self.last_reset > 86400:
            self.used_today = 0
            self.last_reset = time.time()

    def check_budget(self, estimated_tokens: int) -> bool:
        """Return True if *estimated_tokens* more would still fit in today's budget."""
        self._maybe_reset()
        return self.used_today + estimated_tokens <= self.daily_budget

    def record_usage(self, usage: "anthropic.types.Usage"):
        """Add the tokens reported in *usage* to today's total.

        With prompt caching, ``input_tokens`` excludes tokens written to or
        read from the cache; those come in separate (possibly None) fields and
        are counted here too, so cached calls are not under-counted.
        """
        self._maybe_reset()
        self.used_today += (
            usage.input_tokens
            + usage.output_tokens
            + (getattr(usage, "cache_creation_input_tokens", None) or 0)
            + (getattr(usage, "cache_read_input_tokens", None) or 0)
        )

    def remaining(self) -> int:
        """Return the unused part of today's budget (never negative)."""
        self._maybe_reset()
        return max(0, self.daily_budget - self.used_today)
budget_manager = TokenBudgetManager(daily_budget=500_000)


def budget_aware_call(client: anthropic.Anthropic,
                      estimated_tokens: int = 5000,
                      **kwargs) -> anthropic.types.Message:
    """Create a message only if the shared daily token budget can absorb it.

    Args:
        client: Synchronous Anthropic client.
        estimated_tokens: Expected token cost used for the pre-flight check.
        **kwargs: Forwarded unchanged to ``client.messages.create``.

    Returns:
        The created message. Its usage is recorded in ``budget_manager``.

    Raises:
        RuntimeError: if the estimate would exceed the remaining budget.
    """
    if budget_manager.check_budget(estimated_tokens):
        message = client.messages.create(**kwargs)
        budget_manager.record_usage(message.usage)
        return message
    raise RuntimeError(
        "Daily token budget exceeded. "
        f"Remaining: {budget_manager.remaining()} tokens"
    )
多模型路由
from enum import Enum
class ModelTier(Enum):
    """Claude model ids, from fastest/cheapest to most capable."""

    FAST = "claude-haiku-4-5"  # quick and cheap: simple tasks
    BALANCED = "claude-sonnet-4-5"  # balance of capability and cost
    POWERFUL = "claude-opus-4-5"  # strongest model: complex tasks


def route_to_model(
    task_complexity: str,
    requires_reasoning: bool = False,
    max_cost_per_call: float = 0.10  # USD
) -> str:
    """Choose a model id from a task's complexity label.

    Rules:
        * reasoning required, or complexity "complex" -> POWERFUL
        * complexity "simple" (no reasoning)          -> FAST
        * anything else                               -> BALANCED

    Note: ``max_cost_per_call`` is accepted but not currently consulted.
    """
    if requires_reasoning or task_complexity == "complex":
        return ModelTier.POWERFUL.value
    if task_complexity == "simple":
        return ModelTier.FAST.value
    return ModelTier.BALANCED.value
class AdaptiveAgent:
    """Agent that picks a model per message via keyword-based complexity detection."""

    COMPLEXITY_INDICATORS = {
        "complex": ["分析", "设计", "优化", "比较", "推理", "策略", "架构"],
        "simple": ["翻译", "摘要", "格式化", "分类", "提取"]
    }

    def __init__(self):
        self.client = anthropic.Anthropic()

    def _detect_complexity(self, message: str) -> str:
        """Classify *message* as "complex", "simple" or "medium" by substring match.

        "complex" keywords are checked first, so a message that matches both
        lists is classified as "complex".
        """
        for label in ("complex", "simple"):
            keywords = self.COMPLEXITY_INDICATORS[label]
            if any(keyword in message for keyword in keywords):
                return label
        return "medium"

    def chat(self, message: str) -> str:
        """Answer *message* with the routed model and log which model was used."""
        complexity = self._detect_complexity(message)
        chosen_model = route_to_model(complexity)
        reply = self.client.messages.create(
            model=chosen_model,
            max_tokens=2048,
            messages=[{"role": "user", "content": message}],
        )
        print(f"[Model Router] 使用 {chosen_model}(复杂度: {complexity})")
        first_block = reply.content[0]
        return first_block.text
32.7 监控与可观测性
import time
import logging
from dataclasses import dataclass, field
from typing import Optional
logger = logging.getLogger(__name__)


@dataclass
class APICallMetrics:
    """Measurements captured for a single Claude API call."""

    # Required measurements.
    model: str
    input_tokens: int
    output_tokens: int
    latency_ms: float
    success: bool
    # Optional context about the call.
    error_type: Optional[str] = field(default=None)
    user_id: Optional[str] = field(default=None)
    task_type: Optional[str] = field(default=None)
class MetricsCollector:
    """Collect per-call API metrics and emit one structured log record per call."""

    # USD list prices per million tokens, as (input, output). These are the
    # published prices for the 4.5 generation at the time of writing
    # (Opus 4.5 $5/$25, Sonnet 4.5 $3/$15, Haiku 4.5 $1/$5); re-check the
    # pricing page before relying on them. Prompt-cache reads and writes are
    # not modelled, so estimates for cached calls are approximate.
    PRICING_PER_MTOK = {
        "claude-opus-4-5": (5.0, 25.0),
        "claude-sonnet-4-5": (3.0, 15.0),
        "claude-haiku-4-5": (1.0, 5.0),
    }
    # Fallback for unrecognised model ids ($10 in / $50 out per MTok).
    DEFAULT_PRICING_PER_MTOK = (10.0, 50.0)

    def __init__(self):
        self.calls: list[APICallMetrics] = []

    def record(self, metrics: "APICallMetrics"):
        """Append *metrics* to ``self.calls`` and log it with an estimated cost.

        The record's ``extra`` carries every field of the call, including the
        optional error/user/task context, for structured log handlers.
        """
        self.calls.append(metrics)
        logger.info(
            "claude_api_call",
            extra={
                "model": metrics.model,
                "input_tokens": metrics.input_tokens,
                "output_tokens": metrics.output_tokens,
                "latency_ms": metrics.latency_ms,
                "success": metrics.success,
                "error_type": metrics.error_type,
                "user_id": metrics.user_id,
                "task_type": metrics.task_type,
                "cost_usd": self._estimate_cost(metrics),
            },
        )

    def _estimate_cost(self, m: "APICallMetrics") -> float:
        """Return a rough USD cost estimate for one call from its token counts."""
        input_price, output_price = self.PRICING_PER_MTOK.get(
            m.model, self.DEFAULT_PRICING_PER_MTOK
        )
        return (m.input_tokens * input_price + m.output_tokens * output_price) / 1_000_000
collector = MetricsCollector()


def monitored_call(client: anthropic.Anthropic,
                   user_id: str = "", task_type: str = "", **kwargs):
    """Call ``client.messages.create(**kwargs)`` and record metrics for it.

    Metrics go to the module-level ``collector`` for successful and failed
    calls alike. A failed call is recorded with zero token counts,
    ``success=False`` and the exception's class name as ``error_type``.
    Exceptions are re-raised unchanged.

    Returns:
        The created message.
    """
    # perf_counter is monotonic, so latency is immune to wall-clock changes.
    start = time.perf_counter()
    response = None
    error_type = None
    try:
        response = client.messages.create(**kwargs)
        return response
    except Exception as e:
        error_type = type(e).__name__
        raise
    finally:
        latency_ms = (time.perf_counter() - start) * 1000
        usage = response.usage if response is not None else None
        collector.record(APICallMetrics(
            model=kwargs.get("model", "unknown"),
            input_tokens=usage.input_tokens if usage is not None else 0,
            output_tokens=usage.output_tokens if usage is not None else 0,
            latency_ms=latency_ms,
            success=response is not None,
            error_type=error_type,
            user_id=user_id,
            task_type=task_type,
        ))
小结
Claude API 提供了比 Claude.ai Managed Agents 更强大但也更需要工程投入的能力集合。两者并非互斥,而是互补的:
- Claude.ai Projects:快速启动、零运维、适合知识助手和团队协作
- Claude API:完全控制、自定义工具、企业级集成、批量处理
核心 API 能力总结:
- 流式输出:同步和异步两种模式,支持 SSE 接口
- 并行工具调用:单次回复可同时请求多个工具,由客户端并行执行以减少延迟
- Prompt Caching:长文档场景下,被缓存部分的输入 token 成本最多可降低约 90%
- Batch API:大批量异步处理,成本降低 50%
- 多模型路由:根据任务复杂度自动选择 Haiku/Sonnet/Opus
- 可靠性工程:指数退避重试、Token 预算管理、监控可观测性
掌握这些 API 能力,结合前面章节的 Memory Tool、Context Editing、RAG 和 Compaction 技术,你已经具备了构建生产级 Claude Agent 系统的完整技术栈。