Network Layer Source: ae Event Loop and I/O Multiplexing
Chapter 22 — Network Layer Source Code: The ae Event Loop and I/O Multiplexing
Redis uses a single-threaded command execution model, yet delivers sub-millisecond latency while handling hundreds of thousands of requests per second. Much of that comes down to ae (Asynchronous Events), a lightweight but carefully designed I/O multiplexing framework. This chapter walks through ae.c, ae_epoll.c, and networking.c.
22.1 The aeEventLoop Core Structure
All event loop state is centralized in the aeEventLoop struct (defined in ae.h):
typedef struct aeEventLoop {
    int maxfd;                       // Highest fd currently registered
    int setsize;                     // Maximum number of fds we can monitor
    long long timeEventNextId;       // Auto-increment counter for time event IDs
    aeFileEvent *events;             // File event array, indexed by fd: events[fd]
    aeFiredEvent *fired;             // Events fired in the latest aeApiPoll call
    aeTimeEvent *timeEventHead;      // Head of the time events linked list
    int stop;                        // Set to 1 to terminate the event loop
    void *apidata;                   // Platform-specific data (aeApiState for epoll/kqueue)
    aeBeforeSleepProc *beforesleep;  // Callback before epoll_wait (beforeSleep)
    aeBeforeSleepProc *aftersleep;   // Callback after epoll_wait (afterSleep)
    int flags;                       // AE_DONT_WAIT and other flags
} aeEventLoop;
Key design decisions:
- events is a flat array indexed by fd, giving O(1) lookup at the cost of pre-allocating setsize entries.
- fired is pre-allocated and reused each iteration to avoid dynamic allocation in the hot path.
- timeEventHead is a linked list rather than a priority queue: the number of time events is tiny (mainly serverCron), so a linear scan costs almost nothing.
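A minimal sketch of the fd-indexed design follows, assuming a simplified loop that tracks only masks and per-fd user data. The names (mini_loop, mini_register, mini_unregister) and the flag values are hypothetical; the maxfd bookkeeping follows the same idea as aeCreateFileEvent/aeDeleteFileEvent: registering is a direct array write, and only removing the highest fd triggers a downward scan.

```c
#include <stdlib.h>

/* Hypothetical mask values for this sketch (ae.h defines its own). */
#define MINI_NONE     0
#define MINI_READABLE 1
#define MINI_WRITABLE 2

typedef struct {
    int mask;            /* events registered for this fd */
    void *client_data;   /* per-fd user data, like aeFileEvent.clientData */
} mini_file_event;

typedef struct {
    int maxfd;                /* highest registered fd, -1 if none */
    int setsize;              /* capacity of the events array */
    mini_file_event *events;  /* indexed directly by fd */
} mini_loop;

/* Pre-allocate setsize slots so lookups never search or allocate. */
int mini_loop_init(mini_loop *l, int setsize) {
    l->events = calloc((size_t)setsize, sizeof(mini_file_event));
    if (l->events == NULL) return -1;
    l->setsize = setsize;
    l->maxfd = -1;
    return 0;   /* calloc zeroes every mask, i.e. MINI_NONE */
}

void mini_loop_free(mini_loop *l) {
    free(l->events);
    l->events = NULL;
}

/* O(1): the fd itself is the array index. */
int mini_register(mini_loop *l, int fd, int mask, void *data) {
    if (fd < 0 || fd >= l->setsize) return -1;   /* out of range */
    l->events[fd].mask |= mask;
    l->events[fd].client_data = data;
    if (fd > l->maxfd) l->maxfd = fd;
    return 0;
}

/* Also O(1), except when the top fd loses its last event: then scan
 * downward for the new maxfd. */
void mini_unregister(mini_loop *l, int fd, int mask) {
    if (fd < 0 || fd >= l->setsize) return;
    if (l->events[fd].mask == MINI_NONE) return;
    l->events[fd].mask &= ~mask;
    if (fd == l->maxfd && l->events[fd].mask == MINI_NONE) {
        int j;
        for (j = l->maxfd - 1; j >= 0; j--)
            if (l->events[j].mask != MINI_NONE) break;
        l->maxfd = j;   /* -1 when nothing is left */
    }
}
```

For example, registering fds 3 and 7 and then clearing both masks on fd 7 brings maxfd back to 3. The cost of this layout is memory: all setsize slots exist up front even when only a handful of fds are in use.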
aeFileEvent: The File Event Descriptor
typedef struct aeFileEvent {
    int mask;               // AE_READABLE | AE_WRITABLE | AE_BARRIER
    aeFileProc *rfileProc;  // Read callback (readQueryFromClient before 6.0; since 6.0
                            // the connection layer's handler, which then calls it)
    aeFileProc *wfileProc;  // Write callback (sendReplyToClient, reached the same way)
    void *clientData;       // User data passed to callbacks (client* before 6.0,
                            // connection* since 6.0)
} aeFileEvent;
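AE_BARRIER predates Redis 6.0 (it is already present in Redis 5.0). When an fd is simultaneously readable and writable and AE_BARRIER is set, the event loop runs the write callback before the read callback. Redis sets it on the write handler when appendfsync is always, so that a reply is never sent for a command whose AOF entry has not yet been fsynced in beforeSleep: replies already covered by the last fsync go out first, and commands read in this iteration have their replies sent only after the next beforeSleep fsync.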
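To make the ordering concrete, here is a sketch that records which callbacks would run for one fired fd, using the same invert test as aeProcessEvents. The flag values and the dispatch_order helper are hypothetical, and the sketch ignores the real loop's rule that skips a second call when rfileProc and wfileProc are the same function.

```c
/* Hypothetical flag values for this sketch; ae.h defines its own. */
#define EV_READABLE 1
#define EV_WRITABLE 2
#define EV_BARRIER  4

/* Writes the order in which read ('R') and write ('W') callbacks would run
 * for one fired fd, mirroring the invert logic in aeProcessEvents.
 *   registered: mask the fd was registered with (may include EV_BARRIER)
 *   fired:      mask reported by the poller
 *   out:        receives "RW", "WR", "R", "W", or "" */
void dispatch_order(int registered, int fired, char out[3]) {
    int n = 0;
    int invert = registered & EV_BARRIER;

    if (!invert && (registered & fired & EV_READABLE)) out[n++] = 'R';
    if (registered & fired & EV_WRITABLE) out[n++] = 'W';
    if (invert && (registered & fired & EV_READABLE)) out[n++] = 'R';
    out[n] = '\0';
}
```

Only the relative order changes: both callbacks still run in the same iteration when both events fired, which is why the barrier costs nothing when it is not needed.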
aeFiredEvent: The Ready Event
typedef struct aeFiredEvent {
    int fd;    // The file descriptor that became ready
    int mask;  // Which events fired (AE_READABLE and/or AE_WRITABLE)
} aeFiredEvent;
22.2 The epoll Wrapper: ae_epoll.c
Redis's epoll wrapper is remarkably minimal: a state struct plus four core operations (ae_epoll.c also has small aeApiResize, aeApiFree, and aeApiName helpers):
// ae_epoll.c
typedef struct aeApiState {
    int epfd;                    // The epoll file descriptor
    struct epoll_event *events;  // Result buffer for epoll_wait
} aeApiState;
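// Initialization: aeApiCreate
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));   // zmalloc aborts on OOM
    state->events = zmalloc(sizeof(struct epoll_event) * eventLoop->setsize);
    // The size argument is historical: since Linux 2.6.8 it is ignored,
    // but it must still be greater than zero
    state->epfd = epoll_create(1024);
    if (state->epfd == -1) {       // e.g. EMFILE: undo the allocations and fail
        zfree(state->events);
        zfree(state);
        return -1;
    }
    eventLoop->apidata = state;
    return 0;
}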
// Register or modify an fd: aeApiAddEvent
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0};
    // Decide between EPOLL_CTL_ADD (new) and EPOLL_CTL_MOD (existing)
    int op = eventLoop->events[fd].mask == AE_NONE
             ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    ee.events = 0;                         // Level-triggered: no EPOLLET
    mask |= eventLoop->events[fd].mask;    // Merge with existing mask
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    // Report failure (e.g. EBADF, ENOMEM) so the caller can fail too
    if (epoll_ctl(state->epfd, op, fd, &ee) == -1) return -1;
    return 0;
}
// Remove events for an fd: aeApiDelEvent
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0};
    int mask = eventLoop->events[fd].mask & (~delmask);   // Clear the bits
    ee.events = 0;
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (mask != AE_NONE) {
        // Some events remain: just update the interest set
        epoll_ctl(state->epfd, EPOLL_CTL_MOD, fd, &ee);
    } else {
        // Nothing left: remove the fd. The event pointer is ignored for DEL,
        // but kernels before 2.6.9 required it to be non-NULL
        epoll_ctl(state->epfd, EPOLL_CTL_DEL, fd, &ee);
    }
}
// Wait for ready events: aeApiPoll
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;
    // tvp == NULL means block indefinitely. Round microseconds UP so a timer
    // less than 1 ms away does not become a 0 ms timeout and a busy loop
    // (recent Redis versions round up the same way)
    int timeout = tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1;
    retval = epoll_wait(state->epfd, state->events,
                        eventLoop->setsize, timeout);
    if (retval > 0) {
        numevents = retval;
        for (int j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events + j;
            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            // Map error conditions to both readable and writable, so the
            // callback's next read()/write() surfaces the actual error
            if (e->events & EPOLLERR) mask |= AE_READABLE | AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_READABLE | AE_WRITABLE;
            // Populate the fired array
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    // retval == -1 with errno == EINTR (a signal) just reports zero events;
    // recent Redis versions panic on any other epoll_wait error
    return numevents;
}
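The same pattern can be exercised outside Redis. The sketch below is a hypothetical, Linux-only mini wrapper (mini_epoll and its functions are illustrative names, not Redis code) that keeps a per-fd mask, picks EPOLL_CTL_ADD or EPOLL_CTL_MOD the way aeApiAddEvent does, and translates epoll_wait results into a fired array. It uses epoll_create1(0), the modern equivalent of epoll_create(1024).

```c
#define _POSIX_C_SOURCE 200809L
#include <stdint.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Hypothetical flag values and sizes for this sketch. */
#define MINI_NONE     0
#define MINI_READABLE 1
#define MINI_WRITABLE 2
#define MINI_SETSIZE  64

typedef struct { int fd; int mask; } mini_fired;

typedef struct {
    int epfd;
    int masks[MINI_SETSIZE];                  /* registered mask per fd */
    struct epoll_event events[MINI_SETSIZE];  /* epoll_wait result buffer */
    mini_fired fired[MINI_SETSIZE];           /* translated results */
} mini_epoll;

int mini_epoll_init(mini_epoll *m) {
    m->epfd = epoll_create1(0);
    if (m->epfd == -1) return -1;
    for (int i = 0; i < MINI_SETSIZE; i++) m->masks[i] = MINI_NONE;
    return 0;
}

/* Same ADD-vs-MOD choice and mask merge as aeApiAddEvent (level-triggered). */
int mini_epoll_add(mini_epoll *m, int fd, int mask) {
    if (fd < 0 || fd >= MINI_SETSIZE) return -1;
    struct epoll_event ee = {0};
    int op = m->masks[fd] == MINI_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    mask |= m->masks[fd];
    if (mask & MINI_READABLE) ee.events |= EPOLLIN;
    if (mask & MINI_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(m->epfd, op, fd, &ee) == -1) return -1;
    m->masks[fd] = mask;   /* record the mask only once the kernel agreed */
    return 0;
}

/* Wait up to timeout_ms (-1 = forever) and fill m->fired like aeApiPoll.
 * Returns the number of fired entries, or -1 on error (including EINTR). */
int mini_epoll_poll(mini_epoll *m, int timeout_ms) {
    int n = epoll_wait(m->epfd, m->events, MINI_SETSIZE, timeout_ms);
    for (int j = 0; j < n; j++) {
        uint32_t ev = m->events[j].events;
        int mask = 0;
        if (ev & EPOLLIN)  mask |= MINI_READABLE;
        if (ev & EPOLLOUT) mask |= MINI_WRITABLE;
        if (ev & (EPOLLERR | EPOLLHUP)) mask |= MINI_READABLE | MINI_WRITABLE;
        m->fired[j].fd = m->events[j].data.fd;
        m->fired[j].mask = mask;
    }
    return n < 0 ? -1 : n;
}
```

With a pipe, registering the read end and writing one byte into the write end is enough to see a single READABLE entry come back; a second registration on the same fd takes the EPOLL_CTL_MOD path.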
Performance reference numbers:
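- epoll_wait syscall overhead: ~1–3 µs per wakeup (empty wake)
- epoll vs select at 10,000 connections: epoll's per-call cost grows with the number of ready fds, while select rescans and copies the whole fd set up to maxfd on every call, an orders-of-magnitude difference. select also cannot handle fds at or above FD_SETSIZE (typically 1024) at all
- Real-world Redis: single instance with 100,000 connections, epoll idle wakeups consume < 1% CPU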
22.3 aeProcessEvents: The Main Loop Core
aeProcessEvents is the heart of the event loop. Each iteration processes all ready I/O events and expired time events:
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
    int processed = 0, numevents;
    // Nothing to do? Return immediately
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        // 1. Compute the epoll_wait timeout
        // Use the nearest time event's remaining time so we don't sleep too long
        struct timeval tv, *tvp = NULL;    // NULL = block until an fd is ready
        if ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT)) {
            aeTimeEvent *shortest = aeSearchNearestTimer(eventLoop);
            if (shortest) {
                long now_sec, now_ms;
                aeGetTime(&now_sec, &now_ms);
                tvp = &tv;
                long long ms = (shortest->when_sec - now_sec) * 1000
                             + shortest->when_ms - now_ms;
                if (ms > 0) {
                    tvp->tv_sec = ms / 1000;
                    tvp->tv_usec = (ms % 1000) * 1000;
                } else {
                    // Time event already overdue: return immediately
                    tvp->tv_sec = tvp->tv_usec = 0;
                }
            }
        }
        // AE_DONT_WAIT (per call, or set on the loop) means "poll, never
        // block": force a zero timeout, since tvp == NULL would block forever
        if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) {
            tv.tv_sec = tv.tv_usec = 0;
            tvp = &tv;
        }
        // 2. beforesleep callback (before epoll_wait)
        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);
        // 3. aeApiPoll → epoll_wait (the core blocking point)
        numevents = aeApiPoll(eventLoop, tvp);
        // 4. aftersleep callback (after epoll_wait returns; in Redis 6/7 it
        //    re-acquires the module GIL when modules are loaded)
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);
        // 5. Process fired file events
        for (int j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0;                  // Callbacks already run for this fd
            // AE_BARRIER: write before read
            int invert = fe->mask & AE_BARRIER;
            if (!invert && (fe->mask & mask & AE_READABLE)) {
                fe->rfileProc(eventLoop, fd, fe->clientData, mask);
                fired++;
                fe = &eventLoop->events[fd];  // Refresh: the callback may resize events[]
            }
            if (fe->mask & mask & AE_WRITABLE) {
                // Don't call the same function twice for one fd
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop, fd, fe->clientData, mask);
                    fired++;
                }
            }
            // When AE_BARRIER is set, read after write
            if (invert) {
                fe = &eventLoop->events[fd];  // Refresh after the write callback
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc)) {
                    fe->rfileProc(eventLoop, fd, fe->clientData, mask);
                    fired++;
                }
            }
            processed++;
        }
    }
    // 6. Process time events
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
    return processed;
}
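The timeout logic in step 1 is easy to get subtly wrong, so here it is as a small pure function. This is a sketch under assumptions: times are passed in microseconds (the 6.0-era code above uses aeGetTime's seconds and milliseconds, while later versions switched to a microsecond monotonic clock), and poll_timeout_ms is a hypothetical name. It returns -1 to block, 0 to poll without sleeping, or the remaining time rounded up to whole milliseconds.

```c
/* Hypothetical helper: epoll_wait timeout (ms) for one loop iteration.
 *   dont_wait set   -> 0 (poll, never block)
 *   no timer        -> -1 (block until an fd is ready)
 *   deadline passed -> 0 (timer overdue, return immediately)
 *   otherwise       -> remaining time rounded UP to whole milliseconds */
int poll_timeout_ms(int has_timer, long long deadline_us, long long now_us,
                    int dont_wait) {
    if (dont_wait) return 0;
    if (!has_timer) return -1;
    long long remaining = deadline_us - now_us;
    if (remaining <= 0) return 0;
    return (int)((remaining + 999) / 1000);   /* ceil(remaining / 1000) */
}
```

Rounding up is the design choice that matters: truncating a 400 µs wait to a 0 ms timeout would make epoll_wait return immediately, over and over, until the timer finally expires.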
22.4 beforeSleep: Critical Work Before Each epoll Call
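beforeSleep bridges command execution with persistence and reply delivery. Below is a simplified excerpt (defined in server.c); the real function does more, and its contents vary between versions: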
void beforeSleep(struct aeEventLoop *eventLoop) {
    // 1. Multi-threaded reads (Redis 6+): hand the clients queued for reading
    //    to the I/O threads, wait for them to read and parse, then execute
    //    the parsed commands here on the main thread
    if (server.io_threads_num > 1)
        handleClientsWithPendingReadsUsingThreads();
    // 2. Serve clients blocked on keys (BLPOP, XREAD, ...) whose keys became
    //    ready; clients blocked in WAIT are handled separately by
    //    processClientsWaitingReplicas()
    handleClientsBlockedOnKeys();
    // 3. Write the AOF buffer: flushAppendOnlyFile() write()s aof_buf to the
    //    kernel, then fsyncs inline (appendfsync always) or schedules a
    //    background fsync at most once per second (everysec)
    if (server.aof_state == AOF_ON)
        flushAppendOnlyFile(0);
    // 4. If a client blocked in WAIT during the previous iteration, ask the
    //    replicas for an ACK (REPLCONF GETACK). Ordinary write commands were
    //    already appended to the replicas' output buffers during propagation
    if (server.get_ack_from_slaves)
        replicationFeedSlaves(server.slaves, ...);
    // 5. Send pending replies (replica output buffers included) via I/O
    //    threads, or directly in single-threaded mode
    if (server.io_threads_num > 1)
        handleClientsWithPendingWritesUsingThreads();
    else
        handleClientsWithPendingWrites();
    // 6. Free clients marked CLIENT_CLOSE_ASAP
    freeClientsInAsyncFreeQueue();
}
The Real AOF fsync Timeline
Many engineers believe appendfsync everysec fsyncs every second on the main thread. The actual sequence is:
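- After command execution, feedAppendOnlyFile appends the command to server.aof_buf (an in-memory buffer).
- beforeSleep calls flushAppendOnlyFile, which write()s aof_buf to the kernel page cache (no fsync yet).
- If more than a second has passed since the last fsync, flushAppendOnlyFile queues a job for the bio.c background thread, which runs fdatasync without blocking the main thread.
- If that background fsync is still running when the next write is due, the main thread postpones its write() for up to 2 seconds; after that it writes anyway, and the write() itself can block on a busy disk. This is why everysec can lose up to about 2 seconds of writes in the worst case.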
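The everysec branch of that sequence can be modeled as a small state machine. The sketch below is a simplified, hypothetical model of the decision flushAppendOnlyFile makes (aof_model, aof_everysec_step, and the enum names are invented for illustration); it assumes aof_buf is non-empty and leaves out error handling, AOF rewrites, and the always policy.

```c
#include <time.h>

/* Hypothetical state for the model (names are illustrative, not Redis's). */
typedef struct {
    time_t last_fsync;       /* second in which an fsync was last scheduled */
    time_t postponed_start;  /* 0 = no write currently being postponed */
    int fsync_in_progress;   /* is the background fsync still running? */
} aof_model;

enum aof_action {
    AOF_POSTPONE,            /* skip write() in this iteration */
    AOF_WRITE_ONLY,          /* write() to the kernel, no new fsync */
    AOF_WRITE_AND_FSYNC,     /* write() and schedule a background fsync */
};

/* One beforeSleep call under appendfsync everysec. */
enum aof_action aof_everysec_step(aof_model *m, time_t now) {
    if (m->fsync_in_progress) {
        if (m->postponed_start == 0) {
            m->postponed_start = now;     /* start the 2 s grace period */
            return AOF_POSTPONE;
        }
        if (now - m->postponed_start < 2) return AOF_POSTPONE;
        /* Already waited 2 s: write anyway (this write() may block). */
    }
    m->postponed_start = 0;
    if (now > m->last_fsync) {
        m->last_fsync = now;
        /* Never stack a second fsync on one that is still running. */
        return m->fsync_in_progress ? AOF_WRITE_ONLY : AOF_WRITE_AND_FSYNC;
    }
    return AOF_WRITE_ONLY;
}
```

Walking through it: within the same second only write() happens; a new second schedules one background fsync; if the disk is slow, writes are held back for two seconds and then forced through without waiting for the fsync.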
22.5 The Full Connection Lifecycle
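// 1. acceptTcpHandler (networking.c): AE_READABLE callback on the listening fd
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;  // up to 1000 accepts per wakeup
    char cip[NET_IP_STR_LEN];                    // peer IP, filled by anetTcpAccept
    while (max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            // EWOULDBLOCK just means the accept queue has been drained
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING, "Accepting client connection: %s",
                          server.neterr);
            return;
        }
        acceptCommonHandler(connCreateAcceptedSocket(cfd), 0, cip);
    }
}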
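// 2. acceptCommonHandler → createClient (simplified)
client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));
    if (conn) {                     // NULL conn = fake client (Lua, AOF loading)
        connNonBlock(conn);
        connEnableTcpNoDelay(conn);
        if (server.tcpkeepalive)    // tcp-keepalive setting
            connKeepAlive(conn, server.tcpkeepalive);
        connSetReadHandler(conn, readQueryFromClient);
        // Registers AE_READABLE via aeCreateFileEvent; ae calls the connection
        // layer's handler, which in turn calls readQueryFromClient
        connSetPrivateData(conn, c);
    }
    // Initialize client state
    c->conn = conn;
    c->db = &server.db[0];          // Default database 0 (real code: selectDb(c,0))
    c->name = NULL;
    c->querybuf = sdsempty();       // Input buffer
    c->qb_pos = 0;                  // Parse position inside querybuf
    c->multibulklen = 0;            // RESP parser state, see 22.7
    c->bulklen = -1;
    c->argc = 0;
    c->argv = NULL;
    c->reply = listCreate();        // Output buffer linked list
    c->bufpos = 0;                  // Static output buffer write position
    if (conn) listAddNodeTail(server.clients, c);
    return c;
}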
Connection Lifecycle Timeline
TCP SYN → accept() → createClient() → Register AE_READABLE
↓
Client sends command → epoll fires AE_READABLE
↓
readQueryFromClient() → connRead() → processInputBuffer()
↓
processMultibulkBuffer() → processCommand()
↓
lookupCommand() → dispatch to handler (e.g., setCommand)
↓
addReply() → write to c->buf or c->reply list
↓
beforeSleep → handleClientsWithPendingWrites() → writeToClient() (register AE_WRITABLE → sendReplyToClient() only if output is left over)
↓
Client closes → freeClient() → release all resources
22.6 Multi-Threaded I/O Coordination (Redis 6+)
Redis 6.0 introduced multi-threaded I/O while keeping command execution single-threaded to preserve atomicity.
Architecture
Main Thread I/O Threads (disabled by default)
│
├─ accept connections, queue readable clients ──→ assign read tasks to I/O threads
│
├─ process its own share, then spin-wait until all I/O threads finish reading and parsing
│ (checking io_threads_pending[id] == 0 atomically)
│
├─ execute ALL commands single-threaded (atomic guarantee)
│
└─ assign write tasks to I/O threads → threads send responses
Key Synchronization Mechanism
// networking.c (Redis 6.0/7.x)
// Atomic task counter per I/O thread
_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];
// Main thread distributes tasks (and handles slot 0 itself), then spin-waits
while (1) {
    unsigned long pending = 0;
    for (int j = 1; j < server.io_threads_num; j++)
        pending += io_threads_pending[j];   // Atomic load
    if (pending == 0) break;                // All I/O threads have finished
}
I/O Thread Worker Loop
void *IOThreadMain(void *myid) {
    long id = (unsigned long)myid;
    while (1) {
        // Spin while no work is available (busy-wait for low latency)
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }
        // Still idle: give the main thread a chance to park us. When threaded
        // I/O is switched off, the main thread holds io_threads_mutex[id], so
        // this lock blocks until it is released (no condition variable)
        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }
        // Process assigned clients; the main thread does not touch this list
        // until pending drops back to 0
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id], &li);
        while ((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c, 0);            // Send buffered replies
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);   // Read and parse, but not execute
            }
        }
        listEmpty(io_threads_list[id]);
        io_threads_pending[id] = 0;             // Signal completion
    }
}
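Stripped of clients and sockets, the coordination above is a fan-out/fan-in over per-thread atomic counters. Here is a hypothetical, self-contained C11 sketch (the names, the item array, and the round-robin split are illustrative): the caller sets each worker's pending counter, processes slot 0 itself as Redis's main thread does, and spins until every counter is back to zero. It busy-waits the whole time and leaves out the mutex-based parking, so it demonstrates the handshake rather than something you would leave running idle.

```c
#include <pthread.h>
#include <stdatomic.h>

#define NTHREADS 4   /* slot 0 is the calling ("main") thread */
#define NITEMS   1000

/* Hypothetical shared state for this sketch. */
static int items[NITEMS];
static _Atomic unsigned long pending[NTHREADS];  /* like io_threads_pending */
static _Atomic int stop_workers;

/* Round-robin split: slot id owns indexes id, id+NTHREADS, ..., so no two
 * threads ever touch the same element. */
static void process_slot(long id) {
    for (int i = (int)id; i < NITEMS; i += NTHREADS) items[i] *= 2;
}

static void *worker(void *arg) {
    long id = (long)arg;
    while (!atomic_load(&stop_workers)) {
        if (atomic_load(&pending[id]) == 0) continue;  /* busy-wait for work */
        process_slot(id);
        atomic_store(&pending[id], 0);                 /* signal completion */
    }
    return NULL;
}

/* Fan out one round, do slot 0 here, then spin until all counters are 0. */
void run_round(void) {
    for (long j = 1; j < NTHREADS; j++) atomic_store(&pending[j], 1);
    process_slot(0);
    for (;;) {
        unsigned long left = 0;
        for (long j = 1; j < NTHREADS; j++) left += atomic_load(&pending[j]);
        if (left == 0) break;
    }
}

int start_workers(pthread_t tids[NTHREADS]) {
    for (long j = 1; j < NTHREADS; j++)
        if (pthread_create(&tids[j], NULL, worker, (void *)j) != 0) return -1;
    return 0;
}

void stop_and_join(pthread_t tids[NTHREADS]) {
    atomic_store(&stop_workers, 1);
    for (long j = 1; j < NTHREADS; j++) pthread_join(tids[j], NULL);
}
```

The atomics do double duty: a worker's sequentially consistent store of 0 pairs with the caller's atomic load, so once run_round returns, the workers' plain writes to items are guaranteed to be visible to the caller.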
Configuration
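# redis.conf
# Note: "#" starts a comment only at the beginning of a line, so keep
# comments on their own lines (a trailing "# ..." breaks the directive).
# Number of I/O threads, counting the main thread. The redis.conf guidance
# leaves spare cores: e.g. 2-3 on a 4-core box, about 6 on an 8-core box.
io-threads 4
# Also use I/O threads for reads (default: no, i.e. threads only write)
io-threads-do-reads yes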
Indicative performance figures (they depend heavily on hardware and workload):
- Single-threaded (default): ~100,000–150,000 QPS (100-byte GET/SET, no pipeline)
- 4 I/O threads: ~350,000–400,000 QPS (pipeline=1)
- 8 I/O threads: ~500,000+ QPS (pipeline=1, on 16-core server)
- Trade-off: slightly higher per-operation latency due to thread synchronization
22.7 RESP Protocol Parsing in Detail
RESP Wire Format
*3\r\n # Array of 3 elements
$3\r\n # Bulk string, 3 bytes
SET\r\n
$5\r\n # Bulk string, 5 bytes
hello\r\n
$5\r\n # Bulk string, 5 bytes
world\r\n
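Producing this format is the mirror image of parsing it. A small hypothetical encoder (resp_encode_command is not a Redis or hiredis function) shows how each argument becomes a $<len> header plus payload. It uses strlen, so unlike real RESP it cannot carry binary data containing NUL bytes.

```c
#include <stdio.h>
#include <string.h>

/* Encode argv as a RESP array of bulk strings: "*<n>\r\n", then
 * "$<len>\r\n<bytes>\r\n" per argument. NUL-terminates the output.
 * Returns the encoded length, or -1 if out (capacity cap) is too small. */
int resp_encode_command(char *out, size_t cap, int argc, const char **argv) {
    size_t pos = 0;
    int n = snprintf(out, cap, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= cap) return -1;
    pos += (size_t)n;
    for (int i = 0; i < argc; i++) {
        size_t len = strlen(argv[i]);
        n = snprintf(out + pos, cap - pos, "$%zu\r\n", len);
        if (n < 0 || (size_t)n >= cap - pos) return -1;
        pos += (size_t)n;
        if (pos + len + 2 >= cap) return -1;   /* payload + CRLF + NUL */
        memcpy(out + pos, argv[i], len);
        pos += len;
        out[pos++] = '\r';
        out[pos++] = '\n';
    }
    out[pos] = '\0';
    return (int)pos;
}
```

Encoding {"SET", "hello", "world"} reproduces the 35-byte frame shown above byte for byte.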
processMultibulkBuffer Source
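int processMultibulkBuffer(client *c) {
    // C_ERR here means "incomplete, wait for more bytes"; the real function
    // also reports protocol errors and enforces size limits
    // First call: parse the *<argc>\r\n header
    if (c->multibulklen == 0) {
        char *newline = strchr(c->querybuf + c->qb_pos, '\r');
        if (!newline) return C_ERR;              // Header line not complete yet
        // Real code checks the '*' prefix and uses string2ll instead of atoi
        int ll = atoi(c->querybuf + c->qb_pos + 1);
        c->qb_pos = newline - c->querybuf + 2;   // Skip \r\n
        if (ll <= 0) return C_OK;                // "*0": empty command, nothing to run
        c->multibulklen = ll;
        // argv starts out NULL: allocate room for every argument up front
        c->argv = zmalloc(sizeof(robj*) * c->multibulklen);
        c->argc = 0;
    }
    // Parse each argument
    while (c->multibulklen) {
        // Parse $<len>\r\n
        if (c->bulklen == -1) {
            char *newline = strchr(c->querybuf + c->qb_pos, '\r');
            if (!newline) return C_ERR;
            // Real code replies with a protocol error unless this byte is '$'
            c->bulklen = atoi(c->querybuf + c->qb_pos + 1);
            c->qb_pos = newline - c->querybuf + 2;
        }
        // Check that the full bulk string (plus \r\n) is available
        if (sdslen(c->querybuf) - c->qb_pos < (size_t)(c->bulklen + 2))
            return C_ERR;
        // Create a redisObject and add it to argv
        c->argv[c->argc++] = createStringObject(
            c->querybuf + c->qb_pos, c->bulklen);
        c->qb_pos += c->bulklen + 2;
        c->bulklen = -1;
        c->multibulklen--;
    }
    return C_OK;   // Complete command in c->argv[0..argc-1]
}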
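The key property of processMultibulkBuffer is that it is resumable: all parser state lives in the client (qb_pos, multibulklen, bulklen), so a command split across several TCP reads is picked up exactly where it stopped. The standalone sketch below models that with a hypothetical parser struct instead of client and sds. It records argument offsets instead of creating objects, scans with memchr over an explicit length, checks the * and $ prefixes, and rejects *0 (which Redis treats as an empty command).

```c
#include <stddef.h>
#include <string.h>

#define MAX_ARGS 16

/* Hypothetical parser state, modeled on the client fields used above. */
typedef struct {
    const char *buf;    /* accumulated input (need not be NUL-terminated) */
    size_t len;         /* bytes currently available in buf */
    size_t pos;         /* like c->qb_pos */
    long multibulklen;  /* arguments still to read; 0 = header not parsed */
    long bulklen;       /* -1 = next "$<len>" line not parsed yet */
    int argc;
    size_t arg_off[MAX_ARGS], arg_len[MAX_ARGS];
} parser;

/* Parse "<prefix><digits>\r\n" at p->pos. Returns 1 and sets *out on
 * success, 0 if the line is not complete yet, -1 on a protocol error. */
static int parse_len_line(parser *p, char prefix, long *out) {
    const char *start = p->buf + p->pos;
    const char *cr = memchr(start, '\r', p->len - p->pos);
    if (cr == NULL || (size_t)(cr - p->buf) + 1 >= p->len) return 0;
    if (start[0] != prefix || cr[1] != '\n' || cr == start + 1) return -1;
    long v = 0;
    for (const char *d = start + 1; d < cr; d++) {
        if (*d < '0' || *d > '9') return -1;
        if (v > 100000000L) return -1;   /* sanity cap for the sketch */
        v = v * 10 + (*d - '0');
    }
    *out = v;
    p->pos = (size_t)(cr - p->buf) + 2;  /* skip past "\r\n" */
    return 1;
}

/* Returns 1 when a full command is parsed, 0 when more bytes are needed
 * (call again after more data arrives), -1 on a protocol error. */
int parse_multibulk(parser *p) {
    if (p->multibulklen == 0) {
        long n;
        int r = parse_len_line(p, '*', &n);
        if (r <= 0) return r;
        if (n < 1 || n > MAX_ARGS) return -1;
        p->multibulklen = n;
        p->argc = 0;
        p->bulklen = -1;
    }
    while (p->multibulklen > 0) {
        if (p->bulklen == -1) {
            int r = parse_len_line(p, '$', &p->bulklen);
            if (r <= 0) return r;
        }
        /* Wait until the payload and its trailing "\r\n" are all here. */
        if (p->len - p->pos < (size_t)p->bulklen + 2) return 0;
        p->arg_off[p->argc] = p->pos;
        p->arg_len[p->argc] = (size_t)p->bulklen;
        p->argc++;
        p->pos += (size_t)p->bulklen + 2;
        p->bulklen = -1;
        p->multibulklen--;
    }
    return 1;
}
```

Feeding the SET frame from the wire-format example in two pieces (first 10 bytes, then the rest) returns 0 and then 1, with the three arguments recovered intact.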
The addReply Family
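void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;
    if (sdsEncodedObject(obj)) {
        // String payload: first try the static buffer (16KB, no allocation)
        if (_addReplyToBuffer(c, obj->ptr, sdslen(obj->ptr)) != C_OK)
            // Static buffer full (or reply list already in use): use the list
            _addReplyProtoToList(c, obj->ptr, sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        // Integer-encoded object: ptr holds the number itself, not an sds
        char buf[32];
        size_t len = ll2string(buf, sizeof(buf), (long)obj->ptr);
        if (_addReplyToBuffer(c, buf, len) != C_OK)
            _addReplyProtoToList(c, buf, len);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}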
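int prepareClientToWrite(client *c) {
    // CLIENT REPLY OFF/SKIP: the client asked not to receive replies
    if (c->flags & (CLIENT_REPLY_OFF | CLIENT_REPLY_SKIP)) return C_ERR;
    if (!c->conn) return C_ERR;   // Fake client (e.g. AOF loading): no socket
    // Enqueue in pending-write list so beforeSleep handles sending
    if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_WRITE)) {
        c->flags |= CLIENT_PENDING_WRITE;
        listAddNodeHead(server.clients_pending_write, c);
    }
    return C_OK;
}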
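The buffer-then-list strategy can be sketched independently of Redis. In this hypothetical version the static buffer is only 16 bytes so the fallback is easy to trigger, each chunk goes entirely to one place as in the addReply shown above, and, like Redis's _addReplyToBuffer, the fast path is refused once the list holds anything so output bytes can never be reordered. Real Redis also fills spare room in the tail node and allocates nodes of at least 16KB; this sketch does neither.

```c
#include <stdlib.h>
#include <string.h>

#define STATIC_BUF 16   /* tiny on purpose; Redis uses a 16KB buffer */

/* Hypothetical overflow node. */
typedef struct reply_node {
    struct reply_node *next;
    size_t len;
    char data[];        /* flexible array member holding the chunk */
} reply_node;

typedef struct {
    char buf[STATIC_BUF];
    size_t bufpos;            /* like c->bufpos */
    reply_node *head, *tail;  /* like c->reply */
    size_t list_items;
} reply_out;

/* Fast path: copy into the fixed buffer, but only while the list is empty. */
static int add_to_buffer(reply_out *o, const char *s, size_t len) {
    if (o->list_items > 0) return -1;          /* keep bytes in order */
    if (len > STATIC_BUF - o->bufpos) return -1;
    memcpy(o->buf + o->bufpos, s, len);
    o->bufpos += len;
    return 0;
}

/* Slow path: one heap node per chunk, appended at the tail. */
static int add_to_list(reply_out *o, const char *s, size_t len) {
    reply_node *n = malloc(sizeof(*n) + len);
    if (n == NULL) return -1;
    n->next = NULL;
    n->len = len;
    memcpy(n->data, s, len);
    if (o->tail) o->tail->next = n; else o->head = n;
    o->tail = n;
    o->list_items++;
    return 0;
}

int add_reply(reply_out *o, const char *s, size_t len) {
    if (add_to_buffer(o, s, len) == 0) return 0;
    return add_to_list(o, s, len);
}

void reply_out_free(reply_out *o) {
    reply_node *n = o->head;
    while (n) { reply_node *next = n->next; free(n); n = next; }
    o->head = o->tail = NULL;
    o->list_items = 0;
}
```

A short "+OK\r\n" lands in the buffer, a 19-byte bulk reply overflows into the list, and a following 4-byte ":1\r\n" also goes to the list even though it would fit in the buffer, preserving reply order.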
22.8 Network Layer Performance Tuning
Key Configuration Parameters
# Maximum number of client connections (default: 10000)
maxclients 50000
# TCP accept backlog (must also tune /proc/sys/net/core/somaxconn)
tcp-backlog 511
# TCP keepalive interval to detect dead connections (seconds)
tcp-keepalive 300
# Client idle timeout (0 = disabled)
timeout 0
System-Level Tuning
# Increase the file descriptor limit (ulimit -n applies to the current shell; limits.conf to new login sessions)
ulimit -n 100000
echo "* soft nofile 100000" >> /etc/security/limits.conf
echo "* hard nofile 100000" >> /etc/security/limits.conf
# Tune TCP kernel parameters
sysctl -w net.core.somaxconn=65535
sysctl -w net.ipv4.tcp_max_syn_backlog=65535
sysctl -w net.ipv4.tcp_fin_timeout=15
sysctl -w net.ipv4.tcp_tw_reuse=1
# Disable Transparent Huge Pages (reduces fork COW overhead)
echo never > /sys/kernel/mm/transparent_hugepage/enabled
echo never > /sys/kernel/mm/transparent_hugepage/defrag
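The descriptor limit and maxclients are linked: every client costs one fd, and the server needs a few more for itself. Redis checks this at startup in adjustOpenFilesLimit() (server.c), reserving a fixed number of descriptors (CONFIG_MIN_RESERVED_FDS) on top of maxclients. The sketch below is a simplified, hypothetical version of that check: RESERVED_FDS = 32 is an assumption modeled on that constant, and unlike Redis it makes a single attempt, raising the soft limit no higher than the hard limit.

```c
#define _POSIX_C_SOURCE 200809L
#include <sys/resource.h>

/* Assumption: a fixed reserve modeled on Redis's CONFIG_MIN_RESERVED_FDS. */
#define RESERVED_FDS 32

/* How many clients fit under a given descriptor limit. */
long clients_for_limit(long nofile_limit) {
    long n = nofile_limit - RESERVED_FDS;
    return n > 0 ? n : 0;
}

/* Try once to raise the soft RLIMIT_NOFILE so maxclients fits (never above
 * the hard limit) and return the maxclients value that is usable, or -1 if
 * the limit cannot be read. */
long fit_maxclients(long maxclients) {
    struct rlimit rl;
    if (getrlimit(RLIMIT_NOFILE, &rl) == -1) return -1;
    rlim_t want = (rlim_t)maxclients + RESERVED_FDS;
    if (rl.rlim_cur != RLIM_INFINITY && rl.rlim_cur < want) {
        rlim_t cap = (rl.rlim_max == RLIM_INFINITY || rl.rlim_max >= want)
                         ? want : rl.rlim_max;
        struct rlimit nl = { .rlim_cur = cap, .rlim_max = rl.rlim_max };
        if (setrlimit(RLIMIT_NOFILE, &nl) == 0) rl.rlim_cur = cap;
    }
    if (rl.rlim_cur == RLIM_INFINITY || rl.rlim_cur >= want) return maxclients;
    return clients_for_limit((long)rl.rlim_cur);
}
```

With maxclients 50000 and the limits shown above (100000), the check passes untouched; with a default soft limit of 1024 and a low hard limit, the usable client count shrinks, which is the situation in which Redis reduces maxclients and logs a warning.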
Latency Analysis Tools
# Redis built-in latency monitoring
redis-cli --latency # Real-time latency measurement
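redis-cli --latency-history    # Like --latency, but starts a new sampling window every 15 s (change with -i)
redis-cli --latency-dist       # Latency shown as a spectrum (needs a 256-color terminal)
# Simulate latency for testing (Redis 7+ rejects DEBUG unless enable-debug-command permits it)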
redis-cli DEBUG SLEEP 0.1 # Force 100ms sleep
# Slow log configuration and query
CONFIG SET slowlog-log-slower-than 1000 # Log commands slower than 1ms
CONFIG SET slowlog-max-len 256 # Keep 256 entries
SLOWLOG GET 10 # Retrieve last 10 slow commands
SLOWLOG RESET # Clear the slow log
Chapter Summary
- aeEventLoop stores file events in an fd-indexed array for O(1) lookup, the foundation of its performance
- The epoll wrapper exposes four core operations (create/add/del/poll), kept intentionally minimal
- aeProcessEvents flow: compute timeout → beforesleep → epoll_wait → aftersleep → process fired events → process time events
- beforeSleep is the real trigger point for AOF write/fsync scheduling, for sending pending replies (replica streams included), and for command execution in multi-threaded I/O mode
- Redis 6+ multi-threaded I/O: I/O threads read, parse, and write; the main thread executes commands; the two sides synchronize via atomic counters
- RESP parsing occurs in processMultibulkBuffer, and the results are stored in c->argv
- addReply writes to a static 16KB buffer first and falls back to a dynamic list, minimizing allocations
- 4 I/O threads yield a ~3.5x throughput improvement at the cost of marginally higher per-operation latency