正文内容已丢失,非常抱歉。
这本书看完后我会再补充这一部分。
摘要部分:
类设计如下:MessageQueueManagment 消息队列管理类,负责与MessageQueue类交互,减少了用户直接使用此类的麻烦IMessageQueueManagment 消息队列管理类的公用方法接口IMessageQueue 消息队列的共同接口MSMQ 微软的MSMQ消息队列服务访问类MQServic IBM的消息队列服务访问类Message 消息类。
消息队列服务组件代码:
using
System;
namespace
SAF.MessageQueue
...
{ /**//// <summary> /// Containing the interface and class referenced in /// the message queue impelmentation class /// </summary> public delegate void MessageArrivalHandler (Message m, string queueName); /**//// <summary> /// interface that every message queue implementation must implements /// </summary> public interface IMessageQueue ...{ void Send(Message message); Message Retrieve(); event MessageArrivalHandler MessageArrival; void Open(); void Close(); } /**//// <summary> /// Interface for MessageQueueManager. it is used by /// client application to interact with MessageQueueManager. /// </summary> public interface IMessageQueueManager ...{ void SendMessage(Message message); Message RetrieveMessage(); void RegisterMessageArrivalHanlder(MessageArrivalHandler mah); void UnRegisterMessageArrivalHanlder(MessageArrivalHandler mah); void OpenConnection(); void CloseConnection(); } /**//// <summary> /// A basic Message class, can be extended to create more /// implmentation specific message class /// </summary> public class Message ...{ public string Label; public object Content; }}
using
System;
using
System.Configuration;
using
SAF.Configuration;
namespace
SAF.MessageQueue
...
{ /**//// <summary> /// MessageQueueManager contains the API that developers /// would interact with the most. It provide the "bridge" /// to the underlying implementation of message queueing. /// This compnoent show most simplified linkage between MessageQueueManager /// and underlying Message queue. The more dissimilar between IMessageManager and the /// underlying Message Queue, the more complex MessageQueueManager becomes. /// </summary> public class MessageQueueManager : IMessageQueueManager ...{ private IMessageQueue mq; private ConfigurationManager cm; public MessageQueueManager(IMessageQueue messageQueue) ...{ mq = messageQueue; } /**//// <summary> /// pass the call the actual message queue implementation /// </summary> /// <param name="m">message object</param> public void SendMessage(Message m) ...{ mq.Send(m); } /**//// <summary> /// pass the call the actual message queue implementation /// </summary> /// <returns>the retrieved message from the queue</returns> public Message RetrieveMessage() ...{ return mq.Retrieve(); } /**//// <summary> /// register event hanlder that trigger when new message arrives at the queue /// </summary> /// <param name="mah">MessageArrivalHandler delegate</param> public void RegisterMessageArrivalHanlder(MessageArrivalHandler mah) ...{ mq.MessageArrival += mah; } /**//// <summary> /// unregister event handler that trigger when new message arrives at the queue. /// </summary> /// <param name="mah">MessageArrivalHandler delegate</param> public void UnRegisterMessageArrivalHanlder(MessageArrivalHandler mah) ...{ mq.MessageArrival -= mah; } /**//// <summary> /// open the connection of the underlying mesage queue /// </summary> public void OpenConnection() ...{ mq.Open(); } /**//// <summary> /// close the connection of the underlying message queue /// </summary> public void CloseConnection() ...{ mq.Close(); } }}
using
System;
using
System.Xml;
using
System.Messaging;
using
SAF.Configuration;
using
System.Configuration;
using
System.Collections;
using
System.Threading;
namespace
SAF.MessageQueue
...
{ /**//// <summary> /// A sample implementation of IMessageQueue for MSMQ technology. /// It allows developers to send, retrieve messages from MSMQ and register /// event for new message arrivals. /// </summary> public class MSMQ : IMessageQueue ...{ private int sleepTime; private string formatName; private ConfigurationManager cm; private static MessageArrivalHandler handler; private string queueName; private static bool queueListenerStarted = false; private ArrayList supportedTypes = new ArrayList(); private System.Messaging.MessageQueue mq; /**//// <summary> /// Constructor that retrieve the queue related information /// for MessageQueueConfiguration object /// </summary> /// <param name="queueName">name of the queue</param> public MSMQ(string queueName) ...{ cm = (ConfigurationManager)ConfigurationSettings.GetConfig("Framework"); XmlNode queueInfo = cm.MessageQueueConfig.RetrieveQueueInformation("*[@name='" + queueName + "']" ); formatName = queueInfo.SelectSingleNode("FormatName").InnerText; sleepTime =Int32.Parse(queueInfo.SelectSingleNode("SleepTime").InnerText); this.queueName = queueName; //supportedTypes is used to provide information to System.Messaging.MessageQueue //information on how to serialize and deserialize the object sent to and retrieved from //the queue. The default data type is string type. supportedTypes.Add(typeof(System.String).ToString()); } /**//// <summary> /// allows developers to add additional type information /// for serialization and deserialzation of the messsage /// </summary> /// <param name="typeName">the data type name</param> public void AddSupportedType(string typeName) ...{ supportedTypes.Add(typeName); } /**//// <summary> /// event for arrival messages to the queue /// </summary> public event MessageArrivalHandler MessageArrival ...{ //when new handler is register for the event, start the listener //if it is not yet started add ...{ handler += value; if (queueListenerStarted != true) ...{ //create a new thread to listen on the queue ThreadStart ts = new ThreadStart(StartQueueListener); Thread t = new Thread(ts); t.Start(); } } remove ...{ handler -= value; //stop the listener if no handler is listed if (handler == null || handler.GetInvocationList().Length <= 0) ...{ StopQueueListener(); } } } /**//// <summary> /// Sends the message to the MSMQ /// </summary> /// <param name="m">message object</param> public void Send(SAF.MessageQueue.Message m) ...{ //set type information queue can use to serialize the message ((XmlMessageFormatter)mq.Formatter).TargetTypeNames = (string[])(supportedTypes.ToArray(typeof(System.String))); mq.Send(m.Content,m.Label); } /**//// <summary> /// Retrieve the message from the MSMQ /// </summary> /// <returns>retrieved message object from the queue</returns> public SAF.MessageQueue.Message Retrieve() ...{ //set type information queue can use to deserialize the message ((XmlMessageFormatter)mq.Formatter).TargetTypeNames = (string[])(supportedTypes.ToArray(typeof(System.String))); System.Messaging.Message message = mq.Receive(); SAF.MessageQueue.Message safMessage = new SAF.MessageQueue.Message(); safMessage.Label = message.Label; safMessage.Content = message.Body; return safMessage; } /**//// <summary> /// open the connection to the message queue /// </summary> public void Open() ...{ mq = new System.Messaging.MessageQueue(formatName); } /**//// <summary> /// close the connection to the message queue /// </summary> public void Close() ...{ if (mq != null) ...{ mq.Close(); mq.Dispose(); } } /**//// <summary> /// Start the listen to the queue for incoming messages and /// notifiy the handlers as new messges arrive /// </summary> private void StartQueueListener() ...{ //create a separate connection to the message queue System.Messaging.MessageQueue listenermq = new System.Messaging.MessageQueue(formatName); ((XmlMessageFormatter)listenermq.Formatter).TargetTypeNames = (string[])(supportedTypes.ToArray(typeof(System.String))); System.Messaging.Message message = null; queueListenerStarted = true; try ...{ //listen to the queue continusly through loop while (queueListenerStarted == true) ...{ System.Threading.Thread.Sleep(sleepTime); if (handler.GetInvocationList().Length > 0) ...{ //this is a call that will block the thread if no //message is in the queue. message = listenermq.Receive(); SAF.MessageQueue.Message safMessage = new SAF.MessageQueue.Message(); safMessage.Label = message.Label; safMessage.Content = message.Body; //fire the event handler(safMessage,queueName); } } } finally ...{ //close the connetion listenermq.Close(); } } /**//// <summary> /// stop the listener /// </summary> private void StopQueueListener() ...{ queueListenerStarted = false; } }}
using
System;
using
System.Xml;
using
SAF.Configuration;
using
System.Configuration;
using
MQAX200;
using
System.Threading;
namespace
SAF.MessageQueue
...
{ /**//// <summary> /// A sample implementation of IMessageQueue for MQSeries technology. /// It allows developers to send, retrieve messages from MQSeries and register /// event for new message arrivals. /// </summary> public class MQSeries : IMessageQueue ...{ private ConfigurationManager cm; private MessageArrivalHandler handler; private string queueManager; private string QueueName; private MQQueue queue; private MQSession queueSession; private MQQueueManager mqm; private bool queueListenerStarted; private int sleepTime; /**//// <summary> /// Constructor that retrieve the queue related information /// for MessageQueueConfiguration object /// </summary> /// <param name="queueName">the name of the queue</param> public MQSeries(string queueName) ...{ cm = (ConfigurationManager)ConfigurationSettings.GetConfig("Framework"); XmlNode queueInfo = cm.MessageQueueConfig.RetrieveQueueInformation("*[@name='" + queueName + "']" ); queueManager = queueInfo.SelectSingleNode("QueueManager").InnerText; QueueName = queueInfo.SelectSingleNode("QueueName").InnerText; sleepTime = Int32.Parse(queueInfo.SelectSingleNode("SleepTime").InnerText); queueSession = new MQSessionClass(); } /**//// <summary> /// send the message to the MQSeries's queue /// </summary> /// <param name="m">a message object to be sent</param> public void Send(Message m) ...{ //create a new MQSeries message MQMessage message = (MQMessage)queueSession.AccessMessage(); message.WriteString(m.Content.ToString()); MQPutMessageOptions messageOption = (MQPutMessageOptions)queueSession.AccessPutMessageOptions(); //send the message to the MQSeries queue queue.Put(message,messageOption); } /**//// <summary> /// Retrieve message from the MQSeries's queue /// </summary> /// <returns></returns> public Message Retrieve() ...{ //create a new message MQMessage message = (MQMessage)queueSession.AccessMessage(); MQGetMessageOptions messageOption = (MQGetMessageOptions)queueSession.AccessGetMessageOptions(); //fill the new message object with message from he queue //unlike MSMQ, GET is not a blocking call, instead, it raise //an exception if trying to retrieve message from a queue that is emtpy. queue.Get(message,messageOption,System.Reflection.Missing.Value); //create a new message object that contains the //message from the queue. Message m = new Message(); m.Content = message.ReadString(message.MessageLength); m.Label = message.MessageId; return m; } /**//// <summary> /// event for arrival messages to the queue /// </summary> public event MessageArrivalHandler MessageArrival ...{ //when new handler is register for the event, start the listener //if it is not yet started add ...{ handler += value; if (queueListenerStarted != true) ...{ //create a new thread to listen on the queue ThreadStart ts = new ThreadStart(StartQueueListener); Thread t = new Thread(ts); t.Start(); } } remove ...{ handler -= value; //stop the listener if no handler is listed if (handler == null || handler.GetInvocationList().Length <= 0) ...{ StopQueueListener(); } } } /**//// <summary> /// Start the listen to the queue for incoming messages and /// notifiy the handlers as new messges arrive /// </summary> public void StartQueueListener() ...{ //create a separate connection to the message queue queueListenerStarted = true; MQQueueManager listenermqm = (MQQueueManager)queueSession.AccessQueueManager(queueManager); MQQueue listenerqueue =(MQQueue)mqm.AccessQueue(QueueName,(int)MQ.MQOO_INPUT_AS_Q_DEF + (int)MQ.MQOO_OUTPUT,"","",""); listenerqueue.Open(); try ...{ MQMessage message = (MQMessage)queueSession.AccessMessage(); MQGetMessageOptions messageOption = (MQGetMessageOptions)queueSession.AccessGetMessageOptions(); while(queueListenerStarted == true) ...{ System.Threading.Thread.Sleep(sleepTime); if (handler.GetInvocationList().Length > 0) ...{ try ...{ //GET will raise an exception if no message is in the queue. //we want to keep listening despite of the exception, see exception block //for detail listenerqueue.Get(message,messageOption,System.Reflection.Missing.Value); SAF.MessageQueue.Message safMessage = new SAF.MessageQueue.Message(); safMessage.Label = message.MessageId; safMessage.Content = message.ReadString(message.MessageLength); //fire the event handler(safMessage,QueueName); } catch (System.Runtime.InteropServices.COMException ex) ...{ //-2147467259 represents the error code for retrieving //message from an empty queue. do nothing if gotting this error code. if (ex.ErrorCode != -2147467259) ...{ throw ex; } } } } } finally ...{ //close the connetion listenerqueue.Close(); listenermqm.Disconnect(); } } /**//// <summary> /// stop the listener /// </summary> public void StopQueueListener() ...{ queueListenerStarted = false; } /**//// <summary> /// open the connection to the message queue /// </summary> public void Open() ...{ mqm = (MQQueueManager)queueSession.AccessQueueManager(queueManager); queue = (MQQueue)mqm.AccessQueue(QueueName,(int)MQ.MQOO_INPUT_AS_Q_DEF + (int)MQ.MQOO_OUTPUT,"","",""); queue.Open(); } /**//// <summary> /// close the connection to the message queue /// </summary> public void Close() ...{ if (queue != null) ...{ queue.Close(); } if (mqm != null) ...{ mqm.Disconnect(); } } }}
转载请注明原文地址: https://ibbs.8miu.com/read-23130.html