我的WCF之旅(13):创建基于MSMQ的Responsive Service

    技术2022-06-30  74

    转帖地址:http://www.cnblogs.com/artech/archive/2007/07/01/802069.html

     

    一、One-way MEP V.S. Responsible Service

    我们知道MSMQ天生就具有异步的特性,它只能 以One-way的MEP(Message Exchange Pattern)进行通信。Client和Service之间采用One-way MEP的话就意味着Client调用Service之后立即返回,它无法获得Service的执行结果,也无法捕捉Service运行的 Exception。下图简单表述了基于MSMQ的WCF Service中Client和Service的交互。

    但 是在有些场景 中,这是无法容忍的。再拿我在上一篇文章的Order Delivery的例子来说。Client向Service提交了Order,却无法确认该Order是否被Service正确处理,这显然是不能接受 的。我们今天就来讨论一下,如何创建一个Responsive Service来解决这个问题:Client不再是对Service的执行情况一无所知,它可以获知Order是否被Service正确处理了。

    二、 Solution

    虽然我们的目的很简单:当Client向 Service递交了Order之后,能以某种方式获知Order的执行结果;对于Service端来说,在正确把Order从Message Queue中获取出来、并正确处理之后,能够向Order的递交者发送一个Acknowledge Message。为了简单起见,这个Acknowledge Message包含两组信息:

    Order No.: 被处理的Order的一个能够为一标志它的ID。 Exception: 如果处理失败的Exception,如果成功处理为null。

    要在WCF中实现这样的目的,对于 Request/Reply MEP来说是简单而直接的:Client向Service递交Order,并等待Service的Response,Service在处理接收到 Order之后直接将处理结果 返回给Client就可以了。但是我们说过MSMQ天生就是异步的,我们只有采取一种间接的方式实现“曲线救国”。

    我们的解决方案是:在每个Client Domain也创建一个基于MSMQ的本地的WCF Service,用于接收来自Order处理端发送的Acknowledge Message。对于处理Order 的Service来说,在正确处理Order之后,想对应的Client发送Acknowledge Message。下图简单演示整个过程:

    三、Implementation

    了解了上面的Solution之后,我们来看看 该Solution在真正实现过程中有什么样的困难。对于处理Order的Service来说,在向Client端发送Acknowledge Message的时候,它必须要知道该Order对应的Client的Response Service的MSMQ的Address以及其他和Operation相关的Context信息(在这里我们不需要,不过考虑到扩展性,我们把包括了 address的Context的信息 封装到一个了Class中,在这里叫做:OrderResponseContext)。而这些Context却不能在Configuration中进行配 置,因为他可以同时面临着很多个Client:比如每个Client用于接收Response 的Message Queue的address都不一样。所以这个OrderResponseContext必须通过对应的Client来提供。基于此,我们具有两面两种解 决方式:

    方式一、修改Service Contract,把OrderResponseContext当成是Operation的一个参数

    这是我们最容易想到的,比如我们原来的Operation这样定义:

    namespace  Artech.ResponsiveQueuedService.Contract {     [ServiceContract]     [ServiceKnownType( typeof (Order))]      public   interface  IOrderProcessor      {         [OperationContract(IsOneWay  =   true )]          void  Submit(Order order);     } }

    现在变成:

    namespace  Artech.ResponsiveQueuedService.Contract {     [ServiceContract]     [ServiceKnownType( typeof (Order))]      public   interface  IOrderProcessor      {         [OperationContract(IsOneWay  =   true )]          void  Submit(Order order, OrderResponseContext responseContext);     } }

    虽然这种方式看起来不错,但是却不值得推荐。在 一般情况下,我们的Contract需要是很稳定的,一经确定就不能轻易更改,因为Contract是被交互的多方共同支持的,牵一发动全身;此外,从 Service Contract代表的是Service的一个Interface,他是对业务逻辑的抽象、和具体实现无关,而对于我们的例子来说,我们仅仅是定义一个递 交Order的Operation,从业务逻辑来看,OrderResponseContext和抽象的业务逻辑毫无关系。基于此,我们需要寻求一种和 Service Contract无关的解决方式:

    方式二、将OrderResponseContext放到Soap Message 的Header中

    其实我们要解决的问题很简单,就是要把 OrderResponseContext的信息置于Soap Message中发送到Service。而我们知道,Soap的Header具有极强的可伸缩性,原则上,我们可以把任何控制信息置于Header中。基 于WCF的编程模式很容易地帮助我们实现对Soap Header的插入和获取:

    我们可以通过下面的方式获得当前Operation Context的Incoming Message Headers和Outgoing Message Headers

    OperationContext.Current.IncomingMessageHeaders OperationContext.Current.OutgoingMessageHeaders

    如果我们要把一个OrderResponseContext 对象插入到当前Operation Context的Outgoing Message Headers中,我们可以通过下面的代码来实现:

    OrderResponseContext context  =   new  OrderResponseContext(); MessageHeader < OrderResponseContext >  header  =   new  MessageHeader < OrderResponseContext > ( context); OperationContext.Current.OutgoingMessageHeaders.Add(header.GetUntypedHeader( " name " " namespace " ));

    相应的,我们可以通过下面的代码从Outgoing Message Headers OrderResponseContext的数据获取的内容:

    OrderResponseContext context  =  OperationContext.Current.IncomingMessageHeaders.GetHeader < OrderResponseContext > ( " name " " namespace " ));

    四、Sample

    我们照例给出一个完整的Sample,下面是整个Solution的结构:

    除了一贯使用的4层结构(Contract-Service-Hosting-Client),还为ResponseService增加了下面两层:

    Localservice: 作为Client Domain的ResponseService。 LocalHosting:Host Localservice。

    1.Contract:  Artech.ResponsiveQueuedService.Contract

    Service Contract: Artech.ResponsiveQueuedService.Contract. IOrderProcessor

    using  System; using  System.Collections.Generic; using  System.Text; using  System.ServiceModel; namespace  Artech.ResponsiveQueuedService.Contract {     [ServiceContract]     [ServiceKnownType( typeof (Order))]      public   interface  IOrderProcessor      {         [OperationContract(IsOneWay  =   true )]          void  Submit(Order order);     } }

    Service Contract: Artech.ResponsiveQueuedService.Contract.IOrderRessponse

    using  System; using  System.Collections.Generic; using  System.Text; using  System.ServiceModel; namespace  Artech.ResponsiveQueuedService.Contract {     [ServiceContract]      public   interface   IOrderRessponse      {         [OperationContract(IsOneWay  = true )]          void  SubmitOrderResponse(Guid orderNo,FaultException exception);     } }

    接收来自Order processing端的Response:Order No.和Exception。

    Data Contract: Artech.ResponsiveQueuedService.Contract.Order

    using  System; using  System.Collections.Generic; using  System.Text; using  System.Runtime.Serialization; namespace  Artech.ResponsiveQueuedService.Contract {     [DataContract]      public   class  Order      {          Private Fields          Constructors          Public Properties          Public Methods     } }

    对Order的封装。

    Data Contract:Artech.ResponsiveQueuedService.Contract. OrderResponseContext

    using  System; using  System.Collections.Generic; using  System.Text; using  System.Runtime.Serialization; using  System.ServiceModel; namespace  Artech.ResponsiveQueuedService.Contract {         [DataContract]      public   class  OrderResponseContext      {          private  Uri _responseAddress;         [DataMember]          public  Uri ResponseAddress          {              get   return  _responseAddress; }              set   { _responseAddress  =  value; }         }          public   static  OrderResponseContext Current          {              get              {                  if  (OperationContext.Current  ==   null )                  {                      return   null ;                 }                  return  OperationContext.Current.IncomingMessageHeaders.GetHeader < OrderResponseContext > ( " OrderResponseContext " " Artech.ResponsiveQueuedService.Contract " );             }              set              {                 MessageHeader < OrderResponseContext >  header  =   new  MessageHeader < OrderResponseContext > (value);                 OperationContext.Current.OutgoingMessageHeaders.Add(header.GetUntypedHeader( " OrderResponseContext " " Artech.ResponsiveQueuedService.Contract " ));             }         }     } }

    ResponseAddress 代表Host在Client Domain的Response Service的Address。同过Current把OrderResponseContext插入到Outgoing Message Headers 中、以及从Ingoing Message Headers 取出OrderResponseContext对象。

    2.Order Processing Service:Artech.ResponsiveQueuedService.Service

    using  System; using  System.Collections.Generic; using  System.Text; using  Artech.ResponsiveQueuedService.Contract; using  System.ServiceModel; using  System.Net.Security; namespace  Artech.ResponsiveQueuedService.Service {      public   class  OrderProcessorService:IOrderProcessor      {          private   void  ProcessOrder(Order order)          {              if  (order.OrderDate  <  DateTime.Today)              {                  throw   new  Exception();             }         }          IOrderProcessor Members     } }

    在这里我们模拟了这样的场景:先通过Order Date判断Order是否过期,如果过期创建一个FaultException,否则正确处理该Order,然后通过 OrderResponseContext.Current从Incoming Message Header中获取封装在OrderResponseContext对象中的Response Address,创建Binding并调用Response Service.

    3. Order Processing Service Hosting: Artech.ResponsiveQueuedService.Hosting

    Configuration

    <? xml version="1.0" encoding="utf-8"  ?> < configuration >    < appSettings >      < add  key ="msmqPath"  value ="./private$/orderprocessor" />    </ appSettings >    < system .serviceModel >      < bindings >        < netMsmqBinding >          < binding  name ="MsmqBinding"  exactlyOnce ="false"  useActiveDirectory ="false" >            < security >              < transport  msmqAuthenticationMode ="None"  msmqProtectionLevel ="None"   />            </ security >          </ binding >        </ netMsmqBinding >      </ bindings >      < services >        < service  name ="Artech.ResponsiveQueuedService.Service.OrderProcessorService" >          < endpoint  address ="net.msmq://localhost/private/orderprocessor"  binding ="netMsmqBinding"             bindingConfiguration ="MsmqBinding"  contract ="Artech.ResponsiveQueuedService.Contract.IOrderProcessor"   />        </ service >      </ services >    </ system.serviceModel > </ configuration >

    Program

    using  System; using  System.Collections.Generic; using  System.Text; using  Artech.ResponsiveQueuedService.Service; using  System.ServiceModel; using  System.Configuration; using  System.Messaging; namespace  Artech.ResponsiveQueuedService.Hosting {      class  Program      {          static   void  Main( string [] args)          {              string  path  =  ConfigurationManager.AppSettings[ " msmqPath " ];              if  ( ! MessageQueue.Exists(path))              {                 MessageQueue.Create(path);             }              using  (ServiceHost host  =   new  ServiceHost( typeof (OrderProcessorService)))              {                 host.Opened  +=   delegate                  {                     Console.WriteLine( " The Order Processor service has begun to listen " );                 } ;                 host.Open();                 Console.Read();             }         }     } }

    4. Response Service: Artech.ResponsiveQueuedService.LocalService.OrderRessponseService

    using  System; using  System.Collections.Generic; using  System.Text; using  Artech.ResponsiveQueuedService.Contract; using  System.ServiceModel; namespace  Artech.ResponsiveQueuedService.LocalService {      public   class  OrderRessponseService : IOrderRessponse      {          IOrderRessponse Members     } }

    5. Response Service Hosting: Artech.ResponsiveQueuedService.LocalhHosting

    Configuration

    <? xml version="1.0" encoding="utf-8"  ?> < configuration >    < appSettings >      < add  key ="msmqPath"  value ="./private$/orderresponse" />    </ appSettings >    < system .serviceModel >      < bindings >        < netMsmqBinding >          < binding  name ="msmqBinding"  exactlyOnce ="false" >            < security >              < transport  msmqAuthenticationMode ="None"  msmqProtectionLevel ="None"   />            </ security >          </ binding >        </ netMsmqBinding >      </ bindings >      < services >        < service  name ="Artech.ResponsiveQueuedService.LocalService.OrderRessponseService" >          < endpoint  address ="net.msmq://localhost/private/orderresponse"  binding ="netMsmqBinding"             bindingConfiguration ="msmqBinding"  contract ="Artech.ResponsiveQueuedService.Contract.IOrderRessponse"   />        </ service >      </ services >    </ system.serviceModel > </ configuration >

    Program

    using  System; using  System.Collections.Generic; using  System.Text; using  Artech.ResponsiveQueuedService.LocalService; using  System.Configuration; using  System.ServiceModel; using  System.Messaging; namespace  Artech.ResponsiveQueuedService.LocalhHosting {      class  Program      {          static   void  Main( string [] args)          {              string  path  =  ConfigurationManager.AppSettings[ " msmqPath " ];              if  ( ! MessageQueue.Exists(path))              {                 MessageQueue.Create(path);             }              using  (ServiceHost host  =   new  ServiceHost( typeof (OrderRessponseService)))              {                 host.Opened  +=   delegate                  {                     Console.WriteLine( " The Order Response service has begun to listen " );                 } ;                 host.Open();                 Console.Read();             }         }     } }

    6. Client: Artech.ResponsiveQueuedService.Client

    Configuration:

    <? xml version="1.0" encoding="utf-8"  ?> < configuration >    < appSettings >      < add  key ="msmqPath"  value ="net.msmq://localhost/private/orderresponse" />    </ appSettings >    < system .serviceModel >      < bindings >        < netMsmqBinding >          < binding  name ="MsmqBinding"  exactlyOnce ="false"  useActiveDirectory ="false" >            < security >              < transport  msmqAuthenticationMode ="None"  msmqProtectionLevel ="None"   />            </ security >          </ binding >        </ netMsmqBinding >      </ bindings >      < client >        < endpoint  address ="net.msmq://localhost/private/orderprocessor"  binding ="netMsmqBinding"             bindingConfiguration ="MsmqBinding"  contract ="Artech.ResponsiveQueuedService.Contract.IOrderProcessor"  name ="defaultEndpoint"   />      </ client >    </ system.serviceModel > </ configuration >

    Program:

    using  System; using  System.Collections.Generic; using  System.Text; using  System.Configuration; using  System.ServiceModel; using  Artech.ResponsiveQueuedService.Contract; using  System.Messaging; namespace  Artech.ResponsiveQueuedService.Clinet {      class  Program      {          static   void  Main( string [] args)          {             Order order1  =   new  Order(Guid.NewGuid(), DateTime.Today.AddDays( 5 ), Guid.NewGuid(),  " Supplier A " );             Order order2  =   new  Order(Guid.NewGuid(), DateTime.Today.AddDays( - 5 ), Guid.NewGuid(),  " Supplier A " );              string  path  =  ConfigurationManager.AppSettings[ " msmqPath " ];             Uri address  =   new  Uri(path);             OrderResponseContext context  =   new  OrderResponseContext();             context.ResponseAddress  =  address;             ChannelFactory < IOrderProcessor >  channelFactory  =   new  ChannelFactory < IOrderProcessor > ( " defaultEndpoint " );             IOrderProcessor orderProcessor  =  channelFactory.CreateChannel();              using  (OperationContextScope contextScope  =   new  OperationContextScope(orderProcessor  as  IContextChannel))              {                 Console.WriteLine( " Submit the order of order No.: {0} " , order1.OrderNo);                 OrderResponseContext.Current  =  context;                 orderProcessor.Submit(order1);             }              using  (OperationContextScope contextScope  =   new  OperationContextScope(orderProcessor  as  IContextChannel))              {                 Console.WriteLine( " Submit the order of order No.: {0} " , order2.OrderNo);                 OrderResponseContext.Current  =  context;                 orderProcessor.Submit(order2);             }             Console.Read();         }     } }

    我创建了两个Order对象, 其中一个已经过期。从Configuration中取出Response Address并购建一个OrderResponseContext,然后分两次将这两个Order向Order Processing Service递交。在调用Order Processing Order的Operation Context Scope中,通过OrderResponseContext.Current将OrderResponseContext对象插入Outcoming Message Header中。

    我们现在运行一下整个程序,看看最终的输出结果:

    Client:

    Order Processing:

    Order Response: Reference: Build a Queued WCF Response Service


    最新回复(0)