Chapter 9

Consumer Group Protocol: Join, Sync and Heartbeat

A Kafka Consumer Group lets multiple consumer instances collaboratively consume one or more topics, with partition assignments automatically balanced across members and rebalanced as the group membership changes. The machinery that makes this work is a precisely defined distributed protocol — a sequence of explicit network requests that every Kafka client library implements. Understanding this protocol at the message level tells you why rebalances halt consumption, why heartbeat timeouts evict members, and why CooperativeStickyAssignor can rebalance without stopping the world.

Finding the GroupCoordinator

Before any group-level operation can happen, the consumer must discover which broker is acting as its GroupCoordinator. The answer is deterministic:

coordinator_partition = abs(hash(groupId)) % numPartitions(__consumer_offsets)

Here hash is Java's String.hashCode() of the groupId. With the default of 50 __consumer_offsets partitions (offsets.topic.num.partitions), every groupId maps to exactly one partition. The broker that is the leader of that partition is the GroupCoordinator for that group.
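Here is a minimal Java sketch of the formula above. It assumes, as the broker does, that hash is String.hashCode() and that abs() maps Integer.MIN_VALUE to 0, because plain Math.abs would leave that one value negative. The class and method names are hypothetical, not Kafka API.

```java
// Hypothetical helper mirroring coordinator_partition = abs(hash(groupId)) % numPartitions.
final class CoordinatorLookup {
    private CoordinatorLookup() {}

    // Non-negative absolute value. Math.abs(Integer.MIN_VALUE) is still negative,
    // so that single value is mapped to 0 instead.
    static int safeAbs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    // Index of the __consumer_offsets partition whose leader coordinates this group.
    static int partitionFor(String groupId, int offsetsTopicPartitions) {
        return safeAbs(groupId.hashCode()) % offsetsTopicPartitions;
    }

    public static void main(String[] args) {
        for (String group : new String[] {"order-consumers", "billing-consumers"}) {
            System.out.println(group + " -> __consumer_offsets-" + partitionFor(group, 50));
        }
    }
}
```

Clients never compute this themselves; they ask a broker through FindCoordinatorRequest. Because the mapping depends on the partition count, the number of __consumer_offsets partitions should not be changed once groups exist.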

Why Co-locate the Coordinator with Offset Storage?

This design is intentional. The GroupCoordinator and the offset commit log for the group live on the same broker, allowing:

  1. No extra hop for offset commits: the coordinator appends commits to a __consumer_offsets partition it already leads, so they follow that partition's normal replication path instead of being forwarded to another broker
  2. Automatic co-migration: when the __consumer_offsets partition leader changes (due to a broker restart or failure), the coordinator role and the offset log move together, maintaining consistency
  3. No split-brain: there is exactly one coordinator per group at any moment, because coordinatorship is tied to partition leadership
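
// Internal client request classes, shown for illustration only: applications never build
// these directly, because KafkaConsumer sends FindCoordinator on its own.
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;

// Consumer startup: discover the coordinator
// Step 1: send FindCoordinatorRequest to any broker
FindCoordinatorRequest request = new FindCoordinatorRequest.Builder(
    new FindCoordinatorRequestData()
        .setKey(groupId)                          // the group to look up
        .setKeyType(CoordinatorType.GROUP.id())   // GROUP, as opposed to TRANSACTION
).build();

// The response identifies the coordinator broker: nodeId, host, port
// Step 2: all subsequent group management requests go to this coordinator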

The Five-Step Rebalance Protocol

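A Consumer Group rebalance breaks down into five protocol phases. Not every member is active in every phase: FindCoordinator is repeated only when the coordinator is unknown or has moved, and only the elected leader runs the assignment calculation in Phase 3.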

Phase 1: FindCoordinator

On startup, or whenever the current coordinator becomes unreachable, the consumer sends FindCoordinatorRequest to any bootstrap broker. The response provides the coordinator's host and port. All subsequent group management communication targets this specific broker.

Phase 2: JoinGroup — Forming the Group

All members send JoinGroupRequest to the coordinator, carrying:

JoinGroupRequest {
  groupId: "order-consumers"
  sessionTimeoutMs: 45000          // session.timeout.ms
  rebalanceTimeoutMs: 300000       // max.poll.interval.ms  
  memberId: ""                     // empty for new members, existing ID for rejoins
  groupInstanceId: null            // non-null for Static Membership
  protocolType: "consumer"
  protocols: [
    { name: "cooperative-sticky", metadata: <subscription bytes> },
    { name: "range",              metadata: <subscription bytes> }
  ]
}

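How long the coordinator waits for members depends on the group's state. For a new, empty group it delays the first rebalance by group.initial.rebalance.delay.ms (default 3 s), extending the delay as further members arrive (bounded by the rebalance timeout), so that a burst of starting consumers forms a single generation. For an existing group it waits until every known member has rejoined or the rebalance timeout (the largest rebalanceTimeoutMs among the members) expires; members that miss it are dropped from the new generation. Then: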

  1. Selects the assignment protocol: Each member votes for its most-preferred strategy among those that every member supports, and the strategy with the most votes wins. If members list different strategy sets, only the intersection is eligible; a member whose strategies share nothing with the group's is rejected with INCONSISTENT_GROUP_PROTOCOL.
  2. Elects the Group Leader: Typically the first member to join. The leader is a regular consumer instance that also performs the assignment calculation; it has no other special role.
  3. Assigns member IDs: Each member gets a unique memberId string (format: {clientId}-{UUID}).
  4. Sends JoinGroupResponse: The leader receives the complete member list with each member's subscription metadata. Non-leader members receive only their own information.
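The vote in step 1 can be modeled as below. This is a simplified sketch, assuming each member's strategies arrive in preference order (as in the protocols array of JoinGroupRequest) and that the map is passed in join order. ProtocolVote and its methods are hypothetical names, and breaking ties by first-seen order is this sketch's choice, not a documented broker rule.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified model of the coordinator's protocol vote and leader choice.
final class ProtocolVote {
    private ProtocolVote() {}

    /** preferencesByMember: memberId -> strategies in preference order, in join order. */
    static String selectProtocol(Map<String, List<String>> preferencesByMember) {
        // Candidates: strategies that every member supports.
        Set<String> candidates = null;
        for (List<String> prefs : preferencesByMember.values()) {
            if (candidates == null) candidates = new LinkedHashSet<>(prefs);
            else candidates.retainAll(prefs);
        }
        if (candidates == null || candidates.isEmpty()) {
            throw new IllegalStateException("no strategy is supported by all members");
        }
        // Each member votes for its most preferred candidate.
        Map<String, Integer> votes = new LinkedHashMap<>();
        for (List<String> prefs : preferencesByMember.values()) {
            for (String p : prefs) {
                if (candidates.contains(p)) {
                    votes.merge(p, 1, Integer::sum);
                    break;
                }
            }
        }
        // Most votes wins; ties go to the first strategy that received a vote.
        String winner = null;
        int best = -1;
        for (Map.Entry<String, Integer> e : votes.entrySet()) {
            if (e.getValue() > best) {
                winner = e.getKey();
                best = e.getValue();
            }
        }
        return winner;
    }

    /** The leader is the first member to join, i.e. the first key in join order. */
    static String electLeader(Map<String, List<String>> preferencesByMember) {
        return preferencesByMember.keySet().iterator().next();
    }
}
```

With members listing [cooperative-sticky, range], [cooperative-sticky, range], and [range], the only common strategy is range, so range wins even though two members prefer cooperative-sticky.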

Critical design decision: The assignment algorithm runs on the client side (in the leader member's JVM), not on the broker. This keeps broker complexity low and allows sophisticated assignment strategies to be implemented in client libraries without broker upgrades.

// Heavily simplified from the Kafka Java client's AbstractCoordinator/ConsumerCoordinator;
// method names are approximate. Leader and followers both finish with a SyncGroupRequest,
// and every member, the leader included, learns its own partitions from SyncGroupResponse.
private RequestFuture<ByteBuffer> onJoinGroupResponse(JoinGroupResponse response) {
    JoinGroupResponseData data = response.data();
    // Am I the leader?
    if (data.leader().equals(data.memberId())) {
        // Run the selected assignor locally over every member's subscription
        Map<String, ByteBuffer> assignments = performAssignment(
            data.leader(),
            data.protocolName(),
            data.members()
        );
        // Hand the whole plan to the coordinator in SyncGroup
        return sendSyncGroupRequest(assignments);
    } else {
        // Follower: send an empty SyncGroup and wait for our own assignment
        return sendSyncGroupRequest(Collections.emptyMap());
    }
}

Phase 3: Assignment Calculation (Leader-Local)

The group leader deserializes the subscription metadata from every member's JoinGroupResponse entry, then runs the selected assignment strategy. This is a pure in-memory computation with no network calls, so it is usually cheap compared with the request round trips on either side of it, although its cost grows with the number of members and partitions.

The output is a map of memberId → List<TopicPartition>.

Phase 4: SyncGroup — Distributing Assignments

Leader sends:
SyncGroupRequest {
  assignments: {
    "consumer-1-abc": [orders-0, orders-1, orders-4, orders-5],
    "consumer-2-def": [orders-2, orders-3, orders-6, orders-7],
    ...
  }
}

Non-leaders send:
SyncGroupRequest {
  assignments: {}  // empty — only the leader provides assignments
}

The coordinator stores the assignment, then replies to every member's SyncGroupRequest with their individual partition list in the SyncGroupResponse. All members block on this response before they begin polling.
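The coordinator's side of this exchange can be sketched as a small state holder: followers that arrive before the leader are parked, and once the leader's plan is stored, every waiting member is answered with its own slice. This is a simplified single-threaded model with hypothetical names; the real coordinator also persists the assignment to __consumer_offsets and validates member IDs before replying.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of SyncGroup fan-out on the coordinator.
final class SyncGroupFanOut {
    private final String leaderId;
    private final int generationId;
    private final List<String> waitingMembers = new ArrayList<>();
    private final Map<String, List<String>> responses = new HashMap<>();
    private Map<String, List<String>> storedAssignment; // null until the leader syncs

    SyncGroupFanOut(String leaderId, int generationId) {
        this.leaderId = leaderId;
        this.generationId = generationId;
    }

    /** Handles one SyncGroupRequest; returns the members whose responses are now complete. */
    List<String> onSyncGroup(String memberId, int generation, Map<String, List<String>> assignments) {
        if (generation != generationId) {
            throw new IllegalStateException("ILLEGAL_GENERATION from " + memberId);
        }
        waitingMembers.add(memberId);
        if (memberId.equals(leaderId)) {
            storedAssignment = new HashMap<>(assignments); // only the leader's map is used
        }
        if (storedAssignment == null) {
            return List.of(); // followers stay parked until the leader's plan arrives
        }
        List<String> completed = new ArrayList<>(waitingMembers);
        for (String m : waitingMembers) {
            // Each member receives only its own slice of the plan.
            responses.put(m, storedAssignment.getOrDefault(m, List.of()));
        }
        waitingMembers.clear();
        return completed;
    }

    List<String> responseFor(String memberId) {
        return responses.get(memberId);
    }
}
```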

After receiving SyncGroupResponse, each consumer:

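  1. Knows which partitions to consume
  2. Fetches the last committed offset for each assigned partition from the coordinator (OffsetFetchRequest, backed by __consumer_offsets), falling back to auto.offset.reset when no offset has been committed
  3. Starts fetching records from those offsets on subsequent poll() calls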

Phase 5: The Heartbeat Loop

After joining, each consumer runs a dedicated background thread (kafka-coordinator-heartbeat-thread | {groupId}) that periodically sends HeartbeatRequest:

HeartbeatRequest {
  groupId: "order-consumers"
  generationId: 7          // must match current generation
  memberId: "consumer-1-abc"
  groupInstanceId: null
}

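The heartbeat.interval.ms setting (default 3,000 ms) controls how often heartbeats are sent. The coordinator considers a member dead if no heartbeat arrives within session.timeout.ms (default 45,000 ms since Kafka 3.0; 10,000 ms before). The Kafka documentation recommends keeping heartbeat.interval.ms no higher than one third of session.timeout.ms, so a member can miss a heartbeat or two without being evicted.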
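With the defaults, a member sends about 15 heartbeats (45,000 / 3,000) per session window. The coordinator-side bookkeeping can be sketched as a per-member deadline that each heartbeat pushes forward. This is a simplified model with hypothetical names and explicit millisecond timestamps, not the broker's implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of coordinator-side session tracking.
final class SessionTracker {
    private final long sessionTimeoutMs;
    private final Map<String, Long> deadlineByMember = new HashMap<>();

    SessionTracker(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    /** Every heartbeat moves the member's deadline to now + session.timeout.ms. */
    void onHeartbeat(String memberId, long nowMs) {
        deadlineByMember.put(memberId, nowMs + sessionTimeoutMs);
    }

    /** Removes and returns members whose deadline has passed; any removal means a rebalance. */
    List<String> expire(long nowMs) {
        List<String> expired = new ArrayList<>();
        deadlineByMember.entrySet().removeIf(e -> {
            boolean dead = nowMs >= e.getValue();
            if (dead) expired.add(e.getKey());
            return dead;
        });
        return expired;
    }
}
```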

When the coordinator decides to trigger a rebalance (due to member join/leave/death), it signals existing members through the next HeartbeatResponse:

HeartbeatResponse {
  errorCode: REBALANCE_IN_PROGRESS  // "time to re-join"
}

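Members receiving this error re-submit JoinGroupRequest, restarting the protocol from Phase 2. Under the eager protocol they first revoke all their partitions and stop fetching; cooperative members keep fetching from the partitions they own while they rejoin.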
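The order of checks a coordinator applies to a heartbeat can be sketched as follows. This is a simplified model with hypothetical names; it assumes a heartbeat arriving while the coordinator waits for SyncGroup is simply accepted, and it omits the session-deadline refresh that accompanies every accepted heartbeat, including those answered with REBALANCE_IN_PROGRESS.

```java
import java.util.Set;

// Simplified model of how a coordinator could answer HeartbeatRequest.
final class HeartbeatHandler {
    enum GroupState { STABLE, PREPARING_REBALANCE, COMPLETING_REBALANCE }
    enum HeartbeatError { NONE, UNKNOWN_MEMBER_ID, ILLEGAL_GENERATION, REBALANCE_IN_PROGRESS }

    private HeartbeatHandler() {}

    static HeartbeatError handle(Set<String> members, int currentGeneration, GroupState state,
                                 String memberId, int generationId) {
        if (!members.contains(memberId)) {
            return HeartbeatError.UNKNOWN_MEMBER_ID;      // evicted, or never joined
        }
        if (generationId != currentGeneration) {
            return HeartbeatError.ILLEGAL_GENERATION;     // missed a completed rebalance
        }
        if (state == GroupState.PREPARING_REBALANCE) {
            return HeartbeatError.REBALANCE_IN_PROGRESS;  // "time to re-join"
        }
        return HeartbeatError.NONE;                       // Stable or CompletingRebalance
    }
}
```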

Generation ID: Fencing Stale Requests

Each complete rebalance cycle increments the Generation ID — an integer stored by the coordinator. Every group-management request (heartbeats, offset commits, sync) must carry the current generation ID.

When the coordinator receives a request with an outdated generation ID, it rejects it with ILLEGAL_GENERATION. This prevents a slow member that missed a rebalance from committing offsets for partitions it no longer owns, which would overwrite the committed position of whichever member is now responsible for those partitions.

// Consumer-side handling of ILLEGAL_GENERATION
if (error == Errors.ILLEGAL_GENERATION || error == Errors.UNKNOWN_MEMBER_ID) {
    // Our generation is stale — must rejoin
    coordinator.requestRejoin("received error " + error);
    return RequestFuture.failure(error.exception());
}
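On the coordinator side, fencing amounts to comparing the generation carried by the request with the current one before touching the stored offset. The sketch below is a simplified model with hypothetical names (the real coordinator also validates the memberId): a slow member still on generation 7 tries to commit orders-3 after the group has moved to generation 8 and a new owner has already committed.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of generation fencing on offset commits.
final class FencedOffsetStore {
    private int generationId;
    private final Map<String, Long> committed = new HashMap<>(); // partition -> offset

    /** A completed rebalance starts a new generation. */
    void completeRebalance() { generationId++; }

    int generationId() { return generationId; }

    /** Returns "NONE" on success, or the error a stale committer receives. */
    String commit(int generation, String partition, long offset) {
        if (generation != generationId) {
            return "ILLEGAL_GENERATION"; // stale view of the group: reject, keep stored offset
        }
        committed.put(partition, offset);
        return "NONE";
    }

    Long committedOffset(String partition) { return committed.get(partition); }

    public static void main(String[] args) {
        FencedOffsetStore store = new FencedOffsetStore();
        for (int i = 0; i < 7; i++) store.completeRebalance(); // group reaches generation 7
        int slowMemberGeneration = store.generationId();        // slow member stalls here
        store.completeRebalance();                              // generation 8: orders-3 moves
        System.out.println(store.commit(8, "orders-3", 1500L));                    // new owner
        System.out.println(store.commit(slowMemberGeneration, "orders-3", 1200L)); // stale member
        System.out.println(store.committedOffset("orders-3"));
    }
}
```

The stale commit is rejected, so orders-3 keeps the new owner's offset of 1500.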

Assignment Strategies: Source-Level Explanation

RangeAssignor

Operates independently per topic. For each topic, it sorts partitions numerically, sorts members alphabetically, then divides the partition list into contiguous ranges:

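// For topic T with 10 partitions and 3 members:
// member-0: partitions 0, 1, 2, 3   (10/3 = 3, plus 1 extra for the first 10%3 = 1 member)
// member-1: partitions 4, 5, 6
// member-2: partitions 7, 8, 9

// Assigns one topic's partitions (sorted by partition number) to the members subscribed
// to that topic (sorted by member ID); RangeAssignor repeats this for every topic.
// The method name is illustrative.
static void assignTopic(List<TopicPartition> partitions,
                        List<String> sortedConsumers,
                        Map<String, List<TopicPartition>> assignment) {
    int numPartitionsPerConsumer = partitions.size() / sortedConsumers.size();    // 3
    int consumersWithExtraPartition = partitions.size() % sortedConsumers.size(); // 1

    for (int i = 0; i < sortedConsumers.size(); i++) {
        // Shift the start by the extra partitions already handed to members before i
        int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
        int num   = numPartitionsPerConsumer + (i < consumersWithExtraPartition ? 1 : 0);
        assignment.computeIfAbsent(sortedConsumers.get(i), k -> new ArrayList<>())
                  .addAll(partitions.subList(start, start + num));
    }
}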

Weakness: With multiple topics, the "extra" partition always goes to the same early-alphabetical members. If consumer-a gets the extra partition from every topic, it ends up with significantly more work than consumer-z. The imbalance worsens with more topics.
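A JDK-only sketch makes the skew concrete. It applies the same per-topic range arithmetic to three 3-partition topics and two members, assuming both members subscribe to every topic; partitions are plain "topic-N" strings rather than Kafka's TopicPartition, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Per-topic range assignment over several topics, to show how remainders accumulate.
final class RangeImbalanceDemo {
    private RangeImbalanceDemo() {}

    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            List<String> sortedMembers) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String m : sortedMembers) assignment.put(m, new ArrayList<>());
        int n = sortedMembers.size();
        // Topics in name order, each handled independently.
        for (Map.Entry<String, Integer> topic : new TreeMap<>(partitionsPerTopic).entrySet()) {
            int numPartitions = topic.getValue();
            int perMember = numPartitions / n;
            int withExtra = numPartitions % n; // the first `withExtra` members get one more
            for (int i = 0; i < n; i++) {
                int start = perMember * i + Math.min(i, withExtra);
                int count = perMember + (i < withExtra ? 1 : 0);
                for (int p = start; p < start + count; p++) {
                    assignment.get(sortedMembers.get(i)).add(topic.getKey() + "-" + p);
                }
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = Map.of("orders", 3, "payments", 3, "refunds", 3);
        Map<String, List<String>> a = assign(topics, List.of("consumer-a", "consumer-z"));
        a.forEach((m, parts) -> System.out.println(m + " (" + parts.size() + "): " + parts));
    }
}
```

Here consumer-a receives 6 of the 9 partitions and consumer-z only 3, although a 5/4 split is possible.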

RoundRobinAssignor

Sorts all TopicPartition objects across all topics alphabetically, then distributes them round-robin across all members:

// All TopicPartitions across all topics, sorted
List<TopicPartition> allPartitions = allTopicPartitions.stream()
    .sorted(Comparator.comparing(TopicPartition::topic)
                      .thenComparingInt(TopicPartition::partition))
    .collect(toList());

CircularIterator<String> memberIter = new CircularIterator<>(sortedMembers);

for (TopicPartition tp : allPartitions) {
    String memberId = memberIter.next();
    // Skip members that don't subscribe to this topic
    while (!subscriptions.get(memberId).topics().contains(tp.topic())) {
        memberId = memberIter.next();
    }
    assignment.get(memberId).add(tp);
}

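Better balance than Range when members subscribe to the same several topics, because the per-topic remainders no longer pile up on the same early members. Weakness: the assignment ignores who owned what before, so when a single member joins or leaves, many partitions change owners even though only a few would need to move. Every consumer that receives a different partition must resume it from the committed offset, and loses any in-memory state built up for the partitions it gave away. This causes a "cold start" effect on every rebalance.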
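The churn is easy to measure with a JDK-only sketch, assuming every member subscribes to every topic and partitions are named "topic-N"; the class and method names are hypothetical. It assigns two 6-partition topics round-robin to three members, then to four, and counts how many partitions changed owner.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Round-robin assignment plus a counter for partitions whose owner changed.
final class RoundRobinChurnDemo {
    private RoundRobinChurnDemo() {}

    /** Returns partition -> owner, dealing partitions (topic order, then number) round-robin. */
    static Map<String, String> assign(Map<String, Integer> partitionsPerTopic,
                                      List<String> sortedMembers) {
        Map<String, String> owner = new TreeMap<>();
        int next = 0;
        for (Map.Entry<String, Integer> t : new TreeMap<>(partitionsPerTopic).entrySet()) {
            for (int p = 0; p < t.getValue(); p++) {
                owner.put(t.getKey() + "-" + p, sortedMembers.get(next++ % sortedMembers.size()));
            }
        }
        return owner;
    }

    /** Number of partitions whose owner differs between two assignments. */
    static int moved(Map<String, String> before, Map<String, String> after) {
        int count = 0;
        for (Map.Entry<String, String> e : before.entrySet()) {
            if (!e.getValue().equals(after.get(e.getKey()))) count++;
        }
        return count;
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = Map.of("orders", 6, "payments", 6);
        Map<String, String> before = assign(topics, List.of("c1", "c2", "c3"));
        Map<String, String> after = assign(topics, List.of("c1", "c2", "c3", "c4"));
        System.out.println("partitions moved: " + moved(before, after));
    }
}
```

In this example 9 of the 12 partitions move, although giving the new member its fair share of 3 partitions would require moving only 3.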

CooperativeStickyAssignor: Two-Round Rebalance

Introduced in Kafka 2.4, CooperativeStickyAssignor solves the two fundamental problems with traditional ("eager") rebalancing:

Problem 1 (Eager Rebalance): All members revoke all their partitions before the assignment is computed, then receive entirely new assignments. During the revocation-to-reassignment window, zero consumption happens for the group. This "stop the world" window can last seconds for large groups.

Problem 2 (Eager Rebalance): Even partitions that don't need to move are revoked and reassigned, causing unnecessary cache invalidation and potential re-processing.

CooperativeStickyAssignor's solution — Round 1:

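Each member includes its currently owned partitions in its JoinGroupRequest subscription metadata. The leader computes a sticky target assignment that leaves as many partitions as possible with their current owners. Partitions that nobody currently owns, such as those of a member that left, are assigned right away; every partition whose owner would change is withheld: dropped from its current owner's assignment but not yet given to its new owner.

In the first SyncGroupResponse:

  1. Members keep consuming every partition that appears in both their old and new assignment; nothing is revoked from them.
  2. A member whose new assignment lacks some partition it owns revokes just that partition (its ConsumerRebalanceListener.onPartitionsRevoked runs for it) and then rejoins.

Round 2 (triggered only by revoking members):

The members that revoked partitions immediately send another JoinGroupRequest, which starts a second, smaller rebalance. Every member takes part in it, but members that revoked nothing keep consuming while they rejoin. This round assigns the newly freed partitions to their destination members.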

Net result: During a rolling deployment or scaling event, the vast majority of partitions continue being consumed without interruption. Only the partitions actively migrating experience a brief pause.
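The set arithmetic behind the two rounds can be sketched as below. This is a simplified model with hypothetical names, assuming the leader has already computed a sticky target assignment (that computation is not shown) and that the owned map lists only members still in the group. A member is granted a target partition in a round only if no other member currently owns it.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified model of the cooperative protocol's two rounds.
final class CooperativeRounds {
    private CooperativeRounds() {}

    /** One round: grant each member the target partitions that no other member owns. */
    static Map<String, Set<String>> grant(Map<String, Set<String>> owned,
                                          Map<String, Set<String>> target) {
        Map<String, String> currentOwner = new HashMap<>();
        owned.forEach((member, parts) -> parts.forEach(p -> currentOwner.put(p, member)));

        Map<String, Set<String>> granted = new HashMap<>();
        target.forEach((member, parts) -> {
            Set<String> mine = new HashSet<>();
            for (String p : parts) {
                String owner = currentOwner.get(p);
                // Kept and unowned partitions are granted now; partitions still owned by
                // someone else are withheld until that member revokes them.
                if (owner == null || owner.equals(member)) mine.add(p);
            }
            granted.put(member, mine);
        });
        return granted;
    }

    /** Partitions a member must revoke after round 1: owned minus target. */
    static Set<String> toRevoke(String member, Map<String, Set<String>> owned,
                                Map<String, Set<String>> target) {
        Set<String> revoke = new HashSet<>(owned.getOrDefault(member, Set.of()));
        revoke.removeAll(target.getOrDefault(member, Set.of()));
        return revoke;
    }
}
```

For example, if c1 owns o-0 to o-2, c2 owns o-3 to o-5, and a new member c3 should receive o-2 and o-5, round 1 grants c3 nothing and has c1 and c2 revoke one partition each. Calling grant again with the post-revocation ownership yields the full target, which is what Round 2 delivers.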

// Configure CooperativeStickyAssignor
Properties props = new Properties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-consumers");
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
    CooperativeStickyAssignor.class.getName());
// Note: if migrating from eager strategies, you must configure a transition period
// by temporarily listing both strategies: [CooperativeStickyAssignor, RangeAssignor]

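Migrating from an eager strategy to cooperative: Since cooperative and eager protocols are incompatible mid-group, Kafka requires two rolling bounces:

  1. Deploy with [CooperativeStickyAssignor, RangeAssignor]. While any member still runs the old configuration, range is the only strategy every member supports, so it wins the vote. Even after all members are updated, rebalances stay eager, because a consumer uses the cooperative protocol only when every assignor in its list supports it, and RangeAssignor supports only eager.
  2. Once all members are updated, deploy with [CooperativeStickyAssignor] alone; from the next rebalance on, the group uses the cooperative protocol.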
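Why step 1 still rebalances eagerly can be sketched as follows, modeled on the Java consumer's rule of using the highest rebalance protocol that every configured assignor supports. The support table and all names are assumptions for illustration.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch: derive a consumer's rebalance protocol from its configured assignors.
final class RebalanceProtocolChoice {
    enum RebalanceProtocol { EAGER, COOPERATIVE } // declaration order: EAGER < COOPERATIVE

    // Assumed support table for the two assignors discussed in this chapter.
    static final Map<String, Set<RebalanceProtocol>> SUPPORTED = Map.of(
        "range", EnumSet.of(RebalanceProtocol.EAGER),
        "cooperative-sticky", EnumSet.of(RebalanceProtocol.EAGER, RebalanceProtocol.COOPERATIVE));

    private RebalanceProtocolChoice() {}

    static RebalanceProtocol choose(List<String> configuredAssignors) {
        EnumSet<RebalanceProtocol> common = EnumSet.allOf(RebalanceProtocol.class);
        for (String assignor : configuredAssignors) {
            Set<RebalanceProtocol> supported = SUPPORTED.get(assignor);
            if (supported == null) throw new IllegalArgumentException("unknown assignor " + assignor);
            common.retainAll(supported); // keep only protocols every assignor supports
        }
        if (common.isEmpty()) throw new IllegalArgumentException("no common rebalance protocol");
        return Collections.max(common); // highest protocol by declaration order
    }
}
```

A list of [cooperative-sticky, range] yields EAGER, while [cooperative-sticky] alone yields COOPERATIVE.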

max.poll.interval.ms: The Second Eviction Mechanism

There are two independent ways a consumer can be evicted from the group:

Timeout type          | Config parameter     | Default    | Detected by
Session timeout       | session.timeout.ms   | 45,000 ms  | Coordinator: no heartbeat received within the timeout
Poll interval timeout | max.poll.interval.ms | 300,000 ms | Consumer: its heartbeat thread sees no poll() call within the interval and leaves the group

max.poll.interval.ms guards against a consumer that is alive and sending heartbeats (so session.timeout.ms never triggers) but is stuck processing messages and not calling poll():

// Dangerous pattern: processing time may exceed max.poll.interval.ms
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // If the whole batch takes > 5 minutes (default max.poll.interval.ms), the
        // consumer's heartbeat thread notices that poll() was not called in time,
        // sends LeaveGroupRequest, and the coordinator rebalances the group.
        // Heartbeats alone cannot prevent this: the poll deadline is checked separately.
        slowExternalServiceCall(record.value());
    }
}

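The heartbeat thread runs independently of the poll loop, so heartbeats alone would keep reporting the consumer as "alive" even while the processing loop is blocked. The poll-interval check is a second circuit breaker aimed specifically at "alive but not making progress": once poll() has not been called for max.poll.interval.ms, the consumer proactively leaves the group, so its partitions are reassigned without waiting for session.timeout.ms. (A static member skips the LeaveGroupRequest, so its partitions move only after session.timeout.ms expires.)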
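The decision the heartbeat thread makes on each wake-up can be sketched as below, assuming a dynamic (non-static) member and, as in the Java client, that the poll deadline is checked on the consumer side. The class, enum, and method names are hypothetical, and timestamps are explicit milliseconds.

```java
// Simplified model of the consumer's background heartbeat thread.
final class HeartbeatThreadModel {
    enum Action { WAIT, SEND_HEARTBEAT, LEAVE_GROUP }

    private final long heartbeatIntervalMs;
    private final long maxPollIntervalMs;
    private long lastPollMs;
    private long lastHeartbeatMs;

    HeartbeatThreadModel(long heartbeatIntervalMs, long maxPollIntervalMs, long startMs) {
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.lastPollMs = startMs;
        this.lastHeartbeatMs = startMs;
    }

    /** Called by the application thread each time poll() runs. */
    void onPoll(long nowMs) { lastPollMs = nowMs; }

    /** One wake-up of the background thread. */
    Action tick(long nowMs) {
        if (nowMs - lastPollMs > maxPollIntervalMs) {
            // Alive but not making progress: leave so the group rebalances now,
            // instead of relying on session.timeout.ms (heartbeats would keep it "alive").
            return Action.LEAVE_GROUP;
        }
        if (nowMs - lastHeartbeatMs >= heartbeatIntervalMs) {
            lastHeartbeatMs = nowMs;
            return Action.SEND_HEARTBEAT;
        }
        return Action.WAIT;
    }
}
```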

Remedies for poll interval violations:

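  1. Increase max.poll.interval.ms: Simple, but delays detection of genuinely stuck consumers, and because the same value is sent as the rebalance timeout, a slow member can hold up every rebalance for longer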
  2. Reduce max.poll.records: Take fewer records per poll, process them faster, poll more often
  3. Async processing with manual offset management: Push work to a thread pool, commit offsets explicitly after all in-batch records are processed (complex, requires careful offset ordering)

// Safe pattern: bound processing time per poll batch
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "50");      // limit batch size
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "30000"); // 30s limit
// Ensure: (max_processing_time_per_record * max_poll_records) < max.poll.interval.ms
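The inequality in the last comment can be turned into a budget. The helper below is hypothetical; it assumes you can estimate a worst-case per-record processing time and want to use only a fraction (headroom) of max.poll.interval.ms, leaving slack for GC pauses, retries, and commits.

```java
// Hypothetical helper: largest max.poll.records whose worst-case batch fits the budget.
final class PollBudget {
    private PollBudget() {}

    static int maxSafePollRecords(long maxPollIntervalMs, long worstCaseMsPerRecord,
                                  double headroom) {
        if (worstCaseMsPerRecord <= 0 || headroom <= 0 || headroom > 1) {
            throw new IllegalArgumentException("need per-record time > 0 and 0 < headroom <= 1");
        }
        long budgetMs = (long) (maxPollIntervalMs * headroom);
        // Returns at least 1; if a single record exceeds the budget, a longer interval
        // or asynchronous processing is needed instead.
        return (int) Math.max(1, budgetMs / worstCaseMsPerRecord);
    }
}
```

With a 30,000 ms interval, 200 ms per record in the worst case, and headroom 0.5, the budget is 15,000 ms and the helper returns 75, so the max.poll.records value of 50 above fits comfortably.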

Static Membership: Surviving Restarts Without Rebalance

By default, a consumer that shuts down cleanly sends LeaveGroupRequest, and when it comes back it joins with a new memberId. The coordinator sees one member leave and another join, and rebalances the group in response, even if the restarted consumer ends up reclaiming its old partitions.

In CI/CD environments with frequent rolling deployments, every restarted instance costs the group at least one rebalance. Kafka 2.3 introduced Static Membership to handle this:

props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "order-consumer-pod-0");

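With a group.instance.id configured:

  1. The coordinator remembers the member by its group.instance.id rather than only by its memberId.
  2. The consumer does not send LeaveGroupRequest when it shuts down, so a restart alone does not trigger a rebalance.
  3. If the consumer rejoins with the same group.instance.id before session.timeout.ms expires, the coordinator gives it a new memberId and returns its previous assignment without rebalancing.
  4. If it stays away longer than session.timeout.ms, the coordinator removes it and rebalances as usual, so session.timeout.ms must be long enough to cover a restart.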

This fits Kubernetes StatefulSets particularly well, because their pod names stay the same across restarts:

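# Kubernetes StatefulSet: pod names are stable (order-consumer-0, order-consumer-1, ...)
# Use the pod name as group.instance.id so rolling updates do not trigger rebalances,
# provided each pod is back within session.timeout.ms.
# The Kafka client does not read this variable itself: the application must pass its
# value to the group.instance.id consumer property.
env:
  - name: KAFKA_GROUP_INSTANCE_ID
    valueFrom:
      fieldRef:
        fieldPath: metadata.name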
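The coordinator-side bookkeeping behind static membership can be modeled as a map from group.instance.id to the current member. This is a simplified sketch with hypothetical names that ignores subscription changes and leader-specific corner cases. It assumes static memberIds take the form {group.instance.id}-{UUID}, and it counts every event that would start a rebalance (in practice, the initial joins of a new group are batched into one rebalance).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Simplified model of static membership: rejoin by group.instance.id without a rebalance.
final class StaticMembershipModel {
    static final class Member {
        final String memberId;
        final List<String> assignment;

        Member(String memberId, List<String> assignment) {
            this.memberId = memberId;
            this.assignment = assignment;
        }
    }

    private final Map<String, Member> byInstanceId = new HashMap<>();
    private int rebalanceTriggers;

    /** A static member joins or rejoins; returns its new memberId. */
    String join(String groupInstanceId, List<String> proposedAssignment) {
        String newMemberId = groupInstanceId + "-" + UUID.randomUUID();
        Member known = byInstanceId.get(groupInstanceId);
        if (known != null) {
            // Known instance: replace (and thereby fence) the old memberId, keep the old
            // assignment, and do not rebalance.
            byInstanceId.put(groupInstanceId, new Member(newMemberId, known.assignment));
        } else {
            // Unknown instance: a genuinely new member, so the group must rebalance.
            byInstanceId.put(groupInstanceId, new Member(newMemberId, proposedAssignment));
            rebalanceTriggers++;
        }
        return newMemberId;
    }

    /** session.timeout.ms passed without a rejoin: remove the member and rebalance. */
    void sessionExpired(String groupInstanceId) {
        if (byInstanceId.remove(groupInstanceId) != null) rebalanceTriggers++;
    }

    List<String> assignmentOf(String groupInstanceId) {
        return byInstanceId.get(groupInstanceId).assignment;
    }

    int rebalanceTriggers() { return rebalanceTriggers; }
}
```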

Monitoring Group Health

# Describe group state and assignments
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 \
  --describe --group order-consumers

# Output columns: GROUP, TOPIC, PARTITION, CURRENT-OFFSET, LOG-END-OFFSET, LAG, CONSUMER-ID, HOST, CLIENT-ID

# Show the group's current state (re-run to follow a rebalance as it progresses)
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 \
  --describe --group order-consumers \
  --state
# State: Stable | PreparingRebalance | CompletingRebalance | Dead | Empty

# List each member with its assigned partitions; compare with the per-partition
# LAG above to spot a member that has stopped making progress
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 \
  --describe --group order-consumers \
  --members --verbose

The state column reveals where in the rebalance protocol the group currently sits. PreparingRebalance means the coordinator is waiting for members to send (or resend) JoinGroupRequest. CompletingRebalance means it is waiting for the leader's SyncGroupRequest carrying the new assignments. A group stuck in either state usually points to a member that is unreachable, or too busy processing to call poll() and respond to the REBALANCE_IN_PROGRESS signal.
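The state names printed by --state can be read as a small state machine. The enum below is a simplified model of the classic group protocol described in this chapter, with an assumed, simplified transition table; it is not the broker's implementation.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Simplified model of the classic consumer group states and their allowed transitions.
enum ClassicGroupState {
    EMPTY,                // no members; committed offsets may still exist
    PREPARING_REBALANCE,  // waiting for members to (re)send JoinGroupRequest
    COMPLETING_REBALANCE, // waiting for the leader's SyncGroupRequest with assignments
    STABLE,               // assignments delivered; members heartbeat and consume
    DEAD;                 // group metadata removed

    private static final Map<ClassicGroupState, Set<ClassicGroupState>> NEXT = Map.of(
        EMPTY, EnumSet.of(PREPARING_REBALANCE, DEAD),
        PREPARING_REBALANCE, EnumSet.of(COMPLETING_REBALANCE, EMPTY, DEAD),
        COMPLETING_REBALANCE, EnumSet.of(STABLE, PREPARING_REBALANCE, DEAD),
        STABLE, EnumSet.of(PREPARING_REBALANCE, DEAD),
        DEAD, EnumSet.of(DEAD));

    boolean canMoveTo(ClassicGroupState next) {
        return NEXT.get(this).contains(next);
    }
}
```

A join or leave while the group is Stable sends it back to PreparingRebalance; it can only reach Stable again by passing through CompletingRebalance.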

Understanding the Consumer Group protocol at this level — coordinator selection, the five-phase handshake, generation ID fencing, the difference between the three assignment strategies, and the two independent eviction mechanisms — is the foundation for diagnosing the most common production Kafka problems: rebalance storms, consumer lag spikes, and duplicate consumption after partition reassignment. The next chapter examines the other half of the consumer story: where offsets live, how commits work internally, and why commitAsync deliberately never retries.
