An Efficient Server

    Technology 2024-04-20

    高效服务器的关键技术: 1.需要有固定数量的线程来去处理请求,而不可以每次收到请求都fork或者create_thread; 2.如果socket使用堵塞模式,那么read必须要有超时,如果使用非堵塞模式,那么read就必须要有数据缓冲; 3.使用epoll分别在不同的线程中监视可读和可写; 4.对于tcp长连接使用主线程分配连接,然后由任务线程处理,一个任务线程对应一个任务队列;对于tcp短连接和udp连接,使用竞争线程; 5.时刻检测并且关闭超时连接; 6.真正关闭socket的地方只能由一处,其他地方发现其socket需要关闭只能标识socket应该关闭了,然后由那一处安全的关闭socket; 7.当服务器需要不断的发送硬盘上的数据给客户端(比如说文件传输服务器和流媒体服务器),那么服务器端必须先从硬盘加载数据到数据缓存,再从数据缓冲发送数据给客户端; 8.内存分配必须成功,不然线程睡眠然后重新分配直到分配成功为止; 通用设计方案: 现在我们设计一个通用的服务器,这个服务器接受客户端的连接,然后不断读取客户端的请求,在某一时刻又转为不断的发送数据给客户端;下面我们看看怎么设计这样的一个服务器 注: 1.为了只关注设计方面并且让代码尽量简洁,对于一些错误处理我删除了。 2.must_malloc,must_calloc和其他前缀为must的函数封装了相应的内存分配函数,使得分配必须成功; 3.queue_t结构是一个通用的任务队列结构,queue_new是创建新的队列,queue_push是推数据到队列中,queue_pop是从队列中抛出数据 4.下面的代码只是一个良好的服务器架构,而不是一个真正能使用的服务器 5.欢迎讨论和优化! Cpp代码 < type="application/x-shockwave-flash" width="14" height="15" src="http://andylin02.javaeye.com/javascripts/syntaxhighlighter/clipboard_new.swf" pluginspage="http://www.macromedia.com/go/getflashplayer" allowscriptaccess="always" quality="high" flashvars="clipboard=typedef%20struct%20server_st%20*server_t%3B%0Atypedef%20struct%20task_queue_st%20*task_queue_t%3B%0Atypedef%20struct%20conn_st%20*conn_t%3B%0A%0A%2F**%20%E6%9C%8D%E5%8A%A1%E5%99%A8%E7%BB%93%E6%9E%84%20*%2F%0Astruct%20server_st%20%7B%0A%09%2F**%20%E7%9B%91%E5%90%AC%E5%9C%B0%E5%9D%80%20*%2F%0A%09struct%20in_addr%20listen_addr%3B%0A%0A%09%2F**%20%E7%9B%91%E5%90%AC%E7%AB%AF%E5%8F%A3%20*%2F%0A%09int%20port%3B%0A%0A%09%2F**%20%E7%9B%91%E5%90%AC%E6%8F%8F%E8%BF%B0%E7%AC%A6%20*%2F%0A%09int%20listen_fd%3B%0A%0A%09%2F**%E8%B4%9F%E8%B4%A3%E8%AF%BB%E5%8F%96%E8%AF%B7%E6%B1%82%E7%9A%84%E4%BB%BB%E5%8A%A1%E9%98%9F%E5%88%97%2C%E4%B8%80%E4%B8%AA%E7%BA%BF%E7%A8%8B%E8%B4%9F%E8%B4%A3%E5%A4%84%E7%90%86%E4%B8%80%E4%B8%AA%E4%BB%BB%E5%8A%A1%E9%98%9F%E5%88%97%EF%BC%8C%E4%B8%80%E5%85%B1%E4%BA%94%E4%B8%AA*%2F%0A%09queue_t%20readreq_task_queues%5B5%5D%3B%0A%0A%09%2F**%E8%B4%9F%E8%B4%A3%E5%86%99%E6%95%B0%E6%8D%AE%E7%9A%84%E4%BB%BB%E5%8A%A1%E9%98%9F%E5%88%97%2C%E4%B8%80%E4%B8%AA%E7%BA%BF%E7%A8%8B%E8%B4%9F%E8%B
4%A3%E5%A4%84%E7%90%86%E4%B8%80%E4%B8%AA%E4%BB%BB%E5%8A%A1%E9%98%9F%E5%88%97%EF%BC%8C%E4%B8%80%E5%85%B1%E4%BA%94%E4%B8%AA*%2F%0A%09queue_t%20writedata_task_queues%5B5%5D%3B%0A%7D%3B%0A%0A%2F**%20%E4%BB%BB%E5%8A%A1%E9%98%9F%E5%88%97%E7%BB%93%E6%9E%84%20*%2F%0Astruct%20task_queue_st%20%7B%0A%09%2F**%20%E5%BE%85%E6%B7%BB%E5%8A%A0%E7%9A%84%E8%BF%9E%E6%8E%A5%20*%2F%0A%09queue_t%20pending_conn_queue%3B%0A%0A%09%2F**%20%E4%BA%92%E6%96%A5%E9%94%81%2C%E7%94%A8%E4%BA%8E%E5%A4%9A%E7%BA%BF%E7%A8%8B%E7%8E%AF%E5%A2%83%E4%B8%8B%E5%AF%B9%E4%BB%BB%E5%8A%A1%E9%98%9F%E5%88%97%E7%9A%84%E8%AE%BF%E9%97%AE%E6%8E%A7%E5%88%B6%20*%2F%0A%09pthread_mutex_t%20task_queue_mutex%3B%0A%0A%09%2F**%20%E9%93%BE%E8%A1%A8%E7%BB%93%E6%9E%84%EF%BC%8C%E4%BF%9D%E5%AD%98%E4%BB%BB%E5%8A%A1%E9%98%9F%E5%88%97%E4%B8%AD%E7%9A%84%E6%89%80%E6%9C%89%E8%BF%9E%E6%8E%A5%20*%2F%0A%09conn_t%20conn_head%3B%0A%0A%09%2F**%20%E9%93%BE%E8%A1%A8%E7%9A%84%E6%9C%80%E5%90%8E%E4%B8%80%E4%B8%AA%E8%8A%82%E7%82%B9%EF%BC%8C%E7%94%A8%E4%BA%8E%E6%96%B9%E4%BE%BF%E5%9C%A8%E7%BB%93%E5%B0%BE%E6%8F%92%E5%85%A5%E6%96%B0%E7%9A%84%E8%8A%82%E7%82%B9%20*%2F%0A%09conn_t%20conn_tail%3B%0A%0A%09%2F**%20%E6%80%BB%E8%BF%9E%E6%8E%A5%E6%95%B0*%2F%0A%09int%20total_conns%3B%0A%7D%3B%0A%0A%2F**%20%E7%94%A8%E6%88%B7%E7%9A%84%E9%98%B6%E6%AE%B5%E7%8A%B6%E6%80%81%20*%2F%0Atypedef%20enum%20%7B%0A%09status_ESTABLISHED%2C%2F**%20%E5%AE%A2%E6%88%B7%E7%AB%AF%E5%88%9A%E5%BB%BA%E7%AB%8B%E8%BF%9E%E6%8E%A5%20*%2F%0A%09status_CONNECTED%2C%2F**%20%E5%AE%A2%E6%88%B7%E7%AB%AF%E6%AD%A3%E5%9C%A8%E8%BF%9E%E6%8E%A5%20*%2F%0A%09status_START_RECV_DATA%2C%2F**%20%E5%AE%A2%E6%88%B7%E7%AB%AF%E5%BC%80%E5%A7%8B%E6%8E%A5%E6%94%B6%E6%95%B0%E6%8D%AE%2C%E5%8D%B3%E8%A1%A8%E7%A4%BA%E6%9C%8D%E5%8A%A1%E5%99%A8%E7%AB%AF%E5%BC%80%E5%A7%8B%E5%8F%91%E9%80%81%E6%95%B0%E6%8D%AE%20*%2F%0A%09status_CLOSED%2F**%20%E5%AE%A2%E6%88%B7%E7%AB%AF%E5%85%B3%E9%97%AD%E8%BF%9E%E6%8E%A5%20*%2F%0A%7D%20client_status_t%3B%0A%0A%2F**%20%E6%9C%8D%E5%8A%A1%E5%99%A8%E7%9A%84%E9%98%B6%E6%AE%B5%E7%8A%B6%E6%80%81%20*%2F
%0Atypedef%20enum%20%7B%0A%09status_SERVER_CONNECTED%2C%2F**%20%E4%B8%8E%E5%AE%A2%E6%88%B7%E7%AB%AF%E5%BB%BA%E7%AB%8B%E8%BF%9E%E6%8E%A5%20*%2F%0A%09status_SERVER_CLOSING%20%2F**%20%E6%9C%8D%E5%8A%A1%E5%99%A8%E5%85%B3%E9%97%AD%E8%BF%9E%E6%8E%A5%20*%2F%0A%7D%20server_status_t%3B%0A%0A%2F**%20%E8%BF%9E%E6%8E%A5%E7%BB%93%E6%9E%84%20*%2F%0Astruct%20conn_st%20%7B%0A%09%2F**%20%E8%AF%A5%E8%BF%9E%E6%8E%A5%E7%9A%84%E6%89%80%E5%B1%9E%E6%9C%8D%E5%8A%A1%E5%99%A8%20*%2F%0A%09server_t%20server%3B%0A%0A%09%2F**%20%E8%AF%A5%E8%BF%9E%E6%8E%A5%E7%9A%84%E5%BC%95%E7%94%A8%E8%AE%A1%E6%95%B0%20*%2F%0A%09int%20ref_count%3B%0A%0A%09%2F**%20%E7%94%A8%E6%88%B7%E6%9C%80%E5%90%8E%E4%B8%80%E6%AC%A1%E6%B4%BB%E5%8A%A8%E7%9A%84%E6%97%B6%E9%97%B4*%2F%0A%09time_t%20last_activity%3B%0A%0A%09%2F**%20%E5%AE%A2%E6%88%B7%E7%AB%AF%E5%BD%93%E5%89%8D%E7%9A%84%E7%8A%B6%E6%80%81%E9%98%B6%E6%AE%B5*%2F%0A%09client_status_t%20client_status%3B%0A%0A%09%2F**%20%E6%9C%8D%E5%8A%A1%E5%99%A8%E5%BD%93%E5%89%8D%E7%9A%84%E7%8A%B6%E6%80%81%E9%98%B6%E6%AE%B5*%2F%0A%09server_status_t%20server_status%3B%0A%0A%09%2F**%20socket%E6%8F%8F%E8%BF%B0%E7%AC%A6*%2F%0A%09int%20fd%3B%0A%0A%09%2F**%20%E6%95%B0%E6%8D%AE%E7%BC%93%E5%86%B2%EF%BC%8C%E5%BD%93%E6%9C%8D%E5%8A%A1%E5%99%A8%E7%AB%AF%E9%9C%80%E8%A6%81%E4%B8%8D%E6%96%AD%E7%9A%84%E5%8F%91%E9%80%81%E6%95%B0%E6%8D%AE%E7%BB%99%E5%AE%A2%E6%88%B7%E7%AB%AF%E6%97%B6%E8%A2%AB%E7%94%A8%E5%88%B0%20*%2F%0A%09char%20*remain_data%3B%0A%0A%09%2F**%20%E6%95%B0%E6%8D%AE%E7%BC%93%E5%86%B2%E5%89%A9%E4%BD%99%E6%95%B0%E6%8D%AE%E5%A4%A7%E5%B0%8F%20*%2F%0A%09int%20remain_data_size%3B%0A%0A%09%2F**%20%E4%B8%8B%E4%B8%80%E4%B8%AA%E8%BF%9E%E6%8E%A5%20*%2F%0A%09conn_t%20readreq_task_queue_next%3B%0A%0A%09%2F**%20%E4%B8%8B%E4%B8%80%E4%B8%AA%E8%BF%9E%E6%8E%A5%20*%2F%0A%09conn_t%20writedata_task_queue_next%3B%0A%7D%3B%0A%0A%2F**%20%E4%B8%BB%E5%87%BD%E6%95%B0%20*%2F%0Aint%20main(int%20argc%2C%20char%20**argv)%20%7B%0A%09int%20idx%2C%20conn_fd%3B%0A%09server_t%20server%3B%0A%0A%09server%20%3D%20must_calloc(1%2C%20
sizeof(struct%20server_st))%3B%0A%09server-%3Eport%20%3D%20%E7%AB%AF%E5%8F%A3%E5%8F%B7%3B%0A%09server-%3Elisten_addrs.s_addr%20%3D%20INADDR_ANY%3B%0A%09server-%3Elisten_fd%20%3D%20%E6%9C%8D%E5%8A%A1%E5%99%A8%E7%9B%91%E5%90%AC(%26server-%3Elisten_addr%2Cserver-%3Eport)%3B%0A%09%E8%AE%BE%E7%BD%AE%E6%88%90%E9%9D%9E%E5%A0%B5%E5%A1%9E(server-%3Elisten_fd)%3B%0A%0A%09%2F*%20%E5%88%9B%E5%BB%BA5%E4%B8%AA%E8%B4%9F%E8%B4%A3%E8%AF%BB%E5%8F%96%E8%AF%B7%E6%B1%82%E7%9A%84%E4%BB%BB%E5%8A%A1%E7%BA%BF%E7%A8%8B%20*%2F%0A%09for%20(idx%20%3D%200%3B%20idx%20%3C%205%3B%20idx%2B%2B)%20%7B%0A%09%09task_queue_t%20task_queue%20%3D%20server-%3Ereadreq_task_queues%5Bidx%5D%3B%0A%0A%09%09bzero(task_queue%2Csizeof(struct%20task_queue_st))%3B%0A%09%09task_queue-%3Epending_conn_queue%20%3D%20queue_new()%3B%0A%09%09pthread_mutex_init(%26task_queue-%3Etask_queue_mutex%2CNULL)%3B%0A%0A%09%09pthread_create(%26task_queue-%3Ethread_tid%2C%20NULL%2C%20readreq_task_thread%2C%20task_queue)%3B%0A%09%7D%0A%0A%09%2F*%20%E5%88%9B%E5%BB%BA5%E4%B8%AA%E8%B4%9F%E8%B4%A3%E5%86%99%E6%95%B0%E6%8D%AE%E7%9A%84%E4%BB%BB%E5%8A%A1%E7%BA%BF%E7%A8%8B%20*%2F%0A%09for%20(idx%20%3D%200%3B%20idx%20%3C%205%3B%20idx%2B%2B)%20%7B%0A%09%09task_queue_t%20task_queue%20%3D%20server-%3Ewritedata_task_queues%5Bidx%5D%3B%0A%0A%09%09bzero(task_queue%2Csizeof(struct%20task_queue_st))%3B%0A%09%09task_queue-%3Epending_conn_queue%20%3D%20queue_new()%3B%0A%09%09pthread_mutex_init(%26task_queue-%3Etask_queue_mutex%2CNULL)%3B%0A%0A%09%09pthread_create(%26task_queue-%3Ethread_tid%2C%20NULL%2C%20writedata_task_thread%2C%20task_queue)%3B%0A%09%7D%0A%0A%09%2F*%20%E6%97%A0%E9%99%90%E5%BE%AA%E7%8E%AF%E7%9A%84%E7%9B%91%E5%90%AC%2C%E7%AD%89%E5%BE%85%E5%AE%A2%E6%88%B7%E7%AB%AF%E7%9A%84%E8%BF%9E%E6%8E%A5%E8%AF%B7%E6%B1%82%20*%2F%0A%09while%20(1)%20%7B%0A%09%09conn_fd%20%3D%20accept(server-%3Elisten_fd%2C%200%2C%200)%3B%0A%09%09if%20(conn_fd%20%3C%200)%20continue%3B%0A%0A%09%09%2F*%20%E5%92%8C%E4%B8%80%E4%B8%AA%E5%AE%A2%E6%88%B7%E7%AB%AF%E5%BB%BA%E7%AB%8B%
E4%BA%86%E8%BF%9E%E6%8E%A5%EF%BC%8C%E4%B8%8B%E9%9D%A2%E5%BC%80%E5%A7%8B%E6%8A%8A%E8%AF%A5%E8%BF%9E%E6%8E%A5%E5%88%86%E9%85%8D%E7%BB%99%E8%B4%9F%E8%B4%A3%E8%AF%BB%E5%8F%96%E8%AF%B7%E6%B1%82%E7%9A%84%E4%BB%BB%E5%8A%A1%E7%BA%BF%E7%A8%8B*%2F%0A%09%09assign_to_readreq_task_thread(server%2Cconn_fd)%3B%0A%09%7D%0A%0A%09return%200%3B%0A%7D%0A%0A%0A%2F*%E5%88%86%E9%85%8D%E4%BB%BB%E5%8A%A1%2C%E5%8E%9F%E5%88%99%E4%B8%BA%E5%93%AA%E4%B8%AA%E7%BA%BF%E7%A8%8B%E7%9A%84%E4%BB%BB%E5%8A%A1%E9%98%9F%E5%88%97%E6%9C%80%E5%B0%91%E5%B0%B1%E5%88%86%E9%85%8D%E7%BB%99%E5%93%AA%E4%B8%80%E4%B8%AA%E7%BA%BF%E7%A8%8B%E5%8E%BB%E5%A4%84%E7%90%86*%2F%0Astatic%20void%20assign_to_readreq_task_thread(server_t%20server%2C%20int%20conn_fd)%20%7B%0A%09int%20idx%3B%0A%09task_queue_t%20task_queue%2C%20min_task_queue%20%3D%20NULL%3B%0A%09conn_t%20conn%3B%0A%0A%09%2F*%20%E6%9F%A5%E6%89%BE%E8%BF%9E%E6%8E%A5%E6%95%B0%E6%9C%80%E5%B0%91%E7%9A%84%E4%BB%BB%E5%8A%A1%E9%98%9F%E5%88%97%20*%2F%0A%09for%20(idx%20%3D%200%3B%20idx%20%3C%205%3B%20idx%2B%2B)%20%7B%0A%09%09task_queue%20%3D%20server-%3Ereadreq_task_queues%5Bidx%5D%3B%0A%0A%09%09if%20(min_task_queue%20%3D%3D%20NULL)%0A%09%09%09min_task_queue%20%3D%20task_queue%3B%0A%09%09else%20if%20(min_task_queue-%3Etotal_conns%20%3E%20task_queue-%3Etotal_conns)%0A%09%09%09min_task_queue%20%3D%20task_queue%3B%0A%09%7D%0A%0A%09if%20(min_task_queue)%20%7B%0A%09%09conn%20%3D%20must_calloc(1%2C%20sizeof(struct%20conn_st))%3B%0A%0A%09%09conn-%3Eserver%20%3D%20server%3B%0A%09%09conn-%3Eclient_status%20%3D%20status_ESTABLISHED%3B%0A%09%09conn-%3Eserver_status%20%3D%20status_SERVER_CONNECTED%3B%0A%09%09conn-%3Eref_count%2B%2B%3B%2F*%E7%B4%AF%E5%8A%A0%E5%BC%95%E7%94%A8%E8%AE%A1%E6%95%B0*%2F%0A%0A%09%09%2F*%E6%B7%BB%E5%8A%A0%E5%88%B0%E8%AF%A5%E4%BB%BB%E5%8A%A1%E9%98%9F%E5%88%97%E4%B8%AD%E7%9A%84%E5%BE%85%E6%B7%BB%E5%8A%A0%E9%98%9F%E5%88%97%E5%B9%B6%E4%B8%94%E7%B4%AF%E5%8A%A0%E8%AF%A5%E4%BB%BB%E5%8A%A1%E9%98%9F%E5%88%97%E9%87%8C%E7%9A%84%E8%BF%9E%E6%8E%A5%E6%95%B0*%2F%0A%09%09pthread_mu
tex_lock(%26min_task_queue-%3Etask_queue_mutex)%3B%0A%09%09queue_push(min_task_queue-%3Epending_conn_queue%2C%20conn%2C%201)%3B%0A%09%09min_task_queue-%3Etotal_conns%2B%2B%3B%0A%09%09pthread_mutex_unlock(%26min_task_queue-%3Etask_queue_mutex)%3B%0A%09%7D%0A%7D%0A%0A%2F**%0A*%E8%B4%9F%E8%B4%A3%E8%AF%BB%E5%8F%96%E8%AF%B7%E6%B1%82%E7%9A%84%E4%BB%BB%E5%8A%A1%E7%BA%BF%E7%A8%8B%E5%A4%84%E7%90%86%E4%B8%BB%E5%87%BD%E6%95%B0%EF%BC%8C%E8%B4%9F%E8%B4%A3%E7%AE%A1%E7%90%86%EF%BC%8C%E5%A4%84%E7%90%86%E8%87%AA%E5%B7%B1%E9%98%9F%E5%88%97%E4%B8%AD%E7%9A%84%E6%89%80%E6%9C%89%E8%BF%9E%E6%8E%A5%0A*%2F%0Astatic%20void%20*readreq_task_thread(void%20*arg)%20%7B%0A%09int%20epfd%20%3D%20epoll_create(20480)%3B%0A%09struct%20epoll_event%20ev%2Chappened_ev%5B128%5D%3B%0A%09task_queue_t%20task_queue%20%3D%20(task_queue_t)%20arg%3B%0A%09int%20ready%2Ci%3B%0A%0A%09while%20(1)%20%7B%0A%09%09time_t%20curtime%20%3D%20time(NULL)%3B%0A%09%09%2F*%E4%B8%8B%E9%9D%A2%E4%B8%89%E4%B8%AA%E5%8F%98%E9%87%8F%E9%81%8D%E5%8E%86%E9%93%BE%E8%A1%A8%E6%97%B6%E4%BD%BF%E7%94%A8%EF%BC%8Cconn%E6%98%AF%E5%BD%93%E5%89%8D%E8%BF%9E%E6%8E%A5%EF%BC%8Cprev_conn%E6%98%AF%E5%85%88%E5%89%8D%E7%9A%84%E8%BF%9E%E6%8E%A5%EF%BC%8Cdiscarded_conn%E6%98%AF%E4%B8%A2%E5%BC%83%E7%9A%84%E8%BF%9E%E6%8E%A5*%2F%0A%09%09conn_t%20conn%20%3D%20NULL%2C%20prev_conn%20%3D%20NULL%2C%20discarded_conn%3B%0A%0A%09%09%2F*%E6%8A%8A%E5%BE%85%E6%B7%BB%E5%8A%A0%E9%98%9F%E5%88%97%E9%87%8C%E7%9A%84%E8%BF%9E%E6%8E%A5%E6%B7%BB%E5%8A%A0%E5%88%B0%E4%BB%BB%E5%8A%A1%E9%98%9F%E5%88%97%E4%B8%AD*%2F%0A%09%09pthread_mutex_lock(%26task_queue-%3Etask_queue_mutex)%3B%0A%09%09while%20((conn%20%3D%20queue_pop(task_queue-%3Epending_conn_queue)))%20%7B%0A%09%09%09if%20(!task_queue-%3Econn_head)%20%7B%0A%09%09%09%09task_queue-%3Econn_head%20%3D%20conn%3B%0A%09%09%09%7D%20else%20%7B%0A%09%09%09%09task_queue-%3Econn_tail-%3Ereadreq_task_queue_next%20%3D%20conn%3B%0A%09%09%09%7D%0A%09%09%09task_queue-%3Econn_tail%20%3D%20conn%3B%0A%09%09%7D%0A%09%09pthread_mutex_unlock(%26task_queue
-%3Etask_queue_mutex)%3B%0A%0A%09%09if%20(task_queue-%3Etotal_conns%20%3D%3D%200)%20%7B%0A%09%09%09thread_sleep(1)%3B%0A%09%09%09continue%3B%0A%09%09%7D%0A%0A%09%09conn%20%3D%20task_queue-%3Econn_head%3B%0A%09%09while%20(conn)%20%7B%0A%09%09%09%2F%2F%E6%B8%85%E7%90%86%E5%B7%B2%E7%BB%8F%E5%85%B3%E9%97%AD%E7%9A%84%E8%BF%9E%E6%8E%A5%2C%E6%8A%8A%E5%B7%B2%E7%BB%8F%E5%85%B3%E9%97%AD%E7%9A%84%E8%BF%9E%E6%8E%A5%E5%92%8C%E8%B6%85%E6%97%B6%E7%9A%84%E8%BF%9E%E6%8E%A5%E4%BB%8E%E9%93%BE%E8%A1%A8%E4%B8%AD%E5%88%A0%E9%99%A4%E6%8E%89%0A%09%09%09boolean%20conn_timeout%20%3D%20conn-%3Elast_activity%20%3E%200%20%26%26%20curtime%20-%20conn-%3Elast_activity%20%3E%20CLIENT_TIMEOUT%3B%0A%0A%09%09%09if%20((conn-%3Eclient_status%20%3D%3D%20status_CLOSED%20%7C%7C%20conn-%3Eserver_status%20%3D%3D%20status_SERVER_CLOSING%20%7C%7C%20conn_timeout)%20%26%26%20conn-%3Eref_count%20%3D%3D%201)%20%7B%0A%09%09%09%09task_queue-%3Etotal_conns--%3B%0A%0A%09%09%09%09discarded_conn%20%3D%20conn%3B%2F%2F%E5%85%88%E6%94%BE%E5%85%A5%E4%B8%B4%E6%97%B6%E5%8F%98%E9%87%8F%E4%B8%AD%0A%0A%09%09%09%09%2F%2F%E4%BB%8E%E9%93%BE%E8%A1%A8%E4%B8%AD%E5%88%A0%E9%99%A4%0A%09%09%09%09if%20(prev_conn)%20%7B%0A%09%09%09%09%09if%20(!conn-%3Ereadreq_task_queue_next)%20%7B%0A%09%09%09%09%09%09task_queue-%3Econn_tail%20%3D%20prev_conn%3B%0A%09%09%09%09%09%09prev_conn-%3Ereadreq_task_queue_next%20%3D%20NULL%3B%0A%09%09%09%09%09%7D%20else%20%7B%0A%09%09%09%09%09%09prev_conn-%3Ereadreq_task_queue_next%20%3D%20conn-%3Ereadreq_task_queue_next%3B%0A%09%09%09%09%09%7D%0A%09%09%09%09%7D%20else%20%7B%0A%09%09%09%09%09task_queue-%3Econn_head%20%3D%20conn-%3Ereadreq_task_queue_next%3B%0A%09%09%09%09%7D%0A%09%09%09%09conn%20%3D%20conn-%3Ereadreq_task_queue_next%3B%0A%0A%09%09%09%09close(discarded_conn-%3Efd)%3B%2F*%E7%9C%9F%E6%AD%A3%E5%85%B3%E9%97%AD%E8%BF%9E%E6%8E%A5%E7%9A%84%E5%9C%B0%E6%96%B9*%2F%0A%09%09%09%09free(discarded_conn)%3B%0A%09%09%09%7D%20else%20%7B%0A%09%09%09%09ev.data.fd%20%3D%20conn-%3Efd%3B%0A%09%09%09%09ev.data.ptr%20%3D%20
conn%3B%0A%09%09%09%09%2F*%E5%8F%AA%E8%B4%9F%E8%B4%A3%E7%9B%91%E8%A7%86%E8%AF%BB%E4%BA%8B%E4%BB%B6%E5%92%8C%E5%87%BA%E9%94%99%E4%BA%8B%E4%BB%B6*%2F%0A%09%09%09%09ev.events%20%3D%20EPOLLIN%20%7C%20EPOLLET%3B%0A%09%09%09%09if(conn-%3Eclient_status%20%3D%3D%20status_ESTABLISHED)%20%7B%0A%09%09%09%09%09%E8%AE%BE%E7%BD%AE%E8%B6%85%E6%97%B6%E6%97%B6%E9%97%B4%E6%88%96%E8%80%85%E8%AE%BE%E7%BD%AE%E9%9D%9E%E5%A0%B5%E5%A1%9E(conn-%3Efd)%3B%2F*%E4%B8%A4%E8%80%85%E5%BF%85%E9%A1%BB%E9%80%89%E4%B8%80%E4%B8%AA*%2F%0A%09%09%09%09%09epoll_ctl(epfd%2CEPOLL_CTL_ADD%2Cconn-%3Efd%2C%26ev)%3B%0A%09%09%09%09%09conn-%3Eclient_status%20%3D%20status_CONNECTED%3B%0A%09%09%09%09%7D%20else%20%7B%0A%09%09%09%09%09epoll_ctl(epfd%2CEPOLL_CTL_MOD%2Cconn-%3Efd%2C%26ev)%3B%0A%09%09%09%09%7D%0A%09%09%09%09prev_conn%20%3D%20conn%3B%0A%09%09%09%09conn%20%3D%20conn-%3Ereadreq_task_queue_next%3B%0A%09%09%09%7D%0A%09%09%7D%0A%0A%09%09%2F**%20%E5%A0%B5%E5%A1%9E%E7%AD%89%E5%BE%85%EF%BC%8C%E8%B6%85%E6%97%B6%E6%97%B6%E9%97%B4%E4%B8%BAPOLL_TIMEOUT%E6%AF%AB%E7%A7%92*%2F%0A%09%09if%20((ready%20%3D%20epoll_wait(epfd%2C%20happened_ev%2Csizeof(happened_ev)%20%2F%20sizeof(struct%20epoll_event)%2C%20POLL_TIMEOUT))%20%3C%200)%0A%09%09%09continue%3B%0A%0A%09%09%2F**%0A%09%09*%20%E5%BE%AA%E7%8E%AF%E6%9F%A5%E6%89%BE%E5%90%91%E6%88%91%E4%BB%AC%E5%8F%91%E9%80%81%E8%AF%B7%E6%B1%82%E7%9A%84%E5%AE%A2%E6%88%B7%E7%AB%AF%E7%9A%84%E8%BF%9E%E6%8E%A5%0A%09%09*%2F%0A%09%09for%20(i%20%3D%200%3B%20i%20%3C%20ready%3B%20i%2B%2B)%20%7B%0A%09%09%09if%20(happened_ev%5Bi%5D.events%20%26%20POLLIN)%20%7B%0A%09%09%09%09conn%20%3D%20(conn_t)%20happened_ev%5Bi%5D.data.ptr%3B%0A%0A%09%09%09%09conn-%3Elast_activity%20%3D%20time(NULL)%3B%2F*%20%E8%AE%B0%E5%BD%95%E5%AE%A2%E6%88%B7%E7%AB%AF%E7%9A%84%E6%9C%80%E5%90%8E%E6%B4%BB%E5%8A%A8%E6%97%B6%E9%97%B4*%2F%0A%0A%09%09%09%09proc_protocol(conn)%3B%2F*%E5%A4%84%E7%90%86%E5%8D%8F%E8%AE%AE%E5%87%BD%E6%95%B0*%2F%0A%09%09%09%7D%20else%20if(happened_ev%5Bi%5D.events%20%26%20EPOLLHUP)%20%7B%0A%09%09%09%09conn%2
0%3D%20(conn_t)%20happened_ev%5Bi%5D.data.ptr%3B%0A%0A%09%09%09%09conn-%3Eclient_status%20%3D%20status_CLOSED%3B%0A%09%09%09%7D%0A%09%09%7D%0A%09%7D%0A%7D%0A%0A%2F*%20%E5%8D%8F%E8%AE%AE%E5%A4%84%E7%90%86%E5%87%BD%E6%95%B0%20*%2F%0Avoid%20proc_protocol(conn_t%20conn)%20%7B%0A%09%E6%9F%90%E4%B8%80%E6%97%B6%E5%88%BB%E5%AE%A2%E6%88%B7%E7%AB%AF%E8%A6%81%E6%B1%82%E6%9C%8D%E5%8A%A1%E5%99%A8%E7%AB%AF%E4%B8%8D%E6%96%AD%E5%8F%91%E9%80%81%E6%95%B0%E6%8D%AE%E7%BB%99%E5%AE%A2%E6%88%B7%E7%AB%AF%EF%BC%8C%E7%84%B6%E5%90%8E%E8%AE%BE%E7%BD%AE%E4%BA%86conn-%3Eclient_status%20%3D%20status_START_RECV_DATA%2C%E5%B9%B6%E4%B8%94%E8%B0%83%E7%94%A8%E4%BA%86%E4%B8%8B%E9%9D%A2%E7%9A%84move_to_writedata_task_thread(conn)%3B%0A%7D%0A%0A%2F*%E5%88%86%E9%85%8D%E4%BB%BB%E5%8A%A1%2C%E5%8E%9F%E5%88%99%E4%B8%BA%E5%93%AA%E4%B8%AA%E7%BA%BF%E7%A8%8B%E7%9A%84%E4%BB%BB%E5%8A%A1%E9%98%9F%E5%88%97%E6%9C%80%E5%B0%91%E5%B0%B1%E5%88%86%E9%85%8D%E7%BB%99%E5%93%AA%E4%B8%80%E4%B8%AA%E7%BA%BF%E7%A8%8B%E5%8E%BB%E5%A4%84%E7%90%86*%2F%0Astatic%20void%20move_to_writedata_task_thread(conn_t%20conn)%20%7B%0A%09server_t%20server%20%3D%20conn-%3Eserver%3B%0A%09task_queue_t%20task_queue%2C%20min_task_queue%3D%20NULL%3B%0A%09int%20idx%3B%0A%0A%09%2F*%20%E6%9F%A5%E6%89%BE%E8%BF%9E%E6%8E%A5%E6%95%B0%E6%9C%80%E5%B0%91%E7%9A%84%E4%BB%BB%E5%8A%A1%E9%98%9F%E5%88%97%20*%2F%0A%09for%20(idx%20%3D%200%3B%20idx%20%3C%205%3B%20idx%2B%2B)%20%7B%0A%09%09task_queue%20%3D%20server-%3Ewritedata_task_queues%5Bidx%5D%3B%0A%0A%09%09if%20(min_task_queue%20%3D%3D%20NULL)%0A%09%09%09min_task_queue%20%3D%20task_queue%3B%0A%09%09else%20if%20(min_task_queue-%3Etotal_conns%20%3E%20task_queue-%3Etotal_conns)%0A%09%09%09min_task_queue%20%3D%20task_queue%3B%0A%09%7D%0A%0A%09if%20(min_task_queue)%20%7B%0A%09%09conn-%3Eref_count%2B%2B%3B%2F*%E7%B4%AF%E5%8A%A0%E5%BC%95%E7%94%A8%E8%AE%A1%E6%95%B0*%2F%0A%0A%09%09%2F*%E6%B7%BB%E5%8A%A0%E5%88%B0%E8%AF%A5%E4%BB%BB%E5%8A%A1%E9%98%9F%E5%88%97%E4%B8%AD%E7%9A%84%E5%BE%85%E6%B7%BB%E5%8A%A0%E9%98%9F%E5%88%97%E5%B9%B6%
E4%B8%94%E7%B4%AF%E5%8A%A0%E8%AF%A5%E4%BB%BB%E5%8A%A1%E9%98%9F%E5%88%97%E9%87%8C%E7%9A%84%E8%BF%9E%E6%8E%A5%E6%95%B0*%2F%0A%09%09pthread_mutex_lock(%26min_task_queue-%3Etask_queue_mutex)%3B%0A%09%09queue_push(min_task_queue-%3Epending_conn_queue%2C%20conn%2C%201)%3B%0A%09%09min_task_queue-%3Etotal_conns%2B%2B%3B%0A%09%09pthread_mutex_unlock(%26min_task_queue-%3Etask_queue_mutex)%3B%0A%09%7D%0A%7D%0A%0A%2F**%0A*%E8%B4%9F%E8%B4%A3%E5%86%99%E6%95%B0%E6%8D%AE%E7%9A%84%E7%BA%BF%E7%A8%8B%E5%A4%84%E7%90%86%E4%B8%BB%E5%87%BD%E6%95%B0%EF%BC%8C%E8%B4%9F%E8%B4%A3%E7%AE%A1%E7%90%86%EF%BC%8C%E5%A4%84%E7%90%86%E8%87%AA%E5%B7%B1%E9%98%9F%E5%88%97%E4%B8%AD%E7%9A%84%E6%89%80%E6%9C%89%E8%BF%9E%E6%8E%A5%0A*%2F%0Astatic%20void%20*writedata_task_thread(void%20*arg)%20%7B%0A%09task_queue_t%20task_queue%20%3D%20(task_queue_t)%20arg%3B%0A%09int%20epfd%20%3D%20epoll_create(20480)%3B%0A%09struct%20epoll_event%20ev%2Chappened_ev%5B128%5D%3B%0A%09int%20ready%2Ci%2Cnwrite%2Cnread%2Csndbuf_size%20%3D%2010240%3B%0A%09char%20buf%5Bsndbuf_size%5D%3B%0A%0A%09while%20(1)%20%7B%0A%09%09time_t%20curtime%20%3D%20time(NULL)%3B%0A%09%09%2F*%E4%B8%8B%E9%9D%A2%E4%B8%89%E4%B8%AA%E5%8F%98%E9%87%8F%E9%81%8D%E5%8E%86%E9%93%BE%E8%A1%A8%E6%97%B6%E4%BD%BF%E7%94%A8%EF%BC%8Cconn%E6%98%AF%E5%BD%93%E5%89%8D%E8%BF%9E%E6%8E%A5%EF%BC%8Cprev_conn%E6%98%AF%E5%85%88%E5%89%8D%E7%9A%84%E8%BF%9E%E6%8E%A5%EF%BC%8Cdiscarded_conn%E6%98%AF%E4%B8%A2%E5%BC%83%E7%9A%84%E8%BF%9E%E6%8E%A5*%2F%0A%09%09conn_t%20conn%20%3D%20NULL%2C%20prev_conn%20%3D%20NULL%2C%20discarded_conn%3B%0A%0A%09%09%2F*%E6%8A%8A%E5%BE%85%E6%B7%BB%E5%8A%A0%E9%98%9F%E5%88%97%E9%87%8C%E7%9A%84%E8%BF%9E%E6%8E%A5%E6%B7%BB%E5%8A%A0%E5%88%B0%E4%BB%BB%E5%8A%A1%E9%98%9F%E5%88%97%E4%B8%AD*%2F%0A%09%09pthread_mutex_lock(%26task_queue-%3Etask_queue_mutex)%3B%0A%09%09while%20((conn%20%3D%20queue_pop(task_queue-%3Epending_conn_queue)))%20%7B%0A%09%09%09if%20(!task_queue-%3Econn_head)%20%7B%0A%09%09%09%09task_queue-%3Econn_head%20%3D%20conn%3B%0A%09%09%09%7D%20else%20%7B%0A%09%
09%09%09task_queue-%3Econn_tail-%3Ewritedata_task_queue_next%20%3D%20conn%3B%0A%09%09%09%7D%0A%09%09%09task_queue-%3Econn_tail%20%3D%20conn%3B%0A%09%09%7D%0A%09%09pthread_mutex_unlock(%26task_queue-%3Etask_queue_mutex)%3B%0A%0A%09%09if%20(task_queue-%3Etotal_conns%20%3D%3D%200)%20%7B%0A%09%09%09thread_sleep(1)%3B%0A%09%09%09continue%3B%0A%09%09%7D%0A%0A%09%09conn%20%3D%20task_queue-%3Econn_head%3B%0A%09%09while%20(conn)%20%7B%0A%09%09%09%2F%2F%E6%B8%85%E7%90%86%E5%B7%B2%E7%BB%8F%E5%85%B3%E9%97%AD%E7%9A%84%E8%BF%9E%E6%8E%A5%2C%E6%8A%8A%E5%B7%B2%E7%BB%8F%E5%85%B3%E9%97%AD%E7%9A%84%E8%BF%9E%E6%8E%A5%E5%92%8C%E8%B6%85%E6%97%B6%E7%9A%84%E8%BF%9E%E6%8E%A5%E4%BB%8E%E9%93%BE%E8%A1%A8%E4%B8%AD%E5%88%A0%E9%99%A4%E6%8E%89%0A%09%09%09boolean%20conn_timeout%20%3D%20conn-%3Elast_activity%20%3E%200%20%26%26%20curtime%20-%20conn-%3Elast_activity%20%3E%20CLIENT_TIMEOUT%3B%0A%0A%09%09%09if%20(conn-%3Eclient_status%20%3D%3D%20status_CLOSED%20%7C%7C%20conn-%3Eserver_status%20%3D%3D%20status_SERVER_CLOSING%20%7C%7C%20conn_timeout)%20%7B%0A%09%09%09%09task_queue-%3Etotal_conns--%3B%0A%0A%09%09%09%09discarded_conn%20%3D%20conn%3B%2F%2F%E6%94%BE%E5%85%A5%E4%B8%B4%E6%97%B6%E5%8F%98%E9%87%8F%E4%B8%AD%0A%0A%09%09%09%09%2F%2F%E4%BB%8E%E9%93%BE%E8%A1%A8%E4%B8%AD%E5%88%A0%E9%99%A4%0A%09%09%09%09if%20(prev_conn)%20%7B%0A%09%09%09%09%09if%20(!conn-%3Ewritedata_task_queue_next)%20%7B%0A%09%09%09%09%09%09task_queue-%3Econn_tail%20%3D%20prev_conn%3B%0A%09%09%09%09%09%09prev_conn-%3Ewritedata_task_queue_next%20%3D%20NULL%3B%0A%09%09%09%09%09%7D%20else%20%7B%0A%09%09%09%09%09%09prev_conn-%3Ewritedata_task_queue_next%20%3D%20conn-%3Ewritedata_task_queue_next%3B%0A%09%09%09%09%09%7D%0A%09%09%09%09%7D%20else%20%7B%0A%09%09%09%09%09task_queue-%3Econn_head%20%3D%20conn-%3Ewritedata_task_queue_next%3B%0A%09%09%09%09%7D%0A%09%09%09%09conn%20%3D%20conn-%3Ewritedata_task_queue_next%3B%0A%0A%09%09%09%09discarded_conn-%3Eref_count--%3B%2F*%E5%BC%95%E7%94%A8%E8%AE%A1%E6%95%B0%E5%87%8F%E4%B8%80%EF%BC%8C%E5%B9%B6%E4
%B8%94%E8%AF%A5%E8%BF%9E%E6%8E%A5%E5%B7%B2%E7%BB%8F%E5%85%B3%E9%97%AD%E4%BA%86%EF%BC%8C%E9%82%A3%E4%B9%88%E7%9C%9F%E6%AD%A3%E5%85%B3%E9%97%AD%E8%BF%9E%E6%8E%A5%E7%9A%84%E4%BB%BB%E5%8A%A1%E5%B0%B1%E4%BA%A4%E7%BB%99readreq_task_thread%E5%87%BD%E6%95%B0%E4%BA%86*%2F%0A%09%09%09%7D%20else%20%7B%0A%09%09%09%09ev.data.fd%20%3D%20conn-%3Efd%3B%0A%09%09%09%09ev.data.ptr%20%3D%20conn%3B%0A%09%09%09%09%2F*%E5%8F%AA%E8%B4%9F%E8%B4%A3%E7%9B%91%E8%A7%86%E5%86%99%E4%BA%8B%E4%BB%B6%E5%92%8C%E5%87%BA%E9%94%99%E4%BA%8B%E4%BB%B6*%2F%0A%09%09%09%09ev.events%20%3D%20EPOLLOUT%20%7C%20EPOLLET%3B%0A%09%09%09%09if(conn-%3Eclient_status%20%3D%3D%20status_START_RECV_DATA)%20%7B%0A%09%09%09%09%09setsockopt(conn-%3Efd%2C%20SOL_SOCKET%2C%20SO_SNDBUF%2C%20(char%20*)%26sndbuf_size%2C%20sizeof(int))%3B%0A%09%09%09%09%09epoll_ctl(epfd%2CEPOLL_CTL_ADD%2Cconn-%3Efd%2C%26ev)%3B%0A%09%09%09%09%09conn-%3Eclient_status%20%3D%20status_CONNECTED%3B%0A%09%09%09%09%7D%20else%20%7B%0A%09%09%09%09%09epoll_ctl(epfd%2CEPOLL_CTL_MOD%2Cconn-%3Efd%2C%26ev)%3B%0A%09%09%09%09%7D%0A%09%09%09%09prev_conn%20%3D%20conn%3B%0A%09%09%09%09conn%20%3D%20conn-%3Ewritedata_task_queue_next%3B%0A%09%09%09%7D%0A%09%09%7D%0A%0A%09%09%2F**%E5%A0%B5%E5%A1%9E%E7%AD%89%E5%BE%85%EF%BC%8C%E8%B6%85%E6%97%B6%E6%97%B6%E9%97%B4%E4%B8%BAPOLL_TIMEOUT%E6%AF%AB%E7%A7%92*%2F%0A%09%09if%20((ready%20%3D%20epoll_wait(epfd%2C%20happened_ev%2Csizeof(happened_ev)%20%2F%20sizeof(struct%20epoll_event)%2C%20POLL_TIMEOUT))%20%3C%200)%0A%09%09%09continue%3B%0A%0A%09%09%2F**%0A%09%09*%20%E5%BE%AA%E7%8E%AF%E6%9F%A5%E6%89%BE%E5%90%91%E6%88%91%E4%BB%AC%E5%8F%91%E9%80%81%E8%AF%B7%E6%B1%82%E7%9A%84%E5%AE%A2%E6%88%B7%E7%AB%AF%E7%9A%84%E8%BF%9E%E6%8E%A5%0A%09%09*%2F%0A%09%09for%20(i%20%3D%200%3B%20i%20%3C%20ready%3B%20i%2B%2B)%20%7B%0A%09%09%09if%20(happened_ev%5Bi%5D.events%20%26%20EPOLLOUT)%20%7B%0A%09%09%09%09conn%20%3D%20(conn_t)%20happened_ev%5Bi%5D.data.ptr%3B%0A%0A%09%09%09%09conn-%3Elast_activity%20%3D%20time(NULL)%3B%2F*%20%E8%AE%B0%E5%BD%95%E5%AE%A2%E6%88
%B7%E7%AB%AF%E7%9A%84%E6%9C%80%E5%90%8E%E6%B4%BB%E5%8A%A8%E6%97%B6%E9%97%B4*%2F%0A%09%09%09%09if(conn-%3Eremain_data_size%20%3D%3D%200)%20%7B%2F*%E5%A6%82%E6%9E%9C%E6%95%B0%E6%8D%AE%E7%BC%93%E5%AD%98%E5%B7%B2%E7%BB%8F%E6%B2%A1%E6%9C%89%E4%BB%BB%E4%BD%95%E6%95%B0%E6%8D%AE%E4%BA%86%E7%9A%84%E8%AF%9D*%2F%0A%09%09%09%09%09if((nread%20%3D%20read(%E6%96%87%E4%BB%B6%E6%8F%8F%E8%BF%B0%E7%AC%A6%2Cbuf%2Csizeof(buf)))%20%3E%200)%20%7B%2F*%E4%BB%8E%E7%A1%AC%E7%9B%98%E8%AF%BB%E6%95%B0%E6%8D%AE%E5%87%BA%E6%9D%A5%E5%88%B0%E6%95%B0%E6%8D%AE%E7%BC%93%E5%AD%98*%2F%0A%09%09%09%09%09%09if((nwrite%20%3D%20write(conn-%3Efd%2Cbuf%2Cnread))%20%3C%200%20%26%26%20errno%20!%3D%20EINTR)%20%7B%0A%09%09%09%09%09%09%09conn-%3Eclient_status%20%3D%20status_CLOSED%3B%0A%09%09%09%09%09%09%09continue%3B%0A%09%09%09%09%09%09%7D%0A%0A%09%09%09%09%09%09if(nwrite%20%3C%20nread)%20%7B%0A%09%09%09%09%09%09%09conn-%3Eremain_data_size%20%3D%20nread%20-%20nwrite%3B%0A%09%09%09%09%09%09%09conn-%3Eremain_data%20%3D%20must_malloc(conn-%3Eremain_data_size)%3B%0A%09%09%09%09%09%09%09memcpy(conn-%3Eremain_data%2Cbuf%20%2B%20nwrite%2C%20conn-%3Eremain_data_size)%3B%0A%09%09%09%09%09%09%7D%0A%09%09%09%09%09%7D%20else%20if(nread%20%3D%3D%200)%20%7B%0A%09%09%09%09%09%09conn-%3Eserver_status%20%3D%20status_SERVER_CLOSING%3B%0A%09%09%09%09%09%7D%0A%09%09%09%09%7D%20else%20%7B%0A%09%09%09%09%09%2F*%E5%86%99%E6%95%B0%E6%8D%AE%E7%BC%93%E5%AD%98%E9%87%8C%E7%9A%84%E6%95%B0%E6%8D%AE%E5%88%B0%E5%AE%A2%E6%88%B7%E7%AB%AF*%2F%0A%09%09%09%09%09if((nwrite%20%3D%20write(conn-%3Efd%2Cconn-%3Eremain_data%2Cconn-%3Eremain_data_size))%20%3C%200%20%26%26%20errno%20!%3D%20EINTR)%20%7B%0A%09%09%09%09%09%09conn-%3Eclient_status%20%3D%20status_CLOSED%3B%0A%09%09%09%09%09%09free(conn-%3Eremain_data)%3B%0A%09%09%09%09%09%09continue%3B%0A%09%09%09%09%09%7D%0A%0A%09%09%09%09%09if(nwrite%20%3C%20conn-%3Eremain_data_size)%20%7B%0A%09%09%09%09%09%09conn-%3Eremain_data_size%20%3D%20conn-%3Eremain_data_size%20-%20nwrite%3B%0A%09%09%09%09%09%09memcpy(bu
f%2C%20conn-%3Eremain_data%20%2B%20nwrite%2C%20conn-%3Eremain_data_size)%3B%0A%09%09%09%09%09%09conn-%3Eremain_data%20%3D%20must_realloc(conn-%3Eremain_data%2C%20conn-%3Eremain_data_size)%3B%0A%09%09%09%09%09%09memcpy(conn-%3Eremain_data%2C%20buf%2C%20conn-%3Eremain_data_size)%3B%0A%09%09%09%09%09%7D%20else%20%7B%0A%09%09%09%09%09%09free(conn-%3Eremain_data)%3B%0A%09%09%09%09%09%09conn-%3Eremain_data%20%3D%20NULL%3B%0A%09%09%09%09%09%09conn-%3Eremain_data_size%20%3D%200%3B%0A%09%09%09%09%09%7D%0A%09%09%09%09%7D%0A%09%09%09%7D%20else%20if(happened_ev%5Bi%5D.events%20%26%20EPOLLHUP)%20%7B%0A%09%09%09%09conn%20%3D%20(conn_t)%20happened_ev%5Bi%5D.data.ptr%3B%0A%0A%09%09%09%09conn-%3Eclient_status%20%3D%20status_CLOSED%3B%0A%09%09%09%7D%0A%09%09%7D%0A%09%7D%0A%7D"> typedef struct server_st *server_t; typedef struct task_queue_st *task_queue_t; typedef struct conn_st *conn_t; /** 服务器结构 */ struct server_st { /** 监听地址 */ struct in_addr listen_addr; /** 监听端口 */ int port; /** 监听描述符 */ int listen_fd; /**负责读取请求的任务队列,一个线程负责处理一个任务队列,一共五个*/ queue_t readreq_task_queues[5]; /**负责写数据的任务队列,一个线程负责处理一个任务队列,一共五个*/ queue_t writedata_task_queues[5]; }; /** 任务队列结构 */ struct task_queue_st { /** 待添加的连接 */ queue_t pending_conn_queue; /** 互斥锁,用于多线程环境下对任务队列的访问控制 */ pthread_mutex_t task_queue_mutex; /** 链表结构,保存任务队列中的所有连接 */ conn_t conn_head; /** 链表的最后一个节点,用于方便在结尾插入新的节点 */ conn_t conn_tail; /** 总连接数*/ int total_conns; }; /** 用户的阶段状态 */ typedef enum { status_ESTABLISHED,/** 客户端刚建立连接 */ status_CONNECTED,/** 客户端正在连接 */ status_START_RECV_DATA,/** 客户端开始接收数据,即表示服务器端开始发送数据 */ status_CLOSED/** 客户端关闭连接 */ } client_status_t; /** 服务器的阶段状态 */ typedef enum { status_SERVER_CONNECTED,/** 与客户端建立连接 */ status_SERVER_CLOSING /** 服务器关闭连接 */ } server_status_t; /** 连接结构 */ struct conn_st { /** 该连接的所属服务器 */ server_t server; /** 该连接的引用计数 */ int ref_count; /** 用户最后一次活动的时间*/ time_t last_activity; /** 客户端当前的状态阶段*/ client_status_t client_status; /** 服务器当前的状态阶段*/ server_status_t server_status; /** socket描述符*/ int fd; /** 
数据缓冲,当服务器端需要不断的发送数据给客户端时被用到 */ char *remain_data; /** 数据缓冲剩余数据大小 */ int remain_data_size; /** 下一个连接 */ conn_t readreq_task_queue_next; /** 下一个连接 */ conn_t writedata_task_queue_next; }; /** 主函数 */ int main(int argc, char **argv) { int idx, conn_fd; server_t server; server = must_calloc(1, sizeof(struct server_st)); server->port = 端口号; server->listen_addrs.s_addr = INADDR_ANY; server->listen_fd = 服务器监听(&server->listen_addr,server->port); 设置成非堵塞(server->listen_fd); /* 创建5个负责读取请求的任务线程 */ for (idx = 0; idx < 5; idx++) { task_queue_t task_queue = server->readreq_task_queues[idx]; bzero(task_queue,sizeof(struct task_queue_st)); task_queue->pending_conn_queue = queue_new(); pthread_mutex_init(&task_queue->task_queue_mutex,NULL); pthread_create(&task_queue->thread_tid, NULL, readreq_task_thread, task_queue); } /* 创建5个负责写数据的任务线程 */ for (idx = 0; idx < 5; idx++) { task_queue_t task_queue = server->writedata_task_queues[idx]; bzero(task_queue,sizeof(struct task_queue_st)); task_queue->pending_conn_queue = queue_new(); pthread_mutex_init(&task_queue->task_queue_mutex,NULL); pthread_create(&task_queue->thread_tid, NULL, writedata_task_thread, task_queue); } /* 无限循环的监听,等待客户端的连接请求 */ while (1) { conn_fd = accept(server->listen_fd, 0, 0); if (conn_fd < 0) continue; /* 和一个客户端建立了连接,下面开始把该连接分配给负责读取请求的任务线程*/ assign_to_readreq_task_thread(server,conn_fd); } return 0; } /*分配任务,原则为哪个线程的任务队列最少就分配给哪一个线程去处理*/ static void assign_to_readreq_task_thread(server_t server, int conn_fd) { int idx; task_queue_t task_queue, min_task_queue = NULL; conn_t conn; /* 查找连接数最少的任务队列 */ for (idx = 0; idx < 5; idx++) { task_queue = server->readreq_task_queues[idx]; if (min_task_queue == NULL) min_task_queue = task_queue; else if (min_task_queue->total_conns > task_queue->total_conns) min_task_queue = task_queue; } if (min_task_queue) { conn = must_calloc(1, sizeof(struct conn_st)); conn->server = server; conn->client_status = status_ESTABLISHED; conn->server_status = status_SERVER_CONNECTED; 
conn->ref_count++;/*累加引用计数*/ /*添加到该任务队列中的待添加队列并且累加该任务队列里的连接数*/ pthread_mutex_lock(&min_task_queue->task_queue_mutex); queue_push(min_task_queue->pending_conn_queue, conn, 1); min_task_queue->total_conns++; pthread_mutex_unlock(&min_task_queue->task_queue_mutex); } } /** *负责读取请求的任务线程处理主函数,负责管理,处理自己队列中的所有连接 */ static void *readreq_task_thread(void *arg) { int epfd = epoll_create(20480); struct epoll_event ev,happened_ev[128]; task_queue_t task_queue = (task_queue_t) arg; int ready,i; while (1) { time_t curtime = time(NULL); /*下面三个变量遍历链表时使用,conn是当前连接,prev_conn是先前的连接,discarded_conn是丢弃的连接*/ conn_t conn = NULL, prev_conn = NULL, discarded_conn; /*把待添加队列里的连接添加到任务队列中*/ pthread_mutex_lock(&task_queue->task_queue_mutex); while ((conn = queue_pop(task_queue->pending_conn_queue))) { if (!task_queue->conn_head) { task_queue->conn_head = conn; } else { task_queue->conn_tail->readreq_task_queue_next = conn; } task_queue->conn_tail = conn; } pthread_mutex_unlock(&task_queue->task_queue_mutex); if (task_queue->total_conns == 0) { thread_sleep(1); continue; } conn = task_queue->conn_head; while (conn) { //清理已经关闭的连接,把已经关闭的连接和超时的连接从链表中删除掉 boolean conn_timeout = conn->last_activity > 0 && curtime - conn->last_activity > CLIENT_TIMEOUT; if ((conn->client_status == status_CLOSED || conn->server_status == status_SERVER_CLOSING || conn_timeout) && conn->ref_count == 1) { task_queue->total_conns--; discarded_conn = conn;//先放入临时变量中 //从链表中删除 if (prev_conn) { if (!conn->readreq_task_queue_next) { task_queue->conn_tail = prev_conn; prev_conn->readreq_task_queue_next = NULL; } else { prev_conn->readreq_task_queue_next = conn->readreq_task_queue_next; } } else { task_queue->conn_head = conn->readreq_task_queue_next; } conn = conn->readreq_task_queue_next; close(discarded_conn->fd);/*真正关闭连接的地方*/ free(discarded_conn); } else { ev.data.fd = conn->fd; ev.data.ptr = conn; /*只负责监视读事件和出错事件*/ ev.events = EPOLLIN | EPOLLET; if(conn->client_status == status_ESTABLISHED) { 
设置超时时间或者设置非堵塞(conn->fd);/*两者必须选一个*/ epoll_ctl(epfd,EPOLL_CTL_ADD,conn->fd,&ev); conn->client_status = status_CONNECTED; } else { epoll_ctl(epfd,EPOLL_CTL_MOD,conn->fd,&ev); } prev_conn = conn; conn = conn->readreq_task_queue_next; } } /** 堵塞等待,超时时间为POLL_TIMEOUT毫秒*/ if ((ready = epoll_wait(epfd, happened_ev,sizeof(happened_ev) / sizeof(struct epoll_event), POLL_TIMEOUT)) < 0) continue; /** * 循环查找向我们发送请求的客户端的连接 */ for (i = 0; i < ready; i++) { if (happened_ev[i].events & POLLIN) { conn = (conn_t) happened_ev[i].data.ptr; conn->last_activity = time(NULL);/* 记录客户端的最后活动时间*/ proc_protocol(conn);/*处理协议函数*/ } else if(happened_ev[i].events & EPOLLHUP) { conn = (conn_t) happened_ev[i].data.ptr; conn->client_status = status_CLOSED; } } } } /* 协议处理函数 */ void proc_protocol(conn_t conn) { 某一时刻客户端要求服务器端不断发送数据给客户端,然后设置了conn->client_status = status_START_RECV_DATA,并且调用了下面的move_to_writedata_task_thread(conn); } /*分配任务,原则为哪个线程的任务队列最少就分配给哪一个线程去处理*/ static void move_to_writedata_task_thread(conn_t conn) { server_t server = conn->server; task_queue_t task_queue, min_task_queue= NULL; int idx; /* 查找连接数最少的任务队列 */ for (idx = 0; idx < 5; idx++) { task_queue = server->writedata_task_queues[idx]; if (min_task_queue == NULL) min_task_queue = task_queue; else if (min_task_queue->total_conns > task_queue->total_conns) min_task_queue = task_queue; } if (min_task_queue) { conn->ref_count++;/*累加引用计数*/ /*添加到该任务队列中的待添加队列并且累加该任务队列里的连接数*/ pthread_mutex_lock(&min_task_queue->task_queue_mutex); queue_push(min_task_queue->pending_conn_queue, conn, 1); min_task_queue->total_conns++; pthread_mutex_unlock(&min_task_queue->task_queue_mutex); } } /** *负责写数据的线程处理主函数,负责管理,处理自己队列中的所有连接 */ static void *writedata_task_thread(void *arg) { task_queue_t task_queue = (task_queue_t) arg; int epfd = epoll_create(20480); struct epoll_event ev,happened_ev[128]; int ready,i,nwrite,nread,sndbuf_size = 10240; char buf[sndbuf_size]; while (1) { time_t curtime = time(NULL); 
        /* list-walk variables: conn is the current connection, prev_conn the previous one,
         * discarded_conn the one being dropped */
        conn_t conn = NULL, prev_conn = NULL, discarded_conn;

        /* move the connections waiting in the pending queue into the task list */
        pthread_mutex_lock(&task_queue->task_queue_mutex);
        while ((conn = queue_pop(task_queue->pending_conn_queue))) {
            if (!task_queue->conn_head) {
                task_queue->conn_head = conn;
            } else {
                task_queue->conn_tail->writedata_task_queue_next = conn;
            }
            task_queue->conn_tail = conn;
        }
        pthread_mutex_unlock(&task_queue->task_queue_mutex);

        if (task_queue->total_conns == 0) {
            thread_sleep(1);
            continue;
        }

        conn = task_queue->conn_head;
        while (conn) {
            /* clean up: unlink closed and timed-out connections from the list */
            boolean conn_timeout = conn->last_activity > 0
                && curtime - conn->last_activity > CLIENT_TIMEOUT;
            if (conn->client_status == status_CLOSED
                || conn->server_status == status_SERVER_CLOSING
                || conn_timeout) {
                task_queue->total_conns--;
                discarded_conn = conn; /* park it in a temporary */
                /* unlink it from the list */
                if (prev_conn) {
                    if (!conn->writedata_task_queue_next) {
                        task_queue->conn_tail = prev_conn;
                        prev_conn->writedata_task_queue_next = NULL;
                    } else {
                        prev_conn->writedata_task_queue_next = conn->writedata_task_queue_next;
                    }
                } else {
                    task_queue->conn_head = conn->writedata_task_queue_next;
                }
                conn = conn->writedata_task_queue_next;
                /* stop watching the fd so this thread gets no further events for it */
                epoll_ctl(epfd, EPOLL_CTL_DEL, discarded_conn->fd, &ev);
                /* drop our reference; the connection is only marked as closed here,
                 * the real close is left to readreq_task_thread */
                discarded_conn->ref_count--;
            } else {
                /* epoll_event.data is a union, so only the pointer is stored;
                 * the fd stays reachable through conn->fd */
                ev.data.ptr = conn;
                /* watch write events only; EPOLLERR and EPOLLHUP are always reported */
                ev.events = EPOLLOUT | EPOLLET;
                if (conn->client_status == status_START_RECV_DATA) {
                    setsockopt(conn->fd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf_size, sizeof(int));
                    epoll_ctl(epfd, EPOLL_CTL_ADD, conn->fd, &ev);
                    conn->client_status = status_CONNECTED;
                } else {
                    epoll_ctl(epfd, EPOLL_CTL_MOD, conn->fd, &ev);
                }
                prev_conn = conn;
                conn = conn->writedata_task_queue_next;
            }
        }

        /* block until an event arrives or POLL_TIMEOUT milliseconds elapse */
        if ((ready = epoll_wait(epfd, happened_ev,
                                sizeof(happened_ev) / sizeof(struct epoll_event),
                                POLL_TIMEOUT)) < 0)
            continue;

        /* find the connections that are ready to accept more data */
        for (i = 0; i < ready; i++) {
            if (happened_ev[i].events & EPOLLOUT) {
                conn =
                    (conn_t) happened_ev[i].data.ptr;
                conn->last_activity = time(NULL); /* record the client's last activity time */
                if (conn->remain_data_size == 0) { /* the data buffer is empty */
                    /* load data from disk into the buffer; file_fd is a placeholder for the
                     * descriptor of the file being sent */
                    if ((nread = read(file_fd, buf, sizeof(buf))) > 0) {
                        if ((nwrite = write(conn->fd, buf, nread)) < 0) {
                            if (errno != EINTR && errno != EAGAIN) {
                                conn->client_status = status_CLOSED;
                                continue;
                            }
                            nwrite = 0; /* nothing was sent, so buffer the whole chunk */
                        }
                        if (nwrite < nread) {
                            /* keep the unsent tail for the next EPOLLOUT */
                            conn->remain_data_size = nread - nwrite;
                            conn->remain_data = must_malloc(conn->remain_data_size);
                            memcpy(conn->remain_data, buf + nwrite, conn->remain_data_size);
                        }
                    } else if (nread == 0) {
                        conn->server_status = status_SERVER_CLOSING;
                    }
                } else {
                    /* send what is left in the data buffer to the client */
                    if ((nwrite = write(conn->fd, conn->remain_data, conn->remain_data_size)) < 0) {
                        if (errno != EINTR && errno != EAGAIN) {
                            conn->client_status = status_CLOSED;
                            free(conn->remain_data);
                            conn->remain_data = NULL;
                            conn->remain_data_size = 0;
                            continue;
                        }
                        nwrite = 0; /* nothing was sent, keep the buffer as it is */
                    }
                    if (nwrite < conn->remain_data_size) {
                        /* slide the unsent tail to the front, then shrink the buffer */
                        conn->remain_data_size = conn->remain_data_size - nwrite;
                        memmove(conn->remain_data, conn->remain_data + nwrite, conn->remain_data_size);
                        conn->remain_data = must_realloc(conn->remain_data, conn->remain_data_size);
                    } else {
                        free(conn->remain_data);
                        conn->remain_data = NULL;
                        conn->remain_data_size = 0;
                    }
                }
            } else if (happened_ev[i].events & (EPOLLHUP | EPOLLERR)) {
                conn = (conn_t) happened_ev[i].data.ptr;
                conn->client_status = status_CLOSED;
            }
        }
    }
}

This article comes from a blog; please credit the source when reposting: http://blog.csdn.net/andylin02/archive/2009/12/02/4924294.aspx
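The trickiest part of the write thread is key point 2: with a non-blocking socket, whatever write() does not accept has to be buffered and retried on the next writable event. Here is a minimal, stand-alone sketch of that bookkeeping. The names pending_t, set_nonblocking, write_some, pending_append, pending_flush and send_or_queue are made up for this illustration and are not part of the server code above; plain realloc stands in for must_realloc, and the sketch assumes that EINTR and EAGAIN should simply count as "zero bytes sent".

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical send buffer; plays the role of remain_data / remain_data_size. */
typedef struct {
    char  *data;  /* unsent bytes, or NULL when nothing is queued */
    size_t size;  /* number of unsent bytes */
} pending_t;

/* Put fd into non-blocking mode. Returns 0 on success, -1 on error. */
int set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0)
        return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0 ? -1 : 0;
}

/* Write whatever the kernel accepts right now.
 * Returns the byte count (0 if the buffer is full or the call was
 * interrupted), or -1 on a fatal error. */
ssize_t write_some(int fd, const char *buf, size_t len)
{
    ssize_t n = write(fd, buf, len);
    if (n < 0 && (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK))
        return 0;
    return n;
}

/* Append len bytes to the pending buffer. Returns 0, or -1 if out of memory. */
int pending_append(pending_t *p, const char *buf, size_t len)
{
    char *grown;
    if (len == 0)
        return 0;
    grown = realloc(p->data, p->size + len);
    if (!grown)
        return -1;
    memcpy(grown + p->size, buf, len);
    p->data = grown;
    p->size += len;
    return 0;
}

/* Try to send the pending bytes and keep only the unsent tail.
 * Returns 0 on success (even if nothing could be sent), -1 on a fatal error. */
int pending_flush(pending_t *p, int fd)
{
    ssize_t n;
    if (p->size == 0)
        return 0;
    n = write_some(fd, p->data, p->size);
    if (n < 0)
        return -1;
    assert((size_t)n <= p->size);
    p->size -= (size_t)n;
    if (p->size == 0) {
        free(p->data);
        p->data = NULL;
    } else if (n > 0) {
        memmove(p->data, p->data + n, p->size); /* regions overlap, so memmove */
    }
    return 0;
}

/* Send len bytes, queueing whatever cannot go out immediately.
 * If bytes are already queued, the new ones go behind them to keep the order. */
int send_or_queue(pending_t *p, int fd, const char *buf, size_t len)
{
    ssize_t n = 0;
    if (p->size == 0) {
        n = write_some(fd, buf, len);
        if (n < 0)
            return -1;
    }
    return pending_append(p, buf + n, len - (size_t)n);
}
```

In an event loop, send_or_queue would be called when new data has been loaded from disk, and pending_flush on every EPOLLOUT for a connection that still has queued bytes. Using memmove on the existing buffer avoids the extra round trip through a temporary buffer.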