一、简述:从EJB 2.0开始,引进了消息驱动的EJB,简称MDB(Message-Driven Bean)。当MOM(面向消息的中间件)收到消息时,消息会被自动传递给这种Bean,与事件驱动是一个道理。注:MDB只负责接收消息,由消息的到达来驱动。到了EJB 2.1,MDB又得到了加强,能够处理非JMS的消息,如:MailMessage、SMSMessage、SOAPMessage。
二、例子见上章提到的源码。(1)MDBQueueBean.java 这是EJB,它不需要其它什么 Home,Remote等接口。(2)ejb-jar.xml EJB描述。跟其它类型的EJB的描述大不一样。(3)jboss.xml JBOSS部署的描述。 需要部署的就以上三个文件。见myMDB.jar(4)MDBClient.java 这是一个客户端程序,用来发送信息的,执行后,服务端的MDB就能接收消息作出响应了。
三、MDB的不同之处:在编程上,与普通的JMS程序相比,MDB只是在初始化时少了定位消息目标和注册监听器的语句而已(createReceiver的参数是通过JNDI查到的Queue对象,而不是字符串): Queue queue = (Queue) context.lookup("queue/testQueue"); QueueReceiver queueReceiver = queueSession.createReceiver(queue); queueReceiver.setMessageListener(this);
感觉:版本这东西真麻烦。买了一本《J2EE应用开发详解》(电子工业出版社,飞思科技监制),里面的例子行不通,主要是配置文件,简直连编译也通不过。 摸了一天多,终于在jboss网站找到帮助文件:救星呀:http://docs.jboss.org/jbossas/admindevel326/html/ch6.chapt.html#d0e11155 不过,在我的环境中,我还得做一点点小修改。见源码。。。
附:(1)MDBQueueBean.java
package study.jms;
import javax.ejb.MessageDrivenBean;import javax.jms.MessageListener;import javax.ejb.MessageDrivenContext;//import javax.ejb.CreateException;import javax.jms.Message;
/**add by zengabo */import javax.ejb.EJBException;import javax.naming.InitialContext;import javax.naming.Context;import javax.naming.NamingException;import javax.jms.QueueConnectionFactory;import javax.jms.QueueConnection;import javax.jms.QueueSession;import java.util.HashMap;//import java.util.Hashtable;import javax.jms.JMSException;import javax.jms.Queue;import javax.jms.QueueSender;import javax.jms.TextMessage;
public class MDBQueueBean implements MessageDrivenBean, MessageListener { MessageDrivenContext messageDrivenContext;
/** add by zengabo */ private Context myContext; private QueueConnectionFactory queueConnectionFactory; private String queueConnectionFactoryName = "java:comp/env/jms/QCF"; private QueueConnection queueConnection; private QueueSession queueSession; private int i_count = 0 ;
public MDBQueueBean(){
} public void ejbCreate() { //throws CreateException { System.out.println(" ejbCreate() is time: "+Integer.toString(++i_count)); try{ init(); }catch (Exception e) { System.out.println(" init QueueMDB error"+e.toString()); throw new EJBException(" init QueueMDB error", e); } } private void init() throws JMSException, NamingException { myContext = new InitialContext(); /** lookup 1 */ Object obj = myContext.lookup(queueConnectionFactoryName); queueConnectionFactory = (QueueConnectionFactory) obj; queueConnection = queueConnectionFactory.createQueueConnection(); queueConnection.start(); queueSession = queueConnection.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE); /** in MDB , these are not use . QueueReceiver queueReceiver =null; queueReceiver = queueSession.createReceiver("queue/testQueue"); queueReceiver.setMessageListener(this); * * */ }
public void ejbRemove() { this.messageDrivenContext = null ; try{ if (queueSession != null) { queueSession.close(); } if (queueConnection != null) { queueConnection.close(); } }catch (Exception e) { } }
public void onMessage(Message message) { if (message instanceof javax.jms.BytesMessage) { javax.jms.BytesMessage bytesMessage = (javax.jms.BytesMessage) message; /** @todo Process bytesMessage here */ System.out.println("**** get bytesMessage:") ; System.out.println(bytesMessage.toString()) ; } else if (message instanceof javax.jms.MapMessage) { javax.jms.MapMessage mapMessage = (javax.jms.MapMessage) message; /** @todo Process mapMessage here */ System.out.println("**** get mapMessage:") ; System.out.println(mapMessage.toString()) ; } else if (message instanceof javax.jms.ObjectMessage) { javax.jms.ObjectMessage objectMessage = (javax.jms.ObjectMessage) message; /** @todo Process objectMessage here */ System.out.println("**** get ObjectMessage:") ; HashMap hm = null; try { hm = (HashMap) objectMessage.getObject(); } catch (JMSException ex2) { System.out.println(ex2.toString()); } System.out.println(hm.get("name")) ; System.out.println(hm.get("age")) ; System.out.println(hm.get("sex")) ; } else if (message instanceof javax.jms.StreamMessage) { javax.jms.StreamMessage streamMessage = (javax.jms.StreamMessage) message; /** @todo Process streamMessage here */ System.out.println("**** get StreamMessage:") ; System.out.println(streamMessage.toString()) ; } else if (message instanceof javax.jms.TextMessage) { javax.jms.TextMessage objectMessage = (javax.jms.TextMessage) message; /** @todo Process textMessage here */ String rev = ""; System.out.println("**** get textMessage:") ; try { rev = objectMessage.getText(); System.out.println(rev); } catch (JMSException ex1) { System.out.println(ex1.toString()); } } else{ System.out.println("**** Message is unknow type") ; return ; } /** 回复 */ try{ Queue dest = (Queue) message.getJMSReplyTo(); sendReply(" jboss recv ok", dest); }catch(Exception ee){ System.out.println("**** Reply Message Error"+ee.toString()) ; } } private void sendReply(String text, Queue dest) throws JMSException { QueueSender sender = 
queueSession.createSender(dest); TextMessage tm = queueSession.createTextMessage(text); sender.send(tm); sender.close(); }
public void setMessageDrivenContext(MessageDrivenContext messageDrivenContext) { this.messageDrivenContext = messageDrivenContext; }}
(2)ejb-jar.xml<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE ejb-jar PUBLIC "-//Sun Microsystems, Inc.//DTD Enterprise JavaBeans 2.0//EN" "http://java.sun.com/dtd/ejb-jar_2_0.dtd"><!-- EJB 2.0 deployment descriptor: declares the message-driven bean MDBQueue (class study.jms.MDBQueueBean) with container-managed transactions, a javax.jms.Queue destination type, and a resource reference jms/QCF of type javax.jms.QueueConnectionFactory; the assembly descriptor applies the Required transaction attribute to all of the bean's methods. --><ejb-jar> <display-name>myMDB</display-name> <enterprise-beans> <message-driven> <ejb-name>MDBQueue</ejb-name> <ejb-class>study.jms.MDBQueueBean</ejb-class> <transaction-type>Container</transaction-type> <message-driven-destination> <destination-type>javax.jms.Queue</destination-type> </message-driven-destination> <resource-ref> <description>test mdb</description> <res-ref-name>jms/QCF</res-ref-name> <res-type>javax.jms.QueueConnectionFactory</res-type> <res-auth>Container</res-auth> </resource-ref> </message-driven> </enterprise-beans> <assembly-descriptor> <container-transaction> <method> <ejb-name>MDBQueue</ejb-name> <method-name>*</method-name> </method> <trans-attribute>Required</trans-attribute> </container-transaction> </assembly-descriptor></ejb-jar>
(3)jboss.xml<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE jboss PUBLIC "-//JBoss//DTD JBOSS 3.2//EN" "http://www.jboss.org/j2ee/dtd/jboss_3_2.dtd"><!-- JBoss-specific descriptor: gives the bean MDBQueue the destination JNDI name queue/B, selects the "Standard Message Driven Bean" configuration, and maps the resource reference jms/QCF to the JNDI name ConnectionFactory. --><jboss> <enterprise-beans> <message-driven> <ejb-name>MDBQueue</ejb-name> <destination-jndi-name>queue/B</destination-jndi-name> <configuration-name>Standard Message Driven Bean</configuration-name> <resource-ref> <res-ref-name>jms/QCF</res-ref-name> <jndi-name>ConnectionFactory</jndi-name> </resource-ref> </message-driven> </enterprise-beans></jboss>
(4)MDBClient.java
package study.jms;
import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.Queue;import javax.jms.QueueConnection;import javax.jms.QueueConnectionFactory;import javax.jms.QueueReceiver;import javax.jms.QueueSender;import javax.jms.QueueSession;import javax.jms.TextMessage;import javax.naming.InitialContext;import javax.naming.NamingException;
import EDU.oswego.cs.dl.util.concurrent.CountDown;
import java.util.Hashtable;import javax.naming.Context;
/** * A complete JMS client example program that sends N TextMessages to * a Queue B and asynchronously receives the messages as modified by * TextMDB from Queue A. * * @author Scott.Stark@jboss.org * @version $Revision: 1.10 $ */public class MDBClient{ static final int N = 10; static CountDown done = new CountDown(N);
QueueConnection conn; QueueSession session; Queue queA; Queue queB;
public static class ExListener implements MessageListener { public void onMessage(Message msg) { done.release(); TextMessage tm = (TextMessage) msg; try { System.out.println("onMessage, recv text="+tm.getText()); } catch(Throwable t) { t.printStackTrace(); } } }
public void setupPTP() throws JMSException, NamingException {// InitialContext iniCtx = new InitialContext(); InitialContext iniCtx = getInitialContext(); Object tmp = iniCtx.lookup("ConnectionFactory"); //java:/XAConnectionFactory ConnectionFactory QueueConnectionFactory qcf = (QueueConnectionFactory) tmp; conn = qcf.createQueueConnection(); queA = (Queue) iniCtx.lookup("queue/A"); queB = (Queue) iniCtx.lookup("queue/B"); session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); conn.start(); } private InitialContext getInitialContext() throws NamingException { Hashtable environment = new Hashtable(); environment.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory"); environment.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces"); environment.put(Context.PROVIDER_URL, "jnp://localhost:1099"); return new InitialContext(environment); }
public void sendRecvAsync(String textBase) throws JMSException, NamingException, InterruptedException { System.out.println("Begin sendRecvAsync");
// Setup the PTP connection, session setupPTP();
// Set the async listener for queA QueueReceiver recv = session.createReceiver(queA); recv.setMessageListener(new ExListener());
// Send a few text msgs to queB QueueSender send = session.createSender(queB);
for(int m = 0; m < 10; m ++) { TextMessage tm = session.createTextMessage(textBase+"#"+m); tm.setJMSReplyTo(queA); send.send(tm); System.out.println("sendRecvAsync, sent text="+tm.getText()); } System.out.println("End sendRecvAsync"); }
public void stop() throws JMSException { conn.stop(); session.close(); conn.close(); } public static void main(String args[]) throws Exception { System.out.println("Begin MDBClient,now=" + System.currentTimeMillis()); MDBClient client = new MDBClient(); client.sendRecvAsync("A text msg"); client.done.acquire(); client.stop(); System.exit(0); System.out.println("End MDBClient"); }
}转自http://zengabo.bokee.com/619210.html