RocketMq实战-不同类型生产者(DefaultMQProducer)

小编:管理员 234阅读 2022.08.01

前言 本文来介绍RocketMQ生产者发送消息默认使用的DefaultMQProducer类。

生产者 向消息队列里写入消息,不 同的业务场景需要生产者采用不同的写入策略 。 比如同步发送、异步发送、 延迟发送、 发送事务消息等。

正文 我们结合代码来了解一下,

代码位置在package org.apache.rocketmq.example.simple;

public class AsyncProducer {
    public static void main(
        String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {

        DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);

        for (int i = 0; i < 10000000; i++) {
            try {
                final int index = i;
                Message msg = new Message("Jodie_topic_1023",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                    }

                    @Override
                    public void onException(Throwable e) {
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}
复制

代码讲解

发送消息要经过几个步骤 :

(1 )设置 Producer 的 GroupName。

(2 )设置 lnstanceName,当一个 Jvm 需要启动多个 Producer 的时候,通过设置不同的 InstanceName来区分,不设置的话系统使用默认名称“DEFAULT”。(本例没有写)

( 3 )设置发送失败重试次数,当网络出现异常的时候,这个次数影响消息的重复投递次数。想保证不丢消息,可以设置多重试几次 。

(4 )设置 NameServer 地址 。 (5 )组装消息并发送 。

消息的发送有同步和异步两种方式,上面的代码使用的是异步方式 。消息发送的返回状态有如下四种 : FLUSH_DISK_TIMEOUT 、 FLUSH_SLAVE_TIMEOUT 、SLAVE_NOT_AVAILABLE 、SEND_OK,不同状态在不同的刷盘策略和同步策略的配置下含义是不同的 。

FLUSH_DISK_TIMEOUT : 表示没有在规定时间内完成刷盘(需要 Broker 的刷盘策被设置成 SYNC_FLUSH 才会报这个错误) 。 FLUSH_SLAVE_TIMEOUT :表示在主备方式下,并且 Broker被设 置 成 SYNC_MASTER 方式,没有在设定时间内完成 主从同步 。 SLAVE_NOT_AVAILABLE : 这个状态 产生的场景和 FLUSH_SLAVE_TIMEOUT 类似, 表示在主备 方式下,并且 Broker被设置成 SYNC_MASTER,但是没有找到被配置成 Slave 的 Broker。 SEN_ OK :表示发送成功,发送成功的具体含义,比如消息是否已经 被存储到融盘?消息是否被同步到了 Slave上?消息在 Slave上是否被 写人磁盘?需要结合所配置的刷盘策略、主从策略来定 。 这个状态还可 以简单理解为,没有发生上面列出的 三个问题状态就是 SEND OK。

发送延迟消息

Broker收到这类消息后 ,延迟一段时间再处理, 使消息在规定的一段时间后生效。

使用方法:在创建 Message对象时,调用 setDelayTimeLevel ( int level) 方法设置延迟时间, 然后再把这个消息发送 出去。 目前延迟的时间不支 持任意设置,仅支持预设值的时间长度 ( 1s/5s/1Os/30s/Im/2m/3m/4m/5m/6m/ 7m/8m/9m/1Om/20m/30m/1h/2h)。 比如 setDelayTimeLevel(3)表示延迟 10s。

自定义消息发送规则

一个 Topic会有多个 Message Queue,如果使用 Producer的默认配置,这 个 Producer 会轮流向各个 Message Queue 发 送 消息 。 Consumer 在消费消息的 时候,会根据负载均衡策略,消费被分配到的 Message Queue,如果不经过特 定的设置,某条消息被发往哪个 Message Queue,被哪个 Consumer 消费是未 知的。

如果业务 需 要我们把消息 发 送到指定的 Message Queue 里,比如把同 一 类型 的消息都发 往 相同的 Message Queue,可以用 Message­ QueueSelector。

发送消息的时候,把 MessageQueueSelector 的对象作为参数,使用 public SendResult send ( Message msg, MessageQueueSelector selector, Object arg)函 数发送消 息即可 。 在 MessageQueueSelector 的实现中,根据传人的 Object参 数,或者根据 Message 消息内容确定把消息发往那个 Message Queue,返回被 选中的 Message Queue。

关联标签: