51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

Java JMS 读写 IBM MQ 队列

1、简介 {#1简介}

本文将会带你了解如何使用 Java JMS(Java Message Service)从 IBM MQ 队列读写消息。

2、设置环境 {#2设置环境}

我们可以在 Docker 容器中运行 IBM MQ,以避免手动安装和配置的复杂性。

使用以下命令以基本配置运行容器:

docker run -d --name my-mq -e LICENSE=accept -e MQ_QMGR_NAME=QM1 MQ_QUEUE_NAME=QUEUE1 -p 1414:1414 -p 9443:9443 ibmcom/mq

接下来,需要在 pom.xml 文件中添加 IBM MQ 客户端

<dependency>
    <groupId>com.ibm.mq</groupId>
    <artifactId>com.ibm.mq.allclient</artifactId>
    <version>9.4.0.0</version>
</dependency>

3、配置 JMS Connection {#3配置-jms-connection}

首先,我们需要用 QueueConnectionFactory 建立 JMS Connection(连接),用于创建与队列管理器(Queue Manager)的连接:

public class JMSSetup {
    public QueueConnectionFactory createConnectionFactory() throws JMSException {
        MQQueueConnectionFactory factory = new MQQueueConnectionFactory();
        factory.setHostName("localhost");
        factory.setPort(1414);
        factory.setQueueManager("QM1");
        factory.setChannel("SYSTEM.DEF.SVRCONN"); 
        
        return factory;
    }
}

首先创建一个 MQQueueConnectionFactory 实例,用于配置和创建与 IBM MQ 服务器的连接。我们将主机名设置为 localhost,因为 MQ 服务器是在本地 Docker 容器内运行的。暴露的映射端口为 1414

然后,使用默认的 SYSTEM.DEF.SVRCONN channel。这是客户端连接 IBM MQ 的常用 channel。

4、写入消息到 IBM MQ 队列 {#4写入消息到-ibm-mq-队列}

本节将带你了解向 IBM MQ 队列发送消息的过程。

4.1、建立 JMS 连接 {#41建立-jms-连接}

首先,创建 MessageSender 类。该类负责设置与 IBM MQ 服务器的连接、管理会话和处理消息发送操作。我们声明 QueueConnectionFactoryQueueConnectionQueueSessionQueueSender 实例的变量,这些变量将用于与 IBM MQ 服务器交互。

下面是 IBM MQ 连接设置、会话创建和消息发送的示例:

public class MessageSender {
    private QueueConnectionFactory factory;
    private QueueConnection connection;
    private QueueSession session;
    private QueueSender sender;

    public MessageSender() throws JMSException {
        factory = new JMSSetup().createConnectionFactory();
        connection = factory.createQueueConnection();
        session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("QUEUE1");
        sender = session.createSender(queue);
        connection.start();
    }

    // ...
}

接下来,在 MessageSender 的构造函数中,我们使用 JMSSetup 类初始化 QueueConnectionFactory。然后使用该工厂创建 QueueConnection。该连接允许我们与 IBM MQ 服务器交互。

连接建立后,我们使用 createQueueSession() 创建一个 QueueSession。该会话允许我们与队列通信。这里,我们通过 false 参数表示会话是非事务性的,通过 Session.AUTO_ACKNOWLEDGE 表示在收到消息时自动确认。

然后,定义特定队列 QUEUE1,并创建一个 QueueSender 来处理消息发送。最后,启动连接,以确保会话处于活动状态并准备好发送消息。

4.2、写入文本消息 {#42写入文本消息}

现在,我们已经建立了连接、创建了会话、定义了队列并创建了消息生产者,可以向队列发送文本消息了:

public void sendMessage(String messageText) {
    try {
        TextMessage message = session.createTextMessage();
        message.setText(messageText);
        sender.send(message);
    } catch (JMSException e) {
        // 异常处理
    } finally {
        // 资源释放
    }
}

首先,创建一个接收 messageText 参数的 sendMessage() 方法。sendMessage() 方法负责向队列发送文本消息。它会创建一个 TextMessage 对象,并使用 setText() 方法设置消息内容。

接下来,使用 QueueSender 对象的 send() 方法将消息发送到定义的队列。由于只要 MessageSender 对象存在,连接和会话就会一直处于打开状态,因此这种设计可以实现高效的消息传输。

4.3、消息类型 {#43消息类型}

除了 TextMessage 之外,IBM MQ 还支持其他多种消息类型,以满足不同的使用情况。例如,我们可以发送以下消息:

  • BytesMessage:以字节形式保存的原始二进制消息。
  • ObjectMessage:序列化的 Java 对象消息。
  • MapMessage:包含键值对的消息。
  • StreamMessage:包含原始数据类型流的消息。

5、从 IBM MQ 队列读取消息 {#5从-ibm-mq-队列读取消息}

接下来看看如何从队列中读取消息。

5.1、建立 JMS 连接并创建会话 {#51建立-jms-连接并创建会话}

首先,需要建立连接并创建会话,这与发送消息时的操作类似。

首先创建一个 MessageReceiver 类。该类处理与 IBM MQ 服务器的连接,并设置消息消费所需的组件:

public class MessageReceiver {
    private QueueConnectionFactory factory;
    private QueueConnection connection;
    private QueueSession session;
    private QueueReceiver receiver;

    public MessageReceiver() throws JMSException {
        factory = new JMSSetup().createConnectionFactory();
        connection = factory.createQueueConnection();
        session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("QUEUE1");
        receiver = session.createReceiver(queue);
        connection.start();
    }

    // ...
}

在该类中,我们首先创建一个 QueueConnectionFactory 来建立与 IBM MQ 服务器的连接。然后,使用此连接创建一个 QueueSession,它允许我们与队列交互。

最后,定义了特定队列 QUEUE1,并创建了一个 QueueReceiver 来处理从队列传入的消息。

5.2、读取文本消息 {#52读取文本消息}

连接、会话和 receiver 设置完成后,就可以开始从队列中接收消息了。

使用 QueueReceiverreceive() 方法从指定队列中读取消息:

public void receiveMessage() {
    try {
        Message message = receiver.receive(1000);
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
        } else {
            // ...
        }
    } catch (JMSException e) {
        // 异常处理
    } finally {
        // 资源释放
    }
}

receiveMessage() 方法中,我们使用 receive() 函数等待队列中的消息,超时时间为 1000 毫秒。收到消息后,会检查它是否是 TextMessage 类型。

如果是,则可以使用 getText() 方法获取实际的消息内容,该方法会以字符串形式返回文本内容。

6、消息属性和 Header {#6消息属性和-header}

本节将介绍一些常用的 消息属性和 Header,我们可以在发送或接收消息时使用它们。

6.1、消息属性 {#61消息属性}

消息属性(Message properties)可用于存储和检索消息正文以外的其他信息。这对于过滤消息或向消息中添加上下文数据特别有用。

以下是在发送消息时设置自定义属性的示例:

TextMessage message = session.createTextMessage();
message.setText(messageText);

// 设置属性值
message.setStringProperty("OrderID", "12345");

接下来,可以在接收消息时检索属性:

Message message = receiver.receive(1000);
if (message instanceof TextMessage) {
    TextMessage textMessage = (TextMessage) message;

    // 读取属性值
    String orderID = message.getStringProperty("OrderID");
} 

6.2、消息 Header {#62消息-header}

消息 Header 供预定义字段,其中包括有关消息的元数据。一些常用的 消息 Header 包括:

  • JMSMessageID:由 JMS Provider 分配给每条消息的唯一标识符。可以使用此 ID 跟踪和记录消息。
  • JMSExpiration:定义消息过期时间(毫秒)。如果消息在此时间内未送达,则会被丢弃。
  • JMSTimestamp:消息发送时间。
  • JMSPriority:消息的优先级。

在接收消息时检索消息 Header:

Message message = receiver.receive(1000);

if (message instanceof TextMessage) {
    TextMessage textMessage = (TextMessage) message;
    String messageId = message.getJMSMessageID();
    long timestamp = message.getJMSTimestamp();
    long expiration = message.getJMSExpiration();
    int priority = message.getJMSPriority();
}

7、使用 Mockito 进行模拟测试 {#7使用-mockito-进行模拟测试}

在本节中,我们使用 Mockito 来模拟依赖,并验证 MessageSenderMessageReceiver 类的交互。

首先,使用 @Mock 注解创建依赖的 Mock 实例。

接下来,验证 sendMessage() 方法是否正确与 Mock (模拟)的 QueueSender 交互。我们模拟了 QueueSendersend() 方法,并验证 TextMessage 是否被正确创建:

String messageText = "Hello Baeldung! Nice to meet you!";
doNothing().when(sender).send(any(TextMessage.class));

messageSender.sendMessage(messageText);

verify(sender).send(any(TextMessage.class));
verify(textMessage).setText(messageText);

最后,验证 receiveMessage() 方法能否与 Mock (模拟)的 QueueReceiver 正确交互。我们模拟 receive() 方法来返回预定义的 TextMessage,结果如我们所料,消息文本被检索到了:

when(receiver.receive(anyLong())).thenReturn(textMessage);
when(textMessage.getText()).thenReturn("Hello Baeldung! Nice to meet you!");

messageReceiver.receiveMessage();
verify(textMessage).getText();

8、总结 {#8总结}

本文介绍了如何使用 Java JMS 读写 IBM MQ 队列,详细介绍了如何设置 JMS 连接、会话以及消息生产者/接收者,还介绍了 IBM MQ 支持的几种消息类型以及如何使用自定义消息属性和消息 Header。


Ref:https://www.baeldung.com/java-message-service-ibm-mq-read-write

赞(1)
未经允许不得转载:工具盒子 » Java JMS 读写 IBM MQ 队列