RocketMQ入坑系列(一)角色介绍及基本使用

小编:管理员 220阅读 2022.08.03

从今天起,开始Rocket MQ系列。这位MQ经过了双11这种变态级别冲击的洗礼,有稳定出色的表现,作为我们这样的技术人员很有必要研究一下它。

先从以下几个方面对RocketMQ入个门:

  • 消息队列介绍
  • Rocket MQ介绍
  • Rocket MQ安装使用
消息队列

什么是「消息队列」呢?

队列,大家肯定都不陌生了,在数据结构中,它是一种先进先出的结构。

消息队列可以看成是一个容器,里面存放各种消息,这些消息代表业务上需要处理的动作。

消息队列是「分布式系统」中重要的组件,使用消息队列主要是为了「通过异步处理提高系统性能和削峰、降低系统耦合性」

那么为什么要使用消息队列呢?

我在项目中遇到一种需求:后端抓取很多数据,抓取完成后需要进行解析处理,数据处理完成后要根据相应的规则发送给客户,最后持久化入库。

后端抓取完数据后需要提醒另外一个程序:数据我弄完了,你解析处理吧。在没有使用消息中间件的时候,需要保存一个「抓取完数据的消息」到数据库中,「解析程序需要不断的扫描数据库来确认消息是否抓取完」,然后再做后续操作。

这种就是串行化处理,数据处理上效率非常的低。

不使用消息队列

使用消息队列,「通过异步的方式提高了系统的性能,还降低了系统的耦合性」

使用消息队列

消息队列具有很好的「削峰」作用的功能——即「通过异步处理,将短时间高并发产生的事务消息存储在消息队列中,从而削平高峰期的并发事务」

现在的微服务架构中,每个服务几乎不存在耦合性,这两个应用之间可以使用消息中间件作为桥梁,进行消息沟通处理,也就是应用解耦合。

Rocket MQ基本概念消息模型 Message Model

RocketMQ主要由 「Producer」「Broker」「Consumer」 三部分组成,「其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息」

Broker 在实际部署过程中对应一台服务器,「每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker」

「Message Queue」 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。

消息生产者 Producer

负责生产消息,「一般由业务系统负责生产消息」

一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。

RocketMQ提供多种发送方式:

  • 同步发送
  • 异步发送
  • 顺序发送
  • 单向发送。

同步和异步方式均需要Broker返回确认信息,单向发送不需要。

消息消费者 Consumer

负责消费消息,「一般是后台系统负责异步消费」

一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。

从用户应用的角度而言提供了两种消费形式:拉取式(pull)消费、推动式(push)消费。

「拉取式消费」:主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。

「推动式消费」:Broker收到数据后会主动推送给消费端,实时性较高。

主题 Topic

表示「一类消息的集合」「每个主题包含若干条消息,每条消息只能属于一个主题」,是RocketMQ进行消息订阅的基本单位。

代理服务器 Broker Server

消息中转角色,负责「存储消息、转发消息」

代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。

代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

名字服务 Name Server

充当路由消息的提供者。

生产者或消费者能够通过Name Server查找各主题相应的Broker IP列表。

多个Namesrv实例组成集群,但相互独立,没有信息交换。

RocketMQ角色详解

RocketMQ中的角色

broker
  • Broker面向producer和consumer接受和发送消息。
  • 向nameserver提交自己的信息。
  • 是消息中间件的消息存储、转发服务器。
  • 每个Broker节点,在启动时,都会遍历NameServer列表,与每个NameServer建立长连接,注册自己的信息,之后定时上报。
broker集群
  • Broker高可用,可以配成Master/Slave结构,Master可写可读,Slave只可以读,Master将写入的数据同步给Slave。
    • 一个Master可以对应多个Slave,但是一个Slave只能对应一个Master
    • Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义BrokerId为0表示Master,非0表示Slave
  • Master多机负载,可以部署多个broker
    • 每个Broker与nameserver集群中的所有节点建立长连接,定时注册Topic信息到所有nameserver。
producer
  • 消息的生产者
  • 通过nameserver集群中的其中一个节点(随机选择)建立长连接,获得Topic的路由信息,包括Topic下面有哪些Queue,这些Queue分布在哪些Broker上等
  • 接下来向提供Topic服务的Master建立长连接,且定时向Master发送心跳
consumer

消息的消费者,通过NameServer集群获得Topic的路由信息,连接到对应的Broker上消费消息。

注意,由于Master和Slave都可以读取消息,因此Consumer会与Master和Slave都建立连接。

nameserver

底层由netty实现,提供了路由管理、服务注册、服务发现的功能,是一个无状态节点

「nameserver是服务发现者」,集群中各个角色(producer、broker、consumer等)都需要定时向nameserver上报自己的状态,以便互相发现彼此,超时不上报的话,nameserver会把它从列表中剔除

「nameserver可以部署多个」,当多个nameserver存在的时候,其他角色同时向他们上报信息,以保证高可用,

「nameServer集群间互不通信」,没有主备的概念

「nameserver内存式存储」,nameserver中的broker、topic等信息默认不会持久化

「为什么不用zookeeper?」:rocketmq希望为了提高性能,CAP定理,客户端负载均衡。

使用RocketMQ安装RocketMQ

理论知识BB完了,下面开始实操,从安装开始。

名称

版本号

JDK

11

Maven

3.6.3

RocketMQ

4.7.1

JDK和Maven的安装及环境变量配置就不赘述了,这里提一下maven的配置,打开 maven 的配置文件( MVN_HOME/conf/settings.xml ),在标签中添加 mirror 子节点:


  aliyunmaven
  *
  阿里云公共仓库
  https://maven.aliyun.com/repository/public
复制

这样下载jar包的时候速度快一点。

下载完RocketMQ源文件后,安装很简单:

unzip rocketmq-all-4.7.1-source-release.zip

cd rocketmq-all-4.7.1-source-release

mvn -Prelease-all -DskipTests clean install -U
复制

上面命令执行完之后,在rocketmq-all-4.7.1-source-release目录下多出一个distribution目录:

安装好的RocketMQ就在distribution/target/rocketmq-4.7.1/rocketmq-4.7.1目录下:

cd distribution/target/rocketmq-4.7.1/rocketmq-4.7.1 && ll
复制

RocketMQ目录

把它挪到习惯的位置:

cd distribution/target/rocketmq-4.7.1

mv rocketmq-4.7.1/ /usr/local/rocketmq
复制

OK,安装完成了。

启动服务

进入/usr/local/rocketmq/bin目录下,可以看到有很多可执行命令:

启动nameserver

踩坑一:

[root@localhost bin]# ./mqnamesrv
Java HotSpot(TM) 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Unrecognized VM option 'UseCMSCompactAtFullCollection'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
复制

报错了,这是由于RocketMQ的启动文件都是按照JDK8配置的,而前面我特意配置的JDK版本是11,有很多命令参数不支持导致的,使用JDK8,正常启动没有问题的。

「解决」

先看一下mqnamesrv.sh脚本的内容:

最后一行说明它需要执行runserver.sh

「修改runserver.sh」

环境变量部分:

export JAVA_HOME
export JAVA="$JAVA_HOME/bin/java"
export BASE_DIR=$(dirname $0)/..
# 加入RocketMQ的lib目录
# export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}
export CLASSPATH=.:${BASE_DIR}/lib/*:${BASE_DIR}/conf:${CLASSPATH}
复制

Java选项部分:

# 文件末尾
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
#JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8  -XX:-UseParNewGC"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails"
#JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"
#JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib:${JAVA_HOME}/lib/ext"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
复制

再次启动:

nameserver启动成功

nameserver成功启动,接下来启动broker。

启动broker

踩坑二:

[root@localhost bin]# ./mqbroker
[0.001s][warning][gc] -Xloggc is deprecated. Will use -Xlog:gc:/dev/shm/rmq_broker_gc_%p_%t.log instead.
Unrecognized VM option 'PrintGCDateStamps'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
复制

看着和启动nameserver时报错一样,同样先来看一下mqbroker.sh:

说明需要执行runbroker.sh,修改runbroker.sh:

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
#JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_broker_gc_%p_%t.log -XX:+PrintGCDetails"
#JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
#JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
#JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"
#JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib:${JAVA_HOME}/lib/ext"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
复制

这里有一点需要说明的是需要修改一下jvm最大堆、最小堆以及新生代空间大小(虚拟机空间足够的可忽略)xms、xmx以及xmn。

修改完后再次启动:

测试发送消息

使用tools.sh脚本测试消息的发送和接收。

「踩坑三」

[root@localhost bin]# ./tools.sh org.apache.rocketmq.example.quickstart.Producer
-Djava.ext.dirs=./../lib:/usr/local/java/jre/lib/ext:/usr/local/java/lib/ext is not supported.  Use -classpath instead.
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
复制

这个就比较有经验了,和前面一样修改相关脚本。

修改tools.sh:

export JAVA_HOME
export JAVA="$JAVA_HOME/bin/java"
export BASE_DIR=$(dirname $0)/..
#export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}
export CLASSPATH=.:${BASE_DIR}/lib/*:${BASE_DIR}/conf:${CLASSPATH}

#===========================================================================================
# JVM Configuration
#===========================================================================================
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
#JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext:${JAVA_HOME}/lib/ext"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
复制

「踩坑四」

再次执行tools.sh进行测试,报错:

java.lang.IllegalStateException: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed
 at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:679)
 at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:509)
 at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.tryToFindTopicPublishInfo(DefaultMQProducerImpl.java:693)
 at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:557)
 at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1343)
 at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1289)
 at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:325)
 at org.apache.rocketmq.example.quickstart.Producer.main(Producer.java:67)
Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed
 at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:394)
 at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1363)
 at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1353)
 at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:622)
 ... 7 more
复制

这个报错原因是,找不到nameserver,在tools.sh脚本中添加如下内容:

export NAMESRV_ADDR=localhost:9876
复制

「踩坑五」

再次执行./tools.sh org.apache.rocketmq.example.quickstart.Producer,报错:

org.apache.rocketmq.client.exception.MQClientException: No route info of this topic: TopicTest
See http://rocketmq.apache.org/docs/faq/ for further details.
 at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:685)
 at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1343)
 at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1289)
 at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:325)
 at org.apache.rocketmq.example.quickstart.Producer.main(Producer.java:67)
复制

这个报错就和之前启动的broker有关系了,之前启动broker的时候没有连接到注册中心nameserver上,我们关掉broker,重新按照如下命令启动一下:

./mqbroker -n localhost:9876
复制

broker启动起来之后,再次执行tools脚本进行测试,打印如下消息即表示服务测试成功:

SendResult [sendStatus=SEND_OK, msgId=C0A8026E222F4B85612C2CA9301103E1, offsetMsgId=C0A8026E00002A9F00000000000312FD, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=2], queueOffset=248]
SendResult [sendStatus=SEND_OK, msgId=C0A8026E222F4B85612C2CA9301303E2, offsetMsgId=C0A8026E00002A9F00000000000313C8, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=3], queueOffset=248]
SendResult [sendStatus=SEND_OK, msgId=C0A8026E222F4B85612C2CA9301603E3, offsetMsgId=C0A8026E00002A9F0000000000031493, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=0], queueOffset=248]
SendResult [sendStatus=SEND_OK, msgId=C0A8026E222F4B85612C2CA9301A03E4, offsetMsgId=C0A8026E00002A9F000000000003155E, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=1], queueOffset=249]
SendResult [sendStatus=SEND_OK, msgId=C0A8026E222F4B85612C2CA9301C03E5, offsetMsgId=C0A8026E00002A9F0000000000031629, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=2], queueOffset=249]
SendResult [sendStatus=SEND_OK, msgId=C0A8026E222F4B85612C2CA9302103E6, offsetMsgId=C0A8026E00002A9F00000000000316F4, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=3], queueOffset=249]
SendResult [sendStatus=SEND_OK, msgId=C0A8026E222F4B85612C2CA9302503E7, offsetMsgId=C0A8026E00002A9F00000000000317BF, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=0], queueOffset=249]
16:08:05.445 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
16:08:05.458 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[192.168.2.110:10911] result: true
复制测试消费消息

前面的实验已经成功发送了消息,现在来测试一下消费消息,同样借助于tools.sh:

./tools.sh org.apache.rocketmq.example.quickstart.Consumer
复制

测试结果:

ConsumeMessageThread_16 Receive New Messages: [MessageExt [brokerName=localhost.localdomain, queueId=1, storeSize=203, queueOffset=200, sysFlag=0, bornTimestamp=1604909284622, bornHost=/192.168.2.110:45654, storeTimestamp=1604909284623, storeHost=/192.168.2.110:10911, msgId=C0A8026E00002A9F00000000000279F2, commitLogOffset=162290, bodyCRC=265672385, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1604909614601, UNIQ_KEY=C0A8026E222F4B85612C2CA92D0E0320, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 56, 48, 48], transactionId='null'}]] 
ConsumeMessageThread_14 Receive New Messages: [MessageExt [brokerName=localhost.localdomain, queueId=1, storeSize=203, queueOffset=199, sysFlag=0, bornTimestamp=1604909284603, bornHost=/192.168.2.110:45654, storeTimestamp=1604909284604, storeHost=/192.168.2.110:10911, msgId=C0A8026E00002A9F00000000000276C6, commitLogOffset=161478, bodyCRC=1009291136, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1604909614601, UNIQ_KEY=C0A8026E222F4B85612C2CA92CFB031C, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55, 57, 54], transactionId='null'}]] 
ConsumeMessageThread_9 Receive New Messages: [MessageExt [brokerName=localhost.localdomain, queueId=1, storeSize=203, queueOffset=197, sysFlag=0, bornTimestamp=1604909284567, bornHost=/192.168.2.110:45654, storeTimestamp=1604909284569, storeHost=/192.168.2.110:10911, msgId=C0A8026E00002A9F000000000002706E, commitLogOffset=159854, bodyCRC=1116443590, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1604909614600, UNIQ_KEY=C0A8026E222F4B85612C2CA92CD70314, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55, 56, 56], transactionId='null'}]] 
ConsumeMessageThread_19 Receive New Messages: [MessageExt [brokerName=localhost.localdomain, queueId=1, storeSize=203, queueOffset=196, sysFlag=0, bornTimestamp=1604909284550, bornHost=/192.168.2.110:45654, storeTimestamp=1604909284552, storeHost=/192.168.2.110:10911, msgId=C0A8026E00002A9F0000000000026D42, commitLogOffset=159042, bodyCRC=1262346221, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1604909614600, UNIQ_KEY=C0A8026E222F4B85612C2CA92CC60310, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55, 56, 52], transactionId='null'}]]
复制

验证成功。


关联标签: