package cn.edu.itec.mqclient; import java.io.File; import com.ibm.mq.MQC; import com.ibm.mq.MQEnvironment; import com.ibm.mq.MQException; import com.ibm.mq.MQMessage; import com.ibm.mq.MQPutMessageOptions; import com.ibm.mq.MQQueueManager; public class MQPut { private String HOST_URL = "192.168.1.116"; private String MQ_CHANNEL = "EXAMPLE.CHANNEL"; private String MQ_MANAGER = "QMGR"; private String MQ_QUEUE = "EXAMPLE.QUEUE"; private int MQ_PORT = 4001; public static void main(String args[]) { new MQPut().SendFile("f:/JMSExampleEJB.jar"); } public void SendFile(String sFilePath) { try { /* 设置MQEnvironment 属性以便客户机连接 */ MQEnvironment.hostname = HOST_URL; MQEnvironment.channel = MQ_CHANNEL; MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES); MQEnvironment.CCSID = 1381; MQEnvironment.port = MQ_PORT; /* 连接到队列管理器 */ MQQueueManager qMgr = new MQQueueManager(MQ_MANAGER); System.out.println("queue manager is connected!"); /* 设置打开选项以便打开用于输出的队列,如果队列管理器正在停止,我们也已设置了选项去应对不成功情况。 */ int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING; /* 打开队列 */ com.ibm.mq.MQQueue queue = qMgr.accessQueue(MQ_QUEUE, openOptions); /* 设置放置消息选项我们将使用默认设置 */ MQPutMessageOptions pmo = new MQPutMessageOptions(); /* 创建消息,MQMessage 类包含实际消息数据的数据缓冲区,和描述消息的所有MQMD 参数 */ /* 创建消息缓冲区 */ MQMessage outMsg = new MQMessage(); /* set the properties of the message fot the selector */ outMsg.correlationId = "clinet_B_receive".getBytes(); outMsg.messageId = "1Aa".getBytes(); /* write msg */ MsgWriter.readFile(outMsg, new File(sFilePath)); /* put message with default options */ queue.put(outMsg, new MQPutMessageOptions()); System.out.println("send file is success!"); /* release resource */ queue.close(); qMgr.disconnect(); } catch (MQException ex) { //System.out.println("fft!"); System.out.println("An MQ Error Occurred: Completion Code is :/t" + ex.completionCode + "/n/n The Reason Code is :/t" + ex.reasonCode); ex.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } 
private void readFileToMessage(String FilePath) { } } JMS message 和 MQ message有几个字段是相同的,这些字段的值将会在转换中保留。比较方便的是使用CorrelationID这个字段。通过设置这个字段,达到选择性的接收特定消息的功能。其它字段没有完全搞清楚,有的数据类型需要转换,例如MessageID(对应于JMSMessageID)。MQ 消息选择和JMS不同,后者采用selector,前者通过设置接收消息的属性完成。例如设置CorrelationID为特定值。 客户端B MQGet.java
package cn.edu.itec.mqclient;

import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Hashtable;

import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQQueueManager;

/**
 * Sample MQ client that waits for one message with a fixed correlation ID on
 * {@code EXAMPLE.SENDQUEUE}, deserializes its body as a {@link Hashtable} and
 * writes the {@code "content"} entry to a local file.
 *
 * @author Administrator
 */
public class MQGet {
    private static String HOST_URL = "192.168.1.116";

    private static String MQ_CHANNEL = "EXAMPLE.CHANNEL";

    private static String MQ_MANAGER = "QMGR";

    private static String MQ_QUEUE = "EXAMPLE.SENDQUEUE";

    private static int MQ_PORT = 4001;

    public static void main(String[] args) {
        MQGet.getMessage();
    }

    /**
     * Gets one message under syncpoint, saves its file content to
     * {@code f:/exampleReceive.jar} and commits.
     *
     * <p>If anything fails before the commit, the unit of work is backed out
     * explicitly so that disconnecting cannot implicitly commit the get. The
     * queue and the connection are always released. Errors are printed and
     * not propagated, except unchecked exceptions (for example a body that is
     * not a {@code Hashtable}), which still propagate after cleanup.
     */
    public static void getMessage() {
        MQQueueManager qMgr = null;
        com.ibm.mq.MQQueue queue = null;
        boolean committed = false;
        try {
            /* Set the (static, process-wide) MQEnvironment fields for a client connection. */
            MQEnvironment.hostname = HOST_URL;
            MQEnvironment.channel = MQ_CHANNEL;
            MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,
                    MQC.TRANSPORT_MQSERIES);
            /* NOTE(review): 1381 is presumably a Simplified Chinese code page - confirm it matches the queue manager. */
            MQEnvironment.CCSID = 1381;
            MQEnvironment.port = MQ_PORT;

            /* Connect to the queue manager. */
            qMgr = new MQQueueManager(MQ_MANAGER);
            System.out.println("queue manager is connected!");

            /* Open the queue for shared input; fail instead of blocking if the queue manager is quiescing. */
            int openOptions = MQC.MQOO_INPUT_SHARED
                    | MQC.MQOO_FAIL_IF_QUIESCING;
            queue = qMgr.accessQueue(MQ_QUEUE, openOptions);
            System.out.println("队列连接成功");

            /* Get-message options: under syncpoint, wait for a message, fail if quiescing. */
            MQGetMessageOptions gmo = new MQGetMessageOptions();
            gmo.options |= MQC.MQGMO_SYNCPOINT;
            gmo.options |= MQC.MQGMO_WAIT;
            gmo.options |= MQC.MQGMO_FAIL_IF_QUIESCING;
            /* Wait without a time limit. */
            gmo.waitInterval = MQC.MQWI_UNLIMITED;

            /* Message buffer; the correlation ID set here selects which message is returned. */
            MQMessage inMessage = new MQMessage();
            inMessage.correlationId = "clinet_B_receive".getBytes();

            queue.get(inMessage, gmo);
            System.out.println("get message success");

            /*
             * Read the serialized message object.
             * NOTE(review): this is Java-native deserialization of data taken
             * from a queue; only safe if every sender is trusted.
             */
            Hashtable messageObject = (Hashtable) inMessage.readObject();
            System.out.println(messageObject);

            /* Extract the file bytes; check before opening so a bad message does not truncate the target file. */
            byte[] content = (byte[]) messageObject.get("content");
            if (content == null) {
                throw new IOException("message has no \"content\" entry");
            }

            /* Save the file, closing the stream even if the write fails. */
            FileOutputStream output = new FileOutputStream(
                    "f:/exampleReceive.jar");
            try {
                output.write(content);
            } finally {
                output.close();
            }
            System.out.println(messageObject.get("FileName"));

            /* Commit the unit of work: confirms receipt so the server removes the message. */
            qMgr.commit();
            committed = true;
        } catch (MQException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } finally {
            release(queue, qMgr, committed);
        }
    }

    /**
     * Backs out the unit of work when it was not committed, then closes the
     * queue and disconnects. Each step is attempted independently and
     * failures are printed rather than propagated.
     */
    private static void release(com.ibm.mq.MQQueue queue,
            MQQueueManager qMgr, boolean committed) {
        if (qMgr != null && !committed) {
            try {
                qMgr.backout();
            } catch (MQException e) {
                e.printStackTrace();
            }
        }
        if (queue != null) {
            try {
                queue.close();
            } catch (MQException e) {
                e.printStackTrace();
            }
        }
        if (qMgr != null) {
            try {
                qMgr.disconnect();
            } catch (MQException e) {
                e.printStackTrace();
            }
        }
    }
}

MDB MQMDBBeanBean.java MDB文件
package ejbs;

import javax.jms.ObjectMessage;
import javax.jms.BytesMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.JMSException;

import ehub.ihub.exchangeManager.*;

import java.util.Hashtable;
import java.io.ByteArrayInputStream;
import java.io.FileOutputStream;
import java.io.File;
import java.io.ObjectInputStream;

/**
 * Bean implementation class for Enterprise Bean: MQMDBBean.
 *
 * <p>For a {@link BytesMessage} it logs the headers, hands the message to
 * {@code ReadMessage.readMessage} and then calls
 * {@code SendMessage.sendFileToReceiveQueue} with
 * {@code c:/receivedExample.jar}. Other message types are only logged.
 */
public class MQMDBBeanBean implements javax.ejb.MessageDrivenBean,
        javax.jms.MessageListener {
    private javax.ejb.MessageDrivenContext fMessageDrivenCtx;

    /**
     * getMessageDrivenContext
     */
    public javax.ejb.MessageDrivenContext getMessageDrivenContext() {
        return fMessageDrivenCtx;
    }

    /**
     * setMessageDrivenContext
     */
    public void setMessageDrivenContext(javax.ejb.MessageDrivenContext ctx) {
        fMessageDrivenCtx = ctx;
    }

    /**
     * ejbCreate
     */
    public void ejbCreate() {
    }

    /**
     * Logs the type of the incoming message and, for a {@link BytesMessage},
     * processes it and forwards the resulting file. Any exception marks the
     * transaction for rollback.
     *
     * @param msg the delivered JMS message
     */
    public void onMessage(javax.jms.Message msg) {
        try {
            System.out.println(msg.toString());
            if (msg instanceof TextMessage) {
                System.out.println("TextMessage");
            } else if (msg instanceof ObjectMessage) {
                System.out.println("ObjectMessage");
            } else if (msg instanceof StreamMessage) {
                System.out.println("StreamMessage");
            } else if (msg instanceof BytesMessage) {
                System.out.println("BytesMessage");
                BytesMessage bytesMessage = (BytesMessage) msg;

                /*
                 * The correlation ID is optional; guard against null so a
                 * message without one is not rolled back just because of the
                 * log statement.
                 */
                byte[] correlationBytes = bytesMessage
                        .getJMSCorrelationIDAsBytes();
                String sCorrelationID = (correlationBytes == null) ? null
                        : new String(correlationBytes);
                String sMessageID = bytesMessage.getJMSMessageID();
                long size = bytesMessage.getBodyLength();
                /* Real newline escapes; the original literals were "/n". */
                System.out.println("size=" + size + "\n CorrelationID="
                        + sCorrelationID + "\n MessageID=" + sMessageID);

                /* Read the message and save the file. */
                ReadMessage.readMessage(bytesMessage);
                System.out.println("read message success");

                /* Send the saved file on to the client. */
                SendMessage.sendFileToReceiveQueue(new File(
                        "c:/receivedExample.jar"));
                System.out.println("send file success");
            } else {
                System.out.println("no message");
            }
        } catch (Exception e) {
            System.out.println("onmessage 执行错误,回滚!");
            e.printStackTrace(System.err);
            fMessageDrivenCtx.setRollbackOnly();
        }
    }

    /** Empty placeholder; not called anywhere in this class. */
    private void getProperties(byte[] p) {
    }

    /**
     * ejbRemove
     */
    public void ejbRemove() {
    }
}

ReadMessage.java
/*
 * Created on 2006-2-15
 */
package ehub.ihub.exchangeManager;

import java.io.ByteArrayInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Hashtable;

import javax.jms.BytesMessage;
import javax.jms.JMSException;

/**
 * Extracts a file from a {@link BytesMessage} whose body is a serialized
 * {@link Hashtable}.
 *
 * @author Administrator
 */
public class ReadMessage {
    /**
     * Reads the whole message body, deserializes it as a {@link Hashtable}
     * and writes the byte array stored under {@code "content"} to
     * {@code c:/receivedExample.jar}.
     *
     * <p>{@link JMSException}, {@link IOException} and
     * {@link ClassNotFoundException} are caught and printed here; they are
     * NOT propagated, so a caller cannot tell from this method that saving
     * failed. NOTE(review): callers that act on the saved file may want this
     * to report failure - changing that would alter the method's contract.
     *
     * <p>NOTE(review): the body is deserialized with Java-native
     * serialization; only safe if every sender is trusted.
     *
     * @param Message the message to read; must be readable from the start of
     *                its body
     * @throws IllegalArgumentException if the body is larger than a Java
     *                                  array can hold
     */
    public static void readMessage(BytesMessage Message) {
        try {
            long bodySize = Message.getBodyLength();
            if (bodySize > Integer.MAX_VALUE) {
                throw new IllegalArgumentException(
                        "message body too large: " + bodySize + " bytes");
            }
            byte[] buf = new byte[(int) bodySize];
            /* Read the message body into the byte array. */
            Message.readBytes(buf);

            /* Deserialize the message object from the bytes. */
            Hashtable messageObject;
            ObjectInputStream objectInputStream = new ObjectInputStream(
                    new ByteArrayInputStream(buf));
            try {
                messageObject = (Hashtable) objectInputStream.readObject();
            } finally {
                objectInputStream.close();
            }

            /* Pull the file bytes out of the message object. */
            byte[] contentBuf = (byte[]) messageObject.get("content");

            /* Save the file, closing the stream even if the write fails. */
            FileOutputStream fileWriter = new FileOutputStream(
                    "c:/receivedExample.jar");
            try {
                fileWriter.write(contentBuf);
            } finally {
                fileWriter.close();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}

SendMessage.java
/*
 * Created on 2006-2-16
 */
package ehub.ihub.exchangeManager;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Hashtable;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
 * Sends a file as a JMS {@link ObjectMessage} to the queue bound at
 * {@code jms/JMSExampleSendQueue}.
 *
 * @author Administrator
 */
public class SendMessage {
    private static String MQ_CHANNEL = "EXAMPLE.CHANNEL";

    private static String MQ_MANAGER = "QMGR";

    private static String MQ_QUEUE = "EXAMPLE.SENDQUEUE";

    private static int MQ_PORT = 4001;

    private static String JMS_CONNECTIONFACTORY = "jms/JMSExampleConnectionFactory";

    private static String QUEUE_NAME = "jms/JMSExampleSendQueue";

    /**
     * Looks up the connection factory and queue through JNDI and sends one
     * {@link ObjectMessage} carrying a {@link Hashtable} with the entries
     * {@code "FileName"}, {@code "FileSize"} (decimal string) and
     * {@code "content"} (the complete file bytes). The message's correlation
     * ID is set to the bytes of {@code "clinet_B_receive"}.
     *
     * <p>Naming, JMS and I/O failures are printed and not propagated. The
     * JMS connection and the JNDI context are released in every case.
     *
     * @param file the file to send; must be smaller than 2 GiB
     */
    public static void sendFileToReceiveQueue(File file) {
        Context initContext = null;
        Connection qcon = null;
        try {
            initContext = new InitialContext();
            ConnectionFactory qconFactory = (ConnectionFactory) initContext
                    .lookup(JMS_CONNECTIONFACTORY);
            qcon = qconFactory.createConnection();
            Session session = qcon.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            Queue queue = (Queue) initContext.lookup(QUEUE_NAME);
            MessageProducer producer = session.createProducer(queue);
            ObjectMessage outMessage = session.createObjectMessage();

            /* Write the file information into the message. */
            Hashtable fileInfo = new Hashtable();
            fileInfo.put("FileName", file.getName());
            fileInfo.put("FileSize", Long.toString(file.length()));

            /* Add the complete file content to the object. */
            fileInfo.put("content", readFully(file));
            outMessage.setObject(fileInfo);

            /* The receiver selects on this correlation ID (platform default charset, as on the receiving side). */
            outMessage.setJMSCorrelationIDAsBytes("clinet_B_receive"
                    .getBytes());

            producer.send(outMessage);
            producer.close();
            session.close();
        } catch (NamingException e) {
            System.out.println("获得连接失败,jndi 查找失败");
            e.printStackTrace();
        } catch (JMSException e) {
            System.out.println("发送文件异常");
            e.printStackTrace();
        } catch (IOException e) {
            System.out.println("发送文件过程中io 操作失败");
            e.printStackTrace();
        } finally {
            /* Closing the connection also closes any session/producer left open by a failure above. */
            if (qcon != null) {
                try {
                    qcon.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (initContext != null) {
                try {
                    initContext.close();
                } catch (NamingException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Reads the whole file into a byte array. A single {@code read} call may
     * return fewer bytes than requested, so this loops until the buffer is
     * full.
     *
     * @throws IOException if the file is too large for a byte array, ends
     *                     before its reported length, or cannot be read
     */
    private static byte[] readFully(File file) throws IOException {
        long length = file.length();
        if (length > Integer.MAX_VALUE) {
            throw new IOException("file too large to send in one message: "
                    + length + " bytes");
        }
        byte[] buf = new byte[(int) length];
        FileInputStream in = new FileInputStream(file);
        try {
            int offset = 0;
            while (offset < buf.length) {
                int count = in.read(buf, offset, buf.length - offset);
                if (count < 0) {
                    throw new IOException("unexpected end of file after "
                            + offset + " of " + buf.length + " bytes: "
                            + file);
                }
                offset += count;
            }
        } finally {
            in.close();
        }
        return buf;
    }
}