第 16 章

日志压缩与保留:Compaction 源码级解析

第16章:日志压缩与保留:Compaction 源码级解析

导读:日志压缩(Compaction)的内部算法是怎样的?

本章核心问题:日志压缩(Compaction)的内部算法是怎样的?

读完本章你将理解


Level 1 · 你需要知道的(1-3年经验)

Kafka 提供两种基本的日志保留策略,两者解决的是截然不同的业务场景:delete 策略基于时间或大小删除旧数据,compact 策略基于消息键保留每个键的最新值。理解压缩的内部算法,是优化其性能和保证业务语义正确性的基础。

两种保留策略的本质区别

delete:基于时间和大小的删除

# topic 级别配置
cleanup.policy=delete
log.retention.hours=168     # 7天(默认)
log.retention.bytes=-1      # -1 表示不限大小(默认)
log.segment.bytes=1073741824 # 1GB 段大小(默认)

工作机制:LogManager 中的 LogCleanerLogRetentionThread 定期检查所有分区:

  1. 时间保留:找到修改时间早于 log.retention.ms 的已滚动(非活跃)LogSegment,标记为删除
  2. 大小保留:如果分区总大小超过 log.retention.bytes,从最旧的段开始删除,直到总大小降至阈值以下
  3. 活跃段(当前写入的段)永远不会被删除
# 查看 LogRetentionThread 的操作
grep "Deleting segment" /var/log/kafka/server.log | tail -5
# [2024-04-26] INFO Deleting segment 0 for log /data/kafka/events-0 (kafka.log.Log)

compact:基于键的去重保留

cleanup.policy=compact
min.cleanable.dirty.ratio=0.5    # 脏数据比例超过 50% 才触发压缩
min.compaction.lag.ms=0          # 消息写入后多久才能被压缩(0=立即可压缩)
max.compaction.lag.ms=Long.MAX_VALUE  # 消息被写入后最长多久必须被压缩
delete.retention.ms=86400000     # 墓碑消息保留 24 小时后删除

适用场景

不适用场景

墓碑消息:删除语义的实现

什么是墓碑(Tombstone)

在 compact 主题中,不能简单地通过"不发消息"来删除某个 Key——因为压缩之后,旧的删除操作也会被删掉,无法传达给新加入的消费者。

解决方案是发送一条 value 为 null 的消息,这就是墓碑消息:

// 发送墓碑消息以"删除"某个键
producer.send(new ProducerRecord<>(
    "user-profiles",
    "user-123",    // key
    null           // null value = tombstone
));

墓碑消息的生命周期:

  1. 写入主题:墓碑立即对消费者可见,通知他们"这个 key 已被删除"
  2. 经历压缩:同 key 的旧版本消息被删除,墓碑本身被保留
  3. delete.retention.ms 到期:墓碑消息本身被压缩删除

delete.retention.ms(默认 24 小时)的存在意义:给新加入的消费者足够的时间看到这条墓碑消息,确保他们也能同步到"这个 key 已删除"的状态。如果墓碑立即消失,一个从头开始消费(earliest)的新消费者可能永远看不到这个删除事件。

混合策略:compact + delete

# 同时启用两种策略
cleanup.policy=compact,delete
log.retention.hours=72      # 3天
min.cleanable.dirty.ratio=0.5

含义:对数据进行 compact(保留每个 key 的最新值),同时超过 72 小时的段整体删除。

适用场景:既需要去重(节省空间、快速 consumer 状态恢复),又不需要无限期保留数据(如会话状态、用户行为流)。

# 验证 topic 的 cleanup.policy 配置
kafka-configs.sh \
  --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name user-sessions \
  --describe | grep cleanup.policy
# cleanup.policy=compact,delete

压缩与事务:必须保留的标记

当主题同时使用事务和压缩时,需要特别注意:事务标记(COMMIT/ABORT)必须被保留,即使其对应的数据批次已经被压缩删除。

原因:消费者使用 isolation.level=read_committed 时,需要读取事务标记来判断哪些批次是已提交的,哪些是已中止的。如果事务标记被压缩删除,消费者无法正确跳过已中止的事务,可能读取到中止事务的"脏数据"。

Kafka 的压缩逻辑会检查并保留事务标记,直到对应的 lastStableOffset(LSO)推进到安全位置。这部分逻辑在 LogCleaner 中通过检查 .txnindex 文件实现。


Level 2 · 它是怎么运行的(3-5年经验)

压缩算法内部:LogCleaner 的工作原理

第一步:选择最"脏"的分区

LogCleaner 线程(由 log.cleaner.threads 控制,默认 1)周期性地扫描所有 compact 策略的分区,选出"脏数据比例"最高的分区优先处理:

// kafka/core/src/main/scala/kafka/log/LogCleaner.scala(概念性代码)
def cleanerTask(): Unit = {
  val dirtyLog = cleaner.grabFilthiestLog() // 选择 dirty/total 比值最大的分区
  if (dirtyLog.isDefined) {
    cleaner.clean(dirtyLog.get)
  }
}

// 脏数据比例计算
def dirtyRatio(log: Log): Double = {
  val (cleanedBytes, dirtyBytes) = log.logSegments.foldLeft((0L, 0L)) {
    case ((clean, dirty), segment) =>
      if (segment.baseOffset > log.lastStableOffset) (clean, dirty) // 跳过未稳定段
      else if (segment.baseOffset >= log.cleanerCheckpoint) (clean, dirty + segment.size)
      else (clean + segment.size, dirty)
  }
  dirtyBytes.toDouble / (cleanedBytes + dirtyBytes)
}

cleanerCheckpoint:记录上次压缩完成的位置,该 offset 之前的段被认为是"干净"的,之后的是"脏"的。

第二步:构建 OffsetMap

OffsetMap 是一个内存中的哈希表,存储 key → 最高offset 的映射。它告诉 Cleaner:"对于每个 Key,哪个 offset 的消息是最新的,应该保留"。

// 构建 OffsetMap
val offsetMap = new SkimpyOffsetMap(maxSize = log.config.maxMessageSize * 10)

// 扫描脏段,填充 OffsetMap
for (segment <- dirtySegments) {
  for (batch <- segment.log.batches) {
    for (record <- batch.records) {
      if (record.hasKey) {
        // 对同一个 key,后出现的 offset 覆盖前面的
        offsetMap.put(record.key, batch.lastOffset)
      }
      // key 为 null 的记录:不放入 OffsetMap,意味着它会被保留(无法压缩)
    }
  }
}

SkimpyOffsetMap 是 Kafka 自己实现的内存受限哈希表,使用开放寻址法。其内存上限由 log.cleaner.dedupe.buffer.size(默认 128MB)控制。若 Key 数量超过内存容量,Cleaner 会分段处理(多次扫描脏段)。

第三步:扫描脏段,标记删除

根据构建好的 OffsetMap,扫描脏段中的每条记录:

// 决定一条记录是否应该保留
def shouldRetain(batch: RecordBatch, record: Record, offsetMap: OffsetMap): Boolean = {
  if (!record.hasKey) return true  // 没有 Key:不能压缩,保留

  val latestOffset = offsetMap.get(record.key)

  if (latestOffset == -1) {
    // key 不在 OffsetMap 中(可能超出了脏数据范围,在已清理的段里)
    return true  // 保留:可能是最新的,不确定
  }

  if (record.offset < latestOffset) {
    return false  // 这是旧版本的记录,有更新的版本,删除
  }

  if (record.offset == latestOffset && record.hasNullValue) {
    // 这是墓碑(tombstone)消息
    // 保留,但会在 delete.retention.ms 后被删除
    return true
  }

  return true  // 这是最新版本,保留
}

第四步:写入清理后的段,原子替换

Cleaner 将保留的记录写入临时的 .swap 文件:

# 压缩进行中时可能看到的临时文件
ls /data/kafka/user-events-0/
# 00000000000000000000.log
# 00000000000000000000.log.deleted  ← 被标记为待删除的旧段
# 00000000000001000000.log.swap     ← 正在写入的新段(临时文件)
# 00000000000001000000.index.swap

写入完成后,原子重命名:

00000000000001000000.log.swap → 00000000000001000000.log
旧段文件 → 标记为 .deleted,等待异步删除

完整的压缩流程总结:

脏段 [offset 100..200] 中的原始数据:
key=user1, offset=100, value={"name":"Alice"}
key=user2, offset=101, value={"name":"Bob"}
key=user1, offset=150, value={"name":"Alice Smith"}  ← user1 的更新
key=user3, offset=175, value=null                    ← user3 的墓碑
key=user2, offset=190, value={"name":"Robert"}       ← user2 的更新
key=user1, offset=195, value={"name":"Alice Brown"}  ← user1 的再次更新

OffsetMap:
user1 → 195
user2 → 190
user3 → 175

压缩结果(保留的记录):
key=user3, offset=175, value=null     ← 墓碑,保留直到 delete.retention.ms
key=user2, offset=190, value="Robert" ← user2 的最新值
key=user1, offset=195, value="Alice Brown" ← user1 的最新值

删除的记录:
key=user1, offset=100  (有更新的 195)
key=user2, offset=101  (有更新的 190)
key=user1, offset=150  (有更新的 195)

Level 3 · 规范怎么定义的(资深)

压缩延迟监控:CompactionLag

压缩积压的形成

如果数据写入速度超过 Cleaner 的压缩速度,dirty 部分会持续增长,导致压缩积压。极端情况下,某些 key 的历史版本永远无法被清理,压缩失去意义。

压缩积压的量化

compaction_lag = current_time - write_time_of_cleanerCheckpoint_offset

即:从 cleanerCheckpoint 对应的消息被写入,到该消息被纳入压缩,经过了多长时间。

max.compaction.lag.ms 设置了这个值的上限(默认 Long.MAX_VALUE,即无限制)。当设置了有限值时,Cleaner 会优先处理积压超过该时间的分区,即使其 dirty ratio 还未达到 min.cleanable.dirty.ratio

# 监控压缩积压(JMX 指标)
# kafka.log:type=LogCleaner,name=max-compaction-delay-secs
# 值越大,说明某些分区的数据等待压缩的时间越长

# kafka.log:type=LogCleanerManager,name=time-since-last-run-ms
# 上一次 Cleaner 运行到现在的间隔,应保持在分钟级别

# 查看 Cleaner 运行日志
grep "Log cleaner" /var/log/kafka/server.log | tail -10
# INFO [kafka-log-cleaner-thread-0]:
#   Starting cleaner iteration for /data/kafka/user-events-0
#   (kafka.log.LogCleaner)
# INFO [kafka-log-cleaner-thread-0]:
#   Cleaned /data/kafka/user-events-0 in 5.3s.
#   Compacted 1.2 GB to 340 MB (71.7% reduction). (kafka.log.LogCleaner)

压缩性能调优参数

# 增加 Cleaner 线程数(默认 1,建议与磁盘数相当)
log.cleaner.threads=2

# Cleaner 的 I/O 速率限制(避免影响正常读写)
log.cleaner.io.max.bytes.per.second=104857600  # 100MB/s

# OffsetMap 内存大小(越大,单次可处理的 Key 越多)
log.cleaner.dedupe.buffer.size=268435456  # 256MB

# 脏数据比例阈值(越低,压缩越频繁,存储越节省,但 CPU 开销更大)
log.cleaner.min.cleanable.ratio=0.3  # 30%

# Cleaner 休眠间隔(两次压缩之间的间隔)
log.cleaner.backoff.ms=15000  # 15秒

告警策略

# Prometheus 告警
- alert: KafkaCompactionLagHigh
  expr: |
    kafka_log_log_cleaner_compaction_delay_secs_max > 3600
  for: 10m
  labels:
    severity: warning
  annotations:
    summary: "Kafka compaction lag > 1 hour: {{ $value }}s"
    description: |
      Log compaction is falling behind writes. This means old values for some
      keys may not be cleaned up in a timely manner, causing excessive storage
      growth and degraded consumer group recovery performance.
    runbook: |
      1. Check log.cleaner.threads — consider increasing if I/O is not saturated
      2. Check disk I/O utilization — compaction is I/O bound
      3. Check log.cleaner.dedupe.buffer.size — may need more memory for large key spaces
      4. Consider adding more disks or faster storage

日志压缩是 Kafka 区别于传统消息队列的重要特性之一——它让 Kafka 不仅是一个消息管道,更是一个可靠的事件存储和状态快照系统。理解 Cleaner 的内部算法,才能在设计 compact 主题时做出正确的参数决策,并在压缩出现积压时迅速定位和解决问题。


Level 4 · 边界与陷阱(所有人)

以下是与"日志压缩与保留:Compaction 源码级解析"相关的常见边界问题和生产陷阱:

陷阱一:忽略默认配置的隐含假设。 许多 Kafka 参数的默认值是为通用场景设计的,在特定业务场景下可能导致性能问题或数据风险。上线前务必逐一审查与本章主题相关的配置项,确认其是否符合业务需求。

陷阱二:缺乏端到端的验证。 理论理解和生产实践之间往往存在差距。建议在预发布环境中构建完整的测试场景,覆盖正常路径和异常路径(如节点宕机、网络分区、磁盘满),验证系统行为是否符合预期。

陷阱三:监控盲区。 本章涉及的机制如果缺乏对应的监控指标和告警规则,问题往往在造成业务影响后才被发现。建议将关键指标纳入监控体系,设置合理的告警阈值。

本章评分
4.5  / 5  (16 评分)

💬 留言讨论