博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Netty源代码学习——EventLoopGroup原理:NioEventLoopGroup分析
阅读量:5145 次
发布时间:2019-06-13

本文共 15534 字,大约阅读时间需要 51 分钟。

类结构图:

                                         

不了解Executor接口原理的能够查看concurrent包中的api介绍。这里仅仅介绍Netty中EventExecutorGroup的主要功能。

从类的结构图中能够看到EventExecutorGroup是直接继承ScheduledExecutorService这个接口的,为了说明确Group的原理这里顺便提一下ScheduledExecutorService的用途!

java.util.concurrent.ScheduledExecutorServiceAn ExecutorService that can schedule commands to run after a given delay, or to execute periodically. The schedule methods create tasks with various delays and return a task object that can be used to cancel or check execution. The scheduleAtFixedRate and scheduleWithFixedDelay methods create and execute tasks that run periodically until cancelled. Commands submitted using the Executor.execute and ExecutorService submit methods are scheduled with a requested delay of zero. Zero and negative delays (but not periods) are also allowed in schedule methods, and are treated as requests for immediate execution. All schedule methods accept relative delays and periods as arguments, not absolute times or dates. It is a simple matter to transform an absolute time represented as a java.util.Date to the required form. For example, to schedule at a certain future date, you can use: schedule(task, date.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS). Beware however that expiration of a relative delay need not coincide with the current Date at which the task is enabled due to network time synchronization protocols, clock drift, or other factors. The Executors class provides convenient factory methods for the ScheduledExecutorService implementations provided in this package. Usage ExampleHere is a class with a method that sets up a ScheduledExecutorService to beep every ten seconds for an hour:  import static java.util.concurrent.TimeUnit.*; class BeeperControl {   private final ScheduledExecutorService scheduler =     Executors.newScheduledThreadPool(1);   public void beepForAnHour() {     final Runnable beeper = new Runnable() {       public void run() { System.out.println("beep"); }     };     final ScheduledFuture beeperHandle =       scheduler.scheduleAtFixedRate(beeper, 10, 10, SECONDS);     scheduler.schedule(new Runnable() {       public void run() { beeperHandle.cancel(true); }     }, 60 * 60, SECONDS);   } }}Since:1.5Author:Doug Lea
主要就是能够在一段延迟之后或者每隔一段时间运行task的ExecutorService定义。

EventLoopGroup:

              Special  which allows to register 's that get processed for later selection during theevent loop.

EventLoop:

              Will handle all the I/O-Operations for a once it was registered. One instance will usually handle more then one but this may depend on implementation details and internals.

ThreadPerChannelEventLoop( which is used to handle OIO's. So in general there will be one per .)是用来实现BIO的一对一模型的.

NioEventLoop( implementation which register the's to a and so does the multi-plexing of these in the event loop.)是用来实现NIO多路复用的。接下来通过源代码来分析!

构造函数(请注意parent參数。这是一个LoopGroup类型的):

NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {        super(parent, threadFactory, false);        if (selectorProvider == null) {            throw new NullPointerException("selectorProvider");        }        provider = selectorProvider;        selector = openSelector();    }
持有的Selector:
private Selector openSelector() {        final Selector selector;        try {            selector = provider.openSelector();        } catch (IOException e) {            throw new ChannelException("failed to open a new selector", e);        }        if (DISABLE_KEYSET_OPTIMIZATION) {            return selector;        }        try {            SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();            Class
selectorImplClass = Class.forName("sun.nio.ch.SelectorImpl", false, ClassLoader.getSystemClassLoader()); selectorImplClass.isAssignableFrom(selector.getClass()); Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); selectedKeysField.setAccessible(true); publicSelectedKeysField.setAccessible(true); selectedKeysField.set(selector, selectedKeySet); publicSelectedKeysField.set(selector, selectedKeySet); selectedKeys = selectedKeySet; logger.trace("Instrumented an optimized java.util.Set into: {}", selector); } catch (Throwable t) { selectedKeys = null; logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t); } return selector; }
Run方法实现:
@Override    protected void run() {        for (;;) {            oldWakenUp = wakenUp.getAndSet(false);            try {                if (hasTasks()) {                    selectNow();                } else {                    select();                    // 'wakenUp.compareAndSet(false, true)' is always evaluated                    // before calling 'selector.wakeup()' to reduce the wake-up                    // overhead. (Selector.wakeup() is an expensive operation.)                    //                    // However, there is a race condition in this approach.                    // The race condition is triggered when 'wakenUp' is set to                    // true too early.                    //                    // 'wakenUp' is set to true too early if:                    // 1) Selector is waken up between 'wakenUp.set(false)' and                    //    'selector.select(...)'. (BAD)                    // 2) Selector is waken up between 'selector.select(...)' and                    //    'if (wakenUp.get()) { ... }'. (OK)                    //                    // In the first case, 'wakenUp' is set to true and the                    // following 'selector.select(...)' will wake up immediately.                    // Until 'wakenUp' is set to false again in the next round,                    // 'wakenUp.compareAndSet(false, true)' will fail, and therefore                    // any attempt to wake up the Selector will fail, too, causing                    // the following 'selector.select(...)' call to block                    // unnecessarily.                    //                    // To fix this problem, we wake up the selector again if wakenUp                    // is true immediately after selector.select(...).                    // It is inefficient in that it wakes up the selector for both                    // the first case (BAD - wake-up required) and the second case                    // (OK - no wake-up required).                    if (wakenUp.get()) {                        selector.wakeup();                    }                }                cancelledKeys = 0;                final long ioStartTime = System.nanoTime();                needsToSelectAgain = false;                if (selectedKeys != null) {                    processSelectedKeysOptimized(selectedKeys.flip());                } else {                    processSelectedKeysPlain(selector.selectedKeys());                }                final long ioTime = System.nanoTime() - ioStartTime;                final int ioRatio = this.ioRatio;                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);                if (isShuttingDown()) {                    closeAll();                    if (confirmShutdown()) {                        break;                    }                }            } catch (Throwable t) {                logger.warn("Unexpected exception in the selector loop.", t);                // Prevent possible consecutive immediate failures that lead to                // excessive CPU consumption.                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    // Ignore.                }            }        }    }

Select方法:

private void select() throws IOException {        Selector selector = this.selector;        try {            int selectCnt = 0;            long currentTimeNanos = System.nanoTime();            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);            for (;;) {                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;                if (timeoutMillis <= 0) {                    if (selectCnt == 0) {                        selector.selectNow();                        selectCnt = 1;                    }                    break;                }                int selectedKeys = selector.select(timeoutMillis);                selectCnt ++;                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks()) {                    // Selected something,                    // waken up by user, or                    // the task queue has a pending task.                    break;                }                if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {                    // The selector returned prematurely many times in a row.                    // Rebuild the selector to work around the problem.                    logger.warn(                            "Selector.select() returned prematurely {} times in a row; rebuilding selector.",                            selectCnt);                    rebuildSelector();                    selector = this.selector;                    // Select again to populate selectedKeys.                    selector.selectNow();                    selectCnt = 1;                    break;                }                currentTimeNanos = System.nanoTime();            }            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {                if (logger.isDebugEnabled()) {                    logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);                }            }        } catch (CancelledKeyException e) {            if (logger.isDebugEnabled()) {                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?

", e); } // Harmless exception - log anyway } } private void selectAgain() { needsToSelectAgain = false; try { selector.selectNow(); } catch (Throwable t) { logger.warn("Failed to update SelectionKeys.", t); } }

processSelectedKeysPlain实现:

private void processSelectedKeysPlain(Set
selectedKeys) { // check if the set is empty and if so just return to not create garbage by // creating a new Iterator every time even if there is nothing to process. // See https://github.com/netty/netty/issues/597 if (selectedKeys.isEmpty()) { return; } Iterator
i = selectedKeys.iterator(); for (;;) { final SelectionKey k = i.next(); final Object a = k.attachment(); i.remove(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask
task = (NioTask
) a; processSelectedKey(k, task); } if (!i.hasNext()) { break; } if (needsToSelectAgain) { selectAgain(); selectedKeys = selector.selectedKeys(); // Create the iterator again to avoid ConcurrentModificationException if (selectedKeys.isEmpty()) { break; } else { i = selectedKeys.iterator(); } } } }
selectAgain实现:

private void selectAgain() {        needsToSelectAgain = false;        try {            selector.selectNow();        } catch (Throwable t) {            logger.warn("Failed to update SelectionKeys.", t);        }    }
这里调用selectNow来继续select!

是不是和 中NIO的实现非常相似!

由于EventLoop是Executor,所以他就是Reactor中的Dispatcher,请深刻理解Executor!

接下来让我们想想開始提到的Group的用途,在看到Group的时候我突然想到了中的AsynchronousChannelGroup

这俩究竟有没有关系呢?带着这个问题,看下
NioEventLoopGroup的用途吧!

NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {        super(parent, threadFactory, false);        if (selectorProvider == null) {            throw new NullPointerException("selectorProvider");        }        provider = selectorProvider;        selector = openSelector();    }
SingleThreadEventExecutor类:

/**     * Create a new instance     *     * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it     * @param threadFactory     the {@link ThreadFactory} which will be used for the used {@link Thread}     * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the     *                          executor thread     */    protected SingleThreadEventExecutor(            EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {        if (threadFactory == null) {            throw new NullPointerException("threadFactory");        }        this.parent = parent;        this.addTaskWakesUp = addTaskWakesUp;        thread = threadFactory.newThread(new Runnable() {            @Override            public void run() {                boolean success = false;                updateLastExecutionTime();                try {                    SingleThreadEventExecutor.this.run();                    success = true;                } catch (Throwable t) {                    logger.warn("Unexpected exception from an event executor: ", t);                } finally {                    if (state < ST_SHUTTING_DOWN) {                        state = ST_SHUTTING_DOWN;                    }                    // Check if confirmShutdown() was called at the end of the loop.                    if (success && gracefulShutdownStartTime == 0) {                        logger.error(                                "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +                                "before run() implementation terminates.");                    }                    try {                        // Run all remaining tasks and shutdown hooks.                        for (;;) {                            if (confirmShutdown()) {                                break;                            }                        }                    } finally {                        try {                            cleanup();                        } finally {                            synchronized (stateLock) {                                state = ST_TERMINATED;                            }                            threadLock.release();                            if (!taskQueue.isEmpty()) {                                logger.warn(                                        "An event executor terminated with " +                                        "non-empty task queue (" + taskQueue.size() + ')');                            }                            terminationFuture.setSuccess(null);                        }                    }                }            }        });        taskQueue = newTaskQueue();    }
追踪一下这个构造函数的调用过程:

最后找到了,NioEventLoopGroup,构造这个group的时候传递了一个ThreadFactory參数。所以这个group和中的AsynchronousChannelGroup它是大致相同的!

版权声明:本文博主原创文章,博客,未经同意不得转载。

转载于:https://www.cnblogs.com/gcczhongduan/p/4810547.html

你可能感兴趣的文章
【转】分布式锁的几种使用方式(redis、zookeeper、数据库)
查看>>
mybatis的批量插入
查看>>
微信公众平台接口,修改分享文案、图片等(前端js部分)
查看>>
HDU 5307 He is Flying (生成函数+FFT)
查看>>
PHP array_combine()
查看>>
浅析C\C++的动态内存管理
查看>>
Python 伪造数据的库:Faker
查看>>
《亿级用户下的新浪微博平台架构》阅读笔记
查看>>
nginx对nodejs服务器的http、https、ws、wss的配置
查看>>
短信中VB.NET编码PDU(一)
查看>>
easybuy项目总结_20180409
查看>>
JAVA学习笔记-异常机制
查看>>
ubunru12.10下安装Hadoop1.0.4
查看>>
极速理解设计模式系列【目录索引】
查看>>
.net 弹窗方式
查看>>
JavaScript中Element与Node的区别,children与childNodes的区别
查看>>
[POI2007]ATR-Tourist Attractions [TPLY]
查看>>
okhttp拦截器之RetryAndFollowUpInterceptor&BridgeInterceptor分析
查看>>
maven入门(上)
查看>>
Spring+hibernate事务详解
查看>>