[Linux Programming] I/O Multiplexing - I/O复用(select & poll)

select

#include <sys/select.h>
#include <sys/time.h>
int select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset, const struct timeval *timeout);
// @maxfdp1: maxfd plus 1, 0 ~ maxfdp1 - 1 will be check
// @timeout: nullptr for wait forever, .

void FD_ZERO(fd_set* fdset); // 初始化, 清除fdset中的所有描述符, 必要步骤!!!
void FD_SET(int fd, fd_set* fdset); // 将fd置入fdset
void FD_CLR(int fd, fd_set* fdset); // 将fd从fdset清除
void FD_ISSET(int fd, fd_set* fdset); // 用于检测fd是否存在于fdset
// client
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <sys/time.h>

const unsigned int SVR_PORT = 1234;
const char* SVR_IP = "127.0.0.1";
const unsigned int MAXLINE = 1024;

int main() {
    int sock_fd = socket(AF_INET, SOCK_STREAM, 0);

    sockaddr_in svr_addr;
    bzero(&svr_addr, sizeof(sockaddr_in));
    svr_addr.sin_family = AF_INET;
    svr_addr.sin_port = htons(SVR_PORT);
    svr_addr.sin_addr.s_addr = inet_addr(SVR_IP);

    connect(sock_fd, (sockaddr*)&svr_addr, sizeof(svr_addr));

    char buffer[MAXLINE];

    fd_set rset; FD_ZERO(&rset);
    int stdin_fd = fileno(stdin);
    bool is_write_eof = 0;
    while (1) {
        if(!is_write_eof) FD_SET(stdin_fd, &rset);

        FD_SET(sock_fd, &rset);
        int maxfd = stdin_fd < sock_fd ? sock_fd + 1 : stdin_fd + 1;
        select(maxfd, &rset, 0, 0, 0);

        if (FD_ISSET(sock_fd, &rset)) {
            int n = read(sock_fd, buffer, MAXLINE);
            if (n == 0) {
                if (!is_write_eof) printf("server terminated prematurely\n");

                return 0;
            }

            write(fileno(stdout), buffer, n);
        }

        if (FD_ISSET(stdin_fd, &rset)) {
            int n = read(stdin_fd, buffer, MAXLINE);
            if (n == 0) {
                is_write_eof = true;
                printf("client terminated\n");
                shutdown(sock_fd, SHUT_WR);
                FD_CLR(sock_fd, &rset);
                continue;
            }

            write(sock_fd, buffer, n);
        }
    }

    close(sock_fd);

    return 0;
}
// server
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <poll.h>

const unsigned int SVR_PORT = 1234;
const unsigned int LISTEN_QUEUE_LENTH = 5;
const unsigned int MAXLINE = 1024;
const unsigned int ACCEPT_CLIENT_SIZE = 5;

int main() {
    int lfd = socket(AF_INET, SOCK_STREAM, 0);

    sockaddr_in svr_addr;
    bzero(&svr_addr, sizeof(sockaddr_in));
    svr_addr.sin_family = AF_INET;
    svr_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    svr_addr.sin_port = htons(SVR_PORT);
    socklen_t addr_len = sizeof(svr_addr);
    bind(lfd, (sockaddr*)&svr_addr, addr_len);

    listen(lfd, LISTEN_QUEUE_LENTH);

    sockaddr_in clt_addr;
    bzero(&clt_addr, sizeof(sockaddr_in));

    int cfds[ACCEPT_CLIENT_SIZE];
    for(size_t i  =  0; i < ACCEPT_CLIENT_SIZE; ++i) cfds[i] = -1;

    fd_set rset; FD_ZERO(&rset);
    fd_set allset; FD_ZERO(&allset); FD_SET(lfd, &allset);

    int maxfd = lfd + 1;

    char buffer[MAXLINE];

    size_t maxi = 0;

    while (1) {
        rset = allset;
        int nready = select(maxfd + 1, &rset, 0, 0, 0);

        if (FD_ISSET(lfd, &rset)) {
            int cfd = accept(lfd, (sockaddr*)&clt_addr, &addr_len);
            FD_SET(cfd, &allset);
            size_t i = 0;
            for (; i < ACCEPT_CLIENT_SIZE; ++i) {
                if (cfds[i] < 0) {
                    cfds[i] = cfd;
                    break;
                }
            }
            if (cfds[i] > maxfd) maxfd = cfd;

            if (i > maxi) maxi = i;

            if (--nready <= 0) continue;
        }

        for (size_t i = 0; i <= maxi; ++i) {
            int fd = cfds[i];
            if (fd < 0) continue;;

            if (FD_ISSET(fd, &rset)) {
                int n = read(fd, buffer, MAXLINE);
                if (n <= 0) {
                    close(fd);
                    FD_CLR(fd, &allset);
                }
                else write(fd, buffer, n);

                if (--nready <= 0) break;
            }
        }
    }

    return 0;
}

poll

#include <poll.h>
struct pollfd {
    int fd; //descriptor to check
    short events; // events of interest on fd
    short revents; // events that occurred on fd
};
int poll (struct pollfd *fdarray, unsigned long nfds, int timeout);
Constant events revents desc
POLLIN 1 1 = POLLRDNORM POLLRDBAND
POLLRDNORM 1 1 normal data
POLLRDBAND 1 1 priority band data
POLLPRI 1 1 high priority data
POLLOUT 1 1 = POLLWRNORM
POLLWRNORM 1 1 normal
POLLWRBAND 1 1 priority band data
POLLERR 0 1 error
POLLHUP 0 1 hangup
POLLNVAL 0 1 not valid
// server
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <poll.h>

const unsigned int SVR_PORT = 1234;
const unsigned int LISTEN_QUEUE_LENTH = 5;
const unsigned int MAXLINE = 1024;
const unsigned int ACCEPT_CLIENT_SIZE = 5;

int main() {
    int lfd = socket(AF_INET, SOCK_STREAM, 0);

    sockaddr_in svr_addr;
    bzero(&svr_addr, sizeof(sockaddr_in));
    svr_addr.sin_family = AF_INET;
    svr_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    svr_addr.sin_port = htons(SVR_PORT);
    socklen_t addr_len = sizeof(svr_addr);
    bind(lfd, (sockaddr*)&svr_addr, addr_len);

    listen(lfd, LISTEN_QUEUE_LENTH);

    sockaddr_in clt_addr;
    bzero(&clt_addr, sizeof(sockaddr_in));

    pollfd c_poll[ACCEPT_CLIENT_SIZE];
    c_poll[0].fd = lfd;
    c_poll[0].events = POLLRDNORM;
    for (size_t i = 1; i < ACCEPT_CLIENT_SIZE; ++i) c_poll[i].fd = -1;

    size_t maxi = 0;
    char buffer[MAXLINE];

    while (1) {
         int nready = poll(c_poll, maxi + 1, -1);

         if (c_poll[0].revents & POLLRDNORM) {
             int cfd = accept(lfd, (sockaddr*)&clt_addr, &addr_len);

             size_t i = 0;
             for (; i < ACCEPT_CLIENT_SIZE; ++i) {
                 if (c_poll[i].fd < 0) {
                     c_poll[i].fd = cfd;
                     break;
                 }
             }
             c_poll[i].events = POLLRDNORM;

             if (i > maxi) maxi = i;

             if (--nready <= 0) continue;
         }

         for (size_t i = 0; i <= maxi; ++i) {
             int fd = c_poll[i].fd;
             if (fd < 0) continue;;

             if (c_poll[i].revents & (POLLRDNORM | POLLERR)) {
                 int n = read(fd, buffer, MAXLINE);
                 if (n <= 0) {
                    printf("client terminated\n");
                    close(fd);
                    c_poll[i].fd = -1;
                 }
                 else write(fd, buffer, n);

                 if (--nready <= 0) break;
             }
         }
     }

    return 0;
}

epoll

#include <sys/epoll.h>

int epoll_create(int size); // create epoll instance
// @size: any value > 0

typedef union epoll_data {
    void *ptr;
    int fd;
    uint32_t u32;
    uint64_t u64;
} epoll_data_t;
struct epoll_event {
    uint32_t events; // epoll events
    epoll_data_t data; // user data variable
};
// @events: event types
// - EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLPRI | EPOLLERR | EPOLLHUP | EPOLLET | EPOLLONESHOT
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// @op
// - EPOLL_CTL_ADD | EPOLL_CTL_MOD | EPOLL_CTL_DEL
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
// @maxevents: 
// @timeout: >0: minimum ms will block, =0: return immediately any way, =-1: block always
// server
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <sys/epoll.h>

const unsigned int SVR_PORT = 1234;
const unsigned int LISTEN_QUEUE_LENTH = 5;
const unsigned int MAX_EVENTS = 10;
const unsigned int MAX_BUF_LEN = 1024;

int main() {
    int lfd = socket(AF_INET, SOCK_STREAM, 0);

    sockaddr_in svr_addr;
    bzero(&svr_addr, sizeof(sockaddr_in));
    svr_addr.sin_family = AF_INET;
    svr_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    svr_addr.sin_port = htons(SVR_PORT);
    socklen_t addr_len = sizeof(sockaddr);
    bind(lfd, (sockaddr*)&svr_addr, addr_len);

    listen(lfd, LISTEN_QUEUE_LENTH);

    int efd = epoll_create(MAX_EVENTS);
    struct epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.fd = lfd;
    epoll_ctl(efd, EPOLL_CTL_ADD, lfd, &ev);

    struct epoll_event events[MAX_EVENTS];
    char buf[MAX_BUF_LEN];
    while (1) {
        int nfds = epoll_wait(efd, events, MAX_EVENTS, -1);

        for(int i = 0; i < nfds; ++i){
            int sfd = events[i].data.fd;
            if(sfd == lfd) {
                sockaddr_in clt_addr;
                int cfd = accept(lfd, (sockaddr*)&clt_addr, &addr_len);
                if (cfd > 0) {
                    int flags = fcntl(cfd, F_GETFL, 0);
                    fcntl(cfd, F_SETFL, flags | O_NONBLOCK);

                    ev.events = EPOLLIN | EPOLLET;
                    ev.data.fd = cfd;
                    epoll_ctl(efd, EPOLL_CTL_ADD, cfd, &ev);
                }
                continue;
            }

            if(events[i].events & EPOLLIN) {
                size_t n = read(sfd, buf, MAX_BUF_LEN);
                if(n == 0) {
                    printf("read 0");
                    break;
                }
                else if(n > 0) {
                    ev.events = EPOLLOUT;
                    ev.data.fd = sfd;
                    epoll_ctl(efd, EPOLL_CTL_MOD, sfd, &ev);
                }
            }
            else if(events[i].events & EPOLLOUT){
                int len = strlen(buf);
                while(len > 0){
                    int n = write(sfd, buf, len);
                    len -= n;
                }

                ev.events = EPOLLIN | EPOLLET;
                ev.data.fd = sfd;
                epoll_ctl(efd, EPOLL_CTL_MOD, sfd, &ev);
            }
        }
    }

    close(efd);
    close(lfd);

    return 0;
}

发表评论

电子邮件地址不会被公开。必填项已用 * 标注