命令执行全链路:从 TCP 字节到 +OK 回包
第12章 命令执行全链路:从 TCP 字节到 +OK 回包
12.1 概述
本章以 SET key value EX 100 为例,完整追踪一条 Redis 命令从网络字节到响应的每一步。涉及文件:networking.c、server.c、t_string.c、aof.c、replication.c、db.c。
12.2 第一步:TCP 字节到达
文件:networking.c,函数:readQueryFromClient()
Redis 使用 单线程 + I/O 多路复用(epoll/kqueue)模型。当 TCP 数据到达时,内核通知 epoll,事件循环调用 readQueryFromClient()。
void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread;
size_t readlen = PROTO_IOBUF_LEN; // 16KB
// 动态扩展 querybuf
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
nread = connRead(conn, c->querybuf + sdslen(c->querybuf), readlen);
if (nread <= 0) {
// 连接关闭或错误处理
freeClientAsync(c);
return;
}
sdsIncrLen(c->querybuf, nread);
c->lastinteraction = server.unixtime;
// 进入解析流程
processInputBuffer(c);
}
client->querybuf 是一个动态字符串(SDS),存放未解析的原始 TCP 字节流。每次 read() 系统调用最多读取 16KB(PROTO_IOBUF_LEN)。
判断协议类型:
// processInputBuffer() 开头
if (c->reqtype == 0) {
if (c->querybuf[0] == '*') {
c->reqtype = PROTO_REQ_MULTIBULK; // RESP 多批量协议
} else {
c->reqtype = PROTO_REQ_INLINE; // inline 协议(telnet 兼容)
}
}
12.3 第二步:RESP 协议解析
文件:networking.c,函数:processMultibulkBuffer()
命令 SET key value EX 100 通过 RESP 协议传输的字节流:
*5\r\n ← 5个参数
$3\r\n ← 第1个参数长度3字节
SET\r\n ← 命令名
$3\r\n ← 第2个参数长度3字节
key\r\n ← key
$5\r\n ← 第3个参数长度5字节
value\r\n ← value
$2\r\n ← 第4个参数长度2字节
EX\r\n ← 选项名
$3\r\n ← 第5个参数长度3字节
100\r\n ← 过期时间(秒)
解析过程(简化):
int processMultibulkBuffer(client *c) {
// 解析 *5\r\n,设置 c->multibulklen = 5
if (c->multibulklen == 0) {
// 读取第一行,提取参数数量
newline = strchr(c->querybuf + c->qb_pos, '\r');
c->multibulklen = atoi(c->querybuf + c->qb_pos + 1);
c->argv = zmalloc(sizeof(robj*) * c->multibulklen);
}
// 逐个解析 $len\r\ndata\r\n
while (c->multibulklen) {
if (c->bulklen == -1) {
newline = strchr(c->querybuf + c->qb_pos, '\r');
c->bulklen = atoi(c->querybuf + c->qb_pos + 1);
}
// 检查数据是否已全部到达
if (sdslen(c->querybuf) - c->qb_pos < (size_t)(c->bulklen + 2))
break; // 数据未到齐,等待下次 read
// 创建 robj,写入 argv
c->argv[c->argc++] = createStringObject(
c->querybuf + c->qb_pos, c->bulklen
);
c->qb_pos += c->bulklen + 2;
c->bulklen = -1;
c->multibulklen--;
}
if (c->multibulklen == 0) return C_OK; // 解析完成
return C_ERR; // 数据不完整,继续等待
}
解析完成后:
c->argc = 5
c->argv = [
robj("SET"), // argv[0] — 命令名
robj("key"), // argv[1]
robj("value"), // argv[2]
robj("EX"), // argv[3]
robj("100"), // argv[4]
]
12.4 第三步:命令查找
文件:server.c,函数:processCommand() → lookupCommand()
int processCommand(client *c) {
// 1. 查找命令
c->cmd = lookupCommand(c->argv, c->argc);
if (!c->cmd) {
addReplyErrorFormat(c, "unknown command '%s'...", ...);
return C_OK;
}
// 2. 参数数量检查(SET 的 arity = -3,表示最少3个参数)
if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) {
addReplyErrorFormat(c, "wrong number of arguments...");
return C_OK;
}
// ... 后续检查
}
lookupCommand() 实现:
struct redisCommand *lookupCommand(robj **argv, int argc) {
return lookupCommandLogic(server.commands, argv, argc, 0);
}
// server.commands 是一个 dict(哈希表),key = 命令名小写字符串
// O(1) 查找:siphash(命令名) → bucket → 链表搜索
server.commands 在 Redis 启动时由 populateCommandTable() 填充,将 redisCommandTable[] 数组中的每个命令注册进去。
redisCommand 结构体(关键字段):
struct redisCommand {
const char *fullname; // "set"
redisCommandProc *proc; // setCommand 函数指针
int arity; // -3(至少3个参数)
uint64_t flags; // CMD_WRITE | CMD_DENYOOM | ...
int first_key; // 1(第一个key的参数索引)
int last_key; // 1
int key_step; // 1
// Redis 7.0+: acl categories, subcommands, etc.
};
12.5 第四步:前置检查
文件:server.c,函数:processCommand()(续)
// 检查认证
if (!server.requirepass == NULL && !c->authenticated) {
addReplyError(c, "NOAUTH Authentication required.");
return C_OK;
}
// 检查是否暂停接受写命令(主从同步期间)
if (server.client_pause_type != CLIENT_PAUSE_OFF &&
!isClientPauseForced(c) && !c->multi &&
c->cmd->flags & CMD_WRITE) {
// 将命令放入等待队列
blockClientForReplicaAck(c);
return C_OK;
}
// 检查内存上限,执行淘汰策略
if (server.maxmemory && !server.loading) {
int out_of_memory = (freeMemoryIfNeededAndSafe() == C_ERR);
if (out_of_memory && c->cmd->flags & CMD_DENYOOM) {
addReply(c, shared.oomerr);
return C_OK;
}
}
// 持久化错误检查(AOF 或 RDB 写入失败时拒绝写命令)
if (((server.stop_writes_on_bgsave_err &&
server.saveparamslen > 0 &&
server.lastbgsave_status == C_ERR) ||
(server.aof_state != AOF_OFF &&
server.aof_last_write_status == C_ERR)) &&
server.masterhost == NULL &&
(c->cmd->flags & CMD_WRITE)) {
// MISCONF error
addReply(c, shared.bgsaveerr);
return C_OK;
}
通过所有检查后,调用 call() 执行命令。
12.6 第五步:命令执行
文件:t_string.c,函数:setCommand()
void setCommand(client *c) {
robj *expire = NULL;
int unit = UNIT_SECONDS;
int flags = OBJ_NO_FLAGS;
// 解析 EX/PX/EXAT/PXAT/NX/XX/GET/KEEPTTL 选项
if (parseExtendedStringArgumentsOrReply(c, &flags, &unit, &expire,
COMMAND_SET) != C_OK) {
return; // 参数错误,已发送错误回包
}
// 对 value 进行编码优化
c->argv[2] = tryObjectEncoding(c->argv[2]);
// 执行实际设置
setGenericCommand(c, flags, c->argv[1], c->argv[2], expire, unit, NULL, NULL);
}
void setGenericCommand(client *c, int flags, robj *key, robj *val,
robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
long long milliseconds = 0;
// 计算过期时间(毫秒)
if (expire) {
if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)
return;
if (unit == UNIT_SECONDS) milliseconds *= 1000LL;
// 检查过期时间合法性
if (milliseconds <= 0 || ...) {
addReplyError(c, "invalid expire time in 'set' command");
return;
}
}
// NX/XX 语义检查
if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db, key) != NULL) ||
(flags & OBJ_SET_XX && lookupKeyWrite(c->db, key) == NULL)) {
addReply(c, abort_reply ? abort_reply : shared.null[c->resp]);
return;
}
// 核心写入
genericSetKey(c, c->db, key, val, flags & OBJ_SET_KEEPTTL, 1);
// 设置过期时间
if (expire) {
setExpire(c, c->db, key, milliseconds + mstime());
}
server.dirty++; // 脏数据计数器+1,触发 BGSAVE 条件检查
// 发送成功回包
addReply(c, ok_reply ? ok_reply : shared.ok);
}
genericSetKey() → dbAdd() 或 dbOverwrite():
void genericSetKey(client *c, redisDb *db, robj *key, robj *val,
int keepttl, int signal) {
if (lookupKeyWrite(db, key) == NULL) {
dbAdd(db, key, val); // key 不存在,新增
} else {
dbOverwrite(db, key, val); // key 已存在,覆盖
}
incrRefCount(val);
if (!keepttl) removeExpire(db, key); // 覆盖时清除旧TTL
if (signal) signalModifiedKey(c, db, key); // 通知 watch
}
void dbAdd(redisDb *db, robj *key, robj *val) {
sds copy = sdsdup(key->ptr);
int retval = dictAdd(db->dict, copy, val); // 写入主字典
// 若该 key 在 expires 字典中存在(理论上不应),清除
if (server.cluster_enabled) slotToKeyAddEntry(key->ptr, db);
}
12.7 第六步:String 编码选择
文件:object.c,函数:tryObjectEncoding()
robj *tryObjectEncoding(robj *o) {
// 只对 RAW 或 EMBSTR 类型尝试优化
if (!sdsEncodedObject(o)) return o;
sds s = o->ptr;
size_t len = sdslen(s);
// 尝试整数编码
long long value;
if (len <= 20 && string2ll(s, len, &value)) {
if ((value >= 0 && value < OBJ_SHARED_INTEGERS)) {
decrRefCount(o);
return shared.integers[value]; // 共享对象(0-9999)
}
if (value >= LONG_MIN && value <= LONG_MAX) {
o->encoding = OBJ_ENCODING_INT;
sdsfree(o->ptr);
o->ptr = (void*)value; // 直接用指针存整数值
return o;
}
}
// 长度 <= 44 字节 → embstr(对象头+SDS 在一块连续内存)
if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT) { // 44
robj *emb = createEmbeddedStringObject(s, len);
decrRefCount(o);
return emb;
}
// 否则保持 raw(SDS 独立分配)
trimStringObjectIfNeeded(o);
return o;
}
三种编码的内存布局:
INT 编码(值 12345):
robj { type=0, encoding=1, refcount=1, lru=..., ptr=0x3039 }
总大小:16 字节(ptr 直接存整数,无额外分配)
EMBSTR 编码("hello world",11字节):
[robj头 16B][sdshdr8头 3B][数据 11B][\0]
一次 malloc,连续内存,总 ~31 字节
RAW 编码(长字符串):
robj { ..., ptr → sdshdr }(两次 malloc)
sdshdr { len, alloc, flags, buf[] }
12.8 第七步:AOF 传播
文件:aof.c,函数:feedAppendOnlyFile()
void call(client *c, int flags) {
// 执行命令
c->cmd->proc(c);
// 传播到 AOF 和从库
if (flags & CMD_CALL_PROPAGATE) {
int propagate_flags = PROPAGATE_NONE;
if (c->cmd->flags & CMD_WRITE) propagate_flags |= PROPAGATE_AOF;
if (propagate_flags != PROPAGATE_NONE && !server.loading) {
propagate(c->cmd, c->db->id, c->argv, c->argc, propagate_flags);
}
}
}
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid,
robj **argv, int argc) {
sds buf = sdsempty();
// 若需要先切换 DB,追加 SELECT 命令
if (dictid != server.aof_selected_db) {
char seldb[64];
snprintf(seldb, sizeof(seldb), "%d", dictid);
buf = sdscatprintf(buf, "*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
strlen(seldb), seldb);
server.aof_selected_db = dictid;
}
// 命令 SET key value EX 100 带有过期时间,转换为:
// SET key value + PEXPIREAT key <abs_timestamp_ms>
// (以便 AOF 重放时使用绝对时间,不受重放时间偏差影响)
if (cmd->proc == setCommand && ...) {
buf = catAppendOnlyExpireAtCommand(buf, server.db+dictid, argv[1]);
}
// 将 RESP 格式命令追加到 buf
buf = catAppendOnlyGenericCommand(buf, argc, argv);
// 追加到 server.aof_buf(内存缓冲区)
server.aof_buf = sdscatlen(server.aof_buf, buf, sdslen(buf));
sdsfree(buf);
}
AOF 落盘时机(由 flushAppendOnlyFile() 在事件循环 beforeSleep() 中调用):
// appendfsync always:每次 beforeSleep 都调用 fsync()
// appendfsync everysec:距上次 fsync 超过1秒才触发(异步线程执行)
// appendfsync no:只 write(),不 fsync(),由 OS 决定落盘时机
12.9 第八步:从库传播
文件:replication.c,函数:replicationFeedSlaves()
void replicationFeedSlaves(list *slaves, int dictid,
robj **argv, int argc) {
// 写入 replication backlog(环形缓冲区)
// 用于从库断线重连后的部分重同步(PSYNC)
if (server.repl_backlog) {
char aux[LONG_STR_SIZE+3];
// 将命令序列化为 RESP 格式写入 backlog
feedReplicationBuffer(buf, len);
}
// 向每个在线从库的 output buffer 追加命令
listRewind(slaves, &li);
while ((ln = listNext(&li)) != NULL) {
client *slave = ln->value;
if (slave->replstate != SLAVE_STATE_ONLINE) continue;
addReplyProtoToList(slave->reply, buf, len);
// 触发可写事件,由事件循环发送
}
}
server.repl_backlog 是一个固定大小的环形缓冲区(默认 1MB),保存最近的复制数据。从库断线重连时,若 backlog 中仍有其缺失的数据,可执行 PSYNC(增量同步)而非完整的 RDB 全量同步。
12.10 第九步:回包
文件:networking.c,函数:addReply()
void addReply(client *c, robj *obj) {
// 检查连接是否还有效
if (prepareClientToWrite(c) != C_OK) return;
if (sdsEncodedObject(obj)) {
// 直接将 SDS 数据追加到 client 输出缓冲区
_addReplyToBufferOrList(c, obj->ptr, sdslen(obj->ptr));
} else if (obj->encoding == OBJ_ENCODING_INT) {
// 整数转字符串
char buf[32];
size_t len = ll2string(buf, sizeof(buf), (long)obj->ptr);
_addReplyToBufferOrList(c, buf, len);
}
}
// shared.ok 是预先分配的常量对象,内容为 "+OK\r\n"
// 避免每次都分配内存
static int _addReplyToBufferOrList(client *c, const char *s, size_t len) {
// 优先写入固定大小的 c->buf(16KB)
if (c->bufpos < (int)sizeof(c->buf) - (int)len) {
memcpy(c->buf + c->bufpos, s, len);
c->bufpos += len;
return C_OK;
}
// buf 满了则追加到 c->reply 链表(动态扩展)
listAddNodeTail(c->reply, sdsnewlen(s, len));
return C_OK;
}
响应数据写入 c->buf 后,由下一次事件循环中的 sendReplyToClient() 通过 write() 系统调用发送给客户端:
客户端收到:+OK\r\n
12.11 完整调用链总结
TCP 读事件触发
└─ readQueryFromClient() networking.c
└─ processInputBuffer() networking.c
└─ processMultibulkBuffer() networking.c ← RESP 解析
└─ processCommandAndResetClient()
└─ processCommand() server.c ← 查找+前置检查
└─ call() server.c
├─ setCommand() t_string.c ← 执行
│ ├─ parseExtendedStringArgumentsOrReply()
│ └─ setGenericCommand()
│ ├─ genericSetKey() → dbAdd/dbOverwrite
│ └─ setExpire()
├─ propagate() server.c
│ ├─ feedAppendOnlyFile() aof.c ← AOF
│ └─ replicationFeedSlaves() replication.c ← 从库
└─ addReply(shared.ok) networking.c ← 回包
12.12 关键性能数据
| 阶段 | 典型耗时 | 主要开销 |
|---|---|---|
| TCP read + RESP 解析 | 1-3 µs | 内存拷贝,字符串扫描 |
| 命令查找 | < 0.1 µs | 哈希表查找 |
| 前置检查 | 0.1-5 µs | 内存淘汰可能触发驱逐 |
| SET 执行 + 编码 | 0.5-2 µs | dict 写入,SDS 操作 |
| AOF buf 追加 | 0.5-1 µs | sdscat 内存操作 |
| addReply | < 0.1 µs | memcpy 到 c->buf |
| 合计(不含网络RTT) | ~5-10 µs | |
| 网络 RTT(同机房) | ~200 µs | 占总延迟 95%+ |
这解释了为什么 Redis 的"慢"几乎都在网络,而非命令执行本身。appendfsync always 会将 fsync() 的磁盘延迟(1-10ms)插入到关键路径,这是它使吞吐量从百万级降到万级的根本原因。