ScheduledThreadPoolExecutor是ThreadPoolExecutor的扩展类,用来实现延迟执行的任务、或者周期性执行的任务。
一般来讲,周期性任务或者定时任务包含两大组件:一个是执行任务的线程池,一个是存储任务的存储器。还记得Quartz吗?企业级定时任务框架,最重要的内容其实也是这两部分:SimpleThreadPool和JobStore。
ScheduledThreadPoolExecutor也不例外,由线程池和任务队列组成。线程池继承自ThreadPoolExecutor,任务队列DelayedWorkQueue,了解了ThreadPoolExecutor和DelayedWorkQueue,也就基本了解了ScheduledThreadPoolExecutor。
此外,ScheduledThreadPoolExecutor的特殊之处还在于他所执行的任务必须是ScheduledFutureTask,ScheduledFutureTask是“未来要执行的任务”,“未来”由delay指定。即使是通过ScheduledThreadPoolExecutor提交“立即”而不是“未来”要执行的任务,也要通过指定delay时长为0的ScheduledFutureTask来提交。ScheduledFutureTask任务提交之后加入阻塞队列DelayedWorkQueue等待调度。
ScheduledThreadPoolExecutor的创建
提供了四个构造方法,都是通过调用父类ThreadPoolExecutor的构造方法完成ScheduledThreadPoolExecutor对象的创建:
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
corePoolSize通过构造方法的参数指定,maximumPoolSize在构造方法中都固定设置为Integer.MAX_VALUE,也就是不受限制。
keepAliveTime设置为0,后面我们会知道其实ScheduledThreadPoolExecutor的线程数不会超过corePoolSize,而且如果allowCoreThreadTimeOut保持默认的话(false),那其实这个keepAliveTime是没有意义的。
四个构造方法均设置阻塞队列为new DelayedWorkQueue(),即仅支持DelayedWorkQueue。
ScheduledThreadPoolExecutor的线程池管理
ScheduledThreadPoolExecutor是ThreadPoolExecutor的扩展,线程池管理部分没有做扩展,保留了ThreadPoolExecutor的原有功能。
每次任务加入队列后会调用ensurePrestart(ThreadPoolExecutor实现)方法创建并启动一个线程,ThreadPoolExecutor的ensurePrestart方法:
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc
如果线程数小于corePoolSize则调用addWorker(null, true)创建核心线程线程,否则如果线程数为0(设置corePoolSize=0则可能走到这个分支)创建非核心线程。
corePoolSize大于0的情况下,ScheduledThreadPoolExecutor启动的线程数不会大于核心线程数。而且每一个线程创建的时候都不会有firstTask,线程总是从阻塞队列里获取任务执行。
提交任务
ScheduledThreadPoolExecutor提供execute(Executor接口的任务提交方法)、submit、schedule、scheduleAtFixedRate、scheduleWithFixedDelay等方法提交任务。
虽然说ScheduledThreadPoolExecutor只接受ScheduledFutureTask,但这并不是说应用层只能提交给他ScheduledFutureTask的任务,应用通过以上各方法提交任务的时候的Task是非常灵活的:可以是Callable,也可以是Runnable,ScheduledThreadPoolExecutor内部再把它们包装为ScheduledFutureTask — 对应用层来说是透明的。
提供了这么多提交任务的方法,无非是为了支持应用层以更加灵活的方式提交任务,其实底层执行逻辑大同小异。
我们就以scheduleAtFixedRate为例来分析任务提交过程:
public ScheduledFuture> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period sft =
new ScheduledFutureTask(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
scheduleAtFixedRate方法的目的是提交一个在提交后延时(参数initialDelay指定)启动、以固定时间周期(参数period指定)重复执行的任务。
主要执行了以下动作:
- 首先创建了ScheduledFutureTask任务
- 之后将它包装成RunnableScheduledFuture,其实decorateTask方法直接返回了创建好的ScheduledFutureTask,没什么好分析的
- 然后调用delayedExecute启动线程执行任务
ScheduledFutureTask任务
这个也是ScheduledThreadPoolExecutor的重头戏!
ScheduledFutureTask继承自FutureTask,并实现了RunnableScheduledFuture接口,所以他是一个复合体:
- 可以异步执行带有返回的任务(Future接口)
- 可以执行周期性任务(RunnableScheduledFuture接口)
先认识几个重要属性:
time: 任务应该被执行的时间(纳秒)
period: 周期执行任务的间隔时间(纳秒),正数表示fix-rate,负数表述fix-delay(fixed-rate和fixed-delay前面jdk timer的文章讲过,含义一样)
outerTask:指向自己的RunnableScheduledFuture对象
compareTo方法:进入DelayedWorkQueue队列时需要调用CompareTo方法比较大小,以便把最近执行的任务放在堆头。compareTo方法最终比较的其实就是time属性。
run方法:
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
如果是一次性任务则直接调用FutureTask的run方法执行任务,否则,如果是周期性任务,首先调用FutureTask的runAndReset方法,调用成功的话,设置下次执行时间,然后通过调用reExecutePeriodic将当前任务再次加入队列。
runAndReset方法首先执行当前任务,执行完成后重新设置当前任务状态为NEW,准备下次执行。
通过分析runAndReset方法可以知道,周期性任务执行后不再能够获取到返回(回忆一下FutureTask的代码逻辑,状态设置为NEW之后,就不再可能获取到返回了)。
delayedExecute启动线程
方法代码很简单:
private void delayedExecute(RunnableScheduledFuture> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
任务加入队列,之后调用ensurePrestart方法启动线程!
任务直接进入队列,之后启动线程,把任务的执行完全交给ThreadPoolExecutor的任务执行线程Worker去调度了:Worker线程调用getTask方法从队列获取并执行任务!
我们现在可以大胆猜测一下了:周期性任务是有严格的执行时间要求的,没到执行时间的任务是不能执行的,由于ThreadPoolExecutor的任务执行线程的逻辑中并没有执行时间的判断,那么,这个逻辑应该是在getTask方法向队列获取任务的时候、由队列的出队方法实现的。
现在轮到阻塞队列DelayedWorkQueue出场了,我们带着这个疑问来研究一下DelayedWorkQueue队列。
阻塞队列DelayedWorkQueue
DelayedWorkQueue底层是以数组实现的堆结构。堆结构是一个完全二叉树,可以确保每一个节点都比他的叶子节点大(或者小),这样的话堆头节点(也就是数组的第一个元素)就一定是最大(或最小的)。
DelayedWorkQueue是存储ScheduledFutureTask的队列,最近执行的任务需要存放在堆头,每一个节点都应该比他的叶子节点小。
每次任务加入队列、节点出队、删除节点等操作都需要按照任务执行时间time重新调整队列。
初始化容量为16,新节点加入队列时如果队列容量不够则扩容原来容量的50%(新容量 = 1.5 * 旧容量)。
新节点入队的时候调用siftUp方法重新调整队列,以便新节点加入到堆的合适位置。
private void siftUp(int k, RunnableScheduledFuture> key) {
while (k > 0) {
int parent = (k - 1) >>> 1;
RunnableScheduledFuture> e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
siftUp方法基于完全二叉树的一个特性:完全二叉树的第k个节点的父节点在数组中的位置为:(k-1)/2取整。
节点加入队列的时候默认加入到尾部k(当前数组的size),获取到k的父节点、比较当前节点和父节点,如果当前节点大于父节点(调用了ScheduledFutureTask的compareTo方法,比较的是time),说明当前节点找到了正确的位置,否则当前节点与父节点交换位置,继续寻找父节点比较、直到找到根节点。
这样,新加入的节点通过siftUp操作之后会根据任务触发时间time进入到队列的合适位置。
节点需要从队列remove的时候也需要执行类似的操作(调用siftUp或siftDown方法)确保堆的正确顺序。
这样一来,DelayedWorkQueue队列可以始终保证堆头(也就是数组的第一个元素)就是最近需要执行的任务。任务执行线程在获取最近需要被执行任务的时候,不需要遍历整个队列、只需要获取堆头第一个节点执行即可。
堆结构非常适合周期性任务或定时任务这一应用场景:节点加入任务时的时效性要求不高(因为是需要延时执行的任务嘛,时效性要求肯定就不高了),获取数据的时效性要求高(到了任务的执行时间了,最好当然是能即时获取到、立即执行),能够非常有效的提高任务执行效率。
了解了堆结构的特性,知道了堆结构入队出队的排序逻辑,接下来还需要去验证我们的猜测:出队逻辑会判断当前是否已经到了节点的执行时间。
因为ThreadPoolExecutor的getTask()方法调用队列的take方法获取任务,所以,直接看DelayedWorkQueue的take()方法就可以了,
public RunnableScheduledFuture> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture> first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay
逻辑很清晰:
- 队列上锁
- 获取堆头的任务,如果为空(空队列,没有任务),等待(注意等待是会释放锁资源的、等待被唤醒之后重新获取锁资源,有关ReentrantLock我们后面会专门做详细分析)
- 否则,判断堆头任务已经到执行时间了,堆头任务出队列并返回堆头任务。
- 否则,堆头任务执行时间未到,采用Leader-Follower模式等待
上面第3点验证了我们的猜测!
Leader-Follower模式是指线程池中多个线程在等待执行任务的时候,线程会竞争Leader,只有一个线程会在竞争中获胜成为Leader,其他线程就都是Follower。Leader获权仅等待指定时间(当前距下次任务执行的时间差)、Follower线程则需要无限期等待(被取消或者被其他线程唤醒)。等待过程中如果有新的节点加入队列并成为堆头的话(新加入的任务变成了最近要被执行的任务),此时需要设置leader为空并唤醒等待线程重新竞争leader。Leader-Follower模式可以有效避免所有等待线程都进入无限期、被动等待其他线程唤醒的等待模式、在等待时长达到后主动唤醒执行任务。
小结
周期性任务线程池ScheduledThreadPoolExecutor扒完了,简单总结下:
- 周期性线程池可以处理立即执行的任务、延迟执行的一次性任务、延迟执行的周期性任务(FixedRate和FixedDelay两种模式)
- 创建的时候指定核心线程数量,线程池最终启动的线程数量不会超过核心线程数量,每提交一个任务的同时启动一个线程、直到线程池数量达到核心线程数
- 不管是立即执行的任务、还是延迟执行的任务,任务提交后直接加入阻塞队列,等待线程从队列中获取并执行
- 阻塞队列采用DelayedWorkQueue,堆结构,最近执行的任务始终放在堆头
- 线程池中的线程采用Leader-Follower模式竞争任务,可以认为竞争成为Leader的线程获得了堆头任务的优先执行权
Thanks a lot!
上一篇 线程池 – ThreadPoolExecutor源码分析
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net