第 28 章

Pub/Sub 与 Sharded Pub/Sub

第28章 Pub/Sub 与 Sharded Pub/Sub:消息推送架构

28.1 Pub/Sub 的定位与价值

Redis Pub/Sub(发布/订阅)是一种消息广播机制,遵循"发布者不知道订阅者是谁"的解耦原则。与 Redis Streams(持久化消息队列)不同,Pub/Sub 是**即发即忘(fire-and-forget)**模型:发布者将消息推送到 channel,所有当前在线的订阅者立即收到消息;发布时不在线的订阅者将永远错过这条消息。

适用场景

不适用场景


28.2 标准 Pub/Sub 实现原理

28.2.1 服务端数据结构

Redis 用两个核心数据结构管理 Pub/Sub 状态(pubsub.c):

/* server 全局 */
dict *pubsub_channels;   /* key: channel名(robj) → value: 订阅者链表(list of clients) */
list *pubsub_patterns;   /* 模式订阅列表,每个元素是 pubsubPattern 结构体 */

/* 每个 client */
dict *pubsub_channels;   /* 该client订阅的 channel 集合(用于 O(1) 查找)*/
list *pubsub_patterns;   /* 该client的模式订阅列表 */

/* pubsubPattern 结构体 */
typedef struct pubsubPattern {
    client *client;   /* 订阅者 client */
    robj *pattern;    /* 订阅模式(如 news.* )*/
} pubsubPattern;

28.2.2 SUBSCRIBE 流程

# 客户端A订阅两个channel
SUBSCRIBE news sports

内部步骤:

  1. server.pubsub_channels["news"] 链表中追加 clientA
  2. server.pubsub_channels["sports"] 链表中追加 clientA
  3. 将 "news"、"sports" 加入 clientA->pubsub_channels 字典
  4. 向 clientA 发送订阅确认响应(每个 channel 一条)
# 订阅确认消息(RESP 格式)
*3\r\n
$9\r\nsubscribe\r\n       ← 消息类型
$4\r\nnews\r\n            ← channel 名
:1\r\n                   ← 当前订阅 channel 总数

28.2.3 PUBLISH 流程

PUBLISH news "Breaking: Redis 8 released"

内部步骤:

  1. server.pubsub_channels["news"] 中查找订阅者链表
  2. 遍历链表,向每个订阅者 client 发送消息
  3. 遍历 server.pubsub_patterns,用 glob 匹配找出匹配的模式订阅者
  4. 向匹配的模式订阅者发送 pmessage 消息
  5. 返回接收消息的订阅者总数(整数)
# 订阅者收到的消息(RESP 格式)
*3\r\n
$7\r\nmessage\r\n         ← 消息类型
$4\r\nnews\r\n            ← channel 名
$28\r\nBreaking: Redis 8 released\r\n  ← 消息内容

28.2.4 PSUBSCRIBE 模式订阅

# 模式订阅(glob 语法)
PSUBSCRIBE news.*          # 匹配 news.cn、news.us 等
PSUBSCRIBE user:[0-9]*     # 匹配 user:1000、user:9999 等
PSUBSCRIBE *               # 匹配所有 channel

模式订阅者收到的消息格式与普通订阅者不同(多了 pattern 字段):

*4\r\n
$8\r\npmessage\r\n         ← 消息类型
$6\r\nnews.*\r\n           ← 匹配的模式
$7\r\nnews.cn\r\n          ← 实际 channel 名
$15\r\nHello from China\r\n ← 消息内容

28.2.5 订阅者的连接状态约束

订阅状态下,客户端只能接受以下命令:

其他命令返回错误:ERR Command not allowed inside a subscription context。这意味着订阅 channel 的连接不能复用于普通命令,生产中通常需要单独的 Pub/Sub 连接池。


28.3 Keyspace Notifications(键空间通知)

28.3.1 配置与订阅

键空间通知是 Pub/Sub 的内置应用:Redis 将自身的键操作事件发布到特殊 channel,客户端订阅后可以监听数据变化。

# redis.conf 配置(默认关闭)
notify-keyspace-events ""   # 关闭(默认)
notify-keyspace-events KEA  # 开启所有事件

# 配置字符说明
# K = 键空间事件(__keyspace@<db>__:<key>)
# E = 键事件(__keyevent@<db>__:<event>)
# g = 通用命令(DEL, EXPIRE, RENAME...)
# $ = String 命令
# l = List 命令
# s = Set 命令
# h = Hash 命令
# z = ZSet 命令
# x = 过期事件
# d = 模块键空间事件
# A = 所有事件(g$lshzxd 的别名)
# 运行时开启(不需要重启)
CONFIG SET notify-keyspace-events KEA

# 监听 db0 中所有 key 过期事件
SUBSCRIBE __keyevent@0__:expired

# 监听 db0 中 user:1000 这个 key 的所有操作
SUBSCRIBE __keyspace@0__:user:1000

28.3.2 两种通知 channel 的区别

channel 类型 格式 收到的消息 用途
keyspace __keyspace@<db>__:<key> 操作类型(如 "expired") 监听某个 key 的所有变化
keyevent __keyevent@<db>__:<event> key 名 监听某类事件影响的所有 key
# 场景:key "order:12345" 过期
# keyspace 通知(谁监听这个key的变化?)
# channel: __keyspace@0__:order:12345
# message: "expired"

# keyevent 通知(谁监听过期事件?)
# channel: __keyevent@0__:expired
# message: "order:12345"

28.3.3 性能影响

键空间通知对性能有显著影响,原因:

  1. 每个修改操作(SET/DEL/EXPIRE 等)都会触发 notifyKeyspaceEvent()
  2. 该函数遍历 server.pubsub_channels 查找订阅者
  3. 无论是否有订阅者,查找本身有开销

实测:开启 notify-keyspace-events A(所有事件)后,高频写入场景 QPS 下降约 15~25%。

建议:仅开启业务必需的事件类型,例如只开启 Kx(只监听 key 过期):

CONFIG SET notify-keyspace-events Kx

28.4 Pub/Sub 的集群问题

28.4.1 标准 Pub/Sub 在 Cluster 中的行为

Redis Cluster 中使用 PUBLISH 时,消息会广播到所有节点

节点A (master)    节点B (master)    节点C (master)
    ↑                  ↑                 ↑
PUBLISH "news" "hello"
    ↓ 广播到所有节点
节点A → 发送给A上的订阅者
节点B → 发送给B上的订阅者(即使B不是发布节点)
节点C → 发送给C上的订阅者

问题

原因:标准 Pub/Sub 不属于 keyspace 操作,不能按槽路由,因此只能全广播。

28.4.2 Cluster 中的 SUBSCRIBE 连接问题

在 Cluster 中,客户端必须连接到正确的节点才能接收消息:

# 错误做法(可能连到错误节点)
r = redis.Redis(host='redis-cluster-node1')
r.subscribe('news')  # 如果消息发到了另一个节点,收不到

# 正确做法:连接到集群的任意节点,都能收到(因为Cluster会广播)
# 但这不是真正高效的方案

28.5 Redis 7 Sharded Pub/Sub

28.5.1 设计动机

Redis 7.0(2022年4月)引入了 Sharded Pub/Sub,解决 Cluster 全广播问题。核心思想:将 channel 按哈希槽归属到特定节点,消息只在负责该槽的节点上分发

28.5.2 新命令集

# 分片订阅(连接到负责 channel 所在槽的节点)
SSUBSCRIBE {user:1}:events

# 分片取消订阅
SUNSUBSCRIBE {user:1}:events

# 分片发布(发到 channel 对应槽的节点)
SPUBLISH {user:1}:events "login"

# 返回当前节点的分片订阅 channel 数量
PUBSUB SHARDCHANNELS
PUBSUB SHARDNUMSUB channel1 channel2

28.5.3 工作原理

Redis Cluster(3个主节点)
┌──────────────────────────────────────────────────────┐
│ 节点A:负责槽 0-5460                                  │
│   SSUBSCRIBE {user:1}:events 的订阅者在这里           │
│   SPUBLISH {user:1}:events → 只在节点A分发            │
├──────────────────────────────────────────────────────┤
│ 节点B:负责槽 5461-10922                              │
│   其他 channel 的订阅者在这里                          │
├──────────────────────────────────────────────────────┤
│ 节点C:负责槽 10923-16383                             │
└──────────────────────────────────────────────────────┘

步骤详解:

  1. 计算 {user:1}:events 的哈希槽:CRC16("{user:1}") % 16384 = 2000(假设)
  2. 槽 2000 由节点A负责
  3. 订阅者必须连接到节点A执行 SSUBSCRIBE {user:1}:events
  4. 发布者执行 SPUBLISH {user:1}:events "login",客户端库自动路由到节点A
  5. 节点A将消息分发给所有在本节点订阅 {user:1}:events 的客户端
  6. 不向其他节点广播

网络开销:O(1) vs 标准 Pub/Sub 的 O(N_nodes)

28.5.4 哈希标签(Hash Tag)的关键作用

Sharded Pub/Sub 要求同一业务的 channel 落到同一个槽,需要合理使用哈希标签:

# 同一用户的所有 channel 落到同一槽(用户ID作为哈希标签)
SSUBSCRIBE {user:1}:chat
SSUBSCRIBE {user:1}:notification
SSUBSCRIBE {user:1}:presence

# 同一房间的所有 channel 落到同一槽
SSUBSCRIBE {room:100}:message
SSUBSCRIBE {room:100}:system

注意:哈希标签 {...} 中的内容决定槽,{user:1}{user:2} 落到不同槽(不同节点),这是正确的设计——不同用户的消息彼此隔离。

28.5.5 客户端库支持

Sharded Pub/Sub 需要客户端库的特别支持(自动路由到正确节点):

# redis-py 4.3+
from redis.cluster import RedisCluster

rc = RedisCluster(
    host='redis-cluster',
    port=6379,
    skip_full_coverage_check=True
)

# 使用分片 Pub/Sub(自动路由)
pubsub = rc.pubsub()
pubsub.ssubscribe('{user:1}:events')

# 接收消息
for message in pubsub.listen():
    if message['type'] == 'smessage':
        print(f"Received: {message['data']}")

# 发布
rc.spublish('{user:1}:events', 'user_login')
// go-redis v9
rdb := redis.NewClusterClient(&redis.ClusterOptions{
    Addrs: []string{"localhost:7000", "localhost:7001", "localhost:7002"},
})

pubsub := rdb.SSubscribe(ctx, "{user:1}:events")
defer pubsub.Close()

ch := pubsub.Channel()
for msg := range ch {
    fmt.Printf("Received: %s\n", msg.Payload)
}

28.6 Pub/Sub vs Stream:如何选择

28.6.1 核心区别

特性 Pub/Sub Sharded Pub/Sub Redis Stream
消息持久化
离线消息 丢失 丢失 保留(Consumer Group)
消息 ACK 是(XACK)
消费者组 是(XGROUP)
Cluster 广播 全节点 单节点 单节点(按槽)
消息积压处理 有(MAXLEN 截断)
延迟 极低(< 1ms) 极低 低(1~5ms)
适用场景 实时通知 大规模实时通知 可靠消息队列

28.6.2 选型建议

是否需要消息保证(不丢失/至少一次交付)?
├── 是 → Redis Stream
└── 否(实时性优先)
    ├── Redis Cluster 中?
    │   ├── 是 → Sharded Pub/Sub(节省带宽)
    │   └── 否(单机/主从)→ 标准 Pub/Sub
    └── channel 数量 > 1000 且有模式订阅?
        → 注意:大量 PSUBSCRIBE 会使 PUBLISH 变慢(O(N_patterns))

28.7 生产环境实践

28.7.1 连接管理

# 生产中 Pub/Sub 连接应独立于命令连接
import redis
import threading

class PubSubManager:
    def __init__(self, redis_url):
        self.r = redis.from_url(redis_url)
        self.pubsub = self.r.pubsub()
        self._thread = None

    def subscribe(self, channels, handler):
        self.pubsub.subscribe(**{ch: handler for ch in channels})
        # 在独立线程中监听消息
        self._thread = self.pubsub.run_in_thread(
            sleep_time=0.001,        # 轮询间隔
            daemon=True
        )

    def publish(self, channel, message):
        # 发布用独立连接(不影响订阅连接)
        self.r.publish(channel, message)

    def unsubscribe(self, *channels):
        self.pubsub.unsubscribe(*channels)

    def close(self):
        if self._thread:
            self._thread.stop()
        self.pubsub.close()

28.7.2 心跳与连接保活

Pub/Sub 连接可能因网络设备超时(如 NAT 设备、负载均衡器)而被静默关闭。使用 PING 保活:

# 服务端配置
CONFIG SET tcp-keepalive 60    # 每60秒发送 TCP keepalive

# 客户端主动 PING
# redis-py 的 pubsub.run_in_thread 会自动处理 PING 响应
# 手动 PING
pubsub.ping("keepalive")
# 收到响应:{'type': 'pong', 'pattern': None, 'channel': b'keepalive', 'data': b'keepalive'}

28.7.3 监控 Pub/Sub 状态

# 查看当前 Pub/Sub 状态
PUBSUB CHANNELS            # 所有活跃 channel(有订阅者的)
PUBSUB CHANNELS news.*     # 模式过滤
PUBSUB NUMSUB news sports  # 每个 channel 的订阅者数量
PUBSUB NUMPAT             # 模式订阅总数

# 通过 INFO 查看
INFO stats | grep pubsub
# pubsub_channels:5
# pubsub_patterns:2

28.7.4 消息序列化建议

Pub/Sub 消息是字节串,建议使用轻量级序列化格式:

import json
import msgpack

# JSON(可读,开销大)
r.publish('channel', json.dumps({'event': 'login', 'user_id': 1000}))

# MessagePack(紧凑,高效)
r.publish('channel', msgpack.packb({'event': 'login', 'user_id': 1000}))

# 简单场景:直接字符串
r.publish('channel', f'login:{user_id}')

28.8 小结

Redis Pub/Sub 提供了简单高效的消息广播机制。标准 Pub/Sub 适合单机和主从架构;在 Cluster 中,Redis 7 的 Sharded Pub/Sub 通过槽路由消除了全节点广播,将网络开销从 O(N_nodes) 降到 O(1),是大规模集群实时消息的首选方案。

关键要点:

本章评分
4.5  / 5  (3 评分)

💬 留言讨论