应用框架的设计与实现——.NET平台(9 MSMQMQSeries 访问)

    技术2022-05-11  89

    消息队列的应用场景 当有多个用户同时访问一个服务时,如果用户数量超出服务的承受能力,会造成阻塞、用户请求无反应或者用户请求丢失。 在这种情况下只有使用一个中间服务,让中间服务来缓存用户的请求,才能保证服务的正常运行而不至于丧失服务能力。 很多情况下我们都可以使用消息队列来充当缓存服务。 消息队列的好处: 1.通过部署多个服务器来处理队列中的消息,可以达到负载平衡; 2.通过让多个服务器来处理队列中的消息,可以达到容错目的; 3.客户请求数量急剧增加时,消息队列可以起到缓冲区的作用; 4.有助于单一或多个系统平台上的应用集成。 MSMQ: 有四种队列:发出队列、公共队列、专有队列、系统队列。 公共队列要向活动目录登记,专有队列不向活动目录注册,所以需要知道其机器名,消息队列名称等物理位置。 使用System.Messaging命名空间向MSMQ发送消息包含四个步骤:1.创建队列对象2.创建消息对象3.发送消息4.关闭队列 定位队列有两个方法,1是使用格式名,2是使用队列路径;作者建议我们尽量不要使用队列路径,因为其依赖于活动目录和域控制器。 从MSMQ获取消息包含三个步骤:1.创建队列对象2.获取消息3.关闭队列 Receive()方法会在队列中无消息时阻塞当前线程,可以使用一个时间参数指定线程的阻塞时间Receive(TimeSpan ts),超过时间后抛出异常。 如果我们发送了一个订单对象,但是接收时没有指定订单类型,那么取消息时会怎样?(尚未验证) 选项1.不会取到该消息 选项2.取到消息时异常 选项3.只是不能构造原对象,当然也无法按对象使用 MQSeries: 可在多种操作系统平台上使用,包括大型机平台。 .net 类库没有提供使用MQSeries的组件,我们可以通过包装MQSeries的COM来访问它。 MQAX200.dll 是MQSeries的COM动态链接库,它提供了编程所需的所有功能。 向MQSeries发送消息包括四个步骤:1.打开队列2.创建消息3.发送消息4.关闭队列 从MQSeries获取消息包括四个步骤:1.打开队列2.创建空消息3.获取消息4.关闭队列 MQSeries API提供的Get()方法即使队列为空也不会阻塞线程,而是以抛出异常的方式说明无消息。 以下代码是使用MSMQ发送接收消息的演示: 发送消息Form1 using  System; using  System.Collections.Generic; using  System.ComponentModel; using  System.Data; using  System.Drawing; using  System.Text; using  System.Windows.Forms; using  System.Messaging; namespace  MSMQSend {    public partial class Form1 : Form    {        public Form1()        {            InitializeComponent();        }    // 发送消息方法        public void Post(string label, object body)        {            MessageQueue mq = null;            try            {                mq = new MessageQueue(@"formatname:direct=os:.private$yifengtest");    // 格式名方式                System.Messaging.Message message = new System.Messaging.Message();                message.Label = label;                message.Body = body;    // 可以是任何对象                mq.Send(message);    // 发送消息            }            finally            {                mq.Close();            }        }        private void button1_Click(object sender, EventArgs e)        {            Post(this.textBox1.Text, this.textBox2.Text);        }    }} 接收消息类Receive using  System; using  System.Collections.Generic; using  System.Text; using  System.Messaging; using  System.Threading; namespace  MSMQReceive {    class Receive    {        public delegate void NewMessageHandler(object[] objs);        public event NewMessageHandler newMessage;    // 取消息方法        private object[] ReceiveMessage()        {            MessageQueue mq = null;            try            {                mq = new MessageQueue(@"formatname:direct=os:.private$yifengtest");    // 通过格式名访问消息队列                ((XmlMessageFormatter)mq.Formatter).TargetTypeNames = new string[1typeof(string).ToString() };    // 能接收的对象类型                System.Messaging.Message message = mq.Receive();    // 获取消息                return new object[] { message.Label, message.Body };            }            finally            {                mq.Close();            }        }    // 监听方法        public void loopReceive()        {            while (true)            {                object[] objs = ReceiveMessage();                newMessage(objs);    // 取得消息后通过委托发布            }        }    }} 接收消息Form1 using  System; using  System.Collections.Generic; using  System.ComponentModel; using  System.Data; using  System.Drawing; using  System.Text; using  System.Windows.Forms; using  System.Threading; namespace  MSMQReceive {    public partial class Form1 : Form    {        Thread receiveThread = null;        Receive rece = new Receive();        delegate void setText(string text);        public Form1()        {            InitializeComponent();        }        private void Form1_Load(object sender, EventArgs e)        {            rece.newMessage += new Receive.NewMessageHandler(rece_newMessage);    // 订阅            ThreadStart start = new ThreadStart(rece.loopReceive);    // 新线程            receiveThread = new Thread(start);            receiveThread.Start();    // 启动新线程        }    // 处理消息的方法        void rece_newMessage(object[] objs)        {            string fm = formatMessage(objs);            SetText(fm);        }    // 从线程中获取的消息显示到主界面上来        private void SetText(string text)        {            if (this.textBox2.InvokeRequired)            {                setText st = new setText(SetText);                this.Invoke(st, new object[] { text });            }            else            {                this.textBox2.Text += text;            }        }    // 消息显示格式        private string formatMessage(object[] objs)        {            if (objs.Length == 2)            {                return string.Format("L:{0};B:{1} ", objs[0].ToString().PadRight(7), objs[1]);            }            return "";        }    // 窗口关闭时关闭线程        private void Form1_FormClosing(object sender, FormClosingEventArgs e)        {            receiveThread.Abort();        }    }}

    最新回复(0)