文章目录
- 设置队列ttl
- 配置文件
- 生产者
- 消费者
- 设置消息ttl
- 延迟插件的使用
- 修改配置文件
- 修改生产者
- 修改消费者
设置队列ttl
代码架构:
创建两个队列QA和QB,两者队列TTL分别设置为10S和40S,然后在创建一个交换机X和死信交换机Y,它们的类型都是direct,创建一个死信队列QD
配置文件
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
@Configuration
public class Rabbitmqconfig {
public static final String X_EXCHANGE = "X";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String DEAD_LETTER_QUEUE = "QD";
// 声明xExchange
@Bean("xExchange")
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE); } // 声明xExchange
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); }
//声明队列A ttl为10s并绑定到对应的死信交换机
@Bean("queueA")
public Queue queueA()
{
Map args = new HashMap();
args.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//声明当前队列绑定的死信交换机
args.put("x-dead-letter-routing-key","YD");
args.put("x-message-ttl",10000);
return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}
@Bean
public Binding queueaBingX(@Qualifier("queueA")Queue queueA,
@Qualifier("xExchange")DirectExchange exchange)
{
return BindingBuilder.bind(queueA).to(exchange).with("XA");//通过XA路由键让交换机与队列A绑定
}
//声明队列A ttl为40s并绑定到对应的死信交换机
@Bean("queueB")
public Queue queueB()
{
Map args = new HashMap();
args.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//声明当前队列绑定的死信交换机
args.put("x-dead-letter-routing-key","YD");
args.put("x-message-ttl",40000);
return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
}
@Bean
public Binding queuebBingX(@Qualifier("queueB")Queue queueA,
@Qualifier("xExchange")DirectExchange exchange)
{
return BindingBuilder.bind(queueA).to(exchange).with("XB");//通过XB路由键让交换机与队列B绑定
//这里队列A和队列B绑定的是同一个交换机
}
@Bean("queueD")
public Queue queueD()
{
return new Queue(DEAD_LETTER_QUEUE);//声明死信队列
}
@Bean
//死信队列与交换机通过yd路由键绑定 这里队列绑定的是y交换机而不是x交换机,上面两个队列绑定的是x交换机
public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange){
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
生产者
@RestController
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("sendMsg/{message}")
public void sendMsg(@PathVariable String message){
System.out.println("当前时间"+new Date().toString()+" 发送的消息:"+message);
rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10的队列"+message);
rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40的队列"+message);
}
}
消费者
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = "QD")
//配置文件已经声明了死信队列 Queue("QD");//声明死信队列
public void receiveD(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
System.out.println("当前时间:" +new Date().toString()+",收到死信队列信息"+msg); }
}
启动项目:
控制台:
设置消息ttl
上面是对队列属性设置了过期时间,但如果有很多数据需要设置不同的过期时间则需要很多队列,这样明显浪费不必要的内存,这里也可以对消息设置不同过期时间:
再定义一个新队列,这里队列不再设置ttl属性:
@Bean("queueC")
public Queue queueC(){
Map args = new HashMap();
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", "YD"); //没有声明TTL属性
return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
}
//声明队列B绑定X交换机
@Bean public Binding queuecBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange)
{
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
修改生产者:
@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message,@PathVariable String ttlTime)
{
rabbitTemplate.convertAndSend("X", "XC", message, correlationData ->{
correlationData.getMessageProperties().setExpiration(ttlTime); return correlationData; });
System.out.println("当前时间:{}"+ new Date().toString()+"发送一条时长"+ttlTime+"毫秒TTL信息给队列C:"
+ message); }
看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置TTL的方式,消息可能并不会按时“死亡“,因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
延迟插件的使用
官网https://www.rabbitmq.com/community-plugins.html
下载:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
允许使用插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
修改绑定关系:
修改配置文件
在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制 消息传递后并不会立即投递到目标队列中,而是存储在mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。
@Configuration
public class DelayedQueueConfig {
//自定义交换机 我们在这里定义的是一个延迟交换机
@Bean
public CustomExchange delayedExchange()
{
Map args = new HashMap();
//自定义交换机的类型
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args); }
@Bean
public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
@Qualifier("delayedExchange") CustomExchange delayedExchange)
{
return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
} }
修改生产者
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message, correlationData -> {
correlationData.getMessageProperties().setDelay(delayTime);
return correlationData;
}
);
}
修改消费者
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message){
String msg = new String(message.getBody());
log.info("当前时间:{},收到延时队列的消息:{}",
new Date().toString(), msg);
}
小结:
延时队列在需要延时处理的场景下非常有用,使用RabbitMQ来实现延时队列可以很好的利用RabbitMQ的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过RabbitMQ集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。
当然,延时队列还有很多其它选择,比如利用Java的DelayQueue,利用Redis的zset,利用Quartz或者利用kafka的时间轮,这些方式各有特点,看需要适用的场景
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.e1idc.net