Controller Source Code: Election, Assignment and State Machine
The Controller's Responsibilities
Every Kafka cluster has one special role: the Controller. It is the central coordinator for all metadata operations, responsible for:
- Detecting broker join/leave events and triggering leader re-elections
- Deciding which broker is leader for each partition, and which replicas form the ISR
- Propagating LeaderAndIsr metadata changes to all affected brokers
- Managing topic creation and deletion
- Coordinating partition reassignment (triggered by kafka-reassign-partitions.sh)
In KRaft mode (generally available since Kafka 3.3), the Controller role is played by QuorumController. It runs on dedicated Controller nodes (or co-located with brokers in combined mode) and uses the Raft protocol to elect an Active Controller among the Controller quorum.
KRaft's Event-Driven Architecture
ControllerEventManager — Single-Threaded Event Loop
The foundational design decision in the Controller is that all metadata mutations are serialized through a single-threaded event queue. This eliminates concurrency races and makes reasoning about Controller state straightforward: there is always exactly one thread modifying it. The ZooKeeper-era Controller implemented this as ControllerEventManager (core/, Scala). KRaft's QuorumController uses the same design through KafkaEventQueue (org.apache.kafka.queue). The sketch below shows the pattern in Java form.
// Single-threaded event loop (simplified; the pattern behind ControllerEventManager / KafkaEventQueue)
public class ControllerEventManager {
private final LinkedBlockingQueue<ControllerEvent> queue = new LinkedBlockingQueue<>();
private final ControllerEventThread eventThread;
private final QuorumController controller;
private volatile boolean stopped = false;
public ControllerEventManager(QuorumController controller, ...) {
this.controller = controller;
this.eventThread = new ControllerEventThread("controller-event-thread");
this.eventThread.start();
}
// Any thread can enqueue an event (thread-safe; the queue is unbounded, so add() never blocks)
public void enqueue(ControllerEvent event) {
queue.add(event);
}
// The event processing thread: the ONLY thread that mutates Controller state
class ControllerEventThread extends Thread {
ControllerEventThread(String name) {
super(name);
}
@Override
public void run() {
while (!stopped) {
ControllerEvent event;
try {
event = queue.take(); // block until an event arrives
} catch (InterruptedException e) {
return; // shutdown requested
}
try {
long startMs = time.milliseconds();
event.process(controller); // process serially, no concurrency
eventProcessingStats.record(event.name(), time.milliseconds() - startMs);
} catch (Exception e) {
// Log and continue: one failing event must not stop the loop
log.error("Error processing event {}", event.name(), e);
}
}
}
}
}
Every operation that could mutate Controller state — broker registration, topic creation, ISR change, partition reassignment — must be wrapped as a ControllerEvent and enqueued. Consistency is guaranteed by the single-thread constraint; the cost is that individual event processing must be kept short.
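To make the single-writer argument concrete, here is a minimal, self-contained Java sketch of the same pattern. It is not Kafka code. SingleThreadEventLoop and CounterDemo are hypothetical names, and each event returns its result through a CompletableFuture, loosely mirroring how QuorumController methods return futures. Several producer threads submit increments concurrently. The plain, unsynchronized counter still ends up exact, because only the event thread ever touches it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

// Hypothetical minimal event loop: any thread may submit work; one thread runs it in FIFO order.
final class SingleThreadEventLoop implements AutoCloseable {
    private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Thread thread;
    private boolean stopped = false; // read and written only by the event thread

    SingleThreadEventLoop(String name) {
        thread = new Thread(this::run, name);
        thread.start();
    }

    // Thread-safe: wrap the operation as an event and return a future for its result.
    <T> CompletableFuture<T> submit(Supplier<T> operation) {
        CompletableFuture<T> future = new CompletableFuture<>();
        queue.add(() -> {
            try {
                future.complete(operation.get());
            } catch (RuntimeException e) {
                future.completeExceptionally(e); // a failing event must not kill the loop
            }
        });
        return future;
    }

    private void run() {
        while (!stopped) {
            try {
                queue.take().run(); // the only place events (and thus state changes) execute
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    // Enqueue a stop marker so every event submitted earlier still runs, then wait for the thread.
    @Override
    public void close() {
        queue.add(() -> stopped = true);
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

final class CounterDemo {
    // Deliberately unsynchronized: safe only because the event thread is its sole writer.
    static int counter = 0;

    static int run(int producerThreads, int eventsPerThread) {
        counter = 0;
        try (SingleThreadEventLoop loop = new SingleThreadEventLoop("controller-event-thread")) {
            List<CompletableFuture<Void>> producers = new ArrayList<>();
            for (int t = 0; t < producerThreads; t++) {
                producers.add(CompletableFuture.runAsync(() -> {
                    for (int i = 0; i < eventsPerThread; i++) {
                        loop.submit(() -> ++counter);
                    }
                }));
            }
            CompletableFuture.allOf(producers.toArray(new CompletableFuture[0])).join();
            // Read the state on the event thread too; FIFO order puts this after every increment.
            return loop.submit(() -> counter).join();
        }
    }
}
```

Calling CounterDemo.run(4, 1000) yields 4000 on every run. If the increment ran on the producer threads instead of the event thread, the result would be racy.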
QuorumController — The KRaft Controller Core
// QuorumController.java (metadata/, simplified)
public class QuorumController implements Controller {
// In-memory cluster metadata state (mutated exclusively by the event thread)
private final ClusterControlManager clusterControl; // broker registration state
private final ReplicationControlManager replicationControl; // partition / replica state
private final ConfigurationControlManager configurationControl;
private final ProducerIdControlManager producerIdControl; // producer ID allocation
// Raft client: used to write metadata changes to __cluster_metadata
private final RaftClient<ApiMessageAndVersion> raftClient;
// Epoch of the current active Controller (incremented on every election)
private volatile int curClaimEpoch = -1;
/**
* Handle a BrokerRegistration event (broker sends this on startup).
*/
public CompletableFuture<BrokerRegistrationReply> registerBroker(
BrokerRegistrationRequestData request
) {
return appendWriteEvent("registerBroker", () -> {
// Validate broker ID and cluster ID
clusterControl.validateBrokerRegistration(request);
// Compute the reply (in-memory state changes only when the record below is replayed)
BrokerRegistrationReply reply = clusterControl.registerBroker(request, ...);
// Build a Raft log record; this gets replicated to all Controller nodes
return ControllerResult.of(
Collections.singletonList(new ApiMessageAndVersion(
new RegisterBrokerRecord()
.setBrokerId(request.brokerId())
.setBrokerEpoch(reply.epoch()), CURRENT_VERSION
)),
reply
);
});
}
}
appendWriteEvent() wraps the operation as an event and runs it on the single-threaded event loop. The metadata records it returns are appended to __cluster_metadata via Raft, and the returned future completes only after those records are committed. The other Controller nodes replicate the log as Raft voters. Every broker fetches it as a Raft observer (a non-voting replica) and applies the records to its local MetadataImage.
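The write path can be sketched in a few lines: run the operation, append its records, replay them into in-memory state, then answer the caller. This is a hypothetical single-threaded model, not QuorumController. MiniController and its nested records are illustrative stand-ins, the List plays the Raft log, and "commit" is immediate. Using the record's log offset as the broker epoch is how we understand KRaft to assign it, but treat that detail as an assumption. The key property is that state changes only in replay(), so any node replaying the same log reaches the same state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical single-threaded model of the write path; not QuorumController itself.
final class MiniController {
    // Stand-ins for RegisterBrokerRecord and ControllerResult.
    record RegisterBroker(int brokerId, long brokerEpoch) {}
    record Result<T>(List<RegisterBroker> records, T response) {}

    final List<RegisterBroker> log = new ArrayList<>();      // stands in for __cluster_metadata
    final Map<Integer, Long> brokerEpochs = new HashMap<>(); // in-memory state, changed only by replay()

    // Run the operation, append its records, replay them, then complete the caller's future.
    <T> CompletableFuture<T> appendWriteEvent(Supplier<Result<T>> operation) {
        Result<T> result = operation.get();          // produces records; does not touch state directly
        log.addAll(result.records());                // real controller: Raft append, then wait for commit
        result.records().forEach(this::replay);      // every node replaying the log reaches the same state
        return CompletableFuture.completedFuture(result.response());
    }

    private void replay(RegisterBroker record) {
        brokerEpochs.put(record.brokerId(), record.brokerEpoch());
    }

    CompletableFuture<Long> registerBroker(int brokerId) {
        return appendWriteEvent(() -> {
            long epoch = log.size(); // assumption: the record's log offset becomes the broker epoch
            return new Result<>(List.of(new RegisterBroker(brokerId, epoch)), epoch);
        });
    }
}
```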
The Partition State Machine
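The ZooKeeper-mode PartitionStateMachine modeled partition states explicitly. In KRaft, ReplicationControlManager keeps no separate state enum. The same lifecycle is implied by each partition's PartitionRegistration; for example, a leader of -1 (NO_LEADER) means the partition is offline. Conceptually, each partition moves through these states: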
NonExistent
│ (topic created)
↓
NewPartition
│ (leader elected, ISR established)
↓
OnlinePartition ◄──────────────────────────────┐
│ (leader's broker fails, no new leader yet) │ (new leader elected)
↓ │
OfflinePartition ──────────────────────────────┘
│ (topic deleted)
↓
NonExistent
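The lifecycle above can be encoded as an enum with a table of legal predecessors. The ZooKeeper-mode PartitionStateMachine validated transitions in a similar way, against a set of valid previous states per state. This hypothetical PartitionState enum allows only the arrows drawn in the diagram, which simplifies the real rules.

```java
import java.util.Map;
import java.util.Set;

// Partition lifecycle from the diagram above, with the legal previous states for each target.
enum PartitionState {
    NON_EXISTENT, NEW_PARTITION, ONLINE_PARTITION, OFFLINE_PARTITION;

    private static final Map<PartitionState, Set<PartitionState>> VALID_PREVIOUS = Map.of(
        NEW_PARTITION, Set.of(NON_EXISTENT),                         // topic created
        ONLINE_PARTITION, Set.of(NEW_PARTITION, OFFLINE_PARTITION),  // leader elected / re-elected
        OFFLINE_PARTITION, Set.of(ONLINE_PARTITION),                 // leader lost
        NON_EXISTENT, Set.of(OFFLINE_PARTITION));                    // topic deleted

    boolean canTransitionTo(PartitionState target) {
        return VALID_PREVIOUS.get(target).contains(this);
    }

    // Reject illegal moves instead of silently corrupting state.
    PartitionState transitionTo(PartitionState target) {
        if (!canTransitionTo(target)) {
            throw new IllegalStateException(this + " -> " + target + " is not a valid transition");
        }
        return target;
    }
}
```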
State Transitions Trigger Leader Election
// ReplicationControlManager.java (metadata/, simplified)
public class ReplicationControlManager {
// Runtime state of all partitions (in memory, mutated by event thread only)
private final HashMap<TopicIdPartition, PartitionRegistration> partitions = new HashMap<>();
// Reverse index: broker ID -> partitions whose ISR contains that broker
private final BrokersToIsrs brokersToIsrs;
/**
* Called when a broker is fenced (detected offline).
* Removes the broker from every ISR it belongs to and triggers
* leader re-election for every partition that broker was leading.
*/
ControllerResult<Void> handleBrokerFenced(int brokerId) {
List<ApiMessageAndVersion> records = new ArrayList<>();
// Followers matter too: a fenced follower must also leave the ISR
brokersToIsrs.partitionsWithBrokerInIsr(brokerId).forEachRemaining(topicIdPartition -> {
PartitionRegistration partition = partitions.get(topicIdPartition);
// Remove the fenced broker from the ISR
int[] newIsr = removeFromArray(partition.isr, brokerId);
PartitionChangeRecord change = new PartitionChangeRecord()
.setTopicId(topicIdPartition.topicId())
.setPartitionId(topicIdPartition.partitionId())
.setIsr(Replicas.toList(newIsr));
if (partition.leader == brokerId) {
// Elect a new leader from the remaining ISR; NO_LEADER means the partition goes offline
change.setLeader(electLeader(newIsr, partition.replicas));
}
// PartitionChangeRecord has no leader-epoch field: replaying a record that
// carries a leader change bumps leaderEpoch on every node that applies it
records.add(new ApiMessageAndVersion(change, CURRENT_VERSION));
});
return ControllerResult.of(records, null);
}
/**
* Leader election strategy (simplified).
* Default: pick the first replica in the assigned (preferred) order that is in the ISR.
* The real controller also requires the candidate's broker to be unfenced.
*/
private int electLeader(int[] isr, int[] replicas) {
for (int replica : replicas) {
if (arrayContains(isr, replica)) return replica;
}
// With unclean.leader.election.enable=true, the controller may instead pick a live
// replica outside the ISR (risking data loss); this sketch omits that path
return NO_LEADER;
}
}
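The fencing logic can be exercised end to end on plain arrays. In this standalone sketch, FencingSketch and its Partition record are hypothetical stand-ins for PartitionRegistration, and partitions are keyed by a simple name string. It removes the fenced broker from every ISR that contains it, re-elects only where that broker was leader, and bumps the leader epoch only when the leader changes. It ignores unclean election and the liveness checks the real controller performs on the remaining candidates.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

final class FencingSketch {
    static final int NO_LEADER = -1;

    // Stand-in for PartitionRegistration: assigned replicas, ISR, leader, leader epoch.
    record Partition(int[] replicas, int[] isr, int leader, int leaderEpoch) {}

    static boolean contains(int[] array, int value) {
        return Arrays.stream(array).anyMatch(x -> x == value);
    }

    // Preferred-order election: the first assigned replica that is still in the ISR.
    static int electLeader(int[] isr, int[] replicas) {
        for (int replica : replicas) {
            if (contains(isr, replica)) return replica;
        }
        return NO_LEADER; // no eligible replica: the partition goes offline
    }

    // New state of every partition whose ISR contained the fenced broker; others are untouched.
    static Map<String, Partition> fence(Map<String, Partition> partitions, int brokerId) {
        Map<String, Partition> changes = new LinkedHashMap<>();
        partitions.forEach((name, p) -> {
            if (!contains(p.isr(), brokerId)) return;
            int[] newIsr = Arrays.stream(p.isr()).filter(x -> x != brokerId).toArray();
            int newLeader = p.leader() == brokerId ? electLeader(newIsr, p.replicas()) : p.leader();
            // The leader epoch advances only when the leader actually changes.
            int newEpoch = newLeader != p.leader() ? p.leaderEpoch() + 1 : p.leaderEpoch();
            changes.put(name, new Partition(p.replicas(), newIsr, newLeader, newEpoch));
        });
        return changes;
    }
}
```

For example, fencing broker 1 moves a partition with replicas [1, 2, 3] and leader 1 to leader 2. A partition that broker 1 merely follows keeps its leader and only loses broker 1 from the ISR.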
Controller Epoch Protection
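Each time a new Controller is elected, its epoch increases; in KRaft, curClaimEpoch is the Raft leader epoch of the quorum. In ZooKeeper mode, where the Controller pushed LeaderAndIsr RPCs to brokers, every broker validated the controllerEpoch on each incoming Controller request:
// ReplicaManager.scala (core/, ZooKeeper mode, simplified)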
def becomeLeaderOrFollower(correlationId: Int, leaderAndIsrRequest: LeaderAndIsrRequest, ...): Unit = {
val controllerId = leaderAndIsrRequest.controllerId
val controllerEpoch = leaderAndIsrRequest.controllerEpoch
// Reject requests from a stale Controller (split-brain protection)
if (controllerEpoch < this.controllerEpoch) {
staleControllerEpochCount.getAndIncrement()
warn(s"Ignoring LeaderAndIsr request from controller $controllerId " +
s"with epoch $controllerEpoch (current epoch is ${this.controllerEpoch})")
return
}
this.controllerEpoch = controllerEpoch
// ... process leader/follower transitions
}
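This epoch check protects against a zombie Controller. Suppose the old Controller was paused (for example, by a long GC pause) or cut off by a network partition while a new one was elected. Any LeaderAndIsr requests it still sends carry a lower epoch and are ignored. Split-brain between two partition leaders is handled by a different epoch: the per-partition leader epoch. Followers and consumers include it in Fetch requests, and the Controller checks it on AlterPartition, so a deposed leader's stale view is detected and rejected. In KRaft, the Controller-level protection comes from Raft itself: only the leader of the current epoch can append to __cluster_metadata, and brokers apply only committed records.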
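The ZooKeeper-mode check above reduces to a monotonic epoch guard. ControllerEpochGuard is a hypothetical class, not a Kafka API. It remembers the highest epoch seen, accepts equal or newer epochs, and rejects older ones.

```java
// Hypothetical broker-side guard: remembers the highest controller epoch seen and
// rejects requests stamped with an older one.
final class ControllerEpochGuard {
    private int currentEpoch = -1; // no controller seen yet

    // true = process the request; false = it comes from a stale (zombie) controller.
    synchronized boolean accept(int requestEpoch) {
        if (requestEpoch < currentEpoch) {
            return false;
        }
        currentEpoch = requestEpoch; // equal or newer epoch: adopt it
        return true;
    }

    synchronized int currentEpoch() {
        return currentEpoch;
    }
}
```

Consider a controller with epoch 3 that is replaced by one with epoch 4. Once the broker has seen epoch 4, any late request from the epoch-3 controller is dropped.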
The Replica State Machine
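In ZooKeeper mode, a companion ReplicaStateMachine tracked each replica's state alongside the Partition state machine. KRaft has no explicit replica state enum, but replicas follow the same lifecycle: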
NewReplica
│ (partition starts serving, replica starts catching up to leader)
↓
OnlineReplica ◄────────────────────────────────────────────────────────┐
│ (hosting broker goes offline)                                         │
↓ │
OfflineReplica ────────────────────────────────────────────────────────┘
│ (topic deleted, replica deletion triggered)
↓
ReplicaDeletionStarted
│ (broker receives deletion request, removes local log files)
↓
ReplicaDeletionSuccessful
│
↓
NonExistentReplica
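A replica moves from OfflineReplica back to OnlineReplica as soon as its broker comes back online. Rejoining the ISR is a separate step driven by the partition leader. The follower is added back once its LEO reaches the leader's high watermark (HW), and it is removed again if it stays behind the leader's LEO for longer than replica.lag.time.max.ms.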
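The two ISR membership rules can be written as two predicates. A follower rejoins once its LEO reaches the leader's high watermark. It is dropped after lagging longer than replica.lag.time.max.ms. IsrRules is a hypothetical helper, and lastCaughtUpTimeMs stands for a per-follower timestamp the leader is assumed to track: the last time the follower had fully caught up. The 30-second constant reflects the default of replica.lag.time.max.ms in recent Kafka versions.

```java
// Hypothetical helpers for the two ISR rules a partition leader applies to each follower.
final class IsrRules {
    // Default of replica.lag.time.max.ms in recent Kafka versions (30 seconds).
    static final long DEFAULT_REPLICA_LAG_TIME_MAX_MS = 30_000L;

    // Expansion: a follower may rejoin once its log end offset (LEO) reaches the leader's high watermark.
    static boolean canJoinIsr(long followerLeo, long leaderHighWatermark) {
        return followerLeo >= leaderHighWatermark;
    }

    // Shrink: a follower is out of sync if it is behind the leader's LEO and has not been
    // fully caught up at any point during the last replica.lag.time.max.ms.
    static boolean shouldLeaveIsr(long followerLeo, long leaderLeo, long nowMs,
                                  long lastCaughtUpTimeMs, long replicaLagTimeMaxMs) {
        boolean caughtUp = followerLeo == leaderLeo
            || nowMs - lastCaughtUpTimeMs <= replicaLagTimeMaxMs;
        return !caughtUp;
    }
}
```

Note that being behind is not enough on its own to leave the ISR. A follower that is briefly behind but was caught up within the window stays in.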
Metadata Delta Propagation: __cluster_metadata
This is the architectural heart of KRaft: the Controller no longer pushes metadata to brokers via RPC. Instead, all changes are written to the __cluster_metadata topic, and brokers replicate it as Raft observers, fetching new records as soon as they are committed.
Active Controller
│
│ writes change records (PartitionChangeRecord, RegisterBrokerRecord, ...)
↓
__cluster_metadata Topic (Raft Log)
│
├── Controller Follower 1 ────── participates in Raft voting
├── Controller Follower 2 ────── participates in Raft voting
│
├── Broker 1 ─── Raft observer ─── fetches __cluster_metadata ─── MetadataLoader updates local image
├── Broker 2 ─── Raft observer ─── fetches __cluster_metadata ─── MetadataLoader updates local image
└── Broker 3 ─── Raft observer ─── fetches __cluster_metadata ─── MetadataLoader updates local image
Each broker runs a MetadataLoader that reads committed records from __cluster_metadata, folds each batch into a MetadataDelta, and builds a new immutable MetadataImage. It then hands the delta and the new image to every registered MetadataPublisher, including BrokerMetadataPublisher:
// BrokerMetadataPublisher (core/, written in Scala upstream; Java-style sketch, simplified)
public class BrokerMetadataPublisher implements MetadataPublisher {
@Override
public String name() {
return "BrokerMetadataPublisher";
}
@Override
public void onMetadataUpdate(
MetadataDelta delta,
MetadataImage newImage,
LoaderManifest manifest
) {
// ① Publish the new image to the metadata cache first, so requests see the latest metadata
metadataCache.setImage(newImage);
// ② Update ReplicaManager: create logs and handle leader/follower transitions
if (delta.topicsDelta() != null) {
replicaManager.applyDelta(delta.topicsDelta(), newImage);
}
// ③ Tell the group coordinator which __consumer_offsets partitions this broker
// now leads or no longer leads (simplified to a single call here)
if (delta.topicsDelta() != null) {
groupCoordinator.onMetadataUpdate(delta, newImage);
}
}
}
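The loader-and-publisher flow can be modeled without any Kafka classes. In this hypothetical sketch, MetadataPipelineSketch, LeaderChange and Image are illustrative stand-ins for MetadataLoader, PartitionChangeRecord and MetadataImage. Each committed batch is folded into a delta, a new immutable image is built from the previous one, and both are handed to every registered publisher. Readers holding an old image are unaffected by later updates.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

final class MetadataPipelineSketch {
    // A change record: new leader for one partition (stand-in for PartitionChangeRecord).
    record LeaderChange(String partition, int leader) {}

    // Immutable image: readers can keep using an old image while a new one is built.
    record Image(long offset, Map<String, Integer> leaders) {
        Image {
            leaders = Map.copyOf(leaders);
        }
    }

    private Image image = new Image(-1, Map.of());
    private final List<BiConsumer<Map<String, Integer>, Image>> publishers = new ArrayList<>();

    void register(BiConsumer<Map<String, Integer>, Image> publisher) {
        publishers.add(publisher);
    }

    Image image() {
        return image;
    }

    // Fold a batch of committed records into a delta, build a new image, then publish both.
    void handleCommit(long lastOffset, List<LeaderChange> batch) {
        Map<String, Integer> delta = new HashMap<>();
        for (LeaderChange r : batch) {
            delta.put(r.partition(), r.leader()); // last write in the batch wins
        }
        Map<String, Integer> next = new HashMap<>(image.leaders());
        next.putAll(delta);
        image = new Image(lastOffset, next);
        for (var publisher : publishers) {
            publisher.accept(Map.copyOf(delta), image);
        }
    }
}
```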
ZooKeeper Mode vs KRaft Mode: A Comparison
| Dimension | ZooKeeper Mode (legacy) | KRaft Mode (current) |
|---|---|---|
| Controller election | ZK ephemeral node race | Raft voting |
| Metadata storage | ZooKeeper znodes | __cluster_metadata topic |
| Metadata propagation | Controller pushes RPCs (UpdateMetadata, LeaderAndIsr) to all brokers | Brokers fetch the metadata log as Raft observers |
| Controller failure recovery | New Controller must reload all state from ZooKeeper (slow, O(partitions)) | Standby Controllers already hold the state in memory (fast failover); a restarted node loads the latest snapshot |
| External dependencies | ZooKeeper cluster required | None |
| Partition scale limit | ~200,000 partitions practical | Millions (design target) |
The Complete Topic Creation Flow
Tracing kafka-topics.sh --create in KRaft mode:
Admin Client
│
│ CreateTopicsRequest
↓
KafkaApis (broker) forwards the request to the active Controller
│
↓
ControllerApis.handleCreateTopics()
│ [auth check, request validation]
↓
QuorumController.createTopics()
│ [runs as a write event on the controller event thread]
↓
ReplicationControlManager.createTopics()
│ [assign replicas to brokers for each partition]
│ [uses StripedReplicaPlacer, which is rack-aware]
↓
Generate TopicRecord + PartitionRecord(s)
│ [written to __cluster_metadata via Raft replication]
↓
All brokers consume __cluster_metadata
│ [BrokerMetadataPublisher.onMetadataUpdate()]
↓
ReplicaManager.applyDelta()
│ [create local UnifiedLog directory and files]
│ [become leader or follower as recorded; the leader replica starts accepting Produce]
↓
Controller returns result to Admin Client (once the records are committed; it does not wait for brokers to create their logs)
Replica Assignment Strategy
ReplicationControlManager delegates replica assignment to a ReplicaPlacer (StripedReplicaPlacer) during topic creation. The real placer is rack-aware and more involved. The simplified code below only illustrates the striping idea:
// Striped placement (simplified illustration, not the actual StripedReplicaPlacer)
public class StripedPlacementSketch {
private final Random random = new Random();
public List<List<Integer>> assignReplicas(
int numPartitions,
short replicationFactor,
List<Integer> usableBrokers
) {
if (replicationFactor > usableBrokers.size()) {
throw new InvalidReplicationFactorException("Replication factor exceeds the number of usable brokers");
}
List<List<Integer>> result = new ArrayList<>();
int startIndex = random.nextInt(usableBrokers.size()); // random start to avoid hotspots
for (int i = 0; i < numPartitions; i++) {
List<Integer> replicas = new ArrayList<>();
// Leader replica: round-robin across all brokers for even distribution
int leaderIndex = (startIndex + i) % usableBrokers.size();
replicas.add(usableBrokers.get(leaderIndex));
// Follower replicas: sequential from leader position (the real placer also spreads them across racks)
for (int j = 1; j < replicationFactor; j++) {
int followerIndex = (leaderIndex + j) % usableBrokers.size();
replicas.add(usableBrokers.get(followerIndex));
}
result.add(replicas);
}
return result;
}
}
This "striped" assignment ensures leader replicas are evenly spread across all brokers, preventing any single broker from hosting a disproportionate share of leaders and thus a disproportionate share of produce traffic.
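The even spread of leaders can be checked directly. This hypothetical StripedAssignmentSketch reproduces the simplified striping above. The random start index is passed in as a parameter, so the output is deterministic. The sketch counts how many partitions each broker leads, treating the first replica as the preferred leader.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class StripedAssignmentSketch {
    // Round-robin leaders starting at startIndex; followers are the next brokers in the list.
    static List<List<Integer>> assign(int numPartitions, int replicationFactor,
                                      List<Integer> brokers, int startIndex) {
        if (replicationFactor > brokers.size()) {
            throw new IllegalArgumentException("replication factor exceeds broker count");
        }
        List<List<Integer>> result = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            int leaderIndex = (startIndex + p) % brokers.size();
            List<Integer> replicas = new ArrayList<>();
            for (int j = 0; j < replicationFactor; j++) {
                replicas.add(brokers.get((leaderIndex + j) % brokers.size()));
            }
            result.add(replicas);
        }
        return result;
    }

    // How many partitions each broker leads (first replica = preferred leader).
    static Map<Integer, Integer> leaderCounts(List<List<Integer>> assignment) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (List<Integer> replicas : assignment) {
            counts.merge(replicas.get(0), 1, Integer::sum);
        }
        return counts;
    }
}
```

With 6 partitions over brokers 101, 102 and 103, each broker leads exactly two partitions. In general, leader counts within one topic differ by at most one.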
Partition Reassignment: Source Code Trace
Running kafka-reassign-partitions.sh --execute sends an AlterPartitionReassignments request to the Controller:
// ReplicationControlManager.java (simplified)
ControllerResult<AlterPartitionReassignmentsResponseData> alterPartitionReassignments(
AlterPartitionReassignmentsRequestData request
) {
List<ApiMessageAndVersion> records = new ArrayList<>();
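request.topics().forEach(topic -> {
Uuid topicId = topicsByName.get(topic.name()); // topic name -> topic ID
topic.partitions().forEach(partition -> {
validateReassignment(topic.name(), partition.partitionIndex(), partition.replicas());
PartitionRegistration current = partitions.get(new TopicIdPartition(topicId, partition.partitionIndex()));
List<Integer> currentReplicas = Replicas.toList(current.replicas);
List<Integer> target = partition.replicas();
// Record the reassignment target (gradual, not immediate)
records.add(new ApiMessageAndVersion(
new PartitionChangeRecord()
.setTopicId(topicId)
.setPartitionId(partition.partitionIndex())
// replicas: union of target and current replicas while the move is in flight
.setReplicas(computeMergedReplicas(currentReplicas, target))
// addingReplicas: new replicas that need to catch up before joining ISR
.setAddingReplicas(computeAddingReplicas(currentReplicas, target))
// removingReplicas: old replicas to be dropped after new ISR is established
.setRemovingReplicas(computeRemovingReplicas(currentReplicas, target)),
CURRENT_VERSION
));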
});
});
return ControllerResult.of(records, buildResponse());
}
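The helper computations behind computeAddingReplicas and computeRemovingReplicas reduce to list differences. The in-flight replica set is their union. ReassignmentSets is a hypothetical class. Placing target replicas first in the merged list is a choice for this sketch, modeled on ZooKeeper-mode behavior rather than taken from KRaft.

```java
import java.util.ArrayList;
import java.util.List;

final class ReassignmentSets {
    // Replicas in the target that the partition does not have yet.
    static List<Integer> adding(List<Integer> current, List<Integer> target) {
        List<Integer> out = new ArrayList<>(target);
        out.removeAll(current);
        return out;
    }

    // Replicas the partition has today that are not in the target.
    static List<Integer> removing(List<Integer> current, List<Integer> target) {
        List<Integer> out = new ArrayList<>(current);
        out.removeAll(target);
        return out;
    }

    // Full replica set while the move is in flight: target first, then the replicas being removed.
    static List<Integer> merged(List<Integer> current, List<Integer> target) {
        List<Integer> out = new ArrayList<>(target);
        out.addAll(removing(current, target));
        return out;
    }
}
```

Moving a partition from [1, 2, 3] to [3, 4, 5] gives addingReplicas [4, 5], removingReplicas [1, 2] and a temporary replica set of five brokers.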
Reassignment is deliberately gradual:
- New replicas are added to the replica set (and tracked in addingReplicas) but not to the ISR. They fetch from the leader like any other follower.
- Once a new replica's LEO catches up to the leader's HW, the leader asks the Controller (via AlterPartition) to add it to the ISR.
- Once the ISR contains all target replicas, the Controller completes the reassignment. The replica set and ISR are narrowed to the target replicas, and addingReplicas/removingReplicas are cleared.
- If the current leader is one of the replicas being removed, the Controller elects a new leader from the target replicas in the ISR.
Progress can be checked with kafka-reassign-partitions.sh --verify, which reports for each partition whether its reassignment is complete or still in progress.
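The completion step in the list above can be sketched as a pure function. It waits until every target replica is in the ISR, then narrows the replica set and ISR to the target, keeping the leader if it survives the move and otherwise handing off to the first target replica in the ISR. ReassignmentCompletion and its Outcome record are hypothetical, and the real controller's rules cover more edge cases, such as cancellation and fenced brokers.

```java
import java.util.List;
import java.util.Optional;

final class ReassignmentCompletion {
    record Outcome(List<Integer> replicas, List<Integer> isr, int leader) {}

    // Final state once every target replica is in the ISR; empty while replicas are still catching up.
    static Optional<Outcome> tryComplete(List<Integer> target, List<Integer> isr, int leader) {
        if (!isr.containsAll(target)) {
            return Optional.empty(); // keep waiting
        }
        List<Integer> newIsr = isr.stream().filter(target::contains).toList();
        int newLeader = target.contains(leader)
            ? leader                                                              // leader survives the move
            : target.stream().filter(newIsr::contains).findFirst().orElseThrow(); // hand leadership off
        return Optional.of(new Outcome(List.copyOf(target), newIsr, newLeader));
    }
}
```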
The Broader Significance of the KRaft Architecture
KRaft transforms Kafka from a system that depends on an external coordination service into a self-managing distributed system. The combination of a single-threaded event loop and a replicated Raft log delivers:
- Predictable latency: every metadata operation is applied in a single, defined order, eliminating the non-deterministic delays of ZooKeeper watch notifications and ephemeral-node races.
- Fast failure recovery: standby Controllers continuously replicate __cluster_metadata and keep the metadata in memory. A newly elected Controller can therefore take over without reloading state from ZooKeeper, reducing failover time from minutes to seconds at scale. A restarted node loads the latest snapshot and replays only the records after it.
- Dramatically larger cluster sizes: KRaft is designed to support millions of partitions, while the ZooKeeper-based architecture starts encountering performance walls around 200,000 partitions.
- Operational simplicity: removing the ZooKeeper dependency eliminates an entire class of operational failure modes, such as ZooKeeper session timeouts and session ID mismatches. It also removes the need to coordinate ZooKeeper and Kafka upgrades.