第 12 章

堆与优先队列

第十二章:堆与优先队列

上一章我们学习了二叉搜索树,它支持高效的搜索、插入和删除。但有一类问题不需要全序维护——你只关心"当前最小(或最大)的元素是什么"。比如:操作系统要调度优先级最高的进程、Dijkstra 算法要取出距离最近的节点、实时数据流中要持续追踪中位数。

堆(Heap) 就是为这类"快速取极值"场景设计的数据结构。它只保证父节点比子节点更优(更小或更大),而不维护完整的有序关系。这个"弱一点"的约束换来了极好的性能:插入 O(log n)、取极值 O(1)、删除极值 O(log n)——而且堆可以用一个简单的数组实现,无需任何指针。

优先队列(Priority Queue) 是堆的抽象接口:插入一个元素、取出优先级最高的元素。几乎所有语言的标准库都有优先队列的实现——Python 的 heapq、Java 的 PriorityQueue、C++ 的 priority_queue

这一章的目标:(1) 彻底理解堆的结构和操作原理;(2) 熟练使用 Python heapq 模块解决实际问题;(3) 掌握堆排序算法;(4) 在面试和工程中识别"需要用堆"的场景。


Level 1 · 你需要知道的

12.1 堆的概念

是一棵完全二叉树,满足堆性质(heap property)

什么是完全二叉树?除了最后一层外,所有层都是满的,并且最后一层的节点从左到右连续排列(没有间隙)。

最小堆示例:
         1
       /   \
      3     2
     / \   /
    7   4  5

堆性质:每个父节点 ≤ 其子节点
根节点 1 是全局最小值
注意:3 和 2 之间没有大小关系——堆不保证兄弟节点的顺序

核心洞察:堆是一种"偏序"结构,它只保证父子之间的关系,不保证同层节点的顺序。这比 BST 的"全序"弱很多,但也因此获得了更简单的实现和更好的缓存性能。

12.2 数组表示

完全二叉树有一个美妙的性质:它可以用数组紧凑地表示,不需要任何指针。

如果我们把完全二叉树的节点按层序(从上到下、从左到右)编号为 0, 1, 2, ...,那么:

数组:[1, 3, 2, 7, 4, 5]
索引: 0  1  2  3  4  5

对应的树:
         1(0)
       /      \
     3(1)    2(2)
    /   \    /
  7(3) 4(4) 5(5)

验证:
- 节点 3 的索引是 1,父节点 = (1-1)//2 = 0 → 值为 1 ✓
- 节点 3 的左子 = 2*1+1 = 3 → 值为 7 ✓
- 节点 3 的右子 = 2*1+2 = 4 → 值为 4 ✓

为什么数组表示好?

  1. 零额外空间:不需要 left/right 指针,每个节点只存值
  2. 缓存友好:数组元素在内存中连续存放,CPU 预取效率极高
  3. 索引计算 O(1):位运算即可找到父子关系

这是堆比链式二叉树(如 BST、AVL 树)在实践中快得多的根本原因——不是渐近复杂度更优,而是常数因子小得多。

12.3 堆的基本操作

堆只需要两个核心操作:上浮(sift up)下沉(sift down)。所有高层操作(插入、删除、建堆)都建立在这两个操作之上。

sift_up:新元素上浮

当我们在堆的末尾插入一个新元素时,它可能比父节点小(违反最小堆性质)。我们需要让它不断"上浮",直到找到正确位置。

def sift_up(heap, i):
    """将索引 i 处的元素上浮到正确位置(最小堆)"""
    while i > 0:
        parent = (i - 1) // 2
        if heap[i] < heap[parent]:
            heap[i], heap[parent] = heap[parent], heap[i]
            i = parent
        else:
            break  # 已满足堆性质

复杂度:最坏情况下从叶子上浮到根,经过 O(log n) 层。

sift_down:根元素下沉

当我们删除堆顶(取出最小值)后,把最后一个元素放到根的位置,它可能比子节点大(违反堆性质)。我们需要让它不断"下沉",每次和较小的子节点交换。

def sift_down(heap, i, size):
    """将索引 i 处的元素下沉到正确位置(最小堆)"""
    while True:
        smallest = i
        left = 2 * i + 1
        right = 2 * i + 2
        
        if left < size and heap[left] < heap[smallest]:
            smallest = left
        if right < size and heap[right] < heap[smallest]:
            smallest = right
        
        if smallest == i:
            break  # 已满足堆性质
        
        heap[i], heap[smallest] = heap[smallest], heap[i]
        i = smallest

为什么选较小的子节点交换? 因为交换后 smallest 成为新的父节点,它必须同时小于另一个子节点。如果我们和较大的子节点交换,它可能比另一个子节点大,破坏堆性质。

复杂度:最坏情况下从根下沉到叶子,O(log n)。

插入操作

def heap_insert(heap, val):
    """向最小堆中插入一个元素"""
    heap.append(val)           # 放在末尾(保持完全二叉树性质)
    sift_up(heap, len(heap) - 1)  # 上浮到正确位置

删除堆顶操作

def heap_extract_min(heap):
    """取出并返回最小堆的堆顶(最小值)"""
    if not heap:
        raise IndexError("heap is empty")
    
    min_val = heap[0]
    # 用最后一个元素替换根
    heap[0] = heap[-1]
    heap.pop()
    # 下沉新的根
    if heap:
        sift_down(heap, 0, len(heap))
    
    return min_val

为什么用最后一个元素替换根? 直接删除根会破坏完全二叉树的结构。用最后一个元素填充根位置,只需要一次 sift_down 就能恢复堆性质,而且保持了完全二叉树的形状。

12.4 Python heapq 模块

Python 标准库的 heapq 模块提供了堆操作函数,直接在列表上操作(列表本身就是堆的数组表示)。

import heapq

# 创建堆
heap = []
heapq.heappush(heap, 3)
heapq.heappush(heap, 1)
heapq.heappush(heap, 2)
print(heap)  # [1, 3, 2] — 最小值在 heap[0]

# 弹出最小值
min_val = heapq.heappop(heap)
print(min_val)  # 1
print(heap)     # [2, 3]

# 将普通列表转为堆(原地操作,O(n))
data = [5, 3, 8, 1, 2, 7]
heapq.heapify(data)
print(data)  # [1, 2, 7, 5, 3, 8] — 注意:不是排序!只满足堆性质

# 查看堆顶但不弹出
print(data[0])  # 1 — 直接访问索引 0

# nlargest / nsmallest — 找前 K 大/小
nums = [5, 3, 8, 1, 2, 7, 4, 6]
print(heapq.nlargest(3, nums))   # [8, 7, 6]
print(heapq.nsmallest(3, nums))  # [1, 2, 3]

heapq 的关键特性

操作 函数 复杂度 说明
插入 heappush(heap, item) O(log n) 推入元素
弹出最小 heappop(heap) O(log n) 弹出并返回最小值
查看最小 heap[0] O(1) 直接索引访问
建堆 heapify(list) O(n) 原地将列表变为堆
推入并弹出 heappushpop(heap, item) O(log n) 比先 push 再 pop 快
弹出并推入 heapreplace(heap, item) O(log n) 比先 pop 再 push 快
前 K 大 nlargest(k, iterable) O(n log k) 内部用堆实现
前 K 小 nsmallest(k, iterable) O(n log k) 内部用堆实现

12.5 最大堆:取反技巧

Python heapq 只支持最小堆。如果你需要最大堆,标准做法是对值取反

import heapq

# 最大堆:存储 -val,取出时再取反
max_heap = []
heapq.heappush(max_heap, -5)
heapq.heappush(max_heap, -3)
heapq.heappush(max_heap, -8)

# 取出最大值
max_val = -heapq.heappop(max_heap)
print(max_val)  # 8

对于存储元组的情况:

# 按优先级的最大堆(优先级, 任务)
tasks = []
heapq.heappush(tasks, (-priority, task_id, task))

# 取出最高优先级
neg_pri, tid, task = heapq.heappop(tasks)
actual_priority = -neg_pri

12.6 Top-K 问题

问题:从 n 个元素中找出最大的 K 个(或最小的 K 个)。

方法一:排序 — O(n log n),简单但当 K << n 时浪费。

方法二:最小堆,大小为 K — O(n log K),最佳平衡方案。

核心思路:维护一个大小为 K 的最小堆,堆顶是"K 个最大元素中最小的"。遍历所有元素,如果当前元素比堆顶大,就替换堆顶。

import heapq

def top_k_largest(nums, k):
    """找出数组中最大的 K 个元素"""
    if k >= len(nums):
        return sorted(nums, reverse=True)
    
    # 方法一:使用 nlargest(推荐)
    return heapq.nlargest(k, nums)

def top_k_largest_manual(nums, k):
    """手动维护大小为 K 的最小堆"""
    # 初始化:取前 K 个元素建堆
    heap = nums[:k]
    heapq.heapify(heap)
    
    # 遍历剩余元素
    for num in nums[k:]:
        if num > heap[0]:  # 比堆顶大,替换
            heapq.heapreplace(heap, num)
    
    return sorted(heap, reverse=True)

# 示例
nums = [3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5]
print(top_k_largest(nums, 3))         # [9, 6, 5]
print(top_k_largest_manual(nums, 3))  # [9, 6, 5]

为什么用最小堆找最大的 K 个? 反直觉但很巧妙:

如果用最大堆,你需要把所有 n 个元素都放进去再弹出 K 个,空间 O(n)。而最小堆只需要 O(K) 空间。

LeetCode #215 — 数组中的第 K 个最大元素

def findKthLargest(nums, k):
    """返回数组中第 K 个最大的元素"""
    # 方法一:heapq.nlargest(最直接)
    return heapq.nlargest(k, nums)[-1]
    
    # 方法二:维护大小为 K 的最小堆
    # heap = nums[:k]
    # heapq.heapify(heap)
    # for num in nums[k:]:
    #     if num > heap[0]:
    #         heapq.heapreplace(heap, num)
    # return heap[0]

12.7 常见使用模式总结

import heapq

# 模式 1:事件调度(按时间排序)
events = []
heapq.heappush(events, (timestamp, event_type, data))

# 模式 2:多路归并(从多个有序源合并)
# heapq.merge(*sorted_iterables)  # 返回惰性迭代器

# 模式 3:带去重/延迟删除的优先队列
# 标记已删除的元素,弹出时跳过
removed = set()
while heap:
    val = heapq.heappop(heap)
    if val not in removed:
        process(val)
        break

Level 2 · 它是怎么运行的

12.8 手写完整的堆实现

下面是一个完整的最小堆类实现,展示了每个操作的内部逻辑:

class MinHeap:
    """完整的最小堆实现"""
    
    def __init__(self):
        self._data = []
    
    def __len__(self):
        return len(self._data)
    
    def __bool__(self):
        return len(self._data) > 0
    
    def peek(self):
        """查看堆顶元素,O(1)"""
        if not self._data:
            raise IndexError("heap is empty")
        return self._data[0]
    
    def insert(self, val):
        """插入元素,O(log n)"""
        self._data.append(val)
        self._sift_up(len(self._data) - 1)
    
    def extract_min(self):
        """取出并返回最小值,O(log n)"""
        if not self._data:
            raise IndexError("heap is empty")
        
        min_val = self._data[0]
        last = self._data.pop()
        if self._data:
            self._data[0] = last
            self._sift_down(0)
        return min_val
    
    def _sift_up(self, i):
        """上浮操作"""
        while i > 0:
            parent = (i - 1) // 2
            if self._data[i] < self._data[parent]:
                self._data[i], self._data[parent] = self._data[parent], self._data[i]
                i = parent
            else:
                break
    
    def _sift_down(self, i):
        """下沉操作"""
        n = len(self._data)
        while True:
            smallest = i
            left = 2 * i + 1
            right = 2 * i + 2
            
            if left < n and self._data[left] < self._data[smallest]:
                smallest = left
            if right < n and self._data[right] < self._data[smallest]:
                smallest = right
            
            if smallest == i:
                break
            
            self._data[i], self._data[smallest] = self._data[smallest], self._data[i]
            i = smallest
    
    @classmethod
    def build_heap(cls, items):
        """从列表构建堆,O(n)"""
        heap = cls()
        heap._data = list(items)
        # 自底向上:从最后一个非叶节点开始 sift_down
        n = len(heap._data)
        for i in range(n // 2 - 1, -1, -1):
            heap._sift_down(i)
        return heap
    
    def __repr__(self):
        return f"MinHeap({self._data})"


# 使用示例
heap = MinHeap()
for val in [5, 3, 8, 1, 2, 7]:
    heap.insert(val)
print(heap)  # MinHeap([1, 2, 7, 5, 3, 8])

while heap:
    print(heap.extract_min(), end=" ")  # 1 2 3 5 7 8

12.9 build_heap 的 O(n) 复杂度证明

直觉上,建堆似乎应该是 O(n log n)——n 个元素每个做 O(log n) 的 sift_down。但实际上,自底向上建堆的总时间是 O(n)。

关键观察:不同层的节点做 sift_down 的代价不同。

设堆有 h = ⌊log₂n⌋ 层(从 0 开始编号):

总工作量:

T(n) = Σ(k=0 to h-1) 2^k × (h - k)

令 j = h - k(从底部数的层数):

T(n) = Σ(j=1 to h) 2^(h-j) × j
     = 2^h × Σ(j=1 to h) j / 2^j

级数 Σ(j=1 to ∞) j/2^j 收敛于 2(这是一个经典的幂级数结果)。

因此 T(n) ≤ 2^h × 2 = 2^(h+1) = O(n)。

直觉解释:叶子节点占了近一半的节点数,但它们完全不需要移动。靠近底部的节点很多但移动距离短;靠近顶部的节点移动距离长但数量极少。这种"多数节点做少量工作"的模式使得总和收敛到 O(n)。

为什么不自顶向下建堆? 如果我们对每个元素调用 insert(即 sift_up),那么第 k 层有 2^k 个节点,每个上浮 k 步,总和 Σ k×2^k = O(n log n)。自顶向下的模式恰好相反——"少数节点做少量工作,多数节点做大量工作"——所以更慢。

12.10 堆排序

堆排序由 J.W.J. Williams 在 1964 年发明(与堆这个数据结构一同提出)。它是一种原地排序算法,最坏情况 O(n log n),不需要额外空间。

算法步骤

  1. 将数组原地建成最大堆:O(n)
  2. 重复 n-1 次:将堆顶(最大值)和末尾交换,堆大小减 1,对新堆顶 sift_down
def heap_sort(arr):
    """堆排序:原地、不稳定、O(n log n)"""
    n = len(arr)
    
    # 第一步:建最大堆(自底向上)
    for i in range(n // 2 - 1, -1, -1):
        _sift_down_max(arr, i, n)
    
    # 第二步:逐步取出最大值放到末尾
    for i in range(n - 1, 0, -1):
        arr[0], arr[i] = arr[i], arr[0]  # 堆顶与末尾交换
        _sift_down_max(arr, 0, i)         # 对缩小的堆做 sift_down


def _sift_down_max(arr, i, size):
    """最大堆的 sift_down"""
    while True:
        largest = i
        left = 2 * i + 1
        right = 2 * i + 2
        
        if left < size and arr[left] > arr[largest]:
            largest = left
        if right < size and arr[right] > arr[largest]:
            largest = right
        
        if largest == i:
            break
        
        arr[i], arr[largest] = arr[largest], arr[i]
        i = largest


# 示例
data = [5, 3, 8, 1, 2, 7, 4, 6]
heap_sort(data)
print(data)  # [1, 2, 3, 4, 5, 6, 7, 8]

堆排序的性能分析

度量 说明
时间(最好) O(n log n) 即使数组已排序也需要 O(n log n)
时间(平均) O(n log n)
时间(最坏) O(n log n) 保证,不会退化
空间 O(1) 原地排序
稳定性 不稳定 交换会打乱相等元素的顺序

堆排序 vs 快速排序 vs 归并排序

实践中,堆排序很少用于通用排序(快速排序更快),但它在需要严格空间限制 + 最坏情况保证的场景下有用(如 Linux 内核中的 sort 函数使用 introsort,在递归过深时 fallback 到堆排序)。

12.11 合并 K 个有序链表(LeetCode #23)

问题:给定 K 个已排序的链表,将它们合并为一个有序链表。

暴力:每次从 K 个链表头中找最小的,需要比较 K 次,总共 N 个节点 → O(NK)。

堆优化:用最小堆维护 K 个链表的当前头节点。每次取堆顶(全局最小),然后把该链表的下一个节点入堆。

import heapq

class ListNode:
    def __init__(self, val=0, next=None):
        self.val = val
        self.next = next

def mergeKLists(lists):
    """合并 K 个有序链表,时间 O(N log K),空间 O(K)"""
    # 最小堆:(节点值, 链表索引, 节点)
    # 链表索引用于打破相同 val 时的比较
    heap = []
    
    for i, head in enumerate(lists):
        if head:
            heapq.heappush(heap, (head.val, i, head))
    
    dummy = ListNode(0)
    curr = dummy
    
    while heap:
        val, i, node = heapq.heappop(heap)
        curr.next = node
        curr = curr.next
        
        if node.next:
            heapq.heappush(heap, (node.next.val, i, node.next))
    
    return dummy.next

复杂度分析

为什么用链表索引 i? Python 的 heapq 比较元组时会逐个字段比。如果两个节点的 val 相同,它会比较第二个字段。ListNode 没有定义 __lt__,直接比较会报错。加一个唯一的索引 i 就能打破平局。

12.12 数据流的中位数(LeetCode #295)

问题:设计一个数据结构,支持:

核心思路:用两个堆把数据分成两半:

维护约束:len(lo) == len(hi)len(lo) == len(hi) + 1

中位数 = lo 的堆顶(奇数个),或 (lo堆顶 + hi堆顶) / 2(偶数个)。

import heapq

class MedianFinder:
    def __init__(self):
        self.lo = []  # 最大堆(存负值):较小的一半
        self.hi = []  # 最小堆:较大的一半
    
    def addNum(self, num):
        # 先加入最大堆
        heapq.heappush(self.lo, -num)
        
        # 保证 lo 的最大值 ≤ hi 的最小值
        if self.hi and (-self.lo[0] > self.hi[0]):
            val = -heapq.heappop(self.lo)
            heapq.heappush(self.hi, val)
        
        # 保证 len(lo) >= len(hi)(lo 最多比 hi 多一个)
        if len(self.lo) < len(self.hi):
            val = heapq.heappop(self.hi)
            heapq.heappush(self.lo, -val)
        
        # 保证 lo 不会比 hi 多超过一个
        if len(self.lo) > len(self.hi) + 1:
            val = -heapq.heappop(self.lo)
            heapq.heappush(self.hi, val)
    
    def findMedian(self):
        if len(self.lo) > len(self.hi):
            return -self.lo[0]
        return (-self.lo[0] + self.hi[0]) / 2


# 使用示例
mf = MedianFinder()
mf.addNum(1)    # [1]         → 中位数 = 1
mf.addNum(2)    # [1, 2]      → 中位数 = 1.5
mf.addNum(3)    # [1, 2, 3]   → 中位数 = 2
print(mf.findMedian())  # 2
mf.addNum(4)    # [1, 2, 3, 4] → 中位数 = 2.5
print(mf.findMedian())  # 2.5

复杂度

为什么两个堆这么巧妙? 中位数把数据分成大小相等的两半。最大堆的堆顶是"小的那一半中最大的",最小堆的堆顶是"大的那一半中最小的"——它们恰好是中位数的左右邻居。

12.13 优先队列在 Dijkstra 中的应用

Dijkstra 算法求单源最短路径时,每轮需要找"未确定节点中距离最小的"。这正是优先队列的适用场景。

import heapq

def dijkstra(graph, start):
    """
    Dijkstra 最短路径算法(优先队列实现)
    graph: 邻接表 {node: [(neighbor, weight), ...]}
    返回: {node: shortest_distance}
    """
    dist = {start: 0}
    heap = [(0, start)]  # (距离, 节点)
    visited = set()
    
    while heap:
        d, u = heapq.heappop(heap)
        
        if u in visited:
            continue  # 已确定最短距离,跳过
        visited.add(u)
        
        for v, w in graph.get(u, []):
            new_dist = d + w
            if v not in dist or new_dist < dist[v]:
                dist[v] = new_dist
                heapq.heappush(heap, (new_dist, v))
    
    return dist


# 示例
graph = {
    'A': [('B', 1), ('C', 4)],
    'B': [('C', 2), ('D', 5)],
    'C': [('D', 1)],
    'D': []
}
print(dijkstra(graph, 'A'))  # {'A': 0, 'B': 1, 'C': 3, 'D': 4}

注意:这个实现使用了"lazy deletion"——同一个节点可能在堆中出现多次(因为我们没有 decrease-key 操作)。通过 visited 集合跳过已处理的节点来保证正确性。

复杂度:使用二叉堆时,Dijkstra 的复杂度是 O((V + E) log V)。如果使用斐波那契堆(支持 O(1) 的 decrease-key),可以优化到 O(V log V + E)——但在实践中二叉堆通常更快,因为常数因子更小。

关于图算法的完整讨论,请参考第 18 章。


Level 3 · 规范怎么定义的

12.14 二叉堆的历史

二叉堆由 J.W.J. Williams 在 1964 年发明,发表在论文 "Algorithm 232: Heapsort"(Communications of the ACM, Vol. 7, No. 6, June 1964)。Williams 同时提出了堆这个数据结构和堆排序算法——这在计算机科学史上很罕见,一个数据结构和它的主要应用在同一篇论文中诞生。

同年,Robert W. Floyd 在 "Algorithm 245: Treesort 3"(Communications of the ACM, Vol. 7, No. 12, December 1964)中改进了 Williams 的方法,提出了自底向上建堆的 O(n) 算法。Floyd 的贡献使得堆排序的总时间复杂度保持在 O(n log n) 的同时,建堆阶段从 O(n log n) 降低到 O(n)。

这两篇论文奠定了堆在算法领域的基础地位。此后六十年,二叉堆一直是实现优先队列的标准方法——简单、高效、空间紧凑。

12.15 斐波那契堆

1987 年,Michael L. FredmanRobert E. Tarjan 在论文 "Fibonacci heaps and their uses in improved network optimization algorithms"(Journal of the ACM, Vol. 34, No. 3, July 1987)中提出了斐波那契堆。

动机:Dijkstra 算法和 Prim 算法中,频繁执行的操作是 decrease-key(降低某个节点的优先级)。在二叉堆中,decrease-key 需要 O(log n)(先找到元素,再 sift_up)。如果能把 decrease-key 降到 O(1) 摊还,整个算法就更快。

斐波那契堆的操作复杂度

操作 二叉堆 斐波那契堆(摊还)
insert O(log n) O(1)
find-min O(1) O(1)
extract-min O(log n) O(log n)
decrease-key O(log n) O(1)
merge O(n) O(1)

核心思想:斐波那契堆是一组"无序的"堆有序树的集合。insert 和 decrease-key 都是"懒惰"的——只做最小量的结构调整,把真正的重组推迟到 extract-min 时进行。这种"延迟合并"策略使得 extract-min 虽然可能很慢(O(n) 最坏),但摊还下来仍然是 O(log n)。

为什么叫"斐波那契"? 因为通过 cascading cut 操作维护的结构约束保证了度为 d 的子树至少包含 F(d+2) 个节点(其中 F 是斐波那契数列)。这确保了最大度数为 O(log n),从而保证 extract-min 的 O(log n) 摊还复杂度。

实践中的地位:尽管理论上更优,斐波那契堆在实践中很少使用,原因包括:

  1. 常数因子大(每个节点需要维护 parent、child、left、right 指针以及 degree 和 mark 标记)
  2. 缓存性能差(指针结构导致内存跳跃访问)
  3. 实现复杂,容易出错
  4. 对于实际规模的图(几百万节点以内),二叉堆 + lazy deletion 通常更快

但斐波那契堆在理论算法分析中至关重要——它给出了许多图算法的最优复杂度上界。

12.16 d-ary 堆

标准二叉堆每个节点有 2 个子节点。自然的推广是 d-ary 堆:每个节点有 d 个子节点。

数组表示

复杂度变化

最优选择

实践中:d = 4(四叉堆)是一个常见的优化选择。四叉堆比二叉堆更扁,减少了 sift_up 的层数;同时每层比较 4 个子节点的开销在现代 CPU 上(SIMD 指令)可以被很好地优化。LevelDB 和某些 Java 实现使用了四叉堆。

12.17 堆在操作系统中的应用

定时器管理

操作系统需要管理大量定时器(网络超时、进程时间片、周期性任务等)。核心操作是"找到最近要触发的定时器"——这正是最小堆的应用场景。

Linux 内核早期使用链表管理定时器(O(1) 插入但 O(n) 查找最近到期的),后来演化为分级时间轮(timing wheel)和红黑树方案。但在用户态(如 libuv、Go runtime),堆仍然是定时器管理的主要数据结构。

Go 运行时的定时器堆:Go 1.14 之前使用全局四叉堆管理所有 goroutine 的定时器。1.14 之后改为每个 P(处理器)维护一个本地的四叉堆,减少锁争用。

进程调度

优先级调度算法中,就绪进程按优先级排列,每次选最高优先级的进程运行。这可以用最大堆实现。但现代操作系统(如 Linux CFS 调度器)使用红黑树而非堆,因为它需要高效地找到"虚拟运行时间最小的进程"并且支持任意删除。

内存分配

某些内存分配器使用堆来管理空闲块,按块大小排序,快速找到满足请求的最小(best-fit)或最大(worst-fit)空闲块。


Level 4 · 边界与陷阱

12.18 heapq 的坑

坑 1:没有 decrease-key

Python heapq 没有直接的 decrease-key 操作。如果你需要修改堆中某个元素的优先级,有两种策略:

策略一:Lazy deletion(推荐)

import heapq

class LazyPriorityQueue:
    """支持优先级更新的优先队列(延迟删除方式)"""
    
    def __init__(self):
        self._heap = []
        self._entry_finder = {}  # key -> [priority, key, valid]
        self._counter = 0
    
    def push(self, key, priority):
        """插入或更新 key 的优先级"""
        if key in self._entry_finder:
            # 标记旧条目为无效
            self._entry_finder[key][-1] = False
        
        entry = [priority, self._counter, key, True]
        self._counter += 1
        self._entry_finder[key] = entry
        heapq.heappush(self._heap, entry)
    
    def pop(self):
        """弹出优先级最高(最小)的有效元素"""
        while self._heap:
            priority, count, key, valid = heapq.heappop(self._heap)
            if valid:
                del self._entry_finder[key]
                return key, priority
        raise KeyError("priority queue is empty")
    
    def __contains__(self, key):
        return key in self._entry_finder and self._entry_finder[key][-1]


# 使用示例
pq = LazyPriorityQueue()
pq.push('task_a', 5)
pq.push('task_b', 3)
pq.push('task_a', 1)  # 更新 task_a 的优先级为 1
print(pq.pop())  # ('task_a', 1) — 正确返回更新后的优先级
print(pq.pop())  # ('task_b', 3)

代价:堆中可能积累大量无效条目,导致空间和时间开销增加。如果 decrease-key 非常频繁,考虑用 sortedcontainers.SortedList 替代。

坑 2:没有 remove

heapq 没有高效的按值删除操作。list.remove(val) 是 O(n) 的,而且删除后需要 re-heapify。同样的解决方案是 lazy deletion。

坑 3:元组比较规则

import heapq

# 正确:元组第一个元素是可比较的优先级
heap = []
heapq.heappush(heap, (1, "task_a"))
heapq.heappush(heap, (2, "task_b"))  # ✓

# 危险:如果优先级相同,Python 会比较第二个元素
heapq.heappush(heap, (1, "task_c"))  # 可能 OK(字符串可比较)

# 错误:第二个元素不可比较时报错
class Task:
    def __init__(self, name):
        self.name = name

# heapq.heappush(heap, (1, Task("x")))
# heapq.heappush(heap, (1, Task("y")))
# TypeError: '<' not supported between instances of 'Task' and 'Task'

# 解决方案:加一个唯一计数器作为 tiebreaker
counter = 0
def push_task(heap, priority, task):
    global counter
    counter += 1
    heapq.heappush(heap, (priority, counter, task))

坑 4:heapq 是最小堆,但面试常要最大堆

# 错误:忘记取反
import heapq
heap = [1, 2, 3, 4, 5]
heapq.heapify(heap)
print(heapq.heappop(heap))  # 1,不是 5!

# 正确:取反实现最大堆
max_heap = [-x for x in [1, 2, 3, 4, 5]]
heapq.heapify(max_heap)
print(-heapq.heappop(max_heap))  # 5

12.19 面试高频题详解

题目 1:数据流的中位数(LeetCode #295)

已在 12.12 节详细讲解。核心考点:两个堆维护动态数据的中位数。

面试追问

题目 2:前 K 个高频元素(LeetCode #347)

import heapq
from collections import Counter

def topKFrequent(nums, k):
    """返回出现频率前 K 高的元素"""
    count = Counter(nums)
    # 方法一:nlargest
    return heapq.nlargest(k, count.keys(), key=count.get)
    
    # 方法二:大小为 K 的最小堆
    # return [item for item, freq in heapq.nlargest(k, count.items(), key=lambda x: x[1])]


# 示例
print(topKFrequent([1,1,1,2,2,3], 2))  # [1, 2]

方法三(最优):桶排序 O(n)

def topKFrequent_bucket(nums, k):
    """桶排序方法,O(n)"""
    count = Counter(nums)
    # 桶:索引是频率,值是具有该频率的元素列表
    buckets = [[] for _ in range(len(nums) + 1)]
    for num, freq in count.items():
        buckets[freq].append(num)
    
    result = []
    for freq in range(len(buckets) - 1, -1, -1):
        for num in buckets[freq]:
            result.append(num)
            if len(result) == k:
                return result
    return result

题目 3:丑数 II(LeetCode #264)

问题:找第 n 个丑数(只含因子 2, 3, 5 的正整数:1, 2, 3, 4, 5, 6, 8, 9, 10, 12, ...)

import heapq

def nthUglyNumber(n):
    """最小堆方法"""
    heap = [1]
    seen = {1}
    
    for _ in range(n):
        ugly = heapq.heappop(heap)
        for factor in [2, 3, 5]:
            new_ugly = ugly * factor
            if new_ugly not in seen:
                seen.add(new_ugly)
                heapq.heappush(heap, new_ugly)
    
    return ugly


# 更优的三指针方法(O(n) 时间,O(n) 空间)
def nthUglyNumber_dp(n):
    """动态规划 + 三指针"""
    dp = [0] * n
    dp[0] = 1
    p2 = p3 = p5 = 0  # 三个指针
    
    for i in range(1, n):
        next2, next3, next5 = dp[p2] * 2, dp[p3] * 3, dp[p5] * 5
        dp[i] = min(next2, next3, next5)
        if dp[i] == next2: p2 += 1
        if dp[i] == next3: p3 += 1
        if dp[i] == next5: p5 += 1
    
    return dp[n - 1]


print(nthUglyNumber(10))     # 12
print(nthUglyNumber_dp(10))  # 12

堆方法的复杂度:O(n log n)——每个丑数入堆出堆各一次。三指针方法更优:O(n) 时间。

题目 4:会议室 II(LeetCode #253)

问题:给定一组会议的开始和结束时间,求最少需要多少间会议室。

import heapq

def minMeetingRooms(intervals):
    """
    贪心 + 最小堆
    堆中存储当前正在使用的会议室的结束时间
    """
    if not intervals:
        return 0
    
    # 按开始时间排序
    intervals.sort(key=lambda x: x[0])
    
    # 最小堆:存储各会议室的结束时间
    rooms = []
    heapq.heappush(rooms, intervals[0][1])
    
    for i in range(1, len(intervals)):
        start, end = intervals[i]
        
        # 如果最早结束的会议室已经空了,复用它
        if rooms[0] <= start:
            heapq.heappop(rooms)
        
        # 分配会议室(新的或复用的)
        heapq.heappush(rooms, end)
    
    return len(rooms)


# 示例
intervals = [[0, 30], [5, 10], [15, 20]]
print(minMeetingRooms(intervals))  # 2

intervals2 = [[7, 10], [2, 4]]
print(minMeetingRooms(intervals2))  # 1

思路:按开始时间排序后,对每个会议检查"最早结束的会议室"是否可以复用。最小堆让我们 O(log n) 地找到最早结束的房间。最终堆的大小就是最少需要的会议室数。

12.20 堆 vs 排序 vs 快速选择:Top-K 问题三解法对比

方法 时间复杂度 空间复杂度 是否需要所有数据 输出有序?
完整排序 O(n log n) O(1)~O(n)
最小堆(大小 K) O(n log K) O(K) 否(流式) 否(需额外排序)
快速选择 O(n) 平均 O(1)

如何选择?

  1. K 接近 n(如 K > n/2):直接排序最简单
  2. K 很小且数据是流式的(不能一次性放入内存):堆是唯一选择
  3. K 很小且数据可随机访问:快速选择(Quickselect)平均最快
  4. 需要输出有序结果且 K 较小:堆 + 排序堆结果
  5. 需要最坏情况保证:堆(快速选择最坏 O(n²),除非用中位数的中位数)
import heapq
import random

def quickselect(nums, k):
    """快速选择:找第 K 大的元素,平均 O(n)"""
    target = len(nums) - k  # 转换为第 target 小
    
    def select(left, right):
        pivot_idx = random.randint(left, right)
        nums[pivot_idx], nums[right] = nums[right], nums[pivot_idx]
        pivot = nums[right]
        
        store = left
        for i in range(left, right):
            if nums[i] < pivot:
                nums[i], nums[store] = nums[store], nums[i]
                store += 1
        nums[store], nums[right] = nums[right], nums[store]
        
        if store == target:
            return nums[store]
        elif store < target:
            return select(store + 1, right)
        else:
            return select(left, store - 1)
    
    return select(0, len(nums) - 1)


# 三种方法对比
nums = [3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5]
k = 3

# 方法 1:排序
sorted_result = sorted(nums, reverse=True)[:k]
print(f"排序法: {sorted_result}")  # [9, 6, 5]

# 方法 2:堆
heap_result = heapq.nlargest(k, nums)
print(f"堆方法: {heap_result}")    # [9, 6, 5]

# 方法 3:快速选择(只找第 K 大,不排序)
kth = quickselect(nums[:], k)  # 传副本,因为会修改原数组
print(f"快速选择第{k}大: {kth}")   # 5

12.21 什么时候用堆,什么时候用有序容器

用堆(heapq)的场景

用有序容器(如 sortedcontainers.SortedList)的场景

# heapq 无法高效做的事:
# 1. 删除特定元素(只能 lazy deletion)
# 2. 找到"第 3 小的元素"(只能取 3 次堆顶)
# 3. 修改已有元素的优先级(decrease-key)

# sortedcontainers 可以做:
from sortedcontainers import SortedList

sl = SortedList([5, 3, 8, 1, 2])
sl.remove(3)        # O(log n) 按值删除
print(sl[2])        # O(log n) 按索引访问 → 5
sl.add(4)           # O(log n) 插入
print(sl.bisect_left(4))  # O(log n) 二分查找位置

经验法则:如果你只需要"快速取极值",用 heapq。如果你需要更丰富的查询能力,用 SortedList。如果你在写 LeetCode 且需要有序容器但不想导入第三方库,可以用手写 BST 或者用两个堆模拟。

12.22 实战建议

  1. Python 面试中,heapq 是你的首选工具。90% 的堆题用 heapq 就能解决,不需要手写堆。

  2. 识别"需要用堆"的信号词

    • "前 K 个..."
    • "第 K 大/小..."
    • "动态数据中的中位数/极值..."
    • "合并多个有序..."
    • "最短路径..."(Dijkstra)
    • "贪心选择最优..."
  3. heapq 的时间复杂度备忘

    • push/pop: O(log n)
    • heapify: O(n)
    • nlargest/nsmallest: O(n log k) 或 O(n + k log n)
  4. 元组技巧:将 (priority, counter, data) 作为堆元素可以解决所有比较问题。

  5. 不要过度使用堆:如果数据总量小(< 1000),直接排序通常更简单且足够快。堆的优势在数据量大且 K 远小于 n 时才明显。


本章小结

堆是一种简洁而强大的数据结构。它的核心洞察是:如果你只关心极值,就不需要维护完整的排序。用完全二叉树的结构约束加上弱化的有序性(只保证父子关系),堆在数组上实现了 O(1) 取极值和 O(log n) 修改。

从工程角度看,堆的数组实现是缓存最友好的树结构——没有指针、没有碎片、内存连续。这是它在实践中常常跑赢理论更优但指针结构复杂的方案(如斐波那契堆)的根本原因。

下一章我们将学习哈希表——另一种用"弱约束"换"高性能"的设计哲学。哈希表放弃了所有顺序信息,只追求 O(1) 的精确查找。

本章评分
4.6  / 5  (30 评分)

💬 留言讨论