Netty应用以及Server端启动源码(一)
2021-01-02 14:13:46

Netty是基于事件处理的高性能异步非阻塞网络框架,Zookeeper、Dubbo、RocketMQ底层都有Netty的影子,是网络编程的基石,需要熟练掌握,本章会介绍NIO相关知识和Netty Server端源码

1. NIO

1.1 运行状态和异步同步

泡茶例子:

  • 同步:自己来烧水,自己来泡茶;
  • 异步:自己烧水,然后走了换另一个人来泡茶;
  • 阻塞:自己来烧水,自己需要站在原地等水烧开;
  • 非阻塞:自己来烧水,自己可以在烧水途中做其他的事情;

1.2 网络IO

  • BIO:blocking I/O,阻塞性IO,需要当客户端发送请求之前则建立一个阻塞线程,去监听端口,待请求建立连接后继续阻塞,待客户端发送数据后再处理数据,适用于连接数固定且较小的项目中;
  • NIO:new I/O,使用单独的线程进行轮询是否有人建立连接,当建立了连接后才会丢给工作线程组,让线程组以块的模式来处理客户端数据,适用于连接数较大且连接时长较短的业务;
  • AIO:asynchronous I/O,完全异步的处理内核态的客户端数据,无需关心建立连接的细节,只需要根据系统通知进行事件处理即可,适用于连接数较大且连接时长较长的业务,用于系统调用比较多的场景,例如文件下载;

1.3 Reactor模式

主要分为reactor和handler,reactor用来处理客户端连接,handler用来负责处理相关请求。

2. Netty Server端启动源码

2.1 声明线程组

1
2
// 初始化工作线程组
NioEventLoopGroup workerGroup = new NioEventLoopGroup();

EventLoopGroup:线程池,如果不设置内部线程数则默认为CPU核数和两倍,内部存放EventExecutor数组,每个数组存放一个初始化完成了的NioEventLoop对象。
NioEventLoop:本质上是一个线程,因为其继承SingleThreadEbentLoop,在初始化时设置Selector和TaskQueue,Selector为多路复用器,TaskQueue为任务队列。

2.2 初始化ServerBootstrap并设置参数

1
2
3
4
5
// 设置ServerBootstrap相关的Channel和EventLoopGroup线程池
ServerBootstrap serverBootstrap = bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(initializer);

设置ServerBootstrap的Channel类型为NioServerSocketChannel,设置handler。

2.3 初始化Channel并监听Selector

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 初始化Channel并绑定端口
Channel channel = serverBootstrap.bind(port).sync().channel();
...
// 设置Channel只关注连接事件
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
...
// 设置当前SelectableChannel为非阻塞
ch.configureBlocking(false);
...
// 通过反射创建Channel对象并完成Channel自己的初始化
channel = channelFactory.newChannel();

执行绑定事件,在AbstractBootstrap中执行doBind方法,进入AbstractBootstrap中调用initAndRegister方法,进行初始化ServerBootstrap和进行注册。NioServerSocketChannel本质上就是Java NIO中的ServerSocketChannel。通过上面指定的Channel类型为NioServerSocketChannel.class来生成NioServerSocketChannel作为ServerBootstrap中的Channel。将Channel和ServerBootstrap绑定,并初始化Channel只关注SelectionKey.OP_ACCEPT事件,并在初始化过程中设置SelectableChannel的阻塞状态为非阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// workerGroup
final EventLoopGroup currentChildGroup = childGroup;
// 自定义的handler
final ChannelHandler currentChildHandler = childHandler;
...
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// 设置Channel中EventLoop中各个线程都需要初始化其中的ChannelPipline并且新构建一个ServerBootstrapAccpetor进去
// ServerBootstrapAccpetor是ChannelInboundHandlerAdapter的子类,所以在Channel收到网络请求时会依次执行其中的方法
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});

之后将childHandler方法中的pipeline和原有的handler转换为Channel中的ChannelPipeline对象,ChannelPipeline本质上是一个Map<String, ChannelHandler>,将Channel中各个EventLoop中的ChannelPipeline都初始化了一个ServerBootstrapAcceptor,相当于初始化请求链路并在其中新增了一个自己的处理逻辑,并将Channel中的子线程组、自定义拦截器、自定义属性、自定义参数都放入初始化的Acceptor中。

1
2
3
4
5
6
7
8
// Group注册到Channel上
ChannelFuture regFuture = config().group().register(channel);
...
// next方法是依次获取当前EventGroup中的所有线程
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}

将Group注册到Channel上,返回一个ChannelFuture对象来进行监听回调处理。
channel.closeFuture().sync();