Wilder's Blog.

netty总结三

字数统计: 1.7k阅读时长: 7 min
2018/03/10 Share

Netty 总结三

我们来看一下客户端代码,虽然和服务器端有点相似,但是原理还是有区别的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class HelloClient {
Logger logger = LoggerFactory.getLogger(HelloClient.class);

/**
* 连接服务器端
* @param ip
* @param port
*/
public void connect(String ip , int port){
EventLoopGroup clientGroup = new NioEventLoopGroup();
//客户端使用BootStrap
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(clientGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE , true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS , 3000)
//handler 方法在初始化时就会执行
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024))
.addLast(new StringDecoder())
.addLast(new ClientHandler());
}
});
ChannelFuture future = bootstrap.connect(ip, port).sync();

future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
clientGroup.shutdownGracefully();
}
}

Netty 客户端创建时序图

这里写图片描述

  • 步骤1:用户线程创建 BootStrap 实例,设置创建客户端相关参数,异步发起客户端连接
  • 步骤2:创建处理客户端连接、I/O 读写的 Reactor 线程组 NioEventLoopGroup
  • 步骤3:通过 BootStrap 的 ChannelFactory 和用户指定的 Channel 类型创建用于客户端连接的 NioSocketChannel
  • 步骤4:创建默认的 ChannelHandlerPipeline ,用于调度和执行网络事件
  • 步骤5:异步发起 TCP 连接,判断连接是否成功。如果成功则直接将 NioSocketChannel 注册到多路复用器上,监听读操作位,用于数据报读取和消息发送;如果没有立即连接成功则注册连接监听到多路复用器,等待连接结果
  • 步骤6:注册对应的网络监听状态到多路复用器
  • 步骤7:由多路复用器在I/O现场中轮询各 Channel ,处理连接结果
  • 步骤8:如果连接成功,设置 Future 结果,发送连接成功事件,触发 ChannelPipeline 执行
  • 步骤9:由 ChannelPipeline 调度执行系统和用户的 ChannelHandler ,执行业务逻辑

源码分析

客户端连接辅助类 BootStrap

​ 在NIO模式中一个线程可以处理多个 TCP 连接。客户端相对于服务器来说,只需要创建一个处理 I/O 读写的线程组即可:

1
2
3
4
5
6
7
8
9
10
public B group(EventLoopGroup group) {
if (group == null) {
throw new NullPointerException("group");
} else if (this.group != null) {
throw new IllegalStateException("group set already");
} else {
this.group = group;
return this;
}
}

​ TCP 参数设置接口:创建客户端套接字的时候通常都会设置连接参数。BootStrap 也提供了 TCP 参数设置接口,通过 option 方法进行连接

各种 TCP 参数极其含义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public <T> B option (ChannelOption<T> option , T value){
if(option == null){
throw new NullPointerException("option");
}else {
Map var3;
if (value == null){
var3 = this.options; //options是一个LinkedMap
synchronized(this.options){
this.option.remove(option);
}
} else {
var3 = this.options;
synchronized(this.options){
this.options.put(option , value);
}
}
return this;
}
}

​ 我们可以看到,在设置参数的时候采用了Map,将各个参数一一对应

​ channel 接口:用于指定客户端使用的 Channel 接口,对于 TCP 客户端,默认使用 NioSocketChannel 。和服务端一样,使用ChannelFactory通过反射来创建 channel 对象:

1
2
3
4
5
6
7
public BootStrap channel(Class<? extends Channel> channelClass) {
if (channelClass == null){
throw new NullPointerException("channelClass");
} else {
return this.channelFactory(new BootstrapChannelFactory<Channel>(channelClass));
}
}

​ 设置 Handler 接口:BootStrap 提供了 ChannelInitializer , 先调用 channelRegister 接口注册链路,当 TCP 链路注册成功之后,调用 initChannel 接口,用于设置用户 ChannelHandler:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public final void channelRegistered (ChannelHandlerContext ctx) throws Exception {
ChannelPipeline pipeline = ctx.pipeline();
boolean success = false;
try{
initChannel ((C) ctx.channel());
pipeline.remove(this);
ctx.fireChannelRegistered();
success = true ;

//......
}
}

.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024))
.addLast(new StringDecoder())
.addLast(new ClientHandler());
}
});

客户端连接操作

  • 创建和初始化 NioSocketChannel
1
2
3
4
5
6
7
8
private ChannelFuture doConnect (final SocketAddress remoteAddress , final SocketAddress localAddress){
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null){
return regFuture;
}
//...........
}
  • 从 NioEventLoopGroup 中获取 NioEventLoop , 然后使用其作为参数创建 NioSocketChannel
1
2
3
4
Channel createChannel() {
EventLoop eventLoop = group().next();
return channelFactory().newChannel(eventLoop);
}
  • 初始化 Channel 之后,将其注册到 Selector 上(也就是注册到 EventLoop 上,之前说过一个 EventLooo 对应多个Channel , 而一个 Channel 对应一个EventLoop )
1
2
ChannelPromise regFuture = channel.newPromise();
channel.unsafe().register(regFuture);
  • 链路创建成功之后,发起异步的 TCP 连接,从连接开始,操作就切换到了channel 对应的EventLoop 线程进行了,也就是异步操作:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static void doConnect0 (final ChannelFuture regFuture , final Channel channel , 
final SocketAddress remoteAddress , final SocketAddress localAddress , final ChannelPromise promise){
channel.eventLoop().execute(new Runnable(){
@Override
public void run(){
if (regFuture.isSuccess()) {
if (localAddress == null){
channel.connect(remoteAddress , promise);
} else {
channel.connect(remoteAddress , localAddress , promise);
}
//......
}
}
})
}

​ doConnect0 最终调用 HeadHandler 的connect 方法:

1
2
3
4
public void connect (ChannelHandlerContext ctx , SocketAddress remoteAddress , SocketAddress localAddress , 
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress , localAddress , promise);
}

​ AbstractNioUnsafe 的 connect 操作如下:

1
2
3
4
5
if (doConnect(remoteAddress , localAddress)){
fulfillConnectPromise (promise , wasActive);
}else{
//......
}

​ 执行 connect() 操作后有三种结果:

(1)连接成功,返回 true

(2)暂时没有连接上,服务端没有返回 ACK 应答,连接结果不确定,返回 False

(3)连接失败,直接抛出 I/O 异常

​ 注意:如果是第二种结果,需要将 NioSocketChannel 中的 selectionKey 设置为 OP_CONNECT ,监听连接结果

  • 异步连接返回之后,需要判断连接结果:

    • 连接成功,出发 ChannelActive 事件
    1
    2
    3
    if(!wasActive && isActice()){
    pipeline().fireChannelActive(); //最终将 NioSocketChannel 中的selectionKey 设置为 SelectionKey.OP_READ,用于监听网络读操作
    }
    • 没有立即连接上,则注册 SelectionKey.OP_CONNECT 到多路复用器
    1
    2
    3
    4
    5
    6
    7
    8
    9
    boolean success = false ; 
    try {
    boolean connected = javaChannel().connect(remoteAddress);
    if (!connected){
    selectionKey().interestOps(SelectionKey.OP_CONNECT)l
    }
    success = true ;
    return connected ;
    }

    • 连接发生异常,则关闭链路,进入连接失败处理流程
    1
    2
    3
    4
    5
    finally{
    if (!success){
    doClose();
    }
    }

异步连接结果通知

​ NioEventLoop 的 Selector 轮询客户端连接 Channel , 当服务端返回握手应答之后,对连接结果进行判断

1
2
3
4
5
6
7
8
9
10
11
12
13
if ((readOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}

//以下是finishConnect 的代码
try{
boolean wasActive = isActive();
doFinishConnect();
fulfillConnectPromise(connectPromise , wasActive);
}

​ doFinishConnect 用于判断 JDK 的 SocketChannel 的连接结果,如果返回 true 表示连接成功,其他值或者发生异常表示连接失败

1
2
3
4
5
protected void doFinishConnect() throws Exception {
if (!javaChannel().finishConnect()){
throw new Error();
}
}

​ 连接成功之后,调用 fulfillConnectPromise 方法,触发链路激活事件,该事件由 ChannelPipeline 进行传播:

1
2
3
4
5
6
7
8
9
private void fulfillConnectPromise (ChannelPromise promise , boolean wasActive){
boolean promiseSet = promise.trySuccess();
if (!wasActive && isActive()){
pipeline().fireChannelActive();
}
if(!promiseSet){
close(voidPromise());
}
}

客户端连接超时机制

  • 用户在创建 Netty 客户端的时候,可以通过 ChannelOption.CONNECT_TIMEOUT_MILLIS 配置项设置连接超时时间
1
2
3
4
bootstrap.group(clientGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE , true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS , 3000)
  • 发起连接的同时,启动连接超时检测定时器
  • 如果在连接超时之前获取到结果,则删除连接超时定时器,防止其被触发
1
2
3
4
5
6
7
8
9
public void finishConnect(){
}finally{
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
}
//......
}

大致的连接流程就是这样子了~参考了《Netty 权威指南第二版》

CATALOG
  1. 1. Netty 总结三
    1. 1.1. Netty 客户端创建时序图
    2. 1.2. 源码分析
      1. 1.2.1. 客户端连接辅助类 BootStrap
      2. 1.2.2. 客户端连接操作
      3. 1.2.3. 异步连接结果通知
      4. 1.2.4. 客户端连接超时机制