Chapter 5

Message Format Evolution: V0→V1→V2 RecordBatch

Why Three Format Revisions?

Kafka's on-disk message format — which is also the over-the-wire format for produce and fetch operations — has undergone three major revisions. Each revision was driven by concrete engineering requirements, not format churn. Understanding the evolution history answers practical questions:

The Magic Byte: Version Identifier

Every Kafka message format version is identified by a single byte called the Magic Byte, positioned at a fixed offset within the message envelope:

The Magic Byte is the cornerstone of backward compatibility. When a Broker reads a log segment or parses an incoming ProduceRequest, it reads the Magic Byte first, then dispatches to the corresponding parsing logic. Different format versions can coexist within the same partition — different log segments can use different formats. This is what makes Kafka's online format migration possible: after a cluster upgrade, old segments stay in their original format; new writes use the new format; consumers support all versions simultaneously.
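The dispatch step can be sketched as follows. This is a minimal illustration, not Kafka's internal code: it assumes the log-file framing described in the layouts of this chapter, where 16 bytes precede the Magic Byte in all three versions (8-byte offset, 4-byte size, then a 4-byte CRC in V0/V1 or a 4-byte PartitionLeaderEpoch in V2). The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

public class MagicDispatch {
    // 8 (offset) + 4 (size) + 4 (CRC in V0/V1, leader epoch in V2) bytes precede Magic.
    static final int MAGIC_OFFSET = 16;

    /** Returns a label for the format of the entry that starts at the buffer's position. */
    static String describeFormat(ByteBuffer entry) {
        // Absolute read: the buffer's position is left unchanged.
        byte magic = entry.get(entry.position() + MAGIC_OFFSET);
        switch (magic) {
            case 0: return "V0 message";
            case 1: return "V1 message (with timestamp)";
            case 2: return "V2 RecordBatch";
            default: throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
    }

    public static void main(String[] args) {
        ByteBuffer entry = ByteBuffer.allocate(61);
        entry.put(MAGIC_OFFSET, (byte) 2);
        System.out.println(describeFormat(entry));
    }
}
```

A real parser would hand the buffer to a version-specific reader instead of returning a label; the point here is only that one byte at a known position selects the parsing path.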

V0 Format: The Starting Point

V0 was Kafka's original message format. Its structure is straightforward:

V0 Message Layout:
┌────────────────────────────────────────────────────────────────┐
│  Offset          (8 bytes, int64, big-endian)                  │
│  ← Assigned by the broker; producers send a placeholder value  │
├────────────────────────────────────────────────────────────────┤
│  MessageSize     (4 bytes, int32)  bytes following this field  │
├────────────────────────────────────────────────────────────────┤
│  CRC             (4 bytes, uint32) CRC32 over fields below     │
│  Magic           (1 byte,  int8)   = 0                         │
│  Attributes      (1 byte,  int8)                               │
│    bits 0-2: Compression (0=NONE, 1=GZIP, 2=Snappy, 3=LZ4)    │
│    bits 3-7: Reserved                                           │
│  KeyLength       (4 bytes, int32)  -1 = null key               │
│  Key             (KeyLength bytes)                              │
│  ValueLength     (4 bytes, int32)  -1 = null value             │
│  Value           (ValueLength bytes)                           │
└────────────────────────────────────────────────────────────────┘

V0's Structural Limitations

No timestamp: V0 messages carry no time information whatsoever. Practical consequences:

Per-message CRC: Each V0 message has its own CRC32 checksum. For individual message integrity this is logically sound, but it creates overhead at scale: 4 bytes of storage per message, plus CPU time to compute and verify every individual message's CRC.

Nested compression: V0 (and V1) implement compression by packing multiple messages into a "message set," compressing the message set, then stuffing it into a single outer message's Value field. The outer message's Attributes field marks the compression type. Decompressing requires: reading the outer message, allocating a decompression buffer, decompressing into it, then parsing the inner message set. Two-level parsing with extra memory copies.

V1 Format: Adding Timestamps

V1 adds one field to V0 — the timestamp — and introduces TimestampType in the Attributes byte. Everything else remains identical:

V1 Message Layout (new relative to V0: the Timestamp field and Attributes bit 3):
┌────────────────────────────────────────────────────────────────┐
│  Offset          (8 bytes, int64)                              │
│  MessageSize     (4 bytes, int32)                              │
│  CRC             (4 bytes, uint32)                             │
│  Magic           (1 byte,  int8)   = 1                         │
│  Attributes      (1 byte,  int8)                               │
│    bits 0-2: Compression                                       │
│    bit 3:    TimestampType (0=CreateTime, 1=LogAppendTime)     │
│    bits 4-7: Reserved                                          │
│  Timestamp       (8 bytes, int64)  ← NEW in V1                 │
│  KeyLength       (4 bytes, int32)                              │
│  Key             (KeyLength bytes)                              │
│  ValueLength     (4 bytes, int32)                              │
│  Value           (ValueLength bytes)                           │
└────────────────────────────────────────────────────────────────┘

TimestampType semantics:

CreateTime (0): The producer sets the timestamp at message creation time (System.currentTimeMillis() by default, overridable via ProducerRecord constructor). The broker stores this value unchanged. This preserves the message's origin time, enabling accurate event-time stream processing.

LogAppendTime (1): The broker overwrites whatever timestamp the producer set with the broker's wall clock time when the message is appended to the log. All messages within a batch receive the same append time. This mode provides a monotonic, broker-authoritative timeline but loses producer-side time information.

V1 resolved the timestamp gap but left V0's other structural problems intact: per-message CRC, nested compression, and no support for arbitrary metadata (forcing applications to embed metadata in the message value or key).
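To make the V1 layout concrete, the following sketch serializes a single uncompressed V1 message in log framing and fills in the per-message CRC32. It uses only the JDK; the class name and method signature are illustrative assumptions rather than anything from the Kafka codebase.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class V1MessageSketch {
    /** Serializes one uncompressed V1 message: Offset + MessageSize + message body. */
    static ByteBuffer encode(long offset, long timestampMs, boolean logAppendTime,
                             byte[] key, byte[] value) {
        int keyLen = key == null ? 0 : key.length;
        int valueLen = value == null ? 0 : value.length;
        // CRC(4) + Magic(1) + Attributes(1) + Timestamp(8) + KeyLength(4) + key + ValueLength(4) + value
        int messageSize = 4 + 1 + 1 + 8 + 4 + keyLen + 4 + valueLen;
        ByteBuffer buf = ByteBuffer.allocate(8 + 4 + messageSize); // big-endian by default

        buf.putLong(offset);
        buf.putInt(messageSize);
        int crcPos = buf.position();
        buf.putInt(0);                                  // CRC placeholder, filled in below
        buf.put((byte) 1);                              // Magic = 1
        buf.put((byte) (logAppendTime ? 0x08 : 0x00));  // bit 3 = TimestampType, no compression
        buf.putLong(timestampMs);
        buf.putInt(key == null ? -1 : keyLen);          // -1 marks a null key
        if (key != null) buf.put(key);
        buf.putInt(value == null ? -1 : valueLen);      // -1 marks a null value
        if (value != null) buf.put(value);

        // Per-message CRC32 over everything after the CRC field (Magic through Value).
        CRC32 crc = new CRC32();
        crc.update(buf.array(), crcPos + 4, buf.position() - (crcPos + 4));
        buf.putInt(crcPos, (int) crc.getValue());
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer msg = encode(0L, 1714000000000L, false, null,
                "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println("V1 size in bytes: " + msg.remaining());
    }
}
```

Every message pays the same 34 bytes of fixed framing regardless of how many messages sit next to it, which is the overhead V2 sets out to amortize.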

V2 Format (RecordBatch): Revolutionary Redesign

V2 is not an incremental improvement — it's a ground-up redesign with a different fundamental abstraction. The shift is conceptual: from "individual messages" to "batch-first design".

Instead of a flat sequence of independently structured messages, V2 organizes messages into RecordBatches: a batch header containing shared fields, followed by tightly packed individual Records that store only deltas from the batch-level baseline.

RecordBatch Outer Structure

V2 RecordBatch Layout:
Byte Offset  Width    Field
─────────────────────────────────────────────────────────────────────
0            8        BaseOffset (int64)
                      Absolute offset of the first Record in this batch.

8            4        BatchLength (int32)
                      Bytes from the PartitionLeaderEpoch field to the end of the batch.
                      Does NOT include the first 12 bytes (BaseOffset + BatchLength).

12           4        PartitionLeaderEpoch (int32)
                      The leader epoch when this batch was written.
                      Used during log recovery to detect and truncate
                      uncommitted entries that a previous leader wrote
                      before failing over.

16           1        Magic (int8) = 2
                      Format version identifier.

17           4        CRC (uint32, CRC32C)
                      Checksum covering bytes from Attributes (offset 21)
                      to the end of the batch.
                      Note: CRC is placed AFTER Magic, so Magic is not
                      included in the checksum. CRC32C (Castagnoli polynomial)
                      is faster than CRC32 and has better error detection.

21           2        Attributes (int16)
                      bit 0-2: Compression (0=NONE,1=GZIP,2=Snappy,3=LZ4,4=ZSTD)
                      bit 3:   TimestampType (0=CreateTime, 1=LogAppendTime)
                      bit 4:   IsTransactional (1=part of a transaction)
                      bit 5:   IsControl (1=transaction control batch)
                      bit 6:   HasDeleteHorizonMs (for compaction)
                      bit 7-15: Reserved

23           4        LastOffsetDelta (int32)
                      The OffsetDelta of the last Record in the batch.
                      Used for fast validation: BaseOffset + LastOffsetDelta
                      should equal the last record's absolute offset.

27           8        BaseTimestamp (int64)
                      Timestamp of the first Record. All Record timestamps
                      are stored as deltas from this value.

35           8        MaxTimestamp (int64)
                      Maximum timestamp in this batch. Used for time-based
                      index entries and log retention decisions.

43           8        ProducerId (int64)
                      Broker-assigned unique producer identifier.
                      -1 for non-idempotent producers.

51           2        ProducerEpoch (int16)
                      Epoch of the producer. Increments on producer restart
                      or transactional coordinator fence.
                      -1 for non-idempotent producers.

53           4        BaseSequence (int32)
                      Sequence number of the first Record in this batch.
                      Used by broker for idempotent deduplication.
                      -1 for non-idempotent producers.

57           4        RecordsCount (int32)
                      Number of Records in this batch.

61           variable Records
                      If compression = NONE: concatenated Record bytes.
                      If compression ≠ NONE: compressed bytes that decompress
                      to concatenated Record bytes.
─────────────────────────────────────────────────────────────────────
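The header table maps directly onto sequential big-endian reads. The sketch below parses the 61 header bytes and decodes the Attributes bits; it is a simplified reader written for this chapter (hypothetical class and field names), not the parser Kafka itself uses.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class BatchHeaderSketch {
    static final int HEADER_SIZE = 61;

    long baseOffset;
    int batchLength;
    int partitionLeaderEpoch;
    byte magic;
    long crc;               // unsigned 32-bit value held in a long
    short attributes;
    int lastOffsetDelta;
    long baseTimestamp;
    long maxTimestamp;
    long producerId;
    short producerEpoch;
    int baseSequence;
    int recordsCount;

    /** Parses the 61-byte header starting at the buffer's current position. */
    static BatchHeaderSketch parse(ByteBuffer buf) {
        if (buf.remaining() < HEADER_SIZE) {
            throw new IllegalArgumentException("Need at least 61 bytes");
        }
        // Work on a duplicate so the caller's position is left untouched.
        ByteBuffer b = buf.duplicate().order(ByteOrder.BIG_ENDIAN);
        BatchHeaderSketch h = new BatchHeaderSketch();
        h.baseOffset = b.getLong();
        h.batchLength = b.getInt();
        h.partitionLeaderEpoch = b.getInt();
        h.magic = b.get();
        if (h.magic != 2) {
            throw new IllegalArgumentException("Not a V2 batch, magic = " + h.magic);
        }
        h.crc = b.getInt() & 0xFFFFFFFFL;
        h.attributes = b.getShort();
        h.lastOffsetDelta = b.getInt();
        h.baseTimestamp = b.getLong();
        h.maxTimestamp = b.getLong();
        h.producerId = b.getLong();
        h.producerEpoch = b.getShort();
        h.baseSequence = b.getInt();
        h.recordsCount = b.getInt();
        return h;
    }

    int compressionCode()     { return attributes & 0x07; }        // bits 0-2
    boolean isLogAppendTime() { return (attributes & 0x08) != 0; } // bit 3
    boolean isTransactional() { return (attributes & 0x10) != 0; } // bit 4
    boolean isControl()       { return (attributes & 0x20) != 0; } // bit 5
    long lastOffset()         { return baseOffset + lastOffsetDelta; }
}
```

Because every header field is fixed-width, a reader can learn the batch's offset range, timestamps, and producer identity without touching (or decompressing) the Records section.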

Record Inner Structure

Each individual message inside a RecordBatch is called a Record. Records use variable-length integers (varints) for every numeric field except the one-byte Attributes, eliminating fixed-width overhead:

V2 Record Layout (all varint fields are ZigZag-encoded):
┌────────────────────────────────────────────────────────────────┐
│  Length          (varint)  Bytes in this Record after Length   │
│  Attributes      (int8)    Reserved, currently always 0        │
│  TimestampDelta  (varint)  Signed delta from BaseTimestamp     │
│  OffsetDelta     (varint)  Signed delta from BaseOffset        │
│  KeyLength       (varint)  -1 = null key                       │
│  Key             (KeyLength bytes, if KeyLength >= 0)          │
│  ValueLength     (varint)  -1 = null value                     │
│  Value           (ValueLength bytes, if ValueLength >= 0)      │
│  HeadersCount    (varint)  Number of headers                   │
│  Headers[]                                                     │
│    HeaderKeyLength   (varint)                                  │
│    HeaderKey         (N bytes, UTF-8)                          │
│    HeaderValueLength (varint, -1 = null)                       │
│    HeaderValue       (N bytes)                                 │
└────────────────────────────────────────────────────────────────┘

The Four Core Innovations of V2

Innovation 1: Batch-Level CRC (Not Per-Message)

V2 moves the integrity checksum from individual messages to the entire RecordBatch. One CRC32C covers the entire batch from the Attributes field to the end.

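Savings for a batch of 1,000 messages: V0/V1 store 1,000 × 4 = 4,000 bytes of checksums, while V2 stores a single 4-byte CRC for the whole batch.

The number of checksum operations drops from N (one per message) to one per batch. The bytes covered are about the same, but the per-message setup and comparison overhead disappears. For a broker processing 500K messages/second in large batches, this is a measurable CPU reduction.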

The tradeoff: individual Record integrity cannot be checked without verifying the entire batch. In practice, this is not a limitation — Kafka's read and write units are batches, and individual-message integrity checking is never needed in normal operation.

Why CRC32C instead of CRC32? CRC32C uses the Castagnoli polynomial, which x86 CPUs with SSE4.2 compute in hardware through the crc32 instruction. CRC32C also has slightly better error-detection properties for typical data corruption patterns. Kafka chose CRC32C for V2 to take advantage of this hardware acceleration.
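As a rough sketch of the batch-level check, the code below computes CRC32C over bytes 21 through the end of a batch using the JDK's java.util.zip.CRC32C (available since Java 9). It assumes the buffer's position-to-limit range holds exactly one batch; the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32C;

public class BatchCrcSketch {
    static final int CRC_OFFSET = 17;        // position of the 4-byte CRC field
    static final int ATTRIBUTES_OFFSET = 21; // first byte covered by the checksum

    /** CRC32C over Attributes..end of the batch that starts at the buffer's position. */
    static long computeCrc(ByteBuffer batch) {
        ByteBuffer covered = batch.duplicate();
        covered.position(batch.position() + ATTRIBUTES_OFFSET);
        CRC32C crc = new CRC32C();
        crc.update(covered); // consumes from the duplicate's position up to its limit
        return crc.getValue();
    }

    /** Stores the computed checksum in the header's CRC field. */
    static void writeCrc(ByteBuffer batch) {
        batch.putInt(batch.position() + CRC_OFFSET, (int) computeCrc(batch));
    }

    /** Returns true if the stored CRC matches the batch contents. */
    static boolean isValid(ByteBuffer batch) {
        long stored = batch.getInt(batch.position() + CRC_OFFSET) & 0xFFFFFFFFL;
        return stored == computeCrc(batch);
    }
}
```

Since the checksum starts at Attributes, the fields before it (BaseOffset, BatchLength, PartitionLeaderEpoch, Magic) can be rewritten without invalidating the CRC, whereas any change inside the covered region is detected.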

Innovation 2: Varint Delta Encoding (OffsetDelta + TimestampDelta)

This is V2's most impactful space optimization.

OffsetDelta: V0/V1 store each message's full 8-byte absolute offset. V2 stores the absolute BaseOffset once in the batch header, then each Record stores only its OffsetDelta — the signed offset relative to BaseOffset — as a ZigZag Varint.

For consecutive messages in a batch (OffsetDelta = 0, 1, 2, 3, ..., 999), deltas 0–63 encode to a single byte and deltas 64–8191 to two bytes. Saving: 8 bytes (fixed int64) → 1–2 bytes (Varint) per message = 6–7 bytes saved per message, purely on offset storage.

TimestampDelta: The batch header stores BaseTimestamp. Each Record stores the signed millisecond delta from BaseTimestamp. Records that accumulate into the same batch within a short time window (deltas of at most 63 ms) each require 1 byte for TimestampDelta vs 8 bytes for a full timestamp.

KeyLength and ValueLength: V0/V1 use fixed 4-byte int32 for lengths (handling keys/values up to 2GB). V2 uses ZigZag Varints. For keys shorter than 64 bytes or values shorter than 64 bytes: 1 byte length vs 4 bytes = 3 bytes saved per field per message.

ZigZag encoding maps signed integers to unsigned before Varint encoding: encode(n) = (n << 1) ^ (n >> 63), decode(m) = (m >>> 1) ^ -(m & 1). This ensures small negative numbers (like -1 for null) also encode compactly: -1 encodes to 1 (single byte), rather than the 10 bytes that a naïve Varint of the two's complement representation would require.
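The two formulas can be combined with standard base-128 varint framing as in the sketch below. It works on 64-bit values for simplicity and uses hypothetical names; it illustrates the encoding rather than reproducing Kafka's own utility class.

```java
import java.io.ByteArrayOutputStream;

public class ZigZagVarint {
    static long zigZagEncode(long n) { return (n << 1) ^ (n >> 63); }
    static long zigZagDecode(long m) { return (m >>> 1) ^ -(m & 1); }

    /** ZigZag-encodes the value, then writes it 7 bits at a time, low bits first. */
    static byte[] writeVarlong(long value) {
        long v = zigZagEncode(value);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((v & ~0x7FL) != 0) {
            out.write((int) ((v & 0x7F) | 0x80)); // high bit set: more bytes follow
            v >>>= 7;
        }
        out.write((int) v);                        // final byte, high bit clear
        return out.toByteArray();
    }

    /** Reads one varint from the start of the array and undoes the ZigZag mapping. */
    static long readVarlong(byte[] bytes) {
        long raw = 0;
        int shift = 0;
        for (byte b : bytes) {
            raw |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) break;
            shift += 7;
        }
        return zigZagDecode(raw);
    }

    public static void main(String[] args) {
        for (long n : new long[] {-1, 0, 5, 63, 64, 999}) {
            System.out.println(n + " -> " + writeVarlong(n).length + " byte(s)");
        }
    }
}
```

With this encoding, -1 and every value from 0 to 63 take one byte, while 64 through 8191 take two, which is where the per-field savings in this section come from.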

Innovation 3: Headers

V2 Records support an arbitrary number of key-value Header pairs — key as a UTF-8 string, value as raw bytes. Headers are completely separate from the message payload.

Before V2, metadata about a message had to be embedded in the payload (adding fields to a JSON envelope) or encoded in the key (sacrificing routing semantics). Both approaches pollute the business data contract with infrastructure concerns.

V2 Headers enable clean separation:

// Producer: inject distributed tracing context into headers
ProducerRecord<String, Order> record = new ProducerRecord<>(
    "order-events",
    order.getOrderId(),  // key: determines partition
    order              // value: business data only
);

// Infrastructure metadata goes in headers, not in the value
record.headers()
    .add("trace-id", currentSpan.context().traceId().getBytes(UTF_8))
    .add("span-id", currentSpan.context().spanId().getBytes(UTF_8))
    .add("content-type", "application/vnd.order.v2+json".getBytes(UTF_8))
    .add("schema-version", "2.3".getBytes(UTF_8))
    .add("source-service", "order-service".getBytes(UTF_8));

producer.send(record);

// Consumer: read headers for routing/tracing without deserializing value
for (ConsumerRecord<String, Order> r : records) {
    Header traceHeader = r.headers().lastHeader("trace-id");
    String traceId = traceHeader != null
        ? new String(traceHeader.value(), UTF_8)
        : "no-trace";

    try (Scope scope = tracer.buildSpan("process-order")
            .asChildOf(extractContext(r.headers()))
            .startActive(true)) {
        processOrder(r.value());
    }
}

Headers are particularly valuable for:

Innovation 4: ProducerId + ProducerEpoch + BaseSequence (Idempotence Foundation)

These three batch-header fields provide the mechanism for exactly-once produce semantics within a producer session.

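ProducerId (PID): A unique 64-bit integer assigned by the broker in response to the producer's InitProducerId request (sent when enable.idempotence=true). A purely idempotent producer receives a new PID each time it restarts; only when a transactional.id is configured does the coordinator map the restarted producer back to the same PID.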

ProducerEpoch: When a transactional producer restarts, or when the transactional coordinator fences a zombie producer, the epoch increments. The broker rejects any batch whose epoch is lower than the current epoch for that PID. This prevents a crashed-and-recovered producer from having its in-flight requests accepted after a replacement producer has taken over.

BaseSequence: The sequence number of the first Record in the batch, monotonically increasing per (PID, ProducerEpoch, Partition) triple. The broker maintains the highest sequence number seen for each triple. Upon receiving a batch:
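The broker-side decision can be sketched roughly as below. This is a deliberately simplified model under stated assumptions: it tracks only the last sequence per (PID, partition), resets the sequence when a higher epoch appears, and ignores sequence wrap-around and the window of recent batches a real broker retains. All names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class SequenceCheckSketch {
    enum Result { ACCEPT, DUPLICATE, OUT_OF_ORDER, FENCED }

    private static final class State {
        short epoch;
        int lastSequence = -1; // nothing accepted yet
    }

    // Keyed by "producerId/partition"; a real broker keeps this per partition log.
    private final Map<String, State> states = new HashMap<>();

    Result onBatch(long producerId, short epoch, String partition,
                   int baseSequence, int lastOffsetDelta) {
        State s = states.computeIfAbsent(producerId + "/" + partition, k -> {
            State fresh = new State();
            fresh.epoch = epoch;
            return fresh;
        });
        if (epoch < s.epoch) {
            return Result.FENCED;            // stale epoch: an older producer instance
        }
        if (epoch > s.epoch) {               // newer instance: sequences start over
            s.epoch = epoch;
            s.lastSequence = -1;
        }
        int lastSequenceOfBatch = baseSequence + lastOffsetDelta;
        if (baseSequence == s.lastSequence + 1) {
            s.lastSequence = lastSequenceOfBatch;
            return Result.ACCEPT;            // exactly the next expected batch
        }
        if (lastSequenceOfBatch <= s.lastSequence) {
            return Result.DUPLICATE;         // a retry of something already appended
        }
        return Result.OUT_OF_ORDER;          // a gap: an earlier batch is missing
    }
}
```

A duplicate is acknowledged without being appended again, which is what makes producer retries safe; a gap is reported back to the producer as an error instead of being written.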

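import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

// Enable the idempotent producer. With idempotence on, acks defaults to all (-1)
// and retries defaults to Integer.MAX_VALUE.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
// Serializers are mandatory; without them the constructor throws a ConfigException.
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
// Idempotence requires the following (conflicting explicit settings are rejected):
//   acks = -1 (all ISR must acknowledge)
//   retries > 0 (default Integer.MAX_VALUE)
//   max.in.flight.requests.per.connection <= 5 (default 5)

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Now: exactly-once delivery per partition per producer session
// Duplicate sends due to retries are automatically deduplicated by the broker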

Idempotence scope: Single partition, single producer epoch (session). For cross-partition, cross-session exactly-once, use transactions:

props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-producer-v1");
// transactional.id enables cross-session recovery: the same transactional.id
// lets the broker fence the previous session if it restarts

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();  // fetches PID, bumps epoch if restarting

try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("orders", orderId, orderJson));
    producer.send(new ProducerRecord<>("inventory", itemId, decrementJson));
    producer.commitTransaction();  // atomic across both partitions
} catch (ProducerFencedException e) {
    // Another instance with same transactional.id took over — this one is fenced
    producer.close();
} catch (KafkaException e) {
    producer.abortTransaction();
}

IsTransactional and IsControl Flags

Attributes.IsTransactional = 1: Marks that this RecordBatch is part of an open transaction. The broker holds these records in the log but does not advance the Last Stable Offset (LSO) past them until the transaction commits or aborts. READ_COMMITTED consumers (IsolationLevel=1) can only read up to LSO, so transactional records are invisible until committed.

Attributes.IsControl = 1: Marks a Control Batch — a special batch written by the transaction coordinator to finalize a transaction. Control Batches contain either a COMMIT or ABORT marker. The consumer library automatically filters out Control Batches and never exposes them to application code. They are visible in the log file and to the transaction coordinator's internal logic.

Complete Byte-Level Layout: A Real V2 RecordBatch

Let's trace the exact bytes for a RecordBatch containing two records ("hello" and "world"), no compression, no keys, non-idempotent producer:

RecordBatch (61-byte header + Records):

Bytes 0-7:    00 00 00 00 00 00 00 00
              BaseOffset = 0

Bytes 8-11:   00 00 00 49
              BatchLength = 73 (85 total bytes minus the 12 bytes of BaseOffset + BatchLength)

Bytes 12-15:  FF FF FF FF
              PartitionLeaderEpoch = -1 (standalone/test mode)

Byte 16:      02
              Magic = 2

Bytes 17-20:  XX XX XX XX
              CRC32C (computed over bytes 21 to end of batch)

Bytes 21-22:  00 00
              Attributes = 0
              (NONE compression, CreateTime, not transactional, not control)

Bytes 23-26:  00 00 00 01
              LastOffsetDelta = 1 (two records: deltas 0 and 1)

Bytes 27-34:  00 00 01 8F 12 5C 74 00
              BaseTimestamp = 1714000000000ms (Unix timestamp, example)

Bytes 35-42:  00 00 01 8F 12 5C 74 00
              MaxTimestamp = 1714000000000ms (same as Base, no delta in batch)

Bytes 43-50:  FF FF FF FF FF FF FF FF
              ProducerId = -1

Bytes 51-52:  FF FF
              ProducerEpoch = -1

Bytes 53-56:  FF FF FF FF
              BaseSequence = -1

Bytes 57-60:  00 00 00 02
              RecordsCount = 2

Record[0] — "hello":
  16          Length = 11 (ZigZag Varint: 0x16 = 22, 22/2 = 11 bytes follow the length field)
  00          Attributes = 0
  00          TimestampDelta = 0 (ZigZag: 0 → 0)
  00          OffsetDelta = 0 (ZigZag: 0 → 0)
  01          KeyLength = -1 (ZigZag: -1 → ((-1<<1)^(-1>>63)) = 1)
  0A          ValueLength = 5 (ZigZag: 5 → (5<<1)^0 = 10 = 0x0A)
  68 65 6C 6C 6F
              Value = "hello"
  00          HeadersCount = 0

Record[1] — "world":
  16          Length = 11
  00          Attributes = 0
  00          TimestampDelta = 0 (same millisecond as Record[0])
  02          OffsetDelta = 1 (ZigZag: 1 → (1<<1)^0 = 2 = 0x02)
  01          KeyLength = -1 (ZigZag → 1)
  0A          ValueLength = 5 (ZigZag → 10)
  77 6F 72 6C 64
              Value = "world"
  00          HeadersCount = 0

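Total batch size: 61 (header) + 12 (Record[0]) + 12 (Record[1]) = 85 bytes for two 5-byte messages. V1 would require 2 × (8+4+4+1+1+8+4+4+5) = 2 × 39 = 78 bytes, so for a two-record batch V2 is actually slightly larger: the fixed 61-byte header dominates. The per-record cost, however, is 12 bytes in V2 versus 39 bytes in V1, so V2 pulls ahead from the third record onward (97 vs 117 bytes) and the advantage grows with larger batches.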
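The byte counts in this walkthrough can be re-derived mechanically. The sketch below assembles the same two-record batch with the JDK only. It is an illustration under stated assumptions: the CRC field is left as a zero placeholder, TimestampDelta is written as a 32-bit varint for brevity, and all names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class TwoRecordBatchSketch {
    /** Writes a ZigZag-encoded 32-bit varint. */
    static void writeVarint(ByteArrayOutputStream out, int value) {
        int v = (value << 1) ^ (value >> 31);
        while ((v & ~0x7F) != 0) {
            out.write((v & 0x7F) | 0x80);
            v >>>= 7;
        }
        out.write(v);
    }

    /** Encodes one Record with a null key and no headers. */
    static byte[] encodeRecord(int timestampDelta, int offsetDelta, byte[] value) {
        ByteArrayOutputStream body = new ByteArrayOutputStream();
        body.write(0);                       // Attributes (int8)
        writeVarint(body, timestampDelta);
        writeVarint(body, offsetDelta);
        writeVarint(body, -1);               // KeyLength = -1 (null key)
        writeVarint(body, value.length);
        body.write(value, 0, value.length);
        writeVarint(body, 0);                // HeadersCount
        ByteArrayOutputStream record = new ByteArrayOutputStream();
        writeVarint(record, body.size());    // Length counts the bytes after this field
        record.write(body.toByteArray(), 0, body.size());
        return record.toByteArray();
    }

    /** Builds an uncompressed batch from a non-idempotent producer; all records share one timestamp. */
    static ByteBuffer encodeBatch(long baseOffset, long baseTimestamp, byte[]... values) {
        ByteArrayOutputStream records = new ByteArrayOutputStream();
        for (int i = 0; i < values.length; i++) {
            byte[] r = encodeRecord(0, i, values[i]);
            records.write(r, 0, r.length);
        }
        ByteBuffer buf = ByteBuffer.allocate(61 + records.size());
        buf.putLong(baseOffset);
        buf.putInt(61 - 12 + records.size()); // BatchLength: everything after the first 12 bytes
        buf.putInt(-1);                       // PartitionLeaderEpoch
        buf.put((byte) 2);                    // Magic
        buf.putInt(0);                        // CRC placeholder (CRC32C omitted in this sketch)
        buf.putShort((short) 0);              // Attributes
        buf.putInt(values.length - 1);        // LastOffsetDelta
        buf.putLong(baseTimestamp);           // BaseTimestamp
        buf.putLong(baseTimestamp);           // MaxTimestamp
        buf.putLong(-1L);                     // ProducerId
        buf.putShort((short) -1);             // ProducerEpoch
        buf.putInt(-1);                       // BaseSequence
        buf.putInt(values.length);            // RecordsCount
        buf.put(records.toByteArray());
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer batch = encodeBatch(0L, 1714000000000L,
                "hello".getBytes(StandardCharsets.UTF_8),
                "world".getBytes(StandardCharsets.UTF_8));
        System.out.println("Batch bytes: " + batch.remaining());
        System.out.println("BatchLength field: " + batch.getInt(8));
    }
}
```

Each record serializes to 12 bytes (a 1-byte Length followed by 11 bytes of content), giving 61 + 24 = 85 bytes in total and a BatchLength field of 73.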

Compression: Batch-Level vs Per-Message

In V0/V1, compression used a nested structure: multiple messages were serialized into a MessageSet, compressed, and stored as the Value of a single outer "wrapper" message. Decompression required two parsing passes and extra memory allocation.

In V2, compression is applied directly to the entire Records section of the RecordBatch:

V2 with LZ4 compression:

RecordBatch Header (61 bytes):
  Attributes = 0x0003  (bits 0-2 = 011 = LZ4)
  RecordsCount = 1000
  ...

Records field (compressed):
  [LZ4 block header]
  [LZ4 compressed payload]
  ← decompresses to: Record[0]||Record[1]||...||Record[999]

This enables zero-copy passthrough: when the topic's compression.type matches the producer's codec (or is left at its default, producer) and the consumer can read the V2 format, the broker sends RecordBatches from disk to the network socket without decompressing and recompressing them. The bytes on disk are the bytes on the wire. Zero-copy combined with batch compression is a large part of what enables Kafka's very high throughput (millions of messages per second per broker).
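The idea that only the Records section is compressed, while the header stays readable, can be illustrated with the JDK alone. Note the substitution: the JDK ships no LZ4 codec, so this sketch uses GZIP (codec 1 in the Attributes bits) instead of the LZ4 shown above. It requires Java 9+ for readAllBytes, and the names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class RecordsCompressionSketch {
    static final int GZIP_CODEC = 1; // value stored in Attributes bits 0-2

    /** Compresses the concatenated Record bytes; the batch header is not included. */
    static byte[] compressRecords(byte[] concatenatedRecords) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(concatenatedRecords);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray(); // stream is closed here, so the trailer is written
    }

    /** Restores the concatenated Record bytes in a single pass. */
    static byte[] decompressRecords(byte[] compressed) {
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return gzip.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Sets the compression bits while leaving the other attribute flags intact. */
    static short withCompression(short attributes, int codec) {
        return (short) ((attributes & ~0x07) | (codec & 0x07));
    }
}
```

A reader checks the codec in the uncompressed header, decompresses once, and then walks the Records directly, with no outer wrapper message to parse first.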

Quantified Space Savings: V1 vs V2

Scenario: 1,000 messages per batch, each with a 100-byte JSON value, no key:

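Field               V1 (per message)   V2 (per message, amortized)       Savings
─────────────────────────────────────────────────────────────────────────────────────
Offset              8 bytes            1–2 bytes (OffsetDelta Varint)    6–7 bytes
CRC                 4 bytes            0.004 bytes (batch CRC ÷ 1000)    ~4 bytes
Magic               1 byte             0.001 bytes                       ~1 byte
Attributes          1 byte             0.002 bytes                       ~1 byte
Timestamp           8 bytes            1–2 bytes (delta)                 6–7 bytes
KeyLength           4 bytes            1 byte (Varint -1)                3 bytes
ValueLength         4 bytes            2 bytes (Varint 100)              2 bytes
Metadata total      30 bytes           ~5–7 bytes                        ~23–25 bytes saved
Value               100 bytes          100 bytes                         0 bytes
Total per message   130 bytes          ~105–107 bytes                    ~18–19% smaller
─────────────────────────────────────────────────────────────────────────────────────
Note: the table leaves out V1's 4-byte MessageSize and V2's per-record Length (2 bytes), Attributes (1 byte), and HeadersCount (1 byte). These add 4 bytes on each side, so the savings are unchanged.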
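The per-message figures can be checked with a small calculator. The sketch below counts every field, including V1's MessageSize and V2's per-record Length, Attributes, and HeadersCount, and assumes uncompressed data, null keys, no headers, and identical timestamps within the batch. The names are hypothetical.

```java
public class FormatSizeSketch {
    /** Number of bytes a ZigZag-encoded 32-bit varint occupies. */
    static int varintSize(int value) {
        int v = (value << 1) ^ (value >> 31);
        int bytes = 1;
        while ((v & ~0x7F) != 0) {
            bytes++;
            v >>>= 7;
        }
        return bytes;
    }

    /** V1: Offset + MessageSize + CRC + Magic + Attributes + Timestamp + KeyLength + ValueLength + value. */
    static long v1Bytes(int count, int valueLen) {
        return (long) count * (8 + 4 + 4 + 1 + 1 + 8 + 4 + 4 + valueLen);
    }

    /** V2: one 61-byte batch header plus varint-encoded records. */
    static long v2Bytes(int count, int valueLen) {
        long total = 61;
        for (int i = 0; i < count; i++) {
            int body = 1                      // Attributes
                    + varintSize(0)           // TimestampDelta (all records share one timestamp)
                    + varintSize(i)           // OffsetDelta
                    + varintSize(-1)          // KeyLength for a null key
                    + varintSize(valueLen)    // ValueLength
                    + valueLen
                    + varintSize(0);          // HeadersCount
            total += varintSize(body) + body; // Length prefix + record body
        }
        return total;
    }

    public static void main(String[] args) {
        int count = 1000;
        int valueLen = 100;
        long v1 = v1Bytes(count, valueLen);
        long v2 = v2Bytes(count, valueLen);
        System.out.printf("V1: %d bytes, V2: %d bytes, saving %.1f%%%n",
                v1, v2, 100.0 * (v1 - v2) / v1);
    }
}
```

Under these assumptions, 1,000 messages of 100 bytes come to 134,000 bytes in V1 and 109,997 bytes in V2, a reduction of about 18%. The same functions give 78 versus 85 bytes for the two-record "hello"/"world" batch, where the fixed header still outweighs the per-record savings.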

With LZ4 compression on typical JSON payloads (60–65% compression ratio):

For a cluster storing 1 PB of data in V1 format, migrating to V2 with LZ4 would reduce storage to approximately 700–800 TB, a saving of 200–300 TB. At $20/TB/month in cloud storage, that is $4,000–6,000 per month, or roughly $48,000–72,000 per year, for a large deployment.

Format evolution is engineering pragmatism materialized over time. V0 solved the baseline problem. V1 added the missing timestamp. V2 systematically optimized for space efficiency, operational simplicity, and provided the binary-level foundation for idempotence and transactions — capabilities that would have been impossible to add cleanly to V0 or V1's per-message structure.
