RocketMq实战第九篇-不同场景解决方案
小编:管理员 158阅读 2022.08.02
RocketMQ完结篇,主要使用消息队列处理一些业务场景。
正文顺序消息顺序消息分为全局顺序消息和部分顺序消息。
全局顺序消息指某个 Topic 下的 所有消息都要保证顺序;
部分顺序消息只要保证每一组消息被顺序消费即可,如订单消息的例子,只要保证 同一个订单 ID 的三个消息能按顺序消费 即可。
全局顺序消息
要保 证全局顺序消息, 需要 先把 Topic 的读写队列数设置为 一,然后 Producer 和 Consumer 的并发设置也要是一 。 简单来说,为了保证整个 Topic 的 全局消息有序,只能消除所有的并发处理,各部分都设置成单线程处理 。 这时 高并发、高吞吐量的功能完全用不上了 。
部分顺序消息
要保证部分消息有序,需要发送端和消费端配合处理 。 在发送端,要做到 把同一业务 ID 的消息发送到同一个 Message Queue ;在消费过程中,要做到从 同一个 Message Queue 读取的消息不被并发处理,这样才能达到部分有序 。
发送端使用 MessageQueueSelector类来控制 把消息发往哪个 Message Queue,如代码:
for (int i= 0; i <100; i++) { int orderid = i ; Message msg =new Message ("OrderTopic8”, tags,”KEY" + i , (”Hello RocketMQ " +orderid+” ” + i) .getBytes (RemotingHelper . DEFAULT_CHARSET)) ; SendResult sendResult = Producer.send(msg, new MessageQueueSelector() @Override public MessageQueue select (List复制消息重复问题mqs , Message msg ,Object arg) { System.out.println(”queue selector mq nums:”+mqs.size()); System.out.println (”msg info : ”+msg . toString()) ; for (MessageQueue mq: mqs) { System.out.println (mq.toString()) ; } Integer id = (Integer) arg ; int index = id % mqs.size(); return mqs.get(index) ; } }, orderid ); System .out.println(sendResult) }
RocketMQ 选 择了确保一定投递,保证消息不丢失,但有可能造成消息重复 。setRetryTimesWhenSendFailed, 设置在同步方式下自动重试的次数,默认值是 2,这样当第一次发送消息时, Broker端接收到了消息但 是没有正 确返回发送成功的状态,就造成了消息 重复。
消息重复一般情况下不会发生,但是如果消息量大,网络有波动,消息重 复就是个大概率事件。
解决消息重复有两种方法:
- 第一种方法是保证消费逻辑的幕等性(多次调 用和一次调用效果相同);
- 另一种方法是维护一个巳消费消息的记录,消费前查 询这个消息是否被消费过 。 这两种方法都需要使用者自己实现 。
RocketMQ 是个先人先出的队列,不支持消息级别或者 Topic 级别的优先级 。可以通过间接的方式解决,下面列举三种优先级相 关需求的具体处理方法 。
第一种
多个不同的消息类型使用同一个topic时,由于某一个种消息流量非常大,导致其他类型的消息无法及时消费,造成不公平,所以把流量大的类型消息在一个单独的 Topic,其他类型消息在另外一个 Topic,应用程序创建两个 Consumer,分别订阅不同的 Topic,这样就可以了。
第二种
情况和第一种情况类似,但是不用创建大量的 Topic。举个实际应用场景: 一个订单处理系统,接收从 100家快递门店过来的请求,把这些请求 通过 Producer 写人 RocketMQ ;订单处理程序通过 Consumer 从队列里读取消 息并处理,每天最多处理 1 万单 。 如果这 100 个快递门店中某几个门店订单量 大增,比如门店一接了个大客户,一个上午就发出 2万单消息请求,这样其他 的 99 家门店可能被迫等待门店一的 2 万单处理完,也就是两天后订单才能被处 理,显然很不公平 。
这时可以创建 一 个 Topic, 设置 Topic 的 MessageQueue 数 量 超过 100 个,Producer根据订单的门店号,把每个门店的订单写人 一 个 MessageQueue。 DefaultMQPushConsumer默认是采用循环的方式逐个读取一个 Topic 的 所有 MessageQueue,这样如果某家门店订单 量 大增,这家门店对应的 MessageQueue 消息数增多,等待时间增长,但不会造成其他家门店 等 待时间 增长。
DefaultMQPushConsumer 默认的 pullBatchSize 是 32,也就是每次从某个 MessageQueue 读取消息的时候,最多可以读 32 个 。 在上面的场景中,为了更 加公平,可以把 pullBatchSize 设置成 1。
第三种
强制优先级
TypeA、 TypeB、 TypeC 三类消息 。 TypeA 处于第 一优先级,要确保只要有 TypeA消息,必须优先处理; TypeB处于第二优先 级; TypeC 处于第 三 优先级 。 对这种要求,或者逻辑更复杂的要求,就要用 户自己编码实现优先级控制,如果上述的 三 类消息在一个 Topic 里,可以使 用 PullConsumer,自主控制 MessageQueue 的遍历,以及消息的读取;如果 上述三类消息在三个 Topic下,需要启动三个 Consumer, 实现逻辑控制三个 Consumer 的消费 。
提高 Consumer 处理能力(1)提高消费并行度
在同一个 ConsumerGroup 下( Clustering 方式),可以通过增加 Consumer 实例的数量来提高并行度,通过加机器,或者在 已有机器中启动 多个 Consumer 进程都可以增加 Consumer实例数。注意总的 Consumer数量不要超过 Topic下 Read Queue 数量,超过的 Consumer 实例接收不到消息。
(2)以批量方式进行消费
多条消息同时处理的时间会大大小于逐个处理的时间总和,比如消费消息中涉及 update 某个数据库, 一次 update10条的时间会大大小于十次 update1条数据的时 间 。 这时可以通过批量方式消费来提高消费的吞吐 量 。
实现方法是设置 Consumer 的 consumeMessageBatchMaxSize 这个参数 ,默 认是 1,如果设置为 N,在消息多的时候每次收到的是个长度为 N的消息链表。
(3)检测延时情况,跳过非重要消息
public ConsumeConcurrentlyStatus consumeMessage (List复制msgs, ConsumeConcurrentlyContext context) { long Offset = msgs.get (0).getQueueOffset() ; String maxOffset = msgs.get(0).getProperty(Message PROPERTY MAX OFFSET); long d iff = Long.parseLong(maxOffset) - Offset; if (diff > 90000) { return ConsumeConcurrentlyStatus .CONSUME SUCCESS; //正常消费消息 return ConsumeConcurrentlyStatus . CONSUME SUCCESS; }
当某个队列的消息数堆积到 90000条以上,就直接丢弃,以 便快速追上发送消息的进度 。
相关推荐
- 【RocketMQ系列】RocketMQ集群,RocketMQ-on-DLedger集群 本文RocketMQ系列第四篇,主要介绍RocketMQ集群及如何部署自动容灾切换的 RocketMQ-on-DLedger Group。RocketMQ集群搭建ROcketMQ集群搭建有以下几种方案:「单Master模式」「多Master模式」「多Master多Slave模式-异步复制」「多Master多Slave模式-同步双写」其…
- 3DMAX提示和技巧 本主题标识使用 Civil View 的一些重要提示和技巧。常规使用屏幕分辨率至少为 1280x1024 的 Civil View。低于此分辨率时,一些面板将占用过多屏幕空间。 将视口设置为线框显示以达到最佳性能。 要尽可能简化用户界面,请在单个视口中工作并关闭 3ds Max 命令面…