Chapter 21: Group Coordinator Source Code: The Full Rebalance Flow
Core question of this chapter: how does the Group Coordinator source code implement the full Rebalance flow?
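After reading this chapter you will understand:
- GroupCoordinator initialization and member management
- How the source handles JoinGroup and SyncGroup
- The fencing semantics of the Generation ID
- The heartbeat thread and timeout detection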
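Level 1 · What you need to know (1-3 years of experience)
The essence of a Consumer Group
A Consumer Group is the core mechanism behind Kafka's subscription semantics: the Consumers in one Group jointly consume all Partitions of a Topic, and each Partition is consumed by exactly one Consumer in the Group. When the set of Consumers or the number of Topic Partitions changes, the Partitions must be reassigned. This process is called a Rebalance.
Rebalance is one of the most complex protocols in Kafka. It is driven by a carefully designed sequence of state transitions and requires cooperation between the GroupCoordinator (Broker side) and the ConsumerCoordinator (Client side). This chapter walks through the source of GroupCoordinator.scala and traces the complete lifecycle of a Rebalance.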
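The "each Partition has exactly one owner in the Group" rule can be illustrated with a minimal sketch. The `assign` helper below is hypothetical and uses a plain round-robin rule; it is not Kafka's assignor implementation, only a way to see how the assignment changes when a Consumer joins.

```scala
object AssignmentSketch {
  // Hypothetical round-robin helper (not Kafka's RoundRobinAssignor):
  // the i-th partition goes to consumer number (i mod n).
  def assign(consumers: Seq[String], partitions: Seq[Int]): Map[String, Seq[Int]] = {
    require(consumers.nonEmpty, "a group needs at least one consumer")
    val members = consumers.sorted
    // Pair every partition with exactly one owner.
    val owners: Seq[(String, Int)] =
      partitions.sorted.zipWithIndex.map { case (p, i) => (members(i % members.size), p) }
    // Group the pairs by owner; every consumer gets an entry, possibly empty.
    members.map(m => m -> owners.filter(_._1 == m).map(_._2)).toMap
  }
}
```

With Consumers A and B and Partitions 0 to 5, A owns 0, 2, 4 and B owns 1, 3, 5. Adding a Consumer C yields A: 0, 3; B: 1, 4; C: 2, 5. Most Partitions change owner, which is why a membership change forces the Group through a Rebalance.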
handleHeartbeat(): keeping the session alive
// GroupCoordinator.scala (simplified)
def handleHeartbeat(
  groupId: String,
  memberId: String,
  groupInstanceId: Option[String],
  generationId: Int,
  responseCallback: Errors => Unit
): Unit = {
  groupManager.getGroup(groupId) match {
    case None => responseCallback(Errors.UNKNOWN_MEMBER_ID)
    case Some(group) =>
      group.inLock {
        group.currentState match {
          case Dead =>
            responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
          case Empty =>
            responseCallback(Errors.UNKNOWN_MEMBER_ID)
          case CompletingRebalance =>
            // Heartbeat received while the Leader's assignment is still pending:
            // return REBALANCE_IN_PROGRESS, which makes the Consumer rejoin with a new JoinGroupRequest.
            // (This mirrors older brokers; newer versions treat it as a normal heartbeat and return NONE.)
            responseCallback(Errors.REBALANCE_IN_PROGRESS)
          case PreparingRebalance =>
            group.get(memberId) match {
              case None => responseCallback(Errors.UNKNOWN_MEMBER_ID)
              case Some(member) =>
                // Refresh the heartbeat timer, but tell the member that a Rebalance is under way
                completeAndScheduleNextHeartbeatExpiration(group, member)
                responseCallback(Errors.REBALANCE_IN_PROGRESS)
            }
          case Stable =>
            group.get(memberId) match {
              case None => responseCallback(Errors.UNKNOWN_MEMBER_ID)
              case Some(member) =>
                if (generationId != group.generationId) {
                  // Request comes from a stale generation
                  responseCallback(Errors.ILLEGAL_GENERATION)
                } else {
                  // Normal heartbeat: refresh the session timer
                  completeAndScheduleNextHeartbeatExpiration(group, member)
                  responseCallback(Errors.NONE)
                }
            }
        }
      }
  }
}
Handling heartbeat timeouts (session.timeout.ms):
// GroupCoordinator.scala (simplified)
private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata): Unit = {
  // Cancel the current expiration timer and schedule the next one
  completeAndScheduleNextExpiration(group, member, MemberKey(group.groupId, member.memberId))
}
// Fired when the timer expires
def onMemberHeartbeatExpired(group: GroupMetadata, member: MemberMetadata): Unit = {
  group.inLock {
    if (member.sessionTimeoutMs <= (time.milliseconds - member.latestHeartbeat)) {
      // The member's session has timed out: remove it from the Group and trigger a Rebalance
      info(s"Member ${member.memberId} in group ${group.groupId} has failed, " +
        s"removing it from the group")
      removeMemberAndUpdateGroup(group, member, s"removing member ${member.memberId} on heartbeat expiration")
    }
  }
}
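The expiration condition above boils down to a timestamp comparison. The following is a minimal, self-contained sketch of that check; `Member`, `isExpired`, `heartbeat` and `expired` are illustrative names, not Kafka classes, and time is passed in explicitly so the logic is easy to test.

```scala
object HeartbeatSketch {
  // Illustrative stand-in for the fields of MemberMetadata used by the check.
  final case class Member(id: String, sessionTimeoutMs: Long, latestHeartbeatMs: Long)

  // A member is expired once a full session timeout has elapsed since its last heartbeat.
  def isExpired(m: Member, nowMs: Long): Boolean =
    nowMs - m.latestHeartbeatMs >= m.sessionTimeoutMs

  // A successful heartbeat simply records a new timestamp.
  def heartbeat(m: Member, nowMs: Long): Member = m.copy(latestHeartbeatMs = nowMs)

  // Ids of the members the coordinator would remove at time nowMs.
  def expired(members: Seq[Member], nowMs: Long): Seq[String] =
    members.filter(isExpired(_, nowMs)).map(_.id)
}
```

For a member with a 10 000 ms session timeout whose last heartbeat was at t = 0, `isExpired` is false at t = 9 999 and true at t = 10 000, matching the `<=` comparison in the listing.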
handleLeaveGroup(): leaving voluntarily
// GroupCoordinator.scala (simplified)
def handleLeaveGroup(
  groupId: String,
  memberIdentities: List[MemberIdentity],
  responseCallback: LeaveGroupResult => Unit
): Unit = {
  groupManager.getGroup(groupId) match {
    case None => responseCallback(LeaveGroupResult(Errors.UNKNOWN_MEMBER_ID, ...))
    case Some(group) =>
      group.inLock {
        val memberErrors = memberIdentities.map { identity =>
          group.get(identity.memberId) match {
            case None => MemberResponse(identity.memberId, Errors.UNKNOWN_MEMBER_ID)
            case Some(member) =>
              // Remove the member from the Group and trigger a Rebalance
              removeMemberAndUpdateGroup(group, member, s"LeaveGroup received from member ${identity.memberId}")
              MemberResponse(identity.memberId, Errors.NONE)
          }
        }
        responseCallback(LeaveGroupResult(Errors.NONE, memberErrors))
      }
  }
}
Offset management: handleOffsetCommit() and handleOffsetFetch()
A Consumer Group commits its consumption progress (Offsets) to the __consumer_offsets Topic; the Broker that hosts the GroupCoordinator handles the reads and writes:
// GroupCoordinator.scala (simplified)
def handleOffsetCommit(
  groupId: String,
  generationId: Int,
  memberId: String,
  groupInstanceId: Option[String],
  offsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata],
  responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit,
  requestLocal: RequestLocal
): Unit = {
  groupManager.getGroup(groupId) match {
    case Some(group) =>
      group.inLock {
        // Validate the Generation (prevents commits from a stale generation)
        if (generationId != group.generationId && generationId != OffsetCommitRequest.DEFAULT_GENERATION_ID) {
          responseCallback(offsetMetadata.map { case (tp, _) => tp -> Errors.ILLEGAL_GENERATION })
          return
        }
        // Write the Offsets to __consumer_offsets (appended to the log through ReplicaManager)
        groupManager.storeOffsets(group, memberId, groupInstanceId, offsetMetadata, responseCallback, requestLocal)
      }
    case None =>
      // Group not found (possibly a group used only for Offset storage, with no Rebalance involved)
      groupManager.storeOffsets(GroupMetadata.loadingGroup(groupId), memberId, None, offsetMetadata, responseCallback, requestLocal)
  }
}
// GroupCoordinator.scala (simplified)
def handleOffsetFetch(
  groupId: String,
  partitions: Option[Seq[TopicPartition]], // None = fetch all Offsets
  requireStable: Boolean,
  responseCallback: Map[TopicPartition, OffsetFetchResponse.PartitionData] => Unit
): Unit = {
  groupManager.getGroup(groupId) match {
    case None =>
      // Try to load from __consumer_offsets (after a coordinator restart the Group may not be back in memory yet)
      val groupOffsets = groupManager.getOffsets(groupId, partitions)
      responseCallback(groupOffsets)
    case Some(group) =>
      group.inLock {
        if (group.is(Dead)) {
          responseCallback(partitions.getOrElse(Nil).map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap)
        } else {
          // Served from the in-memory cache (the offsets cache maintained by GroupMetadataManager)
          val offsets = groupManager.getOffsets(groupId, partitions)
          responseCallback(offsets)
        }
      }
  }
}
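The Generation check in handleOffsetCommit() above is a form of fencing: a commit is accepted only if it carries the Group's current generation or the "no generation" sentinel. Here is a minimal sketch of that rule in isolation. `CommitResult` and `check` are hypothetical names, and the sentinel value -1 is an assumption standing in for `OffsetCommitRequest.DEFAULT_GENERATION_ID`.

```scala
object GenerationFenceSketch {
  sealed trait CommitResult
  case object Accepted extends CommitResult
  case object IllegalGeneration extends CommitResult

  // Assumed stand-in for OffsetCommitRequest.DEFAULT_GENERATION_ID.
  val DefaultGenerationId: Int = -1

  // Accept the commit only for the current generation or the sentinel.
  def check(groupGeneration: Int, requestGeneration: Int): CommitResult =
    if (requestGeneration == groupGeneration || requestGeneration == DefaultGenerationId) Accepted
    else IllegalGeneration
}
```

A Consumer that was removed during generation 4 and still tries to commit after the Group has moved to generation 5 gets `IllegalGeneration`, so it cannot overwrite the progress of the Partition's new owner.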
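Data layout of __consumer_offsets
When an Offset commit is written to __consumer_offsets, both the key and the value are binary-encoded:
Key:   [GroupId][TopicName][PartitionId] (version number followed by the fields)
Value: [Offset][LeaderEpoch][Metadata][CommitTimestamp][ExpireTimestamp]
When the Coordinator starts (or when it becomes the Coordinator for the Group), GroupMetadataManager loads the existing Offset records from __consumer_offsets into an in-memory cache. Later OffsetFetch requests are answered from memory without reading from disk.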
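Loading the cache amounts to replaying the log in order and keeping the last value per key. The sketch below shows that idea with simplified types; `OffsetKey`, `OffsetValue` and `load` are illustrative, the value carries only some of the fields listed above, and the treatment of an empty value as a delete marker (tombstone) is an assumption about how a compacted topic is replayed.

```scala
object OffsetCacheSketch {
  // Simplified stand-ins for the binary-encoded key and value.
  final case class OffsetKey(group: String, topic: String, partition: Int)
  final case class OffsetValue(offset: Long, metadata: String, commitTimestampMs: Long)

  // Replay records in log order: a later record for the same key overwrites
  // the earlier one; a record with no value (assumed tombstone) removes the key.
  def load(records: Seq[(OffsetKey, Option[OffsetValue])]): Map[OffsetKey, OffsetValue] =
    records.foldLeft(Map.empty[OffsetKey, OffsetValue]) {
      case (cache, (key, Some(value))) => cache.updated(key, value)
      case (cache, (key, None))        => cache - key
    }
}
```

After the replay, a fetch is a plain map lookup, which is why it needs no disk access.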
Full Rebalance sequence diagram
Consumer A          Consumer B          GroupCoordinator
    │                   │                       │
    │──JoinGroup───────────────────────────────→│  (triggers Rebalance; state → PreparingRebalance)
    │                   │──JoinGroup───────────→│  (member registered; waiting for the other members)
    │                   │                       │
    │←──JoinGroupResponse(Leader)───────────────│  (all members have joined; state → CompletingRebalance)
    │                   │←──JoinGroupResponse───│  (Follower receives an empty member list)
    │                   │                       │
    │ [runs assignment] │                       │
    │──SyncGroup(assignment)───────────────────→│  (assignment is persisted)
    │                   │──SyncGroup───────────→│  (waits for the Leader's assignment)
    │                   │                       │
    │←──SyncGroupResponse(partitions for A)─────│  (state → Stable)
    │                   │←──SyncGroupResponse───│  (Consumer B receives its own assignment)
    │                   │                       │
    │──Heartbeat───────────────────────────────→│
    │←──Heartbeat OK────────────────────────────│
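Once this state machine is clear, it explains why, under the default eager protocol, the entire Consumer Group stops consuming during a Rebalance: every Consumer is waiting for its JoinGroup and SyncGroup responses, and both responses require all members of the Group to take part. This is also the motivation for the CooperativeStickyAssignor (incremental cooperative Rebalance, KIP-429) introduced in Kafka 2.4: only the Partitions that actually need to move are reassigned, so Consumers whose assignment is unchanged can keep consuming.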
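The difference between the two approaches can be sketched as a set computation over the old and new assignment. The helpers below are hypothetical and model only which Partitions each member has to give up; they are not the client's actual rebalance listener logic.

```scala
object RevocationSketch {
  type Assignment = Map[String, Set[Int]]

  // Eager: every member gives up everything it owns before rejoining.
  def eagerRevoked(old: Assignment): Assignment = old

  // Cooperative: a member gives up only the partitions it owned
  // that the new assignment hands to someone else.
  def cooperativeRevoked(old: Assignment, next: Assignment): Assignment = {
    val diff = old.map { case (member, owned) =>
      member -> (owned -- next.getOrElse(member, Set.empty[Int]))
    }
    diff.filter { case (_, revoked) => revoked.nonEmpty }
  }
}
```

If A owns {0, 1, 2} and B owns {3, 4, 5}, and a new member C is to receive {2, 5}, the cooperative variant revokes only Partition 2 from A and Partition 5 from B. Partitions 0, 1, 3 and 4 are never paused.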
Level 2 · How it works (3-5 years of experience)
GroupMetadata: the in-memory state of a Group
Every Consumer Group has a corresponding GroupMetadata object on its Coordinator Broker:
// GroupMetadata.scala (core/src/main/scala/kafka/coordinator/group/, simplified)
class GroupMetadata(val groupId: String, initialState: GroupState, time: Time) {
  // Current state of the Group (state machine)
  private var state: GroupState = initialState
  // All active members of the Group
  private val members = new mutable.HashMap[String, MemberMetadata] // memberId → MemberMetadata
  // Current generation (incremented on every Rebalance; used to detect and reject stale requests)
  var generationId: Int = 0
  // Partition assignment currently handed to each member (written during the SyncGroup phase)
  private var assignment: Map[String, Array[Byte]] = Map.empty // memberId → serialized partition list
  // Id of the Leader member (the member that runs the partition assignment algorithm)
  var leaderId: Option[String] = None
  // Set when a new member joins during the initial Rebalance delay (read by InitialDelayedJoin)
  var newMemberAdded: Boolean = false
}
The GroupState enumeration
A Group can be in one of the following five states:
// GroupState.scala (simplified)
sealed trait GroupState
case object Empty extends GroupState               // Group exists but has no active members
case object PreparingRebalance extends GroupState  // waiting for all members to send JoinGroup again
case object CompletingRebalance extends GroupState // waiting for the Leader to submit the assignment (SyncGroup)
case object Stable extends GroupState              // running normally, partitions assigned
case object Dead extends GroupState                // Group has been deleted
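One way to make the state machine explicit is a table of allowed predecessor states per target state. The sketch below encodes the transitions described in this chapter; the entries for `Empty` and `Dead` as targets are assumptions added to make the table complete, and the object and method names are illustrative.

```scala
object GroupStateSketch {
  sealed trait State
  case object Empty extends State
  case object PreparingRebalance extends State
  case object CompletingRebalance extends State
  case object Stable extends State
  case object Dead extends State

  // For each target state, the states it may be entered from.
  val validPrevious: Map[State, Set[State]] = Map(
    PreparingRebalance  -> Set[State](Stable, CompletingRebalance, Empty),
    CompletingRebalance -> Set[State](PreparingRebalance),
    Stable              -> Set[State](CompletingRebalance),
    Empty               -> Set[State](PreparingRebalance),                                   // assumed
    Dead                -> Set[State](Stable, PreparingRebalance, CompletingRebalance, Empty, Dead) // assumed
  )

  def canTransition(from: State, to: State): Boolean = validPrevious(to).contains(from)
}
```

The table makes one property easy to see: `Stable` can only be reached through `CompletingRebalance`, so there is no path to a stable assignment that skips the SyncGroup phase.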
MemberMetadata: the state of a single member
// MemberMetadata.scala (simplified)
class MemberMetadata(
  val memberId: String,          // unique id assigned by the Coordinator
  val groupId: String,
  val clientId: String,          // the client.id setting
  val clientHost: String,        // host/IP address of the Consumer
  val rebalanceTimeoutMs: Int,   // max.poll.interval.ms
  val sessionTimeoutMs: Int,     // session.timeout.ms
  val protocolType: String,      // "consumer"
  var supportedProtocols: List[(String, Array[Byte])] // supported assignment strategies and subscription data
) {
  // Partitions currently assigned to this Consumer (serialized bytes)
  var assignment: Array[Byte] = Array.empty
  // Pending JoinGroup response callback; non-null while the member waits for the join to complete
  var awaitingJoinCallback: JoinGroupResult => Unit = null
  // Pending SyncGroup response callback; non-null while the member waits for the assignment
  var awaitingSyncCallback: SyncGroupResult => Unit = null
  // Session heartbeat bookkeeping: the member is removed from the Group on timeout
  var heartbeatSatisfied: Boolean = false
  var latestHeartbeat: Long = -1
}
handleJoinGroup(): the trigger for a Rebalance
After a Consumer calls subscribe(), its internal ConsumerCoordinator sends a JoinGroupRequest to the Broker on the next poll(). This is where a Rebalance starts:
// GroupCoordinator.scala (core/, simplified)
def handleJoinGroup(
  groupId: String,
  memberId: String,                 // empty string on the first join
  groupInstanceId: Option[String],  // static member id (KIP-345)
  requireKnownMemberId: Boolean,
  clientId: String,
  clientHost: String,
  rebalanceTimeoutMs: Int,
  sessionTimeoutMs: Int,
  protocolType: String,
  protocols: List[(String, Array[Byte])],
  responseCallback: JoinGroupResult => Unit,
  requestLocal: RequestLocal
): Unit = {
  groupManager.getOrMaybeCreateGroup(groupId, protocolType) match {
    case None =>
      // Group does not exist and may not be created: return an error
      responseCallback(JoinGroupResult(memberId, error = Errors.UNKNOWN_MEMBER_ID))
    case Some(group) =>
      group.inLock {
        // Validate the sessionTimeoutMs range
        if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs ||
            sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) {
          responseCallback(JoinGroupResult(memberId, error = Errors.INVALID_SESSION_TIMEOUT))
          return
        }
        group.currentState match {
          case Dead =>
            // Group has been deleted; the Consumer has to find the coordinator and join again
            responseCallback(JoinGroupResult(memberId, error = Errors.COORDINATOR_NOT_AVAILABLE))
          case Empty =>
            // Group is empty: the first member joins and triggers a Rebalance
            addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, memberId, groupInstanceId,
              clientId, clientHost, protocolType, protocols, group, responseCallback)
          case PreparingRebalance =>
            // Rebalance already in progress: join it directly
            if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
              // New member: assign a memberId
              addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, memberId, groupInstanceId,
                clientId, clientHost, protocolType, protocols, group, responseCallback)
            } else {
              // Known member rejoining during the Rebalance
              updateMemberAndRebalance(group, memberId, protocols, responseCallback)
            }
          case CompletingRebalance =>
            // Waiting for SyncGroup: a new member joining has to trigger another Rebalance
            if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
              addMemberAndRebalance(...)
            } else {
              // Known member rejoining: abort the current SyncGroup phase and restart the Rebalance
              prepareRebalance(group, s"Rebalance triggered by member join: $memberId")
              updateMemberAndRebalance(group, memberId, protocols, responseCallback)
            }
          case Stable =>
            // Group is running normally
            group.get(memberId) match {
              case None =>
                // New member joins a Stable Group: trigger a Rebalance
                addMemberAndRebalance(...)
              case Some(existingMember) =>
                if (memberId == group.leaderId.orNull || !existingMember.matchesProtocols(protocols)) {
                  // The Leader rejoined, or the member's subscription protocols changed: trigger a Rebalance
                  updateMemberAndRebalance(group, memberId, protocols, responseCallback)
                } else {
                  // Follower rejoining with unchanged metadata:
                  // return the current Generation without triggering a Rebalance
                  responseCallback(JoinGroupResult.forStableGroup(group, existingMember))
                }
            }
        }
      }
  }
}
prepareRebalance(): entering the PreparingRebalance state
// GroupCoordinator.scala (simplified)
private def prepareRebalance(group: GroupMetadata, reason: String): Unit = {
  // Fail any SyncGroup responses that are still pending (if the Group is in CompletingRebalance)
  if (group.is(CompletingRebalance)) {
    resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)
  }
  // Pick the delayed operation: an Empty Group gets the initial-delay variant
  val delayedRebalance = if (group.is(Empty))
    new InitialDelayedJoin(this, joinPurgatory, group, groupConfig, ...)
  else
    new DelayedJoin(this, group, group.rebalanceTimeoutMs)
  // Move the Group to PreparingRebalance
  group.transitionTo(PreparingRebalance)
  info(s"Preparing to rebalance group ${group.groupId} in state ${group.currentState} " +
    s"with old generation ${group.generationId} (${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)}) " +
    s"(reason: $reason)")
  // Put the DelayedJoin into the Purgatory and wait until all members have joined or the timeout fires
  val groupKey = GroupJoinKey(group.groupId)
  joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
}
DelayedJoin: waiting for all members
// DelayedJoin.scala (simplified)
class DelayedJoin(
  coordinator: GroupCoordinator,
  group: GroupMetadata,
  rebalanceTimeout: Long
) extends DelayedOperation(rebalanceTimeout) {
  // Completion condition: every known member has sent a new JoinGroupRequest
  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
  override def onExpiration(): Unit = coordinator.onExpireJoin()
  override def onComplete(): Unit = coordinator.onCompleteJoin(group)
}
// GroupCoordinator.scala (simplified)
def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean): Boolean = {
  group.inLock {
    // All known members (those of the previous generation that did not leave explicitly) have sent JoinGroup
    if (group.hasAllMembersJoined) forceComplete()
    else false
  }
}
// Called once all members have joined (or the timeout has fired)
def onCompleteJoin(group: GroupMetadata): Unit = {
  group.inLock {
    // Remove dynamic members that did not rejoin before the timeout
    val notYetRejoinedDynamicMembers = group.notYetRejoinedDynamicMembers
    if (notYetRejoinedDynamicMembers.nonEmpty) {
      notYetRejoinedDynamicMembers.values.foreach { member =>
        group.remove(member.memberId)
        removeHeartbeatForLeavingMember(group, member.memberId)
      }
    }
    // Nothing left to do if the Group was deleted in the meantime
    if (group.is(Dead)) return
    // Advance the Generation ID
    group.initNextGeneration() // generationId++, clears the assignment
    // Send a JoinGroupResponse to every member; only the Leader receives the full member list
    group.allMembers.foreach { member =>
      val joinResult = JoinGroupResult(
        members = if (member.memberId == group.leaderId.orNull) {
          // Leader: subscription data of all members (input for the partition assignment)
          group.currentMemberMetadata
        } else {
          List.empty // Followers only receive an empty list
        },
        memberId = member.memberId,
        generationId = group.generationId,
        protocolType = group.protocolType,
        protocolName = group.protocolName,
        leaderId = group.leaderId.orNull,
        skipAssignment = false,
        error = Errors.NONE
      )
      member.awaitingJoinCallback(joinResult)
      member.awaitingJoinCallback = null
    }
    // The Group enters CompletingRebalance and waits for the Leader to submit the assignment
    group.transitionTo(CompletingRebalance)
  }
}
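The join phase behaves like a barrier: it completes early when every known member has rejoined, and on timeout it proceeds with whoever did rejoin. The sketch below models that with an immutable value; `Barrier` and `complete` are hypothetical names and the model ignores static members and the purgatory machinery.

```scala
object JoinBarrierSketch {
  // known: members expected to rejoin; joined: members that already sent JoinGroup.
  final case class Barrier(known: Set[String], joined: Set[String] = Set.empty) {
    // A join from a new member also makes it a known member.
    def join(memberId: String): Barrier =
      copy(known = known + memberId, joined = joined + memberId)

    // Early-completion condition checked on every join.
    def hasAllMembersJoined: Boolean = known.subsetOf(joined)

    // Members that would be removed if the timeout fired now.
    def notYetRejoined: Set[String] = known -- joined
  }

  // Completing the barrier keeps the joined members and advances the generation.
  def complete(b: Barrier, generation: Int): (Set[String], Int) =
    (b.joined, generation + 1)
}
```

Starting from known members A and B at generation 3, a timeout after only A has rejoined yields the member set {A} at generation 4; B is dropped and must join again as a new member.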
handleSyncGroup(): distributing the assignment
After the Leader Consumer receives its JoinGroupResponse, it runs the partition assignment algorithm locally (Round Robin, Range, Sticky and so on) and submits the result to the Coordinator in a SyncGroupRequest:
// GroupCoordinator.scala (simplified)
def handleSyncGroup(
  groupId: String,
  generation: Int,
  memberId: String,
  groupInstanceId: Option[String],
  protocolType: Option[String],
  protocolName: Option[String],
  groupAssignment: Map[String, Array[Byte]], // only the Leader sends this; empty for all other members
  responseCallback: SyncGroupResult => Unit,
  requestLocal: RequestLocal
): Unit = {
  groupManager.getGroup(groupId) match {
    case None =>
      responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
    case Some(group) =>
      group.inLock {
        // Validate the Generation ID (rejects stale SyncGroup requests)
        if (group.generationId != generation) {
          responseCallback(SyncGroupResult(Errors.ILLEGAL_GENERATION))
          return
        }
        group.currentState match {
          case Empty | Dead =>
            responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
          case PreparingRebalance =>
            responseCallback(SyncGroupResult(Errors.REBALANCE_IN_PROGRESS))
          case CompletingRebalance =>
            // Park the SyncGroup callback; it is answered once the Leader's assignment has been stored
            group.get(memberId).foreach(_.awaitingSyncCallback = responseCallback)
            // If this request comes from the Leader, it carries the assignment
            if (memberId == group.leaderId.orNull) {
              // Persist the assignment to __consumer_offsets and answer all waiting members.
              // The transition to Stable happens in the store callback, once the write has succeeded.
              setAndPropagateAssignment(group, groupAssignment)
            }
            // Non-Leader members: only the callback is registered; they are answered together with the Leader
          case Stable =>
            // Group is already Stable: a SyncGroup arriving now (for example a retry)
            // simply gets the current assignment
            val assignment = group.assignment(memberId)
            responseCallback(SyncGroupResult(
              protocolType, protocolName, assignment, Errors.NONE
            ))
        }
      }
  }
}
setAndPropagateAssignment(): persisting and distributing
// GroupCoordinator.scala (simplified)
private def setAndPropagateAssignment(
  group: GroupMetadata,
  assignment: Map[String, Array[Byte]]
): Unit = {
  // 1. Update the assignment field of each member in GroupMetadata
  assignment.foreach { case (memberId, memberAssignment) =>
    group.get(memberId).foreach { member =>
      member.assignment = memberAssignment
    }
  }
  // 2. Persist GroupMetadata (including the assignment) to __consumer_offsets
  groupManager.storeGroup(group, assignment, error => {
    group.inLock {
      if (group.is(CompletingRebalance) && error == Errors.NONE) {
        // 3. Answer every member that has a SyncGroup response pending
        group.allMembers.foreach { member =>
          // Members that have not sent SyncGroup yet have no callback registered
          if (member.awaitingSyncCallback != null) {
            member.awaitingSyncCallback(SyncGroupResult(
              group.protocolType,
              group.protocolName,
              member.assignment, // each member only receives its own assignment
              Errors.NONE
            ))
            member.awaitingSyncCallback = null
          }
        }
        group.transitionTo(Stable)
      }
      // The failure path (error != NONE) is omitted in this simplified listing
    }
  })
}
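The SyncGroup phase is essentially "park the callbacks until the Leader shows up". A minimal sketch of that pattern follows. `PendingSync` is a hypothetical class, assignments are plain strings instead of serialized bytes, and persistence is left out entirely, so this shows only the callback bookkeeping.

```scala
object SyncGroupSketch {
  import scala.collection.mutable

  final class PendingSync(leaderId: String) {
    private val callbacks = mutable.Map.empty[String, String => Unit]
    private var current: Option[Map[String, String]] = None // set once the group is "Stable"

    def sync(memberId: String, assignment: Map[String, String], callback: String => Unit): Unit =
      current match {
        case Some(stored) =>
          // Already stable: answer right away with the stored assignment.
          callback(stored.getOrElse(memberId, ""))
        case None =>
          // Park the callback until the Leader's assignment arrives.
          callbacks(memberId) = callback
          if (memberId == leaderId) {
            current = Some(assignment)
            // Each member is answered with its own slice of the assignment.
            callbacks.foreach { case (id, cb) => cb(assignment.getOrElse(id, "")) }
            callbacks.clear()
          }
      }
  }
}
```

A Follower that syncs before the Leader receives nothing until the Leader's request arrives; a Follower that syncs afterwards is answered immediately, which corresponds to the `Stable` branch of handleSyncGroup().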
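Level 3 · What the specification says (senior)
The protocol and specification details for this chapter are already covered in the Levels above. For more depth, see the official Kafka protocol documentation and the corresponding KIPs.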
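Level 4 · Edge cases and pitfalls (everyone)
The following are common edge cases and production pitfalls related to "Group Coordinator source code: the full Rebalance flow":
Pitfall 1: ignoring the assumptions baked into default settings. Many Kafka defaults are designed for general-purpose workloads and can cause performance problems or data risk in a specific business scenario. Before going live, review every setting related to this chapter one by one (for example session.timeout.ms and max.poll.interval.ms, which become sessionTimeoutMs and rebalanceTimeoutMs in MemberMetadata) and confirm that it matches your requirements.
Pitfall 2: no end-to-end verification. There is often a gap between theoretical understanding and production behavior. Build a complete test scenario in a pre-production environment that covers both the normal path and failure paths (node crash, network partition, full disk) and verify that the system behaves as expected.
Pitfall 3: monitoring blind spots. If the mechanisms covered in this chapter have no matching metrics and alert rules, problems tend to be noticed only after they have affected the business. Add the key indicators to your monitoring system and set sensible alert thresholds.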
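As a starting point for the configuration review in Pitfall 1, the three Consumer timeouts that drive the mechanisms in this chapter can be set explicitly instead of being inherited. The sketch below only builds a `java.util.Properties` object; the helper name `timeouts` is hypothetical, the values in the usage note are illustrative rather than recommendations, and the one-third rule it enforces is the usual guidance for heartbeat.interval.ms relative to session.timeout.ms.

```scala
object ConsumerTimeoutsSketch {
  import java.util.Properties

  // Builds the timeout-related consumer settings and rejects a heartbeat
  // interval that is too close to the session timeout.
  def timeouts(sessionMs: Int, heartbeatMs: Int, maxPollMs: Int): Properties = {
    require(heartbeatMs * 3 <= sessionMs,
      "heartbeat.interval.ms should be at most one third of session.timeout.ms")
    val props = new Properties()
    props.setProperty("session.timeout.ms", sessionMs.toString)     // becomes sessionTimeoutMs
    props.setProperty("heartbeat.interval.ms", heartbeatMs.toString)
    props.setProperty("max.poll.interval.ms", maxPollMs.toString)   // becomes rebalanceTimeoutMs
    props
  }
}
```

For example, `ConsumerTimeoutsSketch.timeouts(45000, 3000, 300000)` produces a valid set, while `timeouts(6000, 3000, 300000)` is rejected because a single delayed heartbeat would already use up half of the session.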