51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

深度剖析 RocketMQ 事务消息!

你好,我是猿java。

这篇文章,我们将深入探讨 RocketMQ 的事务消息原理,并从源码角度进行分析,以及事务消息适合什么场景,使用事务消息需要注意哪些事项。

什么是事务消息 {#什么是事务消息}

事务消息是为了保证分布式系统中消息的一致性而引入的一种消息类型。事务消息允许消息发送方在发送消息后,进行本地事务操作,并根据本地事务的执行结果来决定消息的最终状态(提交或回滚)。

RocketMQ 事务消息的基本流程 {#RocketMQ-事务消息的基本流程}

RocketMQ 采用了 2PC(两段式协议) + 补偿机制(事务回查)的方式实现分布式事务功能,保证分布式事务的最终一致性。

1. 准备阶段(Prepare Phase):

  • 生产者发送一个half消息(Prepare Message)到 RocketMQ broker,这个消息会被存储但不会被消费者消费。
  • RocketMQ broker 接收到half消息后会进行持久化,并返回消息的状态给生产者。

2. 提交阶段(Commit Phase):

  • 生产者根据本地事务的执行结果(成功或失败)决定向 RocketMQ broker 发送提交(Commit)或回滚(Rollback)操作。
  • 如果本地事务执行成功,生产者发送提交消息,RocketMQ broker 会将half消息标记为可消费,消费者可以进行消费。
  • 如果本地事务执行失败,生产者发送回滚消息,RocketMQ broker 会删除half消息,消费者不会消费这条消息。

补偿机制

RocketMQ 还提供了一个事务回查机制,如果生产者在发送half事务消息后由于网络或其他原因未能及时通知 RocketMQ 提交或回滚消息,RocketMQ 会定期(默认 60s)回查生产者的事务状态,以决定如何处理这条half事务消息,以确保消息的一致性。

整个过程如下图:

img

为了更好的理解整个过程,我们通过一个完整的示例代码来展示:

|---------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 | public class TransactionProducer { public static void main(String[] args) throws Exception { // 创建事务消息生产者 TransactionMQProducer producer = new TransactionMQProducer("TransactionProducerGroup"); producer.setNamesrvAddr("localhost:9876"); // 设置事务状态回查监听器 producer.setTransactionCheckListener(new TransactionCheckListener() { @Override public LocalTransactionState checkLocalTransactionState(MessageExt msg) { // 处理事务状态回查逻辑 System.out.println("Checking transaction state for message: " + new String(msg.getBody())); return LocalTransactionState.COMMIT_MESSAGE; } }); // 启动生产者 producer.start(); // 发送事务消息 Message msg = new Message("TransactionTopic", "TagA", "Transaction Message".getBytes()); SendResult sendResult = producer.sendMessageInTransaction(msg, new LocalTransactionExecuter() { @Override public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) { // 执行本地事务逻辑 System.out.println("Executing local transaction for message: " + new String(msg.getBody())); // 假设本地事务执行成功,返回 COMMIT_MESSAGE // 如果本地事务失败,返回 ROLLBACK_MESSAGE return LocalTransactionState.COMMIT_MESSAGE; } }, null); System.out.println("Send result: " + sendResult); // 阻塞主线程,防止退出 System.in.read(); // 关闭生产者 producer.shutdown(); } } |

RocketMQ 事务消息的源码分析 {#RocketMQ-事务消息的源码分析}

发送half消息 {#发送half消息}

发送half消息的核心代码在 TransactionMQProducer 类中,通过 sendMessageInTransaction 方法实现:

|------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 9 10 11 12 | public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, Object arg) { // 1. 发送`half`消息 SendResult sendResult = this.defaultMQProducerImpl.send(msg); // 2. 执行本地事务 LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg); // 3. 根据本地事务状态提交或回滚消息 this.endTransaction(msg, localTransactionState); return new TransactionSendResult(sendResult, localTransactionState); } |

在 sendMessageInTransaction 方法中,首先调用 send 方法发送half消息,然后执行本地事务,并根据本地事务的结果调用 endTransaction 方法提交或回滚消息。

执行本地事务 {#执行本地事务}

本地事务的执行由 LocalTransactionExecuter 接口的实现类来完成。在实际使用中,用户需要实现该接口,并在 executeLocalTransactionBranch 方法中定义具体的本地事务逻辑。

|---------------|-------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 | public interface LocalTransactionExecuter { LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg); } |

提交或回滚事务消息 {#提交或回滚事务消息}

提交或回滚事务消息的实现也在 TransactionMQProducer 类中,通过 endTransaction 方法完成:

|------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 9 10 | private void endTransaction(Message msg, LocalTransactionState localTransactionState) { // 构建事务结束请求 EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); requestHeader.setCommitOrRollback(localTransactionState == LocalTransactionState.COMMIT_MESSAGE ? 0 : 1); requestHeader.setTranStateTableOffset(msg.getQueueOffset()); requestHeader.setCommitLogOffset(msg.getCommitLogOffset()); // 发送事务结束请求到 Broker this.defaultMQProducerImpl.endTransaction(msg, requestHeader); } |

在 endTransaction 方法中,根据本地事务的执行结果构建事务结束请求,并调用 endTransaction 方法将请求发送到 Broker。

事务状态回查 {#事务状态回查}

事务状态回查是由 Broker 发起的。当 Broker 在规定时间内没有收到提交或回滚请求时,会主动向消息发送方发起事务状态回查。回查的实现主要在 TransactionCheckListener 接口中:

|---------------|-------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 | public interface TransactionCheckListener { LocalTransactionState checkLocalTransactionState(final MessageExt msg); } |

消息发送方需要实现 TransactionCheckListener 接口,并在 checkLocalTransactionState 方法中定义如何检查本地事务的状态。

Broker 端的事务消息处理 {#Broker-端的事务消息处理}

Broker 端的事务消息处理主要在 TransactionalMessageServiceImpl 类中实现。Broker 负责接收half消息、提交或回滚请求,并在必要时发起事务状态回查。

接收half消息 {#接收half消息}

Broker 接收half消息的逻辑在 TransactionalMessageServiceImpl 类的 prepareMessage 方法中:

|-----------------|------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 | public PutMessageResult prepareMessage(MessageExtBrokerInner msgInner) { // 存储`half`消息 return this.store.putMessage(msgInner); } |

提交或回滚消息 {#提交或回滚消息}

Broker 处理提交或回滚请求的逻辑在 TransactionalMessageServiceImpl 类的 commitMessage 和 rollbackMessage 方法中:

|---------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 9 | public boolean commitMessage(MessageExt msgExt) { // 提交消息 return this.store.commitTransaction(msgExt); } public boolean rollbackMessage(MessageExt msgExt) { // 回滚消息 return this.store.rollbackTransaction(msgExt); } |

事务状态回查 {#事务状态回查-1}

Broker 发起事务状态回查的逻辑在 TransactionalMessageServiceImpl 类的 check 方法中:

|-------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 | public void check(long transactionTimeout, int transactionCheckMax, String topic) { // 遍历`half`消息队列,发起事务状态回查 List<MessageExt> halfMessages = this.store.getHalfMessages(topic); for (MessageExt msg : halfMessages) { // 发起回查请求 this.brokerController.getBroker2Client().checkProducerTransactionState(msg); } } |

RocketMQ 事务消息的优缺点 {#RocketMQ-事务消息的优缺点}

优点 {#优点}

  • 保证消息一致性:通过事务消息,RocketMQ 能够保证分布式系统中消息的一致性,避免数据不一致问题。
  • 高性能:RocketMQ 的事务消息性能较高,能够满足高并发场景的需求。
  • 易用性:RocketMQ 提供了简单易用的 API,使得开发者能够方便地使用事务消息。

缺点 {#缺点}

  • 复杂性:事务消息的引入增加了系统的复杂性,开发者需要处理事务状态回查等问题。
  • 时延:事务消息的处理涉及half消息、回查等操作,可能会增加消息的时延。

事务消息适用场景 {#事务消息适用场景}

资金转账 {#资金转账}

在金融系统中,资金转账需要确保资金的一致性和安全性。例如,从账户 A 转账到账户 B,必须确保 A 的金额减少和 B 的金额增加是一个原子操作。使用事务消息可以保证在转账过程中,如果任何一个步骤失败,整个操作都会回滚,确保数据一致性。

订单处理 {#订单处理}

在电子商务系统中,订单处理通常涉及多个步骤,例如创建订单、扣减库存、生成支付记录等。这些步骤需要保证一致性。使用事务消息可以确保如果某一步操作失败,整个订单处理过程可以回滚,避免数据不一致。

分布式事务 {#分布式事务}

在微服务架构中,分布式事务是一个常见的挑战。多个微服务之间的操作需要协调一致,事务消息可以作为一种分布式事务解决方案,确保各个微服务之间的数据一致性。

库存管理 {#库存管理}

在库存管理系统中,库存的增减操作需要保证一致性。例如,用户下单后需要扣减库存,使用事务消息可以确保在扣减库存失败时,订单状态不会被错误更新。

事务消息注意事项 {#事务消息注意事项}

确保本地事务的幂等性 {#确保本地事务的幂等性}

在分布式系统中,本地事务操作可能会被多次执行。例如,在事务状态回查时,Broker 可能会多次检查本地事务状态。因此,确保本地事务操作的幂等性非常重要。幂等性可以确保多次执行相同的操作不会产生副作用。

设置合理的超时时间 {#设置合理的超时时间}

事务消息的处理涉及half消息、提交或回滚请求以及事务状态回查。设置合理的超时时间可以避免长时间等待,影响系统性能。超时时间应根据实际业务需求和系统性能进行调整。

处理事务状态回查 {#处理事务状态回查}

事务状态回查是事务消息的重要机制。当 Broker 在规定时间内没有收到提交或回滚请求时,会主动发起事务状态回查。开发者需要实现 TransactionCheckListener 接口,并在 checkLocalTransactionState 方法中处理回查逻辑,确保能够正确返回事务状态。

监控和日志 {#监控和日志}

监控和日志是确保事务消息系统稳定运行的重要手段。通过监控,可以及时发现系统中的异常情况,例如事务状态回查失败、消息发送失败等。日志记录可以帮助开发者排查问题,分析系统性能。

资源隔离 {#资源隔离}

在使用事务消息时,确保事务消息与其他普通消息的资源隔离,以避免相互影响。例如,可以为事务消息单独配置 Topic 和队列,确保事务消息的处理不受其他消息影响。

事务消息的重试机制 {#事务消息的重试机制}

在某些情况下,事务消息的提交或回滚请求可能会失败。开发者需要考虑实现重试机制,以确保最终能够正确提交或回滚事务消息。重试机制可以通过定时任务或消息队列实现。

性能影响 {#性能影响}

事务消息的处理涉及多次网络通信和状态检查,可能会对系统性能产生一定影响。在高并发场景中,需要评估事务消息对系统性能的影响,并进行相应的优化。例如,可以通过批量处理、异步处理等方式提高性能。

总结 {#总结}

本文详细介绍了 RocketMQ 事务消息的基本流程,并通过源码分析揭示了其内部实现原理,尽管事务消息增加了系统的复杂性,但在需要保证消息一致性的场景中,它仍然是一种非常有效的解决方案,比如资金转账、订单处理、分布式事务、库存管理等场景。

参考资料 {#参考资料}

rocketmq官网

赞(0)
未经允许不得转载:工具盒子 » 深度剖析 RocketMQ 事务消息!