Epoll 使用示例

    技术2022-05-20  38

    By fireworks@foxmail.com

    所谓边沿触发:将调用 epoll_wait 期间的事件进行了合并,因此事件数量较多时,边沿触发才显出优势

    使用UDP时,如果 read 时缓冲区小于包长时,其结果是依赖于实现的,可能会导致epoll_wait出错

     

    其他相关的背景知识,可以在网上轻松获取到,这里仅列一个示例的源码

    ------------------------------------------------------------------------------------------------------------------------------------------------

     

    #if 0

    作者:fireworks2@foxmail.com

    说明:一个简易的 TCP/UDP - epoll 服务器使用样例,尽量简化了逻辑

    日期:2011-04-11

    #endif

     

    #include <iostream>

    #include <sys/socket.h>

    #include <sys/epoll.h>

    #include <netinet/in.h>

    #include <arpa/inet.h>

    #include <fcntl.h>

    #include <unistd.h>

    #include <stdio.h>

    #include <errno.h>

     

    /* For KeepAlive */

    #include <linux/tcp.h>

    #include <linux/socket.h>

     

    using namespace std;

     

    #define DBG_TCP // 是否使用 TCP 样例,注释后为UDP

     

    #define MAXSOCK 1000

    #define MAXLINE 1024

    #define LISTENQ 20

    #define MAXEVENTS 30

    #define TIMEOUT 500 // ms

     

    #define LOCAL_IP "192.168.138.9"

    #define LOCAL_PORT 5002

     

    #define W_UDP SOCK_DGRAM

    #define W_TCP SOCK_STREAM

     

    #define ERRSTR strerror(errno) // 打印错误信息

     

    /* 将一个FD设置为非阻塞 */

    int w_UnblockFd(int fd)

    {

        int opts;

     

        opts = fcntl(fd, F_GETFL);

        if (opts < 0)

        {

            return -1;

        }

     

        if (fcntl(fd, F_SETFL, opts)<0)

        {

            return -2;

        }   

    }

     

    /* 创建一个套接字 */

    int w_Socket(int type){

        return socket(AF_INET, type, 0);

    }

     

    /* 开启 EPOLL */

    int w_EpollCreate(int count){

        return epoll_create(count);

    }

     

    /* epoll等待事件发生 */

    int w_EpollWait(int epfd, struct epoll_event *pEvents, int maxevents, int timeout){

        return epoll_wait(epfd, pEvents, maxevents, timeout); /* time out in ms */

    }

     

    /* 向epoll中加入一个描述符 */

    int w_EpollAdd(int epfd, int sockfd, int events){

        struct epoll_event ev;

     

        ev.data.fd = sockfd;

        ev.events = events;

        return epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);

    }

     

    /* 删除一个观察的描述符 */

    int w_EpollDel(int epfd, int sockfd){

        int iRet = epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, NULL);

        close(sockfd);

        return iRet;

    }

     

    /* 修改一个描述符的观测事件 */

    int w_EpollMod(int epfd, int sockfd, int events){

        struct epoll_event ev;

     

        ev.data.fd = sockfd;

        ev.events = events;

        return epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);

    }

     

    /* 查看事件是否为 IN */

    bool w_IsPollIn(struct epoll_event *pEvent){

        if (NULL == pEvent){

            return false;

        }

        return pEvent->events & EPOLLIN;

    }

    /* 查看事件是否为 OUT */

    bool w_IsPollOut(struct epoll_event *pEvent){

        if (NULL == pEvent){

            return false;

        }

        return pEvent->events & EPOLLOUT;

    }

     

    /* 从struct epoll_event中抽取fd */

    int w_GetFdFromEvent(struct epoll_event *pEvent){

        return pEvent->data.fd;

    }

     

    /* 由struct sockaddr_in 得到IP地址、端口 */

    int w_STAddr2IpPort(char** ppClientIp, int *pClientPort, struct sockaddr_in *pAddress){

        if (NULL == pAddress){

            return -1;

        }

     

        if (ppClientIp != NULL){

            *ppClientIp = inet_ntoa(pAddress->sin_addr);

        }    

        if (pClientPort != NULL){

            *pClientPort = ntohs(pAddress->sin_port);

        }

     

        return 0;

    }

     

    /* 由IP、端口得到struct sockaddr_in */

    int w_IpPort2STAddr(struct sockaddr_in *pAddress, const char *pIp, int port){

        if (NULL == pAddress){

            return -1;

        }

     

        bzero(pAddress, sizeof(struct sockaddr));

        pAddress->sin_family = AF_INET;

        pAddress->sin_port = htons(port);

     

        return inet_aton(pIp, &(pAddress->sin_addr));

    }

     

     

    /* 设置套接字收/发超时, 同时设置收发(发送的也可以使用非阻塞) */

    int w_SetSockTimeout(int sockfd, int ms){

        struct timeval stTimeval;

        stTimeval.tv_sec = ms / 1000;

        stTimeval.tv_usec = (ms % 1000) * 1000;

        setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, &stTimeval, sizeof(stTimeval));

        setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, &stTimeval, sizeof(stTimeval));

    }

     

    /* 设置 KEEP_ALIVE 的存活探测 */

    /* idleTime 秒没动静后,尝试以 intvlCheckTime 秒为间隔发探测包,最多探测 checkCnt 次 */

    /* 探测失败会触发事件(epoll / select 感知为套接字可读) */

    int w_SetKeepAlive(int sockfd, int idleTime, int intvlCheckTime, int checkCnt){

        int optval; 

        socklen_t optlen = sizeof(optval);    

     

        optval = 1;    

        if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen)){

            return -1;

        }

        optval = idleTime;

        if (setsockopt(sockfd, IPPROTO_TCP , TCP_KEEPIDLE, &optval, optlen)){

            return -2;

        }

        optval = intvlCheckTime;

        if (setsockopt(sockfd, IPPROTO_TCP , TCP_KEEPINTVL, &optval, optlen)){

            return -3;

        }

        optval = checkCnt;

        if (setsockopt(sockfd, IPPROTO_TCP, TCP_KEEPCNT, &optval, optlen)){

            return -4;

        }

     

        return 0;

    }

     

     

    /* 绑定一个地址:端口 */

    int w_Bind(int sockfd, const char *pIp, int port){

        struct sockaddr_in addr;

     

    #if 0

        bzero(&addr, sizeof(addr));

        addr.sin_family = AF_INET;

        inet_aton(ip, &(addr.sin_addr));

        addr.sin_port=htons(port);

    #endif

        w_IpPort2STAddr(&addr, pIp, port);

     

        return bind(sockfd,(sockaddr *)&addr, sizeof(addr));

    }

     

    /* 接收一个套接字 */

    int w_Accept(int sockfd, char** ppClientIp, int *pClientPort){

        struct sockaddr_in clientAddr;

        unsigned int addrlen = sizeof(clientAddr);

        int newSockfd = accept(sockfd, (sockaddr *)&clientAddr, &addrlen);

     

        if (newSockfd < 0){

            return newSockfd;

        }

     

        w_STAddr2IpPort(ppClientIp, pClientPort, &clientAddr);

     

        return newSockfd;

    }

     

    /* 得到 TCP 对端地址 */

    int w_Getpeername(int sockfd, char** ppClientIp, int *pClientPort){

        struct sockaddr_in clientAddr;

        socklen_t addrLen = sizeof clientAddr;

        int iRet = getpeername(sockfd, (struct sockaddr*)&clientAddr, &addrLen);

     

        w_STAddr2IpPort(ppClientIp, pClientPort, &clientAddr);

     

        return iRet;    

    }

     

     

    int main()

    {

        int epfd, listenfd;    

        /* 仅为示例,没有检查返回值 */

    #ifdef DBG_TCP

        // 创建监听套接字

        listenfd = w_Socket(W_TCP);    

        w_Bind(listenfd, LOCAL_IP, LOCAL_PORT);

        listen(listenfd, LISTENQ);

    #else

        // 创建普通套接字

        listenfd = w_Socket(W_UDP);    

        w_Bind(listenfd, LOCAL_IP, LOCAL_PORT); /* 指定本地所用的端口 */

    #endif

     

        // 创建 epoll

        w_UnblockFd(listenfd);

        epfd = w_EpollCreate(MAXSOCK);

        w_EpollAdd(epfd, listenfd, EPOLLIN); /* 可以加入多个fd,本地程序可监听多个端口 */

     

        struct epoll_event events[MAXEVENTS];

        while (1) {

            //等待epoll事件的发生

            int nfds = w_EpollWait(epfd, events, sizeof events, TIMEOUT);

            if (nfds < 0){

                // TO DO... 处理出错的情况(在UDP收包不收全时会产生错误)

                // 及时退出,否则会导致 CPU 100%

                break;

            }

     

            //处理所发生的所有事件

            for (int i = 0; i < nfds; ++i) {

                int fd = w_GetFdFromEvent(&events[i]);

                if (fd < 0) {

                    continue; // 出错处理

                }

     

    #ifdef DBG_TCP

                /* 监听到的套接字 */

                if (fd == listenfd) {

                    char *pIP;

                    int port;

                    int connfd = w_Accept(listenfd, &pIP, &port);

                    if(connfd < 0){

                        continue; // 出错处理

                    }

     

                    cout << "New connection from " << pIP << ":" << port << endl;

                    /* TO DO... */

     

                    w_UnblockFd(connfd);

                    //添加观测的文件描述符

                    w_EpollAdd(epfd, connfd, EPOLLIN); // EPOLLOUT|EPOLLET  

                    // LT 最后一批数据和退出事件不会合并

                    // ET 最后一次数据可能会和退出合并在一起了

     

    // keepalive 探测失败后,会触发2个 IN 事件(只是对超时而言,至于主机不可达或崩溃则是不一样的情况)

    //     第一次recv的错误码是 ETIMEDOUT,可以在这里的超时选择关闭fd

    //     第二次recv返回0

                    // w_SetKeepAlive(connfd, 60, 5, 8); 

                    continue;

                }

    #endif

                /* 读取数据 */

                if (w_IsPollIn(&events[i])) { 

                    int nbytes;

                    char buffer[MAXLINE];

     

    #ifdef DBG_TCP

                    nbytes = recv(fd, buffer, MAXLINE, 0);

                    // 可通过 w_Getpeername 获取对方的地址

    #else

                    // nbytes = recv(fd, buffer, MAXLINE, 0); // 不想获取对端地址可以直接调用recv

                    struct sockaddr clientAddr;

                    socklen_t addrLen = sizeof clientAddr;                

                    /* 配合epoll ET使用时,最好写个循环确保已读取所有数据,否则可能会出错 */

                    nbytes = recvfrom(fd, buffer, MAXLINE, 0, &clientAddr, &addrLen);                 

    #endif

                    if (0 == nbytes) { /* 无法感知网络断开,需要配置心跳包或KEEPALIVE,建议用前者 */

                        // TO DO

                        cout << "Connection closed!" << endl;

                        w_EpollDel(epfd, fd);  /* 忘记close会导致 LT 模式下 CPU 100% */

                        continue;

                    }else if (nbytes < 0) {

                        if (errno == ECONNRESET || errno == ETIMEDOUT) {

                            // TO DO

                            cout << "Line broke:" << nbytes << ERRSTR << endl;

                            w_EpollDel(epfd, fd);

                        }else{

                            // TO DO

                            cout << "Recv error:" << nbytes << ERRSTR << endl;

                        }

                        continue;

                    } 

     

                    // TO DO 正常处理流程

                    buffer[nbytes] = '/0';

                    // cout << "Read data:" << buffer << endl;

                    // w_EpollMod

                }

    #if 0

                else if (w_IsPollOut(&events[i])) {  /* EPOLLOUT 事件使用较少,大多数情况下是可写的,除非对端TCP recv 过慢 */

                    const char *buffer = "echo";

                    send(fd, buffer, sizeof buffer, 0); // udp 得用 sendto

                }

    #endif

                else {  /* 其他事件 */

                    // TO DO

                    cout << "Connection closed abnormal!" << endl;

                    w_EpollDel(epfd, fd);

                }

            }

        }

     

        return 0;

    }

     


    最新回复(0)