分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(上)
小编:管理员 160阅读 2022.08.01
本文主要基于 RocketMQ 4.0.x 正式版
- 1、概述
- 2、ConsumeQueue 结构
-
3、ConsumeQueue 存储
- DefaultMessageStore#doDispatch(...)
- ConsumeQueue#putMessagePositionInfoWrapper(...)
- ReputMessageService
- FlushConsumeQueueService
-
4、Broker 提供[拉取消息]接口
- PullMessageRequestHeader
- PullMessageProcessor#processRequest(...)
- MessageStore#getMessage(...)
- DefaultMessageFilter#isMessageMatched(...)
- PullRequestHoldService
- PullMessageProcessor#executeRequestWhenWakeup(...)
-
5、Broker 提供[更新消费进度]接口
- MixAll#string2File(...)
- BrokerController#initialize(...)
- ConfigManager
- ConsumerOffsetManager
-
6、Broker 提供[发回消息]接口
- SendMessageProcessor#consumerSendMsgBack(...)
- 7、结尾
1、概述
本章主要解析 消费 逻辑涉及到的源码。 因为篇幅较长,分成上下两篇:
- 上篇:Broker相关源码。
- 下篇:Consumer相关源码。
本文即是上篇。
ok,先看第一张关于消费逻辑的图:
再看消费逻辑精简的顺序图(实际情况会略有差别):
2、ConsumeQueue 结构
ConsumeQueue、MappedFileQueue、MappedFile的关系如下:
ConsumeQueue:MappedFileQueue:MappedFile= 1 : 1 : N。
反应到系统文件如下:
Yunai-MacdeMacBook-Pro-2:consumequeue yunai$ pwd /Users/yunai/store/consumequeue Yunai-MacdeMacBook-Pro-2:consumequeue yunai$ cd TopicRead3/ Yunai-MacdeMacBook-Pro-2:TopicRead3 yunai$ ls -ls total 0 0 drwxr-xr-x 3 yunai staff 102 4 27 21:52 0 0 drwxr-xr-x 3 yunai staff 102 4 27 21:55 1 0 drwxr-xr-x 3 yunai staff 102 4 27 21:55 2 0 drwxr-xr-x 3 yunai staff 102 4 27 21:55 3 Yunai-MacdeMacBook-Pro-2:TopicRead3 yunai$ cd 0/ Yunai-MacdeMacBook-Pro-2:0 yunai$ ls -ls total 11720 11720 -rw-r--r-- 1 yunai staff 6000000 4 27 21:55 00000000000000000000复制
ConsumeQueue、MappedFileQueue、MappedFile的定义如下:
- MappedFile:00000000000000000000等文件。
-
MappedFileQueue:MappedFile所在的文件夹,对MappedFile进行封装成文件队列,对上层提供可无限使用的文件容量。
- 每个MappedFile统一文件大小。
- 文件命名方式:fileName[n] = fileName[n - 1] + mappedFileSize。在ConsumeQueue里默认为 6000000B。
-
ConsumeQueue:针对MappedFileQueue的封装使用。
-
Store:ConsumeQueue=ConcurrentHashMap
>。
-
Store:ConsumeQueue=ConcurrentHashMap
ConsumeQueue存储在MappedFile的内容必须大小是 20B(ConsumeQueue.CQ_STORE_UNIT_SIZE),有两种内容类型:
- MESSAGE_POSITION_INFO:消息位置信息。
- BLANK: 文件前置空白占位。当历史Message被删除时,需要用BLANK占位被删除的消息。
MESSAGE_POSITION_INFO在ConsumeQueue存储结构:
第几位 |
字段 |
说明 |
数据类型 |
字节数 |
---|---|---|---|---|
1 |
offset |
消息 CommitLog 存储位置 |
Long |
8 |
2 |
size |
消息长度 |
Int |
4 |
3 |
tagsCode |
消息tagsCode |
Long |
8 |
BLANK在ConsumeQueue存储结构:
第几位 |
字段 |
说明 |
数据类型 |
字节数 |
---|---|---|---|---|
1 |
0 |
Long |
8 |
|
2 |
Integer.MAX_VALUE |
Int |
4 |
|
3 |
0 |
Long |
8 |
主要有两个组件:
- ReputMessageService:write ConsumeQueue。
- FlushConsumeQueueService:flush ConsumeQueue。
1: class ReputMessageService extends ServiceThread { 2: 3: /** 4: * 开始重放消息的CommitLog物理位置 5: */ 6: private volatile long reputFromOffset = 0; 7: 8: public long getReputFromOffset() { 9: return reputFromOffset; 10: } 11: 12: public void setReputFromOffset(long reputFromOffset) { 13: this.reputFromOffset = reputFromOffset; 14: } 15: 16: @Override 17: public void shutdown() { 18: for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) { 19: try { 20: Thread.sleep(100); 21: } catch (InterruptedException ignored) { 22: } 23: } 24: 25: if (this.isCommitLogAvailable()) { 26: log.warn("shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}", 27: DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset); 28: } 29: 30: super.shutdown(); 31: } 32: 33: /** 34: * 剩余需要重放消息字节数 35: * 36: * @return 字节数 37: */ 38: public long behind() { 39: return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset; 40: } 41: 42: /** 43: * 是否commitLog需要重放消息 44: * 45: * @return 是否 46: */ 47: private boolean isCommitLogAvailable() { 48: return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset(); 49: } 50: 51: private void doReput() { 52: for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { 53: 54: // TODO 疑问:这个是啥 55: if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() // 56: && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) { 57: break; 58: } 59: 60: // 获取从reputFromOffset开始的commitLog对应的MappeFile对应的MappedByteBuffer 61: SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset); 62: if (result != null) { 63: try { 64: this.reputFromOffset = result.getStartOffset(); 65: 66: // 遍历MappedByteBuffer 67: for (int readSize = 0; readSize < result.getSize() && doNext; ) { 68: // 生成重放消息重放调度请求 69: DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); 70: int size = dispatchRequest.getMsgSize(); // 消息长度 71: // 根据请求的结果处理 72: if (dispatchRequest.isSuccess()) { // 读取成功 73: if (size > 0) { // 读取Message 74: DefaultMessageStore.this.doDispatch(dispatchRequest); 75: // 通知有新消息 76: if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() 77: && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) { 78: DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(), 79: dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, 80: dispatchRequest.getTagsCode()); 81: } 82: // FIXED BUG By shijia 83: this.reputFromOffset += size; 84: readSize += size; 85: // 统计 86: if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) { 87: DefaultMessageStore.this.storeStatsService 88: .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet(); 89: DefaultMessageStore.this.storeStatsService 90: .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()) 91: .addAndGet(dispatchRequest.getMsgSize()); 92: } 93: } else if (size == 0) { // 读取到MappedFile文件尾 94: this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset); 95: readSize = result.getSize(); 96: } 97: } else if (!dispatchRequest.isSuccess()) { // 读取失败 98: if (size > 0) { // 读取到Message却不是Message 99: log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset); 100: this.reputFromOffset += size; 101: } else { // 读取到Blank却不是Blank 102: doNext = false; 103: if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) { 104: log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}", 105: this.reputFromOffset); 106: 107: this.reputFromOffset += result.getSize() - readSize; 108: } 109: } 110: } 111: } 112: } finally { 113: result.release(); 114: } 115: } else { 116: doNext = false; 117: } 118: } 119: } 120: 121: @Override 122: public void run() { 123: DefaultMessageStore.log.info(this.getServiceName() + " service started"); 124: 125: while (!this.isStopped()) { 126: try { 127: Thread.sleep(1); 128: this.doReput(); 129: } catch (Exception e) { 130: DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); 131: } 132: } 133: 134: DefaultMessageStore.log.info(this.getServiceName() + " service end"); 135: } 136: 137: @Override 138: public String getServiceName() { 139: return ReputMessageService.class.getSimpleName(); 140: } 141: 142: }复制
-
说明:重放消息线程服务。
- 该服务不断生成 消息位置信息 到 消费队列(ConsumeQueue)
- 该服务不断生成 消息索引 到 索引文件(IndexFile)
- 第 75 至 81 行 :当Broker是主节点 &&Broker开启的是长轮询,通知消费队列有新的消息。NotifyMessageArrivingListener会 调用PullRequestHoldService#notifyMessageArriving(...)方法,详细解析见:PullRequestHoldService
- 第 61 行 :获取reputFromOffset开始的CommitLog对应的MappedFile对应的MappedByteBuffer。
- 第 67 行 :遍历MappedByteBuffer。
- 第 69 行 :生成重放消息重放调度请求 (DispatchRequest) 。请求里主要包含一条消息 (Message) 或者 文件尾 (BLANK) 的基本信息。
- 第 72 至 96 行 :请求是有效请求,进行逻辑处理。
- 第 73 至 92 行 :请求对应的是Message,进行调度,生成ConsumeQueue和IndexFile对应的内容。详细解析见:
- 第 93 至 96 行 :请求对应的是Blank,即文件尾,跳转指向下一个MappedFile。
- 第 97 至 110 行 :请求是无效请求。出现该情况,基本是一个BUG。
- 第 127 至 128 行 :每 1ms 循环执行重放逻辑。
- 第 18 至 30 行 :shutdown时,多次sleep(100)直到CommitLog回放到最新位置。恩,如果未回放完,会输出警告日志。
1: /** 2: * 执行调度请求 3: * 1. 非事务消息 或 事务提交消息 建立 消息位置信息 到 ConsumeQueue 4: * 2. 建立 索引信息 到 IndexFile 5: * 6: * @param req 调度请求 7: */ 8: public void doDispatch(DispatchRequest req) { 9: // 非事务消息 或 事务提交消息 建立 消息位置信息 到 ConsumeQueue 10: final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag()); 11: switch (tranType) { 12: case MessageSysFlag.TRANSACTION_NOT_TYPE: 13: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: 14: DefaultMessageStore.this.putMessagePositionInfo(req.getTopic(), req.getQueueId(), req.getCommitLogOffset(), req.getMsgSize(), 15: req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset()); 16: break; 17: case MessageSysFlag.TRANSACTION_PREPARED_TYPE: 18: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: 19: break; 20: } 21: // 建立 索引信息 到 IndexFile 22: if (DefaultMessageStore.this.getMessageStoreConfig().isMessageIndexEnable()) { 23: DefaultMessageStore.this.indexService.buildIndex(req); 24: } 25: } 26: 27: /** 28: * 建立 消息位置信息 到 ConsumeQueue 29: * 30: * @param topic 主题 31: * @param queueId 队列编号 32: * @param offset commitLog存储位置 33: * @param size 消息长度 34: * @param tagsCode 消息tagsCode 35: * @param storeTimestamp 存储时间 36: * @param logicOffset 队列位置 37: */ 38: public void putMessagePositionInfo(String topic, int queueId, long offset, int size, long tagsCode, long storeTimestamp, 39: long logicOffset) { 40: ConsumeQueue cq = this.findConsumeQueue(topic, queueId); 41: cq.putMessagePositionInfoWrapper(offset, size, tagsCode, storeTimestamp, logicOffset); 42: }复制ConsumeQueue#putMessagePositionInfoWrapper(...)
1: /** 2: * 添加位置信息封装 3: * 4: * @param offset commitLog存储位置 5: * @param size 消息长度 6: * @param tagsCode 消息tagsCode 7: * @param storeTimestamp 消息存储时间 8: * @param logicOffset 队列位置 9: */ 10: public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp, 11: long logicOffset) { 12: final int maxRetries = 30; 13: boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable(); 14: // 多次循环写,直到成功 15: for (int i = 0; i < maxRetries && canWrite; i++) { 16: // 调用添加位置信息 17: boolean result = this.putMessagePositionInfo(offset, size, tagsCode, logicOffset); 18: if (result) { 19: // 添加成功,使用消息存储时间 作为 存储check point。 20: this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(storeTimestamp); 21: return; 22: } else { 23: // XXX: warn and notify me 24: log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + offset 25: + " failed, retry " + i + " times"); 26: 27: try { 28: Thread.sleep(1000); 29: } catch (InterruptedException e) { 30: log.warn("", e); 31: } 32: } 33: } 34: 35: // XXX: warn and notify me 设置异常不可写入 36: log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId); 37: this.defaultMessageStore.getRunningFlags().makeLogicsQueueError(); 38: } 39: 40: /** 41: * 添加位置信息,并返回添加是否成功 42: * 43: * @param offset commitLog存储位置 44: * @param size 消息长度 45: * @param tagsCode 消息tagsCode 46: * @param cqOffset 队列位置 47: * @return 是否成功 48: */ 49: private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode, 50: final long cqOffset) { 51: // 如果已经重放过,直接返回成功 52: if (offset <= this.maxPhysicOffset) { 53: return true; 54: } 55: // 写入位置信息到byteBuffer 56: this.byteBufferIndex.flip(); 57: this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE); 58: this.byteBufferIndex.putLong(offset); 59: this.byteBufferIndex.putInt(size); 60: this.byteBufferIndex.putLong(tagsCode); 61: // 计算consumeQueue存储位置,并获得对应的MappedFile 62: final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE; 63: MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset); 64: if (mappedFile != null) { 65: // 当是ConsumeQueue第一个MappedFile && 队列位置非第一个 && MappedFile未写入内容,则填充前置空白占位 66: if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) { // TODO 疑问:为啥这个操作。目前能够想象到的是,一些老的消息很久没发送,突然发送,这个时候刚好满足。 67: this.minLogicOffset = expectLogicOffset; 68: this.mappedFileQueue.setFlushedWhere(expectLogicOffset); 69: this.mappedFileQueue.setCommittedWhere(expectLogicOffset); 70: this.fillPreBlank(mappedFile, expectLogicOffset); 71: log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " " 72: + mappedFile.getWrotePosition()); 73: } 74: // 校验consumeQueue存储位置是否合法。TODO 如果不合法,继续写入会不会有问题? 75: if (cqOffset != 0) { 76: long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset(); 77: if (expectLogicOffset != currentLogicOffset) { 78: LOG_ERROR.warn( 79: "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", 80: expectLogicOffset, 81: currentLogicOffset, 82: this.topic, 83: this.queueId, 84: expectLogicOffset - currentLogicOffset 85: ); 86: } 87: } 88: // 设置commitLog重放消息到ConsumeQueue位置。 89: this.maxPhysicOffset = offset; 90: // 插入mappedFile 91: return mappedFile.appendMessage(this.byteBufferIndex.array()); 92: } 93: return false; 94: } 95: 96: /** 97: * 填充前置空白占位 98: * 99: * @param mappedFile MappedFile 100: * @param untilWhere consumeQueue存储位置 101: */ 102: private void fillPreBlank(final MappedFile mappedFile, final long untilWhere) { 103: // 写入前置空白占位到byteBuffer 104: ByteBuffer byteBuffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE); 105: byteBuffer.putLong(0L); 106: byteBuffer.putInt(Integer.MAX_VALUE); 107: byteBuffer.putLong(0L); 108: // 循环填空 109: int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize()); 110: for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) { 111: mappedFile.appendMessage(byteBuffer.array()); 112: } 113: }复制
-
#putMessagePositionInfoWrapper(...)说明 :添加位置信息到ConsumeQueue的封装,实际需要调用#putMessagePositionInfo(...)方法。
- 第 13 行 :判断ConsumeQueue是否允许写入。当发生Bug时,不允许写入。
- 第 17 行 :调用#putMessagePositionInfo(...)方法,添加位置信息。
- 第 18 至 21 行 :添加成功,使用消息存储时间 作为 存储检查点。StoreCheckpoint的详细解析见:Store初始化与关闭。
- 第 22 至 32 行 :添加失败,目前基本可以认为是BUG。
- 第 35 至 37 行 :写入失败时,标记ConsumeQueue写入异常,不允许继续写入。
-
#putMessagePositionInfo(...)说明 :添加位置信息到ConsumeQueue,并返回添加是否成功。
- 这块比较有疑问,如果计算出来的存储位置不合法,不返回添加失败,继续进行添加位置信息,会不会有问题???
- 第 51 至 54 行 :如果offset(存储位置) 小于等于maxPhysicOffset(CommitLog消息重放到ConsumeQueue最大的CommitLog存储位置),表示已经重放过,此时,不再重复写入,直接返回写入成功。
- 第 55 至 60 行 :写 位置信息到byteBuffer。
- 第 62 至 63 行 :计算ConsumeQueue存储位置,并获得对应的MappedFile。
- 第 65 至 73 行 :当MappedFile是ConsumeQueue当前第一个文件 &&MappedFile未写入内容 && 重放消息队列位置大于0,则需要进行MappedFile填充前置BLANK。
- 这块比较有疑问,什么场景下会需要。猜测产生的原因:一个Topic长期无消息产生,突然N天后进行发送,Topic对应的历史消息以及和消费队列数据已经被清理,新生成的MappedFile需要前置占位。
- 第 74 至 87 行 :校验ConsumeQueue存储位置是否合法,不合法则输出日志。
- 第 89 行 :设置CommitLog重放消息到ConsumeQueue的最大位置。
- 第 91 行 :插入消息位置到MappedFile。
1: class FlushConsumeQueueService extends ServiceThread { 2: private static final int RETRY_TIMES_OVER = 3; 3: /** 4: * 最后flush时间戳 5: */ 6: private long lastFlushTimestamp = 0; 7: 8: private void doFlush(int retryTimes) { 9: int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages(); 10: 11: // retryTimes == RETRY_TIMES_OVER时,进行强制flush。主要用于shutdown时。 12: if (retryTimes == RETRY_TIMES_OVER) { 13: flushConsumeQueueLeastPages = 0; 14: } 15: // 当时间满足flushConsumeQueueThoroughInterval时,即使写入的数量不足flushConsumeQueueLeastPages,也进行flush 16: long logicsMsgTimestamp = 0; 17: int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval(); 18: long currentTimeMillis = System.currentTimeMillis(); 19: if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) { 20: this.lastFlushTimestamp = currentTimeMillis; 21: flushConsumeQueueLeastPages = 0; 22: logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp(); 23: } 24: // flush消费队列 25: ConcurrentHashMap复制> tables = DefaultMessageStore.this.consumeQueueTable; 26: for (ConcurrentHashMap maps : tables.values()) { 27: for (ConsumeQueue cq : maps.values()) { 28: boolean result = false; 29: for (int i = 0; i < retryTimes && !result; i++) { 30: result = cq.flush(flushConsumeQueueLeastPages); 31: } 32: } 33: } 34: // flush 存储 check point 35: if (0 == flushConsumeQueueLeastPages) { 36: if (logicsMsgTimestamp > 0) { 37: DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp); 38: } 39: DefaultMessageStore.this.getStoreCheckpoint().flush(); 40: } 41: } 42: 43: public void run() { 44: DefaultMessageStore.log.info(this.getServiceName() + " service started"); 45: 46: while (!this.isStopped()) { 47: try { 48: int interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue(); 49: this.waitForRunning(interval); 50: this.doFlush(1); 51: } catch (Exception e) { 52: DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); 53: } 54: } 55: 56: this.doFlush(RETRY_TIMES_OVER); 57: 58: DefaultMessageStore.log.info(this.getServiceName() + " service end"); 59: } 60: 61: @Override 62: public String getServiceName() { 63: return FlushConsumeQueueService.class.getSimpleName(); 64: } 65: 66: @Override 67: public long getJointime() { 68: return 1000 * 60; 69: } 70: }
- 说明 :flushConsumeQueue(消费队列) 线程服务。
- 第 11 至 14 行 :当retryTimes==RETRY_TIMES_OVER时,进行强制flush。用于shutdown时。
- 第 15 至 23 行 :每 flushConsumeQueueThoroughInterval 周期,执行一次 flush 。因为不是每次循环到都能满足 flushConsumeQueueLeastPages 大小,因此,需要一定周期进行一次强制 flush 。当然,不能每次循环都去执行强制 flush,这样性能较差。
-
第 24 至 33 行 :flushConsumeQueue(消费队列)。
- flush 逻辑:MappedFile#落盘。
- 第 34 至 40 行 :flushStoreCheckpoint。StoreCheckpoint的详细解析见:Store初始化与关闭。
- 第 43 至 59 行 :每 1000ms 执行一次flush。如果 wakeup() 时,则会立即进行一次flush。目前,暂时不存在 wakeup() 的调用。
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/
5、Broker 提供[更新消费进度]接口Yunai-MacdeMacBook-Pro-2:config yunai$ pwd /Users/yunai/store/config Yunai-MacdeMacBook-Pro-2:config yunai$ ls -ls total 40 8 -rw-r--r-- 1 yunai staff 21 4 28 16:58 consumerOffset.json 8 -rw-r--r-- 1 yunai staff 21 4 28 16:58 consumerOffset.json.bak 8 -rw-r--r-- 1 yunai staff 21 4 28 16:58 delayOffset.json 8 -rw-r--r-- 1 yunai staff 21 4 28 16:58 delayOffset.json.bak 8 -rw-r--r-- 1 yunai staff 1401 4 27 21:51 topics.json Yunai-MacdeMacBook-Pro-2:config yunai$ cat consumerOffset.json { "offsetTable":{ "%RETRY%please_rename_unique_group_name_4@please_rename_unique_group_name_4":{0:0 }, "TopicRead3@please_rename_unique_group_name_4":{1:5 } } }复制
- consumerOffset.json:消费进度存储文件。
- consumerOffset.json.bak:消费进度存储文件备份。
- 每次写入consumerOffset.json,将原内容备份到consumerOffset.json.bak。实现见:MixAll#string2File(...)。
1:this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { 2: @Override 3: public void run() { 4: try { 5: BrokerController.this.consumerOffsetManager.persist(); 6: } catch (Throwable e) { 7: log.error("schedule persist consumerOffset error.", e); 8: } 9: } 10:}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);复制
- 说明 :每 5s 执行一次持久化逻辑。
// ... 微信长度限制,请访问 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/
6、Broker 提供[发回消息]接口相关推荐
- 【RocketMQ系列】RocketMQ集群,RocketMQ-on-DLedger集群 本文RocketMQ系列第四篇,主要介绍RocketMQ集群及如何部署自动容灾切换的 RocketMQ-on-DLedger Group。RocketMQ集群搭建ROcketMQ集群搭建有以下几种方案:「单Master模式」「多Master模式」「多Master多Slave模式-异步复制」「多Master多Slave模式-同步双写」其…
- 3DMAX提示和技巧 本主题标识使用 Civil View 的一些重要提示和技巧。常规使用屏幕分辨率至少为 1280x1024 的 Civil View。低于此分辨率时,一些面板将占用过多屏幕空间。 将视口设置为线框显示以达到最佳性能。 要尽可能简化用户界面,请在单个视口中工作并关闭 3ds Max 命令面…