第 4 章

Kafka 协议:二进制通信格式拆解

第4章:Kafka 协议:二进制通信格式拆解

导读:Kafka 的二进制 TCP 协议是如何设计的,为什么不使用 HTTP?

本章核心问题:Kafka 的二进制 TCP 协议是如何设计的,为什么不使用 HTTP?

读完本章你将理解


Level 1 · 你需要知道的(1-3年经验)

用 Wireshark 抓 Kafka 协议包

环境准备

# 启动本地 Kafka(KRaft 模式,不加密,便于抓包)
kafka-server-start.sh /etc/kafka/server.properties

# 安装 Wireshark(macOS)
brew install --cask wireshark

# 或使用 tshark(命令行版)
brew install wireshark

抓包与解析

# 抓取发往 9092 端口的 TCP 流量,保存到文件
tshark -i lo0 -f "tcp port 9092" -w /tmp/kafka.pcap &

# 发送一条测试消息
echo "hello world" | kcat -P -b localhost:9092 -t test-topic

# 停止抓包
kill %1

# 用 tshark 解析(Wireshark 内置 Kafka 协议解析器,从 3.x 开始支持)
tshark -r /tmp/kafka.pcap -d tcp.port==9092,kafka -T fields \
  -e kafka.request.api_key \
  -e kafka.request.api_version \
  -e kafka.correlationid \
  -e kafka.topic_name

用 kcat 调试协议

# 查看 Broker 支持的 API 版本(相当于发 ApiVersionsRequest)
kcat -b localhost:9092 -L

# 以 verbose 模式生产消息,显示详细请求/响应信息
kcat -b localhost:9092 -P -t test-topic -v <<< "test message"

# 消费并显示消息元数据
kcat -b localhost:9092 -C -t test-topic -f 'offset=%o partition=%p timestamp=%T\n' -e

手动解析 Hex 包

下面是一个真实抓到的 MetadataRequest 的前 30 字节 hex:

00 00 00 26   → Length = 38 字节
00 03         → ApiKey = 3 (Metadata)
00 0C         → ApiVersion = 12
00 00 00 01   → CorrelationId = 1
00            → ClientId null (COMPACT_NULLABLE_STRING: 0=null)
00            → TAG_BUFFER (Header)
02            → Topics 数组: 1+1=2 → 1个元素
0C            → Topic Name 长度 = 11+1=12
6F 72 64 65 72 2D 65 76 65 6E 74 73
              → "order-events" (12 bytes)
00            → TAG_BUFFER (topic)
00            → AllowAutoTopicCreation = false
08            → IncludeClusterAuthorizedOperations (bool flags)
00            → TAG_BUFFER (body)

Level 2 · 它是怎么运行的(3-5年经验)

本章的内部原理内容已整合到 Level 1 和 Level 3 中,请结合阅读。


Level 3 · 规范怎么定义的(资深)

为什么要了解 Kafka 的二进制协议

大多数使用 Kafka 的工程师从不需要直接接触协议层——Java 客户端、Python 的 kafka-python、Go 的 sarama 库都封装好了一切。但理解协议层,有几个重要的价值:

  1. 排查疑难问题:客户端卡死?连接频繁断开?用 Wireshark 抓包,读懂 hex 数据,能定位到底是客户端 Bug 还是 Broker 响应异常。
  2. 理解性能边界:批次大小限制来自哪里?为什么 max.request.size 默认 1MB?协议帧结构直接决定这些参数的含义。
  3. 构建非官方语言客户端:如果你需要为一个小众语言(如 Zig、Crystal)实现 Kafka 客户端,协议规范是唯一的参考。
  4. 理解版本协商机制:Kafka 协议的向后兼容性设计,是它能够在不停机的情况下跨版本滚动升级的基础。

TCP:选择二进制而非 HTTP

Kafka 没有选择 HTTP/REST,而是设计了自己的二进制 TCP 协议。这个选择在 2011 年是合理的,今天看来依然正确:

为什么不用 HTTP:HTTP 是文本协议(或 HTTP/2 的 HPACK 压缩帧),有冗余的头部(Content-TypeUser-AgentHost...),每个请求至少数百字节开销。对于 Kafka 这种高频小消息场景,协议开销占比太高。此外,HTTP/1.1 默认不支持请求多路复用(需要 keep-alive + 串行),HTTP/2 虽然支持多路复用,但在 2011 年尚未出现。

二进制协议的优势:紧凑(字段按精确字节对齐,无分隔符)、解析高效(无正则匹配,直接读 N 字节)、原生支持多路复用(CorrelationId 字段)。

请求帧结构

Kafka 协议的每个请求(Request)和响应(Response)都遵循统一的帧格式。

请求帧

┌─────────────────────────────────────────────────────────────┐
│  Length (4 bytes, Big-Endian int32)                         │
│  ─ 后续所有字节的总长度(不包含 Length 字段本身)              │
├──────────────────────────────────────┬──────────────────────┤
│  Request Header                      │  Request Body        │
│  ┌─────────────────────────────────┐ │  (由 ApiKey 和      │
│  │ ApiKey        (2 bytes, int16)  │ │   ApiVersion 决定)  │
│  │ ApiVersion    (2 bytes, int16)  │ │                      │
│  │ CorrelationId (4 bytes, int32)  │ │                      │
│  │ ClientId      (string, 可为null)│ │                      │
│  │  ─ 2字节长度 + N字节UTF-8字符串 │ │                      │
│  └─────────────────────────────────┘ │                      │
└──────────────────────────────────────┴──────────────────────┘

Length 字段:TCP 是流式协议,数据包边界不等同于请求边界。Length 字段告诉接收方"从现在开始读 N 字节,就是一个完整请求"。Kafka 的 Processor 线程用这个字段做请求边界检测。

ApiKey:标识请求类型。Produce=0,Fetch=1,Metadata=3,...,Vote=52,BeginQuorumEpoch=53 等。完整列表见 Kafka Protocol Guide

ApiVersion:同一 ApiKey 可以有多个版本(0、1、2...),版本之间向后兼容(新版本字段对旧版 Broker 透明)。

CorrelationId:客户端分配的请求 ID,Broker 在响应中原样返回。这实现了 TCP 连接上的请求多路复用——客户端可以在上一个响应返回前发出多个请求,靠 CorrelationId 将响应与请求匹配。

响应帧

┌─────────────────────────────────────────────┐
│  Length (4 bytes, Big-Endian int32)         │
├─────────────────────────────────────────────┤
│  CorrelationId (4 bytes, int32)             │
│  ─ 与对应请求的 CorrelationId 一致           │
├─────────────────────────────────────────────┤
│  Response Body                              │
│  (由请求的 ApiKey 和 ApiVersion 决定格式)  │
└─────────────────────────────────────────────┘

响应头没有 ApiKey 和 ApiVersion,因为客户端可以通过 CorrelationId 查找对应请求,从而知道如何解析响应体。

ApiVersions 请求:连接时的握手

客户端第一次连接 Broker 时,应该发送 ApiVersionsRequest(ApiKey=18),询问 Broker 支持哪些 API 及其版本范围:

ApiVersionsRequest v3:
  ApiKey:        18
  ApiVersion:    3
  CorrelationId: 0
  ClientId:      "my-producer"
  Body:
    ClientSoftwareName:    "apache-kafka-java"
    ClientSoftwareVersion: "3.7.0"

Broker 响应:

ApiVersionsResponse:
  CorrelationId: 0
  ErrorCode:     0 (NONE)
  ApiKeys:
    [ ApiKey=0  (Produce),      MinVersion=0, MaxVersion=10 ]
    [ ApiKey=1  (Fetch),        MinVersion=0, MaxVersion=16 ]
    [ ApiKey=3  (Metadata),     MinVersion=0, MaxVersion=12 ]
    [ ApiKey=18 (ApiVersions),  MinVersion=0, MaxVersion=3  ]
    ...(所有支持的 API)
  ThrottleTimeMs: 0

客户端选择 min(客户端支持的最高版本, Broker 支持的最高版本) 作为实际使用的版本。这是 Kafka 向后兼容性的核心机制:老客户端连接新 Broker,使用老版本;新客户端连接老 Broker,使用老 Broker 支持的最高版本。

ProduceRequest 深度解析(v9)

字段结构

ProduceRequest v9:
  ApiKey:        0
  ApiVersion:    9
  CorrelationId: 1001
  ClientId:      "order-producer"
  Body:
    TransactionalId:  null            (2字节 -1 表示非事务消息)
    Acks:             -1              (int16: -1=all, 0=none, 1=leader)
    TimeoutMs:        30000           (int32: 请求超时,毫秒)
    TopicData:        [               (COMPACT_ARRAY)
      {
        Name:           "order-events"     (COMPACT_STRING)
        PartitionData:  [
          {
            Index:      2                  (int32: Partition 编号)
            Records:    <RecordBatch 二进制数据>
          }
        ]
      }
    ]

Compact 编码

从 Kafka 2.4 开始,协议引入了 Flexible Version(灵活版本)和 Compact 编码。Compact 编码用于数组和字符串:

对于短字符串(长度 < 128),Varint 编码只需 1 字节,而固定 int16 需要 2 字节。在消息量极大的场景,这些节省积少成多。

实际 Hex 字节拆解

以下是一个最小 ProduceRequest(v9)的实际 hex 字节(简化版,无认证):

00 00 00 XX    → Length(后续字节总数,XX 代表实际值)
00 00          → ApiKey = 0 (Produce)
00 09          → ApiVersion = 9
00 00 03 E9    → CorrelationId = 1001
00 0E          → ClientId 长度 = 14 字节 (COMPACT_STRING: 实际长度+1=15 → varint=0F,此处简化)
6F 72 64 65 72 2D 70 72 6F 64 75 63 65 72
               → "order-producer" (14 bytes UTF-8)
00             → TAG_BUFFER (Flexible Version 的 tag 区,0=无额外 tag)
FF             → TransactionalId = null (COMPACT_NULLABLE_STRING: 0=null)
FF FF          → Acks = -1 (int16)
00 00 75 30    → TimeoutMs = 30000 (int32)
02             → TopicData 数组长度 = 1 (COMPACT_ARRAY: 1+1=2 → varint=0x02)
0D             → Topic Name 长度 = 12+1=13 → varint=0x0D
6F 72 64 65 72 2D 65 76 65 6E 74 73
               → "order-events" (12 bytes)
02             → PartitionData 数组长度 = 1 (COMPACT_ARRAY: 1+1=2)
00 00 00 02    → Partition Index = 2 (int32)
XX XX XX XX    → Records 长度 (COMPACT_BYTES)
<RecordBatch>  → 实际消息数据(见第5章格式)
00             → TAG_BUFFER (PartitionData)
00             → TAG_BUFFER (TopicData element)
00             → TAG_BUFFER (TopicData array)
00             → TAG_BUFFER (Request Body)

TAG_BUFFER:Flexible Version 引入的扩展机制,允许在不增加 ApiVersion 的情况下为现有字段添加可选的"标签字段"。格式是 (varint tag_id, varint size, bytes data)*,以 0x00 结束。对于不使用任何 tag 的情况,就是单字节 0x00

FetchRequest 深度解析

字段结构

FetchRequest v16:
  ApiKey:        1
  ApiVersion:    16
  CorrelationId: 2001
  ClientId:      "order-consumer"
  Body:
    ClusterId:         null           (可选,用于跨集群识别)
    ReplicaState:                     (v16新增:包含 ReplicaId 和 ReplicaEpoch)
      ReplicaId:       -1             (int32: -1=普通消费者,≥0=Follower)
      ReplicaEpoch:    -1             (int64: -1=普通消费者)
    MaxWaitMs:         500            (int32: 最长等待毫秒数,对应 fetch.max.wait.ms)
    MinBytes:          1              (int32: 最小返回字节,对应 fetch.min.bytes)
    MaxBytes:          52428800       (int32: 总最大字节 50MB)
    IsolationLevel:    1              (int8: 0=READ_UNCOMMITTED, 1=READ_COMMITTED)
    SessionId:         0              (int32: Fetch Session ID,0=无状态)
    SessionEpoch:      -1             (int32: -1=无状态 Fetch)
    Topics:            [
      {
        TopicId:       <UUID>         (16字节 UUID,v13+使用 ID 代替名称)
        Partitions:    [
          {
            Partition:          2    (int32)
            CurrentLeaderEpoch: 5    (int32: 消费者缓存的 Leader Epoch,用于检测过期 Leader)
            FetchOffset:        1000 (int64: 从此 offset 开始获取)
            LastFetchedEpoch:   5    (int32: 用于 leader epoch 边界检查)
            LogStartOffset:     -1   (int64: Follower 使用,消费者填 -1)
            PartitionMaxBytes:  1048576  (int32: 此分区最大 1MB)
          }
        ]
      }
    ]
    ForgottenTopicsData: []          (增量 Fetch Session 中不再需要的 Partition)
    RackId:            ""            (消费者的机架 ID,用于 Rack-aware Fetch)

IsolationLevel 的影响

IsolationLevel=READ_COMMITTED(1)时,Broker 只返回已提交事务的消息(即 LSO,Last Stable Offset 之前的消息)。对于非事务消息,LSO=HW,行为与 READ_UNCOMMITTED 相同。对于事务型生产者发出但尚未 commit/abort 的消息,它们对 READ_COMMITTED 消费者不可见,即使这些消息已经写入日志。

Fetch Session(增量 Fetch)

在 Kafka 2.0 引入 Fetch Session 之前,每次 FetchRequest 都需要带上所有需要消费的 Partition 列表,即使这个列表从不变化。对于消费者订阅了很多 Partition 的情况,这个重复的 Partition 列表会占用大量带宽。

Fetch Session 允许消费者与 Broker 建立一个有状态的 Fetch 会话:第一次 Fetch 带完整的 Partition 列表(SessionId=0,Broker 创建会话,返回新的 SessionIdSessionEpoch),后续 Fetch 只携带变化的 Partition(新增、移除或 FetchOffset 变化),大幅减少请求大小。

向后兼容性保证

Kafka 协议的向后兼容性设计原则:

  1. 只增不删:旧版本 Request/Response 中的字段永不删除(只标记为 deprecated)。
  2. 新字段在末尾:新版本在老版本字段之后追加新字段,旧版 Broker 在解析新版请求时可以忽略末尾的未知字段(通过 TAG_BUFFER 实现),不会出错。
  3. 版本号严格递增:任何字段语义变更(即使是可选字段)都必须递增 ApiVersion。
  4. TaggedFields 无需递增版本:从 Flexible Version 开始,可选的"标签字段"通过 TAG_BUFFER 传输,添加新的标签字段不需要递增 ApiVersion。

这套机制保证了 Kafka 跨版本滚动升级时的兼容性:新版 Broker 可以理解旧版客户端的请求;旧版客户端连接新版 Broker 时,通过 ApiVersionsRequest 协商使用旧版 API,一切正常工作。

Kafka 官方维护了一个协议版本兼容矩阵,列出了每个 ApiKey 各版本的 Broker 版本支持情况,是跨版本升级的重要参考。


Level 4 · 边界与陷阱(所有人)

以下是与"Kafka 协议:二进制通信格式拆解"相关的常见边界问题和生产陷阱:

陷阱一:忽略默认配置的隐含假设。 许多 Kafka 参数的默认值是为通用场景设计的,在特定业务场景下可能导致性能问题或数据风险。上线前务必逐一审查与本章主题相关的配置项,确认其是否符合业务需求。

陷阱二:缺乏端到端的验证。 理论理解和生产实践之间往往存在差距。建议在预发布环境中构建完整的测试场景,覆盖正常路径和异常路径(如节点宕机、网络分区、磁盘满),验证系统行为是否符合预期。

陷阱三:监控盲区。 本章涉及的机制如果缺乏对应的监控指标和告警规则,问题往往在造成业务影响后才被发现。建议将关键指标纳入监控体系,设置合理的告警阈值。

本章评分
4.5  / 5  (76 评分)

💬 留言讨论