直接上代码
package com.example.demo;
import org.aopalliance.aop.Advice;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
import org.springframework.retry.interceptor.RetryInterceptorBuilder;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
/**
 * RabbitMQ configuration.
 *
 * <p>Declares a JSON-converting {@link RabbitTemplate} for publishing, a JSON payload
 * converter for annotated listener methods, a listener container factory with a bounded
 * retry policy, and a {@link RabbitAdmin}.
 */
@Configuration
public class RabbitMqConfig implements RabbitListenerConfigurer {

    /** Maximum number of delivery attempts made by the retry interceptor. */
    private static final int RETRY_MAX_ATTEMPTS = 3;

    /** Back-off before the first retry, in milliseconds. */
    private static final long RETRY_INITIAL_INTERVAL_MS = 10_000L;

    /** Factor applied to the back-off interval after each failed attempt. */
    private static final double RETRY_MULTIPLIER = 2.0;

    /** Upper bound for the back-off interval, in milliseconds. */
    private static final long RETRY_MAX_INTERVAL_MS = 30_000L;

    /**
     * Template used to publish messages: payloads are serialized as JSON and every
     * outgoing message passes through the given post-processor.
     *
     * @param connectionFactory    broker connection factory
     * @param messagePostProcessor post-processor applied before each publish
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
                                         BeforePublishPostProcessor messagePostProcessor) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
        rabbitTemplate.setBeforePublishPostProcessors(messagePostProcessor);
        return rabbitTemplate;
    }

    /**
     * AMQP-level JSON converter shared by the template and the listener container factory.
     *
     * @return a new {@link Jackson2JsonMessageConverter}
     */
    @Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * Registers the custom handler-method factory so that listener method payloads are
     * converted with {@link #consumerJackson2MessageConverter()}.
     *
     * @param registrar the listener endpoint registrar to customize
     */
    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
    }

    /**
     * Handler-method factory that converts listener payloads from JSON.
     *
     * <p>Declared as a bean so the container runs the factory's initialization callbacks
     * before it is handed to the registrar.
     *
     * @return the configured factory
     */
    @Bean
    public MessageHandlerMethodFactory messageHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
        messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter());
        return messageHandlerMethodFactory;
    }

    /**
     * Spring Messaging JSON converter used for listener method arguments.
     *
     * @return a new {@link MappingJackson2MessageConverter}
     */
    @Bean
    public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
        return new MappingJackson2MessageConverter();
    }

    /**
     * Listener container factory with JSON conversion, receive-side post-processing,
     * a custom error handler, and bounded retry.
     *
     * <p>To use it, reference the bean name on the listener:
     * {@code @RabbitListener(containerFactory = "containerFactory")}.
     *
     * @param connectionFactory    broker connection factory
     * @param messagePostProcessor post-processor applied to every received message
     * @param rabbitErrorHandler   handler invoked for listener errors
     * @return the configured container factory
     */
    @Bean("containerFactory")
    public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectionFactory,
                                                                 AfterReceivePostProcessor messagePostProcessor,
                                                                 RabbitErrorHandler rabbitErrorHandler) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(producerJackson2MessageConverter());
        factory.setAfterReceivePostProcessors(messagePostProcessor);
        factory.setErrorHandler(rabbitErrorHandler);
        // Do not requeue rejected messages; combined with the retry advice below this
        // keeps a failing message from being redelivered forever.
        factory.setDefaultRequeueRejected(false);
        factory.setAdviceChain(new Advice[] {getRetryOperationsInterceptor()});
        return factory;
    }

    /**
     * Builds the stateless retry advice that prevents a failing message from being
     * retried without limit: at most {@value #RETRY_MAX_ATTEMPTS} attempts, with a
     * back-off that starts at 10 s, doubles each time, and is capped at 30 s.
     *
     * @return the retry interceptor
     */
    private RetryOperationsInterceptor getRetryOperationsInterceptor() {
        return RetryInterceptorBuilder.stateless()
                .maxAttempts(RETRY_MAX_ATTEMPTS)
                .backOffOptions(RETRY_INITIAL_INTERVAL_MS, RETRY_MULTIPLIER, RETRY_MAX_INTERVAL_MS)
                .build();
    }

    /**
     * Admin used to declare exchanges, queues and bindings.
     *
     * <p>Configuration classes that declare exchanges, queues or bindings can add
     * {@code @DependsOn("rabbitAdmin")} so this bean is created first; this avoids the
     * problem of those declarations not being created on the broker.
     *
     * @param connectionFactory broker connection factory
     * @return the configured admin
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}
AfterReceivePostProcessor
package com.example.demo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.stereotype.Component;
/**
 * Post-processor that logs the properties and body of every received message and
 * passes the message through unchanged.
 */
@Component
@Slf4j
public class AfterReceivePostProcessor implements MessagePostProcessor {

    /**
     * Logs the message and returns it unmodified.
     *
     * @param message the received message
     * @return the same {@code message} instance
     * @throws AmqpException declared by the interface; not thrown by this implementation
     */
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        byte[] body = message.getBody();
        // Decode with an explicit charset instead of the platform default so the log output
        // does not depend on the host's locale. UTF-8 is assumed here because the payloads
        // are JSON - NOTE(review): confirm if other encodings are published to these queues.
        String bodyText = body == null ? null : new String(body, java.nio.charset.StandardCharsets.UTF_8);
        log.info("接收消息属性={},body={}", message.getMessageProperties(), bodyText);
        return message;
    }
}
BeforePublishPostProcessor
package com.example.demo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Correlation;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.stereotype.Component;
/**
 * Post-processor that logs every outgoing message together with its routing details
 * and passes the message through unchanged.
 */
@Component
@Slf4j
public class BeforePublishPostProcessor implements MessagePostProcessor {

    /**
     * Returns the message unmodified.
     *
     * @param message the outgoing message
     * @return the same {@code message} instance
     * @throws AmqpException declared by the interface; not thrown by this implementation
     */
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        return message;
    }

    /**
     * Logs the outgoing message, then delegates to the interface's default implementation.
     *
     * @param message     the outgoing message
     * @param correlation correlation data supplied by the publisher, possibly {@code null}
     * @param exchange    target exchange
     * @param routingKey  routing key
     * @return the result of the default {@link MessagePostProcessor} implementation
     */
    @Override
    public Message postProcessMessage(Message message, Correlation correlation, String exchange, String routingKey) {
        byte[] body = message.getBody();
        // Log the decoded text rather than the raw byte array so the entry is readable and
        // matches the receive-side log format. UTF-8 is assumed because the payloads are
        // JSON - NOTE(review): confirm if other encodings are published.
        String bodyText = body == null ? null : new String(body, java.nio.charset.StandardCharsets.UTF_8);
        log.info("发送的消息MessageProperties={},body={},correlation={},exchange={},routingKey={}",
                message.getMessageProperties(), bodyText, correlation, exchange, routingKey);
        return MessagePostProcessor.super.postProcessMessage(message, correlation, exchange, routingKey);
    }
}
RabbitErrorHandler
package com.example.demo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.ErrorHandler;
@Slf4j
@Component
public class RabbitErrorHandler implements ErrorHandler {
@Override
public void handleError(Throwable t) {
log.error(t.getMessage(),t);
}
}
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
相关推荐: SpringBoot RabbitMQ收发消息、配置及原理
今天分析SpringBoot通过自动配置集成RabbitMQ的原理以及使用。 AMQP概念 RabbitMQ是基于AMQP协议的message broker,所以我们首先要对AMQP做一个简单的了解。 AMQP (Advanced Message Queuin…