简单记录一下实现的整体框架,具体细节在实际生产中再细化就可以了。
第一步 引入netty依赖
SpringBoot的其他必要的依赖像Mybatis、Lombok这些都是老生常谈了 就不在这里放了
dependency>
groupId>io.nettygroupId>
artifactId>netty-allartifactId>
version>4.1.85.Finalversion>
dependency>
第二步接下来就是准备工作。
消息服务类(核心代码) 聊天服务的功能就是靠这个类的start()函数来启动的 绑定端口8087 之后可以通socket协议访问这个端口来执行通讯
import com.bxt.demo.im.handler.WebSocketHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.util.concurrent.GlobalEventExecutor; import lombok.extern.slf4j.Slf4j; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * @Description: 即时通讯服务类 * @author: bhw * @date: 2023年09月27日 13:44 */ @Slf4j public class IMServer {
// 用来存放连入服务器的用户集合 public static final Map USERS = new ConcurrentHashMap(1024); // 用来存放创建的群聊连接 public static final ChannelGroup GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); publ服务器托管网ic static void start() throws InterruptedException { log.info("IM服务开始启动"); EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); // 绑定端口 ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup,workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); // 添加http编码解码器 pipeline.addLast(new HttpServerCodec()) //支持大数据流 .addLast(new ChunkedWriteHandler()) // 对http消息做聚合操作 FullHttpRequest FullHttpResponse .addLast(new HttpObjectAggregator(1024*64)) //支持websocket .addLast(new WebSocketServerProtocolHandler("/")) .addLast(new WebSocketHandler()); } }); ChannelFuture future = bootstrap.bind(8087).sync(); log.info("服务器启动开始监听端口: {}", 8087); future.channel().closeFuture().sync(); //关闭主线程组 bossGroup.shutdownGracefully(); //关闭工作线程组 workGroup.shutdownGracefully(); } }
创建聊天消息实体类
/** * @Description: 聊天消息对象 可以自行根据实际业务扩展 * @author: seizedays */ @Data public class ChatMessage extends IMCommand { //消息类型 private Integer type; //消息目标对象 private String target; //消息内容 private String content; }
连接类型枚举类,暂时定义为建立连接、发送消息和加入群组三种状态码
@AllArgsConstructor @Getter public enum CommandType { //建立连接 CONNECT(10001), //发送消息 CHAT(10002), //加入群聊 JOIN_GROUP(10003), ERROR(-1) ; private Integer code; public static CommandType match(Integer code){ for (CommandType value : CommandType.values()) { if (value.code.equals(code)){ return value; } } return ERROR; } }
命令动作为聊天的时候 消息类型又划分为私聊和群聊两种 枚举类如下:
@AllArgsConstructor @Getter public enum MessageType { //私聊 PRIVATE(1), //群聊 GROUP(2), ERROR(-1) ; private Integer type; public static MessageType match(Integer code){ for (MessageType value : MessageType.values()) { if (value.type.equals(code)){ return value; } } return ERROR; } }
创建连接请求的拦截器
import com.alibaba.fastjson2.JSON; 服务器托管网import com.bxt.common.vo.Result; import com.bxt.demo.im.cmd.IMCommand; import com.bxt.demo.im.server.IMServer; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; /** * @Description: 用户连接到服务端的拦截器 * @author: bhw * @date: 2023年09月27日 14:28 */ public class ConnectionHandler { public static void execute(ChannelHandlerContext ctx, IMCommand command) { if (IMServer.USERS.containsKey(command.getNickName())) { ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error(command.getNickName() + "已经在线,不能重复连接")))); ctx.channel().disconnect(); return; } IMServer.USERS.put(command.getNickName(), ctx.channel()); ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("系统消息:" + command.getNickName() + "与服务端连接成功")))); ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success(JSON.toJSONString(IMServer.USERS.keySet()))))); } }
加入群组功能的拦截器
/** * @Description: 加入群聊拦截器 * @author: bhw * @date: 2023年09月27日 15:07 */ public class JoinGroupHandler { public static void execute(ChannelHandlerContext ctx) { try { IMServer.GROUP.add(ctx.channel()); ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("加入系统默认群组成功!")))); } catch (Exception e) { ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("消息内容异常")))); } } }
发送聊天到指定对象的功能拦截器
import com.alibaba.excel.util.StringUtils; import com.alibaba.fastjson2.JSON; import com.bxt.common.vo.Result; import com.bxt.demo.im.cmd.ChatMessage; import com.bxt.demo.im.cmd.MessageType; import com.bxt.demo.im.server.IMServer; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import java.util.Objects; /** * @Description: 聊天拦截器 * @author: bhw * @date: 2023年09月27日 15:07 */ public class ChatHandler { public static void execute(ChannelHandlerContext ctx, TextWebSocketFrame frame) { try { ChatMessage message = JSON.parseObject(frame.text(), ChatMessage.class); MessageType msgType = MessageType.match(message.getType()); if (msgType.equals(MessageType.PRIVATE)) { if (StringUtils.isBlank(message.getTarget())){ ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("系统消息:消息发送失败,请选择消息发送对象")))); return; } Channel channel = IMServer.USERS.get(message.getTarget()); if (Objects.isNull(channel) || !channel.isActive()){ ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("系统消息:消息发送失败,对方不在线")))); IMServer.USERS.remove(message.getTarget()); return; } channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("私聊消息(" + message.getTarget() + "):" + message.getContent())))); } else if (msgType.equals(MessageType.GROUP)) { IMServer.GROUP.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("群消息:发送者(" + message.getNickName() + "):" + message.getContent())))); }else { ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("系统消息:不支持的消息类型")))); } } catch (Exception e) { ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("消息内容异常")))); } } }
最后是websocket拦截器 接收到客户端的指令后选择对应的拦截器实现相应的功能:
import com.alibaba.fastjson2.JSON; import com.bxt.common.vo.Result; import com.bxt.demo.im.cmd.CommandType; import com.bxt.demo.im.cmd.IMCommand; import com.bxt.demo.im.server.IMServer; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import lombok.extern.slf4j.Slf4j; /** * @Description: websocket拦截器 * @author: bhw * @date: 2023年09月27日 13:59 */ @Slf4j public class WebSocketHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) { System.out.println(frame.text()); try { IMCommand command = JSON.parseObject(frame.text(), IMCommand.class); CommandType cmdType = CommandType.match(command.getCode()); if (cmdType.equals(CommandType.CONNECT)){ ConnectionHandler.execute(ctx, command); } else if (cmdType.equals(CommandType.CHAT)) { ChatHandler.execute(ctx,frame); } else if (cmdType.equals(CommandType.JOIN_GROUP)) { JoinGroupHandler.execute(ctx); } else { ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("不支持的code")))); } }catch (Exception e){ ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error(e.getMessage())))); } } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // 当连接断开时被调用 Channel channel = ctx.channel(); // 从 USERS Map 中移除对应的 Channel removeUser(channel); super.channelInactive(ctx); } private void removeUser(Channel channel) { // 遍历 USERS Map,找到并移除对应的 Channel IMServer.USERS.entrySet().removeIf(entry -> entry.getValue() == channel); } }
第三步 启动服务
@SpringBootApplication public class DemoApplication { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); // 启动IM服务 try { IMServer.start(); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
现在 客户端通过socket协议访问8087端口即可实现基本的聊天室功能了!
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.fwqtg.net
作者:马伟,青云科技容器顾问,云原生爱好者,目前专注于云原生技术,云原生领域技术栈涉及 Kubernetes、KubeSphere、KubeKey 等。 2019 年,我在给很多企业部署虚拟化,介绍虚拟网络和虚拟存储。 2023 年,这些企业都已经上了云原生了…