webspere MQ

    技术2022-05-20  29

    最近做一个项目其中要用到websphere MQ 做数据传输。

    以下是我整理用到的主要代码,以备后用。

     

     

    package devx.articles.mqjms;

     

    /**  

     * @author Ace Sun  

     * @version 创建时间:2011-4-3  

     * 类说明  

     */  

    import java.io.BufferedReader;

    import java.io.File;

    import java.io.FileInputStream;

    import java.io.FileOutputStream;

    import java.io.IOException;

    import java.io.InputStream;

    import java.io.InputStreamReader;

     

    import com.ibm.mq.MQC;

    import com.ibm.mq.MQEnvironment;

    import com.ibm.mq.MQException;

    import com.ibm.mq.MQGetMessageOptions;

    import com.ibm.mq.MQMessage;

    import com.ibm.mq.MQPutMessageOptions;

    import com.ibm.mq.MQQueue;

    import com.ibm.mq.MQQueueManager;

     

    public class MessageByMQ{   

         //定义队列管理器和队列的名称   

         private static String qmName;    

         private static String qName;   

         private static MQQueueManager qMgr;   

         static{   

             //设置环境:   

             //MQEnvironment中包含控制MQQueueManager对象中的环境的构成的静态变量,MQEnvironment的值的设定会在MQQueueManager的构造函数加载的时候起作用,   

             //因此必须在建立MQQueueManager对象之前设定MQEnvironment中的值.   

             MQEnvironment.hostname="58.2.217.60";          //MQ服务器的IP地址72         

             MQEnvironment.channel= "CLIENT.QM_ORANGE";              //服务器连接的通道 reciver   

             MQEnvironment.CCSID=1381;                      //服务器MQ服务使用的编码1381代表GBK、1208代表UTF(Coded Character Set Identifier:CCSID)   

             MQEnvironment.port=1415;                       //MQ端口   send QM port

             qmName = "sendQM";                          //MQ的队列管理器名称   sendQM QM_ORANGE

             qName = "QM_APPLE";                               //MQ远程队列的名称   Q1 QM_APPLE

             try {   

                 //定义并初始化队列管理器对象并连接    

                 //MQQueueManager可以被多线程共享,但是从MQ获取信息的时候是同步的,任何时候只有一个线程可以和MQ通信。   

                qMgr = new MQQueueManager(qmName);   

            } catch (MQException e) {   

                // TODO Auto-generated catch block   

                System.out.println("初使化MQ出错");   

                e.printStackTrace();   

            }    

         }   

         /**  

          * 往MQ发送消息  

          * @param message  

          * @return  

          */  

         public static int sendMessage(String message){   

             int result=0;   

             try{      

                 //设置将要连接的队列属性   

                 // Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface    

                 //(except for completion code constants and error code constants).   

                 //MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default.   

                 //MQOO_OUTPUT:Open the queue to put messages.   

                 /*目标为远程队列,所有这里不可以用MQOO_INPUT_AS_Q_DEF属性*/  

                 //int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT;   

                 /*以下选项可适合远程队列与本地队列*/  

                 int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;   

                 //连接队列    

                 //MQQueue provides inquire, set, put and get operations for WebSphere MQ queues.    

                 //The inquire and set capabilities are inherited from MQManagedObject.    

                 /*关闭了就重新打开*/  

                if(qMgr==null || !qMgr.isConnected()){   

                    qMgr = new MQQueueManager(qmName);   

                }   

                 MQQueue queue = qMgr.accessQueue(qName, openOptions);             

                 //定义一个简单的消息   

                 MQMessage putMessage = new MQMessage();    

                 //将数据放入消息缓冲区   

                 putMessage.writeUTF(message);     

                 //设置写入消息的属性(默认属性)   

                 MQPutMessageOptions pmo = new MQPutMessageOptions();              

                 //将消息写入队列    

                 queue.put(putMessage,pmo);    

                 queue.close();   

             }catch (MQException ex) {    

                 System.out.println("A WebSphere MQ error occurred : Completion code "    

                 + ex.completionCode + " Reason code " + ex.reasonCode);    

                 ex.printStackTrace();   

             }catch (IOException ex) {    

                 System.out.println("An error occurred whilst writing to the message buffer: " + ex);    

             }catch(Exception ex){   

                 ex.printStackTrace();   

             }finally{   

                 try {   

                    qMgr.disconnect();   

                } catch (MQException e) {   

                    e.printStackTrace();   

                }   

              }   

             return result;   

         }   

         /**  

          * 从队列中去获取消息,如果队列中没有消息,就会发生异常,不过没有关系,有TRY...CATCH,如果是第三方程序调用方法,如果无返回则说明无消息  

          * 第三方可以将该方法放于一个无限循环的while(true){...}之中,不需要设置等待,因为在该方法内部在没有消息的时候会自动等待。  

          * @return  

          */  

         /**

         * @return

         */

        /**

         * @return

         */

        public static String getMessage(String fileName){   

             String message=null;   

             try{               

                 //设置将要连接的队列属性   

                 // Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface    

                 //(except for completion code constants and error code constants).   

                 //MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default.   

                 //MQOO_OUTPUT:Open the queue to put messages.   

                 int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT;   

                 MQMessage retrieve = new MQMessage();   

                 //设置取出消息的属性(默认属性)   

                 //Set the put message options.(设置放置消息选项)    

                 MQGetMessageOptions gmo = new MQGetMessageOptions();    

                 gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;//Get messages under sync point control(在同步点控制下获取消息)    

                 gmo.options = gmo.options + MQC.MQGMO_WAIT;  // Wait if no messages on the Queue(如果在队列上没有消息则等待)    

                 gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;// Fail if Qeue Manager Quiescing(如果队列管理器停顿则失败)    

                 gmo.waitInterval = 1000 ;  // Sets the time limit for the wait.(设置等待的毫秒时间限制)    

                 /*关闭了就重新打开*/  

                if(qMgr==null || !qMgr.isConnected()){   

                    qMgr = new MQQueueManager(qmName);   

                }   

                 MQQueue queue = qMgr.accessQueue(qName, openOptions);    

                 // 从队列中取出消息   

                 queue.get(retrieve, gmo);   

                 //message = retrieve.readUTF();     

                 String message2 = retrieve.readUTF();

     

                 //System.out.println("===ss===="+message2.byteValue());

                 //message = retrieve.readLine();

     

                 System.out.println("The message is: " + message2);  

     

                 /** 如果服务器上的文件是以BASE64Encoder编码形式创建的,则使用此段代码读取。

                 BASE64Decoder decoder = new BASE64Decoder();// 建立解码类实例

                 byte[] contentArray = decoder.decodeBuffer(message2);//解码生成byte数组

                 String path = "E://receivedMqXML//" + fileName;  

                 FileOutputStream out = new FileOutputStream(new File(path));//调动输出流把文件写到指定的位置  

                 out.write(contentArray, 0, contentArray.length);

                 out.close();

                 */

                 String path = "E://receivedMqXML//" + fileName;  

                 FileOutputStream out = new FileOutputStream(new File(path));//调动输出流把文件写到指定的位置

                 out.write(message2.getBytes());

                 out.close();

     

     

     

                 queue.close();   

             }catch (MQException ex) {   

             if(ex.completionCode==2&&ex.reasonCode==2033){

             System.out.println("There are no record from MQ server");

             }else{

             System.out.println("A WebSphere MQ error occurred : Completion code "    

             + ex.completionCode + " Reason code " + ex.reasonCode);

             }

             }catch (IOException ex) {    

                 System.out.println("An error occurred whilst writing to the message buffer: " + ex);    

             }catch(Exception ex){   

                 ex.printStackTrace();   

             }finally{   

                 try {   

                    qMgr.disconnect();   

                } catch (MQException e) {   

                    e.printStackTrace();   

                }   

             }   

             return message;   

         }   

         public static void main(String args[]) throws IOException {   

             /*下面两个方法可同时使用,也可以单独使用*/

         StringBuffer sb = new StringBuffer();

          

         String fileName = "offer_example.xml";//MyEmployee.xml

         /**BASE64Encoder编码形式传送

         InputStream in = new FileInputStream("E://sentXML//" + fileName); // 输入流读取要发送的文件

         BASE64Encoder encoder = new BASE64Encoder();// 创建BASE64Encoder编码实例

         byte[] data = new byte[in.available()];

         in.read(data);

         String stringFile = encoder.encode(data);

         sb.append(stringFile);

         sendMessage(sb.toString());

         */

     

         /**以下为String方式传送*/

         InputStream in = new FileInputStream("E://sentXML//" + fileName); // 输入流读取要发送的文件

         String line;

         String stringFile = "";

         if(in != null) {

        try{

        BufferedReader reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));

        while ((line = reader.readLine()) != null) {

        sb.append(line).append("/n");

        }

         

        }finally {

        in.close();

        }

        stringFile = sb.toString();

         } else {

         stringFile = "";

         }

     

     

         //sendMessage(stringFile);

          

         getMessage(fileName);   

         }   

    }


    最新回复(0)