51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

Centos 7.9安装RocketMQ集群和RocketMQ-Dashboard

一、RocketMQ简介

RocketMQ是由阿里捐赠给Apache的一款低延迟、高并发、高可用、高可靠的分布式消息中间件。经历了淘宝双十一的洗礼。RocketMQ既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。

1、常见的MQ主要有:ActiveMQ、RabbitMQ、Kafka、RocketMQ

2、RocketMQ 基本概念

RocketMQ主要有四大核心组成部分:NameServer、Broker、Producer以及Consumer四部分。

组件说明如下:

  • Producer:消息生产者,负责产生消息,一般由业务系统负责产生消息
  • Producer Group:消息生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者
  • Consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费
  • Consumer Group:消费者组,和生产者类似,消费同一类消息的多个Consumer 实例组成一个消费者组
  • Topic:主题,用于将消息按主题做划分,Producer将消息发往指定的Topic,Consumer订阅该Topic就可以收到这条消息
  • Message:消息,每个message必须指定一个topic,Message 还有一个可选的Tag设置,以便消费端可以基于Tag进行过滤消息
  • Tag:标签,子主题(二级分类)对topic的进一步细化,用于区分同一个主题下的不同业务的消息
  • Broker:Broker是RocketMQ的核心模块,负责接收并存储消息,同时提供Push/Pull接口来将消息发送给Consumer。Broker同时提供消息查询的功能,可以通过MessageID和MessageKey来查询消息。Borker会将自己的Topic配置信息实时同步到NameServer
    • push:推模式:消息到达消息服务器之后,主动推送给消费者
    • pull:拉模式:是消费端发起请求,主动向消息服务器(Broker)拉取消息
  • Queue:Topic和Queue是1对多的关系,一个Topic下可以包含多个Queue,主要用于负载均衡,Queue数量设置建议不要比消费者数少。发送消息时,用户只指定Topic,Producer会根据Topic的路由信息选择具体发到哪个Queue上。Consumer订阅消息时,会根据负载均衡策略决定订阅哪些Queue的消息
  • Offset:RocketMQ在存储消息时会为每个Topic下的每个Queue生成一个消息的索引文件,每个Queue都对应一个Offset记录当前Queue中消息条数
  • NameServer:NameServer可以看作是RocketMQ的注册中心,它管理两部分数据:集群的Topic-Queue的路由配置;Broker的实时配置信息。其它模块通过Nameserv提供的接口获取最新的Topic配置和路由信息;各 NameServer 之间不会互相通信,各NameServer都有完整的路由信息,即无状态。

3、消费模式

1)广播模式

一条消息被多个Consumer消费,即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group中的每一个Consumer都消费一次。

2)集群模式

一个Consumer Group中的所有Consumer平均分摊消费消息(一个消息只会被一个消费者消费)

4、消息类型

RocketMq消息类型分为三种:普通消息、顺序消息、事务消息

1)普通消息:有三种发送方式

  • 单向发送:单向发送是指发送方只负责发送消息,不等待服务器回应,且没有回调函数触发。即只发送请求而不管响应。
  • 同步发送:同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才会发送下一个数据包的通讯方式。
  • 异步发送:异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下一个数据包的通讯方式。发送方通过回调接口接收服务器响应,并对响应结果进行处理。

2)顺序消息

一般情况下,每个主题(topic)都会有多个消息队列(message queue),假设投递了同一个主题的十条消息,那么这十条消息会分散在不同的队列中。对于消费者而言,每个消息队列是等价的,就不能确保消息总体的顺序。而顺序消息的方案就是把这十条消息都投递到同一个消息队列中。顺序消息与普通消息同样有三种发送方式。

3)事务消息

RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致,从而实现了可靠消息服务。

事务消息发送步骤:

发送方将半事务消息发送至RocketMQ服务端。

RocketMQ服务端将消息持久化之后,向发送方返回Ack确认消息已经发送成功。由于消息为半事务消息,在未收到生产者对该消息的二次确认前,此消息被标记成"暂不能投递"状态。

发送方开始执行本地事务逻辑。

发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅方将不会接受该消息。

事务消息回查机制:

在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。

发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。

二、Centos 7.9安装RpcketMQ集群

1 、部署环境 准备

1)服务器环境(本次搭建双主双从同步双写模式)

|----|----------------|-----------------|-------------------------|----------------| | 序号 | IP地址 | 操作系统 | 角色 | 架构模式 | | 1 | 192.168.56.134 | CentOS 7.9.2009 | Dashboard | | | 2 | 192.168.56.137 | CentOS 7.9.2009 | nameserver、brokerserver | Master1、Slave2 | | 3 | 192.168.56.138 | CentOS 7.9.2009 | nameserver、brokerserver | Master2、Slave1 |

注:两台服务器各启动一个nameserver,各启动两个broker。

2 安装JDK

注:所以机器都需要安装

JDK下载地址:https://www.oracle.com/java/technologies/javase/javase8-archive-downloads.html

1)安装JDK-1.8

[root@localhost ~]# yum -y install jdk-8u202-linux-x64.rpm

2)查看是否安装成功

[root@localhost ~]# java -version

java version "1.8.0_202"

Java(TM) SE Runtime Environment (build 1.8.0_202-b08)

Java HotSpot(TM) 64-Bit Server VM (build 25.202-b08, mixed mode)

3 安装RocketMQ集群

官网下载地址:https://rocketmq.apache.org/zh/download

1)下载依赖包

[root@localhost ~]# yum -y install wget

2)下载RocketMQ安装包

[root@localhost ~]# wget https://archive.apache.org/dist/rocketmq/4.9.6/rocketmq-all-4.9.6-bin-release.zip

3)解压安装包

[root@localhost ~]# unzip rocketmq-all-4.9.6-bin-release.zip -d /usr/local

[root@localhost ~]# mv /usr/local/rocketmq-all-4.9.6-bin-release/ /usr/local/rocketmq

4)修改broker相关配置文件

在服务器192.168.56.137操作

[root@localhost ~]# cd /usr/local/rocketmq/conf/2m-2s-sync

[root@localhost conf]# vim broker-a.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 集群名字
brokerClusterName=rocketmq-cluster
# broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
# 0表示Master,大于0表示Slave
brokerId=0
# nameServer地址,分号分割
namesrvAddr=192.168.56.137:9876;192.168.56.138:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker对外服务的监听端口
listenPort=10911
# 删除文件时间点,默认凌晨4点
deleteWhen=04
# 文件保留时间,默认48小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/usr/local/rocketmq/master
# commitLog存储路径
storePathCommitLog=/usr/local/rocketmq/master/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/master/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/master/index
# checkpoint文件存储路径
storeCheckpoint=/usr/local/rocketmq/master/checkpoint
# abort文件存储路径
abortFile=/usr/local/rocketmq/master/abort
# 限制的消息大小
maxMessageSize=65536
brokerRole=SYNC_MASTER
flushDiskType=SYNC_FLUSH

[root@localhost 2m-2s-sync]# vim broker-b-s.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 集群名字
brokerClusterName=rocketmq-cluster
# broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
# 0表示Master,大于0表示Slave
brokerId=1
# nameServer地址,分号分割
namesrvAddr=192.168.56.137:9876;192.168.56.138:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker 对外服务的监听端口
listenPort=11011
# 删除文件时间点,默认凌晨4点
deleteWhen=04
# 文件保留时间,默认48小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/usr/local/rocketmq/slave
# commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/slave/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/slave/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/slave/index
# checkpoint文件存储路径
storeCheckpoint=/usr/local/rocketmq/slave/checkpoint
# abort文件存储路径
abortFile=/usr/local/rocketmq/slave/abort
# 限制的消息大小
maxMessageSize=65536
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH

在服务器192.168.56.138操作

[root@localhost ~]# cd /usr/local/rocketmq/conf/2m-2s-sync

[root@localhost conf]# vim broker-b.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
# 0表示Master,大于0表示Slave
brokerId=1
# nameServer地址,分号分割
namesrvAddr=192.168.56.137:9876;192.168.56.138:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker对外服务的监听端口
listenPort=11011
# 删除文件时间点,默认凌晨4点
deleteWhen=04
#文件保留时间,默认48小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/usr/local/rocketmq/slave
# commitLog存储路径
storePathCommitLog=/usr/local/rocketmq/slave/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/slave/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/slave/index
# checkpoint文件存储路径
storeCheckpoint=/usr/local/rocketmq/slave/checkpoint
# abort文件存储路径
abortFile=/usr/local/rocketmq/slave/abort
# 限制的消息大小
maxMessageSize=65536
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH

[root@localhost 2m-2s-sync]# vim broker-a-s.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 集群名字
brokerClusterName=rocketmq-cluster
# broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
# 0表示Master,大于0表示Slave
brokerId=0
# nameServer地址,分号分割
namesrvAddr=192.168.56.137:9876;192.168.56.138:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker对外服务的监听端口
listenPort=10911
# 删除文件时间点,默认凌晨4点
deleteWhen=04
# 文件保留时间,默认48小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/usr/local/rocketmq/master
# commitLog存储路径
storePathCommitLog=/usr/local/rocketmq/master/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/master/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/master/index
# checkpoint文件存储路径
storeCheckpoint=/usr/local/rocketmq/master/checkpoint
# abort文件存储路径
abortFile=/usr/local/rocketmq/master/abort
# 限制的消息大小
maxMessageSize=65536
brokerRole=SYNC_MASTER
flushDiskType=SYNC_FLUSH

5)修改runserver.sh和runbroker.sh脚本的启动参数(根据自身配置修改)

[root@localhost 2m-2s-sync]# cd ../../bin

[root@localhost bin]# sed -i -e 's/4g/512m/g' -e 's/2g/256m/g' runserver.sh

[root@localhost bin]# sed -i 's/8g/512m/g' runbroker.sh

6)启动RocketMQ集群

注:需要启动NameServer和Broker。

启动两台服务器的NameServer

[root@localhost bin]# nohup ./mqnamesrv &

[root@localhost bin]# tail -fn 100 ~/logs/rocketmqlogs/namesrv.log

启动两台服务器的Broker

192.168.56.137

[root@localhost bin]# nohup ./mqbroker -c ../conf/2m-2s-sync/broker-a.properties &

[root@localhost bin]# nohup ./mqbroker -c ../conf/2m-2s-sync/broker-b-s.properties &

192.168.56.138

[root@localhost bin]# nohup ./mqbroker -c ../conf/2m-2s-sync/broker-b.properties &

[root@localhost bin]# nohup ./mqbroker -c ../conf/2m-2s-sync/broker-a-s.properties &

使用JPS命令查看JAVA进程

[root@localhost bin]# jps -m

13588 Jps -m

3179 NamesrvStartup

13468 BrokerStartup -c ../conf/2m-2s-sync/broker-b.properties

13517 BrokerStartup -c ../conf/2m-2s-sync/broker-a-s.properties

7)关闭RocketMQ命令

注:需要先先关闭Broker,然后再关闭NameServer。

[root@localhost bin]# ./mqshutdown broker

[root@localhost bin]# ./mqshutdown namesrv

三、安装RocketMQ-Dashboard

1、下载源代码

[root@localhost ~]# wget https://github.com/apache/rocketmq-dashboard/archive/refs/tags/rocketmq-dashboard-1.0.0.zip

[root@localhost ~]# unzip rocketmq-dashboard-1.0.0.zip

2、安装Maven

[root@localhost ~]# wget https://dlcdn.apache.org/maven/maven-3/3.9.3/binaries/apache-maven-3.9.3-bin.zip

3、解压及重命名和配置软连接

[root@localhost ~]# unzip apache-maven-3.9.3-bin.zip

[root@localhost ~]# mv apache-maven-3.9.3 /usr/local/maven

[root@localhost ~]# ln -sf /usr/local/maven/bin/mvn /usr/bin/

4、编译

[root@localhost ~]# cd rocketmq-dashboard-rocketmq-dashboard-1.0.0

[root@localhost rocketmq-dashboard-rocketmq-dashboard-1.0.0]# mvn clean package -Dmaven.test.skip=true

5、启动rocketmq-dashboard

[root@localhost rocketmq-dashboard-rocketmq-dashboard-1.0.0]# cd target

[root@localhost target]# nohup java -jar rocketmq-dashboard-1.0.0.jar &

6、访问rocketmq-dashboard

1)浏览器访问http://192.168.56.134:8080/,如下图所示

2)连接RocketMQ

3)切换中文语言

4)创建Topic

5)创建消费者组consumer

6)扩容Topic队列

7)发送消息

向指定Topic发送消息

发送结果

8)查看集群

Centos 7.9安装RocketMQ集群和RocketMQ-Dashboard
继续阅读

历史上的今天

7 月
5

赞(0)
未经允许不得转载:工具盒子 » Centos 7.9安装RocketMQ集群和RocketMQ-Dashboard