MQ44# RocketMQ几个最近被问的问题
小编:管理员 312阅读 2022.08.03
前言
本周有点琐事,没有源码文章输出,掉一次链子。整理了几个最近被问的问题,大家随便看看。
问题一
问: RocketMQ消费者订阅了tag,但却收不到消息无法消费,并且根据 msgid 去查询,发现这条消息的状态为 CONSUMED_BUT_FILTERED,那这是为什么?
答: 在RocketMQ中,一个消费组能同时订阅多个 tag,但一个消费组的不同消费者不能分开订阅不同的tag,即同一个消费组的订阅关系必须保持一样。例如:常见错误使用方式同一个项目中,一段消费代码订阅tagA,然后拷贝到这段代码再更改为tagB。
正确用法
public void subscribe(){ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("melon_online_test_consumer"); consumer.subscribe("melon_online_test","tag1 || tag2 || tag3"); }复制
错误用法
public class SubscribeTest { public void subscribeA(){ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("melon_online_test_consumer"); consumer.subscribe("melon_online_test","tag1"); } public void subscribeB(){ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("melon_online_test_consumer"); consumer.subscribe("melon_online_test","tag2"); } }复制
问题二
问: 发现大量的RocketMQ client 大量的info日志输出,我不关心,如何禁用呢?
答: 尝试以下设置,项目中使用了Slf4j
@1 可以配置RocketmqClient的logger设置优先级为warn
@2 也可以通过-Drocketmq.client.logUseSlf4j=false 和 -Drocketmq.client.logLevel=WARN 关闭MQ客户端使用Slf4j并提高日志等级
项目中没有使用Slf4j,可以通过-Drocketmq.client.logLevel=WARN调高日志等级。
问题三
问: 我的服务消费后需要调用第三方接口,别人的接口调用有限制,Rocketmq消费可以限流吗?
答: RocketMQ本身没有类似每秒消费多少条数据的精确限流,我们可以结合Sentienl来实现,示例代码如下:
private String KEY = "melon_topic:melon_consumer"; // 资源名称由topic和消费组构成 public static void main(String[] args) throws InterruptedException, MQClientException { initFlowControlRule(); // Sentinel流控规则 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("melon_consumer"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("melon_topic", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List复制msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { Entry entry = null; try { ContextUtil.enter(KEY); // 定义资源 entry = SphU.entry(KEY, EntryType.OUT); System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg); } catch (BlockException ex) { // Blocked.被限流后消息重试 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } finally { if (entry != null) { entry.exit(); } ContextUtil.exit(); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } private static void initFlowControlRule() { FlowRule rule = new FlowRule(); rule.setResource(KEY); rule.setCount(5);// 每秒通过5条消息 rule.setGrade(RuleConstant.FLOW_GRADE_QPS); rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER); rule.setMaxQueueingTimeMs(5 * 1000); // 排队超时时间5秒 FlowRuleManager.loadRules(Collections.singletonList(rule)); }
问题四
问:RocketMQ默认延迟等级有18个,我可以扩增吗?
答: 可以的,但是不建议扩增太多等级,可以通过修改broker属性messageDelayLevel来实现,注意修改了后需要重启broker。例如:
messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d 3d 7d 14d 21d复制
相关推荐
- 【RocketMQ系列】RocketMQ集群,RocketMQ-on-DLedger集群 本文RocketMQ系列第四篇,主要介绍RocketMQ集群及如何部署自动容灾切换的 RocketMQ-on-DLedger Group。RocketMQ集群搭建ROcketMQ集群搭建有以下几种方案:「单Master模式」「多Master模式」「多Master多Slave模式-异步复制」「多Master多Slave模式-同步双写」其…
- 3DMAX提示和技巧 本主题标识使用 Civil View 的一些重要提示和技巧。常规使用屏幕分辨率至少为 1280x1024 的 Civil View。低于此分辨率时,一些面板将占用过多屏幕空间。 将视口设置为线框显示以达到最佳性能。 要尽可能简化用户界面,请在单个视口中工作并关闭 3ds Max 命令面…