Group Coordinator Source: The Complete Rebalance Flow
What a Consumer Group Really Is
A Consumer Group is Kafka's mechanism for load-balanced subscription: the consumers in a group jointly consume all partitions of the topics they subscribe to, and each partition is assigned to exactly one consumer in the group at a time. When the set of consumers or the number of partitions changes, the assignment must be recomputed, a process called Rebalance.
Rebalance is one of Kafka's most intricate protocols. It is driven by a carefully designed state machine, requiring coordination between GroupCoordinator (broker side) and ConsumerCoordinator (client side). This chapter dives into GroupCoordinator.scala to trace the complete rebalance lifecycle.
GroupMetadata: The Group's In-Memory State
Every Consumer Group has a corresponding GroupMetadata object on its Coordinator broker:
// GroupMetadata.scala (core/src/main/scala/kafka/coordinator/group/, simplified)
class GroupMetadata(val groupId: String, initialState: GroupState, time: Time) {
// Current state of the group (state machine)
private var state: GroupState = initialState
// All active members (memberId -> MemberMetadata)
private val members = new mutable.HashMap[String, MemberMetadata]
// Generation ID: incremented on every rebalance, used to reject stale requests
var generationId: Int = 0
// Partition assignment sent to members in the SyncGroup phase
// memberId -> serialized partition list (protocol-specific bytes)
private var assignment: Map[String, Array[Byte]] = Map.empty
// The leader member โ the one that runs the assignment algorithm
var leaderId: Option[String] = None
}
GroupState Enum
A group can be in one of five states:
sealed trait GroupState
case object Empty extends GroupState // group exists but has no active members
case object PreparingRebalance extends GroupState // waiting for all members to re-join
case object CompletingRebalance extends GroupState // waiting for leader to submit assignment
case object Stable extends GroupState // running normally, partitions assigned
case object Dead extends GroupState // group has been deleted
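The five states only make sense together with the transitions allowed between them. The following is a minimal, self-contained sketch of that rule set; the object name `GroupStateSketch` and the helper `canTransition` are hypothetical, and the transition table is an assumption modeled on the flow traced in this chapter rather than a copy of the source.

```scala
// Hypothetical sketch: names mirror the chapter's states, but this is not the Kafka source.
object GroupStateSketch {
  sealed trait GroupState
  case object Empty extends GroupState
  case object PreparingRebalance extends GroupState
  case object CompletingRebalance extends GroupState
  case object Stable extends GroupState
  case object Dead extends GroupState

  // For each target state, the states it may be entered from (assumed table).
  def validPreviousStates(target: GroupState): Set[GroupState] = target match {
    case Empty               => Set(PreparingRebalance)
    case PreparingRebalance  => Set(Empty, CompletingRebalance, Stable)
    case CompletingRebalance => Set(PreparingRebalance)
    case Stable              => Set(CompletingRebalance)
    case Dead                => Set(Empty, PreparingRebalance, CompletingRebalance, Stable, Dead)
  }

  // A transition is legal only if the current state is listed for the target.
  def canTransition(from: GroupState, to: GroupState): Boolean =
    validPreviousStates(to).contains(from)
}
```

Under these assumptions a Stable group can only leave that state by starting a new rebalance or by dying: `canTransition(Stable, PreparingRebalance)` is true, while `canTransition(Stable, CompletingRebalance)` is false.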
MemberMetadata: A Single Member's State
// MemberMetadata.scala (simplified)
class MemberMetadata(
val memberId: String, // unique ID assigned by the Coordinator
val groupId: String,
val clientId: String, // client.id config of the consumer
val clientHost: String, // consumer's IP address
val rebalanceTimeoutMs: Int, // max.poll.interval.ms
val sessionTimeoutMs: Int, // session.timeout.ms
val protocolType: String, // "consumer"
var supportedProtocols: List[(String, Array[Byte])] // supported assignment strategies + subscription metadata
) {
// Serialized partition assignment for this member (received from SyncGroup)
var assignment: Array[Byte] = Array.empty
// Pending JoinGroup response callback (held by DelayedJoin)
var awaitingJoinCallback: JoinGroupResult => Unit = null
// Pending SyncGroup response callback
var awaitingSyncCallback: SyncGroupResult => Unit = null
var latestHeartbeat: Long = -1
}
handleJoinGroup(): The Rebalance Trigger
After a consumer calls subscribe(), its internal ConsumerCoordinator sends a JoinGroupRequest to the Coordinator broker. This is the entry point for every rebalance:
// GroupCoordinator.scala (core/, simplified)
def handleJoinGroup(
groupId: String,
memberId: String, // empty string on first join
groupInstanceId: Option[String], // static member ID (KIP-345)
requireKnownMemberId: Boolean,
clientId: String,
clientHost: String,
rebalanceTimeoutMs: Int,
sessionTimeoutMs: Int,
protocolType: String,
protocols: List[(String, Array[Byte])],
responseCallback: JoinGroupResult => Unit,
requestLocal: RequestLocal
): Unit = {
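// The group is created only for a first-time join (empty memberId);
// a known memberId for a group that does not exist is rejected
groupManager.getOrMaybeCreateGroup(groupId, memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) match {
case None =>
responseCallback(JoinGroupResult(memberId, error = Errors.UNKNOWN_MEMBER_ID))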
case Some(group) =>
group.inLock {
if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs ||
sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) {
responseCallback(JoinGroupResult(memberId, error = Errors.INVALID_SESSION_TIMEOUT))
return
}
group.currentState match {
case Dead =>
responseCallback(JoinGroupResult(memberId, error = Errors.COORDINATOR_NOT_AVAILABLE))
case Empty =>
// First member joining an empty group: trigger rebalance
addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, memberId, groupInstanceId,
clientId, clientHost, protocolType, protocols, group, responseCallback)
case PreparingRebalance =>
if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
// Brand-new member: assign it a memberId, add to group
addMemberAndRebalance(...)
} else {
// Known member re-joining during rebalance
updateMemberAndRebalance(group, memberId, protocols, responseCallback)
}
case CompletingRebalance =>
if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
// New member arrives while waiting for SyncGroup: restart rebalance
addMemberAndRebalance(...)
} else {
// Known member rejoins: abort current SyncGroup phase, restart rebalance
prepareRebalance(group, s"Rebalance triggered by member join: $memberId")
updateMemberAndRebalance(group, memberId, protocols, responseCallback)
}
case Stable =>
group.get(memberId) match {
case None =>
// New member joining a stable group: trigger rebalance
addMemberAndRebalance(...)
case Some(existingMember) =>
if (memberId == group.leaderId.orNull || !existingMember.matchesProtocols(protocols)) {
// The leader rejoining always forces a rebalance (this is how it reacts to
// metadata changes), and so does any member whose protocols changed
updateMemberAndRebalance(group, memberId, protocols, responseCallback)
} else {
// Follower rejoining with unchanged protocols: no rebalance needed,
// return the current generation info directly
responseCallback(JoinGroupResult.forStableGroup(group, existingMember))
}
}
}
}
}
}
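The session-timeout guard at the top of handleJoinGroup() can be looked at in isolation. This is a minimal sketch, assuming a hypothetical `GroupConfig` case class and result type; the bounds used in the usage note are illustrative values, not broker defaults taken from this chapter.

```scala
object SessionTimeoutSketch {
  // Hypothetical stand-in for the broker-side group configuration.
  final case class GroupConfig(groupMinSessionTimeoutMs: Int, groupMaxSessionTimeoutMs: Int)

  sealed trait JoinCheck
  case object Accepted extends JoinCheck
  case object InvalidSessionTimeout extends JoinCheck

  // Mirrors the range check in the listing: the requested timeout must lie within [min, max].
  def validate(sessionTimeoutMs: Int, config: GroupConfig): JoinCheck =
    if (sessionTimeoutMs < config.groupMinSessionTimeoutMs ||
        sessionTimeoutMs > config.groupMaxSessionTimeoutMs) InvalidSessionTimeout
    else Accepted
}
```

With an assumed `GroupConfig(6000, 300000)`, a consumer asking for a 1-second session timeout is rejected before any group state is touched, while a 10-second timeout is accepted. Both bounds are inclusive.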
prepareRebalance(): Entering PreparingRebalance
// GroupCoordinator.scala (simplified)
private def prepareRebalance(group: GroupMetadata, reason: String): Unit = {
// If we are in CompletingRebalance, cancel pending SyncGroup responses
if (group.is(CompletingRebalance)) {
resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)
}
// Create the appropriate DelayedJoin timer
val delayedRebalance = if (group.is(Empty))
new InitialDelayedJoin(this, joinPurgatory, group, groupConfig, ...)
else
new DelayedJoin(this, group, group.rebalanceTimeoutMs)
group.transitionTo(PreparingRebalance)
info(s"Preparing to rebalance group ${group.groupId} in state ${group.currentState} " +
s"with old generation ${group.generationId} (reason: $reason)")
// Park DelayedJoin in Purgatory: it fires when all members have joined or the timeout expires
val groupKey = GroupJoinKey(group.groupId)
joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
}
DelayedJoin: Waiting for All Members
// DelayedJoin.scala (simplified)
class DelayedJoin(
coordinator: GroupCoordinator,
group: GroupMetadata,
rebalanceTimeout: Long
) extends DelayedOperation(rebalanceTimeout) {
// Completion condition: all currently known members have re-sent JoinGroupRequest
override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete)
override def onExpiration(): Unit = coordinator.onExpireJoin()
// Called when all members have joined or timeout fires
override def onComplete(): Unit = coordinator.onCompleteJoin(group)
}
// GroupCoordinator.scala (simplified)
def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean): Boolean = {
group.inLock {
if (group.hasAllMembersJoined) forceComplete()
else false
}
}
def onCompleteJoin(group: GroupMetadata): Unit = {
group.inLock {
// Evict members that never sent a JoinGroup request (timed out)
val notYetRejoinedDynamicMembers = group.notYetRejoinedDynamicMembers
if (notYetRejoinedDynamicMembers.nonEmpty) {
notYetRejoinedDynamicMembers.values.foreach { member =>
group.remove(member.memberId)
}
}
if (group.is(Dead)) return
// Increment generation ID: this is the new epoch that all members must use
group.initNextGeneration() // generationId += 1, selects the protocol for the new generation
// Send JoinGroupResponse to all members
group.allMembers.foreach { member =>
val joinResult = JoinGroupResult(
// Leader receives the full member list (including each member's subscription metadata)
// Followers receive an empty list: they just wait for their assignment
members = if (member.memberId == group.leaderId.orNull)
group.currentMemberMetadata
else
List.empty,
memberId = member.memberId,
generationId = group.generationId,
protocolType = group.protocolType,
protocolName = group.protocolName,
leaderId = group.leaderId.orNull,
error = Errors.NONE
)
member.awaitingJoinCallback(joinResult)
member.awaitingJoinCallback = null
}
// Transition to CompletingRebalance: waiting for leader's SyncGroup
group.transitionTo(CompletingRebalance)
}
}
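The completion step above boils down to three actions: evict members that never rejoined, bump the generation, and hand the full member list to the leader only. A minimal sketch of that logic follows. `Member`, `JoinResponse` and `completeJoin` are hypothetical names, and the rule "first survivor becomes leader if the old leader was evicted" is an assumption of this sketch.

```scala
object JoinCompletionSketch {
  // `rejoined` stands for "this member has a pending JoinGroup callback".
  final case class Member(memberId: String, subscription: Set[String], rejoined: Boolean)
  final case class JoinResponse(memberId: String, generationId: Int, leaderId: String,
                                members: List[Member])

  // Returns the new generation and one response per surviving member.
  def completeJoin(members: List[Member], leaderId: String,
                   generationId: Int): (Int, List[JoinResponse]) = {
    // Evict members that never re-sent JoinGroup before the timeout.
    val survivors = members.filter(_.rejoined)
    // Assumption: keep the old leader if it survived, otherwise pick the first survivor.
    val newLeader = survivors.find(_.memberId == leaderId)
      .orElse(survivors.headOption).map(_.memberId).getOrElse("")
    val nextGeneration = generationId + 1
    val responses = survivors.map { m =>
      // Only the leader sees everyone's subscription; followers get an empty list.
      val visible = if (m.memberId == newLeader) survivors else List.empty[Member]
      JoinResponse(m.memberId, nextGeneration, newLeader, visible)
    }
    (nextGeneration, responses)
  }
}
```

For members A and B that rejoined and a member C that did not, with A as leader at generation 4, the sketch yields generation 5, a response for A listing both A and B, and a response for B with an empty member list. C receives nothing because it has been evicted.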
handleSyncGroup(): Distributing the Assignment
The leader consumer receives the JoinGroupResponse containing the full member list. It runs the partition assignment algorithm locally (Round Robin, Range, Sticky, etc.) and submits the result via SyncGroupRequest:
// GroupCoordinator.scala (simplified)
def handleSyncGroup(
groupId: String,
generation: Int,
memberId: String,
groupInstanceId: Option[String],
protocolType: Option[String],
protocolName: Option[String],
groupAssignment: Map[String, Array[Byte]], // only leader sends this; followers send empty
responseCallback: SyncGroupResult => Unit,
requestLocal: RequestLocal
): Unit = {
groupManager.getGroup(groupId) match {
case None =>
responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
case Some(group) =>
group.inLock {
if (group.generationId != generation) {
responseCallback(SyncGroupResult(Errors.ILLEGAL_GENERATION))
return
}
group.currentState match {
case Empty | Dead =>
responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
case PreparingRebalance =>
responseCallback(SyncGroupResult(Errors.REBALANCE_IN_PROGRESS))
case CompletingRebalance =>
// Register this member's response callback
group.get(memberId).foreach(_.awaitingSyncCallback = responseCallback)
// Only the leader's SyncGroup carries the actual assignment
if (memberId == group.leaderId.orNull) {
// Persist assignment to __consumer_offsets, then respond to everyone.
// The transition to Stable happens inside setAndPropagateAssignment, once the write succeeds
setAndPropagateAssignment(group, groupAssignment)
}
// Non-leader members: callback is registered, will fire when leader's call completes
case Stable =>
// Group is already stable (consumer restarted and re-synced)
val assignment = group.assignment(memberId)
responseCallback(SyncGroupResult(protocolType, protocolName, assignment, Errors.NONE))
}
}
}
}
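The leader-side step that precedes the SyncGroupRequest, running the assignment algorithm locally, can be illustrated with a range-style split of one topic. This is a simplified sketch for illustration, not the client's actual assignor code. `LeaderAssignmentSketch` and `range` are hypothetical names, and the sketch handles a single topic with partitions numbered from 0.

```scala
object LeaderAssignmentSketch {
  // Splits partitions 0 until numPartitions into contiguous ranges over the sorted
  // member ids; the first (numPartitions % members) members get one extra partition.
  def range(memberIds: Seq[String], numPartitions: Int): Map[String, List[Int]] = {
    val sorted = memberIds.sorted
    if (sorted.isEmpty) Map.empty
    else {
      val perMember = numPartitions / sorted.size
      val extra = numPartitions % sorted.size
      sorted.zipWithIndex.map { case (id, i) =>
        val start = i * perMember + math.min(i, extra)
        val length = perMember + (if (i < extra) 1 else 0)
        id -> (start until start + length).toList
      }.toMap
    }
  }
}
```

For two members and three partitions, `range(Seq("B", "A"), 3)` gives A the partitions 0 and 1 and B the partition 2. The leader would serialize each member's list and send the whole map as `groupAssignment`.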
setAndPropagateAssignment(): Persist and Distribute
// GroupCoordinator.scala (simplified)
private def setAndPropagateAssignment(
group: GroupMetadata,
assignment: Map[String, Array[Byte]]
): Unit = {
// 1. Update each member's assignment field in GroupMetadata
assignment.foreach { case (memberId, memberAssignment) =>
group.get(memberId).foreach { member =>
member.assignment = memberAssignment
}
}
// 2. Persist GroupMetadata (including the assignment) to __consumer_offsets
groupManager.storeGroup(group, assignment, error => {
group.inLock {
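if (group.is(CompletingRebalance) && error == Errors.NONE) {
// 3. Fire all pending SyncGroup callbacks: each member receives only its own assignment
group.allMembers.foreach { member =>
// Members that have not sent SyncGroup yet have no callback registered
if (member.awaitingSyncCallback != null) {
member.awaitingSyncCallback(SyncGroupResult(
group.protocolType,
group.protocolName,
member.assignment, // each member gets only its own slice
Errors.NONE
))
member.awaitingSyncCallback = null
}
}
group.transitionTo(Stable)
} else if (group.is(CompletingRebalance)) {
// Persisting failed: fail the pending SyncGroup callbacks and rebalance again
resetAndPropagateAssignmentError(group, error)
prepareRebalance(group, s"Error $error when storing group assignment")
}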
}
})
}
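The "each member gets only its own slice" rule can be shown without any coordinator machinery. A minimal sketch follows. `PropagateAssignmentSketch` and `slices` are hypothetical names, and treating members that the leader left out as having an empty assignment is an assumption of this sketch.

```scala
object PropagateAssignmentSketch {
  // Every pending member receives only its own bytes from the leader's map.
  // Assumption: a member missing from the leader's map gets an empty assignment.
  def slices(pendingMembers: Set[String],
             leaderAssignment: Map[String, Array[Byte]]): Map[String, Array[Byte]] =
    pendingMembers.map { id =>
      id -> leaderAssignment.getOrElse(id, Array.empty[Byte])
    }.toMap
}
```

If the leader submits bytes for A only and both A and B are waiting, A receives its bytes and B receives an empty array. An entry in the leader's map for a member that is not waiting is ignored.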
handleHeartbeat(): Maintaining the Session
// GroupCoordinator.scala (simplified)
def handleHeartbeat(
groupId: String,
memberId: String,
groupInstanceId: Option[String],
generationId: Int,
responseCallback: Errors => Unit
): Unit = {
groupManager.getGroup(groupId) match {
case None => responseCallback(Errors.UNKNOWN_MEMBER_ID)
case Some(group) =>
group.inLock {
group.currentState match {
case Dead =>
responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
case Empty =>
responseCallback(Errors.UNKNOWN_MEMBER_ID)
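case CompletingRebalance =>
// Members may start heartbeating right after the JoinGroup response.
// Recent broker versions treat this as a normal heartbeat and reset the timer;
// older versions answered REBALANCE_IN_PROGRESS here
group.get(memberId) match {
case None => responseCallback(Errors.UNKNOWN_MEMBER_ID)
case Some(member) =>
completeAndScheduleNextHeartbeatExpiration(group, member)
responseCallback(Errors.NONE)
}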
case PreparingRebalance =>
group.get(memberId) match {
case None => responseCallback(Errors.UNKNOWN_MEMBER_ID)
case Some(member) =>
completeAndScheduleNextHeartbeatExpiration(group, member)
responseCallback(Errors.REBALANCE_IN_PROGRESS) // still rebalancing
}
case Stable =>
group.get(memberId) match {
case None => responseCallback(Errors.UNKNOWN_MEMBER_ID)
case Some(member) =>
if (generationId != group.generationId) {
responseCallback(Errors.ILLEGAL_GENERATION)
} else {
// Normal heartbeat: reset the session timer
completeAndScheduleNextHeartbeatExpiration(group, member)
responseCallback(Errors.NONE)
}
}
}
}
}
}
Session timeout handling, when session.timeout.ms elapses without a heartbeat:
// GroupCoordinator.scala (simplified)
def onMemberHeartbeatExpired(group: GroupMetadata, member: MemberMetadata): Unit = {
group.inLock {
if (member.sessionTimeoutMs <= (time.milliseconds - member.latestHeartbeat)) {
info(s"Member ${member.memberId} in group ${group.groupId} has failed " +
s"(missed heartbeat), removing it from the group")
// Evict the member and trigger a rebalance
removeMemberAndUpdateGroup(group, member,
s"removing member ${member.memberId} on heartbeat expiration")
}
}
}
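The expiry condition is a simple comparison between the member's session timeout and the time since its last heartbeat. A minimal sketch with an explicit clock value makes the boundary visible. `SessionExpirySketch`, its `Member` class and `partitionByLiveness` are hypothetical names introduced for illustration.

```scala
object SessionExpirySketch {
  final case class Member(memberId: String, sessionTimeoutMs: Int, latestHeartbeatMs: Long)

  // Same comparison as in the listing: expired once the silence is at least the timeout.
  def isExpired(member: Member, nowMs: Long): Boolean =
    nowMs - member.latestHeartbeatMs >= member.sessionTimeoutMs

  // Splits a group into (still alive, to be evicted) at time nowMs.
  def partitionByLiveness(members: List[Member], nowMs: Long): (List[Member], List[Member]) =
    members.partition(m => !isExpired(m, nowMs))
}
```

With A using a 10-second timeout and B a 45-second timeout, both last heard from at t = 1000 ms, checking at t = 12000 ms evicts A and keeps B. Evicting A is what then triggers the rebalance for the remaining members.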
handleLeaveGroup(): Voluntary Departure
// GroupCoordinator.scala (simplified)
def handleLeaveGroup(
groupId: String,
memberIdentities: List[MemberIdentity],
responseCallback: LeaveGroupResult => Unit
): Unit = {
groupManager.getGroup(groupId) match {
case None => responseCallback(LeaveGroupResult(Errors.UNKNOWN_MEMBER_ID, ...))
case Some(group) =>
group.inLock {
val memberErrors = memberIdentities.map { identity =>
group.get(identity.memberId) match {
case None => MemberResponse(identity.memberId, Errors.UNKNOWN_MEMBER_ID)
case Some(member) =>
// Remove member and trigger rebalance immediately
removeMemberAndUpdateGroup(group, member,
s"LeaveGroup received from member ${identity.memberId}")
MemberResponse(identity.memberId, Errors.NONE)
}
}
responseCallback(LeaveGroupResult(Errors.NONE, memberErrors))
}
}
}
A voluntary leave triggers a rebalance immediately, which is faster than waiting for session.timeout.ms to expire. The consumer client sends LeaveGroup from close(), so applications typically call close() from a JVM shutdown hook.
Offset Management: handleOffsetCommit() and handleOffsetFetch()
Consumer Groups store their committed offsets in the __consumer_offsets internal topic, managed by the Coordinator broker:
// GroupCoordinator.scala (simplified)
def handleOffsetCommit(
groupId: String,
generationId: Int,
memberId: String,
groupInstanceId: Option[String],
offsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit,
requestLocal: RequestLocal
): Unit = {
groupManager.getGroup(groupId) match {
case Some(group) =>
group.inLock {
// Validate generation ID to reject stale commits from old generations
if (generationId != group.generationId && generationId != OffsetCommitRequest.DEFAULT_GENERATION_ID) {
responseCallback(offsetMetadata.map { case (tp, _) => tp -> Errors.ILLEGAL_GENERATION })
return
}
// Append to __consumer_offsets via ReplicaManager
groupManager.storeOffsets(group, memberId, groupInstanceId, offsetMetadata, responseCallback, requestLocal)
}
case None =>
// No active group: could be a standalone offset store (no rebalance involvement)
groupManager.storeOffsets(GroupMetadata.loadingGroup(groupId), memberId, None, offsetMetadata, responseCallback, requestLocal)
}
}
// GroupCoordinator.scala (simplified)
def handleOffsetFetch(
groupId: String,
partitions: Option[Seq[TopicPartition]], // None = fetch all committed offsets
requireStable: Boolean,
responseCallback: Map[TopicPartition, OffsetFetchResponse.PartitionData] => Unit
): Unit = {
groupManager.getGroup(groupId) match {
case None =>
// Group is not in the coordinator's cache: getOffsets reports "no committed offset" for the requested partitions
val groupOffsets = groupManager.getOffsets(groupId, partitions)
responseCallback(groupOffsets)
case Some(group) =>
group.inLock {
if (group.is(Dead)) {
responseCallback(partitions.getOrElse(Nil).map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap)
} else {
// Serve from in-memory offset cache (populated from __consumer_offsets on startup)
val offsets = groupManager.getOffsets(groupId, partitions)
responseCallback(offsets)
}
}
}
}
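The two handlers above combine generation fencing on the write path with a cache lookup on the read path. The following sketch models both with an in-memory map. `OffsetCacheSketch`, `GroupOffsets` and the result types are hypothetical, and the sentinel values are stand-ins chosen for this example.

```scala
import scala.collection.mutable

object OffsetCacheSketch {
  sealed trait CommitResult
  case object Committed extends CommitResult
  case object IllegalGeneration extends CommitResult

  val NoGeneration = -1    // stand-in for the "no generation" sentinel of standalone commits
  val InvalidOffset = -1L  // stand-in returned when nothing has been committed

  final class GroupOffsets(var generationId: Int) {
    private val offsets = mutable.Map.empty[(String, Int), Long]

    // Rejects commits from a stale generation; standalone commits bypass the check.
    def commit(generation: Int, topic: String, partition: Int, offset: Long): CommitResult =
      if (generation != generationId && generation != NoGeneration) IllegalGeneration
      else {
        offsets((topic, partition)) = offset
        Committed
      }

    // Served from memory only.
    def fetch(topic: String, partition: Int): Long =
      offsets.getOrElse((topic, partition), InvalidOffset)
  }
}
```

A member still on generation 4 that tries to commit after the group has moved to generation 5 gets `IllegalGeneration`, and the previously committed offset stays untouched. This is the fencing that keeps a consumer that lost its partition from overwriting the new owner's progress.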
__consumer_offsets Data Format
When an offset is committed, it is written as a key/value record:
Key: [version][GroupId][TopicName][PartitionId]
Value: [version][Offset][LeaderEpoch][Metadata][CommitTimestamp] (older value versions have no LeaderEpoch; version 1 additionally carried an [ExpireTimestamp])
GroupMetadataManager loads existing offset records from __consumer_offsets into an in-memory cache when the broker becomes the Coordinator for a partition of that topic (at startup, or when leadership of the partition moves to it). Subsequent OffsetFetch requests are served entirely from memory, with no disk I/O.
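Loading the cache amounts to replaying the log in order and keeping the latest value per key. A minimal sketch of that replay follows. `OffsetReplaySketch`, `OffsetKey` and `OffsetRecord` are hypothetical types, and modeling a deleted offset as a record with no value (a tombstone) is an assumption that goes beyond what this chapter shows.

```scala
object OffsetReplaySketch {
  final case class OffsetKey(groupId: String, topic: String, partition: Int)
  // value = None models a tombstone, i.e. a record whose value is null (assumption).
  final case class OffsetRecord(key: OffsetKey, value: Option[Long])

  // Replays records oldest-first; later records for the same key win.
  def replay(log: Seq[OffsetRecord]): Map[OffsetKey, Long] =
    log.foldLeft(Map.empty[OffsetKey, Long]) { (cache, record) =>
      record.value match {
        case Some(offset) => cache.updated(record.key, offset)
        case None         => cache - record.key
      }
    }
}
```

Replaying commits of 10 and then 25 for the same key leaves 25 in the cache, and a tombstone for another key removes it entirely. Because only the last record per key matters, the topic can be log-compacted without changing what the replay produces.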
Complete Rebalance Sequence Diagram
Consumer A            Consumer B            GroupCoordinator
    |                     |                        |
    |---JoinGroup--------------------------------->|  state -> PreparingRebalance
    |                     |---JoinGroup----------->|  (all members registered, DelayedJoin ticking)
    |                     |                        |
    |        (all members joined -> DelayedJoin.tryComplete() -> true)
    |                     |                        |  generationId++, state -> CompletingRebalance
    |<--JoinGroupResponse(Leader=A)----------------|  A gets full member list
    |                     |<--JoinGroupResp--------|  B gets empty list
    |                     |                        |
    |   [A runs assignment algorithm locally]      |
    |---SyncGroup(assignment={A:[p0,p1], B:[p2]})->|
    |                     |---SyncGroup----------->|  (B registers callback)
    |                     |                        |
    |              (storeGroup to __consumer_offsets)
    |                     |                        |
    |<--SyncGroupResp([p0, p1])--------------------|  state -> Stable
    |                     |<--SyncGroupResp([p2])--|
    |                     |                        |
    |---Heartbeat--------------------------------->|
    |<--Heartbeat OK (Errors.NONE)-----------------|
Understanding this state machine explains why the entire Consumer Group stops consuming during a rebalance: all consumers are blocked waiting for JoinGroupResponse and SyncGroupResponse, and both require all members to participate. This is precisely the motivation for the Cooperative Sticky Assignor (incremental rebalance, KIP-429), which only reassigns partitions that genuinely need to move, allowing consumers whose assignments are unchanged to keep consuming throughout the rebalance.
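The difference between the two approaches can be made concrete by comparing which partitions each member has to give up. This is a minimal sketch under simplifying assumptions: `RevocationSketch` and both function names are hypothetical, partitions are plain integers, and the new assignment is taken as given rather than computed by a real assignor.

```scala
object RevocationSketch {
  type Assignment = Map[String, Set[Int]]

  // Stop-the-world rebalance: every member gives up everything it owned.
  def eagerRevocations(old: Assignment): Assignment = old

  // Incremental rebalance: a member gives up only the partitions it owned
  // that the new assignment places elsewhere.
  def cooperativeRevocations(old: Assignment, next: Assignment): Assignment =
    old.map { case (member, owned) =>
      member -> (owned -- next.getOrElse(member, Set.empty[Int]))
    }
}
```

Suppose A owns partitions 0, 1, 2 and B owns 3, 4, 5, and a new consumer C joins so that the new assignment is A: 0, 1; B: 3, 4; C: 2, 5. The stop-the-world variant revokes all six partitions, while the incremental variant revokes only partition 2 from A and partition 5 from B, so the other four partitions keep being consumed during the rebalance.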