Chapter 4

Kafka Protocol: Binary Wire Format Dissected

Why Understand the Protocol?

Most Kafka users never need to touch the wire protocol: the Java client, kafka-python, sarama, and other libraries handle encoding and decoding transparently. But understanding the protocol layer has practical value that goes beyond academic curiosity:

Debugging production incidents: A consumer is stuck at a particular offset. A producer keeps getting UNKNOWN_TOPIC_OR_PARTITION errors. A connection is being closed unexpectedly. With Wireshark and the ability to read Kafka hex frames, you can determine definitively whether the problem is in the client, the broker, or the network, rather than guessing.

Understanding performance limits: Why does max.request.size default to 1MB? Why does large-message handling cause broker pauses? The protocol frame structure directly constrains these parameters.

Building clients for unsupported languages: If you need a Kafka client for Zig, Swift, or an embedded system, the protocol specification is your only reference.

Understanding version negotiation: Kafka's ability to upgrade brokers and clients independently relies entirely on the protocol's version negotiation mechanism. Understanding it demystifies how Kafka achieves zero-downtime rolling upgrades.

TCP Binary Protocol: Why Not HTTP?

Kafka chose to design its own binary TCP protocol rather than use HTTP/REST. In 2011, this was the right choice; it remains defensible today.

HTTP's overhead: HTTP/1.1 headers are verbose. Content-Type, User-Agent, Host, Accept-Encoding, and others add hundreds of bytes to every request, even tiny ones. For Kafka's workload of millions of small messages per second, protocol overhead compounds. HTTP/1.1 also defaults to serial request-response (no multiplexing without HTTP/2), and HTTP/2 wasn't finalized until 2015.

Binary protocol advantages: Zero padding, no delimiters, fields packed at exact byte boundaries. Parsing is a sequence of typed reads (read 4 bytes as int32, read N bytes as UTF-8 string), with no regex matching, no string tokenization, and no schema validation overhead.

Native multiplexing: Kafka's CorrelationId field (4 bytes in every request and response) lets a client pipeline requests over a single TCP connection. Multiple in-flight requests coexist on the same connection, and each response is matched to its request by ID when it arrives. This is the foundation of max.in.flight.requests.per.connection.

Request Frame Structure

Every Kafka protocol request and response follows a common framing format.

Request Frame

Byte offset  Width    Field
──────────────────────────────────────────────────────────────
0            4        Length (int32, big-endian)
                      Total bytes following this field.
                      TCP is a stream; this field marks frame boundaries.

4            2        ApiKey (int16)
                      Identifies the request type.
                      Produce=0, Fetch=1, ListOffsets=2, Metadata=3, ...
                      Full list: kafka.apache.org/protocol.html

6            2        ApiVersion (int16)
                      Version of this specific ApiKey.
                      Multiple versions can exist for the same ApiKey.

8            4        CorrelationId (int32)
                      Client-assigned request identifier.
                      Broker echoes it in the response for request-response matching.

12           variable ClientId (NULLABLE_STRING in every header version,
                      including flexible ones)
                      Human-readable client identifier. Used in broker metrics,
                      quota enforcement, and log messages.
                      Format: 2-byte length (int16) + UTF-8 bytes.
                      null = 2-byte value of -1.

variable     variable TAG_BUFFER (flexible versions only)
                      Tagged header fields; a single 0x00 byte when empty.

variable     variable Request Body
                      Format determined entirely by ApiKey + ApiVersion.
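
The request framing above can be sketched in a few lines of Python. This is an illustrative encoder rather than a client: encode_request is a hypothetical helper, and it assumes the ClientId is written with the 2-byte length described above, with an empty TAG_BUFFER appended when the API version is flexible.

```python
import struct
from typing import Optional

def encode_request(api_key: int, api_version: int, correlation_id: int,
                   client_id: Optional[str], body: bytes,
                   flexible: bool = False) -> bytes:
    """Frame one request: 4-byte length prefix, header, then body."""
    # ApiKey (int16), ApiVersion (int16), CorrelationId (int32), all big-endian.
    header = struct.pack(">hhi", api_key, api_version, correlation_id)
    if client_id is None:
        header += struct.pack(">h", -1)            # null ClientId
    else:
        raw = client_id.encode("utf-8")
        header += struct.pack(">h", len(raw)) + raw
    if flexible:
        header += b"\x00"                          # empty header TAG_BUFFER
    payload = header + body
    # Length counts everything after the length field itself.
    return struct.pack(">i", len(payload)) + payload

# ApiVersionsRequest v0 has an empty body, so the frame is header-only.
frame = encode_request(18, 0, 7, "demo", b"")
print(frame.hex(" "))
```

The length prefix is computed last because it depends on the encoded header and body, which is why hand-written traces usually leave it as a placeholder until the end.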

Response Frame

Byte offset  Width    Field
──────────────────────────────────────────────────────────────
0            4        Length (int32, big-endian)
                      Total bytes following this field.

4            4        CorrelationId (int32)
                      Matches the CorrelationId from the corresponding request.
                      Client uses this to route the response to the right waiter.

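8            variable TAG_BUFFER (flexible versions only)
                      Tagged header fields; a single 0x00 byte when empty.
                      ApiVersionsResponse always omits it.

variable     variable Response Body
                      Format determined by the ApiKey and ApiVersion of the
                      matching request (no ApiKey in response frame).

The response frame has no ApiKey or ApiVersion; the client knows the format from the pending request it matched via CorrelationId.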
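
Because TCP delivers a byte stream, a client first has to cut the stream into frames and only then look up the pending request. A minimal sketch of both steps follows; split_frames, correlation_id, and the pending table are hypothetical names, and the payload bytes are placeholders rather than real response bodies.

```python
import struct

def split_frames(buffer: bytes):
    """Return (complete_frames, leftover_bytes) from a TCP receive buffer."""
    frames = []
    pos = 0
    while len(buffer) - pos >= 4:
        (length,) = struct.unpack_from(">i", buffer, pos)
        if len(buffer) - pos - 4 < length:
            break                                  # frame not fully received yet
        frames.append(buffer[pos + 4 : pos + 4 + length])
        pos += 4 + length
    return frames, buffer[pos:]

def correlation_id(frame: bytes) -> int:
    """CorrelationId is the first int32 after the length prefix."""
    return struct.unpack_from(">i", frame, 0)[0]

# In-flight requests keyed by CorrelationId: (API name, ApiVersion).
pending = {1001: ("Produce", 9), 2001: ("Fetch", 16)}

# One complete response followed by a truncated one, as a single read might return.
stream = (struct.pack(">ii", 6, 1001) + b"\xaa\xbb"
          + struct.pack(">ii", 8, 2001) + b"\xcc")

frames, rest = split_frames(stream)
for frame in frames:
    api, version = pending.pop(correlation_id(frame))
    print(api, version, len(frame))
```

The leftover bytes are kept and prepended to the next read, so a response split across TCP segments is decoded only once it is complete.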

ApiVersions Request: The Handshake

The very first message a well-behaved Kafka client sends after establishing a TCP connection is ApiVersionsRequest (ApiKey=18). This asks the broker what APIs it supports and what version range is available for each.

ApiVersionsRequest v3:
  Header:
    ApiKey:        18
    ApiVersion:    3
    CorrelationId: 0
    ClientId:      "my-producer"
  Body:
    ClientSoftwareName:    "apache-kafka-java"   (for broker-side logging)
    ClientSoftwareVersion: "3.7.0"

Broker response:

ApiVersionsResponse v3:
  CorrelationId: 0
  ErrorCode:     0      (NONE = success)
  ApiKeys:
    { ApiKey:  0 (Produce),         MinVersion: 0, MaxVersion: 10 }
    { ApiKey:  1 (Fetch),           MinVersion: 0, MaxVersion: 16 }
    { ApiKey:  2 (ListOffsets),     MinVersion: 0, MaxVersion:  8 }
    { ApiKey:  3 (Metadata),        MinVersion: 0, MaxVersion: 12 }
    { ApiKey: 18 (ApiVersions),     MinVersion: 0, MaxVersion:  3 }
    { ApiKey: 52 (Vote),            MinVersion: 0, MaxVersion:  0 }
    ...
  ThrottleTimeMs: 0
  SupportedFeatures: [...]
  FinalizedFeaturesEpoch: 1
  FinalizedFeatures: [...]

The client selects min(client_max_supported_version, broker_max_supported_version) for each API it will use. This is Kafka's forward-and-backward compatibility mechanism in its entirety: an old client connecting to a new broker uses the old API versions it knows; a new client connecting to an old broker uses the highest version the broker supports.
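
The selection rule can be written out as a small function. This is a sketch under stated assumptions: negotiate is a hypothetical helper, the broker ranges are copied from the sample response above, and the client ranges are invented for illustration. It also covers the case where the two ranges do not overlap at all, in which case the API cannot be used on that connection.

```python
def negotiate(client_range, broker_range):
    """Pick the highest version inside both (min, max) ranges, or None if disjoint."""
    client_min, client_max = client_range
    broker_min, broker_max = broker_range
    chosen = min(client_max, broker_max)
    if chosen < max(client_min, broker_min):
        return None                    # no common version
    return chosen

broker = {0: (0, 10), 1: (0, 16), 3: (0, 12)}    # from the ApiVersionsResponse above
client = {0: (3, 9), 1: (4, 17), 3: (13, 13)}    # hypothetical client capabilities

for api_key, client_range in client.items():
    print(api_key, negotiate(client_range, broker[api_key]))
```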

ProduceRequest Deep Dive (v9)

Complete Field Layout

ProduceRequest v9 is the first flexible version of the Produce API, and Kafka 3.7 brokers accept it. Fields use compact encoding (COMPACT_ARRAY, COMPACT_STRING) rather than fixed-length encodings.

ProduceRequest v9:
  Header:
    ApiKey:        0
    ApiVersion:    9
    CorrelationId: 1001
    ClientId:      "order-producer"
    TAG_BUFFER:    0x00 (no tagged header fields)

  Body:
    TransactionalId: null         (COMPACT_NULLABLE_STRING, null = not a transaction)
    Acks:            -1           (int16: -1=all ISR, 1=leader only, 0=no ack)
    TimeoutMs:       30000        (int32: request timeout in milliseconds)
    TopicData:                    (COMPACT_ARRAY: length as unsigned varint, value+1)
      TopicData[0]:
        Name:        "order-events"  (COMPACT_STRING)
        PartitionData:              (COMPACT_ARRAY)
          PartitionData[0]:
            Index:   2              (int32: partition number)
            Records: <bytes>        (COMPACT_BYTES: RecordBatch binary, see Chapter 5)
            TAG_BUFFER: 0x00        (PartitionData[0])
        TAG_BUFFER: 0x00            (TopicData[0])
    TAG_BUFFER: 0x00                (request body)

Compact Encoding Explained

Starting with Kafka 2.4, "flexible version" variants of APIs use compact encoding for variable-length fields:

COMPACT_ARRAY: Length encoded as unsigned varint where the value represents actual_length + 1. So: 0 = null, 1 = empty array, 2 = 1 element, 127 = 126 elements. For arrays up to 126 elements, the length fits in one byte (vs 4 bytes for fixed int32). Single-byte varint: values 0-127 use just the byte; values 128+ use multi-byte encoding.

COMPACT_STRING: Length as unsigned varint where value = actual_byte_length + 1. A 12-character ASCII topic name "order-events" encodes its length as 0x0D (13 in decimal = 12 + 1), saving 1 byte vs the old 2-byte int16 length.

TAG_BUFFER: Zero or more tagged fields, prefixed by their count. Format: (unsigned varint field_count) followed by field_count entries of [(varint tag_id)(varint size)(bytes data)]. Tagged fields allow adding new optional fields to existing API versions without bumping the version number. When no tagged fields are present, the count is zero and the TAG_BUFFER is a single 0x00 byte.
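
The compact encodings reduce to one primitive, the unsigned varint. The sketch below shows that primitive plus the length+1 convention for strings and arrays; the function names are hypothetical and chosen for this example only.

```python
def encode_uvarint(value: int) -> bytes:
    """Unsigned varint: 7 payload bits per byte, high bit set on all but the last."""
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)    # more bytes follow
        else:
            out.append(byte)
            return bytes(out)

def decode_uvarint(data: bytes, pos: int = 0):
    """Return (value, next_position)."""
    result = shift = 0
    while True:
        byte = data[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7

def compact_string(text):
    """COMPACT_NULLABLE_STRING: 0 means null, otherwise byte length + 1."""
    if text is None:
        return b"\x00"
    raw = text.encode("utf-8")
    return encode_uvarint(len(raw) + 1) + raw

def compact_array_len(count):
    """COMPACT_ARRAY length prefix: 0 means null, otherwise element count + 1."""
    return b"\x00" if count is None else encode_uvarint(count + 1)

print(compact_string("order-events").hex(" "))   # starts with 0d
print(compact_array_len(126).hex(), compact_array_len(127).hex(" "))
```

The second print shows the one-byte boundary: 126 elements still fit in a single length byte, while 127 elements need two.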

Actual Hex Bytes: A Minimal ProduceRequest

Let's trace the exact bytes for a ProduceRequest producing one record to topic "test" (4 chars), partition 0, with acks=1, linger.ms=0:

00 00 00 XX        → Length (4 bytes): total bytes to follow
                     XX = computed after encoding the header and body

00 00              → ApiKey = 0 (Produce)
00 09              → ApiVersion = 9
00 00 00 01        → CorrelationId = 1

00 0A              → ClientId length = 10 (int16; the header's ClientId stays a
                     NULLABLE_STRING even in flexible versions)
70 72 6F 64 75 63 65 72 2D 31
                   → "producer-1" (10 bytes)

00                 → TAG_BUFFER for header (0 tagged header fields)

00                 → TransactionalId = null
                     (COMPACT_NULLABLE_STRING: 0x00 = null, not 0xFF)

00 01              → Acks = 1 (int16, big-endian)

00 00 75 30        → TimeoutMs = 30000 (int32, big-endian)

02                 → TopicData array length: 1 element → 1+1=2 → varint 0x02

05                 → Topic Name length: "test" = 4 chars → 4+1=5 → varint 0x05
74 65 73 74        → "test" (4 bytes)

02                 → PartitionData array: 1 element → varint 0x02

00 00 00 00        → Partition Index = 0 (int32)

XX ..              → Records length (COMPACT_BYTES: actual_length+1 as an
                     unsigned varint, one or more bytes)
<RecordBatch>      → Binary RecordBatch (see Chapter 5 for format)

00                 → TAG_BUFFER (PartitionData[0])
00                 → TAG_BUFFER (TopicData[0])
00                 → TAG_BUFFER (Body)

The compact encoding is not as intuitive as fixed-width fields at first glance, but the savings are significant at scale. A topic name of 12 characters saves 1 byte per request (varint 1-byte vs int16 2-byte), which across a cluster handling 500K produce requests per second amounts to 500KB/s of reduced protocol overhead.
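
To check a hand trace like the one above, it helps to generate the bytes programmatically. The following sketch builds a v9 frame for topic "test", partition 0, acks=1. It is not a working producer: produce_v9 is a hypothetical helper, the ClientId is assumed to use a 2-byte length, and the record payload is a three-byte placeholder that a real broker would reject.

```python
import struct

def uvarint(value: int) -> bytes:
    """Unsigned varint: 7 bits per byte, high bit set on all but the last."""
    out = bytearray()
    while value >= 0x80:
        out.append((value & 0x7F) | 0x80)
        value >>= 7
    out.append(value)
    return bytes(out)

def produce_v9(correlation_id: int, client_id: str, topic: str,
               partition: int, acks: int, timeout_ms: int, batch: bytes) -> bytes:
    cid, name = client_id.encode("utf-8"), topic.encode("utf-8")
    header = struct.pack(">hhi", 0, 9, correlation_id)     # ApiKey, ApiVersion, CorrelationId
    header += struct.pack(">h", len(cid)) + cid + b"\x00"   # ClientId, header TAG_BUFFER
    body = b"\x00"                                          # TransactionalId = null
    body += struct.pack(">hi", acks, timeout_ms)            # Acks, TimeoutMs
    body += uvarint(2) + uvarint(len(name) + 1) + name      # one topic, then its name
    body += uvarint(2) + struct.pack(">i", partition)       # one partition, then its index
    body += uvarint(len(batch) + 1) + batch                 # Records (length + 1)
    body += b"\x00\x00\x00"                                 # TAG_BUFFERs: partition, topic, body
    payload = header + body
    return struct.pack(">i", len(payload)) + payload

PLACEHOLDER_BATCH = b"\xde\xad\xbe"   # stand-in bytes, NOT a valid RecordBatch
frame = produce_v9(1, "producer-1", "test", 0, 1, 30000, PLACEHOLDER_BATCH)
print(frame.hex(" "))
```

Swapping the placeholder for a real RecordBatch changes only the Records length varint and the outer Length field; every other byte stays the same.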

FetchRequest Deep Dive (v16)

Complete Field Layout

FetchRequest v16:
  Header:
    ApiKey:        1
    ApiVersion:    16
    CorrelationId: 2001
    ClientId:      "order-consumer"
    TAG_BUFFER:    0x00

  Body:
    ClusterId:       null         (tagged field 0, carried in the body TAG_BUFFER; omitted when null)
    ReplicaState:                 (tagged field 1, carried in the body TAG_BUFFER; omitted by consumers)
      ReplicaId:     -1           (int32: -1=regular consumer, ≥0=follower replica)
      ReplicaEpoch:  -1           (int64: -1=regular consumer)
    MaxWaitMs:       500          (int32: max ms to wait if MinBytes not met)
    MinBytes:        1            (int32: min bytes before responding)
    MaxBytes:        52428800     (int32: total max bytes across all partitions)
    IsolationLevel:  1            (int8: 0=READ_UNCOMMITTED, 1=READ_COMMITTED)
    SessionId:       0            (int32: 0=stateless, >0=incremental fetch session)
    SessionEpoch:    -1           (int32: -1=no session, 0=start a new session, >0=existing session)
    Topics:          (COMPACT_ARRAY)
      Topics[0]:
        TopicId:     <16-byte UUID>  (UUID: introduced in v13, replaces topic name)
        Partitions:  (COMPACT_ARRAY)
          Partitions[0]:
            Partition:           2     (int32)
            CurrentLeaderEpoch:  5     (int32: client's cached leader epoch)
            FetchOffset:         1000  (int64: fetch starting here)
            LastFetchedEpoch:    5     (int32: for leader epoch boundary detection)
            LogStartOffset:      -1    (int64: -1 for consumers; followers use real value)
            PartitionMaxBytes:   1048576  (int32: max bytes for this partition)
            TAG_BUFFER:          0x00   (Partitions[0])
        TAG_BUFFER: 0x00                (Topics[0])
    ForgottenTopicsData: []     (COMPACT_ARRAY: partitions to drop from fetch session)
    RackId:          ""         (COMPACT_STRING: consumer's rack for rack-aware fetch)
    TAG_BUFFER:      0x00

CurrentLeaderEpoch: Detecting Stale Leaders

The CurrentLeaderEpoch field is a safety mechanism introduced to detect fenced leader situations. When a partition leader changes, the new leader has a higher epoch. If a consumer sends a FetchRequest with an epoch higher than the broker's current epoch for that partition, the broker knows the consumer's information is more recent, and returns UNKNOWN_LEADER_EPOCH. If the consumer's epoch is lower than the broker's, the broker returns FENCED_LEADER_EPOCH.

This prevents a consumer from fetching from an old leader that doesn't know it's been replaced, a subtle source of data inconsistency in systems without epoch tracking.
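
The comparison described above fits in a few lines. This is a sketch of the decision only, not broker code: check_leader_epoch is a hypothetical name, and treating -1 as "client sent no epoch, skip the check" is an assumption based on the field's default value.

```python
def check_leader_epoch(request_epoch: int, broker_epoch: int) -> str:
    """Return the error name a leader would answer with for a given epoch pair."""
    if request_epoch == -1:
        return "NONE"                      # assumed: no epoch supplied, no validation
    if request_epoch > broker_epoch:
        return "UNKNOWN_LEADER_EPOCH"      # client has seen a newer leader than this broker
    if request_epoch < broker_epoch:
        return "FENCED_LEADER_EPOCH"       # client metadata is stale
    return "NONE"

for client_epoch in (4, 5, 6):
    print(client_epoch, check_leader_epoch(client_epoch, broker_epoch=5))
```

In both error cases the usual client reaction is to refresh metadata and retry, since one side is working from an outdated view of the partition's leadership.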

IsolationLevel and LSO

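IsolationLevel=READ_COMMITTED (1) restricts what the broker returns in FetchResponses: only records below the last stable offset (LSO), the offset of the earliest transaction that is still open, are returned. The response also lists aborted transactions so the client can filter out their records.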

Fetch Sessions: Eliminating Redundant Partition Lists

Before Kafka 1.1, every FetchRequest had to include the complete list of partitions the consumer wanted to fetch from, even if that list hadn't changed since the last request. For a consumer subscribed to 500 partitions, this repetitive list consumed significant bandwidth and added processing overhead on both sides.

Fetch Sessions (KIP-227) solved this with stateful incremental fetching:

# Session initialization (first fetch):
FetchRequest:  SessionId=0, SessionEpoch=0, Topics=[ALL 500 partitions]
FetchResponse: SessionId=12345, Responses=[all partitions]

# Subsequent fetches (only changed partitions; the client increments the epoch):
FetchRequest:  SessionId=12345, SessionEpoch=1,
               Topics=[only partitions where FetchOffset changed],
               ForgottenTopicsData=[any newly unassigned partitions]
FetchResponse: SessionId=12345, Responses=[only changed partitions]

The broker maintains a FetchSession object mapping consumer to current fetch positions. If the broker needs to evict a session (memory pressure), it returns FETCH_SESSION_ID_NOT_FOUND, and the client reinitializes with a full partition list.
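
On the client side, an incremental fetch is essentially a diff between what the session already knows and what the consumer wants now. The sketch below illustrates that diff with plain dictionaries; incremental_fetch and the data shapes are hypothetical and much simpler than a real client's session handler.

```python
def incremental_fetch(previous: dict, current: dict):
    """Diff two {(topic, partition): fetch_offset} maps.

    Returns (changed, forgotten): partitions to put in Topics and
    partitions to put in ForgottenTopicsData.
    """
    changed = {tp: offset for tp, offset in current.items()
               if previous.get(tp) != offset}          # new or moved partitions
    forgotten = [tp for tp in previous if tp not in current]
    return changed, forgotten

# State after the last request versus what the consumer wants now.
previous = {("orders", 0): 100, ("orders", 1): 200, ("orders", 2): 300}
current = {("orders", 0): 100, ("orders", 1): 250, ("orders", 3): 0}

changed, forgotten = incremental_fetch(previous, current)
print("Topics:", changed)
print("ForgottenTopicsData:", forgotten)
```

Partition ("orders", 0) appears in neither list because nothing about it changed, which is exactly the saving the session provides.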

Reading Kafka Protocol with Wireshark

Setup

# Start local Kafka in KRaft mode without TLS (for easy packet capture)
# Use server.properties with: listeners=PLAINTEXT://localhost:9092

# Install Wireshark (macOS)
brew install --cask wireshark

# Or use tshark (CLI)
brew install wireshark  # includes tshark

Live Capture

# Capture all traffic on port 9092 (loopback interface on macOS)
tshark -i lo0 -f "tcp port 9092" -w /tmp/kafka-session.pcap &
TSHARK_PID=$!

# Generate some traffic
kafka-topics.sh --bootstrap-server localhost:9092 --create \
  --topic capture-test --partitions 3 --replication-factor 1

echo "test message 1" | kcat -P -b localhost:9092 -t capture-test
echo "test message 2" | kcat -P -b localhost:9092 -t capture-test
kcat -C -b localhost:9092 -t capture-test -c 2 -e

kill $TSHARK_PID

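# Analyze with Wireshark's Kafka dissector (supported since Wireshark 3.x)
# Field names differ between Wireshark versions; list the ones your build
# supports with: tshark -G fields | grep -i kafka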
tshark -r /tmp/kafka-session.pcap \
  -d "tcp.port==9092,kafka" \
  -T fields \
  -e frame.number \
  -e kafka.request.api_key \
  -e kafka.request.api_version \
  -e kafka.correlationid \
  -e kafka.topic_name \
  -e kafka.partition_id \
  -e kafka.offset

Manual Hex Analysis with kcat

# List cluster metadata (kcat sends an ApiVersionsRequest, then a MetadataRequest)
kcat -b localhost:9092 -L -v 2>&1 | head -50

# Produce with verbose output showing request/response details
kcat -b localhost:9092 -P -t capture-test -v \
  -X debug=protocol \
  -X socket.blocking.max.ms=100 \
  <<< '{"orderId":"12345","amount":99.99}'

# Consume with offset and metadata display
kcat -b localhost:9092 -C -t capture-test \
  -f "Partition: %p | Offset: %o | Timestamp: %T | Key: %k | Value: %s\n" \
  -e

Decoding a Raw Packet Manually

Given a hex dump of a MetadataRequest v12 frame (null ClientId, topic looked up by name):

Raw hex (after TCP reassembly, starting at the Kafka frame):
00 00 00 2D 00 03 00 0C 00 00 00 01 FF FF 00 02
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
0D 6F 72 64 65 72 2D 65 76 65 6E 74 73 00 01 00
00

Decoded:
00 00 00 2D  → Length = 45 (bytes following)
00 03        → ApiKey = 3 (Metadata)
00 0C        → ApiVersion = 12
00 00 00 01  → CorrelationId = 1
FF FF        → ClientId = null (NULLABLE_STRING: int16 length of -1)
00           → TAG_BUFFER (header): no tagged header fields

02           → Topics array: 1 element (COMPACT_ARRAY: varint 2 = 1 element)
00 (x16)     → TopicId = all-zero UUID (field present since v10; zero = look up by name)
0D           → Topic name length: varint 13 = actual length 12
6F 72 64 65 72 2D 65 76 65 6E 74 73
             → "order-events" (12 bytes)
00           → TAG_BUFFER (topic element)

01           → AllowAutoTopicCreation = true (boolean: 0x01)
00           → IncludeTopicAuthorizedOperations = false
00           → TAG_BUFFER (body)
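
The same walk can be done in code. The sketch below decodes a MetadataRequest v12 frame for "order-events". The frame is hand-built inside the example from the published v12 schema rather than taken from a capture, so treat its bytes as an assumption: a 2-byte-length null ClientId, a 16-byte all-zero TopicId before the topic name, and empty TAG_BUFFERs throughout. decode_metadata_v12 is a hypothetical helper.

```python
import struct

FRAME = bytes.fromhex("".join([
    "0000002d", "0003", "000c", "00000001",   # Length, ApiKey, ApiVersion, CorrelationId
    "ffff", "00",                             # ClientId = null, header TAG_BUFFER
    "02", "00" * 16,                          # Topics: 1 element, zero TopicId
    "0d", "6f726465722d6576656e7473", "00",   # name length+1, "order-events", TAG_BUFFER
    "01", "00", "00",                         # two booleans, body TAG_BUFFER
]))

def read_uvarint(buf: bytes, pos: int):
    result = shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7

def decode_metadata_v12(frame: bytes) -> dict:
    length, api_key, api_version, corr = struct.unpack_from(">ihhi", frame, 0)
    pos = 12
    (cid_len,) = struct.unpack_from(">h", frame, pos)
    pos += 2
    client_id = None if cid_len < 0 else frame[pos:pos + cid_len].decode("utf-8")
    pos += max(cid_len, 0)
    _, pos = read_uvarint(frame, pos)              # header TAG_BUFFER (assumed empty)
    count, pos = read_uvarint(frame, pos)          # element count + 1
    topics = []
    for _ in range(count - 1):
        pos += 16                                  # skip TopicId (UUID)
        name_len, pos = read_uvarint(frame, pos)   # byte length + 1, 0 = null
        name = None if name_len == 0 else frame[pos:pos + name_len - 1].decode("utf-8")
        pos += max(name_len - 1, 0)
        _, pos = read_uvarint(frame, pos)          # topic TAG_BUFFER (assumed empty)
        topics.append(name)
    allow_auto, include_ops = frame[pos] != 0, frame[pos + 1] != 0
    return {"length": length, "api_key": api_key, "api_version": api_version,
            "correlation_id": corr, "client_id": client_id, "topics": topics,
            "allow_auto_topic_creation": allow_auto,
            "include_topic_authorized_operations": include_ops}

decoded = decode_metadata_v12(FRAME)
print(decoded)
```

A decoder like this is brittle by design: it handles one ApiKey at one ApiVersion, which mirrors how the real protocol ties every layout to that pair.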

Backward Compatibility Guarantees

Kafka's protocol versioning model provides formal guarantees that make zero-downtime upgrades possible:

Rule 1: A published version's layout never changes. Once version N of an API ships, its field list and encoding are frozen. A later version may deprecate or even drop a field (Metadata v11 drops IncludeClusterAuthorizedOperations, for example), but version N itself is parsed the same way forever. This ensures old clients' encodings remain parseable by new brokers.

Rule 2: New fields arrive with a new version. Each ApiVersion has its own complete schema, and both sides parse strictly according to the ApiVersion in the request header. A client never sends a version above the broker's advertised maximum, and a broker that receives one anyway rejects it with UNSUPPORTED_VERSION rather than guessing at the layout. When a new broker receives an old version, fields that the old version lacks take their default values.

Rule 3: Version numbers are strictly increasing. Any change in field semantics, even making an optional field required or changing the interpretation of a value, mandates a version bump. Clients can rely on version numbers to indicate precisely what fields and semantics are available.

Rule 4: Tagged fields don't require version bumps. From flexible-version APIs onward, new optional fields can be added as tagged fields in the TAG_BUFFER without bumping the ApiVersion. Older clients and brokers simply skip unknown tag IDs. This was a significant improvement introduced with KIP-482.
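
Skipping unknown tags works because every tagged field carries its own size. The sketch below parses a TAG_BUFFER under the count-prefixed layout defined by KIP-482 and keeps only the tags it recognizes. parse_tag_buffer, the tag numbers, and the field name are hypothetical.

```python
def read_uvarint(buf: bytes, pos: int):
    result = shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7

def parse_tag_buffer(buf: bytes, pos: int, known: dict):
    """Return ({field_name: raw_bytes}, next_position), ignoring unknown tags."""
    count, pos = read_uvarint(buf, pos)        # number of tagged fields
    fields = {}
    for _ in range(count):
        tag, pos = read_uvarint(buf, pos)
        size, pos = read_uvarint(buf, pos)
        if tag in known:
            fields[known[tag]] = buf[pos:pos + size]
        pos += size                            # unknown tags are stepped over by size
    return fields, pos

# Two tagged fields: tag 0 (known to this reader) and tag 7 (unknown to it).
buf = bytes([2, 0, 3]) + b"abc" + bytes([7, 2, 0xFF, 0xFF])
fields, end = parse_tag_buffer(buf, 0, known={0: "example_field"})
print(fields, end)
```

An older reader and a newer reader can run the same loop over the same bytes; they differ only in the contents of the known table.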

Practical implication: You can upgrade brokers to a new version before upgrading clients. New brokers keep supporting the API versions that existing clients use (only very old versions are ever retired, as Kafka 4.0 did under KIP-896). Clients continue using old API versions until they're upgraded, at which point they negotiate newer versions. This is the mechanism that makes Kafka's rolling upgrade story work in practice.

The official Kafka Protocol Guide documents the schema of every version of every API key, which makes it an essential reference for planning cluster upgrades in heterogeneous client environments.
