Stream:Kafka 式消息流在 Redis 中的实现
第11章 Stream:Kafka 式消息流在 Redis 中的实现
11.1 为什么需要 Stream
在 Stream(Redis 5.0,2018年)出现之前,工程师们用 List 或 Sorted Set 来模拟消息队列。这种方案有三个根本性缺陷:
缺陷1:无消费者组(Consumer Group)
List 的 BLPOP 只允许一个消费者拿到同一条消息。如果需要多个消费者并行处理,必须手动分片,维护成本极高。
缺陷2:无 ACK 机制
LPOP 弹出后消息立即消失。消费者崩溃时,消息无法重新投递,数据直接丢失。
缺陷3:无历史回溯
List 消费即删除,无法让新消费者从头重新消费历史消息,也无法做消息审计。
Stream 针对性地解决了这三个问题,同时在内存层面保持高效编码。
11.2 底层存储结构
11.2.1 Radix Tree(基数树)
Stream 的核心存储是一棵 Radix Tree(压缩前缀树),定义在 rax.h/rax.c。
Stream 对象(stream *)
├── rax *rax ← 基数树,key = Entry ID 字节串
├── uint64_t length ← 消息总数(含已删除的计数扣除)
├── streamID last_id ← 最后一条消息的 ID
├── streamID first_id ← 第一条消息的 ID(Redis 7.0+)
└── rax *cgroups ← 消费者组字典,key = 组名
Radix Tree 的时间复杂度为 O(k),k 为 key 长度(Entry ID 固定16字节),与消息数量 n 无关。这使得按 ID 范围查找和插入都极为高效。
11.2.2 Listpack 叶节点
Radix Tree 不是每条消息存一个叶节点。它将 连续的若干条消息打包到一个 listpack 中,再将 listpack 挂到叶节点上。
Radix Tree 叶节点结构(示意):
key = "1704067200000-"(共同前缀)
data → listpack {
entry0: [flags][ms_delta][seq_delta][num_fields][field1][val1]...[lp_len]
entry1: [flags][ms_delta][seq_delta][SAME_FIELDS][val1]...[lp_len]
...
}
同一个 listpack 中的消息共享字段名(field names)。如果相邻消息的字段完全相同,只存储值,不重复存字段名,节省大量空间。
每个 listpack 默认存放最多 stream-node-max-entries(默认100)条消息,或总字节数超过 stream-node-max-bytes(默认4096字节)时分裂为新节点。
内存布局对比(1000条消息,各4个字段):
| 结构 | 内存占用 | 说明 |
|---|---|---|
| 朴素 Hash 模拟 | ~500KB | 每条消息独立 Hash |
| Stream(listpack) | ~80KB | 字段名共享 + 整数压缩 |
11.2.3 Entry ID 设计
格式:<毫秒时间戳>-<同毫秒内序列号>
示例:1704067200000-3
↑ ↑
Unix ms timestamp 同一毫秒第4条(从0开始)
- 严格单调递增,保证全局有序
- 支持手动指定 ID(用于数据迁移):
XADD stream 1704067200000-0 field val *表示自动生成:Redis 取当前毫秒时间戳,若与 last_id 毫秒相同则 seq++
11.3 核心命令详解
11.3.1 XADD
# 自动生成 ID,追加消息
XADD orders * user_id 1001 action "place_order" amount 299.00
# 返回:1704067200123-0
# 近似裁剪(~ 表示不精确,允许多保留几条,换取更高性能)
XADD orders MAXLEN ~ 10000 * user_id 1002 action "cancel" amount 0
# MAXLEN 精确裁剪需要遍历到 listpack 边界,~ 裁剪只在 Radix Tree 节点粒度截断
# MINID 裁剪:删除 ID 小于指定值的消息
XADD orders MINID ~ 1704000000000 * user_id 1003 action "refund" amount 100
XADD 时间复杂度:O(1) 均摊(含偶尔的 listpack 分裂)。
11.3.2 读取命令
# 范围读取(- 表示最小ID,+ 表示最大ID)
XRANGE orders - + COUNT 100
# 返回:[(id, {field: val, ...}), ...]
# 反向读取
XREVRANGE orders + - COUNT 10
# 非阻塞读(COUNT 条,从指定 ID 之后开始)
XREAD COUNT 10 STREAMS orders 1704067200000-0
# 阻塞读(等待新消息,$ 表示只收此后新增的)
XREAD COUNT 10 BLOCK 0 STREAMS orders $
# 读取多个 Stream
XREAD BLOCK 1000 STREAMS orders payments 1704067200000-0 1704067100000-0
# 查询长度
XLEN orders
11.3.3 删除与裁剪
# 删除单条消息(标记删除,listpack 中仍存在,计数减少)
XDEL orders 1704067200123-0
# 精确裁剪至最多 N 条
XTRIM orders MAXLEN 5000
# 近似裁剪(推荐生产使用)
XTRIM orders MAXLEN ~ 5000
注意:XDEL 仅在 Stream 元数据中标记删除(stream->length--),不立即释放 listpack 中的空间。只有当整个 listpack 中的消息全被删除后,该节点才从 Radix Tree 中移除。这意味着 XLEN 返回的逻辑长度可能小于实际内存占用中的条目数。
11.4 Consumer Group 内部实现
11.4.1 创建消费者组
# 从末尾开始消费($ 表示只消费此后新增消息)
XGROUP CREATE orders order_processor $ MKSTREAM
# 从头开始消费(0 表示从最早消息开始)
XGROUP CREATE orders audit_log 0
# 修改消费者组的起始位置
XGROUP SETID orders order_processor 1704067200000-0
# 删除消费者组
XGROUP DESTROY orders order_processor
# 添加消费者(也会在 XREADGROUP 时自动创建)
XGROUP CREATECONSUMER orders order_processor worker-1
11.4.2 消费与确认
# 从消费者组读取(> 表示读取未分配给任何消费者的新消息)
XREADGROUP GROUP order_processor worker-1 COUNT 10 STREAMS orders >
# 返回消息同时,Redis 将这些消息 ID 记入 PEL
# ACK 确认(将消息从 PEL 移除)
XACK orders order_processor 1704067200123-0 1704067200124-0
# 查看待确认消息(PEL 内容)
XPENDING orders order_processor - + 10
# 返回:[id, consumer_name, idle_ms, delivery_count]
# 转移超时消息(超过 1 小时未确认,转给 worker-2)
XCLAIM orders order_processor worker-2 3600000 1704067200123-0
# AUTOCLC(Redis 7.0):自动认领并删除超过最大投递次数的消息
XAUTOCLAIM orders order_processor worker-2 3600000 0-0 COUNT 10
11.4.3 PEL(Pending Entry List)内部结构
每个消费者组(streamCG)和每个消费者(streamConsumer)各自维护一个 PEL:
// stream.h(简化)
typedef struct streamCG {
streamID last_id; // 该组已处理到的最大 ID
long long entries_read; // 已读取条数(用于 lag 计算)
rax *pel; // 消费者组级 PEL:id → pelentry
rax *consumers; // 消费者字典:name → streamConsumer
} streamCG;
typedef struct streamConsumer {
mstime_t seen_time; // 最后活跃时间
sds name; // 消费者名称
rax *pel; // 消费者级 PEL:id → pelentry(同一 pelentry 对象)
} streamConsumer;
typedef struct streamNACK {
mstime_t delivery_time; // 最后一次投递时间
uint64_t delivery_count; // 投递次数(用于死信检测)
streamConsumer *consumer; // 当前持有该消息的消费者
} streamNACK;
PEL 用 Radix Tree 存储,key 是 Entry ID 的二进制表示(16字节,大端序)。消费者组 PEL 和消费者 PEL 指向同一个 streamNACK 对象,节省内存。
11.4.4 消息投递流程
XREADGROUP 执行时:
1. 从 Stream Radix Tree 中找到 last_id 之后的消息
2. 将消息 ID 写入消费者组 PEL 和消费者 PEL
3. 更新 streamNACK.delivery_time = now, delivery_count++
4. 返回消息内容给客户端
XACK 执行时:
1. 在消费者组 PEL 中查找该 ID → 找到 streamNACK
2. 在对应消费者的 PEL 中删除该 ID
3. 在消费者组 PEL 中删除该 ID
4. 释放 streamNACK 内存
5. 返回成功确认数量
11.5 Stream 监控命令
# 完整的 Stream 信息
XINFO STREAM orders FULL COUNT 5
# 输出包含:length/radix-tree-keys/radix-tree-nodes/
# last-generated-id/entries/groups
# 消费者组信息
XINFO GROUPS orders
# 输出:name/consumers/pending/last-delivered-id/entries-read/lag
# 消费者信息
XINFO CONSUMERS orders order_processor
# 输出:name/pending/idle/inactive
Lag 计算(Redis 7.0+):
lag = stream.length - group.entries_read
lag 表示该消费者组还有多少消息未处理,是监控消费积压的关键指标。
11.6 实际案例:订单状态变更通知
import redis
import json
import time
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 生产者:订单服务发布状态变更
def publish_order_event(order_id, status, extra=None):
payload = {
'order_id': str(order_id),
'status': status,
'timestamp': str(int(time.time() * 1000)),
}
if extra:
payload.update(extra)
msg_id = r.xadd(
'stream:orders',
payload,
maxlen=100000, # 保留最近10万条
approximate=True
)
return msg_id
# 消费者:通知服务处理
def start_consumer(group, consumer_name):
# 确保消费者组存在
try:
r.xgroup_create('stream:orders', group, '$', mkstream=True)
except redis.exceptions.ResponseError:
pass # 组已存在
while True:
# 读取最多10条未分配消息
messages = r.xreadgroup(
group, consumer_name,
{'stream:orders': '>'},
count=10,
block=2000 # 阻塞2秒
)
if not messages:
# 检查 PEL 中超时消息(超过30秒未确认)
pending = r.xpending_range(
'stream:orders', group,
min='-', max='+', count=10
)
for p in pending:
if p['time_since_delivered'] > 30000:
r.xclaim(
'stream:orders', group, consumer_name,
30000, p['message_id']
)
continue
for stream_name, msgs in messages:
for msg_id, data in msgs:
try:
process_order_event(data)
r.xack('stream:orders', group, msg_id)
except Exception as e:
print(f"处理失败: {msg_id}, 错误: {e}")
# 不 ACK,等待超时后重新认领
def process_order_event(data):
print(f"处理订单事件: order_id={data['order_id']}, status={data['status']}")
# 发送短信/推送/邮件等
11.7 Redis Stream vs Kafka 本质差异
| 特性 | Redis Stream | Apache Kafka |
|---|---|---|
| 存储介质 | 内存(持久化可选) | 磁盘(顺序写) |
| 分区支持 | 无(单 Stream 单节点) | 有(Topic 多 Partition) |
| 吞吐量 | ~10万条/秒(单节点) | ~百万条/秒(多分区) |
| 消息保留 | 可删除/裁剪 | 按时间/大小保留,默认7天 |
| 消费者组 | 支持,基于 PEL | 支持,基于 offset |
| 消息回溯 | 支持(ID 范围查询) | 支持(offset seek) |
| 部署复杂度 | 极简(单进程) | 较高(ZooKeeper/KRaft + Broker) |
| 运维成本 | 低 | 高 |
| 消息有序性 | Stream 内全局有序 | Partition 内有序 |
选型建议:
- 消息量 < 10万/秒,消费者 < 10个,可接受内存成本 → Redis Stream
- 消息量 > 10万/秒,需要多分区并行,需要磁盘持久化几天 → Kafka
- 需要 Schema Registry、Kafka Connect 等生态 → Kafka
- 已有 Redis 基础设施,不想引入新组件 → Redis Stream
11.8 性能测试数据
# 测试环境:Redis 7.2,8核 CPU,32GB 内存
# 消息格式:5个字段,每个字段约10字节
# XADD 性能(单连接)
redis-benchmark -t xadd -n 1000000 -c 50
# 结果:约 350,000 ops/sec
# XREAD 性能(COUNT 100)
# 约 8,000 次/秒(每次读取100条,即80万条消息/秒)
# 内存占用:100万条消息(5字段,每字段10字节)
# 实测:约 120MB(listpack 压缩后)
# 朴素 Hash 模拟:约 800MB
# 压缩比:约 6.7:1
11.9 常见问题与最佳实践
Q1:Stream 消息能保证不丢失吗?
不能完全保证。Stream 本身是内存数据结构,需配合 AOF appendfsync always 或主从复制才能达到高可靠性。
Q2:消费者死亡后消息怎么办?
消息留在 PEL 中,idle 时间不断增加。通过 XPENDING 监控 + XCLAIM 或 XAUTOCLAIM 转移给其他消费者。
Q3:MAXLEN ~ 和精确 MAXLEN 的性能差异有多大?
在 listpack 节点较大时(100条/节点),精确裁剪需要额外拆分节点,每次 XADD 约多消耗 3-5 微秒。大流量下推荐始终使用 ~。
Q4:单个 Stream 的最大消息数?
理论上无上限(受内存限制),但超过 100万条后内存占用显著,建议配合 MAXLEN 或 MINID 控制。
Q5:多个消费者组会重复消费吗?
是的,这是设计特性。每个消费者组独立维护 last_id,互不影响。适合多个下游服务各自独立消费同一份数据。