RocketMQ学习Broker流程、生产者和存储流程联系

小编:管理员 157阅读 2022.08.03

Broker作为代理,路由注册是通过Broker与nameServer的心跳功能实现的。除此之外,还联系了生产者和消费者、存储。因此可以知道Broker是非常重要的。在生产者发送消息时会将发送的消息结果sendResult放入到SendMessageContext,而此时发送消息处理器会根据上下文放入的发送结果,进行消息的存储工作putMessage操作,由于我看的源码是4.7.0的,因此可以看到在处理数据包上,消息发送处理器中sendMessage的操作已经没有使用,而使用的异步发送消息处理的方式。Broker的启动流程如下:

而此时我们可以看到生产者借助BrokerController的处理器Processor进行了处理将数据包信息放入到了存储DefaultMessageStore中的asyncPutMessage中。

可以看到发送消息的核心在sendKernelImpl中,而在发送消息和存储之间的联系方法MQClientApi#sendMessageAsync,在这个方法中,发送消息的同时,会将发送的结果放入到SendMessageContext中。这样发送消息处理器就可以进行存储操作了,此时会调用DefaultMessageStore#asyncSendMessage。

放入消息之后,进行操作体现在asyncSendMessage中。将消息以异步方式存储到存储器中,处理器可以处理下一个请求,而不是在结果完成后等待结果,以异步方式通知客户端。此时可以看到asyncPutMessage的操作中会进入到CommitLog中,此时进行提交日志操作,此时会执行写入到ByteBuffer中,然后刷盘到硬盘中。同时执行统计操作,进行HA同步。

关联标签: