Unix Network Programming - 2. I/O复用模型 select/poll

Unix下的5中I/O模型

  • 阻塞式I/O: 前一篇实际上即是阻塞式I/O模型了, 默认情况下, socket系列函数都是阻塞的.
  • 非阻塞式I/O: 应用进程持续轮询内核, 以询问该描述符是否有事件就绪, 有事件则处理, 没有则返回EWOULDBLOCK错误, 以达到非阻塞的效果.
  • I/O复用: I/O复用将阻塞作用于select或poll系统调用上, 而不是阻塞在I/O系统调用上, 事件在select/poll上注册, 再由它们分发.
  • 信号驱动式I/O: 当事件发生时通过信号来通知启动一个I/O操作.
  • 异步I/O: 告知内核启动某个操作, 并让内核在整个操作完成后发出通知.

本篇主要了解I/O复用模型, 学习使用select和poll, 另外以及Linux下的epoll, 了解此三者有何不同, 有何优劣, 以及在何时使用.

I/O复用的优势在于可以同时指定多个描述符, 指示内核监听多个事件, 当有一个或多个事件发生时, 分发事件到正确的逻辑处处理.
在Unix下主要是使用select和poll来完成这个操作, 而在Linux下有了更高效的epoll, 实际上, 如今服务器普遍使用Linux系统, epoll因其高效性, 出场率更高一些, 需要重点学习. 但select和poll仍是需要学习的, 一步一步来吧.

select/poll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <sys/select.h>
#include <sts/time.h>

int select(int maxfdp1, fd_set* readset, fd_set* writeset, fd_set* exceptset, const struct timeval* time);
// return 就绪描述符数目/0(超时)/1(出错)

// @maxfdp1: maxfd plus 1, 最大描述符+1, 0~maxfdp1-1的的描述符都将被检测.
// @readset/writeset/exceptset, 分别表示select将要关心的读/写/异常描述符集, 将关心的fd置入相应的fdset, 表示关注发生在fd上的事件.
// @time: 超时时间, timeval的结构如下, 所以超时的时间精度为微妙级.
// 如果time为nullptr则表示永远等待知道一个描述符就绪, 如果结构体为0表示不等待, 检测描述符后立即返回, 等同于轮询.
// struct timeval{
// long tv_sec; // 秒级
// long tv_usec; // 微妙级
// }

// 描述符与描述符集之间的联系
void FD_ZERO(fd_set* fdset); // 初始化, 清除fdset中的所有描述符, 必要步骤!!!
void FD_SET(int fd, fd_set* fdset); // 将fd置入fdset
void FD_CLR(int fd, fd_set* fdset); // 将fd从fdset清除
void FD_ISSET(int fd, fd_set* fdset); // 用于检测fd是否存在于fdset

select被调用时, readset/writeset/expectset传入时用来指定所关心的描述符, 返回时用来指示已就绪的描述符.
返回的就绪描述符可以通过FD_ISSET来检测fd_set中的描述符.
注意!描述符内任何未就绪的描述符对应的位在select返回是都将置为0, 也就是说再次调用select是需要重新设置所关心的描述符.

描述符就绪条件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#include "Common.h"

void str_cli(FILE* fd, int sockfd) {
char buffer[MAXLINE];
fd_set rset;
FD_ZERO(&rset);

int stdineof = 0;

while (1) {
if(stdineof == 0) FD_SET(fileno(fd), &rset);
FD_SET(sockfd, &rset);
int maxfd = std::max(fileno(fd), sockfd) + 1;
select(maxfd, &rset, 0, 0, 0);

if (FD_ISSET(sockfd, &rset)) {
int n = read(sockfd, buffer, MAXLINE);
if (n == 0) {
if (stdineof == 1)
return;
else {
std::cout << "server terminated prematurely" << std::endl;
return;
}
}
write(fileno(stdout), buffer, n);
}

if (FD_ISSET(fileno(fd), &rset)) {
int n = read(fileno(fd), buffer, MAXLINE);
if (n == 0) {
stdineof = 1;
shutdown(sockfd, SHUT_WR); // FIN -> server
FD_CLR(fileno(fd), &rset);
continue;
}
write(sockfd, buffer, n);
}
}
}

int main(int argc, char** argv) {
if(argc != 2) return 0;

int sockfd = socket(AF_INET, SOCK_STREAM, 0); // IPv4, TCP
if (sockfd < 0) {
std::cout << "create blank socket failure." << std::endl;
}

sockaddr_in server_addr;
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT); // 将端口转换为网络字节序
inet_pton(AF_INET, argv[1], &server_addr.sin_addr); // 点分十进制到IP地址结构

// 发起与服务器建立连接, 触发三次握手
// C: CLOSED --[connect(), send:SYN]--> SYNSEND --[recv: SYN,ACK, send: ACK ]--> ESTABLISHED
// S: CLOSED --[listen()]--> LISTENING --[recv: SYN, send: SYN,ACK]--> SYNRECV --[recv: ACK]--> ESTABLISHED
//
// 该函数在连接成功或出错时才会返回, 其中可能会遇到的错误:
// 1. 客户端的SYN没有收到服务器任何响应, 返回ETIMEOUT错误
// 2. 服务器相应的端口没有进程在等待客户端与之建立连接, 此时响应SYN一个RST, 触发ECONNREFUSED错误
// 3. 目的不可达ICMP错误, 但客户端会继续进行尝试, 一段时间后将保存的ICMP错误转换为EHOSTUNREACH或ENEWUNREACH错误
if (connect(sockfd, (sockaddr*)& server_addr, sizeof(server_addr)) < 0) {
std::cout << "connect to server failure." << std::endl;
}

str_cli(stdin, sockfd);

exit(0);
// 客户端进程正常退出, 将关闭进程打开的所有描述符, 在关闭sockfd时, 将会向服务器发送一个FIN, 开始TCP断开连接四次挥手的前两个分节
// C: ESTABLISHED --[close(), send:FIN]--> FIN_WAIT_1 --[recv:ACK]--> FIN_WAIT_2 --[recv:FIN, send:ACK]--> TIME_WAIT --[2MSL timeout]--> CLOSED
// S: ESTABLISHED --[recv:FIN, send:ACK]--> CLOSE_WAIT --[close(), send:FIN]--> LAST_ACK --[recv:ACK]--> CLOSED

return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
#include "Common.h"

int main() {
int listen_sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_sockfd < 0) {
std::cout << "create blank socket failure." << std::endl;
}

sockaddr_in socket_addr;
bzero(&socket_addr, sizeof(socket_addr));
socket_addr.sin_family = AF_INET;
socket_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 等到TCP连接成功后选择IP地址
socket_addr.sin_port = htons(SERVER_PORT);

if (bind(listen_sockfd, (sockaddr*)& socket_addr, sizeof(socket_addr)) < 0) {
std::cout << "bind socket address failure." << std::endl;
}

if (listen(listen_sockfd, LISTENQ) < 0) {
std::cout << "listening socket create failure." << std::endl;
}

sockaddr_in client_addr;
socklen_t clilen = sizeof(sockaddr_in);
std::array<int, FD_SETSIZE> vConnectfd;
vConnectfd.fill(-1);

fd_set rset, allset;
FD_ZERO(&rset);
FD_ZERO(&allset);
FD_SET(listen_sockfd, &allset);

int maxfd = listen_sockfd;
char buffer[MAXLINE];
int maxi = 0;

while (1) {
rset = allset;
int nready = select(maxfd + 1, &rset, 0, 0, 0);

int connect_sockfd;
if (FD_ISSET(listen_sockfd, &rset)) {
int connect_sockfd = accept(listen_sockfd, (sockaddr*)& client_addr, &clilen);
if (connect_sockfd < 0) {
if (errno == EINTR) continue;
else std::cout << "connected socket create failure." << std::endl;
}
int i = 0;
for (; i < vConnectfd.size(); ++i) {
if (vConnectfd[i] < 0) {
vConnectfd[i] = connect_sockfd;
break;
}
}

FD_SET(connect_sockfd, &allset);

if (i > maxi) maxi = i;

if (connect_sockfd > maxfd) maxfd = connect_sockfd;

if (--nready <= 0) continue;
}

for (int i = 0; i <= maxi; ++i) {
int sockfd = vConnectfd[i];
if (sockfd < 0) continue;;

if (FD_ISSET(sockfd, &rset)) {
int n = read(sockfd, buffer, MAXLINE);
if (n == 0) {
close(sockfd);
FD_CLR(sockfd, &allset);
}
else write(sockfd, buffer, n);

if (--nready <= 0) break;
}
}
}
}

poll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
#include "Common.h"

int main() {
int listen_sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_sockfd < 0) {
std::cout << "create blank socket failure." << std::endl;
}

sockaddr_in socket_addr;
bzero(&socket_addr, sizeof(socket_addr));
socket_addr.sin_family = AF_INET;
socket_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 等到TCP连接成功后选择IP地址
socket_addr.sin_port = htons(SERVER_PORT);

if (bind(listen_sockfd, (sockaddr*)& socket_addr, sizeof(socket_addr)) < 0) {
std::cout << "bind socket address failure." << std::endl;
}

if (listen(listen_sockfd, LISTENQ) < 0) {
std::cout << "listening socket create failure." << std::endl;
}

sockaddr_in client_addr;
socklen_t clilen = sizeof(sockaddr_in);

pollfd client[1024];
client[0].fd = listen_sockfd;
client[0].events = POLLRDNORM; // 普通事件可读

for (int i = 1; i < 1024; ++i) client[i].fd = -1;

char buffer[MAXLINE];
int maxi = 0;

while (1) {
int nready = poll(client,maxi+1, -1);

int connect_sockfd;
if (client[0].revents & POLLRDNORM) {
int connect_sockfd = accept(listen_sockfd, (sockaddr*)& client_addr, &clilen);
if (connect_sockfd < 0) {
if (errno == EINTR) continue;
else std::cout << "connected socket create failure." << std::endl;
}
int i = 0;
for (; i < 1024; ++i) {
if (client[i].fd < 0) {
client[i].fd = connect_sockfd;
break;
}
}

client[i].events = POLLRDNORM;

if (i > maxi) maxi = i;

if (--nready <= 0) continue;
}

for (int i = 0; i <= maxi; ++i) {
int sockfd = client[i].fd;
if (sockfd < 0) continue;;

if (client[i].revents & (POLLRDNORM | POLLERR)) {
int n = read(sockfd, buffer, MAXLINE);
if (n < 0) {
if(errno == ECONNRESET)
close(sockfd);
client[i].fd = -1;
}
else if (n == 0) {
close(sockfd);
client[i].fd = -1;
}
else write(sockfd, buffer, n);

if (--nready <= 0) break;
}
}
}
}