Cooperative Rebalance and Static Membership
Rebalancing is Kafka's fundamental group coordination mechanism: it determines how partitions are distributed across consumers. For years, the traditional "stop-the-world" eager rebalance was one of the biggest availability killers for consumer groups. Cooperative Rebalance (KIP-429, Kafka 2.4) and Static Membership (KIP-345, Kafka 2.3) changed this. Cooperative rebalancing pauses only the partitions that actually move, and static membership lets a restarting consumer rejoin without triggering a rebalance at all.
Eager Rebalance: The Cost of Stopping Everything
Why It's Called Stop-the-World
Under the traditional Eager rebalance protocol, whenever a rebalance is triggered — a member joins or leaves, crashes, topic subscriptions change, or partition counts change — the protocol mandates:
- All members immediately revoke ALL their partitions: Regardless of whether a given consumer's partitions actually need to be reassigned, it must unconditionally release everything.
- All members send JoinGroup requests: The Group Coordinator waits until every known member has rejoined (or the rebalance timeout expires).
- The Leader Consumer computes the new assignment: It sends the result to the Coordinator in its SyncGroup request, and the Coordinator hands each member its share in the SyncGroup response.
- All members resume consuming from their newly assigned partitions.
Throughout this process, consumption of every partition in the group is suspended. The duration of this suspension depends on several factors:
Suspension time ≈ max(member response latency) + assignment algorithm time + round-trip network time
For a production consumer group with 200 partitions and 50 consumer instances, a single rebalance can suspend consumption for 10–30 seconds, largely because the slowest member must finish its current batch before it rejoins. During that window, upstream messages keep accumulating and consumer lag grows unchecked.
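To make the formula concrete, here is a minimal sketch that evaluates it for hypothetical per-member rejoin latencies. The class name and inputs are illustrative assumptions. The point it shows is that under the eager protocol the slowest member sets the pace for the whole group.

```java
import java.util.Arrays;

// Sketch of: suspension ≈ max(member response latency) + assignment time + round trip.
// memberRejoinMs holds hypothetical times each member needs to revoke and send JoinGroup.
final class EagerSuspension {
    static long estimateMs(long[] memberRejoinMs, long assignmentMs, long roundTripMs) {
        // Every partition waits for the slowest member, so only the maximum matters
        long slowest = Arrays.stream(memberRejoinMs).max().orElse(0L);
        return slowest + assignmentMs + roundTripMs;
    }
}
```

With 49 members rejoining in 200 ms and one member needing 12 s to finish its batch (plus 50 ms of assignment and 20 ms of round trip), the estimate is 12,070 ms: one slow consumer stalls all 200 partitions.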
What Triggers a Rebalance
The Group Coordinator maintains the consumer group's member list. Any of the following events triggers a rebalance:
- New consumer joining: Scaling out, rolling deployments
- Consumer leaving: consumer.close() called gracefully
- Consumer crash: Heartbeat timeout (session.timeout.ms expired)
- Subscription change: Consumer subscribes to a new topic
- Partition count change: A topic gains new partitions
- Poll interval exceeded: Consumer didn't call poll() within max.poll.interval.ms
# Observe the consumer group during a rebalance
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group my-group \
  --describe
# Illustrative output during an eager rebalance (no partition has an owner):
# GROUP     TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
# my-group  orders  0          1000000         1000050         50   -            -     -
# my-group  orders  1          2000000         2000100         100  -            -     -
# Add --state to print the group STATE; under the eager protocol, PreparingRebalance means all consumption is halted
The Group Coordinator State Machine
The Group Coordinator maintains a state machine for each consumer group:
Empty → PreparingRebalance → CompletingRebalance → Stable
            ↑                                        |
            └────────────────────────────────────────┘
                    (next rebalance trigger)
During PreparingRebalance, the Coordinator waits for all known members to send JoinGroup. Under the eager protocol, consumers learn about the rebalance when a heartbeat (or offset commit) returns REBALANCE_IN_PROGRESS, and they must stop consuming and revoke all partitions before they send JoinGroup. This protocol-level requirement is what makes the rebalance stop-the-world: consumers must actively cooperate in halting themselves.
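The state diagram can be captured as a small transition table. This is a simplified model built from the states named above; the real coordinator also has a Dead state and error paths. It additionally allows CompletingRebalance → PreparingRebalance (a member joins or leaves before SyncGroup completes) and PreparingRebalance → Empty (every member leaves). `GroupStateMachine` is a hypothetical name.

```java
import java.util.EnumSet;
import java.util.Map;

// Simplified model of the classic group coordinator states (Dead and error paths omitted)
enum GroupState { EMPTY, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE }

final class GroupStateMachine {
    // For each target state, the states it may legally be entered from
    private static final Map<GroupState, EnumSet<GroupState>> VALID_PREVIOUS = Map.of(
            GroupState.PREPARING_REBALANCE,
            EnumSet.of(GroupState.EMPTY, GroupState.STABLE, GroupState.COMPLETING_REBALANCE),
            GroupState.COMPLETING_REBALANCE, EnumSet.of(GroupState.PREPARING_REBALANCE),
            GroupState.STABLE, EnumSet.of(GroupState.COMPLETING_REBALANCE),
            GroupState.EMPTY, EnumSet.of(GroupState.PREPARING_REBALANCE));

    static boolean canTransition(GroupState from, GroupState to) {
        return VALID_PREVIOUS.getOrDefault(to, EnumSet.noneOf(GroupState.class)).contains(from);
    }
}
```

Note that Stable can only be reached through CompletingRebalance: every path back to normal consumption passes through a full JoinGroup/SyncGroup round.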
Cooperative Rebalance: Two-Round Incremental Protocol
Design Philosophy: Only Move What Must Move
Cooperative Rebalance (also called Incremental Cooperative Rebalance) is grounded in a simple observation: most rebalance triggers affect only a small subset of partitions, not all of them.
Consider a consumer group with 100 partitions and 10 consumers, where one new consumer joins. Ideally, only about 1 partition needs to be taken from each existing consumer and given to the newcomer. The Eager protocol stops all 100 partitions regardless.
Cooperative rebalance addresses this with a two-round negotiation:
Round 1: Discover which partitions need to move
All members send JoinGroup (with their currently held partition list)
↓
Leader computes the new target assignment
↓
Compares old vs. new assignment → identifies the migration set D
↓
SyncGroup response gives each member its new assignment:
- Partitions it keeps: CONTINUE CONSUMING, no interruption!
- Partitions in D: left out of the assignment, so the member must release them
Round 2: Only the affected partitions get migrated
Members that lost partitions in D revoke them (onPartitionsRevoked)
↓
Having revoked something, they rejoin, triggering a second rebalance (every member rejoins, but all keep consuming what they still own)
↓
Leader assigns partitions in D to their target consumers
↓
Stable state restored
The critical insight: partitions not in the migration set D never stop consuming — they remain assigned and active throughout both rounds.
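The two rounds can be sketched as plain set arithmetic over a partition-to-owner map. `CooperativeRounds` is a hypothetical illustration of the idea, not Kafka's implementation: it computes the migration set D, the round-1 assignment that withholds D, what each previous owner revokes, and the round-2 assignment.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch of the two-round cooperative idea (partition id -> member name)
final class CooperativeRounds {
    // D: partitions whose target owner differs from their current owner
    static Set<Integer> migrationSet(Map<Integer, String> current, Map<Integer, String> target) {
        Set<Integer> d = new TreeSet<>();
        target.forEach((partition, newOwner) -> {
            String oldOwner = current.get(partition);
            if (oldOwner != null && !oldOwner.equals(newOwner)) d.add(partition);
        });
        return d;
    }

    // Round 1: everything goes to its target owner except D, which nobody gets yet
    static Map<String, Set<Integer>> roundOne(Map<Integer, String> current, Map<Integer, String> target) {
        Set<Integer> d = migrationSet(current, target);
        Map<String, Set<Integer>> result = new TreeMap<>();
        target.forEach((partition, member) -> {
            Set<Integer> assigned = result.computeIfAbsent(member, k -> new TreeSet<>());
            if (!d.contains(partition)) assigned.add(partition);
        });
        return result;
    }

    // Between rounds: each previous owner revokes only its partitions in D
    static Map<String, Set<Integer>> toRevoke(Map<Integer, String> current, Map<Integer, String> target) {
        Map<String, Set<Integer>> result = new TreeMap<>();
        for (int partition : migrationSet(current, target)) {
            result.computeIfAbsent(current.get(partition), k -> new TreeSet<>()).add(partition);
        }
        return result;
    }

    // Round 2: D has been released, so the full target assignment can be handed out
    static Map<String, Set<Integer>> roundTwo(Map<Integer, String> target) {
        Map<String, Set<Integer>> result = new TreeMap<>();
        target.forEach((partition, member) ->
                result.computeIfAbsent(member, k -> new TreeSet<>()).add(partition));
        return result;
    }
}
```

For a current assignment {0:A, 1:A, 2:B, 3:B} and a target that moves partitions 1 and 3 to a new member C, D is {1, 3}. In round 1, A keeps 0, B keeps 2, and C gets nothing yet; in round 2, C receives 1 and 3, while partitions 0 and 2 never stop.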
How CooperativeStickyAssignor Works
CooperativeStickyAssignor is the assignment strategy that implements cooperative rebalance. It extends AbstractStickyAssignor and adds cooperative protocol support on top of sticky partition retention.
Core algorithm steps:
- Sticky retention: Keep partitions with their current owner whenever possible, only moving them when load balance demands it.
- Balance enforcement: Ensure no consumer holds more than 1 extra partition compared to others.
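- Generation tagging: Each member encodes its generation in its subscription metadata, so the leader can discard stale ownership claims (for example, when two members both claim a partition after one of them missed a rebalance).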
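import java.util.Collection;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

// Configure CooperativeStickyAssignor (`log` is assumed to be an SLF4J Logger of the enclosing class)
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class.getName());
// Critical: use cooperative sticky assignment
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        CooperativeStickyAssignor.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders", "payments"),
        new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // With cooperative rebalancing, this is only called for partitions
                // that are actually being moved (and for everything on close/unsubscribe)
                log.info("Revoking partitions: {}", partitions);
                // Commit progress before ownership moves; commitSync() with no arguments
                // commits the offsets returned by the last poll() for all assigned partitions
                consumer.commitSync();
            }
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Under the cooperative protocol this receives only newly added
                // partitions and may be invoked with an empty collection
                log.info("Assigned partitions: {}", partitions);
            }
            @Override
            public void onPartitionsLost(Collection<TopicPartition> partitions) {
                // Called when partitions were lost without a clean revocation (e.g. the
                // session expired); they may already belong to another member, so don't commit
                log.warn("Lost partitions without revocation: {}", partitions);
            }
        });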
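The sticky-retention and balance-enforcement steps listed before the configuration example can be approximated for a single topic with the sketch below. `SimpleSticky` is a hypothetical, heavily simplified stand-in for AbstractStickyAssignor: it ignores multiple topics, differing subscriptions, and generation checks. It does reproduce the arithmetic of the 100-partition example: when an eleventh consumer joins ten consumers that own 10 partitions each, quotas become one member at 10 and ten members at 9, so only 9 partitions move.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified single-topic sketch of sticky retention + balance enforcement (hypothetical)
final class SimpleSticky {
    static Map<String, List<Integer>> assign(int numPartitions, List<String> members,
                                             Map<String, List<Integer>> owned) {
        int n = members.size();
        int base = numPartitions / n;
        int extra = numPartitions % n;
        // Quotas differ by at most 1; the "+1" slots go to members that already own
        // the most partitions, which minimizes how many partitions must move.
        List<String> byOwned = new ArrayList<>(members);
        byOwned.sort((a, b) -> Integer.compare(
                owned.getOrDefault(b, List.of()).size(),
                owned.getOrDefault(a, List.of()).size()));
        Map<String, Integer> quota = new HashMap<>();
        for (int i = 0; i < n; i++) {
            quota.put(byOwned.get(i), base + (i < extra ? 1 : 0));
        }

        // Sticky retention: each member keeps its current partitions up to its quota
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        boolean[] taken = new boolean[numPartitions];
        for (String m : members) {
            List<Integer> kept = new ArrayList<>();
            for (int p : owned.getOrDefault(m, List.of())) {
                if (kept.size() < quota.get(m) && p >= 0 && p < numPartitions && !taken[p]) {
                    kept.add(p);
                    taken[p] = true;
                }
            }
            result.put(m, kept);
        }

        // Everything not kept is free; top up members that are below quota
        Deque<Integer> free = new ArrayDeque<>();
        for (int p = 0; p < numPartitions; p++) {
            if (!taken[p]) free.add(p);
        }
        for (String m : members) {
            List<Integer> list = result.get(m);
            while (list.size() < quota.get(m) && !free.isEmpty()) list.add(free.poll());
        }
        return result;
    }

    // Count partitions whose owner differs between two assignments
    static int moves(Map<String, List<Integer>> before, Map<String, List<Integer>> after) {
        Map<Integer, String> previousOwner = new HashMap<>();
        before.forEach((m, ps) -> ps.forEach(p -> previousOwner.put(p, m)));
        int count = 0;
        for (Map.Entry<String, List<Integer>> e : after.entrySet()) {
            for (int p : e.getValue()) {
                String prev = previousOwner.get(p);
                if (prev != null && !prev.equals(e.getKey())) count++;
            }
        }
        return count;
    }
}
```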
Rolling Migration from Eager to Cooperative
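You cannot switch directly from RangeAssignor or RoundRobinAssignor to CooperativeStickyAssignor with a single rolling restart. Midway through that restart, upgraded and old members would share no common assignment strategy, and the Coordinator rejects a joining member whose strategies don't overlap with the group's (INCONSISTENT_GROUP_PROTOCOL). The safe approach is a two-phase rolling upgrade: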
Phase 1: Configure all instances to support both strategies simultaneously:
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
CooperativeStickyAssignor.class.getName() + "," +
RoundRobinAssignor.class.getName());
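Roll-restart all consumer instances. The Group Coordinator selects a strategy that every member supports, preferring the one most members list first, so old and new configurations can coexist. As long as RoundRobinAssignor (which supports only the eager protocol) is on the list, consumers keep rebalancing eagerly.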
Phase 2: Once all instances are running the Phase 1 config, remove the legacy strategy:
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
CooperativeStickyAssignor.class.getName());
Roll-restart again.
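The consumer group keeps working throughout both phases without ever hitting an inconsistent-protocol error. Rebalances during Phase 1 are still eager (stop-the-world); incremental cooperative rebalancing takes effect once Phase 2 completes.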
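The two phases work because of two separate decisions, sketched below under simplifying assumptions: a consumer only switches to the cooperative protocol when every assignor in its list supports it, and the coordinator picks an assignor that every member lists, counting each member's first choice among those as a vote. `MigrationModel` is hypothetical; the strings are the assignors' protocol names ("range", "roundrobin", "cooperative-sticky"), and treating cooperative-sticky as the only cooperative-capable assignor is an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical model of protocol choice (per consumer) and assignor choice (per group)
final class MigrationModel {
    // Assumption of this sketch: only cooperative-sticky supports the cooperative protocol
    static final Set<String> COOPERATIVE_CAPABLE = Set.of("cooperative-sticky");

    // A consumer rebalances cooperatively only if ALL of its listed assignors support it
    static String rebalanceProtocol(List<String> assignors) {
        return COOPERATIVE_CAPABLE.containsAll(assignors) ? "COOPERATIVE" : "EAGER";
    }

    // Pick an assignor listed by every member; each member votes for its first common choice
    static Optional<String> selectAssignor(List<List<String>> members) {
        if (members.isEmpty()) return Optional.empty();
        Set<String> common = new LinkedHashSet<>(members.get(0));
        for (List<String> m : members) common.retainAll(m);
        if (common.isEmpty()) return Optional.empty();  // join rejected: INCONSISTENT_GROUP_PROTOCOL
        Map<String, Integer> votes = new HashMap<>();
        for (List<String> m : members) {
            m.stream().filter(common::contains).findFirst()
                    .ifPresent(a -> votes.merge(a, 1, Integer::sum));
        }
        return votes.entrySet().stream().max(Map.Entry.comparingByValue()).map(Map.Entry::getKey);
    }
}
```

On the direct-switch case ([range] vs. [cooperative-sticky]) there is no common assignor, which is why that join is rejected. Midway through Phase 1 the only common choice is roundrobin, and midway through Phase 2 it is cooperative-sticky, so the group always has a valid strategy.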
Static Membership: Eliminating Restart-Triggered Rebalances
The Root Problem: Every Restart Looks Like a New Member
By default, each consumer gets a new member_id assigned by the Coordinator on every startup (e.g., consumer-my-group-1-a3f8b2c1). This means:
- Restarting a consumer instance → old member leaves, new member joins → two rebalances triggered
- Rolling deployment of N instances → each restart triggers rebalances → potentially 2N rebalance events
In Kubernetes environments, where pod restarts happen frequently (OOM kills, node evictions, version upgrades), this creates a continuous rebalance storm that severely degrades consumer group stability.
group.instance.id: Stable Member Identity
Static membership is configured via group.instance.id — a stable, persistent identifier for a consumer instance across its restarts.
Properties props = new Properties();
// (bootstrap servers and deserializers omitted; configure them as in the earlier example)
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
// Static membership: each instance uses a unique, stable ID
// Options: hostname, Pod name, container ID, etc.
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG,
        System.getenv("HOSTNAME"));  // in a StatefulSet, the pod name: "order-processor-0"
// Give instances enough time to restart before being declared dead
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");   // 60 seconds
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "5000");  // well under 1/3 of the session timeout
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
The behavioral difference on restart:
Dynamic member (default, on restart):
Instance restarts
→ Sends LeaveGroup (if graceful) or times out
→ Rebalance triggered (partition redistribution)
→ Instance reconnects with NEW member_id
→ Another rebalance triggered
→ Net: 2 rebalances, seconds of disruption
Static member (with group.instance.id, on restart):
Instance restarts
→ No LeaveGroup sent
→ Coordinator holds the assignment for up to session.timeout.ms
→ Instance reconnects, presents same group.instance.id
→ Coordinator recognizes it as the returning member
→ Assignment restored immediately
→ Net: 0 rebalances (provided it returns within session.timeout.ms)
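The restart comparison can be reduced to a toy coordinator that remembers which member.id currently holds each group.instance.id. `StaticMembershipModel` is hypothetical and counts one rebalance per membership change. It deliberately ignores session expiry (covered in the timeout discussion) and subscription changes, which would also force a rebalance.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Toy model (hypothetical): a returning static member gets a fresh member.id mapped
// onto its existing group.instance.id entry and keeps its cached assignment.
final class StaticMembershipModel {
    private final Map<String, String> memberIdByInstanceId = new HashMap<>();
    private int rebalanceCount = 0;

    // groupInstanceId == null models a dynamic member; returns the new member.id
    String join(String groupInstanceId) {
        String memberId = "consumer-" + UUID.randomUUID();
        boolean returningStatic = groupInstanceId != null
                && memberIdByInstanceId.containsKey(groupInstanceId);
        if (!returningStatic) {
            rebalanceCount++;  // unknown identity: partitions must be redistributed
        }
        if (groupInstanceId != null) {
            memberIdByInstanceId.put(groupInstanceId, memberId);
        }
        return memberId;
    }

    // Graceful close: a dynamic member sends LeaveGroup (rebalance); a static one does not
    void close(String groupInstanceId) {
        if (groupInstanceId == null) {
            rebalanceCount++;
        }
    }

    int rebalances() {
        return rebalanceCount;
    }
}
```

Restarting a static member (close, then join with the same ID) leaves the count unchanged, while the same cycle for a dynamic member adds two rebalances.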
The Timeout Boundary
When a static member exceeds session.timeout.ms without sending a heartbeat, the Coordinator:
- Marks that member's group.instance.id as expired
- Triggers a rebalance to redistribute its partitions
- If the instance later reconnects, treats it as a rejoining member (another rebalance)
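Therefore, session.timeout.ms must be larger than your instance's worst-case restart time (JVM startup + Spring context initialization + connectivity checks). For many Java services, 45–60 seconds is a reasonable starting point (Kafka 3.0 raised the client default to 45 seconds), but measure your own restart times. Set it too low and static membership loses its benefit; set it too high and real failures take longer to detect, because a dead member's partitions sit idle until the timeout expires.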
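A small sizing helper makes the trade-off explicit. The 1.5× safety margin is an assumption. The bounds are the broker defaults for group.min.session.timeout.ms (6000) and group.max.session.timeout.ms (1800000), and the heartbeat check follows the client documentation's advice to keep heartbeat.interval.ms at no more than one third of session.timeout.ms. `SessionTimeoutSizing` is a hypothetical name.

```java
// Hypothetical sizing helper for static members
final class SessionTimeoutSizing {
    // Broker defaults for group.min.session.timeout.ms / group.max.session.timeout.ms
    static final long BROKER_MIN_MS = 6_000L;
    static final long BROKER_MAX_MS = 1_800_000L;

    // Worst-case restart time times a safety margin, clamped to the broker's allowed range
    static long recommendMs(long worstCaseRestartMs, double margin) {
        long wanted = (long) Math.ceil(worstCaseRestartMs * margin);
        return Math.max(BROKER_MIN_MS, Math.min(BROKER_MAX_MS, wanted));
    }

    // Leave room for at least three heartbeats per session
    static boolean heartbeatOk(long heartbeatMs, long sessionMs) {
        return heartbeatMs * 3 <= sessionMs;
    }

    // A restart that outlasts the session costs an expiry rebalance plus a rejoin rebalance
    static int rebalancesForRestart(long restartMs, long sessionMs) {
        return restartMs < sessionMs ? 0 : 2;
    }
}
```

A service whose slowest observed restart is 40 s gets a recommendation of 60,000 ms, the value used throughout this section. A 35 s restart then costs no rebalance, while a 90 s restart costs two.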
# Verify static membership is active: the per-member view shows each GROUP-INSTANCE-ID
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group order-processor \
  --describe --members
# Illustrative output:
# GROUP            CONSUMER-ID          GROUP-INSTANCE-ID  HOST        CLIENT-ID    #PARTITIONS
# order-processor  consumer-...-abc123  order-processor-0  /10.0.1.10  consumer-... 1
# order-processor  consumer-...-def456  order-processor-1  /10.0.1.11  consumer-... 1
# order-processor  consumer-...-ghi789  order-processor-2  /10.0.1.12  consumer-... 1
The Golden Combination: Static Membership + CooperativeStickyAssignor
Zero-Rebalance Rolling Deployment
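Combining both features makes rolling deployments of a consumer group nearly disruption-free. Static membership avoids rebalances when pods restart within session.timeout.ms, and cooperative rebalancing confines any rebalance that does happen to the partitions that actually move: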
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "critical-order-processor");
// Static membership
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG,
        System.getenv("HOSTNAME"));  // "order-processor-0", "order-processor-1", etc.
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
// Cooperative rebalance
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        CooperativeStickyAssignor.class.getName());
// Other tuning
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
// Deserializers are mandatory; String keys and values are assumed here
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
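Walk through a 3-instance rolling deployment, with the instances holding partitions [0-3], [4-7], and [8-11] and times in seconds (a real StatefulSet rolling update proceeds from the highest ordinal down, which doesn't change the outcome):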
T=0: All 3 pods running stably, consuming their assigned partitions
T=10: pod-0 restarts (Kubernetes rolling update)
→ No LeaveGroup sent; Coordinator holds assignment for pod-0
→ pod-1 and pod-2 continue consuming their OWN partitions uninterrupted
→ Partitions [0-3] go unconsumed while pod-0 is down; only if pod-0 stays silent longer than session.timeout.ms does the Coordinator expire it and rebalance [0-3] onto pod-1 and pod-2
T=45: pod-0 comes back up, presents instance-id="order-processor-0"
→ Coordinator recognizes it, restores partitions [0-3] to pod-0
→ Cooperative rebalance if needed: only partitions [0-3] are affected
→ pod-1 and pod-2 never pause their consumption
T=60: pod-1 restarts → same process
T=110: pod-2 restarts → same process
Result: Zero rebalances; the only consumption gap is on each restarting pod's own partitions, for the length of its restart
Kubernetes StatefulSet Configuration
apiVersion: apps/v1
kind: StatefulSet  # StatefulSet is REQUIRED: Deployment pod names are not stable
metadata:
  name: order-processor
spec:
  serviceName: order-processor  # headless Service governing the pods (assumed to exist)
  replicas: 3
  selector:
    matchLabels:
      app: order-processor
  template:
    metadata:
      labels:
        app: order-processor
    spec:
      containers:
        - name: kafka-consumer
          image: my-kafka-consumer:2.0.0
          env:
            # Read by the application; HOSTNAME also holds the pod name
            - name: KAFKA_GROUP_INSTANCE_ID
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name  # stable: "order-processor-0/1/2"
            - name: KAFKA_SESSION_TIMEOUT_MS
              value: "60000"
          resources:
            requests:
              cpu: "500m"
              memory: "512Mi"
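The StatefulSet requirement is non-negotiable when the instance ID comes from the pod name. A Deployment gives pods random hash suffixes (order-processor-7d4f8b-xk2p9), which change whenever a pod is recreated (every rolling update, eviction, or reschedule), defeating the purpose of group.instance.id.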
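Because the StatefulSet exports the pod name as KAFKA_GROUP_INSTANCE_ID while the earlier Java snippets read HOSTNAME, an application may want to accept either and refuse names that do not look stable. `InstanceIdResolver` is a hypothetical helper, and its ordinal-suffix check is a heuristic assumption, not a Kubernetes guarantee.

```java
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

// Hypothetical helper: resolve group.instance.id from environment variables
final class InstanceIdResolver {
    // StatefulSet pods are named <name>-<ordinal>; Deployment pods end in a random suffix
    private static final Pattern STATEFULSET_NAME =
            Pattern.compile("^[a-z0-9]([-a-z0-9]*[a-z0-9])?-\\d+$");

    static Optional<String> resolve(Map<String, String> env) {
        // Prefer the explicit variable from the StatefulSet spec, fall back to HOSTNAME
        String id = env.getOrDefault("KAFKA_GROUP_INSTANCE_ID", env.get("HOSTNAME"));
        if (id == null || !STATEFULSET_NAME.matcher(id).matches()) {
            return Optional.empty();  // missing or not StatefulSet-shaped: refuse to use it
        }
        return Optional.of(id);
    }
}
```

At startup, `InstanceIdResolver.resolve(System.getenv()).orElseThrow()` would fail fast on a Deployment-style name instead of silently producing a new static ID on every rollout.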
Consumer Rack Awareness: Prefer Local-AZ Replicas
The Hidden Cost of Cross-AZ Reads
In a multi-AZ Kafka cluster, brokers are distributed across availability zones and replicas are placed using rack-aware distribution. Without rack awareness, consumers may fetch from a replica in a different AZ:
- Cross-AZ latency: Same-AZ is typically < 1ms; cross-AZ is typically 1–5ms
- Cross-AZ egress costs: AWS and GCP charge on the order of $0.01/GB for inter-AZ traffic (AWS bills it in each direction), which becomes significant quickly at high throughput such as 10 GB/s
- Bandwidth asymmetry: Intra-AZ bandwidth is typically higher than cross-AZ
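To put the egress figure in perspective, the arithmetic is throughput × seconds per day × price per GB. The sketch assumes every consumed byte crosses an AZ boundary exactly once at a flat $0.01/GB; real bills depend on current pricing and on whether both directions are charged. `CrossAzCost` is a hypothetical name.

```java
// Back-of-envelope inter-AZ cost for consumer reads (flat per-GB rate assumed)
final class CrossAzCost {
    static final long SECONDS_PER_DAY = 86_400L;

    static double dailyUsd(double gbPerSecond, double usdPerGb) {
        return gbPerSecond * SECONDS_PER_DAY * usdPerGb;
    }
}
```

At 10 GB/s that is 864,000 GB per day, or about $8,640 per day (roughly $259,000 per 30-day month).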
Configuring Rack Awareness for Consumers (Kafka 2.4+; rack-aware assignment since 3.5)
Broker side — already set during cluster setup:
# server.properties (per broker)
# broker.rack must match the AZ or rack label; '#' only starts a comment at the beginning of a line
broker.rack=us-east-1a
Enable Follower Fetching on the broker — without this, consumers always fetch from the leader:
# server.properties
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
Consumer side:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "rack-aware-consumer");
// Tell Kafka which rack/AZ this consumer is in
// Must match one of the broker.rack values in the cluster
props.put(ConsumerConfig.CLIENT_RACK_CONFIG, "us-east-1a");
// Rack-aware partition assignment (KIP-881, Kafka 3.5+): when client.rack is set,
// the assignor prefers giving each partition to a consumer in a rack holding one of its replicas
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        CooperativeStickyAssignor.class.getName());
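With RackAwareReplicaSelector enabled, the partition leader compares the consumer's client.rack with the broker.rack of the partition's replicas. If a replica lives in the consumer's rack, the leader names it as the preferred read replica in its fetch response, and the consumer sends subsequent fetches there: no cross-AZ consumer traffic and no cross-AZ charges for those reads. (Replication between brokers still crosses AZs.)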
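The selection rule can be sketched as follows. `ReplicaChoice` is a simplified, hypothetical model loosely following the behavior of RackAwareReplicaSelector: with no client.rack, or no replica in that rack, it returns the leader; if the leader itself is in the consumer's rack it stays with the leader; otherwise it picks the same-rack replica with the highest log end offset. The real selector works on in-sync replica views and has further tie-breakers.

```java
import java.util.Comparator;
import java.util.List;

// Simplified, hypothetical model of rack-aware preferred-read-replica selection
final class ReplicaChoice {
    record Replica(int brokerId, String rack, long logEndOffset) {}

    static Replica select(String clientRack, Replica leader, List<Replica> replicas) {
        if (clientRack == null || clientRack.isEmpty()) return leader;  // no rack: leader
        List<Replica> sameRack = replicas.stream()
                .filter(r -> clientRack.equals(r.rack()))
                .toList();
        if (sameRack.isEmpty()) return leader;           // nothing local: leader
        if (sameRack.contains(leader)) return leader;    // leader is local: keep it
        // Otherwise the most caught-up local follower
        return sameRack.stream()
                .max(Comparator.comparingLong(Replica::logEndOffset))
                .orElse(leader);
    }
}
```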
Getting the AZ Value in Cloud Environments
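import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// For AWS EC2/EKS: read the AZ via IMDSv2 (session token first, then the metadata
// request); IMDSv1-only GETs fail where IMDSv2 is enforced. `log` is an SLF4J Logger.
public static String getAvailabilityZone() {
    String base = "http://169.254.169.254/latest";
    HttpClient client = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(2)).build();
    try {
        HttpRequest tokenRequest = HttpRequest.newBuilder(URI.create(base + "/api/token"))
                .timeout(Duration.ofSeconds(2))
                .header("X-aws-ec2-metadata-token-ttl-seconds", "60")
                .PUT(HttpRequest.BodyPublishers.noBody()).build();
        String token = client.send(tokenRequest, HttpResponse.BodyHandlers.ofString()).body();
        HttpRequest azRequest = HttpRequest.newBuilder(
                        URI.create(base + "/meta-data/placement/availability-zone"))
                .timeout(Duration.ofSeconds(2))
                .header("X-aws-ec2-metadata-token", token).build();
        HttpResponse<String> response = client.send(azRequest, HttpResponse.BodyHandlers.ofString());
        return response.statusCode() == 200 ? response.body().trim() : "";  // e.g., "us-east-1a"
    } catch (Exception e) {
        if (e instanceof InterruptedException) Thread.currentThread().interrupt();
        log.warn("Could not determine AZ from metadata service, using no rack", e);
        return "";  // fallback: empty client.rack means no rack preference
    }
}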
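// In Kubernetes, the Downward API exposes the pod's own labels, not the node's.
// topology.kubernetes.io/zone is set on nodes, so this only works if something
// (e.g. an admission webhook) copies the zone label onto the pod; alternatively,
// expose spec.nodeName and look the node's zone up via the Kubernetes API.
// env:
// - name: MY_AZ
//   valueFrom:
//     fieldRef:
//       fieldPath: metadata.labels['topology.kubernetes.io/zone']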
Complete Production Consumer Configuration
Bringing everything together — static membership, cooperative rebalance, and rack awareness:
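import java.util.Properties;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// OrderEvent is assumed to be an Avro-generated SpecificRecord class
public KafkaConsumer<String, OrderEvent> buildProductionConsumer(String podName, String availabilityZone) {
    Properties props = new Properties();
    // Connection
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092,broker3:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-event-processor");
    // Static membership
    props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, podName);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
    // Cooperative rebalance
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
            CooperativeStickyAssignor.class.getName());
    // Rack awareness
    props.put(ConsumerConfig.CLIENT_RACK_CONFIG, availabilityZone);
    // Poll tuning
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
    // Fetch tuning
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");  // wait for at least 1 KB per fetch...
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");  // ...but no longer than 500 ms
    // Deserialization (KafkaAvroDeserializer comes from Confluent's kafka-avro-serializer)
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            KafkaAvroDeserializer.class.getName());
    props.put("schema.registry.url", "http://schema-registry:8081");  // assumed address
    props.put("specific.avro.reader", "true");  // deserialize into OrderEvent, not GenericRecord
    return new KafkaConsumer<>(props);
}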
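The poll settings above imply a processing budget: a full batch of max.poll.records has to be handled within max.poll.interval.ms, or the consumer is treated as failed and a rebalance follows. The hypothetical `PollBudget` helper makes that arithmetic explicit, assuming per-record processing time is roughly constant.

```java
// Hypothetical helper: does a full poll batch fit inside max.poll.interval.ms?
final class PollBudget {
    // Largest average per-record processing time (ms) that still fits the interval
    static double maxPerRecordMs(int maxPollRecords, long maxPollIntervalMs) {
        return (double) maxPollIntervalMs / maxPollRecords;
    }

    static boolean fits(int maxPollRecords, long maxPollIntervalMs, double observedPerRecordMs) {
        return observedPerRecordMs * maxPollRecords < maxPollIntervalMs;
    }
}
```

With 500 records and 300,000 ms, the average per-record budget is 600 ms. A workload averaging 700 ms per record would be better served by a smaller max.poll.records than by an ever-larger interval, since the interval also bounds how long a stuck consumer goes unnoticed.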
Monitoring Rebalance Health
Rebalance frequency and duration are primary indicators of consumer group health. Key metrics to track:
# JMX metrics (MBean kafka.consumer:type=consumer-coordinator-metrics,client-id=...)
# rebalance-total: lifetime count of rebalances for this consumer
# rebalance-rate-per-hour: recent rebalance rate
# last-rebalance-seconds-ago: time since the last rebalance completed
# join-time-avg / join-time-max: time spent in JoinGroup
# Check via JConsole or the Prometheus JMX exporter (metric names depend on your exporter rules):
# kafka_consumer_coordinator_rebalance_total{client_id="..."} 3.0
# kafka_consumer_coordinator_last_rebalance_seconds_ago{...} 86400.0 ← healthy: last rebalance was a day ago
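# Quick health check via CLI: group STATE and ASSIGNMENT-STRATEGY, then per-member instance IDs
watch -n 5 'kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group order-processor --describe --state; \
  kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group order-processor --describe --members'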
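The same coordinator metrics can also be read in-process through the JDK's platform MBeanServer, for example to surface them on an existing health endpoint. The class name is hypothetical; the MBean pattern and attribute name follow the JMX naming listed above, where Kafka registers each metric as an attribute of the consumer-coordinator-metrics MBean.

```java
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.TreeMap;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

final class RebalanceMetrics {
    // Returns client-id -> rebalance-total for every Kafka consumer registered in this JVM
    static Map<String, Double> rebalanceTotals() {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        Map<String, Double> totals = new TreeMap<>();
        try {
            ObjectName pattern =
                    new ObjectName("kafka.consumer:type=consumer-coordinator-metrics,client-id=*");
            for (ObjectName name : server.queryNames(pattern, null)) {
                Object value = server.getAttribute(name, "rebalance-total");
                totals.put(name.getKeyProperty("client-id"), ((Number) value).doubleValue());
            }
        } catch (JMException e) {
            throw new IllegalStateException("Could not read consumer coordinator metrics", e);
        }
        return totals;
    }
}
```

In a JVM with no Kafka consumer the map is simply empty; with consumers running, it has one entry per client.id.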
A healthy consumer group with static membership and cooperative rebalance should show:
- STATE: Stable at all times except during intentional topology changes
- Rebalance rate near zero — only triggered by deliberate scaling events
- INSTANCE-ID populated — confirming static membership is active
- No partitions with an empty (-) CONSUMER-ID in the plain --describe output, confirming every partition has an owner
If rebalances are still frequent, investigate in this order:
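- Are any instances taking longer than session.timeout.ms to restart (or crash-looping), so that their static membership expires?
- Are any instances exceeding max.poll.interval.ms due to slow processing?
- Is there a group.instance.id collision (two pods with the same static ID)? The Coordinator fences the older member, which then fails with FencedInstanceIdException.
- Is CooperativeStickyAssignor actually being used? The ASSIGNMENT-STRATEGY column of --describe --state should read cooperative-sticky.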
Cooperative rebalance and static membership represent the maturation of Kafka's consumer group protocol — from a mechanism that treated every topology change as a full stop to one that surgically adjusts only what needs to change. Together with rack-aware replica selection, they form the foundation of production-grade consumer deployment in multi-AZ Kafka clusters.