Pub/Sub 与 Sharded Pub/Sub
第28章 Pub/Sub 与 Sharded Pub/Sub:消息推送架构
28.1 Pub/Sub 的定位与价值
Redis Pub/Sub(发布/订阅)是一种消息广播机制,遵循"发布者不知道订阅者是谁"的解耦原则。与 Redis Streams(持久化消息队列)不同,Pub/Sub 是**即发即忘(fire-and-forget)**模型:发布者将消息推送到 channel,所有当前在线的订阅者立即收到消息;发布时不在线的订阅者将永远错过这条消息。
适用场景:
- 实时在线通知(用户A给B发消息,B在线立即收到)
- 配置变更广播(更新所有节点的本地缓存)
- 轻量级事件总线(微服务内部事件通知)
- 实时排行榜刷新、监控告警推送
不适用场景:
- 需要消息持久化和重放(用 Streams)
- 消费者处理慢,需要积压和重试(用 Streams 或消息队列)
- 需要消息确认(ACK)机制
28.2 标准 Pub/Sub 实现原理
28.2.1 服务端数据结构
Redis 用两个核心数据结构管理 Pub/Sub 状态(pubsub.c):
/* server 全局 */
dict *pubsub_channels; /* key: channel名(robj) → value: 订阅者链表(list of clients) */
list *pubsub_patterns; /* 模式订阅列表,每个元素是 pubsubPattern 结构体 */
/* 每个 client */
dict *pubsub_channels; /* 该client订阅的 channel 集合(用于 O(1) 查找)*/
list *pubsub_patterns; /* 该client的模式订阅列表 */
/* pubsubPattern 结构体 */
typedef struct pubsubPattern {
client *client; /* 订阅者 client */
robj *pattern; /* 订阅模式(如 news.* )*/
} pubsubPattern;
28.2.2 SUBSCRIBE 流程
# 客户端A订阅两个channel
SUBSCRIBE news sports
内部步骤:
- 在
server.pubsub_channels["news"]链表中追加 clientA - 在
server.pubsub_channels["sports"]链表中追加 clientA - 将 "news"、"sports" 加入
clientA->pubsub_channels字典 - 向 clientA 发送订阅确认响应(每个 channel 一条)
# 订阅确认消息(RESP 格式)
*3\r\n
$9\r\nsubscribe\r\n ← 消息类型
$4\r\nnews\r\n ← channel 名
:1\r\n ← 当前订阅 channel 总数
28.2.3 PUBLISH 流程
PUBLISH news "Breaking: Redis 8 released"
内部步骤:
- 在
server.pubsub_channels["news"]中查找订阅者链表 - 遍历链表,向每个订阅者 client 发送消息
- 遍历
server.pubsub_patterns,用 glob 匹配找出匹配的模式订阅者 - 向匹配的模式订阅者发送 pmessage 消息
- 返回接收消息的订阅者总数(整数)
# 订阅者收到的消息(RESP 格式)
*3\r\n
$7\r\nmessage\r\n ← 消息类型
$4\r\nnews\r\n ← channel 名
$28\r\nBreaking: Redis 8 released\r\n ← 消息内容
28.2.4 PSUBSCRIBE 模式订阅
# 模式订阅(glob 语法)
PSUBSCRIBE news.* # 匹配 news.cn、news.us 等
PSUBSCRIBE user:[0-9]* # 匹配 user:1000、user:9999 等
PSUBSCRIBE * # 匹配所有 channel
模式订阅者收到的消息格式与普通订阅者不同(多了 pattern 字段):
*4\r\n
$8\r\npmessage\r\n ← 消息类型
$6\r\nnews.*\r\n ← 匹配的模式
$7\r\nnews.cn\r\n ← 实际 channel 名
$15\r\nHello from China\r\n ← 消息内容
28.2.5 订阅者的连接状态约束
订阅状态下,客户端只能接受以下命令:
SUBSCRIBE / UNSUBSCRIBEPSUBSCRIBE / PUNSUBSCRIBEPING(用于保活检测)QUITRESET(Redis 6+,重置连接状态)
其他命令返回错误:ERR Command not allowed inside a subscription context。这意味着订阅 channel 的连接不能复用于普通命令,生产中通常需要单独的 Pub/Sub 连接池。
28.3 Keyspace Notifications(键空间通知)
28.3.1 配置与订阅
键空间通知是 Pub/Sub 的内置应用:Redis 将自身的键操作事件发布到特殊 channel,客户端订阅后可以监听数据变化。
# redis.conf 配置(默认关闭)
notify-keyspace-events "" # 关闭(默认)
notify-keyspace-events KEA # 开启所有事件
# 配置字符说明
# K = 键空间事件(__keyspace@<db>__:<key>)
# E = 键事件(__keyevent@<db>__:<event>)
# g = 通用命令(DEL, EXPIRE, RENAME...)
# $ = String 命令
# l = List 命令
# s = Set 命令
# h = Hash 命令
# z = ZSet 命令
# x = 过期事件
# d = 模块键空间事件
# A = 所有事件(g$lshzxd 的别名)
# 运行时开启(不需要重启)
CONFIG SET notify-keyspace-events KEA
# 监听 db0 中所有 key 过期事件
SUBSCRIBE __keyevent@0__:expired
# 监听 db0 中 user:1000 这个 key 的所有操作
SUBSCRIBE __keyspace@0__:user:1000
28.3.2 两种通知 channel 的区别
| channel 类型 | 格式 | 收到的消息 | 用途 |
|---|---|---|---|
| keyspace | __keyspace@<db>__:<key> |
操作类型(如 "expired") | 监听某个 key 的所有变化 |
| keyevent | __keyevent@<db>__:<event> |
key 名 | 监听某类事件影响的所有 key |
# 场景:key "order:12345" 过期
# keyspace 通知(谁监听这个key的变化?)
# channel: __keyspace@0__:order:12345
# message: "expired"
# keyevent 通知(谁监听过期事件?)
# channel: __keyevent@0__:expired
# message: "order:12345"
28.3.3 性能影响
键空间通知对性能有显著影响,原因:
- 每个修改操作(SET/DEL/EXPIRE 等)都会触发
notifyKeyspaceEvent() - 该函数遍历
server.pubsub_channels查找订阅者 - 无论是否有订阅者,查找本身有开销
实测:开启 notify-keyspace-events A(所有事件)后,高频写入场景 QPS 下降约 15~25%。
建议:仅开启业务必需的事件类型,例如只开启 Kx(只监听 key 过期):
CONFIG SET notify-keyspace-events Kx
28.4 Pub/Sub 的集群问题
28.4.1 标准 Pub/Sub 在 Cluster 中的行为
Redis Cluster 中使用 PUBLISH 时,消息会广播到所有节点:
节点A (master) 节点B (master) 节点C (master)
↑ ↑ ↑
PUBLISH "news" "hello"
↓ 广播到所有节点
节点A → 发送给A上的订阅者
节点B → 发送给B上的订阅者(即使B不是发布节点)
节点C → 发送给C上的订阅者
问题:
- 广播开销 O(N)(N 为集群节点数)
- 节点间产生大量内部通信流量
- 集群规模越大,PUBLISH 的代价越高
原因:标准 Pub/Sub 不属于 keyspace 操作,不能按槽路由,因此只能全广播。
28.4.2 Cluster 中的 SUBSCRIBE 连接问题
在 Cluster 中,客户端必须连接到正确的节点才能接收消息:
# 错误做法(可能连到错误节点)
r = redis.Redis(host='redis-cluster-node1')
r.subscribe('news') # 如果消息发到了另一个节点,收不到
# 正确做法:连接到集群的任意节点,都能收到(因为Cluster会广播)
# 但这不是真正高效的方案
28.5 Redis 7 Sharded Pub/Sub
28.5.1 设计动机
Redis 7.0(2022年4月)引入了 Sharded Pub/Sub,解决 Cluster 全广播问题。核心思想:将 channel 按哈希槽归属到特定节点,消息只在负责该槽的节点上分发。
28.5.2 新命令集
# 分片订阅(连接到负责 channel 所在槽的节点)
SSUBSCRIBE {user:1}:events
# 分片取消订阅
SUNSUBSCRIBE {user:1}:events
# 分片发布(发到 channel 对应槽的节点)
SPUBLISH {user:1}:events "login"
# 返回当前节点的分片订阅 channel 数量
PUBSUB SHARDCHANNELS
PUBSUB SHARDNUMSUB channel1 channel2
28.5.3 工作原理
Redis Cluster(3个主节点)
┌──────────────────────────────────────────────────────┐
│ 节点A:负责槽 0-5460 │
│ SSUBSCRIBE {user:1}:events 的订阅者在这里 │
│ SPUBLISH {user:1}:events → 只在节点A分发 │
├──────────────────────────────────────────────────────┤
│ 节点B:负责槽 5461-10922 │
│ 其他 channel 的订阅者在这里 │
├──────────────────────────────────────────────────────┤
│ 节点C:负责槽 10923-16383 │
└──────────────────────────────────────────────────────┘
步骤详解:
- 计算
{user:1}:events的哈希槽:CRC16("{user:1}") % 16384 = 2000(假设) - 槽 2000 由节点A负责
- 订阅者必须连接到节点A执行
SSUBSCRIBE {user:1}:events - 发布者执行
SPUBLISH {user:1}:events "login",客户端库自动路由到节点A - 节点A将消息分发给所有在本节点订阅
{user:1}:events的客户端 - 不向其他节点广播
网络开销:O(1) vs 标准 Pub/Sub 的 O(N_nodes)
28.5.4 哈希标签(Hash Tag)的关键作用
Sharded Pub/Sub 要求同一业务的 channel 落到同一个槽,需要合理使用哈希标签:
# 同一用户的所有 channel 落到同一槽(用户ID作为哈希标签)
SSUBSCRIBE {user:1}:chat
SSUBSCRIBE {user:1}:notification
SSUBSCRIBE {user:1}:presence
# 同一房间的所有 channel 落到同一槽
SSUBSCRIBE {room:100}:message
SSUBSCRIBE {room:100}:system
注意:哈希标签 {...} 中的内容决定槽,{user:1} 和 {user:2} 落到不同槽(不同节点),这是正确的设计——不同用户的消息彼此隔离。
28.5.5 客户端库支持
Sharded Pub/Sub 需要客户端库的特别支持(自动路由到正确节点):
# redis-py 4.3+
from redis.cluster import RedisCluster
rc = RedisCluster(
host='redis-cluster',
port=6379,
skip_full_coverage_check=True
)
# 使用分片 Pub/Sub(自动路由)
pubsub = rc.pubsub()
pubsub.ssubscribe('{user:1}:events')
# 接收消息
for message in pubsub.listen():
if message['type'] == 'smessage':
print(f"Received: {message['data']}")
# 发布
rc.spublish('{user:1}:events', 'user_login')
// go-redis v9
rdb := redis.NewClusterClient(&redis.ClusterOptions{
Addrs: []string{"localhost:7000", "localhost:7001", "localhost:7002"},
})
pubsub := rdb.SSubscribe(ctx, "{user:1}:events")
defer pubsub.Close()
ch := pubsub.Channel()
for msg := range ch {
fmt.Printf("Received: %s\n", msg.Payload)
}
28.6 Pub/Sub vs Stream:如何选择
28.6.1 核心区别
| 特性 | Pub/Sub | Sharded Pub/Sub | Redis Stream |
|---|---|---|---|
| 消息持久化 | 否 | 否 | 是 |
| 离线消息 | 丢失 | 丢失 | 保留(Consumer Group) |
| 消息 ACK | 否 | 否 | 是(XACK) |
| 消费者组 | 否 | 否 | 是(XGROUP) |
| Cluster 广播 | 全节点 | 单节点 | 单节点(按槽) |
| 消息积压处理 | 无 | 无 | 有(MAXLEN 截断) |
| 延迟 | 极低(< 1ms) | 极低 | 低(1~5ms) |
| 适用场景 | 实时通知 | 大规模实时通知 | 可靠消息队列 |
28.6.2 选型建议
是否需要消息保证(不丢失/至少一次交付)?
├── 是 → Redis Stream
└── 否(实时性优先)
├── Redis Cluster 中?
│ ├── 是 → Sharded Pub/Sub(节省带宽)
│ └── 否(单机/主从)→ 标准 Pub/Sub
└── channel 数量 > 1000 且有模式订阅?
→ 注意:大量 PSUBSCRIBE 会使 PUBLISH 变慢(O(N_patterns))
28.7 生产环境实践
28.7.1 连接管理
# 生产中 Pub/Sub 连接应独立于命令连接
import redis
import threading
class PubSubManager:
def __init__(self, redis_url):
self.r = redis.from_url(redis_url)
self.pubsub = self.r.pubsub()
self._thread = None
def subscribe(self, channels, handler):
self.pubsub.subscribe(**{ch: handler for ch in channels})
# 在独立线程中监听消息
self._thread = self.pubsub.run_in_thread(
sleep_time=0.001, # 轮询间隔
daemon=True
)
def publish(self, channel, message):
# 发布用独立连接(不影响订阅连接)
self.r.publish(channel, message)
def unsubscribe(self, *channels):
self.pubsub.unsubscribe(*channels)
def close(self):
if self._thread:
self._thread.stop()
self.pubsub.close()
28.7.2 心跳与连接保活
Pub/Sub 连接可能因网络设备超时(如 NAT 设备、负载均衡器)而被静默关闭。使用 PING 保活:
# 服务端配置
CONFIG SET tcp-keepalive 60 # 每60秒发送 TCP keepalive
# 客户端主动 PING
# redis-py 的 pubsub.run_in_thread 会自动处理 PING 响应
# 手动 PING
pubsub.ping("keepalive")
# 收到响应:{'type': 'pong', 'pattern': None, 'channel': b'keepalive', 'data': b'keepalive'}
28.7.3 监控 Pub/Sub 状态
# 查看当前 Pub/Sub 状态
PUBSUB CHANNELS # 所有活跃 channel(有订阅者的)
PUBSUB CHANNELS news.* # 模式过滤
PUBSUB NUMSUB news sports # 每个 channel 的订阅者数量
PUBSUB NUMPAT # 模式订阅总数
# 通过 INFO 查看
INFO stats | grep pubsub
# pubsub_channels:5
# pubsub_patterns:2
28.7.4 消息序列化建议
Pub/Sub 消息是字节串,建议使用轻量级序列化格式:
import json
import msgpack
# JSON(可读,开销大)
r.publish('channel', json.dumps({'event': 'login', 'user_id': 1000}))
# MessagePack(紧凑,高效)
r.publish('channel', msgpack.packb({'event': 'login', 'user_id': 1000}))
# 简单场景:直接字符串
r.publish('channel', f'login:{user_id}')
28.8 小结
Redis Pub/Sub 提供了简单高效的消息广播机制。标准 Pub/Sub 适合单机和主从架构;在 Cluster 中,Redis 7 的 Sharded Pub/Sub 通过槽路由消除了全节点广播,将网络开销从 O(N_nodes) 降到 O(1),是大规模集群实时消息的首选方案。
关键要点:
- Pub/Sub 是即发即忘,需要可靠消息传递时使用 Streams
- 键空间通知是 Pub/Sub 的内置应用,但会增加 CPU 开销,按需启用
- Cluster 中优先使用 Sharded Pub/Sub(SSUBSCRIBE/SPUBLISH)
- 哈希标签确保同业务的 channel 落在同一节点
- 生产环境 Pub/Sub 连接必须独立于命令连接,并实现心跳保活