Chapter 12

AI API Integration — OpenAI, Claude, and Local Models

Chapter 12: Calling AI APIs — Add Intelligence with Claude & GPT

The difference between a script that moves files and one that understands intent, classifies content, and generates human-quality replies is a single API call. This chapter covers OpenAI and Anthropic Claude end-to-end: Chat Completions, streaming, Function Calling, 200K-token long-context processing, Jinja2 prompt templates, tiktoken cost control, exponential backoff, and multi-model fallback — closing with a complete intelligent email triage system.

AI API Ecosystem

Provider Key Models Context Price (per 1M tokens in/out) Best For
OpenAI GPT-4o, GPT-4o-mini 128K $2.5/$10 · $0.15/$0.6 General tasks, Function Calling, code
Anthropic Claude 3.5 Sonnet, Haiku 200K $3/$15 · $0.25/$1.25 Long documents, complex reasoning
Google Gemini 1.5 Pro/Flash 1M $3.5/$10.5 · $0.075/$0.3 Ultra-long context, multimodal
Domestic CN Qwen, ERNIE, Doubao 128K ~¥1-8 / 1M tokens Chinese optimization, compliance

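Decision guide: High-frequency, low-complexity tasks → a small, inexpensive model such as GPT-4o-mini or Claude 3.5 Haiku; long documents and complex reasoning → Claude 3.5 Sonnet; ultra-long context or multimodal input → Gemini 1.5 Pro; Chinese-language optimization or compliance requirements → a domestic CN model.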

OpenAI API Deep Dive

terminal

pip install openai python-dotenv tiktoken

config.py

import os
from dotenv import load_dotenv
from openai import OpenAI
# Load variables from a local .env file (if present) into the environment
# before the API key is read.
load_dotenv()
# Shared OpenAI client imported by the other snippets; os.getenv returns None
# when OPENAI_API_KEY is not set.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

Chat Completions + Streaming + Function Calling

openai_patterns.py

import json
from config import client

# ── 1. Classification (temperature=0 for determinism) ──────────────
def classify_text(text: str, categories: list[str]) -> str:
    """Ask gpt-4o-mini to label *text* with one of *categories*.

    Args:
        text: The content to classify; sent as the user message.
        categories: Candidate labels, listed comma-separated in the system prompt.

    Returns:
        The model's reply with surrounding whitespace stripped. The reply is
        not checked against *categories*.
    """
    instruction = f"Classify into one of: {', '.join(categories)}. Output only the category name."
    conversation = [
        {"role": "system", "content": instruction},
        {"role": "user", "content": text},
    ]
    # temperature=0 keeps the labelling as repeatable as the API allows;
    # max_tokens=50 caps the reply since only a label is expected.
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        temperature=0,
        max_tokens=50,
        messages=conversation,
    )
    label = response.choices[0].message.content
    return label.strip()

# ── 2. Streaming ────────────────────────────────────────────────────
def stream_generate(prompt: str, system: str = "") -> str:
    """Stream a gpt-4o completion to stdout and return the full text.

    Args:
        prompt: The user message.
        system: Optional system message; omitted from the request when empty.

    Returns:
        The concatenation of every content delta received from the stream.
    """
    messages = []
    if system:
        messages.append({"role": "system", "content": system})
    messages.append({"role": "user", "content": prompt})

    pieces: list[str] = []
    with client.chat.completions.create(model="gpt-4o", stream=True, messages=messages) as stream:
        for chunk in stream:
            # Defensive: a chunk may arrive with an empty ``choices`` list
            # (e.g. a usage-only chunk); indexing [0] on it would raise.
            if not chunk.choices:
                continue
            # delta.content is None on role-only and final chunks.
            delta = chunk.choices[0].delta.content or ""
            if delta:
                print(delta, end="", flush=True)
                pieces.append(delta)
    print()
    return "".join(pieces)

# ── 3. Function Calling — support ticket agent ──────────────────────
# Tool schema advertised to the model: a single ``update_ticket`` function
# whose ``ticket_id`` and ``status`` arguments are required.
tools = [
    {
        "type": "function",
        "function": {
            "name": "update_ticket",
            "description": "Update ticket status",
            "parameters": {
                "type": "object",
                "properties": {
                    "ticket_id": {"type": "string"},
                    "status": {
                        "type": "string",
                        "enum": ["open", "processing", "resolved", "closed"],
                    },
                    "note": {"type": "string"},
                },
                "required": ["ticket_id", "status"],
            },
        },
    },
]

def update_ticket(ticket_id, status, note=""):
    """Placeholder handler for the ``update_ticket`` tool.

    The arguments are accepted but unused; the function always reports success.

    Returns:
        ``{"success": True}``.
    """
    outcome = {"success": True}
    return outcome

def run_agent(user_msg: str, max_turns: int = 8):
    """Run a tool-calling loop for the support-ticket assistant.

    Each turn sends the conversation to gpt-4o. If the reply contains tool
    calls, each one is executed and its result is appended as a ``tool``
    message; otherwise the reply is printed and the loop ends.

    Args:
        user_msg: The user's request.
        max_turns: Upper bound on model round-trips, so a model that keeps
            requesting tools cannot loop forever.

    Returns:
        None. The final answer, or a notice that the turn limit was reached,
        is printed.
    """
    # Dispatch by tool name rather than assuming every call is update_ticket.
    handlers = {"update_ticket": update_ticket}
    msgs = [
        {"role": "system", "content": "You are a support ticket assistant."},
        {"role": "user", "content": user_msg},
    ]
    for _ in range(max_turns):
        resp = client.chat.completions.create(model="gpt-4o", tools=tools, messages=msgs)
        reply = resp.choices[0].message
        msgs.append(reply)
        if not reply.tool_calls:
            print("AI:", reply.content)
            return
        for call in reply.tool_calls:
            handler = handlers.get(call.function.name)
            try:
                if handler is None:
                    raise LookupError(f"unknown tool: {call.function.name}")
                # Arguments are model-generated JSON: they may be malformed
                # (ValueError) or not match the handler's signature (TypeError).
                result = handler(**json.loads(call.function.arguments))
            except (LookupError, TypeError, ValueError) as exc:
                # Report the failure back to the model instead of crashing.
                result = {"success": False, "error": str(exc)}
            # Every tool call gets a matching tool message, keyed by its id.
            msgs.append({"role": "tool", "tool_call_id": call.id, "content": json.dumps(result)})
    print("AI: stopped after", max_turns, "turns without a final answer.")

Anthropic Claude API — Long Document Summarizer

Claude's 200K token context window fits an entire book in a single call — its key advantage for document-heavy workloads.

claude_summary.py

import os, anthropic
from pathlib import Path
from dotenv import load_dotenv
# Load variables from a local .env file (if present) before reading the API key.
load_dotenv()
# Module-level Anthropic client used by summarize_document; os.getenv returns
# None when ANTHROPIC_API_KEY is not set.
claude = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))

# System prompts keyed by summary type. Adjacent string literals are joined
# at compile time, so each value is a single prompt string.
SYSTEM_MAP = {
    "executive": (
        "Senior business analyst. Output: "
        "(1) Core conclusions in 3 sentences, "
        "(2) Up to 5 key facts, "
        "(3) Risks needing leadership attention, "
        "(4) Recommended actions. "
        "Use bullet points."
    ),
    "legal": (
        "Legal counsel assistant. Extract: "
        "(1) Main contract terms, "
        "(2) Rights and obligations, "
        "(3) Breach and penalty clauses, "
        "(4) Clauses needing legal review."
    ),
    "technical": (
        "Technical doc expert. Summarize: "
        "(1) Architecture overview, "
        "(2) Key metrics, "
        "(3) Dependencies, "
        "(4) Known issues."
    ),
}

def summarize_document(file_path: str, summary_type: str = "executive") -> dict:
    """Summarize a UTF-8 text file with Claude and report the call's cost.

    Args:
        file_path: Path of the document; the whole file is sent in one request.
        summary_type: Key into SYSTEM_MAP. Unknown keys fall back to the
            "executive" prompt.

    Returns:
        A dict with ``summary`` (text of the first content block) and
        ``cost_usd`` (rounded to 4 decimals).
    """
    document = Path(file_path).read_text(encoding="utf-8")
    system_prompt = SYSTEM_MAP.get(summary_type, SYSTEM_MAP["executive"])
    response = claude.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=2000,
        system=system_prompt,
        messages=[{"role": "user", "content": f"Summarize:\n\n{document}"}],
    )
    usage = response.usage
    # Hard-coded rates: 3 per 1M input tokens and 15 per 1M output tokens.
    cost = (usage.input_tokens * 3 + usage.output_tokens * 15) / 1_000_000
    return {"summary": response.content[0].text, "cost_usd": round(cost, 4)}

Prompt Engineering — Reusable PromptManager

prompt_manager.py

import json
from jinja2 import Environment, FileSystemLoader

class PromptManager:
    """Render Jinja2 prompt templates and assemble few-shot chat messages."""

    def __init__(self, template_dir: str = "prompts"):
        """Create a Jinja2 environment that loads templates from *template_dir*."""
        loader = FileSystemLoader(template_dir)
        self.env = Environment(loader=loader, trim_blocks=True)

    def render(self, tpl: str, **kw) -> str:
        """Render template *tpl* with the keyword arguments as its variables."""
        template = self.env.get_template(tpl)
        return template.render(**kw)

    def few_shot(self, examples: list[dict], query: str) -> list[dict]:
        """Build a message list of example exchanges followed by *query*.

        Each example supplies an ``"input"`` (user turn) and an ``"output"``
        (assistant turn); the real query is appended as the last user turn.
        """
        turns: list[dict] = []
        for example in examples:
            turns.extend(
                (
                    {"role": "user", "content": example["input"]},
                    {"role": "assistant", "content": example["output"]},
                )
            )
        turns.append({"role": "user", "content": query})
        return turns

# JSON mode for structured output
def extract_email_info(text: str, client) -> dict:
    """Extract intent, urgency and key points from an email using JSON mode.

    Args:
        text: The email text; sent as the user message.
        client: An OpenAI client exposing ``chat.completions.create``.

    Returns:
        The model's reply parsed with ``json.loads``.

    Raises:
        json.JSONDecodeError: If the reply is not valid JSON (for example, if
            it was cut off).
    """
    # JSON mode requires the word "JSON" to appear somewhere in the messages;
    # the API rejects the request otherwise, so the instruction says it outright.
    system_prompt = (
        "Extract the following fields and reply with a single JSON object: "
        '{"intent":"...","urgency":"high|medium|low","requires_human":true,"key_points":[]}'
    )
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        temperature=0,
        response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": text},
        ],
    )
    return json.loads(response.choices[0].message.content)

Cost Control — Token Counting & Disk Cache

cost_control.py

import hashlib, json, tiktoken
from pathlib import Path

# (input, output) price per 1M tokens, keyed by model ID.
# NOTE(review): provider prices change over time — confirm against the
# current pricing pages before relying on these figures.
PRICING = {
    "gpt-4o": (2.5, 10.0),
    "gpt-4o-mini": (0.15, 0.6),
    "claude-3-5-sonnet-20241022": (3.0, 15.0),
    "claude-3-5-haiku-20241022": (0.25, 1.25),
}

def count_tokens(text: str, model: str = "gpt-4o") -> int:
    """Count the tokens in *text* using tiktoken.

    Args:
        text: The text to tokenize.
        model: Model name used to select the encoding.

    Returns:
        The number of tokens. For models tiktoken does not recognise (such as
        the Claude IDs in PRICING) the count comes from the ``cl100k_base``
        encoding and is only an approximation.
    """
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        # encoding_for_model raises KeyError for unknown model names; fall
        # back to a general-purpose encoding rather than failing the estimate.
        encoding = tiktoken.get_encoding("cl100k_base")
    return len(encoding.encode(text))

def estimate_cost(in_tok, out_tok, model="gpt-4o-mini"):
    """Estimate the cost of a call from its token counts.

    Args:
        in_tok: Number of input tokens.
        out_tok: Number of output tokens.
        model: Key into PRICING.

    Returns:
        The cost rounded to 6 decimals, or ``0.0`` when *model* has no
        PRICING entry.
    """
    rates = PRICING.get(model)
    if rates is None:
        return 0.0
    input_rate, output_rate = rates
    # Rates are per one million tokens.
    total = in_tok * input_rate + out_tok * output_rate
    return round(total / 1_000_000, 6)

# Directory holding one JSON file per cached call; created at import time.
CACHE = Path(".api_cache")
CACHE.mkdir(exist_ok=True)


def disk_cache(func):
    """Cache *func*'s JSON-serializable results on disk, keyed by function and arguments.

    The key is a hash of the function's module and qualified name plus its
    positional and keyword arguments, so two decorated functions called with
    the same arguments do not share an entry.

    Constraints:
        * Arguments and the return value must be JSON-serializable; otherwise
          ``json.dumps`` raises ``TypeError``.
        * A cache hit returns the JSON round-tripped value, so tuples come
          back as lists and non-string dict keys come back as strings.
        * Entries never expire; delete the cache directory to invalidate them.
    """
    from functools import wraps  # local import keeps this block self-contained

    namespace = f"{func.__module__}.{func.__qualname__}"

    @wraps(func)
    def wrapper(*args, **kwargs):
        payload = json.dumps(
            {"f": namespace, "a": args, "k": kwargs},
            sort_keys=True,
            ensure_ascii=False,
        )
        digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
        cache_file = CACHE / f"{digest}.json"
        if cache_file.exists():
            return json.loads(cache_file.read_text(encoding="utf-8"))
        result = func(*args, **kwargs)
        cache_file.write_text(json.dumps(result, ensure_ascii=False), encoding="utf-8")
        return result

    return wrapper

Error Handling — Exponential Backoff & Multi-Model Fallback

retry_client.py

import time, os, logging
from openai import OpenAI, RateLimitError, APIError, APIConnectionError
import anthropic

logger = logging.getLogger(__name__)

def with_retry(func, max_retries=5, base_delay=1.0):
    """Wrap *func* so transient API failures are retried with exponential backoff.

    The wait before retry ``n`` (0-based) is ``base_delay * 2**n`` seconds:
    1s, 2s, 4s, 8s with the defaults.

    Retried errors:
        * ``RateLimitError`` and ``APIConnectionError``.
        * Any other ``APIError`` whose ``status_code`` is 500, 502, 503 or 529.

    Every other exception, and the last failed attempt, propagates to the caller.

    Args:
        func: The callable to wrap.
        max_retries: Total number of attempts.
        base_delay: Delay in seconds before the first retry.

    Returns:
        A wrapper with the same call signature as *func*.
    """
    from functools import wraps  # local import keeps this block self-contained

    retryable_status = (500, 502, 503, 529)

    @wraps(func)
    def wrapper(*args, **kwargs):
        for attempt in range(max_retries):
            last_attempt = attempt == max_retries - 1
            try:
                return func(*args, **kwargs)
            except (RateLimitError, APIConnectionError):
                if last_attempt:
                    raise
            except APIError as e:
                # Not every APIError carries an HTTP status, so read it
                # defensively instead of assuming the attribute exists.
                if last_attempt or getattr(e, "status_code", None) not in retryable_status:
                    raise
            delay = base_delay * (2 ** attempt)
            logger.warning(f"Retry {attempt+1}/{max_retries} in {delay:.0f}s")
            time.sleep(delay)

    return wrapper

# Ordered (provider, model) pairs; smart_complete tries them first to last.
FALLBACK = [
    ("openai", "gpt-4o"),
    ("openai", "gpt-4o-mini"),
    ("anthropic", "claude-3-5-haiku-20241022"),
]

def smart_complete(prompt: str) -> str:
    """Complete *prompt* with the first model in FALLBACK that succeeds.

    Each provider's client is created lazily inside the ``try`` so that a
    provider that cannot even be constructed (for example, a missing API key)
    is skipped like any other failure instead of aborting the whole chain.

    Args:
        prompt: The user message sent to each model in turn.

    Returns:
        The text of the first successful completion.

    Raises:
        RuntimeError: If every entry in FALLBACK fails; the last underlying
            error is attached as ``__cause__``.
    """
    messages = [{"role": "user", "content": prompt}]
    clients = {}
    last_error = None
    for provider, model in FALLBACK:
        try:
            if provider not in clients:
                if provider == "openai":
                    clients[provider] = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
                else:
                    clients[provider] = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
            api = clients[provider]
            if provider == "openai":
                reply = api.chat.completions.create(model=model, max_tokens=1024, messages=messages)
                return reply.choices[0].message.content
            reply = api.messages.create(model=model, max_tokens=1024, messages=messages)
            return reply.content[0].text
        except Exception as e:
            # Deliberately broad: this is the fallback boundary, and any
            # failure should move on to the next model after being logged.
            last_error = e
            logger.warning(f"{provider}/{model} failed: {e}")
    raise RuntimeError("All models unavailable") from last_error

Full Project: Intelligent Email Triage System

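Pipeline: fetch unread emails → classify each one and draft a reply with GPT-4o-mini → archive the result → auto-send low-risk replies, queue everything else for human review.

email_ai_system.py

"""Intelligent Email Triage — fetch unread mail, classify it, draft a reply, archive it, then auto-send or queue for review."""
import json, imaplib, smtplib, email, os, logging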
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from dataclasses import dataclass
from datetime import datetime
from openai import OpenAI
from dotenv import load_dotenv

# Load .env values (if present), then configure root logging at INFO level
# with the timestamp and level name in each record.
load_dotenv(); logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
# Module-level OpenAI client used by analyze_and_reply; os.getenv returns None
# when OPENAI_API_KEY is not set.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

@dataclass
class EmailRecord:
    """One fetched email, as consumed by the triage pipeline."""

    uid: str  # IMAP UID, decoded to text
    sender: str  # value of the From header
    subject: str  # Subject header, or a placeholder when absent
    body: str  # plain-text body (truncated by the fetcher)


@dataclass
class ProcessedEmail:
    """An EmailRecord together with the model's triage decision."""

    record: EmailRecord  # the original email
    category: str  # assigned category name
    urgency: str  # "high", "medium" or "low"
    reply_draft: str  # generated reply text
    auto_send: bool  # True when the reply may be sent without review

# Category name -> whether replies in that category may be auto-sent.
CATEGORIES = {
    "Refund Request": False,
    "Technical Support": False,
    "Shipping Query": True,
    "Product Inquiry": True,
    "Complaint": False,
    "Other": False,
}

def fetch_unread(limit=20) -> list[EmailRecord]:
    """Fetch the most recent unread messages from the INBOX over IMAP.

    Connection settings come from the environment: ``IMAP_HOST`` (default
    ``imap.gmail.com``), ``EMAIL_USER`` and ``EMAIL_PASS``.

    Args:
        limit: Keep only the last *limit* UIDs returned by the UNSEEN search.

    Returns:
        One EmailRecord per message fetched, with the body taken from the
        first ``text/plain`` part and truncated to 2000 characters.
    """

    def _text_of(part) -> str:
        # Decode a message part using its declared charset, falling back to UTF-8.
        payload = part.get_payload(decode=True)
        if payload is None:
            return ""
        charset = part.get_content_charset() or "utf-8"
        try:
            return payload.decode(charset, errors="ignore")
        except LookupError:
            # The header named a charset Python does not know.
            return payload.decode("utf-8", errors="ignore")

    imap = imaplib.IMAP4_SSL(os.getenv("IMAP_HOST", "imap.gmail.com"))
    records = []
    try:
        imap.login(os.getenv("EMAIL_USER"), os.getenv("EMAIL_PASS"))
        imap.select("INBOX")
        _, uid_list = imap.uid("search", None, "UNSEEN")
        uids = uid_list[0].split() if uid_list and uid_list[0] else []
        for uid in uids[-limit:]:
            _, data = imap.uid("fetch", uid, "(RFC822)")
            # A successful fetch yields a (header, bytes) tuple first; skip
            # anything else instead of crashing on data[0][1].
            if not data or not isinstance(data[0], tuple):
                continue
            msg = email.message_from_bytes(data[0][1])
            body = ""
            if msg.is_multipart():
                for part in msg.walk():
                    if part.get_content_type() == "text/plain":
                        body = _text_of(part)
                        break
            else:
                body = _text_of(msg)
            records.append(
                EmailRecord(
                    uid=uid.decode(),
                    sender=msg["From"],
                    subject=msg["Subject"] or "(no subject)",
                    body=body[:2000],
                )
            )
    finally:
        # Always close the session, even when login, search or a fetch fails.
        imap.logout()
    return records

def analyze_and_reply(rec: EmailRecord) -> ProcessedEmail:
    """Classify an email and draft a reply with gpt-4o-mini.

    Two requests are made: a JSON-mode classification (category, urgency,
    customer emotion, key points) and a short reply draft that is given the
    category and emotion as context.

    Args:
        rec: The email to process.

    Returns:
        A ProcessedEmail. ``auto_send`` is True only when the category is
        flagged auto-sendable in CATEGORIES and the urgency is "low".

    Raises:
        json.JSONDecodeError: If the classification reply is not valid JSON.
    """
    cats = "|".join(CATEGORIES.keys())
    # Literal braces are doubled so the f-string interpolates only {cats}.
    # The instruction also names JSON explicitly, which JSON mode requires.
    schema_prompt = (
        "Classify the customer email and respond with a JSON object shaped like: "
        f'{{"category":"{cats}","urgency":"high|medium|low",'
        '"customer_emotion":"angry|neutral|satisfied","key_points":[]}'
    )
    raw = client.chat.completions.create(
        model="gpt-4o-mini",
        temperature=0,
        response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": schema_prompt},
            {"role": "user", "content": f"Subject: {rec.subject}\n{rec.body}"},
        ],
    ).choices[0].message.content
    info = json.loads(raw)

    # The email is untrusted input and the model's output is unconstrained,
    # so map anything unexpected onto the safe defaults before it can
    # influence auto-send or file names.
    cat = info.get("category", "Other")
    if not isinstance(cat, str) or cat not in CATEGORIES:
        cat = "Other"
    urg = str(info.get("urgency", "medium")).lower()
    if urg not in ("high", "medium", "low"):
        urg = "medium"

    draft = client.chat.completions.create(
        model="gpt-4o-mini",
        temperature=0.3,
        max_tokens=300,
        messages=[
            {"role": "system", "content": "Professional customer service rep. Address the customer, respond to their core concern, state resolution or timeline, close with thanks. Max 100 words."},
            {"role": "user", "content": f"Subject: {rec.subject}\n{rec.body}\nCategory: {cat}, Emotion: {info.get('customer_emotion','neutral')}"},
        ],
    ).choices[0].message.content

    auto_send = CATEGORIES[cat] and urg == "low"
    return ProcessedEmail(record=rec, category=cat, urgency=urg, reply_draft=draft, auto_send=auto_send)

def send_reply(p: ProcessedEmail):
    """Send the drafted reply to the original sender over SMTP-over-SSL.

    Reads ``EMAIL_USER``, ``EMAIL_PASS`` and ``SMTP_HOST`` (default
    ``smtp.gmail.com``) from the environment and connects on port 465.

    Args:
        p: The processed email whose ``reply_draft`` is sent to
            ``p.record.sender``.
    """
    message = MIMEMultipart()
    message["From"] = os.getenv("EMAIL_USER")
    message["To"] = p.record.sender
    message["Subject"] = f"Re: {p.record.subject}"
    message.attach(MIMEText(p.reply_draft, "plain", "utf-8"))

    host = os.getenv("SMTP_HOST", "smtp.gmail.com")
    with smtplib.SMTP_SSL(host, 465) as smtp:
        smtp.login(os.getenv("EMAIL_USER"), os.getenv("EMAIL_PASS"))
        smtp.send_message(message)

def archive(p: ProcessedEmail, d="email_archive"):
    """Write a JSON record of a processed email into directory *d*.

    The file is named ``<timestamp>_<category>_<uid>.json``. Category and UID
    are reduced to letters, digits, spaces, ``-`` and ``_`` first, because the
    category can originate from model output and must not inject path
    separators into the file name.

    Args:
        p: The processed email to record.
        d: Target directory; created if it does not exist.
    """

    def _safe(component) -> str:
        # Replace anything that could alter the path with an underscore.
        return "".join(ch if ch.isalnum() or ch in " -_" else "_" for ch in str(component))

    os.makedirs(d, exist_ok=True)
    # One timestamp for both the file name and the record, so they agree.
    now = datetime.now()
    name = f"{now.strftime('%Y%m%d_%H%M%S')}_{_safe(p.category)}_{_safe(p.record.uid)}.json"
    with open(os.path.join(d, name), "w", encoding="utf-8") as f:
        json.dump(
            {
                "uid": p.record.uid,
                "sender": p.record.sender,
                "category": p.category,
                "urgency": p.urgency,
                "reply_draft": p.reply_draft,
                "auto_sent": p.auto_send,
                "processed_at": now.isoformat(),
            },
            f,
            ensure_ascii=False,
            indent=2,
        )

def main():
    """Triage up to 10 unread emails: analyze, archive, then auto-send or queue.

    A failure on one email is logged and does not stop the remaining emails
    from being processed. A summary line is printed at the end.
    """
    emails = fetch_unread(limit=10)
    auto_sent = queued = 0
    for rec in emails:
        try:
            p = analyze_and_reply(rec)
            archive(p)
            if p.auto_send:
                send_reply(p)
                auto_sent += 1
            else:
                queued += 1
                # NOTE(review): the tail of this message was lost in the
                # source; the wording after the category is a reconstruction.
                logging.info(f"[{p.urgency.upper()}] {p.category} queued for human review: {rec.subject}")
        except Exception as e:
            # Per-email boundary: log and continue with the next message.
            logging.error(f"Failed: {e}")
    print(f"\nDone: {len(emails)} emails — {auto_sent} auto-replied, {queued} queued")


if __name__ == "__main__":
    main()

Production tip: Run in dry-run mode (archive only) for one week. Enable auto-send only after classification accuracy exceeds 90%. Always route refund and complaint categories to human review.

Previous · Next: Chapter 13: Data Visualization

Rate this chapter
4.6  / 5  (23 ratings)

💬 Comments