Chapter 10: Batch API: Asynchronous Large-Scale Inference and Cost Optimization
10.1 What the Batch API Is For
Anthropic's Message Batches API is purpose-built for asynchronous, large-scale inference workloads. Compared to the standard Messages API, it differs in three fundamental ways:
- Asynchronous execution: you submit a batch and get a batch ID back immediately; results become available once processing completes, typically well within 24 hours
- 50% cost discount: every model available through the standard API is available at half price through the Batch API
- Higher throughput: batch requests are subject to their own batch-specific limits rather than the per-minute rate limits of the Messages API, making it practical to process hundreds of thousands of items in a single run
When to use the Batch API
| Use case | Batch API? | Notes |
|---|---|---|
| Large-scale data annotation | Yes | Core use case |
| Bulk document summarization | Yes | Process entire knowledge bases |
| Offline model evaluation | Yes | Run thousands of test cases |
| Bulk translation | Yes | Multi-language product copy |
| Real-time chat | No | Use streaming Messages API |
| Tool-call agent loops | No | Each turn depends on the previous response |
10.2 Creating and Monitoring a Batch
Submitting a batch
```python
import anthropic

client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from the environment

requests = [
    {
        "custom_id": "review-001",
        "params": {
            "model": "claude-haiku-4-5-20251001",
            "max_tokens": 20,
            "messages": [{
                "role": "user",
                "content": "Classify sentiment (positive/negative/neutral):\n\nThe headphones have amazing sound quality, punchy bass, and crystal highs. Highly recommended!"
            }]
        }
    },
    {
        "custom_id": "review-002",
        "params": {
            "model": "claude-haiku-4-5-20251001",
            "max_tokens": 20,
            "messages": [{
                "role": "user",
                "content": "Classify sentiment (positive/negative/neutral):\n\nSlow shipping, terrible packaging, product does not match description. Very disappointed."
            }]
        }
    }
]

batch = client.messages.batches.create(requests=requests)
print(f"Batch ID: {batch.id}")
print(f"Status: {batch.processing_status}")
print(f"Expires: {batch.expires_at}")
```
Each batch can contain up to 10,000 requests with a total payload under 32 MB. The `custom_id` field is your key for correlating results. Results are not guaranteed to come back in submission order, so `custom_id` must be unique within the batch.
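A duplicate `custom_id` or an oversized payload only surfaces as an error when you submit, so it can be worth checking both locally first. Below is a minimal sketch. `validate_batch` is a hypothetical helper, and the limits are the figures quoted above. Treating 32 MB as 32 MiB is an assumption, and the JSON size only approximates the real HTTP body.

```python
import json

MAX_REQUESTS = 10_000          # per-batch request limit quoted above
MAX_BYTES = 32 * 1024 * 1024   # assumption: "32 MB" read as 32 MiB

def validate_batch(requests: list[dict]) -> list[str]:
    """Return a list of problems; an empty list means the local checks passed."""
    problems = []
    if len(requests) > MAX_REQUESTS:
        problems.append(f"too many requests: {len(requests)} > {MAX_REQUESTS}")
    seen = set()
    for r in requests:
        cid = r.get("custom_id")
        if not cid:
            problems.append("request without custom_id")
        elif cid in seen:
            problems.append(f"duplicate custom_id: {cid}")
        else:
            seen.add(cid)
    # Approximate payload size by the UTF-8 length of the JSON encoding
    size = len(json.dumps({"requests": requests}).encode("utf-8"))
    if size > MAX_BYTES:
        problems.append(f"payload too large: {size} bytes > {MAX_BYTES}")
    return problems
```

Calling `validate_batch(requests)` before `client.messages.batches.create(...)` and refusing to submit when it returns anything keeps these errors on your side.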
Polling for completion
```python
import time

batch_id = batch.id

while True:
    batch = client.messages.batches.retrieve(batch_id)
    counts = batch.request_counts
    print(f"Status: {batch.processing_status} | "
          f"Succeeded: {counts.succeeded} | "
          f"Errored: {counts.errored} | "
          f"Processing: {counts.processing}")
    if batch.processing_status == "ended":
        break
    time.sleep(60)  # Poll every minute

print("Batch complete!")
```
`processing_status` values:
- `in_progress`: still being processed
- `canceling`: cancellation in progress
- `ended`: processing has finished; individual requests may have succeeded, errored, been canceled, or expired
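The fixed 60-second loop above works well for small jobs. For long-running batches, a backoff schedule reduces the number of `retrieve` calls. Below is a minimal sketch. `wait_for_batch` is a hypothetical helper, and the status lookup is passed in as a function so the loop can be exercised without network access. The delays and the 24-hour timeout are assumptions.

```python
import time
from typing import Callable

TERMINAL_STATUS = "ended"  # in_progress and canceling are both non-terminal

def wait_for_batch(
    get_status: Callable[[], str],
    initial_delay: float = 30.0,
    max_delay: float = 600.0,
    timeout: float = 24 * 3600,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status() until it returns "ended", doubling the delay up to max_delay."""
    waited = 0.0
    delay = initial_delay
    while True:
        status = get_status()
        if status == TERMINAL_STATUS:
            return status
        if waited >= timeout:
            raise TimeoutError(f"batch still {status!r} after {waited:.0f}s")
        sleep(delay)
        waited += delay
        delay = min(delay * 2, max_delay)
```

With the SDK, the status function could be `lambda: client.messages.batches.retrieve(batch_id).processing_status`.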
10.3 Retrieving Results
Iterating over results
```python
results = {}

for result in client.messages.batches.results(batch_id):
    cid = result.custom_id
    if result.result.type == "succeeded":
        message = result.result.message
        results[cid] = {
            "text": message.content[0].text,
            "input_tokens": message.usage.input_tokens,
            "output_tokens": message.usage.output_tokens
        }
    elif result.result.type == "errored":
        results[cid] = {
            "error_type": result.result.error.type,
            "error": str(result.result.error)
        }
    else:
        # "canceled" or "expired": the request was never processed by the model
        results[cid] = {"error_type": result.result.type, "error": result.result.type}

for cid, r in results.items():
    if "text" in r:
        print(f"{cid}: {r['text']} ({r['input_tokens']}+{r['output_tokens']} tokens)")
    else:
        print(f"{cid}: ERROR: {r['error_type']}")
```
Streaming results to disk
For large batches, avoid loading everything into memory:
```python
import json

with open("batch_results.jsonl", "w", encoding="utf-8") as f:
    for result in client.messages.batches.results(batch_id):
        record = {
            "custom_id": result.custom_id,
            "type": result.result.type
        }
        if result.result.type == "succeeded":
            message = result.result.message
            record["text"] = message.content[0].text
            record["usage"] = {
                "in": message.usage.input_tokens,
                "out": message.usage.output_tokens
            }
        elif result.result.type == "errored":
            record["error"] = result.result.error.type
        # "canceled" and "expired" results carry nothing beyond their type
        f.write(json.dumps(record) + "\n")
```
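The JSONL file written by the loop above can be read back the same way: one line at a time, never holding the whole file in memory. Below is a minimal sketch that tallies result types and token usage. It assumes the record layout used above (`type`, plus a `usage` dict with `in`/`out` keys on succeeded records). `iter_records` and `summarize` are hypothetical helper names.

```python
import json
from collections import Counter
from typing import Iterator

def iter_records(path: str) -> Iterator[dict]:
    """Yield one parsed record per non-empty line."""
    with open(path, encoding="utf-8") as f:
        for line in f:
            if line.strip():
                yield json.loads(line)

def summarize(path: str) -> dict:
    """Count results by type and total the token usage of succeeded records."""
    type_counts = Counter()
    tokens_in = tokens_out = 0
    for rec in iter_records(path):
        type_counts[rec["type"]] += 1
        usage = rec.get("usage")
        if usage:
            tokens_in += usage["in"]
            tokens_out += usage["out"]
    return {"by_type": dict(type_counts), "input_tokens": tokens_in, "output_tokens": tokens_out}
```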
10.4 Engineering Patterns for Large Batches
Chunked submission
When you have more than 10,000 requests, split them into multiple batches:
```python
import time
from typing import Generator

import anthropic


def chunked(lst: list, n: int) -> Generator:
    """Yield successive slices of at most n items."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]


def submit_all(client: anthropic.Anthropic, all_requests: list, chunk_size: int = 9000) -> list[str]:
    batch_ids = []
    chunks = list(chunked(all_requests, chunk_size))
    print(f"Splitting {len(all_requests)} requests into {len(chunks)} batches")
    for i, chunk in enumerate(chunks):
        batch = client.messages.batches.create(requests=chunk)
        batch_ids.append(batch.id)
        print(f"  Submitted batch {i+1}/{len(chunks)}: {batch.id}")
        if i < len(chunks) - 1:
            time.sleep(1)  # Avoid create-request rate limits
    return batch_ids


def wait_all(client: anthropic.Anthropic, batch_ids: list[str], poll_interval: int = 60) -> dict:
    pending = set(batch_ids)
    all_results = {}
    while pending:
        done = set()
        for bid in pending:
            b = client.messages.batches.retrieve(bid)
            if b.processing_status == "ended":
                # Only successful results are collected here; see 10.6 for failures
                for result in client.messages.batches.results(bid):
                    if result.result.type == "succeeded":
                        all_results[result.custom_id] = (
                            result.result.message.content[0].text
                        )
                done.add(bid)
                print(f"Batch {bid} done: "
                      f"{b.request_counts.succeeded} ok, "
                      f"{b.request_counts.errored} errors")
        pending -= done
        if pending:
            print(f"{len(pending)} batches still running...")
            time.sleep(poll_interval)
    return all_results
```
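`chunked()` above splits by request count only. Because the per-batch payload cap applies as well, batches of long documents can hit the size limit before the count limit. Below is a minimal sketch of a chunker that respects both. `chunk_requests` is a hypothetical helper. The default caps (9,000 requests, 30 MiB) are assumed safety margins below the quoted limits, and per-request JSON size is only an approximation of the real payload.

```python
import json
from typing import Iterator

def chunk_requests(
    requests: list[dict],
    max_count: int = 9_000,
    max_bytes: int = 30 * 1024 * 1024,
) -> Iterator[list[dict]]:
    """Split requests into chunks that respect a count cap and an approximate byte cap."""
    chunk: list[dict] = []
    chunk_bytes = 0
    for req in requests:
        size = len(json.dumps(req).encode("utf-8"))
        if size > max_bytes:
            raise ValueError(f"request {req.get('custom_id')} alone exceeds max_bytes")
        # Close the current chunk if adding this request would break either cap
        if chunk and (len(chunk) >= max_count or chunk_bytes + size > max_bytes):
            yield chunk
            chunk, chunk_bytes = [], 0
        chunk.append(req)
        chunk_bytes += size
    if chunk:
        yield chunk
```

It can replace `chunked()` inside `submit_all` without other changes, since both yield lists of request dicts.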
Batch requests with system prompts
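```python
SYSTEM = "You are a professional e-commerce review analyst. Output JSON only."

# Example inputs; replace with your own review texts
review_texts = ["Great sound, fast delivery.", "Stopped working after a week."]

requests = [
    {
        "custom_id": f"review-{i}",
        "params": {
            "model": "claude-haiku-4-5-20251001",
            "max_tokens": 100,
            "system": SYSTEM,  # the same system prompt is sent with every request
            "messages": [{"role": "user", "content": text}]
        }
    }
    for i, text in enumerate(review_texts)
]
```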
10.5 Cost Optimization Strategies
Model cost comparison (Batch pricing)
| Model | Batch Input | Batch Output |
|---|---|---|
| claude-opus-4-6 | $7.50/MTok | $37.50/MTok |
| claude-sonnet-4-6 | $1.50/MTok | $7.50/MTok |
| claude-haiku-4-5-20251001 | $0.40/MTok | $2.00/MTok |
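At the rates above, running claude-haiku-4-5-20251001 through the Batch API instead of claude-opus-4-6 through the standard API (twice the batch price, i.e. $15.00/$75.00 per MTok) cuts the per-token cost by roughly 37.5x for the same volume.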
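The comparison can be checked on a concrete workload. Below is a minimal sketch that uses the batch prices from the table and assumes, per the 50% discount, that the standard price is exactly twice the batch price. The 10M-input / 1M-output workload is a hypothetical example.

```python
# Batch prices from the table above, in USD per million tokens
BATCH = {
    "claude-opus-4-6": {"in": 7.50, "out": 37.50},
    "claude-haiku-4-5-20251001": {"in": 0.40, "out": 2.00},
}

def standard(model: str) -> dict:
    """Assumption: standard (non-batch) price is twice the batch price."""
    return {k: v * 2 for k, v in BATCH[model].items()}

def job_cost(prices: dict, in_tok: int, out_tok: int) -> float:
    return in_tok / 1e6 * prices["in"] + out_tok / 1e6 * prices["out"]

# Hypothetical workload: 10M input tokens, 1M output tokens
opus_std = job_cost(standard("claude-opus-4-6"), 10_000_000, 1_000_000)
haiku_batch = job_cost(BATCH["claude-haiku-4-5-20251001"], 10_000_000, 1_000_000)
ratio = opus_std / haiku_batch
print(f"Opus standard: ${opus_std:.2f}, Haiku batch: ${haiku_batch:.2f}, ratio: {ratio:.1f}x")
# Opus standard: $225.00, Haiku batch: $6.00, ratio: 37.5x
```

Because both input and output prices differ by the same factor here, the ratio does not depend on the input/output mix.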
Minimizing token consumption
```python
# Wasteful: long preamble (billed as input tokens) and a loose output cap
bad = {
    "custom_id": "item-001",
    "params": {
        "model": "claude-haiku-4-5-20251001",
        "max_tokens": 512,  # Way more than needed; leaves room for verbose answers you pay for
        "messages": [{"role": "user", "content": "[500 words of preamble...]\n\nClassify: Great product!"}]
    }
}

# Efficient: concise system prompt, minimal max_tokens
good = {
    "custom_id": "item-001",
    "params": {
        "model": "claude-haiku-4-5-20251001",
        "max_tokens": 5,  # Enough for a single label such as "positive"
        "system": "Classify sentiment: output positive/negative/neutral only.",
        "messages": [{"role": "user", "content": "Great product!"}]
    }
}
```
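A tight `max_tokens` has a flip side. The model may add punctuation or an extra word, or it may be cut off, in which case the Messages API reports `stop_reason` as `"max_tokens"`. A small post-processing step can map raw output onto the allowed labels. Below is a minimal sketch. `normalize_label` is a hypothetical helper. To use the stop reason, you would also store `result.result.message.stop_reason` alongside the text.

```python
import re
from typing import Optional

ALLOWED = ("positive", "negative", "neutral")


def normalize_label(text: str, stop_reason: Optional[str] = None,
                    allowed: tuple = ALLOWED) -> Optional[str]:
    """Map a raw completion to one allowed label, or None if it cannot be trusted."""
    cleaned = text.strip().lower().strip(".!\"'` ")
    if cleaned in allowed:
        return cleaned
    if stop_reason == "max_tokens":
        # The answer was cut off by max_tokens; treat it as unusable and retry with a larger cap
        return None
    # Otherwise accept the output only if exactly one allowed label appears as a whole word
    words = set(re.findall(r"[a-z]+", cleaned))
    matches = [label for label in allowed if label in words]
    return matches[0] if len(matches) == 1 else None
```

Returning `None` rather than guessing keeps ambiguous answers out of the dataset, so they can be routed to a retry batch.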
Token estimation before submission
```python
def estimate_cost(client, requests: list, model: str = "claude-haiku-4-5-20251001") -> dict:
    # Batch prices in USD per million tokens (from the table above)
    PRICES = {
        "claude-haiku-4-5-20251001": {"in": 0.40, "out": 2.00},
        "claude-sonnet-4-6": {"in": 1.50, "out": 7.50},
        "claude-opus-4-6": {"in": 7.50, "out": 37.50},
    }
    if not requests:
        return {"estimated_input_tokens": 0, "estimated_output_tokens": 0, "estimated_cost_usd": 0.0}

    # Count input tokens exactly for a small sample, then extrapolate
    sample = requests[:10]
    total_in = 0
    for req in sample:
        params = req["params"]
        kwargs = {"model": model, "messages": params["messages"]}
        if "system" in params:
            kwargs["system"] = params["system"]  # only send a system prompt if the request has one
        total_in += client.messages.count_tokens(**kwargs).input_tokens
    avg_in = total_in / len(sample)

    # Rough heuristic: assume outputs use about half of max_tokens
    avg_out = sum(req["params"].get("max_tokens", 100) * 0.5 for req in sample) / len(sample)

    total_in_est = avg_in * len(requests)
    total_out_est = avg_out * len(requests)
    p = PRICES[model]
    return {
        "estimated_input_tokens": int(total_in_est),
        "estimated_output_tokens": int(total_out_est),
        "estimated_cost_usd": round(
            (total_in_est / 1e6) * p["in"] + (total_out_est / 1e6) * p["out"], 4
        )
    }
```
10.6 Error Handling and Partial Failures
Individual requests within a batch can fail while others succeed, and the batch as a whole still finishes with `processing_status` "ended". Always check each result's type, which is one of `succeeded`, `errored`, `canceled`, or `expired`:
```python
from collections import defaultdict


def analyze_results(client, batch_id: str) -> dict:
    succeeded, failed = {}, defaultdict(list)
    for result in client.messages.batches.results(batch_id):
        if result.result.type == "succeeded":
            succeeded[result.custom_id] = result.result.message.content[0].text
        elif result.result.type == "errored":
            failed[result.result.error.type].append(result.custom_id)
        else:
            # "canceled" and "expired" results have no error object; group them by result type
            failed[result.result.type].append(result.custom_id)
    if failed:
        print("Failures by type:")
        for etype, ids in failed.items():
            print(f"  {etype}: {len(ids)} items")
    return {"succeeded": succeeded, "failed": dict(failed)}


def build_retry_batch(original_requests: list, failed_ids: set) -> list:
    """Select the original requests whose custom_id is in failed_ids, for resubmission."""
    return [r for r in original_requests if r["custom_id"] in failed_ids]
```
Common error types
| Error type | Cause | Action |
|---|---|---|
| invalid_request | Malformed params | Fix request before retrying |
| overloaded_error | Capacity issues | Retry in a new batch |
| rate_limit_error | Rare in batch mode | Retry |
| content_filter | Policy violation | Revise input content |
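`build_retry_batch` selects what to resend. A retry loop also has to decide which failures are worth resending unchanged and when to give up. Below is a minimal sketch built on the table's "Action" column. It assumes that `overloaded_error`, `rate_limit_error` and the `expired` result type can be resubmitted as-is, while everything else (including `canceled`) needs attention first. `plan_retry` and `run_with_retries` are hypothetical helpers. `run_batch` is a caller-supplied function that submits, waits, and returns `{custom_id: failure kind}` for every request that did not succeed.

```python
from typing import Callable

# Assumption drawn from the table: these can be resubmitted without changes
RETRYABLE = {"overloaded_error", "rate_limit_error", "expired"}


def plan_retry(requests: list[dict], failures: dict[str, str]) -> tuple[list[dict], list[str]]:
    """Split failures into requests to resubmit and custom_ids that need a manual fix."""
    retry_ids = {cid for cid, kind in failures.items() if kind in RETRYABLE}
    manual = sorted(cid for cid in failures if cid not in retry_ids)
    retry = [r for r in requests if r["custom_id"] in retry_ids]
    return retry, manual


def run_with_retries(run_batch: Callable[[list[dict]], dict[str, str]],
                     requests: list[dict], max_attempts: int = 3) -> list[str]:
    """Resubmit retryable failures up to max_attempts; return custom_ids still unresolved."""
    pending = requests
    manual: list[str] = []
    for _ in range(max_attempts):
        if not pending:
            break
        failures = run_batch(pending)
        pending, needs_fix = plan_retry(pending, failures)
        manual.extend(needs_fix)
    manual.extend(r["custom_id"] for r in pending)  # still failing after the last attempt
    return sorted(manual)
```

Capping the attempts matters: a request that is overloaded three times in a row is usually better reported than resubmitted indefinitely.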
10.7 Combining Batch API with Prompt Caching
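Prompt Caching and the Batch API discounts stack. When many requests share the same system prompt, each cache hit bills that cached prefix at 10% of the normal input price, a 90% reduction on that portion. Two caveats apply. First, the prefix must meet the model's minimum cacheable length (at least 1,024 tokens, more for some models). Second, because batch requests run concurrently, cache hits are best-effort rather than guaranteed: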
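```python
# In practice the shared prefix should be a long instruction block; a one-line prompt
# like this is below the minimum cacheable length and would simply not be cached.
SHARED_SYSTEM = "You are a professional document analyst. Extract key information concisely."

documents = ["<document 1 text>", "<document 2 text>"]  # replace with your inputs

requests = [
    {
        "custom_id": f"doc-{i}",
        "params": {
            "model": "claude-haiku-4-5-20251001",
            "max_tokens": 200,
            "system": [
                {
                    "type": "text",
                    "text": SHARED_SYSTEM,
                    "cache_control": {"type": "ephemeral"}  # Mark this prefix as cacheable
                }
            ],
            "messages": [{"role": "user", "content": doc_text}]
        }
    }
    for i, doc_text in enumerate(documents)
]
```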
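The two discounts multiply on the shared portion: 50% off for batching, then 90% off for a cache hit. Cached system-prompt tokens therefore cost about 0.5 × 0.1 = 5% of the standard input price, which makes the combination most effective for homogeneous workloads with a long, identical prefix.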
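To see what this means for a whole job, cache misses have to be included as well as hits. Below is a minimal sketch of the input-side cost. All prices, token counts and the hit rate are hypothetical inputs. It assumes (a) the batch discount applies to cache reads and writes too, and (b) every miss pays the 25% cache-write surcharge of the default 5-minute cache.

```python
def effective_input_cost(
    base_price: float,   # standard input price, USD per million tokens
    shared_tokens: int,  # cached system-prompt tokens per request
    unique_tokens: int,  # per-request tokens that are never cached
    n_requests: int,
    hit_rate: float,     # fraction of requests that read the cache (assumed)
) -> float:
    batch = base_price * 0.5       # Batch API: 50% off
    cache_read = batch * 0.1       # cache hit: 90% off the cached portion
    cache_write = batch * 1.25     # cache write: 25% surcharge (assumption: 5-minute TTL)
    hits = n_requests * hit_rate
    misses = n_requests - hits
    shared = (hits * cache_read + misses * cache_write) * shared_tokens
    unique = n_requests * unique_tokens * batch
    return (shared + unique) / 1e6


# Hypothetical job: 1,000 requests, 5,000 shared + 500 unique tokens each, $1.00/MTok base
with_cache = effective_input_cost(1.00, 5_000, 500, 1_000, hit_rate=0.9)
no_hits = effective_input_cost(1.00, 5_000, 500, 1_000, hit_rate=0.0)
batch_only = 1_000 * 5_500 * 0.5 / 1e6  # same job, batch discount only
```

With these inputs, a 90% hit rate brings the input bill to $0.79, against $2.75 with batching alone. With no hits at all, caching costs more ($3.38), because every request pays the write surcharge.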
10.8 Batch Lifecycle Management
Listing and canceling batches
```python
# List recent batches (iterating pages through results automatically)
for batch in client.messages.batches.list():
    print(f"{batch.id}: {batch.processing_status} (created {batch.created_at})")

# Cancel a batch (only possible while in_progress)
canceled = client.messages.batches.cancel(batch_id)
print(f"Status after cancel: {canceled.processing_status}")  # typically "canceling"
```
Expiry awareness
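- A batch stops processing 24 hours after creation (its `expires_at` time); any requests still unprocessed at that point come back as `expired`
- Results remain available for 29 days after the batch is created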
- Always download results before the 29-day window closes
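```python
from datetime import datetime, timezone


def warn_if_expiring(batch, threshold_hours: float = 4.0) -> None:
    """Warn when a batch is close to its 24-hour processing deadline (expires_at)."""
    if batch.processing_status == "ended":
        return  # nothing left to expire
    now = datetime.now(timezone.utc)
    hours_left = (batch.expires_at - now).total_seconds() / 3600
    if hours_left < threshold_hours:
        print(f"WARNING: Batch {batch.id} expires in {hours_left:.1f}h!")
```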
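`warn_if_expiring` covers the 24-hour processing deadline. The separate 29-day retention window needs its own check. Below is a minimal sketch. `results_deadline` and `days_until_results_gone` are hypothetical helpers. Measuring from `created_at` is a deliberately conservative choice: a batch always ends after it is created, so this never reports a later deadline than the real one.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

RESULTS_RETENTION = timedelta(days=29)  # retention window quoted above


def results_deadline(created_at: datetime) -> datetime:
    """Earliest moment the results may stop being downloadable."""
    return created_at + RESULTS_RETENTION


def days_until_results_gone(created_at: datetime, now: Optional[datetime] = None) -> float:
    now = now or datetime.now(timezone.utc)
    return (results_deadline(created_at) - now).total_seconds() / 86400
```

With the SDK this would be called as `days_until_results_gone(batch.created_at)`, for example while iterating over `client.messages.batches.list()`.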
10.9 Complete Production Example
```python
import json
import time

import anthropic

# estimate_cost() is the helper defined in 10.5


def batch_product_descriptions(products: list[dict], output_path: str) -> None:
    client = anthropic.Anthropic()
    requests = [
        {
            "custom_id": f"prod-{p['id']}",
            "params": {
                "model": "claude-haiku-4-5-20251001",
                "max_tokens": 150,
                "system": "Write a compelling 50-100 word product description for e-commerce.",
                "messages": [{
                    "role": "user",
                    "content": f"Name: {p['name']}\nCategory: {p['category']}\nFeatures: {', '.join(p['features'])}"
                }]
            }
        }
        for p in products
    ]

    # Cost estimate before submitting
    est = estimate_cost(client, requests)
    print(f"Estimated cost: ${est['estimated_cost_usd']:.4f} USD")
    confirm = input("Submit? (y/n): ")
    if confirm.strip().lower() != "y":
        return

    # Submit in chunks of 9,000 (below the 10,000-request limit)
    batch_ids = []
    for i in range(0, len(requests), 9000):
        b = client.messages.batches.create(requests=requests[i:i + 9000])
        batch_ids.append(b.id)
        print(f"Submitted {b.id}")

    # Wait for all batches to reach "ended"
    pending = set(batch_ids)
    while pending:
        done = {bid for bid in pending
                if client.messages.batches.retrieve(bid).processing_status == "ended"}
        pending -= done
        if pending:
            time.sleep(30)

    # Write successful results; count the rest so failures are not silently lost
    failed = 0
    with open(output_path, "w", encoding="utf-8") as f:
        for bid in batch_ids:
            for result in client.messages.batches.results(bid):
                if result.result.type == "succeeded":
                    f.write(json.dumps({
                        "id": result.custom_id.removeprefix("prod-"),
                        "description": result.result.message.content[0].text
                    }) + "\n")
                else:
                    failed += 1
    print(f"Done! Results at {output_path} ({failed} requests did not succeed)")
```
Summary
The Batch API is the right tool whenever you need to run Claude at scale without real-time constraints. Key takeaways:
- 50% cost discount makes it the default choice for data pipelines, annotation, and evaluation
- Up to 10,000 requests per batch; split larger jobs with a chunking helper
- `custom_id` is your correlation key: keep it unique and meaningful
- Stream results with `batches.results()` to avoid memory issues on large batches
- Combine with Prompt Caching for homogeneous workloads; the savings stack
- Always handle partial failures; implement a retry loop for errored requests