第 39 章

实战算法:限流与调度

第三十九章:实战算法 — 限流与调度

前面三十八章我们研究了几乎所有经典数据结构和算法。但在真实生产环境中,有一类算法几乎每个后端工程师都会遇到,却很少出现在传统教科书中——限流(Rate Limiting)负载均衡/调度(Load Balancing / Scheduling)

限流解决的核心问题是:当请求量超过系统承载能力时,如何有选择地拒绝部分请求以保护系统不崩溃。调度解决的核心问题是:当有多个后端服务器可以处理请求时,如何将请求分配到最合适的服务器以最大化吞吐量、最小化延迟。

这两类算法在面试中通常出现在系统设计环节,但它们背后的数据结构和算法思想完全可以用代码实现——而且在 Nginx、Redis、API Gateway 等系统中确实有精确的代码实现。本章用严格的算法视角来剖析它们。


Level 1 · 你需要知道的

39.1 令牌桶算法(Token Bucket)

问题定义

给定一个系统,允许的最大请求速率为 rate 个/秒,允许的最大突发量为 burst 个。设计一个限流器:每来一个请求判断是否放行。

算法思想

想象一个桶,桶的容量是 burst。每秒往桶里匀速添加 rate 个令牌。每个请求需要消耗一个令牌才能通过:

令牌桶的巧妙之处在于:它允许短时间的突发流量(把桶中积累的令牌一次性消耗完),但长期平均速率不会超过 rate

这个算法最早由 Turner (1986, "New Directions in Communications") 提出,后被广泛应用于网络流量整形(Traffic Shaping)。IETF RFC 2697 和 RFC 2698 定义了基于令牌桶的流量策略标准。

为什么不需要后台线程添加令牌?

如果真的启一个线程每隔 1/rate 秒往桶里放一个令牌,那是最朴素的实现。但实际生产中,我们用惰性计算:只在请求到来时计算"自上次以来应该添加了多少令牌"。

import time


class TokenBucket:
    """
    令牌桶限流器
    
    核心公式:
    当前令牌数 = min(burst, 上次令牌数 + (当前时间 - 上次时间) * rate)
    
    为什么用惰性计算?
    1. 不需要后台线程(节省资源)
    2. 精度更高(浮点数时间戳 vs 固定间隔)
    3. 没有线程同步问题
    """
    
    def __init__(self, rate: float, burst: int):
        """
        Args:
            rate: 令牌填充速率(个/秒)
            burst: 桶容量(最大突发量)
        """
        self.rate = rate
        self.burst = burst
        self.tokens = float(burst)  # 初始满桶
        self.last_time = time.time()
    
    def allow(self) -> bool:
        """判断当前请求是否被允许通过"""
        now = time.time()
        
        # 计算自上次以来应该添加的令牌数
        elapsed = now - self.last_time
        self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
        self.last_time = now
        
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        else:
            return False
    
    def allow_n(self, n: int) -> bool:
        """判断是否允许 n 个令牌同时消耗(批量请求)"""
        now = time.time()
        elapsed = now - self.last_time
        self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
        self.last_time = now
        
        if self.tokens >= n:
            self.tokens -= n
            return True
        else:
            return False

参数含义

参数 含义 典型值
rate 长期平均允许速率 100 req/s
burst 允许的最大瞬时突发 200 (2秒的缓冲)

为什么初始是满桶?

这是一个设计选择。满桶意味着系统启动后可以立即处理一波突发请求。如果初始为空桶,则系统启动后需要等待 burst/rate 秒才能达到最大处理能力——在服务重启后这可能造成不必要的拒绝。

令牌桶 vs 固定窗口计数器

最简单的限流是"每秒最多 N 个请求"(固定窗口计数器),但有一个致命问题:

时间轴: |--- 第1秒 ---|--- 第2秒 ---|
请求:   ......[99个在末尾][99个在开头]......

如果限制是 100 req/s,在第 1 秒的最后 0.5 秒和第 2 秒的最前 0.5 秒各发 99 个请求,实际 1 秒内有 198 个请求通过——几乎是限制的 2 倍!

令牌桶没有"窗口边界"问题,因为它是基于连续时间的。

39.2 漏桶算法(Leaky Bucket)

算法思想

想象一个桶底有一个固定大小的洞,水(请求)以任意速率倒入桶中:

漏桶的核心特性是输出速率严格恒定——即使输入有突发,输出也是匀速的。

import time
from collections import deque


class LeakyBucket:
    """
    漏桶限流器
    
    与令牌桶的关键区别:
    - 令牌桶:允许突发(只要桶里有令牌就放行)
    - 漏桶:严格匀速输出(请求被排队,按固定速率处理)
    
    漏桶更适合需要"平滑流量"的场景,如网络包发送。
    令牌桶更适合"允许突发但限制平均速率"的场景,如 API 限流。
    """
    
    def __init__(self, rate: float, capacity: int):
        """
        Args:
            rate: 漏出速率(个/秒),即处理速率
            capacity: 桶容量(等待队列最大长度)
        """
        self.rate = rate
        self.capacity = capacity
        self.water = 0.0  # 当前桶中的水量
        self.last_time = time.time()
    
    def allow(self) -> bool:
        """请求是否允许进入桶(排队等待处理)"""
        now = time.time()
        elapsed = now - self.last_time
        
        # 计算已经漏出的水量
        leaked = elapsed * self.rate
        self.water = max(0.0, self.water - leaked)
        self.last_time = now
        
        if self.water < self.capacity:
            self.water += 1.0
            return True  # 请求进入排队
        else:
            return False  # 桶满,拒绝


class LeakyBucketQueue:
    """
    带实际队列的漏桶实现(请求被缓冲后匀速处理)
    
    适用场景:消息队列消费者、任务调度器
    """
    
    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.queue = deque()
        self.last_leak_time = time.time()
    
    def enqueue(self, request) -> bool:
        """尝试将请求入队"""
        if len(self.queue) >= self.capacity:
            return False  # 队列满,拒绝
        self.queue.append(request)
        return True
    
    def dequeue(self):
        """按漏桶速率取出请求(应由消费线程调用)"""
        now = time.time()
        elapsed = now - self.last_leak_time
        
        # 只有间隔足够长才允许取出下一个
        if elapsed >= 1.0 / self.rate and self.queue:
            self.last_leak_time = now
            return self.queue.popleft()
        return None

令牌桶 vs 漏桶对比

维度 令牌桶 (Token Bucket) 漏桶 (Leaky Bucket)
突发处理 允许突发(桶中有令牌就放行) 不允许突发(严格匀速输出)
输出速率 可变(突发时可高于平均速率) 恒定(永远等于 rate)
实现复杂度 简单(一个计数器 + 时间戳) 中等(需要队列或计数器)
适用场景 API 限流、允许短暂突发 网络整形、需要平滑流量
数学模型 类似银行存款(积累+消费) 类似恒速传送带

为什么 Nginx 用漏桶而 Redis 用令牌桶?

Nginx 的 limit_req 模块使用漏桶模型,因为 Nginx 作为反向代理,需要平滑后端服务器收到的请求速率,避免突发压垮后端。

Redis 的 redis_cell 模块(GCRA 算法)和很多 API Gateway 使用令牌桶模型,因为对于用户 API 调用,允许短暂突发是更好的体验(用户不会因为恰好在毫秒级时间窗口发了两个请求就被限流)。

39.3 滑动窗口限流

问题定义

限制"任意连续 T 秒内最多 N 个请求"。这比固定窗口更精确(没有边界问题),比令牌桶更直观(直接表达"N 次/T 秒"的语义)。

方法 1:滑动窗口日志(精确但内存大)

记录每个请求的时间戳。判断时,数一下最近 T 秒内有多少个时间戳。

import time
from collections import deque


class SlidingWindowLog:
    """
    滑动窗口日志限流
    
    原理:维护一个时间戳队列,每次请求时:
    1. 移除所有 > T 秒前的时间戳
    2. 如果队列长度 < N,放行并记录时间戳
    3. 否则拒绝
    
    优点:精确,没有窗口边界问题
    缺点:每个用户需要存储最多 N 个时间戳,内存开销 O(N * 用户数)
    """
    
    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.timestamps = deque()
    
    def allow(self) -> bool:
        now = time.time()
        
        # 移除窗口外的旧时间戳
        while self.timestamps and self.timestamps[0] <= now - self.window_seconds:
            self.timestamps.popleft()
        
        if len(self.timestamps) < self.max_requests:
            self.timestamps.append(now)
            return True
        else:
            return False

方法 2:滑动窗口计数器(近似但省内存)

将时间划分为小的子窗口(如 1 秒的窗口划分为 10 个 100ms 子窗口),每个子窗口维护一个计数器。

import time
import math


class SlidingWindowCounter:
    """
    滑动窗口计数器限流(近似算法)
    
    思路:将大窗口划分为多个小窗格(slot),用环形数组存储每个窗格的计数。
    判断时,累加所有在当前窗口内的窗格计数。
    
    精度取决于窗格数量:窗格越多越精确,但内存越大。
    这是 Cloudflare 在其边缘限流中使用的算法变体。
    """
    
    def __init__(self, max_requests: int, window_seconds: float, num_slots: int = 10):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.num_slots = num_slots
        self.slot_duration = window_seconds / num_slots
        
        self.slots = [0] * num_slots
        self.slot_timestamps = [0.0] * num_slots  # 每个 slot 对应的时间
        self.current_slot = 0
    
    def _get_slot_index(self, timestamp: float) -> int:
        """根据时间戳计算对应的 slot 索引"""
        return int(timestamp / self.slot_duration) % self.num_slots
    
    def allow(self) -> bool:
        now = time.time()
        current_idx = self._get_slot_index(now)
        
        # 清除过期的 slot
        self._clean_expired(now)
        
        # 累加窗口内所有 slot 的计数
        total = sum(self.slots)
        
        if total < self.max_requests:
            self.slots[current_idx] += 1
            self.slot_timestamps[current_idx] = now
            return True
        else:
            return False
    
    def _clean_expired(self, now: float) -> None:
        """清除超出窗口的旧 slot"""
        window_start = now - self.window_seconds
        for i in range(self.num_slots):
            if self.slot_timestamps[i] < window_start:
                self.slots[i] = 0

方法 3:两窗口加权(工业常用近似)

这是一个极简但有效的近似方法,被广泛使用:

import time


class SlidingWindowApprox:
    """
    双窗口加权近似
    
    思路:
    - 维护当前窗口和上一个窗口的计数
    - 近似当前滑动窗口内的请求数 = 上一窗口计数 * 重叠比例 + 当前窗口计数
    
    例如:窗口 = 1分钟
    当前时间在当前窗口内过了 40%
    近似值 = prev_count * 0.6 + curr_count * 1.0
    
    这是 Cloudflare 博客 (2017) 中描述的生产限流算法。
    误差最大约 12.5%(可证明),对大多数场景足够精确。
    """
    
    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        
        self.prev_count = 0
        self.curr_count = 0
        self.window_start = time.time()
    
    def allow(self) -> bool:
        now = time.time()
        elapsed = now - self.window_start
        
        # 检查是否需要翻转窗口
        if elapsed >= self.window_seconds:
            # 可能跨了多个窗口
            windows_passed = int(elapsed / self.window_seconds)
            if windows_passed == 1:
                self.prev_count = self.curr_count
            else:
                self.prev_count = 0  # 跨了太多窗口,上一窗口计数归零
            self.curr_count = 0
            self.window_start += windows_passed * self.window_seconds
            elapsed = now - self.window_start
        
        # 计算加权近似值
        weight = 1.0 - elapsed / self.window_seconds  # 上一窗口的重叠比例
        approx_count = self.prev_count * weight + self.curr_count
        
        if approx_count < self.max_requests:
            self.curr_count += 1
            return True
        else:
            return False

39.4 三种限流算法对比总结

维度 令牌桶 漏桶 滑动窗口日志 滑动窗口计数器
精确度 精确 精确 精确 近似
允许突发 是(窗口内随意分布)
内存 O(1) O(1) 或 O(queue) O(N * 用户数) O(slots * 用户数)
实现复杂度 低-中
适用场景 API 限流 网络整形 小规模精确限流 大规模分布式限流

39.5 负载均衡算法

当有多个后端服务器时,如何选择将请求发给谁?

轮询(Round Robin)

class RoundRobin:
    """
    最简单的负载均衡:按顺序循环分配
    
    优点:实现简单,完全公平
    缺点:不考虑服务器处理能力差异,不考虑当前负载
    """
    
    def __init__(self, servers: list):
        self.servers = servers
        self.index = 0
    
    def next_server(self) -> str:
        server = self.servers[self.index % len(self.servers)]
        self.index += 1
        return server

加权轮询(Weighted Round Robin)

class WeightedRoundRobin:
    """
    加权轮询:按权重分配请求
    
    Nginx 使用的平滑加权轮询算法(Smooth Weighted Round Robin):
    避免连续将请求发到同一台高权重服务器。
    
    算法由 Nginx 开发者 phxc 提出:
    1. 每个服务器有 weight(配置权重)和 current_weight(当前权重,动态变化)
    2. 每次选择时:所有服务器 current_weight += weight
    3. 选 current_weight 最大的服务器
    4. 被选中的服务器 current_weight -= total_weight
    
    效果:权重为 {A:5, B:1, C:1} 时,分配序列是 A,A,B,A,C,A,A 而不是 A,A,A,A,A,B,C
    """
    
    def __init__(self, servers: list, weights: list):
        self.servers = servers
        self.weights = weights
        self.current_weights = [0] * len(servers)
        self.total_weight = sum(weights)
    
    def next_server(self) -> str:
        # 所有服务器 current_weight += weight
        for i in range(len(self.servers)):
            self.current_weights[i] += self.weights[i]
        
        # 选最大的
        max_idx = 0
        for i in range(1, len(self.servers)):
            if self.current_weights[i] > self.current_weights[max_idx]:
                max_idx = i
        
        # 被选中的减去 total_weight
        self.current_weights[max_idx] -= self.total_weight
        
        return self.servers[max_idx]

最少连接(Least Connections)

import heapq


class LeastConnections:
    """
    最少连接:将请求分配给当前活跃连接数最少的服务器
    
    适用场景:请求处理时间差异大(如有的请求 10ms,有的 10s)
    轮询在这种场景下会导致慢请求堆积在某台服务器上
    
    实现:用最小堆维护 (active_connections, server)
    """
    
    def __init__(self, servers: list):
        # (active_connections, server_index, server_name)
        self.heap = [(0, i, s) for i, s in enumerate(servers)]
        heapq.heapify(self.heap)
        self.server_entries = {}  # server -> heap entry reference
    
    def acquire(self) -> str:
        """获取一个服务器(连接数 +1)"""
        conns, idx, server = heapq.heappop(self.heap)
        heapq.heappush(self.heap, (conns + 1, idx, server))
        return server
    
    def release(self, server: str) -> None:
        """释放一个连接(连接数 -1)"""
        # 实际实现中需要 decrease-key 操作
        # 简化版:重建堆(生产中用更高效的数据结构)
        new_heap = []
        for conns, idx, s in self.heap:
            if s == server:
                new_heap.append((max(0, conns - 1), idx, s))
            else:
                new_heap.append((conns, idx, s))
        self.heap = new_heap
        heapq.heapify(self.heap)

一致性哈希(Consistent Hashing)

import hashlib
import bisect


class ConsistentHash:
    """
    一致性哈希:将 key 和服务器都映射到一个环上
    
    由 Karger 等人 (1997, "Consistent Hashing and Random Trees") 在 MIT 提出。
    
    核心优势:增减服务器时,只有 1/N 的 key 需要重新映射
    (而普通取模哈希 hash(key) % N 在 N 变化时几乎所有 key 都要重新映射)
    
    虚拟节点(virtual nodes):每个物理服务器映射为多个虚拟节点
    解决负载不均衡问题(物理节点少时,哈希环上的分布可能很不均匀)
    """
    
    def __init__(self, servers: list, num_replicas: int = 150):
        """
        Args:
            servers: 服务器列表
            num_replicas: 每个服务器的虚拟节点数(越多越均匀,但查找略慢)
        """
        self.num_replicas = num_replicas
        self.ring = []          # 排序的哈希值列表
        self.ring_map = {}      # 哈希值 -> 服务器名
        
        for server in servers:
            self.add_server(server)
    
    def _hash(self, key: str) -> int:
        """使用 MD5 生成哈希值(生产中可用 xxhash 等更快的哈希)"""
        return int(hashlib.md5(key.encode()).hexdigest(), 16)
    
    def add_server(self, server: str) -> None:
        """添加服务器(及其虚拟节点)到哈希环"""
        for i in range(self.num_replicas):
            virtual_key = f"{server}#{i}"
            h = self._hash(virtual_key)
            bisect.insort(self.ring, h)
            self.ring_map[h] = server
    
    def remove_server(self, server: str) -> None:
        """从哈希环中移除服务器"""
        for i in range(self.num_replicas):
            virtual_key = f"{server}#{i}"
            h = self._hash(virtual_key)
            self.ring.remove(h)
            del self.ring_map[h]
    
    def get_server(self, key: str) -> str:
        """根据 key 获取对应的服务器"""
        if not self.ring:
            return None
        
        h = self._hash(key)
        # 找到环上顺时针方向第一个 >= h 的节点
        idx = bisect.bisect_left(self.ring, h)
        if idx == len(self.ring):
            idx = 0  # 绕回环的起点
        
        return self.ring_map[self.ring[idx]]

为什么虚拟节点数通常是 100-200?

Karger 等人的分析表明,当虚拟节点数为 $k$ 时,负载不均衡度约为 $O(\sqrt{\log n / k})$,其中 $n$ 是 key 的数量。当 $k = 150$ 时,在大多数实际工作负载下,每台服务器的负载偏差在 5-10% 以内。更多虚拟节点意味着更均匀但查找更慢(bisect 在更长的数组上搜索),150 是常见的工程平衡点。

39.6 优先级调度

在任务调度中,不同任务有不同优先级。高优先级任务应该被优先处理。

import heapq
import time
from dataclasses import dataclass, field


@dataclass(order=True)
class Task:
    priority: int                          # 优先级(数字越小优先级越高)
    timestamp: float = field(compare=True) # 同优先级按 FIFO
    name: str = field(compare=False)       # 任务名(不参与比较)
    payload: object = field(compare=False, default=None)


class PriorityScheduler:
    """
    优先级调度器
    
    核心数据结构:最小堆
    支持:
    - 提交任务(带优先级)
    - 获取最高优先级任务
    - 优先级提升/降低(通过标记删除 + 重新入堆)
    
    防饥饿机制:aging(老化)
    长时间等待的低优先级任务会逐渐提升优先级
    """
    
    def __init__(self):
        self.heap = []
        self.counter = 0  # 用于 FIFO 打破平局
    
    def submit(self, name: str, priority: int, payload=None) -> None:
        """提交一个任务"""
        task = Task(priority=priority, timestamp=time.time(), name=name, payload=payload)
        heapq.heappush(self.heap, task)
    
    def next_task(self) -> Task:
        """获取下一个应该执行的任务"""
        if self.heap:
            return heapq.heappop(self.heap)
        return None
    
    def apply_aging(self, aging_seconds: float = 60.0, priority_boost: int = 1) -> None:
        """
        老化机制:等待超过 aging_seconds 的任务优先级提升
        
        为什么需要 aging?
        纯优先级调度会导致低优先级任务"饥饿"(永远得不到执行)。
        操作系统进程调度(如 Linux CFS)和作业调度系统都使用类似机制。
        """
        now = time.time()
        new_heap = []
        for task in self.heap:
            wait_time = now - task.timestamp
            if wait_time > aging_seconds:
                # 提升优先级(减小 priority 值)
                boosted_priority = max(0, task.priority - priority_boost)
                new_task = Task(priority=boosted_priority, timestamp=task.timestamp,
                              name=task.name, payload=task.payload)
                new_heap.append(new_task)
            else:
                new_heap.append(task)
        self.heap = new_heap
        heapq.heapify(self.heap)

Level 2 · 它是怎么运行的

39.7 令牌桶的数学模型

连续时间模型

设 $b(t)$ 为时刻 $t$ 桶中的令牌数,$r$ 为填充速率,$B$ 为桶容量。

$$b(t) = \min\left(B, , b(t_0) + r \cdot (t - t_0)\right)$$

其中 $t_0$ 是上一次消耗令牌的时刻。

突发持续时间

如果桶是满的($b = B$),以最大速率 $R_{max}$(瞬时无限速率,即尽可能快地发请求)消耗令牌,突发可以持续多久?

设突发速率为 $R_{burst}$($R_{burst} > r$),则突发持续时间:

$$T_{burst} = \frac{B}{R_{burst} - r}$$

因为在突发期间,每秒净消耗 $R_{burst} - r$ 个令牌(消耗 $R_{burst}$ 但同时以 $r$ 补充)。

例如:$r = 100$ req/s, $B = 500$, $R_{burst} = 600$ req/s $$T_{burst} = \frac{500}{600 - 100} = 1 \text{ 秒}$$

在这 1 秒内可以发 600 个请求(而不是只有 100 个),之后回到 100 req/s 的稳态速率。

与排队论的联系

令牌桶限流器可以建模为一个 M/D/1/K 排队系统(泊松到达、确定性服务、单服务台、有限队列长度 K = burst)。当请求到达率 $\lambda > r$ 时,丢弃概率为:

$$P_{drop} \approx 1 - \frac{r}{\lambda} \quad (\text{稳态近似,当 } \lambda \gg r)$$

39.8 GCRA 算法(通用信元速率算法)

GCRA(Generic Cell Rate Algorithm) 是 ATM 网络标准中定义的限流算法,等价于令牌桶但用不同的数学表述。Redis 的 redis_cell 模块实现的就是 GCRA。

核心思想:维护一个"理论到达时间"(TAT, Theoretical Arrival Time)。每个请求的 TAT 是它"应该到达的最早时间"。如果实际到达时间早于 TAT,说明请求太频繁,被限流。

import time


class GCRA:
    """
    GCRA (Generic Cell Rate Algorithm)
    
    等价于令牌桶,但用"时间"而非"令牌数"表述。
    
    核心变量:TAT (Theoretical Arrival Time)
    - 如果 now >= TAT:允许,TAT = now + emission_interval
    - 如果 now < TAT 但 TAT - now <= burst_tolerance:允许,TAT = TAT + emission_interval
    - 如果 TAT - now > burst_tolerance:拒绝
    
    emission_interval = 1 / rate(两次请求之间的最小间隔)
    burst_tolerance = burst * emission_interval(允许的最大提前量)
    
    由 ITU-T I.371 标准定义(1993)。
    """
    
    def __init__(self, rate: float, burst: int):
        self.emission_interval = 1.0 / rate   # 两次请求的理想间隔
        self.burst_tolerance = burst * self.emission_interval  # 最大容忍的"提前量"
        self.tat = 0.0  # Theoretical Arrival Time
    
    def allow(self) -> bool:
        now = time.time()
        
        # 新 TAT = max(TAT, now) + emission_interval
        new_tat = max(self.tat, now) + self.emission_interval
        
        # 如果新 TAT 超过 now + burst_tolerance + emission_interval,拒绝
        if new_tat - now > self.burst_tolerance + self.emission_interval:
            return False
        
        self.tat = new_tat
        return True

GCRA 的优势:只需要一个变量 TAT(一个时间戳),内存极小。Redis 的 CL.THROTTLE 命令就是基于 GCRA,每个 key 只存一个时间戳。

39.9 一致性哈希的负载均衡证明

定理(Karger et al., 1997): 在有 $n$ 台服务器、$m$ 个 key 的一致性哈希中:

增减节点时的 key 迁移量

添加一个新节点时,期望只有 $m/(n+1)$ 个 key 需要从其他节点迁移到新节点。这比取模哈希($m \cdot n/(n+1)$ 个 key 需要迁移)好 $n$ 倍。

虚拟节点与一致性的 trade-off

更多虚拟节点 → 更均匀的负载 → 但增减节点时迁移涉及更多的"碎片"(从多个不同节点各迁移少量 key)。

实践中的选择:

39.10 平滑加权轮询的数学证明

定理: Nginx 的平滑加权轮询算法,在一个完整的权重周期内($\sum w_i$ 次选择),每个服务器 $i$ 被选中恰好 $w_i$ 次。

证明思路:

设总权重 $W = \sum_{i=1}^{n} w_i$。每一轮选择:

  1. 所有服务器 $current_weight_i += w_i$
  2. 选最大的 $current_weight_{max}$
  3. 被选中的 $current_weight_{max} -= W$

一个完整周期($W$ 次选择)后,每个服务器的 current_weight 净变化为:$w_i \times W - w_i \times W = 0$(加了 $W$ 次 $w_i$,被选中 $w_i$ 次各减 $W$)。这意味着系统回到初始状态,且每个服务器恰好被选 $w_i$ 次。

"平滑性"的保证

关键性质:在权重序列中,不会出现高权重服务器被连续选择的情况。例如权重 {A:5, B:1, C:1} 的选择序列是 A B A C A A A 而不是 A A A A A B C

这个性质由算法中"选最大的后减去总权重"保证:一旦某服务器被选中,它的 current_weight 立刻下降 $W$ 的量,在随后的几轮中不太可能再次成为最大。

39.11 负载均衡算法的延迟分析

用排队论模型分析不同负载均衡算法的性能差异:

假设: N 台服务器,每台处理速率 $\mu$,总到达率 $\lambda < N\mu$。

Round Robin 的平均响应时间:

如果所有请求处理时间相同(确定性服务),Round Robin 是最优的——它等价于一个 M/D/1 队列(到达率 $\lambda/N$,服务率 $\mu$)。

如果处理时间有方差(指数分布),Round Robin 退化为 N 个独立的 M/M/1 队列,平均响应时间:

$$E[T_{RR}] = \frac{1}{\mu - \lambda/N}$$

Join-Shortest-Queue(JSQ,最短队列)的优势:

当处理时间有方差时,JSQ 可以达到接近单个 M/M/N 队列的性能:

$$E[T_{JSQ}] \approx \frac{1}{\mu} + \frac{\rho^N}{N\mu(1-\rho)} \quad (\text{其中 } \rho = \lambda/(N\mu))$$

JSQ 在高负载下比 Round Robin 好一个数量级——因为它利用了"当前队列长度"的信息做出更智能的决策。但 JSQ 在分布式环境中需要实时获取所有服务器的队列长度,通信开销大。

Power of Two Choices(两次随机选择):

Mitzenmacher (2001, "The Power of Two Choices in Randomized Load Balancing") 证明了一个惊人的结论:从 N 台服务器中随机选 2 台,然后选其中负载更小的那台——这个简单策略把最大队列长度从 $O(\log N / \log\log N)$(纯随机)降低到 $O(\log\log N)$(两次选择),是指数级的改善!

import random


class PowerOfTwoChoices:
    """
    两次随机选择负载均衡
    
    随机选两台服务器,选负载更轻的那台。
    数学上已证明这比纯随机好指数级(最大队列长度 O(log log N))。
    
    这是 Nginx 的 "random two" 策略的理论基础。
    """
    
    def __init__(self, servers: list):
        self.servers = servers
        self.loads = {s: 0 for s in servers}
    
    def acquire(self) -> str:
        # 随机选两台
        s1, s2 = random.sample(self.servers, 2)
        # 选负载小的
        chosen = s1 if self.loads[s1] <= self.loads[s2] else s2
        self.loads[chosen] += 1
        return chosen
    
    def release(self, server: str) -> None:
        self.loads[server] = max(0, self.loads[server] - 1)

Level 3 · 规范怎么定义的

39.12 Nginx 中的限流实现

Nginx 的 ngx_http_limit_req_module 模块实现了漏桶算法。核心配置:

# 定义限流区(共享内存)
limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;

# 应用限流
location /api/ {
    limit_req zone=api burst=20 nodelay;
}

参数解析:

内部实现细节(Nginx 源码分析):

Nginx 使用红黑树(per-zone)存储每个 key 的限流状态(last access time + excess count),使用共享内存(ngx_slab)在 worker 进程间共享。

// 简化的核心逻辑 (ngx_http_limit_req_module.c)
// excess 表示"桶中超出的请求数"(惰性计算)
excess = lr->excess - rate * ms_since_last / 1000;
if (excess < 0) excess = 0;

if (excess + 1000 <= burst * 1000) {
    // 允许:更新 excess
    lr->excess = excess + 1000;
    lr->last = now;
    return NGX_OK;
} else {
    // 拒绝:返回 503
    return NGX_HTTP_SERVICE_UNAVAILABLE;
}

Nginx 用整数运算(乘以 1000)避免浮点数,这在高频调用路径中很重要。

nodelay 的含义

不加 nodelay 时:超出 rate 但在 burst 内的请求会被延迟处理(排队等待),直到漏桶漏出足够的空间。这是纯漏桶行为。

加了 nodelay 后:只要 burst 桶没满,所有请求立即转发——但 burst 的恢复仍然按 rate 速率。效果等价于令牌桶:允许突发,但长期平均不超过 rate。

39.13 Redis 中的限流实现

方案 1:固定窗口(最简单)

-- 固定窗口限流(Lua 脚本,保证原子性)
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])

local current = tonumber(redis.call('GET', key) or "0")
if current >= limit then
    return 0  -- 拒绝
end

redis.call('INCR', key)
if current == 0 then
    redis.call('EXPIRE', key, window)
end
return 1  -- 放行

方案 2:滑动窗口(Sorted Set)

-- 滑动窗口限流(使用有序集合)
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local now = tonumber(ARGV[3])

-- 移除窗口外的旧记录
redis.call('ZREMRANGEBYSCORE', key, 0, now - window * 1000)

-- 计数
local count = redis.call('ZCARD', key)
if count >= limit then
    return 0  -- 拒绝
end

-- 添加当前请求
redis.call('ZADD', key, now, now .. math.random())
redis.call('PEXPIRE', key, window * 1000)
return 1  -- 放行

方案 3:redis_cell 模块(GCRA)

-- CL.THROTTLE 命令(redis_cell 模块)
-- CL.THROTTLE key burst rate period [quantity]
-- 例如:每 60 秒最多 30 次请求,允许突发 10 个
CL.THROTTLE user:123 10 30 60 1

-- 返回值:
-- 1) 0 (allowed) 或 1 (denied)
-- 2) 总容量 (burst + 1)
-- 3) 剩余容量
-- 4) 如果被拒绝,多少秒后重试
-- 5) 多少秒后桶完全恢复

redis_cell 的优势:一个命令完成限流判断 + 状态更新,原子性保证;返回丰富的元信息(剩余额度、重试时间),方便客户端实现退避策略。

39.14 API Gateway 中的限流分层

生产系统中的限流通常是多层的:

用户请求 → CDN 层限流 → API Gateway 限流 → 服务层限流 → 数据库连接池限流
                ↓              ↓                ↓              ↓
           DDoS 防护     用户级/API级       服务级保护      资源保护
         (L3/L4 层)       (L7 层)          (熔断器)        (信号量)

每层的职责不同:

限流粒度 算法选择 典型实现
CDN/边缘 IP / 地理位置 固定窗口 + 黑名单 Cloudflare, AWS WAF
API Gateway 用户 / API Key / 路由 令牌桶 / 滑动窗口 Kong, Envoy, Nginx
服务间 服务标识 / 方法 自适应限流 Sentinel, Hystrix
数据库 连接数 / 查询复杂度 信号量 / 排队 Connection Pool, Query Queue

39.15 Google SRE 的限流实践

Google SRE(Site Reliability Engineering)在《SRE Book》(Beyer et al., 2016, O'Reilly) 中描述了他们的限流哲学:

核心原则:客户端自适应限流(Client-Side Adaptive Throttling)

传统限流是服务端拒绝过多请求。Google 的方法是让客户端主动减少请求

import random


class AdaptiveThrottle:
    """
    Google SRE 的客户端自适应限流
    
    思路:客户端跟踪最近的请求成功率。
    如果服务端开始拒绝请求,客户端主动降低发送速率。
    
    核心公式:
    rejection_probability = max(0, (requests - K * accepts) / (requests + 1))
    
    其中:
    - requests: 最近窗口内的总请求数
    - accepts: 最近窗口内被服务端接受的请求数
    - K: 倍率因子(通常 K=2),表示"只要成功率 > 1/K,就不限流"
    
    K=2 意味着:只要超过 50% 的请求成功,客户端就不限流。
    当成功率降低到 50% 以下时,客户端开始按概率丢弃请求。
    
    优势:
    1. 无需服务端配置限流阈值
    2. 自动适应后端容量变化
    3. 优雅降级(不是突然从 100% 变到 0%)
    """
    
    def __init__(self, K: float = 2.0, window_size: int = 120):
        self.K = K
        self.requests = 0
        self.accepts = 0
    
    def should_send(self) -> bool:
        """客户端判断是否应该发送请求"""
        rejection_prob = max(0.0, (self.requests - self.K * self.accepts) / (self.requests + 1))
        
        if random.random() < rejection_prob:
            return False  # 主动丢弃
        return True
    
    def record_request(self) -> None:
        """记录发送了一个请求"""
        self.requests += 1
    
    def record_accept(self) -> None:
        """记录请求被服务端接受"""
        self.accepts += 1
    
    def record_reject(self) -> None:
        """记录请求被服务端拒绝"""
        pass  # requests 已经在 record_request 中 +1
    
    def decay(self, factor: float = 0.5) -> None:
        """定期衰减计数器(实现滑动窗口效果)"""
        self.requests = int(self.requests * factor)
        self.accepts = int(self.accepts * factor)

为什么 K=2?

Google 在生产中使用 K=2,因为它在"快速反应"和"避免误触发"之间找到了好的平衡点。

与传统限流的对比

维度 传统服务端限流 Google 客户端自适应限流
配置 需要预设阈值 无需配置(自适应)
适应性 静态阈值,扩缩容需更新 自动跟随后端容量
延迟 请求到达服务端才被拒绝 客户端直接丢弃,节省网络往返
级联故障 上游仍会发大量请求 上游主动减压
复杂度 服务端实现 需要客户端库支持

39.16 限流算法在微服务中的组合使用

实际生产系统通常组合多种限流策略:

class CompositeRateLimiter:
    """
    组合限流器:多维度限流
    
    示例:一个 API 同时有以下限制:
    - 全局:10000 req/s(保护整个服务)
    - 每用户:100 req/s(公平性)
    - 每用户每接口:10 req/s(防止滥用单个接口)
    - 每 IP:1000 req/s(防 DDoS)
    
    请求必须通过所有维度的检查才能放行。
    """
    
    def __init__(self):
        self.global_limiter = TokenBucket(rate=10000, burst=15000)
        self.user_limiters = {}   # user_id -> TokenBucket
        self.ip_limiters = {}     # ip -> TokenBucket
    
    def allow(self, user_id: str, ip: str, endpoint: str) -> bool:
        # 全局检查
        if not self.global_limiter.allow():
            return False
        
        # 用户级检查
        if user_id not in self.user_limiters:
            self.user_limiters[user_id] = TokenBucket(rate=100, burst=200)
        if not self.user_limiters[user_id].allow():
            return False
        
        # IP 级检查
        if ip not in self.ip_limiters:
            self.ip_limiters[ip] = TokenBucket(rate=1000, burst=2000)
        if not self.ip_limiters[ip].allow():
            return False
        
        return True

39.17 分布式限流的挑战

问题: 多台 API Gateway 实例各自维护独立的限流计数器。用户限制 100 req/s,有 5 台 Gateway 实例,用户实际可以发 500 req/s。

方案 1:集中式(Redis)

所有实例共享一个 Redis 计数器。精确但每个请求增加一次 Redis 往返延迟(~0.5-1ms)。

方案 2:分片配额

每台 Gateway 分到 100/5 = 20 req/s 的配额。简单但不均匀(某些 Gateway 收到更多来自某用户的请求)。

方案 3:本地限流 + 定期同步

每台 Gateway 本地限流(粗略配额),每隔 1-5 秒向中心同步实际使用量并调整配额。

class DistributedRateLimiter:
    """
    分布式限流:本地桶 + 定期 Redis 同步
    
    平衡精确性和延迟:
    - 本地判断:0 延迟
    - 定期同步:每秒一次 Redis 交互,调整本地配额
    
    误差:最多偏差 rate * sync_interval * num_instances
    例如:100 req/s * 1s * 5 instances = 最多多放 500 请求
    对于大多数场景可接受(限流不需要绝对精确)
    """
    
    def __init__(self, total_rate: float, num_instances: int, sync_interval: float = 1.0):
        # 初始均分配额
        self.local_rate = total_rate / num_instances
        self.local_bucket = TokenBucket(rate=self.local_rate, burst=int(self.local_rate * 2))
        self.sync_interval = sync_interval
        self.local_count = 0  # 上次同步以来的本地通过量
    
    def allow(self) -> bool:
        if self.local_bucket.allow():
            self.local_count += 1
            return True
        return False
    
    def sync_with_redis(self, redis_client, key: str, total_limit: int) -> None:
        """定期调用:与 Redis 同步并调整本地配额"""
        # 上报本地使用量
        # global_used = redis_client.incrby(key, self.local_count)
        # 根据全局使用情况调整本地配额
        # ...
        self.local_count = 0

Envoy 的本地限流 + 全局限流设计:

Envoy proxy 有两层限流:

  1. Local Rate Limit Filter: 每个 Envoy 实例独立限流,无外部依赖
  2. Global Rate Limit Service: 调用外部 gRPC 服务(如 ratelimit 项目)做全局判断

请求先过本地限流(快速拒绝明显超限的),再过全局限流(精确判断)。这种分层设计在延迟和精确性之间取得了很好的平衡。


Level 4 · 边界与陷阱

39.18 面试中系统设计题的限流部分怎么回答

典型面试场景: "设计一个短链接服务/聊天系统/支付系统——如何处理流量控制?"

回答框架(5 步):

第一步:明确限流目标

第二步:选择限流算法

第三步:确定限流位置

客户端 → CDN → Load Balancer → API Gateway → Service → Database
        L3/L4    L4/L7          L7 用户级    服务级     连接池级

越靠前拒绝,浪费的资源越少。但靠前的层没有用户身份信息。

第四步:处理限流响应

第五步:讨论分布式限流

39.19 限流的反模式

反模式 1:限流粒度太粗

只做全局限流(如"全服务 10000 QPS"),不做用户级限流。后果:一个恶意用户耗尽全部配额,正常用户全部受影响。

反模式 2:限流不返回有意义的错误

返回 500 Internal Server Error 而不是 429 Too Many Requests。后果:客户端无法区分"服务故障"和"被限流",无法做针对性的退避重试。

反模式 3:限流配置写死在代码中

# 反模式
RATE_LIMIT = 100  # 写死

# 正确做法
RATE_LIMIT = config.get("rate_limit", default=100)  # 可动态调整

后果:无法快速响应流量变化(需要发布代码才能改限流阈值)。

反模式 4:被限流的请求没有指数退避重试

客户端被限流后立即无限重试,造成"重试风暴"——被限流的请求越多,重试产生的额外请求越多,恶性循环。

正确做法:

import random
import time


def request_with_backoff(url, max_retries=5):
    """带指数退避的重试"""
    for attempt in range(max_retries):
        response = send_request(url)
        
        if response.status_code == 429:
            # 指数退避 + 抖动(jitter)
            wait = min(2 ** attempt + random.uniform(0, 1), 30)
            time.sleep(wait)
        else:
            return response
    
    raise Exception("Max retries exceeded")

为什么要加 jitter(随机抖动)?如果 100 个客户端同时被限流,不加 jitter 它们会在完全相同的时间重试(如 1s 后全部重试),造成"惊群效应"。加 jitter 让重试时间分散。

39.20 限流与熔断的区别

维度 限流 (Rate Limiting) 熔断 (Circuit Breaking)
触发条件 请求速率超过阈值 下游错误率超过阈值
保护对象 自身 下游服务
行为 拒绝超出的请求 暂时切断所有到下游的请求
恢复方式 速率降下来自动恢复 半开状态(尝试放行少量请求测试)
算法 令牌桶/漏桶/滑动窗口 状态机(关闭→打开→半开)

两者经常配合使用:限流保护自己不被上游压垮,熔断保护下游不被自己压垮。

import time
from enum import Enum


class CircuitState(Enum):
    CLOSED = "closed"      # 正常状态,请求正常转发
    OPEN = "open"          # 熔断状态,所有请求直接失败
    HALF_OPEN = "half_open"  # 半开状态,放行少量请求测试


class CircuitBreaker:
    """
    熔断器
    
    状态转换:
    CLOSED → OPEN: 错误率超过阈值(如 50%)
    OPEN → HALF_OPEN: 经过 recovery_timeout 后
    HALF_OPEN → CLOSED: 测试请求成功
    HALF_OPEN → OPEN: 测试请求失败
    
    Netflix Hystrix (2012) 开创了这个模式,现在已被
    Resilience4j、Sentinel 等框架广泛采用。
    """
    
    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30.0):
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.last_failure_time = 0
    
    def allow(self) -> bool:
        if self.state == CircuitState.CLOSED:
            return True
        elif self.state == CircuitState.OPEN:
            # 检查是否到了恢复时间
            if time.time() - self.last_failure_time >= self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                return True  # 放行一个测试请求
            return False
        else:  # HALF_OPEN
            return True  # 放行测试请求
    
    def record_success(self) -> None:
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED
            self.failure_count = 0
    
    def record_failure(self) -> None:
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.OPEN
        elif self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

39.21 实战案例:设计一个多维限流系统

需求: 为一个 SaaS API 平台设计限流,支持:

from dataclasses import dataclass
from typing import Optional
import time


@dataclass
class RateLimitResult:
    allowed: bool
    limit: int
    remaining: int
    reset_at: float
    retry_after: Optional[float] = None
    
    def to_headers(self) -> dict:
        """转换为 HTTP 响应头"""
        headers = {
            'X-RateLimit-Limit': str(self.limit),
            'X-RateLimit-Remaining': str(max(0, self.remaining)),
            'X-RateLimit-Reset': str(int(self.reset_at)),
        }
        if self.retry_after is not None:
            headers['Retry-After'] = str(int(self.retry_after))
        return headers


class MultiDimensionRateLimiter:
    """
    多维限流器
    
    设计思路:
    1. 从最严格(最容易触发)到最宽松的顺序检查
    2. 一旦某层被限制,立即返回(短路评估)
    3. 只有所有层都通过才放行
    4. 返回最紧张那层的信息(帮助用户了解瓶颈)
    """
    
    def __init__(self):
        self.global_bucket = TokenBucket(rate=50000, burst=75000)
        self.ip_buckets = {}
        self.user_buckets = {}
        self.endpoint_buckets = {}
    
    def check(self, user_id: str, user_tier: str, ip: str, endpoint: str) -> RateLimitResult:
        now = time.time()
        
        # 层 1:全局
        if not self.global_bucket.allow():
            return RateLimitResult(
                allowed=False, limit=50000, remaining=0,
                reset_at=now + 1, retry_after=1.0
            )
        
        # 层 2:IP 级
        if ip not in self.ip_buckets:
            self.ip_buckets[ip] = TokenBucket(rate=100, burst=200)
        if not self.ip_buckets[ip].allow():
            return RateLimitResult(
                allowed=False, limit=100, remaining=0,
                reset_at=now + 1, retry_after=1.0
            )
        
        # 层 3:用户级(根据套餐)
        user_rate = 1000 if user_tier == 'paid' else 10
        user_burst = user_rate * 2
        if user_id not in self.user_buckets:
            self.user_buckets[user_id] = TokenBucket(rate=user_rate / 60, burst=user_burst // 60)
        if not self.user_buckets[user_id].allow():
            return RateLimitResult(
                allowed=False, limit=user_rate, remaining=0,
                reset_at=now + 60, retry_after=60.0 / user_rate
            )
        
        # 层 4:特定接口
        if endpoint == '/upload':
            ep_key = f"{user_id}:{endpoint}"
            if ep_key not in self.endpoint_buckets:
                self.endpoint_buckets[ep_key] = TokenBucket(rate=5.0/60, burst=5)
            if not self.endpoint_buckets[ep_key].allow():
                return RateLimitResult(
                    allowed=False, limit=5, remaining=0,
                    reset_at=now + 60, retry_after=12.0
                )
        
        return RateLimitResult(
            allowed=True, limit=user_rate, remaining=user_rate - 1,
            reset_at=now + 60
        )

39.22 调度算法在操作系统中的真实应用

Linux 完全公平调度器(CFS, Completely Fair Scheduler)

CFS 由 Ingo Molnar (2007) 开发,用红黑树维护所有可运行进程,按"虚拟运行时间"(vruntime)排序。每次调度选择 vruntime 最小的进程运行。

核心数据结构:红黑树(rb_tree),以 vruntime 为 key。

class SimpleCFS:
    """
    CFS 调度器简化模型
    
    核心思想:让每个进程获得公平的 CPU 时间。
    "公平"的定义:所有进程的 vruntime 尽量相等。
    
    vruntime 增长速率与权重成反比:
    高优先级进程的 vruntime 增长慢 → 被调度的机会更多
    低优先级进程的 vruntime 增长快 → 被调度的机会更少
    """
    
    def __init__(self):
        import sortedcontainers
        self.tree = sortedcontainers.SortedList(key=lambda p: p['vruntime'])
    
    def add_process(self, pid: int, nice: int = 0):
        """添加进程(nice 值越低优先级越高)"""
        weight = self._nice_to_weight(nice)
        process = {
            'pid': pid,
            'vruntime': self._min_vruntime(),  # 新进程从当前最小 vruntime 开始
            'weight': weight,
            'nice': nice
        }
        self.tree.add(process)
    
    def schedule(self) -> dict:
        """选择下一个运行的进程"""
        if not self.tree:
            return None
        # 选 vruntime 最小的(红黑树最左节点,O(1))
        return self.tree[0]
    
    def tick(self, running_process: dict, elapsed_ns: int):
        """时钟中断:更新运行进程的 vruntime"""
        self.tree.remove(running_process)
        # vruntime 增量与权重成反比
        delta_vruntime = elapsed_ns * 1024 / running_process['weight']
        running_process['vruntime'] += delta_vruntime
        self.tree.add(running_process)
    
    def _min_vruntime(self) -> float:
        if self.tree:
            return self.tree[0]['vruntime']
        return 0.0
    
    def _nice_to_weight(self, nice: int) -> int:
        """nice 值到权重的映射(简化版)"""
        # nice 每差 1,权重差约 25%(Linux 内核精确表)
        return max(1, int(1024 * (1.25 ** (-nice))))

CFS 之所以用红黑树而不是堆:因为需要 O(log n) 的插入和删除(进程阻塞/唤醒时从树中移除/加入),而堆只能 O(1) 获取最小值但删除任意元素需要 O(n) 查找。


本章小结

限流和调度是数据结构与算法在生产环境中最直接的应用。它们不是"面试造飞机"——每个后端服务背后都有限流器保护,每个操作系统内核都有调度器在运行。

核心要点:

  1. 令牌桶 = 允许突发的限流,一个计数器 + 惰性计算
  2. 漏桶 = 严格匀速的限流,本质是固定速率队列
  3. 滑动窗口 = 精确的"N次/T秒"语义,日志法精确但耗内存
  4. 一致性哈希 = 增减节点时最小化 key 迁移,虚拟节点保证均匀
  5. Power of Two Choices = 简单但效果指数级优于纯随机

工业实践的核心教训:没有完美的限流算法,只有适合场景的组合策略。 在延迟、精确性、内存、实现复杂度之间做 trade-off,是系统设计的本质。

本章评分
4.8  / 5  (3 评分)

💬 留言讨论