Netty 总结三
我们来看一下客户端代码,虽然和服务器端有点相似,但是原理还是有区别的
1 | public class HelloClient { |
Netty 客户端创建时序图
- 步骤1:用户线程创建 BootStrap 实例,设置创建客户端相关参数,异步发起客户端连接
- 步骤2:创建处理客户端连接、I/O 读写的 Reactor 线程组 NioEventLoopGroup
- 步骤3:通过 BootStrap 的 ChannelFactory 和用户指定的 Channel 类型创建用于客户端连接的 NioSocketChannel
- 步骤4:创建默认的 ChannelHandlerPipeline ,用于调度和执行网络事件
- 步骤5:异步发起 TCP 连接,判断连接是否成功。如果成功则直接将 NioSocketChannel 注册到多路复用器上,监听读操作位,用于数据报读取和消息发送;如果没有立即连接成功则注册连接监听到多路复用器,等待连接结果
- 步骤6:注册对应的网络监听状态到多路复用器
- 步骤7:由多路复用器在I/O现场中轮询各 Channel ,处理连接结果
- 步骤8:如果连接成功,设置 Future 结果,发送连接成功事件,触发 ChannelPipeline 执行
- 步骤9:由 ChannelPipeline 调度执行系统和用户的 ChannelHandler ,执行业务逻辑
源码分析
客户端连接辅助类 BootStrap
在NIO模式中一个线程可以处理多个 TCP 连接。客户端相对于服务器来说,只需要创建一个处理 I/O 读写的线程组即可:
1 | public B group(EventLoopGroup group) { |
TCP 参数设置接口:创建客户端套接字的时候通常都会设置连接参数。BootStrap 也提供了 TCP 参数设置接口,通过 option 方法进行连接
1 | public <T> B option (ChannelOption<T> option , T value){ |
我们可以看到,在设置参数的时候采用了Map,将各个参数一一对应
channel 接口:用于指定客户端使用的 Channel 接口,对于 TCP 客户端,默认使用 NioSocketChannel 。和服务端一样,使用ChannelFactory通过反射来创建 channel 对象:
1 | public BootStrap channel(Class<? extends Channel> channelClass) { |
设置 Handler 接口:BootStrap 提供了 ChannelInitializer , 先调用 channelRegister 接口注册链路,当 TCP 链路注册成功之后,调用 initChannel 接口,用于设置用户 ChannelHandler:
1 | public final void channelRegistered (ChannelHandlerContext ctx) throws Exception { |
客户端连接操作
- 创建和初始化 NioSocketChannel
1 | private ChannelFuture doConnect (final SocketAddress remoteAddress , final SocketAddress localAddress){ |
- 从 NioEventLoopGroup 中获取 NioEventLoop , 然后使用其作为参数创建 NioSocketChannel
1 | Channel createChannel() { |
- 初始化 Channel 之后,将其注册到 Selector 上(也就是注册到 EventLoop 上,之前说过一个 EventLooo 对应多个Channel , 而一个 Channel 对应一个EventLoop )
1 | ChannelPromise regFuture = channel.newPromise(); |
- 链路创建成功之后,发起异步的 TCP 连接,从连接开始,操作就切换到了channel 对应的EventLoop 线程进行了,也就是异步操作:
1 | private static void doConnect0 (final ChannelFuture regFuture , final Channel channel , |
doConnect0 最终调用 HeadHandler 的connect 方法:
1 | public void connect (ChannelHandlerContext ctx , SocketAddress remoteAddress , SocketAddress localAddress , |
AbstractNioUnsafe 的 connect 操作如下:
1 | if (doConnect(remoteAddress , localAddress)){ |
执行 connect() 操作后有三种结果:
(1)连接成功,返回 true
(2)暂时没有连接上,服务端没有返回 ACK 应答,连接结果不确定,返回 False
(3)连接失败,直接抛出 I/O 异常
注意:如果是第二种结果,需要将 NioSocketChannel 中的 selectionKey 设置为 OP_CONNECT ,监听连接结果
异步连接返回之后,需要判断连接结果:
- 连接成功,出发 ChannelActive 事件
1
2
3if(!wasActive && isActice()){
pipeline().fireChannelActive(); //最终将 NioSocketChannel 中的selectionKey 设置为 SelectionKey.OP_READ,用于监听网络读操作
}- 没有立即连接上,则注册 SelectionKey.OP_CONNECT 到多路复用器
1
2
3
4
5
6
7
8
9boolean success = false ;
try {
boolean connected = javaChannel().connect(remoteAddress);
if (!connected){
selectionKey().interestOps(SelectionKey.OP_CONNECT)l
}
success = true ;
return connected ;
}
- 连接发生异常,则关闭链路,进入连接失败处理流程
1
2
3
4
5finally{
if (!success){
doClose();
}
}
异步连接结果通知
NioEventLoop 的 Selector 轮询客户端连接 Channel , 当服务端返回握手应答之后,对连接结果进行判断
1 | if ((readOps & SelectionKey.OP_CONNECT) != 0) { |
doFinishConnect 用于判断 JDK 的 SocketChannel 的连接结果,如果返回 true 表示连接成功,其他值或者发生异常表示连接失败
1 | protected void doFinishConnect() throws Exception { |
连接成功之后,调用 fulfillConnectPromise 方法,触发链路激活事件,该事件由 ChannelPipeline 进行传播:
1 | private void fulfillConnectPromise (ChannelPromise promise , boolean wasActive){ |
客户端连接超时机制
- 用户在创建 Netty 客户端的时候,可以通过 ChannelOption.CONNECT_TIMEOUT_MILLIS 配置项设置连接超时时间
1 | bootstrap.group(clientGroup) |
- 发起连接的同时,启动连接超时检测定时器
- 如果在连接超时之前获取到结果,则删除连接超时定时器,防止其被触发
1 | public void finishConnect(){ |
大致的连接流程就是这样子了~参考了《Netty 权威指南第二版》