第 22 章

网络层源码:ae 事件循环与 I/O 多路复用

第22章 网络层源码:ae 事件循环与 I/O 多路复用

Redis 是单线程命令执行模型,却能以极低延迟处理每秒数十万请求。这背后的核心是 ae(Asynchronous Events)事件循环——一个轻量但精妙的 I/O 多路复用框架。本章深入分析 ae.cae_epoll.cnetworking.c 的源码。


22.1 aeEventLoop 核心结构

事件循环的所有状态集中在 aeEventLoop 结构中(定义于 ae.h):

typedef struct aeEventLoop {
    int maxfd;                          // 当前注册的最大 fd
    int setsize;                        // 可监听的最大 fd 数量(maxclients + FDSET_INCR)
    long long timeEventNextId;          // 时间事件 ID 自增计数器
    aeFileEvent *events;                // 文件事件数组,按 fd 索引(events[fd])
    aeFiredEvent *fired;                // 本次 epoll_wait 返回的就绪事件列表
    aeTimeEvent *timeEventHead;         // 时间事件链表头(单向链表)
    int stop;                           // 设为1时停止事件循环
    void *apidata;                      // 指向 epoll/kqueue 特定数据(aeApiState)
    aeBeforeSleepProc *beforesleep;     // epoll_wait 之前的回调(beforeSleep)
    aeBeforeSleepProc *aftersleep;      // epoll_wait 之后的回调(afterSleep)
    int flags;                          // AE_DONT_WAIT 等标志位
} aeEventLoop;

关键设计决策

aeFileEvent 文件事件

typedef struct aeFileEvent {
    int mask;                   // 事件掩码:AE_READABLE | AE_WRITABLE | AE_BARRIER
    aeFileProc *rfileProc;      // 可读回调(通常为 readQueryFromClient)
    aeFileProc *wfileProc;      // 可写回调(通常为 sendReplyToClient)
    void *clientData;           // 传给回调的用户数据(通常为 client*)
} aeFileEvent;

AE_BARRIER 是 Redis 6.0 引入的特殊标志:当某个 fd 同时可读可写时,AE_BARRIER 确保先执行写回调再执行读回调——用于在回复客户端前确保 AOF 已经 fsync。

aeFiredEvent 就绪事件

typedef struct aeFiredEvent {
    int fd;     // 就绪的文件描述符
    int mask;   // 就绪的事件类型(AE_READABLE 或 AE_WRITABLE)
} aeFiredEvent;

22.2 epoll 封装:ae_epoll.c

Redis 的 epoll 封装极为精简,核心结构和三个操作:

// ae_epoll.c
typedef struct aeApiState {
    int epfd;                   // epoll 文件描述符
    struct epoll_event *events; // 每次 epoll_wait 的结果缓冲区
} aeApiState;

// 初始化:aeApiCreate
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));
    state->events = zmalloc(sizeof(struct epoll_event) * eventLoop->setsize);
    state->epfd = epoll_create(1024);  // 参数仅为历史提示,现代内核忽略
    eventLoop->apidata = state;
    return 0;
}

// 注册/修改 fd:aeApiAddEvent
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0};

    // 决定是 ADD 还是 MOD
    int op = eventLoop->events[fd].mask == AE_NONE
             ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    // 合并已有掩码与新掩码
    mask |= eventLoop->events[fd].mask;
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;

    epoll_ctl(state->epfd, op, fd, &ee);
    return 0;
}

// 删除 fd 事件:aeApiDelEvent
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0};
    int mask = eventLoop->events[fd].mask & (~delmask);  // 清除对应位

    ee.events = 0;
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;

    if (mask != AE_NONE) {
        epoll_ctl(state->epfd, EPOLL_CTL_MOD, fd, &ee);
    } else {
        epoll_ctl(state->epfd, EPOLL_CTL_DEL, fd, &ee);
    }
}

// 等待就绪:aeApiPoll
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    // tvp==NULL 表示永久阻塞
    int timeout = tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1;

    retval = epoll_wait(state->epfd, state->events,
                        eventLoop->setsize, timeout);

    if (retval > 0) {
        numevents = retval;
        for (int j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events + j;
            if (e->events & EPOLLIN)  mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_READABLE | AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_READABLE | AE_WRITABLE;
            // 填充 fired 数组
            eventLoop->fired[j].fd   = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

性能数据


22.3 aeProcessEvents:主循环核心

aeProcessEvents 是整个事件循环的核心,每次迭代处理所有就绪的 I/O 事件和到期的时间事件:

int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
    int processed = 0, numevents;

    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    if (eventLoop->maxfd != -1 || (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))) {

        // 1. 计算 epoll_wait 的超时时间
        //    取最近时间事件的剩余时间,避免 sleep 过久错过时间事件
        struct timeval tv, *tvp = NULL;
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) {
            aeTimeEvent *shortest = aeSearchNearestTimer(eventLoop);
            if (shortest) {
                long now_sec, now_ms;
                aeGetTime(&now_sec, &now_ms);
                tvp = &tv;
                long long ms = (shortest->when_sec - now_sec) * 1000
                             + shortest->when_ms - now_ms;
                if (ms > 0) {
                    tvp->tv_sec  = ms / 1000;
                    tvp->tv_usec = (ms % 1000) * 1000;
                } else {
                    // 时间事件已到期,立即返回
                    tvp->tv_sec = tvp->tv_usec = 0;
                }
            }
        }

        // 2. beforesleep 回调(epoll_wait 之前)
        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);

        // 3. aeApiPoll → epoll_wait(核心阻塞点)
        numevents = aeApiPoll(eventLoop, tvp);

        // 4. aftersleep 回调(epoll_wait 返回后,多线程IO收集结果用)
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        // 5. 处理就绪的文件事件
        for (int j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd   = eventLoop->fired[j].fd;
            int fired = 0;

            // AE_BARRIER:先写后读
            int invert = fe->mask & AE_BARRIER;

            if (!invert && (fe->mask & mask & AE_READABLE)) {
                fe->rfileProc(eventLoop, fd, fe->clientData, mask);
                fired++;
                fe = &eventLoop->events[fd]; // 回调可能修改事件
            }
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop, fd, fe->clientData, mask);
                    fired++;
                }
            }
            // AE_BARRIER 时,写完后再读
            if (invert) {
                fe = &eventLoop->events[fd];
                if ((fe->mask & mask & AE_READABLE) && (!fired || fe->wfileProc != fe->rfileProc)) {
                    fe->rfileProc(eventLoop, fd, fe->clientData, mask);
                    fired++;
                }
            }
            processed++;
        }
    }

    // 6. 处理时间事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed;
}

22.4 beforeSleep:每次 epoll 前的关键工作

beforeSleep 是连接命令执行和持久化的关键节点(server.c):

void beforeSleep(struct aeEventLoop *eventLoop) {
    // 1. 处理多线程IO读取完成的客户端命令(Redis 6+)
    //    IO线程已完成读取,主线程在此统一解析并执行命令
    if (server.io_threads_num > 1)
        handleClientsWithPendingReadsUsingThreads();

    // 2. 处理 WAIT 命令等待的从库确认
    handleClientsBlockedOnKeys();

    // 3. AOF fsync(everysec 模式的实际触发点)
    //    不是每次都 fsync,而是检查是否距上次 fsync 超过1秒
    if (server.aof_state == AOF_ON && server.aof_fsync == AOF_FSYNC_EVERYSEC &&
        server.aof_buf != NULL) {
        flushAppendOnlyFile(0);
    }

    // 4. 向从库发送积压的复制命令(replication backlog)
    replicationFeedSlaves(server.slaves, ...);

    // 5. 处理多线程IO的写回复任务
    if (server.io_threads_num > 1)
        handleClientsWithPendingWritesUsingThreads();
    else
        handleClientsWithPendingWrites();

    // 6. 关闭标记为 CLIENT_CLOSE_ASAP 的客户端
    freeClientsInAsyncFreeQueue();

    // 7. 释放 lazyfree 任务(如果有)
    // ...
}

AOF fsync 的真实时序

很多人误以为 appendfsync everysec 会每秒在主线程 fsync,实际并不是这样:

  1. 命令执行后,feedAppendOnlyFile 将命令追加到 server.aof_buf(内存缓冲区)
  2. beforeSleep 时调用 flushAppendOnlyFile,将 aof_buf 写入内核缓冲(write 系统调用)
  3. bio.c 后台线程异步执行 fdatasync(不阻塞主线程)
  4. 若上次 fsync 超过2秒未完成,主线程会等待(这是 everysec 最坏情况丢数据的原因)

22.5 连接建立全链路

// 1. acceptTcpHandler(networking.c):监听端口的 AE_READABLE 回调
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
    acceptCommonHandler(connCreateAcceptedSocket(cfd), 0, cip);
}

// 2. acceptCommonHandler → createClient
client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));
    // ...
    connSetReadHandler(conn, readQueryFromClient);  // 注册读事件
    // 等价于:aeCreateFileEvent(el, fd, AE_READABLE, readQueryFromClient, c)

    // 初始化客户端各字段
    c->db = &server.db[0];       // 默认 DB 0
    c->name = NULL;
    c->querybuf = sdsempty();
    c->argc = 0;
    c->argv = NULL;
    c->reply = listCreate();     // 输出缓冲区链表
    // ...

    listAddNodeTail(server.clients, c);  // 加入全局客户端列表
    return c;
}

完整连接生命周期

TCP SYN → accept() → createClient() → 注册 AE_READABLE
    ↓
客户端发送命令 → epoll 触发 AE_READABLE
    ↓
readQueryFromClient() → connRead() → processInputBuffer()
    ↓
processMultibulkBuffer() → processCommand()
    ↓
lookupCommand() → 调用具体命令函数(如 setCommand)
    ↓
addReply() → 写入 c->buf 或 c->reply
    ↓
注册 AE_WRITABLE → 下次循环 sendReplyToClient()
    ↓
客户端关闭 → freeClient() → 释放所有资源

22.6 多线程 I/O 协同(Redis 6+)

Redis 6.0 引入多线程 I/O,但命令执行仍然是单线程,以保证原子性。

架构设计

主线程                    IO线程(默认不启用)
  │
  ├─ accept 新连接 ─────→ 分配给 IO 线程读取
  │
  ├─ 等待所有IO线程完成读取(自旋等待 io_threads_pending[id]==0)
  │
  ├─ 单线程执行所有命令(保证原子性)
  │
  └─ 分配写任务给 IO 线程 → IO线程发送响应

关键同步机制

// threads_mngr.c / networking.c
// io_threads_pending[i] 是每个线程的任务计数器
_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];

// 主线程分配任务后,自旋等待
while(1) {
    unsigned long pending = 0;
    for (int j = 1; j < server.io_threads_num; j++)
        pending += io_threads_pending[j];
    if (pending == 0) break;  // 所有线程完成
}

配置

# redis.conf
io-threads 4              # IO 线程数(建议 = CPU核数 - 1)
io-threads-do-reads yes   # 是否用IO线程读取(默认no,仅写)

实测性能数据


22.7 RESP 协议解析详解

RESP 格式回顾

*3\r\n           # 3个参数
$3\r\n           # 第1个参数长度3
SET\r\n          # 第1个参数值
$5\r\n           # 第2个参数长度5
hello\r\n        # 第2个参数值
$5\r\n           # 第3个参数长度5
world\r\n        # 第3个参数值

processMultibulkBuffer 源码

int processMultibulkBuffer(client *c) {
    // 第一次调用:解析 *<argc>\r\n
    if (c->multibulklen == 0) {
        char *newline = strchr(c->querybuf + c->qb_pos, '\r');
        if (!newline) return C_ERR;  // 数据不完整
        c->multibulklen = atoi(c->querybuf + c->qb_pos + 1);
        c->qb_pos = newline - c->querybuf + 2;  // 跳过 \r\n
    }

    // 逐个解析参数
    while (c->multibulklen) {
        // 解析 $<len>\r\n
        if (c->bulklen == -1) {
            char *newline = strchr(c->querybuf + c->qb_pos, '\r');
            if (!newline) return C_ERR;
            c->bulklen = atoi(c->querybuf + c->qb_pos + 1);
            c->qb_pos = newline - c->querybuf + 2;
        }

        // 检查数据是否完整(含 \r\n)
        if (sdslen(c->querybuf) - c->qb_pos < (size_t)(c->bulklen + 2))
            return C_ERR;

        // 创建 redisObject 并加入 argv
        c->argv[c->argc++] = createStringObject(
            c->querybuf + c->qb_pos, c->bulklen);
        c->qb_pos += c->bulklen + 2;
        c->bulklen = -1;
        c->multibulklen--;
    }

    return C_OK;
}

addReply 系列函数

// 将响应写入客户端输出缓冲区
void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;

    // 优先写入静态缓冲区 c->buf(16KB)
    if (_addReplyToBuffer(c, obj->ptr, sdslen(obj->ptr)) != C_OK) {
        // 静态缓冲区满,写入动态链表 c->reply
        _addReplyProtoToList(c, obj->ptr, sdslen(obj->ptr));
    }
}

// prepareClientToWrite:确保注册了 AE_WRITABLE 事件
int prepareClientToWrite(client *c) {
    if (c->flags & CLIENT_REPLY_OFF) return C_ERR;

    // 将客户端加入 pending writes 队列(beforeSleep 时统一处理)
    if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_WRITE)) {
        c->flags |= CLIENT_PENDING_WRITE;
        listAddNodeHead(server.clients_pending_write, c);
    }
    return C_OK;
}

22.8 网络层性能调优

关键配置参数

# 最大客户端连接数(默认10000)
maxclients 50000

# TCP 积压队列(须同步调整 /proc/sys/net/core/somaxconn)
tcp-backlog 511

# 保活检测(防止僵尸连接)
tcp-keepalive 300

# 客户端空闲超时(0=禁用)
timeout 0

# 单次读取最大字节数(防止大命令阻塞事件循环)
# 通过 CLIENT_QUERY_BUFFER_LIMIT 宏定义,默认 1GB

系统级调优

# 提高文件描述符限制
ulimit -n 100000
echo "* soft nofile 100000" >> /etc/security/limits.conf

# 调整 TCP 内核参数
sysctl -w net.core.somaxconn=65535
sysctl -w net.ipv4.tcp_max_syn_backlog=65535
sysctl -w net.ipv4.tcp_fin_timeout=15

# 禁用 THP(透明大页)防止 fork 时 COW 开销
echo never > /sys/kernel/mm/transparent_hugepage/enabled

延迟分析

# Redis 内置延迟监控
redis-cli --latency              # 实时延迟测量
redis-cli --latency-history      # 历史延迟(每15秒一个采样点)
redis-cli --latency-dist         # 延迟分布(ASCII 图)

# DEBUG SLEEP 模拟延迟(测试)
redis-cli DEBUG SLEEP 0.1        # 模拟 100ms 延迟

# 慢日志
CONFIG SET slowlog-log-slower-than 1000  # 超过1ms记录
SLOWLOG GET 10                           # 查看最近10条

本章小结

本章评分
4.5  / 5  (8 评分)

💬 留言讨论