分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(上)

小编:管理员 160阅读 2022.08.01

本文主要基于 RocketMQ 4.0.x 正式版

  • 1、概述
  • 2、ConsumeQueue 结构
  • 3、ConsumeQueue 存储
    • DefaultMessageStore#doDispatch(...)
    • ConsumeQueue#putMessagePositionInfoWrapper(...)
    • ReputMessageService
    • FlushConsumeQueueService
  • 4、Broker 提供[拉取消息]接口
    • PullMessageRequestHeader
    • PullMessageProcessor#processRequest(...)
    • MessageStore#getMessage(...)
    • DefaultMessageFilter#isMessageMatched(...)
    • PullRequestHoldService
    • PullMessageProcessor#executeRequestWhenWakeup(...)
  • 5、Broker 提供[更新消费进度]接口
    • MixAll#string2File(...)
    • BrokerController#initialize(...)
    • ConfigManager
    • ConsumerOffsetManager
  • 6、Broker 提供[发回消息]接口
    • SendMessageProcessor#consumerSendMsgBack(...)
  • 7、结尾

1、概述

本章主要解析 消费 逻辑涉及到的源码。 因为篇幅较长,分成上下两篇:

  1. 上篇:Broker相关源码。
  2. 下篇:Consumer相关源码。

本文即是上篇。


ok,先看第一张关于消费逻辑的图:

再看消费逻辑精简的顺序图(实际情况会略有差别):

2、ConsumeQueue 结构

ConsumeQueue、MappedFileQueue、MappedFile的关系如下:

ConsumeQueue:MappedFileQueue:MappedFile= 1 : 1 : N。

反应到系统文件如下:

Yunai-MacdeMacBook-Pro-2:consumequeue yunai$ pwd
/Users/yunai/store/consumequeue
Yunai-MacdeMacBook-Pro-2:consumequeue yunai$ cd TopicRead3/
Yunai-MacdeMacBook-Pro-2:TopicRead3 yunai$ ls -ls
total 0
0 drwxr-xr-x  3 yunai  staff  102  4 27 21:52 0
0 drwxr-xr-x  3 yunai  staff  102  4 27 21:55 1
0 drwxr-xr-x  3 yunai  staff  102  4 27 21:55 2
0 drwxr-xr-x  3 yunai  staff  102  4 27 21:55 3
Yunai-MacdeMacBook-Pro-2:TopicRead3 yunai$ cd 0/
Yunai-MacdeMacBook-Pro-2:0 yunai$ ls -ls
total 11720
11720 -rw-r--r--  1 yunai  staff  6000000  4 27 21:55 00000000000000000000
复制

ConsumeQueue、MappedFileQueue、MappedFile的定义如下:

  • MappedFile:00000000000000000000等文件。
  • MappedFileQueue:MappedFile所在的文件夹,对MappedFile进行封装成文件队列,对上层提供可无限使用的文件容量。
    • 每个MappedFile统一文件大小。
    • 文件命名方式:fileName[n] = fileName[n - 1] + mappedFileSize。在ConsumeQueue里默认为 6000000B。
  • ConsumeQueue:针对MappedFileQueue的封装使用。
    • Store:ConsumeQueue=ConcurrentHashMap>。

ConsumeQueue存储在MappedFile的内容必须大小是 20B(ConsumeQueue.CQ_STORE_UNIT_SIZE),有两种内容类型:

  1. MESSAGE_POSITION_INFO:消息位置信息。
  2. BLANK: 文件前置空白占位。当历史Message被删除时,需要用BLANK占位被删除的消息。

MESSAGE_POSITION_INFO在ConsumeQueue存储结构:

第几位

字段

说明

数据类型

字节数

1

offset

消息 CommitLog 存储位置

Long

8

2

size

消息长度

Int

4

3

tagsCode

消息tagsCode

Long

8

BLANK在ConsumeQueue存储结构:

第几位

字段

说明

数据类型

字节数

1

0

Long

8

2

Integer.MAX_VALUE

Int

4

3

0

Long

8

3、ConsumeQueue 存储

主要有两个组件:

  • ReputMessageService:write ConsumeQueue。
  • FlushConsumeQueueService:flush ConsumeQueue。
ReputMessageService
1: class ReputMessageService extends ServiceThread {
  2: 
  3:     /**
  4:      * 开始重放消息的CommitLog物理位置
  5:      */
  6:     private volatile long reputFromOffset = 0;
  7: 
  8:     public long getReputFromOffset() {
  9:         return reputFromOffset;
 10:     }
 11: 
 12:     public void setReputFromOffset(long reputFromOffset) {
 13:         this.reputFromOffset = reputFromOffset;
 14:     }
 15: 
 16:     @Override
 17:     public void shutdown() {
 18:         for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) {
 19:             try {
 20:                 Thread.sleep(100);
 21:             } catch (InterruptedException ignored) {
 22:             }
 23:         }
 24: 
 25:         if (this.isCommitLogAvailable()) {
 26:             log.warn("shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}",
 27:                 DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset);
 28:         }
 29: 
 30:         super.shutdown();
 31:     }
 32: 
 33:     /**
 34:      * 剩余需要重放消息字节数
 35:      *
 36:      * @return 字节数
 37:      */
 38:     public long behind() {
 39:         return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset;
 40:     }
 41: 
 42:     /**
 43:      * 是否commitLog需要重放消息
 44:      *
 45:      * @return 是否
 46:      */
 47:     private boolean isCommitLogAvailable() {
 48:         return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
 49:     }
 50: 
 51:     private void doReput() {
 52:         for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
 53: 
 54:             // TODO 疑问:这个是啥
 55:             if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() //
 56:                 && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
 57:                 break;
 58:             }
 59: 
 60:             // 获取从reputFromOffset开始的commitLog对应的MappeFile对应的MappedByteBuffer
 61:             SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
 62:             if (result != null) {
 63:                 try {
 64:                     this.reputFromOffset = result.getStartOffset();
 65: 
 66:                     // 遍历MappedByteBuffer
 67:                     for (int readSize = 0; readSize < result.getSize() && doNext; ) {
 68:                         // 生成重放消息重放调度请求
 69:                         DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
 70:                         int size = dispatchRequest.getMsgSize(); // 消息长度
 71:                         // 根据请求的结果处理
 72:                         if (dispatchRequest.isSuccess()) { // 读取成功
 73:                             if (size > 0) { // 读取Message
 74:                                 DefaultMessageStore.this.doDispatch(dispatchRequest);
 75:                                 // 通知有新消息
 76:                                 if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
 77:                                     && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
 78:                                     DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
 79:                                         dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
 80:                                         dispatchRequest.getTagsCode());
 81:                                 }
 82:                                 // FIXED BUG By shijia
 83:                                 this.reputFromOffset += size;
 84:                                 readSize += size;
 85:                                 // 统计
 86:                                 if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
 87:                                     DefaultMessageStore.this.storeStatsService
 88:                                         .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
 89:                                     DefaultMessageStore.this.storeStatsService
 90:                                         .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
 91:                                         .addAndGet(dispatchRequest.getMsgSize());
 92:                                 }
 93:                             } else if (size == 0) { // 读取到MappedFile文件尾
 94:                                 this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
 95:                                 readSize = result.getSize();
 96:                             }
 97:                         } else if (!dispatchRequest.isSuccess()) { // 读取失败
 98:                             if (size > 0) { // 读取到Message却不是Message
 99:                                 log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
100:                                 this.reputFromOffset += size;
101:                             } else { // 读取到Blank却不是Blank
102:                                 doNext = false;
103:                                 if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
104:                                     log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
105:                                         this.reputFromOffset);
106: 
107:                                     this.reputFromOffset += result.getSize() - readSize;
108:                                 }
109:                             }
110:                         }
111:                     }
112:                 } finally {
113:                     result.release();
114:                 }
115:             } else {
116:                 doNext = false;
117:             }
118:         }
119:     }
120: 
121:     @Override
122:     public void run() {
123:         DefaultMessageStore.log.info(this.getServiceName() + " service started");
124: 
125:         while (!this.isStopped()) {
126:             try {
127:                 Thread.sleep(1);
128:                 this.doReput();
129:             } catch (Exception e) {
130:                 DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
131:             }
132:         }
133: 
134:         DefaultMessageStore.log.info(this.getServiceName() + " service end");
135:     }
136: 
137:     @Override
138:     public String getServiceName() {
139:         return ReputMessageService.class.getSimpleName();
140:     }
141: 
142: }
复制
  • 说明:重放消息线程服务。
    • 该服务不断生成 消息位置信息 到 消费队列(ConsumeQueue)
    • 该服务不断生成 消息索引 到 索引文件(IndexFile)
  • 第 75 至 81 行 :当Broker是主节点 &&Broker开启的是长轮询,通知消费队列有新的消息。NotifyMessageArrivingListener会 调用PullRequestHoldService#notifyMessageArriving(...)方法,详细解析见:PullRequestHoldService
  • 第 61 行 :获取reputFromOffset开始的CommitLog对应的MappedFile对应的MappedByteBuffer。
  • 第 67 行 :遍历MappedByteBuffer。
  • 第 69 行 :生成重放消息重放调度请求 (DispatchRequest) 。请求里主要包含一条消息 (Message) 或者 文件尾 (BLANK) 的基本信息。
  • 第 72 至 96 行 :请求是有效请求,进行逻辑处理。
  • 第 73 至 92 行 :请求对应的是Message,进行调度,生成ConsumeQueue和IndexFile对应的内容。详细解析见:
  • 第 93 至 96 行 :请求对应的是Blank,即文件尾,跳转指向下一个MappedFile。
  • 第 97 至 110 行 :请求是无效请求。出现该情况,基本是一个BUG
  • 第 127 至 128 行 :每 1ms 循环执行重放逻辑。
  • 第 18 至 30 行 :shutdown时,多次sleep(100)直到CommitLog回放到最新位置。恩,如果未回放完,会输出警告日志。
DefaultMessageStore#doDispatch(...)
1: /**
  2:  * 执行调度请求
  3:  * 1. 非事务消息 或 事务提交消息 建立 消息位置信息 到 ConsumeQueue
  4:  * 2. 建立 索引信息 到 IndexFile
  5:  *
  6:  * @param req 调度请求
  7:  */
  8: public void doDispatch(DispatchRequest req) {
  9:     // 非事务消息 或 事务提交消息 建立 消息位置信息 到 ConsumeQueue
 10:     final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag());
 11:     switch (tranType) {
 12:         case MessageSysFlag.TRANSACTION_NOT_TYPE:
 13:         case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
 14:             DefaultMessageStore.this.putMessagePositionInfo(req.getTopic(), req.getQueueId(), req.getCommitLogOffset(), req.getMsgSize(),
 15:                 req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset());
 16:             break;
 17:         case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
 18:         case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
 19:             break;
 20:     }
 21:     // 建立 索引信息 到 IndexFile
 22:     if (DefaultMessageStore.this.getMessageStoreConfig().isMessageIndexEnable()) {
 23:         DefaultMessageStore.this.indexService.buildIndex(req);
 24:     }
 25: }
 26: 
 27: /**
 28:  * 建立 消息位置信息 到 ConsumeQueue
 29:  *
 30:  * @param topic 主题
 31:  * @param queueId 队列编号
 32:  * @param offset commitLog存储位置
 33:  * @param size 消息长度
 34:  * @param tagsCode 消息tagsCode
 35:  * @param storeTimestamp 存储时间
 36:  * @param logicOffset 队列位置
 37:  */
 38: public void putMessagePositionInfo(String topic, int queueId, long offset, int size, long tagsCode, long storeTimestamp,
 39:     long logicOffset) {
 40:     ConsumeQueue cq = this.findConsumeQueue(topic, queueId);
 41:     cq.putMessagePositionInfoWrapper(offset, size, tagsCode, storeTimestamp, logicOffset);
 42: }
复制ConsumeQueue#putMessagePositionInfoWrapper(...)
1: /**
  2:  * 添加位置信息封装
  3:  *
  4:  * @param offset commitLog存储位置
  5:  * @param size 消息长度
  6:  * @param tagsCode 消息tagsCode
  7:  * @param storeTimestamp 消息存储时间
  8:  * @param logicOffset 队列位置
  9:  */
 10: public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp,
 11:     long logicOffset) {
 12:     final int maxRetries = 30;
 13:     boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable();
 14:     // 多次循环写,直到成功
 15:     for (int i = 0; i < maxRetries && canWrite; i++) {
 16:         // 调用添加位置信息
 17:         boolean result = this.putMessagePositionInfo(offset, size, tagsCode, logicOffset);
 18:         if (result) {
 19:             // 添加成功,使用消息存储时间 作为 存储check point。
 20:             this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(storeTimestamp);
 21:             return;
 22:         } else {
 23:             // XXX: warn and notify me
 24:             log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + offset
 25:                 + " failed, retry " + i + " times");
 26: 
 27:             try {
 28:                 Thread.sleep(1000);
 29:             } catch (InterruptedException e) {
 30:                 log.warn("", e);
 31:             }
 32:         }
 33:     }
 34: 
 35:     // XXX: warn and notify me 设置异常不可写入
 36:     log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
 37:     this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
 38: }
 39: 
 40: /**
 41:  * 添加位置信息,并返回添加是否成功
 42:  *
 43:  * @param offset commitLog存储位置
 44:  * @param size 消息长度
 45:  * @param tagsCode 消息tagsCode
 46:  * @param cqOffset 队列位置
 47:  * @return 是否成功
 48:  */
 49: private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
 50:     final long cqOffset) {
 51:     // 如果已经重放过,直接返回成功
 52:     if (offset <= this.maxPhysicOffset) {
 53:         return true;
 54:     }
 55:     // 写入位置信息到byteBuffer
 56:     this.byteBufferIndex.flip();
 57:     this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
 58:     this.byteBufferIndex.putLong(offset);
 59:     this.byteBufferIndex.putInt(size);
 60:     this.byteBufferIndex.putLong(tagsCode);
 61:     // 计算consumeQueue存储位置,并获得对应的MappedFile
 62:     final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
 63:     MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
 64:     if (mappedFile != null) {
 65:         // 当是ConsumeQueue第一个MappedFile && 队列位置非第一个 && MappedFile未写入内容,则填充前置空白占位
 66:         if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) { // TODO 疑问:为啥这个操作。目前能够想象到的是,一些老的消息很久没发送,突然发送,这个时候刚好满足。
 67:             this.minLogicOffset = expectLogicOffset;
 68:             this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
 69:             this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
 70:             this.fillPreBlank(mappedFile, expectLogicOffset);
 71:             log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
 72:                 + mappedFile.getWrotePosition());
 73:         }
 74:         // 校验consumeQueue存储位置是否合法。TODO 如果不合法,继续写入会不会有问题?
 75:         if (cqOffset != 0) {
 76:             long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
 77:             if (expectLogicOffset != currentLogicOffset) {
 78:                 LOG_ERROR.warn(
 79:                     "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
 80:                     expectLogicOffset,
 81:                     currentLogicOffset,
 82:                     this.topic,
 83:                     this.queueId,
 84:                     expectLogicOffset - currentLogicOffset
 85:                 );
 86:             }
 87:         }
 88:         // 设置commitLog重放消息到ConsumeQueue位置。
 89:         this.maxPhysicOffset = offset;
 90:         // 插入mappedFile
 91:         return mappedFile.appendMessage(this.byteBufferIndex.array());
 92:     }
 93:     return false;
 94: }
 95: 
 96: /**
 97:  * 填充前置空白占位
 98:  *
 99:  * @param mappedFile MappedFile
100:  * @param untilWhere consumeQueue存储位置
101:  */
102: private void fillPreBlank(final MappedFile mappedFile, final long untilWhere) {
103:     // 写入前置空白占位到byteBuffer
104:     ByteBuffer byteBuffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
105:     byteBuffer.putLong(0L);
106:     byteBuffer.putInt(Integer.MAX_VALUE);
107:     byteBuffer.putLong(0L);
108:     // 循环填空
109:     int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize());
110:     for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) {
111:         mappedFile.appendMessage(byteBuffer.array());
112:     }
113: }
复制
  • #putMessagePositionInfoWrapper(...)说明 :添加位置信息到ConsumeQueue的封装,实际需要调用#putMessagePositionInfo(...)方法。
    • 第 13 行 :判断ConsumeQueue是否允许写入。当发生Bug时,不允许写入。
    • 第 17 行 :调用#putMessagePositionInfo(...)方法,添加位置信息。
    • 第 18 至 21 行 :添加成功,使用消息存储时间 作为 存储检查点。StoreCheckpoint的详细解析见:Store初始化与关闭。
    • 第 22 至 32 行 :添加失败,目前基本可以认为是BUG。
    • 第 35 至 37 行 :写入失败时,标记ConsumeQueue写入异常,不允许继续写入。
  • #putMessagePositionInfo(...)说明 :添加位置信息到ConsumeQueue,并返回添加是否成功。
    • 这块比较有疑问,如果计算出来的存储位置不合法,不返回添加失败,继续进行添加位置信息,会不会有问题???
    • 第 51 至 54 行 :如果offset(存储位置) 小于等于maxPhysicOffset(CommitLog消息重放到ConsumeQueue最大的CommitLog存储位置),表示已经重放过,此时,不再重复写入,直接返回写入成功。
    • 第 55 至 60 行 :写 位置信息到byteBuffer。
    • 第 62 至 63 行 :计算ConsumeQueue存储位置,并获得对应的MappedFile。
    • 第 65 至 73 行 :当MappedFile是ConsumeQueue当前第一个文件 &&MappedFile未写入内容 && 重放消息队列位置大于0,则需要进行MappedFile填充前置BLANK。
    • 这块比较有疑问,什么场景下会需要。猜测产生的原因:一个Topic长期无消息产生,突然N天后进行发送,Topic对应的历史消息以及和消费队列数据已经被清理,新生成的MappedFile需要前置占位。
    • 第 74 至 87 行 :校验ConsumeQueue存储位置是否合法,不合法则输出日志。
    • 第 89 行 :设置CommitLog重放消息到ConsumeQueue的最大位置。
    • 第 91 行 :插入消息位置到MappedFile。
FlushConsumeQueueService
1: class FlushConsumeQueueService extends ServiceThread {
  2:     private static final int RETRY_TIMES_OVER = 3;
  3:     /**
  4:      * 最后flush时间戳
  5:      */
  6:     private long lastFlushTimestamp = 0;
  7: 
  8:     private void doFlush(int retryTimes) {
  9:         int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
 10: 
 11:         // retryTimes == RETRY_TIMES_OVER时,进行强制flush。主要用于shutdown时。
 12:         if (retryTimes == RETRY_TIMES_OVER) {
 13:             flushConsumeQueueLeastPages = 0;
 14:         }
 15:         // 当时间满足flushConsumeQueueThoroughInterval时,即使写入的数量不足flushConsumeQueueLeastPages,也进行flush
 16:         long logicsMsgTimestamp = 0;
 17:         int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
 18:         long currentTimeMillis = System.currentTimeMillis();
 19:         if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
 20:             this.lastFlushTimestamp = currentTimeMillis;
 21:             flushConsumeQueueLeastPages = 0;
 22:             logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
 23:         }
 24:         // flush消费队列
 25:         ConcurrentHashMap> tables = DefaultMessageStore.this.consumeQueueTable;
 26:         for (ConcurrentHashMap maps : tables.values()) {
 27:             for (ConsumeQueue cq : maps.values()) {
 28:                 boolean result = false;
 29:                 for (int i = 0; i < retryTimes && !result; i++) {
 30:                     result = cq.flush(flushConsumeQueueLeastPages);
 31:                 }
 32:             }
 33:         }
 34:         // flush 存储 check point
 35:         if (0 == flushConsumeQueueLeastPages) {
 36:             if (logicsMsgTimestamp > 0) {
 37:                 DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
 38:             }
 39:             DefaultMessageStore.this.getStoreCheckpoint().flush();
 40:         }
 41:     }
 42: 
 43:     public void run() {
 44:         DefaultMessageStore.log.info(this.getServiceName() + " service started");
 45: 
 46:         while (!this.isStopped()) {
 47:             try {
 48:                 int interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue();
 49:                 this.waitForRunning(interval);
 50:                 this.doFlush(1);
 51:             } catch (Exception e) {
 52:                 DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
 53:             }
 54:         }
 55: 
 56:         this.doFlush(RETRY_TIMES_OVER);
 57: 
 58:         DefaultMessageStore.log.info(this.getServiceName() + " service end");
 59:     }
 60: 
 61:     @Override
 62:     public String getServiceName() {
 63:         return FlushConsumeQueueService.class.getSimpleName();
 64:     }
 65: 
 66:     @Override
 67:     public long getJointime() {
 68:         return 1000 * 60;
 69:     }
 70: }
复制
  • 说明 :flushConsumeQueue(消费队列) 线程服务。
  • 第 11 至 14 行 :当retryTimes==RETRY_TIMES_OVER时,进行强制flush。用于shutdown时。
  • 第 15 至 23 行 :每 flushConsumeQueueThoroughInterval 周期,执行一次 flush 。因为不是每次循环到都能满足 flushConsumeQueueLeastPages 大小,因此,需要一定周期进行一次强制 flush 。当然,不能每次循环都去执行强制 flush,这样性能较差。
  • 第 24 至 33 行 :flushConsumeQueue(消费队列)。
    • flush 逻辑:MappedFile#落盘。
  • 第 34 至 40 行 :flushStoreCheckpoint。StoreCheckpoint的详细解析见:Store初始化与关闭。
  • 第 43 至 59 行 :每 1000ms 执行一次flush。如果 wakeup() 时,则会立即进行一次flush。目前,暂时不存在 wakeup() 的调用。
4、Broker 提供[拉取消息]接口

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/

5、Broker 提供[更新消费进度]接口
Yunai-MacdeMacBook-Pro-2:config yunai$ pwd
/Users/yunai/store/config
Yunai-MacdeMacBook-Pro-2:config yunai$ ls -ls
total 40
8 -rw-r--r--  1 yunai  staff    21  4 28 16:58 consumerOffset.json
8 -rw-r--r--  1 yunai  staff    21  4 28 16:58 consumerOffset.json.bak
8 -rw-r--r--  1 yunai  staff    21  4 28 16:58 delayOffset.json
8 -rw-r--r--  1 yunai  staff    21  4 28 16:58 delayOffset.json.bak
8 -rw-r--r--  1 yunai  staff  1401  4 27 21:51 topics.json
Yunai-MacdeMacBook-Pro-2:config yunai$ cat consumerOffset.json
{
    "offsetTable":{
        "%RETRY%please_rename_unique_group_name_4@please_rename_unique_group_name_4":{0:0
        },
        "TopicRead3@please_rename_unique_group_name_4":{1:5
        }
    }
}
复制
  • consumerOffset.json:消费进度存储文件。
  • consumerOffset.json.bak:消费进度存储文件备份。
  • 每次写入consumerOffset.json,将原内容备份到consumerOffset.json.bak。实现见:MixAll#string2File(...)。
BrokerController#initialize(...)
1:this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  2:    @Override
  3:    public void run() {
  4:        try {
  5:            BrokerController.this.consumerOffsetManager.persist();
  6:        } catch (Throwable e) {
  7:            log.error("schedule persist consumerOffset error.", e);
  8:        }
  9:    }
 10:}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
复制
  • 说明 :每 5s 执行一次持久化逻辑。

// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/

6、Broker 提供[发回消息]接口
关联标签: