队列与双端队列
第七章:队列与双端队列
你在食堂排队打饭:先来的人先打到饭,后来的人排在队尾。你在银行取号等候:号码小的先被叫到窗口。你的电脑同时打开了十个程序,操作系统把它们排成一个队列,轮流分配 CPU 时间。
这些场景的共同数学结构是队列(Queue)。最先进入的元素,最先被取出——First In, First Out,简称 FIFO。
队列和栈(第 5 章)是一对"镜像"数据结构:栈是 LIFO,队列是 FIFO。栈解决的是"最近的事情最先处理"的问题(如递归、撤销),队列解决的是"先来的事情先处理"的问题(如调度、广度优先搜索)。两者加在一起,覆盖了绝大多数"按时间顺序处理事件"的场景。
本章我们将从最简单的队列出发,逐步深入循环队列、双端队列,最后触及优先队列和消息队列。你会发现,队列是连接数据结构与操作系统、网络协议、分布式系统的桥梁。
Level 1 · 你需要知道的
1.1 队列的基本概念
队列是一种受限的线性数据结构。它只允许在一端(队尾,rear/back)插入,在另一端(队头,front)删除。
队列支持的核心操作:
| 操作 | 含义 | 时间复杂度 |
|---|---|---|
enqueue(x) |
将元素 x 加入队尾 | O(1) |
dequeue() |
移除并返回队头元素 | O(1) |
peek() / front() |
查看队头元素但不移除 | O(1) |
附加操作:
is_empty()— 判断队列是否为空size()— 返回队列中元素个数
FIFO 性质的形式化描述:
设队列 Q 上依次执行 enqueue(a₁), enqueue(a₂), ..., enqueue(aₙ),则执行 n 次 dequeue() 得到的序列是 a₁, a₂, ..., aₙ。即:输出顺序与输入顺序完全一致。
为什么要这个约束? 这个问题和栈一样:约束带来语义清晰性。当你看到一个队列,你立刻知道"先来的先走"。这种公平性假设在无数场景中自然成立——排队、调度、消息传递。队列的设计哲学就是:用结构保证公平性,而不是靠程序员手动维护顺序。
1.2 用 Python 的 collections.deque 实现队列
很多初学者用 Python 的 list 来实现队列:
# ❌ 错误做法:用 list 实现队列
queue = []
queue.append("A") # enqueue
queue.append("B")
queue.pop(0) # dequeue — 这是 O(n)!
这是一个常见的性能陷阱。 list.pop(0) 需要把后面所有元素往前移一位,时间复杂度是 O(n)。如果你的队列有一百万个元素,每次 dequeue 都要移动一百万个元素——这显然不可接受。
正确的做法是使用 collections.deque:
from collections import deque
class Queue:
"""基于 collections.deque 的队列实现"""
def __init__(self):
self._data = deque()
def enqueue(self, item):
"""将元素加入队尾 — O(1)"""
self._data.append(item)
def dequeue(self):
"""移除并返回队头元素 — O(1)
Raises:
IndexError: 如果队列为空
"""
if self.is_empty():
raise IndexError("dequeue from empty queue")
return self._data.popleft()
def peek(self):
"""查看队头元素但不移除 — O(1)
Raises:
IndexError: 如果队列为空
"""
if self.is_empty():
raise IndexError("peek from empty queue")
return self._data[0]
def is_empty(self):
"""判断队列是否为空 — O(1)"""
return len(self._data) == 0
def size(self):
"""返回队列中元素个数 — O(1)"""
return len(self._data)
def __repr__(self):
return f"Queue({list(self._data)})"
# 使用示例
q = Queue()
q.enqueue("Alice")
q.enqueue("Bob")
q.enqueue("Charlie")
print(q.dequeue()) # Alice — 先进先出
print(q.dequeue()) # Bob
print(q.peek()) # Charlie — 只看不取
print(q.size()) # 1
为什么 deque 的 popleft() 是 O(1)? 因为 deque 内部不是一个连续数组,而是一个双向链表连接的固定长度块(block)结构。两端的操作都不需要移动其他元素。我们在 Level 2 中会详细分析它的内部实现。
1.3 循环队列的实现
在很多底层系统中(操作系统内核、嵌入式设备、网络缓冲区),我们不能用动态分配内存的 deque,而是需要一个固定大小的数组来实现队列。这就是循环队列(Circular Queue)。
循环队列的核心思想:用一个数组和两个指针(front、rear)来维护队列。当指针到达数组末尾时,通过取模运算让它"绕回"到数组开头,形成逻辑上的环形结构。
class CircularQueue:
"""基于数组的循环队列实现"""
def __init__(self, capacity: int):
"""
Args:
capacity: 队列最大容量
"""
self._data = [None] * (capacity + 1) # 多分配一个空间用于区分满和空
self._front = 0
self._rear = 0
self._capacity = capacity + 1
def enqueue(self, item) -> bool:
"""将元素加入队尾 — O(1)
Returns:
True 如果入队成功,False 如果队列已满
"""
if self.is_full():
return False
self._data[self._rear] = item
self._rear = (self._rear + 1) % self._capacity
return True
def dequeue(self):
"""移除并返回队头元素 — O(1)
Raises:
IndexError: 如果队列为空
"""
if self.is_empty():
raise IndexError("dequeue from empty circular queue")
item = self._data[self._front]
self._data[self._front] = None # 帮助 GC
self._front = (self._front + 1) % self._capacity
return item
def peek(self):
"""查看队头元素 — O(1)"""
if self.is_empty():
raise IndexError("peek from empty circular queue")
return self._data[self._front]
def is_empty(self) -> bool:
"""队列为空的条件:front == rear"""
return self._front == self._rear
def is_full(self) -> bool:
"""队列为满的条件:(rear + 1) % capacity == front
注意:我们牺牲一个位置来区分满和空。
如果不这样做,满和空的状态都是 front == rear,无法区分。
"""
return (self._rear + 1) % self._capacity == self._front
def size(self) -> int:
"""当前队列中的元素个数"""
return (self._rear - self._front + self._capacity) % self._capacity
def __repr__(self):
if self.is_empty():
return "CircularQueue([])"
items = []
i = self._front
while i != self._rear:
items.append(self._data[i])
i = (i + 1) % self._capacity
return f"CircularQueue({items})"
# 使用示例
cq = CircularQueue(5)
for i in range(5):
cq.enqueue(i)
print(cq) # CircularQueue([0, 1, 2, 3, 4])
print(cq.is_full()) # True
print(cq.dequeue()) # 0
print(cq.dequeue()) # 1
cq.enqueue(5) # 利用了之前释放的空间
cq.enqueue(6)
print(cq) # CircularQueue([2, 3, 4, 5, 6])
循环队列的关键设计决策:
-
为什么多分配一个空间? 如果
capacity = 5,我们实际分配 6 个位置。这样"满"的条件是(rear + 1) % capacity == front,而"空"的条件是front == rear,两者永远不会混淆。另一种做法是维护一个size计数器,但这会增加每次操作的开销(虽然只是一个加减法,在高频场景下也是有意义的)。 -
取模运算的含义:
(index + 1) % capacity实现的是"到了末尾就回到开头"。这就是"循环"的本质——指针在数组上走一个圈。 -
与 Python 的 deque 的区别:循环队列是固定大小的,满了就拒绝入队(或者抛出异常)。
deque是动态大小的,会自动扩容。循环队列适合"缓冲区"场景——你不希望无限增长,而是希望控制内存使用。
1.4 BFS 基础:层序遍历与最短路径
队列最经典的算法应用是广度优先搜索(Breadth-First Search,BFS)。BFS 是"一层一层向外扩展"的搜索策略,它天然需要一个队列来维护"下一层要访问的节点"。
BFS 的基本模板:
from collections import deque
def bfs(graph, start):
"""BFS 遍历图
Args:
graph: 邻接表表示的图,graph[node] 是 node 的所有邻居
start: 起始节点
Returns:
visited: 从 start 能到达的所有节点
"""
visited = set([start])
queue = deque([start])
while queue:
node = queue.popleft() # 取出队头
# 处理当前节点
print(node, end=" ")
for neighbor in graph[node]:
if neighbor not in visited:
visited.add(neighbor)
queue.append(neighbor) # 加入队尾
return visited
BFS 求最短路径(无权图):
在无权图中(所有边的权重都是 1),BFS 天然能找到从起点到任何可达节点的最短路径。原因很直觉:BFS 先访问距离为 1 的节点,再访问距离为 2 的节点……第一次到达某个节点时,走的一定是最短路径。
def shortest_path_bfs(graph, start, target):
"""在无权图中找从 start 到 target 的最短路径
Returns:
路径列表,如果不可达返回 None
"""
if start == target:
return [start]
visited = {start}
queue = deque([(start, [start])]) # (当前节点, 从start到当前节点的路径)
while queue:
node, path = queue.popleft()
for neighbor in graph[node]:
if neighbor not in visited:
new_path = path + [neighbor]
if neighbor == target:
return new_path
visited.add(neighbor)
queue.append((neighbor, new_path))
return None # 不可达
# 示例
graph = {
'A': ['B', 'C'],
'B': ['A', 'D', 'E'],
'C': ['A', 'F'],
'D': ['B'],
'E': ['B', 'F'],
'F': ['C', 'E']
}
path = shortest_path_bfs(graph, 'A', 'F')
print(path) # ['A', 'C', 'F'] — 最短路径,长度为 2
**层序遍历(Level Order Traversal)**是 BFS 在树上的应用:
def level_order_traversal(root):
"""二叉树的层序遍历
Returns:
result: 二维列表,result[i] 是第 i 层的所有节点值
"""
if not root:
return []
result = []
queue = deque([root])
while queue:
level_size = len(queue) # 当前层的节点数
level_nodes = []
for _ in range(level_size):
node = queue.popleft()
level_nodes.append(node.val)
if node.left:
queue.append(node.left)
if node.right:
queue.append(node.right)
result.append(level_nodes)
return result
关于 BFS 的更多应用——图遍历、最短路径变体、拓扑排序——我们将在第 10 章(图的基础)和第 16 章(图算法进阶)中详细展开。
1.5 优先队列简介
普通队列的规则是"先来先服务"。但在很多实际场景中,我们需要的是"优先级最高的先服务"——这就是优先队列(Priority Queue)。
优先队列的操作接口和普通队列类似,但 dequeue 操作不是取出最早进入的元素,而是取出优先级最高的元素。
import heapq
class PriorityQueue:
"""基于 heapq 的最小优先队列"""
def __init__(self):
self._heap = []
self._counter = 0 # 用于打破优先级相同时的平局
def enqueue(self, item, priority):
"""将元素加入优先队列
Args:
item: 元素
priority: 优先级(数字越小优先级越高)
"""
heapq.heappush(self._heap, (priority, self._counter, item))
self._counter += 1
def dequeue(self):
"""取出优先级最高(数字最小)的元素"""
if self.is_empty():
raise IndexError("dequeue from empty priority queue")
priority, _, item = heapq.heappop(self._heap)
return item
def peek(self):
"""查看优先级最高的元素"""
if self.is_empty():
raise IndexError("peek from empty priority queue")
return self._heap[0][2]
def is_empty(self):
return len(self._heap) == 0
def size(self):
return len(self._heap)
# 使用示例:医院急诊分诊
er = PriorityQueue()
er.enqueue("感冒患者", priority=5)
er.enqueue("心脏病发作", priority=1)
er.enqueue("骨折", priority=3)
print(er.dequeue()) # 心脏病发作 — 优先级最高
print(er.dequeue()) # 骨折
print(er.dequeue()) # 感冒患者
优先队列的高效实现依赖于**堆(Heap)**数据结构。我们在第 12 章会详细讲解堆的原理和实现。这里你只需要知道:Python 的 heapq 模块给你提供了一个 O(log n) 入队、O(log n) 出队的优先队列。
1.6 常见错误:用 list 做队列
让我们量化一下"用 list 做队列"到底有多慢:
import time
from collections import deque
def benchmark_list_queue(n):
"""用 list 做队列的性能"""
q = []
start = time.perf_counter()
for i in range(n):
q.append(i)
for i in range(n):
q.pop(0) # O(n) 操作!
return time.perf_counter() - start
def benchmark_deque_queue(n):
"""用 deque 做队列的性能"""
q = deque()
start = time.perf_counter()
for i in range(n):
q.append(i)
for i in range(n):
q.popleft() # O(1) 操作
return time.perf_counter() - start
# 测试不同规模
for n in [10_000, 100_000, 1_000_000]:
t_list = benchmark_list_queue(n)
t_deque = benchmark_deque_queue(n)
print(f"n={n:>10,}: list={t_list:.4f}s, deque={t_deque:.4f}s, "
f"ratio={t_list/t_deque:.1f}x")
典型输出(具体数字因机器而异):
n= 10,000: list=0.0150s, deque=0.0008s, ratio=18.8x
n= 100,000: list=1.4500s, deque=0.0070s, ratio=207.1x
n= 1,000,000: list=148.30s, deque=0.0680s, ratio=2181.0x
差距是二次级别的:list 版本的总时间是 O(n²)(n 次 O(n) 的 pop(0)),而 deque 版本是 O(n)(n 次 O(1) 的 popleft())。当 n 翻 10 倍,list 版本慢 100 倍,deque 版本只慢 10 倍。
记住这条规则:在 Python 中,永远不要用 list.pop(0) 做队列的出队操作。用 collections.deque。
Level 2 · 它是怎么运行的
2.1 collections.deque 的内部实现
CPython 中 collections.deque 的源码位于 Modules/_collectionsmodule.c。它的内部结构不是一个简单的数组,也不是一个简单的链表,而是一种混合结构:双向链表连接的固定长度块(block)。
每个 block 是一个包含 64 个 PyObject* 指针的数组(CPython 3.x 中 BLOCKLEN = 64)。多个 block 通过双向链表的方式串联起来。deque 对象本身维护:
leftblock— 指向最左边的 blockrightblock— 指向最右边的 blockleftindex— 左端下一个可用位置在 leftblock 中的索引rightindex— 右端最后一个元素在 rightblock 中的索引
deque 的内部布局:
leftblock ... rightblock
┌──────────┐ ┌──────────┐ ┌──────────┐
│ [64 slots] │ ←→ │ [64 slots] │ ←→ │ [64 slots] │
└──────────┘ └──────────┘ └──────────┘
↑ leftindex rightindex ↑
appendleft() / popleft() 的过程:
popleft():取出leftblock[leftindex]处的元素,leftindex += 1。如果leftindex超过了 BLOCKLEN(当前 block 用完了),则释放这个 block,leftblock指向下一个 block,leftindex = 0。appendleft(x):leftindex -= 1,将 x 放在leftblock[leftindex]。如果leftindex < 0(当前 block 左边已满),则分配一个新 block,链接到 leftblock 的左边。
为什么这种设计比单纯链表更好?
- 缓存友好:每个 block 内的 64 个指针在内存中连续,访问它们时 CPU 缓存命中率高。纯链表的每个节点可能散落在内存各处。
- 内存开销低:纯链表每个元素需要一个
prev和next指针(16 字节),加上 Python 对象头(56 字节)。deque 的 block 结构中,每个元素只需要一个 8 字节的指针,外加 block 本身的 prev/next 开销被 64 个元素分摊。 - 两端操作 O(1):append/appendleft/pop/popleft 都是真正的 O(1),没有均摊。
deque 的缺点:随机访问是 O(n)。deque[i] 需要从某一端开始,逐 block 跳转,找到第 i 个元素所在的 block,然后在 block 内定位。不像数组那样 O(1) 直接定位。
Raymond Hettinger(deque 的主要实现者,Python 核心开发者)在 CPython issue tracker 和 PyCon 演讲中多次解释了这个设计决策:"deque 的定位是双端操作的高效容器,不是通用的随机访问序列。如果你需要随机访问,用 list。"
2.2 循环队列 vs 链表队列的性能对比
队列有两种经典的底层实现:数组(循环队列)和链表。它们各有优劣:
| 维度 | 循环队列(数组) | 链表队列 |
|---|---|---|
| enqueue | O(1),直接写入 | O(1),新建节点 |
| dequeue | O(1),移动指针 | O(1),删除头节点 |
| 内存布局 | 连续,缓存友好 | 散列,缓存不友好 |
| 内存开销 | 固定大小,可能浪费 | 按需分配,无浪费 |
| 最大容量 | 固定(需预先指定) | 理论上无限 |
| 分配/释放 | 无(复用数组空间) | 每次操作都涉及分配/释放 |
在什么场景下选择哪种?
- 循环队列:当你知道队列的最大长度、追求极致性能、或者在不允许动态内存分配的环境中(嵌入式、内核)。Linux 内核中的
kfifo就是循环队列。 - 链表队列:当队列长度不可预测、或者元素大小差异很大时。Java 的
LinkedList实现的Queue接口就是链表队列。 - 块状双端队列(deque 的设计):兼顾两者优点——既有较好的缓存性能(块内连续),又可以动态增长。Python 的
deque、C++ 的std::deque都采用这种设计。
性能实测:
import time
from collections import deque
class LinkedNode:
__slots__ = ('val', 'next')
def __init__(self, val):
self.val = val
self.next = None
class LinkedQueue:
"""链表实现的队列"""
def __init__(self):
self._head = None
self._tail = None
self._size = 0
def enqueue(self, item):
node = LinkedNode(item)
if self._tail:
self._tail.next = node
else:
self._head = node
self._tail = node
self._size += 1
def dequeue(self):
if not self._head:
raise IndexError("empty")
val = self._head.val
self._head = self._head.next
if not self._head:
self._tail = None
self._size -= 1
return val
def benchmark(queue_class, n, *args):
q = queue_class(*args) if args else queue_class()
start = time.perf_counter()
for i in range(n):
if hasattr(q, 'enqueue'):
q.enqueue(i)
else:
q.append(i)
for i in range(n):
if hasattr(q, 'dequeue'):
q.dequeue()
else:
q.popleft()
return time.perf_counter() - start
n = 1_000_000
print(f"deque: {benchmark(deque, n):.4f}s")
print(f"LinkedQueue: {benchmark(LinkedQueue, n):.4f}s")
print(f"CircularQueue:{benchmark(CircularQueue, n, n):.4f}s")
典型结果:
deque: 0.068s
LinkedQueue: 0.350s
CircularQueue:0.120s
deque 最快,因为它是 C 实现的,而且块状结构缓存友好。CircularQueue 也不慢(纯 Python 限制了速度)。LinkedQueue 最慢,因为每次 enqueue 都要创建一个新的 Python 对象(节点),每次 dequeue 都要释放一个对象——Python 的对象创建开销很大。
2.3 双端队列的应用
**双端队列(Double-Ended Queue,Deque)**允许在两端同时进行插入和删除。Python 的 collections.deque 本身就是一个完整的双端队列。
应用一:回文检查
回文是一个从前往后读和从后往前读都一样的字符串。用双端队列可以优雅地实现回文检查:从两端同时取字符进行比较。
from collections import deque
def is_palindrome(s: str) -> bool:
"""用双端队列检查回文
思路:将字符放入 deque,然后同时从两端取出比较。
如果每对字符都相等,则是回文。
"""
# 预处理:只保留字母和数字,转小写
chars = deque(c.lower() for c in s if c.isalnum())
while len(chars) > 1:
front = chars.popleft()
back = chars.pop()
if front != back:
return False
return True
print(is_palindrome("A man, a plan, a canal: Panama")) # True
print(is_palindrome("race a car")) # False
这个解法的时间复杂度是 O(n),空间复杂度是 O(n)。虽然双指针法可以做到 O(1) 空间,但 deque 的方法在概念上更直观地展示了"双端操作"的思想。
应用二:滑动窗口最大值
这是一个经典的面试题(LeetCode #239):给定一个数组和一个窗口大小 k,返回每个窗口中的最大值。
暴力做法是 O(nk),但利用双端队列可以做到 O(n):
from collections import deque
def max_sliding_window(nums: list, k: int) -> list:
"""滑动窗口最大值 — O(n)
维护一个递减的双端队列:
- 队列存储的是索引
- 队列中的索引对应的值是递减的
- 队头永远是当前窗口的最大值的索引
为什么是递减?因为如果 nums[i] >= nums[j] 且 i > j,
那么 j 永远不可能成为任何未来窗口的最大值(i 比 j 大且出现得晚)。
所以 j 可以被安全地丢弃。
"""
if not nums or k == 0:
return []
dq = deque() # 存储索引,对应值递减
result = []
for i in range(len(nums)):
# 移除队头过期的索引(超出窗口范围)
while dq and dq[0] < i - k + 1:
dq.popleft()
# 移除队尾所有比当前元素小的索引
while dq and nums[dq[-1]] <= nums[i]:
dq.pop()
dq.append(i)
# 当窗口形成后,记录最大值
if i >= k - 1:
result.append(nums[dq[0]])
return result
nums = [1, 3, -1, -3, 5, 3, 6, 7]
print(max_sliding_window(nums, 3))
# [3, 3, 5, 5, 6, 7]
这个算法在第 6 章(链表与滑动窗口)中也有所涉及。这里是队列视角的阐述:双端队列维护的是一个"候选最大值"的单调递减序列,每个元素最多入队一次、出队一次,因此总时间复杂度是 O(n)。
2.4 用队列实现栈 / 用栈实现队列
这两个问题是理解队列和栈本质区别的绝佳练习,也是面试高频题。
用两个队列实现栈
from collections import deque
class StackUsingQueues:
"""用两个队列实现栈
策略:让 push 操作承担 O(n) 代价,pop 操作 O(1)。
push 时:将新元素放入空队列 q2,然后把 q1 的所有元素依次出队再入队到 q2。
最后交换 q1 和 q2。这样 q1 的队头永远是最后入栈的元素。
"""
def __init__(self):
self._q1 = deque() # 主队列
self._q2 = deque() # 辅助队列
def push(self, x):
"""O(n) — 将新元素放到队头"""
self._q2.append(x)
while self._q1:
self._q2.append(self._q1.popleft())
self._q1, self._q2 = self._q2, self._q1
def pop(self):
"""O(1) — 队头就是栈顶"""
if not self._q1:
raise IndexError("pop from empty stack")
return self._q1.popleft()
def top(self):
"""O(1)"""
if not self._q1:
raise IndexError("top from empty stack")
return self._q1[0]
def empty(self):
return len(self._q1) == 0
用两个栈实现队列(均摊 O(1))
这个更有趣,因为它展示了均摊分析的威力:
class QueueUsingStacks:
"""用两个栈实现队列 — 均摊 O(1)
策略:
- in_stack: 负责入队(push 到 in_stack)
- out_stack: 负责出队(从 out_stack pop)
当 out_stack 为空时,将 in_stack 的所有元素倒入 out_stack。
倒入后,out_stack 的栈顶就是最早入队的元素。
均摊分析:每个元素最多被 push 两次(进 in_stack 一次,进 out_stack 一次)
和 pop 两次(出 in_stack 一次,出 out_stack 一次)。
所以 n 次操作总计 O(n),均摊每次 O(1)。
"""
def __init__(self):
self._in_stack = [] # 入队端
self._out_stack = [] # 出队端
def enqueue(self, x):
"""O(1) — 直接 push 到 in_stack"""
self._in_stack.append(x)
def dequeue(self):
"""均摊 O(1) — 从 out_stack pop"""
if not self._out_stack:
if not self._in_stack:
raise IndexError("dequeue from empty queue")
# 将 in_stack 倒入 out_stack
while self._in_stack:
self._out_stack.append(self._in_stack.pop())
return self._out_stack.pop()
def peek(self):
"""均摊 O(1)"""
if not self._out_stack:
if not self._in_stack:
raise IndexError("peek from empty queue")
while self._in_stack:
self._out_stack.append(self._in_stack.pop())
return self._out_stack[-1]
def is_empty(self):
return not self._in_stack and not self._out_stack
# 验证
q = QueueUsingStacks()
q.enqueue(1)
q.enqueue(2)
q.enqueue(3)
print(q.dequeue()) # 1 — FIFO!
q.enqueue(4)
print(q.dequeue()) # 2
print(q.dequeue()) # 3
print(q.dequeue()) # 4
均摊分析的关键洞察:虽然单次 dequeue 在最坏情况下是 O(n)(需要把所有元素从 in_stack 倒到 out_stack),但每个元素只会被倒一次。所以 n 次 dequeue 操作的总代价是 O(n),均摊每次 O(1)。这和动态数组的 append 均摊 O(1) 是同样的分析思路。
2.5 多源 BFS
标准 BFS 从一个起点出发。多源 BFS 从多个起点同时出发,一层一层向外扩展。这就像你在地图上同时从多个城市放水,水面同时向外扩展,最终相遇。
经典问题:给定一个 0/1 矩阵,0 代表空地,1 代表建筑。求每个空地到最近建筑的距离。
from collections import deque
def distance_to_nearest_building(grid):
"""多源 BFS:计算每个空地到最近建筑的距离
思路:将所有建筑(值为1)同时作为起点放入队列,
然后 BFS 向外扩展。每个空地第一次被访问时的层数就是它到最近建筑的距离。
"""
rows, cols = len(grid), len(grid[0])
distances = [[float('inf')] * cols for _ in range(rows)]
queue = deque()
# 把所有建筑作为起点
for r in range(rows):
for c in range(cols):
if grid[r][c] == 1:
queue.append((r, c, 0))
distances[r][c] = 0
# BFS
directions = [(0, 1), (0, -1), (1, 0), (-1, 0)]
while queue:
r, c, dist = queue.popleft()
for dr, dc in directions:
nr, nc = r + dr, c + dc
if 0 <= nr < rows and 0 <= nc < cols and distances[nr][nc] == float('inf'):
distances[nr][nc] = dist + 1
queue.append((nr, nc, dist + 1))
return distances
grid = [
[0, 0, 0, 1],
[0, 0, 0, 0],
[1, 0, 0, 0],
[0, 0, 0, 0]
]
result = distance_to_nearest_building(grid)
for row in result:
print(row)
# [2, 3, 2, 0] — 距离最近建筑
# [1, 2, 2, 1]
# [0, 1, 2, 2]
# [1, 2, 3, 3]
多源 BFS 的时间复杂度和单源 BFS 一样,都是 O(V + E)——因为每个节点仍然最多被访问一次。它的巧妙之处在于:通过把多个起点同时放入初始队列,把"多次单源 BFS"的 O(k(V+E)) 问题降为了一次 O(V+E)。
类似的应用包括 LeetCode #542(01 矩阵),以及"腐烂的橘子"(#994)。
Level 3 · 规范怎么定义的
3.1 队列在操作系统中的应用
队列是操作系统中使用最频繁的数据结构之一。几乎所有需要"排队等待处理"的场景都在使用某种形式的队列。
进程调度队列
操作系统管理 CPU 的方式就是维护一组队列。以 Linux 的 CFS(Completely Fair Scheduler,完全公平调度器)之前的 O(1) 调度器为例(Ingo Molnár, 2003):
- 就绪队列(Ready Queue):所有准备运行但尚未获得 CPU 的进程排在这里。
- 多级反馈队列(Multilevel Feedback Queue, MLFQ):由 Fernando Corbató 在 1962 年提出(后来 Corbató 因此工作获得图灵奖),用多个优先级不同的队列来平衡交互式进程和批处理进程的需求。
- 实时调度队列:用于硬实时或软实时任务,通常按优先级排列(实际上是优先队列)。
Linux O(1) 调度器的结构(简化):
Priority Level 0: [P1] → [P5] → [P9] ← 最高优先级
Priority Level 1: [P2] → [P7]
Priority Level 2: [P3] → [P4] → [P8] → [P6]
...
Priority Level 139: [P10] ← 最低优先级
每个优先级是一个 FIFO 队列。
调度器选择最高非空优先级的队头进程运行。
为什么用队列而不是其他结构? 因为队列天然保证了公平性:在同一优先级内,先到的进程先运行。这是避免饥饿(starvation)的最基本机制。
I/O 请求队列
当多个进程同时请求磁盘读写时,操作系统将这些请求放入 I/O 队列。磁盘调度算法(如 SCAN、C-SCAN、Deadline 等)从这个队列中选择下一个要服务的请求。
Linux 的 Block I/O 层使用 struct request_queue 来管理 I/O 请求。每个块设备(如 /dev/sda)都有自己的请求队列。
// Linux 内核中请求队列的简化结构
struct request_queue {
struct list_head queue_head; // 请求链表
struct elevator_type *elevator; // 调度算法(电梯算法)
unsigned long nr_requests; // 队列中最大请求数
// ...
};
为什么 I/O 队列通常有最大长度限制? 如果队列无限增长,一个疯狂的进程可以发起大量 I/O 请求,耗尽所有内存。队列长度限制(默认通常是 128 或 256)提供了**背压(backpressure)**机制:当队列满时,新请求会被阻塞,从而限制了请求的产生速率。
3.2 消息队列的概念
从进程内的队列到进程间的队列,再到机器间的队列,队列的概念自然地延伸为消息队列(Message Queue)。
消息队列是分布式系统中的核心基础设施。它解耦了消息的生产者和消费者,允许它们以不同的速率运行。
从单机队列到分布式消息队列的演化:
- 线程内队列:
queue.Queue(Python)、BlockingQueue(Java) - 进程间队列:POSIX 消息队列(
mq_open)、System V 消息队列 - 机器间消息队列:RabbitMQ、Apache Kafka、Redis Streams、Amazon SQS
消息队列的基本模型:
Producer → [Message Queue] → Consumer
关键属性:
- 持久化:消息写入磁盘,即使 broker 重启也不丢失
- 有序性:同一分区内的消息保持 FIFO 顺序
- 确认机制:消费者处理完后发送 ACK,否则消息重新入队
- 背压:队列满时,生产者被减速或拒绝
Apache Kafka 的 Log 结构(Jay Kreps et al., LinkedIn, 2011):
Kafka 的核心抽象不是传统的"队列",而是分布式追加日志(Distributed Append-Only Log)。每个 partition 本质上就是一个 FIFO 队列,消费者通过 offset 来记住自己读到了哪里。这种设计让 Kafka 能够:
- 支持多个消费者独立消费同一份数据
- 消费者可以回溯到任意 offset 重新消费
- 顺序写磁盘,吞吐量极高(每秒百万级消息)
关于 Kafka 的详细架构,我们在消息队列专题书中会深入讨论。
3.3 阻塞队列与生产者-消费者模型
生产者-消费者模型(Producer-Consumer Pattern)是并发编程中最基础的设计模式之一。它的核心组件就是一个有界阻塞队列(Bounded Blocking Queue):
- 有界:队列有最大容量限制
- 阻塞:队列满时,enqueue 操作阻塞等待;队列空时,dequeue 操作阻塞等待
import threading
import queue
import time
import random
def producer(q: queue.Queue, name: str):
"""生产者:生产数据放入队列"""
for i in range(10):
item = f"{name}-item-{i}"
q.put(item) # 如果队列满,自动阻塞等待
print(f"[{name}] Produced: {item}")
time.sleep(random.uniform(0.01, 0.1))
def consumer(q: queue.Queue, name: str):
"""消费者:从队列取出数据处理"""
while True:
try:
item = q.get(timeout=1) # 等待最多1秒
print(f" [{name}] Consumed: {item}")
time.sleep(random.uniform(0.05, 0.15)) # 模拟处理时间
q.task_done()
except queue.Empty:
print(f" [{name}] No more items, exiting.")
break
# 创建一个容量为 5 的有界队列
bounded_queue = queue.Queue(maxsize=5)
# 启动生产者和消费者
producers = [
threading.Thread(target=producer, args=(bounded_queue, f"P{i}"))
for i in range(2)
]
consumers = [
threading.Thread(target=consumer, args=(bounded_queue, f"C{i}"))
for i in range(3)
]
for t in producers + consumers:
t.start()
for t in producers:
t.join()
bounded_queue.join() # 等待所有任务完成
阻塞队列的实现原理:
Python 的 queue.Queue 内部使用了条件变量(threading.Condition)来实现阻塞:
# queue.Queue 的核心逻辑(简化)
class SimpleBlockingQueue:
def __init__(self, maxsize):
self._queue = deque()
self._maxsize = maxsize
self._mutex = threading.Lock()
self._not_full = threading.Condition(self._mutex)
self._not_empty = threading.Condition(self._mutex)
def put(self, item):
with self._not_full:
while len(self._queue) >= self._maxsize:
self._not_full.wait() # 满了就等待
self._queue.append(item)
self._not_empty.notify() # 通知等待的消费者
def get(self):
with self._not_empty:
while len(self._queue) == 0:
self._not_empty.wait() # 空了就等待
item = self._queue.popleft()
self._not_full.notify() # 通知等待的生产者
return item
为什么需要有界? 无界队列看似方便,但在生产环境中是隐患。如果消费者速度跟不上生产者,无界队列会无限增长,最终耗尽内存导致 OOM(Out Of Memory)。有界队列提供的背压机制迫使生产者在消费者忙不过来时减速——这是系统稳定性的关键。
这个原则在分布式系统中也成立:Kafka 的分区有容量限制,TCP 有接收窗口,HTTP/2 有流控——所有这些本质上都是"有界队列 + 背压"的应用。
3.4 优先队列的多种实现对比
优先队列的抽象很简单:insert(item, priority) 和 extract_min()。但它有多种实现方式,性能差异很大:
| 实现方式 | insert | extract_min | peek | 说明 |
|---|---|---|---|---|
| 无序数组 | O(1) | O(n) | O(n) | 插入快,取出慢 |
| 有序数组 | O(n) | O(1) | O(1) | 取出快,插入慢 |
| 二叉堆 | O(log n) | O(log n) | O(1) | 平衡的折中 |
| 斐波那契堆 | O(1) 均摊 | O(log n) 均摊 | O(1) | 理论最优,实现复杂 |
为什么二叉堆是实践中的默认选择?
- 时间复杂度平衡:insert 和 extract 都是 O(log n),没有明显短板。
- 空间局部性好:二叉堆可以用数组实现(不需要指针),CPU 缓存友好。
- 实现简单:只需要 sift_up 和 sift_down 两个操作。
- 常数因子小:相比红黑树等平衡树,堆的操作更简单,常数因子更小。
Michael Fredman 和 Robert Tarjan 在 1987 年的论文 "Fibonacci Heaps and Their Uses in Improved Network Optimization Algorithms" 中提出了斐波那契堆,将 decrease_key 操作优化到 O(1) 均摊,这对 Dijkstra 算法有重要意义。但在实践中,由于斐波那契堆的常数因子大、实现复杂,只有在图的边数远大于顶点数时才有优势。
# 三种优先队列实现的对比
class UnsortedPQ:
"""无序数组实现的优先队列"""
def __init__(self):
self._data = []
def insert(self, item, priority):
self._data.append((priority, item)) # O(1)
def extract_min(self):
if not self._data:
raise IndexError("empty")
min_idx = 0
for i in range(1, len(self._data)):
if self._data[i][0] < self._data[min_idx][0]:
min_idx = i
# 将最小元素与最后一个交换,然后 pop — O(n)
self._data[min_idx], self._data[-1] = self._data[-1], self._data[min_idx]
return self._data.pop()[1]
class SortedPQ:
"""有序数组实现的优先队列(按优先级降序排列)"""
def __init__(self):
self._data = [] # 按优先级降序排列,最小的在末尾
def insert(self, item, priority):
# 二分查找插入位置 — O(log n) 查找 + O(n) 移动
import bisect
bisect.insort(self._data, (priority, item))
def extract_min(self):
if not self._data:
raise IndexError("empty")
return self._data.pop(0)[1] # O(n) 移动
import heapq
class HeapPQ:
"""二叉堆实现的优先队列"""
def __init__(self):
self._heap = []
self._counter = 0
def insert(self, item, priority):
heapq.heappush(self._heap, (priority, self._counter, item)) # O(log n)
self._counter += 1
def extract_min(self):
if not self._heap:
raise IndexError("empty")
return heapq.heappop(self._heap)[2] # O(log n)
什么时候用哪种?
- 如果你只需要少量元素(< 100),用什么都无所谓,甚至 O(n) 的扫描也很快。
- 如果 insert 远多于 extract(比如你收集大量数据后一次性取最小的几个),无序数组的 O(1) insert 可能更好。
- 一般情况下,二叉堆是最佳选择。Python 中直接用
heapq模块。
我们将在第 12 章详细讲解堆的内部结构、堆排序、以及 heapq 模块的实现原理。
Level 4 · 边界与陷阱
4.1 面试高频题
LeetCode #622:设计循环队列
class MyCircularQueue:
"""
LeetCode 622: Design Circular Queue
要点:
1. 用数组 + front/rear 指针
2. 区分满和空:牺牲一个位置,或用 size 计数器
3. 取模运算实现循环
"""
def __init__(self, k: int):
self._data = [0] * (k + 1)
self._front = 0
self._rear = 0
self._cap = k + 1
def enQueue(self, value: int) -> bool:
if self.isFull():
return False
self._data[self._rear] = value
self._rear = (self._rear + 1) % self._cap
return True
def deQueue(self) -> bool:
if self.isEmpty():
return False
self._front = (self._front + 1) % self._cap
return True
def Front(self) -> int:
if self.isEmpty():
return -1
return self._data[self._front]
def Rear(self) -> int:
if self.isEmpty():
return -1
return self._data[(self._rear - 1 + self._cap) % self._cap]
def isEmpty(self) -> bool:
return self._front == self._rear
def isFull(self) -> bool:
return (self._rear + 1) % self._cap == self._front
面试陷阱:
Rear()返回的是队尾元素,不是 rear 指针位置。rear 指向的是"下一个可写位置",所以队尾元素在(rear - 1 + cap) % cap。- 边界处理:
rear - 1可能是负数(当 rear = 0 时),加 cap 再取模保证正确。
LeetCode #232:用栈实现队列
class MyQueue:
"""
LeetCode 232: Implement Queue using Stacks
两个栈:in_stack 和 out_stack。
push → in_stack
pop/peek → out_stack(空时从 in_stack 倒入)
均摊 O(1)。面试时务必能解释清楚均摊分析。
"""
def __init__(self):
self._in = []
self._out = []
def push(self, x: int) -> None:
self._in.append(x)
def pop(self) -> int:
self._transfer()
return self._out.pop()
def peek(self) -> int:
self._transfer()
return self._out[-1]
def empty(self) -> bool:
return not self._in and not self._out
def _transfer(self):
"""当 out_stack 为空时,将 in_stack 全部倒入"""
if not self._out:
while self._in:
self._out.append(self._in.pop())
面试追问:
- "为什么是均摊 O(1)?" — 用势能法或记账法解释:每个元素最多被转移一次,所以 n 次操作总代价 O(n)。
- "如果要求严格 O(1) 的 pop 呢?" — 不可能只用栈做到。可以用两个队列在 push 时重排(但那时 push 是 O(n))。
LeetCode #933:最近的请求次数
from collections import deque
class RecentCounter:
"""
LeetCode 933: Number of Recent Calls
维护一个队列,存储过去 3000 毫秒内的请求时间戳。
每次新请求到来时,移除所有过期的时间戳。
这是队列作为"滑动时间窗口"的典型应用。
"""
def __init__(self):
self._queue = deque()
def ping(self, t: int) -> int:
self._queue.append(t)
# 移除所有超过 3000ms 前的请求
while self._queue[0] < t - 3000:
self._queue.popleft()
return len(self._queue)
# 示例
rc = RecentCounter()
print(rc.ping(1)) # 1 — 窗口 [-2999, 1],只有 1
print(rc.ping(100)) # 2 — 窗口 [-2900, 100],有 1, 100
print(rc.ping(3001)) # 3 — 窗口 [1, 3001],有 1, 100, 3001
print(rc.ping(3002)) # 3 — 窗口 [2, 3002],有 100, 3001, 3002
4.2 deque vs list 性能对比实测
让我们做一个全面的性能对比:
import time
from collections import deque
def test_append_right(n):
"""右端追加:list vs deque"""
# list
lst = []
start = time.perf_counter()
for i in range(n):
lst.append(i)
t_list = time.perf_counter() - start
# deque
dq = deque()
start = time.perf_counter()
for i in range(n):
dq.append(i)
t_deque = time.perf_counter() - start
return t_list, t_deque
def test_append_left(n):
"""左端追加:list vs deque"""
# list
lst = []
start = time.perf_counter()
for i in range(n):
lst.insert(0, i) # O(n)!
t_list = time.perf_counter() - start
# deque
dq = deque()
start = time.perf_counter()
for i in range(n):
dq.appendleft(i) # O(1)
t_deque = time.perf_counter() - start
return t_list, t_deque
def test_pop_left(n):
"""左端弹出:list vs deque"""
# list
lst = list(range(n))
start = time.perf_counter()
for i in range(n):
lst.pop(0) # O(n)!
t_list = time.perf_counter() - start
# deque
dq = deque(range(n))
start = time.perf_counter()
for i in range(n):
dq.popleft() # O(1)
t_deque = time.perf_counter() - start
return t_list, t_deque
def test_random_access(n):
"""随机访问:list vs deque"""
lst = list(range(n))
dq = deque(range(n))
import random
indices = [random.randint(0, n-1) for _ in range(10000)]
start = time.perf_counter()
for i in indices:
_ = lst[i] # O(1)
t_list = time.perf_counter() - start
start = time.perf_counter()
for i in indices:
_ = dq[i] # O(n)!
t_deque = time.perf_counter() - start
return t_list, t_deque
n = 100_000
print("=" * 60)
print(f"Performance comparison: list vs deque (n={n:,})")
print("=" * 60)
t_list, t_deque = test_append_right(n)
print(f"Append right: list={t_list:.4f}s deque={t_deque:.4f}s "
f"winner={'tie' if abs(t_list-t_deque)/max(t_list,t_deque) < 0.2 else 'list' if t_list < t_deque else 'deque'}")
t_list, t_deque = test_append_left(n)
print(f"Append left: list={t_list:.4f}s deque={t_deque:.4f}s "
f"deque is {t_list/t_deque:.0f}x faster")
t_list, t_deque = test_pop_left(n)
print(f"Pop left: list={t_list:.4f}s deque={t_deque:.4f}s "
f"deque is {t_list/t_deque:.0f}x faster")
t_list, t_deque = test_random_access(n)
print(f"Random access: list={t_list:.4f}s deque={t_deque:.4f}s "
f"list is {t_deque/t_list:.0f}x faster")
典型结果:
============================================================
Performance comparison: list vs deque (n=100,000)
============================================================
Append right: list=0.0052s deque=0.0048s winner=tie
Append left: list=2.1500s deque=0.0048s deque is 448x faster
Pop left: list=1.8900s deque=0.0041s deque is 461x faster
Random access: list=0.0003s deque=0.2100s list is 700x faster
结论总结:
| 操作 | list | deque | 赢家 |
|---|---|---|---|
| 右端 append/pop | O(1) 均摊 | O(1) | 平手 |
| 左端 insert/pop | O(n) | O(1) | deque |
随机访问 [i] |
O(1) | O(n) | list |
| 内存连续性 | 连续 | 块状 | list 略优 |
选择规则:
- 只在右端操作 → 用 list(更简单,随机访问 O(1))
- 需要左端操作 → 用 deque
- 需要随机访问 → 用 list
- 实现队列 → 永远用 deque
4.3 什么时候用队列,什么时候用栈
这是一个看似简单但实际上很有深度的问题。核心区别在于搜索/处理的顺序:
| 特性 | 栈(Stack) | 队列(Queue) |
|---|---|---|
| 顺序 | LIFO — 后进先出 | FIFO — 先进先出 |
| 搜索策略 | DFS — 深度优先 | BFS — 广度优先 |
| 搜索行为 | 先走到底,再回溯 | 一层一层向外扩展 |
| 找最短路径? | 不保证 | 保证(无权图) |
| 空间复杂度 | O(最大深度) | O(最大宽度) |
| 典型应用 | 递归、回溯、括号匹配 | 调度、层序遍历、最短路径 |
DFS vs BFS 的选择指南:
- 求最短路径 → BFS。DFS 可能找到一条很长的路径,而 BFS 保证找到最短的。
- 检测可达性(只需要知道能不能到) → DFS 和 BFS 都行。DFS 通常空间更省(递归栈)。
- 搜索空间很深但答案较浅 → BFS 更快找到。
- 搜索空间很宽但答案很深 → DFS 更节省空间。
- 需要遍历所有可能 → 两者都行,取决于具体问题的结构。
from collections import deque
def solve_maze_bfs(maze, start, end):
"""BFS 走迷宫 — 保证找到最短路径"""
rows, cols = len(maze), len(maze[0])
visited = {start}
queue = deque([(start, [start])])
while queue:
(r, c), path = queue.popleft()
if (r, c) == end:
return path # 第一次到达就是最短路径!
for dr, dc in [(0,1), (0,-1), (1,0), (-1,0)]:
nr, nc = r + dr, c + dc
if (0 <= nr < rows and 0 <= nc < cols
and maze[nr][nc] == 0 and (nr, nc) not in visited):
visited.add((nr, nc))
queue.append(((nr, nc), path + [(nr, nc)]))
return None # 无解
def solve_maze_dfs(maze, start, end):
"""DFS 走迷宫 — 能找到路径,但不保证最短"""
rows, cols = len(maze), len(maze[0])
visited = set()
def dfs(r, c, path):
if (r, c) == end:
return path
visited.add((r, c))
for dr, dc in [(0,1), (0,-1), (1,0), (-1,0)]:
nr, nc = r + dr, c + dc
if (0 <= nr < rows and 0 <= nc < cols
and maze[nr][nc] == 0 and (nr, nc) not in visited):
result = dfs(nr, nc, path + [(nr, nc)])
if result:
return result
return None
return dfs(start[0], start[1], [start])
# 0=通路,1=墙壁
maze = [
[0, 0, 1, 0, 0],
[0, 1, 0, 1, 0],
[0, 0, 0, 0, 0],
[1, 1, 0, 1, 0],
[0, 0, 0, 1, 0]
]
bfs_path = solve_maze_bfs(maze, (0, 0), (4, 4))
dfs_path = solve_maze_dfs(maze, (0, 0), (4, 4))
print(f"BFS path length: {len(bfs_path)}") # 最短
print(f"DFS path length: {len(dfs_path)}") # 可能更长
实战中的另一个考量:递归深度限制。
Python 默认递归深度限制是 1000。如果你的 DFS 搜索空间超过 1000 层,递归版 DFS 会报 RecursionError。解决方案:
sys.setrecursionlimit(更大的数)— 简单但不优雅,可能栈溢出。- 用显式栈代替递归 — 把 DFS 改成迭代版本,使用自己管理的栈。
- 改用 BFS — 如果问题允许的话。
def dfs_iterative(graph, start):
"""迭代版 DFS — 用显式栈代替递归"""
visited = set()
stack = [start]
while stack:
node = stack.pop() # 栈:后进先出
if node in visited:
continue
visited.add(node)
print(node, end=" ")
for neighbor in graph[node]:
if neighbor not in visited:
stack.append(neighbor)
return visited
注意:迭代 DFS 和递归 DFS 的遍历顺序可能不完全一致(取决于邻居入栈的顺序),但都是深度优先。
4.4 边界情况与陷阱总结
陷阱 1:deque 的 maxlen 参数
from collections import deque
# maxlen 会自动丢弃另一端的元素!
dq = deque(maxlen=3)
dq.append(1)
dq.append(2)
dq.append(3)
dq.append(4) # 1 被自动丢弃!
print(dq) # deque([2, 3, 4], maxlen=3)
dq.appendleft(0) # 4 被自动丢弃!
print(dq) # deque([0, 2, 3], maxlen=3)
这个特性可以用来实现"固定大小的滑动窗口",但如果你不期望这个行为,可能会导致难以发现的 bug。
陷阱 2:多线程中的 deque
collections.deque 的 append() 和 popleft() 在 CPython 中由于 GIL 的存在是线程安全的(单个操作是原子的)。但这不代表你的代码是线程安全的:
from collections import deque
dq = deque()
# ❌ 这段代码不是线程安全的!
if len(dq) > 0: # 检查和操作之间可能被其他线程插入
item = dq.popleft() # 此时 dq 可能已经被其他线程清空
# ✅ 正确做法:用 try/except
try:
item = dq.popleft()
except IndexError:
item = None
# ✅ 或者使用 queue.Queue(自带锁)
import queue
q = queue.Queue()
陷阱 3:BFS 中忘记标记已访问
# ❌ 错误:在出队时标记
while queue:
node = queue.popleft()
visited.add(node) # 太晚了!同一个节点可能已经被多次入队
for neighbor in graph[node]:
if neighbor not in visited:
queue.append(neighbor)
# ✅ 正确:在入队时标记
while queue:
node = queue.popleft()
for neighbor in graph[node]:
if neighbor not in visited:
visited.add(neighbor) # 入队时就标记
queue.append(neighbor)
在出队时标记会导致同一个节点被多次入队(多个邻居都指向它),浪费空间和时间。在入队时标记保证每个节点最多入队一次。
陷阱 4:循环队列的 size 计算
# 当 rear >= front 时:size = rear - front
# 当 rear < front 时:size = rear - front + capacity
# 合并为一个公式:
size = (rear - front + capacity) % capacity
这个公式在 rear == front 时给出 0(空),在 (rear + 1) % capacity == front 时给出 capacity - 1(满,因为我们牺牲了一个位置)。
总结
队列是"公平性"的数学化身。当你需要保证"先来先服务"的语义时,队列就是你的选择。
本章的核心要点:
- 用 deque,不用 list — 这是 Python 中最重要的队列实践。
- 循环队列用取模 — 理解
(index + 1) % capacity就理解了循环的本质。 - BFS 用队列 — 队列保证了"层层扩展"的顺序,因此天然找到最短路径。
- 优先队列用堆 — 第 12 章详细展开。
- 阻塞队列解耦生产者和消费者 — 这是并发编程和分布式系统的基石。
队列看似简单,但它的应用从操作系统调度到分布式消息系统,贯穿了计算机科学的每一层抽象。理解队列,就是理解"有序处理"这个基本计算模式。
下一步学习路径:
- 第 10 章:图的遍历(BFS 和 DFS 的完整展开)
- 第 12 章:堆与优先队列
- 第 16 章:图算法进阶(Dijkstra 等带权最短路径)