Chapter 7

Idempotence and Transactions: The Cost and Boundaries of EOS

Networks are unreliable. Even with a perfectly healthy Kafka cluster, a producer can time out waiting for an acknowledgment after a message has already been durably written. The producer has no way to distinguish "the broker crashed before writing" from "the ACK was lost in transit." Its only safe choice is to retry, and retrying creates duplicates. Kafka's idempotence and transaction mechanisms are the engineering answer to building Exactly-Once Semantics (EOS) on top of this inherent ambiguity.

The Root Cause: Why Retries Produce Duplicates

Consider a producer sending message M in a ProduceRequest:

  1. The request arrives at the broker; the broker appends M to the partition log
  2. The broker sends the ACK, but a network glitch drops it
  3. The producer times out and retries: it sends M again in a new ProduceRequest
  4. The broker writes M a second time
  5. Consumers receive M twice
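To make the failure concrete, here is a toy Java model of steps 1 to 5, not Kafka code: `NaiveBroker` appends every request it receives, and the producer retries whenever the ACK is lost. All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the lost-ACK scenario; not Kafka's implementation.
final class NaiveRetryDemo {
    // A broker with no deduplication: every request is appended.
    static final class NaiveBroker {
        final List<String> log = new ArrayList<>();

        // Returns true if the ACK reaches the producer.
        boolean produce(String message, boolean dropAck) {
            log.add(message);   // step 1: the write succeeds
            return !dropAck;    // step 2: the ACK may be lost
        }
    }

    // Producer logic: keep retrying until an ACK arrives.
    static List<String> sendWithRetry(NaiveBroker broker, String message, int lostAcks) {
        int attempt = 0;
        while (!broker.produce(message, attempt < lostAcks)) {
            attempt++;          // step 3: timeout, so retry
        }
        return broker.log;
    }

    public static void main(String[] args) {
        NaiveBroker broker = new NaiveBroker();
        // One lost ACK is enough to leave two copies of M in the log.
        System.out.println(sendWithRetry(broker, "M", 1));
    }
}
```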

Traditional messaging systems accept at-least-once semantics or push deduplication responsibility to the application layer (unique keys in a database, idempotent business logic, etc.). Kafka 0.11 introduced broker-side deduplication, making the producer-to-broker leg exactly-once per partition, for the lifetime of a producer session, without any application code changes.

Idempotence: Producer ID and Sequence Numbers

The core idea is elegantly simple: assign every producer a globally unique Producer ID (PID) and have the producer stamp each partition's records with a monotonically increasing sequence number. The broker discards any record whose sequence number it has already seen.

PID Assignment

When an idempotent producer starts sending (or when initTransactions() is called), it sends an InitProducerIdRequest. A plain idempotent producer can send it to any broker, since every broker runs a TransactionCoordinator component. A transactional producer first locates the coordinator responsible for its transactional.id (with a FindCoordinator request) and sends the request there. The coordinator allocates a PID from ProducerIdManager.

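In KRaft mode (Kafka 3.x), each broker's ProducerIdManager obtains PIDs in pre-allocated blocks of 1,000 from the controller (via an AllocateProducerIds request), and the controller records each block in the metadata log. ZooKeeper-based clusters used the same block scheme, coordinating blocks through a ZooKeeper node. Either way, a durable metadata write happens once per block rather than once per producer. The 64-bit PID space is effectively inexhaustible.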
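The block scheme can be modeled in a few lines. This is a simplified sketch, not Kafka's ProducerIdManager: the hypothetical `BlockSource` stands in for whatever durable store hands out blocks (the controller in KRaft, ZooKeeper in older clusters), and each `allocateBlock()` call represents one metadata write.

```java
// Simplified model of block-based producer ID allocation (not Kafka's code).
final class PidBlocks {
    static final int BLOCK_SIZE = 1_000;

    // Stands in for the durable metadata store; each call = one metadata write.
    static final class BlockSource {
        private long nextBlockStart = 0;
        int writes = 0;

        long allocateBlock() {
            writes++;
            long start = nextBlockStart;
            nextBlockStart += BLOCK_SIZE;
            return start;
        }
    }

    // Per-broker allocator: serves IDs from its current block, fetching a new block when exhausted.
    static final class PidAllocator {
        private final BlockSource source;
        private long next = 0;
        private long blockEnd = 0; // exclusive; 0 forces a fetch on first use

        PidAllocator(BlockSource source) {
            this.source = source;
        }

        synchronized long nextProducerId() {
            if (next >= blockEnd) {
                next = source.allocateBlock();
                blockEnd = next + BLOCK_SIZE;
            }
            return next++;
        }
    }
}
```

Two allocators sharing one source never hand out the same ID, because each block is reserved durably before any of its IDs are used.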

How Sequence Numbers Work

For every TopicPartition, the producer tracks a sequence number starting at 0. Each ProducerBatch carries the baseSequence of its first record; records within the batch are implicitly numbered baseSequence + recordIndex.
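A minimal sketch of that producer-side bookkeeping, assuming each batch targets a single partition. `SequenceTracker` and `Batch` are illustrative names, not Kafka client classes, and wrap-around of the 32-bit sequence is ignored.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative producer-side sequence bookkeeping (not the Kafka client's classes).
final class SequenceTracker {
    // A batch for one partition, stamped with the sequence number of its first record.
    record Batch(String topicPartition, int baseSequence, List<String> records) {
        // Records are numbered implicitly: baseSequence + recordIndex.
        int sequenceOf(int recordIndex) {
            return baseSequence + recordIndex;
        }
    }

    // Next sequence number to assign, per partition; absent means 0.
    private final Map<String, Integer> nextSequence = new HashMap<>();

    Batch newBatch(String topicPartition, List<String> records) {
        int base = nextSequence.getOrDefault(topicPartition, 0);
        nextSequence.put(topicPartition, base + records.size());
        return new Batch(topicPartition, base, records);
    }
}
```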

The broker, in ProducerStateManager, maintains per-(PID, partition) state:

// Simplified from ProducerStateManager in Kafka source
class ProducerStateEntry {
    long producerId;
    short producerEpoch;
    int lastSeq;       // last successfully written sequence number
    long lastOffset;   // last written offset (for duplicate detection response)
    // ...
}

On each ProduceRequest, the broker compares the batch's first sequence number (firstSeq) with the state it holds for that PID and partition:

if (batch matches one of the recently written batches for this PID):
    → DUPLICATE: do not write again; return the original offset in a success response

else if (firstSeq == lastSeq + 1):
    → accept, write, set lastSeq = firstSeq + recordCount - 1

else:
    → OUT OF ORDER: reject with OUT_OF_ORDER_SEQUENCE_NUMBER
      (not retriable as-is: the batch fails and the producer must reset its sequence state)
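A minimal Java sketch of this validation for one (PID, partition), assuming a single producer epoch and a broker that remembers only the most recent batch. Kafka's real logic also handles epoch changes, sequence wrap-around, and several cached batches; `SequenceValidator` is a hypothetical name.

```java
// Minimal sketch of broker-side sequence validation for one (PID, partition).
// Assumes a single producer epoch and caches only the last batch; not Kafka's code.
final class SequenceValidator {
    enum Outcome { APPENDED, DUPLICATE, OUT_OF_ORDER }

    record Result(Outcome outcome, long offset) {}

    private int lastSeq = -1;           // no batch written yet
    private int lastBatchFirstSeq = -1;
    private long lastBatchOffset = -1;
    private long logEndOffset = 0;      // next offset to assign

    Result append(int firstSeq, int recordCount) {
        int batchLastSeq = firstSeq + recordCount - 1;
        // Retry of the batch we just wrote: don't write it again, return its original offset.
        if (firstSeq == lastBatchFirstSeq && batchLastSeq == lastSeq) {
            return new Result(Outcome.DUPLICATE, lastBatchOffset);
        }
        // Next expected batch: it must start right after the last written sequence.
        if (firstSeq == lastSeq + 1) {
            lastBatchFirstSeq = firstSeq;
            lastSeq = batchLastSeq;
            lastBatchOffset = logEndOffset;
            logEndOffset += recordCount;
            return new Result(Outcome.APPENDED, lastBatchOffset);
        }
        // A gap, or an older batch the broker no longer remembers.
        return new Result(Outcome.OUT_OF_ORDER, -1);
    }
}
```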

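This state is kept in memory and periodically snapshotted to producer-state snapshot files in the partition directory; after a restart, the broker loads the latest snapshot and replays the log from that point. Followers build the same state as they replicate batches, so when leadership fails over, the new leader can validate incoming sequences immediately, with no separate recovery step during which duplicates could slip through.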

Implied Constraints of enable.idempotence=true

Enabling idempotence is not free: it constrains three other settings. If you leave them unset, the producer picks compatible values (the defaults shown below); if you explicitly set enable.idempotence=true together with an incompatible value, the producer fails at construction with a ConfigException:

enable.idempotence=true                      # the default since Kafka 3.0

# Required when idempotence is enabled:
acks=all                                     # must be all (-1): sequence state must be replicated
retries=2147483647                           # default Integer.MAX_VALUE; must be > 0
max.in.flight.requests.per.connection=5      # default 5; must be <= 5
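The documented compatibility rules can be expressed as a small check. `IdempotenceConfigCheck` is a hypothetical helper for illustration, not the producer's own validation code; it only inspects configurations that set enable.idempotence=true explicitly, and it covers just the three settings discussed here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper mirroring the documented rules for enable.idempotence=true:
// acks=all, retries > 0, max.in.flight.requests.per.connection <= 5. Not Kafka's code.
final class IdempotenceConfigCheck {
    static List<String> violations(Properties props) {
        List<String> problems = new ArrayList<>();
        // Only check configs that explicitly enable idempotence.
        if (!"true".equalsIgnoreCase(props.getProperty("enable.idempotence"))) {
            return problems;
        }
        String acks = props.getProperty("acks", "all");
        if (!acks.equals("all") && !acks.equals("-1")) {
            problems.add("acks must be all, got " + acks);
        }
        int retries = Integer.parseInt(props.getProperty("retries", String.valueOf(Integer.MAX_VALUE)));
        if (retries <= 0) {
            problems.add("retries must be > 0, got " + retries);
        }
        int inFlight = Integer.parseInt(props.getProperty("max.in.flight.requests.per.connection", "5"));
        if (inFlight > 5) {
            problems.add("max.in.flight.requests.per.connection must be <= 5, got " + inFlight);
        }
        return problems;
    }
}
```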

Why acks=all? Broker-side sequence state must survive leader failover. With acks=all, any acknowledged write is guaranteed to be on every ISR replica. The new leader therefore has complete sequence history and can correctly deduplicate retries. With acks=1, a message acknowledged by the leader might not be replicated yet; the new leader would have a gap in sequence state and could accept a retried message as a new write.

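Why max.in.flight <= 5? For each producer, the broker caches metadata (sequence range and offset) for only the 5 most recently written batches on each partition. With at most 5 requests in flight, any batch the producer retries is still in that cache, so the broker recognizes the retry and returns the original offset. With more in flight, a retried batch may already have been evicted; the broker can then no longer prove it is a duplicate and rejects it with a sequence error instead of deduplicating it.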
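To see why the limit is 5, here is a toy model of a per-(PID, partition) cache of recent batch metadata, assuming the broker retains metadata for the 5 most recent batches of each producer on each partition. `BatchWindow` is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model: the broker remembers only the last WINDOW batches per (PID, partition).
// A retry is recognizable as a duplicate only while its batch is still in the window.
final class BatchWindow {
    static final int WINDOW = 5;

    record BatchMeta(int firstSeq, int lastSeq, long offset) {}

    private final Deque<BatchMeta> recent = new ArrayDeque<>();
    private long nextOffset = 0;

    // Record a successfully appended batch, evicting the oldest beyond the window.
    void appended(int firstSeq, int recordCount) {
        recent.addLast(new BatchMeta(firstSeq, firstSeq + recordCount - 1, nextOffset));
        nextOffset += recordCount;
        if (recent.size() > WINDOW) {
            recent.removeFirst();
        }
    }

    // True if a retried batch with this sequence range can still be matched.
    boolean isKnownDuplicate(int firstSeq, int lastSeq) {
        for (BatchMeta b : recent) {
            if (b.firstSeq() == firstSeq && b.lastSeq() == lastSeq) {
                return true;
            }
        }
        return false;
    }
}
```

With six single-record batches in flight, the retry of batch 0 arrives after its metadata has been evicted, so the broker cannot prove it is a duplicate and the only safe response is an error rather than a second write.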

Transactions: Atomic Writes Across Partitions

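Idempotence handles single-partition deduplication within one producer session. Many real-world pipelines need to do all of the following atomically:

  1. Read a batch of records from an input topic
  2. Write the transformed results to one or more output partitions
  3. Commit the input offsets for the records just processed

This is the consume-transform-produce pattern, and transactions make it atomic.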

The Standard Transaction Pattern

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
props.put("transactional.id", "order-enrichment-v1"); // globally unique, stable
props.put("enable.idempotence", "true");               // implied by transactions
props.put("transaction.timeout.ms", "30000");          // abort if uncommitted for 30s

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Called ONCE per producer lifetime: registers the PID and bumps the epoch
producer.initTransactions();

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (records.isEmpty()) continue;

    try {
        producer.beginTransaction();

        for (ConsumerRecord<String, String> record : records) {
            String enriched = enrich(record.value());
            producer.send(new ProducerRecord<>("enriched-orders", record.key(), enriched));
        }

        // Atomically commit the source offsets with the output messages
        Map<TopicPartition, OffsetAndMetadata> offsets = currentOffsets(records);
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());

        producer.commitTransaction(); // triggers 2PC

    } catch (ProducerFencedException | OutOfOrderSequenceException e) {
        // Fatal: fenced by a newer instance with the same transactional.id, or
        // sequence state the producer cannot recover from. Close and stop.
        producer.close();
        break;
    } catch (KafkaException e) {
        producer.abortTransaction(); // rolls back all in-flight writes
        consumer.seek(...);          // reset consumer position if needed
    }
}
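The loop relies on a `currentOffsets(records)` helper that the text does not define. The key detail is that the committed offset must be the offset of the next record to read, that is, the last processed offset plus one. Below is a stdlib-only model of that helper: `Rec` and the `"topic-partition"` string keys are stand-ins for Kafka's `ConsumerRecord` and `TopicPartition`, so this illustrates the logic rather than being a drop-in implementation. The consumer in this pattern should also run with enable.auto.commit=false, because offsets are committed through the transaction instead.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Model of a currentOffsets(records) helper using plain types instead of Kafka classes.
final class OffsetsToCommit {
    record Rec(String topic, int partition, long offset) {}

    // For each partition, commit lastProcessedOffset + 1 (the next record to consume).
    static Map<String, Long> currentOffsets(List<Rec> records) {
        Map<String, Long> next = new LinkedHashMap<>();
        for (Rec r : records) {
            String key = r.topic() + "-" + r.partition();
            next.merge(key, r.offset() + 1, Long::max);
        }
        return next;
    }
}
```

With the real client, the equivalent is a `Map<TopicPartition, OffsetAndMetadata>` holding `new OffsetAndMetadata(lastOffset + 1)` for each partition in the batch.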

The TransactionCoordinator

Every transactional producer has a designated TransactionCoordinator: the broker that leads the __transaction_state partition for its transactional.id:

coordinator_partition = abs(transactional.id.hashCode()) % transaction.state.log.num.partitions

__transaction_state defaults to 50 partitions (transaction.state.log.num.partitions) and uses log compaction to retain only the latest state per transactional.id. The coordinator is the single source of truth for transaction state, which is what makes the 2PC protocol crash-safe.
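The mapping can be reproduced in a few lines. This sketch assumes the broker hashes the transactional.id with Java's `String.hashCode()` and uses an abs that maps `Integer.MIN_VALUE` to 0 (as Kafka's `Utils.abs` does, to our understanding), with the default of 50 partitions. Treat it as an approximation for reasoning about load, not an authoritative coordinator lookup.

```java
// Sketch of the transactional.id -> __transaction_state partition mapping.
// Assumes String.hashCode() and an abs that maps Integer.MIN_VALUE to 0.
final class TxnCoordinatorPartition {
    static final int DEFAULT_NUM_PARTITIONS = 50; // transaction.state.log.num.partitions default

    static int abs(int n) {
        // Math.abs(Integer.MIN_VALUE) is still negative, so special-case it.
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    static int partitionFor(String transactionalId, int numPartitions) {
        return abs(transactionalId.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("order-enrichment-v1", DEFAULT_NUM_PARTITIONS));
    }
}
```

Because the result depends on the partition count, changing transaction.state.log.num.partitions on a running cluster would remap existing transactional.ids to different coordinators, which is why that setting should be chosen once at deployment.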

TransactionCoordinator State Machine

(start)         ──initTransactions()─────────────►  Empty
Empty           ──first AddPartitionsToTxn───────►  Ongoing
Ongoing         ──commitTransaction()────────────►  PrepareCommit
Ongoing         ──abortTransaction() or timeout──►  PrepareAbort
PrepareCommit   ──all markers written────────────►  CompleteCommit
PrepareAbort    ──all markers written────────────►  CompleteAbort
CompleteCommit  ──next transaction begins────────►  Ongoing
CompleteAbort   ──next transaction begins────────►  Ongoing

The PrepareCommit and PrepareAbort states are durable. If the coordinator crashes mid-commit, the broker that takes over the __transaction_state partition reloads the PrepareCommit record and finishes writing the markers. The producer client rediscovers the coordinator and retries internally, so a coordinator crash usually shows up as extra latency rather than as an error from commitTransaction().
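The coordinator-side transitions can be modeled as an enum with a table of allowed successors. This is a simplified sketch that omits states such as Dead and PrepareEpochFence; the constant names only loosely follow Kafka's own transaction states.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Simplified model of the coordinator's transaction states (omits Dead, PrepareEpochFence, etc.).
enum TxnState {
    EMPTY, ONGOING, PREPARE_COMMIT, PREPARE_ABORT, COMPLETE_COMMIT, COMPLETE_ABORT;

    // Allowed successor states for each state.
    private static final Map<TxnState, Set<TxnState>> NEXT = Map.of(
        EMPTY, EnumSet.of(EMPTY, ONGOING),                            // re-init, or first partition added
        ONGOING, EnumSet.of(ONGOING, PREPARE_COMMIT, PREPARE_ABORT),  // more partitions, commit, abort/timeout
        PREPARE_COMMIT, EnumSet.of(COMPLETE_COMMIT),                  // decision logged: only forward
        PREPARE_ABORT, EnumSet.of(COMPLETE_ABORT),
        COMPLETE_COMMIT, EnumSet.of(EMPTY, ONGOING),                  // re-init, or next transaction
        COMPLETE_ABORT, EnumSet.of(EMPTY, ONGOING));

    boolean canTransitionTo(TxnState target) {
        return NEXT.get(this).contains(target);
    }
}
```

The property that matters for crash safety is that PrepareCommit and PrepareAbort each have exactly one successor: once the decision is logged, recovery can only drive it forward.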

The Two-Phase Commit Protocol in Detail

Kafka's internal 2PC involves six request types, followed by a final state write on the coordinator:

Phase 1: Register Participants

① AddPartitionsToTxnRequest  (Producer → Coordinator)
   Body: transactionalId, producerId, epoch, [TopicPartition, ...]
   Effect: Coordinator writes participant list to __transaction_state

② ProduceRequest             (Producer → Partition Leader Brokers)
   Body: records with PID + epoch + sequence, transactional flag set
   Effect: Broker writes records, marks them "uncommitted" (invisible to read_committed consumers)
   Storage: stored in normal log segments, not separated

③ AddOffsetsToTxnRequest     (Producer → Coordinator)  [only if sendOffsetsToTransaction called]
   Body: transactionalId, producerId, epoch, groupId
   Effect: Coordinator adds __consumer_offsets partition to participant list

④ TxnOffsetCommitRequest     (Producer → Group Coordinator)
   Body: group, offsets, producerId, epoch
   Effect: Group Coordinator writes offset records marked as transactional (uncommitted)

Phase 2: Commit or Abort

⑤ EndTxnRequest              (Producer → Coordinator, result=COMMIT or ABORT)
   Effect: Coordinator durably transitions state to PrepareCommit/PrepareAbort.
           This is the decision point: once written, the outcome is determined.
           The coordinator answers the producer once this record is written, so
           commitTransaction() returns without waiting for the markers below.

⑥ WriteTxnMarkersRequest     (Coordinator → each participating Partition Leader)
   Body: producerId, epoch, result (COMMIT/ABORT), [TopicPartition, ...]
   Effect: Each broker appends a CONTROL batch with EndTransactionMarker
           to each participating partition

⑦ Coordinator writes CompleteCommit/CompleteAbort to __transaction_state
   once every marker has been acknowledged. Until then, the producer's next
   transaction is held back (its AddPartitionsToTxn requests are retried).
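A toy simulation of Phase 2 shows why the durable Prepare record matters. Everything here is hypothetical: `txnLog` stands in for __transaction_state, each participant partition is a list of strings, and a "crash" simply stops marker writing partway. A recovering coordinator reads the last logged state and finishes the same decision.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy simulation of the coordinator's commit phase and crash recovery (not Kafka's code).
final class CommitPhaseSim {
    final List<String> txnLog = new ArrayList<>();                      // stands in for __transaction_state
    final Map<String, List<String>> partitions = new LinkedHashMap<>(); // participant partition logs

    CommitPhaseSim(List<String> participants) {
        for (String p : participants) {
            partitions.put(p, new ArrayList<>(List.of("txn-record"))); // uncommitted data already written
        }
        txnLog.add("Ongoing");
    }

    // EndTxn(COMMIT): durably log the decision, then write markers.
    // maxMarkers below the participant count simulates a crash partway through.
    void commit(int maxMarkers) {
        txnLog.add("PrepareCommit");          // decision point: outcome is now fixed
        writeCommitMarkers(maxMarkers);
    }

    // Append COMMIT to each participant still missing it; log CompleteCommit only when all have it.
    private void writeCommitMarkers(int maxMarkers) {
        int written = 0;
        for (List<String> log : partitions.values()) {
            if (log.contains("COMMIT")) continue; // already written before the crash
            if (written == maxMarkers) return;    // simulated crash: stop here
            log.add("COMMIT");
            written++;
        }
        txnLog.add("CompleteCommit");
    }

    // A new coordinator reloads the last state and drives a pending commit to completion.
    void recover() {
        if (txnLog.get(txnLog.size() - 1).equals("PrepareCommit")) {
            writeCommitMarkers(Integer.MAX_VALUE);
        }
    }
}
```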

The COMMIT marker is a special control record in the log. To consumers with isolation.level=read_committed, messages from a transaction are invisible until the COMMIT marker appears. If an ABORT marker appears instead, those records are permanently skipped.
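The visibility rule can be modeled over a log slice in which every transaction has already ended. This is a simplified sketch: `LogEntry` is a hypothetical type, open transactions are handled by the LSO mechanism instead, and real consumers skip aborted data using the aborted-transaction list returned in fetch responses rather than scanning for markers like this.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified read_committed filter over a slice where every transaction has already ended.
final class ReadCommittedFilter {
    // A data record (marker == null) or a control marker ("COMMIT"/"ABORT") for one producer.
    record LogEntry(long producerId, boolean transactional, String value, String marker) {
        static LogEntry ofData(long pid, boolean txn, String value) { return new LogEntry(pid, txn, value, null); }
        static LogEntry ofMarker(long pid, String marker) { return new LogEntry(pid, true, null, marker); }
    }

    static List<String> visible(List<LogEntry> log) {
        boolean[] committed = new boolean[log.size()];
        Map<Long, List<Integer>> pending = new HashMap<>(); // open transactional record indices per PID
        for (int i = 0; i < log.size(); i++) {
            LogEntry e = log.get(i);
            if (e.marker() != null) {
                // The marker decides every pending record of this producer's transaction.
                boolean commit = e.marker().equals("COMMIT");
                for (int idx : pending.getOrDefault(e.producerId(), List.of())) {
                    committed[idx] = commit;
                }
                pending.remove(e.producerId());
            } else if (e.transactional()) {
                pending.computeIfAbsent(e.producerId(), k -> new ArrayList<>()).add(i);
            } else {
                committed[i] = true; // non-transactional records are always visible
            }
        }
        List<String> out = new ArrayList<>();
        for (int i = 0; i < log.size(); i++) {
            if (committed[i]) out.add(log.get(i).value()); // markers are never flagged, so never returned
        }
        return out;
    }
}
```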

Zombie Fencing: Protecting Against Old Producers

Consider a common failure in stream processing: a producer pod appears dead (GC pause, network partition) and the scheduler launches a replacement. Minutes later, the original pod recovers. Now two instances share the same transactional.id and are writing to the same partitions: a "zombie producer."

Kafka solves this with epoch fencing:

  1. Every initTransactions() call increments the epoch for the transactional.id in the coordinator and aborts any transaction the previous instance left open
  2. Every request sent by the producer carries its epoch
  3. The coordinator, and any partition leader that has already seen the newer epoch, rejects requests carrying an older epoch; the old producer's client surfaces this as ProducerFencedException

// Old producer (epoch=3) attempts to write
// Coordinator has registered epoch=4 (new producer called initTransactions())
// Result: the request is rejected and the old producer's client throws
// ProducerFencedException (a newer producer with the same transactional.id exists)

// Old producer MUST call producer.close() upon receiving this exception
// It cannot continue: any retry would face the same rejection

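The epoch is a 16-bit signed integer (a Java short) that increments monotonically per transactional.id. Overflow is not a practical concern: recent Kafka versions handle an epoch nearing Short.MAX_VALUE (32,767) by assigning the transactional.id a fresh producer ID and restarting the epoch at 0.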
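Epoch fencing reduces to comparing a request's (PID, epoch) with the latest one registered for the transactional.id. The sketch below is a hypothetical model, not Kafka's code; it also shows producer ID rotation when the epoch is about to run out, assuming the threshold is Short.MAX_VALUE - 1.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of epoch fencing for transactional producers (not Kafka's code).
final class EpochFencing {
    record ProducerIdAndEpoch(long producerId, short epoch) {}

    private final Map<String, ProducerIdAndEpoch> current = new HashMap<>();
    private long nextProducerId = 1000;

    // initTransactions(): bump the epoch, or rotate to a fresh PID when the epoch is nearly exhausted.
    ProducerIdAndEpoch initTransactions(String transactionalId) {
        ProducerIdAndEpoch prev = current.get(transactionalId);
        ProducerIdAndEpoch next;
        if (prev == null || prev.epoch() >= Short.MAX_VALUE - 1) {
            next = new ProducerIdAndEpoch(nextProducerId++, (short) 0);
        } else {
            next = new ProducerIdAndEpoch(prev.producerId(), (short) (prev.epoch() + 1));
        }
        current.put(transactionalId, next);
        return next;
    }

    // A request is fenced if its (PID, epoch) is not the one currently registered.
    boolean isFenced(String transactionalId, ProducerIdAndEpoch sender) {
        ProducerIdAndEpoch registered = current.get(transactionalId);
        return registered != null && !registered.equals(sender);
    }
}
```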

Consumer Isolation Levels

Transactional records are interleaved in the log with non-transactional records and with records from other transactions, whether committed, aborted, or still open. The consumer's isolation.level determines what it sees:

read_uncommitted (default)

Consumers read all records, including records from in-progress transactions and records from transactions that will later be aborted. An application that processes these "phantom" messages and then finds them aborted has made decisions based on data that was never officially committed.

This mode is fine for use cases where transactions are not used on the producer side, or where occasional duplicate/phantom processing is acceptable.

read_committed

The consumer only reads records from committed transactions (and non-transactional records). It enforces this through the Last Stable Offset (LSO):

LSO = minimum first offset of all open (uncommitted) transactions, or the high watermark if none are open

Consumer fetch: can only return records up to LSO - 1

If a long-running transaction holds the LSO at offset 1000 while the log end offset (LEO) is at offset 500,000, a read_committed consumer is effectively stuck: it cannot read the records between 1000 and 500,000, even though most of them are from other, already-committed transactions, until the open transaction commits, aborts, or exceeds its transaction.timeout.ms. This is LSO lag, and it's a real operational hazard with transactions that take minutes to commit.
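The LSO rule can be sketched as a small function. This simplified model assumes the broker knows the high watermark and the first offset of every open transaction; `LastStableOffset` and its methods are hypothetical names.

```java
import java.util.Collection;

// Sketch: last stable offset given the first offsets of open transactions (simplified).
final class LastStableOffset {
    static long lso(long highWatermark, Collection<Long> openTxnFirstOffsets) {
        long lso = highWatermark; // no open transactions: everything replicated is stable
        for (long first : openTxnFirstOffsets) {
            lso = Math.min(lso, first);
        }
        return lso;
    }

    // A read_committed fetch can return offsets in [fetchOffset, lso).
    static long readableRecords(long fetchOffset, long highWatermark, Collection<Long> openTxnFirstOffsets) {
        return Math.max(0, lso(highWatermark, openTxnFirstOffsets) - fetchOffset);
    }
}
```

For the scenario above, `readableRecords(900, 500_000, List.of(1_000L))` is 100: a consumer at offset 900 can read only up to offset 999 until that transaction ends.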

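# Monitor consumer lag via consumer group describe
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 \
  --describe --group my-consumer-group

# The "LAG" column shows: LOG-END-OFFSET - CURRENT-OFFSET
# For a read_committed consumer, only (LSO - committed_offset) of that lag is readable now;
# the remainder (LEO - LSO) is blocked behind open transactions.
# The LSO component is invisible in this tool; inspect transactions with kafka-transactions.sh

# List transactional IDs and their current state
kafka-transactions.sh --bootstrap-server kafka1:9092 list

# Show one transaction's state, start time, and duration
kafka-transactions.sh --bootstrap-server kafka1:9092 \
  describe --transactional-id order-enrichment-v1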

EOS Performance Cost and Decision Framework

Exactly-once is not free. Here are representative numbers for a single-broker setup with 1KB messages:

Configuration                           Throughput   P99 latency   Delivery guarantee
acks=1, no idempotence                  650K TPS     5 ms          At-most-once
acks=all, idempotence                   420K TPS     12 ms         Exactly-once (single partition)
Transactions (commit per 10 records)    180K TPS     45 ms         Exactly-once (multi-partition)
Transactions (commit per 1000 records)  390K TPS     48 ms         Exactly-once (multi-partition)

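The single most impactful transaction optimization: maximize records per transaction. The 2PC overhead is roughly fixed per commitTransaction() call. If each commit costs about 45 ms, a transaction committing 1,000 records amortizes that to ~0.045 ms per record, while a transaction committing 10 records pays ~4.5 ms per record. Size your loop batches accordingly, keeping each transaction well within the producer's transaction.timeout.ms (which the broker caps at transaction.max.timeout.ms), and remember that longer transactions also hold back the LSO for read_committed consumers.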
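The amortization arithmetic is easy to check directly. This assumes, as the figures above imply, a fixed cost of about 45 ms per commitTransaction() call; substitute your own measured value. `CommitAmortization` is a hypothetical name.

```java
// Per-record share of a fixed per-commit cost; 45 ms is the assumed cost implied by the text.
final class CommitAmortization {
    static double perRecordMs(double commitCostMs, int recordsPerTxn) {
        if (recordsPerTxn <= 0) throw new IllegalArgumentException("recordsPerTxn must be positive");
        return commitCostMs / recordsPerTxn;
    }

    public static void main(String[] args) {
        for (int n : new int[] {10, 100, 1_000}) {
            System.out.printf("%d records/txn -> %.3f ms per record%n", n, perRecordMs(45.0, n));
        }
    }
}
```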

Overhead sources:

When EOS Is Worth It

Use EOS when:

Skip EOS when:

Understanding idempotence and transactions at the protocol level (PID assignment, sequence validation, epoch fencing, two-phase commit, LSO semantics) is what separates "I turned on enable.idempotence=true" from "I understand exactly what guarantees I have and what they cost me." The next chapter turns from correctness to performance: how to push a Kafka producer to one million records per second.
