RocketMQ之Pull Consumer负载均衡拉取正确姿势?

小编:管理员 166阅读 2022.08.03

RocketMQ分布式消息中间件Pull模式下,我们的一般消费步骤如下:

1. 读取 topic的消息队列message queue的信息; 2. 按队列去拉取一定数目的消息; 3.(持久化message queue的消费进度consume offset)

首先有一些关键概念,我们需要理清楚:

consume offset 是基于topic 以及 消费组 consumer group的,意思是什么?

意思就是当采用集群消费模型(CLUSTERING),我们的consume offset 默认是存储在broker服务器上的config/consumerOffset.json文件。

{
	"offsetTable":{
		"TopicTest@pullConsumerGroupTest":{0:1578,1:1578,2:1578,3:1578
		}
	}
}
复制

(TopicTest为某个topic,初始化为4个队列, pullConsumerGroupTest为某个消费组)

那么当同一个consumer group的多个consumer instance默认是共享一个消费进度。

那么问题来了,我们同一个consumer group的多个consumer instance在第一个步骤的时候,如何快速感知和分配合适的消息队列message queue,给每一个consumer instance消费呢??

---即 负载均衡问题如何解决?

废话不多说,贴个代码:

public Set fetchMessageQueuesForPullOperation(String topic)
			throws MQClientException, InterruptedException {
		DefaultMQPullConsumer pullConsumer; // please init
		long fetchQueueTimeoutMillis = 5000l;
		long fetchQueueNextDelayTimeMillis = 200l;

		Set msgQueues = null;
		switch (pullConsumer.getMessageModel()) {
			case BROADCASTING:
				msgQueues = pullConsumer.fetchSubscribeMessageQueues(topic);
				break;
			case CLUSTERING:
				msgQueues = pullConsumer.fetchMessageQueuesInBalance(topic);
				// 未获取到负载均衡的时候,等待fetchQueueNextDelayTimeMillis毫秒重新获取,直到超时
				long timeout = 0L;
				while (CollectionUtils.isEmpty(msgQueues) && timeout < fetchQueueTimeoutMillis) {
					Thread.sleep(fetchQueueNextDelayTimeMillis);
					timeout += fetchQueueNextDelayTimeMillis;
					msgQueues = pullConsumer.fetchMessageQueuesInBalance(topic);
				}
				break;
			default:
				break;
		}
		return msgQueues;
	}
复制

当然还有一种内置的

MQPullConsumerScheduleService
复制

也是可以实现。不同的是这个是回调模式。

by 斯武丶风晴 https://my.oschina.net/langxSpirit

关联标签: