Chapter 22

Timing Wheel and Delayed Operations: The Purgatory Design

The Scale of the Problem

Throughout the preceding chapters we repeatedly encountered DelayedProduce, DelayedFetch, and DelayedJoin: operations that must either time out after a configurable deadline or complete early once a condition is satisfied. In production, a busy Kafka broker may have tens of thousands of such delayed operations pending simultaneously.

The most intuitive solution is Java's DelayQueue: a min-heap priority queue that surfaces the earliest-expiring task at the top.

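Why DelayQueue does not scale: every insertion is an O(log n) heap operation over all pending tasks. Many delayed operations also complete before their deadline (a DelayedProduce typically finishes as soon as the followers catch up), so they must be removed, and removing an arbitrary element from a DelayQueue goes through PriorityQueue.remove(Object), a linear O(n) scan. With tens of thousands of operations pending, every produce or fetch request pays these costs.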
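For comparison, here is a minimal sketch of the naive design: one DelayQueue entry per pending operation. NaiveTask and NaiveTimer are hypothetical names, and deadlines are absolute System.nanoTime() values. offer() is an O(log n) heap insertion, while cancelling a task that completed early goes through PriorityQueue.remove(Object), which is a linear scan.

```scala
import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}

// Hypothetical per-operation entry: in the naive design every pending
// operation is its own heap node, keyed by an absolute deadline in nanoseconds.
final class NaiveTask(val name: String, val deadlineNanos: Long) extends Delayed {
  override def getDelay(unit: TimeUnit): Long =
    unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS)

  override def compareTo(other: Delayed): Int =
    java.lang.Long.compare(deadlineNanos, other.asInstanceOf[NaiveTask].deadlineNanos)
}

object NaiveTimer {
  val queue = new DelayQueue[NaiveTask]()

  // O(log n): sift-up in the binary heap behind the DelayQueue
  def schedule(task: NaiveTask): Unit = queue.offer(task)

  // A task that completed early must be removed: DelayQueue.remove(Object)
  // delegates to PriorityQueue.remove(Object), a linear scan, so O(n)
  def cancel(task: NaiveTask): Boolean = queue.remove(task)

  // O(log n) when something is due; None if the earliest deadline is still ahead
  def pollExpired(): Option[NaiveTask] = Option(queue.poll())
}
```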

Kafka instead uses a Hierarchical Timing Wheel, which achieves O(1) insertion and deletion of individual tasks.

The Single-Level Timing Wheel

Understand the single-level wheel before adding hierarchy.

Imagine a clock face: the dial has N slots (Buckets), and the second hand advances one slot every tickMs milliseconds. Each Bucket is a doubly-linked list holding all tasks that expire during that slot:

Bucket[0]  Bucket[1]  Bucket[2]  ...  Bucket[N-1]
   │           │           │                │
[Task A]   [Task B]    [Task C]          [Empty]
[Task D]

Limitation: a single-level wheel can only handle tasks with deadlines within tickMs * N milliseconds. Tasks beyond this range need somewhere to go.
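A single-level wheel can be sketched in a few lines. SingleLevelWheel is a hypothetical class, not Kafka's: it stores task names in ArrayBuffers instead of linked lists and advances one slot at a time, which is the simplest way to drive a wheel but also visits every empty slot. The range check in add() is exactly the limitation described above.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical single-level timing wheel: wheelSize buckets, each covering tickMs
final class SingleLevelWheel(tickMs: Long, wheelSize: Int, startMs: Long) {
  val interval: Long = tickMs * wheelSize
  private var currentTime: Long = startMs - (startMs % tickMs)
  private val buckets = Array.fill(wheelSize)(ArrayBuffer.empty[(String, Long)])

  /** Returns false if the task is already due or lies beyond this wheel's range. */
  def add(name: String, expirationMs: Long): Boolean =
    if (expirationMs < currentTime + tickMs || expirationMs >= currentTime + interval) false
    else {
      val virtualId = expirationMs / tickMs          // absolute slot number
      buckets((virtualId % wheelSize).toInt) += ((name, expirationMs))
      true
    }

  /** Moves the hand one tick at a time up to timeMs, draining each slot it reaches. */
  def advanceTo(timeMs: Long): Seq[String] = {
    val fired = ArrayBuffer.empty[String]
    while (currentTime + tickMs <= timeMs) {
      currentTime += tickMs
      val bucket = buckets(((currentTime / tickMs) % wheelSize).toInt)
      fired ++= bucket.map(_._1)
      bucket.clear()
    }
    fired.toSeq
  }
}
```

With tickMs = 1 and wheelSize = 20, a task due at 20 ms or later (relative to a start at 0) is rejected by add(): it needs a wheel with a larger range.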

Hierarchical Timing Wheel: Cascading to Extend Range

A hierarchical timing wheel is a nested set of single-level wheels, analogous to the seconds, minutes, and hours hands of a clock:

Level 1 (tickMs=1ms,   wheelSize=20, range: 20ms)
Level 2 (tickMs=20ms,  wheelSize=20, range: 400ms)
Level 3 (tickMs=400ms, wheelSize=20, range: 8,000ms)

Cascading (demotion): when Level 2's hand ticks and a bucket expires, the tasks in that bucket may have (say) 15ms remaining before they truly expire. They are re-inserted into the appropriate Level 1 bucket. This "demotion into a lower-level wheel" is the fundamental cascade mechanism of hierarchical timing wheels.
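The placement rule can be checked with a little arithmetic. WheelPlacement.place is a hypothetical helper, not Kafka code: it applies the same range checks as TimingWheel.add() to a stack of 1ms/20-slot wheels, assuming every level's clock has been advanced to nowMs. Each level's currentTime is nowMs aligned down to that level's tick.

```scala
// Hypothetical helper: which level and bucket a deadline lands in
object WheelPlacement {
  final case class Slot(level: Int, tickMs: Long, bucket: Int)

  /** None means the task is already due and would run immediately. */
  def place(expirationMs: Long, nowMs: Long, tickMs: Long = 1L, wheelSize: Int = 20): Option[Slot] =
    if (expirationMs < nowMs - (nowMs % tickMs) + tickMs) None
    else {
      var level = 1
      var tick = tickMs
      // Climb while the deadline lies beyond [currentTime, currentTime + interval)
      while (expirationMs >= nowMs - (nowMs % tick) + tick * wheelSize) {
        tick *= wheelSize
        level += 1
      }
      val virtualId = expirationMs / tick
      Some(Slot(level, tick, (virtualId % wheelSize).toInt))
    }
}
```

A task due at 35 ms, added at time 0, lands in Level 2's bucket covering 20 to 39 ms. When that bucket expires at 20 ms, the task is placed again with 15 ms left and drops into Level 1, bucket 15. A 30,000 ms deadline climbs past Level 3 (8,000 ms range) into a fourth level with an 8,000 ms tick.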

Kafka's Implementation: TimingWheel + SystemTimer

TimingWheel.scala

// TimingWheel.scala (core/src/main/scala/kafka/utils/timer/, simplified)
private[timer] class TimingWheel(
  tickMs: Long,       // duration of one bucket in milliseconds
  wheelSize: Int,     // number of buckets
  startMs: Long,      // creation time in ms on the monotonic clock (hiResClockMs)
  taskCounter: AtomicInteger,        // shared task count across all wheels
  queue: DelayQueue[TimerTaskList]   // shared by all levels; holds non-empty buckets ordered by expiration
) {
  val interval: Long = tickMs * wheelSize  // total time range this wheel covers

  // N buckets; each is a circular doubly-linked list
  private[this] val buckets: Array[TimerTaskList] = Array.tabulate[TimerTaskList](wheelSize) {
    _ => new TimerTaskList(taskCounter)
  }

  // Current time, aligned to tickMs boundary
  private[this] var currentTime: Long = startMs - (startMs % tickMs)

  // Overflow wheel: lazily created when a task exceeds this wheel's range
  @volatile private[this] var overflowWheel: TimingWheel = null

  private[this] def addOverflowWheel(): Unit = synchronized {
    if (overflowWheel == null) {
      // The overflow wheel's tickMs equals this wheel's total interval
      overflowWheel = new TimingWheel(
        tickMs      = interval,
        wheelSize   = wheelSize,
        startMs     = currentTime,
        taskCounter = taskCounter,
        queue       = queue
      )
    }
  }

  def add(timerTaskEntry: TimerTaskEntry): Boolean = {
    val expiration = timerTaskEntry.expirationMs

    if (timerTaskEntry.cancelled) {
      false   // already cancelled: ignore
    } else if (expiration < currentTime + tickMs) {
      // Already expired (or expiring within the current tick): caller should execute immediately
      false
    } else if (expiration < currentTime + interval) {
      // Within this wheel's range
      val virtualId = expiration / tickMs
      val bucket    = buckets((virtualId % wheelSize.toLong).toInt)
      bucket.add(timerTaskEntry)

      // setExpiration() returns true only when the bucket's expiration changes, i.e. the
      // first add after the bucket was flushed and reused for a later wheel cycle. Only
      // then is the bucket (re-)offered, so it never sits in the DelayQueue twice.
      if (bucket.setExpiration(virtualId * tickMs)) {
        queue.offer(bucket)
      }
      true
    } else {
      // Overflow: delegate to the higher-level wheel
      if (overflowWheel == null) addOverflowWheel()
      overflowWheel.add(timerTaskEntry)
    }
  }

  // Advance the wheel's clock; cascade to overflow wheel
  def advanceClock(timeMs: Long): Unit = {
    if (timeMs >= currentTime + tickMs) {
      currentTime = timeMs - (timeMs % tickMs)
      if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
    }
  }
}

TimerTaskList: The Bucket (Circular Doubly-Linked List)

// TimerTaskList.scala (simplified)
private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {

  // Sentinel/root node simplifies list manipulation
  private[this] val root = new TimerTaskEntry(null, -1)
  root.next = root
  root.prev = root

  private[this] val expiration = new AtomicLong(-1L)

  // Returns true if the expiration changed: the first add after this bucket was created
  // or flushed. TimingWheel then offers the bucket to the DelayQueue.
  def setExpiration(expirationMs: Long): Boolean = expiration.getAndSet(expirationMs) != expirationMs

  def getExpiration: Long = expiration.get

  // O(1) insert at tail
  def add(timerTaskEntry: TimerTaskEntry): Unit = {
    var done = false
    while (!done) {
      timerTaskEntry.remove()  // detach from old list if present
      synchronized {
        timerTaskEntry.synchronized {
          if (timerTaskEntry.list == null) {
            val tail = root.prev
            timerTaskEntry.next  = root
            timerTaskEntry.prev  = tail
            timerTaskEntry.list  = this
            tail.next  = timerTaskEntry
            root.prev  = timerTaskEntry
            taskCounter.incrementAndGet()
            done = true
          }
        }
      }
    }
  }

  // O(1) removal
  def remove(timerTaskEntry: TimerTaskEntry): Unit = synchronized {
    timerTaskEntry.synchronized {
      if (timerTaskEntry.list eq this) {
        timerTaskEntry.next.prev = timerTaskEntry.prev
        timerTaskEntry.prev.next = timerTaskEntry.next
        timerTaskEntry.next = null
        timerTaskEntry.prev = null
        timerTaskEntry.list = null
        taskCounter.decrementAndGet()
      }
    }
  }

  // Drain all entries and apply f to each (cascade: re-insert into lower wheel, or execute)
  def flush(f: (TimerTaskEntry) => Unit): Unit = synchronized {
    var head = root.next
    while (head ne root) {
      remove(head)
      f(head)
      head = root.next
    }
    expiration.set(-1L)
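  }

  // Delayed interface: lets the shared DelayQueue order buckets by expiration
  def getDelay(unit: TimeUnit): Long =
    unit.convert(math.max(getExpiration - Time.SYSTEM.hiResClockMs, 0L), TimeUnit.MILLISECONDS)

  def compareTo(d: Delayed): Int =
    java.lang.Long.compare(getExpiration, d.asInstanceOf[TimerTaskList].getExpiration)
}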
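The sentinel is what makes add and remove O(1) without special cases for an empty list or for the first and last element. Below is a minimal single-threaded sketch of the same structure. Node and SentinelList are hypothetical names, and the per-list and per-entry locking of TimerTaskList is deliberately left out.

```scala
// Hypothetical list node; `list` records which list currently owns it
final class Node[A](val value: A) {
  var prev: Node[A] = null
  var next: Node[A] = null
  var list: SentinelList[A] = null
}

// Circular doubly-linked list with a dummy root: root.next is the head, root.prev the tail
final class SentinelList[A] {
  private val root = new Node[A](null.asInstanceOf[A])
  root.next = root
  root.prev = root
  private var count = 0
  def size: Int = count

  // O(1): splice in just before the root, i.e. at the tail
  def add(node: Node[A]): Unit = {
    if (node.list != null) node.list.remove(node) // move semantics, like TimerTaskEntry.remove()
    val tail = root.prev
    node.next = root
    node.prev = tail
    node.list = this
    tail.next = node
    root.prev = node
    count += 1
  }

  // O(1): unlink through the node's own pointers; no search is needed
  def remove(node: Node[A]): Unit = if (node.list eq this) {
    node.next.prev = node.prev
    node.prev.next = node.next
    node.next = null
    node.prev = null
    node.list = null
    count -= 1
  }

  // Removes every node, head first, and applies f to its value
  def flush(f: A => Unit): Unit = {
    var head = root.next
    while (head ne root) {
      remove(head)
      f(head.value)
      head = root.next
    }
  }
}
```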

SystemTimer: Driving the Whole System

// SystemTimer.scala (simplified)
class SystemTimer(executorName: String, tickMs: Long = 1, wheelSize: Int = 20, ...) extends Timer {

  // Single-threaded executor for running expired tasks
  private[this] val taskExecutor = Executors.newFixedThreadPool(1, ...)

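  // DelayQueue: holds non-empty Buckets sorted by expiration time
  // SystemTimer polls this to know exactly when to advance the clock
  private[this] val delayQueue = new DelayQueue[TimerTaskList]()

  // Total number of pending tasks, shared by every wheel level
  private[this] val taskCounter = new AtomicInteger(0)

  // add() takes the read lock; advanceClock() takes the write lock while buckets are flushed
  private[this] val readWriteLock = new ReentrantReadWriteLock()
  private[this] val readLock  = readWriteLock.readLock()
  private[this] val writeLock = readWriteLock.writeLock()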

  // The first-level timing wheel
  private[this] val timingWheel = new TimingWheel(
    tickMs      = tickMs,
    wheelSize   = wheelSize,
    startMs     = time.hiResClockMs,
    taskCounter = taskCounter,
    queue       = delayQueue
  )

  def add(timerTask: TimerTask): Unit = {
    readLock.lock()
    try {
      addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + time.hiResClockMs))
    } finally {
      readLock.unlock()
    }
  }

  private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
    if (!timingWheel.add(timerTaskEntry)) {
      // add() returned false: the task has either expired or been cancelled
      if (!timerTaskEntry.cancelled) {
        taskExecutor.submit(timerTaskEntry.timerTask)  // expired: execute immediately
      }
    }
  }

  /**
   * Advance the clock, processing all buckets that have become due.
   * Called in a loop by a background thread (the purgatory's expiration reaper);
   * blocks in DelayQueue.poll() until a bucket is due or timeoutMs elapses.
   */
  def advanceClock(timeoutMs: Long): Boolean = {
    // Block until the next bucket expires (or timeoutMs elapses)
    var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
    if (bucket != null) {
      writeLock.lock()
      try {
        while (bucket != null) {
          // Push the clock forward
          timingWheel.advanceClock(bucket.getExpiration)
          // Flush the bucket: re-insert tasks into a lower wheel, or execute immediately
          bucket.flush(addTimerTaskEntry)
          // Check for more expired buckets without blocking
          bucket = delayQueue.poll()
        }
      } finally {
        writeLock.unlock()
      }
      true
    } else {
      false
    }
  }
}

The Key Insight: DelayQueue on Buckets, Not Tasks

You might have noticed that TimingWheel uses a DelayQueue, but critically it manages Buckets rather than individual tasks. A bucket is enqueued once however many tasks it holds, so the queue never contains more than wheelSize entries per level (60 for three 20-slot levels), whether 100 or 100,000 tasks are pending. The O(log n) cost of DelayQueue operations is therefore paid on the bucket count, not the task count, and an offer happens only when a bucket receives its first task in a wheel cycle, not on every insert.

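This design combines the strengths of both structures: the timing wheel gives O(1) insertion and removal of individual tasks, while the DelayQueue tells SystemTimer exactly when the next non-empty bucket is due, so the clock jumps straight to it instead of ticking through empty slots one by one.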
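The following single-level, single-threaded sketch puts the two structures together. ManualClock, Bucket, and MiniTimer are hypothetical names; the manual clock stands in for hiResClockMs so the behaviour is deterministic, there is no overflow wheel, and advanceClock() uses the non-blocking DelayQueue.poll() where SystemTimer blocks with a timeout. Ten thousand tasks spread over 19 milliseconds occupy only 19 DelayQueue entries.

```scala
import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.ArrayBuffer

// Hypothetical manual clock so the demo does not depend on real time
final class ManualClock(var nowMs: Long)

// One bucket is ONE DelayQueue entry, however many tasks it holds
final class Bucket(clock: ManualClock) extends Delayed {
  val tasks = ArrayBuffer.empty[(Long, () => Unit)] // (expirationMs, action)
  private val expiration = new AtomicLong(-1L)

  // Mirrors TimerTaskList.setExpiration: true only when the value changes
  def setExpiration(ms: Long): Boolean = expiration.getAndSet(ms) != ms
  def getExpiration: Long = expiration.get

  override def getDelay(unit: TimeUnit): Long =
    unit.convert(getExpiration - clock.nowMs, TimeUnit.MILLISECONDS)
  override def compareTo(o: Delayed): Int =
    java.lang.Long.compare(getExpiration, o.asInstanceOf[Bucket].getExpiration)

  // Removes all tasks and resets the expiration, like TimerTaskList.flush
  def drain(): List[(Long, () => Unit)] = {
    val out = tasks.toList
    tasks.clear()
    expiration.set(-1L)
    out
  }
}

// Hypothetical single-level sketch of SystemTimer's add / advanceClock loop
final class MiniTimer(clock: ManualClock, tickMs: Long = 1L, wheelSize: Int = 20) {
  private val queue = new DelayQueue[Bucket]()
  private val buckets = Array.fill(wheelSize)(new Bucket(clock))
  private var currentTime = clock.nowMs - (clock.nowMs % tickMs)

  def pendingBuckets: Int = queue.size

  def add(expirationMs: Long, action: () => Unit): Unit =
    if (expirationMs < currentTime + tickMs) action() // already due: run now
    else {
      require(expirationMs < currentTime + tickMs * wheelSize, "needs an overflow wheel")
      val virtualId = expirationMs / tickMs
      val bucket = buckets((virtualId % wheelSize).toInt)
      bucket.tasks += ((expirationMs, action))
      if (bucket.setExpiration(virtualId * tickMs)) queue.offer(bucket)
    }

  // Handles every bucket whose delay has reached zero, then returns
  def advanceClock(): Unit = {
    var bucket = queue.poll() // null unless the head bucket is due
    while (bucket != null) {
      currentTime = bucket.getExpiration // jump straight to the due bucket
      bucket.drain().foreach { case (exp, action) => add(exp, action) } // now due: runs
      bucket = queue.poll()
    }
  }
}
```

Flushing a due bucket re-adds its tasks; because they now fall below currentTime + tickMs, they run immediately, which is what bucket.flush(addTimerTaskEntry) achieves in SystemTimer.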

DelayedOperation and Purgatory

The DelayedOperation Abstract Class

// DelayedOperation.scala (core/, simplified)
abstract class DelayedOperation(
  override val delayMs: Long,
  lockOpt: Option[Lock] = None
) extends TimerTask with Logging {

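  private val completed = new AtomicBoolean(false)

  // Per-operation lock, taken by the purgatory while it checks and registers the operation
  private[server] val lock: Lock = lockOpt.getOrElse(new ReentrantLock)

  def isCompleted: Boolean = completed.get()

  /**
   * Check whether the operation can complete now.
   * Implemented by subclasses to check the specific condition. If it is met,
   * the subclass calls forceComplete() and returns its result; otherwise false.
   */
  def tryComplete(): Boolean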

  /**
   * Guarantee exactly-once completion (CAS protection).
   * Called both by the timer (on timeout) and by condition-check code.
   */
  def forceComplete(): Boolean = {
    if (completed.compareAndSet(false, true)) {
      cancel()     // remove from TimingWheel (no-op if already expired)
      onComplete() // subclass-defined completion logic (e.g., send response)
      true
    } else {
      false        // already completed by another path
    }
  }

  // Called by TimingWheel when the timer fires
  override def run(): Unit = {
    if (forceComplete())
      onExpiration()   // optional subclass hook for timeout-specific logic
  }

  // Subclass defines what "done" means (e.g., DelayedProduce → invoke responseCallback)
  def onComplete(): Unit

  def onExpiration(): Unit = {}
}
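The exactly-once guarantee can be exercised in isolation. The sketch below keeps only the CAS-guarded part of DelayedOperation, with no timer and no lock. SketchDelayedOp, CountingOp, and RaceDemo are hypothetical names, and expire() stands in for the timer calling run(). When the completion path and the timeout path race, onComplete() still runs exactly once per operation.

```scala
import java.util.concurrent.{CountDownLatch, Executors}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical stand-in for DelayedOperation: only the completion logic
abstract class SketchDelayedOp {
  private val completed = new AtomicBoolean(false)
  def isCompleted: Boolean = completed.get

  // Whichever path wins the CAS runs onComplete(); the loser gets false
  def forceComplete(): Boolean =
    if (completed.compareAndSet(false, true)) { onComplete(); true } else false

  // Timeout path, the equivalent of run() being fired by the timer
  def expire(): Unit = if (forceComplete()) onExpiration()

  def onComplete(): Unit
  def onExpiration(): Unit = ()
}

final class CountingOp extends SketchDelayedOp {
  val completions = new AtomicInteger(0)
  val expirations = new AtomicInteger(0)
  def onComplete(): Unit = { completions.incrementAndGet(); () }
  override def onExpiration(): Unit = { expirations.incrementAndGet(); () }
}

object RaceDemo {
  // Releases both paths at once for each operation, `rounds` times
  def race(rounds: Int): Seq[CountingOp] = {
    val pool = Executors.newFixedThreadPool(2)
    try {
      (1 to rounds).map { _ =>
        val op = new CountingOp
        val start = new CountDownLatch(1)
        val a = pool.submit(new Runnable { def run(): Unit = { start.await(); op.forceComplete() } })
        val b = pool.submit(new Runnable { def run(): Unit = { start.await(); op.expire() } })
        start.countDown()
        a.get()
        b.get()
        op
      }
    } finally pool.shutdown()
  }
}
```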

DelayedOperationPurgatory: Managing All Delayed Operations

// DelayedOperationPurgatory.scala (core/, simplified)
final class DelayedOperationPurgatory[T <: DelayedOperation](
  purgatoryName: String,
  brokerId: Int     = 0,
  purgeInterval: Int = 1000,
  reaperEnabled: Boolean = true,
  timerEnabled: Boolean  = true
) extends Logging with KafkaMetricsGroup {

  private[this] val timeoutTimer = if (timerEnabled) new SystemTimer(purgatoryName) else new MockTimer

  // Watchers: map from Key โ†’ list of DelayedOperations watching that key
  // When a condition that might satisfy an operation occurs, we look up by key
  private val watchersForKey = new ConcurrentHashMap[Any, Watchers]()

  private[this] val estimatedTotalOperations = new AtomicInteger(0)

  /**
   * Try to complete the operation immediately.
   * If it cannot complete, register it with the timer and all relevant Watchers.
   */
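  def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
    // Optimistic: no other thread can see this operation yet, so no lock is needed
    if (operation.tryComplete()) return true

    var isCompletedByMe = false
    operation.lock.lock()
    try {
      // Register with every watch key BEFORE re-checking. Any event that fires after
      // registration finds the operation in a watcher list, so a completion trigger
      // arriving between the first check and the watch cannot be missed.
      watchKeys.foreach { key =>
        if (!operation.isCompleted) watchersFor(key).watch(operation)
      }
      estimatedTotalOperations.incrementAndGet()
      isCompletedByMe = operation.tryComplete()
    } finally {
      operation.lock.unlock()
    }

    // Still pending: add to the timing wheel for timeout handling
    if (!isCompletedByMe && !operation.isCompleted) {
      timeoutTimer.add(operation)
      // Completed concurrently after the check: drop the now-useless timer entry
      if (operation.isCompleted) operation.cancel()
    }
    isCompletedByMe
  }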

  /**
   * Called when a condition associated with a key may have been satisfied.
   * Example: new data written to partition P0 → checkAndComplete(TopicPartitionOperationKey(P0))
   * Iterates all DelayedOperations watching that key and calls tryComplete() on each.
   */
  def checkAndComplete(key: Any): Int = {
    val watchers = inReadLock(removeWatchersLock) { watchersForKey.get(key) }
    if (watchers == null) 0
    else watchers.tryCompleteWatched()
  }

  private class Watchers(val key: Any) {
    private[this] val operations = new LinkedList[T]()

    def watch(t: T): Unit = synchronized { operations.add(t) }

    def tryCompleteWatched(): Int = {
      var completed = 0
      synchronized {
        val iter = operations.iterator()
        while (iter.hasNext) {
          val curr = iter.next()
          if (curr.isCompleted) {
            iter.remove()        // already done, clean up
          } else if (curr.tryComplete()) {
            completed += 1
            iter.remove()        // just completed, clean up
          }
        }
      }
      completed
    }
  }
}
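The watcher mechanism can be modelled in miniature. In the sketch below, WaitForHw plays the role of a DelayedProduce that completes once a partition's high watermark reaches a required offset, and MiniPurgatory keeps only the key-to-watchers map, with no timer and no reaper. Both names are hypothetical, and plain synchronized ListBuffers replace Kafka's Watchers.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import scala.collection.mutable.ListBuffer

// Hypothetical DelayedProduce stand-in: done once the HW reaches requiredOffset
final class WaitForHw(hw: AtomicLong, requiredOffset: Long) {
  private val completed = new AtomicBoolean(false)
  @volatile var response: Option[String] = None

  def isCompleted: Boolean = completed.get

  // Checks the condition and completes exactly once (CAS), like forceComplete()
  def tryComplete(): Boolean =
    if (hw.get >= requiredOffset && completed.compareAndSet(false, true)) {
      response = Some("ok") // stand-in for responseCallback
      true
    } else false
}

// Hypothetical model of the watchersForKey map
final class MiniPurgatory {
  private val watchersForKey = new ConcurrentHashMap[String, ListBuffer[WaitForHw]]()

  private def watchersFor(key: String): ListBuffer[WaitForHw] = {
    val fresh = ListBuffer.empty[WaitForHw]
    val existing = watchersForKey.putIfAbsent(key, fresh)
    if (existing == null) fresh else existing
  }

  // Watch first, then re-check, so an event between the two cannot be lost
  def tryCompleteElseWatch(op: WaitForHw, keys: Seq[String]): Boolean = {
    if (op.tryComplete()) return true
    keys.foreach { key =>
      val watchers = watchersFor(key)
      watchers.synchronized { watchers += op }
    }
    op.tryComplete()
  }

  // Re-checks every operation watching `key`; returns how many it completed
  def checkAndComplete(key: String): Int = {
    val watchers = watchersForKey.get(key)
    if (watchers == null) 0
    else watchers.synchronized {
      var completedNow = 0
      val stillWaiting = watchers.filter { op =>
        if (op.isCompleted) false                               // finished elsewhere: drop
        else if (op.tryComplete()) { completedNow += 1; false } // finished now: drop
        else true                                               // keep watching
      }
      watchers.clear()
      watchers ++= stillWaiting
      completedNow
    }
  }
}
```

An operation watched under several keys is completed by whichever key fires first; the other watcher lists simply drop it the next time they are checked.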

The Purgatory in Action: End-to-End Walk-Through

For an acks=-1 Produce request:

Producer sends message batch
    │
    ↓
ReplicaManager.appendRecords()
    │  Write to local log (LEO advances)
    ↓
Create DelayedProduce(timeout=30,000ms)
    │
    ├── tryComplete() → false (HW has not advanced yet)
    │
    ├── Register in Purgatory:
    │   watchKeys = [TopicPartitionOperationKey(tp0), TopicPartitionOperationKey(tp1)]
    │   watchersForKey[tp0].watch(delayedProduce)
    │   timeoutTimer.add(delayedProduce)   ← enters the Level 4 wheel (tickMs=8,000ms);
    │                                        30,000ms exceeds Level 3's 8,000ms range
    │
    └── Suspended, waiting...

Follower sends FetchRequest (fetches data for tp0)
    │
    ↓
Leader records follower's fetch offset; HW advances to cover the write
    │
    ↓
delayedProducePurgatory.checkAndComplete(TopicPartitionOperationKey(tp0))
    │
    ↓
DelayedProduce.tryComplete() → true (partition.highWatermark >= requiredOffset)
    │  forceComplete() → cancel() + onComplete()
    ↓
responseCallback(produceResults) → response sent to Producer

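The timing wheel handles the timeout path: if the high watermark has not reached the required offset within 30 seconds (for example, because an in-sync follower stopped fetching), the timer fires DelayedProduce.run(). That calls forceComplete(), whose onComplete() sends the response with a REQUEST_TIMED_OUT error for every partition still waiting on acknowledgements.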

Comparison with Other Systems

Linux Kernel Timing Wheel

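The Linux kernel also uses a hierarchical timing wheel (include/linux/timer.h). Its classic design, used until Linux 4.8, had five levels (256 slots in the first, 64 in each of the other four), together covering 2^32 jiffies (about 49.7 days at HZ=1000), and cascaded timers down the levels just as described above. Kafka's implementation is simpler and creates levels lazily, so its depth follows the longest pending timeout: with the default tickMs=1 and wheelSize=20, a 30-second timeout already needs four levels. The core cascading design is the same.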

Netty's HashedWheelTimer

Netty's HashedWheelTimer is a single-level wheel augmented with a "round" counter: each bucket holds tasks from multiple rotations, and every entry records how many full wheel rotations must still pass before it expires. This avoids the complexity of multi-level wheels, but every tick must walk all entries in the current bucket, decrementing the counters of those not yet due, and precision is bounded by tickDuration (100ms by default): a task can fire up to one tick late. Kafka requires precise timeout semantics (a request must time out at its configured deadline), so this imprecision is unacceptable.
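The round-counter idea can be sketched as follows. RoundWheel is a hypothetical class that illustrates the technique and is not Netty's implementation; it counts time in abstract ticks. Each entry fires on its exact target tick, and the scans counter makes the cost visible: a task several rotations away is visited, and its counter decremented, on every pass of the hand.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical single-level wheel with per-entry round counters
final class RoundWheel(wheelSize: Int) {
  private final class Entry(val name: String, var remainingRounds: Long)

  private val buckets = Array.fill(wheelSize)(ArrayBuffer.empty[Entry])
  private var tick = 0L // ticks processed so far
  var scans = 0L        // entries visited across all ticks

  def schedule(name: String, delayTicks: Long): Unit = {
    require(delayTicks >= 1, "delay must be at least one tick")
    val target = tick + delayTicks
    // Passes the hand makes over this slot before reaching the target tick
    buckets((target % wheelSize).toInt) += new Entry(name, (delayTicks - 1) / wheelSize)
  }

  // One tick: every entry in the current slot is visited, due or not
  def advance(): List[String] = {
    tick += 1
    val bucket = buckets((tick % wheelSize).toInt)
    scans += bucket.size
    val (due, notDue) = bucket.partition(_.remainingRounds == 0)
    notDue.foreach(e => e.remainingRounds -= 1)
    bucket.clear()
    bucket ++= notDue
    due.map(_.name).toList
  }
}
```

With an 8-slot wheel, tasks due in 3, 11, and 19 ticks all share slot 3: they fire at ticks 3, 11, and 19, but the hand scans six entries to get there.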

Side-by-Side Comparison

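| Approach                  | Insert   | Delete                        | Precision   | Suitable For                      |
|---------------------------|----------|-------------------------------|-------------|-----------------------------------|
| DelayQueue                | O(log n) | O(log n) head, O(n) arbitrary | Exact       | Small task counts                 |
| HashedWheelTimer          | O(1)     | O(1)                          | Approximate | Tolerance for ±1 tick error       |
| Hierarchical Timing Wheel | O(1)     | O(1)                          | Exact       | High concurrency + exact timeouts |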

Performance Impact

According to JMH benchmarks referenced in Kafka's KIP-219 discussion, with 100,000 concurrent delayed operations:

What might appear to be an academic data-structure choice is, in fact, one of the concrete engineering reasons Kafka maintains low latency under high load. Every acks=-1 produce request, every fetch.min.bytes consumer wait, and every join-group rebalance timeout flows through this timing wheel; getting it right at O(1) rather than O(log n) makes a measurable difference at Kafka's operating scale.
