分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费
小编:管理员 251阅读 2022.08.01
前传:分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(上)
本文主要基于 RocketMQ 4.0.x 正式版
- 1、概述
- 2、Consumer
- 3、PushConsumer 一览
- 4、PushConsumer 订阅
- 5、PushConsumer 消息队列分配
- 6、PushConsumer 消费进度读取
- 7、PushConsumer 拉取消息
- 8、PushConsumer 消费消息
- 9、PushConsumer 发回消费失败消息
- 10、Consumer 消费进度
- 11、结尾
1、概述
本文接:《RocketMQ 源码分析 —— Message 拉取与消费(上)》。
主要解析Consumer在 消费 逻辑涉及到的源码。
2、ConsumerMQ 提供了两类消费者:
-
PushConsumer:
- 在大多数场景下使用。
- 名字虽然是Push开头,实际在实现时,使用Pull方式实现。通过Pull不断不断不断轮询Broker获取消息。当不存在新消息时,Broker会挂起请求,直到有新消息产生,取消挂起,返回新消息。这样,基本和Broker主动Push做到接近的实时性(当然,还是有相应的实时性损失)。原理类似 长轮询( Long-Polling )。
- PullConsumer
本文主要讲解PushConsumer,部分讲解PullConsumer,跳过顺序消费。 本文主要讲解PushConsumer,部分讲解PullConsumer,跳过顺序消费。 本文主要讲解PushConsumer,部分讲解PullConsumer,跳过顺序消费。
3、PushConsumer 一览先看一张PushConsumer包含的组件以及组件之间的交互图:
- RebalanceService:均衡消息队列服务,负责分配当前Consumer可消费的消息队列(MessageQueue)。当有新的Consumer的加入或移除,都会重新分配消息队列。
- PullMessageService:拉取消息服务,不断不断不断从Broker拉取消息,并提交消费任务到ConsumeMessageService。
- ConsumeMessageService:消费消息服务,不断不断不断消费消息,并处理消费结果。
- RemoteBrokerOffsetStore:Consumer消费进度管理,负责从Broker获取消费进度,同步消费进度到Broker。
- ProcessQueue:消息处理队列。
- MQClientInstance:封装对Namesrv,Broker的 API调用,提供给Producer、Consumer使用。
1: public void subscribe(String topic, String subExpression) throws MQClientException { 2: try { 3: // 创建订阅数据 4: SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), // 5: topic, subExpression); 6: this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); 7: // 通过心跳同步Consumer信息到Broker 8: if (this.mQClientFactory != null) { 9: this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); 10: } 11: } catch (Exception e) { 12: throw new MQClientException("subscription exception", e); 13: } 14: }复制
- 说明 :订阅Topic。
- 第 3 至 6 行 :创建订阅数据。详细解析见:FilterAPI.buildSubscriptionData(...)。
- 第 7 至 10 行 :通过心跳同步Consumer信息到Broker。
1: public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic, 2: String subString) throws Exception { 3: SubscriptionData subscriptionData = new SubscriptionData(); 4: subscriptionData.setTopic(topic); 5: subscriptionData.setSubString(subString); 6: // 处理订阅表达式 7: if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) { 8: subscriptionData.setSubString(SubscriptionData.SUB_ALL); 9: } else { 10: String[] tags = subString.split("\\|\\|"); 11: if (tags.length > 0) { 12: for (String tag : tags) { 13: if (tag.length() > 0) { 14: String trimString = tag.trim(); 15: if (trimString.length() > 0) { 16: subscriptionData.getTagsSet().add(trimString); 17: subscriptionData.getCodeSet().add(trimString.hashCode()); 18: } 19: } 20: } 21: } else { 22: throw new Exception("subString split error"); 23: } 24: } 25: 26: return subscriptionData; 27: }复制
- 说明 :根据Topic和 订阅表达式 创建订阅数据
- subscriptionData.subVersion = System.currentTimeMillis()。
1: public void registerMessageListener(MessageListenerConcurrently messageListener) { 2: this.messageListener = messageListener; 3: this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); 4: }复制
- 说明 :注册消息监听器。
1: public class RebalanceService extends ServiceThread { 2: 3: /** 4: * 等待间隔,单位:毫秒 5: */ 6: private static long waitInterval = 7: Long.parseLong(System.getProperty( 8: "rocketmq.client.rebalance.waitInterval", "20000")); 9: 10: private final Logger log = ClientLogger.getLog(); 11: /** 12: * MQClient对象 13: */ 14: private final MQClientInstance mqClientFactory; 15: 16: public RebalanceService(MQClientInstance mqClientFactory) { 17: this.mqClientFactory = mqClientFactory; 18: } 19: 20: @Override 21: public void run() { 22: log.info(this.getServiceName() + " service started"); 23: 24: while (!this.isStopped()) { 25: this.waitForRunning(waitInterval); 26: this.mqClientFactory.doRebalance(); 27: } 28: 29: log.info(this.getServiceName() + " service end"); 30: } 31: 32: @Override 33: public String getServiceName() { 34: return RebalanceService.class.getSimpleName(); 35: } 36: }复制
- 说明 :均衡消息队列服务,负责分配当前Consumer可消费的消息队列(MessageQueue)。
-
第 26 行 :调用MQClientInstance#doRebalance(...)分配消息队列。目前有三种情况情况下触发:
详细解析见:MQClientInstance#doRebalance(...)。
- 如第25行等待超时,每 20s 调用一次。
- PushConsumer启动时,调用rebalanceService#wakeup(...)触发。
- Broker通知Consumer加入 或 移除时,Consumer响应通知,调用rebalanceService#wakeup(...)触发。
1: public void doRebalance() { 2: for (Map.Entry复制entry : this.consumerTable.entrySet()) { 3: MQConsumerInner impl = entry.getValue(); 4: if (impl != null) { 5: try { 6: impl.doRebalance(); 7: } catch (Throwable e) { 8: log.error("doRebalance exception", e); 9: } 10: } 11: } 12: }
- 说明 :遍历当前Client包含的consumerTable(Consumer集合 ),执行消息队列分配。
- 疑问:目前代码调试下来,consumerTable只包含Consumer自己。?有大大对这个疑问有解答的,烦请解答下。
- 第 6 行 :调用MQConsumerInner#doRebalance(...)进行队列分配。DefaultMQPushConsumerImpl、DefaultMQPullConsumerImpl分别对该接口方法进行了实现。DefaultMQPushConsumerImpl#doRebalance(...)详细解析见:DefaultMQPushConsumerImpl#doRebalance(...)。
1: public void doRebalance() { 2: if (!this.pause) { 3: this.rebalanceImpl.doRebalance(this.isConsumeOrderly()); 4: } 5: }复制
- 说明:执行消息队列分配。
- 第 3 行 :调用RebalanceImpl#doRebalance(...)进行队列分配。详细解析见:RebalancePushImpl#doRebalance(...)。
1: /** 2: * 执行分配消息队列 3: * 4: * @param isOrder 是否顺序消息 5: */ 6: public void doRebalance(final boolean isOrder) { 7: // 分配每个 topic 的消息队列 8: Map复制subTable = this.getSubscriptionInner(); 9: if (subTable != null) { 10: for (final Map.Entry entry : subTable.entrySet()) { 11: final String topic = entry.getKey(); 12: try { 13: this.rebalanceByTopic(topic, isOrder); 14: } catch (Throwable e) { 15: if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { 16: log.warn("rebalanceByTopic Exception", e); 17: } 18: } 19: } 20: } 21: // 移除未订阅的topic对应的消息队列 22: this.truncateMessageQueueNotMyTopic(); 23: } 24: 25: /** 26: * 移除未订阅的消息队列 27: */ 28: private void truncateMessageQueueNotMyTopic() { 29: Map subTable = this.getSubscriptionInner(); 30: for (MessageQueue mq : this.processQueueTable.keySet()) { 31: if (!subTable.containsKey(mq.getTopic())) { 32: 33: ProcessQueue pq = this.processQueueTable.remove(mq); 34: if (pq != null) { 35: pq.setDropped(true); 36: log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq); 37: } 38: } 39: } 40: }
-
#doRebalance(...)说明 :执行分配消息队列。
- 第 7 至 20 行 :循环订阅主题集合(subscriptionInner),分配每一个Topic的消息队列。
- 第 22 行 :移除未订阅的Topic的消息队列。
- #truncateMessageQueueNotMyTopic(...)说明 :移除未订阅的消息队列。当调用 DefaultMQPushConsumer#unsubscribe(topic) 时,只移除订阅主题集合( subscriptionInner ),对应消息队列移除在该方法。
1: private void rebalanceByTopic(final String topic, final boolean isOrder) { 2: switch (messageModel) { 3: case BROADCASTING: { 4: Set复制mqSet = this.topicSubscribeInfoTable.get(topic); 5: if (mqSet != null) { 6: boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder); 7: if (changed) { 8: this.messageQueueChanged(topic, mqSet, mqSet); 9: log.info("messageQueueChanged {} {} {} {}", // 10: consumerGroup, // 11: topic, // 12: mqSet, // 13: mqSet); 14: } 15: } else { 16: log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); 17: } 18: break; 19: } 20: case CLUSTERING: { 21: // 获取 topic 对应的 队列 和 consumer信息 22: Set mqSet = this.topicSubscribeInfoTable.get(topic); 23: List cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); 24: if (null == mqSet) { 25: if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { 26: log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); 27: } 28: } 29: 30: if (null == cidAll) { 31: log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic); 32: } 33: 34: if (mqSet != null && cidAll != null) { 35: // 排序 消息队列 和 消费者数组。因为是在Client进行分配队列,排序后,各Client的顺序才能保持一致。 36: List mqAll = new ArrayList<>(); 37: mqAll.addAll(mqSet); 38: 39: Collections.sort(mqAll); 40: Collections.sort(cidAll); 41: 42: AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; 43: 44: // 根据 队列分配策略 分配消息队列 45: List allocateResult; 46: try { 47: allocateResult = strategy.allocate(// 48: this.consumerGroup, // 49: this.mQClientFactory.getClientId(), // 50: mqAll, // 51: cidAll); 52: } catch (Throwable e) { 53: log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), 54: e); 55: return; 56: } 57: 58: Set allocateResultSet = new HashSet<>(); 59: if (allocateResult != null) { 60: allocateResultSet.addAll(allocateResult); 61: } 62: 63: // 更新消息队列 64: boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); 65: if (changed) { 66: log.info( 67: "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", 68: strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), 69: allocateResultSet.size(), allocateResultSet); 70: this.messageQueueChanged(topic, mqSet, allocateResultSet); 71: } 72: } 73: break; 74: } 75: default: 76: break; 77: } 78: } 79: 80: /** 81: * 当负载均衡时,更新 消息处理队列 82: * - 移除 在processQueueTable && 不存在于 mqSet 里的消息队列 83: * - 增加 不在processQueueTable && 存在于mqSet 里的消息队列 84: * 85: * @param topic Topic 86: * @param mqSet 负载均衡结果后的消息队列数组 87: * @param isOrder 是否顺序 88: * @return 是否变更 89: */ 90: private boolean updateProcessQueueTableInRebalance(final String topic, final Set mqSet, final boolean isOrder) { 91: boolean changed = false; 92: 93: // 移除 在processQueueTable && 不存在于 mqSet 里的消息队列 94: Iterator > it = this.processQueueTable.entrySet().iterator(); 95: while (it.hasNext()) { // TODO 待读: 96: Entry next = it.next(); 97: MessageQueue mq = next.getKey(); 98: ProcessQueue pq = next.getValue(); 99: 100: if (mq.getTopic().equals(topic)) { 101: if (!mqSet.contains(mq)) { // 不包含的队列 102: pq.setDropped(true); 103: if (this.removeUnnecessaryMessageQueue(mq, pq)) { 104: it.remove(); 105: changed = true; 106: log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq); 107: } 108: } else if (pq.isPullExpired()) { // 队列拉取超时,进行清理 109: switch (this.consumeType()) { 110: case CONSUME_ACTIVELY: 111: break; 112: case CONSUME_PASSIVELY: 113: pq.setDropped(true); 114: if (this.removeUnnecessaryMessageQueue(mq, pq)) { 115: it.remove(); 116: changed = true; 117: log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it", 118: consumerGroup, mq); 119: } 120: break; 121: default: 122: break; 123: } 124: } 125: } 126: } 127: 128: // 增加 不在processQueueTable && 存在于mqSet 里的消息队列。 129: List pullRequestList = new ArrayList<>(); // 拉消息请求数组 130: for (MessageQueue mq : mqSet) { 131: if (!this.processQueueTable.containsKey(mq)) { 132: if (isOrder && !this.lock(mq)) { 133: log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); 134: continue; 135: } 136: 137: this.removeDirtyOffset(mq); 138: ProcessQueue pq = new ProcessQueue(); 139: long nextOffset = this.computePullFromWhere(mq); 140: if (nextOffset >= 0) { 141: ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); 142: if (pre != null) { 143: log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); 144: } else { 145: log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); 146: PullRequest pullRequest = new PullRequest(); 147: pullRequest.setConsumerGroup(consumerGroup); 148: pullRequest.setNextOffset(nextOffset); 149: pullRequest.setMessageQueue(mq); 150: pullRequest.setProcessQueue(pq); 151: pullRequestList.add(pullRequest); 152: changed = true; 153: } 154: } else { 155: log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); 156: } 157: } 158: } 159: 160: // 发起消息拉取请求 161: this.dispatchPullRequest(pullRequestList); 162: 163: return changed; 164: }
-
#rebalanceByTopic(...)说明 :分配Topic的消息队列。
- 第 21 至 40 行 :获取Topic对应的消息队列和消费者们,并对其进行排序。因为各Consumer是在本地分配消息队列,排序后才能保证各Consumer顺序一致。
- 第 42 至 61 行 :根据 队列分配策略(AllocateMessageQueueStrategy) 分配消息队列。详细解析见:AllocateMessageQueueStrategy。
- 第 63 至 72 行 :更新Topic对应的消息队列。
- 第 3 至 19 行 :广播模式(BROADCASTING) 下,分配Topic对应的所有消息队列。
- 第 20 至 74 行 :集群模式(CLUSTERING) 下,分配Topic对应的部分消息队列。
-
#updateProcessQueueTableInRebalance(...)说明 :当分配队列时,更新Topic对应的消息队列,并返回是否有变更。
- 第 132 至 135 行 :顺序消费相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。
- 第 137 行 :移除消息队列的消费进度。
- 第 139 行 :获取队列消费进度。详细解析见:RebalancePushImpl#computePullFromWhere(...)。
- 第 140 至 156 行 :添加新消费处理队列,添加消费拉取消息请求。
- 第 103 行 :移除不需要的消息队列。详细解析见:RebalancePushImpl#removeUnnecessaryMessageQueue(...)。
- 第 108 至 120 行 :队列拉取超时,即当前时间-最后一次拉取消息时间>120s( 120s 可配置),判定发生 BUG,过久未进行消息拉取,移除消息队列。移除后,下面#新增队列逻辑#可以重新加入新的该消息队列。
- 第 93 至 126 行 :移除不存在于分配的消息队列(mqSet) 的 消息处理队列(processQueueTable)。
- 第 128 至 158 行 :增加 分配的消息队列(mqSet) 新增的消息队列。
- 第 161 行 :发起新增的消息队列消息拉取请求。详细解析见:RebalancePushImpl#dispatchPullRequest(...)。
1: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) { 2: // 同步队列的消费进度,并移除之。 3: this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq); 4: this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq); 5: // TODO 顺序消费 6: if (this.defaultMQPushConsumerImpl.isConsumeOrderly() 7: && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) { 8: try { 9: if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) { 10: try { 11: return this.unlockDelay(mq, pq); 12: } finally { 13: pq.getLockConsume().unlock(); 14: } 15: } else { 16: log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", // 17: mq, // 18: pq.getTryUnlockTimes()); 19: 20: pq.incTryUnlockTimes(); 21: } 22: } catch (Exception e) { 23: log.error("removeUnnecessaryMessageQueue Exception", e); 24: } 25: 26: return false; 27: } 28: return true; 29: }复制
- 说明 :移除不需要的消息队列相关的信息,并返回是否移除成功。
- 第 2 至 4 行 :同步队列的消费进度,并移除之。
- 第 5 至 27 行 :顺序消费相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。
1: public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) { 2: this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq); 3: this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq); 4: return true; 5: }复制
- 说明 :移除不需要的消息队列相关的信息,并返回移除成功。和RebalancePushImpl#removeUnnecessaryMessageQueue(...)基本一致。
1: public void dispatchPullRequest(List复制pullRequestList) { 2: for (PullRequest pullRequest : pullRequestList) { 3: this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest); 4: log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest); 5: } 6: }
- 说明 :发起消息拉取请求。该调用是PushConsumer不断不断不断拉取消息的起点。
1: public void executePullRequestImmediately(final PullRequest pullRequest) { 2: this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest); 3: }复制
- 说明 :提交拉取请求。提交后,PullMessageService异步执行,非阻塞。详细解析见:PullMessageService。
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
- 说明 :平均分配队列策略。
- 第 7 至 25 行 :参数校验。
-
第 26 至 36 行 :平均分配消息队列。
- [0,mod):mqAll.size()/cidAll.size()+1。前面mod个Consumer平分余数,多获得 1 个消息队列。
- [mod,cidAll.size()):mqAll.size()/cidAll.size()。
- 第 27 行 :index:当前Consumer在消费集群里是第几个。这里就是为什么需要对传入的cidAll参数必须进行排序的原因。如果不排序,Consumer在本地计算出来的index无法一致,影响计算结果。
- 第 28 行 :mod:余数,即多少消息队列无法平均分配。
-
第 29 至 31 行 :averageSize:代码可以简化成(mod>0&&index
- 第 32 行 :startIndex:Consumer分配消息队列开始位置。
- 第 33 行 :range:分配队列数量。之所以要Math#min(...)的原因:当mqAll.size()<=cidAll.size()时,最后几个Consumer分配不到消息队列。
- 第 34 至 36 行 :生成分配消息队列结果。
- 举个例子:
固定消息队列长度为4。
Consumer * 2 可以整除 |
Consumer * 3 不可整除 |
Consumer * 5 无法都分配 |
|
---|---|---|---|
消息队列[0] |
Consumer[0] |
Consumer[0] |
Consumer[0] |
消息队列[1] |
Consumer[0] |
Consumer[0] |
Consumer[1] |
消息队列[2] |
Consumer[1] |
Consumer[1] |
Consumer[2] |
消息队列[3] |
Consumer[1] |
Consumer[2] |
Consumer[3] |
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
- 说明 :平均分配可消费的 Broker对应的消息队列。
- 第 7 至 15 行 :参数校验。
- 第 16 至 23 行 :计算可消费的 Broker对应的消息队列。
- 第 25 至 34 行 :平均分配消息队列。该平均分配方式和AllocateMessageQueueAveragely略有不同,其是将多余的结尾部分分配给前rem个Consumer。
- 疑问:使用该分配策略时,Consumer和Broker分配需要怎么配置。?等研究主从相关源码时,仔细考虑下。
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
- 说明 :环状分配消息队列。
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
- 说明 :分配配置的消息队列。
- 疑问 :该分配策略的使用场景。
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
- 说明 :计算消息队列开始消费位置。
-
PushConsumer读取消费进度有三种选项:
- CONSUME_FROM_LAST_OFFSET:第 6 至 29 行 :一个新的消费集群第一次启动从队列的最后位置开始消费。后续再启动接着上次消费的进度开始消费。
- CONSUME_FROM_FIRST_OFFSET:第 30 至 40 行 :一个新的消费集群第一次启动从队列的最前位置开始消费。后续再启动接着上次消费的进度开始消费。
- CONSUME_FROM_TIMESTAMP:第 41 至 65 行 :一个新的消费集群第一次启动从指定时间点开始消费。后续再启动接着上次消费的进度开始消费。
暂时跳过。?
6、PushConsumer 拉取消息PullMessageService// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
- 说明 :拉取消息服务,不断不断不断从Broker拉取消息,并提交消费任务到ConsumeMessageService。
- #executePullRequestLater(...):第 26 至 40 行 : 提交延迟拉取消息请求。
- #executePullRequestImmediately(...):第 42 至 53 行 :提交立即拉取消息请求。
- #executeTaskLater(...):第 55 至 63 行 :提交延迟任务。
- #pullMessage(...):第 69 至 82 行 :执行拉取消息逻辑。详细解析见:DefaultMQPushConsumerImpl#pullMessage(...)。
- #run(...):第 84 至 101 行 :循环拉取消息请求队列(pullRequestQueue),进行消息拉取。
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
-
#pullMessage(...)说明 :拉取消息。
- 执行消息拉取异步请求。详细解析见:PullAPIWrapper#pullKernelImpl(...)。
- 当发起请求产生异常时,提交延迟拉取消息请求。对应Broker处理拉取消息逻辑见:PullMessageProcessor#processRequest(...)。
- 第 3 至 6 行 :消息处理队列已经终止,不进行消息拉取。
- 第 9 行 :设置消息处理队列最后拉取消息时间。
- 第 11 至 18 行 :Consumer未处于运行中状态,不进行消息拉取,提交延迟拉取消息请求。
- 第 20 至 25 行 :Consumer处于暂停中,不进行消息拉取,提交延迟拉取消息请求。
- 第 27 至 37 行 :消息处理队列持有消息超过最大允许值(默认:1000条),不进行消息拉取,提交延迟拉取消息请求。
- 第 39 至 49 行 :Consumer为并发消费 并且 消息队列持有消息跨度过大(消息跨度 = 持有消息最后一条和第一条的消息位置差,默认:2000),不进行消息拉取,提交延迟拉取消息请求。
- 第 50 至 70 行 :顺序消费相关跳过,详细解析见:《RocketMQ 源码分析 —— Message 顺序发送与消费》。
- 第 72 至 78 行 :Topic对应的订阅信息不存在,不进行消息拉取,提交延迟拉取消息请求。
- 第 222 至 224 行 :判断请求是否使用Consumer本地的订阅信息(SubscriptionData),而不使用Broker里的订阅信息。详细解析见:PullMessageProcessor#processRequest(...) 第 64 至 110 行代码。
- 第 226 行 :是否开启过滤类过滤模式。详细解析见:《RocketMQ 源码分析 —— Filtersrv》。
- 第 229 至 235 行 :计算拉取消息请求系统标识。详细解析见:PullMessageRequestHeader.sysFlag。
- 第 237 至 255 行 :
- PullCallback:拉取消息回调:
- 第 86 行 :处理拉取结果。详细逻辑见:PullAPIWrapper#processPullResult(...)。
-
第 89 至 192 行 :处理拉取状态结果: * 第 90 至 139 行 :拉取到消息(FOUND) : * 第 91 至 93 行 :设置下次拉取消息队列位置。 * 第 95 至 97 行 :统计。 * 第 101 至 102 行 :拉取到消息的消息列表为空,提交立即拉取消息请求。为什么会存在拉取到消息,但是消息结果未空呢?原因见:PullAPIWrapper#processPullResult(...)。 * 第 106 至 108 行 :统计。 * 第 111 行 :提交拉取到的消息到消息处理队列。详细解析见:ProcessQueue#putMessage(...)。 * 第 113 至 118 行 :提交消费请求到ConsumeMessageService。详细解析见:ConsumeMessageConcurrentlyService。 * 第 120 至 126 行 :根据拉取频率(pullInterval),提交立即或者延迟拉取消息请求。默认拉取频率为 0ms ,提交立即拉取消息请求。 * 第 129 至 137 行 :下次拉取消息队列位置小于上次拉取消息队列位置 或者 第一条消息的消息队列位置小于上次拉取消息队列位置,则判定为BUG,输出警告日志。
- 第 140 至 149 行 :没有新消息(NO_NEW_MSG) : * 第 142 行 : 设置下次拉取消息队列位置。 * 第 145 行 :更正消费进度。详细解析见:#correctTagsOffset(...)。 * 第 148 行 :提交立即拉取消息请求。
- 第 150 至 159 行 :有新消息但是不匹配(NO_MATCHED_MSG)。逻辑同NO_NEW_MSG。
- 第 160 至 189 行 :拉取请求的消息队列位置不合法 (OFFSET_ILLEGAL)。 * 第 164 行 :设置下次拉取消息队列位置。 * 第 167 行 :设置消息处理队列为dropped。 * 第 169 至 188 行 :提交延迟任务,进行队列移除。 * 第 175 至 178 行 :更新消费进度,同步消费进度到Broker。 * 第 181 行 :移除消费处理队列。 * 疑问:为什么不立即移除???
- 第 196 至 204 行 :发生异常,提交延迟拉取消息请求。
-
#correctTagsOffset(...):更正消费进度。
- 第 258 至 261 行 : 当消费处理队列持有消息数量为 0 时,更新消费进度为拉取请求的拉取消息队列位置。
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
- 说明 :拉取消息核心方法。该方法参数较多,可以看下代码注释上每个参数的说明?。
-
第 34 至 43 行 :获取Broker信息(Broker地址、是否为从节点)。
- #recalculatePullFromWhichNode(...)
- #MQClientInstance#findBrokerAddressInSubscribe(...)
- 第 45 至 78 行 :请求拉取消息。
- 第 81 行 :当Broker信息不存在,则抛出异常。
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
- 说明 :计算消息队列拉取消息对应的Broker编号。
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
- 说明 :获取Broker信息(Broker地址、是否为从节点)。
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
-
说明 :处理拉取结果。
- 更新消息队列拉取消息Broker编号的映射。
- 解析消息,并根据订阅信息消息tagCode匹配合适消息。
- 第 16 行 :更新消息队列拉取消息Broker编号的映射。下次拉取消息时,如果未设置默认拉取的Broker编号,会使用更新后的Broker编号。
-
第 18 至 55 行 :解析消息,并根据订阅信息消息tagCode匹配合适消息。
- 第 20 至 22 行 :解析消息。详细解析见:《RocketMQ 源码分析 —— Message基础》 。
- 第 24 至 35 行 :根据订阅信息tagCode匹配消息。
- 第 37 至 43 行 :Hook。
- 第 45 至 51 行 :设置消息队列当前最小/最大位置到消息拓展字段。
- 第 54 行 :设置消息队列。
- 第 58 行 :清空消息二进制数组。
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
总结如果用最简单粗暴的方式描述PullConsumer拉取消息的过程,那就是如下的代码:
while (true) { if (不满足拉取消息) { Thread.sleep(间隔); continue; } 主动拉取消息(); }复制6、PushConsumer 消费消息ConsumeMessageConcurrentlyService 提交消费请求ConsumeMessageConcurrentlyService#submitConsumeRequest(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
- 说明 :提交立即消费请求。
- 第 16 至 22 行 :提交消息小于等于批量消费数,直接提交消费请求。
-
第 23 至 47 行 :当提交消息大于批量消费数,进行分拆成多个请求。
- 第 25 至 33 行 :计算当前拆分请求包含的消息。
- 第 35 至 38 行 :提交拆分消费请求。
- 第 39 至 44 行 :提交请求被拒绝,则将当前拆分消息 + 剩余消息提交延迟消费请求,结束拆分循环。
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
- 说明 :提交延迟消费请求。
- 第 34 行 :直接调用ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);。如果消息数超过批量消费上限,会不会是BUG。
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
- 说明 :消费请求。提交请求执行消费。
- 第 24 至 28 行 :废弃处理队列不进行消费。
- 第 34 至 44 行 :Hook。
- 第 51 行 :当消息为重试消息,设置Topic为原始Topic。例如:原始Topic为TopicTest,重试时Topic为%RETRY%please_rename_unique_group_name_4,经过该方法,Topic设置回TopicTest。
- 第 53 至 58 行 :设置开始消费时间。
- 第 61 行 :进行消费。
- 第 71 至 85 行 :解析消费返回结果类型
- 第 87 至 90 行 :Hook。
- 第 92 至 99 行 :消费结果状态未空时,则设置消费结果状态为稍后消费。
- 第 101 至 106 行 :Hook。
- 第 108 至 110 行 :统计。
- 第 112 至 117 行 :处理消费结果。如果消费处理队列被移除,恰好消息被消费,则可能导致消息重复消费,因此,消息消费要尽最大可能性实现幂等性。详细解析见:ConsumeMessageConcurrentlyService#processConsumeResult(...)。
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
- 说明 :处理消费结果。
- 第 8 至 10 行 :消费请求消息未空时,直接返回。
-
第 12 至 32 行 :计算ackIndex值。consumeRequest.msgs[0-ackIndex]为消费成功,需要进行ack确认。
- 第 14 至 23 行 :CONSUME_SUCCESS:ackIndex=context.getAckIndex()。
- 第 24 至 29 行 :RECONSUME_LATER:ackIndex=-1。
-
第34 至 63 行 :处理消费失败的消息。
- 第 43 至 52 行 :发回消费失败的消息到Broker。详细解析见:DefaultMQPushConsumerImpl#sendMessageBack(...)。
- 第 54 至 59 行 :发回Broker失败的消息,直接提交延迟重新消费。
- 如果发回Broker成功,结果因为例如网络异常,导致Consumer以为发回失败,判定消费发回失败,会导致消息重复消费,因此,消息消费要尽最大可能性实现幂等性。
- 第 36 至 41 行 :BROADCASTING:广播模式,无论是否消费失败,不发回消息到Broker,只打印日志。
- 第 42 至 60 行 :CLUSTERING:集群模式,消费失败的消息发回到Broker。
-
第 65 至 69 行 :移除【消费成功】和【消费失败但发回Broker成功】的消息,并更新最新消费进度。
- 为什么会有【消费失败但发回Broker成功】的消息?见第 56 行。
- ProcessQueue#removeMessage(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
ConsumeMessageConcurrentlyService#cleanExpireMsg(...)// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
- 说明 :定时清理过期消息,默认周期:15min。
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
- 说明 :移除过期消息。
- 第 2 至 5 行 :顺序消费时,直接返回。
- 第 7 至 9 行 :循环移除消息。默认最大循环次数:16次。
- 第 10 至 25 行 :获取第一条消息。判断是否超时,若不超时,则结束循环。
- 第 29 行 :发回超时消息到Broker。
- 第 32 至 48 行 :判断此时消息是否依然是第一条,若是,则进行移除。
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
- 说明 :发回消息。
- 第 4 至 8 行 :Consumer发回消息。详细解析见:MQClientAPIImpl#consumerSendMessageBack(...)。
-
第 10 至 25 行 :发生异常时,Consumer内置默认Producer发送消息。
- ?疑问:什么样的情况下会发生异常呢?
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
8、Consumer 消费进度OffsetStore- RemoteBrokerOffsetStore:Consumer集群模式 下,使用远程Broker消费进度。
- LocalFileOffsetStore:Consumer广播模式下,使用本地文件消费进度。
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
- 说明 :从本地文件加载消费进度到内存。
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
- 说明 :本地Offset存储序列化。
Yunai-MacdeMacBook-Pro-2:config yunai$ cat /Users/yunai/.rocketmq_offsets/192.168.17.0@DEFAULT/please_rename_unique_group_name_1/offsets.json { "offsetTable":{{ "brokerName":"broker-a", "queueId":3, "topic":"TopicTest" }:1470,{ "brokerName":"broker-a", "queueId":2, "topic":"TopicTest" }:1471,{ "brokerName":"broker-a", "queueId":1, "topic":"TopicTest" }:1470,{ "brokerName":"broker-a", "queueId":0, "topic":"TopicTest" }:1470 } }复制RemoteBrokerOffsetStore#load(...)
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
- 说明 :不进行加载,实际读取消费进度时,从Broker获取。
读取消费进度类型:
- READ_FROM_MEMORY:从内存读取。
- READ_FROM_STORE:从存储(Broker或文件)读取。
- MEMORY_FIRST_THEN_STORE:优先从内存读取,读取不到,从存储读取。
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
- 第 16 行 :从文件读取消费进度。
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
- 第 16 行 :从Broker读取消费进度。
该方法RemoteBrokerOffsetStore与LocalFileOffsetStore实现相同。
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
OffsetStore#persistAll(...)LocalFileOffsetStore#persistAll(...)// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
- 说明 :持久化消费进度。将消费进度写入文件。
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
- 说明 :持久化指定消息队列数组的消费进度到Broker,并移除非指定消息队列。
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/
- 说明 :定时进行持久化,默认周期:5000ms。
-
重要说明 :
- 消费进度持久化不仅仅只有定时持久化,拉取消息、分配消息队列等等操作,都会进行消费进度持久化。
- 消费进度持久化不仅仅只有定时持久化,拉取消息、分配消息队列等等操作,都会进行消费进度持久化。
- 消费进度持久化不仅仅只有定时持久化,拉取消息、分配消息队列等等操作,都会进行消费进度持久化。
相关推荐
- 【RocketMQ系列】RocketMQ集群,RocketMQ-on-DLedger集群 本文RocketMQ系列第四篇,主要介绍RocketMQ集群及如何部署自动容灾切换的 RocketMQ-on-DLedger Group。RocketMQ集群搭建ROcketMQ集群搭建有以下几种方案:「单Master模式」「多Master模式」「多Master多Slave模式-异步复制」「多Master多Slave模式-同步双写」其…
- 3DMAX提示和技巧 本主题标识使用 Civil View 的一些重要提示和技巧。常规使用屏幕分辨率至少为 1280x1024 的 Civil View。低于此分辨率时,一些面板将占用过多屏幕空间。 将视口设置为线框显示以达到最佳性能。 要尽可能简化用户界面,请在单个视口中工作并关闭 3ds Max 命令面…