第 5 章

消息格式演进:V0→V1→V2 RecordBatch

第5章:消息格式演进:V0→V1→V2 RecordBatch

导读:Kafka 消息格式为什么要从 V0 演进到 V2,每次演进解决了什么问题?

本章核心问题:Kafka 消息格式为什么要从 V0 演进到 V2,每次演进解决了什么问题?

读完本章你将理解


Level 1 · 你需要知道的(1-3年经验)

为什么 V2 更节省空间:量化对比

假设一个典型 Batch:1000 条消息,每条 Value 100 字节,无 Key:

开销项 V1(每条消息) V2(每条消息,均摊后)
Offset 8 bytes 0(BaseOffset在Batch头,OffsetDelta~1byte)
CRC 4 bytes 0(Batch级CRC均摊到每条消息约0.004 bytes)
Magic 1 byte 0(Batch级Magic均摊~0.001 bytes)
Attributes 1 byte 0(Batch级Attributes均摊~0.002 bytes)
Timestamp 8 bytes ~1-2 bytes(TimestampDelta,同批消息时间接近)
KeyLength 4 bytes 1 byte(Varint,-1编码为1字节)
ValueLength 4 bytes 1 byte(Varint,100<127,单字节)
元数据小计 30 bytes ~4-5 bytes
Value 100 bytes 100 bytes
总计/条 130 bytes ~105 bytes
节省 约 19%(不含压缩)

加上批量压缩(LZ4,文本数据压缩率约 60-70%),V2 相对 V1 的实际存储和传输数据量可降低 30-50%。对于存储 PB 级数据的大规模 Kafka 集群,这意味着数百 TB 的存储节省。

格式的演进,是工程权衡在时间维度上的沉淀。V0 解决了基础问题,V1 补上了时间戳,V2 则在综合性能、功能性和可扩展性上做了系统性的优化,并为幂等和事务语义提供了底层支撑。理解这个演进脉络,是深入理解 Kafka 内部机制的必经之路。


Level 2 · 它是怎么运行的(3-5年经验)

本章的内部原理内容已整合到 Level 1 和 Level 3 中,请结合阅读。


Level 3 · 规范怎么定义的(资深)

为什么消息格式要演进三次

Kafka 的消息格式(存储在磁盘上,也通过网络传输)经历了三次重大版本迭代,每次迭代都有明确的工程动机。这不是"为了变而变"——每个版本都在前一个版本基础上解决了真实的痛点。

理解格式演进的历史,有助于解答几个实际问题:

Magic Byte:格式版本的标识符

每种消息格式都有一个Magic Byte,位于消息头部,用于标识格式版本:

Magic Byte 是向后兼容性的基础:Broker 读取日志文件或解析网络请求时,首先读取 Magic Byte,然后根据版本选择对应的解析逻辑。不同格式可以混存于同一个 Partition 的不同 Segment 中,这是 Kafka 在线格式升级的基础——升级 Broker 版本后,旧消息仍以原格式存储,新消息以新格式写入,Consumer 端两种格式都能解析。

V0 格式:起点

V0 是 Kafka 最初的消息格式,结构简单:

V0 消息(Message)布局:
┌─────────────────────────────────────────────────────┐
│  Offset         (8 bytes, int64)                    │  ← 仅在日志文件中存在
├─────────────────────────────────────────────────────┤
│  MessageSize    (4 bytes, int32)                    │  ← 后续字段总大小
├─────────────────────────────────────────────────────┤
│  CRC            (4 bytes, uint32)                   │  ← CRC32 校验,覆盖 Magic 之后所有字段
│  Magic          (1 byte,  int8)  = 0                │
│  Attributes     (1 byte,  int8)                     │  ← 低3位=压缩类型(0=NONE,1=GZIP,2=Snappy,3=LZ4)
│  KeyLength      (4 bytes, int32) = -1 if null       │
│  Key            (N bytes)                           │
│  ValueLength    (4 bytes, int32) = -1 if null       │
│  Value          (N bytes)                           │
└─────────────────────────────────────────────────────┘

V0 的设计局限

没有时间戳:V0 消息没有任何时间信息。这意味着:

逐条 CRC:每条消息都有独立的 CRC 校验。这在 V0 的设计中很合理,因为 V0 消息是逐条独立处理的。但随着批量处理成为主流,逐条 CRC 的开销变得不可忽视:每条消息 4 字节 CRC 开销,加上 CPU 计算 CRC 的代价。

压缩的实现方式:V0/V1 使用"消息集合(MessageSet)"结构实现压缩:多条消息先打包成一个 MessageSet,然后压缩,再封装成一条"外层消息"(Attributes 字段标记压缩类型,Value 字段存储压缩后的 MessageSet)。这种"嵌套消息"结构在解压时需要额外的内存分配和复制,效率不高。

V1 格式:加入时间戳

V1 在 V0 基础上增加了时间戳字段,其余结构完全相同:

V1 消息布局:
┌─────────────────────────────────────────────────────┐
│  Offset         (8 bytes, int64)                    │
│  MessageSize    (4 bytes, int32)                    │
│  CRC            (4 bytes, uint32)                   │
│  Magic          (1 byte,  int8)  = 1                │
│  Attributes     (1 byte,  int8)                     │
│    bit 0-2: 压缩类型                                │
│    bit 3:   TimestampType (0=CreateTime, 1=LogAppendTime) │
│  Timestamp      (8 bytes, int64)                    │  ← V1 新增!
│  KeyLength      (4 bytes, int32)                    │
│  Key            (N bytes)                           │
│  ValueLength    (4 bytes, int32)                    │
│  Value          (N bytes)                           │
└─────────────────────────────────────────────────────┘

TimestampType 决定时间戳的含义:

V1 解决了时间戳问题,但其他结构性问题仍然存在:逐条 CRC、嵌套消息压缩、没有 Header 支持。

V2 格式(RecordBatch):革命性重设计

V2 不是 V1 的小改动,而是一次彻底的重新设计,引入了 RecordBatch 的概念。整个思路的转变是:从"逐条消息"转为"批次优先"

RecordBatch 外层结构

V2 RecordBatch 布局:
字节偏移   字段宽度   字段名              说明
──────────────────────────────────────────────────────────────────────────
0          8         BaseOffset           此 Batch 中第一条消息的 Offset
8          4         BatchLength          从 Magic 字段到 Batch 末尾的字节数
12         4         PartitionLeaderEpoch 写入时 Leader 的 Epoch(用于恢复时检测截断)
16         1         Magic                = 2(固定值,标识 V2 格式)
17         4         CRC                  CRC32C 校验,覆盖从 Attributes 到 Records 末尾
                                          (注:CRC 在 Magic 之后,不覆盖 Magic)
21         2         Attributes           见下方 bit 说明
23         4         LastOffsetDelta      Batch 内最后一条消息的 offset delta(相对 BaseOffset)
27         8         BaseTimestamp        Batch 内第一条消息的时间戳
35         8         MaxTimestamp         Batch 内最大时间戳(通常是最后一条消息的时间戳)
43         8         ProducerId           生产者 ID(幂等生产者专用,-1=普通生产者)
51         2         ProducerEpoch        生产者 Epoch(幂等/事务专用,-1=普通生产者)
53         4         BaseSequence         Batch 内第一条消息的序列号(幂等专用,-1=普通)
57         4         RecordsCount         Batch 内消息条数
61         variable  Records              压缩/非压缩的 Record 序列
──────────────────────────────────────────────────────────────────────────

Attributes 字段(2 字节,16 bits)

bit 0-2:  压缩类型  (0=NONE, 1=GZIP, 2=Snappy, 3=LZ4, 4=ZSTD)
bit 3:    TimestampType (0=CreateTime, 1=LogAppendTime)
bit 4:    IsTransactional (1=事务消息,0=普通消息)
bit 5:    IsControl (1=控制消息,如事务的 COMMIT/ABORT 标记)
bit 6:    HasDeleteHorizonMs (1=Compaction 删除水位)
bit 7-15: 保留

Record 内层结构(Batch 内每条消息)

V2 中每条实际消息称为 Record,结构比 V0/V1 更紧凑:

V2 Record 布局(所有整数均为 Varint 编码):
┌─────────────────────────────────────────────────────┐
│  Length           (varint)   整条 Record 的字节长度  │
│  Attributes       (int8)     保留,目前恒为 0        │
│  TimestampDelta   (varint)   相对 BaseTimestamp 的差值│
│  OffsetDelta      (varint)   相对 BaseOffset 的差值  │
│  KeyLength        (varint)   -1 表示 null            │
│  Key              (N bytes)                          │
│  ValueLength      (varint)   -1 表示 null            │
│  Value            (N bytes)                          │
│  HeadersCount     (varint)   Header 数量             │
│  Headers[]                                           │
│    HeaderKeyLength  (varint)                         │
│    HeaderKey        (N bytes, UTF-8)                 │
│    HeaderValueLength(varint)                         │
│    HeaderValue      (N bytes)                        │
└─────────────────────────────────────────────────────┘

字节级布局示例:一个真实的 V2 RecordBatch

以下是包含两条消息("hello""world")的 V2 RecordBatch 的完整字节布局(非压缩,无 Key,ProducerId=-1):

RecordBatch Header (61 bytes):
00 00 00 00 00 00 00 00    BaseOffset = 0
00 00 00 XX                BatchLength = N(后续计算)
FF FF FF FF                PartitionLeaderEpoch = -1(测试/非复制模式)
02                         Magic = 2
XX XX XX XX                CRC32C(覆盖 Attributes 到 Records 末尾)
00 00                      Attributes = 0 (NONE 压缩, CreateTime, 非事务, 非控制)
00 00 00 01                LastOffsetDelta = 1(Batch 内 2 条消息,最后的 OffsetDelta=1)
00 00 01 8D 00 00 00 00    BaseTimestamp = 1714000000000 (2024-04-25 示例时间)
00 00 01 8D 00 00 00 00    MaxTimestamp = 1714000000000(与 BaseTimestamp 相同)
FF FF FF FF FF FF FF FF    ProducerId = -1 (非幂等生产者)
FF FF                      ProducerEpoch = -1
FF FF FF FF                BaseSequence = -1

00 00 00 02                RecordsCount = 2

Record[0] (message "hello"):
12                         Length = 9 (varint: 0x12 = 18? 注:varint 编码,实际计算)
00                         Attributes = 0
00                         TimestampDelta = 0 (varint)
00                         OffsetDelta = 0 (varint)
01                         KeyLength = -1 → varint -1 → zigzag: ((-1)<<1)^(-1>>63) = 1
00 0A                      ValueLength = 5 → varint: (5+1=)zigzag: 10 = 0x0A
68 65 6C 6C 6F             Value = "hello" (5 bytes)
00                         HeadersCount = 0 (varint)

Record[1] (message "world"):
12                         Length (同上结构)
00                         Attributes = 0
00                         TimestampDelta = 0 (同批内,时间相同)
02                         OffsetDelta = 1 → zigzag: (1<<1)^0 = 2 = 0x02
01                         KeyLength = -1 → varint zigzag = 1
0A                         ValueLength = 5 → zigzag = 10 = 0x0A
77 6F 72 6C 64             Value = "world" (5 bytes)
00                         HeadersCount = 0

注:V2 Record 中的有符号整数使用 ZigZag Varint 编码:将有符号整数映射为无符号整数后再 Varint 编码。encode(n) = (n << 1) ^ (n >> 63)decode(m) = (m >>> 1) ^ -(m & 1)。这使得小的负数(如 -1)也能用少量字节表示(-1 编码为 1,只需 1 字节)。

压缩:批次级别 vs 消息级别

V0/V1 的压缩是将一批消息的 Value 放入另一条消息的 Value 中(嵌套),解压时需要额外分配缓冲区、复制数据。

V2 的压缩是将整个 Records 区域(所有 Record 的字节序列)直接压缩,存储在 RecordBatch 的 Records 字段中,无需嵌套。

V2 压缩 RecordBatch(LZ4 为例):
RecordBatch Header (61 bytes):
  Attributes.bit0-2 = 3 (LZ4)
  ...

Records (LZ4 compressed bytes):
  LZ4 decompressed content = [Record[0] bytes][Record[1] bytes]...[Record[N] bytes]

这使得 Broker 可以在不解压的情况下传输整个 RecordBatch(Producer → Broker → Consumer),只在最终消费端解压。"Zero-copy 传输 + 批量压缩"是 Kafka 高吞吐低延迟的关键组合。


Level 4 · 边界与陷阱(所有人)

V2 的四大核心创新

创新一:Batch 级 CRC(而非逐条 CRC)

V2 将 CRC 从每条 Record 提升到整个 Batch 级别:一个 RecordBatch 只有一个 CRC32C(注意是 CRC32C,不是 V0/V1 的 CRC32,计算更快)。

节省:假设一个 Batch 有 1000 条消息,V1 需要 1000 次 CRC 计算 + 4000 字节的 CRC 存储;V2 只需 1 次 CRC 计算 + 4 字节存储。CPU 开销从 O(N) 降为 O(1)(相对消息数)。

代价:无法单独校验 Batch 内某条消息的完整性(只能校验整个 Batch)。在实践中这不是问题,因为 Kafka 的读取单位也是 Batch,很少需要单条消息级别的校验。

创新二:Varint 差值编码(OffsetDelta + TimestampDelta)

这是 V2 最精妙的空间优化。

OffsetDelta:V0/V1 每条消息存储完整的 8 字节 Offset。V2 在 RecordBatch 头存储 BaseOffset(8 字节),每条 Record 只存储相对 BaseOffset 的差值(OffsetDelta),用 Varint 编码。

对于 Batch 内连续的消息(OffsetDelta=0, 1, 2, 3...),Varint 编码只需 1 字节(Varint 0-127 单字节)。每条消息节省约 7 字节(8字节完整Offset - 1字节Varint差值)。

TimestampDelta:类似地,RecordBatch 头存储 BaseTimestamp,每条 Record 只存储与 BaseTimestamp 的差值(毫秒)。对于在短时间内(<127ms)生产的同批消息,TimestampDelta 同样只需 1-2 字节,而不是 8 字节。

KeyLength/ValueLength:V0/V1 使用固定 4 字节 int32 存储长度;V2 使用 Varint,对于长度 < 128 字节的 Key/Value,节省 3 字节。

整体效果:每条消息的元数据开销从 V1 的约 34 字节(Offset 8 + CRC 4 + Magic 1 + Attr 1 + Timestamp 8 + KeyLen 4 + ValueLen 4 + etc.)降为 V2 的约 8-12 字节(Varint 编码的各个 delta 字段)。对于大量小消息(Value < 1KB),V2 的空间节省可达 30-50%。

创新三:Header 支持

V2 Record 支持任意数量的键值对 Header(key 为 UTF-8 字符串,value 为任意字节数组)。

Header 的用途极为广泛:

// 注入分布式追踪信息(不修改消息 Value)
ProducerRecord<String, Order> record = new ProducerRecord<>("orders", order);
record.headers().add("trace-id", traceContext.traceId().getBytes());
record.headers().add("span-id", traceContext.spanId().getBytes());
record.headers().add("content-type", "application/json".getBytes());
record.headers().add("schema-version", "v2.3".getBytes());

// 消费端读取 Header
for (ConsumerRecord<String, Order> r : records) {
    Header traceHeader = r.headers().lastHeader("trace-id");
    String traceId = traceHeader != null ?
        new String(traceHeader.value()) : "unknown";
    // 用 traceId 关联日志
}

在 V0/V1 时代,这些元数据只能混入消息 Value(比如 JSON 中多一个 __trace_id 字段),或者通过 Key 传递(牺牲了路由语义)。V2 的 Header 将元数据与业务数据完全分离,是架构上的重大改进。

创新四:ProducerId + ProducerEpoch + BaseSequence(幂等性基础)

RecordBatch 头部的这三个字段共同构成 Kafka 幂等生产者的基础:

ProducerId(PID):由 Broker 分配的全局唯一生产者标识符。生产者初始化时(enable.idempotence=true)向 Broker 发起 InitProducerIdRequest,Broker 返回一个 PID。

ProducerEpoch:同一个 PID 可以有多个 Epoch(生产者重启时 Epoch 递增)。Broker 用 (PID, Epoch) 的组合来识别"当前活跃的生产者实例",拒绝来自旧 Epoch 的请求(防止僵尸生产者写入)。

BaseSequence:Batch 内第一条消息的序列号,单调递增。Broker 为每个 (PID, Epoch, Partition) 维护已收到的最大序列号,检测到重复(序列号 ≤ 已见最大值)则丢弃,检测到序列号跳跃(> 已见最大值 + Batch 大小)则报错(消息丢失)。

// 开启幂等生产者
Properties props = new Properties();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);  // 自动设置 acks=-1 + retries=MAX
// 内部自动设置:
// acks = -1
// retries = Integer.MAX_VALUE
// max.in.flight.requests.per.connection = 5(幂等模式下最多5可保序)
KafkaProducer<String, Order> producer = new KafkaProducer<>(props);

幂等保证范围:单分区、单会话(ProducerEpoch 相同的生命周期内)。跨会话(生产者重启)或跨分区的精确一次,需要事务(transactional.id 配置 + beginTransaction/commitTransaction API)。

IsTransactional 和 IsControl 标志

Attributes.IsTransactional=1 标记此 Batch 是事务的一部分。Broker 在 READ_COMMITTED 模式下,只有当包含此 Batch 的事务被 COMMIT 后,才将其暴露给消费者。

Attributes.IsControl=1 标记此 Batch 是控制批次(Control Batch),包含事务的最终状态标记:COMMIT 或 ABORT。控制批次对普通消费者不可见(消费者库自动过滤),只对事务协调器有意义。

本章评分
4.9  / 5  (67 评分)

💬 留言讨论