Chapter 12

Command Execution Pipeline: From TCP Bytes to +OK Response


12.1 Overview

This chapter traces every step a single SET key value EX 100 command takes from raw network bytes to the final +OK\r\n response. Source files involved: networking.c, server.c, t_string.c, db.c, aof.c, replication.c, object.c.

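Redis is built around a single-threaded event loop backed by epoll (Linux) or kqueue (macOS/BSD). Every command executes on this one thread, so the keyspace needs no locks and a simple command such as SET typically executes in a few microseconds. (Redis 6+ can optionally offload socket reads and writes to I/O threads, and background threads handle work such as AOF fsync and lazy freeing, but command execution itself stays on the main thread.)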


12.2 Step 1: TCP Bytes Arrive

File: networking.c — Function: readQueryFromClient()

When the OS signals an inbound read event, the event loop calls readQueryFromClient():

void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread;
    size_t readlen = PROTO_IOBUF_LEN; /* 16 KB read buffer */

    /* Expand the query buffer if needed */
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = connRead(conn, c->querybuf + sdslen(c->querybuf), readlen);

    if (nread == -1) {
        if (connGetState(conn) != CONN_STATE_CONNECTED) {
            freeClientAsync(c);
            return;
        }
        return; /* EAGAIN — no data yet */
    }
    if (nread == 0) {
        /* Connection closed by client */
        freeClientAsync(c);
        return;
    }

    sdsIncrLen(c->querybuf, nread);
    c->lastinteraction = server.unixtime;

    /* Track bytes received per client (for client stats) */
    atomicIncr(server.stat_net_input_bytes, nread);

    processInputBuffer(c);
}

c->querybuf is an SDS (Simple Dynamic String) that accumulates raw bytes. Each connRead() call reads at most PROTO_IOBUF_LEN (16 KB) by default, so a large command can arrive across several read events. processInputBuffer() copes with this by returning early when the buffer does not yet hold a complete command; the parser's progress (c->qb_pos, c->multibulklen, c->bulklen) is kept in the client struct, so parsing resumes where it stopped on the next read.

Protocol type detection:

/* In processInputBuffer() */
if (c->reqtype == 0) {
    if (c->querybuf[c->qb_pos] == '*') {
        c->reqtype = PROTO_REQ_MULTIBULK; /* standard RESP */
    } else {
        c->reqtype = PROTO_REQ_INLINE;    /* telnet-style */
    }
}

12.3 Step 2: RESP Protocol Parsing

File: networking.c — Function: processMultibulkBuffer()

The RESP wire format for SET key value EX 100:

*5\r\n          ← array of 5 elements
$3\r\n          ← bulk string, 3 bytes
SET\r\n
$3\r\n          ← bulk string, 3 bytes
key\r\n
$5\r\n          ← bulk string, 5 bytes
value\r\n
$2\r\n          ← bulk string, 2 bytes
EX\r\n
$3\r\n          ← bulk string, 3 bytes
100\r\n

Total wire bytes: 50 bytes for this command.
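Counting the bytes of a RESP frame by hand is error-prone, so here is a minimal encoder sketch that builds the frame and returns its length. resp_encode() is a hypothetical helper written for this chapter, not a Redis function (Redis serializes commands with its own helpers such as catAppendOnlyGenericCommand()). For SET key value EX 100 it produces a 50-byte frame.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: encode argv as a RESP array of bulk strings.
 * Returns the number of bytes written (excluding the final NUL),
 * or -1 if out is too small. */
static int resp_encode(char *out, size_t cap, int argc, const char **argv) {
    size_t pos;
    int n = snprintf(out, cap, "*%d\r\n", argc);   /* array header */
    if (n < 0 || (size_t)n >= cap) return -1;
    pos = (size_t)n;
    for (int i = 0; i < argc; i++) {
        size_t len = strlen(argv[i]);
        /* "$<len>\r\n" bulk-string header */
        n = snprintf(out + pos, cap - pos, "$%zu\r\n", len);
        if (n < 0 || (size_t)n >= cap - pos) return -1;
        pos += (size_t)n;
        /* payload + trailing CRLF, leaving room for the NUL */
        if (pos + len + 2 >= cap) return -1;
        memcpy(out + pos, argv[i], len);
        pos += len;
        out[pos++] = '\r';
        out[pos++] = '\n';
    }
    out[pos] = '\0';
    return (int)pos;
}
```

Each argument costs its payload plus a $<len>\r\n header and a trailing \r\n, which is why short commands carry proportionally large framing overhead.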

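int processMultibulkBuffer(client *c) {
    char *newline;
    int ok;
    long long ll;

    /* Parse the '*N' line to get argument count */
    if (c->multibulklen == 0) {
        newline = strchr(c->querybuf + c->qb_pos, '\r');
        if (newline == NULL) return C_ERR; /* incomplete: wait for more data */
        /* The '\n' that follows '\r' must have arrived too */
        if (newline + 1 >= c->querybuf + sdslen(c->querybuf)) return C_ERR;

        ok = string2ll(c->querybuf + c->qb_pos + 1,
                       newline - (c->querybuf + c->qb_pos) - 1, &ll);
        if (!ok || ll > INT_MAX) {
            addReplyError(c, "Protocol error: invalid multibulk length");
            setProtocolError("invalid mbulk count", c);
            return C_ERR;
        }
        c->qb_pos = newline - c->querybuf + 2; /* skip \r\n */
        if (ll <= 0) return C_OK; /* empty command: nothing to execute */
        c->multibulklen = ll;
        c->argv = zmalloc(sizeof(robj*) * c->multibulklen);
        c->argc = 0;
    }

    /* Parse each '$N\r\nDATA\r\n' element */
    while (c->multibulklen) {
        /* Read the '$N' length line */
        if (c->bulklen == -1) {
            newline = strchr(c->querybuf + c->qb_pos, '\r');
            if (newline == NULL ||
                newline + 1 >= c->querybuf + sdslen(c->querybuf))
                break; /* wait for more data */

            if (c->querybuf[c->qb_pos] != '$') {
                addReplyError(c, "Protocol error: expected '$'");
                setProtocolError("expected $ but got something else", c);
                return C_ERR;
            }
            ok = string2ll(c->querybuf + c->qb_pos + 1,
                           newline - (c->querybuf + c->qb_pos) - 1, &ll);
            if (!ok || ll < 0 || ll > server.proto_max_bulk_len) {
                addReplyError(c, "Protocol error: invalid bulk length");
                setProtocolError("invalid bulk length", c);
                return C_ERR;
            }
            c->bulklen = ll;
            c->qb_pos = newline - c->querybuf + 2;
        }

        /* Check if full bulk data has arrived */
        if ((size_t)(c->qb_pos + c->bulklen + 2) > sdslen(c->querybuf))
            break; /* partial: wait for next read */

        c->argv[c->argc++] = createStringObject(
            c->querybuf + c->qb_pos, c->bulklen
        );
        c->qb_pos += c->bulklen + 2; /* skip data + \r\n */
        c->bulklen = -1;
        c->multibulklen--;
    }

    if (c->multibulklen == 0) return C_OK; /* fully parsed */
    return C_ERR; /* incomplete: resume on the next read */
}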

After successful parsing:

c->argc = 5
c->argv[0] = robj{ encoding=EMBSTR, ptr="SET"   }
c->argv[1] = robj{ encoding=EMBSTR, ptr="key"   }
c->argv[2] = robj{ encoding=EMBSTR, ptr="value" }
c->argv[3] = robj{ encoding=EMBSTR, ptr="EX"    }
c->argv[4] = robj{ encoding=EMBSTR, ptr="100"   }
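The same incremental strategy can be shown outside Redis. The sketch below is a simplified, hypothetical parser (struct parser, parser_feed() and parse_multibulk() are illustrative names) that keeps its cursor and pending lengths between calls, mirroring c->qb_pos, c->multibulklen and c->bulklen. It handles only the multibulk form (a leading '*'), not inline commands, and copies arguments into plain C strings instead of robj objects.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define MAX_ARGS 16
#define BUF_CAP  4096

/* Hypothetical parser state, loosely mirroring the client fields. */
struct parser {
    char   buf[BUF_CAP];
    size_t len;          /* bytes buffered (like sdslen(querybuf)) */
    size_t pos;          /* parse cursor (like c->qb_pos) */
    long   multibulklen; /* args still to read; 0 = need '*N' line */
    long   bulklen;      /* -1 = need '$N' line */
    int    argc;
    char  *argv[MAX_ARGS];
};

static void parser_init(struct parser *p) {
    memset(p, 0, sizeof *p);
    p->bulklen = -1;
}

static void parser_free(struct parser *p) {
    for (int i = 0; i < p->argc; i++) free(p->argv[i]);
    p->argc = 0;
}

/* Append bytes as if they arrived from one read() call. */
static int parser_feed(struct parser *p, const char *data, size_t n) {
    if (p->len + n >= BUF_CAP) return -1;
    memcpy(p->buf + p->len, data, n);
    p->len += n;
    p->buf[p->len] = '\0';
    return 0;
}

/* Parse "<prefix><digits>\r\n" at p->pos.
 * Returns 1 on success, 0 if the line is incomplete, -1 on error. */
static int read_len_line(struct parser *p, char prefix, long *out) {
    char *start = p->buf + p->pos;
    char *cr = memchr(start, '\r', p->len - p->pos);
    if (cr == NULL || cr + 1 >= p->buf + p->len) return 0; /* need "\r\n" */
    if (*start != prefix || cr[1] != '\n') return -1;
    char *end;
    long v = strtol(start + 1, &end, 10);
    if (end == start + 1 || end != cr || v < 0) return -1;
    *out = v;
    p->pos = (size_t)(cr + 2 - p->buf);
    return 1;
}

/* Returns 1 when a full command is parsed, 0 if more bytes are
 * needed, -1 on a protocol error. */
static int parse_multibulk(struct parser *p) {
    long n;
    int r;
    if (p->multibulklen == 0) {
        r = read_len_line(p, '*', &n);
        if (r <= 0) return r;
        if (n < 1 || n > MAX_ARGS) return -1;
        p->multibulklen = n;
    }
    while (p->multibulklen > 0) {
        if (p->bulklen == -1) {
            r = read_len_line(p, '$', &n);
            if (r <= 0) return r;
            if (n > BUF_CAP) return -1;
            p->bulklen = n;
        }
        /* Wait until payload + trailing CRLF are buffered */
        if (p->pos + (size_t)p->bulklen + 2 > p->len) return 0;
        char *arg = malloc((size_t)p->bulklen + 1);
        if (arg == NULL) return -1;
        memcpy(arg, p->buf + p->pos, (size_t)p->bulklen);
        arg[p->bulklen] = '\0';
        p->argv[p->argc++] = arg;
        p->pos += (size_t)p->bulklen + 2;
        p->bulklen = -1;
        p->multibulklen--;
    }
    return 1;
}
```

Feeding the 50-byte SET frame in two chunks (the first 20 bytes, then the rest) makes the first call return 0 with only SET parsed; the second call completes all five arguments.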

12.4 Step 3: Command Lookup

File: server.c — Function: processCommand() → lookupCommand()

int processCommand(client *c) {
    /* --- Step 3a: look up the command in the command table --- */
    c->cmd = c->lastcmd = c->realcmd = lookupCommand(c->argv, c->argc);

    if (c->cmd == NULL) {
        sds args = catClientArguments(c, 3);
        addReplyErrorFormat(c,
            "unknown command `%s`, with args beginning with: %s",
            (char*)c->argv[0]->ptr, args);
        sdsfree(args);
        return C_OK;
    }
    // ...
}

struct redisCommand *lookupCommand(robj **argv, int argc) {
    return lookupCommandLogic(server.commands, argv, argc, 0);
}

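server.commands is a dict (hash map) populated at startup by populateCommandTable(). Command names are hashed case-insensitively (with a case-insensitive variant of SipHash-1-2), so set, SET and SeT resolve to the same entry, and lookup is O(1) on average. In Redis 7, container commands such as CONFIG GET take a second lookup in the parent command's subcommand dict.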

The redisCommand descriptor for SET (simplified; the exact struct layout differs between Redis versions):

/* Simplified view of SET's command-table entry */
{
  "set",                  /* name */
  setCommand,             /* proc: function pointer */
  -3,                     /* arity: -3 = at least 3 args */
  "write denyoom",        /* flags: CMD_WRITE | CMD_DENYOOM */
  "@write @string @slow", /* ACL categories */
  1,                      /* first key index */
  1,                      /* last key index */
  1,                      /* key step */
}

The arity field counts the command name itself, so -3 means "3 or more arguments" (SET key value at minimum). Redis validates:

if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
    (c->argc < -c->cmd->arity)) {
    addReplyErrorFormat(c, "wrong number of arguments for '%s' command",
                        c->cmd->fullname);
    return C_OK;
}
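A reduced model of lookup plus the arity rule above, as a sketch: the table, struct command and eq_nocase() are hypothetical. Redis uses a case-insensitively hashed dict; this sketch scans a four-entry array so the focus stays on case-insensitive matching and the sign convention of arity. The arity values for GET, SET, DEL and PING match Redis's.

```c
#include <assert.h>
#include <ctype.h>
#include <stddef.h>

/* Hypothetical, heavily reduced command descriptor. */
struct command {
    const char *name;
    int arity; /* > 0: exact argc; < 0: at least -arity (name included) */
};

static const struct command table[] = {
    {"get", 2}, {"set", -3}, {"del", -2}, {"ping", -1},
};

/* Case-insensitive string equality (portable stand-in for strcasecmp). */
static int eq_nocase(const char *a, const char *b) {
    while (*a && *b) {
        if (tolower((unsigned char)*a) != tolower((unsigned char)*b)) return 0;
        a++;
        b++;
    }
    return *a == *b; /* both strings must end together */
}

static const struct command *lookup(const char *name) {
    for (size_t i = 0; i < sizeof table / sizeof table[0]; i++)
        if (eq_nocase(table[i].name, name)) return &table[i];
    return NULL;
}

/* Same rule as processCommand(): returns 1 if argc is acceptable. */
static int arity_ok(const struct command *cmd, int argc) {
    if (cmd->arity > 0) return argc == cmd->arity;
    return argc >= -cmd->arity;
}
```

With this rule, SET key value EX 100 (argc 5) passes, SET key (argc 2) is rejected, and GET key extra (argc 3) is rejected because GET's positive arity demands exactly 2.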

12.5 Step 4: Pre-Execution Checks

Still inside processCommand(), Redis runs a gauntlet of safety checks:

/* Authentication: only commands flagged no-auth (such as AUTH) may run */
if (authRequired(c) && !(c->cmd->flags & CMD_NO_AUTH)) {
    addReplyError(c, "NOAUTH Authentication required.");
    return C_OK;
}

/* Client pause (triggered by CLIENT PAUSE or FAILOVER); a WRITE
   pause holds back only commands that may write */
if ((c->cmd->flags & CMD_WRITE) &&
    isPausedActionsWithUpdate(PAUSE_ACTION_CLIENT_WRITE)) {
    blockClientForPausedActions(c);
    return C_OK;
}

/* OOM check: try to evict keys first; if still over maxmemory,
   reject commands that may grow memory usage */
if (server.maxmemory && !server.loading) {
    int out_of_memory = (freeMemoryIfNeededAndSafe() == C_ERR);
    if (out_of_memory && (c->cmd->flags & CMD_DENYOOM)) {
        addReply(c, shared.oomerr); /* -OOM command not allowed... */
        return C_OK;
    }
}

/* Disk error protection: refuse writes if AOF/RDB write failed */
if (((server.stop_writes_on_bgsave_err &&
      server.saveparamslen > 0 &&
      server.lastbgsave_status == C_ERR) ||
     (server.aof_state != AOF_OFF &&
      server.aof_last_write_status == C_ERR)) &&
    server.masterhost == NULL &&
    c->cmd->flags & CMD_WRITE) {
    addReply(c, shared.bgsaveerr);
    return C_OK;
}

/* Cluster key slot check */
if (server.cluster_enabled) {
    if (getNodeByQuery(c, c->cmd, c->argv, c->argc, NULL, NULL)
        != server.cluster->myself) {
        clusterRedirectClient(c, ...);
        return C_OK;
    }
}

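Only after all checks pass does Redis call call() to execute the command. If the client is inside a MULTI block, the command is instead queued and answered with +QUEUED.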
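The ordering of these gates can be modelled as a pure function. This is a hypothetical sketch, not Redis code: the flag names, struct gate_state and precheck() are invented for illustration, the pause and cluster checks are left out, and the function returns only the error prefix (NOAUTH, OOM, MISCONF) that the real replies begin with.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical command flags, standing in for CMD_WRITE etc. */
enum { F_WRITE = 1 << 0, F_DENYOOM = 1 << 1, F_NO_AUTH = 1 << 2 };

/* Hypothetical snapshot of the state the checks consult. */
struct gate_state {
    int authenticated;  /* has the client passed AUTH? */
    int over_maxmemory; /* still over maxmemory after eviction? */
    int disk_error;     /* did the last RDB/AOF write fail? */
    int is_replica;     /* does this server have a master? */
};

/* Returns NULL if the command may run, otherwise the error prefix.
 * Order mirrors the excerpt: auth, memory, then disk errors. */
static const char *precheck(const struct gate_state *s, int cmd_flags) {
    if (!s->authenticated && !(cmd_flags & F_NO_AUTH))
        return "NOAUTH";
    if (s->over_maxmemory && (cmd_flags & F_DENYOOM))
        return "OOM";
    if (s->disk_error && !s->is_replica && (cmd_flags & F_WRITE))
        return "MISCONF";
    return NULL;
}
```

Because authentication comes first, an unauthenticated client learns nothing about memory or disk state; and read-only commands keep working while the server is out of memory or cannot persist.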


12.6 Step 5: Command Execution

File: t_string.c — Function: setCommand()

void setCommand(client *c) {
    robj *expire = NULL;
    int unit = UNIT_SECONDS;
    int flags = OBJ_NO_FLAGS;

    /* Parse optional arguments: EX, PX, EXAT, PXAT, NX, XX, GET, KEEPTTL */
    if (parseExtendedStringArgumentsOrReply(c, &flags, &unit, &expire,
                                            COMMAND_SET) != C_OK)
        return;

    /* Attempt encoding optimization before storing */
    c->argv[2] = tryObjectEncoding(c->argv[2]);

    setGenericCommand(c, flags, c->argv[1], c->argv[2],
                      expire, unit, NULL, NULL);
}

void setGenericCommand(client *c, int flags, robj *key, robj *val,
                       robj *expire, int unit,
                       robj *ok_reply, robj *abort_reply) {
    long long milliseconds = 0, when = 0;

    /* Convert EX 100 → 100,000 ms */
    if (expire) {
        if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)
            return;
        /* Reject zero or negative TTLs */
        if (milliseconds <= 0) {
            addReplyErrorFormat(c, "invalid expire time in '%s' command",
                                c->cmd->fullname);
            return;
        }
        if (unit == UNIT_SECONDS)
            milliseconds = milliseconds * 1000;
        when = milliseconds + mstime(); /* absolute expiry, ms since epoch */
    }

    /* NX: fail if key already exists */
    if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db, key) != NULL) ||
    /* XX: fail if key does not exist */
        (flags & OBJ_SET_XX && lookupKeyWrite(c->db, key) == NULL)) {
        addReply(c, abort_reply ? abort_reply : shared.null[c->resp]);
        return;
    }

    /* GET flag: return old value before overwriting */
    if (flags & OBJ_SET_GET) {
        if (getGenericCommand(c) == C_ERR) return;
    }

    /* Core write */
    genericSetKey(c, c->db, key, val, flags & OBJ_SET_KEEPTTL, 1);
    server.dirty++;

    /* Set expiry in the expires dictionary */
    if (expire) setExpire(c, c->db, key, when);

    /* Keyspace notification */
    notifyKeyspaceEvent(NOTIFY_STRING, "set", key, c->db->id);
    if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC, "expire", key, c->db->id);

    /* With GET, the old value sent above is the reply instead of +OK */
    if (!(flags & OBJ_SET_GET))
        addReply(c, ok_reply ? ok_reply : shared.ok);
}

Inside genericSetKey():

void genericSetKey(client *c, redisDb *db, robj *key, robj *val,
                   int keepttl, int signal) {
    int existed = 0;

    if (lookupKeyWrite(db, key) == NULL) {
        dbAdd(db, key, val);
    } else {
        existed = 1;
        dbOverwrite(db, key, val);
    }

    incrRefCount(val);
    if (!keepttl) removeExpire(db, key); /* clear old TTL on overwrite */
    if (signal) signalModifiedKey(c, db, key); /* wake WATCH'd clients */
}

void dbAdd(redisDb *db, robj *key, robj *val) {
    sds copy = sdsdup(key->ptr); /* new SDS for dict key */
    dictAdd(db->dict, copy, val); /* O(1) insert into main hash table */
    if (server.cluster_enabled)
        slotToKeyAddEntry(key->ptr, db); /* update slot→key mapping */
}
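To make the EX, NX and XX semantics concrete, here is a toy keyspace sketch. Everything in it (struct entry, find(), toy_set(), the fixed 8-slot table) is hypothetical and far simpler than Redis's main dict plus expires dict. It only illustrates three points from setGenericCommand(): the relative TTL becomes an absolute millisecond deadline, a failed NX/XX condition produces a null reply, and an expired key is treated as absent on lookup (lazy expiry, where a key counts as expired only once the current time is past its deadline).

```c
#include <assert.h>
#include <string.h>

#define MAXKEYS 8
enum { SET_NX = 1, SET_XX = 2 }; /* hypothetical flag values */

struct entry {
    char key[32];
    char val[32];
    long long expire_at_ms; /* -1 = no TTL */
    int used;
};
static struct entry db[MAXKEYS];

/* Lookup with lazy expiry: an expired key is deleted and reported absent. */
static struct entry *find(const char *key, long long now_ms) {
    for (int i = 0; i < MAXKEYS; i++) {
        if (!db[i].used || strcmp(db[i].key, key) != 0) continue;
        if (db[i].expire_at_ms != -1 && now_ms > db[i].expire_at_ms) {
            db[i].used = 0;
            return NULL;
        }
        return &db[i];
    }
    return NULL;
}

/* Returns 1 for +OK, 0 for a null reply (NX/XX failed), -1 for an error.
 * ex_seconds == NULL means no EX argument was given. */
static int toy_set(const char *key, const char *val,
                   const long long *ex_seconds, int flags, long long now_ms) {
    if (strlen(key) >= sizeof db[0].key || strlen(val) >= sizeof db[0].val)
        return -1;                      /* toy size limits */
    if (ex_seconds != NULL && *ex_seconds <= 0)
        return -1;                      /* like "invalid expire time" */

    struct entry *e = find(key, now_ms);
    if ((flags & SET_NX) && e != NULL) return 0; /* NX: key exists */
    if ((flags & SET_XX) && e == NULL) return 0; /* XX: key missing */

    if (e == NULL) {                    /* pick a free slot */
        for (int i = 0; i < MAXKEYS && e == NULL; i++)
            if (!db[i].used) e = &db[i];
        if (e == NULL) return -1;
    }
    strcpy(e->key, key);
    strcpy(e->val, val);
    /* Relative TTL becomes an absolute deadline; without EX any old TTL
       is cleared (KEEPTTL is not modelled) */
    e->expire_at_ms = ex_seconds ? now_ms + *ex_seconds * 1000 : -1;
    e->used = 1;
    return 1;
}
```

Issued at 1704067200000 ms, SET key value EX 100 stores a deadline of 1704067300000; the key is still visible at exactly that instant and gone one millisecond later.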

12.7 Step 6: String Encoding Selection

File: object.c — Function: tryObjectEncoding()

Before the value is stored, Redis attempts to encode it more compactly:

robj *tryObjectEncoding(robj *o) {
    if (!sdsEncodedObject(o)) return o; /* already INT, skip */

    sds s = o->ptr;
    size_t len = sdslen(s);

    /* Attempt integer encoding */
    long long value;
    if (len <= 20 && string2ll(s, len, &value)) {
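        /* Shared integers pool: 0 to 9999 (not used when an LRU/LFU
           maxmemory policy needs per-object access metadata) */
        if ((server.maxmemory == 0 ||
             !(server.maxmemory_policy & MAXMEMORY_FLAG_NO_SHARED_INTEGERS)) &&
            value >= 0 && value < OBJ_SHARED_INTEGERS) {
            decrRefCount(o);
            return shared.integers[value]; /* pointer to pre-allocated object */
        }
        /* Store integer directly in pointer field: no SDS needed */
        if (value >= LONG_MIN && value <= LONG_MAX) {
            if (o->encoding == OBJ_ENCODING_RAW) {
                sdsfree(o->ptr); /* SDS was a separate allocation */
                o->encoding = OBJ_ENCODING_INT;
                o->ptr = (void*)value;
                return o;
            }
            /* EMBSTR: the SDS lives inside the robj allocation and cannot
               be freed on its own, so build a new INT-encoded object */
            decrRefCount(o);
            return createStringObjectFromLongLongForValue(value);
        }
    }

    /* EMBSTR: value fits within 44 bytes
       robj header + sdshdr8 + data in one contiguous malloc() */
    if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT) { /* 44 */
        if (o->encoding == OBJ_ENCODING_EMBSTR) return o; /* already compact */
        robj *emb = createEmbeddedStringObject(s, len);
        decrRefCount(o);
        return emb;
    }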

    /* RAW: large string — robj and SDS allocated separately */
    trimStringObjectIfNeeded(o); /* shrink SDS to fit, save memory */
    return o;
}

Memory layout visualization:

Encoding INT (value = 42):
  ┌─────────────────────────────┐
  │ robj (16 bytes)             │
  │   type     = OBJ_STRING     │
  │   encoding = OBJ_ENC_INT    │
  │   refcount = 1              │
  │   ptr      = (void*)42      │  ← integer stored in pointer itself
  └─────────────────────────────┘
  Total: 16 bytes, zero extra heap allocation

Encoding EMBSTR ("hello", 5 bytes):
  ┌──────────────────────────────────────────────────┐
  │ robj (16B) + sdshdr8 (3B) + data (5B) + NUL (1B) │
  │ allocated as ONE malloc() call                   │
  └──────────────────────────────────────────────────┘
  Total: 25 bytes requested (jemalloc rounds up to its 32-byte size class),
  one allocation → CPU cache friendly

Encoding RAW ("very long string..." > 44 bytes):
  ┌────────────┐    ┌─────────────────────────────┐
  │ robj (16B) │───▶│ sdshdr (3-17B) + data + NUL │
  └────────────┘    └─────────────────────────────┘
  Two separate malloc() calls

The 44-byte EMBSTR threshold is chosen so that robj (16B) + sdshdr8 (3B) + data (44B) + NUL (1B) = 64 bytes, which fits exactly in jemalloc's 64-byte size class.
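The decision tree in tryObjectEncoding() can be approximated by a small classifier. This is a sketch under stated assumptions: strict_ll() and choose_encoding() are hypothetical names, the integer check approximates string2ll() by requiring an exact round trip (so " 42", "+1" and "007" stay strings), and the maxmemory-policy exception for shared integers is ignored. embstr_alloc_size() reproduces the 16 + 3 + len + 1 arithmetic above.

```c
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define SHARED_INTEGERS 10000 /* mirrors OBJ_SHARED_INTEGERS */
#define EMBSTR_LIMIT    44    /* mirrors OBJ_ENCODING_EMBSTR_SIZE_LIMIT */

enum enc { ENC_SHARED_INT, ENC_INT, ENC_EMBSTR, ENC_RAW };

/* Accept only canonical integers: the value must print back as the
 * exact same string (no spaces, '+', or leading zeros). */
static int strict_ll(const char *s, size_t len, long long *out) {
    char tmp[32], back[32], *end;
    if (len == 0 || len > 20) return 0;
    memcpy(tmp, s, len);
    tmp[len] = '\0';
    errno = 0;
    long long v = strtoll(tmp, &end, 10);
    if (errno != 0 || *end != '\0') return 0;
    snprintf(back, sizeof back, "%lld", v);
    if (strcmp(back, tmp) != 0) return 0;
    *out = v;
    return 1;
}

static enum enc choose_encoding(const char *s) {
    size_t len = strlen(s);
    long long v;
    if (strict_ll(s, len, &v)) {
        if (v >= 0 && v < SHARED_INTEGERS) return ENC_SHARED_INT;
        if (v >= LONG_MIN && v <= LONG_MAX) return ENC_INT;
    }
    return len <= EMBSTR_LIMIT ? ENC_EMBSTR : ENC_RAW;
}

/* Bytes requested for an EMBSTR: robj (16) + sdshdr8 (3) + data + NUL */
static size_t embstr_alloc_size(size_t len) { return 16 + 3 + len + 1; }
```

For the SET example, "value" becomes EMBSTR, while a value such as "100" would be replaced by a pointer to a shared integer object.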


12.8 Step 7: AOF Propagation

File: aof.c — Function: feedAppendOnlyFile()

void feedAppendOnlyFile(struct redisCommand *cmd, int dictid,
                        robj **argv, int argc) {
    sds buf = sdsempty();

    /* Emit SELECT if current DB changed since last AOF write */
    if (dictid != server.aof_selected_db) {
        char seldb[64];
        snprintf(seldb, sizeof(seldb), "%d", dictid);
        buf = sdscatprintf(buf, "*2\r\n$6\r\nSELECT\r\n$%zu\r\n%s\r\n",
                           strlen(seldb), seldb);
        server.aof_selected_db = dictid;
    }

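    /* For SET with a relative TTL (EX/PX), write SET key value followed
       by an absolute PEXPIREAT. Replaying a relative TTL would restart
       the countdown at replay time and extend the key's lifetime. */
    if (cmd->proc == setCommand && argc > 3) {
        robj *exarg = NULL, *pxarg = NULL;
        for (int i = 3; i < argc - 1; i++) {
            if (!strcasecmp(argv[i]->ptr, "ex")) exarg = argv[i+1];
            if (!strcasecmp(argv[i]->ptr, "px")) pxarg = argv[i+1];
        }
        if (exarg || pxarg) {
            buf = catAppendOnlyGenericCommand(buf, 3, argv); /* SET key value */
            if (exarg)
                buf = catAppendOnlyExpireAtCommand(buf, server.expireCommand,
                                                   argv[1], exarg);
            else
                buf = catAppendOnlyExpireAtCommand(buf, server.pexpireCommand,
                                                   argv[1], pxarg);
        } else {
            buf = catAppendOnlyGenericCommand(buf, argc, argv);
        }
    } else {
        /* Serialize command as RESP into buf */
        buf = catAppendOnlyGenericCommand(buf, argc, argv);
    }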

    /* Append to in-memory AOF buffer — will be flushed in beforeSleep() */
    server.aof_buf = sdscatlen(server.aof_buf, buf, sdslen(buf));
    sdsfree(buf);
}

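The AOF buffer (server.aof_buf) is written to the AOF file by flushAppendOnlyFile(), which is called from beforeSleep() (the hook that runs just before epoll_wait()). Whether the data is also forced to disk with fsync() depends on the appendfsync policy: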

void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    time_t now = server.unixtime;

    if (sdslen(server.aof_buf) == 0) {
        /* If fsync is pending from last iteration, call it now */
        if (server.aof_fsync == AOF_FSYNC_EVERYSEC && ...)
            aof_background_fsync(server.aof_fd);
        return;
    }

    /* Write buffer to kernel page cache (short-write and error
       handling omitted) */
    nwritten = aofWrite(server.aof_fd, server.aof_buf, sdslen(server.aof_buf));
    sdsclear(server.aof_buf); /* the bytes now belong to the kernel */

    /* Sync strategy */
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        /* Synchronous: blocks until data hits disk */
        redis_fsync(server.aof_fd);
        server.aof_last_fsync = now;
    } else if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
               now - server.aof_last_fsync >= 1) {
        /* Asynchronous: delegate to a background (bio) thread */
        if (!aofFsyncInProgress())
            aof_background_fsync(server.aof_fd);
        server.aof_last_fsync = now;
    }
    /* AOF_FSYNC_NO: no fsync(), OS decides when to flush page cache */
}
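The write-then-maybe-fsync pattern can be tried on an ordinary file. The sketch below is hypothetical (struct aof, aof_feed() and aof_flush() are illustrative names) and deliberately simplifies one thing: for everysec it calls fsync() inline, whereas Redis hands that fsync() to a background thread so the event loop never blocks on it.

```c
#define _XOPEN_SOURCE 700
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

/* Hypothetical names; policies mirror appendfsync always/everysec/no. */
enum fsync_policy { FSYNC_ALWAYS, FSYNC_EVERYSEC, FSYNC_NO };

struct aof {
    int fd;
    enum fsync_policy policy;
    time_t last_fsync;
    char buf[4096]; /* plays the role of server.aof_buf */
    size_t buflen;
};

/* Like feedAppendOnlyFile(): append to the in-memory buffer only. */
static int aof_feed(struct aof *a, const char *s, size_t n) {
    if (a->buflen + n > sizeof a->buf) return -1;
    memcpy(a->buf + a->buflen, s, n);
    a->buflen += n;
    return 0;
}

/* Like flushAppendOnlyFile(): one write() of everything buffered in this
 * event-loop iteration, then fsync() according to the policy. */
static int aof_flush(struct aof *a, time_t now) {
    size_t off = 0;
    while (off < a->buflen) {
        ssize_t n = write(a->fd, a->buf + off, a->buflen - off);
        if (n <= 0) return -1;
        off += (size_t)n;
    }
    a->buflen = 0;
    if (a->policy == FSYNC_ALWAYS ||
        (a->policy == FSYNC_EVERYSEC && now - a->last_fsync >= 1)) {
        if (fsync(a->fd) != 0) return -1;
        a->last_fsync = now;
    }
    return 0; /* FSYNC_NO: leave flushing to the OS */
}
```

Because a single aof_flush() covers every command fed since the previous one, the cost of an fsync() under the always policy is shared by all writes from the same loop iteration.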

AOF file content for this command:

*2\r\n$6\r\nSELECT\r\n$1\r\n0\r\n      ← SELECT 0 (if needed)
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
*3\r\n$9\r\nPEXPIREAT\r\n$3\r\nkey\r\n$13\r\n1704067300000\r\n

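Note that EX 100 has been converted to an absolute PEXPIREAT timestamp. This two-record form is what Redis 6.0 and earlier write; later versions record the same absolute deadline as a single SET key value PXAT <timestamp> command.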
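The translation itself is plain string formatting once the absolute deadline is known. aof_translate_set_ex() below is a hypothetical helper that reproduces the two AOF records shown above for a command issued at 1704067200000 ms (2024-01-01 00:00:00 UTC), assuming an EX argument given in seconds.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: turn "SET key value EX seconds", issued at now_ms,
 * into "SET key value" + "PEXPIREAT key <absolute ms>" as RESP.
 * Returns bytes written, or -1 if out is too small. */
static int aof_translate_set_ex(char *out, size_t cap, const char *key,
                                const char *val, long long seconds,
                                long long now_ms) {
    char when[32];
    int wlen = snprintf(when, sizeof when, "%lld", now_ms + seconds * 1000);
    int n = snprintf(out, cap,
        "*3\r\n$3\r\nSET\r\n$%zu\r\n%s\r\n$%zu\r\n%s\r\n"
        "*3\r\n$9\r\nPEXPIREAT\r\n$%zu\r\n%s\r\n$%d\r\n%s\r\n",
        strlen(key), key, strlen(val), val,
        strlen(key), key, wlen, when);
    return (n < 0 || (size_t)n >= cap) ? -1 : n;
}
```

Replaying these records at any later time restores the same deadline, whereas replaying SET key value EX 100 would restart the 100-second countdown at replay time.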


12.9 Step 8: Replication Propagation

File: replication.c — Function: replicationFeedSlaves()

void replicationFeedSlaves(list *slaves, int dictid,
                           robj **argv, int argc) {
    listNode *ln;
    listIter li;

    /* Nothing to do without a backlog or any replicas */
    if (server.repl_backlog == NULL && listLength(slaves) == 0) return;

    /* Build the bytes to replicate: a SELECT if the db changed,
       then the command itself, both encoded as RESP */
    sds stream = sdsempty();
    if (server.slaveseldb != dictid) {
        char llstr[LONG_STR_SIZE];
        int dictid_len = ll2string(llstr, sizeof(llstr), dictid);
        stream = sdscatprintf(stream, "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
                              dictid_len, llstr);
        server.slaveseldb = dictid;
    }
    stream = catAppendOnlyGenericCommand(stream, argc, argv);

    /* Write to replication backlog ring buffer first.
       Backlog default size: 1 MB (repl-backlog-size).
       Used for PSYNC (partial resync) when a replica reconnects. */
    if (server.repl_backlog)
        feedReplicationBacklog(stream, sdslen(stream));

    /* Push to each replica's output buffer */
    listRewind(slaves, &li);
    while ((ln = listNext(&li)) != NULL) {
        client *slave = ln->value;

        /* Skip replicas still waiting for the RDB snapshot to start.
           Replicas already receiving it buffer the stream and apply it
           once the snapshot is loaded. */
        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;

        /* Each replica has its own output buffer; the event loop sends
           it like any other client output. */
        addReplyProto(slave, stream, sdslen(stream));
    }
    sdsfree(stream);
}

Replication backlog and PSYNC:

Replication timeline:

  Master backlog (ring buffer, 1 MB):
  ┌──────────────────────────────────────────────────┐
  │ offset 1000 ... offset 5000 ... offset 8192 ...  │
  └──────────────────────────────────────────────────┘
             ▲                         ▲
   replica A (reconnecting)   replica B (online)

  Replica A sends: PSYNC <repl_id> 1000
  Master checks: does <repl_id> match, and is offset 1000 still in the backlog? Yes → send only the missing bytes
  Result: partial resync (no full RDB transfer needed)
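The offset check behind PSYNC can be modelled with a tiny ring buffer. This is a hypothetical sketch: struct backlog, backlog_feed() and backlog_psync() are illustrative names, offsets are 0-based byte positions (a simplification of Redis's own offset convention), the replication ID check is omitted, and the buffer holds only 16 bytes so that wrap-around is easy to see.

```c
#include <assert.h>
#include <string.h>

#define BACKLOG_SIZE 16 /* tiny, to force wrap-around */

struct backlog {
    char buf[BACKLOG_SIZE];
    long long master_offset; /* total bytes ever written to the stream */
    long long histlen;       /* bytes currently retained (<= BACKLOG_SIZE) */
};

/* Append stream bytes; old bytes are overwritten once the ring is full. */
static void backlog_feed(struct backlog *b, const char *p, size_t len) {
    for (size_t i = 0; i < len; i++) {
        b->buf[b->master_offset % BACKLOG_SIZE] = p[i];
        b->master_offset++;
    }
    b->histlen += (long long)len;
    if (b->histlen > BACKLOG_SIZE) b->histlen = BACKLOG_SIZE;
}

/* A replica asks for everything from offset `from` onward. Returns the
 * number of bytes copied to out, or -1 if a full resync is required
 * because those bytes are no longer (or not yet) in the backlog. */
static long long backlog_psync(const struct backlog *b, long long from,
                               char *out, size_t cap) {
    long long start = b->master_offset - b->histlen; /* oldest retained */
    if (from < start || from > b->master_offset) return -1;
    long long n = b->master_offset - from;
    if ((size_t)n > cap) return -1;
    for (long long i = 0; i < n; i++)
        out[i] = b->buf[(from + i) % BACKLOG_SIZE];
    return n;
}
```

After 20 bytes have passed through the 16-byte ring, a replica asking for offset 4 can still be served the delta, while one asking for offset 2 needs a full resync: the same trade-off repl-backlog-size controls at 1 MB scale.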

12.10 Step 9: Response Delivery

File: networking.c — Function: addReply()

void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
        _addReplyToBufferOrList(c, obj->ptr, sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        char buf[32];
        size_t len = ll2string(buf, sizeof(buf), (long)obj->ptr);
        _addReplyToBufferOrList(c, buf, len);
    }
}

static int _addReplyToBufferOrList(client *c, const char *s, size_t len) {
    /* c->buf is a fixed 16 KB inline buffer.
       Writing here avoids heap allocation for small responses.
       It is used only while the reply list is empty: once earlier
       output has overflowed to the list, new bytes must queue behind
       it so replies reach the client in order. */
    if (listLength(c->reply) == 0 && c->bufpos + len <= sizeof(c->buf)) {
        memcpy(c->buf + c->bufpos, s, len);
        c->bufpos += len;
        return C_OK;
    }
    /* Overflow to the reply list (heap-allocated SDS nodes) */
    listAddNodeTail(c->reply, sdsnewlen(s, len));
    c->reply_bytes += len;
    return C_OK;
}

shared.ok is a pre-allocated robj containing +OK\r\n (5 bytes). This avoids an allocation on every successful write command.

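In the common case the reply never waits for a writable event. prepareClientToWrite() puts the client on the server.clients_pending_write list, and before the next epoll_wait() the beforeSleep() hook calls handleClientsWithPendingWrites(), which writes c->buf straight to the socket. Only if the socket cannot accept the whole reply does Redis install sendReplyToClient() as a writable-event handler to drain the rest later. beforeSleep() calls flushAppendOnlyFile() before handleClientsWithPendingWrites(), so with AOF enabled a command's AOF record is written before its reply is sent.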
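The ordering rule for the inline buffer and the overflow list can be demonstrated with a scaled-down model. This sketch is hypothetical: struct replybuf, add_reply() and drain() are illustrative names, the inline buffer is 16 bytes instead of 16 KB, and drain() concatenates the buffer and then the list, the order in which writeToClient() sends them.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define INLINE_BUF 16 /* tiny stand-in for the 16 KB c->buf */

/* Hypothetical reply node, standing in for an entry in c->reply. */
struct node {
    struct node *next;
    size_t len;
    char data[];
};

struct replybuf {
    char buf[INLINE_BUF];
    size_t bufpos;
    struct node *head, *tail; /* overflow list */
};

static int add_reply(struct replybuf *r, const char *s, size_t len) {
    /* Inline buffer only while the list is empty, to preserve order. */
    if (r->head == NULL && r->bufpos + len <= INLINE_BUF) {
        memcpy(r->buf + r->bufpos, s, len);
        r->bufpos += len;
        return 0;
    }
    struct node *n = malloc(sizeof *n + len);
    if (n == NULL) return -1;
    n->next = NULL;
    n->len = len;
    memcpy(n->data, s, len);
    if (r->tail) r->tail->next = n; else r->head = n;
    r->tail = n;
    return 0;
}

/* Copy everything out in send order and release the list. */
static size_t drain(struct replybuf *r, char *out) {
    size_t off = r->bufpos;
    memcpy(out, r->buf, r->bufpos);
    r->bufpos = 0;
    while (r->head) {
        struct node *n = r->head;
        memcpy(out + off, n->data, n->len);
        off += n->len;
        r->head = n->next;
        free(n);
    }
    r->tail = NULL;
    return off;
}
```

Adding +OK\r\n, then a 19-byte bulk reply that overflows, then another +OK\r\n leaves the second +OK in the list even though it would fit in the buffer, so the client still receives the three replies in order.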


12.11 Complete Call Stack

epoll_wait() fires readable event
  └─ readQueryFromClient()                    networking.c:1624
       └─ processInputBuffer()                networking.c:2234
            ├─ processMultibulkBuffer()       networking.c:2057  ← RESP parse
            └─ processCommandAndResetClient()
                 └─ processCommand()          server.c:3948
                      ├─ lookupCommand()      server.c:3414      ← O(1) hash lookup
                      ├─ [auth check]
                      ├─ [oom check → freeMemoryIfNeededAndSafe()]
                      └─ call()               server.c:3633
                           ├─ setCommand()                  t_string.c:97
                           │    ├─ parseExtendedStringArgumentsOrReply()
                           │    ├─ tryObjectEncoding()
                           │    └─ setGenericCommand()
                           │         ├─ genericSetKey()
                           │         │    ├─ dbAdd()        db.c:174
                           │         │    │    └─ dictAdd() dict.c:366
                           │         │    └─ signalModifiedKey()
                           │         ├─ setExpire()         db.c:1265
                           │         └─ addReply(shared.ok) networking.c:431
                           ├─ propagate()                   server.c:3573
                           │    ├─ feedAppendOnlyFile()     aof.c:1348
                           │    └─ replicationFeedSlaves()  replication.c:522
                           └─ [update server.stat_* counters]

beforeSleep() runs before the next epoll_wait()
  ├─ flushAppendOnlyFile()                    ← AOF write (fsync per policy)
  └─ handleClientsWithPendingWrites()
       └─ writeToClient()
            └─ write(fd, c->buf, c->bufpos)
                 → client receives "+OK\r\n"

Only if the socket could not take the whole reply:
epoll_wait() fires writable event
  └─ sendReplyToClient()                      networking.c:1534
       └─ writeToClient()

12.12 Latency Breakdown

Stage                          Typical latency   Dominant cost
TCP read + buffer              0.5–2 µs          read() syscall, memcpy
RESP parsing                   0.3–1 µs          string scanning
Command lookup                 < 0.1 µs          hash table lookup
Pre-checks (no eviction)       0.1–0.5 µs        dict lookups
SET execution + encoding       0.5–2 µs          dict write, SDS ops
AOF buf append                 0.3–1 µs          sdscat
addReply                       < 0.1 µs          memcpy 5 bytes
Total (no disk I/O)            ~2–7 µs
appendfsync always penalty     +1–10 ms          fsync() disk flush
Network RTT (same datacenter)  ~200 µs           dominates end-to-end

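This explains why Redis's "slowness" is almost always the network, not the command execution: a same-datacenter round trip costs roughly 30–100× the execution time. The appendfsync always setting is the main exception. It puts an fsync() into every event-loop iteration that wrote to the AOF, so write throughput becomes bounded by disk flush latency rather than CPU, which on spinning disks can mean a drop of one to two orders of magnitude. Because one fsync() covers every command accumulated in that iteration, many concurrent clients soften the penalty somewhat.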

Pipeline optimization: pipelining (sending multiple commands before reading any responses) amortizes one network round trip across many commands, so throughput becomes limited by command execution and bandwidth rather than by RTT, all on the same single-threaded event loop.
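The effect of pipelining can be estimated with simple arithmetic. The model below is an assumption-laden sketch, not a benchmark: per_command_us() is a hypothetical helper that takes the table's ~200 µs same-datacenter RTT and ~5 µs of server-side work per command, and ignores bandwidth and syscall costs.

```c
#include <assert.h>

/* Rough model: a pipeline of n commands shares one round trip,
 * while each command still pays its own execution time. */
static double per_command_us(double rtt_us, double exec_us, int n) {
    return (rtt_us + n * exec_us) / n;
}
```

With n = 1 each command costs 205 µs (about 4,900 commands per second on one connection); with a pipeline of 100 it drops to 7 µs (about 143,000 per second).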
