异步IO完整例子-可改写用于服务器

    技术2022-05-18  11

    因需要将服务器读取文件的同步IO改写为异步IO,因此在自己机子上编写了这个根据文件列表随机、不间断读取文件块的程序

    环境:CentOS4.8 512M内存,2个1.8 cpu

    编译器:g++4.5

    库:librt

    文件列表格式:文件绝对路径+文件名 空格 文件总长度

    提示:最好选取大一点的文件(至少要大于一个读取块,即256KB)

     

    /root/work/data/[www.dy2018.com]jixieshi.rmvb 371324360

    /root/work/data/oracle9i_system01.dbf 398467072

     

    编译: g++4.5 chlaws_aio.c -lrt

     

    程序如下:

    程序还可以大幅度优化,这里只是简单实现下,有问题可以留言,转载请注明文章出处.

     

    // Must come before every system header: makes off_t 64 bits wide on 32-bit
    // Linux so that offsets into files larger than 2 GiB do not overflow.
    #ifndef _FILE_OFFSET_BITS
    #define _FILE_OFFSET_BITS 64
    #endif

    #include <iostream>
    #include <map>
    #include <iterator>
    #include <list>
    #include <string>
    #include <ctime>
    #include <cstdlib>
    #include <fstream>
    #include <errno.h>
    #include <aio.h>
    #include <fcntl.h>      // open(), O_RDONLY
    #include <unistd.h>
    #include <sys/types.h>
    #include <cstring>
    #include <strings.h>
    #include <stdlib.h>

    using namespace std;

    // Bookkeeping for one outstanding asynchronous read.
    struct AIO_INFO
    {
        int reqblock;            // index of the requested block inside the file
        long long filesize;      // total size of the file in bytes
        string filename;         // path the request reads from
        struct aiocb *my_aiocb;  // control block (an element of aiocb_list);
                                 // its aio_buf is malloc'ed and must be free'd
    };

    // Control blocks for the requests of one round. A control block must stay
    // at a fixed address while its request is in flight, hence a global array.
    struct aiocb aiocb_list[100];

    // Size in bytes of every read request.
    int block = 1024*256;

    typedef map<string, long long> FileSizeMap;  // file name -> size in bytes
    typedef map<string, int> FdMap;              // file name -> open descriptor
    typedef list<AIO_INFO> RequestList;

    // Number of requests submitted per round (one per control block).
    static const int kRequestsPerRound =
        static_cast<int>(sizeof(aiocb_list) / sizeof(aiocb_list[0]));

    // Reads "<path> <size>" lines from `path` into `files`. Malformed lines and
    // files smaller than one block are skipped: such a file has no complete
    // block to request and would make the block selection divide by zero.
    static void load_file_list(const char *path, FileSizeMap &files)
    {
        ifstream infile(path, ios::in);
        string line;
        while (getline(infile, line)) {
            // getline() already removes '\n'; a CRLF file still leaves '\r'.
            while (!line.empty() &&
                   (line[line.size() - 1] == '\r' || line[line.size() - 1] == '\n')) {
                line.erase(line.size() - 1);
            }
            const size_t pos = line.find(' ');
            if (pos == string::npos || pos == 0) {
                continue;
            }
            const string file = line.substr(0, pos);
            // strtoll instead of atoi: sizes may exceed the range of int.
            const long long size = strtoll(line.c_str() + pos + 1, NULL, 10);
            if (size < block) {
                cout << "skip file smaller than one block:" << file << endl;
                continue;
            }
            files.insert(make_pair(file, size));
        }
    }

    // Releases the data buffer attached to a control block.
    static void free_buffer(struct aiocb *cb)
    {
        free(const_cast<void *>(cb->aio_buf));
        cb->aio_buf = NULL;
    }

    // Cancels every pending request and waits until none is in flight before
    // releasing its buffer. Freeing a buffer (or closing the descriptor) while
    // the request is still running would let the AIO implementation write into
    // freed memory. May block until the underlying I/O finishes.
    static void abandon_requests(RequestList &pending)
    {
        for (RequestList::iterator it = pending.begin(); it != pending.end(); ++it) {
            struct aiocb *cb = it->my_aiocb;
            aio_cancel(cb->aio_fildes, cb);
            const struct aiocb *const wait_on[1] = { cb };
            while (aio_error(cb) == EINPROGRESS) {
                aio_suspend(wait_on, 1, NULL);  // may return early on EINTR; loop re-checks
            }
            if (aio_error(cb) != -1) {
                aio_return(cb);  // retire the request so its resources are released
            }
            free_buffer(cb);
        }
        pending.clear();
    }

    // Closes every descriptor in `fds` and empties the map.
    static void close_files(FdMap &fds)
    {
        for (FdMap::iterator it = fds.begin(); it != fds.end(); ++it) {
            cout << "close fd:" << it->second << endl;
            close(it->second);
        }
        fds.clear();
    }

    // Submits up to kRequestsPerRound reads of randomly chosen blocks of randomly
    // chosen files. Descriptors are opened on demand and recorded in `fds`, which
    // stays their only owner (they are closed exactly once, by close_files()).
    // Successfully submitted requests are added to `pending`.
    // Returns false if a file cannot be opened.
    static bool submit_requests(const FileSizeMap &files, FdMap &fds, RequestList &pending)
    {
        for (int ix = 0; ix < kRequestsPerRound; ++ix) {
            FileSizeMap::const_iterator file_iter = files.begin();
            advance(file_iter, rand() % files.size());
            const string &reqfile = file_iter->first;

            int fd = -1;
            FdMap::iterator fd_iter = fds.find(reqfile);
            if (fd_iter != fds.end()) {
                fd = fd_iter->second;
            } else {
                fd = open(reqfile.c_str(), O_RDONLY);
                if (fd == -1) {
                    cout << "open error:" << strerror(errno) << endl;
                    return false;
                }
                fds.insert(make_pair(reqfile, fd));
            }

            // load_file_list() guarantees at least one complete block per file.
            const long long block_count = file_iter->second / block;
            const int blk = static_cast<int>(rand() % block_count);

            struct aiocb *cb = &aiocb_list[ix];
            memset(cb, 0, sizeof(*cb));
            cb->aio_buf = malloc(block);
            if (cb->aio_buf == NULL) {
                cout << "malloc error " << "reqfile:" << reqfile << " block:" << blk << endl;
                continue;
            }
            cb->aio_fildes = fd;
            cb->aio_nbytes = block;
            cb->aio_offset = static_cast<off_t>(blk) * block;  // widen before multiplying

            if (aio_read(cb) == -1) {
                cout << "aio_read error " << "reqfile:" << reqfile << " block:" << blk
                     << " " << strerror(errno) << endl;
                free_buffer(cb);  // nothing was queued, so the buffer is ours again
                continue;
            }
            cout << "req file:" << reqfile << "\tblock:" << blk << endl;

            AIO_INFO aio_info;
            aio_info.reqblock = blk;
            aio_info.filesize = file_iter->second;
            aio_info.filename = reqfile;
            aio_info.my_aiocb = cb;
            pending.push_front(aio_info);
        }
        return true;
    }

    // Polls `pending` until every request has completed. A progress line is
    // printed at most once every 3 seconds; on the fifth such report the
    // remaining requests are cancelled and drained. `open_files` is only used
    // for the progress line.
    static void wait_for_requests(RequestList &pending, size_t open_files)
    {
        time_t wait_last = 0;
        int reports = 0;
        while (!pending.empty()) {
            const time_t wait_now = time(NULL);
            if (wait_now - wait_last > 3) {
                cout << "[in while] waiting,aio_list size:" << pending.size()
                     << " fd_map size:" << open_files << endl;
                if (reports++ > 3) {
                    cout << "timeout exit while loop \n";
                    abandon_requests(pending);
                    return;
                }
                wait_last = wait_now;
            }

            for (RequestList::iterator it = pending.begin(); it != pending.end(); ) {
                const int err = aio_error(it->my_aiocb);
                if (err == EINPROGRESS) {
                    ++it;
                    continue;
                }
                if (err == -1) {
                    // aio_error() itself failed; there is no result to collect.
                    cout << "aio_error " << strerror(errno) << endl;
                } else {
                    // Finished, successfully or not: aio_return() must be called
                    // exactly once to retire the request.
                    const ssize_t got = aio_return(it->my_aiocb);
                    if (err == 0 && got == block) {
                        cout << "aio_return ok, this req:" << it->filename
                             << "\treq block:" << it->reqblock << endl;
                    } else if (err == 0) {
                        cout << "aio_return error,ret :" << got << endl;
                    } else {
                        cout << "aio_return error,ret :" << got
                             << " " << strerror(err) << endl;
                    }
                }
                free_buffer(it->my_aiocb);
                pending.erase(it++);
            }

            if (!pending.empty()) {
                usleep(1000);  // do not spin at 100% CPU while requests are in flight
            }
        }
        cout << "[in while] the aio_list size : 0,exit while loop\n";
    }

    // Stress test: repeatedly issues rounds of asynchronous random block reads
    // against the files listed in /root/work/etc/file_list.txt. A status line is
    // printed whenever more than 120 seconds have passed since the previous one,
    // and the program stops at the fifth status line.
    // Returns 0 on normal termination, non-zero if the list yields no usable
    // file or a file cannot be opened.
    int main()
    {
        FileSizeMap file_list;
        load_file_list("/root/work/etc/file_list.txt", file_list);
        if (file_list.empty()) {
            cout << "no file" << endl;
            return 1;
        }
        cout << file_list.begin()->first << "\t";
        cout << file_list.begin()->second << "\n";

        FdMap fd_map;          // file name -> descriptor, valid for one round
        RequestList aio_list;  // requests still in flight in the current round

        srand(time(NULL));
        time_t last_time = 0;
        int report_count = 0;  // kept separate from the wait-loop counter
        while (true) {
            const time_t now = time(NULL);
            if (now - last_time > 120) {
                cout << "aio_list size:" << aio_list.size() << endl;
                if (++report_count >= 5) {
                    break;
                }
                last_time = now;
            }

            if (!submit_requests(file_list, fd_map, aio_list)) {
                // Drain what was already queued before giving up.
                abandon_requests(aio_list);
                close_files(fd_map);
                return EXIT_FAILURE;
            }

            wait_for_requests(aio_list, fd_map.size());

            // Every request is finished or drained here, so closing is safe.
            close_files(fd_map);
            usleep(2*1000);
        }
        return 0;
    }


    最新回复(0)