[Linux Programming] Nonblocking I/O - 非阻塞I/0

I/O阻塞与非阻塞

I/O阻塞何时发生?

read/readv/recv/recvfrom/recvmsg: 内核接收缓冲区无数据, 阻塞
write/writev/send/sendto/sendmsg: 内核发送缓冲区满, 阻塞
accept: 无新连接到达, 阻塞
connect: 一个新连接没有返回, 阻塞

为何要使用非阻塞I/O?

防止用户在做任何有效操作期间发生阻塞, 提升用户体验
提升程序执行并发效率

头文件 与 API

#include <fcntl.h>
int fcntl(intfd, int cmd, ... /* int arg */ );

// set a socket as nonblocking
int flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
// turn off
fcntl(fd, F_SETFL, flags & ~O_NONBLOCK)

Demo

// client
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <sys/time.h>
#include <fcntl.h>
#include <errno.h>

#include <algorithm>

const unsigned int SVR_PORT = 1234;
const char* SVR_IP = "127.0.0.1";
const unsigned int MAX_BUF_LEN = 1024;

int main() {
    int sock_fd = socket(AF_INET, SOCK_STREAM, 0);

    sockaddr_in svr_addr; bzero(&svr_addr, sizeof(sockaddr_in));
    svr_addr.sin_family = AF_INET;
    svr_addr.sin_port = htons(SVR_PORT);
    svr_addr.sin_addr.s_addr = inet_addr(SVR_IP);

    int ret = connect(sock_fd, (sockaddr*)&svr_addr, sizeof(svr_addr));
    if(ret < 0){
        printf("connect error occured: %s\n", strerror(errno));
        return 0;
    }

    int flags = fcntl(sock_fd, F_GETFL, 0); fcntl(sock_fd, F_SETFL, flags | O_NONBLOCK); 
    int stdin_fd = fileno(stdin); flags = fcntl(stdin_fd, F_GETFL, 0); fcntl(stdin_fd, F_SETFL, flags | O_NONBLOCK); 
    int stdout_fd = fileno(stdout); flags = fcntl(stdout_fd, F_GETFL, 0); fcntl(stdout_fd, F_SETFL, flags | O_NONBLOCK); 

    char s_buffer[MAX_BUF_LEN], r_buffer[MAX_BUF_LEN];
    char *si_ptr, *so_ptr, *ri_ptr, *ro_ptr;
    si_ptr = so_ptr = s_buffer;
    ri_ptr = ro_ptr = r_buffer;

    fd_set rset; FD_ZERO(&rset);
    fd_set wset; FD_ZERO(&wset);

    bool is_read_eof = 0;

    while (1) {
        if(!is_read_eof && so_ptr < s_buffer + MAX_BUF_LEN) FD_SET(stdin_fd, &rset);
        if(si_ptr < so_ptr) FD_SET(sock_fd, &wset);
        if(ro_ptr < r_buffer + MAX_BUF_LEN) FD_SET(sock_fd, &rset);
        if(ri_ptr < ro_ptr) FD_SET(stdout_fd, &wset);

        int maxfdp1 = std::max({sock_fd + 1, stdout_fd, stdin_fd});
        select(maxfdp1, &rset, &wset, 0, 0); // 

        if (FD_ISSET(stdin_fd, &rset)) {
            int n = read(stdin_fd, s_buffer, s_buffer + MAX_BUF_LEN - so_ptr);
            if(n < 0 && errno != EWOULDBLOCK) printf("read stdin_fd other error occured: %s\n", strerror(errno));
            else if (n == 0) {
                is_read_eof = true; printf("client terminated\n");
                if(si_ptr == so_ptr) shutdown(sock_fd, SHUT_WR);
            }
            else if(n > 0){
                so_ptr += n;
                // printf("stdin_fd read %d, si_ptr = %ld, so_ptr = %ld.\n", n, si_ptr - s_buffer, so_ptr - s_buffer);
                FD_SET(sock_fd, &wset);
            }
        }

        if (FD_ISSET(sock_fd, &rset)) {
            int n = read(sock_fd, r_buffer, r_buffer + MAX_BUF_LEN - ro_ptr);
            if(n < 0 && errno != EWOULDBLOCK) printf("read sock_fd other error occured: %s\n", strerror(errno));
            else if (n == 0) {
                if (!is_read_eof) printf("server terminated prematurely\n");
                else return 0;
            }
            else if(n > 0){
                ro_ptr += n;
                // printf("sock_fd read %d, ri_ptr = %ld, ro_ptr = %ld.\n", n, ri_ptr - r_buffer, ro_ptr - r_buffer);
                FD_SET(stdout_fd, &wset);
            }
        }

        if (FD_ISSET(stdout_fd, &wset)) {
            int n = write(stdout_fd, r_buffer, ro_ptr - ri_ptr);
            if(n < 0 && errno != EWOULDBLOCK) printf("write stdout_fd other error occured: %s\n", strerror(errno));
            else if(n > 0) {
                ri_ptr += n;
                if(ri_ptr == ro_ptr) {
                    // printf("stdout_fd write %d, ri_ptr == ro_ptr, reset s_buffer.\n", n);
                    ri_ptr = ro_ptr = r_buffer;
                }
            }
        }

        if (FD_ISSET(sock_fd, &wset)) {
            int n = write(sock_fd, s_buffer, so_ptr - si_ptr);
            if(n < 0 && errno != EWOULDBLOCK) printf("write sock_fd other error occured: %s\n", strerror(errno));
            else if (n > 0) {
                si_ptr += n;
                if(si_ptr == so_ptr) {
                    // printf("sock_fd write %d, si_ptr == so_ptr, reset s_buffer.\n", n);
                    si_ptr = so_ptr = s_buffer;
                    if(is_read_eof) shutdown(sock_fd, SHUT_WR);
                }
            }
        }
    }

    close(sock_fd);

    return 0;
}

非阻塞conenct

  1. 避免程序阻塞于connect
  2. 同时建立多个连接?
  3. connect设置为非阻塞后, 通过select来缩短connect等待时间

connect虽然是阻塞的, 但connect的完成可能很快, 所以需要通过connect的返回值来判断连接是否已经完成了, 确保接下来是否取消在sock_fd上的非阻塞模式.
当连接成功建立时, 描述符变成可写; 当连接遇到错误时, 描述符即可读也可写, 所以如何判断connect成功了?

  1. 用getpeername来获取对端信息, 如果返回错误ENOTCONN那么连接意味着连接建立失败, 接下来处理错误.
  2. 以值为0的长度参数调用read, 如果read < 0失败意味着连接失败, errno会给出失败原因.
  3. 再次调用connect, 如果连接成功那么connect应该失败返回错误EISCONN.
    EINPROGRESS 连接已启动但未完成

非阻塞accept

使用select后, accept仍然可能阻塞
客户端发起连接, 服务器程序通过select(可读)后, 前往调用accept, 但在accept之前, 客户端断开连接, 这个已完成的连接被服务器驱除队列, 此时队列也没有其他已经完成的连接, 程序将阻塞在accept处, 直到某个连接建立, 再次之前select将失效.

解决方案:

  1. 对accept使用非阻塞模式
  2. 忽略EWOULDBLOCK ECONNABORTED EPROTO EINTR

发表评论

电子邮件地址不会被公开。必填项已用 * 标注