之所以存在线程池是基于以下两个原因:
- 线程的创建和销毁是需要有资源消耗的,多线程环境下频繁创建、销毁线程会影响系统性能
- 对于一个需要频繁创建任务、线程的应用来说,创建的任务数、线程数需要受到控制或管理
有了线程池,尤其是类似ThreadPoolExecutor这种可以通过参数调整其行为的线程池,可以近乎完美的解决上述两个问题。
线程池工作原理
简单来说线程池的工作原理就是:提前或者在执行任务的时候创建线程,执行完任务之后不销毁线程而是将线程归还到线程池中,后续有任务提交上来之后就可以不再创建线程、而是由线程池中空闲的线程执行任务。
这样一来就可以避免频繁创建和销毁线程,并且也可以控制线程池中线程的数量,同时如果提交任务的速度太快、线程池中的线程来不及执行任务的话,可以将任务放在队列中等待,等前面的任务执行完成、线程归还到线程池中之后,再从队列中获取任务继续执行。
其实以上就是ThreadPoolExecutor功能的简单描述。
当然ThreadPoolExecutor的功能要比这个描述强大的多也复杂的多。我们就从以下几个方面来详细分析一下ThreadPoolExecutor的功能和底层原理:
- 基本属性
- 任务队列
- 创建线程池
- 提交任务
- 执行任务
- 拒绝任务
- 钩子函数
基本属性
corePoolSize&maximumPoolSize:ThreadPoolExecutor有核心线程数(corePoolSize)和最大线程数(maximumPoolSize)的概念,新任务提交后,如果当前线程数小于corePoolSize,即使线程池中有空闲线程,ThreadPoolExecutor也会立即创建一个线程去执行任务。如果当前线程数大于corePoolSize且小于maximumPoolSize,则只有队列满的情况下才会创建线程、否则任务入队列排队。
ctl:绑定了状态runState和线程数workerCount两个属性的AtomicInteger变量:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
理解ctl的工作原理是读懂ThreadPoolExecutor源码的必要前提。
先看两个辅助变量:
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1
COUNT_BITS是Integer.size-3=31-3=29,CAPACITY是1向左位移29位后减1,用二进制表示就是:
0001 1111 1111 1111 1111 1111 1111 1111
~CAPACITY用二进制表示就是:
1110 0000 0000 0000 0000 0000 0000 0000
// runState is stored in the high-order bits
private static final int RUNNING = -1
runState有RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED等5个状态,压入ctl表示的时候全部要左移29位。意思是:ctl是按照2进制位来表达含义的,高位的3位用来表示状态runstate,低位的29位用来表示线程数workerCount。
ctl通过ctlOf函数(runState的实际状态值左移29位,移动到高3位后,和workerCount按位或操作)得到。
runStateOf函数:ctl和~CAPACITY进行按位与,~CAPACITY的二级制表示为:
1110 0000 0000 0000 0000 0000 0000 0000
按位与操作得到的就是ctl的高3位,对应的就是runState。
workerCountOf函数:ctl和CAPACITY进行按位与,CAPACITY的二进制表示为:
0001 1111 1111 1111 1111 1111 1111 1111
按位与得到的就是低29位,对应的就是workerCount。
Keep-alive:如果当前线程数超过了corePoolSize,那么超出的线程如果空闲时间超过了keep-alive会被回收(terminate)。核心线程是没有超时概念也不会被回收的,但是可以通过设置allowCoreThreadTimeOut为true,使得核心线程也受到参数Keep-alive控制从而被回收。
任务队列
创建线程池的时候,通过ThreadPoolExutor构造方法指定任务队列,可以支持任何BlockingQueue。通过Executors工具创建ThreadPoolExutor的话,支持SynchronousQueue、LinkedBlockingQueue和 ArrayBlockingQueue三种阻塞队列。
任务队列是ThreadPoolExutor的重要参数,与corePoolSize和MaxPoolSize配合使用会创建出表现完全不同的线程池:有界还是无界队列会影响到线程池接收任务的能力或表现,FIFO还是LIFO会影响到任务执行顺序,等等。
创建线程池
ThreadPoolExecutor提供了4个构造方法,但是如果你不打算替换默认的ThreadFactory和RejectedExecutionHandler的话,最常用的构造方法其实只有一个:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
调用到另外一个构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize
从构造方法中我们可以看到,ThreadPoolExecutor创建后除了设置重要属性之外啥也没干,核心线程也并没有启动!
提交任务
默认情况下,ThreadPoolExecutor提交任务的过程也同时是创建线程的过程,因为缺省情况下ThreadPoolExecutor创建的时候并不创建线程。
ThreadPoolExecutor实现了Executor接口,Executor通过其唯一方法execute来提交任务。
ThreadPoolExecutor的execute方法接收一个Runable参数作为任务,按照如下逻辑完成任务的提交:
- 如果线程池的线程数量小于corePoolSize,则通过addWorker(command,true)创建并启动一个新线程来执行任务
- 否则,尝试将任务加入队列,如果成功,再次检查线程池状态,如果线程池已经停止运行则任务出队,拒绝任务。再次检查如果当前线程数为0则调用addWorker(null,false)创建新线程
- 否则,超出核心线程数且队列满,如果尚未超出最大线程数则通过addWorker(command,false)创建新线程,否则超出最大线程数,拒绝任务
源码比较简单,需要注意的是加入队列部分:
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
调用阻塞队列的非阻塞方法offer入队,想想为啥不用阻塞方法?
线程启动时机
默认情况下ThreadPoolExecutor创建之后不会启动任何线程,包括核心线程。所有线程都是在任务提交后启动。
可以通过调用prestartCoreThread()或prestartAllCoreThreads()随时启动一个或所有核心线程。
启动线程#addWorker
线程通过addWorker(Runnable firstTask, boolean core)方法启动。firstTask是该线程的第一个任务,firstTask=null表示只启动线程、无任务。core表示要启动的是核心线程、还是普通线程,用来判断线程数是否已达上限。
addWorker方法首先做必要的合法性判断:当前线程池状态,线程数是否已达上限等,满足启动条件则更新当前线程数WorkerCount。
然后创建线程对象Worker,获得锁,加锁操作:Worker加入workers缓存(worker存储当前线程池的所有尚未执行任务的线程)……操作完成之后,解锁,并启动线程。
Worker对象
Worker是实现了Runnable接口的内部类,主要属性:
- thread:线程池的线程本尊
- firstTask: 线程创建时绑定的任务,该线程如果是任务提交的时候创建的,firstTask就是被提交的任务,如果线程创建成功,则firstTask具有优先执行权
- completedTasks:当前线程完成执行的任务数
初始化方法通过线程工厂创建一个线程:
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
this作为Runnable参数传递给Thread的构造方法,线程启动的时候就回调this的run方法,所以Worker的run方法就是线程池中的线程执行任务的入口方法。
执行任务Worker.run
run方法调用了runWorker方法:
public void run() {
runWorker(this);
}
继续跟踪runWorker方法:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//有限执行firstTask,firstTask为空的话通过getTask()从队列中获取task
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//调用任务执行前钩子函数
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//调用任务执行后钩子函数
afterExecute(task, thrown);
}
} finally {
//释放任务
task = null;
//当前线程执行任务数更新 +1
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//线程池相关数据更新
processWorkerExit(w, completedAbruptly);
}
}
从源码可以看到,Worker的firstTask会得到当前线程的优先执行,因为代码中获取并执行任务的循环条件中的task的初始值就是Worker的firstTask:
while (task != null || (task = getTask()) != null) {
firstTask执行完成之后,释放任务(task=null)。线程继续运行,下次循环时会通过getTask方法从队列获取任务。这个动作相当于:线程执行完一个任务之后并没有结束或销毁,而是交还给线程池,通过getTask继续从队列领任务,领到任务后继续执行。
getTask方法
getTask方法从队列获取排队等待执行的任务。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//如果当前线程池已停止,或者处于SHUTDOWN状态且队列为空则返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//线程数
int wc = workerCountOf(c);
// Are workers subject to culling?
//是否需要限定时间
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//如果超过最大线程数且队列空,或等待超时且(线程数>1或者队列空),返回null
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//如果需要限时,则通过限时参数从阻塞队列获取任务,否则不需要限时的话,阻塞直到获取到任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//从队列拿到任务就返回该任务
if (r != null)
return r;
//否则没有拿到任务,设置超时
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
我们发现线程池的keepAliveTime参数就是在这个getTask方法中生效的。
如果线程数小于核心线程数,并且allowCoreThreadTimeOut设置为false的话,线程不受等待任务时长的限制,则采用阻塞队列的take方法、无限期等待直到可以从队列中获取任务。
如果线程数大于核心线程数,或者参数设置核心线程也需要受到超时控制,就会设置获取任务的限时时长为keepAliveTime,如果在keepAliveTime时间范围内仍然没有从阻塞队列中拿到任务,则返回null。
超过keepAliveTime时长没拿到任务将导致在runWorker方法的while循环满足结束条件而退出循环:
while (task != null || (task = getTask()) != null) {
//获取任务...
} finally {
processWorkerExit(w, completedAbruptly);
}
退出循环后,调用processWorkerExit方法结束线程、退出线程池。
keepAliveTime=0L的情况
如果设置keepAliveTime=0L,并且线程数超出核心线程数,会是什么情况?
下面这一行代码交代的清清楚楚了:
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
调用poll方法、等待时间为0去阻塞队列获取任务,可以去看一下阻塞队列的限定等待时长的poll方法的源码,等待时长为0则效果等同于非阻塞方法:获取不到数据立刻返回null。
所以keepAliveTime=0L表示:超出核心线程数后,在执行完任务之后允许空闲时间为0!即:如果没有新的任务提交上来的话,只保留corePoolSize个线程继续留在线程池等待任务,其他线程立即销毁、退出线程池。
拒绝任务
采用有界阻塞队列的线程池,在队列已满、且超出最大线程数后提交上来的任务会被拒绝,拒绝后的处理方式由ThreadPoolExecutor的拒绝策略RejectedExecutionHandler决定。拒绝策略在ThreadPoolExecutor创建时指定。主要包括:
- AbortPolicy:直接抛异常RejectedExecutionException
- CallerRunsPolicy:调用方处理,即交给提交execute方法的线程自己执行任务
- DiscardPolicy:直接扔掉该任务
- DiscardOldestPolicy:扔掉队列中等待时间最久的任务,执行当前任务
- 自定义:实现RejectedExecutionHandler接口,自定义拒绝策略
钩子函数
任务执行前和任务执行后分别调用beforeExecute/afterExecute方法,这两个方法在ThreadPoolExecutor中默认都是哑实现,什么都没做。如果你既想要采用ThreadPoolExecutor作为线程池、又想在任务执行前后做额外的动作,可以继承ThreadPoolExecutor并覆盖他的beforeExecute/afterExecute方法。
小结
线程池ThreadPoolExecutor源码分析完成,其实从应用层面来讲,绝大部分线程池需求都可以通过ThreadPoolExecutor得到满足,而且可以利用Executors工具类创建满足各种不同场景的线程池。
Thanks a lot!
上一篇 BlockingQueue – ArrayBlockingQueue
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net