第 17 章

拓扑排序与有向图

第十七章:拓扑排序与有向图

上一章我们学习了图的表示和遍历——BFS 和 DFS。在图的众多应用中,有一类问题极其常见且重要:给定一组任务和它们之间的依赖关系,找到一个合法的执行顺序。 例如:大学课程有先修关系,你必须先修完线性代数才能学机器学习;软件项目中模块 A 依赖模块 B,必须先编译 B 才能编译 A;烹饪一道菜,必须先切菜才能下锅。

这就是拓扑排序(Topological Sort) 要解决的问题。它只适用于有向无环图(DAG, Directed Acyclic Graph)——如果有环(A 依赖 B,B 依赖 C,C 依赖 A),则不可能存在合法的顺序。

本章我们将:(1) 理解拓扑排序的概念和性质;(2) 掌握两种经典实现——BFS(Kahn 算法)和 DFS;(3) 将拓扑排序应用于课程表、编译依赖、关键路径等实际问题;(4) 解决相关的高频面试题。


Level 1 · 你需要知道的

17.1 什么是拓扑排序?

定义:给定有向无环图 G = (V, E),拓扑排序是 V 的一个线性序列,使得对于图中每条有向边 (u, v),u 在序列中出现在 v 之前。

直观理解:拓扑排序是一种"尊重依赖关系的排列"。如果 A 必须在 B 之前完成(存在边 A→B),那么排序后 A 一定排在 B 前面。

示例:课程依赖关系
  数学 → 物理 → 量子力学
  数学 → 线性代数 → 机器学习
  编程 → 数据结构 → 算法

有效的拓扑排序:
  [数学, 编程, 物理, 线性代数, 数据结构, 量子力学, 机器学习, 算法]
  [编程, 数学, 数据结构, 物理, 线性代数, 算法, 量子力学, 机器学习]
  ... (多种合法排序)

注意:拓扑排序通常不唯一!

关键性质

  1. 存在性:拓扑排序存在 ⟺ 图是 DAG(无环)
  2. 非唯一性:一个 DAG 通常有多种合法的拓扑排序
  3. 只对有向图有意义:无向图没有"方向",无法定义"前后"关系

17.2 BFS 实现:Kahn 算法

Kahn 算法(1962)的思想极其优雅:

  1. 找到所有入度为 0的顶点(没有任何依赖的任务)——这些可以最先执行
  2. 将它们加入结果,并将它们从图中"移除"(即将它们指向的顶点的入度减 1)
  3. 重复步骤 1-2,直到所有顶点都被处理,或者找不到入度为 0 的顶点(说明有环)
from collections import deque, defaultdict

def topological_sort_kahn(num_vertices: int, edges: list) -> list:
    """
    Kahn 算法(BFS 拓扑排序)
    
    参数:
        num_vertices: 顶点数量 (0 到 num_vertices-1)
        edges: 边列表 [(u, v), ...] 表示 u → v 的依赖
    
    返回:
        拓扑排序列表,如果有环则返回空列表
    
    时间复杂度: O(V + E)
    空间复杂度: O(V + E)
    """
    # 建图 + 计算入度
    graph = defaultdict(list)
    in_degree = [0] * num_vertices
    
    for u, v in edges:
        graph[u].append(v)
        in_degree[v] += 1
    
    # 所有入度为 0 的顶点入队
    queue = deque()
    for i in range(num_vertices):
        if in_degree[i] == 0:
            queue.append(i)
    
    result = []
    
    while queue:
        node = queue.popleft()
        result.append(node)
        
        # "移除" node:将其邻居的入度减 1
        for neighbor in graph[node]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)
    
    # 如果结果包含所有顶点,则排序成功(无环)
    if len(result) == num_vertices:
        return result
    else:
        return []  # 有环,无法完成拓扑排序


# 示例
#   0 → 1 → 3
#   0 → 2 → 3
#   2 → 4
edges = [(0,1), (0,2), (1,3), (2,3), (2,4)]
print(topological_sort_kahn(5, edges))  # [0, 1, 2, 3, 4] 或 [0, 2, 1, 4, 3] 等

# 有环的情况
edges_cycle = [(0,1), (1,2), (2,0)]
print(topological_sort_kahn(3, edges_cycle))  # [] (有环)

为什么 Kahn 算法有效?

直觉:入度为 0 的顶点没有任何前置依赖,所以可以安全地排在当前位置。移除它们后,新暴露出的入度为 0 的顶点同样可以安全排列。这个过程如同"层层剥洋葱"。

正确性证明

17.3 DFS 实现

DFS 拓扑排序的关键洞察:一个顶点在 DFS 中"完成"的时刻,意味着它的所有后继(依赖它的节点)都已经被探索完毕。 因此,按"完成时间"的逆序排列就是拓扑排序。

def topological_sort_dfs(num_vertices: int, edges: list) -> list:
    """
    DFS 拓扑排序
    
    核心思想:按 DFS 完成时间的逆序排列
    
    时间复杂度: O(V + E)
    空间复杂度: O(V + E)
    """
    graph = defaultdict(list)
    for u, v in edges:
        graph[u].append(v)
    
    # 三色标记检测环
    WHITE, GRAY, BLACK = 0, 1, 2
    color = [WHITE] * num_vertices
    result = []
    has_cycle = False
    
    def dfs(node):
        nonlocal has_cycle
        if has_cycle:
            return
        
        color[node] = GRAY
        
        for neighbor in graph[node]:
            if color[neighbor] == GRAY:
                has_cycle = True  # 发现回边 → 有环
                return
            if color[neighbor] == WHITE:
                dfs(neighbor)
                if has_cycle:
                    return
        
        color[node] = BLACK
        result.append(node)  # 完成时加入结果(后续会逆序)
    
    for i in range(num_vertices):
        if color[i] == WHITE:
            dfs(i)
            if has_cycle:
                return []
    
    result.reverse()  # 逆序得到拓扑排序
    return result


# 示例
edges = [(0,1), (0,2), (1,3), (2,3), (2,4)]
print(topological_sort_dfs(5, edges))  # 一种合法的拓扑排序

# 有环
edges_cycle = [(0,1), (1,2), (2,0)]
print(topological_sort_dfs(3, edges_cycle))  # []

为什么"完成时间逆序"是拓扑排序?

对于任何边 (u, v):

总之,在 DAG 中,对于边 (u, v),总有 f[u] > f[v]。按完成时间降序排列,u 排在 v 前面。正是拓扑排序的定义。

17.4 环检测与拓扑排序的关系

环检测和拓扑排序是一体两面:

因此,很多"判断是否有环"的题目,实质上就是"尝试做拓扑排序"。

Kahn 算法的环检测:如果处理完所有入度为 0 的节点后,result 的长度 < V,说明有些节点永远不会变成入度 0——它们形成了环。

DFS 的环检测:如果 DFS 过程中遇到灰色节点(正在递归栈中的节点),说明找到了回边——有环。

def detect_cycle_with_path(num_vertices: int, edges: list) -> list:
    """
    检测有向图中的环,并返回环上的节点
    返回环的节点列表(如果有环),否则返回空列表
    """
    graph = defaultdict(list)
    for u, v in edges:
        graph[u].append(v)
    
    WHITE, GRAY, BLACK = 0, 1, 2
    color = [WHITE] * num_vertices
    parent = [-1] * num_vertices
    cycle = []
    
    def dfs(node):
        color[node] = GRAY
        for neighbor in graph[node]:
            if color[neighbor] == GRAY:
                # 找到环!从 neighbor 到 node 再回到 neighbor
                path = [neighbor, node]
                cur = node
                while parent[cur] != neighbor and parent[cur] != -1:
                    cur = parent[cur]
                    path.append(cur)
                cycle.extend(reversed(path))
                return True
            if color[neighbor] == WHITE:
                parent[neighbor] = node
                if dfs(neighbor):
                    return True
        color[node] = BLACK
        return False
    
    for i in range(num_vertices):
        if color[i] == WHITE:
            if dfs(i):
                return cycle
    return []


# 示例
edges = [(0,1), (1,2), (2,3), (3,1)]  # 环: 1→2→3→1
print(detect_cycle_with_path(4, edges))

17.5 两种算法的对比

特性 Kahn 算法 (BFS) DFS 拓扑排序
数据结构 队列 + 入度数组 递归栈 + 颜色标记
思路 "入度为 0 先输出" "完成时间逆序"
环检测 result.size < V 遇到灰色节点
字典序最小排序 用最小堆代替普通队列即可 不容易实现
所有拓扑排序 容易扩展为回溯法 较难
在线/增量 较好支持 需要重新跑

什么时候用 Kahn?

什么时候用 DFS?

17.6 常见错误

错误1:混淆边的方向

# 题目说 [a, b] 表示 "a 依赖 b"(学 a 之前要先学 b)
# 建图时边的方向应该是 b → a
# 很多人搞反了!

# 正确
for a, b in prerequisites:
    graph[b].append(a)  # b 是前置,a 是后置
    in_degree[a] += 1

# 错误
for a, b in prerequisites:
    graph[a].append(b)  # 方向反了!

错误2:忘记处理不连通的图

# DAG 可能不连通(有孤立节点或多个连通分量)
# Kahn 算法天然处理:孤立节点入度为 0,直接入队
# DFS:必须对所有未访问节点都调用 dfs()
for i in range(num_vertices):  # 不能只从节点 0 开始!
    if color[i] == WHITE:
        dfs(i)

错误3:以为拓扑排序唯一

# DAG 0→2, 1→2 的合法拓扑排序有 [0,1,2] 和 [1,0,2]
# 如果题目要求唯一答案(如字典序最小),需要特殊处理

Level 2 · 它是怎么运行的

17.7 课程表系列完整解析

LeetCode 207:课程表(判断能否完成)

这道题在上一章已经出现。核心就是判断有向图是否有环。我们这里给出 Kahn 算法的详细过程分析。

def canFinish(numCourses: int, prerequisites: list) -> bool:
    """
    LeetCode 207: Course Schedule
    本质:有向图环检测 = 尝试拓扑排序
    """
    graph = defaultdict(list)
    in_degree = [0] * numCourses
    
    for course, prereq in prerequisites:
        graph[prereq].append(course)  # prereq → course
        in_degree[course] += 1
    
    queue = deque([i for i in range(numCourses) if in_degree[i] == 0])
    processed = 0
    
    while queue:
        node = queue.popleft()
        processed += 1
        for neighbor in graph[node]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)
    
    return processed == numCourses

执行过程追踪

输入: numCourses=4, prerequisites=[[1,0],[2,0],[3,1],[3,2]]
图: 0→1, 0→2, 1→3, 2→3
入度: [0, 1, 1, 2]

初始队列: [0] (入度为0)
步骤1: 处理 0, 邻居 1 入度→0, 邻居 2 入度→0, 队列=[1,2]
步骤2: 处理 1, 邻居 3 入度→1, 队列=[2]
步骤3: 处理 2, 邻居 3 入度→0, 队列=[3]
步骤4: 处理 3, 无邻居, 队列=[]

处理了 4 个节点 = numCourses → 无环 → 可以完成

LeetCode 210:课程表 II(返回修课顺序)

def findOrder(numCourses: int, prerequisites: list) -> list:
    """
    LeetCode 210: Course Schedule II
    返回一个可行的修课顺序(拓扑排序)
    如果无法完成则返回空列表
    """
    graph = defaultdict(list)
    in_degree = [0] * numCourses
    
    for course, prereq in prerequisites:
        graph[prereq].append(course)
        in_degree[course] += 1
    
    queue = deque([i for i in range(numCourses) if in_degree[i] == 0])
    order = []
    
    while queue:
        node = queue.popleft()
        order.append(node)
        for neighbor in graph[node]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)
    
    return order if len(order) == numCourses else []


# 测试
print(findOrder(4, [[1,0],[2,0],[3,1],[3,2]]))  # [0, 1, 2, 3] 或 [0, 2, 1, 3]
print(findOrder(2, [[1,0]]))  # [0, 1]
print(findOrder(2, [[0,1],[1,0]]))  # [] (有环)

DFS 版本(更常见于面试 follow-up):

def findOrder_dfs(numCourses: int, prerequisites: list) -> list:
    """DFS 版本的课程表 II"""
    graph = defaultdict(list)
    for course, prereq in prerequisites:
        graph[prereq].append(course)
    
    WHITE, GRAY, BLACK = 0, 1, 2
    color = [WHITE] * numCourses
    result = []
    
    def dfs(node) -> bool:
        """返回 True 表示无环"""
        color[node] = GRAY
        for neighbor in graph[node]:
            if color[neighbor] == GRAY:
                return False  # 有环
            if color[neighbor] == WHITE:
                if not dfs(neighbor):
                    return False
        color[node] = BLACK
        result.append(node)
        return True
    
    for i in range(numCourses):
        if color[i] == WHITE:
            if not dfs(i):
                return []
    
    return result[::-1]  # 逆序

17.8 编译依赖分析

拓扑排序最经典的工程应用之一是编译系统。在大型软件项目中,源文件之间存在依赖关系(#include、import),编译器必须按正确的顺序编译这些文件。

Make 和 Bazel 的核心原理

  1. 解析所有文件的依赖关系,构建依赖图(DAG)
  2. 对依赖图做拓扑排序,得到编译顺序
  3. 按拓扑序编译(保证每个文件编译时,它依赖的文件都已编译完毕)
class BuildSystem:
    """简化的编译系统"""
    
    def __init__(self):
        self.dependencies = defaultdict(set)  # module → set of dependencies
        self.dependents = defaultdict(set)    # module → set of dependents
    
    def add_dependency(self, module: str, depends_on: str):
        """声明 module 依赖 depends_on"""
        self.dependencies[module].add(depends_on)
        self.dependents[depends_on].add(module)
        # 确保两个模块都在图中
        if module not in self.dependents:
            self.dependents[module]  # 触发 defaultdict 创建空集合
        if depends_on not in self.dependencies:
            self.dependencies[depends_on]
    
    def get_build_order(self) -> list:
        """
        返回编译顺序(拓扑排序)
        如果有循环依赖,抛出异常
        """
        # 计算入度
        all_modules = set(self.dependencies.keys()) | set(self.dependents.keys())
        in_degree = {m: len(self.dependencies[m]) for m in all_modules}
        
        # Kahn 算法
        queue = deque([m for m in all_modules if in_degree[m] == 0])
        order = []
        
        while queue:
            module = queue.popleft()
            order.append(module)
            
            for dependent in self.dependents[module]:
                in_degree[dependent] -= 1
                if in_degree[dependent] == 0:
                    queue.append(dependent)
        
        if len(order) != len(all_modules):
            cycle_modules = [m for m in all_modules if in_degree[m] > 0]
            raise ValueError(f"Circular dependency detected: {cycle_modules}")
        
        return order
    
    def parallel_build_plan(self) -> list:
        """
        返回并行编译计划:每一"波"可以并行编译的模块
        这本质上是 BFS 的分层结果
        """
        all_modules = set(self.dependencies.keys()) | set(self.dependents.keys())
        in_degree = {m: len(self.dependencies[m]) for m in all_modules}
        
        queue = deque([m for m in all_modules if in_degree[m] == 0])
        waves = []
        
        while queue:
            wave = list(queue)
            waves.append(wave)
            next_queue = deque()
            
            for module in wave:
                for dependent in self.dependents[module]:
                    in_degree[dependent] -= 1
                    if in_degree[dependent] == 0:
                        next_queue.append(dependent)
            
            queue = next_queue
        
        return waves


# 示例:C++ 项目的编译依赖
build = BuildSystem()
build.add_dependency("main.cpp", "utils.h")
build.add_dependency("main.cpp", "network.h")
build.add_dependency("network.cpp", "network.h")
build.add_dependency("network.cpp", "utils.h")
build.add_dependency("network.h", "utils.h")
build.add_dependency("tests.cpp", "main.cpp")
build.add_dependency("tests.cpp", "network.cpp")

print("编译顺序:", build.get_build_order())
print("并行编译计划:", build.parallel_build_plan())
# Wave 1: [utils.h]  (无依赖)
# Wave 2: [network.h]  (只依赖 utils.h)
# Wave 3: [main.cpp, network.cpp]  (可以并行)
# Wave 4: [tests.cpp]  (依赖上面的)

并行编译的关键洞察:Kahn 算法中,每一"层"(同时入度变为 0 的节点)可以并行处理。这就是 make -j8 能加速编译的原理——它利用拓扑排序的层次结构,在不违反依赖的前提下最大化并行度。

17.9 关键路径(最长路径 / Critical Path)

在项目管理中,关键路径是项目完成所需的最短时间——它是 DAG 中从起点到终点的最长路径(因为所有任务必须完成,瓶颈在最长链上)。

问题定义:给定 DAG,每个顶点有一个权重(任务所需时间),求从起点到终点的最长路径。

关键观察:在 DAG 中求最长路径不是 NP-hard 的(一般图中是)!因为可以利用拓扑排序做动态规划。

def critical_path(num_tasks: int, edges: list, durations: list) -> tuple:
    """
    关键路径算法
    
    参数:
        num_tasks: 任务数量
        edges: [(u, v), ...] 表示 u 必须在 v 之前完成
        durations: 每个任务的耗时
    
    返回:
        (最早完成时间, 关键路径上的任务列表)
    """
    graph = defaultdict(list)
    reverse_graph = defaultdict(list)
    in_degree = [0] * num_tasks
    
    for u, v in edges:
        graph[u].append(v)
        reverse_graph[v].append(u)
        in_degree[v] += 1
    
    # 步骤1:拓扑排序
    topo_order = []
    queue = deque([i for i in range(num_tasks) if in_degree[i] == 0])
    temp_in_degree = in_degree[:]
    
    while queue:
        node = queue.popleft()
        topo_order.append(node)
        for neighbor in graph[node]:
            temp_in_degree[neighbor] -= 1
            if temp_in_degree[neighbor] == 0:
                queue.append(neighbor)
    
    # 步骤2:按拓扑序计算最早开始时间(EST)
    # EST[v] = max(EST[u] + duration[u]) 对所有 u→v
    est = [0] * num_tasks
    predecessor = [-1] * num_tasks
    
    for node in topo_order:
        for neighbor in graph[node]:
            new_est = est[node] + durations[node]
            if new_est > est[neighbor]:
                est[neighbor] = new_est
                predecessor[neighbor] = node
    
    # 步骤3:找到最早完成时间(项目总耗时)
    # 找所有"汇点"(出度为 0 的节点),取 EST + duration 的最大值
    project_end_time = 0
    end_task = -1
    for i in range(num_tasks):
        finish_time = est[i] + durations[i]
        if finish_time > project_end_time:
            project_end_time = finish_time
            end_task = i
    
    # 步骤4:回溯关键路径
    critical = []
    cur = end_task
    while cur != -1:
        critical.append(cur)
        cur = predecessor[cur]
    critical.reverse()
    
    return project_end_time, critical


# 示例:软件项目
# 任务: 0=需求分析(3天), 1=设计(5天), 2=编码(8天), 3=测试(4天), 4=文档(2天), 5=部署(1天)
tasks = 6
durations = [3, 5, 8, 4, 2, 1]
deps = [
    (0, 1),  # 需求→设计
    (0, 4),  # 需求→文档
    (1, 2),  # 设计→编码
    (2, 3),  # 编码→测试
    (3, 5),  # 测试→部署
    (4, 5),  # 文档→部署
]

total_time, critical = critical_path(tasks, deps, durations)
print(f"项目最早完成时间: {total_time} 天")
print(f"关键路径: {critical}")
# 关键路径: 需求(3) → 设计(5) → 编码(8) → 测试(4) → 部署(1) = 21天
# 文档路径: 需求(3) → 文档(2) → 部署(1) = 6天(非关键)

关键路径方法(CPM, Critical Path Method) 的工程意义:

  1. 确定项目最短工期:关键路径的总长度就是项目不可压缩的最短时间
  2. 识别瓶颈:关键路径上的任务延迟 1 天,项目就延迟 1 天;非关键任务有"浮动时间"
  3. 资源分配:非关键路径上的任务可以延后,把资源集中到关键路径上

17.10 拓扑排序与动态规划

拓扑排序和动态规划有深刻的联系:DAG 上的 DP 就是按拓扑序计算

许多 DP 问题本质上是在 DAG 上做运算:

def dag_longest_path(num_vertices: int, edges: list, weights: dict) -> int:
    """
    DAG 上的最长路径(带边权)
    weights: {(u,v): weight} 边权字典
    """
    graph = defaultdict(list)
    in_degree = [0] * num_vertices
    
    for u, v in edges:
        graph[u].append(v)
        in_degree[v] += 1
    
    # 拓扑排序
    topo = []
    queue = deque([i for i in range(num_vertices) if in_degree[i] == 0])
    while queue:
        node = queue.popleft()
        topo.append(node)
        for neighbor in graph[node]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)
    
    # 按拓扑序做 DP
    dist = [0] * num_vertices  # dist[v] = 从任意起点到 v 的最长路径
    
    for node in topo:
        for neighbor in graph[node]:
            w = weights.get((node, neighbor), 1)
            dist[neighbor] = max(dist[neighbor], dist[node] + w)
    
    return max(dist)


# 示例
edges = [(0,1), (0,2), (1,3), (2,3), (3,4)]
weights = {(0,1): 3, (0,2): 2, (1,3): 4, (2,3): 1, (3,4): 5}
print(dag_longest_path(5, edges, weights))  # 0→1→3→4 = 3+4+5 = 12

经典例子:最长递增子序列(LIS)

LIS 问题可以建模为 DAG:如果 a[i] < a[j] 且 i < j,则有边 i→j。LIS 就是这个 DAG 的最长路径。虽然实际中我们用 O(n log n) 的贪心+二分解法,但 DAG 视角帮助理解问题本质。

17.11 层次化拓扑排序

有时我们不仅需要一个合法的拓扑顺序,还需要知道每个节点的"层级"——即它最早可以在第几轮被处理。

def topological_sort_with_levels(num_vertices: int, edges: list) -> list:
    """
    分层拓扑排序:返回每一层的节点列表
    同一层的节点之间没有依赖关系,可以并行处理
    """
    graph = defaultdict(list)
    in_degree = [0] * num_vertices
    
    for u, v in edges:
        graph[u].append(v)
        in_degree[v] += 1
    
    current_level = [i for i in range(num_vertices) if in_degree[i] == 0]
    levels = []
    
    while current_level:
        levels.append(current_level)
        next_level = []
        
        for node in current_level:
            for neighbor in graph[node]:
                in_degree[neighbor] -= 1
                if in_degree[neighbor] == 0:
                    next_level.append(neighbor)
        
        current_level = next_level
    
    # 验证是否所有节点都被处理
    total = sum(len(level) for level in levels)
    if total != num_vertices:
        return []  # 有环
    
    return levels


# 示例
edges = [(0,2), (1,2), (2,3), (2,4), (3,5), (4,5)]
levels = topological_sort_with_levels(6, edges)
print(f"层次: {levels}")
# [[0, 1], [2], [3, 4], [5]]
# 第1层: 0和1可以并行
# 第2层: 2(依赖0和1)
# 第3层: 3和4可以并行
# 第4层: 5(依赖3和4)

层数的含义:层数 = 从任意入度为 0 的节点到当前节点的最长路径。这等价于"最早完成时间"的概念。


Level 3 · 规范怎么定义的

17.12 Kahn 1962 年论文:拓扑排序的 BFS 算法

1962年,Arthur B. Kahn 发表论文《Topological sorting of large networks》(Communications of the ACM, Vol. 5, Issue 11),首次形式化描述了我们今天所说的"Kahn 算法"。

历史背景:1960 年代初,计算机刚开始用于大型工程管理(如 PERT 网络图、CPM 关键路径分析)。这些应用需要对数百甚至数千个任务按依赖关系排序。Kahn 的论文直接回应了这一工程需求。

Kahn 的原始描述(翻译简化):

"给定一个有限有向图表示的偏序关系,算法如下:

  1. 找到所有没有前驱的节点,输出它们(它们之间的顺序任意)。
  2. 从图中移除这些节点及其出发的边。
  3. 重复步骤 1-2,直到图为空。 如果图非空但找不到没有前驱的节点,则图包含环。"

关键贡献

  1. 形式化了拓扑排序问题:明确定义了偏序关系的线性化
  2. 给出了 O(V + E) 的算法:这在当时的计算机能力下非常重要——效率高意味着能处理更大的网络
  3. 同时提供了环检测:算法自然地在有环时终止并报告
  4. 讨论了并行性:Kahn 指出,每轮中所有入度为 0 的节点可以并行处理——这个观察在 60 年后的并行编译系统中依然核心

算法复杂度分析(Kahn 原文的论证方式):

这个分析与 BFS 的复杂度分析完全一致——事实上,Kahn 算法就是一种特殊的 BFS。

17.13 Tarjan 的 DFS 拓扑排序

Robert Tarjan 在 1972 年的论文《Depth-First Search and Linear Graph Algorithms》(SIAM Journal on Computing)中系统化了 DFS 在图算法中的应用,其中包括基于 DFS 完成时间的拓扑排序。

Tarjan 的洞察:DFS 的完成时间(finish time)自然给出了逆拓扑序。这个观察看似简单,但它统一了多个看似不相关的图算法:

  1. 拓扑排序:按 finish time 逆序
  2. 强连通分量(Tarjan 算法 / Kosaraju 算法):利用 finish time 进行两遍 DFS
  3. 割点和桥:利用 DFS 树的结构

为什么 Tarjan 的方法更"深刻"?

Kahn 的算法是"构造性的"——它直接构建排序。Tarjan 的方法则揭示了拓扑排序与图的深度结构的关系:

Tarjan 的正确性证明(简化):

设 G = (V, E) 是 DAG,对于任何边 (u, v) ∈ E:

因此,按 f 值降序排列的顶点序列满足:对任何边 (u,v),u 排在 v 前面。这正是拓扑排序的定义。□

Tarjan vs Kahn 的哲学差异

Kahn Tarjan
自底向上(Bottom-up) 自顶向下(Top-down)
"先做能做的" "深入到底再回来"
增量式(适合动态图) 全局式(一次 DFS 遍历)
直觉更容易 揭示更深结构

17.14 偏序与全序:拓扑排序的数学基础

偏序(Partial Order) 是一个满足自反性、反对称性、传递性的关系。DAG 的可达性定义了一个偏序:如果从 u 可达 v,则 u ≤ v。但不是所有顶点对都可比较(不可达的两个顶点之间没有偏序关系)。

全序(Total Order) 要求所有元素对都可比较。

拓扑排序 = 偏序的线性扩展:将偏序扩展为全序,使得偏序中的所有关系在全序中都被保持。

Dilworth 定理(1950):在任何有限偏序集中,最大反链(互不可比较的元素集合)的大小等于将偏序集分解为最少链(完全可比较的子集)的数量。

应用到拓扑排序:最大反链 = 并行编译时同一层最多可以同时处理的任务数。

def max_parallel_width(num_vertices: int, edges: list) -> int:
    """
    计算 DAG 的最大宽度(最大反链大小)
    即任何合法调度中,同时进行的任务最大数量
    等价于分层拓扑排序中最大层的大小
    """
    levels = topological_sort_with_levels(num_vertices, edges)
    if not levels:
        return 0  # 有环
    return max(len(level) for level in levels)

17.15 拓扑排序在现代系统中的应用

包管理器(npm, pip, apt)

每当你运行 pip install tensorflow 时,包管理器需要:

  1. 解析 tensorflow 的所有依赖(递归)
  2. 构建依赖图
  3. 检测循环依赖(如果有,报错)
  4. 拓扑排序确定安装顺序
  5. 按序安装(或尽可能并行安装独立的包)

电子表格(Excel/Google Sheets)

单元格之间的公式引用形成 DAG。当你修改一个单元格时,电子表格需要按拓扑序重新计算所有依赖于它的单元格。

class Spreadsheet:
    """简化的电子表格引擎"""
    
    def __init__(self):
        self.values = {}      # cell → value
        self.formulas = {}    # cell → formula (as lambda)
        self.deps = defaultdict(set)  # cell → cells it depends on
        self.rdeps = defaultdict(set)  # cell → cells that depend on it
    
    def set_formula(self, cell: str, formula, dependencies: list):
        """设置单元格公式"""
        # 移除旧的依赖关系
        for old_dep in self.deps[cell]:
            self.rdeps[old_dep].discard(cell)
        
        self.formulas[cell] = formula
        self.deps[cell] = set(dependencies)
        for dep in dependencies:
            self.rdeps[dep].add(cell)
        
        # 重新计算
        self._recalculate(cell)
    
    def set_value(self, cell: str, value):
        """直接设置值(非公式)"""
        self.values[cell] = value
        self.formulas.pop(cell, None)
        self.deps[cell] = set()
        
        # 触发依赖链的重新计算
        self._propagate(cell)
    
    def _propagate(self, changed_cell: str):
        """按拓扑序重新计算所有受影响的单元格"""
        # BFS 找到所有受影响的单元格
        affected = set()
        queue = deque([changed_cell])
        while queue:
            cell = queue.popleft()
            for dependent in self.rdeps[cell]:
                if dependent not in affected:
                    affected.add(dependent)
                    queue.append(dependent)
        
        # 对受影响的单元格做拓扑排序并重新计算
        # ...(省略实现细节)
    
    def _recalculate(self, cell: str):
        """重新计算单个单元格"""
        if cell in self.formulas:
            dep_values = {d: self.values.get(d, 0) for d in self.deps[cell]}
            self.values[cell] = self.formulas[cell](dep_values)

React/Vue 的响应式系统

前端框架的响应式更新本质上也是拓扑排序:


Level 4 · 边界与陷阱

17.16 LeetCode 207:课程表(详细面试分析)

我们在 Level 1 和 Level 2 已经给出了代码实现。这里重点讨论面试中的follow-up 问题边界情况

Follow-up 1:如果课程数量极大(10⁶),怎么优化?

# Kahn 算法已经是 O(V+E),无法渐近优化
# 但可以优化常数因子:
# 1. 用数组代替字典(顶点是 0~n-1 的整数)
# 2. 邻接表用 list 而非 defaultdict
# 3. 避免创建新列表(原地修改)

def canFinish_optimized(numCourses: int, prerequisites: list) -> bool:
    """内存优化版本"""
    # 预分配数组
    graph = [[] for _ in range(numCourses)]
    in_degree = [0] * numCourses
    
    for course, prereq in prerequisites:
        graph[prereq].append(course)
        in_degree[course] += 1
    
    # 用 list 模拟队列(对于纯遍历,list 也够用)
    queue = [i for i in range(numCourses) if in_degree[i] == 0]
    idx = 0  # 用指针代替 popleft
    
    while idx < len(queue):
        node = queue[idx]
        idx += 1
        for neighbor in graph[node]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)
    
    return len(queue) == numCourses

Follow-up 2:找出所有形成环的课程

def find_cycle_courses(numCourses: int, prerequisites: list) -> list:
    """找出所有参与环的课程"""
    graph = [[] for _ in range(numCourses)]
    in_degree = [0] * numCourses
    
    for course, prereq in prerequisites:
        graph[prereq].append(course)
        in_degree[course] += 1
    
    # Kahn 算法
    queue = deque([i for i in range(numCourses) if in_degree[i] == 0])
    removed = set()
    
    while queue:
        node = queue.popleft()
        removed.add(node)
        for neighbor in graph[node]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)
    
    # 剩余未被移除的节点都参与了环
    return [i for i in range(numCourses) if i not in removed]

边界情况

17.17 LeetCode 210:课程表 II(返回具体排序)

def findOrder(numCourses: int, prerequisites: list) -> list:
    """
    LeetCode 210: Course Schedule II
    完整的面试级实现
    """
    graph = [[] for _ in range(numCourses)]
    in_degree = [0] * numCourses
    
    for course, prereq in prerequisites:
        graph[prereq].append(course)
        in_degree[course] += 1
    
    queue = deque([i for i in range(numCourses) if in_degree[i] == 0])
    order = []
    
    while queue:
        node = queue.popleft()
        order.append(node)
        for neighbor in graph[node]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)
    
    return order if len(order) == numCourses else []


# 如果要求字典序最小的拓扑排序:
import heapq

def findOrder_lexicographic(numCourses: int, prerequisites: list) -> list:
    """字典序最小的拓扑排序"""
    graph = [[] for _ in range(numCourses)]
    in_degree = [0] * numCourses
    
    for course, prereq in prerequisites:
        graph[prereq].append(course)
        in_degree[course] += 1
    
    # 用最小堆代替普通队列
    heap = [i for i in range(numCourses) if in_degree[i] == 0]
    heapq.heapify(heap)
    order = []
    
    while heap:
        node = heapq.heappop(heap)
        order.append(node)
        for neighbor in graph[node]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                heapq.heappush(heap, neighbor)
    
    return order if len(order) == numCourses else []


# 测试
print(findOrder(4, [[1,0],[2,0],[3,1],[3,2]]))
# 可能输出: [0, 1, 2, 3] 或 [0, 2, 1, 3]

print(findOrder_lexicographic(4, [[1,0],[2,0],[3,1],[3,2]]))
# 一定输出: [0, 1, 2, 3] (字典序最小)

面试要点

17.18 LeetCode 269:外星文字典

题目:给定一组按外星文字母顺序排序的单词,推断出字母的顺序。

分析:这是一道"从排序结果逆推排序规则"的题,本质是构建约束图 + 拓扑排序。

def alienOrder(words: list) -> str:
    """
    LeetCode 269: Alien Dictionary
    从排好序的单词列表推断字母顺序
    
    思路:
    1. 比较相邻单词,找出字母间的偏序关系
    2. 构建有向图
    3. 拓扑排序
    
    时间: O(C) 其中 C 是所有单词的总字符数
    空间: O(1) 字母表大小固定为 26
    """
    # 步骤1:收集所有出现的字母
    all_chars = set()
    for word in words:
        all_chars.update(word)
    
    # 步骤2:比较相邻单词,提取偏序关系
    graph = defaultdict(set)
    in_degree = {c: 0 for c in all_chars}
    
    for i in range(len(words) - 1):
        word1, word2 = words[i], words[i + 1]
        
        # 特殊情况:如果 word1 是 word2 的前缀的超集,无效
        # 例如 ["abc", "ab"] → 违反排序规则
        if len(word1) > len(word2) and word1[:len(word2)] == word2:
            return ""  # 无效输入
        
        # 找到第一个不同的字符
        for c1, c2 in zip(word1, word2):
            if c1 != c2:
                if c2 not in graph[c1]:
                    graph[c1].add(c2)
                    in_degree[c2] += 1
                break  # 只有第一个不同字符给出信息
    
    # 步骤3:Kahn 拓扑排序
    queue = deque([c for c in all_chars if in_degree[c] == 0])
    result = []
    
    while queue:
        char = queue.popleft()
        result.append(char)
        for neighbor in graph[char]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)
    
    # 如果不是所有字符都被排序,说明有环
    if len(result) != len(all_chars):
        return ""
    
    return "".join(result)


# 测试
print(alienOrder(["wrt", "wrf", "er", "ett", "rftt"]))
# 输出: "wertf"
# 推理过程:
# "wrt" vs "wrf": t 在 f 之前 → t→f
# "wrf" vs "er":  w 在 e 之前 → w→e
# "er" vs "ett":  r 在 t 之前 → r→t
# "ett" vs "rftt": e 在 r 之前 → e→r
# 图: w→e→r→t→f
# 拓扑排序: wertf

print(alienOrder(["z", "x"]))  # "zx"
print(alienOrder(["z", "x", "z"]))  # "" (z<x 且 x<z,矛盾→环)

面试要点

  1. 关键洞察:只有相邻单词的比较才有意义(非相邻单词的关系是传递推导的)
  2. 只取第一个不同字符:后续字符的比较无效(因为排序只看第一个不同位置)
  3. 边界情况
    • 输入无效(如 ["abc", "ab"])
    • 只有一个单词(所有字母顺序任意)
    • 有环(矛盾的约束)
  4. 如果要求唯一解:当拓扑排序的某一步有多个入度为 0 的节点,说明顺序不唯一

17.19 拓扑排序的陷阱总结

陷阱1:边的方向

不同题目的边方向约定不同!

# 课程表: prerequisites[i] = [a, b] 表示 "要学 a,先学 b"
# 边方向: b → a (b 是前置)
for a, b in prerequisites:
    graph[b].append(a)

# 外星文字典: 如果 c1 在 c2 前面
# 边方向: c1 → c2
graph[c1].add(c2)

陷阱2:不连通的 DAG

# 如果图有多个连通分量(孤立节点),它们在拓扑排序中可以任意排列
# Kahn 算法自然处理:所有孤立节点入度为 0,都会被入队
# 但某些题目可能要求特定顺序(如字典序最小)

陷阱3:重边

# 如果有重边 [(0,1), (0,1)],入度会被多算
# 解决方案1:建图时去重
graph = defaultdict(set)  # 用 set 而非 list

# 解决方案2:先去重再建图
edges = list(set(edges))

陷阱4:以为拓扑排序能处理有环图

# 拓扑排序只适用于 DAG!
# 如果图有环,必须先检测/报告,不能强行排序
# Kahn 算法的返回长度 < V 就是环的信号
# DFS 的灰色节点重遇就是环的信号

17.20 面试中拓扑排序题的通用模板

def topological_sort_template(problem_input):
    """
    拓扑排序面试题通用模板
    
    关键步骤:
    1. 识别"这是拓扑排序"(依赖关系、先后顺序、DAG)
    2. 建图(注意边的方向!)
    3. 选择算法(Kahn 更通用,DFS 更简洁)
    4. 处理边界(环、孤立节点、不唯一性)
    """
    # 步骤1:从问题描述中提取依赖关系
    # 确定:什么是顶点?什么是边?边的方向?
    
    # 步骤2:建图
    graph = defaultdict(list)
    in_degree = defaultdict(int)
    all_nodes = set()
    
    for dependency in problem_input:
        # 根据题意确定 u, v
        u, v = extract_edge(dependency)
        graph[u].append(v)
        in_degree[v] += 1
        all_nodes.add(u)
        all_nodes.add(v)
    
    # 确保所有节点都有入度记录
    for node in all_nodes:
        if node not in in_degree:
            in_degree[node] = 0
    
    # 步骤3:Kahn 算法
    queue = deque([n for n in all_nodes if in_degree[n] == 0])
    result = []
    
    while queue:
        node = queue.popleft()
        result.append(node)
        for neighbor in graph[node]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)
    
    # 步骤4:检查是否成功
    if len(result) != len(all_nodes):
        return "IMPOSSIBLE"  # 有环
    
    return result

识别拓扑排序题的信号词

17.21 本章总结

概念 核心要点
拓扑排序 DAG 的顶点线性排列,尊重所有边的方向
Kahn 算法 BFS:入度为 0 的先输出,O(V+E)
DFS 拓扑排序 按完成时间逆序,O(V+E)
环检测 Kahn: result < V; DFS: 遇到灰色节点
关键路径 DAG 最长路径 = 项目最短工期
并行调度 分层 Kahn = 最大并行度

拓扑排序是有向图中最基本也最重要的算法之一。它的应用远远超出了"排序"本身——编译器、包管理器、电子表格、响应式框架、项目管理工具,这些日常使用的工具背后都有拓扑排序的身影。理解了拓扑排序,你就理解了"在依赖约束下做最优调度"这一类问题的核心。

本章评分
4.9  / 5  (15 评分)

💬 留言讨论