消息队列中间件 RocketMQ 源码分析 —— Message 存储
小编:管理员 178阅读 2022.08.03
- 1、概述
- 2、CommitLog 结构
-
3、CommitLog 存储消息
- MappedFile#落盘
- FlushRealTimeService
- CommitRealTimeService
- GroupCommitService
- CommitLog#putMessage(...)
- MappedFileQueue#getLastMappedFile(...)
- MappedFile#appendMessage(...)
- DefaultAppendMessageCallback#doAppend(...)
- FlushCommitLogService
本文接《RocketMQ 源码分析 —— Message 发送与接收》。 主要解析CommitLog存储消息部分。
2、CommitLog 结构CommitLog、MappedFileQueue、MappedFile的关系如下:
CommitLog:MappedFileQueue:MappedFile= 1 : 1 : N。
反应到系统文件如下:
Yunai-MacdeMacBook-Pro-2:commitlog yunai$ pwd/Users/yunai/store/commitlog Yunai-MacdeMacBook-Pro-2:commitlog yunai$ ls -ltotal 10485760 -rw-r--r-- 1 yunai staff 1073741824 4 21 16:27 00000000000000000000 -rw-r--r-- 1 yunai staff 1073741824 4 21 16:29 00000000001073741824 -rw-r--r-- 1 yunai staff 1073741824 4 21 16:32 00000000002147483648 -rw-r--r-- 1 yunai staff 1073741824 4 21 16:33 00000000003221225472 -rw-r--r-- 1 yunai staff 1073741824 4 21 16:32 00000000004294967296复制
CommitLog、MappedFileQueue、MappedFile的定义如下:
- MappedFile:00000000000000000000、00000000001073741824、00000000002147483648等文件。
-
MappedFileQueue:MappedFile所在的文件夹,对MappedFile进行封装成文件队列,对上层提供可无限使用的文件容量。
- 每个MappedFile统一文件大小。
- 文件命名方式:fileName[n] = fileName[n - 1] + mappedFileSize。在CommitLog里默认为 1GB。
- CommitLog:针对MappedFileQueue的封装使用。
CommitLog目前存储在MappedFile有两种内容类型:
- MESSAGE :消息。
- BLANK :文件不足以存储消息时的空白占位。
CommitLog存储在MappedFile的结构:
MESSAGE[1] |
MESSAGE[2] |
... |
MESSAGE[n - 1] |
MESSAGE[n] |
BLANK |
---|
MESSAGE在CommitLog存储结构:
第几位 |
字段 |
说明 |
数据类型 |
字节数 |
---|---|---|---|---|
1 |
MsgLen |
消息总长度 |
Int |
4 |
2 |
MagicCode |
MESSAGE_MAGIC_CODE |
Int |
4 |
3 |
BodyCRC |
消息内容CRC |
Int |
4 |
4 |
QueueId |
消息队列编号 |
Int |
4 |
5 |
Flag |
flag |
Int |
4 |
6 |
QueueOffset |
消息队列位置 |
Long |
8 |
7 |
PhysicalOffset |
物理位置。在 CommitLog 的顺序存储位置。 |
Long |
8 |
8 |
SysFlag |
MessageSysFlag |
Int |
4 |
9 |
BornTimestamp |
生成消息时间戳 |
Long |
8 |
10 |
BornHost |
生效消息的地址+端口 |
Long |
8 |
11 |
StoreTimestamp |
存储消息时间戳 |
Long |
8 |
12 |
StoreHost |
存储消息的地址+端口 |
Long |
8 |
13 |
ReconsumeTimes |
重新消费消息次数 |
Int |
4 |
14 |
PreparedTransationOffset |
Long |
8 |
|
15 |
BodyLength + Body |
内容长度 + 内容 |
Int + Bytes |
4 + bodyLength |
16 |
TopicLength + Topic |
Topic长度 + Topic |
Byte + Bytes |
1 + topicLength |
17 |
PropertiesLength + Properties |
拓展字段长度 + 拓展字段 |
Short + Bytes |
2 + PropertiesLength |
BLANK在CommitLog存储结构:
第几位 |
字段 |
说明 |
数据类型 |
字节数 |
---|---|---|---|---|
1 |
maxBlank |
空白长度 |
Int |
4 |
2 |
MagicCode |
BLANK_MAGIC_CODE |
Int |
4 |
CommitLog#putMessage(...)
// 省略代码复制
- 说明 :存储消息,并返回存储结果。
- 第 2 行 :设置存储时间等。
- 第 16 至 36 行 :事务消息相关,暂未了解。
- 第 45 & 97 行 :获取锁与释放锁。
- 第 52 行 :再次设置存储时间。目前会有多处地方设置存储时间。
- 第 55 至 62 行 :获取MappedFile,若不存在或已满,则进行创建。详细解析见:MappedFileQueue#getLastMappedFile(...)。
- 第 65 行 :插入消息到MappedFile,解析解析见:MappedFile#appendMessage(...)。
- 第 69 至 80 行 :MappedFile已满,创建新的,再次插入消息。
- 第 116 至 140 行 :消息刷盘,即持久化到文件。上面插入消息实际未存储到硬盘。此处,根据不同的刷盘策略,执行会有不同。详细解析见:FlushCommitLogService。
- 第 143 至 173 行 :Broker主从同步。后面的文章会详细解析?。
// 省略代码复制
- 说明 :获取最后一个MappedFile,若不存在或文件已满,则进行创建。
- 第 5 至 11 行 :计算当文件不存在或已满时,新创建文件的createOffset。
- 第 14 行 :计算文件名。从此处我们可 以得知,MappedFile的文件命名规则:
> fileName[n] = fileName[n - 1] + n * mappedFileSize > fileName[0] = startOffset - (startOffset % this.mappedFileSize) 目前 `CommitLog` 的 `startOffset` 为 0。 此处有个**疑问**,为什么需要 `(startOffset % this.mappedFileSize)`。例如: | startOffset | mappedFileSize | createOffset | | --- | :-- | :-- | | 5 | 1 | 5 | | 5 | 2 | 4 | | 5 | 3 | 3 | | 5 | 4 | 4 | | 5 | > 5 | 0 | _如果有知道的同学,麻烦提示下。?_*解答:fileName[0] = startOffset - (startOffset % this.mappedFileSize) 计算出来的是,以 `this.mappedFileSize` 为每个文件大小时,`startOffset` 所在文件的开始`offset`*复制
- 第 30 至 35 行 :设置MappedFile是否是第一个创建的文件。该标识用于ConsumeQueue对应的MappedFile,详见ConsumeQueue#fillPreBlank。
// 省略代码复制
- 说明 :插入消息到MappedFile,并返回插入结果。
- 第 8 行 :获取需要写入的字节缓冲区。为什么会有writeBuffer != null的判断后,使用不同的字节缓冲区,见:FlushCommitLogService。
- 第 9 至 11 行 :设置写入position,执行写入,更新wrotePosition(当前写入位置,下次开始写入开始位置)。
// 省略代码复制
- 说明 :插入消息到字节缓冲区。
- 第 45 行 :计算物理位置。在CommitLog的顺序存储位置。
- 第 47 至 49 行 :计算CommitLog里的offsetMsgId。这里一定要和msgId区分开。
计算方式 |
长度 |
|||
---|---|---|---|---|
offsetMsgId |
Broker存储时生成 |
Hex(storeHostBytes, wroteOffset) |
32 |
|
msgId |
Client发送消息时生成 |
Hex(进程编号, IP, ClassLoader, startTime, currentTime, 自增序列) |
32 |
《RocketMQ 源码分析 —— Message 基础》 |
- 第 51 至 61 行 :获取队列位置(offset)。
- 第 78 至 95 行 :计算消息总长度。
- 第 98 至 112 行 :当文件剩余空间不足时,写入BLANK占位,返回结果。
- 第 114 至 161 行 :写入MESSAGE。
- 第 173 行 :更新队列位置(offset)。
线程服务 |
场景 |
插入消息性能 |
---|---|---|
CommitRealTimeService |
异步刷盘 && 开启内存字节缓冲区 |
第一 |
FlushRealTimeService |
异步刷盘 && 关闭内存字节缓冲区 |
第二 |
GroupCommitService |
同步刷盘 |
第三 |
方式 |
|||
---|---|---|---|
方式一 |
写入内存字节缓冲区(writeBuffer) |
从内存字节缓冲区(write buffer)提交(commit)到文件通道(fileChannel) |
文件通道(fileChannel)flush |
方式二 |
写入映射文件字节缓冲区(mappedByteBuffer) |
映射文件字节缓冲区(mappedByteBuffer)flush |
flush相关代码
考虑到写入性能,满足flushLeastPages * OS_PAGE_SIZE才进行flush。
// 省略代码复制
commit相关代码:
考虑到写入性能,满足commitLeastPages * OS_PAGE_SIZE才进行commit。
// 省略代码复制FlushRealTimeService
消息插入成功时,异步刷盘时使用。
// 省略代码复制
- 说明:实时flush线程服务,调用MappedFile#flush相关逻辑。
- 第 23 至 29 行 :每flushPhysicQueueThoroughInterval周期,执行一次flush。因为不是每次循环到都能满足flushCommitLogLeastPages大小,因此,需要一定周期进行一次强制flush。当然,不能每次循环都去执行强制flush,这样性能较差。
- 第 33 行 至 37 行 :根据flushCommitLogTimed参数,可以选择每次循环是固定周期还是等待唤醒。默认配置是后者,所以,每次插入消息完成,会去调用commitLogService.wakeup()。
- 第 45 行 :调用MappedFile进行flush。
- 第 61 至 65 行 :Broker关闭时,强制flush,避免有未刷盘的数据。
消息插入成功时,异步刷盘时使用。 和FlushRealTimeService类似,性能更好。
// 省略代码复制GroupCommitService
消息插入成功时,同步刷盘时使用。
// 省略代码复制
- 说明:批量写入线程服务。
- 第 16 至 25 行 :添加写入请求。方法设置了sync的原因:this.requestsWrite会和this.requestsRead不断交换,无法保证稳定的同步。
- 第 27 至 34 行 :读写队列交换。
-
第 38 至 60 行 :循环写入队列,进行flush。
- 第 43 行 :考虑到有可能每次循环的消息写入的消息,可能分布在两个 MappedFile(写第N个消息时,MappedFile已满,创建了一个新的),所以需要有循环2次。
- 第 51 行 :唤醒等待写入请求线程,通过CountDownLatch实现
- 第 61 至 66 行 :直接刷盘。此处是由于发送的消息的isWaitStoreMsgOK未设置成TRUE,导致未走批量提交。
- 第 73 至 80 行 :每 10ms 执行一次批量提交。当然,如果wakeup()时,则会立即进行一次批量提交。当Broker设置成同步落盘 && 消息isWaitStoreMsgOK=true,消息需要略大于 10ms 才能发送成功。当然,性能相对异步落盘较差,可靠性更高,需要我们在实际使用时去取舍。
相关推荐
- 【RocketMQ系列】RocketMQ集群,RocketMQ-on-DLedger集群 本文RocketMQ系列第四篇,主要介绍RocketMQ集群及如何部署自动容灾切换的 RocketMQ-on-DLedger Group。RocketMQ集群搭建ROcketMQ集群搭建有以下几种方案:「单Master模式」「多Master模式」「多Master多Slave模式-异步复制」「多Master多Slave模式-同步双写」其…
- 3DMAX提示和技巧 本主题标识使用 Civil View 的一些重要提示和技巧。常规使用屏幕分辨率至少为 1280x1024 的 Civil View。低于此分辨率时,一些面板将占用过多屏幕空间。 将视口设置为线框显示以达到最佳性能。 要尽可能简化用户界面,请在单个视口中工作并关闭 3ds Max 命令面…