前言
本篇博文是《从0到1学习 Netty》中入门系列的第五篇博文,主要内容是介绍 Netty 中 Pipeline 与 ChannelHandler 的概念和作用,通过源码分析和应用案例进行详细讲解,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;
Pipeline
在 Netty 中,pipeline
是一种机制,它由一系列的 ChannelHandler
组成。pipeline
负责处理进入或离开 Channel
的数据,并且将事件(比如连接建立、数据读取等)转发给正确的 handler
进行处理。
handler
是 pipeline
的节点,每个 handler
会接收来自前一个 handler
的处理结果,并进行自己的处理。然后,它将处理结果传递给下一个 handler
,直到最终达到 pipeline
的尾部。pipeline
的头部和尾部都是特殊的 handler
,头部负责处理 Inbound
操作,尾部则负责处理 Outbound
操作。
接下来进行具体操作:
1、通过 channel
获取 pipeline
:
ChannelPipeline pipeline = ch.pipeline();
2、添加处理器:
pipeline.addLast(){
...
};
3、服务端代码如下,完整代码见 TestPipeline.java:
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("h1", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("Inbound1");
super.channelRead(ctx, msg);
}
});
// h2
// h3
...
pipeline.addLast("h4", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("Outbound4");
super.write(ctx, msg, promise);
}
});
// h5
// h6
...
}
})
4、客户端在博文 ChannelFuture 与 CloseFuture 中有详细讲解,完整代码见 CloseFutureClient.java:
.handler(new ChannelInitializer() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new StringEncoder());
}
})
5、服务端运行结果:
6、客户端运行结果:
看到服务端的运行结果,可能有小伙伴会感到疑惑,按照 handler 的添加顺序,运行结果不应该是 In1 -> In2 -> In3 -> Out4 -> Out5 -> Out6
吗?
这是因为,当 Inbound
操作发生时,Pipeline 会从头部开始向后调用 Handler,直到找到能够处理该操作的 Handler。一旦找到,该 Handler 将处理数据并将其传递给下一个 Handler,直到达到尾部为止。
同样地,当 Outbound
操作发生时,Pipeline 会从尾部开始向前调用 Handler,直到找到能够处理该操作的 Handler。一旦找到,该 Handler 将处理数据并将其传递给上一个 Handler,直到达到头部为止。
ChannelHandler
在 Netty 中,ChannelHandler
是处理 IO 事件的最基本组件之一。ChannelHandler
位于 Netty 的核心位置,并负责接收入站事件 Inbound 和转发出站事件 Outbound。
具体而言,ChannelHandler
主要有两个作用:
- 处理各种类型的 IO 事件,包括连接建立、连接关闭、数据读写等。
- 实现业务逻辑,对网络数据进行处理,例如编解码、协议解析、消息过滤、消息转发等。
Inbound
Inbound 是一种 ChannelHandler 的类型,它主要用于处理从网络接收到的数据。具体来说,当数据到达 Netty 应用程序的网络层时,Inbound 处理程序会被触发并开始处理这些数据。
Inbound 处理程序通常会执行以下操作:
- 解码:将二进制数据转换为 Java 对象。
- 验证:确保数据格式正确以及发送方有权进行操作。
- 处理:执行实际的业务逻辑,可能包括修改状态、创建响应等。
- 转发:将处理后的数据传递给下一个处理程序或写回到网络中。
在处理完所有 Inbound 处理程序之后,Netty 应用程序通常会将处理结果传递给 Outbound 处理程序,让其对数据进行编码、加密等操作,并发送回网络。
举个例子,接下来将使用三个 Inbound,第一个 handler 用于接收 name
属性,第二个 handler 用于生成 Person
类,第三个 handler 返回该结果,代码如下:
pipeline.addLast("h1", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("Inbound1, value: {}, class: {}", msg, msg.getClass());
ByteBuf buf = (ByteBuf) msg;
String name = buf.toString(Charset.defaultCharset());
super.channelRead(ctx, name);
}
});
pipeline.addLast("h2", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object name) throws Exception {
log.debug("Inbound2, value: {}, class: {}", name, name.getClass());
Person person = new Person(name.toString());
super.channelRead(ctx, person);
}
});
pipeline.addLast("h3", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("Inbound3, value: {}, class: {}", msg, msg.getClass());
ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
}
});
运行结果:
在上述代码中,super.channelRead(ctx, msg)
方法用于将接收到的消息传递给下一个 ChannelInboundHandler 处理器进行处理,实现了消息在处理链条中的流转。
super.channelRead(ctx, msg)
源码如下:
@Skip
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
通过 super.channelRead(ctx, msg)
的源码可以获知,在该方法中,通过调用 ctx.fireChannelRead(msg)
将数据传递给下一个 ChannelInboundHandler,从而实现事件的传播。
而 fireChannelRead
是在 ChannelHandlerContext 接口中定义的,默认实现是在当前 ChannelHandlerContext 中查找与 MASK_CHANNEL_READ
相应类型的 ChannelInboundHandler,并将数据传递给它的 channelRead
方法。这个方法返回当前的 ChannelHandlerContext 对象,可以链式调用其他方法。其中,invokeChannelRead()
方法在博文 从0到1(七):入门-EventLoop 进行过详细讲解。
Outbound
Outbound 是一种 ChannelHandler 的类型,它主要用于处理将数据发送到网络的操作。具体来说,当应用程序需要向网络发送数据时,会触发 Outbound 处理程序,并让其对数据进行编码、加密等处理后再发送出去。
Outbound 处理程序通常会执行以下操作:
- 编码:将 Java 对象转换为二进制数据。
- 加密:对数据进行加密以保证安全性。
- 写入:将处理后的数据写入网络中发送出去。
在处理完所有 Outbound 处理程序之后,Netty 应用程序通常会将数据传递给底层的传输层(如 TCP)并发送到远程端点。
这里需要注意的是,socketChannel.writeAndFlush()
和 ctx.writeAndFlush()
这两个方法,两个方法的作用都是写入数据并刷新缓冲区,但还是有所不同的,接下来通过实例进行讲解,为了效果更加明显,将原先代码中的 h3 与 h4 互换:
pipeline.addLast("h3", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("Outbound4");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("h4", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("Inbound3, value: {}, class: {}", msg, msg.getClass());
ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
}
});
使用 socketChannel.writeAndFlush()
的运行结果:
使用 ctx.writeAndFlush()
的运行结果:
可以发现,socketChannel.writeAndFlush()
的运行结果包含了三个 Outbound,但是 ctx.writeAndFlush()
的运行结果只有一个 Outbound4,这是为什么呢?接下来我们通过源码进一步分析。
socketChannel.writeAndFlush()
的源码如下:
@Override
public ChannelFuture writeAndFlush(Object msg) {
return pipeline.writeAndFlush(msg);
}
@Override
public final ChannelFuture writeAndFlush(Object msg) {
return tail.writeAndFlush(msg);
}
ctx.writeAndFlush()
的源码如下:
@Override
public ChannelFuture writeAndFlush(Object msg) {
return writeAndFlush(msg, newPromise());
}
从源码中可以看出,socketChannel.writeAndFlush()
是从尾部开始向前寻找 Outbound,而 ctx.writeAndFlush()
则是从当前位置开始向前寻找 Outbound。
因此,socketChannel.writeAndFlush()
的运行结果包含了三个 Outbound,而 ctx.writeAndFlush()
的运行结果只有一个 Outbound4。
EmbeddedChannel
EmbeddedChannel
是 Netty 提供的工具类,用于在单元测试中模拟 Netty Channel 的行为。它可以被用于测试 ChannelHandler、ChannelPipeline 等模块。
通常来说,在使用 Netty 进行网络编程的时候,我们需要连接远程服务器或者监听本地端口以接收请求。这个过程需要真实的网络环境,即需要实际建立连接和发送数据,这样会增加测试的复杂性和不稳定性。而使用 EmbeddedChannel
,我们可以通过将 ChannelHandler 添加到 EmbeddedChannel
对象上,来模拟整个请求/响应的流程,从而达到测试的目的,避免了对网络环境的依赖。
另外,使用 EmbeddedChannel
还可以轻松测试多个 ChannelHandler 之间的协作情况。例如,在进行 WebSocket 消息处理的时候,我们可能需要多个 ChannelHandler 协同工作才能完成消息的解析和转发,此时使用 EmbeddedChannel
就非常方便。
测试代码如下,完整代码见 TestEmbeddedChannel.java:
public class TestEmbeddedChannel {
public static void main(String[] args) {
...
// 用于测试Handler的Channel
EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
// 执行Inbound操作
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
// 执行Outbound操作
channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
}
}
运行结果:
1
2
4
3
后记
Pipeline 提供了一种灵活而高效的方式来处理传入和传出的数据流,而 Inbound 和 Outbound 作为两个关键组成部分,负责处理数据的接收和发送。同时,EmbeddedChannel 作为一个内嵌的通道处理器,使得我们可以方便地进行单元测试和模拟网络环境。
总之,Pipeline 与 ChannelHandler 是构建高效网络应用程序的核心元素,它们的设计和使用将极大地简化和提升我们在网络编程中的工作效率。
以上就是 掌握 Pipeline 和 ChannelHandler:构建高效网络应用程序的关键 的所有内容了,希望本篇博文对大家有所帮助!
参考:
- Netty API reference;
- 黑马程序员Netty全套教程 ;
📝 上篇精讲:「萌新入门」(四)异步编程模型:利用 Future 和 Promise 提高性能与响应能力
💖 我是 𝓼𝓲𝓭𝓲𝓸𝓽,期待你的关注,创作不易,请多多支持;
👍 公众号:sidiot的技术驿站;
🔥 系列专栏:探索 Netty:源码解析与应用案例分享
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.fwqtg.net