大家好,最近在巩固JUC并发包,突然想到如果自己的应用体量不大,但有需要消息队列来实现应用解耦和削峰来缓解服务器突增压力,比如抢票时,突然有比较用户同时抢票,就容易造成服务器同时连接数较多,拒绝其他用户的使用,就想着可以用消息队列来缓解,但是体量有不大,还没必要用MQ框架,那就直接自己写一个,这样,抢票请求来了就直接丢给队列处理器,然后再延迟查询处理结果,这样能减轻不少压力,老样子,先看下实现效果
:
然后看下测试代码:
public class TestOptional {
@Test
public void doTestOptional(){
MxMQMessage> mxMQ = MxMQ.getInstance();
/**
* 添加分区 无消息一直阻塞
*/
mxMQ.addPartion("test", new MQHandlerMessage>() {
@Override
public void hand(Message message) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(message.getMessage());
}
});
/**
* 添加分区 无消息且等待时长超过20秒自动移除该分区
*/
mxMQ.addPartionAutoRemove("test2", new MQHandlerMessage>() {
@Override
public void hand(Message message) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(message.getMessage());
}
});
for(int index = 0;index 20;index++){
int finalIndex = index;
Message message = new Message("test_" + finalIndex);
Message message2 = new Message("test2_" + finalIndex);
try {
mxMQ.sendMessage("test",message);
mxMQ.sendMessage("test2",message2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
while (true){}
}
}
还可以自定义不同分区不同的处理器,逻辑自由定义,下面看下几个关键类:
MxMQRunnable:
package com.mx.mxmq;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class MxMQRunnableT> implements Runnable{
boolean isRun = false;
ArrayBlockingQueueT> arrayBlockingQueue = null;
MQHandlerT> mqHandler = null;
int state = 0;
MxMQ.QueueEmpty queueEmpty = null;
public void setQueueEmpty(MxMQ.QueueEmpty queueEmpty) {
this.queueEmpty = queueEmpty;
}
服务器托管网 public MxMQRunnable(MQHandlerT> mqHandler){
isRun = true;
arrayBlockingQueue = new ArrayBlockingQueue(50);
this.mqHandler = mqHandler;
state = MxMQ.STATE_WAIT;
}
public MxMQRunnable(int number,MQHandlerT> mqHandler){
arrayBlockingQueue = new ArrayBlockingQueue(number);
this.mqHandler = mqHandler;
state = MxMQ.STATE_WAIT;
}
public void setState(int state) {
this.state = state;
}
@Override
public void run() {
while (isRun){
try {
T t = null;
if(state == MxMQ.STATE_WAIT){
t = arrayBlockingQueue.take();
} else {
t = arrayBlockingQueue.poll(20,TimeUnit.SECONDS);
if(t == null){
close();
queueEmpty.empty(this);
break;
}
}
if(mqHandler != null){
mqHan服务器托管网dler.hand(t);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public boolean sendMessage(T t) throws InterruptedException {
return arrayBlockingQueue.offer(t,20, TimeUnit.SECONDS);
}
public boolean removeMessage(T t){
return arrayBlockingQueue.remove(t);
}
public void close(){
isRun = false;
}
}
MxMQ:
package com.mx.mxmq;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MxMQT> {
public static final int STATE_WAIT = 0;
public static final int STATE_REMOVE = 1;
private MxMQ(){
executors = Executors.newCachedThreadPool();
partionRunMap = new ConcurrentHashMap>();
}
public static MxMQ getInstance() {
if(instance == null){
synchronized (MxMQ.class){
if(instance == null){
instance = new MxMQ();
}
}
}
return instance;
}
private static volatile MxMQ instance = null;
private ConcurrentHashMapString,MxMQRunnableT>> partionRunMap = null;
private ExecutorService executors = null;
/**
* 添加分区
* @param partion 分区
* @param mxHandler 处理器
* @return
*/
public boolean addPartion(String partion,MQHandlerT> mxHandler){
if(partionRunMap.get(partion) == null){
MxMQRunnableT> curMxMQRunnable = new MxMQRunnableT>(mxHandler);
partionRunMap.put(partion,curMxMQRunnable);
executors.execute(curMxMQRunnable);
System.out.println(partion+"被添加");
return true;
}
return false;
}
/**
* 当分区里面没有任务超过20秒后就会自动移除分区
* @param partion 分区
* @param mxHandler 处理器
* @return
*/
public boolean addPartionAutoRemove(String partion,MQHandlerT> mxHandler){
if(partionRunMap.get(partion) == null){
MxMQRunnableT> curMxMQRunnable = new MxMQRunnableT>(mxHandler);
curMxMQRunnable.setState(STATE_REMOVE);
curMxMQRunnable.setQueueEmpty(new QueueEmpty() {
@Override
public void empty(MxMQRunnable mxMQRunnable) {
removePartion(partion);
}
});
partionRunMap.put(partion,curMxMQRunnable);
executors.execute(curMxMQRunnable);
System.out.println(partion+"被添加");
return true;
}
return false;
}
public boolean removePartion(String partion){
if(partionRunMap.get(partion) != null){
MxMQRunnableT> remove = partionRunMap.remove(partion);
remove.close();
System.out.println(partion+"被移除");
return true;
}
return false;
}
public boolean sendMessage(String partion,T t) throws InterruptedException {
MxMQRunnableT> tMxMQRunnable = partionRunMap.get(partion);
if(tMxMQRunnable != null){
tMxMQRunnable.sendMessage(t);
return true;
}
return false;
}
public boolean removeMessage(String partion,T t){
MxMQRunnableT> tMxMQRunnable = partionRunMap.get(partion);
if(tMxMQRunnable != null){
return tMxMQRunnable.removeMessage(t);
}
return false;
}
interface QueueEmpty{
void empty(MxMQRunnable mxMQRunnable);
}
}
MQHandler:
package com.mx.mxmq;
public interface MQHandlerT> {
void hand(T t);
}
Message:
package com.mx.mxmq;
public class Message {
String message;
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public Message(String message){
this.message = message;
}
}
好了,收,大概就是这样子,主要应用场景为:需要轻量级的顺序队列消费 应用场景
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
相关推荐: 竞赛选题 深度学习交通车辆流量分析 – 目标检测与跟踪 – python opencv
文章目录 0 前言 1 课题背景 2 实现效果 3 DeepSORT车辆跟踪 3.1 Deep SORT多目标跟踪算法 3.2 算法流程 4 YOLOV5算法 4.1 网络架构图 4.2 输入端 4.3 基准网络 4.4 Neck网络 4.5 Head输出层 …