Replication: ISR, Watermarks and Data Consistency
Kafka's replication mechanism is the foundation of its durability and high availability guarantees. Understanding ISR (In-Sync Replicas), the three watermarks (LEO, HW, and Leader Epoch), and how they interact during various failure scenarios is essential for truly understanding what Kafka's consistency guarantees mean and where their boundaries lie.
The Follower Fetch Loop: How Replication Works
Push vs Pull: Why Kafka Chose Pull
Kafka's replication uses a Follower pull model rather than a Leader push model. This architectural choice has deep engineering rationale:
Problems with push:
- The Leader must track every Follower's replication position and buffer pending sends
- A slow Follower forces the Leader to either slow down or accumulate unbounded send queues
- Replication logic becomes coupled to the Leader's write path, increasing complexity
Advantages of pull:
- Followers replicate at their own pace without blocking the Leader's write path
- Follower Fetch requests are structurally identical to consumer Fetch requests, so they reuse the same protocol handlers (with replica identification added)
- The Leader only needs to track Follower LEOs, not manage push queues
FetchRequest/FetchResponse: Key Fields
When a Follower fetches from the Leader, the request looks like this:
```text
FetchRequest (Follower -> Leader):
├── replicaId: <Follower broker ID>    <- -1 for consumers, the broker ID (>= 0) for replicas
├── maxWaitMs: replica.fetch.wait.max.ms (default: 500ms)
├── minBytes: replica.fetch.min.bytes (default: 1)
└── topics:
    └── partitions:
        ├── partition: 0
        ├── fetchOffset: <Follower's current LEO>    <- "I have everything before this offset"
        ├── logStartOffset: <Follower's log start>
        └── maxBytes: replica.fetch.max.bytes (default: 1 MiB)

FetchResponse (Leader -> Follower):
└── topics:
    └── partitions:
        ├── partition: 0
        ├── highWatermark: <Leader's current HW>    <- Follower uses this to update its own HW
        ├── lastStableOffset: <LSO, for transaction-aware consumers>
        └── records: <RecordBatches starting at fetchOffset>
```
The fetchOffset field serves double duty: it tells the Leader where to start sending data, and it tells the Leader what the Follower's current LEO is. This is how the Leader tracks replica progress without the Follower sending a separate status message.
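From the Follower's side, the fetch loop reduces to a few lines. The sketch below is an illustrative in-memory model, not Kafka's actual replica fetcher: `FollowerReplicaModel` and its methods are hypothetical names, records are plain strings, and capping the local HW at the local LEO is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model of one Follower replica (hypothetical class, not Kafka code). */
final class FollowerReplicaModel {
    private final List<String> log = new ArrayList<>();
    private long highWatermark = 0;

    /** The next fetchOffset is this replica's LEO: the offset after its last record. */
    long nextFetchOffset() {
        return log.size();
    }

    /** Applies a FetchResponse: append the records, then adopt the Leader's HW. */
    void applyFetchResponse(List<String> records, long leaderHighWatermark) {
        log.addAll(records);
        // Assumption of this sketch: the local HW never exceeds the local LEO.
        highWatermark = Math.min(leaderHighWatermark, log.size());
    }

    long highWatermark() {
        return highWatermark;
    }
}
```

Because `nextFetchOffset()` is derived from the log itself, every request the model sends reports its progress to the Leader without any separate status message.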
The Three Watermarks: Core of Kafka's Consistency Model
LEO (Log End Offset): Each Replica's Write Frontier
Definition: The offset of the next message to be written to a replica's log. Equivalently, one greater than the offset of the last written message.
- Leader LEO = (offset of the last message Leader has written) + 1
- Follower LEO = (offset of the last message Follower has fetched and written) + 1
LEO is private to each replica. Different replicas may have different LEOs: Followers are typically slightly behind the Leader.
HW (High Watermark): The Safe Consumption Boundary
Definition: The minimum LEO across all replicas in the ISR (In-Sync Replica set).
HW = min(Leader LEO, Follower-1 LEO, Follower-2 LEO, ...)
where only ISR members are included in the min calculation
Why HW is the consumption boundary:
Consumers may read only messages below the current HW (the HW offset itself is exclusive). This guarantees that if the current Leader fails and a new Leader is elected from the ISR, the new Leader has every message below HW. If consumers could read beyond HW, the Leader could fail and be replaced by an ISR member that never received those messages. Consumers would then have read messages that no longer exist, a phantom read that breaks the fundamental consistency contract.
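The definition translates directly into code. This is a minimal sketch with hypothetical names (`HighWatermark`, `compute`, `isVisible`); it assumes the Leader already knows the LEO of every ISR member.

```java
import java.util.Map;
import java.util.Set;

/** Illustrative helper (hypothetical, not Kafka code). */
final class HighWatermark {
    /** HW = minimum LEO over ISR members only; replicas outside the ISR are ignored. */
    static long compute(Map<Integer, Long> leoByReplica, Set<Integer> isr) {
        if (isr.isEmpty()) {
            throw new IllegalArgumentException("ISR must contain at least the Leader");
        }
        long hw = Long.MAX_VALUE;
        for (int replicaId : isr) {
            hw = Math.min(hw, leoByReplica.get(replicaId));
        }
        return hw;
    }

    /** Consumers may read offsets strictly below HW. */
    static boolean isVisible(long offset, long hw) {
        return offset < hw;
    }
}
```

With LEOs of 6, 6, and 4 for replicas 1, 2, and 3, an ISR of {1, 2} yields HW=6, while an ISR of {1, 2, 3} yields HW=4: a lagging replica holds the HW back only while it is still an ISR member.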
Leader Epoch: The Fix for the HW Truncation Bug
Leader Epoch is a monotonically increasing generation counter that increments with each Leader election. Every RecordBatch written to the log carries the epoch of the Leader that wrote it in the partitionLeaderEpoch field of the batch header.
Why Leader Epoch was needed (KIP-101 background):
Before KIP-101, Followers used HW comparison to decide whether to truncate their local log on restart. This mechanism had a subtle data loss bug under specific timing conditions:
```text
Initial state (offset 5 is committed: the Leader's HW has advanced to 6 and the producer has been acked):
  Leader   (Broker A): [0,1,2,3,4,5]  LEO=6  HW=6
  Follower (Broker B): [0,1,2,3,4,5]  LEO=6  HW=5
  Note: B has already written offset 5, but the FetchResponse carrying
        highWatermark=6 hasn't reached it yet, so B's local HW is still 5.

Step 1: B restarts
  Old logic: "my LEO > HW, the data beyond HW might be uncommitted, truncate!"
  B truncates its log to LEO=5, DELETING offset 5, and plans to re-fetch it from A

Step 2: A crashes before B can re-fetch
  B is the only live replica and becomes the new Leader
  B's log: [0,1,2,3,4]  LEO=5

Step 3: A restarts and joins as Follower to new Leader B
  A still has [0,1,2,3,4,5], but B only has [0,1,2,3,4]
  A truncates to match B, deleting its copy of offset 5

Result: offset 5 is permanently lost, even though it was below the Leader's HW
        (committed and acknowledged) before the failures!
```
Leader Epoch solution:
After a restart, instead of using HW to decide truncation, a Follower sends an OffsetsForLeaderEpochRequest to the current Leader:
"For my last known Leader Epoch (E), what is the last offset you have for that epoch?"
The Leader responds with the end offset for that epoch, and the Follower truncates precisely to that point, never erasing data that the current Leader actually has. This closes the race described above.
```bash
# Inspect the leader epoch checkpoint file
cat /data/kafka/orders-0/leader-epoch-checkpoint
# Format: version, entry count, then one "epoch startOffset" pair per line
# 0
# 3
# 0 0        <- Epoch 0 started at offset 0 (initial leader)
# 1 5000     <- Epoch 1 started at offset 5000 (first leader change)
# 2 10500    <- Epoch 2 started at offset 10500 (second leader change)
```
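The epoch-to-start-offset pairs are all the Leader needs to answer the Follower's question. The sketch below is a simplified model with hypothetical names (`LeaderEpochCache`, `endOffsetFor`, `truncationOffset`); it assumes the requested epoch is known to the Leader and ignores the handling Kafka needs when it is not.

```java
import java.util.Map;
import java.util.TreeMap;

/** Illustrative model of the Leader's epoch bookkeeping (hypothetical, not Kafka code). */
final class LeaderEpochCache {
    private final TreeMap<Integer, Long> startOffsetByEpoch = new TreeMap<>();
    private final long logEndOffset;

    LeaderEpochCache(Map<Integer, Long> entries, long logEndOffset) {
        this.startOffsetByEpoch.putAll(entries);
        this.logEndOffset = logEndOffset;
    }

    /** End offset (exclusive) of an epoch: the start of the next epoch, or the LEO for the latest epoch. */
    long endOffsetFor(int epoch) {
        Map.Entry<Integer, Long> next = startOffsetByEpoch.higherEntry(epoch);
        return next != null ? next.getValue() : logEndOffset;
    }

    /** The Follower keeps everything up to the Leader's end offset and drops only what lies beyond it. */
    static long truncationOffset(long followerLeo, long leaderEndOffset) {
        return Math.min(followerLeo, leaderEndOffset);
    }
}
```

With the checkpoint entries shown above and an assumed Leader LEO of 12000, a Follower whose last epoch is 1 and whose LEO is 10700 is told the end offset 10500 and truncates to it. A Follower whose LEO is at or below the returned end offset truncates nothing.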
LEO/HW Progression: Step-by-Step Diagram
The following trace follows a single produce request through a 3-replica partition and shows how LEO and HW evolve:
```text
Initial state: all replicas synchronized
  Leader:    [0,1,2,3,4]    LEO=5  HW=5
  Follower1: [0,1,2,3,4]    LEO=5  HW=5
  Follower2: [0,1,2,3,4]    LEO=5  HW=5

Step 1: Producer sends record with acks=all
  Leader writes offset=5 to local log
  Leader:    [0,1,2,3,4,5]  LEO=6  HW=5   <- HW unchanged: Followers haven't confirmed yet
  Follower1: [0,1,2,3,4]    LEO=5  HW=5
  Follower2: [0,1,2,3,4]    LEO=5  HW=5
  [DelayedProduce created in Purgatory, waiting for ISR confirmation]

Step 2: Follower1 sends FetchRequest(fetchOffset=5)
  Leader sends: records=[offset5], highWatermark=5
  Follower1 writes offset=5, advances its LEO to 6
  Follower1 reads HW=5 from response, keeps its local HW=5
  Leader:    [0,1,2,3,4,5]  LEO=6  HW=5
  Follower1: [0,1,2,3,4,5]  LEO=6  HW=5

Step 3: Follower2 sends FetchRequest(fetchOffset=5), same process
  Leader:    [0,1,2,3,4,5]  LEO=6  HW=5
  Follower2: [0,1,2,3,4,5]  LEO=6  HW=5

Step 4: Follower1 sends next FetchRequest(fetchOffset=6)
  This tells Leader: "Follower1's LEO is now 6"
  Leader updates its tracking of Follower1's LEO: 6
  HW = min(Leader LEO=6, Follower1 LEO=6, Follower2 LEO=5) = 5   <- unchanged: Follower2 is still recorded at 5

Step 5: Follower2 sends next FetchRequest(fetchOffset=6)
  Leader updates its tracking of Follower2's LEO: 6
  New HW = min(Leader LEO=6, Follower1 LEO=6, Follower2 LEO=6) = 6   <- HW advances!
  Leader triggers DelayedProduce: all ISR members confirmed
  ProduceResponse sent to Producer with offset=5

Step 6: Leader's FetchResponse to Follower2 carries highWatermark=6
  Follower2 updates its HW=6
  (Follower1 gets HW=6 on its next FetchRequest as well)

Step 7: Consumer calls poll()
  Consumer can now read offset=5 (HW=6 > 5)
  Offset 5 is safely replicated to all ISR members
```
Key insight: HW does not advance the moment the Leader writes a record. It advances when the Leader processes later FetchRequests and learns (via the fetchOffset field) that every ISR Follower has caught up. HW advancement therefore always lags at least one FetchRequest round-trip behind the Leader's LEO advancement.
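The Leader's side of this progression can be replayed in a small simulation. The class below is a hypothetical model (`LeaderPartitionModel` is not a Kafka class) that assumes every tracked Follower is an ISR member and that each append writes exactly one record.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative model of the Leader's LEO/HW bookkeeping (hypothetical, not Kafka code). */
final class LeaderPartitionModel {
    private long leaderLeo;
    private long highWatermark;
    private final Map<Integer, Long> followerLeo = new HashMap<>();

    LeaderPartitionModel(long initialOffset, int... followerIds) {
        this.leaderLeo = initialOffset;
        this.highWatermark = initialOffset;
        for (int id : followerIds) {
            followerLeo.put(id, initialOffset);
        }
    }

    /** Appends one record and returns its offset. HW is deliberately left untouched. */
    long append() {
        return leaderLeo++;
    }

    /** Records the Follower's LEO from its fetchOffset, recomputes HW, and returns the HW sent back. */
    long onFollowerFetch(int followerId, long fetchOffset) {
        followerLeo.put(followerId, fetchOffset);
        long min = leaderLeo;
        for (long leo : followerLeo.values()) {
            min = Math.min(min, leo);
        }
        highWatermark = Math.max(highWatermark, min); // HW never moves backwards in this model
        return highWatermark;
    }

    long highWatermark() {
        return highWatermark;
    }
}
```

Starting from offset 5 with Followers 1 and 2, one `append()` leaves the HW at 5. In this model the HW returned by `onFollowerFetch` stays at 5 until every tracked Follower has sent a fetch with fetchOffset=6, and only then becomes 6.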
ISR Management: Dynamic Membership
replica.lag.time.max.ms: The Sync Tolerance Boundary
The ISR (In-Sync Replicas) is the set of replicas the Leader considers "sufficiently synchronized." The criterion:
A Follower that has not sent a FetchRequest, or whose LEO has not caught up to the Leader's LEO, within replica.lag.time.max.ms (default: 30,000ms) is removed from the ISR.
Note: Kafka 0.9.0 removed the older replica.lag.max.messages parameter (message-count-based lag detection) because it caused false positives during bursts of produce traffic.
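The time-based criterion can be expressed as two small functions. This is a simplified sketch with hypothetical names (`IsrLagCheck`, `lastCaughtUpAfterFetch`, `isOutOfSync`); it assumes the Leader keeps one "last caught up" timestamp per Follower and omits the finer bookkeeping a real broker performs.

```java
/** Illustrative time-based ISR check (hypothetical, not Kafka code). */
final class IsrLagCheck {
    /** A fetch counts as "caught up" only if it asks for the Leader's current LEO or beyond. */
    static long lastCaughtUpAfterFetch(long previousCaughtUpMs, long fetchTimeMs,
                                       long fetchOffset, long leaderLeo) {
        return fetchOffset >= leaderLeo ? fetchTimeMs : previousCaughtUpMs;
    }

    /** Out of sync once the Follower has gone longer than maxLagMs without being caught up. */
    static boolean isOutOfSync(long nowMs, long lastCaughtUpMs, long maxLagMs) {
        return nowMs - lastCaughtUpMs > maxLagMs;
    }
}
```

A Follower that keeps fetching but never reaches the Leader's LEO does not refresh its timestamp, so both failure modes (no fetches at all, and fetches that never catch up) trip the same check.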
```bash
# Watch ISR changes in broker logs
grep "ISR" /var/log/kafka/server.log | grep -E "(updated|shrunk|expanded)" | tail -20
# INFO [Partition orders-1 epoch=3] ISR updated from [1, 2, 3] to [1, 2]
#   because replica 3 is no longer in-sync (kafka.cluster.Partition)
# INFO [Partition orders-1 epoch=3] ISR updated from [1, 2] to [1, 2, 3]
#   because replica 3 rejoined ISR (kafka.cluster.Partition)

# Check topic partition ISR state
kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic orders
# Topic: orders  Partition: 0  Leader: 1  Replicas: 1,2,3  Isr: 1,2,3   <- healthy
# Topic: orders  Partition: 1  Leader: 2  Replicas: 2,3,1  Isr: 2,3     <- ISR shrunken!
# Topic: orders  Partition: 2  Leader: 3  Replicas: 3,1,2  Isr: 3       <- only leader in ISR!
```
A Follower may be removed from ISR due to:
- Broker overload: slow disk I/O, JVM GC pause, CPU saturation
- Node crash or restart
- Network partition separating Follower from Leader
min.insync.replicas: The ISR Floor Protection
min.insync.replicas sets a minimum ISR size threshold. When a producer uses acks=all:
- If current ISR size ≥ min.insync.replicas: write proceeds normally
- If current ISR size < min.insync.replicas: NotEnoughReplicasException thrown
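```java
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.serialization.StringSerializer;

// Producer-side handling. key, value, log, alertingService, and deadLetterQueue
// are application-level objects assumed to be defined elsewhere.
String topic = "orders";
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
try {
    RecordMetadata metadata = producer.send(
        new ProducerRecord<>(topic, key, value)
    ).get(10, TimeUnit.SECONDS);
    log.info("Written to partition={} offset={}", metadata.partition(), metadata.offset());
} catch (ExecutionException e) {
    if (e.getCause() instanceof NotEnoughReplicasException) {
        // ISR has fallen below min.insync.replicas and the producer's retries are exhausted
        // Options depend on business requirements:
        // 1. Retry with backoff (if ISR is expected to recover quickly)
        // 2. Write to a dead-letter queue and alert operations
        // 3. For compliance-critical data: halt processing and page on-call
        alertingService.notifyISRDegraded(topic, e.getMessage());
        deadLetterQueue.send(key, value);
    } else if (e.getCause() instanceof org.apache.kafka.common.errors.TimeoutException) {
        log.error("Produce timed out; broker may be overloaded");
    }
} catch (java.util.concurrent.TimeoutException e) {
    // Future.get() gave up waiting; the send may still complete later
    log.error("No acknowledgement received within 10 seconds");
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // preserve the interrupt for the caller
}
```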
Recommended configurations by durability requirement:
| Use Case | replication.factor | min.insync.replicas | acks | Can Tolerate Failure Of |
|---|---|---|---|---|
| Financial / compliance data | 3 | 2 | all | 1 broker (writes fail if 2 brokers down) |
| Standard business events | 3 | 2 | all | 1 broker |
| High-throughput metrics/logs | 3 | 1 | 1 | any 2 brokers (with possible data loss) |
| Development / testing | 1 | 1 | 1 | N/A |
For 3 replicas with min.insync.replicas=2: the system tolerates 1 Follower falling behind (an ISR of 2 is fine). If both Followers fall behind at the same time, the ISR shrinks to the Leader alone and acks=all writes fail. This prevents data from appearing committed while it exists on only a single broker that could then fail.
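The arithmetic behind the table is small enough to state as code. This is an illustrative sketch with hypothetical names (`MinIsrCheck`, `checkAcksAllWrite`, `tolerableFailures`); it models only the acks=all admission rule, not the broker's actual request handling.

```java
/** Illustrative model of the min.insync.replicas admission rule (hypothetical, not Kafka code). */
final class MinIsrCheck {
    enum Outcome { ACCEPTED, NOT_ENOUGH_REPLICAS }

    /** An acks=all write is accepted only while the ISR is at least min.insync.replicas large. */
    static Outcome checkAcksAllWrite(int isrSize, int minInsyncReplicas) {
        return isrSize >= minInsyncReplicas ? Outcome.ACCEPTED : Outcome.NOT_ENOUGH_REPLICAS;
    }

    /** How many replicas can leave the ISR while acks=all writes keep succeeding. */
    static int tolerableFailures(int replicationFactor, int minInsyncReplicas) {
        return Math.max(0, replicationFactor - minInsyncReplicas);
    }
}
```

For replication.factor=3 and min.insync.replicas=2, `tolerableFailures` returns 1, which matches the first two rows of the table.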
Unclean Leader Election: The Availability vs Durability Trade-off
What Is an "Unclean" Election?
When all ISR members for a partition become unavailable simultaneously (e.g., a datacenter segment loses power), Kafka faces a binary choice:
unclean.leader.election.enable=false (default, recommended):
- Wait for an ISR member to come back online
- Partition is unavailable during the wait (producers get LeaderNotAvailableException, consumers cannot read new messages)
- Zero data loss: all data that was committed to HW is preserved
unclean.leader.election.enable=true:
- Allow an out-of-sync replica (not in ISR) to become the new Leader
- Partition immediately recovers availability
- Possible data loss: the new Leader may be missing messages that were committed to HW before the failure
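The two settings differ only in what happens once no ISR member is alive. The sketch below models that decision with hypothetical names (`LeaderElectionModel`, `elect`); it assumes replicas are tried in assignment order and is not the controller's actual election code.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Illustrative model of clean vs unclean leader election (hypothetical, not Kafka code). */
final class LeaderElectionModel {
    /** Returns the new Leader, or empty when the partition must stay offline. */
    static Optional<Integer> elect(List<Integer> replicas, Set<Integer> isr,
                                   Set<Integer> liveBrokers, boolean uncleanEnabled) {
        // Clean election: the first live replica that is still in the ISR.
        for (int replica : replicas) {
            if (liveBrokers.contains(replica) && isr.contains(replica)) {
                return Optional.of(replica);
            }
        }
        // Unclean election: any live replica, even one that is out of sync.
        if (uncleanEnabled) {
            for (int replica : replicas) {
                if (liveBrokers.contains(replica)) {
                    return Optional.of(replica);
                }
            }
        }
        return Optional.empty();
    }
}
```

For replicas [1, 2, 3] with ISR {1, 2} and only broker 3 alive, the model returns empty when unclean election is disabled and broker 3 when it is enabled, which is exactly the trade between availability and durability.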
Data Loss Scenario Walkthrough
```text
Setup: 3-replica partition, min.insync.replicas=2, acks=all

Last confirmed state before disaster:
  Leader   (Broker 1): [0 ... 100, 101]  LEO=102  HW=102
  Follower (Broker 2): [0 ... 100, 101]  LEO=102  HW=102  (fully in sync)
  Follower (Broker 3): [0 ... 100]       LEO=101  HW=101  (just fell behind, being caught up)
  Consumer successfully read offset=101 (HW was 102, so 101 was visible and consumed)

--- Disaster ---
  Broker 1: CRASHED (power outage)
  Broker 2: CRASHED (network equipment failure)
  Broker 3: ALIVE but only has offsets 0..100

If unclean.leader.election.enable=true:
  Broker 3 elected as new Leader (only survivor, not in ISR)
  New Leader's log: [0 ... 100]  LEO=101  HW=101
  Consumer checks for offset 101: NOT FOUND (new Leader starts at LEO=101)
  -> Offset 101 is permanently lost
  -> Consumer had already processed this message: it processed data that no longer exists
  -> Any downstream state built from offset 101 is now inconsistent with Kafka's log

If unclean.leader.election.enable=false (default):
  Partition goes offline: producers fail, consumers see LEADER_NOT_AVAILABLE
  Operations team is paged; Broker 1 or 2 is recovered
  Once either returns:
  -> Partition leader elected from recovered ISR member
  -> ALL data (including offset 101) is preserved
  -> Consumers resume from offset 102, no data gap
```
The correct default is false for almost all production use cases. Set it to true only when:
- Data is genuinely ephemeral (real-time metrics, sensor readings with no historical value)
- Availability is strictly more important than consistency (monitoring dashboards, not financial records)
- The team has explicitly signed off on accepting potential data loss
Monitoring Replication Health
```bash
# Find partitions where ISR < replication factor (under-replicated)
kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --under-replicated-partitions
# (no output is good: it means all partitions are fully replicated)

# Find partitions with no leader (unavailable partitions)
kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --unavailable-partitions

# Monitor leader counts per broker (should be roughly balanced)
# Field 6 of each partition line is the broker ID that follows "Leader:"
kafka-topics.sh --bootstrap-server localhost:9092 --describe 2>/dev/null \
  | grep "Leader:" | awk '{print $6}' | sort | uniq -c | sort -rn
# 142 1
# 141 2
# 140 3   <- well-balanced
```
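Key JMX metrics for replication monitoring, expressed as Prometheus alert rules (the exact metric names depend on how your JMX exporter is configured):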
```yaml
# Prometheus alert rules for replication health
groups:
  - name: kafka-replication
    rules:
      - alert: KafkaUnderReplicatedPartitions
        expr: kafka_server_replicamanager_underreplicatedpartitions > 0
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "{{ $value }} partitions under-replicated on {{ $labels.instance }}"
          description: "Replicas may be falling behind. Check broker disk/network health."
      - alert: KafkaISRShrinkRateHigh
        expr: rate(kafka_server_replicamanager_isrshrinks_total[5m]) > 0.1
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "ISR is shrinking frequently on {{ $labels.instance }}"
      - alert: KafkaFollowerLagHigh
        expr: kafka_server_replicafetchermanager_maxlag > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Follower replica is {{ $value }} messages behind the leader"
      - alert: KafkaOfflinePartitions
        expr: kafka_controller_kafkacontroller_offlinepartitionscount > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "{{ $value }} partitions have no available leader"
```
ISR, the high watermark, and Leader Epoch form the three pillars of Kafka's distributed consistency model. ISR defines which replicas are "trustworthy" for write acknowledgment. HW defines the "safe" boundary for consumption: the furthest point that all ISR members have confirmed. Leader Epoch ensures that log truncation decisions during leader transitions are precise and never erase committed data. Together, they provide strong durability guarantees while maintaining high throughput, the combination that makes Kafka viable as a mission-critical data infrastructure component.