A High-Performance Socket Communication Server (Completion Port Model: IOCP)

    Technology  2024-07-24

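Many people have searched hard for a solid IOCP example without finding one, and some of the examples in circulation are misleading. I am publishing the one I wrote in the hope that it helps those still looking. It can serve as study material for beginners or as the communication module of a large server program. Throughput was the main design goal, and the hot path has been optimized as far as I could take it. Once you understand the core of this example and pair it with an efficient communication protocol, you can use it to build a high-performance communication server.

Before the code, a few words about IOCP. I will not describe the IOCP functions themselves; that material is widely available online and all much the same. Here I only cover the technical points that need attention.

I. How to manage memory

1. I/O data buffer management

Dynamic allocation is flexible, but allocating and releasing memory for every I/O operation wastes a great deal of system resources. I therefore preallocate the maximum amount of memory the server will need and manage it with a linked list. Getting or returning a block never requires traversal, only mutual exclusion.

Better still, the per-operation I/O information and the data block live in one structure, which reduces the list bookkeeping.

    // I/O operation flag
    TIOFlag = (IO_ACCEPT, IO_READ, IO_WRITE);

    // Per-operation I/O information
    PIOInfo = ^TIOInfo;
    TIOInfo = packed record
      Overlapped: TOverlapped;   // overlapped structure
      DataBuf: TWSABUF;          // I/O data descriptor
      Socket: TSocket;
      Flag: TIOFlag;
      TickCountSend: DWord;
      Next: PIOInfo;
      Prior: PIOInfo;
    end;

    PUNode = ^TUNode;
    TUNode = record
      Next: Pointer;
    end;

    PIOMem = ^TIOMem;
    TIOMem = packed record
      IOInfo: TIOInfo;
      Data: array[1..IO_MEM_SIZE] of Byte;
      // When a block is requested, the address of Data is what gets returned
    end;

2. Link data management

Links are kept in a doubly linked list, so removing a node does not require traversing the list.

    // Per-connection information
    PLink = ^TLink;
    TLink = record
      Socket: TSocket;
      RemoteIP: string[30];
      RemotePort: DWord;
      // System tick count when data was last received
      TickCountActive: DWord;
      // Information about the thread currently handling this connection
      Worker: PWorker;
      // The application layer can set this member, so that OnReceive does not
      // have to search for the data area belonging to the connection each time
      Data: Pointer;
      Section: TRTLCriticalSection;
      Next: PLink;
      Prior: PLink;
    end;

II. How to manage threads

When each worker thread is created, OnWorkerThreadCreateEvt is called. The function can return information belonging to that thread, such as a database connection component or an object created for it. Inside OnReceive this value is reachable through the link's worker, as Link^.Worker^.Data.

    // Worker thread information
    PWorker = ^TWorker;
    TWorker = record
      ID: THandle;
      CompletionPort: THandle;
      Data: Pointer;  // value returned by OnWorkerThreadCreateEvt
      // Statistics that reflect how busy the thread is
      TickCountLong,
      TickCountActive: DWord;
      ExecCount: Integer;
      // Event that is set when the thread finishes
      Finished: THandle;
      Next: PWorker;
    end;

The service thread works the same way; see the source for details.

Thread synchronization is a perennial headache. This example keeps mutual exclusion to a minimum and guards against deadlock. With TRTLCriticalSection, a small slip is enough to cause a deadlock; a difference of two lines of code can be fatal in a multithreaded program. In this example the data that needs synchronization is the link list. The service thread has to work out which connections have idled past the timeout, the worker threads have to handle disconnects, and when the application layer sends data on its own initiative it needs exclusive access to the link. Otherwise one thread may be sending while another is handling a disconnect on the same link, and the resulting conflict is disastrous.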
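The locking discipline described above can be sketched in isolation. In the listing, every path that needs both locks takes the list lock (LinkSec) before the per-link lock (Section) and re-checks the socket under the lock before closing it. The sketch below is a minimal illustration of that ordering, not the author's code: `TConn`, `ListLock`, `CloseConn` and the integer `Handle` are hypothetical stand-ins, and it assumes a Windows target (Delphi, or Free Pascal in Delphi mode).

```pascal
program LockOrderSketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  Windows;

type
  // Hypothetical stand-in for TLink: one handle plus its own lock
  PConn = ^TConn;
  TConn = record
    Handle: Integer;             // stand-in for the socket; -1 means closed
    Lock: TRTLCriticalSection;   // per-connection lock (Section in the article)
  end;

var
  // Guards the connection list as a whole (LinkSec in the article)
  ListLock: TRTLCriticalSection;

// Rule: always take ListLock first, then Conn^.Lock, and release in
// reverse order. Two threads that follow the same order cannot end up
// each holding the lock the other one is waiting for.
procedure CloseConn(Conn: PConn);
begin
  EnterCriticalSection(ListLock);
  try
    EnterCriticalSection(Conn^.Lock);
    try
      // Re-check under the lock: another thread may have closed it already
      if Conn^.Handle <> -1 then
      begin
        Conn^.Handle := -1;
        // a real server would close the socket and unlink the node here
      end;
    finally
      LeaveCriticalSection(Conn^.Lock);
    end;
  finally
    LeaveCriticalSection(ListLock);
  end;
end;

var
  C: TConn;
begin
  InitializeCriticalSection(ListLock);
  InitializeCriticalSection(C.Lock);
  C.Handle := 42;
  CloseConn(@C);   // closes the connection
  CloseConn(@C);   // finds it already closed and does nothing
  WriteLn('handle after close: ', C.Handle);
  DeleteCriticalSection(C.Lock);
  DeleteCriticalSection(ListLock);
end.
```

The try/finally pairs are an addition of this sketch; they guarantee that a lock is released even if the guarded code raises an exception.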
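In my stress tests this problem no longer occurred. The application layer needs no synchronization of its own and can simply send and receive data. Each thread also supports its own database connection.

III. How many worker threads to create

Many articles say that with N CPUs you should create N threads; others say N*2+2. I dislike claims made without measurement. The listing creates dwNumberOfProcessors * 2 + 2 workers and exposes per-thread statistics through GetWorkerExecInfo, so newcomers to IOCP can observe the behavior for themselves and gain a deeper understanding.

Test results from the example: (not preserved in this copy)

IV. Whether to use classes

Some say a server should abandon classes altogether because they cost too much. In my view the cost of classes comes mainly from creating instances dynamically. Accessing a class member is the same as accessing a record field: both are accesses at a relative offset. If the instances are created in advance, there is little difference between the two. This example uses plain functions, but the application layer can of course use classes; it is hard to imagine how much extra work would be needed without them.

V. Limitations

Large packets cannot be sent: a packet must fit within one fixed-size buffer (IO_MEM_SIZE, 2048 bytes in the listing). For small packets it performs very well.

For lack of time I cannot explain more or comment the code more heavily. Anyone who needs the example source can contact me; it is provided free of charge. QQ: 48092788

Example source: http://d.download.csdn.net/down/1546336/guestcode

Source of the completion port communication module:

{******************************************************************************
*                 UCode series components and controls                        *
*              Author: 卢益贵 (Lu Yigui)        2003~2009                     *
*   All rights reserved. The right to pursue legal liability for any          *
*   unauthorized use or sale is reserved.                                     *
*                                                                             *
*   The UCode series evolved from the XCtrls-YCtrls-ICtrls-NCode series       *
*           QQ:48092788        luyigui.blog.gxsky.com                         *
******************************************************************************}
{******************************************************************************
                 Socket server based on the completion port model
******************************************************************************}
unit UTcpServer;

interface

uses
  Windows, Classes, UClasses, UWinSock2;

const
  // Size of each I/O buffer, in bytes
  IO_MEM_SIZE                            = 2048;
  // Number of I/O buffers; there must be enough of them, adjust as needed
  IO_MEM_MAX_COUNT                       = 1000 * 10;
  // Maximum number of connections
  SOCK_MAX_COUNT                         = 3000;
  // Idle limit in seconds; a connection that receives no client data
  // for longer than this is closed
  SOCK_IDLE_OVERTIME                     = 60;

type
  // Worker thread information
  PWorker = ^TWorker;
  TWorker = record
    ID: THandle;
    CompletionPort: THandle;
    Data: Pointer;
    // Statistics that reflect how busy the thread is
    TickCountLong,
    TickCountActive: DWord;
    ExecCount: Integer;
    // Event that is set when the thread finishes
    Finished: THandle;
    Next: PWorker;
  end;

  // Per-connection information
  PLink = ^TLink;
  TLink = record
    Socket: TSocket;
    RemoteIP: string[30];
    RemotePort: DWord;
    // System tick count when data was last received
    TickCountActive: DWord;
    // Information about the thread currently handling this connection
    Worker: PWorker;
    Data: Pointer;
    Section: TRTLCriticalSection;
    Next: PLink;
    Prior: PLink;
  end;

  TOnLinkIdleOvertimeEvt = procedure(Link: PLink);
  TOnDisconnectEvt = procedure(Link: PLink);
  TOnReceiveEvt = function(Link: PLink; Buf: PByte; Len: Integer): Boolean;
  TOnThreadCreateEvt = function(IsWorkerThread: Boolean): Pointer;

// Usage of the link pool, in percent
function GetLinkUse(): real;
// Memory occupied by the link pool
function GetLinkSize(): Integer;
// Number of links currently in use
function GetLinkCount(): Integer;
// Number of free links
function GetLinkFree(): Integer;
// Usage of the I/O buffer pool, in percent
function GetIOMemUse(): Real;
// Memory occupied by the I/O buffer pool
function GetIOMemSize(): Integer;
// Number of free I/O buffers
function GetIOMemFree(): Integer;
// Return an I/O buffer to the pool
procedure FreeIOMem(Mem: Pointer);
// Take an I/O buffer from the pool
function GetIOMem(): Pointer;
// Get the workload statistics of a worker thread
function GetWorkerExecInfo(Index: Integer; var TickCount: DWord): Integer;
// Get the ID of a worker thread
function GetWorkerID(Index: Integer): Integer;
// Get the number of worker threads
function GetWorkerCount(): Integer;
// Open an IP address and port and start listening
function StartTcpServer(RemoteIP: String; RemotePort: DWord): Boolean;
// Stop listening and close the IP address and port
function StopTcpServer(): Boolean;
// Set the event callbacks; call this before StartTcpServer
procedure SetEventProc(OnReceive: TOnReceiveEvt;
                       OnDisconnect: TOnDisconnectEvt;
                       OnLinkIdleOvertime: TOnLinkIdleOvertimeEvt;
                       OnServerThreadCreate: TOnThreadCreateEvt;
                       OnWorkerThreadCreate: TOnThreadCreateEvt);
// Write to the log file
procedure WriteLog(Log: String);
// Post a receive operation
function PostRecv(Link: PLink; IOMem: Pointer): Boolean;
// Post a send operation
function PostSend(Link: PLink; IOMem: Pointer; Len: Integer): Boolean;
// Broadcast data to the peer of every link
procedure PostBroadcast(Buf: PByte; Len: Integer);
// Whether the server is currently open
function IsTcpServerActive(): Boolean;
// Working time of the service thread in ms, accumulated since the previous call
function GetServerExecLong(): DWord;
// Number of times the service thread has run
function GetServerExecCount(): Integer;
// Get the local or the external (Internet) IP address
function GetLocalIP(IsIntnetIP: Boolean): String;

implementation

uses
  IniFiles, SysUtils, ActiveX;

var
  ExePath: String = '';

const
  HEAP_NO_SERIALIZE          = 1;  {heap access is not serialized; the caller must ensure no other thread uses the heap at the same time}
  HEAP_GENERATE_EXCEPTIONS   = 4;  {raise an exception on failure instead of returning nil}
  HEAP_ZERO_MEMORY           = 8;  {initialize the allocated memory to 0}
  HEAP_REALLOC_IN_PLACE_ONLY = 16; {reallocation must not move the block}
  STATUS_ACCESS_VIOLATION    = DWORD($C0000005); {access violation}
  STATUS_NO_MEMORY           = DWORD($C0000017); {out of memory}

{===============================================================================
                              I/O memory management
================================================================================}
type
  // I/O operation flag
  TIOFlag = (IO_ACCEPT, IO_READ, IO_WRITE);

  // Per-operation I/O information
  PIOInfo = ^TIOInfo;
  TIOInfo = packed record
    Overlapped: TOverlapped;   // overlapped structure
    DataBuf: TWSABUF;          // I/O data descriptor
    Socket: TSocket;
    Flag: TIOFlag;
    TickCountSend: DWord;
    Next: PIOInfo;
    Prior: PIOInfo;
  end;

  // While a block is free, its first pointer-sized field doubles as the list link
  PUNode = ^TUNode;
  TUNode = record
    Next: Pointer;
  end;

  PIOMem = ^TIOMem;
  TIOMem = packed record
    IOInfo: TIOInfo;
    Data: array[1..IO_MEM_SIZE] of Byte;
  end;

var
  IOMemHead: PIOMem = nil;
  IOMemLast: PIOMem = nil;
  IOMemUse: Integer = 0;
  IOMemSec: TRTLCriticalSection;
  IOMemList: array[1..IO_MEM_MAX_COUNT] of Pointer;

function GetIOMem(): Pointer;
begin
  // The pool must be large enough; if it runs out, nothing can save us,
  // not even dynamic allocation
  EnterCriticalSection(IOMemSec);
  try
    try
      if IOMemHead = nil then
      begin
        // Pool exhausted
        Result := nil;
        WriteLog('GetIOMem: error');
      end else
      begin
        Result := @(IOMemHead^.Data);
        IOMemHead := PUNode(IOMemHead)^.Next;
        IOMemUse := IOMemUse + 1;
      end;
    except
      Result := nil;
      WriteLog('GetIOMem: error');
    end;
  finally
    LeaveCriticalSection(IOMemSec);
  end;
end;

procedure FreeIOMem(Mem: Pointer);
begin
  EnterCriticalSection(IOMemSec);
  try
    try
      // Step back from the data area to the start of the block (32-bit pointers)
      Mem := Pointer(Integer(Mem) - sizeof(TIOInfo));
      PUNode(Mem).Next := nil;
      // Append to the free list; if the pool was fully drained,
      // this block becomes the new head
      if IOMemHead = nil then
        IOMemHead := Mem
      else
        PUNode(IOMemLast)^.Next := Mem;
      IOMemLast := Mem;
      IOMemUse := IOMemUse - 1;
    except
      WriteLog('FreeIOMem: error');
    end;
  finally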
    LeaveCriticalSection(IOMemSec);
  end;
end;

// Allocate every I/O buffer up front and chain them into the free list
procedure IniIOMem();
var
  i: Integer;
  Heap: THandle;
begin
  InitializeCriticalSection(IOMemSec);
  IOMemHead := HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(TIOMem));
  IOMemLast := IOMemHead;
  IOMemList[1] := IOMemHead;
  Heap := GetProcessHeap();
  for i := 2 to IO_MEM_MAX_COUNT do
  begin
    PUNode(IOMemLast)^.Next := HeapAlloc(Heap, HEAP_ZERO_MEMORY, sizeof(TIOMem));
    IOMemList[i] := PUNode(IOMemLast)^.Next;
    IOMemLast := PUNode(IOMemLast)^.Next;
  end;
  PUNode(IOMemLast).Next := nil;
end;

function GetIOMemFree(): Integer;
var
  IOMems: PUNode;
begin
  EnterCriticalSection(IOMemSec);
  Result := 0;
  IOMems := PUNode(IOMemHead);
  while IOMems <> nil do
  begin
    Result := Result + 1;
    IOMems := IOMems^.Next;
  end;
  LeaveCriticalSection(IOMemSec);
end;

procedure DeleteIOMem();
var
  i: Integer;
  Heap: THandle;
begin
  Heap := GetProcessHeap();
  for i := 1 to IO_MEM_MAX_COUNT do
    HeapFree(Heap, HEAP_NO_SERIALIZE, IOMemList[i]);
  IOMemUse := 0;
  DeleteCriticalSection(IOMemSec);
end;

function GetIOMemSize(): Integer;
begin
  Result := IO_MEM_MAX_COUNT * sizeof(TIOMem);
end;

function GetIOMemUse(): Real;
begin
  Result := (IOMemUse * 100) / IO_MEM_MAX_COUNT;
end;

{===============================================================================
                              Socket link management
================================================================================}
procedure OnLinkIdleOvertimeDef(Link: PLink);
begin
end;

var
  // Free list of links
  LinkHead: PLink = nil;
  LinkLast: PLink = nil;
  LinkUse: Integer = 0;
  LinkCount: Integer = 0;
  LinkSec: TRTLCriticalSection;
  LinkList: array[1..SOCK_MAX_COUNT] of PLink;
  OnLinkIdleOvertimeEvt: TOnLinkIdleOvertimeEvt = OnLinkIdleOvertimeDef;
  // Doubly linked list of links that are in use
  LinksHead: PLink = nil;
  LinksLast: PLink = nil;

function GetLinkFree(): Integer;
var
  Links: PLink;
begin
  EnterCriticalSection(LinkSec);
  Result := 0;
  Links := LinkHead;
  while Links <> nil do
  begin
    Result := Result + 1;
    Links := Links^.Next;
  end;
  LeaveCriticalSection(LinkSec);
end;

// Take a link from the free list and append it to the in-use list.
// The caller must hold LinkSec.
function GetLink(): PLink;
begin
  try
    // The pool must be large enough; if it runs out, nothing can save us,
    // not even dynamic allocation
    Result := LinkHead;
    LinkHead := LinkHead^.Next;
    LinkUse := LinkUse + 1;
    LinkCount := LinkCount + 1;
    if LinksHead = nil then
    begin
      LinksHead := Result;
      LinksHead^.Next := nil;
      LinksHead^.Prior := nil;
      LinksLast := LinksHead;
    end else
    begin
      Result^.Prior := LinksLast;
      LinksLast^.Next := Result;
      LinksLast := Result;
      LinksLast^.Next := nil;
    end;
    with Result^ do
    begin
      Socket := INVALID_SOCKET;
      RemoteIP := '';
      RemotePort := 0;
      TickCountActive := GetTickCount();
      Worker := nil;
      Data := nil;
    end;
  except
    Result := nil;
    WriteLog('GetLink: error');
  end;
end;

// Unlink a link from the in-use list and return it to the free list.
// The caller must hold LinkSec.
procedure FreeLink(Link: PLink);
begin
  try
    with Link^ do
    begin
      Link^.Worker := nil;
      if Link = LinksHead then
      begin
        LinksHead := Next;
        if LinksLast = Link then
          LinksLast := LinksHead
        else
          LinksHead^.Prior := nil;
      end else
      begin
        Prior^.Next := Next;
        if Next <> nil then
          Next^.Prior := Prior;
        if Link = LinksLast then
          LinksLast := Prior;
      end;
      Next := nil;
      // Append to the free list; if the pool was fully drained,
      // this link becomes the new head
      if LinkHead = nil then
        LinkHead := Link
      else
        LinkLast^.Next := Link;
      LinkLast := Link;
      LinkUse := LinkUse - 1;
      LinkCount := LinkCount - 1;
    end;
  except
    WriteLog('FreeLink: error');
  end;
end;

procedure CloseLink(Link: PLink);
begin
  EnterCriticalSection(LinkSec);
  with Link^ do
  begin
    EnterCriticalSection(Section);
    if Socket <> INVALID_SOCKET then
    begin
      try
        CloseSocket(Socket);
      except
        WriteLog('CloseSocket: error');
      end;
      Socket := INVALID_SOCKET;
      FreeLink(Link);
    end;
    LeaveCriticalSection(Link^.Section);
  end;
  LeaveCriticalSection(LinkSec);
end;

// Called by the service thread. Walks the in-use list and closes the first
// link that has been idle longer than SOCK_IDLE_OVERTIME; at most one link
// is closed per call.
procedure CheckLinkLinkIdleOvertime(Data: Pointer);
var
  TickCount: DWord;
  Long: Integer;
  Link: PLink;
begin
  EnterCriticalSection(LinkSec);
  try
    TickCount := GetTickCount();
    Link := LinksHead;
    while Link <> nil do
    with Link^ do
    begin
      EnterCriticalSection(Section);
      if Socket <> INVALID_SOCKET then
      begin
        // Elapsed ticks since the last receive, allowing for tick counter wraparound
        if TickCount > TickCountActive then
          Long := TickCount - TickCountActive
        else
          Long := $FFFFFFFF - TickCountActive + TickCount;
        if SOCK_IDLE_OVERTIME * 1000 < Long then
        begin
          try
            CloseSocket(Socket);
          except
            WriteLog('CloseSocket overtime: error');
          end;
          Socket := INVALID_SOCKET;
          Worker := Data;
          try
            OnLinkIdleOvertimeEvt(Link);
          except
            WriteLog('OnLinkIdleOvertimeEvt: error');
          end;
          Worker := nil;
          FreeLink(Link);
          LeaveCriticalSection(Section);
          break;
        end;
      end else
      begin
        LeaveCriticalSection(Section);
        break;
      end;
      LeaveCriticalSection(Section);
      Link := Link^.Next;
    end;
  except
    WriteLog('CheckLinkLinkIdleOvertime: error');
  end;
  LeaveCriticalSection(LinkSec);
end;

// Allocate every link up front and chain them into the free list
procedure IniLink();
var
  i: Integer;
  Heap: THandle;
begin
  InitializeCriticalSection(LinkSec);
  LinkHead := HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(TLink));
  InitializeCriticalSection(LinkHead^.Section);
  LinkLast := LinkHead;
  LinkList[1] := LinkHead;
  Heap := GetProcessHeap();
  for i := 2 to SOCK_MAX_COUNT do
  begin
    LinkLast^.Next := HeapAlloc(Heap, HEAP_ZERO_MEMORY, sizeof(TLink));
    LinkLast := LinkLast^.Next;
    InitializeCriticalSection(LinkLast^.Section);
    LinkList[i] := LinkLast;
  end;
  LinkLast.Next := nil;
end;

procedure DeleteLink();
var
  i: Integer;
  Heap: THandle;
begin
  Heap := GetProcessHeap();
  for i := 1 to SOCK_MAX_COUNT do
  begin
    DeleteCriticalSection(LinkList[i]^.Section);
    HeapFree(Heap, HEAP_NO_SERIALIZE, LinkList[i]);
  end;
  LinkUse := 0;
  LinkCount := 0;
  LinksHead := nil;
  LinksLast := nil;
  DeleteCriticalSection(LinkSec);
end;

function GetLinkSize(): Integer;
begin
  Result := SOCK_MAX_COUNT * sizeof(TLink);
end;

function GetLinkUse(): real;
begin
  Result := (LinkUse * 100) / SOCK_MAX_COUNT;
end;

function GetLinkCount(): Integer;
begin
  Result := LinkCount;
end;

{===============================================================================
                              Worker threads
================================================================================}
procedure OnDisconnectDef(Link: PLink);
begin
end;

// Default receive handler: echoes the received data back to the sender
function OnReceiveDef(Link: PLink; Buf: PByte; Len: Integer): Boolean;
var
  IOMem: Pointer;
  i: Integer;
begin
  Result := True;
  IOMem := GetIOMem();
  CopyMemory(IOMem, Buf, Len);
  // Busy loop, presumably to simulate processing load during tests
  i := 1000000;
  while i > 0 do
    i := i - 1;
  if not PostSend(Link, IOMem, Len) then
    FreeIOMem(IOMem);
end;

function OnWorkerThreadCreateDef(IsWorkerThread: Boolean): Pointer;
begin
  Result := nil;
end;

var
  WorkerHead: PWorker = nil;
  WorkerCount: Integer = 0;
  OnDisconnectEvt: TOnDisconnectEvt = OnDisconnectDef;
  OnReceiveEvt: TOnReceiveEvt = OnReceiveDef;
  OnWorkerThreadCreateEvt: TOnThreadCreateEvt = OnWorkerThreadCreateDef;

function GetWorkerCount(): Integer;
begin
  Result := WorkerCount;
end;

function WorkerThread(Worker: PWorker): DWORD; stdcall;
var
  Link: PLink;
  IOInfo: PIOInfo;
  Bytes: DWord;
  CompletionPort: THandle;
begin
  Result := 0;
  CompletionPort := Worker^.CompletionPort;
  with Worker^ do
  begin
    TickCountActive := GetTickCount();
    TickCountLong := 0;
    ExecCount := 0;
  end;
  WriteLog(Format('Worker thread:%d begin', [Worker^.ID]));
  CoInitialize(nil);
  try
    while True do
    begin
      try
        // Accumulate the time spent handling the previous completion
        with Worker^ do
          TickCountLong := TickCountLong + GetTickCount() - TickCountActive;

        if GetQueuedCompletionStatus(CompletionPort, Bytes, DWORD(Link), POverlapped(IOInfo), INFINITE) = False then
        begin
          // The I/O operation failed: close the link and release the buffer
          if (Link <> nil) then
          with Link^ do
          begin
            EnterCriticalSection(LinkSec);
            EnterCriticalSection(Section);
            if Link^.Socket <> INVALID_SOCKET then
            begin
              try
                CloseSocket(Socket);
              except
                WriteLog(Format('CloseSocket1:%d error', [Worker^.ID]));
              end;
              Socket := INVALID_SOCKET;
              Link^.Worker := Worker;
              try
                OnDisconnectEvt(Link);
              except
                WriteLog(Format('OnDisconnectEvt1:%d error', [Worker^.ID]));
              end;
              Link^.Worker := nil;
              FreeLink(Link);
            end;
            LeaveCriticalSection(Section);
            LeaveCriticalSection(LinkSec);
          end;
          if IOInfo <> nil then
            FreeIOMem(IOInfo^.DataBuf.buf);
          WriteLog(Format('GetQueuedCompletionStatus:%d error', [Worker^.ID]));
          continue;
        end;

        with Worker^ do
        begin
          TickCountActive := GetTickCount();
          ExecCount := ExecCount + 1;
        end;

        if (Bytes = 0) then
        begin
          if (Link <> nil) then
          with Link^ do
          begin
            // Zero bytes on a real link: the peer closed the connection
            EnterCriticalSection(LinkSec);
            EnterCriticalSection(Section);
            if Link^.Socket <> INVALID_SOCKET then
            begin
              try
                CloseSocket(Socket);
              except
                WriteLog(Format('CloseSocket2:%d error', [Worker^.ID]));
              end;
              Socket := INVALID_SOCKET;
              Link^.Worker := Worker;
              try
                OnDisconnectEvt(Link);
              except
                WriteLog(Format('OnDisconnectEvt2:%d error', [Worker^.ID]));
              end;
              Link^.Worker := nil;
              FreeLink(Link);
            end;
            LeaveCriticalSection(Section);
            LeaveCriticalSection(LinkSec);
            // The buffer is released whether the operation was a read or a write
            FreeIOMem(IOInfo^.DataBuf.buf);
            continue;
          end else
          begin
            // Zero bytes with no link: the shutdown packet posted by DestroyWorkerThread
            if IOInfo <> nil then
              FreeIOMem(IOInfo^.DataBuf.buf);
            break;
          end;
        end;

        // A send completed: just release its buffer
        if IOInfo.Flag = IO_WRITE then
        begin
          FreeIOMem(IOInfo^.DataBuf.buf);
          continue;
        end;

        {if IOInfo.Flag = IO_ACCEPT then
        begin
          ......
          continue;
        end;}

        // A receive completed: hand the data to the application,
        // then post the next receive on the same buffer
        with Link^, IOInfo^.DataBuf do
        begin
          Link^.Worker := Worker;
          try
            OnReceiveEvt(Link, buf, Bytes);
          except
            WriteLog(Format('OnReceiveEvt:%d error', [Worker^.ID]));
          end;
          Link^.Worker := nil;
          TickCountActive := GetTickCount();
          if not PostRecv(Link, buf) then
          begin
            EnterCriticalSection(LinkSec);
            EnterCriticalSection(Section);
            if Socket <> INVALID_SOCKET then
            begin
              try
                CloseSocket(Socket);
              except
                WriteLog(Format('CloseSocket3:%d error', [Worker^.ID]));
              end;
              Socket := INVALID_SOCKET;
              Link^.Worker := Worker;
              try
                OnDisconnectEvt(Link);
              except
                WriteLog(Format('OnDisconnectEvt3:%d error', [Worker^.ID]));
              end;
              Link^.Worker := nil;
              FreeLink(Link);
            end;
            LeaveCriticalSection(Section);
            LeaveCriticalSection(LinkSec);
            FreeIOMem(buf);
          end;
        end;
      except
        WriteLog(Format('Worker thread:%d error', [Worker^.ID]));
      end;
    end;
  finally
    CoUninitialize();
    WriteLog(Format('Worker thread:%d end', [Worker^.ID]));
    SetEvent(Worker^.Finished);
  end;
end;

procedure CreateWorkerThread(CompletionPort: THandle);
var
  Worker, Workers: PWorker;
  i: Integer;
  SystemInfo: TSystemInfo;
  ThreadHandle: THandle;
begin
  GetSystemInfo(SystemInfo);
  Workers := nil;
  // Two workers per processor, plus two
  WorkerCount := (SystemInfo.dwNumberOfProcessors * 2 + 2);
  for i := 1 to WorkerCount do
  begin
    Worker := HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(TWorker));
    if Workers = nil then
    begin
      Workers := Worker;
      WorkerHead := Workers;
    end else
    begin
      Workers^.Next := Worker;
      Workers := Worker;
    end;
    Worker^.CompletionPort := CompletionPort;
    Worker^.Data := OnWorkerThreadCreateEvt(False);
    Worker^.Finished := CreateEvent(nil, True, False, nil);
    ThreadHandle := CreateThread(nil, 0, @WorkerThread, Worker, 0, Worker^.ID);
    if ThreadHandle <> 0 then
      CloseHandle(ThreadHandle);
  end;
  Workers^.Next := nil;
end;

procedure DestroyWorkerThread();
var
  Worker, Save: PWorker;
begin
  WorkerCount := 0;
  // Post one zero-byte packet with a nil key per worker to ask it to exit
  Worker := WorkerHead;
  while Worker <> nil do
  begin
    PostQueuedCompletionStatus(Worker^.CompletionPort, 0, 0, nil);
    Worker := Worker^.Next;
  end;
  // Wait for each worker to finish, then release its record
  Worker := WorkerHead;
  while Worker <> nil do
  begin
    with Worker^ do
    begin
      WaitForSingleObject(Worker^.Finished, INFINITE);
      CloseHandle(Worker^.Finished);
      Save := Worker^.Next;
    end;
    HeapFree(GetProcessHeap(), HEAP_NO_SERIALIZE, Worker);
    Worker := Save;
  end;
end;

// Returns the execution count of worker number Index (1-based) and, in
// TickCount, its accumulated working time, which is then reset
function GetWorkerExecInfo(Index: Integer; var TickCount: DWord): Integer;
var
  Worker: PWorker;
  Count: Integer;
begin
  Worker := WorkerHead;
  Count := 0;
  Result := 0;
  while Worker <> nil do
  with Worker^ do
  begin
    Count := Count + 1;
    if Count = Index then
    begin
      TickCount := TickCountLong;
      TickCountLong := 0;
      Result := Worker^.ExecCount;
      break;
    end;
    Worker := Worker^.Next;
  end;
end;

function GetWorkerID(Index: Integer): Integer;
var
  Worker: PWorker;
  Count: Integer;
begin
  Worker := WorkerHead;
  Count := 0;
  while Worker <> nil do
  begin
    Count := Count + 1;
    if Count = Index then
    begin
      Count := Worker^.ID;
      break;
    end;
    Worker := Worker^.Next;
  end;
  Result := Count;
end;

{===============================================================================
                              Service thread
================================================================================}
function OnServerThreadCreateDef(IsWorkerThread: Boolean): Pointer;
begin
  Result := nil;
end;

var
  ListenSocket: TSocket = INVALID_SOCKET;
  SocketEvent: THandle = WSA_INVALID_EVENT;
  CompletionPort: THandle = 0;
  Terminated: Boolean = False;
  ServerThreadID: DWORD = 0;
  ServerExecCount: Integer = 0;
  ServerExecLong: DWord = 0;
  OnServerThreadCreateEvt: TOnThreadCreateEvt = OnServerThreadCreateDef;
  ServerFinished: THandle;

function GetServerExecCount(): Integer;
begin
  Result := ServerExecCount;
end;

function GetServerExecLong(): DWord;
begin
  Result := ServerExecLong;
  ServerExecLong := 0;
end;

// Accepts new connections and, every 10 seconds without one,
// checks for links that have idled past the timeout
function ServerThread(Param: Pointer): DWORD; stdcall;
var
  AcceptSocket: TSocket;
  Addr: TSockAddrIn;
  Len: Integer;
  Link: PLink;
  IOMem: Pointer;
  bNodelay: Boolean;
  TickCount: DWord;
  WR: DWord;
begin
  Result := 0;
  CoInitialize(nil);
  WriteLog('Server thread begin');
  TickCount := GetTickCount();
  try
    while not Terminated do
    begin
      try
        ServerExecLong := ServerExecLong + (GetTickCount() - TickCount);
        WR := WaitForSingleObject(SocketEvent, 10000);

        ServerExecCount := ServerExecCount + 1;
        TickCount := GetTickCount();

        if (WAIT_TIMEOUT = WR) then
        begin
          CheckLinkLinkIdleOvertime(Param);
          continue;
        end else
        if (WAIT_FAILED = WR) then
        begin
          continue;
        end else
        begin
          Len := SizeOf(TSockAddrIn);
          AcceptSocket := WSAAccept(ListenSocket, @Addr, @Len, nil, 0);
          if (AcceptSocket = INVALID_SOCKET) then
            continue;

          // Refuse the connection when the link pool is full
          if LinkCount >= SOCK_MAX_COUNT then
          begin
            try
              CloseSocket(AcceptSocket);
            except
              WriteLog('Link count over');
            end;
            continue;
          end;

          bNodelay := True;
          if SetSockOpt(AcceptSocket, IPPROTO_TCP, TCP_NODELAY,
                        PChar(@bNodelay), sizeof(bNodelay)) = SOCKET_ERROR then
          begin
            try
              CloseSocket(AcceptSocket);
            except
              WriteLog('SetSockOpt: error');
            end;
            continue;
          end;

          EnterCriticalSection(LinkSec);
          Link := GetLink();
          with Link^ do
          begin
            EnterCriticalSection(Section);
            RemoteIP := inet_ntoa(Addr.sin_addr);
            // sin_port is in network byte order
            RemotePort := ntohs(Addr.sin_port);
            TickCountActive := GetTickCount();
            Socket := AcceptSocket;
            IOMem := GetIOMem();
            // Associate the socket with the completion port, using the link
            // as the completion key, then post the first receive
            if (CreateIoCompletionPort(AcceptSocket, CompletionPort, DWORD(Link), 0) = 0) or
               (not PostRecv(Link, IOMem)) then
            begin
              try
                CloseSocket(Socket);
              except
                WriteLog('CreateIoCompletionPort or PostRecv: error');
              end;
              Socket := INVALID_SOCKET;
              FreeLink(Link);
              FreeIOMem(IOMem);
            end;
            LeaveCriticalSection(Section);
          end;
          LeaveCriticalSection(LinkSec);
        end;
      except
        WriteLog('Server thread error');
      end;
    end;
  finally
    CoUninitialize();
    WriteLog('Server thread end');
    SetEvent(ServerFinished);
  end;
end;

// Despite their names, RemoteIP and RemotePort are the local address and
// port that the server binds to
function StartTcpServer(RemoteIP: String; RemotePort: DWord): Boolean;
var
  NonBlock: Integer;
  bNodelay: Boolean;
  Addr: TSockAddrIn;
  ThreadHandle: THANDLE;
begin
  Result := ListenSocket = INVALID_SOCKET;
  if not Result then
    exit;

  IniIOMem();
  IniLink();

  ListenSocket := WSASocket(AF_INET, SOCK_STREAM, 0, nil, 0, WSA_FLAG_OVERLAPPED);
  Result := ListenSocket <> INVALID_SOCKET;
  if not Result then
  begin
    DeleteLink();
    DeleteIOMem();
    exit;
  end;

  bNodelay := True;
  NonBlock := 1;
  Addr.sin_family := AF_INET;
  Addr.sin_addr.s_addr := inet_addr(PChar(RemoteIP));
  Addr.sin_port := htons(RemotePort);
  Result := (SetSockOpt(ListenSocket, IPPROTO_TCP, TCP_NODELAY, PChar(@bNodelay), sizeof(bNodelay)) <> SOCKET_ERROR) and
            (ioctlsocket(ListenSocket, Integer(FIONBIO), NonBlock) <> SOCKET_ERROR) and
            (Bind(ListenSocket, @Addr, SizeOf(TSockAddrIn)) <> SOCKET_ERROR) and
            (Listen(ListenSocket, SOMAXCONN) <> SOCKET_ERROR);
  if not Result then
  begin
    // Close the socket before discarding the handle so that it is not leaked
    CloseSocket(ListenSocket);
    ListenSocket := INVALID_SOCKET;
    DeleteLink();
    DeleteIOMem();
    exit;
  end;

  SocketEvent := CreateEvent(nil, FALSE, FALSE, nil);
  Result := (SocketEvent <> WSA_INVALID_EVENT);
  if (not Result) then
  begin
    CloseSocket(ListenSocket);
    ListenSocket := INVALID_SOCKET;
    DeleteLink();
    DeleteIOMem();
    exit;
  end;

  // Signal SocketEvent whenever a connection is waiting to be accepted
  Result := (WSAEventSelect(ListenSocket, SocketEvent, FD_ACCEPT) <> SOCKET_ERROR);
  if not Result then
  begin
    CloseSocket(ListenSocket);
    ListenSocket := INVALID_SOCKET;
    WSACloseEvent(SocketEvent);
    SocketEvent := WSA_INVALID_EVENT;
    DeleteLink();
    DeleteIOMem();
    exit;
  end;

  CompletionPort := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
  Result := CompletionPort <> 0;
  if not Result then
  begin
    CloseSocket(ListenSocket);
    ListenSocket := INVALID_SOCKET;
    WSACloseEvent(SocketEvent);
    SocketEvent := WSA_INVALID_EVENT;
    DeleteLink();
    DeleteIOMem();
    exit;
  end;

  WriteLog('Server Start');
  CreateWorkerThread(CompletionPort);
ServerFinished := CreateEvent(nil, True, False, nil);   Result := ServerFinished <> 0;   if not Result then   begin     CloseSocket(ListenSocket);     ListenSocket := INVALID_SOCKET;     WSACloseEvent(SocketEvent);     SocketEvent := WSA_INVALID_EVENT;     DeleteLink();     DeleteIOMem();     exit;   end;   Terminated := False;   ThreadHandle := CreateThread(nil, 0, @ServerThread, OnServerThreadCreateEvt(False), 0, ServerThreadID);   if (ThreadHandle = 0) then   begin     StopTcpServer();     exit;   end;   CloseHandle(ThreadHandle); end; function StopTcpServer(): Boolean; begin   Result := ListenSocket <> INVALID_SOCKET;   if not Result then     exit;   WriteLog('Server Stop');   Terminated := True;   if ServerFinished <> 0 then   begin     WaitForSingleObject(ServerFinished, INFINITE);     CloseHandle(ServerFinished);     ServerFinished := 0;   end;   if SocketEvent <> 0 then     WSACloseEvent(SocketEvent);   SocketEvent := 0;   DestroyWorkerThread();   if ListenSocket <> INVALID_SOCKET then     CloseSocket(ListenSocket);   ListenSocket := INVALID_SOCKET;   if CompletionPort <> 0 then     CloseHandle(CompletionPort);   CompletionPort := 0;   ServerExecCount := 0;   ServerExecLong := 0;   DeleteLink();   DeleteIOMem(); end; function GetLocalIP(IsIntnetIP: Boolean): String; type   TaPInAddr = Array[0..10] of PInAddr;   PaPInAddr = ^TaPInAddr; var   phe: PHostEnt;   pptr: PaPInAddr;   Buffer: Array[0..63] of Char;   I: Integer; begin   Result := '0.0.0.0';   try     GetHostName(Buffer, SizeOf(Buffer));     phe := GetHostByName(buffer);     if phe = nil then       Exit;     pPtr := PaPInAddr(phe^.h_addr_list);     if IsIntnetIP then     begin       I := 0;       while pPtr^[I] <> nil do       begin         Result := inet_ntoa(pptr^[I]^);         Inc(I);       end;     end else       Result := inet_ntoa(pptr^[0]^);   except   end; end; procedure SetEventProc(OnReceive: TOnReceiveEvt;                        OnDisconnect: TOnDisconnectEvt;                        
                       OnLinkIdleOvertime: TOnLinkIdleOvertimeEvt;
                       OnServerThreadCreate: TOnThreadCreateEvt;
                       OnWorkerThreadCreate: TOnThreadCreateEvt);
begin
  OnReceiveEvt := OnReceive;
  OnDisconnectEvt := OnDisconnect;
  OnLinkIdleOvertimeEvt := OnLinkIdleOvertime;
  OnServerThreadCreateEvt := OnServerThreadCreate;
  OnWorkerThreadCreateEvt := OnWorkerThreadCreate;
end;

function PostRecv(Link: PLink; IOMem: Pointer): Boolean;
var
  Flags: DWord;
  Bytes: DWord;
  IOInfo: PIOInfo;
begin
  Result := Link^.Socket <> INVALID_SOCKET;
  if Result then
  try
    Flags := 0;
    Bytes := 0;
    // The TIOInfo header sits directly in front of the data buffer.
    // The Integer cast assumes 32-bit pointers.
    IOInfo := PIOInfo(Integer(IOMem) - sizeof(TIOInfo));
    with IOInfo^ do
    begin
      ZeroMemory(IOInfo, sizeof(TIOInfo));
      DataBuf.buf := IOMem;
      DataBuf.len := IO_MEM_SIZE;
      Socket := Link^.Socket;
      Flag := IO_READ;
      // A pending overlapped operation counts as success.
      Result := (WSARecv(Socket, @DataBuf, 1, @Bytes, @Flags, @Overlapped, nil) <> SOCKET_ERROR) or
                (WSAGetLastError() = ERROR_IO_PENDING);
    end;
  except
    Result := False;
    WriteLog('PostRecv: error');
  end;
end;

function PostSend(Link: PLink; IOMem: Pointer; Len: Integer): Boolean;
var
  Bytes: DWord;
  IOInfo: PIOInfo;
begin
  Result := Link^.Socket <> INVALID_SOCKET;
  if Result then
  try
    Bytes := 0;
    // Same header-before-buffer layout as in PostRecv.
    IOInfo := PIOInfo(Integer(IOMem) - sizeof(TIOInfo));
    with IOInfo^ do
    begin
      ZeroMemory(IOInfo, sizeof(TIOInfo));
      DataBuf.buf := IOMem;
      DataBuf.len := Len;
      Socket := Link^.Socket;
      Flag := IO_WRITE;
      // A pending overlapped operation counts as success.
      Result := (WSASend(Socket, @(DataBuf), 1, @Bytes, 0, @(Overlapped), nil) <> SOCKET_ERROR) or
                (WSAGetLastError() = ERROR_IO_PENDING);
    end;
  except
    Result := False;
    WriteLog('PostSend: error');
  end;
end;

procedure PostBroadcast(Buf: PByte; Len: Integer);
var
  IOMem: Pointer;
  Link: PLink;
begin
  EnterCriticalSection(LinkSec);
  try
    Link := LinksHead;
    while Link <> nil do
    begin
      if Link^.Socket <> INVALID_SOCKET then
      begin
        IOMem := GetIOMem();
        CopyMemory(IOMem, Buf, Len);
        // Return the buffer to the pool only when the send could not be posted.
        if not PostSend(Link, IOMem, Len) then
          FreeIOMem(IOMem);
      end;
      Link := Link^.Next;
    end;
  finally
    // Release the lock even if an exception escapes the loop.
    LeaveCriticalSection(LinkSec);
  end;
end;

function IsTcpServerActive(): Boolean;
begin
  Result := ListenSocket <> INVALID_SOCKET;
end;

{===============================================================================
                              Log management
================================================================================}
var
  LogSec: TRTLCriticalSection;
  Inifile: TIniFile;
  LogCount: Integer = 0;
  LogName: String = '';

procedure WriteLog(Log: String);
begin
  EnterCriticalSection(LogSec);
  try
    LogCount := LogCount + 1;
    IniFile.WriteString(LogName,
                        'Index' + IntToStr(LogCount),
                        DateTimeToStr(Now()) + ':' + Log);
  finally
    LeaveCriticalSection(LogSec);
  end;
end;

{===============================================================================
                              Initialize Windows Sockets
================================================================================}
var
  WSAData: TWSAData;

procedure Startup;
var
  ErrorCode: Integer;
begin
  // Requested Winsock version. The literal is garbled in the published
  // listing (only the trailing "01" survives); $0101 is assumed here.
  ErrorCode := WSAStartup($0101, WSAData);
  if ErrorCode <> 0 then
    WriteLog('Window Socket init Error!');
end;

procedure Cleanup;
var
  ErrorCode: Integer;
begin
  ErrorCode := WSACleanup;
  if ErrorCode <> 0 then
    WriteLog('Window Socket cleanup error!');
end;

function GetExePath(): String;
var
  ModuleName: array[0..1024] of char;
begin
  // GetModuleFileName expects the buffer size in characters, not bytes.
  GetModuleFileName(MainInstance, ModuleName, Length(ModuleName));
  Result := ExtractFilePath(ModuleName);
end;

initialization
  LogName := DateTimeToStr(Now());
  InitializeCriticalSection(LogSec);
  ExePath := GetExePath();
  IniFile := TIniFile.Create(ExePath + 'Logs.Ini');
  Startup();

finalization
  Cleanup();
  DeleteCriticalSection(LogSec);
  IniFile.Free;

end.

Main form unit source:

unit uMainTcpServerIOCP;

interface

uses
  Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
  Dialogs, ExtCtrls, StdCtrls, ComCtrls, UTcpServer, Sockets, Grids;

type
  TfrmMainUTcpServerIOCP = class(TForm)
    Label1: TLabel;
    Label2: TLabel;
    edtIP: TEdit;
    edtPort: TEdit;
    btn: TButton;
    Timer1: TTimer;
    Label3: TLabel;
    lbIO: TLabel;
    Label5: TLabel;
    lbIOU: TLabel;
    Label7: TLabel;
    lbL: TLabel;
    Label9: TLabel;
    lbLU: TLabel;
    Label11: TLabel;
    lbLS: TLabel;
    Label13: TLabel;
    lbW: TLabel;
    Info: TStringGrid;
    Label4: TLabel;
    lbWC: TLabel;
    Label8: TLabel;
    lbWU: TLabel;
    Label12: TLabel;
    lbLF: TLabel;
    Label15: TLabel;
    lbLFL: TLabel;
    Label6: TLabel;
    lbIOF: TLabel;
    lbIOFL: TLabel;
    Label16: TLabel;
    Timer2: TTimer;
    procedure btnClick(Sender: TObject);
    procedure FormCreate(Sender: TObject);
    procedure Timer1Timer(Sender: TObject);
    procedure FormDestroy(Sender: TObject);
    procedure Timer2Timer(Sender: TObject);
  private
    { Private declarations }
    FTickCount: DWord;
  public
    { Public declarations }
  end;

var
  frmMainUTcpServerIOCP: TfrmMainUTcpServerIOCP;

implementation

{$R *.dfm}

{ TfrmMainUTcpServerIOCP }

procedure TfrmMainUTcpServerIOCP.btnClick(Sender: TObject);
var
  i: Integer;
  C1: Integer;
  C2: DWord;
  DT: TDateTime;
begin
  if btn.Caption = 'Open' then
  begin
    StartTcpServer(edtIP.Text, StrToInt(edtPort.Text));
    if IsTcpServerActive() then
    begin
      FTickCount := GetTickCount();
      // One grid row per worker thread, plus the header row.
      Info.RowCount := GetWorkerCount() + 1;
      DT := Now();
      for i := 1 to Info.RowCount - 1 do
      begin
        Info.Cells[0, i] := IntToStr(i);
        Info.Cells[1, i] := IntToStr(GetWorkerID(i));
        C1 := GetWorkerExecInfo(i, C2);
        Info.Cells[2, i] := IntToStr(C1);
        Info.Cells[3, i] := '0';
        Info.Cells[4, i] := IntToStr(C2);
        Info.Cells[5, i] := '0';
        Info.Cells[6, i] := DateTimeToStr(DT);
      end;
      Timer1.Enabled := True;
    end;
  end else
  begin
    Timer1.Enabled := False;
    StopTcpServer();
  end;
  if IsTcpServerActive() then
    btn.Caption := 'Close'
  else
    btn.Caption := 'Open';
end;

procedure TfrmMainUTcpServerIOCP.FormCreate(Sender: TObject);
begin
  edtIP.Text := GetLocalIP(False);
  Info.ColCount := 7;
  Info.RowCount := 2;
  Info.ColWidths[0] := 30;
  Info.ColWidths[1] := 30;
  Info.ColWidths[2] := 40;
  Info.ColWidths[3] := 40;
  Info.ColWidths[4] := 30;
  Info.ColWidths[5] := 40;
  Info.ColWidths[6] := 110;
  // Column headers of the worker-thread statistics grid.
  Info.Cells[0, 0] := 'No.';
  Info.Cells[1, 0] := 'ID';
  Info.Cells[2, 0] := 'Count';
  Info.Cells[3, 0] := 'Ops/s';
  Info.Cells[4, 0] := 'Duration';
  Info.Cells[5, 0] := 'Usage';
  Info.Cells[6, 0] := 'Time';
end;

procedure TfrmMainUTcpServerIOCP.Timer1Timer(Sender: TObject);
var
  i: Integer;
  Count1, Count2, Count3, TC, TCC: DWord;
begin
  if not IsTcpServerActive() then
  begin
    Timer1.Enabled := False;
    exit;
  end;
  // Ticks elapsed since the previous refresh; guard against division by zero.
  TC := GetTickCount();
  TCC := TC - FTickCount;
  if TCC = 0 then
    TCC := $FFFFFFFF;
  lbWC.Caption := IntToStr(GetServerExecCount());
  lbWU.Caption := FloatToStrF(GetServerExecLong() / TCC * 100, ffFixed, 10, 3) + '%';
  for i := 1 to Info.RowCount - 1 do
  begin
    Count1 := GetWorkerExecInfo(i, Count2);
    TC := GetTickCount();
    TCC := TC - FTickCount;
    if TCC = 0 then
      TCC := $FFFFFFFF;

    // Refresh a row only when the worker's execution count has changed.
    Count3 := StrToInt(Info.Cells[2, i]);
    if Count1 <> Count3 then
    begin
      Info.Cells[2, i] := IntToStr(Count1);
      Info.Cells[3, i] := IntToStr(Count1 - Count3);
      Info.Cells[4, i] := IntToStr(Count2);
      Info.Cells[5, i] := FloatToStrF(Count2 / TCC * 100, ffFixed, 10, 1) + '%';
      Info.Cells[6, i] := DateTimeToStr(Now());
    end;
  end;
  FTickCount := TC;
  lbIO.Caption := IntToStr(GetIOMemSize());
  lbIOU.Caption := FloatToStrF(GetIOMemUse(), ffFixed, 10, 3) + '%';
  Count1 := GetIOMemFree();
  lbIOF.Caption := IntToStr(Count1);
  lbIOFL.Caption := FloatToStrF(Count1 / IO_MEM_MAX_COUNT * 100, ffFixed, 10, 3) + '%';
  lbW.Caption := IntToStr(GetWorkerCount());
  lbL.Caption := IntToStr(GetLinkSize());
  Count1 := GetLinkFree();
  lbLF.Caption := IntToStr(Count1);
  lbLFL.Caption := FloatToStrF(Count1 / SOCK_MAX_COUNT * 100, ffFixed, 10, 3) + '%';
  lbLU.Caption := FloatToStrF(GetLinkUse(), ffFixed, 10, 3) + '%';
  lbLS.Caption := IntToStr(GetLinkCount());
end;

procedure TfrmMainUTcpServerIOCP.FormDestroy(Sender: TObject);
begin
  StopTcpServer();
end;

procedure TfrmMainUTcpServerIOCP.Timer2Timer(Sender: TObject);
begin
  if not IsTcpServerActive() then
  begin
    Timer1.Enabled := False;
    exit;
  end;
  // Broadcast a 21-byte test message to every connected client.
  PostBroadcast(PByte(PChar('Data from the server!')), 21);
end;

end.
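
PostRecv and PostSend in the listing above recover the TIOInfo header from the data pointer by subtracting SizeOf(TIOInfo). This works only because each pool block stores the header immediately in front of the data. The program below is a minimal sketch of that layout trick in isolation, not the author's pool code: TIOInfo is cut down to two illustrative fields, IO_MEM_SIZE = 2048 is an assumed value, and InfoFromData is a hypothetical helper name. It uses PAnsiChar arithmetic instead of the Integer cast so that the subtraction is byte-wise and does not depend on pointer width.

```pascal
program HeaderBeforeBuffer;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

const
  IO_MEM_SIZE = 2048; // assumed value; the real constant lives in the server unit

type
  TIOFlag = (IO_ACCEPT, IO_READ, IO_WRITE);

  // Cut-down stand-in for the article's TIOInfo (fields are illustrative).
  PIOInfo = ^TIOInfo;
  TIOInfo = packed record
    Flag: TIOFlag;
    Len: Integer;
  end;

  // Header and payload share one packed block, header first.
  PIOMem = ^TIOMem;
  TIOMem = packed record
    IOInfo: TIOInfo;
    Data: array[1..IO_MEM_SIZE] of Byte;
  end;

// Hypothetical helper: step back from the payload address to its header.
function InfoFromData(Data: Pointer): PIOInfo;
begin
  Result := PIOInfo(PAnsiChar(Data) - SizeOf(TIOInfo));
end;

var
  Block: PIOMem;
  Data: Pointer;
begin
  New(Block);
  Data := @Block^.Data; // the address a caller of the pool would receive

  // Write through the recovered header ...
  InfoFromData(Data)^.Flag := IO_WRITE;
  InfoFromData(Data)^.Len := 21;

  // ... and read the same fields back through the block itself.
  WriteLn('same header: ', InfoFromData(Data) = PIOInfo(Block));
  WriteLn('flag ordinal: ', Ord(Block^.IOInfo.Flag));
  WriteLn('length: ', Block^.IOInfo.Len);

  Dispose(Block);
end.
```

The subtraction stays valid only while Data directly follows IOInfo inside the block. Inserting a field between the two would make the recovered pointer land in the wrong place without any compiler warning.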