互联网架构 软件架构-rocketmq之实践(62)

小编:管理员 248阅读 2022.08.02

上次完成rocketMq的安装,并测试了发送和接收,这次简单的利用rocketMq的源码连接rocketMq集群。 源码:https://github.com/limingios/netFuture/tree/master/jms

(一)broker的properties配置文件
  • 启动2m-2s-sync的rocket集群
  • 配置文件

上节直接用了我提供的properties文件,并没有详细解释每个字段的含义这次详细说下。

  • broker参数

参数名

默认值

描述

listenPort

10911

broker的服务端口号,作为对producer和consumer使用服务的端口号

namesrvAddr

null

namesrv的ip地址。格式: ip:port;ip:port

brokerIP1

本机IP

broker所在的机器ip,默认不用设置,如果机器有多个网卡,需要手动设置

brokerName

本机主机名

作用为一组master与slave通过brokerName是否相同来标示,通过brokerId来区分master还是slave brokerClusterName DefaultCluster 整个broker集群的名字,创建topic时需要指定。

brokerId

0

0:master 非0:slave

storePathCommitLog

$HOME/store/commitlog/

commitLog存储路径

storePathConsumerQueue

$HOME/store/consumequeue/

消费队列存储路径

mapedFileSizeCommitLog

1024 * 1024 * 1024(1G)

commitLog每个文件的大小,默认1G

deleteWhen

4

删除文件时间点,默认凌晨 4点

fileReservedTime

72

文件保留时间,默认72小时.

brokerRole

ASYNC_MASTER

Broker 的角色ASYNCMASTER 异步复制Master SYNCMASTER 同步双写Master SLAVE

flushDiskType

ASYNC_FLUSH

刷盘方式 ASYNCFLUSH 异步刷盘 SYNCFLUSH 同步刷盘

defaultTopicQueueNums

4

在发送消息时,自动创建服务器不存在的topic,默认创建的队列数。

autoCreateTopicEnable

true

是否自动创建topic。

autoCreateSubscriptionGroup

true

是否允许Broker自动创建订阅组,建议线下开启,线上关闭

rejectTransactionMessage

false

是否拒绝事务消息接入

fetchNamesrvAddrByAddressServer

false

是否从web服务器获取Name Server地址,针对大规模的Broker集群建议使用这种方式

storePathIndex

$HOME/store/index

消息索引存储路径

storeCheckpoint

$HOME/store/checkpoint

checkpoint文件存储路径

abortFile

$HOME/store/abort

abort文件存储路径

maxTransferBytesOnMessageInMemory

262144

单次Pull消息(内存)传输的最大字节数

maxTransferCountOnMessageInMemory

32

单次Pull消息(内存)传输的最大条数

maxTransferBytesOnMessageInDisk

65536

单次Pull消息(磁盘)传输的最大字节数

maxTransferCountOnMessageInDisk

8

单次Pull消息(磁盘)传输的最大条数

messageIndexEnable

true

是否开启消息索引功能

messageIndexSafe

false

是否提供安全的消息索引机制,索引保证不丢

haMasterAddress

在Slave上直接设置Master地址,默认从Name Server上自动获取,也可以手工强制配置

cleanFileForciblyEnable

true

磁盘满、且无过期文件情况下 TRUE 表示强制删除文件,优先保证服务可用 FALSE 标记服务不可用,文件不删除

Consumer

参数名

默认值

描述

:------:

:--------:

:-------:

namesrvAddr

Name Server地址列表,多个NameServer地址用分号隔开

clientIP

本机IP

客户端本机IP地址,某些机器会发生无法识别客户端IP地址情况,需要应用在代码中强制指定

instanceName

DEFAULT

客户端实例名称,客户端创建的多个Producer、Consumer实际是共用一个内部实例(这个实例包含网络连接、线程资源等)

clientCallbackExecutorThreads

4

通信层异步回调线程数

pollNameServerInteval

30000

轮询Name Server间隔时间,单位毫秒

heartbeatBrokerInterval

30000

向Broker发送心跳间隔时间,单位毫秒

persistConsumerOffsetInterval

5000

持久化Consumer消费进度间隔时间,单位毫秒

  • Consumer

参数名默认值描述:------::--------::-------:namesrvAddr Name Server地址列表,多个NameServer地址用分号隔开clientIP本机IP客户端本机IP地址,某些机器会发生无法识别客户端IP地址情况,需要应用在代码中强制指定instanceNameDEFAULT客户端实例名称,客户端创建的多个Producer、Consumer实际是共用一个内部实例(这个实例包含网络连接、线程资源等)clientCallbackExecutorThreads4通信层异步回调线程数pollNameServerInteval30000轮询Name Server间隔时间,单位毫秒heartbeatBrokerInterval30000向Broker发送心跳间隔时间,单位毫秒persistConsumerOffsetInterval5000持久化Consumer消费进度间隔时间,单位毫秒

  • Producer参数

Producer配置

参数名

默认值

描述

producerGroup

DEFAULT_PRODUCER

Producer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组

createTopicKey

TBW102

在发送消息时,自动创建服务器不存在的topic,需要指定Key。

defaultTopicQueueNums

4

在发送消息时,自动创建服务器不存在的topic,默认创建的队列数

sendMsgTimeout

10000

发送消息超时时间,单位毫秒

compressMsgBodyOverHowmuch

4096

消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节

retryAnotherBrokerWhenNotStoreOK

FALSE

如果发送消息返回sendResult,但是sendStatus!=SEND_OK,是否重试发送

maxMessageSize

131072

客户端限制的消息大小,超过报错,同时服务端也会限制

transactionCheckListener

事务消息回查监听器,如果发送事务消息,必须设置

checkThreadPoolMinSize

1

Broker回查Producer事务状态时,线程池大小

checkThreadPoolMaxSize

Broker回查Producer事务状态时,线程池大小

checkRequestHoldMax

2000

Broker回查Producer事务状态时,Producer本地缓冲请求队列大小

Push Consumer配置

参数名

默认值

描述

consumerGroup

DEFAULT_CONSUMER

Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组

messageModel

CLUSTERING

消息模型,支持以下两种1、集群消费(CLSUTER)2、广播消费(BROADCASTING)

consumeFromWhere

CONSUMEFROMLAST_OFFSET

Consumer启动后,默认从什么位置开始消费1、CONSUMEFROMLASTOFFSET:默认策略,从该队列最尾开始消费,即跳过历史消息2、CONSUMEFROMFIRSTOFFSET:从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍3、CONSUMEFROMTIMESTAMP:从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前

allocateMessageQueueStrategy

AllocateMessageQueueAveragely

Rebalance算法实现策略

subscription

{}

订阅关系

messageListener

消息监听器

offsetStore

消费进度存储

consumeThreadMin

10

消费线程池数量

consumeThreadMax

20

消费线程池数量

consumeConcurrentlyMaxSpan

2000

单队列并行消费允许的最大跨度

pullThresholdForQueue

1000

拉消息本地队列缓存消息最大数

pullInterval

0

拉消息间隔,由于是长轮询,所以为0,但是如果应用为了流控,也可以设置大于0的值,单位毫秒

consumeMessageBatchMaxSize

1

批量消费,一次消费多少条消息

pullBatchSize

32

批量拉消息,一次最多拉多少条

Pull Consumer配置

参数名

默认值

描述

consumerGroup

DEFAULT_CONSUMER

Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组

brokerSuspendMaxTimeMillis

20000

长轮询,Consumer拉消息请求在Broker挂起最长时间,单位毫秒

consumerTimeoutMillisWhenSuspend

30000

长轮询,Consumer拉消息请求在Broker挂起超过指定时间,客户端认为超时,单位毫秒

consumerPullTimeoutMillis

10000

非长轮询,拉消息超时时间,单位毫秒

messageModel

BROADCASTING

消息模型,支持以下两种1、集群消费2、广播消费

messageQueueListener

监听队列变化

offsetStore

消费进度存储

registerTopics

[]

注册的topic集合

allocateMessageQueueStrategy

AllocateMessageQueueAveragely

Rebalance算法实现策略

  • Meesage数据结构

Message数据结构各个字段都可以通过get、set方式访问,例如访问topic: msg.getTopic(); msg.setTopic("test");

字段名

默认值

必填

说明

Topic

null

true

线下环境不需要申请,线上环境需要申请后才能使用

Body

null

true

二进制形式,序列化由应用决定,Producer与Consumer要协商好序列化形式。

Tags

null

false

类似于Gmail为每封邮件设置的标签,方便服务器过滤使用。目前只支持每个消息设置一个tag,所以也可以类比为Notify的MessageType概念。

Keys

null

false

代表这条消息的业务关键词,服务器会根据keys创建哈希索引,设置后,可以再Console系统根据Topic、Keys来查询消息,由于是哈希索引,请尽可能保证key唯一,例如订单号,商品ID等。

Flag

0

false

完全由应用来设置,RocketMQ不做敢于。

DelayTimeLevel

0

false

消息延时级别,0表示不延时,大于0会延时特定的时间才会被消费。

WaitStoreMsgOK

TRUE

false

表示消息是否在服务器罗盘后才返回应答。

(二)源码测试
  • 连接集群

修改这2个文件:Producer 和 Consumer

Producer

Consumer

  • 部署rocketmq-console

GitHub地址:https://github.com/apache/rocketmq-externals

添加192.168.89.100:9876;192.168.89.101:9876

(三)流程梳理

生产者流程

  1. 生产者首先需要设置namesrv,或者指定其他方式更新namesrv。
  2. 从namesrv获取topic的路由信息,路由信息包括broker以及Message Queue等信息,同时将路由信息保存在本地内存中,方便下次使用。
  3. 从Message Queue列表中选择合适的Queue发送消息,实现负载均衡。

消费者流程

  1. namesrv告诉消费者,他从broker中获取消息。
  2. 获取完之后开始消费。
  • RocketMq高可用

特点:master挂了之后角色不会做切换(slave不会成为master)(商用版本的这种情况不知) 然后master和slave需要制定。

发送消息

存储消息

接受消息

停掉一个namesrv

不受影响

不受影响

不受影响

停全部的namesrv

影响

不受影响

影响

停单个master broker

不受影响

受影响(很小)

不影响

停全部master broker

影响

影响

影响

停全部salve broker

不影响

不影响

不影响

恢复任意master broker

不受影响

受影响(很小)

受影响(很小)

  • Rocketmq文档参考

源码:jm下有文档

PS:说了rocketmq的概念的东西,下次重点说说rocketMq在双11是如何做到的抗压,我听过一次公开课,稍后总结下,分享给各位老铁。

关联标签: