第 11 章

Stream:Kafka 式消息流在 Redis 中的实现

第11章 Stream:Kafka 式消息流在 Redis 中的实现

11.1 为什么需要 Stream

在 Stream(Redis 5.0,2018年)出现之前,工程师们用 List 或 Sorted Set 来模拟消息队列。这种方案有三个根本性缺陷:

缺陷1:无消费者组(Consumer Group)

List 的 BLPOP 只允许一个消费者拿到同一条消息。如果需要多个消费者并行处理,必须手动分片,维护成本极高。

缺陷2:无 ACK 机制

LPOP 弹出后消息立即消失。消费者崩溃时,消息无法重新投递,数据直接丢失。

缺陷3:无历史回溯

List 消费即删除,无法让新消费者从头重新消费历史消息,也无法做消息审计。

Stream 针对性地解决了这三个问题,同时在内存层面保持高效编码。


11.2 底层存储结构

11.2.1 Radix Tree(基数树)

Stream 的核心存储是一棵 Radix Tree(压缩前缀树),定义在 rax.h/rax.c

Stream 对象(stream *)
├── rax *rax          ← 基数树,key = Entry ID 字节串
├── uint64_t length   ← 消息总数(含已删除的计数扣除)
├── streamID last_id  ← 最后一条消息的 ID
├── streamID first_id ← 第一条消息的 ID(Redis 7.0+)
└── rax *cgroups      ← 消费者组字典,key = 组名

Radix Tree 的时间复杂度为 O(k),k 为 key 长度(Entry ID 固定16字节),与消息数量 n 无关。这使得按 ID 范围查找和插入都极为高效。

11.2.2 Listpack 叶节点

Radix Tree 不是每条消息存一个叶节点。它将 连续的若干条消息打包到一个 listpack 中,再将 listpack 挂到叶节点上。

Radix Tree 叶节点结构(示意):
  key  = "1704067200000-"(共同前缀)
  data → listpack {
    entry0: [flags][ms_delta][seq_delta][num_fields][field1][val1]...[lp_len]
    entry1: [flags][ms_delta][seq_delta][SAME_FIELDS][val1]...[lp_len]
    ...
  }

同一个 listpack 中的消息共享字段名(field names)。如果相邻消息的字段完全相同,只存储值,不重复存字段名,节省大量空间。

每个 listpack 默认存放最多 stream-node-max-entries(默认100)条消息,或总字节数超过 stream-node-max-bytes(默认4096字节)时分裂为新节点。

内存布局对比(1000条消息,各4个字段):

结构 内存占用 说明
朴素 Hash 模拟 ~500KB 每条消息独立 Hash
Stream(listpack) ~80KB 字段名共享 + 整数压缩

11.2.3 Entry ID 设计

格式:<毫秒时间戳>-<同毫秒内序列号>
示例:1704067200000-3
       ↑              ↑
  Unix ms timestamp  同一毫秒第4条(从0开始)

11.3 核心命令详解

11.3.1 XADD

# 自动生成 ID,追加消息
XADD orders * user_id 1001 action "place_order" amount 299.00
# 返回:1704067200123-0

# 近似裁剪(~ 表示不精确,允许多保留几条,换取更高性能)
XADD orders MAXLEN ~ 10000 * user_id 1002 action "cancel" amount 0
# MAXLEN 精确裁剪需要遍历到 listpack 边界,~ 裁剪只在 Radix Tree 节点粒度截断

# MINID 裁剪:删除 ID 小于指定值的消息
XADD orders MINID ~ 1704000000000 * user_id 1003 action "refund" amount 100

XADD 时间复杂度:O(1) 均摊(含偶尔的 listpack 分裂)。

11.3.2 读取命令

# 范围读取(- 表示最小ID,+ 表示最大ID)
XRANGE orders - + COUNT 100
# 返回:[(id, {field: val, ...}), ...]

# 反向读取
XREVRANGE orders + - COUNT 10

# 非阻塞读(COUNT 条,从指定 ID 之后开始)
XREAD COUNT 10 STREAMS orders 1704067200000-0

# 阻塞读(等待新消息,$ 表示只收此后新增的)
XREAD COUNT 10 BLOCK 0 STREAMS orders $

# 读取多个 Stream
XREAD BLOCK 1000 STREAMS orders payments 1704067200000-0 1704067100000-0

# 查询长度
XLEN orders

11.3.3 删除与裁剪

# 删除单条消息(标记删除,listpack 中仍存在,计数减少)
XDEL orders 1704067200123-0

# 精确裁剪至最多 N 条
XTRIM orders MAXLEN 5000

# 近似裁剪(推荐生产使用)
XTRIM orders MAXLEN ~ 5000

注意:XDEL 仅在 Stream 元数据中标记删除(stream->length--),不立即释放 listpack 中的空间。只有当整个 listpack 中的消息全被删除后,该节点才从 Radix Tree 中移除。这意味着 XLEN 返回的逻辑长度可能小于实际内存占用中的条目数。


11.4 Consumer Group 内部实现

11.4.1 创建消费者组

# 从末尾开始消费($ 表示只消费此后新增消息)
XGROUP CREATE orders order_processor $ MKSTREAM

# 从头开始消费(0 表示从最早消息开始)
XGROUP CREATE orders audit_log 0

# 修改消费者组的起始位置
XGROUP SETID orders order_processor 1704067200000-0

# 删除消费者组
XGROUP DESTROY orders order_processor

# 添加消费者(也会在 XREADGROUP 时自动创建)
XGROUP CREATECONSUMER orders order_processor worker-1

11.4.2 消费与确认

# 从消费者组读取(> 表示读取未分配给任何消费者的新消息)
XREADGROUP GROUP order_processor worker-1 COUNT 10 STREAMS orders >
# 返回消息同时,Redis 将这些消息 ID 记入 PEL

# ACK 确认(将消息从 PEL 移除)
XACK orders order_processor 1704067200123-0 1704067200124-0

# 查看待确认消息(PEL 内容)
XPENDING orders order_processor - + 10
# 返回:[id, consumer_name, idle_ms, delivery_count]

# 转移超时消息(超过 1 小时未确认,转给 worker-2)
XCLAIM orders order_processor worker-2 3600000 1704067200123-0

# AUTOCLC(Redis 7.0):自动认领并删除超过最大投递次数的消息
XAUTOCLAIM orders order_processor worker-2 3600000 0-0 COUNT 10

11.4.3 PEL(Pending Entry List)内部结构

每个消费者组(streamCG)和每个消费者(streamConsumer)各自维护一个 PEL:

// stream.h(简化)
typedef struct streamCG {
    streamID last_id;       // 该组已处理到的最大 ID
    long long entries_read; // 已读取条数(用于 lag 计算)
    rax *pel;               // 消费者组级 PEL:id → pelentry
    rax *consumers;         // 消费者字典:name → streamConsumer
} streamCG;

typedef struct streamConsumer {
    mstime_t seen_time;     // 最后活跃时间
    sds name;               // 消费者名称
    rax *pel;               // 消费者级 PEL:id → pelentry(同一 pelentry 对象)
} streamConsumer;

typedef struct streamNACK {
    mstime_t delivery_time;  // 最后一次投递时间
    uint64_t delivery_count; // 投递次数(用于死信检测)
    streamConsumer *consumer; // 当前持有该消息的消费者
} streamNACK;

PEL 用 Radix Tree 存储,key 是 Entry ID 的二进制表示(16字节,大端序)。消费者组 PEL 和消费者 PEL 指向同一个 streamNACK 对象,节省内存。

11.4.4 消息投递流程

XREADGROUP 执行时:
1. 从 Stream Radix Tree 中找到 last_id 之后的消息
2. 将消息 ID 写入消费者组 PEL 和消费者 PEL
3. 更新 streamNACK.delivery_time = now, delivery_count++
4. 返回消息内容给客户端

XACK 执行时:
1. 在消费者组 PEL 中查找该 ID → 找到 streamNACK
2. 在对应消费者的 PEL 中删除该 ID
3. 在消费者组 PEL 中删除该 ID
4. 释放 streamNACK 内存
5. 返回成功确认数量

11.5 Stream 监控命令

# 完整的 Stream 信息
XINFO STREAM orders FULL COUNT 5
# 输出包含:length/radix-tree-keys/radix-tree-nodes/
#           last-generated-id/entries/groups

# 消费者组信息
XINFO GROUPS orders
# 输出:name/consumers/pending/last-delivered-id/entries-read/lag

# 消费者信息
XINFO CONSUMERS orders order_processor
# 输出:name/pending/idle/inactive

Lag 计算(Redis 7.0+):

lag = stream.length - group.entries_read

lag 表示该消费者组还有多少消息未处理,是监控消费积压的关键指标。


11.6 实际案例:订单状态变更通知

import redis
import json
import time

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# 生产者:订单服务发布状态变更
def publish_order_event(order_id, status, extra=None):
    payload = {
        'order_id': str(order_id),
        'status': status,
        'timestamp': str(int(time.time() * 1000)),
    }
    if extra:
        payload.update(extra)
    
    msg_id = r.xadd(
        'stream:orders',
        payload,
        maxlen=100000,  # 保留最近10万条
        approximate=True
    )
    return msg_id

# 消费者:通知服务处理
def start_consumer(group, consumer_name):
    # 确保消费者组存在
    try:
        r.xgroup_create('stream:orders', group, '$', mkstream=True)
    except redis.exceptions.ResponseError:
        pass  # 组已存在

    while True:
        # 读取最多10条未分配消息
        messages = r.xreadgroup(
            group, consumer_name,
            {'stream:orders': '>'},
            count=10,
            block=2000  # 阻塞2秒
        )
        
        if not messages:
            # 检查 PEL 中超时消息(超过30秒未确认)
            pending = r.xpending_range(
                'stream:orders', group,
                min='-', max='+', count=10
            )
            for p in pending:
                if p['time_since_delivered'] > 30000:
                    r.xclaim(
                        'stream:orders', group, consumer_name,
                        30000, p['message_id']
                    )
            continue
        
        for stream_name, msgs in messages:
            for msg_id, data in msgs:
                try:
                    process_order_event(data)
                    r.xack('stream:orders', group, msg_id)
                except Exception as e:
                    print(f"处理失败: {msg_id}, 错误: {e}")
                    # 不 ACK,等待超时后重新认领

def process_order_event(data):
    print(f"处理订单事件: order_id={data['order_id']}, status={data['status']}")
    # 发送短信/推送/邮件等

11.7 Redis Stream vs Kafka 本质差异

特性 Redis Stream Apache Kafka
存储介质 内存(持久化可选) 磁盘(顺序写)
分区支持 无(单 Stream 单节点) 有(Topic 多 Partition)
吞吐量 ~10万条/秒(单节点) ~百万条/秒(多分区)
消息保留 可删除/裁剪 按时间/大小保留,默认7天
消费者组 支持,基于 PEL 支持,基于 offset
消息回溯 支持(ID 范围查询) 支持(offset seek)
部署复杂度 极简(单进程) 较高(ZooKeeper/KRaft + Broker)
运维成本
消息有序性 Stream 内全局有序 Partition 内有序

选型建议


11.8 性能测试数据

# 测试环境:Redis 7.2,8核 CPU,32GB 内存
# 消息格式:5个字段,每个字段约10字节

# XADD 性能(单连接)
redis-benchmark -t xadd -n 1000000 -c 50
# 结果:约 350,000 ops/sec

# XREAD 性能(COUNT 100)
# 约 8,000 次/秒(每次读取100条,即80万条消息/秒)

# 内存占用:100万条消息(5字段,每字段10字节)
# 实测:约 120MB(listpack 压缩后)
# 朴素 Hash 模拟:约 800MB
# 压缩比:约 6.7:1

11.9 常见问题与最佳实践

Q1:Stream 消息能保证不丢失吗?

不能完全保证。Stream 本身是内存数据结构,需配合 AOF appendfsync always 或主从复制才能达到高可靠性。

Q2:消费者死亡后消息怎么办?

消息留在 PEL 中,idle 时间不断增加。通过 XPENDING 监控 + XCLAIMXAUTOCLAIM 转移给其他消费者。

Q3:MAXLEN ~ 和精确 MAXLEN 的性能差异有多大?

在 listpack 节点较大时(100条/节点),精确裁剪需要额外拆分节点,每次 XADD 约多消耗 3-5 微秒。大流量下推荐始终使用 ~

Q4:单个 Stream 的最大消息数?

理论上无上限(受内存限制),但超过 100万条后内存占用显著,建议配合 MAXLEN 或 MINID 控制。

Q5:多个消费者组会重复消费吗?

是的,这是设计特性。每个消费者组独立维护 last_id,互不影响。适合多个下游服务各自独立消费同一份数据。

本章评分
4.8  / 5  (34 评分)

💬 留言讨论