KRaft 模式:Raft 协议在 Kafka 中的实现
第3章:KRaft 模式:Raft 协议在 Kafka 中的实现
导读:KRaft 模式如何取代 ZooKeeper,彻底改变 Kafka 的元数据管理架构?
本章核心问题:KRaft 模式如何取代 ZooKeeper,彻底改变 Kafka 的元数据管理架构?
读完本章你将理解:
- ZooKeeper 的三大瓶颈与技术债
- Raft 协议核心机制:Leader 选举、日志复制、日志匹配
- KRaft 的定制实现:epoch、__cluster_metadata、Controller Quorum
- ZooKeeper 到 KRaft 的迁移实操步骤
Level 1 · 你需要知道的(1-3年经验)
KRaft:Kafka 对 Raft 的定制实现
KRaft(Kafka Raft)不是对 Raft 论文的逐字实现,而是针对 Kafka 元数据管理场景做了若干重要定制。
epoch 替代 term
KRaft 使用epoch代替 Raft 的 term,语义上完全等价,但在 Kafka 的多个层次都有 epoch 的影子:
- Controller epoch:Controller 节点的 epoch,每次 Controller 选举递增。所有 Broker 缓存当前的 Controller epoch,收到过时 epoch 的请求直接拒绝(防止脑裂状态下的旧 Controller 继续操作)。
- Leader epoch:每个 Partition 的 Leader 选举 epoch,用于消费者和 Follower 检测 Leader 变更。
__cluster_metadata Topic 即 Raft 日志
KRaft 的核心设计:将 Raft 日志实现为一个特殊的 Kafka Topic,名为 __cluster_metadata。
这个 Topic 只有一个 Partition,Controller Quorum 中的节点通过 Raft 协议维护这个 Partition 的副本。所有集群元数据变更——创建/删除 Topic、Partition Leader 变更、ISR 更新、ACL 变更、Broker 注册——都以 Record 的形式追加到这个 Topic。
__cluster_metadata Partition 0 的内容示意:
offset=0: RegisterBrokerRecord{brokerId=1, host="broker1", port=9092, ...}
offset=1: RegisterBrokerRecord{brokerId=2, host="broker2", port=9092, ...}
offset=2: TopicRecord{topicId=UUID, name="order-events"}
offset=3: PartitionRecord{topicId=..., partitionId=0, leader=1, isr=[1,2,3], ...}
offset=4: PartitionRecord{topicId=..., partitionId=1, leader=2, isr=[1,2,3], ...}
...
offset=N: PartitionChangeRecord{topicId=..., partitionId=0, leader=2, isr=[2,3]}
普通 Broker(非 Controller Quorum 成员)通过 MetadataFetch RPC 订阅这个 Topic 的更新,接收 Controller 推送的元数据变更,在本地维护一个完整的元数据缓存。这消除了 ZooKeeper 模式下 Controller 需要主动推送 LeaderAndIsrRequest 的复杂度。
Controller Quorum:3 节点还是 5 节点?
KRaft 引入了 Controller Quorum 的概念:集群中专门参与 Raft 共识的节点集合。Controller Quorum 可以是独立节点(dedicated controller mode),也可以是同时承担 Broker 职责的节点(combined mode)。
# 查看 KRaft 集群的 Controller Quorum 配置
kafka-metadata-quorum.sh --bootstrap-controller broker1:9093 describe --status
# 输出示例:
ClusterId: abc123xyz
LeaderId: 1
LeaderEpoch: 42
HighWatermark: 158392
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 12
CurrentVoters: [1,2,3]
CurrentObservers: [4,5,6,7,8]
3 节点 Quorum:容忍 1 个节点故障(多数为 2/3)。适合大多数生产环境。
5 节点 Quorum:容忍 2 个节点故障(多数为 3/5)。适合对元数据可用性要求极高、需要在滚动升级期间同时保持多个节点在线的场景。
Combined vs Dedicated Controller:
在 Combined 模式下,每个 Broker 节点同时参与 Controller Quorum,适合小集群(3-5 个节点)降低运维复杂度。在 Dedicated 模式下,Controller Quorum 是专用节点,不处理客户端流量,适合大集群(50+ Broker)保证 Controller 资源不被 I/O 负载影响。
MetadataVersion 与在线升级
MetadataVersion(MV)是 KRaft 中的一个关键概念,类似于数据库的 Schema Version。每次 Kafka 大版本引入新的元数据格式,MV 递增。集群的当前 MV 存储在 __cluster_metadata 中,所有 Controller Quorum 成员必须支持该 MV 才能参与共识。
# 查看当前 MetadataVersion
kafka-features.sh --bootstrap-server broker1:9092 describe
# 升级 MetadataVersion(在所有节点升级到新版本后执行)
kafka-features.sh --bootstrap-server broker1:9092 upgrade \
--feature metadata.version=<new-version>
MV 机制使得跨版本滚动升级成为可能:先将所有节点升级到新版本(但仍以旧 MV 运行),然后通过一条管理命令将 MV 升级,激活新功能。
KRaft vs ZooKeeper:性能对比
| 指标 | ZooKeeper 模式 | KRaft 模式 | 提升倍数 |
|---|---|---|---|
| 集群启动时间 | 数分钟(100K 分区) | 数秒 | ~10-100x |
| Controller 故障切换 | 30-60 秒 | < 1 秒 | ~60x |
| 最大 Partition 数 | ~200K | 数百万(理论无上限) | ~10x |
| 元数据传播延迟 | 秒级(大集群) | 毫秒级 | ~100x |
| 运维组件数 | 2(Kafka + ZK) | 1(Kafka) | —— |
Controller 故障切换从 30-60 秒降至毫秒级的原因:在 KRaft 模式下,所有 Controller Quorum 节点始终维护完整的元数据副本(因为 __cluster_metadata Topic 的所有 Raft 日志同步给每个 Quorum 成员)。当 Leader 宕机,新 Leader 不需要从任何外部系统加载状态,直接从本地内存继续服务,选举完成即服务恢复,耗时仅为 Raft 选举时间(通常 < 500ms)。
Level 2 · 它是怎么运行的(3-5年经验)
Raft 协议基础
在理解 KRaft 如何工作之前,需要掌握 Raft 的核心机制。Raft 是 Diego Ongaro 在 2013 年发表的分布式共识算法,设计目标是"可理解性"——相比 Paxos 更容易理解和实现。
Leader 选举
Raft 将时间划分为任期(Term),每个 Term 以选举开始。Term 是单调递增的整数,充当逻辑时钟,用于检测过时的信息。
Term 1: Leader A 当选并服务
Term 2: A 宕机,选举失败(票数分裂)
Term 3: B 当选,服务...
选举触发:Follower 在 electionTimeout(随机的 150-300ms)内没有收到 Leader 的心跳,认为 Leader 已死,转变为 Candidate,递增 Term,向所有节点发送 RequestVote RPC。
投票规则:
- 每个节点在一个 Term 内最多投一票(First-Come-First-Served)。
- 只投给日志至少和自己一样新的 Candidate(Safety 保证:确保 Leader 包含所有已提交的日志)。
- 获得多数票(N/2+1)的 Candidate 成为新 Leader。
心跳:Leader 定期向所有 Follower 发送 AppendEntries(空的或带数据的),作为心跳保持权威,阻止 Follower 发起新选举。
日志复制
Leader 将客户端请求封装为日志条目(Log Entry),通过 AppendEntries RPC 并行发送给所有 Follower。
日志结构(每个节点本地维护):
Index: [1] [2] [3] [4] [5]
Term: [1] [1] [2] [3] [3]
Data: [set x=1][set y=2][set x=3][del z][set w=4]
↑
CommitIndex=3(已提交)
提交条件:当多数节点(N/2+1)确认写入某条日志后,Leader 将该条日志标记为 Committed,并在后续的 AppendEntries 中通过 leaderCommit 字段通知 Follower 推进 CommitIndex。已提交的日志永不被覆盖(Raft Safety Property)。
日志匹配属性(Log Matching Property)
这是 Raft Safety 的核心:如果两个节点上的日志在某个 Index 处有相同的 Term,那么这两个节点在这个 Index 之前的所有日志条目完全相同。
AppendEntries 通过 prevLogIndex 和 prevLogTerm 进行一致性检查:Follower 收到 AppendEntries 后,首先验证 prevLogIndex 处的 Term 是否匹配,不匹配则拒绝,Leader 会回退并重试,直到找到双方日志分叉点。
KRaft 的实现细节:Fetch vs Vote
KRaft 在 Raft 的 AppendEntries 之上做了一个重要改变:使用 Kafka 的 Fetch 协议作为 Raft 日志复制的传输层,而不是专用的 AppendEntries RPC。
这意味着 Controller Follower 从 Controller Leader 拉取 __cluster_metadata 日志的机制,与普通消费者从 Broker 拉取消息的机制完全相同(底层调用路径的复用)。Vote 请求(对应 Raft 的 RequestVote)则使用专用的 VoteRequest API(ApiKey=52)。
# 观察 KRaft 内部流量(使用 Wireshark 或 kcat 监听 controller 端口)
# Controller port (9093) 上的主要流量:
# ApiKey=52 (Vote) - 选举投票
# ApiKey=53 (BeginQuorumEpoch) - 新 Leader 广播权威
# ApiKey=54 (EndQuorumEpoch) - Leader 主动让位(滚动升级时)
# ApiKey=1 (Fetch) - Follower 拉取 __cluster_metadata 日志
# 查看 Controller 选举历史
kafka-metadata-quorum.sh \
--bootstrap-controller controller1:9093 \
describe --status
这种设计的优雅之处在于:Kafka 的网络栈只需要处理一种消息格式(Kafka 二进制协议),Controller 和 Broker 的代码路径高度复用,降低了实现复杂度和 Bug 面。
ZooKeeper 模式下,元数据系统是两套完全不同的代码(Kafka 代码 + ZooKeeper 客户端代码 + ZooKeeper 服务端代码);KRaft 模式下,它是一套统一的代码,运行在同一个进程中,使用同一个二进制协议。这是架构层面的根本性简化。
Level 3 · 规范怎么定义的(资深)
ZooKeeper → KRaft 迁移实操
Kafka 3.7 提供了从 ZooKeeper 模式平滑迁移到 KRaft 模式的工具链,支持在不停机的情况下完成迁移(但需要滚动重启)。
迁移前提条件
# 确认当前版本 >= 3.7(支持迁移)
kafka-broker-api-versions.sh --bootstrap-server broker1:9092 | head -5
# 确认 inter.broker.protocol.version 和 log.message.format.version 已升级到最新
kafka-configs.sh --bootstrap-server broker1:9092 --entity-type brokers \
--entity-name 1 --describe | grep protocol.version
步骤一:格式化 KRaft 存储
在 新的 Controller 节点上(如果是 dedicated controller 模式),生成 Cluster UUID 并格式化存储:
# 生成唯一的 Cluster UUID(整个迁移过程使用同一个 UUID)
CLUSTER_UUID=$(kafka-storage.sh random-uuid)
echo "Cluster UUID: $CLUSTER_UUID"
# 格式化 KRaft Controller 节点的存储目录
kafka-storage.sh format \
--config /etc/kafka/controller.properties \
--cluster-id $CLUSTER_UUID \
--ignore-formatted
controller.properties 配置示例:
# KRaft Controller 配置
process.roles=controller
node.id=101
controller.quorum.voters=101@controller1:9093,102@controller2:9093,103@controller3:9093
listeners=CONTROLLER://0.0.0.0:9093
controller.listener.names=CONTROLLER
log.dirs=/var/kafka/controller-logs
步骤二:启动 KRaft Controller,触发迁移
# 启动 KRaft Controller(第一个启动的会成为迁移协调者)
kafka-server-start.sh /etc/kafka/controller.properties
# 在 ZooKeeper 模式的 Kafka Broker 上启用迁移模式
# 修改 server.properties,添加:
# zookeeper.metadata.migration.enable=true
# controller.quorum.voters=101@controller1:9093,...
# 滚动重启所有 Broker(逐一重启,不会中断服务)
步骤三:验证迁移状态
# 监控迁移进度
kafka-metadata-quorum.sh \
--bootstrap-controller controller1:9093 \
describe --status
# 检查所有 Broker 是否已切换到 KRaft 模式
kafka-metadata-quorum.sh \
--bootstrap-controller controller1:9093 \
describe --replication
# 确认无 ZooKeeper 依赖后,关闭 ZooKeeper 服务
# 将 zookeeper.metadata.migration.enable=false 从配置中移除
# 再次滚动重启
步骤四:关闭 ZooKeeper
一旦所有 Broker 都已在 KRaft 模式下运行并确认元数据一致,可以安全关闭 ZooKeeper 集群:
# 最终确认:集群完全脱离 ZooKeeper
kafka-metadata-quorum.sh \
--bootstrap-controller controller1:9093 \
describe --status | grep -i zookeeper
# 应该没有任何 ZooKeeper 相关输出
# 停止 ZooKeeper(确认无任何服务依赖后)
zkServer.sh stop
Level 4 · 边界与陷阱(所有人)
ZooKeeper:十年的技术债
Kafka 在设计之初,将集群元数据管理(Topic 配置、Partition Leader 选举、Broker 注册)交给了 Apache ZooKeeper 处理。这个选择在 2011 年是合理的——ZooKeeper 是成熟的分布式协调服务,Kafka 团队不需要重新发明分布式共识算法。
但随着 Kafka 规模扩展,ZooKeeper 的代价逐渐显现,成为无法回避的技术债。
ZooKeeper 的三大瓶颈
瓶颈一:双系统运维负担。 运营一个 Kafka 集群意味着同时运营两个分布式系统:Kafka Broker 集群 + ZooKeeper 集群(通常 3 或 5 节点)。两套监控、两套备份、两套版本升级、两套安全配置(ZooKeeper 的 SASL/ACL 与 Kafka 的 ACL 完全独立)。这不仅增加运维成本,更要命的是 ZooKeeper 对 Kafka 内部状态有深度耦合,很多 Kafka Bug 实际上是 ZooKeeper 交互导致的。
瓶颈二:~200K Partition 上限。 Kafka Controller(Broker 集群中的一个特殊角色)在启动时需要从 ZooKeeper 读取所有 Partition 的元数据,并在内存中构建完整的集群状态。当 Partition 数量超过约 200K 时,Controller 启动时间超过 10 分钟,Controller 故障切换需要 30-60 秒(新 Controller 需要重新从 ZooKeeper 加载所有状态)。
ZooKeeper 模式下的 Controller 故障切换流程:
1. 检测到 Controller Broker 宕机(ZooKeeper Session 超时:30s)
2. 其他 Broker 竞争 /controller ZNode(选举:秒级)
3. 新 Controller 从 ZooKeeper 读取全量元数据(Partition 越多越慢)
4. 新 Controller 向所有 Broker 发送 LeaderAndIsrRequest 更新状态
总耗时:30-60 秒,期间生产和消费被中断
瓶颈三:元数据传播延迟。 ZooKeeper Watch 机制是事件驱动的,但 Kafka 的 Controller 必须将元数据变更主动推送给所有 Broker(通过 LeaderAndIsrRequest、UpdateMetadataRequest 等)。在大集群(1000+ Broker,100K+ Partition)下,这个推送过程本身就可能耗时数秒,导致集群在 Partition Leader 选举期间长时间处于不一致状态。