Chapter 10: Batch API: Asynchronous Large-Scale Inference and Cost Optimization

10.1 What the Batch API Is For

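Anthropic's Message Batches API is purpose-built for asynchronous, large-scale inference workloads. Compared to the standard Messages API, it trades latency for cost and scale: instead of one response per call, you submit up to 10,000 requests in a single batch, they are processed asynchronously within a window of up to 24 hours (you poll for completion rather than waiting on each response), and every request is billed at a 50% discount.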

When to use Batch API

Use case                      Batch API?   Notes
Large-scale data annotation   Yes          Core use case
Bulk document summarization   Yes          Process entire knowledge bases
Offline model evaluation      Yes          Run thousands of test cases
Bulk translation              Yes          Multi-language product copy
Real-time chat                No           Use the streaming Messages API
Tool-call agent loops         No           Each tool call needs a synchronous round trip

10.2 Creating and Monitoring a Batch

Submitting a batch

import anthropic

client = anthropic.Anthropic()

requests = [
    {
        "custom_id": "review-001",
        "params": {
            "model": "claude-haiku-4-5-20251001",
            "max_tokens": 20,
            "messages": [{
                "role": "user",
                "content": "Classify sentiment (positive/negative/neutral):\n\nThe headphones have amazing sound quality, punchy bass, and crystal highs. Highly recommended!"
            }]
        }
    },
    {
        "custom_id": "review-002",
        "params": {
            "model": "claude-haiku-4-5-20251001",
            "max_tokens": 20,
            "messages": [{
                "role": "user",
                "content": "Classify sentiment (positive/negative/neutral):\n\nSlow shipping, terrible packaging, product does not match description. Very disappointed."
            }]
        }
    }
]

batch = client.messages.batches.create(requests=requests)
print(f"Batch ID: {batch.id}")
print(f"Status: {batch.processing_status}")
print(f"Expires: {batch.expires_at}")

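Each batch can contain up to 10,000 requests with a total payload under 32 MB. The custom_id field is your key for correlating results: it must be unique within the batch, and because results are not guaranteed to come back in submission order, it is the only reliable way to match a result to its request.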
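
Before calling batches.create, it can pay to check a request list locally against the limits just described. The sketch below uses a hypothetical helper, validate_batch (not an SDK function); it approximates the payload size as the UTF-8 length of the JSON-encoded list and reads "32 MB" as 32 MiB, both of which are assumptions about how the API measures size.

```python
import json
from collections import Counter

MAX_REQUESTS = 10_000          # per-batch request cap stated above
MAX_BYTES = 32 * 1024 * 1024   # 32 MB cap, read as MiB (assumption)


def validate_batch(requests: list[dict]) -> list[str]:
    """Return human-readable problems; an empty list means the batch looks OK."""
    problems = []
    if not requests:
        problems.append("batch is empty")
    if len(requests) > MAX_REQUESTS:
        problems.append(f"{len(requests)} requests exceeds the {MAX_REQUESTS} limit")

    # custom_id must be present and unique within the batch
    ids = [r.get("custom_id") for r in requests]
    missing = sum(1 for cid in ids if not cid)
    if missing:
        problems.append(f"{missing} request(s) missing custom_id")
    dupes = sorted(cid for cid, n in Counter(ids).items() if cid and n > 1)
    if dupes:
        problems.append(f"duplicate custom_id values: {dupes}")

    # Approximate the payload size by the UTF-8 length of the JSON encoding
    size = len(json.dumps(requests).encode("utf-8"))
    if size > MAX_BYTES:
        problems.append(f"payload is ~{size / 1e6:.1f} MB, over the 32 MB limit")
    return problems


print(validate_batch([{"custom_id": "review-001"}, {"custom_id": "review-001"}]))
```

Returning a list of problems instead of raising on the first one lets you report every issue in a large request file at once.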

Polling for completion

import time

batch_id = batch.id

while True:
    batch = client.messages.batches.retrieve(batch_id)
    counts = batch.request_counts

    print(f"Status: {batch.processing_status} | "
          f"Succeeded: {counts.succeeded} | "
          f"Errored: {counts.errored} | "
          f"Processing: {counts.processing}")

    if batch.processing_status == "ended":
        break

    time.sleep(60)  # Poll every minute

print("Batch complete!")
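
A fixed 60-second sleep is simple, but it polls just as often late in a long batch as it does in the first minute. One common alternative, sketched here as a choice of ours rather than anything the API requires, is a capped exponential backoff. poll_delays is a hypothetical helper that only yields sleep durations; the commented loop shows how it could replace time.sleep(60) above.

```python
from itertools import islice
from typing import Iterator


def poll_delays(initial: float = 30.0, factor: float = 2.0, cap: float = 600.0) -> Iterator[float]:
    """Yield sleep durations initial, initial*factor, ... never exceeding cap."""
    delay = initial
    while True:
        yield min(delay, cap)
        delay = min(delay * factor, cap)


# Possible use with the polling loop above (client and batch_id as defined earlier):
#
# for delay in poll_delays():
#     batch = client.messages.batches.retrieve(batch_id)
#     if batch.processing_status == "ended":
#         break
#     time.sleep(delay)

print(list(islice(poll_delays(), 6)))  # [30.0, 60.0, 120.0, 240.0, 480.0, 600.0]
```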

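processing_status values: in_progress (requests are still being processed), canceling (a cancellation has been requested and is taking effect), and ended (every request has finished, whether it succeeded, errored, was canceled, or expired).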

10.3 Retrieving Results

Iterating over results

results = {}

for result in client.messages.batches.results(batch_id):
    cid = result.custom_id

    if result.result.type == "succeeded":
        message = result.result.message
        results[cid] = {
            "text": message.content[0].text,
            "input_tokens": message.usage.input_tokens,
            "output_tokens": message.usage.output_tokens
        }
    elif result.result.type == "errored":
        results[cid] = {
            "error_type": result.result.error.type,
            "error": str(result.result.error)
        }
    else:
        # "canceled" or "expired": there is no error object, so record the result type
        results[cid] = {"error_type": result.result.type}

for cid, r in results.items():
    if "text" in r:
        print(f"{cid}: {r['text']} ({r['input_tokens']}+{r['output_tokens']} tokens)")
    else:
        print(f"{cid}: ERROR ({r['error_type']})")
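
Since results arrive keyed by custom_id and not necessarily in submission order, downstream code usually joins them back onto the original request list. A minimal sketch, assuming results shaped like the results dict above; align_results is a hypothetical helper, and a request with no matching result maps to None.

```python
from typing import Optional


def align_results(requests: list[dict], results: dict[str, dict]) -> list[tuple[str, Optional[dict]]]:
    """Return (custom_id, result or None) pairs in the original request order."""
    return [(r["custom_id"], results.get(r["custom_id"])) for r in requests]


reqs = [{"custom_id": "review-001"}, {"custom_id": "review-002"}, {"custom_id": "review-003"}]
# Results deliberately listed out of order, with review-003 missing
res = {
    "review-002": {"text": "negative", "input_tokens": 40, "output_tokens": 1},
    "review-001": {"text": "positive", "input_tokens": 38, "output_tokens": 1},
}
for cid, r in align_results(reqs, res):
    print(cid, r["text"] if r else "MISSING")
```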

Streaming results to disk

For large batches, avoid loading everything into memory:

import json

with open("batch_results.jsonl", "w") as f:
    for result in client.messages.batches.results(batch_id):
        record = {
            "custom_id": result.custom_id,
            "type": result.result.type
        }
        if result.result.type == "succeeded":
            record["text"] = result.result.message.content[0].text
            record["usage"] = {
                "in": result.result.message.usage.input_tokens,
                "out": result.result.message.usage.output_tokens
            }
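        elif result.result.type == "errored":
            record["error"] = result.result.error.type
        # "canceled" and "expired" results carry no error object; "type" records the outcome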
        f.write(json.dumps(record) + "\n")
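
Writing JSONL also makes it easy to reload results later without calling the API again. A minimal sketch, assuming records in the format written above (custom_id, type, and text/usage on successes); load_results is a hypothetical helper, and it falls back to the record's type when no error field is present.

```python
import json
import os
import tempfile


def load_results(path: str) -> tuple[dict[str, str], dict[str, str]]:
    """Split a results JSONL file into {custom_id: text} and {custom_id: failure reason}."""
    ok, failed = {}, {}
    with open(path, encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue  # tolerate blank lines
            rec = json.loads(line)
            if rec["type"] == "succeeded":
                ok[rec["custom_id"]] = rec["text"]
            else:
                # errored records may carry an error type; otherwise use the result type
                failed[rec["custom_id"]] = rec.get("error", rec["type"])
    return ok, failed


# Demo with a throwaway file
sample = [
    {"custom_id": "review-001", "type": "succeeded", "text": "positive", "usage": {"in": 38, "out": 1}},
    {"custom_id": "review-002", "type": "expired"},
]
with tempfile.NamedTemporaryFile("w", suffix=".jsonl", delete=False, encoding="utf-8") as tmp:
    for rec in sample:
        tmp.write(json.dumps(rec) + "\n")
ok, failed = load_results(tmp.name)
os.remove(tmp.name)
print(ok, failed)
```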

10.4 Engineering Patterns for Large Batches

Chunked submission

When you have more than 10,000 requests, split them into multiple batches:

import anthropic, time
from typing import Generator

def chunked(lst: list, n: int) -> Generator:
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

def submit_all(client, all_requests: list, chunk_size=9000) -> list[str]:
    # 9,000 leaves headroom under the 10,000-request cap; it does not check the 32 MB limit
    batch_ids = []
    chunks = list(chunked(all_requests, chunk_size))
    print(f"Splitting {len(all_requests)} requests into {len(chunks)} batches")

    for i, chunk in enumerate(chunks):
        batch = client.messages.batches.create(requests=chunk)
        batch_ids.append(batch.id)
        print(f"  Submitted batch {i+1}/{len(chunks)}: {batch.id}")
        if i < len(chunks) - 1:
            time.sleep(1)  # Avoid create-request rate limits

    return batch_ids

def wait_all(client, batch_ids: list[str], poll_interval=60) -> dict:
    pending = set(batch_ids)
    all_results = {}

    while pending:
        done = set()
        for bid in pending:
            b = client.messages.batches.retrieve(bid)
            if b.processing_status == "ended":
                for result in client.messages.batches.results(bid):
                    # Only successes are collected; errored, canceled and expired results are dropped
                    if result.result.type == "succeeded":
                        all_results[result.custom_id] = (
                            result.result.message.content[0].text
                        )
                done.add(bid)
                print(f"Batch {bid} done: "
                      f"{b.request_counts.succeeded} ok, "
                      f"{b.request_counts.errored} errors")
        pending -= done
        if pending:
            print(f"{len(pending)} batches still running...")
            time.sleep(poll_interval)

    return all_results
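
chunked splits by count only, so a chunk of 9,000 long documents could still exceed the 32 MB payload limit. Below is a sketch of a greedy chunker that respects both limits. chunk_by_limits is hypothetical; it approximates each request's size as the UTF-8 length of its JSON encoding, and the 30 MiB default is an illustrative safety margin, not an API value.

```python
import json
from typing import Iterator


def chunk_by_limits(
    requests: list[dict],
    max_count: int = 9_000,
    max_bytes: int = 30 * 1024 * 1024,  # illustrative headroom below the 32 MB cap
) -> Iterator[list[dict]]:
    """Greedily pack requests into chunks under both a count and a byte budget."""
    chunk: list[dict] = []
    chunk_bytes = 2  # the enclosing "[" and "]"
    for req in requests:
        size = len(json.dumps(req).encode("utf-8")) + 2  # the item plus a ", " separator
        if 2 + size > max_bytes:
            raise ValueError(f"request {req.get('custom_id')} alone exceeds max_bytes")
        if chunk and (len(chunk) >= max_count or chunk_bytes + size > max_bytes):
            yield chunk
            chunk, chunk_bytes = [], 2
        chunk.append(req)
        chunk_bytes += size
    if chunk:
        yield chunk


demo = [{"custom_id": f"req-{i:03d}", "params": {"text": "x" * 100}} for i in range(25)]
print([len(c) for c in chunk_by_limits(demo, max_count=10)])  # [10, 10, 5]
```

In submit_all, iterating over chunk_by_limits(all_requests) in place of chunked(all_requests, chunk_size) would be the only change needed.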

Batch requests with system prompts

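SYSTEM = "You are a professional e-commerce review analyst. Output JSON only."

# review_texts: your list of raw review strings (two samples shown)
review_texts = [
    "Amazing sound quality, punchy bass. Highly recommended!",
    "Slow shipping and the product does not match the description.",
]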

requests = [
    {
        "custom_id": f"review-{i}",
        "params": {
            "model": "claude-haiku-4-5-20251001",
            "max_tokens": 100,
            "system": SYSTEM,
            "messages": [{"role": "user", "content": text}]
        }
    }
    for i, text in enumerate(review_texts)
]
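
Because SYSTEM asks for JSON only, each result's text still has to be parsed, and a reply may occasionally arrive wrapped in a Markdown code fence. A defensive parser, sketched under that assumption; parse_json_reply is hypothetical and returns None instead of raising, so unparseable outputs can be collected for review or a retry.

```python
import json
import re
from typing import Any, Optional

FENCE = "`" * 3  # a Markdown code fence, built here to keep this listing readable
# Matches a fence (optionally tagged json) wrapped around the whole payload
_FENCED = re.compile(r"^" + FENCE + r"(?:json)?\s*(.*?)\s*" + FENCE + r"$", re.DOTALL)


def parse_json_reply(text: str) -> Optional[Any]:
    """Parse a reply that should be JSON; return None if it is not valid JSON."""
    cleaned = text.strip()
    m = _FENCED.match(cleaned)
    if m:
        cleaned = m.group(1)
    try:
        return json.loads(cleaned)
    except json.JSONDecodeError:
        return None


print(parse_json_reply('{"sentiment": "positive"}'))
print(parse_json_reply("Sure! Here is the analysis."))
```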

10.5 Cost Optimization Strategies

Model cost comparison (Batch pricing)

Model                        Batch input    Batch output
claude-opus-4-6              $7.50/MTok     $37.50/MTok
claude-sonnet-4-6            $1.50/MTok     $7.50/MTok
claude-haiku-4-5-20251001    $0.40/MTok     $2.00/MTok

(MTok = one million tokens.)

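Because batch prices are half of standard prices, claude-opus-4-6 on the standard API costs $15.00/$75.00 per MTok at these rates. Switching to claude-haiku-4-5-20251001 with the Batch API therefore cuts the per-token price by 15.00 / 0.40 = 37.5x on input and 75.00 / 2.00 = 37.5x on output for the same volume.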
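
The comparison reduces to arithmetic on the table. A small sketch, assuming standard-API prices are exactly twice the batch prices listed (the 50% discount) and a hypothetical workload of 10,000 requests averaging 300 input and 20 output tokens; job_cost is an illustrative helper.

```python
# Batch prices from the table above: (input, output) in USD per million tokens
BATCH = {
    "claude-opus-4-6": (7.50, 37.50),
    "claude-sonnet-4-6": (1.50, 7.50),
    "claude-haiku-4-5-20251001": (0.40, 2.00),
}


def job_cost(model: str, n: int, avg_in: float, avg_out: float, batch: bool = True) -> float:
    """USD cost for n requests; standard pricing is assumed to be 2x batch pricing."""
    p_in, p_out = BATCH[model]
    if not batch:
        p_in, p_out = 2 * p_in, 2 * p_out
    return n * (avg_in * p_in + avg_out * p_out) / 1e6


# Hypothetical workload: 10,000 requests, ~300 input and ~20 output tokens each
opus_std = job_cost("claude-opus-4-6", 10_000, 300, 20, batch=False)
haiku_batch = job_cost("claude-haiku-4-5-20251001", 10_000, 300, 20)
print(f"Opus standard: ${opus_std:.2f}  Haiku batch: ${haiku_batch:.2f}  "
      f"ratio: {opus_std / haiku_batch:.1f}x")
```

Since both input and output prices differ by the same factor here, the ratio stays 37.5x regardless of the input/output mix.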

Minimizing token consumption

# Wasteful: excessive context and over-allocated max_tokens
bad = {
    "custom_id": "item-001",
    "params": {
        "model": "claude-haiku-4-5-20251001",
        "max_tokens": 512,   # Far more than a one-word label needs; a loose cap lets verbose replies inflate output cost
        "messages": [{"role": "user", "content": "[500 words of preamble...]\n\nClassify: Great product!"}]
    }
}

# Efficient: concise system prompt, minimal max_tokens
good = {
    "custom_id": "item-001",
    "params": {
        "model": "claude-haiku-4-5-20251001",
        "max_tokens": 5,    # Only need "positive"
        "system": "Classify sentiment: output positive/negative/neutral only.",
        "messages": [{"role": "user", "content": "Great product!"}]
    }
}
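
With a cap as tight as max_tokens=5, validate each reply instead of trusting it. The model may capitalize the label or add punctuation, and a result whose message.stop_reason is "max_tokens" hit the cap and may be cut off. A minimal sketch; normalize_label is hypothetical, and mapping anything other than a bare label to None (to be retried or reviewed) is a design choice, not API behavior.

```python
import string
from typing import Optional

LABELS = ("positive", "negative", "neutral")


def normalize_label(text: str) -> Optional[str]:
    """Return the sentiment label if the reply is exactly one label, else None."""
    word = text.strip().strip(string.punctuation + string.whitespace).lower()
    return word if word in LABELS else None


for raw in ["positive", " Negative.\n", "neutral!", "posi", "The sentiment is positive"]:
    print(repr(raw), "->", normalize_label(raw))
```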

Token estimation before submission

def estimate_cost(client, requests: list, model: str = "claude-haiku-4-5-20251001") -> dict:
    # Batch prices in USD per million tokens, matching the table above
    PRICES = {
        "claude-haiku-4-5-20251001": {"in": 0.40, "out": 2.00},
        "claude-sonnet-4-6": {"in": 1.50, "out": 7.50},
        "claude-opus-4-6": {"in": 7.50, "out": 37.50},
    }
    if not requests:
        return {"estimated_input_tokens": 0, "estimated_output_tokens": 0,
                "estimated_cost_usd": 0.0}

    # Count input tokens exactly for a small sample, then extrapolate to the full list
    sample = requests[:10]
    total_in = 0

    for req in sample:
        params = req["params"]
        kwargs = {"model": model, "messages": params["messages"]}
        if "system" in params:  # only pass a system prompt when the request has one
            kwargs["system"] = params["system"]
        count = client.messages.count_tokens(**kwargs)
        total_in += count.input_tokens

    avg_in = total_in / len(sample)
    # Output length is unknown before generation; assume half of max_tokens on average
    avg_out = sum(req["params"].get("max_tokens", 100) * 0.5 for req in sample) / len(sample)

    total_in_est = avg_in * len(requests)
    total_out_est = avg_out * len(requests)

    p = PRICES[model]
    return {
        "estimated_input_tokens": int(total_in_est),
        "estimated_output_tokens": int(total_out_est),
        "estimated_cost_usd": round(
            (total_in_est / 1e6) * p["in"] + (total_out_est / 1e6) * p["out"], 4
        )
    }
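
Once a batch has ended, the usage recorded in batch_results.jsonl (10.3) can be priced the same way to check the estimate against reality. A sketch assuming records in that format ("usage": {"in": ..., "out": ...} on succeeded lines) and per-MTok prices passed in by the caller; actual_cost is hypothetical, and it ignores any cache read/write token counts because that file does not record them.

```python
import json
from typing import Iterable


def actual_cost(lines: Iterable[str], price_in: float, price_out: float) -> dict:
    """Sum recorded usage from results JSONL lines and price it (USD per MTok)."""
    tokens_in = tokens_out = 0
    for line in lines:
        if not line.strip():
            continue
        usage = json.loads(line).get("usage")
        if usage:  # only succeeded records carry usage
            tokens_in += usage["in"]
            tokens_out += usage["out"]
    cost = tokens_in / 1e6 * price_in + tokens_out / 1e6 * price_out
    return {"input_tokens": tokens_in, "output_tokens": tokens_out, "cost_usd": round(cost, 6)}


sample = [
    '{"custom_id": "a", "type": "succeeded", "text": "positive", "usage": {"in": 1000, "out": 10}}',
    '{"custom_id": "b", "type": "errored", "error": "overloaded_error"}',
]
print(actual_cost(sample, price_in=0.40, price_out=2.00))

# With the real file: with open("batch_results.jsonl", encoding="utf-8") as f: actual_cost(f, 0.40, 2.00)
```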

10.6 Error Handling and Partial Failures

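Individual requests within a batch can fail while others succeed; the batch's processing_status still ends as ended. Always check each result's type, which is one of succeeded, errored, canceled, or expired. Only errored results carry an error object: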

from collections import defaultdict

def analyze_results(client, batch_id: str) -> dict:
    succeeded, failed = {}, defaultdict(list)

    for result in client.messages.batches.results(batch_id):
        if result.result.type == "succeeded":
            succeeded[result.custom_id] = result.result.message.content[0].text
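        elif result.result.type == "errored":
            failed[result.result.error.type].append(result.custom_id)
        else:
            # canceled or expired: no error object, so group by the result type
            failed[result.result.type].append(result.custom_id)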

    if failed:
        print("Failures by type:")
        for etype, ids in failed.items():
            print(f"  {etype}: {len(ids)} items")

    return {"succeeded": succeeded, "failed": dict(failed)}

def build_retry_batch(original_requests: list, failed_ids: set) -> list:
    return [r for r in original_requests if r["custom_id"] in failed_ids]

Common error types

Error type          Cause                Action
invalid_request     Malformed params     Fix the request before retrying
overloaded_error    Capacity issues      Retry in a new batch
rate_limit_error    Rare in batch mode   Retry in a new batch
content_filter      Policy violation     Revise the input content
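
The Action column suggests a policy: resubmit only failures that a plain retry can fix, and set the rest aside. A sketch of that loop, assuming a caller-supplied run_batch function (hypothetical; in practice it would wrap batches.create, polling, and result collection) that returns {custom_id: "succeeded" or a failure type}. Treating "expired" as retryable is an added assumption not in the table.

```python
from typing import Callable

# Failure types the table marks as safe to resubmit unchanged; "expired" is an added assumption
RETRYABLE = {"overloaded_error", "rate_limit_error", "expired"}


def run_with_retries(
    requests: list[dict],
    run_batch: Callable[[list[dict]], dict[str, str]],
    max_rounds: int = 3,
) -> tuple[set[str], dict[str, str]]:
    """Resubmit retryable failures up to max_rounds times.

    Returns (succeeded custom_ids, {custom_id: last failure type}).
    """
    succeeded: set[str] = set()
    failures: dict[str, str] = {}
    pending = requests
    for _ in range(max_rounds):
        if not pending:
            break
        outcomes = run_batch(pending)
        retry_ids = set()
        for cid, outcome in outcomes.items():
            if outcome == "succeeded":
                succeeded.add(cid)
                failures.pop(cid, None)
            elif outcome in RETRYABLE:
                retry_ids.add(cid)
                failures[cid] = outcome  # cleared if a later round succeeds
            else:
                failures[cid] = outcome  # needs a fix, not a blind retry
        pending = [r for r in pending if r["custom_id"] in retry_ids]
    return succeeded, failures


# Fake run_batch for demonstration: "b" is overloaded once, "c" is always invalid
calls = {"b": 0}


def fake_run_batch(batch: list[dict]) -> dict[str, str]:
    out = {}
    for r in batch:
        cid = r["custom_id"]
        if cid == "b" and calls["b"] == 0:
            calls["b"] += 1
            out[cid] = "overloaded_error"
        elif cid == "c":
            out[cid] = "invalid_request"
        else:
            out[cid] = "succeeded"
    return out


ok, failed = run_with_retries([{"custom_id": c} for c in "abc"], fake_run_batch)
print(sorted(ok), failed)
```

Injecting run_batch keeps the retry policy testable without network calls; build_retry_batch above is one way to form each round's request list.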

10.7 Combining Batch API with Prompt Caching

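Prompt Caching and the Batch API stack. When many requests share the same system prompt, marking it with cache_control lets later requests read it from cache, and cache reads are billed at 10% of the base input price, a further 90% reduction on that portion. Two caveats: caching only applies once the cached prefix reaches a model-specific minimum length (at least 1,024 tokens), and because batch requests are processed concurrently, cache hits are best-effort rather than guaranteed: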

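# Placeholder prompt: a real shared prefix must reach the model's minimum cacheable length
SHARED_SYSTEM = "You are a professional document analyst. Extract key information concisely."
documents = ["First document text", "Second document text"]  # placeholder: your document texts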

requests = [
    {
        "custom_id": f"doc-{i}",
        "params": {
            "model": "claude-haiku-4-5-20251001",
            "max_tokens": 200,
            "system": [
                {
                    "type": "text",
                    "text": SHARED_SYSTEM,
                    "cache_control": {"type": "ephemeral"}  # Enable caching
                }
            ],
            "messages": [{"role": "user", "content": doc_text}]
        }
    }
    for i, doc_text in enumerate(documents)
]

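The discounts multiply: batch pricing halves the base input rate, and cache reads are billed at 10% of that, so cached system-prompt tokens cost about 5% of the standard input price. Cache writes cost more than regular input and hits are not guaranteed, so the savings are largest for large, homogeneous workloads with a long shared prefix.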
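
How much the stacking saves depends on how much of each request is the shared prefix and how often the cache is actually hit. A back-of-the-envelope sketch, assuming the usual prompt-caching multipliers (cache reads at 0.1x and 5-minute cache writes at 1.25x the base input price) apply on top of the batch price, and that every cache miss writes the prefix; prefix_share and hit_rate are hypothetical inputs.

```python
def effective_input_price(
    base_price: float,         # batch input price, USD per MTok
    prefix_share: float,       # fraction of input tokens that are the cached prefix
    hit_rate: float,           # fraction of requests that read the prefix from cache
    read_mult: float = 0.10,   # assumed cache-read multiplier
    write_mult: float = 1.25,  # assumed cache-write multiplier (5-minute TTL)
) -> float:
    """Blended input price per MTok after caching the shared prefix."""
    # Hits pay the read rate on the prefix; misses pay the write premium
    prefix_mult = hit_rate * read_mult + (1 - hit_rate) * write_mult
    return base_price * (prefix_share * prefix_mult + (1 - prefix_share))


# Haiku batch input ($0.40/MTok), prefix is 80% of input, 90% of requests hit the cache
print(f"{effective_input_price(0.40, prefix_share=0.8, hit_rate=0.9):.4f}")  # 0.1488
```

In the ideal case (prefix_share=1, hit_rate=1) the price is 0.04, i.e. 5% of the $0.80 standard rate; a low hit rate can make caching cost more than it saves.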

10.8 Batch Lifecycle Management

Listing and canceling batches

# List recent batches
for batch in client.messages.batches.list():
    print(f"{batch.id}: {batch.processing_status} (created {batch.created_at})")

# Cancel a batch (only possible while in_progress)
canceled = client.messages.batches.cancel(batch_id)
print(f"Status after cancel: {canceled.processing_status}")

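Expiry awareness

A batch that has not finished within 24 hours of creation expires, and any requests still unprocessed at that point come back with the expired result type. A simple guard that warns when a batch is close to its expires_at deadline: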

from datetime import datetime, timezone

def warn_if_expiring(batch, threshold_hours: float = 4.0) -> None:
    now = datetime.now(timezone.utc)
    hours_left = (batch.expires_at - now).total_seconds() / 3600
    if hours_left < threshold_hours:
        print(f"WARNING: Batch {batch.id} expires in {hours_left:.1f}h!")

10.9 Complete Production Example

import anthropic, json, time

def batch_product_descriptions(products: list[dict], output_path: str) -> None:
    client = anthropic.Anthropic()

    requests = [
        {
            "custom_id": f"prod-{p['id']}",
            "params": {
                "model": "claude-haiku-4-5-20251001",
                "max_tokens": 150,
                "system": "Write a compelling 50-100 word product description for e-commerce.",
                "messages": [{
                    "role": "user",
                    "content": f"Name: {p['name']}\nCategory: {p['category']}\nFeatures: {', '.join(p['features'])}"
                }]
            }
        }
        for p in products
    ]

    # Cost estimate before submitting (estimate_cost is defined in 10.5)
    est = estimate_cost(client, requests)
    print(f"Estimated cost: ${est['estimated_cost_usd']:.4f} USD")
    confirm = input("Submit? (y/n): ")
    if confirm.lower() != "y":
        return

    # Submit in chunks
    batch_ids = []
    for i in range(0, len(requests), 9000):
        b = client.messages.batches.create(requests=requests[i:i+9000])
        batch_ids.append(b.id)
        print(f"Submitted {b.id}")

    # Wait for all
    pending = set(batch_ids)
    while pending:
        done = {bid for bid in pending
                if client.messages.batches.retrieve(bid).processing_status == "ended"}
        pending -= done
        if pending:
            time.sleep(30)

    # Write results
    with open(output_path, "w") as f:
        for bid in batch_ids:
            for result in client.messages.batches.results(bid):
                # Non-succeeded results are skipped; see 10.6 for collecting and retrying them
                if result.result.type == "succeeded":
                    f.write(json.dumps({
                        "id": result.custom_id.removeprefix("prod-"),
                        "description": result.result.message.content[0].text
                    }) + "\n")

    print(f"Done! Results at {output_path}")
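
One weakness of the example above: if the script crashes while waiting, the batch IDs are lost, and rerunning it would resubmit (and pay for) everything again. A common remedy, sketched here with hypothetical helper names and a hypothetical state file, is to save the IDs right after submission and reuse them on the next run.

```python
import json
import os
import tempfile
from typing import Optional


def save_batch_ids(path: str, batch_ids: list[str]) -> None:
    """Write batch IDs via a temp file and rename, so a crash can't leave a half-written file."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        json.dump(batch_ids, f)
    os.replace(tmp_path, path)  # atomic on POSIX when both paths are on one filesystem


def load_batch_ids(path: str) -> Optional[list[str]]:
    """Return previously saved batch IDs, or None if there is no checkpoint."""
    if not os.path.exists(path):
        return None
    with open(path, encoding="utf-8") as f:
        return json.load(f)


# Possible use inside batch_product_descriptions:
#   batch_ids = load_batch_ids("batch_state.json")
#   if batch_ids is None:
#       batch_ids = [...]  # submit chunks as above
#       save_batch_ids("batch_state.json", batch_ids)
#   ...wait, write results, then os.remove("batch_state.json")

with tempfile.TemporaryDirectory() as d:
    state = os.path.join(d, "batch_state.json")
    before = load_batch_ids(state)
    save_batch_ids(state, ["msgbatch_a", "msgbatch_b"])  # placeholder IDs
    after = load_batch_ids(state)
print(before, after)
```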

Summary

The Batch API is the right tool whenever you need to run Claude at scale without real-time constraints. Key takeaways:

  1. 50% cost discount makes it the default choice for data pipelines, annotation, and evaluation
  2. Up to 10,000 requests per batch; split larger jobs with a chunking helper
  3. custom_id is your correlation key; keep it unique and meaningful
  4. Stream results with batches.results() to avoid memory issues on large batches
  5. Combine with Prompt Caching for homogeneous workloads; the savings stack
  6. Always handle partial failures; implement a retry loop for errored requests