第 9 章

System Prompt 工程:角色注入、约束设计与人格一致性的完整方法论

第九章:流式输出(Streaming):SSE 协议、增量渲染与错误恢复

9.1 为什么需要流式输出

在没有流式传输的情况下,一次完整的 Claude API 调用需要等待模型生成全部 token 之后,才将响应一次性返回给客户端。对于一个生成 1000 个 token 的回复,用户可能需要等待 10 到 20 秒才能看到任何内容。这种"白屏等待"体验在现代对话产品中是不可接受的。

流式输出(Streaming)通过服务器推送事件(Server-Sent Events,SSE)协议解决了这个问题:模型每生成几个 token,就立即将其推送到客户端,用户看到文字逐字出现,感知延迟从"总生成时间"降低到"首 token 时间(TTFT,Time To First Token)",通常只有 300ms 到 1 秒。

流式输出的适用场景

场景 是否推荐流式 原因
聊天对话界面 强烈推荐 用户体验关键指标
代码生成 推荐 用户可边看边审查
文档摘要(后台任务) 不推荐 结果完整性优先
Batch 异步处理 不适用 使用专用 Batch API
工具调用结果处理 视情况 需要完整 JSON 才能解析

9.2 SSE 协议基础

Server-Sent Events 是 W3C 标准,基于 HTTP 长连接,服务器可以持续向客户端推送文本事件流。其格式非常简单:

data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}\n\n
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" World"}}\n\n
data: [DONE]\n\n

每个事件以 data: 开头,以双换行 \n\n 结束。Claude API 在 SSE 数据中嵌套 JSON 对象,包含事件类型和具体内容。

Claude Streaming 事件序列

一次完整的流式响应会按顺序产生以下事件:

message_start          → 响应元数据(model、usage 初始值等)
content_block_start    → 内容块开始(index=0,type=text)
ping                   → 心跳(每隔一段时间)
content_block_delta    → 文本增量(反复出现)
content_block_stop     → 内容块结束
message_delta          → 消息级别的增量(stop_reason、usage 更新)
message_stop           → 响应完成

理解这个事件序列对于正确处理流式输出至关重要,尤其是在涉及工具调用时,content_block_start 会携带 type=tool_use,而 content_block_delta 会携带 JSON 增量片段。

9.3 Python 流式调用实现

基础流式调用

import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "用中文解释量子纠缠的原理"}
    ]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
    
    # 流完成后获取完整消息对象
    final_message = stream.get_final_message()
    print(f"\n\n[使用 token: {final_message.usage.input_tokens} 输入, "
          f"{final_message.usage.output_tokens} 输出]")

这是最简洁的方式。client.messages.stream() 是官方 Python SDK 提供的上下文管理器,stream.text_stream 自动过滤非文本事件,直接产出纯文本增量。

原始事件处理

如果需要处理所有事件(例如工具调用或使用量统计),使用原始事件流:

import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=2048,
    system="你是一个专业的技术文档助手。",
    messages=[
        {"role": "user", "content": "写一个 Python 快速排序实现"}
    ]
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            if hasattr(event.delta, "text"):
                print(event.delta.text, end="", flush=True)
        elif event.type == "message_start":
            print(f"[模型: {event.message.model}]")
        elif event.type == "message_delta":
            if event.usage:
                print(f"\n[输出 token: {event.usage.output_tokens}]")

低级 API:直接使用 stream=True 参数

对于需要更底层控制的场景,可以使用 client.messages.create(stream=True)

import anthropic
import json

client = anthropic.Anthropic()

with client.messages.create(
    model="claude-opus-4-6",
    max_tokens=4096,
    stream=True,
    messages=[
        {"role": "user", "content": "生成一篇关于机器学习的长文章"}
    ]
) as response:
    for line in response.iter_lines():
        if line.startswith("data: "):
            data = line[6:]
            if data == "[DONE]":
                break
            try:
                event = json.loads(data)
                if event.get("type") == "content_block_delta":
                    delta = event.get("delta", {})
                    if delta.get("type") == "text_delta":
                        print(delta.get("text", ""), end="", flush=True)
            except json.JSONDecodeError:
                pass

注意:在实际生产代码中,官方 SDK 的 stream() 上下文管理器比手动解析 SSE 更安全可靠,推荐优先使用。

9.4 TypeScript/Node.js 流式实现

import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic();

async function streamChat(userMessage: string): Promise<void> {
  const stream = await client.messages.stream({
    model: "claude-sonnet-4-6",
    max_tokens: 1024,
    messages: [{ role: "user", content: userMessage }],
  });

  process.stdout.write("Assistant: ");

  for await (const chunk of stream) {
    if (
      chunk.type === "content_block_delta" &&
      chunk.delta.type === "text_delta"
    ) {
      process.stdout.write(chunk.delta.text);
    }
  }

  const finalMessage = await stream.getFinalMessage();
  console.log(`\n\n[Tokens: ${finalMessage.usage.output_tokens}]`);
}

// 在 Web 服务中转发流式响应
import express from "express";
const app = express();

app.get("/chat", async (req, res) => {
  const message = req.query.message as string;

  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");

  const stream = await client.messages.stream({
    model: "claude-sonnet-4-6",
    max_tokens: 1024,
    messages: [{ role: "user", content: message }],
  });

  for await (const chunk of stream) {
    if (
      chunk.type === "content_block_delta" &&
      chunk.delta.type === "text_delta"
    ) {
      res.write(`data: ${JSON.stringify({ text: chunk.delta.text })}\n\n`);
    }
  }

  res.write("data: [DONE]\n\n");
  res.end();
});

9.5 增量渲染技术

前端实时渲染

在 React 应用中,增量渲染是关键的用户体验技术:

import { useState, useCallback } from "react";

function ChatInterface() {
  const [response, setResponse] = useState("");
  const [isStreaming, setIsStreaming] = useState(false);

  const sendMessage = useCallback(async (message: string) => {
    setResponse("");
    setIsStreaming(true);

    try {
      const eventSource = new EventSource(
        `/api/chat?message=${encodeURIComponent(message)}`
      );

      eventSource.onmessage = (event) => {
        if (event.data === "[DONE]") {
          eventSource.close();
          setIsStreaming(false);
          return;
        }

        const data = JSON.parse(event.data);
        if (data.text) {
          setResponse((prev) => prev + data.text);
        }
      };

      eventSource.onerror = () => {
        eventSource.close();
        setIsStreaming(false);
      };
    } catch (error) {
      setIsStreaming(false);
    }
  }, []);

  return (
    <div>
      <div className="response-area">
        {response}
        {isStreaming && <span className="cursor-blink">▋</span>}
      </div>
    </div>
  );
}

Markdown 增量渲染的挑战

流式输出 Markdown 时面临一个经典问题:Markdown 语法标记可能被分割在多个增量块中。例如,**粗体** 可能以 **粗** 三个 delta 到达,如果每次 delta 后立即渲染,会出现短暂的 **粗体** 原始文本。

解决方案:缓冲渲染

class StreamingMarkdownRenderer {
  private buffer: string = "";
  private renderTimeout: ReturnType<typeof setTimeout> | null = null;
  private readonly RENDER_DELAY = 50; // ms

  appendText(delta: string): void {
    this.buffer += delta;

    // 防抖:累积一段时间后再渲染
    if (this.renderTimeout) {
      clearTimeout(this.renderTimeout);
    }
    this.renderTimeout = setTimeout(() => {
      this.render();
    }, this.RENDER_DELAY);
  }

  private render(): void {
    // 检测是否处于未闭合的 Markdown 结构中
    const safeBuffer = this.getSafeRenderBuffer();
    // 调用 Markdown 解析器(如 marked.js)渲染
    document.getElementById("output")!.innerHTML = marked(safeBuffer);
  }

  private getSafeRenderBuffer(): string {
    // 检测未闭合的代码块、加粗、斜体等
    const openCodeBlock = (this.buffer.match(/```/g) || []).length % 2 !== 0;
    if (openCodeBlock) {
      return this.buffer + "\n```"; // 临时闭合
    }
    return this.buffer;
  }
}

9.6 流式工具调用处理

当模型在流式模式下调用工具时,content_block_delta 会携带 JSON 增量,需要累积完整字符串后再解析:

import anthropic
import json

client = anthropic.Anthropic()

tools = [
    {
        "name": "get_weather",
        "description": "获取指定城市的天气信息",
        "input_schema": {
            "type": "object",
            "properties": {
                "city": {"type": "string", "description": "城市名称"},
                "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
            },
            "required": ["city"]
        }
    }
]

# 累积工具调用输入的辅助结构
tool_calls = {}  # index -> {name, input_json_str}

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    tools=tools,
    messages=[{"role": "user", "content": "北京今天天气怎么样?"}]
) as stream:
    for event in stream:
        if event.type == "content_block_start":
            if event.content_block.type == "tool_use":
                tool_calls[event.index] = {
                    "name": event.content_block.name,
                    "id": event.content_block.id,
                    "input_json_str": ""
                }

        elif event.type == "content_block_delta":
            if event.delta.type == "input_json_delta":
                if event.index in tool_calls:
                    tool_calls[event.index]["input_json_str"] += event.delta.partial_json

        elif event.type == "content_block_stop":
            if event.index in tool_calls:
                tc = tool_calls[event.index]
                try:
                    tc["input"] = json.loads(tc["input_json_str"])
                    print(f"工具调用: {tc['name']}({tc['input']})")
                except json.JSONDecodeError as e:
                    print(f"JSON 解析失败: {e}")

9.7 错误恢复与重试策略

常见流式错误类型

错误类型 HTTP 状态码 处理策略
速率限制 429 指数退避重试
过载 529 短暂等待后重试
超时 408 / 网络断开 从最后位置恢复(如有)
内容过滤 400 不重试,修改输入
服务器错误 500/503 有限次数重试

带重试的流式客户端

import anthropic
import time
import logging
from typing import Generator

logger = logging.getLogger(__name__)

def stream_with_retry(
    client: anthropic.Anthropic,
    max_retries: int = 3,
    **kwargs
) -> Generator[str, None, None]:
    """带指数退避重试的流式生成器"""
    
    for attempt in range(max_retries):
        try:
            with client.messages.stream(**kwargs) as stream:
                for text in stream.text_stream:
                    yield text
            return  # 成功完成,退出
            
        except anthropic.RateLimitError as e:
            if attempt == max_retries - 1:
                raise
            wait_time = (2 ** attempt) * 1.0  # 1s, 2s, 4s
            logger.warning(f"速率限制,等待 {wait_time}s 后重试 (第{attempt+1}次)")
            time.sleep(wait_time)
            
        except anthropic.APIStatusError as e:
            if e.status_code in (500, 503, 529):
                if attempt == max_retries - 1:
                    raise
                wait_time = (2 ** attempt) * 0.5
                logger.warning(f"服务器错误 {e.status_code},重试中...")
                time.sleep(wait_time)
            else:
                raise  # 不可重试的错误直接抛出
                
        except (anthropic.APIConnectionError, 
                anthropic.APITimeoutError) as e:
            if attempt == max_retries - 1:
                raise
            logger.warning(f"连接错误,重试中: {e}")
            time.sleep(1.0)

# 使用示例
client = anthropic.Anthropic()

for text in stream_with_retry(
    client,
    max_retries=3,
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "介绍一下 Python 协程"}]
):
    print(text, end="", flush=True)

断点续传模式

对于非常长的生成任务,可以实现一种"检查点"模式:

class ResumableStream:
    """支持断点续传的流式生成器"""
    
    def __init__(self, client, messages, **kwargs):
        self.client = client
        self.messages = messages
        self.kwargs = kwargs
        self.accumulated_text = ""
    
    def stream(self) -> Generator[str, None, None]:
        # 如果已有部分内容,将其作为 assistant prefill 加入对话
        messages = list(self.messages)
        if self.accumulated_text:
            messages.append({
                "role": "assistant",
                "content": self.accumulated_text
            })
        
        with self.client.messages.stream(
            messages=messages,
            **self.kwargs
        ) as stream:
            for text in stream.text_stream:
                self.accumulated_text += text
                yield text
    
    def get_accumulated(self) -> str:
        return self.accumulated_text

9.8 性能优化与最佳实践

TTFT 优化

首 token 时间(TTFT)是用户感知的关键指标。以下措施可以降低 TTFT:

  1. 选择合适的模型claude-haiku-4-5-20251001 的 TTFT 通常比 claude-opus-4-6 低 50-70%
  2. 减少系统提示词长度:长 system prompt 会增加 prefill 时间
  3. 使用 API 密钥就近路由:Anthropic 在多个区域有节点
  4. 减少 max_tokens:过大的 max_tokens 设置会让模型"预留"更多计算资源

并发流式处理

import asyncio
import anthropic

async def stream_single(client, message: str) -> str:
    """异步流式单次请求"""
    result = []
    async with client.messages.stream(
        model="claude-haiku-4-5-20251001",
        max_tokens=512,
        messages=[{"role": "user", "content": message}]
    ) as stream:
        async for text in stream.text_stream:
            result.append(text)
    return "".join(result)

async def parallel_streams(messages: list[str]) -> list[str]:
    """并行处理多个流式请求"""
    client = anthropic.AsyncAnthropic()
    tasks = [stream_single(client, msg) for msg in messages]
    return await asyncio.gather(*tasks)

# 使用
results = asyncio.run(parallel_streams([
    "解释机器学习",
    "解释深度学习",
    "解释强化学习"
]))

流式输出的 Token 计算

流式模式下,usage 统计在 message_delta 事件中提供。注意:

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "你好"}]
) as stream:
    for event in stream:
        if event.type == "message_start":
            print(f"输入 tokens: {event.message.usage.input_tokens}")
        elif event.type == "message_delta":
            if hasattr(event, "usage"):
                print(f"输出 tokens: {event.usage.output_tokens}")

9.9 生产部署注意事项

Nginx 配置

流式响应需要特殊的 Nginx 配置,避免缓冲区导致延迟:

location /api/chat {
    proxy_pass http://backend;
    proxy_http_version 1.1;
    proxy_set_header Connection "";
    
    # 禁用缓冲,确保 SSE 实时推送
    proxy_buffering off;
    proxy_cache off;
    
    # 增加超时时间
    proxy_read_timeout 300s;
    proxy_send_timeout 300s;
    
    # SSE 必需的响应头
    add_header X-Accel-Buffering no;
}

客户端超时设置

client = anthropic.Anthropic(
    timeout=anthropic.Timeout(
        connect=5.0,    # 连接超时
        read=300.0,     # 读取超时(流式要设大)
        write=10.0,     # 写入超时
        pool=5.0        # 连接池超时
    )
)

监控指标

生产环境中应监控以下流式相关指标:


小结

流式输出通过 SSE 协议将用户等待体验从"整体等待"转变为"即时反馈",是现代对话 AI 产品的标配能力。核心要点:

  1. 使用 client.messages.stream() 上下文管理器而非手动解析 SSE
  2. 工具调用需要累积 input_json_delta 片段后再解析
  3. 实现指数退避重试处理速率限制和服务器错误
  4. 生产环境需配置 Nginx 禁用代理缓冲
  5. 监控 TTFT 而非总响应时间作为用户体验指标
本章评分
4.7  / 5  (57 评分)

💬 留言讨论