ThreadPoolExecutor为一些Executor 提供了基本的实现,这些Executor 是由Executors中的newCachedThreadPool、newFixedThreadPool和newScheduledThreadExecutor 等工厂方法返回的。ThreadPoolExecutor是一个灵活的、稳定的线程池,允许进行各种定制。
如果默认的执行策略不能满足需求,那么可以通过ThreadPoolExecutor的构造函数来实例化一个对象,并根据自己的需求来定制,并且可以参考Executors的源代码来了解默认配置下的执行策略,然后再以这些执行策略为基础进行修改。ThreadPoolExecutor定义了很多构造函数,在程序清单8-2中给出了最常见的形式。
程序清单8-2 ThreadPoolExecutor的通用构造函数 .
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueueworkQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler){}
线程的创建与销毁
线程池的基本大小(Core Pool Size)、最大大小(Maximum Pool Size)以及存活时间等因素共同负责线程的创建与销毁。基本大小也就是线程池的目标大小,即在没有任务执行时线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程。线程池的最大大小表示可同时活动的线程数量的上限。如果某个线程的空闲时间超过了存活时间,那么将被标记为可回收的,并且当线程池的当前大小超过了基本大小时,这个线程将被终止。
通过调节线程池的基本大小和存活时间,可以帮助线程池回收空闲线程占有的资源,从而使得这些资源可以用于执行其他工作。(显然,这是一种折衷:回收空闲线程会产生额外的延迟,因为当需求增加时,必须创建新的线程来满足需求。)
newFixedThreadPool 工厂方法将线程池的基本大小和最大大小设置为参数中指定的值,而且创建的线程池不会超时。newCachedThreadPool 工厂方法将线程池的最大大小设置为Integer.MAX VALUE,而将基本大小设置为零,并将超时设置为1分钟,这种方法创建出来的线程池可以被无限扩展,并且当需求降低时会自动收缩。其他形式的线程池可以通过显式的ThreadPoolExecutor构造函数来构造。
管理队列任务
在有限的线程池中会限制可并发执行的任务数量。(单线程的Executor是一种值得注意的
⊖在创建ThreadPoolExecutor初期,线程并不会立即启动,而是等到有任务提交时才会启动,除非调用prestartAllCoreThreads。
开发人员以免有时会将线程池的基本大小设置为零,从而最终销毁工作者线程以免阻碍JVM的退出。然而,如果在线程池中没有使用SynchronousQueue 作为其工作队列(例如在newCachedThreadPool中就是如此),那么这种方式将产生一些奇怪的行为。如果线程池中的线程数量等于线程池的基本大小,那么仅当在工作队列已满的情况下ThreadPoolExecutor 才会创建新的线程。因此,如果线程池的基本大小为零并且其工作队列有一定的容量,那么当把任务提交给该线程池时,只有当线程池的工作队列被填满后,才会开始执行任务,而这种行为通常并不是我们所希望的。在Java 6中,可以通过allowCoreThreadTimeOut来使线程池中的所有线程超时。对于一个大小有限的线程池并且在该线程池中包含一个工作队列,如果希望这个线程池在没有任务的情况下能销毁所有线程,那么可以启用这个特性并将基本大小设置为零。
特例:它们能确保不会有任务并发执行,因为它们通过线程封闭来实现线程安全性。)
在6.1.2节中曾介绍,如果无限制地创建线程,那么将导致不稳定性,并通过采用固定大小的线程池(而不是每收到一个请求就创建一个新线程)来解决这个问题。然而,这个方案并不完整。在高负载情况下,应用程序仍可能耗尽资源,只是出现问题的概率较小。如果新请求的到达速率超过了线程池的处理速率,那么新到来的请求将累积起来。在线程池中,这些请求会在一个由Executor 管理的Runnable 队列中等待,而不会像线程那样去竞争CPU资源。通过一个Runnable 和一个链表节点来表现一个等待中的任务,当然比使用线程来表示的开销低很多,但如果客户提交给服务器请求的速率超过了服务器的处理速率,那么仍可能会耗尽资源。
即使请求的平均到达速率很稳定,也仍然会出现请求突增的情况。尽管队列有助于缓解任务的突增问题,但如果任务持续高速地到来,那么最终还是会抑制请求的到达率以避免耗尽内存。甚至在耗尽内存之前,响应性能也将随着任务队列的增长而变得越来越糟。
ThreadPoolExecutor允许提供一个BlockingQueue 来保存等待执行的任务。基本的任务排队方法有3种:无界队列、有界队列和同步移交(Synchronous Handoff)。队列的选择与其他的配置参数有关,例如线程池的大小等。
newFixedThreadPool和newSingleThreadExecutor 在默认情况下将使用一个无界的LinkedBlockingQueue。如果所有工作者线程都处于忙碌状态,那么任务将在队列中等候。如果任务持续快速地到达,并且超过了线程池处理它们的速度,那么队列将无限制地增加。
一种更稳妥的资源管理策略是使用有界队列,例如ArrayBlockingQueue、有界的LinkedBlockingQueue、PriorityBlockingQueue。有界队列有助于避免资源耗尽的情况发生,但它又带来了新的问题:当队列填满后,新的任务该怎么办?(有许多饱和策略[Saturation Policy]可以解决这个问题。请参见8.3.3节。)在使用有界的工作队列时,队列的大小与线程池的大小必须一起调节。如果线程池较小而队列较大,那么有助于减少内存使用量,降低CPU的使用率,同时还可以减少上下文切换,但付出的代价是可能会限制吞吐量。
对于非常大的或者无界的线程池,可以通过使用SynchronousQueue 来避免任务排队,以及直接将任务从生产者移交给工作者线程。SynchronousQueue不是一个真正的队列,而是一种在线程之间进行移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接受这个元素。如果没有线程正在等待,并且线程池的当前大小小于最大值,那么ThreadPoolExecutor 将创建一个新的线程,否则根据饱和策略,这个任务将被拒绝。使用直接移交将更高效,因为任务会直接移交给执行它的线程,而不是被首先放在队列中,然后由工作者线程从队列中提取该任务。只有当线程池是无界的或者可以拒绝任务时,SynchronousQueue 才有实际价值。在newCachedThreadPool工厂方法中就使用了SynchronousQueue。
当使用像LinkedBlockingQueue 或ArrayBlockingQueue 这样的FIFO(先进先出)队列时,任务的执行顺序与它们的到达顺序相同。如果想进一步控制任务执行顺序,还可以使用PriorityBlockingQueue,这个队列将根据优先级来安排任务。任务的优先级是通过自然顺序或
这类似于通信网络中的流量控制:可以缓存一定数量的数据,但最终需要通过某种方式来告诉发送端停止发送数据,或者丢弃过多的数据并希望发送端在空闲时重传被丢弃的数据。
Comparator (如果任务实现了Comparable)来定义的。
对于Executor,newCachedThreadPool工厂方法是一种很好的默认选择,它能提供比固定大小的线程池更好的排队性能。当需要限制当前任务的数量以满足资源管理需求时,那么可以选择固定大小的线程池,就像在接受网络客户请求的服务器应用程序中,如果不进行限制,那么很容易发生过载问题。
只有当任务相互独立时,为线程池或工作队列设置界限才是合理的。如果任务之间存在依赖性,那么有界的线程池或队列就可能导致线程“饥饿”死锁问题。此时应该使用无界的线程池,例如1 newCachedThreadPool。
饱和策略
当有界队列被填满后,饱和策略开始发挥作用。ThreadPoolExecutor 的饱和策略可以通过调用setRejectedExecutionHandler来修改。(如果某个任务被提交到一个已被关闭的Executor 时,也会用到饱和策略。)JDK提供了几种不同的RejectedExecutionHandler 实现,每种实现都包含有不同的饱和策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy 和DiscardOldestPolicy。
“中止(Abort)”策略是默认的饱和策略,该策略将抛出未检查的RejectedExecution-Exception。调用者可以捕获这个异常,然后根据需求编写自己的处理代码。当新提交的任务无法保存到队列中等待执行时,“抛弃(Discard)”策略会悄悄抛弃该任务。“抛弃最旧的(Discard-Oldest)”策略则会抛弃下一个将被执行的任务,然后尝试重新提交新的任务。(如果工作队列是一个优先队列,那么“抛弃最旧的”策略将导致抛弃优先级最高的任务,因此最好不要将“抛弃最旧的”饱和策略和优先级队列放在一起使用。)
“调用者运行(Caller-Runs)”策略实现了一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。它不会在线程池的某个线程中执行新提交的任务,而是在一个调用了execute 的线程中执行该任务。我们可以将WebServer示例修改为使用有界队列和“调用者运行”饱和策略,当线程池中的所有线程都被占用,并且工作队列被填满后,下一个任务会在调用execute 时在主线程中执行。由于执行任务需要一定的时间,因此主线程至少在一段时间内不能提交任何任务,从而使得工作者线程有时间来处理完正在执行的任务。在这期间,主线程不会调用accept,因此到达的请求将被保存在TCP层的队列中而不是在应用程序的队列中。如果持续过载,那么TCP层将最终发现它的请求队列被填满,因此同样会开始抛弃请求。当服务器过载时,这种过载情况会逐渐向外蔓延开来——从线程池到工作队列到应用程序再到TCP层,最终达到客户端,导致服务器在高负载下实现一种平缓的性能降低。
这种性能差异是由于使用了SynchronousQueue而不是LinkedBlockingQueue。在Java 6 中提供了一个新的非阻塞算法来替代SynchronousQueue,与Java 5.0 中的SynchronousQueue 相比,该算法把Executor 基准的吞吐量提高了3 倍(Scherer et al.,2006)。
②对于提交其他任务并等待其结果的任务来说,还有另一种配置方法,就是使用有界的线程池,并使用SynchronousQueue 作为工作队列,以及“调用者运行(Caller-Runs)”饱和策略。
当创建Executor时,可以选择饱和策略或者对执行策略进行修改。程序清单8-3 给出了如何创建一个固定大小的线程池,同时使用“调用者运行”饱和策略。
程序清单8-3 创建一个固定大小的线程池,并采用有界队列以及“调用者运行”饱和策略
ThreadPoolExecutor executor
= new ThreadPoolExecutor(N_THREADS,N_THREADS,
0L,TimeUnit. MILLISECONDS,
new LinkedBlockingQueue(CAPACITY));
executor. setRejectedExecutionHandler(
new ThreadPoolExecutor. CallerRunsPolicy());
当工作队列被填满后,没有预定义的饱和策略来阻塞execute。然而,通过使用Semaphore (信号量)来限制任务的到达率,就可以实现这个功能。在程序清单8-4的BoundedExecutor中给出了这种方法。该方法使用了一个无界队列(因为不能限制队列的大小和任务的到达率),并设置信号量的上界设置为线程池的大小加上可排队任务的数量,这是因为信号量需要控制正在执行的和等待执行的任务数量。
程序清单8-4 使用 Semaphore 来控制任务的提交速率
@ThreadSafe
public class BoundedExecutor {
private final Executor exec;
private final Semaphore semaphore;
public BoundedExecutor(Executor exec, int bound){
this. exec =exec;
this. semaphore =new Semaphore(bound);
}
public void submitTask(final ‘Runnable command)
throws InterruptedException {
semaphore. acquire();
try {
exec. execute(new Runnable(){
public void run(){
try {
command. run();
}finally {
semaphore. release();
}
}catch (RejectedExecutionException e){
semaphore. release();
}
}
}
线程工厂
每当线程池需要创建一个线程时,都是通过线程工厂方法(请参见程序清单8-5)来完成的。默认的线程工厂方法将创建一个新的、非守护的线程,并且不包含特殊的配置信息。通过指定一个线程工厂方法,可以定制线程池的配置信息。在ThreadFactory 中只定义了一个方法newThread,每当线程池需要创建一个新线程时都会调用这个方法。
然而,在许多情况下都需要使用定制的线程工厂方法。例如,你希望为线程池中的线程指定一个UncaughtExceptionHandler,或者实例化一个定制的Thread 类用于执行调试信息的记录。你还可能希望修改线程的优先级(这通常并不是一个好主意。请参见10.3.1节)或者守护状态(同样,这也不是一个好主意。请参见7.4.2节)。或许你只是希望给线程取一个更有意义的名称,用来解释线程的转储信息和错误日志。
程序清单8-5 ThreadFactory接口
public interface ThreadFactory {
Thread newThread(Runnable r);
}
在程序清单8-6的MyThreadFactory 中给出了一个自定义的线程工厂。它创建了一个新的MyAppThread 实例,并将一个特定于线程池的名字传递给MyAppThread 的构造函数,从而可以在线程转储和错误日志信息中区分来自不同线程池的线程。在应用程序的其他地方也可以使用MyAppThread,以便所有线程都能使用它的调试功能。
程序清单8-6 自定义的线程工厂
public class MyThreadFactory implements ThreadFactory {
private final String poolName;
public MyThreadFactory(String poolName){
this. poolName =poolName;
}
public Thread newThread(Runnable runnable){
return new MyAppThread(runnable,poolName);
}
}
在MyAppThread 中还可以定制其他行为,如程序清单8-7 所示,包括:为线程指定名字,设置自定义UncaughtExceptionHandler向Logger 中写入信息,维护一些统计信息(包括有多少个线程被创建和销毁),以及在线程被创建或者终止时把调试消息写入日志。
程序清单8-7定制 Thread 基类
public class MyAppThread extends Thread {
public static final String DEFAULT_NAME=”MyAppThread”;
private static volatile boolean debugLifecycle =false;
private static final AtomicInteger created =new AtomicInteger();
private static final AtomicInteger alive =new AtomicInteger();
private static final Logger log =Logger. getAnonymousLogger();
public MyAppThread( Runnable r){ this(r,DEFAULT_NAME);}
public MyAppThread(Runnable runnable, String name){
super(runnable, name+”-“+created. incrementAndGet());
setUncaughtExceptionHandler(
new Thread. UncaughtExceptionHandler(){
public void uncaughtException(Thread t,
Throwable e){
log. log(Level. SEVERE,
“UNCAUGHT in thread”+t. getName(),e);
〕 |
} ) ;
}
public void run(){
//复制debug标志以确保一致的值
boolean debug=debugLifecycle;
if (debug) log. log(Level. FINB,”Created “+getName());
try {
alive. incrementAndGet();
super. run();
}finally {
alive. decrementAndGet();
if (debug) log. log(Level. FINE,”Exiting “+getName());
}
}
public static int getThreadsCreated(){return created. get();}
public static int getThreadsAlive(){return alive. get();}
public static boolean getDebug(){return debugLifecycle;}
public static void setDebug(boolean b){debugLifecycle =b;}
}
如果在应用程序中需要利用安全策略来控制对某些特殊代码库的访问权限,那么可以通过Executor中的privilegedThreadFactory工厂来定制自己的线程工厂。通过这种方式创建出来的线程,将与创建privilegedThreadFactory的线程拥有相同的访问权限、AccessControlContext和contextClassLoader。如果不使用privilegedThreadFactory,线程池创建的线程将从在需要新线程时调用execute 或submit的客户程序中继承访问权限,从而导致令人困惑的安全性异常。
在调用构造函数后再定制ThreadPoolExecutor
在调用完ThreadPoolExecutor的构造函数后,仍然可以通过设置函数(Setter)来修改大多数传递给它的构造函数的参数(例如线程池的基本大小、最大大小、存活时间、线程工厂以及拒绝执行处理器(Rejected Execution Handler))。如果Executor 是通过Executors中的某个(newSingleThreadExecutor除外)工厂方法创建的,那么可以将结果的类型转换为ThreadPoolExecutor以访问设置器,如程序清单8-8所示。
程序清单8-8 对通过标准工厂方法创建的 Executor 进行修改
ExecutorService exec =Executors. newCachedThreadPool();
if (exec instanceof ThreadPoolExecutor)
((ThreadPoolExecutor) exec). setCorePoolSize(10);
else
throw new AssertionError(“ Oops, bad assumption”);
在Executors中包含一个unconfigurableExecutorService 工厂方法,该方法对一个现有的ExecutorService 进行包装,使其只暴露出ExecutorService的方法,因此不能对它进行配置。newSingleThreadExecutor返回按这种方式封装的ExecutorService,而不是最初的ThreadPoolExecutor。虽然单线程的Executor实际上被实现为一个只包含唯一线程的线程池,但它同样确保了不会并发地执行任务。如果在代码中增加单线程Executor 的线程池大小,那么将破坏它的执行语义。
你可以在自己的Executor 中使用这项技术以防止执行策略被修改。如果将ExecutorService 暴露给不信任的代码,又不希望对其进行修改,就可以通过unconfigurableExecutorService来包装它。.
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
相关推荐: 【可乐荐书】有趣的矩阵:看得懂又好看的线性代数
一、书籍简介
二、内容简介
三、作者简介
最后本栏目将推荐一些经典的、有趣的、有启发性的书籍,这些书籍涵盖了各个领域,包括文学、历史、哲学、科学、技术等等。相信这些书籍不仅可以让你获得知识,还可以让你感受到阅读的乐趣和魅力。 今天给大家推荐的书籍是:《有趣的矩阵:看得懂又好看的线性代数》 一、书籍简介 线…