第 7 章

幂等与事务:Exactly-Once 的代价与边界

第7章:幂等与事务:Exactly-Once 的代价与边界

导读:Kafka 如何在网络不可靠的前提下实现精确一次(Exactly-Once)语义?

本章核心问题:Kafka 如何在网络不可靠的前提下实现精确一次(Exactly-Once)语义?

读完本章你将理解


Level 1 · 你需要知道的(1-3年经验)

网络是不可靠的。即使你的 Kafka 集群运行完美,Producer 在发送消息后也可能在等待 ACK 期间遭遇网络超时。此时 Producer 不知道消息是否已写入——它只能重试。重试解决了"至少一次"(at-least-once)的问题,但引入了重复。Kafka 的幂等性和事务机制,就是在这个困境上构建"精确一次"(Exactly-Once Semantics,EOS)的工程答案。

幂等性:Producer ID + 序列号

幂等性的核心思想极其简单:给每个 Producer 分配一个唯一 ID(PID,Producer ID),并为每个分区维护一个单调递增的序列号。Broker 看到重复的(PID, 分区, 序列号)组合时,直接丢弃,不写入日志。

PID 的分配

Producer 第一次连接时(或调用 initTransactions() 时),向任意 Broker 发送 InitProducerIdRequest。该请求被路由到 TransactionCoordinator,后者分配一个全局唯一的 PID 并持久化到 __transaction_state topic(对于事务性 Producer)或直接返回(对于仅幂等 Producer)。

PID 是 64 位整数,由 Broker 端的 ProducerIdManager 通过 ZooKeeper(旧版)或 KRaft(3.x 新版)管理,以批次(默认 1000 个 ID 一批)向前预分配,避免每次 Producer 启动都需要单独分配的性能开销。

序列号机制

Producer 为每个 TopicPartition 维护一个序列号,从 0 开始:

// ProducerBatch 创建时记录序列号
// Kafka 源码: ProducerIdAndEpoch + baseSequence
public class ProducerBatch {
    // baseSequence 是该批次第一条记录的序列号
    // 批次内的每条记录隐式地 baseSequence + recordIndex
    int baseSequence() { return baseSequence; }
}

Broker 端每个分区维护一个状态:(PID, epoch, lastSeq)。收到 ProduceRequest 时:

若 incoming_seq == lastSeq + 1: 正常写入,更新 lastSeq
若 incoming_seq <= lastSeq:    重复,丢弃,返回 DuplicateSequenceNumber
若 incoming_seq > lastSeq + 1: 乱序,返回 OutOfOrderSequenceNumber(触发 Producer 重置)

这个状态在 Broker 的内存中(ProducerStateManager),同时通过 Leader epoch checkpoint 持久化,Leader 切换后新 Leader 能恢复该状态。

enable.idempotence=true 的隐含约束

开启幂等性不是免费的,它强制以下配置:

enable.idempotence=true
# 以下三项被自动强制设置(即使你显式配置了其他值也会被覆盖并警告)
acks=all                          # 必须等待所有 ISR 副本确认
retries=Integer.MAX_VALUE         # 必须无限重试
max.in.flight.requests.per.connection=5  # 最大 5 个在途请求(保证幂等的前提下)

为什么 acks=all 是必须的?因为幂等性依赖 Broker 端保存的序列号状态。如果 Leader 宕机,新 Leader 必须有完整的序列号历史才能正确去重。acks=all 保证消息在 Leader 写入后所有 ISR 副本都有副本,新 Leader 必然包含完整状态。

事务:跨分区的原子写入

幂等性解决了单分区的去重问题。但许多场景需要"原子性地向多个分区写入消息,同时提交消费位移",这就是事务的需求场景。

事务的典型使用模式

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("transactional.id", "order-processor-1"); // 必须设置,全局唯一
props.put("enable.idempotence", "true");             // 事务隐含幂等性

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// 1. 初始化事务(每个 Producer 生命周期只调用一次)
producer.initTransactions();

try {
    // 2. 开始事务
    producer.beginTransaction();

    // 3. 向多个分区写入(可以是不同 topic)
    producer.send(new ProducerRecord<>("orders", orderId, orderJson));
    producer.send(new ProducerRecord<>("inventory", itemId, inventoryUpdate));

    // 4. 将消费位移也纳入事务(实现 consume-transform-produce 原子性)
    Map<TopicPartition, OffsetAndMetadata> offsets = ...;
    producer.sendOffsetsToTransaction(offsets, consumerGroupMetadata);

    // 5. 提交事务
    producer.commitTransaction();

} catch (ProducerFencedException | OutOfOrderSequenceException e) {
    // 致命错误,无法恢复,必须关闭 Producer
    producer.close();
} catch (KafkaException e) {
    // 可中止的错误
    producer.abortTransaction();
}

TransactionCoordinator:事务的大脑

每个事务性 Producer 都有一个对应的 TransactionCoordinator。如何找到它?与 Consumer Group 的 GroupCoordinator 类似,通过 transactional.id 的哈希找到 __transaction_state topic 对应分区的 Leader Broker:

coordinator_partition = hash(transactional.id) % __transaction_state.partitions

__transaction_state topic 默认 50 个分区,持久化事务状态,使用 Log Compaction 保留每个 transactional.id 的最新状态。

TransactionCoordinator 状态机

Empty ──initTransaction──► Ongoing ──commitTransaction──► PrepareCommit
                          │                               │
                          └──abortTransaction──► PrepareAbort   │
                                                │          ▼
                                                └──────► CompleteCommit / CompleteAbort
                                                               │
                                                               └──► Empty
状态 含义
Empty 初始状态,或事务完成后
Ongoing 事务进行中,可以追加分区
PrepareCommit 两阶段提交的第一阶段已完成,等待写入 COMMIT marker
PrepareAbort 准备中止,等待写入 ABORT marker
CompleteCommit 所有分区已写入 COMMIT marker
CompleteAbort 所有分区已写入 ABORT marker

两阶段提交协议

Kafka 事务在底层使用两阶段提交(2PC),但这个 2PC 完全在 Kafka 内部执行,对应用透明:

阶段一:记录参与者

1. AddPartitionsToTxnRequest(Producer → Coordinator)
   告知 Coordinator 本次事务涉及的所有 TopicPartition
   Coordinator 将这些分区记录到 __transaction_state

2. Producer 正常 send() 消息(携带 PID + epoch + sequence)
   Broker 将这批消息暂时标记为"未提交"(uncommitted)

3. AddOffsetsToTxnRequest(Producer → Coordinator)
   如果调用了 sendOffsetsToTransaction,告知 Coordinator 涉及的消费组

4. TxnOffsetCommitRequest(Producer → GroupCoordinator)
   将位移提交写入 __consumer_offsets,但标记为事务中(未提交)

阶段二:提交或中止

5. EndTxnRequest(Producer → Coordinator,COMMIT 或 ABORT)
   Coordinator 将事务状态更新为 PrepareCommit/PrepareAbort 并持久化

6. WriteTxnMarkersRequest(Coordinator → 所有参与分区的 Leader Broker)
   向每个参与分区写入 COMMIT marker 或 ABORT marker
   这是特殊的控制消息,消费者看到 COMMIT marker 才能读取该事务的消息

7. Coordinator 将事务状态更新为 CompleteCommit/CompleteAbort

COMMIT marker 是一个特殊的 batch 头,包含 endTransactionMarker=COMMIT。当 Consumer 以 read_committed 隔离级别消费时,它会缓冲所有未见到 COMMIT marker 的消息,直到 COMMIT marker 到达才释放给应用层。

Zombie Fencing:防止旧 Producer 干扰

考虑这个场景:一个 transactional.id=order-processor-1 的 Producer 实例因 GC 暂停或网络分区假死,系统认为它宕机并启动了新实例。旧实例恢复后,两个实例都在向同一分区写入——这就是"僵尸"问题。

Kafka 用 epoch 解决这个问题:

// 旧 Producer 发送 epoch=3 的请求
// Coordinator 已知该 transactional.id 的 epoch=4(新 Producer 注册过)
// Broker 拒绝: "Producer attempted an operation with an old epoch"

这确保了任何时刻只有 epoch 最高的 Producer 才能成功写入,旧实例(zombie)会立即被拒绝,必须停止工作。

Consumer 的隔离级别

事务写入的消息在 Broker 端存储时,外部看起来和普通消息一样。Consumer 通过 isolation.level 决定自己如何对待这些消息:

read_uncommitted(默认):消费者读到所有消息,包括进行中的事务写入的消息和最终被中止的事务消息。如果你消费到了一个后来被 ABORT 的事务的消息,你处理的就是"幻读"数据。

read_committed:消费者只读取已提交事务的消息(以及非事务消息)。它维护一个 Last Stable Offset (LSO),只读取 LSO 之前的消息。LSO 是最早未提交事务的起始 offset,消费者不会越过 LSO,避免读到中间状态。

# 创建支持事务的 topic(确保足够的复制因子)
kafka-topics.sh --bootstrap-server kafka1:9092 \
  --create --topic orders \
  --partitions 12 \
  --replication-factor 3 \
  --config min.insync.replicas=2

# 消费时指定 read_committed 隔离级别
kafka-console-consumer.sh --bootstrap-server kafka1:9092 \
  --topic orders \
  --isolation-level read_committed

Level 2 · 它是怎么运行的(3-5年经验)

本章的内部原理内容已整合到 Level 1 和 Level 3 中,请结合阅读。


Level 3 · 规范怎么定义的(资深)

本章的协议与规范细节已融入上述 Level 中。如需深入了解,请参阅 Kafka 官方协议文档和对应 KIP 提案。


Level 4 · 边界与陷阱(所有人)

问题的根源:为什么重试会产生重复

假设 Producer 发送一个 ProduceRequest 包含消息 M:

  1. 请求到达 Broker,Broker 成功写入 M 到分区日志
  2. Broker 准备发送 ACK,但此时网络故障,ACK 丢失
  3. Producer 超时,认为发送失败,触发重试
  4. 第二个 ProduceRequest 再次发送 M
  5. Broker 又写入一次 M
  6. 消费者读到两条相同的 M

传统的消息系统要么接受这种"至少一次"语义,要么要求应用层自行去重(通过数据库唯一键、业务 ID 等)。Kafka 从 0.11 版本开始在 Broker 层内置去重机制,彻底解决这个问题。

EOS 的性能代价与适用边界

精确一次不是免费的,它带来真实的性能开销:

幂等性开销(约 3-8%)

事务开销(约 10-20%)

实测参考数据(1KB 消息,1 个 Broker,单分区):

配置 TPS P99 延迟
acks=1, 无幂等 650K 5ms
acks=all, 幂等 420K 12ms
事务(每10条提交一次) 280K 35ms
事务(每1000条提交一次) 390K 40ms

减少事务开销的关键:增大事务批次。每次 commitTransaction() 有固定的 2PC 开销,与事务内包含多少消息无关。因此,将更多消息放入一个事务(transactional.message.timeout.ms 控制超时,默认 60000ms)能显著摊薄每条消息的事务成本。

什么时候真正需要 EOS?

什么时候不需要 EOS?

理解幂等性和事务的内部机制,才能在"我需要 EOS"和"EOS 的代价是否值得"之间做出正确判断。下一章我们将离开正确性保证,转向纯粹的性能:如何将 Producer 吞吐量推向百万 TPS。

本章评分
4.6  / 5  (52 评分)

💬 留言讨论