日志压缩与保留:Compaction 源码级解析
第16章:日志压缩与保留:Compaction 源码级解析
导读:日志压缩(Compaction)的内部算法是怎样的?
本章核心问题:日志压缩(Compaction)的内部算法是怎样的?
读完本章你将理解:
- delete 与 compact 两种保留策略
- LogCleaner 的四步工作流程
- 墓碑消息的生命周期
- 压缩延迟监控与性能调优
Level 1 · 你需要知道的(1-3年经验)
Kafka 提供两种基本的日志保留策略,两者解决的是截然不同的业务场景:delete 策略基于时间或大小删除旧数据,compact 策略基于消息键保留每个键的最新值。理解压缩的内部算法,是优化其性能和保证业务语义正确性的基础。
两种保留策略的本质区别
delete:基于时间和大小的删除
# topic 级别配置
cleanup.policy=delete
log.retention.hours=168 # 7天(默认)
log.retention.bytes=-1 # -1 表示不限大小(默认)
log.segment.bytes=1073741824 # 1GB 段大小(默认)
工作机制:LogManager 中的 LogCleaner 和 LogRetentionThread 定期检查所有分区:
- 时间保留:找到修改时间早于
log.retention.ms的已滚动(非活跃)LogSegment,标记为删除 - 大小保留:如果分区总大小超过
log.retention.bytes,从最旧的段开始删除,直到总大小降至阈值以下 - 活跃段(当前写入的段)永远不会被删除
# 查看 LogRetentionThread 的操作
grep "Deleting segment" /var/log/kafka/server.log | tail -5
# [2024-04-26] INFO Deleting segment 0 for log /data/kafka/events-0 (kafka.log.Log)
compact:基于键的去重保留
cleanup.policy=compact
min.cleanable.dirty.ratio=0.5 # 脏数据比例超过 50% 才触发压缩
min.compaction.lag.ms=0 # 消息写入后多久才能被压缩(0=立即可压缩)
max.compaction.lag.ms=Long.MAX_VALUE # 消息被写入后最长多久必须被压缩
delete.retention.ms=86400000 # 墓碑消息保留 24 小时后删除
适用场景:
- 变更数据捕获(CDC):数据库表的变更日志,每行以主键为 Kafka 消息 Key,只需保留每行的最新状态
- 事件溯源的状态快照:Kafka Streams 的 StateStore 变更日志
- 配置中心:配置项以名称为 Key,只需最新值
不适用场景:
- 消息没有 Key(Key 为 null 的消息无法压缩,会永久保留,直到 delete 策略清理)
- 业务需要完整的事件历史(如审计日志)
墓碑消息:删除语义的实现
什么是墓碑(Tombstone)
在 compact 主题中,不能简单地通过"不发消息"来删除某个 Key——因为压缩之后,旧的删除操作也会被删掉,无法传达给新加入的消费者。
解决方案是发送一条 value 为 null 的消息,这就是墓碑消息:
// 发送墓碑消息以"删除"某个键
producer.send(new ProducerRecord<>(
"user-profiles",
"user-123", // key
null // null value = tombstone
));
墓碑消息的生命周期:
- 写入主题:墓碑立即对消费者可见,通知他们"这个 key 已被删除"
- 经历压缩:同 key 的旧版本消息被删除,墓碑本身被保留
- delete.retention.ms 到期:墓碑消息本身被压缩删除
delete.retention.ms(默认 24 小时)的存在意义:给新加入的消费者足够的时间看到这条墓碑消息,确保他们也能同步到"这个 key 已删除"的状态。如果墓碑立即消失,一个从头开始消费(earliest)的新消费者可能永远看不到这个删除事件。
混合策略:compact + delete
# 同时启用两种策略
cleanup.policy=compact,delete
log.retention.hours=72 # 3天
min.cleanable.dirty.ratio=0.5
含义:对数据进行 compact(保留每个 key 的最新值),同时超过 72 小时的段整体删除。
适用场景:既需要去重(节省空间、快速 consumer 状态恢复),又不需要无限期保留数据(如会话状态、用户行为流)。
# 验证 topic 的 cleanup.policy 配置
kafka-configs.sh \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name user-sessions \
--describe | grep cleanup.policy
# cleanup.policy=compact,delete
压缩与事务:必须保留的标记
当主题同时使用事务和压缩时,需要特别注意:事务标记(COMMIT/ABORT)必须被保留,即使其对应的数据批次已经被压缩删除。
原因:消费者使用 isolation.level=read_committed 时,需要读取事务标记来判断哪些批次是已提交的,哪些是已中止的。如果事务标记被压缩删除,消费者无法正确跳过已中止的事务,可能读取到中止事务的"脏数据"。
Kafka 的压缩逻辑会检查并保留事务标记,直到对应的 lastStableOffset(LSO)推进到安全位置。这部分逻辑在 LogCleaner 中通过检查 .txnindex 文件实现。
Level 2 · 它是怎么运行的(3-5年经验)
压缩算法内部:LogCleaner 的工作原理
第一步:选择最"脏"的分区
LogCleaner 线程(由 log.cleaner.threads 控制,默认 1)周期性地扫描所有 compact 策略的分区,选出"脏数据比例"最高的分区优先处理:
// kafka/core/src/main/scala/kafka/log/LogCleaner.scala(概念性代码)
def cleanerTask(): Unit = {
val dirtyLog = cleaner.grabFilthiestLog() // 选择 dirty/total 比值最大的分区
if (dirtyLog.isDefined) {
cleaner.clean(dirtyLog.get)
}
}
// 脏数据比例计算
def dirtyRatio(log: Log): Double = {
val (cleanedBytes, dirtyBytes) = log.logSegments.foldLeft((0L, 0L)) {
case ((clean, dirty), segment) =>
if (segment.baseOffset > log.lastStableOffset) (clean, dirty) // 跳过未稳定段
else if (segment.baseOffset >= log.cleanerCheckpoint) (clean, dirty + segment.size)
else (clean + segment.size, dirty)
}
dirtyBytes.toDouble / (cleanedBytes + dirtyBytes)
}
cleanerCheckpoint:记录上次压缩完成的位置,该 offset 之前的段被认为是"干净"的,之后的是"脏"的。
第二步:构建 OffsetMap
OffsetMap 是一个内存中的哈希表,存储 key → 最高offset 的映射。它告诉 Cleaner:"对于每个 Key,哪个 offset 的消息是最新的,应该保留"。
// 构建 OffsetMap
val offsetMap = new SkimpyOffsetMap(maxSize = log.config.maxMessageSize * 10)
// 扫描脏段,填充 OffsetMap
for (segment <- dirtySegments) {
for (batch <- segment.log.batches) {
for (record <- batch.records) {
if (record.hasKey) {
// 对同一个 key,后出现的 offset 覆盖前面的
offsetMap.put(record.key, batch.lastOffset)
}
// key 为 null 的记录:不放入 OffsetMap,意味着它会被保留(无法压缩)
}
}
}
SkimpyOffsetMap 是 Kafka 自己实现的内存受限哈希表,使用开放寻址法。其内存上限由 log.cleaner.dedupe.buffer.size(默认 128MB)控制。若 Key 数量超过内存容量,Cleaner 会分段处理(多次扫描脏段)。
第三步:扫描脏段,标记删除
根据构建好的 OffsetMap,扫描脏段中的每条记录:
// 决定一条记录是否应该保留
def shouldRetain(batch: RecordBatch, record: Record, offsetMap: OffsetMap): Boolean = {
if (!record.hasKey) return true // 没有 Key:不能压缩,保留
val latestOffset = offsetMap.get(record.key)
if (latestOffset == -1) {
// key 不在 OffsetMap 中(可能超出了脏数据范围,在已清理的段里)
return true // 保留:可能是最新的,不确定
}
if (record.offset < latestOffset) {
return false // 这是旧版本的记录,有更新的版本,删除
}
if (record.offset == latestOffset && record.hasNullValue) {
// 这是墓碑(tombstone)消息
// 保留,但会在 delete.retention.ms 后被删除
return true
}
return true // 这是最新版本,保留
}
第四步:写入清理后的段,原子替换
Cleaner 将保留的记录写入临时的 .swap 文件:
# 压缩进行中时可能看到的临时文件
ls /data/kafka/user-events-0/
# 00000000000000000000.log
# 00000000000000000000.log.deleted ← 被标记为待删除的旧段
# 00000000000001000000.log.swap ← 正在写入的新段(临时文件)
# 00000000000001000000.index.swap
写入完成后,原子重命名:
00000000000001000000.log.swap → 00000000000001000000.log
旧段文件 → 标记为 .deleted,等待异步删除
完整的压缩流程总结:
脏段 [offset 100..200] 中的原始数据:
key=user1, offset=100, value={"name":"Alice"}
key=user2, offset=101, value={"name":"Bob"}
key=user1, offset=150, value={"name":"Alice Smith"} ← user1 的更新
key=user3, offset=175, value=null ← user3 的墓碑
key=user2, offset=190, value={"name":"Robert"} ← user2 的更新
key=user1, offset=195, value={"name":"Alice Brown"} ← user1 的再次更新
OffsetMap:
user1 → 195
user2 → 190
user3 → 175
压缩结果(保留的记录):
key=user3, offset=175, value=null ← 墓碑,保留直到 delete.retention.ms
key=user2, offset=190, value="Robert" ← user2 的最新值
key=user1, offset=195, value="Alice Brown" ← user1 的最新值
删除的记录:
key=user1, offset=100 (有更新的 195)
key=user2, offset=101 (有更新的 190)
key=user1, offset=150 (有更新的 195)
Level 3 · 规范怎么定义的(资深)
压缩延迟监控:CompactionLag
压缩积压的形成
如果数据写入速度超过 Cleaner 的压缩速度,dirty 部分会持续增长,导致压缩积压。极端情况下,某些 key 的历史版本永远无法被清理,压缩失去意义。
压缩积压的量化:
compaction_lag = current_time - write_time_of_cleanerCheckpoint_offset
即:从 cleanerCheckpoint 对应的消息被写入,到该消息被纳入压缩,经过了多长时间。
max.compaction.lag.ms 设置了这个值的上限(默认 Long.MAX_VALUE,即无限制)。当设置了有限值时,Cleaner 会优先处理积压超过该时间的分区,即使其 dirty ratio 还未达到 min.cleanable.dirty.ratio。
# 监控压缩积压(JMX 指标)
# kafka.log:type=LogCleaner,name=max-compaction-delay-secs
# 值越大,说明某些分区的数据等待压缩的时间越长
# kafka.log:type=LogCleanerManager,name=time-since-last-run-ms
# 上一次 Cleaner 运行到现在的间隔,应保持在分钟级别
# 查看 Cleaner 运行日志
grep "Log cleaner" /var/log/kafka/server.log | tail -10
# INFO [kafka-log-cleaner-thread-0]:
# Starting cleaner iteration for /data/kafka/user-events-0
# (kafka.log.LogCleaner)
# INFO [kafka-log-cleaner-thread-0]:
# Cleaned /data/kafka/user-events-0 in 5.3s.
# Compacted 1.2 GB to 340 MB (71.7% reduction). (kafka.log.LogCleaner)
压缩性能调优参数
# 增加 Cleaner 线程数(默认 1,建议与磁盘数相当)
log.cleaner.threads=2
# Cleaner 的 I/O 速率限制(避免影响正常读写)
log.cleaner.io.max.bytes.per.second=104857600 # 100MB/s
# OffsetMap 内存大小(越大,单次可处理的 Key 越多)
log.cleaner.dedupe.buffer.size=268435456 # 256MB
# 脏数据比例阈值(越低,压缩越频繁,存储越节省,但 CPU 开销更大)
log.cleaner.min.cleanable.ratio=0.3 # 30%
# Cleaner 休眠间隔(两次压缩之间的间隔)
log.cleaner.backoff.ms=15000 # 15秒
告警策略
# Prometheus 告警
- alert: KafkaCompactionLagHigh
expr: |
kafka_log_log_cleaner_compaction_delay_secs_max > 3600
for: 10m
labels:
severity: warning
annotations:
summary: "Kafka compaction lag > 1 hour: {{ $value }}s"
description: |
Log compaction is falling behind writes. This means old values for some
keys may not be cleaned up in a timely manner, causing excessive storage
growth and degraded consumer group recovery performance.
runbook: |
1. Check log.cleaner.threads — consider increasing if I/O is not saturated
2. Check disk I/O utilization — compaction is I/O bound
3. Check log.cleaner.dedupe.buffer.size — may need more memory for large key spaces
4. Consider adding more disks or faster storage
日志压缩是 Kafka 区别于传统消息队列的重要特性之一——它让 Kafka 不仅是一个消息管道,更是一个可靠的事件存储和状态快照系统。理解 Cleaner 的内部算法,才能在设计 compact 主题时做出正确的参数决策,并在压缩出现积压时迅速定位和解决问题。
Level 4 · 边界与陷阱(所有人)
以下是与"日志压缩与保留:Compaction 源码级解析"相关的常见边界问题和生产陷阱:
陷阱一:忽略默认配置的隐含假设。 许多 Kafka 参数的默认值是为通用场景设计的,在特定业务场景下可能导致性能问题或数据风险。上线前务必逐一审查与本章主题相关的配置项,确认其是否符合业务需求。
陷阱二:缺乏端到端的验证。 理论理解和生产实践之间往往存在差距。建议在预发布环境中构建完整的测试场景,覆盖正常路径和异常路径(如节点宕机、网络分区、磁盘满),验证系统行为是否符合预期。
陷阱三:监控盲区。 本章涉及的机制如果缺乏对应的监控指标和告警规则,问题往往在造成业务影响后才被发现。建议将关键指标纳入监控体系,设置合理的告警阈值。