配置方式
@Bean(name = "connectionFactory")
@Primary
public ConnectionFactory normalConnectionFactory(
@Value("${spring.rabbitmq.username}") String username,
@Value("${spring.rabbitmq.password}") String password,
@Value("${spring.rabbitmq.addresses}") String address) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(address);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
// connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
connectionFactory.setExecutor(createThreadPool(10,
20,
"mq-connection-",
"mq-connection-group"));
return connectionFactory;
}
或者配置文件里配置
spring:
# RabbitMQ 配置项,对应 RabbitProperties 配置类
rabbitmq:
publisher-confirm-type: correlated
publisher-confirm-type属性有三个可选值:
- none(默认):关闭发布确认模式。
- correlated:消息从生产者发送到交换机后触发回调方法。
- simple:会触发回调方法,相当于单个确认(发一条确认一条)。在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker。
使用方式
SIMPLE模式
开启simple模式需要在invoke方法中一起执行 rabbitTemplate.waitForConfirms
同时也会收到回调,回调后结束阻塞,同时可以获取到返回结果。
RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack){
System.out.println(correlationData.toString() + "发送成功");
}else {
System.out.println(correlationData.toString() + "发送失败, 原因: " + cause);
}
}
};
rabbitTemplate.setConfirmCallback(confirmCallback);
Boolean invoke = rabbitTemplate.invoke(operations -> {
rabbitTemplate.convertAndSend("direct_exchange", "ROUTING_KEY_01", message, correlationData);
return rabbitTemplate.waitForConfirms(1000l);
});
CORRELATED模式
RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack){
Syst服务器托管网em.out.println(correlationData.toString() + "发送成功");
}else {
System.out.println(correlationData.toString() + "发送失败, 原因: " + cause);
}
}
};
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.convertAndSend("direct_exchange", "ROUTING_KEY_01", message, correlationData);
// correlationData.getFuture().get();
sleep(1000*60);
System.out.println("发送消息boot mq hello Direct成功");
实现通过callback实现保证消息发送成功。
可以看出来,在开启publisher-confirm的情况下,如果不自行实现ConfirmCallback的逻辑,也无法做到保证消息成功发送。
可以在发送消息时更新为发送中。
收到callback更新为发送成功,或者发送失败。
对于发送失败的安排重试,可以在消息头加上重试次数记录重试次数,服务器托管网达到指定次数,更新为发送失败。
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
相关推荐: 实力出圈,开源网安连续4年入选中国网络安全企业100强
近日,安全牛第十一版《中国网络安全企业100强》正式发布。开源网安突出的综合实力、技术创新能力,以及前沿技术的落地应用成果,再次受到权威认可,从数百家安全厂商中脱颖而出,连续多年上榜百强榜单。 《中国网络安全企业100强》评选,由中国计算机学会抗恶劣环境计算机…