第 24 章

实时功能:Server-Sent Events 与 WebSocket

第24章:实时功能:Server-Sent Events 与 WebSocket

在 Next.js App Router 里,SSE 是原生支持的,WebSocket 不是。SSE 基于标准 HTTP,适合通知、进度条、AI 流式输出等服务器主动推送场景。

本章核心问题:SSE 与 WebSocket 的根本差异是什么?如何在 Route Handler 中实现 SSE?Next.js 中 WebSocket 的替代方案有哪些?

读完本章你将理解


Level 1 · 你需要知道的(1-3年经验)

实时通信的两种协议

构建实时功能时,开发者通常在 SSE(Server-Sent Events)和 WebSocket 之间选择。理解两者的根本差异是做出正确选择的前提。

WebSocket 是全双工协议——客户端和服务器可以随时向对方发送消息,真正的双向通信。适合需要客户端频繁向服务器发送数据的场景:多人协作编辑、实时游戏、交易所行情。

SSE 是单向的——服务器向客户端推送消息,客户端通过标准 HTTP 建立长连接监听。如果客户端需要发送数据,仍然用普通 HTTP 请求。表面上的"限制"其实是大多数实时场景的正确模型:通知、进度条、流式输出、股价推送——这些都是服务器主动推送,客户端被动接收。

在 Next.js App Router 里,SSE 是原生支持的,WebSocket 不是。Route Handler 底层是标准的 Web 标准 Response,支持流式输出,这正是 SSE 所需要的。WebSocket 需要 HTTP 协议升级(101 Switching Protocols),这在 Next.js Route Handlers 里不被支持。

SSE 的工作原理

SSE 的实现极其简单。服务器返回 Content-Type: text/event-stream 的响应,保持连接不关闭,持续写入格式化数据:

data: {"message": "Hello"}\n\n
data: {"message": "World"}\n\n
event: ping\ndata: {}\n\n

每条消息以两个换行符结束。客户端使用浏览器原生的 EventSource API 接收:

const es = new EventSource('/api/events')
es.onmessage = (e) => console.log(JSON.parse(e.data))
es.addEventListener('ping', (e) => console.log('ping'))

这就是 SSE 的全部协议。没有复杂的握手,没有二进制帧,完全基于 HTTP。

Level 2 · 它是怎么运行的(3-5年经验)

在 Route Handler 中实现 SSE

// app/api/events/route.ts
import { NextRequest } from 'next/server'
import { auth } from '@/auth'

export async function GET(request: NextRequest) {
  const session = await auth()
  if (!session) {
    return new Response('Unauthorized', { status: 401 })
  }

  const encoder = new TextEncoder()

  // ReadableStream 是 SSE 的核心
  const stream = new ReadableStream({
    start(controller) {
      // 发送初始连接确认
      controller.enqueue(
        encoder.encode('data: {"type":"connected"}\n\n')
      )

      // 心跳:每 30 秒发一次,防止连接超时
      const heartbeat = setInterval(() => {
        try {
          controller.enqueue(encoder.encode(': heartbeat\n\n'))
        } catch {
          clearInterval(heartbeat)
        }
      }, 30000)

      // 监听客户端断开连接
      request.signal.addEventListener('abort', () => {
        clearInterval(heartbeat)
        controller.close()
      })
    },
  })

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
      // 防止 Nginx/代理服务器缓冲响应
      'X-Accel-Buffering': 'no',
    },
  })
}

注意 : heartbeat\n\n——以冒号开头的行是 SSE 注释,客户端忽略,但可以防止代理服务器和负载均衡器因为长时间无数据而关闭"空闲"连接。

广播通知:基于发布订阅

真实场景里,SSE 连接需要能够接收服务器其他部分触发的事件。在单进程 Node.js 应用里,可以用内存中的事件发射器:

// lib/eventEmitter.ts
import { EventEmitter } from 'events'

// 全局单例(利用 global 对象避免 HMR 创建多实例)
const globalForEmitter = globalThis as unknown as { emitter: EventEmitter }

export const emitter = globalForEmitter.emitter ?? new EventEmitter()
if (process.env.NODE_ENV !== 'production') globalForEmitter.emitter = emitter

// 设置高上限避免内存泄漏警告(每个 SSE 连接添加一个 listener)
emitter.setMaxListeners(1000)
// app/api/notifications/route.ts
import { NextRequest } from 'next/server'
import { auth } from '@/auth'
import { emitter } from '@/lib/eventEmitter'

export async function GET(request: NextRequest) {
  const session = await auth()
  if (!session) return new Response('Unauthorized', { status: 401 })

  const userId = session.user.id
  const encoder = new TextEncoder()

  const stream = new ReadableStream({
    start(controller) {
      function sendNotification(notification: object) {
        const data = `data: ${JSON.stringify(notification)}\n\n`
        controller.enqueue(encoder.encode(data))
      }

      // 监听发给该用户的通知
      emitter.on(`notification:${userId}`, sendNotification)

      // 清理:连接断开时移除 listener
      request.signal.addEventListener('abort', () => {
        emitter.off(`notification:${userId}`, sendNotification)
        controller.close()
      })
    },
  })

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'X-Accel-Buffering': 'no',
    },
  })
}

从其他地方触发通知:

// app/api/admin/notify/route.ts
import { NextRequest, NextResponse } from 'next/server'
import { emitter } from '@/lib/eventEmitter'

export async function POST(request: NextRequest) {
  const { userId, message, type } = await request.json()

  emitter.emit(`notification:${userId}`, {
    id: crypto.randomUUID(),
    message,
    type,
    timestamp: new Date().toISOString(),
  })

  return NextResponse.json({ sent: true })
}

重要限制:内存事件发射器只在单进程内有效。如果你的应用运行多个实例(Kubernetes、多 Vercel 实例),不同进程间的 SSE 连接无法互相通信。多进程场景需要外部消息队列(Redis Pub/Sub、Upstash Redis)。

AI 流式输出:ChatGPT 模式

这是当前最热门的 SSE 使用场景。AI SDK 的 streamText 函数与 Next.js Route Handler 天然配合:

// app/api/chat/route.ts
import { streamText } from 'ai'
import { openai } from '@ai-sdk/openai'
import { NextRequest } from 'next/server'

export async function POST(request: NextRequest) {
  const { messages } = await request.json()

  const result = await streamText({
    model: openai('gpt-4o'),
    messages,
    system: 'You are a helpful assistant.',
  })

  // toDataStreamResponse() 返回 SSE 格式的 Response
  return result.toDataStreamResponse()
}
// app/chat/page.tsx
'use client'

import { useChat } from 'ai/react'

export default function ChatPage() {
  const { messages, input, handleInputChange, handleSubmit, isLoading } = useChat({
    api: '/api/chat',
  })

  return (
    <div className="flex flex-col h-screen">
      <div className="flex-1 overflow-y-auto p-4 space-y-4">
        {messages.map(m => (
          <div key={m.id} className={m.role === 'user' ? 'text-right' : 'text-left'}>
            <div className="inline-block p-3 rounded-lg bg-gray-100 max-w-xl">
              {m.content}
            </div>
          </div>
        ))}
        {isLoading && <div className="text-gray-400">AI 正在思考...</div>}
      </div>
      <form onSubmit={handleSubmit} className="p-4 border-t">
        <input
          value={input}
          onChange={handleInputChange}
          placeholder="输入消息..."
          className="w-full border rounded px-3 py-2"
        />
      </form>
    </div>
  )
}

Level 3 · 规范怎么定义的(资深)

进度条:长任务的实时反馈

SSE 非常适合报告长时间运行任务的进度:

// app/api/process/route.ts
import { NextRequest } from 'next/server'

export async function POST(request: NextRequest) {
  const { taskId } = await request.json()
  const encoder = new TextEncoder()

  function send(data: object) {
    return encoder.encode(`data: ${JSON.stringify(data)}\n\n`)
  }

  const stream = new ReadableStream({
    async start(controller) {
      try {
        // 模拟多阶段处理
        const steps = [
          { step: 1, label: '读取数据', total: 4 },
          { step: 2, label: '处理中', total: 4 },
          { step: 3, label: '生成报告', total: 4 },
          { step: 4, label: '完成', total: 4 },
        ]

        for (const step of steps) {
          controller.enqueue(send({ type: 'progress', ...step, percent: (step.step / step.total) * 100 }))
          // 模拟实际工作
          await new Promise(r => setTimeout(r, 1000))
        }

        controller.enqueue(send({ type: 'complete', taskId, result: '处理完成' }))
        controller.close()
      } catch (err) {
        controller.enqueue(send({ type: 'error', message: String(err) }))
        controller.close()
      }
    },
  })

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
    },
  })
}

客户端 EventSource 封装

// hooks/useSSE.ts
'use client'

import { useEffect, useRef, useState } from 'react'

interface SSEOptions<T> {
  url: string
  onMessage: (data: T) => void
  onError?: (error: Event) => void
  enabled?: boolean
}

export function useSSE<T>({ url, onMessage, onError, enabled = true }: SSEOptions<T>) {
  const [status, setStatus] = useState<'connecting' | 'connected' | 'closed'>('connecting')
  const esRef = useRef<EventSource | null>(null)

  useEffect(() => {
    if (!enabled) return

    const es = new EventSource(url)
    esRef.current = es

    es.addEventListener('open', () => setStatus('connected'))

    es.addEventListener('message', (e) => {
      try {
        onMessage(JSON.parse(e.data))
      } catch {
        console.error('Failed to parse SSE message:', e.data)
      }
    })

    es.addEventListener('error', (e) => {
      setStatus('closed')
      onError?.(e)
      // EventSource 会自动重连(指数退避),通常不需要手动处理
    })

    return () => {
      es.close()
      setStatus('closed')
    }
  }, [url, enabled])

  return { status, close: () => esRef.current?.close() }
}

WebSocket 的替代方案

Next.js Route Handlers 不支持 WebSocket。如果你的场景确实需要双向实时通信,有三个方向:

方向一:Pusher / Ably(托管 WebSocket 服务)

npm install pusher pusher-js
// app/api/pusher/auth/route.ts — Pusher 频道认证
import { NextRequest, NextResponse } from 'next/server'
import Pusher from 'pusher'
import { auth } from '@/auth'

const pusher = new Pusher({
  appId: process.env.PUSHER_APP_ID!,
  key: process.env.NEXT_PUBLIC_PUSHER_KEY!,
  secret: process.env.PUSHER_SECRET!,
  cluster: process.env.PUSHER_CLUSTER!,
})

export async function POST(request: NextRequest) {
  const session = await auth()
  if (!session) return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })

  const formData = await request.formData()
  const socketId = formData.get('socket_id') as string
  const channel = formData.get('channel_name') as string

  // 私有频道认证
  const authResponse = pusher.authorizeChannel(socketId, channel, {
    user_id: session.user.id,
    user_info: { name: session.user.name },
  })

  return NextResponse.json(authResponse)
}
// 从服务端触发事件
await pusher.trigger('private-chat', 'message', {
  text: 'Hello',
  userId: session.user.id,
})

Pusher/Ably 把 WebSocket 基础设施外包出去,你只需要调用 API 触发事件、客户端监听。代价是依赖第三方服务和额外费用。

方向二:自定义 WebSocket 服务器

适合完全控制的场景:创建一个独立的 Node.js WebSocket 服务器(使用 ws 库),与 Next.js 应用分开部署,通过内部 API 通信。

方向三:SSE + Server Actions 组合

对于"实时表单"类场景(用户提交操作,希望实时看到服务端处理进度),SSE + Server Actions 是优雅的纯 Next.js 方案:

// 客户端逻辑
'use client'

import { useState } from 'react'
import { useSSE } from '@/hooks/useSSE'
import { startProcessing } from './actions'

export function ProcessForm() {
  const [taskId, setTaskId] = useState<string | null>(null)
  const [progress, setProgress] = useState(0)
  const [done, setDone] = useState(false)

  useSSE({
    url: taskId ? `/api/task-progress/${taskId}` : '',
    enabled: !!taskId && !done,
    onMessage: (data: { type: string; percent: number }) => {
      if (data.type === 'progress') setProgress(data.percent)
      if (data.type === 'complete') setDone(true)
    },
  })

  async function handleSubmit() {
    const id = await startProcessing()
    setTaskId(id)
  }

  return (
    <div>
      <button onClick={handleSubmit}>开始处理</button>
      {taskId && (
        <div>
          <div>进度: {progress}%</div>
          {done && <div>处理完成!</div>}
        </div>
      )}
    </div>
  )
}
// app/actions.ts
'use server'
export async function startProcessing(): Promise<string> {
  const taskId = crypto.randomUUID()
  // 将任务 ID 存储到数据库或 Redis,供进度 API 读取
  // 异步启动后台任务...
  return taskId
}

SSE 是 Next.js 里实时功能的首选方案:原生支持、协议简单、客户端自动重连、基于标准 HTTP(防火墙和代理友好)。只有在真正需要客户端频繁双向通信时,才考虑 WebSocket 方案或托管服务。

Level 4 · 边界与陷阱(所有人)

陷阱1:内存事件发射器只在单进程内有效——多实例部署需要外部消息队列(Redis Pub/Sub)。

陷阱2:SSE 心跳(: heartbeat\n\n)以冒号开头,客户端会忽略——但它可以防止代理服务器因长时间无数据而关闭连接。

陷阱3:EventSource 会自动重连(指数退避)——通常不需要手动处理重连逻辑。

本章评分
4.6  / 5  (5 评分)

💬 留言讨论