1、简介 {#1简介}
本文将会带你了解如何使用 Java JMS(Java Message Service)从 IBM MQ 队列读写消息。
2、设置环境 {#2设置环境}
我们可以在 Docker 容器中运行 IBM MQ,以避免手动安装和配置的复杂性。
使用以下命令以基本配置运行容器:
docker run -d --name my-mq -e LICENSE=accept -e MQ_QMGR_NAME=QM1 MQ_QUEUE_NAME=QUEUE1 -p 1414:1414 -p 9443:9443 ibmcom/mq
接下来,需要在 pom.xml 文件中添加 IBM MQ 客户端:
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mq.allclient</artifactId>
<version>9.4.0.0</version>
</dependency>
3、配置 JMS Connection {#3配置-jms-connection}
首先,我们需要用 QueueConnectionFactory
建立 JMS Connection
(连接),用于创建与队列管理器(Queue Manager)的连接:
public class JMSSetup {
public QueueConnectionFactory createConnectionFactory() throws JMSException {
MQQueueConnectionFactory factory = new MQQueueConnectionFactory();
factory.setHostName("localhost");
factory.setPort(1414);
factory.setQueueManager("QM1");
factory.setChannel("SYSTEM.DEF.SVRCONN");
return factory;
}
}
首先创建一个 MQQueueConnectionFactory
实例,用于配置和创建与 IBM MQ 服务器的连接。我们将主机名设置为 localhost
,因为 MQ 服务器是在本地 Docker 容器内运行的。暴露的映射端口为 1414。
然后,使用默认的 SYSTEM.DEF.SVRCONN channel。这是客户端连接 IBM MQ 的常用 channel。
4、写入消息到 IBM MQ 队列 {#4写入消息到-ibm-mq-队列}
本节将带你了解向 IBM MQ 队列发送消息的过程。
4.1、建立 JMS 连接 {#41建立-jms-连接}
首先,创建 MessageSender
类。该类负责设置与 IBM MQ 服务器的连接、管理会话和处理消息发送操作。我们声明 QueueConnectionFactory
、QueueConnection
、QueueSession
和 QueueSender
实例的变量,这些变量将用于与 IBM MQ 服务器交互。
下面是 IBM MQ 连接设置、会话创建和消息发送的示例:
public class MessageSender {
private QueueConnectionFactory factory;
private QueueConnection connection;
private QueueSession session;
private QueueSender sender;
public MessageSender() throws JMSException {
factory = new JMSSetup().createConnectionFactory();
connection = factory.createQueueConnection();
session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("QUEUE1");
sender = session.createSender(queue);
connection.start();
}
// ...
}
接下来,在 MessageSender
的构造函数中,我们使用 JMSSetup
类初始化 QueueConnectionFactory
。然后使用该工厂创建 QueueConnection
。该连接允许我们与 IBM MQ 服务器交互。
连接建立后,我们使用 createQueueSession()
创建一个 QueueSession
。该会话允许我们与队列通信。这里,我们通过 false
参数表示会话是非事务性的,通过 Session.AUTO_ACKNOWLEDGE
表示在收到消息时自动确认。
然后,定义特定队列 QUEUE1
,并创建一个 QueueSender
来处理消息发送。最后,启动连接,以确保会话处于活动状态并准备好发送消息。
4.2、写入文本消息 {#42写入文本消息}
现在,我们已经建立了连接、创建了会话、定义了队列并创建了消息生产者,可以向队列发送文本消息了:
public void sendMessage(String messageText) {
try {
TextMessage message = session.createTextMessage();
message.setText(messageText);
sender.send(message);
} catch (JMSException e) {
// 异常处理
} finally {
// 资源释放
}
}
首先,创建一个接收 messageText
参数的 sendMessage()
方法。sendMessage()
方法负责向队列发送文本消息。它会创建一个 TextMessage
对象,并使用 setText()
方法设置消息内容。
接下来,使用 QueueSender
对象的 send()
方法将消息发送到定义的队列。由于只要 MessageSender
对象存在,连接和会话就会一直处于打开状态,因此这种设计可以实现高效的消息传输。
4.3、消息类型 {#43消息类型}
除了 TextMessage
之外,IBM MQ 还支持其他多种消息类型,以满足不同的使用情况。例如,我们可以发送以下消息:
- BytesMessage:以字节形式保存的原始二进制消息。
- ObjectMessage:序列化的 Java 对象消息。
- MapMessage:包含键值对的消息。
- StreamMessage:包含原始数据类型流的消息。
5、从 IBM MQ 队列读取消息 {#5从-ibm-mq-队列读取消息}
接下来看看如何从队列中读取消息。
5.1、建立 JMS 连接并创建会话 {#51建立-jms-连接并创建会话}
首先,需要建立连接并创建会话,这与发送消息时的操作类似。
首先创建一个 MessageReceiver
类。该类处理与 IBM MQ 服务器的连接,并设置消息消费所需的组件:
public class MessageReceiver {
private QueueConnectionFactory factory;
private QueueConnection connection;
private QueueSession session;
private QueueReceiver receiver;
public MessageReceiver() throws JMSException {
factory = new JMSSetup().createConnectionFactory();
connection = factory.createQueueConnection();
session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("QUEUE1");
receiver = session.createReceiver(queue);
connection.start();
}
// ...
}
在该类中,我们首先创建一个 QueueConnectionFactory
来建立与 IBM MQ 服务器的连接。然后,使用此连接创建一个 QueueSession
,它允许我们与队列交互。
最后,定义了特定队列 QUEUE1
,并创建了一个 QueueReceiver
来处理从队列传入的消息。
5.2、读取文本消息 {#52读取文本消息}
连接、会话和 receiver 设置完成后,就可以开始从队列中接收消息了。
使用 QueueReceiver
的 receive()
方法从指定队列中读取消息:
public void receiveMessage() {
try {
Message message = receiver.receive(1000);
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
} else {
// ...
}
} catch (JMSException e) {
// 异常处理
} finally {
// 资源释放
}
}
在 receiveMessage()
方法中,我们使用 receive()
函数等待队列中的消息,超时时间为 1000 毫秒。收到消息后,会检查它是否是 TextMessage
类型。
如果是,则可以使用 getText()
方法获取实际的消息内容,该方法会以字符串形式返回文本内容。
6、消息属性和 Header {#6消息属性和-header}
本节将介绍一些常用的 消息属性和 Header,我们可以在发送或接收消息时使用它们。
6.1、消息属性 {#61消息属性}
消息属性(Message properties)可用于存储和检索消息正文以外的其他信息。这对于过滤消息或向消息中添加上下文数据特别有用。
以下是在发送消息时设置自定义属性的示例:
TextMessage message = session.createTextMessage();
message.setText(messageText);
// 设置属性值
message.setStringProperty("OrderID", "12345");
接下来,可以在接收消息时检索属性:
Message message = receiver.receive(1000);
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
// 读取属性值
String orderID = message.getStringProperty("OrderID");
}
6.2、消息 Header {#62消息-header}
消息 Header 供预定义字段,其中包括有关消息的元数据。一些常用的 消息 Header 包括:
- JMSMessageID:由 JMS Provider 分配给每条消息的唯一标识符。可以使用此 ID 跟踪和记录消息。
- JMSExpiration:定义消息过期时间(毫秒)。如果消息在此时间内未送达,则会被丢弃。
- JMSTimestamp:消息发送时间。
- JMSPriority:消息的优先级。
在接收消息时检索消息 Header:
Message message = receiver.receive(1000);
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String messageId = message.getJMSMessageID();
long timestamp = message.getJMSTimestamp();
long expiration = message.getJMSExpiration();
int priority = message.getJMSPriority();
}
7、使用 Mockito 进行模拟测试 {#7使用-mockito-进行模拟测试}
在本节中,我们使用 Mockito
来模拟依赖,并验证 MessageSender
和 MessageReceiver
类的交互。
首先,使用 @Mock
注解创建依赖的 Mock 实例。
接下来,验证 sendMessage()
方法是否正确与 Mock (模拟)的 QueueSender
交互。我们模拟了 QueueSender
的 send()
方法,并验证 TextMessage
是否被正确创建:
String messageText = "Hello Baeldung! Nice to meet you!";
doNothing().when(sender).send(any(TextMessage.class));
messageSender.sendMessage(messageText);
verify(sender).send(any(TextMessage.class));
verify(textMessage).setText(messageText);
最后,验证 receiveMessage()
方法能否与 Mock (模拟)的 QueueReceiver
正确交互。我们模拟 receive()
方法来返回预定义的 TextMessage
,结果如我们所料,消息文本被检索到了:
when(receiver.receive(anyLong())).thenReturn(textMessage);
when(textMessage.getText()).thenReturn("Hello Baeldung! Nice to meet you!");
messageReceiver.receiveMessage();
verify(textMessage).getText();
8、总结 {#8总结}
本文介绍了如何使用 Java JMS 读写 IBM MQ 队列,详细介绍了如何设置 JMS 连接、会话以及消息生产者/接收者,还介绍了 IBM MQ 支持的几种消息类型以及如何使用自定义消息属性和消息 Header。
Ref:https://www.baeldung.com/java-message-service-ibm-mq-read-write