EJB整理3-消息驱动Bean(MDB)

    技术2022-06-12  41

    1 消息驱动Bean(MDB)

    1.1 JMS(Java Message Service)

    1.1.1 JMS概念

    JMSjava的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输,JMS支持两种消息模型:Point-to- PointPublish/Subscribepub/sub),即点对点和发布/订阅模式。这两种消息传递模型非常相似,但有以下区别:

     

    PTP  消息传递模型规定了一条消息只能传递给一个接收方。Pub/sub  消息传递模型允许一条消息传递给多个接收方。

     

    消息传递系统的中心就是消息。一条  Message  由三个部分组成:

    头(header)每条  JMS  消息都必须具有消息头。头字段包含用于路由和识别消息的值。可以通过多种方式来设置消息头的值:   由  JMS  提供者在生成或传送消息的过程中自动设置由生产者客户机通过在创建消息生产者时指定的设置进行设置由生产者客户机逐一对各条消息进行设置 属性(property)消息可以包含称作属性的可选头字段。它们是以属性名和属性值对的形式指定的。可以将属性视为消息头的扩展,其中可以包括以下信息:创建数据的进程、数据的创建时间以及每条数据的结构。JMS  提供者也可以添加影响消息处理的属性,如是否应压缩消息或如何在消息生命周期结束时废弃消息。主体(body)包含要发送给接收应用程序的内容。每个消息接口特定于它所支持的内容类型。JMS  为不同类型的内容提供了它们各自的消息类型,但是所有消息都派生自 Message  接口。   StreamMessage:一种主体中包含 Java  基元值流的消息。其填充和读取均按顺序进行。MapMessage:一种主体中包含一组名-值对的消息。没有定义条目顺序。TextMessage:一种主体中包含  Java  字符串的消息(例如,XML  消息)。ObjectMessage:一种主体中包含序列化  Java  对象的消息。BytesMessage:一种主体中包含连续字节流的消息。
    1.1.2 消息的传递模型

    Ø 点对点消息传递

    通过点对点 (PTP) 的消息传递模型,一个应用程序可以向另一个应用程序发送消息。在此传递模型中,目标类型是队列。消息首先被传送至队列目标,然后从该队列将消息传送至对此队列进行监听的某个消费者,如下图:

    一个队列可以关联多个队列发送方和接收方,但一条消息仅传递给一个接收方。如果多个接收方正在监听队列上的消息,JMS Provider 将根据“先来者优先”的原则确定由哪个接收方接收下一条消息。如果没有接收方在监听队列,消息将保留在队列中,直至接收方连接到队列为止。这种消息传递模型是传统意义上的拉模型或轮询模型。在此类模型中,消息不是自动推送给客户端的,而是要由客户端从队列中请求获得。

    Ø 发布/订阅消息传递

    通过发布/订阅 (pub/sub) 消息传递模型,应用程序能够将一条消息发送到多个接收方。在此传送模型中,目标类型是主题。消息首先被传送至主题目标,然后传送至所有已订阅此主题的活动消费者。 如下图:

    主题目标也支持长期订阅。长期订阅表示消费者已注册了主题目标,但在消息到达目标时该消费者可以处于非活动状态。当消费者再次处于活动状态时,将会接收该消息。如果消费者均没有注册某个主题目标,该主题只保留注册了长期订阅的非活动消费者的消息。与 PTP 消息传递模型不同,pub/sub 消息传递模型允许多个主题订阅者接收同一条消息。JMS 一直保留消息,直至所有主题订阅者都收到消息为止。pub/sub 消息传递模型基本上是一个推模型。在该模型中,消息会自动广播,消费者无须通过主动请求或轮询主题的方式来获得新的消息。

    1.1.3 消息的消费

    在JMS中,消息的产生和消费时异步的。对于消费来说,JMS的消费者可以通过两张方式来消费消息:

    同步 – 订阅者或接收者调用receive方法来接受消息,receive方法在能够接收到消息之前(或超时之前)将一直阻塞。异步 – 订阅者或接收者可以注册为一个消息监听器,当消息到达之后,系统自动调用监听器的onMessage方法。对客户端来说,MDB局势异步消息的消费者。当消息到达之后,由容器负责调用MDB,客户端发送消息到destination,MDB作为一个MessageListener接收消息。

    一般而言,异步消息消费者的执行和伸缩性都优于同步消息接收者,体现在:

    1.异步消息接收者创建的网络流量比较小。单向推动消息,并使之通过管道进入消息监听器。管道操作支

    持将多条消息聚合为一个网络调用。

    2.异步消息接收者使用的线程比较少。异步消息接收者在不活动期间不使用线程。同步消息接收者在接收调用期间内使用线程。结果,线程可能会长时间保持空闲,尤其是如果该调用中指定了阻塞超时。

    3.对于服务器上运行的应用程序代码,使用异步消息接收者几乎总是最佳选择,尤其是通过消息驱动 Bean。使用异步消息接收者可以防止应用程序代码在服务器上执行阻塞操作。而阻塞操作会使服务器端线程空闲,甚至会导致死锁。阻塞操作使用所有线程时则发生死锁。如果没有空余的线程可以处理阻塞操作自身解锁所需的操作,则该操作永远无法停止阻塞。

    1.1.4 JMS编程模式

    Ø ConnectionFactory:创建Connection对象的工厂,针对两种不同的JMS消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两张,可以通过JNDI来查找ConnectionFactory对象。

    Ø Destination:消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic),对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源),所以,Destination实际上就是两种类型的对象:Queue、Topic。可以通过JNDI来查找Destination。

    Ø Connection:表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。

    Ø Session:是我们操作消息的接口,可以通过session创建生产者、消费者、消息。Session提供了事务的功能。当我们需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。

    Ø 消息生产者:由Session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者(send或publish方法)发送消息!

    Ø 消息消费者:由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscribe。可分别通过session的createReceiber(Queue)或createSubscribe(Topic)来创建。当然,也可以通过session的createDurableSubscribe方法来创建持久化的订阅者。

    Ø MessageListener:消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的MDB就是一个MessageListener。

    1.2 MDB

    消息驱动 Bean(MDB)是设计用来专门处理基于消息请求的组件。MDB 负责处理消息,而EJB容器则负责处理服务(事务、安全、资源、并发、消息确认,等等),使 bean 开发者把精力集中在处理消息的业务逻辑上。如果你不使用 MDB,则必须编写一部分这些服务。MDB 像一个没有 local 和 remote 接口的无状态 Session Bean,它和无状态 Session Bean 一样也使用了实例池机制,容器可以为它创建大量的实例,用来并发处理成百上千个 JMS 消息。正因为 MDB 具有处理大量并发消息的能力,所以非常适合应用在一些消息网关产品。

    一个 MDB 通常要实现 MessageListener 接口,该接口定义了 onMessage()方法。Bean 通过它来处理收到的 JMS 消息。

    当容器检测到 bean 守候的管道有消息到达时,容器调用 onMessage()方法,将消息作为参数传入 MDB。MDB 在onMessage()中决定如何处理该消息。你可以使用注释指定 MDB 监听哪一个目标地址(Destination)。当 MDB 部署时,容器将读取其中的配置信息。

    如果一个业务执行的时间很长,而执行结果无需实时向用户反馈时,也很适合使用 MDB。如订单成功后给用户发送一封电子邮件或发送一条短信等。

    1.2.1 Queue消息的发送与接收(PTP消息传递模型)

    MDB例子代码如下:

    @MessageDriven( activationConfig = { @ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"), @ActivationConfigProperty(propertyName="destination", propertyValue="queue/myqueue") } ) public class MyMDBBean implements MessageListener { public void onMessage(Message msg) { try { TextMessage textMessage = (TextMessage)msg; System.out.println("MyMDBBean被调用了!【"+textMessage.getText()+"】"); } catch (JMSException e) { e.printStackTrace(); } } }  

    客户端例子代码:

    InitialContext ctx = new InitialContext(); //获取ConnectionFactory对象 QueueConnectionFactory factory = (QueueConnectionFactory)ctx.lookup("ConnectionFactory"); //创建QueueConnection对象 QueueConnection connection = factory.createQueueConnection(); //创建QueueSession对象,第一个参数表示事务自动提交,第二个参数标识一旦消息被正确送达,将自动发回响应 QueueSession session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); //获得Destination对象 Queue queue = (Queue)ctx.lookup("queue/myqueue"); //创建文本消息 TextMessage msg = session.createTextMessage("世界,你好"); //创建发送者 QueueSender sender = session.createSender(queue); //发送消息 sender.send(msg); //关闭会话 session.close();  

    1.2.2 Topic消息的发送与接收(Pub/sub消息传递模型)

    MDB例子代码:

    @MessageDriven( activationConfig = { @ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Topic"), @ActivationConfigProperty(propertyName="destination", propertyValue="topic/mytopic") } ) public class MyTopicMDBBean implements MessageListener { public void onMessage(Message msg) { try { TextMessage textMessage = (TextMessage)msg; System.out.println("MyTopicMDBBean被调用了! 【"+textMessage.getText()+"】"); } catch (JMSException e) { e.printStackTrace(); } } }  

    客服端代码例子:

    InitialContext ctx = new InitialContext(); //获取ConnectionFactory对象 TopicConnectionFactory factory = (TopicConnectionFactory)ctx.lookup("ConnectionFactory"); //创建TopicConnection对象 TopicConnection connection = factory.createTopicConnection(); //创建TopicSession对象,第一个参数表示事务自动提交,第二个参数标识一旦消息被正确送达,将自动发回响应 TopicSession session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); //获得Destination对象 Topic topic = (Topic)ctx.lookup("topic/mytopic"); //创建文本消息 TextMessage msg = session.createTextMessage("世界,你好"); //创建发布者 TopicPublisher publisher = session.createPublisher(topic); //发送消息 publisher.publish(msg); //关闭会话 session.close();  

    1.2.3 消息选择器(Message selector)

    消息选择器允许 MDB 选择性地接收来自队列或主题特定的消息。消息选择器是基于消息属性进行选择的。消息属性是一种可以被附加于消息之上的头信息,开发人员可以通过它为消息附加一些信息,而这些信息不属于消息正文。Message 接口提供了若干属性读写的方法。属性值可以是 String 类型或某种基本数据类型(boolean, byte, short,int, long, float, double)。属性的命名、取值以及类型转换规则,都由 JMS 给出了严格的定义。

    当你的应用需要增加一种新业务,这种新业务需要在旧的消息格式上增加若干个参数。为了避免影响到其它业务模块,你决定增加新的 MDB 来处理新的业务消息,并且不希望新的业务消息被旧的业务模块所接收,这时你就需要使用到消息选择器。你可以为消息定义一个版本属性,规定旧消息使用 1.0 版本,新消息使用 2.0 版本。然后属于新业务的 MDB 只接收 2.0 版本的消息,旧业务的 MDB 只接收 1.0 版本的消息。具体操作如下:

    1) 在消息生产端,自定义一个消息版本属性MessageVersion,把它附加到消息属性里。如:

    InitialContext ctx = new InitialContext(); //获取ConnectionFactory对象 TopicConnectionFactory factory = (TopicConnectionFactory)ctx.lookup("ConnectionFactory"); //创建TopicConnection对象 TopicConnection connection = factory.createTopicConnection(); //创建TopicSession对象,第一个参数表示事务自动提交,第二个参数标识一旦消息被正确送达,将自动发回响应 TopicSession session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); //获得Destination对象 Topic topic = (Topic)ctx.lookup("topic/mytopic"); //创建文本消息 TextMessage msg = session.createTextMessage("世界,你好"); //消息版本属性MessageVersion msg.setStringProperty("MessageVersion", "2.0"); //创建发布者 TopicPublisher publisher = session.createPublisher(topic); //发送消息 publisher.publish(msg); //关闭会话 session.close();  

    2) 让处理新业务的MDB只接收2.0版本的消息,在@ActivationConfigProperty注释中,使用标准属性messageSelector来声明消息选择器。代码如下:

    @MessageDriven( activationConfig = { @ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Topic"), @ActivationConfigProperty(propertyName="destination",propertyValue="topic/mytopic"), @ActivationConfigProperty(propertyName="messageSelector",propertyValue="MessageVersion='2.0'") } ) public class MyTopicMDBSelectorBean implements MessageListener { public void onMessage(Message msg) { try { TextMessage textMessage = (TextMessage)msg; System.out.println("SelectorMDBBean被调用了! 【"+textMessage.getText()+"】"); } catch (JMSException e) { e.printStackTrace(); } } }  

    3) 让处理旧业务的MDB只接收1.0版本的消息代码如下:

    @MessageDriven( activationConfig = { @ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Topic"), @ActivationConfigProperty(propertyName="destination", propertyValue="topic/mytopic"), @ActivationConfigProperty(propertyName="messageSelector", propertyValue="MessageVersion='1.0'") } ) public class MyTopicMDBBean2 implements MessageListener { public void onMessage(Message msg) { try { TextMessage textMessage = (TextMessage)msg; System.out.println("OldMDBBean2被调用了! 【"+textMessage.getText()+"】"); } catch (JMSException e) { e.printStackTrace(); } } }  

     


    最新回复(0)