SpringBoot WebSocket STOMP
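Keywords: Spring Boot, WebSocket, STOMP, broadcast, sendToUser, MessageMapping, SubscribeMapping, convertAndSendToUser
STOMP follows a publish-subscribe model: a message published to a destination is broadcast to every subscriber. To send one-to-one, that is, to one specific client, Spring provides the convertAndSendToUser method, which sends to a named user.
This article implements both the broadcast form and the user-targeted form so that the two can be compared.
Code reference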
maven
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
WebSocketConfig
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean;
/**
* {@code @author:} keboom
* {@code @date:} 2023/9/21
* {@code @description:}
*/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic");
config.setApplicationDestinationPrefixes("/app");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/mobicaster-websocket/{androidID}").setAllowedOrigins("*").
setHandshakeHandler(new CustomHandshakeHandler());
}
@Bean
public ServletServerContainerFactoryBean createWebSocketContainer() {
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
container.setMaxTextMessageBufferSize(8192);
container.setMaxBinaryMessageBufferSize(8192);
// container.setMaxSessionIdleTimeout(10000L);
return container;
}
}
registry.addEndpoint("/mobicaster-websocket/{androidID}")
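This is the URL the web page uses to open a WebSocket connection to the server. {androidID} (the name is arbitrary; devId or sessionId would work just as well) identifies the WebSocket, so that when the server wants to push a message to one WebSocket client it knows which one to address.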
config.setApplicationDestinationPrefixes("/app");
This is the URI prefix for messages the web page sends to the server.
config.enableSimpleBroker("/topic");
This is the URI prefix for messages the server sends to the web page.
setHandshakeHandler(new CustomHandshakeHandler());
As the name suggests, this handler performs custom processing during the WebSocket handshake.
ServletServerContainerFactoryBean
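Configure this according to your needs. With setMaxSessionIdleTimeout, what you see on the web page is that if no message is sent for the configured period, the current connection is closed and a new connection is then established.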
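To make the roles of the two prefixes concrete, here is a minimal plain-Java sketch of the routing decision. It is a simplified model and not Spring's implementation: the class DestinationRouter and the labels it returns are hypothetical, and only the prefix values /app and /topic come from the configuration above.

```java
/** Simplified model of how a destination prefix decides where a STOMP message goes. */
class DestinationRouter {
    // Values taken from WebSocketConfig; everything else here is illustrative.
    static final String APP_PREFIX = "/app";
    static final String BROKER_PREFIX = "/topic";

    /** Returns a label describing what would handle the given destination. */
    static String route(String destination) {
        if (destination.startsWith(APP_PREFIX + "/")) {
            // The application prefix is stripped before matching @MessageMapping paths.
            return "handler:" + destination.substring(APP_PREFIX.length());
        }
        if (destination.startsWith(BROKER_PREFIX + "/")) {
            // Broker destinations are passed on to subscribers unchanged.
            return "broker:" + destination;
        }
        return "unhandled:" + destination;
    }

    public static void main(String[] args) {
        System.out.println(route("/app/greetings"));   // reaches @MessageMapping("/greetings")
        System.out.println(route("/topic/greetings")); // delivered by the simple broker
    }
}
```

In this model a client publish to /app/greetings reaches the controller method mapped to /greetings, while anything under /topic is a broker destination that clients subscribe to.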
CustomHandshakeHandler
In this class we tag each WebSocket with the user it belongs to; when the server later pushes a message, it is sent according to that user.
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;
import java.security.Principal;
import java.util.Map;
/**
* {@code @author:} keboom
* {@code @date:} 2023/9/22
* {@code @description:}
*/
public class CustomHandshakeHandler extends DefaultHandshakeHandler {
    @Override
    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
        // The last segment of the handshake URL is the androidID chosen by the client.
        String uri = request.getURI().toString();
        String androidID = uri.substring(uri.lastIndexOf("/") + 1);
        // The Principal name is what convertAndSendToUser later addresses.
        return () -> androidID;
    }
}
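One detail worth checking in determineUser: it takes everything after the last "/" of the full URI string, so a query string appended by the client would become part of the user name. The following is a small sketch of a path-based alternative using only java.net.URI. The class name AndroidIdExtractor and the ?token=x query parameter are made up for illustration.

```java
import java.net.URI;
import java.security.Principal;

/** Sketch: derive the user name from the last path segment, ignoring any query string. */
class AndroidIdExtractor {

    /** Returns the text after the last '/' of the URI path (the query is not part of the path). */
    static String lastPathSegment(URI uri) {
        String path = uri.getPath();
        return path.substring(path.lastIndexOf('/') + 1);
    }

    /** Wraps the extracted id in a Principal, the same way the handshake handler does. */
    static Principal principalFor(URI uri) {
        String androidID = lastPathSegment(uri);
        return () -> androidID;
    }

    public static void main(String[] args) {
        // Hypothetical URL with a query string appended.
        URI uri = URI.create("ws://localhost:8082/mobicaster-websocket/androidId1234?token=x");
        System.out.println(principalFor(uri).getName()); // prints androidId1234
    }
}
```

Inside the handler the same idea would amount to calling request.getURI().getPath() instead of request.getURI().toString().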
Controller
Next, let's look at our controller:
import com.cogent.mobicasterserver.controller.vo.DeviceInfo;
import com.cogent.mobicasterserver.controller.vo.FoldbackReq;
import com.cogent.mobicasterserver.controller.vo.LiveReq;
import com.cogent.mobicasterserver.controller.vo.RespVO;
import jakarta.annotation.Resource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.annotation.SendToUser;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import static org.springframework.web.bind.annotation.RequestMethod.GET;
/**
* {@code @author:} keboom
* {@code @date:} 2023/9/21
* {@code @description:}
*/
@RestController
public class MobiController {
private SimpMessagingTemplate template;
@Autowired
public MobiController(SimpMessagingTemplate template) {
this.template = template;
}
@RequestMapping(path = "/foldback", method = GET)
public void foldback(FoldbackReq req) {
this.template.convertAndSendToUser(req.getAndroidID(), "/topic/foldback", req.toString());
}
@MessageMapping("/greetings")
@SendTo("/topic/greetings")
public RespVO greetings(@Payload LiveReq liveReq) throws Exception {
// do something ....
return new RespVO(200, "success", null);
}
@MessageMapping("/live")
@SendToUser("/topic/live")
public RespVO live(@Payload LiveReq liveReq) throws Exception {
// do something ....
return new RespVO(200, "success", null);
}
}
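Many resources online use @Controller; here I use @RestController and it works without any problem.
For sendToUser, the URI needs the /user prefix. This is clearer from the web page's JS code below, and clearer still from the actual WebSocket messages in the browser developer tools, so the meaning of each annotation is not explained here.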
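The /user prefix can be sketched in plain Java. According to the Spring documentation, a client subscription to a /user/... destination is translated into a destination unique to that user's session, and convertAndSendToUser(user, destination, payload) addresses /user/{user}{destination}. The class below is a hypothetical illustration, and the exact "-user" + sessionId suffix is an assumption about the naming, not an API to rely on.

```java
/** Illustrative model of user-destination naming; not Spring's API. */
class UserDestinations {
    static final String USER_PREFIX = "/user";

    /** Destination a user-targeted send is addressed to: /user/{user}{destination}. */
    static String sendDestination(String user, String destination) {
        return USER_PREFIX + "/" + user + destination;
    }

    /** Session-specific destination behind a client subscription such as /user/topic/live. */
    static String sessionDestination(String subscribed, String sessionId) {
        if (!subscribed.startsWith(USER_PREFIX + "/")) {
            throw new IllegalArgumentException("not a user destination: " + subscribed);
        }
        // Drop the /user prefix and make the name unique per session (assumed suffix format).
        return subscribed.substring(USER_PREFIX.length()) + "-user" + sessionId;
    }

    public static void main(String[] args) {
        System.out.println(sendDestination("androidId1234", "/topic/foldback"));
        System.out.println(sessionDestination("/user/topic/live", "abc"));
    }
}
```

This is why the server-side code names /topic/live or /topic/foldback while the client subscribes to /user/topic/live and /user/topic/foldback.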
app.js
const stompClient = new StompJs.Client({
brokerURL: 'ws://localhost:8082/mobicaster-websocket/androidId1234'
});
// --------------------------------------------------------------------------------------------
const stompClient2 = new StompJs.Client({
brokerURL: 'ws://localhost:8082/mobicaster-websocket/androidId2345'
});
stompClient2.onConnect = (frame) => {
stompClient2.subscribe('/topic/greetings', (greeting) => {
showGreeting(JSON.parse(greeting.body).content);
});
stompClient2.subscribe('/user/topic/live', (greeting) => {
// showGreeting(JSON.parse(greeting.body).content);
console.log('Live: ' + greeting);
});
stompClient2.subscribe('/user/topic/foldback', (greeting) => {
// showGreeting(JSON.parse(greeting.body).content);
console.log('foldback: ' + greeting);
});
};
stompClient2.onWebSocketError = (error) => {
console.error('Error with websocket', error);
};
stompClient2.onStompError = (frame) => {
console.error('Broker reported error: ' + frame.headers['message']);
console.error('Additional details: ' + frame.body);
};
// --------------------------------------------------------------------------------------------
stompClient.onConnect = (frame) => {
setConnected(true);
console.log('Connected: ' + frame);
stompClient.subscribe('/topic/greetings', (greeting) => {
showGreeting(JSON.parse(greeting.body).content);
});
stompClient.subscribe('/user/topic/live', (greeting) => {
// showGreeting(JSON.parse(greeting.body).content);
console.log('Live: ' + greeting);
});
stompClient.subscribe('/user/topic/foldback', (greeting) => {
// showGreeting(JSON.parse(greeting.body).content);
console.log('foldback: ' + greeting);
});
};
stompClient.onWebSocketError = (error) => {
console.error('Error with websocket', error);
};
stompClient.onStompError = (frame) => {
console.error('Broker reported error: ' + frame.headers['message']);
console.error('Additional details: ' + frame.body);
};
function setConnected(connected) {
$("#connect").prop("disabled", connected);
$("#disconnect").prop("disabled", !connected);
if (connected) {
$("#conversation").show();
} else {
$("#conversation").hide();
}
$("#greetings").html("");
}
function connect() {
stompClient.activate();
stompClient2.activate();
}
function disconnect() {
stompClient.deactivate();
stompClient2.deactivate();
setConnected(false);
console.log("Disconnected");
}
function sendName() {
stompClient.publish({
destination: "/app/greetings",
body: JSON.stringify({'androidId': "abad123svbasd12", 'liveAction': "start"})
});
stompClient.publish({
destination: "/app/live",
body: JSON.stringify({'androidId': "abad123svbasd12", 'liveAction': "start"})
});
stompClient2.publish({
destination: "/app/greetings",
body: JSON.stringify({'androidId': "abad123svbasd12", 'liveAction': "start"})
});
stompClient2.publish({
destination: "/app/live",
body: JSON.stringify({'androidId': "abad123svbasd12", 'liveAction': "start"})
});
}
function showGreeting(message) {
$("#greetings").append("<tr><td>" + message + "</td></tr>");
}
$(function () {
$("form").on('submit', (e) => e.preventDefault());
$("#connect").click(() => connect());
$("#disconnect").click(() => disconnect());
$("#send").click(() => sendName());
});
Here I open two connections, mainly to compare the effect of broadcast with one-to-one, user-targeted delivery.
index.html
Hello WebSocket
Seems your browser doesn't support Javascript! Websocket relies on Javascript being enabled. Please enable Javascript and reload this page!
Greetings
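Inspect the browser: open the developer tools
Click connect
Look at the URLs of the two WebSockets and at the URIs subscribed to in the messages.
/user/topic/live and /user/topic/foldback are delivered to a specific user.
/topic/greetings is a broadcast.
Click send
From the app.js code above, or from the messages themselves, we can see that we send /app/greetings and /app/live to the server.
From the controller code, /app/greetings is routed to this method: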
@MessageMapping("/greetings")
@SendTo("/topic/greetings")
public RespVO greetings(@Payload LiveReq liveReq) throws Exception {
return new RespVO(200, "success", null);
}
The @SendTo annotation means broadcast.
@MessageMapping("/live")
@SendToUser("/topic/live")
public RespVO live(@Payload LiveReq liveReq) throws Exception {
return new RespVO(200, "success", null);
}
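The @SendToUser annotation means that /user/topic/live is returned only to the WebSocket that sent /app/live.
In app.js, both WebSocket connections send /app/greetings and /app/live to the server, so each of the two WebSockets receives two /topic/greetings messages and one /user/topic/live message. This shows that /topic/greetings is indeed a broadcast and /user/topic/live is indeed one-to-one.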
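The message counts observed above can be reproduced with a small in-memory model. This is only a sketch of the delivery semantics and assumes that every connected user has subscribed to both destinations, as app.js does. The class DeliveryModel and its methods are hypothetical and do not use Spring.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model: broadcast reaches every connected user, a user send reaches only one. */
class DeliveryModel {
    // user name -> (destination -> number of messages received)
    private final Map<String, Map<String, Integer>> inbox = new LinkedHashMap<>();

    void connect(String user) {
        inbox.put(user, new LinkedHashMap<>());
    }

    private void deliver(String user, String destination) {
        inbox.get(user).merge(destination, 1, Integer::sum);
    }

    /** Models @SendTo: every connected user receives the message. */
    void broadcast(String destination) {
        for (String user : inbox.keySet()) {
            deliver(user, destination);
        }
    }

    /** Models @SendToUser: only the named user receives it, under the /user prefix. */
    void sendToUser(String user, String destination) {
        if (inbox.containsKey(user)) {
            deliver(user, "/user" + destination);
        }
    }

    int received(String user, String destination) {
        return inbox.get(user).getOrDefault(destination, 0);
    }

    public static void main(String[] args) {
        DeliveryModel model = new DeliveryModel();
        String[] users = {"androidId1234", "androidId2345"};
        for (String user : users) {
            model.connect(user);
        }
        // Each client publishes /app/greetings and /app/live once, as sendName() does.
        for (String sender : users) {
            model.broadcast("/topic/greetings");
            model.sendToUser(sender, "/topic/live");
        }
        for (String user : users) {
            System.out.println(user + ": greetings=" + model.received(user, "/topic/greetings")
                    + ", live=" + model.received(user, "/user/topic/live"));
        }
    }
}
```

Each user ends up with two greetings and one live message, matching what the developer tools show.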
Pushing messages to the browser from the server
private SimpMessagingTemplate template;
@Autowired
public MobiController(SimpMessagingTemplate template) {
this.template = template;
}
@RequestMapping(path = "/foldback", method = GET)
public void foldback(FoldbackReq req) {
this.template.convertAndSendToUser(req.getAndroidID(), "/topic/foldback", req.toString());
}
Then look at the two WebSockets on the web page:
We can see that only the matching WebSocket received /user/topic/foldback.
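How a user name maps to connections can be sketched as a lookup table. As far as I understand Spring's behaviour, a user-targeted send reaches every session registered under that user name, and a send to a name with no connected session is simply not delivered. The class UserSessionRegistry and the session ids s1, s2, s3 are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy registry: user name (the Principal name) -> session ids connected under it. */
class UserSessionRegistry {
    private final Map<String, List<String>> sessionsByUser = new HashMap<>();

    void register(String user, String sessionId) {
        sessionsByUser.computeIfAbsent(user, k -> new ArrayList<>()).add(sessionId);
    }

    /** Sessions that would receive a message sent to this user; empty if none is connected. */
    List<String> sessionsFor(String user) {
        return new ArrayList<>(sessionsByUser.getOrDefault(user, List.of()));
    }

    public static void main(String[] args) {
        UserSessionRegistry registry = new UserSessionRegistry();
        registry.register("androidId1234", "s1");
        registry.register("androidId2345", "s2");
        registry.register("androidId2345", "s3"); // same androidID opened twice
        System.out.println(registry.sessionsFor("androidId1234"));
        System.out.println(registry.sessionsFor("androidId2345"));
        System.out.println(registry.sessionsFor("unknown"));
    }
}
```

In the foldback endpoint, req.getAndroidID() plays the role of the lookup key, so it has to match the androidID segment the client used in its connection URL.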
other
SubscribeMapping
https://medium.com/swlh/websockets-with-spring-part-3-stomp-over-websocket-3dab4a21f397
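With this annotation, the server sends a message to the client as soon as the client subscribes. It is a one-off reply, mainly used for initialization.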
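The one-off behaviour can be modelled without Spring. In the sketch below, a subscription to a destination that has a subscribe handler gets a single direct reply and is not stored, while any other subscription is registered as a normal broker subscription. The class SubscribeReplyModel, the destination /app/init and the payload "initial-state" are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

/** Toy model of a subscribe-time reply versus a regular broker subscription. */
class SubscribeReplyModel {
    private final Map<String, Supplier<String>> subscribeHandlers = new HashMap<>();
    private final Map<String, List<String>> brokerSubscribers = new HashMap<>();

    /** Registers a handler that answers subscriptions to the given destination. */
    void onSubscribe(String destination, Supplier<String> handler) {
        subscribeHandlers.put(destination, handler);
    }

    /** Returns the one-off reply, or empty if the session was registered with the broker. */
    Optional<String> subscribe(String sessionId, String destination) {
        Supplier<String> handler = subscribeHandlers.get(destination);
        if (handler != null) {
            // Direct reply to the subscriber only; no subscription is kept.
            return Optional.of(handler.get());
        }
        brokerSubscribers.computeIfAbsent(destination, k -> new ArrayList<>()).add(sessionId);
        return Optional.empty();
    }

    int brokerSubscriptionCount(String destination) {
        return brokerSubscribers.getOrDefault(destination, List.of()).size();
    }

    public static void main(String[] args) {
        SubscribeReplyModel model = new SubscribeReplyModel();
        model.onSubscribe("/app/init", () -> "initial-state");
        System.out.println(model.subscribe("s1", "/app/init"));
        System.out.println(model.subscribe("s1", "/topic/greetings"));
    }
}
```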
endpoint
Multiple endpoints can be configured on the Java side, but I have not found any more detailed usage of this online.
References:
https://spring.io/guides/gs/messaging-stomp-websocket/
https://docs.spring.io/spring-framework/reference/web/websocket/stomp/overview.html