第 20 章

Controller 源码:选举、分配与状态机

第20章:Controller 源码:选举、分配与状态机

导读:KRaft Controller 如何通过单线程事件循环管理集群元数据?

本章核心问题:KRaft Controller 如何通过单线程事件循环管理集群元数据?

读完本章你将理解


Level 1 · 你需要知道的(1-3年经验)

Controller 的职责

Kafka 集群中有一个特殊的角色:Controller。它是元数据操作的中央协调者,负责:

在 KRaft 模式(Kafka 3.3+ 正式 GA)下,Controller 的角色由 QuorumController 承担,它运行在独立的 Controller 节点上(或与 Broker 合并部署),通过 Raft 协议在 Controller 节点间选举 Active Controller。

元数据增量传播:__cluster_metadata

这是 KRaft 架构的核心——Controller 不再主动向所有 Broker 推送元数据,而是将所有变更写入 __cluster_metadata Topic,Broker 作为这个 Topic 的 Follower,自动同步元数据

Active Controller
    │
    │ 写入变更记录(PartitionChangeRecord、BrokerRegistrationRecord...)
    ↓
__cluster_metadata Topic(Raft Log)
    │
    ├── Controller Follower 1 ────── 参与 Raft 投票
    ├── Controller Follower 2 ────── 参与 Raft 投票
    │
    ├── Broker 1 ─── MetadataFetcher ─── 消费 __cluster_metadata ─── 更新本地缓存
    ├── Broker 2 ─── MetadataFetcher ─── 消费 __cluster_metadata ─── 更新本地缓存
    └── Broker 3 ─── MetadataFetcher ─── 消费 __cluster_metadata ─── 更新本地缓存

每个 Broker 内部有一个 BrokerMetadataPublisher,它消费 __cluster_metadata 的增量记录,并将变更应用到本地的 MetadataImage

// BrokerMetadataPublisher.java(metadata/,简化)
public class BrokerMetadataPublisher implements MetadataPublisher {

  @Override
  public void onMetadataUpdate(
      MetadataDelta delta,
      MetadataImage newImage,
      LoaderManifest manifest
  ) {
    // ① 更新副本管理器(处理 Leader/Follower 切换)
    if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
      replicaManager.applyDelta(delta, newImage);
    }

    // ② 更新 GroupCoordinator(处理 Coordinator 迁移)
    if (delta.topicsDelta() != null) {
      groupCoordinator.onMetadataUpdate(delta, newImage);
    }

    // ③ 更新本地 Topic/Partition 元数据缓存
    metadataCache.setImage(newImage);
  }
}

ZooKeeper 模式 vs KRaft 模式的对比

维度 ZooKeeper 模式(旧) KRaft 模式(新)
Controller 选举 ZK 临时节点抢占 Raft 投票
元数据存储 ZooKeeper znodes __cluster_metadata Topic
元数据传播 Controller 主动推 RPC Broker 被动消费 Topic
Controller 故障恢复 新 Controller 需重建所有状态(慢) 新 Controller 从 Raft Log 恢复(快)
外部依赖 需要独立的 ZooKeeper 集群 无外部依赖

Level 2 · 它是怎么运行的(3-5年经验)

KRaft 的事件驱动架构

ControllerEventManager — 单线程事件循环

KRaft Controller 的核心设计决策是所有元数据变更都通过一个单线程事件队列串行化。这消除了并发竞争,使 Controller 状态的推理极为简单:

// ControllerEventManager.java(metadata/src/main/java/,简化)
public class ControllerEventManager {

  private final LinkedBlockingQueue<ControllerEvent> queue = new LinkedBlockingQueue<>();
  private final ControllerEventThread eventThread;
  private final QuorumController controller;

  public ControllerEventManager(QuorumController controller, ...) {
    this.controller = controller;
    this.eventThread = new ControllerEventThread("controller-event-thread");
    this.eventThread.start();
  }

  // 任意线程可以入队事件(线程安全)
  public void enqueue(ControllerEvent event) {
    queue.put(event);
  }

  // 事件处理线程(只有这一个线程修改 Controller 状态)
  class ControllerEventThread extends Thread {
    @Override
    public void run() {
      while (!stopped) {
        ControllerEvent event = queue.take();  // 阻塞等待
        try {
          long startMs = time.milliseconds();
          event.process(controller);           // 串行处理
          eventProcessingStats.record(event.name(), time.milliseconds() - startMs);
        } catch (Exception e) {
          log.error("Error processing event {}", event.name(), e);
        }
      }
    }
  }
}

所有可能修改 Controller 状态的操作——Broker 注册、Topic 创建、ISR 变更、Partition 重分配——都必须封装成 ControllerEvent 并入队。这保证了 Controller 状态的一致性,代价是操作必须串行,因此每个事件的处理时间需要尽量短。

QuorumController — KRaft Controller 核心

// QuorumController.java(metadata/,简化)
public class QuorumController implements Controller {

  // 集群元数据的内存状态(仅由事件线程修改)
  private final ClusterControlManager clusterControl;      // Broker 注册状态
  private final ReplicationControlManager replicationControl; // Partition/副本状态
  private final ConfigurationControlManager configurationControl;
  private final ProducerIdControlManager producerIdControl; // Producer ID 分配

  // Raft 客户端:用于将元数据变更写入 __cluster_metadata Log
  private final RaftClient<ApiMessageAndVersion> raftClient;

  // Active Controller 的 Epoch(每次重新选举后递增)
  private volatile int curClaimEpoch = -1;

  /**
   * 处理 BrokerRegistration 事件(Broker 启动时调用)
   */
  public CompletableFuture<BrokerRegistrationReply> registerBroker(
      BrokerRegistrationRequestData request
  ) {
    // 入队事件,在事件线程中处理
    return appendWriteEvent("registerBroker", () -> {
      // 验证 Broker ID 和集群 ID
      clusterControl.validateBrokerRegistration(request);
      // 更新内存状态
      BrokerRegistrationReply reply = clusterControl.registerBroker(request, ...);
      // 写入 __cluster_metadata(通过 Raft 复制到所有 Controller 节点)
      return ControllerResult.of(
          Collections.singletonList(new ApiMessageAndVersion(
              new RegisterBrokerRecord()
                  .setBrokerId(request.brokerId())
                  .setBrokerEpoch(reply.epoch()), ...
          )),
          reply
      );
    });
  }
}

appendWriteEvent() 是关键方法:它将操作封装为事件,在单线程中执行,执行结果(元数据变更记录)通过 Raft 写入 __cluster_metadata Topic,再由所有 Broker 作为 Follower 消费该 Topic 来更新自身的本地元数据缓存。

Partition 状态机

KRaft 中的 Partition 状态由 ReplicationControlManager 管理。每个 Partition 可处于以下状态之一:

NonExistent
    │ (Topic 创建)
    ↓
NewPartition
    │ (Leader 选举成功,ISR 建立)
    ↓
OnlinePartition ←──────────────────┐
    │ (所有副本都离线)               │ (Leader 重选成功)
    ↓                               │
OfflinePartition ──────────────────┘
    │ (Topic 删除)
    ↓
NonExistent

状态转换触发 Leader 选举

// ReplicationControlManager.java(metadata/,简化)
public class ReplicationControlManager {

  // Partition 的运行时状态(在内存中,由事件线程维护)
  private final HashMap<TopicIdPartition, PartitionRegistration> partitions = new HashMap<>();

  /**
   * 当 Broker 离线时,触发受影响 Partition 的 Leader 重选举
   */
  ControllerResult<Void> handleBrokerFenced(int brokerId) {
    List<ApiMessageAndVersion> records = new ArrayList<>();

    // 找出所有以该 Broker 为 Leader 的 Partition
    clusterControl.getBrokerLeaderPartitions(brokerId).forEach(topicIdPartition -> {
      PartitionRegistration partition = partitions.get(topicIdPartition);

      // 从 ISR 中移除离线 Broker
      int[] newIsr = removeFromArray(partition.isr, brokerId);

      // 从 ISR 中选出新 Leader(使用配置的选举策略)
      int newLeader = electLeader(topicIdPartition, newIsr, partition.replicas);

      if (newLeader == NO_LEADER) {
        // ISR 为空,Partition 变为 Offline
        records.add(new ApiMessageAndVersion(
            new PartitionChangeRecord()
                .setTopicId(topicIdPartition.topicId())
                .setPartitionId(topicIdPartition.partitionId())
                .setLeader(NO_LEADER)
                .setIsr(newIsr), ...
        ));
      } else {
        // 选出新 Leader
        records.add(new ApiMessageAndVersion(
            new PartitionChangeRecord()
                .setLeader(newLeader)
                .setLeaderEpoch(partition.leaderEpoch + 1)
                .setIsr(newIsr), ...
        ));
      }
    });

    return ControllerResult.of(records, null);
  }

  /**
   * Leader 选举策略(简化)
   * 默认策略:从 ISR 中选偏好副本列表(replicas)中排序最靠前的
   */
  private int electLeader(TopicIdPartition tp, int[] isr, int[] replicas) {
    for (int replica : replicas) {
      if (arrayContains(isr, replica)) return replica;
    }
    // ISR 为空时,如果 unclean.leader.election.enable=true,从 AR 中选
    return NO_LEADER;
  }
}

Controller Epoch 保护

每次 Controller 重新当选,curClaimEpoch 递增。Broker 在处理来自 Controller 的请求时,会校验请求中携带的 controllerEpoch

// ReplicaManager.scala(core/,简化)
def becomeLeaderOrFollower(correlationId: Int, leaderAndIsrRequest: LeaderAndIsrRequest, ...): Unit = {
  val controllerId    = leaderAndIsrRequest.controllerId
  val controllerEpoch = leaderAndIsrRequest.controllerEpoch

  // 拒绝来自旧 Controller 的请求(防止脑裂)
  if (controllerEpoch < this.controllerEpoch) {
    staleControllerEpochCount.getAndIncrement()
    warn(s"Ignoring LeaderAndIsr request from controller $controllerId " +
         s"with epoch $controllerEpoch (current epoch is ${this.controllerEpoch})")
    return
  }
  this.controllerEpoch = controllerEpoch
  // ... 处理 Leader/Follower 切换
}

Replica 状态机

副本状态由 ReplicationControlManager 同步管理,与 Partition 状态机协作:

NewReplica
    │ (Partition 开始服务,副本开始追赶 Leader)
    ↓
OnlineReplica ←─────────────────────────────────┐
    │ (Broker 离线 / 副本落后太多被踢出 ISR)      │ (副本追上 Leader,重新加入 ISR)
    ↓                                             │
OfflineReplica ───────────────────────────────── ┘
    │ (Topic 删除,触发副本删除流程)
    ↓
ReplicaDeletionStarted
    │ (Broker 收到删除请求,删除本地日志文件)
    ↓
ReplicaDeletionSuccessful
    │
    ↓
NonExistentReplica

副本从 OfflineReplica 恢复到 OnlineReplica 需要满足:Broker 重新上线,且副本的 LEO 追上 Leader 的 HW(在 replica.lag.time.max.ms 内完成)。

Topic 创建的完整流程

kafka-topics.sh --create 为例,追踪 KRaft 模式下的完整处理链:

Admin Client
    │
    │ CreateTopicsRequest
    ↓
KafkaApis.handleCreateTopicsRequest()
    │  [权限检查、配置验证]
    ↓
QuorumController.createTopics()
    │  [入队 CreateTopicsEvent 到事件线程]
    ↓
ReplicationControlManager.createTopics()
    │  [为每个 Partition 分配副本(Replica Assignment)]
    │  [使用 RackAwareReplicaPlacement 或 StripedReplicaPlacement 策略]
    ↓
生成 TopicRecord + PartitionRecord
    │  [写入 __cluster_metadata,Raft 复制]
    ↓
所有 Broker 消费 __cluster_metadata
    │  [BrokerMetadataPublisher.onMetadataUpdate()]
    ↓
ReplicaManager.applyDelta()
    │  [创建本地 UnifiedLog 目录和文件]
    │  [触发 Leader 选举:Leader 副本开始接受 Produce]
    ↓
Controller 向 Admin Client 返回结果

副本分配策略

ReplicationControlManager 在创建 Topic 时调用 ReplicaPlacementPolicy

// StripedReplicaPlacement.java(简化)
public class StripedReplicaPlacement implements ReplicaPlacementPolicy {

  @Override
  public List<List<Integer>> assignReplicas(
      int numPartitions,
      short replicationFactor,
      List<Integer> usableBrokers
  ) {
    List<List<Integer>> result = new ArrayList<>();
    int startIndex = random.nextInt(usableBrokers.size());  // 随机起点,避免热点

    for (int i = 0; i < numPartitions; i++) {
      List<Integer> replicas = new ArrayList<>();
      // Leader 副本:轮询 Broker 列表,均匀分布
      int leaderIndex = (startIndex + i) % usableBrokers.size();
      replicas.add(usableBrokers.get(leaderIndex));

      // Follower 副本:在 Leader 之后顺序排列(考虑 Rack 感知)
      for (int j = 1; j < replicationFactor; j++) {
        int followerIndex = (leaderIndex + j) % usableBrokers.size();
        replicas.add(usableBrokers.get(followerIndex));
      }
      result.add(replicas);
    }
    return result;
  }
}

这个"条带化"分配确保 Leader 副本均匀分布在所有 Broker 上,避免单个 Broker 承担过多 Leader 流量。

Partition 重分配的源码追踪

运行 kafka-reassign-partitions.sh --execute 会向 Controller 发送 AlterPartitionReassignments 请求:

// ReplicationControlManager.java(简化)
ControllerResult<AlterPartitionReassignmentsResponseData> alterPartitionReassignments(
    AlterPartitionReassignmentsRequestData request
) {
  List<ApiMessageAndVersion> records = new ArrayList<>();

  request.topics().forEach(topic -> {
    topic.partitions().forEach(partition -> {
      // 验证目标副本列表
      validateReassignment(topic.name(), partition.partitionIndex(), partition.replicas());

      // 记录重分配目标(不立即切换,而是渐进式)
      records.add(new ApiMessageAndVersion(
          new PartitionChangeRecord()
              .setTopicId(topicId)
              .setPartitionId(partition.partitionIndex())
              // targetReplicas:期望的最终副本列表
              // addingReplicas:需要新增的副本(等待追赶)
              // removingReplicas:完成后要移除的副本
              .setAddingReplicas(computeAddingReplicas(currentReplicas, partition.replicas()))
              .setRemovingReplicas(computeRemovingReplicas(currentReplicas, partition.replicas())),
          ...
      ));
    });
  });

  return ControllerResult.of(records, buildResponse());
}

重分配是渐进式的:新增副本先作为 Learner 加入,追赶 Leader 的 LEO。追赶完成后,Controller 将其加入 ISR,再将旧副本从 ISR 移除,最后在合适时机切换 Leader(如果 Leader 是要被移除的副本)。这个过程可以通过 kafka-reassign-partitions.sh --verify 实时监控。

Controller 架构的演进意义

KRaft 架构的本质是将 Kafka 从"依赖外部协调服务"变成"自我管理的分布式系统"。通过单线程事件循环+Raft Log,Controller 实现了:

  1. 可预测的延迟:每个元数据操作都有明确的处理时序,不存在 ZooKeeper 临时节点竞争带来的不确定延迟
  2. 快速故障恢复:新 Controller 只需从 __cluster_metadata 的最新 Snapshot 开始重放即可重建状态,而不需要重新遍历所有 ZooKeeper 节点
  3. 更大的集群规模:KRaft 理论上支持百万级 Partition,而 ZooKeeper 模式在 20 万 Partition 时已经面临性能挑战

Level 3 · 规范怎么定义的(资深)

本章的协议与规范细节已融入上述 Level 中。如需深入了解,请参阅 Kafka 官方协议文档和对应 KIP 提案。


Level 4 · 边界与陷阱(所有人)

以下是与"Controller 源码:选举、分配与状态机"相关的常见边界问题和生产陷阱:

陷阱一:忽略默认配置的隐含假设。 许多 Kafka 参数的默认值是为通用场景设计的,在特定业务场景下可能导致性能问题或数据风险。上线前务必逐一审查与本章主题相关的配置项,确认其是否符合业务需求。

陷阱二:缺乏端到端的验证。 理论理解和生产实践之间往往存在差距。建议在预发布环境中构建完整的测试场景,覆盖正常路径和异常路径(如节点宕机、网络分区、磁盘满),验证系统行为是否符合预期。

陷阱三:监控盲区。 本章涉及的机制如果缺乏对应的监控指标和告警规则,问题往往在造成业务影响后才被发现。建议将关键指标纳入监控体系,设置合理的告警阈值。

本章评分
4.8  / 5  (9 评分)

💬 留言讨论