Chapter 2

Architecture: The Complete Journey of a Message

Why Trace a Single Message?

The most effective way to understand a distributed system is to pick one specific operation and follow its execution path layer by layer. For Kafka, that operation is: "one message from producer.send() to consumer.poll() returning it."

This path crosses seven major components, involves at least five thread handoffs, two network I/O rounds, and three distinct Broker subsystems. Every hop has a measurable latency cost. Understanding what each hop does, and why, is the foundation for optimizing Kafka performance and diagnosing latency problems.

We'll trace a message through a typical topology:

Topic: order-events, Partition: 2, Replication Factor: 3
Producer → Broker-1 (Leader) → Broker-2, Broker-3 (Followers) → Consumer

Producer Side: From send() to Bytes on the Wire

The Interceptor Chain

When you call producer.send(record), the first stop is the producer interceptor chain (ProducerInterceptor). Interceptors are applied in order; each can mutate the record (add headers, modify key/value) or record metadata (for distributed tracing, monitoring).

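import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TracingInterceptor implements ProducerInterceptor<String, Order> {
    @Override
    public ProducerRecord<String, Order> onSend(ProducerRecord<String, Order> record) {
        // Inject trace context into message headers (runs in the thread that called send())
        record.headers().add("trace-id",
            UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
        record.headers().add("send-timestamp",
            Long.toString(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
        return record;  // return the mutated record, or a new one
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Called on the producer's I/O thread after the broker ACKs (or fails) the batch
        if (exception == null && metadata != null) {
            // metadata.timestamp() is the record's CreateTime unless the topic uses LogAppendTime.
            // metricsRegistry is an application-supplied metrics sink (not shown).
            metricsRegistry.record("produce.broker.latency",
                System.currentTimeMillis() - metadata.timestamp());
        }
    }

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public void close() {}
}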

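The onSend() hook executes synchronously in the calling thread, so there is no thread switch yet. onAcknowledgement() runs later on the producer's background I/O thread, which is why it should stay cheap.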

Serialization

After interceptors, the key and value are converted to byte arrays by their respective Serializer implementations. Kafka ships with StringSerializer, ByteArraySerializer, IntegerSerializer, and others. Production systems typically use KafkaAvroSerializer (with Schema Registry) or custom Protobuf serializers.

Serialization is CPU-bound. For Avro with Schema Registry, the first serialization involves an HTTP call to fetch/register the schema (subsequent calls hit the local cache). Choose compact, fast formats for high-throughput pipelines: binary formats such as Protobuf and Avro are generally faster and smaller than JSON, though the ranking between them depends on schema and library, so benchmark with your own payloads. The difference matters at 500K+ msg/s.

Partitioning

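After serialization, the record needs to be assigned to a partition. If the record has an explicit key, the default behavior is toPositive(murmur2(keyBytes)) % numPartitions, where toPositive clears the sign bit of the hash. Records with the same key therefore always go to the same partition (and thus are ordered), as long as the partition count does not change. If there's no key, the producer uses sticky partitioning, introduced in Kafka 2.4 (KIP-480) and reworked in Kafka 3.3 (KIP-794).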
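The keyed mapping can be sketched in a few lines. This is a minimal illustration, not Kafka's code: the class name KeyedPartitioning is hypothetical, and Arrays.hashCode stands in for murmur2 purely to keep the example dependency-free. The point it shows is why the sign bit is masked rather than passed through Math.abs, which returns a negative number for Integer.MIN_VALUE.

```java
import java.util.Arrays;

final class KeyedPartitioning {
    /** Clears the sign bit so the result is always non-negative. */
    static int toPositive(int hash) {
        return hash & 0x7fffffff;
    }

    /** Stand-in hash (assumption): Kafka's default partitioning uses murmur2 over the key bytes. */
    static int hash(byte[] keyBytes) {
        return Arrays.hashCode(keyBytes);
    }

    /** Same key bytes and same partition count always yield the same partition. */
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return toPositive(hash(keyBytes)) % numPartitions;
    }
}
```

Because the result depends on numPartitions, adding partitions to a topic changes where existing keys land, which is why per-key ordering only holds while the partition count is stable.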

The StickyPartitioner was introduced to fix a fundamental throughput problem with the old round-robin approach:

Old Round-Robin: Each keyless message rotates to a different partition. The RecordAccumulator's batch for each partition fills slowly; the Sender thread sends many small batches. High request count, low throughput.

StickyPartitioner: The partitioner "sticks" to one partition until its batch fills (batch.size bytes) or the batch times out (linger.ms), then switches to another partition. Batches are larger and requests fewer, so throughput rises. Gains of 2–3x are reported for keyless high-volume producers, though the actual figure depends on partition count, record size, and linger.ms.
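The difference in batch counts is easy to see in a toy simulation. The sketch below is not the producer's algorithm; it assumes a simplified model in which records arrive in fixed "linger windows" and every open batch is sent when the window ends. The class and parameter names are hypothetical.

```java
final class StickyVsRoundRobin {
    /** Counts batches sent under the simplified model described above. */
    static int countBatches(boolean sticky, int partitions, int recordsPerWindow,
                            int recordBytes, int batchBytes, int windows) {
        int[] openBytes = new int[partitions]; // bytes in each partition's open batch
        int batchesSent = 0;
        int stuckTo = 0;  // sticky mode: partition currently receiving records
        int cursor = 0;   // round-robin mode: next partition in rotation
        for (int w = 0; w < windows; w++) {
            for (int r = 0; r < recordsPerWindow; r++) {
                int p;
                if (sticky) {
                    p = stuckTo;
                } else {
                    p = cursor;
                    cursor = (cursor + 1) % partitions;
                }
                // A non-empty batch that cannot fit the record is sent first.
                if (openBytes[p] > 0 && openBytes[p] + recordBytes > batchBytes) {
                    batchesSent++;
                    openBytes[p] = 0;
                    if (sticky) {
                        stuckTo = (stuckTo + 1) % partitions; // switch after a full batch
                        p = stuckTo;
                    }
                }
                openBytes[p] += recordBytes;
            }
            // Linger expires: every non-empty batch goes out, however small.
            for (int p = 0; p < partitions; p++) {
                if (openBytes[p] > 0) {
                    batchesSent++;
                    openBytes[p] = 0;
                }
            }
            if (sticky) {
                stuckTo = (stuckTo + 1) % partitions;
            }
        }
        return batchesSent;
    }
}
```

With 6 partitions and 12 small records per window over 10 windows, round-robin sends 60 batches in this model while sticky sends 10, because the sticky producer fills one batch per window instead of six.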

// Custom partitioner: route VIP orders to a dedicated high-priority partition
public class OrderTypePartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionCountForTopic(topic);
        Order order = (Order) value;
        if (order.getType() == OrderType.VIP) {
            // VIP orders always go to the last partition — scale this partition's
            // consumer independently if needed
            return numPartitions - 1;
        }
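        // Regular orders are partitioned by orderId for a per-order ordering guarantee.
        // Utils is org.apache.kafka.common.utils.Utils; toPositive() avoids the negative
        // result Math.abs() returns for Integer.MIN_VALUE, and Math.max() guards the
        // single-partition case against division by zero.
        byte[] idBytes = order.getOrderId().getBytes(StandardCharsets.UTF_8);
        return Utils.toPositive(Utils.murmur2(idBytes)) % Math.max(1, numPartitions - 1);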
    }

    @Override
    public void close() {}
    @Override
    public void configure(Map<String, ?> configs) {}
}

RecordAccumulator: The Batching Engine

Once a partition is chosen, the serialized record is appended to the RecordAccumulator, the producer's in-memory buffer. Conceptually it's a ConcurrentMap<TopicPartition, Deque<ProducerBatch>> (recent client versions nest the per-partition deques under a per-topic entry). Each TopicPartition has a deque of batches; the tail batch is the one actively being filled.

RecordAccumulator memory layout:
  order-events-0: [ProducerBatch(16KB, FULL)] → [ProducerBatch(9KB, FILLING)]
  order-events-1: [ProducerBatch(15KB, FULL)]
  order-events-2: [ProducerBatch(4KB, FILLING)]

A ProducerBatch holds records in V2 RecordBatch binary format (covered in Chapter 5). When a batch reaches batch.size (default 16KB) or has been waiting longer than linger.ms (default 0ms; 5ms from Kafka 4.0), it transitions to "ready" and waits for the Sender thread.

The BufferPool: The RecordAccumulator draws batch memory from a pool capped at buffer.memory bytes (default 32MB). When a new batch is needed, it claims one batch.size chunk from the pool, reusing a previously freed chunk when one is available. When a batch is sent and acknowledged, the chunk is returned. Recycling chunks this way reduces GC pressure from frequent ByteBuffer allocation, which is critical for consistent throughput at high rates.

If the pool is exhausted (producer faster than sender), send() blocks up to max.block.ms (default 60s) waiting for memory to be returned. Beyond that, BufferExhaustedException is thrown. This is an important backpressure signal: if you're hitting it, either your broker is slow, your linger.ms is too low, or your buffer.memory is too small.
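The backpressure behavior can be illustrated with a small bounded pool. This is a sketch under stated assumptions, not Kafka's BufferPool: the class SimpleBufferPool is hypothetical, it uses a Semaphore for accounting, and it returns null on timeout where the real producer throws an exception.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

final class SimpleBufferPool {
    private final int chunkBytes;
    private final Semaphore available; // one permit per chunk the pool may hand out
    private final ConcurrentLinkedDeque<ByteBuffer> free = new ConcurrentLinkedDeque<>();

    SimpleBufferPool(int totalBytes, int chunkBytes) {
        this.chunkBytes = chunkBytes;
        this.available = new Semaphore(totalBytes / chunkBytes);
    }

    /** Blocks up to maxBlockMs for a chunk; returns null if none was freed in time. */
    ByteBuffer allocate(long maxBlockMs) {
        try {
            if (!available.tryAcquire(maxBlockMs, TimeUnit.MILLISECONDS)) {
                return null; // pool exhausted: the caller is producing faster than the sender drains
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        ByteBuffer recycled = free.pollFirst();
        return recycled != null ? recycled : ByteBuffer.allocate(chunkBytes);
    }

    /** Returns a chunk to the pool so a blocked allocate() can proceed. */
    void release(ByteBuffer buffer) {
        buffer.clear();
        free.addFirst(buffer);   // make the buffer reusable before waking a waiter
        available.release();
    }
}
```

The timeout in allocate() plays the role of max.block.ms: a caller that keeps hitting it is being told that memory is not coming back from the send path quickly enough.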

The Sender Thread: Crossing from Application to Network

The Sender is a dedicated background daemon thread — arguably the most important architectural decision in the producer's design. It runs independently of the application thread, continuously draining the RecordAccumulator.

Sender's main loop (simplified):

while (running) {
    // 1. Find partitions with ready batches
    ReadyCheckResult ready = accumulator.ready(cluster, now);

    // 2. Establish connections to brokers that aren't yet connected
    for (Node node : ready.readyNodes) {
        client.maybeConnect(node);
    }

    // 3. Group batches by broker (each broker handles multiple partitions)
    Map<Integer, List<ProducerBatch>> batches = accumulator.drain(
        cluster, ready.readyNodes, maxRequestSize, now);

    // 4. Build and enqueue ProduceRequests
    for (Map.Entry<Integer, List<ProducerBatch>> entry : batches.entrySet()) {
        ClientRequest request = buildProduceRequest(entry.getKey(), entry.getValue());
        client.send(request, now);  // queues the request; bytes are written during poll()
    }

    // 5. Execute NIO I/O — sends queued requests, reads responses
    client.poll(pollTimeout, now);
}

The Sender uses Java NIO's Selector (backed by epoll on Linux, kqueue on macOS) to manage connections to all brokers from a single thread. This works because Kafka's workload is I/O-bound, not compute-bound at the network layer.

In-flight requests: max.in.flight.requests.per.connection (default 5) limits how many ProduceRequests can be outstanding per broker connection. Higher values pipeline more requests, improving throughput. The risk without idempotence: if request N fails and is retried after request N+1 has already succeeded, the retried batch lands behind the later one and the partition's order is broken. With enable.idempotence=true, the broker uses the producer ID and per-partition sequence numbers to reject out-of-sequence batches and discard duplicates, so ordering is preserved with up to 5 in-flight requests (the maximum idempotence allows) even with retries.
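The sequence-number check can be sketched as follows. This is a deliberately simplified model for a single partition: the class SequenceChecker and its Result enum are hypothetical, and the real broker keeps more state (for example, metadata about several recent batches per producer) than the single counter used here.

```java
import java.util.HashMap;
import java.util.Map;

final class SequenceChecker {
    enum Result { ACCEPTED, DUPLICATE, OUT_OF_ORDER }

    // Next expected sequence number per producer ID (one partition assumed).
    private final Map<Long, Integer> nextSequence = new HashMap<>();

    Result check(long producerId, int firstSequence, int recordCount) {
        int expected = nextSequence.getOrDefault(producerId, 0);
        if (firstSequence == expected) {
            // In-order batch: append it and advance the expected sequence.
            nextSequence.put(producerId, expected + recordCount);
            return Result.ACCEPTED;
        }
        if (firstSequence + recordCount <= expected) {
            // Entire batch was already appended: a retry of a batch that had succeeded.
            return Result.DUPLICATE;
        }
        // A gap: an earlier batch is missing, so this one must be retried later.
        return Result.OUT_OF_ORDER;
    }
}
```

A batch that arrives ahead of a failed predecessor is rejected rather than appended, so the producer's retries re-establish the original order.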

Typical latencies for each producer-side stage are listed in the End-to-End Latency Breakdown table later in this chapter.

Network Layer: TCP Connection Pool

Connection Lifecycle

Kafka's protocol runs directly over TCP — no HTTP, no Thrift, no gRPC overhead. The producer establishes one TCP connection per broker it needs to talk to, then multiplexes all produce traffic for that broker's partitions over that single connection. Connections are kept alive and reused; connections.max.idle.ms (default 9 minutes) closes idle connections.

The multiplexing works via CorrelationId: each request frame contains a 4-byte CorrelationId assigned by the sender, and the response frame echoes it back. This allows the Sender thread to match responses to requests even when multiple requests are in-flight on the same connection.
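A minimal sketch of the matching logic follows. It assumes responses on one connection arrive in the order the requests were sent, which is what the Java client expects, so the tracker only needs a FIFO queue and a sanity check on the echoed id. The class InFlightTracker is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class InFlightTracker {
    private final Deque<Integer> inFlight = new ArrayDeque<>();
    private int nextCorrelationId = 0;

    /** Assigns a correlation id to an outgoing request and remembers it. */
    int send() {
        int id = nextCorrelationId++;
        inFlight.addLast(id);
        return id;
    }

    /** Matches a response to the oldest outstanding request on this connection. */
    int complete(int responseCorrelationId) {
        Integer oldest = inFlight.peekFirst();
        if (oldest == null || oldest != responseCorrelationId) {
            throw new IllegalStateException("Correlation id " + responseCorrelationId
                + " does not match oldest in-flight request " + oldest);
        }
        inFlight.removeFirst();
        return oldest;
    }

    int outstanding() {
        return inFlight.size();
    }
}
```

A mismatch means the byte stream and the client's bookkeeping have diverged, so failing loudly is safer than guessing which request a response belongs to.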

ProduceRequest on the Wire

The Sender writes a ProduceRequest to the socket's send buffer (send.buffer.bytes, default 128KB). The OS TCP stack handles segmentation, retransmission, and congestion control. Kafka makes no assumptions about delivery timing; it relies on TCP's reliability guarantees and handles timeouts at the application layer (request.timeout.ms, default 30s).

Broker Side: From Bytes to Persistence

Network Layer: Acceptor, Processors, and I/O Threads

Kafka's broker network architecture is a textbook Reactor pattern implementation, split into three thread pools:

Acceptor Thread (1 per listener): Runs ServerSocketChannel.accept() in a loop, accepting new TCP connections and distributing them to Processor threads in round-robin fashion. This thread does nothing but accept — it never reads data.

Processor Threads (num.network.threads, default 3): Each Processor owns a Selector and a set of client connections. It reads bytes from sockets, assembles complete request frames (length-prefixed, so it knows when a full request has arrived), and places them on the RequestChannel.requestQueue (a bounded ArrayBlockingQueue). It also reads from per-Processor response queues and writes responses back to clients.

I/O Threads (num.io.threads, default 8): The workhorses. They dequeue requests from RequestChannel.requestQueue, execute the business logic (auth, validation, log writes, leader checks), and place responses on the appropriate Processor's response queue.

Client             Acceptor          Processor[0]           I/O Thread[3]
  |                   |                    |                      |
  |-- TCP SYN ------> |                    |                      |
  |                   |-- assign to P[0] ->|                      |
  |-- ProduceReq -----|-------------------->|                      |
  |                   |                    |-- requestQueue ------>|
  |                   |                    |                      |-- KafkaApis.handle()
  |                   |                    |                      |-- LogSegment.append()
  |                   |                    |<-- responseQueue ----|
  |<-- ProduceResp----|<-------------------|                      |

This separation is critical: I/O threads can block on disk writes or lock contention without starving the Processor threads from reading new requests. And Processor threads can be busy with network I/O without blocking the actual business logic.
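The length-prefixed framing that Processor threads rely on can be sketched with a small decoder. This is an illustration, not broker code: the class FrameDecoder is hypothetical, and it copies buffers on every read for clarity where a real implementation would avoid the copies. It assumes a 4-byte big-endian size prefix followed by that many payload bytes.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class FrameDecoder {
    // Bytes received so far that do not yet form a complete frame.
    private ByteBuffer pending = ByteBuffer.allocate(0);

    /** Feeds bytes as read from the socket; returns every frame completed by them. */
    List<byte[]> feed(byte[] chunk) {
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + chunk.length);
        merged.put(pending);
        merged.put(chunk);
        merged.flip();

        List<byte[]> frames = new ArrayList<>();
        while (merged.remaining() >= 4) {
            int size = merged.getInt(merged.position()); // peek at the size prefix
            if (size < 0) {
                throw new IllegalStateException("negative frame size: " + size);
            }
            if (merged.remaining() - 4 < size) {
                break; // frame incomplete: wait for more bytes
            }
            merged.getInt(); // consume the prefix
            byte[] payload = new byte[size];
            merged.get(payload);
            frames.add(payload);
        }
        pending = merged; // keep the unread tail for the next call
        return frames;
    }
}
```

Only complete frames leave the decoder, which is what lets the request queue hold whole requests regardless of how TCP happened to segment them.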

Request Handling: From Queue to Log

The I/O thread calls KafkaApis.handle(request), which dispatches based on the ApiKey. For a ProduceRequest (ApiKey=0, v9):

KafkaApis.handleProduceRequest()
  ├─ Parse TransactionalId, Acks, TimeoutMs, TopicData
  ├─ Auth check: does this client have Write permission on each topic?
  ├─ For each partition in the request:
  │   └─ ReplicaManager.appendRecords()
  │        ├─ Is this broker the leader for this partition?
  │        ├─ LogValidator.validateRecords()
  │        │   ├─ Magic byte version check
  │        │   ├─ CRC32C integrity check
  │        │   ├─ Idempotent sequence number check (if producer ID present)
  │        │   └─ Timestamp validation
  │        └─ UnifiedLog.append()
  │             └─ LogSegment.append()
  │                  └─ FileChannel.write(ByteBuffer) → OS Page Cache
  └─ If acks=0: respond immediately
     If acks=1: respond after leader append
     If acks=-1: register DelayedProduce, wait for ISR confirmation

LogSegment and the Page Cache Strategy

The LogSegment.append() call does a FileChannel.write() — this writes to the OS page cache, not directly to disk. The data is now in memory (durable against process crashes but not power loss). The OS will flush to disk asynchronously based on dirty page ratios and vm.dirty_expire_centisecs.

This is a deliberate tradeoff: Kafka trades fsync-per-write for throughput. A single fsync() on a spinning disk costs ~10ms (a seek + rotational wait). Batching thousands of writes before fsync amortizes this cost to near zero. For most Kafka deployments, the replication factor (3x) provides data durability — even if a broker loses power before fsync, the other two replicas have the data.

If you need stronger single-broker durability guarantees: log.flush.interval.messages=1 forces an fsync after every message. Expect throughput to drop sharply, potentially by one to two orders of magnitude depending on the storage device.

The Sparse Index: Each LogSegment has a .index file mapping relative offsets to byte positions in the .log file. The index is "sparse": it records only one entry per log.index.interval.bytes (default 4KB) of data written. To find offset N, Kafka binary-searches the index for the largest indexed offset ≤ N, then sequentially scans the .log file from that position. This works because a sequential scan over a few KB is fast (cache-warm), and the index stays small: each entry is 8 bytes (a 4-byte relative offset plus a 4-byte position), so a default 1GB segment with 4KB intervals needs about 262K entries, roughly 2MB of index.
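The lookup described above is a "floor" binary search. The sketch below keeps the index in two parallel arrays for simplicity (the real index is a memory-mapped file); the class SparseIndex and the helper indexBytes are hypothetical names, and the 8-byte entry size is the 4-byte relative offset plus 4-byte position pair.

```java
final class SparseIndex {
    private final int[] relativeOffsets; // ascending, one entry per indexed record
    private final int[] positions;       // byte position in the .log file for each entry

    SparseIndex(int[] relativeOffsets, int[] positions) {
        this.relativeOffsets = relativeOffsets;
        this.positions = positions;
    }

    /** Byte position of the largest indexed offset <= target, or 0 if none exists. */
    int lookup(int targetRelativeOffset) {
        int lo = 0;
        int hi = relativeOffsets.length - 1;
        int best = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (relativeOffsets[mid] <= targetRelativeOffset) {
                best = mid;      // candidate; look for a later one
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return best < 0 ? 0 : positions[best]; // scan the log forward from here
    }

    /** Approximate index size: one 8-byte entry per interval of log data. */
    static long indexBytes(long segmentBytes, int intervalBytes) {
        return (segmentBytes / intervalBytes) * 8L;
    }
}
```

After lookup() returns, the reader scans forward in the .log file from that position until it reaches the requested offset, which is at most about one index interval of data away.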

Replication: From Leader to Followers

The Follower Fetch Loop

Kafka replication is pull-based: Followers continuously issue Fetch requests to their partition Leader, using the same Fetch protocol path as consumers. This design choice is elegant — the broker's FetchRequest handling code is shared between consumers and replica fetchers.

Each Follower runs a ReplicaFetcherThread per Leader it fetches from:

// ReplicaFetcherThread main loop (simplified)
while (true) {
    Map<TopicPartition, FetchRequest.PartitionData> partitionData = new HashMap<>();
    for (TopicPartition tp : assignedPartitions) {
        long fetchOffset = replicaLog(tp).logEndOffset();  // fetch from where we left off
        partitionData.put(tp, new FetchRequest.PartitionData(fetchOffset, ...));
    }

    FetchResponse response = leader.fetch(buildFetchRequest(partitionData));

    for (Map.Entry<TopicPartition, FetchResponseData.PartitionData> entry
            : response.responseData().entrySet()) {
        TopicPartition tp = entry.getKey();
        Records records = entry.getValue().records();
        replicaLog(tp).appendAsFollower(records);  // append to local log
        updateFetchOffset(tp, records.nextOffset());
    }
    // No separate report is needed: the fetchOffset in the next Fetch request
    // tells the Leader this Follower's LEO (used for HW calculation)
}

ISR and High Watermark

ISR (In-Sync Replica Set): The set of replicas considered "in sync" with the Leader. A Follower stays in the ISR as long as it has caught up to the Leader's log end offset at some point within the last replica.lag.time.max.ms (default 30s). If it falls behind for longer than that (because of GC pauses, disk slowness, or network problems), the Leader proposes removing it from the ISR, and the change is recorded in the KRaft metadata log.

High Watermark (HW): The minimum log end offset across all ISR replicas, that is, the offset just past the last record every ISR member has written. Consumers can only read messages at offsets < HW. Messages between HW and the Leader's Log End Offset (LEO) are written but not yet committed: they're on the Leader and possibly some Followers, but not yet confirmed by all ISR members, so they are not consumer-visible.

Scenario: ISR = {Broker-1(Leader), Broker-2, Broker-3}

Broker-1 (Leader):  LEO = 100 (has written offsets 0-99)
Broker-2 (Follower): LEO = 98 (has offsets 0-97, last fetch reported to Leader)
Broker-3 (Follower): LEO = 100 (has offsets 0-99, last fetch reported to Leader)

HW = min(ISR LEOs) = min(100, 98, 100) = 98
Consumers can read offsets 0-97 only.

After Broker-2 fetches offsets 98-99 and reports LEO=100:
HW = min(100, 100, 100) = 100
Consumers can now read offsets 0-99.

The Leader advances HW when it processes a Fetch request from a Follower: it compares all known Follower LEOs against its own LEO and updates HW to the minimum across all ISR members. This is why HW advancement is bounded by the slowest ISR member.
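The rule reduces to a minimum over the ISR. A minimal sketch, using broker ids as map keys; the class HighWatermark is hypothetical, and it assumes the ISR is never empty (it always contains the Leader) and that an LEO is known for every ISR member.

```java
import java.util.Map;
import java.util.Set;

final class HighWatermark {
    /** HW is the smallest log end offset among the replicas currently in the ISR. */
    static long compute(Map<Integer, Long> logEndOffsets, Set<Integer> isr) {
        long hw = Long.MAX_VALUE;
        for (int brokerId : isr) {
            hw = Math.min(hw, logEndOffsets.get(brokerId));
        }
        return hw;
    }
}
```

With the LEOs from the scenario (100, 98, 100) this yields 98. Note that a lagging replica only holds HW back while it is in the ISR: once the Leader drops it, the minimum is taken over the remaining members.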

acks and the Latency Trade-off

The acks producer configuration is fundamentally a durability-vs-latency dial:

acks=0: Fire and forget. The broker sends no ProduceResponse; the Sender treats the request as complete once it has been written to the socket, and any callback sees offset -1. Lowest latency (network one-way), highest throughput. Records can be lost silently on a broker crash or network failure. Use only for metrics/logs where loss is acceptable.

acks=1: Wait for the Leader's page cache write. Typical latency 2–5ms same-datacenter. If the Leader crashes before a Follower has replicated the records, they are lost even though the producer received an ACK. This is a subtle but important failure mode.

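acks=-1 (all): Wait for HW to advance past the produced records, meaning all ISR members have confirmed the write. Latency 5–20ms (dominated by the slowest ISR Follower's fetch round-trip). Acknowledged records survive as long as at least one ISR member survives and unclean leader election stays disabled; min.insync.replicas sets how many copies that guarantee rests on.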

min.insync.replicas: Works in concert with acks=-1. If the ISR shrinks below this value (default 1), the Leader refuses produce requests with NOT_ENOUGH_REPLICAS error. Setting min.insync.replicas=2 for a 3-replica topic means: even if one Follower is down, you're still safe; if two are down, production stops rather than risking data loss.
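The interaction between acks and min.insync.replicas can be written as a small decision function. This is a sketch of the rules described above, not the broker's code; the class ProduceAdmission and its Outcome names are hypothetical.

```java
final class ProduceAdmission {
    enum Outcome { NO_RESPONSE, RESPOND_AFTER_LEADER_APPEND, WAIT_FOR_ISR, NOT_ENOUGH_REPLICAS }

    /** Decides how the Leader treats a produce request, given the current ISR size. */
    static Outcome decide(int acks, int isrSize, int minInSyncReplicas) {
        switch (acks) {
            case 0:
                return Outcome.NO_RESPONSE;
            case 1:
                return Outcome.RESPOND_AFTER_LEADER_APPEND;
            case -1:
                // min.insync.replicas is only enforced for acks=-1 (all).
                return isrSize < minInSyncReplicas
                    ? Outcome.NOT_ENOUGH_REPLICAS
                    : Outcome.WAIT_FOR_ISR;
            default:
                throw new IllegalArgumentException("acks must be 0, 1, or -1: " + acks);
        }
    }
}
```

For a 3-replica topic with min.insync.replicas=2, decide(-1, 2, 2) still accepts writes with one Follower down, while decide(-1, 1, 2) rejects them.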

Consumer Side: From Bytes to Application

The Fetch Request and CompletedFetch

The consumer's poll() runs in the application thread. Internally it manages a Fetcher (in Kafka 3.x, refactored into FetchCollector + background fetch machinery) that maintains the state of in-progress fetch requests.

// Simplified KafkaConsumer.poll() internals
public ConsumerRecords<K, V> poll(Duration timeout) {
    long deadlineMs = System.currentTimeMillis() + timeout.toMillis();

    do {
        // 1. Check if we need to rejoin a consumer group (rebalance protocol)
        coordinator.poll(timeout);

        // 2. Send fetch requests to partition leaders for assigned partitions
        //    that don't already have in-flight requests
        fetcher.sendFetches();

        // 3. Execute NIO — send outgoing requests, receive incoming responses
        //    Blocks up to remaining timeout if no fetch results ready
        client.poll(remainingTime(deadlineMs));

        // 4. Parse any completed FetchResponses into ConsumerRecords
        Map<TopicPartition, List<ConsumerRecord<K,V>>> records =
            fetcher.fetchedRecords();

        if (!records.isEmpty()) {
            return new ConsumerRecords<>(records);
        }
    } while (System.currentTimeMillis() < deadlineMs);

    return ConsumerRecords.empty();
}

FetchRequest parameters that matter:

fetch.min.bytes (default 1): The broker won't respond until at least this many bytes are available. Set to 1MB for batch-processing consumers to reduce empty polls. Leave at 1 for latency-sensitive consumers.

fetch.max.wait.ms (default 500ms): Maximum time the broker waits for fetch.min.bytes to be satisfied. The consumer will receive an empty response after this time even if no data arrived.

max.partition.fetch.bytes (default 1MB): Maximum data returned per partition per fetch. If a single record is larger than this, it still comes through (you can't drop it), but subsequent records wait for the next fetch.
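How fetch.min.bytes and fetch.max.wait.ms interact can be sketched as two small functions: one for the broker's respond-or-wait decision, one for the worst-case wait this adds at a given inbound data rate. The class FetchWait and its method names are hypothetical, and the model ignores per-partition limits.

```java
final class FetchWait {
    /** The broker answers a parked fetch once enough bytes exist or the wait has expired. */
    static boolean shouldRespond(long bytesAvailable, int fetchMinBytes,
                                 long waitedMs, int fetchMaxWaitMs) {
        return bytesAvailable >= fetchMinBytes || waitedMs >= fetchMaxWaitMs;
    }

    /** Upper bound on how long a fetch waits at a steady inbound rate. */
    static long worstCaseWaitMs(long bytesPerSecond, int fetchMinBytes, int fetchMaxWaitMs) {
        if (bytesPerSecond <= 0) {
            return fetchMaxWaitMs; // idle partition: only the timer ends the wait
        }
        // Ceiling of (fetchMinBytes / bytesPerSecond) expressed in milliseconds.
        long msToFill = (fetchMinBytes * 1000L + bytesPerSecond - 1) / bytesPerSecond;
        return Math.min(msToFill, fetchMaxWaitMs);
    }
}
```

At 10MB/s a 1MB fetch.min.bytes fills in about 105ms, so the 500ms timer never fires; at 100KB/s the same setting waits the full 500ms on every fetch.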

Deserialization and Consumer Interceptors

When fetchedRecords() is called, it processes CompletedFetch objects — the buffered raw bytes from broker responses. For each record:

  1. Deserialization: Value bytes → typed object via Deserializer<V>. Key bytes → Deserializer<K>.
  2. Consumer Interceptor: ConsumerInterceptor.onConsume() is called, allowing record mutation or monitoring.
  3. Return to application: ConsumerRecord<K,V> with offset, timestamp, headers, partition, and deserialized key/value.

// Full consumer loop with manual offset commit
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, OrderDeserializer.class);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 100);

KafkaConsumer<String, Order> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("order-events"));

try {
    while (true) {
        ConsumerRecords<String, Order> records = consumer.poll(Duration.ofMillis(200));
        for (ConsumerRecord<String, Order> record : records) {
            System.out.printf("offset=%d partition=%d key=%s order=%s%n",
                record.offset(), record.partition(), record.key(), record.value());
            processOrder(record.value());
        }
        // Commit after processing the whole batch
        consumer.commitSync();
    }
} finally {
    consumer.close();
}

End-to-End Latency Breakdown

Stage                                   Typical Latency   Dominant Factor
Interceptor chain                       0.01–0.1ms        CPU (in calling thread)
Serialization                           0.01–0.5ms        CPU + Schema Registry cache
Partitioning                            < 0.01ms          CPU
RecordAccumulator wait (linger.ms=5)    0–5ms             Configured wait
Sender: NIO write to socket             0.01–0.1ms        CPU
TCP transit (same datacenter)           0.3–1ms           Network RTT
Broker: Processor read + queue          0.01–0.5ms        Thread scheduling
Broker: Auth + validation               0.05–0.2ms        CPU
Broker: LogSegment.append()             0.1–0.5ms         Page cache write
Follower fetch + append (acks=-1)       2–10ms            Network RTT + follower disk
Broker: ProduceResponse to client       0.1–0.5ms         Network
Consumer: poll() fetch latency          0–500ms           fetch.max.wait.ms + data availability
Consumer: deserialization               0.01–0.2ms        CPU
p99 end-to-end (acks=-1, linger.ms=5)   ~15–35ms          ISR replication
p99 end-to-end (acks=1, linger.ms=0)    ~3–8ms            Network RTT
p99 end-to-end (acks=0, linger.ms=0)    ~1–3ms            Network one-way

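The consumer fetch latency deserves special attention. With fetch.min.bytes=1 and fetch.max.wait.ms=500, a fetch against an idle partition is parked on the broker for up to 500ms, but it completes as soon as any new data becomes visible, so the wait is not added to message delivery latency. The wait turns into real latency once fetch.min.bytes is raised: a slow trickle of records can then sit on the broker until enough bytes accumulate or fetch.max.wait.ms expires. If you're producing infrequently and consuming with a tight latency requirement, keep fetch.min.bytes at 1, or pair a larger value with a small fetch.max.wait.ms (10–50ms) and accept more empty fetch responses.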

Complete End-to-End Flow Diagram

Application Thread          Sender Thread        Broker I/O Thread[N]    Follower ReplicaFetcher
─────────────────────────────────────────────────────────────────────────────────────────────
producer.send(record)
  │
  ├─ Interceptors
  ├─ Serializer (key, value)
  ├─ Partitioner → Partition 2
  └─ RecordAccumulator.append()
         │ batch full or linger timeout
         ↓
                     Sender wakes
                     accumulator.drain() → ProduceRequest
                     KafkaClient.send() → socket buffer
                                               │
                                               Processor reads bytes
                                               assembles full request
                                               → requestQueue
                                                        │
                                                        I/O Thread dequeues
                                                         KafkaApis.handleProduceRequest()
                                                        ReplicaManager.appendRecords()
                                                        LogSegment.append() → PageCache
                                                                                   │
                                                                    Follower fetches new data
                                                                    LogSegment.append() → PageCache
                                                                    reports LEO to Leader
                                                                                   │
                                                        HW advances (if acks=-1)
                                                        → responseQueue
                                               │
                                               Processor writes response
                     KafkaClient.poll() reads response
                     future.complete(RecordMetadata)
                     Callback invoked (on the Sender thread)
─────────────────────────────────────────────────────────────────────────────────────────────
Consumer Application Thread
  │
  consumer.poll(200ms)
  ├─ coordinator.poll() (heartbeat)
  ├─ fetcher.sendFetches() → FetchRequest to Broker-1
  │                                  │
  │                          Processor reads FetchRequest
  │                          I/O Thread: LogSegment.read(offset, maxBytes)
  │                          (only returns data up to HW)
  │                          → FetchResponse with records
  │                                  │
  ├─ client.poll() receives FetchResponse
  ├─ CompletedFetch parsed
  ├─ Deserializer applied
  ├─ ConsumerInterceptor.onConsume()
  └─ Returns ConsumerRecords<String, Order>

Every chapter that follows is a deep dive into one segment of this path. Chapter 3 covers KRaft — the consensus protocol that manages the metadata (partition leaders, ISR membership, topic configs) that every step in this flow depends on. Chapter 4 dissects the binary encoding of the ProduceRequest and FetchRequest frames crossing the wire. Chapters 5 through 10 complete the picture.
