今天做了一个request/reply模式的MQ例子程序。
一个程序把消息放在Q上,请求的程序会在把消息放到Q上之前设置replyToQueue和replyToQueueManager消息标题属性。而后它打开回复队列并等待correlationId匹配已发出请求消息的MessageId值的消息。
另一个程序是在接受到消息后判断如果是请求消息的话,则生成回复消息并发到请求消息指定的消息队列上。它还将拷贝请求消息的MessageId到回复消息的correlationId消息标题字段上。
程序的代码如下:
import com.ibm.mq. * ; /** */ /** * @author ralph * TODO To change the template for this generated type comment go to Window - * Preferences - Java - Code Style - Code Templates */ public class Requester ... { public static void main(String[] args) ...{ try ...{ String hostName = "neu"; String channel = "ch_server"; String qManager = "QM_guo"; String requestQueue = "Q_request"; String replyToQueue = "Q_reply"; String replyToQueueManager = "QM_guo"; // set up the MQEnvironment properties for the client MQEnvironment.hostname = hostName; MQEnvironment.channel = channel; MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES); MQEnvironment.CCSID = 1381; // connetion to Q Manager MQQueueManager qMgr = new MQQueueManager(qManager); // set up the open options int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING; // open the Q MQQueue queue = qMgr.accessQueue(requestQueue, openOptions, null, null, null); // set the put message options,will use the default settings MQPutMessageOptions pmo = new MQPutMessageOptions(); pmo.options = pmo.options + MQC.MQPMO_NEW_MSG_ID; pmo.options = pmo.options + MQC.MQPMO_SYNCPOINT; // build a message and write data MQMessage outMsg = new MQMessage(); outMsg.messageFlags = MQC.MQMT_REQUEST; outMsg.replyToQueueManagerName = replyToQueueManager; outMsg.replyToQueueName = replyToQueue; // prepare message with the user data String msgString = "The request message from requester program!"; outMsg.writeUTF(msgString); // Now we put the message on the Q queue.put(outMsg, pmo); // commit the transaction qMgr.commit(); System.out.println("The message has been sussesfully put #####"); // close the Q queue.close(); // set up the open options int openOptions2 = MQC.MQOO_INPUT_SHARED | MQC.MQOO_FAIL_IF_QUIESCING; // open the Q MQQueue respQueue = qMgr.accessQueue(replyToQueue, openOptions2, null, null, null); MQMessage respMessage = new MQMessage(); MQGetMessageOptions gmo = new MQGetMessageOptions(); gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT; gmo.options = gmo.options + MQC.MQGMO_WAIT; gmo.matchOptions = MQC.MQMO_MATCH_CORREL_ID; gmo.waitInterval = 10000; respMessage.correlationId = outMsg.correlationId; // get the response message respQueue.get(respMessage, gmo); String response = respMessage.readUTF(); System.out.println("The response message is:" + response); qMgr.commit(); respQueue.close(); qMgr.disconnect(); } catch (MQException ex) ...{ System.out.println("Completion code is:" + ex.completionCode + " reason code is:" + ex.reasonCode); ex.printStackTrace(); } catch (Exception e) ...{ e.printStackTrace(); } }} /** */ /** * @author ralph * * TODO To change the template for this generated type comment go to * Window - Preferences - Java - Code Style - Code Templates */ import com.ibm.mq. * ; public class Responder ... { public static void main(String[] args) ...{ try ...{ String hostName = "neu"; String channel = "ch_server"; String qManager = "QM_guo"; String qName = "Q_request"; // set up the MQEnvironment properties for the client MQEnvironment.hostname = hostName; MQEnvironment.channel = channel; MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES); MQEnvironment.CCSID = 1381; // connetion to Q Manager MQQueueManager qMgr = new MQQueueManager(qManager); // set up the open options int openOptions = MQC.MQOO_INPUT_SHARED | MQC.MQOO_FAIL_IF_QUIESCING; // open the Q MQQueue queue = qMgr.accessQueue(qName, openOptions, null, null, null); // set the put message options MQGetMessageOptions gmo = new MQGetMessageOptions(); gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT; gmo.options = gmo.options + MQC.MQGMO_WAIT; gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING; gmo.waitInterval = 3000; // build mssage MQMessage inMsg = new MQMessage(); // get the message from Q queue.get(inMsg, gmo); // read the data from the message String msgString = inMsg.readUTF(); System.out.println("The Message from Q is :" + msgString); // check if message is of type request message and reply to the // request if (inMsg.messageFlags == MQC.MQMT_REQUEST) ...{ System.out.println("Praparing to reply to the request"); String replyQueueName = inMsg.replyToQueueName; int openOptions2 = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING; MQQueue respQueue = qMgr .accessQueue(replyQueueName, openOptions2, inMsg.replyToQueueManagerName, null, null); MQMessage respMessage = new MQMessage(); respMessage.correlationId = inMsg.messageId; MQPutMessageOptions pmo = new MQPutMessageOptions(); respMessage.messageFlags = MQC.MQMT_REPLY; String response = "reply from responder program"; respMessage.writeUTF(response); respQueue.put(respMessage, pmo); System.out.println("The response sucessfully send!"); qMgr.commit(); respQueue.close(); } queue.close(); qMgr.disconnect(); } catch (MQException ex) ...{ System.out.println("Completion code is:" + ex.completionCode + " reason code is:" + ex.reasonCode); ex.printStackTrace(); } catch (Exception e) ...{ e.printStackTrace(); } }}