工作队列模式是直接在生产者与消费者里声明好一个队列,这种情况下消息只会对应同类型的消费者。
举个用户注册的列子:用户在注册完后一般都会发送消息通知用户注册成功(失败)。如果在一个系统中,用户注册信息有邮箱、手机号,那么在注册完后会向邮箱和手机号都发送注册完成信息。利用MQ实现业务异步处理,如果是用工作队列的话,就会声明一个注册信息队列。注册完成之后生产者会向队列提交一条注册数据,消费者取出数据同时向邮箱以及手机号发送两条消息。但是实际上邮箱和手机号信息发送实际上是不同的业务逻辑,不应该放在一块处理。这个时候就可以利用发布/订阅模式将消息发送到转换机(EXCHANGE),声明两个不同的队列(邮箱、手机),并绑定到交换机。这样生产者只需要发布一次消息,两个队列都会接收到消息发给对应的消费者。
1、什么是发布/订阅模式(Publish/Subscribe)
简单解释就是,可以将消息发送给不同类型的消费者。做到发布一次,消费多个。下图取自于官方网站(RabbitMQ)的发布/订阅模式的图例
P:消息的生产者
X:交换机
红色:队列
C1,C2:消息消费者
下面是利用用户注册解释的该模式。(先运行两个消费者,在运行生产者。如果没有提前将队列绑定到交换机,那么直接运行生产者的话,消息是不会发到任何队列里的)
2、生产者(Send)代码
public class Send
{
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args)
{
try
{
//获取连接
Connection connection = ConnectionUtil.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//声明交换机(分发:发布/订阅模式)
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//发送消息
for (int i = 0; i
[send]:this is user registe message0
[send]:this is user registe message1
[send]:this is user registe message2
[send]:this is user registe message3
[send]:this is user registe message4
[send]:this is user registe message5
[send]:this is user registe message6
[send]:this is user registe message7
[send]:this is user registe message8
[send]:this is user registe message9
3、消费者1(ReceiveEmail)
public class ReceiveEmail
{
//交换机名称
private final static String EXCHANGE_NAME = "test_exchange_fanout";
//队列名称
private static final String QUEUE_NAME = "test_queue_email";
public static void main(String[] args)
{
try
{
//获取连接
Connection connection = ConnectionUtil.getConnection();
//从连接中获取一个通道
final Channel channel = connection.createChannel();
//声明交换机(分发:发布/订阅模式)
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//将队列绑定到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
//保证一次只分发一个
int prefetchCount = 1;
channel.basicQos(prefetchCount);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel)
{
//当消息到达时执行回调方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException
{
String message = new String(body, "utf-8");
System.out.println("[email] Receive message:" + message);
try
{
//消费者休息2s处理业务
Thread.sleep(1000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
finally
{
System.out.println("[1] done");
//手动应答
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//设置手动应答
boolean autoAck = false;
//监听队列
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
运行结果:
[email] Receive message:this is user registe message0
[1] done
[email] Receive message:this is user registe message1
[1] done
[email] Receive message:this is user registe message2
[1] done
[email] Receive message:this is user registe message3
[1] done
[email] Receive message:this is user registe message4
[1] done
[email] Receive message:this is user registe message5
[1] done
[email] Receive message:this is user registe message6
[1] done
[email] Receive message:this is user registe message7
[1] done
[email] Receive message:this is user registe message8
[1] done
[email] Receive message:this is user registe message9
[1] done
4、消费者2(ReceivePhone)
public class ReceivePhone
{
//交换机名称
private final static String EXCHANGE_NAME = "test_exchange_fanout";
//队列名称
private static final String QUEUE_NAME = "test_queue_phone";
public static void main(String[] args)
{
try
{
//获取连接
Connection connection = ConnectionUtil.getConnection();
//从连接中获取一个通道
final Channel channel = connection.createChannel();
//声明交换机(分发:发布/订阅模式)
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//将队列绑定到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
//保证一次只分发一个
int prefetchCount = 1;
channel.basicQos(prefetchCount);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel)
{
//当消息到达时执行回调方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException
{
String message = new String(body, "utf-8");
System.out.println("[phone] Receive message:" + message);
try
{
//消费者休息2s处理业务
Thread.sleep(1000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
finally
{
System.out.println("[2] done");
//手动应答
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//设置手动应答
boolean autoAck = false;
//监听队列
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
运行结果:
[phone] Receive message:this is user registe message0
[2] done
[phone] Receive message:this is user registe message1
[2] done
[phone] Receive message:this is user registe message2
[2] done
[phone] Receive message:this is user registe message3
[2] done
[phone] Receive message:this is user registe message4
[2] done
[phone] Receive message:this is user registe message5
[2] done
[phone] Receive message:this is user registe message6
[2] done
[phone] Receive message:this is user registe message7
[2] done
[phone] Receive message:this is user registe message8
[2] done
[phone] Receive message:this is user registe message9
[2] done
总结:
1、该模式下生产者并不是直接操作队列,而是将数据发送给交换机,由交换机将数据发送给与之绑定的队列。从运行结果中可以看到,两中类型的消费者(Email,Phone)都收到相同数量的消息。
2、该模式必须声明交换机,并且设置模式:channel.exchangeDeclare(EXCHANGE_NAME, “fanout”) fanout指分发模式(将每一条消息都发送到与交换机绑定的队列。
3、 队列必须绑定交换机:channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, “”);
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.fwqtg.net
相关推荐: 老夫的正则表达式大成了,桀桀桀桀!!!【Python 正则表达式笔记】
一、正则表达式语法 (一) 字符与字符类 特殊字符 .^$?+*{}[]()| 为特殊字符,若想要使用字面值,必须使用 进行转义 字符类 [] [] 匹配包含在方括号中的任何字符。它也可以指定范围,例: [a-zA-Z0-9]表示a到z,A到Z,0到9之间的任…