最近在写一个服务,大致就是一个用来做多线程处理任务的东西。所以就看了下ScheduledThreadPoolExecutor的源码。
之前大概了解过ThreadPoolExecutor的源码,知道里面有线程池,有队列,针对不同的情况有不同的处理,但是对与ScheduledThreadPoolExecutor其实没有过多的了解,很简单的认为就是一个定时重复执行任务的线程。
一般我们会通过Executors来帮我们创建一些已经实现好的线程池,ScheduledThreadPoolExecutor一般就是通过下面两个方法创建出来的(当然还有其他方法)。
1 2 |
Executors.newSingleThreadScheduledExecutor() Executors.newScheduledThreadPool(int corePoolSize) |
newSingleThreadScheduledExecutor()这个方法,以前一直以为就是创建一个只包含一个线程的线程池,看了源码之后才知道,其实内部的构造参数是new ScheduledThreadPoolExecutor(1),其实外部还封装了一层,这个不重要。这个1的参数是什么呢?实际上是corePoolSize,这个构造参数调用了父类ThreadPoolExecutor的构造方法,如下
1 2 |
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); |
我们来看下前两个参数,corePoolSize是1,也就是说最小池中线程数是1个,第二个参数是池中允许的最大线程数,就是Integer.MAX_VALUE这么多个。所以看到这,我才知道,其实这样获得的ScheduledThreadPoolExecutor是可以有多个延时任务一起执行的。
接下来就是重点了,我们发现ScheduledThreadPoolExecutor的Queue居然是个DelayedWorkQueue,这个Queue有什么特别的呢?首先字面上我们可以看到他是个DelayQueue,就是个延迟队列,点击去看下源码,发现他实际上是继承了AbstractQueue实现了BlockingQueue,说明他也是一个阻塞队列。他有什么特别的吗,通过几个基本操作的代码发现,其中的offer操作被重新了,也就是添加元素的代码被重写了,下面来看下具体代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture<!--?--> e = (RunnableScheduledFuture<!--?-->)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if (i >= queue.length) grow(); size = i + 1; if (i == 0) { queue[0] = e; setIndex(e, 0); } else { siftUp(i, e); } if (queue[0] == e) { leader = null; available.signal(); } } finally { lock.unlock(); } return true; } |
如果有兴趣可以详细看下任务被加入到队列之后的移动操作,其实这里offer一个任务,就是把指定的任务通过compareTo,移动到了Queue中指定的位置了而已。后面的add、put其实也都是简单直接的调用了offer方法。
接下来看下task方法。其中是通过available信号量来确定什么时候可以从队列里拿到任务去执行,这里包括第一次执行任务的延迟时间,以及每次延时时间的控制,在可以获取任务的时候才拿到任务,所以task还是一个阻塞获取任务的方法,通过之前设置好的时间来控制多久才能拿到任务。所以我们大概可以猜到,这个DelayQueue的offer会进行任务排序,task通过延迟指定的时间来拿到任务。
里面的东西我们大概分析了一下,接下来看下ScheduledThreadPoolExecutor的一般执行过程。在获取到ScheduledThreadPoolExecutor之后,我们一般会执行scheduler.scheduleAtFixedRate(thread, 1, TimeUnit.SECONDS),还有一个方法是scheduleWithFixedDelay,参数列表都一样的,唯一的区别就是,scheduleAtFixedRate的每次执行时间间隔包含程序执行时间,而scheduleWithFixedDelay的每次执行时间间隔不包含程序执行时间。也就是说同样都是5秒执行一次,如果程序执行需要一秒的话,scheduleAtFixedRate的执行时间会是0、5、10、15,而scheduleWithFixedDelay的执行时间会是0、6、12、18。
除了上面说的两点以外,就没有其他的区别了。接下来我们来看下在执行了scheduleAtFixedRate之后会发生什么。先来看下源码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
public ScheduledFuture<!--?--> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask sft = new ScheduledFutureTask(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; } |
其中的new ScheduledFutureTask只是包了一层,这种用法和获取ScheduledThreadPoolExecutor的时候一样,就是适配了一下而已。接下来的重头戏就是delayedExecute,我们跟进去看到如下代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
private void delayedExecute(RunnableScheduledFuture<!--?--> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } } |
正常情况这个东西肯定没有shutdown,所以会忘queue里放入一个任务,这里调用的其实就是我们上面所说的DelayQueue的offer了。接下来会调用ensurePrestart方法,这个方法的作用就是确保就算是corePoolSize为0,也能跑起来一个线程。接下来不管怎么样都会调用addWorker方法,这个方法是父类ThreadPoolExecutor的私有方法,我们跟进去看下,忽略上面的检查Queue为空的情况,下面其实就会看到,如果worker添加成功的话,就会调用t.start();,也就是启动任务。这时候第一次任务调度完成了。
接下来会发生什么呢?我们直接去看这个线程ScheduledFutureTask的run方法,实际上会调用到最后一个elseif的ScheduledFutureTask.super.runAndReset()方法,搞定之后分别调用setNextRunTime();来设置这个任务的下次执行时间,调用reExecutePeriodic方法来走入最初的那套循环,我们跟进去看下其代码。
1 2 3 4 5 6 7 8 9 |
void reExecutePeriodic(RunnableScheduledFuture<!--?--> task) { if (canRunInCurrentRunState(true)) { super.getQueue().add(task); if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart(); } } |
首先它又回去往Queue里放任务了,还是那个DelayQueue,和上面说的是一样的,接下来会执行ensurePrestart,其实这里就又回到了原来的流程了,接着addWorker等等等。
现在思路就清晰了,我用一张图来描述一下这个ScheduledThreadPoolExecutor的执行流程。

©原创文章,转载请注明来源: 赵伊凡's Blog
©本文链接地址: ScheduledThreadPoolExecutor源码分析