Chapter 15

Replication: ISR, Watermarks and Data Consistency

Kafka's replication mechanism is the foundation of its durability and high-availability guarantees. To understand what Kafka's consistency guarantees actually mean, and where their boundaries lie, you need to understand the ISR (In-Sync Replicas), the three watermarks (LEO, HW, and Leader Epoch), and how they interact during failure scenarios.

The Follower Fetch Loop: How Replication Works

Push vs Pull: Why Kafka Chose Pull

Kafka's replication uses a Follower pull model rather than a Leader push model. This architectural choice has deep engineering rationale:

Problems with push:

Advantages of pull:

FetchRequest/FetchResponse: Key Fields

When a Follower fetches from the Leader, the request looks like this:

FetchRequest (Follower → Leader):
├── replicaId: <Follower broker ID>  ← -1 for consumers, the broker ID (>= 0) for replicas
├── maxWaitMs: replica.fetch.wait.max.ms (default: 500ms)
├── minBytes: replica.fetch.min.bytes (default: 1)
├── topics:
│   └── partitions:
│       ├── partition: 0
│       ├── fetchOffset: <Follower's current LEO>  ← "I have everything up to here"
│       ├── logStartOffset: <Follower's log start>
│       └── maxBytes: replica.fetch.max.bytes (default: 1 MiB)

FetchResponse (Leader → Follower):
├── topics:
│   └── partitions:
│       ├── partition: 0
│       ├── highWatermark: <Leader's current HW>  ← Follower uses this to update its own HW
│       ├── lastStableOffset: <LSO, for transaction-aware consumers>
│       └── records: <RecordBatches starting at fetchOffset>

The fetchOffset field serves double duty: it tells the Leader where to start sending data, and it tells the Leader what the Follower's current LEO is. This is how the Leader tracks replica progress without the Follower sending a separate status message.
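The double duty of fetchOffset can be illustrated with a toy leader. This is a minimal sketch, not Kafka's implementation: the class LeaderLogSketch, its methods, and the in-memory list standing in for the log are all hypothetical names chosen for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a partition leader (hypothetical; not Kafka's real classes). */
final class LeaderLogSketch {
    private final List<String> log = new ArrayList<>();              // list index == offset
    private final Map<Integer, Long> followerLeo = new HashMap<>();  // replicaId -> last reported LEO

    /** Appends a record and returns the offset assigned to it. */
    long append(String record) {
        log.add(record);
        return log.size() - 1L;
    }

    /** Serves a follower fetch: records the follower's LEO, then returns records from fetchOffset. */
    List<String> handleFetch(int replicaId, long fetchOffset, int maxRecords) {
        if (fetchOffset < 0 || fetchOffset > log.size()) {
            throw new IllegalArgumentException("fetchOffset out of range: " + fetchOffset);
        }
        followerLeo.put(replicaId, fetchOffset);  // the request itself is the progress report
        int from = (int) fetchOffset;
        int to = Math.min(log.size(), from + maxRecords);
        return new ArrayList<>(log.subList(from, to));
    }

    /** LEO the leader currently believes the follower has (0 if it never fetched). */
    long reportedLeo(int replicaId) {
        return followerLeo.getOrDefault(replicaId, 0L);
    }
}
```

In this model a follower that has just written offset 5 announces that fact only by asking for offset 6 on its next fetch, which is why the leader's view of a follower always trails the follower's real state by one request.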

The Three Watermarks: Core of Kafka's Consistency Model

LEO (Log End Offset): Each Replica's Write Frontier

Definition: The offset of the next message to be written to a replica's log. Equivalently, one greater than the offset of the last written message.

LEO is private to each replica. Different replicas may have different LEOs; Followers are typically slightly behind the Leader.

HW (High Watermark): The Safe Consumption Boundary

Definition: The minimum LEO across all replicas in the ISR (In-Sync Replica set).

HW = min(Leader LEO, Follower-1 LEO, Follower-2 LEO, ...)
     where only ISR members are included in the min calculation
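The formula translates directly into code. The following is a minimal sketch under the assumption that the Leader keeps a map from replica ID to LEO; the class and method names are hypothetical, and every ISR member is assumed to have an entry in the map.

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical helper illustrating HW = min(LEO of every ISR member). */
final class HighWatermarkSketch {
    /** Returns the minimum LEO among ISR members; replicas outside the ISR are ignored. */
    static long highWatermark(Map<Integer, Long> leoByReplica, Set<Integer> isr) {
        return isr.stream()
                  .mapToLong(leoByReplica::get)   // assumes each ISR member has a recorded LEO
                  .min()
                  .orElseThrow(() -> new IllegalArgumentException("ISR must not be empty"));
    }
}
```

With LEOs of 6, 6 and 5 for replicas 1, 2 and 3, the HW is 5 while all three are in the ISR, and it rises to 6 as soon as replica 3 is dropped from the ISR. A slow replica holds the HW back only for as long as it remains an ISR member.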

Why HW is the consumption boundary:

Consumers are only allowed to read messages up to the current HW (exclusive). This restriction ensures that if the current Leader fails and a new Leader is elected from the ISR, the new Leader is guaranteed to have all messages below HW. If consumers could read beyond HW, and the Leader then failed and was replaced by an ISR member that did not have those messages, consumers would have read messages that no longer exist: a phantom read that breaks the fundamental consistency contract.

Leader Epoch: The Fix for the HW Truncation Bug

Leader Epoch is a monotonically increasing generation counter that increments with each Leader election. Every RecordBatch written to the log carries the epoch of the Leader that wrote it, in the partitionLeaderEpoch field of the batch header.

Why Leader Epoch was needed (KIP-101 background):

Before KIP-101, Followers used HW comparison to decide whether to truncate their local log on restart. This mechanism had a subtle data loss bug under specific timing conditions:

Initial state (offset 5 is replicated and committed, but B has not yet learned the new HW):
Leader (Broker A):   [0,1,2,3,4,5]  LEO=6  HW=6
Follower (Broker B): [0,1,2,3,4,5]  LEO=6  HW=5

Note: B learns the Leader's HW only from its next FetchResponse, so B's HW trails
      A's by one fetch round-trip. Offset 5 is already committed on A.

Step 1: B restarts before that FetchResponse arrives
        Old logic: "anything beyond my HW might be uncommitted, truncate to HW!"
        B truncates its log to HW=5, DELETING offset 5
        B's log: [0,1,2,3,4]  LEO=5

Step 2: A crashes before B has refetched offset 5

Step 3: B is elected as the new Leader
        B's log: [0,1,2,3,4]  LEO=5  HW=5

Step 4: A restarts and joins as Follower to new Leader B
        B only has [0,1,2,3,4], so A truncates its own log to match
        Offset 5 is permanently lost, even though it was below the Leader's HW before the crash!

Leader Epoch solution:

After restart, instead of using HW to decide truncation, a Follower sends an OffsetsForLeaderEpochRequest to the current Leader:

"For my last known Leader Epoch (E), what is the last offset you have for that epoch?"

The Leader responds with the end offset for that epoch: the start offset of the next epoch it knows about, or its own LEO if E is still the current epoch. The Follower truncates only if its log extends past that point, so it never erases data that the current Leader actually has. This closes the window in which a stale local HW could trigger a wrong truncation.

# Inspect leader epoch checkpoint file
cat /data/kafka/orders-0/leader-epoch-checkpoint
# Format: version, number of entries, then one "epoch startOffset" pair per line
# 0          ← file format version
# 3          ← number of entries
# 0 0        ← Epoch 0 started at offset 0 (initial leader)
# 1 5000     ← Epoch 1 started at offset 5000 (first leader change)
# 2 10500    ← Epoch 2 started at offset 10500 (second leader change)
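The epoch lookup and the resulting truncation decision can be sketched in a few lines. This is an illustrative model only: LeaderEpochSketch and its methods are hypothetical names, and the sketch ignores the case where the Follower asks about an epoch newer than any the Leader knows. The epoch boundaries reuse the values from the checkpoint example above.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of a leader's epoch cache (epoch -> start offset). */
final class LeaderEpochSketch {
    private final TreeMap<Integer, Long> startOffsetByEpoch = new TreeMap<>();

    /** Records that the given epoch began at startOffset. */
    void assign(int epoch, long startOffset) {
        startOffsetByEpoch.put(epoch, startOffset);
    }

    /**
     * End offset (exclusive) of the requested epoch: the start offset of the next
     * known epoch, or the leader's LEO if the requested epoch is the latest one.
     */
    long endOffsetFor(int epoch, long leaderLeo) {
        Map.Entry<Integer, Long> next = startOffsetByEpoch.higherEntry(epoch);
        return next == null ? leaderLeo : next.getValue();
    }

    /** A follower keeps everything up to the leader's end offset and never grows its log here. */
    static long truncationPoint(long followerLeo, long leaderEndOffset) {
        return Math.min(followerLeo, leaderEndOffset);
    }
}
```

For instance, a Follower whose last epoch is 1 and whose LEO is 10520 would be told that epoch 1 ended at 10500 and would drop its last 20 records, while a Follower at LEO 10400 would keep its log intact and simply resume fetching.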

LEO/HW Progression: Step-by-Step Diagram

Tracing a complete produce operation through a 3-replica topic, showing how LEO and HW evolve:

Initial state: all replicas synchronized
Leader:    [0,1,2,3,4]  LEO=5  HW=5
Follower1: [0,1,2,3,4]  LEO=5  HW=5
Follower2: [0,1,2,3,4]  LEO=5  HW=5

Step 1: Producer sends record with acks=all
        Leader writes offset=5 to local log
Leader:    [0,1,2,3,4,5]  LEO=6  HW=5  ← HW unchanged: Followers haven't confirmed yet
Follower1: [0,1,2,3,4]    LEO=5  HW=5
Follower2: [0,1,2,3,4]    LEO=5  HW=5
[DelayedProduce created in Purgatory, waiting for ISR confirmation]

Step 2: Follower1 sends FetchRequest(fetchOffset=5)
        Leader sends: records=[offset5], highWatermark=5
        Follower1 writes offset=5, advances its LEO to 6
        Follower1 reads HW=5 from response, updates its local HW=5
Leader:    [0,1,2,3,4,5]  LEO=6  HW=5
Follower1: [0,1,2,3,4,5]  LEO=6  HW=5

Step 3: Follower2 sends FetchRequest(fetchOffset=5), same process
Leader:    [0,1,2,3,4,5]  LEO=6  HW=5
Follower2: [0,1,2,3,4,5]  LEO=6  HW=5

Step 4: Follower1 sends next FetchRequest(fetchOffset=6)
        This tells Leader: "Follower1's LEO is now 6"
        Leader updates its tracking of Follower1's LEO: 6
        HW = min(Leader LEO=6, Follower1 LEO=6, Follower2 LEO=5) = 5  ← Follower2 has not reported yet
        Follower2 sends next FetchRequest(fetchOffset=6)
        Leader updates its tracking of Follower2's LEO: 6
        New HW = min(Leader LEO=6, Follower1 LEO=6, Follower2 LEO=6) = 6  ← HW advances!
        Leader completes DelayedProduce: all ISR members confirmed
        ProduceResponse sent to Producer with offset=5

Step 5: Leader's next FetchResponses carry highWatermark=6
        Each Follower updates its local HW to 6 when its response arrives

Step 6: Consumer calls poll()
        Consumer can now read offset=5 (HW=6 > 5)
        Offset 5 is safely replicated to all ISR members

Key insight: HW does not advance the moment the Leader writes a record. It advances only after every ISR Follower has sent a later FetchRequest whose fetchOffset shows that it has caught up. HW advancement therefore always lags at least one FetchRequest round-trip behind the Leader's LEO advancement.
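The waiting step for acks=all (the DelayedProduce in the trace) can be modelled as a list of pending acknowledgements that complete when the HW moves past their offset. This is a simplified sketch, not Kafka's purgatory: DelayedProduceSketch and its methods are hypothetical, and timeouts are left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the leader's delayed-produce bookkeeping. */
final class DelayedProduceSketch {
    private static final class Pending {
        final long offset;
        final CompletableFuture<Long> ack;

        Pending(long offset, CompletableFuture<Long> ack) {
            this.offset = offset;
            this.ack = ack;
        }
    }

    private final List<Pending> waiting = new ArrayList<>();

    /** Registers a written record; the future completes once the record is committed. */
    CompletableFuture<Long> awaitCommit(long offset) {
        CompletableFuture<Long> ack = new CompletableFuture<>();
        waiting.add(new Pending(offset, ack));
        return ack;
    }

    /** Called whenever the HW changes; acknowledges every record now below it. */
    void onHighWatermark(long hw) {
        Iterator<Pending> it = waiting.iterator();
        while (it.hasNext()) {
            Pending p = it.next();
            if (p.offset < hw) {          // HW is exclusive: offset 5 is committed at HW=6
                p.ack.complete(p.offset);
                it.remove();
            }
        }
    }
}
```

In the trace above, the acknowledgement for offset 5 would stay pending while the HW is 5 and complete on the update that raises it to 6.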

ISR Management: Dynamic Membership

replica.lag.time.max.ms: The Sync Tolerance Boundary

The ISR (In-Sync Replicas) is the set of replicas the Leader considers "sufficiently synchronized." The criterion:

A Follower is removed from the ISR if it has not sent a FetchRequest, or has not caught up to the Leader's LEO, within replica.lag.time.max.ms (default: 30,000 ms).

Note: The older replica.lag.max.messages parameter (message-count-based lag detection) was removed in Kafka 0.9.0 because it caused false positives during bursts of produce traffic.
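The time-based criterion can be expressed as a single predicate. The sketch below is an approximation for illustration: the class name, the method signature, and the idea of passing in a "last caught up" timestamp are assumptions for this example, with the threshold hard-coded to the default mentioned above.

```java
/** Hypothetical predicate for the time-based ISR membership rule. */
final class IsrLagSketch {
    static final long REPLICA_LAG_TIME_MAX_MS = 30_000L;  // default replica.lag.time.max.ms

    /**
     * A follower counts as in sync if it is fully caught up right now, or if it was
     * last caught up no longer than REPLICA_LAG_TIME_MAX_MS ago.
     */
    static boolean isInSync(long followerLeo, long leaderLeo, long lastCaughtUpTimeMs, long nowMs) {
        if (followerLeo == leaderLeo) {
            return true;
        }
        return nowMs - lastCaughtUpTimeMs <= REPLICA_LAG_TIME_MAX_MS;
    }
}
```

Because the rule depends on elapsed time rather than message count, a Follower that is thousands of messages behind during a produce burst stays in the ISR as long as it caught up at some point within the window.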

# Watch ISR changes in broker logs (exact message wording varies by Kafka version)
grep "ISR" /var/log/kafka/server.log | grep -iE "(updated|shrinking|expanding|shrunk|expanded)" | tail -20
# Illustrative output:
# INFO [Partition orders-1 epoch=3] ISR updated from [1, 2, 3] to [1, 2]
#   because replica 3 is no longer in-sync (kafka.cluster.Partition)
# INFO [Partition orders-1 epoch=3] ISR updated from [1, 2] to [1, 2, 3]
#   because replica 3 rejoined ISR (kafka.cluster.Partition)

# Check topic partition ISR state
kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic orders
# Topic: orders  Partition: 0  Leader: 1  Replicas: 1,2,3  Isr: 1,2,3  ← healthy
# Topic: orders  Partition: 1  Leader: 2  Replicas: 2,3,1  Isr: 2,3    ← ISR has shrunk!
# Topic: orders  Partition: 2  Leader: 3  Replicas: 3,1,2  Isr: 3      ← only leader in ISR!

A Follower may be removed from ISR due to:

min.insync.replicas: The ISR Floor Protection

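min.insync.replicas sets a minimum ISR size threshold. When a producer uses acks=all and the ISR has shrunk below this threshold, the broker rejects the write with NotEnoughReplicasException instead of acknowledging it. A producer can handle that case as follows: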

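// Producer-side handling
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringSerializer;

// key, value, topic, partition, log, alertingService and deadLetterQueue are supplied by the application.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

try {
    RecordMetadata metadata = producer.send(
        new ProducerRecord<>("orders", key, value)
    ).get(10, TimeUnit.SECONDS);

    log.info("Written to partition={} offset={}", metadata.partition(), metadata.offset());

} catch (ExecutionException e) {
    if (e.getCause() instanceof NotEnoughReplicasException) {
        // ISR has fallen below min.insync.replicas and the producer's own retries are exhausted.
        // Options depend on business requirements:
        // 1. Retry with backoff (if ISR is expected to recover quickly)
        // 2. Write to a dead-letter queue and alert operations
        // 3. For compliance-critical data: halt processing and page on-call
        alertingService.notifyISRDegraded(topic, partition, e.getMessage());
        deadLetterQueue.send(key, value);
    } else if (e.getCause() instanceof TimeoutException) {
        log.error("Produce timed out: broker may be overloaded");
    }
} catch (java.util.concurrent.TimeoutException e) {
    // get() gave up after 10 seconds; the send itself may still complete later
    log.error("No acknowledgment within 10 seconds");
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();  // preserve the interrupt for the caller
}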

Recommended configurations by durability requirement:

Use Case                      replication.factor  min.insync.replicas  acks  Can Tolerate Failure Of
----------------------------  ------------------  -------------------  ----  -----------------------------------------
Financial / compliance data   3                   2                    all   1 broker (writes fail if 2 brokers down)
Standard business events      3                   2                    all   1 broker
High-throughput metrics/logs  3                   1                    1     any 2 brokers (with possible data loss)
Development / testing         1                   1                    1     N/A

For 3 replicas with min.insync.replicas=2: the system tolerates 1 Follower falling behind (an ISR of 2 is fine). If both Followers fall behind at the same time, the ISR shrinks to the Leader alone and acks=all writes fail. This prevents a scenario where data appears committed but is only on a single broker that could then fail.
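The admission rule behind this behaviour is small enough to write out. The following is a sketch with hypothetical names (MinIsrSketch, Acks, acceptsWrite) that models only the ISR-size check applied to acks=all writes.

```java
/** Hypothetical model of the min.insync.replicas admission check. */
final class MinIsrSketch {
    enum Acks { NONE, LEADER, ALL }   // corresponds to acks=0, acks=1, acks=all

    /** Only acks=all writes are rejected when the ISR is smaller than min.insync.replicas. */
    static boolean acceptsWrite(Acks acks, int isrSize, int minInsyncReplicas) {
        return acks != Acks.ALL || isrSize >= minInsyncReplicas;
    }
}
```

With min.insync.replicas=2, an ISR of size 2 accepts acks=all writes and an ISR of size 1 rejects them, while acks=1 writes are accepted in both cases. That is why the setting protects durability only in combination with acks=all.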

Unclean Leader Election: The Availability vs Durability Trade-off

What Is an "Unclean" Election?

When all ISR members for a partition become unavailable simultaneously (e.g., a datacenter segment loses power), Kafka faces a binary choice:

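unclean.leader.election.enable=false (default, recommended): The partition stays offline until a replica from the last known ISR comes back. Producers and consumers fail in the meantime, but no committed data is lost.

unclean.leader.election.enable=true: Kafka elects any surviving replica as Leader, even one outside the ISR. The partition becomes available again right away, but every record the new Leader never replicated is lost.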

Data Loss Scenario Walkthrough

Setup: 3-replica partition, min.insync.replicas=2, acks=all

Last confirmed state before disaster:
Leader (Broker 1):   [0 ... 100, 101]  LEO=102  HW=102
Follower (Broker 2): [0 ... 100, 101]  LEO=102  HW=102  (fully in sync)
Follower (Broker 3): [0 ... 100]       LEO=101  HW=101  (just fell behind, being caught up)

Consumer successfully read offset=101 (HW was 102, so 101 was visible and consumed)

--- Disaster ---
Broker 1: CRASHED (power outage)
Broker 2: CRASHED (network equipment failure)
Broker 3: ALIVE but only has offsets 0..100

If unclean.leader.election.enable=true:
  Broker 3 elected as new Leader (only survivor, not in ISR)
  New Leader's log: [0 ... 100]  LEO=101  HW=101
  
  Consumer checks for offset 101: NOT FOUND (new Leader starts at LEO=101)
  → Offset 101 is permanently lost
  → Consumer had already processed this message: it processed data that no longer exists
  → Any downstream state built from offset 101 is now inconsistent with Kafka's log

If unclean.leader.election.enable=false (default):
  Partition goes offline: producers fail, consumers see LEADER_NOT_AVAILABLE
  Operations team is paged; Broker 1 or 2 is recovered
  Once either returns:
  → Partition leader elected from recovered ISR member
  → ALL data (including offset 101) is preserved
  → Consumers resume from offset 102, no data gap

The correct default is false for almost all production use cases. Set it to true only when:

  1. Data is genuinely ephemeral (real-time metrics, sensor readings with no historical value)
  2. Availability is strictly more important than consistency (monitoring dashboards, not financial records)
  3. The team has explicitly signed off on accepting potential data loss
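The two election policies in the walkthrough differ only in what happens when no ISR member is alive. The sketch below makes that branch explicit. It is a simplified model with hypothetical names, and picking the first eligible replica in assignment order is an assumption of the example, not a statement about the controller's actual logic.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Hypothetical model of clean vs. unclean leader election. */
final class LeaderElectionSketch {
    /**
     * Prefers a live ISR member. If none exists, falls back to any live replica
     * only when unclean election is enabled; otherwise the partition stays offline.
     */
    static Optional<Integer> electLeader(List<Integer> assignedReplicas,
                                         Set<Integer> isr,
                                         Set<Integer> liveBrokers,
                                         boolean uncleanElectionEnabled) {
        for (Integer replica : assignedReplicas) {
            if (isr.contains(replica) && liveBrokers.contains(replica)) {
                return Optional.of(replica);       // clean election: no committed data lost
            }
        }
        if (uncleanElectionEnabled) {
            for (Integer replica : assignedReplicas) {
                if (liveBrokers.contains(replica)) {
                    return Optional.of(replica);   // unclean election: may lose committed data
                }
            }
        }
        return Optional.empty();                   // partition offline until an ISR member returns
    }
}
```

Applied to the walkthrough (replicas 1, 2, 3, ISR of 1 and 2, only Broker 3 alive), the function returns Broker 3 when unclean election is enabled and an empty result when it is disabled.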

Monitoring Replication Health

# Find partitions where ISR < replication factor (under-replicated)
kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --under-replicated-partitions
# (no output is good: it means all partitions are fully replicated)

# Find partitions with no leader (unavailable partitions)
kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --unavailable-partitions

# Monitor leader counts per broker (should be roughly balanced)
# In each partition line, field 6 is the value that follows "Leader:"
kafka-topics.sh --bootstrap-server localhost:9092 --describe 2>/dev/null \
  | grep "Leader:" | awk '{print $6}' | sort | uniq -c | sort -rn
# Output is "<partition count> <broker ID>":
# 142  1
# 141  2
# 140  3  ← well-balanced

Key JMX metrics for replication monitoring:

# Prometheus alert rules for replication health
groups:
  - name: kafka-replication
    rules:
      - alert: KafkaUnderReplicatedPartitions
        expr: kafka_server_replicamanager_underreplicatedpartitions > 0
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "{{ $value }} partitions under-replicated on {{ $labels.instance }}"
          description: "Replicas may be falling behind. Check broker disk/network health."

      - alert: KafkaISRShrinkRateHigh
        expr: rate(kafka_server_replicamanager_isrshrinks_total[5m]) > 0.1
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "ISR is shrinking frequently on {{ $labels.instance }}"

      - alert: KafkaFollowerLagHigh
        expr: kafka_server_replicafetchermanager_maxlag > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Follower replica is {{ $value }} messages behind the leader"

      - alert: KafkaOfflinePartitions
        expr: kafka_controller_kafkacontroller_offlinepartitionscount > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "{{ $value }} partitions have no available leader"

ISR, the high watermark, and Leader Epoch form the three pillars of Kafka's distributed consistency model. ISR defines which replicas are "trustworthy" for write acknowledgment. HW defines the "safe" boundary for consumption: the furthest point that all ISR members have confirmed. Leader Epoch ensures that log truncation decisions during leader transitions are precise and never erase committed data. Together, they provide strong durability guarantees while maintaining high throughput, the combination that makes Kafka viable as a mission-critical data infrastructure component.
