第 15 章

副本机制:ISR、水位线与数据一致性

第15章:副本机制:ISR、水位线与数据一致性

导读:ISR、水位线与 Leader Epoch 如何协作保证数据一致性?

本章核心问题:ISR、水位线与 Leader Epoch 如何协作保证数据一致性?

读完本章你将理解


Level 1 · 你需要知道的(1-3年经验)

Kafka 的副本机制是其数据持久性和高可用性的基石。理解 ISR(In-Sync Replicas)、三条水位线(LEO/HW/Leader Epoch)以及它们在各种故障场景下的交互,是深入掌握 Kafka 数据一致性保证的必要前提。

三条水位线:Kafka 数据一致性的核心

LEO(Log End Offset):各副本的日志末端

定义:每个副本自己写入日志的最新 offset 的下一个位置(即下一条消息将被分配的 offset)。

LEO 是各副本私有的属性,不同副本的 LEO 可以不同(Follower 可能落后 Leader)。

HW(High Watermark):安全可消费的位置

定义:ISR 中所有副本 LEO 的最小值。

HW = min(Leader LEO, Follower-1 LEO, Follower-2 LEO, ...)
     其中参与计算的只有 ISR 列表中的副本

为什么 HW 是安全消费的边界

消费者只能消费到 HW 之前的消息,这保证了:如果 Leader 崩溃,新 Leader(从 ISR 中选出)一定也拥有这些消息。若消费者能消费到 HW 之后的消息,而此时 Leader 崩溃、新 Leader 没有这些消息,则会出现已消费但记录消失的"幻象读"现象。

Leader Epoch:解决 HW 截断 Bug 的关键

Leader Epoch 是 Leader 的纪元号,每次 Leader 选举时单调递增。每条消息在写入时都带有当时的 Leader Epoch 标记(存储在 RecordBatch 的 partitionLeaderEpoch 字段)。

为什么需要 Leader Epoch(KIP-101 的背景)

在 KIP-101 引入之前,Follower 在重启后会通过比对 HW 来决定是否需要截断本地日志。这个机制在特定时序下会导致数据丢失:

初始状态:
Leader(A):offset=0,1,2   LEO=3   HW=2
Follower(B):offset=0,1   LEO=2   HW=2
(消息2已写入A,但B的FetchRequest携带HW=2还未到达,HW还未更新到2)

步骤1:A宕机
步骤2:B成为新 Leader(B的LEO=2,已有0,1)
步骤3:A重启,A的本地日志有 offset 0,1,2
步骤4:A重启后看到自己的HW=2,
       认为 offset=2 可能是未确认数据,截断到 offset=2(删除了消息2!)
步骤5:A作为 Follower 从新 Leader B 同步
       B只有 offset 0,1,A丢失了 offset 2

Leader Epoch 的解决方案

重启的 Follower 不再用 HW 判断是否截断,而是向当前 Leader 发送 OffsetForLeaderEpochRequest,询问"我的最后一个 Leader Epoch 在你这里对应的最后一个 offset 是多少?"。Leader 根据自己存储的 Epoch 历史回答,Follower 据此精确决定是否需要截断以及截断到哪里。

# 查看 Leader Epoch 记录
cat /data/kafka/orders-0/leader-epoch-checkpoint
# 版本号
# 1
# Epoch 和对应的起始 offset:
# 0 0       ← Epoch 0 从 offset 0 开始(初始 Leader)
# 1 5000    ← Epoch 1 从 offset 5000 开始(第一次 Leader 切换后)
# 2 10500   ← Epoch 2 从 offset 10500 开始(第二次 Leader 切换后)

ISR 管理:动态进入与移除

replica.lag.time.max.ms:落后的容忍边界

ISR(In-Sync Replicas)是 Leader 认为"足够同步"的副本集合。判断标准:

Follower 落后于 Leader 的时间超过 replica.lag.time.max.ms(默认 30000ms),则从 ISR 中移除。

注意:Kafka 3.0 之前有另一个参数 replica.lag.max.messages(基于消息数量的落后判断),在 3.0 中已被移除,因为批量生产场景下容易产生误判。

# 查看 ISR 变化日志
grep "ISR" /var/log/kafka/server.log | tail -20
# [2024-04-26 10:15:00] INFO [Partition orders-0 epoch=2] ISR updated
#   from [1, 2, 3] to [1, 2] because replica 3 is no longer in-sync
# [2024-04-26 10:16:30] INFO [Partition orders-0 epoch=2] ISR updated
#   from [1, 2] to [1, 2, 3] because replica 3 rejoined ISR

# 查看主题分区的 ISR 状态
kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic orders
# Topic: orders  Partition: 0  Leader: 1  Replicas: 1,2,3  Isr: 1,2,3
# Topic: orders  Partition: 1  Leader: 2  Replicas: 2,3,1  Isr: 2,3    ← ISR 缩小!

Follower 因以下原因可能被移出 ISR:

min.insync.replicas:ISR 下限保护

min.insync.replicas(简称 minISR)设置了写入操作对 ISR 数量的最低要求。

当生产者配置 acks=all 时:

// 生产者端的处理
try {
    RecordMetadata metadata = producer.send(record).get();
} catch (ExecutionException e) {
    if (e.getCause() instanceof NotEnoughReplicasException) {
        // ISR 数量不足,无法保证 acks=all 的写入
        // 选项:
        // 1. 重试(等待 ISR 恢复)
        // 2. 降级:临时切换到 acks=1(接受潜在数据丢失风险)
        // 3. 告警并停止写入(数据合规场景)
        log.error("Cannot write: ISR below min.insync.replicas. " +
                  "Current ISR may be degraded.");
    }
}

推荐配置

场景 replication.factor min.insync.replicas acks
最高持久性(金融数据) 3 2 all
平衡(一般业务) 3 2 all
高吞吐(日志/指标) 3 1 1
开发/测试 1 1 1

对于 3 副本 + minISR=2 的配置含义:允许 1 个 Follower 落后(ISR=2),但如果 2 个 Follower 都不同步(ISR=1),写入将失败,保护数据不会因写入不足副本而丢失。

Unclean Leader Election:可用性与持久性的取舍

什么是"脏选举"

当一个分区的所有 ISR 成员都挂掉时(例如 3 个副本的主机全部宕机),面临两种选择:

unclean.leader.election.enable=false(默认值,推荐):

unclean.leader.election.enable=true

数据丢失场景演示

3 副本分区,minISR=2,acks=all

初始状态(最近的一次写入):
Leader(Broker1):   [0...100, 101]  LEO=102  HW=102
Follower(Broker2): [0...100, 101]  LEO=102  HW=102
Follower(Broker3): [0...100]       LEO=101  HW=100  ← 刚刚落后,正在追赶

消费者已成功消费到 offset=101(HW=102 > 101)

--- 灾难事件 ---
Broker1 宕机(停电)
Broker2 宕机(网络故障)
Broker3 存活(只有 offset 0..100)

如果 unclean.leader.election.enable=true:
Broker3 成为新 Leader,HW 回退到 100
消费者已经消费了 offset=101,但新 Leader 没有 offset=101
→ 数据 offset=101 永久丢失!消费者看到了永远不会再出现的消息。

如果 unclean.leader.election.enable=false(默认):
分区不可用,等待 Broker1 或 Broker2 恢复
→ 数据完全保留

生产建议:在绝大多数场景下保持 unclean.leader.election.enable=false。仅在数据可丢失(如实时统计)且可用性比持久性更重要时,才考虑开启。


Level 2 · 它是怎么运行的(3-5年经验)

Follower 的拉取循环:复制是如何运作的

推送 vs 拉取:Kafka 选择了拉取

Kafka 的副本复制使用 Follower 主动拉取(Pull) 模型,而非 Leader 主动推送(Push)模型。这个选择背后有深刻的工程考量:

推送模型的问题

拉取模型的优势

FetchRequest/FetchResponse 的关键字段

Follower 通过发送 FetchRequest 到 Leader 来拉取数据。与普通消费者的 Fetch 请求相比,Follower 的 FetchRequest 携带了额外的副本信息:

FetchRequest(Follower 发往 Leader):
├── replicaId: <Follower broker ID>  ← 区分于消费者(消费者 replicaId=-1)
├── maxWaitMs: <fetch.max.wait.ms>
├── minBytes: <fetch.min.bytes>
├── topics:
│   └── partitions:
│       ├── partition: 0
│       ├── fetchOffset: <Follower 当前的 LEO>  ← "我已经有到这里了"
│       └── maxBytes: <fetch.max.bytes>

FetchResponse(Leader 返回给 Follower):
├── topics:
│   └── partitions:
│       ├── partition: 0
│       ├── highWatermark: <Leader 当前的 HW>  ← Follower 据此更新自己的 HW
│       ├── lastStableOffset: <LSO,事务相关>
│       └── records: <从 fetchOffset 开始的消息批次>

fetchOffset 字段携带了 Follower 的当前 LEO(Log End Offset)。Leader 从这个位置开始发送数据,同时在响应中包含自己最新的 HW,让 Follower 知道哪些数据已经被所有 ISR 确认。

LEO/HW 的推进过程图解

下面以一个 3 副本的主题为例,追踪一次完整的消息写入过程中 LEO 和 HW 的变化:

初始状态:所有副本同步
Leader:    [0,1,2,3,4]  LEO=5  HW=5
Follower1: [0,1,2,3,4]  LEO=5  HW=5
Follower2: [0,1,2,3,4]  LEO=5  HW=5

步骤1:Producer 发送 acks=all,消息 offset=5 写入 Leader
Leader:    [0,1,2,3,4,5]  LEO=6  HW=5  ← HW 未变,Follower 还未确认
Follower1: [0,1,2,3,4]    LEO=5  HW=5
Follower2: [0,1,2,3,4]    LEO=5  HW=5

步骤2:Follower1 发送 FetchRequest(fetchOffset=5)
Leader 返回:records=[offset5], highWatermark=5
Follower1 写入 offset=5:
Leader:    [0,1,2,3,4,5]  LEO=6  HW=5
Follower1: [0,1,2,3,4,5]  LEO=6  HW=5  ← Follower1 从 response 得知 HW=5

步骤3:Follower2 发送 FetchRequest(fetchOffset=5),同步
Follower2: [0,1,2,3,4,5]  LEO=6  HW=5

步骤4:Follower1 发送下一个 FetchRequest(fetchOffset=6)
Leader 收到:发现 Follower1 的 LEO=6(通过 fetchOffset 得知)
Leader 更新 ISR 中 Follower1 的 LEO=6
新 HW = min(Leader LEO=6, Follower1 LEO=6, Follower2 LEO=6) = 6 ← HW 推进!
Leader 在 FetchResponse 中携带 highWatermark=6
Follower1 更新自己的 HW=6

步骤5:消费者现在可以消费 offset=5 的消息了(HW=6 > 5)
Producer 收到成功响应

关键观察:HW 不是在 Leader 写入时立即推进,而是在下一轮 FetchRequest 中,Leader 通过 fetchOffset 得知 Follower 的 LEO 后才推进。这意味着 HW 的推进总是滞后于 Leader 的 LEO 推进至少一个 FetchRequest 往返时间。


Level 3 · 规范怎么定义的(资深)

本章的协议与规范细节已融入上述 Level 中。如需深入了解,请参阅 Kafka 官方协议文档和对应 KIP 提案。


Level 4 · 边界与陷阱(所有人)

副本延迟监控与告警

# 查看所有分区的 ISR 状态(找出 ISR 缩小的分区)
kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe | grep -v "Isr:.*Replicas" | \
  awk '{
    split($9, replicas, ","); split($11, isr, ",");
    if (length(replicas) != length(isr)) print $0
  }'

# 监控 ISR 收缩的 JMX 指标
# kafka.server:type=ReplicaManager,name=IsrShrinks-PerSec
# kafka.server:type=ReplicaManager,name=IsrExpands-PerSec

# 监控 Follower 延迟
# kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica
# 值持续 > 0 表明有 Follower 在追赶
# Prometheus 告警规则
- alert: KafkaISRShrink
  expr: |
    rate(kafka_server_replicamanager_isrshrinks_total[5m]) > 0
  for: 1m
  labels:
    severity: warning
  annotations:
    summary: "Kafka ISR shrinking on {{ $labels.instance }}"

- alert: KafkaUnderReplicatedPartitions
  expr: |
    kafka_server_replicamanager_underreplicatedpartitions > 0
  for: 5m
  labels:
    severity: critical
  annotations:
    summary: "{{ $value }} under-replicated partitions detected"

ISR、水位线和 Leader Epoch 共同构成了 Kafka 分布式一致性的三角支撑。ISR 定义了"可信"的副本集合,HW 定义了"安全"的消费边界,Leader Epoch 确保了在 Leader 切换时日志不会被错误截断。三者缺一不可,共同保障了 Kafka 在面对各种局部故障时的数据完整性。

本章评分
4.8  / 5  (18 评分)

💬 留言讨论