一、RocketMQ简介
RocketMQ是由阿里捐赠给Apache的一款低延迟、高并发、高可用、高可靠的分布式消息中间件。经历了淘宝双十一的洗礼。RocketMQ既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。
1、常见的MQ主要有:ActiveMQ、RabbitMQ、Kafka、RocketMQ
2、RocketMQ 基本概念
RocketMQ主要有四大核心组成部分:NameServer、Broker、Producer以及Consumer四部分。
组件说明如下:
- Producer:消息生产者,负责产生消息,一般由业务系统负责产生消息
- Producer Group:消息生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者
- Consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费
- Consumer Group:消费者组,和生产者类似,消费同一类消息的多个Consumer 实例组成一个消费者组
- Topic:主题,用于将消息按主题做划分,Producer将消息发往指定的Topic,Consumer订阅该Topic就可以收到这条消息
- Message:消息,每个message必须指定一个topic,Message 还有一个可选的Tag设置,以便消费端可以基于Tag进行过滤消息
- Tag:标签,子主题(二级分类)对topic的进一步细化,用于区分同一个主题下的不同业务的消息
- Broker:Broker是RocketMQ的核心模块,负责接收并存储消息,同时提供Push/Pull接口来将消息发送给Consumer。Broker同时提供消息查询的功能,可以通过MessageID和MessageKey来查询消息。Borker会将自己的Topic配置信息实时同步到NameServer
- push:推模式:消息到达消息服务器之后,主动推送给消费者
- pull:拉模式:是消费端发起请求,主动向消息服务器(Broker)拉取消息
- Queue:Topic和Queue是1对多的关系,一个Topic下可以包含多个Queue,主要用于负载均衡,Queue数量设置建议不要比消费者数少。发送消息时,用户只指定Topic,Producer会根据Topic的路由信息选择具体发到哪个Queue上。Consumer订阅消息时,会根据负载均衡策略决定订阅哪些Queue的消息
- Offset:RocketMQ在存储消息时会为每个Topic下的每个Queue生成一个消息的索引文件,每个Queue都对应一个Offset记录当前Queue中消息条数
- NameServer:NameServer可以看作是RocketMQ的注册中心,它管理两部分数据:集群的Topic-Queue的路由配置;Broker的实时配置信息。其它模块通过Nameserv提供的接口获取最新的Topic配置和路由信息;各 NameServer 之间不会互相通信,各NameServer都有完整的路由信息,即无状态。
3、消费模式
1)广播模式
一条消息被多个Consumer消费,即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group中的每一个Consumer都消费一次。
2)集群模式
一个Consumer Group中的所有Consumer平均分摊消费消息(一个消息只会被一个消费者消费)
4、消息类型
RocketMq消息类型分为三种:普通消息、顺序消息、事务消息
1)普通消息:有三种发送方式
- 单向发送:单向发送是指发送方只负责发送消息,不等待服务器回应,且没有回调函数触发。即只发送请求而不管响应。
- 同步发送:同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才会发送下一个数据包的通讯方式。
- 异步发送:异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下一个数据包的通讯方式。发送方通过回调接口接收服务器响应,并对响应结果进行处理。
2)顺序消息
一般情况下,每个主题(topic)都会有多个消息队列(message queue),假设投递了同一个主题的十条消息,那么这十条消息会分散在不同的队列中。对于消费者而言,每个消息队列是等价的,就不能确保消息总体的顺序。而顺序消息的方案就是把这十条消息都投递到同一个消息队列中。顺序消息与普通消息同样有三种发送方式。
3)事务消息
RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致,从而实现了可靠消息服务。
事务消息发送步骤:
发送方将半事务消息发送至RocketMQ服务端。
RocketMQ服务端将消息持久化之后,向发送方返回Ack确认消息已经发送成功。由于消息为半事务消息,在未收到生产者对该消息的二次确认前,此消息被标记成"暂不能投递"状态。
发送方开始执行本地事务逻辑。
发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅方将不会接受该消息。
事务消息回查机制:
在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。
二、Centos 7.9安装RpcketMQ集群
1 、部署环境 准备
1)服务器环境(本次搭建双主双从同步双写模式)
|----|----------------|-----------------|-------------------------|----------------| | 序号 | IP地址 | 操作系统 | 角色 | 架构模式 | | 1 | 192.168.56.134 | CentOS 7.9.2009 | Dashboard | | | 2 | 192.168.56.137 | CentOS 7.9.2009 | nameserver、brokerserver | Master1、Slave2 | | 3 | 192.168.56.138 | CentOS 7.9.2009 | nameserver、brokerserver | Master2、Slave1 |
注:两台服务器各启动一个nameserver,各启动两个broker。
2 、 安装JDK
注:所以机器都需要安装
JDK下载地址:https://www.oracle.com/java/technologies/javase/javase8-archive-downloads.html
1)安装JDK-1.8
[root@localhost ~]# yum -y install jdk-8u202-linux-x64.rpm
2)查看是否安装成功
[root@localhost ~]# java -version
java version "1.8.0_202"
Java(TM) SE Runtime Environment (build 1.8.0_202-b08)
Java HotSpot(TM) 64-Bit Server VM (build 25.202-b08, mixed mode)
3 、 安装RocketMQ集群
官网下载地址:https://rocketmq.apache.org/zh/download
1)下载依赖包
[root@localhost ~]# yum -y install wget
2)下载RocketMQ安装包
[root@localhost ~]# wget https://archive.apache.org/dist/rocketmq/4.9.6/rocketmq-all-4.9.6-bin-release.zip
3)解压安装包
[root@localhost ~]# unzip rocketmq-all-4.9.6-bin-release.zip -d /usr/local
[root@localhost ~]# mv /usr/local/rocketmq-all-4.9.6-bin-release/ /usr/local/rocketmq
4)修改broker相关配置文件
在服务器192.168.56.137操作
[root@localhost ~]# cd /usr/local/rocketmq/conf/2m-2s-sync
[root@localhost conf]# vim broker-a.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 集群名字
brokerClusterName=rocketmq-cluster
# broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
# 0表示Master,大于0表示Slave
brokerId=0
# nameServer地址,分号分割
namesrvAddr=192.168.56.137:9876;192.168.56.138:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker对外服务的监听端口
listenPort=10911
# 删除文件时间点,默认凌晨4点
deleteWhen=04
# 文件保留时间,默认48小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/usr/local/rocketmq/master
# commitLog存储路径
storePathCommitLog=/usr/local/rocketmq/master/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/master/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/master/index
# checkpoint文件存储路径
storeCheckpoint=/usr/local/rocketmq/master/checkpoint
# abort文件存储路径
abortFile=/usr/local/rocketmq/master/abort
# 限制的消息大小
maxMessageSize=65536
brokerRole=SYNC_MASTER
flushDiskType=SYNC_FLUSH
[root@localhost 2m-2s-sync]# vim broker-b-s.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 集群名字
brokerClusterName=rocketmq-cluster
# broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
# 0表示Master,大于0表示Slave
brokerId=1
# nameServer地址,分号分割
namesrvAddr=192.168.56.137:9876;192.168.56.138:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker 对外服务的监听端口
listenPort=11011
# 删除文件时间点,默认凌晨4点
deleteWhen=04
# 文件保留时间,默认48小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/usr/local/rocketmq/slave
# commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/slave/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/slave/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/slave/index
# checkpoint文件存储路径
storeCheckpoint=/usr/local/rocketmq/slave/checkpoint
# abort文件存储路径
abortFile=/usr/local/rocketmq/slave/abort
# 限制的消息大小
maxMessageSize=65536
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
在服务器192.168.56.138操作
[root@localhost ~]# cd /usr/local/rocketmq/conf/2m-2s-sync
[root@localhost conf]# vim broker-b.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
# 0表示Master,大于0表示Slave
brokerId=1
# nameServer地址,分号分割
namesrvAddr=192.168.56.137:9876;192.168.56.138:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker对外服务的监听端口
listenPort=11011
# 删除文件时间点,默认凌晨4点
deleteWhen=04
#文件保留时间,默认48小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/usr/local/rocketmq/slave
# commitLog存储路径
storePathCommitLog=/usr/local/rocketmq/slave/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/slave/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/slave/index
# checkpoint文件存储路径
storeCheckpoint=/usr/local/rocketmq/slave/checkpoint
# abort文件存储路径
abortFile=/usr/local/rocketmq/slave/abort
# 限制的消息大小
maxMessageSize=65536
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
[root@localhost 2m-2s-sync]# vim broker-a-s.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 集群名字
brokerClusterName=rocketmq-cluster
# broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
# 0表示Master,大于0表示Slave
brokerId=0
# nameServer地址,分号分割
namesrvAddr=192.168.56.137:9876;192.168.56.138:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker对外服务的监听端口
listenPort=10911
# 删除文件时间点,默认凌晨4点
deleteWhen=04
# 文件保留时间,默认48小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/usr/local/rocketmq/master
# commitLog存储路径
storePathCommitLog=/usr/local/rocketmq/master/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/master/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/master/index
# checkpoint文件存储路径
storeCheckpoint=/usr/local/rocketmq/master/checkpoint
# abort文件存储路径
abortFile=/usr/local/rocketmq/master/abort
# 限制的消息大小
maxMessageSize=65536
brokerRole=SYNC_MASTER
flushDiskType=SYNC_FLUSH
5)修改runserver.sh和runbroker.sh脚本的启动参数(根据自身配置修改)
[root@localhost 2m-2s-sync]# cd ../../bin
[root@localhost bin]# sed -i -e 's/4g/512m/g' -e 's/2g/256m/g' runserver.sh
[root@localhost bin]# sed -i 's/8g/512m/g' runbroker.sh
6)启动RocketMQ集群
注:需要启动NameServer和Broker。
启动两台服务器的NameServer
[root@localhost bin]# nohup ./mqnamesrv &
[root@localhost bin]# tail -fn 100 ~/logs/rocketmqlogs/namesrv.log
启动两台服务器的Broker
192.168.56.137
[root@localhost bin]# nohup ./mqbroker -c ../conf/2m-2s-sync/broker-a.properties &
[root@localhost bin]# nohup ./mqbroker -c ../conf/2m-2s-sync/broker-b-s.properties &
192.168.56.138
[root@localhost bin]# nohup ./mqbroker -c ../conf/2m-2s-sync/broker-b.properties &
[root@localhost bin]# nohup ./mqbroker -c ../conf/2m-2s-sync/broker-a-s.properties &
使用JPS命令查看JAVA进程
[root@localhost bin]# jps -m
13588 Jps -m
3179 NamesrvStartup
13468 BrokerStartup -c ../conf/2m-2s-sync/broker-b.properties
13517 BrokerStartup -c ../conf/2m-2s-sync/broker-a-s.properties
7)关闭RocketMQ命令
注:需要先先关闭Broker,然后再关闭NameServer。
[root@localhost bin]# ./mqshutdown broker
[root@localhost bin]# ./mqshutdown namesrv
三、安装RocketMQ-Dashboard
1、下载源代码
[root@localhost ~]# wget https://github.com/apache/rocketmq-dashboard/archive/refs/tags/rocketmq-dashboard-1.0.0.zip
[root@localhost ~]# unzip rocketmq-dashboard-1.0.0.zip
2、安装Maven
[root@localhost ~]# wget https://dlcdn.apache.org/maven/maven-3/3.9.3/binaries/apache-maven-3.9.3-bin.zip
3、解压及重命名和配置软连接
[root@localhost ~]# unzip apache-maven-3.9.3-bin.zip
[root@localhost ~]# mv apache-maven-3.9.3 /usr/local/maven
[root@localhost ~]# ln -sf /usr/local/maven/bin/mvn /usr/bin/
4、编译
[root@localhost ~]# cd rocketmq-dashboard-rocketmq-dashboard-1.0.0
[root@localhost rocketmq-dashboard-rocketmq-dashboard-1.0.0]# mvn clean package -Dmaven.test.skip=true
5、启动rocketmq-dashboard
[root@localhost rocketmq-dashboard-rocketmq-dashboard-1.0.0]# cd target
[root@localhost target]# nohup java -jar rocketmq-dashboard-1.0.0.jar &
6、访问rocketmq-dashboard
1)浏览器访问http://192.168.56.134:8080/,如下图所示
2)连接RocketMQ
3)切换中文语言
4)创建Topic
5)创建消费者组consumer
6)扩容Topic队列
7)发送消息
向指定Topic发送消息
发送结果
8)查看集群
历史上的今天
7 月
5