System Prompt Engineering: Complete Methodology for Role Injection, Constraint Design and Persona Consistency
Chapter 9: Streaming Output: SSE Protocol, Incremental Rendering, and Error Recovery
9.1 Why Streaming Matters
Without streaming, a Claude API call forces the user to wait until every token of the response has been generated before anything appears on screen. For a 1,000-token reply, that can mean 10–20 seconds of blank silence. Modern conversational AI products cannot afford that experience.
Streaming solves this by using the Server-Sent Events (SSE) protocol: as each token is generated it is immediately pushed to the client. The perceived latency drops from "total generation time" to Time To First Token (TTFT), typically 300 ms–1 s. Users see words appear as the model "thinks", transforming a frustrating wait into an engaging interaction.
When to use streaming
| Use case | Recommendation | Reason |
|---|---|---|
| Chat interface | Strongly recommended | Core UX metric |
| Code generation | Recommended | User can review as it appears |
| Background summarization | Not needed | Completeness matters more |
| Batch async processing | Not applicable | Use Batch API instead |
| Tool-call result processing | Situational | Needs full JSON before parsing |
9.2 SSE Protocol Fundamentals
Server-Sent Events is a W3C standard built on HTTP long-polling. The wire format is minimal:
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}\n\n
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" world"}}\n\n
data: [DONE]\n\n
Each event starts with data: and terminates with a double newline \n\n. Claude's API embeds JSON objects inside these SSE data fields.
Claude streaming event sequence
A complete streaming response produces events in this order:
message_start → Response metadata (model, initial usage, etc.)
content_block_start → Block begins (index=0, type=text or tool_use)
ping → Heartbeat (periodic)
content_block_delta → Incremental content (repeats many times)
content_block_stop → Block complete
message_delta → Message-level delta (stop_reason, updated usage)
message_stop → Response finished
Understanding this sequence is essential when handling tool calls, where content_block_start carries type=tool_use and subsequent content_block_delta events carry partial JSON strings that must be accumulated before parsing.
9.3 Python Streaming Implementation
Basic streaming
import anthropic
client = anthropic.Anthropic()
with client.messages.stream(
model="claude-sonnet-4-6",
max_tokens=1024,
messages=[
{"role": "user", "content": "Explain quantum entanglement in simple terms."}
]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
final = stream.get_final_message()
print(f"\n\n[Tokens: {final.usage.input_tokens} in / {final.usage.output_tokens} out]")
client.messages.stream() is the official SDK context manager. stream.text_stream filters out non-text events automatically and yields pure text deltas — the simplest approach for most use cases.
Raw event handling
When you need full event access (tool calls, token counting per-event):
import anthropic
client = anthropic.Anthropic()
with client.messages.stream(
model="claude-sonnet-4-6",
max_tokens=2048,
system="You are a technical documentation assistant.",
messages=[{"role": "user", "content": "Write a Python quicksort implementation."}]
) as stream:
for event in stream:
if event.type == "content_block_delta":
if hasattr(event.delta, "text"):
print(event.delta.text, end="", flush=True)
elif event.type == "message_start":
print(f"[Model: {event.message.model}]")
elif event.type == "message_delta":
if event.usage:
print(f"\n[Output tokens: {event.usage.output_tokens}]")
Low-level: stream=True parameter
For maximum control, use client.messages.create(stream=True):
import anthropic, json
client = anthropic.Anthropic()
with client.messages.create(
model="claude-opus-4-6",
max_tokens=4096,
stream=True,
messages=[{"role": "user", "content": "Write a long essay on machine learning."}]
) as response:
for line in response.iter_lines():
if line.startswith("data: "):
data = line[6:]
if data == "[DONE]":
break
try:
event = json.loads(data)
if event.get("type") == "content_block_delta":
delta = event.get("delta", {})
if delta.get("type") == "text_delta":
print(delta.get("text", ""), end="", flush=True)
except json.JSONDecodeError:
pass
In production code, prefer the SDK's stream() context manager over manual SSE parsing — it handles reconnection, error propagation, and event accumulation correctly.
9.4 TypeScript / Node.js Streaming
import Anthropic from "@anthropic-ai/sdk";
const client = new Anthropic();
async function streamChat(userMessage: string): Promise<void> {
const stream = await client.messages.stream({
model: "claude-sonnet-4-6",
max_tokens: 1024,
messages: [{ role: "user", content: userMessage }],
});
process.stdout.write("Assistant: ");
for await (const chunk of stream) {
if (
chunk.type === "content_block_delta" &&
chunk.delta.type === "text_delta"
) {
process.stdout.write(chunk.delta.text);
}
}
const final = await stream.getFinalMessage();
console.log(`\n\n[Tokens: ${final.usage.output_tokens}]`);
}
// Forwarding a stream from an Express server
import express from "express";
const app = express();
app.get("/chat", async (req, res) => {
const message = req.query.message as string;
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
res.setHeader("X-Accel-Buffering", "no");
const stream = await client.messages.stream({
model: "claude-sonnet-4-6",
max_tokens: 1024,
messages: [{ role: "user", content: message }],
});
for await (const chunk of stream) {
if (
chunk.type === "content_block_delta" &&
chunk.delta.type === "text_delta"
) {
res.write(`data: ${JSON.stringify({ text: chunk.delta.text })}\n\n`);
}
}
res.write("data: [DONE]\n\n");
res.end();
});
9.5 Incremental Rendering Techniques
React real-time rendering
import { useState, useCallback } from "react";
function ChatInterface() {
const [response, setResponse] = useState("");
const [isStreaming, setIsStreaming] = useState(false);
const sendMessage = useCallback(async (message: string) => {
setResponse("");
setIsStreaming(true);
const eventSource = new EventSource(
`/api/chat?message=${encodeURIComponent(message)}`
);
eventSource.onmessage = (event) => {
if (event.data === "[DONE]") {
eventSource.close();
setIsStreaming(false);
return;
}
const data = JSON.parse(event.data);
if (data.text) {
setResponse((prev) => prev + data.text);
}
};
eventSource.onerror = () => {
eventSource.close();
setIsStreaming(false);
};
}, []);
return (
<div>
<div className="response-area">
{response}
{isStreaming && <span className="cursor-blink">▋</span>}
</div>
</div>
);
}
The Markdown incremental rendering problem
Streaming Markdown presents a classic challenge: syntax markers can be split across delta chunks. The sequence **bold** might arrive as **bo, ld, ** — rendering after each delta shows raw asterisks briefly.
Solution: buffered rendering with debounce
class StreamingMarkdownRenderer {
private buffer = "";
private renderTimeout: ReturnType<typeof setTimeout> | null = null;
private readonly RENDER_DELAY = 50; // ms
appendText(delta: string): void {
this.buffer += delta;
if (this.renderTimeout) clearTimeout(this.renderTimeout);
this.renderTimeout = setTimeout(() => this.render(), this.RENDER_DELAY);
}
private render(): void {
const safe = this.getSafeBuffer();
document.getElementById("output")!.innerHTML = marked(safe);
}
private getSafeBuffer(): string {
// Detect unclosed fenced code blocks
const openFence = (this.buffer.match(/```/g) || []).length % 2 !== 0;
return openFence ? this.buffer + "\n```" : this.buffer;
}
}
9.6 Streaming Tool Call Handling
During tool use, content_block_delta carries input_json_delta — partial JSON that must be accumulated before parsing:
import anthropic, json
client = anthropic.Anthropic()
tools = [{
"name": "get_weather",
"description": "Get weather for a city",
"input_schema": {
"type": "object",
"properties": {
"city": {"type": "string"},
"unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
},
"required": ["city"]
}
}]
tool_calls: dict = {}
with client.messages.stream(
model="claude-sonnet-4-6",
max_tokens=1024,
tools=tools,
messages=[{"role": "user", "content": "What's the weather in Paris?"}]
) as stream:
for event in stream:
if event.type == "content_block_start":
if event.content_block.type == "tool_use":
tool_calls[event.index] = {
"name": event.content_block.name,
"id": event.content_block.id,
"json_str": ""
}
elif event.type == "content_block_delta":
if event.delta.type == "input_json_delta":
if event.index in tool_calls:
tool_calls[event.index]["json_str"] += event.delta.partial_json
elif event.type == "content_block_stop":
if event.index in tool_calls:
tc = tool_calls[event.index]
tc["input"] = json.loads(tc["json_str"])
print(f"Tool call: {tc['name']}({tc['input']})")
9.7 Error Recovery and Retry Strategies
Error taxonomy
| Error type | HTTP status | Strategy |
|---|---|---|
| Rate limit | 429 | Exponential backoff |
| Overloaded | 529 | Short wait then retry |
| Timeout | 408 / network drop | Resume if possible |
| Content filter | 400 | Do not retry; modify input |
| Server error | 500/503 | Limited retries |
Streaming client with retry
import anthropic, time, logging
from typing import Generator
logger = logging.getLogger(__name__)
def stream_with_retry(
client: anthropic.Anthropic,
max_retries: int = 3,
**kwargs
) -> Generator[str, None, None]:
for attempt in range(max_retries):
try:
with client.messages.stream(**kwargs) as stream:
for text in stream.text_stream:
yield text
return
except anthropic.RateLimitError:
if attempt == max_retries - 1:
raise
wait = 2 ** attempt # 1s, 2s, 4s
logger.warning(f"Rate limited, waiting {wait}s (attempt {attempt+1})")
time.sleep(wait)
except anthropic.APIStatusError as e:
if e.status_code in (500, 503, 529):
if attempt == max_retries - 1:
raise
time.sleep(0.5 * (2 ** attempt))
else:
raise
except (anthropic.APIConnectionError, anthropic.APITimeoutError):
if attempt == max_retries - 1:
raise
time.sleep(1.0)
9.8 Performance Optimization
Reducing TTFT
- Choose a faster model —
claude-haiku-4-5-20251001has 50–70% lower TTFT thanclaude-opus-4-6 - Shorten system prompts — long prefills add processing time before the first token
- Set realistic
max_tokens— over-allocating can increase scheduling latency - Use async clients for concurrent requests
Async parallel streaming
import asyncio, anthropic
async def stream_single(client, message: str) -> str:
result = []
async with client.messages.stream(
model="claude-haiku-4-5-20251001",
max_tokens=512,
messages=[{"role": "user", "content": message}]
) as stream:
async for text in stream.text_stream:
result.append(text)
return "".join(result)
async def parallel_streams(messages: list[str]) -> list[str]:
client = anthropic.AsyncAnthropic()
return await asyncio.gather(*[stream_single(client, m) for m in messages])
results = asyncio.run(parallel_streams([
"Explain machine learning",
"Explain deep learning",
"Explain reinforcement learning"
]))
9.9 Production Deployment
Nginx configuration
location /api/chat {
proxy_pass http://backend;
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_buffering off;
proxy_cache off;
proxy_read_timeout 300s;
proxy_send_timeout 300s;
add_header X-Accel-Buffering no;
}
Client timeout settings
client = anthropic.Anthropic(
timeout=anthropic.Timeout(
connect=5.0,
read=300.0, # Must be large for long streams
write=10.0,
pool=5.0
)
)
Monitoring metrics
- TTFT (P50/P95/P99) — primary user experience indicator
- Stream abort rate — percentage of streams that terminate abnormally
- Tokens/second — model throughput health signal
- Client disconnect rate — users who close the connection before completion
Summary
Streaming transforms the user experience from "wait for everything" to "instant feedback." Key takeaways:
- Use
client.messages.stream()rather than manual SSE parsing - Accumulate
input_json_deltafragments before parsing tool call inputs - Implement exponential backoff for rate limits and server errors
- Disable proxy buffering in Nginx with
proxy_buffering off - Monitor TTFT as the primary UX metric, not total response time