MDB(message driven bean)收消息

    技术2022-05-20  50

    一、简述EJB2.0开始,引进了消息驱动的EJB,简称MDB(message driven bean)。当MOM收到消息时,能够自动传达给这种Bean。跟事件驱动一个道理。注:只是接收驱动。EJB2.1,MDB又得到了加强,能够处理非JMS的消息,如:MailMessage,SMSMessage,SOAPMessage。

    二、例子见上章提到的源码。(1)MDBQueueBean.java  这是EJB,它不需要其它什么 Home,Remote等接口。(2)ejb-jar.xml  EJB描述。跟其它类型的EJB的描述大不一样。(3)jboss.xml   JBOSS部署的描述。   需要部署的就以上三个文件。见myMDB.jar(4)MDBClient.java 这是一个客户端程序,用来发送信息的,执行后,服务端的MDB就能接收消息作出响应了。

    三、MDB的不同之处:在编程上,对比普通程序的JMS,MDB只是在初始化时少了定位消息目标和注册监听的语句而已:      QueueReceiver queueReceiver = queueSession.createReceiver("queue/testQueue");      queueReceiver.setMessageListener(this);

    感觉:版本这东西真麻烦。买了一本《J2EE应用开发详解》(电子工业出版社,飞思科技监制),里面的例子行不通,主要是配置文件,简直连编译也通不过。   摸了一天多,终于在jboss网站找到帮助文件:救星呀:http://docs.jboss.org/jbossas/admindevel326/html/ch6.chapt.html#d0e11155   不过,在我的环境中,我还得做一点点小修改。见源码。。。

     

    附:(1)MDBQueueBean.javapackage study.jms;

    import javax.ejb.MessageDrivenBean;import javax.jms.MessageListener;import javax.ejb.MessageDrivenContext;//import javax.ejb.CreateException;import javax.jms.Message;

    /**add by zengabo */import javax.ejb.EJBException;import javax.naming.InitialContext;import javax.naming.Context;import javax.naming.NamingException;import javax.jms.QueueConnectionFactory;import javax.jms.QueueConnection;import javax.jms.QueueSession;import java.util.HashMap;//import java.util.Hashtable;import javax.jms.JMSException;import javax.jms.Queue;import javax.jms.QueueSender;import javax.jms.TextMessage;

    public class MDBQueueBean    implements MessageDrivenBean, MessageListener {  MessageDrivenContext messageDrivenContext;

      /** add by zengabo */  private Context myContext;  private QueueConnectionFactory queueConnectionFactory;  private String queueConnectionFactoryName = "java:comp/env/jms/QCF";  private QueueConnection queueConnection;  private QueueSession queueSession;  private int i_count = 0 ;

      public MDBQueueBean(){

      }  public void ejbCreate() { //throws CreateException {    System.out.println(" ejbCreate() is time: "+Integer.toString(++i_count));    try{       init();    }catch (Exception e) {      System.out.println(" init QueueMDB error"+e.toString());      throw new EJBException(" init QueueMDB error", e);    } }  private void init() throws JMSException, NamingException {    myContext =  new InitialContext();    /** lookup 1 */    Object obj = myContext.lookup(queueConnectionFactoryName);    queueConnectionFactory = (QueueConnectionFactory) obj;    queueConnection = queueConnectionFactory.createQueueConnection();    queueConnection.start();    queueSession = queueConnection.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE);    /** in MDB , these are not use .      QueueReceiver queueReceiver =null;      queueReceiver = queueSession.createReceiver("queue/testQueue");      queueReceiver.setMessageListener(this);     *     * */  }

      public void ejbRemove() {      this.messageDrivenContext = null ;      try{      if (queueSession != null) {         queueSession.close();       }       if (queueConnection != null) {         queueConnection.close();       }     }catch (Exception e) {    }  }

      public void onMessage(Message message) {    if (message instanceof javax.jms.BytesMessage) {      javax.jms.BytesMessage bytesMessage = (javax.jms.BytesMessage) message;      /** @todo Process bytesMessage here */      System.out.println("**** get bytesMessage:") ;      System.out.println(bytesMessage.toString()) ;    }    else if (message instanceof javax.jms.MapMessage) {      javax.jms.MapMessage mapMessage = (javax.jms.MapMessage) message;      /** @todo Process mapMessage here */      System.out.println("**** get mapMessage:") ;      System.out.println(mapMessage.toString()) ;    }    else if (message instanceof javax.jms.ObjectMessage) {      javax.jms.ObjectMessage objectMessage = (javax.jms.ObjectMessage) message;      /** @todo Process objectMessage here */      System.out.println("**** get ObjectMessage:") ;      HashMap hm = null;      try {        hm = (HashMap) objectMessage.getObject();      }      catch (JMSException ex2) {        System.out.println(ex2.toString());      }      System.out.println(hm.get("name")) ;      System.out.println(hm.get("age")) ;      System.out.println(hm.get("sex")) ;    }    else if (message instanceof javax.jms.StreamMessage) {      javax.jms.StreamMessage streamMessage = (javax.jms.StreamMessage) message;      /** @todo Process streamMessage here */      System.out.println("**** get StreamMessage:") ;      System.out.println(streamMessage.toString()) ;    }    else if (message instanceof javax.jms.TextMessage) {      javax.jms.TextMessage objectMessage = (javax.jms.TextMessage) message;      /** @todo Process textMessage here */      String rev = "";      System.out.println("**** get textMessage:") ;      try {        rev =  objectMessage.getText();        System.out.println(rev);      }      catch (JMSException ex1) {        System.out.println(ex1.toString());      }    }    else{      System.out.println("**** Message is unknow type") ;      return ;    }    /** 回复 */    try{      Queue dest = (Queue) message.getJMSReplyTo();      sendReply(" jboss recv ok", dest);    }catch(Exception ee){      System.out.println("**** Reply Message Error"+ee.toString()) ;    }  }  private void sendReply(String text, Queue dest) throws JMSException {    QueueSender sender = queueSession.createSender(dest);    TextMessage tm = queueSession.createTextMessage(text);    sender.send(tm);    sender.close();  }

      public void setMessageDrivenContext(MessageDrivenContext messageDrivenContext) {    this.messageDrivenContext = messageDrivenContext;  }}

    (2)ejb-jar.xml<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE ejb-jar PUBLIC "-//Sun Microsystems, Inc.//DTD Enterprise JavaBeans 2.0//EN" "http://java.sun.com/dtd/ejb-jar_2_0.dtd"><ejb-jar>  <display-name>myMDB</display-name>  <enterprise-beans>    <message-driven>      <ejb-name>MDBQueue</ejb-name>      <ejb-class>study.jms.MDBQueueBean</ejb-class>      <transaction-type>Container</transaction-type>      <message-driven-destination>        <destination-type>javax.jms.Queue</destination-type>      </message-driven-destination>      <resource-ref>        <description>test mdb</description>        <res-ref-name>jms/QCF</res-ref-name>        <res-type>javax.jms.QueueConnectionFactory</res-type>        <res-auth>Container</res-auth>      </resource-ref>    </message-driven>  </enterprise-beans>  <assembly-descriptor>    <container-transaction>      <method>        <ejb-name>MDBQueue</ejb-name>        <method-name>*</method-name>      </method>      <trans-attribute>Required</trans-attribute>    </container-transaction>  </assembly-descriptor></ejb-jar>

    (3)jboss.xml<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE jboss PUBLIC "-//JBoss//DTD JBOSS 3.2//EN" "http://www.jboss.org/j2ee/dtd/jboss_3_2.dtd"><jboss>  <enterprise-beans>    <message-driven>      <ejb-name>MDBQueue</ejb-name>      <destination-jndi-name>queue/B</destination-jndi-name>      <configuration-name>Standard Message Driven Bean</configuration-name>      <resource-ref>        <res-ref-name>jms/QCF</res-ref-name>        <jndi-name>ConnectionFactory</jndi-name>      </resource-ref>    </message-driven>  </enterprise-beans></jboss>

    (4)MDBClient.javapackage study.jms;

    import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.Queue;import javax.jms.QueueConnection;import javax.jms.QueueConnectionFactory;import javax.jms.QueueReceiver;import javax.jms.QueueSender;import javax.jms.QueueSession;import javax.jms.TextMessage;import javax.naming.InitialContext;import javax.naming.NamingException;

    import EDU.oswego.cs.dl.util.concurrent.CountDown;

    import java.util.Hashtable;import javax.naming.Context;

    /** *  A complete JMS client example program that sends N TextMessages to *  a Queue B and asynchronously receives the messages as modified by *  TextMDB from Queue A. * *  @author Scott.Stark@jboss.org *  @version $Revision: 1.10 $ */public class MDBClient{    static final int N = 10;    static CountDown done = new CountDown(N);

        QueueConnection conn;    QueueSession session;    Queue queA;    Queue queB;

        public static class ExListener        implements MessageListener    {        public void onMessage(Message msg)        {            done.release();            TextMessage tm = (TextMessage) msg;            try {                System.out.println("onMessage, recv text="+tm.getText());            } catch(Throwable t) {                t.printStackTrace();            }        }    }

        public void setupPTP()        throws JMSException, NamingException    {//        InitialContext iniCtx =  new InitialContext();        InitialContext iniCtx =  getInitialContext();        Object tmp = iniCtx.lookup("ConnectionFactory"); //java:/XAConnectionFactory ConnectionFactory        QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;        conn = qcf.createQueueConnection();        queA = (Queue) iniCtx.lookup("queue/A");        queB = (Queue) iniCtx.lookup("queue/B");        session = conn.createQueueSession(false,                                          QueueSession.AUTO_ACKNOWLEDGE);        conn.start();    }    private InitialContext getInitialContext() throws NamingException {      Hashtable environment = new Hashtable();      environment.put(Context.INITIAL_CONTEXT_FACTORY,                      "org.jnp.interfaces.NamingContextFactory");      environment.put(Context.URL_PKG_PREFIXES,                      "org.jboss.naming:org.jnp.interfaces");      environment.put(Context.PROVIDER_URL, "jnp://localhost:1099");      return new InitialContext(environment);    }

        public void sendRecvAsync(String textBase)        throws JMSException, NamingException, InterruptedException    {        System.out.println("Begin sendRecvAsync");

            // Setup the PTP connection, session        setupPTP();

            // Set the async listener for queA        QueueReceiver recv = session.createReceiver(queA);        recv.setMessageListener(new ExListener());

            // Send a few text msgs to queB        QueueSender send = session.createSender(queB);

            for(int m = 0; m < 10; m ++) {            TextMessage tm = session.createTextMessage(textBase+"#"+m);            tm.setJMSReplyTo(queA);            send.send(tm);            System.out.println("sendRecvAsync, sent text="+tm.getText());        }        System.out.println("End sendRecvAsync");    }

        public void stop()        throws JMSException    {        conn.stop();        session.close();        conn.close();    }    public static void main(String args[])        throws Exception    {        System.out.println("Begin MDBClient,now=" +                           System.currentTimeMillis());        MDBClient client = new MDBClient();        client.sendRecvAsync("A text msg");        client.done.acquire();        client.stop();        System.exit(0);        System.out.println("End MDBClient");    }

    }转自http://zengabo.bokee.com/619210.html


    最新回复(0)