第 35 章

分布式锁完全指南:从 SETNX 到 Redisson

第35章 分布式锁完全指南:从 SETNX 到 Redisson

分布式锁是微服务架构中保证多节点互斥访问共享资源的核心机制。Redis 实现的分布式锁因其高性能与简单性被广泛采用,但也有许多容易踩坑的细节。本章从最简单的 SETNX 实现出发,逐步推导出正确的实现,深入剖析 Redisson 的工业级实现,最后讨论 Redlock 算法及其争议。


35.1 从 SETNX 说起

35.1.1 错误实现:两步命令不原子

早期分布式锁实现常见的错误模式:

# 错误示范1:两条命令之间可能宕机
SETNX lock:order 1       # 步骤1:设置锁
EXPIRE lock:order 30     # 步骤2:设置过期时间(若此步未执行→死锁)

# 错误示范2:GETSET 竞态
GET lock:order           # 检查是否存在
SET lock:order 1         # 设置(GET 和 SET 之间可能有其他线程插入)
EXPIRE lock:order 30

SETNX 成功后若进程崩溃,EXPIRE 永远不会执行,锁变成死锁,直到人工干预才能释放。

35.1.2 正确实现:原子 SET NX PX

Redis 2.6.12 起,SET 命令支持 NX(不存在才设置)和 PX(毫秒过期),合并为原子操作:

SET lock:order {token} NX PX 30000
# NX = Not eXists(只有 key 不存在时才设置)
# PX 30000 = 30000毫秒后自动过期
# {token} = 唯一标识(UUID),用于释放时验证所有权
import uuid

def acquire_lock(r, lock_name: str, expire_ms: int = 30000) -> str | None:
    """
    尝试获取锁。
    返回 token(获锁成功)或 None(获锁失败)。
    """
    token = str(uuid.uuid4())
    acquired = r.set(
        f'lock:{lock_name}',
        token,
        nx=True,
        px=expire_ms
    )
    return token if acquired else None

token 的必要性:token 是一个唯一标识(UUID),在释放锁时用于验证"这把锁确实是我加的",防止误释放他人的锁(详见 35.2 节)。


35.2 安全释放锁:Lua 原子 CAS

35.2.1 错误释放的危险

# 错误释放:不验证 token
r.delete(f'lock:{lock_name}')

场景1:线程 A 持有锁,但锁超时过期 → 线程 B 获取了锁 → 线程 A 执行 DELETE → 删掉了 B 的锁 → 线程 C 也能获锁 → 并发冲突。

场景2:只检查 token 但分两步(GET + DEL):

# 错误示范:GET 和 DEL 不原子
token = r.get(f'lock:{lock_name}')
if token == my_token:
    r.delete(f'lock:{lock_name}')   # GET 和 DEL 之间可能有其他线程修改 token

35.2.2 正确实现:Lua 脚本保证原子性

-- unlock.lua
-- KEYS[1] = 锁的 key
-- ARGV[1] = 获锁时设置的 token
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])  -- 原子删除
else
    return 0  -- 不是自己的锁,不删除
end
UNLOCK_SCRIPT = """
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
else
    return 0
end
"""

def release_lock(r, lock_name: str, token: str) -> bool:
    """
    原子性地释放锁(仅当 token 匹配时)。
    返回 True=成功释放,False=token不匹配(不是自己的锁)
    """
    result = r.eval(UNLOCK_SCRIPT, 1, f'lock:{lock_name}', token)
    return result == 1

# 完整使用示例
import contextlib

@contextlib.contextmanager
def redis_lock(r, lock_name: str, expire_ms: int = 30000, max_retries: int = 3):
    token = None
    for attempt in range(max_retries):
        token = acquire_lock(r, lock_name, expire_ms)
        if token:
            break
        time.sleep(0.1 * (attempt + 1))  # 指数退避等待
    if not token:
        raise TimeoutError(f"无法获取锁: {lock_name}")
    try:
        yield token
    finally:
        release_lock(r, lock_name, token)

# 业务代码
with redis_lock(r, 'order:1001', expire_ms=10000) as token:
    process_order(1001)
var unlockScript = redis.NewScript(`
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    end
    return 0
`)

func AcquireLock(ctx context.Context, rdb *redis.Client, name string, ttl time.Duration) (string, error) {
    token := uuid.New().String()
    ok, err := rdb.SetNX(ctx, "lock:"+name, token, ttl).Result()
    if err != nil {
        return "", err
    }
    if !ok {
        return "", nil // 获锁失败
    }
    return token, nil
}

func ReleaseLock(ctx context.Context, rdb *redis.Client, name, token string) (bool, error) {
    n, err := unlockScript.Run(ctx, rdb, []string{"lock:" + name}, token).Int()
    return n == 1, err
}
// Node.js / ioredis
const UNLOCK_SCRIPT = `
  if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
  end
  return 0
`;

async function acquireLock(redis, name, ttlMs = 30000) {
    const token = crypto.randomUUID();
    const result = await redis.set(`lock:${name}`, token, 'NX', 'PX', ttlMs);
    return result === 'OK' ? token : null;
}

async function releaseLock(redis, name, token) {
    const result = await redis.eval(UNLOCK_SCRIPT, 1, `lock:${name}`, token);
    return result === 1;
}

35.3 锁超时与业务超时的矛盾

35.3.1 核心问题

分布式锁必须设置 TTL 以防死锁,但业务执行时间不可预测:

这是一个两难困境:TTL 太短怕锁超时,TTL 太长怕死锁太久

35.3.2 解决方案:WatchDog 自动续期

Redisson 的解决方案是不指定超时时间,由后台任务自动续期

  1. 调用 lock.lock()(无 leaseTime 参数)
  2. Redis 获取锁,设置 TTL = leaseTime(默认 30s)
  3. 同时在 Netty 时间轮中注册一个定时任务,每 leaseTime/3(默认 10s)执行一次:
    • 执行 PEXPIRE lockKey leaseTime,将锁的剩余时间重置为 30s
  4. 调用 lock.unlock() 时:
    • 取消定时任务
    • 执行 Lua 脚本删除锁 key

若进程宕机:定时任务消失 → 锁在 30s 内自然过期 → 其他节点可获锁。

Redisson WatchDog 源码级分析

// 伪代码,展示 Redisson 内部逻辑
private void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    entry.addThreadId(threadId);
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getName(), entry);

    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        // 首次获取锁,启动 WatchDog
        renewExpiration();
    }
}

private void renewExpiration() {
    // 每 internalLockLeaseTime / 3 = 10s 执行一次
    Timeout task = commandExecutor.getConnectionManager().newTimeout(
        timeout -> {
            // 向 Redis 发送续期命令(Lua 脚本)
            RFuture<Boolean> future = renewExpirationAsync(Thread.currentThread().getId());
            future.onComplete((res, e) -> {
                if (res) {
                    // 续期成功,继续安排下一次
                    renewExpiration();
                }
                // 续期失败(锁已被删除),停止 WatchDog
            });
        },
        internalLockLeaseTime / 3,
        TimeUnit.MILLISECONDS
    );
    entry.setTimeout(task);
}

续期 Lua 脚本(检查持有者是否仍是当前线程):

-- 使用 Redis Hash 存储锁的持有者信息(支持可重入)
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('pexpire', KEYS[1], ARGV[1])
    return 1
end
return 0

35.3.3 Redisson 可重入锁机制

Redisson 实现了可重入锁(ReentrantLock):同一线程可以多次获取同一把锁,内部用 Redis Hash 记录持有线程和重入次数:

Redis Hash 结构:
key = "order:lock:1001"(Hash)
field = "{uuid}:{threadId}"(唯一标识此线程)
value = 2(重入次数)
RLock lock = redisson.getLock("order:lock:1001");
lock.lock();  // 第一次加锁:hash field = 1
try {
    lock.lock(); // 第二次加锁(可重入):hash field = 2
    try {
        processOrder();
    } finally {
        lock.unlock(); // 释放一次:hash field = 1
    }
} finally {
    lock.unlock(); // 释放一次:hash field = 0 → 删除 key
}

35.4 Redlock 算法

35.4.1 背景:单节点 Redis 的风险

单节点 Redis 做分布式锁有以下风险:

Redlock 是 Redis 作者 Salvatore Sanfilippo(antirez)提出的多节点分布式锁方案。

35.4.2 算法步骤(N=5个独立节点)

前提:5个独立的 Redis 实例(非集群,无主从关系),任意3个存活即可工作。

1. 记录当前时间 T₁
2. 依次向5个节点发送:SET lock:name {token} NX PX {锁TTL}
   - 每个请求的超时时间 = 锁TTL / 10(避免等待失效节点)
3. 记录当前时间 T₂
4. 计算获取耗时 = T₂ - T₁
5. 判断是否获锁成功:
   - 成功获取 ≥ 3 个节点(N/2+1 = quorum)
   - 且 T₂ - T₁ < 锁TTL(有效时间仍充足)
6. 若成功:
   - 实际有效时间 = 锁TTL - (T₂ - T₁) - 时钟漂移容错
   - 在有效时间内执行业务
7. 若失败(成功节点 < 3 或超时):
   - 向所有节点发送释放命令(无论是否成功获取)
import uuid, time, redis

class Redlock:
    CLOCK_DRIFT_FACTOR = 0.01  # 时钟漂移因子

    def __init__(self, nodes: list[redis.Redis]):
        self.nodes = nodes
        self.quorum = len(nodes) // 2 + 1

    def acquire(self, resource: str, ttl_ms: int) -> dict | None:
        token = str(uuid.uuid4())
        start_ms = int(time.time() * 1000)
        acquired_count = 0

        for node in self.nodes:
            try:
                ok = node.set(resource, token, nx=True, px=ttl_ms)
                if ok:
                    acquired_count += 1
            except Exception:
                pass  # 节点不可达,跳过

        elapsed_ms = int(time.time() * 1000) - start_ms
        drift_ms = int(ttl_ms * self.CLOCK_DRIFT_FACTOR) + 2
        validity_ms = ttl_ms - elapsed_ms - drift_ms

        if acquired_count >= self.quorum and validity_ms > 0:
            return {'token': token, 'validity_ms': validity_ms}

        # 未获锁,释放所有已获取的锁
        self.release(resource, token)
        return None

    def release(self, resource: str, token: str):
        for node in self.nodes:
            try:
                node.eval("""
                    if redis.call('get', KEYS[1]) == ARGV[1] then
                        return redis.call('del', KEYS[1])
                    end
                    return 0
                """, 1, resource, token)
            except Exception:
                pass

# 使用示例
nodes = [redis.Redis(host=h, port=6379) for h in ['r1', 'r2', 'r3', 'r4', 'r5']]
redlock = Redlock(nodes)

lock_info = redlock.acquire('lock:critical', 10000)  # 10秒 TTL
if lock_info:
    try:
        # 业务逻辑,必须在 validity_ms 内完成
        do_critical_work()
    finally:
        redlock.release('lock:critical', lock_info['token'])

35.4.3 Martin Kleppmann 的批判

2016年,分布式系统专家 Martin Kleppmann 在其博客发表文章,指出 Redlock 在以下场景仍不安全:

问题1:时钟跳跃(Clock Skew)

场景:
1. 客户端A 在5个节点成功获锁,有效期 10s
2. Redis 节点1 的系统时钟被 NTP 调快了 11s
3. 节点1 的锁立即过期
4. 客户端B 在节点1 获锁成功
5. 若此时节点2-5 上A的锁还有效,则A和B同时持有"锁"

问题2:GC Stop-The-World 暂停

场景:
1. 客户端A 成功获取 Redlock,有效期 10s
2. JVM 发生 GC,暂停 15s
3. 锁在GC期间过期
4. 客户端B 获取锁并开始操作
5. JVM GC恢复,A以为自己仍持有锁,继续操作
→ 两个客户端同时操作共享资源

Kleppmann 的结论:Redlock 依赖时间的准确性,而分布式系统中时钟是不可信赖的。它既无法提供真正的强一致性,又比单节点锁更复杂,不值得使用。

35.4.4 Fencing Token 方案

Kleppmann 提出了更安全的解决方案:令牌栅栏(Fencing Token)

机制:
1. 每次获取锁时,锁服务返回一个单调递增的整数 token(如:100, 101, 102...)
2. 客户端在访问存储层时携带 token
3. 存储层记录"已见过的最大 token",拒绝小于该值的请求
# 锁服务(使用 Redis INCR 生成单调递增 token)
def acquire_fencing_lock(r, resource: str, ttl_ms: int):
    token = r.incr('fencing:counter')  # 单调递增
    acquired = r.set(f'lock:{resource}', token, nx=True, px=ttl_ms)
    return token if acquired else None

# 存储层(数据库)的验证
def update_with_fencing(db, resource_id: int, data: dict, fencing_token: int):
    # SQL 示例:仅在 token 更大时才更新
    db.execute("""
        UPDATE resources
        SET data = %s, last_fencing_token = %s
        WHERE id = %s AND last_fencing_token < %s
    """, (data, fencing_token, resource_id, fencing_token))
    # 若影响行数为0,说明有更新的 token 先一步写入,当前写入被拒绝

Fencing Token 的优势:即使客户端 GC 暂停,持有旧 token 的请求也会被存储层拒绝,从根本上保证了安全性,无需依赖时钟。

35.4.5 antirez 的回应

Redis 作者 antirez 对 Kleppmann 的批评进行了回应,主要观点:

  1. GC 暂停和时钟跳跃是理论上的极端情况,实践中概率极低
  2. 使用 NTP "slewing"(平滑调整时钟)可减轻时钟跳跃问题
  3. Redlock 的目标是"提高安全性,相对于单节点方案",而不是"提供完美的分布式锁"

35.5 何时使用何种锁

35.5.1 决策矩阵

场景 推荐方案 原因
幂等操作防重复 单节点 SET NX 偶发并发无伤大雅
普通业务互斥 Redisson(单节点/Sentinel) WatchDog + 可重入,够用
高价值操作(扣款、库存) Redisson + 数据库乐观锁 双重保障
强一致分布式锁 ZooKeeper / etcd Raft 一致性,不依赖时钟
极致安全(金融核心) 数据库行锁 / SELECT FOR UPDATE 事务保证

35.5.2 ZooKeeper vs Redis 分布式锁

维度 Redis 锁 ZooKeeper 锁
一致性协议 无(依赖单点)/ Redlock(依赖时钟) Zab(Paxos变种),强一致
性能 高(10万+ QPS) 中(数万 QPS)
自动续期 Redisson WatchDog 临时节点(连接断开自动删除)
Fencing Token 需手动实现 内置(zxid 单调递增)
运维复杂度 高(需要奇数节点 ZooKeeper 集群)
适用场景 高频短锁 低频长锁、强一致要求

35.5.3 实践建议

90% 的业务场景,Redisson 的单节点(或 Sentinel)锁已经足够:

需要强一致时,直接使用数据库事务:

-- 用数据库行锁实现互斥
BEGIN;
SELECT * FROM orders WHERE id = 1001 FOR UPDATE;
-- 执行业务逻辑
UPDATE orders SET status = 'processing' WHERE id = 1001;
COMMIT;

中间地带(需要跨服务的分布式锁但 ZooKeeper 太重),使用 etcd:

client, _ := clientv3.New(clientv3.Config{
    Endpoints:   []string{"localhost:2379"},
    DialTimeout: 5 * time.Second,
})

session, _ := concurrency.NewSession(client, concurrency.WithTTL(10))
mutex := concurrency.NewMutex(session, "/lock/order/1001")

mutex.Lock(ctx)
defer mutex.Unlock(ctx)
// etcd 基于 Raft,锁的一致性由共识协议保证,不依赖时钟

35.6 生产运维要点

监控指标

# 锁相关 key 数量(告警:异常增多可能有锁泄漏)
redis-cli DBSIZE

# 查看所有锁 key(小心 KEYS 在生产中的影响)
redis-cli --scan --pattern 'lock:*' | wc -l

# 查看特定锁的 TTL(TTL=-1 说明未设置过期,可能是死锁)
redis-cli TTL 'lock:order:1001'

锁泄漏检测(定时扫描无 TTL 的锁 key):

import redis

r = redis.Redis()
cursor = 0
leaked = []
while True:
    cursor, keys = r.scan(cursor, match='lock:*', count=1000)
    for key in keys:
        ttl = r.ttl(key)
        if ttl == -1:  # 没有 TTL = 可能的死锁
            leaked.append(key)
    if cursor == 0:
        break

if leaked:
    alert(f"发现 {len(leaked)} 个无 TTL 的锁 key,可能存在死锁:{leaked[:10]}")

锁等待超时告警:应用层记录锁等待时间,P99 超过 1s 时发出告警,可能意味着锁竞争过于激烈或持有时间过长,需考虑细化锁粒度。

本章评分
4.8  / 5  (3 评分)

💬 留言讨论