The Complete Path of a Produce Request
From Network Bytes to Disk: A Message's Journey
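When a producer calls producer.send(record), the record is batched on the client and eventually shipped to the broker inside a ProduceRequest. What exactly happens inside the broker when that request arrives? The answer spans the network layer, the API layer, the replication layer, and the storage layer, all of which must do their part before the broker can satisfy the acks semantics and return a response. This chapter traces that complete path through simplified excerpts of the actual source code.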
For clarity, we assume:
- Producer configured with acks=-1 (wait for acknowledgment from the full ISR)
- The topic has 3 replicas (1 Leader + 2 Followers)
- The receiving broker is the Partition Leader
Stop 1: Network Reception and Parsing
SocketServer and Processor Threads
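Kafka's network layer uses the Reactor pattern. SocketServer starts one Acceptor thread per listener; each Acceptor accepts new connections and hands them to its own pool of Processor threads (num.network.threads per listener). Each Processor drives a Kafka Selector (a wrapper around Java NIO's Selector) to read bytes from its sockets, assembles them into complete NetworkReceive frames, parses each frame into a RequestChannel.Request, and enqueues it into the RequestChannel.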
// SocketServer.scala (simplified)
class Processor(id: Int, requestChannel: RequestChannel, ...) extends AbstractServerThread {
  // Kafka's own org.apache.kafka.common.network.Selector, wrapping java.nio.channels.Selector
  private val selector = createSelector(...)

  override def run(): Unit = {
    while (isRunning) {
      configureNewConnections()  // register sockets handed over by the Acceptor
      processNewResponses()      // queue responses produced by handler threads for sending
      poll()                     // selector.poll(timeout): non-blocking reads and writes
      // Turn each complete NetworkReceive into a RequestChannel.Request
      // and enqueue it with requestChannel.sendRequest(...)
      processCompletedReceives()
      processCompletedSends()
      processDisconnected()
    }
  }
}
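A Processor has to assemble frames because a single socket read may return half a request, or several requests at once. Every Kafka request on the wire starts with a 4-byte big-endian size field followed by that many bytes. The sketch below uses a hypothetical FrameAssembler class (not Kafka's NetworkReceive, which reads directly from socket channels) to show how partial reads are buffered until a whole size-prefixed frame is available:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating size-prefixed framing; not Kafka's NetworkReceive.
// Each frame on the wire is: int32 size (big-endian), then `size` payload bytes.
final class FrameAssembler {
    private final int maxFrameSize;                            // guard, in the spirit of socket.request.max.bytes
    private final ByteBuffer sizeBuf = ByteBuffer.allocate(4); // collects the 4-byte length prefix
    private ByteBuffer payload;                                // null until the prefix is complete

    FrameAssembler(int maxFrameSize) { this.maxFrameSize = maxFrameSize; }

    // Feed whatever bytes one socket read returned; returns every frame those bytes completed.
    List<byte[]> feed(ByteBuffer chunk) {
        List<byte[]> complete = new ArrayList<>();
        while (chunk.hasRemaining()) {
            if (payload == null) {
                // Still reading the length prefix, which may itself be split across reads
                while (sizeBuf.hasRemaining() && chunk.hasRemaining()) sizeBuf.put(chunk.get());
                if (sizeBuf.hasRemaining()) break;
                sizeBuf.flip();
                int size = sizeBuf.getInt();
                sizeBuf.clear();
                if (size < 0 || size > maxFrameSize)
                    throw new IllegalStateException("Invalid frame size " + size);
                payload = ByteBuffer.allocate(size);
            }
            // Copy as much of the payload as this chunk holds
            int n = Math.min(payload.remaining(), chunk.remaining());
            ByteBuffer slice = chunk.duplicate();
            slice.limit(slice.position() + n);
            payload.put(slice);
            chunk.position(chunk.position() + n);
            if (!payload.hasRemaining()) {
                complete.add(payload.array());
                payload = null;
            }
        }
        return complete;
    }
}
```

Checking the size against a maximum before allocating the payload buffer is the important design choice: without it, a corrupt or malicious size prefix could make the server allocate an enormous buffer.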
RequestChannel — The Thread-Crossing Bridge
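RequestChannel is the decoupling queue between Processor threads and KafkaRequestHandler threads. A pool of KafkaRequestHandler threads (sized by num.io.threads) dequeues requests and calls KafkaApis.handle(). Responses travel the opposite way: a handler passes its response back through the RequestChannel to the response queue of the Processor that owns the connection, and that Processor writes it to the socket. This design separates I/O threads (Processors) from request-processing threads (Handlers), so slow request processing cannot stall network reads.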
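The decoupling can be modeled with a bounded BlockingQueue: the network side puts requests, a fixed pool of handler threads takes them. MiniRequestChannel and its methods are hypothetical; in Kafka the capacity of the real request queue is set by queued.max.requests, and when it is full the Processors block, which pushes back on clients.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model of RequestChannel: a network thread enqueues, handler threads dequeue.
final class MiniRequestChannel {
    private static final String SHUTDOWN = "__shutdown__";   // poison pill for handler threads
    private final BlockingQueue<String> requestQueue;
    final List<String> handled = Collections.synchronizedList(new ArrayList<>());

    MiniRequestChannel(int capacity) {
        // Bounded: a burst of requests blocks the network side instead of exhausting memory
        this.requestQueue = new ArrayBlockingQueue<>(capacity);
    }

    // Called by the network (Processor) side; blocks while the queue is full
    void sendRequest(String request) throws InterruptedException { requestQueue.put(request); }

    // Runs the pipeline: the calling thread plays the Processor, `handlers` threads play handlers
    void run(List<String> incoming, int handlers) {
        List<Thread> pool = new ArrayList<>();
        for (int i = 0; i < handlers; i++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        String req = requestQueue.take();   // like KafkaRequestHandler pulling work
                        if (req.equals(SHUTDOWN)) return;
                        handled.add("handled:" + req);      // stand-in for KafkaApis.handle()
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
            pool.add(t);
        }
        try {
            for (String r : incoming) sendRequest(r);
            for (int i = 0; i < handlers; i++) sendRequest(SHUTDOWN);
            for (Thread t : pool) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
```

The poison-pill shutdown is safe because the queue is FIFO: every real request is dequeued before any handler sees a pill, and each handler exits after taking exactly one.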
Stop 2: KafkaApis.handleProduceRequest()
// KafkaApis.scala (simplified, core logic retained)
def handleProduceRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
  val produceRequest = request.body[ProduceRequest]

  // ① Authorization check (per topic)
  val unauthorizedTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
  val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
  produceRequest.data.topicData.forEach { topicData =>
    topicData.partitionData.forEach { partitionData =>
      val tp = new TopicPartition(topicData.name, partitionData.index)
      if (!authorize(request.context, WRITE, TOPIC, tp.topic))
        unauthorizedTopicResponses(tp) = new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
      else
        authorizedRequestInfo(tp) = partitionData.records.asInstanceOf[MemoryRecords]
    }
  }

  // ② Response callback. Quotas are recorded here, AFTER the append: an over-quota
  //    client still gets its data written, but its channel is then muted (throttled)
  def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
    val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses
    // ClientQuotaManager: throttle time > 0 means the produce byte-rate quota was exceeded
    val throttleTimeMs =
      quotas.produce.maybeRecordAndGetThrottleTimeMs(request, request.sizeInBytes, time.milliseconds())
    if (throttleTimeMs > 0)
      throttle(quotas.produce, request, throttleTimeMs) // mute the client's channel for throttleTimeMs
    // acks=0: the producer expects no response (on error the broker closes the connection instead)
    if (produceRequest.acks != 0)
      requestChannel.sendResponse(request, new ProduceResponse(mergedResponseStatus.asJava, throttleTimeMs), None)
  }

  // ③ Delegate actual write to ReplicaManager
  replicaManager.appendRecords(
    timeout = produceRequest.timeout.toLong,
    requiredAcks = produceRequest.acks,
    internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId,
    origin = AppendOrigin.CLIENT,
    entriesPerPartition = authorizedRequestInfo,
    responseCallback = sendResponseCallback, // closure: called when write completes or times out
    recordConversionStatsCallback = updateRecordConversionStats,
    requestLocal = requestLocal
  )
}
sendResponseCallback is a closure that captures the request reference and eventually calls requestChannel.sendResponse() to write the response back to the Processor thread. This is the key to asynchronous response handling — the handler thread does not block waiting for replication; instead, it registers a callback and returns.
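Here is a stripped-down model of that pattern, with hypothetical names throughout: the handler builds a callback that captures the request's correlation ID, passes it down the write path, and returns before any response exists. Whichever thread later completes the write (in Kafka, typically a handler thread processing a follower's fetch, or the purgatory's expiration thread) invokes the callback, which enqueues the response for the network side.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical model of the asynchronous response path; not Kafka's API.
final class MiniHandler {
    // Responses waiting for the network thread (Kafka: the Processor's response queue)
    final Queue<String> responseQueue = new ConcurrentLinkedQueue<>();
    // Writes whose completion is still pending (Kafka: DelayedProduce in the purgatory)
    private final Queue<Consumer<String>> pending = new ConcurrentLinkedQueue<>();

    // Handler-thread entry point: never waits for replication
    void handleProduce(long correlationId) {
        // The closure captures the request identity, as sendResponseCallback captures `request`
        Consumer<String> sendResponseCallback =
            status -> responseQueue.add("response[" + correlationId + "]=" + status);
        appendRecords(sendResponseCallback);
        // Returns immediately; no response exists yet
    }

    private void appendRecords(Consumer<String> responseCallback) {
        pending.add(responseCallback);   // acks=-1: park until followers catch up
    }

    // Called later by some other thread, e.g. when the HW advances or the timeout fires
    void completeOldest(String status) {
        Consumer<String> cb = pending.poll();
        if (cb != null) cb.accept(status);
    }
}
```

The handler thread's cost per request is constant regardless of how long replication takes; the waiting is represented by an object in a queue rather than by a blocked thread.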
Stop 3: ReplicaManager.appendRecords()
// ReplicaManager.scala (simplified)
def appendRecords(
    timeout: Long,
    requiredAcks: Short,
    internalTopicsAllowed: Boolean,
    origin: AppendOrigin,
    entriesPerPartition: Map[TopicPartition, MemoryRecords],
    responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
    recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = ...,
    requestLocal: RequestLocal = RequestLocal.NoCaching
): Unit = {
  // ① Validate acks value (only 0, 1, -1 are legal)
  if (!isValidRequiredAcks(requiredAcks)) {
    sendErrorResponseCallback(Errors.INVALID_REQUIRED_ACKS)
    return
  }
  // ② Append to the Leader's local log (one LogAppendResult per partition)
  val localProduceResults: Map[TopicPartition, LogAppendResult] =
    appendToLocalLog(internalTopicsAllowed, origin, entriesPerPartition, requiredAcks, requestLocal)
  // Per-partition status; requiredOffset = lastOffset + 1 of the batch just appended
  val produceStatus = localProduceResults.map { case (tp, result) =>
    tp -> ProducePartitionStatus(
      result.info.lastOffset + 1,
      new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime, result.info.logStartOffset))
  }
  // ③ Decide whether to wait for follower replication
  if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
    // acks=-1 and at least one partition appended successfully: park a DelayedProduce in the Purgatory
    val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
    val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, ...)
    val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
    // Completes at once if the HW has already caught up; otherwise watches these keys
    delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
  } else {
    // acks=0 or acks=1 (or every partition failed): fire the callback immediately
    responseCallback(produceStatus.map { case (tp, status) => tp -> status.responseStatus })
  }
}
appendToLocalLog() looks up the Partition object for each TopicPartition and delegates:
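// ReplicaManager.scala (internal helper, simplified)
private def appendToLocalLog(...): Map[TopicPartition, LogAppendResult] = {
  entriesPerPartition.map { case (topicPartition, records) =>
    val result =
      try {
        val partition = getPartitionOrException(topicPartition)
        LogAppendResult(partition.appendRecordsToLeader(records, origin, requiredAcks, requestLocal))
      } catch {
        // e.g. NotLeaderOrFollowerException or NotEnoughReplicasException: the error is
        // reported for this partition only, instead of failing the whole request
        case e: ApiException => LogAppendResult(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, Some(e))
      }
    topicPartition -> result
  }
}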
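The decision in step ③ of appendRecords() boils down to a small predicate: park the request only when acks=-1, the request carries data, and at least one partition was appended locally (if every partition failed, there is nothing to wait for). The Java below is a hedged restatement of that rule with hypothetical types, not the broker's code:

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical restatement of ReplicaManager's "should we wait for followers?" rule.
final class DelayDecision {
    // Each value: the exception from the local append, or Optional.empty() on success
    static boolean delayedProduceRequired(short requiredAcks,
                                          int partitionsInRequest,
                                          Map<String, Optional<Exception>> localResults) {
        long failed = localResults.values().stream().filter(Optional::isPresent).count();
        return requiredAcks == -1               // only acks=all waits for followers
            && partitionsInRequest > 0          // an empty request has nothing to wait for
            && failed < partitionsInRequest;    // at least one partition was written locally
    }
}
```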
Stop 4: Partition.appendRecordsToLeader()
// Partition.scala (simplified)
def appendRecordsToLeader(
    records: MemoryRecords,
    origin: AppendOrigin,
    requiredAcks: Int,
    requestLocal: RequestLocal
): LogAppendInfo = inReadLock(leaderIsrUpdateLock) { // leadership and ISR cannot change mid-append
  // ① Confirm leadership: only the leader holds a local leader log
  val leaderLog = leaderLogIfLocal.getOrElse(
    throw new NotLeaderOrFollowerException(s"Leader not local for partition $topicPartition")
  )
  // ② For acks=-1, the ISR (leader included) must have at least min.insync.replicas members
  val minIsr = leaderLog.config.minInSyncReplicas
  val inSyncSize = isrState.isr.size
  if (requiredAcks == -1 && inSyncSize < minIsr) {
    throw new NotEnoughReplicasException(
      s"The size of the current ISR ${isrState.isr} is insufficient to satisfy" +
      s" the min.isr requirement of $minIsr for partition $topicPartition"
    )
  }
  // ③ Delegate write to UnifiedLog, tagging the batches with the current leader epoch
  leaderLog.appendAsLeader(records, leaderEpoch, origin, requestLocal)
}
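The min.insync.replicas check is enforced here, before any bytes are written, and only for acks=-1; acks=0 and acks=1 writes are not subject to it. If the ISR has shrunk (for example, because followers failed or fell behind) below the configured minimum, the write is rejected with NotEnoughReplicasException rather than silently degraded. The ISR can also shrink after the append has succeeded; in that case the problem surfaces later, when DelayedProduce completes, as NOT_ENOUGH_REPLICAS_AFTER_APPEND.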
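The arithmetic is easiest to see with the setup assumed at the start of this chapter (3 replicas) and a common pairing of min.insync.replicas=2, which lets one replica drop out of the ISR while acks=-1 writes continue. A hedged Java restatement of the check, with a hypothetical method name:

```java
// Hypothetical restatement of the pre-append check in Partition.appendRecordsToLeader().
final class MinIsrCheck {
    // True if the leader may append; false corresponds to NotEnoughReplicasException
    static boolean mayAppend(int requiredAcks, int isrSize, int minInSyncReplicas) {
        // Only acks=-1 (all) is subject to min.insync.replicas; acks=0/1 always proceed
        return requiredAcks != -1 || isrSize >= minInSyncReplicas;
    }
}
```

With these settings the cluster tolerates one unavailable replica for acks=-1 producers; losing a second one makes such writes fail fast instead of being acknowledged by the leader alone.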
Stop 5: UnifiedLog.appendAsLeader()
// UnifiedLog.scala (simplified)
def appendAsLeader(
    records: MemoryRecords,
    leaderEpoch: Int,
    origin: AppendOrigin = AppendOrigin.CLIENT,
    requestLocal: RequestLocal = RequestLocal.NoCaching
): LogAppendInfo = {
  // Delegates to the internal append() with assignOffsets = true:
  // the leader assigns offsets, whereas a follower keeps the leader's offsets
  append(records, origin, assignOffsets = true, leaderEpoch, requestLocal, ignoreRecordSize = false)
}
private def append(
    records: MemoryRecords,
    origin: AppendOrigin,
    assignOffsets: Boolean,
    leaderEpoch: Int,
    requestLocal: RequestLocal,
    ignoreRecordSize: Boolean
): LogAppendInfo = {
  // ① Analyze and validate the record batches (no lock needed yet),
  //    e.g. CRC and batch size vs. max.message.bytes; more checks run in ③
  val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize, leaderEpoch)
  // ② Trim any invalid bytes or partial trailing batch
  var validRecords = trimInvalidBytes(records, appendInfo)
  // Steps ③ to ⑤ run under this partition's log lock
  lock synchronized {
    maybeHandleIOException(s"Error while appending records to $topicPartition") {
      // ③ Assign offsets from the current LEO + update producer state (idempotence /
      //    transaction tracking). This must happen under the lock so that concurrent
      //    appends never receive overlapping offsets. It is also where old message
      //    formats (v0/v1) are converted to v2, which is CPU-intensive
      validRecords = updateProducerStateAndAssignOffsets(validRecords, appendInfo, requestLocal)
      // ④ Roll to a new segment if the active one is full (segment.bytes),
      //    too old (segment.ms), or its indexes are full
      val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
      // ⑤ Perform the actual write (LogSegment is a Java class, so arguments are positional)
      segment.append(
        appendInfo.lastOffset,           // largestOffset
        appendInfo.maxTimestamp,         // largestTimestamp
        appendInfo.offsetOfMaxTimestamp, // shallowOffsetOfMaxTimestamp
        validRecords
      )
      // Advance LEO (Log End Offset)
      updateLogEndOffset(appendInfo.lastOffset + 1)
      // fsync only once log.flush.interval.messages is reached (default: Long.MAX_VALUE)
      if (unflushedMessages >= config.flushInterval) flush()
    }
  }
  appendInfo
}
Offset Assignment
updateProducerStateAndAssignOffsets() performs two things:
- Updates the producer ID state (sequence-number validation for idempotent producers, tracking of open transactions for transactional producers).
- Assigns consecutive offsets to the records in the batch, starting from the current LEO. The baseOffset field of the RecordBatch header is set to the first assigned offset; because v2 records store only an offset delta relative to baseOffset, the individual records do not need to be rewritten.
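To make the offset arithmetic concrete, the sketch below uses hypothetical Batch and OffsetAssigner types (a model, not Kafka's RecordBatch API). It assumes producer batches whose records carry offset deltas 0..n-1, assigns consecutive offsets starting at the LEO by setting only each batch's baseOffset, and returns the new LEO:

```java
import java.util.List;

// Hypothetical model of leader-side offset assignment for v2 record batches.
final class Batch {
    final int recordCount;   // records in the batch; their offset deltas are 0..recordCount-1
    long baseOffset = -1;    // unset until the leader assigns it

    Batch(int recordCount) { this.recordCount = recordCount; }

    long lastOffset() { return baseOffset + recordCount - 1; }  // baseOffset + lastOffsetDelta
}

final class OffsetAssigner {
    // Assigns consecutive offsets starting at logEndOffset; returns the new LEO
    static long assign(long logEndOffset, List<Batch> batches) {
        long next = logEndOffset;
        for (Batch b : batches) {
            b.baseOffset = next;      // only the batch header changes
            next += b.recordCount;    // the records keep their relative deltas
        }
        return next;
    }
}
```

With an LEO of 100, a 3-record batch receives offsets 100 to 102 and a following 2-record batch 103 to 104, leaving the LEO at 105 (lastOffset + 1, matching the LEO update in UnifiedLog.append()).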
Stop 6: LogSegment.append()
// LogSegment.java (migrated to Java in 3.6+, simplified)
public void append(
        long largestOffset,
        long largestTimestamp,
        long shallowOffsetOfMaxTimestamp,
        MemoryRecords records
) throws IOException {
    if (records.sizeInBytes() > 0) {
        // File position where this batch will start; index entries must point here
        int physicalPosition = log.sizeInBytes();
        // ① Write to the .log file via FileChannel.write()
        //    Data lands in the OS Page Cache; it is NOT synced to disk yet
        int appendedBytes = log.append(records);
        // ② Track the maximum timestamp seen so far (in memory)
        if (largestTimestamp > maxTimestampSoFar()) {
            maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestamp, shallowOffsetOfMaxTimestamp);
        }
        // ③ Both indexes are sparse: add an offset-index entry (and a time-index entry)
        //    only after more than indexIntervalBytes of log data (default 4 KB) since the last one
        if (bytesSinceLastIndexEntry > indexIntervalBytes) {
            offsetIndex.append(largestOffset, physicalPosition);
            timeIndex.maybeAppend(maxTimestampSoFar(), offsetOfMaxTimestampSoFar());
            bytesSinceLastIndexEntry = 0;
        }
        bytesSinceLastIndexEntry += appendedBytes;
    }
}
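The sparse index trades a little scanning for a much smaller index. The hypothetical in-memory model below maps each indexed batch's last offset to the file position where that batch starts; a lookup finds the last entry whose offset is at most the target, and a reader would then scan the .log file forward from that position. Kafka's real OffsetIndex stores relative offsets in a memory-mapped file, which this sketch does not attempt to reproduce.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of a segment's sparse offset index.
final class SparseOffsetIndex {
    private final int indexIntervalBytes;                     // cf. index.interval.bytes
    private final List<long[]> entries = new ArrayList<>();   // {offset, filePosition}, ascending
    private int bytesSinceLastIndexEntry = 0;
    private int logSize = 0;                                  // bytes in the simulated .log file

    SparseOffsetIndex(int indexIntervalBytes) { this.indexIntervalBytes = indexIntervalBytes; }

    // Same order as LogSegment.append(): remember the start position, write, maybe index
    void append(long largestOffset, int batchBytes) {
        int physicalPosition = logSize;   // where this batch starts
        logSize += batchBytes;            // stand-in for log.append(records)
        if (bytesSinceLastIndexEntry > indexIntervalBytes) {
            entries.add(new long[] {largestOffset, physicalPosition});
            bytesSinceLastIndexEntry = 0;
        }
        bytesSinceLastIndexEntry += batchBytes;
    }

    int entryCount() { return entries.size(); }

    // File position to start scanning from: floor entry via binary search, else segment start
    long lookup(long targetOffset) {
        int lo = 0, hi = entries.size() - 1, found = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (entries.get(mid)[0] <= targetOffset) { found = mid; lo = mid + 1; }
            else hi = mid - 1;
        }
        return found < 0 ? 0 : entries.get(found)[1];
    }
}
```

For example, 20 batches of 10 records and 1000 bytes each, with a 4096-byte interval, produce only 3 entries; looking up offset 75 returns position 5000 (the entry for offset 59), and the reader scans forward from there.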
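log.append(records) ultimately calls FileChannel.write(ByteBuffer). When that call returns, the data is in the operating system's Page Cache: in memory, not yet on disk. Kafka deliberately avoids calling fsync on every write (by default, log.flush.interval.messages and log.flush.interval.ms leave flushing to the OS), relying instead on the OS's background flushing and on replication for durability. One consequence is worth stating explicitly: with acks=-1, an acknowledged message is guaranteed to be in the Page Cache of every ISR member, not necessarily on any disk. Skipping the per-write fsync, together with purely sequential appends, is a major reason Kafka achieves such high write throughput.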
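The write itself needs nothing special from java.nio. A minimal sketch, assuming a hypothetical PageCacheLog class: on a typical OS, FileChannel.write() on a regular file returns once the bytes are copied into the kernel's page cache, and FileChannel.force() is the explicit fsync. Here force() runs only every flushIntervalMessages appends, in the spirit of log.flush.interval.messages.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical append-only log file: writes go to the OS page cache, fsync is explicit and rare.
final class PageCacheLog implements AutoCloseable {
    private final FileChannel channel;
    private final long flushIntervalMessages;   // in the spirit of log.flush.interval.messages
    private long unflushed = 0;
    int flushCount = 0;

    PageCacheLog(Path file, long flushIntervalMessages) {
        try {
            this.channel = FileChannel.open(file, StandardOpenOption.CREATE,
                    StandardOpenOption.WRITE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        this.flushIntervalMessages = flushIntervalMessages;
    }

    // Convenience for experiments: a log backed by a fresh temp file, deleted on JVM exit
    static PageCacheLog openTemp(long flushIntervalMessages) {
        try {
            Path file = Files.createTempFile("segment", ".log");
            file.toFile().deleteOnExit();
            return new PageCacheLog(file, flushIntervalMessages);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns the number of bytes written
    int append(byte[] record) {
        try {
            ByteBuffer buf = ByteBuffer.wrap(record);
            int written = 0;
            while (buf.hasRemaining()) written += channel.write(buf);   // lands in the page cache
            if (++unflushed >= flushIntervalMessages) {
                channel.force(false);   // fsync file content (metadata not required)
                unflushed = 0;
                flushCount++;
            }
            return written;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    long size() {
        try { return channel.size(); } catch (IOException e) { throw new UncheckedIOException(e); }
    }

    @Override
    public void close() {
        try { channel.close(); } catch (IOException e) { throw new UncheckedIOException(e); }
    }
}
```

With an interval of 3, seven appends trigger two fsyncs (after the 3rd and 6th appends); the seventh record sits only in the page cache until the OS or a later flush writes it out.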
Stop 7: DelayedProduce Waits for ISR Acknowledgment
When acks=-1, appendRecords() creates a DelayedProduce and registers it with the Purgatory:
// DelayedProduce.scala (simplified)
class DelayedProduce(
    delayMs: Long,
    produceMetadata: ProduceMetadata,
    replicaManager: ReplicaManager,
    responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
    ...
) extends DelayedOperation(delayMs) {
  // On creation, every partition whose local append succeeded is marked acksPending
  // with a provisional REQUEST_TIMED_OUT error, which is replaced once its acks arrive

  // Completion condition: has the HW reached the written offsets on every partition?
  override def tryComplete(): Boolean = {
    produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
      if (status.acksPending) {
        val partition = replicaManager.getPartitionOrException(topicPartition)
        // requiredOffset = lastOffset + 1, so HW >= requiredOffset means every ISR
        // replica has replicated the whole batch. (The real check also reports
        // NOT_ENOUGH_REPLICAS_AFTER_APPEND if the ISR fell below min.insync.replicas.)
        if (partition.highWatermark >= status.requiredOffset) {
          status.acksPending = false
          status.responseStatus = new PartitionResponse(Errors.NONE, ...)
        } else if (partition.isOffline) {
          // An error also settles the partition: stop waiting and report it
          status.acksPending = false
          status.responseStatus = new PartitionResponse(Errors.KAFKA_STORAGE_ERROR, ...)
        }
        // otherwise: still waiting for followers
      }
    }
    // Complete only once no partition is still waiting
    if (!produceMetadata.produceStatus.values.exists(_.acksPending)) forceComplete() else false
  }

  // Called exactly once, whether we completed normally or timed out;
  // partitions still pending at timeout keep their REQUEST_TIMED_OUT error
  override def onComplete(): Unit = {
    val produceResponseStatus = produceMetadata.produceStatus.map { case (k, v) => k -> v.responseStatus }
    responseCallback(produceResponseStatus)
  }
}
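The purgatory is essentially a map from a key (here, a partition) to the operations watching it, plus a timer. The sketch below is a hypothetical, single-threaded simplification: a DelayedProduce completes once the partition's HW reaches requiredOffset, and expire() stands in for the timeout. Kafka's real DelayedOperationPurgatory adds a hierarchical timing wheel and careful locking, both omitted here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, single-threaded model of a produce purgatory keyed by partition.
final class MiniPurgatory {
    static final class DelayedProduce {
        final String partition;
        final long requiredOffset;   // lastOffset + 1 of the appended batch
        private final Consumer<String> responseCallback;
        private boolean completed = false;

        DelayedProduce(String partition, long requiredOffset, Consumer<String> responseCallback) {
            this.partition = partition;
            this.requiredOffset = requiredOffset;
            this.responseCallback = responseCallback;
        }

        // Like forceComplete(): only the first caller wins, so the callback runs exactly once
        boolean forceComplete(String error) {
            if (completed) return false;
            completed = true;
            responseCallback.accept(error);
            return true;
        }
    }

    private final Map<String, Long> highWatermarks = new HashMap<>();
    private final Map<String, List<DelayedProduce>> watchers = new HashMap<>();

    // Try once; if the condition is not met yet, watch the partition's key
    void tryCompleteElseWatch(DelayedProduce op) {
        if (!tryComplete(op))
            watchers.computeIfAbsent(op.partition, k -> new ArrayList<>()).add(op);
    }

    // Called when a follower fetch moves the HW (checkAndComplete on that key)
    void onHighWatermarkAdvanced(String partition, long newHw) {
        highWatermarks.merge(partition, newHw, Math::max);   // the HW never moves backwards
        List<DelayedProduce> ops = watchers.getOrDefault(partition, new ArrayList<>());
        ops.removeIf(op -> op.completed || tryComplete(op));
    }

    // Timeout path: an operation that is still pending completes with REQUEST_TIMED_OUT
    void expire(DelayedProduce op) {
        if (op.forceComplete("REQUEST_TIMED_OUT"))
            watchers.getOrDefault(op.partition, new ArrayList<>()).remove(op);
    }

    private boolean tryComplete(DelayedProduce op) {
        long hw = highWatermarks.getOrDefault(op.partition, 0L);
        return hw >= op.requiredOffset && op.forceComplete("NONE");
    }
}
```

The "first caller wins" flag matters because completion and expiration race in the real broker; whichever happens first sends the response, and the other becomes a no-op.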
How Follower FetchRequests Advance the High Watermark
Followers continuously send FetchRequests to the Leader. The offset in a Follower's fetch tells the Leader that the Follower already holds every message below it, so the Leader records it as that Follower's LEO. The HW is the minimum LEO across all ISR members (Leader included); when a fetch from the slowest ISR member moves that minimum forward, the Leader advances its HW and triggers a Purgatory check:
// ReplicaManager.scala (simplified)
def updateFollowerFetchState(
    followerId: Int,
    readResults: Seq[(TopicPartition, LogReadResult)],
    ...
): Unit = {
  readResults.foreach { case (topicPartition, readResult) =>
    val partition = getPartitionOrException(topicPartition)
    // Record the follower's fetch position (its LEO); advance HW if all ISR members have caught up
    partition.updateFollowerFetchState(
      followerId,
      followerFetchOffsetMetadata = readResult.info.fetchOffsetMetadata,
      followerStartOffset = readResult.followerLogStartOffset,
      followerFetchTimeMs = readResult.fetchTimeMs,
      leaderEndOffset = readResult.leaderLogEndOffset
    )
    // After HW advances, wake up any DelayedProduce waiting on this partition.
    // (In the real code Partition does this itself, via tryCompleteDelayedRequests(),
    // and only when the HW actually moved.)
    delayedProducePurgatory.checkAndComplete(new TopicPartitionOperationKey(topicPartition))
  }
}
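Stripped of epochs and offset metadata, the leader's HW rule is a minimum over the ISR. The hypothetical tracker below uses the three-replica setup assumed at the start of the chapter; each follower fetch offset is recorded as that follower's LEO, and the method reports whether the HW moved, which is the moment a purgatory check is worthwhile.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the leader's HW bookkeeping for one partition.
final class HighWatermarkTracker {
    private final int leaderId;
    private final Set<Integer> isr;                               // includes the leader
    private final Map<Integer, Long> logEndOffsets = new HashMap<>();
    private long highWatermark = 0;

    HighWatermarkTracker(int leaderId, Set<Integer> isr) {
        this.leaderId = leaderId;
        this.isr = isr;
        for (int id : isr) logEndOffsets.put(id, 0L);
    }

    void leaderAppended(long newLeaderLeo) { logEndOffsets.put(leaderId, newLeaderLeo); }

    // A follower fetching from fetchOffset already holds every offset below it.
    // Returns true if the HW advanced.
    boolean onFollowerFetch(int followerId, long fetchOffset) {
        logEndOffsets.put(followerId, fetchOffset);
        long candidate = Long.MAX_VALUE;
        for (int id : isr) candidate = Math.min(candidate, logEndOffsets.get(id));
        if (candidate > highWatermark) {   // the HW only moves forward
            highWatermark = candidate;
            return true;
        }
        return false;
    }

    long highWatermark() { return highWatermark; }
}
```

After the leader appends up to offset 104 (LEO 105), the HW stays put until the slower follower reports, then reaches 105 once both followers fetch from 105, which is exactly the requiredOffset a DelayedProduce for that batch waits on.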
Complete Call Chain Summary
KafkaApis.handleProduceRequest()
    │  [parse ProduceRequest, per-topic auth check]
    ↓
ReplicaManager.appendRecords()
    │  [validate acks, dispatch to local log]
    ↓
Partition.appendRecordsToLeader()
    │  [confirm leadership, check min.insync.replicas]
    ↓
UnifiedLog.appendAsLeader()
    │  [validate format, assign offsets, update producer state]
    ↓
LogSegment.append()
    │  [FileChannel.write() → OS Page Cache]
    │  [update offset index + time index]
    ↓
[acks=0/1] ──→ responseCallback() immediately ──→ send response (none for acks=0)
[acks=-1]  ──→ create DelayedProduce ──→ register in Purgatory
                                                ▲
                                    Follower sends FetchRequest
                                                │
                              updateFollowerFetchState() → HW advances
                                                │
                             DelayedProduce.tryComplete() → forceComplete()
                                                │
                           onComplete() → responseCallback() → send response
Design Details Worth Noting
Minimal Lock Contention on the Write Path
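The lock synchronized block in UnifiedLog.append() is a per-partition lock, and it does cover the write itself: segment.append() runs inside it, because offsets must be assigned from the current LEO and written in that same order. It stays cheap for two reasons. First, the work under the lock is normally small: assigning offsets, copying bytes into the Page Cache with no fsync, and updating the indexes and the LEO (batch validation runs before the lock is taken; format conversion, when needed, is the expensive exception). Second, each partition has its own lock, so produce requests for different partitions never contend with each other; only concurrent writes to the same partition serialize here. This is one reason write throughput scales with the number of partitions.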
OS Page Cache Is the Performance Secret
Kafka maintains no application-level message cache; it relies entirely on the operating system's Page Cache. Consumers reading recently produced messages usually find the data still in the Page Cache, so the read is served from memory without disk I/O. When combined with zero-copy transfer (Chapter 19), data flows from the Page Cache directly to the network socket inside the kernel, never entering the JVM heap or any other user-space buffer. The exception is TLS: on an SSL listener the broker must copy the data into user space to encrypt it, so zero-copy does not apply.
The Cost of Format Conversion
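When the broker must convert messages that a producer sent in an old format (v0/v1), the offset-assignment step of UnifiedLog.append() (implemented in LogValidator) cannot simply stamp a new baseOffset into the batch header; it has to rebuild every batch record by record in the v2 format. Similarly, a topic-level compression.type that differs from the producer's codec forces decompression and recompression. For high-throughput topics, either can visibly increase broker CPU usage. Using a producer client from Kafka 0.11 or later, which sends format v2, and leaving compression.type at producer avoids this work entirely.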
Why the Handler Thread Never Blocks
Notice that the KafkaRequestHandler thread that calls appendRecords() never blocks waiting for follower replication. It writes to the local log, registers a callback in the Purgatory, and immediately returns to pick up the next request from RequestChannel. The callback fires later — either when the HW advances (triggered by a Follower's Fetch) or when the timeout expires. This non-blocking design is what allows a small handler thread pool to service thousands of concurrent in-flight produce requests.