第 22 章

时间轮与延迟操作:Purgatory 的精妙设计

第22章:时间轮与延迟操作:Purgatory 的精妙设计

导读:时间轮为什么比传统定时器更适合 Kafka?

本章核心问题:时间轮为什么比传统定时器更适合 Kafka?

读完本章你将理解


Level 1 · 你需要知道的(1-3年经验)

单层时间轮

先理解单层时间轮,再扩展到分层。

想象一个钟表:表盘有 N 个刻度(Bucket),秒针每 tickMs 毫秒前进一格(Tick)。每个 Bucket 是一个双向链表,存放所有在"这个刻度到期"的任务。

Bucket[0]  Bucket[1]  Bucket[2]  ...  Bucket[N-1]
   │           │           │                │
[Task A]   [Task B]    [Task C]          [Empty]
[Task D]

局限:单层时间轮只能处理最大 tickMs * N 时间范围内的任务。超出范围的任务怎么办?

Purgatory 的工作全景

acks=-1 的 Produce 请求为例:

Producer 发送消息
    │
    ↓
ReplicaManager.appendRecords()
    │  写入本地 Log(LEO 推进)
    ↓
创建 DelayedProduce(timeout=30000ms)
    │
    ├── 尝试 tryComplete() → false(HW 还没推进)
    │
    ├── 注册到 Purgatory:
    │   watchKeys = [TopicPartitionOperationKey(tp0), TopicPartitionOperationKey(tp1), ...]
    │   watchersForKey[tp0].watch(delayedProduce)
    │   timeoutTimer.add(delayedProduce)  ← 加入时间轮
    │
    └── 挂起,等待...

Follower 发送 FetchRequest(获取 tp0 的新数据)
    │
    ↓
Leader 更新 Follower 的 fetchOffset
    │  如果所有 ISR 都已追上 → HW 推进
    ↓
delayedProducePurgatory.checkAndComplete(TopicPartitionOperationKey(tp0))
    │
    ↓
DelayedProduce.tryComplete() → true(HW >= requiredOffset)
    │  forceComplete() → cancel() + onComplete()
    ↓
responseCallback(produceResults) → 响应发送给 Producer

时间轮在这里处理超时场景:如果 Follower 迟迟不来 Fetch(或者根本没有 Follower),30 秒后时间轮触发 DelayedProduce.run(),强制完成操作并返回超时错误给 Producer。

与其他系统的对比

Linux 内核时间轮

Linux 内核也使用分层时间轮(include/linux/timer.h),共 5 层,每层 256 个 Bucket,总覆盖范围约 3.4 亿毫秒(4 天)。Kafka 的实现更简洁(只有 23 层),但核心思想一致。

Netty 的 HashedWheelTimer

Netty 的 HashedWheelTimer 是单层时间轮,但增加了"round"的概念(每个 Bucket 存储多轮的任务,用 round 计数器区分)。这避免了分层的复杂性,但精度较差(任务可能晚执行一轮)。Kafka 选择了分层方案,精度更高,代价是实现略复杂。

选择分层时间轮的理由

方案 插入 删除 精度 适用场景
DelayQueue O(log n) O(log n) 精确 任务数较少
HashedWheelTimer O(1) O(1) 有误差 允许一定误差
分层时间轮 O(1) O(1) 精确 高并发、精确超时

Kafka 的选择是明智的:高吞吐量场景下,数以万计的 DelayedProduce 同时存在,O(1) 的插入/删除是必须的;而消息交付语义对超时精度有要求(超时必须在 timeout.ms 内响应),所以 HashedWheelTimer 的近似精度不可接受。

性能数据

根据 Kafka 的 JMH 基准测试(在 KIP-219 的 PR 讨论中),时间轮方案相比 DelayQueue 在 10 万并发任务场景下:

这个看似"学术"的数据结构,是 Kafka 能在高负载下维持低延迟的重要基石之一。


Level 2 · 它是怎么运行的(3-5年经验)

Kafka 的实现:TimingWheel + SystemTimer

TimingWheel.scala

// TimingWheel.scala(core/src/main/scala/kafka/utils/timer/,简化)
private[timer] class TimingWheel(
  tickMs: Long,     // 每个 Bucket 对应的时间跨度(毫秒)
  wheelSize: Int,   // Bucket 数量
  startMs: Long,    // 时间轮创建时的当前时间
  taskCounter: AtomicInteger,        // 整个 Timer 中所有任务的计数器
  queue: DelayQueue[TimerTaskList]   // 用于驱动 Tick 的优先级队列
) {
  // 时间轮覆盖的总时间范围(tickMs * wheelSize)
  val interval: Long = tickMs * wheelSize

  // N 个 Bucket,每个 Bucket 是一个循环双向链表
  private[this] val buckets: Array[TimerTaskList] = Array.tabulate[TimerTaskList](wheelSize) {
    _ => new TimerTaskList(taskCounter)
  }

  // 当前时间(对齐到 tickMs)
  private[this] var currentTime: Long = startMs - (startMs % tickMs)

  // 溢出轮:当任务超出本层范围时,放入上层时间轮
  @volatile private[this] var overflowWheel: TimingWheel = null

  private[this] def addOverflowWheel(): Unit = {
    synchronized {
      if (overflowWheel == null) {
        // 上层时间轮的 tickMs = 本层的 interval
        overflowWheel = new TimingWheel(
          tickMs    = interval,
          wheelSize = wheelSize,
          startMs   = currentTime,
          taskCounter = taskCounter,
          queue     = queue
        )
      }
    }
  }

  def add(timerTask: TimerTask): Boolean = {
    val expiration = timerTask.expirationMs

    if (timerTask.cancelled) {
      // 任务已取消,直接忽略
      false
    } else if (expiration < currentTime + tickMs) {
      // 已经过期(或即将在下一 Tick 内到期),返回 false 让调用方立即执行
      false
    } else if (expiration < currentTime + interval) {
      // 在本层时间轮范围内
      val virtualId = expiration / tickMs
      val bucket = buckets((virtualId % wheelSize.toLong).toInt)
      bucket.add(timerTask)

      // 如果这个 Bucket 刚刚被设置了到期时间(之前是空的),加入 DelayQueue
      // DelayQueue 用于驱动上层 Tick(让 SystemTimer 知道何时需要处理这个 Bucket)
      if (bucket.setExpiration(virtualId * tickMs)) {
        queue.offer(bucket)
      }
      true
    } else {
      // 超出本层范围,放入溢出轮(上层时间轮)
      if (overflowWheel == null) addOverflowWheel()
      overflowWheel.add(timerTask)
    }
  }

  // 推进当前时间,处理到期的 Bucket(降级)
  def advanceClock(timeMs: Long): Unit = {
    if (timeMs >= currentTime + tickMs) {
      currentTime = timeMs - (timeMs % tickMs)
      // 同步推进溢出轮的时间
      if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
    }
  }
}

TimerTaskList — Bucket 实现(循环双向链表)

// TimerTaskList.scala(简化)
private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {

  // 哨兵节点(root),简化链表操作
  private[this] val root = new TimerTaskEntry(null, -1)
  root.next = root
  root.prev = root

  private[this] val expiration = new AtomicLong(-1L)

  def setExpiration(expirationMs: Long): Boolean = expiration.getAndSet(expirationMs) != expirationMs

  def getExpiration: Long = expiration.get

  // O(1) 插入
  def add(timerTaskEntry: TimerTaskEntry): Unit = {
    var done = false
    while (!done) {
      timerTaskEntry.remove()  // 先从旧 list 中移除(如果存在)
      synchronized {
        timerTaskEntry.synchronized {
          if (timerTaskEntry.list == null) {
            // 插入到 root 前面(链表尾部)
            val tail = root.prev
            timerTaskEntry.next = root
            timerTaskEntry.prev = tail
            timerTaskEntry.list = this
            tail.next = timerTaskEntry
            root.prev = timerTaskEntry
            taskCounter.incrementAndGet()
            done = true
          }
        }
      }
    }
  }

  // O(1) 删除
  def remove(timerTaskEntry: TimerTaskEntry): Unit = synchronized {
    timerTaskEntry.synchronized {
      if (timerTaskEntry.list eq this) {
        timerTaskEntry.next.prev = timerTaskEntry.prev
        timerTaskEntry.prev.next = timerTaskEntry.next
        timerTaskEntry.next = null
        timerTaskEntry.prev = null
        timerTaskEntry.list = null
        taskCounter.decrementAndGet()
      }
    }
  }

  // 刷新 Bucket:对所有任务调用 f(通常是重新 add 到低层时间轮,或执行到期任务)
  def flush(f: (TimerTaskEntry) => Unit): Unit = synchronized {
    var head = root.next
    while (head ne root) {
      remove(head)
      f(head)
      head = root.next
    }
    expiration.set(-1L)
  }
}

SystemTimer — 驱动整个时间轮系统

// SystemTimer.scala(简化)
class SystemTimer(executorName: String, tickMs: Long = 1, wheelSize: Int = 20, ...) extends Timer {

  // 单线程执行器:驱动时间轮 Tick
  private[this] val taskExecutor = Executors.newFixedThreadPool(1, ...)

  // DelayQueue:持有所有非空 Bucket,按到期时间排序
  // SystemTimer 从这里取到期的 Bucket,推进时钟
  private[this] val delayQueue = new DelayQueue[TimerTaskList]()

  // 一级时间轮
  private[this] val timingWheel = new TimingWheel(
    tickMs    = tickMs,
    wheelSize = wheelSize,
    startMs   = time.hiResClockMs,
    taskCounter = taskCounter,
    queue     = delayQueue
  )

  def add(timerTask: TimerTask): Unit = {
    readLock.lock()
    try {
      addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + time.hiResClockMs))
    } finally {
      readLock.unlock()
    }
  }

  private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
    if (!timingWheel.add(timerTaskEntry)) {
      // add() 返回 false 说明任务已过期(或即将过期)
      if (!timerTaskEntry.cancelled) {
        // 立即在线程池中执行
        taskExecutor.submit(timerTaskEntry.timerTask)
      }
    }
  }

  /**
   * 推进时钟,处理到期任务
   * 由外部调用,通常每 tickMs 调用一次(或由 DelayQueue 驱动)
   */
  def advanceClock(timeoutMs: Long): Boolean = {
    // 从 DelayQueue 取最近到期的 Bucket(阻塞等待最多 timeoutMs)
    var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
    if (bucket != null) {
      writeLock.lock()
      try {
        while (bucket != null) {
          // 推进时间轮的当前时间
          timingWheel.advanceClock(bucket.getExpiration)
          // 将这个 Bucket 里的任务重新投入时间轮(降级)或立即执行
          bucket.flush(addTimerTaskEntry)
          // 继续取下一个到期的 Bucket
          bucket = delayQueue.poll()
        }
      } finally {
        writeLock.unlock()
      }
      true
    } else {
      false
    }
  }
}

关键设计细节:DelayQueue 的使用

你可能注意到 TimingWheel 的实现用到了 DelayQueue——不是用它来管理所有任务(那样就退化了),而是用它来管理非空的 Bucket。整个时间轮系统中,非空 Bucket 的数量远小于任务数量(一个 Bucket 可以装无数个任务)。DelayQueueO(log n) 操作中的 n 是 Bucket 数,而不是任务数,因此开销极小。

这个设计精妙地结合了两种数据结构的优势:

DelayedOperation 与 Purgatory

DelayedOperation 抽象类

// DelayedOperation.scala(core/,简化)
abstract class DelayedOperation(
  override val delayMs: Long,
  lockOpt: Option[Lock] = None
) extends TimerTask with Logging {

  private val completed = new AtomicBoolean(false)

  /**
   * 尝试完成操作(条件满足时调用)
   * 子类实现具体的完成条件检查
   * 返回 true 表示可以完成,调用 forceComplete()
   */
  def tryComplete(): Boolean

  /**
   * 强制完成操作(超时或条件满足时调用)
   * 保证只执行一次(CAS 保护)
   */
  def forceComplete(): Boolean = {
    if (completed.compareAndSet(false, true)) {
      cancel()     // 从 TimingWheel 中取消定时器
      onComplete() // 执行实际的完成逻辑(发送响应等)
      true
    } else {
      false
    }
  }

  /**
   * 超时时由 TimingWheel 调用
   */
  override def run(): Unit = {
    if (forceComplete())
      onExpiration()  // 超时特有的处理(例如记录超时指标)
  }

  /**
   * 完成后的业务逻辑,由子类实现
   * 例如 DelayedProduce.onComplete() = 调用 responseCallback
   */
  def onComplete(): Unit

  def onExpiration(): Unit = {}
}

DelayedOperationPurgatory — 管理所有延迟操作

// DelayedOperationPurgatory.scala(core/,简化)
final class DelayedOperationPurgatory[T <: DelayedOperation](
  purgatoryName: String,
  brokerId: Int = 0,
  purgeInterval: Int = 1000,
  reaperEnabled: Boolean = true,
  timerEnabled: Boolean = true
) extends Logging with KafkaMetricsGroup {

  // 内部 Timer(SystemTimer 包装)
  private[this] val timeoutTimer = if (timerEnabled) new SystemTimer(purgatoryName) else new MockTimer

  // 监视器(Watcher):按 Key 组织的延迟操作列表
  // 每次可能触发完成的事件发生时,通过 Key 找到相关的 DelayedOperation 并检查
  private val watchersForKey = new ConcurrentHashMap[Any, Watchers]()

  // 当前 Purgatory 中未完成的延迟操作数
  private[this] val estimatedTotalOperations = new AtomicInteger(0)

  /**
   * 尝试立即完成操作;如果不能完成,注册到时间轮并添加监视器
   */
  def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
    // 先尝试立即完成
    if (operation.tryComplete()) return true

    // 不能立即完成,注册时间轮定时器
    var isCompletedByMe = false
    operation.lock.lock()
    try {
      isCompletedByMe = operation.tryComplete()
      if (!isCompletedByMe) {
        // 将操作添加到所有相关 Key 的监视器列表
        watchKeys.foreach { key =>
          if (operation.isCompleted) {
            // 如果并发完成,不需要注册了
          } else {
            val watcher = watchersFor(key)
            watcher.watch(operation)
          }
        }
        // 添加到时间轮(处理超时)
        if (!operation.isCompleted) {
          estimatedTotalOperations.incrementAndGet()
          timeoutTimer.add(operation)
        }
      }
    } finally {
      operation.lock.unlock()
    }
    isCompletedByMe
  }

  /**
   * 当某个 Key 相关的条件可能满足时,检查所有监视该 Key 的操作
   * 例如:新数据写入 Partition P0 → checkAndComplete(TopicPartitionOperationKey(P0))
   */
  def checkAndComplete(key: Any): Int = {
    val watchers = inReadLock(removeWatchersLock) {
      watchersForKey.get(key)
    }
    if (watchers == null) 0
    else watchers.tryCompleteWatched()
  }

  /**
   * Watcher 列表:监视同一个 Key 的所有延迟操作
   */
  private class Watchers(val key: Any) {
    private[this] val operations = new LinkedList[T]()

    def watch(t: T): Unit = {
      synchronized { operations.add(t) }
    }

    def tryCompleteWatched(): Int = {
      var completed = 0
      synchronized {
        val iter = operations.iterator()
        while (iter.hasNext) {
          val curr = iter.next()
          if (curr.isCompleted) {
            iter.remove()
          } else if (curr.tryComplete()) {
            completed += 1
            iter.remove()
          }
        }
      }
      completed
    }
  }
}

Level 3 · 规范怎么定义的(资深)

本章的协议与规范细节已融入上述 Level 中。如需深入了解,请参阅 Kafka 官方协议文档和对应 KIP 提案。


Level 4 · 边界与陷阱(所有人)

问题的规模

前面几章我们多次提到 DelayedProduceDelayedFetchDelayedJoin——这些延迟操作都需要在一定时间后超时,或者在某个条件满足时提前完成。在生产环境中,一个繁忙的 Kafka Broker 可能同时有数万个这样的延迟操作在等待。

管理这些定时任务,最直觉的方案是 Java 的 DelayQueue:一个基于最小堆(min-heap)的优先级队列,到期时间最早的任务排在堆顶。

DelayQueue 的问题

Kafka 选择了一个更精妙的数据结构:分层时间轮(Hierarchical Timing Wheel),插入和删除的时间复杂度均为 O(1)

分层时间轮:级联解决范围问题

分层时间轮是多个单层时间轮的嵌套,类似于钟表的秒针、分针、时针:

Level 1 (tickMs=1ms,   wheelSize=20, 覆盖范围: 20ms)
Level 2 (tickMs=20ms,  wheelSize=20, 覆盖范围: 400ms)
Level 3 (tickMs=400ms, wheelSize=20, 覆盖范围: 8000ms)

降级(Cascade):当 Level 2 的秒针 Tick 时,它的一个 Bucket 到期了。这个 Bucket 里的任务可能还有 15ms 才真正到期,需要重新投入 Level 1 的对应 Bucket。这个"重新投入低层时间轮"的过程叫做降级,也是分层时间轮的核心机制。

本章评分
4.5  / 5  (7 评分)

💬 留言讨论