SSM整合RabbitMQ目录
- 前言
- 版本
- 实现
-
- 目录参考
- pom.xml依赖
- rabbitmq.properties配置文件
- spring-rabbitmq.xml
- spring-mvc.xml或applicationContext.xml
- rabbitmq目录下
-
- MessageConsumer.java
- MessageConsumer2.java
- MessageProducer.java
- MessageConstant.java
- 测试调用
- 扩展
-
- 消息重发
-
- 方式一
- 方式二
- 多部署Tomcat下问题
前言
SSM框架整合RabbitMQ【比较简单,复制粘贴可用】
本人使用的Spring版本是4.x
版本
RabbitMQ相关
erl10.0.1
RabbitMQ3.7.9
安装步骤参考:https://www.cnblogs.com/saryli/p/9729591.html
相关依赖
spring4.0.2.RELEASE
spring-rabbit1.3.5.RELEASE
实现
目录参考
这是我整合时的项目结构
关键:rabbitmq文件包和rabbitmq.properties、spring-rabbitmq.xml、spring-mvc.xml
pom.xml依赖
在现成的SSM项目中整合
dependency>
groupId>org.springframework.amqpgroupId>
artifactId>spring-rabbitartifactId>
version>1.3.5.RELEASEversion>
dependency>
rabbitmq.properties配置文件
将 rabbitmq.properties配置文件添加到resources目录下
mq.host=127.0.0.1
mq.username=guest
mq.password=guest
mq.port=5672
mq.virtual-host=/
spring-rabbitmq.xml
将spring-rabbitmq.xml添加到resources目录下
beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd">
bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
property name="location" value="classpath:rabbitmq.properties" />
bean>
rabbit:connection-factory id="connectionFactory"
username="${mq.username}"
password="${mq.password}"
host="${mq.host}"
port="${mq.port}"
virtual-host="${mq.virtual-host}" />
rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />
rabbit:queue name="queueTest" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />
rabbit:queue name="queueTest1" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />
rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false" declared-by="connectAdmin">
rabbit:bindings>
rabbit:binding queue="queueTest" key="queueTestKey">rabbit:binding>
rabbit:binding queue="queueTest1" key="queueTestKey1">rabbit:binding>
rabbit:bindings>
rabbit:direct-exchange>
rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="exchangeTest" />
bean id="messageReceiver" class="com.rabbitmq.MessageConsumer">bean>
bean id="messageReceiver1" class="com.rabbitmq.MessageConsumer2">bean>
rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
rabbit:listener queues="queueTest" ref="messageReceiver" />
rabbit:listener queues="queueTest1" ref="messageReceiver1" />
rabbit:listener-container>
context:component-scan base-package="com.rabbitmq" />
beans>
spring-mvc.xml或applicationContext.xml
我这里使用的spring-mvc.xml,根据自己配置文件使用
import resource="classpath:spring-rabbitmq.xml" />
将这个import引入添加到 spring-mvc.xml 里的最前面,如果不添加到前面可能会报错
rabbitmq目录下
这个目录下的java文件已在spring-rabbitmq.xml中进行扫描注入
MessageConsumer.java
说明:MessageConsumer和MessageConsumer2其实都可以使用同一个类,修改xml指向即可,但是分开明了些
package com.rabbitmq;
import java.nio.charset.Charset;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
/**
* @Title消息消费者
* @date 2023/10/8
*/
public class MessageConsumer implements MessageListener {
@Override
public void onMessage(Message message) {
// 逻辑处理
System.out.println("message------->:" + new String(message.getBody(), Charset.forName("utf-8")));
}
}
MessageConsumer2.java
package com.rab服务器托管网bitmq;
import java.nio.charset.Charset;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
/**
* @Title消息消费者2
* @date 2023/10/8
*/
public class MessageConsumer2 implements MessageListener {
@Override
public void onMessage(Message message) {
// 逻辑处理
System.out.println("message2------->:" + new String(message.getBody(), Charset.forName("utf-8")));
}
}
MessageProducer.java
package com.rabbitmq;
import javax.annotation.Resource;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;
/**
* @Title 消息生产者
* @date 2023/10/8
*/
@Service
public class MessageProducer {
@Resource
private AmqpTemplate amqpTemplate;
public void sendMessage(String key, Object message){
amqpTemplate.convertAndSend(key, message);
}
}
MessageConstant.java
package com.rabbitmq;
/**
* @Title 消息队列常量
* @date 2023/10/8
*/
publ服务器托管网ic class MessageConstant{
public static String queueTestKey = "queueTestKey";
public static String queueTestKey1 = "queueTestKey1";
}
测试调用
比如这个下面在某个类里作为接口调用测试
@Autowired
private MessageProducer messageProducer;
@RequestMapping(value = "/testMq")
@ResponseBody
public Result testMq(HttpServletRequest request) throws IOException {
messageProducer.sendMessage(MessageConstant.queueTestKey, "登录");
messageProducer.sendMessage(MessageConstant.queueTestKey1, "退出");
return Result.success("测试成功");
}
调用接口后打印结果
连接结果
以上即可!
扩展
包括消息手动确认,消息失败重新加入队列处理
消息重发
SpringBoot版可在配置文件中设置,且异常后直接抛出即可
方式一
package com.rabbitmq;
import java.nio.charset.Charset;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import com.rabbitmq.client.Channel;
/**
* @Title 消息消费者
* @date 2023/10/8
*/
public class MessageConsumer2 implements ChannelAwareMessageListener {
private int aa = 1;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
// 逻辑处理
if(aa == 1) {
aa = 2;
int a = 1/0;
}
System.out.println("成功处理确认message2------->:" + new String(message.getBody(), Charset.forName("utf-8")));
// 消费者ack确认【消息处理成功确认】
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e) {
System.out.println("失败重新入队message2------->:" + new String(message.getBody(), Charset.forName("utf-8")));
// 消费者reject确认【消息失败重新加入队列-重发】
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}
方式二
MessageConstant.java中加入
/** 重试次数 3 */
public static Integer RETRY_COUNT = 3;
消息接收处理类
package com.rabbitmq;
import java.nio.charset.Charset;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import com.alibaba.fastjson.JSONObject;
import com.bean.ConsumptionRequest;
import com.rabbitmq.client.Channel;
import com.service.ReceiveDormitoryService;
/**
* 宿舍mq消息处理
*
* @author Administrator
*/
public class MessageConsumerSuShe implements ChannelAwareMessageListener {
private final Logger logger = Logger.getLogger(MessageConsumerSuShe.class);
@Autowired
private ReceiveDormitoryService service;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
int retryCount = 0; // 重试机制
long deliveryTag = message.getMessageProperties().getDeliveryTag();
while(retryCount MessageConstant.RETRY_COUNT) {
retryCount ++;
try {
// 逻辑处理
String s = new String(message.getBody(), Charset.forName("utf-8"));
ConsumptionRequest bean = JSONObject.parseObject(s, ConsumptionRequest.class);
//service.uploadData(bean, bean.getPath());
//logger.info("【SUSHE_QUEUE_KEY宿舍队列成功】:" + new String(message.getBody(), Charset.forName("utf-8")));
// 消费者ack确认【消息处理成功确认】
channel.basicAck(deliveryTag, false);
return;
}catch (Exception e) {
logger.error("【SUSHE_QUEUE_KEY宿舍队列错误,重试"+retryCount+"】:" + new String(message.getBody(), Charset.forName("utf-8")));
// 0.5s重试一次
Thread.sleep(500);
}
}
// 重试3次后直接处理(这里设置为死信消息)
if(retryCount >= MessageConstant.RETRY_COUNT) {
channel.basicNack(deliveryTag, false, false);
}
}
}
多部署Tomcat下问题
本人使用单个RabbitMQ服务
测试两个Tomcat服务连接同一个交换机和队列进行发送消息,并没有造成两个Tomcat服务都推送处理这条消息,而是单个Tomcat处理了这条消息
所以未造成多部署下一条消息多服务处理问题
其他
参考类似博客1:https://blog.csdn.net/u012988901/article/details/89499634
参考类似博客2:https://blog.csdn.net/weixin_42654295/article/details/109006276
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
本文是使用Util应用框架开发 Web Api 项目快速入门教程. 前面已经详细介绍了环境搭建,如果你还未准备好,请参考前文. 开发流程概述 创建代码生成专用数据库. Util应用框架需要专门用来生成代码的数据库,该数据库仅用于代码生成. 约定: 代码生成数据…