第 9 章

Consumer Group 协议:加入、同步与心跳

第9章:Consumer Group 协议:加入、同步与心跳

导读:Consumer Group 的 Rebalance 协议是如何工作的?

本章核心问题:Consumer Group 的 Rebalance 协议是如何工作的?

读完本章你将理解


Level 1 · 你需要知道的(1-3年经验)

Consumer Group 是 Kafka 最精妙的设计之一:多个消费者实例共同消费一个或多个 Topic,分区在组成员之间自动分配,成员的加入和离开触发重新平衡(Rebalance)。这一切看起来自动发生,但背后是一套精确定义的分布式协议——理解它,你才能知道为什么 Rebalance 会打断消费,为什么心跳超时会踢出成员,以及为什么 CooperativeStickyAssignor 可以让 Rebalance 不停止消费。

Rebalance 协议:五步舞蹈

Consumer Group 的 Rebalance 由以下五个步骤组成,每一步都是显式的网络请求:

第一步:FindCoordinator

Consumer 启动或 Coordinator 变更时,发送 FindCoordinatorRequest,获得 Coordinator 的地址。这是所有后续通信的前提。

第二步:JoinGroup

所有组成员向 Coordinator 发送 JoinGroupRequest,携带:

Coordinator 收到第一个 JoinGroupRequest 后等待 group.initial.rebalance.delay.ms(默认 3 秒)以收集所有成员的请求,然后:

  1. 选择所有成员都支持的分配策略(投票机制:多数派策略胜出)
  2. 从所有成员中选择一个作为 Group Leader(通常是第一个加入的成员)
  3. 为每个成员分配唯一的 memberId
  4. 向所有成员返回 JoinGroupResponse

关键设计:Coordinator 只将完整成员信息(包括每个成员的 Subscription)发给 Group Leader,其他成员只收到自己的信息。分区分配算法在 客户端(Group Leader 那侧)运行,不在 Broker。

// 客户端 ConsumerCoordinator 中的 JoinGroup 逻辑(简化)
private boolean joinGroupIfNeeded(Timer timer) {
    // 发送 JoinGroupRequest,等待响应
    JoinGroupResponse joinResponse = sendJoinGroupRequest();
    
    if (joinResponse.isLeader()) {
        // 本实例被选为 Leader,执行分配算法
        Map<String, Assignment> assignments = performAssignment(
            joinResponse.leaderId(),
            joinResponse.members(),  // 包含所有成员的 subscription
            joinResponse.protocolName()
        );
        // 将分配结果打包进 SyncGroupRequest 发送给 Coordinator
    }
    return true;
}

第三步:执行分配(Leader 本地)

Group Leader 拿到所有成员的 Topic 订阅信息后,在本地运行选定的分配策略(Range、RoundRobin 或 CooperativeStickyAssignor),计算出每个成员应负责哪些分区。

这个分配计算是纯内存操作,不需要任何网络调用,因此即使 Consumer Group 有 1000 个成员,分配计算也能在毫秒内完成。

第四步:SyncGroup

Leader 将分配结果通过 SyncGroupRequest 发给 Coordinator:

Leader:    SyncGroupRequest { assignments: {member1→[tp0,tp1], member2→[tp2,tp3], ...} }
Others:    SyncGroupRequest { assignments: {} }  // 非 Leader 发空 assignments

Coordinator 收到 Leader 的分配后,将其存储,然后通过对应每个成员的 SyncGroupResponse 分发各自的分配结果。

所有成员收到 SyncGroupResponse 后,知道自己负责哪些分区,开始 poll() 消费。

第五步:Heartbeat 心跳循环

成员加入 Group 后,在单独的后台线程(kafka-coordinator-heartbeat-thread | {groupId})中定期发送 HeartbeatRequest,间隔由 heartbeat.interval.ms 控制(默认 3000ms)。

Coordinator 在以下情况下判定成员失效:

成员失效触发 Rebalance:Coordinator 向其余成员的下一个 HeartbeatResponse 中携带 REBALANCE_IN_PROGRESS 错误码,通知它们重新发起 JoinGroupRequest

三种分配策略的源码级解析

RangeAssignor(范围分配)

// 核心逻辑(简化)
for (String topic : sortedTopics) {
    List<String> sortedMembers = sortedMemberIds;
    int numPartitions = partitionsPerTopic.get(topic);
    
    for (int i = 0; i < sortedMembers.size(); i++) {
        String memberId = sortedMembers.get(i);
        int start = numPartitions / sortedMembers.size() * i 
                  + Math.min(i, numPartitions % sortedMembers.size());
        int length = numPartitions / sortedMembers.size() 
                   + (i < numPartitions % sortedMembers.size() ? 1 : 0);
        // 分配 [start, start+length) 范围的分区给该成员
    }
}

特点:对每个 Topic 独立进行范围分割。如果有 10 个分区和 3 个成员:

缺点:如果订阅了多个 Topic,Member-1 总是分到前几个分区,可能导致负载不均衡(特别是当不同 Topic 的分区数不同时)。

RoundRobinAssignor(轮询分配)

// 将所有 TopicPartition 按字母序排列,轮询分配给所有成员
CircularIterator<String> memberIterator = new CircularIterator<>(sortedMembers);
for (TopicPartition tp : allPartitionsSorted) {
    String memberId = memberIterator.next();
    // 如果该成员不订阅该 topic,跳过
    while (!memberSubscribes(memberId, tp.topic())) {
        memberId = memberIterator.next();
    }
    assignment.get(memberId).add(tp);
}

特点:所有 TopicPartition 轮询分发,负载比 Range 更均匀。

缺点:Rebalance 后分配结果往往完全不同于上次,所有成员都需要重新开始消费新分区,会造成大量重复消费(从最后提交的位移开始)。

CooperativeStickyAssignor(协作粘性分配)

这是 Kafka 2.4+ 引入的最先进的分配策略,解决了传统 Rebalance 的两个核心问题:

问题一(传统 Rebalance):所有成员在 Rebalance 期间停止消费(Stop-the-World),等待分配完成。对于有 100 个成员的大型 Consumer Group,这段暂停可能长达数秒。

问题二(传统 Rebalance):即使大部分分配不需要改变,所有分区也被重新分配,导致缓存失效和重复消费。

CooperativeStickyAssignor 的解决方案:两轮 Rebalance

第一轮(识别需要移动的分区)

  1. 每个成员在 JoinGroupRequest 中携带自己当前持有的分区(owned partitions)
  2. Leader 计算新的理想分配,识别哪些分区需要从一个成员移到另一个成员
  3. 在第一轮 SyncGroupResponse 中:
    • 告知持有"需要移动的分区"的成员:放弃这些分区(revoke)
    • 其他分区保持不变(不需要移动的分区继续消费,不停止

第二轮(实际迁移)

  1. 被告知放弃分区的成员停止消费这些分区,发起第二次 JoinGroup
  2. Leader 将这些分区分配给目标成员

关键效果:在整个 Rebalance 过程中,不需要移动的分区一直在被消费,不停机。只有真正需要迁移的分区会短暂停止消费。

# 配置使用 CooperativeStickyAssignor
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 \
  --describe --group my-group  # 观察 Rebalance 对消费的影响
// Java 配置
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
    CooperativeStickyAssignor.class.getName());

max.poll.interval.ms:另一种超时机制

除了 session.timeout.ms(心跳超时),还有第二种踢出成员的机制:max.poll.interval.ms(默认 300000ms = 5 分钟)。

这个参数控制两次 poll() 调用之间的最大间隔时间。如果 Consumer 超过这个时间没有调用 poll(),Coordinator 会认为该成员"卡住了"并将其踢出 Group,触发 Rebalance。

心跳线程 vs poll() 超时的区别

超时类型 参数 检测机制 典型场景
心跳超时 session.timeout.ms 心跳线程独立发送,Consumer 死亡 进程崩溃、网络中断
轮询超时 max.poll.interval.ms Coordinator 侧计时 消息处理时间过长,Consumer 阻塞

典型问题场景

// 危险代码:处理时间可能超过 max.poll.interval.ms
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // 如果这个处理超过 max.poll.interval.ms(默认 5 分钟)
        // Coordinator 会踢出本成员,触发 Rebalance
        slowExternalServiceCall(record.value()); // 可能耗时 10 分钟
    }
}

解决方案

  1. 调大 max.poll.interval.ms(但会延迟 Coordinator 发现成员卡住的时间)
  2. 减少 max.poll.records(每次 poll 取更少的记录,减少每轮处理时间)
  3. 将耗时处理移到异步线程(但需要手动管理位移提交的顺序性)

Static Membership:避免重启触发 Rebalance

默认情况下,Consumer 每次重启都会获得新的 memberId,Coordinator 认为是新成员,触发 Rebalance。在 CI/CD 频繁部署的场景下,这会导致频繁的 Rebalance 和消费中断。

Kafka 2.3+ 引入 Static Membership(静态成员资格):

// 配置静态成员 ID
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "consumer-instance-1");

配置了 group.instance.id 的 Consumer 重启后,Coordinator 识别出是同一个"静态成员"重新上线,直接恢复其原来的分区分配,不触发 Rebalance

静态成员的超时由 session.timeout.ms 控制:在超时时间内重启上来,分区立即恢复;超过超时时间,Coordinator 才触发 Rebalance 重新分配该成员的分区。

# 观察 Consumer Group 的详细状态
kafka-consumer-groups.sh \
  --bootstrap-server kafka1:9092 \
  --describe \
  --group my-group \
  --members \
  --verbose

# 输出示例:
# GROUP       CONSUMER-ID               HOST          CLIENT-ID  #PARTITIONS
# my-group    consumer-1-xxx            /10.0.0.1     consumer-1  4
# my-group    consumer-2-xxx            /10.0.0.2     consumer-2  4
# my-group    consumer-3-xxx            /10.0.0.3     consumer-3  4

理解 Consumer Group 协议的每个细节——从 Coordinator 选择,到两轮 Rebalance,到 Generation ID 的隔离语义——是诊断生产环境中消费延迟、重复消费和 Rebalance 风暴等问题的基础。下一章我们深入 Offset 的一切:它存在哪里,怎么提交,以及为什么 commitAsync 不能自动重试。


Level 2 · 它是怎么运行的(3-5年经验)

本章的内部原理内容已整合到 Level 1 和 Level 3 中,请结合阅读。


Level 3 · 规范怎么定义的(资深)

GroupCoordinator:谁来主持协议

第一个问题:Consumer 应该找哪个 Broker 协商加入 Group?

答案由以下公式决定:

coordinator_partition = abs(hash(groupId)) % __consumer_offsets.num.partitions

__consumer_offsets topic 默认 50 个分区。GroupCoordinator 是持有 coordinator_partition 这个分区的 Leader 的 Broker。

Consumer 通过以下步骤找到 Coordinator:

# 1. 发送 FindCoordinator 请求给任意 Broker
# Request: key=groupId, keyType=GROUP
# Response: 返回 coordinator 的 nodeId, host, port

# 2. 之后所有 Group 管理请求都发给这个 Coordinator

为什么是 __consumer_offsets 的分区 Leader?

这个设计将 GroupCoordinator 功能和位移存储功能绑在同一个 Broker,使得 Coordinator 可以直接在本地写入和读取位移数据,避免跨 Broker 的远程调用,同时 Leader 切换时两个功能一起迁移,保持一致性。


Level 4 · 边界与陷阱(所有人)

Generation ID:隔离过时请求

每次 Rebalance 后,Coordinator 将 Generation ID 加 1。所有成员在各种请求中携带当前的 Generation ID。

如果 Coordinator 收到旧 Generation 的请求(例如一个响应慢的成员发来的位移提交),会直接拒绝,返回 ILLEGAL_GENERATION 错误。这防止了"过时成员"用旧分配的位移污染新 Generation 的位移记录。

// Consumer 收到 ILLEGAL_GENERATION 后的处理
if (error == Errors.ILLEGAL_GENERATION) {
    // 当前 Generation 已过时,触发重新加入 Group
    coordinator.requestRejoin("illegal generation");
}
本章评分
4.7  / 5  (40 评分)

💬 留言讨论