第 12 章

命令执行全链路:从 TCP 字节到 +OK 回包

第12章 命令执行全链路:从 TCP 字节到 +OK 回包

12.1 概述

本章以 SET key value EX 100 为例,完整追踪一条 Redis 命令从网络字节到响应的每一步。涉及文件:networking.cserver.ct_string.caof.creplication.cdb.c


12.2 第一步:TCP 字节到达

文件:networking.c,函数:readQueryFromClient()

Redis 使用 单线程 + I/O 多路复用(epoll/kqueue)模型。当 TCP 数据到达时,内核通知 epoll,事件循环调用 readQueryFromClient()

void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread;
    size_t readlen = PROTO_IOBUF_LEN; // 16KB

    // 动态扩展 querybuf
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = connRead(conn, c->querybuf + sdslen(c->querybuf), readlen);

    if (nread <= 0) {
        // 连接关闭或错误处理
        freeClientAsync(c);
        return;
    }
    sdsIncrLen(c->querybuf, nread);
    c->lastinteraction = server.unixtime;

    // 进入解析流程
    processInputBuffer(c);
}

client->querybuf 是一个动态字符串(SDS),存放未解析的原始 TCP 字节流。每次 read() 系统调用最多读取 16KB(PROTO_IOBUF_LEN)。

判断协议类型

// processInputBuffer() 开头
if (c->reqtype == 0) {
    if (c->querybuf[0] == '*') {
        c->reqtype = PROTO_REQ_MULTIBULK;  // RESP 多批量协议
    } else {
        c->reqtype = PROTO_REQ_INLINE;     // inline 协议(telnet 兼容)
    }
}

12.3 第二步:RESP 协议解析

文件:networking.c,函数:processMultibulkBuffer()

命令 SET key value EX 100 通过 RESP 协议传输的字节流:

*5\r\n          ← 5个参数
$3\r\n          ← 第1个参数长度3字节
SET\r\n         ← 命令名
$3\r\n          ← 第2个参数长度3字节
key\r\n         ← key
$5\r\n          ← 第3个参数长度5字节
value\r\n       ← value
$2\r\n          ← 第4个参数长度2字节
EX\r\n          ← 选项名
$3\r\n          ← 第5个参数长度3字节
100\r\n         ← 过期时间(秒)

解析过程(简化):

int processMultibulkBuffer(client *c) {
    // 解析 *5\r\n,设置 c->multibulklen = 5
    if (c->multibulklen == 0) {
        // 读取第一行,提取参数数量
        newline = strchr(c->querybuf + c->qb_pos, '\r');
        c->multibulklen = atoi(c->querybuf + c->qb_pos + 1);
        c->argv = zmalloc(sizeof(robj*) * c->multibulklen);
    }

    // 逐个解析 $len\r\ndata\r\n
    while (c->multibulklen) {
        if (c->bulklen == -1) {
            newline = strchr(c->querybuf + c->qb_pos, '\r');
            c->bulklen = atoi(c->querybuf + c->qb_pos + 1);
        }
        // 检查数据是否已全部到达
        if (sdslen(c->querybuf) - c->qb_pos < (size_t)(c->bulklen + 2))
            break; // 数据未到齐,等待下次 read

        // 创建 robj,写入 argv
        c->argv[c->argc++] = createStringObject(
            c->querybuf + c->qb_pos, c->bulklen
        );
        c->qb_pos += c->bulklen + 2;
        c->bulklen = -1;
        c->multibulklen--;
    }

    if (c->multibulklen == 0) return C_OK; // 解析完成
    return C_ERR; // 数据不完整,继续等待
}

解析完成后:

c->argc = 5
c->argv = [
  robj("SET"),    // argv[0] — 命令名
  robj("key"),    // argv[1]
  robj("value"),  // argv[2]
  robj("EX"),     // argv[3]
  robj("100"),    // argv[4]
]

12.4 第三步:命令查找

文件:server.c,函数:processCommand()lookupCommand()

int processCommand(client *c) {
    // 1. 查找命令
    c->cmd = lookupCommand(c->argv, c->argc);

    if (!c->cmd) {
        addReplyErrorFormat(c, "unknown command '%s'...", ...);
        return C_OK;
    }

    // 2. 参数数量检查(SET 的 arity = -3,表示最少3个参数)
    if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
        (c->argc < -c->cmd->arity)) {
        addReplyErrorFormat(c, "wrong number of arguments...");
        return C_OK;
    }
    // ... 后续检查
}

lookupCommand() 实现:

struct redisCommand *lookupCommand(robj **argv, int argc) {
    return lookupCommandLogic(server.commands, argv, argc, 0);
}

// server.commands 是一个 dict(哈希表),key = 命令名小写字符串
// O(1) 查找:siphash(命令名) → bucket → 链表搜索

server.commands 在 Redis 启动时由 populateCommandTable() 填充,将 redisCommandTable[] 数组中的每个命令注册进去。

redisCommand 结构体(关键字段):

struct redisCommand {
    const char *fullname;      // "set"
    redisCommandProc *proc;    // setCommand 函数指针
    int arity;                 // -3(至少3个参数)
    uint64_t flags;            // CMD_WRITE | CMD_DENYOOM | ...
    int first_key;             // 1(第一个key的参数索引)
    int last_key;              // 1
    int key_step;              // 1
    // Redis 7.0+: acl categories, subcommands, etc.
};

12.5 第四步:前置检查

文件:server.c,函数:processCommand()(续)

// 检查认证
if (!server.requirepass == NULL && !c->authenticated) {
    addReplyError(c, "NOAUTH Authentication required.");
    return C_OK;
}

// 检查是否暂停接受写命令(主从同步期间)
if (server.client_pause_type != CLIENT_PAUSE_OFF &&
    !isClientPauseForced(c) && !c->multi &&
    c->cmd->flags & CMD_WRITE) {
    // 将命令放入等待队列
    blockClientForReplicaAck(c);
    return C_OK;
}

// 检查内存上限,执行淘汰策略
if (server.maxmemory && !server.loading) {
    int out_of_memory = (freeMemoryIfNeededAndSafe() == C_ERR);
    if (out_of_memory && c->cmd->flags & CMD_DENYOOM) {
        addReply(c, shared.oomerr);
        return C_OK;
    }
}

// 持久化错误检查(AOF 或 RDB 写入失败时拒绝写命令)
if (((server.stop_writes_on_bgsave_err &&
      server.saveparamslen > 0 &&
      server.lastbgsave_status == C_ERR) ||
     (server.aof_state != AOF_OFF &&
      server.aof_last_write_status == C_ERR)) &&
    server.masterhost == NULL &&
    (c->cmd->flags & CMD_WRITE)) {
    // MISCONF error
    addReply(c, shared.bgsaveerr);
    return C_OK;
}

通过所有检查后,调用 call() 执行命令。


12.6 第五步:命令执行

文件:t_string.c,函数:setCommand()

void setCommand(client *c) {
    robj *expire = NULL;
    int unit = UNIT_SECONDS;
    int flags = OBJ_NO_FLAGS;

    // 解析 EX/PX/EXAT/PXAT/NX/XX/GET/KEEPTTL 选项
    if (parseExtendedStringArgumentsOrReply(c, &flags, &unit, &expire,
                                            COMMAND_SET) != C_OK) {
        return; // 参数错误,已发送错误回包
    }

    // 对 value 进行编码优化
    c->argv[2] = tryObjectEncoding(c->argv[2]);

    // 执行实际设置
    setGenericCommand(c, flags, c->argv[1], c->argv[2], expire, unit, NULL, NULL);
}

void setGenericCommand(client *c, int flags, robj *key, robj *val,
                       robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
    long long milliseconds = 0;

    // 计算过期时间(毫秒)
    if (expire) {
        if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)
            return;
        if (unit == UNIT_SECONDS) milliseconds *= 1000LL;
        // 检查过期时间合法性
        if (milliseconds <= 0 || ...) {
            addReplyError(c, "invalid expire time in 'set' command");
            return;
        }
    }

    // NX/XX 语义检查
    if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db, key) != NULL) ||
        (flags & OBJ_SET_XX && lookupKeyWrite(c->db, key) == NULL)) {
        addReply(c, abort_reply ? abort_reply : shared.null[c->resp]);
        return;
    }

    // 核心写入
    genericSetKey(c, c->db, key, val, flags & OBJ_SET_KEEPTTL, 1);

    // 设置过期时间
    if (expire) {
        setExpire(c, c->db, key, milliseconds + mstime());
    }

    server.dirty++; // 脏数据计数器+1,触发 BGSAVE 条件检查

    // 发送成功回包
    addReply(c, ok_reply ? ok_reply : shared.ok);
}

genericSetKey()dbAdd()dbOverwrite()

void genericSetKey(client *c, redisDb *db, robj *key, robj *val,
                   int keepttl, int signal) {
    if (lookupKeyWrite(db, key) == NULL) {
        dbAdd(db, key, val);        // key 不存在,新增
    } else {
        dbOverwrite(db, key, val);  // key 已存在,覆盖
    }
    incrRefCount(val);
    if (!keepttl) removeExpire(db, key); // 覆盖时清除旧TTL
    if (signal) signalModifiedKey(c, db, key); // 通知 watch
}

void dbAdd(redisDb *db, robj *key, robj *val) {
    sds copy = sdsdup(key->ptr);
    int retval = dictAdd(db->dict, copy, val); // 写入主字典
    // 若该 key 在 expires 字典中存在(理论上不应),清除
    if (server.cluster_enabled) slotToKeyAddEntry(key->ptr, db);
}

12.7 第六步:String 编码选择

文件:object.c,函数:tryObjectEncoding()

robj *tryObjectEncoding(robj *o) {
    // 只对 RAW 或 EMBSTR 类型尝试优化
    if (!sdsEncodedObject(o)) return o;

    sds s = o->ptr;
    size_t len = sdslen(s);

    // 尝试整数编码
    long long value;
    if (len <= 20 && string2ll(s, len, &value)) {
        if ((value >= 0 && value < OBJ_SHARED_INTEGERS)) {
            decrRefCount(o);
            return shared.integers[value]; // 共享对象(0-9999)
        }
        if (value >= LONG_MIN && value <= LONG_MAX) {
            o->encoding = OBJ_ENCODING_INT;
            sdsfree(o->ptr);
            o->ptr = (void*)value; // 直接用指针存整数值
            return o;
        }
    }

    // 长度 <= 44 字节 → embstr(对象头+SDS 在一块连续内存)
    if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT) { // 44
        robj *emb = createEmbeddedStringObject(s, len);
        decrRefCount(o);
        return emb;
    }

    // 否则保持 raw(SDS 独立分配)
    trimStringObjectIfNeeded(o);
    return o;
}

三种编码的内存布局:

INT 编码(值 12345):
  robj { type=0, encoding=1, refcount=1, lru=..., ptr=0x3039 }
  总大小:16 字节(ptr 直接存整数,无额外分配)

EMBSTR 编码("hello world",11字节):
  [robj头 16B][sdshdr8头 3B][数据 11B][\0]
  一次 malloc,连续内存,总 ~31 字节

RAW 编码(长字符串):
  robj { ..., ptr → sdshdr }(两次 malloc)
  sdshdr { len, alloc, flags, buf[] }

12.8 第七步:AOF 传播

文件:aof.c,函数:feedAppendOnlyFile()

void call(client *c, int flags) {
    // 执行命令
    c->cmd->proc(c);

    // 传播到 AOF 和从库
    if (flags & CMD_CALL_PROPAGATE) {
        int propagate_flags = PROPAGATE_NONE;
        if (c->cmd->flags & CMD_WRITE) propagate_flags |= PROPAGATE_AOF;
        if (propagate_flags != PROPAGATE_NONE && !server.loading) {
            propagate(c->cmd, c->db->id, c->argv, c->argc, propagate_flags);
        }
    }
}

void feedAppendOnlyFile(struct redisCommand *cmd, int dictid,
                        robj **argv, int argc) {
    sds buf = sdsempty();

    // 若需要先切换 DB,追加 SELECT 命令
    if (dictid != server.aof_selected_db) {
        char seldb[64];
        snprintf(seldb, sizeof(seldb), "%d", dictid);
        buf = sdscatprintf(buf, "*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
                           strlen(seldb), seldb);
        server.aof_selected_db = dictid;
    }

    // 命令 SET key value EX 100 带有过期时间,转换为:
    // SET key value + PEXPIREAT key <abs_timestamp_ms>
    // (以便 AOF 重放时使用绝对时间,不受重放时间偏差影响)
    if (cmd->proc == setCommand && ...) {
        buf = catAppendOnlyExpireAtCommand(buf, server.db+dictid, argv[1]);
    }

    // 将 RESP 格式命令追加到 buf
    buf = catAppendOnlyGenericCommand(buf, argc, argv);

    // 追加到 server.aof_buf(内存缓冲区)
    server.aof_buf = sdscatlen(server.aof_buf, buf, sdslen(buf));
    sdsfree(buf);
}

AOF 落盘时机(由 flushAppendOnlyFile() 在事件循环 beforeSleep() 中调用):

// appendfsync always:每次 beforeSleep 都调用 fsync()
// appendfsync everysec:距上次 fsync 超过1秒才触发(异步线程执行)
// appendfsync no:只 write(),不 fsync(),由 OS 决定落盘时机

12.9 第八步:从库传播

文件:replication.c,函数:replicationFeedSlaves()

void replicationFeedSlaves(list *slaves, int dictid,
                           robj **argv, int argc) {
    // 写入 replication backlog(环形缓冲区)
    // 用于从库断线重连后的部分重同步(PSYNC)
    if (server.repl_backlog) {
        char aux[LONG_STR_SIZE+3];
        // 将命令序列化为 RESP 格式写入 backlog
        feedReplicationBuffer(buf, len);
    }

    // 向每个在线从库的 output buffer 追加命令
    listRewind(slaves, &li);
    while ((ln = listNext(&li)) != NULL) {
        client *slave = ln->value;
        if (slave->replstate != SLAVE_STATE_ONLINE) continue;
        addReplyProtoToList(slave->reply, buf, len);
        // 触发可写事件,由事件循环发送
    }
}

server.repl_backlog 是一个固定大小的环形缓冲区(默认 1MB),保存最近的复制数据。从库断线重连时,若 backlog 中仍有其缺失的数据,可执行 PSYNC(增量同步)而非完整的 RDB 全量同步。


12.10 第九步:回包

文件:networking.c,函数:addReply()

void addReply(client *c, robj *obj) {
    // 检查连接是否还有效
    if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
        // 直接将 SDS 数据追加到 client 输出缓冲区
        _addReplyToBufferOrList(c, obj->ptr, sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        // 整数转字符串
        char buf[32];
        size_t len = ll2string(buf, sizeof(buf), (long)obj->ptr);
        _addReplyToBufferOrList(c, buf, len);
    }
}

// shared.ok 是预先分配的常量对象,内容为 "+OK\r\n"
// 避免每次都分配内存

static int _addReplyToBufferOrList(client *c, const char *s, size_t len) {
    // 优先写入固定大小的 c->buf(16KB)
    if (c->bufpos < (int)sizeof(c->buf) - (int)len) {
        memcpy(c->buf + c->bufpos, s, len);
        c->bufpos += len;
        return C_OK;
    }
    // buf 满了则追加到 c->reply 链表(动态扩展)
    listAddNodeTail(c->reply, sdsnewlen(s, len));
    return C_OK;
}

响应数据写入 c->buf 后,由下一次事件循环中的 sendReplyToClient() 通过 write() 系统调用发送给客户端:

客户端收到:+OK\r\n

12.11 完整调用链总结

TCP 读事件触发
  └─ readQueryFromClient()                   networking.c
       └─ processInputBuffer()               networking.c
            └─ processMultibulkBuffer()      networking.c  ← RESP 解析
                 └─ processCommandAndResetClient()
                      └─ processCommand()    server.c      ← 查找+前置检查
                           └─ call()         server.c
                                ├─ setCommand()            t_string.c  ← 执行
                                │    ├─ parseExtendedStringArgumentsOrReply()
                                │    └─ setGenericCommand()
                                │         ├─ genericSetKey() → dbAdd/dbOverwrite
                                │         └─ setExpire()
                                ├─ propagate()             server.c
                                │    ├─ feedAppendOnlyFile()  aof.c   ← AOF
                                │    └─ replicationFeedSlaves() replication.c ← 从库
                                └─ addReply(shared.ok)     networking.c ← 回包

12.12 关键性能数据

阶段 典型耗时 主要开销
TCP read + RESP 解析 1-3 µs 内存拷贝,字符串扫描
命令查找 < 0.1 µs 哈希表查找
前置检查 0.1-5 µs 内存淘汰可能触发驱逐
SET 执行 + 编码 0.5-2 µs dict 写入,SDS 操作
AOF buf 追加 0.5-1 µs sdscat 内存操作
addReply < 0.1 µs memcpy 到 c->buf
合计(不含网络RTT) ~5-10 µs
网络 RTT(同机房) ~200 µs 占总延迟 95%+

这解释了为什么 Redis 的"慢"几乎都在网络,而非命令执行本身。appendfsync always 会将 fsync() 的磁盘延迟(1-10ms)插入到关键路径,这是它使吞吐量从百万级降到万级的根本原因。

本章评分
4.6  / 5  (30 评分)

💬 留言讨论