Windows Socket IO模型之 WSAEventSelect模式

    技术2024-08-17  62

    # include < winsock2. h> # include < ws2tcpip. h> # include "public.h"# include "resolve.h" typedef SINGLE_LIST_HEADER BuffHeader;typedef SINGLE_LIST BuffObj;typedef SINGLE_LIST_HEADER TheadObjHeader;typedef SINGLE_LIST ThreadObj;typedef DOUBLE_LIST_HEADER SockObjHeader;typedef DOUBLE_LIST SockObj; typedef struct _SOCKET_OBJ{    SOCKET s; // Socket handle    HANDLE event; // Event handle    int listening; // Socket is a listening socket (TCP)    int closing; // Indicates whether the connection is closing     SOCKADDR_STORAGE addr; // Used for client's remote address    int addrlen; // Length of the address     BuffHeader buff;     DOUBLE_LIST entry;} SOCKET_OBJ; typedef struct _THREAD_OBJ{    SockObjHeader sockHeader;     HANDLE Event; // Used to signal new clients assigned                                       // to this thread    HANDLE Thread;     HANDLE Handles[ MAXIMUM_WAIT_OBJECTS] ; // Array of socket's event handles     CRITICAL_SECTION ThreadCritSec; // Protect access to SOCKET_OBJ lists     ThreadObj entry; // Next thread object in list} THREAD_OBJ; TheadObjHeader theadObjHeader; SOCKET_OBJ* GetSocketObj( SOCKET s, int listening) {    SOCKET_OBJ * sockobj = NULL ;     sockobj = ( SOCKET_OBJ* ) HeapAlloc( GetProcessHeap( ) , HEAP_ZERO_MEMORY, sizeof ( SOCKET_OBJ) ) ;    if ( sockobj = = NULL ) {        fprintf ( stderr , "HeapAlloc failed./n" ) ;        ExitProcess( - 1) ;    }     sockobj- > s = s;    sockobj- > listening = listening;    sockobj- > addrlen = sizeof ( sockobj- > addr) ;     sockobj- > event = WSACreateEvent( ) ;    if ( sockobj- > event = = NULL )    {        fprintf ( stderr , "GetSocketObj: WSACreateEvent failed: %d/n" , WSAGetLastError( ) ) ;        ExitProcess( - 1) ;    }     InitializeCriticalSection( & sockobj- > buff. SendRecvQueueCritSec) ;     return sockobj;} THREAD_OBJ * GetThreadObj( ) {    THREAD_OBJ * thread = NULL ;     thread = ( THREAD_OBJ* ) HeapAlloc( GetProcessHeap( ) , HEAP_ZERO_MEMORY, sizeof ( THREAD_OBJ) ) ;    if ( thread = = NULL ) {        fprintf ( stderr , "HeapAllco failed./n" ) ;        ExitProcess( - 1) ;    }     thread- > Event = WSACreateEvent( ) ;    if ( thread- > Event = = NULL ) {        fprintf ( stderr , "WSACreateEvent failed./n" ) ;        ExitProcess( - 1) ;    }     thread- > Handles[ 0] = thread- > Event;     InitializeCriticalSection( & thread- > ThreadCritSec) ;    InitializeDoubleHead( & thread- > sockHeader) ;     return thread;} int InsertSocketObj( THREAD_OBJ * thread, SOCKET_OBJ * sockobj) {    int ret;     EnterCriticalSection( & thread- > ThreadCritSec) ;     if ( thread- > sockHeader. count < MAXIMUM_WAIT_OBJECTS - 1) {        EnqueueDoubleListHead( & ( thread- > sockHeader) , & ( sockobj- > entry) ) ;         thread- > Handles[ thread- > sockHeader. count ] = sockobj- > event;         ret = NO_ERROR;    } else {        ret = SOCKET_ERROR;    }     LeaveCriticalSection( & thread- > ThreadCritSec) ;     return ret;} SOCKET_OBJ * FindSocketObj( THREAD_OBJ * thread, int index) {    SOCKET_OBJ * sockobj = NULL ;    int i;     EnterCriticalSection( & thread- > ThreadCritSec) ;     SockObj * sptr = ( SockObj * ) GotoNextDoubleList( & thread- > sockHeader, & ( thread- > sockHeader. head) ) ;    for ( i = 0; i < index; + + i) {        if ( sptr = = NULL )        {            fprintf ( stderr , "FindSocketobj failed./n" ) ;            ExitProcess( - 1) ;        }        sptr = ( SockObj * ) GotoNextDoubleList( & thread- > sockHeader, sptr) ;    }     sockobj = ( SOCKET_OBJ * ) container_of( SOCKET_OBJ, entry, sptr) ;     LeaveCriticalSection( & thread- > ThreadCritSec) ;     return sockobj;} void RemoveSocketObj( THREAD_OBJ * thread, SOCKET_OBJ * sock) {    EnterCriticalSection( & thread- > ThreadCritSec) ;     RemoveDoubleList( & thread- > sockHeader, & sock- > entry) ;    WSASetEvent( thread- > Event) ;     LeaveCriticalSection( & thread- > ThreadCritSec) ;} void FreeSocketObj( SOCKET_OBJ * obj) {    BuffObj * ptr = NULL ;    BUFFER_OBJ * blk = NULL ;     while ( true ) {        ptr = DequeueSingleList( & obj- > buff) ;        if ( ptr = = NULL )            break ;         blk = ( BUFFER_OBJ * ) container_of( BUFFER_OBJ, next, ptr) ;        FreeBufferObj( blk) ;    }     WSACloseEvent( obj- > event) ;     if ( obj- > s ! = INVALID_SOCKET) {        closesocket( obj- > s) ;    }     HeapFree( GetProcessHeap( ) , 0, obj) ;} void RenumberThreadArray( THREAD_OBJ * thread) {    EnterCriticalSection( & thread- > ThreadCritSec) ;     SOCKET_OBJ * obj = NULL ;    int i = 0;     SockObj * sptr = NULL ;    sptr = ( SockObj * ) GotoNextDoubleList( & thread- > sockHeader, & ( thread- > sockHeader. head) ) ;    while ( sptr) {        obj = ( SOCKET_OBJ * ) container_of( SOCKET_OBJ, entry, sptr) ;         thread- > Handles[ + + i] = obj- > event;         sptr = ( SockObj * ) GotoNextDoubleList( & thread- > sockHeader, sptr) ;    }     LeaveCriticalSection( & thread- > ThreadCritSec) ;} int ReceivePendingData( SOCKET_OBJ * sockobj) {    BUFFER_OBJ * buffobj= NULL ;    int rc,                ret;     // Get a buffer to receive the data    buffobj = GetBufferObj( gBufferSize) ;     ret = 0;     if ( gProtocol = = IPPROTO_TCP )    {        rc = recv ( sockobj- > s, buffobj- > buf, buffobj- > buflen, 0) ;    } else {        fprintf ( stderr , "Tcp failed./n" ) ;        ExitProcess( - 1) ;    }     if ( rc = = SOCKET_ERROR) {        fprintf ( stderr , "recv failed./n" ) ;        ExitProcess( - 1) ;    } else if ( rc = = 0) {        FreeBufferObj( buffobj) ;         sockobj- > closing = TRUE ;         if ( sockobj- > buff. head = = NULL )        {            // If no sends are pending, close the socket for good            closesocket( sockobj- > s) ;            sockobj- > s = INVALID_SOCKET;            ret = - 1;        }        else        {            ret = 0;        }    } else {        buffobj- > buflen = rc;        EnqueueSingleList( & sockobj- > buff, & buffobj- > next) ;         ret = 1;    }     return ret;} int SendPendingData( SOCKET_OBJ * sock) {    BUFFER_OBJ * bufobj = NULL ;    BuffObj * entry = NULL ;    int nleft = 0,                idx = 0,                ret = 0,                rc = 0;     while ( entry = DequeueSingleList( & sock- > buff) ) {        bufobj = ( BUFFER_OBJ * ) container_of( BUFFER_OBJ, next, entry) ;         if ( gProtocol = = IPPROTO_TCP ) {            nleft = bufobj- > buflen;            idx = 0;             while ( nleft > 0) {                rc = send ( sock- > s, & ( bufobj- > buf[ idx] ) , nleft, 0) ;                if ( rc = = SOCKET_ERROR) {                    ExitProcess( - 1) ;                } else {                    idx + = rc;                    nleft - = rc;                }            }             printf ( "send %d./n" , bufobj- > buflen) ;             FreeBufferObj( bufobj) ;        } else {            ExitProcess( - 1) ;        }    }     if ( ( sock- > buff. head = = NULL ) & & ( sock- > closing = = TRUE ) ) {        closesocket( sock- > s) ;        sock- > s = INVALID_SOCKET;        ret = - 1;         printf ( "Closing Connection./n" ) ;    }     return ret;} int HandleIo( THREAD_OBJ * thread, SOCKET_OBJ * sock) {    WSANETWORKEVENTS nevents;    int rc;     // Enumerate the events    rc = WSAEnumNetworkEvents( sock- > s, sock- > event, & nevents) ;    if ( rc = = SOCKET_ERROR)    {        fprintf ( stderr , "HandleIo: WSAEnumNetworkEvents failed: %d/n" , WSAGetLastError( ) ) ;        return SOCKET_ERROR;    }     if ( nevents. lNetworkEvents & FD_READ) {        if ( nevents. iErrorCode[ FD_READ_BIT] = = 0) {            rc = ReceivePendingData( sock) ;            if ( rc = = - 1)            {                RemoveSocketObj( thread, sock) ;                FreeSocketObj( sock) ;                return SOCKET_ERROR;            }            rc = SendPendingData( sock) ;            if ( rc = = - 1)            {                RemoveSocketObj( thread, sock) ;                FreeSocketObj( sock) ;                return SOCKET_ERROR;            }        } else {            fprintf ( stderr , "HandleIo: FD_READ error %d/n" , nevents. iErrorCode[ FD_READ_BIT] ) ;            RemoveSocketObj( thread, sock) ;            FreeSocketObj( sock) ;            return SOCKET_ERROR;        }    }    if ( nevents. lNetworkEvents & FD_WRITE) {        if ( nevents. iErrorCode[ FD_WRITE_BIT] = = 0)        {            rc = SendPendingData( sock) ;            if ( rc = = - 1)            {                RemoveSocketObj( thread, sock) ;                FreeSocketObj( sock) ;                return SOCKET_ERROR;            }        }        else        {            fprintf ( stderr , "HandleIo: FD_WRITE error %d/n" , nevents. iErrorCode[ FD_WRITE_BIT] ) ;            return SOCKET_ERROR;        }    }    if ( nevents. lNetworkEvents & FD_CLOSE) {        if ( nevents. iErrorCode[ FD_CLOSE_BIT] = = 0)        {            // Socket has been indicated as closing so make sure all the data            // has been read            printf ( "close./n" ) ;            while ( 1)            {                rc = ReceivePendingData( sock) ;                if ( rc = = - 1)                {                    RemoveSocketObj( thread, sock) ;                    FreeSocketObj( sock) ;                    return SOCKET_ERROR;                }                else if ( rc ! = 0)                {                    continue ;                }                else                {                    break ;                }            }            // See if there is any data pending, if so try to send it            rc = SendPendingData( sock) ;            if ( rc = = - 1)            {                RemoveSocketObj( thread, sock) ;                FreeSocketObj( sock) ;                return SOCKET_ERROR;            }        }        else        {            fprintf ( stderr , "HandleIo: FD_CLOSE error %d/n" , nevents. iErrorCode[ FD_CLOSE_BIT] ) ;            RemoveSocketObj( thread, sock) ;            FreeSocketObj( sock) ;            return SOCKET_ERROR;        }    }     return NO_ERROR;} DWORD WINAPI ChildThread( LPVOID lpParam) {    THREAD_OBJ * thread= NULL ;    SOCKET_OBJ * sptr= NULL ,               * sockobj= NULL ;    int index,                rc,                i;     thread = ( THREAD_OBJ * ) lpParam;     while ( true ) {        rc = WaitForMultipleObjects( thread- > sockHeader. count + 1, thread- > Handles, FALSE , INFINITE) ;        if ( rc = = WAIT_FAILED | | rc = = WAIT_TIMEOUT)        {            fprintf ( stderr , "ChildThread: WaitForMultipleObjects failed: %d/n" , GetLastError( ) ) ;            break ;        } else {            for ( i = 0; i < thread- > sockHeader. count + 1; i+ + ) {                rc = WaitForSingleObject( thread- > Handles[ i] , 0) ;                 if ( rc = = WAIT_FAILED)                {                    fprintf ( stderr , "ChildThread: WaitForSingleObject failed: %d/n" , GetLastError( ) ) ;                    ExitThread( - 1) ;                }                else if ( rc = = WAIT_TIMEOUT)                {                    // This event isn't signaled, continue to the next one                    continue ;                }                index = i;                 if ( index = = 0)                {                    // If index 0 is signaled then rebuild the array of event                    // handles to wait on                    WSAResetEvent( thread- > Handles[ index] ) ;                     RenumberThreadArray( thread) ;                     i = 1;                } else {                    sockobj = FindSocketObj( thread, index- 1) ;                    if ( sockobj ! = NULL )                    {                        if ( HandleIo( thread, sockobj) = = SOCKET_ERROR)                        {                            RenumberThreadArray( thread) ;                        }                    }                    else                    {                        printf ( "Unable to find socket object!/n" ) ;                    }                }            }        }    }} void AssignToFreeThread( SOCKET_OBJ * sock) {    ThreadObj * threadobj = NULL ;    THREAD_OBJ * thread = NULL ;     threadobj = ( ThreadObj * ) GotoNextSingleList( & theadObjHeader, theadObjHeader. head) ;    while ( threadobj) {        thread = ( THREAD_OBJ * ) container_of( THREAD_OBJ, entry, threadobj) ;         if ( InsertSocketObj( thread, sock) ! = SOCKET_ERROR) {            break ;        }        threadobj = ( ThreadObj * ) GotoNextSingleList( & theadObjHeader, threadobj) ;    }     if ( threadobj = = NULL ) {        thread = GetThreadObj( ) ;         thread- > Thread = CreateThread( NULL , 0, ChildThread, ( LPVOID) thread, 0, NULL ) ;        if ( thread- > Thread = = NULL )        {            fprintf ( stderr , "AssignToFreeThread: CreateThread failed: %d/n" , GetLastError( ) ) ;            ExitProcess( - 1) ;        }         InsertSocketObj( thread, sock) ;         EnqueueSingleList( & theadObjHeader, & thread- > entry) ;    }     WSASetEvent( thread- > Event) ;} int _tmain( int argc, _TCHAR* argv[ ] ){    WSADATA wsd;     struct addrinfo * res= NULL ,                    * ptr= NULL ;     THREAD_OBJ * thread= NULL ;    SOCKET_OBJ * sockobj= NULL ,                    * newsock= NULL ;     int index,                     rc;     if ( WSAStartup( MAKEWORD( 2, 2) , & wsd) ! = 0)    {        fprintf ( stderr , "unable to load Winsock!/n" ) ;        return - 1;    }     res = ResolveAddress( gSrvAddr, gPort, gAddressFamily, gSocketType, gProtocol) ;    if ( res = = NULL )    {        fprintf ( stderr , "ResolveAddress failed to return any addresses!/n" ) ;        return - 1;    }     thread = GetThreadObj( ) ;     InitializeCriticalSection( & theadObjHeader. SendRecvQueueCritSec) ;    theadObjHeader. head = theadObjHeader. tail = NULL ;     ptr = res;    while ( ptr) {        sockobj = GetSocketObj( INVALID_SOCKET, ( gProtocol = = IPPROTO_TCP ) ? TRUE : FALSE ) ;         sockobj- > s = socket ( ptr- > ai_family, ptr- > ai_socktype, ptr- > ai_protocol) ;        if ( sockobj- > s = = INVALID_SOCKET) {            fprintf ( stderr , "create socket failed./n" ) ;            ExitProcess( - 1) ;        }         InsertSocketObj( thread, sockobj) ;         rc = bind ( sockobj- > s, ptr- > ai_addr, ptr- > ai_addrlen) ;        if ( rc = = SOCKET_ERROR)        {            fprintf ( stderr , "bind failed: %d/n" , WSAGetLastError( ) ) ;            return - 1;        }         if ( gProtocol = = IPPROTO_TCP ) {            rc = listen ( sockobj- > s, 200) ;            if ( rc = = SOCKET_ERROR) {                fprintf ( stderr , "listen failed./n" ) ;                ExitProcess( - 1) ;            }             rc = WSAEventSelect( sockobj- > s, sockobj- > event, FD_ACCEPT | FD_CLOSE) ;            if ( rc = = SOCKET_ERROR) {                fprintf ( stderr , "WSAEventSelect failed: %d/n" , WSAGetLastError( ) ) ;                ExitProcess( - 1) ;            }        }         ptr = ptr- > ai_next;    }     freeaddrinfo ( res) ;     while ( true ) {        rc = WaitForMultipleObjects( thread- > sockHeader. count + 1, thread- > Handles, FALSE , 5000) ;        if ( rc = = WAIT_FAILED) {            fprintf ( stderr , "WaitForMultipleObjects failed:%d/n" , WSAGetLastError( ) ) ;            break ;        } else if ( rc = = WAIT_TIMEOUT) {            continue ;        } else {            index = rc - WAIT_OBJECT_0;             sockobj = FindSocketObj( thread, index - 1) ;             if ( gProtocol = = IPPROTO_TCP ) {                SOCKADDR_STORAGE sa;                WSANETWORKEVENTS ne;                SOCKET sc;                int salen;                 rc = WSAEnumNetworkEvents( sockobj- > s, thread- > Handles[ index] , & ne) ;                if ( rc = = SOCKET_ERROR) {                    fprintf ( stderr , "WSAEnumNetworkEvents failed./n" ) ;                    break ;                }                 while ( true ) {                    sc = INVALID_SOCKET;                    salen = sizeof ( sa) ;                     sc = accept ( sockobj- > s, ( SOCKADDR * ) & sa, & salen) ;                    if ( ( sc = = INVALID_SOCKET) & & ( WSAGetLastError( ) ! = WSAEWOULDBLOCK) ) {                        fprintf ( stderr , "accept failed./n" ) ;                        break ;                    } else if ( sc = = INVALID_SOCKET) {                        continue ;                    } else {                        newsock = GetSocketObj( INVALID_SOCKET, FALSE ) ;                         memcpy ( & newsock- > addr, & sa, salen) ;                        newsock- > addrlen = salen;                        newsock- > s = sc;                         rc = WSAEventSelect( newsock- > s, newsock- > event, FD_READ | FD_WRITE | FD_CLOSE) ;                        if ( rc = = SOCKET_ERROR)                        {                            fprintf ( stderr , "WSAEventSelect failed: %d/n" , WSAGetLastError( ) ) ;                            break ;                        }                         AssignToFreeThread( newsock) ;                    }                }            }        }    }     WSACleanup( ) ;     return 0;}

     

    版权声明: 原创作品,允许转载,转载时请务必以超链接形式标明文章 原始出处 、作者信息和本声明。否则将追究法律责任。

    最新回复(0)