第 19 章

Fetch 请求与 Consumer 拉取机制

第19章:Fetch 请求与 Consumer 拉取机制

导读:Fetch 请求在 Broker 端如何处理?

本章核心问题:Fetch 请求在 Broker 端如何处理?

读完本章你将理解


Level 1 · 你需要知道的(1-3年经验)

拉模型的哲学

Kafka 选择消费者主动拉取(Pull),而不是 Broker 主动推送(Push),这个决策背后有深刻的工程考量。

推模型的核心矛盾在于:Broker 必须知道每个消费者当前能消费多快。慢消费者会导致 Broker 侧队列积压,快消费者又会因为等待而浪费资源。Kafka 的设计者选择了更简单的答案:让消费者自己决定何时、拉多少。慢消费者自然地落后,而不会对其他消费者或 Broker 造成影响。消费者崩溃重启后,也可以从任意 Offset 继续,这种语义在推模型中极难实现。

理解了这个哲学,接下来看具体的实现。

Broker 侧:KafkaApis.handleFetchRequest()

// KafkaApis.scala(简化)
def handleFetchRequest(request: RequestChannel.Request): Unit = {
  val versionId = request.header.apiVersion
  val fetchRequest = request.body[FetchRequest]

  // ① 区分来源:Consumer(replicaId=-1)还是 Follower 副本(replicaId>=0)
  val isFromFollower = fetchRequest.isFromFollower
  val isFromConsumer = fetchRequest.isFromConsumer

  // ② 授权检查(仅对 Consumer;Follower 间复制无需额外授权)
  val authorizedRequestInfo = mutable.Map[TopicIdPartition, FetchRequest.PartitionData]()
  fetchRequest.fetchData(topicNames).forEach { (tp, partitionData) =>
    if (isFromConsumer && !authorize(request.context, READ, TOPIC, tp.topic))
      erroneous(tp, Errors.TOPIC_AUTHORIZATION_FAILED)
    else
      authorizedRequestInfo(tp) = partitionData
  }

  // ③ 从 ReplicaManager 读取数据
  replicaManager.fetchMessages(
    params = new FetchParams(
      requestVersion  = versionId,
      replicaId       = fetchRequest.replicaId,
      maxWaitMs       = fetchRequest.maxWait,
      minBytes        = fetchRequest.minBytes,
      maxBytes        = fetchRequest.maxBytes,
      isolation       = FetchIsolation(fetchRequest),
      clientMetadata  = clientMetadata
    ),
    fetchInfos   = authorizedRequestInfo,
    quota        = quota,
    responseCallback = processResponseCallback  // 回调,数据就绪后发送响应
  )
}

replicaManager.fetchMessages()

// ReplicaManager.scala(简化)
def fetchMessages(
  params: FetchParams,
  fetchInfos: Seq[(TopicIdPartition, PartitionData)],
  quota: ReplicaQuota,
  responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
): Unit = {

  // ① 从本地 Log 读取数据
  val logReadResults = readFromLocalLog(
    replicaId       = params.replicaId,
    fetchOnlyFromLeader = params.fetchOnlyFromLeader,
    fetchIsolation  = params.isolation,
    fetchMaxBytes   = params.maxBytes,
    hardMaxBytesLimit = false,
    readPartitionInfo = fetchInfos,
    quota           = quota
  )

  // ② 判断是否满足 min.bytes 条件
  val bytesReadable = logReadResults.map(_._2.info.records.sizeInBytes).sum
  val errorReadingData = logReadResults.exists(_._2.error != Errors.NONE)

  if (params.maxWaitMs <= 0 || bytesReadable >= params.minBytes || errorReadingData) {
    // 数据充足或不需要等待,直接响应
    val fetchPartitionData = logReadResults.map { case (tp, result) => tp -> result.toFetchPartitionData(false) }
    responseCallback(fetchPartitionData)
  } else {
    // 数据不足,创建 DelayedFetch,等待更多数据到来
    val delayedFetch = new DelayedFetch(
      params.maxWaitMs, params, fetchInfos, this, quota, responseCallback
    )
    val fetchPartitionKeys = fetchInfos.map { case (tp, _) => new TopicPartitionOperationKey(tp) }
    delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, fetchPartitionKeys)
  }
}

readFromLocalLog() — 实际读取过程

// ReplicaManager.scala(简化)
private def readFromLocalLog(
  replicaId: Int,
  fetchOnlyFromLeader: Boolean,
  fetchIsolation: FetchIsolation,
  fetchMaxBytes: Int,
  hardMaxBytesLimit: Boolean,
  readPartitionInfo: Seq[(TopicIdPartition, PartitionData)],
  quota: ReplicaQuota
): Seq[(TopicIdPartition, LogReadResult)] = {

  readPartitionInfo.map { case (tp, fetchInfo) =>
    val adjustedMaxBytes = math.min(fetchInfo.maxBytes, fetchMaxBytes)
    try {
      val partition = getPartitionOrException(tp.topicPartition)
      // 读取到 FetchDataInfo
      val fetchDataInfo = partition.fetchMessages(
        fetchOffset    = fetchInfo.fetchOffset,
        currentLeaderEpoch = fetchInfo.currentLeaderEpoch,
        maxBytes       = adjustedMaxBytes,
        fetchOnlyFromLeader = fetchOnlyFromLeader,
        fetchIsolation = fetchIsolation,
        includeAbortedTxns = params.isFromConsumer
      )
      tp -> LogReadResult(fetchDataInfo, ...)
    } catch {
      case e: KafkaStorageException => tp -> LogReadResult(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, error = Errors.KAFKA_STORAGE_ERROR, ...)
      case e: NotLeaderOrFollowerException => tp -> LogReadResult(..., error = Errors.NOT_LEADER_OR_FOLLOWER, ...)
    }
  }
}

partition.fetchMessages() 最终调用 UnifiedLog.read()

// UnifiedLog.scala(简化)
def read(
  startOffset: Long,
  maxLength: Int,
  isolation: FetchIsolation,
  minOneMessage: Boolean
): FetchDataInfo = {
  // ① 找到包含 startOffset 的 LogSegment(通过稀疏索引二分查找)
  val segmentEntry = segments.floorEntry(startOffset)
  val segment = segmentEntry.getValue

  // ② 在 Segment 内部精确定位到 startOffset 的物理位置
  val baseOffset = segmentEntry.getKey
  val offsetIndex = segment.offsetIndex
  val position = offsetIndex.lookup(startOffset)  // 返回最近的索引项

  // ③ 构造 FetchDataInfo,包含 FileRecords(尚未读出数据到内存!)
  // FileRecords 是对 FileChannel 的惰性引用,真正的数据传输在网络发送时发生
  val fetchInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
  FetchDataInfo(LogOffsetMetadata(startOffset), fetchInfo.records)
}

Zero-Copy:最重要的性能优化

Kafka 读取数据的核心优化是 Zero-Copy(零拷贝),通过 FileChannel.transferTo() 实现。

传统读取路径(4 次拷贝)

磁盘 → 内核缓冲区(DMA)→ 用户空间(JVM 堆)→ 内核 Socket 缓冲区 → 网卡(DMA)
           第1次拷贝         第2次拷贝             第3次拷贝          第4次拷贝

Zero-Copy 路径(2 次拷贝)

磁盘 → 内核 Page Cache(DMA)→ 网卡(DMA,通过 sendfile 系统调用)
           第1次拷贝                第2次拷贝(内核态完成,无用户态参与)

源码中的体现

// FileRecords.scala(简化)
class FileRecords(val file: File, private val channel: FileChannel, ...) extends Records {

  /**
   * 将文件数据写入 GatheringByteChannel(即网络 SocketChannel)
   * 这是 Zero-Copy 的核心调用
   */
  def writeTo(destChannel: GatheringByteChannel, offset: Long, length: Int): Long = {
    val tl = math.min(length.toLong, size - offset)
    // FileChannel.transferTo() 在 Linux 上映射到 sendfile() 系统调用
    // 数据从 Page Cache 直接传输到 Socket 缓冲区,不经过 JVM 堆
    channel.transferTo(start + offset, tl, destChannel)
  }
}

Zero-Copy 生效的前提是:数据已在操作系统 Page Cache 中(无需从磁盘读取)且目标是网络 Socket(非加密连接)。这也是为什么 Kafka 对 SSL/TLS 的处理需要额外的内存拷贝——加密破坏了 Zero-Copy 路径。

Leader vs Follower 读:fetchIsolation 的作用

FetchIsolation 枚举决定了能读到多远的数据:

隔离级别 能读到的最大 Offset 适用场景
LOG_END LEO(Log End Offset) Follower 副本复制
HIGH_WATERMARK HW(High Watermark) 普通 Consumer(isolation.level=read_uncommitted
TXN_COMMITTED Last Stable Offset (LSO) 事务 Consumer(isolation.level=read_committed

Follower 副本在 FetchRequest 中携带自己的 replicaId(Broker ID),Broker 识别后使用 LOG_END 级别,允许 Follower 读取到 Leader 的最新写入(包括未提交到 HW 的数据)。普通 Consumer 的 replicaId=-1,只能读到 HW,确保读到的数据是"已被所有 ISR 确认"的。

完整消费路径总结

KafkaConsumer.poll()
  │
  ├── Fetcher.sendFetches()
  │     → 构造 FetchRequest(Full 或 Incremental)
  │     → ConsumerNetworkClient.send()(异步 NIO)
  │
  ↓(网络传输)
  │
KafkaApis.handleFetchRequest()           ← Broker 侧
  │  [授权检查、区分 Consumer/Follower]
  ↓
ReplicaManager.fetchMessages()
  │  [readFromLocalLog,检查 minBytes]
  ↓
UnifiedLog.read()
  │  [索引定位 → FileRecords(惰性引用)]
  ↓
[数据充足] → responseCallback() → 发送 FetchResponse
[数据不足] → DelayedFetch → 等待新数据 or 超时
  │
  ↓(响应返回客户端)
  │
Fetcher.handleFetchResponse()
  → 放入 completedFetches 队列
  │
KafkaConsumer.pollForFetches()
  → 从 completedFetches 取数据
  → 反序列化 key/value
  → 返回 ConsumerRecords 给用户

Fetch 路径的核心设计哲学与 Produce 路径相同:异步 + 回调 + Purgatory。KafkaRequestHandler 线程不会阻塞等待数据,而是通过 DelayedFetch 机制让出线程资源,在条件满足时再触发响应。


Level 2 · 它是怎么运行的(3-5年经验)

Consumer 侧:Fetcher 线程的工作机制

KafkaConsumer 的线程模型

KafkaConsumer 不是线程安全的,所有方法必须从同一个线程调用。但在内部,它通过 ConsumerNetworkClient(包装了非阻塞 NIO 客户端)来异步发送和接收网络请求。从 Kafka 3.5 开始,KafkaConsumer 的内部结构迁移到了新的 AsyncKafkaConsumer 架构,但核心 Fetch 流程的语义没有变化。

Fetcher 的核心流程

// Fetcher.java(clients/src/main/java/org/apache/kafka/clients/consumer/internals/)
// 简化版,保留核心逻辑

public class Fetcher<K, V> implements Closeable {

  // 已完成的 Fetch 结果队列(Processor 线程填充,poll() 线程消费)
  private final ConcurrentLinkedQueue<CompletedFetch> completedFetches = new ConcurrentLinkedQueue<>();

  /**
   * 发送 FetchRequests 到所有分配给本 Consumer 的 Partition Leader
   * 由 poll() 方法内部调用
   */
  public synchronized int sendFetches() {
    // 找出所有"可拉取"的 Partition(已分配、有 Leader、没有 in-flight 请求)
    Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();

    for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
      final Node fetchTarget = entry.getKey();
      final FetchSessionHandler.FetchRequestData data = entry.getValue();

      // 构造 FetchRequest(包含 FetchSession ID,支持增量 Fetch,见 KIP-227)
      FetchRequest.Builder request = FetchRequest.Builder
          .forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
          .isolationLevel(isolationLevel)
          .setMaxBytes(this.maxBytes)
          .metadata(data.metadata())
          .removed(data.toForget())
          .replaced(data.toReplace());

      // 异步发送,注册回调
      client.send(fetchTarget, request)
          .addListener(new RequestFutureListener<ClientResponse>() {
            @Override
            public void onSuccess(ClientResponse resp) {
              // 解析响应,放入 completedFetches
              handleFetchResponse(fetchTarget, data, resp);
            }
            @Override
            public void onFailure(RuntimeException e) {
              handleFetchFailure(fetchTarget, data, e);
            }
          });
    }
    return fetchRequestMap.size();
  }

  /**
   * poll() 方法调用此方法,从 completedFetches 取出数据并反序列化
   */
  public List<ConsumerRecord<K, V>> fetchRecords(TopicPartition partition, int maxRecords) {
    CompletedFetch completedFetch = completedFetches.peek();
    if (completedFetch == null) return Collections.emptyList();

    // 从 CompletedFetch 中解析记录(反序列化 key/value)
    return completedFetch.fetchRecords(maxRecords);
  }
}

poll() 的内部执行顺序

// KafkaConsumer.java(简化)
public ConsumerRecords<K, V> poll(Duration timeout) {
  acquireAndEnsureOpen();
  try {
    // 1. 提交 offset(如果 enable.auto.commit=true 且到了提交时间)
    coordinator.poll(timer);

    // 2. 如果 completedFetches 为空,发送新的 FetchRequest 并等待响应
    if (!fetcher.hasAvailableFetches()) {
      client.poll(timer);  // 非阻塞轮询网络,最多等待 timeout
    }

    // 3. 从 completedFetches 排空,反序列化,返回给用户
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);

    // 4. 如果已有数据,发送预拉取请求(pipeline 效果)
    if (!records.isEmpty()) {
      if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
        client.pollNoWakeup();  // 尽快把下一批 FetchRequest 发出去
    }

    return new ConsumerRecords<>(records);
  } finally {
    release();
  }
}

注意步骤 4 中的预拉取:在把当前批次数据返回给用户代码之前,poll() 已经把下一批 FetchRequest 发出去了。这实现了 Fetch 请求的流水线,用户处理当前批次数据的同时,下一批数据已经在传输中。

FetchSession:增量 Fetch(KIP-227)

问题背景

在 KIP-227 之前,每次 FetchRequest 都需要携带所有分区的完整信息(Offset、Leader Epoch 等),即使这些分区上次 Fetch 之后没有任何变化。对于一个消费者订阅了 1000 个分区的场景,这意味着每秒数百次 FetchRequest 都在传输大量冗余元数据。

FetchSession 的工作原理

Broker 为每个 Consumer 维护一个 FetchSession,记录上次 Fetch 的状态:

// FetchSessionHandler.java(clients 侧,简化)
class FetchSessionHandler {
  private int sessionId = INVALID_SESSION_ID;  // 初始为 0,表示无 Session
  private long sessionEpoch = INITIAL_EPOCH;   // Session 版本号

  /**
   * 构造下一次 FetchRequest:
   * - 首次请求:携带所有分区(Full Fetch)
   * - 后续请求:只携带发生变化的分区(Incremental Fetch)
   */
  public FetchRequestData newBuilder() {
    if (sessionId == INVALID_SESSION_ID) {
      // Full fetch:携带所有分区
      return new FetchRequestData(toSend = all_partitions, sessionId = 0, epoch = INITIAL_EPOCH);
    } else {
      // Incremental fetch:只携带新增、删除或 offset 变化的分区
      Set<TopicPartition> added   = computeAdded();    // 新订阅的分区
      Set<TopicPartition> removed = computeRemoved();  // 取消订阅的分区
      Set<TopicPartition> altered = computeAltered();  // offset 推进的分区
      return new FetchRequestData(
          toSend   = altered ∪ added,
          toForget = removed,
          sessionId = sessionId,
          epoch    = nextEpoch()
      );
    }
  }
}

Broker 侧维护 FetchSessionCache

// FetchSession.scala(core/,简化)
class FetchSession(val id: Int, val privileged: Boolean, ...) {
  // 缓存上次 Fetch 请求中所有分区的状态(offset、leaderEpoch 等)
  val partitionMap: util.LinkedHashMap[TopicIdPartition, FetchSession.ENTRY_TYPE] = ...

  def update(toReplace: util.ArrayList[..], toAdd: util.ArrayList[..], toForget: util.ArrayList[..]): FetchSession = {
    // 增量更新分区状态,只处理发生变化的分区
    toForget.forEach(partitionMap.remove)
    toReplace.forEach { entry => partitionMap.put(entry.topicIdPartition, entry) }
    toAdd.forEach    { entry => partitionMap.put(entry.topicIdPartition, entry) }
    this
  }
}

效果:对于稳定的消费者,每次 FetchRequest 中变化的分区数量通常远小于总分区数。KIP-227 在大量分区场景下可以将 Fetch 请求的网络开销减少 90%+。


Level 3 · 规范怎么定义的(资深)

本章的协议与规范细节已融入上述 Level 中。如需深入了解,请参阅 Kafka 官方协议文档和对应 KIP 提案。


Level 4 · 边界与陷阱(所有人)

DelayedFetch 的完成条件

DelayedFetchtryComplete() 在两种情况下被触发:

  1. 定时器到期maxWaitMs 超时,不管有多少数据,都返回当前读到的内容(可能为空)。
  2. 新数据写入:每次 UnifiedLog.appendAsLeader() 成功后,ReplicaManager 会对该 Partition 的所有等待中的 DelayedFetch 调用 checkAndComplete()
// DelayedFetch.scala(简化)
class DelayedFetch(...) extends DelayedOperation(fetchMetadata.fetchParams.maxWaitMs) {

  override def tryComplete(): Boolean = {
    var accumulatedSize = 0
    fetchMetadata.fetchPartitionStatus.foreach { case (topicIdPartition, fetchStatus) =>
      val partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition)
      val endOffset = partition.highWatermark  // 或 LEO,取决于 isolation
      accumulatedSize += math.max(0, endOffset - fetchStatus.startOffsetMetadata.messageOffset).toInt * avgRecordSize
    }
    // 如果累计可读数据量 >= minBytes,立即完成
    if (accumulatedSize >= fetchMetadata.fetchParams.minBytes) forceComplete()
    else false
  }
}
本章评分
4.6  / 5  (11 评分)

💬 留言讨论