互联网架构 软件架构-rocketmq之实践(62)
小编:管理员 248阅读 2022.08.02
(一)broker的properties配置文件上次完成rocketMq的安装,并测试了发送和接收,这次简单的利用rocketMq的源码连接rocketMq集群。 源码:https://github.com/limingios/netFuture/tree/master/jms
- 启动2m-2s-sync的rocket集群
- 配置文件
上节直接用了我提供的properties文件,并没有详细解释每个字段的含义这次详细说下。
- broker参数
参数名 |
默认值 |
描述 |
---|---|---|
listenPort |
10911 |
broker的服务端口号,作为对producer和consumer使用服务的端口号 |
namesrvAddr |
null |
namesrv的ip地址。格式: ip:port;ip:port |
brokerIP1 |
本机IP |
broker所在的机器ip,默认不用设置,如果机器有多个网卡,需要手动设置 |
brokerName |
本机主机名 |
作用为一组master与slave通过brokerName是否相同来标示,通过brokerId来区分master还是slave brokerClusterName DefaultCluster 整个broker集群的名字,创建topic时需要指定。 |
brokerId |
0 |
0:master 非0:slave |
storePathCommitLog |
$HOME/store/commitlog/ |
commitLog存储路径 |
storePathConsumerQueue |
$HOME/store/consumequeue/ |
消费队列存储路径 |
mapedFileSizeCommitLog |
1024 * 1024 * 1024(1G) |
commitLog每个文件的大小,默认1G |
deleteWhen |
4 |
删除文件时间点,默认凌晨 4点 |
fileReservedTime |
72 |
文件保留时间,默认72小时. |
brokerRole |
ASYNC_MASTER |
Broker 的角色ASYNCMASTER 异步复制Master SYNCMASTER 同步双写Master SLAVE |
flushDiskType |
ASYNC_FLUSH |
刷盘方式 ASYNCFLUSH 异步刷盘 SYNCFLUSH 同步刷盘 |
defaultTopicQueueNums |
4 |
在发送消息时,自动创建服务器不存在的topic,默认创建的队列数。 |
autoCreateTopicEnable |
true |
是否自动创建topic。 |
autoCreateSubscriptionGroup |
true |
是否允许Broker自动创建订阅组,建议线下开启,线上关闭 |
rejectTransactionMessage |
false |
是否拒绝事务消息接入 |
fetchNamesrvAddrByAddressServer |
false |
是否从web服务器获取Name Server地址,针对大规模的Broker集群建议使用这种方式 |
storePathIndex |
$HOME/store/index |
消息索引存储路径 |
storeCheckpoint |
$HOME/store/checkpoint |
checkpoint文件存储路径 |
abortFile |
$HOME/store/abort |
abort文件存储路径 |
maxTransferBytesOnMessageInMemory |
262144 |
单次Pull消息(内存)传输的最大字节数 |
maxTransferCountOnMessageInMemory |
32 |
单次Pull消息(内存)传输的最大条数 |
maxTransferBytesOnMessageInDisk |
65536 |
单次Pull消息(磁盘)传输的最大字节数 |
maxTransferCountOnMessageInDisk |
8 |
单次Pull消息(磁盘)传输的最大条数 |
messageIndexEnable |
true |
是否开启消息索引功能 |
messageIndexSafe |
false |
是否提供安全的消息索引机制,索引保证不丢 |
haMasterAddress |
在Slave上直接设置Master地址,默认从Name Server上自动获取,也可以手工强制配置 |
|
cleanFileForciblyEnable |
true |
磁盘满、且无过期文件情况下 TRUE 表示强制删除文件,优先保证服务可用 FALSE 标记服务不可用,文件不删除 |
Consumer |
||
参数名 |
默认值 |
描述 |
:------: |
:--------: |
:-------: |
namesrvAddr |
Name Server地址列表,多个NameServer地址用分号隔开 |
|
clientIP |
本机IP |
客户端本机IP地址,某些机器会发生无法识别客户端IP地址情况,需要应用在代码中强制指定 |
instanceName |
DEFAULT |
客户端实例名称,客户端创建的多个Producer、Consumer实际是共用一个内部实例(这个实例包含网络连接、线程资源等) |
clientCallbackExecutorThreads |
4 |
通信层异步回调线程数 |
pollNameServerInteval |
30000 |
轮询Name Server间隔时间,单位毫秒 |
heartbeatBrokerInterval |
30000 |
向Broker发送心跳间隔时间,单位毫秒 |
persistConsumerOffsetInterval |
5000 |
持久化Consumer消费进度间隔时间,单位毫秒 |
- Consumer
参数名默认值描述:------::--------::-------:namesrvAddr Name Server地址列表,多个NameServer地址用分号隔开clientIP本机IP客户端本机IP地址,某些机器会发生无法识别客户端IP地址情况,需要应用在代码中强制指定instanceNameDEFAULT客户端实例名称,客户端创建的多个Producer、Consumer实际是共用一个内部实例(这个实例包含网络连接、线程资源等)clientCallbackExecutorThreads4通信层异步回调线程数pollNameServerInteval30000轮询Name Server间隔时间,单位毫秒heartbeatBrokerInterval30000向Broker发送心跳间隔时间,单位毫秒persistConsumerOffsetInterval5000持久化Consumer消费进度间隔时间,单位毫秒
- Producer参数
Producer配置
参数名 |
默认值 |
描述 |
---|---|---|
producerGroup |
DEFAULT_PRODUCER |
Producer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组 |
createTopicKey |
TBW102 |
在发送消息时,自动创建服务器不存在的topic,需要指定Key。 |
defaultTopicQueueNums |
4 |
在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 |
sendMsgTimeout |
10000 |
发送消息超时时间,单位毫秒 |
compressMsgBodyOverHowmuch |
4096 |
消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节 |
retryAnotherBrokerWhenNotStoreOK |
FALSE |
如果发送消息返回sendResult,但是sendStatus!=SEND_OK,是否重试发送 |
maxMessageSize |
131072 |
客户端限制的消息大小,超过报错,同时服务端也会限制 |
transactionCheckListener |
事务消息回查监听器,如果发送事务消息,必须设置 |
|
checkThreadPoolMinSize |
1 |
Broker回查Producer事务状态时,线程池大小 |
checkThreadPoolMaxSize |
Broker回查Producer事务状态时,线程池大小 |
|
checkRequestHoldMax |
2000 |
Broker回查Producer事务状态时,Producer本地缓冲请求队列大小 |
Push Consumer配置
参数名 |
默认值 |
描述 |
---|---|---|
consumerGroup |
DEFAULT_CONSUMER |
Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组 |
messageModel |
CLUSTERING |
消息模型,支持以下两种1、集群消费(CLSUTER)2、广播消费(BROADCASTING) |
consumeFromWhere |
CONSUMEFROMLAST_OFFSET |
Consumer启动后,默认从什么位置开始消费1、CONSUMEFROMLASTOFFSET:默认策略,从该队列最尾开始消费,即跳过历史消息2、CONSUMEFROMFIRSTOFFSET:从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍3、CONSUMEFROMTIMESTAMP:从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前 |
allocateMessageQueueStrategy |
AllocateMessageQueueAveragely |
Rebalance算法实现策略 |
subscription |
{} |
订阅关系 |
messageListener |
消息监听器 |
|
offsetStore |
消费进度存储 |
|
consumeThreadMin |
10 |
消费线程池数量 |
consumeThreadMax |
20 |
消费线程池数量 |
consumeConcurrentlyMaxSpan |
2000 |
单队列并行消费允许的最大跨度 |
pullThresholdForQueue |
1000 |
拉消息本地队列缓存消息最大数 |
pullInterval |
0 |
拉消息间隔,由于是长轮询,所以为0,但是如果应用为了流控,也可以设置大于0的值,单位毫秒 |
consumeMessageBatchMaxSize |
1 |
批量消费,一次消费多少条消息 |
pullBatchSize |
32 |
批量拉消息,一次最多拉多少条 |
Pull Consumer配置
参数名 |
默认值 |
描述 |
---|---|---|
consumerGroup |
DEFAULT_CONSUMER |
Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组 |
brokerSuspendMaxTimeMillis |
20000 |
长轮询,Consumer拉消息请求在Broker挂起最长时间,单位毫秒 |
consumerTimeoutMillisWhenSuspend |
30000 |
长轮询,Consumer拉消息请求在Broker挂起超过指定时间,客户端认为超时,单位毫秒 |
consumerPullTimeoutMillis |
10000 |
非长轮询,拉消息超时时间,单位毫秒 |
messageModel |
BROADCASTING |
消息模型,支持以下两种1、集群消费2、广播消费 |
messageQueueListener |
监听队列变化 |
|
offsetStore |
消费进度存储 |
|
registerTopics |
[] |
注册的topic集合 |
allocateMessageQueueStrategy |
AllocateMessageQueueAveragely |
Rebalance算法实现策略 |
- Meesage数据结构
Message数据结构各个字段都可以通过get、set方式访问,例如访问topic: msg.getTopic(); msg.setTopic("test");
字段名 |
默认值 |
必填 |
说明 |
---|---|---|---|
Topic |
null |
true |
线下环境不需要申请,线上环境需要申请后才能使用 |
Body |
null |
true |
二进制形式,序列化由应用决定,Producer与Consumer要协商好序列化形式。 |
Tags |
null |
false |
类似于Gmail为每封邮件设置的标签,方便服务器过滤使用。目前只支持每个消息设置一个tag,所以也可以类比为Notify的MessageType概念。 |
Keys |
null |
false |
代表这条消息的业务关键词,服务器会根据keys创建哈希索引,设置后,可以再Console系统根据Topic、Keys来查询消息,由于是哈希索引,请尽可能保证key唯一,例如订单号,商品ID等。 |
Flag |
0 |
false |
完全由应用来设置,RocketMQ不做敢于。 |
DelayTimeLevel |
0 |
false |
消息延时级别,0表示不延时,大于0会延时特定的时间才会被消费。 |
WaitStoreMsgOK |
TRUE |
false |
表示消息是否在服务器罗盘后才返回应答。 |
- 连接集群
修改这2个文件:Producer 和 Consumer
Producer
Consumer
- 部署rocketmq-console
GitHub地址:https://github.com/apache/rocketmq-externals
(三)流程梳理添加192.168.89.100:9876;192.168.89.101:9876
生产者流程
- 生产者首先需要设置namesrv,或者指定其他方式更新namesrv。
- 从namesrv获取topic的路由信息,路由信息包括broker以及Message Queue等信息,同时将路由信息保存在本地内存中,方便下次使用。
- 从Message Queue列表中选择合适的Queue发送消息,实现负载均衡。
消费者流程
- namesrv告诉消费者,他从broker中获取消息。
- 获取完之后开始消费。
- RocketMq高可用
特点:master挂了之后角色不会做切换(slave不会成为master)(商用版本的这种情况不知) 然后master和slave需要制定。
发送消息 |
存储消息 |
接受消息 |
|
---|---|---|---|
停掉一个namesrv |
不受影响 |
不受影响 |
不受影响 |
停全部的namesrv |
影响 |
不受影响 |
影响 |
停单个master broker |
不受影响 |
受影响(很小) |
不影响 |
停全部master broker |
影响 |
影响 |
影响 |
停全部salve broker |
不影响 |
不影响 |
不影响 |
恢复任意master broker |
不受影响 |
受影响(很小) |
受影响(很小) |
- Rocketmq文档参考
源码:jm下有文档
PS:说了rocketmq的概念的东西,下次重点说说rocketMq在双11是如何做到的抗压,我听过一次公开课,稍后总结下,分享给各位老铁。
相关推荐
- 【RocketMQ系列】RocketMQ集群,RocketMQ-on-DLedger集群 本文RocketMQ系列第四篇,主要介绍RocketMQ集群及如何部署自动容灾切换的 RocketMQ-on-DLedger Group。RocketMQ集群搭建ROcketMQ集群搭建有以下几种方案:「单Master模式」「多Master模式」「多Master多Slave模式-异步复制」「多Master多Slave模式-同步双写」其…
- 3DMAX提示和技巧 本主题标识使用 Civil View 的一些重要提示和技巧。常规使用屏幕分辨率至少为 1280x1024 的 Civil View。低于此分辨率时,一些面板将占用过多屏幕空间。 将视口设置为线框显示以达到最佳性能。 要尽可能简化用户界面,请在单个视口中工作并关闭 3ds Max 命令面…