堆与优先队列
第十二章:堆与优先队列
上一章我们学习了二叉搜索树,它支持高效的搜索、插入和删除。但有一类问题不需要全序维护——你只关心"当前最小(或最大)的元素是什么"。比如:操作系统要调度优先级最高的进程、Dijkstra 算法要取出距离最近的节点、实时数据流中要持续追踪中位数。
堆(Heap) 就是为这类"快速取极值"场景设计的数据结构。它只保证父节点比子节点更优(更小或更大),而不维护完整的有序关系。这个"弱一点"的约束换来了极好的性能:插入 O(log n)、取极值 O(1)、删除极值 O(log n)——而且堆可以用一个简单的数组实现,无需任何指针。
优先队列(Priority Queue) 是堆的抽象接口:插入一个元素、取出优先级最高的元素。几乎所有语言的标准库都有优先队列的实现——Python 的 heapq、Java 的 PriorityQueue、C++ 的 priority_queue。
这一章的目标:(1) 彻底理解堆的结构和操作原理;(2) 熟练使用 Python heapq 模块解决实际问题;(3) 掌握堆排序算法;(4) 在面试和工程中识别"需要用堆"的场景。
Level 1 · 你需要知道的
12.1 堆的概念
堆 是一棵完全二叉树,满足堆性质(heap property):
- 最小堆(Min-Heap):每个节点的值 ≤ 其子节点的值。根节点是全局最小值。
- 最大堆(Max-Heap):每个节点的值 ≥ 其子节点的值。根节点是全局最大值。
什么是完全二叉树?除了最后一层外,所有层都是满的,并且最后一层的节点从左到右连续排列(没有间隙)。
最小堆示例:
1
/ \
3 2
/ \ /
7 4 5
堆性质:每个父节点 ≤ 其子节点
根节点 1 是全局最小值
注意:3 和 2 之间没有大小关系——堆不保证兄弟节点的顺序
核心洞察:堆是一种"偏序"结构,它只保证父子之间的关系,不保证同层节点的顺序。这比 BST 的"全序"弱很多,但也因此获得了更简单的实现和更好的缓存性能。
12.2 数组表示
完全二叉树有一个美妙的性质:它可以用数组紧凑地表示,不需要任何指针。
如果我们把完全二叉树的节点按层序(从上到下、从左到右)编号为 0, 1, 2, ...,那么:
- 父节点索引:
parent(i) = (i - 1) // 2 - 左子节点索引:
left(i) = 2 * i + 1 - 右子节点索引:
right(i) = 2 * i + 2
数组:[1, 3, 2, 7, 4, 5]
索引: 0 1 2 3 4 5
对应的树:
1(0)
/ \
3(1) 2(2)
/ \ /
7(3) 4(4) 5(5)
验证:
- 节点 3 的索引是 1,父节点 = (1-1)//2 = 0 → 值为 1 ✓
- 节点 3 的左子 = 2*1+1 = 3 → 值为 7 ✓
- 节点 3 的右子 = 2*1+2 = 4 → 值为 4 ✓
为什么数组表示好?
- 零额外空间:不需要 left/right 指针,每个节点只存值
- 缓存友好:数组元素在内存中连续存放,CPU 预取效率极高
- 索引计算 O(1):位运算即可找到父子关系
这是堆比链式二叉树(如 BST、AVL 树)在实践中快得多的根本原因——不是渐近复杂度更优,而是常数因子小得多。
12.3 堆的基本操作
堆只需要两个核心操作:上浮(sift up) 和 下沉(sift down)。所有高层操作(插入、删除、建堆)都建立在这两个操作之上。
sift_up:新元素上浮
当我们在堆的末尾插入一个新元素时,它可能比父节点小(违反最小堆性质)。我们需要让它不断"上浮",直到找到正确位置。
def sift_up(heap, i):
"""将索引 i 处的元素上浮到正确位置(最小堆)"""
while i > 0:
parent = (i - 1) // 2
if heap[i] < heap[parent]:
heap[i], heap[parent] = heap[parent], heap[i]
i = parent
else:
break # 已满足堆性质
复杂度:最坏情况下从叶子上浮到根,经过 O(log n) 层。
sift_down:根元素下沉
当我们删除堆顶(取出最小值)后,把最后一个元素放到根的位置,它可能比子节点大(违反堆性质)。我们需要让它不断"下沉",每次和较小的子节点交换。
def sift_down(heap, i, size):
"""将索引 i 处的元素下沉到正确位置(最小堆)"""
while True:
smallest = i
left = 2 * i + 1
right = 2 * i + 2
if left < size and heap[left] < heap[smallest]:
smallest = left
if right < size and heap[right] < heap[smallest]:
smallest = right
if smallest == i:
break # 已满足堆性质
heap[i], heap[smallest] = heap[smallest], heap[i]
i = smallest
为什么选较小的子节点交换? 因为交换后 smallest 成为新的父节点,它必须同时小于另一个子节点。如果我们和较大的子节点交换,它可能比另一个子节点大,破坏堆性质。
复杂度:最坏情况下从根下沉到叶子,O(log n)。
插入操作
def heap_insert(heap, val):
"""向最小堆中插入一个元素"""
heap.append(val) # 放在末尾(保持完全二叉树性质)
sift_up(heap, len(heap) - 1) # 上浮到正确位置
删除堆顶操作
def heap_extract_min(heap):
"""取出并返回最小堆的堆顶(最小值)"""
if not heap:
raise IndexError("heap is empty")
min_val = heap[0]
# 用最后一个元素替换根
heap[0] = heap[-1]
heap.pop()
# 下沉新的根
if heap:
sift_down(heap, 0, len(heap))
return min_val
为什么用最后一个元素替换根? 直接删除根会破坏完全二叉树的结构。用最后一个元素填充根位置,只需要一次 sift_down 就能恢复堆性质,而且保持了完全二叉树的形状。
12.4 Python heapq 模块
Python 标准库的 heapq 模块提供了堆操作函数,直接在列表上操作(列表本身就是堆的数组表示)。
import heapq
# 创建堆
heap = []
heapq.heappush(heap, 3)
heapq.heappush(heap, 1)
heapq.heappush(heap, 2)
print(heap) # [1, 3, 2] — 最小值在 heap[0]
# 弹出最小值
min_val = heapq.heappop(heap)
print(min_val) # 1
print(heap) # [2, 3]
# 将普通列表转为堆(原地操作,O(n))
data = [5, 3, 8, 1, 2, 7]
heapq.heapify(data)
print(data) # [1, 2, 7, 5, 3, 8] — 注意:不是排序!只满足堆性质
# 查看堆顶但不弹出
print(data[0]) # 1 — 直接访问索引 0
# nlargest / nsmallest — 找前 K 大/小
nums = [5, 3, 8, 1, 2, 7, 4, 6]
print(heapq.nlargest(3, nums)) # [8, 7, 6]
print(heapq.nsmallest(3, nums)) # [1, 2, 3]
heapq 的关键特性:
| 操作 | 函数 | 复杂度 | 说明 |
|---|---|---|---|
| 插入 | heappush(heap, item) |
O(log n) | 推入元素 |
| 弹出最小 | heappop(heap) |
O(log n) | 弹出并返回最小值 |
| 查看最小 | heap[0] |
O(1) | 直接索引访问 |
| 建堆 | heapify(list) |
O(n) | 原地将列表变为堆 |
| 推入并弹出 | heappushpop(heap, item) |
O(log n) | 比先 push 再 pop 快 |
| 弹出并推入 | heapreplace(heap, item) |
O(log n) | 比先 pop 再 push 快 |
| 前 K 大 | nlargest(k, iterable) |
O(n log k) | 内部用堆实现 |
| 前 K 小 | nsmallest(k, iterable) |
O(n log k) | 内部用堆实现 |
12.5 最大堆:取反技巧
Python heapq 只支持最小堆。如果你需要最大堆,标准做法是对值取反:
import heapq
# 最大堆:存储 -val,取出时再取反
max_heap = []
heapq.heappush(max_heap, -5)
heapq.heappush(max_heap, -3)
heapq.heappush(max_heap, -8)
# 取出最大值
max_val = -heapq.heappop(max_heap)
print(max_val) # 8
对于存储元组的情况:
# 按优先级的最大堆(优先级, 任务)
tasks = []
heapq.heappush(tasks, (-priority, task_id, task))
# 取出最高优先级
neg_pri, tid, task = heapq.heappop(tasks)
actual_priority = -neg_pri
12.6 Top-K 问题
问题:从 n 个元素中找出最大的 K 个(或最小的 K 个)。
方法一:排序 — O(n log n),简单但当 K << n 时浪费。
方法二:最小堆,大小为 K — O(n log K),最佳平衡方案。
核心思路:维护一个大小为 K 的最小堆,堆顶是"K 个最大元素中最小的"。遍历所有元素,如果当前元素比堆顶大,就替换堆顶。
import heapq
def top_k_largest(nums, k):
"""找出数组中最大的 K 个元素"""
if k >= len(nums):
return sorted(nums, reverse=True)
# 方法一:使用 nlargest(推荐)
return heapq.nlargest(k, nums)
def top_k_largest_manual(nums, k):
"""手动维护大小为 K 的最小堆"""
# 初始化:取前 K 个元素建堆
heap = nums[:k]
heapq.heapify(heap)
# 遍历剩余元素
for num in nums[k:]:
if num > heap[0]: # 比堆顶大,替换
heapq.heapreplace(heap, num)
return sorted(heap, reverse=True)
# 示例
nums = [3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5]
print(top_k_largest(nums, 3)) # [9, 6, 5]
print(top_k_largest_manual(nums, 3)) # [9, 6, 5]
为什么用最小堆找最大的 K 个? 反直觉但很巧妙:
- 最小堆的堆顶是堆中最小的元素
- 堆中保存的是"目前见过的最大的 K 个"
- 堆顶就是这 K 个中的门槛(最小值)
- 新元素只有超过门槛才能进入"前 K 大"
如果用最大堆,你需要把所有 n 个元素都放进去再弹出 K 个,空间 O(n)。而最小堆只需要 O(K) 空间。
LeetCode #215 — 数组中的第 K 个最大元素:
def findKthLargest(nums, k):
"""返回数组中第 K 个最大的元素"""
# 方法一:heapq.nlargest(最直接)
return heapq.nlargest(k, nums)[-1]
# 方法二:维护大小为 K 的最小堆
# heap = nums[:k]
# heapq.heapify(heap)
# for num in nums[k:]:
# if num > heap[0]:
# heapq.heapreplace(heap, num)
# return heap[0]
12.7 常见使用模式总结
import heapq
# 模式 1:事件调度(按时间排序)
events = []
heapq.heappush(events, (timestamp, event_type, data))
# 模式 2:多路归并(从多个有序源合并)
# heapq.merge(*sorted_iterables) # 返回惰性迭代器
# 模式 3:带去重/延迟删除的优先队列
# 标记已删除的元素,弹出时跳过
removed = set()
while heap:
val = heapq.heappop(heap)
if val not in removed:
process(val)
break
Level 2 · 它是怎么运行的
12.8 手写完整的堆实现
下面是一个完整的最小堆类实现,展示了每个操作的内部逻辑:
class MinHeap:
"""完整的最小堆实现"""
def __init__(self):
self._data = []
def __len__(self):
return len(self._data)
def __bool__(self):
return len(self._data) > 0
def peek(self):
"""查看堆顶元素,O(1)"""
if not self._data:
raise IndexError("heap is empty")
return self._data[0]
def insert(self, val):
"""插入元素,O(log n)"""
self._data.append(val)
self._sift_up(len(self._data) - 1)
def extract_min(self):
"""取出并返回最小值,O(log n)"""
if not self._data:
raise IndexError("heap is empty")
min_val = self._data[0]
last = self._data.pop()
if self._data:
self._data[0] = last
self._sift_down(0)
return min_val
def _sift_up(self, i):
"""上浮操作"""
while i > 0:
parent = (i - 1) // 2
if self._data[i] < self._data[parent]:
self._data[i], self._data[parent] = self._data[parent], self._data[i]
i = parent
else:
break
def _sift_down(self, i):
"""下沉操作"""
n = len(self._data)
while True:
smallest = i
left = 2 * i + 1
right = 2 * i + 2
if left < n and self._data[left] < self._data[smallest]:
smallest = left
if right < n and self._data[right] < self._data[smallest]:
smallest = right
if smallest == i:
break
self._data[i], self._data[smallest] = self._data[smallest], self._data[i]
i = smallest
@classmethod
def build_heap(cls, items):
"""从列表构建堆,O(n)"""
heap = cls()
heap._data = list(items)
# 自底向上:从最后一个非叶节点开始 sift_down
n = len(heap._data)
for i in range(n // 2 - 1, -1, -1):
heap._sift_down(i)
return heap
def __repr__(self):
return f"MinHeap({self._data})"
# 使用示例
heap = MinHeap()
for val in [5, 3, 8, 1, 2, 7]:
heap.insert(val)
print(heap) # MinHeap([1, 2, 7, 5, 3, 8])
while heap:
print(heap.extract_min(), end=" ") # 1 2 3 5 7 8
12.9 build_heap 的 O(n) 复杂度证明
直觉上,建堆似乎应该是 O(n log n)——n 个元素每个做 O(log n) 的 sift_down。但实际上,自底向上建堆的总时间是 O(n)。
关键观察:不同层的节点做 sift_down 的代价不同。
设堆有 h = ⌊log₂n⌋ 层(从 0 开始编号):
- 第 0 层(根):1 个节点,sift_down 最多走 h 步
- 第 1 层:2 个节点,sift_down 最多走 h-1 步
- 第 k 层:2^k 个节点,sift_down 最多走 h-k 步
- 第 h-1 层:2^(h-1) 个节点,sift_down 最多走 1 步
- 第 h 层(叶子):约 n/2 个节点,不需要 sift_down
总工作量:
T(n) = Σ(k=0 to h-1) 2^k × (h - k)
令 j = h - k(从底部数的层数):
T(n) = Σ(j=1 to h) 2^(h-j) × j
= 2^h × Σ(j=1 to h) j / 2^j
级数 Σ(j=1 to ∞) j/2^j 收敛于 2(这是一个经典的幂级数结果)。
因此 T(n) ≤ 2^h × 2 = 2^(h+1) = O(n)。
直觉解释:叶子节点占了近一半的节点数,但它们完全不需要移动。靠近底部的节点很多但移动距离短;靠近顶部的节点移动距离长但数量极少。这种"多数节点做少量工作"的模式使得总和收敛到 O(n)。
为什么不自顶向下建堆? 如果我们对每个元素调用 insert(即 sift_up),那么第 k 层有 2^k 个节点,每个上浮 k 步,总和 Σ k×2^k = O(n log n)。自顶向下的模式恰好相反——"少数节点做少量工作,多数节点做大量工作"——所以更慢。
12.10 堆排序
堆排序由 J.W.J. Williams 在 1964 年发明(与堆这个数据结构一同提出)。它是一种原地排序算法,最坏情况 O(n log n),不需要额外空间。
算法步骤:
- 将数组原地建成最大堆:O(n)
- 重复 n-1 次:将堆顶(最大值)和末尾交换,堆大小减 1,对新堆顶 sift_down
def heap_sort(arr):
"""堆排序:原地、不稳定、O(n log n)"""
n = len(arr)
# 第一步:建最大堆(自底向上)
for i in range(n // 2 - 1, -1, -1):
_sift_down_max(arr, i, n)
# 第二步:逐步取出最大值放到末尾
for i in range(n - 1, 0, -1):
arr[0], arr[i] = arr[i], arr[0] # 堆顶与末尾交换
_sift_down_max(arr, 0, i) # 对缩小的堆做 sift_down
def _sift_down_max(arr, i, size):
"""最大堆的 sift_down"""
while True:
largest = i
left = 2 * i + 1
right = 2 * i + 2
if left < size and arr[left] > arr[largest]:
largest = left
if right < size and arr[right] > arr[largest]:
largest = right
if largest == i:
break
arr[i], arr[largest] = arr[largest], arr[i]
i = largest
# 示例
data = [5, 3, 8, 1, 2, 7, 4, 6]
heap_sort(data)
print(data) # [1, 2, 3, 4, 5, 6, 7, 8]
堆排序的性能分析:
| 度量 | 值 | 说明 |
|---|---|---|
| 时间(最好) | O(n log n) | 即使数组已排序也需要 O(n log n) |
| 时间(平均) | O(n log n) | |
| 时间(最坏) | O(n log n) | 保证,不会退化 |
| 空间 | O(1) | 原地排序 |
| 稳定性 | 不稳定 | 交换会打乱相等元素的顺序 |
堆排序 vs 快速排序 vs 归并排序:
- 快速排序:平均最快(常数因子小、缓存友好),但最坏 O(n²)
- 归并排序:稳定、保证 O(n log n),但需要 O(n) 额外空间
- 堆排序:保证 O(n log n) + 原地,但缓存不友好(跳跃访问多)
实践中,堆排序很少用于通用排序(快速排序更快),但它在需要严格空间限制 + 最坏情况保证的场景下有用(如 Linux 内核中的 sort 函数使用 introsort,在递归过深时 fallback 到堆排序)。
12.11 合并 K 个有序链表(LeetCode #23)
问题:给定 K 个已排序的链表,将它们合并为一个有序链表。
暴力:每次从 K 个链表头中找最小的,需要比较 K 次,总共 N 个节点 → O(NK)。
堆优化:用最小堆维护 K 个链表的当前头节点。每次取堆顶(全局最小),然后把该链表的下一个节点入堆。
import heapq
class ListNode:
def __init__(self, val=0, next=None):
self.val = val
self.next = next
def mergeKLists(lists):
"""合并 K 个有序链表,时间 O(N log K),空间 O(K)"""
# 最小堆:(节点值, 链表索引, 节点)
# 链表索引用于打破相同 val 时的比较
heap = []
for i, head in enumerate(lists):
if head:
heapq.heappush(heap, (head.val, i, head))
dummy = ListNode(0)
curr = dummy
while heap:
val, i, node = heapq.heappop(heap)
curr.next = node
curr = curr.next
if node.next:
heapq.heappush(heap, (node.next.val, i, node.next))
return dummy.next
复杂度分析:
- 时间:O(N log K),其中 N 是所有节点总数,K 是链表个数。每个节点入堆出堆各一次,每次 O(log K)。
- 空间:O(K),堆中最多 K 个元素。
为什么用链表索引 i? Python 的 heapq 比较元组时会逐个字段比。如果两个节点的 val 相同,它会比较第二个字段。ListNode 没有定义 __lt__,直接比较会报错。加一个唯一的索引 i 就能打破平局。
12.12 数据流的中位数(LeetCode #295)
问题:设计一个数据结构,支持:
addNum(num):从数据流中添加一个整数findMedian():返回当前所有已添加数字的中位数
核心思路:用两个堆把数据分成两半:
- 最大堆
lo:存储较小的一半(堆顶是这一半的最大值) - 最小堆
hi:存储较大的一半(堆顶是这一半的最小值)
维护约束:len(lo) == len(hi) 或 len(lo) == len(hi) + 1
中位数 = lo 的堆顶(奇数个),或 (lo堆顶 + hi堆顶) / 2(偶数个)。
import heapq
class MedianFinder:
def __init__(self):
self.lo = [] # 最大堆(存负值):较小的一半
self.hi = [] # 最小堆:较大的一半
def addNum(self, num):
# 先加入最大堆
heapq.heappush(self.lo, -num)
# 保证 lo 的最大值 ≤ hi 的最小值
if self.hi and (-self.lo[0] > self.hi[0]):
val = -heapq.heappop(self.lo)
heapq.heappush(self.hi, val)
# 保证 len(lo) >= len(hi)(lo 最多比 hi 多一个)
if len(self.lo) < len(self.hi):
val = heapq.heappop(self.hi)
heapq.heappush(self.lo, -val)
# 保证 lo 不会比 hi 多超过一个
if len(self.lo) > len(self.hi) + 1:
val = -heapq.heappop(self.lo)
heapq.heappush(self.hi, val)
def findMedian(self):
if len(self.lo) > len(self.hi):
return -self.lo[0]
return (-self.lo[0] + self.hi[0]) / 2
# 使用示例
mf = MedianFinder()
mf.addNum(1) # [1] → 中位数 = 1
mf.addNum(2) # [1, 2] → 中位数 = 1.5
mf.addNum(3) # [1, 2, 3] → 中位数 = 2
print(mf.findMedian()) # 2
mf.addNum(4) # [1, 2, 3, 4] → 中位数 = 2.5
print(mf.findMedian()) # 2.5
复杂度:
addNum:O(log n)——最多做三次堆操作findMedian:O(1)——直接读堆顶
为什么两个堆这么巧妙? 中位数把数据分成大小相等的两半。最大堆的堆顶是"小的那一半中最大的",最小堆的堆顶是"大的那一半中最小的"——它们恰好是中位数的左右邻居。
12.13 优先队列在 Dijkstra 中的应用
Dijkstra 算法求单源最短路径时,每轮需要找"未确定节点中距离最小的"。这正是优先队列的适用场景。
import heapq
def dijkstra(graph, start):
"""
Dijkstra 最短路径算法(优先队列实现)
graph: 邻接表 {node: [(neighbor, weight), ...]}
返回: {node: shortest_distance}
"""
dist = {start: 0}
heap = [(0, start)] # (距离, 节点)
visited = set()
while heap:
d, u = heapq.heappop(heap)
if u in visited:
continue # 已确定最短距离,跳过
visited.add(u)
for v, w in graph.get(u, []):
new_dist = d + w
if v not in dist or new_dist < dist[v]:
dist[v] = new_dist
heapq.heappush(heap, (new_dist, v))
return dist
# 示例
graph = {
'A': [('B', 1), ('C', 4)],
'B': [('C', 2), ('D', 5)],
'C': [('D', 1)],
'D': []
}
print(dijkstra(graph, 'A')) # {'A': 0, 'B': 1, 'C': 3, 'D': 4}
注意:这个实现使用了"lazy deletion"——同一个节点可能在堆中出现多次(因为我们没有 decrease-key 操作)。通过 visited 集合跳过已处理的节点来保证正确性。
复杂度:使用二叉堆时,Dijkstra 的复杂度是 O((V + E) log V)。如果使用斐波那契堆(支持 O(1) 的 decrease-key),可以优化到 O(V log V + E)——但在实践中二叉堆通常更快,因为常数因子更小。
关于图算法的完整讨论,请参考第 18 章。
Level 3 · 规范怎么定义的
12.14 二叉堆的历史
二叉堆由 J.W.J. Williams 在 1964 年发明,发表在论文 "Algorithm 232: Heapsort"(Communications of the ACM, Vol. 7, No. 6, June 1964)。Williams 同时提出了堆这个数据结构和堆排序算法——这在计算机科学史上很罕见,一个数据结构和它的主要应用在同一篇论文中诞生。
同年,Robert W. Floyd 在 "Algorithm 245: Treesort 3"(Communications of the ACM, Vol. 7, No. 12, December 1964)中改进了 Williams 的方法,提出了自底向上建堆的 O(n) 算法。Floyd 的贡献使得堆排序的总时间复杂度保持在 O(n log n) 的同时,建堆阶段从 O(n log n) 降低到 O(n)。
这两篇论文奠定了堆在算法领域的基础地位。此后六十年,二叉堆一直是实现优先队列的标准方法——简单、高效、空间紧凑。
12.15 斐波那契堆
1987 年,Michael L. Fredman 和 Robert E. Tarjan 在论文 "Fibonacci heaps and their uses in improved network optimization algorithms"(Journal of the ACM, Vol. 34, No. 3, July 1987)中提出了斐波那契堆。
动机:Dijkstra 算法和 Prim 算法中,频繁执行的操作是 decrease-key(降低某个节点的优先级)。在二叉堆中,decrease-key 需要 O(log n)(先找到元素,再 sift_up)。如果能把 decrease-key 降到 O(1) 摊还,整个算法就更快。
斐波那契堆的操作复杂度:
| 操作 | 二叉堆 | 斐波那契堆(摊还) |
|---|---|---|
| insert | O(log n) | O(1) |
| find-min | O(1) | O(1) |
| extract-min | O(log n) | O(log n) |
| decrease-key | O(log n) | O(1) |
| merge | O(n) | O(1) |
核心思想:斐波那契堆是一组"无序的"堆有序树的集合。insert 和 decrease-key 都是"懒惰"的——只做最小量的结构调整,把真正的重组推迟到 extract-min 时进行。这种"延迟合并"策略使得 extract-min 虽然可能很慢(O(n) 最坏),但摊还下来仍然是 O(log n)。
为什么叫"斐波那契"? 因为通过 cascading cut 操作维护的结构约束保证了度为 d 的子树至少包含 F(d+2) 个节点(其中 F 是斐波那契数列)。这确保了最大度数为 O(log n),从而保证 extract-min 的 O(log n) 摊还复杂度。
实践中的地位:尽管理论上更优,斐波那契堆在实践中很少使用,原因包括:
- 常数因子大(每个节点需要维护 parent、child、left、right 指针以及 degree 和 mark 标记)
- 缓存性能差(指针结构导致内存跳跃访问)
- 实现复杂,容易出错
- 对于实际规模的图(几百万节点以内),二叉堆 + lazy deletion 通常更快
但斐波那契堆在理论算法分析中至关重要——它给出了许多图算法的最优复杂度上界。
12.16 d-ary 堆
标准二叉堆每个节点有 2 个子节点。自然的推广是 d-ary 堆:每个节点有 d 个子节点。
数组表示:
parent(i) = (i - 1) // dchild_k(i) = d * i + k + 1(k = 0, 1, ..., d-1)
复杂度变化:
- sift_up:树高变为 log_d(n),所以 O(log_d(n)) = O(log n / log d)——更快
- sift_down:每层需要比较 d 个子节点找最小,所以 O(d × log_d(n))——可能更慢
最优选择:
- 如果 decrease-key(sift_up)操作远多于 extract-min(sift_down),增大 d 有利
- Dijkstra 算法中,decrease-key 最多 E 次,extract-min 最多 V 次。当 E >> V 时,使用 d = E/V 的 d-ary 堆可以得到 O(E log_{E/V} V) 的复杂度
实践中:d = 4(四叉堆)是一个常见的优化选择。四叉堆比二叉堆更扁,减少了 sift_up 的层数;同时每层比较 4 个子节点的开销在现代 CPU 上(SIMD 指令)可以被很好地优化。LevelDB 和某些 Java 实现使用了四叉堆。
12.17 堆在操作系统中的应用
定时器管理
操作系统需要管理大量定时器(网络超时、进程时间片、周期性任务等)。核心操作是"找到最近要触发的定时器"——这正是最小堆的应用场景。
Linux 内核早期使用链表管理定时器(O(1) 插入但 O(n) 查找最近到期的),后来演化为分级时间轮(timing wheel)和红黑树方案。但在用户态(如 libuv、Go runtime),堆仍然是定时器管理的主要数据结构。
Go 运行时的定时器堆:Go 1.14 之前使用全局四叉堆管理所有 goroutine 的定时器。1.14 之后改为每个 P(处理器)维护一个本地的四叉堆,减少锁争用。
进程调度
优先级调度算法中,就绪进程按优先级排列,每次选最高优先级的进程运行。这可以用最大堆实现。但现代操作系统(如 Linux CFS 调度器)使用红黑树而非堆,因为它需要高效地找到"虚拟运行时间最小的进程"并且支持任意删除。
内存分配
某些内存分配器使用堆来管理空闲块,按块大小排序,快速找到满足请求的最小(best-fit)或最大(worst-fit)空闲块。
Level 4 · 边界与陷阱
12.18 heapq 的坑
坑 1:没有 decrease-key
Python heapq 没有直接的 decrease-key 操作。如果你需要修改堆中某个元素的优先级,有两种策略:
策略一:Lazy deletion(推荐)
import heapq
class LazyPriorityQueue:
"""支持优先级更新的优先队列(延迟删除方式)"""
def __init__(self):
self._heap = []
self._entry_finder = {} # key -> [priority, key, valid]
self._counter = 0
def push(self, key, priority):
"""插入或更新 key 的优先级"""
if key in self._entry_finder:
# 标记旧条目为无效
self._entry_finder[key][-1] = False
entry = [priority, self._counter, key, True]
self._counter += 1
self._entry_finder[key] = entry
heapq.heappush(self._heap, entry)
def pop(self):
"""弹出优先级最高(最小)的有效元素"""
while self._heap:
priority, count, key, valid = heapq.heappop(self._heap)
if valid:
del self._entry_finder[key]
return key, priority
raise KeyError("priority queue is empty")
def __contains__(self, key):
return key in self._entry_finder and self._entry_finder[key][-1]
# 使用示例
pq = LazyPriorityQueue()
pq.push('task_a', 5)
pq.push('task_b', 3)
pq.push('task_a', 1) # 更新 task_a 的优先级为 1
print(pq.pop()) # ('task_a', 1) — 正确返回更新后的优先级
print(pq.pop()) # ('task_b', 3)
代价:堆中可能积累大量无效条目,导致空间和时间开销增加。如果 decrease-key 非常频繁,考虑用 sortedcontainers.SortedList 替代。
坑 2:没有 remove
heapq 没有高效的按值删除操作。list.remove(val) 是 O(n) 的,而且删除后需要 re-heapify。同样的解决方案是 lazy deletion。
坑 3:元组比较规则
import heapq
# 正确:元组第一个元素是可比较的优先级
heap = []
heapq.heappush(heap, (1, "task_a"))
heapq.heappush(heap, (2, "task_b")) # ✓
# 危险:如果优先级相同,Python 会比较第二个元素
heapq.heappush(heap, (1, "task_c")) # 可能 OK(字符串可比较)
# 错误:第二个元素不可比较时报错
class Task:
def __init__(self, name):
self.name = name
# heapq.heappush(heap, (1, Task("x")))
# heapq.heappush(heap, (1, Task("y")))
# TypeError: '<' not supported between instances of 'Task' and 'Task'
# 解决方案:加一个唯一计数器作为 tiebreaker
counter = 0
def push_task(heap, priority, task):
global counter
counter += 1
heapq.heappush(heap, (priority, counter, task))
坑 4:heapq 是最小堆,但面试常要最大堆
# 错误:忘记取反
import heapq
heap = [1, 2, 3, 4, 5]
heapq.heapify(heap)
print(heapq.heappop(heap)) # 1,不是 5!
# 正确:取反实现最大堆
max_heap = [-x for x in [1, 2, 3, 4, 5]]
heapq.heapify(max_heap)
print(-heapq.heappop(max_heap)) # 5
12.19 面试高频题详解
题目 1:数据流的中位数(LeetCode #295)
已在 12.12 节详细讲解。核心考点:两个堆维护动态数据的中位数。
面试追问:
- Q:如果数据流有大量重复值怎么办? A:无影响,堆天然支持重复。
- Q:如果要求支持删除某个值呢? A:用 lazy deletion + 维护两个堆的有效大小。
- Q:如果数据范围有限(如 0-100)? A:用计数排序/桶,O(1) 找中位数。
题目 2:前 K 个高频元素(LeetCode #347)
import heapq
from collections import Counter
def topKFrequent(nums, k):
"""返回出现频率前 K 高的元素"""
count = Counter(nums)
# 方法一:nlargest
return heapq.nlargest(k, count.keys(), key=count.get)
# 方法二:大小为 K 的最小堆
# return [item for item, freq in heapq.nlargest(k, count.items(), key=lambda x: x[1])]
# 示例
print(topKFrequent([1,1,1,2,2,3], 2)) # [1, 2]
方法三(最优):桶排序 O(n)
def topKFrequent_bucket(nums, k):
"""桶排序方法,O(n)"""
count = Counter(nums)
# 桶:索引是频率,值是具有该频率的元素列表
buckets = [[] for _ in range(len(nums) + 1)]
for num, freq in count.items():
buckets[freq].append(num)
result = []
for freq in range(len(buckets) - 1, -1, -1):
for num in buckets[freq]:
result.append(num)
if len(result) == k:
return result
return result
题目 3:丑数 II(LeetCode #264)
问题:找第 n 个丑数(只含因子 2, 3, 5 的正整数:1, 2, 3, 4, 5, 6, 8, 9, 10, 12, ...)
import heapq
def nthUglyNumber(n):
"""最小堆方法"""
heap = [1]
seen = {1}
for _ in range(n):
ugly = heapq.heappop(heap)
for factor in [2, 3, 5]:
new_ugly = ugly * factor
if new_ugly not in seen:
seen.add(new_ugly)
heapq.heappush(heap, new_ugly)
return ugly
# 更优的三指针方法(O(n) 时间,O(n) 空间)
def nthUglyNumber_dp(n):
"""动态规划 + 三指针"""
dp = [0] * n
dp[0] = 1
p2 = p3 = p5 = 0 # 三个指针
for i in range(1, n):
next2, next3, next5 = dp[p2] * 2, dp[p3] * 3, dp[p5] * 5
dp[i] = min(next2, next3, next5)
if dp[i] == next2: p2 += 1
if dp[i] == next3: p3 += 1
if dp[i] == next5: p5 += 1
return dp[n - 1]
print(nthUglyNumber(10)) # 12
print(nthUglyNumber_dp(10)) # 12
堆方法的复杂度:O(n log n)——每个丑数入堆出堆各一次。三指针方法更优:O(n) 时间。
题目 4:会议室 II(LeetCode #253)
问题:给定一组会议的开始和结束时间,求最少需要多少间会议室。
import heapq
def minMeetingRooms(intervals):
"""
贪心 + 最小堆
堆中存储当前正在使用的会议室的结束时间
"""
if not intervals:
return 0
# 按开始时间排序
intervals.sort(key=lambda x: x[0])
# 最小堆:存储各会议室的结束时间
rooms = []
heapq.heappush(rooms, intervals[0][1])
for i in range(1, len(intervals)):
start, end = intervals[i]
# 如果最早结束的会议室已经空了,复用它
if rooms[0] <= start:
heapq.heappop(rooms)
# 分配会议室(新的或复用的)
heapq.heappush(rooms, end)
return len(rooms)
# 示例
intervals = [[0, 30], [5, 10], [15, 20]]
print(minMeetingRooms(intervals)) # 2
intervals2 = [[7, 10], [2, 4]]
print(minMeetingRooms(intervals2)) # 1
思路:按开始时间排序后,对每个会议检查"最早结束的会议室"是否可以复用。最小堆让我们 O(log n) 地找到最早结束的房间。最终堆的大小就是最少需要的会议室数。
12.20 堆 vs 排序 vs 快速选择:Top-K 问题三解法对比
| 方法 | 时间复杂度 | 空间复杂度 | 是否需要所有数据 | 输出有序? |
|---|---|---|---|---|
| 完整排序 | O(n log n) | O(1)~O(n) | 是 | 是 |
| 最小堆(大小 K) | O(n log K) | O(K) | 否(流式) | 否(需额外排序) |
| 快速选择 | O(n) 平均 | O(1) | 是 | 否 |
如何选择?
- K 接近 n(如 K > n/2):直接排序最简单
- K 很小且数据是流式的(不能一次性放入内存):堆是唯一选择
- K 很小且数据可随机访问:快速选择(Quickselect)平均最快
- 需要输出有序结果且 K 较小:堆 + 排序堆结果
- 需要最坏情况保证:堆(快速选择最坏 O(n²),除非用中位数的中位数)
import heapq
import random
def quickselect(nums, k):
"""快速选择:找第 K 大的元素,平均 O(n)"""
target = len(nums) - k # 转换为第 target 小
def select(left, right):
pivot_idx = random.randint(left, right)
nums[pivot_idx], nums[right] = nums[right], nums[pivot_idx]
pivot = nums[right]
store = left
for i in range(left, right):
if nums[i] < pivot:
nums[i], nums[store] = nums[store], nums[i]
store += 1
nums[store], nums[right] = nums[right], nums[store]
if store == target:
return nums[store]
elif store < target:
return select(store + 1, right)
else:
return select(left, store - 1)
return select(0, len(nums) - 1)
# 三种方法对比
nums = [3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5]
k = 3
# 方法 1:排序
sorted_result = sorted(nums, reverse=True)[:k]
print(f"排序法: {sorted_result}") # [9, 6, 5]
# 方法 2:堆
heap_result = heapq.nlargest(k, nums)
print(f"堆方法: {heap_result}") # [9, 6, 5]
# 方法 3:快速选择(只找第 K 大,不排序)
kth = quickselect(nums[:], k) # 传副本,因为会修改原数组
print(f"快速选择第{k}大: {kth}") # 5
12.21 什么时候用堆,什么时候用有序容器
用堆(heapq)的场景:
- 只需要反复取最小/最大值
- 数据是流式到来的
- 不需要按任意 key 删除
- 内存敏感(堆比 BST 紧凑得多)
用有序容器(如 sortedcontainers.SortedList)的场景:
- 需要按任意位置删除
- 需要范围查询("值在 [a, b] 之间的有多少个")
- 需要按 rank 访问("第 K 小的是什么")
- 需要 predecessor/successor 查询
# heapq 无法高效做的事:
# 1. 删除特定元素(只能 lazy deletion)
# 2. 找到"第 3 小的元素"(只能取 3 次堆顶)
# 3. 修改已有元素的优先级(decrease-key)
# sortedcontainers 可以做:
from sortedcontainers import SortedList
sl = SortedList([5, 3, 8, 1, 2])
sl.remove(3) # O(log n) 按值删除
print(sl[2]) # O(log n) 按索引访问 → 5
sl.add(4) # O(log n) 插入
print(sl.bisect_left(4)) # O(log n) 二分查找位置
经验法则:如果你只需要"快速取极值",用 heapq。如果你需要更丰富的查询能力,用 SortedList。如果你在写 LeetCode 且需要有序容器但不想导入第三方库,可以用手写 BST 或者用两个堆模拟。
12.22 实战建议
-
Python 面试中,heapq 是你的首选工具。90% 的堆题用 heapq 就能解决,不需要手写堆。
-
识别"需要用堆"的信号词:
- "前 K 个..."
- "第 K 大/小..."
- "动态数据中的中位数/极值..."
- "合并多个有序..."
- "最短路径..."(Dijkstra)
- "贪心选择最优..."
-
heapq 的时间复杂度备忘:
- push/pop: O(log n)
- heapify: O(n)
- nlargest/nsmallest: O(n log k) 或 O(n + k log n)
-
元组技巧:将 (priority, counter, data) 作为堆元素可以解决所有比较问题。
-
不要过度使用堆:如果数据总量小(< 1000),直接排序通常更简单且足够快。堆的优势在数据量大且 K 远小于 n 时才明显。
本章小结
堆是一种简洁而强大的数据结构。它的核心洞察是:如果你只关心极值,就不需要维护完整的排序。用完全二叉树的结构约束加上弱化的有序性(只保证父子关系),堆在数组上实现了 O(1) 取极值和 O(log n) 修改。
从工程角度看,堆的数组实现是缓存最友好的树结构——没有指针、没有碎片、内存连续。这是它在实践中常常跑赢理论更优但指针结构复杂的方案(如斐波那契堆)的根本原因。
下一章我们将学习哈希表——另一种用"弱约束"换"高性能"的设计哲学。哈希表放弃了所有顺序信息,只追求 O(1) 的精确查找。