批量消息
在高并发场景中,批量发送消息能显著提高传递消息发送时的性能(减少网络连接及IO的开销)。使用批量消息时的限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK(集群时会细讲),且不能是延时消息。
在发送批量消息时先构建一个消息对象集合,然后调用send(Collection msg)系列的方法即可。由于批量消息的4MB限制,所以一般情况下在集合中添加消息需要先计算当前集合中消息对象的大小是否超过限制,如果超过限制也可以使用分割消息的方式进行多次批量发送。
1 一般批量发送(不考虑消息分割)
因为批量消息是一个Collection,所以送入消息可以是List,也可以使Set,这里为方便起见,使用List进行批量组装发送。
生产者
package com.neu.rocketmq.example.batch;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.List;
/**
* Created with IntelliJ IDEA.
*
* @Author: yqq
* @Date: 2023/06/20/15:39
* @Description:
*/
public class BatchProducer {
public static void main(String[] args) throws Exception {
//实例化生产者
DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
//指定NameServer地址
producer.setNamesrvAddr("node1:9876");
//启动生产者
producer.start();
String topic = "BatchTopic";
List messageList = new ArrayList();
messageList.add(new Message(topic,"Tag","OrderID-001","Hello world 1".getBytes()));
messageList.add(new Message(topic,"Tag","OrderID-002","Hello world 2".getBytes()));
messageList.add(new Message(topic,"Tag","OrderID-003","Hello world 3".getBytes()));
try {
producer.send(messageList);
}catch (Exception e){
producer.shutdown();
e.printStackTrace();
}
//关闭生产者
producer.shutdown();
}
}
2 批量切分发送
如果消息的总长度可能大于4MB时,这时候最好把消息进行分割,案例中以1M大小进行消息分割。
我们需要发送10万元素的数组,这个量很大,怎么快速发送完。使用批量发送,同时每一批控制在1M左右确保不超过消息大小限制。
package com.neu.rocketmq.example.batch;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* Created with IntelliJ IDEA.
*
* @Author: yqq
* @Date: 2023/06/20/15:51
* @Description:
*/
public class SplitBatchProducer {
public static void main(String[] args) throws Exception {
//实例化生产者
DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
//指定NameServer地址
producer.setNamesrvAddr("node1:9876");
//启动生产者
producer.start();
String topic = "BatchTopic";
//使用List组装
List messages = new ArrayList(100 * 1000);
for (int i = 0; i messageList = splitter.next();
producer.send(messageList);
Thread.sleep(100);
}
}
}
class ListSplitter implements Iterator>{
private int sizeLimit = 1000 * 1000;//1M
private final List messages;
private int currIndex;
public ListSplitter(List messages){
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex next() {
int nextIndex = currIndex;
int totalSize = 0;
for (;nextIndex properties = message.getProperties();
for (Map.Entry entry : properties.entrySet()){
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = totalSize + 20; //增加日子开销的字节
if(tmpSize >sizeLimit){
//单个消息超过了最大的限制(1M),否则会阻塞进程
if (nextIndex - sizeLimit == 0){
//假如下一个子列表没有元素,则添加这个子列表然后推出循环
nextIndex++;
}
break;
}
if (tmpSize + totalSize > sizeLimit){
break;
}else {
totalSize += tmpSize;
}
}
List subList = this.messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
消费者代码
package com.neu.rocketmq.example.batch;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
/**
* Created with IntelliJ IDEA.
*
* @Author: yqq
* @Date: 2023/06/20/15:46
* @Description:
*/
public class BatchConsumer {
public static void main(String[] args) throws Exception {
//实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BatchConsumer");
//指定NameServer地址
consumer.setNamesrvAddr("node1:9876");
//订阅主题
consumer.subscribe("BatchTopic","*");
//负载均衡模式消费
consumer.setMessageModel(MessageModel.CLUSTERING);
//注册回调函数。处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Message: %s %n",Thread.currentThread().getName(),msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.fwqtg.net
相关推荐: 通用Propterties工具类,兼容读jar同级目录的文件。
1.前言。 properties文件的写方法比较不好用,折中写了个通用类。 2.例子。 1. import 2. import 3. import 4. import 5. import 6. import 7. import 8. import 9. …