1.消息堆积问题介绍
当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题。
解决消息堆积有三种种思路:
- 增加更多消费者,提高消费速度
- 在消费者内开启线程池加快消息处理速度
- 扩大队列容积,提高堆积上限
注意:要提升队列容积,把消息保存服务器托管网在内存中显然是不行的。
思考:消息不放内存,放在哪里?
2.惰性队列(Lazy Queues)
RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储
2.1. 基于RabbitMQ客户端创建设置
我们可以在客户端通过图形化创建队列,在创建队列的时候选择x-queue-mode属性为lazy
2.2. 基于命令行设置lazy-queue
设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。可以通过命令行将一个运行中的队列修改为惰性队列:
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
命令解读:
- rabbitmqctl :RabbitMQ的命令行工具
- set_policy :添加一个策略
- Lazy :策略名称,可以自定义
- “^lazy-queue$” :用正则表达式匹配队列的名字
- ‘{“queue-mode”:”lazy”}’ :设置队列模式为lazy模式
- –apply-to queues:策略的作用对象,是所有的队列
2.3. 基于@Bean声明lazy-queue
在消费端设置队列
@Configuration
public class LazyConfig {
//2.声明队列
@Bean
public Queue lazyQueue(){
return QueueBuilder
.durable("lazy1.queue")
.lazy() //声明x-queue-mode属性为lazy,也就是惰性队列
.build();
}
}
2.4.基于@RabbitListener声明LazyQueue
@RabbitListener(queuesT服务器托管网oDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode" ,value="lazy")
))
public void listenlazyQueue(String msg){
System.out.println("lazy.queue 接收到-----:"+msg+" =====================时间为: "+ LocalTime.now());
}
2.5.测试
在消息发送队创建接口测试
@GetMapping("/lazy/{msg}")
public String lazyMsg(@PathVariable("msg") String msg){
// 发送消息
rabbitTemplate.convertAndSend("lazy.queue", msg);
return "发送成功 时间为"+ LocalTime.now();
}
- 在浏览器输入
http://localhost:8888/publisher/lazy/惰性队列
-
我们会发现消息不存储在
in memory
(内存中),而是持久化(即消息存储在磁盘)
注意:为了模拟出,上面效果,只启动发送消息项目,不启动接收消息项目
3.总结
消息堆积问题的解决方案
- 队列上绑定多个消费者,提高消费速度
- 使用惰性队列,可以再mq中保存更多消息
惰性队列的优点
- 基于磁盘存储,消息上限高
- 没有间歇性的page-out,性能比较稳定
惰性队列的缺点
- 基于磁盘存储,消息时效性会降低
- 性能受限于磁盘的IO
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.fwqtg.net