第 36 章

10 大业务场景实现:排行榜到实时限流

第36章 10大业务场景实现:排行榜到实时限流

Redis 在工程实践中解决了大量高频业务问题。本章系统梳理10个经典场景,每个场景包含完整设计思路、核心命令、生产代码和踩坑总结。


1. 排行榜(ZSET)

1.1 实时更新

有序集合是排行榜的天然载体。得分更新使用 ZINCRBY,原子累加避免并发覆盖:

ZINCRBY game:rank 50 user:1001    # 用户1001增加50分
ZINCRBY game:rank -10 user:1002   # 减分也支持

1.2 分页查询

# 正序(分数从小到大)
ZRANGE game:rank 0 9 WITHSCORES

# 逆序(排行榜常用,分数从大到小)
ZREVRANGE game:rank 0 9 WITHSCORES

# 按分数范围+分页(Redis 6.2+ 推荐 ZRANGEBYSCORE)
ZREVRANGEBYSCORE game:rank +inf -inf WITHSCORES LIMIT 0 10

# 查询用户当前排名(0-based)
ZREVRANK game:rank user:1001

# 查询用户分数
ZSCORE game:rank user:1001

1.3 周榜 / 月榜

按时间维度分 key,定时 job 清理或归并:

import redis
from datetime import datetime

r = redis.Redis()

def add_score(user_id: str, score: float):
    now = datetime.now()
    # 全局榜
    r.zincrby("game:rank:all", score, user_id)
    # 周榜(ISO周)
    week_key = f"game:rank:{now.year}:W{now.isocalendar()[1]:02d}"
    r.zincrby(week_key, score, user_id)
    r.expire(week_key, 8 * 86400)  # 8天后过期
    # 月榜
    month_key = f"game:rank:{now.year}:{now.month:02d}"
    r.zincrby(month_key, score, user_id)
    r.expire(month_key, 35 * 86400)

def get_top(key: str, n: int = 10):
    return r.zrevrange(key, 0, n - 1, withscores=True)

1.4 超大排行榜(亿级)

单个 ZSET 内存有限,亿级用户需要分片:

SHARD_COUNT = 100

def shard_key(user_id: str) -> str:
    shard = hash(user_id) % SHARD_COUNT
    return f"game:rank:shard:{shard}"

def add_score_sharded(user_id: str, score: float):
    r.zincrby(shard_key(user_id), score, user_id)

def get_global_top(n: int = 100):
    """从各分片各取top-N,归并排序取全局top-N"""
    candidates = []
    for i in range(SHARD_COUNT):
        key = f"game:rank:shard:{i}"
        top = r.zrevrange(key, 0, n - 1, withscores=True)
        candidates.extend(top)
    candidates.sort(key=lambda x: x[1], reverse=True)
    return candidates[:n]

生产注意:分片全局 top 精度依赖各分片 top-N 覆盖,N 越大精度越高,但归并成本也越高。通常 N=200 足够。


2. 用户签到(Bitmap)

2.1 签到与查询

# 签到:第15天
SETBIT sign:2024:01:1001 14 1    # bit offset 从0开始,第15天=offset 14

# 查询某天是否签到
GETBIT sign:2024:01:1001 14      # 返回 1 或 0

# 月签到次数(已签到天数)
BITCOUNT sign:2024:01:1001

# 某范围内签到次数(byte范围)
BITCOUNT sign:2024:01:1001 0 3   # 前4字节=前32天

2.2 连续签到天数

def get_consecutive_days(user_id: str, year: int, month: int) -> int:
    """从今天往前数连续签到天数"""
    from datetime import date
    today = date.today()
    key = f"sign:{year}:{month:02d}:{user_id}"
    
    # 获取整月bitmap(字节串)
    bits = r.getrange(key, 0, -1)
    if not bits:
        return 0
    
    # 转为二进制字符串,逐位检查
    bit_str = ''.join(f'{b:08b}' for b in bits)
    
    count = 0
    # 从今天对应的bit往前遍历
    day_offset = today.day - 1
    for i in range(day_offset, -1, -1):
        if i < len(bit_str) and bit_str[i] == '1':
            count += 1
        else:
            break
    return count

2.3 内存估算

31天/用户 = 31 bits ≈ 4 bytes。1亿用户全月签到数据:100_000_000 × 4 = 400MB,非常经济。


3. UV 统计(HyperLogLog)

HyperLogLog 以极小内存(12KB)实现亿级基数估算,误差约 0.81%。

# 记录访问
PFADD uv:2024:01:15 user:1001 user:1002 user:1003

# 查询 UV
PFCOUNT uv:2024:01:15           # 单日UV
PFCOUNT uv:2024:01:14 uv:2024:01:15  # 多日合并UV(非精确去重)

# 周UV合并到新key(精确去重多日)
PFMERGE uv:week:2024:03 \
    uv:2024:01:15 uv:2024:01:16 uv:2024:01:17 \
    uv:2024:01:18 uv:2024:01:19 uv:2024:01:20 uv:2024:01:21
PFCOUNT uv:week:2024:03
import redis
from datetime import date, timedelta

r = redis.Redis()

def record_visit(user_id: str):
    today = date.today().strftime("%Y:%m:%d")
    r.pfadd(f"uv:{today}", user_id)

def get_weekly_uv(start_date: date) -> int:
    keys = [f"uv:{(start_date + timedelta(days=i)).strftime('%Y:%m:%d')}" 
            for i in range(7)]
    week_key = f"uv:week:{start_date.strftime('%Y:%m:%d')}"
    r.pfmerge(week_key, *keys)
    r.expire(week_key, 30 * 86400)
    return r.pfcount(week_key)

适用场景:UV/DAU/MAU 统计,不要求精确到个位数时优先选用。精确统计改用 SET 或 Bitmap(用户ID数值化后)。


4. 附近的人(GEO)

Redis GEO 基于 Geohash 存储在 ZSET 中,支持半径查询。

# 添加位置(经度, 纬度, 成员名)
GEOADD locations 116.397128 39.916527 user:1001
GEOADD locations 121.472644 31.231706 user:1002

# 查找附近(Redis 6.2+ 推荐 GEOSEARCH)
GEOSEARCH locations FROMMEMBER user:1001 BYRADIUS 5 km ASC COUNT 20 WITHCOORD WITHDIST

# 查找附近(指定坐标为中心)
GEOSEARCH locations FROMLONLAT 116.4 39.9 BYRADIUS 3 km ASC COUNT 10

# 距离计算
GEODIST locations user:1001 user:1002 km

# 获取坐标
GEOPOS locations user:1001

# 获取 geohash 字符串
GEOHASH locations user:1001
def find_nearby(user_id: str, radius_km: float, limit: int = 20):
    """查找附近用户,返回(user_id, 距离km)列表"""
    results = r.geosearch(
        "locations",
        member=user_id,
        radius=radius_km,
        unit="km",
        sort="ASC",
        count=limit,
        withdist=True,
        withcoord=False,
    )
    # 过滤掉自己
    return [(uid.decode(), dist) for uid, dist, *_ in results 
            if uid.decode() != user_id]

生产注意:GEO 数据存在单个 ZSET 中,百万级成员无压力。但要注意隐私:不要将真实坐标直接暴露给客户端,用 GEODIST 返回距离而非坐标。


5. 抢购秒杀(Lua 原子扣减)

5.1 核心 Lua 脚本

-- deduct_stock.lua
-- KEYS[1]: 库存 key(如 stock:sku:10086)
-- ARGV[1]: 扣减数量
-- 返回:1成功,0库存不足,-1库存key不存在

local stock = redis.call('get', KEYS[1])
if stock == false then
    return -1
end
stock = tonumber(stock)
local requested = tonumber(ARGV[1])
if stock < requested then
    return 0
end
redis.call('decrby', KEYS[1], requested)
return 1
import redis

r = redis.Redis()

# 注册脚本(获得 SHA)
DEDUCT_STOCK_SCRIPT = r.register_script(open("deduct_stock.lua").read())

def seckill(sku_id: str, user_id: str, quantity: int = 1) -> bool:
    """
    秒杀入口
    返回 True 表示扣减成功,False 表示失败
    """
    stock_key = f"stock:sku:{sku_id}"
    result = DEDUCT_STOCK_SCRIPT(keys=[stock_key], args=[quantity])
    
    if result == 1:
        # 写入消息队列,异步创建订单
        order_data = {
            "sku_id": sku_id,
            "user_id": user_id,
            "quantity": quantity,
        }
        r.xadd("orders:pending", order_data)
        return True
    return False

5.2 预热与防超卖

def warmup_stock(sku_id: str, stock: int, expire_seconds: int = 3600):
    """活动开始前预热库存"""
    key = f"stock:sku:{sku_id}"
    pipe = r.pipeline()
    pipe.set(key, stock)
    pipe.expire(key, expire_seconds)
    pipe.execute()

防超卖要点


6. 社交关系(Set 操作)

# 关注 / 取关
SADD following:1001 1002 1003
SREM following:1001 1002

# 粉丝
SADD followers:1002 1001

# 共同关注
SINTER following:1001 following:1002

# 推荐好友:1002关注但我(1001)未关注的
SDIFF following:1002 following:1001

# 互相关注(是否互粉)
SISMEMBER following:1002 1001   # 1002是否关注了1001

# 关注数 / 粉丝数
SCARD following:1001
SCARD followers:1001
def get_mutual_friends(user_a: str, user_b: str):
    """共同关注列表"""
    return r.sinter(f"following:{user_a}", f"following:{user_b}")

def get_recommendations(user_id: str, friend_id: str, limit: int = 20):
    """好友关注但我未关注的用户(推荐关注)"""
    result_key = f"rec:{user_id}:tmp"
    r.sdiffstore(result_key, f"following:{friend_id}", f"following:{user_id}")
    r.expire(result_key, 60)
    recs = r.srandmember(result_key, limit)
    return [uid.decode() for uid in recs]

7. 消息队列对比:List vs Stream

特性 List (LPUSH/BRPOP) Stream
消费者组 不支持 支持(XGROUP)
ACK 确认 不支持 支持(XACK)
未确认消息追踪(PEL) 不支持 支持
消息重放 不支持 支持(指定ID)
消息持久化 依赖 AOF/RDB 同上
多消费者并行 需自行分片 原生支持

Stream 示例

# 生产者
r.xadd("orders:stream", {"order_id": "ORD001", "amount": "99.00"})

# 创建消费者组
r.xgroup_create("orders:stream", "payment_group", id="0", mkstream=True)

# 消费者读取(阻塞等待最多1秒)
messages = r.xreadgroup(
    groupname="payment_group",
    consumername="worker-1",
    streams={"orders:stream": ">"},  # > 表示只读新消息
    count=10,
    block=1000,
)

for stream_name, entries in messages:
    for msg_id, fields in entries:
        print(msg_id, fields)
        # 处理完成后 ACK
        r.xack("orders:stream", "payment_group", msg_id)

结论:新项目统一使用 Stream,List 仅用于简单的单消费者任务队列。


8. Session 共享

# 存储(HMSET 在 Redis 4.0+ 可用 HSET 替代)
HSET session:abc123 user_id 1001 name Alice role admin created_at 1700000000

# 读取全部字段
HGETALL session:abc123

# 读取单个字段
HGET session:abc123 user_id

# 续期(每次请求刷新过期时间)
EXPIRE session:abc123 1800

# 注销(删除)
DEL session:abc123
import uuid
import time
from typing import Optional

SESSION_TTL = 1800  # 30分钟

def create_session(user_id: str, name: str, role: str) -> str:
    token = str(uuid.uuid4())   # UUID v4,128位随机
    key = f"session:{token}"
    r.hset(key, mapping={
        "user_id": user_id,
        "name": name,
        "role": role,
        "created_at": int(time.time()),
    })
    r.expire(key, SESSION_TTL)
    return token

def get_session(token: str) -> Optional[dict]:
    key = f"session:{token}"
    data = r.hgetall(key)
    if not data:
        return None
    # 续期
    r.expire(key, SESSION_TTL)
    return {k.decode(): v.decode() for k, v in data.items()}

安全要点


9. 令牌桶限流(Lua 脚本)

令牌桶算法支持突发流量,平均速率可控。

-- token_bucket.lua
-- KEYS[1]: 限流 key
-- ARGV[1]: 桶容量 capacity
-- ARGV[2]: 每秒补充速率 rate(tokens/ms)
-- ARGV[3]: 当前时间戳(毫秒)
-- ARGV[4]: 本次请求令牌数
-- 返回:1 允许,0 拒绝

local key      = KEYS[1]
local capacity = tonumber(ARGV[1])
local rate     = tonumber(ARGV[2])
local now      = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

local last_tokens = tonumber(redis.call('hget', key, 'tokens'))
local last_time   = tonumber(redis.call('hget', key, 'time'))

if last_tokens == nil then last_tokens = capacity end
if last_time   == nil then last_time   = now      end

-- 按时间差补充令牌
local elapsed   = math.max(0, now - last_time)
local new_tokens = math.min(capacity, last_tokens + elapsed * rate / 1000)

if new_tokens >= requested then
    redis.call('hset', key, 'tokens', new_tokens - requested, 'time', now)
    -- 自动过期:桶填满需要 capacity/rate 秒,额外+1秒
    redis.call('expire', key, math.ceil(capacity / rate) + 1)
    return 1
else
    -- 更新时间戳,避免时间窗口被"借用"
    redis.call('hset', key, 'time', now)
    return 0
end
import time
import redis

r = redis.Redis()

TOKEN_BUCKET_SCRIPT = r.register_script(open("token_bucket.lua").read())

def is_allowed(user_id: str, capacity: int = 100, rate: float = 10) -> bool:
    """
    capacity: 桶最大令牌数(突发上限)
    rate: 每秒补充令牌数(平均速率)
    """
    key = f"ratelimit:tb:{user_id}"
    now_ms = int(time.time() * 1000)
    result = TOKEN_BUCKET_SCRIPT(
        keys=[key],
        args=[capacity, rate, now_ms, 1]
    )
    return result == 1

10. 滑动窗口统计(ZSET)

滑动窗口精确统计任意时间段内的请求数,适合 API 限流和行为分析。

# 记录请求(score = 时间戳ms,member = 唯一请求ID)
ZADD requests:user:1001 1700000000000 req:uuid1
ZADD requests:user:1001 1700000001000 req:uuid2

# 统计最近60秒的请求数
ZCOUNT requests:user:1001 1699999940000 1700000000000

# 清理过期记录(60秒前)
ZREMRANGEBYSCORE requests:user:1001 0 1699999940000
import time
import uuid

WINDOW_MS = 60_000    # 60秒窗口
MAX_REQUESTS = 100    # 窗口内最大请求数

def check_rate_limit(user_id: str) -> bool:
    now_ms = int(time.time() * 1000)
    window_start = now_ms - WINDOW_MS
    key = f"requests:{user_id}"
    
    pipe = r.pipeline()
    # 记录本次请求
    pipe.zadd(key, {str(uuid.uuid4()): now_ms})
    # 清理窗口外的记录
    pipe.zremrangebyscore(key, 0, window_start)
    # 统计当前窗口内请求数
    pipe.zcard(key)
    # 设置过期(窗口时长 + 1秒)
    pipe.expire(key, WINDOW_MS // 1000 + 1)
    results = pipe.execute()
    
    current_count = results[2]
    return current_count <= MAX_REQUESTS

与令牌桶对比


本章总结

场景 数据结构 核心命令
排行榜 ZSET ZINCRBY / ZREVRANGE
签到 Bitmap SETBIT / BITCOUNT
UV统计 HyperLogLog PFADD / PFCOUNT / PFMERGE
附近的人 GEO(ZSET) GEOADD / GEOSEARCH
秒杀扣减 String + Lua GET / DECRBY (原子)
社交关系 Set SINTER / SDIFF
消息队列 Stream XADD / XREADGROUP / XACK
Session Hash HSET / HGETALL / EXPIRE
令牌桶限流 Hash + Lua HGET / HSET / EXPIRE
滑动窗口 ZSET ZADD / ZCOUNT / ZREMRANGEBYSCORE

核心原则:选对数据结构,原子操作用 Lua,高并发场景提前压测验证。

本章评分
4.6  / 5  (3 评分)

💬 留言讨论