Kafka 协议:二进制通信格式拆解
第4章:Kafka 协议:二进制通信格式拆解
导读:Kafka 的二进制 TCP 协议是如何设计的,为什么不使用 HTTP?
本章核心问题:Kafka 的二进制 TCP 协议是如何设计的,为什么不使用 HTTP?
读完本章你将理解:
- 请求帧与响应帧的完整字节结构
- ApiVersions 握手与版本协商机制
- ProduceRequest 和 FetchRequest 的深度字段解析
- 向后兼容性保证的四条设计原则
Level 1 · 你需要知道的(1-3年经验)
用 Wireshark 抓 Kafka 协议包
环境准备
# 启动本地 Kafka(KRaft 模式,不加密,便于抓包)
kafka-server-start.sh /etc/kafka/server.properties
# 安装 Wireshark(macOS)
brew install --cask wireshark
# 或使用 tshark(命令行版)
brew install wireshark
抓包与解析
# 抓取发往 9092 端口的 TCP 流量,保存到文件
tshark -i lo0 -f "tcp port 9092" -w /tmp/kafka.pcap &
# 发送一条测试消息
echo "hello world" | kcat -P -b localhost:9092 -t test-topic
# 停止抓包
kill %1
# 用 tshark 解析(Wireshark 内置 Kafka 协议解析器,从 3.x 开始支持)
tshark -r /tmp/kafka.pcap -d tcp.port==9092,kafka -T fields \
-e kafka.request.api_key \
-e kafka.request.api_version \
-e kafka.correlationid \
-e kafka.topic_name
用 kcat 调试协议
# 查看 Broker 支持的 API 版本(相当于发 ApiVersionsRequest)
kcat -b localhost:9092 -L
# 以 verbose 模式生产消息,显示详细请求/响应信息
kcat -b localhost:9092 -P -t test-topic -v <<< "test message"
# 消费并显示消息元数据
kcat -b localhost:9092 -C -t test-topic -f 'offset=%o partition=%p timestamp=%T\n' -e
手动解析 Hex 包
下面是一个真实抓到的 MetadataRequest 的前 30 字节 hex:
00 00 00 26 → Length = 38 字节
00 03 → ApiKey = 3 (Metadata)
00 0C → ApiVersion = 12
00 00 00 01 → CorrelationId = 1
00 → ClientId null (COMPACT_NULLABLE_STRING: 0=null)
00 → TAG_BUFFER (Header)
02 → Topics 数组: 1+1=2 → 1个元素
0C → Topic Name 长度 = 11+1=12
6F 72 64 65 72 2D 65 76 65 6E 74 73
→ "order-events" (12 bytes)
00 → TAG_BUFFER (topic)
00 → AllowAutoTopicCreation = false
08 → IncludeClusterAuthorizedOperations (bool flags)
00 → TAG_BUFFER (body)
Level 2 · 它是怎么运行的(3-5年经验)
本章的内部原理内容已整合到 Level 1 和 Level 3 中,请结合阅读。
Level 3 · 规范怎么定义的(资深)
为什么要了解 Kafka 的二进制协议
大多数使用 Kafka 的工程师从不需要直接接触协议层——Java 客户端、Python 的 kafka-python、Go 的 sarama 库都封装好了一切。但理解协议层,有几个重要的价值:
- 排查疑难问题:客户端卡死?连接频繁断开?用 Wireshark 抓包,读懂 hex 数据,能定位到底是客户端 Bug 还是 Broker 响应异常。
- 理解性能边界:批次大小限制来自哪里?为什么
max.request.size默认 1MB?协议帧结构直接决定这些参数的含义。 - 构建非官方语言客户端:如果你需要为一个小众语言(如 Zig、Crystal)实现 Kafka 客户端,协议规范是唯一的参考。
- 理解版本协商机制:Kafka 协议的向后兼容性设计,是它能够在不停机的情况下跨版本滚动升级的基础。
TCP:选择二进制而非 HTTP
Kafka 没有选择 HTTP/REST,而是设计了自己的二进制 TCP 协议。这个选择在 2011 年是合理的,今天看来依然正确:
为什么不用 HTTP:HTTP 是文本协议(或 HTTP/2 的 HPACK 压缩帧),有冗余的头部(Content-Type、User-Agent、Host...),每个请求至少数百字节开销。对于 Kafka 这种高频小消息场景,协议开销占比太高。此外,HTTP/1.1 默认不支持请求多路复用(需要 keep-alive + 串行),HTTP/2 虽然支持多路复用,但在 2011 年尚未出现。
二进制协议的优势:紧凑(字段按精确字节对齐,无分隔符)、解析高效(无正则匹配,直接读 N 字节)、原生支持多路复用(CorrelationId 字段)。
请求帧结构
Kafka 协议的每个请求(Request)和响应(Response)都遵循统一的帧格式。
请求帧
┌─────────────────────────────────────────────────────────────┐
│ Length (4 bytes, Big-Endian int32) │
│ ─ 后续所有字节的总长度(不包含 Length 字段本身) │
├──────────────────────────────────────┬──────────────────────┤
│ Request Header │ Request Body │
│ ┌─────────────────────────────────┐ │ (由 ApiKey 和 │
│ │ ApiKey (2 bytes, int16) │ │ ApiVersion 决定) │
│ │ ApiVersion (2 bytes, int16) │ │ │
│ │ CorrelationId (4 bytes, int32) │ │ │
│ │ ClientId (string, 可为null)│ │ │
│ │ ─ 2字节长度 + N字节UTF-8字符串 │ │ │
│ └─────────────────────────────────┘ │ │
└──────────────────────────────────────┴──────────────────────┘
Length 字段:TCP 是流式协议,数据包边界不等同于请求边界。Length 字段告诉接收方"从现在开始读 N 字节,就是一个完整请求"。Kafka 的 Processor 线程用这个字段做请求边界检测。
ApiKey:标识请求类型。Produce=0,Fetch=1,Metadata=3,...,Vote=52,BeginQuorumEpoch=53 等。完整列表见 Kafka Protocol Guide。
ApiVersion:同一 ApiKey 可以有多个版本(0、1、2...),版本之间向后兼容(新版本字段对旧版 Broker 透明)。
CorrelationId:客户端分配的请求 ID,Broker 在响应中原样返回。这实现了 TCP 连接上的请求多路复用——客户端可以在上一个响应返回前发出多个请求,靠 CorrelationId 将响应与请求匹配。
响应帧
┌─────────────────────────────────────────────┐
│ Length (4 bytes, Big-Endian int32) │
├─────────────────────────────────────────────┤
│ CorrelationId (4 bytes, int32) │
│ ─ 与对应请求的 CorrelationId 一致 │
├─────────────────────────────────────────────┤
│ Response Body │
│ (由请求的 ApiKey 和 ApiVersion 决定格式) │
└─────────────────────────────────────────────┘
响应头没有 ApiKey 和 ApiVersion,因为客户端可以通过 CorrelationId 查找对应请求,从而知道如何解析响应体。
ApiVersions 请求:连接时的握手
客户端第一次连接 Broker 时,应该发送 ApiVersionsRequest(ApiKey=18),询问 Broker 支持哪些 API 及其版本范围:
ApiVersionsRequest v3:
ApiKey: 18
ApiVersion: 3
CorrelationId: 0
ClientId: "my-producer"
Body:
ClientSoftwareName: "apache-kafka-java"
ClientSoftwareVersion: "3.7.0"
Broker 响应:
ApiVersionsResponse:
CorrelationId: 0
ErrorCode: 0 (NONE)
ApiKeys:
[ ApiKey=0 (Produce), MinVersion=0, MaxVersion=10 ]
[ ApiKey=1 (Fetch), MinVersion=0, MaxVersion=16 ]
[ ApiKey=3 (Metadata), MinVersion=0, MaxVersion=12 ]
[ ApiKey=18 (ApiVersions), MinVersion=0, MaxVersion=3 ]
...(所有支持的 API)
ThrottleTimeMs: 0
客户端选择 min(客户端支持的最高版本, Broker 支持的最高版本) 作为实际使用的版本。这是 Kafka 向后兼容性的核心机制:老客户端连接新 Broker,使用老版本;新客户端连接老 Broker,使用老 Broker 支持的最高版本。
ProduceRequest 深度解析(v9)
字段结构
ProduceRequest v9:
ApiKey: 0
ApiVersion: 9
CorrelationId: 1001
ClientId: "order-producer"
Body:
TransactionalId: null (2字节 -1 表示非事务消息)
Acks: -1 (int16: -1=all, 0=none, 1=leader)
TimeoutMs: 30000 (int32: 请求超时,毫秒)
TopicData: [ (COMPACT_ARRAY)
{
Name: "order-events" (COMPACT_STRING)
PartitionData: [
{
Index: 2 (int32: Partition 编号)
Records: <RecordBatch 二进制数据>
}
]
}
]
Compact 编码
从 Kafka 2.4 开始,协议引入了 Flexible Version(灵活版本)和 Compact 编码。Compact 编码用于数组和字符串:
- COMPACT_ARRAY:长度用 Unsigned Varint 编码(而非固定 4 字节 int32)。长度值 = 实际元素数 + 1(0 表示 null,1 表示空数组,N+1 表示 N 个元素)。
- COMPACT_STRING:长度用 Unsigned Varint 编码,字节数 = 实际字符数 + 1。
对于短字符串(长度 < 128),Varint 编码只需 1 字节,而固定 int16 需要 2 字节。在消息量极大的场景,这些节省积少成多。
实际 Hex 字节拆解
以下是一个最小 ProduceRequest(v9)的实际 hex 字节(简化版,无认证):
00 00 00 XX → Length(后续字节总数,XX 代表实际值)
00 00 → ApiKey = 0 (Produce)
00 09 → ApiVersion = 9
00 00 03 E9 → CorrelationId = 1001
00 0E → ClientId 长度 = 14 字节 (COMPACT_STRING: 实际长度+1=15 → varint=0F,此处简化)
6F 72 64 65 72 2D 70 72 6F 64 75 63 65 72
→ "order-producer" (14 bytes UTF-8)
00 → TAG_BUFFER (Flexible Version 的 tag 区,0=无额外 tag)
FF → TransactionalId = null (COMPACT_NULLABLE_STRING: 0=null)
FF FF → Acks = -1 (int16)
00 00 75 30 → TimeoutMs = 30000 (int32)
02 → TopicData 数组长度 = 1 (COMPACT_ARRAY: 1+1=2 → varint=0x02)
0D → Topic Name 长度 = 12+1=13 → varint=0x0D
6F 72 64 65 72 2D 65 76 65 6E 74 73
→ "order-events" (12 bytes)
02 → PartitionData 数组长度 = 1 (COMPACT_ARRAY: 1+1=2)
00 00 00 02 → Partition Index = 2 (int32)
XX XX XX XX → Records 长度 (COMPACT_BYTES)
<RecordBatch> → 实际消息数据(见第5章格式)
00 → TAG_BUFFER (PartitionData)
00 → TAG_BUFFER (TopicData element)
00 → TAG_BUFFER (TopicData array)
00 → TAG_BUFFER (Request Body)
TAG_BUFFER:Flexible Version 引入的扩展机制,允许在不增加 ApiVersion 的情况下为现有字段添加可选的"标签字段"。格式是 (varint tag_id, varint size, bytes data)*,以 0x00 结束。对于不使用任何 tag 的情况,就是单字节 0x00。
FetchRequest 深度解析
字段结构
FetchRequest v16:
ApiKey: 1
ApiVersion: 16
CorrelationId: 2001
ClientId: "order-consumer"
Body:
ClusterId: null (可选,用于跨集群识别)
ReplicaState: (v16新增:包含 ReplicaId 和 ReplicaEpoch)
ReplicaId: -1 (int32: -1=普通消费者,≥0=Follower)
ReplicaEpoch: -1 (int64: -1=普通消费者)
MaxWaitMs: 500 (int32: 最长等待毫秒数,对应 fetch.max.wait.ms)
MinBytes: 1 (int32: 最小返回字节,对应 fetch.min.bytes)
MaxBytes: 52428800 (int32: 总最大字节 50MB)
IsolationLevel: 1 (int8: 0=READ_UNCOMMITTED, 1=READ_COMMITTED)
SessionId: 0 (int32: Fetch Session ID,0=无状态)
SessionEpoch: -1 (int32: -1=无状态 Fetch)
Topics: [
{
TopicId: <UUID> (16字节 UUID,v13+使用 ID 代替名称)
Partitions: [
{
Partition: 2 (int32)
CurrentLeaderEpoch: 5 (int32: 消费者缓存的 Leader Epoch,用于检测过期 Leader)
FetchOffset: 1000 (int64: 从此 offset 开始获取)
LastFetchedEpoch: 5 (int32: 用于 leader epoch 边界检查)
LogStartOffset: -1 (int64: Follower 使用,消费者填 -1)
PartitionMaxBytes: 1048576 (int32: 此分区最大 1MB)
}
]
}
]
ForgottenTopicsData: [] (增量 Fetch Session 中不再需要的 Partition)
RackId: "" (消费者的机架 ID,用于 Rack-aware Fetch)
IsolationLevel 的影响
IsolationLevel=READ_COMMITTED(1)时,Broker 只返回已提交事务的消息(即 LSO,Last Stable Offset 之前的消息)。对于非事务消息,LSO=HW,行为与 READ_UNCOMMITTED 相同。对于事务型生产者发出但尚未 commit/abort 的消息,它们对 READ_COMMITTED 消费者不可见,即使这些消息已经写入日志。
Fetch Session(增量 Fetch)
在 Kafka 2.0 引入 Fetch Session 之前,每次 FetchRequest 都需要带上所有需要消费的 Partition 列表,即使这个列表从不变化。对于消费者订阅了很多 Partition 的情况,这个重复的 Partition 列表会占用大量带宽。
Fetch Session 允许消费者与 Broker 建立一个有状态的 Fetch 会话:第一次 Fetch 带完整的 Partition 列表(SessionId=0,Broker 创建会话,返回新的 SessionId 和 SessionEpoch),后续 Fetch 只携带变化的 Partition(新增、移除或 FetchOffset 变化),大幅减少请求大小。
向后兼容性保证
Kafka 协议的向后兼容性设计原则:
- 只增不删:旧版本 Request/Response 中的字段永不删除(只标记为 deprecated)。
- 新字段在末尾:新版本在老版本字段之后追加新字段,旧版 Broker 在解析新版请求时可以忽略末尾的未知字段(通过 TAG_BUFFER 实现),不会出错。
- 版本号严格递增:任何字段语义变更(即使是可选字段)都必须递增 ApiVersion。
- TaggedFields 无需递增版本:从 Flexible Version 开始,可选的"标签字段"通过 TAG_BUFFER 传输,添加新的标签字段不需要递增 ApiVersion。
这套机制保证了 Kafka 跨版本滚动升级时的兼容性:新版 Broker 可以理解旧版客户端的请求;旧版客户端连接新版 Broker 时,通过 ApiVersionsRequest 协商使用旧版 API,一切正常工作。
Kafka 官方维护了一个协议版本兼容矩阵,列出了每个 ApiKey 各版本的 Broker 版本支持情况,是跨版本升级的重要参考。
Level 4 · 边界与陷阱(所有人)
以下是与"Kafka 协议:二进制通信格式拆解"相关的常见边界问题和生产陷阱:
陷阱一:忽略默认配置的隐含假设。 许多 Kafka 参数的默认值是为通用场景设计的,在特定业务场景下可能导致性能问题或数据风险。上线前务必逐一审查与本章主题相关的配置项,确认其是否符合业务需求。
陷阱二:缺乏端到端的验证。 理论理解和生产实践之间往往存在差距。建议在预发布环境中构建完整的测试场景,覆盖正常路径和异常路径(如节点宕机、网络分区、磁盘满),验证系统行为是否符合预期。
陷阱三:监控盲区。 本章涉及的机制如果缺乏对应的监控指标和告警规则,问题往往在造成业务影响后才被发现。建议将关键指标纳入监控体系,设置合理的告警阈值。