转帖地址:http://www.cnblogs.com/artech/archive/2007/06/29/799529.html
一、为什么要使用MSMQ
在一个分布式的环境中,我们往往需要根据具体的 情况采用不同的方式进行数据的传输。比如在一个Intranet内,我们一般通过TCP进行高效的数据通信;而在一个Internet的环境中,我们则通 常使用Http进行跨平台的数据交换。而这些通信方式具有一个显著的特点,那就是他们是基于Connection的,也就是说,交互双方在进行通信的时候 必须保证有一个可用的Connection存在于他们之间。而在某些时候,比如那些使用拨号连接的用户、以及使用便携式计算机的用户,我们不能保证在他们 和需要访问的Server之间有一个的可靠的连接,在这种情况下,基于Messaging Queue的连接就显得尤为重要了。我们今天就来谈谈在WCF中如何使用MSMQ。
MSMQ不仅仅是作为支持客户端连接工具而存在,合理的使用MSMQ可以在很大程度上提升系统的Performance和Scalability。我们先来看看MSMQ能给我们带来怎样的好处:
1.MSMQ是基于Disconnection
MSMQ通过Message Queue进行通信,这种通信方式为离线工作 成为了可能。比如在介绍MSMQ时都会提到的Order Delivery的例子:在一个基于B2C的系统中,订单从各种各样的客户传来,由于 客户的各异性,不能保证每个客户在每时每刻都和用于接收订单的Server保持一个可靠的连接,我们有时候甚至允许客户即使在离线 的 情况下也可以递交订单(虽然订单不能发送到订单的接收方,但是我们可以通过某种机制保证先在本地保存该订单,一旦连接建立,则马上向接收方递交订单),而 MSMQ则有效地提供了这样的机制:Server端建立一个Message Queue来接收来个客户的订单,客户端通过向该Message Queue发送承载了订单数据的Message实现订单的递交。如果在客户离线的情况下,他仍然可以通过客户端程序进行订单递交的操作,存储着订单数据的 Message会被暂时保存在本地的Message Queue中,一旦客户联机,MSMQ将Message从中取出,发送到真正的接收方,而这个动作对于用户的透明的。
2.MSMQ天生是One-way、异步的
在MSMQ中,Message始终以One-way的方式进行发送,所以MSMQ具有天生的异步特性。所以MSMQ使用于那些对于用户的请求,Server端无需立即响应 的场景。也就是说Server对数据的处理无需和Client的数据的发送进行同步 ,它可以独自地按照自己的Schedule进行工作。这可以避免峰值负载。比如Server端可以在一个相对低负载的时段(比如深夜)来对接收到的Order进行批处理,而无需一天24小时一直进行Order的监听、接收和处理。
3.MSMQ能够提供高质量的Reliable Messaging
我们知道,在一般的情况下,如果Client端 以异步的方式对Service进行调用就意味着:Client无法获知Message是否成功抵达Service端;也不会获得Service端执行的结 果和出错信息。但是我们仍然说MSMQ为我们提供了可靠的传输(Reliable Messaging),这主要是因为MSMQ为我们提供一些列Reliable Messaging的机制:
超时机制(Timeout): 可以设置发送和接收的时间,超出该时间则被认为操作失败。 确认机制(Acknowledgement): 当Message成功抵达Destination Queue,或者被成功接收,向发送端发送一个Acknowledgement message用以确认操作的状态。 日志机制(Journaling): 当Message被发送或接收后,被Copy一份存放在Journal Queue中。此外,MSMQ还提供了死信队列 (Dead letter Queue)用以保存发送失败的message。这一切保证了保证了Reliable Messaging。
二、 MSMQ在WCF的运用
在WCF中,MSMQ提供的数据传输功能被封装在一个Binding 中,提供WCF Endpoint之间 、以及Endpoint和现有的基于MSMQ的Application进行通信的实现。为此WCF为我们提供了两种不同的built-in binding:
NetMsmqBinding : 从提供的功能和使用 方式上看,NetMsmqBinding和一般使用的binding,比如basicHttpBinding,netTcpBinding没有什么区别: 在两个Endpoint之间实现了数据的通信,所不同的是,它提供的是基于MSMQ的Reliable Messaging。从变成模式上看,和一般的binding完全一样。 MsmqIntegrationBinding : 从命名上我们可以看出,MsmqIntegrationBinding主要用于需要将我们的WCF Application和现有的基于MSMQ的Application集成的情况。MsmqIntegrationBinding实现了WCF Endpoint和某个Message Queue进行数据的通信,具体来说,就是实现了单一的向某个Message Queue 发送Message,和从某个Message Queue中接收Message的功能。从编程模式上看,也有所不同,比如Operation只接收一个 MsmqMessage<T> 的参数。这是Client和Service通信的图示:
三、MSMQ和Transaction
MSMQ提供对Transaction的支持。在一般的情况下,MSMQ通过Message Queue Transaction 实现对Transaction的原生的支持,借助Message Queue Transaction,可以把基于一个或多个Message Queue的相关操作纳入同一个Transaction中。
Message Queue Transaction仅仅限于基于Message Queue的操作,倘若操作涉及到另外一些资源,比如SQL Server, 则可以使用基于DTC的分布式Transaction 。
对于WCF中MSMQ,由于Client和Service的相对独立(可能Client发送Message到Service处理Message会相隔很长一段时间),所以Client和Service的操作只能纳入不同的Transaction中,如下图。 四、Sample1:NetMsmqBinding
我们首先做一个基于NetMsmqBinding Sample,实现的功能就是我们开篇所提出的Order Delivery。我们说过,NetMsmqBinding和一般的binding在实现的功能和变成模式上完全一样。下面是我们熟悉的4层结构:
1.Contract
DataContract:Order & OrderItem
using System; using System.Collections.Generic; using System.Text; using System.Runtime.Serialization; namespace Artech.QueuedService.Contract { [DataContract] [KnownType( typeof (OrderItem))] public class Order { Private Fields #region Private Fields private Guid _orderNo; private DateTime _orderDate; private Guid _supplierID; private string _supplierName; private IList < OrderItem > _orderItems; #endregion Constructors #region Constructors public Order() { this ._orderItems = new List < OrderItem > (); } public Order(Guid orderNo, DateTime orderDate, Guid supplierID, string supplierName) { this ._orderNo = orderNo; this ._orderDate = orderDate; this ._supplierID = supplierID; this ._supplierName = supplierName; this ._orderItems = new List < OrderItem > (); } #endregion Public Properties #region Public Properties [DataMember] public Guid OrderNo { get { return _orderNo; } set { _orderNo = value; } } [DataMember] public DateTime OrderDate { get { return _orderDate; } set { _orderDate = value; } } [DataMember] public Guid SupplierID { get { return _supplierID; } set { _supplierID = value; } } [DataMember] public string SupplierName { get { return _supplierName; } set { _supplierName = value; } } [DataMember] public IList < OrderItem > OrderItems { get { return _orderItems; } set { _orderItems = value; } } #endregion Public Methods #region Public Methods public override string ToString() { string description = string .Format( " General Informaion:/n/tOrder No./t: {0}/n/tOrder Date/t: {1}/n/tSupplier No./t: {2}/n/tSupplier Name/t: {3} " , this ._orderNo, this ._orderDate.ToString( " yyyy/MM/dd " ), this ._supplierID, this ._supplierName); StringBuilder productList = new StringBuilder(); productList.AppendLine( " /nProducts: " ); int index = 0 ; foreach (OrderItem item in this ._orderItems) { productList.AppendLine( string .Format( " /n{4}. /tNo./t/t: {0}/n/tName/t/t: {1}/n/tPrice/t/t: {2}/n/tQuantity/t: {3} " , item.ProductID, item.ProductName, item.UnitPrice, item.Quantity, ++ index)); } return description + productList.ToString(); } #endregion } } using System; using System.Collections.Generic; using System.Text; using System.Runtime.Serialization; namespace Artech.QueuedService.Contract { [DataContract] public class OrderItem { Private Fields #region Private Fields private Guid _productID; private string _productName; private decimal _unitPrice; private int _quantity; #endregion Constructors #region Constructors public OrderItem() { } public OrderItem(Guid productID, string productName, decimal unitPrice, int quantity) { this ._productID = productID; this ._productName = productName; this ._unitPrice = unitPrice; this ._quantity = quantity; } #endregion Public Properties #region Public Properties [DataMember] public Guid ProductID { get { return _productID; } set { _productID = value; } } [DataMember] public string ProductName { get { return _productName; } set { _productName = value; } } [DataMember] public decimal UnitPrice { get { return _unitPrice; } set { _unitPrice = value; } } [DataMember] public int Quantity { get { return _quantity; } set { _quantity = value; } } #endregion } }
ServiceContract: IOrderProcessor
using System; using System.Collections.Generic; using System.Text; using System.ServiceModel; namespace Artech.QueuedService.Contract { [ServiceContract] [ServiceKnownType( typeof (Order))] public interface IOrderProcessor { [OperationContract(IsOneWay = true )] void Submit(Order order); } }2.Service:IOrderProcessor :
using System; using System.Collections.Generic; using System.Text; using Artech.QueuedService.Contract; using System.ServiceModel; namespace Artech.QueuedService.Service { public class OrderProcessorService:IOrderProcessor { ISubmitOrder Members #region ISubmitOrder Members [OperationBehavior(TransactionScopeRequired = true , TransactionAutoComplete = true )] public void Submit(Order order) { Orders.Add(order); Console.WriteLine( " Receive an order. " ); Console.WriteLine(order.ToString()); } #endregion } } using System; using System.Collections.Generic; using System.Text; using Artech.QueuedService.Contract; namespace Artech.QueuedService.Service { public static class Orders { private static IDictionary < Guid, Order > _orderList = new Dictionary < Guid, Order > (); public static void Add(Order order) { _orderList.Add(order.OrderNo, order); } public static Order GetOrder(Guid orderNo) { return _orderList[orderNo]; } } }3.Hosting
Configuration
<? xml version="1.0" encoding="utf-8" ?> < configuration > < system .serviceModel > < bindings > < netMsmqBinding > < binding name ="msmqBinding" > < security > < transport msmqAuthenticationMode ="None" msmqProtectionLevel ="None" /> < message clientCredentialType ="None" /> </ security > </ binding > </ netMsmqBinding > </ bindings > < services > < service name ="Artech.QueuedService.Service. OrderProcessorService" > < endpoint address ="net.msmq://localhost/private/orders" binding ="netMsmqBinding" bindingConfiguration ="msmqBinding" contract ="Artech.QueuedService.Contract.IOrderProcessor" /> </ service > </ services > </ system.serviceModel > </ configuration >在默认的情况下,netMsmqBinding 的msmqAuthenticationMode 为WindowsDomain ,由于基于WindowsDomain必须安装AD ,利于在本机模拟,我把msmqAuthenticationMode改为None,相应的ProtectionLevel和clientCredentialType改为None。
Program:
using System; using System.Collections.Generic; using System.Text; using System.Messaging; using System.ServiceModel; using Artech.QueuedService.Service; namespace Artech.QueuedService.Hosting { class Program { static void Main( string [] args) { string path = @" ./private$/orders " ; if ( ! MessageQueue.Exists(path)) { MessageQueue.Create(path, true ); } using (ServiceHost host = new ServiceHost( typeof (OrderProcessorService))) { host.Opened += delegate { Console.WriteLine( " Service has begun to listen /n/n " ); } ; host.Open(); Console.Read(); } } } }在Host Service之前,通过MessageQueue.Create创建一个Message Queue,第二个参数为表明Queue是否支持Transaction的indicator,这里支持Transaction。
4.Client:
Configuration
<? xml version="1.0" encoding="utf-8" ?> < configuration > < system .serviceModel > < bindings > < netMsmqBinding > < binding name ="msmqBinding" > < security > < transport msmqAuthenticationMode ="None" msmqProtectionLevel ="None" /> < message clientCredentialType ="None" /> </ security > </ binding > </ netMsmqBinding > </ bindings > < client > < endpoint address ="net.msmq://localhost/private/orders" binding ="netMsmqBinding" bindingConfiguration ="msmqBinding" contract ="Artech.QueuedService.Contract.IOrderProcessor" name ="defaultEndpoint" /> </ client > </ system.serviceModel > </ configuration >Program
using System; using System.Collections.Generic; using System.Text; using Artech.QueuedService.Contract; using System.ServiceModel; using System.Transactions; namespace Artech.QueuedService.Client { class Program { static void Main( string [] args) { ChannelFactory < IOrderProcessor > channelFactory = new ChannelFactory < IOrderProcessor > ( " defaultEndpoint " ); IOrderProcessor channel = channelFactory.CreateChannel(); Order order = new Order(Guid.NewGuid(),DateTime.Today,Guid.NewGuid(), " A Company " ); order.OrderItems.Add( new OrderItem(Guid.NewGuid(), " PC " , 5000 , 20 )); order.OrderItems.Add( new OrderItem(Guid.NewGuid(), " Printer " , 7000 , 2 )); Console.WriteLine( " Submit order to server " ); using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required)) { channel.Submit(order); scope.Complete(); } Console.Read(); } } }先后运行Host和Client,Host端有下面的输出: