博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
如何接入新连接
阅读量:7065 次
发布时间:2019-06-28

本文共 21048 字,大约阅读时间需要 70 分钟。

欢迎关注公众号:【爱编程】 如果有需要后台回复2019赠送1T的学习资料哦!!

前文再续,书接上一回【】。 在研究NioEventLoop执行过程的时候,检测IO事件(包括新连接),处理IO事件,执行所有任务三个过程。其中检测IO事件中通过持有的selector去轮询事件,检测出新连接。这里复用同一段代码。

Channel的设计

在开始分析前,先了解一下Channel的设计

顶层Channel接口定义了socket事件如读、写、连接、绑定等事件,并使用AbstractChannel作为骨架实现了这些方法。查看器成员变量,发现大多数通用的组件,都被定义在这里

第二层AbstractNioChannel定义了以NIO,即Selector的方式进行读写事件的监听。其成员变量保存了selector相关的一些属性。

第三层内容比较多,定义了服务端channel(左边继承了AbstractNioMessageChannel的NioServerSocketChannel)以及客户端channel(右边继承了AbstractNioByteChannel的NioSocketChannel)。

如何接入新连接?

本文开始探索一下Netty是如何接入新连接?主要分为四个部分

1.检测新连接 2.创建NioSocketChannel 3.分配线程和注册Selector 4.向Selector注册读事件

1.检测新连接

Netty服务端在启动的时候会绑定一个bossGroup,即NioEventLoop,在bind()绑定端口的时候注册accept(新连接接入)事件。扫描到该事件后,便处理。因此入口从:NioEventLoop#processSelectedKeys()开始。

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();        //省略代码        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead        // to a spin loop        //如果当前NioEventLoop是workGroup 则可能是OP_READ,bossGroup是OP_ACCEPT        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {            //新连接接入以及读事件处理入口            unsafe.read();        }      }复制代码

关键的新连接接入以及读事件处理入口unsafe.read();

a).这里的unsafe是在Channel创建过程的时候,调用了父类AbstractChannel#AbstractChannel()的构造方法,和pipeline一起初始化的。

protected AbstractChannel(Channel parent) {        this.parent = parent;        id = newId();        unsafe = newUnsafe();        pipeline = newChannelPipeline();    }复制代码

服务端: unsafe 为NioServerSockeChannel的父类AbstractNioMessageChannel#newUnsafe()创建,可以看到对应的是AbstractNioMessageChannel的内部类NioMessageUnsafe;

客户端: unsafe为NioSocketChannel的的父类AbstractNioUnsafe#newUnsafe()创建的话,它对应的是AbstractNioByteChannel的内部类NioByteUnsafe

b).unsafe.read()

NioMessageUnsafe.read()中主要的操作如下:

1.循环调用jdk底层的代码创建channel,并用netty的NioSocketChannel包装起来,代表新连接成功接入一个通道。 2.将所有获取到的channel存储到一个容器当中,检测接入的连接数,默认是一次接16个连接 3.遍历容器中的channel,依次调用方法fireChannelRead,4.fireChannelReadComplete,fireExceptionCaught来触发对应的传播事件。

private final class NioMessageUnsafe extends AbstractNioUnsafe {        //临时存储读到的连接        private final List readBuf = new ArrayList();        @Override        public void read() {            assert eventLoop().inEventLoop();            final ChannelConfig config = config();            final ChannelPipeline pipeline = pipeline();            //服务端接入速率处理器            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();            allocHandle.reset(config);            boolean closed = false;            Throwable exception = null;            try {                try {                    //while循环调用doReadMessages()创建新连接对象                    do {                        //获取jdk底层的channel,并加入readBuf容器                        int localRead = doReadMessages(readBuf);                        if (localRead == 0) {                            break;                        }                        if (localRead < 0) {                            closed = true;                            break;                        }                        //把读到的连接做一个累加totalMessages,默认最多累计读取16个连接,结束循环                        allocHandle.incMessagesRead(localRead);                                            } while (allocHandle.continueReading());                } catch (Throwable t) {                    exception = t;                }                                //触发readBuf容器内所有的传播事件:ChannelRead 读事件                int size = readBuf.size();                for (int i = 0; i < size; i ++) {                    readPending = false;                    pipeline.fireChannelRead(readBuf.get(i));                }                //清空容器                readBuf.clear();                allocHandle.readComplete();                //触发传播事件:ChannelReadComplete,所有的读事件完成                pipeline.fireChannelReadComplete();                if (exception != null) {                    closed = closeOnReadError(exception);                    //触发传播事件:exceptionCaught,触发异常                    pipeline.fireExceptionCaught(exception);                }                if (closed) {                    inputShutdown = true;                    if (isOpen()) {                        close(voidPromise());                    }                }            } finally {                // Check if there is a readPending which was not processed yet.                // This could be for two reasons:                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method                //                // See https://github.com/netty/netty/issues/2254                if (!readPending && !config.isAutoRead()) {                    removeReadOp();                }            }        }    }复制代码

而这一段关键代码逻辑中int localRead = doReadMessages(readBuf);它创建jdk底层channel并且用NioSocketChannel包装起来,将该channel添加到传入的容器保存起来,同时返回一个计数。

protected int doReadMessages(List buf) throws Exception {        SocketChannel ch = SocketUtils.accept(javaChannel());        try {            if (ch != null) {  //将jdk底层的channel封装到netty的channel,并存储到传入的容器当中                //this为服务端channel                buf.add(new NioSocketChannel(this, ch)); //成功和创建 客户端接入的一条通道,并返回                return 1;            }        } catch (Throwable t) {            logger.warn("Failed to create a new channel from an accepted socket.", t);            try {                ch.close();            } catch (Throwable t2) {                logger.warn("Failed to close a socket.", t2);            }        }        return 0;    }复制代码

2.创建NioSocketChannel

通过检测IO事件轮询新连接,当前成功检测到连接接入事件之后,会调用NioServerSocketChannel#doReadMessages()方法,进行创建NioSocketChannel,即客户端channel的过程。

下面就来了解一下NioSocketChannel的主要工作: .查看原代码做了两件事,调用父类构造方法,实例化一个NioSocketChannelConfig。

public NioSocketChannel(Channel parent, SocketChannel socket) {        super(parent, socket);        //实例化一个NioSocketChannelConfig        config = new NioSocketChannelConfig(this, socket.socket());    }复制代码

1)、查看NioSocketChannel父类构造方法,主要是保存客户端注册的读事件、channel为成员变量,以及设置阻塞模式为非阻塞。

public NioSocketChannel(Channel parent, SocketChannel socket) {        super(parent, socket);        //实例化一个NioSocketChannelConfig        config = new NioSocketChannelConfig(this, socket.socket());    }    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {        //传入感兴趣的读事件:客户端channel的读事件        super(parent, ch, SelectionKey.OP_READ);    }    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {        super(parent);        //保存客户端channel为成员变量        this.ch = ch;        //保存感兴趣的读事件为成员变量        this.readInterestOp = readInterestOp;        try {            //配置阻塞模式为非阻塞            ch.configureBlocking(false);        } catch (IOException e) {            try {                ch.close();            } catch (IOException e2) {                if (logger.isWarnEnabled()) {                    logger.warn(                            "Failed to close a partially initialized socket.", e2);                }            }            throw new ChannelException("Failed to enter non-blocking mode.", e);        }    }复制代码

最后调用父类的构造方法,是设置该客户端channel对应的服务端channel,以及channel的id和两大组件unsafe和pipeline

protected AbstractChannel(Channel parent) {        //parent为创建次客户端channel的服务端channel(服务端启动过程中通过反射创建的)        this.parent = parent;        id = newId();        unsafe = newUnsafe();        pipeline = newChannelPipeline();    }复制代码

2)、再看NioSocketChannelConfig实例化。主要是保存了javaSocket,并且通过setTcpNoDelay(true);禁止了tcp的Nagle算法,目的是为了尽量让小的数据包整合成大的发送出去,降低延时.

private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {            super(channel, javaSocket);            calculateMaxBytesPerGatheringWrite();        }    public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) {        super(channel);        if (javaSocket == null) {            throw new NullPointerException("javaSocket");        }        //保存socket        this.javaSocket = javaSocket;        // Enable TCP_NODELAY by default if possible.        if (PlatformDependent.canEnableTcpNoDelayByDefault()) {            try {                //禁止Nagle算法,目的是为了让小的数据包尽量集合成大的数据包发送出去                setTcpNoDelay(true);            } catch (Exception e) {                // Ignore.            }        }    }复制代码

3.分配线程和注册Selector

服务端启动初始化的时候ServerBootstrap#init(),主要做了一些参数的配置。其中对于childGroup,childOptions,childAttrs,childHandler等参数被进行了单独配置。作为参数和ServerBootstrapAcceptor一起,被当作一个特殊的handle,封装到pipeline中。ServerBootstrapAcceptor中的eventLoopworkGroup

public class ServerBootstrap extends AbstractBootstrap
{ //省略了很多代码............. @Override void init(Channel channel) throws Exception { //配置AbstractBootstrap.option final Map
, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } //配置AbstractBootstrap.attr final Map
, Object> attrs = attrs0(); synchronized (attrs) { for (Entry
, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey
key = (AttributeKey) e.getKey(); channel.attr(key).set(e.getValue()); } } //配置pipeline ChannelPipeline p = channel.pipeline(); //获取ServerBootstrapAcceptor配置参数 final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry
, Object>[] currentChildOptions; final Entry
, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); } p.addLast(new ChannelInitializer
() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); //配置AbstractBootstrap.handler ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { //配置ServerBootstrapAcceptor,作为Handle紧跟HeadContext pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }//省略了很多代码.............}复制代码

可见,整个服务端pipeline的结构如下图所示。bossGroup控制IO事件的检测与处理,整个bossGroup对应的pipeline只包括头(HeadContext)尾(TailContext)以及中部的ServerBootstrap.ServerBootstrapAcceptor

当新连接接入的时候AbstractNioMessageChannel.NioMessageUnsafe#read()方法被调用,最终调用fireChannelRead(),方法来触发下一个Handler的channelRead方法。而这个Handler正是ServerBootstrapAcceptor

它是ServerBootstrap的内部类,同时继承自ChannelInboundHandlerAdapter。也是一个ChannelInboundHandler。其中channelRead主要做了以下几件事。

1.为客户端channel的pipeline添加childHandler 2.设置客户端TCP相关属性childOptions和自定义属性childAttrs 3.workGroup选择NioEventLoop并注册Selector

1)、为客户端channel的pipeline添加childHandler

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {        private final EventLoopGroup childGroup;        private final ChannelHandler childHandler;        private final Entry
, Object>[] childOptions; private final Entry
, Object>[] childAttrs; private final Runnable enableAutoReadTask; ServerBootstrapAcceptor( final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler, Entry
, Object>[] childOptions, Entry
, Object>[] childAttrs) { this.childGroup = childGroup; this.childHandler = childHandler; this.childOptions = childOptions; this.childAttrs = childAttrs; //省略了一些代码。。。。。 @Override @SuppressWarnings("unchecked") public void channelRead(ChannelHandlerContext ctx, Object msg) { //该channel为客户端接入时创建的channel final Channel child = (Channel) msg; //添加childHandler child.pipeline().addLast(childHandler); //设置TCP相关属性:childOptions setChannelOptions(child, childOptions, logger); //设置自定义属性:childAttrs for (Entry
, Object> e: childAttrs) { child.attr((AttributeKey
) e.getKey()).set(e.getValue()); } try { //选择NioEventLoop并注册Selector childGroup.register(child) .addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } } //省略了一些代码。。。。。 }复制代码

客户端channel的pipeline添加childHandler,在服务端EchoServer创建流程中,childHandler的时候,使用了ChannelInitializer的一个自定义实例。并且覆盖了其initChannel方法,改方法获取到pipeline并添加具体的Handler。查看ChannelInitializer具体的添加逻辑,handlerAdded方法。其实在initChannel逻辑中,首先是回调到用户代码执行initChannel,用户代码执行添加Handler的添加操作,之后将ChannelInitializer自己从pipeline中删除

public abstract class ChannelInitializer
extends ChannelInboundHandlerAdapter { @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isRegistered()) { // This should always be true with our current DefaultChannelPipeline implementation. // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers // will be added in the expected order. //初始化Channel if (initChannel(ctx)) { // We are done with init the Channel, removing the initializer now. removeState(ctx); } } } private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.add(ctx)) { // Guard against re-entrance. try { //回调到用户代码 initChannel((C) ctx.channel()); } catch (Throwable cause) { // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...). // We do so to prevent multiple calls to initChannel(...). exceptionCaught(ctx, cause); } finally { ChannelPipeline pipeline = ctx.pipeline(); if (pipeline.context(this) != null) { //删除本身 pipeline.remove(this); } } return true; } return false; }}复制代码

2)、设置客户端TCP相关属性childOptions和自定义属性childAttrs 这点在ServerBootstrapAcceptor#init()方法中已经体现

3)、workGroup选择NioEventLoop并注册Selector 这要从AbstractBootstrap#initAndRegister()方法开始,然后跟踪源码会来到AbstractUnsafe#register()方法

protected abstract class AbstractUnsafe implements Unsafe {      //省略了一些代码。。。。。  @Override        public final void register(EventLoop eventLoop, final ChannelPromise promise) {            if (eventLoop == null) {                throw new NullPointerException("eventLoop");            }            if (isRegistered()) {                promise.setFailure(new IllegalStateException("registered to an event loop already"));                return;            }            if (!isCompatible(eventLoop)) {                promise.setFailure(                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));                return;            }            AbstractChannel.this.eventLoop = eventLoop;            if (eventLoop.inEventLoop()) {                register0(promise);            } else {                try {                    eventLoop.execute(new Runnable() {                        @Override                        public void run() {                            register0(promise);                        }                    });                } catch (Throwable t) {                    logger.warn(                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",                            AbstractChannel.this, t);                    closeForcibly();                    closeFuture.setClosed();                    safeSetFailure(promise, t);                }            }        }      //省略了一些代码。。。。。}复制代码

最后调用AbstractNioUnsafe#doRegister()方法通过jdk的javaChannel().register完成注册功能。

protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {      //省略了一些代码。。。。。  @Override    protected void doRegister() throws Exception {        boolean selected = false;        for (;;) {            try {                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);                return;            } catch (CancelledKeyException e) {                if (!selected) {                    // Force the Selector to select now as the "canceled" SelectionKey may still be                    // cached and not removed because no Select.select(..) operation was called yet.                    eventLoop().selectNow();                    selected = true;                } else {                    // We forced a select operation on the selector before but the SelectionKey is still cached                    // for whatever reason. JDK bug ?                    throw e;                }            }        }    }      //省略了一些代码。。。。。}复制代码

4.向Selector注册读事件

a)、入口:ServerBootstrap.ServerBootstrapAcceptor#channelRead()#childGroup.register();

public void channelRead(ChannelHandlerContext ctx, Object msg) {            final Channel child = (Channel) msg;            child.pipeline().addLast(childHandler);            setChannelOptions(child, childOptions, logger);            for (Entry
, Object> e: childAttrs) { child.attr((AttributeKey
) e.getKey()).set(e.getValue()); } try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }复制代码

b)、实际上调用了AbstractChannel.AbstractUnsafe#register0(),触发了通道激活事件;

//触发通道激活事件,调用HeadContent的   pipeline.fireChannelActive();复制代码

c)、pipeline的头部开始,即DefaultChannelPipeline.HeadContext#channelActive()从而触发了readIfIsAutoRead();

@Override  public void channelActive(ChannelHandlerContext ctx) {            ctx.fireChannelActive();            readIfIsAutoRead();  }复制代码

d)、读事件将从尾部的TailContent#read()被触发,从而依次执行ctx.read(),从尾部开始,每个outboundHandler的read()事件都被触发。直到头部。

@Override    public final ChannelPipeline read() {        tail.read();        return this;    }    @Override    public ChannelHandlerContext read() {        //获取最近的outboundhandler        final AbstractChannelHandlerContext next = findContextOutbound();        EventExecutor executor = next.executor();        //并依次执行其read方法        if (executor.inEventLoop()) {            next.invokeRead();        } else {            Tasks tasks = next.invokeTasks;            if (tasks == null) {                next.invokeTasks = tasks = new Tasks(next);            }            executor.execute(tasks.invokeReadTask);        }        return this;    }复制代码

e)、进入头部HeadContext#read(),并且最终更改了selectionKey,向selector注册了读事件

HeadContext#read()

@Override        public void read(ChannelHandlerContext ctx) {            unsafe.beginRead();        }复制代码

AbstractChannel#beginRead()

@Override        public final void beginRead() {            assertEventLoop();            if (!isActive()) {                return;            }            try {                doBeginRead();            } catch (final Exception e) {                invokeLater(new Runnable() {                    @Override                    public void run() {                        pipeline.fireExceptionCaught(e);                    }                });                close(voidPromise());            }        }复制代码

AbstractNioMessageChannel#doBeginRead

@Override    protected void doBeginRead() throws Exception {        if (inputShutdown) {            return;        }        super.doBeginRead();    }复制代码

AbstractNioChannel#doBeginRead()

@Override    protected void doBeginRead() throws Exception {        // Channel.read() or ChannelHandlerContext.read() was called        final SelectionKey selectionKey = this.selectionKey;        if (!selectionKey.isValid()) {            return;        }        readPending = true;        final int interestOps = selectionKey.interestOps();        if ((interestOps & readInterestOp) == 0) {            selectionKey.interestOps(interestOps | readInterestOp);        }    }复制代码

参考文章:

总结

Netty如何接入新连接基本流程如上所述,如果有误,还望各位指正。建议先从前两篇看起比较好理解点。

最后

如果对 Java、大数据感兴趣请长按二维码关注一波,我会努力带给你们价值。觉得对你哪怕有一丁点帮助的请帮忙点个赞或者转发哦。 关注公众号**【爱编码】,回复2019**有相关资料哦。

转载于:https://juejin.im/post/5cfa3501f265da1bd4247174

你可能感兴趣的文章
小猪的Python学习之旅 —— 20.抓取Gank.io所有数据存储到MySQL中
查看>>
从小数学就不及格的我,竟然用极坐标系表白了我的女神!(附代码)
查看>>
iOS远程hot patch的优点和风险
查看>>
Android uncovers master-key 漏洞分析
查看>>
常用操作命令使用总结
查看>>
单元测试工具 TestNG 使用
查看>>
关于Java面试,你应该准备这些知识点
查看>>
某PA科技iOS开发工程师面试回忆
查看>>
从jvm角度看懂类初始化、方法重载、重写。
查看>>
项目难做,程序员难当,软件开发中的 9 大难题
查看>>
ELK 使用小技巧(第 4 期)
查看>>
浅谈前端测试
查看>>
wordpress主题实现彩色标签云效果
查看>>
python线程同步机制
查看>>
Koa源码分析
查看>>
Vim命令的简单使用
查看>>
ArrayList,LinkedList,Vector,Stack之间的区别
查看>>
iOS 本地DNS解析方法
查看>>
python-17-socket
查看>>
java中23种设计模式--原型模式(Portotype)
查看>>