非阻塞 + epoll 完整编程模式¶
是什么 / 解决什么问题¶
将非阻塞 I/O 与 epoll 结合,实现单线程(或少量线程)处理大量并发连接的事件驱动编程范式。这是 Nginx、Redis、Node.js 底层的核心模型。
使用模式¶
完整的单线程 echo server¶
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#define MAX_EVENTS 1024
#define BUF_SIZE 4096
#define PORT 8080
// 设置 fd 为非阻塞
static int set_nonblocking(int fd) {
int flags = fcntl(fd, F_GETFL);
return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
// 创建监听 socket
static int create_listener(void) {
int fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
int on = 1;
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on));
struct sockaddr_in addr = {
.sin_family = AF_INET,
.sin_port = htons(PORT),
.sin_addr.s_addr = INADDR_ANY,
};
bind(fd, (struct sockaddr *)&addr, sizeof(addr));
listen(fd, 512); // backlog = 512
return fd;
}
int main(void) {
int listen_fd = create_listener();
int epfd = epoll_create1(EPOLL_CLOEXEC);
struct epoll_event ev = { .events = EPOLLIN, .data.fd = listen_fd };
epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);
struct epoll_event events[MAX_EVENTS];
for (;;) {
int nfds = epoll_wait(epfd, events, MAX_EVENTS, -1);
if (nfds == -1) {
if (errno == EINTR) continue;
perror("epoll_wait");
break;
}
for (int i = 0; i < nfds; i++) {
int fd = events[i].data.fd;
uint32_t ev_mask = events[i].events;
if (fd == listen_fd) {
// ===== 接受新连接 =====
for (;;) {
int conn = accept4(listen_fd, NULL, NULL,
SOCK_NONBLOCK | SOCK_CLOEXEC);
if (conn == -1) {
if (errno == EAGAIN) break; // 所有连接都接受完了
perror("accept4");
break;
}
// 注册新连接,ET 模式
struct epoll_event cev = {
.events = EPOLLIN | EPOLLET,
.data.fd = conn,
};
epoll_ctl(epfd, EPOLL_CTL_ADD, conn, &cev);
}
} else if (ev_mask & EPOLLIN) {
// ===== 读事件(ET 模式:循环读到 EAGAIN)=====
char buf[BUF_SIZE];
for (;;) {
ssize_t n = read(fd, buf, sizeof(buf));
if (n > 0) {
// echo: 写回去(简化版,忽略 partial write)
write(fd, buf, n);
} else if (n == 0) {
// 对端关闭
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
break;
} else {
if (errno == EAGAIN) break; // 读完了
// 错误
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
break;
}
}
}
if (ev_mask & (EPOLLHUP | EPOLLERR)) {
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
}
}
}
close(epfd);
close(listen_fd);
return 0;
}
处理 partial write(生产级写入)¶
上面的 echo 示例简化了写入。实际中 write 可能只写入部分数据:
struct connection {
int fd;
char write_buf[65536];
size_t write_pos; // 已写到哪里
size_t write_len; // 总共要写多少
};
// 有数据要发送时调用
void start_write(int epfd, struct connection *conn, const char *data, size_t len) {
memcpy(conn->write_buf, data, len);
conn->write_pos = 0;
conn->write_len = len;
// 先尝试直接写(大概率能写进去)
flush_write(epfd, conn);
}
// 尝试把缓冲区数据写出去
void flush_write(int epfd, struct connection *conn) {
while (conn->write_pos < conn->write_len) {
ssize_t n = write(conn->fd,
conn->write_buf + conn->write_pos,
conn->write_len - conn->write_pos);
if (n > 0) {
conn->write_pos += n;
} else if (n == -1) {
if (errno == EAGAIN) {
// 内核缓冲区满,注册 EPOLLOUT 等待可写
struct epoll_event ev = {
.events = EPOLLIN | EPOLLOUT | EPOLLET,
.data.ptr = conn,
};
epoll_ctl(epfd, EPOLL_CTL_MOD, conn->fd, &ev);
return;
}
// 真正错误,关闭连接
close_connection(epfd, conn);
return;
}
}
// 全部写完,取消 EPOLLOUT 监听
conn->write_pos = 0;
conn->write_len = 0;
struct epoll_event ev = {
.events = EPOLLIN | EPOLLET,
.data.ptr = conn,
};
epoll_ctl(epfd, EPOLL_CTL_MOD, conn->fd, &ev);
}
// 在事件循环中处理 EPOLLOUT
if (ev_mask & EPOLLOUT) {
flush_write(epfd, conn);
}
关键模式:先尝试直接 write,写不完再注册 EPOLLOUT,可写时继续发。这避免了每次都注册 EPOLLOUT 的开销。
多线程模型¶
// 模型1: 每线程独立 epoll(Nginx worker 模型)
// 每个线程有自己的 epfd,通过 SO_REUSEPORT 各自 accept
void *worker(void *arg) {
int listen_fd = create_listener(); // SO_REUSEPORT 允许多个 socket 绑同一端口
int epfd = epoll_create1(EPOLL_CLOEXEC);
// ... 独立事件循环
}
// 模型2: 主线程 accept,分发给 worker 线程
// 主线程 accept 后用 round-robin 分配 fd 到 worker 的 epfd
void dispatch(int conn_fd, int worker_epfd) {
struct epoll_event ev = { .events = EPOLLIN | EPOLLET, .data.fd = conn_fd };
epoll_ctl(worker_epfd, EPOLL_CTL_ADD, conn_fd, &ev);
// 注意:跨线程 epoll_ctl 是线程安全的
}
行为与语义¶
非阻塞 fd 各操作的返回行为¶
| 操作 | 正常 | 无数据/缓冲区满 | 对端关闭 | 错误 |
|---|---|---|---|---|
read |
返回 > 0 | 返回 -1, EAGAIN | 返回 0 | 返回 -1, errno |
write |
返回 > 0(可能 < 请求量) | 返回 -1, EAGAIN | 收到 SIGPIPE / 返回 -1, EPIPE | 返回 -1, errno |
accept |
返回新 fd | 返回 -1, EAGAIN | N/A | 返回 -1, errno |
connect |
返回 0 | 返回 -1, EINPROGRESS | N/A | 返回 -1, errno |
非阻塞 connect¶
int fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
int ret = connect(fd, addr, addrlen);
if (ret == -1 && errno == EINPROGRESS) {
// 连接进行中,注册 EPOLLOUT 等待连接完成
struct epoll_event ev = { .events = EPOLLOUT, .data.fd = fd };
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
}
// EPOLLOUT 触发后检查是否连接成功
int err;
socklen_t len = sizeof(err);
getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
if (err == 0) {
// 连接成功
} else {
// 连接失败,err 即错误码
}
EAGAIN vs EWOULDBLOCK¶
// 在 Linux 上这两个值相等
#define EAGAIN 11
#define EWOULDBLOCK EAGAIN
// 但 POSIX 允许它们不同,可移植代码应两个都检查
if (errno == EAGAIN || errno == EWOULDBLOCK) { ... }
性能考量¶
1. 减少系统调用次数¶
// 差:每次 accept 一个
int conn = accept4(listen_fd, ...);
// 好:循环 accept 直到 EAGAIN(一次 epoll_wait 处理所有排队连接)
for (;;) {
int conn = accept4(listen_fd, ...);
if (conn == -1 && errno == EAGAIN) break;
// handle conn
}
2. 用 readv/writev 减少拷贝¶
// 散布/聚集 I/O:一次系统调用读写多个缓冲区
struct iovec iov[2] = {
{ .iov_base = header, .iov_len = header_len },
{ .iov_base = body, .iov_len = body_len },
};
writev(fd, iov, 2); // 一次 syscall 写两块数据
3. TCP_NODELAY 减少延迟¶
4. 避免不必要的 epoll_ctl¶
// 差:每次处理完都 MOD
// 好:只在需要改变监听事件时才 MOD
// 更好:用 EPOLLET 减少 epoll_wait 返回次数
// 最好:对于请求-响应模式,用 EPOLLONESHOT + 处理完再 MOD
5. SO_REUSEPORT 多队列负载均衡¶
// 多个线程各自创建 socket 绑同一端口
// 内核自动负载均衡分配新连接到不同 socket
// 避免了共享 listen_fd 的锁竞争
int on = 1;
setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on));
常见陷阱与 FAQ¶
1. 忘记处理 partial write¶
2. SIGPIPE 导致进程退出¶
对已关闭的 socket 写入会触发 SIGPIPE,默认行为是终止进程:
// 程序启动时忽略 SIGPIPE
signal(SIGPIPE, SIG_IGN);
// 之后 write 到已关闭连接会返回 -1, errno = EPIPE
// 或者用 MSG_NOSIGNAL 标志(仅对 send 有效)
send(fd, buf, len, MSG_NOSIGNAL);
3. 对端半关闭 (half-close)¶
// 对端调用了 shutdown(fd, SHUT_WR)
// 你的 read 会返回 0,但你仍然可以 write
// 这不是错误!HTTP/1.1 pipelining 等场景会用到
// 如果你想完全关闭:
if (read(fd, buf, size) == 0) {
shutdown(fd, SHUT_WR); // 通知对端我也不写了
close(fd);
}
4. TIME_WAIT 导致端口占用¶
// 主动关闭方会进入 TIME_WAIT(2*MSL,通常 60s)
// 重启服务器时 bind 可能失败
// 解决:
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
5. accept 后立即收到 EPOLLHUP¶
某些情况下(如客户端 connect 后立即 RST),accept 成功但 fd 已经不可用。始终检查事件中的 EPOLLHUP/EPOLLERR。
观测与调试¶
# 查看连接数和状态
ss -tnp | grep <port>
# 查看 send/recv buffer 积压
ss -tnpi dst :8080
# 输出含 send/recv buffer 使用量
# strace 看完整 I/O 模式
strace -e trace=network,epoll_wait,read,write -p <pid>
# 统计 epoll_wait 每次返回多少事件
bpftrace -e '
tracepoint:syscalls:sys_exit_epoll_wait /args->ret > 0/ {
@events_per_wait = hist(args->ret);
}'
# 查看 read 返回 EAGAIN 的频率
bpftrace -e '
tracepoint:syscalls:sys_exit_read /args->ret == -1/ {
@eagain = count();
}'
延伸阅读¶
man 7 epoll— epoll 完整语义man 7 socket— socket 通用选项- Redis
src/ae.c+src/ae_epoll.c— 极简事件循环实现 - libuv 源码 — Node.js 底层跨平台事件循环
- The C10K problem — 经典问题背景