A Look at carrera's RocketMQProduceOffsetFetcher
Editor: Admin | 278 views | 2022.07.29
Preface
This article takes a look at carrera's RocketMQProduceOffsetFetcher.
RocketMQProduceOffsetFetcher
DDMQ/carrera-monitor/src/main/java/com/xiaojukeji/carrera/monitor/lag/offset/RocketMQProduceOffsetFetcher.java
    public class RocketMQProduceOffsetFetcher {

        private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQProduceOffsetFetcher.class);

        private DefaultMQAdminExt defaultMQAdminExt;
        private DefaultMQPullConsumer defaultMQPullConsumer;
        private String namesrvAddr;

        public RocketMQProduceOffsetFetcher(String namesrvAddr) {
            this.defaultMQAdminExt = new DefaultMQAdminExt();
            defaultMQAdminExt.setNamesrvAddr(namesrvAddr);
            defaultMQAdminExt.setInstanceName("admin-" + Long.toString(System.currentTimeMillis()));

            this.defaultMQPullConsumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, null);
            defaultMQPullConsumer.setInstanceName("admin-" + Long.toString(System.currentTimeMillis()));
            defaultMQPullConsumer.setNamesrvAddr(namesrvAddr);
            this.namesrvAddr = namesrvAddr;
        }

        public String getNamesrvAddr() {
            return namesrvAddr;
        }

        public void start() throws MQClientException {
            defaultMQAdminExt.start();
            defaultMQPullConsumer.start();
            defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().setConnectBrokerByUser(true);
        }

        public void shutdown() {
            defaultMQAdminExt.shutdown();
            defaultMQPullConsumer.shutdown();
        }

        public ConsumeStats getConsumeStats(String group, String topic) throws Exception {
            return defaultMQAdminExt.examineConsumeStats(group, topic);
        }

        public TopicStatsTable getProduceStats(String topic)
                throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
            return defaultMQAdminExt.examineTopicStats(topic);
        }

        public PullResult queryMsgByOffset(MessageQueue mq, long offset) throws Exception {
            return defaultMQPullConsumer.pull(mq, "*", offset, 1);
        }
    }
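- The RocketMQProduceOffsetFetcher constructor takes a namesrvAddr and creates a DefaultMQAdminExt and a DefaultMQPullConsumer (in the MixAll.TOOLS_CONSUMER_GROUP group). Both are pointed at that name server and given an instance name of the form "admin-" plus the current timestamp in milliseconds.
- Its start method calls defaultMQAdminExt.start() and defaultMQPullConsumer.start(), then defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().setConnectBrokerByUser(true); its shutdown method calls defaultMQAdminExt.shutdown() and defaultMQPullConsumer.shutdown().
- Its getConsumeStats method delegates to defaultMQAdminExt.examineConsumeStats(group, topic), and its getProduceStats method delegates to defaultMQAdminExt.examineTopicStats(topic). Its queryMsgByOffset method calls defaultMQPullConsumer.pull(mq, "*", offset, 1), which pulls at most one message from the given queue starting at the given offset, with "*" matching every tag.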
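The class sits under monitor/lag/offset, which suggests these offsets feed a lag calculation, although that calculation is not part of the code quoted here. The following is only a minimal sketch of one way the two kinds of offsets could be combined, assuming lag per queue is the newest produced offset minus the committed consumer offset. LagSketch, its methods, and the string queue keys are hypothetical; plain maps stand in for the offset tables of TopicStatsTable and ConsumeStats so that the example runs without RocketMQ on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; not a carrera or RocketMQ class.
final class LagSketch {

    // Lag per queue = max produced offset minus committed consumer offset, floored at 0.
    static Map<String, Long> lagPerQueue(Map<String, Long> maxOffsets,
                                         Map<String, Long> consumerOffsets) {
        Map<String, Long> lag = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : maxOffsets.entrySet()) {
            // Assumption: a queue with no committed offset counts as consumed from 0.
            long consumed = consumerOffsets.getOrDefault(e.getKey(), 0L);
            lag.put(e.getKey(), Math.max(0L, e.getValue() - consumed));
        }
        return lag;
    }

    // Sum of the per-queue lags.
    static long totalLag(Map<String, Long> lag) {
        long sum = 0;
        for (long v : lag.values()) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        Map<String, Long> produced = new LinkedHashMap<>();
        produced.put("broker-a:0", 120L);
        produced.put("broker-a:1", 80L);

        Map<String, Long> consumed = new LinkedHashMap<>();
        consumed.put("broker-a:0", 100L);

        Map<String, Long> lag = lagPerQueue(produced, consumed);
        System.out.println(lag + " total=" + totalLag(lag));
    }
}
```

In a real monitor the two maps would presumably be filled from getProduceStats(topic) and getConsumeStats(group, topic), keyed by MessageQueue rather than by string.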
DDMQ/rocketmq/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
    public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
        private final DefaultMQAdminExtImpl defaultMQAdminExtImpl;
        private String adminExtGroup = "admin_ext_group";
        private String createTopicKey = MixAll.DEFAULT_TOPIC;
        private long timeoutMillis = 5000;

        //......

        @Override
        public ConsumeStats examineConsumeStats(String consumerGroup, String topic)
                throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
            return defaultMQAdminExtImpl.examineConsumeStats(consumerGroup, topic);
        }

        @Override
        public TopicStatsTable examineTopicStats(String topic)
                throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
            return defaultMQAdminExtImpl.examineTopicStats(topic);
        }

        //......
    }
- Both examineConsumeStats and examineTopicStats simply delegate to defaultMQAdminExtImpl.
DDMQ/rocketmq/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
    public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
        private final Logger log = ClientLogger.getLog();
        private final DefaultMQAdminExt defaultMQAdminExt;
        private ServiceState serviceState = ServiceState.CREATE_JUST;
        private MQClientInstance mqClientInstance;
        private RPCHook rpcHook;
        private long timeoutMillis = 20000;
        private Random random = new Random();

        //......

        @Override
        public ConsumeStats examineConsumeStats(String consumerGroup, String topic)
                throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
            String queryTopic = topic == null ? MixAll.getRetryTopic(consumerGroup) : topic;
            TopicRouteData topicRouteData = this.examineTopicRouteInfo(queryTopic);
            ConsumeStats result = new ConsumeStats();

            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                String addr = bd.selectBrokerAddr();
                if (addr != null) {
                    ConsumeStats consumeStats =
                        this.mqClientInstance.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, topic, timeoutMillis * 3);
                    result.getOffsetTable().putAll(consumeStats.getOffsetTable());
                    double value = result.getConsumeTps() + consumeStats.getConsumeTps();
                    result.setConsumeTps(value);
                }
            }

            if (result.getOffsetTable().isEmpty()) {
                throw new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE,
                    "Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message");
            }

            return result;
        }

        @Override
        public TopicStatsTable examineTopicStats(String topic)
                throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
            TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
            TopicStatsTable topicStatsTable = new TopicStatsTable();

            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                String addr = bd.selectBrokerAddr();
                if (addr != null) {
                    TopicStatsTable tst = this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(addr, topic, timeoutMillis);
                    topicStatsTable.getOffsetTable().putAll(tst.getOffsetTable());
                }
            }

            if (topicStatsTable.getOffsetTable().isEmpty()) {
                throw new MQClientException("Not found the topic stats info", null);
            }

            return topicStatsTable;
        }

        //......
    }
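- examineConsumeStats first calls examineTopicRouteInfo(queryTopic) to obtain the topicRouteData, where queryTopic falls back to the consumer group's retry topic (MixAll.getRetryTopic(consumerGroup)) when topic is null. It then iterates over topicRouteData.getBrokerDatas(), picks an address for each broker with selectBrokerAddr(), and calls mqClientInstance.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, topic, timeoutMillis * 3), merging each broker's offset table into the result and summing the consumeTps values. If the merged offset table is empty, it throws an MQClientException with ResponseCode.CONSUMER_NOT_ONLINE.
- examineTopicStats follows the same pattern: it obtains the topicRouteData through examineTopicRouteInfo(topic), walks topicRouteData.getBrokerDatas() to get each broker address, calls mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(addr, topic, timeoutMillis), and merges the returned offset tables into one topicStatsTable. An empty result raises an MQClientException.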
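The per-broker fan-out and merge described above can be sketched in isolation. This is a simplified illustration, not the RocketMQ implementation: BrokerFanOutSketch, Stats, BrokerClient, and fakeClient are hypothetical stand-ins for ConsumeStats and the MQClientAPIImpl call, and the string keys stand in for MessageQueue.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not a RocketMQ class.
final class BrokerFanOutSketch {

    // Stand-in for one broker's reply (offset table plus consume TPS).
    static final class Stats {
        final Map<String, Long> offsetTable = new HashMap<>();
        double consumeTps;
    }

    // Stand-in for the remote call made against a single broker address.
    interface BrokerClient {
        Stats fetch(String brokerAddr);
    }

    // Query every broker with a usable address and merge the replies.
    static Stats aggregate(List<String> brokerAddrs, BrokerClient client) {
        Stats result = new Stats();
        for (String addr : brokerAddrs) {
            if (addr == null) {
                continue; // mirrors the `if (addr != null)` guard
            }
            Stats part = client.fetch(addr);
            result.offsetTable.putAll(part.offsetTable); // queues are disjoint per broker
            result.consumeTps += part.consumeTps;        // TPS is additive across brokers
        }
        if (result.offsetTable.isEmpty()) {
            // The original throws MQClientException; a JDK exception keeps this sketch self-contained.
            throw new IllegalStateException("offset table is empty");
        }
        return result;
    }

    // Fake client that reports one queue per broker with fixed values.
    static BrokerClient fakeClient(long offset, double tps) {
        return addr -> {
            Stats s = new Stats();
            s.offsetTable.put(addr + "#queue0", offset);
            s.consumeTps = tps;
            return s;
        };
    }

    public static void main(String[] args) {
        List<String> addrs = Arrays.asList("10.0.0.1:10911", null, "10.0.0.2:10911");
        Stats merged = aggregate(addrs, fakeClient(42L, 1.5));
        System.out.println(merged.offsetTable.size() + " queues, tps=" + merged.consumeTps);
    }
}
```

Merging with putAll relies on each queue being reported by exactly one broker; if two replies shared a key, the later reply would overwrite the earlier one.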