Chapter 19

Fetch Request and Consumer Pull Mechanism

The Philosophy of Pull

Kafka chose consumer-driven pull over broker-driven push, and this decision has deep engineering roots.

The fundamental tension in a push model is that the broker must know how fast each consumer can process. Slow consumers cause backlog on the broker; fast consumers waste resources waiting. Kafka's designers chose the simpler answer: let the consumer decide when and how much to fetch. Slow consumers naturally fall behind without affecting other consumers or the broker. Consumers that crash and restart can resume from any offset, a semantic that is extremely difficult to implement in a push model.
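The pull idea can be sketched with a toy in-memory log. `PullLog` and its methods are hypothetical names made up for this illustration and are not Kafka classes; the point is only that the log answers "up to N records starting at offset X" and keeps no per-reader state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory log, for illustration only; not part of Kafka.
final class PullLog {
    private final List<String> records = new ArrayList<>();

    void append(String record) {
        records.add(record);
    }

    // The reader decides where to start and how much to take.
    List<String> fetch(int offset, int maxRecords) {
        if (offset < 0 || offset >= records.size() || maxRecords <= 0) {
            return List.of();
        }
        // Use long arithmetic so a huge maxRecords cannot overflow.
        int end = (int) Math.min((long) records.size(), (long) offset + maxRecords);
        return new ArrayList<>(records.subList(offset, end));
    }
}
```

A slow reader simply calls `fetch` less often, and a restarted reader resumes by passing whatever offset it saved; neither case requires the log to track the reader.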

With this philosophy in mind, let's examine the implementation.

Consumer Side: How the Fetcher Thread Works

KafkaConsumer's Threading Model

KafkaConsumer is not thread-safe: apart from wakeup(), all methods must be called from a single thread. Internally, it uses ConsumerNetworkClient (which wraps a non-blocking NIO client) to send and receive network requests asynchronously. Since Kafka 3.5, the internal structure has migrated toward the new AsyncKafkaConsumer architecture, but the core Fetch semantics remain unchanged.
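The single-thread rule is typically enforced with a lightweight ownership check rather than a lock. The sketch below is a hypothetical guard (class and field names are assumptions, not Kafka's code) that rejects a second thread with ConcurrentModificationException, which is the exception KafkaConsumer documents for concurrent access.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical guard illustrating single-threaded ownership; names are assumptions.
final class SingleThreadGuard {
    private static final long NO_OWNER = -1L;
    private final AtomicLong ownerThreadId = new AtomicLong(NO_OWNER);
    private int depth = 0; // re-entrant calls; only touched by the owning thread

    void acquire() {
        long me = Thread.currentThread().getId();
        // Either we already own the guard, or we must win the CAS from NO_OWNER.
        if (ownerThreadId.get() != me && !ownerThreadId.compareAndSet(NO_OWNER, me)) {
            throw new ConcurrentModificationException("not safe for multi-threaded access");
        }
        depth++;
    }

    void release() {
        if (--depth == 0) {
            ownerThreadId.set(NO_OWNER);
        }
    }
}
```

Failing fast here is a design choice: a misuse shows up as an immediate exception instead of silent corruption of fetch positions.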

The Fetcher's Core Loop

// Fetcher.java (clients/src/main/java/org/apache/kafka/clients/consumer/internals/)
// Simplified; core logic retained

public class Fetcher<K, V> implements Closeable {

  // Completed fetch results (filled by network thread, consumed by poll() thread)
  private final ConcurrentLinkedQueue<CompletedFetch> completedFetches = new ConcurrentLinkedQueue<>();

  /**
   * Send FetchRequests to all leaders of partitions assigned to this consumer.
   * Called internally by poll().
   */
  public synchronized int sendFetches() {
    // Find all "fetchable" partitions: assigned, has a known leader, no in-flight request
    Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();

    for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
      final Node fetchTarget = entry.getKey();
      final FetchSessionHandler.FetchRequestData data = entry.getValue();

      // Build FetchRequest (includes FetchSession ID for incremental fetch, KIP-227)
      FetchRequest.Builder request = FetchRequest.Builder
          .forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
          .isolationLevel(isolationLevel)
          .setMaxBytes(this.maxBytes)
          .metadata(data.metadata())     // session ID + epoch
          .removed(data.toForget())      // partitions unsubscribed since last fetch
          .replaced(data.toReplace());   // partitions to replace in the session because their topic ID changed

      // Send asynchronously, register callback
      client.send(fetchTarget, request)
          .addListener(new RequestFutureListener<ClientResponse>() {
            @Override
            public void onSuccess(ClientResponse resp) {
              handleFetchResponse(fetchTarget, data, resp);  // enqueue to completedFetches
            }
            @Override
            public void onFailure(RuntimeException e) {
              handleFetchFailure(fetchTarget, data, e);
            }
          });
    }
    return fetchRequestMap.size();
  }

  /**
   * Drain completedFetches and deserialize records.
   * Called by poll().
   */
  public List<ConsumerRecord<K, V>> fetchRecords(TopicPartition partition, int maxRecords) {
    CompletedFetch completedFetch = completedFetches.peek();
    if (completedFetch == null) return Collections.emptyList();

    // Parse records from the CompletedFetch (deserialize key + value)
    return completedFetch.fetchRecords(maxRecords);
  }
}

The Internal Execution Order of poll()

// KafkaConsumer.java (simplified)
public ConsumerRecords<K, V> poll(Duration timeout) {
  acquireAndEnsureOpen();
  try {
    // 1. Run coordinator duties: auto-commit, heartbeat, rebalance handling
    coordinator.poll(timer);

    // 2. If no completed fetches, send FetchRequests and wait for responses
    if (!fetcher.hasAvailableFetches()) {
      client.poll(timer);  // drives the NIO client; returns when a response arrives or the timer expires
    }

    // 3. Drain completedFetches, deserialize, return to user
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);

    // 4. If we got records, pipeline the NEXT fetch request immediately
    if (!records.isEmpty()) {
      if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
        client.pollNoWakeup();  // push the next FetchRequest onto the wire right away
    }

    return new ConsumerRecords<>(records);
  } finally {
    release();
  }
}

Step 4 is the pipelining trick: before returning the current batch to user code, poll() has already fired the next FetchRequest. While user code is processing the current batch, the next batch is already in transit. This effectively hides network latency from the application's perspective.
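The same overlap can be shown outside Kafka with a small prefetching loop. This is a sketch under assumptions: `PipelinedReader` and the `fetchBatch` function are hypothetical, with `fetchBatch` standing in for a network round trip, and upper-casing standing in for user processing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;

// Hypothetical prefetching loop; fetchBatch stands in for a network round trip.
final class PipelinedReader {
    static List<String> readAll(IntFunction<List<String>> fetchBatch, int batches) {
        List<String> processed = new ArrayList<>();
        if (batches <= 0) {
            return processed;
        }
        CompletableFuture<List<String>> inFlight =
                CompletableFuture.supplyAsync(() -> fetchBatch.apply(0));
        for (int i = 0; i < batches; i++) {
            List<String> current = inFlight.join();   // wait for batch i
            if (i + 1 < batches) {
                final int next = i + 1;
                // Fire the next fetch before processing the current batch.
                inFlight = CompletableFuture.supplyAsync(() -> fetchBatch.apply(next));
            }
            for (String record : current) {           // "user code" runs while batch i+1 is in flight
                processed.add(record.toUpperCase());
            }
        }
        return processed;
    }
}
```

With this shape, the time spent processing batch i and the time spent fetching batch i+1 overlap, so the slower of the two bounds throughput rather than their sum.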

Broker Side: KafkaApis.handleFetchRequest()

// KafkaApis.scala (simplified)
def handleFetchRequest(request: RequestChannel.Request): Unit = {
  val versionId    = request.header.apiVersion
  val fetchRequest = request.body[FetchRequest]

  // ① Distinguish source: Consumer (replicaId=-1) vs Follower replica (replicaId >= 0)
  val isFromFollower = fetchRequest.isFromFollower
  val isFromConsumer = fetchRequest.isFromConsumer

  // ② Authorization check (shown for consumers only; follower fetches are authorized separately via CLUSTER_ACTION on the cluster resource)
  val authorizedRequestInfo = mutable.Map[TopicIdPartition, FetchRequest.PartitionData]()
  fetchRequest.fetchData(topicNames).forEach { (tp, partitionData) =>
    if (isFromConsumer && !authorize(request.context, READ, TOPIC, tp.topic))
      erroneous(tp, Errors.TOPIC_AUTHORIZATION_FAILED)
    else
      authorizedRequestInfo(tp) = partitionData
  }

  // ③ Delegate to ReplicaManager to read data
  replicaManager.fetchMessages(
    params = new FetchParams(
      requestVersion  = versionId,
      replicaId       = fetchRequest.replicaId,
      maxWaitMs       = fetchRequest.maxWait,
      minBytes        = fetchRequest.minBytes,
      maxBytes        = fetchRequest.maxBytes,
      isolation       = FetchIsolation(fetchRequest),   // HW or LEO or LSO
      clientMetadata  = clientMetadata
    ),
    fetchInfos       = authorizedRequestInfo,
    quota            = quota,
    responseCallback = processResponseCallback
  )
}

replicaManager.fetchMessages()

// ReplicaManager.scala (simplified)
def fetchMessages(
  params: FetchParams,
  fetchInfos: Seq[(TopicIdPartition, PartitionData)],
  quota: ReplicaQuota,
  responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
): Unit = {

  // ① Read from local logs
  val logReadResults = readFromLocalLog(
    replicaId           = params.replicaId,
    fetchOnlyFromLeader = params.fetchOnlyFromLeader,
    fetchIsolation      = params.isolation,
    fetchMaxBytes       = params.maxBytes,
    hardMaxBytesLimit   = false,
    readPartitionInfo   = fetchInfos,
    quota               = quota
  )

  // ② Check if enough data was read
  val bytesReadable  = logReadResults.map(_._2.info.records.sizeInBytes).sum
  val errorReadingData = logReadResults.exists(_._2.error != Errors.NONE)

  if (params.maxWaitMs <= 0 || bytesReadable >= params.minBytes || errorReadingData) {
    // Sufficient data (or no waiting required): respond immediately
    val fetchPartitionData = logReadResults.map { case (tp, r) => tp -> r.toFetchPartitionData(false) }
    responseCallback(fetchPartitionData)
  } else {
    // Not enough data: create DelayedFetch and park in Purgatory
    val delayedFetch = new DelayedFetch(params.maxWaitMs, params, fetchInfos, this, quota, responseCallback)
    val fetchPartitionKeys = fetchInfos.map { case (tp, _) => new TopicPartitionOperationKey(tp) }
    delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, fetchPartitionKeys)
  }
}
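The respond-now-or-park branch in the simplified fetchMessages() above reduces to a single predicate. The helper below is a hypothetical restatement of that condition (the class and method names are assumptions), useful for reasoning about how the consumer settings fetch.min.bytes and fetch.max.wait.ms interact.

```java
// Hypothetical helper mirroring the three-way check in the simplified fetchMessages() above.
final class FetchDecision {
    static boolean respondImmediately(long maxWaitMs, long bytesReadable,
                                      int minBytes, boolean errorReadingData) {
        // Respond at once if the client does not want to wait, if enough data
        // is already readable, or if any partition read failed.
        return maxWaitMs <= 0 || bytesReadable >= minBytes || errorReadingData;
    }
}
```

For example, with minBytes = 1 any readable byte triggers an immediate response, while a larger minBytes trades latency for fewer, fuller responses, bounded by maxWaitMs.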

readFromLocalLog(): The Actual Read Path

// ReplicaManager.scala (simplified internal helper)
private def readFromLocalLog(
  replicaId: Int,
  fetchOnlyFromLeader: Boolean,
  fetchIsolation: FetchIsolation,
  fetchMaxBytes: Int,
  hardMaxBytesLimit: Boolean,
  readPartitionInfo: Seq[(TopicIdPartition, PartitionData)],
  quota: ReplicaQuota
): Seq[(TopicIdPartition, LogReadResult)] = {

  readPartitionInfo.map { case (tp, fetchInfo) =>
    val adjustedMaxBytes = math.min(fetchInfo.maxBytes, fetchMaxBytes)
    try {
      val partition = getPartitionOrException(tp.topicPartition)
      val fetchDataInfo = partition.fetchMessages(
        fetchOffset            = fetchInfo.fetchOffset,
        currentLeaderEpoch     = fetchInfo.currentLeaderEpoch,
        maxBytes               = adjustedMaxBytes,
        fetchOnlyFromLeader    = fetchOnlyFromLeader,
        fetchIsolation         = fetchIsolation,
        includeAbortedTxns     = params.isFromConsumer
      )
      tp -> LogReadResult(fetchDataInfo, ...)
    } catch {
      case e: KafkaStorageException =>
        tp -> LogReadResult(..., error = Errors.KAFKA_STORAGE_ERROR)
      case e: NotLeaderOrFollowerException =>
        tp -> LogReadResult(..., error = Errors.NOT_LEADER_OR_FOLLOWER)
    }
  }
}

partition.fetchMessages() ultimately calls UnifiedLog.read():

// UnifiedLog.scala (simplified)
def read(
  startOffset: Long,
  maxLength: Int,
  isolation: FetchIsolation,
  minOneMessage: Boolean
): FetchDataInfo = {
  // ① Locate the LogSegment containing startOffset
  // segments is a ConcurrentNavigableMap[Long, LogSegment] (key = base offset)
  val segmentEntry = segments.floorEntry(startOffset)
  val segment      = segmentEntry.getValue

  // ② Narrow down to the physical file position via sparse offset index (binary search)
  val offsetIndex = segment.offsetIndex
  val position    = offsetIndex.lookup(startOffset)   // returns nearest index entry

  // ③ Build FetchDataInfo with a FileRecords reference: lazy, no data read yet!
  // Actual bytes are transferred during network send via zero-copy
  val fetchInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
  FetchDataInfo(LogOffsetMetadata(startOffset), fetchInfo.records)
}

The critical observation: FetchDataInfo contains a FileRecords object, which is a lazy reference to a FileChannel slice. No bytes have been copied into memory at this point. The data transfer happens later, in the network send path.
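The two lookup steps in UnifiedLog.read() can be sketched with standard collections. Everything here is a simplified stand-in: `SegmentLookup`, the string segment names, and the parallel-array index are assumptions for illustration, not Kafka's data structures.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical, simplified stand-ins for the segment map and a sparse offset index.
final class SegmentLookup {
    // base offset -> segment name
    private final ConcurrentSkipListMap<Long, String> segments = new ConcurrentSkipListMap<>();

    void addSegment(long baseOffset, String name) {
        segments.put(baseOffset, name);
    }

    // Returns the segment with the greatest base offset <= targetOffset, or null.
    String segmentFor(long targetOffset) {
        Map.Entry<Long, String> entry = segments.floorEntry(targetOffset);
        return entry == null ? null : entry.getValue();
    }

    // Sparse index: sorted offsets[i] maps to positions[i]. Binary search for the
    // last entry with offset <= target; a reader would scan forward from there.
    static int lookupPosition(long[] offsets, int[] positions, long target) {
        int lo = 0;
        int hi = offsets.length - 1;
        int best = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (offsets[mid] <= target) {
                best = mid;
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return best < 0 ? 0 : positions[best]; // before the first entry: start of file
    }
}
```

Because the index is sparse, the lookup lands at or before the target record, and the remaining distance is covered by a short sequential scan of the segment file.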

Zero-Copy: The Most Important Performance Optimization

Kafka's read path leverages zero-copy via FileChannel.transferTo(), which maps to the sendfile() system call on Linux.

Traditional Read Path (4 copies)

Disk → Kernel buffer (DMA) → JVM heap (user space) → Socket buffer (kernel) → NIC (DMA)
     copy 1                copy 2                  copy 3                   copy 4

Copies 2 and 3 are the expensive ones: the CPU moves the data across the kernel/user boundary twice, and it must traverse the JVM heap.

Zero-Copy Path (2 copies)

Disk → OS Page Cache (DMA) → NIC (DMA, via sendfile syscall)
     copy 1                copy 2  (entirely in kernel space)

Source Code Evidence

// FileRecords.java (simplified; shown here in Scala-style pseudocode)
class FileRecords(val file: File, private val channel: FileChannel, ...) extends Records {

  /**
   * Write file data to a GatheringByteChannel (i.e., a network SocketChannel).
   * This is the zero-copy core call.
   */
  def writeTo(destChannel: GatheringByteChannel, offset: Long, length: Int): Long = {
    val tl = math.min(length.toLong, size - offset)
    // On Linux, FileChannel.transferTo() maps to sendfile()
    // Data flows from Page Cache directly to the socket buffer, never touching the JVM heap
    channel.transferTo(start + offset, tl, destChannel)
  }
}

Conditions for zero-copy to pay off:

  1. The data is already in the OS Page Cache. If it is not, sendfile() still works, but the transfer first has to wait for a disk read.
  2. The destination is a plain socket (not SSL/TLS encrypted).

When SSL is enabled, Kafka must copy data through the JVM heap to encrypt it, which breaks zero-copy. This is why SSL adds measurable CPU overhead and latency, especially at high throughput.
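The transferTo() call can be tried in isolation with plain JDK classes. The sketch below is not Kafka's code: `ZeroCopySend` is a hypothetical helper, and whether the JVM actually uses sendfile() depends on the platform and on the destination being a socket channel; with other channel types the same API falls back to ordinary copying.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper around FileChannel.transferTo(); not Kafka's FileRecords.
final class ZeroCopySend {
    // Sends up to `length` bytes of `file`, starting at `offset`, to `dest`.
    static long send(Path file, long offset, long length, WritableByteChannel dest)
            throws IOException {
        try (FileChannel src = FileChannel.open(file, StandardOpenOption.READ)) {
            long remaining = Math.min(length, Math.max(0L, src.size() - offset));
            long sent = 0;
            // transferTo may move fewer bytes than requested, so loop until done.
            while (remaining > 0) {
                long n = src.transferTo(offset + sent, remaining, dest);
                if (n <= 0) {
                    break; // destination cannot accept more right now
                }
                sent += n;
                remaining -= n;
            }
            return sent;
        }
    }
}
```

A real non-blocking server would not loop like this; it would record how much was sent and resume when the socket becomes writable again.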

FetchSession: Incremental Fetch (KIP-227)

The Problem

Before KIP-227, every FetchRequest had to carry the full state for all assigned partitions: current offset, leader epoch, and partition metadata. For a consumer subscribed to 1,000 partitions sending hundreds of fetch requests per second, this was enormous redundant overhead.

How FetchSession Works

The broker maintains a FetchSession for each fetching client (a consumer holds one session per broker it fetches from), caching the last known state of each partition:

// FetchSessionHandler.java (client side, simplified)
class FetchSessionHandler {
  private int  sessionId    = INVALID_SESSION_ID;  // 0 = no session yet
  private long sessionEpoch = INITIAL_EPOCH;

  /**
   * Build the next FetchRequest:
   * - First request: Full Fetch (all partitions)
   * - Subsequent requests: Incremental Fetch (only changed partitions)
   */
  public FetchRequestData newBuilder() {
    if (sessionId == INVALID_SESSION_ID) {
      // Full fetch: send complete partition list
      return new FetchRequestData(toSend = all_partitions, sessionId = 0, epoch = INITIAL_EPOCH);
    } else {
      // Incremental fetch: only changed partitions
      Set<TopicPartition> added   = computeAdded();    // newly subscribed
      Set<TopicPartition> removed = computeRemoved();  // unsubscribed
      Set<TopicPartition> altered = computeAltered();  // fetch offset advanced
      return new FetchRequestData(
          toSend   = altered ∪ added,    // only what changed
          toForget = removed,
          sessionId = sessionId,
          epoch    = nextEpoch()
      );
    }
  }
}

Broker-side session management:

// FetchSession.scala (core/, simplified)
class FetchSession(val id: Int, val privileged: Boolean, ...) {
  // Cache of all partition states from the last complete fetch
  val partitionMap: util.LinkedHashMap[TopicIdPartition, FetchSession.ENTRY_TYPE] = ...

  def update(
    toReplace: util.ArrayList[..],
    toAdd:     util.ArrayList[..],
    toForget:  util.ArrayList[..]
  ): FetchSession = {
    // Incremental update: only process what changed
    toForget.forEach(partitionMap.remove)
    toReplace.forEach(entry => partitionMap.put(entry.topicIdPartition, entry))
    toAdd.forEach    (entry => partitionMap.put(entry.topicIdPartition, entry))
    this
  }
}

Impact: For a stable consumer (no rebalances, no partition reassignments), the typical incremental fetch request carries state for just a handful of partitions rather than all 1,000. The saving scales with the share of partitions that did not change between fetches, so a mostly idle assignment of that size can see per-fetch network overhead drop by 90% or more.
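The incremental request is essentially a diff between the session's cached state and the state the client wants next. The sketch below makes that concrete under simplifying assumptions: `SessionDiff` is a hypothetical class, partitions are plain strings, and the only tracked state is the fetch offset.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical diff between cached session state and the next desired state.
// Keys are partition names, values are fetch offsets; both are simplifications.
final class SessionDiff {
    final Map<String, Long> toSend = new HashMap<>(); // added or altered partitions
    final Set<String> toForget = new HashSet<>();     // partitions dropped from the session

    static SessionDiff between(Map<String, Long> cached, Map<String, Long> next) {
        SessionDiff diff = new SessionDiff();
        for (Map.Entry<String, Long> e : next.entrySet()) {
            Long previous = cached.get(e.getKey());
            // New partition, or same partition with a different fetch offset.
            if (previous == null || !previous.equals(e.getValue())) {
                diff.toSend.put(e.getKey(), e.getValue());
            }
        }
        for (String partition : cached.keySet()) {
            if (!next.containsKey(partition)) {
                diff.toForget.add(partition);
            }
        }
        return diff;
    }
}
```

Partitions whose state is unchanged appear in neither collection, which is where the saving comes from.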

Leader vs. Follower Reads: The Role of fetchIsolation

FetchIsolation determines the maximum readable offset:

Isolation Level   Maximum Readable Offset    Use Case
LOG_END           LEO (Log End Offset)       Follower replica replication
HIGH_WATERMARK    HW (High Watermark)        Regular consumer (read_uncommitted)
TXN_COMMITTED     LSO (Last Stable Offset)   Transactional consumer (read_committed)

Follower replicas include their own replicaId (Broker ID) in the FetchRequest. The broker recognizes this and uses LOG_END isolation, allowing followers to read data that has not yet been committed to the HW. Regular consumers use replicaId=-1; the broker uses HIGH_WATERMARK isolation, ensuring they only read data that all ISR members have acknowledged.

This single distinction in FetchIsolation is what keeps followers synchronized while keeping consumer semantics clean.
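The mapping from requester to read bound can be written as two small functions. This is an illustrative mirror of the isolation table in this section; the `Isolation` enum, `ReadBound` class, and method names are assumptions, not the broker's types.

```java
// Hypothetical mirror of the isolation table in this section; names are illustrative.
enum Isolation { LOG_END, HIGH_WATERMARK, TXN_COMMITTED }

final class ReadBound {
    // Followers carry a non-negative replicaId; consumers use -1.
    static Isolation isolationFor(int replicaId, boolean readCommitted) {
        if (replicaId >= 0) {
            return Isolation.LOG_END;
        }
        return readCommitted ? Isolation.TXN_COMMITTED : Isolation.HIGH_WATERMARK;
    }

    static long maxReadableOffset(Isolation isolation, long logEndOffset,
                                  long highWatermark, long lastStableOffset) {
        switch (isolation) {
            case LOG_END:
                return logEndOffset;
            case HIGH_WATERMARK:
                return highWatermark;
            default:
                return lastStableOffset; // TXN_COMMITTED
        }
    }
}
```

For a partition with LEO 120, HW 100, and LSO 90, a follower may read up to 120, a read_uncommitted consumer up to 100, and a read_committed consumer up to 90.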

DelayedFetch Completion Conditions

A parked DelayedFetch completes in one of two ways:

  1. Timer expiry: When maxWaitMs elapses, the operation is force-completed and the fetch returns whatever data is available (possibly nothing).
  2. New data becomes readable: After a successful UnifiedLog.appendAsLeader(), or once the high watermark advances, ReplicaManager calls checkAndComplete() on the DelayedFetch operations watching that partition, which in turn runs tryComplete().

// DelayedFetch.scala (simplified)
class DelayedFetch(...) extends DelayedOperation(fetchMetadata.fetchParams.maxWaitMs) {

  override def tryComplete(): Boolean = {
    var accumulatedSize = 0
    fetchMetadata.fetchPartitionStatus.foreach { case (topicIdPartition, fetchStatus) =>
      val partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition)
      // Use HW for consumers, LEO for followers, depending on FetchIsolation
      val endOffset = fetchStatus.fetchIsolation match {
        case FetchHighWatermark => partition.highWatermark
        case FetchLogEnd        => partition.logEndOffset
        case FetchTxnCommitted  => partition.lastStableOffset
      }
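      // Simplified: this sums offset deltas; the real code compares byte positions (positionDiff) within a segment
      accumulatedSize += math.max(0L, endOffset - fetchStatus.startOffsetMetadata.messageOffset).toInt
    }
    // Complete once the accumulated readable amount reaches minBytes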
    if (accumulatedSize >= fetchMetadata.fetchParams.minBytes) forceComplete()
    else false
  }

  override def onComplete(): Unit = {
    // Re-read logs (more data may have arrived since we first checked)
    val logReadResults = replicaManager.readFromLocalLog(...)
    responseCallback(logReadResults.map { case (tp, r) => tp -> r.toFetchPartitionData(isFetchedFromFollower) })
  }
}

Note that onComplete() performs another log read rather than using the originally cached results. This ensures the response contains the freshest possible data at the moment of completion.
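The complete-exactly-once behaviour, with a fresh read at completion time, can be sketched without any Kafka types. `ParkedFetch` below is a hypothetical stand-in for a delayed operation: the supplier and consumer parameters are assumptions that replace the log read and the response callback.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

// Hypothetical parked fetch: completes exactly once, either when enough bytes
// are readable or when the timer fires. Not the real DelayedFetch.
final class ParkedFetch {
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final int minBytes;
    private final LongSupplier readableBytes; // re-evaluated on every check
    private final LongConsumer respond;       // receives the bytes readable at completion

    ParkedFetch(int minBytes, LongSupplier readableBytes, LongConsumer respond) {
        this.minBytes = minBytes;
        this.readableBytes = readableBytes;
        this.respond = respond;
    }

    // Called when new data may have become readable.
    boolean tryComplete() {
        return readableBytes.getAsLong() >= minBytes && forceComplete();
    }

    // Called by the timer when the maximum wait elapses.
    boolean onExpiration() {
        return forceComplete();
    }

    private boolean forceComplete() {
        if (!completed.compareAndSet(false, true)) {
            return false; // another path already responded
        }
        respond.accept(readableBytes.getAsLong()); // re-read at completion time
        return true;
    }
}
```

The compare-and-set is what lets the write path and the timer race safely: whichever arrives first sends the response, and the other becomes a no-op.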

Complete Consumer Fetch Path Summary

KafkaConsumer.poll()
  │
  ├── Fetcher.sendFetches()
  │     → Build FetchRequest (Full or Incremental via KIP-227)
  │     → ConsumerNetworkClient.send() (async NIO)
  │
  ↓  (network transit)
  │
KafkaApis.handleFetchRequest()          ← Broker side
  │  [auth check, identify consumer vs follower]
  ↓
ReplicaManager.fetchMessages()
  │  [readFromLocalLog, check minBytes]
  ↓
UnifiedLog.read()
  │  [index lookup → FileRecords (lazy reference)]
  ↓
[data sufficient]  → responseCallback() → FetchResponse
[data insufficient] → DelayedFetch → wait for new writes or timeout
  │
  ↓  (response returns to client)
  │
Fetcher.handleFetchResponse()
  → Enqueue into completedFetches
  │
KafkaConsumer.pollForFetches()
  → Drain completedFetches
  → Deserialize key + value
  → Return ConsumerRecords to user code

The Fetch path shares the same core design philosophy as the Produce path: async + callback + Purgatory. Handler threads never block waiting for data: they register a DelayedFetch and return immediately. The response fires when new data arrives (triggered by an incoming Produce) or when the timeout expires. This non-blocking design allows a small handler thread pool to manage thousands of concurrent long-poll fetch requests simultaneously.
