Chapter 13

Network Layer: Reactor Model and Request Pipeline

Network Layer: Reactor Model and Request Pipeline

Kafka's ability to handle hundreds of thousands of concurrent connections on a single broker stems from its carefully engineered network layer architecture. Understanding this architecture not only helps with parameter tuning but also enables rapid root-cause identification when network-related problems surface in production. This chapter starts with the fundamentals of the Reactor pattern and progressively unpacks Kafka's complete request processing pipeline.

Why Reactor: The Fundamental Limitation of Thread-per-Connection

Where the Thread Model Breaks Down

The most intuitive server design is thread-per-connection: for each client that connects, the server spawns a dedicated thread to handle all I/O and business logic for that connection. This model is simple to reason about and works well when connection counts are modest.

In Kafka's operating environment, it fails at a fundamental level:

Memory wall: A Java thread's default stack size is 512KB to 1MB. 1,000 connections require 500MB to 1GB of stack memory. 10,000 connections demand 5 to 10GB — consumed entirely by idle thread stacks doing nothing but waiting for I/O.

Context switch overhead: The OS kernel must schedule and switch between all active threads. When the vast majority of threads are blocked on network I/O — which is most of the time — the CPU wastes most of its cycles on context switching rather than actual computation.

I/O wait waste: Network I/O is inherently wait-heavy: send a request, wait for data, receive a response. A blocked thread consumes memory and a scheduler slot while contributing nothing.

Measurement studies on I/O-bound servers consistently show that threads spend less than 5% of their time doing useful computation — the remainder is waiting.

The Reactor Pattern: Event-Driven I/O

The Reactor pattern (also called Event Loop or I/O multiplexing) solves this with a simple insight: use a small fixed number of threads to monitor large numbers of connections, investing CPU only when a connection has actual work to do.

This relies on OS-level I/O multiplexing primitives:

// Conceptual Reactor implementation (illustrative, not Kafka source)
Selector selector = Selector.open();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);  // non-blocking is essential
serverChannel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
    // Block until at least one channel has an event — this is epoll_wait()
    int readyCount = selector.select();

    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
    while (keys.hasNext()) {
        SelectionKey key = keys.next();
        keys.remove();

        if (key.isAcceptable()) {
            // New connection arrived — register it for read events
            SocketChannel client = serverChannel.accept();
            client.configureBlocking(false);
            client.register(selector, SelectionKey.OP_READ);
            // No new thread created — this same thread handles thousands of connections
        } else if (key.isReadable()) {
            // Data is available on this connection
            handleRead((SocketChannel) key.channel());
        } else if (key.isWritable()) {
            // Socket buffer has space — write pending response
            handleWrite((SocketChannel) key.channel());
        }
    }
}

A single Selector can monitor thousands of connections. CPU resources are consumed only when genuine I/O events occur, with zero per-connection thread overhead.

Kafka's Three-Layer Network Architecture

Kafka implements a refined multi-reactor variant, located in the kafka.network package. The architecture has three distinct layers:

Client Connections (potentially thousands)
         │
         ▼
┌──────────────────────────────────────┐
│          Acceptor Thread (1)          │
│  - Listens on configured port(s)     │
│  - Accepts new TCP connections       │
│  - Round-robin distributes to        │
│    Processor threads                 │
└──────────────┬───────────────────────┘
               │ distribute new connections
    ┌──────────┼──────────┐
    ▼          ▼          ▼
┌───────┐  ┌───────┐  ┌───────┐
│  P-0  │  │  P-1  │  │  P-2  │   Processor Threads (num.network.threads, default 3)
│Selector│  │Selector│  │Selector│  Each owns its Selector, manages a connection subset
│read/  │  │read/  │  │read/  │  Reads raw bytes → parses into Requests
│write  │  │write  │  │write  │  Writes Responses back to sockets
└───┬───┘  └───┬───┘  └───┬───┘
    └──────────┴──────────┘
               │ enqueue parsed Requests
               ▼
┌──────────────────────────────────────┐
│         RequestChannel               │
│  (bounded blocking queue,            │
│   capacity = queued.max.requests)    │
└──────────────┬───────────────────────┘
               │ dequeue Requests
    ┌──────────┼──────────┬──────────┐
    ▼          ▼          ▼          ▼
┌───────┐  ┌───────┐  ┌───────┐  ┌───────┐
│  H-0  │  │  H-1  │  │  H-2  │  │  H-7  │  I/O Handler Threads (num.io.threads, default 8)
│KafkaApis  KafkaApis  KafkaApis  KafkaApis  Execute actual business logic:
│       │  │       │  │       │  │       │  log writes, log reads, ISR updates,
└───────┘  └───────┘  └───────┘  └───────┘  group coordination, etc.

Layer 1: The Acceptor Thread

The Acceptor class (kafka.network.Acceptor) has exactly one responsibility: accept new TCP connections and hand them off. It maintains a ServerSocketChannel for each configured listener (e.g., PLAINTEXT://0.0.0.0:9092, SSL://0.0.0.0:9093).

On accepting a new connection, it assigns it to one of the Processor threads using round-robin. The Acceptor never reads or writes data — it only establishes connections. Consequently, it never becomes a bottleneck.

# Confirm Acceptor thread existence via thread dump
jstack $(pgrep -f kafka.Kafka) | grep -A 3 "Acceptor"
# Expected output:
# "kafka-network-thread-1-Acceptor" #43 daemon prio=5 os_prio=0
#    java.lang.Thread.State: RUNNABLE
#    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)  ← blocked in epoll_wait

Layer 2: Processor Threads (N threads)

Each Processor owns an independent Selector and manages its assigned subset of connections. It interleaves two activities:

Reading phase:

  1. selector.select() detects which connections have data available
  2. Read bytes from SocketChannel using the Kafka binary protocol framing (4-byte length prefix + payload)
  3. Assemble complete requests from potentially fragmented reads
  4. Wrap in a RequestChannel.Request object and enqueue to RequestChannel

Writing phase:

  1. I/O Handlers post completed Response objects back to the Processor's response queue
  2. Processor detects pending responses and writes them to the appropriate SocketChannel
# View all Processor threads (one per num.network.threads, per listener)
jstack $(pgrep -f kafka.Kafka) | grep "kafka-network-thread"
# "kafka-network-thread-1-ListenerName(PLAINTEXT)-0"
# "kafka-network-thread-1-ListenerName(PLAINTEXT)-1"
# "kafka-network-thread-1-ListenerName(PLAINTEXT)-2"
# (3 threads for a broker with num.network.threads=3 and one PLAINTEXT listener)

Layer 3: I/O Handler Threads + KafkaApis Dispatch

I/O Handler threads (num.io.threads, default 8) dequeue requests from RequestChannel and route them through KafkaApis.handle():

// KafkaApis.handle() — central dispatch (Scala source, simplified)
def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
  try {
    request.header.apiKey match {
      case ApiKeys.PRODUCE            => handleProduceRequest(request, requestLocal)
      case ApiKeys.FETCH              => handleFetchRequest(request)
      case ApiKeys.LIST_OFFSETS       => handleListOffsetRequest(request, requestLocal)
      case ApiKeys.METADATA           => handleTopicMetadataRequest(request)
      case ApiKeys.JOIN_GROUP         => handleJoinGroupRequest(request, requestLocal)
      case ApiKeys.SYNC_GROUP         => handleSyncGroupRequest(request, requestLocal)
      case ApiKeys.HEARTBEAT          => handleHeartbeatRequest(request)
      case ApiKeys.LEAVE_GROUP        => handleLeaveGroupRequest(request)
      case ApiKeys.OFFSET_COMMIT      => handleOffsetCommitRequest(request, requestLocal)
      case ApiKeys.OFFSET_FETCH       => handleOffsetFetchRequest(request, requestLocal)
      case ApiKeys.DESCRIBE_GROUPS    => handleDescribeGroupRequest(request)
      case ApiKeys.CREATE_TOPICS      => handleCreateTopicsRequest(request)
      case ApiKeys.DELETE_TOPICS      => handleDeleteTopicsRequest(request, requestLocal)
      case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionsToTxnRequest(request, requestLocal)
      case ApiKeys.INIT_PRODUCER_ID   => handleInitProducerIdRequest(request, requestLocal)
      // 80+ ApiKey variants in total
      case key => throw new IllegalStateException(s"Unexpected api key: $key")
    }
  } finally {
    // Always send the response (even error responses) back through the Processor
    sendResponseExemptThrottle(request, ...)
  }
}

Each ApiKey corresponds to a distinct Kafka protocol operation. Producers use PRODUCE, consumers use FETCH, rebalancing uses JOIN_GROUP/SYNC_GROUP, metadata discovery uses METADATA, and transactions span several specialized ApiKeys.

Purgatory: The Waiting Room for Delayed Operations

The acks=all Challenge

When a producer sends with acks=all, the ProduceRequest cannot be acknowledged until all ISR members have written the data. This creates a problem: an I/O Handler thread cannot block in-place waiting for ISR acknowledgments. If it did, all handler threads would quickly become occupied waiting for network round-trips, leaving no threads to process incoming requests.

Purgatory (named after Dante's Purgatorio — a place of waiting) solves this with deferred completion:

ProduceRequest arrives at I/O Handler
          ↓
Write to local log (Leader)   ← disk I/O, fast
          ↓
acks=all: need ISR confirmation
          ↓
Create DelayedProduce object
Register it in Purgatory (keyed on partition)
I/O Handler returns immediately → free to process next request
          ↓
[Later] Follower sends FetchRequest
Leader handles Fetch → sends records to Follower
Follower writes to local log → updates its LEO
          ↓
Leader's FetchResponse processing updates Follower LEO tracking
Checks: has min ISR count confirmed this offset?
   YES ↓
Retrieve DelayedProduce from Purgatory
Build ProduceResponse with offset and timestamp
Route response back through Processor to Producer

This design keeps I/O Handler threads fully available even when hundreds of produce requests are waiting for ISR confirmation.

TimingWheel: O(1) Timeout Management

Every delayed operation in Purgatory has a deadline (request.timeout.ms). Managing expirations efficiently matters: a busy broker may have millions of in-flight delayed operations simultaneously.

Kafka uses a Hierarchical Timing Wheel rather than a PriorityQueue or DelayQueue:

Operation PriorityQueue TimingWheel
Insert delayed task O(log n) O(1)
Cancel delayed task O(log n) O(1)
Advance clock (tick) O(1) O(1)

The hierarchy (each level's tick = previous level's full period):

Level 1: tick=1ms,   slots=20 → covers   20ms
Level 2: tick=20ms,  slots=20 → covers  400ms
Level 3: tick=400ms, slots=20 → covers 8,000ms
Level 4: tick=8s,    slots=20 → covers 160,000ms (~2.7min)

Tasks are placed in the wheel level appropriate to their deadline distance. As the clock advances, tasks in the current slot are either fired (expired) or moved to a lower-level wheel as they get closer to their deadline.

The Two Types of Delayed Operations

DelayedProduce: Waiting for ISR acknowledgment

DelayedFetch: Waiting for sufficient data accumulation (when fetch.min.bytes > 0)

# Monitor Purgatory depth via JMX
# Object: kafka.server:type=DelayedOperationPurgatory,delayedOperation=Produce,name=PurgatorySize
# Object: kafka.server:type=DelayedOperationPurgatory,delayedOperation=Fetch,name=PurgatorySize

# Under normal operations, both should be near 0 or small
# A large ProducePurgatory size indicates ISR is slow to acknowledge (replica lag)
# A large FetchPurgatory size is normal when consumers have fetch.min.bytes configured

# Query via JMX
java -jar kafka-jmx-tool.jar \
  --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi \
  --object-name "kafka.server:type=DelayedOperationPurgatory,delayedOperation=Produce,name=PurgatorySize"

Thread Pool Sizing: num.network.threads vs num.io.threads

num.network.threads (default: 3)

Processor threads are pure I/O threads: read bytes, parse protocol framing, write bytes. Their bottleneck is network bandwidth and connection count, not CPU computation.

When to increase:

Sizing guideline:

num.network.threads ≥ ceil(peak_connection_count / 1000)

For most production clusters with < 3,000 connections, the default of 3 is sufficient. For clusters serving many microservices, each with their own connection pool, this number grows.

# Monitor Processor thread utilization
# JMX: kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent
# Value range: 0.0 (fully busy) to 1.0 (completely idle)
# Alert threshold: below 0.3 (30% idle = 70% busy)

kafka-jmx-tool.jar \
  --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi \
  --object-name "kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent"

num.io.threads (default: 8)

I/O Handler threads execute the actual business logic: log writes, log reads, ISR tracking, group coordination, transaction management. These operations are CPU-intensive and may involve disk I/O.

When to increase:

Sizing guidelines:

# I/O bound workloads (consumer fetches dominate):
num.io.threads = CPU_cores × 2

# CPU bound workloads (heavy compression enabled):
num.io.threads = CPU_cores

# Typical starting point for 16-core servers:
num.io.threads = 16
# Monitor I/O Handler utilization
# JMX: kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent
# Below 30% = consider increasing num.io.threads

# Monitor per-request-type latency (crucial for identifying bottlenecks)
# kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
# kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
# kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower

Request Queue Full: What Actually Happens

RequestChannel is a bounded queue with capacity queued.max.requests (default: 500). When the queue fills:

New request parsed by Processor
          ↓
Attempt to enqueue to RequestChannel
          ↓ QUEUE FULL
Processor cannot enqueue
          ↓
Processor stops reading from that socket connection
          ↓
Unread bytes accumulate in OS TCP receive buffer for that socket
          ↓
TCP receive buffer fills → TCP flow control kicks in (zero window advertisement)
          ↓
Client's TCP send buffer fills → client's write operations block
          ↓
Client eventually receives connection timeout or request timeout

This is TCP backpressure propagation — the overload signal naturally flows back to the caller without an explicit "reject" response. Clients experience this as TimeoutException or connection stalls rather than an explicit error code.

To proactively detect this before clients start timing out:

# Alert when RequestChannel approaches capacity
# JMX: kafka.network:type=RequestChannel,name=RequestQueueSize
# Alert threshold: > (queued.max.requests * 0.8) for more than 1 minute

# Also monitor response queue (Processor → client)
# JMX: kafka.network:type=RequestChannel,name=ResponseQueueSize,processor=0

Complete Request Lifecycle: End-to-End Timing

Here is the complete journey of a PRODUCE request with acks=all, from client send to acknowledgment:

T=0ms    [Client]     Producer.send() called, serialized record placed in RecordAccumulator
T=0.2ms  [Client]     Sender thread batches record, sends ProduceRequest over TCP

T=0.3ms  [Acceptor]   New connection registered (or existing connection already tracked)

T=0.5ms  [Processor-0] selector.select() returns: connection has readable data
                        Read bytes from SocketChannel
                        Parse Kafka protocol: API key=PRODUCE, version=9
                        Build RequestChannel.Request object
                        Enqueue to RequestChannel

T=1.5ms  [Handler-3]  Dequeue Request from RequestChannel
                        KafkaApis.handle() → handleProduceRequest()
                        Validate: authentication, authorization, quota
                        Write to LogManager: append to leader partition log
                        fsync() if flush.messages=1 (or relies on OS page cache)

T=3ms    [Handler-3]  Local write complete (LEO advanced)
                        acks=all: create DelayedProduce, register in Purgatory
                        Handler-3 freed → picks up next request from queue

T=4ms    [Follower-1] Follower's fetch loop issues FetchRequest to Leader
T=4.5ms  [Handler-5]  Process FetchRequest, send response with new records
T=7ms    [Follower-1] Follower writes records to local log, advances its LEO
                        Follower's next FetchRequest carries updated LEO

T=7.5ms  [Handler-2]  Process Follower-1's next FetchRequest
                        Update ISR tracking: Follower-1 is now caught up
                        Check DelayedProduce completion: Follower-2 still behind

T=9ms    [Follower-2] Same as Follower-1 above

T=9.5ms  [Handler-1]  Update ISR tracking: Follower-2 caught up
                        All ISR members confirmed → DelayedProduce complete
                        Build ProduceResponse: partition=0, offset=1000050, timestamp=...
                        Post Response to Processor-0's response queue

T=10ms   [Processor-0] Selector detects write-ready on producer's socket
                        Write ProduceResponse bytes to SocketChannel

T=10.2ms [Client]     Producer receives response, Future<RecordMetadata> completes
                        RecordMetadata: topic=orders, partition=0, offset=1000050

Total end-to-end: ~10ms (same datacenter, SSDs)
Cross-AZ ISR replication adds 1-5ms per Follower round-trip

Understanding this pipeline tells you exactly where to look in JMX metrics when P99 latency spikes:

Rate this chapter
4.7  / 5  (24 ratings)

💬 Comments