Netty5的启动说明,帮助理解netty5对NIO的封装
Netty5: ServerSocketChannel启动说明: 以NettyExample 里EchoServer为例
//EventLoop相当于线程,会在run()方法里不断的执行Selector.select(timeout)方法
//如果有就绪的事件,就创建一个处理对应事件的任务丢进线程池里执行。
//下边EventLoopGroup就是线程池。
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //这里默认创建 cpu个数 * 2 个线程
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoServerHandler());
}
});
// 以上都是设置参数和ChannelHandler ,下面调用bind()才是启动服务器.
// 相当于原生NIO调用
// ServerSocketChannel servch = ServerSocketChannel.open();
// servch.configureBlocking(false);
// selectKey = servch.register(selector , 0, null);
// serverSocketChannel.bind();
// selectKey.interestOps(SelectionKey.OP_ACCEPT);
// 因为Event是注册到bossGroup里的seletor上的,所以后边bossGroup就能不断的处理Accept事件了。
//Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
那么Netty又是怎么处理Accept事件呢,看代码:ServerBootstrap 里有个ChannelHandler ServerBootstrapAcceptor
专门处理Accept事件.
来看看ServerBootstrapAcceptor的ChannelRead()方法是怎么处理的
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 由于NioServerSocketChannel是AbstractNioMessageChannel的子类,
// 当有Accept事件就绪时就会调用AbstractNioMessageChannel.NioMessageUnsafe.read()方法,
// unSafe.read()里再调用NioServerSocketChannel.doReadMessages()方法,实现对serverSocketChannel.accept()的调用,完成三次握手,
// 并触发PipeLine的channelRead()方法,所以这里的参数msg是serverSocketChannel.accept()的返回结果: SocketChannel的封装类:NioSocketChannel
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
// 这里是注册childChannel的READ事件到workGroup线程池的一个线程上,让Selector 处理这个连接的Read事件。
// 这里要注意一下 了 : netty的线程模型正是在这里体现的. 一个channel只会注册到一个EventLoop(线程)里,以后这个channel就由这个EventLoop负责了. 这样保证了channelHandler不会被多个线程调用. 避免了多线程并发的复杂性.
// 这样当有数据到达时,childGroup线程调用selector.select()方法会返回,并调用unSafe.read()方法。
// 最终会调用NioSocketChannel.doReadBytes()方法,完成数据读取,再触发PipeLine的channelRead()方法,跟Accep事件的处理一样的。
// 只是传给ChannelRead()方法的参数是Bytebuf.
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
上面介绍了Netty作为服务端处理Accept 和 Read. 下面看看是如何处理Connect的。 同样从EchoClient来看。
// 前边都不说了,跟Server差不多,从这句开始是真正开始建立连接。相当于原生NIO代码的:
// SocketChannel clich = SocketChannel.open();
// clich.configureBlocking(false);
// selectKey = clich.register(selector , 0, null);
// boolean connected = clich.connect(remoteAddress);
// selectKey.interestOps(SelectionKey.OP_CONNECT);
// 当服务端accept之后,workGroup就会处理Connect事件了。并调用unsafe.finishConnect()方法完成三次握手。
// 之后处理READ事件。
// Start the client.
ChannelFuture f = b.connect(HOST, PORT).sync();