Wilder's Blog.

netty 总结一

字数统计: 2.1k阅读时长: 8 min
2018/02/18 Share

Netty 总结一

我们都知道,传统的IO流(BIO)因为是阻塞的,导致性能低下,特别是当多个线程同时处理的时候会导致线程的阻塞,使得服务器的效率大大降低。非阻塞IO流(NIO)相对于传统IO流来说效率上有相应的提升,但是实现步骤太多,我们来看看原生NIO的实现需要多少步骤:

传统NIO执行步骤

​ 我们可以看到一个简单接收就需要如此复杂的步骤。

Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。下面开门见山的看一看Hello World!

​ 下面代码实现了客户端向服务器发送日期,服务器返回 Hello World在客户端显示:

1、pom.xml 增加依赖

1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.15.Final</version>
</dependency>

2、服务器端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/**
* @author:Wilder Gao
* @time:2018/2/18
* @Discription:使用netty第一步,hello World篇
*/
public class HelloServer {
Logger logger = LoggerFactory.getLogger(HelloServer.class);
private int port;
public HelloServer(int port){
this.port = port;
}

public void bind(){
//处理NIO事件的多线程循环器,一个用来接收新来的连接,一个用来处理已经被接收的连接
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
//启动NIO配置的服务类
try {
//启动NIO服务的配置类
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(eventLoopGroup, workerGroup)
//服务器端选择的Channel是ServerSocketChannel
.channel(NioServerSocketChannel.class)
//childHandler 在客户端执行的时候才会触发
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new HelloServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 1024);
ChannelFuture future = bootstrap.bind(port).sync();
//服务器监听端口,阻塞模式,直到服务器链路关闭之后main才关闭
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
workerGroup.shutdownGracefully();
eventLoopGroup.shutdownGracefully();
}
}

private class HelloServerHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//============以下是接收到消息的逻辑处理===============
logger.info("接收到客户端发来的消息");
//接收的消息也是ByteBuf 格式,将这种格式转换为字符串
ByteBuf byteBuf = (ByteBuf) msg;
byte[] result = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(result);
String receiveString = new String(result,"utf-8");
logger.info("内容为"+receiveString);
//释放资源
byteBuf.release();

//===========以下是返回消息的逻辑处理===============
String response = "Hello world";
//发送消息时一定要将它打包成ByteBuf 格式
ByteBuf encode = ctx.alloc().buffer(4 * response.length());
encode.writeBytes(response.getBytes());
ctx.writeAndFlush(encode);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("===========出现异常========");
cause.printStackTrace();
ctx.close();
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}

public static void main(String[] args) {
new HelloServer(8080).bind();
}
}

3、客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/**
* @author:Wilder Gao
* @time:2018/2/18
* @Discription:hello world服务器端
*/
public class HelloClient {
Logger logger = LoggerFactory.getLogger(HelloClient.class);

/**
* 连接服务器端
* @param ip
* @param port
*/
public void connect(String ip , int port){
//客户端可以只使用一个EventLoopGroup , 用来进行对服务器的连接、发送和接收信息
EventLoopGroup clientGroup = new NioEventLoopGroup();
//客户端使用BootStrap
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(clientGroup)
//服务器端选择的Channel为SocketChannel
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE , true)
//handler 方法在初始化时就会执行
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024))
.addLast(new StringDecoder())
.addLast(new ClientHandler());
}
});
ChannelFuture future = bootstrap.connect(ip, port).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
clientGroup.shutdownGracefully();
}
}

private class ClientHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
byte[] result = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(result);
String receiveString = new String(result,"utf-8");
logger.info("=======服务器端返回内容来了======\n"+receiveString);
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Date date = new Date();
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
//连接成功之后向服务器发送消息
String message = format.format(date);
ByteBuf encode = ctx.alloc().buffer(4 * message.length());
encode.writeBytes(message.getBytes());
ctx.writeAndFlush(encode);
}

//产生异常的处理情况
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//关闭Channel
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}
}

public static void main(String[] args) {
new HelloClient().connect("127.0.0.1",8080);
}
}

4、核心组件介绍

EventLoop

​ EventLoop 定义了处理在连接过程中发生的事件的核心抽象。一个EventLoopGroup 包含一个或多个EventLoop ,并且EventLoop 提供了一种迭代用于检索清单中的下一个EventLoop。每个Channel 都会分配到一个EventLoop ,用于处理该Channel 所有的事件。

  • 一个Channel 使用EventLoop 进行注册 , 而一个EventLoop 会对应多个Channel
  • 一个EventLoop 在生命周期中将会被绑定到一个专有的Thread 上,这个专有的线程处理该EventLoop 所有的事件

EventLoop

Channel

​ 基础的IO操作,比如 read、bind、connect 都依赖于底层网络传输所提供的原语,在Java 网络编程中,这些都依赖于 “Socket” ,而Netty 极大简化了直接与Socket 进行操作的复杂性。我对Channel 的理解就是将它看成是一个连接的载体,基础的读写操作将由Channel 建立起连接。

​ 此外,Channel 还是很多类的父类 EmbeddedChannel、LocalServerChannel、NioDatagramChannel、NioSctpChannel、NioSocketChannel等。

ChannelFuture

​ Netty 中所有的IO操作都是异步的,不会立即返回,因此提供了ChannelFuture ,其addListener 方法可以注册一个ChannelFutureListener ,当操作完成时,不管成功还是失败均会被通知。在并发编程Executor 框架中,也有类似于ChannelFuture 的 Future ,通过它的get() 方法可以得知该线程执行任务的结果。

ChannelHandler

ChannelHandler 可以说是最重要的组件,处理进站和出站数据的用户逻辑都将存放在这里。常用的使用方法就是继承ChannelHandler 的子接口 ChannelInboundHandler ,ChannelFuture 可以用于几乎任何类型的操作,如发送数据、处理抛出的异常等。

ChannelPipeline

​ ChannelPipeline为ChannelHandler链提供了一个容器并定义了用于沿着链传播入站和出站事件流的API。当创建Channel时,会自动创建一个附属的ChannelPipeline。ChannelHandlers按照如下步骤安装在ChannelPipeline中。

  • ChannelInitializer 通过BootStrap 进行注册
  • 当调用 initChannel() 方法时,ChannelInitializer 会向管道中注册一组自定义的ChannelHandler
  • 当操作完成时,ChannelInitializer 将会从管道中自动删除

ChannelHandler可被当做放置任何代码的容器,用于处理到达并通过ChannelPipeline的事件或者数据,数据可以沿着处理链进行传递。

img

​ 当事件从客户端移动至服务端时称为出站,反之称为入站。并且入站处理器和出站处理器可共存于同一个管道中,当读取入站数据或事件时,将会从管道的头部开始传递到第一个入站处理器,然后传递至下一个处理器直至管道的尾部,此时数据处理结束。当出站时,沿着处理链直到管道的头,然后进行网络传输。

5、TCP粘包问题

​ TCP 是个 “流” 协议,所谓流就是没有界限的一串数据,期间并没有分界线。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆成多个包进行发送,多个小的包也可能被封装成一个大的数据包进行发送,这就是所谓的TCP 粘包问题。

​ Netty 解决TCP 的粘包问题尤为简单,有多种已经实现好的ChannelHandler 为我们解决粘包问题,使用的时候只要在ChannelPipeline 管道中加入相应的解码器即可,我们来看看三种常用的解码器:

  • LineBasedFrameDecoder 解码器,按照 \n、\r\n 进行解码
1
2
3
4
5
6
7
8
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024))
.addLast(new StringDecoder())
.addLast(new ClientHandler());
}
});
  • DelimiterBasedFrameDecoder 分隔符解码器,自己设定要用什么符号来划分
1
2
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));//1024为单条消息的最大长度
  • FixedLengthFrameDecoder 定长解码器,按照固定的长度进行解码
1
socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(200));
CATALOG
  1. 1. Netty 总结一
    1. 1.1. 1、pom.xml 增加依赖
    2. 1.2. 2、服务器端
    3. 1.3. 3、客户端
    4. 1.4. 4、核心组件介绍
      1. 1.4.1. EventLoop
      2. 1.4.2. Channel
      3. 1.4.3. ChannelFuture
      4. 1.4.4. ChannelHandler
      5. 1.4.5. ChannelPipeline
    5. 1.5. 5、TCP粘包问题