实战算法:限流与调度
第三十九章:实战算法 — 限流与调度
前面三十八章我们研究了几乎所有经典数据结构和算法。但在真实生产环境中,有一类算法几乎每个后端工程师都会遇到,却很少出现在传统教科书中——限流(Rate Limiting) 和 负载均衡/调度(Load Balancing / Scheduling)。
限流解决的核心问题是:当请求量超过系统承载能力时,如何有选择地拒绝部分请求以保护系统不崩溃。调度解决的核心问题是:当有多个后端服务器可以处理请求时,如何将请求分配到最合适的服务器以最大化吞吐量、最小化延迟。
这两类算法在面试中通常出现在系统设计环节,但它们背后的数据结构和算法思想完全可以用代码实现——而且在 Nginx、Redis、API Gateway 等系统中确实有精确的代码实现。本章用严格的算法视角来剖析它们。
Level 1 · 你需要知道的
39.1 令牌桶算法(Token Bucket)
问题定义
给定一个系统,允许的最大请求速率为 rate 个/秒,允许的最大突发量为 burst 个。设计一个限流器:每来一个请求判断是否放行。
算法思想
想象一个桶,桶的容量是 burst。每秒往桶里匀速添加 rate 个令牌。每个请求需要消耗一个令牌才能通过:
- 如果桶中有令牌:消耗一个,放行
- 如果桶中没有令牌:拒绝
令牌桶的巧妙之处在于:它允许短时间的突发流量(把桶中积累的令牌一次性消耗完),但长期平均速率不会超过 rate。
这个算法最早由 Turner (1986, "New Directions in Communications") 提出,后被广泛应用于网络流量整形(Traffic Shaping)。IETF RFC 2697 和 RFC 2698 定义了基于令牌桶的流量策略标准。
为什么不需要后台线程添加令牌?
如果真的启一个线程每隔 1/rate 秒往桶里放一个令牌,那是最朴素的实现。但实际生产中,我们用惰性计算:只在请求到来时计算"自上次以来应该添加了多少令牌"。
import time
class TokenBucket:
"""
令牌桶限流器
核心公式:
当前令牌数 = min(burst, 上次令牌数 + (当前时间 - 上次时间) * rate)
为什么用惰性计算?
1. 不需要后台线程(节省资源)
2. 精度更高(浮点数时间戳 vs 固定间隔)
3. 没有线程同步问题
"""
def __init__(self, rate: float, burst: int):
"""
Args:
rate: 令牌填充速率(个/秒)
burst: 桶容量(最大突发量)
"""
self.rate = rate
self.burst = burst
self.tokens = float(burst) # 初始满桶
self.last_time = time.time()
def allow(self) -> bool:
"""判断当前请求是否被允许通过"""
now = time.time()
# 计算自上次以来应该添加的令牌数
elapsed = now - self.last_time
self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
self.last_time = now
if self.tokens >= 1.0:
self.tokens -= 1.0
return True
else:
return False
def allow_n(self, n: int) -> bool:
"""判断是否允许 n 个令牌同时消耗(批量请求)"""
now = time.time()
elapsed = now - self.last_time
self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
self.last_time = now
if self.tokens >= n:
self.tokens -= n
return True
else:
return False
参数含义
| 参数 | 含义 | 典型值 |
|---|---|---|
| rate | 长期平均允许速率 | 100 req/s |
| burst | 允许的最大瞬时突发 | 200 (2秒的缓冲) |
为什么初始是满桶?
这是一个设计选择。满桶意味着系统启动后可以立即处理一波突发请求。如果初始为空桶,则系统启动后需要等待 burst/rate 秒才能达到最大处理能力——在服务重启后这可能造成不必要的拒绝。
令牌桶 vs 固定窗口计数器
最简单的限流是"每秒最多 N 个请求"(固定窗口计数器),但有一个致命问题:
时间轴: |--- 第1秒 ---|--- 第2秒 ---|
请求: ......[99个在末尾][99个在开头]......
如果限制是 100 req/s,在第 1 秒的最后 0.5 秒和第 2 秒的最前 0.5 秒各发 99 个请求,实际 1 秒内有 198 个请求通过——几乎是限制的 2 倍!
令牌桶没有"窗口边界"问题,因为它是基于连续时间的。
39.2 漏桶算法(Leaky Bucket)
算法思想
想象一个桶底有一个固定大小的洞,水(请求)以任意速率倒入桶中:
- 水以固定速率从洞中流出(处理请求),无论倒入速率多快
- 如果桶满了(队列满),新倒入的水溢出(拒绝请求)
漏桶的核心特性是输出速率严格恒定——即使输入有突发,输出也是匀速的。
import time
from collections import deque
class LeakyBucket:
"""
漏桶限流器
与令牌桶的关键区别:
- 令牌桶:允许突发(只要桶里有令牌就放行)
- 漏桶:严格匀速输出(请求被排队,按固定速率处理)
漏桶更适合需要"平滑流量"的场景,如网络包发送。
令牌桶更适合"允许突发但限制平均速率"的场景,如 API 限流。
"""
def __init__(self, rate: float, capacity: int):
"""
Args:
rate: 漏出速率(个/秒),即处理速率
capacity: 桶容量(等待队列最大长度)
"""
self.rate = rate
self.capacity = capacity
self.water = 0.0 # 当前桶中的水量
self.last_time = time.time()
def allow(self) -> bool:
"""请求是否允许进入桶(排队等待处理)"""
now = time.time()
elapsed = now - self.last_time
# 计算已经漏出的水量
leaked = elapsed * self.rate
self.water = max(0.0, self.water - leaked)
self.last_time = now
if self.water < self.capacity:
self.water += 1.0
return True # 请求进入排队
else:
return False # 桶满,拒绝
class LeakyBucketQueue:
"""
带实际队列的漏桶实现(请求被缓冲后匀速处理)
适用场景:消息队列消费者、任务调度器
"""
def __init__(self, rate: float, capacity: int):
self.rate = rate
self.capacity = capacity
self.queue = deque()
self.last_leak_time = time.time()
def enqueue(self, request) -> bool:
"""尝试将请求入队"""
if len(self.queue) >= self.capacity:
return False # 队列满,拒绝
self.queue.append(request)
return True
def dequeue(self):
"""按漏桶速率取出请求(应由消费线程调用)"""
now = time.time()
elapsed = now - self.last_leak_time
# 只有间隔足够长才允许取出下一个
if elapsed >= 1.0 / self.rate and self.queue:
self.last_leak_time = now
return self.queue.popleft()
return None
令牌桶 vs 漏桶对比
| 维度 | 令牌桶 (Token Bucket) | 漏桶 (Leaky Bucket) |
|---|---|---|
| 突发处理 | 允许突发(桶中有令牌就放行) | 不允许突发(严格匀速输出) |
| 输出速率 | 可变(突发时可高于平均速率) | 恒定(永远等于 rate) |
| 实现复杂度 | 简单(一个计数器 + 时间戳) | 中等(需要队列或计数器) |
| 适用场景 | API 限流、允许短暂突发 | 网络整形、需要平滑流量 |
| 数学模型 | 类似银行存款(积累+消费) | 类似恒速传送带 |
为什么 Nginx 用漏桶而 Redis 用令牌桶?
Nginx 的 limit_req 模块使用漏桶模型,因为 Nginx 作为反向代理,需要平滑后端服务器收到的请求速率,避免突发压垮后端。
Redis 的 redis_cell 模块(GCRA 算法)和很多 API Gateway 使用令牌桶模型,因为对于用户 API 调用,允许短暂突发是更好的体验(用户不会因为恰好在毫秒级时间窗口发了两个请求就被限流)。
39.3 滑动窗口限流
问题定义
限制"任意连续 T 秒内最多 N 个请求"。这比固定窗口更精确(没有边界问题),比令牌桶更直观(直接表达"N 次/T 秒"的语义)。
方法 1:滑动窗口日志(精确但内存大)
记录每个请求的时间戳。判断时,数一下最近 T 秒内有多少个时间戳。
import time
from collections import deque
class SlidingWindowLog:
"""
滑动窗口日志限流
原理:维护一个时间戳队列,每次请求时:
1. 移除所有 > T 秒前的时间戳
2. 如果队列长度 < N,放行并记录时间戳
3. 否则拒绝
优点:精确,没有窗口边界问题
缺点:每个用户需要存储最多 N 个时间戳,内存开销 O(N * 用户数)
"""
def __init__(self, max_requests: int, window_seconds: float):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.timestamps = deque()
def allow(self) -> bool:
now = time.time()
# 移除窗口外的旧时间戳
while self.timestamps and self.timestamps[0] <= now - self.window_seconds:
self.timestamps.popleft()
if len(self.timestamps) < self.max_requests:
self.timestamps.append(now)
return True
else:
return False
方法 2:滑动窗口计数器(近似但省内存)
将时间划分为小的子窗口(如 1 秒的窗口划分为 10 个 100ms 子窗口),每个子窗口维护一个计数器。
import time
import math
class SlidingWindowCounter:
"""
滑动窗口计数器限流(近似算法)
思路:将大窗口划分为多个小窗格(slot),用环形数组存储每个窗格的计数。
判断时,累加所有在当前窗口内的窗格计数。
精度取决于窗格数量:窗格越多越精确,但内存越大。
这是 Cloudflare 在其边缘限流中使用的算法变体。
"""
def __init__(self, max_requests: int, window_seconds: float, num_slots: int = 10):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.num_slots = num_slots
self.slot_duration = window_seconds / num_slots
self.slots = [0] * num_slots
self.slot_timestamps = [0.0] * num_slots # 每个 slot 对应的时间
self.current_slot = 0
def _get_slot_index(self, timestamp: float) -> int:
"""根据时间戳计算对应的 slot 索引"""
return int(timestamp / self.slot_duration) % self.num_slots
def allow(self) -> bool:
now = time.time()
current_idx = self._get_slot_index(now)
# 清除过期的 slot
self._clean_expired(now)
# 累加窗口内所有 slot 的计数
total = sum(self.slots)
if total < self.max_requests:
self.slots[current_idx] += 1
self.slot_timestamps[current_idx] = now
return True
else:
return False
def _clean_expired(self, now: float) -> None:
"""清除超出窗口的旧 slot"""
window_start = now - self.window_seconds
for i in range(self.num_slots):
if self.slot_timestamps[i] < window_start:
self.slots[i] = 0
方法 3:两窗口加权(工业常用近似)
这是一个极简但有效的近似方法,被广泛使用:
import time
class SlidingWindowApprox:
"""
双窗口加权近似
思路:
- 维护当前窗口和上一个窗口的计数
- 近似当前滑动窗口内的请求数 = 上一窗口计数 * 重叠比例 + 当前窗口计数
例如:窗口 = 1分钟
当前时间在当前窗口内过了 40%
近似值 = prev_count * 0.6 + curr_count * 1.0
这是 Cloudflare 博客 (2017) 中描述的生产限流算法。
误差最大约 12.5%(可证明),对大多数场景足够精确。
"""
def __init__(self, max_requests: int, window_seconds: float):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.prev_count = 0
self.curr_count = 0
self.window_start = time.time()
def allow(self) -> bool:
now = time.time()
elapsed = now - self.window_start
# 检查是否需要翻转窗口
if elapsed >= self.window_seconds:
# 可能跨了多个窗口
windows_passed = int(elapsed / self.window_seconds)
if windows_passed == 1:
self.prev_count = self.curr_count
else:
self.prev_count = 0 # 跨了太多窗口,上一窗口计数归零
self.curr_count = 0
self.window_start += windows_passed * self.window_seconds
elapsed = now - self.window_start
# 计算加权近似值
weight = 1.0 - elapsed / self.window_seconds # 上一窗口的重叠比例
approx_count = self.prev_count * weight + self.curr_count
if approx_count < self.max_requests:
self.curr_count += 1
return True
else:
return False
39.4 三种限流算法对比总结
| 维度 | 令牌桶 | 漏桶 | 滑动窗口日志 | 滑动窗口计数器 |
|---|---|---|---|---|
| 精确度 | 精确 | 精确 | 精确 | 近似 |
| 允许突发 | 是 | 否 | 是(窗口内随意分布) | 是 |
| 内存 | O(1) | O(1) 或 O(queue) | O(N * 用户数) | O(slots * 用户数) |
| 实现复杂度 | 低 | 低-中 | 低 | 中 |
| 适用场景 | API 限流 | 网络整形 | 小规模精确限流 | 大规模分布式限流 |
39.5 负载均衡算法
当有多个后端服务器时,如何选择将请求发给谁?
轮询(Round Robin)
class RoundRobin:
"""
最简单的负载均衡:按顺序循环分配
优点:实现简单,完全公平
缺点:不考虑服务器处理能力差异,不考虑当前负载
"""
def __init__(self, servers: list):
self.servers = servers
self.index = 0
def next_server(self) -> str:
server = self.servers[self.index % len(self.servers)]
self.index += 1
return server
加权轮询(Weighted Round Robin)
class WeightedRoundRobin:
"""
加权轮询:按权重分配请求
Nginx 使用的平滑加权轮询算法(Smooth Weighted Round Robin):
避免连续将请求发到同一台高权重服务器。
算法由 Nginx 开发者 phxc 提出:
1. 每个服务器有 weight(配置权重)和 current_weight(当前权重,动态变化)
2. 每次选择时:所有服务器 current_weight += weight
3. 选 current_weight 最大的服务器
4. 被选中的服务器 current_weight -= total_weight
效果:权重为 {A:5, B:1, C:1} 时,分配序列是 A,A,B,A,C,A,A 而不是 A,A,A,A,A,B,C
"""
def __init__(self, servers: list, weights: list):
self.servers = servers
self.weights = weights
self.current_weights = [0] * len(servers)
self.total_weight = sum(weights)
def next_server(self) -> str:
# 所有服务器 current_weight += weight
for i in range(len(self.servers)):
self.current_weights[i] += self.weights[i]
# 选最大的
max_idx = 0
for i in range(1, len(self.servers)):
if self.current_weights[i] > self.current_weights[max_idx]:
max_idx = i
# 被选中的减去 total_weight
self.current_weights[max_idx] -= self.total_weight
return self.servers[max_idx]
最少连接(Least Connections)
import heapq
class LeastConnections:
"""
最少连接:将请求分配给当前活跃连接数最少的服务器
适用场景:请求处理时间差异大(如有的请求 10ms,有的 10s)
轮询在这种场景下会导致慢请求堆积在某台服务器上
实现:用最小堆维护 (active_connections, server)
"""
def __init__(self, servers: list):
# (active_connections, server_index, server_name)
self.heap = [(0, i, s) for i, s in enumerate(servers)]
heapq.heapify(self.heap)
self.server_entries = {} # server -> heap entry reference
def acquire(self) -> str:
"""获取一个服务器(连接数 +1)"""
conns, idx, server = heapq.heappop(self.heap)
heapq.heappush(self.heap, (conns + 1, idx, server))
return server
def release(self, server: str) -> None:
"""释放一个连接(连接数 -1)"""
# 实际实现中需要 decrease-key 操作
# 简化版:重建堆(生产中用更高效的数据结构)
new_heap = []
for conns, idx, s in self.heap:
if s == server:
new_heap.append((max(0, conns - 1), idx, s))
else:
new_heap.append((conns, idx, s))
self.heap = new_heap
heapq.heapify(self.heap)
一致性哈希(Consistent Hashing)
import hashlib
import bisect
class ConsistentHash:
"""
一致性哈希:将 key 和服务器都映射到一个环上
由 Karger 等人 (1997, "Consistent Hashing and Random Trees") 在 MIT 提出。
核心优势:增减服务器时,只有 1/N 的 key 需要重新映射
(而普通取模哈希 hash(key) % N 在 N 变化时几乎所有 key 都要重新映射)
虚拟节点(virtual nodes):每个物理服务器映射为多个虚拟节点
解决负载不均衡问题(物理节点少时,哈希环上的分布可能很不均匀)
"""
def __init__(self, servers: list, num_replicas: int = 150):
"""
Args:
servers: 服务器列表
num_replicas: 每个服务器的虚拟节点数(越多越均匀,但查找略慢)
"""
self.num_replicas = num_replicas
self.ring = [] # 排序的哈希值列表
self.ring_map = {} # 哈希值 -> 服务器名
for server in servers:
self.add_server(server)
def _hash(self, key: str) -> int:
"""使用 MD5 生成哈希值(生产中可用 xxhash 等更快的哈希)"""
return int(hashlib.md5(key.encode()).hexdigest(), 16)
def add_server(self, server: str) -> None:
"""添加服务器(及其虚拟节点)到哈希环"""
for i in range(self.num_replicas):
virtual_key = f"{server}#{i}"
h = self._hash(virtual_key)
bisect.insort(self.ring, h)
self.ring_map[h] = server
def remove_server(self, server: str) -> None:
"""从哈希环中移除服务器"""
for i in range(self.num_replicas):
virtual_key = f"{server}#{i}"
h = self._hash(virtual_key)
self.ring.remove(h)
del self.ring_map[h]
def get_server(self, key: str) -> str:
"""根据 key 获取对应的服务器"""
if not self.ring:
return None
h = self._hash(key)
# 找到环上顺时针方向第一个 >= h 的节点
idx = bisect.bisect_left(self.ring, h)
if idx == len(self.ring):
idx = 0 # 绕回环的起点
return self.ring_map[self.ring[idx]]
为什么虚拟节点数通常是 100-200?
Karger 等人的分析表明,当虚拟节点数为 $k$ 时,负载不均衡度约为 $O(\sqrt{\log n / k})$,其中 $n$ 是 key 的数量。当 $k = 150$ 时,在大多数实际工作负载下,每台服务器的负载偏差在 5-10% 以内。更多虚拟节点意味着更均匀但查找更慢(bisect 在更长的数组上搜索),150 是常见的工程平衡点。
39.6 优先级调度
在任务调度中,不同任务有不同优先级。高优先级任务应该被优先处理。
import heapq
import time
from dataclasses import dataclass, field
@dataclass(order=True)
class Task:
priority: int # 优先级(数字越小优先级越高)
timestamp: float = field(compare=True) # 同优先级按 FIFO
name: str = field(compare=False) # 任务名(不参与比较)
payload: object = field(compare=False, default=None)
class PriorityScheduler:
"""
优先级调度器
核心数据结构:最小堆
支持:
- 提交任务(带优先级)
- 获取最高优先级任务
- 优先级提升/降低(通过标记删除 + 重新入堆)
防饥饿机制:aging(老化)
长时间等待的低优先级任务会逐渐提升优先级
"""
def __init__(self):
self.heap = []
self.counter = 0 # 用于 FIFO 打破平局
def submit(self, name: str, priority: int, payload=None) -> None:
"""提交一个任务"""
task = Task(priority=priority, timestamp=time.time(), name=name, payload=payload)
heapq.heappush(self.heap, task)
def next_task(self) -> Task:
"""获取下一个应该执行的任务"""
if self.heap:
return heapq.heappop(self.heap)
return None
def apply_aging(self, aging_seconds: float = 60.0, priority_boost: int = 1) -> None:
"""
老化机制:等待超过 aging_seconds 的任务优先级提升
为什么需要 aging?
纯优先级调度会导致低优先级任务"饥饿"(永远得不到执行)。
操作系统进程调度(如 Linux CFS)和作业调度系统都使用类似机制。
"""
now = time.time()
new_heap = []
for task in self.heap:
wait_time = now - task.timestamp
if wait_time > aging_seconds:
# 提升优先级(减小 priority 值)
boosted_priority = max(0, task.priority - priority_boost)
new_task = Task(priority=boosted_priority, timestamp=task.timestamp,
name=task.name, payload=task.payload)
new_heap.append(new_task)
else:
new_heap.append(task)
self.heap = new_heap
heapq.heapify(self.heap)
Level 2 · 它是怎么运行的
39.7 令牌桶的数学模型
连续时间模型
设 $b(t)$ 为时刻 $t$ 桶中的令牌数,$r$ 为填充速率,$B$ 为桶容量。
$$b(t) = \min\left(B, , b(t_0) + r \cdot (t - t_0)\right)$$
其中 $t_0$ 是上一次消耗令牌的时刻。
突发持续时间
如果桶是满的($b = B$),以最大速率 $R_{max}$(瞬时无限速率,即尽可能快地发请求)消耗令牌,突发可以持续多久?
设突发速率为 $R_{burst}$($R_{burst} > r$),则突发持续时间:
$$T_{burst} = \frac{B}{R_{burst} - r}$$
因为在突发期间,每秒净消耗 $R_{burst} - r$ 个令牌(消耗 $R_{burst}$ 但同时以 $r$ 补充)。
例如:$r = 100$ req/s, $B = 500$, $R_{burst} = 600$ req/s $$T_{burst} = \frac{500}{600 - 100} = 1 \text{ 秒}$$
在这 1 秒内可以发 600 个请求(而不是只有 100 个),之后回到 100 req/s 的稳态速率。
与排队论的联系
令牌桶限流器可以建模为一个 M/D/1/K 排队系统(泊松到达、确定性服务、单服务台、有限队列长度 K = burst)。当请求到达率 $\lambda > r$ 时,丢弃概率为:
$$P_{drop} \approx 1 - \frac{r}{\lambda} \quad (\text{稳态近似,当 } \lambda \gg r)$$
39.8 GCRA 算法(通用信元速率算法)
GCRA(Generic Cell Rate Algorithm) 是 ATM 网络标准中定义的限流算法,等价于令牌桶但用不同的数学表述。Redis 的 redis_cell 模块实现的就是 GCRA。
核心思想:维护一个"理论到达时间"(TAT, Theoretical Arrival Time)。每个请求的 TAT 是它"应该到达的最早时间"。如果实际到达时间早于 TAT,说明请求太频繁,被限流。
import time
class GCRA:
"""
GCRA (Generic Cell Rate Algorithm)
等价于令牌桶,但用"时间"而非"令牌数"表述。
核心变量:TAT (Theoretical Arrival Time)
- 如果 now >= TAT:允许,TAT = now + emission_interval
- 如果 now < TAT 但 TAT - now <= burst_tolerance:允许,TAT = TAT + emission_interval
- 如果 TAT - now > burst_tolerance:拒绝
emission_interval = 1 / rate(两次请求之间的最小间隔)
burst_tolerance = burst * emission_interval(允许的最大提前量)
由 ITU-T I.371 标准定义(1993)。
"""
def __init__(self, rate: float, burst: int):
self.emission_interval = 1.0 / rate # 两次请求的理想间隔
self.burst_tolerance = burst * self.emission_interval # 最大容忍的"提前量"
self.tat = 0.0 # Theoretical Arrival Time
def allow(self) -> bool:
now = time.time()
# 新 TAT = max(TAT, now) + emission_interval
new_tat = max(self.tat, now) + self.emission_interval
# 如果新 TAT 超过 now + burst_tolerance + emission_interval,拒绝
if new_tat - now > self.burst_tolerance + self.emission_interval:
return False
self.tat = new_tat
return True
GCRA 的优势:只需要一个变量 TAT(一个时间戳),内存极小。Redis 的 CL.THROTTLE 命令就是基于 GCRA,每个 key 只存一个时间戳。
39.9 一致性哈希的负载均衡证明
定理(Karger et al., 1997): 在有 $n$ 台服务器、$m$ 个 key 的一致性哈希中:
- 每台服务器的期望负载为 $m/n$
- 使用 $k$ 个虚拟节点时,每台服务器负载在 $[m/n \cdot (1-\epsilon), m/n \cdot (1+\epsilon)]$ 范围内的概率至少为 $1 - 2/n^2$,其中 $\epsilon = O(\sqrt{\log n / k})$
增减节点时的 key 迁移量
添加一个新节点时,期望只有 $m/(n+1)$ 个 key 需要从其他节点迁移到新节点。这比取模哈希($m \cdot n/(n+1)$ 个 key 需要迁移)好 $n$ 倍。
虚拟节点与一致性的 trade-off
更多虚拟节点 → 更均匀的负载 → 但增减节点时迁移涉及更多的"碎片"(从多个不同节点各迁移少量 key)。
实践中的选择:
- Amazon DynamoDB: 每个节点 150-200 虚拟节点
- Cassandra: 每个节点 256 虚拟节点(默认
num_tokens) - Redis Cluster: 不用虚拟节点,直接把 16384 个 slot 分配给节点(本质是粗粒度的虚拟节点)
39.10 平滑加权轮询的数学证明
定理: Nginx 的平滑加权轮询算法,在一个完整的权重周期内($\sum w_i$ 次选择),每个服务器 $i$ 被选中恰好 $w_i$ 次。
证明思路:
设总权重 $W = \sum_{i=1}^{n} w_i$。每一轮选择:
- 所有服务器 $current_weight_i += w_i$
- 选最大的 $current_weight_{max}$
- 被选中的 $current_weight_{max} -= W$
一个完整周期($W$ 次选择)后,每个服务器的 current_weight 净变化为:$w_i \times W - w_i \times W = 0$(加了 $W$ 次 $w_i$,被选中 $w_i$ 次各减 $W$)。这意味着系统回到初始状态,且每个服务器恰好被选 $w_i$ 次。
"平滑性"的保证
关键性质:在权重序列中,不会出现高权重服务器被连续选择的情况。例如权重 {A:5, B:1, C:1} 的选择序列是 A B A C A A A 而不是 A A A A A B C。
这个性质由算法中"选最大的后减去总权重"保证:一旦某服务器被选中,它的 current_weight 立刻下降 $W$ 的量,在随后的几轮中不太可能再次成为最大。
39.11 负载均衡算法的延迟分析
用排队论模型分析不同负载均衡算法的性能差异:
假设: N 台服务器,每台处理速率 $\mu$,总到达率 $\lambda < N\mu$。
Round Robin 的平均响应时间:
如果所有请求处理时间相同(确定性服务),Round Robin 是最优的——它等价于一个 M/D/1 队列(到达率 $\lambda/N$,服务率 $\mu$)。
如果处理时间有方差(指数分布),Round Robin 退化为 N 个独立的 M/M/1 队列,平均响应时间:
$$E[T_{RR}] = \frac{1}{\mu - \lambda/N}$$
Join-Shortest-Queue(JSQ,最短队列)的优势:
当处理时间有方差时,JSQ 可以达到接近单个 M/M/N 队列的性能:
$$E[T_{JSQ}] \approx \frac{1}{\mu} + \frac{\rho^N}{N\mu(1-\rho)} \quad (\text{其中 } \rho = \lambda/(N\mu))$$
JSQ 在高负载下比 Round Robin 好一个数量级——因为它利用了"当前队列长度"的信息做出更智能的决策。但 JSQ 在分布式环境中需要实时获取所有服务器的队列长度,通信开销大。
Power of Two Choices(两次随机选择):
Mitzenmacher (2001, "The Power of Two Choices in Randomized Load Balancing") 证明了一个惊人的结论:从 N 台服务器中随机选 2 台,然后选其中负载更小的那台——这个简单策略把最大队列长度从 $O(\log N / \log\log N)$(纯随机)降低到 $O(\log\log N)$(两次选择),是指数级的改善!
import random
class PowerOfTwoChoices:
"""
两次随机选择负载均衡
随机选两台服务器,选负载更轻的那台。
数学上已证明这比纯随机好指数级(最大队列长度 O(log log N))。
这是 Nginx 的 "random two" 策略的理论基础。
"""
def __init__(self, servers: list):
self.servers = servers
self.loads = {s: 0 for s in servers}
def acquire(self) -> str:
# 随机选两台
s1, s2 = random.sample(self.servers, 2)
# 选负载小的
chosen = s1 if self.loads[s1] <= self.loads[s2] else s2
self.loads[chosen] += 1
return chosen
def release(self, server: str) -> None:
self.loads[server] = max(0, self.loads[server] - 1)
Level 3 · 规范怎么定义的
39.12 Nginx 中的限流实现
Nginx 的 ngx_http_limit_req_module 模块实现了漏桶算法。核心配置:
# 定义限流区(共享内存)
limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;
# 应用限流
location /api/ {
limit_req zone=api burst=20 nodelay;
}
参数解析:
rate=10r/s: 漏桶的漏出速率,每秒 10 个请求burst=20: 桶容量,最多排队 20 个请求nodelay: 桶中的请求不排队等待,直接转发(相当于令牌桶行为)$binary_remote_addr: 按客户端 IP 限流(每个 IP 有独立的桶)
内部实现细节(Nginx 源码分析):
Nginx 使用红黑树(per-zone)存储每个 key 的限流状态(last access time + excess count),使用共享内存(ngx_slab)在 worker 进程间共享。
// 简化的核心逻辑 (ngx_http_limit_req_module.c)
// excess 表示"桶中超出的请求数"(惰性计算)
excess = lr->excess - rate * ms_since_last / 1000;
if (excess < 0) excess = 0;
if (excess + 1000 <= burst * 1000) {
// 允许:更新 excess
lr->excess = excess + 1000;
lr->last = now;
return NGX_OK;
} else {
// 拒绝:返回 503
return NGX_HTTP_SERVICE_UNAVAILABLE;
}
Nginx 用整数运算(乘以 1000)避免浮点数,这在高频调用路径中很重要。
nodelay 的含义
不加 nodelay 时:超出 rate 但在 burst 内的请求会被延迟处理(排队等待),直到漏桶漏出足够的空间。这是纯漏桶行为。
加了 nodelay 后:只要 burst 桶没满,所有请求立即转发——但 burst 的恢复仍然按 rate 速率。效果等价于令牌桶:允许突发,但长期平均不超过 rate。
39.13 Redis 中的限流实现
方案 1:固定窗口(最简单)
-- 固定窗口限流(Lua 脚本,保证原子性)
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local current = tonumber(redis.call('GET', key) or "0")
if current >= limit then
return 0 -- 拒绝
end
redis.call('INCR', key)
if current == 0 then
redis.call('EXPIRE', key, window)
end
return 1 -- 放行
方案 2:滑动窗口(Sorted Set)
-- 滑动窗口限流(使用有序集合)
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
-- 移除窗口外的旧记录
redis.call('ZREMRANGEBYSCORE', key, 0, now - window * 1000)
-- 计数
local count = redis.call('ZCARD', key)
if count >= limit then
return 0 -- 拒绝
end
-- 添加当前请求
redis.call('ZADD', key, now, now .. math.random())
redis.call('PEXPIRE', key, window * 1000)
return 1 -- 放行
方案 3:redis_cell 模块(GCRA)
-- CL.THROTTLE 命令(redis_cell 模块)
-- CL.THROTTLE key burst rate period [quantity]
-- 例如:每 60 秒最多 30 次请求,允许突发 10 个
CL.THROTTLE user:123 10 30 60 1
-- 返回值:
-- 1) 0 (allowed) 或 1 (denied)
-- 2) 总容量 (burst + 1)
-- 3) 剩余容量
-- 4) 如果被拒绝,多少秒后重试
-- 5) 多少秒后桶完全恢复
redis_cell 的优势:一个命令完成限流判断 + 状态更新,原子性保证;返回丰富的元信息(剩余额度、重试时间),方便客户端实现退避策略。
39.14 API Gateway 中的限流分层
生产系统中的限流通常是多层的:
用户请求 → CDN 层限流 → API Gateway 限流 → 服务层限流 → 数据库连接池限流
↓ ↓ ↓ ↓
DDoS 防护 用户级/API级 服务级保护 资源保护
(L3/L4 层) (L7 层) (熔断器) (信号量)
每层的职责不同:
| 层 | 限流粒度 | 算法选择 | 典型实现 |
|---|---|---|---|
| CDN/边缘 | IP / 地理位置 | 固定窗口 + 黑名单 | Cloudflare, AWS WAF |
| API Gateway | 用户 / API Key / 路由 | 令牌桶 / 滑动窗口 | Kong, Envoy, Nginx |
| 服务间 | 服务标识 / 方法 | 自适应限流 | Sentinel, Hystrix |
| 数据库 | 连接数 / 查询复杂度 | 信号量 / 排队 | Connection Pool, Query Queue |
39.15 Google SRE 的限流实践
Google SRE(Site Reliability Engineering)在《SRE Book》(Beyer et al., 2016, O'Reilly) 中描述了他们的限流哲学:
核心原则:客户端自适应限流(Client-Side Adaptive Throttling)
传统限流是服务端拒绝过多请求。Google 的方法是让客户端主动减少请求:
import random
class AdaptiveThrottle:
"""
Google SRE 的客户端自适应限流
思路:客户端跟踪最近的请求成功率。
如果服务端开始拒绝请求,客户端主动降低发送速率。
核心公式:
rejection_probability = max(0, (requests - K * accepts) / (requests + 1))
其中:
- requests: 最近窗口内的总请求数
- accepts: 最近窗口内被服务端接受的请求数
- K: 倍率因子(通常 K=2),表示"只要成功率 > 1/K,就不限流"
K=2 意味着:只要超过 50% 的请求成功,客户端就不限流。
当成功率降低到 50% 以下时,客户端开始按概率丢弃请求。
优势:
1. 无需服务端配置限流阈值
2. 自动适应后端容量变化
3. 优雅降级(不是突然从 100% 变到 0%)
"""
def __init__(self, K: float = 2.0, window_size: int = 120):
self.K = K
self.requests = 0
self.accepts = 0
def should_send(self) -> bool:
"""客户端判断是否应该发送请求"""
rejection_prob = max(0.0, (self.requests - self.K * self.accepts) / (self.requests + 1))
if random.random() < rejection_prob:
return False # 主动丢弃
return True
def record_request(self) -> None:
"""记录发送了一个请求"""
self.requests += 1
def record_accept(self) -> None:
"""记录请求被服务端接受"""
self.accepts += 1
def record_reject(self) -> None:
"""记录请求被服务端拒绝"""
pass # requests 已经在 record_request 中 +1
def decay(self, factor: float = 0.5) -> None:
"""定期衰减计数器(实现滑动窗口效果)"""
self.requests = int(self.requests * factor)
self.accepts = int(self.accepts * factor)
为什么 K=2?
- K=1: 一旦有任何请求被拒绝就开始限流——太敏感
- K=2: 允许最多 50% 的请求失败才开始限流——平衡灵敏度和稳定性
- K=3: 允许 67% 失败才限流——太迟钝
Google 在生产中使用 K=2,因为它在"快速反应"和"避免误触发"之间找到了好的平衡点。
与传统限流的对比
| 维度 | 传统服务端限流 | Google 客户端自适应限流 |
|---|---|---|
| 配置 | 需要预设阈值 | 无需配置(自适应) |
| 适应性 | 静态阈值,扩缩容需更新 | 自动跟随后端容量 |
| 延迟 | 请求到达服务端才被拒绝 | 客户端直接丢弃,节省网络往返 |
| 级联故障 | 上游仍会发大量请求 | 上游主动减压 |
| 复杂度 | 服务端实现 | 需要客户端库支持 |
39.16 限流算法在微服务中的组合使用
实际生产系统通常组合多种限流策略:
class CompositeRateLimiter:
"""
组合限流器:多维度限流
示例:一个 API 同时有以下限制:
- 全局:10000 req/s(保护整个服务)
- 每用户:100 req/s(公平性)
- 每用户每接口:10 req/s(防止滥用单个接口)
- 每 IP:1000 req/s(防 DDoS)
请求必须通过所有维度的检查才能放行。
"""
def __init__(self):
self.global_limiter = TokenBucket(rate=10000, burst=15000)
self.user_limiters = {} # user_id -> TokenBucket
self.ip_limiters = {} # ip -> TokenBucket
def allow(self, user_id: str, ip: str, endpoint: str) -> bool:
# 全局检查
if not self.global_limiter.allow():
return False
# 用户级检查
if user_id not in self.user_limiters:
self.user_limiters[user_id] = TokenBucket(rate=100, burst=200)
if not self.user_limiters[user_id].allow():
return False
# IP 级检查
if ip not in self.ip_limiters:
self.ip_limiters[ip] = TokenBucket(rate=1000, burst=2000)
if not self.ip_limiters[ip].allow():
return False
return True
39.17 分布式限流的挑战
问题: 多台 API Gateway 实例各自维护独立的限流计数器。用户限制 100 req/s,有 5 台 Gateway 实例,用户实际可以发 500 req/s。
方案 1:集中式(Redis)
所有实例共享一个 Redis 计数器。精确但每个请求增加一次 Redis 往返延迟(~0.5-1ms)。
方案 2:分片配额
每台 Gateway 分到 100/5 = 20 req/s 的配额。简单但不均匀(某些 Gateway 收到更多来自某用户的请求)。
方案 3:本地限流 + 定期同步
每台 Gateway 本地限流(粗略配额),每隔 1-5 秒向中心同步实际使用量并调整配额。
class DistributedRateLimiter:
"""
分布式限流:本地桶 + 定期 Redis 同步
平衡精确性和延迟:
- 本地判断:0 延迟
- 定期同步:每秒一次 Redis 交互,调整本地配额
误差:最多偏差 rate * sync_interval * num_instances
例如:100 req/s * 1s * 5 instances = 最多多放 500 请求
对于大多数场景可接受(限流不需要绝对精确)
"""
def __init__(self, total_rate: float, num_instances: int, sync_interval: float = 1.0):
# 初始均分配额
self.local_rate = total_rate / num_instances
self.local_bucket = TokenBucket(rate=self.local_rate, burst=int(self.local_rate * 2))
self.sync_interval = sync_interval
self.local_count = 0 # 上次同步以来的本地通过量
def allow(self) -> bool:
if self.local_bucket.allow():
self.local_count += 1
return True
return False
def sync_with_redis(self, redis_client, key: str, total_limit: int) -> None:
"""定期调用:与 Redis 同步并调整本地配额"""
# 上报本地使用量
# global_used = redis_client.incrby(key, self.local_count)
# 根据全局使用情况调整本地配额
# ...
self.local_count = 0
Envoy 的本地限流 + 全局限流设计:
Envoy proxy 有两层限流:
- Local Rate Limit Filter: 每个 Envoy 实例独立限流,无外部依赖
- Global Rate Limit Service: 调用外部 gRPC 服务(如
ratelimit项目)做全局判断
请求先过本地限流(快速拒绝明显超限的),再过全局限流(精确判断)。这种分层设计在延迟和精确性之间取得了很好的平衡。
Level 4 · 边界与陷阱
39.18 面试中系统设计题的限流部分怎么回答
典型面试场景: "设计一个短链接服务/聊天系统/支付系统——如何处理流量控制?"
回答框架(5 步):
第一步:明确限流目标
- 保护谁?(后端服务、数据库、第三方 API)
- 限制什么?(请求总量、每用户请求量、每IP请求量)
- 限制到什么程度?(根据系统容量估算)
第二步:选择限流算法
- 对用户友好(允许突发):令牌桶
- 需要平滑流量:漏桶
- 需要精确的"N次/T秒"语义:滑动窗口
第三步:确定限流位置
客户端 → CDN → Load Balancer → API Gateway → Service → Database
L3/L4 L4/L7 L7 用户级 服务级 连接池级
越靠前拒绝,浪费的资源越少。但靠前的层没有用户身份信息。
第四步:处理限流响应
- HTTP 429 Too Many Requests
Retry-Afterheader(告诉客户端多久后重试)X-RateLimit-Limit、X-RateLimit-Remaining、X-RateLimit-Reset(告诉客户端当前配额状态)
第五步:讨论分布式限流
- 单点 vs 分布式(Redis vs 本地)
- 精确性 vs 延迟的 trade-off
- 限流的"柔性":不是突然 100% 拒绝,而是渐进式降级
39.19 限流的反模式
反模式 1:限流粒度太粗
只做全局限流(如"全服务 10000 QPS"),不做用户级限流。后果:一个恶意用户耗尽全部配额,正常用户全部受影响。
反模式 2:限流不返回有意义的错误
返回 500 Internal Server Error 而不是 429 Too Many Requests。后果:客户端无法区分"服务故障"和"被限流",无法做针对性的退避重试。
反模式 3:限流配置写死在代码中
# 反模式
RATE_LIMIT = 100 # 写死
# 正确做法
RATE_LIMIT = config.get("rate_limit", default=100) # 可动态调整
后果:无法快速响应流量变化(需要发布代码才能改限流阈值)。
反模式 4:被限流的请求没有指数退避重试
客户端被限流后立即无限重试,造成"重试风暴"——被限流的请求越多,重试产生的额外请求越多,恶性循环。
正确做法:
import random
import time
def request_with_backoff(url, max_retries=5):
"""带指数退避的重试"""
for attempt in range(max_retries):
response = send_request(url)
if response.status_code == 429:
# 指数退避 + 抖动(jitter)
wait = min(2 ** attempt + random.uniform(0, 1), 30)
time.sleep(wait)
else:
return response
raise Exception("Max retries exceeded")
为什么要加 jitter(随机抖动)?如果 100 个客户端同时被限流,不加 jitter 它们会在完全相同的时间重试(如 1s 后全部重试),造成"惊群效应"。加 jitter 让重试时间分散。
39.20 限流与熔断的区别
| 维度 | 限流 (Rate Limiting) | 熔断 (Circuit Breaking) |
|---|---|---|
| 触发条件 | 请求速率超过阈值 | 下游错误率超过阈值 |
| 保护对象 | 自身 | 下游服务 |
| 行为 | 拒绝超出的请求 | 暂时切断所有到下游的请求 |
| 恢复方式 | 速率降下来自动恢复 | 半开状态(尝试放行少量请求测试) |
| 算法 | 令牌桶/漏桶/滑动窗口 | 状态机(关闭→打开→半开) |
两者经常配合使用:限流保护自己不被上游压垮,熔断保护下游不被自己压垮。
import time
from enum import Enum
class CircuitState(Enum):
CLOSED = "closed" # 正常状态,请求正常转发
OPEN = "open" # 熔断状态,所有请求直接失败
HALF_OPEN = "half_open" # 半开状态,放行少量请求测试
class CircuitBreaker:
"""
熔断器
状态转换:
CLOSED → OPEN: 错误率超过阈值(如 50%)
OPEN → HALF_OPEN: 经过 recovery_timeout 后
HALF_OPEN → CLOSED: 测试请求成功
HALF_OPEN → OPEN: 测试请求失败
Netflix Hystrix (2012) 开创了这个模式,现在已被
Resilience4j、Sentinel 等框架广泛采用。
"""
def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30.0):
self.state = CircuitState.CLOSED
self.failure_count = 0
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.last_failure_time = 0
def allow(self) -> bool:
if self.state == CircuitState.CLOSED:
return True
elif self.state == CircuitState.OPEN:
# 检查是否到了恢复时间
if time.time() - self.last_failure_time >= self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
return True # 放行一个测试请求
return False
else: # HALF_OPEN
return True # 放行测试请求
def record_success(self) -> None:
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.CLOSED
self.failure_count = 0
def record_failure(self) -> None:
self.failure_count += 1
self.last_failure_time = time.time()
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.OPEN
elif self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
39.21 实战案例:设计一个多维限流系统
需求: 为一个 SaaS API 平台设计限流,支持:
- 免费用户 10 req/min,付费用户 1000 req/min
- 全局限流 50000 req/s
- 单 IP 限流 100 req/s
- 特定接口(如 /upload)额外限制 5 req/min
- 超限后返回明确的错误信息和重试建议
from dataclasses import dataclass
from typing import Optional
import time
@dataclass
class RateLimitResult:
allowed: bool
limit: int
remaining: int
reset_at: float
retry_after: Optional[float] = None
def to_headers(self) -> dict:
"""转换为 HTTP 响应头"""
headers = {
'X-RateLimit-Limit': str(self.limit),
'X-RateLimit-Remaining': str(max(0, self.remaining)),
'X-RateLimit-Reset': str(int(self.reset_at)),
}
if self.retry_after is not None:
headers['Retry-After'] = str(int(self.retry_after))
return headers
class MultiDimensionRateLimiter:
"""
多维限流器
设计思路:
1. 从最严格(最容易触发)到最宽松的顺序检查
2. 一旦某层被限制,立即返回(短路评估)
3. 只有所有层都通过才放行
4. 返回最紧张那层的信息(帮助用户了解瓶颈)
"""
def __init__(self):
self.global_bucket = TokenBucket(rate=50000, burst=75000)
self.ip_buckets = {}
self.user_buckets = {}
self.endpoint_buckets = {}
def check(self, user_id: str, user_tier: str, ip: str, endpoint: str) -> RateLimitResult:
now = time.time()
# 层 1:全局
if not self.global_bucket.allow():
return RateLimitResult(
allowed=False, limit=50000, remaining=0,
reset_at=now + 1, retry_after=1.0
)
# 层 2:IP 级
if ip not in self.ip_buckets:
self.ip_buckets[ip] = TokenBucket(rate=100, burst=200)
if not self.ip_buckets[ip].allow():
return RateLimitResult(
allowed=False, limit=100, remaining=0,
reset_at=now + 1, retry_after=1.0
)
# 层 3:用户级(根据套餐)
user_rate = 1000 if user_tier == 'paid' else 10
user_burst = user_rate * 2
if user_id not in self.user_buckets:
self.user_buckets[user_id] = TokenBucket(rate=user_rate / 60, burst=user_burst // 60)
if not self.user_buckets[user_id].allow():
return RateLimitResult(
allowed=False, limit=user_rate, remaining=0,
reset_at=now + 60, retry_after=60.0 / user_rate
)
# 层 4:特定接口
if endpoint == '/upload':
ep_key = f"{user_id}:{endpoint}"
if ep_key not in self.endpoint_buckets:
self.endpoint_buckets[ep_key] = TokenBucket(rate=5.0/60, burst=5)
if not self.endpoint_buckets[ep_key].allow():
return RateLimitResult(
allowed=False, limit=5, remaining=0,
reset_at=now + 60, retry_after=12.0
)
return RateLimitResult(
allowed=True, limit=user_rate, remaining=user_rate - 1,
reset_at=now + 60
)
39.22 调度算法在操作系统中的真实应用
Linux 完全公平调度器(CFS, Completely Fair Scheduler)
CFS 由 Ingo Molnar (2007) 开发,用红黑树维护所有可运行进程,按"虚拟运行时间"(vruntime)排序。每次调度选择 vruntime 最小的进程运行。
核心数据结构:红黑树(rb_tree),以 vruntime 为 key。
class SimpleCFS:
"""
CFS 调度器简化模型
核心思想:让每个进程获得公平的 CPU 时间。
"公平"的定义:所有进程的 vruntime 尽量相等。
vruntime 增长速率与权重成反比:
高优先级进程的 vruntime 增长慢 → 被调度的机会更多
低优先级进程的 vruntime 增长快 → 被调度的机会更少
"""
def __init__(self):
import sortedcontainers
self.tree = sortedcontainers.SortedList(key=lambda p: p['vruntime'])
def add_process(self, pid: int, nice: int = 0):
"""添加进程(nice 值越低优先级越高)"""
weight = self._nice_to_weight(nice)
process = {
'pid': pid,
'vruntime': self._min_vruntime(), # 新进程从当前最小 vruntime 开始
'weight': weight,
'nice': nice
}
self.tree.add(process)
def schedule(self) -> dict:
"""选择下一个运行的进程"""
if not self.tree:
return None
# 选 vruntime 最小的(红黑树最左节点,O(1))
return self.tree[0]
def tick(self, running_process: dict, elapsed_ns: int):
"""时钟中断:更新运行进程的 vruntime"""
self.tree.remove(running_process)
# vruntime 增量与权重成反比
delta_vruntime = elapsed_ns * 1024 / running_process['weight']
running_process['vruntime'] += delta_vruntime
self.tree.add(running_process)
def _min_vruntime(self) -> float:
if self.tree:
return self.tree[0]['vruntime']
return 0.0
def _nice_to_weight(self, nice: int) -> int:
"""nice 值到权重的映射(简化版)"""
# nice 每差 1,权重差约 25%(Linux 内核精确表)
return max(1, int(1024 * (1.25 ** (-nice))))
CFS 之所以用红黑树而不是堆:因为需要 O(log n) 的插入和删除(进程阻塞/唤醒时从树中移除/加入),而堆只能 O(1) 获取最小值但删除任意元素需要 O(n) 查找。
本章小结
限流和调度是数据结构与算法在生产环境中最直接的应用。它们不是"面试造飞机"——每个后端服务背后都有限流器保护,每个操作系统内核都有调度器在运行。
核心要点:
- 令牌桶 = 允许突发的限流,一个计数器 + 惰性计算
- 漏桶 = 严格匀速的限流,本质是固定速率队列
- 滑动窗口 = 精确的"N次/T秒"语义,日志法精确但耗内存
- 一致性哈希 = 增减节点时最小化 key 迁移,虚拟节点保证均匀
- Power of Two Choices = 简单但效果指数级优于纯随机
工业实践的核心教训:没有完美的限流算法,只有适合场景的组合策略。 在延迟、精确性、内存、实现复杂度之间做 trade-off,是系统设计的本质。