Log Compaction and Retention: Source-Level Analysis
Kafka provides two fundamentally different log retention policies, each solving a distinct class of problems: delete removes old data based on time or size constraints, while compact retains only the latest value per message key regardless of age. Understanding the compaction algorithm at the source level is the foundation for tuning its performance and ensuring correct business semantics.
Two Retention Policies: Fundamentally Different Purposes
delete: Time and Size Based Removal
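# Topic-level configuration (broker-wide defaults use the log.* names:
# log.retention.hours / log.retention.ms, log.retention.bytes, log.segment.bytes)
cleanup.policy=delete
retention.ms=604800000 # 7 days (default)
retention.bytes=-1 # -1 = no size limit (default)
segment.bytes=1073741824 # 1GB per segment (default)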
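Mechanism: A scheduled retention task in LogManager (kafka-log-retention) scans all partitions every log.retention.check.interval.ms (5 minutes by default):
- Time-based: Identify rolled (non-active) LogSegments whose newest record timestamp is older than retention.ms (log.retention.ms at the broker level), falling back to the file's last-modified time when no record timestamp is available. Mark them for deletion.
- Size-based: If a partition's total log size exceeds retention.bytes (log.retention.bytes at the broker level), delete segments from oldest to newest for as long as removing a whole segment still leaves the log at or above the threshold.
- The active segment (currently being written to) is never deleted regardless of age or size.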
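The selection rules in the list above can be sketched in a few lines of Java. This is a minimal illustration, not broker code: `Segment`, `RetentionSketch`, and `selectForDeletion` are hypothetical names, and the size rule (drop a segment only when the excess over the limit covers that whole segment) is my reading of the broker's behaviour, so treat it as an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a log segment; not Kafka's LogSegment class. */
final class Segment {
    final long baseOffset;
    final long sizeBytes;
    final long largestTimestampMs;
    final boolean active;

    Segment(long baseOffset, long sizeBytes, long largestTimestampMs, boolean active) {
        this.baseOffset = baseOffset;
        this.sizeBytes = sizeBytes;
        this.largestTimestampMs = largestTimestampMs;
        this.active = active;
    }
}

final class RetentionSketch {
    /** Returns base offsets of the segments a delete pass would remove, oldest first. */
    static List<Long> selectForDeletion(List<Segment> oldestFirst, long nowMs,
                                        long retentionMs, long retentionBytes) {
        long total = 0;
        for (Segment s : oldestFirst) total += s.sizeBytes;
        long excess = total - retentionBytes; // bytes above the size limit

        List<Long> doomed = new ArrayList<>();
        for (Segment s : oldestFirst) {
            if (s.active) break; // the active segment is never deleted
            // A negative limit disables the corresponding rule.
            boolean tooOld = retentionMs >= 0 && nowMs - s.largestTimestampMs > retentionMs;
            boolean tooBig = retentionBytes >= 0 && excess >= s.sizeBytes;
            if (!tooOld && !tooBig) break; // deletion only ever trims the head of the log
            doomed.add(s.baseOffset);
            excess -= s.sizeBytes;
        }
        return doomed;
    }

    public static void main(String[] args) {
        List<Segment> segments = Arrays.asList(
            new Segment(0L, 100L, 1_000L, false),
            new Segment(100L, 100L, 5_000L, false),
            new Segment(200L, 100L, 9_000L, true));
        // Time rule only: segments whose newest record is older than 6 seconds.
        System.out.println(selectForDeletion(segments, 10_000L, 6_000L, -1L));
    }
}
```

Because both rules stop at the first segment that does not qualify, deletion always trims a contiguous prefix of the log and never punches holes in the middle.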
# Monitor deletion activity in broker logs
grep "Deleting segment" /var/log/kafka/server.log | tail -5
# INFO Deleting segment LogSegment(baseOffset=0, size=1073741824)
# for partition events-0 (kafka.log.Log)
# See current disk usage per partition
du -sh /data/kafka/events-*/
# 1.0G /data/kafka/events-0
# 987M /data/kafka/events-1
compact: Key-Based Deduplication Retention
cleanup.policy=compact
min.cleanable.dirty.ratio=0.5 # trigger when dirty/total > 50%
min.compaction.lag.ms=0 # messages eligible for compaction immediately (0=no delay)
max.compaction.lag.ms=604800000 # compaction must happen within 7 days of write
delete.retention.ms=86400000 # tombstone messages kept for 24h before final deletion
Use cases where compact is the correct choice:
- CDC (Change Data Capture): A database table's change log, where each row's primary key is the Kafka message key. Only the latest row state matters; historical updates are redundant.
- Kafka Streams state store changelogs: Restore a state store by replaying only the latest value per key, not the full history of changes.
- Configuration stores: Microservice configuration items with a name as key, where only the current value is needed.
Use cases where compact is wrong:
- Messages have null keys (compaction needs a key to deduplicate on; brokers reject keyless records written to a compacted topic, so the producer fails with an invalid-record error)
- Full event history is required for compliance, auditing, or event sourcing replay
Compaction Algorithm Internals: How LogCleaner Works
Step 1: Select the Dirtiest Partition
The LogCleaner thread pool (controlled by log.cleaner.threads, default 1) periodically evaluates all compact-policy partitions and selects the one with the highest "dirty ratio" to clean first:
// Simplified from the Kafka source (see LogCleanerManager.grabFilthiestCompactedLog)
def grabFilthiestLog(): Option[LogToClean] = {
  val dirtyLogs = logs.filterNot(_.isFuture)
    .filter(log => log.config.compact && !cleanerManager.isInProgress(log.topicPartition))
    .map { log =>
      // Segments at or beyond the checkpoint are dirty; earlier ones are already clean
      val (cleanBytes, dirtyBytes) = log.logSegments.foldLeft((0L, 0L)) {
        case ((clean, dirty), seg) =>
          if (seg.baseOffset >= log.cleanerCheckpoint) (clean, dirty + seg.size)
          else (clean + seg.size, dirty)
      }
      val totalBytes = cleanBytes + dirtyBytes
      // Guard against an empty log to avoid dividing by zero
      val ratio = if (totalBytes == 0) 0.0 else dirtyBytes.toDouble / totalBytes
      LogToClean(log, ratio, cleanBytes, dirtyBytes)
    }
    // Keep only logs whose dirty ratio exceeds their own topic-level threshold
    .filter(ltc => ltc.cleanableRatio > ltc.log.config.minCleanableRatio)
  if (dirtyLogs.isEmpty) None
  else Some(dirtyLogs.maxBy(_.cleanableRatio))
}
cleanerCheckpoint is a persisted marker recording the highest offset cleaned in a previous pass. Segments before this offset are "clean" (already compacted); segments after are "dirty" (may contain redundant older values).
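To make the dirty-ratio arithmetic concrete, here is a small Java sketch that applies the same clean/dirty split to plain arrays. `DirtyRatioSketch` and its methods are hypothetical names used only for illustration; the strict `>` comparison follows the filter in the snippet above.

```java
final class DirtyRatioSketch {
    /** dirty / (clean + dirty); returns 0 for an empty log to avoid dividing by zero. */
    static double dirtyRatio(long[] baseOffsets, long[] sizes, long cleanerCheckpoint) {
        long clean = 0;
        long dirty = 0;
        for (int i = 0; i < baseOffsets.length; i++) {
            // Segments starting at or after the checkpoint have not been compacted yet.
            if (baseOffsets[i] >= cleanerCheckpoint) dirty += sizes[i];
            else clean += sizes[i];
        }
        long total = clean + dirty;
        return total == 0 ? 0.0 : (double) dirty / total;
    }

    /** A log qualifies only when its ratio is strictly above the threshold. */
    static boolean eligible(double ratio, double minCleanableRatio) {
        return ratio > minCleanableRatio;
    }

    public static void main(String[] args) {
        long[] baseOffsets = {0L, 1000L, 2000L, 3000L};
        long[] sizes = {400L, 100L, 300L, 200L};
        double ratio = dirtyRatio(baseOffsets, sizes, 1000L); // 600 dirty bytes out of 1000
        System.out.println(ratio + " eligible=" + eligible(ratio, 0.5));
    }
}
```

With the checkpoint at offset 2000 the same log is exactly half dirty, which does not clear a threshold of 0.5 because the comparison is strict.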
Step 2: Build the OffsetMap
The OffsetMap is an in-memory hash table mapping message_key → highest_offset_seen. It answers the question: "For each key, which offset is the most recent version that should be retained?"
// Building the OffsetMap by scanning dirty segments
val offsetMap = new SkimpyOffsetMap(
  memory = config.dedupeBufferSize, // log.cleaner.dedupe.buffer.size (default 128MB)
  hashAlgorithm = "MD5"
)
for (segment <- dirtySegments) {
  val reader = segment.log.batchIterator()
  while (reader.hasNext) {
    val batch = reader.next()
    for (record <- batch.asScala) {
      if (record.hasKey) {
        // Store the record's own offset (not the batch's last offset):
        // for the same key, later offsets overwrite earlier ones in the map
        offsetMap.put(record.key, record.offset)
        // Result: offsetMap[key] = the highest offset at which this key appears
      }
      // null-key records are never added to the offsetMap
    }
  }
}
SkimpyOffsetMap is Kafka's custom memory-bounded hash table using open addressing with linear probing. It's designed to use exactly log.cleaner.dedupe.buffer.size bytes. If the key space exceeds available memory, the cleaner processes dirty segments in multiple passes.
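A rough capacity estimate helps decide whether one pass can cover a topic's key space. The sketch below assumes each map slot holds a 16-byte MD5 digest plus an 8-byte offset, that the dedupe buffer is divided evenly among cleaner threads, and that only a load-factor fraction of slots is filled (log.cleaner.io.buffer.load.factor, 0.9 by default). These figures are my understanding of the implementation rather than something stated above, and `OffsetMapCapacity` is a hypothetical helper.

```java
final class OffsetMapCapacity {
    static final int HASH_BYTES = 16;  // MD5 digest length
    static final int OFFSET_BYTES = 8; // one 64-bit offset

    /** Approximate number of distinct keys one cleaner thread can index per pass. */
    static long maxKeysPerPass(long dedupeBufferBytes, int cleanerThreads, double loadFactor) {
        long perThreadBytes = dedupeBufferBytes / cleanerThreads; // buffer is shared by all threads
        long slots = perThreadBytes / (HASH_BYTES + OFFSET_BYTES);
        return (long) (slots * loadFactor);                       // keep the table below full
    }

    public static void main(String[] args) {
        long defaultBuffer = 128L * 1024 * 1024; // 128 MB
        System.out.println("1 thread:  " + maxKeysPerPass(defaultBuffer, 1, 0.9));
        System.out.println("2 threads: " + maxKeysPerPass(defaultBuffer, 2, 0.9));
    }
}
```

Under these assumptions the default buffer indexes roughly five million keys per pass with one thread, and adding threads without growing the buffer shrinks each thread's share.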
Step 3: Scan and Mark Records for Deletion
With the OffsetMap built, the cleaner re-scans dirty segments to determine which records to keep:
// Retention decision for each record
def shouldRetainRecord(map: OffsetMap, retainDeletesAndNulls: Boolean,
                       batch: RecordBatch, record: Record,
                       stats: CleanerStats): Boolean = {
  // Records without keys are invalid on a compacted topic and are discarded
  if (!record.hasKey) {
    stats.invalidMessage()
    return false
  }
  // Look up the highest known offset for this key (-1 if the key is not in the map)
  val latestOffset = map.get(record.key)
  if (record.offset >= latestOffset) {
    // This IS the latest version of this key
    if (!record.hasValue) {
      // Tombstone (delete marker): retained only until delete.retention.ms has expired
      stats.numTombstones += 1
      return retainDeletesAndNulls
    }
    return true
  }
  // record.offset < latestOffset: a newer version exists, so this one is redundant
  stats.numMessagesDropped += 1
  false
}
Step 4: Write Cleaned Segments and Atomically Swap
The cleaner writes retained records to new temporary segment files (first under a .cleaned suffix, renamed to .swap once the write is complete) while the original segment files stay in place:
# Temporary files visible during active compaction
ls -la /data/kafka/user-profiles-0/
# During compaction:
-rw-r--r-- 00000000000000000000.log # original segment (being read)
-rw-r--r-- 00000000000000000000.log.deleted # marked for deletion after swap
-rw-r--r-- 00000000000001048576.log.swap # new compacted segment (being written)
-rw-r--r-- 00000000000001048576.index.swap # new index (being rebuilt)
-rw-r--r-- 00000000000002097152.log # untouched segment
After writing completes, an atomic rename makes the swap file the canonical segment:
# Swap complete: original deleted, swap renamed to final
-rw-r--r-- 00000000000001048576.log # was .swap, now the real segment
-rw-r--r-- 00000000000001048576.index
-rw-r--r-- 00000000000002097152.log # untouched
The old segment files (.deleted) are deleted asynchronously to minimize impact on read I/O. The atomic rename ensures that readers always see either the old segment or the new segment, never a partially written intermediate state.
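The write-then-rename pattern is easy to reproduce with java.nio. The sketch below is a simplified illustration rather than the cleaner's actual file handling: it writes the new content to a sibling .swap file and renames it over the target, falling back to a plain replacing move if the atomic rename fails. `AtomicSwapSketch`, `replaceSegment`, and `demo` are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class AtomicSwapSketch {
    /** Writes the compacted content next to the segment, then renames it into place. */
    static void replaceSegment(Path segment, String compactedContent) {
        Path swap = segment.resolveSibling(segment.getFileName() + ".swap");
        try {
            Files.write(swap, compactedContent.getBytes(StandardCharsets.UTF_8));
            try {
                // Readers see either the old file or the new one, never a partial write.
                Files.move(swap, segment, StandardCopyOption.ATOMIC_MOVE);
            } catch (IOException e) {
                // Fallback for file systems where the atomic rename is rejected.
                Files.move(swap, segment, StandardCopyOption.REPLACE_EXISTING);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Runs the swap in a temporary directory and returns the segment's final content. */
    static String demo() {
        try {
            Path dir = Files.createTempDirectory("swap-sketch");
            Path segment = dir.resolve("00000000000000000000.log");
            Files.write(segment, "original".getBytes(StandardCharsets.UTF_8));
            replaceSegment(segment, "compacted");
            return new String(Files.readAllBytes(segment), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The key design point is that the expensive work (writing the compacted data) happens on a file no reader knows about, so the only step readers can observe is the rename.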
Complete Compaction Example
Dirty segment containing offset range 100-200:
key=user1, offset=100, value={"name":"Alice"} ← old version
key=user2, offset=101, value={"name":"Bob"} ← old version
key=user1, offset=150, value={"name":"Alice Smith"} ← intermediate version
key=user3, offset=175, value=null ← tombstone (delete marker)
key=user2, offset=190, value={"name":"Robert"} ← new version
key=user1, offset=195, value={"name":"Alice Brown"} ← newest version
OffsetMap built from this segment:
user1 → 195
user2 → 190
user3 → 175
After compaction (retained records):
key=user3, offset=175, value=null ← tombstone retained (pending delete.retention.ms)
key=user2, offset=190, value={"name":"Robert"} ← latest version of user2
key=user1, offset=195, value={"name":"Alice Brown"} ← latest version of user1
Dropped (3 records eliminated):
key=user1, offset=100 (superseded by offset 195)
key=user2, offset=101 (superseded by offset 190)
key=user1, offset=150 (superseded by offset 195)
Storage reduction: 6 records → 3 records = 50% reduction for this segment
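The worked example can be replayed with a two-pass simulation in Java: one pass to build the key-to-offset map, a second to keep only records at the recorded offset. This is an in-memory toy under stated assumptions (every record has a non-null key, a plain HashMap stands in for the bounded OffsetMap, and tombstones are always kept); `CompactionSketch` and `Rec` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CompactionSketch {
    /** Minimal record model: key, offset, and a value that is null for tombstones. */
    static final class Rec {
        final String key;
        final long offset;
        final String value;

        Rec(String key, long offset, String value) {
            this.key = key;
            this.offset = offset;
            this.value = value;
        }
    }

    static List<Rec> compact(List<Rec> dirty) {
        // Pass 1: remember the highest offset seen for each key.
        Map<String, Long> offsetMap = new HashMap<>();
        for (Rec r : dirty) offsetMap.merge(r.key, r.offset, Long::max);

        // Pass 2: keep a record only if it is the latest one for its key.
        List<Rec> retained = new ArrayList<>();
        for (Rec r : dirty) {
            if (r.offset >= offsetMap.get(r.key)) retained.add(r);
        }
        return retained;
    }

    /** The six records from the example above. */
    static List<Rec> example() {
        return Arrays.asList(
            new Rec("user1", 100L, "{\"name\":\"Alice\"}"),
            new Rec("user2", 101L, "{\"name\":\"Bob\"}"),
            new Rec("user1", 150L, "{\"name\":\"Alice Smith\"}"),
            new Rec("user3", 175L, null),
            new Rec("user2", 190L, "{\"name\":\"Robert\"}"),
            new Rec("user1", 195L, "{\"name\":\"Alice Brown\"}"));
    }

    public static void main(String[] args) {
        for (Rec r : compact(example())) {
            System.out.println(r.key + " @" + r.offset + " = " + r.value);
        }
    }
}
```

Note that retained records keep their original offsets and relative order; compaction removes records but never renumbers or reorders the survivors.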
Tombstone Messages: The Delete Semantics
Why Tombstones Are Necessary
In a compacted topic, you cannot express "delete key X" by simply not sending a message. If you stop writing for a key, compaction will retain the last value indefinitely. New consumers starting from earliest would see that value as current; they'd never know it was meant to be deleted.
The solution is a tombstone: a record with the target key and a null value:
// Delete a key from a compacted topic
ProducerRecord<String, String> tombstone = new ProducerRecord<>(
    "user-profiles", // topic
    "user-123",      // key: the entry to delete
    null             // null value = tombstone marker
);
producer.send(tombstone).get();
// What consumers see:
// ConsumerRecord: key="user-123", value=null
// Application code should interpret null value as "this key was deleted"
// Pattern: handle tombstones in consumer
consumer.subscribe(List.of("user-profiles"));
while (true) {
    for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
        if (record.value() == null) {
            // Tombstone: remove this key from local state
            localCache.remove(record.key());
            database.delete("user_profiles", record.key());
        } else {
            // Normal record: upsert to local state
            localCache.put(record.key(), record.value());
            database.upsert("user_profiles", record.key(), record.value());
        }
    }
}
Tombstone Lifecycle
T=0: Tombstone written: key="user-123", value=null
Immediately visible to consumers, who delete "user-123" from their state
T=1h: Compaction runs
Older versions of key="user-123" are deleted
Tombstone itself is RETAINED (it's the "latest" value for this key)
T=24h: delete.retention.ms (86400000ms) has elapsed since the tombstone was written
Next compaction pass removes the tombstone itself
Key "user-123" no longer exists in the topic
T=25h: A new consumer starts from earliest
It does NOT see key="user-123" at all (tombstone already purged)
This is correct: the key doesn't exist
delete.retention.ms must be long enough for all consumers to have read the tombstone and updated their local state before it disappears. If you have a consumer that might be 12 hours behind, set delete.retention.ms to at least 24 hours to ensure it processes the tombstone before it's purged.
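The two checks implied by the timeline can be written down directly. The sketch below follows the simplified timeline above, where the retention clock starts when the tombstone is written; the broker may start that clock later (for example when the tombstone is first cleaned), in which case real tombstones live longer than this predicts. `TombstoneSketch`, `canPurge`, `retentionCoversLag`, and the safety factor are hypothetical.

```java
final class TombstoneSketch {
    static final long HOUR_MS = 60L * 60L * 1000L;

    /** True once the tombstone has outlived delete.retention.ms. */
    static boolean canPurge(long tombstoneTimestampMs, long nowMs, long deleteRetentionMs) {
        return nowMs - tombstoneTimestampMs > deleteRetentionMs;
    }

    /** True when delete.retention.ms leaves headroom over the slowest consumer's lag. */
    static boolean retentionCoversLag(long deleteRetentionMs, long worstConsumerLagMs,
                                      double safetyFactor) {
        return deleteRetentionMs >= (long) (worstConsumerLagMs * safetyFactor);
    }

    public static void main(String[] args) {
        long retention = 24 * HOUR_MS;
        System.out.println("purge after 1h?  " + canPurge(0L, 1 * HOUR_MS, retention));
        System.out.println("purge after 25h? " + canPurge(0L, 25 * HOUR_MS, retention));
        System.out.println("covers 12h lag with 2x headroom? "
            + retentionCoversLag(retention, 12 * HOUR_MS, 2.0));
    }
}
```

A check like `retentionCoversLag` is the kind of guard that could run in a deployment pipeline, comparing the configured retention against the worst consumer lag observed in monitoring.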
Mixed Policy: compact + delete
cleanup.policy=compact,delete
retention.ms=259200000 # delete segments older than 3 days (broker-level default: log.retention.hours)
min.cleanable.dirty.ratio=0.5 # compact when dirty ratio > 50%
This applies both policies: segments are compacted (keeping only the latest value per key), AND segments are deleted entirely when they age beyond the retention time (retention.ms on the topic, log.retention.hours as the broker default).
Use cases:
- User session state: Compact to avoid redundant session updates; delete to purge expired sessions after 72 hours
- Device telemetry: Keep latest device state via compact; delete historical data after 3 days for storage management
# Verify the cleanup.policy on a topic
kafka-configs.sh \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name user-sessions \
--describe
# Dynamic configs for topic 'user-sessions' are:
# cleanup.policy=compact,delete sensitive=false synonyms=...
Compaction and Transactions: Preserving Commit/Abort Markers
When a topic uses both transactions and log compaction, a subtle but critical constraint applies: transaction markers (COMMIT and ABORT control batches) must be preserved by the compaction process, even after their associated data records have been compacted away.
Why: Consumers with isolation.level=read_committed rely on COMMIT/ABORT markers to determine which message batches to expose to the application. If markers were deleted by compaction before all active transactional consumers have passed the corresponding offset, those consumers would lose the ability to correctly filter aborted transaction data, potentially exposing "dirty" (aborted) records.
Kafka's LogCleaner implements this by:
- Checking the .txnindex file for the segment being cleaned
- Never cleaning past the lastStableOffset, so data and COMMIT/ABORT markers that sit behind a still-open transaction are left untouched
- Only removing a marker once the data records of its transaction have been cleaned away and the marker itself has outlived delete.retention.ms, the same grace period tombstones receive
This is handled automatically; you don't need to configure anything special, but it means the actual storage reduction from compaction may be slightly less than expected on heavily transactional topics.
Compaction Lag Monitoring and Alerting
How Compaction Lag Builds Up
If the write rate to a compact topic exceeds the cleaner's throughput, the dirty portion grows continuously. In extreme cases, old versions of some keys are never cleaned up, and compaction fails to deliver its storage benefits.
Quantifying compaction lag:
compaction_lag_for_partition = current_time - write_time_of(cleanerCheckpoint_offset)
This is: how long ago was the data at cleanerCheckpoint written? Equivalently, how old is the oldest data that hasn't been compacted yet?
max.compaction.lag.ms (default: Long.MAX_VALUE = unlimited) caps this value. When set, the cleaner prioritizes partitions that have exceeded this lag even if their dirty ratio hasn't reached min.cleanable.dirty.ratio.
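The lag formula and the override it triggers can be expressed as a short Java sketch. It assumes the timestamp of the first uncompacted record is known; `CompactionLagSketch` and its method names are hypothetical and only mirror the description above.

```java
final class CompactionLagSketch {
    /** Age of the oldest uncompacted data, clamped at zero for clock skew. */
    static long lagMs(long nowMs, long firstDirtyTimestampMs) {
        return Math.max(0L, nowMs - firstDirtyTimestampMs);
    }

    /** True when the partition has exceeded max.compaction.lag.ms. */
    static boolean mustClean(long lagMs, long maxCompactionLagMs) {
        return lagMs > maxCompactionLagMs;
    }

    /** A partition is cleaned if it is overdue, or if it is dirty enough. */
    static boolean shouldClean(double dirtyRatio, double minCleanableRatio,
                               long lagMs, long maxCompactionLagMs) {
        return mustClean(lagMs, maxCompactionLagMs) || dirtyRatio > minCleanableRatio;
    }

    public static void main(String[] args) {
        long lag = lagMs(10_000L, 4_000L); // 6 seconds of uncompacted data
        // Only 10% dirty, but overdue against a 5-second limit.
        System.out.println(shouldClean(0.1, 0.5, lag, 5_000L));
        // Same partition with the default (unlimited) lag: not selected.
        System.out.println(shouldClean(0.1, 0.5, lag, Long.MAX_VALUE));
    }
}
```

With the default of Long.MAX_VALUE the lag branch can never fire, so selection falls back entirely to the dirty-ratio threshold.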
# Monitor compaction activity from broker logs
grep "Cleaner" /var/log/kafka/server.log | grep -v "DEBUG" | tail -15
# INFO [kafka-log-cleaner-thread-0]:
# Starting cleaner pass for /data/kafka/user-profiles-0
# INFO [kafka-log-cleaner-thread-0]:
# Completed cleaning /data/kafka/user-profiles-0 in 8.2s.
# Dirty ratio was 0.72, compacted 2.1 GB to 420 MB (80.0% reduction).
# Cleaned 1847293 messages, dropped 1613204 messages.
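# Check per-partition size on disk via kafka-log-dirs.sh (size should shrink after a cleaning pass).
# The tool prints status lines before the JSON, so keep only the JSON line before pretty-printing.
# Note: the offsetLag field in this output describes replica lag, not compaction lag.
kafka-log-dirs.sh \
--bootstrap-server localhost:9092 \
--topic-list user-profiles \
--describe 2>/dev/null | grep '^{' | python3 -m json.tool | grep -A3 '"partition"'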
Key JMX metrics for compaction monitoring:
# Metrics under kafka.log:type=LogCleaner
# max-compaction-delay-secs: highest per-partition compaction lag in seconds
# max-clean-time-secs: longest time taken by a recent cleaning pass
# DeadThreadCount: number of cleaner threads that have died (should always be 0)
# Metrics under kafka.log:type=LogCleanerManager
# time-since-last-run-ms: milliseconds since LogCleaner last ran
Compaction Performance Tuning
# Increase cleaner thread count (default 1); one thread per disk is a good starting point
log.cleaner.threads=2
# Throttle cleaner I/O to avoid starving producers and consumers
log.cleaner.io.max.bytes.per.second=209715200 # 200 MB/s
# Increase OffsetMap memory for topics with large key cardinality
log.cleaner.dedupe.buffer.size=536870912 # 512 MB
# Lower dirty ratio threshold for more aggressive compaction
# (trades more frequent CPU/IO for better storage efficiency)
log.cleaner.min.cleanable.ratio=0.3 # 30%
# Increase cleaner backoff to reduce overhead on lightly loaded topics
log.cleaner.backoff.ms=30000 # 30 seconds between passes
Alerting Configuration
# Prometheus alert rules for compaction health
groups:
  - name: kafka-compaction
    rules:
      - alert: KafkaCompactionLagHigh
        expr: |
          kafka_log_log_cleaner_compaction_delay_secs_max > 3600
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Kafka compaction lag exceeds 1 hour: {{ $value }}s"
          description: |
            Log compaction is falling behind the write rate. Old value versions
            are accumulating, degrading storage efficiency and increasing consumer
            state store recovery time.
          runbook: |
            1. Check log.cleaner.threads; increase if disk I/O is not saturated
            2. Check disk I/O with iostat -x 1 (compaction is disk-I/O bound)
            3. Consider increasing log.cleaner.dedupe.buffer.size
            4. Consider increasing log.cleaner.io.max.bytes.per.second
            5. If topic write rate is fundamentally too high, add more partitions
               (distributes compaction work across more brokers)
      - alert: KafkaCleanerThreadDead
        expr: |
          kafka_log_log_cleanermanager_dead_threads > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Kafka log cleaner thread has died on {{ $labels.instance }}"
          description: |
            A LogCleaner thread has thrown an unhandled exception and died.
            Compaction has stopped for partitions assigned to that thread.
            Broker restart may be required.
Log compaction is one of the features that distinguishes Kafka from traditional message queues: it transforms Kafka from a pure message conduit into a reliable event store and state snapshot system. A Kafka Streams state store can be rebuilt from scratch by consuming a compacted changelog topic: it replays only the latest value per key, making recovery fast regardless of how long the state store has been accumulating updates. Understanding the LogCleaner algorithm (OffsetMap construction, dirty ratio selection, tombstone lifecycle, and transaction marker preservation) gives you the knowledge to design compact topics correctly, tune their performance parameters appropriately, and diagnose compaction lag when it appears in production.