pingnanlee的专栏

Bootstrap的意思就是引导,辅助的意思,在编写服务端或客户端程序时,我们都需要先new一个bootstrap,然后基于这个bootstrap调用函数,添加eventloop和handler,可见对bootstrap进行分析还是有必要的。

1、bootstrap结构图

bootstrap的结构比较简单,涉及的类和接口很少,如下图所示,其中Bootstrap则是客户端程序用的引导类,ServerBootstrap是服务端程序用的引导类。

2、serverbootstrap分析

这部分,专门对serverbootstrap进行分析,bootstrap过程大同小异就不作详细的分析了。下面是我们编写服务端代码的一般化过程,整个分析过程将基于下面这段代码中用到的函数进行。

// Configure the bootstrap.EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new HexDumpProxyInitializer(remoteHost, remotePort)).childOption(ChannelOption.AUTO_READ, false).bind(localPort).sync().channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}

先看关键代码(注意这里面的部分函数是在AbstractBootstrap中定义的)

private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();private volatile EventLoopGroup childGroup;private volatile ChannelHandler childHandler;/*** Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These* {@link EventLoopGroup}'s are used to handle all the events and IO for {@link SocketChannel} and* {@link Channel}'s.*/public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {super.group(parentGroup);if (childGroup == null) {throw new NullPointerException("childGroup");}if (this.childGroup != null) {throw new IllegalStateException("childGroup set already");}this.childGroup = childGroup;return this;}属性值ChildGroup,ChildHandler,是用来处理accpt的Channel的。group函数其实就是将parentGroup和ChildGroup进行赋值,其中parentGroup用于处理accept事件,ChildGroup用于处理accpt的Channel的IO事件。 //channel函数的实现定义在抽象父类中,其实就是通过newInstance函数生成一个具体的channel对象。<pre name="code" class="java"> /*** The {@link Class} which is used to create {@link Channel} instances from.* You either use this or {@link #channelFactory(ChannelFactory)} if your* {@link Channel} implementation has no no-args constructor.*/public B channel(Class<? extends C> channelClass) {if (channelClass == null) {throw new NullPointerException("channelClass");}return channelFactory(new BootstrapChannelFactory<C>(channelClass));}/*** {@link ChannelFactory} which is used to create {@link Channel} instances from* when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)}* is not working for you because of some more complex needs. If your {@link Channel} implementation* has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for* simplify your code.*/@SuppressWarnings("unchecked")public B channelFactory(ChannelFactory<? extends C> channelFactory) {if (channelFactory == null) {throw new NullPointerException("channelFactory");}if (this.channelFactory != null) {throw new IllegalStateException("channelFactory set already");}this.channelFactory = channelFactory;return (B) this;}<pre name="code" class="java"> private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> {private final Class<? extends T> clazz;BootstrapChannelFactory(Class<? extends T> clazz) {this.clazz = clazz;}@Overridepublic T newChannel() {try {return clazz.newInstance();} catch (Throwable t) {throw new ChannelException("Unable to create Channel from class " + clazz, t);}}@Overridepublic String toString() {return clazz.getSimpleName() + ".class";}}Channel函数比较简单,其实就是通过newInstance函数,生成一个具体的Channel对象,例如服务端的NioServerSocketChannel。 /*** Set the {@link ChannelHandler} which is used to serve the request for the {@link Channel}'s.*/public ServerBootstrap childHandler(ChannelHandler childHandler) {if (childHandler == null) {throw new NullPointerException("childHandler");}this.childHandler = childHandler;return this;}上面的函数即给serverbootstrap的childHandler赋值。 /*** Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they get created* (after the acceptor accepted the {@link Channel}). Use a value of {@code null} to remove a previous set* {@link ChannelOption}.*/public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {if (childOption == null) {throw new NullPointerException("childOption");}if (value == null) {synchronized (childOptions) {childOptions.remove(childOption);}} else {synchronized (childOptions) {childOptions.put(childOption, value);}}return this;}上面的函数是指定accpt的channel的属性,channel有很多属性,比如SO_TIMEOUT时间,Buf长度等等。 /*** Create a new {@link Channel} and bind it.*/public ChannelFuture bind() {validate();SocketAddress localAddress = this.localAddress;if (localAddress == null) {throw new IllegalStateException("localAddress not set");}return doBind(localAddress);}/*** Create a new {@link Channel} and bind it.*/public ChannelFuture bind(int inetPort) {return bind(new InetSocketAddress(inetPort));}/*** Create a new {@link Channel} and bind it.*/public ChannelFuture bind(String inetHost, int inetPort) {return bind(new InetSocketAddress(inetHost, inetPort));}<pre name="code" class="java"> /*** Create a new {@link Channel} and bind it.*/public ChannelFuture bind(SocketAddress localAddress) {validate();if (localAddress == null) {throw new NullPointerException("localAddress");}return doBind(localAddress);}private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regPromise = initAndRegister();final Channel channel = regPromise.channel();final ChannelPromise promise = channel.newPromise();if (regPromise.isDone()) {doBind0(regPromise, channel, localAddress, promise);} else {regPromise.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {doBind0(future, channel, localAddress, promise);}});}return promise;}<pre name="code" class="java"> private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up// the pipeline in its channelRegistered() implementation.channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});}Bind函数层层调用过来之后,最后就调用Channel的bind函数了,下面再看channel的bind函数是如何处理的。定义在AbstractChannel中: @Overridepublic ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return pipeline.bind(localAddress, promise);}channel的bind函数,最终就是调用pipeline的bind,而pipeline的bind实际上就是调用contexthandler的bind,之个之前分析write和flush的时候说过了。所以这里直接看contexthandler的bind函数。下面是定义: @Overridepublic ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {if (localAddress == null) {throw new NullPointerException("localAddress");}validatePromise(promise, false);final DefaultChannelHandlerContext next = findContextOutbound();EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeBind(localAddress, promise);} else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeBind(localAddress, promise);}});}return promise;}<pre name="code" class="java"> private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {try {((ChannelOutboundHandler) handler).bind(this, localAddress, promise);} catch (Throwable t) {notifyOutboundHandlerException(t, promise);}}最终调用Handler的bind函数,还记得之前说的outbound类型的事件吗,这类事件提供了默认的实现方法,HeadHandler的bind函数,下面是它的定义:@Overridepublic void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)throws Exception {unsafe.bind(localAddress, promise);}我们又看到了unsafe这个苦力了,最终的操作还是得由它来完成啊,赶紧去看看这个bind函数吧,@Overridepublic final void bind(final SocketAddress localAddress, final ChannelPromise promise) {if (!ensureOpen(promise)) {return;}// See: https://github.com/netty/netty/issues/576if (!PlatformDependent.isWindows() && !PlatformDependent.isRoot() &&Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&localAddress instanceof InetSocketAddress &&!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress()) {// Warn a user about the fact that a non-root user can't receive a// broadcast packet on *nix if the socket is bound on non-wildcard address.logger.warn("A non-root user can't receive a broadcast packet if the socket " +"is not bound to a wildcard address; binding to a non-wildcard " +"address (" + localAddress + ") anyway as requested.");}boolean wasActive = isActive();try {doBind(localAddress);} catch (Throwable t) {closeIfClosed();promise.setFailure(t);return;}if (!wasActive && isActive()) {invokeLater(new Runnable() {@Overridepublic void run() {pipeline.fireChannelActive();}});}promise.setSuccess();}上面的代码最终调用了Channel的doBind函数,这里我们的Channel是NioServerSocketChannel,所以最终就是调用它的bind函数了,代码如下 @Overrideprotected void doBind(SocketAddress localAddress) throws Exception {javaChannel().socket().bind(localAddress, config.getBacklog());}其实它最终也是调用了JDK的Channel的socket bind函数。

美好的生命应该充满期待惊喜和感激

pingnanlee的专栏

相关文章:

你感兴趣的文章:

标签云: