第 21 章

Group Coordinator 源码:Rebalance 全流程

第21章:Group Coordinator 源码:Rebalance 全流程

导读:Group Coordinator 源码如何实现 Rebalance 全流程?

本章核心问题:Group Coordinator 源码如何实现 Rebalance 全流程?

读完本章你将理解


Level 1 · 你需要知道的(1-3年经验)

Consumer Group 的本质

Consumer Group 是 Kafka 实现消息订阅语义的核心机制:同一个 Group 内的多个 Consumer 共同消费所订阅 Topic 的全部 Partition,每个 Partition 在同一时刻只被 Group 内的一个 Consumer 消费。当 Group 成员或所订阅 Topic 的 Partition 数量发生变化时,就需要重新分配——这个过程叫做 Rebalance。

Rebalance 是 Kafka 最复杂的协议之一。它由一系列精密设计的状态转换驱动,涉及 GroupCoordinator(Broker 侧)和 ConsumerCoordinator(Client 侧)的配合。本章深入 GroupCoordinator.scala 的源码,追踪 Rebalance 的完整生命周期。

handleHeartbeat() — 维持会话

// GroupCoordinator.scala (simplified)
/**
 * Handles a HeartbeatRequest: validates the member and its generation, refreshes the member's
 * session timer, and tells the member whether it has to rejoin.
 *
 * @param groupId          group the member claims to belong to
 * @param memberId         member id assigned by the coordinator at JoinGroup time
 * @param groupInstanceId  static member id (KIP-345); not inspected in this simplified version
 * @param generationId     generation the member believes is current
 * @param responseCallback receives the single error code of the response
 */
def handleHeartbeat(
  groupId: String,
  memberId: String,
  groupInstanceId: Option[String],
  generationId: Int,
  responseCallback: Errors => Unit
): Unit = {
  groupManager.getGroup(groupId) match {
    case None => responseCallback(Errors.UNKNOWN_MEMBER_ID)

    case Some(group) =>
      group.inLock {
        group.currentState match {
          case Dead =>
            responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
          case Empty =>
            responseCallback(Errors.UNKNOWN_MEMBER_ID)
          case liveState @ (PreparingRebalance | CompletingRebalance | Stable) =>
            group.get(memberId) match {
              case None =>
                responseCallback(Errors.UNKNOWN_MEMBER_ID)
              case Some(_) if generationId != group.generationId =>
                // Heartbeats from an outdated generation are rejected in every live state,
                // not only in Stable.
                responseCallback(Errors.ILLEGAL_GENERATION)
              case Some(member) =>
                // Refresh the session timer in every live state. In CompletingRebalance the
                // member already has its JoinGroup response and is waiting for the leader's
                // SyncGroup; without a refresh its session could expire before the
                // assignment arrives.
                completeAndScheduleNextHeartbeatExpiration(group, member)
                // Only PreparingRebalance asks the member to (re)join. In CompletingRebalance the
                // member has already joined this generation; REBALANCE_IN_PROGRESS would make it
                // send another JoinGroup and restart the rebalance that is being completed.
                responseCallback(
                  if (liveState == PreparingRebalance) Errors.REBALANCE_IN_PROGRESS
                  else Errors.NONE
                )
            }
        }
      }
  }
}

心跳超时(session.timeout.ms)处理:

// GroupCoordinator.scala (simplified)
/**
 * Re-arms the heartbeat (session) timer of `member`.
 *
 * The timer is keyed by (groupId, memberId). The actual work is delegated to
 * completeAndScheduleNextExpiration, which, judging by its name, completes the pending
 * expiration and schedules the next one (NOTE(review): confirm).
 */
private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata): Unit = {
  val heartbeatKey = MemberKey(group.groupId, member.memberId)
  completeAndScheduleNextExpiration(group, member, heartbeatKey)
}

// Invoked when a member's heartbeat (session) timer fires.
/**
 * Removes `member` from `group` if its session really expired; the removal triggers a rebalance.
 *
 * The timer can fire after the member already left the group (LeaveGroup, or eviction at the
 * end of a join phase), or after the group was deleted. In those cases `member` is stale, and
 * removing it again would start a spurious rebalance, so both conditions are checked first.
 */
def onMemberHeartbeatExpired(group: GroupMetadata, member: MemberMetadata): Unit = {
  group.inLock {
    val stillMember = !group.is(Dead) && group.get(member.memberId).isDefined
    val sessionExpired = member.sessionTimeoutMs <= (time.milliseconds - member.latestHeartbeat)
    if (stillMember && sessionExpired) {
      // Session timed out: evict the member, which triggers a rebalance.
      info(s"Member ${member.memberId} in group ${group.groupId} has failed, " +
           s"removing it from the group")
      removeMemberAndUpdateGroup(group, member, s"removing member ${member.memberId} on heartbeat expiration")
    }
  }
}

handleLeaveGroup() — 主动离开

// GroupCoordinator.scala (simplified)
/**
 * Handles a LeaveGroupRequest: removes each named member from the group, which triggers a
 * rebalance, and reports a per-member result.
 *
 * @param memberIdentities members leaving the group (a single request may name several)
 * @param responseCallback receives the top-level error plus one entry per named member
 */
def handleLeaveGroup(
  groupId: String,
  memberIdentities: List[MemberIdentity],
  responseCallback: LeaveGroupResult => Unit
): Unit = {
  // Per-member answer for an identity the coordinator does not know.
  def unknownMember(identity: MemberIdentity): MemberResponse =
    MemberResponse(identity.memberId, Errors.UNKNOWN_MEMBER_ID)

  groupManager.getGroup(groupId) match {
    case None =>
      // Without a group none of the named members can be known.
      responseCallback(LeaveGroupResult(Errors.UNKNOWN_MEMBER_ID, memberIdentities.map(unknownMember)))

    case Some(group) =>
      group.inLock {
        if (group.is(Dead)) {
          // The group was deleted concurrently; the client should rediscover its coordinator.
          responseCallback(LeaveGroupResult(Errors.COORDINATOR_NOT_AVAILABLE, List.empty))
        } else {
          val memberErrors = memberIdentities.map { identity =>
            group.get(identity.memberId) match {
              case None => unknownMember(identity)
              case Some(member) =>
                // Removing the member triggers a rebalance.
                removeMemberAndUpdateGroup(group, member, s"LeaveGroup received from member ${identity.memberId}")
                MemberResponse(identity.memberId, Errors.NONE)
            }
          }
          responseCallback(LeaveGroupResult(Errors.NONE, memberErrors))
        }
      }
  }
}

Offset 管理:handleOffsetCommit() 和 handleOffsetFetch()

Consumer Group 将消费进度(Offset)提交到 __consumer_offsets Topic,由 GroupCoordinator 所在的 Broker 负责读写:

// GroupCoordinator.scala (simplified)
/**
 * Handles an OffsetCommitRequest by appending the offsets to __consumer_offsets through
 * GroupMetadataManager.storeOffsets.
 *
 * A commit whose generation differs from OffsetCommitRequest.DEFAULT_GENERATION_ID claims to come
 * from a member of a managed group, so it must match that group's current generation. A commit
 * with the default generation is treated as plain offset storage.
 *
 * @param offsetMetadata   offsets to commit, per partition
 * @param responseCallback receives one error code per partition of `offsetMetadata`
 */
def handleOffsetCommit(
  groupId: String,
  generationId: Int,
  memberId: String,
  groupInstanceId: Option[String],
  offsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata],
  responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit,
  requestLocal: RequestLocal
): Unit = {
  // Fails every partition of the request with the same error.
  def rejectAll(error: Errors): Unit =
    responseCallback(offsetMetadata.map { case (tp, _) => tp -> error })

  val claimsGeneration = generationId != OffsetCommitRequest.DEFAULT_GENERATION_ID

  groupManager.getGroup(groupId) match {
    case Some(group) =>
      group.inLock {
        // Reject commits from an outdated generation (prevents committing stale offsets).
        if (claimsGeneration && generationId != group.generationId)
          rejectAll(Errors.ILLEGAL_GENERATION)
        else
          groupManager.storeOffsets(group, memberId, groupInstanceId, offsetMetadata, responseCallback, requestLocal)
      }

    case None if claimsGeneration =>
      // No group exists, yet the commit carries a generation: it cannot come from a current
      // member of a managed group, so it must not be stored as plain offset storage.
      rejectAll(Errors.ILLEGAL_GENERATION)

    case None =>
      // Plain offset storage, not tied to any rebalance.
      groupManager.storeOffsets(GroupMetadata.loadingGroup(groupId), memberId, None, offsetMetadata, responseCallback, requestLocal)
  }
}
// GroupCoordinator.scala (simplified)
/**
 * Handles an OffsetFetchRequest by answering from GroupMetadataManager.getOffsets.
 *
 * Both branches read through getOffsets, which serves the manager's in-memory offset cache.
 * It does not read __consumer_offsets on demand.
 *
 * @param partitions       partitions to look up; None means "every committed offset of the group"
 * @param requireStable    NOTE(review): not consulted by this simplified version
 * @param responseCallback receives the offsets, or per-partition errors for a deleted group
 */
def handleOffsetFetch(
  groupId: String,
  partitions: Option[Seq[TopicPartition]],  // None = fetch all offsets
  requireStable: Boolean,
  responseCallback: Map[TopicPartition, OffsetFetchResponse.PartitionData] => Unit
): Unit = {
  // Evaluated on demand, so only the path that needs it performs the lookup.
  def lookupOffsets = groupManager.getOffsets(groupId, partitions)

  groupManager.getGroup(groupId) match {
    case Some(group) =>
      group.inLock {
        // A deleted group reports every requested partition as unknown; when the request asked
        // for "all" offsets, the answer is empty.
        val answer: Map[TopicPartition, OffsetFetchResponse.PartitionData] =
          if (group.is(Dead))
            partitions.getOrElse(Nil).map(tp => tp -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
          else
            lookupOffsets
        responseCallback(answer)
      }
    case None =>
      // No GroupMetadata in memory (e.g. an offsets-only group); the cache may still hold offsets.
      responseCallback(lookupOffsets)
  }
}

__consumer_offsets 的数据结构

Offset 提交写入 __consumer_offsets 时,key 和 value 都是以 schema 版本号开头的二进制编码,具体字段随版本变化:

Key:   [Version][GroupId][TopicName][PartitionId]
Value: [Version][Offset][LeaderEpoch][Metadata][CommitTimestamp]  (v3 格式;LeaderEpoch 自 v3 起加入,ExpireTimestamp 只存在于旧的 v1 格式中)

GroupMetadataManager 在本 Broker 成为某个 __consumer_offsets 分区的 Leader 时(也就是成为映射到该分区的所有 Group 的 Coordinator 时),从该分区加载已有的 Offset 记录和 Group 元数据到内存缓存。后续的 OffsetFetch 请求直接从内存返回,无需读取磁盘。

完整 Rebalance 时序图

Consumer A         Consumer B         GroupCoordinator
    │                  │                    │
    │──JoinGroup───────────────────────────→│ (触发 Rebalance,状态→PreparingRebalance)
    │                  │──JoinGroup─────────→│ (注册成员,等待其他成员)
    │                  │                    │
    │←──JoinGroupResponse(Leader)───────────│ (所有成员 Join 完成,状态→CompletingRebalance)
    │                  │←──JoinGroupResp────│ (Follower 收到空的成员列表)
    │                  │                    │
    │ [执行分配算法]    │                    │
    │──SyncGroup(assignment)────────────────→│ (持久化分配方案)
    │                  │──SyncGroup─────────→│ (等待 Leader 的分配)
    │                  │                    │
    │←──SyncGroupResp(partitions for A)─────│ (状态→Stable)
    │                  │←──SyncGroupResp────│ (Consumer B 收到自己的分配)
    │                  │                    │
    │──Heartbeat───────────────────────────→│
    │←──Heartbeat OK────────────────────────│

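理解了这个状态机,就能解释为什么在 Eager(急切式)Rebalance 协议下,整个 Consumer Group 在 Rebalance 期间会停止消费:每个 Consumer 在发送 JoinGroup 之前会先撤销(revoke)自己持有的全部 Partition,然后等待 JoinGroup 和 SyncGroup 的响应,而这两个响应都需要 Group 内所有成员参与协调。这也是 Kafka 2.4 通过 KIP-429 引入增量协作式 Rebalance(Incremental Cooperative Rebalancing,对应 CooperativeStickyAssignor)的动机——只撤销并重新分配真正需要迁移的 Partition,其余 Partition 在 Rebalance 期间可以继续消费。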


Level 2 · 它是怎么运行的(3-5年经验)

GroupMetadata — Group 的内存状态

每个 Consumer Group 在 Coordinator Broker 上有一个对应的 GroupMetadata 对象:

// GroupMetadata.scala (core/src/main/scala/kafka/coordinator/group/, simplified)
/**
 * In-memory state the coordinator broker keeps for one consumer group.
 *
 * @param groupId      the group's id
 * @param initialState state the group starts in
 * @param time         clock source (not used in this excerpt)
 */
class GroupMetadata(val groupId: String, initialState: GroupState, time: Time) {

  /** Current position in the group state machine. */
  private var state: GroupState = initialState

  /** Generation counter, bumped on every rebalance; requests carrying an older value are rejected. */
  var generationId: Int = 0

  /** Member id of the leader (the member that runs the partition assignor); None until chosen. */
  var leaderId: Option[String] = None

  /** Active members, keyed by member id. */
  private val members: mutable.HashMap[String, MemberMetadata] = mutable.HashMap.empty

  /** memberId -> serialized partition list, written during the SyncGroup phase. */
  private var assignment: Map[String, Array[Byte]] = Map.empty[String, Array[Byte]]

  /**
   * Boolean flag (not a timer key) marking that a new member was added.
   * NOTE(review): presumably read by the delayed-join logic to decide whether to keep waiting — confirm.
   */
  var newMemberAdded: Boolean = false
}

GroupState 枚举

Group 可处于以下五种状态:

// GroupState.scala (simplified)
/** The five states of the group coordinator's state machine. */
sealed trait GroupState

/** The group exists but has no active members. */
case object Empty extends GroupState

/** Waiting for every member to (re)send JoinGroup. */
case object PreparingRebalance extends GroupState

/** Join phase finished; waiting for the leader to submit the assignment via SyncGroup. */
case object CompletingRebalance extends GroupState

/** Normal operation: partitions are assigned. */
case object Stable extends GroupState

/** The group has been deleted. */
case object Dead extends GroupState

MemberMetadata — 单个成员的状态

// MemberMetadata.scala (simplified)
/**
 * Coordinator-side state of a single group member: its identity, timeouts, supported protocols,
 * the current assignment, and the callbacks of any JoinGroup/SyncGroup response still pending.
 */
class MemberMetadata(
  val memberId: String,              // unique id assigned by the coordinator
  val groupId: String,
  val clientId: String,              // client id sent by the member (the consumer's client.id setting)
  val clientHost: String,            // the consumer's host / IP
  val rebalanceTimeoutMs: Int,       // for the Java consumer: max.poll.interval.ms
  val sessionTimeoutMs: Int,         // session.timeout.ms
  val protocolType: String,          // e.g. "consumer"
  var supportedProtocols: List[(String, Array[Byte])]  // (assignor name, subscription metadata) pairs
) {
  // Session tracking: timestamp (ms) of the latest heartbeat, compared with sessionTimeoutMs
  // when the heartbeat timer fires; -1 until a heartbeat is recorded.
  var latestHeartbeat: Long = -1L
  // NOTE(review): flag used by the heartbeat-expiration logic; exact semantics not shown here.
  var heartbeatSatisfied: Boolean = false

  // Parked response callbacks; null means no request of that kind is pending.
  var awaitingJoinCallback: JoinGroupResult => Unit = null  // answered when the join phase completes
  var awaitingSyncCallback: SyncGroupResult => Unit = null  // answered once the assignment is stored

  // This member's partitions in serialized form; empty until an assignment is distributed.
  var assignment: Array[Byte] = Array.empty[Byte]
}

handleJoinGroup() — Rebalance 的触发器

Consumer 调用 subscribe() 只是登记订阅关系;在随后的 poll() 中,Consumer 内部的 ConsumerCoordinator 会先定位 Group Coordinator,再向其发送 JoinGroupRequest。这是 Rebalance 的起点:

// GroupCoordinator.scala (core/, simplified)
/**
 * Handles a JoinGroupRequest, the entry point of every rebalance.
 *
 * New members (empty `memberId`) are added to the group, which starts or joins a rebalance.
 * Known members either rejoin the ongoing rebalance or, in a Stable group, trigger one when needed.
 *
 * @param memberId         JoinGroupRequest.UNKNOWN_MEMBER_ID (empty string) on the first join
 * @param groupInstanceId  static member id (KIP-345)
 * @param sessionTimeoutMs must lie within the configured min/max group session timeout
 * @param protocols        (assignor name, subscription metadata) pairs supported by the member
 * @param responseCallback answered immediately on error, otherwise when the join phase completes
 */
def handleJoinGroup(
  groupId: String,
  memberId: String,                 // empty string on the first join
  groupInstanceId: Option[String],  // static member id (KIP-345)
  requireKnownMemberId: Boolean,
  clientId: String,
  clientHost: String,
  rebalanceTimeoutMs: Int,
  sessionTimeoutMs: Int,
  protocolType: String,
  protocols: List[(String, Array[Byte])],
  responseCallback: JoinGroupResult => Unit,
  requestLocal: RequestLocal
): Unit = {
  // Adds the caller to `group` as a new member and brings it into a rebalance.
  def addNewMember(group: GroupMetadata): Unit =
    addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, memberId, groupInstanceId,
                          clientId, clientHost, protocolType, protocols, group, responseCallback)

  // Updates a known member's protocols and brings it into a rebalance.
  def rejoinKnownMember(group: GroupMetadata): Unit =
    updateMemberAndRebalance(group, memberId, protocols, responseCallback)

  val isNewMember = memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID

  groupManager.getOrMaybeCreateGroup(groupId, protocolType) match {
    case None =>
      // The group does not exist and may not be created here.
      responseCallback(JoinGroupResult(memberId, error = Errors.UNKNOWN_MEMBER_ID))

    case Some(group) =>
      group.inLock {
        val sessionTimeoutValid =
          sessionTimeoutMs >= groupConfig.groupMinSessionTimeoutMs &&
          sessionTimeoutMs <= groupConfig.groupMaxSessionTimeoutMs

        if (!sessionTimeoutValid) {
          responseCallback(JoinGroupResult(memberId, error = Errors.INVALID_SESSION_TIMEOUT))
        } else {
          group.currentState match {
            case Dead =>
              // The group was deleted; the client has to rediscover its coordinator.
              responseCallback(JoinGroupResult(memberId, error = Errors.COORDINATOR_NOT_AVAILABLE))

            case Empty =>
              // First member of an empty group: this starts a rebalance.
              addNewMember(group)

            case PreparingRebalance =>
              // A rebalance is in progress: simply join it.
              if (isNewMember) addNewMember(group)
              else rejoinKnownMember(group)

            case CompletingRebalance =>
              // Waiting for SyncGroup: any join restarts the rebalance.
              if (isNewMember) {
                addNewMember(group)
              } else {
                // A known member rejoining interrupts the current SyncGroup phase.
                prepareRebalance(group, s"Rebalance triggered by member join: $memberId")
                rejoinKnownMember(group)
              }

            case Stable =>
              group.get(memberId) match {
                case None =>
                  // A member the Stable group does not know yet: adding it requires a rebalance.
                  addNewMember(group)
                case Some(existingMember) =>
                  val isLeader = memberId == group.leaderId.orNull
                  if (isLeader || !existingMember.matchesProtocols(protocols)) {
                    // The leader computes the assignment, so a leader re-join is treated as a
                    // request to recompute it (e.g. after topic metadata changes that member
                    // metadata does not capture). A follower whose protocols changed also needs
                    // a new assignment. Both start a rebalance.
                    rejoinKnownMember(group)
                  } else {
                    // A follower rejoining with unchanged protocols (e.g. it lost the previous
                    // JoinGroup response) gets the current generation back and can proceed to
                    // SyncGroup without disturbing the rest of the group.
                    responseCallback(JoinGroupResult.forStableGroup(group, existingMember))
                  }
              }
          }
        }
      }
  }
}

prepareRebalance() — 进入 PreparingRebalance 状态

// GroupCoordinator.scala (simplified)
/**
 * Moves `group` into PreparingRebalance and parks a delayed-join operation in the join
 * purgatory. The join phase completes once all members have rejoined or the operation expires.
 *
 * @param reason human-readable cause of the rebalance, included in the log line
 */
private def prepareRebalance(group: GroupMetadata, reason: String): Unit = {
  // Members parked in SyncGroup will never receive this generation's assignment; answer them
  // with REBALANCE_IN_PROGRESS so they rejoin.
  if (group.is(CompletingRebalance)) {
    resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)
  }

  // An empty group waits via InitialDelayedJoin; otherwise wait up to the group's rebalance timeout.
  val delayedRebalance = if (group.is(Empty))
    new InitialDelayedJoin(this, joinPurgatory, group, groupConfig, ...)
  else
    new DelayedJoin(this, group, group.rebalanceTimeoutMs)

  group.transitionTo(PreparingRebalance)
  // Logged after the transition, so the state shown is the new one. The generation has not been
  // bumped yet; that happens when the join phase completes.
  info(s"Preparing to rebalance group ${group.groupId} in state ${group.currentState} " +
       s"with old generation ${group.generationId} (reason: $reason)")

  // Park the delayed join until every member has joined or it times out.
  joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(GroupJoinKey(group.groupId)))
}

DelayedJoin — 等待所有成员

// DelayedJoin.scala (simplified)
/**
 * Purgatory operation that parks a group's join phase until every known member has rejoined or
 * the rebalance timeout elapses. It then finishes the join via GroupCoordinator.onCompleteJoin.
 *
 * @param rebalanceTimeout maximum time in ms to wait for members to rejoin
 */
class DelayedJoin(
  coordinator: GroupCoordinator,
  group: GroupMetadata,
  rebalanceTimeout: Long
) extends DelayedOperation(rebalanceTimeout) {

  // Completion condition: every known member has re-sent JoinGroupRequest. The explicit lambda
  // avoids relying on automatic eta-expansion of the zero-argument method `forceComplete`,
  // which Scala 2.13 deprecates.
  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, () => forceComplete())

  override def onExpiration(): Unit = coordinator.onExpireJoin()

  override def onComplete(): Unit = coordinator.onCompleteJoin(group)
}
// GroupCoordinator.scala (simplified)
/**
 * Completion check used by DelayedJoin. The join phase may finish once every known member
 * (members of the previous generation that have not explicitly left) has sent JoinGroup.
 *
 * @param forceComplete completes the surrounding delayed operation; invoked only when all members
 *                      have joined, and its result is returned
 * @return false while members are still missing
 */
def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean): Boolean =
  group.inLock {
    // `&&` short-circuits: forceComplete runs only once everyone has joined.
    group.hasAllMembersJoined && forceComplete()
  }

// Triggered once every member has joined (or the join phase timed out).
/**
 * Finishes the join phase of a rebalance. It evicts dynamic members that failed to rejoin,
 * starts the next generation, moves the group to CompletingRebalance and answers the parked
 * JoinGroup requests. Only the leader receives the full member list, which it needs to run
 * the partition assignor.
 */
def onCompleteJoin(group: GroupMetadata): Unit = {
  group.inLock {
    // Evict dynamic members that did not rejoin in time. Static members (KIP-345) are not
    // evicted here, so some members that remain below may have no JoinGroup pending.
    group.notYetRejoinedDynamicMembers.values.foreach { member =>
      group.remove(member.memberId)
      // NOTE(review): the evicted member's pending heartbeat expiration presumably has to be
      // cancelled here as well — confirm against the real coordinator.
    }

    // The group may have been deleted while the join was pending.
    if (!group.is(Dead)) {
      group.initNextGeneration()  // generationId++, clears the previous assignment

      // Transition before answering, so anything the callbacks trigger already sees the group in
      // CompletingRebalance, the state in which SyncGroup requests are accepted.
      group.transitionTo(CompletingRebalance)

      group.allMembers.foreach { member =>
        // Skip members without a parked JoinGroup (e.g. static members that did not rejoin).
        // Invoking a null callback would throw and leave every later member unanswered.
        if (member.awaitingJoinCallback != null) {
          val isLeader = member.memberId == group.leaderId.orNull
          val joinResult = JoinGroupResult(
            // The leader gets every member's subscription so it can run the assignor;
            // followers get an empty list.
            members         = if (isLeader) group.currentMemberMetadata else List.empty,
            memberId        = member.memberId,
            generationId    = group.generationId,
            protocolType    = group.protocolType,
            protocolName    = group.protocolName,
            leaderId        = group.leaderId.orNull,
            skipAssignment  = false,
            error           = Errors.NONE
          )
          member.awaitingJoinCallback(joinResult)
          member.awaitingJoinCallback = null
        }
      }
    }
  }
}

handleSyncGroup() — 分发分配方案

Leader Consumer 收到 JoinGroupResponse 后,在本地执行分区分配算法(Round Robin、Range、Sticky 等),然后通过 SyncGroupRequest 将结果提交给 Coordinator:

// GroupCoordinator.scala (simplified)
/**
 * Handles a SyncGroupRequest.
 *
 * In CompletingRebalance, followers park their request until the leader's assignment is
 * persisted, and the leader's request triggers the persistence and distribution. In a Stable
 * group the member's current assignment is returned immediately.
 *
 * @param generation      generation the member joined; must equal the group's current generation
 * @param groupAssignment memberId -> serialized assignment; sent only by the leader, empty otherwise
 */
def handleSyncGroup(
  groupId: String,
  generation: Int,
  memberId: String,
  groupInstanceId: Option[String],
  protocolType: Option[String],
  protocolName: Option[String],
  groupAssignment: Map[String, Array[Byte]],  // only the leader sends this; empty for other members
  responseCallback: SyncGroupResult => Unit,
  requestLocal: RequestLocal
): Unit = {
  groupManager.getGroup(groupId) match {
    case None =>
      responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))

    case Some(group) =>
      group.inLock {
        // Reject SyncGroup requests from an outdated generation.
        if (group.generationId != generation) {
          responseCallback(SyncGroupResult(Errors.ILLEGAL_GENERATION))
        } else {
          group.currentState match {
            case Empty =>
              responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))

            case Dead =>
              // The group was deleted; the client has to rediscover its coordinator.
              responseCallback(SyncGroupResult(Errors.COORDINATOR_NOT_AVAILABLE))

            case PreparingRebalance =>
              responseCallback(SyncGroupResult(Errors.REBALANCE_IN_PROGRESS))

            case CompletingRebalance =>
              group.get(memberId) match {
                case None =>
                  // Answer now: an unknown member's callback would never be registered or
                  // invoked, and its request would hang.
                  responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
                case Some(member) =>
                  // Park the response until the leader's assignment has been persisted.
                  member.awaitingSyncCallback = responseCallback
                  if (memberId == group.leaderId.orNull) {
                    // Persists the assignment. When the write completes, setAndPropagateAssignment
                    // answers the parked requests and moves the group to Stable.
                    // Do NOT transition here: its callback only propagates while the group is
                    // still CompletingRebalance, so an early transition to Stable would leave
                    // every parked SyncGroup unanswered.
                    setAndPropagateAssignment(group, groupAssignment)
                  }
                  // Followers only register their callback; they are answered with the leader's.
              }

            case Stable =>
              // Already stable, e.g. a member resending SyncGroup: return its current assignment.
              group.get(memberId) match {
                case None =>
                  responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
                case Some(member) =>
                  // Report the group's protocol, as the propagation path does, not the values the
                  // request happened to carry.
                  responseCallback(SyncGroupResult(
                    group.protocolType, group.protocolName, member.assignment, Errors.NONE
                  ))
              }
          }
        }
      }
  }
}

setAndPropagateAssignment() — 持久化并分发

// GroupCoordinator.scala (simplified)
/**
 * Records the leader's assignment and persists it to __consumer_offsets via
 * GroupMetadataManager.storeGroup. Once the write completes:
 *  - on success, answers every parked SyncGroup with that member's assignment and moves the
 *    group to Stable;
 *  - on failure, propagates the error to the parked members through
 *    resetAndPropagateAssignmentError.
 * The callback acts only if the group is still in CompletingRebalance for the same generation.
 *
 * @param assignment memberId -> serialized partition assignment, as computed by the leader
 */
private def setAndPropagateAssignment(
  group: GroupMetadata,
  assignment: Map[String, Array[Byte]]
): Unit = {
  // 1. Record each member's assignment. Members the leader omitted get an empty assignment
  //    rather than keeping the one from the previous generation.
  group.allMembers.foreach { member =>
    member.assignment = assignment.getOrElse(member.memberId, Array.empty[Byte])
  }

  // The write completes asynchronously; a newer rebalance may have started by then.
  val generationAtStore = group.generationId

  // 2. Persist the group metadata (including the assignment) to __consumer_offsets.
  groupManager.storeGroup(group, assignment, error => {
    group.inLock {
      if (group.is(CompletingRebalance) && group.generationId == generationAtStore) {
        if (error == Errors.NONE) {
          // 3. Answer the members that are already waiting in SyncGroup. Followers that have not
          //    sent SyncGroup yet have a null callback; they get their assignment later from
          //    the Stable branch of handleSyncGroup.
          group.allMembers.foreach { member =>
            if (member.awaitingSyncCallback != null) {
              member.awaitingSyncCallback(SyncGroupResult(
                group.protocolType,
                group.protocolName,
                member.assignment,  // each member only receives its own assignment
                Errors.NONE
              ))
              member.awaitingSyncCallback = null
            }
          }
          group.transitionTo(Stable)
        } else {
          // Persisting failed: hand the error to the parked members instead of leaving them hanging.
          resetAndPropagateAssignmentError(group, error)
        }
      }
    }
  })
}

Level 3 · 规范怎么定义的(资深)

本章的协议与规范细节已融入上述 Level 中。如需深入了解,请参阅 Kafka 官方协议文档和对应 KIP 提案。


Level 4 · 边界与陷阱(所有人)

以下是与"Group Coordinator 源码:Rebalance 全流程"相关的常见边界问题和生产陷阱:

陷阱一:忽略默认配置的隐含假设。 许多 Kafka 参数的默认值是为通用场景设计的,在特定业务场景下可能导致性能问题或数据风险。上线前务必逐一审查与本章主题相关的配置项,确认其是否符合业务需求。

陷阱二:缺乏端到端的验证。 理论理解和生产实践之间往往存在差距。建议在预发布环境中构建完整的测试场景,覆盖正常路径和异常路径(如节点宕机、网络分区、磁盘满),验证系统行为是否符合预期。

陷阱三:监控盲区。 本章涉及的机制如果缺乏对应的监控指标和告警规则,问题往往在造成业务影响后才被发现。建议将关键指标纳入监控体系,设置合理的告警阈值。

本章评分
4.7  / 5  (8 评分)

💬 留言讨论