跳转至

非阻塞 + epoll 完整编程模式

是什么 / 解决什么问题

将非阻塞 I/O 与 epoll 结合,实现单线程(或少量线程)处理大量并发连接的事件驱动编程范式。这是 Nginx、Redis、Node.js 底层的核心模型。

使用模式

完整的单线程 echo server

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <netinet/tcp.h>

#define MAX_EVENTS 1024
#define BUF_SIZE   4096
#define PORT       8080

// 设置 fd 为非阻塞
static int set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL);
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

// 创建监听 socket
static int create_listener(void) {
    int fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);

    int on = 1;
    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
    setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on));

    struct sockaddr_in addr = {
        .sin_family = AF_INET,
        .sin_port = htons(PORT),
        .sin_addr.s_addr = INADDR_ANY,
    };
    bind(fd, (struct sockaddr *)&addr, sizeof(addr));
    listen(fd, 512);  // backlog = 512

    return fd;
}

int main(void) {
    int listen_fd = create_listener();
    int epfd = epoll_create1(EPOLL_CLOEXEC);

    struct epoll_event ev = { .events = EPOLLIN, .data.fd = listen_fd };
    epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);

    struct epoll_event events[MAX_EVENTS];

    for (;;) {
        int nfds = epoll_wait(epfd, events, MAX_EVENTS, -1);
        if (nfds == -1) {
            if (errno == EINTR) continue;
            perror("epoll_wait");
            break;
        }

        for (int i = 0; i < nfds; i++) {
            int fd = events[i].data.fd;
            uint32_t ev_mask = events[i].events;

            if (fd == listen_fd) {
                // ===== 接受新连接 =====
                for (;;) {
                    int conn = accept4(listen_fd, NULL, NULL,
                                       SOCK_NONBLOCK | SOCK_CLOEXEC);
                    if (conn == -1) {
                        if (errno == EAGAIN) break;  // 所有连接都接受完了
                        perror("accept4");
                        break;
                    }
                    // 注册新连接,ET 模式
                    struct epoll_event cev = {
                        .events = EPOLLIN | EPOLLET,
                        .data.fd = conn,
                    };
                    epoll_ctl(epfd, EPOLL_CTL_ADD, conn, &cev);
                }
            } else if (ev_mask & EPOLLIN) {
                // ===== 读事件(ET 模式:循环读到 EAGAIN)=====
                char buf[BUF_SIZE];
                for (;;) {
                    ssize_t n = read(fd, buf, sizeof(buf));
                    if (n > 0) {
                        // echo: 写回去(简化版,忽略 partial write)
                        write(fd, buf, n);
                    } else if (n == 0) {
                        // 对端关闭
                        epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
                        close(fd);
                        break;
                    } else {
                        if (errno == EAGAIN) break;  // 读完了
                        // 错误
                        epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
                        close(fd);
                        break;
                    }
                }
            }
            if (ev_mask & (EPOLLHUP | EPOLLERR)) {
                epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
                close(fd);
            }
        }
    }

    close(epfd);
    close(listen_fd);
    return 0;
}

处理 partial write(生产级写入)

上面的 echo 示例简化了写入。实际中 write 可能只写入部分数据:

struct connection {
    int fd;
    char write_buf[65536];
    size_t write_pos;   // 已写到哪里
    size_t write_len;   // 总共要写多少
};

// 有数据要发送时调用
void start_write(int epfd, struct connection *conn, const char *data, size_t len) {
    memcpy(conn->write_buf, data, len);
    conn->write_pos = 0;
    conn->write_len = len;

    // 先尝试直接写(大概率能写进去)
    flush_write(epfd, conn);
}

// 尝试把缓冲区数据写出去
void flush_write(int epfd, struct connection *conn) {
    while (conn->write_pos < conn->write_len) {
        ssize_t n = write(conn->fd,
                          conn->write_buf + conn->write_pos,
                          conn->write_len - conn->write_pos);
        if (n > 0) {
            conn->write_pos += n;
        } else if (n == -1) {
            if (errno == EAGAIN) {
                // 内核缓冲区满,注册 EPOLLOUT 等待可写
                struct epoll_event ev = {
                    .events = EPOLLIN | EPOLLOUT | EPOLLET,
                    .data.ptr = conn,
                };
                epoll_ctl(epfd, EPOLL_CTL_MOD, conn->fd, &ev);
                return;
            }
            // 真正错误,关闭连接
            close_connection(epfd, conn);
            return;
        }
    }
    // 全部写完,取消 EPOLLOUT 监听
    conn->write_pos = 0;
    conn->write_len = 0;
    struct epoll_event ev = {
        .events = EPOLLIN | EPOLLET,
        .data.ptr = conn,
    };
    epoll_ctl(epfd, EPOLL_CTL_MOD, conn->fd, &ev);
}

// 在事件循环中处理 EPOLLOUT
if (ev_mask & EPOLLOUT) {
    flush_write(epfd, conn);
}

关键模式:先尝试直接 write,写不完再注册 EPOLLOUT,可写时继续发。这避免了每次都注册 EPOLLOUT 的开销。

多线程模型

// 模型1: 每线程独立 epoll(Nginx worker 模型)
// 每个线程有自己的 epfd,通过 SO_REUSEPORT 各自 accept
void *worker(void *arg) {
    int listen_fd = create_listener();  // SO_REUSEPORT 允许多个 socket 绑同一端口
    int epfd = epoll_create1(EPOLL_CLOEXEC);
    // ... 独立事件循环
}

// 模型2: 主线程 accept,分发给 worker 线程
// 主线程 accept 后用 round-robin 分配 fd 到 worker 的 epfd
void dispatch(int conn_fd, int worker_epfd) {
    struct epoll_event ev = { .events = EPOLLIN | EPOLLET, .data.fd = conn_fd };
    epoll_ctl(worker_epfd, EPOLL_CTL_ADD, conn_fd, &ev);
    // 注意:跨线程 epoll_ctl 是线程安全的
}

行为与语义

非阻塞 fd 各操作的返回行为

操作 正常 无数据/缓冲区满 对端关闭 错误
read 返回 > 0 返回 -1, EAGAIN 返回 0 返回 -1, errno
write 返回 > 0(可能 < 请求量) 返回 -1, EAGAIN 收到 SIGPIPE / 返回 -1, EPIPE 返回 -1, errno
accept 返回新 fd 返回 -1, EAGAIN N/A 返回 -1, errno
connect 返回 0 返回 -1, EINPROGRESS N/A 返回 -1, errno

非阻塞 connect

int fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
int ret = connect(fd, addr, addrlen);
if (ret == -1 && errno == EINPROGRESS) {
    // 连接进行中,注册 EPOLLOUT 等待连接完成
    struct epoll_event ev = { .events = EPOLLOUT, .data.fd = fd };
    epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
}

// EPOLLOUT 触发后检查是否连接成功
int err;
socklen_t len = sizeof(err);
getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
if (err == 0) {
    // 连接成功
} else {
    // 连接失败,err 即错误码
}

EAGAIN vs EWOULDBLOCK

// 在 Linux 上这两个值相等
#define EAGAIN      11
#define EWOULDBLOCK EAGAIN

// 但 POSIX 允许它们不同,可移植代码应两个都检查
if (errno == EAGAIN || errno == EWOULDBLOCK) { ... }

性能考量

1. 减少系统调用次数

// 差:每次 accept 一个
int conn = accept4(listen_fd, ...);

// 好:循环 accept 直到 EAGAIN(一次 epoll_wait 处理所有排队连接)
for (;;) {
    int conn = accept4(listen_fd, ...);
    if (conn == -1 && errno == EAGAIN) break;
    // handle conn
}

2. 用 readv/writev 减少拷贝

// 散布/聚集 I/O:一次系统调用读写多个缓冲区
struct iovec iov[2] = {
    { .iov_base = header, .iov_len = header_len },
    { .iov_base = body,   .iov_len = body_len },
};
writev(fd, iov, 2);  // 一次 syscall 写两块数据

3. TCP_NODELAY 减少延迟

// 关闭 Nagle 算法,小包立即发送
int on = 1;
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on));

4. 避免不必要的 epoll_ctl

// 差:每次处理完都 MOD
// 好:只在需要改变监听事件时才 MOD

// 更好:用 EPOLLET 减少 epoll_wait 返回次数
// 最好:对于请求-响应模式,用 EPOLLONESHOT + 处理完再 MOD

5. SO_REUSEPORT 多队列负载均衡

// 多个线程各自创建 socket 绑同一端口
// 内核自动负载均衡分配新连接到不同 socket
// 避免了共享 listen_fd 的锁竞争
int on = 1;
setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on));

常见陷阱与 FAQ

1. 忘记处理 partial write

// 错误:假设 write 总能写完
write(fd, buf, len);  // 可能只写了一部分!

// 正确:检查返回值,未写完的部分缓冲起来等 EPOLLOUT

2. SIGPIPE 导致进程退出

对已关闭的 socket 写入会触发 SIGPIPE,默认行为是终止进程:

// 程序启动时忽略 SIGPIPE
signal(SIGPIPE, SIG_IGN);
// 之后 write 到已关闭连接会返回 -1, errno = EPIPE

// 或者用 MSG_NOSIGNAL 标志(仅对 send 有效)
send(fd, buf, len, MSG_NOSIGNAL);

3. 对端半关闭 (half-close)

// 对端调用了 shutdown(fd, SHUT_WR)
// 你的 read 会返回 0,但你仍然可以 write
// 这不是错误!HTTP/1.1 pipelining 等场景会用到

// 如果你想完全关闭:
if (read(fd, buf, size) == 0) {
    shutdown(fd, SHUT_WR);  // 通知对端我也不写了
    close(fd);
}

4. TIME_WAIT 导致端口占用

// 主动关闭方会进入 TIME_WAIT(2*MSL,通常 60s)
// 重启服务器时 bind 可能失败

// 解决:
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));

5. accept 后立即收到 EPOLLHUP

某些情况下(如客户端 connect 后立即 RST),accept 成功但 fd 已经不可用。始终检查事件中的 EPOLLHUP/EPOLLERR。

观测与调试

# 查看连接数和状态
ss -tnp | grep <port>

# 查看 send/recv buffer 积压
ss -tnpi dst :8080
# 输出含 send/recv buffer 使用量

# strace 看完整 I/O 模式
strace -e trace=network,epoll_wait,read,write -p <pid>

# 统计 epoll_wait 每次返回多少事件
bpftrace -e '
tracepoint:syscalls:sys_exit_epoll_wait /args->ret > 0/ {
    @events_per_wait = hist(args->ret);
}'

# 查看 read 返回 EAGAIN 的频率
bpftrace -e '
tracepoint:syscalls:sys_exit_read /args->ret == -1/ {
    @eagain = count();
}'

延伸阅读

  • man 7 epoll — epoll 完整语义
  • man 7 socket — socket 通用选项
  • Redis src/ae.c + src/ae_epoll.c — 极简事件循环实现
  • libuv 源码 — Node.js 底层跨平台事件循环
  • The C10K problem — 经典问题背景