基于requestreply模式的MQ例子

    技术2022-05-11  150

    今天做了一个request/reply模式的MQ例子程序。

    一个程序把消息放在Q上,请求的程序会在把消息放到Q上之前设置replyToQueue和replyToQueueManager消息标题属性。而后它打开回复队列并等待correlationId匹配已发出请求消息的MessageId值的消息。

    另一个程序是在接受到消息后判断如果是请求消息的话,则生成回复消息并发到请求消息指定的消息队列上。它还将拷贝请求消息的MessageId到回复消息的correlationId消息标题字段上。

    程序的代码如下:

     

    import  com.ibm.mq. * ; /** * @author ralph  * TODO To change the template for this generated type comment go to Window - * Preferences - Java - Code Style - Code Templates */ public   class  Requester  public static void main(String[] args) {  try {   String hostName = "neu";   String channel = "ch_server";   String qManager = "QM_guo";   String requestQueue = "Q_request";   String replyToQueue = "Q_reply";   String replyToQueueManager = "QM_guo";   // set up the MQEnvironment properties for the client   MQEnvironment.hostname = hostName;   MQEnvironment.channel = channel;   MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,     MQC.TRANSPORT_MQSERIES);   MQEnvironment.CCSID = 1381;       // connetion to Q Manager   MQQueueManager qMgr = new MQQueueManager(qManager);   // set up the open options   int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;   // open the Q   MQQueue queue = qMgr.accessQueue(requestQueue, openOptions, null,     nullnull);   // set the put message options,will use the default settings   MQPutMessageOptions pmo = new MQPutMessageOptions();   pmo.options = pmo.options + MQC.MQPMO_NEW_MSG_ID;   pmo.options = pmo.options + MQC.MQPMO_SYNCPOINT;   // build a message and write data   MQMessage outMsg = new MQMessage();   outMsg.messageFlags = MQC.MQMT_REQUEST;   outMsg.replyToQueueManagerName = replyToQueueManager;   outMsg.replyToQueueName = replyToQueue;   // prepare message with the user data   String msgString = "The request message from requester program!";   outMsg.writeUTF(msgString);   // Now we put the message on the Q   queue.put(outMsg, pmo);   // commit the transaction   qMgr.commit();   System.out.println("The message has been sussesfully put #####");   // close the Q   queue.close();   // set up the open options   int openOptions2 = MQC.MQOO_INPUT_SHARED     | MQC.MQOO_FAIL_IF_QUIESCING;   // open the Q   MQQueue respQueue = qMgr.accessQueue(replyToQueue, openOptions2,     nullnullnull);   MQMessage respMessage = new MQMessage();   MQGetMessageOptions gmo = new MQGetMessageOptions();   gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;   gmo.options = gmo.options + MQC.MQGMO_WAIT;   gmo.matchOptions = MQC.MQMO_MATCH_CORREL_ID;   gmo.waitInterval = 10000;   respMessage.correlationId = outMsg.correlationId;   // get the response message   respQueue.get(respMessage, gmo);   String response = respMessage.readUTF();   System.out.println("The response message is:" + response);   qMgr.commit();   respQueue.close();   qMgr.disconnect();  } catch (MQException ex) {   System.out.println("Completion code is:" + ex.completionCode     + "  reason code is:" + ex.reasonCode);   ex.printStackTrace();  } catch (Exception e) {   e.printStackTrace();  } }}   /** * @author ralph * * TODO To change the template for this generated type comment go to * Window - Preferences - Java - Code Style - Code Templates */ import  com.ibm.mq. * ; public   class  Responder  public static void main(String[] args) {  try {   String hostName = "neu";   String channel = "ch_server";   String qManager = "QM_guo";   String qName = "Q_request";   // set up the MQEnvironment properties for the client   MQEnvironment.hostname = hostName;   MQEnvironment.channel = channel;   MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,     MQC.TRANSPORT_MQSERIES);   MQEnvironment.CCSID = 1381;      // connetion to Q Manager   MQQueueManager qMgr = new MQQueueManager(qManager);   // set up the open options   int openOptions = MQC.MQOO_INPUT_SHARED     | MQC.MQOO_FAIL_IF_QUIESCING;   // open the Q   MQQueue queue = qMgr.accessQueue(qName, openOptions, nullnull,     null);   // set the put message options   MQGetMessageOptions gmo = new MQGetMessageOptions();   gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;   gmo.options = gmo.options + MQC.MQGMO_WAIT;   gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;   gmo.waitInterval = 3000;   // build mssage   MQMessage inMsg = new MQMessage();   // get the message from Q   queue.get(inMsg, gmo);   // read the data from the message   String msgString = inMsg.readUTF();   System.out.println("The Message from Q is :" + msgString);   // check if message is of type request message and reply to the   // request   if (inMsg.messageFlags == MQC.MQMT_REQUEST) {    System.out.println("Praparing to reply to the request");    String replyQueueName = inMsg.replyToQueueName;    int openOptions2 = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;    MQQueue respQueue = qMgr      .accessQueue(replyQueueName, openOptions2,        inMsg.replyToQueueManagerName, nullnull);    MQMessage respMessage = new MQMessage();    respMessage.correlationId = inMsg.messageId;    MQPutMessageOptions pmo = new MQPutMessageOptions();    respMessage.messageFlags = MQC.MQMT_REPLY;    String response = "reply from responder program";    respMessage.writeUTF(response);    respQueue.put(respMessage, pmo);    System.out.println("The response sucessfully send!");    qMgr.commit();    respQueue.close();   }   queue.close();   qMgr.disconnect();  } catch (MQException ex) {   System.out.println("Completion code is:" + ex.completionCode     + "  reason code is:" + ex.reasonCode);   ex.printStackTrace();  } catch (Exception e) {   e.printStackTrace();  } }}

     


    最新回复(0)