第 34 章

缓存穿透、击穿与雪崩:原理与解决方案

第34章 缓存穿透、击穿与雪崩

缓存三大问题是 Redis 工程实践中最高频被问到、最容易在生产环境酿成事故的知识点。本章系统梳理穿透、击穿、雪崩的成因、危害、检测方式与解决方案,并给出可直接落地的代码实现。


34.1 缓存穿透(Cache Penetration)

34.1.1 定义与成因

缓存穿透:请求的 key 在缓存中不存在,且在数据库中也不存在,导致每次请求都直接打到数据库。

正常流量:
  request(user:1001) → Redis HIT → 返回缓存值

穿透流量:
  request(user:-1) → Redis MISS → DB 查询 → 未找到 → 不写缓存 → 下次仍然穿透

触发场景

危害:每次穿透请求直接打到数据库,高并发下等同于 DB 压测,可致数据库崩溃。

34.1.2 解决方案1:缓存空值

将数据库中"未找到"的结果以短时间 TTL 缓存到 Redis。

import redis
import json

r = redis.Redis(decode_responses=True)
NULL_PLACEHOLDER = '__NULL__'

def get_user(user_id: int):
    key = f'user:{user_id}'
    cached = r.get(key)

    if cached is not None:
        if cached == NULL_PLACEHOLDER:
            return None          # 命中空值缓存,直接返回 None
        return json.loads(cached)

    # 缓存未命中,查数据库
    user = db.query_user(user_id)

    if user is None:
        # 缓存空值,TTL 设短(5分钟),防止占用大量内存
        r.setex(key, 300, NULL_PLACEHOLDER)
        return None

    # 缓存真实数据,TTL 设长(1小时)
    r.setex(key, 3600, json.dumps(user))
    return user

缺点与注意事项

34.1.3 解决方案2:布隆过滤器(Bloom Filter)

布隆过滤器是一个概率型数据结构:可以快速判断"一个元素一定不在集合中"或"可能在集合中"。不会有漏报(false negative),只有误报(false positive)

原理:k 个哈希函数将元素映射到 m 位的 bit 数组,置对应位为 1。查询时,若任一位为 0 则元素一定不存在;若全部为 1 则可能存在(有误判率)。

误判率公式(k 个哈希函数,m 位,n 个元素):

FPR ≈ (1 - e^(-kn/m))^k

最优哈希函数数量:k = (m/n) × ln(2)

位数组大小计算(给定误判率 p,元素数 n):

m = -n × ln(p) / (ln(2))^2
n(元素数) p(误判率) m(bit 数) 内存
100万 1% 9,585,059 位 1.1 MB
1000万 0.1% 143,775,879 位 17 MB
1亿 0.01% 1,917,011,718 位 228 MB

Redisson 布隆过滤器实现

// 初始化(服务启动时)
RBloomFilter<Long> bloomFilter = redisson.getBloomFilter("known_user_ids");
// tryInit 是幂等的,重复调用不报错
bloomFilter.tryInit(10_000_000L, 0.001); // 1000万元素,0.1%误判率

// 预热:将所有合法 ID 加入过滤器
try (Stream<Long> ids = userRepository.streamAllIds()) {
    ids.forEach(bloomFilter::add);
}
log.info("Bloom filter warmed up, approximate count: {}", bloomFilter.count());

// 请求处理
public User getUser(long userId) {
    // 布隆过滤器判断:不存在则 100% 确定不存在
    if (!bloomFilter.contains(userId)) {
        return null;  // 直接拒绝,不查 DB
    }
    // 可能存在(含0.1%误判),继续查缓存/DB
    String cached = redis.get("user:" + userId);
    if (cached != null) return deserialize(cached);

    User user = db.findById(userId);
    if (user != null) {
        redis.setex("user:" + userId, 3600, serialize(user));
    }
    return user;
}

// 新增用户时同步更新布隆过滤器
public User createUser(User user) {
    User saved = userRepository.save(user);
    bloomFilter.add(saved.getId());
    return saved;
}

redis-py 布隆过滤器(RedisBloom 模块 / Redis Stack):

from redis.commands.bf import BFCommands

r = redis.Redis()

# 创建布隆过滤器
r.bf().create('known_users', 0.001, 10_000_000)

# 批量添加
user_ids = db.get_all_user_ids()
pipe = r.pipeline(transaction=False)
for i, uid in enumerate(user_ids):
    pipe.bf().add('known_users', uid)
    if i % 1000 == 0:
        pipe.execute()
        pipe = r.pipeline(transaction=False)
pipe.execute()

# 请求处理
def get_user(user_id):
    if not r.bf().exists('known_users', user_id):
        return None  # 一定不存在
    return fetch_from_cache_or_db(user_id)

34.2 缓存击穿(Cache Breakdown / Hotspot Invalidation)

34.2.1 定义与场景

缓存击穿:单个热点 key 在过期瞬间,大量并发请求同时发现 Cache Miss,全部穿透到数据库,造成数据库瞬间过载。

时间线:
  t=0: 10万并发请求 key = "product:SKU001:detail"
  t=1: key 恰好在 t=0 时过期
  → 10万请求同时 Cache Miss
  → 10万请求同时查询 DB
  → DB 崩溃或超时

与穿透的区别:

34.2.2 解决方案1:互斥锁(Mutex Lock)

只让一个线程查询数据库并重建缓存,其他线程等待:

import redis
import time
import json

r = redis.Redis(decode_responses=True)

def get_product_detail(sku_id: str) -> dict:
    key = f'product:{sku_id}:detail'
    cached = r.get(key)
    if cached:
        return json.loads(cached)

    # Cache Miss — 尝试获取互斥锁
    lock_key = f'lock:{key}'
    lock_acquired = r.set(lock_key, 1, nx=True, ex=5)  # 最多持锁5秒

    if lock_acquired:
        try:
            # 双重检查:可能在等锁期间其他线程已重建缓存
            cached = r.get(key)
            if cached:
                return json.loads(cached)

            product = db.query_product(sku_id)
            if product:
                r.setex(key, 3600, json.dumps(product))
            return product
        finally:
            r.delete(lock_key)
    else:
        # 未获得锁,短暂等待后重试
        time.sleep(0.05)
        return get_product_detail(sku_id)  # 递归重试(注意:需防无限递归)
func getProductDetail(ctx context.Context, rdb *redis.Client, skuID string) (*Product, error) {
    key := "product:" + skuID + ":detail"
    lockKey := "lock:" + key

    // 检查缓存
    cached, err := rdb.Get(ctx, key).Bytes()
    if err == nil {
        var p Product
        json.Unmarshal(cached, &p)
        return &p, nil
    }

    // 尝试获取分布式锁(NX + 5秒超时)
    ok, err := rdb.SetNX(ctx, lockKey, 1, 5*time.Second).Result()
    if err != nil {
        return nil, err
    }

    if ok {
        defer rdb.Del(ctx, lockKey)

        // 双重检查
        if cached, err = rdb.Get(ctx, key).Bytes(); err == nil {
            var p Product
            json.Unmarshal(cached, &p)
            return &p, nil
        }

        product, err := db.QueryProduct(skuID)
        if err != nil {
            return nil, err
        }
        data, _ := json.Marshal(product)
        rdb.SetEx(ctx, key, data, time.Hour)
        return product, nil
    }

    // 等待重试
    time.Sleep(50 * time.Millisecond)
    return getProductDetail(ctx, rdb, skuID)
}

互斥锁的问题:等待锁期间请求被阻塞,高并发下会形成"锁等待队列",可能引发雪崩效应(所有等待线程同时释放后再次打到 DB)。

34.2.3 解决方案2:逻辑过期(Logical Expiration)

不在 Redis 中设置 TTL,而是在 value 内部嵌入一个"逻辑过期时间"字段。缓存永不被 Redis 自动删除,读到"逻辑过期"数据时异步触发刷新,对调用方返回旧值(牺牲强一致性换取高可用)。

import json
import time
import threading
from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class CacheEntry:
    value: Any
    expire_at: float  # unix timestamp

r = redis.Redis(decode_responses=True)
_refresh_locks = {}  # 进程内锁

def get_with_logical_expire(key: str, loader_fn, ttl: int = 3600) -> Optional[Any]:
    raw = r.get(key)
    if raw is None:
        # 冷启动:同步加载
        data = loader_fn()
        entry = CacheEntry(value=data, expire_at=time.time() + ttl)
        r.set(key, json.dumps(entry.__dict__))  # 永不过期
        return data

    entry_dict = json.loads(raw)
    entry = CacheEntry(**entry_dict)

    if entry.expire_at > time.time():
        return entry.value  # 未逻辑过期,直接返回

    # 逻辑过期——异步刷新,返回旧值
    lock = _refresh_locks.setdefault(key, threading.Lock())
    if lock.acquire(blocking=False):  # 非阻塞尝试获取进程内锁
        def refresh():
            try:
                new_data = loader_fn()
                new_entry = CacheEntry(value=new_data, expire_at=time.time() + ttl)
                r.set(key, json.dumps(new_entry.__dict__))
            finally:
                lock.release()
        threading.Thread(target=refresh, daemon=True).start()

    return entry.value  # 立即返回旧值,不阻塞

# 使用
product = get_with_logical_expire(
    key=f'product:{sku_id}:detail',
    loader_fn=lambda: db.query_product(sku_id),
    ttl=3600
)

逻辑过期 vs 互斥锁对比

互斥锁 逻辑过期
一致性 强(刷新完成前阻塞) 最终一致(可能读到旧值)
可用性 刷新期间请求可能超时 始终可用(返回旧值)
复杂度 中等 高(需异步刷新机制)
适用场景 强一致要求 高可用/高并发热点数据

34.3 缓存雪崩(Cache Avalanche)

34.3.1 定义与两种成因

缓存雪崩:大量缓存在同一时刻失效(或 Redis 完全不可用),所有请求同时打到数据库,数据库在极短时间内承受巨大压力。

成因1:集中失效(大量 key TTL 相同,同时过期):

# 反模式:所有 key 使用相同 TTL
for product in products:
    r.setex(f'product:{product.id}', 3600, serialize(product))  # 1小时后集体过期

成因2:Redis 实例宕机(单点故障或网络分区导致 Redis 不可用)。

34.3.2 解决方案1:随机 TTL 打散过期时间

import random

def cache_product(product_id: int, data: dict, base_ttl: int = 3600):
    # 在基础 TTL 上加随机抖动(±10%),打散过期时间分布
    jitter = random.randint(-360, 360)   # ±10%
    actual_ttl = base_ttl + jitter
    r.setex(f'product:{product_id}', actual_ttl, json.dumps(data))
// Java 版本
int jitter = ThreadLocalRandom.current().nextInt(-360, 361);
jedis.setex(key, baseTtl + jitter, value);

更精细的 TTL 策略(按 key 热度差异化):

34.3.3 解决方案2:Redis 高可用部署

Sentinel 模式:3个(或5个)哨兵监控主节点,主节点故障后30–60秒内自动完成故障转移。期间业务受影响(新主节点选出前请求失败)。

Cluster 模式:16384 个槽分散到多个主节点,单个主节点宕机只影响该分片(约 1/N 的 key),其他分片继续服务。

双写多活(跨 IDC):在两个独立数据中心各部署一套 Redis,使用异步复制同步数据,任一 IDC 故障不影响全局可用性(需处理数据一致性)。

34.3.4 解决方案3:多级缓存

请求 → L1(进程内 Caffeine/Guava Cache)→ L2(Redis)→ DB

L1 缓存(进程内):延迟 < 0.01ms,无网络开销,但容量小(通常几百MB)、不跨进程共享

@Configuration
public class CacheConfig {

    @Bean
    public Cache<String, Object> localCache() {
        return Caffeine.newBuilder()
            .maximumSize(10_000)
            .expireAfterWrite(Duration.ofMinutes(5))
            .recordStats()      // 开启命中率统计
            .build();
    }
}

@Service
public class ProductService {
    @Autowired private Cache<String, Object> localCache;
    @Autowired private StringRedisTemplate redisTemplate;

    public Product getProduct(String id) {
        String key = "product:" + id;

        // L1 查询
        Product cached = (Product) localCache.getIfPresent(key);
        if (cached != null) return cached;

        // L2 查询
        String json = redisTemplate.opsForValue().get(key);
        if (json != null) {
            Product product = deserialize(json);
            localCache.put(key, product);  // 回填 L1
            return product;
        }

        // DB 查询
        Product product = productRepository.findById(id).orElse(null);
        if (product != null) {
            redisTemplate.opsForValue().set(key, serialize(product), Duration.ofHours(1));
            localCache.put(key, product);
        }
        return product;
    }
}

L1 缓存失效同步(Redis 宕机时 L1 撑住部分流量):

L1 过期时间通常设为 L2 的 1/12(5分钟 vs 1小时),L1 过期后请求 L2,L2 不可用时降级直查 DB(结合限流保护)。

34.3.5 解决方案4:熔断与限流

当 Redis 不可用时,不应将全量请求打到数据库。配合熔断器(Circuit Breaker)和限流器:

# 使用 pybreaker 实现熔断
from pybreaker import CircuitBreaker

db_breaker = CircuitBreaker(fail_max=5, reset_timeout=30)

@db_breaker
def query_db(key):
    return db.get(key)

def get_data(key):
    # 先查 Redis
    try:
        val = r.get(key)
        if val:
            return val
    except redis.RedisError:
        pass  # Redis 不可用,跌入 DB

    # 熔断保护的 DB 查询
    try:
        return query_db(key)
    except pybreaker.CircuitBreakerError:
        # 熔断开启,DB 已过载,返回降级响应
        return get_degraded_response(key)
// Spring Boot + Resilience4j
@CircuitBreaker(name = "redis", fallbackMethod = "getFromDb")
public String getFromCache(String key) {
    return redisTemplate.opsForValue().get(key);
}

@RateLimiter(name = "db-fallback", fallbackMethod = "getDegraded")
public String getFromDb(String key, Exception ex) {
    return dbRepository.findByKey(key);
}

public String getDegraded(String key, Exception ex) {
    return "default-response";  // 降级兜底
}

34.4 三种问题横向对比

维度 缓存穿透 缓存击穿 缓存雪崩
触发条件 查询不存在的 key 热点 key 过期瞬间高并发 大量 key 同时过期 / Redis 宕机
影响范围 单个不存在的 key 单个热点 key 全局大量 key
对 DB 的冲击 持续小流量穿透 瞬间大流量峰值 瞬间全量流量涌入
首选方案 布隆过滤器 逻辑过期 / 互斥锁 随机 TTL + 多级缓存
兜底方案 缓存空值 互斥锁(降级) 熔断 + 限流 + 降级

34.5 监控与告警

生产环境中应针对三种问题设置告警:

# 缓存命中率下降告警(穿透/雪崩早期信号)
redis-cli INFO stats | grep -E 'keyspace_hits|keyspace_misses'
# 计算 hit_rate = hits / (hits + misses),低于 80% 告警

# 连接数激增告警(雪崩时 DB 连接池耗尽的前兆)
redis-cli INFO clients | grep connected_clients

# 内存使用告警(缓存空值过多导致内存膨胀)
redis-cli INFO memory | grep used_memory_human

Prometheus + Grafana 指标

# prometheus.yml 中配置 redis_exporter
scrape_configs:
  - job_name: redis
    static_configs:
      - targets: ['localhost:9121']

# 关键告警规则
- alert: RedisCacheMissRatioHigh
  expr: rate(redis_keyspace_misses_total[5m]) /
        (rate(redis_keyspace_hits_total[5m]) + rate(redis_keyspace_misses_total[5m])) > 0.3
  for: 2m
  labels:
    severity: warning
  annotations:
    summary: "Redis cache miss ratio > 30%"

- alert: RedisConnectionSurge
  expr: redis_connected_clients > 2000
  for: 1m
  labels:
    severity: critical
本章评分
4.5  / 5  (3 评分)

💬 留言讨论