IO多路复用之select、poll、epoll详解

关于IO多路复用,是我们必须掌握的基本知识。

1 select函数

I/O多路复用的概念涉及一些基本的I/O模型,Linux中的I/O模型大致有五种:

阻塞式I/O
非阻塞式I/O
I/O多路复用
信号驱动式I/O
异步I/O

事实上,前面四种I/O模型都是属于同步式的,意思是实质I/O会阻塞进程。关于具体的某种I/O模型不再赘述,这里只看I/O多路复用中的情况。
select函数或者pselect函数允许进程指示内核等待多个事件中的任何一个发生,并只在一个或多个事件发生或经历一段指定的时间后才唤醒它。

1.1 select函数定义

从man中查看其函数定义:

1
2
3
4
5
6
7
8
9
10
11

/* According to POSIX.1-2001, POSIX.1-2008 */

#include <sys/select.h>

/* According to earlier standards */
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

根据参考也从最后的参数开始,这里就不比较select与pselect的区别了

1.1.1 timeout

表示告知内核等待所指定描述符中的任何一个就绪可花多长时间。意思是select阻塞等待文件描述符变为ready状态的时间间隔。
select中timout类型是timeval,pselect的类型是timespec,两者只是事件的细度略有差别。

1
2
3
4
5
6
7
8
struct timeval {
long tv_sec; /* 秒=10^6微秒 */
long tv_usec; /* 微秒 */
};
struct timespec {
long tv_sec; /* 秒=10^9纳秒 */
long tv_nsec; /* 纳秒 */
};

所以select将被阻塞直到下面三种情况中的任何一个发生:

一个文件描述符变为ready状态
调用被一个信号操作打断
timeout参数失效

所以根据调用select时候设置timeout的指,select有三种可能的情况:
1)永远等待下去:timeout == NULL,等待可以被一个信号中断。当有一个描述符做好准备或者是捕获到一个信号时函数会返回。
如果捕获到一个信号, select函数将返回-1,并将变量 erro设为 EINTR。
2)等待一段固定的时间:timeout->tv_sec !=0 || timeout->tv_usec!= 0
当有描述符符合条件或者超过超时时间的话,函数返回。
在超时时间即将用完但又没有描述符合条件的话,返回 0。
对于第一种情况,等待也会被信号所中断
3)根本不用等待:timeout->tv_sec == 0 &&timeout->tv_usec == 0
加入描述符集的描述符都会被测试,并且返回满足要求的描述符的个数。这种方法通过轮询,无阻塞地获得了多个文件描述符状态

前面两种情况的等待会被进在等待期间捕获的信号中断,并从信号函数返回。
在之前的版本中timout参数是const类型,意思是在函数返回时参数不会被select修改。但是现在最新的函数中取消这个限制,于是可修改了。

1.1.2 fd_set类型的三个集合readfds、writefds、exceptfds

指定内核测试读、写、异常的三个描述符集合。
->关于描述符集fd_set

fd_set是一个固定大小的buffer,通常是一个整数数组,其中每个整数中的每一位对应一个描述符。
例如32位的整数,那么数组index==0位置上的元素对应描述符0~31,类似的数组第二个元素对应描述符31~63。

我们不必关注fd_set的数据类型,具体的操作只需要关注下面的四个宏

1
2
3
4
5
6
#include <sys/select.h>

void FD_CLR(int fd, fd_set *set);//用于在文件描述符集合中删除一个文件描述符
int FD_ISSET(int fd, fd_set *set);//用于测试指定的文件描述符是否在该集合中
void FD_SET(int fd, fd_set *set);//用于在文件描述符集合中增加一个新的文件描述符
void FD_ZERO(fd_set *set); //将set的所有位都清空设为0,在对文件描述符集合进行设置前,必须对其进行初始化

1.1.3 nfds

nfds是三个集合中编号最高的文件描述符+1.
头文件中“bits/typesizes.h”中定义了FD_SETSIZE常量1024代表着文件描述符的最大值

1.2select函数返回值

成功的时候会返回包含在三个fd_set中的文件描述符数目,这个总的数目是指三个集合中的bit位的计算总和;
当然这个总数可能为0当在timeout超时前没有所关注的描述符的事件发生。
错误的时候返回-1

1.3 select函数的demo

select原理图
在网上看到一篇介绍select简单原理的图,这里摘过来
(注:select 原理图,摘自 IBM iSeries 信息中心)
下面是一段简单的代码,没有建立socket,只是关注了fd0的文件描述符,因为没做任何操作,所以会在5s时间过后退出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

int
main(void)
{
fd_set rfds;
struct timeval tv;
int retval;

/* Watch stdin (fd 0) to see when it has input. */
FD_ZERO(&rfds);
FD_SET(0, &rfds); //设置关注的描述符集合

/* Wait up to five seconds. */
tv.tv_sec = 5; //5秒的定时
tv.tv_usec = 0;

retval = select(1, &rfds, NULL, NULL, &tv); //writefds、exceptfds集合都是空的
/* Don't rely on the value of tv now! */

if (retval == -1)
perror("select()");
else if (retval)
{
printf("Data is available now.\n");
/* FD_ISSET(0, &rfds) will be true. */
if(FD_ISSET(0, &rfds))
{
printf("stdin (fd 0) has input \n");
}
}
else
printf("No data within 5 seconds.\n");

exit(EXIT_SUCCESS);
}

如果没有任何操作的话

1
2
No data within 5 seconds.
[Finished in 5.2s]

select返回是只是ready描述符的数目,因此需要从0开始去遍历fd_set集合,通过FD_ISSET判断描述符是否是关注的描述符。因此这种方式还是很低效的,
这里看到一个例子就不粘贴过来了。https://blog.csdn.net/poechant/article/details/7627894#

2 poll函数

poll函数和select函数一样,都是在返回的时候遍历描述符集合来查看描述符是否就绪。

2.1 poll函数定义

poll也和select一样有ppoll,这里也不赘述其升级版。

1
2
#include <poll.h> 
int poll(struct pollfd *fds, nfds_t nfds, int timeout);

这里就从前往后看其参数

2.1.1 pollfd类型的fds

第一个参数fds是指向一个结构数组第一个元素的指针,也是放的所有被监视的文件描述符集合

1
2
3
4
5
struct pollfd {
int fd; /* file descriptor */
short events; /* 等待的事件 */
short revents; /* 实际发生的事件 */
};

从书上摘抄下events的相应常量值
事件event值
pollfd结构中fd绑定了相应的等待事件与返回事件,与select中的值-结果参数是存在区别的。

2.1.2 nfds

用于标记数组fds中的结构体元素的总数量,这里没有select中限制fd数1024的限制,但是一般是unsigned long或者unsigned int,基本属于无限制

2.1.3 timeout

这里的timeout是一个int值,与前面的select的timeout类型不同,指定poll返回前需要等待的时间,单位是毫秒(1秒=10^3毫秒)
timeout的可能只有三种情况:
——–INFTIM 永远等待
——–0 立即返回,不阻塞进程
——–>0 等待指定数目的毫秒数
INFTIM是一个负值,通常是-1;

2.2 poll函数返回值

发生错误时,返回-1;
定时器时间耗完没有fd就绪,返回0;
返回就绪描述符个数。
这和select返回值是一样的

2.3 poll的demo

poll与select大致相似,根据网上的demo进行一些学习
还是创建Server与Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>

#include <netinet/in.h>
#include <sys/socket.h>
#include <poll.h>
#include <unistd.h>
#include <sys/types.h>
#include <arpa/inet.h>

#define IPADDRESS "127.0.0.1"
#define PORT 8787
#define MAXLINE 1024
#define LISTENQ 5
#define OPEN_MAX 1000
#define INFTIM -1

//函数声明
//创建套接字并进行绑定
static int socket_bind(const char* ip,int port);
//IO多路复用poll
static void do_poll(int listenfd);
//处理多个连接
static void handle_connection(struct pollfd *connfds,int num);

int main(int argc,char *argv[])
{
int listenfd,connfd,sockfd;
struct sockaddr_in cliaddr;
socklen_t cliaddrlen;
listenfd = socket_bind(IPADDRESS,PORT);
listen(listenfd,LISTENQ);
do_poll(listenfd);
return 0;
}

static int socket_bind(const char* ip,int port)
{
int listenfd;
struct sockaddr_in servaddr;
listenfd = socket(AF_INET,SOCK_STREAM,0);
if (listenfd == -1)
{
perror("socket error:");
exit(1);
}
bzero(&servaddr,sizeof(servaddr));
servaddr.sin_family = AF_INET;
inet_pton(AF_INET,ip,&servaddr.sin_addr);
servaddr.sin_port = htons(port);
if (bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr)) == -1)
{
perror("bind error: ");
exit(1);
}
return listenfd;
}

static void do_poll(int listenfd)
{
int connfd,sockfd;
struct sockaddr_in cliaddr;
socklen_t cliaddrlen;
struct pollfd clientfds[OPEN_MAX];
int maxi;
int i;
int nready;
//添加监听描述符
clientfds[0].fd = listenfd;
clientfds[0].events = POLLIN;
//初始化客户连接描述符
for (i = 1;i < OPEN_MAX;i++)
clientfds[i].fd = -1;
maxi = 0;
//循环处理
for ( ; ; )
{
//获取可用描述符的个数
nready = poll(clientfds,maxi+1,INFTIM);
if (nready == -1)
{
perror("poll error:");
exit(1);
}
//测试监听描述符是否准备好
if (clientfds[0].revents & POLLIN)
{
cliaddrlen = sizeof(cliaddr);
//接受新的连接
if ((connfd = accept(listenfd,(struct sockaddr*)&cliaddr,&cliaddrlen)) == -1)
{
if (errno == EINTR)
continue;
else
{
perror("accept error:");
exit(1);
}
}
fprintf(stdout,"accept a new client: %s:%d\n", inet_ntoa(cliaddr.sin_addr),cliaddr.sin_port);
//将新的连接描述符添加到数组中
for (i = 1;i < OPEN_MAX;i++)
{
if (clientfds[i].fd < 0)
{
clientfds[i].fd = connfd;
break;
}
}
if (i == OPEN_MAX)
{
fprintf(stderr,"too many clients.\n");
exit(1);
}
//将新的描述符添加到读描述符集合中
clientfds[i].events = POLLIN;
//记录客户连接套接字的个数
maxi = (i > maxi ? i : maxi);
if (--nready <= 0)
continue;
}
//处理客户连接
handle_connection(clientfds,maxi);
}
}

static void handle_connection(struct pollfd *connfds,int num)
{
int i,n;
char buf[MAXLINE];
memset(buf,0,MAXLINE);
for (i = 1;i <= num;i++)
{
if (connfds[i].fd < 0)
continue;
//测试客户描述符是否准备好
if (connfds[i].revents & POLLIN)
{
//接收客户端发送的信息
n = read(connfds[i].fd,buf,MAXLINE);
if (n == 0)
{
close(connfds[i].fd);
connfds[i].fd = -1;
continue;
}
// printf("read msg is: ");
write(STDOUT_FILENO,buf,n);
//向客户端发送buf
write(connfds[i].fd,buf,n);
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73

#include <netinet/in.h>
#include <sys/socket.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <poll.h>
#include <time.h>
#include <unistd.h>
#include <sys/types.h>
#include <arpa/inet.h>

#define MAXLINE 1024
#define IPADDRESS "127.0.0.1"
#define SERV_PORT 8787

#define max(a,b) (a > b) ? a : b

static void handle_connection(int sockfd);

int main(int argc,char *argv[])
{
int sockfd;
struct sockaddr_in servaddr;
sockfd = socket(AF_INET,SOCK_STREAM,0);
bzero(&servaddr,sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT);
inet_pton(AF_INET,IPADDRESS,&servaddr.sin_addr);
connect(sockfd,(struct sockaddr*)&servaddr,sizeof(servaddr));
//处理连接描述符
handle_connection(sockfd);
return 0;
}

static void handle_connection(int sockfd)
{
char sendline[MAXLINE],recvline[MAXLINE];
int maxfdp,stdineof;
struct pollfd pfds[2];
int n;
//添加连接描述符
pfds[0].fd = sockfd;
pfds[0].events = POLLIN;
//添加标准输入描述符
pfds[1].fd = STDIN_FILENO;
pfds[1].events = POLLIN;
for (; ;)
{
poll(pfds,2,-1);
if (pfds[0].revents & POLLIN)
{
n = read(sockfd,recvline,MAXLINE);
if (n == 0)
{
fprintf(stderr,"client: server is closed.\n");
close(sockfd);
}
write(STDOUT_FILENO,recvline,n);
}
//测试标准输入是否准备好
if (pfds[1].revents & POLLIN)
{
n = read(STDIN_FILENO,sendline,MAXLINE);
if (n == 0)
{
shutdown(sockfd,SHUT_WR);
continue;
}
write(sockfd,sendline,n);
}
}
}

上面代码中可以看到,针对服务端,还是通过判断poll的返回值,然后去遍历查找pollfd类型的数组,和select是一样的

1
2
3
4
5
6
7
8
9
10
11
12
//获取可用描述符的个数
nready = poll(clientfds,maxi+1,INFTIM);
if (nready == -1)
{
perror("poll error:");
exit(1);
}
//测试监听描述符是否准备好
if (clientfds[0].revents & POLLIN)
{
....
}

参考链接:https://www.cnblogs.com/Anker/p/3261006.html https://www.cnblogs.com/zhuwbox/p/4222382.html

3 epoll函数

epoll是在内核2.6支持的,我当前的内核版本是4.4.0版本。
epoll和poll相似,都是监视多个文件描述符查看其是否就绪,但是epoll有很大的改进。
关于epoll的水平触发和边缘触发的工作模式在最后总结。
首先,你得包含头文件

1
#include <sys/epoll.h>

epoll调用主要是三个基本函数

1
2
3
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

3.1 epoll_create

epoll_create创建一个epoll实例,返回与该实例对应的文件描述符。当该fd不再需要的时候,应该通过close()函数进行关闭。
当指向一个epoll实例的全部文件描述符都已关闭时,内核销毁实例并释放关联的资源重用。
其定义如下:

1
2
#include <sys/epoll.h>
int epoll_create(int size);

从Linux 2.6.8后,参数size被忽略,但是必须大于0。

3.2 epoll_ctl

1
2
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll实例对应描述符的控制接口。

epfd
epoll_create()的返回值fd,标记epoll实例的fd

op
该参数表示相应的动作,该动作的操作对象是第三个参数fd。
op的动作是三个宏:
EPOLL_CTL_ADD:在epoll实例epfd上注册目标文件描述符fd,并将event事件与该fd(ˇˍˇ) 相关联
EPOLL_CTL_MOD:更改与目标fd相关联的事件event
EPOLL_CTL_MOD:从epoll实例epfd中删除(取消注册)目标文件fd

fd
表示需要监听的目标描述符fd

event
告诉内核需要监听fd上面的什么事件,参数event的类型是epoll_event,如下

1
2
3
4
5
6
7
8
9
10
11
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;

struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};

epoll_event中的events是位掩码,由下面的几个宏组成:

EPOLLIN
对应的文件可读read

EPOLLOUT
对应的文件可写write

EPOLLRDHUP(Linux 2.6.17)
流套接字中使用的对等关闭连接

EPOLLPRI
有紧急的数据可读

EPOLLERR
文件描述符发生错误。epoll_wait会一直等待该事件;因此没必要将其放入events中

EPOLLHUP
对应文件描述符被挂起。和ERR一样,会造成一直等,也没必要放进events中。

EPOLLET
将epoll设置为边缘触发模式,针对水平触发而言的

EPOLLONESHOT
只监听一次,意味着当监听完后,事件从epoll_wait中移出,相关联的文件描述符不可用,若要再次监听需要再次放入epoll监 听队列

EPOLLWAKEUP
用的较少,遇到再详述。

3.3 epoll_wait

等待事件的产生

1
2
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

  • epfd是与epoll实例相关联的fd
  • maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size
  • 参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)
  • events指向的内存区域包含着可被调用者使用的事件

函数的返回值与上面类似,大于0的值表示已就绪的文件描述符数目
等于0表示定时器超时
-1表示epoll_wait函数发生了错误

3.4 通用的epoll代码框架

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
for( ; ; )
{
nfds = epoll_wait(epfd,events,20,500);
for(i=0;i<nfds;++i)
{
if(events[i].data.fd==listenfd) //如果是主socket的事件,则表示有新的连接
{
connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen); //accept这个连接
ev.data.fd=connfd;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); //将新的fd添加到epoll的监听队列中
}
else if( events[i].events&EPOLLIN ) //接收到数据,读socket
{

if ( (sockfd = events[i].data.fd) < 0) continue;
n = read(sockfd, line, MAXLINE)) < 0 //读
ev.data.ptr = md; //md为自定义类型,添加数据
ev.events=EPOLLOUT|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);//修改标识符,等待下一个循环时发送数据,异步处理的精髓
}
else if(events[i].events&EPOLLOUT) //有数据待发送,写socket
{
struct myepoll_data* md = (myepoll_data*)events[i].data.ptr; //取数据
sockfd = md->fd;
send( sockfd, md->ptr, strlen((char*)md->ptr), 0 ); //发送数据
ev.data.fd=sockfd;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //修改标识符,等待下一个循环时接收数据
}
else
{
//其他情况的处理
}
}
}

当然最开始是创建epoll_create,然后是等待,通过for循环不断的注册需要监听的事件

3.5 从内核源码看epoll