第 10 章

Offset 的一切:存储、提交与回溯

第10章:Offset 的一切:存储、提交与回溯

导读:Kafka 的 Offset 存储、提交与回溯机制如何影响消费可靠性?

本章核心问题:Kafka 的 Offset 存储、提交与回溯机制如何影响消费可靠性?

读完本章你将理解


Level 1 · 你需要知道的(1-3年经验)

Offset 是 Kafka 消费模型的核心坐标系。它决定了消费者从哪里开始读、从哪里恢复故障、以及如何在不同时间点回溯历史数据。理解 Offset 的存储格式、提交语义和各种位置概念之间的差异,是写出健壮 Consumer 代码的必要条件。

auto.offset.reset:当没有 Committed Offset 时

当 Consumer Group 第一次消费一个 Partition,或者 Committed Offset 已经过期(数据已被删除),Consumer 需要知道从哪里开始消费,由 auto.offset.reset 控制:

行为 适用场景
latest(默认) 从当前 LEO 开始,只消费新消息 实时流处理,不关心历史数据
earliest 从分区最早的可用 offset 开始 需要处理所有历史数据
none 抛出 NoOffsetForPartitionException 强制应用层显式指定 offset,避免意外

"鬼消费"场景(Ghost Consumption)

当一个 Consumer Group 停止消费超过 offset.retention.minutes(默认 7 天)后再重启,其在 __consumer_offsets 中的记录可能已被清除。此时:

无论哪种情况,"静默地"使用 auto.offset.reset 都可能导致数据问题。生产环境建议:对于关键数据管道,使用 auto.offset.reset=none,强制在 Offset 丢失时报警并人工干预。

幂等位移提交模式

在"at-least-once"语义下,消息可能被重复处理。结合幂等的业务逻辑和手动位移提交,可以实现应用层的"effectively once":

// 幂等提交模式:将位移提交和业务操作绑定到同一个事务
@Transactional
public void processAndCommit(ConsumerRecord<String, String> record) {
    // 1. 检查是否已处理(使用数据库中的已处理 offset 表)
    String key = record.topic() + "-" + record.partition() + "-" + record.offset();
    if (processedOffsets.exists(key)) {
        return; // 幂等跳过
    }
    
    // 2. 处理消息
    businessService.process(record.value());
    
    // 3. 记录已处理的 offset(同一个数据库事务)
    processedOffsets.insert(key);
    
    // 4. 提交 Kafka offset(注意:这里的事务是数据库事务,不是 Kafka 事务)
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    offsets.put(
        new TopicPartition(record.topic(), record.partition()),
        new OffsetAndMetadata(record.offset() + 1)
    );
    consumer.commitSync(offsets);
}

Consumer Lag 监控:运维核心指标

# 实时查看 Consumer Lag
kafka-consumer-groups.sh \
  --bootstrap-server kafka1:9092 \
  --describe \
  --group my-group

# 输出示例(LAG 列即为消费延迟):
# TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID
# orders  0          12345           12400           55   consumer-1
# orders  1          23456           23456           0    consumer-2

# 使用 kafka-consumer-groups 监控 Lag 趋势
watch -n 5 "kafka-consumer-groups.sh \
  --bootstrap-server kafka1:9092 \
  --describe --group my-group 2>/dev/null | grep -v '^$'"

Lag 告警策略

从 Offset 的存储格式到三种位置概念,从 commitSynccommitAsync 的设计哲学到 seek() 的按时间戳回溯,Offset 管理是 Kafka Consumer 编程中最需要深思熟虑的部分。每一个决策——何时提交、如何提交、从哪里开始消费——都直接影响系统的可靠性、数据完整性和故障恢复能力。


Level 2 · 它是怎么运行的(3-5年经验)

__consumer_offsets:Offset 住在哪里

Kafka 将 Consumer Group 的位移信息存储在内置 Topic __consumer_offsets 中,而不是外部系统(ZooKeeper 的旧版本实现已在 Kafka 0.9 废弃)。

Topic 配置

# 查看 __consumer_offsets 的配置
kafka-topics.sh --bootstrap-server kafka1:9092 \
  --describe --topic __consumer_offsets

# 关键配置:
# Partitions: 50(默认,由 offsets.topic.num.partitions 控制)
# Replication factor: 3(由 offsets.topic.replication.factor 控制)
# cleanup.policy: compact(日志压实,保留每个 key 的最新 value)

50 个分区提供了足够的并行度:50 个不同的 GroupCoordinator 实例(分散在各 Broker 上)可以同时接受位移提交,互不干扰。

Key 与 Value 的二进制格式

Offset Commit Record 的 Key(版本 1,当前 Consumer Group 使用):

[version: int16] [group: string] [topic: string] [partition: int32]

Offset Commit Record 的 Value

[version: int16] 
[offset: int64]           # 下一条要读的记录的 offset(committed offset + 1 的含义见后文)
[leaderEpoch: int32]      # Leader Epoch,用于防止消费过时数据
[metadata: string]        # 用户自定义元数据(通常为空)
[commitTimestamp: int64]  # 提交时的 Unix 时间戳(毫秒)
[expireTimestamp: int64]  # 过期时间(废弃字段,现在用 offset.retention.minutes)

Group Metadata Record 的 Key(存储 Group 成员信息):

[version: int16] [group: string]

这两类记录都存储在同一个 __consumer_offsets topic 中,由版本字段区分类型。

Log Compaction 如何工作于 Offset

__consumer_offsets 使用 Log Compaction,确保每个 (group, topic, partition) 三元组只保留最新一条位移提交记录。旧的提交记录会被 Compaction 过程删除,节省存储空间。

当一个 Consumer Group 长时间不活跃时,Kafka 会写入 tombstone 记录(Value 为 null)来清除该 Group 的所有位移记录(由 offset.retention.minutes 控制,默认 10080 分钟 = 7 天)。

# 读取 __consumer_offsets 的原始内容(调试用)
kafka-console-consumer.sh \
  --bootstrap-server kafka1:9092 \
  --topic __consumer_offsets \
  --formatter kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter \
  --from-beginning 2>/dev/null | head -20

# 输出格式:
# [my-group,orders,0]::OffsetAndMetadata[offset=12345, leaderEpoch=Optional[5], 
#   metadata=, commitTimestamp=1714000000000, expireTimestamp=-1]

offset.retention.minutes 的运作机制

__consumer_offsets 中的 Offset 记录并非永久保存。当一个 Consumer Group 的所有成员都离线(Group 处于 Empty 状态)时,Kafka 开始计时。超过 offset.retention.minutes(默认 10080 分钟,即 7 天)后,Kafka 写入 tombstone 记录,Compaction 过程删除该 Group 的所有 Offset 记录。

这意味着:一个消费 Group 如果停机超过 7 天,再次启动时其历史 Offset 已不复存在,行为由 auto.offset.reset 决定。

检查 Offset 是否仍然有效

# 查看 Consumer Group 的当前状态和 Offset
kafka-consumer-groups.sh \
  --bootstrap-server kafka1:9092 \
  --describe \
  --group my-group

# 如果输出显示 "Consumer group 'my-group' does not exist"
# 说明该 Group 的 Offset 已过期

Level 3 · 规范怎么定义的(资深)

手动分区管理与 seek():自定义回溯

有时需要绕过自动的 GroupCoordinator 协议,手动控制消费位置,例如:

手动分区分配

// 跳过 Group 协议,直接指定分区
TopicPartition partition0 = new TopicPartition("orders", 0);
TopicPartition partition1 = new TopicPartition("orders", 1);
consumer.assign(Arrays.asList(partition0, partition1));
// 注意:assign() 和 subscribe() 互斥,不能同时使用
// assign() 不使用 GroupCoordinator,因此没有 Rebalance,也没有自动位移管理

seek() 系列方法

// 跳到指定 offset
consumer.seek(new TopicPartition("orders", 0), 12345L);

// 跳到分区起始
consumer.seekToBeginning(consumer.assignment());

// 跳到分区末尾(只消费新消息)
consumer.seekToEnd(consumer.assignment());

// 按时间戳查找 offset(Kafka 0.10.1+)
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
long targetTimestamp = Instant.parse("2024-01-01T00:00:00Z").toEpochMilli();
for (TopicPartition tp : consumer.assignment()) {
    timestampsToSearch.put(tp, targetTimestamp);
}

Map<TopicPartition, OffsetAndTimestamp> offsets = 
    consumer.offsetsForTimes(timestampsToSearch);

for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {
    if (entry.getValue() != null) {
        consumer.seek(entry.getKey(), entry.getValue().offset());
    }
}
// 现在 consumer.poll() 将从 2024-01-01 00:00:00 对应的 offset 开始消费

offsetsForTimes() 使用 Broker 端的时间戳索引(.timeindex 文件)进行二分查找,效率极高,即使是 TB 级别的分区也能在毫秒内定位。


Level 4 · 边界与陷阱(所有人)

三个位置概念:Consumer 的坐标系

理解 Offset 必须区分三个不同的位置概念,混淆它们是 Consumer 编程中最常见的错误来源:

Partition Log:  [0][1][2]...[999][1000][1001]...[5000]
                                  ↑                ↑
                       Committed Offset         Log End Offset (LEO)
                      (last confirmed)         (next to be written)
                                  ↑
                       Position (Consumer 内存中的位置,下一条要 fetch 的)

Position(当前位置):Consumer 内存中维护的指针,指向下一条将要 fetch 的记录。每次成功 poll() 后,Position 自动前进。Position 只存在于 Consumer 进程内存中,进程重启后丢失。

Committed Offset(已提交位移):持久化到 __consumer_offsets 的位移值。它的语义是:"小于这个值的所有消息,我已经处理完毕"。注意:提交的值是下一条要处理的消息的 offset(即最后处理消息的 offset + 1),而不是最后处理消息的 offset 本身。

Log End Offset(日志末尾位移,LEO):Partition 中下一条即将被写入的 offset,即当前最大 offset + 1。由 Producer 端写入推进。

Consumer Lag(消费延迟)

Lag = LEO - Committed Offset

Lag 是衡量 Consumer 是否跟得上 Producer 写入速度的核心指标。Lag 为 0 表示实时消费;Lag 持续增大说明消费速度跟不上生产速度,需要扩展 Consumer 实例或优化处理逻辑。

commitSync() vs commitAsync():两种提交的设计哲学

commitSync():阻塞、可靠、有性能代价

// commitSync 的行为
try {
    consumer.commitSync();
    // 只有当 Coordinator 确认位移已写入 __consumer_offsets 后才返回
    // 如果发生可重试错误(如 NetworkException),会自动重试
    // 如果发生不可重试错误(如 ILLEGAL_GENERATION),抛出异常
} catch (CommitFailedException e) {
    // 例如:Rebalance 发生,本 Consumer 已被踢出 Group
    log.error("Commit failed due to rebalance", e);
}

commitSync() 在内部对可重试错误(如网络抖动)自动重试,直到成功或发生不可重试错误。这确保了位移一定被提交,但代价是:每次 commitSync() 都需要一个完整的网络往返到 Coordinator,期间当前线程阻塞。

性能代价:如果每处理一条消息调用一次 commitSync(),在高吞吐场景下(10万 TPS),每秒需要 10 万次同步网络请求,这会成为严重瓶颈。正确做法是批量提交:处理一批消息后提交一次。

commitAsync():非阻塞、高性能、不自动重试

consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        log.error("Async commit failed for offsets {}: {}", offsets, exception);
        // 注意:这里不应该重试!原因见下方分析
    } else {
        log.debug("Offsets committed: {}", offsets);
    }
});
// 立即返回,不等待 Coordinator 响应

commitAsync() 发送位移提交请求后立即返回,通过 Callback 通知结果。它永远不会自动重试失败的提交

为什么 commitAsync 不能自动重试?

考虑以下场景:

时间线:
T1: commitAsync 提交 offset=100,请求 A 发出
T2: 请求 A 网络超时(未知是否到达 Coordinator)
T3: commitAsync 提交 offset=200,请求 B 成功,Coordinator 记录 offset=200
T4: 请求 A 超时,自动重试,发出请求 C 提交 offset=100
T5: 请求 C 到达 Coordinator,将 offset 从 200 回退到 100!
T6: Consumer 重启,从 offset=100 开始消费 → 100-200 之间的消息被重复消费

自动重试会导致更早的位移覆盖更晚的位移,产生"位移回退"(offset regression)。因此 commitAsync 的设计决定是:不重试,让应用层决定是否重试(通常是接受这次提交失败,等下一批消息处理完再提交更新的 offset)。

最佳实践:组合使用两种提交

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        
        for (ConsumerRecord<String, String> record : records) {
            process(record);
        }
        
        // 正常情况下使用异步提交(不阻塞,高性能)
        consumer.commitAsync((offsets, exception) -> {
            if (exception != null) {
                log.warn("Async commit failed: {}", exception.getMessage());
                // 不重试,等下次成功提交时自然覆盖
            }
        });
    }
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    try {
        // 关闭前使用同步提交确保最后一批消息的位移被保存
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}
本章评分
4.5  / 5  (35 评分)

💬 留言讨论