布隆过滤器与概率数据结构
第九章:布隆过滤器与概率数据结构
你的爬虫已经抓了 10 亿个 URL。每发现一个新链接,你需要判断"之前是不是已经抓过了"。用 HashSet 存?10 亿个 URL 每个平均 100 字节,光存储就需要 100GB 内存。用数据库查?每秒几万次磁盘 IO,速度跟不上。
布隆过滤器给出了一个惊人的答案:只用 1.2GB 内存(每个 URL 对应约 10 位),就能以 0.1% 的误判率告诉你某个 URL 是否"可能存在"或"一定不存在"。代价是什么?它会有极小概率说"存在"但实际不存在(假阳性),但永远不会说"不存在"但实际存在(没有假阴性)。
这种用少量空间换取近似答案的思路,催生了一整个家族:布隆过滤器、Count-Min Sketch、HyperLogLog、Cuckoo Filter......它们统称为概率数据结构(Probabilistic Data Structures)。这一章会深入每一个成员,从数学原理到工程实践。
Level 1 · 你需要知道的
9.1 布隆过滤器的基本思想
布隆过滤器(Bloom Filter)由 Burton Howard Bloom 于 1970 年提出,核心思想可以用一句话概括:
用一个位数组 + 多个哈希函数,实现空间极小的集合成员判断。查询结果只有两种:「可能存在」或「一定不存在」。
为什么是"可能存在"而不是"一定存在"?因为不同元素的哈希值可能碰撞,多个不同元素可能把同一组位都置为 1,导致一个从未插入的元素看起来"存在"。但如果有任何一个哈希位置为 0,那该元素一定没有被插入过。
直觉理解
想象一个有 m 位的位数组(bit array),初始全为 0。你有 k 个独立的哈希函数,每个哈希函数把输入映射到 [0, m-1] 的范围。
- 插入:对元素计算 k 个哈希值,把位数组对应的 k 个位置设为 1。
- 查询:对元素计算 k 个哈希值,检查对应的 k 个位置是否全为 1。全为 1 → "可能存在";任何一个为 0 → "一定不存在"。
插入 "apple":
h1("apple") = 3, h2("apple") = 7, h3("apple") = 11
位数组: 0 0 0 1 0 0 0 1 0 0 0 1 0 0 0 0
↑ ↑ ↑ ↑
idx: 3 7 11
插入 "banana":
h1("banana") = 1, h2("banana") = 7, h3("banana") = 14
位数组: 0 1 0 1 0 0 0 1 0 0 0 1 0 0 1 0
↑ ↑
idx:1 idx:14
查询 "cherry":
h1("cherry") = 3, h2("cherry") = 5, h3("cherry") = 14
位置 3=1 ✓, 位置 5=0 ✗ → 一定不存在
查询 "grape":
h1("grape") = 1, h2("grape") = 7, h3("grape") = 11
位置 1=1 ✓, 位置 7=1 ✓, 位置 11=1 ✓ → 可能存在(假阳性!)
这就是布隆过滤器的精髓:牺牲一点准确性,换取巨大的空间节省。
9.2 Python 实现一个完整的布隆过滤器
import math
import mmh3 # MurmurHash3,高性能非加密哈希
class BloomFilter:
"""
布隆过滤器实现。
参数:
expected_items (int): 预期插入的元素数量 n
false_positive_rate (float): 期望的误判率 ε,例如 0.01 表示 1%
"""
def __init__(self, expected_items: int, false_positive_rate: float = 0.01):
self.n = expected_items
self.fp_rate = false_positive_rate
# 计算最优的位数组大小 m
# m = -n * ln(ε) / (ln2)^2
self.m = self._optimal_bit_count(expected_items, false_positive_rate)
# 计算最优的哈希函数个数 k
# k = (m/n) * ln2
self.k = self._optimal_hash_count(self.m, expected_items)
# 位数组,用 bytearray 实现
self.bit_array = bytearray(math.ceil(self.m / 8))
self.count = 0 # 已插入元素数量
@staticmethod
def _optimal_bit_count(n: int, fp_rate: float) -> int:
"""计算最优的位数组大小"""
m = -n * math.log(fp_rate) / (math.log(2) ** 2)
return int(math.ceil(m))
@staticmethod
def _optimal_hash_count(m: int, n: int) -> int:
"""计算最优的哈希函数个数"""
k = (m / n) * math.log(2)
return int(math.ceil(k))
def _get_hash_positions(self, item: str) -> list:
"""
用双重哈希技术生成 k 个哈希位置。
技术要点:不需要真的创建 k 个独立哈希函数,
只用两个哈希值 h1, h2 就能生成 k 个:
hash_i = (h1 + i * h2) % m
这个技术来自 Kirsch & Mitzenmacher (2006) 的论文
"Less Hashing, Same Performance: Building a Better Bloom Filter"。
"""
h1 = mmh3.hash(item, seed=0) % self.m
h2 = mmh3.hash(item, seed=42) % self.m
positions = []
for i in range(self.k):
pos = (h1 + i * h2) % self.m
positions.append(pos)
return positions
def _set_bit(self, position: int):
"""将指定位置的位设为 1"""
byte_idx = position // 8
bit_idx = position % 8
self.bit_array[byte_idx] |= (1 << bit_idx)
def _get_bit(self, position: int) -> bool:
"""获取指定位置的位值"""
byte_idx = position // 8
bit_idx = position % 8
return bool(self.bit_array[byte_idx] & (1 << bit_idx))
def add(self, item: str):
"""插入一个元素"""
for pos in self._get_hash_positions(item):
self._set_bit(pos)
self.count += 1
def __contains__(self, item: str) -> bool:
"""
查询元素是否可能存在。
返回 True: 元素可能存在(有 fp_rate 的概率是误判)
返回 False: 元素一定不存在
"""
return all(self._get_bit(pos) for pos in self._get_hash_positions(item))
@property
def current_false_positive_rate(self) -> float:
"""根据当前已插入元素数量,计算实际误判率"""
# (1 - e^(-kn/m))^k
exponent = -self.k * self.count / self.m
return (1 - math.exp(exponent)) ** self.k
def __repr__(self):
return (
f"BloomFilter(m={self.m} bits, k={self.k} hashes, "
f"n={self.count}/{self.n}, "
f"actual_fp≈{self.current_false_positive_rate:.6f})"
)
# 使用示例
if __name__ == "__main__":
# 创建一个期望存 100万 个元素、误判率 1% 的布隆过滤器
bf = BloomFilter(expected_items=1_000_000, false_positive_rate=0.01)
print(f"位数组大小: {bf.m:,} bits = {bf.m / 8 / 1024 / 1024:.2f} MB")
print(f"哈希函数个数: {bf.k}")
# 输出: 位数组大小: 9,585,059 bits = 1.14 MB
# 输出: 哈希函数个数: 7
# 插入数据
for i in range(100_000):
bf.add(f"url_{i}")
# 查询已存在元素
print(f"'url_42' in bf: {'url_42' in bf}") # True(正确)
print(f"'url_99999' in bf: {'url_99999' in bf}") # True(正确)
# 查询不存在的元素
false_positives = 0
test_count = 100_000
for i in range(test_count):
if f"not_exist_{i}" in bf:
false_positives += 1
print(f"实测误判率: {false_positives/test_count:.4f}")
# 约 0.0001(因为只插入了 10 万个,远少于设计容量 100 万)
print(bf)
关键观察
- 100 万个 URL,只用 1.14 MB。如果用 Python 的 set 存储 100 万个 URL(平均 50 字符),至少需要 100MB+。
- 哈希函数个数 k=7。不是越多越好,也不是越少越好,有最优值。
- 误判率随插入量增加而上升。设计时的 expected_items 是关键参数,超出后误判率会急剧恶化。
9.3 误判率公式与最优参数
布隆过滤器有三个核心参数的关系:
- m:位数组大小(位数)
- n:预期插入元素数量
- k:哈希函数个数
- ε:误判率(false positive rate)
误判率公式:
$$\varepsilon = \left(1 - e^{-kn/m}\right)^k$$
最优哈希函数个数:
$$k_{opt} = \frac{m}{n} \cdot \ln 2 \approx 0.693 \cdot \frac{m}{n}$$
最优位数组大小:
$$m = -\frac{n \cdot \ln \varepsilon}{(\ln 2)^2} \approx 1.44 \cdot n \cdot \log_2\frac{1}{\varepsilon}$$
import math
def bloom_filter_params(n: int, fp_rate: float):
"""给定元素数量和期望误判率,计算最优参数"""
# 最优 m
m = -n * math.log(fp_rate) / (math.log(2) ** 2)
m = int(math.ceil(m))
# 最优 k
k = (m / n) * math.log(2)
k = int(math.ceil(k))
# 每个元素占用的位数
bits_per_element = m / n
print(f"预期元素数: {n:,}")
print(f"期望误判率: {fp_rate}")
print(f"位数组大小: {m:,} bits = {m/8/1024/1024:.2f} MB")
print(f"哈希函数数: {k}")
print(f"每元素位数: {bits_per_element:.1f} bits/element")
return m, k
# 不同误判率下的参数对比
print("=== 1% 误判率 ===")
bloom_filter_params(10_000_000, 0.01)
# m=95,850,584 bits=11.44 MB, k=7, 9.6 bits/elem
print("\n=== 0.1% 误判率 ===")
bloom_filter_params(10_000_000, 0.001)
# m=143,775,877 bits=17.15 MB, k=10, 14.4 bits/elem
print("\n=== 0.01% 误判率 ===")
bloom_filter_params(10_000_000, 0.0001)
# m=191,701,169 bits=22.87 MB, k=14, 19.2 bits/elem
实用规律:误判率每降低 10 倍,每个元素多需要约 4.8 位(即 m/n 增加约 4.8)。
9.4 典型应用场景
场景 1:缓存穿透防护
"""
问题:恶意请求查询大量不存在的 key,所有请求穿透缓存直接打到数据库。
解决:在缓存层前加布隆过滤器,不在过滤器中的 key 直接返回空。
"""
class CacheWithBloomFilter:
def __init__(self, all_valid_keys):
# 将所有合法 key 加入布隆过滤器
self.bloom = BloomFilter(
expected_items=len(all_valid_keys),
false_positive_rate=0.001 # 0.1% 误判率
)
for key in all_valid_keys:
self.bloom.add(key)
self.cache = {} # 模拟缓存层
self.db = {} # 模拟数据库层
def get(self, key: str):
# 第一层:布隆过滤器拦截
if key not in self.bloom:
return None # 一定不存在,直接返回
# 第二层:查缓存
if key in self.cache:
return self.cache[key]
# 第三层:查数据库(只有极少数请求到这里)
value = self.db.get(key)
if value is not None:
self.cache[key] = value
return value
效果:假设恶意请求全部查询不存在的 key,布隆过滤器能拦截 99.9% 的请求(0.1% 误判率意味着只有千分之一的假请求会穿透到数据库)。
场景 2:爬虫 URL 去重
"""
10 亿 URL 去重:
- HashSet 方案: ~100GB 内存(不可行)
- 数据库方案: 太慢
- 布隆过滤器: ~1.2GB 内存(10 bits/URL, 0.1%误判率)
"""
class WebCrawler:
def __init__(self, expected_urls=1_000_000_000):
self.seen_urls = BloomFilter(
expected_items=expected_urls,
false_positive_rate=0.001
)
def should_crawl(self, url: str) -> bool:
"""判断是否应该抓取该 URL"""
if url in self.seen_urls:
return False # 可能已抓过,跳过
# 标记为已抓取
self.seen_urls.add(url)
return True
def crawl(self, seed_urls: list):
queue = list(seed_urls)
while queue:
url = queue.pop(0)
if self.should_crawl(url):
# 抓取页面并提取新链接
new_links = self._fetch_and_extract(url)
queue.extend(new_links)
权衡:0.1% 的误判率意味着大约有 100 万个 URL 会被错误跳过(10 亿 × 0.1%)。对于大规模爬虫来说,这完全可以接受。
场景 3:垃圾邮件过滤
"""
Google Chrome 早期用布隆过滤器存储恶意 URL 黑名单。
好处:不需要把完整的恶意 URL 列表下载到客户端,
只需要一个紧凑的布隆过滤器(几 MB)就能覆盖数百万恶意 URL。
"""
class SpamFilter:
def __init__(self, known_spam_addresses):
self.spam_bloom = BloomFilter(
expected_items=len(known_spam_addresses),
false_positive_rate=0.0001 # 万分之一误判率
)
for addr in known_spam_addresses:
self.spam_bloom.add(addr.lower())
def is_spam(self, sender: str) -> str:
"""
返回: "definitely_not_spam" 或 "possibly_spam"
"""
if sender.lower() not in self.spam_bloom:
return "definitely_not_spam"
else:
# 可能是垃圾邮件,需要进一步检查
return "possibly_spam"
9.5 布隆过滤器的局限性
- 不能删除元素:把某个位从 1 设回 0 可能影响其他元素的判断(多个元素共享同一个位)。
- 不能获取已插入的元素列表:布隆过滤器只存哈希信息,原始数据丢失。
- 容量固定:创建时必须确定预期元素数量,超出后误判率急剧上升。
- 只适用于集合成员判断:不能存储关联值(不像哈希表那样能存 key-value)。
Level 2 · 它是怎么运行的
9.6 布隆过滤器数学推导
让我们从第一原理推导误判率公式。
假设:k 个哈希函数将元素均匀随机地映射到 m 位的位数组中。
步骤 1:插入一个元素时,某个特定位没有被任何一个哈希函数设为 1 的概率是:
$$P(\text{该位为 0 after 1 hash}) = 1 - \frac{1}{m}$$
k 个哈希函数都没有设置该位:
$$P(\text{该位为 0 after 1 element}) = \left(1 - \frac{1}{m}\right)^k$$
步骤 2:插入 n 个元素后,该位仍然为 0 的概率:
$$P(\text{该位为 0 after n elements}) = \left(1 - \frac{1}{m}\right)^{kn}$$
利用极限 $(1-1/m)^m \to e^{-1}$(当 m 足够大时):
$$P(\text{该位为 0}) \approx e^{-kn/m}$$
步骤 3:该位为 1 的概率:
$$P(\text{该位为 1}) = 1 - e^{-kn/m}$$
步骤 4:假阳性发生的条件是 k 个哈希位置全为 1。假设各位独立(这是近似,但 Mitzenmacher & Upfal 2005 证明了该近似在实践中非常精确):
$$\varepsilon = \left(1 - e^{-kn/m}\right)^k$$
最优 k 的推导:
对 ε 取对数:
$$\ln \varepsilon = k \cdot \ln\left(1 - e^{-kn/m}\right)$$
设 $p = e^{-kn/m}$(即某一位为 0 的概率),则:
$$\ln \varepsilon = k \cdot \ln(1-p)$$
要最小化 ε,等价于最小化 $k \cdot \ln(1-p)$。通过对 k 求导并令导数为零,可以得到当 $p = 1/2$ 时(即每一位被设为 1 的概率恰好是 1/2)误判率最小。
从 $p = e^{-kn/m} = 1/2$ 解出:
$$k = \frac{m}{n} \cdot \ln 2$$
此时误判率为:
$$\varepsilon_{min} = (1/2)^k = (1/2)^{(m/n)\ln 2} = 2^{-(m/n)\ln 2}$$
反过来,给定目标误判率 ε,需要的位数组大小为:
$$m = -\frac{n \ln \varepsilon}{(\ln 2)^2} = \frac{n \ln(1/\varepsilon)}{(\ln 2)^2} \approx 1.44 \cdot n \cdot \log_2\frac{1}{\varepsilon}$$
import math
def verify_formula(m, n, k):
"""验证理论公式与实际误判率的一致性"""
# 理论误判率
theoretical_fp = (1 - math.exp(-k * n / m)) ** k
# 模拟实验
import random
bit_array = [0] * m
# 插入 n 个随机元素
for _ in range(n):
for _ in range(k):
pos = random.randint(0, m - 1)
bit_array[pos] = 1
# 测试 10000 个不存在的元素
false_positives = 0
tests = 10000
for _ in range(tests):
if all(bit_array[random.randint(0, m - 1)] == 1 for _ in range(k)):
false_positives += 1
actual_fp = false_positives / tests
print(f"理论误判率: {theoretical_fp:.6f}")
print(f"实测误判率: {actual_fp:.6f}")
print(f"相对误差: {abs(theoretical_fp - actual_fp) / theoretical_fp * 100:.1f}%")
# m=10000, n=1000, k=7 (接近最优)
verify_formula(10000, 1000, 7)
9.7 Count-Min Sketch:频率估计
布隆过滤器回答"元素是否存在",但如果我们想问"这个元素出现了多少次"呢?比如:网络流量中每个 IP 的访问频率、文本中每个词的出现次数。
Count-Min Sketch(Cormode & Muthukrishnan, 2005, "An Improved Data Stream Summary: The Count-Min Sketch and its Applications")解决的就是这个问题。
核心思想
Count-Min Sketch 是一个二维计数数组(d 行 × w 列),配合 d 个哈希函数。
- 更新:对于元素 x 的计数增加 c,计算 d 个哈希值,在每行对应列加 c。
- 查询:对于元素 x 的频率查询,取 d 行对应列的值中的最小值。
import mmh3
import math
class CountMinSketch:
"""
Count-Min Sketch: 频率估计的概率数据结构
保证: 估计值 >= 真实值(只会高估,不会低估)
误差界: P(估计值 - 真实值 > ε * N) < δ
其中 N 是所有元素的总频率之和
参数:
epsilon: 误差参数,估计误差不超过 ε*N
delta: 失败概率,误差超过 ε*N 的概率小于 δ
"""
def __init__(self, epsilon: float = 0.001, delta: float = 0.01):
# w = ceil(e/ε), d = ceil(ln(1/δ))
self.w = int(math.ceil(math.e / epsilon))
self.d = int(math.ceil(math.log(1.0 / delta)))
self.table = [[0] * self.w for _ in range(self.d)]
self.total_count = 0
def _hash(self, item: str, row: int) -> int:
"""第 row 个哈希函数"""
return mmh3.hash(item, seed=row) % self.w
def add(self, item: str, count: int = 1):
"""增加元素的计数"""
self.total_count += count
for row in range(self.d):
col = self._hash(item, row)
self.table[row][col] += count
def estimate(self, item: str) -> int:
"""
估计元素的频率。
返回值 >= 真实频率(因为哈希碰撞只会增加计数)
取各行最小值来减少高估幅度。
"""
min_count = float('inf')
for row in range(self.d):
col = self._hash(item, row)
min_count = min(min_count, self.table[row][col])
return min_count
@property
def memory_usage_bytes(self) -> int:
"""估计内存使用(每个计数器 4 字节)"""
return self.w * self.d * 4
# 示例:统计网站访问频率
cms = CountMinSketch(epsilon=0.001, delta=0.01)
print(f"表大小: {cms.d} × {cms.w} = {cms.d * cms.w:,} 个计数器")
print(f"内存使用: {cms.memory_usage_bytes / 1024:.1f} KB")
# 模拟访问日志
import random
access_log = []
# 模拟幂律分布:少数 IP 访问很多次
for i in range(100):
freq = int(10000 / (i + 1)) # 齐普夫分布
access_log.extend([f"192.168.1.{i}"] * freq)
random.shuffle(access_log)
# 用 CMS 统计
exact_counts = {}
for ip in access_log:
cms.add(ip)
exact_counts[ip] = exact_counts.get(ip, 0) + 1
# 验证精度
for ip in ["192.168.1.0", "192.168.1.9", "192.168.1.99"]:
exact = exact_counts.get(ip, 0)
estimated = cms.estimate(ip)
error = estimated - exact
print(f"{ip}: 精确={exact}, 估计={estimated}, 误差={error}")
为什么取最小值?
每个哈希函数都可能有碰撞(其他元素映射到同一列),碰撞只会增加计数。取 d 行的最小值,等于取碰撞最少的那一行的结果,能有效降低高估的幅度。
误差保证(来自 Cormode & Muthukrishnan 2005):
$$P\left[\hat{f}(x) - f(x) > \varepsilon \cdot N\right] < \delta$$
其中 $\hat{f}(x)$ 是估计频率,$f(x)$ 是真实频率,$N$ 是总元素数。
9.8 HyperLogLog:基数估计
"这个网站今天有多少不同的访客?"这是一个基数估计(cardinality estimation)问题。精确计算需要存储所有唯一访客 ID(可能几十 GB),HyperLogLog 只用约 12KB 就能估计上亿的基数,标准误差约 0.81%。
基本原理
HyperLogLog(Flajolet, Fusy, Gandouet & Meunier, 2007, "HyperLogLog: the analysis of a near-optimal cardinality estimation algorithm")基于一个精妙的概率观察:
如果你抛硬币,看到连续 k 个正面的概率是 $1/2^k$。如果你在大量实验中观察到的最长连续正面序列是 k,那么你大约做了 $2^k$ 次实验。
将这个思想应用到哈希值:把元素哈希为均匀分布的位串,观察前导零的最大个数来估计不同元素的数量。
import mmh3
import math
class HyperLogLog:
"""
HyperLogLog 基数估计算法
使用 2^p 个桶(registers),每个桶存储观察到的最大前导零数+1。
标准误差 ≈ 1.04 / sqrt(2^p)
参数:
p: 精度参数(4 <= p <= 16),使用 2^p 个桶
p=14 时有 16384 个桶,标准误差约 0.81%
"""
def __init__(self, p: int = 14):
assert 4 <= p <= 16, "p must be between 4 and 16"
self.p = p
self.m = 1 << p # 桶的数量 = 2^p
self.registers = [0] * self.m
# α_m 修正因子
if self.m == 16:
self.alpha = 0.673
elif self.m == 32:
self.alpha = 0.697
elif self.m == 64:
self.alpha = 0.709
else:
self.alpha = 0.7213 / (1 + 1.079 / self.m)
def _hash(self, item: str) -> int:
"""返回 64 位哈希值"""
# 使用 mmh3 的 128 位哈希,取低 64 位
h = mmh3.hash128(item, signed=False)
return h & ((1 << 64) - 1)
def _leading_zeros(self, hash_value: int, start_bit: int) -> int:
"""计算从 start_bit 位开始的前导零个数 + 1"""
# 我们检查从第 p 位开始的位
count = 1
for i in range(start_bit, 64):
if hash_value & (1 << (63 - i)):
break
count += 1
return count
def add(self, item: str):
"""添加一个元素"""
h = self._hash(item)
# 前 p 位决定桶的索引
bucket_idx = h >> (64 - self.p)
# 剩余位用于计算前导零
w = self._leading_zeros(h, self.p)
# 保留最大值
self.registers[bucket_idx] = max(self.registers[bucket_idx], w)
def estimate(self) -> int:
"""估计基数"""
# 调和平均数
indicator = sum(2.0 ** (-r) for r in self.registers)
raw_estimate = self.alpha * self.m * self.m / indicator
# 小基数修正
if raw_estimate <= 2.5 * self.m:
# 计算空桶数量
zeros = self.registers.count(0)
if zeros > 0:
# Linear Counting
raw_estimate = self.m * math.log(self.m / zeros)
# 大基数修正(超过 2^32 / 30 时)
elif raw_estimate > (1 << 32) / 30.0:
raw_estimate = -(1 << 32) * math.log(1 - raw_estimate / (1 << 32))
return int(raw_estimate)
@property
def memory_usage_bytes(self) -> int:
"""每个 register 用 6 位就够(最大值 64),这里简化用 1 字节"""
return self.m
def merge(self, other: 'HyperLogLog'):
"""合并两个 HLL(取每个桶的最大值),支持分布式计算"""
assert self.p == other.p, "Cannot merge HLLs with different precision"
for i in range(self.m):
self.registers[i] = max(self.registers[i], other.registers[i])
# 演示
hll = HyperLogLog(p=14)
print(f"桶数量: {hll.m:,}")
print(f"内存使用: {hll.memory_usage_bytes:,} bytes ≈ {hll.memory_usage_bytes/1024:.1f} KB")
print(f"理论标准误差: {1.04 / math.sqrt(hll.m):.4f} = {1.04 / math.sqrt(hll.m) * 100:.2f}%")
# 添加不同数量的元素并检查精度
import random
for true_cardinality in [1000, 10000, 100000, 1000000]:
hll = HyperLogLog(p=14)
for i in range(true_cardinality):
hll.add(f"user_{i}")
estimated = hll.estimate()
error = abs(estimated - true_cardinality) / true_cardinality * 100
print(f"真实基数: {true_cardinality:>10,} | "
f"估计: {estimated:>10,} | "
f"误差: {error:.2f}%")
Redis 中的 HyperLogLog
Redis 原生支持 HyperLogLog,使用固定 12KB 内存:
import redis
r = redis.Redis()
# PFADD: 添加元素到 HLL
r.pfadd("visitors:2024-01-15", "user_1", "user_2", "user_3")
r.pfadd("visitors:2024-01-15", "user_2", "user_4") # user_2 重复
# PFCOUNT: 估计基数
count = r.pfcount("visitors:2024-01-15")
print(f"今日独立访客数: {count}") # 输出: 4
# PFMERGE: 合并多个 HLL(求并集基数)
r.pfmerge("visitors:week", "visitors:2024-01-15", "visitors:2024-01-16")
weekly_count = r.pfcount("visitors:week")
print(f"本周独立访客数: {weekly_count}")
Redis HLL 实现细节(Antirez, Redis 源码):
- 使用 p=14(16384 个桶)
- 每个桶 6 位,总计 16384 × 6 / 8 = 12,288 字节 ≈ 12KB
- 标准误差约 0.81%
- 小基数时使用稀疏表示(更省内存),大基数时切换到密集表示
9.9 Cuckoo Filter:支持删除的布隆过滤器
布隆过滤器最大的痛点是不能删除元素。Cuckoo Filter(Fan, Andersen, Kaminsky & Mitzenmacher, 2014, "Cuckoo Filter: Practically Better Than Bloom")解决了这个问题。
核心思想
Cuckoo Filter 的名字来源于布谷鸟哈希(Cuckoo Hashing):
- 每个桶可以存储多个指纹(fingerprint,元素哈希值的一小段)。
- 每个元素有两个候选桶位置。
- 插入时如果两个位置都满了,随机踢出一个已有指纹到它的备选位置(像布谷鸟抢巢)。
- 删除只需从桶中移除对应的指纹。
import mmh3
import random
class CuckooFilter:
"""
Cuckoo Filter: 支持删除的概率集合数据结构
相比布隆过滤器的优势:
1. 支持删除操作
2. 查询速度更快(只需检查 2 个位置)
3. 在相同误判率下,空间效率通常更好(当 ε < 3%)
"""
def __init__(self, capacity: int, bucket_size: int = 4,
fingerprint_bits: int = 8, max_kicks: int = 500):
"""
参数:
capacity: 桶的数量
bucket_size: 每个桶能存的指纹数量
fingerprint_bits: 指纹的位数(决定误判率 ≈ 2*bucket_size/2^fingerprint_bits)
max_kicks: 最大踢出次数
"""
self.capacity = capacity
self.bucket_size = bucket_size
self.fingerprint_bits = fingerprint_bits
self.max_kicks = max_kicks
self.buckets = [[] for _ in range(capacity)]
self.count = 0
def _fingerprint(self, item: str) -> int:
"""计算元素的指纹"""
h = mmh3.hash(item, seed=0) & ((1 << self.fingerprint_bits) - 1)
# 指纹不能为 0(0 表示空位)
return h if h != 0 else 1
def _hash1(self, item: str) -> int:
"""第一个桶位置"""
return mmh3.hash(item, seed=42) % self.capacity
def _hash2(self, index: int, fingerprint: int) -> int:
"""
第二个桶位置:用部分键布谷鸟哈希
i2 = i1 XOR hash(fingerprint)
这个设计保证:知道任一位置和指纹,就能算出另一个位置
"""
fp_hash = mmh3.hash(str(fingerprint), seed=100) % self.capacity
return (index ^ fp_hash) % self.capacity
def insert(self, item: str) -> bool:
"""
插入元素。返回 True 表示成功,False 表示过滤器已满。
"""
fp = self._fingerprint(item)
i1 = self._hash1(item)
i2 = self._hash2(i1, fp)
# 尝试两个桶
if len(self.buckets[i1]) < self.bucket_size:
self.buckets[i1].append(fp)
self.count += 1
return True
if len(self.buckets[i2]) < self.bucket_size:
self.buckets[i2].append(fp)
self.count += 1
return True
# 两个桶都满了,开始踢出
idx = random.choice([i1, i2])
for _ in range(self.max_kicks):
# 随机选一个指纹踢出
victim_idx = random.randint(0, self.bucket_size - 1)
fp, self.buckets[idx][victim_idx] = self.buckets[idx][victim_idx], fp
# 被踢出的指纹去它的备选位置
idx = self._hash2(idx, fp)
if len(self.buckets[idx]) < self.bucket_size:
self.buckets[idx].append(fp)
self.count += 1
return True
# 踢出次数超限,认为过滤器已满
return False
def __contains__(self, item: str) -> bool:
"""查询元素是否可能存在"""
fp = self._fingerprint(item)
i1 = self._hash1(item)
i2 = self._hash2(i1, fp)
return fp in self.buckets[i1] or fp in self.buckets[i2]
def delete(self, item: str) -> bool:
"""
删除元素。返回 True 表示删除成功。
注意:只有确实插入过的元素才应该被删除。
删除一个从未插入的元素可能导致假阴性!
"""
fp = self._fingerprint(item)
i1 = self._hash1(item)
i2 = self._hash2(i1, fp)
if fp in self.buckets[i1]:
self.buckets[i1].remove(fp)
self.count -= 1
return True
if fp in self.buckets[i2]:
self.buckets[i2].remove(fp)
self.count -= 1
return True
return False
# 演示删除能力
cf = CuckooFilter(capacity=10000, bucket_size=4, fingerprint_bits=12)
# 插入
for i in range(5000):
cf.insert(f"item_{i}")
print(f"插入后: 'item_42' in cf = {'item_42' in cf}") # True
# 删除
cf.delete("item_42")
print(f"删除后: 'item_42' in cf = {'item_42' in cf}") # False
# 误判率测试
false_positives = 0
tests = 10000
for i in range(tests):
if f"not_exist_{i}" in cf:
false_positives += 1
print(f"误判率: {false_positives/tests:.4f}")
# 理论误判率 ≈ 2 * bucket_size / 2^fingerprint_bits = 8/4096 ≈ 0.002
Cuckoo Filter vs Bloom Filter 对比
| 特性 | Bloom Filter | Cuckoo Filter |
|---|---|---|
| 删除操作 | 不支持 | 支持 |
| 查询速度 | 检查 k 个位置 | 只检查 2 个位置 |
| 空间效率(ε < 3%) | 较差 | 更好 |
| 空间效率(ε > 3%) | 更好 | 较差 |
| 插入(满载时) | 总是 O(1) | 可能需要踢出 |
| 实现复杂度 | 简单 | 中等 |
Level 3 · 规范怎么定义的
9.10 Burton Bloom 1970 年原始论文
Burton Howard Bloom 在 1970 年发表了论文 "Space/Time Trade-offs in Hash Coding with Allowable Errors"(Communications of the ACM, Vol. 13, No. 7, July 1970, pp. 422-426)。这篇仅 5 页的论文定义了整个领域。
原始动机
Bloom 的原始问题是:如何用有限内存判断一个英语单词是否在字典中(用于拼写检查或连字符位置查询)?当时的计算机内存极其有限,完整的字典需要太多空间。
Bloom 的关键洞察是:如果允许一小部分误判(把不在字典中的词判断为在字典中),可以用远小于原始数据的空间来解决成员判断问题。
论文核心贡献
- 提出了位向量 + 多哈希的架构:这个设计在 50 多年后仍然没有本质改变。
- 分析了最优参数选择:给出了误判率公式和最优哈希函数数量。
- 指出了 trade-off 的本质:更多内存 → 更低误判率;更多哈希函数 → 更慢的速度但更低的误判率(到某个最优点之后反而增加误判率)。
历史影响
Bloom Filter 被发明时是为了节省磁盘访问(那个年代磁盘 seek 极慢),后来被广泛应用于:
- Akamai CDN(判断 URL 是否被缓存)
- Google BigTable(减少不必要的磁盘读取)
- Apache Cassandra(判断 SSTable 中是否可能存在某个 key)
- Bitcoin SPV 节点(BIP 37, 轻量级交易过滤)
9.11 信息论下界
为什么布隆过滤器需要 $1.44 \cdot n \cdot \log_2(1/\varepsilon)$ 位?这不是巧合,而是接近信息论的极限。
信息论分析
考虑一个能回答"元素 x 是否在集合 S 中"的数据结构。集合 S 有 n 个元素,来自一个大小为 U 的全集。这个数据结构的信息论下界是多少?
如果允许误判率 ε,那么对于不在 S 中的任何元素,数据结构必须能区分"在 S 中"和"不在 S 中",允许 ε 的错误率。
Carter, Floyd, Gill, Markowsky & Wegman (1978) 证明了:
定理:任何支持集合成员查询、误判率不超过 ε 的数据结构,至少需要 $n \cdot \log_2(1/\varepsilon)$ 位。
布隆过滤器使用 $1.44 \cdot n \cdot \log_2(1/\varepsilon)$ 位,比下界多了 44%。这个 1.44 的因子来自于:
$$\frac{1}{(\ln 2)^2} = \frac{1}{0.4805} \approx 2.0814$$
而 $\frac{m}{n \cdot \log_2(1/\varepsilon)} = \frac{-\ln\varepsilon / (\ln 2)^2}{\log_2(1/\varepsilon)} = \frac{\ln(1/\varepsilon) / (\ln 2)^2}{\ln(1/\varepsilon) / \ln 2} = \frac{1}{\ln 2} \approx 1.44$
也就是说,布隆过滤器的空间效率是信息论最优的 1/1.44 ≈ 69.3%。
更紧凑的数据结构
有没有能达到信息论下界的数据结构?有的:
- Compressed Bloom Filter(Mitzenmacher, 2002):通过压缩位数组来接近下界(需要通信场景)。
- Ribbon Filter(Dillinger & Walzer, 2021):接近最优,空间利用率超过 99%。
- Xor Filter(Graf & Lemire, 2020):使用约 1.23 bits/element × log₂(1/ε),非常接近下界。
# 对比不同数据结构的空间效率
import math
def space_comparison(n, epsilon):
"""对比不同数据结构的理论空间需求"""
info_lower_bound = n * math.log2(1/epsilon) # 信息论下界
bloom_space = 1.44 * n * math.log2(1/epsilon) # 标准布隆过滤器
xor_space = 1.23 * n * math.log2(1/epsilon) # Xor Filter
print(f"n={n:,}, ε={epsilon}")
print(f" 信息论下界: {info_lower_bound/8/1024/1024:.2f} MB (100%)")
print(f" 布隆过滤器: {bloom_space/8/1024/1024:.2f} MB ({bloom_space/info_lower_bound*100:.1f}%)")
print(f" Xor Filter: {xor_space/8/1024/1024:.2f} MB ({xor_space/info_lower_bound*100:.1f}%)")
print()
space_comparison(10_000_000, 0.01)
space_comparison(10_000_000, 0.001)
space_comparison(1_000_000_000, 0.01)
9.12 Count-Min Sketch 的理论保证
Cormode & Muthukrishnan 2005 年的论文 "An Improved Data Stream Summary: The Count-Min Sketch and its Applications" 给出了严格的理论保证。
参数选择
给定误差参数 ε 和失败概率 δ:
- 宽度 $w = \lceil e/\varepsilon \rceil$
- 深度 $d = \lceil \ln(1/\delta) \rceil$
- 空间复杂度:$O(1/\varepsilon \cdot \log(1/\delta))$
误差保证
对于任意元素 x:
$$f(x) \leq \hat{f}(x) \leq f(x) + \varepsilon \cdot N$$
其中 $\hat{f}(x)$ 是估计频率,$f(x)$ 是真实频率,$N$ 是所有元素的总频率。这个保证以至少 $1-\delta$ 的概率成立。
为什么是单侧误差?
Count-Min Sketch 只会高估频率,不会低估。这是因为碰撞只会增加计数器的值。这个性质在很多应用中非常有用——例如,如果 CMS 说某个 IP 的访问频率没有超过阈值,那它一定没有超过。
Heavy Hitter 问题
Count-Min Sketch 的一个经典应用是找重击者(Heavy Hitters):在数据流中找出频率超过某个阈值的元素。
class HeavyHitterDetector:
"""
使用 Count-Min Sketch 检测重击者(频率超过 φ*N 的元素)
"""
def __init__(self, phi: float = 0.01, epsilon: float = 0.001, delta: float = 0.01):
"""
参数:
phi: 重击者阈值(频率超过 phi * N 的元素)
epsilon: CMS 误差参数(应远小于 phi)
delta: 失败概率
"""
self.phi = phi
self.cms = CountMinSketch(epsilon=epsilon, delta=delta)
self.candidates = set() # 候选重击者
def add(self, item: str):
self.cms.add(item)
# 如果估计频率超过阈值,加入候选集
threshold = self.phi * self.cms.total_count
if self.cms.estimate(item) >= threshold:
self.candidates.add(item)
def get_heavy_hitters(self) -> list:
"""返回所有可能的重击者及其估计频率"""
threshold = self.phi * self.cms.total_count
results = []
for item in self.candidates:
est = self.cms.estimate(item)
if est >= threshold:
results.append((item, est))
return sorted(results, key=lambda x: -x[1])
9.13 HyperLogLog 的理论基础
从 Flajolet-Martin 到 HyperLogLog
基数估计算法的发展历史:
- Flajolet-Martin (1985):"Probabilistic Counting Algorithms for Data Base Applications"。使用位模式观察,单个估计器方差很大。
- LogLog (2003):Durand & Flajolet。使用 m 个桶取算术平均,空间 O(log log n) per bucket。
- SuperLogLog (2003):截断异常值后取平均。
- HyperLogLog (2007):Flajolet, Fusy, Gandouet & Meunier。使用调和平均代替算术平均,精度提升到标准误差 $1.04/\sqrt{m}$。
为什么用调和平均而不是算术平均?
假设 m 个桶的值为 $M_1, M_2, \ldots, M_m$:
-
算术平均估计器:$E_{AM} = 2^{\frac{1}{m}\sum M_i}$
- 问题:对异常大值极其敏感(一个桶的值异常大会严重高估)
-
调和平均估计器:$E_{HM} = \frac{\alpha_m \cdot m^2}{\sum 2^{-M_i}}$
- 优势:对异常大值有天然的"降权"效果($2^{-M_i}$ 在 M_i 大时趋近 0)
- 这就是 HyperLogLog 的核心改进
修正因子 α_m
修正因子 $\alpha_m$ 用于消除估计器的偏差:
$$\alpha_m = \left(m \int_0^\infty \left(\log_2\frac{2+u}{1+u}\right)^m du\right)^{-1}$$
当 m 较大时,$\alpha_m \to 1/(2\ln 2) \approx 0.7213$。
Redis 中 p=14 对应的 $\alpha_{16384}$ 取值为 0.7213/(1+1.079/16384) ≈ 0.72125。
9.14 Redis 中的 HyperLogLog 实现
Redis 的 HyperLogLog 实现(由 Salvatore Sanfilippo 即 Antirez 编写)有几个重要的工程优化:
稀疏表示与密集表示
Redis 使用两种编码方式:
-
稀疏编码(小基数时):使用游程编码(RLE)压缩连续的零值桶。当大部分桶为 0 时,稀疏表示远小于 12KB。
-
密集编码(大基数时):16384 个桶,每个 6 位,总计 12,288 字节。当稀疏编码超过一定阈值(server.hll_sparse_max_bytes,默认 3000 字节)或桶值超过 32 时,自动转换为密集编码。
# Redis HLL 的稀疏编码示意
class RedisHLLSparse:
"""
Redis HLL 稀疏编码的简化模型
三种操作码:
- ZERO(len): len 个连续的零值桶 (1 byte, len <= 64)
- XZERO(len): 更长的连续零值桶 (2 bytes, len <= 16384)
- VAL(value, len): len 个连续的相同非零值桶 (1 byte, value <= 32, len <= 4)
"""
def __init__(self):
# 初始状态:16384 个零值桶 = 一个 XZERO(16384) 操作码
self.encoding = [("XZERO", 16384)]
self.is_dense = False
def estimated_memory(self):
"""估计当前内存使用"""
total = 0
for op in self.encoding:
if op[0] == "ZERO":
total += 1
elif op[0] == "XZERO":
total += 2
elif op[0] == "VAL":
total += 1
return total
偏差修正(Bias Correction)
Redis 实现了 Google 提出的改进修正策略(Heule, Nunkesser & Hall, 2013, "HyperLogLog in Practice"):
- 小基数(estimate ≤ 5m):使用预计算的偏差修正表进行插值。
- 中等基数:直接使用原始 HyperLogLog 估计。
- 大基数(接近 2^64):使用对数修正。
Level 4 · 边界与陷阱
9.15 布隆过滤器不能删除元素的深入分析
为什么不能删除?
考虑两个元素 A 和 B,它们的哈希位置有重叠:
- A 的哈希位置: {3, 7, 11}
- B 的哈希位置: {5, 7, 14}
位置 7 是共享的。如果删除 A(将位置 3, 7, 11 清零),位置 7 被清零会导致查询 B 时返回 False——B 明明存在,却说不存在。这引入了假阴性,破坏了布隆过滤器"不存在则一定不存在"的核心保证。
解决方案比较
| 方案 | 原理 | 优点 | 缺点 |
|---|---|---|---|
| Counting Bloom Filter | 每个位换成计数器 | 简单直觉 | 空间增大 3-4 倍 |
| Cuckoo Filter | 存指纹+布谷鸟哈希 | 空间效率好 | 实现复杂 |
| Quotient Filter | 商余数存储 | 缓存友好 | 聚集问题 |
| 重建 | 定期从源数据重建 | 无额外空间 | 重建开销大 |
class CountingBloomFilter:
"""
计数布隆过滤器:每个位变成一个 4-bit 计数器
支持删除操作,但空间开销是标准布隆过滤器的 4 倍。
计数器溢出(>15)时停止递增(饱和计数),可能导致假阴性
但这在实践中极其罕见。
"""
def __init__(self, expected_items: int, false_positive_rate: float = 0.01):
import math
self.m = int(math.ceil(-expected_items * math.log(false_positive_rate)
/ (math.log(2) ** 2)))
self.k = int(math.ceil((self.m / expected_items) * math.log(2)))
# 4-bit 计数器,2 个计数器共用 1 字节
self.counters = bytearray(math.ceil(self.m / 2))
def _get_counter(self, pos: int) -> int:
byte_idx = pos // 2
if pos % 2 == 0:
return self.counters[byte_idx] & 0x0F
else:
return (self.counters[byte_idx] >> 4) & 0x0F
def _set_counter(self, pos: int, value: int):
byte_idx = pos // 2
if pos % 2 == 0:
self.counters[byte_idx] = (self.counters[byte_idx] & 0xF0) | (value & 0x0F)
else:
self.counters[byte_idx] = (self.counters[byte_idx] & 0x0F) | ((value & 0x0F) << 4)
def _hash_positions(self, item: str) -> list:
import mmh3
h1 = mmh3.hash(item, seed=0) % self.m
h2 = mmh3.hash(item, seed=42) % self.m
return [(h1 + i * h2) % self.m for i in range(self.k)]
def add(self, item: str):
for pos in self._hash_positions(item):
val = self._get_counter(pos)
if val < 15: # 饱和计数,防止溢出
self._set_counter(pos, val + 1)
def remove(self, item: str) -> bool:
"""删除元素。返回 False 如果元素不存在。"""
positions = self._hash_positions(item)
# 先检查是否存在
if not all(self._get_counter(pos) > 0 for pos in positions):
return False
# 所有计数器减 1
for pos in positions:
val = self._get_counter(pos)
if val < 15: # 饱和的计数器不减(安全措施)
self._set_counter(pos, val - 1)
return True
def __contains__(self, item: str) -> bool:
return all(self._get_counter(pos) > 0 for pos in self._hash_positions(item))
9.16 面试题:设计一个 URL 去重系统
题目:设计一个系统,判断爬虫即将抓取的 URL 是否已经被抓取过。预期 URL 量为 100 亿,要求低延迟、可扩展。
分析框架
Step 1: 明确需求
- 100 亿个 URL,假设平均长度 100 字节
- 精确存储需要 ~1TB 内存,不可行
- 允许极低误判率(漏抓一些 URL 可接受)
- 不需要删除(已抓 URL 不会"未抓")
- 需要分布式(单机 1TB 不现实)
Step 2: 方案对比
import math
def memory_analysis():
"""各方案的内存分析"""
n = 10_000_000_000 # 100 亿 URL
# 方案1:HashSet
# 每个 URL hash 64位 + overhead,约 16 字节/元素
hashset_mem = n * 16 / (1024**3)
print(f"HashSet: {hashset_mem:.0f} GB")
# 方案2:布隆过滤器 (0.1% 误判率)
bloom_bits = -n * math.log(0.001) / (math.log(2)**2)
bloom_mem = bloom_bits / 8 / (1024**3)
print(f"Bloom Filter (0.1% FP): {bloom_mem:.1f} GB")
# 方案3:布隆过滤器 (1% 误判率)
bloom_bits_1 = -n * math.log(0.01) / (math.log(2)**2)
bloom_mem_1 = bloom_bits_1 / 8 / (1024**3)
print(f"Bloom Filter (1% FP): {bloom_mem_1:.1f} GB")
memory_analysis()
# HashSet: 149 GB
# Bloom Filter (0.1% FP): 17.9 GB
# Bloom Filter (1% FP): 11.9 GB
Step 3: 分布式架构
┌─────────────────────────────────────────────┐
│ URL 去重系统架构 │
├─────────────────────────────────────────────┤
│ │
│ 新 URL ──→ Hash(URL) % N ──→ 路由到节点 │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Node 0 │ │ Node 1 │ │ Node N │ │
│ │ BF 2GB │ │ BF 2GB │ │ BF 2GB │ │
│ │(12.5亿URL)│ │(12.5亿URL)│ │(12.5亿URL)│ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ 持久化层: 定期将位数组写入磁盘/对象存储 │
│ 恢复: 启动时从持久化加载位数组 │
└─────────────────────────────────────────────┘
Step 4: 完整回答要点
"""
面试回答框架:
1. 容量估算
- 100 亿 URL, 0.1% 误判率
- 布隆过滤器: 约 18 GB
- 分 8 个节点,每个节点约 2.2 GB
2. 分片策略
- URL 哈希取模,决定去哪个节点
- 一致性哈希(如果节点需要动态扩缩)
3. 持久化
- 位数组定期 checkpoint 到磁盘
- 增量日志:记录每个新插入操作
- 恢复时加载 checkpoint + replay 增量日志
4. 容错
- 主从复制:每个节点一个从节点
- 位数组丢失后从 checkpoint 恢复
5. 扩容
- 方案A:分层布隆过滤器(旧数据在 BF1,新数据在 BF2)
- 方案B:一致性哈希 + 迁移
6. 监控
- 监控实际误判率(抽样验证)
- 当元素数接近设计容量时报警
7. 优化
- 本地内存 + Redis 结合(热点 URL 本地缓存)
- 分层过滤:先查本地小 BF,miss 再查远程大 BF
"""
9.17 误判率与内存的工程权衡
在真实系统中,布隆过滤器的参数选择需要考虑多个维度:
def engineering_tradeoffs():
"""工程中的参数选择决策树"""
scenarios = [
{
"场景": "CDN URL 缓存判断",
"n": 100_000_000,
"误判代价": "一次额外的回源请求(成本低)",
"推荐误判率": 0.01,
"理由": "误判只是多一次回源,1%完全可以接受"
},
{
"场景": "恶意 URL 黑名单",
"n": 10_000_000,
"误判代价": "正常网站被标记为恶意(用户投诉)",
"推荐误判率": 0.0001,
"理由": "误判=误封,用户体验差,需要极低误判率"
},
{
"场景": "数据库 key 存在性检查",
"n": 50_000_000,
"误判代价": "一次无用的磁盘读取",
"推荐误判率": 0.001,
"理由": "磁盘IO成本中等,0.1%是好的平衡点"
},
{
"场景": "去重(爬虫/消息)",
"n": 1_000_000_000,
"误判代价": "漏处理一些URL/消息",
"推荐误判率": 0.001,
"理由": "漏抓少量URL可接受,但太多则影响覆盖率"
}
]
for s in scenarios:
m = -s["n"] * math.log(s["推荐误判率"]) / (math.log(2)**2)
k = int(math.ceil((m/s["n"]) * math.log(2)))
mem_mb = m / 8 / 1024 / 1024
print(f"场景: {s['场景']}")
print(f" 元素数: {s['n']:,}")
print(f" 误判率: {s['推荐误判率']}")
print(f" 内存: {mem_mb:.1f} MB")
print(f" 哈希数: {k}")
print(f" 理由: {s['理由']}")
print()
engineering_tradeoffs()
常见陷阱
陷阱 1:元素数超过设计容量
def capacity_overflow_demo():
"""展示超容后的误判率恶化"""
bf = BloomFilter(expected_items=10000, false_positive_rate=0.01)
for filled_ratio in [0.5, 1.0, 1.5, 2.0, 3.0]:
# 重新创建并填充
bf2 = BloomFilter(expected_items=10000, false_positive_rate=0.01)
actual_items = int(10000 * filled_ratio)
for i in range(actual_items):
bf2.add(f"item_{i}")
# 测试误判率
fp = 0
tests = 10000
for i in range(tests):
if f"not_exist_{i}" in bf2:
fp += 1
print(f"填充率 {filled_ratio:.0%} ({actual_items:,} items): "
f"实测误判率 = {fp/tests:.4f}")
# 典型输出:
# 填充率 50%: 实测误判率 = 0.0001
# 填充率 100%: 实测误判率 = 0.0098 (接近设计值 1%)
# 填充率 150%: 实测误判率 = 0.0500 (5%!)
# 填充率 200%: 实测误判率 = 0.1400 (14%!)
# 填充率 300%: 实测误判率 = 0.3700 (37%! 几乎无用)
结论:超容 50% 误判率就从 1% 涨到 5%,超容 2 倍达到 14%。设计容量必须留余量。推荐做法:设计容量设为预期最大值的 1.5-2 倍。
陷阱 2:哈希函数质量不够
"""
错误示范:用简单的哈希函数
"""
# 坏的哈希函数:位分布不均匀
def bad_hash(item, seed):
return (hash(item) + seed) % 1000 # hash() + 偏移不是独立哈希!
# 好的哈希函数:使用双重哈希技术
def good_hash(item, i, m):
import mmh3
h1 = mmh3.hash(item, seed=0)
h2 = mmh3.hash(item, seed=42)
return (h1 + i * h2) % m # Kirsch-Mitzenmacher (2006)
为什么 hash(item) + seed 不行? 因为这只是简单的偏移,不同 seed 产生的哈希值高度相关(差值恒定),违反了布隆过滤器要求哈希函数"独立"的假设。Kirsch & Mitzenmacher 2006 年证明了双重哈希 $g_i(x) = h_1(x) + i \cdot h_2(x)$ 在理论上与 k 个完全独立的哈希函数性能相同。
陷阱 3:并发写入的线程安全
import threading
class ThreadSafeBloomFilter:
"""
线程安全的布隆过滤器
注意:标准布隆过滤器的 add 操作不是原子的
(需要设置 k 个位,中间状态可能被其他线程观察到)
但这对正确性没有影响——最坏情况只是暂时的假阴性,
add 完成后就恢复正常。
真正的问题是位数组的并发写入可能导致更新丢失
(read-modify-write 不是原子的)。
"""
def __init__(self, expected_items, false_positive_rate=0.01):
self._bf = BloomFilter(expected_items, false_positive_rate)
self._lock = threading.Lock()
def add(self, item: str):
# 方案1:全局锁(简单但性能差)
with self._lock:
self._bf.add(item)
def __contains__(self, item: str) -> bool:
# 查询不需要加锁(只读操作)
# 但可能看到部分更新的状态(对布隆过滤器来说这是安全的)
return item in self._bf
# 更高性能的方案:分段锁
class StripedBloomFilter:
"""使用分段锁减少竞争"""
def __init__(self, expected_items, false_positive_rate=0.01, num_stripes=16):
self._bf = BloomFilter(expected_items, false_positive_rate)
self._stripes = [threading.Lock() for _ in range(num_stripes)]
def _get_stripe(self, position: int) -> threading.Lock:
return self._stripes[position % len(self._stripes)]
def add(self, item: str):
positions = self._bf._get_hash_positions(item)
# 按顺序获取锁,避免死锁
unique_stripes = sorted(set(p % len(self._stripes) for p in positions))
for s in unique_stripes:
self._stripes[s].acquire()
try:
self._bf.add(item)
finally:
for s in unique_stripes:
self._stripes[s].release()
9.18 Redis 布隆过滤器模块(RedisBloom)
Redis 原生不支持布隆过滤器,但 RedisBloom 模块(原 ReBloom)提供了生产级实现。
import redis
# 需要 Redis 加载了 RedisBloom 模块
r = redis.Redis()
# BF.RESERVE: 创建布隆过滤器
# BF.RESERVE key error_rate capacity [EXPANSION expansion] [NONSCALING]
r.execute_command('BF.RESERVE', 'urls:seen', 0.001, 10000000)
# 创建一个容量 1000万、误判率 0.1% 的布隆过滤器
# BF.ADD: 添加元素
r.execute_command('BF.ADD', 'urls:seen', 'https://example.com/page1')
# BF.MADD: 批量添加
r.execute_command('BF.MADD', 'urls:seen',
'https://example.com/page2',
'https://example.com/page3')
# BF.EXISTS: 查询
exists = r.execute_command('BF.EXISTS', 'urls:seen', 'https://example.com/page1')
print(f"exists: {exists}") # 1 (可能存在)
not_exists = r.execute_command('BF.EXISTS', 'urls:seen', 'https://never-added.com')
print(f"not_exists: {not_exists}") # 0 (一定不存在)
# BF.MEXISTS: 批量查询
results = r.execute_command('BF.MEXISTS', 'urls:seen',
'https://example.com/page1',
'https://never-added.com')
print(f"batch results: {results}") # [1, 0]
# BF.INFO: 查看过滤器信息
info = r.execute_command('BF.INFO', 'urls:seen')
print(f"info: {info}")
# 输出包括:Capacity, Size, Number of filters, Number of items inserted, Expansion rate
RedisBloom 的自动扩容机制
"""
RedisBloom 的 EXPANSION 参数允许自动扩容:
当第一个子过滤器满了,自动创建第二个容量为原来 expansion 倍的子过滤器。
查询时需要检查所有子过滤器(任一个返回"存在"则结果为"存在")。
代价:
- 查询变慢(需要查多个子过滤器)
- 误判率略微增加
- 但避免了因超容导致的误判率急剧恶化
默认 expansion=2,即每次扩容容量翻倍。
"""
# 创建可扩容的布隆过滤器
r.execute_command('BF.RESERVE', 'scalable:bf',
0.01, # 初始误判率
1000000, # 初始容量
'EXPANSION', 2) # 扩容倍数
# 如果不需要扩容(固定容量,性能更好)
r.execute_command('BF.RESERVE', 'fixed:bf',
0.01,
10000000,
'NONSCALING') # 超容后 BF.ADD 返回错误
生产环境最佳实践
"""
Redis 布隆过滤器的生产最佳实践:
1. 容量规划
- 预估峰值数据量的 2 倍作为 capacity
- 或使用 EXPANSION 自动扩容
2. 误判率选择
- 缓存穿透: 0.01 (1%)
- URL 去重: 0.001 (0.1%)
- 安全黑名单: 0.0001 (0.01%)
3. 持久化
- RedisBloom 数据随 Redis RDB/AOF 持久化
- 大过滤器的 RDB 保存可能导致延迟抖动
4. 内存估算
- BF.DEBUG 可查看实际内存使用
- 经验公式: n * (-ln(ε) / (ln2)^2) / 8 字节
5. 监控指标
- 已插入元素数 vs 设计容量
- 实际误判率(定期抽样测试)
- 子过滤器数量(扩容次数)
"""
class BloomFilterManager:
"""Redis 布隆过滤器管理器"""
def __init__(self, redis_client, key_prefix="bf:"):
self.r = redis_client
self.prefix = key_prefix
def create_if_not_exists(self, name: str, error_rate: float,
capacity: int, expansion: int = 2):
"""安全创建(幂等)"""
key = f"{self.prefix}{name}"
try:
self.r.execute_command('BF.INFO', key)
except redis.ResponseError:
self.r.execute_command('BF.RESERVE', key,
error_rate, capacity,
'EXPANSION', expansion)
def check_health(self, name: str) -> dict:
"""检查过滤器健康状态"""
key = f"{self.prefix}{name}"
info = self.r.execute_command('BF.INFO', key)
# 解析 info(返回格式因版本而异)
info_dict = {}
for i in range(0, len(info), 2):
info_dict[info[i].decode()] = info[i+1]
capacity = info_dict.get('Capacity', 0)
inserted = info_dict.get('Number of items inserted', 0)
fill_ratio = inserted / capacity if capacity > 0 else 0
status = "healthy"
if fill_ratio > 0.9:
status = "critical" # 即将满载
elif fill_ratio > 0.7:
status = "warning" # 接近容量
return {
"key": key,
"capacity": capacity,
"inserted": inserted,
"fill_ratio": fill_ratio,
"status": status
}
9.19 概率数据结构的统一视角
布隆过滤器、Count-Min Sketch、HyperLogLog 看似解决不同问题,但它们共享一个核心思想:
用哈希将数据映射到紧凑的摘要结构中,牺牲少量精确性,换取数量级的空间节省。
| 数据结构 | 回答的问题 | 误差类型 | 空间复杂度 |
|---|---|---|---|
| Bloom Filter | x ∈ S? | 假阳性 | O(n) bits |
| Count-Min Sketch | freq(x) = ? | 只高估 | O(1/ε · log(1/δ)) |
| HyperLogLog | S | = ? | |
| Cuckoo Filter | x ∈ S? + delete | 假阳性 | O(n) bits |
| MinHash | Jaccard(A,B) = ? | ±相对误差 | O(1/ε²) |
它们的共同特点:
- 单向性:可以插入,但信息被压缩,原始数据不可恢复。
- 可合并性:两个摘要可以合并为一个(支持分布式计算)。
- 误差可控:用户可以通过增加空间来降低误差。
- 流式处理:只需要扫描数据一次,不需要随机访问。
这些性质使得概率数据结构特别适合大数据流处理场景——数据量太大无法精确存储,但近似答案已经足够做出正确的工程决策。
本章小结
| 概念 | 核心 | 空间 | 误差 | 典型应用 |
|---|---|---|---|---|
| Bloom Filter | 位数组 + k 哈希 | ~10 bits/elem @1% FP | 假阳性 | 缓存穿透、URL去重 |
| Count-Min Sketch | d×w 计数矩阵 | O(1/ε · log(1/δ)) | 只高估 | 流量统计、热点检测 |
| HyperLogLog | 前导零观察 | ~12KB (Redis) | ±0.81% | 独立访客数 |
| Cuckoo Filter | 指纹 + 布谷鸟哈希 | ~12 bits/elem @0.1% FP | 假阳性 | 需要删除的场景 |
核心思想:在精确性和空间之间找到最优的工程平衡点。当你面对"TB 级数据做 membership/frequency/cardinality 查询"的问题时,概率数据结构几乎总是正确答案。