Creating a consumer client in RocketMQ with DefaultMQPushConsumer

Editor: Admin | 288 views | 2022.08.03

Example
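import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class PushConsumerExample {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("organization"); // consumer group name
        consumer.setNamesrvAddr("172.22.0.64:9876"); // NameServer address
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // where a new group starts consuming
        consumer.subscribe("my-topic", "*"); // subscribe to the topic, all tags
        consumer.registerMessageListener(new MessageListenerOrderly() { // register the listener (orderly mode)
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody(), StandardCharsets.UTF_8));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
    }
}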
Main parameters
  • ConsumeFromWhere controls the position from which a new consumer group starts consuming

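| Enum value | Effect |
| --- | --- |
| CONSUME_FROM_LAST_OFFSET | Start consuming from the latest message |
| CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST | Deprecated; same effect as CONSUME_FROM_LAST_OFFSET |
| CONSUME_FROM_MIN_OFFSET | Deprecated; same effect as CONSUME_FROM_LAST_OFFSET |
| CONSUME_FROM_MAX_OFFSET | Deprecated; same effect as CONSUME_FROM_LAST_OFFSET |
| CONSUME_FROM_FIRST_OFFSET | Start consuming from the earliest message |
| CONSUME_FROM_TIMESTAMP | Start consuming from a specified point in time |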

Note: when using CONSUME_FROM_TIMESTAMP, you also need to call DefaultMQPushConsumer.setConsumeTimestamp("20131223171201"). The timestamp string uses the format yyyyMMddHHmmss.
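As a small sketch of the note above, the following formats a point in time into the yyyyMMddHHmmss string that setConsumeTimestamp expects. The class and method names (ConsumeTimestampSketch, toConsumeTimestamp) are hypothetical helpers, not part of RocketMQ, and the consumer calls appear only as comments so that the snippet runs without the client library. The string carries no time zone; the sketch assumes it is interpreted in the client JVM's default zone, which is worth verifying for your RocketMQ version.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class ConsumeTimestampSketch {
    // Pattern stated in the note above: yyyyMMddHHmmss
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    /** Hypothetical helper: formats a local date-time as a consume timestamp string. */
    static String toConsumeTimestamp(LocalDateTime time) {
        return time.format(FORMAT);
    }

    public static void main(String[] args) {
        String timestamp = toConsumeTimestamp(LocalDateTime.of(2013, 12, 23, 17, 12, 1));
        System.out.println(timestamp); // prints 20131223171201

        // With a DefaultMQPushConsumer named "consumer" (as in the example above):
        // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
        // consumer.setConsumeTimestamp(timestamp);
    }
}
```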

  • DefaultMQPushConsumer.subscribe(String topic, String subExpression): the subExpression parameter is a tag selection expression. Syntax:
  1. No tag filtering: "*" or null
  2. Filter on several tags: "tag1 || tag2 || tag3"
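To make the expression syntax concrete, here is a minimal sketch that builds a subExpression from a list of tags and models what the expression means for a given message tag. It illustrates the syntax only and is not RocketMQ's actual filtering code; TagExpressionSketch and its methods are hypothetical names.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class TagExpressionSketch {
    /** Joins tags into an expression such as "tag1 || tag2"; no tags means "*" (no filtering). */
    static String toSubExpression(List<String> tags) {
        if (tags == null || tags.isEmpty()) {
            return "*";
        }
        return String.join(" || ", tags);
    }

    /** Splits an expression into its tags; null or "*" yields an empty set, read here as "match all". */
    static Set<String> parseTags(String subExpression) {
        Set<String> tags = new LinkedHashSet<>();
        if (subExpression == null || subExpression.trim().equals("*")) {
            return tags;
        }
        for (String tag : subExpression.split("\\|\\|")) {
            String trimmed = tag.trim();
            if (!trimmed.isEmpty()) {
                tags.add(trimmed);
            }
        }
        return tags;
    }

    /** Returns true if a message carrying messageTag would be selected by the expression. */
    static boolean matches(String subExpression, String messageTag) {
        Set<String> tags = parseTags(subExpression);
        return tags.isEmpty() || tags.contains(messageTag);
    }

    public static void main(String[] args) {
        String expr = toSubExpression(Arrays.asList("tag1", "tag2", "tag3"));
        System.out.println(expr);                  // prints tag1 || tag2 || tag3
        System.out.println(matches(expr, "tag2")); // prints true
        System.out.println(matches(expr, "tag9")); // prints false
        System.out.println(matches("*", "tag9"));  // prints true

        // With a DefaultMQPushConsumer named "consumer":
        // consumer.subscribe("my-topic", expr);
    }
}
```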
  • DefaultMQPushConsumer.registerMessageListener registers the message listener
  1. Orderly listener MessageListenerOrderly, which returns ConsumeOrderlyStatus

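| Enum value | Effect |
| --- | --- |
| SUCCESS | Consumed successfully |
| ROLLBACK | Deprecated |
| COMMIT | Deprecated |
| SUSPEND_CURRENT_QUEUE_A_MOMENT | Pause the current queue for a moment |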

  2. Concurrent listener MessageListenerConcurrently, which returns ConsumeConcurrentlyStatus

| Enum value | Effect |
| --- | --- |
| CONSUME_SUCCESS | Consumed successfully |
| RECONSUME_LATER | Failed; retry later |
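The return values listed above are how a listener tells the client whether a batch succeeded. One common pattern, sketched below, is to catch processing failures inside the listener and map them to the "retry" status rather than letting the exception escape. So that the sketch runs without the client library, it uses local stand-in enums (an assumption) that mirror only the non-deprecated values of ConsumeOrderlyStatus and ConsumeConcurrentlyStatus, and it takes message bodies as byte arrays instead of MessageExt objects. ListenerStatusSketch and its methods are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class ListenerStatusSketch {
    // Local stand-ins (assumption) for the RocketMQ status enums, non-deprecated values only.
    enum OrderlyStatus { SUCCESS, SUSPEND_CURRENT_QUEUE_A_MOMENT }
    enum ConcurrentlyStatus { CONSUME_SUCCESS, RECONSUME_LATER }

    /** Orderly mode: on failure, ask for the current queue to be paused for a moment. */
    static OrderlyStatus consumeOrderly(List<byte[]> bodies, Consumer<String> handler) {
        try {
            for (byte[] body : bodies) {
                handler.accept(new String(body, StandardCharsets.UTF_8));
            }
            return OrderlyStatus.SUCCESS;
        } catch (RuntimeException e) {
            return OrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
    }

    /** Concurrent mode: on failure, ask for the batch to be redelivered later. */
    static ConcurrentlyStatus consumeConcurrently(List<byte[]> bodies, Consumer<String> handler) {
        try {
            for (byte[] body : bodies) {
                handler.accept(new String(body, StandardCharsets.UTF_8));
            }
            return ConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (RuntimeException e) {
            return ConcurrentlyStatus.RECONSUME_LATER;
        }
    }

    public static void main(String[] args) {
        // Hypothetical business logic: an empty body counts as a failure.
        Consumer<String> handler = text -> {
            if (text.isEmpty()) {
                throw new IllegalArgumentException("empty message body");
            }
        };
        List<byte[]> good = Arrays.asList("a".getBytes(StandardCharsets.UTF_8), "b".getBytes(StandardCharsets.UTF_8));
        List<byte[]> bad = Arrays.asList("a".getBytes(StandardCharsets.UTF_8), new byte[0]);

        System.out.println(consumeOrderly(good, handler));      // prints SUCCESS
        System.out.println(consumeOrderly(bad, handler));       // prints SUSPEND_CURRENT_QUEUE_A_MOMENT
        System.out.println(consumeConcurrently(good, handler)); // prints CONSUME_SUCCESS
        System.out.println(consumeConcurrently(bad, handler));  // prints RECONSUME_LATER
    }
}
```

In a real listener, the same try/catch would sit inside consumeMessage and return the corresponding RocketMQ enum value.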

Source code
  • org.apache.rocketmq.client.impl.consumer.RebalancePushImpl
switch (consumeFromWhere) {
    case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
    case CONSUME_FROM_MIN_OFFSET:
    case CONSUME_FROM_MAX_OFFSET:
    case CONSUME_FROM_LAST_OFFSET: {
        long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
        if (lastOffset >= 0) {
            result = lastOffset;
        }
        // First start,no offset
        else if (-1 == lastOffset) {
            if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                result = 0L;
            } else {
                try {
                    result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                } catch (MQClientException e) {
                    result = -1;
                }
            }
        } else {
            result = -1;
        }
        break;
    }
    case CONSUME_FROM_FIRST_OFFSET: {
        long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
        if (lastOffset >= 0) {
            result = lastOffset;
        } else if (-1 == lastOffset) {
            result = 0L;
        } else {
            result = -1;
        }
        break;
    }
    case CONSUME_FROM_TIMESTAMP: {
        long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
        if (lastOffset >= 0) {
            result = lastOffset;
        } else if (-1 == lastOffset) {
            if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                try {
                    result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                } catch (MQClientException e) {
                    result = -1;
                }
            } else {
                try {
                    long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
                        UtilAll.YYYYMMDDHHMMSS).getTime();
                    result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
                } catch (MQClientException e) {
                    result = -1;
                }
            }
        } else {
            result = -1;
        }
        break;
    }

    default:
        break;
}
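The branching in the excerpt above can be summarized as a pure function, which makes the rules easier to read and to test. This is a simplified model under stated assumptions, not the library's code: the Mode enum is a local stand-in, the broker lookups (maxOffset, searchOffset) are passed in as plain numbers, and the MQClientException paths that yield -1 are left out. InitialOffsetSketch and initialOffset are hypothetical names.

```java
public class InitialOffsetSketch {
    // Local stand-in (assumption) for the three behaviours of ConsumeFromWhere;
    // the deprecated values fall into LAST_OFFSET, as in the excerpt above.
    enum Mode { LAST_OFFSET, FIRST_OFFSET, TIMESTAMP }

    /**
     * @param storedOffset      offset read from the offset store; -1 means none is stored yet
     * @param retryTopic        true if the topic name starts with the retry-group prefix
     * @param maxOffset         current max offset of the queue (stands in for maxOffset(mq))
     * @param offsetAtTimestamp offset found for the consume timestamp (stands in for searchOffset)
     */
    static long initialOffset(Mode mode, long storedOffset, boolean retryTopic,
                              long maxOffset, long offsetAtTimestamp) {
        if (storedOffset >= 0) {
            return storedOffset; // a stored offset always wins, whatever the mode
        }
        if (storedOffset != -1) {
            return -1; // any other negative value is treated as an error
        }
        // First start of this group on this queue: the mode decides.
        switch (mode) {
            case LAST_OFFSET:
                return retryTopic ? 0L : maxOffset;
            case FIRST_OFFSET:
                return 0L;
            case TIMESTAMP:
                return retryTopic ? maxOffset : offsetAtTimestamp;
            default:
                return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(initialOffset(Mode.LAST_OFFSET, 42, false, 100, 50));  // prints 42
        System.out.println(initialOffset(Mode.LAST_OFFSET, -1, false, 100, 50));  // prints 100
        System.out.println(initialOffset(Mode.LAST_OFFSET, -1, true, 100, 50));   // prints 0
        System.out.println(initialOffset(Mode.FIRST_OFFSET, -1, false, 100, 50)); // prints 0
        System.out.println(initialOffset(Mode.TIMESTAMP, -1, false, 100, 50));    // prints 50
        System.out.println(initialOffset(Mode.TIMESTAMP, -1, true, 100, 50));     // prints 100
    }
}
```

The first line of main shows why ConsumeFromWhere only matters for a new consumer group: once an offset is stored, every mode returns it unchanged.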