第 38 章

设计类数据结构

第三十八章:设计类数据结构

面试中有一类题目,不考你"用哪个算法",而是考你"能不能从零开始设计一个数据结构"。题目会给出一组 API 接口和性能约束——通常是 O(1) 时间复杂度——让你选择底层存储方案、组合多种基础结构,在满足约束的前提下实现所有接口。

这类题考察的核心能力不是背模板,而是将需求拆解为数据结构特性的能力。"需要 O(1) 查找"暗示哈希表;"需要维护顺序"暗示链表或数组;"需要 O(1) 删除最旧的元素"暗示队列接口。当一个数据结构满足不了所有约束时,你就需要把两个、三个结构组合在一起,用指针或索引打通它们之间的关联。

本章从最经典的 LRU Cache 讲起,然后扩展到 LFU Cache、最小栈、随机化集合和 Twitter Timeline 设计,最后讨论缓存淘汰策略在工业界的真实应用以及面试中设计题的通用解题框架。


Level 1 · 你需要知道的

38.1 LRU Cache 完整实现(LeetCode #146)

问题定义

设计一个满足 LRU(Least Recently Used,最近最少使用)缓存约束的数据结构。实现 LRUCache 类:

要求: getput 操作均需 O(1) 时间复杂度。

为什么需要哈希表 + 双向链表?

先分析每个操作的核心需求:

操作 需要的能力 单一数据结构能否满足
get(key) O(1) 按 key 查找 哈希表可以
put(key, value) 更新后移到最前 O(1) 将节点移到链表头部 双向链表可以(已知节点指针)
缓存满时淘汰 O(1) 删除最旧节点 双向链表尾部删除 O(1)
淘汰时还需移除哈希表映射 节点自身要存 key 节点中保存 key

哈希表提供 O(1) 按 key 查找到链表节点的指针;双向链表提供 O(1) 的移动、删除操作(因为双向链表节点知道自己的前驱和后继)。两者组合才能满足所有 O(1) 约束。

如果只用哈希表:你无法以 O(1) 确定"最久未使用的是谁"——你需要额外维护时间顺序。 如果只用链表:你无法以 O(1) 按 key 查找节点——必须遍历。

这是 Sleator 和 Tarjan (1985, "Amortized Efficiency of List Update and Paging Rules") 分析的经典在线竞争策略之一,LRU 被证明是 k-competitive 的页面置换算法。

完整实现

class DLinkedNode:
    """双向链表节点"""
    __slots__ = ('key', 'value', 'prev', 'next')
    
    def __init__(self, key=0, value=0):
        self.key = key
        self.value = value
        self.prev = None
        self.next = None


class LRUCache:
    """
    哈希表 + 双向链表实现 LRU Cache
    
    设计思路:
    - 哈希表 cache: key -> DLinkedNode,提供 O(1) 查找
    - 双向链表:维护访问顺序,头部是最近使用的,尾部是最久未使用的
    - 哨兵节点 head/tail:简化边界条件处理,永远不会被删除
    """
    
    def __init__(self, capacity: int):
        self.cache = {}           # key -> DLinkedNode
        self.capacity = capacity
        self.size = 0
        
        # 使用哨兵节点(sentinel / dummy nodes)
        # 好处:不需要特判链表为空或只有一个节点的情况
        self.head = DLinkedNode()  # 伪头部
        self.tail = DLinkedNode()  # 伪尾部
        self.head.next = self.tail
        self.tail.prev = self.head
    
    def get(self, key: int) -> int:
        if key not in self.cache:
            return -1
        # 命中缓存:将节点移到头部(标记为最近使用)
        node = self.cache[key]
        self._move_to_head(node)
        return node.value
    
    def put(self, key: int, value: int) -> None:
        if key in self.cache:
            # key 已存在:更新值并移到头部
            node = self.cache[key]
            node.value = value
            self._move_to_head(node)
        else:
            # key 不存在:创建新节点
            node = DLinkedNode(key, value)
            self.cache[key] = node
            self._add_to_head(node)
            self.size += 1
            
            if self.size > self.capacity:
                # 超出容量:移除尾部节点(最久未使用)
                removed = self._remove_tail()
                del self.cache[removed.key]  # 节点中存了 key,所以能 O(1) 删除哈希表条目
                self.size -= 1
    
    def _add_to_head(self, node: DLinkedNode) -> None:
        """将节点添加到头部(哨兵之后)"""
        node.prev = self.head
        node.next = self.head.next
        self.head.next.prev = node
        self.head.next = node
    
    def _remove_node(self, node: DLinkedNode) -> None:
        """从链表中摘除一个节点(O(1),因为知道 prev 和 next)"""
        node.prev.next = node.next
        node.next.prev = node.prev
    
    def _move_to_head(self, node: DLinkedNode) -> None:
        """将节点移到头部 = 先摘除再插入头部"""
        self._remove_node(node)
        self._add_to_head(node)
    
    def _remove_tail(self) -> DLinkedNode:
        """移除尾部节点(哨兵之前的那个)并返回它"""
        node = self.tail.prev
        self._remove_node(node)
        return node

为什么节点中要存 key?

注意 DLinkedNode 同时保存了 keyvalue。value 是必须的(get 要返回),但为什么要存 key?因为当缓存满了执行淘汰时,你从链表尾部拿到一个节点,需要同时从哈希表中删除对应条目——如果节点不存 key,你就无法以 O(1) 在哈希表中定位这个条目。

为什么用哨兵节点?

没有哨兵节点的链表实现需要大量 if-else 判断:

# 没有哨兵时的 add_to_head(反面示例)
def _add_to_head(self, node):
    if self.head is None:       # 链表为空
        self.head = node
        self.tail = node
    else:
        node.next = self.head
        self.head.prev = node
        self.head = node

哨兵节点让链表永远不为空(至少有 head 和 tail 两个哨兵),所有操作逻辑统一,bug 更少。这是 Cormen 等人在《算法导论》(CLRS, 3rd Ed, 2009) 10.2 节推荐的实现技巧。

常见错误与修复

错误 后果 修复
put 已存在的 key 时没有 _move_to_head 访问顺序不正确,淘汰错误的节点 更新值后调用 _move_to_head
淘汰时忘记 del self.cache[removed.key] 哈希表中存在悬空引用,后续 get 返回已淘汰的值 淘汰链表节点的同时删除哈希表条目
使用单向链表 删除节点需要 O(n) 找前驱 必须用双向链表
_remove_node 后没有断开 node 的 prev/next 虽然逻辑正确但可能导致 GC 问题 可选:将 node.prev/node.next 置为 None

复杂度分析

38.2 LFU Cache 完整实现(LeetCode #460)

问题定义

设计一个满足 LFU(Least Frequently Used,最不经常使用)缓存约束的数据结构:

为什么 LFU 比 LRU 难?

LRU 只需维护一个"时间"维度。LFU 需要维护两个维度:频率(frequency)和同频率内的时间顺序。淘汰规则变成了"先按频率选最低,频率相同再按 LRU 选最旧"。

数据结构选择

我们需要 O(1) 完成以下操作:

  1. 按 key 查找节点 → 哈希表 key_to_node
  2. 获取当前最低频率 → 变量 min_freq
  3. 从最低频率桶中淘汰 LRU 节点 → 每个频率对应一个双向链表(LRU 链表)
  4. 将节点从频率 f 的链表移到频率 f+1 的链表 → 哈希表 freq_to_list

核心设计:两个哈希表 + 多个双向链表 + 一个 min_freq 变量

这个设计由 Shah, Mitra 和 Matani (2010, "An O(1) algorithm for implementing the LFU cache eviction scheme") 提出,证明了所有操作可以在 O(1) 内完成。

from collections import defaultdict, OrderedDict


class LFUCache:
    """
    LFU Cache:O(1) get/put
    
    核心数据结构:
    - key_to_val: dict, key -> value
    - key_to_freq: dict, key -> 当前频率
    - freq_to_keys: dict, freq -> OrderedDict (当作有序集合,维护 LRU 顺序)
    - min_freq: int, 当前最小频率
    
    为什么用 OrderedDict?
    Python 的 OrderedDict 底层是哈希表+双向链表,提供:
    - O(1) 插入末尾
    - O(1) 删除指定 key(因为内部是哈希表)
    - O(1) 弹出最老元素 (popitem(last=False))
    - O(1) move_to_end
    
    它恰好就是我们需要的"一个频率桶内的 LRU 链表"。
    """
    
    def __init__(self, capacity: int):
        self.capacity = capacity
        self.key_to_val = {}
        self.key_to_freq = {}
        self.freq_to_keys = defaultdict(OrderedDict)
        self.min_freq = 0
    
    def get(self, key: int) -> int:
        if key not in self.key_to_val:
            return -1
        # 增加频率
        self._increase_freq(key)
        return self.key_to_val[key]
    
    def put(self, key: int, value: int) -> None:
        if self.capacity <= 0:
            return
        
        if key in self.key_to_val:
            # 已存在:更新值,增加频率
            self.key_to_val[key] = value
            self._increase_freq(key)
        else:
            # 不存在:先检查容量
            if len(self.key_to_val) >= self.capacity:
                self._evict()
            
            # 插入新 key
            self.key_to_val[key] = value
            self.key_to_freq[key] = 1
            self.freq_to_keys[1][key] = None  # OrderedDict 当 OrderedSet 用
            self.min_freq = 1  # 新插入的 key 频率为 1,所以 min_freq 一定是 1
    
    def _increase_freq(self, key: int) -> None:
        """将 key 的频率 +1,维护 min_freq"""
        freq = self.key_to_freq[key]
        self.key_to_freq[key] = freq + 1
        
        # 从旧频率桶中移除
        del self.freq_to_keys[freq][key]
        
        # 如果旧频率桶空了且它是 min_freq,更新 min_freq
        if not self.freq_to_keys[freq]:
            del self.freq_to_keys[freq]
            if self.min_freq == freq:
                self.min_freq = freq + 1
        
        # 加入新频率桶(末尾 = 最近使用)
        self.freq_to_keys[freq + 1][key] = None
    
    def _evict(self) -> None:
        """淘汰 min_freq 桶中最久未使用的 key"""
        # popitem(last=False) 弹出最老的(插入最早的)
        evict_key, _ = self.freq_to_keys[self.min_freq].popitem(last=False)
        
        if not self.freq_to_keys[self.min_freq]:
            del self.freq_to_keys[self.min_freq]
        
        del self.key_to_val[evict_key]
        del self.key_to_freq[evict_key]

关键洞察:min_freq 的维护

min_freq 在什么时候变化?

  1. 插入新 key 时:新 key 频率为 1,min_freq = 1(这是唯一确定的赋值)
  2. 频率增加时:如果 min_freq 对应的桶变空了,min_freq += 1

不需要遍历所有频率桶来找最小值,因为频率只可能逐步递增,不会跳跃。

LRU vs LFU 对比

维度 LRU LFU
淘汰依据 最久未访问 访问频率最低
热点突发 处理好(新访问立刻到头部) 新热点可能因历史频率低被误淘汰
缓存污染 一次全表扫描可能冲掉所有热点 相对抗污染(高频 key 不会因一次扫描被淘汰)
实现复杂度 中等 较高
额外空间 O(n) O(n) 但常数更大

38.3 最小栈(LeetCode #155)

问题定义

设计一个支持 O(1) pushpoptopgetMin 的栈。

核心思路

普通栈天然支持 O(1) push/pop/top。难点在于 getMin——如何在栈顶元素被 pop 之后,仍能 O(1) 知道当前的最小值?

关键洞察:最小值是和栈的状态绑定的。当栈有 n 个元素时对应一个最小值;当栈变成 n-1 个元素时可能对应另一个最小值。所以我们对栈的每个"历史状态"都记住当时的最小值。

两种实现方式:

  1. 辅助栈法:用一个额外的栈 min_stack,栈顶始终是当前状态的最小值
  2. 元组法:主栈的每个元素存 (value, current_min) 二元组
class MinStack:
    """
    辅助栈实现
    
    为什么辅助栈有效?
    因为栈是 LIFO 的。如果当前最小值 m 在栈中第 k 层,
    那么只有第 k 层被 pop 时 m 才会消失。
    而第 k 层被 pop 之前,第 k+1...n 层必然已经全部被 pop。
    所以最小值的变化顺序也是"栈式"的——用栈来记录正好。
    """
    
    def __init__(self):
        self.stack = []
        self.min_stack = []  # min_stack[i] = 前 i+1 个元素中的最小值
    
    def push(self, val: int) -> None:
        self.stack.append(val)
        # 如果 min_stack 为空或新值 <= 当前最小值,压入 min_stack
        # 注意是 <=,不是 <。因为相同最小值可能出现多次
        if not self.min_stack or val <= self.min_stack[-1]:
            self.min_stack.append(val)
    
    def pop(self) -> None:
        val = self.stack.pop()
        # 如果弹出的值等于当前最小值,min_stack 也要弹出
        if val == self.min_stack[-1]:
            self.min_stack.pop()
    
    def top(self) -> int:
        return self.stack[-1]
    
    def getMin(self) -> int:
        return self.min_stack[-1]

常见错误:<= 写成 <

如果 push 时用 val < self.min_stack[-1](严格小于),那么当相同最小值出现两次时,min_stack 只记录一次。第一次 pop 掉一个最小值后,min_stack 也弹了,但栈里其实还有一个相同的最小值未被记录。

# 错误示例
stack.push(0)    # min_stack = [0]
stack.push(0)    # min_stack = [0]  ← 如果用 <,第二个 0 不会入 min_stack
stack.pop()      # 弹出 0,min_stack.pop() → min_stack = []
stack.getMin()   # 错误!min_stack 为空,但栈里还有一个 0

38.4 随机化集合 O(1) 插入/删除/随机获取(LeetCode #380)

问题定义

设计一个数据结构,支持以下操作均为平均 O(1):

为什么哈希表不够?

哈希表可以 O(1) 插入和删除,但无法 O(1) 随机获取——你无法以 O(1) 在哈希表中"随机选一个位置"(哈希表的内部数组有大量空槽)。

数组可以 O(1) 随机获取(随机下标即可),但无法 O(1) 删除中间元素(需要移动后续元素)。

解决方案:数组 + 哈希表

import random


class RandomizedSet:
    """
    数组 + 哈希表
    
    核心技巧:删除时将待删元素与数组末尾元素交换,然后 pop 末尾。
    这样不需要移动其他元素,保持 O(1)。
    """
    
    def __init__(self):
        self.nums = []           # 存储元素值
        self.val_to_idx = {}     # 元素值 -> 在 nums 中的下标
    
    def insert(self, val: int) -> bool:
        if val in self.val_to_idx:
            return False
        self.val_to_idx[val] = len(self.nums)
        self.nums.append(val)
        return True
    
    def remove(self, val: int) -> bool:
        if val not in self.val_to_idx:
            return False
        
        # 将 val 与末尾元素交换
        idx = self.val_to_idx[val]
        last = self.nums[-1]
        
        self.nums[idx] = last
        self.val_to_idx[last] = idx
        
        # 删除末尾
        self.nums.pop()
        del self.val_to_idx[val]
        
        return True
    
    def getRandom(self) -> int:
        return random.choice(self.nums)

为什么"交换到末尾"有效?

集合是无序的,所以元素在数组中的位置不重要。交换不改变集合的逻辑内容,但让物理删除变成 O(1) 的 pop 操作。

38.5 设计 Twitter(LeetCode #355)

问题定义

设计一个简化版 Twitter:

这是一个经典的"合并 K 个有序链表"场景

每个用户的推文按时间有序。获取 news feed 时,需要将用户自己及所有关注者的推文链表合并,取 top 10。这正是"合并 K 个有序链表取前 N 个"问题——用最小堆(实际用最大堆按时间戳)

import heapq
from collections import defaultdict


class Twitter:
    """
    设计思路:
    - 全局时间戳 self.time,每次发推递增
    - self.tweets[userId]: list of (timestamp, tweetId),按时间倒序
    - self.following[userId]: set of followeeId
    
    getNewsFeed 使用堆合并 K 个有序列表(每人最多取 10 条)
    """
    
    def __init__(self):
        self.time = 0
        self.tweets = defaultdict(list)       # userId -> [(time, tweetId), ...]
        self.following = defaultdict(set)     # userId -> set of followeeIds
    
    def postTweet(self, userId: int, tweetId: int) -> None:
        self.tweets[userId].append((self.time, tweetId))
        self.time += 1
    
    def getNewsFeed(self, userId: int) -> list:
        # 收集所有相关用户(自己 + 关注的人)
        users = self.following[userId] | {userId}
        
        # 最大堆合并(Python 用负数模拟最大堆)
        # 对每个用户,把最新的推文入堆
        heap = []
        for uid in users:
            tweet_list = self.tweets[uid]
            if tweet_list:
                idx = len(tweet_list) - 1
                t, tid = tweet_list[idx]
                # (负时间戳, tweetId, userId, 当前索引)
                heapq.heappush(heap, (-t, tid, uid, idx))
        
        result = []
        while heap and len(result) < 10:
            neg_t, tid, uid, idx = heapq.heappop(heap)
            result.append(tid)
            # 将该用户的下一条推文入堆
            if idx > 0:
                idx -= 1
                t, tid = self.tweets[uid][idx]
                heapq.heappush(heap, (-t, tid, uid, idx))
        
        return result
    
    def follow(self, followerId: int, followeeId: int) -> None:
        if followerId != followeeId:
            self.following[followerId].add(followeeId)
    
    def unfollow(self, followerId: int, followeeId: int) -> None:
        self.following[followerId].discard(followeeId)

时间复杂度分析


Level 2 · 它是怎么运行的

38.6 LRU Cache 的内部执行过程

让我们用一个具体例子追踪 LRU Cache 的完整执行过程:

capacity = 3
操作序列: put(1,A), put(2,B), put(3,C), get(2), put(4,D), get(1), get(3)

初始状态:

链表: head <-> tail
哈希表: {}
size: 0

put(1, A):

链表: head <-> [1:A] <-> tail
哈希表: {1: node_1}
size: 1

put(2, B):

链表: head <-> [2:B] <-> [1:A] <-> tail
哈希表: {1: node_1, 2: node_2}
size: 2

新节点总是插入头部。

put(3, C):

链表: head <-> [3:C] <-> [2:B] <-> [1:A] <-> tail
哈希表: {1: node_1, 2: node_2, 3: node_3}
size: 3 (已满)

get(2) → 返回 B:

链表: head <-> [2:B] <-> [3:C] <-> [1:A] <-> tail
                ↑ 移到头部
哈希表不变

节点 2 被访问,从中间摘除移到头部。

put(4, D):容量已满,需要淘汰

淘汰 tail.prev = node_1(最久未使用)
删除 cache[1]

链表: head <-> [4:D] <-> [2:B] <-> [3:C] <-> tail
哈希表: {2: node_2, 3: node_3, 4: node_4}
size: 3

get(1) → 返回 -1: key 1 已被淘汰。

get(3) → 返回 C:

链表: head <-> [3:C] <-> [4:D] <-> [2:B] <-> tail

38.7 OrderedDict 实现 LRU 的原理

Python 的 collections.OrderedDict 内部就是哈希表 + 双向链表。我们可以用它一行实现 LRU 的核心逻辑:

from collections import OrderedDict


class LRUCacheSimple(OrderedDict):
    """用 OrderedDict 实现 LRU Cache(面试中展示你理解底层后的简洁写法)"""
    
    def __init__(self, capacity: int):
        super().__init__()
        self.capacity = capacity
    
    def get(self, key: int) -> int:
        if key not in self:
            return -1
        self.move_to_end(key)  # 标记为最近使用
        return self[key]
    
    def put(self, key: int, value: int) -> None:
        if key in self:
            self.move_to_end(key)
        self[key] = value
        if len(self) > self.capacity:
            self.popitem(last=False)  # 弹出最旧的(头部)

OrderedDict.move_to_end(key)popitem(last=False) 都是 O(1)。

但面试中通常要求你手写双向链表,因为考察的就是你对底层数据结构的理解。使用 OrderedDict 是"知道有这个工具",手写是"知道它是怎么工作的"。

38.8 LFU 的频率桶结构详解

让我们可视化 LFU 的内部状态:

capacity = 3
操作: put(1,A), put(2,B), put(3,C), get(1), get(1), get(2), put(4,D)

put(1,A), put(2,B), put(3,C) 之后:

key_to_val:  {1:A, 2:B, 3:C}
key_to_freq: {1:1, 2:1, 3:1}
freq_to_keys: {1: OrderedDict([1, 2, 3])}   ← 按插入顺序
min_freq: 1

get(1), get(1) 之后:

key_to_val:  {1:A, 2:B, 3:C}
key_to_freq: {1:3, 2:1, 3:1}
freq_to_keys: {1: OrderedDict([2, 3]),  3: OrderedDict([1])}
min_freq: 1

key 1 的频率变成 3(初始 1 + 两次 get)。

get(2) 之后:

key_to_freq: {1:3, 2:2, 3:1}
freq_to_keys: {1: OrderedDict([3]),  2: OrderedDict([2]),  3: OrderedDict([1])}
min_freq: 1

put(4, D):容量已满,需要淘汰 min_freq=1 桶中最旧的 → key 3

淘汰 key 3
key_to_val:  {1:A, 2:B, 4:D}
key_to_freq: {1:3, 2:2, 4:1}
freq_to_keys: {1: OrderedDict([4]),  2: OrderedDict([2]),  3: OrderedDict([1])}
min_freq: 1    ← 新插入 key 4 频率为 1

38.9 随机化集合的"交换删除"技巧深入

"交换到末尾再删除"是一个在多种场景中反复出现的技巧。让我们理解它为什么正确:

不变式(Invariant): 数组 nums[0..n-1] 存储所有 n 个元素,val_to_idx[v] 是 v 在 nums 中的下标。

删除操作的原子步骤:

1. idx = val_to_idx[val]       # 找到待删元素的位置
2. last = nums[-1]             # 记住末尾元素
3. nums[idx] = last            # 用末尾元素覆盖待删位置
4. val_to_idx[last] = idx      # 更新末尾元素的下标映射
5. nums.pop()                  # 删除末尾(O(1))
6. del val_to_idx[val]         # 删除哈希表映射

边界情况: 如果 val 就是末尾元素(idx == len(nums) - 1),步骤 3-4 是自我赋值(无害),步骤 5-6 正确删除。

为什么 getRandom 等概率?

random.choice(nums) 在连续数组上等概率选一个下标,每个元素被选中概率恰好是 1/n。如果用哈希表,由于开放寻址或链表法的不均匀分布,无法保证等概率。

38.10 设计题的性能分析框架

对于设计类题目,面试官通常还会追问性能:

数据结构 get put/insert delete 随机访问 空间
LRU Cache O(1) O(1) O(1) 淘汰 N/A O(n)
LFU Cache O(1) O(1) O(1) 淘汰 N/A O(n) 但常数大
MinStack O(1) top/min O(1) push O(1) pop N/A O(n) 最坏 2n
RandomizedSet N/A O(1) avg O(1) avg O(1) O(n)

Level 3 · 规范怎么定义的

38.11 LRU 在操作系统中的应用

页面置换算法

操作系统的虚拟内存管理需要决定"物理内存满了,应该换出哪一页"。Belady (1966, "A Study of Replacement Algorithms for a Virtual-Storage Computer") 证明了最优算法 OPT(淘汰未来最久不使用的页面)是不可实现的(需要预知未来),LRU 是最接近 OPT 的可实现近似。

Sleator 和 Tarjan (1985, "Amortized Efficiency of List Update and Paging Rules") 进一步证明了 LRU 的竞争比:对于 k 个页面框架的缓存,LRU 是 k-competitive 的,即最坏情况下缺页次数不超过 OPT 的 k 倍。

Linux 内核的近似 LRU

真正的 LRU 需要在每次内存访问时更新链表,开销过大。Linux 内核使用近似 LRU(Clock/Second-Chance 算法)

这就是为什么教科书中说"操作系统使用 LRU"时需要加限定——没有系统真正实现精确 LRU,都是近似版本。

38.12 Redis 中的 LRU 实现

Redis 的 maxmemory-policy 支持多种淘汰策略:

Redis 的近似 LRU 实现

Redis 并没有为每个 key 维护一个真正的双向链表。在百万甚至亿级 key 的场景下,维护全局链表的内存开销和锁竞争不可接受。

Redis 的做法(由 Salvatore Sanfilippo 在 Redis 3.0 中改进):

  1. 每个 key 的对象头中存储一个 24 位时间戳(lru 字段),记录最后一次访问时间(精度为秒)
  2. 淘汰时随机采样 N 个 key(默认 N=5,可通过 maxmemory-samples 配置)
  3. 从 N 个样本中淘汰 lru 值最旧的那个
  4. Redis 3.0 引入淘汰候选池:维护一个大小为 16 的池,每次采样将"更旧"的 key 加入池中,淘汰池中最旧的

为什么近似够用?Salvatore 的测试表明,当 maxmemory-samples=10 时,近似 LRU 的命中率与精确 LRU 几乎无差异,但内存开销从每个 key 两个指针(16字节 × key数)降低到每个 key 3 字节(24位时间戳)。

Redis 的 LFU 实现

Redis 4.0 引入的 LFU 使用了 Morris 计数器(Morris, 1978, "Counting Large Numbers of Events in Small Registers")进行概率递增——用 8 位表示频率的对数近似值,避免高频 key 的计数器溢出。同时还引入时间衰减,使得长期不访问的 key 频率会下降。

38.13 MySQL Buffer Pool 的 LRU 变体

MySQL InnoDB 的 Buffer Pool 使用了改良的 LRU 算法,解决两个实际问题:

问题 1:预读(Read-Ahead)污染

InnoDB 会预读相邻页面到 Buffer Pool。如果预读的页面直接放到 LRU 链表头部,会把真正的热数据挤出去。

解决方案:midpoint insertion

链表分为"young"区(热端,默认占 5/8)和"old"区(冷端,占 3/8)。新读入的页面先放到 old 区头部。只有在 old 区中存活超过 innodb_old_blocks_time(默认 1 秒)后再次被访问,才会提升到 young 区。

问题 2:全表扫描(Full Table Scan)污染

一次 SELECT * FROM large_table 会依次读取所有页面。如果用标准 LRU,这些页面会把所有热数据挤出 Buffer Pool。

midpoint insertion 策略确保全表扫描的页面只在 old 区循环,不会进入 young 区影响热数据——因为全表扫描中每页通常只访问一次,无法满足"存活超过 1 秒后再次被访问"的提升条件。

38.14 缓存淘汰策略对比

策略 全称 淘汰依据 优点 缺点 典型应用
FIFO First In First Out 最早进入缓存的 实现最简单 不考虑访问模式 日志缓冲区
LRU Least Recently Used 最久未访问的 利用时间局部性 一次扫描可能污染 Redis, OS page cache
LFU Least Frequently Used 访问频率最低的 抗扫描污染 新数据难以获得高频率;需要计数器空间 CDN, 数据库
ARC Adaptive Replacement Cache 自适应(LRU + LFU 的混合) 自动调整策略 实现复杂;IBM 有专利 ZFS, PostgreSQL
CLOCK Clock / Second-Chance 近似 LRU 低开销实现 不如 LRU 精确 Linux 内核
2Q Two Queue 两队列(FIFO in + LRU out) 比 LRU 更抗扫描 需要调参 VoltDB
LIRS Low Inter-reference Recency Set 基于重用距离 优于 LRU 实现复杂 MySQL 5.6 曾考虑

ARC 算法详解(Megiddo & Modha, 2003, IBM Research)

ARC (Adaptive Replacement Cache) 维护四个列表:

核心思想:根据 B1 和 B2 的命中情况动态调整 T1 和 T2 的大小。如果被 B1 命中多(说明一次访问的数据后来又被访问),就增大 T1;如果被 B2 命中多,就增大 T2。

ARC 的优雅之处在于它不需要任何手动调参——算法自己根据工作负载特征自适应调整。

38.15 缓存命中率的理论分析

栈距离模型(Stack Distance Model)

Mattson 等人 (1970, "Evaluation Techniques for Storage Hierarchies") 提出了栈距离模型:对于 LRU 缓存,一次访问的栈距离 d 定义为"自上次访问该数据以来,有多少不同的数据被访问过"。如果缓存容量为 C,则该次访问命中当且仅当 d ≤ C。

这个模型的重要推论:LRU 满足包含性(inclusion property)——容量为 C 的 LRU 缓存中的所有内容,一定也在容量为 C+1 的 LRU 缓存中。这意味着增大缓存一定不会降低命中率。

但 LFU 和 ARC 不满足包含性——增大缓存在某些病态工作负载下可能降低命中率(虽然实践中极少发生)。


Level 4 · 边界与陷阱

38.16 面试中设计题的解题框架

面试中遇到"设计一个数据结构"类型的题目,使用以下五步框架:

第一步:明确接口和约束

第二步:逐个分析操作需要的"原子能力"

把每个操作拆解为基本能力:

第三步:组合数据结构

如果单一结构不能满足所有约束,用多结构组合 + 相互引用

第四步:处理边界情况

第五步:讨论扩展

面试官通常会追问,提前准备好:

38.17 并发安全的 LRU Cache

问题: 多线程环境下如何保证 LRU Cache 的正确性?

方案 1:粗粒度锁

import threading


class ConcurrentLRUCache:
    def __init__(self, capacity):
        self.cache = LRUCache(capacity)
        self.lock = threading.Lock()
    
    def get(self, key):
        with self.lock:
            return self.cache.get(key)
    
    def put(self, key, value):
        with self.lock:
            self.cache.put(key, value)

优点:简单正确。缺点:吞吐量受限,所有操作串行化。

方案 2:分段锁(Segmented/Striped Locking)

将 key 空间分成 N 个段(通常 N=16 或 N=CPU核数),每段有独立的锁和 LRU 链表。hash(key) % N 决定 key 属于哪个段。

class SegmentedLRUCache:
    def __init__(self, capacity, num_segments=16):
        self.num_segments = num_segments
        seg_capacity = max(1, capacity // num_segments)
        self.segments = [LRUCache(seg_capacity) for _ in range(num_segments)]
        self.locks = [threading.Lock() for _ in range(num_segments)]
    
    def _get_segment(self, key):
        return hash(key) % self.num_segments
    
    def get(self, key):
        seg = self._get_segment(key)
        with self.locks[seg]:
            return self.segments[seg].get(key)
    
    def put(self, key, value):
        seg = self._get_segment(key)
        with self.locks[seg]:
            self.segments[seg].put(key, value)

Java 的 ConcurrentHashMap 在 JDK 7 中就是用这种分段锁思想。Caffeine(Ben Manes 开发的高性能 Java 缓存库)使用了更精细的无锁 ring buffer + 写后台线程的方案。

方案 3:读写锁

如果读多写少,get 操作只需要读锁。但 LRU 的 get 会修改链表顺序(move_to_head),所以严格来说需要写锁。

变通:将访问记录放入一个 lock-free 的环形缓冲区(ring buffer),由后台线程定期批量更新链表。这是 Caffeine 的做法——让 get 变成近乎无锁的读操作,链表更新异步延迟执行。代价是 LRU 的"最近使用"信息有短暂延迟,但在高并发场景下,这种放松带来的吞吐量提升远大于精确性的微小损失。

38.18 分布式缓存中的 LRU

单机 LRU 的局限

单机内存有限(通常 64GB-512GB),无法缓存所有热数据。需要分布式缓存——多台机器组成缓存集群。

一致性哈希分片

将 key 通过一致性哈希分配到不同节点。每个节点独立运行自己的 LRU Cache。

问题:

Memcached 的 Slab Allocator + LRU

Memcached 按大小分类(slab class),每个 slab class 有独立的 LRU 链表。这避免了内存碎片,但可能出现小 slab class 频繁淘汰而大 slab class 空闲的不均衡。

Memcached 在 1.5 版本引入了 "slab automove" 和 "slab reassign" 机制来缓解这个问题——运行时动态在 slab class 之间重新分配内存页。

Redis Cluster 的缓存策略

Redis Cluster 将 key 空间分成 16384 个 slot,分布在多个 master 节点上。每个节点独立执行 maxmemory-policy。问题是全局来看,可能淘汰了节点 A 上不太热的 key,而节点 B 上有更冷的 key 存活——没有全局 LRU 视图。

实际工程中,这种"近似够用"的哲学贯穿始终:精确的全局 LRU 需要跨节点协调(分布式锁或共识协议),延迟和复杂度不可接受。

38.19 面试常见 Follow-up 问题

Q: 如果缓存的 value 很大(如图片),怎么优化?

A: 不在 LRU 缓存中直接存储大对象。缓存只存元数据和引用(如 CDN URL 或对象存储路径)。实际的大对象放在 CDN/S3/本地 SSD。LRU 管理的是"哪些引用是热的",而不是"哪些数据在内存中"。

Q: 如果数据有 TTL(过期时间),怎么和 LRU 结合?

A: 两种策略:

Q: 如何实现"如果 key 不在缓存中,自动从数据库加载"?

A: 这是 Cache-Aside 模式的扩展。可以在 get 中加入 loader 回调:

def get(self, key, loader=None):
    if key in self.cache:
        return self.cache[key]
    if loader:
        value = loader(key)  # 从 DB 加载
        self.put(key, value)
        return value
    return None

Google Guava 和 Caffeine 的 LoadingCache 就是这种设计。关键问题是缓存击穿——大量并发请求同一个不存在的 key,导致全部穿透到 DB。解决方案是 singleflight(Go 标准库)或 mutex + double-check。

38.20 设计题的边界情况总结

数据结构 容易遗漏的边界 后果
LRU Cache put 已存在的 key → 需要 move_to_head 淘汰顺序错误
LRU Cache capacity=1 时 put 两次 第一个被淘汰
LFU Cache 频率相同时要按 LRU 淘汰 淘汰了不该淘汰的
LFU Cache min_freq 维护 可能错误地保留 min_freq 值指向空桶
MinStack push 相同最小值两次 pop 一次后 getMin 返回错误值
RandomizedSet remove 时 val == nums[-1] 自交换(无害但需确认)
Twitter follow 自己 题目通常说不允许
Twitter 没有推文的用户调用 getNewsFeed 返回空列表

38.21 设计题与系统设计的区别

面试中的"设计数据结构"题和"系统设计"题有本质区别:

维度 设计数据结构 系统设计
代码 需要写完整可运行的代码 只需画架构图 + 伪代码
范围 单机内存中的一个类 分布式系统,多服务协作
约束 精确的时间复杂度(O(1)) 吞吐量/延迟/可用性的 trade-off
正确性 要通过所有测试用例 没有唯一正确答案
考察重点 数据结构组合能力 架构取舍能力

但两者有交集:如果系统设计中涉及"本地缓存用什么策略"、"如何实现限流器",就需要你展示数据结构层面的知识。本章的内容正是这个交集。

38.22 实战中的缓存模式选择决策树

需要缓存?
├── 数据量能放进单机内存?
│   ├── 是 → 使用进程内缓存(HashMap + LRU/LFU)
│   │   ├── 读多写少 → LRU 通常够用
│   │   ├── 有全表扫描风险 → LRU-K 或 ARC
│   │   └── 有频率倾斜 → LFU
│   └── 否 → 分布式缓存
│       ├── 一致性要求高 → Redis(持久化 + 主从复制)
│       ├── 纯缓存(丢了也行) → Memcached
│       └── 需要复杂数据结构 → Redis
├── 缓存与数据库一致性?
│   ├── 可以容忍短暂不一致 → Cache-Aside + TTL
│   ├── 需要强一致 → Write-Through 或 Write-Behind + 版本号
│   └── 最终一致即可 → 订阅 binlog 异步更新缓存
└── 缓存穿透/击穿/雪崩?
    ├── 穿透(查不存在的key)→ 布隆过滤器 + 空值缓存
    ├── 击穿(热key过期)→ 互斥锁 + 逻辑过期
    └── 雪崩(大量key同时过期)→ 随机TTL + 多级缓存

本章小结

设计类数据结构题的本质是需求分析 + 结构组合。每种基础数据结构都有自己擅长的操作和O(1)特性,当单一结构无法满足所有需求时,就需要组合多种结构并用指针/索引打通它们。

核心套路:

  1. 列出所有操作的时间复杂度要求
  2. 对每个操作,找出能满足它的基础结构
  3. 用哈希表做"胶水",把多个结构粘在一起
  4. 用哨兵节点简化链表边界条件
  5. 用"交换到末尾"实现数组的 O(1) 删除

LRU Cache 是所有设计题的基石——理解了"哈希表 + 双向链表"的组合思想,其他设计题都是同一思路的变体。而从工业界视角看,没有人真正用精确 LRU——近似、分段、异步是性能工程的必经之路。

本章评分
4.6  / 5  (3 评分)

💬 留言讨论