Chapter 10

Everything About Offsets: Storage, Commit and Reset

The offset is the fundamental coordinate in Kafka's consumption model. It determines where a consumer starts reading, where it resumes after a failure, and how it replays historical data. Confusing the three distinct offset positions (current position, committed offset, and log end offset) is one of the most common sources of subtle consumer bugs. This chapter covers the full picture: how offsets are physically stored, why commitAsync deliberately never retries, what happens when committed offsets expire, and how to use seek() for precise partition replay.

Where Offsets Live: __consumer_offsets

Kafka stores committed offsets for consumer groups in the internal topic __consumer_offsets. Offset storage moved there from ZooKeeper with the new Java consumer in Kafka 0.9, and the old ZooKeeper-based offset storage has been removed entirely in modern versions.

Topic Characteristics

# Inspect __consumer_offsets configuration
kafka-topics.sh --bootstrap-server kafka1:9092 \
  --describe --topic __consumer_offsets

# Key properties you'll see:
# PartitionCount: 50
# ReplicationFactor: 3 (set by offsets.topic.replication.factor)
# Configs: cleanup.policy=compact,compression.type=producer,segment.bytes=104857600

Each consumer group is mapped to exactly one __consumer_offsets partition by hashing its group.id, and the leader of that partition acts as the group's GroupCoordinator. With the default 50 partitions (offsets.topic.num.partitions), every broker leads several of them, so different consumer groups commit to different brokers in parallel and no single coordinator becomes a bottleneck for the whole cluster.
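The group-to-partition mapping can be sketched in a few lines. This assumes the commonly described formula abs(groupId.hashCode()) % offsets.topic.num.partitions; CoordinatorPartition and its methods are hypothetical names, and the code re-implements the formula for illustration rather than calling broker internals.

```java
/** Hypothetical helper: which __consumer_offsets partition owns a consumer group. */
final class CoordinatorPartition {
    private CoordinatorPartition() {}

    /** Non-negative abs that maps Integer.MIN_VALUE to 0 instead of overflowing. */
    static int safeAbs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    /**
     * Assumed mapping: abs(groupId.hashCode()) % partitionCount, where partitionCount
     * corresponds to offsets.topic.num.partitions (default 50).
     */
    static int partitionFor(String groupId, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        return safeAbs(groupId.hashCode()) % partitionCount;
    }
}
```

Because String.hashCode() is deterministic, every client and broker computes the same partition for a given group.id. This is also why offsets.topic.num.partitions should not be changed after deployment: a different partition count would remap existing groups.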

Log compaction (cleanup.policy=compact) ensures that, for every (group, topic, partition) key, eventually only the latest committed offset is retained. Older commit records for the same key become eligible for deletion when the log cleaner runs, which keeps the topic from growing without bound.
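Conceptually, compaction replays the log and keeps the last value per key, and a null value (a tombstone, which the coordinator writes when offsets expire) removes the key. A minimal in-memory sketch of that rule; CompactionModel is a hypothetical name, and the sketch ignores how the real log cleaner works (segment by segment, never on the active segment, keeping tombstones for delete.retention.ms).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified, in-memory model of log compaction for __consumer_offsets-style records. */
final class CompactionModel {
    private CompactionModel() {}

    /** key: "group,topic,partition"; value: committed offset, or null for a tombstone. */
    record Entry(String key, Long value) {}

    /** Replays the log in order and returns what survives compaction. */
    static Map<String, Long> compact(List<Entry> log) {
        Map<String, Long> latest = new LinkedHashMap<>();
        for (Entry e : log) {
            if (e.value() == null) {
                latest.remove(e.key());          // tombstone: the key disappears entirely
            } else {
                latest.put(e.key(), e.value());  // a newer commit supersedes older ones
            }
        }
        return latest;
    }
}
```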

Binary Format: Key and Value

Every offset commit is stored as a binary key-value record. Understanding the format is useful for debugging with low-level tools.

Offset Commit Key (schema version 1, used by modern consumer groups):

Field           Type      Description
version         int16     Schema version (0 or 1)
group           string    Consumer group ID
topic           string    Topic name
partition       int32     Partition index

Offset Commit Value (schema version 3, Kafka 2.1+):

Field           Type      Description
version         int16     Schema version
offset          int64     Next-to-fetch offset (last processed + 1)
leaderEpoch     int32     Leader epoch at time of commit (lets the consumer detect log truncation after a leader change)
metadata        string    Application-supplied string (usually empty)
commitTimestamp int64     Unix timestamp (ms) when this commit was written

Value version 1 also carried an expireTimestamp field. It was dropped in Kafka 2.1 (KIP-211), and expiry is now governed by the broker setting offsets.retention.minutes.

In addition to offset records, __consumer_offsets also stores group metadata records (member lists, protocol information). Both record types share the same topic and are distinguished by the version field in their keys: key versions 0 and 1 are offset commits, and version 2 is group metadata.
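The offset-commit key layout from the table above can be sketched with java.nio.ByteBuffer, assuming Kafka's protocol string encoding (an int16 length followed by UTF-8 bytes) and big-endian integers. OffsetCommitKeyCodec is a hypothetical debugging helper, not Kafka's own serializer, and it covers only the key, not the value or group metadata records.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical codec for the offset-commit key: version, group, topic, partition. */
final class OffsetCommitKeyCodec {
    private OffsetCommitKeyCodec() {}

    record OffsetKey(short version, String group, String topic, int partition) {}

    static byte[] encode(OffsetKey key) {
        byte[] group = key.group().getBytes(StandardCharsets.UTF_8);
        byte[] topic = key.topic().getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(2 + 2 + group.length + 2 + topic.length + 4);
        buf.putShort(key.version());                  // int16 schema version
        buf.putShort((short) group.length).put(group); // int16 length + UTF-8 bytes
        buf.putShort((short) topic.length).put(topic);
        buf.putInt(key.partition());                  // int32 partition index
        return buf.array();
    }

    static OffsetKey decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes); // ByteBuffer is big-endian by default
        short version = buf.getShort();
        String group = readString(buf);
        String topic = readString(buf);
        int partition = buf.getInt();
        return new OffsetKey(version, group, topic, partition);
    }

    private static String readString(ByteBuffer buf) {
        short length = buf.getShort();
        byte[] bytes = new byte[length];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

For group order-consumers, topic orders, partition 0, the encoded key is 31 bytes: 2 (version) + 2 + 15 (group) + 2 + 6 (topic) + 4 (partition).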

# Read raw __consumer_offsets contents (debug tool; the formatter class name differs between Kafka versions)
kafka-console-consumer.sh \
  --bootstrap-server kafka1:9092 \
  --topic __consumer_offsets \
  --from-beginning \
  --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
  2>/dev/null | head -20

# Sample output:
# [order-consumers,orders,0]::OffsetAndMetadata[offset=12345, leaderEpoch=Optional[5],
#   metadata=, commitTimestamp=1714000000000, expireTimestamp=-1]
# [order-consumers,orders,1]::OffsetAndMetadata[offset=23456, ...]

Three Offset Positions: The Consumer's Coordinate System

Every Kafka consumer juggles three distinct offset values per partition. Mixing them up produces subtle bugs that only appear under failure conditions.

Partition Log:
 [0][1]...[999][1000]...[2999][3000]...[4999]
                ↑              ↑             ↑
                |              |             Log End Offset = 5000
                |              |             (next write position)
                |              Position = 3000
                |              (next record to fetch; lives only in consumer memory)
                Committed Offset = 1000
                (records 0-999 processed and committed)

Position (Current Fetch Position)

The in-memory pointer to the next record the consumer will fetch on the next poll() call. It advances automatically each time poll() returns records. Position is local to the consumer process, so it is lost on restart. After a restart, the consumer fetches the group's committed offset (stored in __consumer_offsets) from the GroupCoordinator to restore its position.

Committed Offset

The durable record in __consumer_offsets. Its meaning: "all records with offset < this value have been successfully processed." Note the off-by-one convention: the committed value is the offset of the next record to fetch, not the offset of the last processed record. If you call consumer.commitSync() right after processing the record at offset 999 (the last record returned by poll()), Kafka commits offset 1000.

This convention means that after a restart the consumer resumes at record 1000, exactly as consumer.seek(tp, committedOffset) would, which is correct: you don't want to reprocess record 999.

Log End Offset (LEO)

The offset at which the next producer write will land, equivalently max(written offsets) + 1. LEO is owned by the partition leader and advances as producers write. Consumers never read past LEO: the broker returns records only up to the high watermark (the point up to which all in-sync replicas have the data), which can trail LEO while followers catch up.

Consumer Lag

Lag = LEO - Committed Offset

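Lag is the primary operational health metric for consumer groups. Zero lag means the consumer has caught up with everything producers have written. Continuously growing lag means consumption is slower than production; the fix is either more consumer instances (more parallelism, but only up to the number of partitions, since extra consumers sit idle) or faster message processing. In practice, tools such as kafka-consumer-groups.sh measure lag against the end offset visible to consumers (the high watermark) rather than the leader's raw LEO.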

# Real-time lag monitoring
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 \
  --describe --group order-consumers

# GROUP            TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID      HOST
# order-consumers  orders  0          12345           12400           55   consumer-1-uuid  /10.0.0.1
# order-consumers  orders  1          23456           23456           0    consumer-2-uuid  /10.0.0.2
# order-consumers  orders  2          34567           35000           433  consumer-3-uuid  /10.0.0.3
# (trailing CLIENT-ID column omitted)
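The lag arithmetic behind that output is a per-partition subtraction summed across partitions. A minimal sketch using the three partitions from the sample; LagCalculator and PartitionOffsets are hypothetical names, and in a real application the inputs would come from KafkaConsumer.committed(...) and KafkaConsumer.endOffsets(...), or from the Admin client.

```java
import java.util.List;

/** Hypothetical helper: lag per partition and summed across a consumer group. */
final class LagCalculator {
    private LagCalculator() {}

    /** One row of the describe output: CURRENT-OFFSET (committed) and LOG-END-OFFSET. */
    record PartitionOffsets(String topic, int partition, long committed, long endOffset) {
        long lag() {
            // Clamped at zero defensively: the two offsets are read at slightly different times
            return Math.max(0L, endOffset - committed);
        }
    }

    static long totalLag(List<PartitionOffsets> partitions) {
        long total = 0L;
        for (PartitionOffsets p : partitions) {
            total += p.lag();
        }
        return total;
    }
}
```

For the sample output this yields 55 + 0 + 433 = 488 records of total lag for the group.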

commitSync(): Blocking, Reliable, Costly

// Basic usage
try {
    consumer.commitSync();
} catch (CommitFailedException e) {
    // Non-retriable failure: usually ILLEGAL_GENERATION (rebalance occurred)
    // or REBALANCE_IN_PROGRESS
    log.error("Commit failed: a rebalance may have occurred", e);
}

// Commit specific offsets (more precise control)
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
// 12346 = last processed offset (12345) + 1, i.e. the next record to fetch
offsets.put(new TopicPartition("orders", 0), new OffsetAndMetadata(12346L));
consumer.commitSync(offsets);

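commitSync() blocks the calling thread until one of three things happens: the commit succeeds, a non-retriable error is thrown (such as the CommitFailedException above), or the timeout expires (default.api.timeout.ms, or the Duration passed to commitSync(Duration)), in which case a TimeoutException is thrown.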

For retriable errors (network exceptions, coordinator not available), commitSync() automatically retries with backoff. This is safe because the calling thread is blocked for the whole exchange: no newer commit can be issued in the meantime, and re-sending the same offsets is idempotent (committing offset 1000 twice has the same effect as committing it once).

The performance cost: each commitSync() requires one network round trip to the GroupCoordinator, during which the calling thread is blocked. At 100,000 records/second with per-record commits, you would need 100,000 sequential round trips per second, one every 10 microseconds, which no real network can sustain. The solution: commit per batch, not per record.

// Efficient pattern: commit once per poll() batch
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record);
    }
    consumer.commitSync(); // Once per batch, not per record
}
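To put numbers on the difference, a back-of-the-envelope model helps. It assumes an illustrative 2 ms round trip to the GroupCoordinator (not a measurement) and ignores fetch and processing time, so it gives only the ceiling that synchronous commits impose; CommitCostModel is a hypothetical name.

```java
/** Back-of-the-envelope ceiling imposed by synchronous commits on one consumer thread. */
final class CommitCostModel {
    private CommitCostModel() {}

    /**
     * Assumes each commitSync() blocks for one round trip of rttMillis and ignores fetch
     * and processing time, so the result is an upper bound, not a throughput prediction.
     */
    static double maxRecordsPerSecond(double rttMillis, int recordsPerCommit) {
        if (rttMillis <= 0 || recordsPerCommit <= 0) {
            throw new IllegalArgumentException("rttMillis and recordsPerCommit must be positive");
        }
        double commitsPerSecond = 1000.0 / rttMillis;
        return commitsPerSecond * recordsPerCommit;
    }
}
```

With a 2 ms round trip, per-record commits cap a thread at 500 records/second from commit latency alone, while one commit per 500-record poll() batch (500 is the default max.poll.records) raises that ceiling to 250,000 records/second.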

commitAsync(): Non-Blocking, High-Performance, No Auto-Retry

consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        // Log the failure, but do NOT retry here
        log.warn("Async commit failed for offsets {}: {}", offsets, exception.getMessage());
    }
});
// Returns immediately โ€” does not wait for coordinator response

commitAsync() sends the commit request and returns immediately. The callback fires (on the consumer thread, during a subsequent poll() call) when the response arrives.

Why does commitAsync never auto-retry on failure?

This is a deliberate design decision to prevent offset regression. Consider the sequence:

T1: commitAsync sends Request-A committing offset=100
T2: Request-A is slow; the consumer cannot yet tell whether the coordinator received it
T3: commitAsync sends Request-B committing offset=200; the coordinator writes 200
T4: Request-A's callback reports a retriable failure (e.g. a timeout); if the client auto-retried, Request-C would be sent committing offset=100
T5: The coordinator overwrites offset=200 with offset=100
T6: Consumer restarts → resumes from offset=100 → records 100-199 are processed twice

Auto-retry causes offset regression โ€” an older commit overwrites a newer one, moving the consumer backward. Since there is no way for the retry logic to know whether offset=200 has already been committed, the only safe behavior is to not retry and let the next successful commit (at a higher offset) supersede the failed one.

The application callback should log the failure and accept that this batch's offset commit was skipped โ€” the next successful commitAsync or commitSync will commit a higher offset that encompasses all previously unconfirmed offsets.
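If you do want retries, a widely used application-level workaround (not something the Kafka client does for you) is to number each async commit and retry a failure only when no newer commit has been issued since. A minimal sketch; CommitSequence and its methods are hypothetical names, and the usage lines at the end are comments because they need a live consumer.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical helper: number each async commit so that a failed one is retried only
 * when no newer commit has been sent since (otherwise the retry could regress offsets).
 */
final class CommitSequence {
    // Callbacks run on the consumer thread, so AtomicLong is defensive rather than required
    private final AtomicLong latest = new AtomicLong();

    /** Call immediately before each commitAsync() and capture the result in the callback. */
    long next() {
        return latest.incrementAndGet();
    }

    /** In the failure callback: true only if this commit is still the most recent one. */
    boolean safeToRetry(long sequence) {
        return sequence == latest.get();
    }
}

// Intended use inside the poll loop (sketch; consumer comes from the surrounding code):
//   long seq = commitSequence.next();
//   consumer.commitAsync((offsets, exception) -> {
//       if (exception != null && commitSequence.safeToRetry(seq)) {
//           // No newer commit exists, so re-sending these offsets cannot move the group backward
//           consumer.commitAsync(offsets, (retried, retryException) -> { /* log only */ });
//       }
//   });
```

In the T1 to T6 timeline above, Request-A would carry sequence 1 and Request-B sequence 2, so the guard refuses to retry Request-A and offset=200 stays committed.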

try {
    while (!shutdown) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        
        for (ConsumerRecord<String, String> record : records) {
            process(record);
        }
        
        // Normal path: async commit for throughput
        consumer.commitAsync((offsets, exception) -> {
            if (exception != null) {
                log.warn("Async commit failed (will retry implicitly via next commit): {}",
                    exception.getMessage());
                // Do not retry: the next successful commit covers these offsets
            }
        });
    }
} catch (WakeupException e) {
    // Expected during shutdown โ€” ignore
} catch (Exception e) {
    log.error("Unexpected consumer error", e);
} finally {
    try {
        // Shutdown path: sync commit to ensure the final batch is committed
        // This is the last chance to durably record position before exit
        consumer.commitSync();
        log.info("Final synchronous commit completed");
    } finally {
        consumer.close();
    }
}

The rationale: async commits during normal operation provide high throughput. The final commitSync() on shutdown guarantees the last processed batch's offset is durably stored, preventing unnecessary reprocessing when the consumer restarts.

auto.offset.reset: When There Is No Committed Offset

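When a consumer group reads a partition for the first time (no committed offset exists), or when its committed offset is no longer usable (the commit has expired from __consumer_offsets, or retention has deleted the log segment it points to, leaving the offset out of range), auto.offset.reset determines where consumption begins: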

Value             Behavior                                            When to Use
latest (default)  Start from LEO; only consume new records            Real-time processing, history irrelevant
earliest          Start from the oldest available record              Must process all historical data
none              Throw an exception (NoOffsetForPartitionException   Force explicit error handling; never silently skip or replay
                  when no committed offset exists)
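The per-partition decision can be modeled in a few lines. This is a simplified sketch, not the client's actual code: StartOffsetResolver and ResetPolicy are hypothetical names, and where the real consumer throws NoOffsetForPartitionException (no committed offset) or OffsetOutOfRangeException (committed offset outside the retained log), the model throws IllegalStateException.

```java
import java.util.OptionalLong;

/** Simplified model of how a consumer picks its starting offset for one partition. */
final class StartOffsetResolver {
    private StartOffsetResolver() {}

    enum ResetPolicy { EARLIEST, LATEST, NONE }

    /**
     * committed: the group's committed offset, if one exists
     * logStart / logEnd: oldest retained offset and log end offset of the partition
     */
    static long resolve(OptionalLong committed, long logStart, long logEnd, ResetPolicy policy) {
        if (committed.isPresent()) {
            long c = committed.getAsLong();
            if (c >= logStart && c <= logEnd) {
                return c; // normal case: resume where the group left off
            }
        }
        // No usable committed offset (missing or out of range): apply auto.offset.reset
        switch (policy) {
            case EARLIEST:
                return logStart;
            case LATEST:
                return logEnd;
            default:
                throw new IllegalStateException(
                    "no valid committed offset and auto.offset.reset=none");
        }
    }
}
```

Note that the reset policy only matters when the committed offset is missing or out of range; a valid committed offset always wins.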

The Ghost Consumption Scenario:

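A consumer group that has been inactive for longer than offsets.retention.minutes (default 7 days) will find its committed offsets purged from __consumer_offsets. On restart, auto.offset.reset takes over: with latest, the consumer silently skips every record produced while the group was offline; with earliest, it reprocesses everything still retained in the topic, including records it had already handled.

Neither outcome is correct without explicit awareness. For critical data pipelines, auto.offset.reset=none forces the issue to surface as an exception, which can be caught and reported: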

// Explicit handling of missing committed offset
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

try {
    consumer.poll(Duration.ofMillis(100));
} catch (NoOffsetForPartitionException e) {
    log.error("No committed offset found for {}: manual intervention required",
        e.partitions());
    // Page on-call, investigate gap, then use seek() to set appropriate position
    alertingService.sendAlert("Kafka consumer offset expired for group: " + groupId);
}

offset.retention.minutes: The Expiry Clock

The retention mechanism works as follows:

  1. When all consumers in a group go offline, the group enters the Empty state
  2. The GroupCoordinator notes when the group became empty; a background task checks for expired offsets every offsets.retention.check.interval.ms (default 10 minutes)
  3. Once offsets.retention.minutes (default 10,080 minutes = 7 days) have passed, the coordinator writes tombstone records (value=null) for all of the group's (group, topic, partition) keys
  4. The next __consumer_offsets compaction run physically deletes those entries
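The expiry rule in steps 1 to 3 reduces to a time comparison. A simplified sketch, assuming expiry is measured from the moment the group became Empty; OffsetRetention is a hypothetical name, and the model ignores the exact boundary handling and the periodic check cadence on the broker.

```java
import java.time.Duration;

/** Hypothetical helper: simplified expiry rule for one consumer group's committed offsets. */
final class OffsetRetention {
    private OffsetRetention() {}

    /**
     * groupEmpty: true once the last member has left (the group is in the Empty state)
     * emptySinceMs: when the group became Empty
     * retentionMinutes: offsets.retention.minutes (default 10,080 = 7 days)
     */
    static boolean isExpired(boolean groupEmpty, long emptySinceMs, long nowMs, long retentionMinutes) {
        if (!groupEmpty) {
            return false; // offsets of an active group are not expired in this model
        }
        long retentionMs = Duration.ofMinutes(retentionMinutes).toMillis();
        return nowMs - emptySinceMs >= retentionMs;
    }
}
```

Because the broker only checks periodically, the tombstones can land up to one offsets.retention.check.interval.ms after the nominal deadline.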

You can increase retention for groups that may be offline for extended periods. offsets.retention.minutes is a static broker setting, so it goes in server.properties on every broker and takes effect after a rolling restart:

# server.properties: increase offset retention to 30 days
offsets.retention.minutes=43200

# Verify on a running broker (broker id 1 shown; --all includes static settings)
kafka-configs.sh --bootstrap-server kafka1:9092 \
  --entity-type brokers --entity-name 1 --describe --all | grep offsets.retention

# Check if a group's offsets still exist
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 \
  --describe --group my-group
# "Consumer group 'my-group' does not exist" = offsets have been purged (or were never committed)

Manual Partition Assignment and seek(): Custom Replay

Sometimes you need to bypass the group protocol entirely, for targeted reprocessing, disaster recovery, or custom routing logic:

// Manual partition assignment: no group membership and no rebalancing.
// If group.id is set, commits (manual or auto) still go to the group's coordinator.
List<TopicPartition> partitions = Arrays.asList(
    new TopicPartition("orders", 0),
    new TopicPartition("orders", 1)
);
consumer.assign(partitions);

// Note: assign() and subscribe() are mutually exclusive
// assign() gives you full control but requires manual offset management

seek() Variants

// Jump to a specific offset
consumer.seek(new TopicPartition("orders", 0), 12345L);

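// Jump to partition beginning (reprocess everything retained)
// seekToBeginning() and seekToEnd() are lazy: the offset is resolved on the next poll() or position() call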
consumer.seekToBeginning(consumer.assignment());

// Jump to end (skip all existing records, consume only new ones)
consumer.seekToEnd(consumer.assignment());

Time-Based Offset Lookup: offsetsForTimes()

This is one of Kafka's most powerful and underused consumer APIs:

// Find the offset of the first record at or after a given timestamp
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
long replayFrom = Instant.parse("2024-01-15T00:00:00Z").toEpochMilli();

for (TopicPartition tp : consumer.assignment()) {
    timestampsToSearch.put(tp, replayFrom);
}

Map<TopicPartition, OffsetAndTimestamp> result = 
    consumer.offsetsForTimes(timestampsToSearch);

for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : result.entrySet()) {
    TopicPartition tp = entry.getKey();
    OffsetAndTimestamp offsetAndTs = entry.getValue();
    
    if (offsetAndTs != null) {
        log.info("Seeking {} to offset {} (timestamp {})", 
            tp, offsetAndTs.offset(), offsetAndTs.timestamp());
        consumer.seek(tp, offsetAndTs.offset());
    } else {
        // No records exist at or after the target timestamp, so seek to the end
        consumer.seekToEnd(Collections.singleton(tp));
    }
}

offsetsForTimes() uses the broker's .timeindex files: sparse indexes of (timestamp, offset) pairs maintained alongside each log segment. The broker binary-searches the index to narrow the lookup to a small region of the segment and then scans forward to the exact record, so the lookup stays fast even on very large partitions. It returns the offset of the earliest record whose timestamp is greater than or equal to the requested timestamp, or null for a partition with no such record.
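The search step can be modeled as a lower-bound binary search. This sketch assumes timestamps are non-decreasing and every entry is present (the real index is sparse and is followed by a forward scan of the segment); TimeIndexLookup is a hypothetical name, and the empty result plays the role of the null entry that offsetsForTimes() returns.

```java
import java.util.OptionalLong;

/**
 * Simplified model of a timestamp lookup: binary search over (timestamp, offset) entries
 * sorted by non-decreasing timestamp, returning the first offset whose timestamp >= target.
 */
final class TimeIndexLookup {
    private TimeIndexLookup() {}

    static OptionalLong firstOffsetAtOrAfter(long[] timestamps, long[] offsets, long target) {
        if (timestamps.length != offsets.length) {
            throw new IllegalArgumentException("timestamps and offsets must have equal length");
        }
        int lo = 0;
        int hi = timestamps.length; // search window is [lo, hi)
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (timestamps[mid] < target) {
                lo = mid + 1; // mid and everything before it are too old
            } else {
                hi = mid;     // mid qualifies; keep looking for an earlier match
            }
        }
        return lo < timestamps.length ? OptionalLong.of(offsets[lo]) : OptionalLong.empty();
    }
}
```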

Practical use: incident replay

# CLI equivalent: reset group offsets to a specific datetime
# Dry run first: without --execute the tool only prints the planned offsets
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 \
  --group order-consumers \
  --topic orders \
  --reset-offsets \
  --to-datetime 2024-01-15T00:00:00.000Z

# Apply the reset (the Z suffix matches the UTC instant used in the Java example;
# without a zone suffix the tool uses its local time zone)
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 \
  --group order-consumers \
  --topic orders \
  --reset-offsets \
  --to-datetime 2024-01-15T00:00:00.000Z \
  --execute

The consumer group must be inactive (all members stopped) for --reset-offsets to work. This prevents conflicting with running consumers.

Idempotent Offset Commit Pattern

In at-least-once mode, duplicate processing can occur after consumer restarts or rebalances. To make processing idempotent, track processed records at the application layer, in the same database transaction as the business write:

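// Idempotent pattern: the dedupe marker is written in the same DB transaction as the business change
@Transactional
public void process(ConsumerRecord<String, String> record) {
    String dedupeKey = String.format("%s-%d-%d",
        record.topic(), record.partition(), record.offset());

    // Skip if already processed. The store must live in the same database as the
    // business data (not, e.g., a separate Redis) so both commit or roll back together.
    if (deduplicationStore.exists(dedupeKey)) {
        log.debug("Skipping duplicate record: {}", dedupeKey);
        return;
    }

    // Process the message
    businessService.handle(record.value());

    // Mark as processed (commits atomically with the business write)
    deduplicationStore.put(dedupeKey, Instant.now());
}

// Poll loop: commit Kafka offsets only after the DB transactions above have committed.
// commitSync() is not part of the DB transaction, so it stays out of the @Transactional
// method; a crash before this commit only causes redelivery, which the dedupe check absorbs.
for (ConsumerRecord<String, String> record : records) {
    processor.process(record); // processor: hypothetical bean holding the method above
}
consumer.commitSync();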
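The effect of the dedupe check on redelivered records can be shown with an in-memory stand-in. This is a sketch only: DedupeProcessor is a hypothetical name, the HashSet stands in for the transactional deduplication store, and the counter stands in for businessService.handle(...).

```java
import java.util.HashSet;
import java.util.Set;

/** In-memory model of dedupe-by-coordinates; a real store must share the business DB transaction. */
final class DedupeProcessor {
    private final Set<String> processed = new HashSet<>();
    private int handledCount = 0;

    /** Returns true if the record was handled, false if it was a duplicate redelivery. */
    boolean process(String topic, int partition, long offset) {
        // Same key shape as the pattern above: topic-partition-offset
        String dedupeKey = topic + "-" + partition + "-" + offset;
        if (!processed.add(dedupeKey)) {
            return false; // already processed before the restart or rebalance
        }
        handledCount++; // stands in for businessService.handle(...)
        return true;
    }

    int handledCount() {
        return handledCount;
    }
}
```

If a consumer crashes after the database commit but before the Kafka offset commit, the record at offset 999 is delivered again; the second process("orders", 0, 999) call returns false and the business logic runs only once.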

For the strongest guarantees when a consumer's output goes back to Kafka, use Kafka Transactions (Chapter 7) to commit the consumer offsets and the produced messages atomically, in a single transaction.

Lag Alerting Strategy

Lag alerts have three common patterns, each suited to different workload characteristics:

Absolute threshold (lag > N records): Simple and predictable. Use when message rates are stable. Alert if lag > 10000.

Rate-of-change (dLag/dt > 0 sustained): Alert when lag is continuously increasing, even if the absolute value is acceptable. This detects a consumer that is processing but falling behind: it will eventually fail even though it looks "almost OK" now.

Time-based lag (lag_seconds > T): Estimate how many seconds of unprocessed data the lag represents using record timestamps. lag_seconds = (current_time - oldest_unprocessed_record_timestamp). This is the most business-meaningful metric โ€” "we're 45 minutes behind on order processing" is clearer than "we're 500,000 records behind."
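The rate-of-change and time-based checks can be sketched as two small functions. LagAlerts and its methods are hypothetical names, the sampling window is an assumption you would tune, and the oldest unprocessed record's timestamp would in practice be the timestamp of the record at the committed offset.

```java
import java.util.List;

/** Hypothetical helpers for the rate-of-change and time-based lag alerts. */
final class LagAlerts {
    private LagAlerts() {}

    /** True if lag rose between every pair of consecutive samples in the last `window` samples. */
    static boolean sustainedIncrease(List<Long> lagSamples, int window) {
        if (window < 2) {
            throw new IllegalArgumentException("window must be at least 2 samples");
        }
        if (lagSamples.size() < window) {
            return false; // not enough history to call it a trend
        }
        List<Long> recent = lagSamples.subList(lagSamples.size() - window, lagSamples.size());
        for (int i = 1; i < recent.size(); i++) {
            if (recent.get(i) <= recent.get(i - 1)) {
                return false; // lag held steady or shrank at least once
            }
        }
        return true;
    }

    /** lag_seconds = now - timestamp of the oldest unprocessed record, in whole seconds. */
    static long lagSeconds(long nowMs, long oldestUnprocessedTimestampMs) {
        return Math.max(0L, (nowMs - oldestUnprocessedTimestampMs) / 1000L);
    }
}
```

For example, if the oldest unprocessed record was written 2,700 seconds ago, lagSeconds reports 2700, which is the "45 minutes behind" figure from the text.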

# Continuous lag monitoring with watch
watch -n 10 "kafka-consumer-groups.sh \
  --bootstrap-server kafka1:9092 \
  --describe --group order-consumers 2>/dev/null \
  | awk 'NR>1 {print \$6}' | awk '{sum+=\$1} END {print \"Total lag:\", sum}'"

From the binary format of __consumer_offsets to the three-position coordinate system, from the deliberate non-retry design of commitAsync to time-based partition replay with offsetsForTimes() โ€” offset management is where consumer correctness is won or lost. Every decision in this space (when to commit, which commit API, what to do on offset expiry, how to handle the first consumption) has a direct impact on whether your pipeline delivers at-most-once, at-least-once, or effectively-once semantics.
