EPOLL+FIFO多进程通信简单示例

    技术2022-05-19  24

    在linux多进程服务程序中,可以使用FIFO的方式实现进程间的通信。当子进程很多,或通信量很大的情况下,父进程通过epoll来进行IO复用是提高系统性能的很好选择。

    epoll的优势:

    epoll是2.6版本内核引入的新特性,相对于以前的IO复用方式poll和select方式,epoll能显著减少程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。

    使用epoll的主要步骤:

    包含头文件 #include <sys/epoll.h>

    主要函数为:

    int epoll_create(int size)

    创建epoll的文件描述符   参数size告诉内核预计要监听的fd数目,但不是最大数目,成功返回epoll的fd,出错返回-1。在使用完epoll后一定要close(),否则会浪费系统的fd

     

    int epoll_ctl(int epfd, int op, int fd, stuct epoll_event *event) 

    epoll事件注册函数                   epfd为epoll实例的fd, op为操作类型:EPOLL_CTL_ADD:注册新的fd到epfd中; EPOLL_CTL_MOD:修改已经注册的fd的监听事件; EPOLL_CTL_DEL:从epfd中删除一个fd

     

             epoll_event的结构体为:

             struct epoll_event {

                   __u32  events;

                   union{

                         void *ptr;

                         int    fd;

                         __u32 u32;

                         __u64 u64;

                   }data;

             }

     

    events可以是以下几个宏的集合: EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭); EPOLLOUT:表示对应的文件描述符可以写; EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来); EPOLLERR:表示对应的文件描述符发生错误; EPOLLHUP:表示对应的文件描述符被挂断; EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。缺省状态为LT EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

     

    int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)

    等待事件的产生   参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时

     

    LT与ET的区别:

    LT为level triggered 水平触发模式 ,是EPOLL的缺省工作模式,只是阻塞和非阻塞的socket。

    ET为edge triggered 上升沿触发模式,只支持非阻塞的socket。

    工作方式的主要区别是,当文件描述符就绪是,内核会发出通知,在LT的模式下,如果没有对该文件描述符进行处理,内核会持续提醒,直到完成。而在ET模式下,内核只会提醒一次,然后就假设用户已经对该描述符进行了处理,不再继续提示了。

     

    另外要注意的一个细节是,当子进程退出时,程序会收到SIGCHILD的信号,会导致程序在epoll_wait处返回-1,并使epoll_wait报错,且错误码为EINTR,这时实际上程序并没有出错,还能够正常运行,所以在这正情况下应该让其继续运行(continue)而不退出

     

    代码如下,其中主进程拥有一个FIFO,保存在/tmp/下,以进程ID.fifo为文件名。当子进程启动后,要首先打开父进程的FIFO,向其中写入请求连入信息。子进程也创建自己的FIFO,也是保存在/tmp/下,以pid.fifo为文件名,并用这个FIFO向主进程发送信息,其构成为4个byte的包头和包体,包头描述包的长度,包体为要发送的内容

     

    #define MAX_COMMAND_LENGTH 4 int g_iFifoFd = 0; //全局变量,子进程FIFO的fd,由/tmp/pid.fifo打开,在启动子进程时确定 bool g_bQuit; //退出标志 void* ListenChildMsg(void *tmp) { int kdpfd ; int curfds = 1; int nfds,n; char* ReadBuffer=NULL; ssize_t ret; size_t recLength; char HeadBuffer[MAX_COMMAND_LENGTH + 1];//接受每次发来的信息head char ContentBuffer[TOTAL_LENGTH]={0};//接受信息内容的缓冲区 struct epoll_event ev; struct epoll_event events[MAXEPOLLSIZE]; char ListenFifoPath[MAX_PATH_LENGTH]={0}; int Listenfd = -1; sprintf(ListenFifoPath, "/tmp/%d.fifo",getpid()); if((mkfifo(ListenFifoPath, O_CREAT|O_EXCL)<0)&&(errno!=EEXIST)) { Log->error("创建fifo失败,原因:%s", strerror(errno)); exit(0); } else { Log->error("创建fifo成功,%s", ListenFifoPath); } Listenfd=open(ListenFifoPath, O_RDONLY, 0); if (Listenfd == -1) { Log->error("Open fifo %s error, %s", ListenFifoPath, strerror(errno)); exit(0); } /* 创建 epoll 句柄,把监听 socket 加入到 epoll 集合里 */ kdpfd = epoll_create (MAXEPOLLSIZE); ev.events = EPOLLIN;// | EPOLLET; LT not ET ev.data.fd = Listenfd; if (epoll_ctl (kdpfd, EPOLL_CTL_ADD, Listenfd, &ev) < 0) { Log->error("epoll set insertion error: fd=%d/n", Listenfd); return -1; } else Log->info("监听 socket 加入 epoll 成功!/n"); while (!g_bQuit) { printf ("wait/n"); /* 等待有事件发生 */ nfds = epoll_wait (kdpfd, events, curfds, -1); if (g_bQuit) { close(Listenfd); unlink(ListenFifoPath); return 0; } if (nfds == -1) { if (errno == EINTR) { continue; } else { Log->error("epoll_wait error, %s",strerror (errno)); break; } } printf ("/n/n%d/n/n/n",nfds); /* 处理所有事件 */ for (n = 0; n < nfds; ++n) { if (events[n].data.fd == Listenfd) //有新的子进程请求加入 { char test[5]={0}; ret = read(events[n].data.fd, test, 1); printf("test:%s/n",test); // setnonblocking (g_iFifoFd); // 设置成非阻塞 // ev.events = EPOLLIN | EPOLLET; // 使用LT模式 ev.events = EPOLLIN ; // 使用LT模式 int i=10 while(g_iFifoFd==0 || g_iFifoFd==-1) { i--; usleep(500); if (i==0) { continue; } } ev.data.fd = g_iFifoFd; //将子进程的FIFO放入监听队列 if (epoll_ctl (kdpfd, EPOLL_CTL_ADD, g_iFifoFd, &ev) < 0) { Log->warn("把fd:'%d' 加入 epoll 失败!%s/n", g_iFifoFd, strerror (errno)); return -1; } else { printf("fd:%d加入epoll",g_iFifoFd); } curfds++; g_iFifoFd = -1; } else //子进程发送信息 { ret = read(events[n].data.fd, HeadBuffer, MAX_COMMAND_LENGTH); printf("get message :%s ret:%d /n",HeadBuffer,ret); if (ret < MAX_COMMAND_LENGTH) { printf("read error: (%d) %s/n", errno, strerror(errno)); //prepare to receive the msg head if (ret == EAGAIN) { break; } epoll_ctl (kdpfd, EPOLL_CTL_DEL, events[n].data.fd, &ev); curfds--; close(events[n].data.fd); printf("关闭fifo"); } else { HeadBuffer[MAX_COMMAND_LENGTH]='/0'; memset(ContentBuffer, 0, sizeof(ContentBuffer)); memcpy(ContentBuffer, HeadBuffer, MAX_COMMAND_LENGTH); recLength=atoi(HeadBuffer); ReadBuffer = (char *)ContentBuffer+MAX_COMMAND_LENGTH; Log->info("Should receive length is %d",recLength+MAX_COMMAND_LENGTH); while( recLength != 0 && (ret = read(events[n].data.fd, ReadBuffer, recLength)) != 0 ) { if (ret == -1) { if (errno == EINTR) { continue; } Log->error("读I/O监控信息出错,%s",strerror(errno)); break; } recLength -= ret; ReadBuffer += ret; } Log->info("Total length is %d",strlen(ContentBuffer)); if (recLength != 0) { epoll_ctl (kdpfd, EPOLL_CTL_DEL, events[n].data.fd, &ev); curfds--; } else { /*获得信息*/ printf(ContentBuffer) } }// if (ret < MAX_COMMAND_LENGTH) }//if (events[n].data.fd == ListenSocket) }//for (n = 0; n < nfds; ++n) }//while (1) close(Listenfd); unlink(ListenFifoPath); return 0; }

     

    参考:http://apps.hi.baidu.com/share/detail/7319502

    参考: http://blog.chinaunix.net/u/16292/showart_1844376.html


    最新回复(0)