第 33 章

WebSocket 实时通信

第三十三章:WebSocket 实时通信

2005 年,Jesse James Garrett 发表了那篇著名的文章,将 Google Maps 和 Gmail 背后的技术命名为 Ajax(Asynchronous JavaScript and XML)。这个技术让浏览器能够在不刷新整个页面的情况下向服务器发送请求并更新部分内容。Ajax 是革命性的,但它有一个根本性的局限:通信永远由客户端发起。服务器从不主动推送数据。

要让浏览器实时接收服务器更新,开发者发明了各种变通方案:短轮询(每隔几秒发一个 HTTP 请求)、长轮询(请求挂起直到服务器有数据,然后立刻发起下一个)、HTTP Streaming(保持连接开放,服务器持续写入数据)。这些方法或浪费资源,或实现复杂,或有各种兼容性问题。

2011 年,WebSocket 协议(RFC 6455)正式成为标准,彻底解决了这个问题。WebSocket 是第一个真正意义上的双向、全双工、持久化浏览器通信协议。

本章将带你深入 WebSocket 的每一个技术细节,然后用 Go 构建一个支持房间管理和广播的实时聊天服务器。


Level 1 · 你需要知道的

WebSocket vs SSE vs 长轮询:如何选择?

在实时 Web 应用中,你有三种主要选择:

长轮询(Long Polling)

客户端发送一个 HTTP 请求,服务器挂起这个请求直到有新数据,然后返回响应。客户端收到响应后立刻发起下一个请求。

客户端 ──GET /events──→ 服务器
                         等待...
客户端 ←──200 OK data── 服务器
客户端 ──GET /events──→ 服务器  ← 立刻发起下一个

适用场景:兼容性要求极高(支持 IE8)、消息频率低(每分钟几条)。 缺点:每条消息都有一次完整的 HTTP 握手开销,延迟高于 WebSocket,服务器连接数管理复杂。

SSE(Server-Sent Events)

服务器向客户端单向推送事件流。基于普通的 HTTP 持久连接,Content-Type: text/event-stream

客户端 ──GET /stream──→ 服务器
客户端 ←──event: msg── 服务器  ← 持续推送,不关闭连接
客户端 ←──event: msg── 服务器

适用场景:服务器到客户端的单向推送,如新闻 feed、股票行情、日志流。 优点:基于 HTTP,自动断线重连(浏览器内置),支持 CDN 缓存和代理。 缺点:单向——客户端无法通过同一连接向服务器发送数据;受 HTTP/1.1 连接数限制影响(HTTP/2 下没有此问题)。

WebSocket

升级自 HTTP 的全双工双向协议。握手后,客户端和服务器都可以随时向对方发送数据帧,没有请求-响应的约束。

客户端 ──HTTP Upgrade──→ 服务器
客户端 ←──101 Switching── 服务器  ← 协议升级成功
客户端 ←──────────────── 服务器  ← 任意方向随时发送
客户端 ──────────────────→ 服务器

适用场景:聊天、在线游戏、协同编辑、实时仪表板、金融交易终端。 优点:真正的双向实时通信,延迟最低,单个连接处理双向流量。 缺点:有状态连接,横向扩展需要额外工作(见 Level 4);某些企业防火墙和代理可能阻断 WebSocket。

选择原则:如果你只需要服务器推送,SSE 更简单;如果需要双向通信或极低延迟,选 WebSocket;如果必须兼容极老的浏览器或网络环境,用长轮询作为 fallback。

HTTP Upgrade 握手

WebSocket 连接从一个 HTTP/1.1 请求开始,通过特殊的 Upgrade 头实现协议切换:

客户端发送(HTTP 请求):

GET /chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13

服务器响应(101 Switching Protocols):

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

Sec-WebSocket-Key 是客户端随机生成的 Base64 编码随机数(16 字节)。服务器将这个值与固定魔术字符串 258EAFA5-E914-47DA-95CA-C5AB0DC85B11(RFC 6455 定义)拼接,计算 SHA-1 哈希,再 Base64 编码,得到 Sec-WebSocket-Accept。这个机制不是安全措施,而是用来确认服务器确实理解 WebSocket 协议,防止浏览器的 HTTP 缓存返回一个缓存的 101 响应。

握手成功后,底层 TCP 连接保持开放,但不再使用 HTTP 协议,而是使用 WebSocket 帧格式进行通信。


Level 2 · 原理深入

gorilla/websocket 内部实现

github.com/gorilla/websocket 是 Go 生态中最广泛使用的 WebSocket 库。它封装了 RFC 6455 定义的帧协议,提供了清晰的读写 API。

WebSocket 帧结构:

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
|I|S|S|S|  (4)  |A|     (7)    |             (16/64)           |
|N|V|V|V|       |S|             |   (if payload len==126/127)   |
| |1|2|3|       |K|             |                               |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - -+
|   Extended payload length continued, if payload len == 127   |
+ - - - - - - - - - - - - - - -+-------------------------------+
|                               | Masking-key, if MASK set to 1|
+-------------------------------+-------------------------------+
| Masking-key (continued)       |          Payload Data         |
+-------------------------------- - - - - - - - - - - - - - - -+
:                     Payload Data continued ...                :
+---------------------------------------------------------------+

关键字段:

gorilla/websocketConn 类型封装了这些细节,但有一个关键约束:读写操作不是并发安全的,必须由单个 goroutine 执行。

连接生命周期管理

一个 WebSocket 连接的生命周期:

建立(Upgrade)→ 正常通信 → 关闭握手 → TCP 断开

正常关闭:任意一方发送关闭帧(opcode=8),携带可选的状态码和原因字符串。对方收到后回发关闭帧,然后关闭 TCP 连接。这是"优雅关闭"(graceful close)。

异常断开:网络中断、进程崩溃等情况下,TCP 连接直接断开,没有关闭握手。这就是为什么需要心跳机制——Ping/Pong。

gorilla/websocket 中:

// 设置读取截止时间——如果这段时间内没有收到数据(包括 Pong),认为连接断开
conn.SetReadDeadline(time.Now().Add(60 * time.Second))

// 设置 Pong 处理器:每次收到 Pong 时,延长读取截止时间(重置心跳计时器)
conn.SetPongHandler(func(string) error {
    conn.SetReadDeadline(time.Now().Add(60 * time.Second))
    return nil
})

并发读写:单 Goroutine per 方向

gorilla/websocket 的文档明确说明:Connections support one concurrent reader and one concurrent writer。这意味着:

这是因为底层的 conn.Write() 会直接写入 TCP 缓冲区,多个 goroutine 并发写入会导致帧交错(frame interleaving),对端会收到损坏的数据。

正确的并发模式:

// 读 goroutine:只负责读取
go func() {
    for {
        msgType, data, err := conn.ReadMessage()
        // ...处理消息...
    }
}()

// 写 goroutine:只负责写入,通过 channel 接收待发送消息
go func() {
    for msg := range sendChan {
        conn.WriteMessage(msg.Type, msg.Data)
    }
}()

// 主线程只向 sendChan 发送,不直接调用 conn.WriteMessage
sendChan <- Message{Type: websocket.TextMessage, Data: []byte("hello")}

Ping/Pong 心跳协议

WebSocket RFC 定义了 Ping(opcode=9)和 Pong(opcode=10)控制帧用于保活。浏览器端会自动响应服务器的 Ping,发回 Pong。

心跳的目的:

  1. 检测半开连接(half-open connection):TCP 连接看起来还开着,但对端已经消失(网络中断、进程崩溃),此时没有心跳的话,服务器永远不知道连接已经失效。
  2. 穿越 NAT/防火墙:许多 NAT 设备会在一段时间无流量后删除映射表项,定期 Ping 可以保持映射活跃。

实现心跳的正确方式:

const (
    writeWait  = 10 * time.Second  // 写操作超时
    pongWait   = 60 * time.Second  // 等待 Pong 的最长时间
    pingPeriod = (pongWait * 9) / 10  // Ping 发送间隔(pongWait 的 90%)
)

连接注册表模式(Connection Registry)

管理大量 WebSocket 连接的标准模式是使用一个中央注册表,通过 channel 进行所有状态变更:

connect → register channel → Hub goroutine → connections map
disconnect → unregister channel → Hub goroutine → 删除 + 清理
broadcast → broadcast channel → Hub goroutine → 遍历所有连接发送

Hub goroutine 是整个系统的单点,它是唯一可以修改 connections map 的 goroutine,因此不需要锁。这是 Go 并发设计的精髓之一:不通过共享内存来通信,而通过通信来共享内存(Don't communicate by sharing memory; share memory by communicating)。


Level 3 · 代码实战

构建实时聊天服务器

我们构建一个支持多房间的实时聊天服务器,完整实现连接管理、消息广播、优雅断开和心跳机制。

消息类型定义(message.go):

package main

import (
    "encoding/json"
    "time"
)

// MessageType 定义消息类型
type MessageType string

const (
    MsgTypeChat   MessageType = "chat"   // 普通聊天消息
    MsgTypeSystem MessageType = "system" // 系统通知(加入/退出)
    MsgTypeError  MessageType = "error"  // 错误消息
    MsgTypePing   MessageType = "ping"   // 客户端心跳
)

// IncomingMessage 是客户端发来的消息结构
type IncomingMessage struct {
    Type    MessageType `json:"type"`
    Room    string      `json:"room"`
    Content string      `json:"content"`
}

// OutgoingMessage 是发往客户端的消息结构
type OutgoingMessage struct {
    Type      MessageType `json:"type"`
    Room      string      `json:"room"`
    Sender    string      `json:"sender"`
    Content   string      `json:"content"`
    Timestamp time.Time   `json:"timestamp"`
}

func (m OutgoingMessage) Encode() ([]byte, error) {
    return json.Marshal(m)
}

客户端连接管理(client.go):

package main

import (
    "log"
    "time"

    "github.com/gorilla/websocket"
)

const (
    writeWait      = 10 * time.Second
    pongWait       = 60 * time.Second
    pingPeriod     = (pongWait * 9) / 10
    maxMessageSize = 4096 // 每条消息最大 4KB
)

// Client 代表一个 WebSocket 客户端连接
type Client struct {
    id   string
    room string
    hub  *Hub
    conn *websocket.Conn
    send chan []byte // 待发送消息的缓冲 channel
}

// readPump 在独立的 goroutine 中运行,专门负责读取客户端消息
// gorilla/websocket 要求每个方向只有一个并发 goroutine
func (c *Client) readPump() {
    defer func() {
        // 读取结束(连接断开)时,向 Hub 注销并关闭连接
        c.hub.unregister <- c
        c.conn.Close()
    }()

    c.conn.SetReadLimit(maxMessageSize)
    c.conn.SetReadDeadline(time.Now().Add(pongWait))

    // Pong 处理器:每次收到 Pong,重置读取超时
    c.conn.SetPongHandler(func(string) error {
        c.conn.SetReadDeadline(time.Now().Add(pongWait))
        return nil
    })

    for {
        _, rawMsg, err := c.conn.ReadMessage()
        if err != nil {
            // 区分正常关闭和异常断开
            if websocket.IsUnexpectedCloseError(err,
                websocket.CloseGoingAway,
                websocket.CloseAbnormalClosure,
            ) {
                log.Printf("client %s unexpected close: %v", c.id, err)
            }
            break
        }

        // 解析客户端消息
        var incoming IncomingMessage
        if err := json.Unmarshal(rawMsg, &incoming); err != nil {
            log.Printf("client %s invalid message: %v", c.id, err)
            continue
        }

        // 处理 ping(客户端自定义心跳)
        if incoming.Type == MsgTypePing {
            continue
        }

        // 构建广播消息并发送给 Hub
        outgoing := OutgoingMessage{
            Type:      MsgTypeChat,
            Room:      c.room,
            Sender:    c.id,
            Content:   incoming.Content,
            Timestamp: time.Now(),
        }

        encoded, err := outgoing.Encode()
        if err != nil {
            continue
        }

        c.hub.broadcast <- BroadcastRequest{
            Room:    c.room,
            Payload: encoded,
        }
    }
}

// writePump 在独立的 goroutine 中运行,专门负责向客户端发送消息
// 同时承担 Ping 心跳发送的职责
func (c *Client) writePump() {
    ticker := time.NewTicker(pingPeriod)
    defer func() {
        ticker.Stop()
        c.conn.Close()
    }()

    for {
        select {
        case message, ok := <-c.send:
            c.conn.SetWriteDeadline(time.Now().Add(writeWait))

            if !ok {
                // Hub 关闭了 send channel,向客户端发送关闭帧
                c.conn.WriteMessage(websocket.CloseMessage, []byte{})
                return
            }

            // 获取一个写入器,类型为文本帧
            w, err := c.conn.NextWriter(websocket.TextMessage)
            if err != nil {
                return
            }
            w.Write(message)

            // 批量写入:将 send channel 中积压的消息一次性发出
            // 这可以显著减少系统调用次数
            n := len(c.send)
            for i := 0; i < n; i++ {
                w.Write([]byte{'\n'})
                w.Write(<-c.send)
            }

            if err := w.Close(); err != nil {
                return
            }

        case <-ticker.C:
            // 发送 Ping 帧
            c.conn.SetWriteDeadline(time.Now().Add(writeWait))
            if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
                return
            }
        }
    }
}

Hub:连接注册表和消息路由(hub.go):

package main

import (
    "log"
    "sync"
    "time"
)

// BroadcastRequest 描述一次广播操作
type BroadcastRequest struct {
    Room    string // 目标房间,空字符串表示全局广播
    Payload []byte
}

// Hub 管理所有客户端连接,是连接注册表的核心
// 所有对 rooms map 的修改都在 Hub 的 run goroutine 中进行,无需锁
type Hub struct {
    // rooms 是房间名到客户端集合的映射
    // map[roomName]map[*Client]bool
    rooms map[string]map[*Client]bool

    register   chan *Client
    unregister chan *Client
    broadcast  chan BroadcastRequest

    // 统计信息
    mu    sync.RWMutex
    stats HubStats
}

type HubStats struct {
    TotalConnections  int64
    ActiveConnections int64
    MessagesSent      int64
}

func NewHub() *Hub {
    return &Hub{
        rooms:      make(map[string]map[*Client]bool),
        register:   make(chan *Client, 256),
        unregister: make(chan *Client, 256),
        broadcast:  make(chan BroadcastRequest, 1024),
    }
}

// Run 是 Hub 的主循环,在单个 goroutine 中运行
// 这保证了对 rooms 的所有访问都是串行的,不需要任何锁
func (h *Hub) Run() {
    for {
        select {
        case client := <-h.register:
            h.addClient(client)

        case client := <-h.unregister:
            h.removeClient(client)

        case req := <-h.broadcast:
            h.broadcastToRoom(req)
        }
    }
}

func (h *Hub) addClient(c *Client) {
    if h.rooms[c.room] == nil {
        h.rooms[c.room] = make(map[*Client]bool)
    }
    h.rooms[c.room][c] = true

    h.mu.Lock()
    h.stats.TotalConnections++
    h.stats.ActiveConnections++
    h.mu.Unlock()

    log.Printf("client %s joined room %s (total in room: %d)",
        c.id, c.room, len(h.rooms[c.room]))

    // 通知房间内其他人
    joinMsg := OutgoingMessage{
        Type:      MsgTypeSystem,
        Room:      c.room,
        Sender:    "system",
        Content:   c.id + " joined the room",
        Timestamp: time.Now(),
    }
    if encoded, err := joinMsg.Encode(); err == nil {
        h.broadcastToRoom(BroadcastRequest{Room: c.room, Payload: encoded})
    }
}

func (h *Hub) removeClient(c *Client) {
    if clients, ok := h.rooms[c.room]; ok {
        if _, exists := clients[c]; exists {
            delete(clients, c)
            close(c.send) // 关闭 send channel,触发 writePump 退出

            // 如果房间空了,删除房间
            if len(clients) == 0 {
                delete(h.rooms, c.room)
                log.Printf("room %s is now empty, removed", c.room)
            }
        }
    }

    h.mu.Lock()
    h.stats.ActiveConnections--
    h.mu.Unlock()

    log.Printf("client %s left room %s", c.id, c.room)
}

func (h *Hub) broadcastToRoom(req BroadcastRequest) {
    clients, ok := h.rooms[req.Room]
    if !ok {
        return
    }

    for client := range clients {
        select {
        case client.send <- req.Payload:
            h.mu.Lock()
            h.stats.MessagesSent++
            h.mu.Unlock()
        default:
            // send channel 已满,说明客户端消费太慢,强制断开
            log.Printf("client %s send buffer full, disconnecting", client.id)
            delete(clients, client)
            close(client.send)
        }
    }
}

func (h *Hub) Stats() HubStats {
    h.mu.RLock()
    defer h.mu.RUnlock()
    return h.stats
}

HTTP 处理器和主程序(main.go):

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "strings"

    "github.com/google/uuid"
    "github.com/gorilla/websocket"
)

var upgrader = websocket.Upgrader{
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
    // 生产环境应该验证 Origin 头
    CheckOrigin: func(r *http.Request) bool {
        // 示例:只允许来自特定域名的连接
        origin := r.Header.Get("Origin")
        return strings.HasPrefix(origin, "https://example.com") ||
               origin == "http://localhost:3000"
    },
}

var hub = NewHub()

func wsHandler(w http.ResponseWriter, r *http.Request) {
    // 从 URL 路径获取房间名:/ws/{room}
    room := strings.TrimPrefix(r.URL.Path, "/ws/")
    if room == "" {
        room = "general"
    }

    // 升级 HTTP 连接为 WebSocket
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Printf("upgrade error: %v", err)
        return
    }

    // 创建客户端
    client := &Client{
        id:   uuid.New().String()[:8], // 简化的 ID
        room: room,
        hub:  hub,
        conn: conn,
        send: make(chan []byte, 256), // 256 条消息的缓冲
    }

    // 向 Hub 注册
    hub.register <- client

    // 启动读写 goroutine(每个连接 2 个 goroutine)
    go client.writePump()
    go client.readPump()
}

func statsHandler(w http.ResponseWriter, r *http.Request) {
    stats := hub.Stats()
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(stats)
}

func main() {
    go hub.Run()

    http.HandleFunc("/ws/", wsHandler)
    http.HandleFunc("/stats", statsHandler)
    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        fmt.Fprintln(w, "Chat server running. Connect to /ws/{room}")
    })

    log.Println("Chat server listening on :8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
}

客户端断线重连逻辑(JavaScript):

// 实现指数退避的断线重连
class ReconnectingWebSocket {
    constructor(url) {
        this.url = url;
        this.reconnectDelay = 1000;  // 初始重连延迟 1s
        this.maxDelay = 30000;       // 最大重连延迟 30s
        this.connect();
    }

    connect() {
        this.ws = new WebSocket(this.url);

        this.ws.onopen = () => {
            console.log('Connected');
            this.reconnectDelay = 1000; // 重置延迟
        };

        this.ws.onmessage = (event) => {
            const msg = JSON.parse(event.data);
            this.onmessage(msg);
        };

        this.ws.onclose = (event) => {
            if (event.wasClean) {
                console.log('Connection closed cleanly');
                return;
            }
            // 非正常断开,触发重连
            console.log(`Connection lost, reconnecting in ${this.reconnectDelay}ms`);
            setTimeout(() => {
                this.reconnectDelay = Math.min(this.reconnectDelay * 2, this.maxDelay);
                this.connect();
            }, this.reconnectDelay);
        };

        this.ws.onerror = (error) => {
            console.error('WebSocket error:', error);
        };
    }

    send(data) {
        if (this.ws.readyState === WebSocket.OPEN) {
            this.ws.send(JSON.stringify(data));
        }
    }

    onmessage(msg) {} // 由子类覆盖
}

Level 4 · 进阶与边界

跨服务器扩展:Redis Pub/Sub 消息路由

单台服务器的 WebSocket Hub 无法扩展到多台机器——客户端 A 连接到服务器 1,客户端 B 连接到服务器 2,服务器 1 的 Hub 不知道服务器 2 上的连接。

解决方案:使用 Redis Pub/Sub 作为消息总线,每台服务器既订阅全局频道,也将本地广播消息发布到 Redis:

import "github.com/redis/go-redis/v9"

type DistributedHub struct {
    *Hub
    redis   *redis.Client
    channel string // Redis Pub/Sub 频道名
}

func NewDistributedHub(redisAddr string) *DistributedHub {
    rdb := redis.NewClient(&redis.Options{Addr: redisAddr})
    h := &DistributedHub{
        Hub:     NewHub(),
        redis:   rdb,
        channel: "chat:broadcast",
    }
    go h.subscribeRedis()
    return h
}

// subscribeRedis 订阅 Redis 频道,将收到的消息广播给本地连接
func (h *DistributedHub) subscribeRedis() {
    ctx := context.Background()
    sub := h.redis.Subscribe(ctx, h.channel)
    ch := sub.Channel()

    for msg := range ch {
        var req BroadcastRequest
        if err := json.Unmarshal([]byte(msg.Payload), &req); err != nil {
            continue
        }
        // 广播给本服务器上的连接
        h.Hub.broadcast <- req
    }
}

// BroadcastGlobal 将消息发布到 Redis,所有服务器实例都会收到
func (h *DistributedHub) BroadcastGlobal(req BroadcastRequest) {
    data, _ := json.Marshal(req)
    h.redis.Publish(context.Background(), h.channel, data)
}

这个架构中,每台服务器保持自己的本地 Hub,负责管理直接连接到自己的客户端。当需要向某个房间广播时,将消息发布到 Redis,所有服务器订阅并各自向本地的该房间成员发送。

注意:这个方案对于房间成员分散在多台服务器的场景有效,但如果需要知道某个房间有哪些成员(例如显示在线人数),还需要将成员信息也存储在 Redis 中(使用 Set 数据结构)。

WebSocket 压缩:permessage-deflate

对于文本消息(JSON),压缩可以显著减少带宽。RFC 7692 定义了 WebSocket 的 permessage-deflate 扩展,gorilla/websocket 直接支持:

var upgrader = websocket.Upgrader{
    EnableCompression: true, // 启用 permessage-deflate
}

// 客户端和服务器在握手时协商是否启用压缩
// 如果都支持,后续消息帧会使用 deflate 压缩

压缩有 CPU 成本。在高并发场景下,需要 benchmark 确认压缩的收益(带宽节省)是否值得 CPU 开销。对于小消息(< 100 字节),压缩后可能反而更大,因为 deflate 头部本身就有几个字节的开销。一般规则:消息平均 > 512 字节时,压缩才有明显收益。

每连接速率限制

防止恶意客户端发送海量消息,需要在每个连接层面实施速率限制:

import "golang.org/x/time/rate"

type Client struct {
    // ... 其他字段 ...
    limiter *rate.Limiter
}

// 创建客户端时初始化限速器:允许每秒 10 条消息,突发最多 20 条
client := &Client{
    limiter: rate.NewLimiter(10, 20),
    // ...
}

// 在 readPump 中处理消息前检查速率
func (c *Client) readPump() {
    for {
        _, rawMsg, err := c.conn.ReadMessage()
        // ...

        // 速率检查:如果超过限制,丢弃消息或断开连接
        if !c.limiter.Allow() {
            log.Printf("client %s rate limited", c.id)
            // 选项1:静默丢弃
            continue
            // 选项2:断开连接(更严格)
            // break
        }

        // 正常处理消息
    }
}

使用 k6 进行 WebSocket 负载测试

// k6 WebSocket 负载测试脚本
import ws from 'k6/ws';
import { check } from 'k6';

export let options = {
    vus: 1000,        // 1000 个并发虚拟用户
    duration: '60s',  // 持续 60 秒
};

export default function() {
    const url = 'ws://localhost:8080/ws/loadtest';

    const res = ws.connect(url, {}, function(socket) {
        socket.on('open', () => {
            // 每秒发送一条消息
            socket.setInterval(() => {
                socket.send(JSON.stringify({
                    type: 'chat',
                    room: 'loadtest',
                    content: 'hello from k6',
                }));
            }, 1000);
        });

        socket.on('message', (data) => {
            const msg = JSON.parse(data);
            check(msg, { 'message received': (m) => m.type !== undefined });
        });

        socket.on('error', (e) => {
            console.error('WebSocket error:', e);
        });

        // 30 秒后关闭连接
        socket.setTimeout(() => {
            socket.close();
        }, 30000);
    });

    check(res, { 'status is 101': (r) => r && r.status === 101 });
}

Goroutine 泄漏检测

WebSocket 服务器每个连接通常有 2 个 goroutine(readPump + writePump)。如果连接没有被正确清理,goroutine 会泄漏,内存和 goroutine 数量会持续增长直到 OOM。

使用 goleak 库在测试中检测泄漏:

import "go.uber.org/goleak"

func TestNoGoroutineLeak(t *testing.T) {
    defer goleak.VerifyNone(t) // 测试结束时检查是否有泄漏的 goroutine

    server := httptest.NewServer(http.HandlerFunc(wsHandler))
    defer server.Close()

    // 建立连接
    wsURL := "ws" + strings.TrimPrefix(server.URL, "http") + "/ws/test"
    conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
    require.NoError(t, err)

    // 正常关闭
    conn.WriteMessage(websocket.CloseMessage,
        websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
    conn.Close()

    // 给 goroutine 一点时间退出
    time.Sleep(100 * time.Millisecond)
    // goleak.VerifyNone 会在这里检查,如果有遗留 goroutine 则 fail
}

防止 goroutine 泄漏的关键:确保 readPump 退出时触发 hub.unregister <- clientremoveClient 关闭 c.send channel,writePump 监听 channel 关闭后退出。形成完整的退出链。

WebSocket 是构建实时应用的强大工具,但它带来的状态性和连接管理复杂度是真实的工程挑战。掌握了本章的设计模式——Hub 注册表、单方向单 goroutine、Ping/Pong 心跳、分布式 Redis 路由——你就拥有了在 Go 中构建高并发实时服务的完整工具箱。

本章评分
4.7  / 5  (3 评分)

💬 留言讨论