Consumer Group 协议:加入、同步与心跳
第9章:Consumer Group 协议:加入、同步与心跳
导读:Consumer Group 的 Rebalance 协议是如何工作的?
本章核心问题:Consumer Group 的 Rebalance 协议是如何工作的?
读完本章你将理解:
- GroupCoordinator 的定位机制
- Rebalance 五步协议全流程
- 三种分配策略的源码级对比
- max.poll.interval.ms 与 session.timeout.ms 的区别
Level 1 · 你需要知道的(1-3年经验)
Consumer Group 是 Kafka 最精妙的设计之一:多个消费者实例共同消费一个或多个 Topic,分区在组成员之间自动分配,成员的加入和离开触发重新平衡(Rebalance)。这一切看起来自动发生,但背后是一套精确定义的分布式协议——理解它,你才能知道为什么 Rebalance 会打断消费,为什么心跳超时会踢出成员,以及为什么 CooperativeStickyAssignor 可以让 Rebalance 不停止消费。
Rebalance 协议:五步舞蹈
Consumer Group 的 Rebalance 由以下五个步骤组成,每一步都是显式的网络请求:
第一步:FindCoordinator
Consumer 启动或 Coordinator 变更时,发送 FindCoordinatorRequest,获得 Coordinator 的地址。这是所有后续通信的前提。
第二步:JoinGroup
所有组成员向 Coordinator 发送 JoinGroupRequest,携带:
memberId:新成员发空字符串,重连成员发上次分配的 IDprotocols:该成员支持的分配策略列表(如range,roundrobin,cooperative-sticky)groupInstanceId:Static Membership 时使用(见后文)sessionTimeoutMs:心跳超时时间rebalanceTimeoutMs:即max.poll.interval.ms
Coordinator 收到第一个 JoinGroupRequest 后等待 group.initial.rebalance.delay.ms(默认 3 秒)以收集所有成员的请求,然后:
- 选择所有成员都支持的分配策略(投票机制:多数派策略胜出)
- 从所有成员中选择一个作为 Group Leader(通常是第一个加入的成员)
- 为每个成员分配唯一的
memberId - 向所有成员返回
JoinGroupResponse
关键设计:Coordinator 只将完整成员信息(包括每个成员的 Subscription)发给 Group Leader,其他成员只收到自己的信息。分区分配算法在 客户端(Group Leader 那侧)运行,不在 Broker。
// 客户端 ConsumerCoordinator 中的 JoinGroup 逻辑(简化)
private boolean joinGroupIfNeeded(Timer timer) {
// 发送 JoinGroupRequest,等待响应
JoinGroupResponse joinResponse = sendJoinGroupRequest();
if (joinResponse.isLeader()) {
// 本实例被选为 Leader,执行分配算法
Map<String, Assignment> assignments = performAssignment(
joinResponse.leaderId(),
joinResponse.members(), // 包含所有成员的 subscription
joinResponse.protocolName()
);
// 将分配结果打包进 SyncGroupRequest 发送给 Coordinator
}
return true;
}
第三步:执行分配(Leader 本地)
Group Leader 拿到所有成员的 Topic 订阅信息后,在本地运行选定的分配策略(Range、RoundRobin 或 CooperativeStickyAssignor),计算出每个成员应负责哪些分区。
这个分配计算是纯内存操作,不需要任何网络调用,因此即使 Consumer Group 有 1000 个成员,分配计算也能在毫秒内完成。
第四步:SyncGroup
Leader 将分配结果通过 SyncGroupRequest 发给 Coordinator:
Leader: SyncGroupRequest { assignments: {member1→[tp0,tp1], member2→[tp2,tp3], ...} }
Others: SyncGroupRequest { assignments: {} } // 非 Leader 发空 assignments
Coordinator 收到 Leader 的分配后,将其存储,然后通过对应每个成员的 SyncGroupResponse 分发各自的分配结果。
所有成员收到 SyncGroupResponse 后,知道自己负责哪些分区,开始 poll() 消费。
第五步:Heartbeat 心跳循环
成员加入 Group 后,在单独的后台线程(kafka-coordinator-heartbeat-thread | {groupId})中定期发送 HeartbeatRequest,间隔由 heartbeat.interval.ms 控制(默认 3000ms)。
Coordinator 在以下情况下判定成员失效:
session.timeout.ms(默认 45000ms)内没有收到心跳- Coordinator 宕机并重启(成员需重新 JoinGroup)
成员失效触发 Rebalance:Coordinator 向其余成员的下一个 HeartbeatResponse 中携带 REBALANCE_IN_PROGRESS 错误码,通知它们重新发起 JoinGroupRequest。
三种分配策略的源码级解析
RangeAssignor(范围分配)
// 核心逻辑(简化)
for (String topic : sortedTopics) {
List<String> sortedMembers = sortedMemberIds;
int numPartitions = partitionsPerTopic.get(topic);
for (int i = 0; i < sortedMembers.size(); i++) {
String memberId = sortedMembers.get(i);
int start = numPartitions / sortedMembers.size() * i
+ Math.min(i, numPartitions % sortedMembers.size());
int length = numPartitions / sortedMembers.size()
+ (i < numPartitions % sortedMembers.size() ? 1 : 0);
// 分配 [start, start+length) 范围的分区给该成员
}
}
特点:对每个 Topic 独立进行范围分割。如果有 10 个分区和 3 个成员:
- Member-1 → 分区 0,1,2,3
- Member-2 → 分区 4,5,6
- Member-3 → 分区 7,8,9
缺点:如果订阅了多个 Topic,Member-1 总是分到前几个分区,可能导致负载不均衡(特别是当不同 Topic 的分区数不同时)。
RoundRobinAssignor(轮询分配)
// 将所有 TopicPartition 按字母序排列,轮询分配给所有成员
CircularIterator<String> memberIterator = new CircularIterator<>(sortedMembers);
for (TopicPartition tp : allPartitionsSorted) {
String memberId = memberIterator.next();
// 如果该成员不订阅该 topic,跳过
while (!memberSubscribes(memberId, tp.topic())) {
memberId = memberIterator.next();
}
assignment.get(memberId).add(tp);
}
特点:所有 TopicPartition 轮询分发,负载比 Range 更均匀。
缺点:Rebalance 后分配结果往往完全不同于上次,所有成员都需要重新开始消费新分区,会造成大量重复消费(从最后提交的位移开始)。
CooperativeStickyAssignor(协作粘性分配)
这是 Kafka 2.4+ 引入的最先进的分配策略,解决了传统 Rebalance 的两个核心问题:
问题一(传统 Rebalance):所有成员在 Rebalance 期间停止消费(Stop-the-World),等待分配完成。对于有 100 个成员的大型 Consumer Group,这段暂停可能长达数秒。
问题二(传统 Rebalance):即使大部分分配不需要改变,所有分区也被重新分配,导致缓存失效和重复消费。
CooperativeStickyAssignor 的解决方案:两轮 Rebalance
第一轮(识别需要移动的分区):
- 每个成员在
JoinGroupRequest中携带自己当前持有的分区(owned partitions) - Leader 计算新的理想分配,识别哪些分区需要从一个成员移到另一个成员
- 在第一轮
SyncGroupResponse中:- 告知持有"需要移动的分区"的成员:放弃这些分区(revoke)
- 其他分区保持不变(不需要移动的分区继续消费,不停止)
第二轮(实际迁移):
- 被告知放弃分区的成员停止消费这些分区,发起第二次
JoinGroup - Leader 将这些分区分配给目标成员
关键效果:在整个 Rebalance 过程中,不需要移动的分区一直在被消费,不停机。只有真正需要迁移的分区会短暂停止消费。
# 配置使用 CooperativeStickyAssignor
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 \
--describe --group my-group # 观察 Rebalance 对消费的影响
// Java 配置
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
CooperativeStickyAssignor.class.getName());
max.poll.interval.ms:另一种超时机制
除了 session.timeout.ms(心跳超时),还有第二种踢出成员的机制:max.poll.interval.ms(默认 300000ms = 5 分钟)。
这个参数控制两次 poll() 调用之间的最大间隔时间。如果 Consumer 超过这个时间没有调用 poll(),Coordinator 会认为该成员"卡住了"并将其踢出 Group,触发 Rebalance。
心跳线程 vs poll() 超时的区别:
| 超时类型 | 参数 | 检测机制 | 典型场景 |
|---|---|---|---|
| 心跳超时 | session.timeout.ms |
心跳线程独立发送,Consumer 死亡 | 进程崩溃、网络中断 |
| 轮询超时 | max.poll.interval.ms |
Coordinator 侧计时 | 消息处理时间过长,Consumer 阻塞 |
典型问题场景:
// 危险代码:处理时间可能超过 max.poll.interval.ms
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 如果这个处理超过 max.poll.interval.ms(默认 5 分钟)
// Coordinator 会踢出本成员,触发 Rebalance
slowExternalServiceCall(record.value()); // 可能耗时 10 分钟
}
}
解决方案:
- 调大
max.poll.interval.ms(但会延迟 Coordinator 发现成员卡住的时间) - 减少
max.poll.records(每次 poll 取更少的记录,减少每轮处理时间) - 将耗时处理移到异步线程(但需要手动管理位移提交的顺序性)
Static Membership:避免重启触发 Rebalance
默认情况下,Consumer 每次重启都会获得新的 memberId,Coordinator 认为是新成员,触发 Rebalance。在 CI/CD 频繁部署的场景下,这会导致频繁的 Rebalance 和消费中断。
Kafka 2.3+ 引入 Static Membership(静态成员资格):
// 配置静态成员 ID
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "consumer-instance-1");
配置了 group.instance.id 的 Consumer 重启后,Coordinator 识别出是同一个"静态成员"重新上线,直接恢复其原来的分区分配,不触发 Rebalance。
静态成员的超时由 session.timeout.ms 控制:在超时时间内重启上来,分区立即恢复;超过超时时间,Coordinator 才触发 Rebalance 重新分配该成员的分区。
# 观察 Consumer Group 的详细状态
kafka-consumer-groups.sh \
--bootstrap-server kafka1:9092 \
--describe \
--group my-group \
--members \
--verbose
# 输出示例:
# GROUP CONSUMER-ID HOST CLIENT-ID #PARTITIONS
# my-group consumer-1-xxx /10.0.0.1 consumer-1 4
# my-group consumer-2-xxx /10.0.0.2 consumer-2 4
# my-group consumer-3-xxx /10.0.0.3 consumer-3 4
理解 Consumer Group 协议的每个细节——从 Coordinator 选择,到两轮 Rebalance,到 Generation ID 的隔离语义——是诊断生产环境中消费延迟、重复消费和 Rebalance 风暴等问题的基础。下一章我们深入 Offset 的一切:它存在哪里,怎么提交,以及为什么 commitAsync 不能自动重试。
Level 2 · 它是怎么运行的(3-5年经验)
本章的内部原理内容已整合到 Level 1 和 Level 3 中,请结合阅读。
Level 3 · 规范怎么定义的(资深)
GroupCoordinator:谁来主持协议
第一个问题:Consumer 应该找哪个 Broker 协商加入 Group?
答案由以下公式决定:
coordinator_partition = abs(hash(groupId)) % __consumer_offsets.num.partitions
__consumer_offsets topic 默认 50 个分区。GroupCoordinator 是持有 coordinator_partition 这个分区的 Leader 的 Broker。
Consumer 通过以下步骤找到 Coordinator:
# 1. 发送 FindCoordinator 请求给任意 Broker
# Request: key=groupId, keyType=GROUP
# Response: 返回 coordinator 的 nodeId, host, port
# 2. 之后所有 Group 管理请求都发给这个 Coordinator
为什么是 __consumer_offsets 的分区 Leader?
这个设计将 GroupCoordinator 功能和位移存储功能绑在同一个 Broker,使得 Coordinator 可以直接在本地写入和读取位移数据,避免跨 Broker 的远程调用,同时 Leader 切换时两个功能一起迁移,保持一致性。
Level 4 · 边界与陷阱(所有人)
Generation ID:隔离过时请求
每次 Rebalance 后,Coordinator 将 Generation ID 加 1。所有成员在各种请求中携带当前的 Generation ID。
如果 Coordinator 收到旧 Generation 的请求(例如一个响应慢的成员发来的位移提交),会直接拒绝,返回 ILLEGAL_GENERATION 错误。这防止了"过时成员"用旧分配的位移污染新 Generation 的位移记录。
// Consumer 收到 ILLEGAL_GENERATION 后的处理
if (error == Errors.ILLEGAL_GENERATION) {
// 当前 Generation 已过时,触发重新加入 Group
coordinator.requestRejoin("illegal generation");
}