第 22 章
网络层源码:ae 事件循环与 I/O 多路复用
第22章 网络层源码:ae 事件循环与 I/O 多路复用
Redis 是单线程命令执行模型,却能以极低延迟处理每秒数十万请求。这背后的核心是 ae(Asynchronous Events)事件循环——一个轻量但精妙的 I/O 多路复用框架。本章深入分析 ae.c、ae_epoll.c 和 networking.c 的源码。
22.1 aeEventLoop 核心结构
事件循环的所有状态集中在 aeEventLoop 结构中(定义于 ae.h):
typedef struct aeEventLoop {
int maxfd; // 当前注册的最大 fd
int setsize; // 可监听的最大 fd 数量(maxclients + FDSET_INCR)
long long timeEventNextId; // 时间事件 ID 自增计数器
aeFileEvent *events; // 文件事件数组,按 fd 索引(events[fd])
aeFiredEvent *fired; // 本次 epoll_wait 返回的就绪事件列表
aeTimeEvent *timeEventHead; // 时间事件链表头(单向链表)
int stop; // 设为1时停止事件循环
void *apidata; // 指向 epoll/kqueue 特定数据(aeApiState)
aeBeforeSleepProc *beforesleep; // epoll_wait 之前的回调(beforeSleep)
aeBeforeSleepProc *aftersleep; // epoll_wait 之后的回调(afterSleep)
int flags; // AE_DONT_WAIT 等标志位
} aeEventLoop;
关键设计决策:
events数组按 fd 索引,O(1) 查找(但上限为setsize)fired数组在每次aeApiPoll后填充,避免动态分配timeEventHead是链表而非优先队列——时间事件数量通常极少(仅serverCron等),链表遍历开销可忽略
aeFileEvent 文件事件
typedef struct aeFileEvent {
int mask; // 事件掩码:AE_READABLE | AE_WRITABLE | AE_BARRIER
aeFileProc *rfileProc; // 可读回调(通常为 readQueryFromClient)
aeFileProc *wfileProc; // 可写回调(通常为 sendReplyToClient)
void *clientData; // 传给回调的用户数据(通常为 client*)
} aeFileEvent;
AE_BARRIER 是 Redis 6.0 引入的特殊标志:当某个 fd 同时可读可写时,AE_BARRIER 确保先执行写回调再执行读回调——用于在回复客户端前确保 AOF 已经 fsync。
aeFiredEvent 就绪事件
typedef struct aeFiredEvent {
int fd; // 就绪的文件描述符
int mask; // 就绪的事件类型(AE_READABLE 或 AE_WRITABLE)
} aeFiredEvent;
22.2 epoll 封装:ae_epoll.c
Redis 的 epoll 封装极为精简,核心结构和三个操作:
// ae_epoll.c
typedef struct aeApiState {
int epfd; // epoll 文件描述符
struct epoll_event *events; // 每次 epoll_wait 的结果缓冲区
} aeApiState;
// 初始化:aeApiCreate
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
state->events = zmalloc(sizeof(struct epoll_event) * eventLoop->setsize);
state->epfd = epoll_create(1024); // 参数仅为历史提示,现代内核忽略
eventLoop->apidata = state;
return 0;
}
// 注册/修改 fd:aeApiAddEvent
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0};
// 决定是 ADD 还是 MOD
int op = eventLoop->events[fd].mask == AE_NONE
? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
// 合并已有掩码与新掩码
mask |= eventLoop->events[fd].mask;
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
epoll_ctl(state->epfd, op, fd, &ee);
return 0;
}
// 删除 fd 事件:aeApiDelEvent
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0};
int mask = eventLoop->events[fd].mask & (~delmask); // 清除对应位
ee.events = 0;
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (mask != AE_NONE) {
epoll_ctl(state->epfd, EPOLL_CTL_MOD, fd, &ee);
} else {
epoll_ctl(state->epfd, EPOLL_CTL_DEL, fd, &ee);
}
}
// 等待就绪:aeApiPoll
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
// tvp==NULL 表示永久阻塞
int timeout = tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1;
retval = epoll_wait(state->epfd, state->events,
eventLoop->setsize, timeout);
if (retval > 0) {
numevents = retval;
for (int j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events + j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_READABLE | AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_READABLE | AE_WRITABLE;
// 填充 fired 数组
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
性能数据:
epoll_wait系统调用开销:约 1–3 µs(空唤醒)epollvsselect:10,000 个连接时,epollO(活跃连接),selectO(maxfd) → 量级差异- Redis 实测:单实例 100,000 连接,epoll 空闲唤醒 CPU 消耗 < 1%
22.3 aeProcessEvents:主循环核心
aeProcessEvents 是整个事件循环的核心,每次迭代处理所有就绪的 I/O 事件和到期的时间事件:
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
int processed = 0, numevents;
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
if (eventLoop->maxfd != -1 || (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))) {
// 1. 计算 epoll_wait 的超时时间
// 取最近时间事件的剩余时间,避免 sleep 过久错过时间事件
struct timeval tv, *tvp = NULL;
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) {
aeTimeEvent *shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
long now_sec, now_ms;
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
long long ms = (shortest->when_sec - now_sec) * 1000
+ shortest->when_ms - now_ms;
if (ms > 0) {
tvp->tv_sec = ms / 1000;
tvp->tv_usec = (ms % 1000) * 1000;
} else {
// 时间事件已到期,立即返回
tvp->tv_sec = tvp->tv_usec = 0;
}
}
}
// 2. beforesleep 回调(epoll_wait 之前)
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop);
// 3. aeApiPoll → epoll_wait(核心阻塞点)
numevents = aeApiPoll(eventLoop, tvp);
// 4. aftersleep 回调(epoll_wait 返回后,多线程IO收集结果用)
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
// 5. 处理就绪的文件事件
for (int j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0;
// AE_BARRIER:先写后读
int invert = fe->mask & AE_BARRIER;
if (!invert && (fe->mask & mask & AE_READABLE)) {
fe->rfileProc(eventLoop, fd, fe->clientData, mask);
fired++;
fe = &eventLoop->events[fd]; // 回调可能修改事件
}
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop, fd, fe->clientData, mask);
fired++;
}
}
// AE_BARRIER 时,写完后再读
if (invert) {
fe = &eventLoop->events[fd];
if ((fe->mask & mask & AE_READABLE) && (!fired || fe->wfileProc != fe->rfileProc)) {
fe->rfileProc(eventLoop, fd, fe->clientData, mask);
fired++;
}
}
processed++;
}
}
// 6. 处理时间事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed;
}
22.4 beforeSleep:每次 epoll 前的关键工作
beforeSleep 是连接命令执行和持久化的关键节点(server.c):
void beforeSleep(struct aeEventLoop *eventLoop) {
// 1. 处理多线程IO读取完成的客户端命令(Redis 6+)
// IO线程已完成读取,主线程在此统一解析并执行命令
if (server.io_threads_num > 1)
handleClientsWithPendingReadsUsingThreads();
// 2. 处理 WAIT 命令等待的从库确认
handleClientsBlockedOnKeys();
// 3. AOF fsync(everysec 模式的实际触发点)
// 不是每次都 fsync,而是检查是否距上次 fsync 超过1秒
if (server.aof_state == AOF_ON && server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.aof_buf != NULL) {
flushAppendOnlyFile(0);
}
// 4. 向从库发送积压的复制命令(replication backlog)
replicationFeedSlaves(server.slaves, ...);
// 5. 处理多线程IO的写回复任务
if (server.io_threads_num > 1)
handleClientsWithPendingWritesUsingThreads();
else
handleClientsWithPendingWrites();
// 6. 关闭标记为 CLIENT_CLOSE_ASAP 的客户端
freeClientsInAsyncFreeQueue();
// 7. 释放 lazyfree 任务(如果有)
// ...
}
AOF fsync 的真实时序
很多人误以为 appendfsync everysec 会每秒在主线程 fsync,实际并不是这样:
- 命令执行后,
feedAppendOnlyFile将命令追加到server.aof_buf(内存缓冲区) beforeSleep时调用flushAppendOnlyFile,将aof_buf写入内核缓冲(write系统调用)bio.c后台线程异步执行fdatasync(不阻塞主线程)- 若上次 fsync 超过2秒未完成,主线程会等待(这是 everysec 最坏情况丢数据的原因)
22.5 连接建立全链路
// 1. acceptTcpHandler(networking.c):监听端口的 AE_READABLE 回调
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
acceptCommonHandler(connCreateAcceptedSocket(cfd), 0, cip);
}
// 2. acceptCommonHandler → createClient
client *createClient(connection *conn) {
client *c = zmalloc(sizeof(client));
// ...
connSetReadHandler(conn, readQueryFromClient); // 注册读事件
// 等价于:aeCreateFileEvent(el, fd, AE_READABLE, readQueryFromClient, c)
// 初始化客户端各字段
c->db = &server.db[0]; // 默认 DB 0
c->name = NULL;
c->querybuf = sdsempty();
c->argc = 0;
c->argv = NULL;
c->reply = listCreate(); // 输出缓冲区链表
// ...
listAddNodeTail(server.clients, c); // 加入全局客户端列表
return c;
}
完整连接生命周期
TCP SYN → accept() → createClient() → 注册 AE_READABLE
↓
客户端发送命令 → epoll 触发 AE_READABLE
↓
readQueryFromClient() → connRead() → processInputBuffer()
↓
processMultibulkBuffer() → processCommand()
↓
lookupCommand() → 调用具体命令函数(如 setCommand)
↓
addReply() → 写入 c->buf 或 c->reply
↓
注册 AE_WRITABLE → 下次循环 sendReplyToClient()
↓
客户端关闭 → freeClient() → 释放所有资源
22.6 多线程 I/O 协同(Redis 6+)
Redis 6.0 引入多线程 I/O,但命令执行仍然是单线程,以保证原子性。
架构设计
主线程 IO线程(默认不启用)
│
├─ accept 新连接 ─────→ 分配给 IO 线程读取
│
├─ 等待所有IO线程完成读取(自旋等待 io_threads_pending[id]==0)
│
├─ 单线程执行所有命令(保证原子性)
│
└─ 分配写任务给 IO 线程 → IO线程发送响应
关键同步机制
// threads_mngr.c / networking.c
// io_threads_pending[i] 是每个线程的任务计数器
_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];
// 主线程分配任务后,自旋等待
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break; // 所有线程完成
}
配置
# redis.conf
io-threads 4 # IO 线程数(建议 = CPU核数 - 1)
io-threads-do-reads yes # 是否用IO线程读取(默认no,仅写)
实测性能数据:
- 禁用多线程 IO(默认):QPS ~100,000–150,000(单线程,100字节 GET/SET)
- 启用 4 个 IO 线程:QPS ~350,000–400,000(pipeline=1)
- 启用后延迟略有上升(多线程同步开销),适合吞吐量优先场景
22.7 RESP 协议解析详解
RESP 格式回顾
*3\r\n # 3个参数
$3\r\n # 第1个参数长度3
SET\r\n # 第1个参数值
$5\r\n # 第2个参数长度5
hello\r\n # 第2个参数值
$5\r\n # 第3个参数长度5
world\r\n # 第3个参数值
processMultibulkBuffer 源码
int processMultibulkBuffer(client *c) {
// 第一次调用:解析 *<argc>\r\n
if (c->multibulklen == 0) {
char *newline = strchr(c->querybuf + c->qb_pos, '\r');
if (!newline) return C_ERR; // 数据不完整
c->multibulklen = atoi(c->querybuf + c->qb_pos + 1);
c->qb_pos = newline - c->querybuf + 2; // 跳过 \r\n
}
// 逐个解析参数
while (c->multibulklen) {
// 解析 $<len>\r\n
if (c->bulklen == -1) {
char *newline = strchr(c->querybuf + c->qb_pos, '\r');
if (!newline) return C_ERR;
c->bulklen = atoi(c->querybuf + c->qb_pos + 1);
c->qb_pos = newline - c->querybuf + 2;
}
// 检查数据是否完整(含 \r\n)
if (sdslen(c->querybuf) - c->qb_pos < (size_t)(c->bulklen + 2))
return C_ERR;
// 创建 redisObject 并加入 argv
c->argv[c->argc++] = createStringObject(
c->querybuf + c->qb_pos, c->bulklen);
c->qb_pos += c->bulklen + 2;
c->bulklen = -1;
c->multibulklen--;
}
return C_OK;
}
addReply 系列函数
// 将响应写入客户端输出缓冲区
void addReply(client *c, robj *obj) {
if (prepareClientToWrite(c) != C_OK) return;
// 优先写入静态缓冲区 c->buf(16KB)
if (_addReplyToBuffer(c, obj->ptr, sdslen(obj->ptr)) != C_OK) {
// 静态缓冲区满,写入动态链表 c->reply
_addReplyProtoToList(c, obj->ptr, sdslen(obj->ptr));
}
}
// prepareClientToWrite:确保注册了 AE_WRITABLE 事件
int prepareClientToWrite(client *c) {
if (c->flags & CLIENT_REPLY_OFF) return C_ERR;
// 将客户端加入 pending writes 队列(beforeSleep 时统一处理)
if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_WRITE)) {
c->flags |= CLIENT_PENDING_WRITE;
listAddNodeHead(server.clients_pending_write, c);
}
return C_OK;
}
22.8 网络层性能调优
关键配置参数
# 最大客户端连接数(默认10000)
maxclients 50000
# TCP 积压队列(须同步调整 /proc/sys/net/core/somaxconn)
tcp-backlog 511
# 保活检测(防止僵尸连接)
tcp-keepalive 300
# 客户端空闲超时(0=禁用)
timeout 0
# 单次读取最大字节数(防止大命令阻塞事件循环)
# 通过 CLIENT_QUERY_BUFFER_LIMIT 宏定义,默认 1GB
系统级调优
# 提高文件描述符限制
ulimit -n 100000
echo "* soft nofile 100000" >> /etc/security/limits.conf
# 调整 TCP 内核参数
sysctl -w net.core.somaxconn=65535
sysctl -w net.ipv4.tcp_max_syn_backlog=65535
sysctl -w net.ipv4.tcp_fin_timeout=15
# 禁用 THP(透明大页)防止 fork 时 COW 开销
echo never > /sys/kernel/mm/transparent_hugepage/enabled
延迟分析
# Redis 内置延迟监控
redis-cli --latency # 实时延迟测量
redis-cli --latency-history # 历史延迟(每15秒一个采样点)
redis-cli --latency-dist # 延迟分布(ASCII 图)
# DEBUG SLEEP 模拟延迟(测试)
redis-cli DEBUG SLEEP 0.1 # 模拟 100ms 延迟
# 慢日志
CONFIG SET slowlog-log-slower-than 1000 # 超过1ms记录
SLOWLOG GET 10 # 查看最近10条
本章小结
aeEventLoop用 fd 索引的数组存储文件事件,O(1) 查找,是性能的基础- epoll 封装仅封装了4个操作:create/add/del/poll,极度精简
aeProcessEvents的核心流程:计算超时 → beforesleep → epoll_wait → aftersleep → 处理 fired 事件 → 处理时间事件beforeSleep是 AOF fsync、复制发送、命令执行(多线程IO模式)的实际触发点- Redis 6+ 多线程 IO:IO线程负责读写,主线程负责执行,通过原子计数器同步
- RESP 协议解析在
processMultibulkBuffer中完成,结果存入c->argv addReply先写静态缓冲区(16KB),满了才写动态链表,减少内存分配- 多线程 IO 开启4线程时 QPS 可达 35–40万,代价是略高的延迟