实时功能:Server-Sent Events 与 WebSocket
第24章:实时功能:Server-Sent Events 与 WebSocket
在 Next.js App Router 里,SSE 是原生支持的,WebSocket 不是。SSE 基于标准 HTTP,适合通知、进度条、AI 流式输出等服务器主动推送场景。
本章核心问题:SSE 与 WebSocket 的根本差异是什么?如何在 Route Handler 中实现 SSE?Next.js 中 WebSocket 的替代方案有哪些?
读完本章你将理解:
- SSE 的 ReadableStream 实现与心跳保活机制
- 基于 EventEmitter 的广播通知与 AI 流式输出(Vercel AI SDK)
- WebSocket 替代方案:Pusher/Ably 托管服务、独立 WS 服务器、SSE + Server Actions 组合
Level 1 · 你需要知道的(1-3年经验)
实时通信的两种协议
构建实时功能时,开发者通常在 SSE(Server-Sent Events)和 WebSocket 之间选择。理解两者的根本差异是做出正确选择的前提。
WebSocket 是全双工协议——客户端和服务器可以随时向对方发送消息,真正的双向通信。适合需要客户端频繁向服务器发送数据的场景:多人协作编辑、实时游戏、交易所行情。
SSE 是单向的——服务器向客户端推送消息,客户端通过标准 HTTP 建立长连接监听。如果客户端需要发送数据,仍然用普通 HTTP 请求。表面上的"限制"其实是大多数实时场景的正确模型:通知、进度条、流式输出、股价推送——这些都是服务器主动推送,客户端被动接收。
在 Next.js App Router 里,SSE 是原生支持的,WebSocket 不是。Route Handler 底层是标准的 Web 标准 Response,支持流式输出,这正是 SSE 所需要的。WebSocket 需要 HTTP 协议升级(101 Switching Protocols),这在 Next.js Route Handlers 里不被支持。
SSE 的工作原理
SSE 的实现极其简单。服务器返回 Content-Type: text/event-stream 的响应,保持连接不关闭,持续写入格式化数据:
data: {"message": "Hello"}\n\n
data: {"message": "World"}\n\n
event: ping\ndata: {}\n\n
每条消息以两个换行符结束。客户端使用浏览器原生的 EventSource API 接收:
const es = new EventSource('/api/events')
es.onmessage = (e) => console.log(JSON.parse(e.data))
es.addEventListener('ping', (e) => console.log('ping'))
这就是 SSE 的全部协议。没有复杂的握手,没有二进制帧,完全基于 HTTP。
Level 2 · 它是怎么运行的(3-5年经验)
在 Route Handler 中实现 SSE
// app/api/events/route.ts
import { NextRequest } from 'next/server'
import { auth } from '@/auth'
export async function GET(request: NextRequest) {
const session = await auth()
if (!session) {
return new Response('Unauthorized', { status: 401 })
}
const encoder = new TextEncoder()
// ReadableStream 是 SSE 的核心
const stream = new ReadableStream({
start(controller) {
// 发送初始连接确认
controller.enqueue(
encoder.encode('data: {"type":"connected"}\n\n')
)
// 心跳:每 30 秒发一次,防止连接超时
const heartbeat = setInterval(() => {
try {
controller.enqueue(encoder.encode(': heartbeat\n\n'))
} catch {
clearInterval(heartbeat)
}
}, 30000)
// 监听客户端断开连接
request.signal.addEventListener('abort', () => {
clearInterval(heartbeat)
controller.close()
})
},
})
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
// 防止 Nginx/代理服务器缓冲响应
'X-Accel-Buffering': 'no',
},
})
}
注意 : heartbeat\n\n——以冒号开头的行是 SSE 注释,客户端忽略,但可以防止代理服务器和负载均衡器因为长时间无数据而关闭"空闲"连接。
广播通知:基于发布订阅
真实场景里,SSE 连接需要能够接收服务器其他部分触发的事件。在单进程 Node.js 应用里,可以用内存中的事件发射器:
// lib/eventEmitter.ts
import { EventEmitter } from 'events'
// 全局单例(利用 global 对象避免 HMR 创建多实例)
const globalForEmitter = globalThis as unknown as { emitter: EventEmitter }
export const emitter = globalForEmitter.emitter ?? new EventEmitter()
if (process.env.NODE_ENV !== 'production') globalForEmitter.emitter = emitter
// 设置高上限避免内存泄漏警告(每个 SSE 连接添加一个 listener)
emitter.setMaxListeners(1000)
// app/api/notifications/route.ts
import { NextRequest } from 'next/server'
import { auth } from '@/auth'
import { emitter } from '@/lib/eventEmitter'
export async function GET(request: NextRequest) {
const session = await auth()
if (!session) return new Response('Unauthorized', { status: 401 })
const userId = session.user.id
const encoder = new TextEncoder()
const stream = new ReadableStream({
start(controller) {
function sendNotification(notification: object) {
const data = `data: ${JSON.stringify(notification)}\n\n`
controller.enqueue(encoder.encode(data))
}
// 监听发给该用户的通知
emitter.on(`notification:${userId}`, sendNotification)
// 清理:连接断开时移除 listener
request.signal.addEventListener('abort', () => {
emitter.off(`notification:${userId}`, sendNotification)
controller.close()
})
},
})
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'X-Accel-Buffering': 'no',
},
})
}
从其他地方触发通知:
// app/api/admin/notify/route.ts
import { NextRequest, NextResponse } from 'next/server'
import { emitter } from '@/lib/eventEmitter'
export async function POST(request: NextRequest) {
const { userId, message, type } = await request.json()
emitter.emit(`notification:${userId}`, {
id: crypto.randomUUID(),
message,
type,
timestamp: new Date().toISOString(),
})
return NextResponse.json({ sent: true })
}
重要限制:内存事件发射器只在单进程内有效。如果你的应用运行多个实例(Kubernetes、多 Vercel 实例),不同进程间的 SSE 连接无法互相通信。多进程场景需要外部消息队列(Redis Pub/Sub、Upstash Redis)。
AI 流式输出:ChatGPT 模式
这是当前最热门的 SSE 使用场景。AI SDK 的 streamText 函数与 Next.js Route Handler 天然配合:
// app/api/chat/route.ts
import { streamText } from 'ai'
import { openai } from '@ai-sdk/openai'
import { NextRequest } from 'next/server'
export async function POST(request: NextRequest) {
const { messages } = await request.json()
const result = await streamText({
model: openai('gpt-4o'),
messages,
system: 'You are a helpful assistant.',
})
// toDataStreamResponse() 返回 SSE 格式的 Response
return result.toDataStreamResponse()
}
// app/chat/page.tsx
'use client'
import { useChat } from 'ai/react'
export default function ChatPage() {
const { messages, input, handleInputChange, handleSubmit, isLoading } = useChat({
api: '/api/chat',
})
return (
<div className="flex flex-col h-screen">
<div className="flex-1 overflow-y-auto p-4 space-y-4">
{messages.map(m => (
<div key={m.id} className={m.role === 'user' ? 'text-right' : 'text-left'}>
<div className="inline-block p-3 rounded-lg bg-gray-100 max-w-xl">
{m.content}
</div>
</div>
))}
{isLoading && <div className="text-gray-400">AI 正在思考...</div>}
</div>
<form onSubmit={handleSubmit} className="p-4 border-t">
<input
value={input}
onChange={handleInputChange}
placeholder="输入消息..."
className="w-full border rounded px-3 py-2"
/>
</form>
</div>
)
}
Level 3 · 规范怎么定义的(资深)
进度条:长任务的实时反馈
SSE 非常适合报告长时间运行任务的进度:
// app/api/process/route.ts
import { NextRequest } from 'next/server'
export async function POST(request: NextRequest) {
const { taskId } = await request.json()
const encoder = new TextEncoder()
function send(data: object) {
return encoder.encode(`data: ${JSON.stringify(data)}\n\n`)
}
const stream = new ReadableStream({
async start(controller) {
try {
// 模拟多阶段处理
const steps = [
{ step: 1, label: '读取数据', total: 4 },
{ step: 2, label: '处理中', total: 4 },
{ step: 3, label: '生成报告', total: 4 },
{ step: 4, label: '完成', total: 4 },
]
for (const step of steps) {
controller.enqueue(send({ type: 'progress', ...step, percent: (step.step / step.total) * 100 }))
// 模拟实际工作
await new Promise(r => setTimeout(r, 1000))
}
controller.enqueue(send({ type: 'complete', taskId, result: '处理完成' }))
controller.close()
} catch (err) {
controller.enqueue(send({ type: 'error', message: String(err) }))
controller.close()
}
},
})
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
},
})
}
客户端 EventSource 封装
// hooks/useSSE.ts
'use client'
import { useEffect, useRef, useState } from 'react'
interface SSEOptions<T> {
url: string
onMessage: (data: T) => void
onError?: (error: Event) => void
enabled?: boolean
}
export function useSSE<T>({ url, onMessage, onError, enabled = true }: SSEOptions<T>) {
const [status, setStatus] = useState<'connecting' | 'connected' | 'closed'>('connecting')
const esRef = useRef<EventSource | null>(null)
useEffect(() => {
if (!enabled) return
const es = new EventSource(url)
esRef.current = es
es.addEventListener('open', () => setStatus('connected'))
es.addEventListener('message', (e) => {
try {
onMessage(JSON.parse(e.data))
} catch {
console.error('Failed to parse SSE message:', e.data)
}
})
es.addEventListener('error', (e) => {
setStatus('closed')
onError?.(e)
// EventSource 会自动重连(指数退避),通常不需要手动处理
})
return () => {
es.close()
setStatus('closed')
}
}, [url, enabled])
return { status, close: () => esRef.current?.close() }
}
WebSocket 的替代方案
Next.js Route Handlers 不支持 WebSocket。如果你的场景确实需要双向实时通信,有三个方向:
方向一:Pusher / Ably(托管 WebSocket 服务)
npm install pusher pusher-js
// app/api/pusher/auth/route.ts — Pusher 频道认证
import { NextRequest, NextResponse } from 'next/server'
import Pusher from 'pusher'
import { auth } from '@/auth'
const pusher = new Pusher({
appId: process.env.PUSHER_APP_ID!,
key: process.env.NEXT_PUBLIC_PUSHER_KEY!,
secret: process.env.PUSHER_SECRET!,
cluster: process.env.PUSHER_CLUSTER!,
})
export async function POST(request: NextRequest) {
const session = await auth()
if (!session) return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
const formData = await request.formData()
const socketId = formData.get('socket_id') as string
const channel = formData.get('channel_name') as string
// 私有频道认证
const authResponse = pusher.authorizeChannel(socketId, channel, {
user_id: session.user.id,
user_info: { name: session.user.name },
})
return NextResponse.json(authResponse)
}
// 从服务端触发事件
await pusher.trigger('private-chat', 'message', {
text: 'Hello',
userId: session.user.id,
})
Pusher/Ably 把 WebSocket 基础设施外包出去,你只需要调用 API 触发事件、客户端监听。代价是依赖第三方服务和额外费用。
方向二:自定义 WebSocket 服务器
适合完全控制的场景:创建一个独立的 Node.js WebSocket 服务器(使用 ws 库),与 Next.js 应用分开部署,通过内部 API 通信。
方向三:SSE + Server Actions 组合
对于"实时表单"类场景(用户提交操作,希望实时看到服务端处理进度),SSE + Server Actions 是优雅的纯 Next.js 方案:
// 客户端逻辑
'use client'
import { useState } from 'react'
import { useSSE } from '@/hooks/useSSE'
import { startProcessing } from './actions'
export function ProcessForm() {
const [taskId, setTaskId] = useState<string | null>(null)
const [progress, setProgress] = useState(0)
const [done, setDone] = useState(false)
useSSE({
url: taskId ? `/api/task-progress/${taskId}` : '',
enabled: !!taskId && !done,
onMessage: (data: { type: string; percent: number }) => {
if (data.type === 'progress') setProgress(data.percent)
if (data.type === 'complete') setDone(true)
},
})
async function handleSubmit() {
const id = await startProcessing()
setTaskId(id)
}
return (
<div>
<button onClick={handleSubmit}>开始处理</button>
{taskId && (
<div>
<div>进度: {progress}%</div>
{done && <div>处理完成!</div>}
</div>
)}
</div>
)
}
// app/actions.ts
'use server'
export async function startProcessing(): Promise<string> {
const taskId = crypto.randomUUID()
// 将任务 ID 存储到数据库或 Redis,供进度 API 读取
// 异步启动后台任务...
return taskId
}
SSE 是 Next.js 里实时功能的首选方案:原生支持、协议简单、客户端自动重连、基于标准 HTTP(防火墙和代理友好)。只有在真正需要客户端频繁双向通信时,才考虑 WebSocket 方案或托管服务。
Level 4 · 边界与陷阱(所有人)
陷阱1:内存事件发射器只在单进程内有效——多实例部署需要外部消息队列(Redis Pub/Sub)。
陷阱2:SSE 心跳(: heartbeat\n\n)以冒号开头,客户端会忽略——但它可以防止代理服务器因长时间无数据而关闭连接。
陷阱3:EventSource 会自动重连(指数退避)——通常不需要手动处理重连逻辑。