Netty 总结一
我们都知道,传统的IO流(BIO)因为是阻塞的,导致性能低下,特别是当多个线程同时处理的时候会导致线程的阻塞,使得服务器的效率大大降低。非阻塞IO流(NIO)相对于传统IO流来说效率上有相应的提升,但是实现步骤太多,我们来看看原生NIO的实现需要多少步骤:

我们可以看到一个简单接收就需要如此复杂的步骤。
Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。下面开门见山的看一看Hello World!
下面代码实现了客户端向服务器发送日期,服务器返回 Hello World在客户端显示:
1、pom.xml 增加依赖
1 2 3 4 5
| <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.15.Final</version> </dependency>
|
2、服务器端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
|
public class HelloServer { Logger logger = LoggerFactory.getLogger(HelloServer.class); private int port; public HelloServer(int port){ this.port = port; }
public void bind(){ EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(eventLoopGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new HelloServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 1024); ChannelFuture future = bootstrap.bind(port).sync(); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { workerGroup.shutdownGracefully(); eventLoopGroup.shutdownGracefully(); } }
private class HelloServerHandler extends ChannelInboundHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { logger.info("接收到客户端发来的消息"); ByteBuf byteBuf = (ByteBuf) msg; byte[] result = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(result); String receiveString = new String(result,"utf-8"); logger.info("内容为"+receiveString); byteBuf.release();
String response = "Hello world"; ByteBuf encode = ctx.alloc().buffer(4 * response.length()); encode.writeBytes(response.getBytes()); ctx.writeAndFlush(encode); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.error("===========出现异常========"); cause.printStackTrace(); ctx.close(); }
@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } }
public static void main(String[] args) { new HelloServer(8080).bind(); } }
|
3、客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
|
public class HelloClient { Logger logger = LoggerFactory.getLogger(HelloClient.class);
public void connect(String ip , int port){ EventLoopGroup clientGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(clientGroup) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE , true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024)) .addLast(new StringDecoder()) .addLast(new ClientHandler()); } }); ChannelFuture future = bootstrap.connect(ip, port).sync(); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { clientGroup.shutdownGracefully(); } }
private class ClientHandler extends ChannelInboundHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; byte[] result = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(result); String receiveString = new String(result,"utf-8"); logger.info("=======服务器端返回内容来了======\n"+receiveString); }
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Date date = new Date(); SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd"); String message = format.format(date); ByteBuf encode = ctx.alloc().buffer(4 * message.length()); encode.writeBytes(message.getBytes()); ctx.writeAndFlush(encode); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }
@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.EMPTY_BUFFER) .addListener(ChannelFutureListener.CLOSE); } }
public static void main(String[] args) { new HelloClient().connect("127.0.0.1",8080); } }
|
4、核心组件介绍
EventLoop
EventLoop 定义了处理在连接过程中发生的事件的核心抽象。一个EventLoopGroup 包含一个或多个EventLoop ,并且EventLoop 提供了一种迭代用于检索清单中的下一个EventLoop。每个Channel 都会分配到一个EventLoop ,用于处理该Channel 所有的事件。
- 一个Channel 使用EventLoop 进行注册 , 而一个EventLoop 会对应多个Channel
- 一个EventLoop 在生命周期中将会被绑定到一个专有的Thread 上,这个专有的线程处理该EventLoop 所有的事件

Channel
基础的IO操作,比如 read、bind、connect 都依赖于底层网络传输所提供的原语,在Java 网络编程中,这些都依赖于 “Socket” ,而Netty 极大简化了直接与Socket 进行操作的复杂性。我对Channel 的理解就是将它看成是一个连接的载体,基础的读写操作将由Channel 建立起连接。
此外,Channel 还是很多类的父类 EmbeddedChannel、LocalServerChannel、NioDatagramChannel、NioSctpChannel、NioSocketChannel等。
ChannelFuture
Netty 中所有的IO操作都是异步的,不会立即返回,因此提供了ChannelFuture ,其addListener 方法可以注册一个ChannelFutureListener ,当操作完成时,不管成功还是失败均会被通知。在并发编程Executor 框架中,也有类似于ChannelFuture 的 Future ,通过它的get() 方法可以得知该线程执行任务的结果。
ChannelHandler
ChannelHandler 可以说是最重要的组件,处理进站和出站数据的用户逻辑都将存放在这里。常用的使用方法就是继承ChannelHandler 的子接口 ChannelInboundHandler ,ChannelFuture 可以用于几乎任何类型的操作,如发送数据、处理抛出的异常等。
ChannelPipeline
ChannelPipeline为ChannelHandler链提供了一个容器并定义了用于沿着链传播入站和出站事件流的API。当创建Channel时,会自动创建一个附属的ChannelPipeline。ChannelHandlers按照如下步骤安装在ChannelPipeline中。
- ChannelInitializer 通过BootStrap 进行注册
- 当调用 initChannel() 方法时,ChannelInitializer 会向管道中注册一组自定义的ChannelHandler
- 当操作完成时,ChannelInitializer 将会从管道中自动删除
ChannelHandler可被当做放置任何代码的容器,用于处理到达并通过ChannelPipeline的事件或者数据,数据可以沿着处理链进行传递。

当事件从客户端移动至服务端时称为出站,反之称为入站。并且入站处理器和出站处理器可共存于同一个管道中,当读取入站数据或事件时,将会从管道的头部开始传递到第一个入站处理器,然后传递至下一个处理器直至管道的尾部,此时数据处理结束。当出站时,沿着处理链直到管道的头,然后进行网络传输。
5、TCP粘包问题
TCP 是个 “流” 协议,所谓流就是没有界限的一串数据,期间并没有分界线。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆成多个包进行发送,多个小的包也可能被封装成一个大的数据包进行发送,这就是所谓的TCP 粘包问题。
Netty 解决TCP 的粘包问题尤为简单,有多种已经实现好的ChannelHandler 为我们解决粘包问题,使用的时候只要在ChannelPipeline 管道中加入相应的解码器即可,我们来看看三种常用的解码器:
- LineBasedFrameDecoder 解码器,按照 \n、\r\n 进行解码
1 2 3 4 5 6 7 8
| .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024)) .addLast(new StringDecoder()) .addLast(new ClientHandler()); } });
|
- DelimiterBasedFrameDecoder 分隔符解码器,自己设定要用什么符号来划分
1 2
| ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
|
- FixedLengthFrameDecoder 定长解码器,按照固定的长度进行解码
1
| socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(200));
|