第 8 章

watch 与调度器:异步更新队列、flush 策略与竞态清理

第8章:watch 与调度器——异步更新队列、flush 策略与竞态清理

Vue 3 的更新队列是一个去重的微任务队列:同一组件在一个宏任务内发生 100 次状态变化,只会触发 1 次 DOM 更新。这不是巧合,而是 queueJob + Promise.resolve() 精心设计的结果。

本章核心问题watch 的回调为什么不是"状态一变就立刻执行"?nextTick 的本质是什么?异步 watch 里的竞态问题如何安全处理?

读完本章你将理解


Level 1 · 你需要知道的(1-3年经验)

watch 与 watchEffect 的直觉对比

Vue 3 提供了两个侦听 API,它们在使用体验上有明显差别:

watchEffect:立即执行,自动追踪依赖

import { ref, watchEffect } from 'vue'

const count = ref(0)

// 立即执行一次,追踪 count.value
watchEffect(() => {
  console.log('count is:', count.value)  // 立即打印 "count is: 0"
})

count.value = 1  // 异步打印 "count is: 1"

watch:懒执行,显式指定监听源

import { ref, watch } from 'vue'

const count = ref(0)

// 不立即执行,等待 count 变化
watch(count, (newVal, oldVal) => {
  console.log(`count: ${oldVal} → ${newVal}`)
})

// 此时什么都不打印

count.value = 1  // 异步打印 "count: 0 → 1"

核心区别总结

特性 watchEffect watch
首次执行 立即(eager) 需要 immediate: true
依赖追踪 自动(运行时收集) 显式指定
回调参数 无 old/new 有 oldValue / newValue
主要用途 副作用同步 精确响应特定数据变化
停止 返回 stop 函数 返回 stop 函数

watch 的三种数据源

watch 支持三种数据源,行为各有不同:

const count = ref(0)
const state = reactive({ count: 0, name: 'Vue' })

// 1. 监听 ref(直接传入,不用 .value)
watch(count, (newVal, oldVal) => {
  console.log(newVal, oldVal)
})

// 2. 监听 reactive 对象(自动深度监听)
watch(state, (newState, oldState) => {
  // ⚠️ 注意:监听 reactive 时 newState === oldState
  // 因为是同一个 Proxy 对象
  console.log(newState.count)
})

// 3. 监听 getter 函数(精确控制,性能最优)
watch(
  () => state.count,  // 只监听 state.count,不监听 state.name
  (newVal, oldVal) => {
    console.log(`state.count: ${oldVal} → ${newVal}`)
  }
)

// 监听多个源(数组形式)
watch([count, () => state.name], ([newCount, newName], [oldCount, oldName]) => {
  console.log(newCount, newName)
})

监听 reactive 对象的陷阱

const state = reactive({ a: { b: 1 } })

// ❌ 错误:监听整个 reactive 对象,newVal 和 oldVal 是同一个引用
watch(state, (newVal, oldVal) => {
  console.log(newVal === oldVal)  // true!无法比较前后值
})

// ✅ 正确:用 getter 函数 + 返回基础值
watch(() => state.a.b, (newVal, oldVal) => {
  console.log(newVal, oldVal)  // 正确的前后值
})

// ✅ 正确:如果需要深度监听且关心前后状态,使用 deep 选项配合手动序列化
watch(() => JSON.stringify(state), (newStr, oldStr) => {
  // 代价:每次变化都序列化,有性能成本
})

flush 选项:控制回调执行时机

flush 选项决定 watch 回调在组件更新周期的哪个阶段执行:

// flush: 'pre'(默认值)
// 在组件更新前执行,读到的是更新前的 DOM
watch(count, callback, { flush: 'pre' })

// flush: 'post'
// 在组件更新后执行,读到的是最新的 DOM
// 等价于 watchPostEffect
watch(count, callback, { flush: 'post' })

// flush: 'sync'
// 依赖变化时同步立即执行,跳过调度器
// ⚠️ 危险:可能在一次状态更新中触发多次
watch(count, callback, { flush: 'sync' })

实际选择指南

// 需要在回调中访问更新后的 DOM?用 flush: 'post'
watch(list, async () => {
  await nextTick()  // 等价方案
  const el = document.querySelector('.list-item')
  // 此时 DOM 已更新
}, { flush: 'post' })

// 需要在 DOM 更新前做一些准备?用默认 'pre'
watch(isVisible, (val) => {
  if (val) {
    // 此时组件 DOM 还未更新,可以做 pre-transition 准备
    prepareAnimation()
  }
})

nextTick 的正确使用

import { ref, nextTick } from 'vue'

const count = ref(0)

async function increment() {
  count.value++
  count.value++
  count.value++
  
  // 此时 DOM 还未更新(批量更新还在队列里)
  console.log(document.getElementById('count')?.textContent) // 旧值
  
  await nextTick()
  
  // 此时 DOM 已更新
  console.log(document.getElementById('count')?.textContent) // "3"
}

Level 2 · 它是怎么运行的(3-5年经验)

调度器的三个核心函数

Vue 3 的批量更新机制建立在三个关键函数上:

1. queueJob(job):将一个更新任务加入队列

// 简化示意(packages/runtime-core/src/scheduler.ts)
const queue: SchedulerJob[] = []
let isFlushing = false
let isFlushPending = false

export function queueJob(job: SchedulerJob) {
  // 去重检查:同一个 job(用 id 标识)不加入两次
  if (
    !queue.length ||
    !queue.includes(job, isFlushing && job.allowRecurse ? flushIndex + 1 : flushIndex)
  ) {
    if (job.id == null) {
      queue.push(job)
    } else {
      // 按 id 排序插入(保证父组件先于子组件更新)
      queue.splice(findInsertionIndex(job.id), 0, job)
    }
    queueFlush()
  }
}

2. queueFlush():安排微任务刷新队列

function queueFlush() {
  if (!isFlushing && !isFlushPending) {
    isFlushPending = true
    // 关键:通过 Promise.resolve() 安排微任务
    currentFlushPromise = resolvedPromise.then(flushJobs)
  }
}

3. flushJobs():实际执行队列中的所有任务

function flushJobs(seen?: CountMap) {
  isFlushPending = false
  isFlushing = true
  
  // 排序:保证父组件在子组件之前更新
  queue.sort(comparator)
  
  try {
    for (flushIndex = 0; flushIndex < queue.length; flushIndex++) {
      const job = queue[flushIndex]
      if (job && job.active !== false) {
        callWithErrorHandling(job, null, ErrorCodes.SCHEDULER)
      }
    }
  } finally {
    flushIndex = 0
    queue.length = 0
    
    // 刷新 post-flush 队列(flush: 'post' 的 watch 回调)
    flushPostFlushCbs(seen)
    
    isFlushing = false
    currentFlushPromise = null
    
    // 如果刷新过程中又产生了新任务,继续刷新
    if (queue.length || pendingPostFlushCbs.length) {
      flushJobs(seen)
    }
  }
}

三函数协作的完整流程图

用户代码修改多个 ref
         │
         ▼
   ref.value = 1
   ref.value = 2   ─────►  triggerRefValue(ref)
   ref.value = 3            │
                            ▼
                    遍历 ref.dep 中的 effects
                            │
                    ┌───────┴──────────┐
                    ▼                  ▼
             组件 render effect    watch effect
                    │                  │
                    ▼                  ▼
             queueJob(renderJob)  queueJob(watchJob)
                    │                  │
                    └──────┬───────────┘
                           ▼
                     queue = [renderJob, watchJob]
                           │
                           ▼
                    queueFlush()(第一次调用)
                           │
                           ▼
                    isFlushPending = true
                    Promise.resolve().then(flushJobs)
                           │
                           │(微任务,当前同步代码执行完后)
                           ▼
                     flushJobs()
                           │
                    ┌──────┴──────────┐
                    ▼                 ▼
             watchJob执行         renderJob执行
             (flush:pre,先于render)  (更新 DOM)
                    │
                    ▼
             flushPostFlushCbs()
                    │
                    ▼
             watchJob执行(flush:post)

nextTick 的微任务本质

// packages/runtime-core/src/scheduler.ts
let resolvedPromise: Promise<any> = Promise.resolve() as any
let currentFlushPromise: Promise<void> | null = null

export function nextTick<T = void>(
  this: T,
  fn?: (this: T) => void,
): Promise<void> {
  const p = currentFlushPromise || resolvedPromise
  return fn ? p.then(this ? fn.bind(this) : fn) : p
}

关键理解

微任务 vs 宏任务对比

宏任务(MacroTask):setTimeout, setInterval, I/O, UI rendering
微任务(MicroTask):Promise.then, MutationObserver, queueMicrotask

执行顺序:
同步代码 → 所有微任务 → UI渲染 → 下一个宏任务
                ↑
         nextTick 在这里执行

Vue 3 刻意选择微任务(Promise)而非宏任务(setTimeout),原因是微任务在同一帧内执行,用户不会看到中间状态(避免闪烁)。

flush: 'pre' 与 flush: 'post' 的执行位置

同步代码执行 → count.value++ → queueJob(renderJob) + queueJob(watchPreJob)
                                       │
                                       ▼
                              Promise 微任务开始
                                       │
                              ┌────────┴─────────┐
                              ▼                  ▼
                    flush:'pre' watch回调    组件 render
                    (此时 DOM 未更新)      (DOM 更新)
                                                 │
                                                 ▼
                                      flushPostFlushCbs()
                                                 │
                                                 ▼
                                    flush:'post' watch回调
                                    (此时 DOM 已更新)

watch 的停止机制

const stop = watch(count, callback)

// 停止监听
stop()

stop() 的内部实现调用了 effect.stop()

// packages/reactivity/src/effect.ts
stop() {
  if (this.active) {
    cleanupEffect(this)  // 从所有 dep 中移除自身
    if (this.onStop) {
      this.onStop()      // 触发 onStop 钩子
    }
    this.active = false
  }
}

停止后,effect 的 active = false,即使依赖变化也不会触发回调。

竞态条件(Race Condition)的问题与解法

问题场景

// ❌ 有竞态风险的代码
const userId = ref(1)

watch(userId, async (newId) => {
  const data = await fetchUserData(newId)
  // ⚠️ 如果 userId 在请求期间改变了,
  // 后发先回的请求会覆盖先发后回的结果!
  userData.value = data
})

// 场景:
// t=0: userId=1,发起请求A
// t=100ms: userId=2,发起请求B
// t=200ms: 请求B返回(因为用户2数据小)
// t=500ms: 请求A返回(因为用户1数据大)
// 结果:userData 显示的是用户1的数据,但 userId 是 2!

正确解法:onCleanup + AbortController

// ✅ 正确处理竞态
const userId = ref(1)
const userData = ref(null)
const isLoading = ref(false)

watch(userId, async (newId, oldId, onCleanup) => {
  // 创建一个 AbortController
  const controller = new AbortController()
  
  // onCleanup 在以下情况触发:
  // 1. 下一次 watch 回调执行前(依赖变化时)
  // 2. watch 被停止时
  onCleanup(() => {
    controller.abort()  // 取消上一次还在进行的请求
  })
  
  isLoading.value = true
  
  try {
    const response = await fetch(`/api/users/${newId}`, {
      signal: controller.signal
    })
    
    if (response.ok) {
      userData.value = await response.json()
    }
  } catch (err) {
    if (err.name === 'AbortError') {
      // 请求被正常取消,忽略错误
      return
    }
    console.error('请求失败:', err)
  } finally {
    isLoading.value = false
  }
})

onCleanup 的触发时机图

userId = 1 → watch回调1开始 → 发出请求A
                  │
userId = 2 → watch回调1的 onCleanup 触发 → controller1.abort() → 请求A取消
           → watch回调2开始 → 发出请求B
                  │
userId = 3 → watch回调2的 onCleanup 触发 → controller2.abort() → 请求B取消
           → watch回调3开始 → 发出请求C
                  │
               请求C返回 → userData 更新 ✓

watchEffect 的依赖追踪与停止

// watchEffect 的 cleanup 与 watch 相同
const stop = watchEffect((onCleanup) => {
  const controller = new AbortController()
  
  onCleanup(() => controller.abort())
  
  fetch(`/api/data?q=${searchQuery.value}`, {
    signal: controller.signal
  }).then(res => res.json())
    .then(data => { results.value = data })
})

// 组件卸载时自动停止(在 setup 内创建的 watch/watchEffect 会绑定组件生命周期)
// 也可手动停止
stop()

Level 3 · 设计文档与源码(资深开发者)

queueJob 的 id 排序策略

文件路径packages/runtime-core/src/scheduler.ts

// 通过 job.id 排序,保证父组件先于子组件执行
// 子组件的 id 总是大于父组件(创建顺序决定 id)
function comparator(a: SchedulerJob, b: SchedulerJob): number {
  const diff = getId(a) - getId(b)
  if (diff === 0) {
    // 如果 id 相同,pre-flush job 先执行
    if (a.pre && !b.pre) return -1
    if (b.pre && !a.pre) return 1
  }
  return diff
}

function findInsertionIndex(id: number) {
  let start = flushIndex + 1
  let end = queue.length
  while (start < end) {
    const middle = (start + end) >>> 1
    const middleJob = queue[middle]
    const middleJobId = getId(middleJob)
    if (middleJobId < id || (middleJobId === id && middleJob.pre && !isFlushing)) {
      start = middle + 1
    } else {
      end = middle
    }
  }
  return start
}

排序的意义:父组件的 id 小于子组件,因此父组件先更新。如果父组件更新导致子组件的 props 改变(子组件需要因此更新),这个顺序保证了正确的更新级联。

watch 的内部实现:doWatch 函数

文件路径packages/runtime-core/src/apiWatch.ts

function doWatch(
  source: WatchSource | WatchSource[] | WatchEffect | object,
  cb: WatchCallback | null,
  { immediate, deep, flush, once, onTrack, onTrigger }: WatchOptions = EMPTY_OBJ,
): WatchStopHandle {
  
  // 1. 根据 source 类型构建 getter 函数
  let getter: () => any
  let forceTrigger = false
  let isMultiSource = false
  
  if (isRef(source)) {
    getter = () => source.value
    forceTrigger = isShallow(source)
  } else if (isReactive(source)) {
    getter = () => source
    deep = true  // reactive 对象自动深度监听
  } else if (isArray(source)) {
    isMultiSource = true
    forceTrigger = source.some(s => isReactive(s) || isShallow(s))
    getter = () => source.map(s => {
      if (isRef(s)) return s.value
      if (isReactive(s)) return traverse(s)
      if (isFunction(s)) return callWithErrorHandling(s, instance, ErrorCodes.WATCH_GETTER)
    })
  } else if (isFunction(source)) {
    if (cb) {
      // watch(fn, cb) 形式
      getter = () => callWithErrorHandling(source, instance, ErrorCodes.WATCH_GETTER)
    } else {
      // watchEffect(fn) 形式
      getter = () => {
        if (cleanup) {
          cleanup()  // 执行上一次的 cleanup
        }
        return callWithAsyncErrorHandling(
          source,
          instance,
          ErrorCodes.WATCH_CALLBACK,
          [onCleanup],
        )
      }
    }
  }
  
  // 2. 如果是 deep watch,包装 getter 以遍历所有属性
  if (cb && deep) {
    const baseGetter = getter
    getter = () => traverse(baseGetter())
  }
  
  // 3. 构建 job(调度器任务)
  const job: SchedulerJob = () => {
    if (!effect.active || !effect.dirty) {
      return
    }
    if (cb) {
      const newValue = effect.run()
      if (
        deep ||
        forceTrigger ||
        (isMultiSource
          ? (newValue as any[]).some((v, i) => hasChanged(v, (oldValue as any[])[i]))
          : hasChanged(newValue, oldValue))
      ) {
        if (cleanup) {
          cleanup()
        }
        callWithAsyncErrorHandling(cb, instance, ErrorCodes.WATCH_CALLBACK, [
          newValue,
          oldValue === INITIAL_WATCHER_VALUE
            ? undefined
            : (isMultiSource && oldValue[0] === INITIAL_WATCHER_VALUE)
              ? []
              : oldValue,
          onCleanup,
        ])
        oldValue = newValue
      }
    } else {
      effect.run()  // watchEffect 直接执行
    }
  }
  
  // 4. 根据 flush 策略设置 scheduler
  let scheduler: EffectScheduler
  if (flush === 'sync') {
    scheduler = job as any
  } else if (flush === 'post') {
    scheduler = () => queuePostFlushCb(job)
  } else {
    job.pre = true
    if (instance) job.id = instance.uid
    scheduler = () => queueJob(job)
  }
  
  // 5. 创建 ReactiveEffect
  const effect = new ReactiveEffect(getter, NOOP, scheduler)
  
  // 6. 根据 immediate 决定首次执行时机
  if (cb) {
    if (immediate) {
      job()
    } else {
      oldValue = effect.run()  // 只收集依赖,不执行 cb
    }
  } else if (flush === 'post') {
    queuePostFlushCb(effect.run.bind(effect))
  } else {
    effect.run()  // watchEffect 立即执行
  }
  
  // 7. 返回停止函数
  const unwatch = () => {
    effect.stop()
    if (instance && instance.scope) {
      remove(instance.scope.effects!, effect)
    }
  }
  
  return unwatch
}

组件 uid 与更新顺序的关系

每个 Vue 组件实例在创建时会分配一个自增的 uid

// packages/runtime-core/src/component.ts
let uid = 0

export function createComponentInstance(/* ... */) {
  const instance: ComponentInternalInstance = {
    uid: uid++,
    // ...
  }
}

组件的 render effect job.id = instance.uid,父组件先于子组件创建,因此 uid 更小,在 flushJobs 排序后父组件排在前面,保证了父→子的更新顺序。

这个设计解决了一个经典问题:如果子组件先更新,它的 vnode 可能引用了尚未更新的父组件 props,导致不一致。


Level 4 · 边界与陷阱(全体适用)

陷阱1:watch 监听 reactive 对象时 oldValue 失效

const state = reactive({ count: 0, name: 'Vue' })

watch(state, (newVal, oldVal) => {
  // ❌ newVal 和 oldVal 是同一个 Proxy 对象
  console.log(newVal === oldVal)  // true
  console.log(oldVal.count)       // 已经是新值!
})

state.count = 1  // 触发,但无法获得旧值

根因:reactive 对象是 Proxy,watch 内部 getter 返回同一个代理引用。即使 Vue 记录了 oldValue,它指向的也是同一个对象,该对象已经反映了最新状态。

解决方案

// ✅ 方案1:使用 getter 监听基础值
watch(() => state.count, (newVal, oldVal) => {
  console.log(newVal, oldVal)  // 正确的前后值
})

// ✅ 方案2:监听时返回克隆
watch(() => ({ ...state }), (newVal, oldVal) => {
  console.log(newVal.count, oldVal.count)  // 正确
})

陷阱2:flush: 'sync' 导致性能问题

const items = ref([])

// ❌ 危险:每次修改都同步执行 watch
watch(items, (newItems) => {
  processItems(newItems)  // 假设这个操作耗时 5ms
}, { flush: 'sync' })

// 下面的代码会导致 processItems 执行 1000 次!
for (let i = 0; i < 1000; i++) {
  items.value.push(i)  // 每次 push 都立即触发 watch
}
// 总耗时:5ms × 1000 = 5000ms

对比默认行为

watch(items, (newItems) => {
  processItems(newItems)
})  // 默认 flush: 'pre'

for (let i = 0; i < 1000; i++) {
  items.value.push(i)  // 只是 push,不触发 watch
}
// 下一个微任务时:processItems 执行 1 次,耗时 5ms

flush: 'sync' 只应在极少数需要立即响应的场景使用(如单元测试中验证同步行为)。

陷阱3:在 watch 外部创建的 watch 不会自动停止

// ❌ 在 setup 外部创建的 watch 不会随组件卸载而停止
export function useGlobalWatch() {
  // 如果这个函数在组件的 setup 之外调用(如模块顶层),
  // watch 不会在组件卸载时自动停止
  watch(someGlobalRef, callback)
}

// ✅ 正确:在 setup 内调用,或手动管理生命周期
export function useFeature() {
  const stop = watch(someRef, callback)
  
  // 组件卸载时停止
  onUnmounted(() => stop())
  
  return { stop }
}

setup() 内部调用 watch/watchEffect,Vue 会自动将 effect 注册到当前组件实例的 scope,组件卸载时自动清理。

陷阱4:异步 watchEffect 的依赖追踪只在同步部分

// ⚠️ 依赖追踪只在第一个 await 之前有效
watchEffect(async () => {
  const id = userId.value        // ✓ 追踪到 userId
  
  const data = await fetchUser(id)  // await 之后的代码...
  
  userData.value = data.result   // ✓ 这里是赋值,不需要追踪
  console.log(otherRef.value)    // ❌ 这里的 otherRef.value 不会被追踪!
  // 因为 await 之后 activeEffect 已经恢复为 null
})

根因ReactiveEffect.run() 在执行完同步部分后将 activeEffect 恢复为之前的值。async 函数在第一个 await 处暂停,此时 run() 函数已经返回,activeEffect 已经被清除。

解决方案:将需要追踪的依赖提前到 await 之前读取:

watchEffect(async () => {
  // 在 await 之前读取所有需要追踪的依赖
  const id = userId.value
  const filter = filterOptions.value  // 也要追踪 filterOptions
  
  const data = await fetchUser(id, filter)
  userData.value = data.result
})

本章小结

  1. watch 是懒的,watchEffect 是 eager 的:watch 首次不执行回调,只收集依赖;watchEffect 立即执行并收集依赖。这个差异决定了它们各自的适用场景——watch 用于响应已知数据源的变化(并关心前后值),watchEffect 用于同步副作用。

  2. 更新队列的核心是去重的微任务queueJob 去重 + Promise.resolve().then() 确保了同一宏任务内的多次状态变化只触发一次组件更新。这是 Vue 3 高性能的基础之一。

  3. flush 策略决定了 watch 回调与 DOM 更新的相对时序'pre' 在 DOM 更新前(无法访问最新 DOM),'post' 在 DOM 更新后(可以访问最新 DOM),'sync' 跳过队列立即执行(谨慎使用)。

  4. nextTick 的本质是微任务链:它等待当前的 flushJobs 完成,或直接返回一个已 resolved 的 Promise。理解这一点有助于正确判断 nextTick 之后 DOM 状态的稳定性。

  5. 竞态条件必须用 onCleanup + AbortController 处理:异步 watch 中依赖 stale 标志或顺序假设都不可靠。onCleanup 是 Vue 3 提供的正式竞态清理机制,配合 AbortController 可以完全解决异步请求的竞态问题。

本章评分
4.8  / 5  (45 评分)

💬 留言讨论