应用框架的设计与实现——.NET平台(9 消息队列服务代码分析)

    技术2022-05-11  69

    正文内容已丢失,非常抱歉。 这本书看完后我会再补充这一部分。 摘要部分: 类设计如下:MessageQueueManagment 消息队列管理类,负责与MessageQueue类交互,减少了用户直接使用此类的麻烦IMessageQueueManagment 消息队列管理类的公用方法接口IMessageQueue 消息队列的共同接口MSMQ 微软的MSMQ消息队列服务访问类MQServic IBM的消息队列服务访问类Message 消息类。 消息队列服务组件代码: using  System; namespace  SAF.MessageQueue {    /// <summary>    /// Containing the interface and class referenced in    /// the message queue impelmentation class    /// </summary>    public delegate void MessageArrivalHandler (Message m, string queueName);    /// <summary>    /// interface that every message queue implementation must implements    /// </summary>    public interface IMessageQueue    {        void Send(Message message);        Message Retrieve();        event MessageArrivalHandler MessageArrival;        void Open();        void Close();    }    /// <summary>    /// Interface for MessageQueueManager.  it is used by    /// client application to interact with MessageQueueManager.    /// </summary>    public interface IMessageQueueManager    {        void SendMessage(Message message);        Message RetrieveMessage();        void RegisterMessageArrivalHanlder(MessageArrivalHandler mah);        void UnRegisterMessageArrivalHanlder(MessageArrivalHandler mah);        void OpenConnection();        void CloseConnection();    }    /// <summary>    /// A basic Message class, can be extended to create more    /// implmentation specific message class    /// </summary>    public class Message    {        public string Label;        public object Content;    }} using  System; using  System.Configuration; using  SAF.Configuration; namespace  SAF.MessageQueue {    /// <summary>    /// MessageQueueManager contains the API that developers    /// would interact with the most. It provide the "bridge"    /// to the underlying implementation of message queueing.    /// This compnoent show most simplified linkage between MessageQueueManager    /// and underlying Message queue.  The more dissimilar between IMessageManager and the    /// underlying Message Queue, the more complex MessageQueueManager becomes.    /// </summary>    public class MessageQueueManager : IMessageQueueManager    {        private IMessageQueue mq;        private ConfigurationManager cm;        public MessageQueueManager(IMessageQueue messageQueue)        {            mq = messageQueue;        }        /// <summary>        /// pass the call the actual message queue implementation        /// </summary>        /// <param name="m">message object</param>        public void SendMessage(Message m)        {            mq.Send(m);        }        /// <summary>        /// pass the call the actual message queue implementation        /// </summary>        /// <returns>the retrieved message from the queue</returns>        public Message RetrieveMessage()        {            return mq.Retrieve();        }        /// <summary>        /// register event hanlder that trigger when new message arrives at the queue        /// </summary>        /// <param name="mah">MessageArrivalHandler delegate</param>        public void RegisterMessageArrivalHanlder(MessageArrivalHandler mah)        {            mq.MessageArrival += mah;        }        /// <summary>        /// unregister event handler that trigger when new message arrives at the queue.        /// </summary>        /// <param name="mah">MessageArrivalHandler delegate</param>        public void UnRegisterMessageArrivalHanlder(MessageArrivalHandler mah)        {            mq.MessageArrival -= mah;        }        /// <summary>        /// open the connection of the underlying mesage queue        /// </summary>        public void OpenConnection()        {            mq.Open();        }        /// <summary>        /// close the connection of the underlying message queue        /// </summary>        public void CloseConnection()        {            mq.Close();        }    }} using  System; using  System.Xml; using  System.Messaging; using  SAF.Configuration; using  System.Configuration; using  System.Collections; using  System.Threading; namespace  SAF.MessageQueue {    /// <summary>    /// A sample implementation of IMessageQueue for MSMQ technology.    /// It allows developers to send, retrieve messages from MSMQ and register    /// event for new message arrivals.    /// </summary>    public class MSMQ : IMessageQueue    {        private int sleepTime;        private string formatName;        private ConfigurationManager cm;        private static MessageArrivalHandler handler;        private string queueName;        private static bool queueListenerStarted = false;        private ArrayList supportedTypes = new ArrayList();        private System.Messaging.MessageQueue mq;        /// <summary>        /// Constructor that retrieve the queue related information        /// for MessageQueueConfiguration object        /// </summary>        /// <param name="queueName">name of the queue</param>        public MSMQ(string queueName)        {            cm = (ConfigurationManager)ConfigurationSettings.GetConfig("Framework");            XmlNode queueInfo = cm.MessageQueueConfig.RetrieveQueueInformation("*[@name='" + queueName + "']" );            formatName = queueInfo.SelectSingleNode("FormatName").InnerText;            sleepTime =Int32.Parse(queueInfo.SelectSingleNode("SleepTime").InnerText);            this.queueName = queueName;            //supportedTypes is used to provide information to System.Messaging.MessageQueue            //information on how to serialize and deserialize the object sent to and retrieved from            //the queue.  The default data type is string type.            supportedTypes.Add(typeof(System.String).ToString());        }        /// <summary>        /// allows developers to add additional type information        /// for serialization and deserialzation of the messsage        /// </summary>        /// <param name="typeName">the data type name</param>        public void AddSupportedType(string typeName)        {            supportedTypes.Add(typeName);        }        /// <summary>        /// event for arrival messages to the queue        /// </summary>        public event MessageArrivalHandler MessageArrival        {            //when new handler is register for the event, start the listener            //if it is not yet started            add            {                handler += value;                if (queueListenerStarted != true)                {                    //create a new thread to listen on the queue                    ThreadStart ts = new ThreadStart(StartQueueListener);                    Thread t = new Thread(ts);                    t.Start();                }            }            remove            {                handler -= value;                //stop the listener if no handler is listed                if (handler == null || handler.GetInvocationList().Length <= 0)                {                    StopQueueListener();                }            }        }        /// <summary>        /// Sends the message to the MSMQ        /// </summary>        /// <param name="m">message object</param>        public void Send(SAF.MessageQueue.Message m)        {            //set type information queue can use to serialize the message            ((XmlMessageFormatter)mq.Formatter).TargetTypeNames = (string[])(supportedTypes.ToArray(typeof(System.String)));            mq.Send(m.Content,m.Label);        }        /// <summary>        /// Retrieve the message from the MSMQ        /// </summary>        /// <returns>retrieved message object from the queue</returns>        public SAF.MessageQueue.Message Retrieve()        {            //set type information queue can use to deserialize the message            ((XmlMessageFormatter)mq.Formatter).TargetTypeNames = (string[])(supportedTypes.ToArray(typeof(System.String)));            System.Messaging.Message message = mq.Receive();            SAF.MessageQueue.Message safMessage = new SAF.MessageQueue.Message();            safMessage.Label = message.Label;            safMessage.Content = message.Body;            return safMessage;        }        /// <summary>        /// open the connection to the message queue        /// </summary>        public void Open()        {             mq = new System.Messaging.MessageQueue(formatName);        }        /// <summary>        /// close the connection to the message queue        /// </summary>        public void Close()        {            if (mq != null)            {                mq.Close();                mq.Dispose();            }        }        /// <summary>        /// Start the listen to the queue for incoming messages and        /// notifiy the handlers as new messges arrive        /// </summary>        private void StartQueueListener()        {            //create a separate connection to the message queue            System.Messaging.MessageQueue listenermq = new System.Messaging.MessageQueue(formatName);            ((XmlMessageFormatter)listenermq.Formatter).TargetTypeNames = (string[])(supportedTypes.ToArray(typeof(System.String)));            System.Messaging.Message message = null;            queueListenerStarted = true;            try            {                //listen to the queue continusly through loop                while (queueListenerStarted == true)                {                    System.Threading.Thread.Sleep(sleepTime);                    if (handler.GetInvocationList().Length > 0)                    {                        //this is a call that will block the thread if no                        //message is in the queue.                        message = listenermq.Receive();                        SAF.MessageQueue.Message safMessage = new SAF.MessageQueue.Message();                        safMessage.Label = message.Label;                        safMessage.Content = message.Body;                        //fire the event                        handler(safMessage,queueName);                    }                }            }            finally            {                //close the connetion                listenermq.Close();            }        }        /// <summary>        /// stop the listener        /// </summary>        private void StopQueueListener()        {            queueListenerStarted = false;        }    }} using  System; using  System.Xml; using  SAF.Configuration; using  System.Configuration; using  MQAX200; using  System.Threading; namespace  SAF.MessageQueue {    /// <summary>    /// A sample implementation of IMessageQueue for MQSeries technology.    /// It allows developers to send, retrieve messages from MQSeries and register    /// event for new message arrivals.    /// </summary>    public class MQSeries : IMessageQueue    {        private ConfigurationManager cm;        private MessageArrivalHandler handler;        private string queueManager;        private string QueueName;        private MQQueue queue;        private MQSession queueSession;        private MQQueueManager mqm;        private bool queueListenerStarted;        private int sleepTime;        /// <summary>        /// Constructor that retrieve the queue related information        /// for MessageQueueConfiguration object        /// </summary>        /// <param name="queueName">the name of the queue</param>        public MQSeries(string queueName)        {            cm = (ConfigurationManager)ConfigurationSettings.GetConfig("Framework");            XmlNode queueInfo = cm.MessageQueueConfig.RetrieveQueueInformation("*[@name='" + queueName + "']" );            queueManager = queueInfo.SelectSingleNode("QueueManager").InnerText;            QueueName = queueInfo.SelectSingleNode("QueueName").InnerText;            sleepTime = Int32.Parse(queueInfo.SelectSingleNode("SleepTime").InnerText);            queueSession = new MQSessionClass();        }        /// <summary>        /// send the message to the MQSeries's queue        /// </summary>        /// <param name="m">a message object to be sent</param>        public void Send(Message m)        {            //create a new MQSeries message            MQMessage message = (MQMessage)queueSession.AccessMessage();            message.WriteString(m.Content.ToString());            MQPutMessageOptions messageOption = (MQPutMessageOptions)queueSession.AccessPutMessageOptions();            //send the message to the MQSeries queue            queue.Put(message,messageOption);        }        /// <summary>        /// Retrieve message from the MQSeries's queue        /// </summary>        /// <returns></returns>        public Message Retrieve()        {            //create a new message            MQMessage message  = (MQMessage)queueSession.AccessMessage();            MQGetMessageOptions messageOption = (MQGetMessageOptions)queueSession.AccessGetMessageOptions();            //fill the new message object with message from he queue            //unlike MSMQ, GET is not a blocking call, instead, it raise            //an exception if trying to retrieve message from a queue that is emtpy.            queue.Get(message,messageOption,System.Reflection.Missing.Value);            //create a new message object that contains the            //message from the queue.            Message m = new Message();            m.Content = message.ReadString(message.MessageLength);            m.Label = message.MessageId;            return m;        }        /// <summary>        /// event for arrival messages to the queue        /// </summary>        public event MessageArrivalHandler MessageArrival        {            //when new handler is register for the event, start the listener            //if it is not yet started            add            {                handler += value;                if (queueListenerStarted != true)                {                    //create a new thread to listen on the queue                    ThreadStart ts = new ThreadStart(StartQueueListener);                    Thread t = new Thread(ts);                    t.Start();                }            }            remove            {                handler -= value;                //stop the listener if no handler is listed                if (handler == null || handler.GetInvocationList().Length <= 0)                {                    StopQueueListener();                }            }        }        /// <summary>        /// Start the listen to the queue for incoming messages and        /// notifiy the handlers as new messges arrive        /// </summary>        public void StartQueueListener()        {            //create a separate connection to the message queue            queueListenerStarted = true;            MQQueueManager listenermqm = (MQQueueManager)queueSession.AccessQueueManager(queueManager);            MQQueue listenerqueue =(MQQueue)mqm.AccessQueue(QueueName,(int)MQ.MQOO_INPUT_AS_Q_DEF + (int)MQ.MQOO_OUTPUT,"","","");            listenerqueue.Open();            try            {                MQMessage message = (MQMessage)queueSession.AccessMessage();                MQGetMessageOptions messageOption = (MQGetMessageOptions)queueSession.AccessGetMessageOptions();                while(queueListenerStarted == true)                {                    System.Threading.Thread.Sleep(sleepTime);                    if (handler.GetInvocationList().Length > 0)                    {                        try                        {                            //GET will raise an exception if no message is in the queue.                            //we want to keep listening despite of the exception, see exception block                            //for detail                            listenerqueue.Get(message,messageOption,System.Reflection.Missing.Value);                            SAF.MessageQueue.Message safMessage = new SAF.MessageQueue.Message();                            safMessage.Label = message.MessageId;                            safMessage.Content = message.ReadString(message.MessageLength);                            //fire the event                            handler(safMessage,QueueName);                        }                        catch (System.Runtime.InteropServices.COMException ex)                        {                            //-2147467259 represents the error code for retrieving                            //message from an empty queue. do nothing if gotting this error code.                            if (ex.ErrorCode != -2147467259)                            {                                throw ex;                            }                        }                    }                }            }            finally            {                //close the connetion                listenerqueue.Close();                listenermqm.Disconnect();            }        }        /// <summary>        /// stop the listener        /// </summary>        public void StopQueueListener()        {            queueListenerStarted = false;        }        /// <summary>        /// open the connection to the message queue        /// </summary>        public void Open()        {            mqm = (MQQueueManager)queueSession.AccessQueueManager(queueManager);            queue = (MQQueue)mqm.AccessQueue(QueueName,(int)MQ.MQOO_INPUT_AS_Q_DEF + (int)MQ.MQOO_OUTPUT,"","","");            queue.Open();        }        /// <summary>        /// close the connection to the message queue        /// </summary>        public void Close()        {            if (queue != null)            {                queue.Close();            }            if (mqm != null)            {                mqm.Disconnect();            }        }    }}

    最新回复(0)