什么是线程池?
线程池是一种利用池化技术思想来实现的线程管理技术,主要是为了复用线程、便利地管理线程和任务、并将线程的创建和任务的执行解耦开来。我们可以创建线程池来复用已经创建的线程来降低频繁创建和销毁线程所带来的资源消耗。在JAVA中主要是使用ThreadPoolExecutor类来创建线程池。
线程池的优点
降低资源消耗,复用已创建的线程来降低创建和销毁线程的消耗。
提高响应速度,任务到达时,可以不需要等待线程的创建立即执行。
提高线程的可管理性,使用线程池能够统一的分配、调优和监控。
线程池的状态及生命周期
ThreadPoolExecutor的构造方法参数
corePoolSize,核心线程数量
maximumPoolSize,最大线程数量
keepAliveTime,线程空闲时存活的时间
unit,空闲存活时间单位
workQueue,任务队列,用于存放已提交的任务(ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue)
threadFactory,线程工厂,用于创建线程执行任务
handler,拒绝策略,当线程池处于饱和时,使用某种策略来拒绝任务提交(AbortPolicy:抛异常;DiscardPolicy:直接丢弃;DiscardOldestPolicy:丢第一个;CallerRunsPolicy:谁提交任务,谁执行)
从execute源码分析线程池执行任务的流程
线程池核心属性
// AtomicInteger,就是一个int,写操作用CAS实现,保证了原子性
// ctl维护这线程池的2个核心内容:
// 1:线程池状态(高3位,维护着线程池状态)
// 2:工作线程数量(核心线程+非核心线程,低29位,维护着工作线程个数)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// COUNT_BITS=29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 工作线程的最大个数
// 00100000 00000000 00000000 00000000 - 1
// 000111111111111111111111111111111
private static final int CAPACITY = (1
execute()
public void execute(Runnable command) {
// 非空!!
if (command == null)
throw new NullPointerException();
// 拿到ctl
int c = ctl.get();
// 通过ctl获取当前工作线程个数
if (workerCountOf(c)
从execute()方法中,可以看到,当有一个任务进来后
1.首先会判断当前线程池里工作线程的个数是否小于设置的核心线程个数,如果核心线程个数未满,会尝试调用addWorker()方法,新增一个核心线程,如果新增失败,则重新获取一次线程池的状态,走下面逻辑。同时如果核心线程个数已满。也走下面的逻辑。
2.判断线程池状态是否是运行状态,如果是,调用阻塞队列的offer()方法将此任务添加到阻塞队列中,如果不是或者阻塞队列已满不能添加进去,就执行拒绝策略
3.如果在添加到阻塞队列后,DCL检查线程池的状态不是运行状态,就会把此任务从阻塞队列中移除,执行拒绝策略。如果是运行状态,就再检查一下线程池的线程数量,如果是0,就添加一个空任务的非核心线程去处理工作队列中的任务。
addWorker()
private boolean addWorker(Runnable firstTask, boolean core) {
xxx:
for (;;) {
// 阿巴阿巴…………
int c = ctl.get();
int rs = runStateOf(c);
// 判断线程池状态
if (rs >= SHUTDOWN &&
// 判断如果线程池的状态为SHUTDOWN,还要处理工作队列中的任务
// 如果你添加工作线程的方式,是任务的非核心线程,并且工作队列还有任务
! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;
// 判断工作线程个数
for (;;) {
// 阿巴阿巴……
int wc = workerCountOf(c);
// 判断1:工作线程是否已经 == 工作线程最大个数
// 判断2-true判断:判断是核心线程么?如果是判断是否超过核心线程个数
// 判断2-false判断:如果是非核心线程,查看是否超过设置的最大线程数
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 对工作线程进行 + 1操作
if (compareAndIncrementWorkerCount(c))
// +1成功,跳出外层循环,执行添加工作线程的业务
// 以CAS方式,对ctl+1,多线程并发操作,只有会有一个成功
break xxx;
// 重新拿ctl,
c = ctl.get();
// 判断线程池状态是否有变化
if (runStateOf(c) != rs)
continue xxx;
}
}
// 添加工作线程的业务
// 工作线程启动了吗?
boolean workerStarted = false;
// 工作线程添加了吗?
boolean workerAdded = false;
// Worker就是工作线程
Worker w = null;
try {
// 创建工作线程,将任务传到Worker中
w = new Worker(firstTask);
final Thread t = w.thread;
// 只有你写的线程工厂返回的是null,这里才会为null
if (t != null) {
// 获取锁资源
final ReentrantLock mainLock = this.mainLock;
// 加锁。 因为我要在启动这个工作线程时,避免线程池状态发生变化,加锁。
mainLock.lock();
try {
// 重新获取ctl,拿到线程池状态
int rs = runStateOf(ctl.get());
// DCL i think you know~~~
if (rs largestPoolSize)
largestPoolSize = s;
// 工作线程添加成功 0
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果添加成功
if (workerAdded) {
// 启动工作线程
t.start();
// 设置标识为true
workerStarted = true;
}
}
} finally {
// 如果工作线程启动失败
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWorker()方法有两个参数,一个是要执行的任务,另一个是是否要添加的是核心线程的布尔类型变量。
1.第一部分是一个双重死循环,第一层循环是判断当前线程池是否还有添加线程的必要,当线程池的
状态为STOP以上的状态或者是SHUTDOWN且阻塞队列中没有任务的时候,不用添加线程,且不处理此次进来的任务,直接返回false;第二层循环是根据我们传的布尔类型的core变量判断,如果我们要添加的是核心线程,就判断核心线程数量是否已满,如果满了,直接返回false,在execute()方法中,添加失败后,会把任务扔到阻塞队列里。如果添加的是非核心线程,判断当前线程池的线程数量是否超过设置的最大值,如果超过了,直接返回false,在execute()方法中,添加失败后,会直接执行拒绝策略。当所有判断都满足后,用CAS的方式对线程池的工作线程数量+1,成功则跳出循环,失败则继续自旋,直到添加成功,或者上面条件不满足,添加失败。
2.第二部分就是开始添加工作线程了,先将此次要执行的任务封装到Worker对象中,用ReentrantLock对整个线程池加锁,避免此次在添加工作线程的时候,线程池被其他线程干掉。然后将此工作线程添加到线程池的工作线程集合中,然后解锁。如果添加成功,直接启动线程,由于Worker类继承了Runnable接口并重写了run(),所以这里继续追踪Worker类中的runWorker()方法,如果添加失败,则执行addWorkerFailed()方法。
addWorkerFailed()
// 如果添加工作线程失败,执行
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 说明worker可能存放到了workers的hashSet中。
if (w != null)
// 移除!
workers.remove(w);
// 减掉workerCount的数值 -1
decrementWorkerCount();
// 尝试干掉自己
tryTerminate();
} finally {
mainLock.unlock();
}
}
runWorker()
final void runWorker(Worker w) {
// 拿到当前线程对象
Thread wt = Thread.currentThread();
// 拿到worker中存放的Runnable
Runnable task = w.firstTask;
// 将worker中的任务清空
w.firstTask = null;
// 揍是一个标识
boolean completedAbruptly = true;
try {
// 如果Worker自身携带任务,直接执行
// 如果Worker携带的是null,通过getTask去工作队列获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// 判断线程池状态是否大于等于STOP,如果是要中断当前线程
if ((runStateAtLeast(ctl.get(), STOP) ||
// 中断当前线程(DCL)
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
wt.interrupt();
try {
// 前置钩子
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 后置钩子
afterExecute(task, thrown);
}
} finally {
task = null;
// 当前工作执行完一个任务,就++
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
在这个方法中,入参是上一步传进来的worker对象,因为这个worker对象有可能是已经封装了任务的,也有可能是没有封装任务的,所以这里循环条件是先从自己这里拿,如果没有,就用getTask()去阻塞队列中拿,如果是非核心线程或者是核心线程且允许超时调用的是阻塞队列的poll()方法,如果阻塞队列里一直没有任务,线程在这等待直到超时,如果是不允许超时的核心线程调用的是阻塞队列的take()方法,如果阻塞队列里一直没有任务,就会使用await()使线程阻塞住,直到队列是有任务添加,会singal()唤醒。在真正执行任务前后有两个钩子方法,beforeExecute(),afterExecute()可以由我们自己定义内容。
getTask()
private Runnable getTask() {
// 超时-false
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 线程池状态判断
// 如果线程池状态为SHUTDOWN && 工作队列为空
// 如果线程池状态为STOP
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 对工作线程个数--
decrementWorkerCount();
return null;
}
// 对数量的判断。
int wc = workerCountOf(c);
// 判断核心线程是否允许超时?
// 工作线程个数是否大于核心线程数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 判断工作线程是否超过了最大线程数 && 工作队列为null
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
// 工作线程数有问题,必须-1,干掉当前工作线程
// 工作线程是否超过了核心线程,如果超时,就干掉当前线程
// 对工作线程个数--
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果是非核心,走poll,拉取工作队列任务,
// 如果是核心线程,走take一直阻塞,拉取工作队列任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// 当工作队列没有任务时,这时就会被Condition通过await阻塞线程
// 当有任务添加到工作线程后,这是添加完任务后,就会用过Condition.signal唤醒阻塞的线程
workQueue.take();
if (r != null)
return r;
// 执行的poll方法,并且在指定时间没拿到任务,
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
至此,线程池执行的的流程结束了,流程如下图:
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net