RocketMQ入坑系列(一)角色介绍及基本使用
小编:管理员 220阅读 2022.08.03
从今天起,开始Rocket MQ系列。这位MQ经过了双11这种变态级别冲击的洗礼,有稳定出色的表现,作为我们这样的技术人员很有必要研究一下它。
先从以下几个方面对RocketMQ入个门:
- 消息队列介绍
- Rocket MQ介绍
- Rocket MQ安装使用
什么是「消息队列」呢?
队列,大家肯定都不陌生了,在数据结构中,它是一种先进先出的结构。
消息队列可以看成是一个容器,里面存放各种消息,这些消息代表业务上需要处理的动作。
消息队列是「分布式系统」中重要的组件,使用消息队列主要是为了「通过异步处理提高系统性能和削峰、降低系统耦合性」。
那么为什么要使用消息队列呢?
我在项目中遇到一种需求:后端抓取很多数据,抓取完成后需要进行解析处理,数据处理完成后要根据相应的规则发送给客户,最后持久化入库。
后端抓取完数据后需要提醒另外一个程序:数据我弄完了,你解析处理吧。在没有使用消息中间件的时候,需要保存一个「抓取完数据的消息」到数据库中,「解析程序需要不断的扫描数据库来确认消息是否抓取完」,然后再做后续操作。
这种就是串行化处理,数据处理上效率非常的低。
不使用消息队列
使用消息队列,「通过异步的方式提高了系统的性能,还降低了系统的耦合性」。
使用消息队列
消息队列具有很好的「削峰」作用的功能——即「通过异步处理,将短时间高并发产生的事务消息存储在消息队列中,从而削平高峰期的并发事务」。
现在的微服务架构中,每个服务几乎不存在耦合性,这两个应用之间可以使用消息中间件作为桥梁,进行消息沟通处理,也就是应用解耦合。
Rocket MQ基本概念消息模型 Message ModelRocketMQ主要由 「Producer」、「Broker」、「Consumer」 三部分组成,「其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息」。
Broker 在实际部署过程中对应一台服务器,「每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker」。
「Message Queue」 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。
消息生产者 Producer负责生产消息,「一般由业务系统负责生产消息」。
一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。
RocketMQ提供多种发送方式:
- 同步发送
- 异步发送
- 顺序发送
- 单向发送。
同步和异步方式均需要Broker返回确认信息,单向发送不需要。
消息消费者 Consumer负责消费消息,「一般是后台系统负责异步消费」。
一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。
从用户应用的角度而言提供了两种消费形式:拉取式(pull)消费、推动式(push)消费。
「拉取式消费」:主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。
「推动式消费」:Broker收到数据后会主动推送给消费端,实时性较高。
主题 Topic表示「一类消息的集合」,「每个主题包含若干条消息,每条消息只能属于一个主题」,是RocketMQ进行消息订阅的基本单位。
代理服务器 Broker Server消息中转角色,负责「存储消息、转发消息」。
代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。
代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
名字服务 Name Server充当路由消息的提供者。
生产者或消费者能够通过Name Server查找各主题相应的Broker IP列表。
多个Namesrv实例组成集群,但相互独立,没有信息交换。
RocketMQ角色详解RocketMQ中的角色
broker- Broker面向producer和consumer接受和发送消息。
- 向nameserver提交自己的信息。
- 是消息中间件的消息存储、转发服务器。
- 每个Broker节点,在启动时,都会遍历NameServer列表,与每个NameServer建立长连接,注册自己的信息,之后定时上报。
-
Broker高可用,可以配成Master/Slave结构,Master可写可读,Slave只可以读,Master将写入的数据同步给Slave。
- 一个Master可以对应多个Slave,但是一个Slave只能对应一个Master
- Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义BrokerId为0表示Master,非0表示Slave
-
Master多机负载,可以部署多个broker
- 每个Broker与nameserver集群中的所有节点建立长连接,定时注册Topic信息到所有nameserver。
- 消息的生产者
- 通过nameserver集群中的其中一个节点(随机选择)建立长连接,获得Topic的路由信息,包括Topic下面有哪些Queue,这些Queue分布在哪些Broker上等
- 接下来向提供Topic服务的Master建立长连接,且定时向Master发送心跳
消息的消费者,通过NameServer集群获得Topic的路由信息,连接到对应的Broker上消费消息。
注意,由于Master和Slave都可以读取消息,因此Consumer会与Master和Slave都建立连接。
nameserver底层由netty实现,提供了路由管理、服务注册、服务发现的功能,是一个无状态节点
「nameserver是服务发现者」,集群中各个角色(producer、broker、consumer等)都需要定时向nameserver上报自己的状态,以便互相发现彼此,超时不上报的话,nameserver会把它从列表中剔除
「nameserver可以部署多个」,当多个nameserver存在的时候,其他角色同时向他们上报信息,以保证高可用,
「nameServer集群间互不通信」,没有主备的概念
「nameserver内存式存储」,nameserver中的broker、topic等信息默认不会持久化
「为什么不用zookeeper?」:rocketmq希望为了提高性能,CAP定理,客户端负载均衡。
使用RocketMQ安装RocketMQ理论知识BB完了,下面开始实操,从安装开始。
名称 |
版本号 |
---|---|
JDK |
11 |
Maven |
3.6.3 |
RocketMQ |
4.7.1 |
JDK和Maven的安装及环境变量配置就不赘述了,这里提一下maven的配置,打开 maven 的配置文件( MVN_HOME/conf/settings.xml ),在
复制aliyunmaven * 阿里云公共仓库 https://maven.aliyun.com/repository/public
这样下载jar包的时候速度快一点。
下载完RocketMQ源文件后,安装很简单:
unzip rocketmq-all-4.7.1-source-release.zip cd rocketmq-all-4.7.1-source-release mvn -Prelease-all -DskipTests clean install -U复制
上面命令执行完之后,在rocketmq-all-4.7.1-source-release目录下多出一个distribution目录:
安装好的RocketMQ就在distribution/target/rocketmq-4.7.1/rocketmq-4.7.1目录下:
cd distribution/target/rocketmq-4.7.1/rocketmq-4.7.1 && ll复制
RocketMQ目录
把它挪到习惯的位置:
cd distribution/target/rocketmq-4.7.1 mv rocketmq-4.7.1/ /usr/local/rocketmq复制
OK,安装完成了。
启动服务进入/usr/local/rocketmq/bin目录下,可以看到有很多可执行命令:
启动nameserver踩坑一:
[root@localhost bin]# ./mqnamesrv Java HotSpot(TM) 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release. Unrecognized VM option 'UseCMSCompactAtFullCollection' Error: Could not create the Java Virtual Machine. Error: A fatal exception has occurred. Program will exit.复制
报错了,这是由于RocketMQ的启动文件都是按照JDK8配置的,而前面我特意配置的JDK版本是11,有很多命令参数不支持导致的,使用JDK8,正常启动没有问题的。
「解决」
先看一下mqnamesrv.sh脚本的内容:
最后一行说明它需要执行runserver.sh
「修改runserver.sh」
环境变量部分:
export JAVA_HOME export JAVA="$JAVA_HOME/bin/java" export BASE_DIR=$(dirname $0)/.. # 加入RocketMQ的lib目录 # export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH} export CLASSPATH=.:${BASE_DIR}/lib/*:${BASE_DIR}/conf:${CLASSPATH}复制
Java选项部分:
# 文件末尾 JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" #JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC" JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails" #JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m" JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow" JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages" #JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib:${JAVA_HOME}/lib/ext" #JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n" JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}" JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"复制
再次启动:
nameserver启动成功
nameserver成功启动,接下来启动broker。
启动broker踩坑二:
[root@localhost bin]# ./mqbroker [0.001s][warning][gc] -Xloggc is deprecated. Will use -Xlog:gc:/dev/shm/rmq_broker_gc_%p_%t.log instead. Unrecognized VM option 'PrintGCDateStamps' Error: Could not create the Java Virtual Machine. Error: A fatal exception has occurred. Program will exit.复制
看着和启动nameserver时报错一样,同样先来看一下mqbroker.sh:
说明需要执行runbroker.sh,修改runbroker.sh:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m" #JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_broker_gc_%p_%t.log -XX:+PrintGCDetails" #JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m" JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow" #JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch" #JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g" JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages" #JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib:${JAVA_HOME}/lib/ext" #JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n" JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}" JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"复制
这里有一点需要说明的是需要修改一下jvm最大堆、最小堆以及新生代空间大小(虚拟机空间足够的可忽略)xms、xmx以及xmn。
修改完后再次启动:
测试发送消息使用tools.sh脚本测试消息的发送和接收。
「踩坑三」:
[root@localhost bin]# ./tools.sh org.apache.rocketmq.example.quickstart.Producer -Djava.ext.dirs=./../lib:/usr/local/java/jre/lib/ext:/usr/local/java/lib/ext is not supported. Use -classpath instead. Error: Could not create the Java Virtual Machine. Error: A fatal exception has occurred. Program will exit.复制
这个就比较有经验了,和前面一样修改相关脚本。
修改tools.sh:
export JAVA_HOME export JAVA="$JAVA_HOME/bin/java" export BASE_DIR=$(dirname $0)/.. #export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH} export CLASSPATH=.:${BASE_DIR}/lib/*:${BASE_DIR}/conf:${CLASSPATH} #=========================================================================================== # JVM Configuration #=========================================================================================== JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m" #JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext:${JAVA_HOME}/lib/ext" JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"复制
「踩坑四」:
再次执行tools.sh进行测试,报错:
java.lang.IllegalStateException: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:679) at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:509) at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.tryToFindTopicPublishInfo(DefaultMQProducerImpl.java:693) at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:557) at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1343) at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1289) at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:325) at org.apache.rocketmq.example.quickstart.Producer.main(Producer.java:67) Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:394) at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1363) at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1353) at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:622) ... 7 more复制
这个报错原因是,找不到nameserver,在tools.sh脚本中添加如下内容:
export NAMESRV_ADDR=localhost:9876复制
「踩坑五」:
再次执行./tools.sh org.apache.rocketmq.example.quickstart.Producer,报错:
org.apache.rocketmq.client.exception.MQClientException: No route info of this topic: TopicTest See http://rocketmq.apache.org/docs/faq/ for further details. at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:685) at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1343) at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1289) at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:325) at org.apache.rocketmq.example.quickstart.Producer.main(Producer.java:67)复制
这个报错就和之前启动的broker有关系了,之前启动broker的时候没有连接到注册中心nameserver上,我们关掉broker,重新按照如下命令启动一下:
./mqbroker -n localhost:9876复制
broker启动起来之后,再次执行tools脚本进行测试,打印如下消息即表示服务测试成功:
SendResult [sendStatus=SEND_OK, msgId=C0A8026E222F4B85612C2CA9301103E1, offsetMsgId=C0A8026E00002A9F00000000000312FD, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=2], queueOffset=248] SendResult [sendStatus=SEND_OK, msgId=C0A8026E222F4B85612C2CA9301303E2, offsetMsgId=C0A8026E00002A9F00000000000313C8, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=3], queueOffset=248] SendResult [sendStatus=SEND_OK, msgId=C0A8026E222F4B85612C2CA9301603E3, offsetMsgId=C0A8026E00002A9F0000000000031493, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=0], queueOffset=248] SendResult [sendStatus=SEND_OK, msgId=C0A8026E222F4B85612C2CA9301A03E4, offsetMsgId=C0A8026E00002A9F000000000003155E, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=1], queueOffset=249] SendResult [sendStatus=SEND_OK, msgId=C0A8026E222F4B85612C2CA9301C03E5, offsetMsgId=C0A8026E00002A9F0000000000031629, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=2], queueOffset=249] SendResult [sendStatus=SEND_OK, msgId=C0A8026E222F4B85612C2CA9302103E6, offsetMsgId=C0A8026E00002A9F00000000000316F4, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=3], queueOffset=249] SendResult [sendStatus=SEND_OK, msgId=C0A8026E222F4B85612C2CA9302503E7, offsetMsgId=C0A8026E00002A9F00000000000317BF, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=0], queueOffset=249] 16:08:05.445 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true 16:08:05.458 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.2.110:10911] result: true复制测试消费消息
前面的实验已经成功发送了消息,现在来测试一下消费消息,同样借助于tools.sh:
./tools.sh org.apache.rocketmq.example.quickstart.Consumer复制
测试结果:
ConsumeMessageThread_16 Receive New Messages: [MessageExt [brokerName=localhost.localdomain, queueId=1, storeSize=203, queueOffset=200, sysFlag=0, bornTimestamp=1604909284622, bornHost=/192.168.2.110:45654, storeTimestamp=1604909284623, storeHost=/192.168.2.110:10911, msgId=C0A8026E00002A9F00000000000279F2, commitLogOffset=162290, bodyCRC=265672385, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1604909614601, UNIQ_KEY=C0A8026E222F4B85612C2CA92D0E0320, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 56, 48, 48], transactionId='null'}]] ConsumeMessageThread_14 Receive New Messages: [MessageExt [brokerName=localhost.localdomain, queueId=1, storeSize=203, queueOffset=199, sysFlag=0, bornTimestamp=1604909284603, bornHost=/192.168.2.110:45654, storeTimestamp=1604909284604, storeHost=/192.168.2.110:10911, msgId=C0A8026E00002A9F00000000000276C6, commitLogOffset=161478, bodyCRC=1009291136, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1604909614601, UNIQ_KEY=C0A8026E222F4B85612C2CA92CFB031C, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55, 57, 54], transactionId='null'}]] ConsumeMessageThread_9 Receive New Messages: [MessageExt [brokerName=localhost.localdomain, queueId=1, storeSize=203, queueOffset=197, sysFlag=0, bornTimestamp=1604909284567, bornHost=/192.168.2.110:45654, storeTimestamp=1604909284569, storeHost=/192.168.2.110:10911, msgId=C0A8026E00002A9F000000000002706E, commitLogOffset=159854, bodyCRC=1116443590, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1604909614600, UNIQ_KEY=C0A8026E222F4B85612C2CA92CD70314, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55, 56, 56], transactionId='null'}]] ConsumeMessageThread_19 Receive New Messages: [MessageExt [brokerName=localhost.localdomain, queueId=1, storeSize=203, queueOffset=196, sysFlag=0, bornTimestamp=1604909284550, bornHost=/192.168.2.110:45654, storeTimestamp=1604909284552, storeHost=/192.168.2.110:10911, msgId=C0A8026E00002A9F0000000000026D42, commitLogOffset=159042, bodyCRC=1262346221, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1604909614600, UNIQ_KEY=C0A8026E222F4B85612C2CA92CC60310, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55, 56, 52], transactionId='null'}]]复制
验证成功。
相关推荐
- 【RocketMQ系列】RocketMQ集群,RocketMQ-on-DLedger集群 本文RocketMQ系列第四篇,主要介绍RocketMQ集群及如何部署自动容灾切换的 RocketMQ-on-DLedger Group。RocketMQ集群搭建ROcketMQ集群搭建有以下几种方案:「单Master模式」「多Master模式」「多Master多Slave模式-异步复制」「多Master多Slave模式-同步双写」其…
- 3DMAX提示和技巧 本主题标识使用 Civil View 的一些重要提示和技巧。常规使用屏幕分辨率至少为 1280x1024 的 Civil View。低于此分辨率时,一些面板将占用过多屏幕空间。 将视口设置为线框显示以达到最佳性能。 要尽可能简化用户界面,请在单个视口中工作并关闭 3ds Max 命令面…