Fetch Request and Consumer Pull Mechanism
The Philosophy of Pull
Kafka chose consumer-driven pull over broker-driven push, and this decision has deep engineering roots.
The fundamental tension in a push model is that the broker must know how fast each consumer can process. If the broker pushes faster than a consumer can keep up, that consumer is overwhelmed and the backlog piles up on the broker. If it pushes more slowly, fast consumers sit idle waiting. Kafka's designers chose the simpler answer: let the consumer decide when and how much to fetch. Slow consumers naturally fall behind without affecting other consumers or the broker. Consumers that crash and restart can resume from any offset, a semantic that is extremely difficult to implement in a push model.
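The pull semantics can be made concrete with a toy in-memory model. This is not Kafka code, and the `PullLog` and `PullConsumer` names are hypothetical. The log is an append-only list that keeps no per-consumer state. Each consumer owns its position and decides how many records to pull, so a slow consumer simply has a smaller position, and a restarted consumer begins at whatever offset it chooses.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical append-only log: it stores data and serves reads by offset, nothing more. */
class PullLog {
    private final List<String> records = new ArrayList<>();

    synchronized long append(String value) {
        records.add(value);
        return records.size() - 1;           // offset of the appended record
    }

    /** Up to maxRecords records starting at fromOffset; no per-consumer state is kept. */
    synchronized List<String> read(long fromOffset, int maxRecords) {
        if (fromOffset >= records.size()) return Collections.emptyList();
        int end = (int) Math.min(records.size(), fromOffset + maxRecords);
        return new ArrayList<>(records.subList((int) fromOffset, end));
    }
}

/** Hypothetical consumer: it owns its position and decides when and how much to pull. */
class PullConsumer {
    private final PullLog log;
    private long position;

    PullConsumer(PullLog log, long startOffset) {
        this.log = log;
        this.position = startOffset;         // resume from any offset, e.g. a committed one
    }

    List<String> poll(int maxRecords) {
        List<String> batch = log.read(position, maxRecords);
        position += batch.size();            // advance only by what was actually received
        return batch;
    }

    long position() { return position; }
}
```

Two consumers on the same `PullLog` progress independently. A consumer built with `new PullConsumer(log, 3)` starts at offset 3 regardless of what anyone else has read.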
With this philosophy in mind, let's examine the implementation.
Consumer Side: How the Fetcher Works
KafkaConsumer's Threading Model
KafkaConsumer is not thread-safe: all methods except wakeup() must be called from a single thread. Internally, it uses ConsumerNetworkClient, which wraps the non-blocking, NIO-based NetworkClient, to send and receive network requests asynchronously. Starting with Kafka 3.7, a new AsyncKafkaConsumer implementation (used with the KIP-848 group protocol, group.protocol=consumer) moves network I/O to a background thread, but the core Fetch semantics remain unchanged.
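The single-thread rule is enforced at runtime by an ownership check, not by a lock. Most public KafkaConsumer methods begin with this check (the `acquireAndEnsureOpen()` call at the top of poll()), and it throws `ConcurrentModificationException` when a second thread enters. The following is a minimal sketch of the idea, assuming a compare-and-set on the owning thread's ID plus a re-entrancy counter. `ThreadConfinedGuard` is a hypothetical name, not the real class.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical single-owner guard in the spirit of KafkaConsumer's acquire()/release(). */
class ThreadConfinedGuard {
    private static final long NO_OWNER = -1L;
    private final AtomicLong ownerThreadId = new AtomicLong(NO_OWNER);
    private final AtomicInteger refCount = new AtomicInteger(0); // allows re-entrant calls

    void acquire() {
        long me = Thread.currentThread().getId();
        // Claim ownership if nobody holds it; otherwise the holder must be this thread
        if (me != ownerThreadId.get() && !ownerThreadId.compareAndSet(NO_OWNER, me))
            throw new ConcurrentModificationException("not safe for multi-threaded access");
        refCount.incrementAndGet();
    }

    void release() {
        // Give up ownership when the outermost call returns
        if (refCount.decrementAndGet() == 0)
            ownerThreadId.set(NO_OWNER);
    }
}
```

The check costs one atomic read on the fast path. It catches accidental sharing instead of silently serializing callers, which is the behavior the "not thread-safe" contract implies.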
The Fetcher's Core Loop
```java
// Fetcher.java (clients/src/main/java/org/apache/kafka/clients/consumer/internals/)
// Simplified: core logic retained
public class Fetcher<K, V> implements Closeable {
    // Completed fetch results: filled by response callbacks, drained by the poll() thread
    private final ConcurrentLinkedQueue<CompletedFetch> completedFetches = new ConcurrentLinkedQueue<>();

    /**
     * Send FetchRequests to all leaders of partitions assigned to this consumer.
     * Called internally by poll().
     */
    public synchronized int sendFetches() {
        // Find all "fetchable" partitions: assigned, has a known leader, no in-flight request
        Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();
        for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
            final Node fetchTarget = entry.getKey();
            final FetchSessionHandler.FetchRequestData data = entry.getValue();
            // Build FetchRequest (includes FetchSession ID for incremental fetch, KIP-227)
            FetchRequest.Builder request = FetchRequest.Builder
                    .forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
                    .isolationLevel(isolationLevel)
                    .setMaxBytes(this.maxBytes)
                    .metadata(data.metadata())   // session ID + epoch
                    .removed(data.toForget())    // partitions unsubscribed since last fetch
                    .replaced(data.toReplace()); // partitions whose fetch offset changed
            // Send asynchronously, register callback
            client.send(fetchTarget, request)
                    .addListener(new RequestFutureListener<ClientResponse>() {
                        @Override
                        public void onSuccess(ClientResponse resp) {
                            handleFetchResponse(fetchTarget, data, resp); // enqueue to completedFetches
                        }

                        @Override
                        public void onFailure(RuntimeException e) {
                            handleFetchFailure(fetchTarget, data, e);
                        }
                    });
        }
        return fetchRequestMap.size();
    }

    /**
     * Drain completedFetches and deserialize records.
     * Called by poll(); returns at most maxRecords (max.poll.records).
     */
    public List<ConsumerRecord<K, V>> fetchRecords(int maxRecords) {
        CompletedFetch completedFetch = completedFetches.peek();
        if (completedFetch == null) return Collections.emptyList();
        // Parse records from the CompletedFetch (deserialize key + value)
        List<ConsumerRecord<K, V>> records = completedFetch.fetchRecords(maxRecords);
        // Remove the fetch from the queue only once all of its records have been returned
        if (completedFetch.isConsumed()) completedFetches.poll();
        return records;
    }
}
```
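The `fetchRecords(maxRecords)` contract lets one large FetchResponse feed several `poll()` calls. Each call takes at most `maxRecords` from the head of the queue and leaves the rest for the next call. Here is a simplified, self-contained sketch of that hand-off. It assumes string records and uses hypothetical `ToyCompletedFetch` and `ToyFetchBuffer` classes. It omits the real Fetcher's per-partition position checks and deserialization.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical stand-in for CompletedFetch: one partition's batch plus a read cursor. */
class ToyCompletedFetch {
    private final List<String> records;
    private int next = 0;

    ToyCompletedFetch(List<String> records) { this.records = records; }

    List<String> take(int max) {
        int end = Math.min(records.size(), next + max);
        List<String> out = new ArrayList<>(records.subList(next, end));
        next = end;
        return out;
    }

    boolean isConsumed() { return next >= records.size(); }
}

/** Response callbacks enqueue; the poll() thread drains at most maxRecords per call. */
class ToyFetchBuffer {
    private final ConcurrentLinkedQueue<ToyCompletedFetch> completed = new ConcurrentLinkedQueue<>();

    void onFetchResponse(List<String> records) { completed.add(new ToyCompletedFetch(records)); }

    List<String> fetchRecords(int maxRecords) {
        List<String> out = new ArrayList<>();
        while (out.size() < maxRecords) {
            ToyCompletedFetch head = completed.peek();
            if (head == null) break;
            out.addAll(head.take(maxRecords - out.size()));
            if (head.isConsumed()) completed.poll();   // drop only once fully drained
        }
        return out;
    }
}
```

A partially drained fetch stays at the head of the queue, so records come out in arrival order across calls.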
The Internal Execution Order of poll()
```java
// KafkaConsumer.java (simplified)
public ConsumerRecords<K, V> poll(Duration timeout) {
    acquireAndEnsureOpen();
    try {
        Timer timer = time.timer(timeout);
        do {
            // 1. Run coordinator duties: auto-commit, heartbeat, rebalance handling
            coordinator.poll(timer);
            // 2. If no completed fetches are buffered, send FetchRequests, then block in
            //    the NIO selector until a response arrives or the timer expires
            if (!fetcher.hasAvailableFetches()) {
                fetcher.sendFetches();
                client.poll(timer);
            }
            // 3. Drain completedFetches, deserialize, return to user
            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
            if (!records.isEmpty()) {
                // 4. We got records: pipeline the NEXT fetch request before returning
                if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
                    client.pollNoWakeup(); // push the next FetchRequest onto the wire right away
                return new ConsumerRecords<>(records);
            }
        } while (timer.notExpired());
        return ConsumerRecords.empty();
    } finally {
        release();
    }
}
```
Step 4 is the pipelining trick: before returning the current batch to user code, poll() has already put the next FetchRequest on the wire. While user code processes the current batch, the next batch is already in transit. As long as processing takes at least one fetch round trip, network latency is effectively hidden from the application.
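A back-of-the-envelope model shows why this matters. For illustration only, assume a fixed fetch round-trip time `rtt` and a fixed per-batch processing time `proc`. Without pipelining, every batch pays both costs in sequence. With one fetch always in flight, only the first batch pays a full `rtt`. After that, each batch arrives `max(rtt, proc)` after the previous one. `PipelineModel` and its methods are hypothetical names.

```java
/** Hypothetical timing model for a consumer poll loop; all times in milliseconds. */
final class PipelineModel {
    private PipelineModel() {}

    /** Each poll() sends a fetch, waits rtt, then the application processes for proc. */
    static long sequentialMs(int batches, long rtt, long proc) {
        return batches * (rtt + proc);
    }

    /**
     * poll() fires the next fetch before returning a batch, so batch i+1 is ready
     * max(rtt, proc) after batch i; only the first batch waits a full rtt.
     */
    static long pipelinedMs(int batches, long rtt, long proc) {
        if (batches == 0) return 0;
        return rtt + (batches - 1) * Math.max(rtt, proc) + proc;
    }
}
```

With `rtt = 5` ms and `proc = 20` ms over 100 batches, the model gives 2,500 ms sequential versus 2,005 ms pipelined. Except for the first batch, fetch latency disappears behind processing.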
Broker Side: KafkaApis.handleFetchRequest()
```scala
// KafkaApis.scala (simplified)
def handleFetchRequest(request: RequestChannel.Request): Unit = {
  val versionId = request.header.apiVersion
  val fetchRequest = request.body[FetchRequest]
  // ① Distinguish source: Consumer (replicaId=-1) vs Follower replica (replicaId >= 0)
  val isFromFollower = fetchRequest.isFromFollower
  // ② Authorization: followers need CLUSTER_ACTION on the cluster resource,
  //    consumers need READ on every topic they fetch
  val authorizedRequestInfo = mutable.Map[TopicIdPartition, FetchRequest.PartitionData]()
  val followerAuthorized = isFromFollower && authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)
  fetchRequest.fetchData(topicNames).forEach { (tp, partitionData) =>
    val allowed =
      if (isFromFollower) followerAuthorized
      else authorize(request.context, READ, TOPIC, tp.topic)
    if (allowed) authorizedRequestInfo(tp) = partitionData
    else erroneous(tp, Errors.TOPIC_AUTHORIZATION_FAILED)
  }
  // ③ Delegate to ReplicaManager to read data
  replicaManager.fetchMessages(
    params = new FetchParams(
      requestVersion = versionId,
      replicaId = fetchRequest.replicaId,
      maxWaitMs = fetchRequest.maxWait,
      minBytes = fetchRequest.minBytes,
      maxBytes = fetchRequest.maxBytes,
      isolation = FetchIsolation(fetchRequest), // LEO, HW, or LSO
      clientMetadata = clientMetadata
    ),
    fetchInfos = authorizedRequestInfo.toSeq,
    quota = quota,
    responseCallback = processResponseCallback
  )
}
```
replicaManager.fetchMessages()
```scala
// ReplicaManager.scala (simplified)
def fetchMessages(
  params: FetchParams,
  fetchInfos: Seq[(TopicIdPartition, PartitionData)],
  quota: ReplicaQuota,
  responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
): Unit = {
  // ① Read from local logs
  val logReadResults = readFromLocalLog(
    replicaId = params.replicaId,
    fetchOnlyFromLeader = params.fetchOnlyFromLeader,
    fetchIsolation = params.isolation,
    fetchMaxBytes = params.maxBytes,
    hardMaxBytesLimit = false,
    readPartitionInfo = fetchInfos,
    quota = quota
  )
  // ② Check if enough data was read
  val bytesReadable = logReadResults.map(_._2.info.records.sizeInBytes).sum
  val errorReadingData = logReadResults.exists(_._2.error != Errors.NONE)
  if (params.maxWaitMs <= 0 || bytesReadable >= params.minBytes || errorReadingData) {
    // Sufficient data (or no waiting required): respond immediately
    val fetchPartitionData = logReadResults.map { case (tp, r) => tp -> r.toFetchPartitionData(false) }
    responseCallback(fetchPartitionData)
  } else {
    // Not enough data: create DelayedFetch and park in Purgatory
    val delayedFetch = new DelayedFetch(params.maxWaitMs, params, fetchInfos, this, quota, responseCallback)
    val fetchPartitionKeys = fetchInfos.map { case (tp, _) => new TopicPartitionOperationKey(tp.topicPartition) }
    delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, fetchPartitionKeys)
  }
}
```
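The choice between responding now and parking the request reduces to a small predicate over the initial read. Below is a minimal Java sketch of that check. It assumes per-partition byte counts and error flags as inputs, and `FetchDecision`, `PartitionRead` and `shouldRespondNow` are hypothetical names. With the consumer defaults (`fetch.min.bytes=1`, `fetch.max.wait.ms=500`), any non-empty read is answered immediately. Only an empty read waits.

```java
import java.util.List;

/** Hypothetical mirror of the respond-now-or-park check in fetchMessages(). */
final class FetchDecision {
    private FetchDecision() {}

    /** Per-partition outcome of the initial local read. */
    static final class PartitionRead {
        final int bytesRead;
        final boolean error;

        PartitionRead(int bytesRead, boolean error) {
            this.bytesRead = bytesRead;
            this.error = error;
        }
    }

    /**
     * Respond immediately if the client does not want to wait, if enough bytes were
     * read across all partitions, or if any partition hit an error; otherwise the
     * request would be parked as a DelayedFetch.
     */
    static boolean shouldRespondNow(long maxWaitMs, int minBytes, List<PartitionRead> reads) {
        long bytesReadable = 0;
        boolean errorReadingData = false;
        for (PartitionRead r : reads) {
            bytesReadable += r.bytesRead;
            errorReadingData |= r.error;
        }
        return maxWaitMs <= 0 || bytesReadable >= minBytes || errorReadingData;
    }
}
```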
readFromLocalLog(): The Actual Read Path
```scala
// ReplicaManager.scala (simplified internal helper)
private def readFromLocalLog(
  replicaId: Int,
  fetchOnlyFromLeader: Boolean,
  fetchIsolation: FetchIsolation,
  fetchMaxBytes: Int,
  hardMaxBytesLimit: Boolean,
  readPartitionInfo: Seq[(TopicIdPartition, PartitionData)],
  quota: ReplicaQuota
): Seq[(TopicIdPartition, LogReadResult)] = {
  readPartitionInfo.map { case (tp, fetchInfo) =>
    val adjustedMaxBytes = math.min(fetchInfo.maxBytes, fetchMaxBytes)
    try {
      val partition = getPartitionOrException(tp.topicPartition)
      val fetchDataInfo = partition.fetchMessages(
        fetchOffset = fetchInfo.fetchOffset,
        currentLeaderEpoch = fetchInfo.currentLeaderEpoch,
        maxBytes = adjustedMaxBytes,
        fetchOnlyFromLeader = fetchOnlyFromLeader,
        fetchIsolation = fetchIsolation,
        // Only read_committed consumers need the list of aborted transactions
        includeAbortedTxns = fetchIsolation == FetchTxnCommitted
      )
      tp -> LogReadResult(fetchDataInfo, ...)
    } catch {
      case e: KafkaStorageException =>
        tp -> LogReadResult(..., error = Errors.KAFKA_STORAGE_ERROR)
      case e: NotLeaderOrFollowerException =>
        tp -> LogReadResult(..., error = Errors.NOT_LEADER_OR_FOLLOWER)
    }
  }
}
```
partition.fetchMessages() ultimately calls UnifiedLog.read():
```scala
// UnifiedLog.scala (simplified)
def read(
  startOffset: Long,
  maxLength: Int,
  isolation: FetchIsolation,
  minOneMessage: Boolean
): FetchDataInfo = {
  // ① Upper bound for this reader: LEO, HW, or LSO depending on isolation
  val maxOffsetMetadata = isolation match {
    case FetchLogEnd        => logEndOffsetMetadata
    case FetchHighWatermark => fetchHighWatermarkMetadata
    case FetchTxnCommitted  => fetchLastStableOffsetMetadata
  }
  // ② Locate the LogSegment containing startOffset
  // segments is a ConcurrentNavigableMap[Long, LogSegment] (key = base offset)
  val segment = segments.floorEntry(startOffset).getValue
  // Bytes past maxPosition are not yet visible at this isolation level
  val maxPosition =
    if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) maxOffsetMetadata.relativePositionInSegment
    else segment.size
  // ③ segment.read() narrows to a physical file position via the sparse offset index
  //    (binary search for the nearest entry <= startOffset, then a short forward scan)
  //    and returns a FileRecords slice: a lazy reference, no data read yet!
  //    Actual bytes are transferred during network send via zero-copy
  val fetchInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
  FetchDataInfo(LogOffsetMetadata(startOffset), fetchInfo.records)
}
```
The critical observation: FetchDataInfo contains a FileRecords object, which is a lazy reference to a FileChannel slice. No bytes have been copied into memory at this point. The data transfer happens later, in the network send path.
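Both lookups in `read()` are ordinary searches over sorted keys. The sketch below uses `java.util.TreeMap.floorEntry` for the segment map. The real code uses a `ConcurrentSkipListMap`, which has the same `floorEntry` semantics. The offset index is modeled as a binary search over sparse (offset, file position) pairs. `SparseIndex`, `SegmentLookup` and the in-memory arrays are assumptions for illustration, and offsets are treated as absolute for simplicity. Kafka's real offset index is a memory-mapped file of 8-byte entries holding relative offsets.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory sparse index: one entry every few KB of log, sorted by offset. */
class SparseIndex {
    private final long[] offsets;    // offsets of indexed batches, ascending
    private final int[] positions;   // byte position of each indexed batch in the segment file

    SparseIndex(long[] offsets, int[] positions) {
        this.offsets = offsets;
        this.positions = positions;
    }

    /** Position of the largest indexed offset <= target (0 if target precedes every entry). */
    int lookup(long target) {
        int lo = 0, hi = offsets.length - 1, best = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (offsets[mid] <= target) { best = mid; lo = mid + 1; }
            else hi = mid - 1;
        }
        return best < 0 ? 0 : positions[best];
    }
}

final class SegmentLookup {
    private SegmentLookup() {}

    /** Base offset of the segment that would contain startOffset, or null if none. */
    static Long segmentFor(TreeMap<Long, String> segmentsByBaseOffset, long startOffset) {
        Map.Entry<Long, String> e = segmentsByBaseOffset.floorEntry(startOffset);
        return e == null ? null : e.getKey();
    }
}
```

The index only narrows the search. From the returned position the broker still scans forward batch by batch until it reaches the requested offset, which keeps the index small at the cost of a short scan.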
Zero-Copy: The Most Important Performance Optimization
Kafka's read path leverages zero-copy via FileChannel.transferTo(), which maps to the sendfile() system call on Linux.
Traditional Read Path (4 copies)
```
Disk
 │  copy 1 (DMA)
 ▼
Kernel buffer (page cache)
 │  copy 2 (CPU, kernel → user)
 ▼
JVM heap (user space)
 │  copy 3 (CPU, user → kernel)
 ▼
Socket buffer (kernel)
 │  copy 4 (DMA)
 ▼
NIC
```
Copies 2 and 3 are the expensive ones. The CPU performs them, the data crosses the kernel/user boundary twice, and it must pass through the JVM heap.
Zero-Copy Path (2 copies)
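```
Disk
 │  copy 1 (DMA)
 ▼
OS page cache
 │  copy 2 (DMA, via sendfile syscall; entirely in kernel space)
 ▼
NIC
```
The two-copy figure assumes a NIC with scatter-gather DMA support. Without it, sendfile() adds one CPU copy from the page cache into the socket buffer, but that copy still never leaves kernel space.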
Source Code Evidence
```java
// FileRecords.java (clients/src/main/java/org/apache/kafka/common/record/, simplified)
public class FileRecords extends AbstractRecords implements Closeable {
    private final FileChannel channel;  // the segment's .log file
    private final int start;            // start of this slice within the file
    // ... other fields and methods omitted

    /**
     * Write file data to a GatheringByteChannel (i.e., a network SocketChannel).
     * This is the zero-copy core call.
     */
    public long writeTo(GatheringByteChannel destChannel, long offset, int length) throws IOException {
        long position = start + offset;
        long count = Math.min(length, sizeInBytes() - offset);
        // On Linux, FileChannel.transferTo() maps to sendfile():
        // data flows from the page cache to the socket buffer and never touches the JVM heap
        return channel.transferTo(position, count, destChannel);
    }
}
```
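Conditions for zero-copy to pay off:
- The data is already in the OS page cache. sendfile() still works on a cache miss, but the network thread performing the send then blocks on disk I/O, which can hurt latency for other clients served by that thread.
- The destination is a plain socket (not SSL/TLS encrypted).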
When SSL is enabled, Kafka must copy data through the JVM heap to encrypt it, which breaks zero-copy. This is why SSL adds measurable CPU overhead and latency, especially at high throughput.
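Any Java program can use the same JDK call. This minimal, self-contained sketch copies a slice of one file into another channel with `FileChannel.transferTo()`. The destination here is a second file, not a socket, so it shows the API shape rather than network performance. `ZeroCopyDemo`, `sendRange` and `copySlice` are hypothetical names. `transferTo` may move fewer bytes than requested, so the loop keeps calling it until the range is sent or nothing more can be transferred.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class ZeroCopyDemo {
    private ZeroCopyDemo() {}

    /** Send up to `count` bytes of `src`, starting at `position`, to `dest`. */
    static long sendRange(FileChannel src, long position, long count, WritableByteChannel dest)
            throws IOException {
        long sent = 0;
        while (sent < count) {
            long n = src.transferTo(position + sent, count - sent, dest); // may be partial
            // Nothing transferred: source exhausted, or a non-blocking destination is full
            // (a real network sender would wait for writability and retry)
            if (n <= 0) break;
            sent += n;
        }
        return sent;
    }

    /** Write `content` to a temp "segment", transfer a slice to another file, return the copy. */
    static String copySlice(String content, long position, long count) throws IOException {
        Path in = Files.createTempFile("segment", ".log");
        Path out = Files.createTempFile("copy", ".bin");
        try {
            Files.write(in, content.getBytes(StandardCharsets.UTF_8));
            try (FileChannel src = FileChannel.open(in, StandardOpenOption.READ);
                 FileChannel dst = FileChannel.open(out, StandardOpenOption.WRITE,
                                                    StandardOpenOption.TRUNCATE_EXISTING)) {
                sendRange(src, position, count, dst);
            }
            return new String(Files.readAllBytes(out), StandardCharsets.UTF_8);
        } finally {
            Files.deleteIfExists(in);
            Files.deleteIfExists(out);
        }
    }
}
```

`copySlice("hello zero-copy world", 6, 9)` copies bytes 6 through 14 of the source and returns `"zero-copy"`. When the destination is a socket channel on Linux, the JDK can serve the same call with sendfile().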
FetchSession: Incremental Fetch (KIP-227)
The Problem
Before KIP-227, every FetchRequest had to carry the full state for all requested partitions (fetch offset, log start offset, and per-partition maximum bytes). Every FetchResponse likewise repeated an entry for every partition, even partitions with no new data. For a consumer subscribed to 1,000 partitions sending hundreds of fetch requests per second, this was enormous redundant overhead.
How FetchSession Works
The broker maintains a FetchSession per fetching client (consumer or follower replica), caching the last known state of each partition:
```java
// FetchSessionHandler.java (client side, simplified; compute* helpers are illustrative)
class FetchSessionHandler {
    private int sessionId = FetchMetadata.INVALID_SESSION_ID;  // 0 = no session yet
    private int sessionEpoch = FetchMetadata.INITIAL_EPOCH;

    /**
     * Build the next FetchRequest:
     *  - First request: Full Fetch (all partitions)
     *  - Subsequent requests: Incremental Fetch (only changed partitions)
     */
    public FetchRequestData build(Map<TopicPartition, PartitionData> allPartitions) {
        if (sessionId == FetchMetadata.INVALID_SESSION_ID) {
            // Full fetch: send the complete partition list
            return new FetchRequestData(allPartitions, Collections.emptyList(), FetchMetadata.INITIAL);
        }
        // Incremental fetch: only changed partitions
        Map<TopicPartition, PartitionData> toSend = new LinkedHashMap<>();
        toSend.putAll(computeAdded(allPartitions));    // newly subscribed
        toSend.putAll(computeAltered(allPartitions));  // fetch offset advanced
        List<TopicPartition> toForget = computeRemoved(allPartitions); // unsubscribed
        return new FetchRequestData(toSend, toForget, new FetchMetadata(sessionId, nextEpoch()));
    }
}
```
Broker-side session management:
```scala
// FetchSession.scala (core/, simplified)
class FetchSession(val id: Int, val privileged: Boolean, ...) {
  // Cached state of every partition in the session, kept across fetches
  val partitionMap: util.LinkedHashMap[TopicIdPartition, FetchSession.ENTRY_TYPE] = ...

  def update(
    toReplace: util.ArrayList[..],
    toAdd: util.ArrayList[..],
    toForget: util.ArrayList[..]
  ): FetchSession = {
    // Incremental update: only process what changed
    toForget.forEach(tp => partitionMap.remove(tp))
    toReplace.forEach(entry => partitionMap.put(entry.topicIdPartition, entry))
    toAdd.forEach(entry => partitionMap.put(entry.topicIdPartition, entry))
    this
  }
}
```
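Impact: Consider a stable consumer (no rebalances, no partition reassignments) where most partitions are idle between fetches. Its incremental request carries only the partitions whose fetch offset moved, and the response omits partitions with no new data, instead of repeating all 1,000 every time. The savings grow with the number of idle partitions. When every partition receives data on every fetch, there is little to omit.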
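The client/broker contract can be checked with a small model. The client diffs the state it wants now against the state it last sent, the broker applies that diff to its cached copy, and afterwards both sides should agree. This minimal sketch represents partitions as strings and fetch offsets as longs. `SessionDelta`, `diff()` and `apply()` are hypothetical, and the model ignores session epochs, topic IDs and error handling.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical incremental-fetch delta: partitions to (re)send plus partitions to forget. */
final class SessionDelta {
    final Map<String, Long> toSend = new LinkedHashMap<>(); // added or altered partitions
    final List<String> toForget = new ArrayList<>();        // no longer wanted

    /** Client side: compare the desired state with what the session already caches. */
    static SessionDelta diff(Map<String, Long> lastSent, Map<String, Long> wanted) {
        SessionDelta d = new SessionDelta();
        wanted.forEach((tp, offset) -> {
            if (!Objects.equals(lastSent.get(tp), offset)) d.toSend.put(tp, offset);
        });
        for (String tp : lastSent.keySet())
            if (!wanted.containsKey(tp)) d.toForget.add(tp);
        return d;
    }

    /** Broker side: update the cached session state in place. */
    void apply(Map<String, Long> sessionCache) {
        toForget.forEach(sessionCache::remove);
        sessionCache.putAll(toSend);
    }
}
```

Take 1,000 cached partitions, where one partition advances and one is revoked. The delta carries one partition to send and one to forget. The other 998 remain in the broker's session cache and are not repeated on the wire.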
Leader vs. Follower Reads: The Role of fetchIsolation
FetchIsolation determines the maximum readable offset:
| Isolation Level | Maximum Readable Offset | Use Case |
|---|---|---|
| LOG_END | LEO (Log End Offset) | Follower replica replication |
| HIGH_WATERMARK | HW (High Watermark) | Regular consumer (read_uncommitted) |
| TXN_COMMITTED | Last Stable Offset (LSO) | Transactional consumer (read_committed) |
Follower replicas include their own replicaId (their broker ID) in the FetchRequest. The broker recognizes this and uses LOG_END isolation, which lets followers read data beyond the HW that is not yet committed. Regular consumers send replicaId=-1. For them the broker uses HIGH_WATERMARK isolation (or TXN_COMMITTED for read_committed consumers), so they only read data that every ISR member has replicated.
This single distinction in FetchIsolation is what keeps followers synchronized while keeping consumer semantics clean.
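The selection rule and its effect fit in a few lines. This sketch assumes the three offsets for a partition are already known, and the enum, `isolationFor()` and `maxReadableOffset()` are hypothetical names. The `replicaId` convention (a follower's broker ID is >= 0, consumers send -1) follows the text above. All three bounds are exclusive: a reader may fetch offsets strictly below them.

```java
/** Hypothetical mirror of FetchIsolation, named after the table above. */
enum Isolation { LOG_END, HIGH_WATERMARK, TXN_COMMITTED }

final class IsolationRules {
    private IsolationRules() {}

    /** Followers (replicaId >= 0) read to LEO; consumers by their isolation.level. */
    static Isolation isolationFor(int replicaId, boolean readCommitted) {
        if (replicaId >= 0) return Isolation.LOG_END;
        return readCommitted ? Isolation.TXN_COMMITTED : Isolation.HIGH_WATERMARK;
    }

    /** Exclusive upper bound on readable offsets; normally lso <= hw <= leo. */
    static long maxReadableOffset(Isolation iso, long leo, long hw, long lso) {
        switch (iso) {
            case LOG_END:        return leo;
            case HIGH_WATERMARK: return hw;
            default:             return lso;
        }
    }
}
```

With LEO=120, HW=100 and LSO=90, a follower can fetch up to offset 119, a read_uncommitted consumer up to 99, and a read_committed consumer up to 89.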
DelayedFetch Completion Conditions
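A DelayedFetch completes in one of two ways:
- Timer expiry: when maxWaitMs elapses, the operation is force-completed and returns whatever data is available (possibly nothing).
- New data written: after every successful UnifiedLog.appendAsLeader(), ReplicaManager calls checkAndComplete() on all DelayedFetch operations watching that partition, which runs tryComplete(). Consumer fetches are also re-checked when the high watermark advances, since that is what makes new data visible to them.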
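```scala
// DelayedFetch.scala (simplified)
class DelayedFetch(...) extends DelayedOperation(fetchMetadata.fetchParams.maxWaitMs) {
  override def tryComplete(): Boolean = {
    var accumulatedSize = 0
    fetchMetadata.fetchPartitionStatus.foreach { case (topicIdPartition, fetchStatus) =>
      val partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition)
      val snapshot = partition.fetchOffsetSnapshot(fetchStatus.fetchInfo.currentLeaderEpoch, fetchOnlyFromLeader = true)
      // LEO for followers, HW for read_uncommitted consumers, LSO for read_committed consumers
      val endOffset = fetchStatus.fetchIsolation match {
        case FetchLogEnd        => snapshot.logEndOffset
        case FetchHighWatermark => snapshot.highWatermark
        case FetchTxnCommitted  => snapshot.lastStableOffset
      }
      val fetchOffset = fetchStatus.startOffsetMetadata
      if (endOffset.messageOffset != fetchOffset.messageOffset) {
        // Truncation or a segment roll: complete now and let onComplete() re-read
        if (endOffset.onOlderSegment(fetchOffset) || fetchOffset.onOlderSegment(endOffset))
          return forceComplete()
        // Offsets are not bytes: measure the byte distance within the segment,
        // capped at this partition's maxBytes
        if (fetchOffset.messageOffset < endOffset.messageOffset)
          accumulatedSize += math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
      }
    }
    // Complete if accumulated readable bytes >= minBytes
    if (accumulatedSize >= fetchMetadata.fetchParams.minBytes) forceComplete()
    else false
  }

  override def onComplete(): Unit = {
    // Re-read logs (more data may have arrived since we first checked)
    val logReadResults = replicaManager.readFromLocalLog(...)
    responseCallback(logReadResults.map { case (tp, r) => tp -> r.toFetchPartitionData(isFetchedFromFollower) })
  }
}
```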
Note that onComplete() performs another log read rather than using the originally cached results. This ensures the response contains the freshest possible data at the moment of completion.
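The watch-and-recheck pattern reduces to a toy purgatory. Operations register under a partition key, every append on that key re-runs their completion check, and an expiry sweep force-completes anything past its deadline. This is a deliberately simplified sketch: single-threaded, with an explicit clock and hypothetical `ToyPurgatory`/`ToyDelayedFetch` names. It counts appended bytes directly instead of re-deriving them from log positions as the real tryComplete() does. Kafka's real `DelayedOperationPurgatory` uses a hierarchical timing wheel and locking, both omitted here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical delayed fetch: waits for minBytes on one partition or until a deadline. */
final class ToyDelayedFetch {
    final String partition;
    final int minBytes;
    final long deadlineMs;
    int bytesAvailable = 0;
    String result = null;              // set exactly once, on completion

    ToyDelayedFetch(String partition, int minBytes, long deadlineMs) {
        this.partition = partition;
        this.minBytes = minBytes;
        this.deadlineMs = deadlineMs;
    }

    boolean tryComplete() {
        if (bytesAvailable < minBytes) return false;
        result = "satisfied:" + bytesAvailable;
        return true;
    }

    void forceCompleteOnExpiry() { result = "expired:" + bytesAvailable; }
}

final class ToyPurgatory {
    private final Map<String, List<ToyDelayedFetch>> watchers = new HashMap<>();

    /** Like tryCompleteElseWatch: complete now if possible, otherwise park under its key. */
    void tryCompleteElseWatch(ToyDelayedFetch op) {
        if (!op.tryComplete())
            watchers.computeIfAbsent(op.partition, k -> new ArrayList<>()).add(op);
    }

    /** Called after an append: re-check every operation watching this partition. */
    void onAppend(String partition, int bytes) {
        List<ToyDelayedFetch> ops = watchers.getOrDefault(partition, List.of());
        for (Iterator<ToyDelayedFetch> it = ops.iterator(); it.hasNext(); ) {
            ToyDelayedFetch op = it.next();
            op.bytesAvailable += bytes;
            if (op.tryComplete()) it.remove();
        }
    }

    /** Expiry sweep: force-complete whatever has reached its deadline. */
    void expire(long nowMs) {
        for (List<ToyDelayedFetch> ops : watchers.values())
            ops.removeIf(op -> {
                if (op.deadlineMs > nowMs) return false;
                op.forceCompleteOnExpiry();
                return true;
            });
    }
}
```

No thread waits in this design. A parked fetch is just an object in a map until an append or the clock completes it, which is the property that lets a small handler pool serve many long polls.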
Complete Consumer Fetch Path Summary
```
KafkaConsumer.poll()
  │
  ├── Fetcher.sendFetches()
  │     → Build FetchRequest (Full or Incremental via KIP-227)
  │     → ConsumerNetworkClient.send() (async NIO)
  │
  ▼  (network transit)
  │
KafkaApis.handleFetchRequest()          ← Broker side
  │  [auth check, identify consumer vs follower]
  ▼
ReplicaManager.fetchMessages()
  │  [readFromLocalLog, check minBytes]
  ▼
UnifiedLog.read()
  │  [index lookup → FileRecords (lazy reference)]
  ▼
[data sufficient]   → responseCallback() → FetchResponse
[data insufficient] → DelayedFetch → wait for new writes or timeout
  │
  ▼  (response returns to client)
  │
Fetcher.handleFetchResponse()
  │  → Enqueue into completedFetches
  ▼
KafkaConsumer.pollForFetches()
     → Drain completedFetches
     → Deserialize key + value
     → Return ConsumerRecords to user code
```
The Fetch path shares the same core design philosophy as the Produce path: async + callback + Purgatory. Handler threads never block waiting for data. They register a DelayedFetch and return immediately. The response fires when new data becomes visible (an append for follower fetches, a high-watermark advance for consumer fetches) or when the timeout expires. This non-blocking design lets a small handler thread pool manage thousands of concurrent long-poll fetch requests simultaneously.