Command Execution Pipeline: From TCP Bytes to +OK Response
Chapter 12: Full Command Execution Path (From TCP Bytes to +OK)
12.1 Overview
This chapter traces every step a single SET key value EX 100 command takes from raw network bytes to the final +OK\r\n response. Source files involved: networking.c, server.c, t_string.c, db.c, aof.c, replication.c, object.c.
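Redis is built around a single-threaded event loop backed by epoll (Linux) or kqueue (macOS/BSD). All command execution happens on this one thread, so the keyspace needs no locks, which is a large part of why individual commands complete in microseconds. (Redis 6 and later can optionally move socket reads and writes to I/O threads, but commands still execute on the main thread.)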
12.2 Step 1: TCP Bytes Arrive
File: networking.c, function: readQueryFromClient()
When the OS signals an inbound read event, the event loop calls readQueryFromClient():
void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread;
    size_t readlen = PROTO_IOBUF_LEN;   /* 16 KB read buffer */
    /* Expand the query buffer if needed */
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = connRead(conn, c->querybuf + sdslen(c->querybuf), readlen);
    if (nread == -1) {
        if (connGetState(conn) != CONN_STATE_CONNECTED) {
            freeClientAsync(c);         /* real read error */
            return;
        }
        return;                         /* EAGAIN: no data yet */
    }
    if (nread == 0) {
        /* Connection closed by client */
        freeClientAsync(c);
        return;
    }
    sdsIncrLen(c->querybuf, nread);
    c->lastinteraction = server.unixtime;
    /* Track bytes received per client (for client stats) */
    atomicIncr(server.stat_net_input_bytes, nread);
    processInputBuffer(c);
}
c->querybuf is an SDS (Simple Dynamic String) that accumulates raw bytes. Each connRead() call reads at most 16 KB. If a command's payload exceeds 16 KB it is split across multiple calls; processInputBuffer() handles partial buffers gracefully by returning early if the buffer does not yet hold a complete command.
Protocol type detection:
/* In processInputBuffer() */
if (c->reqtype == 0) {
    if (c->querybuf[c->qb_pos] == '*') {
        c->reqtype = PROTO_REQ_MULTIBULK;   /* standard RESP */
    } else {
        c->reqtype = PROTO_REQ_INLINE;      /* telnet-style */
    }
}
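The sketch below restates the reqtype decision and shows what the inline branch has to do with a telnet-style line such as SET key value: find the end of the line and split it on whitespace. It is a minimal sketch under stated assumptions. detect_reqtype(), split_inline() and the REQ_* constants are hypothetical names, and unlike Redis's sdssplitargs() this version does not handle quoted arguments.

```c
#include <assert.h>
#include <string.h>

#define REQ_MULTIBULK 1      /* hypothetical stand-ins for PROTO_REQ_* */
#define REQ_INLINE    2
#define MAX_INLINE_ARGS 16   /* assumed cap for this sketch */

/* Same rule as processInputBuffer(): a leading '*' means RESP multibulk. */
int detect_reqtype(const char *buf) {
    return buf[0] == '*' ? REQ_MULTIBULK : REQ_INLINE;
}

/* Split one inline command line in place.
 * Returns argc, or -1 if no '\n' has arrived yet (wait for more bytes).
 * Simplification: no quote handling, unlike sdssplitargs(). */
int split_inline(char *line, char *argv[], int max_args) {
    assert(line != NULL && argv != NULL);
    char *nl = strchr(line, '\n');
    if (nl == NULL) return -1;                 /* incomplete line */
    *nl = '\0';
    if (nl > line && nl[-1] == '\r') nl[-1] = '\0';   /* accept \r\n or \n */

    int argc = 0;
    char *p = line;
    while (*p != '\0' && argc < max_args) {
        while (*p == ' ' || *p == '\t') p++;   /* skip separators */
        if (*p == '\0') break;
        argv[argc++] = p;                      /* token starts here */
        while (*p != '\0' && *p != ' ' && *p != '\t') p++;
        if (*p != '\0') *p++ = '\0';           /* terminate the token */
    }
    return argc;
}
```

Returning -1 for a line without '\n' plays the same role as the "return early and wait for the next read" behavior described for processInputBuffer().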
12.3 Step 2: RESP Protocol Parsing
File: networking.c, function: processMultibulkBuffer()
The RESP wire format for SET key value EX 100:
*5\r\n      ← array of 5 elements
$3\r\n      ← bulk string, 3 bytes
SET\r\n
$3\r\n      ← bulk string, 3 bytes
key\r\n
$5\r\n      ← bulk string, 5 bytes
value\r\n
$2\r\n      ← bulk string, 2 bytes
EX\r\n
$3\r\n      ← bulk string, 3 bytes
100\r\n
Total wire bytes: 50 for this command (4 for the *5 header, plus 9 + 9 + 11 + 8 + 9 for the five bulk strings).
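To check the byte count, the hypothetical helper resp_encode() below builds the same RESP array a client library would send for a given argv. It is a sketch, not a Redis or hiredis API; encoding SET key value EX 100 with it produces exactly 50 bytes.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Encode argv as a RESP array of bulk strings into out (capacity cap).
 * Returns the number of bytes written (not NUL-terminated),
 * or -1 if out is too small. Hypothetical helper for illustration. */
int resp_encode(int argc, const char *argv[], char *out, size_t cap) {
    assert(out != NULL);
    size_t pos = 0;
    int n = snprintf(out, cap, "*%d\r\n", argc);          /* array header */
    if (n < 0 || (size_t)n >= cap) return -1;
    pos += (size_t)n;
    for (int i = 0; i < argc; i++) {
        size_t len = strlen(argv[i]);
        n = snprintf(out + pos, cap - pos, "$%zu\r\n", len); /* bulk length */
        if (n < 0 || (size_t)n >= cap - pos) return -1;
        pos += (size_t)n;
        if (pos + len + 2 > cap) return -1;                  /* payload + \r\n */
        memcpy(out + pos, argv[i], len);
        pos += len;
        out[pos++] = '\r';
        out[pos++] = '\n';
    }
    return (int)pos;
}
```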
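int processMultibulkBuffer(client *c) {
    char *newline;
    int ok;
    long long ll;
    /* Parse the '*N' line to get the argument count.
       C_ERR means "incomplete" here as well as "error"; the real code also
       sets a protocol-error flag so the caller can tell the two apart. */
    if (c->multibulklen == 0) {
        newline = strchr(c->querybuf + c->qb_pos, '\r');
        if (newline == NULL) return C_ERR;              /* incomplete */
        /* The buffer must also hold the '\n' that follows '\r' */
        if (newline - (c->querybuf + c->qb_pos) >
            (ssize_t)(sdslen(c->querybuf) - c->qb_pos - 2))
            return C_ERR;
        ok = string2ll(c->querybuf + c->qb_pos + 1,
                       newline - (c->querybuf + c->qb_pos) - 1, &ll);
        if (!ok || ll > INT_MAX) return C_ERR;          /* protocol error */
        c->qb_pos = newline - c->querybuf + 2;          /* skip \r\n */
        if (ll <= 0) return C_OK;                       /* empty array: nothing to run */
        c->multibulklen = ll;
        c->argv = zmalloc(sizeof(robj*) * c->multibulklen);
    }
    /* Parse each '$N\r\nDATA\r\n' element */
    while (c->multibulklen) {
        /* Read the '$N' length line */
        if (c->bulklen == -1) {
            newline = strchr(c->querybuf + c->qb_pos, '\r');
            if (newline == NULL) break;                 /* wait for more data */
            if (newline - (c->querybuf + c->qb_pos) >
                (ssize_t)(sdslen(c->querybuf) - c->qb_pos - 2))
                break;                                  /* '\n' not here yet */
            if (c->querybuf[c->qb_pos] != '$') return C_ERR;   /* protocol error */
            ok = string2ll(c->querybuf + c->qb_pos + 1,
                           newline - (c->querybuf + c->qb_pos) - 1, &ll);
            if (!ok || ll < 0) return C_ERR;            /* protocol error */
            c->bulklen = ll;
            c->qb_pos = newline - c->querybuf + 2;
        }
        /* Check if the full bulk payload (plus \r\n) has arrived */
        if ((size_t)(c->qb_pos + c->bulklen + 2) > sdslen(c->querybuf))
            break;                                      /* partial: wait for next read */
        c->argv[c->argc++] = createStringObject(
            c->querybuf + c->qb_pos, c->bulklen);
        c->qb_pos += c->bulklen + 2;                    /* skip data + \r\n */
        c->bulklen = -1;
        c->multibulklen--;
    }
    if (c->multibulklen == 0) return C_OK;              /* fully parsed */
    return C_ERR;                                       /* need more data */
}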
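The same incremental idea can be shown in isolation. The sketch below keeps its position between calls (like qb_pos, multibulklen and bulklen on the client) and records arguments as offsets into the caller's buffer instead of allocating robj values. The struct and function names are hypothetical, the MAX_ARGS cap is an assumption, and unlike Redis it rejects empty arrays (*0) for simplicity.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MAX_ARGS 16   /* assumed cap on arguments per command */

/* Hypothetical parser state mirroring qb_pos, multibulklen, bulklen, argc. */
typedef struct {
    size_t pos;                /* next unparsed byte in the buffer */
    long multibulklen;         /* elements still to read; 0 = header not parsed */
    long bulklen;              /* -1 = next "$N" line not parsed yet */
    int argc;
    size_t arg_off[MAX_ARGS];  /* arguments are (offset, length) pairs */
    size_t arg_len[MAX_ARGS];  /* pointing into the caller's buffer */
} mb_parser;

void mb_init(mb_parser *p) {
    memset(p, 0, sizeof *p);
    p->bulklen = -1;
}

/* Read "<prefix><digits>\r\n" at buf[*pos]. Returns 1 and advances *pos on
 * success, 0 if the line has not fully arrived, -1 on malformed input. */
static int read_len_line(const char *buf, size_t len, size_t *pos,
                         char prefix, long *out) {
    size_t i = *pos, j;
    long v = 0;
    if (i >= len) return 0;
    if (buf[i] != prefix) return -1;
    for (j = i + 1; j < len && buf[j] != '\r'; j++) {
        if (buf[j] < '0' || buf[j] > '9' || v > 100000000) return -1;
        v = v * 10 + (buf[j] - '0');
    }
    if (j + 1 >= len) return 0;                 /* "\r\n" not complete yet */
    if (buf[j + 1] != '\n' || j == i + 1) return -1;
    *out = v;
    *pos = j + 2;
    return 1;
}

/* Returns 1 when a whole command is parsed, 0 if more bytes are needed,
 * -1 on a protocol error. Call again with the grown buffer after each read. */
int mb_parse(mb_parser *p, const char *buf, size_t len) {
    assert(p != NULL && buf != NULL);
    if (p->multibulklen == 0) {
        long n;
        int r = read_len_line(buf, len, &p->pos, '*', &n);
        if (r <= 0) return r;
        if (n <= 0 || n > MAX_ARGS) return -1;
        p->multibulklen = n;
    }
    while (p->multibulklen > 0) {
        if (p->bulklen == -1) {
            long n;
            int r = read_len_line(buf, len, &p->pos, '$', &n);
            if (r <= 0) return r;
            p->bulklen = n;
        }
        size_t need = p->pos + (size_t)p->bulklen + 2;   /* payload + \r\n */
        if (need > len) return 0;                          /* wait for next read */
        if (buf[need - 2] != '\r' || buf[need - 1] != '\n') return -1;
        p->arg_off[p->argc] = p->pos;
        p->arg_len[p->argc] = (size_t)p->bulklen;
        p->argc++;
        p->pos = need;
        p->bulklen = -1;
        p->multibulklen--;
    }
    return 1;
}
```

Calling mb_parse() again with the grown buffer resumes where the previous call stopped, so a command split across two TCP reads is never re-parsed from the start.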
After successful parsing:
c->argc = 5
c->argv[0] = robj{ encoding=EMBSTR, ptr="SET" }
c->argv[1] = robj{ encoding=EMBSTR, ptr="key" }
c->argv[2] = robj{ encoding=EMBSTR, ptr="value" }
c->argv[3] = robj{ encoding=EMBSTR, ptr="EX" }
c->argv[4] = robj{ encoding=EMBSTR, ptr="100" }
12.4 Step 3: Command Lookup
File: server.c, functions: processCommand() → lookupCommand()
int processCommand(client *c) {
    /* --- Step 3a: look up the command in the command table --- */
    c->cmd = c->lastcmd = c->realcmd = lookupCommand(c->argv, c->argc);
    if (c->cmd == NULL) {
        sds args = catClientArguments(c, 3);
        addReplyErrorFormat(c,
            "unknown command `%s`, with args beginning with: %s",
            (char*)c->argv[0]->ptr, args);
        sdsfree(args);
        return C_OK;
    }
    // ...
}
struct redisCommand *lookupCommand(robj **argv, int argc) {
    return lookupCommandLogic(server.commands, argv, argc, 0);
}
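server.commands is a dict (hash map) populated at startup by populateCommandTable(). Its keys are hashed with a case-insensitive variant of SipHash-1-2, so SET, set and SeT resolve to the same entry, and lookup is O(1) on average. For container commands such as CONFIG, lookupCommandLogic() does a second lookup of argv[1] in the parent command's subcommand dict.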
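A case-insensitive name lookup can be sketched as a small chained hash table. This sketch substitutes FNV-1a over lowercased bytes for Redis's case-insensitive SipHash-1-2, purely for brevity; cmd_entry, TABLE_SIZE, cmd_register() and cmd_lookup() are hypothetical names.

```c
#include <assert.h>
#include <ctype.h>
#include <stddef.h>
#include <stdint.h>

typedef struct cmd_entry {
    const char *name;          /* command name, any case */
    int arity;
    struct cmd_entry *next;    /* chaining on collision */
} cmd_entry;

#define TABLE_SIZE 64          /* power of two, so the hash can be masked */

static cmd_entry *table[TABLE_SIZE];

/* FNV-1a over lowercased bytes: "SET", "set" and "SeT" hash the same. */
static uint32_t case_hash(const char *s) {
    uint32_t h = 2166136261u;
    for (; *s; s++) {
        h ^= (uint32_t)tolower((unsigned char)*s);
        h *= 16777619u;
    }
    return h;
}

static int case_equal(const char *a, const char *b) {
    for (; *a && *b; a++, b++)
        if (tolower((unsigned char)*a) != tolower((unsigned char)*b)) return 0;
    return *a == *b;           /* both strings must end together */
}

void cmd_register(cmd_entry *e) {
    assert(e != NULL && e->name != NULL);
    uint32_t slot = case_hash(e->name) & (TABLE_SIZE - 1);
    e->next = table[slot];     /* push onto the bucket's chain */
    table[slot] = e;
}

cmd_entry *cmd_lookup(const char *name) {
    uint32_t slot = case_hash(name) & (TABLE_SIZE - 1);
    for (cmd_entry *e = table[slot]; e != NULL; e = e->next)
        if (case_equal(e->name, name)) return e;
    return NULL;               /* unknown command */
}
```

Hashing and comparing in a case-folded way is what lets the table store one entry per command while clients send names in any case.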
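The redisCommand descriptor for SET, in simplified form:
/* Simplified view of the SET entry. The exact layout is version-specific:
   Redis 6 keeps redisCommandTable[] in server.c with a flags string, while
   Redis 7 generates the table from JSON command specs and describes keys
   with key specs instead of first/last/step indices. */
{
    "set",                           /* name */
    setCommand,                      /* proc: function pointer */
    -3,                              /* arity: -3 = at least 3 args */
    "write denyoom",                 /* command flags */
    {"@write","@string","@slow"},    /* ACL categories */
    1,                               /* first key index */
    1,                               /* last key index */
    1,                               /* key step */
}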
The arity counts the command name itself: -3 means "at least 3 arguments" (SET key value), while a positive arity such as GET's 2 demands an exact count. Redis validates:
if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
    (c->argc < -c->cmd->arity)) {
    addReplyErrorFormat(c, "wrong number of arguments for '%s' command",
                        c->cmd->fullname);
    return C_OK;
}
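The arity rule reduces to a small predicate. arity_ok() is a hypothetical name used only for illustration, and the sketch assumes arity is never 0.

```c
#include <assert.h>
#include <stdbool.h>

/* Positive arity: exactly that many args (command name included).
 * Negative arity: at least |arity| args. Assumes arity != 0. */
bool arity_ok(int arity, int argc) {
    assert(arity != 0);
    if (arity > 0) return argc == arity;
    return argc >= -arity;
}
```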
12.5 Step 4: Pre-Execution Checks
Still inside processCommand(), Redis runs a gauntlet of safety checks:
/* Authentication: only commands flagged no-auth (such as AUTH and HELLO)
   may run before the client has authenticated */
if (authRequired(c) && !(c->cmd->flags & CMD_NO_AUTH)) {
    addReply(c, shared.noautherr);      /* -NOAUTH Authentication required. */
    return C_OK;
}
/* Client pause (triggered by e.g. CLIENT PAUSE ... WRITE or FAILOVER):
   postpone write commands until the pause ends. Replicas are never paused. */
if (!(c->flags & CLIENT_SLAVE) &&
    (c->cmd->flags & CMD_WRITE) &&
    isPausedActionsWithUpdate(PAUSE_ACTION_CLIENT_WRITE)) {
    blockPostponeClient(c);
    return C_OK;
}
/* OOM check: when maxmemory is set, try to evict before running the
   command; if memory is still over the limit, reject denyoom commands */
if (server.maxmemory && !server.loading) {
    int out_of_memory = (freeMemoryIfNeededAndSafe() == C_ERR);
    if (out_of_memory && (c->cmd->flags & CMD_DENYOOM)) {
        addReply(c, shared.oomerr);     /* -OOM command not allowed... */
        return C_OK;
    }
}
/* Disk error protection: refuse writes if AOF/RDB write failed */
if (((server.stop_writes_on_bgsave_err &&
      server.saveparamslen > 0 &&
      server.lastbgsave_status == C_ERR) ||
     (server.aof_state != AOF_OFF &&
      server.aof_last_write_status == C_ERR)) &&
    server.masterhost == NULL &&
    (c->cmd->flags & CMD_WRITE)) {
    addReply(c, shared.bgsaveerr);
    return C_OK;
}
/* Cluster key slot check */
if (server.cluster_enabled) {
    if (getNodeByQuery(c, c->cmd, c->argv, c->argc, NULL, NULL)
            != server.cluster->myself) {
        clusterRedirectClient(c, ...);
        return C_OK;
    }
}
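Only after all checks pass does Redis execute the command through call(). (If the client is inside a MULTI block, the command is queued instead and runs later when EXEC is called.)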
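The cluster check above depends on mapping each key to one of 16,384 hash slots: CRC16 of the key modulo 16384, where a non-empty {...} hash tag limits hashing to the text inside the first braces so related keys land in the same slot. The sketch below follows that rule as described in the Redis Cluster specification. crc16_xmodem() and key_hash_slot() are our names, and the bitwise CRC loop stands in for Redis's table-driven version.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* CRC16-CCITT, XMODEM variant (poly 0x1021, init 0), computed bit by bit. */
uint16_t crc16_xmodem(const char *buf, size_t len) {
    uint16_t crc = 0;
    for (size_t i = 0; i < len; i++) {
        crc ^= (uint16_t)((unsigned char)buf[i] << 8);
        for (int bit = 0; bit < 8; bit++)
            crc = (crc & 0x8000) ? (uint16_t)((crc << 1) ^ 0x1021)
                                 : (uint16_t)(crc << 1);
    }
    return crc;
}

/* Slot = CRC16(key) mod 16384 (0x3FFF mask), hashing only the contents of
 * the first non-empty {...} section when one is present. */
unsigned int key_hash_slot(const char *key, size_t keylen) {
    assert(key != NULL || keylen == 0);
    size_t s, e;
    for (s = 0; s < keylen; s++)
        if (key[s] == '{') break;
    if (s == keylen) return crc16_xmodem(key, keylen) & 0x3FFF;  /* no '{' */
    for (e = s + 1; e < keylen; e++)
        if (key[e] == '}') break;
    if (e == keylen || e == s + 1)        /* no '}' or empty "{}": whole key */
        return crc16_xmodem(key, keylen) & 0x3FFF;
    return crc16_xmodem(key + s + 1, e - s - 1) & 0x3FFF;
}
```

With hash tags, keys like {user1000}.following and {user1000}.followers share a slot, which is what allows multi-key commands on them in cluster mode.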
12.6 Step 5: Command Execution
File: t_string.c, function: setCommand()
void setCommand(client *c) {
    robj *expire = NULL;
    int unit = UNIT_SECONDS;
    int flags = OBJ_NO_FLAGS;
    /* Parse optional arguments: EX, PX, EXAT, PXAT, NX, XX, GET, KEEPTTL */
    if (parseExtendedStringArgumentsOrReply(c, &flags, &unit, &expire,
                                            COMMAND_SET) != C_OK)
        return;
    /* Attempt encoding optimization before storing */
    c->argv[2] = tryObjectEncoding(c->argv[2]);
    setGenericCommand(c, flags, c->argv[1], c->argv[2],
                      expire, unit, NULL, NULL);
}
void setGenericCommand(client *c, int flags, robj *key, robj *val,
                       robj *expire, int unit,
                       robj *ok_reply, robj *abort_reply) {
    long long milliseconds = 0, when = 0;
    /* Convert EX 100 → 100,000 ms, then to an absolute deadline */
    if (expire) {
        if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)
            return;
        /* Non-positive or overflowing TTLs are an error, not a nil reply */
        if (milliseconds <= 0 ||
            (unit == UNIT_SECONDS && milliseconds > LLONG_MAX / 1000)) {
            addReplyErrorFormat(c, "invalid expire time in '%s' command",
                                c->cmd->fullname);
            return;
        }
        if (unit == UNIT_SECONDS)
            milliseconds = milliseconds * 1000;
        when = milliseconds + mstime();   /* absolute expiry in ms epoch */
    }
    /* GET flag: reply with the old value before anything is overwritten */
    if (flags & OBJ_SET_GET) {
        if (getGenericCommand(c) == C_ERR) return;
    }
    int found = (lookupKeyWrite(c->db, key) != NULL);
    /* NX: fail if key already exists.  XX: fail if key does not exist. */
    if ((flags & OBJ_SET_NX && found) ||
        (flags & OBJ_SET_XX && !found)) {
        /* With GET, the old value (or nil) has already been sent */
        if (!(flags & OBJ_SET_GET))
            addReply(c, abort_reply ? abort_reply : shared.null[c->resp]);
        return;
    }
    /* Core write */
    genericSetKey(c, c->db, key, val, flags & OBJ_SET_KEEPTTL, 1);
    server.dirty++;
    /* Set expiry in the expires dictionary */
    if (expire) setExpire(c, c->db, key, when);
    /* Keyspace notifications */
    notifyKeyspaceEvent(NOTIFY_STRING, "set", key, c->db->id);
    if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC, "expire", key, c->db->id);
    /* With GET the reply was the old value; otherwise reply +OK */
    if (!(flags & OBJ_SET_GET))
        addReply(c, ok_reply ? ok_reply : shared.ok);
}
Inside genericSetKey():
void genericSetKey(client *c, redisDb *db, robj *key, robj *val,
                   int keepttl, int signal) {
    if (lookupKeyWrite(db, key) == NULL) {
        dbAdd(db, key, val);
    } else {
        dbOverwrite(db, key, val);
    }
    incrRefCount(val);                          /* the keyspace now holds a reference */
    if (!keepttl) removeExpire(db, key);        /* clear old TTL on overwrite */
    if (signal) signalModifiedKey(c, db, key);  /* invalidate WATCH and client-side caching */
}
void dbAdd(redisDb *db, robj *key, robj *val) {
    sds copy = sdsdup(key->ptr);                /* new SDS owned by the dict */
    int retval = dictAdd(db->dict, copy, val);  /* amortized O(1) insert into main hash table */
    serverAssert(retval == DICT_OK);            /* caller checked the key was absent */
    if (server.cluster_enabled)
        slotToKeyAdd(key->ptr);                 /* update slot → key mapping (Redis 6.x API) */
}
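The practical effect of genericSetKey() is that a plain SET drops any previous TTL while SET ... KEEPTTL preserves it. The toy keyspace below models just that rule, using a fixed array in place of the real main dict and expires dict. Every name in it is hypothetical, and it stores key pointers without copying them, so callers must pass strings that outlive the table (string literals here).

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define NKEYS 8
#define NO_EXPIRE (-1LL)

/* Toy keyspace entry: value plus an optional absolute expiry in ms. */
typedef struct {
    const char *key, *val;
    long long expire_ms;      /* NO_EXPIRE when the key has no TTL */
    bool used;
} slot_t;

static slot_t ks[NKEYS];

static slot_t *find(const char *key) {
    for (int i = 0; i < NKEYS; i++)
        if (ks[i].used && strcmp(ks[i].key, key) == 0) return &ks[i];
    return NULL;
}

/* Mirrors genericSetKey(): add or overwrite, then drop the old TTL unless
 * keepttl is set. Returns false only if the toy table is full. */
bool toy_set(const char *key, const char *val, bool keepttl) {
    slot_t *s = find(key);
    if (s == NULL) {                          /* dbAdd() path */
        for (int i = 0; i < NKEYS && s == NULL; i++)
            if (!ks[i].used) s = &ks[i];
        if (s == NULL) return false;
        s->used = true;
        s->key = key;
        s->expire_ms = NO_EXPIRE;
    }
    s->val = val;                             /* dbOverwrite() path shares this */
    if (!keepttl) s->expire_ms = NO_EXPIRE;   /* removeExpire() */
    return true;
}

void toy_set_expire(const char *key, long long when_ms) {   /* setExpire() */
    slot_t *s = find(key);
    assert(s != NULL);
    s->expire_ms = when_ms;
}
```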
12.7 Step 6: String Encoding Selection
File: object.c, function: tryObjectEncoding()
Before the value is stored, Redis attempts to encode it more compactly:
robj *tryObjectEncoding(robj *o) {
    if (!sdsEncodedObject(o)) return o;   /* already INT, skip */
    if (o->refcount > 1) return o;        /* shared object: unsafe to re-encode */
    sds s = o->ptr;
    size_t len = sdslen(s);
    /* Attempt integer encoding */
    long value;
    if (len <= 20 && string2l(s, len, &value)) {
        /* Shared integers pool: 0 to 9999 (OBJ_SHARED_INTEGERS = 10000).
           Skipped under LRU/LFU maxmemory policies, because a shared
           object cannot carry per-key access metadata. */
        if ((server.maxmemory == 0 ||
             !(server.maxmemory_policy & MAXMEMORY_FLAG_NO_SHARED_INTEGERS)) &&
            value >= 0 && value < OBJ_SHARED_INTEGERS) {
            decrRefCount(o);
            return shared.integers[value];  /* pointer to pre-allocated object */
        }
        /* Store the integer directly in the pointer field */
        if (o->encoding == OBJ_ENCODING_RAW) {
            sdsfree(o->ptr);                /* RAW: the SDS is a separate allocation */
            o->encoding = OBJ_ENCODING_INT;
            o->ptr = (void*)value;
            return o;
        }
        /* EMBSTR: the SDS lives inside the robj allocation, so build a new object */
        decrRefCount(o);
        return createStringObjectFromLongLongForValue(value);
    }
    /* EMBSTR: value fits within 44 bytes
       robj header + sdshdr8 + data in one contiguous allocation */
    if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT) {   /* 44 */
        if (o->encoding == OBJ_ENCODING_EMBSTR) return o;
        robj *emb = createEmbeddedStringObject(s, len);
        decrRefCount(o);
        return emb;
    }
    /* RAW: large string, robj and SDS allocated separately */
    trimStringObjectIfNeeded(o);          /* shrink SDS to fit, save memory */
    return o;
}
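The decision tryObjectEncoding() makes can be approximated as a pure function of the string bytes. In this sketch strict_ll() mimics the strictness of Redis's string2ll() (no leading zeros, spaces or '+', and the value must fit in a 64-bit integer), while the refcount and maxmemory-policy conditions are ignored. The enum and function names are hypothetical.

```c
#include <assert.h>
#include <limits.h>
#include <stddef.h>

typedef enum { ENC_INT, ENC_EMBSTR, ENC_RAW } enc_t;

#define EMBSTR_SIZE_LIMIT 44

/* Strict decimal parse: optional leading '-', no leading zeros, no spaces,
 * must fit in long long. Returns 1 on success. */
static int strict_ll(const char *s, size_t len, long long *out) {
    size_t i = 0;
    int neg = 0;
    unsigned long long v = 0;
    if (len == 0 || len > 20) return 0;
    if (len == 1 && s[0] == '0') { *out = 0; return 1; }
    if (s[0] == '-') {
        if (len == 1) return 0;
        neg = 1;
        i = 1;
    }
    if (s[i] < '1' || s[i] > '9') return 0;   /* rejects "007", "-0", "+5" */
    for (; i < len; i++) {
        if (s[i] < '0' || s[i] > '9') return 0;
        if (v > (ULLONG_MAX - 9) / 10) return 0;   /* would overflow */
        v = v * 10 + (unsigned)(s[i] - '0');
    }
    if (neg) {
        if (v > (unsigned long long)LLONG_MAX + 1) return 0;
        *out = (v == (unsigned long long)LLONG_MAX + 1) ? LLONG_MIN
                                                        : -(long long)v;
    } else {
        if (v > (unsigned long long)LLONG_MAX) return 0;
        *out = (long long)v;
    }
    return 1;
}

/* INT if the bytes are a canonical integer, else EMBSTR up to 44 bytes,
 * else RAW. */
enc_t choose_encoding(const char *s, size_t len) {
    assert(s != NULL);
    long long v;
    if (strict_ll(s, len, &v)) return ENC_INT;
    if (len <= EMBSTR_SIZE_LIMIT) return ENC_EMBSTR;
    return ENC_RAW;
}
```

Rejecting non-canonical forms such as "007" matters because an INT-encoded value is turned back into text on GET; storing "007" as the integer 7 would change what the client reads back.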
Memory layout visualization:
Encoding INT (value = 123456, outside the 0–9999 shared pool):
┌─────────────────────────────┐
│ robj (16 bytes)             │
│ type = OBJ_STRING           │
│ encoding = OBJ_ENC_INT      │
│ refcount = 1                │
│ ptr = (void*)123456         │  ← integer stored in pointer itself
└─────────────────────────────┘
Total: 16 bytes, zero extra heap allocation
Encoding EMBSTR ("hello", 5 bytes):
┌──────────────────────────────────────────────┐
│ robj (16B) + sdshdr8 (3B) + data (5B) + NUL  │
│ allocated as ONE malloc() call               │
└──────────────────────────────────────────────┘
Total: 25 bytes requested (rounded up to jemalloc's 32-byte size class), one allocation: CPU cache friendly
Encoding RAW ("very long string..." > 44 bytes):
┌────────────┐     ┌──────────────────────────────────────┐
│ robj (16B) │────▶│ sdshdr (3, 5, 9 or 17B) + data + NUL │
└────────────┘     └──────────────────────────────────────┘
Two separate malloc() calls
The 44-byte EMBSTR threshold is chosen so that robj (16B) + sdshdr8 (3B) + data (44B) + NUL (1B) = 64 bytes, fitting exactly in a typical jemalloc size class.
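The threshold arithmetic can be checked with layout-compatible stand-ins for robj and sdshdr8. This assumes a 64-bit build with GCC or Clang (for the packed attribute, which Redis's own sds.h also relies on); robj_like, sdshdr8_like and embstr_alloc_size() are illustrative re-declarations, not Redis's types.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Same field widths as Redis's robj: 4 + 4 + 24 bits, int, pointer. */
typedef struct {
    unsigned type : 4;
    unsigned encoding : 4;
    unsigned lru : 24;
    int refcount;
    void *ptr;
} robj_like;

/* Same layout as sdshdr8: 1-byte len, 1-byte alloc, 1-byte flags. */
struct __attribute__((__packed__)) sdshdr8_like {
    uint8_t len;
    uint8_t alloc;
    unsigned char flags;
    char buf[];
};

/* Bytes requested by one EMBSTR allocation holding len bytes of data. */
size_t embstr_alloc_size(size_t len) {
    assert(len <= 255);   /* sdshdr8 can only describe lengths up to 255 */
    return sizeof(robj_like) + sizeof(struct sdshdr8_like) + len + 1;  /* +1 NUL */
}
```

With these sizes, a 44-byte string needs exactly 64 bytes and "hello" needs 25.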
12.8 Step 7: AOF Propagation
File: aof.c, function: feedAppendOnlyFile()
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid,
                        robj **argv, int argc) {
    sds buf = sdsempty();
    /* Emit SELECT if current DB changed since last AOF write */
    if (dictid != server.aof_selected_db) {
        char seldb[64];
        snprintf(seldb, sizeof(seldb), "%d", dictid);
        buf = sdscatprintf(buf, "*2\r\n$6\r\nSELECT\r\n$%zu\r\n%s\r\n",
                           strlen(seldb), seldb);
        server.aof_selected_db = dictid;
    }
    /* SET with a relative TTL (EX/PX) is logged as a plain SET followed by
       an absolute PEXPIREAT, so AOF replay uses the original wall-clock
       deadline instead of restarting the TTL at replay time.
       (Redis 6.0 and earlier; later versions log SET ... PXAT instead.) */
    robj *exarg = NULL, *pxarg = NULL;
    if (cmd->proc == setCommand && argc > 3) {
        for (int j = 3; j < argc - 1; j++) {
            if (!strcasecmp(argv[j]->ptr, "ex")) exarg = argv[j+1];
            if (!strcasecmp(argv[j]->ptr, "px")) pxarg = argv[j+1];
        }
    }
    if (exarg || pxarg) {
        buf = catAppendOnlyGenericCommand(buf, 3, argv);   /* SET key value */
        buf = catAppendOnlyExpireAtCommand(buf,
                  exarg ? server.expireCommand : server.pexpireCommand,
                  argv[1], exarg ? exarg : pxarg);          /* PEXPIREAT key <ms> */
    } else {
        /* Serialize the command unchanged as RESP */
        buf = catAppendOnlyGenericCommand(buf, argc, argv);
    }
    /* Append to in-memory AOF buffer; flushed in beforeSleep() */
    server.aof_buf = sdscatlen(server.aof_buf, buf, sdslen(buf));
    sdsfree(buf);
}
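The AOF buffer (server.aof_buf) is written out by flushAppendOnlyFile(), called from beforeSleep() (the hook that runs just before the event loop blocks in epoll_wait()). write() only moves the data into the kernel page cache; the appendfsync policy decides when fsync() makes it durable. beforeSleep() flushes the AOF before it sends pending client replies, so with appendfsync always a client never receives +OK before its write has been fsync'd: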
void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    if (sdslen(server.aof_buf) == 0) {
        /* If fsync is pending from last iteration, call it now */
        if (server.aof_fsync == AOF_FSYNC_EVERYSEC && ...)
            aof_background_fsync(server.aof_fd);
        return;
    }
    /* Write buffer to kernel page cache (not yet durable) */
    nwritten = aofWrite(server.aof_fd, server.aof_buf, sdslen(server.aof_buf));
    /* (short-write and error handling omitted) */
    sdsclear(server.aof_buf);
    /* Sync strategy */
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        /* Synchronous: blocks the event loop until data hits disk */
        redis_fsync(server.aof_fd);
        server.aof_last_fsync = server.unixtime;
    } else if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
               server.unixtime > server.aof_last_fsync) {
        /* At most once per second, delegate fsync() to a background thread */
        if (!aofFsyncInProgress())
            aof_background_fsync(server.aof_fd);
        server.aof_last_fsync = server.unixtime;
    }
    /* AOF_FSYNC_NO: no fsync(), OS decides when to flush page cache */
}
AOF file content for this command:
*2\r\n$6\r\nSELECT\r\n$1\r\n0\r\n   ← SELECT 0 (if needed)
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
*3\r\n$9\r\nPEXPIREAT\r\n$3\r\nkey\r\n$13\r\n1704067300000\r\n
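Note that the relative EX 100 becomes an absolute PEXPIREAT timestamp (1704067300000 = execution time 1704067200000 ms + 100,000 ms), so replaying the AOF later does not restart the TTL. Redis 6.2 and later achieve the same effect by logging a single SET key value PXAT 1704067300000 instead of two commands.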
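The conversion in the AOF example above can be reproduced with a small formatter. aof_pexpireat_for_ex() is hypothetical; it assumes the TTL was already validated (so now_ms + ex_seconds * 1000 cannot overflow) and takes the execution-time clock as a parameter instead of calling mstime().

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Format the RESP PEXPIREAT command that replaces "EX ex_seconds" in the
 * AOF. Returns bytes written (NUL-terminated), or -1 if out is too small. */
int aof_pexpireat_for_ex(const char *key, long long ex_seconds,
                         long long now_ms, char *out, size_t cap) {
    char when[32];
    assert(key != NULL && out != NULL);
    int wlen = snprintf(when, sizeof when, "%lld", now_ms + ex_seconds * 1000);
    int n = snprintf(out, cap,
                     "*3\r\n$9\r\nPEXPIREAT\r\n$%zu\r\n%s\r\n$%d\r\n%s\r\n",
                     strlen(key), key, wlen, when);
    return (n < 0 || (size_t)n >= cap) ? -1 : n;
}
```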
12.9 Step 8: Replication Propagation
File: replication.c, function: replicationFeedSlaves()
void replicationFeedSlaves(list *slaves, int dictid,
                           robj **argv, int argc) {
    listNode *ln;
    listIter li;
    sds selcmd = NULL;
    /* Build a complete SELECT command if the target DB changed */
    if (server.slaveseldb != dictid) {
        char llstr[LONG_STR_SIZE];
        int len = ll2string(llstr, sizeof(llstr), dictid);
        selcmd = sdscatprintf(sdsempty(),
                              "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", len, llstr);
        server.slaveseldb = dictid;
    }
    /* Encode argv as RESP (simplified: the real code writes it piecewise) */
    sds cmd = catAppendOnlyGenericCommand(sdsempty(), argc, argv);
    /* Write to replication backlog ring buffer first.
       Backlog default size: 1 MB (repl-backlog-size).
       Used for PSYNC (partial resync) when a replica reconnects. */
    if (server.repl_backlog) {
        if (selcmd) feedReplicationBacklog(selcmd, sdslen(selcmd));
        feedReplicationBacklog(cmd, sdslen(cmd));
    }
    /* Push to each replica's output buffer (Redis 6.x model; Redis 7
       shares one replication buffer between the backlog and all replicas) */
    listRewind(slaves, &li);
    while ((ln = listNext(&li)) != NULL) {
        client *slave = ln->value;
        /* Skip replicas still waiting for BGSAVE to start; replicas that are
           receiving the RDB keep accumulating the stream to apply after it */
        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
        /* Each replica has its own output buffer, flushed to its socket
           like any other client reply */
        if (selcmd) addReplyProto(slave, selcmd, sdslen(selcmd));
        addReplyProto(slave, cmd, sdslen(cmd));
    }
    sdsfree(cmd);
    if (selcmd) sdsfree(selcmd);
}
Replication backlog and PSYNC:
Replication timeline:
Master backlog (ring buffer, 1 MB):
┌────────────────────────────────────────────────────┐
│ offset 1000 ... offset 5000 ... offset 8192 ...    │
└────────────────────────────────────────────────────┘
    ▲                                  ▲
    replica A (reconnecting)           replica B (online)
Replica A sends: PSYNC <repl_id> 1000
Master checks: is offset 1000 still in backlog? Yes → send delta
Result: partial resync (no full RDB transfer needed)
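The bookkeeping behind that check can be sketched as a ring buffer plus two counters: the total bytes ever written (the replication offset) and how many of them are still held. A PSYNC can continue only if every byte from the requested offset onward is still buffered. The type and function names are hypothetical, the 16-byte size replaces the 1 MB default for readability, and offsets are 1-based as in Redis (the first byte of the stream is offset 1).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define BACKLOG_SIZE 16   /* tiny for illustration; default is 1 MB */

typedef struct {
    char buf[BACKLOG_SIZE];
    size_t idx;             /* next write position in buf */
    long long master_off;   /* total bytes ever written (replication offset) */
    long long histlen;      /* valid bytes held, at most BACKLOG_SIZE */
} backlog;

void backlog_feed(backlog *b, const char *p, size_t len) {
    for (size_t i = 0; i < len; i++) {
        b->buf[b->idx] = p[i];                    /* overwrite oldest byte */
        b->idx = (b->idx + 1) % BACKLOG_SIZE;
    }
    b->master_off += (long long)len;
    b->histlen += (long long)len;
    if (b->histlen > BACKLOG_SIZE) b->histlen = BACKLOG_SIZE;
}

/* Offset of the oldest byte still held. */
long long backlog_first_off(const backlog *b) {
    return b->master_off - b->histlen + 1;
}

/* PSYNC from off works iff every byte from off onward is still buffered;
 * off == master_off + 1 means the replica is fully caught up. */
bool backlog_can_psync(const backlog *b, long long off) {
    return off >= backlog_first_off(b) && off <= b->master_off + 1;
}

/* Copy the bytes from offset off to the end into out; returns the count. */
size_t backlog_read_from(const backlog *b, long long off, char *out) {
    assert(backlog_can_psync(b, off));
    size_t n = (size_t)(b->master_off + 1 - off);
    size_t start = (b->idx + BACKLOG_SIZE - n) % BACKLOG_SIZE;
    for (size_t i = 0; i < n; i++) out[i] = b->buf[(start + i) % BACKLOG_SIZE];
    return n;
}
```

After 40 bytes have passed through a 16-byte backlog, only offsets 25 to 40 are still held, so a replica asking for offset 24 or earlier needs a full resync.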
12.10 Step 9: Response Delivery
File: networking.c, function: addReply()
void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;
    if (sdsEncodedObject(obj)) {
        _addReplyToBufferOrList(c, obj->ptr, sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        char buf[32];
        size_t len = ll2string(buf, sizeof(buf), (long)obj->ptr);
        _addReplyToBufferOrList(c, buf, len);
    }
}
static int _addReplyToBufferOrList(client *c, const char *s, size_t len) {
    /* c->buf is a fixed 16 KB inline buffer.
       Writing here avoids heap allocation for small responses.
       Only use it while the reply list is empty; otherwise a later,
       shorter reply could overtake one already queued in the list. */
    if (listLength(c->reply) == 0 && c->bufpos + len <= sizeof(c->buf)) {
        memcpy(c->buf + c->bufpos, s, len);
        c->bufpos += len;
        return C_OK;
    }
    /* Overflow to the reply list (heap-allocated nodes, simplified as SDS) */
    listAddNodeTail(c->reply, sdsnewlen(s, len));
    c->reply_bytes += len;
    return C_OK;
}
shared.ok is a pre-allocated robj containing +OK\r\n (5 bytes). This avoids an allocation on every successful write command.
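The response does not wait for a writable event. prepareClientToWrite() queues the client on server.clients_pending_write, and beforeSleep() calls handleClientsWithPendingWrites(), which write()s the buffer directly before the event loop blocks again. Only when the socket cannot take the whole reply does Redis install a writable handler, sendReplyToClient(), which finishes the job on a later epoll_wait() wake-up.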
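The buffer-then-list scheme can be sketched as below. One detail matters for correctness: once anything is queued in the overflow list, later replies must go to the list too, or a short reply could be sent ahead of a longer one queued earlier. out_state, add_reply() and drain() are hypothetical names, and BUF_BYTES is shrunk from 16 KB to 16 bytes so the overflow is easy to trigger.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define BUF_BYTES 16   /* stand-in for the 16 KB inline c->buf */

typedef struct node { struct node *next; size_t len; char data[]; } node;

typedef struct {
    char buf[BUF_BYTES];   /* inline buffer, used first */
    size_t bufpos;
    node *head, *tail;     /* FIFO overflow list */
} out_state;

/* Append reply bytes, preserving order across buffer and list. */
int add_reply(out_state *c, const char *s, size_t len) {
    assert(c != NULL && s != NULL);
    if (c->head == NULL && c->bufpos + len <= sizeof c->buf) {
        memcpy(c->buf + c->bufpos, s, len);
        c->bufpos += len;
        return 0;
    }
    node *n = malloc(sizeof *n + len);
    if (n == NULL) return -1;
    n->next = NULL;
    n->len = len;
    memcpy(n->data, s, len);
    if (c->tail) c->tail->next = n; else c->head = n;
    c->tail = n;
    return 0;
}

/* Copy everything into out in send order (what write() would see),
 * free the list and reset the state. Returns the total byte count. */
size_t drain(out_state *c, char *out) {
    size_t total = c->bufpos;
    memcpy(out, c->buf, c->bufpos);
    c->bufpos = 0;
    while (c->head) {
        node *n = c->head;
        memcpy(out + total, n->data, n->len);
        total += n->len;
        c->head = n->next;
        free(n);
    }
    c->tail = NULL;
    return total;
}
```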
12.11 Complete Call Stack
epoll_wait() fires readable event
└─ readQueryFromClient()                   networking.c:1624
   └─ processInputBuffer()                 networking.c:2234
      ├─ processMultibulkBuffer()          networking.c:2057  ← RESP parse
      └─ processCommandAndResetClient()
         └─ processCommand()               server.c:3948
            ├─ lookupCommand()             server.c:3414  ← O(1) hash lookup
            ├─ [auth check]
            ├─ [oom check → freeMemoryIfNeededAndSafe()]
            └─ call()                      server.c:3633
               ├─ setCommand()             t_string.c:97
               │  ├─ parseExtendedStringArgumentsOrReply()
               │  └─ setGenericCommand()
               │     ├─ genericSetKey()
               │     │  ├─ dbAdd()         db.c:174
               │     │  │  └─ dictAdd()    dict.c:366
               │     │  └─ signalModifiedKey()
               │     ├─ setExpire()        db.c:1265
               │     └─ addReply(shared.ok)   networking.c:431
               ├─ propagate()              server.c:3573
               │  ├─ feedAppendOnlyFile()      aof.c:1348
               │  └─ replicationFeedSlaves()   replication.c:522
               └─ [update server.stat_*, dirty++]
beforeSleep()  (runs before the event loop blocks in epoll_wait() again)
├─ flushAppendOnlyFile()
└─ handleClientsWithPendingWrites()
   └─ writeToClient()
      └─ write(fd, c->buf, c->bufpos)
         → client receives "+OK\r\n"
(A writable event and sendReplyToClient() are used only if the socket could not take the whole reply.)
12.12 Latency Breakdown
| Stage | Typical latency | Dominant cost |
|---|---|---|
| TCP read + buffer | 0.5–2 µs | read() syscall, memcpy |
| RESP parsing | 0.3–1 µs | string scanning |
| Command lookup | < 0.1 µs | hash table lookup |
| Pre-checks (no eviction) | 0.1–0.5 µs | dict lookups |
| SET execution + encoding | 0.5–2 µs | dict write, SDS ops |
| AOF buf append | 0.3–1 µs | sdscat |
| addReply | < 0.1 µs | memcpy of 5 bytes |
| Total (no disk I/O) | ~2–7 µs | |
| appendfsync always penalty | +1–10 ms | fsync() disk flush |
| Network RTT (same datacenter) | ~200 µs | dominates end-to-end |
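This explains why Redis's "slowness" is almost always the network, not command execution: a ~200 µs round trip is roughly 30–100× the 2–7 µs spent inside the server. The appendfsync always setting puts a synchronous fsync() on the hot path (once per event-loop iteration that appended to the AOF, so writes from many clients in the same iteration share it), dropping throughput from ~1M ops/s to ~10–20K ops/s on spinning disks.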
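Pipeline optimization: Pipelining (sending many commands before reading any responses) pays one RTT per batch instead of one per command, so a single connection's throughput approaches the server-side cost per command instead of being capped by network latency, all on the same single-threaded event loop.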
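A back-of-the-envelope model makes the pipelining claim concrete. It assumes one connection, a server that handles a batch back to back at service_us per command, and one RTT per batch; it ignores bandwidth, kernel batching and other clients. ops_per_sec() is a hypothetical helper, and 5 µs is simply a value inside the table's 2–7 µs range.

```c
#include <assert.h>

/* Closed-loop throughput of one connection sending `depth` commands per
 * round trip: depth commands per (RTT + depth * service time). */
double ops_per_sec(int depth, double rtt_us, double service_us) {
    assert(depth > 0 && rtt_us >= 0 && service_us > 0);
    return depth * 1e6 / (rtt_us + depth * service_us);
}
```

With a 200 µs RTT and 5 µs per command, depth 1 gives about 4,900 ops/s per connection, depth 100 about 143,000 ops/s, and the limit as depth grows is 1 / 5 µs = 200,000 ops/s.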