10 Real Use Cases: Leaderboards to Real-Time Rate Limiting
Chapter 36: 10 Production Use Cases: Leaderboards to Real-Time Rate Limiting
Redis excels at solving high-frequency engineering problems. This chapter systematically covers ten canonical use cases — each with a complete design rationale, core commands, production-quality code, and hard-won lessons from the field.
1. Leaderboards (ZSET)
1.1 Real-Time Score Updates
Sorted sets are the natural fit for leaderboards. Use ZINCRBY to atomically accumulate scores, avoiding race conditions:
ZINCRBY game:rank 50 user:1001 # add 50 points to user:1001
ZINCRBY game:rank -10 user:1002 # deduct 10 points (fully supported)
1.2 Paginated Queries
# Ascending (lowest score first)
ZRANGE game:rank 0 9 WITHSCORES
# Descending (leaderboard standard — highest score first)
ZREVRANGE game:rank 0 9 WITHSCORES
# Score-range query with pagination (Redis 6.2+ unified command)
ZRANGEBYSCORE game:rank +inf -inf WITHSCORES LIMIT 0 10
# Get a user's rank (0-based index)
ZREVRANK game:rank user:1001
# Get a user's score
ZSCORE game:rank user:1001
1.3 Weekly / Monthly Leaderboards
Partition by time in the key name; a scheduled job handles cleanup or merge:
import redis
from datetime import datetime
r = redis.Redis()
def add_score(user_id: str, score: float):
now = datetime.now()
# Global leaderboard
r.zincrby("game:rank:all", score, user_id)
# Weekly (ISO week number)
week_key = f"game:rank:{now.year}:W{now.isocalendar()[1]:02d}"
r.zincrby(week_key, score, user_id)
r.expire(week_key, 8 * 86400) # expire 1 day after the week ends
# Monthly
month_key = f"game:rank:{now.year}:{now.month:02d}"
r.zincrby(month_key, score, user_id)
r.expire(month_key, 35 * 86400)
def get_top(key: str, n: int = 10):
return r.zrevrange(key, 0, n - 1, withscores=True)
1.4 Massive Leaderboards (Hundreds of Millions of Users)
A single ZSET can hold tens of millions of members without degradation, but at truly massive scale, shard across multiple sorted sets:
SHARD_COUNT = 100
def shard_key(user_id: str) -> str:
shard = hash(user_id) % SHARD_COUNT
return f"game:rank:shard:{shard}"
def add_score_sharded(user_id: str, score: float):
r.zincrby(shard_key(user_id), score, user_id)
def get_global_top(n: int = 100):
"""
Pull top-N from every shard, merge-sort, return global top-N.
Accuracy increases with N; N=200 covers virtually all production needs.
"""
candidates = []
for i in range(SHARD_COUNT):
key = f"game:rank:shard:{i}"
top = r.zrevrange(key, 0, n - 1, withscores=True)
candidates.extend(top)
candidates.sort(key=lambda x: x[1], reverse=True)
return candidates[:n]
2. Daily Check-Ins (Bitmap)
2.1 Recording and Querying Check-Ins
# User 1001 checks in on day 15 of January 2024
SETBIT sign:2024:01:1001 14 1 # bit offset is 0-based; day 15 = offset 14
# Did user 1001 check in on day 15?
GETBIT sign:2024:01:1001 14 # returns 1 (yes) or 0 (no)
# Total days checked in this month
BITCOUNT sign:2024:01:1001
# Days checked in during the first 32 days (byte-range query)
BITCOUNT sign:2024:01:1001 0 3
2.2 Computing Consecutive Days
from datetime import date
def get_consecutive_days(user_id: str, year: int, month: int) -> int:
"""Count how many consecutive days the user has checked in up to today."""
today = date.today()
key = f"sign:{year}:{month:02d}:{user_id}"
raw_bytes = r.getrange(key, 0, -1)
if not raw_bytes:
return 0
# Convert to binary string
bit_str = ''.join(f'{b:08b}' for b in raw_bytes)
count = 0
for i in range(today.day - 1, -1, -1): # walk backwards from today
if i < len(bit_str) and bit_str[i] == '1':
count += 1
else:
break
return count
2.3 Memory Efficiency
31 days per user = 31 bits ≈ 4 bytes. For 100 million users:
100,000,000 × 4 bytes = 400 MB for a full month — extraordinarily compact.
3. Unique Visitor Counting (HyperLogLog)
HyperLogLog uses a fixed 12 KB to estimate cardinality up to billions with ~0.81% error.
# Record a visit
PFADD uv:2024:01:15 user:1001 user:1002 user:1003
# Query daily UV
PFCOUNT uv:2024:01:15
# Multi-day combined UV (approximate de-duplication)
PFCOUNT uv:2024:01:14 uv:2024:01:15
# Merge a week into a single key for accurate weekly UV
PFMERGE uv:week:2024:03 \
uv:2024:01:15 uv:2024:01:16 uv:2024:01:17 \
uv:2024:01:18 uv:2024:01:19 uv:2024:01:20 uv:2024:01:21
PFCOUNT uv:week:2024:03
import redis
from datetime import date, timedelta
r = redis.Redis()
def record_visit(user_id: str):
today = date.today().strftime("%Y:%m:%d")
r.pfadd(f"uv:{today}", user_id)
def get_weekly_uv(start_date: date) -> int:
keys = [f"uv:{(start_date + timedelta(days=i)).strftime('%Y:%m:%d')}"
for i in range(7)]
week_key = f"uv:week:{start_date.strftime('%Y:%m:%d')}"
r.pfmerge(week_key, *keys)
r.expire(week_key, 30 * 86400)
return r.pfcount(week_key)
When to use: UV/DAU/MAU statistics where exact-to-the-unit precision is not required. For exact counts, use Bitmap (with integer user IDs) or SET.
4. Nearby Users (GEO)
Redis GEO stores coordinates as Geohash-encoded scores in a ZSET, enabling radius queries with sub-millisecond latency.
# Add positions (longitude, latitude, member)
GEOADD locations 116.397128 39.916527 user:1001
GEOADD locations 121.472644 31.231706 user:1002
# Find neighbors within 5 km, sorted by distance (Redis 6.2+)
GEOSEARCH locations FROMMEMBER user:1001 BYRADIUS 5 km ASC COUNT 20 WITHCOORD WITHDIST
# Find from an arbitrary coordinate center
GEOSEARCH locations FROMLONLAT 116.4 39.9 BYRADIUS 3 km ASC COUNT 10
# Calculate distance between two members
GEODIST locations user:1001 user:1002 km
# Retrieve coordinates
GEOPOS locations user:1001
# Get Geohash string (for sharing / external systems)
GEOHASH locations user:1001
def find_nearby(user_id: str, radius_km: float, limit: int = 20):
"""Return list of (user_id, distance_km) tuples for nearby users."""
results = r.geosearch(
"locations",
member=user_id,
radius=radius_km,
unit="km",
sort="ASC",
count=limit,
withdist=True,
)
return [
(uid.decode(), dist)
for uid, dist, *_ in results
if uid.decode() != user_id # exclude self
]
Privacy note: Never return raw coordinates to the client. Return distance only via GEODIST.
5. Flash Sales / Inventory Deduction (Lua Atomicity)
5.1 Core Lua Script
-- deduct_stock.lua
-- KEYS[1]: stock key (e.g., stock:sku:10086)
-- ARGV[1]: quantity to deduct
-- Returns: 1 = success, 0 = insufficient stock, -1 = key does not exist
local stock = redis.call('get', KEYS[1])
if stock == false then
return -1
end
stock = tonumber(stock)
local requested = tonumber(ARGV[1])
if stock < requested then
return 0
end
redis.call('decrby', KEYS[1], requested)
return 1
import redis
r = redis.Redis()
DEDUCT_STOCK_SCRIPT = r.register_script(open("deduct_stock.lua").read())
def seckill(sku_id: str, user_id: str, quantity: int = 1) -> bool:
stock_key = f"stock:sku:{sku_id}"
result = DEDUCT_STOCK_SCRIPT(keys=[stock_key], args=[quantity])
if result == 1:
# Push to async order queue — do NOT create the order synchronously here
r.xadd("orders:pending", {
"sku_id": sku_id,
"user_id": user_id,
"quantity": quantity,
})
return True
return False
5.2 Pre-warming Inventory
def warmup_stock(sku_id: str, stock: int, expire_seconds: int = 7200):
"""Load inventory into Redis before the sale begins."""
key = f"stock:sku:{sku_id}"
pipe = r.pipeline()
pipe.set(key, stock)
pipe.expire(key, expire_seconds)
pipe.execute()
Oversell prevention: The Lua script makes the read-check-decrement sequence atomic. The stock value can never go negative because stock < requested is checked before decrementing.
6. Social Graph (Set Operations)
# Follow / unfollow
SADD following:1001 1002 1003
SREM following:1001 1002
# Record followers
SADD followers:1002 1001
# Mutual follows (friends in common)
SINTER following:1001 following:1002
# "People you may know": who user 1002 follows that user 1001 does not
SDIFF following:1002 following:1001
# Is 1002 following 1001? (are they mutual?)
SISMEMBER following:1002 1001
# Follow count / follower count
SCARD following:1001
SCARD followers:1001
def get_mutual_follows(user_a: str, user_b: str) -> list[str]:
return [uid.decode() for uid in r.sinter(f"following:{user_a}", f"following:{user_b}")]
def get_recommendations(user_id: str, friend_id: str, limit: int = 20) -> list[str]:
"""Users that a friend follows but the current user does not."""
result_key = f"rec:{user_id}:tmp"
r.sdiffstore(result_key, f"following:{friend_id}", f"following:{user_id}")
r.expire(result_key, 60)
recs = r.srandmember(result_key, limit)
return [uid.decode() for uid in recs]
7. Message Queues: List vs Stream
| Feature | List (LPUSH / BRPOP) | Stream |
|---|---|---|
| Consumer groups | No | Yes (XGROUP) |
| Message ACK | No | Yes (XACK) |
| Pending Entry List (PEL) | No | Yes |
| Message replay | No | Yes (by ID) |
| Parallel consumers | Manual sharding | Native |
| Persistence | AOF / RDB | Same |
Stream Example:
# Producer
r.xadd("orders:stream", {"order_id": "ORD001", "amount": "99.00"})
# Create consumer group (idempotent)
r.xgroup_create("orders:stream", "payment_group", id="0", mkstream=True)
# Consumer — blocking read, up to 10 messages, wait 1 second
messages = r.xreadgroup(
groupname="payment_group",
consumername="worker-1",
streams={"orders:stream": ">"}, # ">" = only new, undelivered messages
count=10,
block=1000,
)
for stream_name, entries in messages:
for msg_id, fields in entries:
try:
# process fields...
r.xack("orders:stream", "payment_group", msg_id)
except Exception:
pass # message stays in PEL, will be reclaimed
Rule of thumb: Use Stream for all new projects. List is acceptable only for simple, single-consumer task queues where message loss on crash is tolerable.
8. Shared Session Storage
# Store (Redis 4.0+ HSET accepts variadic field-value pairs)
HSET session:abc123 user_id 1001 name Alice role admin created_at 1700000000
# Retrieve all fields
HGETALL session:abc123
# Retrieve one field
HGET session:abc123 user_id
# Renew TTL on every request
EXPIRE session:abc123 1800
# Logout
DEL session:abc123
import uuid, time
from typing import Optional
SESSION_TTL = 1800 # 30 minutes
def create_session(user_id: str, name: str, role: str) -> str:
token = str(uuid.uuid4()) # 122 bits of entropy — unguessable
key = f"session:{token}"
r.hset(key, mapping={
"user_id": user_id,
"name": name,
"role": role,
"created_at": int(time.time()),
})
r.expire(key, SESSION_TTL)
return token
def get_session(token: str) -> Optional[dict]:
key = f"session:{token}"
data = r.hgetall(key)
if not data:
return None
r.expire(key, SESSION_TTL) # sliding expiry
return {k.decode(): v.decode() for k, v in data.items()}
Security: Token must be UUID v4 (not sequential). Always use HTTPS to prevent interception. Require re-authentication for sensitive operations.
9. Token Bucket Rate Limiter (Lua Script)
Token bucket allows controlled bursting while enforcing an average rate.
-- token_bucket.lua
-- KEYS[1]: rate limit key
-- ARGV[1]: bucket capacity
-- ARGV[2]: refill rate (tokens per second)
-- ARGV[3]: current timestamp (milliseconds)
-- ARGV[4]: tokens requested
-- Returns: 1 = allowed, 0 = rejected
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
local last_tokens = tonumber(redis.call('hget', key, 'tokens'))
local last_time = tonumber(redis.call('hget', key, 'time'))
if last_tokens == nil then last_tokens = capacity end
if last_time == nil then last_time = now end
local elapsed = math.max(0, now - last_time)
local new_tokens = math.min(capacity, last_tokens + elapsed * rate / 1000)
if new_tokens >= requested then
redis.call('hset', key, 'tokens', new_tokens - requested, 'time', now)
redis.call('expire', key, math.ceil(capacity / rate) + 1)
return 1
else
redis.call('hset', key, 'time', now)
return 0
end
import time
TOKEN_BUCKET_SCRIPT = r.register_script(open("token_bucket.lua").read())
def is_allowed(user_id: str, capacity: int = 100, rate: float = 10.0) -> bool:
"""
capacity: max burst size (tokens)
rate: sustained requests per second
"""
key = f"ratelimit:tb:{user_id}"
now_ms = int(time.time() * 1000)
result = TOKEN_BUCKET_SCRIPT(keys=[key], args=[capacity, rate, now_ms, 1])
return result == 1
10. Sliding Window Rate Limiter (ZSET)
Sliding window tracks the exact count of requests within any rolling time window.
# Record request (score = timestamp ms, member = unique request ID)
ZADD requests:user:1001 1700000000000 req-uuid-001
ZADD requests:user:1001 1700000001000 req-uuid-002
# Count requests in the last 60 seconds
ZCOUNT requests:user:1001 1699999940000 1700000000000
# Remove records outside the window
ZREMRANGEBYSCORE requests:user:1001 0 1699999940000
import time, uuid
WINDOW_MS = 60_000 # 60-second window
MAX_REQUESTS = 100 # limit per window
def check_rate_limit(user_id: str) -> bool:
now_ms = int(time.time() * 1000)
window_start = now_ms - WINDOW_MS
key = f"requests:{user_id}"
pipe = r.pipeline()
pipe.zadd(key, {str(uuid.uuid4()): now_ms}) # record this request
pipe.zremrangebyscore(key, 0, window_start) # trim expired entries
pipe.zcard(key) # count remaining
pipe.expire(key, WINDOW_MS // 1000 + 1) # auto-cleanup
results = pipe.execute()
return results[2] <= MAX_REQUESTS
Comparison:
| Token Bucket | Sliding Window | |
|---|---|---|
| Memory per user | Fixed (2 fields) | Grows with QPS |
| Burst handling | Yes (up to capacity) | Hard cutoff at limit |
| Accuracy | Approximate | Exact |
| Suitable for | High-traffic APIs | Precise security limits |
Chapter Summary
| Use Case | Data Structure | Key Commands |
|---|---|---|
| Leaderboard | ZSET | ZINCRBY / ZREVRANGE |
| Daily check-in | Bitmap | SETBIT / BITCOUNT |
| UV counting | HyperLogLog | PFADD / PFCOUNT / PFMERGE |
| Nearby users | GEO (ZSET) | GEOADD / GEOSEARCH |
| Flash sale | String + Lua | GET / DECRBY (atomic) |
| Social graph | Set | SINTER / SDIFF |
| Message queue | Stream | XADD / XREADGROUP / XACK |
| Session | Hash | HSET / HGETALL / EXPIRE |
| Token bucket | Hash + Lua | HGET / HSET / EXPIRE |
| Sliding window | ZSET | ZADD / ZCOUNT / ZREMRANGEBYSCORE |
Core principle: Choose the right data structure, use Lua for atomicity, and load-test every high-concurrency scenario before going to production.