1.Return机制
Confirm只能保证消息到达exchange,无法保证消息可以被exchange分发到指定queue。
而且exchange是不能持久化消息的,queue是可以持久化消息。
采用Return机制来监听消息是否从exchange送到了指定的queue中
2.Java的实现方式
1.导入依赖
com.rabbitmq
amqp-client
5.6.0
2.生产者的实现方式
采用Return机制来监听消息是否从exchange送到了指定的queue中
package com.qf.mq2302.hello;
import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ReturnListener;
import java.io.IOException;
public class SendRetrun {
public static final String QUEUE_NAME="hello-queue";
public static void main(String[] args) throws Exception {
//1.获取连接对象
Connection conn = MQUtils.getConnection();
//2. 创建一个channel对象,对于MQ的大部分操作,都定义在了channel对象上
Channel channel = conn.createChannel();
//3.声明了一个队列
/**
* queue – the name of the queue
* durable – true代表创建的队列是持久化的(当mq重启后,该队列依然存在)
* exclusive – 该队列是不是排他的 (该对立是否只能由当前创建该队列的连接使用)
* autoDelete – 该队列是否可以被mq服务器自动删除
* arguments – 队列的其他参数,可以为null
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//开启 return 机制
//编写回调方法
channel.addReturnListener(new ReturnListener() {
//如果消息没有成功发送到队列,这个方法会被调用
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("====================ReturnListener==================");
System.out.println("replyCode:"+replyCode);
System.out.println("replyText:"+replyText);
System.out.println("exchange:"+exchange);
System.out.println("routingKey:"+routingKey);
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body,"utf-8"));
System.out.println("====================ReturnListener==================");
}
});
String message = "Hello doubleasdasda!";
//生产者如何发送消息,使用下面的方法即可
/**
* exchange – 交换机的名字 ,如果是空串,说明是把消息发给了默认交换机
* routingKey – 路由的key,当发送消息给默认交换机时,routingkey代表队列的名字
* other properties - 消息的其他属性,可以为null
* body – 消息的内容,注意,要是有 字节数组
*/
//注意:如果要使用生产者的return机制,需要在发送消息时,指定mandatory(强制性)为true
channel.basicPublish("", "sadnaas", true,null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
Thread.sleep(1000);
// 关闭资源
channel.close();
conn.close();
}
}
这个必须要加上才能让rutern返回机制生效
3.消费者的实现方式
package com.qf.mq2302.hello;
import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import java.io.IOException;
public class Recv {
private final static String QUEUE_NAME="hello-queue";
public static void main(String[] args) throws Exception {
//1.获取连接对象
Connection conn = MQUtils.getConnection();
//2. 创建一个channel对象,对于MQ的大部分操作,都定义在了channel对象上
Channel channel = conn.createChannel();
/**
* 第一个参数队列名称
* 第二个参数,耐用性
* 第三个参数排外性
* 第四个参数是否自动删除
* 第五个参数,可以定义什么类型的队列
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//3.该消费者收到消息之后的处理逻辑,写在DeliverCallback对象中
DeliverCallback deliverCallback =new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println(consumerTag);
//从Delivery对象中可以获取到生产者,发送的消息的字节数组
byte[] body = message.getBody();
String msg = new String(body, "utf-8");
//在这里写消费者的业务逻辑,例如,发送邮件
System.out.println(msg);
}
};
//4.让当前消费者开始消费(QUEUE_NAME)队列中的消息
/**
* queue – the name of the queue
* autoAck – true 代表当前消费者是不是自动确认模式。true代表自动确认。
* deliverCallback – 当有消息发送给该消费者时,消费者如何处理消息的逻辑
* cancelCallback – 当消费者被取消掉时,如果要执行代码,写到这里
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag -> {});
}
}
3.整合springboot实现
1.导入依赖
org.springframework.boot
spring-boot-starter-amqp
2.yml配置文件
spring:
rabbitmq:
host: 8.140.244.227
port: 6786
username: test
password: test
virtual-host: /test
publisher-returns: true #开启return机制
3.RabbitMQ配置文件
package com.qf.bootmq2302.config;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate();
//设置连接工厂对象
rabbitTemplate.setConnectionFactory(cachingConnectionFactory);
// 开启return机制
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("message:"+new String(message.getBody()));
System.out.println("replyCode:"+replyCode);
System.out.println("replyText:"+replyText);
System.out.println("exchange:"+exchange);
System.out.println("routingKey:"+routingKey);
}
服务器托管网 });
return rabbitTemplate;
}
}
4.生产者的controller
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/test1")
public String test1(String msg,String routkey){
System.out.println(msg);
服务器托管网 String exchangeName = "";//默认交换机
String routingkey = routkey;//队列名字
//生产者发送消息
rabbitTemplate.convertAndSend(exchangeName,routingkey,msg);
return "ok";
}
5.消费者写一个队列
@RabbitListener(queues = "queueA")
public void getMsg1(Map data, Channel channel,Message message) throws IOException {
System.out.println(data);
//手动ack//若开启手动ack,不给手动ack,就按照 prefetch: 1 #等价于basicQos(1)的量,就这么多,不会多给你了,因为你没有确认。确认一条,就给你一条
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
6.消费者的配置文件
spring:
rabbitmq:
host: 8.140.244.227
port: 6786
username: test
password: test
virtual-host: /test
#手动ACK
listener:
simple:
acknowledge-mode: manual # 手动ack
prefetch: 1 #等价于basicQos(1)
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net