拓扑排序与有向图
第十七章:拓扑排序与有向图
上一章我们学习了图的表示和遍历——BFS 和 DFS。在图的众多应用中,有一类问题极其常见且重要:给定一组任务和它们之间的依赖关系,找到一个合法的执行顺序。 例如:大学课程有先修关系,你必须先修完线性代数才能学机器学习;软件项目中模块 A 依赖模块 B,必须先编译 B 才能编译 A;烹饪一道菜,必须先切菜才能下锅。
这就是拓扑排序(Topological Sort) 要解决的问题。它只适用于有向无环图(DAG, Directed Acyclic Graph)——如果有环(A 依赖 B,B 依赖 C,C 依赖 A),则不可能存在合法的顺序。
本章我们将:(1) 理解拓扑排序的概念和性质;(2) 掌握两种经典实现——BFS(Kahn 算法)和 DFS;(3) 将拓扑排序应用于课程表、编译依赖、关键路径等实际问题;(4) 解决相关的高频面试题。
Level 1 · 你需要知道的
17.1 什么是拓扑排序?
定义:给定有向无环图 G = (V, E),拓扑排序是 V 的一个线性序列,使得对于图中每条有向边 (u, v),u 在序列中出现在 v 之前。
直观理解:拓扑排序是一种"尊重依赖关系的排列"。如果 A 必须在 B 之前完成(存在边 A→B),那么排序后 A 一定排在 B 前面。
示例:课程依赖关系
数学 → 物理 → 量子力学
数学 → 线性代数 → 机器学习
编程 → 数据结构 → 算法
有效的拓扑排序:
[数学, 编程, 物理, 线性代数, 数据结构, 量子力学, 机器学习, 算法]
[编程, 数学, 数据结构, 物理, 线性代数, 算法, 量子力学, 机器学习]
... (多种合法排序)
注意:拓扑排序通常不唯一!
关键性质:
- 存在性:拓扑排序存在 ⟺ 图是 DAG(无环)
- 非唯一性:一个 DAG 通常有多种合法的拓扑排序
- 只对有向图有意义:无向图没有"方向",无法定义"前后"关系
17.2 BFS 实现:Kahn 算法
Kahn 算法(1962)的思想极其优雅:
- 找到所有入度为 0的顶点(没有任何依赖的任务)——这些可以最先执行
- 将它们加入结果,并将它们从图中"移除"(即将它们指向的顶点的入度减 1)
- 重复步骤 1-2,直到所有顶点都被处理,或者找不到入度为 0 的顶点(说明有环)
from collections import deque, defaultdict
def topological_sort_kahn(num_vertices: int, edges: list) -> list:
"""
Kahn 算法(BFS 拓扑排序)
参数:
num_vertices: 顶点数量 (0 到 num_vertices-1)
edges: 边列表 [(u, v), ...] 表示 u → v 的依赖
返回:
拓扑排序列表,如果有环则返回空列表
时间复杂度: O(V + E)
空间复杂度: O(V + E)
"""
# 建图 + 计算入度
graph = defaultdict(list)
in_degree = [0] * num_vertices
for u, v in edges:
graph[u].append(v)
in_degree[v] += 1
# 所有入度为 0 的顶点入队
queue = deque()
for i in range(num_vertices):
if in_degree[i] == 0:
queue.append(i)
result = []
while queue:
node = queue.popleft()
result.append(node)
# "移除" node:将其邻居的入度减 1
for neighbor in graph[node]:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
# 如果结果包含所有顶点,则排序成功(无环)
if len(result) == num_vertices:
return result
else:
return [] # 有环,无法完成拓扑排序
# 示例
# 0 → 1 → 3
# 0 → 2 → 3
# 2 → 4
edges = [(0,1), (0,2), (1,3), (2,3), (2,4)]
print(topological_sort_kahn(5, edges)) # [0, 1, 2, 3, 4] 或 [0, 2, 1, 4, 3] 等
# 有环的情况
edges_cycle = [(0,1), (1,2), (2,0)]
print(topological_sort_kahn(3, edges_cycle)) # [] (有环)
为什么 Kahn 算法有效?
直觉:入度为 0 的顶点没有任何前置依赖,所以可以安全地排在当前位置。移除它们后,新暴露出的入度为 0 的顶点同样可以安全排列。这个过程如同"层层剥洋葱"。
正确性证明:
- 如果图是 DAG,则必然存在至少一个入度为 0 的顶点(否则可以从任意顶点沿入边回溯,因为图有限,必然形成环——矛盾)。
- 移除入度为 0 的顶点后,剩余图仍是 DAG(DAG 的子图仍是 DAG)。
- 因此算法不会卡住,最终处理完所有顶点。
17.3 DFS 实现
DFS 拓扑排序的关键洞察:一个顶点在 DFS 中"完成"的时刻,意味着它的所有后继(依赖它的节点)都已经被探索完毕。 因此,按"完成时间"的逆序排列就是拓扑排序。
def topological_sort_dfs(num_vertices: int, edges: list) -> list:
"""
DFS 拓扑排序
核心思想:按 DFS 完成时间的逆序排列
时间复杂度: O(V + E)
空间复杂度: O(V + E)
"""
graph = defaultdict(list)
for u, v in edges:
graph[u].append(v)
# 三色标记检测环
WHITE, GRAY, BLACK = 0, 1, 2
color = [WHITE] * num_vertices
result = []
has_cycle = False
def dfs(node):
nonlocal has_cycle
if has_cycle:
return
color[node] = GRAY
for neighbor in graph[node]:
if color[neighbor] == GRAY:
has_cycle = True # 发现回边 → 有环
return
if color[neighbor] == WHITE:
dfs(neighbor)
if has_cycle:
return
color[node] = BLACK
result.append(node) # 完成时加入结果(后续会逆序)
for i in range(num_vertices):
if color[i] == WHITE:
dfs(i)
if has_cycle:
return []
result.reverse() # 逆序得到拓扑排序
return result
# 示例
edges = [(0,1), (0,2), (1,3), (2,3), (2,4)]
print(topological_sort_dfs(5, edges)) # 一种合法的拓扑排序
# 有环
edges_cycle = [(0,1), (1,2), (2,0)]
print(topological_sort_dfs(3, edges_cycle)) # []
为什么"完成时间逆序"是拓扑排序?
对于任何边 (u, v):
- DFS 从 u 出发时,要么 v 还未被发现(白色),要么 v 已经完成(黑色)。
- 如果 v 是白色,DFS 会先访问 v 并完成 v,然后才完成 u。所以 f[v] < f[u]。
- 如果 v 是黑色,f[v] < f[u](v 早于 u 完成)。
- 如果 v 是灰色,这是回边——意味着有环,不是 DAG。
总之,在 DAG 中,对于边 (u, v),总有 f[u] > f[v]。按完成时间降序排列,u 排在 v 前面。正是拓扑排序的定义。
17.4 环检测与拓扑排序的关系
环检测和拓扑排序是一体两面:
- 拓扑排序成功 → 图是 DAG(无环)
- 拓扑排序失败 → 图有环
因此,很多"判断是否有环"的题目,实质上就是"尝试做拓扑排序"。
Kahn 算法的环检测:如果处理完所有入度为 0 的节点后,result 的长度 < V,说明有些节点永远不会变成入度 0——它们形成了环。
DFS 的环检测:如果 DFS 过程中遇到灰色节点(正在递归栈中的节点),说明找到了回边——有环。
def detect_cycle_with_path(num_vertices: int, edges: list) -> list:
"""
检测有向图中的环,并返回环上的节点
返回环的节点列表(如果有环),否则返回空列表
"""
graph = defaultdict(list)
for u, v in edges:
graph[u].append(v)
WHITE, GRAY, BLACK = 0, 1, 2
color = [WHITE] * num_vertices
parent = [-1] * num_vertices
cycle = []
def dfs(node):
color[node] = GRAY
for neighbor in graph[node]:
if color[neighbor] == GRAY:
# 找到环!从 neighbor 到 node 再回到 neighbor
path = [neighbor, node]
cur = node
while parent[cur] != neighbor and parent[cur] != -1:
cur = parent[cur]
path.append(cur)
cycle.extend(reversed(path))
return True
if color[neighbor] == WHITE:
parent[neighbor] = node
if dfs(neighbor):
return True
color[node] = BLACK
return False
for i in range(num_vertices):
if color[i] == WHITE:
if dfs(i):
return cycle
return []
# 示例
edges = [(0,1), (1,2), (2,3), (3,1)] # 环: 1→2→3→1
print(detect_cycle_with_path(4, edges))
17.5 两种算法的对比
| 特性 | Kahn 算法 (BFS) | DFS 拓扑排序 |
|---|---|---|
| 数据结构 | 队列 + 入度数组 | 递归栈 + 颜色标记 |
| 思路 | "入度为 0 先输出" | "完成时间逆序" |
| 环检测 | result.size < V | 遇到灰色节点 |
| 字典序最小排序 | 用最小堆代替普通队列即可 | 不容易实现 |
| 所有拓扑排序 | 容易扩展为回溯法 | 较难 |
| 在线/增量 | 较好支持 | 需要重新跑 |
什么时候用 Kahn?
- 需要字典序最小的拓扑排序
- 需要统计拓扑排序的数量
- 需要"层"的概念(如最短路径/关键路径)
什么时候用 DFS?
- 已经有 DFS 框架(如同时做其他分析)
- 代码简洁优先
17.6 常见错误
错误1:混淆边的方向
# 题目说 [a, b] 表示 "a 依赖 b"(学 a 之前要先学 b)
# 建图时边的方向应该是 b → a
# 很多人搞反了!
# 正确
for a, b in prerequisites:
graph[b].append(a) # b 是前置,a 是后置
in_degree[a] += 1
# 错误
for a, b in prerequisites:
graph[a].append(b) # 方向反了!
错误2:忘记处理不连通的图
# DAG 可能不连通(有孤立节点或多个连通分量)
# Kahn 算法天然处理:孤立节点入度为 0,直接入队
# DFS:必须对所有未访问节点都调用 dfs()
for i in range(num_vertices): # 不能只从节点 0 开始!
if color[i] == WHITE:
dfs(i)
错误3:以为拓扑排序唯一
# DAG 0→2, 1→2 的合法拓扑排序有 [0,1,2] 和 [1,0,2]
# 如果题目要求唯一答案(如字典序最小),需要特殊处理
Level 2 · 它是怎么运行的
17.7 课程表系列完整解析
LeetCode 207:课程表(判断能否完成)
这道题在上一章已经出现。核心就是判断有向图是否有环。我们这里给出 Kahn 算法的详细过程分析。
def canFinish(numCourses: int, prerequisites: list) -> bool:
"""
LeetCode 207: Course Schedule
本质:有向图环检测 = 尝试拓扑排序
"""
graph = defaultdict(list)
in_degree = [0] * numCourses
for course, prereq in prerequisites:
graph[prereq].append(course) # prereq → course
in_degree[course] += 1
queue = deque([i for i in range(numCourses) if in_degree[i] == 0])
processed = 0
while queue:
node = queue.popleft()
processed += 1
for neighbor in graph[node]:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
return processed == numCourses
执行过程追踪:
输入: numCourses=4, prerequisites=[[1,0],[2,0],[3,1],[3,2]]
图: 0→1, 0→2, 1→3, 2→3
入度: [0, 1, 1, 2]
初始队列: [0] (入度为0)
步骤1: 处理 0, 邻居 1 入度→0, 邻居 2 入度→0, 队列=[1,2]
步骤2: 处理 1, 邻居 3 入度→1, 队列=[2]
步骤3: 处理 2, 邻居 3 入度→0, 队列=[3]
步骤4: 处理 3, 无邻居, 队列=[]
处理了 4 个节点 = numCourses → 无环 → 可以完成
LeetCode 210:课程表 II(返回修课顺序)
def findOrder(numCourses: int, prerequisites: list) -> list:
"""
LeetCode 210: Course Schedule II
返回一个可行的修课顺序(拓扑排序)
如果无法完成则返回空列表
"""
graph = defaultdict(list)
in_degree = [0] * numCourses
for course, prereq in prerequisites:
graph[prereq].append(course)
in_degree[course] += 1
queue = deque([i for i in range(numCourses) if in_degree[i] == 0])
order = []
while queue:
node = queue.popleft()
order.append(node)
for neighbor in graph[node]:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
return order if len(order) == numCourses else []
# 测试
print(findOrder(4, [[1,0],[2,0],[3,1],[3,2]])) # [0, 1, 2, 3] 或 [0, 2, 1, 3]
print(findOrder(2, [[1,0]])) # [0, 1]
print(findOrder(2, [[0,1],[1,0]])) # [] (有环)
DFS 版本(更常见于面试 follow-up):
def findOrder_dfs(numCourses: int, prerequisites: list) -> list:
"""DFS 版本的课程表 II"""
graph = defaultdict(list)
for course, prereq in prerequisites:
graph[prereq].append(course)
WHITE, GRAY, BLACK = 0, 1, 2
color = [WHITE] * numCourses
result = []
def dfs(node) -> bool:
"""返回 True 表示无环"""
color[node] = GRAY
for neighbor in graph[node]:
if color[neighbor] == GRAY:
return False # 有环
if color[neighbor] == WHITE:
if not dfs(neighbor):
return False
color[node] = BLACK
result.append(node)
return True
for i in range(numCourses):
if color[i] == WHITE:
if not dfs(i):
return []
return result[::-1] # 逆序
17.8 编译依赖分析
拓扑排序最经典的工程应用之一是编译系统。在大型软件项目中,源文件之间存在依赖关系(#include、import),编译器必须按正确的顺序编译这些文件。
Make 和 Bazel 的核心原理:
- 解析所有文件的依赖关系,构建依赖图(DAG)
- 对依赖图做拓扑排序,得到编译顺序
- 按拓扑序编译(保证每个文件编译时,它依赖的文件都已编译完毕)
class BuildSystem:
"""简化的编译系统"""
def __init__(self):
self.dependencies = defaultdict(set) # module → set of dependencies
self.dependents = defaultdict(set) # module → set of dependents
def add_dependency(self, module: str, depends_on: str):
"""声明 module 依赖 depends_on"""
self.dependencies[module].add(depends_on)
self.dependents[depends_on].add(module)
# 确保两个模块都在图中
if module not in self.dependents:
self.dependents[module] # 触发 defaultdict 创建空集合
if depends_on not in self.dependencies:
self.dependencies[depends_on]
def get_build_order(self) -> list:
"""
返回编译顺序(拓扑排序)
如果有循环依赖,抛出异常
"""
# 计算入度
all_modules = set(self.dependencies.keys()) | set(self.dependents.keys())
in_degree = {m: len(self.dependencies[m]) for m in all_modules}
# Kahn 算法
queue = deque([m for m in all_modules if in_degree[m] == 0])
order = []
while queue:
module = queue.popleft()
order.append(module)
for dependent in self.dependents[module]:
in_degree[dependent] -= 1
if in_degree[dependent] == 0:
queue.append(dependent)
if len(order) != len(all_modules):
cycle_modules = [m for m in all_modules if in_degree[m] > 0]
raise ValueError(f"Circular dependency detected: {cycle_modules}")
return order
def parallel_build_plan(self) -> list:
"""
返回并行编译计划:每一"波"可以并行编译的模块
这本质上是 BFS 的分层结果
"""
all_modules = set(self.dependencies.keys()) | set(self.dependents.keys())
in_degree = {m: len(self.dependencies[m]) for m in all_modules}
queue = deque([m for m in all_modules if in_degree[m] == 0])
waves = []
while queue:
wave = list(queue)
waves.append(wave)
next_queue = deque()
for module in wave:
for dependent in self.dependents[module]:
in_degree[dependent] -= 1
if in_degree[dependent] == 0:
next_queue.append(dependent)
queue = next_queue
return waves
# 示例:C++ 项目的编译依赖
build = BuildSystem()
build.add_dependency("main.cpp", "utils.h")
build.add_dependency("main.cpp", "network.h")
build.add_dependency("network.cpp", "network.h")
build.add_dependency("network.cpp", "utils.h")
build.add_dependency("network.h", "utils.h")
build.add_dependency("tests.cpp", "main.cpp")
build.add_dependency("tests.cpp", "network.cpp")
print("编译顺序:", build.get_build_order())
print("并行编译计划:", build.parallel_build_plan())
# Wave 1: [utils.h] (无依赖)
# Wave 2: [network.h] (只依赖 utils.h)
# Wave 3: [main.cpp, network.cpp] (可以并行)
# Wave 4: [tests.cpp] (依赖上面的)
并行编译的关键洞察:Kahn 算法中,每一"层"(同时入度变为 0 的节点)可以并行处理。这就是 make -j8 能加速编译的原理——它利用拓扑排序的层次结构,在不违反依赖的前提下最大化并行度。
17.9 关键路径(最长路径 / Critical Path)
在项目管理中,关键路径是项目完成所需的最短时间——它是 DAG 中从起点到终点的最长路径(因为所有任务必须完成,瓶颈在最长链上)。
问题定义:给定 DAG,每个顶点有一个权重(任务所需时间),求从起点到终点的最长路径。
关键观察:在 DAG 中求最长路径不是 NP-hard 的(一般图中是)!因为可以利用拓扑排序做动态规划。
def critical_path(num_tasks: int, edges: list, durations: list) -> tuple:
"""
关键路径算法
参数:
num_tasks: 任务数量
edges: [(u, v), ...] 表示 u 必须在 v 之前完成
durations: 每个任务的耗时
返回:
(最早完成时间, 关键路径上的任务列表)
"""
graph = defaultdict(list)
reverse_graph = defaultdict(list)
in_degree = [0] * num_tasks
for u, v in edges:
graph[u].append(v)
reverse_graph[v].append(u)
in_degree[v] += 1
# 步骤1:拓扑排序
topo_order = []
queue = deque([i for i in range(num_tasks) if in_degree[i] == 0])
temp_in_degree = in_degree[:]
while queue:
node = queue.popleft()
topo_order.append(node)
for neighbor in graph[node]:
temp_in_degree[neighbor] -= 1
if temp_in_degree[neighbor] == 0:
queue.append(neighbor)
# 步骤2:按拓扑序计算最早开始时间(EST)
# EST[v] = max(EST[u] + duration[u]) 对所有 u→v
est = [0] * num_tasks
predecessor = [-1] * num_tasks
for node in topo_order:
for neighbor in graph[node]:
new_est = est[node] + durations[node]
if new_est > est[neighbor]:
est[neighbor] = new_est
predecessor[neighbor] = node
# 步骤3:找到最早完成时间(项目总耗时)
# 找所有"汇点"(出度为 0 的节点),取 EST + duration 的最大值
project_end_time = 0
end_task = -1
for i in range(num_tasks):
finish_time = est[i] + durations[i]
if finish_time > project_end_time:
project_end_time = finish_time
end_task = i
# 步骤4:回溯关键路径
critical = []
cur = end_task
while cur != -1:
critical.append(cur)
cur = predecessor[cur]
critical.reverse()
return project_end_time, critical
# 示例:软件项目
# 任务: 0=需求分析(3天), 1=设计(5天), 2=编码(8天), 3=测试(4天), 4=文档(2天), 5=部署(1天)
tasks = 6
durations = [3, 5, 8, 4, 2, 1]
deps = [
(0, 1), # 需求→设计
(0, 4), # 需求→文档
(1, 2), # 设计→编码
(2, 3), # 编码→测试
(3, 5), # 测试→部署
(4, 5), # 文档→部署
]
total_time, critical = critical_path(tasks, deps, durations)
print(f"项目最早完成时间: {total_time} 天")
print(f"关键路径: {critical}")
# 关键路径: 需求(3) → 设计(5) → 编码(8) → 测试(4) → 部署(1) = 21天
# 文档路径: 需求(3) → 文档(2) → 部署(1) = 6天(非关键)
关键路径方法(CPM, Critical Path Method) 的工程意义:
- 确定项目最短工期:关键路径的总长度就是项目不可压缩的最短时间
- 识别瓶颈:关键路径上的任务延迟 1 天,项目就延迟 1 天;非关键任务有"浮动时间"
- 资源分配:非关键路径上的任务可以延后,把资源集中到关键路径上
17.10 拓扑排序与动态规划
拓扑排序和动态规划有深刻的联系:DAG 上的 DP 就是按拓扑序计算。
许多 DP 问题本质上是在 DAG 上做运算:
- 状态是 DAG 的顶点
- 状态转移是 DAG 的边
- 计算顺序就是拓扑排序
def dag_longest_path(num_vertices: int, edges: list, weights: dict) -> int:
"""
DAG 上的最长路径(带边权)
weights: {(u,v): weight} 边权字典
"""
graph = defaultdict(list)
in_degree = [0] * num_vertices
for u, v in edges:
graph[u].append(v)
in_degree[v] += 1
# 拓扑排序
topo = []
queue = deque([i for i in range(num_vertices) if in_degree[i] == 0])
while queue:
node = queue.popleft()
topo.append(node)
for neighbor in graph[node]:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
# 按拓扑序做 DP
dist = [0] * num_vertices # dist[v] = 从任意起点到 v 的最长路径
for node in topo:
for neighbor in graph[node]:
w = weights.get((node, neighbor), 1)
dist[neighbor] = max(dist[neighbor], dist[node] + w)
return max(dist)
# 示例
edges = [(0,1), (0,2), (1,3), (2,3), (3,4)]
weights = {(0,1): 3, (0,2): 2, (1,3): 4, (2,3): 1, (3,4): 5}
print(dag_longest_path(5, edges, weights)) # 0→1→3→4 = 3+4+5 = 12
经典例子:最长递增子序列(LIS)
LIS 问题可以建模为 DAG:如果 a[i] < a[j] 且 i < j,则有边 i→j。LIS 就是这个 DAG 的最长路径。虽然实际中我们用 O(n log n) 的贪心+二分解法,但 DAG 视角帮助理解问题本质。
17.11 层次化拓扑排序
有时我们不仅需要一个合法的拓扑顺序,还需要知道每个节点的"层级"——即它最早可以在第几轮被处理。
def topological_sort_with_levels(num_vertices: int, edges: list) -> list:
"""
分层拓扑排序:返回每一层的节点列表
同一层的节点之间没有依赖关系,可以并行处理
"""
graph = defaultdict(list)
in_degree = [0] * num_vertices
for u, v in edges:
graph[u].append(v)
in_degree[v] += 1
current_level = [i for i in range(num_vertices) if in_degree[i] == 0]
levels = []
while current_level:
levels.append(current_level)
next_level = []
for node in current_level:
for neighbor in graph[node]:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
next_level.append(neighbor)
current_level = next_level
# 验证是否所有节点都被处理
total = sum(len(level) for level in levels)
if total != num_vertices:
return [] # 有环
return levels
# 示例
edges = [(0,2), (1,2), (2,3), (2,4), (3,5), (4,5)]
levels = topological_sort_with_levels(6, edges)
print(f"层次: {levels}")
# [[0, 1], [2], [3, 4], [5]]
# 第1层: 0和1可以并行
# 第2层: 2(依赖0和1)
# 第3层: 3和4可以并行
# 第4层: 5(依赖3和4)
层数的含义:层数 = 从任意入度为 0 的节点到当前节点的最长路径。这等价于"最早完成时间"的概念。
Level 3 · 规范怎么定义的
17.12 Kahn 1962 年论文:拓扑排序的 BFS 算法
1962年,Arthur B. Kahn 发表论文《Topological sorting of large networks》(Communications of the ACM, Vol. 5, Issue 11),首次形式化描述了我们今天所说的"Kahn 算法"。
历史背景:1960 年代初,计算机刚开始用于大型工程管理(如 PERT 网络图、CPM 关键路径分析)。这些应用需要对数百甚至数千个任务按依赖关系排序。Kahn 的论文直接回应了这一工程需求。
Kahn 的原始描述(翻译简化):
"给定一个有限有向图表示的偏序关系,算法如下:
- 找到所有没有前驱的节点,输出它们(它们之间的顺序任意)。
- 从图中移除这些节点及其出发的边。
- 重复步骤 1-2,直到图为空。 如果图非空但找不到没有前驱的节点,则图包含环。"
关键贡献:
- 形式化了拓扑排序问题:明确定义了偏序关系的线性化
- 给出了 O(V + E) 的算法:这在当时的计算机能力下非常重要——效率高意味着能处理更大的网络
- 同时提供了环检测:算法自然地在有环时终止并报告
- 讨论了并行性:Kahn 指出,每轮中所有入度为 0 的节点可以并行处理——这个观察在 60 年后的并行编译系统中依然核心
算法复杂度分析(Kahn 原文的论证方式):
- 每个顶点恰好被加入队列一次、移出队列一次 → O(V) 的顶点处理
- 每条边在其起始顶点被移除时恰好被检查一次 → O(E) 的边处理
- 总计 O(V + E)
这个分析与 BFS 的复杂度分析完全一致——事实上,Kahn 算法就是一种特殊的 BFS。
17.13 Tarjan 的 DFS 拓扑排序
Robert Tarjan 在 1972 年的论文《Depth-First Search and Linear Graph Algorithms》(SIAM Journal on Computing)中系统化了 DFS 在图算法中的应用,其中包括基于 DFS 完成时间的拓扑排序。
Tarjan 的洞察:DFS 的完成时间(finish time)自然给出了逆拓扑序。这个观察看似简单,但它统一了多个看似不相关的图算法:
- 拓扑排序:按 finish time 逆序
- 强连通分量(Tarjan 算法 / Kosaraju 算法):利用 finish time 进行两遍 DFS
- 割点和桥:利用 DFS 树的结构
为什么 Tarjan 的方法更"深刻"?
Kahn 的算法是"构造性的"——它直接构建排序。Tarjan 的方法则揭示了拓扑排序与图的深度结构的关系:
- 完成时间反映了"依赖深度":一个节点的 finish time 大意味着它是更"浅"的依赖(更早完成 = 更多东西依赖于它)
- 回边(back edge)的存在与环的存在等价——这比"找不到入度为 0 的节点"更具结构洞察
Tarjan 的正确性证明(简化):
设 G = (V, E) 是 DAG,对于任何边 (u, v) ∈ E:
- 在 DFS 访问 u 时,v 要么是白色(未访问),要么是黑色(已完成)
- v 不可能是灰色(因为灰色意味着 v 是 u 的祖先,加上边 (u,v) 就形成了环——与 DAG 矛盾)
- 如果 v 是白色:v 成为 u 的后代,f[v] < f[u]
- 如果 v 是黑色:f[v] < f[u](v 在 u 之前完成)
- 两种情况下 f[u] > f[v]
因此,按 f 值降序排列的顶点序列满足:对任何边 (u,v),u 排在 v 前面。这正是拓扑排序的定义。□
Tarjan vs Kahn 的哲学差异:
| Kahn | Tarjan |
|---|---|
| 自底向上(Bottom-up) | 自顶向下(Top-down) |
| "先做能做的" | "深入到底再回来" |
| 增量式(适合动态图) | 全局式(一次 DFS 遍历) |
| 直觉更容易 | 揭示更深结构 |
17.14 偏序与全序:拓扑排序的数学基础
偏序(Partial Order) 是一个满足自反性、反对称性、传递性的关系。DAG 的可达性定义了一个偏序:如果从 u 可达 v,则 u ≤ v。但不是所有顶点对都可比较(不可达的两个顶点之间没有偏序关系)。
全序(Total Order) 要求所有元素对都可比较。
拓扑排序 = 偏序的线性扩展:将偏序扩展为全序,使得偏序中的所有关系在全序中都被保持。
Dilworth 定理(1950):在任何有限偏序集中,最大反链(互不可比较的元素集合)的大小等于将偏序集分解为最少链(完全可比较的子集)的数量。
应用到拓扑排序:最大反链 = 并行编译时同一层最多可以同时处理的任务数。
def max_parallel_width(num_vertices: int, edges: list) -> int:
"""
计算 DAG 的最大宽度(最大反链大小)
即任何合法调度中,同时进行的任务最大数量
等价于分层拓扑排序中最大层的大小
"""
levels = topological_sort_with_levels(num_vertices, edges)
if not levels:
return 0 # 有环
return max(len(level) for level in levels)
17.15 拓扑排序在现代系统中的应用
包管理器(npm, pip, apt)
每当你运行 pip install tensorflow 时,包管理器需要:
- 解析 tensorflow 的所有依赖(递归)
- 构建依赖图
- 检测循环依赖(如果有,报错)
- 拓扑排序确定安装顺序
- 按序安装(或尽可能并行安装独立的包)
电子表格(Excel/Google Sheets)
单元格之间的公式引用形成 DAG。当你修改一个单元格时,电子表格需要按拓扑序重新计算所有依赖于它的单元格。
class Spreadsheet:
"""简化的电子表格引擎"""
def __init__(self):
self.values = {} # cell → value
self.formulas = {} # cell → formula (as lambda)
self.deps = defaultdict(set) # cell → cells it depends on
self.rdeps = defaultdict(set) # cell → cells that depend on it
def set_formula(self, cell: str, formula, dependencies: list):
"""设置单元格公式"""
# 移除旧的依赖关系
for old_dep in self.deps[cell]:
self.rdeps[old_dep].discard(cell)
self.formulas[cell] = formula
self.deps[cell] = set(dependencies)
for dep in dependencies:
self.rdeps[dep].add(cell)
# 重新计算
self._recalculate(cell)
def set_value(self, cell: str, value):
"""直接设置值(非公式)"""
self.values[cell] = value
self.formulas.pop(cell, None)
self.deps[cell] = set()
# 触发依赖链的重新计算
self._propagate(cell)
def _propagate(self, changed_cell: str):
"""按拓扑序重新计算所有受影响的单元格"""
# BFS 找到所有受影响的单元格
affected = set()
queue = deque([changed_cell])
while queue:
cell = queue.popleft()
for dependent in self.rdeps[cell]:
if dependent not in affected:
affected.add(dependent)
queue.append(dependent)
# 对受影响的单元格做拓扑排序并重新计算
# ...(省略实现细节)
def _recalculate(self, cell: str):
"""重新计算单个单元格"""
if cell in self.formulas:
dep_values = {d: self.values.get(d, 0) for d in self.deps[cell]}
self.values[cell] = self.formulas[cell](dep_values)
React/Vue 的响应式系统
前端框架的响应式更新本质上也是拓扑排序:
- 状态变量是 DAG 的源节点
- 计算属性(computed)是中间节点
- UI 组件是叶节点
- 当源状态改变时,框架按拓扑序更新所有下游依赖
Level 4 · 边界与陷阱
17.16 LeetCode 207:课程表(详细面试分析)
我们在 Level 1 和 Level 2 已经给出了代码实现。这里重点讨论面试中的follow-up 问题和边界情况。
Follow-up 1:如果课程数量极大(10⁶),怎么优化?
# Kahn 算法已经是 O(V+E),无法渐近优化
# 但可以优化常数因子:
# 1. 用数组代替字典(顶点是 0~n-1 的整数)
# 2. 邻接表用 list 而非 defaultdict
# 3. 避免创建新列表(原地修改)
def canFinish_optimized(numCourses: int, prerequisites: list) -> bool:
"""内存优化版本"""
# 预分配数组
graph = [[] for _ in range(numCourses)]
in_degree = [0] * numCourses
for course, prereq in prerequisites:
graph[prereq].append(course)
in_degree[course] += 1
# 用 list 模拟队列(对于纯遍历,list 也够用)
queue = [i for i in range(numCourses) if in_degree[i] == 0]
idx = 0 # 用指针代替 popleft
while idx < len(queue):
node = queue[idx]
idx += 1
for neighbor in graph[node]:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
return len(queue) == numCourses
Follow-up 2:找出所有形成环的课程
def find_cycle_courses(numCourses: int, prerequisites: list) -> list:
"""找出所有参与环的课程"""
graph = [[] for _ in range(numCourses)]
in_degree = [0] * numCourses
for course, prereq in prerequisites:
graph[prereq].append(course)
in_degree[course] += 1
# Kahn 算法
queue = deque([i for i in range(numCourses) if in_degree[i] == 0])
removed = set()
while queue:
node = queue.popleft()
removed.add(node)
for neighbor in graph[node]:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
# 剩余未被移除的节点都参与了环
return [i for i in range(numCourses) if i not in removed]
边界情况:
numCourses = 0:返回 True(空图无环)prerequisites = []:所有课程互相独立,返回 True- 自环
[0, 0]:课程依赖自己,有环,返回 False - 重复边:不影响正确性(入度可能多计,但 Kahn 仍然正确)
17.17 LeetCode 210:课程表 II(返回具体排序)
def findOrder(numCourses: int, prerequisites: list) -> list:
"""
LeetCode 210: Course Schedule II
完整的面试级实现
"""
graph = [[] for _ in range(numCourses)]
in_degree = [0] * numCourses
for course, prereq in prerequisites:
graph[prereq].append(course)
in_degree[course] += 1
queue = deque([i for i in range(numCourses) if in_degree[i] == 0])
order = []
while queue:
node = queue.popleft()
order.append(node)
for neighbor in graph[node]:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
return order if len(order) == numCourses else []
# 如果要求字典序最小的拓扑排序:
import heapq
def findOrder_lexicographic(numCourses: int, prerequisites: list) -> list:
"""字典序最小的拓扑排序"""
graph = [[] for _ in range(numCourses)]
in_degree = [0] * numCourses
for course, prereq in prerequisites:
graph[prereq].append(course)
in_degree[course] += 1
# 用最小堆代替普通队列
heap = [i for i in range(numCourses) if in_degree[i] == 0]
heapq.heapify(heap)
order = []
while heap:
node = heapq.heappop(heap)
order.append(node)
for neighbor in graph[node]:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
heapq.heappush(heap, neighbor)
return order if len(order) == numCourses else []
# 测试
print(findOrder(4, [[1,0],[2,0],[3,1],[3,2]]))
# 可能输出: [0, 1, 2, 3] 或 [0, 2, 1, 3]
print(findOrder_lexicographic(4, [[1,0],[2,0],[3,1],[3,2]]))
# 一定输出: [0, 1, 2, 3] (字典序最小)
面试要点:
- Kahn 和 DFS 两种方法都要能写
- 如果要字典序最小 → 用 min-heap 代替 queue
- 时间 O(V + E),空间 O(V + E)
17.18 LeetCode 269:外星文字典
题目:给定一组按外星文字母顺序排序的单词,推断出字母的顺序。
分析:这是一道"从排序结果逆推排序规则"的题,本质是构建约束图 + 拓扑排序。
def alienOrder(words: list) -> str:
"""
LeetCode 269: Alien Dictionary
从排好序的单词列表推断字母顺序
思路:
1. 比较相邻单词,找出字母间的偏序关系
2. 构建有向图
3. 拓扑排序
时间: O(C) 其中 C 是所有单词的总字符数
空间: O(1) 字母表大小固定为 26
"""
# 步骤1:收集所有出现的字母
all_chars = set()
for word in words:
all_chars.update(word)
# 步骤2:比较相邻单词,提取偏序关系
graph = defaultdict(set)
in_degree = {c: 0 for c in all_chars}
for i in range(len(words) - 1):
word1, word2 = words[i], words[i + 1]
# 特殊情况:如果 word1 是 word2 的前缀的超集,无效
# 例如 ["abc", "ab"] → 违反排序规则
if len(word1) > len(word2) and word1[:len(word2)] == word2:
return "" # 无效输入
# 找到第一个不同的字符
for c1, c2 in zip(word1, word2):
if c1 != c2:
if c2 not in graph[c1]:
graph[c1].add(c2)
in_degree[c2] += 1
break # 只有第一个不同字符给出信息
# 步骤3:Kahn 拓扑排序
queue = deque([c for c in all_chars if in_degree[c] == 0])
result = []
while queue:
char = queue.popleft()
result.append(char)
for neighbor in graph[char]:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
# 如果不是所有字符都被排序,说明有环
if len(result) != len(all_chars):
return ""
return "".join(result)
# 测试
print(alienOrder(["wrt", "wrf", "er", "ett", "rftt"]))
# 输出: "wertf"
# 推理过程:
# "wrt" vs "wrf": t 在 f 之前 → t→f
# "wrf" vs "er": w 在 e 之前 → w→e
# "er" vs "ett": r 在 t 之前 → r→t
# "ett" vs "rftt": e 在 r 之前 → e→r
# 图: w→e→r→t→f
# 拓扑排序: wertf
print(alienOrder(["z", "x"])) # "zx"
print(alienOrder(["z", "x", "z"])) # "" (z<x 且 x<z,矛盾→环)
面试要点:
- 关键洞察:只有相邻单词的比较才有意义(非相邻单词的关系是传递推导的)
- 只取第一个不同字符:后续字符的比较无效(因为排序只看第一个不同位置)
- 边界情况:
- 输入无效(如 ["abc", "ab"])
- 只有一个单词(所有字母顺序任意)
- 有环(矛盾的约束)
- 如果要求唯一解:当拓扑排序的某一步有多个入度为 0 的节点,说明顺序不唯一
17.19 拓扑排序的陷阱总结
陷阱1:边的方向
不同题目的边方向约定不同!
# 课程表: prerequisites[i] = [a, b] 表示 "要学 a,先学 b"
# 边方向: b → a (b 是前置)
for a, b in prerequisites:
graph[b].append(a)
# 外星文字典: 如果 c1 在 c2 前面
# 边方向: c1 → c2
graph[c1].add(c2)
陷阱2:不连通的 DAG
# 如果图有多个连通分量(孤立节点),它们在拓扑排序中可以任意排列
# Kahn 算法自然处理:所有孤立节点入度为 0,都会被入队
# 但某些题目可能要求特定顺序(如字典序最小)
陷阱3:重边
# 如果有重边 [(0,1), (0,1)],入度会被多算
# 解决方案1:建图时去重
graph = defaultdict(set) # 用 set 而非 list
# 解决方案2:先去重再建图
edges = list(set(edges))
陷阱4:以为拓扑排序能处理有环图
# 拓扑排序只适用于 DAG!
# 如果图有环,必须先检测/报告,不能强行排序
# Kahn 算法的返回长度 < V 就是环的信号
# DFS 的灰色节点重遇就是环的信号
17.20 面试中拓扑排序题的通用模板
def topological_sort_template(problem_input):
"""
拓扑排序面试题通用模板
关键步骤:
1. 识别"这是拓扑排序"(依赖关系、先后顺序、DAG)
2. 建图(注意边的方向!)
3. 选择算法(Kahn 更通用,DFS 更简洁)
4. 处理边界(环、孤立节点、不唯一性)
"""
# 步骤1:从问题描述中提取依赖关系
# 确定:什么是顶点?什么是边?边的方向?
# 步骤2:建图
graph = defaultdict(list)
in_degree = defaultdict(int)
all_nodes = set()
for dependency in problem_input:
# 根据题意确定 u, v
u, v = extract_edge(dependency)
graph[u].append(v)
in_degree[v] += 1
all_nodes.add(u)
all_nodes.add(v)
# 确保所有节点都有入度记录
for node in all_nodes:
if node not in in_degree:
in_degree[node] = 0
# 步骤3:Kahn 算法
queue = deque([n for n in all_nodes if in_degree[n] == 0])
result = []
while queue:
node = queue.popleft()
result.append(node)
for neighbor in graph[node]:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
# 步骤4:检查是否成功
if len(result) != len(all_nodes):
return "IMPOSSIBLE" # 有环
return result
识别拓扑排序题的信号词:
- "先修/前置条件"
- "依赖关系"
- "顺序/排列"
- "能否完成"
- "任务调度"
- "编译顺序"
17.21 本章总结
| 概念 | 核心要点 |
|---|---|
| 拓扑排序 | DAG 的顶点线性排列,尊重所有边的方向 |
| Kahn 算法 | BFS:入度为 0 的先输出,O(V+E) |
| DFS 拓扑排序 | 按完成时间逆序,O(V+E) |
| 环检测 | Kahn: result < V; DFS: 遇到灰色节点 |
| 关键路径 | DAG 最长路径 = 项目最短工期 |
| 并行调度 | 分层 Kahn = 最大并行度 |
拓扑排序是有向图中最基本也最重要的算法之一。它的应用远远超出了"排序"本身——编译器、包管理器、电子表格、响应式框架、项目管理工具,这些日常使用的工具背后都有拓扑排序的身影。理解了拓扑排序,你就理解了"在依赖约束下做最优调度"这一类问题的核心。