rocketmq使用DefaultMQPushConsumer创建消费者客户端
小编:管理员 288阅读 2022.08.03
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("organization"); consumer.setNamesrvAddr("172.22.0.64:9876"); // NAME_SERVER地址 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 从哪个位置开始消费消息 consumer.subscribe("my-topic", "*"); // 订阅主题 consumer.registerMessageListener(new MessageListenerOrderly() { // 注册消息监听(顺序方式) @Override public ConsumeOrderlyStatus consumeMessage(List复制主要参数msgs, ConsumeOrderlyContext context) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start();
- ConsumeFromWhere 控制新的消费者组从哪个位置开始消费
枚举值 |
效果 |
---|---|
CONSUME_FROM_LAST_OFFSET |
从最新的消息开始消费 |
CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST |
废弃,效果同上 |
CONSUME_FROM_MIN_OFFSET |
废弃,效果同上 |
CONSUME_FROM_MAX_OFFSET |
废弃 ,效果同上 |
CONSUME_FROM_FIRST_OFFSET |
从最早的消息开始消费 |
CONSUME_FROM_TIMESTAMP |
从指定时间开始消费 |
注:如果使用CONSUME_FROM_TIMESTAMP ,需设置参数 DefaultMQPushConsumer.setConsumeTimestamp(“20131223171201”) 时间戳字符串格式为yyyyMMddHHmmss
- DefaultMQPushConsumer.subscribe(String topic, String subExpression) subExpression参数为tag选择表达式 语法:
- 不过滤tag:"*" 或者null
- 根据多个tag过滤:“tag1 || tag2 || tag3”
- DefaultMQPushConsumer.registerMessageListener注册消费监听器
- 顺序消费监听器MessageListenerOrderly 返回ConsumeOrderlyStatus
枚举值 |
效果 |
---|---|
SUCCESS |
成功 |
ROLLBACK |
废弃 |
COMMIT |
废弃 |
SUSPEND_CURRENT_QUEUE_A_MOMENT |
暂停当前队列一会 |
- 并行消费监听器MessageListenerConcurrently 返回ConsumeConcurrentlyStatus
枚举值 |
效果 |
---|---|
CONSUME_SUCCESS |
成功 |
RECONSUME_LATER |
失败,稍后重试 |
- org.apache.rocketmq.client.impl.consumer.RebalancePushImpl
switch (consumeFromWhere) { case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST: case CONSUME_FROM_MIN_OFFSET: case CONSUME_FROM_MAX_OFFSET: case CONSUME_FROM_LAST_OFFSET: { long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); if (lastOffset >= 0) { result = lastOffset; } // First start,no offset else if (-1 == lastOffset) { if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { result = 0L; } else { try { result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { result = -1; } } } else { result = -1; } break; } case CONSUME_FROM_FIRST_OFFSET: { long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); if (lastOffset >= 0) { result = lastOffset; } else if (-1 == lastOffset) { result = 0L; } else { result = -1; } break; } case CONSUME_FROM_TIMESTAMP: { long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); if (lastOffset >= 0) { result = lastOffset; } else if (-1 == lastOffset) { if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { try { result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { result = -1; } } else { try { long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(), UtilAll.YYYYMMDDHHMMSS).getTime(); result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); } catch (MQClientException e) { result = -1; } } } else { result = -1; } break; } default: break; }复制
相关推荐
- 【RocketMQ系列】RocketMQ集群,RocketMQ-on-DLedger集群 本文RocketMQ系列第四篇,主要介绍RocketMQ集群及如何部署自动容灾切换的 RocketMQ-on-DLedger Group。RocketMQ集群搭建ROcketMQ集群搭建有以下几种方案:「单Master模式」「多Master模式」「多Master多Slave模式-异步复制」「多Master多Slave模式-同步双写」其…
- 3DMAX提示和技巧 本主题标识使用 Civil View 的一些重要提示和技巧。常规使用屏幕分辨率至少为 1280x1024 的 Civil View。低于此分辨率时,一些面板将占用过多屏幕空间。 将视口设置为线框显示以达到最佳性能。 要尽可能简化用户界面,请在单个视口中工作并关闭 3ds Max 命令面…