第 30 章

Key 设计与数据建模:避开所有常见陷阱

第30章 Key 设计与数据建模:避开所有常见陷阱

30.1 Key 设计的重要性

Redis 的 Key 不只是一个标识符,它直接影响内存占用、运维可读性、集群路由、扫描效率和过期策略。好的 Key 设计能在系统规模增大时保持可维护性;糟糕的 Key 设计会在数据量增长到百亿级别后积累成系统性问题。

本章从命名、序列化、热点、大 Key、TTL 五个维度,系统梳理生产环境中的最佳实践和常见陷阱。


30.2 Key 命名规范

30.2.1 层次结构命名

最广泛接受的命名惯例是 业务域:实体类型:id[:字段]

# 推荐格式
user:1000:profile          # 用户1000的profile(Hash)
user:1000:orders           # 用户1000的订单列表(List/ZSet)
order:20240501:12345       # 2024年5月1日的订单12345(String/Hash)
cache:product:SKU-9527     # 商品缓存(String)
lock:payment:order-12345   # 分布式锁(String)
rate:api:user:1000:v2      # 用户1000对v2 API的限流计数(String)
leaderboard:game:101:2024W20  # 游戏101的第20周排行榜(ZSet)

分隔符选择

30.2.2 Key 长度控制

# Key 内存占用(Redis 内部)
- dict entry:每个 key 约占 64 字节的 dictEntry 结构
- key 字符串本身:sds(Simple Dynamic String)header + 内容
- 总计:每个 key 约 90~200 字节基础开销

# 过长的 key(避免)
user_profile_data_for_the_user_with_id_12345_and_type_premium  # 60字符,语义冗余

# 过短的 key(避免)
u:1:p   # 6字符,完全无法从 key 判断业务含义

# 合适的 key
user:12345:profile  # 18字符,层次清晰,含义明确

规则

30.2.3 特殊字符与编码

# Redis key 可以包含任意二进制内容,但以下字符建议避免
空格:KEYS "user 1000"  # 命令行解析困难
换行符:会干扰调试工具
控制字符:难以在日志中识别

# 中文 key(可行,但不推荐)
SET 用户:1000:档案 value  # 合法,但每个汉字3字节,key 更长
# 推荐用英文缩写 + 数字

# Cluster 模式下的哈希标签
# {...} 内的内容决定 slot,用于控制同业务 key 落到同一节点
SET {user:1000}:profile value
SET {user:1000}:session value
# 两者落在同一 slot(可以在同一 Pipeline 或 MULTI/EXEC 中操作)

30.2.4 命名空间规划(多业务共用 Redis)

当多个业务系统共用同一个 Redis 实例时,必须用前缀严格隔离:

# 每个业务有独立的前缀
ecommerce:product:123      # 电商系统
crm:user:456               # CRM系统
bi:report:cache:20240501   # BI缓存

更好的方案(单实例多业务):

# redis.conf
databases 16   # 默认16个db,用 SELECT 0-15 切换
# 缺点:同一实例,无法独立配置 maxmemory、无法独立监控
# 推荐方案:每业务单独 Redis 实例,用端口或 Cluster 隔离

30.3 序列化选型

30.3.1 各格式对比

Redis 存储的 Value 是字节串,应用需要选择序列化格式:

格式 压缩率 序列化速度 跨语言支持 可读性 推荐场景
JSON 低(基准) 慢(字符串解析) 完美 调试、小对象、API响应缓存
Protobuf 高(~1/3 JSON) 极快(二进制) 好(有IDL) 高频大对象、跨语言微服务
MessagePack 中(~1/2 JSON) 快(二进制) 好(无IDL) 通用场景(JSON的高效替代)
Avro 高(需schema) 好(Schema Registry) Kafka+Redis协同场景
Hessian/Kryo 差(Java专属) Java单体应用
Thrift 好(有IDL) 老系统兼容

30.3.2 实测数据:用户对象(100个字段)

格式 序列化大小 序列化时间(μs) 反序列化时间(μs)
JSON 2.1 KB 45 62
MessagePack 1.0 KB 18 22
Protobuf 650 B 8 11
原始 String 拼接 500 B 3 12

结论:对于用户 Profile、商品详情等频繁读写的大对象,Protobuf 在带宽和 CPU 上均有显著优势。对于调试和运维可见性重要的场景,JSON 的可读性价值不可忽视。

30.3.3 压缩 Value

对于超大 Value(>10KB),在序列化后再压缩:

import gzip, json, redis

r = redis.Redis()

def set_compressed(key: str, obj: dict, ttl: int = 3600):
    data = json.dumps(obj).encode('utf-8')
    compressed = gzip.compress(data, compresslevel=6)
    r.setex(key, ttl, compressed)

def get_compressed(key: str) -> dict:
    data = r.get(key)
    if data is None:
        return None
    return json.loads(gzip.decompress(data).decode('utf-8'))

# 压缩效果:10KB JSON → 2KB gzip(5x压缩比)
# 代价:每次读写增加约 0.5ms CPU 时间

何时启用压缩


30.4 Hot Key(热点 Key)

30.4.1 什么是 Hot Key

当单个 Key 的访问量占到 Redis 节点总 QPS 的 10%~30% 以上时,该 Key 成为热点:

典型热点场景:
- 微博热搜第一(get hotSearch:rank:1)
- 爆款商品库存(get/decr stock:product:SKU-9527)
- 首页配置(get config:homepage)
- 全局计数器(incr global:pageview)
- 热门活动券码(get coupon:activity:999)

危害

  1. 单节点 CPU 100%,其他 key 的响应变慢
  2. Cluster 中数据倾斜(热点节点负载是其他节点的10倍+)
  3. 网络带宽耗尽(大 Value 热点 + 高QPS)

30.4.2 检测热点 Key

# 方案1:redis-cli --hotkeys(需要 maxmemory-policy = *-lfu)
redis-cli --hotkeys -h redis-host -p 6379
# 输出:
# -------- summary -------
# Sampled 1000000 keys in the keyspace!
# hot key found with counter: 9842  keyname: hotSearch:rank:1
# hot key found with counter: 7234  keyname: stock:product:SKU-9527

# 方案2:MONITOR(实时监控,线上慎用,极大影响性能)
redis-cli monitor | head -1000 | grep "GET " | awk '{print $4}' | sort | uniq -c | sort -rn | head -20

# 方案3:客户端统计(推荐)
# 在客户端拦截层记录 key 访问频率,上报监控系统(Prometheus)
# 无需直接操作 Redis

# 方案4:Redis 4.0+ OBJECT FREQ(LFU 策略下)
OBJECT FREQ hotSearch:rank:1  # 返回 LFU 访问频率估计值

30.4.3 热点 Key 解决方案

方案一:本地缓存(L1 Cache)

from cachetools import TTLCache
import threading

class LocalCache:
    def __init__(self, maxsize=1000, ttl=5):
        self._cache = TTLCache(maxsize=maxsize, ttl=ttl)
        self._lock = threading.Lock()

    def get(self, key: str):
        return self._cache.get(key)

    def set(self, key: str, value, ttl: int = None):
        with self._lock:
            self._cache[key] = value

local_cache = LocalCache(maxsize=200, ttl=3)  # 3秒本地缓存

def get_hot_value(key: str):
    # L1: 本地缓存
    val = local_cache.get(key)
    if val is not None:
        return val
    # L2: Redis
    val = r.get(key)
    if val:
        local_cache.set(key, val)
    return val

优点:完全不访问 Redis,延迟 < 0.1ms 缺点:数据存在短暂不一致(TTL 期间),多实例缓存不共享

方案二:Key 拆分(副本分散)

import random

def get_sharded_hot_key(base_key: str, shard_count: int = 10):
    """读时随机选一个分片,所有分片内容相同"""
    shard_idx = random.randint(0, shard_count - 1)
    return f"{base_key}:shard:{shard_idx}"

def set_sharded_hot_key(base_key: str, value: str, shard_count: int = 10, ttl: int = 60):
    """写时更新所有分片(保持一致性)"""
    pipe = r.pipeline()
    for i in range(shard_count):
        pipe.setex(f"{base_key}:shard:{i}", ttl, value)
    pipe.execute()

# 使用
# 写:更新所有分片
set_sharded_hot_key("hotSearch:rank:1", "Redis 8.0发布", shard_count=10)
# 读:随机读一个分片(QPS 分散到10个 key,10个节点)
val = r.get(get_sharded_hot_key("hotSearch:rank:1", shard_count=10))

方案三:读写分离

# 利用 Redis 从库承担读压力
# redis-py 配置主从读写分离
from redis.sentinel import Sentinel

sentinel = Sentinel([('sentinel1', 26379), ('sentinel2', 26379)])
master = sentinel.master_for('mymaster', socket_timeout=0.1)
slave = sentinel.slave_for('mymaster', socket_timeout=0.1)

# 写操作用 master
master.set('hotkey', 'value')
# 读操作用 slave(分散热点读压力)
val = slave.get('hotkey')

30.5 Big Key(大 Key)

30.5.1 大 Key 的定义

业界通用标准(以下任一条件满足):

数据类型 大 Key 阈值
String > 10 KB
Hash > 5000 个 field
List > 5000 个元素
Set > 5000 个元素
ZSet > 5000 个元素
Stream > 10000 条记录

30.5.2 大 Key 的危害

1. 网络传输慢

# GET 一个 1MB 的大 key,网络耗时约
# 千兆内网:~1ms(1MB / 125MB/s)
# 万兆内网:~0.1ms
# 客户端序列化:额外 1~5ms
# 对比:小 key GET 延迟约 0.1ms

2. 主线程阻塞(过期删除和持久化)

# DEL 一个有 10万个 field 的 Hash
# 释放内存的时间:数十毫秒 → 阻塞主线程
# 解决:用 UNLINK 代替 DEL(异步删除)
UNLINK big_hash   # 立即返回,后台线程释放内存

# 同理,FLUSHDB ASYNC、FLUSHALL ASYNC
FLUSHDB ASYNC

3. 集群数据倾斜

一个10MB的大 key 使其所在节点的内存和带宽占用远超其他节点,破坏集群负载均衡。

4. RDB 和 AOF 性能

大 key 在 RDB 序列化时占用大量 I/O,拉长 fork 后的 COW(copy-on-write)时间。

30.5.3 检测大 Key

# 方案1:redis-cli --bigkeys(使用 SCAN,不阻塞主线程)
redis-cli --bigkeys -h redis-host -p 6379
# 输出:
# Biggest string found so far 'user:1000:bio' with 52428 bytes
# Biggest hash found so far 'user:events:all' with 125432 fields

# 方案2:检查单个 key 内存占用
redis-cli memory usage user:1000:bio        # 返回字节数
redis-cli debug object user:1000:bio        # 序列化长度、编码、LRU等

# 方案3:OBJECT ENCODING 查看编码
redis-cli object encoding user:events:all   # hash (ziplist or hashtable)

# 方案4:RDB 分析工具
# rdb-tools: pip install rdbtools
rdb --command memory dump.rdb | sort -t, -k4 -rn | head -20

# redis-rdb-cli(Java工具)
rct -c memory -s /path/to/dump.rdb -o report.csv

30.5.4 大 Key 拆分策略

String 类型大 Value

# 场景:用户生成内容(长文本、图片base64)存为 String

# 策略1:存对象存储,Redis 只存 URL
r.setex("article:1000:content", 3600, "https://cdn.example.com/articles/1000.json")

# 策略2:分块存储(不推荐,读取需要多次往返)
def set_chunked(key: str, data: bytes, chunk_size: int = 1024*1024):
    chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]
    pipe = r.pipeline()
    for i, chunk in enumerate(chunks):
        pipe.setex(f"{key}:chunk:{i}", 3600, chunk)
    pipe.setex(f"{key}:meta:chunks", 3600, len(chunks))
    pipe.execute()

Hash 类型大 Key(大量 field)

# 场景:用户属性有 500+ 个 field 存在同一个 Hash

# 策略:按 field 前缀分桶,拆分为多个 Hash
# 原来:HSET user:1000 name Alice age 30 city Beijing ... (500 fields)

# 拆分后
HSET user:1000:basic  name Alice age 30 gender F          # 基础属性
HSET user:1000:contact email [email protected] phone 13800138000    # 联系方式
HSET user:1000:prefs language zh timezone Asia/Shanghai   # 偏好设置
HSET user:1000:stats login_count 150 last_login 1716000000 # 统计数据

# 读取时只取需要的子 Hash
HGETALL user:1000:basic   # 只读基础属性

List/ZSet 类型大 Key(大量元素)

# 场景:用户的历史行为列表存在单个 ZSet(10万+ 元素)

# 策略1:时间分片(按月/周分桶)
# 原来:ZADD user:1000:history score action
# 拆分:
import time
from datetime import datetime

def add_user_history(user_id: int, action: str, score: float):
    month = datetime.now().strftime("%Y%m")
    key = f"user:{user_id}:history:{month}"
    r.zadd(key, {action: score})
    r.expire(key, 86400 * 90)  # 保留90天

def get_user_history(user_id: int, months: int = 3):
    now = datetime.now()
    all_results = []
    for i in range(months):
        month = (now.replace(day=1) - timedelta(days=i*30)).strftime("%Y%m")
        key = f"user:{user_id}:history:{month}"
        all_results.extend(r.zrevrange(key, 0, -1, withscores=True))
    return sorted(all_results, key=lambda x: x[1], reverse=True)

# 策略2:固定容量(只保留最近N条)
def add_capped_history(user_id: int, action: str):
    key = f"user:{user_id}:history"
    pipe = r.pipeline()
    pipe.lpush(key, action)
    pipe.ltrim(key, 0, 999)   # 只保留最近1000条
    pipe.execute()

30.6 TTL 设计原则

30.6.1 为什么所有缓存 Key 必须有 TTL

Redis 的 maxmemory 到达上限后会触发淘汰策略,可能主动删除 key。但如果所有 key 都没有 TTL,且淘汰策略配置为 noeviction(拒绝写入),生产事故概率极高。

规则:缓存 key 必须有 TTL,永久数据也应该定期刷新 TTL

30.6.2 雪崩预防:随机化 TTL

大量 key 同时设置相同 TTL(如 Redis 预热后所有 key 同时失效),会导致缓存雪崩:

import random

BASE_TTL = 3600  # 1小时

def set_with_jitter(key: str, value, base_ttl: int = BASE_TTL):
    """TTL 加随机抖动,防止同时失效"""
    jitter = random.randint(0, base_ttl // 10)  # ±10% 随机偏移
    actual_ttl = base_ttl + jitter
    r.setex(key, actual_ttl, value)

# 1000个 key 的 TTL 从 3600s 分散到 3600~3960s 之间
# 失效高峰从瞬时尖峰变成平滑曲线

30.6.3 TTL 与业务数据生命周期对齐

# 原则1:缓存 TTL > 后端数据变更频率
# 错误示例:商品价格缓存 1s TTL,每次请求都穿透到 DB
r.setex("product:price:123", 1, "99.9")  # 缓存毫无意义

# 正确:商品价格平均每小时变一次,缓存5分钟
r.setex("product:price:123", 300, "99.9")

# 原则2:不同数据按变更频率设置不同 TTL
TTL_MAPPING = {
    "user:profile":     86400,    # 用户档案,1天
    "user:session":     7200,     # 用户会话,2小时
    "product:detail":   300,      # 商品详情,5分钟
    "product:stock":    30,       # 库存,30秒
    "config:homepage":  600,      # 首页配置,10分钟
    "rate:api:*":       60,       # 限流计数,1分钟
    "lock:*":           30,       # 分布式锁,30秒(含超时保护)
}

30.6.4 TTL 刷新策略

# 滑动窗口 TTL(每次访问重置 TTL)
def get_with_sliding_ttl(key: str, ttl: int = 3600):
    """访问时刷新 TTL(类似 session 活跃续期)"""
    pipe = r.pipeline()
    pipe.get(key)
    pipe.expire(key, ttl)  # 重置 TTL
    results = pipe.execute()
    return results[0]

# 后台刷新(避免缓存失效时的请求尖峰)
import threading

class RefreshingCache:
    def __init__(self, refresh_func, ttl=3600, refresh_before_s=60):
        self.refresh_func = refresh_func
        self.ttl = ttl
        self.refresh_before_s = refresh_before_s

    def get(self, key: str):
        value = r.get(key)
        # 如果剩余 TTL < refresh_before_s,后台异步刷新
        remaining_ttl = r.ttl(key)
        if remaining_ttl < self.refresh_before_s:
            threading.Thread(
                target=self._async_refresh, args=(key,), daemon=True
            ).start()
        return value

    def _async_refresh(self, key: str):
        new_value = self.refresh_func(key)
        r.setex(key, self.ttl, new_value)

30.7 数据建模:从 RDBMS 迁移到 Redis

30.7.1 常见数据模型转换

一对多关系(用户 → 订单)

# RDBMS:orders 表有 user_id 外键
# Redis:用 Set 或 ZSet 维护关系

# 方案A:ZSet(订单时间为 score,可按时间范围查询)
ZADD user:1000:orders 1716000000 "order:20240501:12345"
ZADD user:1000:orders 1716003600 "order:20240501:12346"
ZRANGEBYSCORE user:1000:orders 1716000000 1716086400  # 查询当天订单

# 方案B:List(按写入顺序,快速取最新N个)
LPUSH user:1000:orders "order:20240501:12346"
LRANGE user:1000:orders 0 9   # 最近10个订单

多对多关系(用户 ↔ 标签)

# 为每个标签维护用户集合
SADD tag:redis:users "user:1000" "user:1001" "user:1002"
SADD tag:python:users "user:1001" "user:1003"

# 为每个用户维护标签集合
SADD user:1000:tags "redis" "distributed"
SADD user:1001:tags "redis" "python"

# 查找同时有 redis 和 python 标签的用户(交集)
SINTERSTORE result tag:redis:users tag:python:users
SMEMBERS result  # → user:1001

分页查询(有序列表)

# ZSet 天然支持分页
ZADD articles score1 "article:1" score2 "article:2" ...

# 按 score 正序分页(第2页,每页10条)
ZRANGEBYSCORE articles -inf +inf LIMIT 10 10

# 按分数倒序(最新文章)
ZREVRANGEBYSCORE articles +inf -inf LIMIT 0 10

# Redis 6.2+ 统一命令
ZRANGE articles 0 9 REV  # 从最高分开始取10个

30.7.2 避免 N+1 问题

# 错误:N+1 查询(每个 order 一次 GET)
order_ids = r.lrange("user:1000:orders", 0, 49)
orders = [r.hgetall(f"order:{oid}") for oid in order_ids]  # N次 GET!

# 正确:Pipeline 批量查询
order_ids = r.lrange("user:1000:orders", 0, 49)
pipe = r.pipeline()
for oid in order_ids:
    pipe.hgetall(f"order:{oid}")
orders = pipe.execute()  # 1次网络往返!

30.8 生产监控与运维

30.8.1 Key 数量和内存监控

# 快速检查
redis-cli INFO keyspace
# db0:keys=1500000,expires=800000,avg_ttl=3541000

redis-cli INFO memory
# used_memory_human:2.50G
# mem_fragmentation_ratio:1.15  ← 理想值 1.0-1.5

# DBSIZE(O(1))
redis-cli DBSIZE  # 返回当前 db 的 key 总数

# 按模式统计(SCAN,不阻塞)
redis-cli --scan --pattern "user:*" | wc -l

30.8.2 Key 扫描最佳实践

# 永远不要在生产环境用 KEYS(O(N),阻塞)
KEYS user:*     # 危险!1000万 key 时阻塞数秒

# 用 SCAN(分批,不阻塞)
SCAN 0 MATCH "user:*" COUNT 100
# 返回:下一个 cursor + 本批 key,直到 cursor=0

# Python 迭代扫描
for key in r.scan_iter("user:*", count=100):
    # 每批处理 ~100 个 key
    process(key)

30.9 小结与最佳实践清单

Key 设计是 Redis 系统工程的基础,以下是本章核心实践的提炼:

命名规范

序列化

热点 Key

大 Key

TTL 设计

本章评分
4.7  / 5  (3 评分)

💬 留言讨论