目录
一、什么是消息队列?
二、需求分析
1)核心概念
2)核心API
3)交换机类型
4)持久化
5)网络通信
编辑6)消息应答
三、 模块划分
四、创建核心类
1.ExChange
2.MSGQueue
3.Binding
4.Message
五. 数据库设计
1.配置 sqlite
引⼊ pom.xml 依赖
配置数据源 application.yml
2.实现创建表和数据库基本操作
3.实现 DataBaseManager
4.测试 DataBaseManager
六. 消息存储设计
1.设计思路
2.创建 MessageFileManager 类
1)实现统计⽂件读写
2)实现创建队列⽬录
3)实现删除队列⽬录
4)检查队列⽂件是否存在
5)实现消息对象序列化/反序列化
6)实现写⼊消息⽂件
7)实现删除消息
8)实现消息加载
9)实现垃圾回收(GC)
10)测试 MessageFileManager
七. 整合数据库和⽂件
八. 内存数据结构设计
1.封装 Exchange 、Queue 、Binding 、Message 方法
2.针对未确认的消息的处理
3.实现重启后恢复内存
4.测试 MemoryDataCenter
九. 虚拟主机设计
1.创建 VirtualHost
2.实现构造⽅法和 getter
3.创建交换机
4.删除交换机
5.创建队列
6.删除队列
7.创建绑定
8.删除绑定
9.发布消息
10.路由规则
1) 实现 route ⽅法
2) 实现 checkRoutingKeyValid
3) 实现 checkBindingKeyValid
4) 实现 routeTopic
6) 测试 Router
11.订阅消息
1) 添加⼀个订阅者
2) 创建订阅者管理管理类
3) 添加令牌接⼝
4) 实现添加订阅者
5) 实现扫描线程
6) 实现消费消息
7)⼩结
12.消息确认
13. 测试 VirtualHost
⼗. ⽹络通信协议设计
1.明确需求
2.设计应⽤层协议
1)请求编辑
2)响应编辑
3.定义 Request / Response
4.定义参数⽗类
5.定义返回值⽗类
6.定义其他参数类
1) ExchangeDeclareArguments
2) ExchangeDeleteArguments
3) QueueDeclareArguments
4) QueueDeleteArguments
5) QueueBindArguments
6) QueueUnbindArguments
7) BasicPublishArguments
8) BasicConsumeArguments
9) SubScribeReturns
⼗一. 实现 BrokerServer
1.创建 BrokerServer 类
2.启动/停⽌服务器
3.实现处理连接
4.实现 readRequest
5.实现 writeResponse
6.实现处理请求
7.实现 clearClosedSession
⼗二. 实现客⼾端
1.创建 ConnectionFactory
2.Connection 和 Channel 的定义
1) Connection 的定义
2) Channel 的定义
3.封装请求响应读写操作
4.创建 channel
5.发送请求
1) 创建 channel
2) 关闭 channel
3) 创建交换机
4) 删除交换机
5) 创建队列
6) 删除队列
7) 创建绑定
8) 删除绑定
9) 发送消息
10) 订阅消息
11) 确认消息
6.处理响应
1) 创建扫描线程
2) 实现响应的分发
3) 实现 channel.putReturns
7.关闭 Connection
8.测试客⼾端-服务器
⼗三. 案例: 基于 MQ 的⽣产者消费者模型
1.生产者
2.消费者
3.运行结果
十四.总结
一、什么是消息队列?
消息队列是一种用于在应用程序之间或不同组件之间进行异步通信的软件架构模式。它允许发送方(生产者)将消息发送到队列中,而接收方(消费者)可以从队列中获取消息并进行处理。
消息队列的主要作用是解耦和异步化系统的各个组件。发送方可以将消息放入队列中后立即继续执行其他任务,而无需等待接收方的响应。接收方则可以在适当的时候从队列中获取消息并进行处理,而不需要与发送方实时交互。
消息队列可以用于各种场景,例如:
-
异步任务处理:将耗时的任务放入消息队列中,由后台的工作线程或服务进行处理,避免阻塞主线程或前端用户界面。
-
系统解耦:不同的组件可以通过消息队列进行通信,降低组件之间的耦合度,使系统更加灵活和可扩展。
-
流量控制:当系统负载过高时,可以通过消息队列来缓冲和限制请求的处理速度,以保护系统的稳定性。
-
日志处理:将应用程序的日志信息发送到消息队列中,然后由其他组件进行处理和分析。
常见的消息队列系统包括 RabbitMQ、Apache Kafka、ActiveMQ 等。它们提供了各种功能和特性,以满足不同场景下的需求。
二、需求分析
1)核心概念
• ⽣产者 (Producer)
• 消费者 (Consumer)
• 中间⼈ (Broker)
• 发布 (Publish)
• 订阅 (Subscribe)
其中, Broker 是最核⼼的部分. 负责消息的存储和转发.
在 Broker 中, ⼜存在以下概念.
虚拟机 (VirtualHost): 类似于 MySQL 的 “database”, 是⼀个逻辑上的集合. ⼀个 BrokerServer 上可 以存在多个 VirtualHost.
交换机 (Exchange): ⽣产者把消息先发送到 Broker 的 Exchange 上. 再根据不同的规则, 把消息转发 给不同的 Queue.
队列 (Queue): 真正⽤来存储消息的部分. 每个消费者决定⾃⼰从哪个 Queue 上读取消息.
绑定 (Binding): Exchange 和 Queue 之间的关联关系. Exchange 和 Queue 可以理解成 “多对多” 关 系. 使⽤⼀个关联表就可以把这两个概念联系起来.
消息 (Message): 传递的内容.
这些概念, 既需要在内存中存储, 也需要在硬盘上存储.
内存存储: ⽅便使⽤.硬盘存储: 重启数据不丢失.
2)核心API
对于 Broker 来说, 要实现以下核⼼ API. 通过这些 API 来实现消息队列的基本功能.
1. 创建队列 (queueDeclare)
2. 销毁队列 (queueDelete)
3. 创建交换机 (exchangeDeclare)
4. 销毁交换机 (exchangeDelete)
5. 创建绑定 (queueBind)
6. 解除绑定 (queueUnbind)
7. 发布消息 (basicPublish)
8. 订阅消息 (basicConsume)
9. 确认消息 (basicAck)
另⼀⽅⾯, Producer 和 Consumer 则通过⽹络的⽅式, 远程调⽤这些 API, 实现 ⽣产者消费者模型.
3)交换机类型
对于 RabbitMQ 来说, 主要⽀持四种交换机类型:Direct 、Fanout 、Topic 、Header.我们这里实现前三种.
Direct: ⽣产者发送消息时, 直接指定被该交换机绑定的队列名.
Fanout: ⽣产者发送的消息会被复制到该交换机的所有队列中.
Topic: 绑定队列到交换机上时, 指定⼀个字符串为 bindingKey. 发送消息指定⼀个字符串为 routingKey. 当 routingKey 和 bindingKey 满⾜⼀定的匹配条件的时候, 则把消息投递到指定队列.
4)持久化
Exchange, Queue, Binding, Message 都有持久化需求.
当程序重启 / 主机重启, 保证上述内容不丢失.
5)网络通信
⽣产者和消费者都是客⼾端程序, broker 则是作为服务器. 通过⽹络进⾏通信.
在⽹络通信的过程中, 客⼾端部分要提供对应的 api, 来实现对服务器的操作.
1. 创建 Connection
2. 关闭 Connection
3. 创建 Channel
4. 关闭 Channel
5. 创建队列 (queueDeclare)
6. 销毁队列 (queueDelete)
7. 创建交换机 (exchangeDeclare)
8. 销毁交换机 (exchangeDelete)
9. 创建绑定 (queueBind)
10. 解除绑定 (queueUnbind)
11. 发布消息 (basicPublish)
12. 订阅消息 (basicConsume)
13. 确认消息 (basicAck)
Connection 对应⼀个 TCP 连接. Channel 则是 Connection 中的逻辑通道. ⼀个 Connection 中可以包含多个 Channel. Channel 和 Channel 之间的数据是独⽴的. 不会相互⼲扰. 这样的设定主要是为了能够更好的复⽤ TCP 连接, 达到⻓连接的效果, 避免频繁的创建关闭 TCP 连接.
Connection 可以理解成⼀根⽹线. Channel 则是⽹线⾥具体的线缆.
6)消息应答
被消费的消息, 需要进⾏应答. 应答模式分成两种.
• ⾃动应答: 消费者只要消费了消息, 就算应答完毕了. Broker 直接删除这个消息.
• ⼿动应答: 消费者⼿动调⽤应答接⼝, Broker 收到应答请求之后, 才真正删除这个消息.
(⼿动应答的⽬的, 是为了保证消息确实被消费者处理成功了. 在⼀些对于数据可靠性要求⾼的场景, ⽐ 较常⻅.)
三、 模块划分
可以看到, 像 交换机, 队列, 绑定, 消息, 这⼏个核⼼概念在内存和硬盘中都是存储了的. 其中内存为主, 是⽤来实现消息转发的关键; 硬盘为辅, 主要是保证服务器重启之后, 之前的信息都可以 正常保持.
四、创建核心类
1.ExChange
public class Exchange {
private String name;
private ExchangeType type = ExchangeType.DIRECT;
private boolean durable = false;
//省略 getter setter
}
public enum ExchangeType {
DIRECT(0),
FANOUT(1),
TOPIC(2);
private final int type;
private ExchangeType(int type) {
this.type = type;
}
public int getType() {
return this.type;
}
}
name : 交换机的名字. 相当于交换机的⾝份标识.
type : 交换机的类型. 三种取值, DIRECT, FANOUT, TOPIC.
durable : 交换机是否要持久化存储. true 为持久化, false 不持久化.
RabbitMQ 中的交换机, 还⽀持 autoDelete(使⽤完毕后是否⾃动删除) 和 arguments(交换机的其他参数属性) , 我们这里不实现这两个功能。
2.MSGQueue
public class MSGQueue {
private String name;
private boolean durable;
// 省略 getter setter
}
name : 队列的名字. 相当于队列的⾝份标识.
durable : 交换机是否要持久化存储. true 为持久化, false 不持久化.
exclusive : 独占(排他), 队列只能被⼀个消费者使⽤.
autoDelete : 使⽤完毕后是否⾃动删除.
arguments : 交换机的其他参数属性.
RabbitMQ还实现了以上功能,我们这里不实现
3.Binding
public class Binding {
private String exchangeName;
private String queueName;
private String bindingKey;
// 省略 getter setter
}
exchangeName 交换机名字
queueName 队列名字
bindingKey 只在交换机类型为 TOPIC 时才有效. ⽤于和消息中的 routingKey 进⾏匹配
4.Message
public class Message implements Serializable {
private BasicProperties basicProperties = new BasicProperties();
private byte[] body;
// 消息在⽂件中对应的 offset 的范围, [offsetBeg, offsetEnd)
// 从这个范围取出的 byte[] 正好可以反序列化成⼀个 Message 对象.
// offsetBeg 前⾯的 4 个字节是消息的⻓度
private transient long offsetBeg = 0;
private transient long offsetEnd = 0;
private byte isValid = 0x1; // 消息在⽂件中是否有效. 0x0 表⽰⽆效, 0x1 表⽰有效
// 创建新的消息, 同时给该消息分配⼀个新的 messageId
// routingKey 以参数的为准. 会覆盖掉 basicProperties 中的 routingKey
public static Message createMessageWithId(String routingKey, BasicProperties)
Message message = new Message();
if (basicProperties != null) {
message.basicProperties = basicProperties;
}
message.basicProperties.setMessageId("M-" + UUID.randomUUID().toString()
message.basicProperties.setRoutingKey(routingKey);
message.body = body;
return message;
}
// 省略 getter setter
}
public class BasicProperties implements Serializable {
private String messageId; // 消息的唯⼀ id. 使⽤ uuid 表⽰.
private String routingKey;
private int deliveryMode = 1; // 1 表⽰消息⾮持久化. 2 表⽰消息持久化
// 省略 getter setter
}
Message 需要实现 Serializable 接⼝. 后续需要把 Message 写⼊⽂件以及进⾏⽹络传输. basicProperties 是消息的属性信息. body 是消息体.
offsetBeg 和 offsetEnd 表⽰消息在消息⽂件中所在的起始位置和结束位置.使⽤ transient 关键字避免属性被序列化.
isValid ⽤来表⽰消息在⽂件中是否有效. 这⼀块具体的设计后⾯再详细介绍. createMessageWithId 相当于⼀个⼯⼚⽅法, ⽤来创建⼀个 Message 实例.
messageId 通过 UUID 的⽅式⽣成.
五. 数据库设计
对于 Exchange, MSGQueue, Binding, 我们使⽤数据库进⾏持久化保存. 此处我们使⽤的数据库是 SQLite, 是⼀个更轻量的数据库. SQLite 只是⼀个动态库(当然, 官⽅也提供了可执⾏程序 exe), 我们在 Java 中直接引⼊ SQLite 依赖, 即 可直接使⽤, 不必安装其他的软件.
1.配置 sqlite
引⼊ pom.xml 依赖
org.xerial
sqlite-jdbc
3.41.0.1
配置数据源 application.yml
spring:
datasource:
url: jdbc:sqlite:./data/meta.db
username:
password:
driver-class-name: org.sqlite.JDBC
mybatis:
mapper-locations: classpath:mapper/**Mapper.xml
此处我们约定, 把数据库⽂件放到 ./data/meta.db 中. SQLite 只是把数据单纯的存储到⼀个⽂件中. ⾮常简单⽅便.
2.实现创建表和数据库基本操作
mapper.MetaMapper
public interface MetaMapper {
// 提供三个核心建表方法
void createExchangeTable();
void createQueueTable();
void createBindingTable();
// 插入 删除 和 查找
void insertExchange(Exchange exchange);
void deleteExchange(String exchangeName);
List selectAllExchanges();
void insertQueue(MSGQueue queue);
void deleteQueue(String queueName);
List selectAllQueues();
void insertBinding(Binding binding);
void deleteBinding(Binding binding);
List selectAllBindings();
}
MetaMapper
create table if not exists exchange (
name varchar(50) primary key,
type int,
durable boolean
);
create table if not exists queue (
name varchar(50) primary key,
durable boolean
);
create table if not exists binding (
exchangeName varchar(50),
queueName varchar(50),
bindingKey varchar(256)
);
insert into exchange values(#{name}, #{type}, #{durable});
select * from exchange;
delete from exchange where name = #{exchangeName};
insert into queue values(#{name}, #{durable});
select * from queue;
delete from queue where name = #{queueName};
insert into binding values(#{exchangeName}, #{queueName}, #{bindingKey});
select * from binding;
delete from binding where exchangeName = #{exchangeName} and queueName = #{queueName};
3.实现 DataBaseManager
mqserver.datacenter.DataBaseManager
/*
* 通过这个类, 来整合数据库操作.
*/
public class DataBaseManager {
// 要做的是从 Spring 中拿到现成的对象
private MetaMapper metaMapper;
// 针对数据库进行初始化
public void init() {
// 手动的获取到 MetaMapper
metaMapper = MqApplication.context.getBean(MetaMapper.class);
if (!checkDBExists()) {
// 数据库不存在, 就进行建建库表操作
// 先创建一个 data 目录
File dataDir = new File("./data");
dataDir.mkdirs();
// 创建数据表
createTable();
// 插入默认数据
createDefaultData();
System.out.println("[DataBaseManager] 数据库初始化完成!");
} else {
// 数据库已经存在了, 啥都不必做即可
System.out.println("[DataBaseManager] 数据库已经存在!");
}
}
public void deleteDB() {
File file = new File("./data/meta.db");
boolean ret = file.delete();
if (ret) {
System.out.println("[DataBaseManager] 删除数据库文件成功!");
} else {
System.out.println("[DataBaseManager] 删除数据库文件失败!");
}
File dataDir = new File("./data");
// 使用 delete 删除目录的时候, 需要保证目录是空的.
ret = dataDir.delete();
if (ret) {
System.out.println("[DataBaseManager] 删除数据库目录成功!");
} else {
System.out.println("[DataBaseManager] 删除数据库目录失败!");
}
}
private boolean checkDBExists() {
File file = new File("./data/meta.db");
if (file.exists()) {
return true;
}
return false;
}
// 这个方法用来建表.
// 建库操作并不需要手动执行. (不需要手动创建 meta.db 文件)
// 首次执行这里的数据库操作的时候, 就会自动的创建出 meta.db 文件来 (MyBatis 帮我们完成的)
private void createTable() {
metaMapper.createExchangeTable();
metaMapper.createQueueTable();
metaMapper.createBindingTable();
System.out.println("[DataBaseManager] 创建表完成!");
}
// 给数据库表中, 添加默认的数据.
// 此处主要是添加一个默认的交换机.
// RabbitMQ 里有一个这样的设定: 带有一个 匿名 的交换机, 类型是 DIRECT.
private void createDefaultData() {
// 构造一个默认的交换机.
Exchange exchange = new Exchange();
exchange.setName("");
exchange.setType(ExchangeType.DIRECT);
exchange.setDurable(true);
metaMapper.insertExchange(exchange);
System.out.println("[DataBaseManager] 创建初始数据完成!");
}
// 把其他的数据库的操作, 也在这个类中封装一下.
public void insertExchange(Exchange exchange) {
metaMapper.insertExchange(exchange);
}
public List selectAllExchanges() {
return metaMapper.selectAllExchanges();
}
public void deleteExchange(String exchangeName) {
metaMapper.deleteExchange(exchangeName);
}
public void insertQueue(MSGQueue queue) {
metaMapper.insertQueue(queue);
}
public List selectAllQueues() {
return metaMapper.selectAllQueues();
}
public void deleteQueue(String queueName) {
metaMapper.deleteQueue(queueName);
}
public void insertBinding(Binding binding) {
metaMapper.insertBinding(binding);
}
public List selectAllBindings() {
return metaMapper.selectAllBindings();
}
public void deleteBinding(Binding binding) {
metaMapper.deleteBinding(binding);
}
}
4.测试 DataBaseManager
// 加上这个注解之后, 改类就会被识别为单元测试类.
@SpringBootTest
public class DataBaseManagerTests {
private DataBaseManager dataBaseManager = new DataBaseManager();
// 接下来下面这里需要编写多个 方法 . 每个方法都是一个/一组单元测试用例.
// 还需要做一个准备工作. 需要写两个方法, 分别用于进行 "准备工作" 和 "收尾工作"
// 使用这个方法, 来执行准备工作. 每个用例执行前, 都要调用这个方法.
@BeforeEach
public void setUp() {
// 由于在 init 中, 需要通过 context 对象拿到 metaMapper 实例的.
// 所以就需要先把 context 对象给搞出来.
MqApplication.context = SpringApplication.run(MqApplication.class);
dataBaseManager.init();
}
// 使用这个方法, 来执行收尾工作. 每个用例执行后, 都要调用这个方法.
@AfterEach
public void tearDown() {
// 这里要进行的操作, 就是把数据库给清空~~ (把数据库文件, meta.db 直接删了就行了)
// 注意, 此处不能直接就删除, 而需要先关闭上述 context 对象!!
// 此处的 context 对象, 持有了 MetaMapper 的实例, MetaMapper 实例又打开了 meta.db 数据库文件.
// 如果 meta.db 被别人打开了, 此时的删除文件操作是不会成功的 (Windows 系统的限制, Linux 则没这个问题).
// 另一方面, 获取 context 操作, 会占用 8080 端口. 此处的 close 也是释放 8080.
MqApplication.context.close();
dataBaseManager.deleteDB();
}
@Test
public void testInitTable() {
// 由于 init 方法, 已经在上面 setUp 中调用过了. 直接在测试用例代码中, 检查当前的数据库状态即可.
// 直接从数据库中查询. 看数据是否符合预期.
// 查交换机表, 里面应该有一个数据(匿名的 exchange); 查队列表, 没有数据; 查绑定表, 没有数据.
List exchangeList = dataBaseManager.selectAllExchanges();
List queueList = dataBaseManager.selectAllQueues();
List bindingList = dataBaseManager.selectAllBindings();
// 直接打印结果, 通过肉眼来检查结果, 固然也可以. 但是不优雅, 不方便.
// 更好的办法是使用断言.
// System.out.println(exchangeList.size());
// assertEquals 判定结果是不是相等.
// 注意这俩参数的顺序. 虽然比较相等, 谁在前谁在后, 无所谓.
// 但是 assertEquals 的形参, 第一个形参叫做 expected (预期的), 第二个形参叫做 actual (实际的)
Assertions.assertEquals(1, exchangeList.size());
Assertions.assertEquals("", exchangeList.get(0).getName());
Assertions.assertEquals(ExchangeType.DIRECT, exchangeList.get(0).getType());
Assertions.assertEquals(0, queueList.size());
Assertions.assertEquals(0, bindingList.size());
}
private Exchange createTestExchange(String exchangeName) {
Exchange exchange = new Exchange();
exchange.setName(exchangeName);
exchange.setType(ExchangeType.FANOUT);
exchange.setDurable(true);
return exchange;
}
@Test
public void testInsertExchange() {
// 构造一个 Exchange 对象, 插入到数据库中. 再查询出来, 看结果是否符合预期.
Exchange exchange = createTestExchange("testExchange");
dataBaseManager.insertExchange(exchange);
// 插入完毕之后, 查询结果
List exchangeList = dataBaseManager.selectAllExchanges();
Assertions.assertEquals(2, exchangeList.size());
Exchange newExchange = exchangeList.get(1);
Assertions.assertEquals("testExchange", newExchange.getName());
Assertions.assertEquals(ExchangeType.FANOUT, newExchange.getType());
Assertions.assertEquals(true, newExchange.isDurable());
}
@Test
public void testDeleteExchange() {
// 先构造一个交换机, 插入数据库; 然后再按照名字删除即可!
Exchange exchange = createTestExchange("testExchange");
dataBaseManager.insertExchange(exchange);
List exchangeList = dataBaseManager.selectAllExchanges();
Assertions.assertEquals(2, exchangeList.size());
Assertions.assertEquals("testExchange", exchangeList.get(1).getName());
// 进行删除操作
dataBaseManager.deleteExchange("testExchange");
// 再次查询
exchangeList = dataBaseManager.selectAllExchanges();
Assertions.assertEquals(1, exchangeList.size());
Assertions.assertEquals("", exchangeList.get(0).getName());
}
private MSGQueue createTestQueue(String queueName) {
MSGQueue queue = new MSGQueue();
queue.setName(queueName);
queue.setDurable(true);
return queue;
}
@Test
public void testInsertQueue() {
MSGQueue queue = createTestQueue("testQueue");
dataBaseManager.insertQueue(queue);
List queueList = dataBaseManager.selectAllQueues();
Assertions.assertEquals(1, queueList.size());
MSGQueue newQueue = queueList.get(0);
Assertions.assertEquals("testQueue", newQueue.getName());
Assertions.assertEquals(true, newQueue.isDurable());
}
@Test
public void testDeleteQueue() {
MSGQueue queue = createTestQueue("testQueue");
dataBaseManager.insertQueue(queue);
List queueList = dataBaseManager.selectAllQueues();
Assertions.assertEquals(1, queueList.size());
// 进行删除
dataBaseManager.deleteQueue("testQueue");
queueList = dataBaseManager.selectAllQueues();
Assertions.assertEquals(0, queueList.size());
}
private Binding createTestBinding(String exchangeName, String queueName) {
Binding binding = new Binding();
binding.setExchangeName(exchangeName);
binding.setQueueName(queueName);
binding.setBindingKey("testBindingKey");
return binding;
}
@Test
public void testInsertBinding() {
Binding binding = createTestBinding("testExchange", "testQueue");
dataBaseManager.insertBinding(binding);
List bindingList = dataBaseManager.selectAllBindings();
Assertions.assertEquals(1, bindingList.size());
Assertions.assertEquals("testExchange", bindingList.get(0).getExchangeName());
Assertions.assertEquals("testQueue", bindingList.get(0).getQueueName());
Assertions.assertEquals("testBindingKey", bindingList.get(0).getBindingKey());
}
@Test
public void testDeleteBinding() {
Binding binding = createTestBinding("testExchange", "testQueue");
dataBaseManager.insertBinding(binding);
List bindingList = dataBaseManager.selectAllBindings();
Assertions.assertEquals(1, bindingList.size());
// 删除
Binding toDeleteBinding = createTestBinding("testExchange", "testQueue");
dataBaseManager.deleteBinding(toDeleteBinding);
bindingList = dataBaseManager.selectAllBindings();
Assertions.assertEquals(0, bindingList.size());
}
}
六. 消息存储设计
1.设计思路
消息需要在硬盘上存储. 但是并不直接放到数据库中, ⽽是直接使⽤⽂件存储.因为:
1. 对于消息的操作并不需要复杂的 增删改查 .
2. 对于⽂件的操作效率⽐数据库会⾼很多.
我们给每个队列分配⼀个⽬录. ⽬录的名字为 data + 队列名. 形如 ./data/testQueue 该⽬录中包含两个固定名字的⽂件.
queue_data.txt 消息数据⽂件, ⽤来保存消息内容.
queue_stat.txt 消息统计⽂件, ⽤来保存消息统计信息.
queue_data.txt ⽂件格式: 使⽤⼆进制⽅式存储. 每个消息分成两个部分:
前四个字节, 表⽰ Message 对象的⻓度(字节数)
后⾯若⼲字节, 表⽰ Message 内容.
消息和消息之间⾸尾相连.
queue_stat.txt ⽂件格式:使⽤⽂本⽅式存储.
⽂件中只包含⼀⾏, ⾥⾯包含两列(都是整数), 使⽤ t 分割.
第⼀列表⽰当前总的消息数⽬. 第⼆列表⽰有效消息数⽬.
形如:2000t1500
2.创建 MessageFileManager 类
mqserver.database.MessageFileManager
public class MessageFileManager {
// 表⽰消息的统计信息
static public class Stat {
public int totalCount;
public int validCount;
}
public void init() {
// 当前这⾥不需要做任何⼯作.
}
// 队列⽬录
private String getQueueDir(String queueName) {
return "./data/" + queueName;
}
// 队列数据⽂件
// 这个⽂件来存储队列的真实数据
private String getQueueDataPath(String queueName) {
return getQueueDir(queueName) + "/queue_data.txt";
}
// 队列统计⽂件
// 这个⽂件⽤来存储队列中的统计信息.
// 包含⼀⾏, 两个列使⽤ t 分割, 分别是总数据, 和⽆效数据.
private String getQueueStatPath(String queueName) {
return getQueueDir(queueName) + "/queue_stat.txt";
}
}
1)实现统计⽂件读写
private Stat readStat(String queueName) {
// 由于当前的消息统计文件是文本文件, 可以直接使用 Scanner 来读取文件内容
Stat stat = new Stat();
try (InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))) {
Scanner scanner = new Scanner(inputStream);
stat.totalCount = scanner.nextInt();
stat.validCount = scanner.nextInt();
return stat;
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
private void writeStat(String queueName, Stat stat) {
// 使用 PrintWrite 来写文件.
// OutputStream 打开文件, 默认情况下, 会直接把原文件清空. 此时相当于新的数据覆盖了旧的.
try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))) {
PrintWriter printWriter = new PrintWriter(outputStream);
printWriter.write(stat.totalCount + "t" + stat.validCount);
printWriter.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
2)实现创建队列⽬录
每个队列都有⾃⼰的⽬录和配套的⽂件. 通过下列⽅法把⽬录和⽂件先准备好.
// 创建队列对应的文件和目录
public void createQueueFiles(String queueName) throws IOException {
// 1. 先创建队列对应的消息目录
File baseDir = new File(getQueueDir(queueName));
if (!baseDir.exists()) {
// 不存在, 就创建这个目录
boolean ok = baseDir.mkdirs();
if (!ok) {
throw new IOException("创建目录失败! baseDir=" + baseDir.getAbsolutePath());
}
}
// 2. 创建队列数据文件
File queueDataFile = new File(getQueueDataPath(queueName));
if (!queueDataFile.exists()) {
boolean ok = queueDataFile.createNewFile();
if (!ok) {
throw new IOException("创建文件失败! queueDataFile=" + queueDataFile.getAbsolutePath());
}
}
// 3. 创建消息统计文件
File queueStatFile = new File(getQueueStatPath(queueName));
if (!queueStatFile.exists()) {
boolean ok = queueStatFile.createNewFile();
if (!ok) {
throw new IOException("创建文件失败! queueStatFile=" + queueStatFile.getAbsolutePath());
}
}
// 4. 给消息统计文件, 设定初始值. 0t0
Stat stat = new Stat();
stat.totalCount = 0;
stat.validCount = 0;
writeStat(queueName, stat);
}
3)实现删除队列⽬录
public void destroyQueueFiles(String queueName) throws IOException {
// 先删除里面的文件, 再删除目录.
File queueDataFile = new File(getQueueDataPath(queueName));
boolean ok1 = queueDataFile.delete();
File queueStatFile = new File(getQueueStatPath(queueName));
boolean ok2 = queueStatFile.delete();
File baseDir = new File(getQueueDir(queueName));
boolean ok3 = baseDir.delete();
if (!ok1 || !ok2 || !ok3) {
// 有任意一个删除失败, 都算整体删除失败.
throw new IOException("删除队列目录和文件失败! baseDir=" + baseDir.getAbsolutePath());
}
}
注意: File 类的 delete ⽅法只能删除空⽬录. 因此需要先把内部的⽂件先删除掉.
4)检查队列⽂件是否存在
判定该队列的消息⽂件和统计⽂件是否存在. ⼀旦出现缺失, 则不能进⾏后续⼯作.
// 检查队列的目录和文件是否存在.
// 比如后续有生产者给 broker server 生产消息了, 这个消息就可能需要记录到文件上(取决于消息是否要持久化)
public boolean checkFilesExits(String queueName) {
// 判定队列的数据文件和统计文件是否都存在!!
File queueDataFile = new File(getQueueDataPath(queueName));
if (!queueDataFile.exists()) {
return false;
}
File queueStatFile = new File(getQueueStatPath(queueName));
if (!queueStatFile.exists()) {
return false;
}
return true;
}
5)实现消息对象序列化/反序列化
Message 对象需要转成⼆进制写⼊⽂件. 并且也需要把⽂件中的⼆进制读出来解析成 Message 对象. 此 处针对这⾥的逻辑进⾏封装.
创建 common.BinaryTool
public class BinaryTool {
// 把一个对象序列化成一个字节数组
public static byte[] toBytes(Object object) throws IOException {
// 这个流对象相当于一个变长的字节数组.
// 就可以把 object 序列化的数据给逐渐的写入到 byteArrayOutputStream 中, 再统一转成 byte[]
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
// 此处的 writeObject 就会把该对象进行序列化, 生成的二进制字节数据, 就会写入到
// ObjectOutputStream 中.
// 由于 ObjectOutputStream 又是关联到了 ByteArrayOutputStream, 最终结果就写入到 ByteArrayOutputStream 里了
objectOutputStream.writeObject(object);
}
// 这个操作就是把 byteArrayOutputStream 中持有的二进制数据取出来, 转成 byte[]
return byteArrayOutputStream.toByteArray();
}
}
// 把一个字节数组, 反序列化成一个对象
public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
Object object = null;
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)) {
try (ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
// 此处的 readObject, 就是从 data 这个 byte[] 中读取数据并进行反序列化.
object = objectInputStream.readObject();
}
}
return object;
}
}
6)实现写⼊消息⽂件
// 这个方法用来把一个新的消息, 放到队列对应的文件中.
// queue 表示要把消息写入的队列. message 则是要写的消息.
public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {
// 1. 检查一下当前要写入的队列对应的文件是否存在.
if (!checkFilesExits(queue.getName())) {
throw new MqException("[MessageFileManager] 队列对应的文件不存在! queueName=" + queue.getName());
}
// 2. 把 Message 对象, 进行序列化, 转成二进制的字节数组.
byte[] messageBinary = BinaryTool.toBytes(message);
synchronized (queue) {
// 3. 先获取到当前的队列数据文件的长度, 用这个来计算出该 Message 对象的 offsetBeg 和 offsetEnd
// 把新的 Message 数据, 写入到队列数据文件的末尾. 此时 Message 对象的 offsetBeg , 就是当前文件长度 + 4
// offsetEnd 就是当前文件长度 + 4 + message 自身长度.
File queueDataFile = new File(getQueueDataPath(queue.getName()));
// 通过这个方法 queueDataFile.length() 就能获取到文件的长度. 单位字节.
message.setOffsetBeg(queueDataFile.length() + 4);
message.setOffsetEnd(queueDataFile.length() + 4 + messageBinary.length);
// 4. 写入消息到数据文件, 注意, 是追加写入到数据文件末尾.
try (OutputStream outputStream = new FileOutputStream(queueDataFile, true)) {
try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
// 接下来要先写当前消息的长度, 占据 4 个字节的~~
dataOutputStream.writeInt(messageBinary.length);
// 写入消息本体
dataOutputStream.write(messageBinary);
}
}
// 5. 更新消息统计文件
Stat stat = readStat(queue.getName());
stat.totalCount += 1;
stat.validCount += 1;
writeStat(queue.getName(), stat);
}
}
• 考虑线程安全, 按照队列维度进⾏加锁.
• 使⽤ DataOutputStream 进⾏⼆进制写操作. ⽐原⽣ OutputStream 要⽅便.
• 需要记录 Message 对象在⽂件中的偏移量. 后续的删除操作依赖这个偏移量定位到消息. offsetBeg 是原有⽂件⼤⼩的基础上, 再 + 4. 4 个字节是存放消息⼤⼩的空间. (参考上⾯的图). • 写完消息, 要同时更新统计信息
创建 common.MqException , 作为⾃定义异常类. 后续业务上出现问题, 都统⼀抛出这个异常.
public class MqException extends Exception {
public MqException(String message) {
super(message);
}
}
7)实现删除消息
此处的删除只是 “逻辑删除”, 即把 Message 类中的 isValid 字段设置为 0.
这样删除速度⽐较快. 实际的彻底删除, 则通过我们⾃⼰实现的 GC 来解决.
// 这个是删除消息的方法.
// 这里的删除是逻辑删除, 也就是把硬盘上存储的这个数据里面的那个 isValid 属性, 设置成 0
// 1. 先把文件中的这一段数据, 读出来, 还原回 Message 对象;
// 2. 把 isValid 改成 0;
// 3. 把上述数据重新写回到文件.
// 此处这个参数中的 message 对象, 必须得包含有效的 offsetBeg 和 offsetEnd
public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException {
synchronized (queue) {
try (RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(queue.getName()), "rw")) {
// 1. 先从文件中读取对应的 Message 数据.
byte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];
randomAccessFile.seek(message.getOffsetBeg());
randomAccessFile.read(bufferSrc);
// 2. 把当前读出来的二进制数据, 转换回成 Message 对象
Message diskMessage = (Message) BinaryTool.fromBytes(bufferSrc);
// 3. 把 isValid 设置为无效.
diskMessage.setIsValid((byte) 0x0);
// 此处不需要给参数的这个 message 的 isValid 设为 0, 因为这个参数代表的是内存中管理的 Message 对象
// 而这个对象马上也要被从内存中销毁了.
// 4. 重新写入文件
byte[] bufferDest = BinaryTool.toBytes(diskMessage);
// 虽然上面已经 seek 过了, 但是上面 seek 完了之后, 进行了读操作, 这一读, 就导致, 文件光标往后移动, 移动到
// 下一个消息的位置了. 因此要想让接下来的写入, 能够刚好写回到之前的位置, 就需要重新调整文件光标.
randomAccessFile.seek(message.getOffsetBeg());
randomAccessFile.write(bufferDest);
// 通过上述这通折腾, 对于文件来说, 只是有一个字节发生改变而已了~~
}
// 不要忘了, 更新统计文件!! 把一个消息设为无效了, 此时有效消息个数就需要 - 1
Stat stat = readStat(queue.getName());
if (stat.validCount > 0) {
stat.validCount -= 1;
}
writeStat(queue.getName(), stat);
}
}
8)实现消息加载
// 使用这个方法, 从文件中, 读取出所有的消息内容, 加载到内存中(具体来说是放到一个链表里)
// 这个方法, 准备在程序启动的时候, 进行调用.
// 这里使用一个 LinkedList, 主要目的是为了后续进行头删操作.
// 这个方法的参数, 只是一个 queueName 而不是 MSGQueue 对象. 因为这个方法不需要加锁, 只使用 queueName 就够了.
// 由于该方法是在程序启动时调用, 此时服务器还不能处理请求呢~~ 不涉及多线程操作文件.
public LinkedList loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {
LinkedList messages = new LinkedList();
try (InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))) {
try (DataInputStream dataInputStream = new DataInputStream(inputStream)) {
// 这个变量记录当前文件光标.
long currentOffset = 0;
// 一个文件中包含了很多消息, 此处势必要循环读取.
while (true) {
// 1. 读取当前消息的长度, 这里的 readInt 可能会读到文件的末尾(EOF)
// readInt 方法, 读到文件末尾, 会抛出 EOFException 异常. 这一点和之前的很多流对象不太一样.
int messageSize = dataInputStream.readInt();
// 2. 按照这个长度, 读取消息内容
byte[] buffer = new byte[messageSize];
int actualSize = dataInputStream.read(buffer);
if (messageSize != actualSize) {
// 如果不匹配, 说明文件有问题, 格式错乱了!!
throw new MqException("[MessageFileManager] 文件格式错误! queueName=" + queueName);
}
// 3. 把这个读到的二进制数据, 反序列化回 Message 对象
Message message = (Message) BinaryTool.fromBytes(buffer);
// 4. 判定一下看看这个消息对象, 是不是无效对象.
if (message.getIsValid() != 0x1) {
// 无效数据, 直接跳过.
// 虽然消息是无效数据, 但是 offset 不要忘记更新.
currentOffset += (4 + messageSize);
continue;
}
// 5. 有效数据, 则需要把这个 Message 对象加入到链表中. 加入之前还需要填写 offsetBeg 和 offsetEnd
// 进行计算 offset 的时候, 需要知道当前文件光标的位置的. 由于当下使用的 DataInputStream 并不方便直接获取到文件光标位置
// 因此就需要手动计算下文件光标.
message.setOffsetBeg(currentOffset + 4);
message.setOffsetEnd(currentOffset + 4 + messageSize);
currentOffset += (4 + messageSize);
messages.add(message);
}
} catch (EOFException e) {
// 这个 catch 并非真是处理 "异常", 而是处理 "正常" 的业务逻辑. 文件读到末尾, 会被 readInt 抛出该异常.
// 这个 catch 语句中也不需要做啥特殊的事情
System.out.println("[MessageFileManager] 恢复 Message 数据完成!");
}
}
return messages;
}
9)实现垃圾回收(GC)
上述删除操作, 只是把消息在⽂件上标记成了⽆效. 并没有腾出硬盘空间. 最终⽂件⼤⼩可能会越积越 多. 因此需要定期的进⾏批量清除. 此处使⽤类似于复制算法. 当总消息数超过 2000, 并且有效消息数⽬少于 50% 的时候, 就触发 GC. GC 的时候会把所有有效消息加载出来, 写⼊到⼀个新的消息⽂件中, 使⽤新⽂件, 代替旧⽂件即可.
// 检查当前是否要针对该队列的消息数据文件进行 GC
public boolean checkGC(String queueName) {
// 判定是否要 GC, 是根据总消息数和有效消息数. 这两个值都是在 消息统计文件 中的.
Stat stat = readStat(queueName);
if (stat.totalCount > 2000 && (double)stat.validCount / (double)stat.totalCount messages = loadAllMessageFromQueue(queue.getName());
// 3. 把有效消息, 写入到新的文件中.
try (OutputStream outputStream = new FileOutputStream(queueDataNewFile)) {
try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
for (Message message : messages) {
byte[] buffer = BinaryTool.toBytes(message);
// 先写四个字节消息的长度
dataOutputStream.writeInt(buffer.length);
dataOutputStream.write(buffer);
}
}
}
// 4. 删除旧的数据文件, 并且把新的文件进行重命名
File queueDataOldFile = new File(getQueueDataPath(queue.getName()));
ok = queueDataOldFile.delete();
if (!ok) {
throw new MqException("[MessageFileManager] 删除旧的数据文件失败! queueDataOldFile=" + queueDataOldFile.getAbsolutePath());
}
// 把 queue_data_new.txt => queue_data.txt
ok = queueDataNewFile.renameTo(queueDataOldFile);
if (!ok) {
throw new MqException("[MessageFileManager] 文件重命名失败! queueDataNewFile=" + queueDataNewFile.getAbsolutePath()
+ ", queueDataOldFile=" + queueDataOldFile.getAbsolutePath());
}
// 5. 更新统计文件
Stat stat = readStat(queue.getName());
stat.totalCount = messages.size();
stat.validCount = messages.size();
writeStat(queue.getName(), stat);
long gcEnd = System.currentTimeMillis();
System.out.println("[MessageFileManager] gc 执行完毕! queueName=" + queue.getName() + ", time="
+ (gcEnd - gcBeg) + "ms");
}
}
10)测试 MessageFileManager
MessageFileManagerTests
@SpringBootTest
public class MessageFileManagerTests {
private MessageFileManager messageFileManager = new MessageFileManager();
private static final String queueName1 = "testQueue1";
private static final String queueName2 = "testQueue2";
// 这个方法是每个用例执行之前的准备工作
@BeforeEach
public void setUp() throws IOException {
// 准备阶段, 创建出两个队列, 以备后用
messageFileManager.createQueueFiles(queueName1);
messageFileManager.createQueueFiles(queueName2);
}
// 这个方法就是每个用例执行完毕之后的收尾工作
@AfterEach
public void tearDown() throws IOException {
// 收尾阶段, 就把刚才的队列给干掉.
messageFileManager.destroyQueueFiles(queueName1);
messageFileManager.destroyQueueFiles(queueName2);
}
@Test
public void testCreateFiles() {
// 创建队列文件已经在上面 setUp 阶段执行过了. 此处主要是验证看看文件是否存在.
File queueDataFile1 = new File("./data/" + queueName1 + "/queue_data.txt");
Assertions.assertEquals(true, queueDataFile1.isFile());
File queueStatFile1 = new File("./data/" + queueName1 + "/queue_stat.txt");
Assertions.assertEquals(true, queueStatFile1.isFile());
File queueDataFile2 = new File("./data/" + queueName2 + "/queue_data.txt");
Assertions.assertEquals(true, queueDataFile2.isFile());
File queueStatFile2 = new File("./data/" + queueName2 + "/queue_stat.txt");
Assertions.assertEquals(true, queueStatFile2.isFile());
}
@Test
public void testReadWriteStat() {
MessageFileManager.Stat stat = new MessageFileManager.Stat();
stat.totalCount = 100;
stat.validCount = 50;
// 此处就需要使用反射的方式, 来调用 writeStat 和 readStat 了.
// Java 原生的反射 API 其实非常难用~~
// 此处使用 Spring 帮我们封装好的 反射 的工具类.
ReflectionTestUtils.invokeMethod(messageFileManager, "writeStat", queueName1, stat);
// 写入完毕之后, 再调用一下读取, 验证读取的结果和写入的数据是一致的.
MessageFileManager.Stat newStat = ReflectionTestUtils.invokeMethod(messageFileManager, "readStat", queueName1);
Assertions.assertEquals(100, newStat.totalCount);
Assertions.assertEquals(50, newStat.validCount);
System.out.println("测试 readStat 和 writeStat 完成!");
}
private MSGQueue createTestQueue(String queueName) {
MSGQueue queue = new MSGQueue();
queue.setName(queueName);
queue.setDurable(true);
return queue;
}
private Message createTestMessage(String content) {
Message message = Message.createMessageWithId("testRoutingKey", null, content.getBytes());
return message;
}
@Test
public void testSendMessage() throws IOException, MqException, ClassNotFoundException {
// 构造出消息, 并且构造出队列.
Message message = createTestMessage("testMessage");
// 此处创建的 queue 对象的 name, 不能随便写, 只能用 queueName1 和 queueName2. 需要保证这个队列对象
// 对应的目录和文件啥的都存在才行.
MSGQueue queue = createTestQueue(queueName1);
// 调用发送消息方法
messageFileManager.sendMessage(queue, message);
// 检查 stat 文件.
MessageFileManager.Stat stat = ReflectionTestUtils.invokeMethod(messageFileManager, "readStat", queueName1);
Assertions.assertEquals(1, stat.totalCount);
Assertions.assertEquals(1, stat.validCount);
// 检查 data 文件
LinkedList messages = messageFileManager.loadAllMessageFromQueue(queueName1);
Assertions.assertEquals(1, messages.size());
Message curMessage = messages.get(0);
Assertions.assertEquals(message.getMessageId(), curMessage.getMessageId());
Assertions.assertEquals(message.getRoutingKey(), curMessage.getRoutingKey());
Assertions.assertEquals(message.getDeliverMode(), curMessage.getDeliverMode());
// 比较两个字节数组的内容是否相同, 不能直接使用 assertEquals 了.
Assertions.assertArrayEquals(message.getBody(), curMessage.getBody());
System.out.println("message: " + curMessage);
}
@Test
public void testLoadAllMessageFromQueue() throws IOException, MqException, ClassNotFoundException {
// 往队列中插入 100 条消息, 然后验证看看这 100 条消息从文件中读取之后, 是否和最初是一致的.
MSGQueue queue = createTestQueue(queueName1);
List expectedMessages = new LinkedList();
for (int i = 0; i actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);
Assertions.assertEquals(expectedMessages.size(), actualMessages.size());
for (int i = 0; i expectedMessages = new LinkedList();
for (int i = 0; i actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);
Assertions.assertEquals(7, actualMessages.size());
for (int i = 0; i expectedMessages = new LinkedList();
for (int i = 0; i actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);
Assertions.assertEquals(50, actualMessages.size());
for (int i = 0; i afterGCLength);
}
}
七. 整合数据库和⽂件
上述代码中, 使⽤数据库存储了 Exchange, Queue, Binding, 使⽤⽂本⽂件存储了 Message. 接下来我们把两个部分整合起来, 统⼀进⾏管理.
创建 DiskDataCenter 使⽤ DiskDataCenter 来综合管理数据库和⽂本⽂件的内容. DiskDataCenter 会持有 DataBaseManager 和 MessageFileManager 对象.
public class DiskDataCenter {
// 这个实例用来管理数据库中的数据
private DataBaseManager dataBaseManager = new DataBaseManager();
// 这个实例用来管理数据文件中的数据
private MessageFileManager messageFileManager = new MessageFileManager();
public void init() {
// 针对上述两个实例进行初始化.
dataBaseManager.init();
// 当前 messageFileManager.init 是空的方法, 只是先列在这里, 一旦后续需要扩展, 就在这里进行初始化即可.
messageFileManager.init();
}
// 封装交换机操作
public void insertExchange(Exchange exchange) {
dataBaseManager.insertExchange(exchange);
}
public void deleteExchange(String exchangeName) {
dataBaseManager.deleteExchange(exchangeName);
}
public List selectAllExchanges() {
return dataBaseManager.selectAllExchanges();
}
// 封装队列操作
public void insertQueue(MSGQueue queue) throws IOException {
dataBaseManager.insertQueue(queue);
// 创建队列的同时, 不仅仅是把队列对象写到数据库中, 还需要创建出对应的目录和文件
messageFileManager.createQueueFiles(queue.getName());
}
public void deleteQueue(String queueName) throws IOException {
dataBaseManager.deleteQueue(queueName);
// 删除队列的同时, 不仅仅是把队列从数据库中删除, 还需要删除对应的目录和文件
messageFileManager.destroyQueueFiles(queueName);
}
public List selectAllQueues() {
return dataBaseManager.selectAllQueues();
}
// 封装绑定操作
public void insertBinding(Binding binding) {
dataBaseManager.insertBinding(binding);
}
public void deleteBinding(Binding binding) {
dataBaseManager.deleteBinding(binding);
}
public List selectAllBindings() {
return dataBaseManager.selectAllBindings();
}
// 封装消息操作
public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException {
messageFileManager.sendMessage(queue, message);
}
public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException, MqException {
messageFileManager.deleteMessage(queue, message);
if (messageFileManager.checkGC(queue.getName())) {
messageFileManager.gc(queue);
}
}
public LinkedList loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {
return messageFileManager.loadAllMessageFromQueue(queueName);
}
}
八. 内存数据结构设计
硬盘上存储数据, 只是为了实现 “持久化” 这样的效果. 但是实际的消息存储/转发, 还是主要靠内存的结 构. 对于 MQ 来说, 内存部分是更关键的, 内存速度更快, 可以达成更⾼的并发.
创建 MemoryDataCenter
mqserver.datacenter.MemoryDataCenter
public class MemoryDataCenter {
// key 是 exchangeName, value 是 Exchange 对象
private ConcurrentHashMap exchangeMap = new ConcurrentHashMap();
// key 是 queueName, value 是 MSGQueue 对象
private ConcurrentHashMap queueMap = new ConcurrentHashMap();
// 第一个 key 是 exchangeName, 第二个 key 是 queueName
private ConcurrentHashMap> bindingsMap = new ConcurrentHashMap();
// key 是 messageId, value 是 Message 对象
private ConcurrentHashMap messageMap = new ConcurrentHashMap();
// key 是 queueName, value 是一个 Message 的链表
private ConcurrentHashMap> queueMessageMap = new ConcurrentHashMap();
// 第一个 key 是 queueName, 第二个 key 是 messageId
private ConcurrentHashMap> queueMessageWaitAckMap = new ConcurrentHashMap();
public void init(){
}
}
• 使⽤四个哈希表, 管理 Exchange, Queue, Binding, Message.
• 使⽤⼀个哈希表 + 链表管理 队列 -> 消息 之间的关系.
• 使⽤⼀个哈希表 + 哈希表管理所有的未被确认的消息.
为了保证消息被正确消费了, 会使⽤两种⽅式进⾏确认. ⾃动 ACK 和 ⼿动 ACK. 其中⾃动 ACK 是指当消息被消费之后, 就会⽴即被销毁释放. 其中⼿动 ACK 是指当消息被消费之后, 由消费者主动调⽤⼀个 basicAck ⽅法, 进⾏主动确认. 服务器 收到这个确认之后, 才能真正销毁消息. 此处的 “未确认消息” 就是指在⼿动 ACK 模式下, 该消息还没有被调⽤ basicAck. 此时消息不能删除, 但是要和其他未消费的消息区分开. 于是另搞了个结构. 当后续 basicAck 到了, 就可以删除消息了.
1.封装 Exchange 、Queue 、Binding 、Message 方法
public void insertExchange(Exchange exchange) {
exchangeMap.put(exchange.getName(), exchange);
System.out.println("[MemoryDataCenter] 新交换机添加成功! exchangeName=" + exchange.getName());
}
public Exchange getExchange(String exchangeName) {
return exchangeMap.get(exchangeName);
}
public void deleteExchange(String exchangeName) {
exchangeMap.remove(exchangeName);
System.out.println("[MemoryDataCenter] 交换机删除成功! exchangeName=" + exchangeName);
}
public void insertQueue(MSGQueue queue) {
queueMap.put(queue.getName(), queue);
System.out.println("[MemoryDataCenter] 新队列添加成功! queueName=" + queue.getName());
}
public MSGQueue getQueue(String queueName) {
return queueMap.get(queueName);
}
public void deleteQueue(String queueName) {
queueMap.remove(queueName);
System.out.println("[MemoryDataCenter] 队列删除成功! queueName=" + queueName);
}
public void insertBinding(Binding binding) throws MqException {
// ConcurrentHashMap bindingMap = bindingsMap.get(binding.getExchangeName());
// if (bindingMap == null) {
// bindingMap = new ConcurrentHashMap();
// bindingsMap.put(binding.getExchangeName(), bindingMap);
// }
// 先使用 exchangeName 查一下, 对应的哈希表是否存在. 不存在就创建一个.
ConcurrentHashMap bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),
k -> new ConcurrentHashMap());
synchronized (bindingMap) {
// 再根据 queueName 查一下. 如果已经存在, 就抛出异常. 不存在才能插入.
if (bindingMap.get(binding.getQueueName()) != null) {
throw new MqException("[MemoryDataCenter] 绑定已经存在! exchangeName=" + binding.getExchangeName() +
", queueName=" + binding.getQueueName());
}
bindingMap.put(binding.getQueueName(), binding);
}
System.out.println("[MemoryDataCenter] 新绑定添加成功! exchangeName=" + binding.getExchangeName()
+ ", queueName=" + binding.getQueueName());
}
// 获取绑定, 写两个版本:
// 1. 根据 exchangeName 和 queueName 确定唯一一个 Binding
// 2. 根据 exchangeName 获取到所有的 Binding
public Binding getBinding(String exchangeName, String queueName) {
ConcurrentHashMap bindingMap = bindingsMap.get(exchangeName);
if (bindingMap == null) {
return null;
}
return bindingMap.get(queueName);
}
public ConcurrentHashMap getBindings(String exchangeName) {
return bindingsMap.get(exchangeName);
}
public void deleteBinding(Binding binding) throws MqException {
ConcurrentHashMap bindingMap = bindingsMap.get(binding.getExchangeName());
if (bindingMap == null) {
// 该交换机没有绑定任何队列. 报错.
throw new MqException("[MemoryDataCenter] 绑定不存在! exchangeName=" + binding.getExchangeName()
+ ", queueName=" + binding.getQueueName());
}
bindingMap.remove(binding.getQueueName());
System.out.println("[MemoryDataCenter] 绑定删除成功! exchangeName=" + binding.getExchangeName()
+ ", queueName=" + binding.getQueueName());
}
// 添加消息
public void addMessage(Message message) {
messageMap.put(message.getMessageId(), message);
System.out.println("[MemoryDataCenter] 新消息添加成功! messageId=" + message.getMessageId());
}
// 根据 id 查询消息
public Message getMessage(String messageId) {
return messageMap.get(messageId);
}
// 根据 id 删除消息
public void removeMessage(String messageId) {
messageMap.remove(messageId);
System.out.println("[MemoryDataCenter] 消息被移除! messageId=" + messageId);
}
// 发送消息到指定队列
public void sendMessage(MSGQueue queue, Message message) {
// 把消息放到对应的队列数据结构中.
// 先根据队列的名字, 找到该队列对应的消息链表.
LinkedList messages = queueMessageMap.computeIfAbsent(queue.getName(), k -> new LinkedList());
// 再把数据加到 messages 里面
synchronized (messages) {
messages.add(message);
}
// 在这里把该消息也往消息中心中插入一下. 假设如果 message 已经在消息中心存在, 重复插入也没关系.
// 主要就是相同 messageId, 对应的 message 的内容一定是一样的. (服务器代码不会对 Message 内容做修改 basicProperties 和 body)
addMessage(message);
System.out.println("[MemoryDataCenter] 消息被投递到队列中! messageId=" + message.getMessageId());
}
// 从队列中取消息
public Message pollMessage(String queueName) {
// 根据队列名, 查找一下, 对应的队列的消息链表.
LinkedList messages = queueMessageMap.get(queueName);
if (messages == null) {
return null;
}
synchronized (messages) {
// 如果没找到, 说明队列中没有任何消息.
if (messages.size() == 0) {
return null;
}
// 链表中有元素, 就进行头删.
Message currentMessage = messages.remove(0);
System.out.println("[MemoryDataCenter] 消息从队列中取出! messageId=" + currentMessage.getMessageId());
return currentMessage;
}
}
// 获取指定队列中消息的个数
public int getMessageCount(String queueName) {
LinkedList messages = queueMessageMap.get(queueName);
if (messages == null) {
// 队列中没有消息
return 0;
}
synchronized (messages) {
return messages.size();
}
}
2.针对未确认的消息的处理
// 添加未确认的消息
public void addMessageWaitAck(String queueName, Message message) {
ConcurrentHashMap messageHashMap = queueMessageWaitAckMap.computeIfAbsent(queueName,
k -> new ConcurrentHashMap());
messageHashMap.put(message.getMessageId(), message);
System.out.println("[MemoryDataCenter] 消息进入待确认队列! messageId=" + message.getMessageId());
}
// 删除未确认的消息(消息已经确认了)
public void removeMessageWaitAck(String queueName, String messageId) {
ConcurrentHashMap messageHashMap = queueMessageWaitAckMap.get(queueName);
if (messageHashMap == null) {
return;
}
messageHashMap.remove(messageId);
System.out.println("[MemoryDataCenter] 消息从待确认队列删除! messageId=" + messageId);
}
3.实现重启后恢复内存
// 这个方法就是从硬盘上读取数据, 把硬盘中之前持久化存储的各个维度的数据都恢复到内存中.
public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException {
// 0. 清空之前的所有数据
exchangeMap.clear();
queueMap.clear();
bindingsMap.clear();
messageMap.clear();
queueMessageMap.clear();
// 1. 恢复所有的交换机数据
List exchanges = diskDataCenter.selectAllExchanges();
for (Exchange exchange : exchanges) {
exchangeMap.put(exchange.getName(), exchange);
}
// 2. 恢复所有的队列数据
List queues = diskDataCenter.selectAllQueues();
for (MSGQueue queue : queues) {
queueMap.put(queue.getName(), queue);
}
// 3. 恢复所有的绑定数据
List bindings = diskDataCenter.selectAllBindings();
for (Binding binding : bindings) {
ConcurrentHashMap bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),
k -> new ConcurrentHashMap());
bindingMap.put(binding.getQueueName(), binding);
}
// 4. 恢复所有的消息数据
// 遍历所有的队列, 根据每个队列的名字, 获取到所有的消息.
for (MSGQueue queue : queues) {
LinkedList messages = diskDataCenter.loadAllMessageFromQueue(queue.getName());
queueMessageMap.put(queue.getName(), messages);
for (Message message : messages) {
messageMap.put(message.getMessageId(), message);
}
}
// 注意!! 针对 "未确认的消息" 这部分内存中的数据, 不需要从硬盘恢复. 之前考虑硬盘存储的时候, 也没设定这一块.
// 一旦在等待 ack 的过程中, 服务器重启了, 此时这些 "未被确认的消息", 就恢复成 "未被取走的消息" .
// 这个消息在硬盘上存储的时候, 就是当做 "未被取走"
}
4.测试 MemoryDataCenter
创建 MemoryDataCenterTests
@SpringBootTest
public class MemoryDataCenterTests {
private MemoryDataCenter memoryDataCenter = null;
@BeforeEach
public void setUp() {
memoryDataCenter = new MemoryDataCenter();
}
@AfterEach
public void tearDown() {
memoryDataCenter = null;
}
// 创建一个测试交换机
private Exchange createTestExchange(String exchangeName) {
Exchange exchange = new Exchange();
exchange.setName(exchangeName);
exchange.setType(ExchangeType.DIRECT);
exchange.setDurable(true);
return exchange;
}
// 创建一个测试队列
private MSGQueue createTestQueue(String queueName) {
MSGQueue queue = new MSGQueue();
queue.setName(queueName);
queue.setDurable(true);
return queue;
}
// 针对交换机进行测试
@Test
public void testExchange() {
// 1. 先构造一个交换机并插入.
Exchange expectedExchange = createTestExchange("testExchange");
memoryDataCenter.insertExchange(expectedExchange);
// 2. 查询出这个交换机, 比较结果是否一致. 此处直接比较这俩引用指向同一个对象.
Exchange actualExchange = memoryDataCenter.getExchange("testExchange");
Assertions.assertEquals(expectedExchange, actualExchange);
// 3. 删除这个交换机
memoryDataCenter.deleteExchange("testExchange");
// 4. 再查一次, 看是否就查不到了
actualExchange = memoryDataCenter.getExchange("testExchange");
Assertions.assertNull(actualExchange);
}
// 针对队列进行测试
@Test
public void testQueue() {
// 1. 构造一个队列, 并插入
MSGQueue expectedQueue = createTestQueue("testQueue");
memoryDataCenter.insertQueue(expectedQueue);
// 2. 查询这个队列, 并比较
MSGQueue actualQueue = memoryDataCenter.getQueue("testQueue");
Assertions.assertEquals(expectedQueue, actualQueue);
// 3. 删除这个队列
memoryDataCenter.deleteQueue("testQueue");
// 4. 再次查询队列, 看是否能查到
actualQueue = memoryDataCenter.getQueue("testQueue");
Assertions.assertNull(actualQueue);
}
// 针对绑定进行测试
@Test
public void testBinding() throws MqException {
Binding expectedBinding = new Binding();
expectedBinding.setExchangeName("testExchange");
expectedBinding.setQueueName("testQueue");
expectedBinding.setBindingKey("testBindingKey");
memoryDataCenter.insertBinding(expectedBinding);
Binding actualBinding = memoryDataCenter.getBinding("testExchange", "testQueue");
Assertions.assertEquals(expectedBinding, actualBinding);
ConcurrentHashMap bindingMap = memoryDataCenter.getBindings("testExchange");
Assertions.assertEquals(1, bindingMap.size());
Assertions.assertEquals(expectedBinding, bindingMap.get("testQueue"));
memoryDataCenter.deleteBinding(expectedBinding);
actualBinding = memoryDataCenter.getBinding("testExchange", "testQueue");
Assertions.assertNull(actualBinding);
}
private Message createTestMessage(String content) {
Message message = Message.createMessageWithId("testRoutingKey", null, content.getBytes());
return message;
}
@Test
public void testMessage() {
Message expectedMessage = createTestMessage("testMessage");
memoryDataCenter.addMessage(expectedMessage);
Message actualMessage = memoryDataCenter.getMessage(expectedMessage.getMessageId());
Assertions.assertEquals(expectedMessage, actualMessage);
memoryDataCenter.removeMessage(expectedMessage.getMessageId());
actualMessage = memoryDataCenter.getMessage(expectedMessage.getMessageId());
Assertions.assertNull(actualMessage);
}
@Test
public void testSendMessage() {
// 1. 创建一个队列, 创建 10 条消息, 把这些消息都插入队列中.
MSGQueue queue = createTestQueue("testQueue");
List expectedMessages = new ArrayList();
for (int i = 0; i actualMessages = new ArrayList();
while (true) {
Message message = memoryDataCenter.pollMessage("testQueue");
if (message == null) {
break;
}
actualMessages.add(message);
}
// 3. 比较取出的消息和之前的消息是否一致.
Assertions.assertEquals(expectedMessages.size(), actualMessages.size());
for (int i = 0; i
九. 虚拟主机设计
⾄此, 内存和硬盘的数据都已经组织完成. 接下来使⽤ “虚拟主机” 这个概念, 把这两部分的数据也串起 来. 并且实现⼀些 MQ 的关键 API.
1.创建 VirtualHost
public class VirtualHost {
private String virtualHostName;
private MemoryDataCenter memoryDataCenter = new MemoryDataCenter();
private DiskDataCenter diskDataCenter = new DiskDataCenter();
private Router router = new Router();
private ConsumerManager consumerManager = new ConsumerManager(this);
}
2.实现构造⽅法和 getter
构造⽅法中会针对 DiskDataCenter 和 MemoryDataCenter 进⾏初始化. 同时会把硬盘的数据恢复到内存中.
public String getVirtualHostName() {
return virtualHostName;
}
public MemoryDataCenter getMemoryDataCenter() {
return memoryDataCenter;
}
public DiskDataCenter getDiskDataCenter() {
return diskDataCenter;
}
public VirtualHost(String name) {
this.virtualHostName = name;
// 对于 MemoryDataCenter 来说, 不需要额外的初始化操作的. 只要对象 new 出来就行了
// 但是, 针对 DiskDataCenter 来说, 则需要进行初始化操作. 建库建表和初始数据的设定.
diskDataCenter.init();
// 另外还需要针对硬盘的数据, 进行恢复到内存中.
try {
memoryDataCenter.recovery(diskDataCenter);
} catch (IOException | MqException | ClassNotFoundException e) {
e.printStackTrace();
System.out.println("[VirtualHost] 恢复内存数据失败!");
}
}
3.创建交换机
约定, 交换机/队列的名字, 都加上 VirtualHostName 作为前缀. 这样不同 VirtualHost 中就可以存在 同名的交换机或者队列了.
exchangeDeclare 的语义是, 不存在就创建, 存在则直接返回. 因此不叫做 “exchangeCreate”. 先写硬盘, 后写内存. 因为写硬盘失败概率更⼤. 如果硬盘写失败了, 也就不必写内存了
// 创建交换机
// 如果交换机不存在, 就创建. 如果存在, 直接返回.
// 返回值是 boolean. 创建成功, 返回 true. 失败返回 false
public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable) {
// 把交换机的名字, 加上虚拟主机作为前缀.
exchangeName = virtualHostName + exchangeName;
try {
synchronized (exchangeLocker) {
// 1. 判定该交换机是否已经存在. 直接通过内存查询.
Exchange existsExchange = memoryDataCenter.getExchange(exchangeName);
if (existsExchange != null) {
// 该交换机已经存在!
System.out.println("[VirtualHost] 交换机已经存在! exchangeName=" + exchangeName);
return true;
}
// 2. 真正创建交换机. 先构造 Exchange 对象
Exchange exchange = new Exchange();
exchange.setName(exchangeName);
exchange.setType(exchangeType);
exchange.setDurable(durable);
// 3. 把交换机对象写入硬盘
if (durable) {
diskDataCenter.insertExchange(exchange);
}
// 4. 把交换机对象写入内存
memoryDataCenter.insertExchange(exchange);
System.out.println("[VirtualHost] 交换机创建完成! exchangeName=" + exchangeName);
// 上述逻辑, 先写硬盘, 后写内存. 目的就是因为硬盘更容易写失败. 如果硬盘写失败了, 内存就不写了.
// 要是先写内存, 内存写成功了, 硬盘写失败了, 还需要把内存的数据给再删掉. 就比较麻烦了.
}
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] 交换机创建失败! exchangeName=" + exchangeName);
e.printStackTrace();
return false;
}
}
4.删除交换机
// 删除交换机
public boolean exchangeDelete(String exchangeName) {
exchangeName = virtualHostName + exchangeName;
try {
synchronized (exchangeLocker) {
// 1. 先找到对应的交换机.
Exchange toDelete = memoryDataCenter.getExchange(exchangeName);
if (toDelete == null) {
throw new MqException("[VirtualHost] 交换机不存在无法删除!");
}
// 2. 删除硬盘上的数据
if (toDelete.isDurable()) {
diskDataCenter.deleteExchange(exchangeName);
}
// 3. 删除内存中的交换机数据
memoryDataCenter.deleteExchange(exchangeName);
System.out.println("[VirtualHost] 交换机删除成功! exchangeName=" + exchangeName);
}
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] 交换机删除失败! exchangeName=" + exchangeName);
e.printStackTrace();
return false;
}
}
5.创建队列
// 创建队列
public boolean queueDeclare(String queueName, boolean durable) {
// 把队列的名字, 给拼接上虚拟主机的名字.
queueName = virtualHostName + queueName;
try {
synchronized (queueLocker) {
// 1. 判定队列是否存在
MSGQueue existsQueue = memoryDataCenter.getQueue(queueName);
if (existsQueue != null) {
System.out.println("[VirtualHost] 队列已经存在! queueName=" + queueName);
return true;
}
// 2. 创建队列对象
MSGQueue queue = new MSGQueue();
queue.setName(queueName);
queue.setDurable(durable);
// 3. 写硬盘
if (durable) {
diskDataCenter.insertQueue(queue);
}
// 4. 写内存
memoryDataCenter.insertQueue(queue);
System.out.println("[VirtualHost] 队列创建成功! queueName=" + queueName);
}
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] 队列创建失败! queueName=" + queueName);
e.printStackTrace();
return false;
}
}
6.删除队列
// 删除队列
public boolean queueDelete(String queueName) {
queueName = virtualHostName + queueName;
try {
synchronized (queueLocker) {
// 1. 根据队列名字, 查询下当前的队列对象
MSGQueue queue = memoryDataCenter.getQueue(queueName);
if (queue == null) {
throw new MqException("[VirtualHost] 队列不存在! 无法删除! queueName=" + queueName);
}
// 2. 删除硬盘数据
if (queue.isDurable()) {
diskDataCenter.deleteQueue(queueName);
}
// 3. 删除内存数据
memoryDataCenter.deleteQueue(queueName);
System.out.println("[VirtualHost] 删除队列成功! queueName=" + queueName);
}
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] 删除队列失败! queueName=" + queueName);
e.printStackTrace();
return false;
}
}
7.创建绑定
bindingKey 是进⾏ topic 转发时的⼀个关键概念. 使⽤ router 类来检测是否是合法的 bindingKey.
public boolean queueBind(String queueName, String exchangeName, String bindingKey) {
queueName = virtualHostName + queueName;
exchangeName = virtualHostName + exchangeName;
try {
synchronized (exchangeLocker) {
synchronized (queueLocker) {
// 1. 判定当前的绑定是否已经存在了.
Binding existsBinding = memoryDataCenter.getBinding(exchangeName, queueName);
if (existsBinding != null) {
throw new MqException("[VirtualHost] binding 已经存在! queueName=" + queueName
+ ", exchangeName=" + exchangeName);
}
// 2. 验证 bindingKey 是否合法.
if (!router.checkBindingKey(bindingKey)) {
throw new MqException("[VirtualHost] bindingKey 非法! bindingKey=" + bindingKey);
}
// 3. 创建 Binding 对象
Binding binding = new Binding();
binding.setExchangeName(exchangeName);
binding.setQueueName(queueName);
binding.setBindingKey(bindingKey);
// 4. 获取一下对应的交换机和队列. 如果交换机或者队列不存在, 这样的绑定也是无法创建的.
MSGQueue queue = memoryDataCenter.getQueue(queueName);
if (queue == null) {
throw new MqException("[VirtualHost] 队列不存在! queueName=" + queueName);
}
Exchange exchange = memoryDataCenter.getExchange(exchangeName);
if (exchange == null) {
throw new MqException("[VirtualHost] 交换机不存在! exchangeName=" + exchangeName);
}
// 5. 先写硬盘
if (queue.isDurable() && exchange.isDurable()) {
diskDataCenter.insertBinding(binding);
}
// 6. 写入内存
memoryDataCenter.insertBinding(binding);
}
}
System.out.println("[VirtualHost] 绑定创建成功! exchangeName=" + exchangeName
+ ", queueName=" + queueName);
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] 绑定创建失败! exchangeName=" + exchangeName
+ ", queueName=" + queueName);
e.printStackTrace();
return false;
}
}
8.删除绑定
public boolean queueUnbind(String queueName, String exchangeName) {
queueName = virtualHostName + queueName;
exchangeName = virtualHostName + exchangeName;
try {
synchronized (exchangeLocker) {
synchronized (queueLocker) {
// 1. 获取 binding 看是否已经存在~
Binding binding = memoryDataCenter.getBinding(exchangeName, queueName);
if (binding == null) {
throw new MqException("[VirtualHost] 删除绑定失败! 绑定不存在! exchangeName=" + exchangeName + ", queueName=" + queueName);
}
// 2. 无论绑定是否持久化了, 都尝试从硬盘删一下. 就算不存在, 这个删除也无副作用.
diskDataCenter.deleteBinding(binding);
// 3. 删除内存的数据
memoryDataCenter.deleteBinding(binding);
System.out.println("[VirtualHost] 删除绑定成功!");
}
}
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] 删除绑定失败!");
e.printStackTrace();
return false;
}
}
9.发布消息
• 发布消息其实是把消息发送给指定的 Exchange, 再根据 Exchange 和 Queue 的 Binding 关系, 转发 到对应队列中.
• 发送消息需要指定 routingKey, 这个值的作⽤和 ExchangeType 是相关的.
◦ Direct: routingKey 就是对应队列的名字. 此时不需要 binding 关系, 也不需要 bindingKey, 就可 以直接转发消息.
◦ Fanout: routingKey 不起作⽤, bindingKey 也不起作⽤. 此时消息会转发给绑定到该交换机上的 所有队列中.
◦ Topic: routingKey 是⼀个特定的字符串, 会和 bindingKey 进⾏匹配. 如果匹配成功, 则发到对应 的队列中. 具体规则后续介绍.
• BasicProperties 是消息的元信息. body 是消息本体.
// 发送消息到指定的交换机/队列中.
public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {
try {
// 1. 转换交换机的名字
exchangeName = virtualHostName + exchangeName;
// 2. 检查 routingKey 是否合法.
if (!router.checkRoutingKey(routingKey)) {
throw new MqException("[VirtualHost] routingKey 非法! routingKey=" + routingKey);
}
// 3. 查找交换机对象
Exchange exchange = memoryDataCenter.getExchange(exchangeName);
if (exchange == null) {
throw new MqException("[VirtualHost] 交换机不存在! exchangeName=" + exchangeName);
}
// 4. 判定交换机的类型
if (exchange.getType() == ExchangeType.DIRECT) {
// 按照直接交换机的方式来转发消息
// 以 routingKey 作为队列的名字, 直接把消息写入指定的队列中.
// 此时, 可以无视绑定关系.
String queueName = virtualHostName + routingKey;
// 5. 构造消息对象
Message message = Message.createMessageWithId(routingKey, basicProperties, body);
// 6. 查找该队列名对应的对象
MSGQueue queue = memoryDataCenter.getQueue(queueName);
if (queue == null) {
throw new MqException("[VirtualHost] 队列不存在! queueNa服务器托管网me=" + queueName);
}
// 7. 队列存在, 直接给队列中写入消息
sendMessage(queue, message);
} else {
// 按照 fanout 和 topic 的方式来转发.
// 5. 找到该交换机关联的所有绑定, 并遍历这些绑定对象
ConcurrentHashMap bindingsMap = memoryDataCenter.getBindings(exchangeName);
for (Map.Entry entry : bindingsMap.entrySet()) {
// 1) 获取到绑定对象, 判定对应的队列是否存在
Binding binding = entry.getValue();
MSGQueue queue = memoryDataCenter.getQueue(binding.getQueueName());
if (queue == null) {
// 此处咱们就不抛出异常了. 可能此处有多个这样的队列.
// 希望不要因为一个队列的失败, 影响到其他队列的消息的传输.
System.out.println("[VirtualHost] basicPublish 发送消息时, 发现队列不存在! queueName=" + binding.getQueueName());
continue;
}
// 2) 构造消息对象
Message message = Message.createMessageWithId(routingKey, basicProperties, body);
// 3) 判定这个消息是否能转发给该队列.
// 如果是 fanout, 所有绑定的队列都要转发的.
// 如果是 topic, 还需要判定下, bindingKey 和 routingKey 是不是匹配.
if (!router.route(exchange.getType(), binding, message)) {
continue;
}
// 4) 真正转发消息给队列
sendMessage(queue, message);
}
}
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] 消息发送失败!");
e.printStackTrace();
return false;
}
}
private void sendMessage(MSGQueue queue, Message message) throws IOException, MqException, InterruptedException {
// 此处发送消息, 就是把消息写入到 硬盘 和 内存 上.
int deliverMode = message.getDeliverMode();
// deliverMode 为 1 , 不持久化. deliverMode 为 2 表示持久化.
if (deliverMode == 2) {
diskDataCenter.sendMessage(queue, message);
}
// 写入内存
memoryDataCenter.sendMessage(queue, message);
// 此处还需要补充一个逻辑, 通知消费者可以消费消息了.
consumerManager.notifyConsume(queue.getName());
}
10.路由规则
1) 实现 route ⽅法
public class Router {
public boolean route(ExchangeType exchangeType, Binding binding, Message message) throws MqException {
// 根据不同的 exchangeType 使用不同的判定转发规则.
if (exchangeType == ExchangeType.FANOUT) {
// 如果是 FANOUT 类型, 则该交换机上绑定的所有队列都需要转发
return true;
} else if (exchangeType == ExchangeType.TOPIC) {
// 如果是 TOPIC 主题交换机, 规则就要更复杂一些.
return routeTopic(binding, message);
} else {
// 其他情况是不应该存在的.
throw new MqException("[Router] 交换机类型非法! exchangeType=" + exchangeType);
}
}
}
2) 实现 checkRoutingKeyValid
// routingKey 的构造规则:
// 1. 数字, 字母, 下划线
// 2. 使用 . 分割成若干部分
public boolean checkRoutingKey(String routingKey) {
if (routingKey.length() == 0) {
// 空字符串. 合法的情况. 比如在使用 fanout 交换机的时候, routingKey 用不上, 就可以设为 ""
return true;
}
for (int i = 0; i = 'A' && ch = 'a' && ch = '0' && ch
3) 实现 checkBindingKeyValid
// bindingKey 的构造规则:
// 1. 数字, 字母, 下划线
// 2. 使用 . 分割成若干部分
// 3. 允许存在 * 和 # 作为通配符. 但是通配符只能作为独立的分段.
public boolean checkBindingKey(String bindingKey) {
if (bindingKey.length() == 0) {
// 空字符串, 也是合法情况. 比如在使用 direct / fanout 交换机的时候, bindingKey 是用不上的.
return true;
}
// 检查字符串中不能存在非法字符
for (int i = 0; i = 'A' && ch = 'a' && ch = '0' && ch 1 并且包含了 * 或者 # , 就是非法的格式了.
if (word.length() > 1 && (word.contains("*") || word.contains("#"))) {
return false;
}
}
// 约定一下, 通配符之间的相邻关系(人为(俺)约定的).
// 为啥这么约定? 因为前三种相邻的时候, 实现匹配的逻辑会非常繁琐, 同时功能性提升不大~~
// 1. aaa.#.#.bbb => 非法
// 2. aaa.#.*.bbb => 非法
// 3. aaa.*.#.bbb => 非法
// 4. aaa.*.*.bbb => 合法
for (int i = 0; i
4) 实现 routeTopic
private boolean routeTopic(Binding binding, Message message) {
// 先把这两个 key 进行切分
String[] bindingTokens = binding.getBindingKey().split("\.");
String[] routingTokens = message.getRoutingKey().split("\.");
// 引入两个下标, 指向上述两个数组. 初始情况下都为 0
int bindingIndex = 0;
int routingIndex = 0;
// 此处使用 while 更合适, 每次循环, 下标不一定就是 + 1, 不适合使用 for
while (bindingIndex
6) 测试 Router
@SpringBootTest
public class RouterTests {
private Router router = new Router();
private Binding binding = null;
private Message message = null;
@BeforeEach
public void setUp() {
binding = new Binding();
message = new Message();
}
@AfterEach
public void tearDown() {
binding = null;
message = null;
}
// [测试用例]
// binding key routing key result
// aaa aaa true
// aaa.bbb aaa.bbb true
// aaa.bbb aaa.bbb.ccc false
// aaa.bbb aaa.ccc false
// aaa.bbb.ccc aaa.bbb.ccc true
// aaa.* aaa.bbb true
// aaa.*.bbb aaa.bbb.ccc false
// *.aaa.bbb aaa.bbb false
// # aaa.bbb.ccc true
// aaa.# aaa.bbb true
// aaa.# aaa.bbb.ccc true
// aaa.#.ccc aaa.ccc true
// aaa.#.ccc aaa.bbb.ccc true
// aaa.#.ccc aaa.aaa.bbb.ccc true
// #.ccc ccc true
// #.ccc aaa.bbb.ccc true
@Test
public void test1() throws MqException {
binding.setBindingKey("aaa");
message.setRoutingKey("aaa");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test2() throws MqException {
binding.setBindingKey("aaa.bbb");
message.setRoutingKey("aaa.bbb");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test3() throws MqException {
binding.setBindingKey("aaa.bbb");
message.setRoutingKey("aaa.bbb.ccc");
Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test4() throws MqException {
binding.setBindingKey("aaa.bbb");
message.setRoutingKey("aaa.ccc");
Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test5() throws MqException {
binding.setBindingKey("aaa.bbb.ccc");
message.setRoutingKey("aaa.bbb.ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test6() throws MqException {
binding.setBindingKey("aaa.*");
message.setRoutingKey("aaa.bbb");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test7() throws MqException {
binding.setBindingKey("aaa.*.bbb");
message.setRoutingKey("aaa.bbb.ccc");
Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test8() throws MqException {
binding.setBindingKey("*.aaa.bbb");
message.setRoutingKey("aaa.bbb");
Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test9() throws MqException {
binding.setBindingKey("#");
message.setRoutingKey("aaa.bbb.ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test10() throws MqException {
binding.setBindingKey("aaa.#");
message.setRoutingKey("aaa.bbb");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test11() throws MqException {
binding.setBindingKey("aaa.#");
message.setRoutingKey("aaa.bbb.ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test12() throws MqException {
binding.setBindingKey("aaa.#.ccc");
message.setRoutingKey("aaa.ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test13() throws MqException {
binding.setBindingKey("aaa.#.ccc");
message.setRoutingKey("aaa.bbb.ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test14() throws MqException {
binding.setBindingKey("aaa.#.ccc");
message.setRoutingKey("aaa.aaa.bbb.ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test15() throws MqException {
binding.setBindingKey("#.ccc");
message.setRoutingKey("ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
@Test
public void test16() throws MqException {
binding.setBindingKey("#.ccc");
message.setRoutingKey("aaa.bbb.ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
}
}
11.订阅消息
1) 添加⼀个订阅者
// 订阅消息.
// 添加一个队列的订阅者, 当队列收到消息之后, 就要把消息推送给对应的订阅者.
// consumerTag: 消费者的身份标识
// autoAck: 消息被消费完成后, 应答的方式. 为 true 自动应答. 为 false 手动应答.
// consumer: 是一个回调函数. 此处类型设定成函数式接口. 这样后续调用 basicConsume 并且传实参的时候, 就可以写作 lambda 样子了.
public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {
// 构造一个 ConsumerEnv 对象, 把这个对应的队列找到, 再把这个 Consumer 对象添加到该队列中.
queueName = virtualHostName + queueName;
try {
consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);
System.out.println("[VirtualHost] basicConsume 成功! queueName=" + queueName);
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] basicConsume 失败! queueName=" + queueName);
e.printStackTrace();
return false;
}
}
Consumer 相当于⼀个回调函数. 放到 common.Consumer 中.
@FunctionalInterface
public interface Consumer {
// Delivery 的意思是 "投递", 这个方法预期是在每次服务器收到消息之后, 来调用.
// 通过这个方法把消息推送给对应的消费者.
// (注意! 这里的方法名和参数, 也都是参考 RabbitMQ 展开的)
void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException;
}
2) 创建订阅者管理管理类
创建 mqserver.core.ConsumerManager
public class ConsumerManager {
// 持有上层的 VirtualHost 对象的引用. 用来操作数据.
private VirtualHost parent;
// 指定一个线程池, 负责去执行具体的回调任务.
private ExecutorService workerPool = Executors.newFixedThreadPool(4);
// 存放令牌的队列
private BlockingQueue tokenQueue = new LinkedBlockingQueue();
}
• parent ⽤来记录虚拟主机.
• 使⽤⼀个阻塞队列⽤来触发消息消费. 称为令牌队列. 每次有消息过来了, 都往队列中放⼀个令牌(也 就是队列名), 然后消费者再去消费对应队列的消息.
• 使⽤⼀个线程池⽤来执⾏消息回调. 这样令牌队列的设定避免搞出来太多线程. 否则就需要给每个队列都安排⼀个单独的线程了, 如果队 列很多则开销就⽐较⼤了.
3) 添加令牌接⼝
// 通知消费者去消费消息
public void notifyConsume(String queueName) throws InterruptedException {
tokenQueue.put(queueName);
}
4) 实现添加订阅者
public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {
// 找到对应的队列.
MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);
if (queue == null) {
throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName);
}
ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag, queueName, autoAck, consumer);
synchronized (queue) {
queue.addConsumerEnv(consumerEnv);
// 如果当前队列中已经有了一些消息了, 需要立即就消费掉.
int n = parent.getMemoryDataCenter().getMessageCount(queueName);
for (int i = 0; i
创建 ConsumerEnv , 这个类表⽰⼀个订阅者的执⾏环境.
public class ConsumerEnv {
private String consumerTag;
private String queueName;
private boolean autoAck;
// 通过这个回调来处理收到的消息.
private Consumer consumer;
public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {
this.consumerTag = consumerTag;
this.queueName = queueName;
this.autoAck = autoAck;
this.consumer = consumer;
}
}
5) 实现扫描线程
在 ConsumerManager 中创建⼀个线程, 不停的尝试扫描令牌队列. 如果拿到了令牌, 就真正触发消费消 息操作.
public ConsumerManager(VirtualHost p) {
parent = p;
scannerThread = new Thread(() -> {
while (true) {
try {
// 1. 拿到令牌
String queueName = tokenQueue.take();
// 2. 根据令牌, 找到队列
MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);
if (queue == null) {
throw new MqException("[ConsumerManager] 取令牌后发现, 该队列名不存在! queueName=" + queueName);
}
// 3. 从这个队列中消费一个消息.
synchronized (queue) {
consumeMessage(queue);
}
} catch (InterruptedException | MqException e) {
e.printStackTrace();
}
}
});
// 把线程设为后台线程.
scannerThread.setDaemon(true);
scannerThread.start();
}
6) 实现消费消息
所谓的消费消息, 其实就是调⽤消息的回调. 并把消息删除掉.
private void consumeMessage(MSGQueue queue) {
// 1. 按照轮询的方式, 找个消费者出来.
ConsumerEnv luckyDog = queue.chooseConsumer();
if (luckyDog == null) {
// 当前队列没有消费者, 暂时不消费. 等后面有消费者出现再说.
return;
}
// 2. 从队列中取出一个消息
Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());
if (message == null) {
// 当前队列中还没有消息, 也不需要消费.
return;
}
// 3. 把消息带入到消费者的回调方法中, 丢给线程池执行.
workerPool.submit(() -> {
try {
// 1. 把消息放到待确认的集合中. 这个操作势必在执行回调之前.
parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);
// 2. 真正执行回调操作
luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(),
message.getBody());
// 3. 如果当前是 "自动应答" , 就可以直接把消息删除了.
// 如果当前是 "手动应答" , 则先不处理, 交给后续消费者调用 basicAck 方法来处理.
if (luckyDog.isAutoAck()) {
// 1) 删除硬盘上的消息
if (message.getDeliverMode() == 2) {
parent.getDiskDataCenter().deleteMessage(queue, message);
}
// 2) 删除上面的待确认集合中的消息
parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());
// 3) 删除内存中消息中心里的消息
parent.getMemoryDataCenter().removeMessage(message.getMessageId());
System.out.println("[ConsumerManager] 消息被成功消费! queueName=" + queue.getName());
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
7)⼩结
⼀. 消费消息的两种典型情况
1) 订阅者已经存在了, 才发送消息 这种直接获取队列的订阅者, 从中按照轮询的⽅式挑⼀个消费者来调⽤回调即可.
2) 消息先发送到队列了, 订阅者还没到. 此时当订阅者到达, 就快速把指定队列中的消息全都消费掉.
⼆. 关于消息不丢失的论证 每个消息在从内存队列中出队列时, 都会先进⼊ 待确认 中.
• 如果 autoAck 为 true 消息被消费完毕后(执⾏完消息回调之后), 再执⾏清除⼯作. 分别清除硬盘数据, 待确认队列, 消息中⼼.
• 如果 autoAck 为 false 在回调内部, 进⾏清除⼯作. 分别清除硬盘数据, 待确认队列, 消息中⼼.
1) 执⾏消息回调的时候抛出异常 此时消息仍然处在待确认队列中. 此时可以⽤⼀个线程扫描待确认队列, 如果发现队列中的消息超时未确认, 则放⼊死信队列. 死信队列咱们此处暂不实现.
2) 执⾏消息回调的时候服务器宕机 内存所有数据都没了, 但是消息在硬盘上仍然存在. 会在服务下次启动的时候, 加载回内存. 重新被消费.
12.消息确认
下列⽅法只是⼿动应答的时候才会使⽤. 应答成功, 则把消息删除掉.
public boolean basicAck(String queueName, String messageId) {
queueName = virtualHostName + queueName;
try {
// 1. 获取到消息和队列
Message message = memoryDataCenter.getMessage(messageId);
if (message == null) {
throw new MqException("[VirtualHost] 要确认的消息不存在! messageId=" + messageId);
}
MSGQueue queue = memoryDataCenter.getQueue(queueName);
if (queue == null) {
throw new MqException("[VirtualHost] 要确认的队列不存在! queueName=" + queueName);
}
// 2. 删除硬盘上的数据
if (message.getDeliverMode() == 2) {
diskDataCenter.deleteMessage(queue, message);
}
// 3. 删除消息中心中的数据
memoryDataCenter.removeMessage(messageId);
// 4. 删除待确认的集合中的数据
memoryDataCenter.removeMessageWaitAck(queueName, messageId);
System.out.println("[VirtualHost] basicAck 成功! 消息被成功确认! queueName=" + queueName
+ ", messageId=" + messageId);
return true;
} catch (Exception e) {
System.out.println("[VirtualHost] basicAck 失败! 消息确认失败! queueName=" + queueName
+ ", messageId=" + messageId);
e.printStackTrace();
return false;
}
}
13. 测试 VirtualHost
@SpringBootTest
public class VirtualHostTests {
private VirtualHost virtualHost = null;
@BeforeEach
public void setUp() {
MqApplication.context = SpringApplication.run(MqApplication.class);
virtualHost = new VirtualHost("default");
}
@AfterEach
public void tearDown() throws IOException {
MqApplication.context.close();
virtualHost = null;
// 把硬盘的目录删除掉
File dataDir = new File("./data");
FileUtils.deleteDirectory(dataDir);
}
@Test
public void testExchangeDeclare() {
boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
true);
Assertions.assertTrue(ok);
}
@Test
public void testExchangeDelete() {
boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
true);
Assertions.assertTrue(ok);
ok = virtualHost.exchangeDelete("testExchange");
Assertions.assertTrue(ok);
}
@Test
public void testQueueDeclare() {
boolean ok = virtualHost.queueDeclare("testQueue", true);
Assertions.assertTrue(ok);
}
@Test
public void testQueueDelete() {
boolean ok = virtualHost.queueDeclare("testQueue", true);
Assertions.assertTrue(ok);
ok = virtualHost.queueDelete("testQueue");
Assertions.assertTrue(ok);
}
@Test
public void testQueueBind() {
boolean ok = virtualHost.queueDeclare("testQueue", true);
Assertions.assertTrue(ok);
ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
true);
Assertions.assertTrue(ok);
ok = virtualHost.queueBind("testQueue", "testExchange", "testBindingKey");
Assertions.assertTrue(ok);
}
@Test
public void testQueueUnbind() {
boolean ok = virtualHost.queueDeclare("testQueue", true);
Assertions.assertTrue(ok);
ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
true);
Assertions.assertTrue(ok);
ok = virtualHost.queueBind("testQueue", "testExchange", "testBindingKey");
Assertions.assertTrue(ok);
ok = virtualHost.queueUnbind("testQueue", "testExchange");
Assertions.assertTrue(ok);
}
@Test
public void testBasicPublish() {
boolean ok = virtualHost.queueDeclare("testQueue", true);
Assertions.assertTrue(ok);
ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
true);
Assertions.assertTrue(ok);
ok = virtualHost.basicPublish("testExchange", "testQueue", null,
"hello".getBytes());
Assertions.assertTrue(ok);
}
// 先订阅队列, 后发送消息
@Test
public void testBasicConsume1() throws InterruptedException {
boolean ok = virtualHost.queueDeclare("testQueue", true);
Assertions.assertTrue(ok);
ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
true);
Assertions.assertTrue(ok);
// 先订阅队列
ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() {
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
try {
// 消费者自身设定的回调方法.
System.out.println("messageId=" + basicProperties.getMessageId());
System.out.println("body=" + new String(body, 0, body.length));
Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());
Assertions.assertEquals(1, basicProperties.getDeliverMode());
Assertions.assertArrayEquals("hello".getBytes(), body);
} catch (Error e) {
// 断言如果失败, 抛出的是 Error, 而不是 Exception!
e.printStackTrace();
System.out.println("error");
}
}
});
Assertions.assertTrue(ok);
Thread.sleep(500);
// 再发送消息
ok = virtualHost.basicPublish("testExchange", "testQueue", null,
"hello".getBytes());
Assertions.assertTrue(ok);
}
// 先发送消息, 后订阅队列.
@Test
public void testBasicConsume2() throws InterruptedException {
boolean ok = virtualHost.queueDeclare("testQueue", true);
Assertions.assertTrue(ok);
ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
true);
Assertions.assertTrue(ok);
// 先发送消息
ok = virtualHost.basicPublish("testExchange", "testQueue", null,
"hello".getBytes());
Assertions.assertTrue(ok);
// 再订阅队列
ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() {
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
// 消费者自身设定的回调方法.
System.out.println("messageId=" + basicProperties.getMessageId());
System.out.println("body=" + new String(body, 0, body.length));
Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());
Assertions.assertEquals(1, basicProperties.getDeliverMode());
Assertions.assertArrayEquals("hello".getBytes(), body);
}
});
Assertions.assertTrue(ok);
Thread.sleep(500);
}
@Test
public void testBasicConsumeFanout() throws InterruptedException {
boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.FANOUT, false);
Assertions.assertTrue(ok);
ok = virtualHost.queueDeclare("testQueue1", false);
Assertions.assertTrue(ok);
ok = virtualHost.queueBind("testQueue1", "testExchange", "");
Assertions.assertTrue(ok);
ok = virtualHost.queueDeclare("testQueue2", false);
Assertions.assertTrue(ok);
ok = virtualHost.queueBind("testQueue2", "testExchange", "");
Assertions.assertTrue(ok);
// 往交换机中发布一个消息
ok = virtualHost.basicPublish("testExchange", "", null, "hello".getBytes());
Assertions.assertTrue(ok);
Thread.sleep(500);
// 两个消费者订阅上述的两个队列.
ok = virtualHost.basicConsume("testConsumer1", "testQueue1", true, new Consumer() {
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
System.out.println("consumerTag=" + consumerTag);
System.out.println("messageId=" + basicProperties.getMessageId());
Assertions.assertArrayEquals("hello".getBytes(), body);
}
});
Assertions.assertTrue(ok);
ok = virtualHost.basicConsume("testConsumer2", "testQueue2", true, new Consumer() {
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
System.out.println("consumerTag=" + consumerTag);
System.out.println("messageId=" + basicProperties.getMessageId());
Assertions.assertArrayEquals("hello".getBytes(), body);
}
});
Assertions.assertTrue(ok);
Thread.sleep(500);
}
@Test
public void testBasicConsumeTopic() throws InterruptedException {
boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.TOPIC, false);
Assertions.assertTrue(ok);
ok = virtualHost.queueDeclare("testQueue", false);
Assertions.assertTrue(ok);
ok = virtualHost.queueBind("testQueue", "testExchange", "aaa.*.bbb");
Assertions.assertTrue(ok);
ok = virtualHost.basicPublish("testExchange", "aaa.ccc.bbb", null, "hello".getBytes());
Assertions.assertTrue(ok);
ok = virtualHost.basicConsume("testConsumer", "testQueue", true, new Consumer() {
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
System.out.println("consumerTag=" + consumerTag);
System.out.println("messageId=" + basicProperties.getMessageId());
Assertions.assertArrayEquals("hello".getBytes(), body);
}
});
Assertions.assertTrue(ok);
Thread.sleep(500);
}
@Test
public void testBasicAck() throws InterruptedException {
boolean ok = virtualHost.queueDeclare("testQueue", true);
Assertions.assertTrue(ok);
ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,
true);
Assertions.assertTrue(ok);
// 先发送消息
ok = virtualHost.basicPublish("testExchange", "testQueue", null,
"hello".getBytes());
Assertions.assertTrue(ok);
// 再订阅队列 [要改的地方, 把 autoAck 改成 false]
ok = virtualHost.basicConsume("testConsumerTag", "testQueue", false, new Consumer() {
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
// 消费者自身设定的回调方法.
System.out.println("messageId=" + basicProperties.getMessageId());
System.out.println("body=" + new String(body, 0, body.length));
Assertions.assertEquals("testQueue", basicProperties.getRoutingKey());
Assertions.assertEquals(1, basicProperties.getDeliverMode());
Assertions.assertArrayEquals("hello".getBytes(), body);
// [要改的地方, 新增手动调用 basicAck]
boolean ok = virtualHost.basicAck("testQueue", basicProperties.getMessageId());
Assertions.assertTrue(ok);
}
});
Assertions.assertTrue(ok);
Thread.sleep(500);
}
}
⼗. ⽹络通信协议设计
1.明确需求
⽣产者和消费者都是客⼾端, 都需要通过⽹络和 Broker Server 进⾏通信. 此处我们使⽤ TCP 协议, 来作为通信的底层协议. 同时在这个基础上⾃定义应⽤层协议, 完成客⼾端对服 务器这边功能的远程调⽤. 要调⽤的功能有:
• 创建 channel
• 关闭 channel
• 创建 exchange
• 删除 exchange
• 创建 queue
• 删除 queue
• 创建 binding
• 删除 binding
• 发送 message
• 订阅 message
• 发送 ack
• 返回 message (服务器 -> 客⼾端)
2.设计应⽤层协议
使⽤⼆进制的⽅式设定协议.
1)请求
2)响应
其中 type 表⽰请求响应不同的功能. 取值如下:
• 0x1 创建 channel
• 0x2 关闭 channel
• 0x3 创建 exchange
• 0x4 销毁 exchange
• 0x5 创建 queue
• 0x6 销毁 queue
• 0x7 创建 binding
• 0x8 销毁 binding
• 0x9 发送 message
• 0xa 订阅 message
• 0xb 返回 ack
• 0xc 服务器给客⼾端推送的消息. (被订阅的消息) 响应独有的.
其中 payload 部分, 会根据不同的 type, 存在不同的格式.
对于请求来说, payload 表⽰这次⽅法调⽤的各种参数信息.
对于响应来说, payload 表⽰这次⽅法调⽤的返回值.
3.定义 Request / Response
public class Request {
private int type;
private int length;
private byte[] payload;
//省略 getter setter
}
public class Response {
private int type;
private int length;
private byte[] payload;
//省略 getter setter
}
4.定义参数⽗类
构造⼀个类表⽰⽅法的参数, 作为 Request 的 payload. 不同的⽅法中, 参数形态各异, 但是有些信息是通⽤的, 使⽤⼀个⽗类表⽰出来. 具体每个⽅法的参数再 通过继承的⽅式体现.
public class BaseArguments implements Serializable {
// 表⽰⼀次请求/响应的唯⼀ id. ⽤来把响应和请求对上.
protected String rid;
protected String channelId;
// 省略 getter setter
}
5.定义返回值⽗类
public class BaseReturns implements Serializable {
// 表⽰⼀次请求/响应的唯⼀ id. ⽤来把响应和请求对上.
protected String rid;
protected String channelId;
protected boolean ok;
// 省略 getter setter
}
6.定义其他参数类
1) ExchangeDeclareArguments
public class ExchangeDeclareArguments extends BasicArguments implements Serializable {
private String exchangeName;
private ExchangeType exchangeType;
private boolean durable;
}
⼀个创建交换机的请求, 形如:
• 可以把 ExchangeDeclareArguments 转成 byte[], 就得到了下列图⽚的结构. • 按照 length ⻓度读取出 payload, 就可以把读到的⼆进制数据转换成 ExchangeDeclareArguments 对象
2) ExchangeDeleteArguments
public class ExchangeDeleteArguments extends BasicArguments implements Serializable {
private String exchangeName;
}
3) QueueDeclareArguments
public class QueueDeclareArguments extends BasicArguments implements Serializable {
private String queueName;
private boolean durable;
private boolean exclusive;
private boolean autoDelete;
private Map arguments;
}
4) QueueDeleteArguments
public class QueueDeleteArguments extends BasicArguments implements Serializable {
private String queueName;
}
5) QueueBindArguments
public class QueueBindArguments extends BasicArguments implements Serializable {
private String queueName;
private String exchangeName;
private String bindingKey;
}
6) QueueUnbindArguments
public class QueueUnbindArguments extends BasicArguments implements Serializable {
private String queueName;
private String exchangeName;
}
7) BasicPublishArguments
public class BasicPublishArguments extends BasicArguments implements Serializable {
private String exchangeName;
private String routingKey;
private BasicProperties basicProperties;
private byte[] body;
}
8) BasicConsumeArguments
public class BasicConsumeArguments extends BasicArguments implements Serializable {
private String consumerTag;
private String queueName;
private boolean autoAck;
}
9) SubScribeReturns
public class SubScribeReturns extends BasicReturns implements Serializable {
private String consumerTag;
private BasicProperties basicProperties;
private byte[] body;
}
⼗一. 实现 BrokerServer
1.创建 BrokerServer 类
public class BrokerServer {
private ServerSocket serverSocket = null;
// 当前考虑一个 BrokerServer 上只有一个 虚拟主机
private VirtualHost virtualHost = new VirtualHost("default");
// 使用这个 哈希表 表示当前的所有会话(也就是说有哪些客户端正在和咱们的服务器进行通信)
// 此处的 key 是 channelId, value 为对应的 Socket 对象
private ConcurrentHashMap sessions = new ConcurrentHashMap();
// 引入一个线程池, 来处理多个客户端的请求.
private ExecutorService executorService = null;
// 引入一个 boolean 变量控制服务器是否继续运行
private volatile boolean runnable = true;
}
• virtualHost 表⽰服务器持有的虚拟主机. 队列, 交换机, 绑定, 消息都是通过虚拟主机管理.
• sessions ⽤来管理所有的客⼾端的连接. 记录每个客⼾端的 socket.
• serverSocket 是服务器⾃⾝的 socket
• executorService 这个线程池⽤来处理响应.
• runnable 这个标志位⽤来控制服务器的运⾏停⽌.
2.启动/停⽌服务器
public void start() throws IOException {
System.out.println("[BrokerServer] 启动!");
executorService = Executors.newCachedThreadPool();
try {
while (runnable) {
Socket clientSocket = serverSocket.accept();
// 把处理连接的逻辑丢给这个线程池.
executorService.submit(() -> {
processConnection(clientSocket);
});
}
} catch (SocketException e) {
System.out.println("[BrokerServer] 服务器停止运行!");
// e.printStackTrace();
}
}
// 一般来说停止服务器, 就是直接 kill 掉对应进程就行了.
// 此处还是搞一个单独的停止方法. 主要是用于后续的单元测试.
public void stop() throws IOException {
runnable = false;
// 把线程池中的任务都放弃了. 让线程都销毁.
executorService.shutdownNow();
serverSocket.close();
}
3.实现处理连接
• 对于 EOFException 和 SocketException , 我们视为客⼾端正常断开连接.
◦ 如果是客⼾端先 close, 后调⽤ DataInputStream 的 read, 则抛出 EOFException
◦ 如果是先调⽤ DataInputStream 的 read, 后客⼾端调⽤ close, 则抛出 SocketException
// 通过这个方法, 来处理一个客户端的连接.
// 在这一个连接中, 可能会涉及到多个请求和响应.
private void processConnection(Socket clientSocket) {
try (InputStream inputStream = clientSocket.getInputStream();
OutputStream outputStream = clientSocket.getOutputStream()) {
// 这里需要按照特定格式来读取并解析. 此时就需要用到 DataInputStream 和 DataOutputStream
try (DataInputStream dataInputStream = new DataInputStream(inputStream);
DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
while (true) {
// 1. 读取请求并解析.
Request request = readRequest(dataInputStream);
// 2. 根据请求计算响应
Response response = process(request, clientSocket);
// 3. 把响应写回给客户端
writeResponse(dataOutputStream, response);
}
}
} catch (EOFException | SocketException e) {
// 对于这个代码, DataInputStream 如果读到 EOF , 就会抛出一个 EOFException 异常.
// 需要借助这个异常来结束循环
System.out.println("[BrokerServer] connection 关闭! 客户端的地址: " + clientSocket.getInetAddress().toString()
+ ":" + clientSocket.getPort());
} catch (IOException | ClassNotFoundException | MqException e) {
System.out.println("[BrokerServer] connection 出现异常!");
e.printStackTrace();
} finally {
try {
// 当连接处理完了, 就需要记得关闭 socket
clientSocket.close();
// 一个 TCP 连接中, 可能包含多个 channel. 需要把当前这个 socket 对应的所有 channel 也顺便清理掉.
clearClosedSession(clientSocket);
} catch (IOException e) {
e.printStackTrace();
}
}
}
4.实现 readRequest
private Request readRequest(DataInputStream dataInputStream) throws IOException {
Request request = new Request();
request.setType(dataInputStream.readInt());
request.setLength(dataInputStream.readInt());
byte[] payload = new byte[request.getLength()];
int n = dataInputStream.read(payload);
if (n != request.getLength()) {
throw new IOException("读取请求格式出错!");
}
request.setPayload(payload);
return request;
}
5.实现 writeResponse
private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {
dataOutputStream.writeInt(response.getType());
dataOutputStream.writeInt(response.getLength());
dataOutputStream.write(response.getPayload());
// 这个刷新缓冲区也是重要的操作!!
dataOutputStream.flush();
}
6.实现处理请求
• 先把请求转换成 BaseArguments , 获取到其中的 channelId 和 rid
• 再根据不同的 type, 分别处理不同的逻辑. (主要是调⽤ virtualHost 中不同的⽅法).
• 针对消息订阅操作, 则需要在存在消息的时候通过回调, 把响应结果写回给对应的客⼾端.
• 最后构造成统⼀的响应.
private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {
// 1. 把 request 中的 payload 做一个初步的解析.
BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());
System.out.println("[Request] rid=" + basicArguments.getRid() + ", channelId=" + basicArguments.getChannelId()
+ ", type=" + request.getType() + ", length=" + request.getLength());
// 2. 根据 type 的值, 来进一步区分接下来这次请求要干啥.
boolean ok = true;
if (request.getType() == 0x1) {
// 创建 channel
sessions.put(basicArguments.getChannelId(), clientSocket);
System.out.println("[BrokerServer] 创建 channel 完成! channelId=" + basicArguments.getChannelId());
} else if (request.getType() == 0x2) {
// 销毁 channel
sessions.remove(basicArguments.getChannelId());
System.out.println("[BrokerServer] 销毁 channel 完成! channelId=" + basicArguments.getChannelId());
} else if (request.getType() == 0x3) {
// 创建交换机. 此时 payload 就是 ExchangeDeclareArguments 对象了.
ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;
ok = virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(),
arguments.isDurable());
} else if (request.getType() == 0x4) {
ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;
ok = virtualHost.exchangeDelete(arguments.getExchangeName());
} else if (request.getType() == 0x5) {
QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;
ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable());
} else if (request.getType() == 0x6) {
QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments;
ok = virtualHost.queueDelete((arguments.getQueueName()));
} else if (request.getType() == 0x7) {
QueueBindArguments arguments = (QueueBindArguments) basicArguments;
ok = virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(), arguments.getBindingKey());
} else if (request.getType() == 0x8) {
QueueUnbindArguments arguments = (QueueUnbindArguments) basicArguments;
ok = virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());
} else if (request.getType() == 0x9) {
BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;
ok = virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),
arguments.getBasicProperties(), arguments.getBody());
} else if (request.getType() == 0xa) {
BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;
ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(),
new Consumer() {
// 这个回调函数要做的工作, 就是把服务器收到的消息可以直接推送回对应的消费者客户端
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
// 先知道当前这个收到的消息, 要发给哪个客户端.
// 此处 consumerTag 其实是 channelId. 根据 channelId 去 sessions 中查询, 就可以得到对应的
// socket 对象了, 从而可以往里面发送数据了
// 1. 根据 channelId 找到 socket 对象
Socket clientSocket = sessions.get(consumerTag);
if (clientSocket == null || clientSocket.isClosed()) {
throw new MqException("[BrokerServer] 订阅消息的客户端已经关闭!");
}
// 2. 构造响应数据
SubScribeReturns subScribeReturns = new SubScribeReturns();
subScribeReturns.setChannelId(consumerTag);
subScribeReturns.setRid(""); // 由于这里只有响应, 没有请求, 不需要去对应. rid 暂时不需要.
subScribeReturns.setOk(true);
subScribeReturns.setConsumerTag(consumerTag);
subScribeReturns.setBasicProperties(basicProperties);
subScribeReturns.setBody(body);
byte[] payload = BinaryTool.toBytes(subScribeReturns);
Response response = new Response();
// 0xc 表示服务器给消费者客户端推送的消息数据.
response.setType(0xc);
// response 的 payload 就是一个 SubScribeReturns
response.setLength(payload.length);
response.setPayload(payload);
// 3. 把数据写回给客户端.
// 注意! 此处的 dataOutputStream 这个对象不能 close !!!
// 如果 把 dataOutputStream 关闭, 就会直接把 clientSocket 里的 outputStream 也关了.
// 此时就无法继续往 socket 中写入后续数据了.
DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());
writeResponse(dataOutputStream, response);
}
});
} else if (request.getType() == 0xb) {
// 调用 basicAck 确认消息.
BasicAckArguments arguments = (BasicAckArguments) basicArguments;
ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());
} else {
// 当前的 type 是非法的.
throw new MqException("[BrokerServer] 未知的 type! type=" + request.getType());
}
// 3. 构造响应
BasicReturns basicReturns = new BasicReturns();
basicReturns.setChannelId(basicArguments.getChannelId());
basicReturns.setRid(basicArguments.getRid());
basicReturns.setOk(ok);
byte[] payload = BinaryTool.toBytes(basicReturns);
Response response = new Response();
response.setType(request.getType());
response.setLength(payload.length);
response.setPayload(payload);
System.out.println("[Response] rid=" + basicReturns.getRid() + ", channelId=" + basicReturns.getChannelId()
+ ", type=" + response.getType() + ", length=" + response.getLength());
return response;
}
7.实现 clearClosedSession
• 如果客⼾端只关闭了 Connection, 没关闭 Connection 中包含的 Channel, 也没关系, 在这⾥统⼀进 ⾏清理.
• 注意迭代器失效问题
private void clearClosedSession(Socket clientSocket) {
// 这里要做的事情, 主要就是遍历上述 sessions hash 表, 把该被关闭的 socket 对应的键值对, 统统删掉.
List toDeleteChannelId = new ArrayList();
for (Map.Entry entry : sessions.entrySet()) {
if (entry.getValue() == clientSocket) {
// 不能在这里直接删除!!!
// 这属于使用集合类的一个大忌!!! 一边遍历, 一边删除!!!
// sessions.remove(entry.getKey());
toDeleteChannelId.add(entry.getKey());
}
}
for (String channelId : toDeleteChannelId) {
sessions.remove(channelId);
}
System.out.println("[BrokerServer] 清理 session 完成! 被清理的 channelId=" + toDeleteChannelId);
}
⼗二. 实现客⼾端
1.创建 ConnectionFactory
public class ConnectionFactory {
// broker server 的 ip 地址
private String host;
// broker server 的端口号
private int port;
// 访问 broker server 的哪个虚拟主机.
// 下列几个属性暂时先都不搞了.
// private String virtualHostName;
// private String username;
// private String password;
public Connection newConnection() throws IOException {
Connection connection = new Connection(host, port);
return connection;
}
}
2.Connection 和 Channel 的定义
⼀个客⼾端可以创建多个 Connection.
⼀个 Connection 对应⼀个 socket, ⼀个 TCP 连接.
⼀个 Connection 可以包含多个 Channel
1) Connection 的定义
public class Connection {
private Socket socket = null;
// 需要管理多个 channel. 使用一个 hash 表把若干个 channel 组织起来.
private ConcurrentHashMap channelMap = new ConcurrentHashMap();
private InputStream inputStream;
private OutputStream outputStream;
private DataInputStream dataInputStream;
private DataOutputStream dataOutputStream;
}
• Socket 是客⼾端持有的套接字. InputStream OutputStream DataInputStream DataOutputStream 均为 socket 通信的接⼝.
• channelMap ⽤来管理该连接中所有的 Channel.
• callbackPool 是⽤来在客⼾端这边执⾏⽤⼾回调的线程池.
2) Channel 的定义
public class Channel {
private String channelId;
// 当前这个 channel 属于哪个连接.
private Connection connection;
// 用来存储后续客户端收到的服务器的响应.
private ConcurrentHashMap basicReturnsMap = new ConcurrentHashMap();
// 如果当前 Channel 订阅了某个队列, 就需要在此处记录下对应回调是啥. 当该队列的消息返回回来的时候, 调用回调.
// 此处约定一个 Channel 中只能有一个回调.
private Consumer consumer = null;
public Channel(String channelId, Connection connection) {
this.channelId = channelId;
this.connection = connection;
}
}
• channelId 为 channel 的⾝份标识, 使⽤ UUID 标识.
• Connection 为 channel 对应的连接.
• baseReturnsMap ⽤来保存响应的返回值. 放到这个哈希表中⽅便和请求匹配.
• consumer 为消费者的回调(⽤⼾注册的). 对于消息响应, 应该调⽤这个回调处理消息.
3.封装请求响应读写操作
public void writeRequest(Request request) throws IOException {
dataOutputStream.writeInt(request.getType());
dataOutputStream.writeInt(request.getLength());
dataOutputStream.write(request.getPayload());
dataOutputStream.flush();
System.out.println("[Connection] 发送请求! type=" + request.getType() + ", length=" + request.getLength());
}
// 读取响应
public Response readResponse() throws IOException {
Response response = new Response();
response.setType(dataInputStream.readInt());
response.setLength(dataInputStream.readInt());
byte[] payload = new byte[response.getLength()];
int n = dataInputStream.read(payload);
if (n != response.getLength()) {
throw new IOException("读取的响应数据不完整!");
}
response.setPayload(payload);
System.out.println("[Connection] 收到响应! type=" + response.getType() + ", length=" + response.getLength());
return response;
}
4.创建 channel
public Channel createChannel() throws IOException {
String channelId = "C-" + UUID.randomUUID().toString();
Channel channel = new Channel(channelId, this);
// 把这个 channel 对象放到 Connection 管理 channel 的 哈希表 中.
channelMap.put(channelId, channel);
// 同时也需要把 "创建 channel" 的这个消息也告诉服务器.
boolean ok = channel.createChannel();
if (!ok) {
// 服务器这里创建失败了!! 整个这次创建 channel 操作不顺利!!
// 把刚才已经加入 hash 表的键值对, 再删了.
channelMap.remove(channelId);
return null;
}
return channel;
}
5.发送请求
1) 创建 channel
public boolean createChannel() throws IOException {
// 对于创建 Channel 操作来说, payload 就是一个 basicArguments 对象
BasicArguments basicArguments = new BasicArguments();
basicArguments.setChannelId(channelId);
basicArguments.setRid(generateRid());
byte[] payload = BinaryTool.toBytes(basicArguments);
Request request = new Request();
request.setType(0x1);
request.setLength(payload.length);
request.setPayload(payload);
// 构造出完整请求之后, 就可以发送这个请求了.
connection.writeRequest(request);
// 等待服务器的响应
BasicReturns basicReturns = waitResult(basicArguments.getRid());
return basicReturns.isOk();
private String generateRid() {
return "R-" + UUID.randomUUID().toString();
}
private BasicReturns waitResult(String rid) {
BasicReturns basicReturns = null;
while ((basicReturns = basicReturnsMap.get(rid)) == null) {
// 如果查询结果为 null, 说明包裹还没回来.
// 此时就需要阻塞等待.
synchronized (this) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 读取成功之后, 还需要把这个消息从哈希表中删除掉.
basicReturnsMap.remove(rid);
return basicReturns;
}
}
2) 关闭 channel
// 关闭 channel, 给服务器发送一个 type = 0x2 的请求
public boolean close() throws IOException {
BasicArguments basicArguments = new BasicArguments();
basicArguments.setRid(generateRid());
basicArguments.setChannelId(channelId);
byte[] payload = BinaryTool.toBytes(basicArguments);
Request request = new Request();
request.setType(0x2);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(basicArguments.getRid());
return basicReturns.isOk();
}
3) 创建交换机
// 创建交换机
public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable) throws IOException {
ExchangeDeclareArguments exchangeDeclareArguments = new ExchangeDeclareArguments();
exchangeDeclareArguments.setRid(generateRid());
exchangeDeclareArguments.setChannelId(channelId);
exchangeDeclareArguments.setExchangeName(exchangeName);
exchangeDeclareArguments.setExchangeType(exchangeType);
exchangeDeclareArguments.setDurable(durable);
byte[] payload = BinaryTool.toBytes(exchangeDeclareArguments);
Request request = new Request();
request.setType(0x3);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(exchangeDeclareArguments.getRid());
return basicReturns.isOk();
}
4) 删除交换机
// 删除交换机
public boolean exchangeDelete(String exchangeName) throws IOException {
ExchangeDeleteArguments arguments = new ExchangeDeleteArguments();
arguments.setRid(generateRid());
arguments.setChannelId(channelId);
arguments.setExchangeName(exchangeName);
byte[] payload = BinaryTool.toBytes(arguments);
Request request = new Request();
request.setType(0x4);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(arguments.getRid());
return basicReturns.isOk();
}
5) 创建队列
// 创建队列
public boolean queueDeclare(String queueName, boolean durable) throws IOException {
QueueDeclareArguments queueDeclareArguments = new QueueDeclareArguments();
queueDeclareArguments.setRid(generateRid());
queueDeclareArguments.setChannelId(channelId);
queueDeclareArguments.setQueueName(queueName);
queueDeclareArguments.setDurable(durable);
byte[] payload = BinaryTool.toBytes(queueDeclareArguments);
Request request = new Request();
request.setType(0x5);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(queueDeclareArguments.getRid());
return basicReturns.isOk();
}
6) 删除队列
public boolean queueDelete(String queueName) throws IOException {
QueueDeleteArguments arguments = new QueueDeleteArguments();
arguments.setRid(generateRid());
arguments.setChannelId(channelId);
arguments.setQueueName(queueName);
byte[] payload = BinaryTool.toBytes(arguments);
Request request = new Request();
request.setType(0x6);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(arguments.getRid());
return basicReturns.isOk();
}
7) 创建绑定
public boolean queueBind(String queueName, String exchangeName, String bindingKey) throws IOException {
QueueBindArguments arguments = new QueueBindArguments();
arguments.setRid(generateRid());
arguments.setChannelId(channelId);
arguments.setQueueName(queueName);
arguments.setExchangeName(exchangeName);
arguments.setBindingKey(bindingKey);
byte[] payload = BinaryTool.toBytes(arguments);
Request request = new Request();
request.setType(0x7);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(arguments.getRid());
return basicReturns.isOk();
}
8) 删除绑定
// 解除绑定
public boolean queueUnbind(String queueName, String exchangeName) throws IOException {
QueueUnbindArguments arguments = new QueueUnbindArguments();
arguments.setRid(generateRid());
arguments.setChannelId(channelId);
arguments.setQueueName(queueName);
arguments.setExchangeName(exchangeName);
byte[] payload = BinaryTool.toBytes(arguments);
Request request = new Request();
request.setType(0x8);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(arguments.getRid());
return basicReturns.isOk();
}
9) 发送消息
public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) throws IOException {
BasicPublishArguments arguments = new BasicPublishArguments();
arguments.setRid(generateRid());
arguments.setChannelId(channelId);
arguments.setExchangeName(exchangeName);
arguments.setRoutingKey(routingKey);
arguments.setBasicProperties(basicProperties);
arguments.setBody(body);
byte[] payload = BinaryTool.toBytes(arguments);
Request request = new Request();
request.setType(0x9);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(arguments.getRid());
return basicReturns.isOk();
}
10) 订阅消息
// 订阅消息
public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException {
// 先设置回调.
if (this.consumer != null) {
throw new MqException("该 channel 已经设置过消费消息的回调了, 不能重复设置!");
}
this.consumer = consumer;
BasicConsumeArguments arguments = new BasicConsumeArguments();
arguments.setRid(generateRid());
arguments.setChannelId(channelId);
arguments.setConsumerTag(channelId); // 此处 consumerTag 也使用 channelId 来表示了.
arguments.setQueueName(queueName);
arguments.setAutoAck(autoAck);
byte[] payload = BinaryTool.toBytes(arguments);
Request request = new Request();
request.setType(0xa);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(arguments.getRid());
return basicReturns.isOk();
}
11) 确认消息
// 确认消息
public boolean basicAck(String queueName, String messageId) throws IOException {
BasicAckArguments arguments = new BasicAckArguments();
arguments.setRid(generateRid());
arguments.setChannelId(channelId);
arguments.setQueueName(queueName);
arguments.setMessageId(messageId);
byte[] payload = BinaryTool.toBytes(arguments);
Request request = new Request();
request.setType(0xb);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(arguments.getRid());
return basicReturns.isOk();
}
6.处理响应
1) 创建扫描线程
创建⼀个扫描线程, ⽤来不停的读取 socket 中的响应数据.
public Connection(String host, int port) throws IOException {
socket = new Socket(host, port);
inputStream = socket.getInputStream();
outputStream = socket.getOutputStream();
dataInputStream = new DataInputStream(inputStream);
dataOutputStream = new DataOutputStream(outputStream);
callbackPool = Executors.newFixedThreadPool(4);
// 创建一个扫描线程, 由这个线程负责不停的从 socket 中读取响应数据. 把这个响应数据再交给对应的 channel 负责处理.
Thread t = new Thread(() -> {
try {
while (!socket.isClosed()) {
Response response = readResponse();
dispatchResponse(response);
}
} catch (SocketException e) {
// 连接正常断开的. 此时这个异常直接忽略.
System.out.println("[Connection] 连接正常断开!");
} catch (IOException | ClassNotFoundException | MqException e) {
System.out.println("[Connection] 连接异常断开!");
e.printStackTrace();
}
});
t.start();
}
2) 实现响应的分发
给 Connection 创建 dispatchResponse ⽅法.
• 针对服务器返回的控制响应和消息响应, 分别处理.
◦ 如果是订阅数据, 则调⽤ channel 中的回调.
◦ 如果是控制消息, 直接放到结果集合中.
// 使用这个方法来分别处理, 当前的响应是一个针对控制请求的响应, 还是服务器推送的消息.
private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {
if (response.getType() == 0xc) {
// 服务器推送来的消息数据
SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());
// 根据 channelId 找到对应的 channel 对象
Channel channel = channelMap.get(subScribeReturns.getChannelId());
if (channel == null) {
throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在! channelId=" + channel.getChannelId());
}
// 执行该 channel 对象内部的回调.
callbackPool.submit(() -> {
try {
channel.getConsumer().handleDelivery(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(),
subScribeReturns.getBody());
} catch (MqException | IOException e) {
e.printStackTrace();
}
});
} else {
// 当前响应是针对刚才的控制请求的响应
BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());
// 把这个结果放到对应的 channel 的 hash 表中.
Channel channel = channelMap.get(basicReturns.getChannelId());
if (channel == null) {
throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在! channelId=" + channel.getChannelId());
}
channel.putReturns(basicReturns);
}
}
3) 实现 channel.putReturns
把响应放到响应的 hash 表中, 同时唤醒等待响应的线程去消费.
public void putReturns(BasicReturns basicReturns) {
basicReturnsMap.put(basicReturns.getRid(), basicReturns);
synchronized (this) {
// 当前也不知道有多少个线程在等待上述的这个响应.
// 把所有的等待的线程都唤醒.
notifyAll();
}
}
7.关闭 Connection
public void close() {
// 关闭 Connection 释放上述资源
try {
callbackPool.shutdownNow();
channelMap.clear();
inputStream.close();
outputStream.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
8.测试客⼾端-服务器
public class MqClientTests {
private BrokerServer brokerServer = null;
private ConnectionFactory factory = null;
private Thread t = null;
@BeforeEach
public void setUp() throws IOException {
// 1. 先启动服务器
MqApplication.context = SpringApplication.run(MqApplication.class);
brokerServer = new BrokerServer(9090);
t = new Thread(() -> {
// 这个 start 方法会进入一个死循环. 使用一个新的线程来运行 start 即可!
try {
brokerServer.start();
} catch (IOException e) {
e.printStackTrace();
}
});
t.start();
// 2. 配置 ConnectionFactory
factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(9090);
}
@AfterEach
public void tearDown() throws IOException {
// 停止服务器
brokerServer.stop();
// t.join();
MqApplication.context.close();
// 删除必要的文件
File file = new File("./data");
FileUtils.deleteDirectory(file);
factory = null;
}
@Test
public void testConnection() throws IOException {
Connection connection = factory.newConnection();
Assertions.assertNotNull(connection);
}
@Test
public void testChannel() throws IOException {
Connection connection = factory.newConnection();
Assertions.assertNotNull(connection);
Channel channel = connection.createChannel();
Assertions.assertNotNull(channel);
}
@Test
public void testExchange() throws IOException {
Connection connection = factory.newConnection();
Assertions.assertNotNull(connection);
Channel channel = connection.createChannel();
Assertions.assertNotNull(channel);
boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true);
Assertions.assertTrue(ok);
ok = channel.exchangeDelete("testExchange");
Assertions.assertTrue(ok);
// 此处稳妥起见, 把改关闭的要进行关闭.
channel.close();
connection.close();
}
@Test
public void testQueue() throws IOException {
Connection connection = factory.newConnection();
Assertions.assertNotNull(connection);
Channel channel = connection.createChannel();
Assertions.assertNotNull(channel);
boolean ok = channel.queueDeclare("testQueue", true);
Assertions.assertTrue(ok);
ok = channel.queueDelete("testQueue");
Assertions.assertTrue(ok);
channel.close();
connection.close();
}
@服务器托管网Test
public void testBinding() throws IOException {
Connection connection = factory.newConnection();
Assertions.assertNotNull(connection);
Channel channel = connection.createChannel();
Assertions.assertNotNull(channel);
boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true);
Assertions.assertTrue(ok);
ok = channel.queueDeclare("testQueue", true);
Assertions.assertTrue(ok);
ok = channel.queueBind("testQueue", "testExchange", "testBindingKey");
Assertions.assertTrue(ok);
ok = channel.queueUnbind("testQueue", "testExchange");
Assertions.assertTrue(ok);
channel.close();
connection.close();
}
@Test
public void testMessage() throws IOException, MqException, InterruptedException {
Connection connection = factory.newConnection();
Assertions.assertNotNull(connection);
Channel channel = connection.createChannel();
Assertions.assertNotNull(channel);
boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true);
Assertions.assertTrue(ok);
ok = channel.queueDeclare("testQueue", true);
Assertions.assertTrue(ok);
byte[] requestBody = "hello".getBytes();
ok = channel.basicPublish("testExchange", "testQueue", null, requestBody);
Assertions.assertTrue(ok);
ok = channel.basicConsume("testQueue", true, new Consumer() {
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
System.out.println("[消费数据] 开始!");
System.out.println("consumerTag=" + consumerTag);
System.out.println("basicProperties=" + basicProperties);
Assertions.assertArrayEquals(requestBody, body);
System.out.println("[消费数据] 结束!");
}
});
Assertions.assertTrue(ok);
Thread.sleep(500);
channel.close();
connection.close();
}
}
⼗三. 案例: 基于 MQ 的⽣产者消费者模型
1.生产者
/*
* 这个类用来表示一个生产者.
* 通常这是一个单独的服务器程序.
*/
public class DemoProducer {
public static void main(String[] args) throws IOException, InterruptedException {
System.out.println("启动生产者");
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(9090);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 创建交换机和队列
channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true);
channel.queueDeclare("testQueue", true);
// 创建一个消息并发送
byte[] body = "hello".getBytes();
boolean ok = channel.basicPublish("testExchange", "testQueue", null, body);
System.out.println("消息投递完成! ok=" + ok);
Thread.sleep(500);
channel.close();
connection.close();
}
}
2.消费者
/*
* 这个类表示一个消费者.
* 通常这个类也应该是在一个独立的服务器中被执行
*/
public class DemoConsumer {
public static void main(String[] args) throws IOException, MqException, InterruptedException {
System.out.println("启动消费者!");
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(9090);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true);
channel.queueDeclare("testQueue", true);
channel.basicConsume("testQueue", true, new Consumer() {
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
System.out.println("[消费数据] 开始!");
System.out.println("consumerTag=" + consumerTag);
System.out.println("basicProperties=" + basicProperties);
String bodyString = new String(body, 0, body.length);
System.out.println("body=" + bodyString);
System.out.println("[消费数据] 结束!");
}
});
// 由于消费者也不知道生产者要生产多少, 就在这里通过这个循环模拟一直等待消费.
while (true) {
Thread.sleep(500);
}
}
}
3.运行结果
十四.总结
本文所有代码已上传wuchangsheng1/mq (github.com)
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net