第 9 章

布隆过滤器与概率数据结构

第九章:布隆过滤器与概率数据结构

你的爬虫已经抓了 10 亿个 URL。每发现一个新链接,你需要判断"之前是不是已经抓过了"。用 HashSet 存?10 亿个 URL 每个平均 100 字节,光存储就需要 100GB 内存。用数据库查?每秒几万次磁盘 IO,速度跟不上。

布隆过滤器给出了一个惊人的答案:只用 1.2GB 内存(每个 URL 对应约 10 位),就能以 0.1% 的误判率告诉你某个 URL 是否"可能存在"或"一定不存在"。代价是什么?它会有极小概率说"存在"但实际不存在(假阳性),但永远不会说"不存在"但实际存在(没有假阴性)。

这种用少量空间换取近似答案的思路,催生了一整个家族:布隆过滤器、Count-Min Sketch、HyperLogLog、Cuckoo Filter......它们统称为概率数据结构(Probabilistic Data Structures)。这一章会深入每一个成员,从数学原理到工程实践。


Level 1 · 你需要知道的

9.1 布隆过滤器的基本思想

布隆过滤器(Bloom Filter)由 Burton Howard Bloom 于 1970 年提出,核心思想可以用一句话概括:

用一个位数组 + 多个哈希函数,实现空间极小的集合成员判断。查询结果只有两种:「可能存在」或「一定不存在」。

为什么是"可能存在"而不是"一定存在"?因为不同元素的哈希值可能碰撞,多个不同元素可能把同一组位都置为 1,导致一个从未插入的元素看起来"存在"。但如果有任何一个哈希位置为 0,那该元素一定没有被插入过。

直觉理解

想象一个有 m 位的位数组(bit array),初始全为 0。你有 k 个独立的哈希函数,每个哈希函数把输入映射到 [0, m-1] 的范围。

插入 "apple":
  h1("apple") = 3, h2("apple") = 7, h3("apple") = 11

  位数组:  0 0 0 1 0 0 0 1 0 0 0 1 0 0 0 0
           ↑     ↑           ↑           ↑
         idx:   3           7          11

插入 "banana":
  h1("banana") = 1, h2("banana") = 7, h3("banana") = 14

  位数组:  0 1 0 1 0 0 0 1 0 0 0 1 0 0 1 0
                ↑                         ↑
              idx:1                     idx:14

查询 "cherry":
  h1("cherry") = 3, h2("cherry") = 5, h3("cherry") = 14
  位置 3=1 ✓, 位置 5=0 ✗ → 一定不存在

查询 "grape":
  h1("grape") = 1, h2("grape") = 7, h3("grape") = 11
  位置 1=1 ✓, 位置 7=1 ✓, 位置 11=1 ✓ → 可能存在(假阳性!)

这就是布隆过滤器的精髓:牺牲一点准确性,换取巨大的空间节省

9.2 Python 实现一个完整的布隆过滤器

import math
import mmh3  # MurmurHash3,高性能非加密哈希


class BloomFilter:
    """
    布隆过滤器实现。
    
    参数:
        expected_items (int): 预期插入的元素数量 n
        false_positive_rate (float): 期望的误判率 ε,例如 0.01 表示 1%
    """
    
    def __init__(self, expected_items: int, false_positive_rate: float = 0.01):
        self.n = expected_items
        self.fp_rate = false_positive_rate
        
        # 计算最优的位数组大小 m
        # m = -n * ln(ε) / (ln2)^2
        self.m = self._optimal_bit_count(expected_items, false_positive_rate)
        
        # 计算最优的哈希函数个数 k
        # k = (m/n) * ln2
        self.k = self._optimal_hash_count(self.m, expected_items)
        
        # 位数组,用 bytearray 实现
        self.bit_array = bytearray(math.ceil(self.m / 8))
        self.count = 0  # 已插入元素数量
    
    @staticmethod
    def _optimal_bit_count(n: int, fp_rate: float) -> int:
        """计算最优的位数组大小"""
        m = -n * math.log(fp_rate) / (math.log(2) ** 2)
        return int(math.ceil(m))
    
    @staticmethod
    def _optimal_hash_count(m: int, n: int) -> int:
        """计算最优的哈希函数个数"""
        k = (m / n) * math.log(2)
        return int(math.ceil(k))
    
    def _get_hash_positions(self, item: str) -> list:
        """
        用双重哈希技术生成 k 个哈希位置。
        
        技术要点:不需要真的创建 k 个独立哈希函数,
        只用两个哈希值 h1, h2 就能生成 k 个:
        hash_i = (h1 + i * h2) % m
        
        这个技术来自 Kirsch & Mitzenmacher (2006) 的论文
        "Less Hashing, Same Performance: Building a Better Bloom Filter"。
        """
        h1 = mmh3.hash(item, seed=0) % self.m
        h2 = mmh3.hash(item, seed=42) % self.m
        
        positions = []
        for i in range(self.k):
            pos = (h1 + i * h2) % self.m
            positions.append(pos)
        return positions
    
    def _set_bit(self, position: int):
        """将指定位置的位设为 1"""
        byte_idx = position // 8
        bit_idx = position % 8
        self.bit_array[byte_idx] |= (1 << bit_idx)
    
    def _get_bit(self, position: int) -> bool:
        """获取指定位置的位值"""
        byte_idx = position // 8
        bit_idx = position % 8
        return bool(self.bit_array[byte_idx] & (1 << bit_idx))
    
    def add(self, item: str):
        """插入一个元素"""
        for pos in self._get_hash_positions(item):
            self._set_bit(pos)
        self.count += 1
    
    def __contains__(self, item: str) -> bool:
        """
        查询元素是否可能存在。
        
        返回 True: 元素可能存在(有 fp_rate 的概率是误判)
        返回 False: 元素一定不存在
        """
        return all(self._get_bit(pos) for pos in self._get_hash_positions(item))
    
    @property
    def current_false_positive_rate(self) -> float:
        """根据当前已插入元素数量,计算实际误判率"""
        # (1 - e^(-kn/m))^k
        exponent = -self.k * self.count / self.m
        return (1 - math.exp(exponent)) ** self.k
    
    def __repr__(self):
        return (
            f"BloomFilter(m={self.m} bits, k={self.k} hashes, "
            f"n={self.count}/{self.n}, "
            f"actual_fp≈{self.current_false_positive_rate:.6f})"
        )


# 使用示例
if __name__ == "__main__":
    # 创建一个期望存 100万 个元素、误判率 1% 的布隆过滤器
    bf = BloomFilter(expected_items=1_000_000, false_positive_rate=0.01)
    print(f"位数组大小: {bf.m:,} bits = {bf.m / 8 / 1024 / 1024:.2f} MB")
    print(f"哈希函数个数: {bf.k}")
    # 输出: 位数组大小: 9,585,059 bits = 1.14 MB
    # 输出: 哈希函数个数: 7
    
    # 插入数据
    for i in range(100_000):
        bf.add(f"url_{i}")
    
    # 查询已存在元素
    print(f"'url_42' in bf: {'url_42' in bf}")   # True(正确)
    print(f"'url_99999' in bf: {'url_99999' in bf}")  # True(正确)
    
    # 查询不存在的元素
    false_positives = 0
    test_count = 100_000
    for i in range(test_count):
        if f"not_exist_{i}" in bf:
            false_positives += 1
    
    print(f"实测误判率: {false_positives/test_count:.4f}")
    # 约 0.0001(因为只插入了 10 万个,远少于设计容量 100 万)
    print(bf)

关键观察

  1. 100 万个 URL,只用 1.14 MB。如果用 Python 的 set 存储 100 万个 URL(平均 50 字符),至少需要 100MB+。
  2. 哈希函数个数 k=7。不是越多越好,也不是越少越好,有最优值。
  3. 误判率随插入量增加而上升。设计时的 expected_items 是关键参数,超出后误判率会急剧恶化。

9.3 误判率公式与最优参数

布隆过滤器有三个核心参数的关系:

误判率公式:

$$\varepsilon = \left(1 - e^{-kn/m}\right)^k$$

最优哈希函数个数:

$$k_{opt} = \frac{m}{n} \cdot \ln 2 \approx 0.693 \cdot \frac{m}{n}$$

最优位数组大小:

$$m = -\frac{n \cdot \ln \varepsilon}{(\ln 2)^2} \approx 1.44 \cdot n \cdot \log_2\frac{1}{\varepsilon}$$

import math

def bloom_filter_params(n: int, fp_rate: float):
    """给定元素数量和期望误判率,计算最优参数"""
    # 最优 m
    m = -n * math.log(fp_rate) / (math.log(2) ** 2)
    m = int(math.ceil(m))
    
    # 最优 k
    k = (m / n) * math.log(2)
    k = int(math.ceil(k))
    
    # 每个元素占用的位数
    bits_per_element = m / n
    
    print(f"预期元素数: {n:,}")
    print(f"期望误判率: {fp_rate}")
    print(f"位数组大小: {m:,} bits = {m/8/1024/1024:.2f} MB")
    print(f"哈希函数数: {k}")
    print(f"每元素位数: {bits_per_element:.1f} bits/element")
    return m, k

# 不同误判率下的参数对比
print("=== 1% 误判率 ===")
bloom_filter_params(10_000_000, 0.01)
# m=95,850,584 bits=11.44 MB, k=7, 9.6 bits/elem

print("\n=== 0.1% 误判率 ===")
bloom_filter_params(10_000_000, 0.001)
# m=143,775,877 bits=17.15 MB, k=10, 14.4 bits/elem

print("\n=== 0.01% 误判率 ===")
bloom_filter_params(10_000_000, 0.0001)
# m=191,701,169 bits=22.87 MB, k=14, 19.2 bits/elem

实用规律:误判率每降低 10 倍,每个元素多需要约 4.8 位(即 m/n 增加约 4.8)。

9.4 典型应用场景

场景 1:缓存穿透防护

"""
问题:恶意请求查询大量不存在的 key,所有请求穿透缓存直接打到数据库。
解决:在缓存层前加布隆过滤器,不在过滤器中的 key 直接返回空。
"""

class CacheWithBloomFilter:
    def __init__(self, all_valid_keys):
        # 将所有合法 key 加入布隆过滤器
        self.bloom = BloomFilter(
            expected_items=len(all_valid_keys),
            false_positive_rate=0.001  # 0.1% 误判率
        )
        for key in all_valid_keys:
            self.bloom.add(key)
        
        self.cache = {}  # 模拟缓存层
        self.db = {}     # 模拟数据库层
    
    def get(self, key: str):
        # 第一层:布隆过滤器拦截
        if key not in self.bloom:
            return None  # 一定不存在,直接返回
        
        # 第二层:查缓存
        if key in self.cache:
            return self.cache[key]
        
        # 第三层:查数据库(只有极少数请求到这里)
        value = self.db.get(key)
        if value is not None:
            self.cache[key] = value
        return value

效果:假设恶意请求全部查询不存在的 key,布隆过滤器能拦截 99.9% 的请求(0.1% 误判率意味着只有千分之一的假请求会穿透到数据库)。

场景 2:爬虫 URL 去重

"""
10 亿 URL 去重:
- HashSet 方案: ~100GB 内存(不可行)
- 数据库方案: 太慢
- 布隆过滤器: ~1.2GB 内存(10 bits/URL, 0.1%误判率)
"""

class WebCrawler:
    def __init__(self, expected_urls=1_000_000_000):
        self.seen_urls = BloomFilter(
            expected_items=expected_urls,
            false_positive_rate=0.001
        )
    
    def should_crawl(self, url: str) -> bool:
        """判断是否应该抓取该 URL"""
        if url in self.seen_urls:
            return False  # 可能已抓过,跳过
        
        # 标记为已抓取
        self.seen_urls.add(url)
        return True
    
    def crawl(self, seed_urls: list):
        queue = list(seed_urls)
        while queue:
            url = queue.pop(0)
            if self.should_crawl(url):
                # 抓取页面并提取新链接
                new_links = self._fetch_and_extract(url)
                queue.extend(new_links)

权衡:0.1% 的误判率意味着大约有 100 万个 URL 会被错误跳过(10 亿 × 0.1%)。对于大规模爬虫来说,这完全可以接受。

场景 3:垃圾邮件过滤

"""
Google Chrome 早期用布隆过滤器存储恶意 URL 黑名单。
好处:不需要把完整的恶意 URL 列表下载到客户端,
只需要一个紧凑的布隆过滤器(几 MB)就能覆盖数百万恶意 URL。
"""

class SpamFilter:
    def __init__(self, known_spam_addresses):
        self.spam_bloom = BloomFilter(
            expected_items=len(known_spam_addresses),
            false_positive_rate=0.0001  # 万分之一误判率
        )
        for addr in known_spam_addresses:
            self.spam_bloom.add(addr.lower())
    
    def is_spam(self, sender: str) -> str:
        """
        返回: "definitely_not_spam" 或 "possibly_spam"
        """
        if sender.lower() not in self.spam_bloom:
            return "definitely_not_spam"
        else:
            # 可能是垃圾邮件,需要进一步检查
            return "possibly_spam"

9.5 布隆过滤器的局限性

  1. 不能删除元素:把某个位从 1 设回 0 可能影响其他元素的判断(多个元素共享同一个位)。
  2. 不能获取已插入的元素列表:布隆过滤器只存哈希信息,原始数据丢失。
  3. 容量固定:创建时必须确定预期元素数量,超出后误判率急剧上升。
  4. 只适用于集合成员判断:不能存储关联值(不像哈希表那样能存 key-value)。

Level 2 · 它是怎么运行的

9.6 布隆过滤器数学推导

让我们从第一原理推导误判率公式。

假设:k 个哈希函数将元素均匀随机地映射到 m 位的位数组中。

步骤 1:插入一个元素时,某个特定位没有被任何一个哈希函数设为 1 的概率是:

$$P(\text{该位为 0 after 1 hash}) = 1 - \frac{1}{m}$$

k 个哈希函数都没有设置该位:

$$P(\text{该位为 0 after 1 element}) = \left(1 - \frac{1}{m}\right)^k$$

步骤 2:插入 n 个元素后,该位仍然为 0 的概率:

$$P(\text{该位为 0 after n elements}) = \left(1 - \frac{1}{m}\right)^{kn}$$

利用极限 $(1-1/m)^m \to e^{-1}$(当 m 足够大时):

$$P(\text{该位为 0}) \approx e^{-kn/m}$$

步骤 3:该位为 1 的概率:

$$P(\text{该位为 1}) = 1 - e^{-kn/m}$$

步骤 4:假阳性发生的条件是 k 个哈希位置全为 1。假设各位独立(这是近似,但 Mitzenmacher & Upfal 2005 证明了该近似在实践中非常精确):

$$\varepsilon = \left(1 - e^{-kn/m}\right)^k$$

最优 k 的推导

对 ε 取对数:

$$\ln \varepsilon = k \cdot \ln\left(1 - e^{-kn/m}\right)$$

设 $p = e^{-kn/m}$(即某一位为 0 的概率),则:

$$\ln \varepsilon = k \cdot \ln(1-p)$$

要最小化 ε,等价于最小化 $k \cdot \ln(1-p)$。通过对 k 求导并令导数为零,可以得到当 $p = 1/2$ 时(即每一位被设为 1 的概率恰好是 1/2)误判率最小。

从 $p = e^{-kn/m} = 1/2$ 解出:

$$k = \frac{m}{n} \cdot \ln 2$$

此时误判率为:

$$\varepsilon_{min} = (1/2)^k = (1/2)^{(m/n)\ln 2} = 2^{-(m/n)\ln 2}$$

反过来,给定目标误判率 ε,需要的位数组大小为:

$$m = -\frac{n \ln \varepsilon}{(\ln 2)^2} = \frac{n \ln(1/\varepsilon)}{(\ln 2)^2} \approx 1.44 \cdot n \cdot \log_2\frac{1}{\varepsilon}$$

import math

def verify_formula(m, n, k):
    """验证理论公式与实际误判率的一致性"""
    # 理论误判率
    theoretical_fp = (1 - math.exp(-k * n / m)) ** k
    
    # 模拟实验
    import random
    bit_array = [0] * m
    
    # 插入 n 个随机元素
    for _ in range(n):
        for _ in range(k):
            pos = random.randint(0, m - 1)
            bit_array[pos] = 1
    
    # 测试 10000 个不存在的元素
    false_positives = 0
    tests = 10000
    for _ in range(tests):
        if all(bit_array[random.randint(0, m - 1)] == 1 for _ in range(k)):
            false_positives += 1
    
    actual_fp = false_positives / tests
    print(f"理论误判率: {theoretical_fp:.6f}")
    print(f"实测误判率: {actual_fp:.6f}")
    print(f"相对误差: {abs(theoretical_fp - actual_fp) / theoretical_fp * 100:.1f}%")

# m=10000, n=1000, k=7 (接近最优)
verify_formula(10000, 1000, 7)

9.7 Count-Min Sketch:频率估计

布隆过滤器回答"元素是否存在",但如果我们想问"这个元素出现了多少次"呢?比如:网络流量中每个 IP 的访问频率、文本中每个词的出现次数。

Count-Min Sketch(Cormode & Muthukrishnan, 2005, "An Improved Data Stream Summary: The Count-Min Sketch and its Applications")解决的就是这个问题。

核心思想

Count-Min Sketch 是一个二维计数数组(d 行 × w 列),配合 d 个哈希函数。

import mmh3
import math


class CountMinSketch:
    """
    Count-Min Sketch: 频率估计的概率数据结构
    
    保证: 估计值 >= 真实值(只会高估,不会低估)
    误差界: P(估计值 - 真实值 > ε * N) < δ
    其中 N 是所有元素的总频率之和
    
    参数:
        epsilon: 误差参数,估计误差不超过 ε*N
        delta: 失败概率,误差超过 ε*N 的概率小于 δ
    """
    
    def __init__(self, epsilon: float = 0.001, delta: float = 0.01):
        # w = ceil(e/ε), d = ceil(ln(1/δ))
        self.w = int(math.ceil(math.e / epsilon))
        self.d = int(math.ceil(math.log(1.0 / delta)))
        self.table = [[0] * self.w for _ in range(self.d)]
        self.total_count = 0
    
    def _hash(self, item: str, row: int) -> int:
        """第 row 个哈希函数"""
        return mmh3.hash(item, seed=row) % self.w
    
    def add(self, item: str, count: int = 1):
        """增加元素的计数"""
        self.total_count += count
        for row in range(self.d):
            col = self._hash(item, row)
            self.table[row][col] += count
    
    def estimate(self, item: str) -> int:
        """
        估计元素的频率。
        返回值 >= 真实频率(因为哈希碰撞只会增加计数)
        取各行最小值来减少高估幅度。
        """
        min_count = float('inf')
        for row in range(self.d):
            col = self._hash(item, row)
            min_count = min(min_count, self.table[row][col])
        return min_count
    
    @property
    def memory_usage_bytes(self) -> int:
        """估计内存使用(每个计数器 4 字节)"""
        return self.w * self.d * 4


# 示例:统计网站访问频率
cms = CountMinSketch(epsilon=0.001, delta=0.01)
print(f"表大小: {cms.d} × {cms.w} = {cms.d * cms.w:,} 个计数器")
print(f"内存使用: {cms.memory_usage_bytes / 1024:.1f} KB")

# 模拟访问日志
import random
access_log = []
# 模拟幂律分布:少数 IP 访问很多次
for i in range(100):
    freq = int(10000 / (i + 1))  # 齐普夫分布
    access_log.extend([f"192.168.1.{i}"] * freq)

random.shuffle(access_log)

# 用 CMS 统计
exact_counts = {}
for ip in access_log:
    cms.add(ip)
    exact_counts[ip] = exact_counts.get(ip, 0) + 1

# 验证精度
for ip in ["192.168.1.0", "192.168.1.9", "192.168.1.99"]:
    exact = exact_counts.get(ip, 0)
    estimated = cms.estimate(ip)
    error = estimated - exact
    print(f"{ip}: 精确={exact}, 估计={estimated}, 误差={error}")

为什么取最小值?

每个哈希函数都可能有碰撞(其他元素映射到同一列),碰撞只会增加计数。取 d 行的最小值,等于取碰撞最少的那一行的结果,能有效降低高估的幅度。

误差保证(来自 Cormode & Muthukrishnan 2005):

$$P\left[\hat{f}(x) - f(x) > \varepsilon \cdot N\right] < \delta$$

其中 $\hat{f}(x)$ 是估计频率,$f(x)$ 是真实频率,$N$ 是总元素数。

9.8 HyperLogLog:基数估计

"这个网站今天有多少不同的访客?"这是一个基数估计(cardinality estimation)问题。精确计算需要存储所有唯一访客 ID(可能几十 GB),HyperLogLog 只用约 12KB 就能估计上亿的基数,标准误差约 0.81%。

基本原理

HyperLogLog(Flajolet, Fusy, Gandouet & Meunier, 2007, "HyperLogLog: the analysis of a near-optimal cardinality estimation algorithm")基于一个精妙的概率观察:

如果你抛硬币,看到连续 k 个正面的概率是 $1/2^k$。如果你在大量实验中观察到的最长连续正面序列是 k,那么你大约做了 $2^k$ 次实验。

将这个思想应用到哈希值:把元素哈希为均匀分布的位串,观察前导零的最大个数来估计不同元素的数量。

import mmh3
import math


class HyperLogLog:
    """
    HyperLogLog 基数估计算法
    
    使用 2^p 个桶(registers),每个桶存储观察到的最大前导零数+1。
    标准误差 ≈ 1.04 / sqrt(2^p)
    
    参数:
        p: 精度参数(4 <= p <= 16),使用 2^p 个桶
           p=14 时有 16384 个桶,标准误差约 0.81%
    """
    
    def __init__(self, p: int = 14):
        assert 4 <= p <= 16, "p must be between 4 and 16"
        self.p = p
        self.m = 1 << p  # 桶的数量 = 2^p
        self.registers = [0] * self.m
        
        # α_m 修正因子
        if self.m == 16:
            self.alpha = 0.673
        elif self.m == 32:
            self.alpha = 0.697
        elif self.m == 64:
            self.alpha = 0.709
        else:
            self.alpha = 0.7213 / (1 + 1.079 / self.m)
    
    def _hash(self, item: str) -> int:
        """返回 64 位哈希值"""
        # 使用 mmh3 的 128 位哈希,取低 64 位
        h = mmh3.hash128(item, signed=False)
        return h & ((1 << 64) - 1)
    
    def _leading_zeros(self, hash_value: int, start_bit: int) -> int:
        """计算从 start_bit 位开始的前导零个数 + 1"""
        # 我们检查从第 p 位开始的位
        count = 1
        for i in range(start_bit, 64):
            if hash_value & (1 << (63 - i)):
                break
            count += 1
        return count
    
    def add(self, item: str):
        """添加一个元素"""
        h = self._hash(item)
        # 前 p 位决定桶的索引
        bucket_idx = h >> (64 - self.p)
        # 剩余位用于计算前导零
        w = self._leading_zeros(h, self.p)
        # 保留最大值
        self.registers[bucket_idx] = max(self.registers[bucket_idx], w)
    
    def estimate(self) -> int:
        """估计基数"""
        # 调和平均数
        indicator = sum(2.0 ** (-r) for r in self.registers)
        raw_estimate = self.alpha * self.m * self.m / indicator
        
        # 小基数修正
        if raw_estimate <= 2.5 * self.m:
            # 计算空桶数量
            zeros = self.registers.count(0)
            if zeros > 0:
                # Linear Counting
                raw_estimate = self.m * math.log(self.m / zeros)
        
        # 大基数修正(超过 2^32 / 30 时)
        elif raw_estimate > (1 << 32) / 30.0:
            raw_estimate = -(1 << 32) * math.log(1 - raw_estimate / (1 << 32))
        
        return int(raw_estimate)
    
    @property
    def memory_usage_bytes(self) -> int:
        """每个 register 用 6 位就够(最大值 64),这里简化用 1 字节"""
        return self.m
    
    def merge(self, other: 'HyperLogLog'):
        """合并两个 HLL(取每个桶的最大值),支持分布式计算"""
        assert self.p == other.p, "Cannot merge HLLs with different precision"
        for i in range(self.m):
            self.registers[i] = max(self.registers[i], other.registers[i])


# 演示
hll = HyperLogLog(p=14)
print(f"桶数量: {hll.m:,}")
print(f"内存使用: {hll.memory_usage_bytes:,} bytes ≈ {hll.memory_usage_bytes/1024:.1f} KB")
print(f"理论标准误差: {1.04 / math.sqrt(hll.m):.4f} = {1.04 / math.sqrt(hll.m) * 100:.2f}%")

# 添加不同数量的元素并检查精度
import random
for true_cardinality in [1000, 10000, 100000, 1000000]:
    hll = HyperLogLog(p=14)
    for i in range(true_cardinality):
        hll.add(f"user_{i}")
    
    estimated = hll.estimate()
    error = abs(estimated - true_cardinality) / true_cardinality * 100
    print(f"真实基数: {true_cardinality:>10,} | "
          f"估计: {estimated:>10,} | "
          f"误差: {error:.2f}%")

Redis 中的 HyperLogLog

Redis 原生支持 HyperLogLog,使用固定 12KB 内存:

import redis

r = redis.Redis()

# PFADD: 添加元素到 HLL
r.pfadd("visitors:2024-01-15", "user_1", "user_2", "user_3")
r.pfadd("visitors:2024-01-15", "user_2", "user_4")  # user_2 重复

# PFCOUNT: 估计基数
count = r.pfcount("visitors:2024-01-15")
print(f"今日独立访客数: {count}")  # 输出: 4

# PFMERGE: 合并多个 HLL(求并集基数)
r.pfmerge("visitors:week", "visitors:2024-01-15", "visitors:2024-01-16")
weekly_count = r.pfcount("visitors:week")
print(f"本周独立访客数: {weekly_count}")

Redis HLL 实现细节(Antirez, Redis 源码):

9.9 Cuckoo Filter:支持删除的布隆过滤器

布隆过滤器最大的痛点是不能删除元素。Cuckoo Filter(Fan, Andersen, Kaminsky & Mitzenmacher, 2014, "Cuckoo Filter: Practically Better Than Bloom")解决了这个问题。

核心思想

Cuckoo Filter 的名字来源于布谷鸟哈希(Cuckoo Hashing):

  1. 每个桶可以存储多个指纹(fingerprint,元素哈希值的一小段)。
  2. 每个元素有两个候选桶位置。
  3. 插入时如果两个位置都满了,随机踢出一个已有指纹到它的备选位置(像布谷鸟抢巢)。
  4. 删除只需从桶中移除对应的指纹。
import mmh3
import random


class CuckooFilter:
    """
    Cuckoo Filter: 支持删除的概率集合数据结构
    
    相比布隆过滤器的优势:
    1. 支持删除操作
    2. 查询速度更快(只需检查 2 个位置)
    3. 在相同误判率下,空间效率通常更好(当 ε < 3%)
    """
    
    def __init__(self, capacity: int, bucket_size: int = 4, 
                 fingerprint_bits: int = 8, max_kicks: int = 500):
        """
        参数:
            capacity: 桶的数量
            bucket_size: 每个桶能存的指纹数量
            fingerprint_bits: 指纹的位数(决定误判率 ≈ 2*bucket_size/2^fingerprint_bits)
            max_kicks: 最大踢出次数
        """
        self.capacity = capacity
        self.bucket_size = bucket_size
        self.fingerprint_bits = fingerprint_bits
        self.max_kicks = max_kicks
        self.buckets = [[] for _ in range(capacity)]
        self.count = 0
    
    def _fingerprint(self, item: str) -> int:
        """计算元素的指纹"""
        h = mmh3.hash(item, seed=0) & ((1 << self.fingerprint_bits) - 1)
        # 指纹不能为 0(0 表示空位)
        return h if h != 0 else 1
    
    def _hash1(self, item: str) -> int:
        """第一个桶位置"""
        return mmh3.hash(item, seed=42) % self.capacity
    
    def _hash2(self, index: int, fingerprint: int) -> int:
        """
        第二个桶位置:用部分键布谷鸟哈希
        i2 = i1 XOR hash(fingerprint)
        
        这个设计保证:知道任一位置和指纹,就能算出另一个位置
        """
        fp_hash = mmh3.hash(str(fingerprint), seed=100) % self.capacity
        return (index ^ fp_hash) % self.capacity
    
    def insert(self, item: str) -> bool:
        """
        插入元素。返回 True 表示成功,False 表示过滤器已满。
        """
        fp = self._fingerprint(item)
        i1 = self._hash1(item)
        i2 = self._hash2(i1, fp)
        
        # 尝试两个桶
        if len(self.buckets[i1]) < self.bucket_size:
            self.buckets[i1].append(fp)
            self.count += 1
            return True
        if len(self.buckets[i2]) < self.bucket_size:
            self.buckets[i2].append(fp)
            self.count += 1
            return True
        
        # 两个桶都满了,开始踢出
        idx = random.choice([i1, i2])
        for _ in range(self.max_kicks):
            # 随机选一个指纹踢出
            victim_idx = random.randint(0, self.bucket_size - 1)
            fp, self.buckets[idx][victim_idx] = self.buckets[idx][victim_idx], fp
            
            # 被踢出的指纹去它的备选位置
            idx = self._hash2(idx, fp)
            if len(self.buckets[idx]) < self.bucket_size:
                self.buckets[idx].append(fp)
                self.count += 1
                return True
        
        # 踢出次数超限,认为过滤器已满
        return False
    
    def __contains__(self, item: str) -> bool:
        """查询元素是否可能存在"""
        fp = self._fingerprint(item)
        i1 = self._hash1(item)
        i2 = self._hash2(i1, fp)
        return fp in self.buckets[i1] or fp in self.buckets[i2]
    
    def delete(self, item: str) -> bool:
        """
        删除元素。返回 True 表示删除成功。
        
        注意:只有确实插入过的元素才应该被删除。
        删除一个从未插入的元素可能导致假阴性!
        """
        fp = self._fingerprint(item)
        i1 = self._hash1(item)
        i2 = self._hash2(i1, fp)
        
        if fp in self.buckets[i1]:
            self.buckets[i1].remove(fp)
            self.count -= 1
            return True
        if fp in self.buckets[i2]:
            self.buckets[i2].remove(fp)
            self.count -= 1
            return True
        return False


# 演示删除能力
cf = CuckooFilter(capacity=10000, bucket_size=4, fingerprint_bits=12)

# 插入
for i in range(5000):
    cf.insert(f"item_{i}")

print(f"插入后: 'item_42' in cf = {'item_42' in cf}")  # True

# 删除
cf.delete("item_42")
print(f"删除后: 'item_42' in cf = {'item_42' in cf}")  # False

# 误判率测试
false_positives = 0
tests = 10000
for i in range(tests):
    if f"not_exist_{i}" in cf:
        false_positives += 1
print(f"误判率: {false_positives/tests:.4f}")
# 理论误判率 ≈ 2 * bucket_size / 2^fingerprint_bits = 8/4096 ≈ 0.002

Cuckoo Filter vs Bloom Filter 对比

特性 Bloom Filter Cuckoo Filter
删除操作 不支持 支持
查询速度 检查 k 个位置 只检查 2 个位置
空间效率(ε < 3%) 较差 更好
空间效率(ε > 3%) 更好 较差
插入(满载时) 总是 O(1) 可能需要踢出
实现复杂度 简单 中等

Level 3 · 规范怎么定义的

9.10 Burton Bloom 1970 年原始论文

Burton Howard Bloom 在 1970 年发表了论文 "Space/Time Trade-offs in Hash Coding with Allowable Errors"(Communications of the ACM, Vol. 13, No. 7, July 1970, pp. 422-426)。这篇仅 5 页的论文定义了整个领域。

原始动机

Bloom 的原始问题是:如何用有限内存判断一个英语单词是否在字典中(用于拼写检查或连字符位置查询)?当时的计算机内存极其有限,完整的字典需要太多空间。

Bloom 的关键洞察是:如果允许一小部分误判(把不在字典中的词判断为在字典中),可以用远小于原始数据的空间来解决成员判断问题

论文核心贡献

  1. 提出了位向量 + 多哈希的架构:这个设计在 50 多年后仍然没有本质改变。
  2. 分析了最优参数选择:给出了误判率公式和最优哈希函数数量。
  3. 指出了 trade-off 的本质:更多内存 → 更低误判率;更多哈希函数 → 更慢的速度但更低的误判率(到某个最优点之后反而增加误判率)。

历史影响

Bloom Filter 被发明时是为了节省磁盘访问(那个年代磁盘 seek 极慢),后来被广泛应用于:

9.11 信息论下界

为什么布隆过滤器需要 $1.44 \cdot n \cdot \log_2(1/\varepsilon)$ 位?这不是巧合,而是接近信息论的极限。

信息论分析

考虑一个能回答"元素 x 是否在集合 S 中"的数据结构。集合 S 有 n 个元素,来自一个大小为 U 的全集。这个数据结构的信息论下界是多少?

如果允许误判率 ε,那么对于不在 S 中的任何元素,数据结构必须能区分"在 S 中"和"不在 S 中",允许 ε 的错误率。

Carter, Floyd, Gill, Markowsky & Wegman (1978) 证明了:

定理:任何支持集合成员查询、误判率不超过 ε 的数据结构,至少需要 $n \cdot \log_2(1/\varepsilon)$ 位。

布隆过滤器使用 $1.44 \cdot n \cdot \log_2(1/\varepsilon)$ 位,比下界多了 44%。这个 1.44 的因子来自于:

$$\frac{1}{(\ln 2)^2} = \frac{1}{0.4805} \approx 2.0814$$

而 $\frac{m}{n \cdot \log_2(1/\varepsilon)} = \frac{-\ln\varepsilon / (\ln 2)^2}{\log_2(1/\varepsilon)} = \frac{\ln(1/\varepsilon) / (\ln 2)^2}{\ln(1/\varepsilon) / \ln 2} = \frac{1}{\ln 2} \approx 1.44$

也就是说,布隆过滤器的空间效率是信息论最优的 1/1.44 ≈ 69.3%

更紧凑的数据结构

有没有能达到信息论下界的数据结构?有的:

# 对比不同数据结构的空间效率
import math

def space_comparison(n, epsilon):
    """对比不同数据结构的理论空间需求"""
    info_lower_bound = n * math.log2(1/epsilon)  # 信息论下界
    bloom_space = 1.44 * n * math.log2(1/epsilon)  # 标准布隆过滤器
    xor_space = 1.23 * n * math.log2(1/epsilon)    # Xor Filter
    
    print(f"n={n:,}, ε={epsilon}")
    print(f"  信息论下界:     {info_lower_bound/8/1024/1024:.2f} MB (100%)")
    print(f"  布隆过滤器:     {bloom_space/8/1024/1024:.2f} MB ({bloom_space/info_lower_bound*100:.1f}%)")
    print(f"  Xor Filter:    {xor_space/8/1024/1024:.2f} MB ({xor_space/info_lower_bound*100:.1f}%)")
    print()

space_comparison(10_000_000, 0.01)
space_comparison(10_000_000, 0.001)
space_comparison(1_000_000_000, 0.01)

9.12 Count-Min Sketch 的理论保证

Cormode & Muthukrishnan 2005 年的论文 "An Improved Data Stream Summary: The Count-Min Sketch and its Applications" 给出了严格的理论保证。

参数选择

给定误差参数 ε 和失败概率 δ:

误差保证

对于任意元素 x:

$$f(x) \leq \hat{f}(x) \leq f(x) + \varepsilon \cdot N$$

其中 $\hat{f}(x)$ 是估计频率,$f(x)$ 是真实频率,$N$ 是所有元素的总频率。这个保证以至少 $1-\delta$ 的概率成立。

为什么是单侧误差?

Count-Min Sketch 只会高估频率,不会低估。这是因为碰撞只会增加计数器的值。这个性质在很多应用中非常有用——例如,如果 CMS 说某个 IP 的访问频率没有超过阈值,那它一定没有超过。

Heavy Hitter 问题

Count-Min Sketch 的一个经典应用是找重击者(Heavy Hitters):在数据流中找出频率超过某个阈值的元素。

class HeavyHitterDetector:
    """
    使用 Count-Min Sketch 检测重击者(频率超过 φ*N 的元素)
    """
    
    def __init__(self, phi: float = 0.01, epsilon: float = 0.001, delta: float = 0.01):
        """
        参数:
            phi: 重击者阈值(频率超过 phi * N 的元素)
            epsilon: CMS 误差参数(应远小于 phi)
            delta: 失败概率
        """
        self.phi = phi
        self.cms = CountMinSketch(epsilon=epsilon, delta=delta)
        self.candidates = set()  # 候选重击者
    
    def add(self, item: str):
        self.cms.add(item)
        # 如果估计频率超过阈值,加入候选集
        threshold = self.phi * self.cms.total_count
        if self.cms.estimate(item) >= threshold:
            self.candidates.add(item)
    
    def get_heavy_hitters(self) -> list:
        """返回所有可能的重击者及其估计频率"""
        threshold = self.phi * self.cms.total_count
        results = []
        for item in self.candidates:
            est = self.cms.estimate(item)
            if est >= threshold:
                results.append((item, est))
        return sorted(results, key=lambda x: -x[1])

9.13 HyperLogLog 的理论基础

从 Flajolet-Martin 到 HyperLogLog

基数估计算法的发展历史:

  1. Flajolet-Martin (1985):"Probabilistic Counting Algorithms for Data Base Applications"。使用位模式观察,单个估计器方差很大。
  2. LogLog (2003):Durand & Flajolet。使用 m 个桶取算术平均,空间 O(log log n) per bucket。
  3. SuperLogLog (2003):截断异常值后取平均。
  4. HyperLogLog (2007):Flajolet, Fusy, Gandouet & Meunier。使用调和平均代替算术平均,精度提升到标准误差 $1.04/\sqrt{m}$。

为什么用调和平均而不是算术平均?

假设 m 个桶的值为 $M_1, M_2, \ldots, M_m$:

修正因子 α_m

修正因子 $\alpha_m$ 用于消除估计器的偏差:

$$\alpha_m = \left(m \int_0^\infty \left(\log_2\frac{2+u}{1+u}\right)^m du\right)^{-1}$$

当 m 较大时,$\alpha_m \to 1/(2\ln 2) \approx 0.7213$。

Redis 中 p=14 对应的 $\alpha_{16384}$ 取值为 0.7213/(1+1.079/16384) ≈ 0.72125。

9.14 Redis 中的 HyperLogLog 实现

Redis 的 HyperLogLog 实现(由 Salvatore Sanfilippo 即 Antirez 编写)有几个重要的工程优化:

稀疏表示与密集表示

Redis 使用两种编码方式:

  1. 稀疏编码(小基数时):使用游程编码(RLE)压缩连续的零值桶。当大部分桶为 0 时,稀疏表示远小于 12KB。

  2. 密集编码(大基数时):16384 个桶,每个 6 位,总计 12,288 字节。当稀疏编码超过一定阈值(server.hll_sparse_max_bytes,默认 3000 字节)或桶值超过 32 时,自动转换为密集编码。

# Redis HLL 的稀疏编码示意
class RedisHLLSparse:
    """
    Redis HLL 稀疏编码的简化模型
    
    三种操作码:
    - ZERO(len): len 个连续的零值桶 (1 byte, len <= 64)
    - XZERO(len): 更长的连续零值桶 (2 bytes, len <= 16384)
    - VAL(value, len): len 个连续的相同非零值桶 (1 byte, value <= 32, len <= 4)
    """
    
    def __init__(self):
        # 初始状态:16384 个零值桶 = 一个 XZERO(16384) 操作码
        self.encoding = [("XZERO", 16384)]
        self.is_dense = False
    
    def estimated_memory(self):
        """估计当前内存使用"""
        total = 0
        for op in self.encoding:
            if op[0] == "ZERO":
                total += 1
            elif op[0] == "XZERO":
                total += 2
            elif op[0] == "VAL":
                total += 1
        return total

偏差修正(Bias Correction)

Redis 实现了 Google 提出的改进修正策略(Heule, Nunkesser & Hall, 2013, "HyperLogLog in Practice"):


Level 4 · 边界与陷阱

9.15 布隆过滤器不能删除元素的深入分析

为什么不能删除?

考虑两个元素 A 和 B,它们的哈希位置有重叠:

位置 7 是共享的。如果删除 A(将位置 3, 7, 11 清零),位置 7 被清零会导致查询 B 时返回 False——B 明明存在,却说不存在。这引入了假阴性,破坏了布隆过滤器"不存在则一定不存在"的核心保证

解决方案比较

方案 原理 优点 缺点
Counting Bloom Filter 每个位换成计数器 简单直觉 空间增大 3-4 倍
Cuckoo Filter 存指纹+布谷鸟哈希 空间效率好 实现复杂
Quotient Filter 商余数存储 缓存友好 聚集问题
重建 定期从源数据重建 无额外空间 重建开销大
class CountingBloomFilter:
    """
    计数布隆过滤器:每个位变成一个 4-bit 计数器
    
    支持删除操作,但空间开销是标准布隆过滤器的 4 倍。
    计数器溢出(>15)时停止递增(饱和计数),可能导致假阴性
    但这在实践中极其罕见。
    """
    
    def __init__(self, expected_items: int, false_positive_rate: float = 0.01):
        import math
        self.m = int(math.ceil(-expected_items * math.log(false_positive_rate) 
                               / (math.log(2) ** 2)))
        self.k = int(math.ceil((self.m / expected_items) * math.log(2)))
        # 4-bit 计数器,2 个计数器共用 1 字节
        self.counters = bytearray(math.ceil(self.m / 2))
    
    def _get_counter(self, pos: int) -> int:
        byte_idx = pos // 2
        if pos % 2 == 0:
            return self.counters[byte_idx] & 0x0F
        else:
            return (self.counters[byte_idx] >> 4) & 0x0F
    
    def _set_counter(self, pos: int, value: int):
        byte_idx = pos // 2
        if pos % 2 == 0:
            self.counters[byte_idx] = (self.counters[byte_idx] & 0xF0) | (value & 0x0F)
        else:
            self.counters[byte_idx] = (self.counters[byte_idx] & 0x0F) | ((value & 0x0F) << 4)
    
    def _hash_positions(self, item: str) -> list:
        import mmh3
        h1 = mmh3.hash(item, seed=0) % self.m
        h2 = mmh3.hash(item, seed=42) % self.m
        return [(h1 + i * h2) % self.m for i in range(self.k)]
    
    def add(self, item: str):
        for pos in self._hash_positions(item):
            val = self._get_counter(pos)
            if val < 15:  # 饱和计数,防止溢出
                self._set_counter(pos, val + 1)
    
    def remove(self, item: str) -> bool:
        """删除元素。返回 False 如果元素不存在。"""
        positions = self._hash_positions(item)
        # 先检查是否存在
        if not all(self._get_counter(pos) > 0 for pos in positions):
            return False
        # 所有计数器减 1
        for pos in positions:
            val = self._get_counter(pos)
            if val < 15:  # 饱和的计数器不减(安全措施)
                self._set_counter(pos, val - 1)
        return True
    
    def __contains__(self, item: str) -> bool:
        return all(self._get_counter(pos) > 0 for pos in self._hash_positions(item))

9.16 面试题:设计一个 URL 去重系统

题目:设计一个系统,判断爬虫即将抓取的 URL 是否已经被抓取过。预期 URL 量为 100 亿,要求低延迟、可扩展。

分析框架

Step 1: 明确需求

Step 2: 方案对比

import math

def memory_analysis():
    """各方案的内存分析"""
    n = 10_000_000_000  # 100 亿 URL
    
    # 方案1:HashSet
    # 每个 URL hash 64位 + overhead,约 16 字节/元素
    hashset_mem = n * 16 / (1024**3)
    print(f"HashSet: {hashset_mem:.0f} GB")
    
    # 方案2:布隆过滤器 (0.1% 误判率)
    bloom_bits = -n * math.log(0.001) / (math.log(2)**2)
    bloom_mem = bloom_bits / 8 / (1024**3)
    print(f"Bloom Filter (0.1% FP): {bloom_mem:.1f} GB")
    
    # 方案3:布隆过滤器 (1% 误判率)
    bloom_bits_1 = -n * math.log(0.01) / (math.log(2)**2)
    bloom_mem_1 = bloom_bits_1 / 8 / (1024**3)
    print(f"Bloom Filter (1% FP): {bloom_mem_1:.1f} GB")

memory_analysis()
# HashSet: 149 GB
# Bloom Filter (0.1% FP): 17.9 GB
# Bloom Filter (1% FP): 11.9 GB

Step 3: 分布式架构

┌─────────────────────────────────────────────┐
│               URL 去重系统架构                │
├─────────────────────────────────────────────┤
│                                             │
│   新 URL ──→ Hash(URL) % N ──→ 路由到节点    │
│                                             │
│   ┌──────────┐ ┌──────────┐ ┌──────────┐   │
│   │  Node 0  │ │  Node 1  │ │  Node N  │   │
│   │  BF 2GB  │ │  BF 2GB  │ │  BF 2GB  │   │
│   │(12.5亿URL)│ │(12.5亿URL)│ │(12.5亿URL)│  │
│   └──────────┘ └──────────┘ └──────────┘   │
│                                             │
│   持久化层: 定期将位数组写入磁盘/对象存储       │
│   恢复: 启动时从持久化加载位数组               │
└─────────────────────────────────────────────┘

Step 4: 完整回答要点

"""
面试回答框架:

1. 容量估算
   - 100 亿 URL, 0.1% 误判率
   - 布隆过滤器: 约 18 GB
   - 分 8 个节点,每个节点约 2.2 GB

2. 分片策略
   - URL 哈希取模,决定去哪个节点
   - 一致性哈希(如果节点需要动态扩缩)

3. 持久化
   - 位数组定期 checkpoint 到磁盘
   - 增量日志:记录每个新插入操作
   - 恢复时加载 checkpoint + replay 增量日志

4. 容错
   - 主从复制:每个节点一个从节点
   - 位数组丢失后从 checkpoint 恢复

5. 扩容
   - 方案A:分层布隆过滤器(旧数据在 BF1,新数据在 BF2)
   - 方案B:一致性哈希 + 迁移

6. 监控
   - 监控实际误判率(抽样验证)
   - 当元素数接近设计容量时报警

7. 优化
   - 本地内存 + Redis 结合(热点 URL 本地缓存)
   - 分层过滤:先查本地小 BF,miss 再查远程大 BF
"""

9.17 误判率与内存的工程权衡

在真实系统中,布隆过滤器的参数选择需要考虑多个维度:

def engineering_tradeoffs():
    """工程中的参数选择决策树"""
    
    scenarios = [
        {
            "场景": "CDN URL 缓存判断",
            "n": 100_000_000,
            "误判代价": "一次额外的回源请求(成本低)",
            "推荐误判率": 0.01,
            "理由": "误判只是多一次回源,1%完全可以接受"
        },
        {
            "场景": "恶意 URL 黑名单",
            "n": 10_000_000,
            "误判代价": "正常网站被标记为恶意(用户投诉)",
            "推荐误判率": 0.0001,
            "理由": "误判=误封,用户体验差,需要极低误判率"
        },
        {
            "场景": "数据库 key 存在性检查",
            "n": 50_000_000,
            "误判代价": "一次无用的磁盘读取",
            "推荐误判率": 0.001,
            "理由": "磁盘IO成本中等,0.1%是好的平衡点"
        },
        {
            "场景": "去重(爬虫/消息)",
            "n": 1_000_000_000,
            "误判代价": "漏处理一些URL/消息",
            "推荐误判率": 0.001,
            "理由": "漏抓少量URL可接受,但太多则影响覆盖率"
        }
    ]
    
    for s in scenarios:
        m = -s["n"] * math.log(s["推荐误判率"]) / (math.log(2)**2)
        k = int(math.ceil((m/s["n"]) * math.log(2)))
        mem_mb = m / 8 / 1024 / 1024
        
        print(f"场景: {s['场景']}")
        print(f"  元素数: {s['n']:,}")
        print(f"  误判率: {s['推荐误判率']}")
        print(f"  内存: {mem_mb:.1f} MB")
        print(f"  哈希数: {k}")
        print(f"  理由: {s['理由']}")
        print()

engineering_tradeoffs()

常见陷阱

陷阱 1:元素数超过设计容量

def capacity_overflow_demo():
    """展示超容后的误判率恶化"""
    bf = BloomFilter(expected_items=10000, false_positive_rate=0.01)
    
    for filled_ratio in [0.5, 1.0, 1.5, 2.0, 3.0]:
        # 重新创建并填充
        bf2 = BloomFilter(expected_items=10000, false_positive_rate=0.01)
        actual_items = int(10000 * filled_ratio)
        for i in range(actual_items):
            bf2.add(f"item_{i}")
        
        # 测试误判率
        fp = 0
        tests = 10000
        for i in range(tests):
            if f"not_exist_{i}" in bf2:
                fp += 1
        
        print(f"填充率 {filled_ratio:.0%} ({actual_items:,} items): "
              f"实测误判率 = {fp/tests:.4f}")

# 典型输出:
# 填充率 50%: 实测误判率 = 0.0001
# 填充率 100%: 实测误判率 = 0.0098  (接近设计值 1%)
# 填充率 150%: 实测误判率 = 0.0500  (5%!)
# 填充率 200%: 实测误判率 = 0.1400  (14%!)
# 填充率 300%: 实测误判率 = 0.3700  (37%! 几乎无用)

结论:超容 50% 误判率就从 1% 涨到 5%,超容 2 倍达到 14%。设计容量必须留余量。推荐做法:设计容量设为预期最大值的 1.5-2 倍。

陷阱 2:哈希函数质量不够

"""
错误示范:用简单的哈希函数
"""

# 坏的哈希函数:位分布不均匀
def bad_hash(item, seed):
    return (hash(item) + seed) % 1000  # hash() + 偏移不是独立哈希!

# 好的哈希函数:使用双重哈希技术
def good_hash(item, i, m):
    import mmh3
    h1 = mmh3.hash(item, seed=0)
    h2 = mmh3.hash(item, seed=42)
    return (h1 + i * h2) % m  # Kirsch-Mitzenmacher (2006)

为什么 hash(item) + seed 不行? 因为这只是简单的偏移,不同 seed 产生的哈希值高度相关(差值恒定),违反了布隆过滤器要求哈希函数"独立"的假设。Kirsch & Mitzenmacher 2006 年证明了双重哈希 $g_i(x) = h_1(x) + i \cdot h_2(x)$ 在理论上与 k 个完全独立的哈希函数性能相同。

陷阱 3:并发写入的线程安全

import threading

class ThreadSafeBloomFilter:
    """
    线程安全的布隆过滤器
    
    注意:标准布隆过滤器的 add 操作不是原子的
    (需要设置 k 个位,中间状态可能被其他线程观察到)
    但这对正确性没有影响——最坏情况只是暂时的假阴性,
    add 完成后就恢复正常。
    
    真正的问题是位数组的并发写入可能导致更新丢失
    (read-modify-write 不是原子的)。
    """
    
    def __init__(self, expected_items, false_positive_rate=0.01):
        self._bf = BloomFilter(expected_items, false_positive_rate)
        self._lock = threading.Lock()
    
    def add(self, item: str):
        # 方案1:全局锁(简单但性能差)
        with self._lock:
            self._bf.add(item)
    
    def __contains__(self, item: str) -> bool:
        # 查询不需要加锁(只读操作)
        # 但可能看到部分更新的状态(对布隆过滤器来说这是安全的)
        return item in self._bf


# 更高性能的方案:分段锁
class StripedBloomFilter:
    """使用分段锁减少竞争"""
    
    def __init__(self, expected_items, false_positive_rate=0.01, num_stripes=16):
        self._bf = BloomFilter(expected_items, false_positive_rate)
        self._stripes = [threading.Lock() for _ in range(num_stripes)]
    
    def _get_stripe(self, position: int) -> threading.Lock:
        return self._stripes[position % len(self._stripes)]
    
    def add(self, item: str):
        positions = self._bf._get_hash_positions(item)
        # 按顺序获取锁,避免死锁
        unique_stripes = sorted(set(p % len(self._stripes) for p in positions))
        for s in unique_stripes:
            self._stripes[s].acquire()
        try:
            self._bf.add(item)
        finally:
            for s in unique_stripes:
                self._stripes[s].release()

9.18 Redis 布隆过滤器模块(RedisBloom)

Redis 原生不支持布隆过滤器,但 RedisBloom 模块(原 ReBloom)提供了生产级实现。

import redis

# 需要 Redis 加载了 RedisBloom 模块
r = redis.Redis()

# BF.RESERVE: 创建布隆过滤器
# BF.RESERVE key error_rate capacity [EXPANSION expansion] [NONSCALING]
r.execute_command('BF.RESERVE', 'urls:seen', 0.001, 10000000)
# 创建一个容量 1000万、误判率 0.1% 的布隆过滤器

# BF.ADD: 添加元素
r.execute_command('BF.ADD', 'urls:seen', 'https://example.com/page1')

# BF.MADD: 批量添加
r.execute_command('BF.MADD', 'urls:seen', 
                  'https://example.com/page2',
                  'https://example.com/page3')

# BF.EXISTS: 查询
exists = r.execute_command('BF.EXISTS', 'urls:seen', 'https://example.com/page1')
print(f"exists: {exists}")  # 1 (可能存在)

not_exists = r.execute_command('BF.EXISTS', 'urls:seen', 'https://never-added.com')
print(f"not_exists: {not_exists}")  # 0 (一定不存在)

# BF.MEXISTS: 批量查询
results = r.execute_command('BF.MEXISTS', 'urls:seen',
                           'https://example.com/page1',
                           'https://never-added.com')
print(f"batch results: {results}")  # [1, 0]

# BF.INFO: 查看过滤器信息
info = r.execute_command('BF.INFO', 'urls:seen')
print(f"info: {info}")
# 输出包括:Capacity, Size, Number of filters, Number of items inserted, Expansion rate

RedisBloom 的自动扩容机制

"""
RedisBloom 的 EXPANSION 参数允许自动扩容:

当第一个子过滤器满了,自动创建第二个容量为原来 expansion 倍的子过滤器。
查询时需要检查所有子过滤器(任一个返回"存在"则结果为"存在")。

代价:
- 查询变慢(需要查多个子过滤器)
- 误判率略微增加
- 但避免了因超容导致的误判率急剧恶化

默认 expansion=2,即每次扩容容量翻倍。
"""

# 创建可扩容的布隆过滤器
r.execute_command('BF.RESERVE', 'scalable:bf', 
                  0.01,        # 初始误判率
                  1000000,     # 初始容量
                  'EXPANSION', 2)  # 扩容倍数

# 如果不需要扩容(固定容量,性能更好)
r.execute_command('BF.RESERVE', 'fixed:bf',
                  0.01,
                  10000000,
                  'NONSCALING')  # 超容后 BF.ADD 返回错误

生产环境最佳实践

"""
Redis 布隆过滤器的生产最佳实践:

1. 容量规划
   - 预估峰值数据量的 2 倍作为 capacity
   - 或使用 EXPANSION 自动扩容

2. 误判率选择
   - 缓存穿透: 0.01 (1%)
   - URL 去重: 0.001 (0.1%)
   - 安全黑名单: 0.0001 (0.01%)

3. 持久化
   - RedisBloom 数据随 Redis RDB/AOF 持久化
   - 大过滤器的 RDB 保存可能导致延迟抖动

4. 内存估算
   - BF.DEBUG 可查看实际内存使用
   - 经验公式: n * (-ln(ε) / (ln2)^2) / 8 字节

5. 监控指标
   - 已插入元素数 vs 设计容量
   - 实际误判率(定期抽样测试)
   - 子过滤器数量(扩容次数)
"""

class BloomFilterManager:
    """Redis 布隆过滤器管理器"""
    
    def __init__(self, redis_client, key_prefix="bf:"):
        self.r = redis_client
        self.prefix = key_prefix
    
    def create_if_not_exists(self, name: str, error_rate: float, 
                             capacity: int, expansion: int = 2):
        """安全创建(幂等)"""
        key = f"{self.prefix}{name}"
        try:
            self.r.execute_command('BF.INFO', key)
        except redis.ResponseError:
            self.r.execute_command('BF.RESERVE', key, 
                                  error_rate, capacity, 
                                  'EXPANSION', expansion)
    
    def check_health(self, name: str) -> dict:
        """检查过滤器健康状态"""
        key = f"{self.prefix}{name}"
        info = self.r.execute_command('BF.INFO', key)
        
        # 解析 info(返回格式因版本而异)
        info_dict = {}
        for i in range(0, len(info), 2):
            info_dict[info[i].decode()] = info[i+1]
        
        capacity = info_dict.get('Capacity', 0)
        inserted = info_dict.get('Number of items inserted', 0)
        fill_ratio = inserted / capacity if capacity > 0 else 0
        
        status = "healthy"
        if fill_ratio > 0.9:
            status = "critical"  # 即将满载
        elif fill_ratio > 0.7:
            status = "warning"   # 接近容量
        
        return {
            "key": key,
            "capacity": capacity,
            "inserted": inserted,
            "fill_ratio": fill_ratio,
            "status": status
        }

9.19 概率数据结构的统一视角

布隆过滤器、Count-Min Sketch、HyperLogLog 看似解决不同问题,但它们共享一个核心思想:

用哈希将数据映射到紧凑的摘要结构中,牺牲少量精确性,换取数量级的空间节省。

数据结构 回答的问题 误差类型 空间复杂度
Bloom Filter x ∈ S? 假阳性 O(n) bits
Count-Min Sketch freq(x) = ? 只高估 O(1/ε · log(1/δ))
HyperLogLog S = ?
Cuckoo Filter x ∈ S? + delete 假阳性 O(n) bits
MinHash Jaccard(A,B) = ? ±相对误差 O(1/ε²)

它们的共同特点:

  1. 单向性:可以插入,但信息被压缩,原始数据不可恢复。
  2. 可合并性:两个摘要可以合并为一个(支持分布式计算)。
  3. 误差可控:用户可以通过增加空间来降低误差。
  4. 流式处理:只需要扫描数据一次,不需要随机访问。

这些性质使得概率数据结构特别适合大数据流处理场景——数据量太大无法精确存储,但近似答案已经足够做出正确的工程决策。


本章小结

概念 核心 空间 误差 典型应用
Bloom Filter 位数组 + k 哈希 ~10 bits/elem @1% FP 假阳性 缓存穿透、URL去重
Count-Min Sketch d×w 计数矩阵 O(1/ε · log(1/δ)) 只高估 流量统计、热点检测
HyperLogLog 前导零观察 ~12KB (Redis) ±0.81% 独立访客数
Cuckoo Filter 指纹 + 布谷鸟哈希 ~12 bits/elem @0.1% FP 假阳性 需要删除的场景

核心思想:在精确性和空间之间找到最优的工程平衡点。当你面对"TB 级数据做 membership/frequency/cardinality 查询"的问题时,概率数据结构几乎总是正确答案。

本章评分
4.7  / 5  (44 评分)

💬 留言讨论