ScheduledThreadPoolExecutor源码分析

最近在写一个服务,大致就是一个用来做多线程处理任务的东西。所以就看了下ScheduledThreadPoolExecutor的源码。

之前大概了解过ThreadPoolExecutor的源码,知道里面有线程池,有队列,针对不同的情况有不同的处理,但是对与ScheduledThreadPoolExecutor其实没有过多的了解,很简单的认为就是一个定时重复执行任务的线程。

一般我们会通过Executors来帮我们创建一些已经实现好的线程池,ScheduledThreadPoolExecutor一般就是通过下面两个方法创建出来的(当然还有其他方法)。

newSingleThreadScheduledExecutor()这个方法,以前一直以为就是创建一个只包含一个线程的线程池,看了源码之后才知道,其实内部的构造参数是new ScheduledThreadPoolExecutor(1),其实外部还封装了一层,这个不重要。这个1的参数是什么呢?实际上是corePoolSize,这个构造参数调用了父类ThreadPoolExecutor的构造方法,如下

我们来看下前两个参数,corePoolSize是1,也就是说最小池中线程数是1个,第二个参数是池中允许的最大线程数,就是Integer.MAX_VALUE这么多个。所以看到这,我才知道,其实这样获得的ScheduledThreadPoolExecutor是可以有多个延时任务一起执行的。

接下来就是重点了,我们发现ScheduledThreadPoolExecutor的Queue居然是个DelayedWorkQueue,这个Queue有什么特别的呢?首先字面上我们可以看到他是个DelayQueue,就是个延迟队列,点击去看下源码,发现他实际上是继承了AbstractQueue实现了BlockingQueue,说明他也是一个阻塞队列。他有什么特别的吗,通过几个基本操作的代码发现,其中的offer操作被重新了,也就是添加元素的代码被重写了,下面来看下具体代码。

如果有兴趣可以详细看下任务被加入到队列之后的移动操作,其实这里offer一个任务,就是把指定的任务通过compareTo,移动到了Queue中指定的位置了而已。后面的add、put其实也都是简单直接的调用了offer方法。

接下来看下task方法。其中是通过available信号量来确定什么时候可以从队列里拿到任务去执行,这里包括第一次执行任务的延迟时间,以及每次延时时间的控制,在可以获取任务的时候才拿到任务,所以task还是一个阻塞获取任务的方法,通过之前设置好的时间来控制多久才能拿到任务。所以我们大概可以猜到,这个DelayQueue的offer会进行任务排序,task通过延迟指定的时间来拿到任务。

里面的东西我们大概分析了一下,接下来看下ScheduledThreadPoolExecutor的一般执行过程。在获取到ScheduledThreadPoolExecutor之后,我们一般会执行scheduler.scheduleAtFixedRate(thread, 1, TimeUnit.SECONDS),还有一个方法是scheduleWithFixedDelay,参数列表都一样的,唯一的区别就是,scheduleAtFixedRate的每次执行时间间隔包含程序执行时间,而scheduleWithFixedDelay的每次执行时间间隔不包含程序执行时间。也就是说同样都是5秒执行一次,如果程序执行需要一秒的话,scheduleAtFixedRate的执行时间会是0、5、10、15,而scheduleWithFixedDelay的执行时间会是0、6、12、18。

除了上面说的两点以外,就没有其他的区别了。接下来我们来看下在执行了scheduleAtFixedRate之后会发生什么。先来看下源码。

其中的new ScheduledFutureTask只是包了一层,这种用法和获取ScheduledThreadPoolExecutor的时候一样,就是适配了一下而已。接下来的重头戏就是delayedExecute,我们跟进去看到如下代码。

正常情况这个东西肯定没有shutdown,所以会忘queue里放入一个任务,这里调用的其实就是我们上面所说的DelayQueue的offer了。接下来会调用ensurePrestart方法,这个方法的作用就是确保就算是corePoolSize为0,也能跑起来一个线程。接下来不管怎么样都会调用addWorker方法,这个方法是父类ThreadPoolExecutor的私有方法,我们跟进去看下,忽略上面的检查Queue为空的情况,下面其实就会看到,如果worker添加成功的话,就会调用t.start();,也就是启动任务。这时候第一次任务调度完成了。

接下来会发生什么呢?我们直接去看这个线程ScheduledFutureTask的run方法,实际上会调用到最后一个elseif的ScheduledFutureTask.super.runAndReset()方法,搞定之后分别调用setNextRunTime();来设置这个任务的下次执行时间,调用reExecutePeriodic方法来走入最初的那套循环,我们跟进去看下其代码。

首先它又回去往Queue里放任务了,还是那个DelayQueue,和上面说的是一样的,接下来会执行ensurePrestart,其实这里就又回到了原来的流程了,接着addWorker等等等。

现在思路就清晰了,我用一张图来描述一下这个ScheduledThreadPoolExecutor的执行流程。

scheduleExecutor实现原理
scheduleExecutor实现原理

 

©原创文章,转载请注明来源: 赵伊凡's Blog
©本文链接地址: ScheduledThreadPoolExecutor源码分析

发表评论

电子邮件地址不会被公开。 必填项已用*标注