副本机制:ISR、水位线与数据一致性
第15章:副本机制:ISR、水位线与数据一致性
导读:ISR、水位线与 Leader Epoch 如何协作保证数据一致性?
本章核心问题:ISR、水位线与 Leader Epoch 如何协作保证数据一致性?
读完本章你将理解:
- Follower 拉取复制模型
- 三条水位线的精确定义与推进过程
- ISR 动态管理与 min.insync.replicas
- Unclean Leader Election 的取舍
Level 1 · 你需要知道的(1-3年经验)
Kafka 的副本机制是其数据持久性和高可用性的基石。理解 ISR(In-Sync Replicas)、三条水位线(LEO/HW/Leader Epoch)以及它们在各种故障场景下的交互,是深入掌握 Kafka 数据一致性保证的必要前提。
三条水位线:Kafka 数据一致性的核心
LEO(Log End Offset):各副本的日志末端
定义:每个副本自己写入日志的最新 offset 的下一个位置(即下一条消息将被分配的 offset)。
- Leader 的 LEO = Leader 已经写入的最新 offset + 1
- Follower 的 LEO = Follower 已经从 Leader 拉取并写入的最新 offset + 1
LEO 是各副本私有的属性,不同副本的 LEO 可以不同(Follower 可能落后 Leader)。
HW(High Watermark):安全可消费的位置
定义:ISR 中所有副本 LEO 的最小值。
HW = min(Leader LEO, Follower-1 LEO, Follower-2 LEO, ...)
其中参与计算的只有 ISR 列表中的副本
为什么 HW 是安全消费的边界:
消费者只能消费到 HW 之前的消息,这保证了:如果 Leader 崩溃,新 Leader(从 ISR 中选出)一定也拥有这些消息。若消费者能消费到 HW 之后的消息,而此时 Leader 崩溃、新 Leader 没有这些消息,则会出现已消费但记录消失的"幻象读"现象。
Leader Epoch:解决 HW 截断 Bug 的关键
Leader Epoch 是 Leader 的纪元号,每次 Leader 选举时单调递增。每条消息在写入时都带有当时的 Leader Epoch 标记(存储在 RecordBatch 的 partitionLeaderEpoch 字段)。
为什么需要 Leader Epoch(KIP-101 的背景):
在 KIP-101 引入之前,Follower 在重启后会通过比对 HW 来决定是否需要截断本地日志。这个机制在特定时序下会导致数据丢失:
初始状态:
Leader(A):offset=0,1,2 LEO=3 HW=2
Follower(B):offset=0,1 LEO=2 HW=2
(消息2已写入A,但B的FetchRequest携带HW=2还未到达,HW还未更新到2)
步骤1:A宕机
步骤2:B成为新 Leader(B的LEO=2,已有0,1)
步骤3:A重启,A的本地日志有 offset 0,1,2
步骤4:A重启后看到自己的HW=2,
认为 offset=2 可能是未确认数据,截断到 offset=2(删除了消息2!)
步骤5:A作为 Follower 从新 Leader B 同步
B只有 offset 0,1,A丢失了 offset 2
Leader Epoch 的解决方案:
重启的 Follower 不再用 HW 判断是否截断,而是向当前 Leader 发送 OffsetForLeaderEpochRequest,询问"我的最后一个 Leader Epoch 在你这里对应的最后一个 offset 是多少?"。Leader 根据自己存储的 Epoch 历史回答,Follower 据此精确决定是否需要截断以及截断到哪里。
# 查看 Leader Epoch 记录
cat /data/kafka/orders-0/leader-epoch-checkpoint
# 版本号
# 1
# Epoch 和对应的起始 offset:
# 0 0 ← Epoch 0 从 offset 0 开始(初始 Leader)
# 1 5000 ← Epoch 1 从 offset 5000 开始(第一次 Leader 切换后)
# 2 10500 ← Epoch 2 从 offset 10500 开始(第二次 Leader 切换后)
ISR 管理:动态进入与移除
replica.lag.time.max.ms:落后的容忍边界
ISR(In-Sync Replicas)是 Leader 认为"足够同步"的副本集合。判断标准:
Follower 落后于 Leader 的时间超过 replica.lag.time.max.ms(默认 30000ms),则从 ISR 中移除。
注意:Kafka 3.0 之前有另一个参数 replica.lag.max.messages(基于消息数量的落后判断),在 3.0 中已被移除,因为批量生产场景下容易产生误判。
# 查看 ISR 变化日志
grep "ISR" /var/log/kafka/server.log | tail -20
# [2024-04-26 10:15:00] INFO [Partition orders-0 epoch=2] ISR updated
# from [1, 2, 3] to [1, 2] because replica 3 is no longer in-sync
# [2024-04-26 10:16:30] INFO [Partition orders-0 epoch=2] ISR updated
# from [1, 2] to [1, 2, 3] because replica 3 rejoined ISR
# 查看主题分区的 ISR 状态
kafka-topics.sh --bootstrap-server localhost:9092 \
--describe --topic orders
# Topic: orders Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
# Topic: orders Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3 ← ISR 缩小!
Follower 因以下原因可能被移出 ISR:
- Follower 所在 Broker 过载(磁盘慢、GC 停顿、网络拥塞)
- Follower 节点宕机
- 网络分区
min.insync.replicas:ISR 下限保护
min.insync.replicas(简称 minISR)设置了写入操作对 ISR 数量的最低要求。
当生产者配置 acks=all 时:
- 如果当前 ISR 数量 ≥
min.insync.replicas→ 写入正常进行 - 如果当前 ISR 数量 <
min.insync.replicas→ 抛出NotEnoughReplicasException
// 生产者端的处理
try {
RecordMetadata metadata = producer.send(record).get();
} catch (ExecutionException e) {
if (e.getCause() instanceof NotEnoughReplicasException) {
// ISR 数量不足,无法保证 acks=all 的写入
// 选项:
// 1. 重试(等待 ISR 恢复)
// 2. 降级:临时切换到 acks=1(接受潜在数据丢失风险)
// 3. 告警并停止写入(数据合规场景)
log.error("Cannot write: ISR below min.insync.replicas. " +
"Current ISR may be degraded.");
}
}
推荐配置:
| 场景 | replication.factor | min.insync.replicas | acks |
|---|---|---|---|
| 最高持久性(金融数据) | 3 | 2 | all |
| 平衡(一般业务) | 3 | 2 | all |
| 高吞吐(日志/指标) | 3 | 1 | 1 |
| 开发/测试 | 1 | 1 | 1 |
对于 3 副本 + minISR=2 的配置含义:允许 1 个 Follower 落后(ISR=2),但如果 2 个 Follower 都不同步(ISR=1),写入将失败,保护数据不会因写入不足副本而丢失。
Unclean Leader Election:可用性与持久性的取舍
什么是"脏选举"
当一个分区的所有 ISR 成员都挂掉时(例如 3 个副本的主机全部宕机),面临两种选择:
unclean.leader.election.enable=false(默认值,推荐):
- 等待 ISR 中的某个成员重新上线
- 期间该分区不可用(写入失败、消费者无法消费最新数据)
- 数据绝对不丢失(已提交到 HW 的数据全部保留)
unclean.leader.election.enable=true:
- 允许不在 ISR 中的落后副本成为 Leader
- 分区立即恢复可用
- 数据可能丢失(新 Leader 可能缺少 HW 之前已提交的消息)
数据丢失场景演示
3 副本分区,minISR=2,acks=all
初始状态(最近的一次写入):
Leader(Broker1): [0...100, 101] LEO=102 HW=102
Follower(Broker2): [0...100, 101] LEO=102 HW=102
Follower(Broker3): [0...100] LEO=101 HW=100 ← 刚刚落后,正在追赶
消费者已成功消费到 offset=101(HW=102 > 101)
--- 灾难事件 ---
Broker1 宕机(停电)
Broker2 宕机(网络故障)
Broker3 存活(只有 offset 0..100)
如果 unclean.leader.election.enable=true:
Broker3 成为新 Leader,HW 回退到 100
消费者已经消费了 offset=101,但新 Leader 没有 offset=101
→ 数据 offset=101 永久丢失!消费者看到了永远不会再出现的消息。
如果 unclean.leader.election.enable=false(默认):
分区不可用,等待 Broker1 或 Broker2 恢复
→ 数据完全保留
生产建议:在绝大多数场景下保持 unclean.leader.election.enable=false。仅在数据可丢失(如实时统计)且可用性比持久性更重要时,才考虑开启。
Level 2 · 它是怎么运行的(3-5年经验)
Follower 的拉取循环:复制是如何运作的
推送 vs 拉取:Kafka 选择了拉取
Kafka 的副本复制使用 Follower 主动拉取(Pull) 模型,而非 Leader 主动推送(Push)模型。这个选择背后有深刻的工程考量:
推送模型的问题:
- Leader 需要跟踪每个 Follower 的复制进度
- 某个 Follower 处理速度慢时,Leader 必须降速或在内存中积压待发送的数据
- 复制逻辑与 Leader 的写入逻辑耦合,增加复杂性
拉取模型的优势:
- Follower 以自己的速度复制,不阻塞 Leader
- Follower 可以批量拉取,与消费者行为复用同一套 FetchRequest 协议
- Leader 只需维护 Follower 的 LEO,无需管理推送队列
FetchRequest/FetchResponse 的关键字段
Follower 通过发送 FetchRequest 到 Leader 来拉取数据。与普通消费者的 Fetch 请求相比,Follower 的 FetchRequest 携带了额外的副本信息:
FetchRequest(Follower 发往 Leader):
├── replicaId: <Follower broker ID> ← 区分于消费者(消费者 replicaId=-1)
├── maxWaitMs: <fetch.max.wait.ms>
├── minBytes: <fetch.min.bytes>
├── topics:
│ └── partitions:
│ ├── partition: 0
│ ├── fetchOffset: <Follower 当前的 LEO> ← "我已经有到这里了"
│ └── maxBytes: <fetch.max.bytes>
FetchResponse(Leader 返回给 Follower):
├── topics:
│ └── partitions:
│ ├── partition: 0
│ ├── highWatermark: <Leader 当前的 HW> ← Follower 据此更新自己的 HW
│ ├── lastStableOffset: <LSO,事务相关>
│ └── records: <从 fetchOffset 开始的消息批次>
fetchOffset 字段携带了 Follower 的当前 LEO(Log End Offset)。Leader 从这个位置开始发送数据,同时在响应中包含自己最新的 HW,让 Follower 知道哪些数据已经被所有 ISR 确认。
LEO/HW 的推进过程图解
下面以一个 3 副本的主题为例,追踪一次完整的消息写入过程中 LEO 和 HW 的变化:
初始状态:所有副本同步
Leader: [0,1,2,3,4] LEO=5 HW=5
Follower1: [0,1,2,3,4] LEO=5 HW=5
Follower2: [0,1,2,3,4] LEO=5 HW=5
步骤1:Producer 发送 acks=all,消息 offset=5 写入 Leader
Leader: [0,1,2,3,4,5] LEO=6 HW=5 ← HW 未变,Follower 还未确认
Follower1: [0,1,2,3,4] LEO=5 HW=5
Follower2: [0,1,2,3,4] LEO=5 HW=5
步骤2:Follower1 发送 FetchRequest(fetchOffset=5)
Leader 返回:records=[offset5], highWatermark=5
Follower1 写入 offset=5:
Leader: [0,1,2,3,4,5] LEO=6 HW=5
Follower1: [0,1,2,3,4,5] LEO=6 HW=5 ← Follower1 从 response 得知 HW=5
步骤3:Follower2 发送 FetchRequest(fetchOffset=5),同步
Follower2: [0,1,2,3,4,5] LEO=6 HW=5
步骤4:Follower1 发送下一个 FetchRequest(fetchOffset=6)
Leader 收到:发现 Follower1 的 LEO=6(通过 fetchOffset 得知)
Leader 更新 ISR 中 Follower1 的 LEO=6
新 HW = min(Leader LEO=6, Follower1 LEO=6, Follower2 LEO=6) = 6 ← HW 推进!
Leader 在 FetchResponse 中携带 highWatermark=6
Follower1 更新自己的 HW=6
步骤5:消费者现在可以消费 offset=5 的消息了(HW=6 > 5)
Producer 收到成功响应
关键观察:HW 不是在 Leader 写入时立即推进,而是在下一轮 FetchRequest 中,Leader 通过 fetchOffset 得知 Follower 的 LEO 后才推进。这意味着 HW 的推进总是滞后于 Leader 的 LEO 推进至少一个 FetchRequest 往返时间。
Level 3 · 规范怎么定义的(资深)
本章的协议与规范细节已融入上述 Level 中。如需深入了解,请参阅 Kafka 官方协议文档和对应 KIP 提案。
Level 4 · 边界与陷阱(所有人)
副本延迟监控与告警
# 查看所有分区的 ISR 状态(找出 ISR 缩小的分区)
kafka-topics.sh --bootstrap-server localhost:9092 \
--describe | grep -v "Isr:.*Replicas" | \
awk '{
split($9, replicas, ","); split($11, isr, ",");
if (length(replicas) != length(isr)) print $0
}'
# 监控 ISR 收缩的 JMX 指标
# kafka.server:type=ReplicaManager,name=IsrShrinks-PerSec
# kafka.server:type=ReplicaManager,name=IsrExpands-PerSec
# 监控 Follower 延迟
# kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica
# 值持续 > 0 表明有 Follower 在追赶
# Prometheus 告警规则
- alert: KafkaISRShrink
expr: |
rate(kafka_server_replicamanager_isrshrinks_total[5m]) > 0
for: 1m
labels:
severity: warning
annotations:
summary: "Kafka ISR shrinking on {{ $labels.instance }}"
- alert: KafkaUnderReplicatedPartitions
expr: |
kafka_server_replicamanager_underreplicatedpartitions > 0
for: 5m
labels:
severity: critical
annotations:
summary: "{{ $value }} under-replicated partitions detected"
ISR、水位线和 Leader Epoch 共同构成了 Kafka 分布式一致性的三角支撑。ISR 定义了"可信"的副本集合,HW 定义了"安全"的消费边界,Leader Epoch 确保了在 Leader 切换时日志不会被错误截断。三者缺一不可,共同保障了 Kafka 在面对各种局部故障时的数据完整性。