Chapter 22

Network Layer Source: ae Event Loop and I/O Multiplexing

Chapter 22 — Network Layer Source Code: The ae Event Loop and I/O Multiplexing

Redis uses a single-threaded command execution model, yet delivers sub-millisecond latency while handling hundreds of thousands of requests per second. The secret lies in ae (Asynchronous Events) — a lightweight but sophisticated I/O multiplexing framework. This chapter dives deep into ae.c, ae_epoll.c, and networking.c.


22.1 The aeEventLoop Core Structure

All event loop state is centralized in the aeEventLoop struct (defined in ae.h):

/* Central state of one ae event loop: the fd-indexed file-event table, the
 * time-event list, the polling backend's private data, and the hooks that
 * aeProcessEvents runs around each poll. */
typedef struct aeEventLoop {
    int maxfd;                          // Highest fd currently registered (aeProcessEvents tests it against -1)
    int setsize;                        // Maximum number of fds we can monitor; also sizes the backend result buffer
    long long timeEventNextId;          // Auto-increment counter for time event IDs
    aeFileEvent *events;                // File event array, indexed by fd: events[fd]
    aeFiredEvent *fired;                // Filled by aeApiPoll with the events that fired in the latest poll
    aeTimeEvent *timeEventHead;         // Head of the time events linked list
    int stop;                           // Set to 1 to terminate the event loop
    void *apidata;                      // Platform-specific data (aeApiState for epoll/kqueue), set by aeApiCreate
    aeBeforeSleepProc *beforesleep;     // Called just before aeApiPoll when AE_CALL_BEFORE_SLEEP is requested
    aeBeforeSleepProc *aftersleep;      // Called just after aeApiPoll when AE_CALL_AFTER_SLEEP is requested
    int flags;                          // AE_DONT_WAIT and other flags
} aeEventLoop;

Key design decisions:

aeFileEvent: The File Event Descriptor

/* Per-fd registration record, stored at eventLoop->events[fd]. */
typedef struct aeFileEvent {
    int mask;                   // AE_READABLE | AE_WRITABLE | AE_BARRIER (AE_NONE when nothing is registered)
    aeFileProc *rfileProc;      // Read callback (typically readQueryFromClient)
    aeFileProc *wfileProc;      // Write callback (typically sendReplyToClient)
    void *clientData;           // User data passed to both callbacks (usually client*)
} aeFileEvent;

AE_BARRIER predates Redis 6.0 (it is already present in the Redis 5.0 sources). By default, when an fd is simultaneously readable and writable, the read callback runs first; AE_BARRIER inverts that order so the write callback executes before the read callback — used to ensure the AOF has been fsynced before replying to clients.

aeFiredEvent: The Ready Event

/* One ready fd as reported by the polling backend; aeApiPoll fills
 * eventLoop->fired[0..numevents-1] with these. */
typedef struct aeFiredEvent {
    int fd;     // The file descriptor that became ready
    int mask;   // Which events fired (AE_READABLE and/or AE_WRITABLE; errors/hangups are reported as both)
} aeFiredEvent;

22.2 The epoll Wrapper: ae_epoll.c

Redis's epoll wrapper is remarkably minimal — a state struct plus four operations:

// ae_epoll.c
// Backend-private state for the epoll implementation; created by aeApiCreate
// and reachable through eventLoop->apidata.
typedef struct aeApiState {
    int epfd;                   // The epoll file descriptor
    struct epoll_event *events; // Result buffer for epoll_wait (setsize entries)
} aeApiState;

// Initialization: aeApiCreate
//
// Creates the epoll instance and the result buffer later used by aeApiPoll,
// and publishes both through eventLoop->apidata.
//
// Returns 0 on success, or -1 if the epoll instance cannot be created
// (errno is left as set by epoll_create). On failure nothing is allocated
// and eventLoop->apidata is left untouched.
static int aeApiCreate(aeEventLoop *eventLoop) {
    // Create the epoll fd before allocating anything, so that a failure
    // leaves nothing to clean up.
    // The 1024 argument is historical; modern kernels ignore it
    int epfd = epoll_create(1024);
    if (epfd == -1) return -1;

    // NOTE(review): the zmalloc results are used unchecked, as before; this
    // assumes zmalloc never returns NULL — confirm against zmalloc.c.
    aeApiState *state = zmalloc(sizeof(*state));
    state->events = zmalloc(sizeof(*state->events) * eventLoop->setsize);
    state->epfd = epfd;

    eventLoop->apidata = state;
    return 0;
}

// Register or modify an fd: aeApiAddEvent
//
// Adds the events in `mask` (AE_READABLE / AE_WRITABLE) to whatever `fd` is
// already being monitored for. This function only talks to epoll; it does
// not update eventLoop->events[fd].mask itself.
//
// Returns 0 on success, or -1 if epoll_ctl fails (errno set by epoll_ctl).
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0};  // Zeroed, so ee.events starts out empty

    // Decide between EPOLL_CTL_ADD (new) and EPOLL_CTL_MOD (existing)
    int op = eventLoop->events[fd].mask == AE_NONE
             ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    mask |= eventLoop->events[fd].mask;  // Merge with existing mask
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;

    // Propagate kernel-side failures instead of always reporting success.
    if (epoll_ctl(state->epfd, op, fd, &ee) == -1) return -1;
    return 0;
}

// Remove events for a fd: aeApiDelEvent
//
// Stops monitoring `fd` for the events in `delmask`. If any registered
// events remain afterwards the epoll registration is modified; otherwise
// the fd is removed from the epoll set. The epoll_ctl result is not checked.
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
    aeApiState *api = eventLoop->apidata;
    int remaining = eventLoop->events[fd].mask & ~delmask;

    // Translate what is left of the ae mask into epoll event bits.
    struct epoll_event ev = {0};
    ev.data.fd = fd;
    if (remaining & AE_READABLE) ev.events |= EPOLLIN;
    if (remaining & AE_WRITABLE) ev.events |= EPOLLOUT;

    int op = (remaining == AE_NONE) ? EPOLL_CTL_DEL : EPOLL_CTL_MOD;
    epoll_ctl(api->epfd, op, fd, &ev);
}

// Wait for ready events: aeApiPoll
//
// Blocks in epoll_wait for at most the time given by `tvp` (NULL means block
// indefinitely) and copies every ready fd into eventLoop->fired[].
//
// Returns the number of entries written to eventLoop->fired[]. A timeout and
// an epoll_wait error both yield 0; the two are not distinguished here.
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int numevents = 0;

    // tvp == NULL means block indefinitely.
    // Microseconds are rounded UP to whole milliseconds: truncating would
    // turn any sub-millisecond wait into a timeout of 0, i.e. a busy poll
    // that returns before the requested time has elapsed.
    int timeout = tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1;

    int retval = epoll_wait(state->epfd, state->events,
                            eventLoop->setsize, timeout);

    if (retval > 0) {
        numevents = retval;
        for (int j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events + j;
            if (e->events & EPOLLIN)  mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            // Map error conditions to both readable and writable so that
            // whichever handler is registered gets a chance to run
            if (e->events & EPOLLERR) mask |= AE_READABLE | AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_READABLE | AE_WRITABLE;
            // Populate the fired array
            eventLoop->fired[j].fd   = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

Performance reference numbers:


22.3 aeProcessEvents: The Main Loop Core

aeProcessEvents is the heart of the event loop. Each iteration processes all ready I/O events and expired time events:

// Runs one iteration of the event loop: polls for ready file descriptors,
// dispatches their read/write callbacks, then runs due time events.
//
// `flags` selects what to do: AE_FILE_EVENTS / AE_TIME_EVENTS (if neither is
// set the call is a no-op), AE_DONT_WAIT (poll without blocking), and
// AE_CALL_BEFORE_SLEEP / AE_CALL_AFTER_SLEEP (run the hooks around the poll).
//
// Returns the number of fired file events handled plus whatever
// processTimeEvents reports.
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
    int processed = 0, numevents = 0;

    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    if (eventLoop->maxfd != -1 ||
        (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))) {

        // 1. Compute the epoll_wait timeout
        //    Use the nearest time event's remaining time so we don't sleep too long
        struct timeval tv, *tvp = NULL;
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) {
            aeTimeEvent *shortest = aeSearchNearestTimer(eventLoop);
            if (shortest) {
                long now_sec, now_ms;
                aeGetTime(&now_sec, &now_ms);
                tvp = &tv;
                long long ms = (shortest->when_sec - now_sec) * 1000
                             + shortest->when_ms - now_ms;
                if (ms > 0) {
                    tvp->tv_sec  = ms / 1000;
                    tvp->tv_usec = (ms % 1000) * 1000;
                } else {
                    // Time event already overdue: return immediately
                    tvp->tv_sec = tvp->tv_usec = 0;
                }
            }
        }

        //    AE_DONT_WAIT (per call or set on the loop itself) must poll with
        //    a zero timeout. Leaving tvp NULL here would make aeApiPoll block
        //    indefinitely, the exact opposite of what the flag asks for.
        if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) {
            tv.tv_sec = tv.tv_usec = 0;
            tvp = &tv;
        }

        // 2. beforesleep callback (before epoll_wait)
        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);

        // 3. aeApiPoll → epoll_wait (the core blocking point)
        numevents = aeApiPoll(eventLoop, tvp);

        // 4. aftersleep callback (after epoll_wait returns — used to collect
        //    results from I/O threads in multi-threaded mode)
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        // 5. Process fired file events
        for (int j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd   = eventLoop->fired[j].fd;
            int fired = 0;  // Number of callbacks already run for this fd

            // AE_BARRIER: write before read
            int invert = fe->mask & AE_BARRIER;

            // Normal order: read first. Both the registration (fe->mask) and
            // the poll result (mask) must contain the event.
            if (!invert && (fe->mask & mask & AE_READABLE)) {
                fe->rfileProc(eventLoop, fd, fe->clientData, mask);
                fired++;
                fe = &eventLoop->events[fd]; // Callback may modify events
            }
            if (fe->mask & mask & AE_WRITABLE) {
                // Skip the write call when the same handler already ran as
                // the read callback for this fd in this iteration.
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop, fd, fe->clientData, mask);
                    fired++;
                }
            }
            // When AE_BARRIER is set, read after write
            if (invert) {
                fe = &eventLoop->events[fd];  // Re-fetch: the write callback may have modified events
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc)) {
                    fe->rfileProc(eventLoop, fd, fe->clientData, mask);
                    fired++;
                }
            }
            processed++;
        }
    }

    // 6. Process time events
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed;
}

22.4 beforeSleep: Critical Work Before Each epoll Call

beforeSleep bridges command execution with persistence (defined in server.c):

// Hook installed as eventLoop->beforesleep: runs once per loop iteration,
// right before the loop blocks in the poll call. The steps below are an
// abridged outline (note the elided arguments in step 4), not the complete
// server.c function.
void beforeSleep(struct aeEventLoop *eventLoop) {
    // 1. Process clients whose reads were completed by I/O threads (Redis 6+)
    //    I/O threads have finished reading; main thread parses and executes here
    if (server.io_threads_num > 1)
        handleClientsWithPendingReadsUsingThreads();

    // 2. Handle clients blocked on keys.
    //    NOTE(review): this step was previously described as unblocking
    //    clients waiting on WAIT acknowledgments, but the call below is
    //    handleClientsBlockedOnKeys(); the WAIT path is presumably a separate
    //    function — confirm against server.c.
    handleClientsBlockedOnKeys();

    // 3. AOF fsync (the actual trigger point for everysec mode)
    //    Not every call triggers fsync — only if more than 1 second has elapsed
    //    NOTE(review): as written, the flush is only requested when the policy
    //    is AOF_FSYNC_EVERYSEC; confirm how other fsync policies are handled.
    if (server.aof_state == AOF_ON &&
        server.aof_fsync == AOF_FSYNC_EVERYSEC &&
        server.aof_buf != NULL) {
        flushAppendOnlyFile(0);
    }

    // 4. Send accumulated replication commands to replicas
    //    (remaining arguments are elided in this excerpt)
    replicationFeedSlaves(server.slaves, ...);

    // 5. Dispatch write replies via I/O threads (or directly in single-thread mode)
    if (server.io_threads_num > 1)
        handleClientsWithPendingWritesUsingThreads();
    else
        handleClientsWithPendingWrites();

    // 6. Free clients marked CLIENT_CLOSE_ASAP
    freeClientsInAsyncFreeQueue();
}

The Real AOF fsync Timeline

Many engineers believe appendfsync everysec fsyncs every second on the main thread. The actual sequence is:

  1. After command execution, feedAppendOnlyFile appends the command to server.aof_buf (in-memory buffer)
  2. beforeSleep calls flushAppendOnlyFile, which write()s aof_buf to the kernel buffer (not fsync yet)
  3. The bio.c background thread runs fdatasync asynchronously (non-blocking to main thread)
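  4. If a background fsync is still in progress when the next flush is due, the main thread postpones the write() for up to 2 seconds; after that it goes ahead with the write anyway and may block until the fsync finishes — which is why the worst-case data loss window for everysec mode is about 2 seconds, not 1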

22.5 The Full Connection Lifecycle

// 1. acceptTcpHandler (networking.c) — AE_READABLE callback on listening fd
//    Accepts a pending connection on `fd`, wraps the new socket in a
//    connection object, and hands it to acceptCommonHandler together with
//    the peer address string.
//    NOTE(review): abridged excerpt — cip and cport are used without visible
//    declarations, and cfd is passed on without a failure check. Confirm
//    against the full networking.c before reusing this code.
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
    acceptCommonHandler(connCreateAcceptedSocket(cfd), 0, cip);
}

// 2. acceptCommonHandler → createClient
//
// Allocates and initializes the client object for an accepted connection,
// installs readQueryFromClient as the connection's read handler, and appends
// the client to server.clients. Returns the new client.
//
// Every field that other code in this chapter reads before writing is given
// an explicit initial value here; the allocation is not assumed to be
// zero-filled.
client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(*c));

    connSetReadHandler(conn, readQueryFromClient);
    // Equivalent to: aeCreateFileEvent(el, fd, AE_READABLE, readQueryFromClient, c)
    // NOTE(review): nothing visible here associates `c` with `conn` for the
    // handler's benefit — confirm how the read handler finds its client.

    // Initialize client state
    c->conn         = conn;             // Read back as c->conn by the I/O threads
    c->flags        = 0;                // Tested by prepareClientToWrite
    c->db           = &server.db[0];    // Default database 0
    c->name         = NULL;
    c->querybuf     = sdsempty();       // Input buffer
    c->qb_pos       = 0;                // Parse offset into querybuf
    c->multibulklen = 0;                // 0 = no multibulk request in progress
    c->bulklen      = -1;               // -1 = next bulk length not parsed yet
    c->argc         = 0;
    c->argv         = NULL;
    c->reply        = listCreate();     // Output buffer linked list
    c->bufpos       = 0;                // Static output buffer write position

    listAddNodeTail(server.clients, c);
    return c;
}

Connection Lifecycle Timeline

TCP SYN → accept() → createClient() → Register AE_READABLE
    ↓
Client sends command → epoll fires AE_READABLE
    ↓
readQueryFromClient() → connRead() → processInputBuffer()
    ↓
processMultibulkBuffer() → processCommand()
    ↓
lookupCommand() → dispatch to handler (e.g., setCommand)
    ↓
addReply() → write to c->buf or c->reply list
    ↓
Register AE_WRITABLE → next iteration sendReplyToClient()
    ↓
Client closes → freeClient() → release all resources

22.6 Multi-Threaded I/O Coordination (Redis 6+)

Redis 6.0 introduced multi-threaded I/O while keeping command execution single-threaded to preserve atomicity.

Architecture

Main Thread                      I/O Threads (disabled by default)
    │
    ├─ accept new connections ──→ assign read tasks to I/O threads
    │
    ├─ spin-wait for all I/O threads to complete reads
    │   (checking io_threads_pending[id] == 0 atomically)
    │
    ├─ execute ALL commands single-threaded (atomic guarantee)
    │
    └─ assign write tasks to I/O threads → threads send responses

Key Synchronization Mechanism

// networking.c / threads_mngr.c
// Atomic task counter per I/O thread
_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];

// Main thread distributes tasks, then spin-waits
while (1) {
    unsigned long pending = 0;
    for (int j = 1; j < server.io_threads_num; j++)
        pending += io_threads_pending[j];
    if (pending == 0) break;  // All I/O threads have finished
}

I/O Thread Worker Loop

// Entry point of one I/O worker thread. `myid` carries the thread's index
// (cast from an integer), which selects its slot in io_threads_pending,
// io_threads_list, io_threads_mutex and io_threads_cond.
//
// The thread loops forever: wait for work, perform the read or write
// selected by io_threads_op on every client in its list, then publish
// completion by resetting its pending counter to 0. It never returns.
void *IOThreadMain(void *myid) {
    long id = (unsigned long)myid;

    while (1) {
        // Spin while no work is available (busy-wait for low latency)
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }

        // Still idle after spinning: sleep on the condition variable.
        // The predicate is re-tested while holding the mutex and the wait
        // sits in a loop, so a spurious wakeup, or work that arrived between
        // the spin above and taking the lock, does not put the thread to
        // sleep with work pending.
        // NOTE(review): this closes the race completely only if the producer
        // updates io_threads_pending[id] and signals while holding
        // io_threads_mutex[id] — confirm on the main-thread side.
        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            while (io_threads_pending[id] == 0) {
                pthread_cond_wait(&io_threads_cond[id], &io_threads_mutex[id]);
            }
            pthread_mutex_unlock(&io_threads_mutex[id]);
        }

        // Process assigned clients
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id], &li);
        while ((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c, 0);       // Send buffered replies
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);  // Read client data
            }
        }
        listEmpty(io_threads_list[id]);
        io_threads_pending[id] = 0;  // Signal completion
    }
}

Configuration

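# redis.conf
# (Comments go on their own lines: redis.conf does not accept a trailing
#  "# ..." after a directive.)

# Number of I/O threads (recommend: CPU count - 1)
io-threads 4

# Use I/O threads for reads (default: no, writes only)
io-threads-do-reads yes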

Measured performance data:


22.7 RESP Protocol Parsing in Detail

RESP Wire Format

*3\r\n           # Array of 3 elements
$3\r\n           # Bulk string, 3 bytes
SET\r\n
$5\r\n           # Bulk string, 5 bytes
hello\r\n
$5\r\n           # Bulk string, 5 bytes
world\r\n

processMultibulkBuffer Source

// Upper bounds applied to lengths announced by the peer.
// NOTE(review): values chosen to mirror commonly documented Redis limits
// (1M arguments, 512MB per bulk string) — confirm against server config.
enum {
    MULTIBULK_MAX_ARGS = 1024 * 1024,
    BULK_MAX_LEN       = 512 * 1024 * 1024
};

// Parses the characters in [p, end) as an unsigned decimal number no larger
// than `max` (which must be at most LONG_MAX - 9). Stores the value in *out
// and returns 0 on success; returns -1 for an empty range, any non-digit
// character (including a sign), or a value above `max`.
static int parseProtoLength(const char *p, const char *end, long max, long *out) {
    if (p == end) return -1;

    long value = 0;
    for (; p < end; p++) {
        if (*p < '0' || *p > '9') return -1;
        if (value > max / 10) return -1;     // value * 10 would exceed max
        value = value * 10 + (*p - '0');
        if (value > max) return -1;
    }
    *out = value;
    return 0;
}

// Incrementally parses one RESP multibulk request ("*<argc>\r\n" followed by
// argc "$<len>\r\n<payload>\r\n" items) from c->querybuf, starting at
// c->qb_pos, and appends each complete argument to c->argv.
//
// Parse progress lives in the client (multibulklen = arguments still
// expected, bulklen = length of the argument being read, -1 if unknown), so
// the function can be called again when more data has arrived.
//
// Returns C_OK once the whole request has been parsed, C_ERR when it cannot
// make progress. NOTE(review): C_ERR covers both "need more data" and
// "malformed header"; no way to flag a protocol error to the caller is
// visible here, so a malformed request simply never completes — confirm
// how the caller should reject such clients.
int processMultibulkBuffer(client *c) {
    // First call: parse the *<argc>\r\n header
    if (c->multibulklen == 0) {
        char *header  = c->querybuf + c->qb_pos;
        char *newline = strchr(header, '\r');
        // Require the full "\r\n". querybuf is NUL-terminated, so reading
        // newline[1] is in bounds even when '\r' is the last byte received.
        if (!newline || newline[1] != '\n') return C_ERR;

        long count;
        if (*header != '*' ||
            parseProtoLength(header + 1, newline, MULTIBULK_MAX_ARGS, &count) != 0)
            return C_ERR;

        c->qb_pos = newline - c->querybuf + 2;  // Skip \r\n
        if (count == 0) return C_OK;            // "*0\r\n": nothing to read

        // Room for every announced argument.
        // NOTE(review): assumes any argv from a previous command has been
        // released by the client-reset path (not visible here) — confirm.
        c->argv = zmalloc(sizeof(*c->argv) * (size_t)count);
        c->multibulklen = count;
    }

    // Parse each argument
    while (c->multibulklen) {
        // Parse $<len>\r\n
        if (c->bulklen == -1) {
            char *header  = c->querybuf + c->qb_pos;
            char *newline = strchr(header, '\r');
            if (!newline || newline[1] != '\n') return C_ERR;

            long len;
            if (*header != '$' ||
                parseProtoLength(header + 1, newline, BULK_MAX_LEN, &len) != 0)
                return C_ERR;

            c->bulklen = len;
            c->qb_pos = newline - c->querybuf + 2;
        }

        // Check that the full bulk string (plus \r\n) is available.
        // bulklen is known to be non-negative and bounded here, so the
        // size_t arithmetic cannot wrap.
        if (sdslen(c->querybuf) - c->qb_pos < (size_t)c->bulklen + 2)
            return C_ERR;

        // Create a redisObject and add it to argv
        c->argv[c->argc++] = createStringObject(
            c->querybuf + c->qb_pos, c->bulklen);
        c->qb_pos += c->bulklen + 2;
        c->bulklen = -1;
        c->multibulklen--;
    }

    return C_OK;
}

The addReply Family

// Queues the string held by `obj` as reply data for client `c`.
// Does nothing when prepareClientToWrite refuses the client.
void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) {
        return;
    }

    // Fast path: the static buffer (16KB, avoids allocation).
    if (_addReplyToBuffer(c, obj->ptr, sdslen(obj->ptr)) == C_OK) {
        return;
    }

    // Static buffer could not take it: append to the dynamic reply list.
    _addReplyProtoToList(c, obj->ptr, sdslen(obj->ptr));
}

// Decides whether reply data may be queued for `c`, and makes sure the
// client is scheduled for sending.
//
// Returns C_ERR when replies are switched off for the client, C_OK otherwise.
// On C_OK, a client with no pending replies that is not yet flagged
// CLIENT_PENDING_WRITE is flagged and pushed onto
// server.clients_pending_write, so that beforeSleep handles the sending.
int prepareClientToWrite(client *c) {
    if (c->flags & CLIENT_REPLY_OFF) {
        return C_ERR;
    }

    // Already has queued output, or already scheduled: nothing more to do.
    if (clientHasPendingReplies(c)) {
        return C_OK;
    }
    if (c->flags & CLIENT_PENDING_WRITE) {
        return C_OK;
    }

    c->flags |= CLIENT_PENDING_WRITE;
    listAddNodeHead(server.clients_pending_write, c);
    return C_OK;
}

22.8 Network Layer Performance Tuning

Key Configuration Parameters

# Maximum number of client connections (default: 10000)
maxclients 50000

# TCP accept backlog (must also tune /proc/sys/net/core/somaxconn)
tcp-backlog 511

# TCP keepalive interval to detect dead connections (seconds)
tcp-keepalive 300

# Client idle timeout (0 = disabled)
timeout 0

System-Level Tuning

# Increase file descriptor limit
ulimit -n 100000
echo "* soft nofile 100000" >> /etc/security/limits.conf
echo "* hard nofile 100000" >> /etc/security/limits.conf

# Tune TCP kernel parameters
sysctl -w net.core.somaxconn=65535
sysctl -w net.ipv4.tcp_max_syn_backlog=65535
sysctl -w net.ipv4.tcp_fin_timeout=15
sysctl -w net.ipv4.tcp_tw_reuse=1

# Disable Transparent Huge Pages (reduces fork COW overhead)
echo never > /sys/kernel/mm/transparent_hugepage/enabled
echo never > /sys/kernel/mm/transparent_hugepage/defrag

Latency Analysis Tools

# Redis built-in latency monitoring
redis-cli --latency              # Real-time latency measurement
redis-cli --latency-history      # Historical latency (one sample per 15s)
redis-cli --latency-dist         # Latency distribution as ASCII histogram

# Simulate latency for testing
redis-cli DEBUG SLEEP 0.1        # Force 100ms sleep

# Slow log configuration and query (threshold is in microseconds)
redis-cli CONFIG SET slowlog-log-slower-than 1000  # Log commands slower than 1ms
redis-cli CONFIG SET slowlog-max-len 256           # Keep 256 entries
redis-cli SLOWLOG GET 10                           # Retrieve last 10 slow commands
redis-cli SLOWLOG RESET                            # Clear the slow log

Chapter Summary

Rate this chapter
4.5  / 5  (8 ratings)

💬 Comments