RocketMQ之Pull Consumer负载均衡拉取正确姿势?
小编:管理员 166阅读 2022.08.03
RocketMQ分布式消息中间件Pull模式下,我们的一般消费步骤如下:
1. 读取 topic的消息队列message queue的信息; 2. 按队列去拉取一定数目的消息; 3.(持久化message queue的消费进度consume offset)
首先有一些关键概念,我们需要理清楚:
consume offset 是基于topic 以及 消费组 consumer group的,意思是什么?
意思就是当采用集群消费模型(CLUSTERING),我们的consume offset 默认是存储在broker服务器上的config/consumerOffset.json文件。
{ "offsetTable":{ "TopicTest@pullConsumerGroupTest":{0:1578,1:1578,2:1578,3:1578 } } }复制
(TopicTest为某个topic,初始化为4个队列, pullConsumerGroupTest为某个消费组)
那么当同一个consumer group的多个consumer instance默认是共享一个消费进度。
那么问题来了,我们同一个consumer group的多个consumer instance在第一个步骤的时候,如何快速感知和分配合适的消息队列message queue,给每一个consumer instance消费呢??
---即 负载均衡问题如何解决?
废话不多说,贴个代码:
public Set复制fetchMessageQueuesForPullOperation(String topic) throws MQClientException, InterruptedException { DefaultMQPullConsumer pullConsumer; // please init long fetchQueueTimeoutMillis = 5000l; long fetchQueueNextDelayTimeMillis = 200l; Set msgQueues = null; switch (pullConsumer.getMessageModel()) { case BROADCASTING: msgQueues = pullConsumer.fetchSubscribeMessageQueues(topic); break; case CLUSTERING: msgQueues = pullConsumer.fetchMessageQueuesInBalance(topic); // 未获取到负载均衡的时候,等待fetchQueueNextDelayTimeMillis毫秒重新获取,直到超时 long timeout = 0L; while (CollectionUtils.isEmpty(msgQueues) && timeout < fetchQueueTimeoutMillis) { Thread.sleep(fetchQueueNextDelayTimeMillis); timeout += fetchQueueNextDelayTimeMillis; msgQueues = pullConsumer.fetchMessageQueuesInBalance(topic); } break; default: break; } return msgQueues; }
当然还有一种内置的
MQPullConsumerScheduleService复制
也是可以实现。不同的是这个是回调模式。
by 斯武丶风晴 https://my.oschina.net/langxSpirit
相关推荐
- 【RocketMQ系列】RocketMQ集群,RocketMQ-on-DLedger集群 本文RocketMQ系列第四篇,主要介绍RocketMQ集群及如何部署自动容灾切换的 RocketMQ-on-DLedger Group。RocketMQ集群搭建ROcketMQ集群搭建有以下几种方案:「单Master模式」「多Master模式」「多Master多Slave模式-异步复制」「多Master多Slave模式-同步双写」其…
- 3DMAX提示和技巧 本主题标识使用 Civil View 的一些重要提示和技巧。常规使用屏幕分辨率至少为 1280x1024 的 Civil View。低于此分辨率时,一些面板将占用过多屏幕空间。 将视口设置为线框显示以达到最佳性能。 要尽可能简化用户界面,请在单个视口中工作并关闭 3ds Max 命令面…