消息服务主要由三个角色组成:传送节点(End point)、接收节点、消息导向中间件(Message-Oriented Middleware, MOM)。传送节点又称之为消息生产者,接收节点又称为消息消费者。传送节点负责建立与传送消息,接收节点负责接收与处理消息,传送节点将消息送至MOM,接收节点则从MOM取得消息,MOM作为一个消息的交换场所,本身不处理消息,而提供消息送达保证、交易、容错、负载平衡等功能。MOM让传送节点与接收节点之间无需知道彼此的存在,MOM提供消息信道(Channel),传送节点仅需知道消息的目标信道,将消息送至信道上,接收节点也仅需知道至哪个信道取得消息并加以处理,传送节点与接收节点之间为松散耦合,任一方修改或更新,并不会影响另一方,除此之外,消息服务支持异步模式,消息传送之后无需等待,程序可以继续流程。消息的传送模式可以分为两种:出版-订阅(Publish-Subscribe)模式、点对点(Point-to-Point)模式。
出版-订阅模式在出版订阅模式下,传送节点为出版者,接收节点为订阅者,MOM提供的频道称之为主题(Topic),出版者将消息出版至主题上,订阅者订阅感兴趣的主题。每个主题可能会有多个订阅者,每个订阅者所收到的是消息的复本,基本上,每个消息只能被同一个订阅者接收一次。出版者也可以自行决定要将消息发送至哪一个主题。出版-订阅模式出版者采Push模式,只要主题上有新消息,订阅者将马上收到消息,发布消息时,订阅者不一定会在在线,一但订阅者上线,就会马上收到消息。
点对点模式在点对点模式下,MOM提供的频道称之为队列(Queue),传送节点称为传送者(Sender),接收节点称为接收者(Receiver),传送者将消息送至队列,接收者若要处理消息,则主动至队列上取得消息。点对点模式采用Pull模式,接收者要主动至队列查看取得消息,若消息发布至队列而尚未被接收者取出处理,则保存在队列中,接收者取得消息后,会发出一个ACK(Acknowledgment)给传送者,告知消息已取得。点对点模式下,队列会保留消息至接收者取出为主,因此队列亦扮演缓冲区的作用,多个接收者可以使用同一个队列,但队列中的每个消息只能被其中一个接收者取得,队列中的消息如何分配给接收者,由个别服务器决定。
java Message Service(JMS)是由Sun与MOM厂商所共同制定的接口,定义了消息的传送、接收、频道(Channel)、主题(Topic)、队列 (Queue)等标准接口,实作部份由厂商完成,Java开发人员只要学习标准API界面的使用,就可以利用各厂商的JMS支持系统来进行消息传送、接收 等处理(您可以想象JDBC与数据库厂商之间的关系)。
在JMS当中,频道是用Destination这个接口来定义,在 消息(Message)观念 中提及,消息服务有出版-订阅(Publish-Subscribe)与点对点(Point-to-Point)两种模式,而频道分为主题与队列,因此,Destination有两个子接口Topic与Queue来分别代表。而JMS中对于消息产生者、消息消费者的定义则分别为MessageProducer与MessageConsumer接口,而在两种模式下,分别有发布 者(Publisher)、传送者(Sender),以及订阅者(Subscriber)、接收者(Receiver),这分别都定义为MessageProducer的子接口TopicPublisher、QueueSender,以及MessageConsumer的子接口TopicSubscriber、QueueReceiver来定义。伺服端必须设定好ConnectionFactory以及Destination,JMS端点取得ConnectionFactory,使用其与伺服端建立联机,联机以Connection接口定义。根据两种模式的不同,ConnectionFactory有TopicConnectionFactory、QueueConnectionFactory两个子界面:而Connection有TopicConnection、QueueConnection两个子接口的定义:伺服端要事先设定好ConnectionFactory及Destination,并分别使用一个名称向JNDI注册,端点必须使用JNDI名称查找ConnectionFactory及Destination,以点对点模式为例:
Context context = new InitialContext();ConnectionFactory someFactory = (ConnectionFactory) context.lookup("jms/SomeQueueConnectionFactory");Destination someQueue = (Destination) context.lookup("jms/SomeQueue");
而后使用ConnectionFactory取得Connection对象,实际建立联机时,使用Connection建立Session对象,代表该次的会话:
Connection connection = someFactory.createConnection();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
createSession()时,第一个参数代表是否参与交易,第二个参数代表自动回复确认。若是消息产生者,则可以使用Session对象,指定要使用哪个Destination来建立MessageProducer的实例,例如:
MessageProducer producer = session.createProducer(someQueue);
在JMS中,定义有几种消息,它们都实作Message接口:
BytesMessageMapMessageObjectMessageStreamMessageTextMessageSession对象提供有createXXXMessage()等方法,以建立相对应的消息,以建立TextMessage并传送为例:
TextMessage message = sesion.createTextMessage();message.setText("Hello!JMS!");producer.send(message);
在消息中包括主体(Body),如以上的文字部份就是属于主体部份,消息还包括了标头(Header),您可以利用消息对象的setXXXSProperty()来设定,用getXXXProperty()来取得标头部份。在消息消费者部份,同样的要建立Session对象,之后使用Session建立MessageConsumer实例:
MessageConsumer consumer = session.createConsumer(someQueue);
在启动联机之后,可以使用receive()方法直接接收消息:
Message message = consumer.receive();
receive()是同步的,你可以指定其接收消息时等待的毫秒数,另一个方式是指定MessageListener,您可以实作其onMessage()方法,例如:
public class MessageListenerImpl implements MessageListener { public void onMessage(Message message) { .... }}
在启动联机之前,建立并指定MessageListener:
consumer.setMessageListener(new MessageListenerImpl());
当收到消息时,就会自动呼叫onMessage()方法,如此可以异步的方式来处理消息。
若要直接使用JMS API来进行消息传送与接收,基本上都会有一些固定的流程,例如一个JMS传送者可以如下撰写:
// 处理例外try { // 设定系统属性 System.setProperty("com.sun.appserv.iiop.endpoints", "127.0.0.1:3700"); // 设定 JNDI 属性 Properties properties = new Properties(); properties.setProperty(Context.PROVIDER_URL, "iiop://127.0.0.1:3700"); properties.setProperty(Context.INITIAL_CONTEXT_FACTORY, "com.sun.appserv.naming.S1ASCtxFactory"); // 建立 Context Context ctx = new InitialContext(properties); // 查找 ConnectionFactory ConnectionFactory connectionFactory = (ConnectionFactory) ctx.lookup("jms/HelloQueueFactory"); // 查找 Destination Queue queue = (Queue) ctx.lookup("jms/HelloQueue"); // 建立 Connection Connection connection = connectionFactory.createConnection(); // 建立 Session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 建立 MessageProducer MessageProducer messageProducer = session.createProducer(queue); // 建立消息并传送 for (int i = 0; i < 5; i++) { TextMessage message = session.createTextMessage("Message " + ": " + i); messageProducer.send(message); }} catch (Exception ex) { ex.printStackTrace();}
一个JMS接收者也是类似的流程,每次都要撰写重复的流程有些麻烦,您可以利用Spring所提供的JmsTemplate来简化程序的撰写。以下范例假设您在Glassfish上建立了ConnectionFactory,JNDI名称为"jms/HelloQueueFactory",以及一个Queue,JNDI名称为"jms/HelloQueue"。ConnectionFactory与Queue的建立,都可以交由Spring容器,之后您可以建立JmsTemplate,并将Spring容器中的 ConnectionFactory、Queue注入JmsTemplate的实例中,例如您可以这么撰写beans-config.xml:
beans-config.xml<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE beans PUBLIC "-//SPRING/DTD BEAN/EN" "http://www.springframework.org/dtd/spring-beans.dtd"> <beans> <bean id="jmsFactory" class="org.springframework.jndi.JndiObjectFactoryBean"> <property name="jndiName" value="jms/HelloQueueFactory" /> </bean> <bean id="myQueue" class="org.springframework.jndi.JndiObjectFactoryBean"> <property name="jndiName" value="jms/HelloQueue" /> </bean> <bean id="jmst" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory"> <ref bean="jmsFactory"/> </property> <property name="defaultDestination"> <ref bean="myQueue"/> </property> </bean></beans>
若要传送消息,则从容器中取得JmsTemplate的实例,并呼叫对应方法传送即可,例如:
MessageProvider.javapackage lab7.ex3;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;import org.springframework.jms.core.JmsTemplate;public class MessageProvider { public static void main(String[] args) { ApplicationContext context = new ClassPathXmlApplicationContext( "lab7/ex3/beans-config.xml"); JmsTemplate jmst = (JmsTemplate) context.getBean("jmst"); jmst.convertAndSend("Hello World!"); }}
JmsTemplate会自动为您建立Connection、Session、Message并进行传送,而消息的处理者可以如下撰写:
MessageConsumer.javapackage lab7.ex3;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;import org.springframework.jms.core.JmsTemplate;public class MessageConsumer { public static void main(String[] args) { ApplicationContext context = new ClassPathXmlApplicationContext( "lab7/ex3/beans-config.xml"); JmsTemplate jmst = (JmsTemplate) context.getBean("jmst"); System.out.println(jmst.receiveAndConvert()); }}
消息传送者传送一个"Hello World!",所以您会在消息接收之后,于控制台下看到显示"Hello World!"。