基于 DotNetty 实现通信
DotNetty : 是微软的 Azure 团队,使用 C#实现的 Netty 的版本发布。是.NET 平台的优秀网络库。
项目介绍
OpenDeploy.Communication
类库项目,是通信相关基础设施层
-
Codec
模块实现编码解码 -
Convention
模块定义约定,比如抽象的业务 Handler, 消息载体NettyMessage
, 消息上下文 ‘NettyContext’ 等
自定义消息格式
消息类为 NettyMessage
,封装了消息头 NettyHeader
和消息体 Body
NettyMessage
封装了消息头
NettyHeader
和消息体Body
NettyMessage 点击查看代码
/// Netty服务器托管网消息
public class NettyMessage
{
/// 消息头
public NettyHeader Header { get; init; } = default!;
/// 消息体(可空,可根据具体业务而定)
public byte[]? Body { get; init; }
/// 消息头转为字节数组
public byte[] GetHeaderBytes()
{
var headerString = Header.ToString();
return Encoding.UTF8.GetBytes(headerString);
}
/// 是否同步消息
public bool IsSync() => Header.Sync;
/// 创建Netty消息工厂方法
public static NettyMessage Create(string endpoint, bool sync = false, byte[]? body = null)
{
return new NettyMessage
{
Header = new NettyHeader { EndPoint = endpoint, Sync = sync },
Body = body
};
}
/// 序列化为JSON字符串
public override string ToString() => Header.ToString();
}
NettyHeader
消息头,包含请求唯一标识,是否同步消息,终结点等, 在传输数据时会序列化为 JSON
NettyHeader 点击查看代码
/// Netty消息头
public class NettyHeader
{
/// 请求消息唯一标识
public Guid RequestId { get; init; } = Guid.NewGuid();
/// 是否同步消息, 默认false是异步消息
public bool Sync { get; init; }
/// 终结点 (借鉴MVC,约定为Control/Action模式)
public string EndPoint { get; init; } = string.Empty;
/// 序列化为JSON字符串
public override string ToString() => this.ToJsonString();
}
- 请求消息唯一标识
RequestId
, 用来唯一标识消息, 主要用于 发送同步请求, 因为默认的消息是异步的,只管发出去,不需要等待响应 - 是否同步消息
Sync
, 可以不需要,主要为了可视化,便于调试 - 终结点
EndPoint
, (借鉴 MVC,约定为 Control/Action 模式), 服务端直接解析出对应的处理器
编码器
DefaultEncoder 点击查看代码
public class DefaultEncoder : MessageToByteEncoder
{
protected override void Encode(IChanne服务器托管网lHandlerContext context, NettyMessage message, IByteBuffer output)
{
//消息头转为字节数组
var headerBytes = message.GetHeaderBytes();
//写入消息头长度
output.WriteInt(headerBytes.Length);
//写入消息头字节数组
output.WriteBytes(headerBytes);
//写入消息体字节数组
if (message.Body != null && message.Body.Length > 0)
{
output.WriteBytes(message.Body);
}
}
}
解码器
DefaultDecoder 点击查看代码
public class DefaultDecoder : MessageToMessageDecoder
{
protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List
NettyServer 实现
NettyServer 点击查看代码
public static class NettyServer
{
///
/// 开启Netty服务
///
public static async Task RunAsync(int port = 20007)
{
var bossGroup = new MultithreadEventLoopGroup(1);
var workerGroup = new MultithreadEventLoopGroup();
try
{
var bootstrap = new ServerBootstrap().Group(bossGroup, workerGroup);
bootstrap
.Channel()
.Option(ChannelOption.SoBacklog, 100)
.Option(ChannelOption.SoReuseaddr, true)
.Option(ChannelOption.SoReuseport, true)
.ChildHandler(new ActionChannelInitializer(channel =>
{
IChannelPipeline pipeline = channel.Pipeline;
pipeline.AddLast("framing-enc", new LengthFieldPrepender(4));
pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4));
pipeline.AddLast("decoder", new DefaultDecoder());
pipeline.AddLast("encoder", new DefaultEncoder());
pipeline.AddLast("handler", new ServerMessageEntry());
}));
var boundChannel = await bootstrap.BindAsync(port);
Logger.Info($"NettyServer启动成功...{boundChannel}");
Console.ReadLine();
await boundChannel.CloseAsync();
Logger.Info($"NettyServer关闭监听了...{boundChannel}");
}
finally
{
await Task.WhenAll(
bossGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)),
workerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1))
);
Logger.Info($"NettyServer退出了...");
}
}
}
- 服务端管道最后我们添加了
ServerMessageEntry
,作为消息处理的入口
ServerMessageEntry 点击查看代码
public class ServerMessageEntry : ChannelHandlerAdapter
{
/// Netty处理器选择器
private readonly DefaultNettyHandlerSelector handlerSelector = new();
public ServerMessageEntry()
{
//注册Netty处理器
handlerSelector.RegisterHandlerTypes(typeof(EchoHandler).Assembly.GetTypes());
}
/// 通道激活
public override void ChannelActive(IChannelHandlerContext context)
{
Logger.Warn($"ChannelActive: {context.Channel}");
}
/// 通道关闭
public override void ChannelInactive(IChannelHandlerContext context)
{
Logger.Warn($"ChannelInactive: {context.Channel}");
}
/// 收到客户端的消息
public override async void ChannelRead(IChannelHandlerContext context, object message)
{
if (message is not NettyMessage nettyMessage)
{
Logger.Error("从客户端接收消息为空");
return;
}
try
{
Logger.Info($"收到客户端的消息: {nettyMessage}");
//封装请求
var nettyContext = new NettyContext(context.Channel, nettyMessage);
//选择处理器
AbstractNettyHandler handler = handlerSelector.SelectHandler(nettyContext);
//处理请求
await handler.ProcessAsync();
}
catch(Exception ex)
{
Logger.Error($"ServerMessageEntry.ChannelRead: {ex}");
}
}
}
-
按照约定, 把继承
AbstractNettyHandler
的类视为业务处理器 -
ServerMessageEntry
拿到消息后,首先把消息封装为NettyContext
, 类似与 MVC 中的 HttpContext, 封装了请求和响应对象, 内部解析请求的EndPoint
, 拆分为HandlerName
,ActionName
-
DefaultNettyHandlerSelector
提供注册处理器的方法RegisterHandlerTypes
, 和选择处理器的方法SelectHandler
-
SelectHandler
, 默认规则是查找已注册的处理器中以HandlerName
开头的类型 -
AbstractNettyHandler
的ProcessAsync
方法,通过ActionName
, 反射拿到MethodInfo
, 调用终结点
NettyClient 实现
NettyClient 点击查看代码
public sealed class NettyClient(string serverHost, int serverPort) : IDisposable
{
public EndPoint ServerEndPoint { get; } = new IPEndPoint(IPAddress.Parse(serverHost), serverPort);
private static readonly Bootstrap bootstrap = new();
private static readonly IEventLoopGroup eventLoopGroup = new SingleThreadEventLoop();
private bool _disposed;
private IChannel? _channel;
public bool IsConnected => _channel != null && _channel.Open;
public bool IsWritable => _channel != null && _channel.IsWritable;
static NettyClient()
{
bootstrap
.Group(eventLoopGroup)
.Channel()
.Option(ChannelOption.SoReuseaddr, true)
.Option(ChannelOption.SoReuseport, true)
.Handler(new ActionChannelInitializer(channel =>
{
IChannelPipeline pipeline = channel.Pipeline;
//pipeline.AddLast("ping", new IdleStateHandler(0, 5, 0));
pipeline.AddLast("framing-enc", new LengthFieldPrepender(4));
pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4));
pipeline.AddLast("decoder", new DefaultDecoder());
pipeline.AddLast("encoder", new DefaultEncoder());
pipeline.AddLast("handler", new ClientMessageEntry());
}));
}
/// 连接服务器
private async Task TryConnectAsync()
{
try
{
if (IsConnected) { return; }
_channel = await bootstrap.ConnectAsync(ServerEndPoint);
}
catch (Exception ex)
{
throw new Exception($"连接服务器失败 : {ServerEndPoint} {ex.Message}");
}
}
///
/// 发送消息
///
/// 终结点
/// 是否同步等待响应
/// 正文
public async Task SendAsync(string endpoint, bool sync = false, byte[]? body = null)
{
var message = NettyMessage.Create(endpoint, sync, body);
if (sync)
{
var task = ClientMessageSynchronizer.TryAdd(message);
try
{
await SendAsync(message);
await task;
}
catch
{
ClientMessageSynchronizer.TryRemove(message);
throw;
}
}
else
{
await SendAsync(message);
}
}
///
/// 发送消息
///
private async Task SendAsync(NettyMessage message)
{
await TryConnectAsync();
await _channel!.WriteAndFlushAsync(message);
}
/// 释放连接(程序员手动释放, 一般在代码使用using语句,或在finally里面Dispose)
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// 释放连接
private void Dispose(bool disposing)
{
if (_disposed)
{
return;
}
//释放托管资源,比如嵌套的对象
if (disposing)
{
}
//释放非托管资源
if (_channel != null)
{
_channel.CloseAsync();
_channel = null;
}
_disposed = true;
}
~NettyClient()
{
Dispose(true);
}
}
-
NettyClient
封装了 Netty 客户端逻辑,提供发送异步请求(默认)和发布同步请求方法 -
DotNetty
默认不提供同步请求,但是有些情况我们需要同步等待服务器的响应,所有需要自行实现,实现也很简单,把消息 ID 缓存起来,收到服务器响应后激活就行了,具体实现在消息同步器ClientMessageSynchronizer
, 就不贴了
总结
至此,我们实现了基于 DotNetty
搭建通信模块, 实现了客户端和服务器的编解码,处理器选择,客户端实现了同步消息等,大家可以在 ConsoleHost
结尾的控制台项目中,测试下同步和异步的消息,实现的简单的 Echo
模式
代码仓库
项目暂且就叫
OpenDeploy
吧
- OpenDeploy: https://gitee.com/broadm-dotnet/OpenDeploy
欢迎大家拍砖,Star
下一步
计划下一步,基于WPF
的客户端, 实现接口项目的配置与发现
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.fwqtg.net