时间轮与延迟操作:Purgatory 的精妙设计
第22章:时间轮与延迟操作:Purgatory 的精妙设计
导读:时间轮为什么比传统定时器更适合 Kafka?
本章核心问题:时间轮为什么比传统定时器更适合 Kafka?
读完本章你将理解:
- 分层时间轮的 O(1) 操作
- DelayedProduce/DelayedFetch/DelayedJoin 生命周期
- Purgatory 的 tryComplete 机制
- 高并发场景下的性能优势
Level 1 · 你需要知道的(1-3年经验)
单层时间轮
先理解单层时间轮,再扩展到分层。
想象一个钟表:表盘有 N 个刻度(Bucket),秒针每 tickMs 毫秒前进一格(Tick)。每个 Bucket 是一个双向链表,存放所有在"这个刻度到期"的任务。
Bucket[0] Bucket[1] Bucket[2] ... Bucket[N-1]
│ │ │ │
[Task A] [Task B] [Task C] [Empty]
[Task D]
- 插入:计算任务到期时刻对应的 Bucket 编号
(expiration / tickMs) % N,插入到该 Bucket 的链表头部——O(1) - Tick:秒针前进,扫描当前 Bucket,执行(或重新入队)所有任务——
O(过期任务数) - 删除(提前完成):双向链表支持
O(1)删除,不需要重建堆
局限:单层时间轮只能处理最大 tickMs * N 时间范围内的任务。超出范围的任务怎么办?
Purgatory 的工作全景
以 acks=-1 的 Produce 请求为例:
Producer 发送消息
│
↓
ReplicaManager.appendRecords()
│ 写入本地 Log(LEO 推进)
↓
创建 DelayedProduce(timeout=30000ms)
│
├── 尝试 tryComplete() → false(HW 还没推进)
│
├── 注册到 Purgatory:
│ watchKeys = [TopicPartitionOperationKey(tp0), TopicPartitionOperationKey(tp1), ...]
│ watchersForKey[tp0].watch(delayedProduce)
│ timeoutTimer.add(delayedProduce) ← 加入时间轮
│
└── 挂起,等待...
Follower 发送 FetchRequest(获取 tp0 的新数据)
│
↓
Leader 更新 Follower 的 fetchOffset
│ 如果所有 ISR 都已追上 → HW 推进
↓
delayedProducePurgatory.checkAndComplete(TopicPartitionOperationKey(tp0))
│
↓
DelayedProduce.tryComplete() → true(HW >= requiredOffset)
│ forceComplete() → cancel() + onComplete()
↓
responseCallback(produceResults) → 响应发送给 Producer
时间轮在这里处理超时场景:如果 Follower 迟迟不来 Fetch(或者根本没有 Follower),30 秒后时间轮触发 DelayedProduce.run(),强制完成操作并返回超时错误给 Producer。
与其他系统的对比
Linux 内核时间轮
Linux 内核也使用分层时间轮(include/linux/timer.h),共 5 层,每层 256 个 Bucket,总覆盖范围约 3.4 亿毫秒(4 天)。Kafka 的实现更简洁(只有 23 层),但核心思想一致。
Netty 的 HashedWheelTimer
Netty 的 HashedWheelTimer 是单层时间轮,但增加了"round"的概念(每个 Bucket 存储多轮的任务,用 round 计数器区分)。这避免了分层的复杂性,但精度较差(任务可能晚执行一轮)。Kafka 选择了分层方案,精度更高,代价是实现略复杂。
选择分层时间轮的理由
| 方案 | 插入 | 删除 | 精度 | 适用场景 |
|---|---|---|---|---|
| DelayQueue | O(log n) | O(log n) | 精确 | 任务数较少 |
| HashedWheelTimer | O(1) | O(1) | 有误差 | 允许一定误差 |
| 分层时间轮 | O(1) | O(1) | 精确 | 高并发、精确超时 |
Kafka 的选择是明智的:高吞吐量场景下,数以万计的 DelayedProduce 同时存在,O(1) 的插入/删除是必须的;而消息交付语义对超时精度有要求(超时必须在 timeout.ms 内响应),所以 HashedWheelTimer 的近似精度不可接受。
性能数据
根据 Kafka 的 JMH 基准测试(在 KIP-219 的 PR 讨论中),时间轮方案相比 DelayQueue 在 10 万并发任务场景下:
- 吞吐量提升:~7x(因为 O(1) vs O(log n) 的差异在高并发下被放大)
- P99 延迟降低:~50%(减少了锁竞争和堆排序开销)
这个看似"学术"的数据结构,是 Kafka 能在高负载下维持低延迟的重要基石之一。
Level 2 · 它是怎么运行的(3-5年经验)
Kafka 的实现:TimingWheel + SystemTimer
TimingWheel.scala
// TimingWheel.scala(core/src/main/scala/kafka/utils/timer/,简化)
private[timer] class TimingWheel(
tickMs: Long, // 每个 Bucket 对应的时间跨度(毫秒)
wheelSize: Int, // Bucket 数量
startMs: Long, // 时间轮创建时的当前时间
taskCounter: AtomicInteger, // 整个 Timer 中所有任务的计数器
queue: DelayQueue[TimerTaskList] // 用于驱动 Tick 的优先级队列
) {
// 时间轮覆盖的总时间范围(tickMs * wheelSize)
val interval: Long = tickMs * wheelSize
// N 个 Bucket,每个 Bucket 是一个循环双向链表
private[this] val buckets: Array[TimerTaskList] = Array.tabulate[TimerTaskList](wheelSize) {
_ => new TimerTaskList(taskCounter)
}
// 当前时间(对齐到 tickMs)
private[this] var currentTime: Long = startMs - (startMs % tickMs)
// 溢出轮:当任务超出本层范围时,放入上层时间轮
@volatile private[this] var overflowWheel: TimingWheel = null
private[this] def addOverflowWheel(): Unit = {
synchronized {
if (overflowWheel == null) {
// 上层时间轮的 tickMs = 本层的 interval
overflowWheel = new TimingWheel(
tickMs = interval,
wheelSize = wheelSize,
startMs = currentTime,
taskCounter = taskCounter,
queue = queue
)
}
}
}
def add(timerTask: TimerTask): Boolean = {
val expiration = timerTask.expirationMs
if (timerTask.cancelled) {
// 任务已取消,直接忽略
false
} else if (expiration < currentTime + tickMs) {
// 已经过期(或即将在下一 Tick 内到期),返回 false 让调用方立即执行
false
} else if (expiration < currentTime + interval) {
// 在本层时间轮范围内
val virtualId = expiration / tickMs
val bucket = buckets((virtualId % wheelSize.toLong).toInt)
bucket.add(timerTask)
// 如果这个 Bucket 刚刚被设置了到期时间(之前是空的),加入 DelayQueue
// DelayQueue 用于驱动上层 Tick(让 SystemTimer 知道何时需要处理这个 Bucket)
if (bucket.setExpiration(virtualId * tickMs)) {
queue.offer(bucket)
}
true
} else {
// 超出本层范围,放入溢出轮(上层时间轮)
if (overflowWheel == null) addOverflowWheel()
overflowWheel.add(timerTask)
}
}
// 推进当前时间,处理到期的 Bucket(降级)
def advanceClock(timeMs: Long): Unit = {
if (timeMs >= currentTime + tickMs) {
currentTime = timeMs - (timeMs % tickMs)
// 同步推进溢出轮的时间
if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
}
}
}
TimerTaskList — Bucket 实现(循环双向链表)
// TimerTaskList.scala(简化)
private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
// 哨兵节点(root),简化链表操作
private[this] val root = new TimerTaskEntry(null, -1)
root.next = root
root.prev = root
private[this] val expiration = new AtomicLong(-1L)
def setExpiration(expirationMs: Long): Boolean = expiration.getAndSet(expirationMs) != expirationMs
def getExpiration: Long = expiration.get
// O(1) 插入
def add(timerTaskEntry: TimerTaskEntry): Unit = {
var done = false
while (!done) {
timerTaskEntry.remove() // 先从旧 list 中移除(如果存在)
synchronized {
timerTaskEntry.synchronized {
if (timerTaskEntry.list == null) {
// 插入到 root 前面(链表尾部)
val tail = root.prev
timerTaskEntry.next = root
timerTaskEntry.prev = tail
timerTaskEntry.list = this
tail.next = timerTaskEntry
root.prev = timerTaskEntry
taskCounter.incrementAndGet()
done = true
}
}
}
}
}
// O(1) 删除
def remove(timerTaskEntry: TimerTaskEntry): Unit = synchronized {
timerTaskEntry.synchronized {
if (timerTaskEntry.list eq this) {
timerTaskEntry.next.prev = timerTaskEntry.prev
timerTaskEntry.prev.next = timerTaskEntry.next
timerTaskEntry.next = null
timerTaskEntry.prev = null
timerTaskEntry.list = null
taskCounter.decrementAndGet()
}
}
}
// 刷新 Bucket:对所有任务调用 f(通常是重新 add 到低层时间轮,或执行到期任务)
def flush(f: (TimerTaskEntry) => Unit): Unit = synchronized {
var head = root.next
while (head ne root) {
remove(head)
f(head)
head = root.next
}
expiration.set(-1L)
}
}
SystemTimer — 驱动整个时间轮系统
// SystemTimer.scala(简化)
class SystemTimer(executorName: String, tickMs: Long = 1, wheelSize: Int = 20, ...) extends Timer {
// 单线程执行器:驱动时间轮 Tick
private[this] val taskExecutor = Executors.newFixedThreadPool(1, ...)
// DelayQueue:持有所有非空 Bucket,按到期时间排序
// SystemTimer 从这里取到期的 Bucket,推进时钟
private[this] val delayQueue = new DelayQueue[TimerTaskList]()
// 一级时间轮
private[this] val timingWheel = new TimingWheel(
tickMs = tickMs,
wheelSize = wheelSize,
startMs = time.hiResClockMs,
taskCounter = taskCounter,
queue = delayQueue
)
def add(timerTask: TimerTask): Unit = {
readLock.lock()
try {
addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + time.hiResClockMs))
} finally {
readLock.unlock()
}
}
private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
if (!timingWheel.add(timerTaskEntry)) {
// add() 返回 false 说明任务已过期(或即将过期)
if (!timerTaskEntry.cancelled) {
// 立即在线程池中执行
taskExecutor.submit(timerTaskEntry.timerTask)
}
}
}
/**
* 推进时钟,处理到期任务
* 由外部调用,通常每 tickMs 调用一次(或由 DelayQueue 驱动)
*/
def advanceClock(timeoutMs: Long): Boolean = {
// 从 DelayQueue 取最近到期的 Bucket(阻塞等待最多 timeoutMs)
var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
if (bucket != null) {
writeLock.lock()
try {
while (bucket != null) {
// 推进时间轮的当前时间
timingWheel.advanceClock(bucket.getExpiration)
// 将这个 Bucket 里的任务重新投入时间轮(降级)或立即执行
bucket.flush(addTimerTaskEntry)
// 继续取下一个到期的 Bucket
bucket = delayQueue.poll()
}
} finally {
writeLock.unlock()
}
true
} else {
false
}
}
}
关键设计细节:DelayQueue 的使用
你可能注意到 TimingWheel 的实现用到了 DelayQueue——不是用它来管理所有任务(那样就退化了),而是用它来管理非空的 Bucket。整个时间轮系统中,非空 Bucket 的数量远小于任务数量(一个 Bucket 可以装无数个任务)。DelayQueue 的 O(log n) 操作中的 n 是 Bucket 数,而不是任务数,因此开销极小。
这个设计精妙地结合了两种数据结构的优势:
TimingWheel:O(1)的任务插入和删除DelayQueue:精确地知道"下一个 Bucket 什么时候到期",避免空转轮询
DelayedOperation 与 Purgatory
DelayedOperation 抽象类
// DelayedOperation.scala(core/,简化)
abstract class DelayedOperation(
override val delayMs: Long,
lockOpt: Option[Lock] = None
) extends TimerTask with Logging {
private val completed = new AtomicBoolean(false)
/**
* 尝试完成操作(条件满足时调用)
* 子类实现具体的完成条件检查
* 返回 true 表示可以完成,调用 forceComplete()
*/
def tryComplete(): Boolean
/**
* 强制完成操作(超时或条件满足时调用)
* 保证只执行一次(CAS 保护)
*/
def forceComplete(): Boolean = {
if (completed.compareAndSet(false, true)) {
cancel() // 从 TimingWheel 中取消定时器
onComplete() // 执行实际的完成逻辑(发送响应等)
true
} else {
false
}
}
/**
* 超时时由 TimingWheel 调用
*/
override def run(): Unit = {
if (forceComplete())
onExpiration() // 超时特有的处理(例如记录超时指标)
}
/**
* 完成后的业务逻辑,由子类实现
* 例如 DelayedProduce.onComplete() = 调用 responseCallback
*/
def onComplete(): Unit
def onExpiration(): Unit = {}
}
DelayedOperationPurgatory — 管理所有延迟操作
// DelayedOperationPurgatory.scala(core/,简化)
final class DelayedOperationPurgatory[T <: DelayedOperation](
purgatoryName: String,
brokerId: Int = 0,
purgeInterval: Int = 1000,
reaperEnabled: Boolean = true,
timerEnabled: Boolean = true
) extends Logging with KafkaMetricsGroup {
// 内部 Timer(SystemTimer 包装)
private[this] val timeoutTimer = if (timerEnabled) new SystemTimer(purgatoryName) else new MockTimer
// 监视器(Watcher):按 Key 组织的延迟操作列表
// 每次可能触发完成的事件发生时,通过 Key 找到相关的 DelayedOperation 并检查
private val watchersForKey = new ConcurrentHashMap[Any, Watchers]()
// 当前 Purgatory 中未完成的延迟操作数
private[this] val estimatedTotalOperations = new AtomicInteger(0)
/**
* 尝试立即完成操作;如果不能完成,注册到时间轮并添加监视器
*/
def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
// 先尝试立即完成
if (operation.tryComplete()) return true
// 不能立即完成,注册时间轮定时器
var isCompletedByMe = false
operation.lock.lock()
try {
isCompletedByMe = operation.tryComplete()
if (!isCompletedByMe) {
// 将操作添加到所有相关 Key 的监视器列表
watchKeys.foreach { key =>
if (operation.isCompleted) {
// 如果并发完成,不需要注册了
} else {
val watcher = watchersFor(key)
watcher.watch(operation)
}
}
// 添加到时间轮(处理超时)
if (!operation.isCompleted) {
estimatedTotalOperations.incrementAndGet()
timeoutTimer.add(operation)
}
}
} finally {
operation.lock.unlock()
}
isCompletedByMe
}
/**
* 当某个 Key 相关的条件可能满足时,检查所有监视该 Key 的操作
* 例如:新数据写入 Partition P0 → checkAndComplete(TopicPartitionOperationKey(P0))
*/
def checkAndComplete(key: Any): Int = {
val watchers = inReadLock(removeWatchersLock) {
watchersForKey.get(key)
}
if (watchers == null) 0
else watchers.tryCompleteWatched()
}
/**
* Watcher 列表:监视同一个 Key 的所有延迟操作
*/
private class Watchers(val key: Any) {
private[this] val operations = new LinkedList[T]()
def watch(t: T): Unit = {
synchronized { operations.add(t) }
}
def tryCompleteWatched(): Int = {
var completed = 0
synchronized {
val iter = operations.iterator()
while (iter.hasNext) {
val curr = iter.next()
if (curr.isCompleted) {
iter.remove()
} else if (curr.tryComplete()) {
completed += 1
iter.remove()
}
}
}
completed
}
}
}
Level 3 · 规范怎么定义的(资深)
本章的协议与规范细节已融入上述 Level 中。如需深入了解,请参阅 Kafka 官方协议文档和对应 KIP 提案。
Level 4 · 边界与陷阱(所有人)
问题的规模
前面几章我们多次提到 DelayedProduce、DelayedFetch、DelayedJoin——这些延迟操作都需要在一定时间后超时,或者在某个条件满足时提前完成。在生产环境中,一个繁忙的 Kafka Broker 可能同时有数万个这样的延迟操作在等待。
管理这些定时任务,最直觉的方案是 Java 的 DelayQueue:一个基于最小堆(min-heap)的优先级队列,到期时间最早的任务排在堆顶。
DelayQueue 的问题:
- 插入和删除的时间复杂度均为
O(log n) - 当 n = 100,000 时,每次操作需要 17 次比较
- Kafka 每秒可能产生数万个 DelayedProduce,插入速率极高
- 大多数延迟操作在超时前就会被"提前完成"(
tryComplete()成功),需要频繁从堆中删除
Kafka 选择了一个更精妙的数据结构:分层时间轮(Hierarchical Timing Wheel),插入和删除的时间复杂度均为 O(1)。
分层时间轮:级联解决范围问题
分层时间轮是多个单层时间轮的嵌套,类似于钟表的秒针、分针、时针:
Level 1 (tickMs=1ms, wheelSize=20, 覆盖范围: 20ms)
Level 2 (tickMs=20ms, wheelSize=20, 覆盖范围: 400ms)
Level 3 (tickMs=400ms, wheelSize=20, 覆盖范围: 8000ms)
- 延迟 ≤ 20ms 的任务放入 Level 1
- 延迟 20~400ms 的任务放入 Level 2
- 延迟 400~8000ms 的任务放入 Level 3
- 更长的延迟:继续向上扩展
降级(Cascade):当 Level 2 的秒针 Tick 时,它的一个 Bucket 到期了。这个 Bucket 里的任务可能还有 15ms 才真正到期,需要重新投入 Level 1 的对应 Bucket。这个"重新投入低层时间轮"的过程叫做降级,也是分层时间轮的核心机制。