.NET Compact Framework 中的点对点消息队列

    技术2022-05-11  13

    .NET Compact Framework 中的点对点消息队列

     

    发布日期: 12/26/2005 | 更新日期: 12/26/2005

    Daniel MothTrend Control Systems Ltd.

    适用于:.NET Compact FrameworkVisual Studio .NET 2003Windows CE .NET 版本 4.2Windows CE 版本 5.0Windows Mobile 2003

    摘要:本文说明如何针对 Windows CE .NET 及其更高版本的消息队列点对点函数编写和使用托管包装。

    请从 Microsoft 下载中心下载 Point to Point Message Queues with the .NET Compact Framework.msi。

    本页内容
    简介本机 API 概述 设计托管接口P2PMessageQueue使用 P2PMessageQueue 小结关于作者

    简介

    您对 Windows CE 设备上的进程间通讯 (IPC) 限制感到失望吗?在其他 Windows 平台上,IPC 通过命名管道(本机)或远程处理(托管)的方式实现;这些选项都不能用于 Windows CE。本文说明点对点消息队列 — 一个鲜为人知的 IPC 机制,它高效、灵活,并且专用于 Windows CE 4.0 及其更高版本。此外,本文还说明如何设计和使用托管包装,使 .NET Compact Framework 应用程序中的 IPC 极其简便。

    开发人员能够以多种方式使用点对点消息队列。开发人员通常在无法宿主 .NET Compact Framework 运行库的情况下使用 IPC 作为替代方案(本机进程通过 IPC 与托管进程进行通讯),并且点对点消息队列是最佳选择。点对点消息队列还可以与操作系统进行交互,例如,用于获取电源信息。本文不介绍点对点消息队列的其他用途,这些用途留待您了解核心原理之后自行探究。

    返回页首

    本机 API 概述

    本部分以一个您易于了解的方式描述点对点消息队列函数。要从 MSDN 上查看该应用程序编程接口 (API) 中的 6 个函数和 2 个结构,请参阅 Message Queue Point-to-Point Reference。

    当开发人员使用 IPC 时,一个进程创建用于写入的队列,另一个进程创建用于读取的队列 (CreateMsgQueue)。在双向 IPC 通讯中,开发人员在通讯的每一端(在两个独立的进程间或同一进程的两个对象间)都需要两个队列。

    在使用点对点消息队列时,开发人员可以采用若干种方法。例如,有用于读取 (ReadMsgQueue)、写入 (WriteMsgQueue) 和关闭队列 (CloseMsgQueue) 的方法。此外,如果除了队列句柄之外,开发人员还具有拥有该队列的源进程的句柄,那么他们就可以打开现有队列 (OpenMsgQueue)。开发人员可以查询一些统计信息 (GetMsgQueueInfo),最后可以使他们的应用程序等待队列句柄 (WaitForSingleObject) 接收信号,以便确定队列中是否有数据。

    打开一个现有队列时,开发人员只能指定一个选项 — 该队列是用于读还是用于写。当创建队列时,开发人员可以指定其他参数:名称 (lpszName) 和选项(类型为 MSGQUEUOPTIONS 的 lpOptions)。队列选项包括以下内容:单个消息的最大大小 (cbMaxMessage)、队列中消息的随机最大数量 (dwMaxMessages)、是否应该动态分配缓冲区(dwFlags、MSGQUEUE_NOPRECOMMIT),以及阅读器是否可以在没有编写器的情况下存在(dwFlags、MSGQUEUE_ALLOW_BROKEN),反之亦然。关闭队列时,开发人员将句柄作为参数 (hMsgQ)。

    写入队列需要一个指针 (lpBuffer)、消息中的字节数 (cbDataSize)、超时 (dwTimeout),以及该消息是否是一个警告消息(dwFlags、MSGQUEUE_MSGALERT)。从队列读取需要:传入一个缓冲区 (lpBuffer)、缓冲区的大小 (cbBufferSize),以及一个超时值 (dwTimeout);因此,可获悉实际字节数 (lpNumberOfBytesRead) 以及该消息是否为一个警告(pdwFlags、MSGQUEUE_MSGALERT)。注意,超时是以毫秒计算的;0 表示“不阻塞”,而 INFINITE(-1) 表示“阻塞,直到操作完成或队列状态更改”。如果成功,读函数和写函数都返回 TRUE;否则,返回 FALSE。在后一种情况中,您可以获得扩展的错误信息 (GetLastError)。函数返回 FALSE 的可能原因有:缓冲区太小 (ERROR_INSUFFICIENT_BUFFER);没有编写器或阅读器,且开发人员未按前面段落中所述的那样指定 MSGQUEUE_ALLOW_BROKEN (ERROR_PIPE_NOT_CONNECTED);或者发生超时 (ERROR_TIMEOUT)。对于 WriteMsgQueue,如果未按前面段落中所述的那样指定 MSGQUEUE_NOPRECOMMIT,则也可能得到错误 ERROR_OUTOFMEMORY。

    可调用 GetMsgQueueInfo 来获得包含统计信息的结构 (MSGQUEUEINFO)。该结构包含的信息包括开发人员在创建队列时传入的一些参数:消息的最大大小 (cbMaxMessage)、消息的最大数量 (dwMaxMessages)、是否应该动态分配缓冲区(dwFlags、MSGQUEUE_NOPRECOMMIT),以及阅读器是否可以在没有编写器的情况下存在(dwFlags、MSGQUEUE_ALLOW_BROKEN),反之亦然。此外,该结构还包含以下内容:当前队列中非警告信息的数量 (dwCurrentMessages)、队列中曾经存在消息的最大数量 (dwMaxQueueMessages)、当前阅读器的数量 (wNumReaders),以及当前编写器的数量 (wNumWriters)。

    返回页首

    设计托管接口

    在为任何本机 API 集设计托管接口之前,首先需要查看它是否已经实现了。对于点对点消息队列,没有现成的库且搜索结果只提供有关 API 描述的少量信息。由于之前很少涉足该领域,因此描述针对点对点消息 API 的托管包装的设计和实现看起来是值得的。

    当开发人员将一组相关 Win32 API 包装到一个或多个托管类中时,会使用一个通用模式;大多数 Windows API 都操作一个句柄(并将其视为它们的第一个参数)。从面向对象的角度看,可将该句柄看作对象标识。您可以将与句柄相关的所有方法分组到公开相同方法的类中。除不需要该句柄之外,这些方法拥有与原始方法相同的签名。该句柄在对象创建时获取,在对象处置/析构时关闭。在 .NET Compact Framework 中,句柄由 IntPtr 类型表示。因此,根据前面的信息,您可以按以下方式创建包装特定本机 API 的类。

    + constructor(String name, MsgQueueOptions opt) + ReadMsgQueue(byte[] buf, Int32 bufSize, Int32 numRead, Int32 timeout, Int32 flags) + WriteMsgQueue(byte[] buf, Int32 bufSize, Int32 timeout, Int32 flags) + GetMsgQueueInfo(MsgQueueInfo info) + Close()

    上述代码示例中的五个方法可以形成主接口,但是还要用托管代码定义 MsgQueueOptions 和任何其他结构(如同在平台调用中使用的非托管结构所做的那样)。该接口是一个不错的开端并为您提供了一个包装,但它并不是完全面向对象的并且不适用于 .NET Compact Framework 的其他部分,而且它给客户端带来了较大的额外负担(就要编写的代码而言)。该接口仍然可以改进。

    无论在何处引入方法的重载都应该这样做,以便简化客户端代码必须处理的方法。例如,如果需要创建一个无名队列,则客户端不一定要传入 NULL — 进一步说,name 参数不应该在构造函数中。如果需要以阻塞方式读取或写入对列,而不是将 INFINITE (-1) 作为 timeout 参数进行传递,则不一定要传递任何内容。应用这些更改将增加类中的方法数量,而且将使它更易于在较简单的方案中使用,同时不会限制更复杂的情况。

    该接口可以进一步改进。注意,需要传递字节数组以及该数祖的大小的方式;传递数组大小不是必要的,因为可在任何时间确定给定数组的大小。除非在签名中显式需要数组长度(例如,出于性能原因的考虑,因为缓存该大小比每次重新计算它要快),否则可以删除 bufSize 参数。实际上,可将字节数组参数 (buf) 与 flags 参数(它指明该信息是否为一个警告信息)一起封装到它自己的类型/类之中。

    另一个使 API 更加面向对象的方法是消除结构 — 这很有意义。例如,您不必设置结构并将其传递给构造函数,而是可以将结构元素作为参数与适当的重载内联(换言之,可以使结构成员变为一系列参数)。与返回带有队列信息的结构相比,更好的解决方案是将结构字段内联到类本身上的只读属性。

    您应该能回想起本文第一部分中的两个事实:与其他许多方法一样,点对点消息队列 API 方法返回 BOOL 来指示成功或失败,而且扩展的错误信息需要一个单独的调用。可以使用两个非独占方法来映射该行为(即,返回布尔值以及需要单独检索扩展的错误信息)。第一个方法是使应用程序在发生错误时引发一个异常,并使该异常带有扩展的错误信息。第二个方法是使应用程序返回包含该方法调用的可能结果(包括成功)的枚举。作为一个通用原则,开发的应用程序应该仅在状况无法恢复时才引发异常。

    在该阶段,在 .NET Framework 中查找相似类是很有用的,并且您肯定能在 System.Messaging(在 .NET Compact Framework 版本 2.0 中也可用)中找到 MSMQ 类。您可以采用该类的成员所使用的相同命名约定(例如,将 Read 更改为 Receive,并将 Write 更改为 Send)。注意,MSMQ 类如何提供一个用于清空队列中消息的 Purge 方法。您可以通过该方法增强自己的类;换言之,虽然本机 API 不提供方法,但这并不意味着您无法通过添加一个方法来向包装添加值。

    由于 OpenMsgQueue 方法返回一个句柄,并且您已将该句柄映射到一个类,因此包装方法返回包装类的实例是很有意义的。此外,该方法在执行时不需要现有状态,因此您应该使它成为静态的。最后,需要将该结构转化为所需的单个参数:它是只读队列还是只写队列。

    请注意,这里不描述平台 invoke 声明;您可以在附加代码中找到它们。下一部分展示前面描述的所有改进(还有很多地方有待改进)的结果。它还将描述向该类添加的最后一个内容:DataOnQueueChanged 事件!

    返回页首

    P2PMessageQueue

    确定 P2PMessageQueue 的公共接口的最佳方式是使用类。IntelliSense 将显示每个方法和每个参数的描述性帮助信息。由于本文的下载中包含了完整代码,因此您可以看到源代码。您现在就可以下载并熟悉该源代码。以下几个段落将概述 P2PMessageQueue 接口。

    图 1 显示该类的统一建模语言 (UML) 表示形式(以及它的两个依赖项:Message 类和 ReadWriteResult 枚举)。

    图 1. 显示以下三种类型的类关系图:P2PmessageQueue、Message 和 ReadWriteResult。

    如果您偏爱描述性信息较少的 Object Browser 屏幕快照(但对于某些人来说可能更为熟悉),请参见图 2。

    图 2. 显示这三种类型的 Visual Studio Object Browser。

    在查看本文后面部分中的示例之前,请注意尚未讨论的项以及 UML 关系图中未显示的项(特意去注意)。

    添加了 DataOnQueueChanged 事件。正如您将看到的那样,在没有该事件的情况下,P2PMessageQueue 类仍然可用,但是当只读队列非空并且只写队列未满时,该增强功能将通知客户端。在内部,将阻塞等待队列句柄的线程,并在句柄收到信号时激发该事件,这就避免了错误的发生。(请参见代码以获取详细信息。)请注意,该事件不 在 UI 线程上,因此您需要使用 Control.Invoke。如果查看代码,您还将看到在调用 Close 方法时,该线程是如何完全关闭的,这正是 .NET Compact Framework 版本 1.0 的线程所需要的。.NET Compact Framework 版本 1.0 中的 Thread 类不提供 IsBackground 属性或 Abort 方法;因此,必须确保应用程序提供必需的代码来完全终止该应用程序启动的所有线程。如果不提供该代码,则运行应用程序的进程很可能无法终止,因为运行中的 .NET Compact Framework 1.0 线程会保持该进程运行,即使主应用程序线程尝试退出也是如此。

    此外,如果查看该实现会发现两个受保护的虚拟方法。

    # void StartEventThread() # Int32 GetBehaviourFlag()

    如果不需要启动事件线程,则可以用一个空方法重写第一个方法。不启动事件线程是一个非常规方案(因为无法直接订阅该事件),但是的确存在该选项。第二个方法返回传入到 CreateMsgQueue 方法的标志 (MSGQUEUOPTIONS.dwFlags) 中使用的整型参数。MSGQUEUE_ALLOW_BROKEN 的默认设置很不错,但是如果要更改它(例如,更改为 0 或 MSGQUEUE_NOPRECOMMIT),则重写 GetBehaviorFlag 是最好的做法。

    现在您应该已经十分了解 P2PMessageQueue 类的外部特征,以及为什么以这种方式设计。接下来,您将看到如何使用组件。

    返回页首

    使用 P2PMessageQueue

    本部分,您将看到使用 P2PMessageQueue 类和相关类型的示例。

    注 当运行该示例时,可以选择部署到 Windows CE 或 Pocket PC 2003 模拟器或设备。您可以在不进行修改的情况下在任一个平台上调试该项目(并运行该应用程序)。运行示例时,看到的第一个屏幕提示您在阅读器进程和发送器进程间进行选择。无论选择哪一个,都必须再次运行 .exe 文件(从 Program Files),然后选择另一个选项。在 Pocket PC 平台上,无论是使用模拟器还是设备,都必须重命名 .exe 文件(否则将激活现有的运行中应用程序)。

    字符串的简单 IPC 交换

    首先使用托管进程将字符串传入另一个 .NET Compact Framework 应用程序(也可以使发送方或接收方成为本机应用程序。有三种不同的方法用来读取接收端的字符串(相同的原理也适用于发送端):阻塞、非阻塞以及事件驱动。

    发送方和接收方的图形用户界面 (GUI) 在功能方面是自描述性的,如图 3、4 和 5 所示。

    图 3. 主窗体

    图 4. 读取端

    当发送方单击 Send 按钮时,发送一个字符串(如文本框中输入的),并可以选择将该消息设置为警告消息(基于 Message Is Alert 复选框状态)。发送方会阻塞,直到针对指定超时发送该消息(作为 combobox 中选择的发送方)。位于该复选框下面的窗体底部显示 Send 方法的返回结果,如图 5 所示(即 OK)。

    图 5. 发送/编写端

    这里再次使用了下载示例中的 Send 方法。

    private void cmdSend_Click(object sender, System.EventArgs e) { Message msg; msg = new Message( System.Text.Encoding.ASCII.GetBytes(txtSend.Text), chkIsAlert.Checked); ReadWriteResult rwr = mQue.Send(msg, mTimeout); lblSendResult.Text = rwr.ToString(); }

    当阅读器收到一个消息后,会将它显示在列表视图(第三列)中,并指出它是否是警告消息(第二列)。当成功接收到该消息时,第一列将始终显示 OK。默认情况下,要接收一个消息,请单击 Receive 按钮;如果没有消息要接收或者方法失败,则列表视图的第一列将指出原因(另两列在该情形中不适用)。

    在读取和发送时,QueueInfo 菜单(单击 Info,然后单击 QueueInfo)会显示有关队列的数据。接收端上的 Mode 菜单(单击 Read,然后单击 Mode)有三个菜单项:On Demand Only、Event driven 和 Block a Thread。这些项用于配置该程序如何接收队列外的消息。当您选择一个模式后,它在示例应用程序的生命周期内不应该更改(开发人员可针对自己的设计进行混合与匹配)。以下几个小节描述三种读取模式。

    按命令读(对应于菜单 On Demand Only)

    当接收方单击 Receive 按钮时,将执行以下方法。

    private void cmdReceive_Click(object sender, System.EventArgs e) { Message msg; msg = new Message(); // mTimeout is set by the end user by means of the GUI // to DON'T BLOCK (0), BLOCK (-1), or a real timeout value ReadWriteResult rwr = mQue.Receive(ref msg, mTimeout); ListViewItem lvi; if (rwr == ReadWriteResult.OK){ bool isAlrt; string payload; isAlrt = msg.IsAlert; byte[] bytes = msg.MessageBytes; payload = System.Text.Encoding.ASCII.GetString( bytes, 0, bytes.GetLength(0)); lvi = new ListViewItem( new string[]{rwr.ToString(), isAlrt.ToString(), payload}); }else{ lvi = new ListViewItem( new string[]{rwr.ToString(), @"n/a", @"n/a"}); } listView1.Items.Add(lvi); listView1.Columns[2].Width = -2;}

    事件驱动

    事件驱动模型基本上意味着应用程序不会在任意时刻通过调用 Receive(例如,在计时器上或者要求用户单击 Receive 按钮)来轮询新消息,相反,应用程序会订阅并捕获来自 P2PMessageQueue 类的事件。要订阅事件,需要使用正规的 .NET Compact Framework 委托习语(队列的创建也不例外)。

    mQue = new P2PMessageQueue( isReader, txtQueueName.Text, maxL, maxM, out firstTime); mQue.DataOnQueueChanged += new EventHandler(mQue_DataOnQueueChanged); 引发该事件会调用方法,在本例中,只调用现有的接收方法。 private void mQue_DataOnQueueChanged(object sender, EventArgs e) { this.Invoke(new EventHandler(this.cmdReceive_Click)); }

    阻塞线程

    第三种从队列进行读取的方法是:创建一个线程,并使其阻塞以等待队列的 Receive 方法。每次接收到消息时,应用程序都会处理它,然后再次循环回阻塞。以下是一些带有解释的示例代码。

    您会在某个地方创建并启动以下线程。

    Thread t = new Thread(new ThreadStart(ThreadBlockedOnRead)); t.Start();

    该线程用下面的方法运行。(有关更多上下文,请下载代码)。

    private void ThreadBlockedOnRead(){ while (mMode == 2){ // Thread mode Message msg = new Message(); //Can actually omit a timeout for a true infinite block ReadWriteResult rwr = mQue.Receive(msg, 60 * 1000); if (rwr == ReadWriteResult.InvalidHandle || mMode != 2){ return; } string body = rwr.ToString(); if (rwr == ReadWriteResult.OK){ byte[] bytes = msg.MessageBytes; string bytesAsString = System.Text.Encoding.ASCII.GetString( bytes, 0, bytes.GetLength(0)); body += " | " + msg.IsAlert.ToString() + " | " + bytesAsString; } MessageBox.Show(body, "To terminate this mode use the menu again"); } }

    基本示例代码至此结束。读取队列的三种方式也可以应用于通过队列发送。当事件接收到信号时(即,队列从已满转变为未满),可进行阻塞和发送或尝试在不进行阻塞的情况下随时进行发送。在下一部分中,您将看到该设计如何允许队列中的消息包含其他结构 — 而不仅仅是字符串。

    注在开发一个依赖事件信号来识别队列从已满转变为未满的应用程序时,需要在应用程序启动时针对队列执行一个初始写入操作。如果不执行该初始写入操作,则应用程序永远不会开始写入,因为该初始写入操作必须经过特定地执行才能填充转变为未满状态的队列,因此,向事件发出信号以触发针对该队列的进一步写入操作。

    发送和接收更复杂的类型(不仅仅是字符串)

    在前几部分中,是在应用程序之间传递字符串,但如果可以将字符串与字节数组进行转换,则还可以传递任何数据类型。因此,将该类型转换为一个字节数组,在其中创建 Message 类,然后发送 Message。在接收端,检索 Message,获取字节数组,然后在其中创建该类型(例如,公开类型的 ToBytes 和 FromBytes 方法)。另一个方法是,从 Message 类继承自己的类,并在其中实现转换。很自然,如果您尝试传递一个复杂的对象图,则在类型与字节数组之间进行转换会难得多。尝试使用没有源代码的类型需要特别注意,因为您可能不具有对该类型的完整状态(例如,私有成员)的访问权,因此,可能无法准确地在类型与字节数组之间进行转换。

    出于简单的目的,假设将一个 Int64 和一个 Boolean 从一个进程传递到另一个进程。将创建一个 CustomMessage 类,如下所示。

    public class CustomMessage : Message { public long TotalMemory; public bool AfterGC; public CustomMessage(){ TotalMemory = 0; AfterGC = false; } public CustomMessage(long totMem, bool afterGarbCol){ TotalMemory = totMem; AfterGC = afterGarbCol; } public override byte[] MessageBytes { get { byte[] b1 = BitConverter.GetBytes(TotalMemory); byte[] b2 = BitConverter.GetBytes(AfterGC); byte[] b = new byte[9]; Buffer.BlockCopy(b1, 0, b, 0, 8); Buffer.BlockCopy(b2, 0, b, 8, 1); return b; } set { TotalMemory = BitConverter.ToInt64(value, 0); AfterGC = BitConverter.ToBoolean(value, 8); base.MessageBytes = value; } } }

    添加了两个感兴趣的字段(TotalMemory、AfterGC),重写 MessageBytes 属性以实现转换(get 和 set 方法位于转换发生的位置),然后添加一个默认的构造函数(这个参数化的构造函数是可选的)。

    现在,如果要使用前面的示例,只需更改两处地方:

    1.

    发送消息时,要创建一个 CustomMessage,而不是将值赋给 TotalMemory 和 AfterGC。

    //msg = new Message(. . .); //Instead of this line msg = new CustomMessage(GC.GetTotalMemory(false), false); ReadWriteResult rwr = mQue.Send(msg, mTimeout);

    2.

    接收消息时,要创建一个 CustomMessage。

    //msg = new Message(); msg = new CustomMessage(); //msg still declared as Message ReadWriteResult rwr = mQue.Receive(msg, mTimeout);

    然后,在 mQue.Receive 返回时读取它的属性。

    //byte[] bytes = msg.MessageBytes; //payload = System.Text.Encoding.ASCII.GetString(. . .); payload = "Total Memory = " + ((CustomMessage)msg).TotalMemory.ToString() + (((CustomMessage)msg).AfterGC ? " after a GC" : " without forcing a GC");

    演练阅读器

    到目前为止,读者应该很了解该类及其针对 IPC 的用法了。

    队列用于在某些情况下与操作系统进行通讯(Windows CE 操作系统本身就在很多方面使用该机制)。例如,点对点消息队列可用在设备电源管理的上下文中 (RequestPowerNotifications)。通过使用 P2PMessageQueue 类以及在电源管理结构(例如,POWER_BROADCAST)和字节数组之间进行转换的技能,您现在就可以接收和解释此类事件。详述示例已超出本文的范围,但它可单独成篇。

    还有另一个示例也超出了本文的范围,即,一个用 eMbedded Visual C++ 编写的、与托管应用程序进行通讯的应用程序。原理都是相同的,除非您只在托管端使用 P2PMessageQueue 类。

    返回页首

    小结

    本文的主旨是介绍 P2PMessageQueue 类(以及 Message 和 ReadWriteResult)的实际用法。您可以在 .NET Compact Framework 应用程序(无论是托管的还是本机的)中使用它与其他进程通讯。最后,如果您要将一个本机 API 包装到一个托管类中,那么针对 P2PMessageQueue 类的进程可用作设计决策的示例。

    返回页首

    关于作者

    Daniel Moth 是一名 .NET Compact Framework MVP,目前供职于 Trend Control Systems Ltd.。您可以通过他的 Web 日志与他取得联系,并阅读他的有关文章和网络日记条目的反馈。他还在 microsoft.public.dotnet.framework.compactframework 新闻组和新的 MSDN 论坛中回答关于 .NET Compact Framework 的常见问题。


    最新回复(0)