一、MQ 的基本概念
1.1、MQ概述
MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。
小结
-
MQ,消息队列,存储消息的中间件
-
分布式系统通信两种方式:直接远程调用 和 借助第三方 完成间接通信
-
发送方称为生产者,接收方称为消费
1.2、MQ 的优势
优势:
-
应用解耦:提高系统容错性和可维护性
-
异步提速:提升用户体验和系统吞吐量
-
削峰填谷:提高系统稳定性
1.2.1、应用解耦
系统的耦合性越高,容错性就越低,可维护性就越低。
使用 MQ 使得应用间解耦,提升容错性和可维护性。
1.2.2、异步提速
一个下单操作耗时:20 + 300 + 300 + 300 = 920ms
用户点击完下单按钮后,需要等待920ms才能得到下单响应,太慢!
用户点击完下单按钮后,只需等待25ms就能得到下单响应 (20 + 5 = 25ms)。
提升用户体验和系统吞吐量(单位时间内处理请求的数目)。
1.2.3、削峰填谷
使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。
使用MQ后,可以提高系统稳定性。
1.3、MQ 的劣势
劣势:
-
系统可用性降低
-
系统复杂度提高
-
一致性问题
-
系统可用性降低
- 系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?
-
系统复杂度提高
- MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?
-
一致性问题
- A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理
失败。如何保证消息数据处理的一致性?
- A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理
既然 MQ 有优势也有劣势,那么使用 MQ 需要满足什么条件呢?
-
生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能。
-
允许短暂的不一致性。
-
确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。
1.4、常见的 MQ 产品
目前业界有很多的 MQ 产品,例如 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,也有直接使用 Redis 充当消息队列的案例,而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及 MQ 产品特征,综合考虑。
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义 | 自定义协议,社区封装了http协议支持 |
客户端支持语言 | 官方支持Erlang,Java,Ruby等,社区产出多种API,几乎支持所有语言 | Java,C,C++,Python,PHP,Perl,.net等 | Java,C++(不成熟) | 官方支持Java,社区产出多种API,如PHP,Python等 |
单机吞吐量 | 万级(其次) | 万级(最差) | 十万级(最好) | 十万级(次之) |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
功能特性 | 并发能力强,性能极其好,延时低,社区活跃,管理界面丰富 | 老牌产品,成熟度高,文档较多 | MQ功能比较完备,扩展性佳 | 只支持主要的MQ功能,毕竟是为大数据领域准备的。 |
1.5、RabbitMQ 简介
AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。类比HTTP。
2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。
RabbitMQ 基础架构如下图:
RabbitMQ 中的相关概念:
-
Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
-
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
-
Connection:publisher/consumer 和 broker 之间的 TCP 连接
-
Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
-
Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
-
Queue:消息最终被送到这里等待 consumer 取走
-
Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
RabbitMQ 提供了 6 种工作模式:
- 简单模式
- work queues
- Publish/Subscribe 发布与订阅模式
- Routing路由模式
- Topics 主题模式
- RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。
官网对应模式介绍:https://www.rabbitmq.com/getstarted.html
1.6、JMS
-
JMS 即 Java 消息服务(JavaMessage Service)应用程序接口,是一个 Java 平台中关于面向消息中间件的API
-
JMS 是 JavaEE 规范中的一种,类比JDBC
-
很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ 官方没有提供 JMS 的实现包,但是开源社区有
1.7、小结
-
RabbitMQ 是基于 AMQP 协议使用 Erlang 语言开发的一款消息队列产品。
-
RabbitMQ提供了6种工作模式,我们学习5种。
-
AMQP 是协议,类比HTTP。
-
JMS 是 API 规范接口,类比 JDBC。
二、RabbitMQ 的安装和配置
RabbitMQ 官方地址:http://www.rabbitmq.com/
2.1、安装
2.1.1、安装依赖环境
在线安装依赖环境:
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
2.1.2、安装Erlang
上传
erlang-18.3-1.el7.centos.x86_64.rpm
socat-1.7.3.2-5.el7.lux.x86_64.rpm
rabbitmq-server-3.6.5-1.noarch.rpm
# 安装
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
如果出现如下错误
说明gblic 版本太低。我们可以查看当前机器的gblic 版本
strings /lib64/libc.so.6 | grep GLIBC
当前最高版本2.12,需要2.15.所以需要升级glibc
-
使用yum更新安装依赖
sudo yum install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gcc make -y
-
下载rpm包
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-utils-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-static-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-common-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-devel-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-headers-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/nscd-2.17-55.el6.x86_64.rpm &
-
安装rpm包
sudo rpm -Uvh *-2.17-55.el6.x86_64.rpm --force --nodeps
-
安装完毕后再查看glibc版本,发现glibc版本已经到2.17了
strings /lib64/libc.so.6 | grep GLIBC
2.1.3、安装RabbitMQ
# 安装
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
# 安装
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
2.1.4、开启管理界面及配置
# 开启管理界面
rabbitmq-plugins enable rabbitmq_management
# 修改默认配置信息
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
# 比如修改密码、配置等等,例如:loopback_users 中的 >,只保留guest
2.1.5、启动
service rabbitmq-server start # 启动服务
service rabbitmq-server stop # 停止服务
service rabbitmq-server restart # 重启服务
- 设置配置文件
cd /usr/share/doc/rabbitmq-server-3.6.5/
cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
2.1.6、配置虚拟主机及用户
2.1.6.1、用户角色
RabbitMQ在安装好后,可以访问http://ip地址:15672
;其自带了guest/guest的用户名和密码;如果需要创建自定义用户;那么也可以登录管理界面后,如下操作:
角色说明:
1、 超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
2、 监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
3、 策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
4、 普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
5、 其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
2.1.6.2、Virtual Hosts配置
像mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限。RabbitMQ也有类似的权限管理;在RabbitMQ中可以虚拟消息服务器Virtual Host,每个Virtual Hosts相当于一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。 相当于mysql的db。Virtual Name一般以/开头。
三、RabbitMQ 快速入门
3.1、入门程序
需求:使用简单模式完成消息传递
步骤:
- ① 创建工程(生成者、消费者)
- ② 分别添加依赖
- ③ 编写生产者发送消息
- ④ 编写消费者接收消息
3.1.1、生产者
dependencies>
dependency>
groupId>com.rabbitmqgroupId>
artifactId>amqp-clientartifactId>
version>5.6.0version>
dependency>
dependencies>
/**
* 发送消息
*/
public class Producer_HelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/angyan");//虚拟机 默认值/
factory.setUsername("angyan");//用户名 默认 guest
factory.setPassword("angyan");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建队列Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
//如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("hello_world",true,false,false,null);
/*
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数:
1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
2. routingKey:路由名称
3. props:配置信息
4. body:发送消息数据
*/
String body = "hello rabbitmq~~~";
//6. 发送消息
channel.basicPublish("","hello_world",null,body.getBytes());
//7.释放资源
channel.close();
connection.close();
}
3.1.2、消费者
dependencies>
dependency>
groupId>com.rabbitmqgroupId>
artifactId>amqp-clientartifactId>
version>5.6.0version>
dependency>
dependencies>
/**
* 消费消息
*/
public class Consumer_HelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/angyan");//虚拟机 默认值/
factory.setUsername("angyan");//用户名 默认 guest
factory.setPassword("angyan");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建队列Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
//如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("hello_world",true,false,false,null);
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
}
};
channel.basicConsume("hello_world",true,consumer);
//关闭资源?不要
}
}
3.2、小结
上述的入门案例中其实使用的是如下的简单模式:
在上图的模型中,有以下概念:
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接收者,会一直等待消息到来
- queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息
四、RabbitMQ 的工作模式
4.1 Work queues 工作队列模式
4.1.1、模式说明
-
Work Queues:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
-
应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
4.1.2、代码编写
4.1.2.1、生产者
/**
* 发送消息
*/
public class Producer_WorkQueues {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/angyan");//虚拟机 默认值/
factory.setUsername("angyan");//用户名 默认 guest
factory.setPassword("angyan");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建队列Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
//如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("work_queues",true,false,false,null);
/*
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数:
1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
2. routingKey:路由名称
3. props:配置信息
4. body:发送消息数据
*/
for (int i = 1; i 10; i++) {
String body = i+"hello rabbitmq~~~";
//6. 发送消息
channel.basicPublish("","work_queues",null,body.getBytes());
}
//7.释放资源
channel.close();
connection.close();
}
}
4.1.2.2、消费者
/**
* 消费消息
*/
public class Consumer_WorkQueues1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/angyan");//虚拟机 默认值/
factory.setUsername("angyan");//用户名 默认 guest
factory.setPassword("angyan");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建队列Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
//如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("work_queues",true,false,false,null);
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/
System.out.println("body:"+new String(body));
}
};
channel.basicConsume("work_queues",true,consumer);
//关闭资源?不要
}
}
/**
* 消费消息
*/
public class Consumer_WorkQueues2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/angyan");//虚拟机 默认值/
factory.setUsername("angyan");//用户名 默认 guest
factory.setPassword("angyan");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建队列Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
*
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
//如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("work_queues",true,false,false,null);
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/
System.out.println("body:"+new String(body));
}
};
channel.basicConsume("work_queues",true,consumer);
//关闭资源?不要
}
}
4.1.3、小结
-
在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
-
Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个,只需要有一个节点成功发送即可。
4.2、Pub/Sub 订阅模式
4.2.1、模式说明
在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:
- P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- C:消费者,消息的接收者,会一直等待消息到来
- Queue:消息队列,接收消息、缓存消息
- Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!
4.2.1、代码编写
4.2.1.1、生产者
/**
* 发送消息
*/
public class Producer_PubSub {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/angyan");//虚拟机 默认值/
factory.setUsername("angyan");//用户名 默认 guest
factory.setPassword("angyan");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map arguments)
参数:
1. exchange:交换机名称
2. type:交换机类型
DIRECT("direct"),:定向
FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
TOPIC("topic"),通配符的方式
HEADERS("headers");参数匹配
3. durable:是否持久化
4. autoDelete:自动删除
5. internal:内部使用。 一般false
6. arguments:参数
*/
String exchangeName = "test_fanout";
//5. 创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
//6. 创建队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7. 绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey)
参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为""
*/
channel.queueBind(queue1Name,exchangeName,"");
channel.queueBind(queue2Name,exchangeName,"");
String body = "日志信息:张三调用了findAll方法...日志级别:info...";
//8. 发送消息
channel.basicPublish(exchangeName,"",null,body.getBytes());
//9. 释放资源
channel.close();
connection.close();
}
}
4.2.1.2、消费者
/**
* 消费消息
*/
public class Consumer_PubSub1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/angyan");//虚拟机 默认值/
factory.setUsername("angyan");//用户名 默认 guest
factory.setPassword("angyan");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/
System.out.println("body:"+new String(body));
System.out.println("将日志信息打印到控制台.....");
}
};
channel.basicConsume(queue1Name,true,consumer);
//关闭资源?不要
}
}
/**
* 消费消息
*/
public class Consumer_PubSub2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/angyan");//虚拟机 默认值/
factory.setUsername("angyan");//用户名 默认 guest
factory.setPassword("angyan");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/
System.out.println("body:"+new String(body));
System.out.println("将日志信息保存数据库.....");
}
};
channel.basicConsume(queue2Name,true,consumer);
//关闭资源?不要
}
}
4.2.3、小结
-
交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
-
发布订阅模式与工作队列模式的区别:
-
工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机
-
发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)
-
发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机
-
4.3、Routing 路由模式
4.3.1、模式说明
-
队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
-
消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
-
Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息
图解:
- P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key
- X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列
- C1:消费者,其所在队列指定了需要 routing key 为 error 的消息
- C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息
4.3.2、代码编写
4.3.2.1、生产者
/**
* 发送消息
*/
public class Producer_Routing {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/angyan");//虚拟机 默认值/
factory.setUsername("angyan");//用户名 默认 guest
factory.setPassword("angyan");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map arguments)
参数:
1. exchange:交换机名称
2. type:交换机类型
DIRECT("direct"),:定向
FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
TOPIC("topic"),通配符的方式
HEADERS("headers");参数匹配
3. durable:是否持久化
4. autoDelete:自动删除
5. internal:内部使用。 一般false
6. arguments:参数
*/
String exchangeName = "test_direct";
//5. 创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
//6. 创建队列
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7. 绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey)
参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为""
*/
//队列1绑定 error
channel.queueBind(queue1Name,exchangeName,"error");
//队列2绑定 info error warning
channel.queueBind(queue2Name,exchangeName,"info");
channel.queueBind(queue2Name,exchangeName,"error");
channel.queueBind(queue2Name,exchangeName,"warning");
String body = "日志信息:张三调用了delete方法...出错误了。。。日志级别:error...";
//8. 发送消息
channel.basicPublish(exchangeName,"warning",null,body.getBytes());
//9. 释放资源
channel.close();
connection.close();
}
}
4.3.2.2、消费者
/**
* 消费消息
*/
public class Consumer_Routing1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/angyan");//虚拟机 默认值/
factory.setUsername("angyan");//用户名 默认 guest
factory.setPassword("heiangyanma");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/
System.out.println("body:"+new String(body));
System.out.println("将日志信息打印到控制台.....");
}
};
channel.basicConsume(queue2Name,true,consumer);
//关闭资源?不要
}
}
/**
* 消费消息
*/
public class Consumer_Routing2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/angyan");//虚拟机 默认值/
factory.setUsername("angyan");//用户名 默认 guest
factory.setPassword("angyan");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/
System.out.println("body:"+new String(body));
System.out.println("将日志信息存储到数据库.....");
}
};
channel.basicConsume(queue1Name,true,consumer);
//关闭资源?不要
}
}
4.3.3、小结
- Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。
4.4、Topics 通配符模式
4.4.1、模式说明
- Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
- Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
- 通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert
图解:
- 红色 Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到
- 黄色 Queue:绑定的是 #.news ,因此凡是以 .news 结尾的 routing key 都会被匹配
4.4.2、代码编写
4.4.2.1、生产者
/**
* 发送消息
*/
public class Producer_Topics {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/angyan");//虚拟机 默认值/
factory.setUsername("angyan");//用户名 默认 guest
factory.setPassword("angyan");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map arguments)
参数:
1. exchange:交换机名称
2. type:交换机类型
DIRECT("direct"),:定向
FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
TOPIC("topic"),通配符的方式
HEADERS("headers");参数匹配
3. durable:是否持久化
4. autoDelete:自动删除
5. internal:内部使用。 一般false
6. arguments:参数
*/
String exchangeName = "test_topic";
//5. 创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
//6. 创建队列
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7. 绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey)
参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为""
*/
// routing key 系统的名称.日志的级别。
//=需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
channel.queueBind(queue1Name,exchangeName,"#.error");
channel.queueBind(queue1Name,exchangeName,"order.*");
channel.queueBind(queue2Name,exchangeName,"*.*");
String body = "日志信息:张三调用了findAll方法...日志级别:info...";
//8. 发送消息
channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());
//9. 释放资源
channel.close();
connection.close();
}
}
4.4.2.2、消费者
/**
* 消费消息
*/
public class Consumer_Topic1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/angyan");//虚拟机 默认值/
factory.setUsername("angyan");//用户名 默认 guest
factory.setPassword("angyan");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/
System.out.println("body:"+new String(body));
System.out.println("将日志信息存入数据库.......");
}
};
channel.basicConsume(queue1Name,true,consumer);
//关闭资源?不要
}
}
/**
* 消费消息
*/
public class Consumer_Topic2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/angyan");//虚拟机 默认值/
factory.setUsername("angyan");//用户名 默认 guest
factory.setPassword("angyan");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/
System.out.println("body:"+new String(body));
System.out.println("将日志信息打印控制台.......");
}
};
channel.basicConsume(queue2Name,true,consumer);
//关闭资源?不要
}
}
4.4.3、小结
- Topic 主题模式可以实现 Pub/Sub 发布与订阅模式和 Routing 路由模式的功能,只是 Topic 在配置routing key 的时候可以使用通配符,显得更加灵活。
4.5、工作模式总结
-
简单模式 HelloWorld
- 一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)。
-
工作队列模式 Work Queue
- 一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)。
-
发布订阅模式 Publish/subscribe
- 需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列。
-
路由模式 Routing
- 需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。
-
通配符模式 Topic
- 需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。
五、SpringBoot整合 RabbitMQ
生产端
-
创建生产者SpringBoot工程
-
引入start,依赖坐标
-
编写yml配置,基本信息配置
-
定义交换机,队列以及绑定关系的配置类
-
注入RabbitTemplate,调用方法,完成消息发送
消费端
-
创建消费者SpringBoot工程
-
引入start,依赖坐标
-
编写yml配置,基本信息配置
-
定义监听类,使用@RabbitListener注解完成队列监听。
5.1、生产者
5.1.1、引入start依赖坐标
project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
parent>
groupId>org.springframework.bootgroupId>
artifactId>spring-boot-starter-parentartifactId>
version>2.3.2.RELEASEversion>
relativePath/>
parent>
modelVersion>4.0.0modelVersion>
artifactId>producerartifactId>
properties>
maven.compiler.source>11maven.compiler.source>
maven.compiler.target>11maven.compiler.target>
spring-boot-version>2.3.2.RELEASEspring-boot-version>
properties>
dependencies>
dependency>
groupId>org.springframework.bootgroupId>
artifactId>spring-boot-starter-webartifactId>
dependency>
dependency>
groupId>org.springframework.bootgroupId>
artifactId>spring-boot-starter-amqpartifactId>
dependency>
dependency>
groupId>org.springframework.bootgroupId>
artifactId>spring-boot-starter-testartifactId>
dependency>
dependency>
groupId>org.projectlombokgroupId>
artifactId>lombokartifactId>
optional>trueoptional>
dependency>
dependencies>
build>
plugins>
plugin>
groupId>org.springframework.bootgroupId>
artifactId>spring-boot-maven-pluginartifactId>
version>${spring-boot-version}version>
plugin>
plugins>
build>
project>
5.1.2、编写yml配置,基本信息配置
# 配置tomcat端口号
server:
port: 9002
# 配置rabbitmq
spring:
rabbitmq:
host: 127.0.0.1 #mq地址
port: 5672 #端口
username: guest #登录账号
password: guest #登录密码
virtual-host: / #虚拟机
#publisher-confirms: true # 开启消息发送确认机制,低版本
publisher-confirm-type: correlated # 开启消息发送确认机制
5.1.3、定义生产者确认回调对象
/**
*
* 定义生产者确认回调对象
* 需要配置spring.rabbitmq.publisher-confirms=true 低版本
* 或spring.rabbitmq.publisher-confirm-type=correlated 高版本
* 每个RabbitTemplate只支持一个ConfirmCallback
*
*
**/
@Slf4j
@Component
public class ProducerAckConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
private void init() {
//设置生产者确认回调对象
rabbitTemplate.setConfirmCallback(this);
}
/**
* 发送ack确认回调
*
* @param correlationData 这里获取唯一id
* @param ack 是否确认收到(true已确认收到,false未确认收到)
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//有些没有设置发送应答ack的,不需要走后续的逻辑
if (correlationData == null) {
return;
}
// 确认方法
log.info("是否确认发送成功ack = {} 失败原因cause={}", ack, cause);
// 如果为true,代表mq已成功接收消息
if (ack) {
// 从Redis数据中删除消息
log.info("消息确认发送成功:correlationDataId = {}", correlationData.getId());
} else {
// 如果为false,代表mq没有接收到消息(消息生产失败)
// 业务处理
log.error("消息确认发送失败:correlationDataId = {}", correlationData.getId());
}
}
}
5.1.4、RabbitMQ生产端常用方法整合
/**
*
* RabbitMQ生产端常用方法整合
*
*
**/
public interface RabbitMQService {
/**
* 发送队列模式消息(工作模式)
*
* @param queue 队列名称
* @param msg 消息内容(所有发送MQ的消息,需要序列化,下面不再叙述)
*/
void sendMessageByWork(String queue, Object msg);
/**
* 发送队列模式消息(工作模式)
*
* @param queue 队列名称
* @param msg 消息内容(所有发送MQ的消息,需要序列化,下面不再叙述)
* @param ackId 开启消息发送确认机制,应答ackId,建议根据业务确认唯一值。配置参考{@link com.example.config.ProducerAckConfirmCallback}
* (需要配置spring.rabbitmq.publisher-confirms=true):低版本
* 或(spring.rabbitmq.publisher-confirm-type=correlated):高版本
*/
void sendMessageByWork(String queue, Object msg, String ackId);
/**
* 发送队列模式消息(发布与订阅模式,路由,主题模式模式,主要看消费者)
*
* @param exchange 交换机
* @param routingKey 路由key,为空则是 发布与订阅模式(还是看消费者)
* @param msg 消息内容(所有发送MQ的消息,需要序列化,下面不再叙述)
*/
void sendMessageByExchange(String exchange, String routingKey, Object msg);
/**
* 发送队列模式消息(发布与订阅模式,路由,主题模式模式,主要看消费者)
*
* @param exchange 交换机
* @param routingKey 路由key,为空则是 发布与订阅模式(还是看消费者)
* @param msg 消息内容(所有发送MQ的消息,需要序列化,下面不再叙述)
* @param ackId 开启消息发送确认机制,应答ackId,建议根据业务确认唯一值。配置参考{@link com.example.config.ProducerAckConfirmCallback}
* (需要配置spring.rabbitmq.publisher-confirms=true):低版本
* 或(spring.rabbitmq.publisher-confirm-type=correlated):高版本
*/
void sendMessageByExchange(String exchange, String routingKey, Object msg, String ackId);
}
/**
*
* RabbitMQ生产端常用方法整合
*
*
**/
@Service
public class RabbitMQServiceImpl implements RabbitMQService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void sendMessageByWork(String queue, Object msg) {
rabbitTemplate.convertAndSend("", queue, msg);
}
@Override
public void sendMessageByWork(String queue, Object msg, String ackId) {
rabbitTemplate.convertAndSend("", queue, msg, new CorrelationData(ackId));
}
@Override
public void sendMessageByExchange(String exchange, String routingKey, Object msg) {
rabbitTemplate.convertAndSend(exchange, routingKey == null ? "" : routingKey, msg);
}
@Override
public void sendMessageByExchange(String exchange, String routingKey, Object msg, String ackId) {
rabbitTemplate.convertAndSend(exchange, routingKey == null ? "" : routingKey, msg, new CorrelationData(ackId));
}
}
5.1.5、测试类
/**
*
* 生产者
*
*
*/
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitMQProducerApplication.class)
public class ProducerTest {
@Autowired
private RabbitMQService rabbitMQService;
/*
发送消息时候,需要创建对应的队列和交换机
可以手动在MQ控制台创建
可以直接启动消费端即可
*/
/**
* 工作队列模式
* 只有一个消费者能接收到消息(竞争的消费者模式)
*/
@Test
public void sendWord() {
for (int i = 0; i 50; i++) {
rabbitMQService.sendMessageByWork("work-queue", "工作队列模式的消息!" + i);
}
}
/**
* 工作队列模式,演示发送应答ack,其它模式也一样
*/
@Test
public void sendWordAndAck() {
for (int i = 0; i 50; i++) {
rabbitMQService.sendMessageByWork("work-queue", "工作队列模式的消息!" + i, "工作模式ackId" + i);
}
}
/**
* 发布与订阅模式-生产者通过交换机,同时向多个消费者发送消息
* 所有消费者都能收到消息
*/
@Test
public void sendFanout() {
for (int i = 0; i 50; i++) {
rabbitMQService.sendMessageByExchange("fanout-exchange", null, "发布与订阅模式的消息!" + i);
}
}
/**
* 路由模式-交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
* 符合路由(routing key,固定)的消费者收到
*/
@Test
public void sendDirect() {
for (int i = 0; i 50; i++) {
String routingKey = i % 2 == 0 ? "error" : "info";
rabbitMQService.sendMessageByExchange("direct-exchange", routingKey, "路由模式的消息,路由routingKey=" + routingKey + " i=" + i);
}
}
/**
* 主题模式-通配符/主题模式-交换机和队列进行绑定,并且通配符方式routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
* 符合路由(routing key,通配符)的消费者收到
*/
@Test
public void sendTopic() {
for (int i = 0; i 50; i++) {
String routingKey = i % 2 == 0 ? "topic.log.info" : "topic.log.error";
rabbitMQService.sendMessageByExchange("topic-exchange", routingKey, "主题模式的消息,路由routingKey=" + routingKey + " i=" + i);
}
}
/**
* 主题模式-测试消费端的ack
* 消费端需要配置 spring.rabbitmq.listener.acknowledge-mode=manual 消费者开启手动ack消息确认,所有队列都会生效
* 消费端需要配置 spring.rabbitmq.listener.default-requeue-rejected=false 设置为false,会重发消息到死信队列,所有队列都会生效
*/
@Test
public void sendTopicByConsumerAck() {
//消费端最大消息堆积是3,这里发10条会有几条瞬间被扔到死信队列中,剩下的消费失败被拒绝确认后,才会扔到死信队列中
//当然,也可以设置为3,测试消费失败被拒绝确认,扔到死信队列中的情景
for (int i = 0; i 10; i++) {
String routingKey = i % 2 == 0 ? "topicAck.log.info" : "topicAck.log.error";
rabbitMQService.sendMessageByExchange("topicAck-exchange", routingKey, "主题模式-测试消费端的ack,路由routingKey=" + routingKey + " i=" + i);
}
}
}
5.2、消费者
5.2.1、引入start依赖坐标
project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
parent>
groupId>org.springframework.bootgroupId>
artifactId>spring-boot-starter-parentartifactId>
version>2.3.2.RELEASEversion>
relativePath/>
parent>
modelVersion>4.0.0modelVersion>
artifactId>consumerartifactId>
properties>
maven.compiler.source>11maven.compiler.source>
maven.compiler.target>11maven.compiler.target>
spring-boot-version>2.3.2.RELEASEspring-boot-version>
properties>
dependencies>
dependency>
groupId>org.springframework.bootgroupId>
artifactId>spring-boot-starter-webartifactId>
dependency>
dependency>
groupId>org.springframework.bootgroupId>
artifactId>spring-boot-starter-amqpartifactId>
dependency>
dependency>
groupId>org.springframework.bootgroupId>
artifactId>spring-boot-starter-testartifactId>
dependency>
dependency>
groupId>org.projectlombokgroupId>
artifactId>lombokartifactId>
optional>trueoptional>
dependency>
dependencies>
build>
plugins>
plugin>
groupId>org.springframework.bootgroupId>
artifactId>spring-boot-maven-pluginartifactId>
version>${spring-boot-version}version>
plugin>
plugins>
build>
project>
5.2.2、编写yml配置,基本信息配置
# 配置tomcat端口号
server:
port: 9003
# 配置rabbitmq
spring:
rabbitmq:
host: 127.0.0.1 # rabbitMQ的ip地址
#addresses: 192.168.150.101:8071, 192.168.150.101:8072, 192.168.150.101:8073 #集群地址
port: 5672 # 端口
username: guest # 账号
password: guest #密码
virtual-host: / #虚拟机
#publisher-confirms: true # 开启消息发送确认机制,低版本(一般在生产端,消费端加上也没关系)
publisher-confirm-type: correlated # 开启消息发送确认机制(一般在生产端,消费端加上也没关系)
listener:
simple:
# 并发消费:每个侦听器线程的最小数量,具体数值根据系统性能配置(一般为系统cpu核数)
concurrency: 2
# 并发消费:每个侦听器线程的最大数量,具体数值根据系统性能配置(一般为系统cpu核数*2)
max-concurrency: 4
# 每次只能获取一条消息,处理完成才能获取下一个消息,避免照成消息堆积在一个消费线程上
prefetch: 1
#acknowledge-mode: manual # 消费者开启手动ack消息确认,需要测试请看示例请AckConsumer,所有队列都会生效
#default-requeue-rejected: false # 设置为false,会重发消息到死信队列(防止手动ack确认失败的消息堆积),需要测试请示例AckConsumer,所有队列都会生效
retry:
enabled: true # 解决消息死循环问题-启用重试
max-attempts: 3 # 最大重试3次(默认),超过就丢失(或放到死信队列中,防止消息堆积)
multiplier: 2 # 乘子
initial-interval: 3000 # 第一次和第二次之间的重试间隔,后面的用乘子计算 3s 6s 12s
max-interval: 16000 # 最大重试时间间隔16s
# ==================================其实下面用常量配置更好,统一管理,这里只是作为配置文件的演示==================================
# 发布与订阅模式
fanout:
# 交换机
exchange: fanout-exchange
# 队列名称
sms:
queue: fanout-sms-queue
weixin:
queue: fanout-weixin-queue
# 路由模式
direct:
# 交换机
exchange: direct-exchange
# 队列名称
info:
queue: direct-info-queue
error:
queue: direct-error-queue
all:
queue: direct-all-queue
# 主题模式
topic:
# 交换机
exchange: topic-exchange
# 配置队列名称
info:
queue: topic-info-queue
error:
queue: topic-error-queue
full:
queue: topic-full-queue
# 主题模式下的ack确认+死信队列
topicAck:
# 交换机
exchange: topicAck-exchange
# 配置队列名称
ack:
queue: topicAck-ack-queue
# 死信队列
dead:
# 交换机
exchange: dead-exchange
# 配置队列名称
queue: dead-queue
5.2.3、定义监听类,使用@RabbitListener注解完成队列监听。
5.2.3.1、死信队列消费者
/**
*
* 消费端-手动确认ack,
* 需要配置 spring.rabbitmq.listener.acknowledge-mode=manual 消费者开启手动ack消息确认
* 需要配置 spring.rabbitmq.listener.default-requeue-rejected=false 设置为false,会重发消息到死信队列
*
**/
@Component
public class AckConsumer {
/**
* 消息监听方法
* bindings: 完成队列与交换机的绑定
* Queue: 队列属性,超过最大值,超时未被消费,消费失败超过重试次数,都会被仍到信息队列中
* exchange:交换机属性
* key:路由key,通配符
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "${topicAck.ack.queue}", durable = "true", arguments = {
@Argument(name = "x-max-length", value = "3", type = "java.lang.Long"), // 队列的最大存储界限,这里示例设为3
@Argument(name = "x-message-ttl", value = "5000000", type = "java.lang.Long"), // 消息过期时间,多久没有被消费
@Argument(name = "x-dead-letter-exchange", value = "${dead.exchange}"), // 死信队列交换机-dead-exchange
@Argument(name = "x-dead-letter-routing-key", value = "xxx")}), // 死信队列路由key
exchange = @Exchange(name = "${topicAck.exchange}", type = ExchangeTypes.TOPIC),// 交换机
key = {"#"}// 路由key,通配符
))
public void handlerMessage(String msg, Channel channel, Message message) throws IOException {
try {
System.out.printf("================AckConsumer接受到消息,准备消费,msg=%s,================", msg);
System.out.println();
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("AckConsumer--->接受到的消息是:" + msg);
// 业务处理
int i = 10 / 0;
// 手动ack确认
//参数1:deliveryTag:消息唯一传输ID
//参数2:multiple:true: 手动批量处理,false: 手动单条处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.printf("================AckConsumer消费成功,msg=%s,================", msg);
System.out.println();
} catch (Exception ex) {
// 如果真得出现了异常,我们采用消息重投,获取redelivered,判断是否为重投: false没有重投,true重投
Boolean redelivered = message.getMessageProperties().getRedelivered();
System.out.println("redelivered = " + redelivered);
try {
// (已重投)拒绝确认
if (redelivered) {
/**
* 拒绝确认,从队列中删除该消息,防止队列阻塞(消息堆积)
* boolean requeue: false不重新入队列(丢弃消息)
*/
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
System.out.printf("================AckConsumer消费消息已投入死信队列,msg=%s,================", msg);
System.out.println();
} else { // (没有重投) 消息重投
/**
* 消息重投,重新把消息放回队列中
* boolean multiple: 单条或批量
* boolean requeue: true重回队列
*/
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
System.out.println("=========消息重投了=======");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
/**
*
* 死信队列消费者
*
*
**/
@Component
public class DeadLetterConsumer {
/**
* 监听消息的方法
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "${dead.queue}", durable = "true"), // 队列
exchange = @Exchange(name = "${dead.exchange}", type = ExchangeTypes.TOPIC),// 交换机
key = {"#"}// 路由key
))
public void handlerMessage(String msg, Channel channel, Message message) throws IOException {
//其实这里也要像AckConsumer一样处理,这里只是简单的确认即可
System.out.println("死信队列接收到的消息:" + msg);
// 手动ack确认
//参数1:deliveryTag:消息唯一传输ID
//参数2:multiple:true: 手动批量处理,false: 手动单条处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println("死信队列接已手动确认,消息:" + msg);
}
}
5.2.3.2、Routing 路由模式
/**
*
* Routing 路由模式-交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
* 符合路由(routing key,固定)的消费者收到
*
*
**/
@Component
public class AllConsumer {
/**
* 消息监听方法
* bindings: 完成队列与交换机的绑定
* Queue: 队列属性
* exchange:交换机属性
* key:路由key
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "${direct.all.queue}", durable = "true"), // 队列
exchange = @Exchange(name = "${direct.exchange}", type = ExchangeTypes.DIRECT),// 交换机
key = {"info", "error"} // 路由key
))
public void handlerMessage(String msg) {
System.out.println("all--->接受到的消息是:" + msg);
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
*
* Routing 路由模式-交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
* 符合路由(routing key,固定)的消费者收到
*
*
**/
@Component
public class InfoConsumer {
/**
* 消息监听方法
* bindings: 完成队列与交换机的绑定
* Queue: 队列属性
* exchange:交换机属性
* key:路由key
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "${direct.info.queue}", durable = "true"), // 队列
exchange = @Exchange(name = "${direct.exchange}", type = ExchangeTypes.DIRECT),// 交换机
key = {"info"} // 路由key
))
public void handlerMessage(String msg) {
System.out.println("info--->接受到的消息是:" + msg);
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
*
* Routing 路由模式-交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
* 符合路由(routing key,固定)的消费者收到
*
*
**/
@Component
public class ErrorConsumer {
/**
* 消息监听方法
* bindings: 完成队列与交换机的绑定
* Queue: 队列属性
* exchange:交换机属性
* key:路由key
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "${direct.error.queue}", durable = "true"), // 队列
exchange = @Exchange(name = "${direct.exchange}", type = ExchangeTypes.DIRECT),// 交换机
key = {"error"} // 路由key
))
public void handlerMessage(String msg) {
System.out.println("error--->接受到的消息是:" + msg);
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
5.2.3.3、Fanout/subscribe发布订阅模式
/**
*
* Fanout/subscribe发布订阅模式-生产者通过交换机,同时向多个消费者发送消息: 交换机模式采用(type=fanout)
* 所有消费者都能收到消息
*
*
**/
@Component
public class SmsConsumer {
/**
* 消息监听方法
* bindings: 完成队列与交换机的绑定
* Queue: 队列属性
* exchange:交换机属性
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "${fanout.sms.queue}", durable = "true"),
exchange = @Exchange(name = "${fanout.exchange}", type = ExchangeTypes.FANOUT)))
public void handlerMessage(String msg) {
System.out.println("发布订阅模式:fanout Sms接收到的消息是: " + msg);
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
*
* Fanout/subscribe发布订阅模式-生产者通过交换机,同时向多个消费者发送消息: 交换机模式采用(type=fanout)
* 所有消费者都能收到消息
*
*
**/
@Component
public class WeixinConsumer {
/**
* 消息监听方法
* bindings: 完成队列与交换机的绑定
* Queue: 队列属性
* exchange:交换机属性
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "${fanout.weixin.queue}", durable = "true"),
exchange = @Exchange(name = "${fanout.exchange}", type = ExchangeTypes.FANOUT)))
public void handlerMessage(String msg) {
System.out.println("发布订阅模式:fanout Weixin接收到的消息是: " + msg);
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
5.2.3.4、topic 通配符/主题模式
/**
*
* topic 通配符/主题模式-交换机和队列进行绑定,并且通配符方式routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
* 符合路由(routing key,通配符)的消费者收到
*
*
**/
@Component
public class FullLogConsumer {
/**
* 消息监听方法
* bindings: 完成队列与交换机的绑定
* Queue: 队列属性
* exchange:交换机属性
* key:路由key,通配符
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "${topic.full.queue}", durable = "true"),// 队列
exchange = @Exchange(name = "${topic.exchange}", type = ExchangeTypes.TOPIC),// 交换机
key = {"#"}// 路由key,通配符
))
public void handlerMessage(String msg) {
System.out.println("log.full--->接受到的消息是:" + msg);
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
*
* topic 通配符/主题模式-交换机和队列进行绑定,并且通配符方式routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
* 符合路由(routing key,通配符)的消费者收到
*
*
**/
@Component
public class InfoLogConsumer {
/**
* 消息监听方法
* bindings: 完成队列与交换机的绑定
* Queue: 队列属性
* exchange:交换机属性
* key:路由key,通配符
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "${topic.info.queue}", durable = "true"),// 队列
exchange = @Exchange(name = "${topic.exchange}", type = ExchangeTypes.TOPIC),// 交换机
key = {"*.log.info"}// 路由key,通配符
))
public void handlerMessage(String msg) {
System.out.println("log.info--->接受到的消息是:" + msg);
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
*
* topic 通配符/主题模式-交换机和队列进行绑定,并且通配符方式routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
* 符合路由(routing key,通配符)的消费者收到
*
*
**/
@Component
public class ErrorLogConsumer {
/**
* 消息监听方法
* bindings: 完成队列与交换机的绑定
* Queue: 队列属性
* exchange:交换机属性
* key:路由key,通配符
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "${topic.error.queue}", durable = "true"),// 队列
exchange = @Exchange(name = "${topic.exchange}", type = ExchangeTypes.TOPIC),// 交换机
key = {"*.log.error"}// 路由key,通配符
))
public void handlerMessage(String msg) {
System.out.println("log.error--->接受到的消息是:" + msg);
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
5.2.3.5、work工作队列模式
/**
*
* work工作队列模式的消费者-只有一个消费者能接收到消息(竞争的消费者模式)
*
*
**/
@Component
public class Consumer1 {
/**
* 消息监听方法
* queuesToDeclare: 声明队列
* durable: 是否为持久化队列
*/
@RabbitListener(queuesToDeclare = @Queue(name = "work-queue", durable = "true"))
public void handlerMessage(String msg) {
System.out.println("=====消费者1: " + msg);
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
//模拟消费失败,触发重试
// int i = 1 / 0;
}
}
/**
*
* work工作队列模式的消费者-只有一个消费者能接收到消息(竞争的消费者模式)
*
*
**/
@Component
public class Consumer2 {
/**
* 消息监听方法
* queuesToDeclare: 声明队列
* durable: 是否为持久化队列
*/
@RabbitListener(queuesToDeclare = @Queue(name = "work-queue", durable = "true"))
public void handlerMessage(String msg) {
System.out.println("=====消费者2: " + msg);
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
//模拟消费失败,触发重试
// int i = 1 / 0;
}
}
小结
- SpringBoot提供了快速整合RabbitMQ的方式
- 基本信息再yml中配置,队列交互机以及绑定关系在配置类中使用Bean的方式配置
- 生产端直接注入RabbitTemplate完成消息发送
- 消费端直接使用@RabbitListener完成消息接收
六、RabbitMQ 高级特性
6.1、消息的可靠投递
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
-
confirm 确认模式
-
return 退回模式
rabbitmq 整个消息投递的路径为:
- producer—>rabbitmq broker—>exchange—>queue—>consumer
- 消息从 producer 到 exchange 则会返回一个 confirmCallback 。
- 消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。
我们将利用这两个 callback 控制消息的可靠性投递
小结
-
设置ConnectionFactory的publisher-confirms=“true” 开启 确认模式。
-
使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。
-
设置ConnectionFactory的publisher-returns=“true” 开启 退回模式。
-
使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。
-
在RabbitMQ中也提供了事务机制,但是性能较差,此处不做讲解。
- 使用channel下列方法,完成事务控制:
- txSelect(), 用于将当前channel设置成transaction模式
- txCommit(),用于提交事务
- txRollback(),用于回滚事务
- 使用channel下列方法,完成事务控制:
6.2、Consumer Ack
ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。
有三种确认方式:
-
自动确认:acknowledge=“none”
-
手动确认:acknowledge=“manual”
-
根据异常情况确认:acknowledge=“auto”
其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
小结
-
在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认
-
如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息
-
如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。
6.3、消费端限流
小结
-
在rabbit:listener-container 中配置 prefetch属性设置消费端一次拉取多少消息
-
消费端的确认模式一定为手动确认。acknowledge=“manual”
6.4、TTL
- TTL 全称 Time To Live(存活时间/过期时间)。
- 当消息到达存活时间后,还没有被消费,会被自动清除。
- RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。
小结
- 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。
- 设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。
- 如果两者都进行了设置,以时间短的为准。
6.5、死信队列
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
消息成为死信的三种情况:
- 队列消息长度到达限制;
- 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
- 原队列存在消息过期设置,消息到达超时时间未被消费;
队列绑定死信交换机:
- 给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key
小结
- 死信交换机和死信队列和普通的没有区别
- 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
- 消息成为死信的三种情况:
- 队列消息长度到达限制;
- 消费者拒接消费消息,并且不重回队列;
- 原队列存在消息过期设置,消息到达超时时间未被消费;
6.6、延迟队列
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
需求:
-
- 下单后,30分钟未支付,取消订单,回滚库存。
-
- 新用户注册成功7天后,发送短信问候。
实现方式:
-
- 定时器
-
- 延迟队列
在RabbitMQ中并未提供延迟队列功能。
但是可以使用:TTL+死信队列 组合实现延迟队列的效果。
小结
- 延迟队列 指消息进入队列后,可以被延迟一定时间,再进行消费。
- RabbitMQ没有提供延迟队列功能,但是可以使用 : TTL + DLX 来实现延迟队列效果。
6.7、日志与监控
6.7.1、RabbitMQ日志
RabbitMQ默认日志存放路径: /var/log/rabbitmq/rabbit@xxx.log
日志包含了RabbitMQ的版本号、Erlang的版本号、RabbitMQ服务节点名称、cookie的hash值、RabbitMQ配置文件地址、内存限制、磁盘限制、默认账户guest的创建以及权限配置等等。
6.7.2、web管控台监控
6.7.3、rabbitmqctl管理和监控
查看队列
# rabbitmqctl list_queues
查看exchanges
# rabbitmqctl list_exchanges
查看用户
# rabbitmqctl list_users
查看连接
# rabbitmqctl list_connections
查看消费者信息
# rabbitmqctl list_consumers
查看环境变量
# rabbitmqctl environment
查看未被确认的队列
# rabbitmqctl list_queues name messages_unacknowledged
查看单个队列的内存使用
# rabbitmqctl list_queues name memory
查看准备就绪的队列
# rabbitmqctl list_queues name messages_ready
6.8、消息追踪
在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。对于RabbitMQ而言,可能是因为生产者或消费者与RabbitMQ断开了连接,而它们与RabbitMQ又采用了不同的确认机制;也有可能是因为交换器与队列之间不同的转发策略;甚至是交换器并没有与任何队列进行绑定,生产者又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时候就需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和运维人员进行问题的定位。
在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能来实现消息追踪。
6.8.1、Firehose
firehose的机制是将生产者投递给rabbitmq的消息,rabbitmq投递给消费者的消息按照指定的格式发送到默认的exchange上。这个默认的exchange的名称为amq.rabbitmq.trace,它是一个topic类型的exchange。发送到这个exchange上的消息的routing key为 publish.exchangename 和deliver.queuename。其中exchangename和queuename为实际exchange和queue的名称,分别对应生产者投递到exchange的消息,和消费者从queue上获取的消息。
注意:打开 trace 会影响消息写入功能,适当打开后请关闭。
- rabbitmqctl trace_on:开启Firehose命令
- rabbitmqctl trace_off:关闭Firehose命令
6.8.2、rabbitmq_tracing
rabbitmq_tracing和Firehose在实现上如出一辙,只不过rabbitmq_tracing的方式比Firehose多了一
层GUI的包装,更容易使用和管理。
启用插件:rabbitmq-plugins enable rabbitmq_tracing
七、RabbitMQ 应用问题
7.1、消息可靠性保障
消息补偿
7.2、消息幂等性保障
幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任
意多次执行对资源本身所产生的影响均与一次执行的影响相同。
在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。
乐观锁机制
八、RabbitMQ 集群搭建
8.1、集群方案的原理
RabbitMQ这款消息队列中间件产品本身是基于Erlang编写,Erlang语言天生具备分布式特性(通过同步Erlang集群各节点的magic cookie来实现)。因此,RabbitMQ天然支持Clustering。这使得RabbitMQ本身不需要像ActiveMQ、Kafka那样通过ZooKeeper分别来实现HA方案和保存集群的元数据。集群是保证可靠性的一种方式,同时可以通过水平扩展以达到增加消息吞吐量能力的目的。
8.2、单机多实例部署
由于某些因素的限制,有时候你不得不在一台机器上去搭建一个rabbitmq集群,这个有点类似zookeeper的单机版。真实生成环境还是要配成多机集群的。这里主要论述如何在单机中配置多个rabbitmq实例。
主要参考官方文档:https://www.rabbitmq.com/clustering.html
首先确保RabbitMQ运行没有问题
[root@super ~]# rabbitmqctl status
Status of node rabbit@super ...
[{pid,10232},
{running_applications,
[{rabbitmq_management,"RabbitMQ Management Console","3.6.5"},
{rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.6.5"},
{webmachine,"webmachine","1.10.3"},
{mochiweb,"MochiMedia Web Server","2.13.1"},
{rabbitmq_management_agent,"RabbitMQ Management Agent","3.6.5"},
{rabbit,"RabbitMQ","3.6.5"},
{os_mon,"CPO CXC 138 46","2.4"},
{syntax_tools,"Syntax tools","1.7"},
{inets,"INETS CXC 138 49","6.2"},
{amqp_client,"RabbitMQ AMQP Client","3.6.5"},
{rabbit_common,[],"3.6.5"},
{ssl,"Erlang/OTP SSL application","7.3"},
{public_key,"Public key infrastructure","1.1.1"},
{asn1,"The Erlang ASN1 compiler version 4.0.2","4.0.2"},
{ranch,"Socket acceptor pool for TCP protocols.","1.2.1"},
{mnesia,"MNESIA CXC 138 12","4.13.3"},
{compiler,"ERTS CXC 138 10","6.0.3"},
{crypto,"CRYPTO","3.6.3"},
{xmerl,"XML parser","1.3.10"},
{sasl,"SASL CXC 138 11","2.7"},
{stdlib,"ERTS CXC 138 10","2.8"},
{kernel,"ERTS CXC 138 10","4.2"}]},
{os,{unix,linux}},
{erlang_version,
"Erlang/OTP 18 [erts-7.3] [source] [64-bit] [async-threads:64] [hipe] [kernel-poll:true]n"},
{memory,
[{total,56066752},
{connection_readers,0},
{connection_writers,0},
{connection_channels,0},
{connection_other,2680},
{queue_procs,268248},
{queue_slave_procs,0},
{plugins,1131936},
{other_proc,18144280},
{mnesia,125304},
{mgmt_db,921312},
{msg_index,69440},
{other_ets,1413664},
{binary,755736},
{code,27824046},
{atom,1000601},
{other_system,4409505}]},
{alarms,[]},
{listeners,[{clustering,25672,"::"},{amqp,5672,"::"}]},
{vm_memory_high_watermark,0.4},
{vm_memory_limit,411294105},
{disk_free_limit,50000000},
{disk_free,13270233088},
{file_descriptors,
[{total_limit,924},{total_used,6},{sockets_limit,829},{sockets_used,0}]},
{processes,[{limit,1048576},{used,262}]},
{run_queue,0},
{uptime,43651},
{kernel,{net_ticktime,60}}]
停止rabbitmq服务
[root@super sbin]# service rabbitmq-server stop
Stopping rabbitmq-server: rabbitmq-server.
启动第一个节点:
[root@super sbin]# RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=rabbit1 rabbitmq-server start
RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc.
## ## Licensed under the MPL. See http://www.rabbitmq.com/
## ##
########## Logs: /var/log/rabbitmq/rabbit1.log
###### ## /var/log/rabbitmq/rabbit1-sasl.log
##########
Starting broker...
completed with 6 plugins.
启动第二个节点:
[root@super ~]# RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15674}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server start
RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc.
## ## Licensed under the MPL. See http://www.rabbitmq.com/
## ##
########## Logs: /var/log/rabbitmq/rabbit2.log
###### ## /var/log/rabbitmq/rabbit2-sasl.log
##########
Starting broker...
completed with 6 plugins.
结束命令:
rabbitmqctl -n rabbit1 stop
rabbitmqctl -n rabbit2 stop
rabbit1操作作为主节点:
[root@super ~]# rabbitmqctl -n rabbit1 stop_app
Stopping node rabbit1@super ...
[root@super ~]# rabbitmqctl -n rabbit1 reset
Resetting node rabbit1@super ...
[root@super ~]# rabbitmqctl -n rabbit1 start_app
Starting node rabbit1@super ...
[root@super ~]#
rabbit2操作为从节点:
[root@super ~]# rabbitmqctl -n rabbit2 stop_app
Stopping node rabbit2@super ...
[root@super ~]# rabbitmqctl -n rabbit2 reset
Resetting node rabbit2@super ...
[root@super ~]# rabbitmqctl -n rabbit2 join_cluster rabbit1@'super' ###''内是主机名换成自己的
Clustering node rabbit2@super with rabbit1@super ...
[root@super ~]# rabbitmqctl -n rabbit2 start_app
Starting node rabbit2@super ...
查看集群状态:
[root@super ~]# rabbitmqctl cluster_status -n rabbit1
Cluster status of node rabbit1@super ...
[{nodes,[{disc,[rabbit1@super,rabbit2@super]}]},
{running_nodes,[rabbit2@super,rabbit1@super]},
{cluster_name,"rabbit1@super">>},
{partitions,[]},
{alarms,[{rabbit2@super,[]},{rabbit1@super,[]}]}]
8.3、集群管理
rabbitmqctl join_cluster {cluster_node} [–ram]
- 将节点加入指定集群中。在这个命令执行前需要停止RabbitMQ应用并重置节点。
rabbitmqctl cluster_status
- 显示集群的状态。
rabbitmqctl change_cluster_node_type {disc|ram}
- 修改集群节点的类型。在这个命令执行前需要停止RabbitMQ应用。
rabbitmqctl forget_cluster_node [–offline]
- 将节点从集群中删除,允许离线执行。
rabbitmqctl update_cluster_nodes {clusternode}
- 在集群中的节点应用启动前咨询clusternode节点的最新信息,并更新相应的集群信息。这个和join_cluster不同,它不加入集群。考虑这样一种情况,节点A和节点B都在集群中,当节点A离线了,节点C又和节点B组成了一个集群,然后节点B又离开了集群,当A醒来的时候,它会尝试联系节点B,但是这样会失败,因为节点B已经不在集群中了。
rabbitmqctl cancel_sync_queue [-p vhost] {queue}
- 取消队列queue同步镜像的操作。
rabbitmqctl set_cluster_name {name}
- 设置集群名称。集群名称在客户端连接时会通报给客户端。Federation和Shovel插件也会有用到集群名称的地方。集群名称默认是集群中第一个节点的名称,通过这个命令可以重新设置。
8.4、RabbitMQ镜像集群配置
上面已经完成RabbitMQ默认集群模式,但并不保证队列的高可用性,尽管交换机、绑定这些可以复制到集群里的任何一个节点,但是队列内容不会复制。虽然该模式解决一项目组节点压力,但队列节点宕机直接导致该队列无法应用,只能等待重启,所以要想在队列节点宕机或故障也能正常应用,就要复制队列内容到集群里的每个节点,必须要创建镜像队列。
镜像队列是基于普通的集群模式的,然后再添加一些策略,所以你还是得先配置普通集群,然后才能设置镜像队列,我们就以上面的集群接着做。
设置的镜像队列可以通过开启的网页的管理端Admin->Policies,也可以通过命令。
rabbitmqctl set_policy my_ha "^" '{"ha-mode":"all"}'
- Name:策略名称
- Pattern:匹配的规则,如果是匹配所有的队列,是^.
- Definition:使用ha-mode模式中的all,也就是同步所有匹配的队列。问号链接帮助文档。
8.5、负载均衡-HAProxy
HAProxy提供高可用性、负载均衡以及基于TCP和HTTP应用的代理,支持虚拟主机,它是免费、快速并且可靠的一种解决方案,包括Twitter,Reddit,StackOverflow,GitHub在内的多家知名互联网公司在使用。HAProxy实现了一种事件驱动、单一进程模型,此模型支持非常大的并发连接数。
8.5.1、安装HAProxy
//下载依赖包
yum install gcc vim wget
//上传haproxy源码包
//解压
tar -zxvf haproxy-1.6.5.tar.gz -C /usr/local
//进入目录、进行编译、安装
cd /usr/local/haproxy-1.6.5
make TARGET=linux31 PREFIX=/usr/local/haproxy
make install PREFIX=/usr/local/haproxy
mkdir /etc/haproxy
//赋权
groupadd -r -g 149 haproxy
useradd -g haproxy -r -s /sbin/nologin -u 149 haproxy
//创建haproxy配置文件
mkdir /etc/haproxy
vim /etc/haproxy/haproxy.cfg
8.5.2、配置HAProxy
配置文件路径:/etc/haproxy/haproxy.cfg
#logging options
global
log 127.0.0.1 local0 info
maxconn 5120
chroot /usr/local/haproxy
uid 99
gid 99
daemon
quiet
nbproc 20
pidfile /var/run/haproxy.pid
defaults
log global
mode tcp
option tcplog
option dontlognull
retries 3
option redispatch
maxconn 2000
contimeout 5s
clitimeout 60s
srvtimeout 15s
#front-end IP for consumers and producters
listen rabbitmq_cluster
bind 0.0.0.0:5672
mode tcp
#balance url_param userid
#balance url_param session_id check_post 64
#balance hdr(User-Agent)
#balance hdr(host)
#balance hdr(Host) use_domain_only
#balance rdp-cookie
#balance leastconn
#balance source //ip
balance roundrobin
server node1 127.0.0.1:5673 check inter 5000 rise 2 fall 2
server node2 127.0.0.1:5674 check inter 5000 rise 2 fall 2
listen stats
bind 172.16.98.133:8100
mode http
option httplog
stats enable
stats uri /rabbitmq-stats
stats refresh 5s
启动HAproxy负载
/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg
//查看haproxy进程状态
ps -ef | grep haproxy
访问如下地址对mq节点进行监控
http://172.16.98.133:8100/rabbitmq-stats
代码中访问mq集群地址,则变为访问haproxy地址:5672
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net