Chapter 12

Consumer Tuning and Lag Management


Consumer tuning is one of the most experience-dependent areas of Kafka engineering. Subtle interactions between three core parameters, incorrect lag measurement, and poorly chosen emergency responses to massive backlogs can each be the root cause of a production incident. This chapter starts from the underlying semantics of each parameter and builds a complete picture of consumer tuning.

The Dangerous Triangle: Fatal Interactions Between Three Parameters

Deep Parameter Semantics

max.poll.records (default: 500): The maximum number of records returned by a single poll() call.

This controls the batch size delivered to your application per poll cycle. It does not affect how frequently the consumer fetches from brokers or how much data each network request carries; those are controlled by fetch.min.bytes, fetch.max.bytes, max.partition.fetch.bytes, and fetch.max.wait.ms. It solely governs how many records are handed from the local fetch buffer to your processing loop per poll() invocation.
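A toy model may make the distinction clearer. The sketch below is hypothetical (the class and method names are illustrative, not client internals) and ignores partition boundaries and fetches that complete between polls: records already sitting in the local buffer are simply handed out in chunks of at most max.poll.records.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: how poll() drains records that are already buffered locally.
// Fetching (fetch.min.bytes, fetch.max.bytes, ...) fills the buffer;
// max.poll.records only caps how many buffered records each poll() returns.
public class PollBatchModel {

    /** Sizes of the batches that successive poll() calls would return. */
    public static List<Integer> batchSizes(int bufferedRecords, int maxPollRecords) {
        if (maxPollRecords <= 0) {
            throw new IllegalArgumentException("max.poll.records must be positive");
        }
        List<Integer> batches = new ArrayList<>();
        int remaining = bufferedRecords;
        while (remaining > 0) {
            int batch = Math.min(remaining, maxPollRecords);
            batches.add(batch);
            remaining -= batch;
        }
        return batches;
    }

    public static void main(String[] args) {
        // One large fetch of 1,200 records is handed out over three poll() calls
        System.out.println(batchSizes(1200, 500)); // [500, 500, 200]
    }
}
```

The network traffic is identical in both directions of the knob; only the number of poll() calls needed to hand the data to your code changes.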

max.poll.interval.ms (default: 300000ms = 5 minutes): The maximum time allowed between successive poll() calls before the consumer is considered dead.

This is the consumer group's application-level liveness detection mechanism. Kafka's designers faced a dilemma: a consumer legitimately processing a large batch might not call poll() for a while, but that's not the same as being stuck. However, from the Group Coordinator's perspective, a consumer that hasn't polled for a long time is indistinguishable from a dead one.

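max.poll.interval.ms resolves this: if a consumer doesn't call poll() within this interval, it is treated as failed and removed from the group, which triggers a rebalance. (In the Java client, the consumer's own heartbeat thread notices the expired poll timer and sends a LeaveGroup request.)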

session.timeout.ms (default: 45000ms in Kafka 3.x; was 10000ms in older versions): The heartbeat timeout.

Heartbeats are sent by a dedicated background thread, independent of the main thread's poll() calls. Even when the main thread is blocked processing records, the heartbeat thread continues reporting to the Coordinator. Therefore, session.timeout.ms detects process-level failures (process crash, complete network loss), while max.poll.interval.ms detects application-level stalls (processing too slow, deadlock, infinite loop in business logic).

The Crash Scenario: When the Triangle Collapses

Consider this seemingly reasonable configuration:

max.poll.records     = 500
max.poll.interval.ms = 30000ms (30 seconds)

Processing logic: each record calls an external HTTP API, averaging 70ms per record.

500 records × 70 ms/record = 35,000 ms = 35 s > 30 s (max.poll.interval.ms)

Here's what happens:

T=0s:    poll() returns 500 records
T=30s:   No poll() for 30 seconds: max.poll.interval.ms is exceeded
         → The consumer leaves the group (its heartbeat thread sends a LeaveGroup request)
         → Rebalance triggered; its partitions are reassigned
T=35s:   Consumer finishes processing and calls commitSync()
         → CommitFailedException: the group has already rebalanced
         → The next poll() rejoins the group
         → Another rebalance
T=35s+:  The SAME 500 records are delivered again (offsets were never committed)
         → The cycle repeats: an endless rebalance loop

The symptoms in production: the consumer group's rebalance count climbs rapidly, lag grows instead of shrinking, and the logs fill with CommitFailedException errors and "consumer poll timeout has expired" warnings. Heartbeats keep flowing throughout, so the session itself never expires.

// The problematic code pattern
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // DANGER: synchronous external call, 70ms per record
        externalApiClient.process(record.value());  // blocks here
    }
    // Reached only after ~35s; the consumer already left the group at 30s,
    // so this throws CommitFailedException and the offsets are never committed
    consumer.commitSync();
}
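The loop can be reproduced with a small simulation. This is a hypothetical model, not client code: it assumes an unlimited backlog (every poll returns a full batch) and treats any batch that outlasts max.poll.interval.ms as a failed commit followed by a rebalance.

```java
// Hypothetical simulation of the failure loop described above.
public class PollLoopSimulation {

    /** Outcome after a number of poll/process/commit cycles. */
    public record Result(long committedOffset, int rebalances) {}

    public static Result simulate(int maxPollRecords, long perRecordMs,
                                  long maxPollIntervalMs, int cycles) {
        long committed = 0;
        int rebalances = 0;
        for (int i = 0; i < cycles; i++) {
            long batchMs = maxPollRecords * perRecordMs; // time until the next commit/poll
            if (batchMs > maxPollIntervalMs) {
                // Removed from the group mid-batch: the commit fails and the
                // same records are redelivered after the rebalance
                rebalances++;
            } else {
                committed += maxPollRecords; // commit succeeds, offset advances
            }
        }
        return new Result(committed, rebalances);
    }

    public static void main(String[] args) {
        System.out.println(simulate(500, 70, 30_000, 3)); // Result[committedOffset=0, rebalances=3]
        System.out.println(simulate(50, 70, 30_000, 3));  // Result[committedOffset=150, rebalances=0]
    }
}
```

With 500 records the committed offset never moves no matter how many cycles run; shrinking the batch to 50 records lets every cycle commit.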

Correct Configuration Principles

Principle 1: max.poll.interval.ms must exceed the worst-case processing time for one batch

max.poll.interval.ms > max.poll.records × worst_case_processing_time_per_record × safety_factor (1.5)

Example: 200 records × 80 ms × 1.5 = 24,000 ms → set max.poll.interval.ms=30000
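Principle 1 is simple arithmetic, so it is easy to encode as a helper. The sketch below is hypothetical (class and method names are illustrative). It returns the boundary values the formula gives, which you would then round up to a comfortable setting as in the example.

```java
// Hypothetical helper applying Principle 1:
// max.poll.interval.ms > max.poll.records × worst-case ms per record × safety factor
public class PollIntervalCalculator {

    /** Boundary value for max.poll.interval.ms (ms), rounded up. */
    public static long minPollIntervalMs(int maxPollRecords, long worstCaseMsPerRecord,
                                         double safetyFactor) {
        return (long) Math.ceil(maxPollRecords * worstCaseMsPerRecord * safetyFactor);
    }

    /** Largest max.poll.records whose worst-case batch fits the given interval. */
    public static int maxRecordsFor(long maxPollIntervalMs, long worstCaseMsPerRecord,
                                    double safetyFactor) {
        return (int) Math.floor(maxPollIntervalMs / (worstCaseMsPerRecord * safetyFactor));
    }

    public static void main(String[] args) {
        System.out.println(minPollIntervalMs(200, 80, 1.5)); // 24000
        System.out.println(maxRecordsFor(30_000, 80, 1.5));   // 250
    }
}
```

The second method answers the inverse question, which is often the practical one: given an interval you do not want to raise, how large may batches be?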

Principle 2: session.timeout.ms must be less than max.poll.interval.ms

If session.timeout.ms exceeded max.poll.interval.ms, a crashed process (which stops heartbeating) would take longer to detect than a merely slow one, inverting the intended roles. Keep the two distinct: session.timeout.ms is the fast detector for process-level failures (crash, network loss), while max.poll.interval.ms is the more patient application-layer monitor (slow processing).

Principle 3: heartbeat.interval.ms should be roughly 1/3 of session.timeout.ms

This allows at least 3 heartbeat opportunities before declaring failure, providing resilience against transient network jitter.
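Principles 2 and 3 can be checked mechanically before a deployment. A minimal sketch, assuming all three timeouts are given in milliseconds; the class name and messages are hypothetical, and Principle 3 is treated as an upper bound (heartbeat interval no more than a third of the session timeout).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sanity check for Principles 2 and 3 (all values in ms).
public class TimeoutConfigCheck {

    public static List<String> violations(long sessionTimeoutMs, long heartbeatIntervalMs,
                                          long maxPollIntervalMs) {
        List<String> problems = new ArrayList<>();
        if (sessionTimeoutMs >= maxPollIntervalMs) {
            problems.add("session.timeout.ms should be less than max.poll.interval.ms");
        }
        if (heartbeatIntervalMs * 3 > sessionTimeoutMs) {
            problems.add("heartbeat.interval.ms should be at most 1/3 of session.timeout.ms");
        }
        return problems;
    }

    public static void main(String[] args) {
        // 45s session, 15s heartbeat, 2 min poll interval: no violations
        System.out.println(violations(45_000, 15_000, 120_000)); // []
        // Session longer than the poll interval and too few heartbeats per session
        System.out.println(violations(60_000, 30_000, 30_000).size()); // 2
    }
}
```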

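// Correct configuration for an external-API-call workload
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-api-processor");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

// Reduce batch size: 50 records × 80ms = 4s, well under any reasonable interval
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "50");

// 50 records × 80ms × 1.5 safety factor = 6s; set much higher to absorb API latency spikes
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "120000"); // 2 minutes

// Heartbeat timeout: process-level failure detection (heartbeat = 1/3 of session timeout)
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "45000");
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "15000");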

Async Processing: Fully Decoupling Poll from Processing

For truly high-throughput workloads, hand the processing off to a worker pool so the poll thread only fetches, dispatches, and commits:

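import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncKafkaConsumer implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(AsyncKafkaConsumer.class);

    private final KafkaConsumer<String, String> consumer;
    private final ExecutorService processingPool;
    // Next offset to commit per partition, updated only after a whole batch succeeds.
    // Touched only by the poll thread (rebalance callbacks run inside poll()).
    private final Map<TopicPartition, OffsetAndMetadata> pendingOffsets = new HashMap<>();
    private volatile boolean running = true;

    public AsyncKafkaConsumer(Properties props) {
        this.consumer = new KafkaConsumer<>(props);
        // Thread pool sized to available CPU cores × 2 (I/O-bound workload)
        this.processingPool = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors() * 2);
    }

    @Override
    public void run() {
        consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Commit offsets for revoked partitions before handing them off
                commitPendingOffsets(partitions);
            }
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Initialize state for new partitions if needed
            }
        });

        try {
            while (running) {
                ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(1000));
                if (records.isEmpty()) continue;

                // Submit all records to the pool. Records from the same partition
                // run concurrently, so per-partition ordering is not preserved.
                List<CompletableFuture<Void>> futures = new ArrayList<>();
                for (ConsumerRecord<String, String> record : records) {
                    futures.add(CompletableFuture.runAsync(() -> processRecord(record), processingPool));
                }

                boolean batchSucceeded = false;
                try {
                    // Wait with a timeout well under max.poll.interval.ms=120s
                    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                        .get(60, TimeUnit.SECONDS);
                    batchSucceeded = true;
                } catch (TimeoutException e) {
                    log.error("Batch processing timed out; risk of exceeding max.poll.interval.ms");
                } catch (ExecutionException e) {
                    log.error("Batch processing failed", e.getCause());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    running = false;
                }

                for (TopicPartition tp : records.partitions()) {
                    List<ConsumerRecord<String, String>> batch = records.records(tp);
                    if (batchSucceeded) {
                        // Every record completed, so committing past the last one skips nothing
                        long next = batch.get(batch.size() - 1).offset() + 1;
                        pendingOffsets.put(tp, new OffsetAndMetadata(next));
                    } else {
                        // Rewind so the whole batch is redelivered by the next poll().
                        // Tasks still running after a timeout may finish anyway,
                        // so processing must be idempotent.
                        consumer.seek(tp, batch.get(0).offset());
                    }
                }

                if (batchSucceeded) {
                    // Async commit of the accumulated offsets
                    consumer.commitAsync(new HashMap<>(pendingOffsets), (offsets, exception) -> {
                        if (exception != null) {
                            log.warn("Async commit failed for {}", offsets, exception);
                        }
                    });
                }
            }
        } catch (WakeupException e) {
            // Expected when shutdown() is called
        } finally {
            try {
                consumer.commitSync(pendingOffsets); // final synchronous commit
            } finally {
                consumer.close();
                processingPool.shutdown();
            }
        }
    }

    public void shutdown() {
        running = false;
        consumer.wakeup();
    }

    private void commitPendingOffsets(Collection<TopicPartition> partitions) {
        Map<TopicPartition, OffsetAndMetadata> toCommit = partitions.stream()
            .filter(pendingOffsets::containsKey)
            .collect(Collectors.toMap(tp -> tp, pendingOffsets::get));
        if (!toCommit.isEmpty()) {
            consumer.commitSync(toCommit);
        }
        partitions.forEach(pendingOffsets::remove);
    }

    private void processRecord(ConsumerRecord<String, String> record) {
        // Business logic goes here, e.g. the external API call
    }
}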

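Important caveat: this pattern gives at-least-once delivery, not exactly-once. A batch that fails or times out is redelivered, and if processing completes but the offset commit fails (or the consumer crashes before committing), those records are reprocessed after a restart or rebalance. Your processing logic must therefore be idempotent, or offsets must be committed atomically with the results: for example, with Kafka transactions (sendOffsetsToTransaction) when the output goes back to Kafka, or by storing offsets in the same database transaction as the results.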
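One common way to make processing idempotent is to remember which records have already been handled and skip redeliveries. The sketch below is a hypothetical, in-memory version keyed by a record ID (for example topic-partition-offset, or a business key such as an order ID). A real deployment would need a durable store, since an in-memory set is lost on restart, which is exactly when redeliveries happen.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical idempotency guard: runs a handler at most once per record ID.
// Thread-safe, so it can be shared by the worker pool.
public class IdempotentProcessor {
    // IDs of records whose processing completed or is in progress
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();

    /** Runs handler unless recordId was seen before; returns false for a duplicate. */
    public boolean processOnce(String recordId, Runnable handler) {
        if (!processedIds.add(recordId)) {
            return false; // duplicate delivery: skip
        }
        try {
            handler.run();
            return true;
        } catch (RuntimeException e) {
            processedIds.remove(recordId); // processing failed: allow a later retry
            throw e;
        }
    }

    public static void main(String[] args) {
        IdempotentProcessor processor = new IdempotentProcessor();
        int[] calls = {0};
        String id = "orders-1-2000000"; // topic-partition-offset
        processor.processOnce(id, () -> calls[0]++);
        processor.processOnce(id, () -> calls[0]++); // redelivery after a failed commit
        System.out.println(calls[0]); // 1
    }
}
```

Because add() is atomic, two workers that receive the same record concurrently cannot both run the handler.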

Measuring Consumer Lag: Three Levels of Visibility

Consumer lag = partition's log-end offset − consumer's committed offset. The absolute value matters, but the trend (is it growing or shrinking?) is what determines whether your system is healthy.
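The definition is plain arithmetic. The helpers below are a hypothetical sketch (names illustrative) that computes per-partition and total lag plus a crude trend from successive samples; the example numbers match the kafka-consumer-groups.sh output shown under Method 1.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helpers for the lag definition above.
public class LagMath {

    /** Lag for one partition, floored at zero (the two offsets are read at slightly different times). */
    public static long lag(long logEndOffset, long committedOffset) {
        return Math.max(0, logEndOffset - committedOffset);
    }

    /** Total lag across partitions; both maps are keyed by partition number. */
    public static long totalLag(Map<Integer, Long> logEndOffsets, Map<Integer, Long> committedOffsets) {
        long total = 0;
        for (Map.Entry<Integer, Long> entry : logEndOffsets.entrySet()) {
            Long committed = committedOffsets.get(entry.getKey());
            if (committed != null) { // partitions with no commit yet are skipped
                total += lag(entry.getValue(), committed);
            }
        }
        return total;
    }

    /** Crude trend: compares the first and last of successive total-lag samples. */
    public static String trend(List<Long> samples) {
        if (samples.size() < 2) return "UNKNOWN";
        long delta = samples.get(samples.size() - 1) - samples.get(0);
        if (delta > 0) return "GROWING";
        if (delta < 0) return "SHRINKING";
        return "FLAT";
    }

    public static void main(String[] args) {
        Map<Integer, Long> logEnd = Map.of(0, 1_050_000L, 1, 2_100_000L, 2, 1_500_100L);
        Map<Integer, Long> committed = Map.of(0, 1_000_000L, 1, 2_000_000L, 2, 1_500_000L);
        System.out.println(totalLag(logEnd, committed)); // 150100
        System.out.println(trend(List.of(150_100L, 180_000L, 210_000L))); // GROWING
    }
}
```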

Method 1: kafka-consumer-groups.sh for Quick Diagnosis

# Full lag breakdown per partition
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group order-processor \
  --describe

# Example output (abbreviated; recent versions also print a CLIENT-ID column):
# GROUP           TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG     CONSUMER-ID     HOST
# order-processor orders  0          1000000         1050000         50000   consumer-0-abc  /10.0.1.10
# order-processor orders  1          2000000         2100000         100000  consumer-1-def  /10.0.1.11
# order-processor orders  2          1500000         1500100         100     consumer-2-ghi  /10.0.1.12

# Calculate total lag across all partitions
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group order-processor \
  --describe 2>/dev/null \
  | awk 'NR>1 && $6 ~ /^[0-9]+$/ {sum += $6} END {print "Total lag:", sum}'

# List all groups with lag > 0
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list 2>/dev/null \
  | while read group; do
      lag=$(kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
        --group "$group" --describe 2>/dev/null \
        | awk '$6 ~ /^[0-9]+$/ {sum+=$6} END {print sum}')
      [ "$lag" -gt 0 ] 2>/dev/null && echo "$group: $lag"
    done

Limitation: This is a point-in-time snapshot. You see the lag at the moment of the query, not whether it's trending up or down.

Method 2: JMX Metrics for Per-Partition Monitoring

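The consumer exposes lag metrics under the MBean kafka.consumer:type=consumer-fetch-manager-metrics: records-lag-max at the client level (tagged with client-id), and records-lag, records-lag-avg, and records-lag-max per partition (tagged with client-id, topic, and partition).

# Query via JConsole, or with the JmxTool class bundled with Kafka
# (its package differs across Kafka versions)
# Assuming JMX is exposed on port 9999:
kafka-run-class.sh kafka.tools.JmxTool \
  --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi \
  --object-name "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-order-processor-1" \
  --attributes records-lag-max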

With Prometheus JMX Exporter:

# prometheus-alert.yml
# Metric and label names depend on your JMX Exporter rules. Consumer MBeans are
# tagged with client-id, not group, so the group label assumes your rules add it.
groups:
  - name: kafka-consumer
    rules:
      - alert: ConsumerLagHigh
        expr: |
          kafka_consumer_fetch_manager_records_lag_max{
            group="order-processor"
          } > 100000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Consumer lag high: {{ $value }} records behind"
          runbook: "https://wiki.company.com/kafka-lag-runbook"

      - alert: ConsumerLagGrowing
        # delta(), not increase(): lag is a gauge, and increase() treats drops as counter resets
        expr: |
          delta(kafka_consumer_fetch_manager_records_lag_max{
            group="order-processor"
          }[10m]) > 10000
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Consumer lag grew by {{ $value }} records in the last 10 minutes"

Method 3: Burrow for Sliding-Window Trend Analysis

Burrow is LinkedIn's open-source Kafka consumer monitoring tool. Its key innovation is that it evaluates lag trends, not just snapshots.

Burrow's evaluation logic:

For each partition a group consumes, Burrow keeps a sliding window of the consumer's most recent offset commits (by default the last 10) and evaluates:

  1. Rate of consumer offset growth → consumption speed
  2. Rate of log-end offset growth → production speed
  3. Decision logic (simplified):
    • Lag drops to zero at some point in the window → OK
    • Consumer offsets advance, but lag keeps increasing across the window → WARN (falling behind)
    • Consumer keeps committing, but its offset does not advance while lag stays flat or grows → ERR, partition marked STALL
    • Consumer has stopped committing (its latest commit is older than the span the window covers) → ERR, partition marked STOP
    • Consumer offset moves backwards → REWIND (offset reset detected)
    • Group does not exist on the cluster → NOTFOUND

The STALL status is particularly valuable: the consumer looks alive because it keeps committing, yet it makes no progress, so lag can only grow. Similarly, WARN flags a consumer whose offsets move but whose consumption rate is below the production rate, so it will never catch up. Neither condition is visible in a point-in-time snapshot.
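To make the rules concrete, here is a simplified evaluator modeled on Burrow's published rules. It is a hypothetical sketch, not Burrow's implementation: it reuses Burrow's status names but ignores details such as window completeness and broker-offset edge cases. Each sample is one stored offset commit with the lag at that moment.

```java
import java.util.List;

// Hypothetical, simplified evaluator modeled on Burrow's lag rules (not Burrow's code).
public class LagWindowEvaluator {

    /** One stored commit: consumer offset, lag at that moment, commit time in ms. */
    public record Sample(long offset, long lag, long timestampMs) {}

    public static String evaluate(List<Sample> window, long nowMs) {
        if (window.size() < 2) {
            return "INCOMPLETE"; // this sketch's own convention: too few samples to judge
        }
        // Lag reached zero at some point: the consumer is keeping up
        if (window.stream().anyMatch(s -> s.lag() == 0)) {
            return "OK";
        }
        boolean rewound = false;
        boolean offsetMoved = false;
        boolean lagNeverDecreased = true;
        for (int i = 1; i < window.size(); i++) {
            Sample prev = window.get(i - 1);
            Sample cur = window.get(i);
            if (cur.offset() < prev.offset()) rewound = true;
            if (cur.offset() != prev.offset()) offsetMoved = true;
            if (cur.lag() < prev.lag()) lagNeverDecreased = false;
        }
        Sample first = window.get(0);
        Sample last = window.get(window.size() - 1);
        if (rewound) {
            return "REWIND";
        }
        // Latest commit is older than the span the window covers: commits have stopped
        if (nowMs - last.timestampMs() > last.timestampMs() - first.timestampMs()) {
            return "STOP";
        }
        // Still committing, but the offset never advances
        if (!offsetMoved) {
            return "STALL";
        }
        // Offsets advance but lag never goes down: falling behind
        if (lagNeverDecreased) {
            return "WARN";
        }
        return "OK";
    }
}
```

Fed the two offsets from the example response further down (2000000 with lag 50000, then 2000100 with lag 100000, one minute apart), this sketch also returns WARN.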

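# Burrow configuration (burrow.toml)
[cluster.production]
class-name = "kafka"
servers = ["broker1:9092", "broker2:9092", "broker3:9092"]
offset-refresh = 30    # refresh broker log-end offsets every 30s
topic-refresh = 60     # refresh topic metadata every 60s

[consumer.production]
class-name = "kafka"
cluster = "production"
servers = ["broker1:9092", "broker2:9092", "broker3:9092"]
group-whitelist = ".*"
group-blacklist = "^_"  # skip internal groups (names starting with an underscore)

[httpserver.default]
address = ":8000"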

# Query Burrow HTTP API
curl -s http://localhost:8000/v3/kafka/production/consumer/order-processor/status | jq .

# Response:
# {
#   "status": {
#     "cluster": "production",
#     "group": "order-processor",
#     "status": "WARN",
#     "complete": true,
#     "partitions": [
#       {
#         "topic": "orders",
#         "partition": 1,
#         "status": "WARN",
#         "start": {"offset": 2000000, "timestamp": 1714000000000, "lag": 50000},
#         "end":   {"offset": 2000100, "timestamp": 1714000060000, "lag": 100000},
#         "current_lag": 100000,
#         "complete": true
#       }
#     ],
#     "maxlag": {"topic": "orders", "partition": 1, "current_lag": 100000}
#   }
# }

Five Emergency Strategies for Massive Lag

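When lag reaches tens of millions of records, immediate action is required. The strategies below range from the least disruptive (scaling out) to the most aggressive (skipping to the latest offset); choose based on whether the backlog may be dropped and how much extra load downstream systems can absorb.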
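Before choosing, it helps to estimate whether the current setup can drain the backlog at all: lag only shrinks at the difference between the consumption and production rates. A minimal sketch; the class name is hypothetical and the rates in the example are made-up illustrative values.

```java
// Hypothetical back-of-the-envelope estimate of catch-up time.
public class CatchupEstimate {

    /** Seconds to drain `lag` records, or -1 if the backlog can never shrink. */
    public static long secondsToCatchUp(long lag, double consumeRatePerSec, double produceRatePerSec) {
        double netRate = consumeRatePerSec - produceRatePerSec;
        if (netRate <= 0) {
            return -1; // consumption ≤ production: lag stays flat or grows
        }
        return (long) Math.ceil(lag / netRate);
    }

    public static void main(String[] args) {
        // 20M records behind, consuming 8,000/s while producers add 3,000/s
        System.out.println(secondsToCatchUp(20_000_000, 8_000, 3_000)); // 4000
    }
}
```

About 67 minutes in this example; a result of -1 means no amount of waiting helps and one of the strategies below is required.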

Strategy 1: Scale Out Consumers (Least Disruptive)

Effective consumer parallelism is capped at the partition count: consumers beyond that number sit idle. If you have headroom, scaling out is the safest first step.

# Check current partition count
kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic orders
# PartitionCount: 12

# Check current consumer count
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group order-processor --describe | grep -c "order-processor"
# 4 consumers โ†’ can scale to 12

# Scale up
kubectl scale deployment order-processor --replicas=12

# Monitor lag reduction
watch -n 10 'kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group order-processor \
  --describe 2>/dev/null | awk "NR>1 && \$6~/^[0-9]+$/ {sum+=\$6} END {print \"Total lag:\", sum}"'
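Why extra consumers sit idle can be shown with a toy assignment. Whatever assignor a group uses, each partition goes to exactly one member; this hypothetical sketch uses simple round-robin to count partitions per consumer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical round-robin model: each partition is owned by exactly one consumer.
public class AssignmentModel {

    /** counts.get(i) = number of partitions consumer i receives. */
    public static List<Integer> assign(int partitions, int consumers) {
        List<Integer> counts = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            counts.add(0);
        }
        for (int p = 0; p < partitions; p++) {
            int owner = p % consumers;
            counts.set(owner, counts.get(owner) + 1);
        }
        return counts;
    }

    /** Consumers that receive no partition at all. */
    public static long idleConsumers(int partitions, int consumers) {
        return assign(partitions, consumers).stream().filter(n -> n == 0).count();
    }

    public static void main(String[] args) {
        System.out.println(assign(12, 4));         // [3, 3, 3, 3]
        System.out.println(idleConsumers(12, 12)); // 0
        System.out.println(idleConsumers(12, 16)); // 4
    }
}
```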

If the partition count is insufficient, you'll need to increase it first. Do this with care: the partition count can never be decreased, and adding partitions changes which partition each key hashes to, so per-key ordering is not preserved across the change.
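The ordering problem follows from how keyed records are routed: partition = hash(key) mod partition count, so changing the count moves many keys to a different partition. Kafka's default partitioner hashes the serialized key with murmur2; this hypothetical sketch substitutes String.hashCode() to stay self-contained, which changes which keys move but not the effect.

```java
// Hypothetical illustration of key remapping when partitions are added.
// String.hashCode() stands in for the murmur2 hash Kafka actually uses.
public class KeyRemapDemo {

    public static int partitionFor(String key, int partitionCount) {
        // Mask the sign bit so the modulus is never negative
        return (key.hashCode() & 0x7fffffff) % partitionCount;
    }

    /** Number of keys whose partition differs between the two partition counts. */
    public static int movedKeys(int keyCount, int before, int after) {
        int moved = 0;
        for (int i = 0; i < keyCount; i++) {
            String key = "order-" + i;
            if (partitionFor(key, before) != partitionFor(key, after)) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        System.out.println(movedKeys(10_000, 12, 16) + " of 10000 keys change partition");
    }
}
```

Any key that moves may have older records still waiting in its old partition while new records land in the new one, so two consumers can process that key's events concurrently and out of order.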

Strategy 2: Skip to Latest Offset (Most Aggressive)

When stale data is acceptable to drop โ€” analytics events, real-time metrics, non-critical notifications:

# CRITICAL: Stop all consumers first
kubectl scale deployment order-processor --replicas=0

# Wait for group to become Empty
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group order-processor --describe

# Reset offsets to latest
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group order-processor \
  --topic orders \
  --reset-offsets \
  --to-latest \
  --execute

# Verify reset
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group order-processor --describe

# Restart consumers
kubectl scale deployment order-processor --replicas=3

Absolute prohibition: never use this for order processing, payment events, inventory updates, or any other data that must not be lost. The skipped messages are gone forever from this consumer group's perspective.

Strategy 3: Parallel Catchup Consumer Group

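Create a separate, temporary consumer group to work through the backlog in parallel. Record the original group's committed offsets, stop it, and reset it to the latest offset (as in Strategy 2) so it serves new traffic immediately; the catchup consumer then processes the range from the recorded offsets up to the current end offsets. Records produced between the reset and the catchup consumer's start are seen by both, so processing must be idempotent.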

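import java.time.Duration;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Temporary catchup consumer: runs alongside the original group
public class CatchupConsumer {
    private static final Logger log = LoggerFactory.getLogger(CatchupConsumer.class);

    public static void main(String[] args) {
        String catchupGroupId = "order-processor-catchup-" +
            LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE);

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, catchupGroupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // committed manually below

        // High-throughput tuning for catchup
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2000");
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1048576"); // 1MB
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1000");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000"); // 10min

        try (KafkaConsumer<String, String> catchupConsumer = new KafkaConsumer<>(props)) {
            // Manually assign every partition of the topic (no group rebalance needed)
            List<TopicPartition> partitions = catchupConsumer.partitionsFor("orders").stream()
                .map(info -> new TopicPartition(info.topic(), info.partition()))
                .collect(Collectors.toList());
            catchupConsumer.assign(partitions);

            // Seek to the start of the backlog: the original group's offsets, recorded before its reset
            Map<TopicPartition, Long> lagStartOffsets = parseStartOffsets("orders", args);
            lagStartOffsets.forEach(catchupConsumer::seek);

            // Per-partition targets: the end offsets now, i.e. after the original group's reset
            Map<TopicPartition, Long> targetOffsets = catchupConsumer.endOffsets(partitions);

            while (true) {
                ConsumerRecords<String, String> records =
                    catchupConsumer.poll(Duration.ofMillis(1000));

                for (ConsumerRecord<String, String> record : records) {
                    TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                    if (record.offset() < targetOffsets.get(tp)) {
                        processIdempotently(record); // MUST be idempotent!
                    }
                }
                catchupConsumer.commitSync();

                // Done once every partition has reached its own target
                boolean caughtUp = partitions.stream().allMatch(tp ->
                    catchupConsumer.position(tp) >= targetOffsets.get(tp));
                if (caughtUp) {
                    log.info("Catchup consumer has reached its target offsets, shutting down");
                    break;
                }
            }
        }
    }

    // Assumed input format: "partition=offset" pairs (e.g. 0=1000000 1=2000000),
    // copied from the original group's CURRENT-OFFSET column before the reset
    private static Map<TopicPartition, Long> parseStartOffsets(String topic, String[] args) {
        Map<TopicPartition, Long> offsets = new HashMap<>();
        for (String arg : args) {
            String[] parts = arg.split("=");
            offsets.put(new TopicPartition(topic, Integer.parseInt(parts[0])),
                Long.parseLong(parts[1]));
        }
        return offsets;
    }

    // Placeholder for business logic that tolerates reprocessing
    private static void processIdempotently(ConsumerRecord<String, String> record) {
        // ...
    }
}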

Strategy 4: Batch Size Increase + Async Processing

Increase throughput per consumer instance without adding more instances:

// High-throughput catchup configuration
Properties props = new Properties();
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2000");         // 4× the default of 500
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1048576");       // 1MB minimum per fetch
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1000");        // wait up to 1s for data
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "10485760"); // 10MB per partition
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000");   // 10 minutes

// Pair with async processing: 2× the available cores suits I/O-bound work
ExecutorService pool = Executors.newFixedThreadPool(
    Runtime.getRuntime().availableProcessors() * 2);

This works best when per-record processing, not fetching, is the bottleneck and the consumer host has spare CPU or I/O capacity, for example when records are processed on a single thread on a multi-core machine.

Strategy 5: Graceful Degradation (Protect the Critical Path)

When the system is overloaded and any catchup attempt risks cascading failures to downstream systems, prioritize protecting the core business path:

public void processRecord(ConsumerRecord<String, String> record) {
    BusinessEvent event = deserialize(record.value());
    long eventAgeMs = System.currentTimeMillis() - event.getTimestampMs();

    // Tiered handling by event criticality and age
    switch (event.getType()) {
        case PAYMENT:
        case ORDER_CONFIRMED:
            // Critical: always process, regardless of age
            coreProcessor.process(event);
            break;

        case INVENTORY_UPDATE:
            if (eventAgeMs < 3_600_000) { // less than 1 hour old
                inventoryService.update(event);
            } else {
                // Stale inventory update: log and skip, inventory will self-correct
                log.warn("Dropping stale inventory update, age={}s, SKU={}",
                    eventAgeMs / 1000, event.getSku());
                metrics.increment("events.dropped.stale_inventory");
            }
            break;

        case ANALYTICS:
        case RECOMMENDATION_SIGNAL:
            if (eventAgeMs < 1_800_000) { // less than 30 minutes
                analyticsService.record(event);
            } else {
                // Analytics older than 30 minutes are worthless for real-time models
                log.debug("Dropping stale analytics event, age={}s", eventAgeMs / 1000);
                metrics.increment("events.dropped.stale_analytics");
            }
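            break;

        default:
            // Other event types: log them rather than dropping them silently
            log.warn("Unhandled event type {}, offset={}", event.getType(), record.offset());
            break;
    }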
}

Fetch Tuning: Fewer Requests, Higher Throughput

fetch.min.bytes and fetch.max.wait.ms Working Together

These two parameters control when the broker responds to a FetchRequest:

Scenario A: the broker has at least fetch.min.bytes of data available. It responds immediately, with no waiting.

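Scenario B: the broker has less than fetch.min.bytes available. It holds the request until either:

  1. enough data arrives to reach fetch.min.bytes, or
  2. fetch.max.wait.ms elapses, in which case it responds with whatever it has (possibly nothing).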

This broker-side wait costs the consumer almost nothing: the fetch request is simply outstanding on the connection, and the consumer sends no further fetch to that broker until the response arrives. The cost is added latency, bounded by fetch.max.wait.ms.
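The two scenarios boil down to one condition. The sketch below is a hypothetical model of that decision, not broker code, using the high-throughput values from the configuration that follows.

```java
// Hypothetical model of when the broker answers a FetchRequest.
public class FetchWaitModel {

    /** True if the broker should respond now rather than keep holding the request. */
    public static boolean respondNow(long availableBytes, long fetchMinBytes,
                                     long waitedMs, long fetchMaxWaitMs) {
        return availableBytes >= fetchMinBytes  // Scenario A, or enough data has arrived
            || waitedMs >= fetchMaxWaitMs;      // Scenario B: wait budget exhausted
    }

    public static void main(String[] args) {
        long minBytes = 1_048_576;
        long maxWaitMs = 1_000;
        System.out.println(respondNow(2_000_000, minBytes, 0, maxWaitMs));  // true (enough data)
        System.out.println(respondNow(10_000, minBytes, 300, maxWaitMs));   // false (keep waiting)
        System.out.println(respondNow(10_000, minBytes, 1_000, maxWaitMs)); // true (wait expired)
    }
}
```

With fetch.min.bytes=1 the first condition holds as soon as any data exists, which is why the low-latency profile rarely waits at all.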

// High-throughput configuration (for catching up on lag)
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1048576");       // 1MB
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1000");        // max 1s wait
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5242880"); // 5MB/partition

// Low-latency configuration (for real-time processing)
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");             // respond as soon as any data exists
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "100");         // max 100ms wait
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1048576"); // 1MB/partition

Parameter Quick Reference by Workload

Workload Type                   max.poll.records  fetch.min.bytes  fetch.max.wait.ms  max.poll.interval.ms
Low-latency real-time           50–100            1 byte           100 ms             30 s
Standard business processing    200–500           4 KB             500 ms             5 min
High-throughput lag catchup     1000–2000         1 MB             1000 ms            10 min
Heavy compute (ML inference)    10–20             1 byte           100 ms             30 min
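If you manage several workloads, the table can be encoded as named starting points. A hypothetical sketch: the enum and its constant names are illustrative, the upper end of each max.poll.records range is used, and the values remain starting points to measure against rather than tuned settings.

```java
import java.util.Properties;

// Hypothetical encoding of the quick-reference table (upper end of each record range).
public enum WorkloadProfile {
    LOW_LATENCY(100, 1, 100, 30_000),
    STANDARD(500, 4_096, 500, 300_000),
    HIGH_THROUGHPUT_CATCHUP(2_000, 1_048_576, 1_000, 600_000),
    HEAVY_COMPUTE(20, 1, 100, 1_800_000);

    private final int maxPollRecords;
    private final int fetchMinBytes;
    private final int fetchMaxWaitMs;
    private final int maxPollIntervalMs;

    WorkloadProfile(int maxPollRecords, int fetchMinBytes,
                    int fetchMaxWaitMs, int maxPollIntervalMs) {
        this.maxPollRecords = maxPollRecords;
        this.fetchMinBytes = fetchMinBytes;
        this.fetchMaxWaitMs = fetchMaxWaitMs;
        this.maxPollIntervalMs = maxPollIntervalMs;
    }

    /** Starting-point settings keyed by the standard consumer config names. */
    public Properties toProperties() {
        Properties props = new Properties();
        props.put("max.poll.records", String.valueOf(maxPollRecords));
        props.put("fetch.min.bytes", String.valueOf(fetchMinBytes));
        props.put("fetch.max.wait.ms", String.valueOf(fetchMaxWaitMs));
        props.put("max.poll.interval.ms", String.valueOf(maxPollIntervalMs));
        return props;
    }
}
```

Bootstrap servers, group ID, and deserializers still have to be added to the returned Properties before constructing a consumer.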

Consumer tuning has no universal optimal configuration. The correct approach is to establish baseline measurements first, then tune against the measured bottleneck. Profile your processing logic before touching Kafka parameters โ€” most consumer performance problems live in the application code, not the Kafka configuration.
