Chapter 36: 10 Production Use Cases: Leaderboards to Real-Time Rate Limiting

Redis excels at solving high-frequency engineering problems. This chapter systematically covers ten canonical use cases, each with a complete design rationale, core commands, production-quality code, and hard-won lessons from the field.


1. Leaderboards (ZSET)

1.1 Real-Time Score Updates

Sorted sets are the natural fit for leaderboards. Use ZINCRBY to atomically accumulate scores, avoiding race conditions:

ZINCRBY game:rank 50 user:1001    # add 50 points to user:1001
ZINCRBY game:rank -10 user:1002   # deduct 10 points (fully supported)

1.2 Paginated Queries

# Ascending (lowest score first)
ZRANGE game:rank 0 9 WITHSCORES

# Descending (leaderboard standard: highest score first)
ZREVRANGE game:rank 0 9 WITHSCORES

# Score-range query with pagination, highest first (Redis 6.2+ unified ZRANGE)
ZRANGE game:rank +inf -inf BYSCORE REV LIMIT 0 10 WITHSCORES

# Get a user's rank (0-based index)
ZREVRANK game:rank user:1001

# Get a user's score
ZSCORE game:rank user:1001
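
The index arithmetic behind paginated leaderboard reads is easy to get off by one. The following is a minimal sketch, assuming 1-based page numbers; `page_bounds` and `display_rank` are hypothetical helper names, not part of redis-py.

```python
from typing import Optional, Tuple


def page_bounds(page: int, page_size: int = 10) -> Tuple[int, int]:
    """Map a 1-based page number to inclusive (start, stop) indexes for ZREVRANGE."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be >= 1")
    start = (page - 1) * page_size
    return start, start + page_size - 1


def display_rank(zero_based_rank: Optional[int]) -> Optional[int]:
    """ZREVRANK is 0-based and returns nil for unknown members; show ranks from 1."""
    return None if zero_based_rank is None else zero_based_rank + 1
```

With a redis-py client `r`, page 3 could then be fetched as `r.zrevrange("game:rank", *page_bounds(3), withscores=True)`.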

1.3 Weekly / Monthly Leaderboards

Partition by time in the key name; a scheduled job handles cleanup or merge:

import redis
from datetime import datetime

r = redis.Redis()

def add_score(user_id: str, score: float):
    now = datetime.now()
    # Global leaderboard
    r.zincrby("game:rank:all", score, user_id)
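    # Weekly: pair the ISO week with the ISO year, which can differ from the
    # calendar year around New Year
    iso_year, iso_week, _ = now.isocalendar()
    week_key = f"game:rank:{iso_year}:W{iso_week:02d}"
    r.zincrby(week_key, score, user_id)
    r.expire(week_key, 8 * 86400)        # TTL resets on each write; key outlives the week by at least 1 day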
    # Monthly
    month_key = f"game:rank:{now.year}:{now.month:02d}"
    r.zincrby(month_key, score, user_id)
    r.expire(month_key, 35 * 86400)

def get_top(key: str, n: int = 10):
    return r.zrevrange(key, 0, n - 1, withscores=True)
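
Key naming deserves a test of its own, because ISO week numbers belong to the ISO year rather than the calendar year, and the two disagree for a few days around New Year. The sketch below is one way to build the keys; `period_keys` is a hypothetical helper, and the key layout simply follows the `game:rank:...` pattern used above.

```python
from datetime import date


def period_keys(day: date, prefix: str = "game:rank") -> dict:
    """Build the all-time, weekly, and monthly leaderboard keys for a date."""
    iso_year, iso_week, _ = day.isocalendar()
    return {
        "all": f"{prefix}:all",
        # ISO weeks belong to the ISO year, which can differ from day.year
        "week": f"{prefix}:{iso_year}:W{iso_week:02d}",
        "month": f"{prefix}:{day.year}:{day.month:02d}",
    }
```

For example, 30 December 2024 falls in ISO week 1 of 2025, so its weekly key uses 2025 while its monthly key still uses 2024.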

1.4 Massive Leaderboards (Hundreds of Millions of Users)

ZINCRBY and ZREVRANK are O(log N), so a single ZSET holds tens of millions of members comfortably. At truly massive scale, shard across multiple sorted sets:

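import zlib

SHARD_COUNT = 100

def shard_key(user_id: str) -> str:
    # Built-in hash() is randomized per process for str, so use a stable hash
    shard = zlib.crc32(user_id.encode()) % SHARD_COUNT
    return f"game:rank:shard:{shard}"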

def add_score_sharded(user_id: str, score: float):
    r.zincrby(shard_key(user_id), score, user_id)

def get_global_top(n: int = 100):
    """
    Pull top-N from every shard, merge, and return the global top-N.
    Each user lives in exactly one shard, so the merged result is exact
    as long as every shard contributes its own top N.
    """
    candidates = []
    for i in range(SHARD_COUNT):
        key = f"game:rank:shard:{i}"
        top = r.zrevrange(key, 0, n - 1, withscores=True)
        candidates.extend(top)
    candidates.sort(key=lambda x: x[1], reverse=True)
    return candidates[:n]
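
The merge step can be checked without a Redis server. The sketch below uses plain dicts as stand-ins for the sharded sorted sets and compares the merged result with a global sort; `shard_of`, `merge_top`, and the tiny `SHARD_COUNT` are assumptions made for the demo.

```python
import heapq
import zlib

SHARD_COUNT = 4  # small value for the demo; hypothetical


def shard_of(user_id: str) -> int:
    # crc32 is stable across processes, unlike Python's built-in hash() for str
    return zlib.crc32(user_id.encode()) % SHARD_COUNT


def merge_top(shards, n):
    """Take each shard's own top n, then pick the global top n from those."""
    candidates = []
    for scores in shards:
        candidates.extend(heapq.nlargest(n, scores.items(), key=lambda kv: kv[1]))
    return heapq.nlargest(n, candidates, key=lambda kv: kv[1])


# In-memory stand-ins for the sharded sorted sets
shards = [dict() for _ in range(SHARD_COUNT)]
all_scores = {f"user:{i}": (i * 37) % 1000 for i in range(500)}
for user, score in all_scores.items():
    shards[shard_of(user)][user] = score

merged = merge_top(shards, 10)
expected = sorted(all_scores.values(), reverse=True)[:10]
```

Because every member of the global top 10 is also in the top 10 of its own shard, the merged scores match the globally sorted ones.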

2. Daily Check-Ins (Bitmap)

2.1 Recording and Querying Check-Ins

# User 1001 checks in on day 15 of January 2024
SETBIT sign:2024:01:1001 14 1       # bit offset is 0-based; day 15 = offset 14

# Did user 1001 check in on day 15?
GETBIT sign:2024:01:1001 14         # returns 1 (yes) or 0 (no)

# Total days checked in this month
BITCOUNT sign:2024:01:1001

# Days checked in within bytes 0-3, i.e. bit offsets 0-31 (BITCOUNT ranges are in bytes by default)
BITCOUNT sign:2024:01:1001 0 3

2.2 Computing Consecutive Days

from datetime import date

def get_consecutive_days(user_id: str, year: int, month: int) -> int:
    """Count how many consecutive days the user has checked in up to today."""
    today = date.today()
    key = f"sign:{year}:{month:02d}:{user_id}"
    
    raw_bytes = r.getrange(key, 0, -1)
    if not raw_bytes:
        return 0
    
    # Convert to binary string
    bit_str = ''.join(f'{b:08b}' for b in raw_bytes)
    
    count = 0
    for i in range(today.day - 1, -1, -1):   # walk backwards from today
        if i < len(bit_str) and bit_str[i] == '1':
            count += 1
        else:
            break
    return count
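
The streak logic depends on Redis's bit ordering, which is simple to test on raw bytes. This is a minimal sketch that works on the value returned by GETRANGE without building a binary string; `streak_from_bytes` is a hypothetical name.

```python
def streak_from_bytes(raw: bytes, day_of_month: int) -> int:
    """Count consecutive check-ins ending on day_of_month (1-based).

    Redis numbers bits from the most significant bit of byte 0, so bit
    offset k lives in byte k // 8 at position 7 - (k % 8).
    """
    streak = 0
    for offset in range(day_of_month - 1, -1, -1):
        byte_index, bit_index = divmod(offset, 8)
        if byte_index >= len(raw):
            break  # Redis never stored this far, so the bit is 0
        if (raw[byte_index] >> (7 - bit_index)) & 1:
            streak += 1
        else:
            break
    return streak
```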

2.3 Memory Efficiency

31 days per user = 31 bits ≈ 4 bytes. For 100 million users: 100,000,000 × 4 bytes = 400 MB of bitmap payload for a full month, which is extraordinarily compact. Redis's per-key overhead comes on top of that figure.
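
The estimate can be reproduced in a few lines. This sketch counts only the bitmap payload and ignores key names and per-key overhead, so treat the result as a lower bound.

```python
import math

DAYS_IN_MONTH = 31
USERS = 100_000_000

bytes_per_user = math.ceil(DAYS_IN_MONTH / 8)   # 31 bits round up to 4 bytes
total_bytes = USERS * bytes_per_user            # bitmap payload only
total_mb = total_bytes / 1_000_000              # decimal megabytes
```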


3. Unique Visitor Counting (HyperLogLog)

HyperLogLog uses a fixed 12 KB to estimate cardinality up to billions with ~0.81% error.

# Record a visit
PFADD uv:2024:01:15 user:1001 user:1002 user:1003

# Query daily UV
PFCOUNT uv:2024:01:15

# Multi-day combined UV (approximate de-duplication)
PFCOUNT uv:2024:01:14 uv:2024:01:15

# Merge a week into a single key for weekly UV (still an estimate with the same ~0.81% error)
PFMERGE uv:week:2024:03 \
    uv:2024:01:15 uv:2024:01:16 uv:2024:01:17 \
    uv:2024:01:18 uv:2024:01:19 uv:2024:01:20 uv:2024:01:21
PFCOUNT uv:week:2024:03

import redis
from datetime import date, timedelta

r = redis.Redis()

def record_visit(user_id: str):
    today = date.today().strftime("%Y:%m:%d")
    r.pfadd(f"uv:{today}", user_id)

def get_weekly_uv(start_date: date) -> int:
    keys = [f"uv:{(start_date + timedelta(days=i)).strftime('%Y:%m:%d')}"
            for i in range(7)]
    week_key = f"uv:week:{start_date.strftime('%Y:%m:%d')}"
    r.pfmerge(week_key, *keys)
    r.expire(week_key, 30 * 86400)
    return r.pfcount(week_key)

When to use: UV/DAU/MAU statistics where exact-to-the-unit precision is not required. For exact counts, use Bitmap (with integer user IDs) or SET.
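
Both the 12 KB size and the ~0.81% error follow from the register layout. The arithmetic below is a sketch assuming the dense encoding with 16,384 six-bit registers and the usual HyperLogLog standard-error formula of 1.04 / sqrt(m).

```python
import math

REGISTERS = 16_384        # 2**14 registers in the dense encoding
BITS_PER_REGISTER = 6

dense_size_bytes = REGISTERS * BITS_PER_REGISTER // 8
standard_error = 1.04 / math.sqrt(REGISTERS)
```

Here `dense_size_bytes` comes to 12,288 bytes (12 KB) and `standard_error` to 0.008125, which rounds to the 0.81% quoted above.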


4. Nearby Users (GEO)

Redis GEO stores coordinates as Geohash-encoded scores in a ZSET, enabling radius queries with sub-millisecond latency.

# Add positions (longitude, latitude, member)
GEOADD locations 116.397128 39.916527 user:1001
GEOADD locations 121.472644 31.231706 user:1002

# Find neighbors within 5 km, sorted by distance (Redis 6.2+)
GEOSEARCH locations FROMMEMBER user:1001 BYRADIUS 5 km ASC COUNT 20 WITHCOORD WITHDIST

# Find from an arbitrary coordinate center
GEOSEARCH locations FROMLONLAT 116.4 39.9 BYRADIUS 3 km ASC COUNT 10

# Calculate distance between two members
GEODIST locations user:1001 user:1002 km

# Retrieve coordinates
GEOPOS locations user:1001

# Get Geohash string (for sharing / external systems)
GEOHASH locations user:1001

def find_nearby(user_id: str, radius_km: float, limit: int = 20):
    """Return list of (user_id, distance_km) tuples for nearby users."""
    results = r.geosearch(
        "locations",
        member=user_id,
        radius=radius_km,
        unit="km",
        sort="ASC",
        count=limit,
        withdist=True,
    )
    return [
        (uid.decode(), dist)
        for uid, dist, *_ in results
        if uid.decode() != user_id      # exclude self
    ]

Privacy note: Never return raw coordinates to the client. Return distances only, via WITHDIST or GEODIST, and leave WITHCOORD out of client-facing queries.
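
For tests that need an independent check on the distances Redis reports, a haversine calculation is usually close enough. The sketch below assumes a mean Earth radius of 6371 km, so its results may differ slightly from GEODIST; `haversine_km` is a hypothetical helper.

```python
import math

EARTH_RADIUS_KM = 6371.0  # mean Earth radius; an assumption for this sketch


def haversine_km(lon1: float, lat1: float, lon2: float, lat2: float) -> float:
    """Great-circle distance between two (longitude, latitude) points in km."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


# The two sample positions added with GEOADD above
d = haversine_km(116.397128, 39.916527, 121.472644, 31.231706)
```

Argument order follows GEOADD: longitude first, then latitude.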


5. Flash Sales / Inventory Deduction (Lua Atomicity)

5.1 Core Lua Script

-- deduct_stock.lua
-- KEYS[1]: stock key (e.g., stock:sku:10086)
-- ARGV[1]: quantity to deduct
-- Returns: 1 = success, 0 = insufficient stock, -1 = key does not exist

local stock = redis.call('get', KEYS[1])
if stock == false then
    return -1
end
stock = tonumber(stock)
local requested = tonumber(ARGV[1])
if stock < requested then
    return 0
end
redis.call('decrby', KEYS[1], requested)
return 1

import redis

r = redis.Redis()

DEDUCT_STOCK_SCRIPT = r.register_script(open("deduct_stock.lua").read())

def seckill(sku_id: str, user_id: str, quantity: int = 1) -> bool:
    stock_key = f"stock:sku:{sku_id}"
    result = DEDUCT_STOCK_SCRIPT(keys=[stock_key], args=[quantity])
    if result == 1:
        # Push to async order queue; do NOT create the order synchronously here
        r.xadd("orders:pending", {
            "sku_id": sku_id,
            "user_id": user_id,
            "quantity": quantity,
        })
        return True
    return False

5.2 Pre-warming Inventory

def warmup_stock(sku_id: str, stock: int, expire_seconds: int = 7200):
    """Load inventory into Redis before the sale begins."""
    key = f"stock:sku:{sku_id}"
    pipe = r.pipeline()
    pipe.set(key, stock)
    pipe.expire(key, expire_seconds)
    pipe.execute()

Oversell prevention: The Lua script makes the read-check-decrement sequence atomic. The stock value can never go negative because stock < requested is checked before decrementing.
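
The script's three return codes are easy to pin down with a pure-Python mirror, which is handy as a test oracle. This is a sketch only: `deduct_stock_model` is a hypothetical function operating on a dict, and it says nothing about atomicity, which only the Lua script provides.

```python
def deduct_stock_model(stock: dict, key: str, requested: int) -> int:
    """Pure-Python mirror of deduct_stock.lua's return codes, for tests only."""
    if key not in stock:
        return -1          # key does not exist
    if stock[key] < requested:
        return 0           # insufficient stock, nothing changes
    stock[key] -= requested
    return 1


inventory = {"stock:sku:10086": 3}
results = [deduct_stock_model(inventory, "stock:sku:10086", 2) for _ in range(3)]
```

With 3 units in stock and three requests for 2 units each, only the first succeeds and 1 unit remains.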


6. Social Graph (Set Operations)

# Follow / unfollow
SADD following:1001 1002 1003
SREM following:1001 1002

# Record followers
SADD followers:1002 1001

# Mutual follows (friends in common)
SINTER following:1001 following:1002

# "People you may know": who user 1002 follows that user 1001 does not
SDIFF following:1002 following:1001

# Is 1002 following 1001? (are they mutual?)
SISMEMBER following:1002 1001

# Follow count / follower count
SCARD following:1001
SCARD followers:1001

def get_mutual_follows(user_a: str, user_b: str) -> list[str]:
    return [uid.decode() for uid in r.sinter(f"following:{user_a}", f"following:{user_b}")]

def get_recommendations(user_id: str, friend_id: str, limit: int = 20) -> list[str]:
    """Users that a friend follows but the current user does not."""
    result_key = f"rec:{user_id}:tmp"
    r.sdiffstore(result_key, f"following:{friend_id}", f"following:{user_id}")
    r.expire(result_key, 60)
    recs = r.srandmember(result_key, limit)
    return [uid.decode() for uid in recs]
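
One edge case in the SDIFF-based recommendation: if the friend follows the user, the user's own ID ends up in the result. The sketch below shows the effect with Python sets standing in for the Redis sets; `recommend` is a hypothetical helper.

```python
def recommend(user_id: str, friend_following: set, my_following: set) -> set:
    """Accounts the friend follows that the user does not, minus the user."""
    return friend_following - my_following - {user_id}


following_1001 = {"1002", "1003"}
following_1002 = {"1001", "1003", "1004"}

mutual = following_1001 & following_1002                  # SINTER equivalent
recs = recommend("1001", following_1002, following_1001)  # SDIFF, then drop self
```

In the Redis version, filtering the user's own ID out of the SRANDMEMBER result achieves the same thing.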

7. Message Queues: List vs Stream

Feature                    List (LPUSH / BRPOP)   Stream
Consumer groups            No                     Yes (XGROUP)
Message ACK                No                     Yes (XACK)
Pending Entry List (PEL)   No                     Yes
Message replay             No                     Yes (by ID)
Parallel consumers         Manual sharding        Native
Persistence                AOF / RDB              Same

Stream Example:

# Producer
r.xadd("orders:stream", {"order_id": "ORD001", "amount": "99.00"})

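# Create consumer group; XGROUP CREATE raises BUSYGROUP if it already exists
try:
    r.xgroup_create("orders:stream", "payment_group", id="0", mkstream=True)
except redis.ResponseError as exc:
    if "BUSYGROUP" not in str(exc):
        raise

# Consumer: blocking read, up to 10 messages, wait 1 second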
messages = r.xreadgroup(
    groupname="payment_group",
    consumername="worker-1",
    streams={"orders:stream": ">"},   # ">" = only new, undelivered messages
    count=10,
    block=1000,
)

for stream_name, entries in messages:
    for msg_id, fields in entries:
        try:
            # process fields...
            r.xack("orders:stream", "payment_group", msg_id)
        except Exception:
            pass  # message stays in the PEL until a consumer reclaims it (XAUTOCLAIM / XCLAIM)

Rule of thumb: Use Stream for all new projects. List is acceptable only for simple, single-consumer task queues where message loss on crash is tolerable.
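
The ack-on-success pattern from the Stream example can be factored into a function that is testable without a server. In this sketch, `process_batch`, `handler`, and `ack` are hypothetical names; entries are `(msg_id, fields)` pairs like those in each stream's entry list from XREADGROUP.

```python
from typing import Callable, Dict, Iterable, List, Tuple

Entry = Tuple[str, Dict[str, str]]


def process_batch(
    entries: Iterable[Entry],
    handler: Callable[[Dict[str, str]], None],
    ack: Callable[[str], None],
) -> List[str]:
    """Run handler on each entry; ack only on success. Returns the failed IDs."""
    failed = []
    for msg_id, fields in entries:
        try:
            handler(fields)
        except Exception:
            failed.append(msg_id)   # left unacknowledged, so it stays pending
        else:
            ack(msg_id)
    return failed
```

With redis-py, `ack` could be `lambda mid: r.xack("orders:stream", "payment_group", mid)`. Returning the failed IDs makes it straightforward to log or alert on messages that remain pending.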


8. Shared Session Storage

# Store (Redis 4.0+ HSET accepts variadic field-value pairs)
HSET session:abc123 user_id 1001 name Alice role admin created_at 1700000000

# Retrieve all fields
HGETALL session:abc123

# Retrieve one field
HGET session:abc123 user_id

# Renew TTL on every request
EXPIRE session:abc123 1800

# Logout
DEL session:abc123

import uuid, time
from typing import Optional

SESSION_TTL = 1800   # 30 minutes

def create_session(user_id: str, name: str, role: str) -> str:
    token = str(uuid.uuid4())    # 122 bits of entropy, unguessable
    key = f"session:{token}"
    r.hset(key, mapping={
        "user_id": user_id,
        "name": name,
        "role": role,
        "created_at": int(time.time()),
    })
    r.expire(key, SESSION_TTL)
    return token

def get_session(token: str) -> Optional[dict]:
    key = f"session:{token}"
    data = r.hgetall(key)
    if not data:
        return None
    r.expire(key, SESSION_TTL)    # sliding expiry
    return {k.decode(): v.decode() for k, v in data.items()}

Security: Token must be UUID v4 (not sequential). Always use HTTPS to prevent interception. Require re-authentication for sensitive operations.
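
Since tokens arrive from the client, it can help to reject anything that is not a well-formed UUID v4 before it is interpolated into a key name. This is a sketch of one such check, not a complete security measure; `session_key` is a hypothetical helper.

```python
import uuid
from typing import Optional


def session_key(token: str) -> Optional[str]:
    """Return the Redis key for a well-formed UUID v4 token, else None."""
    try:
        parsed = uuid.UUID(token)
    except (ValueError, AttributeError, TypeError):
        return None
    # Require the canonical lowercase hyphenated form and version 4
    if parsed.version != 4 or str(parsed) != token:
        return None
    return f"session:{token}"
```

Comparing `str(parsed)` with the input rejects alternative spellings that `uuid.UUID` would otherwise accept, such as braces or a `urn:uuid:` prefix.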


9. Token Bucket Rate Limiter (Lua Script)

Token bucket allows controlled bursting while enforcing an average rate.

-- token_bucket.lua
-- KEYS[1]: rate limit key
-- ARGV[1]: bucket capacity
-- ARGV[2]: refill rate (tokens per second)
-- ARGV[3]: current timestamp (milliseconds)
-- ARGV[4]: tokens requested
-- Returns: 1 = allowed, 0 = rejected

local key       = KEYS[1]
local capacity  = tonumber(ARGV[1])
local rate      = tonumber(ARGV[2])
local now       = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

local last_tokens = tonumber(redis.call('hget', key, 'tokens'))
local last_time   = tonumber(redis.call('hget', key, 'time'))

if last_tokens == nil then last_tokens = capacity end
if last_time   == nil then last_time   = now      end

local elapsed   = math.max(0, now - last_time)
local new_tokens = math.min(capacity, last_tokens + elapsed * rate / 1000)

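local allowed = 0
if new_tokens >= requested then
    new_tokens = new_tokens - requested
    allowed = 1
end

-- Persist the refilled balance even on rejection; updating only 'time'
-- would discard the tokens accrued since the last call
redis.call('hset', key, 'tokens', new_tokens, 'time', now)
redis.call('expire', key, math.ceil(capacity / rate) + 1)
return allowed

import time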

TOKEN_BUCKET_SCRIPT = r.register_script(open("token_bucket.lua").read())

def is_allowed(user_id: str, capacity: int = 100, rate: float = 10.0) -> bool:
    """
    capacity: max burst size (tokens)
    rate: sustained requests per second
    """
    key = f"ratelimit:tb:{user_id}"
    now_ms = int(time.time() * 1000)
    result = TOKEN_BUCKET_SCRIPT(keys=[key], args=[capacity, rate, now_ms, 1])
    return result == 1
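
The refill arithmetic can be unit-tested without a server. The sketch below mirrors the script's formula in pure Python; `take_tokens` is a hypothetical name, and keeping the partial refill on rejection is an assumption about the intended behavior.

```python
def take_tokens(tokens: float, last_ms: int, now_ms: int,
                capacity: float, rate: float, requested: float = 1.0):
    """Refill by elapsed time, then try to spend. Returns (allowed, tokens_left)."""
    elapsed_ms = max(0, now_ms - last_ms)
    refilled = min(capacity, tokens + elapsed_ms * rate / 1000)
    if refilled >= requested:
        return True, refilled - requested
    return False, refilled   # keep the partial refill for the next attempt
```

With `rate=10` tokens per second, an empty bucket gains 5 tokens after 500 ms, and a full bucket never exceeds `capacity` no matter how long it sits idle.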

10. Sliding Window Rate Limiter (ZSET)

Sliding window tracks the exact count of requests within any rolling time window.

# Record request (score = timestamp ms, member = unique request ID)
ZADD requests:user:1001 1700000000000 req-uuid-001
ZADD requests:user:1001 1700000001000 req-uuid-002

# Count requests in the last 60 seconds
ZCOUNT requests:user:1001 1699999940000 1700000000000

# Remove records outside the window
ZREMRANGEBYSCORE requests:user:1001 0 1699999940000

import time, uuid

WINDOW_MS    = 60_000   # 60-second window
MAX_REQUESTS = 100      # limit per window

def check_rate_limit(user_id: str) -> bool:
    now_ms = int(time.time() * 1000)
    window_start = now_ms - WINDOW_MS
    key = f"requests:{user_id}"
    
    pipe = r.pipeline()
    pipe.zadd(key, {str(uuid.uuid4()): now_ms})      # record this request (rejected ones count too)
    pipe.zremrangebyscore(key, 0, window_start)       # trim expired entries
    pipe.zcard(key)                                   # count remaining
    pipe.expire(key, WINDOW_MS // 1000 + 1)          # auto-cleanup
    results = pipe.execute()
    
    return results[2] <= MAX_REQUESTS
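
The same add, trim, count sequence can be modelled in memory to test window behavior with fixed timestamps. This is a sketch that assumes timestamps arrive in non-decreasing order; `SlidingWindow` is a hypothetical class, with a deque standing in for the ZSET.

```python
from collections import deque


class SlidingWindow:
    """In-memory analogue of the ZSET limiter: one timestamp per request."""

    def __init__(self, window_ms: int, max_requests: int):
        self.window_ms = window_ms
        self.max_requests = max_requests
        self.stamps = deque()

    def allow(self, now_ms: int) -> bool:
        self.stamps.append(now_ms)                       # like ZADD
        cutoff = now_ms - self.window_ms
        while self.stamps and self.stamps[0] <= cutoff:  # like ZREMRANGEBYSCORE 0 cutoff
            self.stamps.popleft()
        return len(self.stamps) <= self.max_requests     # like ZCARD vs the limit
```

As in the Redis version, a rejected request is still recorded, so a client that keeps retrying while over the limit stays blocked until its traffic drops.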

Comparison:

Aspect            Token Bucket           Sliding Window
Memory per user   Fixed (2 fields)       Grows with QPS
Burst handling    Yes (up to capacity)   Hard cutoff at limit
Accuracy          Approximate            Exact
Suitable for      High-traffic APIs      Precise security limits

Chapter Summary

Use Case         Data Structure   Key Commands
Leaderboard      ZSET             ZINCRBY / ZREVRANGE
Daily check-in   Bitmap           SETBIT / BITCOUNT
UV counting      HyperLogLog      PFADD / PFCOUNT / PFMERGE
Nearby users     GEO (ZSET)       GEOADD / GEOSEARCH
Flash sale       String + Lua     GET / DECRBY (atomic)
Social graph     Set              SINTER / SDIFF
Message queue    Stream           XADD / XREADGROUP / XACK
Session          Hash             HSET / HGETALL / EXPIRE
Token bucket     Hash + Lua       HGET / HSET / EXPIRE
Sliding window   ZSET             ZADD / ZCOUNT / ZREMRANGEBYSCORE

Core principle: Choose the right data structure, use Lua for atomicity, and load-test every high-concurrency scenario before going to production.
