Chapter 9: Streaming Output: SSE Protocol, Incremental Rendering, and Error Recovery

9.1 Why Streaming Matters

Without streaming, a Claude API call forces the user to wait until every token of the response has been generated before anything appears on screen. For a 1,000-token reply, that can mean 10–20 seconds of blank silence. Modern conversational AI products cannot afford that experience.

Streaming solves this by using the Server-Sent Events (SSE) protocol: as each token is generated, it is immediately pushed to the client. The perceived latency drops from "total generation time" to Time To First Token (TTFT), typically 300 ms–1 s. Users see words appear as the model "thinks", transforming a frustrating wait into an engaging interaction.

When to use streaming

Use case                     Recommendation         Reason
Chat interface               Strongly recommended   Core UX metric
Code generation              Recommended            User can review output as it appears
Background summarization     Not needed             Completeness matters more than latency
Batch async processing       Not applicable         Use the Message Batches API instead
Tool-call result processing  Situational            Tool input JSON must be complete before parsing

9.2 SSE Protocol Fundamentals

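Server-Sent Events is a simple text protocol standardized by the W3C and WHATWG (it now lives in the HTML Living Standard). Rather than long-polling, the server keeps a single HTTP response open with Content-Type: text/event-stream and writes events into it as they become available. The wire format is minimal:

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" world"}}

event: message_stop
data: {"type":"message_stop"}

Each event is a group of field lines such as event: and data:, terminated by a blank line (a double newline, \n\n, on the wire). Claude's API names every event with an event: line and embeds a JSON object in the data: field. Unlike some other LLM APIs, it does not send a data: [DONE] sentinel; the message_stop event marks the end of the stream.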
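To make the framing rules concrete, here is a minimal Python sketch of an SSE line parser. It is illustrative only: parse_sse is a hypothetical helper, not part of any SDK, and it handles just the event: and data: fields, comment lines, and blank-line dispatch. A production client should rely on the SDK or a complete SSE library.

```python
import json
from typing import Iterable, Iterator, List, Tuple


def parse_sse(lines: Iterable[str]) -> Iterator[Tuple[str, str]]:
    """Yield (event_name, data) pairs from raw SSE lines (simplified sketch)."""
    event_name = "message"  # SSE default when an event has no event: line
    data_lines: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line ends the event: dispatch it if it carried data.
            if data_lines:
                yield event_name, "\n".join(data_lines)
            event_name, data_lines = "message", []
        elif line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        else:
            field, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]  # the spec strips one leading space
            if field == "event":
                event_name = value
            elif field == "data":
                data_lines.append(value)  # several data: lines are joined with a newline
            # id: and retry: fields are ignored in this sketch


sample = (
    "event: content_block_delta\n"
    'data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}\n'
    "\n"
    "event: message_stop\n"
    'data: {"type":"message_stop"}\n'
    "\n"
)

for name, data in parse_sse(sample.splitlines(keepends=True)):
    print(name, json.loads(data)["type"])
```

An event that is not followed by a blank line is never dispatched, which matches the spec's rule for an incomplete final event.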

Claude streaming event sequence

A complete streaming response produces events in this order:

message_start        → Response metadata (model, initial usage, etc.)
content_block_start  → A block begins (index 0, 1, ...; type text or tool_use)
content_block_delta  → Incremental content for that block (repeats many times)
content_block_stop   → Block complete
                       (start/delta/stop repeats for every content block)
message_delta        → Message-level changes (stop_reason, cumulative usage)
message_stop         → Response finished

ping events (heartbeats) can be interleaved anywhere in this sequence, and an error event (for example, overloaded_error) can arrive mid-stream.

Understanding this sequence is essential when handling tool calls, where content_block_start carries type=tool_use and subsequent content_block_delta events carry partial JSON strings that must be accumulated before parsing.
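The accumulation rule can be sketched independently of any SDK. The function below is a hypothetical helper (accumulate is not an SDK API) that replays raw event dictionaries, shaped like the JSON in the data: fields, into finished content blocks: text deltas are concatenated, and input_json_delta fragments are parsed only when content_block_stop arrives. The sample events, including the toolu_demo ID, are made up for illustration.

```python
import json
from typing import Any, Dict, List


def accumulate(events: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Rebuild finished content blocks from raw streaming events (sketch)."""
    blocks: Dict[int, Dict[str, Any]] = {}
    partial_json: Dict[int, str] = {}
    for ev in events:
        kind = ev["type"]
        if kind == "content_block_start":
            blocks[ev["index"]] = dict(ev["content_block"])  # shallow copy
            partial_json[ev["index"]] = ""
        elif kind == "content_block_delta":
            delta = ev["delta"]
            if delta["type"] == "text_delta":
                blocks[ev["index"]]["text"] += delta["text"]
            elif delta["type"] == "input_json_delta":
                # Fragments are not valid JSON on their own; just concatenate.
                partial_json[ev["index"]] += delta["partial_json"]
        elif kind == "content_block_stop":
            block = blocks[ev["index"]]
            if block["type"] == "tool_use":
                block["input"] = json.loads(partial_json[ev["index"]] or "{}")
        # message_start, ping, message_delta and message_stop are ignored here.
    return [blocks[i] for i in sorted(blocks)]


sample_events = [
    {"type": "message_start", "message": {}},
    {"type": "content_block_start", "index": 0,
     "content_block": {"type": "text", "text": ""}},
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "text_delta", "text": "Checking "}},
    {"type": "ping"},
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "text_delta", "text": "the weather."}},
    {"type": "content_block_stop", "index": 0},
    {"type": "content_block_start", "index": 1,
     "content_block": {"type": "tool_use", "id": "toolu_demo", "name": "get_weather", "input": {}}},
    {"type": "content_block_delta", "index": 1,
     "delta": {"type": "input_json_delta", "partial_json": '{"city": "Pa'}},
    {"type": "content_block_delta", "index": 1,
     "delta": {"type": "input_json_delta", "partial_json": 'ris"}'}},
    {"type": "content_block_stop", "index": 1},
    {"type": "message_stop"},
]

print(accumulate(sample_events))
```

Neither fragment ('{"city": "Pa' or 'ris"}') parses on its own; only the concatenation at content_block_stop is valid JSON.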

9.3 Python Streaming Implementation

Basic streaming

import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Explain quantum entanglement in simple terms."}
    ]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

    final = stream.get_final_message()
    print(f"\n\n[Tokens: {final.usage.input_tokens} in / {final.usage.output_tokens} out]")

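client.messages.stream() is the official SDK streaming helper, used as a context manager. stream.text_stream filters out non-text events automatically and yields only text deltas, which makes it the simplest approach for most use cases. After the loop, stream.get_final_message() returns the fully accumulated message, including token usage.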

Raw event handling

When you need every event (tool calls, metadata, usage updates), iterate over the stream itself:

import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=2048,
    system="You are a technical documentation assistant.",
    messages=[{"role": "user", "content": "Write a Python quicksort implementation."}]
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
        elif event.type == "message_start":
            print(f"[Model: {event.message.model}]")
        elif event.type == "message_delta":
            if event.usage:
                print(f"\n[Output tokens: {event.usage.output_tokens}]")

Low-level: stream=True parameter

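For maximum control you can parse the SSE lines yourself. Note that client.messages.create(stream=True) already returns parsed event objects; to read the raw wire format, use client.messages.with_streaming_response.create(), which exposes the HTTP body line by line:

import anthropic, json

client = anthropic.Anthropic()

with client.messages.with_streaming_response.create(
    model="claude-opus-4-6",
    max_tokens=4096,
    stream=True,
    messages=[{"role": "user", "content": "Write a long essay on machine learning."}]
) as response:
    for line in response.iter_lines():
        # Skip "event: ..." lines and the blank lines that separate events
        if not line.startswith("data: "):
            continue
        try:
            event = json.loads(line[len("data: "):])
        except json.JSONDecodeError:
            continue
        if event.get("type") == "content_block_delta":
            delta = event.get("delta", {})
            if delta.get("type") == "text_delta":
                print(delta.get("text", ""), end="", flush=True)
        elif event.get("type") == "error":
            # Errors such as overloaded_error can arrive mid-stream as events
            raise RuntimeError(f"Stream error: {event.get('error')}")
        elif event.get("type") == "message_stop":
            break  # end of stream; Claude sends no [DONE] line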

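In production code, prefer the SDK's stream() helper over manual SSE parsing: it parses events, raises mid-stream error events as exceptions, and accumulates the final message for you. It does not resume a stream that breaks midway, though; the SDK's automatic retries apply only to the initial request.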

9.4 TypeScript / Node.js Streaming

import Anthropic from "@anthropic-ai/sdk";
import express from "express";

const client = new Anthropic(); // reads ANTHROPIC_API_KEY from the environment

async function streamChat(userMessage: string): Promise<void> {
  // messages.stream() returns a MessageStream immediately; no await needed
  const stream = client.messages.stream({
    model: "claude-sonnet-4-6",
    max_tokens: 1024,
    messages: [{ role: "user", content: userMessage }],
  });

  process.stdout.write("Assistant: ");

  for await (const chunk of stream) {
    if (
      chunk.type === "content_block_delta" &&
      chunk.delta.type === "text_delta"
    ) {
      process.stdout.write(chunk.delta.text);
    }
  }

  // The TypeScript SDK calls this finalMessage() (Python: get_final_message())
  const final = await stream.finalMessage();
  console.log(`\n\n[Tokens: ${final.usage.output_tokens}]`);
}

// Forwarding a stream from an Express server
const app = express();

app.get("/api/chat", async (req, res) => {
  const message = String(req.query.message ?? "");

  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");
  res.setHeader("X-Accel-Buffering", "no"); // tells Nginx not to buffer this response
  res.flushHeaders();

  const stream = client.messages.stream({
    model: "claude-sonnet-4-6",
    max_tokens: 1024,
    messages: [{ role: "user", content: message }],
  });

  // Stop generating (and paying for) tokens if the browser disconnects
  res.on("close", () => {
    if (!res.writableEnded) stream.abort();
  });

  try {
    for await (const chunk of stream) {
      if (
        chunk.type === "content_block_delta" &&
        chunk.delta.type === "text_delta"
      ) {
        res.write(`data: ${JSON.stringify({ text: chunk.delta.text })}\n\n`);
      }
    }
    // Application-level sentinel for the browser; Claude itself ends its
    // stream with a message_stop event
    res.write("data: [DONE]\n\n");
  } catch (err) {
    console.error("Claude stream ended with an error", err); // includes aborts
  } finally {
    res.end();
  }
});
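The Express handler wraps each delta in JSON.stringify instead of writing the raw text, and that is what keeps the framing intact: a delta containing a newline would otherwise end the data: line early, and the remainder would be misread as a separate field line. The same rule as a small Python sketch, assuming a Python backend; sse_frame is a hypothetical helper, not a library function.

```python
import json
from typing import Any, Optional


def sse_frame(payload: Any, event: Optional[str] = None) -> str:
    """Encode one SSE event whose data field is a single-line JSON document.

    json.dumps escapes newlines inside strings, so the data stays on one
    line and cannot terminate the event early.
    """
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    lines.append(f"data: {json.dumps(payload)}")
    return "\n".join(lines) + "\n\n"  # the blank line ends the event


print(repr(sse_frame({"text": "line one\nline two"})))
print(repr(sse_frame({"type": "message_stop"}, event="message_stop")))
```

On the receiving side, JSON.parse (or json.loads) restores the original newline inside the text.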

9.5 Incremental Rendering Techniques

React real-time rendering

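import { useState, useCallback, useRef } from "react";

function ChatInterface() {
  const [input, setInput] = useState("");
  const [response, setResponse] = useState("");
  const [isStreaming, setIsStreaming] = useState(false);
  const sourceRef = useRef<EventSource | null>(null);

  const sendMessage = useCallback((message: string) => {
    sourceRef.current?.close(); // drop any stream still in progress
    setResponse("");
    setIsStreaming(true);

    const eventSource = new EventSource(
      `/api/chat?message=${encodeURIComponent(message)}`
    );
    sourceRef.current = eventSource;

    eventSource.onmessage = (event) => {
      if (event.data === "[DONE]") {
        eventSource.close();
        setIsStreaming(false);
        return;
      }
      const data = JSON.parse(event.data);
      if (data.text) {
        setResponse((prev) => prev + data.text);
      }
    };

    // EventSource reconnects automatically after an error, which here would
    // re-send the prompt; closing it turns any error into a final stop
    eventSource.onerror = () => {
      eventSource.close();
      setIsStreaming(false);
    };
  }, []);

  return (
    <div>
      <form
        onSubmit={(e) => {
          e.preventDefault();
          if (input.trim()) sendMessage(input);
        }}
      >
        <input value={input} onChange={(e) => setInput(e.target.value)} />
        <button type="submit" disabled={isStreaming}>Send</button>
      </form>
      <div className="response-area">
        {response}
        {isStreaming && <span className="cursor-blink">▋</span>}
      </div>
    </div>
  );
}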

The Markdown incremental rendering problem

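Streaming Markdown presents a classic challenge: syntax markers can be split across delta chunks. The sequence **bold** might arrive as **bo, ld, **, so rendering after every delta briefly shows raw asterisks.

Solution: buffered rendering with a throttle

import { marked } from "marked";

class StreamingMarkdownRenderer {
  private buffer = "";
  private renderTimeout: ReturnType<typeof setTimeout> | null = null;
  private readonly RENDER_DELAY = 50; // ms

  appendText(delta: string): void {
    this.buffer += delta;
    // Throttle: keep at most one render pending. Resetting the timer on every
    // delta (a debounce) would postpone rendering until the stream pauses
    // whenever deltas arrive less than RENDER_DELAY apart.
    if (this.renderTimeout === null) {
      this.renderTimeout = setTimeout(() => {
        this.renderTimeout = null;
        this.render();
      }, this.RENDER_DELAY);
    }
  }

  // Call when the stream ends so the final text renders immediately
  flush(): void {
    if (this.renderTimeout !== null) clearTimeout(this.renderTimeout);
    this.renderTimeout = null;
    this.render();
  }

  private render(): void {
    const safe = this.getSafeBuffer();
    // Model output is untrusted: sanitize the HTML (e.g. with DOMPurify)
    // before assigning it to innerHTML in production
    const html = marked.parse(safe, { async: false }) as string;
    document.getElementById("output")!.innerHTML = html;
  }

  private getSafeBuffer(): string {
    // Temporarily close an unclosed fenced code block so partial code
    // still renders inside a code element
    const openFence = (this.buffer.match(/```/g) || []).length % 2 !== 0;
    return openFence ? this.buffer + "\n```" : this.buffer;
  }
}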
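Whether the render timer is reset on every delta (a trailing debounce) or scheduled at most once per interval (a throttle) matters a lot while text is streaming. The Python simulation below is a sketch under a hypothetical load of one delta every 20 ms for one second and a 50 ms render delay; debounce_renders and throttle_renders are illustrative names, and the code only computes when renders would fire, without touching a DOM.

```python
from typing import List, Optional


def debounce_renders(arrivals: List[int], delay: int) -> List[int]:
    """Render times (ms) when every delta resets the timer."""
    fires = []
    following: List[Optional[int]] = list(arrivals[1:]) + [None]
    for t, nxt in zip(arrivals, following):
        # The timer set at t survives only if no delta arrives within delay ms.
        if nxt is None or nxt - t >= delay:
            fires.append(t + delay)
    return fires


def throttle_renders(arrivals: List[int], delay: int) -> List[int]:
    """Render times (ms) when a delta only schedules a render if none is pending."""
    fires = []
    pending: Optional[int] = None
    for t in arrivals:
        if pending is not None and t >= pending:
            fires.append(pending)  # the pending render ran before this delta
            pending = None
        if pending is None:
            pending = t + delay
    if pending is not None:
        fires.append(pending)  # final render after the last delta
    return fires


arrivals = list(range(0, 1000, 20))  # 50 deltas, 20 ms apart
print(debounce_renders(arrivals, 50))       # [1030]: nothing renders mid-stream
print(len(throttle_renders(arrivals, 50)))  # 17 renders while text streams in
```

The throttle adds at most RENDER_DELAY of lag per update but keeps the screen moving; rendering once more when the stream ends avoids waiting for the last timer.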

9.6 Streaming Tool Call Handling

During tool use, content_block_delta carries input_json_delta โ€” partial JSON that must be accumulated before parsing:

import anthropic, json

client = anthropic.Anthropic()

tools = [{
    "name": "get_weather",
    "description": "Get weather for a city",
    "input_schema": {
        "type": "object",
        "properties": {
            "city": {"type": "string"},
            "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
        },
        "required": ["city"]
    }
}]

tool_calls: dict = {}

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    tools=tools,
    messages=[{"role": "user", "content": "What's the weather in Paris?"}]
) as stream:
    for event in stream:
        if event.type == "content_block_start":
            if event.content_block.type == "tool_use":
                tool_calls[event.index] = {
                    "name": event.content_block.name,
                    "id": event.content_block.id,
                    "json_str": ""
                }

        elif event.type == "content_block_delta":
            if event.delta.type == "input_json_delta":
                if event.index in tool_calls:
                    tool_calls[event.index]["json_str"] += event.delta.partial_json

        elif event.type == "content_block_stop":
            if event.index in tool_calls:
                tc = tool_calls[event.index]
                tc["input"] = json.loads(tc["json_str"] or "{}")  # guard: "" if no fragments arrived
                print(f"Tool call: {tc['name']}({tc['input']})")

9.7 Error Recovery and Retry Strategies

Error taxonomy

Error type              HTTP status          Strategy
Rate limit              429                  Exponential backoff
Overloaded              529                  Short wait, then retry
Timeout / network drop  408 or no response   Retry the full request (a broken stream cannot be resumed)
Invalid request         400                  Do not retry; fix the input
Server error            500 / 503            Limited retries
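The table can be encoded as a small lookup so retry decisions live in one place. This is a sketch under stated assumptions: classify and the Action enum are hypothetical names, a status of None stands for a dropped connection with no HTTP response, and any status the table does not list is treated as non-retryable. Because the API offers no way to resume a broken stream, the network-drop case maps to restarting the whole request.

```python
from enum import Enum
from typing import Optional


class Action(Enum):
    BACKOFF = "exponential backoff"
    SHORT_RETRY = "short wait, then retry"
    RESTART = "retry the full request"
    LIMITED_RETRY = "limited retries"
    DO_NOT_RETRY = "do not retry; fix the request"


def classify(status: Optional[int]) -> Action:
    """Map an HTTP status (None = no response received) to a retry action."""
    if status == 429:
        return Action.BACKOFF
    if status == 529:
        return Action.SHORT_RETRY
    if status is None or status == 408:
        return Action.RESTART  # timeouts and network drops
    if status in (500, 503):
        return Action.LIMITED_RETRY
    return Action.DO_NOT_RETRY  # 400 and anything the table does not list


for status in (429, 529, None, 503, 400):
    print(status, "->", classify(status).value)
```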

Streaming client with retry

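import anthropic, time, logging
from typing import Iterator

logger = logging.getLogger(__name__)

# The SDK already retries failed requests (twice by default) before any data
# arrives. If this loop should be the only retry layer, create the client with
# anthropic.Anthropic(max_retries=0).

def stream_with_retry(
    client: anthropic.Anthropic,
    max_retries: int = 3,
    **kwargs
) -> Iterator[str]:
    for attempt in range(max_retries):
        emitted = False  # becomes True once any text has reached the caller
        try:
            with client.messages.stream(**kwargs) as stream:
                for text in stream.text_stream:
                    emitted = True
                    yield text
            return

        # A retry after partial output would replay text the caller already
        # has, so every branch re-raises once anything has been emitted.
        except anthropic.RateLimitError:
            if emitted or attempt == max_retries - 1:
                raise
            wait = 2 ** attempt  # 1s, 2s with max_retries=3
            logger.warning(f"Rate limited, waiting {wait}s (attempt {attempt+1})")
            time.sleep(wait)

        except anthropic.APIStatusError as e:
            if emitted or e.status_code not in (500, 503, 529) or attempt == max_retries - 1:
                raise
            time.sleep(0.5 * (2 ** attempt))

        except anthropic.APIConnectionError:  # also covers APITimeoutError
            if emitted or attempt == max_retries - 1:
                raise
            time.sleep(1.0)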
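Fixed waits (1 s, 2 s, ...) make every client that was rate-limited at the same moment retry at the same moment too. A common refinement is "full jitter" backoff, where the wait is drawn uniformly between zero and the exponential cap. The sketch below assumes that strategy; backoff_delay is a hypothetical helper, and the optional retry_after argument is meant for a server-suggested wait (for example, parsed from a retry-after response header, if the response includes one).

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base: float = 1.0,
    cap: float = 30.0,
    retry_after: Optional[float] = None,
    rng: Optional[random.Random] = None,
) -> float:
    """Seconds to wait before retry number `attempt` (0-based), with full jitter."""
    rng = rng if rng is not None else random.Random()
    ceiling = min(cap, base * (2 ** attempt))  # exponential growth, capped
    delay = rng.uniform(0.0, ceiling)          # spread retries across clients
    if retry_after is not None:
        delay = max(delay, retry_after)        # never retry sooner than asked
    return delay


rng = random.Random(42)  # seeded only to make the demo repeatable
print([round(backoff_delay(a, rng=rng), 2) for a in range(5)])
```

Inside a retry loop, time.sleep(backoff_delay(attempt)) would take the place of a fixed 2 ** attempt wait.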

9.8 Performance Optimization

Reducing TTFT

  1. Choose a faster model: claude-haiku-4-5-20251001 has 50–70% lower TTFT than claude-opus-4-6
  2. Shorten system prompts: long prefills add processing time before the first token
  3. Set realistic max_tokens: over-allocating can increase scheduling latency
  4. Use async clients for concurrent requests

Async parallel streaming

import asyncio, anthropic

async def stream_single(client, message: str) -> str:
    result = []
    async with client.messages.stream(
        model="claude-haiku-4-5-20251001",
        max_tokens=512,
        messages=[{"role": "user", "content": message}]
    ) as stream:
        async for text in stream.text_stream:
            result.append(text)
    return "".join(result)

async def parallel_streams(messages: list[str]) -> list[str]:
    client = anthropic.AsyncAnthropic()
    return await asyncio.gather(*[stream_single(client, m) for m in messages])

results = asyncio.run(parallel_streams([
    "Explain machine learning",
    "Explain deep learning",
    "Explain reinforcement learning"
]))
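asyncio.gather starts every request at once, which is fine for three prompts but can trigger 429 errors for hundreds. A minimal way to cap concurrency is an asyncio.Semaphore. In the sketch below, gather_bounded is a hypothetical helper, and the demo substitutes a fake coroutine for a real streaming call so it runs without an API key.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple, TypeVar

T = TypeVar("T")


async def gather_bounded(
    factories: List[Callable[[], Awaitable[T]]], limit: int
) -> List[T]:
    """Run coroutine factories with at most `limit` in flight; keep input order."""
    semaphore = asyncio.Semaphore(limit)

    async def run(factory: Callable[[], Awaitable[T]]) -> T:
        async with semaphore:
            return await factory()

    return await asyncio.gather(*(run(f) for f in factories))


async def demo() -> Tuple[List[str], int]:
    in_flight = 0
    peak = 0

    async def fake_request(i: int) -> str:
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)  # stands in for streaming a response
        in_flight -= 1
        return f"answer {i}"

    answers = await gather_bounded(
        [lambda i=i: fake_request(i) for i in range(10)], limit=3
    )
    return answers, peak


answers, peak = asyncio.run(demo())
print(answers[:2], "peak concurrency:", peak)
```

With the chapter's stream_single, the call would look like gather_bounded([lambda m=m: stream_single(client, m) for m in messages], limit=5); the limit of 5 is an arbitrary example, so size it to your own rate limits.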

9.9 Production Deployment

Nginx configuration

location /api/chat {
    proxy_pass http://backend;
    proxy_http_version 1.1;
    proxy_set_header Connection "";
    proxy_buffering off;
    proxy_cache off;
    proxy_read_timeout 300s;
    proxy_send_timeout 300s;
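    # X-Accel-Buffering is honored when the backend sends it (the Express handler does);
    # adding it here is unnecessary because proxy_buffering off already applies.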
}

Client timeout settings

import anthropic

client = anthropic.Anthropic(
    timeout=anthropic.Timeout(
        connect=5.0,
        read=300.0,   # longest allowed gap between received chunks, not total stream time
        write=10.0,
        pool=5.0
    )
)

Monitoring metrics
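TTFT and total generation time can be measured on the client by timestamping a text stream as it is consumed. The sketch below assumes the caller records the start time just before sending the request; StreamTimings and timed are hypothetical helpers rather than SDK features, and the clock parameter exists so the timing logic can be checked with fake timestamps.

```python
import time
from dataclasses import dataclass
from typing import Callable, Iterable, Iterator, Optional


@dataclass
class StreamTimings:
    start: float  # taken just before the request is sent
    first_chunk: Optional[float] = None
    end: Optional[float] = None
    chunks: int = 0

    @property
    def ttft(self) -> Optional[float]:
        """Time to first token in seconds, or None if nothing arrived."""
        return None if self.first_chunk is None else self.first_chunk - self.start

    @property
    def total(self) -> Optional[float]:
        """Total stream time in seconds, or None if the stream did not finish."""
        return None if self.end is None else self.end - self.start


def timed(
    chunks: Iterable[str],
    timings: StreamTimings,
    clock: Callable[[], float] = time.perf_counter,
) -> Iterator[str]:
    """Pass chunks through unchanged while recording timing data."""
    for chunk in chunks:
        if timings.first_chunk is None:
            timings.first_chunk = clock()
        timings.chunks += 1
        yield chunk
    timings.end = clock()  # only reached if the consumer reads to the end


# Demo with a fake clock instead of a live API call
fake_clock = iter([0.42, 1.5]).__next__
t = StreamTimings(start=0.0)
text = "".join(timed(["Hel", "lo", "!"], t, clock=fake_clock))
print(text, t.ttft, t.total, t.chunks)
```

With the SDK, create StreamTimings(start=time.perf_counter()) right before entering client.messages.stream(...), iterate over timed(stream.text_stream, timings), and send timings.ttft to your metrics system.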


Summary

Streaming transforms the user experience from "wait for everything" to "instant feedback." Key takeaways:

  1. Use client.messages.stream() rather than manual SSE parsing
  2. Accumulate input_json_delta fragments before parsing tool call inputs
  3. Implement exponential backoff for rate limits and server errors
  4. Disable proxy buffering in Nginx with proxy_buffering off
  5. Monitor TTFT as the primary UX metric, not total response time