watch 与调度器:异步更新队列、flush 策略与竞态清理
第8章:watch 与调度器——异步更新队列、flush 策略与竞态清理
Vue 3 的更新队列是一个去重的微任务队列:同一组件在一个宏任务内发生 100 次状态变化,只会触发 1 次 DOM 更新。这不是巧合,而是
queueJob+Promise.resolve()精心设计的结果。
本章核心问题:watch 的回调为什么不是"状态一变就立刻执行"?nextTick 的本质是什么?异步 watch 里的竞态问题如何安全处理?
读完本章你将理解:
watch与watchEffect的本质区别及适用场景- 调度器(Scheduler)的三个核心函数如何协作实现批量更新
flush: 'pre'/'post'/'sync'三种策略的行为差异与适用场景nextTick的实现原理与微任务队列- 使用
onCleanup+AbortController正确处理竞态条件
Level 1 · 你需要知道的(1-3年经验)
watch 与 watchEffect 的直觉对比
Vue 3 提供了两个侦听 API,它们在使用体验上有明显差别:
watchEffect:立即执行,自动追踪依赖
import { ref, watchEffect } from 'vue'
const count = ref(0)
// 立即执行一次,追踪 count.value
watchEffect(() => {
console.log('count is:', count.value) // 立即打印 "count is: 0"
})
count.value = 1 // 异步打印 "count is: 1"
watch:懒执行,显式指定监听源
import { ref, watch } from 'vue'
const count = ref(0)
// 不立即执行,等待 count 变化
watch(count, (newVal, oldVal) => {
console.log(`count: ${oldVal} → ${newVal}`)
})
// 此时什么都不打印
count.value = 1 // 异步打印 "count: 0 → 1"
核心区别总结:
| 特性 | watchEffect | watch |
|---|---|---|
| 首次执行 | 立即(eager) | 需要 immediate: true |
| 依赖追踪 | 自动(运行时收集) | 显式指定 |
| 回调参数 | 无 old/new | 有 oldValue / newValue |
| 主要用途 | 副作用同步 | 精确响应特定数据变化 |
| 停止 | 返回 stop 函数 | 返回 stop 函数 |
watch 的三种数据源
watch 支持三种数据源,行为各有不同:
const count = ref(0)
const state = reactive({ count: 0, name: 'Vue' })
// 1. 监听 ref(直接传入,不用 .value)
watch(count, (newVal, oldVal) => {
console.log(newVal, oldVal)
})
// 2. 监听 reactive 对象(自动深度监听)
watch(state, (newState, oldState) => {
// ⚠️ 注意:监听 reactive 时 newState === oldState
// 因为是同一个 Proxy 对象
console.log(newState.count)
})
// 3. 监听 getter 函数(精确控制,性能最优)
watch(
() => state.count, // 只监听 state.count,不监听 state.name
(newVal, oldVal) => {
console.log(`state.count: ${oldVal} → ${newVal}`)
}
)
// 监听多个源(数组形式)
watch([count, () => state.name], ([newCount, newName], [oldCount, oldName]) => {
console.log(newCount, newName)
})
监听 reactive 对象的陷阱:
const state = reactive({ a: { b: 1 } })
// ❌ 错误:监听整个 reactive 对象,newVal 和 oldVal 是同一个引用
watch(state, (newVal, oldVal) => {
console.log(newVal === oldVal) // true!无法比较前后值
})
// ✅ 正确:用 getter 函数 + 返回基础值
watch(() => state.a.b, (newVal, oldVal) => {
console.log(newVal, oldVal) // 正确的前后值
})
// ✅ 正确:如果需要深度监听且关心前后状态,使用 deep 选项配合手动序列化
watch(() => JSON.stringify(state), (newStr, oldStr) => {
// 代价:每次变化都序列化,有性能成本
})
flush 选项:控制回调执行时机
flush 选项决定 watch 回调在组件更新周期的哪个阶段执行:
// flush: 'pre'(默认值)
// 在组件更新前执行,读到的是更新前的 DOM
watch(count, callback, { flush: 'pre' })
// flush: 'post'
// 在组件更新后执行,读到的是最新的 DOM
// 等价于 watchPostEffect
watch(count, callback, { flush: 'post' })
// flush: 'sync'
// 依赖变化时同步立即执行,跳过调度器
// ⚠️ 危险:可能在一次状态更新中触发多次
watch(count, callback, { flush: 'sync' })
实际选择指南:
// 需要在回调中访问更新后的 DOM?用 flush: 'post'
watch(list, async () => {
await nextTick() // 等价方案
const el = document.querySelector('.list-item')
// 此时 DOM 已更新
}, { flush: 'post' })
// 需要在 DOM 更新前做一些准备?用默认 'pre'
watch(isVisible, (val) => {
if (val) {
// 此时组件 DOM 还未更新,可以做 pre-transition 准备
prepareAnimation()
}
})
nextTick 的正确使用
import { ref, nextTick } from 'vue'
const count = ref(0)
async function increment() {
count.value++
count.value++
count.value++
// 此时 DOM 还未更新(批量更新还在队列里)
console.log(document.getElementById('count')?.textContent) // 旧值
await nextTick()
// 此时 DOM 已更新
console.log(document.getElementById('count')?.textContent) // "3"
}
Level 2 · 它是怎么运行的(3-5年经验)
调度器的三个核心函数
Vue 3 的批量更新机制建立在三个关键函数上:
1. queueJob(job):将一个更新任务加入队列
// 简化示意(packages/runtime-core/src/scheduler.ts)
const queue: SchedulerJob[] = []
let isFlushing = false
let isFlushPending = false
export function queueJob(job: SchedulerJob) {
// 去重检查:同一个 job(用 id 标识)不加入两次
if (
!queue.length ||
!queue.includes(job, isFlushing && job.allowRecurse ? flushIndex + 1 : flushIndex)
) {
if (job.id == null) {
queue.push(job)
} else {
// 按 id 排序插入(保证父组件先于子组件更新)
queue.splice(findInsertionIndex(job.id), 0, job)
}
queueFlush()
}
}
2. queueFlush():安排微任务刷新队列
function queueFlush() {
if (!isFlushing && !isFlushPending) {
isFlushPending = true
// 关键:通过 Promise.resolve() 安排微任务
currentFlushPromise = resolvedPromise.then(flushJobs)
}
}
3. flushJobs():实际执行队列中的所有任务
function flushJobs(seen?: CountMap) {
isFlushPending = false
isFlushing = true
// 排序:保证父组件在子组件之前更新
queue.sort(comparator)
try {
for (flushIndex = 0; flushIndex < queue.length; flushIndex++) {
const job = queue[flushIndex]
if (job && job.active !== false) {
callWithErrorHandling(job, null, ErrorCodes.SCHEDULER)
}
}
} finally {
flushIndex = 0
queue.length = 0
// 刷新 post-flush 队列(flush: 'post' 的 watch 回调)
flushPostFlushCbs(seen)
isFlushing = false
currentFlushPromise = null
// 如果刷新过程中又产生了新任务,继续刷新
if (queue.length || pendingPostFlushCbs.length) {
flushJobs(seen)
}
}
}
三函数协作的完整流程图
用户代码修改多个 ref
│
▼
ref.value = 1
ref.value = 2 ─────► triggerRefValue(ref)
ref.value = 3 │
▼
遍历 ref.dep 中的 effects
│
┌───────┴──────────┐
▼ ▼
组件 render effect watch effect
│ │
▼ ▼
queueJob(renderJob) queueJob(watchJob)
│ │
└──────┬───────────┘
▼
queue = [renderJob, watchJob]
│
▼
queueFlush()(第一次调用)
│
▼
isFlushPending = true
Promise.resolve().then(flushJobs)
│
│(微任务,当前同步代码执行完后)
▼
flushJobs()
│
┌──────┴──────────┐
▼ ▼
watchJob执行 renderJob执行
(flush:pre,先于render) (更新 DOM)
│
▼
flushPostFlushCbs()
│
▼
watchJob执行(flush:post)
nextTick 的微任务本质
// packages/runtime-core/src/scheduler.ts
let resolvedPromise: Promise<any> = Promise.resolve() as any
let currentFlushPromise: Promise<void> | null = null
export function nextTick<T = void>(
this: T,
fn?: (this: T) => void,
): Promise<void> {
const p = currentFlushPromise || resolvedPromise
return fn ? p.then(this ? fn.bind(this) : fn) : p
}
关键理解:
- 如果当前有正在刷新的任务(
currentFlushPromise不为 null),nextTick 等待它完成后再执行 - 如果没有正在刷新的任务,等待
resolvedPromise(已 resolved 的 Promise)完成——这意味着回调会在下一个微任务时机执行
微任务 vs 宏任务对比:
宏任务(MacroTask):setTimeout, setInterval, I/O, UI rendering
微任务(MicroTask):Promise.then, MutationObserver, queueMicrotask
执行顺序:
同步代码 → 所有微任务 → UI渲染 → 下一个宏任务
↑
nextTick 在这里执行
Vue 3 刻意选择微任务(Promise)而非宏任务(setTimeout),原因是微任务在同一帧内执行,用户不会看到中间状态(避免闪烁)。
flush: 'pre' 与 flush: 'post' 的执行位置
同步代码执行 → count.value++ → queueJob(renderJob) + queueJob(watchPreJob)
│
▼
Promise 微任务开始
│
┌────────┴─────────┐
▼ ▼
flush:'pre' watch回调 组件 render
(此时 DOM 未更新) (DOM 更新)
│
▼
flushPostFlushCbs()
│
▼
flush:'post' watch回调
(此时 DOM 已更新)
watch 的停止机制
const stop = watch(count, callback)
// 停止监听
stop()
stop() 的内部实现调用了 effect.stop():
// packages/reactivity/src/effect.ts
stop() {
if (this.active) {
cleanupEffect(this) // 从所有 dep 中移除自身
if (this.onStop) {
this.onStop() // 触发 onStop 钩子
}
this.active = false
}
}
停止后,effect 的 active = false,即使依赖变化也不会触发回调。
竞态条件(Race Condition)的问题与解法
问题场景:
// ❌ 有竞态风险的代码
const userId = ref(1)
watch(userId, async (newId) => {
const data = await fetchUserData(newId)
// ⚠️ 如果 userId 在请求期间改变了,
// 后发先回的请求会覆盖先发后回的结果!
userData.value = data
})
// 场景:
// t=0: userId=1,发起请求A
// t=100ms: userId=2,发起请求B
// t=200ms: 请求B返回(因为用户2数据小)
// t=500ms: 请求A返回(因为用户1数据大)
// 结果:userData 显示的是用户1的数据,但 userId 是 2!
正确解法:onCleanup + AbortController:
// ✅ 正确处理竞态
const userId = ref(1)
const userData = ref(null)
const isLoading = ref(false)
watch(userId, async (newId, oldId, onCleanup) => {
// 创建一个 AbortController
const controller = new AbortController()
// onCleanup 在以下情况触发:
// 1. 下一次 watch 回调执行前(依赖变化时)
// 2. watch 被停止时
onCleanup(() => {
controller.abort() // 取消上一次还在进行的请求
})
isLoading.value = true
try {
const response = await fetch(`/api/users/${newId}`, {
signal: controller.signal
})
if (response.ok) {
userData.value = await response.json()
}
} catch (err) {
if (err.name === 'AbortError') {
// 请求被正常取消,忽略错误
return
}
console.error('请求失败:', err)
} finally {
isLoading.value = false
}
})
onCleanup 的触发时机图:
userId = 1 → watch回调1开始 → 发出请求A
│
userId = 2 → watch回调1的 onCleanup 触发 → controller1.abort() → 请求A取消
→ watch回调2开始 → 发出请求B
│
userId = 3 → watch回调2的 onCleanup 触发 → controller2.abort() → 请求B取消
→ watch回调3开始 → 发出请求C
│
请求C返回 → userData 更新 ✓
watchEffect 的依赖追踪与停止
// watchEffect 的 cleanup 与 watch 相同
const stop = watchEffect((onCleanup) => {
const controller = new AbortController()
onCleanup(() => controller.abort())
fetch(`/api/data?q=${searchQuery.value}`, {
signal: controller.signal
}).then(res => res.json())
.then(data => { results.value = data })
})
// 组件卸载时自动停止(在 setup 内创建的 watch/watchEffect 会绑定组件生命周期)
// 也可手动停止
stop()
Level 3 · 设计文档与源码(资深开发者)
queueJob 的 id 排序策略
文件路径:packages/runtime-core/src/scheduler.ts
// 通过 job.id 排序,保证父组件先于子组件执行
// 子组件的 id 总是大于父组件(创建顺序决定 id)
function comparator(a: SchedulerJob, b: SchedulerJob): number {
const diff = getId(a) - getId(b)
if (diff === 0) {
// 如果 id 相同,pre-flush job 先执行
if (a.pre && !b.pre) return -1
if (b.pre && !a.pre) return 1
}
return diff
}
function findInsertionIndex(id: number) {
let start = flushIndex + 1
let end = queue.length
while (start < end) {
const middle = (start + end) >>> 1
const middleJob = queue[middle]
const middleJobId = getId(middleJob)
if (middleJobId < id || (middleJobId === id && middleJob.pre && !isFlushing)) {
start = middle + 1
} else {
end = middle
}
}
return start
}
排序的意义:父组件的 id 小于子组件,因此父组件先更新。如果父组件更新导致子组件的 props 改变(子组件需要因此更新),这个顺序保证了正确的更新级联。
watch 的内部实现:doWatch 函数
文件路径:packages/runtime-core/src/apiWatch.ts
function doWatch(
source: WatchSource | WatchSource[] | WatchEffect | object,
cb: WatchCallback | null,
{ immediate, deep, flush, once, onTrack, onTrigger }: WatchOptions = EMPTY_OBJ,
): WatchStopHandle {
// 1. 根据 source 类型构建 getter 函数
let getter: () => any
let forceTrigger = false
let isMultiSource = false
if (isRef(source)) {
getter = () => source.value
forceTrigger = isShallow(source)
} else if (isReactive(source)) {
getter = () => source
deep = true // reactive 对象自动深度监听
} else if (isArray(source)) {
isMultiSource = true
forceTrigger = source.some(s => isReactive(s) || isShallow(s))
getter = () => source.map(s => {
if (isRef(s)) return s.value
if (isReactive(s)) return traverse(s)
if (isFunction(s)) return callWithErrorHandling(s, instance, ErrorCodes.WATCH_GETTER)
})
} else if (isFunction(source)) {
if (cb) {
// watch(fn, cb) 形式
getter = () => callWithErrorHandling(source, instance, ErrorCodes.WATCH_GETTER)
} else {
// watchEffect(fn) 形式
getter = () => {
if (cleanup) {
cleanup() // 执行上一次的 cleanup
}
return callWithAsyncErrorHandling(
source,
instance,
ErrorCodes.WATCH_CALLBACK,
[onCleanup],
)
}
}
}
// 2. 如果是 deep watch,包装 getter 以遍历所有属性
if (cb && deep) {
const baseGetter = getter
getter = () => traverse(baseGetter())
}
// 3. 构建 job(调度器任务)
const job: SchedulerJob = () => {
if (!effect.active || !effect.dirty) {
return
}
if (cb) {
const newValue = effect.run()
if (
deep ||
forceTrigger ||
(isMultiSource
? (newValue as any[]).some((v, i) => hasChanged(v, (oldValue as any[])[i]))
: hasChanged(newValue, oldValue))
) {
if (cleanup) {
cleanup()
}
callWithAsyncErrorHandling(cb, instance, ErrorCodes.WATCH_CALLBACK, [
newValue,
oldValue === INITIAL_WATCHER_VALUE
? undefined
: (isMultiSource && oldValue[0] === INITIAL_WATCHER_VALUE)
? []
: oldValue,
onCleanup,
])
oldValue = newValue
}
} else {
effect.run() // watchEffect 直接执行
}
}
// 4. 根据 flush 策略设置 scheduler
let scheduler: EffectScheduler
if (flush === 'sync') {
scheduler = job as any
} else if (flush === 'post') {
scheduler = () => queuePostFlushCb(job)
} else {
job.pre = true
if (instance) job.id = instance.uid
scheduler = () => queueJob(job)
}
// 5. 创建 ReactiveEffect
const effect = new ReactiveEffect(getter, NOOP, scheduler)
// 6. 根据 immediate 决定首次执行时机
if (cb) {
if (immediate) {
job()
} else {
oldValue = effect.run() // 只收集依赖,不执行 cb
}
} else if (flush === 'post') {
queuePostFlushCb(effect.run.bind(effect))
} else {
effect.run() // watchEffect 立即执行
}
// 7. 返回停止函数
const unwatch = () => {
effect.stop()
if (instance && instance.scope) {
remove(instance.scope.effects!, effect)
}
}
return unwatch
}
组件 uid 与更新顺序的关系
每个 Vue 组件实例在创建时会分配一个自增的 uid:
// packages/runtime-core/src/component.ts
let uid = 0
export function createComponentInstance(/* ... */) {
const instance: ComponentInternalInstance = {
uid: uid++,
// ...
}
}
组件的 render effect job.id = instance.uid,父组件先于子组件创建,因此 uid 更小,在 flushJobs 排序后父组件排在前面,保证了父→子的更新顺序。
这个设计解决了一个经典问题:如果子组件先更新,它的 vnode 可能引用了尚未更新的父组件 props,导致不一致。
Level 4 · 边界与陷阱(全体适用)
陷阱1:watch 监听 reactive 对象时 oldValue 失效
const state = reactive({ count: 0, name: 'Vue' })
watch(state, (newVal, oldVal) => {
// ❌ newVal 和 oldVal 是同一个 Proxy 对象
console.log(newVal === oldVal) // true
console.log(oldVal.count) // 已经是新值!
})
state.count = 1 // 触发,但无法获得旧值
根因:reactive 对象是 Proxy,watch 内部 getter 返回同一个代理引用。即使 Vue 记录了 oldValue,它指向的也是同一个对象,该对象已经反映了最新状态。
解决方案:
// ✅ 方案1:使用 getter 监听基础值
watch(() => state.count, (newVal, oldVal) => {
console.log(newVal, oldVal) // 正确的前后值
})
// ✅ 方案2:监听时返回克隆
watch(() => ({ ...state }), (newVal, oldVal) => {
console.log(newVal.count, oldVal.count) // 正确
})
陷阱2:flush: 'sync' 导致性能问题
const items = ref([])
// ❌ 危险:每次修改都同步执行 watch
watch(items, (newItems) => {
processItems(newItems) // 假设这个操作耗时 5ms
}, { flush: 'sync' })
// 下面的代码会导致 processItems 执行 1000 次!
for (let i = 0; i < 1000; i++) {
items.value.push(i) // 每次 push 都立即触发 watch
}
// 总耗时:5ms × 1000 = 5000ms
对比默认行为:
watch(items, (newItems) => {
processItems(newItems)
}) // 默认 flush: 'pre'
for (let i = 0; i < 1000; i++) {
items.value.push(i) // 只是 push,不触发 watch
}
// 下一个微任务时:processItems 执行 1 次,耗时 5ms
flush: 'sync' 只应在极少数需要立即响应的场景使用(如单元测试中验证同步行为)。
陷阱3:在 watch 外部创建的 watch 不会自动停止
// ❌ 在 setup 外部创建的 watch 不会随组件卸载而停止
export function useGlobalWatch() {
// 如果这个函数在组件的 setup 之外调用(如模块顶层),
// watch 不会在组件卸载时自动停止
watch(someGlobalRef, callback)
}
// ✅ 正确:在 setup 内调用,或手动管理生命周期
export function useFeature() {
const stop = watch(someRef, callback)
// 组件卸载时停止
onUnmounted(() => stop())
return { stop }
}
在 setup() 内部调用 watch/watchEffect,Vue 会自动将 effect 注册到当前组件实例的 scope,组件卸载时自动清理。
陷阱4:异步 watchEffect 的依赖追踪只在同步部分
// ⚠️ 依赖追踪只在第一个 await 之前有效
watchEffect(async () => {
const id = userId.value // ✓ 追踪到 userId
const data = await fetchUser(id) // await 之后的代码...
userData.value = data.result // ✓ 这里是赋值,不需要追踪
console.log(otherRef.value) // ❌ 这里的 otherRef.value 不会被追踪!
// 因为 await 之后 activeEffect 已经恢复为 null
})
根因:ReactiveEffect.run() 在执行完同步部分后将 activeEffect 恢复为之前的值。async 函数在第一个 await 处暂停,此时 run() 函数已经返回,activeEffect 已经被清除。
解决方案:将需要追踪的依赖提前到 await 之前读取:
watchEffect(async () => {
// 在 await 之前读取所有需要追踪的依赖
const id = userId.value
const filter = filterOptions.value // 也要追踪 filterOptions
const data = await fetchUser(id, filter)
userData.value = data.result
})
本章小结
-
watch 是懒的,watchEffect 是 eager 的:watch 首次不执行回调,只收集依赖;watchEffect 立即执行并收集依赖。这个差异决定了它们各自的适用场景——watch 用于响应已知数据源的变化(并关心前后值),watchEffect 用于同步副作用。
-
更新队列的核心是去重的微任务:
queueJob去重 +Promise.resolve().then()确保了同一宏任务内的多次状态变化只触发一次组件更新。这是 Vue 3 高性能的基础之一。 -
flush 策略决定了 watch 回调与 DOM 更新的相对时序:
'pre'在 DOM 更新前(无法访问最新 DOM),'post'在 DOM 更新后(可以访问最新 DOM),'sync'跳过队列立即执行(谨慎使用)。 -
nextTick 的本质是微任务链:它等待当前的 flushJobs 完成,或直接返回一个已 resolved 的 Promise。理解这一点有助于正确判断 nextTick 之后 DOM 状态的稳定性。
-
竞态条件必须用 onCleanup + AbortController 处理:异步 watch 中依赖
stale标志或顺序假设都不可靠。onCleanup是 Vue 3 提供的正式竞态清理机制,配合AbortController可以完全解决异步请求的竞态问题。