导入MD文档图片 RocketMQ集群Broker高可用测试
小编:管理员 143阅读 2022.08.03
ip地址 |
主机名 |
操作系统版本 |
RocketMQ版本 |
JDK版本 |
maven版本 |
备注 |
---|---|---|---|---|---|---|
172.16.7.91 |
nameserver01 |
centos 7.6 |
4.8.0 |
1.8.0_291 |
3.6 |
Name Server集群 |
172.16.7.92 |
nameserver03 |
centos 7.6 |
4.8.0 |
1.8.0_291 |
3.6 |
Name Server集群 |
172.16.7.93 |
master01 |
centos 7.6 |
4.8.0 |
1.8.0_291 |
3.6 |
Broker集群1 |
172.16.7.94 |
slave01 |
centos 7.6 |
4.8.0 |
1.8.0_291 |
3.6 |
Broker集群1 |
172.16.7.95 |
master02 |
centos 7.6 |
4.8.0 |
1.8.0_291 |
3.6 |
Broker集群2 |
172.16.7.96 |
slave02 |
centos 7.6 |
4.8.0 |
1.8.0_291 |
3.6 |
Broker集群2 |
[root@master01 ~]# init 6 [root@master01 ~]# cd /root/logs/rocketmqlogs/ [root@master01 rocketmqlogs]# rm -rf * [root@master01 rocketmqlogs]# cd /root/store/ [root@master01 store]# rm -rf *复制
以master01为例,首先停止所有rocketmq进程,然后删除日志和存储信息。所有服务器都执行该操作。
2.测试前集群查看启动各节点服务,查看集群状态
3.新建topic新增主题topic_broker_test
主题配置如下:
查看新增的主题
4.新建订阅组新建订阅组group_broker_test
配置如下:
查看新建的订阅组
5.producer代码package com.my.maven.rocketmq; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; public class Producer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("group_broker_test"); // Specify name server addresses. producer.setNamesrvAddr("172.16.7.91:9876;172.16.7.92:9876"); producer.setRetryTimesWhenSendAsyncFailed(2); //Launch the instance. producer.start(); for (int i = 0; i < 10000; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("topic_broker_test" /* Topic */, "TagA" /* Tag */, ("Broker HA Test" + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } //Shut down once the producer instance is not longer in use. producer.shutdown(); } }复制
使用循环方式产生多条消息
6.consumer代码package com.my.maven.rocketmq; import java.util.List; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( "group_broker_test"); consumer.setNamesrvAddr("172.16.7.91:9876;172.16.7.92:9876"); consumer.subscribe("topic_broker_test", "TagA || tagB"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage( List复制四、消息发送高可用测试1.broker-a master重启msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); MessageExt msg = msgs.get(0); if (msg.getTopic().equals("topic_broker_test")) { if (msg.getTags() != null && msg.getTags().equals("tagA")) { // 获取消息体 String message = new String(msg.getBody()); System.out.println("receive tagA message:" + message); } else if (msg.getTags() != null && msg.getTags().equals("tagB")) { // 获取消息体 String message = new String(msg.getBody()); System.out.println("receive tagB message:" + message); } } // 成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
[root@master01 rocketmq]# init 6复制
主机名 |
状态 |
---|---|
broker-a master |
发送时重启 |
broker-a slave |
正常运行 |
broker-b master |
正常运行 |
broker-b slave |
正常运行 |
发送5000条消息,在消息发送的同时关闭broker-a master
消息发送会暂停,一共发送了153条
结论:消息发送时如果有master宕机,则消息发送会终止,主机起来后消息也不会继续发送。
2.broker-a slave重启[root@slave01 rocketmq]# init 6复制
主机名 |
状态 |
---|---|
broker-a master |
宕机 |
broker-a slave |
发送时重启 |
broker-b master |
正常运行 |
broker-b slave |
正常运行 |
发送5000条消息,在发送过程中同时重启broker-a slave
消息发送会暂停,一共发送了339条
结论:消息发送时如果有slave宕机,则消息发送会终止,主机起来后消息也不会继续发送。
3.所有 slave关机
主机名 |
状态 |
---|---|
broker-a master |
正常运行 |
broker-a slave |
宕机 |
broker-b master |
正常运行 |
broker-b slave |
宕机 |
[root@slave01 rocketmq]# init 0 [root@slave02 rocketmq]# init 0复制
发送5000条消息,在发送过程中同时关闭所有的slave
消息发送会暂停,一共发送了401条,这也验证了上面的结论:消息发送时如果有slave宕机,则消息发送会终止
保持两个slave宕机状态,继续发送5000条消息
console显示消息记录数为5000条
结论:slave都宕机不影响消息发送。
4.broker-b master关机
主机名 |
状态 |
---|---|
broker-a master |
正常运行 |
broker-a slave |
宕机 |
broker-b master |
宕机 |
broker-b slave |
宕机 |
[root@master02 rocketmq]# init 0复制
发送5000条测试消息,发送前broker-b master关机,只保留broker-a master运行
console显示发送了5000条消息
结论:集群只有一台master消息发送正常。
5.关闭所有的master
主机名 |
状态 |
---|---|
broker-a master |
宕机 |
broker-a slave |
正常运行 |
broker-b master |
宕机 |
broker-b slave |
正常运行 |
关闭所有的master,启动所有的slave,发送5000条消息
消息发送前:
消息发送:
console报错,消息无法发送
结论:master都宕机消息无法正常发送。
五、消息消费高可用测试在消息消费高可用测试前先清空消息,然后发送1万条消息
1.broker-a master关机[root@master01 rocketmq]# init 0复制
在消息消费时将broker-a master关机
主机名 |
状态 |
---|---|
broker-a master |
消费时关机 |
broker-a slave |
正常运行 |
broker-b master |
正常运行 |
broker-b slave |
正常运行 |
消费刚发送的1万条消息,消费过程中将broker-a master关机
console日志显示消息消费了1万条
结论:某台master宕机不影响消息消费。
2.broker-a slave关机先发送1万条消息,然后消费,消费过程中broker-a slave关机
[root@slave01 rocketmq]# init 0复制
dashboard的消费统计不是很准确,以eclipse的console日志为准。
主机名 |
状态 |
---|---|
broker-a master |
消费时关机 |
broker-a slave |
消费时关机 |
broker-b master |
正常运行 |
broker-b slave |
正常运行 |
消费刚发送的1万条消息,消费过程中将broker-a slave关机
console显示消费了1万条
结论:某台slave宕机不影响消息消费
3.所有slave关机先发送1万条消息,然后消费,消费过程中broker-b slave关机
[root@slave02 rocketmq]# init 0复制
主机名 |
状态 |
---|---|
broker-a master |
消费时关机 |
broker-a slave |
消费时关机 |
broker-b master |
正常运行 |
broker-b slave |
消费时关机 |
消费刚发送的1万条消息,消费过程中将broker-b slave关机
console消费1万条消息
结论:slave都宕机不影响消息消费
4.所有master关机拉起broker-a slave或者broker-b slave,保持broker-b master开机状态,发送1万条消息,再将所有master关机最后消费
主机名 |
状态 |
---|---|
broker-a master |
关机 |
broker-a slave |
关机 |
broker-b master |
关机 |
broker-b slave |
正常运行 |
消费刚发送的1万条消息
console显示消费了1w条记录
结论:master都宕机不影响消息发送
5.所有slave关机拉起broker-a master或者broker-b master,关闭所有的slave,发送1万条消息,然后消费
主机名 |
状态 |
---|---|
broker-a master |
正常运行 |
broker-a slave |
关机 |
broker-b master |
关机 |
broker-b slave |
关机 |
消费刚发送的1万条消息
console显示消息全部被消费
结论:slave都宕机不影响消息消费
六、测试总结- 1.消息发送过程中只要有任意一台master或者slave宕机则发送程序暂停;
- 2.消息发送前slave都宕机不影响消息发送;
- 3.master或者slave都宕机不影响消息消费;
- 4.为保证消息正常的收发,集群最小配置为必需要有一台master主机;
本文所有代码和配置文件已上传github:RocketMQ_Broker_HA_Test
单机版RocketMQ搭建详见:Centos7.6搭建RocketMQ4.8全纪录
集群版RocketMQ搭建详见: RocketMQ4.8集群搭建全纪录
集群启停详见: RocketMQ集群启停手册
集群消息收发测试: RocketMQ集群消息收发测试全纪录
NameServer高可用测试:RocketMQ集群NameServer高可用测试
相关推荐
- 【RocketMQ系列】RocketMQ集群,RocketMQ-on-DLedger集群 本文RocketMQ系列第四篇,主要介绍RocketMQ集群及如何部署自动容灾切换的 RocketMQ-on-DLedger Group。RocketMQ集群搭建ROcketMQ集群搭建有以下几种方案:「单Master模式」「多Master模式」「多Master多Slave模式-异步复制」「多Master多Slave模式-同步双写」其…
- 3DMAX提示和技巧 本主题标识使用 Civil View 的一些重要提示和技巧。常规使用屏幕分辨率至少为 1280x1024 的 Civil View。低于此分辨率时,一些面板将占用过多屏幕空间。 将视口设置为线框显示以达到最佳性能。 要尽可能简化用户界面,请在单个视口中工作并关闭 3ds Max 命令面…