netty源码分析
一.netty概述
java nio是jdk1.4引入的网络编程API,相比于以前BIO阻塞方式的接口,能大大提高网络通信的效率。
netty是一个网络通信框架,底层基于java NIO,用来简化NIO编程的开发。因为如果基于NIO直接实现网络编程的开发,比较复杂,很容易出错,而使用了netty之后,只需要关注逻辑处理部分就可以了。此外,netty还提供了多种网络协议的封装。
Netty4的beta3加了AIO了,但是到beta9又去掉了,作者的意思是测试下来AIO性能不如NIO,所以没必要用。
NIO和AIO在linux中底层都是依赖于epoll来实现的。NIO的模型应该为同步非阻塞,AIO为异步非阻塞。底层的具体实现可以参考前面的文章。
本文基于netty4.0源码来分析netty的网络通信模型。
二.channel和channelPipeline
在Netty里,Channel是通讯的载体,而ChannelHandler负责Channel中的逻辑处理,channelPipeline可以理解为channel的载体,每个channel都有一个对应的ChannelPipeline。
一个channel包括一个pipeline、TCP参数、一个unsafe等,netty4的channel类图如下所示:
按IO类型可以分为:BIO和NIO。
按数据类型可以分为:byte,message。
按所处位置可以分为:ServerSocket,Socket。用的比较多的为NioServerSocketChannel和NioSocketChannel。
NioServerSocketChannel继承了AbstractNioMessageChannel,而NioSocketChannel继承了AbstractNioByteChannel。
netty通过一序列的ChannelHandler来处理一个任务,比如read一条消息后,需要decode、验证、计算等各个步骤。在handler中,每个步骤可以写成一个handler。这一序列的handler通过ChannelPipeline串联起来,其实就是Intercepting Filter模式。
ChannelPipeline通过一个双向链表将handler连接起来,在ChannelPipeline里有headContext和tailContext两个handler,来分别表示链表的头和尾,通过ChannelPipeline来控制消息的处理流程。
一个IO事件会被ChannelInboundHandler或ChannelOutboundHandler处理,这些handler再通过调用事件传播方法如fireChannelRegistered或bind等传递给相邻的handler,其实就是事件驱动的思想。
Netty的ChannelPipeline包含两条线路:inbound和outbound。inbound表示接收到消息、被动的消息,outbound表示发送的消息、主动的状态改变。一个handler可以包括inbound和outbound的一种或两种。
inbound的事件传播方法包括以下几种情况:
fireChannelRegistered
fireChannelActive
fireChannelRead
fireChannelReadComplete
fireExceptionCaught
fireUserEventTriggered
fireChannelWritabilityChanged
fireChannelInactive
fireChannelUnregistered
outbound的事件传播方法包括以下几种情况:
bind
connect
write
flush
read
disconnect
close
deregister
例如,有一个称为p的ChannelPipeline,添加了如下的handler:
1 | p.addLast("1", new InboundHandlerA()); |
则现在的链表结构为:head<->5<->4<->3<->2<->1<->tail (注意,每次添加handler都是插入到tail的前面),inbound的执行顺序为5、2、1,而outbound的执行顺序为3、4、5.
下图为从源码注解截取的pipeline流程图:
三.handler
handler的类图如下所示:
HeadContext和TailContext为pipeline的handler链表中的表头和表尾。此外,netty还提供了很多协议(如protobuf、mqtt、socks等等)的handler用来处理encode、decode等的功能。
四.buffer
buffer的缓存分配器和buffer类图如下所示:
除了可分配定长的buffer外,netty还提供了AdaptiveRecvByteBufAllocator,用来根据本次读取的字节数对下次缓冲接收区的容量进行动态的分配。具体如何分配可参考参考文献3的文档。为了提高效率,netty还提供了基于内存池的缓冲区重用策略(可参考Netty系列之Netty高性能之道一文)。
此外,netty还采用了”Zero-Copy-Capable”机制。我们知道一个报文在网络上传输有可能被拆分成多个,这些被拆分的报文对接收到的上层的逻辑是没有意义的,在netty中,通过将这些buffer组合起来,成为一个channelbuffer,变成一个有意义的报文。当然,zero-copy的含义不止这个,netty的zero-copy机制的具体体现参见李林锋大神的Netty系列之Netty高性能之道一文。如果说NIO的Buffer和Netty的ChannelBuffer最大的区别的话,就是前者仅仅是传输上的Buffer,而后者其实是传输Buffer和抽象后的逻辑Buffer的结合。
例如CompositeChannelBuffer是由多个ChannelBuffer组合而成的,CompositeChannelBuffer并不会开辟新的内存并直接复制所有ChannelBuffer内容,而是直接保存了所有ChannelBuffer的引用,并在子ChannelBuffer里进行读写。当然,要真正zero-copy可能要底层系统支持,下文是stackoverflow上的关于OS级别的zero-copy和netty的区别:
OS-level zero copy involves avoiding copying memory blocks from one location to another (typically from user space to kernel space) before sending data to the hardware driver (network card or disk drive) or vice versa.
Netty zero copy is talking about optimizing data manipulation on Java level (user-space only). Their ChannelBuffer allows to read contents of multiple byte buffers without actually copying their content.
In other words, while Netty works only in user space, it is still valid to call their approach “zero copy”.
However, if OS does not use or support true zero copy, it is possible that when data created by Netty-powered program will be sent over the network, data would still be copied from user space to kernel space, and thus true zero-copy would not be achieved.
五.netty线程模型
对于应用服务器,一个主要规律就是,CPU的处理速度是要远远快于IO速度的,如果CPU为了IO操作(例如从Socket读取一段数据)而阻塞显然是不划算的。好一点的方法是分为多进程或者线程去进行处理,但是这样会带来一些进程切换的开销。应用业务向一个中间人注册一个回调(event handler),当IO就绪后,就这个中间人产生一个事件,并通知此handler进行处理。这种回调的方式,也体现了“好莱坞原则”(Hollywood principle)-“Don’t call us, we’ll call you”。
我们如何知道IO就绪这个事件,谁来充当这个中间人?Reactor模式的答案是:由一个不断等待和循环的单独进程(线程)来做这件事,它接受所有handler的注册,并负责先操作系统查询IO是否就绪,在就绪后就调用指定handler进行处理,这个角色的名字就叫做Reactor。在NIO中Reactor的核心是Selector。
Reactor模式里,操作系统只负责通知IO就绪,具体的IO操作(例如读写)仍然是要在业务进程里阻塞的去做的,而Proactor模式则更进一步,由操作系统将IO操作执行好(例如读取,会将数据直接读到内存buffer中),而handler只负责处理自己的逻辑,真正做到了IO与程序处理异步执行。所以我们一般又说Reactor是同步IO,Proactor是异步IO。
如下是netty4.0源码的一个server端的例子(EchoServer):1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoServerHandler());
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
下面是client端的例子:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(HOST, PORT).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
在server端,可以看到有两个NioEventLoopGroup,这两个其实就是两组线程组,称为boss和worker。boss和worker里面都可以包含多个NioEventLoop线程,一般boss设置1个线程就够了,一个线程绑定一个端口,而worker默认的线程数量为cpu个数的2倍。每个EventLoop线程有一个selector和queue,该线程会轮询绑定到此selector的channel,收到绑定的事件后,会触发发事件通知handler处理。
在初始化时,会将NioServerSocketChannel放进boss线程池中的一个eventLoop线程,让eventLoop关联的select轮询,处理client的connect。boss负责接收connect,接收到connect后,new一个NioSocketChannel,并放进worker的一个线程中,让对应的selector轮询处理read、write等操作。
线程模型如下图所示:
注:一个channel包含一个pipeline、TCP参数、unsafe等,一个worker包含多个EventLoop线程,一个EventLoop包含一个selector、一个queue。
上述只是netty常用的一种线程模型,netty可以根据不同的配置,演变出其他的线程模型(可以参考Netty系列之Netty高性能之道一文)。
此外,netty还采用了很多方法来提高netty的性能,如高效的并发编程、高性能的序列化框架、灵活的TCP参数配置能力等。
下面附上网上一个大神(海浪儿)总结的上文所述例子的执行过程,具体源码的解析可以参考参考文献2.
服务端依次发生的步骤
- 建立服务端监听套接字ServerSocketChannel,以及对应的管道pipeline;
- 启动boss线程,将ServerSocketChannel注册到boss线程持有的selector中,并将注册返回的selectionKey赋值给ServerSocketChannel关联的selectionKey变量;
- 在ServerSocketChannel对应的管道中触发channelRegistered事件;
- 绑定IP和端口
- 触发channelActive事件,并将ServerSocketChannel关联的selectionKey的OP_ACCEPT位置为1。
- 客户端发起connect请求后,boss线程正在运行的select循环检测到了该ServerSocketChannel的ACCEPT事件就绪,则通过accept系统调用建立一个已连接套接字SocketChannel,并为其创建对应的管道;
- 在服务端监听套接字对应的管道中触发channelRead事件;
- channelRead事件由ServerBootstrapAcceptor的channelRead方法响应:为已连接套接字对应的管道加入ChannelInitializer处理器;启动一个worker线程,并将已连接套接字的注册任务加入到worker线程的任务队列中;
- worker线程执行已连接套接字的注册任务:将已连接套接字注册到worker线程持有的selector中,并将注册返回的selectionKey赋值给已连接套接字关联的selectionKey变量;在已连接套接字对应的管道中触发channelRegistered事件;channelRegistered事件由ChannelInitializer的channelRegistered方法响应:将自定义的处理器(譬如EchoServerHandler)加入到已连接套接字对应的管道中;在已连接套接字对应的管道中触发channelActive事件;channelActive事件由已连接套接字对应的管道中的inbound处理器的channelActive方法响应;将已连接套接字关联的selectionKey的OP_READ位置为1;至此,worker线程关联的selector就开始监听已连接套接字的READ事件了。
- 在worker线程运行的同时,Boss线程接着在服务端监听套接字对应的管道中触发channelReadComplete事件。
- 客户端向服务端发送消息后,worker线程正在运行的selector循环会检测到已连接套接字的READ事件就绪。则通过read系统调用将消息从套接字的接受缓冲区中读到AdaptiveRecvByteBufAllocator(可以自适应调整分配的缓存的大小)分配的缓存中;
- 在已连接套接字对应的管道中触发channelRead事件;
- channelRead事件由EchoServerHandler处理器的channelRead方法响应:执行write操作将消息存储到ChannelOutboundBuffer中;
- 在已连接套接字对应的管道中触发ChannelReadComplete事件;
- ChannelReadComplete事件由EchoServerHandler处理器的channelReadComplete方法响应:执行flush操作将消息从ChannelOutboundBuffer中flush到套接字的发送缓冲区中;
客户端依次发生的步骤
- 建立套接字SocketChannel,以及对应的管道pipeline;
- 启动客户端线程,将SocketChannel注册到客户端线程持有的selector中,并将注册返回的selectionKey赋值给SocketChannel关联的selectionKey变量;
- 触发channelRegistered事件;
- channelRegistered事件由ChannelInitializer的channelRegistered方法响应:将客户端自定义的处理器(譬如EchoClientHandler)按顺序加入到管道中;
- 向服务端发起connect请求,并将SocketChannel关联的selectionKey的OP_CONNECT位置为1;
- 开始三次握手,客户端线程正在运行的select循环检测到了该SocketChannel的CONNECT事件就绪,则将关联的selectionKey的OP_CONNECT位置为0,再通过调用finishConnect完成连接的建立;
- 触发channelActive事件;
- channelActive事件由EchoClientHandler的channelActive方法响应,通过调用ctx.writeAndFlush方法将消息发往服务端;
- 首先将消息存储到ChannelOutboundBuffer中;(如果ChannelOutboundBuffer存储的所有未flush的消息的大小超过高水位线writeBufferHighWaterMark(默认值为64 * 1024),则会触发ChannelWritabilityChanged事件)
- 然后将消息从ChannelOutboundBuffer中flush到套接字的发送缓冲区中;(如果ChannelOutboundBuffer存储的所有未flush的消息的大小小于低水位线,则会触发ChannelWritabilityChanged事件)
六.参考文献
- netty源码解析 https://github.com/code4craft/netty-learning
- netty4.x源码分析 http://xw-z1985.iteye.com/blog/1918052
- Netty5.0架构剖析和源码解读 http://vdisk.weibo.com/s/C9LV9iVqH13rW/1391437855
- netty user guide http://netty.io/wiki/user-guide-for-4.x.html
- Netty系列之Netty高性能之道 http://www.infoq.com/cn/articles/netty-high-performance
- 《netty权威指南》-李林锋