第 7 章

队列与双端队列

第七章:队列与双端队列

你在食堂排队打饭:先来的人先打到饭,后来的人排在队尾。你在银行取号等候:号码小的先被叫到窗口。你的电脑同时打开了十个程序,操作系统把它们排成一个队列,轮流分配 CPU 时间。

这些场景的共同数学结构是队列(Queue)。最先进入的元素,最先被取出——First In, First Out,简称 FIFO。

队列和栈(第 5 章)是一对"镜像"数据结构:栈是 LIFO,队列是 FIFO。栈解决的是"最近的事情最先处理"的问题(如递归、撤销),队列解决的是"先来的事情先处理"的问题(如调度、广度优先搜索)。两者加在一起,覆盖了绝大多数"按时间顺序处理事件"的场景。

本章我们将从最简单的队列出发,逐步深入循环队列、双端队列,最后触及优先队列和消息队列。你会发现,队列是连接数据结构与操作系统、网络协议、分布式系统的桥梁。


Level 1 · 你需要知道的

1.1 队列的基本概念

队列是一种受限的线性数据结构。它只允许在一端(队尾,rear/back)插入,在另一端(队头,front)删除。

队列支持的核心操作:

操作 含义 时间复杂度
enqueue(x) 将元素 x 加入队尾 O(1)
dequeue() 移除并返回队头元素 O(1)
peek() / front() 查看队头元素但不移除 O(1)

附加操作:

FIFO 性质的形式化描述

设队列 Q 上依次执行 enqueue(a₁), enqueue(a₂), ..., enqueue(aₙ),则执行 n 次 dequeue() 得到的序列是 a₁, a₂, ..., aₙ。即:输出顺序与输入顺序完全一致。

为什么要这个约束? 这个问题和栈一样:约束带来语义清晰性。当你看到一个队列,你立刻知道"先来的先走"。这种公平性假设在无数场景中自然成立——排队、调度、消息传递。队列的设计哲学就是:用结构保证公平性,而不是靠程序员手动维护顺序

1.2 用 Python 的 collections.deque 实现队列

很多初学者用 Python 的 list 来实现队列:

# ❌ 错误做法:用 list 实现队列
queue = []
queue.append("A")  # enqueue
queue.append("B")
queue.pop(0)       # dequeue — 这是 O(n)!

这是一个常见的性能陷阱。 list.pop(0) 需要把后面所有元素往前移一位,时间复杂度是 O(n)。如果你的队列有一百万个元素,每次 dequeue 都要移动一百万个元素——这显然不可接受。

正确的做法是使用 collections.deque

from collections import deque

class Queue:
    """基于 collections.deque 的队列实现"""
    
    def __init__(self):
        self._data = deque()
    
    def enqueue(self, item):
        """将元素加入队尾 — O(1)"""
        self._data.append(item)
    
    def dequeue(self):
        """移除并返回队头元素 — O(1)
        
        Raises:
            IndexError: 如果队列为空
        """
        if self.is_empty():
            raise IndexError("dequeue from empty queue")
        return self._data.popleft()
    
    def peek(self):
        """查看队头元素但不移除 — O(1)
        
        Raises:
            IndexError: 如果队列为空
        """
        if self.is_empty():
            raise IndexError("peek from empty queue")
        return self._data[0]
    
    def is_empty(self):
        """判断队列是否为空 — O(1)"""
        return len(self._data) == 0
    
    def size(self):
        """返回队列中元素个数 — O(1)"""
        return len(self._data)
    
    def __repr__(self):
        return f"Queue({list(self._data)})"


# 使用示例
q = Queue()
q.enqueue("Alice")
q.enqueue("Bob")
q.enqueue("Charlie")

print(q.dequeue())  # Alice — 先进先出
print(q.dequeue())  # Bob
print(q.peek())     # Charlie — 只看不取
print(q.size())     # 1

为什么 deque 的 popleft() 是 O(1)? 因为 deque 内部不是一个连续数组,而是一个双向链表连接的固定长度块(block)结构。两端的操作都不需要移动其他元素。我们在 Level 2 中会详细分析它的内部实现。

1.3 循环队列的实现

在很多底层系统中(操作系统内核、嵌入式设备、网络缓冲区),我们不能用动态分配内存的 deque,而是需要一个固定大小的数组来实现队列。这就是循环队列(Circular Queue)

循环队列的核心思想:用一个数组和两个指针(front、rear)来维护队列。当指针到达数组末尾时,通过取模运算让它"绕回"到数组开头,形成逻辑上的环形结构。

class CircularQueue:
    """基于数组的循环队列实现"""
    
    def __init__(self, capacity: int):
        """
        Args:
            capacity: 队列最大容量
        """
        self._data = [None] * (capacity + 1)  # 多分配一个空间用于区分满和空
        self._front = 0
        self._rear = 0
        self._capacity = capacity + 1
    
    def enqueue(self, item) -> bool:
        """将元素加入队尾 — O(1)
        
        Returns:
            True 如果入队成功,False 如果队列已满
        """
        if self.is_full():
            return False
        self._data[self._rear] = item
        self._rear = (self._rear + 1) % self._capacity
        return True
    
    def dequeue(self):
        """移除并返回队头元素 — O(1)
        
        Raises:
            IndexError: 如果队列为空
        """
        if self.is_empty():
            raise IndexError("dequeue from empty circular queue")
        item = self._data[self._front]
        self._data[self._front] = None  # 帮助 GC
        self._front = (self._front + 1) % self._capacity
        return item
    
    def peek(self):
        """查看队头元素 — O(1)"""
        if self.is_empty():
            raise IndexError("peek from empty circular queue")
        return self._data[self._front]
    
    def is_empty(self) -> bool:
        """队列为空的条件:front == rear"""
        return self._front == self._rear
    
    def is_full(self) -> bool:
        """队列为满的条件:(rear + 1) % capacity == front
        
        注意:我们牺牲一个位置来区分满和空。
        如果不这样做,满和空的状态都是 front == rear,无法区分。
        """
        return (self._rear + 1) % self._capacity == self._front
    
    def size(self) -> int:
        """当前队列中的元素个数"""
        return (self._rear - self._front + self._capacity) % self._capacity
    
    def __repr__(self):
        if self.is_empty():
            return "CircularQueue([])"
        items = []
        i = self._front
        while i != self._rear:
            items.append(self._data[i])
            i = (i + 1) % self._capacity
        return f"CircularQueue({items})"


# 使用示例
cq = CircularQueue(5)
for i in range(5):
    cq.enqueue(i)

print(cq)           # CircularQueue([0, 1, 2, 3, 4])
print(cq.is_full()) # True
print(cq.dequeue()) # 0
print(cq.dequeue()) # 1
cq.enqueue(5)       # 利用了之前释放的空间
cq.enqueue(6)
print(cq)           # CircularQueue([2, 3, 4, 5, 6])

循环队列的关键设计决策

  1. 为什么多分配一个空间? 如果 capacity = 5,我们实际分配 6 个位置。这样"满"的条件是 (rear + 1) % capacity == front,而"空"的条件是 front == rear,两者永远不会混淆。另一种做法是维护一个 size 计数器,但这会增加每次操作的开销(虽然只是一个加减法,在高频场景下也是有意义的)。

  2. 取模运算的含义(index + 1) % capacity 实现的是"到了末尾就回到开头"。这就是"循环"的本质——指针在数组上走一个圈。

  3. 与 Python 的 deque 的区别:循环队列是固定大小的,满了就拒绝入队(或者抛出异常)。deque 是动态大小的,会自动扩容。循环队列适合"缓冲区"场景——你不希望无限增长,而是希望控制内存使用。

1.4 BFS 基础:层序遍历与最短路径

队列最经典的算法应用是广度优先搜索(Breadth-First Search,BFS)。BFS 是"一层一层向外扩展"的搜索策略,它天然需要一个队列来维护"下一层要访问的节点"。

BFS 的基本模板

from collections import deque

def bfs(graph, start):
    """BFS 遍历图
    
    Args:
        graph: 邻接表表示的图,graph[node] 是 node 的所有邻居
        start: 起始节点
    
    Returns:
        visited: 从 start 能到达的所有节点
    """
    visited = set([start])
    queue = deque([start])
    
    while queue:
        node = queue.popleft()  # 取出队头
        # 处理当前节点
        print(node, end=" ")
        
        for neighbor in graph[node]:
            if neighbor not in visited:
                visited.add(neighbor)
                queue.append(neighbor)  # 加入队尾
    
    return visited

BFS 求最短路径(无权图)

在无权图中(所有边的权重都是 1),BFS 天然能找到从起点到任何可达节点的最短路径。原因很直觉:BFS 先访问距离为 1 的节点,再访问距离为 2 的节点……第一次到达某个节点时,走的一定是最短路径。

def shortest_path_bfs(graph, start, target):
    """在无权图中找从 start 到 target 的最短路径
    
    Returns:
        路径列表,如果不可达返回 None
    """
    if start == target:
        return [start]
    
    visited = {start}
    queue = deque([(start, [start])])  # (当前节点, 从start到当前节点的路径)
    
    while queue:
        node, path = queue.popleft()
        
        for neighbor in graph[node]:
            if neighbor not in visited:
                new_path = path + [neighbor]
                if neighbor == target:
                    return new_path
                visited.add(neighbor)
                queue.append((neighbor, new_path))
    
    return None  # 不可达


# 示例
graph = {
    'A': ['B', 'C'],
    'B': ['A', 'D', 'E'],
    'C': ['A', 'F'],
    'D': ['B'],
    'E': ['B', 'F'],
    'F': ['C', 'E']
}

path = shortest_path_bfs(graph, 'A', 'F')
print(path)  # ['A', 'C', 'F'] — 最短路径,长度为 2

**层序遍历(Level Order Traversal)**是 BFS 在树上的应用:

def level_order_traversal(root):
    """二叉树的层序遍历
    
    Returns:
        result: 二维列表,result[i] 是第 i 层的所有节点值
    """
    if not root:
        return []
    
    result = []
    queue = deque([root])
    
    while queue:
        level_size = len(queue)  # 当前层的节点数
        level_nodes = []
        
        for _ in range(level_size):
            node = queue.popleft()
            level_nodes.append(node.val)
            
            if node.left:
                queue.append(node.left)
            if node.right:
                queue.append(node.right)
        
        result.append(level_nodes)
    
    return result

关于 BFS 的更多应用——图遍历、最短路径变体、拓扑排序——我们将在第 10 章(图的基础)和第 16 章(图算法进阶)中详细展开。

1.5 优先队列简介

普通队列的规则是"先来先服务"。但在很多实际场景中,我们需要的是"优先级最高的先服务"——这就是优先队列(Priority Queue)

优先队列的操作接口和普通队列类似,但 dequeue 操作不是取出最早进入的元素,而是取出优先级最高的元素

import heapq

class PriorityQueue:
    """基于 heapq 的最小优先队列"""
    
    def __init__(self):
        self._heap = []
        self._counter = 0  # 用于打破优先级相同时的平局
    
    def enqueue(self, item, priority):
        """将元素加入优先队列
        
        Args:
            item: 元素
            priority: 优先级(数字越小优先级越高)
        """
        heapq.heappush(self._heap, (priority, self._counter, item))
        self._counter += 1
    
    def dequeue(self):
        """取出优先级最高(数字最小)的元素"""
        if self.is_empty():
            raise IndexError("dequeue from empty priority queue")
        priority, _, item = heapq.heappop(self._heap)
        return item
    
    def peek(self):
        """查看优先级最高的元素"""
        if self.is_empty():
            raise IndexError("peek from empty priority queue")
        return self._heap[0][2]
    
    def is_empty(self):
        return len(self._heap) == 0
    
    def size(self):
        return len(self._heap)


# 使用示例:医院急诊分诊
er = PriorityQueue()
er.enqueue("感冒患者", priority=5)
er.enqueue("心脏病发作", priority=1)
er.enqueue("骨折", priority=3)

print(er.dequeue())  # 心脏病发作 — 优先级最高
print(er.dequeue())  # 骨折
print(er.dequeue())  # 感冒患者

优先队列的高效实现依赖于**堆(Heap)**数据结构。我们在第 12 章会详细讲解堆的原理和实现。这里你只需要知道:Python 的 heapq 模块给你提供了一个 O(log n) 入队、O(log n) 出队的优先队列。

1.6 常见错误:用 list 做队列

让我们量化一下"用 list 做队列"到底有多慢:

import time
from collections import deque

def benchmark_list_queue(n):
    """用 list 做队列的性能"""
    q = []
    start = time.perf_counter()
    for i in range(n):
        q.append(i)
    for i in range(n):
        q.pop(0)  # O(n) 操作!
    return time.perf_counter() - start

def benchmark_deque_queue(n):
    """用 deque 做队列的性能"""
    q = deque()
    start = time.perf_counter()
    for i in range(n):
        q.append(i)
    for i in range(n):
        q.popleft()  # O(1) 操作
    return time.perf_counter() - start

# 测试不同规模
for n in [10_000, 100_000, 1_000_000]:
    t_list = benchmark_list_queue(n)
    t_deque = benchmark_deque_queue(n)
    print(f"n={n:>10,}: list={t_list:.4f}s, deque={t_deque:.4f}s, "
          f"ratio={t_list/t_deque:.1f}x")

典型输出(具体数字因机器而异):

n=    10,000: list=0.0150s, deque=0.0008s, ratio=18.8x
n=   100,000: list=1.4500s, deque=0.0070s, ratio=207.1x
n= 1,000,000: list=148.30s, deque=0.0680s, ratio=2181.0x

差距是二次级别的:list 版本的总时间是 O(n²)(n 次 O(n) 的 pop(0)),而 deque 版本是 O(n)(n 次 O(1) 的 popleft())。当 n 翻 10 倍,list 版本慢 100 倍,deque 版本只慢 10 倍。

记住这条规则:在 Python 中,永远不要用 list.pop(0) 做队列的出队操作。用 collections.deque


Level 2 · 它是怎么运行的

2.1 collections.deque 的内部实现

CPython 中 collections.deque 的源码位于 Modules/_collectionsmodule.c。它的内部结构不是一个简单的数组,也不是一个简单的链表,而是一种混合结构:双向链表连接的固定长度块(block)

每个 block 是一个包含 64 个 PyObject* 指针的数组(CPython 3.x 中 BLOCKLEN = 64)。多个 block 通过双向链表的方式串联起来。deque 对象本身维护:

deque 的内部布局:

  leftblock           ...          rightblock
  ┌──────────┐      ┌──────────┐      ┌──────────┐
  │ [64 slots] │ ←→ │ [64 slots] │ ←→ │ [64 slots] │
  └──────────┘      └──────────┘      └──────────┘
   ↑ leftindex                          rightindex ↑

appendleft() / popleft() 的过程

  1. popleft():取出 leftblock[leftindex] 处的元素,leftindex += 1。如果 leftindex 超过了 BLOCKLEN(当前 block 用完了),则释放这个 block,leftblock 指向下一个 block,leftindex = 0
  2. appendleft(x)leftindex -= 1,将 x 放在 leftblock[leftindex]。如果 leftindex < 0(当前 block 左边已满),则分配一个新 block,链接到 leftblock 的左边。

为什么这种设计比单纯链表更好?

  1. 缓存友好:每个 block 内的 64 个指针在内存中连续,访问它们时 CPU 缓存命中率高。纯链表的每个节点可能散落在内存各处。
  2. 内存开销低:纯链表每个元素需要一个 prevnext 指针(16 字节),加上 Python 对象头(56 字节)。deque 的 block 结构中,每个元素只需要一个 8 字节的指针,外加 block 本身的 prev/next 开销被 64 个元素分摊。
  3. 两端操作 O(1):append/appendleft/pop/popleft 都是真正的 O(1),没有均摊。

deque 的缺点:随机访问是 O(n)。deque[i] 需要从某一端开始,逐 block 跳转,找到第 i 个元素所在的 block,然后在 block 内定位。不像数组那样 O(1) 直接定位。

Raymond Hettinger(deque 的主要实现者,Python 核心开发者)在 CPython issue tracker 和 PyCon 演讲中多次解释了这个设计决策:"deque 的定位是双端操作的高效容器,不是通用的随机访问序列。如果你需要随机访问,用 list。"

2.2 循环队列 vs 链表队列的性能对比

队列有两种经典的底层实现:数组(循环队列)和链表。它们各有优劣:

维度 循环队列(数组) 链表队列
enqueue O(1),直接写入 O(1),新建节点
dequeue O(1),移动指针 O(1),删除头节点
内存布局 连续,缓存友好 散列,缓存不友好
内存开销 固定大小,可能浪费 按需分配,无浪费
最大容量 固定(需预先指定) 理论上无限
分配/释放 无(复用数组空间) 每次操作都涉及分配/释放

在什么场景下选择哪种?

性能实测

import time
from collections import deque

class LinkedNode:
    __slots__ = ('val', 'next')
    def __init__(self, val):
        self.val = val
        self.next = None

class LinkedQueue:
    """链表实现的队列"""
    def __init__(self):
        self._head = None
        self._tail = None
        self._size = 0
    
    def enqueue(self, item):
        node = LinkedNode(item)
        if self._tail:
            self._tail.next = node
        else:
            self._head = node
        self._tail = node
        self._size += 1
    
    def dequeue(self):
        if not self._head:
            raise IndexError("empty")
        val = self._head.val
        self._head = self._head.next
        if not self._head:
            self._tail = None
        self._size -= 1
        return val

def benchmark(queue_class, n, *args):
    q = queue_class(*args) if args else queue_class()
    start = time.perf_counter()
    for i in range(n):
        if hasattr(q, 'enqueue'):
            q.enqueue(i)
        else:
            q.append(i)
    for i in range(n):
        if hasattr(q, 'dequeue'):
            q.dequeue()
        else:
            q.popleft()
    return time.perf_counter() - start

n = 1_000_000
print(f"deque:        {benchmark(deque, n):.4f}s")
print(f"LinkedQueue:  {benchmark(LinkedQueue, n):.4f}s")
print(f"CircularQueue:{benchmark(CircularQueue, n, n):.4f}s")

典型结果:

deque:        0.068s
LinkedQueue:  0.350s
CircularQueue:0.120s

deque 最快,因为它是 C 实现的,而且块状结构缓存友好。CircularQueue 也不慢(纯 Python 限制了速度)。LinkedQueue 最慢,因为每次 enqueue 都要创建一个新的 Python 对象(节点),每次 dequeue 都要释放一个对象——Python 的对象创建开销很大。

2.3 双端队列的应用

**双端队列(Double-Ended Queue,Deque)**允许在两端同时进行插入和删除。Python 的 collections.deque 本身就是一个完整的双端队列。

应用一:回文检查

回文是一个从前往后读和从后往前读都一样的字符串。用双端队列可以优雅地实现回文检查:从两端同时取字符进行比较。

from collections import deque

def is_palindrome(s: str) -> bool:
    """用双端队列检查回文
    
    思路:将字符放入 deque,然后同时从两端取出比较。
    如果每对字符都相等,则是回文。
    """
    # 预处理:只保留字母和数字,转小写
    chars = deque(c.lower() for c in s if c.isalnum())
    
    while len(chars) > 1:
        front = chars.popleft()
        back = chars.pop()
        if front != back:
            return False
    
    return True


print(is_palindrome("A man, a plan, a canal: Panama"))  # True
print(is_palindrome("race a car"))                       # False

这个解法的时间复杂度是 O(n),空间复杂度是 O(n)。虽然双指针法可以做到 O(1) 空间,但 deque 的方法在概念上更直观地展示了"双端操作"的思想。

应用二:滑动窗口最大值

这是一个经典的面试题(LeetCode #239):给定一个数组和一个窗口大小 k,返回每个窗口中的最大值。

暴力做法是 O(nk),但利用双端队列可以做到 O(n):

from collections import deque

def max_sliding_window(nums: list, k: int) -> list:
    """滑动窗口最大值 — O(n)
    
    维护一个递减的双端队列:
    - 队列存储的是索引
    - 队列中的索引对应的值是递减的
    - 队头永远是当前窗口的最大值的索引
    
    为什么是递减?因为如果 nums[i] >= nums[j] 且 i > j,
    那么 j 永远不可能成为任何未来窗口的最大值(i 比 j 大且出现得晚)。
    所以 j 可以被安全地丢弃。
    """
    if not nums or k == 0:
        return []
    
    dq = deque()  # 存储索引,对应值递减
    result = []
    
    for i in range(len(nums)):
        # 移除队头过期的索引(超出窗口范围)
        while dq and dq[0] < i - k + 1:
            dq.popleft()
        
        # 移除队尾所有比当前元素小的索引
        while dq and nums[dq[-1]] <= nums[i]:
            dq.pop()
        
        dq.append(i)
        
        # 当窗口形成后,记录最大值
        if i >= k - 1:
            result.append(nums[dq[0]])
    
    return result


nums = [1, 3, -1, -3, 5, 3, 6, 7]
print(max_sliding_window(nums, 3))
# [3, 3, 5, 5, 6, 7]

这个算法在第 6 章(链表与滑动窗口)中也有所涉及。这里是队列视角的阐述:双端队列维护的是一个"候选最大值"的单调递减序列,每个元素最多入队一次、出队一次,因此总时间复杂度是 O(n)。

2.4 用队列实现栈 / 用栈实现队列

这两个问题是理解队列和栈本质区别的绝佳练习,也是面试高频题。

用两个队列实现栈

from collections import deque

class StackUsingQueues:
    """用两个队列实现栈
    
    策略:让 push 操作承担 O(n) 代价,pop 操作 O(1)。
    
    push 时:将新元素放入空队列 q2,然后把 q1 的所有元素依次出队再入队到 q2。
    最后交换 q1 和 q2。这样 q1 的队头永远是最后入栈的元素。
    """
    
    def __init__(self):
        self._q1 = deque()  # 主队列
        self._q2 = deque()  # 辅助队列
    
    def push(self, x):
        """O(n) — 将新元素放到队头"""
        self._q2.append(x)
        while self._q1:
            self._q2.append(self._q1.popleft())
        self._q1, self._q2 = self._q2, self._q1
    
    def pop(self):
        """O(1) — 队头就是栈顶"""
        if not self._q1:
            raise IndexError("pop from empty stack")
        return self._q1.popleft()
    
    def top(self):
        """O(1)"""
        if not self._q1:
            raise IndexError("top from empty stack")
        return self._q1[0]
    
    def empty(self):
        return len(self._q1) == 0

用两个栈实现队列(均摊 O(1))

这个更有趣,因为它展示了均摊分析的威力:

class QueueUsingStacks:
    """用两个栈实现队列 — 均摊 O(1)
    
    策略:
    - in_stack: 负责入队(push 到 in_stack)
    - out_stack: 负责出队(从 out_stack pop)
    
    当 out_stack 为空时,将 in_stack 的所有元素倒入 out_stack。
    倒入后,out_stack 的栈顶就是最早入队的元素。
    
    均摊分析:每个元素最多被 push 两次(进 in_stack 一次,进 out_stack 一次)
    和 pop 两次(出 in_stack 一次,出 out_stack 一次)。
    所以 n 次操作总计 O(n),均摊每次 O(1)。
    """
    
    def __init__(self):
        self._in_stack = []   # 入队端
        self._out_stack = []  # 出队端
    
    def enqueue(self, x):
        """O(1) — 直接 push 到 in_stack"""
        self._in_stack.append(x)
    
    def dequeue(self):
        """均摊 O(1) — 从 out_stack pop"""
        if not self._out_stack:
            if not self._in_stack:
                raise IndexError("dequeue from empty queue")
            # 将 in_stack 倒入 out_stack
            while self._in_stack:
                self._out_stack.append(self._in_stack.pop())
        return self._out_stack.pop()
    
    def peek(self):
        """均摊 O(1)"""
        if not self._out_stack:
            if not self._in_stack:
                raise IndexError("peek from empty queue")
            while self._in_stack:
                self._out_stack.append(self._in_stack.pop())
        return self._out_stack[-1]
    
    def is_empty(self):
        return not self._in_stack and not self._out_stack


# 验证
q = QueueUsingStacks()
q.enqueue(1)
q.enqueue(2)
q.enqueue(3)
print(q.dequeue())  # 1 — FIFO!
q.enqueue(4)
print(q.dequeue())  # 2
print(q.dequeue())  # 3
print(q.dequeue())  # 4

均摊分析的关键洞察:虽然单次 dequeue 在最坏情况下是 O(n)(需要把所有元素从 in_stack 倒到 out_stack),但每个元素只会被倒一次。所以 n 次 dequeue 操作的总代价是 O(n),均摊每次 O(1)。这和动态数组的 append 均摊 O(1) 是同样的分析思路。

2.5 多源 BFS

标准 BFS 从一个起点出发。多源 BFS 从多个起点同时出发,一层一层向外扩展。这就像你在地图上同时从多个城市放水,水面同时向外扩展,最终相遇。

经典问题:给定一个 0/1 矩阵,0 代表空地,1 代表建筑。求每个空地到最近建筑的距离。

from collections import deque

def distance_to_nearest_building(grid):
    """多源 BFS:计算每个空地到最近建筑的距离
    
    思路:将所有建筑(值为1)同时作为起点放入队列,
    然后 BFS 向外扩展。每个空地第一次被访问时的层数就是它到最近建筑的距离。
    """
    rows, cols = len(grid), len(grid[0])
    distances = [[float('inf')] * cols for _ in range(rows)]
    queue = deque()
    
    # 把所有建筑作为起点
    for r in range(rows):
        for c in range(cols):
            if grid[r][c] == 1:
                queue.append((r, c, 0))
                distances[r][c] = 0
    
    # BFS
    directions = [(0, 1), (0, -1), (1, 0), (-1, 0)]
    while queue:
        r, c, dist = queue.popleft()
        for dr, dc in directions:
            nr, nc = r + dr, c + dc
            if 0 <= nr < rows and 0 <= nc < cols and distances[nr][nc] == float('inf'):
                distances[nr][nc] = dist + 1
                queue.append((nr, nc, dist + 1))
    
    return distances


grid = [
    [0, 0, 0, 1],
    [0, 0, 0, 0],
    [1, 0, 0, 0],
    [0, 0, 0, 0]
]

result = distance_to_nearest_building(grid)
for row in result:
    print(row)
# [2, 3, 2, 0]  — 距离最近建筑
# [1, 2, 2, 1]
# [0, 1, 2, 2]
# [1, 2, 3, 3]

多源 BFS 的时间复杂度和单源 BFS 一样,都是 O(V + E)——因为每个节点仍然最多被访问一次。它的巧妙之处在于:通过把多个起点同时放入初始队列,把"多次单源 BFS"的 O(k(V+E)) 问题降为了一次 O(V+E)。

类似的应用包括 LeetCode #542(01 矩阵),以及"腐烂的橘子"(#994)。


Level 3 · 规范怎么定义的

3.1 队列在操作系统中的应用

队列是操作系统中使用最频繁的数据结构之一。几乎所有需要"排队等待处理"的场景都在使用某种形式的队列。

进程调度队列

操作系统管理 CPU 的方式就是维护一组队列。以 Linux 的 CFS(Completely Fair Scheduler,完全公平调度器)之前的 O(1) 调度器为例(Ingo Molnár, 2003):

Linux O(1) 调度器的结构(简化):

Priority Level 0:  [P1] → [P5] → [P9]       ← 最高优先级
Priority Level 1:  [P2] → [P7]
Priority Level 2:  [P3] → [P4] → [P8] → [P6]
...
Priority Level 139: [P10]                      ← 最低优先级

每个优先级是一个 FIFO 队列。
调度器选择最高非空优先级的队头进程运行。

为什么用队列而不是其他结构? 因为队列天然保证了公平性:在同一优先级内,先到的进程先运行。这是避免饥饿(starvation)的最基本机制。

I/O 请求队列

当多个进程同时请求磁盘读写时,操作系统将这些请求放入 I/O 队列。磁盘调度算法(如 SCAN、C-SCAN、Deadline 等)从这个队列中选择下一个要服务的请求。

Linux 的 Block I/O 层使用 struct request_queue 来管理 I/O 请求。每个块设备(如 /dev/sda)都有自己的请求队列。

// Linux 内核中请求队列的简化结构
struct request_queue {
    struct list_head queue_head;  // 请求链表
    struct elevator_type *elevator; // 调度算法(电梯算法)
    unsigned long nr_requests;   // 队列中最大请求数
    // ...
};

为什么 I/O 队列通常有最大长度限制? 如果队列无限增长,一个疯狂的进程可以发起大量 I/O 请求,耗尽所有内存。队列长度限制(默认通常是 128 或 256)提供了**背压(backpressure)**机制:当队列满时,新请求会被阻塞,从而限制了请求的产生速率。

3.2 消息队列的概念

从进程内的队列到进程间的队列,再到机器间的队列,队列的概念自然地延伸为消息队列(Message Queue)

消息队列是分布式系统中的核心基础设施。它解耦了消息的生产者和消费者,允许它们以不同的速率运行。

从单机队列到分布式消息队列的演化

  1. 线程内队列queue.Queue(Python)、BlockingQueue(Java)
  2. 进程间队列:POSIX 消息队列(mq_open)、System V 消息队列
  3. 机器间消息队列:RabbitMQ、Apache Kafka、Redis Streams、Amazon SQS
消息队列的基本模型:

Producer → [Message Queue] → Consumer

关键属性:
- 持久化:消息写入磁盘,即使 broker 重启也不丢失
- 有序性:同一分区内的消息保持 FIFO 顺序
- 确认机制:消费者处理完后发送 ACK,否则消息重新入队
- 背压:队列满时,生产者被减速或拒绝

Apache Kafka 的 Log 结构(Jay Kreps et al., LinkedIn, 2011):

Kafka 的核心抽象不是传统的"队列",而是分布式追加日志(Distributed Append-Only Log)。每个 partition 本质上就是一个 FIFO 队列,消费者通过 offset 来记住自己读到了哪里。这种设计让 Kafka 能够:

关于 Kafka 的详细架构,我们在消息队列专题书中会深入讨论。

3.3 阻塞队列与生产者-消费者模型

生产者-消费者模型(Producer-Consumer Pattern)是并发编程中最基础的设计模式之一。它的核心组件就是一个有界阻塞队列(Bounded Blocking Queue)

import threading
import queue
import time
import random

def producer(q: queue.Queue, name: str):
    """生产者:生产数据放入队列"""
    for i in range(10):
        item = f"{name}-item-{i}"
        q.put(item)  # 如果队列满,自动阻塞等待
        print(f"[{name}] Produced: {item}")
        time.sleep(random.uniform(0.01, 0.1))

def consumer(q: queue.Queue, name: str):
    """消费者:从队列取出数据处理"""
    while True:
        try:
            item = q.get(timeout=1)  # 等待最多1秒
            print(f"  [{name}] Consumed: {item}")
            time.sleep(random.uniform(0.05, 0.15))  # 模拟处理时间
            q.task_done()
        except queue.Empty:
            print(f"  [{name}] No more items, exiting.")
            break

# 创建一个容量为 5 的有界队列
bounded_queue = queue.Queue(maxsize=5)

# 启动生产者和消费者
producers = [
    threading.Thread(target=producer, args=(bounded_queue, f"P{i}"))
    for i in range(2)
]
consumers = [
    threading.Thread(target=consumer, args=(bounded_queue, f"C{i}"))
    for i in range(3)
]

for t in producers + consumers:
    t.start()
for t in producers:
    t.join()
bounded_queue.join()  # 等待所有任务完成

阻塞队列的实现原理

Python 的 queue.Queue 内部使用了条件变量(threading.Condition)来实现阻塞:

# queue.Queue 的核心逻辑(简化)
class SimpleBlockingQueue:
    def __init__(self, maxsize):
        self._queue = deque()
        self._maxsize = maxsize
        self._mutex = threading.Lock()
        self._not_full = threading.Condition(self._mutex)
        self._not_empty = threading.Condition(self._mutex)
    
    def put(self, item):
        with self._not_full:
            while len(self._queue) >= self._maxsize:
                self._not_full.wait()  # 满了就等待
            self._queue.append(item)
            self._not_empty.notify()  # 通知等待的消费者
    
    def get(self):
        with self._not_empty:
            while len(self._queue) == 0:
                self._not_empty.wait()  # 空了就等待
            item = self._queue.popleft()
            self._not_full.notify()  # 通知等待的生产者
            return item

为什么需要有界? 无界队列看似方便,但在生产环境中是隐患。如果消费者速度跟不上生产者,无界队列会无限增长,最终耗尽内存导致 OOM(Out Of Memory)。有界队列提供的背压机制迫使生产者在消费者忙不过来时减速——这是系统稳定性的关键。

这个原则在分布式系统中也成立:Kafka 的分区有容量限制,TCP 有接收窗口,HTTP/2 有流控——所有这些本质上都是"有界队列 + 背压"的应用。

3.4 优先队列的多种实现对比

优先队列的抽象很简单:insert(item, priority) 和 extract_min()。但它有多种实现方式,性能差异很大:

实现方式 insert extract_min peek 说明
无序数组 O(1) O(n) O(n) 插入快,取出慢
有序数组 O(n) O(1) O(1) 取出快,插入慢
二叉堆 O(log n) O(log n) O(1) 平衡的折中
斐波那契堆 O(1) 均摊 O(log n) 均摊 O(1) 理论最优,实现复杂

为什么二叉堆是实践中的默认选择?

  1. 时间复杂度平衡:insert 和 extract 都是 O(log n),没有明显短板。
  2. 空间局部性好:二叉堆可以用数组实现(不需要指针),CPU 缓存友好。
  3. 实现简单:只需要 sift_up 和 sift_down 两个操作。
  4. 常数因子小:相比红黑树等平衡树,堆的操作更简单,常数因子更小。

Michael Fredman 和 Robert Tarjan 在 1987 年的论文 "Fibonacci Heaps and Their Uses in Improved Network Optimization Algorithms" 中提出了斐波那契堆,将 decrease_key 操作优化到 O(1) 均摊,这对 Dijkstra 算法有重要意义。但在实践中,由于斐波那契堆的常数因子大、实现复杂,只有在图的边数远大于顶点数时才有优势。

# 三种优先队列实现的对比

class UnsortedPQ:
    """无序数组实现的优先队列"""
    def __init__(self):
        self._data = []
    
    def insert(self, item, priority):
        self._data.append((priority, item))  # O(1)
    
    def extract_min(self):
        if not self._data:
            raise IndexError("empty")
        min_idx = 0
        for i in range(1, len(self._data)):
            if self._data[i][0] < self._data[min_idx][0]:
                min_idx = i
        # 将最小元素与最后一个交换,然后 pop — O(n)
        self._data[min_idx], self._data[-1] = self._data[-1], self._data[min_idx]
        return self._data.pop()[1]


class SortedPQ:
    """有序数组实现的优先队列(按优先级降序排列)"""
    def __init__(self):
        self._data = []  # 按优先级降序排列,最小的在末尾
    
    def insert(self, item, priority):
        # 二分查找插入位置 — O(log n) 查找 + O(n) 移动
        import bisect
        bisect.insort(self._data, (priority, item))
    
    def extract_min(self):
        if not self._data:
            raise IndexError("empty")
        return self._data.pop(0)[1]  # O(n) 移动


import heapq

class HeapPQ:
    """二叉堆实现的优先队列"""
    def __init__(self):
        self._heap = []
        self._counter = 0
    
    def insert(self, item, priority):
        heapq.heappush(self._heap, (priority, self._counter, item))  # O(log n)
        self._counter += 1
    
    def extract_min(self):
        if not self._heap:
            raise IndexError("empty")
        return heapq.heappop(self._heap)[2]  # O(log n)

什么时候用哪种?

我们将在第 12 章详细讲解堆的内部结构、堆排序、以及 heapq 模块的实现原理。


Level 4 · 边界与陷阱

4.1 面试高频题

LeetCode #622:设计循环队列

class MyCircularQueue:
    """
    LeetCode 622: Design Circular Queue
    
    要点:
    1. 用数组 + front/rear 指针
    2. 区分满和空:牺牲一个位置,或用 size 计数器
    3. 取模运算实现循环
    """
    
    def __init__(self, k: int):
        self._data = [0] * (k + 1)
        self._front = 0
        self._rear = 0
        self._cap = k + 1
    
    def enQueue(self, value: int) -> bool:
        if self.isFull():
            return False
        self._data[self._rear] = value
        self._rear = (self._rear + 1) % self._cap
        return True
    
    def deQueue(self) -> bool:
        if self.isEmpty():
            return False
        self._front = (self._front + 1) % self._cap
        return True
    
    def Front(self) -> int:
        if self.isEmpty():
            return -1
        return self._data[self._front]
    
    def Rear(self) -> int:
        if self.isEmpty():
            return -1
        return self._data[(self._rear - 1 + self._cap) % self._cap]
    
    def isEmpty(self) -> bool:
        return self._front == self._rear
    
    def isFull(self) -> bool:
        return (self._rear + 1) % self._cap == self._front

面试陷阱

LeetCode #232:用栈实现队列

class MyQueue:
    """
    LeetCode 232: Implement Queue using Stacks
    
    两个栈:in_stack 和 out_stack。
    push → in_stack
    pop/peek → out_stack(空时从 in_stack 倒入)
    
    均摊 O(1)。面试时务必能解释清楚均摊分析。
    """
    
    def __init__(self):
        self._in = []
        self._out = []
    
    def push(self, x: int) -> None:
        self._in.append(x)
    
    def pop(self) -> int:
        self._transfer()
        return self._out.pop()
    
    def peek(self) -> int:
        self._transfer()
        return self._out[-1]
    
    def empty(self) -> bool:
        return not self._in and not self._out
    
    def _transfer(self):
        """当 out_stack 为空时,将 in_stack 全部倒入"""
        if not self._out:
            while self._in:
                self._out.append(self._in.pop())

面试追问

LeetCode #933:最近的请求次数

from collections import deque

class RecentCounter:
    """
    LeetCode 933: Number of Recent Calls
    
    维护一个队列,存储过去 3000 毫秒内的请求时间戳。
    每次新请求到来时,移除所有过期的时间戳。
    
    这是队列作为"滑动时间窗口"的典型应用。
    """
    
    def __init__(self):
        self._queue = deque()
    
    def ping(self, t: int) -> int:
        self._queue.append(t)
        # 移除所有超过 3000ms 前的请求
        while self._queue[0] < t - 3000:
            self._queue.popleft()
        return len(self._queue)


# 示例
rc = RecentCounter()
print(rc.ping(1))     # 1 — 窗口 [-2999, 1],只有 1
print(rc.ping(100))   # 2 — 窗口 [-2900, 100],有 1, 100
print(rc.ping(3001))  # 3 — 窗口 [1, 3001],有 1, 100, 3001
print(rc.ping(3002))  # 3 — 窗口 [2, 3002],有 100, 3001, 3002

4.2 deque vs list 性能对比实测

让我们做一个全面的性能对比:

import time
from collections import deque

def test_append_right(n):
    """右端追加:list vs deque"""
    # list
    lst = []
    start = time.perf_counter()
    for i in range(n):
        lst.append(i)
    t_list = time.perf_counter() - start
    
    # deque
    dq = deque()
    start = time.perf_counter()
    for i in range(n):
        dq.append(i)
    t_deque = time.perf_counter() - start
    
    return t_list, t_deque

def test_append_left(n):
    """左端追加:list vs deque"""
    # list
    lst = []
    start = time.perf_counter()
    for i in range(n):
        lst.insert(0, i)  # O(n)!
    t_list = time.perf_counter() - start
    
    # deque
    dq = deque()
    start = time.perf_counter()
    for i in range(n):
        dq.appendleft(i)  # O(1)
    t_deque = time.perf_counter() - start
    
    return t_list, t_deque

def test_pop_left(n):
    """左端弹出:list vs deque"""
    # list
    lst = list(range(n))
    start = time.perf_counter()
    for i in range(n):
        lst.pop(0)  # O(n)!
    t_list = time.perf_counter() - start
    
    # deque
    dq = deque(range(n))
    start = time.perf_counter()
    for i in range(n):
        dq.popleft()  # O(1)
    t_deque = time.perf_counter() - start
    
    return t_list, t_deque

def test_random_access(n):
    """随机访问:list vs deque"""
    lst = list(range(n))
    dq = deque(range(n))
    
    import random
    indices = [random.randint(0, n-1) for _ in range(10000)]
    
    start = time.perf_counter()
    for i in indices:
        _ = lst[i]  # O(1)
    t_list = time.perf_counter() - start
    
    start = time.perf_counter()
    for i in indices:
        _ = dq[i]  # O(n)!
    t_deque = time.perf_counter() - start
    
    return t_list, t_deque


n = 100_000
print("=" * 60)
print(f"Performance comparison: list vs deque (n={n:,})")
print("=" * 60)

t_list, t_deque = test_append_right(n)
print(f"Append right:  list={t_list:.4f}s  deque={t_deque:.4f}s  "
      f"winner={'tie' if abs(t_list-t_deque)/max(t_list,t_deque) < 0.2 else 'list' if t_list < t_deque else 'deque'}")

t_list, t_deque = test_append_left(n)
print(f"Append left:   list={t_list:.4f}s  deque={t_deque:.4f}s  "
      f"deque is {t_list/t_deque:.0f}x faster")

t_list, t_deque = test_pop_left(n)
print(f"Pop left:      list={t_list:.4f}s  deque={t_deque:.4f}s  "
      f"deque is {t_list/t_deque:.0f}x faster")

t_list, t_deque = test_random_access(n)
print(f"Random access: list={t_list:.4f}s  deque={t_deque:.4f}s  "
      f"list is {t_deque/t_list:.0f}x faster")

典型结果:

============================================================
Performance comparison: list vs deque (n=100,000)
============================================================
Append right:  list=0.0052s  deque=0.0048s  winner=tie
Append left:   list=2.1500s  deque=0.0048s  deque is 448x faster
Pop left:      list=1.8900s  deque=0.0041s  deque is 461x faster
Random access: list=0.0003s  deque=0.2100s  list is 700x faster

结论总结

操作 list deque 赢家
右端 append/pop O(1) 均摊 O(1) 平手
左端 insert/pop O(n) O(1) deque
随机访问 [i] O(1) O(n) list
内存连续性 连续 块状 list 略优

选择规则

4.3 什么时候用队列,什么时候用栈

这是一个看似简单但实际上很有深度的问题。核心区别在于搜索/处理的顺序

特性 栈(Stack) 队列(Queue)
顺序 LIFO — 后进先出 FIFO — 先进先出
搜索策略 DFS — 深度优先 BFS — 广度优先
搜索行为 先走到底,再回溯 一层一层向外扩展
找最短路径? 不保证 保证(无权图)
空间复杂度 O(最大深度) O(最大宽度)
典型应用 递归、回溯、括号匹配 调度、层序遍历、最短路径

DFS vs BFS 的选择指南

  1. 求最短路径 → BFS。DFS 可能找到一条很长的路径,而 BFS 保证找到最短的。
  2. 检测可达性(只需要知道能不能到) → DFS 和 BFS 都行。DFS 通常空间更省(递归栈)。
  3. 搜索空间很深但答案较浅 → BFS 更快找到。
  4. 搜索空间很宽但答案很深 → DFS 更节省空间。
  5. 需要遍历所有可能 → 两者都行,取决于具体问题的结构。
from collections import deque

def solve_maze_bfs(maze, start, end):
    """BFS 走迷宫 — 保证找到最短路径"""
    rows, cols = len(maze), len(maze[0])
    visited = {start}
    queue = deque([(start, [start])])
    
    while queue:
        (r, c), path = queue.popleft()
        if (r, c) == end:
            return path  # 第一次到达就是最短路径!
        
        for dr, dc in [(0,1), (0,-1), (1,0), (-1,0)]:
            nr, nc = r + dr, c + dc
            if (0 <= nr < rows and 0 <= nc < cols 
                and maze[nr][nc] == 0 and (nr, nc) not in visited):
                visited.add((nr, nc))
                queue.append(((nr, nc), path + [(nr, nc)]))
    
    return None  # 无解

def solve_maze_dfs(maze, start, end):
    """DFS 走迷宫 — 能找到路径,但不保证最短"""
    rows, cols = len(maze), len(maze[0])
    visited = set()
    
    def dfs(r, c, path):
        if (r, c) == end:
            return path
        visited.add((r, c))
        
        for dr, dc in [(0,1), (0,-1), (1,0), (-1,0)]:
            nr, nc = r + dr, c + dc
            if (0 <= nr < rows and 0 <= nc < cols 
                and maze[nr][nc] == 0 and (nr, nc) not in visited):
                result = dfs(nr, nc, path + [(nr, nc)])
                if result:
                    return result
        return None
    
    return dfs(start[0], start[1], [start])


# 0=通路,1=墙壁
maze = [
    [0, 0, 1, 0, 0],
    [0, 1, 0, 1, 0],
    [0, 0, 0, 0, 0],
    [1, 1, 0, 1, 0],
    [0, 0, 0, 1, 0]
]

bfs_path = solve_maze_bfs(maze, (0, 0), (4, 4))
dfs_path = solve_maze_dfs(maze, (0, 0), (4, 4))

print(f"BFS path length: {len(bfs_path)}")  # 最短
print(f"DFS path length: {len(dfs_path)}")  # 可能更长

实战中的另一个考量:递归深度限制

Python 默认递归深度限制是 1000。如果你的 DFS 搜索空间超过 1000 层,递归版 DFS 会报 RecursionError。解决方案:

  1. sys.setrecursionlimit(更大的数) — 简单但不优雅,可能栈溢出。
  2. 用显式栈代替递归 — 把 DFS 改成迭代版本,使用自己管理的栈。
  3. 改用 BFS — 如果问题允许的话。
def dfs_iterative(graph, start):
    """迭代版 DFS — 用显式栈代替递归"""
    visited = set()
    stack = [start]
    
    while stack:
        node = stack.pop()  # 栈:后进先出
        if node in visited:
            continue
        visited.add(node)
        print(node, end=" ")
        
        for neighbor in graph[node]:
            if neighbor not in visited:
                stack.append(neighbor)
    
    return visited

注意:迭代 DFS 和递归 DFS 的遍历顺序可能不完全一致(取决于邻居入栈的顺序),但都是深度优先。

4.4 边界情况与陷阱总结

陷阱 1:deque 的 maxlen 参数

from collections import deque

# maxlen 会自动丢弃另一端的元素!
dq = deque(maxlen=3)
dq.append(1)
dq.append(2)
dq.append(3)
dq.append(4)  # 1 被自动丢弃!
print(dq)     # deque([2, 3, 4], maxlen=3)

dq.appendleft(0)  # 4 被自动丢弃!
print(dq)     # deque([0, 2, 3], maxlen=3)

这个特性可以用来实现"固定大小的滑动窗口",但如果你不期望这个行为,可能会导致难以发现的 bug。

陷阱 2:多线程中的 deque

collections.dequeappend()popleft() 在 CPython 中由于 GIL 的存在是线程安全的(单个操作是原子的)。但这不代表你的代码是线程安全的

from collections import deque

dq = deque()

# ❌ 这段代码不是线程安全的!
if len(dq) > 0:     # 检查和操作之间可能被其他线程插入
    item = dq.popleft()  # 此时 dq 可能已经被其他线程清空

# ✅ 正确做法:用 try/except
try:
    item = dq.popleft()
except IndexError:
    item = None

# ✅ 或者使用 queue.Queue(自带锁)
import queue
q = queue.Queue()

陷阱 3:BFS 中忘记标记已访问

# ❌ 错误:在出队时标记
while queue:
    node = queue.popleft()
    visited.add(node)  # 太晚了!同一个节点可能已经被多次入队
    for neighbor in graph[node]:
        if neighbor not in visited:
            queue.append(neighbor)

# ✅ 正确:在入队时标记
while queue:
    node = queue.popleft()
    for neighbor in graph[node]:
        if neighbor not in visited:
            visited.add(neighbor)  # 入队时就标记
            queue.append(neighbor)

在出队时标记会导致同一个节点被多次入队(多个邻居都指向它),浪费空间和时间。在入队时标记保证每个节点最多入队一次。

陷阱 4:循环队列的 size 计算

# 当 rear >= front 时:size = rear - front
# 当 rear < front 时:size = rear - front + capacity
# 合并为一个公式:
size = (rear - front + capacity) % capacity

这个公式在 rear == front 时给出 0(空),在 (rear + 1) % capacity == front 时给出 capacity - 1(满,因为我们牺牲了一个位置)。


总结

队列是"公平性"的数学化身。当你需要保证"先来先服务"的语义时,队列就是你的选择。

本章的核心要点:

  1. 用 deque,不用 list — 这是 Python 中最重要的队列实践。
  2. 循环队列用取模 — 理解 (index + 1) % capacity 就理解了循环的本质。
  3. BFS 用队列 — 队列保证了"层层扩展"的顺序,因此天然找到最短路径。
  4. 优先队列用堆 — 第 12 章详细展开。
  5. 阻塞队列解耦生产者和消费者 — 这是并发编程和分布式系统的基石。

队列看似简单,但它的应用从操作系统调度到分布式消息系统,贯穿了计算机科学的每一层抽象。理解队列,就是理解"有序处理"这个基本计算模式。

下一步学习路径:

本章评分
4.6  / 5  (57 评分)

💬 留言讨论