在c#多线程使用IOCP(完成端口)的简单示例

    技术2022-05-20  50

    在c#使用IOCP(完成端口)的简单示例 上次给大家发了利用winsock原生的api来做一个同步的socket服务器的例子,大致上只是贴了一些代码,相信大家这么冰雪聪明,已经研究的差不多了。因为winsock的api使用在msdn或者google上都能很方便的查到,所以我没太多罗嗦代码的原理。但是c#进行平台调用方面是有一些经验的,单靠google和msdn及社区的力量有时候不容易得到答案。这次给大家演示一下利用IOCP的在线程间传递数据的例子,顺便打算讲一些细节和注意的地方。

    概述:这里主要使用IOCP的三个 API,CreateIoCompletionPort,PostQueuedCompletionStatus,GetQueuedCompletionStatus,第一个是用来创建一个完成端口对象,第二个是向一个端口发送数据,第三个是接受数据,基本上用着三个函数,就可以写一个使用IOCP的简单示例。

    其中完成端口一个内核对象,所以创建的时候会耗费性能,CPU得切换到内核模式,而且一旦创建了内核对象,我们都要记着要不用的时候显式的释放它的句柄,释放非托管资源的最佳实践肯定是使用 Dispose模式,这个博客园有人讲过N次了。而一般要获取一个内核对象的引用,最好用SafeHandle来引用它,这个类可以帮你管理引用计数,而且用它引用内核对象,代码更健壮,如果用指针引用内核对象,在创建成功内核对象并复制给指针这个时间段,如果抛了 ThreadAbortException,这个内核对象就泄漏了,而用SafeHandle去应用内核对象就不会在赋值的时候发生 ThreadAbortException。另外SafeHandle类继承自CriticalFinalizerObject类,并实现了 IDispose接口,CLR对CriticalFinalizerObject及其子类有特殊照顾,比如说在编译的时候优先编译,在调用非 CriticalFinalizerObject类的Finalize方法后再调用CriticalFinalizerObject类的Finalize 类的Finalize方法等。在win32里,一般一个句柄是-1或者0的时候表示这个句柄是无效的,所以.net有一个SafeHandle的派生类 SafeHandleZeroOrMinusOneIsInvalid ,但是这个类是一个抽象类,你要引用自己使用的内核对象或者非托管对象,要从这个类派生一个类并重写Relseas方法。另外在.net框架里它有两个实现几乎一模一样的子类,一个是SafeFileHandle一个是SafeWaitHandle,前者表示文件句柄,后者表示等待句柄,我们这里为了方便就直接用SafeFileHandle来引用完成端口对象了。

    CreateIoCompletionPort函数的原型如下 [DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)] public static extern SafeFileHandle CreateIoCompletionPort(IntPtr FileHandle, IntPtr ExistingCompletionPort, IntPtr CompletionKey, uint NumberOfConcurrentThreads);

    FileHandle参数表示要绑定在完成端口上的句柄,比如说一个已经accept的socket句柄。

    ExistingCompletionPort 参数表示一个已有的完成端口句柄,第一次创建完成端口的时候显然随便传个值就行,所以这个参数直接定义成IntPtr类型了。当你创建了工作线程来为 I/O请求服务的时候,才会把句柄和完成端口关联在一起,而之前第一次创建完成端口的时候这个参数传一个zero指针就O了,而FileHandle参数传一个-1的指针就行了。

    CompletionKey是完成键的意思,它可以是任意想传递给工作线程的数据,学名叫做单句柄数据,就是说跟随FileHandle参数走的一些状态数据,一般在socket的iocp 程序里是把socket传进去,以便在工作线程里拿到这个socket句柄,在收到异步操作完成的通知及处理后继续进行下一个异步操作的投递,如发送和接受数据等。

    NumberOfConcurrentThreads 参数表示在一个完成端口上同时允许执行的最大线程数量。如果传0,就是说你有几个CPU,就是允许最大有几个线程,这也是最理想情况,因为一个CPU一个线程可以防止线程上下文切换。关于这个值要和创建工作线程的数量的关系,大家要理解清楚,不一定CPU有多少个,你的工作线程就创建多少个。因为你的工作线程有时候会阻塞或者等待,而如果你正好创建了CPU个数个工作线程,有一个等待的话,因为你分配了同时最多有CPU个数多个最大IOCP线程,这时候就不能效率最大化了。所以一般工作线程创建的要比CPU个数多一些,除非你保证你的工作线程不会阻塞。

    PostQueuedCompletionStatus函数原型如下  [DllImport("Kernel32", CharSet = CharSet.Auto)]     private static extern bool PostQueuedCompletionStatus(SafeFileHandle CompletionPort, uint dwNumberOfBytesTransferred, IntPtr dwCompletionKey, IntPtr lpOverlapped); 该方法用于给完成端口投递自定义信息,一般情况下如果把某个句柄和完成端口绑定后,当有数据收发操作完成时会自动同时工作线程,工作线程里的 GetQueuedCompletionStatus就不会阻塞,而继续往下走,来进行接收到IO操作完成通知的流程。而有时候我们需要手工向工作者线程投递一些消息,比如说我们主线程知道所有的socket句柄都关闭了,工作线程可以退出了,我们就可以给工作线程发一个自定义数据,工作线程收到后判断是否是退出指令,然后退出。

    CompletionPort参数表示向哪个完成端口对象投递信息,在这个完成端口上等待消息的工作线程就会收到消息了。 dwNumberOfBytesTransferred表示你投递的数据有多大,我们一般投递的是一个对象的指针,在32位系统里,int指针就是4个字节了,直接写4就O了,要不就用sizeof你传的数据,如sizeof(IntPtr)。

    dwCompletionKey同CreateIoCompletionPort的解释,是单句柄数据,本示例用不到,不细说,直接用IntPtr.Zero填充了事。

    lpOverlapped参数,本意是一个win32的overlapped结构的指针,本示例中不用,所以不详细讲。它叫单IO数据,是相对单据并拘束CompletionKey来讲的,前者是一个句柄的每次IO操作的上下文,比如单词IO操作的数据、操作类型等,后者是整个句柄的上下文。但这里我们表示你要投递的数据,可以是任何类型的数据(谁让它是个指针呢,所以传啥都行),值得注意的一点就是,这个数据传递到工作线程的时候,中间这个数据走的是非托管代码。所以不能直接传一个引用进去,这里要使用到GCHandle类。先大致介绍一下这个类吧。它有个静态方法Alloc来给把一个对象在GC句柄表里注册,GC句柄表示CLR为没个应用程序域提供的一个表,它允许你来监视和管理对象的生命周期,你可以往里加一个对象的引用,也可以从里面移除一个对象,往里加对象的时候,还可以指定一个标记来表示我们希望如何监视和控制这个对象。而加入一个条目的操作就是GCHandle的Alloc对象,它有两个参数,第一个参数是对象,第二参数是 GCHandleType类型的枚举,第二个参数表示我们如何来监视和控制这个对象的生命周期。当这个参数是GCHandleType.Normal时,表示我们告诉垃圾收集器,及时托管代码里没有该对象的根,也不要回收该对象,但垃圾收集器可以移动它,一般我们向非托管代码传递一个对象,而又从非托管代码传递回来的时候用这个类型非常好,它不会让垃圾收集器在非托管代码返回托管代码的时候回收掉该对象,还不怎么影响GC的性能,因为GC还可以移动它。 dwCompletionKey就是我们在托管-非托管-托管之间传递的一个很典型的场景。所以这里用它,另外还有 GCHandleType.Pinned,它和GCHandleType.Normal不同的一点就是GC除了在没有根的时候不能回收这个对象外,还不能移动它,应用场景是给非托管代码传递一个byte[]的buffer,让托管代码去填充,如果用GCHandleType.Normal有可能在非托管代码返回托管代码的时候写错内存位置,因为有可能GC移动了这个对象的内存地址。关于根、GC原理,大家可以参考相关资料。另外在你的数据从非托管代码传递会托管代码后,要调用GCHandle的实例方法free来在GC句柄表里移除该对象,这时候你的托管代码还有个该对象的引用,也就是根,GC也不会给你回收的,当你用完了后,GC就给你回收了。GCHandle的Target属性用来访问GCHandle指向的对象。其它两个GCHandleType的成员是关于弱引用的,和本文关系不大,就不介绍了。

    GetQueuedCompletionStatus原型如下 [DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]   public static extern bool GetQueuedCompletionStatus(SafeFileHandle CompletionPort,       out uint lpNumberOfBytesTransferred, out IntPtr lpCompletionKey,       out IntPtr lpOverlapped, uint dwMilliseconds); 前几个参数和PostQueuedCompletionStatus差不多, CompletionPort表示在哪个完成端口上等待PostQueuedCompletionStatus发来的消息,或者IO操作完成的通知,

    lpNumberOfBytesTransferred 表示收到数据的大小,这个大小不是说CompletionKey的大小,而是在单次I/O操作完成后(WSASend或者WSAReceve),实际传输的字节数,我在这里理解的不是很透彻,我觉得如果是接受PostQueuedCompletionStatus的消息的话,应该是收到 lpOverlapped的大小,因为它才是单IO数据嘛。

    lpCompletionKey用来接收单据并数据,我们没传递啥,后来也没用,在socket程序里,一般接socket句柄。

    lpOverlapped用来接收单IO数据,或者我们的自定义消息。

    dwMilliseconds表示等待一个自定义消息或者IO完成通知消息在完成端口上出现的时间,传递INIFINITE(0xffffffff)表示无限等待下去。

    好了,API大概介绍这么多,下面介绍代码 1、主线程创建一个完成端口对象,不和任何句柄绑定,前几个参数都写0,NumberOfConcurrentThreads参数我们写1,因为我们的示例就一个工作线程。 2、创建一个工作线程,把第一步创建的完成端口传进去 3、创建两个单IO数据,分别发投递给第一步创建的完成端口 4、在工作线程里执行一个死循环,循环在传递进来的完成端口上等待消息,没有消息的时候GetQueuedCompletionStatus处于休息状态,有消息来的时候把指针转换成对象,然后输出 5、如果收到退出指令,就退出循环,从而结束工作者线程。 下面是完整代码,需要打开不安全代码的编译选项。

    using System; using System.Runtime.InteropServices; using System.Threading; using Microsoft.Win32.SafeHandles;

    [StructLayout(LayoutKind.Sequential)] class PER_IO_DATA {     public string Data; }

    public class IOCPApiTest {     [DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]     public static extern SafeFileHandle CreateIoCompletionPort(IntPtr FileHandle, IntPtr ExistingCompletionPort, IntPtr CompletionKey, uint NumberOfConcurrentThreads);     [DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]     public static extern bool GetQueuedCompletionStatus(SafeFileHandle CompletionPort,         out uint lpNumberOfBytesTransferred, out IntPtr lpCompletionKey,         out IntPtr lpOverlapped, uint dwMilliseconds);     [DllImport("Kernel32", CharSet = CharSet.Auto)]     private static extern bool PostQueuedCompletionStatus(SafeFileHandle CompletionPort, uint dwNumberOfBytesTransferred, IntPtr dwCompletionKey, IntPtr lpOverlapped);

        public static unsafe void TestIOCPApi()     {         var CompletionPort = CreateIoCompletionPort(new IntPtr(-1), IntPtr.Zero, IntPtr.Zero, 1);         if(CompletionPort.IsInvalid)         {             Console.WriteLine("CreateIoCompletionPort 出错:{0}",Marshal.GetLastWin32Error());         }         var thread = new Thread(ThreadProc);         thread.Start(CompletionPort);

            var PerIOData = new PER_IO_DATA() ;         var gch = GCHandle.Alloc(PerIOData);         PerIOData.Data = "hi,我是蛙蛙王子,你是谁?";         Console.WriteLine("{0}-主线程发送数据",Thread.CurrentThread.GetHashCode());         PostQueuedCompletionStatus(CompletionPort, (uint)sizeof(IntPtr), IntPtr.Zero, (IntPtr)gch);

            var PerIOData2 = new PER_IO_DATA();         var gch2 = GCHandle.Alloc(PerIOData2);         PerIOData2.Data = "关闭工作线程吧";         Console.WriteLine("{0}-主线程发送数据", Thread.CurrentThread.GetHashCode());         PostQueuedCompletionStatus(CompletionPort, 4, IntPtr.Zero, (IntPtr)gch2);         Console.WriteLine("主线程执行完毕");         Console.ReadKey();     }     static void ThreadProc(object CompletionPortID)     {         var CompletionPort = (SafeFileHandle)CompletionPortID;

            while (true)         {             uint BytesTransferred;             IntPtr PerHandleData;             IntPtr lpOverlapped;             Console.WriteLine("{0}-工作线程准备接受数据",Thread.CurrentThread.GetHashCode());             GetQueuedCompletionStatus(CompletionPort, out BytesTransferred,                                       out PerHandleData, out lpOverlapped, 0xffffffff);             if(BytesTransferred <= 0)                 continue;             GCHandle gch = GCHandle.FromIntPtr(lpOverlapped);             var per_HANDLE_DATA = (PER_IO_DATA)gch.Target;             Console.WriteLine("{0}-工作线程收到数据:{1}", Thread.CurrentThread.GetHashCode(), per_HANDLE_DATA.Data);             gch.Free();             if (per_HANDLE_DATA.Data != "关闭工作线程吧") continue;             Console.WriteLine("收到退出指令,正在退出");             CompletionPort.Dispose();             break;         }     }

        public static int Main(String[] args)     {         TestIOCPApi();         return 0;     } } posted on 2008-07-12 19:46 蛙蛙池塘 阅读(2678) 评论(19)  编辑 收藏 网摘

    评论: #1楼 [楼主]  蛙蛙池塘       Posted @ 2008-07-12 19:53 好想把文字的灰底儿去了,可是不会去,我复制上来就成这了,汗。   回复  引用  查看   

    #2楼 [楼主]  蛙蛙池塘       Posted @ 2008-07-12 19:55 汗,发现一个历史上的今天的帖子 蛙蛙推荐:偶心目中的编程高手,大家也推荐一下 http://www.cnblogs.com/onlytiancai/archive/2005/07/12/191287.html   回复  引用  查看   

    #3楼   henry       Posted @ 2008-07-12 20:30 c#的socket异步不是已经具备IOCP功能了吗?   回复  引用  查看   

    #4楼 [楼主]  蛙蛙池塘       Posted @ 2008-07-12 21:02 @henry 异步socket是封装了iocp,呵呵。.net 2.0的有些问题,3.0的异步提高了不少据说,不过本文主要是介绍底层原理及让有兴趣的朋友自己重新实现IOCP。 另外看你的博客对emit和反射研很了解。   回复  引用  查看   

    #5楼   airwolf2026       Posted @ 2008-07-12 22:21 c#的异步socket真的封装了IOCP了?俺对c#网络编程了解比较少.那是不是说 c#里面的一些异步操作.如果系统有提供IOCP功能的话,是不是也封装了?感觉自己问的这个问题好傻?   回复  引用  查看   

    #6楼 [楼主]  蛙蛙池塘       Posted @ 2008-07-12 22:26 @airwolf2026 socket的异步操作是用的IOCP实现的,一般来说IO密集型的异步操作.NET都是用IOCP实现的,计算密集型的异步操作都是用ThreadPool实现的,当然只是一般来说。   回复  引用  查看   

    #7楼   airwolf2026       Posted @ 2008-07-12 22:28 呵呵.多谢楼主.   回复  引用  查看   

    #8楼 [楼主]  蛙蛙池塘       Posted @ 2008-07-12 22:43 @airwolf2026 :)   回复  引用  查看   

    #9楼   黎叔 [未注册用户] Posted @ 2008-07-13 00:10 var只能让代码更难读懂。

    有必要全篇使用么   回复  引用   

    #10楼   黎叔 [未注册用户] Posted @ 2008-07-13 00:14 不过感谢楼主的好文章   回复  引用   

    #11楼   Anders Liu       Posted @ 2008-07-13 02:24 历史上的今天: 2006-07-12 数据库范式俗话 http://www.cnblogs.com/AndersLiu/archive/2006/07/12/448595.html   回复  引用  查看   

    #12楼 [楼主]  蛙蛙池塘       Posted @ 2008-07-13 23:11 哥们折腾一天IOCP也没写出个例子来,汗了。 Win32的WSAReceive函数,如下 WSARecv([In] SafeSocketHandle socketHandle, [In, Out] ref WSABuffer buffer, [In] int bufferCount, out int bytesTransferred, [In, Out] ref SocketFlags socketFlags, [In] IntPtr overlapped, [In] IntPtr completionRoutine);

    倒数第二个参数只能传递overlapped对象指针,我想传递一个自定义对象,不知道咋办。 目前只能用以下方式调用 GCHandle gcHandle = GCHandle.Alloc(PerIoData.Overlapped, GCHandleType.Pinned); WSARecv(Accept, ref PerIoData.DataBuf ,1, out RecvBytes, ref Flags, gcHandle.AddrOfPinnedObject(), IntPtr.Zero); 可是,我想把整个PerIoData传进去,而不是光一个overlapped对象,PerIoData类型如下。 [StructLayout(LayoutKind.Sequential)] class PER_IO_OPERATION_DATA { public unsafe NativeOverlapped Overlapped; public WSABuffer DataBuf; public readonly byte[] Buffer = new byte[DATA_BUFSIZE]; public uint BytesSEND; public uint BytesRECV; }

    在c++里很简单,只要传递PerIoData对象的首地址,其实也就是类型的第一个成员Overlapped的地址进去就行了,然后就可以在工作线程里用GetQueuedCompletionStatus取到了,代码大约如下 执行异步IO操作 WSARecv(Accept, &(PerIoData->DataBuf), 1, &RecvBytes, &Flags,&(PerIoData->Overlapped), NULL); 接受异步IO完成通知 GetQueuedCompletionStatus(CompletionPort, &BytesTransferred,(LPDWORD)&PerHandleData, (LPOVERLAPPED *) &PerIoData, INFINITE) 灰常的简单,直接就取出了PerHandleData数据。

    c#就不知道咋搞了,我用Marshal.StructureToPtr,Marshal.AllocHGlobal,Marshal.Copy折腾半天也没弄成。而直接用以下代码 GCHandle gcHandle2 = GCHandle.Alloc(PerIoData); PerIoData.Overlapped = new NativeOverlapped(); WSARecv(Accept, ref PerIoData.DataBuf ,1, out RecvBytes, ref Flags, GCHandle.ToIntPtr(gcHandle2), IntPtr.Zero) 会说无效句柄的。 我用WSARecv(Accept, ref PerIoData.DataBuf ,1, out RecvBytes, ref Flags, new IntPtr(GCHandle.ToIntPtr(gcHandle2).ToInt32()+8), IntPtr.Zero)倒是可以,然后再工作线程里用以下语句接受自定义对象 gcHandle_per_io_data = GCHandle.FromIntPtr(new IntPtr(intptr_per_io_data.ToInt32()-8)); PER_IO_OPERATION_DATA PerIoData = (PER_IO_OPERATION_DATA)gcHandle_per_io_data.Target; 确实是可以,但是当第二个请求进来之后,GCHandle.ToIntPtr(gcHandle2).ToInt32()+8就不是overlapped的地址了,我也不知道为啥,可能这只是一个巧合。

    要怎么才能在WSARecv里向工作线程传递一个自定义信息呢,我除了传overlapped外,怎么也得把异步操作类型、缓存等传给工作线程吧。   回复  引用  查看   

    #13楼 [楼主]  蛙蛙池塘       Posted @ 2008-07-13 23:12 除了这个问题,别的基本都解决了。   回复  引用  查看   

    #14楼   曲滨*銘龘鶽       Posted @ 2008-07-14 00:14 HANDLE WINAPI CreateIoCompletionPort( __in HANDLE FileHandle, __in HANDLE ExistingCompletionPort, __in ULONG_PTR CompletionKey, __in DWORD NumberOfConcurrentThreads ); ********************************* __in ULONG_PTR CompletionKey, 这个参数本来是一个 4 字节的整数,也就是说可以放入一个win32 指针; ********************************

    C++ 代码中是在第2处 CreateIoCompletionPort 调用处 把 PerHandleData 传递进去的的并不是 WSARecv

    if (CreateIoCompletionPort((HANDLE) Accept, CompletionPort, (DWORD) PerHandleData, 0) == NULL) { printf("CreateIoCompletionPort failed with error %d/n", GetLastError()); return; }

    接收

    if (GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (LPDWORD)&PerHandleData, (LPOVERLAPPED *) &PerIoData, INFINITE) == 0) { printf("GetQueuedCompletionStatus failed with error %d/n", GetLastError()); return 0; }

    WSARecv 本身参数是有类型大小限定的 乱传递也没用的;   回复  引用  查看   

    #15楼 [楼主]  蛙蛙池塘       Posted @ 2008-07-14 09:37 @曲滨*銘龘鶽 这可咋办,这可咋整,这可咋好,闪灵给搭救一下残喘吧。难道在c#里知道一个类的成员的内存地址,不能知道它所在类的内存地址吗?   回复  引用  查看   

    #16楼   曲滨*銘龘鶽       Posted @ 2008-07-14 10:23 --引用-------------------------------------------------- 蛙蛙池塘:

    @曲滨*銘龘鶽这可咋办,这可咋整,这可咋好,闪灵给搭救一下残喘吧。难道在c#里知道一个类的成员的内存地址,不能知道它所在类的内存地址吗? --------------------------------------------------------

    .net 的 对象好像和 C++ 不同不是一片连续的内存地址所以和C++ API 交互只能用人家能识别的东西值类型、结构体等;

    类不加 [StructLayout(LayoutKind.Sequential)] 标签的都不行好像;   回复  引用  查看   

    #17楼   T.t.T!Ck.¢#       Posted @ 2008-07-14 10:35 笨蛙蛙 o(∩_∩)o   回复  引用  查看   

    #18楼   overred       Posted @ 2009-01-05 16:35 ... .net 里的异步IO跟非阻塞 IOCP还是有区别的吧

    比如在县城切换以及阻塞上 我感觉异步IO就是系统做完然后通知你(不能精确) 非阻塞的IOCP则是自己不停的看看我是可以做某个IO,只要是一个线程忙,在cpu上就不开其他的,如果阻塞则开启新线程 while (true) {GetQueuedCompletionStatus。。。}

    。。。

    iocp在 vc里用得 多 。。。 蛙蛙今天没来上班 扣你工资。。。。   回复  引用  查看   

    #19楼 [楼主]  蛙蛙池塘       Posted @ 2009-01-05 20:30 @overred 我明天不去公司,后天也不去,大后天也不去,大大后天也不去。。。。   回复  引用  查看 

    本文来自博客,转载请标明出处:http://blog.csdn.net/zfrong/archive/2009/03/13/3987886.aspx


    最新回复(0)