Wilder's Blog.

netty总结二

字数统计: 2.7k阅读时长: 11 min
2018/03/05 Share

Netty 总结二

我们来看一下之前的Hello World 服务端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class HelloServer {
Logger logger = LoggerFactory.getLogger(HelloServer.class);
private int port;
public HelloServer(int port){
this.port = port;
}

public void bind(){
//处理NIO事件的多线程循环器,一个用来接收新来的连接,一个用来处理已经被接收的连接
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
//启动NIO配置的服务类
try {
//启动NIO服务的配置类
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(eventLoopGroup, workerGroup)
//服务器端选择的Channel是ServerSocketChannel
.channel(NioServerSocketChannel.class)
//childHandler 在客户端执行的时候才会触发
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new HelloServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 1024);
ChannelFuture future = bootstrap.bind(port).sync();
//服务器监听端口,阻塞模式,直到服务器链路关闭之后main才关闭
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
workerGroup.shutdownGracefully();
eventLoopGroup.shutdownGracefully();
}
}

服务端创建的时序图

服务端创建时序图

  • 步骤一:创建 ServerBootstrap 实例

    ​ ServerBootstrap 是 Netty 服务端的启动辅助类,提供了一些列方法用于设置服务器启动相关的参数。在创建 ServerBootstrap 时不需要任何的构造参数,那是因为构造参数太多了,如果使用传统的方法来将所有的参数填入,会不知道填进去的每一个代表什么意义。因此采用的时Builder 模式来进行构造,简单来说就是将构造参数变为相应的构造方法,一个一个参数来构造。 Builder模式

  • 步骤二:设置并绑定 Reactor 线程池

    ​ Netty 的 Reactor 线程池是EventLoopGroup ,它实际上是 EventLoop 的数组。EventLoop 的职责是处理所有注册到本线程多路复用器 Selector 上的 Channel , Selector 的轮询操作由绑定的 EventLoop 线程 run 方法驱动,在一个循环体内执行。EventLoop 的职责不仅仅是处理网络 IO 事件,用户自定义的 Task 和定时任务 Task 也统一由 EventLoop 负责处理。从调度层看,也不存在从EventLoop线程中再启动其它类型的线程用于异步执行另外的任务,这样避免了多线程并发操作和锁竞争。

    ​ 简单的就是说,一个Channel 对应一个EventLoop,而一个EventLoop 对应着多个Channel ,当一个Channel 绑定到这个EventLoop 上之后,这个Channel 之后的所有操作都将由这个EventLoop 对应的线程处理,不会有并发的问题产生。

  • 步骤三:设置并绑定服务端 Channel

    ​ 作为服务端需要创建的是 ServerSocketChannel ,Netty 对原生的NIO类库进行了封装实现了NioServerSocketChannel 。Netty 通过工厂类,利用反射创建NioServerSocketChannel 对象。

    1
    2
    3
    4
    5
    6
    7
    public ServerBootstrap channel(Class<? extends ServerChannel> channelClass){
    if(channelClass == null){
    throw new NullPointerException("channelClass");
    }
    //通过反射创建 NioServerSocketChannel 对象
    return channelFactory(new ServerBootstrapChannelFactory<ServerChannel>(channelClass));
    }
  • 步骤四:链路建立的时候创建并初始化 ChannelPipeline 。

    ​ ChannelPipeline 的本质就是负责处理网络事件的责任链,负责管理和执行 ChannelHandler 。

  • 步骤五:添加并设置ChannelHandler

    ​ ChannelHandler 是 Netty 提供给用户定制和扩展的关键接口,比较实用的系统ChannelHandler如下:

    (1)系统编解码框架 —— ByteToMessageCodec;

    (2)用于基于长度的半包解码器 —— LengthFieldBasedFrameDecoder;

    (3)码流日志打印 —— LoggingHandler

    (4)SSL安全认证Handler —— SslHandler

    (5)链路空间检测Handler —— IdleStateHandler

    (6)流量整型Handler —— ChannelTrafficShapingHandler

    (7)Base64编解码 —— Base64Decoder 和 Base64Encoder

  • 步骤六:绑定并启动监听端口

    ​ 在绑定监听端口之前系统会做一系列的初始化和检测工作,完成之后,会启动监听端口,并将ServerSocketChannel 注册到Selector 上监听客户端连接

    1
    2
    3
    protected void doBind(SocketAddress localAddress){
    javaChannel().socket().bind(localAddress , config.getBacklog());
    }
  • 步骤七:Selector 轮询。

    ​ 由Reactor线程NioEventLoop 负责调度和执行 Selector 轮询操作,选择准备就绪的Channel 集合,当轮询到准备就绪的Channel 之后,就由Reactor 线程NioEventLoop执行ChannelPipeline 的相应方法,最终调度并执行ChannelHandler。

源码分析

首先通过构造函数创建 ServerBootstrap ,随后创建两个 EventLoopGroup 实例,通过ServerBootstrap 的 group 方法将两个EventLoopGroup 实例传入

1
2
3
4
5
6
7
8
9
10
11
public ServerBootstrap group(EventLoopGroup parentGroup , EventLoopGroup childGroup){
super.group(parentGroup);
if(childGroup == null){
throw new NullPointerException("childGroup");
}else if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
} else {
this.childGroup = childGroup;
return this;
}
}

parentGroup 被传入了父类构造器中,也就是AbstractBootstrap的构造器

1
2
3
4
5
6
7
8
9
10
public B group(EventLoopGroup group) {
if (group == null) {
throw new NullPointerException("group");
} else if (this.group != null) {
throw new IllegalStateException("group set already");
} else {
this.group = group;
return this;
}
}

线程组和线程类型设置完成后,需要设置服务端 Channel 用于端口监听和服务端链路接入。Netty 通过 Channel 工厂来创建不同类型的 Channel 。

1
2
3
4
5
6
7
8
public T newChannel (EventLoop eventLoop , EventLoopGroup childGroup){
try{
Constructor<? extends T> constructor = clazz.getConstructor(EventLoop.class , EventLoopGroup.class);
return constructor.newInstance(eventLoop , childGroup);
}catch(Throwable t){
throw new ChannelException("Unable to create Channel from class "+clazz,t);
}
}

指定NioServerSocketChannel 后,需要设置TCP的一些参数,服务端主要是设置TCP的backlog 参数

1
.option(ChannelOption.SO_BACKLOG, 1024);

​ backlog指定了内核为此套接口排队的最大连接个数,对于给定的监听套接口,内核需要维护两个队列:未连接队列和已连接队列,根据TCP三路握手过程中三个分节来分隔这两个队列。服务器处于 listen 状态时,收到客户端syn分节时在未完成队列中创建一个新的条目,然后用三路握手的第二个分节即服务器的 syn 相应客户端,此条目在第三个分节到达前(客户端对服务器syn的ack)一直保留在未完成连接队列中,如果三路握手完成,该条目将从未完成连接队列搬到已完成连接队列尾部。当进程调用 accept 时,从已完成队列中的头部取出一个条目给进程,当已完成队列为空时进程将睡眠,直到有条目在已完成队列中才唤醒。什么是syn?

​ backlog 被规定为两个队列总和的最大值,Netty 设置的默认backlog为100,用户可以修改默认值,这需要根据实际场景和网络状况进行灵活设置。

​ 服务端启动的最后一步,就是绑定本地端口,启动服务,源代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = this.initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
} else if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise);
}

}
});
return promise;
}
}
  • 首先创建Channel,它有两个参数:参数1是从父类的NIO线程池中顺序获取一个 NioEventLoop ,它就是服务端用于监听和接收客户端连接的Reactor 线程;参数2是所谓的workerGroup 线程池,它就是处理 IO 读写的 Reactor 线程组
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
final ChannelFuture initAndRegister() {
Channel channel = null;

try {
channel = this.channelFactory.newChannel();
this.init(channel);
} catch (Throwable var3) {
if (channel != null) {
channel.unsafe().closeForcibly();
}

return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);
}

//......
}
  • NioServerSocketChannel 创建完成之后,将对它进行初始化,主要有一下三点:

    • 设置 socket 参数和 NioServerSocketChannel 的附加属性
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    void init(Channel var1) throws Exception{
    final Map<ChannelOption<?> , Object> options = options();
    synchronized(options){
    channel.config().setOptions(options);
    }

    final Map<AttributeKey<?> , Object> attrs = attrs();
    synchronized(attrs){
    for(Entry<AttributeKey<?> , Object> e : attrs.entrySet()){
    AttributeKey<Object> key = e.getKey();
    channel.attr(key).set(e.getValue());
    }
    }
    }
    • 将 AbstractBootstrap 的 Handler 添加到 NioServerSocketChannel 的 ChannelPipeline 中
    • 将用于服务端注册的 Handler ServerBootstrapAcceptor 添加到 ChannelPipeline 中

    到此服务端监听的相关资源已经初始化完毕

  • 最后一步,注册 NioServerSocektChannel 到 Reactor 线程的多路复用器上,然后轮询客户端连接事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 我们可以看到这段代码的含义:首先判断是否为 NieEventLoop 自身发起的操作。如果是则不存在并发操作,直接执行 Channel 注册;如果由其它线程发起,则封装成
* 一个Task 放入消息队列中异步执行
*/
public final void register(final ChannelPromise promise){
if(eventLoop.inEventLoop()){
register0(promise);
}else{
try{
eventLoop.execute(new Runnable(){
public void run(){
register0(promise);
}
});
}catch(Throwable t){
//......
}
}
}

接下来我们看一下 register0 ( ChannelPromise promise )的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void register0(ChannelPromise promise){
try{
if(!ensureOpen(promise))
return;
doRegister();
registered = true;
promise.setSuccess();
pipeline.fireChannelRegistered();
if(isActive())
pipeline.fireChannelActive();
}catch(Throwable t){
//......
}
}

​ 将NioServerSocketChannel 注册到 NioEventLoop 的 Selector 上:

1
2
3
4
5
6
7
protected void doRegister() throws Exception{
boolean selected = false;
for(;;){
selectionKey = javaChannel().register(eventLoop().selector , 0 , this);
//......
}
}

​ 我们可以看到注册了 0 到多路复用器上,不监听任何网络操作。这样是有原因的:

​ (1)注册方法是多态的,它既可以被 NioServerSocketChannel 用来监听客户端的连接接入,也可以注册 SocketChannel 用来监听网络读写操作

​ (2)通过SelectionKey 的 interestOps(int Ops)方法可以方便修改监听操作位。所以才需要返回selectionKey

​ 注册成功之后,出发 ChannelRegistered 事件

1
2
promise.setSuccess();
pipeline.fireChannelRegistered();

​ ChannelRegistered 事件传递完成后,判断 ServerSocketChannel 监听是否成功,如果成功需要触发 NioServerSocketChannel 的 ChannelActive 事件。isActive() 方法也是多态的。如果是服务端,判断监听是否启动;如果是客户端,判断TCP连接是否完成。ChannelActive 事件在 ChannelPipeline 中传递,完成之后根据配置决定是否自动出发 Channel 的读操作

1
2
3
4
5
6
7
public ChannelPipeline fireChannelActive(){
head.fireChannelActive();
if(channel.config().isAutoRead()){
channel.read();
}
return this;
}

​ AbstractChannel 的读操作触发 ChannelPipeline 的读操作,最终调用到HeadHandler 的读方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void read(ChannelHandlerContext ctx){
unsafe.beginRead();
}

public void beginRead(){
if(!isActive()){
return ;
}
try{
doBeginRead();
//......
}
}

​ 对于不同类型的 Channel 对读操作的准备工作不同,因此,beginRead 也是个多态方法,对于NIO通信,无论是客户端还是服务端,都是要修改网络监听操作位为自身感兴趣的,对于 NioServerSocketChannel 感兴趣的操作是 OP_ACCEPT(16) ,于是要修改注册操作位

1
2
3
4
5
6
7
8
protected void doBeginRead() throws Exception{
//......
final int interestOps = selectionKey.interestOps();
//只有当监听的操作类型和 Channel 关心的网络事件不一致的时候才需要重新注册操作位,所以增加了&操作的判断
if((interestOps & readInterestOps) == 0 ){
selectionKey.interestOps(interestOps | readInterestOps);
}
}

​ JDK SelectionKey 有4种操作类型:

(1)OP_READ = 1<<0;

(2)OP_WRITE = 1<<2;

(3)OP_CONNECT = 1<<3;

(4)OP_ACCEPT = 1<<4

​ 最后在服务器链路注册成功之后重新将操作位设置为监听客户端的网络连接操作,初始化 NioServerSocketChannel 的代码如下

1
2
3
4
public NioServerSocketChannel(EventLoop eventLoop , EventLoopGroup childGroup){
super(null , eventLoop , childGroup , newSocket() , SelectionKey.OP_ACCEPT);
config = new DefaultServerSocketChannelConfig(this , javaChannel().socket());
}

搞定!!!

CATALOG
  1. 1. Netty 总结二
    1. 1.1. 服务端创建的时序图
    2. 1.2. 源码分析