第 12 章
AI API 集成——OpenAI、Claude 与本地模型调用
第12章:调用 AI API——用 Claude 和 GPT 给脚本加智能
一个能读懂邮件意图、给客服工单分类、把 200 页文档压缩成摘要的 Python 脚本,和普通脚本的差别只在于接入了 AI API。本章从 OpenAI 和 Anthropic Claude 两大主流平台出发,深入 Chat Completions、流式输出、Function Calling、长上下文处理,以及系统性的 Prompt 工程与成本控制策略。
AI API 生态概览
| 平台 | 核心模型 | 上下文 | 定价(输入/输出 per 1M tokens) | 最适合场景 |
|---|---|---|---|---|
| OpenAI | GPT-4o, GPT-4o-mini | 128K | $2.5/$10(4o)· $0.15/$0.6(mini) | 通用任务、Function Calling、代码生成 |
| Anthropic | Claude 3.5 Sonnet, Haiku | 200K | $3/$15(Sonnet)· $0.25/$1.25(Haiku) | 长文档分析、复杂推理、遵循指令 |
| Gemini 1.5 Pro/Flash | 1M | $3.5/$10.5(Pro)· $0.075/$0.3(Flash) | 超长上下文、多模态 | |
| 国内(阿里/百度/字节) | Qwen、ERNIE、Doubao | 128K | 约 ¥1-8 / 1M tokens | 中文优化、国内合规、低成本 |
**选型建议:**高频低复杂度任务用 GPT-4o-mini;长文档用 Claude 3.5 Sonnet;成本敏感型项目选 Gemini Flash(价格低 10-20 倍);中文优先场景选国内 API。
OpenAI API 精讲
terminal
pip install openai python-dotenv tiktoken
API Key 存入 .env 文件,加入 .gitignore,永远不要硬编码:
config.py
import os
from dotenv import load_dotenv
from openai import OpenAI
load_dotenv()
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
Chat Completions——分类器完整案例
temperature=0 保证分类一致性;max_tokens=50 控制成本;system 设定角色规范。
classify.py — 客服工单自动分类器
from config import client
def classify_text(text: str, categories: list[str]) -> str:
response = client.chat.completions.create(
model="gpt-4o-mini",
temperature=0, # 分类任务:确定性输出
max_tokens=50,
messages=[
{"role": "system",
"content": f"将输入分类到:{', '.join(categories)}。只输出类别名,不要解释。"},
{"role": "user", "content": text}
]
)
return response.choices[0].message.content.strip()
# 测试
cats = ["技术支持", "账单问题", "退款申请", "产品咨询", "投诉建议"]
for t in ["账户被扣了两次钱", "软件安装后一直报错", "你们产品有什么优势?"]:
print(f"[{classify_text(t, cats)}] {t}")
流式输出与 Function Calling
stream_and_tools.py
import json
from config import client
# 流式输出
def stream_generate(prompt: str, system: str = "") -> str:
msgs = ([{"role":"system","content":system}] if system else []) + [{"role":"user","content":prompt}]
full = ""
with client.chat.completions.create(model="gpt-4o", stream=True, messages=msgs) as s:
for chunk in s:
d = chunk.choices[0].delta.content or ""
print(d, end="", flush=True); full += d
print(); return full
# Function Calling — 客服 Agent 示例
tools = [{"type":"function","function":{"name":"update_ticket","description":"更新工单状态",
"parameters":{"type":"object","properties":{
"ticket_id":{"type":"string"},"status":{"type":"string","enum":["open","processing","resolved","closed"]},
"note":{"type":"string"}},"required":["ticket_id","status"]}}}]
def update_ticket(ticket_id: str, status: str, note: str = "") -> dict:
print(f"[系统] 工单 {ticket_id} return {"success": True}
def run_agent(user_msg: str):
msgs = [{"role":"system","content":"你是客服工单助手。"},{"role":"user","content":user_msg}]
while True:
resp = client.chat.completions.create(model="gpt-4o", tools=tools, messages=msgs)
m = resp.choices[0].message; msgs.append(m)
if not m.tool_calls: print("AI:", m.content); break
for tc in m.tool_calls:
result = update_ticket(**json.loads(tc.function.arguments))
msgs.append({"role":"tool","tool_call_id":tc.id,"content":json.dumps(result, ensure_ascii=False)})
run_agent("查一下工单 TK-001 的情况,如果是登录问题就标记为 processing")
Anthropic Claude API——长文档摘要生成器
Claude 的 200K tokens 上下文约等于 150 万汉字,可一次放入整本书或几十份合同。
pip install anthropic
import os, anthropic
from pathlib import Path
from dotenv import load_dotenv
load_dotenv()
claude = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
SYSTEM_PROMPTS = {
"executive": "资深商业分析师。输出:①核心结论(3句内)②关键数据5条以内③风险点④建议行动项。用要点不用段落。",
"legal": "法律顾问助理。提取:①主要条款②双方权利义务③违约赔偿标准④需法律审核的风险条款。",
"technical": "技术文档专家。总结:①架构方案概述②关键技术指标③依赖项和版本④已知问题和限制。"
}
def summarize_document(file_path: str, summary_type: str = "executive") -> dict:
text = Path(file_path).read_text(encoding="utf-8")
msg = claude.messages.create(
model="claude-3-5-sonnet-20241022", max_tokens=2000,
system=SYSTEM_PROMPTS.get(summary_type, SYSTEM_PROMPTS["executive"]),
messages=[{"role":"user","content":f"请对以下文档生成摘要:\n\n{text}"}]
)
cost = (msg.usage.input_tokens * 3 + msg.usage.output_tokens * 15) / 1_000_000
return {"summary": msg.content[0].text, "cost_usd": round(cost, 4),
"input_tokens": msg.usage.input_tokens}
Prompt 工程最佳实践——可复用 PromptManager 类
prompt_manager.py
import json
from jinja2 import Environment, FileSystemLoader
class PromptManager:
"""Prompt 模板化管理器:分离 Prompt 与业务逻辑"""
def __init__(self, template_dir: str = "prompts"):
self.env = Environment(loader=FileSystemLoader(template_dir), trim_blocks=True)
def render(self, template_name: str, **kwargs) -> str:
return self.env.get_template(template_name).render(**kwargs)
def few_shot(self, examples: list[dict], query: str) -> list[dict]:
"""构建 few-shot 消息列表。examples: [{"input":"...","output":"..."}]"""
msgs = []
for ex in examples:
msgs += [{"role":"user","content":ex["input"]},{"role":"assistant","content":ex["output"]}]
msgs.append({"role":"user","content":query})
return msgs
# JSON 模式——结构化信息提取
def extract_email_info(text: str) -> dict:
from config import client
resp = client.chat.completions.create(
model="gpt-4o-mini", temperature=0,
response_format={"type":"json_object"},
messages=[
{"role":"system","content":'从客服邮件提取:{"intent":"退款申请|技术支持|物流查询|产品咨询|投诉|其他","urgency":"high|medium|low","requires_human":true,"key_points":[]}'},
{"role":"user","content":text}
]
)
return json.loads(resp.choices[0].message.content)
成本控制——Token 计数与磁盘缓存
cost_control.py
import hashlib, json, tiktoken
from pathlib import Path
PRICING = { # (input, output) USD per 1M tokens
"gpt-4o": (2.5, 10.0), "gpt-4o-mini": (0.15, 0.6),
"claude-3-5-sonnet-20241022": (3.0, 15.0), "claude-3-5-haiku-20241022": (0.25, 1.25),
}
def count_tokens(text: str, model: str = "gpt-4o") -> int:
return len(tiktoken.encoding_for_model(model).encode(text))
def estimate_cost(in_tok: int, out_tok: int, model: str = "gpt-4o-mini") -> float:
if model not in PRICING: return 0.0
ip, op = PRICING[model]
return round((in_tok * ip + out_tok * op) / 1_000_000, 6)
# 磁盘缓存:相同请求不重复付费
CACHE = Path(".api_cache"); CACHE.mkdir(exist_ok=True)
def disk_cache(func):
def wrapper(*args, **kwargs):
k = hashlib.md5(json.dumps({"a": args, "k": kwargs}, sort_keys=True, ensure_ascii=False).encode()).hexdigest()
cf = CACHE / f"{k}.json"
if cf.exists(): return json.loads(cf.read_text(encoding="utf-8"))
r = func(*args, **kwargs); cf.write_text(json.dumps(r, ensure_ascii=False), encoding="utf-8")
return r
return wrapper
错误处理与重试——指数退避 + 多模型降级
retry_client.py
import time, os, logging
from openai import OpenAI, RateLimitError, APIError, APIConnectionError
import anthropic
logger = logging.getLogger(__name__)
def with_retry(func, max_retries: int = 5, base_delay: float = 1.0):
"""指数退避重试:1s def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except (RateLimitError, APIConnectionError) as e:
if attempt == max_retries - 1: raise
delay = base_delay * (2 ** attempt)
logger.warning(f"重试 {attempt+1}/{max_retries},{delay:.0f}s 后...: {e}")
time.sleep(delay)
except APIError as e:
if e.status_code in (500, 502, 503, 529):
if attempt == max_retries - 1: raise
time.sleep(base_delay * (2 ** attempt))
else: raise # 4xx 不重试
return wrapper
# 多模型降级策略
FALLBACK = [("openai","gpt-4o"),("openai","gpt-4o-mini"),("anthropic","claude-3-5-haiku-20241022")]
def smart_complete(prompt: str) -> str:
"""依次尝试多个模型,任一成功即返回"""
oa = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
cl = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
for provider, model in FALLBACK:
try:
if provider == "openai":
r = oa.chat.completions.create(model=model, max_tokens=1024, messages=[{"role":"user","content":prompt}])
return r.choices[0].message.content
r = cl.messages.create(model=model, max_tokens=1024, messages=[{"role":"user","content":prompt}])
return r.content[0].text
except Exception as e:
logger.warning(f"{provider}/{model} 失败:{e}")
raise RuntimeError("所有模型均不可用")
实战项目:智能邮件分类与自动回复系统
完整流程:读取未读邮件 email_ai_system.py
"""智能邮件分类与自动回复系统"""
import json, imaplib, smtplib, email, os, logging
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from dataclasses import dataclass
from datetime import datetime
from openai import OpenAI
from dotenv import load_dotenv
load_dotenv()
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
@dataclass
class EmailRecord:
uid: str; sender: str; subject: str; body: str
@dataclass
class ProcessedEmail:
record: EmailRecord; category: str; urgency: str; reply_draft: str; auto_send: bool
CATEGORIES = {"退款申请":False,"技术支持":False,"物流查询":True,"产品咨询":True,"投诉建议":False,"其他":False}
def fetch_unread_emails(limit: int = 20) -> list[EmailRecord]:
imap = imaplib.IMAP4_SSL(os.getenv("IMAP_HOST", "imap.gmail.com"))
imap.login(os.getenv("EMAIL_USER"), os.getenv("EMAIL_PASS"))
imap.select("INBOX")
_, uid_list = imap.uid("search", None, "UNSEEN")
records = []
for uid in uid_list[0].split()[-limit:]:
_, data = imap.uid("fetch", uid, "(RFC822)")
msg = email.message_from_bytes(data[0][1])
body = ""
if msg.is_multipart():
for p in msg.walk():
if p.get_content_type() == "text/plain":
body = p.get_payload(decode=True).decode("utf-8", errors="ignore"); break
else: body = msg.get_payload(decode=True).decode("utf-8", errors="ignore")
records.append(EmailRecord(uid=uid.decode(), sender=msg["From"],
subject=msg["Subject"] or "(无主题)", body=body[:2000]))
imap.logout(); return records
def analyze_and_reply(record: EmailRecord) -> ProcessedEmail:
cats = "|".join(CATEGORIES.keys())
analysis = json.loads(client.chat.completions.create(
model="gpt-4o-mini", temperature=0, response_format={"type":"json_object"},
messages=[
{"role":"system","content":f'{"category":"{cats}","urgency":"high|medium|low","customer_emotion":"angry|neutral|satisfied","key_points":[]}'},
{"role":"user","content":f"主题:{record.subject}\n{record.body}"}
]
).choices[0].message.content)
category, urgency = analysis.get("category","其他"), analysis.get("urgency","medium")
reply = client.chat.completions.create(
model="gpt-4o-mini", temperature=0.3, max_tokens=350,
messages=[
{"role":"system","content":"专业客服代表。称呼客户,回应核心诉求,给出处理方案或时间,表示感谢。不超过130字。"},
{"role":"user","content":f"主题:{record.subject}\n内容:{record.body}\n分类:{category},情绪:{analysis.get('customer_emotion','neutral')}"}
]
).choices[0].message.content
auto_send = CATEGORIES.get(category, False) and urgency == "low"
return ProcessedEmail(record=record, category=category, urgency=urgency, reply_draft=reply, auto_send=auto_send)
def send_reply(p: ProcessedEmail):
msg = MIMEMultipart(); msg["From"] = os.getenv("EMAIL_USER"); msg["To"] = p.record.sender
msg["Subject"] = f"Re: {p.record.subject}"; msg.attach(MIMEText(p.reply_draft, "plain", "utf-8"))
with smtplib.SMTP_SSL(os.getenv("SMTP_HOST","smtp.gmail.com"), 465) as smtp:
smtp.login(os.getenv("EMAIL_USER"), os.getenv("EMAIL_PASS")); smtp.send_message(msg)
def save_archive(p: ProcessedEmail, out: str = "email_archive"):
os.makedirs(out, exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
with open(f"{out}/{ts}_{p.category}_{p.record.uid}.json","w",encoding="utf-8") as f:
json.dump({"uid":p.record.uid,"sender":p.record.sender,"category":p.category,
"urgency":p.urgency,"reply_draft":p.reply_draft,"auto_sent":p.auto_send,
"processed_at":datetime.now().isoformat()}, f, ensure_ascii=False, indent=2)
def main():
emails = fetch_unread_emails(limit=10)
auto_sent = queued = 0
for rec in emails:
try:
p = analyze_and_reply(rec)
save_archive(p)
if p.auto_send: send_reply(p); auto_sent += 1
else: queued += 1; logging.info(f"[{p.urgency.upper()}] {p.category} except Exception as e: logging.error(f"处理失败:{e}")
print(f"\n完成:{len(emails)} 封,自动回复 {auto_sent},待审核 {queued}")
if __name__ == "__main__": main()
**生产建议:**建议先运行一周"干跑模式"(只存档不发送),确认分类准确率超过 90% 再开启自动发送。退款、投诉类邮件建议始终人工审核。
上一章 下一章第13章:数据可视化自动化