Chapter 3

KRaft: How Kafka Implements Raft

ZooKeeper: A Decade of Technical Debt

When Kafka was designed in 2011, the team made a pragmatic choice: delegate cluster metadata management (topic configurations, partition leader elections, broker registration) to Apache ZooKeeper. This was reasonable. ZooKeeper was a mature, battle-tested distributed coordination service. LinkedIn's engineers didn't need to invent distributed consensus from scratch.

For nearly a decade, this arrangement held. But as Kafka's adoption exploded and cluster sizes grew into the thousands of brokers and millions of partitions, ZooKeeper's structural limitations became impossible to ignore.

Three Fundamental Bottlenecks

Bottleneck 1: Two distributed systems to operate. Running a Kafka cluster meant running two distributed systems: the Kafka broker cluster and a ZooKeeper ensemble (typically 3 or 5 nodes). Two monitoring stacks. Two backup strategies. Two upgrade procedures. Two security configurations (ZooKeeper's SASL/ACL configuration was entirely separate from Kafka's ACL system). The operational overhead was substantial, and the deep coupling between Kafka's internals and ZooKeeper's data model meant that many Kafka bugs were actually Kafka-ZooKeeper interaction bugs.

Bottleneck 2: ~200,000 partition ceiling. The Kafka Controller, a special role held by one broker in the cluster, maintains an in-memory representation of the complete cluster state. In ZooKeeper mode, the Controller builds this state by reading all partition metadata from ZooKeeper at startup. At ~200K partitions, Controller startup took 5–10 minutes. Controller failover (when the Controller broker crashed and a new one was elected) required the new Controller to reload all state from ZooKeeper. That took at least 30–60 seconds, and far longer on large clusters, and during that window the cluster could not elect new partition leaders.

ZooKeeper-mode Controller failover sequence:
1. Detect Controller broker crash (ZooKeeper session timeout; zookeeper.session.timeout.ms defaults to 18s since Kafka 2.5 and is often set higher)
2. Remaining brokers race to create /controller znode (election: seconds)
3. Winning broker reads ALL partition metadata from ZooKeeper
   → At 200K partitions: 5-10 minutes
4. New Controller sends LeaderAndIsrRequest to all brokers to restore state
Total: 30-60 seconds minimum; 5-10 minutes for large clusters
During this time: no partition leader elections can complete

Bottleneck 3: Metadata propagation lag. ZooKeeper Watches notify the Controller of changes, but the Controller must then push those changes to all brokers via LeaderAndIsrRequest, UpdateMetadataRequest, and similar RPCs. At large cluster scales (1000+ brokers, 100K+ partitions), this fan-out push could take seconds, leaving the cluster in an inconsistent state during partition leader elections.

Raft Fundamentals

Understanding KRaft requires understanding Raft. Raft is the distributed consensus algorithm published by Diego Ongaro and John Ousterhout in 2013. It was designed explicitly for understandability: the goal was an algorithm that is easier to understand and implement correctly than Paxos.

Leader Election

Raft divides time into terms โ€” monotonically increasing integers that serve as logical clocks. Each term begins with an election. The term number is included in every RPC, allowing nodes to detect and reject stale messages.
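
The stale-message check can be sketched in a few lines of Python. This is a minimal illustration of the rule described above, not code from Kafka or from any Raft library; the Node class and the observe_term method are hypothetical names chosen for the example.

```python
from dataclasses import dataclass


@dataclass
class Node:
    """Hypothetical Raft node that tracks only term-related state."""
    current_term: int = 0
    role: str = "follower"

    def observe_term(self, rpc_term: int) -> bool:
        """Return True if an RPC carrying rpc_term should be processed."""
        if rpc_term < self.current_term:
            return False                 # sender is behind: reject as stale
        if rpc_term > self.current_term:
            self.current_term = rpc_term
            self.role = "follower"       # a newer term exists: step down
        return True


node = Node(current_term=2, role="leader")
print(node.observe_term(1))              # False: message from an old term
print(node.observe_term(3))              # True: newer term is adopted
print(node.current_term, node.role)      # 3 follower
```

Because every RPC passes through the same check, a deposed leader learns about its replacement from the first message it exchanges with any up-to-date node.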

Election trigger: A Follower starts an election when it doesn't receive a heartbeat from the Leader within electionTimeout (randomized between 150–300ms in the original paper). It increments its current term, transitions to Candidate, votes for itself, and sends RequestVote RPCs to all other nodes in parallel.

Voting rules:

  1. A node votes for at most one candidate per term (first-come-first-served within a term).
  2. A node only votes for a candidate whose log is at least as up-to-date as its own log. "More up-to-date" means: higher last log term, or same last log term but longer log. This rule is critical: it ensures that only candidates holding every committed entry can win an election.
  3. A candidate wins when it receives votes from a majority of nodes (⌊N/2⌋ + 1).
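
The three voting rules above can be sketched as a single function. This is an illustrative Python sketch under simplifying assumptions (no persistence, no networking); VoterState, grant_vote, and majority are hypothetical names, not part of KRaft.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class VoterState:
    """Hypothetical per-node state needed to answer a RequestVote RPC."""
    current_term: int
    voted_for: Optional[str]
    last_log_term: int
    last_log_index: int


def grant_vote(state: VoterState, cand_id: str, cand_term: int,
               cand_last_term: int, cand_last_index: int) -> bool:
    if cand_term < state.current_term:
        return False                      # stale candidate
    if cand_term > state.current_term:
        state.current_term = cand_term    # new term: any earlier vote is void
        state.voted_for = None
    # Rule 1: at most one vote per term (re-granting to the same candidate is fine).
    if state.voted_for not in (None, cand_id):
        return False
    # Rule 2: candidate's log must be at least as up-to-date as ours.
    # Tuple comparison checks last term first, then log length.
    if (cand_last_term, cand_last_index) < (state.last_log_term, state.last_log_index):
        return False
    state.voted_for = cand_id
    return True


def majority(cluster_size: int) -> int:
    """Rule 3: votes needed to win, floor(N/2) + 1."""
    return cluster_size // 2 + 1


voter = VoterState(current_term=1, voted_for=None, last_log_term=1, last_log_index=5)
print(grant_vote(voter, "B", 2, 1, 5))   # True: first request in term 2
print(grant_vote(voter, "C", 2, 2, 9))   # False: already voted for B in term 2
print(majority(5))                       # 3
```

Comparing (last term, last index) as a tuple keeps rule 2 to one line and makes the precedence explicit: a higher last term always wins over a longer log.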

Heartbeats: The elected Leader immediately sends AppendEntries RPCs to all Followers and repeats them every heartbeatInterval (typically 50ms). A heartbeat is simply an AppendEntries call that carries no log entries. Each one resets the Followers' election timers and asserts the Leader's authority.
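
The interplay between heartbeats and the randomized election timeout can be sketched as a small timer class. This is a hedged illustration: ElectionTimer is a hypothetical name, time is passed in explicitly so the example is deterministic, and the 150–300ms range is the one quoted from the Raft paper above.

```python
import random


class ElectionTimer:
    """Hypothetical follower-side election timer; times are in seconds."""

    def __init__(self, now: float, low: float = 0.150, high: float = 0.300, rng=random):
        self.low, self.high, self.rng = low, high, rng
        self.reset(now)

    def reset(self, now: float) -> None:
        """Called on every valid heartbeat: pick a fresh randomized deadline."""
        self.deadline = now + self.rng.uniform(self.low, self.high)

    def expired(self, now: float) -> bool:
        """True once the follower should become a candidate."""
        return now >= self.deadline


timer = ElectionTimer(now=0.0)
for tick in (0.05, 0.10, 0.15, 0.20):    # heartbeats arriving every 50ms
    assert not timer.expired(tick)
    timer.reset(tick)
print(timer.expired(0.25))               # False: last heartbeat was 50ms ago
print(timer.expired(1.00))               # True: leader has been silent too long
```

Randomizing the deadline on every reset is what makes it unlikely that two followers time out at the same instant and split the vote.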

Term progression example (5-node cluster):

Term 1: Node A elected Leader. Serves client requests.
        Heartbeats: A→B, A→C, A→D, A→E every 50ms.

Node A crashes.

Term 2: B's election timer fires first. B→Candidate, sends RequestVote.
        C, D, and E vote for B (their timers hadn't fired yet).
        A is crashed and does not respond.
        B receives 4 votes (B+C+D+E); 3 is the majority of 5, so B has won.
        B→Leader for Term 2.

Log Replication

The Leader converts client requests into log entries and replicates them via AppendEntries RPCs to all Followers in parallel:

AppendEntries {
    term: 2,
    leaderId: B,
    prevLogIndex: 5,     // index of entry immediately preceding new ones
    prevLogTerm: 1,      // term of prevLogIndex entry
    entries: [           // log entries to store (empty for heartbeat)
        {index:6, term:2, data: "set x=42"},
        {index:7, term:2, data: "set y=99"},
    ],
    leaderCommit: 5      // leader's commitIndex
}

A Follower accepts the AppendEntries only if its log matches the Leader's at prevLogIndex/prevLogTerm. If not, it rejects, and the Leader decrements prevLogIndex and retries, walking back until it finds the divergence point. This Log Matching Property guarantees that if two nodes have the same (index, term) pair, their logs are identical up to that point.
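
The follower-side consistency check can be sketched as follows. This is a minimal in-memory illustration of the behavior described above, assuming 1-based contiguous indexes; Entry and append_entries are hypothetical names, and a real implementation would also handle terms, persistence, and commit indexes.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Entry:
    index: int
    term: int
    data: str


def append_entries(log: List[Entry], prev_index: int, prev_term: int,
                   entries: List[Entry]) -> bool:
    """Apply an AppendEntries call to a follower's log.

    Returns False when the log does not match at prev_index/prev_term,
    which tells the Leader to retry with an earlier prev_index.
    """
    if prev_index > 0:
        if prev_index > len(log) or log[prev_index - 1].term != prev_term:
            return False
    for entry in entries:
        pos = entry.index - 1
        if pos < len(log):
            if log[pos].term == entry.term:
                continue          # already stored: nothing to do
            del log[pos:]         # conflict: drop this entry and all that follow
        log.append(entry)
    return True


# Follower holds a conflicting entry at index 6 from an old leader.
follower = [Entry(i, 1, f"e{i}") for i in range(1, 6)] + [Entry(6, 1, "stale")]
new = [Entry(6, 2, "set x=42"), Entry(7, 2, "set y=99")]
print(append_entries(follower, 5, 1, new))          # True
print([(e.index, e.term) for e in follower[-2:]])   # [(6, 2), (7, 2)]

lagging = [Entry(i, 1, f"e{i}") for i in range(1, 4)]
print(append_entries(lagging, 5, 1, new))           # False: no entry at index 5
```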

Commitment: An entry is committed when the Leader has successfully replicated it to a majority of nodes. The Leader advances its commitIndex and notifies Followers via the leaderCommit field in subsequent AppendEntries. A committed entry is permanent: Raft's safety guarantee prevents it from ever being overwritten.
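
A leader can derive its commitIndex from the highest index each follower is known to store. The sketch below is illustrative only; advance_commit_index and its parameters are hypothetical names. It also includes one detail from the Raft paper that the summary above leaves out: a leader commits by counting replicas only for entries from its own term.

```python
from typing import Dict, List


def advance_commit_index(commit_index: int, current_term: int,
                         leader_log_terms: List[int],
                         match_index: Dict[str, int]) -> int:
    """Return the Leader's new commitIndex.

    leader_log_terms[i] is the term of the entry at index i + 1.
    match_index maps each Follower to the highest index stored on it.
    """
    # The Leader itself always stores its whole log.
    replicated = sorted(list(match_index.values()) + [len(leader_log_terms)],
                        reverse=True)
    majority = len(replicated) // 2 + 1
    candidate = replicated[majority - 1]     # highest index held by a majority
    if candidate > commit_index and leader_log_terms[candidate - 1] == current_term:
        return candidate
    return commit_index


# Leader B in term 2 with 7 entries; A is crashed and stuck at index 5.
terms = [1, 1, 1, 1, 1, 2, 2]
followers = {"A": 5, "C": 7, "D": 6, "E": 5}
print(advance_commit_index(5, 2, terms, followers))   # 6: held by B, C, and D
```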

Why Raft Is Simpler Than Paxos

Paxos is incomplete as specified and requires significant gaps to be filled for practical implementation. Its leader election and log replication are separate protocols that must be carefully combined. Raft decomposes the consensus problem into three relatively independent sub-problems (leader election, log replication, and safety) with clean interfaces between them. This makes Raft easier to implement correctly and reason about in production, which is why Kafka chose it for KRaft.

KRaft: Kafka's Raft Customizations

KRaft is not a verbatim implementation of the Raft paper. It makes several important adaptations for Kafka's specific metadata management requirements.

Epoch Instead of Term

KRaft uses "epoch" rather than "term," but the semantics are identical. The word choice matters operationally: Kafka already used "epoch" in other contexts (such as the per-partition leader epoch), so the consistent terminology helps when reading logs and tool output.

__cluster_metadata: The Raft Log as a Kafka Topic

KRaft's most elegant design decision: implement the Raft log as a special Kafka topic named __cluster_metadata.

This topic has exactly one partition. The Controller Quorum members maintain replicas of this partition via the Raft protocol. Every cluster metadata change (topic creation/deletion, partition leader changes, ISR updates, ACL changes, broker registration) is appended to this topic as a typed record.

Contents of __cluster_metadata partition 0:
offset=0:  RegisterBrokerRecord   {brokerId=1, host="broker1", port=9092, rack="us-east-1a"}
offset=1:  RegisterBrokerRecord   {brokerId=2, host="broker2", port=9092, rack="us-east-1b"}
offset=2:  RegisterBrokerRecord   {brokerId=3, host="broker3", port=9092, rack="us-east-1c"}
offset=3:  TopicRecord            {topicId="abc-uuid", name="order-events"}
offset=4:  PartitionRecord        {topicId="abc-uuid", partId=0, leader=1, isr=[1,2,3], epoch=0}
offset=5:  PartitionRecord        {topicId="abc-uuid", partId=1, leader=2, isr=[1,2,3], epoch=0}
...
offset=N:  PartitionChangeRecord  {topicId="abc-uuid", partId=0, leader=2, isr=[2,3], epoch=1}
  (written when broker 1 falls out of ISR and broker 2 becomes leader)

Non-Controller brokers follow __cluster_metadata as observers: they pull new records from the active Controller leader with the regular Fetch RPC rather than receiving pushed updates. Each broker replays those records into a complete, up-to-date metadata image in memory. This eliminates ZooKeeper mode's complexity of the Controller maintaining separate push channels (LeaderAndIsrRequest, UpdateMetadataRequest) to all brokers.
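
Building the in-memory metadata image amounts to folding the log's records in offset order. The sketch below replays a simplified version of the listing above; the replay function, the tuple-based record encoding, and the image layout are assumptions made for illustration and do not mirror Kafka's actual record schemas or classes.

```python
from typing import Iterable, Tuple

Record = Tuple[str, dict]   # (record type, fields), simplified from the listing


def replay(records: Iterable[Record]) -> dict:
    """Fold metadata records, in offset order, into an in-memory image."""
    image = {"brokers": {}, "topics": {}, "partitions": {}}
    for rtype, f in records:
        if rtype == "RegisterBrokerRecord":
            image["brokers"][f["brokerId"]] = {"host": f["host"], "port": f["port"]}
        elif rtype == "TopicRecord":
            image["topics"][f["topicId"]] = f["name"]
        elif rtype == "PartitionRecord":
            image["partitions"][(f["topicId"], f["partId"])] = {
                "leader": f["leader"], "isr": list(f["isr"]), "epoch": f["epoch"]}
        elif rtype == "PartitionChangeRecord":
            # A change record is a delta: update only the fields it carries.
            part = image["partitions"][(f["topicId"], f["partId"])]
            for key in ("leader", "isr", "epoch"):
                if key in f:
                    part[key] = f[key]
    return image


log = [
    ("RegisterBrokerRecord", {"brokerId": 1, "host": "broker1", "port": 9092}),
    ("RegisterBrokerRecord", {"brokerId": 2, "host": "broker2", "port": 9092}),
    ("RegisterBrokerRecord", {"brokerId": 3, "host": "broker3", "port": 9092}),
    ("TopicRecord", {"topicId": "abc-uuid", "name": "order-events"}),
    ("PartitionRecord", {"topicId": "abc-uuid", "partId": 0,
                         "leader": 1, "isr": [1, 2, 3], "epoch": 0}),
    ("PartitionChangeRecord", {"topicId": "abc-uuid", "partId": 0,
                               "leader": 2, "isr": [2, 3], "epoch": 1}),
]
image = replay(log)
print(image["partitions"][("abc-uuid", 0)])
# {'leader': 2, 'isr': [2, 3], 'epoch': 1}
```

Because the image is a pure function of the log prefix, any node that has replicated up to the same offset holds the same metadata.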

Controller Quorum: 3 or 5 Nodes?

The Controller Quorum is the set of nodes participating in Raft consensus for __cluster_metadata. Controllers can be:

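Combined mode: Broker nodes that also participate in the Controller Quorum. Used in small clusters (3–5 nodes) to reduce infrastructure cost. The Kafka documentation advises against combined mode for critical production deployments, so treat it as an option for development and small, low-risk environments.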

Dedicated mode: Nodes dedicated exclusively to Controller Quorum duties, handling no client traffic. Recommended for large clusters (50+ brokers) to ensure Controller resources aren't impacted by producer/consumer I/O load.

# Check Controller Quorum status
kafka-metadata-quorum.sh \
  --bootstrap-controller broker1:9093 \
  describe --status

# Example output:
# ClusterId:           XhcUv2Q5Rz2dkPmB8xVdAg
# LeaderId:            1
# LeaderEpoch:         42
# HighWatermark:       158392
# MaxFollowerLag:      0
# MaxFollowerLagTimeMs: 12
# CurrentVoters:       [1,2,3]
# CurrentObservers:    [4,5,6,7,8]

3-node quorum: Tolerates 1 node failure (majority = 2/3). Suitable for most production environments. If 1 node is down for maintenance, you still have consensus (2 of 3 operational).

5-node quorum: Tolerates 2 node failures (majority = 3/5). Use when rolling upgrades might temporarily have 2 nodes down simultaneously, or when you need stronger availability guarantees for metadata operations.
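
The fault-tolerance figures follow directly from majority arithmetic. A small sketch (quorum_math is a hypothetical helper name) also shows why even-sized quorums are avoided:

```python
from typing import Tuple


def quorum_math(voters: int) -> Tuple[int, int]:
    """Return (majority size, number of voter failures tolerated)."""
    majority = voters // 2 + 1
    return majority, voters - majority


for n in (3, 4, 5):
    majority, tolerated = quorum_math(n)
    print(f"{n} voters: majority={majority}, tolerates {tolerated} failure(s)")
# 3 voters: majority=2, tolerates 1 failure(s)
# 4 voters: majority=3, tolerates 1 failure(s)
# 5 voters: majority=3, tolerates 2 failure(s)
```

A fourth voter raises the majority to 3 without tolerating any additional failures, which is why quorum sizes are chosen as odd numbers.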

MetadataVersion: Schema Versioning for the Raft Log

MetadataVersion (MV) is a monotonically increasing integer that tracks the format version of records written to __cluster_metadata. Each Kafka release that introduces new metadata record types or fields increments the MV.

All Controller Quorum members must support the cluster's current MV. This enables safe rolling upgrades: upgrade all nodes to the new software version (which supports both old and new MV), then bump the MV with a single admin command to activate new features.
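
The upgrade rule can be expressed as a simple precondition: a target MV may be finalized only if every node's software supports it. The sketch below is a hedged illustration with made-up version ranges; can_finalize is a hypothetical function, not Kafka's implementation.

```python
from typing import Dict, Tuple


def can_finalize(target: int, supported: Dict[int, Tuple[int, int]]) -> bool:
    """supported maps node id -> (min MV, max MV) its software understands."""
    return all(low <= target <= high for low, high in supported.values())


# Mid rolling upgrade: node 3 still runs the old release (illustrative ranges).
mid_upgrade = {1: (1, 15), 2: (1, 15), 3: (1, 14)}
print(can_finalize(15, mid_upgrade))    # False: node 3 cannot read MV 15 records

after_upgrade = {1: (1, 15), 2: (1, 15), 3: (1, 15)}
print(can_finalize(15, after_upgrade))  # True: safe to bump the MV
```

Separating "software supports the version" from "cluster uses the version" is what lets the binaries be rolled out first and the new record formats be switched on afterwards.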

# Describe current and available feature flags including MetadataVersion
kafka-features.sh --bootstrap-server broker1:9092 describe

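# Illustrative output (simplified; the real tool reports the supported range
# and the finalized level for each feature):
# Feature: metadata.version
#   Current version: 14
#   Minimum version: 1
#   Maximum version: 20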

# Upgrade MetadataVersion after all brokers are on the new software version
kafka-features.sh --bootstrap-server broker1:9092 upgrade \
  --feature metadata.version=15

KRaft vs ZooKeeper: Performance Comparison

Metric                                    | ZooKeeper Mode        | KRaft Mode               | Improvement
Controller startup time (200K partitions) | 5–10 minutes          | 2–5 seconds              | ~100x
Controller failover time                  | 30–60 seconds         | < 500ms                  | ~100x
Maximum supported partitions              | ~200,000              | Millions (tested to 3M+) | ~15x
Metadata propagation lag (1000 brokers)   | Seconds               | Milliseconds             | ~100x
Infrastructure components                 | 2 (Kafka + ZooKeeper) | 1 (Kafka only)           | n/a

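Why Controller failover is 100x faster in KRaft: In ZooKeeper mode, the new Controller must read all partition metadata from ZooKeeper, so the more partitions there are, the longer failover takes. In KRaft mode, every Controller Quorum node continuously replicates __cluster_metadata and keeps the resulting state in memory, and Raft's voting rule guarantees that whichever node wins the election holds every committed record. When the Controller leader crashes, the quorum elects a new leader from these replicas, and that leader starts serving as soon as the election completes. Failover time therefore depends on failure detection and election timeouts (controller.quorum.fetch.timeout.ms and controller.quorum.election.timeout.ms) rather than on partition count. The sub-500ms figure above describes the election itself and should be read as an order-of-magnitude estimate.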

ZooKeeper to KRaft Migration: Complete Walkthrough

Kafka 3.7 provides a supported migration path from ZooKeeper mode to KRaft mode without downtime (rolling restarts required, but no full cluster stop).

Prerequisites

# 1. Ensure all brokers are running Kafka 3.7+
# kafka-broker-api-versions.sh reports the API version ranges each broker
# supports, not the release number, so also check the installed release.
kafka-broker-api-versions.sh --bootstrap-server broker1:9092
kafka-topics.sh --version

# 2. Ensure inter.broker.protocol.version is at current version
# (--all includes static settings from server.properties, not only dynamic ones)
kafka-configs.sh --bootstrap-server broker1:9092 \
  --entity-type brokers --entity-name 1 \
  --describe --all | grep "inter.broker.protocol.version"

# 3. All brokers must be healthy: no under-replicated partitions
kafka-topics.sh --bootstrap-server broker1:9092 --describe \
  --under-replicated-partitions
# Should return empty output

Step 1: Provision KRaft Controller Nodes

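Prepare dedicated controller nodes. Dedicated controllers are the safe choice here, since migrating a ZooKeeper cluster directly into combined mode is not a supported path. The controllers must be formatted with the existing cluster's ID, which identifies the cluster across both the ZooKeeper and KRaft phases: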

# Reuse the cluster ID already stored in ZooKeeper; the KRaft controllers must
# be formatted with the same ID. zookeeper-shell.sh prints connection messages
# around the znode's JSON payload, so keep only the JSON line:
CLUSTER_UUID=$(zookeeper-shell.sh zoo1:2181 get /cluster/id 2>/dev/null \
  | grep -m1 '^{' | python3 -c "import sys,json; print(json.load(sys.stdin)['id'])")

# Only for a brand-new KRaft cluster (never for a migration):
# CLUSTER_UUID=$(kafka-storage.sh random-uuid)

echo "Using Cluster UUID: $CLUSTER_UUID"
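
If the inline one-liner proves brittle, the same extraction can live in a small standalone Python helper. This is a sketch under the assumption that zookeeper-shell.sh prints the znode's JSON payload on a line of its own; extract_cluster_id and the sample output are illustrative, not captured from a real cluster.

```python
import json


def extract_cluster_id(shell_output: str) -> str:
    """Return the "id" field from the first JSON object line in the output."""
    for line in shell_output.splitlines():
        line = line.strip()
        if line.startswith("{"):
            return json.loads(line)["id"]
    raise ValueError("no JSON payload found in zookeeper-shell output")


# Illustrative output shape: connection messages followed by the payload.
sample = """Connecting to zoo1:2181

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
{"version":"1","id":"XhcUv2Q5Rz2dkPmB8xVdAg"}
"""
print(extract_cluster_id(sample))   # XhcUv2Q5Rz2dkPmB8xVdAg
```

Failing loudly when no JSON line is present is deliberate: formatting controllers with an empty or wrong cluster ID is much harder to diagnose later.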

Create controller.properties for each Controller Quorum node:

# /etc/kafka/controller.properties (for dedicated controller mode)
process.roles=controller
node.id=101
controller.quorum.voters=101@ctrl1:9093,102@ctrl2:9093,103@ctrl3:9093

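listeners=CONTROLLER://0.0.0.0:9093
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
controller.listener.names=CONTROLLER

log.dirs=/var/kafka/controller-metadata

# ZooKeeper connection (needed during migration)
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181
zookeeper.metadata.migration.enable=true

# Listener the brokers use among themselves, so the KRaft controller can send
# RPCs to ZooKeeper-mode brokers during migration (assumes PLAINTEXT, matching
# the broker configuration in this walkthrough)
inter.broker.listener.name=PLAINTEXT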

Format the controller storage:

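# With a static controller.quorum.voters list, format needs only the config
# and the cluster ID (--no-initial-controllers belongs to the dynamic-quorum
# tooling of later releases and is not available in 3.7)
kafka-storage.sh format \
  --config /etc/kafka/controller.properties \
  --cluster-id "$CLUSTER_UUID"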

Step 2: Start KRaft Controllers and Enable Migration Mode on Brokers

# Start all KRaft Controller nodes
for ctrl in ctrl1 ctrl2 ctrl3; do
    ssh $ctrl "kafka-server-start.sh -daemon /etc/kafka/controller.properties"
done

# Wait for Controller leader to be elected
kafka-metadata-quorum.sh --bootstrap-controller ctrl1:9093 describe --status
# Confirm LeaderId is populated

Update server.properties on all existing Kafka brokers to point at the new Controller Quorum:

# Add to server.properties on all brokers (DO NOT remove zookeeper.connect yet)
controller.quorum.voters=101@ctrl1:9093,102@ctrl2:9093,103@ctrl3:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
zookeeper.metadata.migration.enable=true

Rolling restart all brokers one at a time:

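# Restart brokers one at a time; wait for replication to catch up in between.
# List several brokers so the check still works while broker1 is restarting.
BOOTSTRAP=broker1:9092,broker2:9092,broker3:9092
for broker in broker1 broker2 broker3 broker4 broker5; do
    ssh "$broker" "kafka-server-stop.sh && sleep 5 && kafka-server-start.sh -daemon /etc/kafka/server.properties"
    # Wait until no under-replicated partitions before proceeding. The command
    # prints one "Topic: ... Partition: ..." line per under-replicated
    # partition and nothing at all once every replica is back in sync.
    while kafka-topics.sh --bootstrap-server "$BOOTSTRAP" --describe \
        --under-replicated-partitions | grep -q "Partition:"; do
        echo "Waiting for replication to catch up..."
        sleep 5
    done
    echo "Broker $broker restarted; no under-replicated partitions"
done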

Step 3: Complete the Migration

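# Monitor the quorum while the KRaft controller copies metadata from ZooKeeper
kafka-metadata-quorum.sh --bootstrap-controller ctrl1:9093 describe --status

# Confirm the migration completed: the active controller logs a completion
# message. Do not expect ZooKeeper znodes to disappear. While in migration
# mode the KRaft controller keeps writing metadata back to ZooKeeper.
# (The log path is an assumption; use your controller's actual log location.)
grep -i "Completed migration of metadata from ZooKeeper to KRaft" /var/log/kafka/controller.log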

Step 4: Remove ZooKeeper Dependency

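Once the controller reports that the migration is complete, move the brokers to KRaft mode and then take the controllers out of migration mode:

# In server.properties on each broker, then rolling-restart the brokers:
#   replace  broker.id=N  with  node.id=N
#   add      process.roles=broker
#   remove   zookeeper.connect=...
#   remove   zookeeper.metadata.migration.enable=...
#   remove   inter.broker.protocol.version=...  (KRaft uses metadata.version)

# After every broker runs in KRaft mode, in controller.properties:
#   remove   zookeeper.connect=...
#   remove   zookeeper.metadata.migration.enable=...

Rolling restart the controllers one at a time to finalize the migration. After this step the cluster can no longer be rolled back to ZooKeeper mode, and ZooKeeper can be safely decommissioned.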

# Final confirmation: no ZooKeeper dependency
kafka-metadata-quorum.sh --bootstrap-controller ctrl1:9093 describe --status
# ClusterId, LeaderId, HighWatermark all present, no ZooKeeper references

# Stop ZooKeeper ensemble
for zk in zoo1 zoo2 zoo3; do
    ssh $zk "zkServer.sh stop"
done
echo "ZooKeeper decommissioned. KRaft migration complete."

KRaft Implementation Details: Fetch vs Vote

KRaft makes one particularly elegant implementation choice: it uses Kafka's existing Fetch protocol as the transport for Raft log replication, rather than implementing a separate AppendEntries RPC.

Controller Quorum Followers replicate __cluster_metadata from the Controller Leader using the same FetchRequest/FetchResponse RPC that broker replicas and consumer clients use to read partition data. Replication is therefore pull-based: a Follower asks for records starting at its own log end offset, and that offset tells the Leader how far the Follower has replicated. The request handling is specific to the Raft layer, but the wire protocol is shared.
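
The effect of fetch-driven replication can be sketched in a few lines: the leader learns each voter's progress from the offset it fetches at and derives the high watermark from that. This is a simplified model (no epochs, no log truncation, no networking); MetadataLeader and its methods are hypothetical names, not KRaft classes.

```python
from typing import Dict, List, Tuple


class MetadataLeader:
    """Hypothetical leader that replicates only in response to fetches."""

    def __init__(self, voter_ids: List[int], leader_id: int):
        self.leader_id = leader_id
        self.log: List[str] = []
        # For each voter, the offset up to which its log is known to match.
        self.end_offsets: Dict[int, int] = {v: 0 for v in voter_ids}
        self.high_watermark = 0

    def append(self, record: str) -> None:
        self.log.append(record)
        self.end_offsets[self.leader_id] = len(self.log)
        self._update_high_watermark()

    def handle_fetch(self, replica_id: int, fetch_offset: int) -> Tuple[List[str], int]:
        """A fetch at offset N implies the replica already holds offsets [0, N)."""
        if replica_id in self.end_offsets:   # observers do not count toward the quorum
            self.end_offsets[replica_id] = fetch_offset
            self._update_high_watermark()
        return self.log[fetch_offset:], self.high_watermark

    def _update_high_watermark(self) -> None:
        offsets = sorted(self.end_offsets.values(), reverse=True)
        majority = len(offsets) // 2 + 1
        self.high_watermark = max(self.high_watermark, offsets[majority - 1])


leader = MetadataLeader(voter_ids=[101, 102, 103], leader_id=101)
leader.append("TopicRecord")
leader.append("PartitionRecord")
print(leader.handle_fetch(102, 0))   # (['TopicRecord', 'PartitionRecord'], 0)
print(leader.handle_fetch(102, 2))   # ([], 2): two voters now hold both records
print(leader.handle_fetch(4, 0))     # observer (broker) fetch; watermark unchanged
```

The follower's next fetch doubles as its acknowledgement, so no separate AppendEntries response path is needed.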

The dedicated Raft-specific APIs are:

ApiKey=52  VoteRequest              - RequestVote equivalent (ballot request)
ApiKey=53  BeginQuorumEpochRequest  - new Leader announces its epoch to the voters
ApiKey=54  EndQuorumEpochRequest    - Leader voluntarily steps down (e.g., graceful shutdown during a rolling upgrade)
ApiKey=55  DescribeQuorumRequest    - introspect Quorum state (used by CLI tools)
ApiKey=1   FetchRequest             - Raft log replication (same RPC as consumer fetch)

# Watch KRaft internal traffic in action
# On the controller port (9093), you'll see:
# - A steady stream of FetchRequests (Followers and observers fetching __cluster_metadata)
# - VoteRequests only during elections (rare under normal operation)
# - BeginQuorumEpoch immediately after each election

# Observe current quorum voters and their replication lag
kafka-metadata-quorum.sh --bootstrap-controller ctrl1:9093 describe --replication

# Output:
# NodeId  LogEndOffset  Lag  LastFetchTimestamp  LastCaughtUpTimestamp  Status
# 101     158392        0    1714000000123       1714000000123          Leader
# 102     158392        0    1714000000119       1714000000119          Follower
# 103     158390        2    1714000000098       1714000000098          Follower

This reuse of the Fetch protocol is architecturally elegant: Kafka's binary protocol, network stack, and I/O threading model handle both client traffic and internal Raft replication through the same code paths. The result is a single, unified distributed system rather than two systems (Kafka + ZooKeeper) loosely coupled through a client library. This is the fundamental architectural simplification that KRaft represents, and why it unlocks millions of partitions, sub-second failover, and single-system operability.
