第 36 章
10 大业务场景实现:排行榜到实时限流
第36章 10大业务场景实现:排行榜到实时限流
Redis 在工程实践中解决了大量高频业务问题。本章系统梳理10个经典场景,每个场景包含完整设计思路、核心命令、生产代码和踩坑总结。
1. 排行榜(ZSET)
1.1 实时更新
有序集合是排行榜的天然载体。得分更新使用 ZINCRBY,原子累加避免并发覆盖:
ZINCRBY game:rank 50 user:1001 # 用户1001增加50分
ZINCRBY game:rank -10 user:1002 # 减分也支持
1.2 分页查询
# 正序(分数从小到大)
ZRANGE game:rank 0 9 WITHSCORES
# 逆序(排行榜常用,分数从大到小)
ZREVRANGE game:rank 0 9 WITHSCORES
# 按分数范围+分页(Redis 6.2+ 推荐 ZRANGEBYSCORE)
ZREVRANGEBYSCORE game:rank +inf -inf WITHSCORES LIMIT 0 10
# 查询用户当前排名(0-based)
ZREVRANK game:rank user:1001
# 查询用户分数
ZSCORE game:rank user:1001
1.3 周榜 / 月榜
按时间维度分 key,定时 job 清理或归并:
import redis
from datetime import datetime
r = redis.Redis()
def add_score(user_id: str, score: float):
now = datetime.now()
# 全局榜
r.zincrby("game:rank:all", score, user_id)
# 周榜(ISO周)
week_key = f"game:rank:{now.year}:W{now.isocalendar()[1]:02d}"
r.zincrby(week_key, score, user_id)
r.expire(week_key, 8 * 86400) # 8天后过期
# 月榜
month_key = f"game:rank:{now.year}:{now.month:02d}"
r.zincrby(month_key, score, user_id)
r.expire(month_key, 35 * 86400)
def get_top(key: str, n: int = 10):
return r.zrevrange(key, 0, n - 1, withscores=True)
1.4 超大排行榜(亿级)
单个 ZSET 内存有限,亿级用户需要分片:
SHARD_COUNT = 100
def shard_key(user_id: str) -> str:
shard = hash(user_id) % SHARD_COUNT
return f"game:rank:shard:{shard}"
def add_score_sharded(user_id: str, score: float):
r.zincrby(shard_key(user_id), score, user_id)
def get_global_top(n: int = 100):
"""从各分片各取top-N,归并排序取全局top-N"""
candidates = []
for i in range(SHARD_COUNT):
key = f"game:rank:shard:{i}"
top = r.zrevrange(key, 0, n - 1, withscores=True)
candidates.extend(top)
candidates.sort(key=lambda x: x[1], reverse=True)
return candidates[:n]
生产注意:分片全局 top 精度依赖各分片 top-N 覆盖,N 越大精度越高,但归并成本也越高。通常 N=200 足够。
2. 用户签到(Bitmap)
2.1 签到与查询
# 签到:第15天
SETBIT sign:2024:01:1001 14 1 # bit offset 从0开始,第15天=offset 14
# 查询某天是否签到
GETBIT sign:2024:01:1001 14 # 返回 1 或 0
# 月签到次数(已签到天数)
BITCOUNT sign:2024:01:1001
# 某范围内签到次数(byte范围)
BITCOUNT sign:2024:01:1001 0 3 # 前4字节=前32天
2.2 连续签到天数
def get_consecutive_days(user_id: str, year: int, month: int) -> int:
"""从今天往前数连续签到天数"""
from datetime import date
today = date.today()
key = f"sign:{year}:{month:02d}:{user_id}"
# 获取整月bitmap(字节串)
bits = r.getrange(key, 0, -1)
if not bits:
return 0
# 转为二进制字符串,逐位检查
bit_str = ''.join(f'{b:08b}' for b in bits)
count = 0
# 从今天对应的bit往前遍历
day_offset = today.day - 1
for i in range(day_offset, -1, -1):
if i < len(bit_str) and bit_str[i] == '1':
count += 1
else:
break
return count
2.3 内存估算
31天/用户 = 31 bits ≈ 4 bytes。1亿用户全月签到数据:100_000_000 × 4 = 400MB,非常经济。
3. UV 统计(HyperLogLog)
HyperLogLog 以极小内存(12KB)实现亿级基数估算,误差约 0.81%。
# 记录访问
PFADD uv:2024:01:15 user:1001 user:1002 user:1003
# 查询 UV
PFCOUNT uv:2024:01:15 # 单日UV
PFCOUNT uv:2024:01:14 uv:2024:01:15 # 多日合并UV(非精确去重)
# 周UV合并到新key(精确去重多日)
PFMERGE uv:week:2024:03 \
uv:2024:01:15 uv:2024:01:16 uv:2024:01:17 \
uv:2024:01:18 uv:2024:01:19 uv:2024:01:20 uv:2024:01:21
PFCOUNT uv:week:2024:03
import redis
from datetime import date, timedelta
r = redis.Redis()
def record_visit(user_id: str):
today = date.today().strftime("%Y:%m:%d")
r.pfadd(f"uv:{today}", user_id)
def get_weekly_uv(start_date: date) -> int:
keys = [f"uv:{(start_date + timedelta(days=i)).strftime('%Y:%m:%d')}"
for i in range(7)]
week_key = f"uv:week:{start_date.strftime('%Y:%m:%d')}"
r.pfmerge(week_key, *keys)
r.expire(week_key, 30 * 86400)
return r.pfcount(week_key)
适用场景:UV/DAU/MAU 统计,不要求精确到个位数时优先选用。精确统计改用 SET 或 Bitmap(用户ID数值化后)。
4. 附近的人(GEO)
Redis GEO 基于 Geohash 存储在 ZSET 中,支持半径查询。
# 添加位置(经度, 纬度, 成员名)
GEOADD locations 116.397128 39.916527 user:1001
GEOADD locations 121.472644 31.231706 user:1002
# 查找附近(Redis 6.2+ 推荐 GEOSEARCH)
GEOSEARCH locations FROMMEMBER user:1001 BYRADIUS 5 km ASC COUNT 20 WITHCOORD WITHDIST
# 查找附近(指定坐标为中心)
GEOSEARCH locations FROMLONLAT 116.4 39.9 BYRADIUS 3 km ASC COUNT 10
# 距离计算
GEODIST locations user:1001 user:1002 km
# 获取坐标
GEOPOS locations user:1001
# 获取 geohash 字符串
GEOHASH locations user:1001
def find_nearby(user_id: str, radius_km: float, limit: int = 20):
"""查找附近用户,返回(user_id, 距离km)列表"""
results = r.geosearch(
"locations",
member=user_id,
radius=radius_km,
unit="km",
sort="ASC",
count=limit,
withdist=True,
withcoord=False,
)
# 过滤掉自己
return [(uid.decode(), dist) for uid, dist, *_ in results
if uid.decode() != user_id]
生产注意:GEO 数据存在单个 ZSET 中,百万级成员无压力。但要注意隐私:不要将真实坐标直接暴露给客户端,用 GEODIST 返回距离而非坐标。
5. 抢购秒杀(Lua 原子扣减)
5.1 核心 Lua 脚本
-- deduct_stock.lua
-- KEYS[1]: 库存 key(如 stock:sku:10086)
-- ARGV[1]: 扣减数量
-- 返回:1成功,0库存不足,-1库存key不存在
local stock = redis.call('get', KEYS[1])
if stock == false then
return -1
end
stock = tonumber(stock)
local requested = tonumber(ARGV[1])
if stock < requested then
return 0
end
redis.call('decrby', KEYS[1], requested)
return 1
import redis
r = redis.Redis()
# 注册脚本(获得 SHA)
DEDUCT_STOCK_SCRIPT = r.register_script(open("deduct_stock.lua").read())
def seckill(sku_id: str, user_id: str, quantity: int = 1) -> bool:
"""
秒杀入口
返回 True 表示扣减成功,False 表示失败
"""
stock_key = f"stock:sku:{sku_id}"
result = DEDUCT_STOCK_SCRIPT(keys=[stock_key], args=[quantity])
if result == 1:
# 写入消息队列,异步创建订单
order_data = {
"sku_id": sku_id,
"user_id": user_id,
"quantity": quantity,
}
r.xadd("orders:pending", order_data)
return True
return False
5.2 预热与防超卖
def warmup_stock(sku_id: str, stock: int, expire_seconds: int = 3600):
"""活动开始前预热库存"""
key = f"stock:sku:{sku_id}"
pipe = r.pipeline()
pipe.set(key, stock)
pipe.expire(key, expire_seconds)
pipe.execute()
防超卖要点:
- Lua 脚本保证 GET-判断-DECRBY 原子性
- 库存值不允许为负(脚本中已判断
stock < requested) - 异步创建订单,不在 Redis 层做订单落库
6. 社交关系(Set 操作)
# 关注 / 取关
SADD following:1001 1002 1003
SREM following:1001 1002
# 粉丝
SADD followers:1002 1001
# 共同关注
SINTER following:1001 following:1002
# 推荐好友:1002关注但我(1001)未关注的
SDIFF following:1002 following:1001
# 互相关注(是否互粉)
SISMEMBER following:1002 1001 # 1002是否关注了1001
# 关注数 / 粉丝数
SCARD following:1001
SCARD followers:1001
def get_mutual_friends(user_a: str, user_b: str):
"""共同关注列表"""
return r.sinter(f"following:{user_a}", f"following:{user_b}")
def get_recommendations(user_id: str, friend_id: str, limit: int = 20):
"""好友关注但我未关注的用户(推荐关注)"""
result_key = f"rec:{user_id}:tmp"
r.sdiffstore(result_key, f"following:{friend_id}", f"following:{user_id}")
r.expire(result_key, 60)
recs = r.srandmember(result_key, limit)
return [uid.decode() for uid in recs]
7. 消息队列对比:List vs Stream
| 特性 | List (LPUSH/BRPOP) | Stream |
|---|---|---|
| 消费者组 | 不支持 | 支持(XGROUP) |
| ACK 确认 | 不支持 | 支持(XACK) |
| 未确认消息追踪(PEL) | 不支持 | 支持 |
| 消息重放 | 不支持 | 支持(指定ID) |
| 消息持久化 | 依赖 AOF/RDB | 同上 |
| 多消费者并行 | 需自行分片 | 原生支持 |
Stream 示例:
# 生产者
r.xadd("orders:stream", {"order_id": "ORD001", "amount": "99.00"})
# 创建消费者组
r.xgroup_create("orders:stream", "payment_group", id="0", mkstream=True)
# 消费者读取(阻塞等待最多1秒)
messages = r.xreadgroup(
groupname="payment_group",
consumername="worker-1",
streams={"orders:stream": ">"}, # > 表示只读新消息
count=10,
block=1000,
)
for stream_name, entries in messages:
for msg_id, fields in entries:
print(msg_id, fields)
# 处理完成后 ACK
r.xack("orders:stream", "payment_group", msg_id)
结论:新项目统一使用 Stream,List 仅用于简单的单消费者任务队列。
8. Session 共享
# 存储(HMSET 在 Redis 4.0+ 可用 HSET 替代)
HSET session:abc123 user_id 1001 name Alice role admin created_at 1700000000
# 读取全部字段
HGETALL session:abc123
# 读取单个字段
HGET session:abc123 user_id
# 续期(每次请求刷新过期时间)
EXPIRE session:abc123 1800
# 注销(删除)
DEL session:abc123
import uuid
import time
from typing import Optional
SESSION_TTL = 1800 # 30分钟
def create_session(user_id: str, name: str, role: str) -> str:
token = str(uuid.uuid4()) # UUID v4,128位随机
key = f"session:{token}"
r.hset(key, mapping={
"user_id": user_id,
"name": name,
"role": role,
"created_at": int(time.time()),
})
r.expire(key, SESSION_TTL)
return token
def get_session(token: str) -> Optional[dict]:
key = f"session:{token}"
data = r.hgetall(key)
if not data:
return None
# 续期
r.expire(key, SESSION_TTL)
return {k.decode(): v.decode() for k, v in data.items()}
安全要点:
- token 使用 UUID v4(122位随机),不可预测
- 传输层必须 HTTPS,防止 token 被中间人截获
- 敏感操作额外校验(二次密码/MFA)
9. 令牌桶限流(Lua 脚本)
令牌桶算法支持突发流量,平均速率可控。
-- token_bucket.lua
-- KEYS[1]: 限流 key
-- ARGV[1]: 桶容量 capacity
-- ARGV[2]: 每秒补充速率 rate(tokens/ms)
-- ARGV[3]: 当前时间戳(毫秒)
-- ARGV[4]: 本次请求令牌数
-- 返回:1 允许,0 拒绝
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
local last_tokens = tonumber(redis.call('hget', key, 'tokens'))
local last_time = tonumber(redis.call('hget', key, 'time'))
if last_tokens == nil then last_tokens = capacity end
if last_time == nil then last_time = now end
-- 按时间差补充令牌
local elapsed = math.max(0, now - last_time)
local new_tokens = math.min(capacity, last_tokens + elapsed * rate / 1000)
if new_tokens >= requested then
redis.call('hset', key, 'tokens', new_tokens - requested, 'time', now)
-- 自动过期:桶填满需要 capacity/rate 秒,额外+1秒
redis.call('expire', key, math.ceil(capacity / rate) + 1)
return 1
else
-- 更新时间戳,避免时间窗口被"借用"
redis.call('hset', key, 'time', now)
return 0
end
import time
import redis
r = redis.Redis()
TOKEN_BUCKET_SCRIPT = r.register_script(open("token_bucket.lua").read())
def is_allowed(user_id: str, capacity: int = 100, rate: float = 10) -> bool:
"""
capacity: 桶最大令牌数(突发上限)
rate: 每秒补充令牌数(平均速率)
"""
key = f"ratelimit:tb:{user_id}"
now_ms = int(time.time() * 1000)
result = TOKEN_BUCKET_SCRIPT(
keys=[key],
args=[capacity, rate, now_ms, 1]
)
return result == 1
10. 滑动窗口统计(ZSET)
滑动窗口精确统计任意时间段内的请求数,适合 API 限流和行为分析。
# 记录请求(score = 时间戳ms,member = 唯一请求ID)
ZADD requests:user:1001 1700000000000 req:uuid1
ZADD requests:user:1001 1700000001000 req:uuid2
# 统计最近60秒的请求数
ZCOUNT requests:user:1001 1699999940000 1700000000000
# 清理过期记录(60秒前)
ZREMRANGEBYSCORE requests:user:1001 0 1699999940000
import time
import uuid
WINDOW_MS = 60_000 # 60秒窗口
MAX_REQUESTS = 100 # 窗口内最大请求数
def check_rate_limit(user_id: str) -> bool:
now_ms = int(time.time() * 1000)
window_start = now_ms - WINDOW_MS
key = f"requests:{user_id}"
pipe = r.pipeline()
# 记录本次请求
pipe.zadd(key, {str(uuid.uuid4()): now_ms})
# 清理窗口外的记录
pipe.zremrangebyscore(key, 0, window_start)
# 统计当前窗口内请求数
pipe.zcard(key)
# 设置过期(窗口时长 + 1秒)
pipe.expire(key, WINDOW_MS // 1000 + 1)
results = pipe.execute()
current_count = results[2]
return current_count <= MAX_REQUESTS
与令牌桶对比:
- 滑动窗口:精确,但每个用户存储所有请求 ID,内存用量随 QPS 增长
- 令牌桶:近似,内存固定(只存 tokens + time),高并发优选
本章总结
| 场景 | 数据结构 | 核心命令 |
|---|---|---|
| 排行榜 | ZSET | ZINCRBY / ZREVRANGE |
| 签到 | Bitmap | SETBIT / BITCOUNT |
| UV统计 | HyperLogLog | PFADD / PFCOUNT / PFMERGE |
| 附近的人 | GEO(ZSET) | GEOADD / GEOSEARCH |
| 秒杀扣减 | String + Lua | GET / DECRBY (原子) |
| 社交关系 | Set | SINTER / SDIFF |
| 消息队列 | Stream | XADD / XREADGROUP / XACK |
| Session | Hash | HSET / HGETALL / EXPIRE |
| 令牌桶限流 | Hash + Lua | HGET / HSET / EXPIRE |
| 滑动窗口 | ZSET | ZADD / ZCOUNT / ZREMRANGEBYSCORE |
核心原则:选对数据结构,原子操作用 Lua,高并发场景提前压测验证。