顺序消息
1 全局有序
全局有序比较简单,主要控制在于创建Topic指定只有一个队列,同步确保生产者与消费者都只有一个实例进行即可。
2 分区有序
在电商业务场景中,一个订单的流程是:创建、付款、推送、完成。在加入RocketMQ后,一个订单会分别产生对于这个订单的创建、付款、推送、完成等消息,如果我们把所有消息全部送入到RocketMQ中的一个主题中,这里该如何实现针对一个订单的消息顺序性呢!如下图:
要完成分区有序性,在生产者环节使用自定义的消息队列选择策略,确保订单号尾数相同的消息会被先后发送到同一个队列中(案例中主题有3个队列,生产环境中可设定成10个满足全部尾数的需求),然后再消费端开启负载均衡模式,最终确保一个消费者拿到的消息对于一个订单来说是有序的。
代码实现生产者
package com.neu.rocketmq.example.ordermessage;
import lombok.Data;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.ArrayList;
import java.util.List;
/**
* Created with IntelliJ IDEA.
*
* @Author: yqq
* @Date: 2023/06/19/21:43
* @Description:
*/
public class ProducerInOrder {
@Data
private static class Order{
private long orderId;
private String desc;
}
private List buildOrders(){
List orderList = new ArrayList();
Order order = new Order();
//订单1
order.setOrderId(001);
order.setDesc("创建");
orderList.add(order);
order = new Order();
order.setOrderId(001);
order.setDesc("付款");
orderList.add(order);
order = new Order();
order.setOrderId(001);
order.setDesc("推送");
orderList.add(order);
order = new Order();
order.setOrderId(001);
order.setDesc("完成");
orderList.add(order);
//订单2
order = new Order();
order.setOrderId(002);
order.setDesc("创建");
orderList.add(order);
order = new Order();
order.setOrderId(002);
order.setDesc("付款");
orderList.add(order);
order = new Order();
order.setOrderId(002);
order.setDesc("推送");
orderList.add(order);
order = new Order();
order.setOrderId(002);
order.setDesc("完成");
orderList.add(order);
//订单3
order = new Order();
order.setOrderId(003);
order.setDesc("创建");
orderList.add(order);
order = new Order();
order.setOrderId(003);
order.setDesc("付款");
orderList.add(order);
order = new Order();
order.setOrderId(003);
order.setDesc("推送");
orderList.add(order);
order = new Order();
order.setOrderId(003);
order.setDesc("完成");
orderList.add(order);
return orderList;
}
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("OrderProducer00");
producer.setNamesrvAddr("node1:9876");
producer.start();
//订单列表
List orderList = new ProducerInOrder().buildOrders();
for (int i = 0; i mqs, Message msg, Object arg) {
Long id = (Long) arg;//根据订单ID选择发送queue
long index = id % mqs.size();
return mqs.get((int) index);
}
},orderList.get(i).getOrderId());//订单ID
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body
));
}
producer.shutdown();
}
}
代码实现消费者
package com.neu.rocketmq.example.ordermessage;
import jdk.nashorn.internal.ir.CallNode;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* Created with IntelliJ IDEA.
*
* @Author: yqq
* @Date: 2023/06/19/21:44
* @Description:
*/
public class ConsumerInOrder {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderConsumer2");
consumer.setNamesrvAddr("node1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe("PartOrder010","*");
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs){
//每个queue有唯一的consume线程来消费,订单对每个queue分区有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + ", " +
"queueId" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}
try {
//模拟处理业务逻辑
TimeUnit.MILLISECONDS.sleep(300);
} catch (Exception e) {
e.printStackTrace();
//先等一会儿,再处理这批消息,而不是放到重试队列里
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer started.%s");
}
}
注意事项
使用顺序消息:首先要保证消息是有序进入MQ的,消息放入MQ之前,对id等关键字进行取模,放入指定messageQueue,同时consume消费消息失败时,不能返回reconsume——later,这样会导致乱序,所以应该返回suspend_current_queue_a_moment,意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里。
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.fwqtg.net
相关推荐: 云知声上市再添一把火,山海大模型将运用到多行业多领域
随着AIGC产业的飞速发展,如今已进入一个崭新的阶段。在这个环境下,大模型产品的实际应用和场景商业化变得越发重要,对于推动产业发展具有举足轻重的作用。云知声,作为国内领先的全栈AI技术企业,正紧跟时代步伐,致力于将山海大模型(UniGPT)融入各种产业场景…