1 select
1.1 select原理
// https://blog.csdn.net/shaosunrise/article/details/106957899
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <bitset>
#include <cassert>
#include <iostream>
int select(int nfds,
fd_set *readfds,
fd_set *writefds,
fd_set *exceptfds,
struct timeval *timeout);
/*
nfds:以上三个集合中最大的文件描述符数值 + 1,例如集合是{0,1,5,10},那么maxfd就是 11
readfds:内核检测该集合中的IO是否可读。如果想让内核帮忙检测某个IO是否可读,需要手动把文件描述符加入该集合。
writefds:内核检测该集合中的IO是否可写。同readfds,需要手动把文件描述符加入该集合。
exceptfds:内核检测该集合中的IO是否异常。同readfds,需要手动把文件描述符加入该集合。
timeout:用户线程调用select的超时时长。 设置成NULL,表示如果没有 I/O
事件发生,则 select 一直等待下去。 设置为非0的值,这个表示等待固定的一段时间后从
select 阻塞调用中返回。 设置成 0,表示根本不等待,检测完毕立即返回。
函数返回值:
大于0:成功,返回集合中已就绪的IO总个数
等于-1:调用失败
等于0:没有就绪的IO
*/
/*
select
函数监视的文件描述符分3类,分别是writefds、readfds、和exceptfds,当用户process调用select的时候,select会将需要监控的readfds集合拷贝到内核空间(假设监控的仅仅是socket可读),然后遍历自己监控的skb(SocketBuffer),挨个调用skb的poll逻辑以便检查该socket是否有可读事件,遍历完所有的skb后,如果没有任何一个socket可读,那么select会调用schedule_timeout进入schedule循环,使得process进入睡眠。如果在timeout时间内某个socket上有数据可读了,或者等待timeout了,则调用select的process会被唤醒,接下来select就是遍历监控的集合,挨个收集可读事件并返回给用户了。
*/
// 从上述的select函数声明可以看出,fd_set本质是一个bitmap数组,为了方便我们操作该数组,操作系统提供了以下函数:
// 将文件描述符fd从set集合中删除
// exp: using fd_set = std::bitset<1024>;
void FD_CLR(int fd, fd_set *set);
// 判断文件描述符fd是否在set集合中
int FD_ISSET(int fd, fd_set *set);
// 将文件描述符fd添加到set集合中
void FD_SET(int fd, fd_set *set);
// 将set集合中, 所有文件描述符对应的标志位设置为0
void FD_ZERO(fd_set *set);
/*
* select服务端伪码
*/
void select_init() {
int socketfd = socket(AF_INET, SOCK_STREAM, 0);
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(2000);
// IP地址设置成INADDR_ANY,让系统自动获取本机的IP地址。
addr.sin_addr.s_addr = INADDR_ANY;
bind(sockfd, (struct sockaddr *)&addr, sizeof(addr));
listen(sockfd, 5);
for (int i = 0; i < 5; i++) {
memset(&client, 0, sizeof(client));
addrlen = sizeof(client);
fds[i] = accept(sockfd, (struct sockaddr *)&client, &addrlen);
if (fds[i] > max) {
max = fds[i];
}
}
}
void select_main() {
FD_ZERO(&rset);
for (int i = 0; i < 5; i++) {
FD_SET(fds[i], &rset);
}
// rset is a bitmap
// fds[] 1 2 5 7 9
// rset 0110010101000(1024位)
// max = 10
puts("round again");
select(max + 1, &rset, NULL, NULL, NULL);
for (int i = 0; i < 5; i++) {
if (FD_ISSET(fds[i], &rset)) {
memset(buffer, 0, MAXBUF);
read(fds[i], buffer, MAXBUF);
puts(buffer);
}
}
}
/* select 存在的问题
[1] 用户态到内核态拷贝开销
每次调用select,都需要把被监控的fds集合从用户态空间拷贝到内核态空间,高并发场景下这样的拷贝会使得消耗的资源是很大的。
[2] 监听端口数量有限,32位机默认1024个,64位默认2048
能监听端口的数量有限,单个进程所能打开的最大连接数由FD_SETSIZE宏定义,监听上限就等于fds_bits位数组中所有元素的二进制位总数,其大小是32个整数的大小(在32位的机器上,大小就是3232,同理64位机器上为3264),当然我们可以对宏FD_SETSIZE进行修改,然后重新编译内核,但是性能可能会受到影响,一般该数和系统内存关系很大,具体数目可以cat
/proc/sys/fs/file-max察看。32位机默认1024个,64位默认2048。
[3]
找到就绪的事件是遍历的方式,复杂度O(n)
被监控的fds集合中,只要有一个有数据可读,整个socket集合就会被遍历一次调用sk的poll函数收集可读事件:由于当初的需求是朴素,仅仅关心是否有数据可读这样一个事件,当事件通知来的时候,由于数据的到来是异步的,我们不知道事件来的时候,有多少个被监控的socket有数据可读了,于是,只能挨个遍历每个socket来收集可读事件了。
[4] fd_set不可重用,每次都需要调用FD_ZERO初始化
*/
1.2 select服务端实现实例
#include <arpa/inet.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/select.h>
#include <unistd.h>
#define BACKLOG 5
#define BUFF_SIZE 200
#define DEFAULT_PORT 6666
typedef struct {
int fd; /* client's connection descriptor */
struct sockaddr_in addr; /* client's address */
} CLIENT;
int main(int argc, char *argv[]) {
int SERVER_PORT = DEFAULT_PORT;
if (argc > 2)
printf("param err:\nUsage:\n\t%s port | %s\n\n", argv[0], argv[0]);
if (argc == 2) SERVER_PORT = atoi(argv[1]);
int i, maxi, maxfd, nready, nbytes;
int servSocket, cliSocket;
// 定义fd_set集合
fd_set allset, rset;
socklen_t addrLen;
char buffer[BUFF_SIZE];
CLIENT client[FD_SETSIZE]; /* FD_SETSIZE == 1024 */
struct sockaddr_in servAddr, cliAddr;
if ((servSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("socket");
exit(1);
}
int optval = 1;
// 设置端口可重用
// 一般来说,一个端口释放后需要等待两分钟左右才能被再次使用,SO_REUSEADDR是让端口释放后立即就可以被再次使用。
if (setsockopt(servSocket,
SOL_SOCKET,
SO_REUSEADDR,
&optval,
sizeof(optval)) < 0) {
perror("setsockopt");
exit(0);
}
bzero(&servAddr, sizeof(servAddr));
servAddr.sin_family = AF_INET;
servAddr.sin_port = htons(SERVER_PORT);
// IP地址设置成INADDR_ANY,让系统自动获取本机的IP地址。
servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(servSocket, (struct sockaddr *)&servAddr, sizeof(servAddr)) < 0) {
printf("bind");
exit(1);
}
if (listen(servSocket, BACKLOG) < 0) {
printf("listen err");
exit(1);
}
printf("Listen Port: %d\nListening ...\n", SERVER_PORT);
maxi = -1;
maxfd = servSocket;
// 把自定义的client数组中的fd都初始化为-1
for (i = 0; i < FD_SETSIZE; i++)
client[i].fd = -1; /* -1 indicates available entry */
// 清空allset集合的标志位
FD_ZERO(&allset);
// 把监听socket放入这个集合中
FD_SET(servSocket, &allset);
for (;;) {
rset = allset;
// 定义两秒的超时时间
struct timeval timeout;
timeout.tv_sec = 2;
timeout.tv_usec = 0;
// 这个只关注可读状态的描述符,并设置固定的超时时间
nready = select(maxfd + 1, &rset, NULL, NULL, &timeout);
// 出错返回-1
if (nready < 0) {
perror("select");
break;
}
// 超时时间到了返回0
else if (nready == 0) {
printf("select time out\n");
continue;
}
// 关注的描述符可操作,返回值>0
// select返回的是整个集合,检查监听的socket是否可读
if (FD_ISSET(servSocket, &rset)) {
addrLen = sizeof(cliAddr);
// 监听的socket可读,直接调用accept接收请求
if ((cliSocket =
accept(servSocket, (struct sockaddr *)&cliAddr, &addrLen)) < 0) {
perror("accept");
exit(1);
}
printf("\nNew client connections %s:%d\n",
inet_ntoa(cliAddr.sin_addr),
ntohs(cliAddr.sin_port));
// 保存客户端连接的socket,放在之前定义的client数组中
for (i = 0; i < FD_SETSIZE; i++) {
if (client[i].fd < 0) {
client[i].fd = cliSocket;
client[i].addr = cliAddr;
break;
}
}
if (i == FD_SETSIZE) perror("too many clients");
// 把刚刚接收的链接描述符放在关注集合中
FD_SET(cliSocket, &allset);
if (cliSocket > maxfd) maxfd = cliSocket; /* for select */
if (i > maxi) maxi = i; /* max index in client[] array */
if (--nready <= 0) continue; /* no more readable descriptors */
}
// 上一步处理了新连接,这里处理已有连接可读的socket
// 遍历所有的客户连接socket
for (i = 0; i <= maxi; i++) {
if ((cliSocket = client[i].fd) < 0) continue;
// 依次检查每一个客户连接是否可读
if (FD_ISSET(cliSocket, &rset)) {
memset(buffer, 0, BUFF_SIZE);
// 当前客户连接可读则直接使用recv接收数据
nbytes = recv(cliSocket, buffer, sizeof(buffer), 0);
if (nbytes < 0) {
perror("recv");
continue;
}
// recv返回0表示客户端断开连接
else if (nbytes == 0) {
printf("\nDisconnect %s:%d\n",
inet_ntoa(client[i].addr.sin_addr),
ntohs(client[i].addr.sin_port));
close(cliSocket);
// 把此客户端连接从关注集合中清除
FD_CLR(cliSocket, &allset);
client[i].fd = -1;
} else {
printf("\nFrom %s:%d\n",
inet_ntoa(client[i].addr.sin_addr),
ntohs(client[i].addr.sin_port));
printf("Recv: %sLength: %d\n\n", buffer, nbytes);
}
if (--nready <= 0) break; /* no more readable descriptors */
}
}
}
}
2 poll
2.1 poll原理
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <bitset>
#include <cassert>
#include <cmath>
#include <iostream>
/*
poll的实现和select非常相似,只是描述fd集合的方式不同。针对select遗留的三个问题中(问题(2)是fd限制问题,问题(1)和(3)则是性能问题),poll只是使用pollfd结构而不是select的fd_set结构,这就解决了select的问题(2)fds集合大小1024限制问题。但poll和select同样存在一个性能缺点就是包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。
*/
struct pollfd {
int fd; /*文件描述符*/
short events; /*监控的事件*/
short revents; /*监控事件中满足条件返回的事件*/
};
// revents的取值一般默认有以下几个宏
#define POLLRDNORM 0x040 /* 普通数据可读 */
#define POLLWRNORM 0x100 /* 可以写数据 */
#define POLLERR 0x008 /* 发生错误 */
int poll(struct pollfd *fds, unsigned long nfds, int timeout);
/*
函数参数:
fds:struct pollfd类型的数组, 存储了待检测的文件描述符,struct
pollfd有三个成员: fd:委托内核检测的文件描述符
events:委托内核检测的fd事件(输入、输出、错误),每一个事件有多个取值
revents:这是一个传出参数,数据由内核写入,存储内核检测之后的结果
nfds:描述的是数组 fds 的大小
timeout: 指定poll函数的阻塞时长
-1:一直阻塞,直到检测的集合中有就绪的IO事件,然后解除阻塞函数返回
0:不阻塞,不管检测集合中有没有已就绪的IO事件,函数马上返回
大于0:表示 poll 调用方等待指定的毫秒数后返回
函数返回值:
-1:失败
大于0:表示检测的集合中已就绪的文件描述符的总个数
*/
/*
poll改变了fds集合的描述方式,使用了pollfd结构而不是select的fd_set结构,使得poll支持的fds集合限制远大于select的1024。poll虽然解决了fds集合大小1024的限制问题,从实现来看。很明显它并没优化大量描述符数组被整体复制于用户态和内核态的地址空间之间,以及个别描述符就绪触发整体描述符集合的遍历的低效问题。poll随着监控的socket集合的增加性能线性下降,使得poll也并不适合用于大并发场景。
*/
/*
poll服务端伪代码
*/
// pollfd结构见上方
void poll_init() {
for (int i = 0; i < 5; i++) {
memset(&client, 0, sizeof(client));
addrlen = sizeof(client);
pollfds[i].fd = accept(sockfd, (struct sockaddr *)&client, &addrlen);
pollfds[i].events = POLLRDNORM;
}
}
void poll_main() {
while (1) {
puts("round again");
poll(pollfds, 5, 50000);
}
for (int i = 0; i < 5; i++) {
if (pollfds[i].revents & POLLRDNORM) {
pollfds[i].revents = 0;
memset(buffer, 0, MAXBUF);
read(pullfds[i].fd, buffer, MAXBUF);
puts(buffer);
}
}
}
2.2 poll服务端实现实例
#include <arpa/inet.h>
#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#define BACKLOG 5
#define BUFF_SIZE 200
#define DEFAULT_PORT 6666
#define OPEN_MAX 1024 // 这个值可以更大
int main(int argc, char **argv) {
int SERV_PORT = DEFAULT_PORT;
if (argc > 2)
printf("param err:\nUsage:\n\t%s port | %s\n\n", argv[0], argv[0]);
if (argc == 2) SERV_PORT = atoi(argv[1]);
int i, maxi, nready;
int servSocket, cliSocket;
ssize_t nbytes;
char buf[BUFF_SIZE];
socklen_t addrLen;
struct pollfd client[OPEN_MAX]; // 定义一个很大的 pollfd 数组
struct sockaddr_in cliAddr, servAddr;
if ((servSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("socket");
exit(1);
}
int optval = 1;
// 设置端口可重用
// 一般来说,一个端口释放后需要等待两分钟左右才能被再次使用,SO_REUSEADDR是让端口释放后立即就可以被再次使用。
if (setsockopt(servSocket,
SOL_SOCKET,
SO_REUSEADDR,
&optval,
sizeof(optval)) < 0) {
perror("setsockopt");
exit(0);
}
bzero(&servAddr, sizeof(servAddr));
servAddr.sin_family = AF_INET;
servAddr.sin_port = htons(SERV_PORT);
// IP地址设置成INADDR_ANY,让系统自动获取本机的IP地址。
servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(servSocket, (struct sockaddr *)&servAddr, sizeof(servAddr)) < 0) {
perror("bind");
exit(1);
}
if (listen(servSocket, BACKLOG) < 0) {
perror("listen err");
exit(1);
}
printf("Listen Port: %d\nListening ...\n", SERV_PORT);
// 先把listen的描述符放进数组
client[0].fd = servSocket;
client[0].events = POLLRDNORM; // 关注可读状态
// 初始化此数组
for (i = 1; i < OPEN_MAX; i++)
client[i].fd = -1; /* -1 indicates available entry */
maxi = 0; /* max index into client[] array */
for (;;) {
// 开始监听啦~
nready = poll(client, maxi + 1, -1);
if (nready < 0) { // 报错了
printf("poll err");
exit(1);
}
// servSocket可读,说明有新链接来了
if (client[0].revents & POLLRDNORM) {
addrLen = sizeof(cliAddr);
if ((cliSocket =
accept(servSocket, (struct sockaddr *)&cliAddr, &addrLen)) < 0) {
printf("accept err");
exit(1);
}
for (i = 1; i < OPEN_MAX; i++) {
if (client[i].fd < 0) {
client[i].fd = cliSocket; // 保存客户端连接的描述符,按顺序放在数组中
client[i].events = POLLRDNORM; // 还是关注是否可读
break;
}
}
printf("\nNew client connections client[%d] %s:%d\n",
i,
inet_ntoa(cliAddr.sin_addr),
ntohs(cliAddr.sin_port));
if (i == OPEN_MAX) printf("too many clients");
if (i > maxi) maxi = i; /* max index in client[] array */
if (--nready <= 0) continue; /* no more readable descriptors */
}
// 循环检查所有的客户端连接
for (i = 1; i <= maxi; i++) {
if ((cliSocket = client[i].fd) < 0) continue;
if (client[i].revents & (POLLRDNORM | POLLERR)) {
memset(buf, 0, BUFF_SIZE);
nbytes = recv(cliSocket, buf, BUFF_SIZE, 0);
if (nbytes < 0) {
printf("recv err");
continue;
} else if (nbytes == 0) {
printf("client[%d] closed connection\n", i);
close(cliSocket);
client[i].fd = -1; // 客户端断开连接,重置标志位
} else {
printf("\nFrom client[%d]\n", i);
printf("Recv: %sLength: %d\n\n", buf, (int)nbytes);
}
if (--nready <= 0) break; /* no more readable descriptors */
}
}
}
}
3 epoll
3.1 epoll原理
// https://zhuanlan.zhihu.com/p/367591714
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <cassert>
#include <iostream>
/*
在linux的网络编程中,很长的时间都在使用select来做事件触发。在linux新的内核中,有了一种替换它的机制,就是epoll。相比于select,epoll最大的好处在于它不会随着监听fd数目的增长而降低效率。如前面我们所说,在内核中的select实现中,它是采用轮询来处理的,轮询的fd数目越多,自然耗时越多。并且,在linux/posix_types.h头文件有这样的声明:
#define __FD_SETSIZE 1024
表示select最多同时监听1024个fd,当然,可以通过修改头文件再重编译内核来扩大这个数目,但这似乎并不治本。
创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
epoll的接口非常简单,一共就三个函数:
epoll_create:创建一个epoll句柄
epoll_ctl:向 epoll 对象中添加/修改/删除要管理的连接
epoll_wait:等待其管理的连接上的 IO 事件
*/
int epoll_create(int size);
/*
功能:epoll_create 创建一个新的epoll实例并返回epoll这个实例的专用描述符。
参数size: 用来告诉内核这个监听的数目一共有多大,参数 size 并不是限制了 epoll
所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议。自从
linux 2.6.8 之后,size 参数是被忽略的,也就是说可以填只有大于 0 的任意值。
返回值:如果成功,返回poll 专用的文件描述符,否者失败,返回-1。
*/
// epoll_create的源码实现:
SYSCALL_DEFINE1(epoll_create1, int, flags) {
struct eventpoll *ep = NULL;
// 创建一个 eventpoll 对象
error = ep_alloc(&ep);
}
// struct eventpoll 的定义
// file:fs/eventpoll.c
struct eventpoll {
// sys_epoll_wait用到的等待队列
wait_queue_head_t wq;
// 接收就绪的描述符都会放到这里
struct list_head rdllist;
// 每个epoll对象中都有一颗红黑树
struct rb_root rbr;
// ......
}
static int
ep_alloc(struct eventpoll **pep) {
struct eventpoll *ep;
// 申请 epollevent 内存
ep = kzalloc(sizeof(*ep), GFP_KERNEL);
// 初始化等待队列头
init_waitqueue_head(&ep->wq);
// 初始化就绪列表
INIT_LIST_HEAD(&ep->rdllist);
// 初始化红黑树指针
ep->rbr = RB_ROOT;
// ......
}
/*
其中eventpoll 这个结构体中的几个成员的含义如下:
wq: 等待队列链表。软中断数据就绪的时候会通过 wq 来找到阻塞在 epoll
对象上的用户进程。 rbr:
红黑树。为了支持对海量连接的高效查找、插入和删除,eventpoll
内部使用的就是红黑树。通过红黑树来管理用户主进程accept添加进来的所有 socket
连接。 rdllist: 就绪的描述符链表。当有连接就绪的时候,内核会把就绪的连接放到
rdllist
链表里。这样应用进程只需要判断链表就能找出就绪进程,而不用去遍历红黑树的所有节点了。
*/
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
/*
epoll_ctl 注册感兴趣的文件描述符,把文件描述符添加到epoll实例的interest
list感兴趣列表中。
*/
/*
功能:epoll 的事件注册函数,它不同于 select()
是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。
参数epfd: epoll 专用的文件描述符,epoll_create()的返回值
参数op: 表示动作,用三个宏来表示:
EPOLL_CTL_ADD:注册新的 fd 到 epfd 中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从 epfd 中删除一个 fd;
参数fd: 需要监听的文件描述符
参数event: 告诉内核要监听什么事件,struct epoll_event 结构如:
epoll_event定义:*/
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
/*
events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端 SOCKET 正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET :将 EPOLL 设为边缘触发(Edge Trigger)模式,这是相对于水平触发(Level
Trigger)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个
socket 的话,需要再次把这个 socket 加入到 EPOLL 队列里
返回值:0表示成功,-1表示失败。
*/
/*
水平触发(LT,Level Triggered)
epoll_wait()
会通知你某个描述符上有数据可读写,如果你不处理,下次调用的时候还会通知你,直到你处理为止。如果有大量不关心的文件描述符出现可读写状态,就会一直通知你,严重影响你检查关心的文件描述符的效率。
边缘触发(ET, Edge Triggered)
与水平触发模式相反,调用epoll_wait()的时候会通知你哪个文件描述符可读写,如果你不处理或者没处理完下次也不通知你,只通知你这一次,爱咋咋地。直到第二次有数据可读写的时候再次通知你。这种效率比较高,但是不能保证数据的完整性,如果一次处理不完就不告诉你了。
*/
int epoll_wait(int epfd,
struct epoll_event *events,
int maxevents,
int timeout);
/*
功能:等待事件的产生,收集在 epoll 监控的事件中已经发送的事件,类似于 select()
调用。
参数epfd: epoll 专用的文件描述符,epoll_create()的返回值
参数events:
分配好的 epoll_event 结构体数组,epoll 将会把发生的事件赋值到events
数组中(events 不可以是空指针,内核只负责把数据复制到这个 events
数组中,不会去帮助我们在用户态中分配内存)。
参数maxevents: maxevents
告之内核这个 events 有多少个 。
参数timeout: 超时时间,单位为毫秒,为 -1时,函数为阻塞。
返回值: 如果成功,表示返回需要处理的事件数目
如果返回0,表示已超时
如果返回-1,表示失败
*/
// epoll服务端伪代码
void epoll_init() {
struct epoll_event events[5];
int epfd = epoll_create(10);
//...
for (int i = 0; i < 5; i++) {
static struct epoll_event ev;
memset(&client, 0, sizeof(client));
addrlen = sizeof(client);
ev.data.fd = accept(sockfd, (struct sockaddr *)&client, &addrlen);
ev.events = EPOLLIN;
epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev);
}
}
void epoll_main() {
while (1) {
puts("round again");
nfds = epoll_wait(epfd, events, 5, 10000);
// 已经用红黑树排好序,nfds是就绪个数,前面的都是已经就绪的
for (int i = 0; i < nfds; i++) {
memset(buffer, 0, MAXBUF);
read(events[i].data.fd, buffer, MAXBUF);
puts(buffer);
}
}
}
3.2 epoll服务端实现实例
// epoll使用例子, 含读写
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <cassert>
#include <iostream>
const int MAX_EVENT_NUMBER = 10000; // 最大事件数
// 设置句柄非阻塞
int setnonblocking(int fd) {
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}
int main() {
// 创建套接字
int nRet = 0;
int m_listenfd = socket(PF_INET, SOCK_STREAM, 0);
if (m_listenfd < 0) {
printf("fail to socket!");
return -1;
}
//
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
address.sin_addr.s_addr = htonl(INADDR_ANY);
address.sin_port = htons(6666);
int flag = 1;
// 设置端口可重用
// 一般来说,一个端口释放后需要等待两分钟左右才能被再次使用,SO_REUSEADDR是让端口释放后立即就可以被再次使用。
setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag));
// 绑定端口号
int ret = bind(m_listenfd, (struct sockaddr *)&address, sizeof(address));
if (ret < 0) {
printf("fail to bind!,errno :%d", errno);
return ret;
}
// 监听连接fd,200是可排队的最大连接个数
ret = listen(m_listenfd, 200);
if (ret < 0) {
printf("fail to listen!,errno :%d", errno);
return ret;
}
// 初始化红黑树和事件链表结构rdlist结构
epoll_event events[MAX_EVENT_NUMBER];
// 创建epoll实例
int m_epollfd = epoll_create(5);
if (m_epollfd == -1) {
printf("fail to epoll create!");
return m_epollfd;
}
// 创建节点结构体将监听连接句柄
epoll_event event;
event.data.fd = m_listenfd;
// 设置该句柄为边缘触发(数据没处理完后续不会再触发事件,水平触发是不管数据有没有触发都返回事件),
event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
// 添加监听连接句柄作为初始节点进入红黑树结构中,该节点后续处理连接的句柄
epoll_ctl(m_epollfd, EPOLL_CTL_ADD, m_listenfd, &event);
// 进入服务器循环
while (1) {
int number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1);
if (number < 0 && errno != EINTR) {
printf("epoll failure");
break;
}
for (int i = 0; i < number; i++) {
int sockfd = events[i].data.fd;
// 属于处理新到的客户连接
if (sockfd == m_listenfd) {
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(m_listenfd,
(struct sockaddr *)&client_address,
&client_addrlength);
if (connfd < 0) {
printf("errno is:%d accept error", errno);
return false;
}
epoll_event event;
event.data.fd = connfd;
// 设置该句柄为边缘触发(数据没处理完后续不会再触发事件,水平触发是不管数据有没有触发都返回事件),
event.events = EPOLLIN | EPOLLRDHUP;
// 添加监听连接句柄作为初始节点进入红黑树结构中,该节点后续处理连接的句柄
epoll_ctl(m_epollfd, EPOLL_CTL_ADD, connfd, &event);
setnonblocking(connfd);
} else if (events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)) {
// 服务器端关闭连接,
epoll_ctl(m_epollfd, EPOLL_CTL_DEL, sockfd, 0);
close(sockfd);
}
// 处理客户连接上接收到的数据
else if (events[i].events & EPOLLIN) {
char buf[1024] = {0};
read(sockfd, buf, 1024);
printf("from client :%s");
// 将事件设置为写事件返回数据给客户端
events[i].data.fd = sockfd;
events[i].events = EPOLLOUT | EPOLLET | EPOLLONESHOT | EPOLLRDHUP;
epoll_ctl(m_epollfd, EPOLL_CTL_MOD, sockfd, &events[i]);
} else if (events[i].events & EPOLLOUT) {
std::string response = "server response \n";
write(sockfd, response.c_str(), response.length());
// 将事件设置为读事件,继续监听客户端
events[i].data.fd = sockfd;
events[i].events = EPOLLIN | EPOLLRDHUP;
epoll_ctl(m_epollfd, EPOLL_CTL_MOD, sockfd, &events[i]);
}
// else if 可以加管道,unix套接字等等数据
}
}
return 0;
}
4 select、poll、epoll区别
4.1 epoll 为什么比select、poll更高效?
epoll 采用红黑树管理文件描述符
从上图可以看出,epoll使用红黑树管理文件描述符,红黑树插入和删除的都是时间复杂度 O(logN),不会随着文件描述符数量增加而改变。
select、poll采用数组或者链表的形式管理文件描述符,那么在遍历文件描述符时,时间复杂度会随着文件描述的增加而增加。
epoll 将文件描述符添加和检测分离,减少了文件描述符拷贝的消耗
select&poll 调用时会将全部监听的 fd 从用户态空间拷贝至内核态空间并线性扫描一遍找出就绪的 fd 再返回到用户态。下次需要监听时,又需要把之前已经传递过的文件描述符再读传递进去,增加了拷贝文件的无效消耗,当文件描述很多时,性能瓶颈更加明显。
而epoll只需要使用epoll_ctl添加一次,后续的检查使用epoll_wait,减少了文件拷贝的消耗。
4.2 select poll epoll 区别表
select | poll | epoll | |
---|---|---|---|
性能 | 随着连接数的增加,性能急剧下降,处理成千上万的并发连接数时,性能很差 | 随着连接数的增加,性能急剧下降,处理成千上万的并发连接数时,性能很差 | 随着连接数的增加,性能基本没有变化 |
连接数 | 一般1024 | 无限制 | 无限制 |
内存拷贝 | 每次调用select拷贝 | 每次调用poll拷贝 | fd首次调用epoll_ctl拷贝,每次调用epoll_wait不拷贝 |
数据结构 | bitmap | 数组 | 红黑树 |
内在处理机制 | 线性轮询 | 线性轮询 | FD挂在红黑树,通过事件回调callback |
时间复杂度 | O(n) | O(n) | O(1) |
4.3 总结
select,poll,epoll都是IO多路复用机制,即可以监视多个描述符,一旦某个描述符就绪(读或写就绪),能够通知程序进行相应读写操作。 但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。
select,poll实现需要自己不断轮询所有fd集合,直到设备就绪,期间可能要睡眠和唤醒多次交替。而epoll其实也需要调用epoll_wait不断轮询就绪链表,期间也可能多次睡眠和唤醒交替,但是它是设备就绪时,调用回调函数,把就绪fd放入就绪链表中,并唤醒在epoll_wait中进入睡眠的进程。虽然都要睡眠和交替,但是select和poll在“醒着”的时候要遍历整个fd集合,而epoll在“醒着”的时候只要判断一下就绪链表是否为空就行了,这节省了大量的CPU时间。这就是回调机制带来的性能提升。
select,poll每次调用都要把fd集合从用户态往内核态拷贝一次,并且要把current往设备等待队列中挂一次,而epoll只要一次拷贝,而且把current往等待队列上挂也只挂一次(在epoll_wait的开始,注意这里的等待队列并不是设备等待队列,只是一个epoll内部定义的等G待队列)。这也能节省不少的开销。
其他:
当设置了非阻塞io后,accept和read将不会阻塞。
进程发起IO系统调用后,如果内核缓冲区没有数据,进程返回一个错误而不会被阻塞;
进程发起IO系统调用后,如果内核缓冲区有数据,内核就会把数据返回进程。(这个时候才会阻塞,所以是同步的!!!))
accept():
在non-blocking模式下,如果返回值为-1,且errno == EAGAIN或errno == EWOULDBLOCK表示no connections没有新连接请求;
recv()/recvfrom():
在non-blocking模式下,如果返回值为-1,且errno == EAGAIN表示没有可接受的数据或很在接受尚未完成;
send()/sendto():
在non-blocking模式下,如果返回值为-1,且errno == EAGAIN或errno == EWOULDBLOCK表示没有可发送数据或数据发送正在进行没有完成。
read/write:
在non-blocking模式下,如果返回-1,且errno == EAGAIN表示没有可读写数据或可读写正在进行尚未完成。
connect():
在non-bloking模式下,如果返回-1,且errno = EINPROGRESS表示正在连接。
errno常见错误码有哪些:
https://blog.csdn.net/huangwei858/article/details/47723613
5 参考链接
https://zhuanlan.zhihu.com/p/367591714
https://www.bilibili.com/video/BV1qJ411w7du
https://blog.csdn.net/Milu_Jingyu/article/details/120207986
epoll_create源码及解析:
https://blog.csdn.net/weixin_43705457/article/details/104522820
epoll_ctl源码及解析:
https://blog.csdn.net/weixin_43705457/article/details/104525624
epoll_wait源码及解析:
https://blog.csdn.net/weixin_43705457/article/details/104520751