Network Layer: Reactor Model and Request Pipeline
Kafka's ability to sustain a very large number of concurrent client connections on a single broker stems from its carefully engineered network layer. Understanding this architecture helps with parameter tuning and also enables rapid root-cause identification when network-related problems surface in production. This chapter starts with the fundamentals of the Reactor pattern and progressively unpacks Kafka's complete request processing pipeline.
Why Reactor: The Fundamental Limitation of Thread-per-Connection
Where the Thread Model Breaks Down
The most intuitive server design is thread-per-connection: for each client that connects, the server spawns a dedicated thread to handle all I/O and business logic for that connection. This model is simple to reason about and works well when connection counts are modest.
In Kafka's operating environment, it fails at a fundamental level:
Memory wall: A Java thread's default stack size is typically 1MB on 64-bit Linux (smaller on some platforms; tunable with -Xss). At 0.5 to 1MB per thread, 1,000 connections reserve 500MB to 1GB of stack memory, and 10,000 connections reserve 5 to 10GB, all for threads that spend most of their time doing nothing but waiting for I/O.
Context switch overhead: The OS kernel must schedule and switch between every runnable thread. With thousands of threads each waking briefly to handle a little I/O, the CPU spends a growing share of its cycles on context switches and cache refills instead of actual request processing.
I/O wait waste: Network I/O is inherently wait-heavy: send a request, wait for data, receive a response. A blocked thread consumes memory and a scheduler slot while contributing nothing.
Measurement studies on I/O-bound servers consistently show that threads spend less than 5% of their time doing useful computation — the remainder is waiting.
The Reactor Pattern: Event-Driven I/O
The Reactor pattern (an event loop built on OS-level I/O multiplexing) solves this with a simple insight: use a small fixed number of threads to monitor large numbers of connections, investing CPU only when a connection has actual work to do.
This relies on OS-level I/O multiplexing primitives:
- Linux: epoll, with O(1) event notification; scales to hundreds of thousands of file descriptors
- macOS/BSD: kqueue
- Java abstraction: java.nio.channels.Selector, which wraps epoll/kqueue under the hood
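// Conceptual Reactor implementation (illustrative, not Kafka source)
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

class ReactorSketch {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.bind(new InetSocketAddress(9092)); // port is illustrative
        serverChannel.configureBlocking(false);           // non-blocking is essential
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            // Block until at least one channel has an event (epoll_wait() on Linux)
            selector.select();
            Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = keys.next();
                keys.remove();
                if (!key.isValid()) continue;             // channel was closed meanwhile
                if (key.isAcceptable()) {
                    // New connection arrived: register it for read events
                    SocketChannel client = serverChannel.accept();
                    if (client == null) continue;         // non-blocking accept() may return null
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ);
                    // No new thread created: this same thread handles every connection
                } else if (key.isReadable()) {
                    handleRead(key);                      // data is available on this connection
                } else if (key.isWritable()) {
                    handleWrite(key);                     // socket buffer has space: write pending response
                }
            }
        }
    }

    // Echo handler standing in for protocol parsing: queue the bytes just read for writing back
    static void handleRead(SelectionKey key) throws IOException {
        SocketChannel ch = (SocketChannel) key.channel();
        ByteBuffer buf = ByteBuffer.allocate(4096);
        if (ch.read(buf) < 0) { ch.close(); return; }    // peer closed; close() also cancels the key
        buf.flip();
        key.attach(buf);
        key.interestOps(SelectionKey.OP_WRITE);
    }

    // Write the pending buffer; go back to reading once it has fully drained
    static void handleWrite(SelectionKey key) throws IOException {
        ByteBuffer buf = (ByteBuffer) key.attachment();
        ((SocketChannel) key.channel()).write(buf);
        if (!buf.hasRemaining()) key.interestOps(SelectionKey.OP_READ);
    }
}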
A single Selector can monitor thousands of connections. CPU resources are consumed only when genuine I/O events occur, with zero per-connection thread overhead.
Kafka's Three-Layer Network Architecture
Kafka implements a refined multi-reactor variant, located in the kafka.network package. The architecture has three distinct layers:
Client Connections (potentially thousands)
               │
               ▼
┌──────────────────────────────────────┐
│ Acceptor Thread (1 per listener)     │
│ - Listens on the listener's port     │
│ - Accepts new TCP connections        │
│ - Round-robin distributes to         │
│   Processor threads                  │
└──────────────┬───────────────────────┘
               │ distribute new connections
    ┌──────────┼──────────┐
    ▼          ▼          ▼
┌────────┐ ┌────────┐ ┌────────┐
│  P-0   │ │  P-1   │ │  P-2   │  Processor Threads (num.network.threads per listener, default 3)
│Selector│ │Selector│ │Selector│  Each owns its Selector, manages a connection subset
│ read/  │ │ read/  │ │ read/  │  Reads raw bytes → parses into Requests
│ write  │ │ write  │ │ write  │  Writes Responses back to sockets
└───┬────┘ └───┬────┘ └───┬────┘
    └──────────┼──────────┘
               │ enqueue parsed Requests
               ▼
┌──────────────────────────────────────┐
│ RequestChannel                       │
│ (bounded blocking queue,             │
│ capacity = queued.max.requests)      │
└──────────────┬───────────────────────┘
               │ dequeue Requests
    ┌──────────┼──────────┬──────────┐
    ▼          ▼          ▼          ▼
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│   H-0   │ │   H-1   │ │   H-2   │ │   H-7   │  I/O Handler Threads (num.io.threads, default 8)
│KafkaApis│ │KafkaApis│ │KafkaApis│ │KafkaApis│  Execute actual business logic:
│         │ │         │ │         │ │         │  log writes, log reads, ISR updates,
└─────────┘ └─────────┘ └─────────┘ └─────────┘  group coordination, etc.
Layer 1: The Acceptor Thread
The Acceptor class (kafka.network.Acceptor) has exactly one responsibility: accept new TCP connections and hand them off. Kafka runs one Acceptor per configured listener (e.g., PLAINTEXT://0.0.0.0:9092, SSL://0.0.0.0:9093), and each Acceptor owns the ServerSocketChannel for its listener's port.
On accepting a new connection, it assigns it to one of that listener's Processor threads in round-robin order. The Acceptor never reads or writes request data; it only establishes connections, so it is rarely a bottleneck.
# Confirm the Acceptor thread exists via a thread dump
jstack $(pgrep -f kafka.Kafka) | grep -i -A 3 "acceptor"
# Illustrative output (exact thread names and JDK frames vary by Kafka and JDK version):
# "data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-9092" #43 prio=5 os_prio=0
#    java.lang.Thread.State: RUNNABLE
#         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)   ← blocked in epoll_wait
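The round-robin hand-off described above can be modeled in a few lines of Java. This is a hypothetical sketch, not Kafka's Acceptor: `ProcessorInbox` and `RoundRobinAcceptor` are illustrative names, connections are represented by arbitrary objects, and the real Acceptor also wakes the target Processor's Selector and applies its own policy when a Processor's queue is full. What it shows is how little work accepting involves: one queue insertion, with no reads or writes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model of one Processor's queue of newly accepted connections.
class ProcessorInbox<C> {
    final int id;
    final BlockingQueue<C> newConnections = new ArrayBlockingQueue<>(20); // capacity is illustrative

    ProcessorInbox(int id) {
        this.id = id;
    }
}

// Hypothetical acceptor: it never reads or writes request data, it only hands off connections.
class RoundRobinAcceptor<C> {
    private final List<ProcessorInbox<C>> processors = new ArrayList<>();
    private int next = 0; // index of the Processor that receives the next connection

    RoundRobinAcceptor(int numProcessors) {
        for (int i = 0; i < numProcessors; i++) {
            processors.add(new ProcessorInbox<>(i));
        }
    }

    // Assign a freshly accepted connection to the next Processor in turn; returns its id.
    int assign(C connection) {
        ProcessorInbox<C> target = processors.get(next);
        next = (next + 1) % processors.size();
        if (!target.newConnections.offer(connection)) {
            throw new IllegalStateException("processor " + target.id + " inbox is full");
        }
        return target.id;
    }

    // Connections still waiting for a Processor to register them with its Selector.
    int pending(int processorId) {
        return processors.get(processorId).newConnections.size();
    }
}
```

With three Processors, seven new connections land on Processors 0, 1, 2, 0, 1, 2, 0.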
Layer 2: Processor Threads (N threads)
Each Processor owns an independent Selector and manages its assigned subset of connections. It interleaves two activities:
Reading phase:
- selector.select() detects which connections have data available
- Read bytes from the SocketChannel using the Kafka binary protocol framing (4-byte length prefix + payload)
- Assemble complete requests from potentially fragmented reads
- Wrap each complete request in a RequestChannel.Request object and enqueue it to RequestChannel
Writing phase:
- I/O Handlers post completed Response objects back to the Processor's response queue
- The Processor detects pending responses and writes them to the appropriate SocketChannel
# View all Processor threads (num.network.threads of them per listener)
jstack $(pgrep -f kafka.Kafka) | grep "kafka-network-thread"
# Illustrative output from a recent release (exact names vary by Kafka version):
# "data-plane-kafka-network-thread-1-ListenerName(PLAINTEXT)-PLAINTEXT-0"
# "data-plane-kafka-network-thread-1-ListenerName(PLAINTEXT)-PLAINTEXT-1"
# "data-plane-kafka-network-thread-1-ListenerName(PLAINTEXT)-PLAINTEXT-2"
# (3 threads for a broker with num.network.threads=3 and one PLAINTEXT listener)
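The framing step of the reading phase (a 4-byte length prefix plus payload, reassembled across fragmented reads) can be sketched as a small per-connection state machine. This is an illustrative reimplementation rather than Kafka's code (Kafka keeps this logic in its own network classes, such as NetworkReceive); `FrameAssembler` is a hypothetical name, and `maxFrameBytes` plays a role similar to the broker's socket.request.max.bytes limit.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical per-connection frame assembler: 4-byte big-endian size prefix + payload.
class FrameAssembler {
    private final int maxFrameBytes;                           // reject absurd sizes early
    private final ByteBuffer sizeBuf = ByteBuffer.allocate(4); // partially read size prefix
    private ByteBuffer payload;                                // null while reading the prefix

    FrameAssembler(int maxFrameBytes) {
        this.maxFrameBytes = maxFrameBytes;
    }

    // Feed the bytes from one socket read; returns every frame those bytes completed.
    List<byte[]> feed(ByteBuffer chunk) {
        List<byte[]> frames = new ArrayList<>();
        while (chunk.hasRemaining()) {
            if (payload == null) {
                while (sizeBuf.hasRemaining() && chunk.hasRemaining()) {
                    sizeBuf.put(chunk.get());
                }
                if (sizeBuf.hasRemaining()) {
                    break; // size prefix still incomplete: wait for the next read
                }
                sizeBuf.flip();
                int size = sizeBuf.getInt(); // ByteBuffer is big-endian by default
                sizeBuf.clear();
                if (size < 0 || size > maxFrameBytes) {
                    throw new IllegalStateException("invalid frame size " + size);
                }
                payload = ByteBuffer.allocate(size);
            }
            // Copy as much of the payload as this chunk holds
            int n = Math.min(payload.remaining(), chunk.remaining());
            chunk.get(payload.array(), payload.position(), n);
            payload.position(payload.position() + n);
            if (!payload.hasRemaining()) {
                frames.add(payload.array()); // complete request bytes, ready to parse
                payload = null;
            }
        }
        return frames;
    }
}
```

A frame is returned only once its size prefix and every payload byte have arrived, however the bytes were split across reads; a Processor would then parse each frame into a request and enqueue it.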
Layer 3: I/O Handler Threads + KafkaApis Dispatch
I/O Handler threads (num.io.threads, default 8) dequeue requests from RequestChannel and route them through KafkaApis.handle():
// KafkaApis.handle(): central dispatch (Scala source, simplified)
def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
  try {
    request.header.apiKey match {
      case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)
      case ApiKeys.FETCH => handleFetchRequest(request)
      case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request, requestLocal)
      case ApiKeys.METADATA => handleTopicMetadataRequest(request)
      case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request, requestLocal)
      case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request, requestLocal)
      case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
      case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
      case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request, requestLocal)
      case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request, requestLocal)
      case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
      case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
      case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request, requestLocal)
      case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionsToTxnRequest(request, requestLocal)
      case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request, requestLocal)
      // 80+ ApiKey variants in total
      case key => throw new IllegalStateException(s"Unexpected api key: $key")
    }
  } catch {
    case e: FatalExitError => throw e
    // Any other failure is turned into an error response sent back through the Processor
    case e: Throwable => requestHelper.handleError(request, e)
  } finally {
    // No response is sent here: each handler sends its own, either immediately or,
    // for delayed operations such as acks=all produces, later from Purgatory.
    // Record when local processing finished (this feeds the LocalTimeMs metric).
    if (request.apiLocalCompleteTimeNanos < 0)
      request.apiLocalCompleteTimeNanos = time.nanoseconds
  }
}
Each ApiKey corresponds to a distinct Kafka protocol operation. Producers use PRODUCE, consumers use FETCH, rebalancing uses JOIN_GROUP/SYNC_GROUP, metadata discovery uses METADATA, and transactions span several specialized ApiKeys.
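The handler side (take a request off the bounded queue, dispatch on its API key, turn failures into error responses) can be sketched as follows. This is a hypothetical, single-threaded model, not Kafka's KafkaRequestHandler: `ApiKey`, `SimpleRequest`, and `HandlerLoop` are illustrative names, only three API keys exist, and responses are plain strings.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical subset of API keys, for illustration only.
enum ApiKey { PRODUCE, FETCH, METADATA }

// Hypothetical parsed request, as a Processor would enqueue it.
final class SimpleRequest {
    final ApiKey apiKey;
    final String body;

    SimpleRequest(ApiKey apiKey, String body) {
        this.apiKey = apiKey;
        this.body = body;
    }
}

// One I/O handler's loop body: dequeue, dispatch by API key, always produce a response.
class HandlerLoop {
    private final BlockingQueue<SimpleRequest> requestQueue;
    private final Map<ApiKey, Function<SimpleRequest, String>> handlers = new EnumMap<>(ApiKey.class);

    HandlerLoop(BlockingQueue<SimpleRequest> requestQueue) {
        this.requestQueue = requestQueue;
    }

    void register(ApiKey key, Function<SimpleRequest, String> handler) {
        handlers.put(key, handler);
    }

    // Waits up to timeoutMs for a request; returns its response, or null if none arrived.
    String pollAndHandle(long timeoutMs) {
        SimpleRequest request;
        try {
            request = requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
        if (request == null) {
            return null; // idle: nothing arrived in this interval
        }
        try {
            Function<SimpleRequest, String> handler = handlers.get(request.apiKey);
            if (handler == null) {
                throw new IllegalStateException("No handler for api key " + request.apiKey);
            }
            return handler.apply(request);
        } catch (RuntimeException e) {
            return "ERROR: " + e.getMessage(); // failures become error responses, not dead threads
        }
    }
}
```

In use, each I/O Handler thread would call `pollAndHandle` in an endless loop, and a queue capacity of 500 would mirror the default queued.max.requests.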
Purgatory: The Waiting Room for Delayed Operations
The acks=all Challenge
When a producer sends with acks=all, the ProduceRequest cannot be acknowledged until all ISR members have written the data. This creates a problem: an I/O Handler thread cannot block in-place waiting for ISR acknowledgments. If it did, all handler threads would quickly become occupied waiting for network round-trips, leaving no threads to process incoming requests.
Purgatory (as in a place of waiting) solves this with deferred completion:
ProduceRequest arrives at I/O Handler
↓
Append to local log (Leader) ← page-cache write, fast
↓
acks=all: need ISR confirmation
↓
Create DelayedProduce object
Register it in Purgatory (keyed on partition)
I/O Handler returns immediately → free to process next request
↓
[Later] Follower sends FetchRequest
Leader handles Fetch → sends records to Follower
Follower writes to local log → advances its LEO
↓
Follower's next FetchRequest carries its new fetch offset (= its LEO)
Leader records that Follower's LEO and may advance the high watermark
Checks: have all current ISR members reached the required offset?
YES ↓
Retrieve DelayedProduce from Purgatory
Build ProduceResponse with offset and timestamp
Route response back through Processor to Producer
This design keeps I/O Handler threads fully available even when hundreds of produce requests are waiting for ISR confirmation.
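The deferred-completion idea can be sketched as a map from partition to waiting operations. The method names `tryCompleteElseWatch` and `checkAndComplete` are borrowed from Kafka's DelayedOperationPurgatory, but everything else here is a hypothetical single-threaded simplification: `DelayedProduceOp` and `PurgatorySketch` are illustrative names, there is no locking and no timeout handling, and completion is reduced to "the partition's high watermark has reached the required offset".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical acks=all produce waiting for replication to reach requiredOffset.
class DelayedProduceOp {
    final long requiredOffset;
    private final Consumer<String> sendResponse; // stands in for "enqueue response to the Processor"
    private boolean completed = false;

    DelayedProduceOp(long requiredOffset, Consumer<String> sendResponse) {
        this.requiredOffset = requiredOffset;
        this.sendResponse = sendResponse;
    }

    // Completes (exactly once) when the high watermark has reached requiredOffset.
    boolean tryComplete(long highWatermark) {
        if (completed || highWatermark < requiredOffset) {
            return false;
        }
        completed = true;
        sendResponse.accept("produce complete, requiredOffset=" + requiredOffset);
        return true;
    }
}

class PurgatorySketch {
    private final Map<String, List<DelayedProduceOp>> watchers = new HashMap<>();

    // Called by an I/O handler right after the local append; never blocks.
    void tryCompleteElseWatch(String partition, DelayedProduceOp op, long currentHighWatermark) {
        if (!op.tryComplete(currentHighWatermark)) {
            watchers.computeIfAbsent(partition, p -> new ArrayList<>()).add(op);
        }
    }

    // Called when follower fetches advance a partition's high watermark; returns #completed.
    int checkAndComplete(String partition, long newHighWatermark) {
        List<DelayedProduceOp> ops = watchers.getOrDefault(partition, new ArrayList<>());
        int completed = 0;
        for (Iterator<DelayedProduceOp> it = ops.iterator(); it.hasNext(); ) {
            if (it.next().tryComplete(newHighWatermark)) {
                it.remove();
                completed++;
            }
        }
        return completed;
    }

    int size() {
        int total = 0;
        for (List<DelayedProduceOp> ops : watchers.values()) {
            total += ops.size();
        }
        return total;
    }
}
```

The handler thread's only cost is one map insertion; the response is sent later by whichever thread processes the follower fetch that advances the high watermark.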
TimingWheel: O(1) Timeout Management
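Every delayed operation in Purgatory has a deadline: for a produce, the timeout carried in the request (set from the producer's request.timeout.ms); for a consumer fetch, fetch.max.wait.ms. Managing expirations efficiently matters: a busy broker may hold a very large number of in-flight delayed operations simultaneously.
Kafka uses a Hierarchical Timing Wheel rather than a PriorityQueue or DelayQueue:
| Operation | PriorityQueue / DelayQueue | Hierarchical TimingWheel |
|---|---|---|
| Insert delayed task | O(log n) | O(1) |
| Cancel delayed task | O(n) (java.util.PriorityQueue.remove is a linear scan) | O(1) (unlink from its bucket's list) |
| Expire a due task | O(log n) | O(1) per level it cascades through |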
The hierarchy (each level's tick = previous level's full period):
Level 1: tick=1ms, slots=20 → covers 20ms
Level 2: tick=20ms, slots=20 → covers 400ms
Level 3: tick=400ms, slots=20 → covers 8,000ms
Level 4: tick=8s, slots=20 → covers 160,000ms (~2.7min)
Tasks are placed in the wheel level appropriate to their deadline distance. As the clock advances, tasks in the current slot are either fired (expired) or moved to a lower-level wheel as they get closer to their deadline.
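A simplified hierarchical timing wheel, using the same 1ms tick and 20 slots per level as the hierarchy above, can be sketched as follows. This is an illustrative reimplementation, not Kafka's TimingWheel/SystemTimer: `DelayedTask`, `HierarchicalTimer`, and `Level` are hypothetical names, it is single-threaded, and it advances the clock one base tick at a time, whereas Kafka's SystemTimer keeps buckets in a DelayQueue so it can jump straight to the next non-empty bucket. Insertion is O(1): compute a slot index and append, or hand the task to the next coarser level.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical task type: runs `action` once its deadline (in ms) has passed.
class DelayedTask {
    final long expirationMs;
    final Runnable action;

    DelayedTask(long expirationMs, Runnable action) {
        this.expirationMs = expirationMs;
        this.action = action;
    }
}

// Simplified, single-threaded hierarchical timing wheel.
class HierarchicalTimer {
    // One level: wheelSize buckets, each covering tickMs of time.
    private static final class Level {
        final long tickMs;
        final int wheelSize;
        final long interval;   // total span this level covers: tickMs * wheelSize
        long currentTime;      // start of the current slot, a multiple of tickMs
        final List<List<DelayedTask>> buckets = new ArrayList<>();
        Level overflow;        // next, coarser level, created on demand

        Level(long tickMs, int wheelSize, long startMs) {
            this.tickMs = tickMs;
            this.wheelSize = wheelSize;
            this.interval = tickMs * wheelSize;
            this.currentTime = startMs - (startMs % tickMs);
            for (int i = 0; i < wheelSize; i++) buckets.add(new ArrayList<>());
        }

        // O(1) insert; returns false if the task is already due and should run now.
        boolean add(DelayedTask task) {
            long exp = task.expirationMs;
            if (exp < currentTime + tickMs) return false;
            if (exp < currentTime + interval) {
                buckets.get((int) ((exp / tickMs) % wheelSize)).add(task);
                return true;
            }
            if (overflow == null) overflow = new Level(interval, wheelSize, currentTime);
            return overflow.add(task);
        }

        // If a tick boundary was crossed, move the clock and drain the new slot's bucket,
        // then let the coarser level do the same.
        void advance(long timeMs, List<DelayedTask> drained) {
            if (timeMs < currentTime + tickMs) return;
            currentTime = timeMs - (timeMs % tickMs);
            List<DelayedTask> bucket = buckets.get((int) ((currentTime / tickMs) % wheelSize));
            drained.addAll(bucket);
            bucket.clear();
            if (overflow != null) overflow.advance(currentTime, drained);
        }
    }

    private final Level root;

    HierarchicalTimer(long tickMs, int wheelSize, long startMs) {
        root = new Level(tickMs, wheelSize, startMs);
    }

    void schedule(DelayedTask task) {
        if (!root.add(task)) task.action.run(); // already expired: fire immediately
    }

    // Advance to nowMs one base tick at a time. Drained tasks are re-inserted, so each
    // either fires (it is due) or moves down to a finer-grained level.
    void advanceTo(long nowMs) {
        while (root.currentTime + root.tickMs <= nowMs) {
            List<DelayedTask> drained = new ArrayList<>();
            root.advance(root.currentTime + root.tickMs, drained);
            for (DelayedTask task : drained) schedule(task);
        }
    }
}
```

For example, a task due at 12,345ms starts in the fourth level (8s ticks), cascades to the 400ms and 20ms levels as the clock approaches it, and finally fires from the 1ms level exactly when the clock reaches 12,345ms.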
The Two Types of Delayed Operations
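DelayedProduce: Waiting for ISR acknowledgment
- Completion condition: every current ISR replica's LEO has reached the offset the produce requires, so the high watermark covers the data (if the ISR has shrunk below min.insync.replicas, the produce instead completes with a NOT_ENOUGH_REPLICAS_AFTER_APPEND error)
- Timeout behavior: returns a REQUEST_TIMED_OUT error to the producer
- Typical wait time: a few milliseconds (same datacenter) to tens of milliseconds (cross-AZ)
DelayedFetch: Waiting for sufficient data accumulation (when less than fetch.min.bytes is available; with the default of 1 byte, whenever there is no new data)
- Completion condition: enough new data has been written to satisfy fetch.min.bytes
- Timeout behavior: after fetch.max.wait.ms, returns all currently available data, even if below fetch.min.bytes
- This is how Kafka efficiently implements "long polling" for consumers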
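The DelayedFetch rule reduces to "enough data, or time is up". A hypothetical sketch (the class name is illustrative, and string length stands in for a byte count):

```java
// Hypothetical long-poll fetch: completes when enough data exists or the wait expires.
class DelayedFetchSketch {
    private final int minBytes;      // plays the role of the consumer's fetch.min.bytes
    private final long deadlineMs;   // arrival time + fetch.max.wait.ms
    private final StringBuilder available = new StringBuilder(); // chars stand in for bytes

    DelayedFetchSketch(int minBytes, long arrivalMs, long maxWaitMs) {
        this.minBytes = minBytes;
        this.deadlineMs = arrivalMs + maxWaitMs;
    }

    // Called when a producer appends to a partition this fetch is watching.
    void onAppend(String records) {
        available.append(records);
    }

    // Returns the response payload if the fetch can complete at nowMs, or null to keep waiting.
    String tryComplete(long nowMs) {
        if (available.length() >= minBytes) {
            return available.toString(); // enough data accumulated
        }
        if (nowMs >= deadlineMs) {
            return available.toString(); // timed out: return whatever exists, possibly nothing
        }
        return null;
    }
}
```

With the consumer defaults (fetch.min.bytes=1, fetch.max.wait.ms=500), a fetch against an idle partition waits up to 500ms and then returns an empty response, instead of the consumer busy-polling the broker.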
# Monitor Purgatory depth via JMX
# Object: kafka.server:type=DelayedOperationPurgatory,delayedOperation=Produce,name=PurgatorySize
# Object: kafka.server:type=DelayedOperationPurgatory,delayedOperation=Fetch,name=PurgatorySize
# Under normal operations, both should be near 0 or small
# A large ProducePurgatory size indicates ISR is slow to acknowledge (replica lag)
# A large FetchPurgatory size is normal when consumers have fetch.min.bytes configured
# Query via Kafka's bundled JmxTool (the class is org.apache.kafka.tools.JmxTool in newer releases)
kafka-run-class.sh kafka.tools.JmxTool \
  --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi \
  --object-name "kafka.server:type=DelayedOperationPurgatory,delayedOperation=Produce,name=PurgatorySize"
Thread Pool Sizing: num.network.threads vs num.io.threads
num.network.threads (default: 3)
Processor threads are pure I/O threads: read bytes, parse protocol framing, write bytes. Their bottleneck is usually network bandwidth and connection count rather than CPU, with one exception: when TLS is enabled, encryption and decryption also run on these threads.
When to increase:
- Broker has a high client connection count (e.g., > 5,000 connections)
- JMX metric NetworkProcessorAvgIdlePercent consistently below 30%
- Request latency is high but I/O Handler queue is not full
Sizing guideline:
num.network.threads ≥ ceil(peak_connection_count / 1000)
For most production clusters with < 3,000 connections, the default of 3 is sufficient. For clusters serving many microservices, each with their own connection pool, this number grows.
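The sizing guideline is simple arithmetic. A hypothetical helper, which additionally keeps the default of 3 as a floor (that floor is our assumption; the guideline itself only gives a lower bound):

```java
// Hypothetical helper for the rule of thumb num.network.threads >= ceil(peak_connection_count / 1000).
final class NetworkThreadSizing {
    static final int DEFAULT_NETWORK_THREADS = 3;

    static int suggested(int peakConnections) {
        if (peakConnections < 0) {
            throw new IllegalArgumentException("peakConnections must be >= 0");
        }
        int byConnections = (int) Math.ceil(peakConnections / 1000.0); // ceil(peak / 1000)
        return Math.max(DEFAULT_NETWORK_THREADS, byConnections);      // never below the default
    }
}
```

For the 5,000-connection example above this suggests 5 threads; below 3,000 connections it stays at the default of 3.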
# Monitor Processor thread utilization
# JMX: kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent
# Value range: 0.0 (fully busy) to 1.0 (completely idle)
# Alert threshold: below 0.3 (30% idle = 70% busy)
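# (Kafka's bundled JmxTool; the class is org.apache.kafka.tools.JmxTool in newer releases)
kafka-run-class.sh kafka.tools.JmxTool \
  --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi \
  --object-name "kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent"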
num.io.threads (default: 8)
I/O Handler threads execute the actual business logic: log writes, log reads, ISR tracking, group coordination, transaction management. These operations are CPU-intensive and may involve disk I/O.
When to increase:
- Disk I/O capacity is not the bottleneck (fast NVMe SSDs), but handler latency is still high
- JMX metric RequestHandlerAvgIdlePercent consistently below 30%
- RequestChannel queue depth (RequestQueueSize) trends toward queued.max.requests
Sizing guidelines:
# I/O bound workloads (consumer fetches dominate):
num.io.threads = CPU_cores × 2
# CPU bound workloads (heavy compression enabled):
num.io.threads = CPU_cores
# Typical starting point for 16-core servers:
num.io.threads = 16
# Monitor I/O Handler utilization
# JMX: kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent
# Below 30% = consider increasing num.io.threads
# Monitor per-request-type latency (crucial for identifying bottlenecks)
# kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
# kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
# kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower
Request Queue Full: What Actually Happens
RequestChannel is a bounded queue with capacity queued.max.requests (default: 500). When the queue fills:
New request parsed by Processor
↓
Attempt to enqueue to RequestChannel
↓ QUEUE FULL
Processor thread blocks on the enqueue until space frees up
↓
While blocked, the Processor stops polling its Selector: no reads (and no response writes) on any of its connections
↓
Unread bytes accumulate in the OS TCP receive buffers of those sockets
↓
TCP receive buffer fills → TCP flow control kicks in (zero window advertisement)
↓
Client's TCP send buffer fills → client's writes stall
↓
Client eventually receives connection timeout or request timeout
This is TCP backpressure propagation — the overload signal naturally flows back to the caller without an explicit "reject" response. Clients experience this as TimeoutException or connection stalls rather than an explicit error code.
To proactively detect this before clients start timing out:
# Alert when RequestChannel approaches capacity
# JMX: kafka.network:type=RequestChannel,name=RequestQueueSize
# Alert threshold: > (queued.max.requests * 0.8) for more than 1 minute
# Also monitor response queue (Processor → client)
# JMX: kafka.network:type=RequestChannel,name=ResponseQueueSize,processor=0
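The "above 80% of queued.max.requests for more than 1 minute" rule can be evaluated over periodic samples of RequestQueueSize. A hypothetical sketch, independent of any monitoring system (the class and method names are illustrative; collecting samples and delivering the alert are left to your tooling):

```java
// Hypothetical evaluator for "value above threshold continuously for longer than holdMs".
class SustainedThresholdAlert {
    private final double threshold;
    private final long holdMs;
    private boolean inBreach = false;
    private long breachStartMs;

    SustainedThresholdAlert(int queuedMaxRequests, double fraction, long holdMs) {
        this.threshold = queuedMaxRequests * fraction; // e.g. 500 * 0.8 = 400
        this.holdMs = holdMs;
    }

    // Feed one RequestQueueSize sample (timestamps must be non-decreasing).
    // Returns true while the queue has stayed above the threshold for more than holdMs.
    boolean observe(long timestampMs, double queueSize) {
        if (queueSize <= threshold) {
            inBreach = false; // any sample at or below the threshold resets the window
            return false;
        }
        if (!inBreach) {
            inBreach = true;
            breachStartMs = timestampMs;
        }
        return timestampMs - breachStartMs > holdMs;
    }
}
```

Requiring the condition to hold for the whole window keeps short bursts from paging anyone, while a queue that stays near capacity alerts before clients start timing out.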
Complete Request Lifecycle: End-to-End Timing
Here is the complete journey of a PRODUCE request with acks=all, from client send to acknowledgment (the timings are illustrative):
T=0ms     [Client]      Producer.send() called, serialized record placed in RecordAccumulator
T=0.2ms   [Client]      Sender thread batches record, sends ProduceRequest over TCP
T=0.3ms   [Acceptor]    Only for a brand-new connection: accept it and hand it to a Processor
                        (an established connection is already owned by its Processor)
T=0.5ms   [Processor-0] selector.select() returns: connection has readable data
                        Read bytes from SocketChannel
                        Parse Kafka protocol: API key=PRODUCE, version=9
                        Build RequestChannel.Request object
                        Enqueue to RequestChannel
T=1.5ms   [Handler-3]   Dequeue Request from RequestChannel
                        KafkaApis.handle() → handleProduceRequest()
                        Validate: authorization and request contents
                        (authentication already happened when the connection was set up)
                        Write to LogManager: append to leader partition log
                        No fsync() by default: the data lands in the OS page cache
                        (fsync() on every append only if flush.messages=1)
T=3ms     [Handler-3]   Local write complete (LEO advanced)
                        acks=all: create DelayedProduce, register in Purgatory
                        Handler-3 freed → picks up next request from queue
T=4ms     [Follower-1]  Follower's fetch loop issues FetchRequest to Leader
T=4.5ms   [Handler-5]   Process FetchRequest, send response with new records
T=7ms     [Follower-1]  Follower writes records to local log, advances its LEO
                        Follower's next FetchRequest carries updated LEO
T=7.5ms   [Handler-2]   Process Follower-1's next FetchRequest
                        Update ISR tracking: Follower-1 is now caught up
                        Check DelayedProduce completion: Follower-2 still behind
T=9ms     [Follower-2]  Same as Follower-1 above
T=9.5ms   [Handler-1]   Update ISR tracking: Follower-2 caught up
                        All ISR members confirmed → high watermark advances → DelayedProduce complete
                        Build ProduceResponse: partition=0, offset=1000050, timestamp=...
                        Post Response to Processor-0's response queue
T=10ms    [Processor-0] Selector detects write-ready on producer's socket
                        Write ProduceResponse bytes to SocketChannel
T=10.2ms  [Client]      Producer receives response, Future<RecordMetadata> completes
                        RecordMetadata: topic=orders, partition=0, offset=1000050
Total end-to-end: ~10ms (same datacenter, SSDs)
Cross-AZ ISR replication adds 1-5ms per Follower round-trip
Understanding this pipeline tells you exactly where to look in JMX metrics when P99 latency spikes:
- High RequestQueueTimeMs → I/O Handler threads are the bottleneck (increase num.io.threads)
- High LocalTimeMs → disk write is slow (check disk throughput and log.flush.interval.messages)
- High RemoteTimeMs → Purgatory wait is long (ISR replication is slow; check replica.lag.time.max.ms and follower network/disk)
- High ResponseQueueTimeMs → Processor write-back is slow (consider increasing num.network.threads)
- High ResponseSendTimeMs → client TCP buffers are full (client is slow to read, or client-side backpressure)
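The mapping above can be turned into a small triage helper. This is a hypothetical sketch: `LatencyTriage` is an illustrative name, and it assumes you have already read the per-request-type RequestMetrics values (e.g., request=Produce) in milliseconds. For a single request these components (together with ThrottleTimeMs, not covered here) add up to TotalTimeMs; percentiles taken across many requests do not add up exactly, so treat the result as a heuristic for where to look first.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical triage helper: name the largest latency component and the hint from the list above.
final class LatencyTriage {
    private static final Map<String, String> HINTS = new LinkedHashMap<>();

    static {
        HINTS.put("RequestQueueTimeMs", "I/O Handler threads are the bottleneck: increase num.io.threads");
        HINTS.put("LocalTimeMs", "disk write is slow: check disk throughput and log.flush.interval.messages");
        HINTS.put("RemoteTimeMs", "ISR replication is slow: check replica.lag.time.max.ms and follower network/disk");
        HINTS.put("ResponseQueueTimeMs", "Processor write-back is slow: consider increasing num.network.threads");
        HINTS.put("ResponseSendTimeMs", "client TCP buffers are full: client is slow to read");
    }

    // componentsMs maps metric name to a latency in ms; returns "<metric>: <hint>".
    static String dominant(Map<String, Double> componentsMs) {
        String worst = null;
        double worstValue = Double.NEGATIVE_INFINITY;
        for (Map.Entry<String, String> e : HINTS.entrySet()) {
            Double v = componentsMs.get(e.getKey());
            if (v != null && v > worstValue) { // ties keep the earlier metric in list order
                worst = e.getKey();
                worstValue = v;
            }
        }
        if (worst == null) {
            throw new IllegalArgumentException("no known latency components supplied");
        }
        return worst + ": " + HINTS.get(worst);
    }
}
```

For the acks=all timeline above, where most of the time is spent waiting for followers, RemoteTimeMs would dominate.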