使用消息中间件的好处是模块之间的解耦,可以提升服务的并发处理能力。比如,一个服务接收用户请求后只将数据存储,然后马上响应客户端,而将耗时的操作交给一个后台服务慢慢去消费,自然服务的并发处理能力就提升上来了。
关于消息中间件的介绍我就不多言了,今天写这篇的目的分享下RocketMQ的集群搭建,还有一些需要注意的细节。
为什么选择RocketMQ
看过网上很多的各大消息中间件对比文章,但万事是没有绝对的,至少用哪种还是看你喜欢。
最初同事是建议我选择RabbitMQ,实际上RabbitMQ与RocketMQ在性能上差异不大,这是看了别人写的对比文章得出的结论,因为项目需求紧急,我也没有太多时间去考量。
RabbitMQ实现了标准的AMQP协议,而RocketMQ是自创一派。RabbitMQ支持多种语言,这并不是因为它使用Erlang语言的缘故,如果RocketMQ底层通信协议不使用依赖Java的序列号化协议,也可以支持各种语言,只要它想,它可以提供各种语言的sdk包给你。
我选择RocketMQ的原因,一是完全满足此次的需求以及后续的需求,这个决对要放在第一位;二是RocketMQ是Java写的,是我们团队最熟悉的语言,部署装个JDK比让我倒腾个Erlang容易;三是我们项目都是Java语言所编写,不需要支持多种语言,这个项目也不打算使用别的语言,这个弱点对我们来说并不是弱点;四是RocketMQ是国内产的,文档有中文,网上也能找出很多的博客,上手容易。对于没接触过MQ的我来说,我更看重以上这些,所以我选择了RocketMQ。
当然,RocketMQ也是经历过双十一的男人,既然能让你爽。RabbitMQ和RocketMQ我都去了解过点原理,入手一个框架最快的速度,当然是要先懂其原理,只有了解才懂得去用,至于怎么用好,那都是后话。
集群搭建方案
RocketMQ的集群搭建相对几个消息中间件来说还是简单的,但也有不足的地方,比如我需要搭建多主多从,那么我就需要在每台服务器上都启动一个namesrv服务,它的作用与kafka使用zookeeper监控集群状态,负责主从切换一样。也还好,只要不需要扩展多节点,对于一主一从或两主两从而言,这就不是毛病。
(推荐一篇博客:RocketMQ4.3.0集群搭建和部署rocketMq监控平台,介绍得还可以。)
RocketMQ的官网下载链接
https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.5.1/rocketmq-all-4.5.1-bin-release.zip
复制链接,在服务器使用wget下载即可
wget http://apache.01link.hk/rocketmq/4.5.1/rocketmq-all-4.5.1-bin-release.zip
使用unzip命令解压到你想安装的位置
unzip rocketmq-all-4.5.1-bin-release.zip
从官网上下载RocketMQ,解压后能看到一个conf目录,里面给了几种配置方案,可以参考来配置。
1、单机方案,broker.conf
2、两个主节点,不需要从节点,2m-noslave
3、两主两从,两主两从也分两种。一种是主从同步2m-2s-sync,一种是主从异步2m-2s-async。
主从同步与主从异步。如果一个broker组有Master和Slave,消息需要从Master复制到Slave上,有同步和异步两种复制方式。同步复制是等Master和Slave均写成功后才反馈给生产者写成功状态;异步复制方式是只要Master写成功即可反馈给生产者写成功状态。
我选择的是主从异步复制方式。最后是两主两从,但一主一从已经满足我的需求。其实这里并不局限这几种部署方案,我可以按需配置,官方也只是给个参考方案。
比如一开始我是以2m-2s-async来配置部署的,但是我发现机器不够,只找到了两台机器部署,而且每台都仅剩余1.5g大小的内存可用,显然只能放弃,然后就只有两主无从2m-noslave或一主一从1m-1s-async可选择。(同步是对于主从来说的,所以两主无从就不存在同步异步复制的说法。)
但是我发现,同样的配置,我只需要按需启动即可,并不需要重新配置。下面给出我的配置方案,同时给出启动脚本。
“.properties”结尾的是broker的配置文件。这是一个两主两从的配置方案。其中broker-a-m与broker-a-s是名为broker-a的broker的配置文件,以“m”结尾的是主节点的配置,“s”结尾的是从节点的配置,而broker-b-m与broker-b-s分别是broker-b的主节点和从节点的配置。
还不了解什么是Namesrv与Broker的可以先从了解一下什么是RocketMQ。
”.sh”结尾的是启动脚本文件,这些脚本文件只是便于管理集群而已。“-stop.sh”是停止这台机器上的namesrv服务和所有broker服务的脚本文件。”-status.sh”是查看集群状态的脚本文件。
“-broker*”是broker-a与broker-b的主从节点的启动脚本,同样,以m结尾的是主节点的启动脚本,以s结尾的是从节点的启动脚本。
除了namesrv是每台服务器都需要启动的之外,四个broker我们可以按需启动。将”broker-a-m.sh”与”broker-b-s.sh”上传到A机器,将”broker-a-s”与”broker-b-m.sh”上传到B机器。“nameserver.sh”、“stop.sh”、”status.sh”这三个脚本文件每台机器都上传一份。
启动两主两从集群
两个broker。broker-a、broker-b的配置文件不要放在同一台机器上。比如broker-a的主节点配置放在A机器上,而从节点的配置则放在B机器上;同样的,对于broker-b,主节点的配置放在B机器上,从节点的配置则放在A机器上。很好理解,一个鸡蛋不要放同一个篮子里,比如A机器挂了,这时候B机器上运行的是broker-a的从节点、broker-b的主节点,这时broker-a的从节点就会变为主节点,两个broker都能正常工作。
集群的启动顺序。先分别启动两台机器上的namesrv服务,再分别启动两台机器上的broker主节点服务,最后分别启动两台机器上broker的从节点服务。
简单点说,就是主从交叉启动,先启动主再启动从。
启动一主一从集群
当你不需要配两个broker的时候,你也不需要去修改配置文件,只需要在A机器上启动broker-a的主节点,然后到B机器上启动broker-a的从节点就可以实现一主一从,保障可用性。
集群的启动顺序。先分别启动两台机器上的namesrv服务,再分别启动A机器的broker-a的主节点服务,最后启动B机器上的broker-a的从节点的服务。
配置参数说明
brokerClusterName:这是集群的名称
brokerName:broker的名称,给broker取个名称,
brokerId:用id标识broker,0为主,1为从,多个从按顺序往下加即可
deleteWhen:删除文件时间点,默认凌晨 4点
fileReservedTime:文件保留时间,默认 48 小时
brokerRole:broker角色,ASYNC_MASTER为主节点,SLAVE为从节点。
flushDiskType:刷盘类型,ASYNC_FLUSH为异步刷盘
listenPort:监听端口号,这是broker的端口号
namesrvAddr:namesrv的ip地址与端口号,多个namesrv之间使用’;‘分割。可以是内网ip,也可以是外网ip。如果是内网ip,只能内网的机器访问。
brokerIP1:这台机器的ip地址。
autoCreateTopicEnable:设置是否允许 Broker 自动创建Topic(主题),建议关闭,在集群监控管理后台创建即可。
autoCreateSubscriptionGroup:是否允许 Broker 自动创建订阅组,生成者与消费者需要绑定组,才能正常使用。建议关闭,在监控管理后台手动创建。
storePathRootDir:指定数据存储目录,需要自己创建这个目录,消息就是存储在这个目录下的,至于每个文件存储的是什么,就需要你去深入了解ReocketMQ了。
配置文件与脚本文件内容
broker配置文件
broker-a-m的配置文件
brokerClusterName=AuroraAdCluster
brokerName=aurora-broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10911
# 正式使用后改为内网ip
namesrvAddr=172.31.23.167:9876;172.31.18.81:9876
#本机器ip,正式使用后改为内网ip
brokerIP1=172.31.23.167
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=false
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#数据存储目录,需要在data目录下创建mqstore-broker-a目录
storePathRootDir=/data/mqstore-broker-a
broker-a-s的配置文件
brokerClusterName=AuroraAdCluster
brokerName=aurora-broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10921
# 正式使用后改为内网ip
namesrvAddr=172.31.23.167:9876;172.31.18.81:9876
#本机器ip,正式使用后改为内网ip
brokerIP1=172.31.18.81
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=false
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#数据存储目录
storePathRootDir=/data/mqstore-broker-a
broker-b-m的配置文件
brokerClusterName=AuroraAdCluster
brokerName=aurora-broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
# 正式使用后改为内网ip
namesrvAddr=172.31.23.167:9876;172.31.18.81:9876
#本机器ip,正式使用后改为内网ip
brokerIP1=172.31.18.81
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=false
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#数据存储目录
storePathRootDir=/data/mqstore-broker-b
broker-b-s的配置文件
brokerClusterName=AuroraAdCluster
brokerName=aurora-broker-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10921
# 正式使用后改为内网ip
namesrvAddr=172.31.23.167:9876;172.31.18.81:9876
#本机器ip,正式使用后改为内网ip
brokerIP1=172.31.23.167
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=false
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
#数据存储目录
storePathRootDir=/data/mqstore-broker-b
集群启动脚本文件
namesrv的启动脚本
#!/bin/sh
cd /data/install/rocketmq-all-4.5.1-bin-release
nohup sh bin/mqnamesrv > ./logs/namesrvrun.log &
broker-a-m的启动脚本
#!/bin/sh
cd /data/install/rocketmq-all-4.5.1-bin-release
#使用mqbroker启动broker服务
#-c conf/2m-2s-async/aurora-broker-a-m.properties 是指定使用哪个配置文件
nohup sh bin/mqbroker -c conf/2m-2s-async/aurora-broker-a-m.properties > ./logs/aurora-broker-a-m.log &
broker-a-s的启动脚本
#!/bin/sh
cd /data/install/rocketmq-all-4.5.1-bin-release
nohup sh bin/mqbroker \
-c conf/2m-2s-async/aurora-broker-a-s.properties \
> ./logs/aurora-broker-a-s.log &
broker-b-m的启动脚本
#!/bin/sh
cd /data/install/rocketmq-all-4.5.1-bin-release
nohup sh bin/mqbroker \
-c conf/2m-2s-async/aurora-broker-b-m.properties > \
./logs/aurora-broker-b-m.log &
broker-b-s的启动脚本
#!/bin/sh
cd /data/install/rocketmq-all-4.5.1-bin-release
nohup sh bin/mqbroker \
-c conf/2m-2s-async/aurora-broker-b-s.properties > \
./logs/aurora-broker-b-s.log &
集群状态监控与停止脚本文件
查看集群
#!/bin/sh
cd install/rocketmq-all-4.5.1-bin-release/
sh bin/mqadmin clusterList -n "172.31.23.167:9876;172.31.18.81:9876"
停止集群
#!/bin/sh
mq_path="/data/install/rocketmq-all-4.5.1-bin-release"
${mq_path}/bin/mqshutdown broker
${mq_path}/bin/mqshutdown namesrv
RocketMQ监控管理后台
肯定不能少了管理工具,就像redis有rdm,mongodb有Robo 3T一样,RocketMQ也提供了一个管理后台给我们监控服务状态,以及查询消息。但这需要我们自己下载源码,修改配置,然后打包部署。
从github上将项目克隆下来,“https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console”。
有空可以看下这个管理后台的代码,可以加深对RocketMQ的一些理解,还可以根据自己的需求加功能。便于集群的管理。比如增加消息堆积监控页面,消息查看页面添加Tag过滤功能等。当然,我也没有去看。
application.properties文件需要修改的地方,如下图画线的地方。
loginrequired=true配置需要登陆。那么需要修改用户配置文件,用户名密码是写死在配置文件中的。
用户名=密码。
#管理员(1是声明为管理员)
admin=123456,1
# 普通用户
jiuye=123456
lixuan=123456
打包成jar包部署,不需要添加额外依赖或插件,直接mvn打包就行。
或者直接用mvn命令。然后将jar部署到服务器上,开放相应端口即可,或者使用nginx 反向代理,这是服务部署的内容,如果还不会部署的话,那应该是刚入门java后端的同学。
一些疑问
1、消息消费完后会删除吗?
RocketMQ提供一个ack机制,要求消费者确认消息已经成功消费后返回一个确认状态,这时RocketMQ就会把这条消息移除,否则不会移除。要定期检查消息消费情况,处理消费失败的消息,避免消息堆积。
正常情况下,不会出现消息消费失败的情况,除非消息格式不对,或者由于系统异常引起的事务回滚。而如果是业务逻辑上的异常,比如订单过期,或者验证码失效(打比方)这种业务上的异常应该返回Ack确认消息消费成功。
2、我需要消息能够百分百发送成功怎么做?
RocketMQ提供一个重试机制,如下。
// 消息发送失败重试次数producer.setRetryTimesWhenSendFailed(3);
也可以自己写一个定时任务重试,在发送失败时将消息加入一个阻塞队列,由定时任务去重试发送。但是我觉得完全没有必要,如果是百分百不能发送失败的数据,那还不如将数据存在在数据库,再发条消息告诉消费者,你要消费的数据在表xxx,id是yyy。如果只是网络波动、使用重试策略就可以,但如果是整个服务器故障,这就没办法了,所以至少要部署一主一从。
我就使用缓存的方式,将数据缓存起来,生成一个唯一id,再告诉消费者,因为我不能选择因消息发送失败就拒绝请求响应404给调用方。这不像下单,下单失败了可以让用户重试,但这种请求失败了,就不会接收到第二次请求了。
3、为什么消费者启动没有异常,却收不到消息?
一是生产者和消费者需要绑定同一个组名,否则接收不到。
二是要确保是否配置了autoCreateSubscriptionGroup为true,如果为true,就需要自己先手动创建一个SubscriptionGroup,可以在监控管理后台创建。
4、使用外网IP还是内网IP?
肯定是优先选择内网,使用内网不需要担心数据安全问题,也不会受网络波动的影响。但如果要使用内网IP,就必须要保证所有消息生产者、消费者都在一个局域网内,比如我们使用的是亚马逊的服务器,不同区域之间的服务器是不同内网的,这种蛋疼情况还在想办法解决,比如服务器迁移到同一个区域。
5、使用内网IP时消息发送是正常的,但是换为外网IP之后就连接失败了?
安全组需要给两个broket、两个namesrv配置入网规则,使用内网也是要配置的,这里只是将内网IP改成外网IP。由于目前无法使用内网,我是先改用外网来部署,先让项目测试通过,后面再改回内网,这也是无奈之举。但就是这个问题,浪费我一下午时间去解决。就是安全组没有配好,可能是漏了某个IP,反正现在也是测试,我直接改成所有ip可访问就可以。
还有更多问题在等着你,比如消息的重复消费问题、消息的顺序消费问题、消息的堆积问题……
在使用上,我根据项目特点封装了一个通用模版(组件)。后续在使用过程中遇到什么问题再跟大家分享,就像之前的MongoDB一样。
总结
本篇是从入门者的角度,详细介绍一个集群的搭建过程,并给出一些入学者可能会有的疑问的解答。学习一个框架,最首要的事情就是要去学习它的工作原理。想要用好,还需要深入去研究,以及解决项目在使用这个中间件过程中产生的问题。
通过学习RocketMQ的集群原理,我知道,当一台服务器挂掉时,意味着一个namesrv挂了,此时会有一个30秒的空档期,就是这30秒内生产者无法发送消息给MQ,这是由于namesvr之间的心跳时间为30秒。所以,为保证数据的不丢失,就需要在消息发送失败的时候进行缓存重试,直到MQ的从节点变为主节点。
通过学习RocketMQ的集群原理,我知道什么需求情况下使用哪种集群方案。比如,是部署两主,还是一主一从,如果你土豪,请忽略。如果是配置两主,那么有一个主节点挂的时候,投放到这个主节点的消息就无法被消费,直到这个主节点重新启动。 我目前为项目选择一主一从方案。
每台机器都要部署一个namesrv服务,耗掉一些内存,再部署一个broker,现在没有购买新机器,因为不能增加花费,所以我是将Rocket MQ部署到了两台部署管理后台的机器上。 一是,管理后台是内部使用的,不占带宽,而消息中间件就吃带宽,刚好合理利用资源。二是,部署管理后台的两台机器都还有一个G多的内存可用。
通过学习RocketMQ保证消息可靠的实现原理,我知道了RocketMQ采取ACK确认方式来保证消息不丢失的。这借鉴了Http协议的ACK确保消息不丢包的原理。所以我知道,在消费消息的时候要回复消费状态告知RocketMQ。