51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

RocketMQ使用事项

大家好,我是指北君。

消息中间件是我们工作中使用最频繁的一类中间件,它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。今天,指北君就来详细讲讲RocketMQ生产者和消费者在使用时的一些注意事项


一. 生产者 {#一-生产者}

1.1 发送消息注意事项 {#11-发送消息注意事项}

1)消息大小

建议消息大小不要超过512K,

2)异步发送

默认的发送为同步发送,send方法会一直阻塞,等待broker端的响应。如果你关注性能问题,可以通过send(msg, callback)来发起异步调用。

3)生产者组

正常情况下生产者组是没有作用的,但是在发送事务消息时,如果producer中途意外宕机,broker会主动回调producer group 内的任意一台机器来确认事务的状态。(目前开源版本还不支持事务消息)

4)线程安全问题

生产者实例是线程安全的,在应用中只需要实例化一次即可

5)性能问题

如果你希望在一个jvm进程内使用多个producer实例来提高发送能,我们建议:

使用异步发送,并且producer实例只需要3 ~ 5个即可 对每一个producer 调用 setInstanceName,区别不同的生产者

6)发送超时时间

当客户端向broker发送请求超时时,客户端会抛出 RemotingTimeoutException,默认的超时时间是3秒。通过调用send(msg, timeout) 可以设置超时时间。超时时间建议不要设置过小,因为 broker 可能需要时间刷盘或向 slave 同步数据

7)对于同一个应用最好只使用一个Topic,消息的子类型可以使用 tags 来标识,tags 可以由应用自由设置。当发送的消息设置了 tags 时,消费方在订阅消息时可以使用 tags 在 broker 做消息过滤。注意这里的命名虽然是复数,但是一条消息只能有一个tag

8)消息在业务层面的唯一标识可以设置到 keys 字段,方便根据 keys 来定位消息。broker 会为每个消息创建索引(哈希索引),应用可以通过topic 、key 查询这条消息的内容(MessageExt),以及消息被谁消费(MessageTrack,精确到consumer group)。由于是哈希索引,请尽量保证key 的唯一,这样可以避免潜在的哈希冲突

9)消息发送不管是成功还是失败都要打印消息日志,日志内容务必包含 sendResult 和 key 字段

10)对于消息不可丢失的应用,务必要有消息重发机制。例如如果消息发送失败,可以将消息存储到数据库,然后通过定时程序或者人工的方式触发重发

11)调用send 同步发送消息时,假定此时设置了 isWaitStoreMsgOK=true(default is true),只要不抛出异常就代表发送成功,但当 isWaitStoreMsgOK = false 时,发送永远返回 SEND_OK。但是对于发送"成功"会有多个状态,在 SendStatus 中定义如下:

FLUSH_DISK_TIMEOUT

如果 broker 设置的 FlushDiskType = SYNC_FLUSH,当 broker 的在刷盘超时时(MessageStoreConfig.syncFlushTimeout,默认5秒)会返回该状态。此时消息任然保存在内存中,只有broker 宕机时消息才会丢失

FLUSH_SLAVE_TIMEOU

如果 broker 的 role 是 SYNC_MASTER,当 slave 同步数据的时间超过了 MessageStoreConfig.syncFlushTimeout (默认5秒) 时会返回此状态。此时只有主从都宕机,并且主也没有刷盘时,消息才会丢失

SLAVE_NOT_AVAILABLE

如果 broker 的 role 是 SYNC_MASTER,并且此时 slave 不可用时会返回该状态

SEND_OK

发送成功。为了保证消息不丢失还需要配置 SYNC_MASTER or SYNC_FLUSH 12)消息重复

当发送消息时返回 FLUSH_DISK_TIMEOUT/FLUSH_SLAVE_TIMEOUT,若非常不幸的 broker 也宕机了,消息将会丢失。此时如果什么都不做,消息可能会丢失,如果重发消息,消息可能会出现重复。

通常我们建议发送端重发消息,由消费方来保证消息消费的幂等性。

1.2 消息发送失败如何处理 {#12-消息发送失败如何处理}

Producer 的 send 方法本生支持内部重试,重试逻辑如下:

至多重试3次 如果发送失败,则轮转到下一个broker 这个方法的总耗时时间不超过 sendMsgTimeout,默认3秒 所以发送消息已经产生超时异常的话就不会再重试。 以上策略仍不能保证消息发送一定成功,为保证消息发送一定成功,建议应用这么做:如果调用 send 同步发送失败,则尝试将消息存储到db,由后台线程定时重试,保证消息一定到达 Broker。

1.3 oneway 的发送形式 {#13-oneway-的发送形式}

对于可靠性要求不高的应用,可以采用 oneway 的发送形式,oneway 形式不等待应答。

1.4 发送顺序消息 {#14-发送顺序消息}

顺序消息分为分区有序和全局有序。

分区有序要求 producer 在send 时传入 MessageQueueSelector 的实现类,最终将某一类消息发送到同一队列。但是一旦发生通信异常、broker 重启等,由于队列总数发生变化,哈希取模后定位的队列会变化,会产生短暂的顺序不一致。如果业务能容忍在集群异常情况下(如某个 broker 宕机或者重启)消息短暂的乱序,使用分区有序比较合适

全局严格有序的消息即便在异常情况下也能保证消息的有序性,但是却牺牲了分布式的 failover 特性,即 broker 集群中只有要一台机器不可用,则整个集群都不可用,服务可用性会大大降低。

顺序消息的缺点:

发送顺序消息无法利用集群的 FailOver 特性 消费顺序消息的并行度依赖于队列数量 队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消费堆积问题 遇到消费失败的消息,无法跳过,当前队列需要暂停 5.发送事务消息 目前暂不支持


二. 消费者 {#二-消费者}

2.1 消费者组和订阅 {#21-消费者组和订阅}

不同的消费者组可以独立消费相同的topic,这点类似于ActiveMQ的虚拟 topic. 另外对于相同的消费者组,需要确保组内的消费者订阅消息的规则是一致的!

MQ 里的一个Consumer Group 代表一个 Consumer 实例群组。对于大多数分布式应用来说,一个 Consumer Group 下通常会挂载多个 Consumer 实例。订阅关系一致指的是同一个 Consumer Group 下所有 Consumer 实例的处理逻辑必须完全一致。一旦订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。

由于 MQ 的订阅关系主要由 Topic+Tag 共同组成,因此,保持订阅关系一致意味着同一个 Consumer Group 下所有的实例需在以下两方面均保持一致:

订阅的 Topic 必须一致; 订阅的 Topic 中的 Tag 必须一致。

技术架构 > Consumer 最佳实践 > image2017-11-15 15:50:13.png

2.2 MessageListener {#22-messagelistener}

1)顺序消费 MessageListenerOrderly

顺序消费时消费者会锁定队列,以确保消息被顺序消费,但是这样也会造成一定的性能损耗。当消费出现异常的时候,建议不要抛出异常,而是返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT,让消费暂停一会,暂停时间由 context.setSuspendCurrentQueueTimeMillis 方法指定

2)并发消费

并发消费是推荐的消费方式,在此种模式下,消息将被并发的消费。消费出现异常时不建议抛出异常,只需要返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 即可。为了保证消息肯定被至少消费一次,消息将会被重发回 broker (topic不是原topic而是这个消费组的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置,通过 delayLevelWhenNextConsume 和 MessageStoreConfig.messageDelayLevel 设置)后,再次投递到这个 ConsumerGroup,而如果一直这样重复消费都持续失败到一定次数(默认是16次,DefaultMQPushConsumer.maxReconsumeTimes),就会投递到DLQ队列。应用可以监控死信队列来做人工干预。

3)返回状态

在并行消费时可以通过返回 RECONSUME_LATER 来告诉 Consumer 当前无法消费该消息,等延时一段时间再重新消费,但是此时消费不会停止,你可以继续消费其他消息。但在顺序消费时,因为要保证消费的顺序性,所以你不能跳过失败的消息,此时你可以通过返回 SUSPEND_CURRENT_QUEUE_A_MOMENT 来告诉 Consumer 先暂停一会。

4)阻塞

不建议阻塞Listener,因为这会阻塞住线程池,同时也有可能造成消费者线程终止

2.3 线程数 {#23-线程数}

consumer 内部通过一个 ThreadPoolExecutor 来消费消息,可以通过 setConsumeThreadMin 和 setConsumeThreadMax 来改变线程池的大小

2.4 ConsumeFromWhere {#24-consumefromwhere}

当新实例启动的时候,PushConsumer会拿到本消费组broker已经记录好的消费进度(consumer offset),按照这个进度发起自己的第一次Pull请求。

如果这个消费进度在Broker并没有存储起来,证明这个是一个全新的消费组,这时候客户端有几个策略可以选择:

CONSUME_FROM_LAST_OFFSET //默认策略,从该队列最尾开始消费,即跳过历史消息

CONSUME_FROM_FIRST_OFFSET //从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍

CONSUME_FROM_TIMESTAMP//从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前 注意:这些配置只对全新的消费组有效,老的消费组都是按已经存储过的消费进度继续消费

对于老消费组想跳过历史消息可以采用以下几种方法:

1)判断消息的发送时间,太老的消息直接返回 CONSUME_SUCCESS

2)判断消息的 offset 和 MAX_OFFSET 的差距,如果落后太多,可以直接返回 CONSUME_SUCCESS

3)消费者启动前,先调整该消费组的消费进度,再开始消费。可以人工使用命令 resetOffsetByTimeStamp,详见 ResetOffsetByTimeCommand.java

2.5 消息幂等 {#25-消息幂等}

由于 RocketMQ 无法避免消费重复,所以如果业务对消息重复非常敏感,务必在业务层面去重

2.6 消费速度慢处理方式 {#26-消费速度慢处理方式}

1)提高消费并行度

大部分消息消费行为都属于 IO 密集型业务,适当的提高并发度可以显著的改善消费的吞吐量

2)批量方式消费

默认情况下 consumer 的 consumeMessageBatchMaxSize 为1,即一次只消费一个消息,如果应用可以批量消费消息,则可以很大程度上提高消费吞吐量

3)跳过非重要消息

当消堆积严重时可以丢弃不重要的消息

4)优化消息消费过程

2.7 打印消费日志 {#27-打印消费日志}

建议在消费入口方法打印消息,方便后续排查问题,消费失败时也打印失败日志

2.8 利用broker过滤消息,避免多余的消息传输 {#28-利用broker过滤消息避免多余的消息传输}


三. 小结 {#三-小结}

好了,RocketMQ生产者与消费者的使用事项就总结完毕了,相信大家对RocketMQ的使用应该会更有信心了

我是指北君,操千曲而后晓声,观千剑而后识器。感谢各位人才的:点赞、收藏和评论,我们下期更精彩!

赞(2)
未经允许不得转载:工具盒子 » RocketMQ使用事项