Timing Wheel and Delayed Operations: The Purgatory Design
The Scale of the Problem
Throughout the preceding chapters we repeatedly encountered DelayedProduce, DelayedFetch, and DelayedJoin — operations that need to either time out after a configurable deadline or complete early when a condition is satisfied. In production, a busy Kafka broker may have tens of thousands of such delayed operations pending simultaneously.
The most intuitive solution is Java's DelayQueue: a min-heap priority queue that surfaces the earliest-expiring task at the top.
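Why DelayQueue does not scale:
- Insertion is O(log n), and removing an arbitrary task is worse: remove(Object) on the underlying PriorityQueue scans linearly for the element before re-heapifying, so it is O(n)
- With n = 100,000, even a single O(log n) heap sift takes up to about 17 comparisons (log2 100,000 ≈ 16.6)
- Kafka may generate tens of thousands of DelayedProduce operations per second, creating a very high insertion rate
- Most delayed operations complete before their timeout (because tryComplete() succeeds), so tasks must be removed from the heap frequently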
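For reference, the DelayQueue baseline can be sketched as follows. HeapTimerTask and HeapTimerDemo are hypothetical names used only for illustration; the sketch shows that every offer() is a heap sift-up and that early completion has to go through remove(Object), which searches the queue before re-heapifying.

```scala
import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}

// Hypothetical task type for the heap-based baseline (not a Kafka class).
final class HeapTimerTask(val id: Int, val expirationMs: Long) extends Delayed {
  private def nowMs: Long = TimeUnit.NANOSECONDS.toMillis(System.nanoTime())

  // Time left until expiration; DelayQueue only releases a task once this is <= 0.
  override def getDelay(unit: TimeUnit): Long =
    unit.convert(expirationMs - nowMs, TimeUnit.MILLISECONDS)

  // Heap order: earliest absolute expiration first.
  override def compareTo(other: Delayed): Int =
    java.lang.Long.compare(expirationMs, other.asInstanceOf[HeapTimerTask].expirationMs)
}

object HeapTimerDemo {
  private val startMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
  val queue = new DelayQueue[HeapTimerTask]()

  // 1,000 tasks due roughly 30 s from now: each offer() is an O(log n) sift-up.
  val tasks: IndexedSeq[HeapTimerTask] =
    (0 until 1000).map(i => new HeapTimerTask(i, startMs + 30000 + i))
  tasks.foreach(t => queue.offer(t))

  // Early completion: remove(Object) scans for the element (O(n)) before re-heapifying.
  val removed: Boolean = queue.remove(tasks(500))

  // Nothing is due yet, so the non-blocking poll() returns null.
  val due: HeapTimerTask = queue.poll()
}
```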
Kafka instead uses a Hierarchical Timing Wheel, which achieves O(1) insertion and deletion.
The Single-Level Timing Wheel
Understand the single-level wheel before adding hierarchy.
Imagine a clock face: the dial has N slots (Buckets), and the second hand advances one slot every tickMs milliseconds. Each Bucket is a doubly-linked list holding all tasks that expire during that slot:
Bucket[0] Bucket[1] Bucket[2] ... Bucket[N-1]
│ │ │ │
[Task A] [Task B] [Task C] [Empty]
[Task D]
- Insert: compute the target bucket as (expiration / tickMs) % N and append the task to that bucket's linked list, O(1)
- Tick: advance the hand, drain the current bucket, and execute or re-enqueue each task, O(expired tasks)
- Delete (early completion): a doubly-linked list supports O(1) removal, so no heap rebuild is needed
Limitation: a single-level wheel can only handle tasks with deadlines within tickMs * N milliseconds. Tasks beyond this range need somewhere to go.
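A minimal single-level wheel might look like the sketch below. It is single-threaded and illustrative, not Kafka's code: SimpleTimingWheel is a hypothetical name, and a LinkedHashSet stands in for the intrusive doubly-linked list (it gives the same O(1) append and removal, at the cost of hashing).

```scala
import scala.collection.mutable

// Hypothetical single-level wheel: single-threaded, illustrative only.
final class SimpleTimingWheel(tickMs: Long, wheelSize: Int, startMs: Long) {
  // Each bucket keeps insertion order; LinkedHashSet stands in for a doubly-linked list.
  private val buckets = Array.fill(wheelSize)(mutable.LinkedHashSet.empty[(String, Long)])
  private var currentTime: Long = startMs - (startMs % tickMs)
  val interval: Long = tickMs * wheelSize

  private def indexOf(expirationMs: Long): Int = ((expirationMs / tickMs) % wheelSize).toInt

  /** Some(bucket index) if inserted; None if already due or beyond this wheel's range. */
  def add(name: String, expirationMs: Long): Option[Int] =
    if (expirationMs < currentTime + tickMs || expirationMs >= currentTime + interval) None
    else {
      val idx = indexOf(expirationMs)
      buckets(idx) += ((name, expirationMs)) // O(1) append
      Some(idx)
    }

  /** Early completion: O(1) removal, other tasks are untouched. */
  def remove(name: String, expirationMs: Long): Boolean =
    buckets(indexOf(expirationMs)).remove((name, expirationMs))

  /** Advance one tick and drain the bucket the hand now points at. */
  def tick(): List[String] = {
    currentTime += tickMs
    val bucket = buckets(indexOf(currentTime))
    val expired = bucket.toList.map(_._1)
    bucket.clear()
    expired
  }
}

object SimpleWheelDemo {
  val wheel = new SimpleTimingWheel(tickMs = 1L, wheelSize = 20, startMs = 0L)
  val placedA: Option[Int] = wheel.add("A", 5L)   // bucket 5
  val placedB: Option[Int] = wheel.add("B", 25L)  // beyond the 20 ms range
  val placedC: Option[Int] = wheel.add("C", 7L)   // bucket 7
  val cancelledC: Boolean = wheel.remove("C", 7L) // completed early
  val fired: List[String] = (1 to 7).toList.flatMap(_ => wheel.tick())
}
```

Task B, due 25ms out, is rejected because it falls outside the 20ms range; that is exactly the gap a hierarchy has to fill.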
Hierarchical Timing Wheel: Cascading to Extend Range
A hierarchical timing wheel is a nested set of single-level wheels, analogous to the seconds, minutes, and hours hands of a clock:
Level 1 (tickMs=1ms, wheelSize=20, range: 20ms)
Level 2 (tickMs=20ms, wheelSize=20, range: 400ms)
Level 3 (tickMs=400ms, wheelSize=20, range: 8,000ms)
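- Tasks expiring less than 20ms ahead go to Level 1 (a task due within the current 1ms tick is executed immediately instead)
- Tasks expiring 20–400ms ahead go to Level 2
- Tasks expiring 400–8,000ms ahead go to Level 3
- Longer delays extend the hierarchy upward (Level 4 has tickMs=8,000ms and covers 160,000ms)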
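Choosing the level is just repeated multiplication by wheelSize. The helpers below are a hypothetical sketch that assumes every wheel's clock is aligned at time 0; they reproduce the ranges listed above and show that a 30,000ms timeout needs Level 4 with tickMs=1 and wheelSize=20.

```scala
object LevelSelection {
  // Hypothetical helper: 0 means "already due", otherwise the 1-based level that receives the task.
  def levelFor(delayMs: Long, tickMs: Long = 1L, wheelSize: Int = 20): Int =
    if (delayMs < tickMs) 0
    else {
      var level = 1
      var interval = tickMs * wheelSize // range of the current level
      while (delayMs >= interval) {
        level += 1
        interval *= wheelSize           // each level's tick is the previous level's whole range
      }
      level
    }

  // Range covered by a level: 20, 400, 8000, 160000 ms with the defaults.
  def rangeMs(level: Int, tickMs: Long = 1L, wheelSize: Int = 20): Long =
    (1 to level).foldLeft(tickMs)((acc, _) => acc * wheelSize)
}
```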
Cascading (demotion): when Level 2's hand ticks and a bucket expires, the tasks in that bucket may have (say) 15ms remaining before they truly expire. They are re-inserted into the appropriate Level 1 bucket. This "demotion into a lower-level wheel" is the fundamental cascade mechanism of hierarchical timing wheels.
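The cascade can be traced with a small model of the range checks in TimingWheel.add. This is a sketch, assuming all levels already exist and each level's clock has been advanced to nowMs rounded down to its own tickMs (which is what advanceClock produces); CascadeModel, place and Placement are hypothetical names.

```scala
object CascadeModel {
  final case class Placement(level: Int, bucketExpirationMs: Long)

  // Mirrors the three range checks in TimingWheel.add, recursing into the overflow level.
  def place(expirationMs: Long, nowMs: Long, tickMs: Long = 1L, wheelSize: Int = 20,
            level: Int = 1): Option[Placement] = {
    val currentTime = nowMs - (nowMs % tickMs) // this level's clock, aligned to its tick
    if (expirationMs < currentTime + tickMs) None // due now: execute instead of inserting
    else if (expirationMs < currentTime + tickMs * wheelSize)
      Some(Placement(level, (expirationMs / tickMs) * tickMs)) // virtualId * tickMs
    else place(expirationMs, nowMs, tickMs * wheelSize, wheelSize, level + 1)
  }
}
```

A task due at 35ms first lands in the Level 2 bucket that expires at 20ms; when that bucket fires, re-adding the task at time 20 demotes it to Level 1 with 15ms remaining. A task due at 30,000ms moves from Level 4 (bucket at 24,000ms) to Level 3 (bucket at 30,000ms) and is then executed.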
Kafka's Implementation: TimingWheel + SystemTimer
TimingWheel.scala
// TimingWheel.scala (core/src/main/scala/kafka/utils/timer/, simplified)
import java.util.concurrent.DelayQueue
import java.util.concurrent.atomic.AtomicInteger
private[timer] class TimingWheel(
tickMs: Long, // duration of one bucket in milliseconds
wheelSize: Int, // number of buckets
startMs: Long, // wall-clock time at wheel creation
taskCounter: AtomicInteger, // shared task count across all wheels
queue: DelayQueue[TimerTaskList] // used to drive Tick precisely
) {
val interval: Long = tickMs * wheelSize // total time range this wheel covers
// N buckets; each is a circular doubly-linked list
private[this] val buckets: Array[TimerTaskList] = Array.tabulate[TimerTaskList](wheelSize) {
_ => new TimerTaskList(taskCounter)
}
// Current time, aligned to tickMs boundary
private[this] var currentTime: Long = startMs - (startMs % tickMs)
// Overflow wheel: lazily created when a task exceeds this wheel's range
@volatile private[this] var overflowWheel: TimingWheel = null
private[this] def addOverflowWheel(): Unit = synchronized {
if (overflowWheel == null) {
// The overflow wheel's tickMs equals this wheel's total interval
overflowWheel = new TimingWheel(
tickMs = interval,
wheelSize = wheelSize,
startMs = currentTime,
taskCounter = taskCounter,
queue = queue
)
}
}
def add(timerTaskEntry: TimerTaskEntry): Boolean = {
val expiration = timerTaskEntry.expirationMs
if (timerTaskEntry.cancelled) {
false // already cancelled: ignore
} else if (expiration < currentTime + tickMs) {
// Already expired (or expiring within the current tick): caller should execute immediately
false
} else if (expiration < currentTime + interval) {
// Within this wheel's range
val virtualId = expiration / tickMs
val bucket = buckets((virtualId % wheelSize.toLong).toInt)
bucket.add(timerTaskEntry)
// setExpiration returns true only when the bucket's expiration changes, i.e. the bucket was
// empty (reset by flush) or is being reused for a later rotation. Only then is it offered
// to the DelayQueue, so each bucket is enqueued at most once per rotation.
if (bucket.setExpiration(virtualId * tickMs)) {
queue.offer(bucket)
}
true
} else {
// Overflow: delegate to the higher-level wheel
if (overflowWheel == null) addOverflowWheel()
overflowWheel.add(timerTaskEntry)
}
}
// Advance the wheel's clock; cascade to overflow wheel
def advanceClock(timeMs: Long): Unit = {
if (timeMs >= currentTime + tickMs) {
currentTime = timeMs - (timeMs % tickMs)
if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
}
}
}
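The setExpiration check in add() is what keeps the DelayQueue small: a bucket is offered only when its expiration changes, so any number of tasks sharing a bucket in the same rotation cost a single offer. The sketch below isolates that bookkeeping; BucketClock and BucketOfferDemo are hypothetical stand-ins, not Kafka classes.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for TimerTaskList's expiration bookkeeping.
final class BucketClock {
  private val expiration = new AtomicLong(-1L)
  // True only when the stored expiration actually changes (same rule as setExpiration).
  def setExpiration(expirationMs: Long): Boolean = expiration.getAndSet(expirationMs) != expirationMs
  // flush() resets the bucket to "no expiration" after draining it.
  def reset(): Unit = expiration.set(-1L)
}

object BucketOfferDemo {
  val tickMs = 1L
  val wheelSize = 20
  val buckets: Array[BucketClock] = Array.fill(wheelSize)(new BucketClock)
  var offers = 0 // how many times a bucket would be offered to the DelayQueue

  // Same arithmetic as the in-range branch of TimingWheel.add.
  def addAt(expirationMs: Long): Int = {
    val virtualId = expirationMs / tickMs
    val index = (virtualId % wheelSize).toInt
    if (buckets(index).setExpiration(virtualId * tickMs)) offers += 1
    index
  }

  val first: Int = addAt(5L)         // first task in bucket 5: offered
  val second: Int = addAt(5L)        // same bucket, same rotation: not offered again
  val offersInFirstRotation: Int = offers
  buckets(first).reset()             // the bucket fires and is flushed
  val nextRotation: Int = addAt(25L) // bucket 5 again, one rotation later: offered again
}
```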
TimerTaskList — The Bucket (Circular Doubly-Linked List)
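// TimerTaskList.scala (simplified)
import java.util.concurrent.{Delayed, TimeUnit}
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import org.apache.kafka.common.utils.Time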
private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
// Sentinel/root node simplifies list manipulation
private[this] val root = new TimerTaskEntry(null, -1)
root.next = root
root.prev = root
private[this] val expiration = new AtomicLong(-1L)
// Returns true if the expiration changed: the bucket was empty (reset to -1 by flush) or is being reused for a new rotation
def setExpiration(expirationMs: Long): Boolean = expiration.getAndSet(expirationMs) != expirationMs
def getExpiration: Long = expiration.get
// O(1) insert at tail
def add(timerTaskEntry: TimerTaskEntry): Unit = {
var done = false
while (!done) {
timerTaskEntry.remove() // detach from old list if present
synchronized {
timerTaskEntry.synchronized {
if (timerTaskEntry.list == null) {
val tail = root.prev
timerTaskEntry.next = root
timerTaskEntry.prev = tail
timerTaskEntry.list = this
tail.next = timerTaskEntry
root.prev = timerTaskEntry
taskCounter.incrementAndGet()
done = true
}
}
}
}
}
// O(1) removal
def remove(timerTaskEntry: TimerTaskEntry): Unit = synchronized {
timerTaskEntry.synchronized {
if (timerTaskEntry.list eq this) {
timerTaskEntry.next.prev = timerTaskEntry.prev
timerTaskEntry.prev.next = timerTaskEntry.next
timerTaskEntry.next = null
timerTaskEntry.prev = null
timerTaskEntry.list = null
taskCounter.decrementAndGet()
}
}
}
// Drain all entries and apply f to each (cascade: re-insert into lower wheel, or execute)
def flush(f: (TimerTaskEntry) => Unit): Unit = synchronized {
var head = root.next
while (head ne root) {
remove(head)
f(head)
head = root.next
}
expiration.set(-1L)
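}
// Delayed interface: lets the DelayQueue order buckets by their expiration time
def getDelay(unit: TimeUnit): Long =
unit.convert(math.max(getExpiration - Time.SYSTEM.hiResClockMs, 0L), TimeUnit.MILLISECONDS)
def compareTo(d: Delayed): Int =
java.lang.Long.compare(getExpiration, d.asInstanceOf[TimerTaskList].getExpiration)
}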
SystemTimer — Driving the Whole System
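// SystemTimer.scala (simplified)
import java.util.concurrent.{DelayQueue, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantReadWriteLock
class SystemTimer(executorName: String, tickMs: Long = 1, wheelSize: Int = 20, ...) extends Timer {
// Single-threaded executor for running expired tasks
private[this] val taskExecutor = Executors.newFixedThreadPool(1, ...)
// DelayQueue: holds non-empty Buckets sorted by expiration time
// SystemTimer polls this to know exactly when to advance the clock
private[this] val delayQueue = new DelayQueue[TimerTaskList]()
// Task count shared by every level of the wheel
private[this] val taskCounter = new AtomicInteger(0)
// The first-level timing wheel
private[this] val timingWheel = new TimingWheel(
tickMs = tickMs,
wheelSize = wheelSize,
startMs = time.hiResClockMs,
taskCounter = taskCounter,
queue = delayQueue
)
// add() takes the read lock (many concurrent adders); advanceClock() takes the write lock
private[this] val readWriteLock = new ReentrantReadWriteLock()
private[this] val readLock = readWriteLock.readLock()
private[this] val writeLock = readWriteLock.writeLock()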
def add(timerTask: TimerTask): Unit = {
readLock.lock()
try {
addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + time.hiResClockMs))
} finally {
readLock.unlock()
}
}
private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
if (!timingWheel.add(timerTaskEntry)) {
// add() returns false → task has already expired or was cancelled
if (!timerTaskEntry.cancelled) {
taskExecutor.submit(timerTaskEntry.timerTask) // execute immediately
}
}
}
/**
* Advance the clock, processing all buckets that have become due.
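* Called in a loop by the purgatory's reaper thread. It blocks in DelayQueue.poll()
* until the next bucket is due or timeoutMs elapses, so there is no busy-polling.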
*/
def advanceClock(timeoutMs: Long): Boolean = {
// Block until the next bucket expires (or timeoutMs elapses)
var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
if (bucket != null) {
writeLock.lock()
try {
while (bucket != null) {
// Push the clock forward
timingWheel.advanceClock(bucket.getExpiration)
// Flush the bucket: re-insert tasks into a lower wheel, or execute immediately
bucket.flush(addTimerTaskEntry)
// Check for more expired buckets without blocking
bucket = delayQueue.poll()
}
} finally {
writeLock.unlock()
}
true
} else {
false
}
}
}
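The polling pattern in advanceClock() (one blocking poll, then non-blocking polls until nothing else is due) can be exercised on its own. In this sketch DemoBucket and ReaperLoopDemo are hypothetical stand-ins that carry only an expiration, and the timeouts are illustrative; in the real code each drained bucket advances the wheel and is flushed instead of being recorded.

```scala
import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}

// Hypothetical bucket stand-in: only an expiration time, like TimerTaskList's Delayed side.
final class DemoBucket(val expirationMs: Long) extends Delayed {
  override def getDelay(unit: TimeUnit): Long = {
    val nowMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
    unit.convert(math.max(expirationMs - nowMs, 0L), TimeUnit.MILLISECONDS)
  }
  // Order by expiration time, as TimerTaskList.compareTo does.
  override def compareTo(other: Delayed): Int =
    java.lang.Long.compare(expirationMs, other.asInstanceOf[DemoBucket].expirationMs)
}

object ReaperLoopDemo {
  val queue = new DelayQueue[DemoBucket]()

  // One reaper iteration: block up to timeoutMs for the first due bucket,
  // then drain every other bucket that is already due without blocking.
  def advanceClock(timeoutMs: Long): List[Long] = {
    var fired = List.empty[Long]
    var bucket = queue.poll(timeoutMs, TimeUnit.MILLISECONDS)
    while (bucket != null) {
      fired = bucket.expirationMs :: fired // real code: advance the wheel, then flush
      bucket = queue.poll()
    }
    fired.reverse
  }

  val baseMs: Long = TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
  queue.offer(new DemoBucket(baseMs - 5))     // already due
  queue.offer(new DemoBucket(baseMs - 1))     // already due
  queue.offer(new DemoBucket(baseMs + 60000)) // due in a minute
  val firstPass: List[Long] = advanceClock(200L) // returns at once with both due buckets
  val remaining: Int = queue.size
  val secondPass: List[Long] = advanceClock(10L) // nothing due: gives up after about 10 ms
}
```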
The Key Insight: DelayQueue on Buckets, Not Tasks
You might have noticed that TimingWheel also uses a DelayQueue, but critically, it holds Buckets rather than individual tasks. A bucket is offered to the queue at most once per rotation and can hold any number of tasks, so the queue holds at most about wheelSize entries per level no matter how many tasks are pending. The O(log n) cost of DelayQueue therefore scales with the number of non-empty buckets, not the number of tasks, and stays small regardless of load.
This design elegantly combines the strengths of both structures:
- TimingWheel: O(1) task insert and delete
- DelayQueue: precise knowledge of when the next bucket expires, avoiding busy-polling
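A quick way to see why this works is to count buckets. The sketch assumes, hypothetically, 100,000 tasks with random delays between 1ms and 30s, all inserted at time 0 with tickMs=1 and wheelSize=20, and counts the distinct (level, bucket expiration) pairs they occupy, which is the most entries the DelayQueue would hold for them.

```scala
import scala.util.Random

object BucketCountDemo {
  val tickMs = 1L
  val wheelSize = 20

  // (level, bucket expiration) for a task inserted at time 0, with every level's clock at 0.
  def bucketFor(delayMs: Long): (Int, Long) = {
    var level = 1
    var tick = tickMs
    while (delayMs >= tick * wheelSize) { // beyond this level's range: move up one level
      level += 1
      tick *= wheelSize
    }
    (level, (delayMs / tick) * tick)
  }

  private val rng = new Random(42)
  // 100,000 hypothetical delays between 1 ms and 30,000 ms.
  val delays: Vector[Long] = Vector.fill(100000)(1L + rng.nextInt(30000))
  val distinctBuckets: Int = delays.map(bucketFor).distinct.size
}
```

Whatever the exact count, it cannot exceed 60 here (19 + 19 + 19 buckets in Levels 1 to 3, plus 3 in Level 4), so each DelayQueue operation works on a heap of at most 60 entries, about 6 levels deep, instead of 100,000.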
DelayedOperation and Purgatory
The DelayedOperation Abstract Class
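// DelayedOperation.scala (core/, simplified)
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.{Lock, ReentrantLock}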
abstract class DelayedOperation(
override val delayMs: Long,
lockOpt: Option[Lock] = None
) extends TimerTask with Logging {
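private val completed = new AtomicBoolean(false)
// Lock the purgatory holds around tryComplete() (optionally supplied by the caller)
val lock: Lock = lockOpt.getOrElse(new ReentrantLock)
def isCompleted: Boolean = completed.get()
/**
* Check whether the operation can complete now.
* Implemented by subclasses to check the specific condition: if it holds,
* call forceComplete() and return its result; otherwise return false.
*/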
def tryComplete(): Boolean
/**
* Guarantee exactly-once completion (CAS protection).
* Called both by the timer (on timeout) and by condition-check code.
*/
def forceComplete(): Boolean = {
if (completed.compareAndSet(false, true)) {
cancel() // remove from TimingWheel (no-op if already expired)
onComplete() // subclass-defined completion logic (e.g., send response)
true
} else {
false // already completed by another path
}
}
// Called by TimingWheel when the timer fires
override def run(): Unit = {
if (forceComplete())
onExpiration() // optional subclass hook for timeout-specific logic
}
// Subclass defines what "done" means (e.g., DelayedProduce → invoke responseCallback)
def onComplete(): Unit
def onExpiration(): Unit = {}
}
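The exactly-once guarantee rests entirely on the compareAndSet in forceComplete(). The stripped-down sketch below (OneShotOperation and OneShotDemo are hypothetical; there is no timer or purgatory) races several threads, standing in for the timer thread and concurrent checkAndComplete() callers, on one operation and counts how many actually complete it.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

// Hypothetical, stripped-down operation: only the completion guard from forceComplete().
final class OneShotOperation {
  private val completed = new AtomicBoolean(false)
  val completions = new AtomicInteger(0) // how many times the completion logic ran

  def forceComplete(): Boolean =
    if (completed.compareAndSet(false, true)) {
      completions.incrementAndGet() // stands in for cancel() + onComplete()
      true
    } else false
}

object OneShotDemo {
  /** Races `racers` threads on one operation; returns (callers that won, completions that ran). */
  def race(racers: Int): (Int, Int) = {
    val op = new OneShotOperation
    val winners = new AtomicInteger(0)
    val start = new CountDownLatch(1)
    val pool = Executors.newFixedThreadPool(racers)
    (1 to racers).foreach { _ =>
      pool.execute(new Runnable {
        override def run(): Unit = {
          start.await() // release all racers at once
          if (op.forceComplete()) winners.incrementAndGet()
        }
      })
    }
    start.countDown()
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
    (winners.get, op.completions.get)
  }
}
```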
DelayedOperationPurgatory — Managing All Delayed Operations
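// DelayedOperationPurgatory.scala (core/, simplified)
import java.util.LinkedList
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger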
final class DelayedOperationPurgatory[T <: DelayedOperation](
purgatoryName: String,
brokerId: Int = 0,
purgeInterval: Int = 1000,
reaperEnabled: Boolean = true,
timerEnabled: Boolean = true
) extends Logging with KafkaMetricsGroup {
private[this] val timeoutTimer = if (timerEnabled) new SystemTimer(purgatoryName) else new MockTimer
// Watchers: map from Key → list of DelayedOperations watching that key
// When a condition that might satisfy an operation occurs, we look up by key
private val watchersForKey = new ConcurrentHashMap[Any, Watchers]()
private[this] val estimatedTotalOperations = new AtomicInteger(0)
/**
* Try to complete the operation immediately.
* If it cannot complete, register it with the timer and all relevant Watchers.
*/
def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
// Optimistic: try completing without locks
if (operation.tryComplete()) return true
var isCompletedByMe = false
operation.lock.lock()
try {
isCompletedByMe = operation.tryComplete()
if (!isCompletedByMe) {
// Register with each watch key
watchKeys.foreach { key =>
if (!operation.isCompleted) {
watchersFor(key).watch(operation)
}
}
// Add to timing wheel for timeout handling
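if (!operation.isCompleted) {
estimatedTotalOperations.incrementAndGet()
timeoutTimer.add(operation)
// A watcher may have completed the operation concurrently; drop its timer entry right away
if (operation.isCompleted) operation.cancel()
}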
}
} finally {
operation.lock.unlock()
}
isCompletedByMe
}
/**
* Called when a condition associated with a key may have been satisfied.
* Example: new data written to Partition P0 → checkAndComplete(TopicPartitionOperationKey(P0))
* Iterates all DelayedOperations watching that key and calls tryComplete() on each.
*/
def checkAndComplete(key: Any): Int = {
val watchers = inReadLock(removeWatchersLock) { watchersForKey.get(key) }
if (watchers == null) 0
else watchers.tryCompleteWatched()
}
private class Watchers(val key: Any) {
private[this] val operations = new LinkedList[T]()
def watch(t: T): Unit = synchronized { operations.add(t) }
def tryCompleteWatched(): Int = {
var completed = 0
synchronized {
val iter = operations.iterator()
while (iter.hasNext) {
val curr = iter.next()
if (curr.isCompleted) {
iter.remove() // already done, clean up
} else if (curr.tryComplete()) {
completed += 1
iter.remove() // just completed, clean up
}
}
}
completed
}
}
}
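The watcher mechanics can be reduced to a few lines. This is a hypothetical, timer-free sketch: WaitFor, MiniPurgatory and MiniPurgatoryDemo are illustrative names, and locking is simplified to one monitor per watcher list. It re-checks the condition after registering the watchers so that an event arriving between the first check and the registration is not missed.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ListBuffer

// Hypothetical delayed operation: completes once `condition()` holds.
final class WaitFor(condition: () => Boolean) {
  private val completed = new AtomicBoolean(false)
  def isCompleted: Boolean = completed.get
  // True only for the caller that actually completes it (CAS guard).
  def tryComplete(): Boolean = condition() && completed.compareAndSet(false, true)
}

// Hypothetical, timer-free purgatory: one watcher list per key.
final class MiniPurgatory {
  private val watchersForKey = new ConcurrentHashMap[Any, ListBuffer[WaitFor]]()
  private val newWatcherList = new java.util.function.Function[Any, ListBuffer[WaitFor]] {
    override def apply(key: Any): ListBuffer[WaitFor] = ListBuffer.empty[WaitFor]
  }

  def tryCompleteElseWatch(op: WaitFor, keys: Seq[Any]): Boolean =
    if (op.tryComplete()) true
    else {
      keys.foreach { key =>
        val watchers = watchersForKey.computeIfAbsent(key, newWatcherList)
        watchers.synchronized { watchers += op }
      }
      op.tryComplete() // re-check now that the operation is watched
    }

  /** Returns how many watched operations this call completed. */
  def checkAndComplete(key: Any): Int = {
    val watchers = watchersForKey.get(key)
    if (watchers == null) 0
    else watchers.synchronized {
      val completedNow = watchers.count(op => !op.isCompleted && op.tryComplete())
      watchers.filterInPlace(op => !op.isCompleted) // drop finished operations
      completedNow
    }
  }
}

object MiniPurgatoryDemo {
  @volatile var highWatermark = 0L
  val purgatory = new MiniPurgatory
  // Hypothetical produce waiting for the high watermark to reach offset 100.
  val produce = new WaitFor(() => highWatermark >= 100L)
  val completedImmediately: Boolean = purgatory.tryCompleteElseWatch(produce, Seq("tp0", "tp1"))
  val beforeAdvance: Int = purgatory.checkAndComplete("tp0")
  highWatermark = 100L
  val afterAdvance: Int = purgatory.checkAndComplete("tp0")
  val onOtherKey: Int = purgatory.checkAndComplete("tp1") // already done: only cleanup
}
```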
The Purgatory in Action: End-to-End Walk-Through
For an acks=-1 Produce request:
Producer sends message batch
│
↓
ReplicaManager.appendRecords()
│ Write to local log (LEO advances)
↓
Create DelayedProduce(timeout=30,000ms)
│
├── tryComplete() → false (HW has not advanced yet)
│
├── Register in Purgatory:
│ watchKeys = [TopicPartitionOperationKey(tp0), TopicPartitionOperationKey(tp1)]
│ watchersForKey[tp0].watch(delayedProduce), watchersForKey[tp1].watch(delayedProduce)
│ timeoutTimer.add(delayedProduce) ← enters Level 4 (30,000ms exceeds Level 3's 8,000ms range)
│
└── Suspended, waiting...
Follower sends FetchRequest (fetches data for tp0)
│
↓
Leader records follower's fetch offset; HW advances to cover the write
│
↓
delayedProducePurgatory.checkAndComplete(TopicPartitionOperationKey(tp0))
│
↓
DelayedProduce.tryComplete() → true (partition.highWatermark >= requiredOffset)
│ forceComplete() → cancel() + onComplete()
↓
responseCallback(produceResults) → response sent to Producer
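The timing wheel handles the timeout path: if the high watermark does not reach the required offset within 30 seconds (for example, because followers stop fetching), the timer fires DelayedProduce.run(), which calls forceComplete(); any partition still waiting for acknowledgements is reported to the producer with a REQUEST_TIMED_OUT error.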
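The completion condition and the timeout path for a multi-partition acks=-1 request can be sketched as below. PartitionStatus, ProduceCompletion and ProduceCompletionDemo are hypothetical names, and the error codes are plain strings standing in for Kafka's error enum; the rule itself (every partition's high watermark must reach its required offset, and anything still pending at timeout is reported as REQUEST_TIMED_OUT) follows the walk-through above.

```scala
// Hypothetical per-partition state for one acks=-1 produce request.
final case class PartitionStatus(requiredOffset: Long, var acksPending: Boolean = true)

object ProduceCompletion {
  /** Marks partitions whose high watermark reached the required offset; true when none are pending. */
  def tryComplete(status: Map[String, PartitionStatus], highWatermark: String => Long): Boolean = {
    status.foreach { case (tp, s) =>
      if (s.acksPending && highWatermark(tp) >= s.requiredOffset) s.acksPending = false
    }
    status.values.forall(s => !s.acksPending)
  }

  /** Timeout path: partitions still pending are reported as timed out. */
  def responseOnTimeout(status: Map[String, PartitionStatus]): Map[String, String] =
    status.map { case (tp, s) => tp -> (if (s.acksPending) "REQUEST_TIMED_OUT" else "NONE") }
}

object ProduceCompletionDemo {
  val status = Map("tp0" -> PartitionStatus(100L), "tp1" -> PartitionStatus(50L))
  val hw = scala.collection.mutable.Map("tp0" -> 80L, "tp1" -> 60L) // current high watermarks
  val beforeFollowerFetch: Boolean = ProduceCompletion.tryComplete(status, hw)
  val ifTimedOutNow: Map[String, String] = ProduceCompletion.responseOnTimeout(status)
  hw("tp0") = 100L // a follower fetch advances tp0's high watermark
  val afterFollowerFetch: Boolean = ProduceCompletion.tryComplete(status, hw)
}
```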
Comparison with Other Systems
Linux Kernel Timing Wheel
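The Linux kernel's classic timer wheel (used until the timer rework in Linux 4.8; API in include/linux/timer.h) was also hierarchical: 5 levels, a 256-slot first level followed by four 64-slot levels, covering 2^32 jiffies (about 49.7 days at HZ=1000). Kafka's implementation is simpler, using the same wheelSize at every level and creating levels lazily (with the defaults, a 30,000ms produce timeout already needs a fourth level), but the cascading design is the same.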
Netty's HashedWheelTimer
Netty's HashedWheelTimer is a single-level wheel augmented with a "round" counter: each bucket stores tasks from multiple rounds, distinguished by how many full wheel rotations must pass before they expire. This avoids the complexity of multi-level wheels, but expirations are only checked once per tick, so a task can fire up to one tick late (the default tick is 100ms), and every tick walks the entire bucket, decrementing the round counter of tasks that are not yet due. Kafka requires precise timeout semantics (a request must time out within timeout.ms), so this imprecision is unacceptable.
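The round-counter arithmetic is simple. The sketch below is a simplified model rather than Netty's code: RoundCounterWheel and place are hypothetical names, and the defaults assume HashedWheelTimer's 100ms tick and 512 ticks per wheel. A task due in 120s lands in bucket 176 with 2 rounds remaining, so the worker passes over it twice before firing it.

```scala
object RoundCounterWheel {
  /** (bucket index, remaining rounds) for a task due delayMs after currentTick; simplified model. */
  def place(delayMs: Long, currentTick: Long, tickMs: Long = 100L, ticksPerWheel: Int = 512): (Int, Long) = {
    val targetTick = currentTick + (delayMs + tickMs - 1) / tickMs // round up to a whole tick
    val rounds = (targetTick - currentTick) / ticksPerWheel        // full rotations to skip
    ((targetTick % ticksPerWheel).toInt, rounds)
  }
}
```

This is where a round-counter wheel spends its per-tick work: each time the hand reaches a bucket, the worker visits every entry in it, decrementing the counters of entries that are not yet due.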
Side-by-Side Comparison
| Approach | Insert | Delete | Precision | Suitable For |
|---|---|---|---|---|
| DelayQueue | O(log n) | O(n) (arbitrary removal) | Exact | Small task counts |
| HashedWheelTimer | O(1) | O(1) | Approximate | Tolerance for ±1 tick error |
| Hierarchical Timing Wheel | O(1) | O(1) | Exact | High concurrency + exact timeouts |
Performance Impact
According to JMH benchmarks referenced in Kafka's KIP-219 discussion, with 100,000 concurrent delayed operations:
- Throughput: ~7x higher than DelayQueue (the O(1) vs O(log n) gap is amplified at scale)
- P99 latency: ~50% lower (reduced lock contention and elimination of heap rebalancing)
What might appear to be an academic data structures choice is, in fact, one of the concrete engineering reasons Kafka maintains low latency under high load. Every acks=-1 produce request, every fetch.min.bytes consumer wait, and every join-group rebalance timeout flows through this timing wheel — getting it right at O(1) rather than O(log n) makes a measurable difference at Kafka's operating scale.