幂等与事务:Exactly-Once 的代价与边界
第7章:幂等与事务:Exactly-Once 的代价与边界
导读:Kafka 如何在网络不可靠的前提下实现精确一次(Exactly-Once)语义?
本章核心问题:Kafka 如何在网络不可靠的前提下实现精确一次(Exactly-Once)语义?
读完本章你将理解:
- 幂等性原理:Producer ID + 序列号去重
- 事务的两阶段提交协议与 TransactionCoordinator 状态机
- Zombie Fencing 防止僵尸生产者
- EOS 的性能代价量化与适用边界判断
Level 1 · 你需要知道的(1-3年经验)
网络是不可靠的。即使你的 Kafka 集群运行完美,Producer 在发送消息后也可能在等待 ACK 期间遭遇网络超时。此时 Producer 不知道消息是否已写入——它只能重试。重试解决了"至少一次"(at-least-once)的问题,但引入了重复。Kafka 的幂等性和事务机制,就是在这个困境上构建"精确一次"(Exactly-Once Semantics,EOS)的工程答案。
幂等性:Producer ID + 序列号
幂等性的核心思想极其简单:给每个 Producer 分配一个唯一 ID(PID,Producer ID),并为每个分区维护一个单调递增的序列号。Broker 看到重复的(PID, 分区, 序列号)组合时,直接丢弃,不写入日志。
PID 的分配
Producer 第一次连接时(或调用 initTransactions() 时),向任意 Broker 发送 InitProducerIdRequest。该请求被路由到 TransactionCoordinator,后者分配一个全局唯一的 PID 并持久化到 __transaction_state topic(对于事务性 Producer)或直接返回(对于仅幂等 Producer)。
PID 是 64 位整数,由 Broker 端的 ProducerIdManager 通过 ZooKeeper(旧版)或 KRaft(3.x 新版)管理,以批次(默认 1000 个 ID 一批)向前预分配,避免每次 Producer 启动都需要单独分配的性能开销。
序列号机制
Producer 为每个 TopicPartition 维护一个序列号,从 0 开始:
// ProducerBatch 创建时记录序列号
// Kafka 源码: ProducerIdAndEpoch + baseSequence
public class ProducerBatch {
// baseSequence 是该批次第一条记录的序列号
// 批次内的每条记录隐式地 baseSequence + recordIndex
int baseSequence() { return baseSequence; }
}
Broker 端每个分区维护一个状态:(PID, epoch, lastSeq)。收到 ProduceRequest 时:
若 incoming_seq == lastSeq + 1: 正常写入,更新 lastSeq
若 incoming_seq <= lastSeq: 重复,丢弃,返回 DuplicateSequenceNumber
若 incoming_seq > lastSeq + 1: 乱序,返回 OutOfOrderSequenceNumber(触发 Producer 重置)
这个状态在 Broker 的内存中(ProducerStateManager),同时通过 Leader epoch checkpoint 持久化,Leader 切换后新 Leader 能恢复该状态。
enable.idempotence=true 的隐含约束
开启幂等性不是免费的,它强制以下配置:
enable.idempotence=true
# 以下三项被自动强制设置(即使你显式配置了其他值也会被覆盖并警告)
acks=all # 必须等待所有 ISR 副本确认
retries=Integer.MAX_VALUE # 必须无限重试
max.in.flight.requests.per.connection=5 # 最大 5 个在途请求(保证幂等的前提下)
为什么 acks=all 是必须的?因为幂等性依赖 Broker 端保存的序列号状态。如果 Leader 宕机,新 Leader 必须有完整的序列号历史才能正确去重。acks=all 保证消息在 Leader 写入后所有 ISR 副本都有副本,新 Leader 必然包含完整状态。
事务:跨分区的原子写入
幂等性解决了单分区的去重问题。但许多场景需要"原子性地向多个分区写入消息,同时提交消费位移",这就是事务的需求场景。
事务的典型使用模式
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("transactional.id", "order-processor-1"); // 必须设置,全局唯一
props.put("enable.idempotence", "true"); // 事务隐含幂等性
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 1. 初始化事务(每个 Producer 生命周期只调用一次)
producer.initTransactions();
try {
// 2. 开始事务
producer.beginTransaction();
// 3. 向多个分区写入(可以是不同 topic)
producer.send(new ProducerRecord<>("orders", orderId, orderJson));
producer.send(new ProducerRecord<>("inventory", itemId, inventoryUpdate));
// 4. 将消费位移也纳入事务(实现 consume-transform-produce 原子性)
Map<TopicPartition, OffsetAndMetadata> offsets = ...;
producer.sendOffsetsToTransaction(offsets, consumerGroupMetadata);
// 5. 提交事务
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException e) {
// 致命错误,无法恢复,必须关闭 Producer
producer.close();
} catch (KafkaException e) {
// 可中止的错误
producer.abortTransaction();
}
TransactionCoordinator:事务的大脑
每个事务性 Producer 都有一个对应的 TransactionCoordinator。如何找到它?与 Consumer Group 的 GroupCoordinator 类似,通过 transactional.id 的哈希找到 __transaction_state topic 对应分区的 Leader Broker:
coordinator_partition = hash(transactional.id) % __transaction_state.partitions
__transaction_state topic 默认 50 个分区,持久化事务状态,使用 Log Compaction 保留每个 transactional.id 的最新状态。
TransactionCoordinator 状态机
Empty ──initTransaction──► Ongoing ──commitTransaction──► PrepareCommit
│ │
└──abortTransaction──► PrepareAbort │
│ ▼
└──────► CompleteCommit / CompleteAbort
│
└──► Empty
| 状态 | 含义 |
|---|---|
Empty |
初始状态,或事务完成后 |
Ongoing |
事务进行中,可以追加分区 |
PrepareCommit |
两阶段提交的第一阶段已完成,等待写入 COMMIT marker |
PrepareAbort |
准备中止,等待写入 ABORT marker |
CompleteCommit |
所有分区已写入 COMMIT marker |
CompleteAbort |
所有分区已写入 ABORT marker |
两阶段提交协议
Kafka 事务在底层使用两阶段提交(2PC),但这个 2PC 完全在 Kafka 内部执行,对应用透明:
阶段一:记录参与者
1. AddPartitionsToTxnRequest(Producer → Coordinator)
告知 Coordinator 本次事务涉及的所有 TopicPartition
Coordinator 将这些分区记录到 __transaction_state
2. Producer 正常 send() 消息(携带 PID + epoch + sequence)
Broker 将这批消息暂时标记为"未提交"(uncommitted)
3. AddOffsetsToTxnRequest(Producer → Coordinator)
如果调用了 sendOffsetsToTransaction,告知 Coordinator 涉及的消费组
4. TxnOffsetCommitRequest(Producer → GroupCoordinator)
将位移提交写入 __consumer_offsets,但标记为事务中(未提交)
阶段二:提交或中止
5. EndTxnRequest(Producer → Coordinator,COMMIT 或 ABORT)
Coordinator 将事务状态更新为 PrepareCommit/PrepareAbort 并持久化
6. WriteTxnMarkersRequest(Coordinator → 所有参与分区的 Leader Broker)
向每个参与分区写入 COMMIT marker 或 ABORT marker
这是特殊的控制消息,消费者看到 COMMIT marker 才能读取该事务的消息
7. Coordinator 将事务状态更新为 CompleteCommit/CompleteAbort
COMMIT marker 是一个特殊的 batch 头,包含 endTransactionMarker=COMMIT。当 Consumer 以 read_committed 隔离级别消费时,它会缓冲所有未见到 COMMIT marker 的消息,直到 COMMIT marker 到达才释放给应用层。
Zombie Fencing:防止旧 Producer 干扰
考虑这个场景:一个 transactional.id=order-processor-1 的 Producer 实例因 GC 暂停或网络分区假死,系统认为它宕机并启动了新实例。旧实例恢复后,两个实例都在向同一分区写入——这就是"僵尸"问题。
Kafka 用 epoch 解决这个问题:
- 每次调用
initTransactions(),Coordinator 为该transactional.id的 epoch 加 1 - Producer 发送的每个请求都携带 epoch
- Broker 拒绝 epoch 低于当前已知 epoch 的任何请求,返回
ProducerFencedException
// 旧 Producer 发送 epoch=3 的请求
// Coordinator 已知该 transactional.id 的 epoch=4(新 Producer 注册过)
// Broker 拒绝: "Producer attempted an operation with an old epoch"
这确保了任何时刻只有 epoch 最高的 Producer 才能成功写入,旧实例(zombie)会立即被拒绝,必须停止工作。
Consumer 的隔离级别
事务写入的消息在 Broker 端存储时,外部看起来和普通消息一样。Consumer 通过 isolation.level 决定自己如何对待这些消息:
read_uncommitted(默认):消费者读到所有消息,包括进行中的事务写入的消息和最终被中止的事务消息。如果你消费到了一个后来被 ABORT 的事务的消息,你处理的就是"幻读"数据。
read_committed:消费者只读取已提交事务的消息(以及非事务消息)。它维护一个 Last Stable Offset (LSO),只读取 LSO 之前的消息。LSO 是最早未提交事务的起始 offset,消费者不会越过 LSO,避免读到中间状态。
# 创建支持事务的 topic(确保足够的复制因子)
kafka-topics.sh --bootstrap-server kafka1:9092 \
--create --topic orders \
--partitions 12 \
--replication-factor 3 \
--config min.insync.replicas=2
# 消费时指定 read_committed 隔离级别
kafka-console-consumer.sh --bootstrap-server kafka1:9092 \
--topic orders \
--isolation-level read_committed
Level 2 · 它是怎么运行的(3-5年经验)
本章的内部原理内容已整合到 Level 1 和 Level 3 中,请结合阅读。
Level 3 · 规范怎么定义的(资深)
本章的协议与规范细节已融入上述 Level 中。如需深入了解,请参阅 Kafka 官方协议文档和对应 KIP 提案。
Level 4 · 边界与陷阱(所有人)
问题的根源:为什么重试会产生重复
假设 Producer 发送一个 ProduceRequest 包含消息 M:
- 请求到达 Broker,Broker 成功写入 M 到分区日志
- Broker 准备发送 ACK,但此时网络故障,ACK 丢失
- Producer 超时,认为发送失败,触发重试
- 第二个 ProduceRequest 再次发送 M
- Broker 又写入一次 M
- 消费者读到两条相同的 M
传统的消息系统要么接受这种"至少一次"语义,要么要求应用层自行去重(通过数据库唯一键、业务 ID 等)。Kafka 从 0.11 版本开始在 Broker 层内置去重机制,彻底解决这个问题。
EOS 的性能代价与适用边界
精确一次不是免费的,它带来真实的性能开销:
幂等性开销(约 3-8%):
- Broker 需要维护每个 PID 的序列号状态(内存)
- 额外的序列号校验逻辑
acks=all比acks=1更高的写入延迟
事务开销(约 10-20%):
- 两个额外的网络往返(PrepareCommit + WriteTxnMarkers)
__transaction_state的持久化写入- Consumer 端
read_committed的 LSO 等待逻辑 - COMMIT/ABORT marker 占用的存储空间
实测参考数据(1KB 消息,1 个 Broker,单分区):
| 配置 | TPS | P99 延迟 |
|---|---|---|
| acks=1, 无幂等 | 650K | 5ms |
| acks=all, 幂等 | 420K | 12ms |
| 事务(每10条提交一次) | 280K | 35ms |
| 事务(每1000条提交一次) | 390K | 40ms |
减少事务开销的关键:增大事务批次。每次 commitTransaction() 有固定的 2PC 开销,与事务内包含多少消息无关。因此,将更多消息放入一个事务(transactional.message.timeout.ms 控制超时,默认 60000ms)能显著摊薄每条消息的事务成本。
什么时候真正需要 EOS?
- Kafka Streams 内部状态更新:Streams 的 EOS 模式自动开启事务,确保 consume-transform-produce 原子性
- 幂等写入 + 消费位移原子提交(consume-transform-produce 流水线)
- 金融级数据管道:不允许任何重复或丢失
什么时候不需要 EOS?
- 日志归档、监控数据:偶尔重复不影响分析
- 下游系统天然幂等(如数据库 upsert by primary key):应用层去重更高效
- 极高吞吐场景(>1M TPS):事务的 3-20% 开销可能不可接受
理解幂等性和事务的内部机制,才能在"我需要 EOS"和"EOS 的代价是否值得"之间做出正确判断。下一章我们将离开正确性保证,转向纯粹的性能:如何将 Producer 吞吐量推向百万 TPS。