Chapter 21

Group Coordinator Source: The Complete Rebalance Flow

What a Consumer Group Really Is

A Consumer Group is Kafka's mechanism for sharing the work of consuming topics. Consumers that use the same group.id collectively consume all partitions of the topics they subscribe to, and each partition is assigned to exactly one member of the group at a time. When the set of members or the number of partitions changes, the assignment has to be recomputed. This process is called a rebalance.

Rebalance is one of Kafka's most intricate protocols. It is driven by a carefully designed state machine, requiring coordination between GroupCoordinator (broker side) and ConsumerCoordinator (client side). This chapter dives into GroupCoordinator.scala to trace the complete rebalance lifecycle.

GroupMetadata: The Group's In-Memory State

Every Consumer Group has a corresponding GroupMetadata object on its Coordinator broker:

// GroupMetadata.scala (core/src/main/scala/kafka/coordinator/group/, simplified)
class GroupMetadata(val groupId: String, initialState: GroupState, time: Time) {

  // Current state of the group (state machine)
  private var state: GroupState = initialState

  // All active members (memberId → MemberMetadata)
  private val members = new mutable.HashMap[String, MemberMetadata]

  // Generation ID: incremented on every rebalance, used to reject stale requests
  var generationId: Int = 0

  // Partition assignment sent to members in the SyncGroup phase
  // memberId → serialized partition list (protocol-specific bytes)
  // (simplified: Kafka itself keeps each member's slice in MemberMetadata.assignment)
  private var assignment: Map[String, Array[Byte]] = Map.empty

  // The leader member: the one that runs the assignment algorithm
  var leaderId: Option[String] = None
}

GroupState Enum

A group can be in one of five states:

sealed trait GroupState
case object Empty               extends GroupState  // group exists but has no active members
case object PreparingRebalance  extends GroupState  // waiting for all members to re-join
case object CompletingRebalance extends GroupState  // waiting for leader to submit assignment
case object Stable              extends GroupState  // running normally, partitions assigned
case object Dead                extends GroupState  // group has been deleted
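The five states cannot be entered from just anywhere. GroupMetadata.transitionTo only accepts a move into a state when the current state is one of that state's valid previous states. The self-contained sketch below models this check. The transition table reflects validPreviousStates in the Kafka source as we read it. GroupStateMachine, canTransition and replay are hypothetical names used only for illustration.

```scala
// Hypothetical, self-contained model of the group state machine.
// The transition table mirrors validPreviousStates in Kafka's GroupMetadata.scala.
sealed trait GroupState
case object Empty               extends GroupState
case object PreparingRebalance  extends GroupState
case object CompletingRebalance extends GroupState
case object Stable              extends GroupState
case object Dead                extends GroupState

object GroupStateMachine {
  // For each target state: the states it may be entered from
  val validPreviousStates: Map[GroupState, Set[GroupState]] = Map(
    Empty               -> Set(PreparingRebalance),
    PreparingRebalance  -> Set(Stable, CompletingRebalance, Empty),
    CompletingRebalance -> Set(PreparingRebalance),
    Stable              -> Set(CompletingRebalance),
    Dead                -> Set(Stable, PreparingRebalance, CompletingRebalance, Empty, Dead)
  )

  def canTransition(from: GroupState, to: GroupState): Boolean =
    validPreviousStates(to).contains(from)

  // Replays a path of states and fails on the first illegal step
  def replay(path: List[GroupState]): Unit =
    path.zip(path.drop(1)).foreach { case (from, to) =>
      require(canTransition(from, to), s"illegal transition $from -> $to")
    }
}
```

Stable can only be reached from CompletingRebalance. Every path to a working group therefore passes through both rebalance phases.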

MemberMetadata: A Single Member's State

// MemberMetadata.scala (simplified)
class MemberMetadata(
  val memberId: String,                   // unique ID assigned by the Coordinator
  val groupId: String,
  val clientId: String,                   // client.id config
  val clientHost: String,                 // consumer's IP address
  val rebalanceTimeoutMs: Int,            // max.poll.interval.ms
  val sessionTimeoutMs: Int,              // session.timeout.ms
  val protocolType: String,               // "consumer"
  var supportedProtocols: List[(String, Array[Byte])]  // supported assignment strategies + subscription metadata
) {
  // Serialized partition assignment for this member (received from SyncGroup)
  var assignment: Array[Byte] = Array.empty

  // Pending JoinGroup response callback (held by DelayedJoin)
  var awaitingJoinCallback: JoinGroupResult => Unit = null

  // Pending SyncGroup response callback
  var awaitingSyncCallback: SyncGroupResult => Unit = null

  var latestHeartbeat: Long = -1
}
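Several coordinator branches ask whether a rejoining member's protocols are unchanged. The subscription metadata is an Array[Byte], so a plain == compares array references rather than contents. The sketch below shows a minimal order-sensitive comparison. The chapter's simplified code calls this check matchesProtocols; protocolsMatch is a hypothetical standalone version.

```scala
import java.util.Arrays

object ProtocolMatch {
  type Protocols = List[(String, Array[Byte])]

  // Same protocol names in the same preference order, and byte-for-byte
  // identical metadata for each protocol (Arrays.equals compares contents).
  def protocolsMatch(a: Protocols, b: Protocols): Boolean =
    a.size == b.size && a.zip(b).forall { case ((nameA, metaA), (nameB, metaB)) =>
      nameA == nameB && Arrays.equals(metaA, metaB)
    }
}
```

Order matters because the list expresses the member's preference among assignment strategies.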

handleJoinGroup(): The Rebalance Trigger

subscribe() itself only records the subscription. On the next poll(), the consumer's internal ConsumerCoordinator locates the group's coordinator broker (FindCoordinator) and sends it a JoinGroupRequest. This is the entry point for every rebalance:

// GroupCoordinator.scala (core/, simplified)
def handleJoinGroup(
  groupId: String,
  memberId: String,              // empty string on first join
  groupInstanceId: Option[String],  // static member ID (KIP-345)
  requireKnownMemberId: Boolean,
  clientId: String,
  clientHost: String,
  rebalanceTimeoutMs: Int,
  sessionTimeoutMs: Int,
  protocolType: String,
  protocols: List[(String, Array[Byte])],
  responseCallback: JoinGroupResult => Unit,
  requestLocal: RequestLocal
): Unit = {

  groupManager.getOrMaybeCreateGroup(groupId, protocolType) match {
    case None =>
      responseCallback(JoinGroupResult(memberId, error = Errors.UNKNOWN_MEMBER_ID))

    case Some(group) =>
      group.inLock {
        if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs ||
            sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) {
          responseCallback(JoinGroupResult(memberId, error = Errors.INVALID_SESSION_TIMEOUT))
          return
        }

        group.currentState match {
          case Dead =>
            responseCallback(JoinGroupResult(memberId, error = Errors.COORDINATOR_NOT_AVAILABLE))

          case Empty =>
            // First member joining an empty group: trigger rebalance
            addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, memberId, groupInstanceId,
                                  clientId, clientHost, protocolType, protocols, group, responseCallback)

          case PreparingRebalance =>
            if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
              // Brand-new member: assign it a memberId, add to group
              addMemberAndRebalance(...)
            } else {
              // Known member re-joining during rebalance
              updateMemberAndRebalance(group, memberId, protocols, responseCallback)
            }

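          case CompletingRebalance =>
            if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
              // New member arrives while waiting for SyncGroup: restart rebalance
              addMemberAndRebalance(...)
            } else {
              group.get(memberId) match {
                case None =>
                  responseCallback(JoinGroupResult(memberId, error = Errors.UNKNOWN_MEMBER_ID))
                case Some(existingMember) if existingMember.matchesProtocols(protocols) =>
                  // Same metadata: the member most likely missed its JoinGroup response,
                  // so resend the current generation's result without a new rebalance
                  responseCallback(JoinGroupResult.forCurrentGeneration(group, existingMember))
                case Some(_) =>
                  // Metadata changed: abort the SyncGroup phase and rebalance again
                  updateMemberAndRebalance(group, memberId, protocols, responseCallback)
              }
            }

          case Stable =>
            if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
              // New member joining a stable group: trigger rebalance
              addMemberAndRebalance(...)
            } else {
              group.get(memberId) match {
                case None =>
                  responseCallback(JoinGroupResult(memberId, error = Errors.UNKNOWN_MEMBER_ID))
                case Some(existingMember)
                    if memberId == group.leaderId.orNull || !existingMember.matchesProtocols(protocols) =>
                  // The leader always forces a rebalance when it rejoins, which lets it react to
                  // changes that do not alter member metadata (e.g. new partitions on a subscribed
                  // topic); any member whose subscription changed forces one too
                  updateMemberAndRebalance(group, memberId, protocols, responseCallback)
                case Some(existingMember) =>
                  // Follower rejoining with unchanged metadata: no rebalance; return the current
                  // generation so it can go straight to SyncGroup
                  responseCallback(JoinGroupResult.forCurrentGeneration(group, existingMember))
              }
            }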
        }
      }
  }
}
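A member that joins with an empty memberId receives an id generated by the coordinator. The sketch below shows how such an id can be built. It assumes the scheme used by GroupMetadata.generateMemberId in the Kafka source: the group.instance.id for static members or the client.id otherwise, followed by "-" and a random UUID. MemberIds is a hypothetical wrapper.

```scala
import java.util.UUID

object MemberIds {
  // Separator between prefix and random suffix (assumed to be "-")
  val Delimiter = "-"

  // Static members (group.instance.id set) are prefixed with their instance id,
  // dynamic members with their client.id; the UUID keeps every id unique.
  def generateMemberId(clientId: String, groupInstanceId: Option[String]): String = {
    val prefix = groupInstanceId.getOrElse(clientId)
    prefix + Delimiter + UUID.randomUUID().toString
  }
}
```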

prepareRebalance(): Entering PreparingRebalance

// GroupCoordinator.scala (simplified)
private def prepareRebalance(group: GroupMetadata, reason: String): Unit = {
  // If we are in CompletingRebalance, cancel pending SyncGroup responses
  if (group.is(CompletingRebalance)) {
    resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)
  }

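  // Empty group: InitialDelayedJoin waits group.initial.rebalance.delay.ms so that consumers
  // starting together land in a single rebalance. Otherwise DelayedJoin waits up to the group's
  // rebalance timeout (the largest max.poll.interval.ms among its members).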
  val delayedRebalance = if (group.is(Empty))
    new InitialDelayedJoin(this, joinPurgatory, group, groupConfig, ...)
  else
    new DelayedJoin(this, group, group.rebalanceTimeoutMs)

  group.transitionTo(PreparingRebalance)
  info(s"Preparing to rebalance group ${group.groupId} in state ${group.currentState} " +
       s"with old generation ${group.generationId} (reason: $reason)")

  // Park DelayedJoin in Purgatory: it fires when all members have joined or the timeout expires
  val groupKey = GroupJoinKey(group.groupId)
  joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
}
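For an empty group, prepareRebalance parks an InitialDelayedJoin instead of a plain DelayedJoin. It is governed by the broker setting group.initial.rebalance.delay.ms, which defaults to 3000 ms. The simulation below is a hedged model of its extension rule for a brand-new group, as we understand it. The join phase waits one delay window. If another member joined during that window, it waits again, and the total wait is bounded by the rebalance timeout. InitialDelaySim and completionTime are hypothetical names, and time starts when the first member joins.

```scala
object InitialDelaySim {
  /** Hypothetical model of InitialDelayedJoin for a generation-0 group.
    * The first member joins at t = 0; laterJoins are the other members' join times.
    * Returns the time at which the join phase completes. */
  def completionTime(laterJoins: Seq[Long], configuredDelayMs: Long, rebalanceTimeoutMs: Long): Long = {
    var windowStart = 0L
    var delay = configuredDelayMs
    var remaining = math.max(rebalanceTimeoutMs - configuredDelayMs, 0L)
    var extended = true
    while (extended) {
      val windowEnd = windowStart + delay
      val newMemberAdded = laterJoins.exists(t => t >= windowStart && t < windowEnd)
      if (newMemberAdded && remaining != 0) {
        // Someone joined during this window: wait another (possibly shorter) window
        val nextDelay = math.min(configuredDelayMs, remaining)
        remaining = math.max(remaining - delay, 0L)
        delay = nextDelay
        windowStart = windowEnd
      } else {
        extended = false
      }
    }
    windowStart + delay
  }
}
```

With the default 3000 ms delay and members joining at 1000 ms and 2500 ms, the join phase ends at 6000 ms instead of right after the first member joins. All three members therefore land in a single generation.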

DelayedJoin: Waiting for All Members

// DelayedJoin.scala (simplified)
class DelayedJoin(
  coordinator: GroupCoordinator,
  group: GroupMetadata,
  rebalanceTimeout: Long
) extends DelayedOperation(rebalanceTimeout) {

  // Completion condition: all currently known members have re-sent JoinGroupRequest
  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete)

  override def onExpiration(): Unit = coordinator.onExpireJoin()

  // Called when all members have joined or timeout fires
  override def onComplete(): Unit = coordinator.onCompleteJoin(group)
}
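DelayedJoin inherits its completion guarantee from DelayedOperation. Whichever path gets there first, an early tryComplete() or the expiration timer, onComplete() runs exactly once, guarded by an atomic compare-and-set in forceComplete(). The miniature below reproduces that contract with hypothetical class names and has no purgatory and no real timer. expire() stands in for what the timer does at the deadline. The eviction in onComplete() mirrors onCompleteJoin, minus the static-member exemption.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical miniature of the DelayedOperation contract
abstract class MiniDelayedOperation {
  private val completed = new AtomicBoolean(false)

  def tryComplete(): Boolean
  def onComplete(): Unit
  def onExpiration(): Unit

  // Only the first caller wins the compare-and-set, so onComplete runs once
  def forceComplete(): Boolean =
    if (completed.compareAndSet(false, true)) { onComplete(); true } else false

  def isCompleted: Boolean = completed.get()

  // What the timer does when the deadline passes
  def expire(): Unit = if (forceComplete()) onExpiration()
}

// Completes as soon as every known member has re-sent JoinGroup
class MiniDelayedJoin(knownMembers: Set[String]) extends MiniDelayedOperation {
  private var joined = Set.empty[String]
  var evicted: Set[String] = Set.empty
  var expired = false
  var completions = 0

  def join(memberId: String): Boolean = synchronized {
    joined += memberId
    tryComplete()
  }

  override def tryComplete(): Boolean = synchronized {
    if (knownMembers.subsetOf(joined)) forceComplete() else false
  }

  // Members that never rejoined are dropped, as in onCompleteJoin
  override def onComplete(): Unit = synchronized {
    completions += 1
    evicted = knownMembers -- joined
  }

  override def onExpiration(): Unit = expired = true
}
```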

// GroupCoordinator.scala (simplified)
def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean): Boolean = {
  group.inLock {
    if (group.hasAllMembersJoined) forceComplete()
    else false
  }
}

def onCompleteJoin(group: GroupMetadata): Unit = {
  group.inLock {
    // Evict dynamic members that did not rejoin before the rebalance timeout (static members are kept)
    val notYetRejoinedDynamicMembers = group.notYetRejoinedDynamicMembers
    if (notYetRejoinedDynamicMembers.nonEmpty) {
      notYetRejoinedDynamicMembers.values.foreach { member =>
        group.remove(member.memberId)
      }
    }

    if (group.is(Dead)) return

    // Increment generation ID: this is the "new epoch" that all members must use
    group.initNextGeneration()  // generationId++ and picks the protocol (assignor) all members support

    // Send JoinGroupResponse to all members
    group.allMembers.foreach { member =>
      val joinResult = JoinGroupResult(
        // Leader receives the full member list (including each member's subscription metadata)
        // Followers receive an empty list โ€” they just wait for their assignment
        members = if (member.memberId == group.leaderId.orNull)
          group.currentMemberMetadata
        else
          List.empty,
        memberId     = member.memberId,
        generationId = group.generationId,
        protocolType = group.protocolType,
        protocolName = group.protocolName,
        leaderId     = group.leaderId.orNull,
        error        = Errors.NONE
      )
      member.awaitingJoinCallback(joinResult)
      member.awaitingJoinCallback = null
    }

    // Transition to CompletingRebalance: waiting for leader's SyncGroup
    group.transitionTo(CompletingRebalance)
  }
}
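Starting a new generation also fixes the assignment strategy the whole group will use, which is the protocolName returned in every JoinGroupResult. In the Kafka source this happens in GroupMetadata.selectProtocol, which lets each member vote. The sketch below implements that voting rule standalone, with hypothetical names, and breaks ties arbitrarily. The real coordinator already rejects a joining member whose strategies share nothing with the group (INCONSISTENT_GROUP_PROTOCOL), so the empty case should not arise there.

```scala
object ProtocolSelection {
  // memberId -> supported assignment strategies, most preferred first
  type Preferences = Map[String, List[String]]

  // Only strategies supported by every member are candidates; each member votes
  // for its most-preferred candidate; the strategy with the most votes wins.
  def select(members: Preferences): Option[String] =
    if (members.isEmpty) None
    else {
      val candidates = members.values.map(_.toSet).reduce(_ intersect _)
      val votes = members.values.flatMap(_.find(candidates.contains)).toList
      if (votes.isEmpty) None
      else Some(votes.groupBy(identity).maxBy { case (_, vs) => vs.size }._1)
    }
}
```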

handleSyncGroup(): Distributing the Assignment

The leader consumer receives the JoinGroupResponse containing the full member list. It runs the partition assignment algorithm locally (Range, RoundRobin, Sticky, etc.) and submits the result in its SyncGroupRequest. Every follower also sends a SyncGroupRequest, with an empty assignment, and waits for its own share:

// GroupCoordinator.scala (simplified)
def handleSyncGroup(
  groupId: String,
  generation: Int,
  memberId: String,
  groupInstanceId: Option[String],
  protocolType: Option[String],
  protocolName: Option[String],
  groupAssignment: Map[String, Array[Byte]],  // only leader sends this; followers send empty
  responseCallback: SyncGroupResult => Unit,
  requestLocal: RequestLocal
): Unit = {

  groupManager.getGroup(groupId) match {
    case None =>
      responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))

    case Some(group) =>
      group.inLock {
        if (group.generationId != generation) {
          responseCallback(SyncGroupResult(Errors.ILLEGAL_GENERATION))
          return
        }

        group.currentState match {
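          case Empty =>
            responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))

          case Dead =>
            // Group was just unloaded or migrated: make the member rediscover its coordinator
            responseCallback(SyncGroupResult(Errors.COORDINATOR_NOT_AVAILABLE))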

          case PreparingRebalance =>
            responseCallback(SyncGroupResult(Errors.REBALANCE_IN_PROGRESS))

          case CompletingRebalance =>
            // Register this member's response callback
            group.get(memberId).foreach(_.awaitingSyncCallback = responseCallback)

            // Only the leader's SyncGroup carries the actual assignment
            if (memberId == group.leaderId.orNull) {
              // Persist the assignment to __consumer_offsets; once the write is acknowledged, the
              // storage callback answers every parked member and moves the group to Stable
              setAndPropagateAssignment(group, groupAssignment)
            }
            // Non-leader members: callback is registered, will fire when the leader's write completes

          case Stable =>
            // Assignment already distributed (e.g. a follower whose SyncGroup arrives after the
            // leader's assignment was stored): return this member's slice directly
            val assignment = group.get(memberId).map(_.assignment).getOrElse(Array.empty[Byte])
            responseCallback(SyncGroupResult(group.protocolType, group.protocolName, assignment, Errors.NONE))
        }
      }
  }
}
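The groupAssignment map is computed entirely on the client, by the leader. As an illustration, the sketch below applies the range strategy used by Kafka's RangeAssignor to a single topic. It sorts the member ids, splits the partitions into contiguous ranges, and gives the first numPartitions % numMembers members one extra partition. RangeSketch is a hypothetical name, and the real assignor repeats this per topic for every subscribed topic.

```scala
object RangeSketch {
  /** Range-style assignment of one topic's partitions; consumer ids are sorted first. */
  def assign(consumers: Seq[String], numPartitions: Int): Map[String, Seq[Int]] = {
    require(consumers.nonEmpty, "need at least one consumer")
    val sorted = consumers.sorted
    val perConsumer = numPartitions / sorted.size
    val withExtra = numPartitions % sorted.size
    sorted.zipWithIndex.map { case (consumer, i) =>
      // Earlier consumers absorb the remainder, one extra partition each
      val start = perConsumer * i + math.min(i, withExtra)
      val length = perConsumer + (if (i < withExtra) 1 else 0)
      consumer -> (start until start + length)
    }.toMap
  }
}
```

For two members and three partitions, this yields the {A:[p0,p1], B:[p2]} assignment used in the sequence diagram at the end of this chapter.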

setAndPropagateAssignment(): Persist and Distribute

// GroupCoordinator.scala (simplified)
private def setAndPropagateAssignment(
  group: GroupMetadata,
  assignment: Map[String, Array[Byte]]
): Unit = {
  // 1. Update each member's assignment field in GroupMetadata
  assignment.foreach { case (memberId, memberAssignment) =>
    group.get(memberId).foreach { member =>
      member.assignment = memberAssignment
    }
  }

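  // 2. Persist GroupMetadata (including the assignment) to __consumer_offsets
  val generationAtStore = group.generationId
  groupManager.storeGroup(group, assignment, error => {
    group.inLock {
      // A member may have joined while the write was in flight; act only if the group is
      // still completing the same generation, otherwise a newer rebalance owns the group
      if (group.is(CompletingRebalance) && group.generationId == generationAtStore) {
        if (error == Errors.NONE) {
          // 3. Fire pending SyncGroup callbacks: each member receives only its own assignment.
          //    Members whose SyncGroup has not arrived yet have no callback; they are answered
          //    later from the Stable branch of handleSyncGroup.
          group.allMembers.foreach { member =>
            if (member.awaitingSyncCallback != null) {
              member.awaitingSyncCallback(SyncGroupResult(
                group.protocolType,
                group.protocolName,
                member.assignment,   // each member gets only its own slice
                Errors.NONE
              ))
              member.awaitingSyncCallback = null
            }
          }
          group.transitionTo(Stable)
        } else {
          // The write failed: return the error to every waiting member and start over
          resetAndPropagateAssignmentError(group, error)
          prepareRebalance(group, s"Failed to store group assignment: $error")
        }
      }
    }
  })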
}
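The fan-out rule can be isolated as two pure functions. This is a hypothetical sketch, not Kafka code. Every current member gets its own slice, and a member the leader left out gets an empty assignment, which as far as we know matches what handleSyncGroup does in the Kafka source. Only members whose SyncGroup callback is already parked are answered right away.

```scala
object SyncFanOut {
  /** Every current member gets its own slice; members missing from the leader's map
    * get an empty assignment, and ids that are no longer members are ignored. */
  def slices(members: Set[String], leaderAssignment: Map[String, Array[Byte]]): Map[String, Array[Byte]] =
    members.iterator.map(id => id -> leaderAssignment.getOrElse(id, Array.empty[Byte])).toMap

  /** Answers members whose SyncGroup callback is parked; returns the ids still waiting. */
  def deliver(slices: Map[String, Array[Byte]],
              parked: Map[String, Array[Byte] => Unit]): Set[String] = {
    parked.foreach { case (id, callback) => slices.get(id).foreach(callback) }
    slices.keySet -- parked.keySet
  }
}
```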

handleHeartbeat(): Maintaining the Session

// GroupCoordinator.scala (simplified)
def handleHeartbeat(
  groupId: String,
  memberId: String,
  groupInstanceId: Option[String],
  generationId: Int,
  responseCallback: Errors => Unit
): Unit = {
  groupManager.getGroup(groupId) match {
    case None => responseCallback(Errors.UNKNOWN_MEMBER_ID)

    case Some(group) =>
      group.inLock {
        group.currentState match {
          case Dead =>
            responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
          case Empty =>
            responseCallback(Errors.UNKNOWN_MEMBER_ID)
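          case CompletingRebalance =>
            // Members may start heartbeating as soon as they receive their JoinGroup response,
            // so treat this as a normal heartbeat and reset the session timer
            group.get(memberId) match {
              case None => responseCallback(Errors.UNKNOWN_MEMBER_ID)
              case Some(_) if generationId != group.generationId =>
                responseCallback(Errors.ILLEGAL_GENERATION)
              case Some(member) =>
                completeAndScheduleNextHeartbeatExpiration(group, member)
                responseCallback(Errors.NONE)
            }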
          case PreparingRebalance =>
            group.get(memberId) match {
              case None => responseCallback(Errors.UNKNOWN_MEMBER_ID)
              case Some(member) =>
                completeAndScheduleNextHeartbeatExpiration(group, member)
                responseCallback(Errors.REBALANCE_IN_PROGRESS)  // still rebalancing
            }
          case Stable =>
            group.get(memberId) match {
              case None => responseCallback(Errors.UNKNOWN_MEMBER_ID)
              case Some(member) =>
                if (generationId != group.generationId) {
                  responseCallback(Errors.ILLEGAL_GENERATION)
                } else {
                  // Normal heartbeat: reset the session timer
                  completeAndScheduleNextHeartbeatExpiration(group, member)
                  responseCallback(Errors.NONE)
                }
            }
        }
      }
  }
}

Session timeout handling: when session.timeout.ms elapses without a heartbeat:

// GroupCoordinator.scala (simplified)
def onMemberHeartbeatExpired(group: GroupMetadata, member: MemberMetadata): Unit = {
  group.inLock {
    if (member.sessionTimeoutMs <= (time.milliseconds - member.latestHeartbeat)) {
      info(s"Member ${member.memberId} in group ${group.groupId} has failed " +
           s"(missed heartbeat), removing it from the group")
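      // Evict the member: in Stable or CompletingRebalance this triggers a rebalance,
      // in PreparingRebalance it may allow the pending join to complete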
      removeMemberAndUpdateGroup(group, member,
          s"removing member ${member.memberId} on heartbeat expiration")
    }
  }
}
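The expiry test above compares the time since the last heartbeat with the member's own session.timeout.ms. The tiny standalone model below captures that condition. SessionTimeouts and its members are hypothetical, and the 45000 ms timeout is only an example value.

```scala
object SessionTimeouts {
  final case class Member(id: String, sessionTimeoutMs: Long, latestHeartbeat: Long)

  // Same condition as onMemberHeartbeatExpired: expired once a full session
  // timeout has passed since the last heartbeat
  def isExpired(m: Member, now: Long): Boolean =
    m.sessionTimeoutMs <= now - m.latestHeartbeat

  def expiredMembers(members: Seq[Member], now: Long): Seq[String] =
    members.filter(isExpired(_, now)).map(_.id)
}
```

Expiry requires a full session timeout of silence, so clients heartbeat far more often. The consumer documentation recommends keeping heartbeat.interval.ms no higher than one third of session.timeout.ms.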

handleLeaveGroup(): Voluntary Departure

// GroupCoordinator.scala (simplified)
def handleLeaveGroup(
  groupId: String,
  memberIdentities: List[MemberIdentity],
  responseCallback: LeaveGroupResult => Unit
): Unit = {
  groupManager.getGroup(groupId) match {
    case None => responseCallback(LeaveGroupResult(Errors.UNKNOWN_MEMBER_ID, ...))

    case Some(group) =>
      group.inLock {
        val memberErrors = memberIdentities.map { identity =>
          group.get(identity.memberId) match {
            case None => MemberResponse(identity.memberId, Errors.UNKNOWN_MEMBER_ID)
            case Some(member) =>
              // Remove member and trigger rebalance immediately
              removeMemberAndUpdateGroup(group, member,
                  s"LeaveGroup received from member ${identity.memberId}")
              MemberResponse(identity.memberId, Errors.NONE)
          }
        }
        responseCallback(LeaveGroupResult(Errors.NONE, memberErrors))
      }
  }
}

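A voluntary leave triggers a rebalance immediately, rather than after session.timeout.ms expires. The consumer sends LeaveGroup from close(), which applications typically call from their own JVM shutdown hook. It also sends LeaveGroup when its poll loop exceeds max.poll.interval.ms. Static members (group.instance.id set, KIP-345) deliberately do not send LeaveGroup on close(), so a quick restart does not cause a rebalance at all.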

Offset Management: handleOffsetCommit() and handleOffsetFetch()

Consumer Groups store their committed offsets in the __consumer_offsets internal topic, managed by the Coordinator broker:

// GroupCoordinator.scala (simplified)
def handleOffsetCommit(
  groupId: String,
  generationId: Int,
  memberId: String,
  groupInstanceId: Option[String],
  offsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata],
  responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit,
  requestLocal: RequestLocal
): Unit = {

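  groupManager.getGroup(groupId) match {
    case Some(group) =>
      group.inLock {
        if (group.is(Dead)) {
          responseCallback(offsetMetadata.map { case (tp, _) => tp -> Errors.COORDINATOR_NOT_AVAILABLE })
        } else if (generationId < 0 && group.is(Empty)) {
          // The group only uses Kafka to store offsets (no group management): accept the commit
          groupManager.storeOffsets(group, memberId, groupInstanceId, offsetMetadata, responseCallback, requestLocal)
        } else if (group.get(memberId).isEmpty) {
          responseCallback(offsetMetadata.map { case (tp, _) => tp -> Errors.UNKNOWN_MEMBER_ID })
        } else if (generationId != group.generationId) {
          // Reject stale commits from members of an older generation
          responseCallback(offsetMetadata.map { case (tp, _) => tp -> Errors.ILLEGAL_GENERATION })
        } else if (group.is(CompletingRebalance)) {
          // The new assignment has not been distributed yet: the member must wait for it
          responseCallback(offsetMetadata.map { case (tp, _) => tp -> Errors.REBALANCE_IN_PROGRESS })
        } else {
          // Stable or PreparingRebalance: append to __consumer_offsets via ReplicaManager
          groupManager.storeOffsets(group, memberId, groupInstanceId, offsetMetadata, responseCallback, requestLocal)
        }
      }
    case None if generationId < 0 =>
      // No group management at all (e.g. manual assign()): create an Empty group to hold offsets
      val group = groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
      groupManager.storeOffsets(group, memberId, groupInstanceId, offsetMetadata, responseCallback, requestLocal)
    case None =>
      // Unknown group but a real generation: a stale request, reject it
      responseCallback(offsetMetadata.map { case (tp, _) => tp -> Errors.ILLEGAL_GENERATION })
  }
}
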
// GroupCoordinator.scala (simplified)
def handleOffsetFetch(
  groupId: String,
  partitions: Option[Seq[TopicPartition]],  // None = fetch all committed offsets
  requireStable: Boolean,
  responseCallback: Map[TopicPartition, OffsetFetchResponse.PartitionData] => Unit
): Unit = {
  groupManager.getGroup(groupId) match {
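    case None =>
      // Group not in the coordinator's cache: it has no committed offsets, so every requested
      // partition comes back with INVALID_OFFSET (-1). Nothing is read from disk here; while the
      // __consumer_offsets partition is still loading, the request fails earlier with
      // COORDINATOR_LOAD_IN_PROGRESS
      val groupOffsets = groupManager.getOffsets(groupId, partitions)
      responseCallback(groupOffsets)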

    case Some(group) =>
      group.inLock {
        if (group.is(Dead)) {
          responseCallback(partitions.getOrElse(Nil).map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap)
        } else {
          // Serve from the in-memory offset cache (loaded from __consumer_offsets when this broker
          // became leader of the group's partition)
          val offsets = groupManager.getOffsets(groupId, partitions)
          responseCallback(offsets)
        }
      }
  }
}

__consumer_offsets Data Format

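When an offset is committed, it is written as a key/value record. The key's version field also distinguishes record types: versions 0 and 1 are offset commits, while version 2 marks the group metadata record (members and their assignment) written by storeGroup():

Key:   [version][GroupId][TopicName][PartitionId]
Value: [version][Offset][LeaderEpoch][Metadata][CommitTimestamp]

This is the layout of value version 3 and later. ExpireTimestamp existed only in value version 1. It was dropped when offset retention became tied to the group becoming empty (KIP-211).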
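To make the key layout concrete, the hand-rolled codec below writes and reads an offset-commit key. It assumes the non-flexible key versions, in which a string is an int16 length followed by UTF-8 bytes. The coordinator itself uses Kafka's generated schema classes; OffsetKeyCodec is a hypothetical illustration only.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

object OffsetKeyCodec {
  final case class OffsetKey(group: String, topic: String, partition: Int)

  // Non-flexible STRING encoding: int16 length followed by UTF-8 bytes
  private def putString(buf: ByteBuffer, s: String): Unit = {
    val bytes = s.getBytes(UTF_8)
    buf.putShort(bytes.length.toShort)
    buf.put(bytes)
  }

  private def getString(buf: ByteBuffer): String = {
    val bytes = new Array[Byte](buf.getShort().toInt)
    buf.get(bytes)
    new String(bytes, UTF_8)
  }

  // [version:int16][group:string][topic:string][partition:int32]
  def encode(key: OffsetKey, version: Short = 1): Array[Byte] = {
    val g = key.group.getBytes(UTF_8).length
    val t = key.topic.getBytes(UTF_8).length
    val buf = ByteBuffer.allocate(2 + 2 + g + 2 + t + 4)
    buf.putShort(version)
    putString(buf, key.group)
    putString(buf, key.topic)
    buf.putInt(key.partition)
    buf.array()
  }

  def decode(bytes: Array[Byte]): (Short, OffsetKey) = {
    val buf = ByteBuffer.wrap(bytes)
    val version = buf.getShort()
    val group = getString(buf)
    val topic = getString(buf)
    (version, OffsetKey(group, topic, buf.getInt()))
  }
}
```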

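GroupMetadataManager loads the existing records of a __consumer_offsets partition into an in-memory cache whenever this broker becomes the leader of that partition. This happens at startup, for example, or after a leadership change moves coordination of those groups to this broker. Until loading finishes, requests for those groups are answered with COORDINATOR_LOAD_IN_PROGRESS. After that, OffsetFetch requests are served entirely from memory, with no disk I/O.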
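Which broker coordinates a group follows from the same topic. The group maps to one __consumer_offsets partition, and that partition's leader is the coordinator. The sketch below shows the mapping, assuming the formula in Kafka's GroupMetadataManager.partitionFor: the absolute value of the group id's hashCode modulo offsets.topic.num.partitions, which defaults to 50. CoordinatorLookup is a hypothetical name.

```scala
object CoordinatorLookup {
  // Same guard as Kafka's Utils.abs: math.abs(Int.MinValue) is still negative
  def abs(n: Int): Int = if (n == Int.MinValue) 0 else math.abs(n)

  // __consumer_offsets partition that owns the group; its leader is the coordinator
  def partitionFor(groupId: String, offsetsTopicPartitions: Int): Int =
    abs(groupId.hashCode) % offsetsTopicPartitions
}
```

The mapping depends on the partition count, which is why offsets.topic.num.partitions should not be changed after deployment.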

Complete Rebalance Sequence Diagram

Consumer A        Consumer B            GroupCoordinator
    │                  │                        │
    │──JoinGroup───────────────────────────────→│  state → PreparingRebalance
    │                  │──JoinGroup────────────→│  all members joined: DelayedJoin.tryComplete() → true
    │                  │                        │  onCompleteJoin(): generationId++, state → CompletingRebalance
    │←──JoinGroupResp(leader=A)─────────────────│  A gets the full member list
    │                  │←──JoinGroupResp────────│  B gets an empty member list
    │                  │                        │
    │                  │                        │  A runs the assignment algorithm locally
    │──SyncGroup({A:[p0,p1], B:[p2]})──────────→│
    │                  │──SyncGroup────────────→│  B's callback is parked
    │                  │                        │
    │                  │                        │  storeGroup() writes the assignment to __consumer_offsets
    │                  │                        │
    │←──SyncGroupResp([p0,p1])──────────────────│  state → Stable
    │                  │←──SyncGroupResp([p2])──│
    │                  │                        │
    │──Heartbeat───────────────────────────────→│
    │←──Heartbeat OK (Errors.NONE)──────────────│

This state machine explains why, under the eager rebalance protocol, the entire Consumer Group stops consuming during a rebalance. Each consumer revokes all of its partitions before sending JoinGroup and receives new ones only in its SyncGroupResponse, and neither phase can finish until every member has taken part. This is the motivation for the incremental cooperative rebalance protocol (KIP-429, used by the CooperativeStickyAssignor). Consumers keep their partitions while rebalancing, and only the partitions that actually have to move are revoked. A follow-up rebalance then hands those partitions to their new owners, so consumers whose assignments are unchanged keep consuming throughout.
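The difference between the two protocols comes down to which partitions each consumer gives up. The hypothetical sketch below (not the CooperativeStickyAssignor itself) contrasts them. Eager revokes everything a member owns, while the cooperative rule revokes only owned partitions that the new target assignment hands to someone else.

```scala
object CooperativeSketch {
  // memberId -> partitions of a single topic
  type Assignment = Map[String, Set[Int]]

  // Eager: every member gives up everything it owns before rejoining
  def eagerRevocations(owned: Assignment): Assignment = owned

  // Cooperative: revoke only owned partitions the target gives to another member
  def cooperativeRevocations(owned: Assignment, target: Assignment): Assignment =
    owned.map { case (member, parts) => member -> (parts -- target.getOrElse(member, Set.empty[Int])) }
}
```

In the example below, only partitions 2 and 5 stop being consumed. Member C receives them in the follow-up rebalance, after A and B have revoked them.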
