Preface
This article looks at how to make parallelStream use a custom thread pool.
ForkJoinPool
java/util/concurrent/ForkJoinPool.java
public class ForkJoinPool extends AbstractExecutorService {
    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    }
    private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }
    private static ForkJoinPool makeCommonPool() {
        //......
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;
        return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                                "ForkJoinPool.commonPool-worker-");
    }
}
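By default, parallelStream runs on the common ForkJoinPool. Its parallelism, thread factory, and exception handler can be set through the system properties java.util.concurrent.ForkJoinPool.common.parallelism, java.util.concurrent.ForkJoinPool.common.threadFactory, and java.util.concurrent.ForkJoinPool.common.exceptionHandler.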
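As a rough illustration of the system property mentioned above: the common pool reads java.util.concurrent.ForkJoinPool.common.parallelism once, when the pool is first initialized, so the value normally has to be supplied before anything touches the pool, for example with a -D flag on the command line. The class name CommonPoolInfo and its helper methods are only illustrative names, not part of the JDK.

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolInfo {

    // System property consulted once, when the common pool is created.
    static final String PARALLELISM_PROPERTY =
            "java.util.concurrent.ForkJoinPool.common.parallelism";

    /** Returns the value requested through the system property, or null if it is not set. */
    static String requestedParallelism() {
        return System.getProperty(PARALLELISM_PROPERTY);
    }

    /** Returns the targeted parallelism level of the common pool. */
    static int effectiveParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        // Run with, for example:
        //   java -Djava.util.concurrent.ForkJoinPool.common.parallelism=4 CommonPoolInfo
        System.out.println("requested = " + requestedParallelism());
        System.out.println("effective = " + effectiveParallelism());
        System.out.println("cores     = " + Runtime.getRuntime().availableProcessors());
    }
}
```

Because the setting is JVM-wide, it changes the parallelism for every parallel stream in the process, which is one reason a dedicated pool can be preferable for individual workloads.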
ForkJoinPoolFactoryBean
org/springframework/scheduling/concurrent/ForkJoinPoolFactoryBean.java
public class ForkJoinPoolFactoryBean implements FactoryBean<ForkJoinPool>, InitializingBean, DisposableBean {
    private boolean commonPool = false;
    private int parallelism = Runtime.getRuntime().availableProcessors();
    private ForkJoinPool.ForkJoinWorkerThreadFactory threadFactory = ForkJoinPool.defaultForkJoinWorkerThreadFactory;
    @Nullable
    private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
    private boolean asyncMode = false;
    private int awaitTerminationSeconds = 0;
    @Nullable
    private ForkJoinPool forkJoinPool;
    //......
    @Override
    public void destroy() {
        if (this.forkJoinPool != null) {
            // Ignored for the common pool.
            this.forkJoinPool.shutdown();
            // Wait for all tasks to terminate - works for the common pool as well.
            if (this.awaitTerminationSeconds > 0) {
                try {
                    this.forkJoinPool.awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS);
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
Spring has provided ForkJoinPoolFactoryBean since version 3.1; it can be used to create a ForkJoinPool and manage its lifecycle.
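For readers not using Spring, the shutdown handling in destroy() can be approximated in plain Java. This is only a sketch under that assumption; ManagedPoolSketch and shutdownAndAwait are hypothetical names, and the real factory bean does more (for example, it can also expose the common pool).

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class ManagedPoolSketch {

    /**
     * Shuts the pool down and waits up to the given number of seconds,
     * in the same shutdown-then-await order as destroy() above.
     * Returns true if the pool had terminated when this method returned.
     */
    static boolean shutdownAndAwait(ForkJoinPool pool, int awaitTerminationSeconds) {
        pool.shutdown(); // already submitted tasks still run, new ones are rejected
        if (awaitTerminationSeconds <= 0) {
            return pool.isTerminated();
        }
        try {
            return pool.awaitTermination(awaitTerminationSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4);
        pool.submit(() -> System.out.println("running in " + Thread.currentThread().getName()));
        boolean terminated = shutdownAndAwait(pool, 60);
        System.out.println("terminated = " + terminated);
    }
}
```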
Example
Configuration
@Configuration
public class ForkJoinConfig {
    @Bean
    public ForkJoinPoolFactoryBean forkJoinPoolFactoryBean() {
        ForkJoinPoolFactoryBean factoryBean = new ForkJoinPoolFactoryBean();
        factoryBean.setCommonPool(false);
        // NOTE with LIFO_QUEUE a worker processes its own most recently forked task first
        factoryBean.setAsyncMode(true); // NOTE true selects FIFO_QUEUE, false selects LIFO_QUEUE
        factoryBean.setParallelism(10);
        // factoryBean.setUncaughtExceptionHandler();
        factoryBean.setAwaitTerminationSeconds(60);
        return factoryBean;
    }
}
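The same settings can also be passed straight to the four-argument ForkJoinPool constructor shown earlier. The sketch below assumes no Spring container is involved; DirectPoolConfig and newPool are hypothetical names, and the caller is responsible for shutting the pool down.

```java
import java.util.concurrent.ForkJoinPool;

public class DirectPoolConfig {

    /** Builds a pool with the same settings as the configuration above, without Spring. */
    static ForkJoinPool newPool() {
        return new ForkJoinPool(
                10,                                              // parallelism
                ForkJoinPool.defaultForkJoinWorkerThreadFactory, // default worker threads
                null,                                            // no UncaughtExceptionHandler
                true);                                           // asyncMode: FIFO_QUEUE
    }

    public static void main(String[] args) {
        ForkJoinPool pool = newPool();
        try {
            System.out.println("parallelism = " + pool.getParallelism()); // prints "parallelism = 10"
            System.out.println("asyncMode   = " + pool.getAsyncMode());   // prints "asyncMode   = true"
        } finally {
            pool.shutdown();
        }
    }
}
```

The ForkJoinPool Javadoc describes asyncMode as a FIFO scheduling mode for forked tasks that are never joined, so whether true is the better choice for a parallelStream workload is worth measuring rather than assuming.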
Usage
@Autowired
ForkJoinPoolFactoryBean forkJoinPoolFactoryBean;
public void streamParallel() throws ExecutionException, InterruptedException {
    List<TodoTask> result = forkJoinPoolFactoryBean.getObject().submit(new Callable<List<TodoTask>>() {
        @Override
        public List<TodoTask> call() throws Exception {
            return IntStream.rangeClosed(1, 20).parallel().mapToObj(i -> {
                log.info("thread:{}", Thread.currentThread().getName());
                return new TodoTask(i, "name"+i);
            }).collect(Collectors.toList());
        }
    }).get();
    result.stream().forEach(System.out::println);
}
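Worker threads of the common pool are named with the prefix ForkJoinPool.commonPool-worker-
Worker threads of a custom pool are named, by default, with the prefix "ForkJoinPool-" + nextPoolId() + "-worker-"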
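The two prefixes can be observed without Spring by running the same parallel stream once on the calling thread and once as a task submitted to a dedicated pool. This is a minimal sketch; WorkerNameDemo and its methods are hypothetical names. It relies on the observed behavior that a parallel stream started inside a ForkJoinPool worker forks its subtasks into that same pool, which is an implementation detail rather than a documented guarantee of the Stream API.

```java
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class WorkerNameDemo {

    /** Runs a small parallel stream from the current thread and collects the thread names used. */
    static Set<String> threadNames() {
        return IntStream.rangeClosed(1, 20)
                .parallel()
                .mapToObj(i -> Thread.currentThread().getName())
                .collect(Collectors.toSet());
    }

    /** Runs the same stream as a task submitted to a dedicated pool. */
    static Set<String> threadNamesInCustomPool(int parallelism)
            throws ExecutionException, InterruptedException {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            Callable<Set<String>> task = WorkerNameDemo::threadNames;
            return pool.submit(task).get();
        } finally {
            pool.shutdown(); // the pool is only needed for this one task
        }
    }

    public static void main(String[] args) throws Exception {
        // Typically the caller thread plus ForkJoinPool.commonPool-worker-N threads.
        System.out.println("default: " + threadNames());
        // Typically ForkJoinPool-N-worker-M threads only.
        System.out.println("custom:  " + threadNamesInCustomPool(4));
    }
}
```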
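Summary
By default, parallelStream uses the commonPool, which is initialized in a static block of ForkJoinPool. For specific scenarios you can create a custom ForkJoinPool and submit the parallelStream pipeline to it as a task, so that the work does not affect the default commonPool.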