Chapter 14

Log Storage Engine: Why Kafka Is So Fast

Kafka's extreme throughput does not come from any single technology. It is the compound effect of several mutually reinforcing design decisions: sequential disk writes, OS page cache exploitation, zero-copy network transfer, and memory-mapped index access. Understanding these mechanisms is the prerequisite for optimizing Kafka deployments and diagnosing performance problems.

LogSegment File Structure: What's Inside a Partition Directory

Each Kafka partition corresponds to a directory on disk. Inside that directory, files are grouped into LogSegments. Each segment is a set of related files that share a common base offset:

# Inspect a real partition directory
ls -lh /data/kafka/orders-0/

# Typical output for a busy production partition:
-rw-r--r-- 1 kafka kafka 1.0G Apr 26 00:00 00000000000000000000.log
-rw-r--r-- 1 kafka kafka  10M Apr 26 00:00 00000000000000000000.index
-rw-r--r-- 1 kafka kafka  10M Apr 26 00:00 00000000000000000000.timeindex
-rw-r--r-- 1 kafka kafka    0 Apr 26 00:00 00000000000000000000.txnindex
-rw-r--r-- 1 kafka kafka 1.0G Apr 26 01:00 00000000000010485760.log
-rw-r--r-- 1 kafka kafka  10M Apr 26 01:00 00000000000010485760.index
-rw-r--r-- 1 kafka kafka  10M Apr 26 01:00 00000000000010485760.timeindex
-rw-r--r-- 1 kafka kafka    0 Apr 26 01:00 00000000000010485760.txnindex
-rw-r--r-- 1 kafka kafka 542M Apr 26 10:33 00000000000020971520.log   ← active segment
-rw-r--r-- 1 kafka kafka   5M Apr 26 10:33 00000000000020971520.index
-rw-r--r-- 1 kafka kafka   5M Apr 26 10:33 00000000000020971520.timeindex
-rw-r--r-- 1 kafka kafka    0 Apr 26 10:33 00000000000020971520.txnindex
-rw-r--r-- 1 kafka kafka   0  Apr 26 10:33 leader-epoch-checkpoint
-rw-r--r-- 1 kafka kafka  60  Apr 26 10:33 partition.metadata

The 20-digit number in each filename is the base offset of the first message in that segment.
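The naming rule can be sketched in a few lines of Java. This is a minimal illustration of the convention visible in the listing above; the class and method names are hypothetical and are not Kafka's own helpers.

```java
final class SegmentFileNames {
    /** Formats a base offset as the zero-padded, 20-digit name of a .log file. */
    static String logFileName(long baseOffset) {
        return String.format("%020d.log", baseOffset);
    }

    /** Recovers the base offset from any segment file name, e.g. "00000000000010485760.index". */
    static long baseOffsetOf(String fileName) {
        return Long.parseLong(fileName.substring(0, fileName.indexOf('.')));
    }

    public static void main(String[] args) {
        System.out.println(logFileName(10_485_760L));
        System.out.println(baseOffsetOf("00000000000020971520.index"));
    }
}
```

Because the names are zero-padded to a fixed width, a plain lexicographic sort of the directory also orders the segments by offset.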

The .log File: Raw Message Data

The .log file is an append-only record store. Messages are stored in RecordBatch format (Magic V2, introduced in Kafka 0.11):

RecordBatch layout:
├── baseOffset           (8 bytes)  offset of the first record in this batch
├── batchLength          (4 bytes)  byte length of the batch, counted from the next field onward
├── partitionLeaderEpoch (4 bytes)  leader generation (for consistency verification)
├── magic                (1 byte)   format version, currently 2
├── crc                  (4 bytes)  CRC32C checksum of everything after this field
├── attributes           (2 bytes)  compression type, timestamp type, transactional flag
├── lastOffsetDelta      (4 bytes)  (last record offset) - baseOffset
├── baseTimestamp        (8 bytes)  timestamp of the first record
├── maxTimestamp         (8 bytes)  maximum timestamp in this batch
├── producerId           (8 bytes)  for idempotent/transactional producers
├── producerEpoch        (2 bytes)  used to fence zombie writers
├── baseSequence         (4 bytes)  starting sequence number for idempotency
├── recordsCount         (4 bytes)  number of records in this batch
└── Records[]:
    ├── length           (varint)
    ├── attributes       (int8)
    ├── timestampDelta   (varlong)  delta from baseTimestamp (saves space)
    ├── offsetDelta      (varint)   delta from baseOffset (saves space)
    ├── keyLength        (varint)
    ├── key              (bytes)
    ├── valueLength      (varint)
    ├── value            (bytes)
    └── headers[]                   optional key-value metadata

Using variable-length integer encoding (varint) for per-record timestamp deltas and offset deltas means records within a batch are extremely compact: the deltas typically take just 1–2 bytes each.
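To make the size claim concrete, here is a minimal sketch of ZigZag varint encoding, the scheme Kafka's record format uses for these delta fields. The class and method names are hypothetical; this is an illustration, not Kafka's own utility code.

```java
import java.io.ByteArrayOutputStream;

final class VarintSketch {
    /** ZigZag-maps a signed value so that small magnitudes become small unsigned values. */
    static long zigZag(long value) {
        return (value << 1) ^ (value >> 63);
    }

    /** Encodes a signed long: 7 payload bits per byte, high bit set when more bytes follow. */
    static byte[] encode(long value) {
        long v = zigZag(value);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((v & ~0x7FL) != 0) {
            out.write((int) ((v & 0x7F) | 0x80));
            v >>>= 7;
        }
        out.write((int) v);
        return out.toByteArray();
    }

    /** Decodes a value produced by encode(). */
    static long decode(byte[] bytes) {
        long v = 0;
        int shift = 0;
        for (byte b : bytes) {
            v |= (long) (b & 0x7F) << shift;
            shift += 7;
        }
        return (v >>> 1) ^ -(v & 1);
    }

    public static void main(String[] args) {
        for (long delta : new long[] {5, 63, 64, -1, 1_000_000}) {
            System.out.println(delta + " -> " + encode(delta).length + " byte(s)");
        }
    }
}
```

With this scheme a delta of up to 63 fits in one byte and a delta of up to 8,191 fits in two, which is why the per-record overhead stays so small compared with fixed 8-byte offsets and timestamps.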

The .index File: Sparse Offset Index

The .index file is a sparse index: not every record gets an entry. Instead, roughly one index entry is written for every log.index.interval.bytes (default: 4096 bytes) of log data appended.

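Each index entry is exactly 8 bytes: a 4-byte offset relative to the segment's base offset, followed by a 4-byte physical position in the .log file.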

Index entries for segment with base offset 10,485,760:

Relative Offset    Physical Position
0                  0
157                4,096
314                8,192
471                12,288
628                16,384
...

Offset lookup procedure:

  1. Binary search across all LogSegment base offsets → identify which segment contains the target offset
  2. Binary search within that segment's .index file → find the nearest index entry (floor entry for target offset)
  3. Sequential scan of the .log file starting from the index entry's physical position → locate the exact record

The worst case for step 3 is scanning roughly log.index.interval.bytes (4 KB by default) of log data, or one batch if batches are larger than that. For a sequential read this is negligible.
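The first two steps of the lookup procedure can be sketched as follows. This is an in-memory illustration under simplifying assumptions: the class name, the TreeMap of segments, and the int[][] index representation are all hypothetical stand-ins, not Kafka's actual data structures.

```java
import java.util.Map;
import java.util.TreeMap;

final class OffsetLookupSketch {
    // base offset -> sparse index entries {relativeOffset, filePosition}, sorted by relativeOffset
    private final TreeMap<Long, int[][]> segments = new TreeMap<>();

    void addSegment(long baseOffset, int[][] indexEntries) {
        segments.put(baseOffset, indexEntries);
    }

    /**
     * Returns {segmentBaseOffset, filePosition}: the segment to open and the byte
     * position from which the sequential scan of its .log file should start.
     * Returns null if the target offset precedes the first segment.
     */
    long[] locate(long targetOffset) {
        // Step 1: find the segment with the largest base offset <= target.
        Map.Entry<Long, int[][]> segment = segments.floorEntry(targetOffset);
        if (segment == null) {
            return null;
        }
        long baseOffset = segment.getKey();
        int[][] index = segment.getValue();

        // Step 2: binary search for the largest relative offset <= target.
        long relativeTarget = targetOffset - baseOffset;
        int lo = 0;
        int hi = index.length - 1;
        int floor = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (index[mid][0] <= relativeTarget) {
                floor = mid;
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }

        // Step 3 would start here; with no usable entry, scan from the start of the file.
        long position = floor >= 0 ? index[floor][1] : 0L;
        return new long[] {baseOffset, position};
    }
}
```

Using the example entries above, a lookup for offset 10,486,160 (relative offset 400) lands on the entry for relative offset 314, so the scan would begin at byte 8,192 of that segment's .log file.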

The .timeindex File: Timestamp-Based Index

The .timeindex file has the same structure as .index but indexed by timestamp rather than offset:

Timeindex entry (12 bytes):
├── timestamp       (8 bytes)  record's timestamp
└── relative offset (4 bytes)  corresponding offset in the segment

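This index supports timestamp-based operations, such as resolving a timestamp to an offset for the consumer's offsetsForTimes() API or resetting a consumer group's offsets to a point in time.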

The .txnindex File: Transaction Abort Index

The .txnindex file records the offset ranges of aborted transactions. When a consumer fetches with isolation.level=read_committed, the broker consults this index and returns the list of aborted transactions with the fetch response, so the consumer knows which message ranges to skip.

If transactions are not used, this file remains empty (0 bytes), as shown in the directory listing above.

# Inspect index file contents
# (adding --index-sanity-check would only validate the index instead of printing its entries)
kafka-dump-log.sh \
  --files /data/kafka/orders-0/00000000000000000000.index

# Output:
# Dumping /data/kafka/orders-0/00000000000000000000.index
# offset: 0 position: 0
# offset: 157 position: 4096
# offset: 314 position: 8192
# ...

# Decode actual message contents
kafka-dump-log.sh \
  --files /data/kafka/orders-0/00000000000000000000.log \
  --print-data-log | head -30

Why Sequential Writes Are Fast: Physics of Storage

HDD Physics

A traditional spinning hard disk's performance is governed by three mechanical factors:

  1. Seek time: moving the read/write head to the target track, typically 3–15ms
  2. Rotational latency: waiting for the target sector to rotate under the head; at 7,200 RPM, average latency is ~4ms
  3. Transfer time: the actual data movement once the head is positioned

Random I/O: Each access requires seek + rotational latency = ~7–20ms overhead per operation. This yields roughly 50–140 IOPS, translating to ~0.2–0.6 MB/s for random 4 KB block writes.

Sequential I/O: No head movement; data occupies adjacent sectors. Throughput reaches 100–200 MB/s. That is a difference of several hundred times, approaching 1,000× at the extremes.

This massive gap is why append-only log design was not merely an optimization for Kafka: on HDDs, it was a requirement for achieving any meaningful throughput at all.
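The arithmetic behind these figures is easy to reproduce. The sketch below is a back-of-the-envelope model only: it assumes every random operation pays the full seek plus average rotational latency, and it ignores transfer time and drive caching. The class and method names are hypothetical.

```java
final class DiskMath {
    /** Average rotational latency in ms: on average the platter turns half a revolution. */
    static double avgRotationalLatencyMs(int rpm) {
        double msPerRevolution = 60_000.0 / rpm;
        return msPerRevolution / 2.0;
    }

    /** Random operations per second when every operation pays seek + rotational latency. */
    static double randomIops(double seekMs, double rotationalMs) {
        return 1000.0 / (seekMs + rotationalMs);
    }

    /** Throughput in MB/s (1 MB = 1,000,000 bytes) for a given IOPS rate and block size. */
    static double throughputMBps(double iops, int blockBytes) {
        return iops * blockBytes / 1_000_000.0;
    }

    public static void main(String[] args) {
        double rotationalMs = avgRotationalLatencyMs(7200);
        for (double seekMs : new double[] {3.0, 15.0}) {
            double iops = randomIops(seekMs, rotationalMs);
            System.out.printf("seek %.0f ms: %.0f IOPS, %.2f MB/s for random 4 KB writes%n",
                    seekMs, iops, throughputMBps(iops, 4096));
        }
    }
}
```

Under these assumptions a 7,200 RPM drive manages somewhere between about 50 and 140 random operations per second, which is well under 1 MB/s at a 4 KB block size.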

SSD Characteristics

SSDs eliminate mechanical motion, dramatically reducing random I/O latency (SATA SSD: ~0.1ms, NVMe SSD: ~0.02ms). However, sequential access still holds significant advantages:

Write amplification: SSDs must erase in large blocks (typically 512KB to 4MB). Many small random writes trigger frequent block erasures. Each erase cycle wears down the block's write endurance and can stall I/O.

Parallelism: SSDs contain multiple NAND channels that can be written in parallel. Sequential writes can sustain this parallelism; scattered random writes cannot.

Practical gap: Even on NVMe SSDs, sequential write throughput typically exceeds random write throughput by 2–5×, and sequential writes produce far less write amplification (longer SSD lifespan).

Kafka's append-only log guarantees sequential writes regardless of storage medium. On HDDs this is existential; on SSDs it extends device lifespan and maximizes channel parallelism.

Page Cache: Why Kafka Doesn't Manage Its Own Memory Cache

How the OS Page Cache Works

The Linux page cache is a kernel-managed buffer that sits between file system calls and physical storage:

Write path:
Application write() → copied into page cache → returns immediately to application
                           ↓ (asynchronously, in background)
                    kernel flusher threads (kworker; pdflush on kernels before 2.6.32) write dirty pages to disk

Read path:
Application read() → check page cache → HIT:  return data directly from RAM (no disk I/O!)
                                        MISS: read from disk into page cache, then return

For a message broker like Kafka, the write-then-read pattern strongly favors the page cache: producers write messages, and consumers read them shortly after. If consumers are keeping up with producers (the common case), the messages they read are still in the page cache from the producer's write. Consumption becomes a memory-to-network transfer rather than a disk-to-network transfer.

Why Not a JVM Heap Cache?

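Alternative: JVM heap-based cache (the approach Kafka rejected). Keeping messages as Java objects carries high memory overhead, often doubling the size of the data. Garbage collection grows slower and less predictable as the heap grows, the cache is lost whenever the broker process restarts, and the same data usually ends up in the page cache anyway, so it is buffered twice.

Actual approach: rely on the OS page cache. All free memory is used for caching automatically, data is held as compact bytes rather than objects, nothing adds to GC pressure, and the cache stays warm across a broker restart. Cache residency can be checked directly: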

# Check how much of the active log segment is in page cache
# Install pcstat: go install github.com/tobert/pcstat@latest
pcstat /data/kafka/orders-0/00000000000020971520.log

# Output:
# +---------------------------------------------------+----------------+--------+--------+---------+
# | Name                                              | Size (bytes)   | Pages  | Cached | Percent |
# |---------------------------------------------------+----------------+--------+--------+---------|
# | /data/kafka/orders-0/00000000000020971520.log     | 567,705,600    | 138600 | 138542 | 99.958% |
# +---------------------------------------------------+----------------+--------+--------+---------+
#
# → ~100% of the active segment is in page cache
# → Consumers are effectively reading from RAM

# Check total page cache usage on the broker
free -h
# Shows used/available memory; on current procps versions the "buff/cache" column includes the page cache

This is why Kafka's official guidance recommends limiting JVM heap to 4–8 GB and letting the operating system use the remainder as page cache. On a 64GB broker, -Xmx6g leaves approximately 56GB available for caching active log data.

Zero-Copy: The sendfile() Optimization

Traditional Data Transfer Path (Without Zero-Copy)

When a consumer fetches data from a broker using conventional read/write system calls:

1. Broker calls read() system call
   → OS reads data from disk into page cache (if not already there)
   → OS copies data from page cache into Kafka's JVM heap buffer (user space)
   [CPU copy #1: kernel space → user space]

2. Kafka's Java code inspects/processes data (if needed)

3. Broker calls write() system call
   → OS copies data from JVM heap buffer into kernel socket send buffer
   [CPU copy #2: user space → kernel space]

4. OS uses DMA to transfer data from socket buffer to NIC
   [hardware DMA, no CPU involvement]

Total: 2 CPU memory copies + 4 user/kernel mode switches

sendfile(): Data Never Enters User Space

The sendfile() Linux system call was designed specifically for file-to-socket transfers:

Broker calls sendfile(socket_fd, file_fd, offset, length)
   ↓
Kernel checks page cache for file data (reads from disk if needed)
   ↓
Kernel directly DMA-transfers data from page cache to NIC send buffer
(entire operation stays in kernel space)

Total: 0 CPU memory copies (with scatter-gather DMA capable NIC)
       2 user/kernel mode switches (just the sendfile() call itself)

Visual comparison:

Traditional:
Disk → [DMA] → Page Cache → [CPU copy] → JVM Heap → [CPU copy] → Socket Buffer → [DMA] → NIC

Zero-copy (sendfile):
Disk → [DMA] → Page Cache ─────────────────────────[DMA]──────────────────────> NIC

Kafka implements zero-copy in Java via FileChannel.transferTo(), which maps to sendfile() on Linux:

// Conceptual representation of how Kafka sends log data to consumers
// (in the Kafka codebase the call sits behind FileRecords.writeTo() and the
//  plaintext transport layer rather than in LogSegment itself)
public long transferTo(FileChannel sourceFile, long position, long count,
                       WritableByteChannel destSocket) throws IOException {
    // Java NIO FileChannel.transferTo() → JVM → Linux sendfile()
    // Data flows: page cache → NIC, bypassing JVM heap entirely
    return sourceFile.transferTo(position, count, destSocket);
}
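One practical detail the conceptual snippet hides is that transferTo() may move fewer bytes than requested, so callers loop. The following runnable sketch shows that pattern. It is an illustration only: the class and method names are hypothetical, and a second temp file stands in for the consumer's socket, so whether the JVM actually uses sendfile() here depends on the platform.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class ZeroCopySketch {
    /** Sends count bytes of source, starting at position, to dest without a user-space buffer. */
    static long sendRegion(FileChannel source, long position, long count,
                           WritableByteChannel dest) throws IOException {
        long sent = 0;
        while (sent < count) {
            long n = source.transferTo(position + sent, count - sent, dest);
            if (n <= 0) {
                // Nothing moved (e.g. past end of file). A non-blocking server would
                // instead wait until the socket is writable again and retry.
                break;
            }
            sent += n;
        }
        return sent;
    }

    /** Copies the middle "batch" of a small temp file into a stand-in destination file. */
    static String demo() {
        try {
            Path log = Files.createTempFile("segment", ".log");
            Path out = Files.createTempFile("socket-stand-in", ".bin");
            try {
                Files.write(log, "batch-1|batch-2|batch-3".getBytes(StandardCharsets.US_ASCII));
                try (FileChannel src = FileChannel.open(log, StandardOpenOption.READ);
                     FileChannel dst = FileChannel.open(out, StandardOpenOption.WRITE)) {
                    sendRegion(src, 8, 7, dst); // bytes 8..14 hold "batch-2"
                }
                return new String(Files.readAllBytes(out), StandardCharsets.US_ASCII);
            } finally {
                Files.deleteIfExists(log);
                Files.deleteIfExists(out);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The caller only supplies a file position and a byte count, which is exactly what the offset index yields. No message bytes ever pass through a Java byte array.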

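When zero-copy doesn't apply: Zero-copy works only if the broker can send bytes exactly as they are stored. If the data must pass through user space first, the optimization is lost. Two common cases are TLS-encrypted listeners (the broker has to encrypt the data itself) and message format down-conversion for older consumers. Compression mismatches cost CPU on the produce path instead: when the topic's compression.type differs from the producer's codec, the broker decompresses and recompresses each batch before appending it. This is why it's best to keep compression end-to-end with consistent codecs (producer compresses, broker stores as-is, consumer decompresses), so the broker never has to touch the payload.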

mmap: Memory-Mapped Access for Index Files

Kafka accesses its index files (.index and .timeindex) via memory-mapped I/O (mmap):

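// Conceptual index file access pattern (modeled on the behavior of Kafka's OffsetIndex)
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class MappedOffsetIndex {
    private static final int ENTRY_SIZE = 8;  // 4-byte relative offset + 4-byte file position
    private final MappedByteBuffer mmap;
    private final long baseOffset;
    private final int entries;                // count of valid entries, tracked by the caller

    MappedOffsetIndex(Path indexFile, long baseOffset, int entries, int maxIndexSize)
            throws IOException {
        this.mmap = map(indexFile, maxIndexSize);
        this.baseOffset = baseOffset;
        this.entries = entries;
    }

    private static MappedByteBuffer map(Path indexFile, int maxIndexSize) throws IOException {
        // The mapping remains valid after the channel is closed
        try (FileChannel channel = FileChannel.open(indexFile,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            return channel.map(FileChannel.MapMode.READ_WRITE, 0, maxIndexSize);
        }
    }

    // Binary search directly in memory: no read() system calls
    // OS transparently handles page faults (loads index pages from disk if not in cache)
    // Returns the file position of the largest indexed offset <= target, or -1 if there is none
    int lookup(long targetOffset) {
        int relativeTarget = (int) (targetOffset - baseOffset);
        int lo = 0;
        int hi = entries - 1;  // not mmap.position() / 8: a fresh mapping's position is 0

        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            int relativeOffset = mmap.getInt(mid * ENTRY_SIZE);  // direct memory read
            if (relativeOffset < relativeTarget) {
                lo = mid + 1;
            } else if (relativeOffset > relativeTarget) {
                hi = mid - 1;
            } else {
                return mmap.getInt(mid * ENTRY_SIZE + 4);  // exact match
            }
        }
        return (hi >= 0) ? mmap.getInt(hi * ENTRY_SIZE + 4) : -1;
    }
}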

With mmap, the index file appears to the application as a simple byte array. The kernel manages the mapping between virtual memory addresses and physical disk pages, transparently loading pages on demand. This eliminates read() system call overhead for index access, leaving just pointer arithmetic and direct memory reads.

Log Recovery on Startup

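When a Kafka broker starts after an unclean shutdown, it performs a recovery scan on the segments that were not known to be flushed, which in practice usually means the last (active) LogSegment of each partition. After a clean shutdown the broker leaves a marker file and skips this scan. The scan is necessary because a broker crash may have left incomplete RecordBatches at the end of the active segment (partially written before the crash):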

Recovery algorithm:

  1. Starting from the beginning of the segment, iterate through RecordBatches
  2. For each batch, compute CRC32C and compare against the stored CRC
  3. When a CRC mismatch is found, truncate the file at the byte position of the corrupt batch
  4. Rebuild .index and .timeindex by scanning the now-clean .log file from the beginning

# Monitor broker startup log for recovery events
grep -E "(Recovering|Truncating|Recovery)" /var/log/kafka/server.log

# Normal recovery (no corruption):
# [2024-04-26 08:00:01,234] INFO Recovering unflushed segment 20971520
#   in log /data/kafka/orders-0 (kafka.log.Log)
# [2024-04-26 08:00:01,890] INFO Completed load of log /data/kafka/orders-0
#   with 3 log segments and log end offset 20973847 (kafka.log.Log)

# Recovery with truncation (corruption found):
# [2024-04-26 08:00:01,234] INFO Recovering unflushed segment 20971520
# [2024-04-26 08:00:01,456] WARN Found invalid messages in log segment
#   /data/kafka/orders-0/00000000000020971520.log at byte offset 542351088.
#   Message: Invalid record (e=Record is corrupt (stored crc = 1234567890,
#   computed crc = 987654321))
# [2024-04-26 08:00:01,457] INFO Truncating segment 20971520 to 542351088 bytes
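The scan-and-truncate idea in the recovery algorithm above can be sketched with a deliberately simplified frame format. The layout used here, [4-byte length][4-byte CRC32C][payload], is an assumption made for illustration and is not the RecordBatch format. The class and method names are hypothetical as well.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.zip.CRC32C;

final class RecoverySketch {
    static final int HEADER_BYTES = 8; // 4-byte payload length + 4-byte CRC32C of the payload

    /** Builds one frame in the simplified layout [length][crc32c][payload]. */
    static byte[] frame(byte[] payload) {
        CRC32C crc = new CRC32C();
        crc.update(payload, 0, payload.length);
        return ByteBuffer.allocate(HEADER_BYTES + payload.length)
                .putInt(payload.length)
                .putInt((int) crc.getValue())
                .put(payload)
                .array();
    }

    /** Returns how many leading bytes of the log form complete, checksum-valid frames. */
    static int validPrefixLength(byte[] log) {
        ByteBuffer buf = ByteBuffer.wrap(log);
        int validBytes = 0;
        while (buf.remaining() >= HEADER_BYTES) {
            int length = buf.getInt();
            int storedCrc = buf.getInt();
            if (length < 0 || length > buf.remaining()) {
                break; // torn write: the frame was only partially written
            }
            CRC32C crc = new CRC32C();
            crc.update(log, buf.position(), length);
            if ((int) crc.getValue() != storedCrc) {
                break; // checksum mismatch: treat this frame and everything after it as invalid
            }
            buf.position(buf.position() + length);
            validBytes = buf.position();
        }
        return validBytes;
    }

    /** Concatenates frames into one in-memory "log file". */
    static byte[] concat(byte[]... parts) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] part : parts) {
            out.write(part, 0, part.length);
        }
        return out.toByteArray();
    }
}
```

A real recovery pass would then truncate the file to the returned length (for example with FileChannel.truncate()) and rebuild the indexes from the surviving data.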

num.recovery.threads.per.data.dir (default: 1) controls the parallelism of log recovery per data directory. For brokers with thousands of partitions, increasing this value can significantly reduce restart time:

# Increase recovery parallelism for brokers with many partitions.
# Keep comments on their own line: a trailing "#" would become part of the value.
num.recovery.threads.per.data.dir=4

Key Configuration Reference

Parameter                            Default          Purpose
log.segment.bytes                    1 GB             Maximum size of a single .log segment file
log.index.size.max.bytes             10 MB            Maximum size of .index and .timeindex files
log.index.interval.bytes             4 KB             How often to add an index entry (bytes of log data per entry)
log.flush.interval.messages          Long.MAX_VALUE   Force fsync after this many messages (default: rely on OS)
log.flush.interval.ms                unset            Force fsync after this interval (default: rely on OS)
num.recovery.threads.per.data.dir    1                Threads per data directory for log recovery on startup

Critical note on fsync: Kafka defaults to not forcing fsync, relying on the OS to flush page cache dirty pages to disk asynchronously. This means in a broker crash scenario, the last few unflushed messages could be lost from that specific broker's local disk.

This is intentional: durability is achieved through replication, not local fsync. With acks=all and min.insync.replicas=2, an acknowledged write has reached at least 2 brokers' page caches (and likely their disks). The probability of both crashing simultaneously before the data is flushed is negligible in practice, provided the brokers do not share a single point of failure such as one power feed.

Enabling per-message fsync (log.flush.interval.messages=1) would nearly eliminate the throughput advantage of sequential writes: disk write throughput would drop from hundreds of MB/s to hundreds of KB/s. Never enable this in production; rely on replication for durability instead.
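The difference between the two settings comes down to how often the write path blocks on the device. The sketch below mimics the semantics of a message-count flush interval with plain NIO calls. It is an illustration of the idea under simplified assumptions, not Kafka's flush logic, and the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;

final class FlushIntervalSketch {
    /**
     * Appends each record to the log and calls force() once every
     * flushIntervalMessages records. Returns how many times force() ran.
     */
    static int append(FileChannel log, byte[][] records, long flushIntervalMessages)
            throws IOException {
        int forces = 0;
        long unflushed = 0;
        for (byte[] record : records) {
            ByteBuffer buf = ByteBuffer.wrap(record);
            while (buf.hasRemaining()) {
                log.write(buf);       // typically returns once the bytes are in the page cache
            }
            if (++unflushed >= flushIntervalMessages) {
                log.force(true);      // blocks until the OS reports the data as written to storage
                forces++;
                unflushed = 0;
            }
        }
        return forces;
    }

    /** Writes 100 small records to a temp file and reports the number of force() calls. */
    static int demo(long flushIntervalMessages) {
        byte[][] records = new byte[100][];
        Arrays.fill(records, new byte[] {1, 2, 3, 4});
        try {
            Path file = Files.createTempFile("flush-sketch", ".log");
            try (FileChannel log = FileChannel.open(file, StandardOpenOption.WRITE)) {
                return append(log, records, flushIntervalMessages);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With an interval of 1 every one of the 100 appends waits for the device; with Long.MAX_VALUE none do, and the kernel writes the dirty pages back on its own schedule.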
