消息队列的应用场景
当有多个用户同时访问一个服务时,如果用户数量超出服务的承受能力,会造成阻塞、用户请求无反应或者用户请求丢失。
在这种情况下只有使用一个中间服务,让中间服务来缓存用户的请求,才能保证服务的正常运行而不至于丧失服务能力。
很多情况下我们都可以使用消息队列来充当缓存服务。
消息队列的好处:
1.通过部署多个服务器来处理队列中的消息,可以达到负载平衡;
2.通过让多个服务器来处理队列中的消息,可以达到容错目的;
3.客户请求数量急剧增加时,消息队列可以起到缓冲区的作用;
4.有助于单一或多个系统平台上的应用集成。
MSMQ:
有四种队列:发出队列、公共队列、专有队列、系统队列。
公共队列要向活动目录登记,专有队列不向活动目录注册,所以需要知道其机器名,消息队列名称等物理位置。
使用System.Messaging命名空间向MSMQ发送消息包含四个步骤:1.创建队列对象2.创建消息对象3.发送消息4.关闭队列
定位队列有两个方法,1是使用格式名,2是使用队列路径;作者建议我们尽量不要使用队列路径,因为其依赖于活动目录和域控制器。
从MSMQ获取消息包含三个步骤:1.创建队列对象2.获取消息3.关闭队列
Receive()方法会在队列中无消息时阻塞当前线程,可以使用一个时间参数指定线程的阻塞时间Receive(TimeSpan ts),超过时间后抛出异常。
如果我们发送了一个订单对象,但是接收时没有指定订单类型,那么取消息时会怎样?(尚未验证)
选项1.不会取到该消息
选项2.取到消息时异常
选项3.只是不能构造原对象,当然也无法按对象使用
MQSeries:
可在多种操作系统平台上使用,包括大型机平台。
.net 类库没有提供使用MQSeries的组件,我们可以通过包装MQSeries的COM来访问它。
MQAX200.dll 是MQSeries的COM动态链接库,它提供了编程所需的所有功能。
向MQSeries发送消息包括四个步骤:1.打开队列2.创建消息3.发送消息4.关闭队列
从MQSeries获取消息包括四个步骤:1.打开队列2.创建空消息3.获取消息4.关闭队列
MQSeries API提供的Get()方法即使队列为空也不会阻塞线程,而是以抛出异常的方式说明无消息。
以下代码是使用MSMQ发送接收消息的演示:
发送消息Form1
using
System;
using
System.Collections.Generic;
using
System.ComponentModel;
using
System.Data;
using
System.Drawing;
using
System.Text;
using
System.Windows.Forms;
using
System.Messaging;
namespace
MSMQSend
...
{ public partial class Form1 : Form ...{ public Form1() ...{ InitializeComponent(); } // 发送消息方法 public void Post(string label, object body) ...{ MessageQueue mq = null; try ...{ mq = new MessageQueue(@"formatname:direct=os:.private$yifengtest"); // 格式名方式 System.Messaging.Message message = new System.Messaging.Message(); message.Label = label; message.Body = body; // 可以是任何对象 mq.Send(message); // 发送消息 } finally ...{ mq.Close(); } } private void button1_Click(object sender, EventArgs e) ...{ Post(this.textBox1.Text, this.textBox2.Text); } }}
接收消息类Receive
using
System;
using
System.Collections.Generic;
using
System.Text;
using
System.Messaging;
using
System.Threading;
namespace
MSMQReceive
...
{ class Receive ...{ public delegate void NewMessageHandler(object[] objs); public event NewMessageHandler newMessage; // 取消息方法 private object[] ReceiveMessage() ...{ MessageQueue mq = null; try ...{ mq = new MessageQueue(@"formatname:direct=os:.private$yifengtest"); // 通过格式名访问消息队列 ((XmlMessageFormatter)mq.Formatter).TargetTypeNames = new string[1] ...{ typeof(string).ToString() }; // 能接收的对象类型 System.Messaging.Message message = mq.Receive(); // 获取消息 return new object[] ...{ message.Label, message.Body }; } finally ...{ mq.Close(); } } // 监听方法 public void loopReceive() ...{ while (true) ...{ object[] objs = ReceiveMessage(); newMessage(objs); // 取得消息后通过委托发布 } } }}
接收消息Form1
using
System;
using
System.Collections.Generic;
using
System.ComponentModel;
using
System.Data;
using
System.Drawing;
using
System.Text;
using
System.Windows.Forms;
using
System.Threading;
namespace
MSMQReceive
...
{ public partial class Form1 : Form ...{ Thread receiveThread = null; Receive rece = new Receive(); delegate void setText(string text); public Form1() ...{ InitializeComponent(); } private void Form1_Load(object sender, EventArgs e) ...{ rece.newMessage += new Receive.NewMessageHandler(rece_newMessage); // 订阅 ThreadStart start = new ThreadStart(rece.loopReceive); // 新线程 receiveThread = new Thread(start); receiveThread.Start(); // 启动新线程 } // 处理消息的方法 void rece_newMessage(object[] objs) ...{ string fm = formatMessage(objs); SetText(fm); } // 从线程中获取的消息显示到主界面上来 private void SetText(string text) ...{ if (this.textBox2.InvokeRequired) ...{ setText st = new setText(SetText); this.Invoke(st, new object[] ...{ text }); } else ...{ this.textBox2.Text += text; } } // 消息显示格式 private string formatMessage(object[] objs) ...{ if (objs.Length == 2) ...{ return string.Format("L:{0};B:{1} ", objs[0].ToString().PadRight(7), objs[1]); } return ""; } // 窗口关闭时关闭线程 private void Form1_FormClosing(object sender, FormClosingEventArgs e) ...{ receiveThread.Abort(); } }}