一、摘要
在上篇文章中,我们介绍了Future
相关的用法,使用它可以获取异步任务执行的返回值。
我们再次回顾一下Future
相关的用法。
public class FutureTest {
public static void main(String[] args) throws Exception {
long startTime = System.currentTimeMillis();
// 创建一个线程池
ExecutorService executor = Executors.newFixedThreadPool(1);
// 提交任务并获得Future的实例
Future future = executor.submit(new Callable() {
@Override
public String call() throws Exception {
// 执行下载某文件任务,并返回文件名称
System.out.println("thread name:" + Thread.currentThread().getName() + " 开始执行下载任务");
Thread.sleep(200);
return "xxx.png";
}
});
//模拟主线程其它操作耗时
Thread.sleep(300);
// 通过阻塞方式,从Future中获取异步执行返回的结果
String result = future.get();
System.out.println("任务执行结果:" + result);
System.out.println("总共用时:" + (System.currentTimeMillis() - startTime) + "ms");
// 任务执行完毕之后,关闭线程池
executor.shutdown();
}
}
运行结果如下:
thread name:pool-1-thread-1 开始执行下载任务
任务执行结果:xxx.png
总共用时:308ms
如果不采用线程执行,那么总共用时应该会是 200 + 300 = 500 ms,而采用线程来异步执行,总共用时是 308 ms。不难发现,通过Future
和线程池的搭配使用,可以有效的提升程序的执行效率。
但是Future
对异步执行结果的获取并不是很友好,要么调用阻塞方法get()
获取结果,要么轮训调用isDone()
方法是否等于true
来判断任务是否执行完毕来获取结果,这两种方法都不算很好,因为主线程会被迫等待。
因此,从 Java 8 开始引入了CompletableFuture
,它针对Future
做了很多的改进,在实现Future
接口相关功能之外,还支持传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象方法。
下面我们一起来看看CompletableFuture
相关的用法!
二、CompletableFuture 用法介绍
我们还是以上面的例子为例,改用CompletableFuture
来实现,内容如下:
public class FutureTest2 {
public static void main(String[] args) throws Exception {
// 创建异步执行任务
CompletableFuture cf = CompletableFuture.supplyAsync(FutureTest2::download);
// 如果执行成功,回调此方法
cf.thenAccept((result) -> {
System.out.println("任务执行成功,返回结果值:" + result);
});
// 如果执行异常,回调此方法
cf.exceptionally((e) -> {
System.out.println("任务执行失败,原因:" + e.getMessage());
return null;
});
//模拟主线程其它操作耗时
Thread.sleep(300);
}
/**
* 下载某个任务
* @return
*/
private static String download(){
// 执行下载某文件任务,并返回文件名称
System.out.println("thread name:" + Thread.currentThread().getName() + " 开始执行下载任务");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "xxx.png";
}
}
运行结果如下:
thread name:ForkJoinPool.commonPool-worker-1 开始执行下载任务
任务执行成功,返回结果值:xxx.png
可以发现,采用CompletableFuture
类的supplyAsync()
方法进行异步编程,代码上简洁了很多,不需要单独创建线程池。
实际上,CompletableFuture
也使用了线程池来执行任务,部分核心源码如下:
public class CompletableFuture implements Future, CompletionStage {
// 判断当前机器 cpu 可用逻辑核心数是否大于1
private static final boolean useCommonPool = (ForkJoinPool.getCommonPoolParallelism() > 1);
// 默认采用的线程池
// 如果useCommonPool = true,采用 ForkJoinPool.commonPool 线程池
// 如果useCommonPool = false,采用 ThreadPerTaskExecutor 执行器
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
// ThreadPerTaskExecutor执行器类
static final class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) { new Thread(r).start(); }
}
// 异步执行任务的方法
public static CompletableFuture supplyAsync(Supplier supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
// 异步执行任务的方法,支持传入自定义线程池
public static CompletableFuture supplyAsync(Supplier supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
}
从源码上可以分析出如下几点:
-
当前机器 cpu 可用逻辑核心数大于 1,默认会采用
ForkJoinPool.commonPool()
线程池来执行任务 -
当前机器 cpu 可用逻辑核心数等于 1,默认会采用
ThreadPerTaskExecutor
类来执行任务,它是个一对一执行器,每提交一个任务会创建一个新的线程来执行 - 同时也支持用户传入自定义线程池来异步执行任务
其中ForkJoinPool
线程池是从 JDK 1.7 版本引入的,它是一个全新的线程池,后面在介绍Fork/Join
框架文章中对其进行介绍。
除此之外,CompletableFuture
为开发者还提供了几十种方法,以便满足更多的异步任务执行的场景。这些方法包括创建异步任务、任务异步回调、多个任务组合处理等内容,下面我们就一起来学习一下相关的使用方式。
2.1、创建异步任务
CompletableFuture
创建异步任务,常用的方法有两个。
-
runAsync()
:执行异步任务时,没有返回值 -
supplyAsync()
:执行异步任务时,可以带返回值
runAsync()
和supplyAsync()
方法相关的源码如下:
// 使用默认内置线程池执行任务,根据runnable构建执行任务,无返回值
public static CompletableFuture runAsync(Runnable runnable)
// 使用自定义线程池执行任务,根据runnable构建执行任务,无返回值
public static CompletableFuture runAsync(Runnable runnable, Executor executor)
// 使用默认内置线程池执行任务,根据supplyAsync构建执行任务,可以带返回值
public static CompletableFuture supplyAsync(Supplier supplier)
// 使用自定义线程池执行任务,根据supplyAsync构建执行任务,可以带返回值
public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
两者都支持使用自定义的线程池来执行任务,稍有不同的是supplyAsync()
方法的入参使用的是Supplier
接口,它表示结果的提供者,该结果返回一个对象且不接受任何参数,支持通过 lambda 语法简写。
下面我们一起来看看相关的使用示例!
2.1.1、runAsync 使用示例
public static void main(String[] args) throws Exception {
// 创建异步执行任务
CompletableFuture cf = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
System.out.println("runAsync,执行完毕");
}
});
System.out.println("runAsync,任务执行结果:" + cf.get());
}
输出结果:
runAsync,执行完毕
runAsync,任务执行结果:null
2.1.2、supplyAsync 使用示例
public static void main(String[] args) throws Exception {
// 创建异步执行任务
CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync,执行完毕");
return "hello world";
});
System.out.println("supplyAsync,任务执行结果:" + cf.get());
}
输出结果:
supplyAsync,执行完毕
supplyAsync,任务执行结果:hello world
2.2、任务异步回调
当创建的异步任务执行完毕之后,我们希望拿着上一个任务的执行结果,继续执行后续的任务,此时就可以采用回调方法来处理。
CompletableFuture
针对任务异步回调做了很多的支持,常用服务器托管网的方法如下:
-
thenRun()/thenRunAsync()
:它表示上一个任务执行成功后的回调方法,无入参,无返回值 -
thenAccept()/thenAcceptAsync()
:它表示上一个任务执行成功后的回调方法,有入参,无返回值 -
thenApply()/thenApplyAsync()
:它表示上一个任务执行成功后的回调方法,有入参,有返回值 -
whenComplete()/whenCompleteAsync()
:它表示任务执行完成后的回调方法,有入参,无返回值 -
handle()/handleAsync()
:它表示任务执行完成后的回调方法,有入参,有返回值 -
exceptionally()
:它表示任务执行异常后的回调方法
下面我们一起来看看相关的使用示例!
2.2.1、thenRun/thenRunAsync
thenRun()/thenRunAsync()
方法,都表示上一个任务执行成功后的回调处理,无入参,无返回值。稍有不同的是,thenRunAsync()
方法会采用独立的线程池来执行任务。
相关的源码方法如下:
// 默认线程池
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
// 采用与上一个任务的线程池来执行任务
public CompletableFuture thenRun(Runnable action) {
return uniRunStage(null, action);
}
// 采用默认线程池来执行任务
public CompletableFuture thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}
从源码上可以清晰的看到,thenRun()/thenRunAsync()
方法都调用了uniRunStage()
方法,不同的是thenRunAsync()
使用了asyncPool
参数,也就是默认的线程池;而thenRun()
方法使用的是null
,底层采用上一个任务的线程池来执行,总结下来就是:
- 当调用
thenRun()
方法执行任务时,当前任务和上一个任务都共用同一个线程池 - 当调用
thenRunAsync()
方法执行任务时,上一个任务采用自己的线程池来执行;而当前任务会采用默认线程池来执行,比如ForkJoinPool
。
thenAccept()/thenAcceptAsync()
、thenApply()/thenApplyAsync()
、whenComplete()/whenCompleteAsync()
、handle()/handleAsync()
方法之间的区别也类似,下文不再重复讲解。
下面我们一起来看看thenRun()
方法的使用示例。
public static void main(String[] args) throws Exception {
// 创建异步执行任务
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync,执行完毕");
return "hello world";
});
// 当上一个任务执行成功,会继续回调当前方法
CompletableFuture cf2 = cf1.thenRun(()服务器托管网 -> {
System.out.println("thenRun1,执行完毕");
});
CompletableFuture cf3 = cf2.thenRun(() -> {
System.out.println("thenRun2,执行完毕");
});
System.out.println("任务执行结果:" + cf3.get());
}
输出结果:
supplyAsync,执行完毕
thenRun1,执行完毕
thenRun2,执行完毕
任务执行结果:null
如果上一个任务执行异常,是不会回调thenRun()
方法的,示例如下:
public static void main(String[] args) throws Exception {
// 创建异步执行任务
CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync,执行完毕");
if(1 == 1){
throw new RuntimeException("执行异常");
}
return "hello world";
});
// 当上一个任务执行成功,会继续回调当前方法
CompletableFuture cf1 = cf.thenRun(() -> {
System.out.println("thenRun1,执行完毕");
});
// 监听执行时异常的回调方法
CompletableFuture cf2 = cf1.exceptionally((e) -> {
System.out.println("发生异常,错误信息:" + e.getMessage());
return null;
});
System.out.println("任务执行结果:" + cf2.get());
}
输出结果:
supplyAsync,执行完毕
发生异常,错误信息:java.lang.RuntimeException: 执行异常
任务执行结果:null
可以清晰的看到,thenRun()
方法没有回调。
thenAccept()
、thenAcceptAsync()
、thenApply()
、thenApplyAsync()
方法也类似,当上一个任务执行异常,不会回调这些方法。
2.2.2、thenAccept/thenAcceptAsync
thenAccept()/thenAcceptAsync()
方法,表示上一个任务执行成功后的回调方法,有入参,无返回值。
相关的示例如下。
public static void main(String[] args) throws Exception {
// 创建异步执行任务
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync,执行完毕");
return "hello world";
});
// 当上一个任务执行成功,会继续回调当前方法
CompletableFuture cf2 = cf1.thenAccept((r) -> {
System.out.println("thenAccept,执行完毕,上一个任务执行结果值:" + r);
});
System.out.println("任务执行结果:" + cf2.get());
}
输出结果:
supplyAsync,执行完毕
thenAccept,执行完毕,上一个任务执行结果值:hello world
任务执行结果:null
2.2.3、thenApply/thenApplyAsync
thenApply()/thenApplyAsync()
方法,表示上一个任务执行成功后的回调方法,有入参,有返回值。
相关的示例如下。
public static void main(String[] args) throws Exception {
// 创建异步执行任务
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync,执行完毕");
return "hello world";
});
// 当上一个任务执行成功,会继续回调当前方法
CompletableFuture cf2 = cf1.thenApply((r) -> {
System.out.println("thenApply,执行完毕,上一个任务执行结果值:" + r);
return "gogogo";
});
System.out.println("任务执行结果:" + cf2.get());
}
输出结果:
supplyAsync,执行完毕
thenApply,执行完毕,上一个任务执行结果值:hello world
任务执行结果:gogogo
2.2.4、whenComplete/whenCompleteAsync
whenComplete()/whenCompleteAsync()
方法,表示任务执行完成后的回调方法,有入参,无返回值。
稍有不同的是:无论任务执行成功还是失败,它都会回调。
相关的示例如下。
public static void main(String[] args) throws Exception {
// 创建异步执行任务
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync,执行完毕");
if(1 == 1){
throw new RuntimeException("执行异常");
}
return "hello world";
});
// 当任务执行完成,会继续回调当前方法
CompletableFuture cf2 = cf1.whenComplete((r, e) -> {
System.out.println("whenComplete,执行完毕,上一个任务执行结果值:" + r + ",异常信息:" + e.getMessage());
});
// 监听执行时异常的回调方法
CompletableFuture cf3 = cf2.exceptionally((e) -> {
System.out.println("发生异常,错误信息:" + e.getMessage());
return e.getMessage();
});
System.out.println("任务执行结果:" + cf3.get());
}
输出结果:
supplyAsync,执行完毕
whenComplete,执行完毕,上一个任务执行结果值:null,异常信息:java.lang.RuntimeException: 执行异常
发生异常,错误信息:java.lang.RuntimeException: 执行异常
任务执行结果:java.lang.RuntimeException: 执行异常
2.2.5、handle/handleAsync
handle()/handleAsync()
方法,表示任务执行完成后的回调方法,有入参,有返回值。
同样的,无论任务执行成功还是失败,它都会回调。
相关的示例如下。
public static void main(String[] args) throws Exception {
// 创建异步执行任务
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync,执行完毕");
if(1 == 1){
throw new RuntimeException("执行异常");
}
return "hello world";
});
// 当任务执行完成,会继续回调当前方法
CompletableFuture cf2 = cf1.handle((r, e) -> {
System.out.println("handle,执行完毕,上一个任务执行结果值:" + r + ",异常信息:" + e.getMessage());
return "handle";
});
System.out.println("任务执行结果:" + cf2.get());
}
输出结果:
supplyAsync,执行完毕
handle,执行完毕,上一个任务执行结果值:null,异常信息:java.lang.RuntimeException: 执行异常
任务执行结果:handle
2.2.6、exceptionally
exceptionally()
方法,表示任务执行异常后的回调方法。在上文的示例中有所介绍。
最后我们还是简单的看下示例。
public static void main(String[] args) throws Exception {
// 创建异步执行任务
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync,执行开始");
if(1 == 1){
throw new RuntimeException("执行异常");
}
return "hello world";
});
// 监听执行时异常的回调方法
CompletableFuture cf2 = cf1.exceptionally((e) -> {
System.out.println("发生异常,错误信息:" + e.getMessage());
return e.getMessage();
});
System.out.println("任务执行结果:" + cf2.get());
}
输出结果:
supplyAsync,执行开始
发生异常,错误信息:java.lang.RuntimeException: 执行异常
任务执行结果:java.lang.RuntimeException: 执行异常
2.3、多个任务组合处理
某些场景下,如果希望获取两个不同的异步执行结果进行组合处理,可以采用多个任务组合处理方式。
CompletableFuture
针对多个任务组合处理做了很多的支持,常用的组合方式有以下几种。
-
AND组合
:表示将两个CompletableFuture
任务组合起来,只有这两个任务都正常执行完了,才会继续执行回调任务,比如thenCombine()
方法 -
OR组合
:表示将两个CompletableFuture
任务组合起来,只要其中一个正常执行完了,就会继续执行回调任务,比如applyToEither
方法 -
AllOf组合
:可以将多个CompletableFuture
任务组合起来,只有所有的任务都正常执行完了,才会继续执行回调任务,比如allOf()
方法 -
AnyOf组合
:可以将多个CompletableFuture
任务组合起来,只要其中一个任务正常执行完了,就会继续执行回调任务,比如anyOf()
方法
下面我们一起来看看相关的使用示例!
2.3.1、AND组合
实现AND组合
的操作方法有很多,比如runAfterBoth()
、thenAcceptBoth()
、thenCombine()
等方法,它们之间的区别在于:是否带有入参、是否带有返回值。
其中thenCombine()
方法支持传入参、带返回值。
相关示例如下:
public static void main(String[] args) throws Exception {
// 创建异步执行任务
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync1,执行完毕");
return "supplyAsync1";
});
CompletableFuture cf2 = CompletableFuture
.supplyAsync(() -> {
System.out.println("supplyAsync2,执行完毕");
return "supplyAsync2";
})
.thenCombine(cf1, (r1, r2) -> {
System.out.println("r1任务执行结果:" + r1);
System.out.println("r2任务执行结果:" + r2);
return r1 + "_" + r2;
});
System.out.println("任务执行结果:" + cf2.get());
}
输出结果:
supplyAsync1,执行完毕
supplyAsync2,执行完毕
r1任务执行结果:supplyAsync2
r2任务执行结果:supplyAsync1
任务执行结果:supplyAsync2_supplyAsync1
2.3.2、OR组合
实现OR组合
的操作方法有很多,比如runAfterEither()
、acceptEither()
、applyToEither()
等方法,区别同上。
其中applyToEither()
方法支持传入参、带返回值。
相关示例如下:
public static void main(String[] args) throws Exception {
// 创建异步执行任务
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync1,执行完毕");
return "supplyAsync1";
});
CompletableFuture cf2 = CompletableFuture
.supplyAsync(() -> {
System.out.println("supplyAsync2,执行完毕");
return "supplyAsync2";
})
.applyToEither(cf1, (r) -> {
System.out.println("第一个执行成功的任务结果:" + r);
return r + "_applyToEither";
});
System.out.println("任务执行结果:" + cf2.get());
}
输出结果:
supplyAsync1,执行完毕
supplyAsync2,执行完毕
第一个执行成功的任务结果:supplyAsync2
任务执行结果:supplyAsync2_applyToEither
2.3.2、AllOf组合
实现AllOf组合
的操作就一个方法allOf()
,可以将多个任务进行组合,只有都执行成功才会回调,回调入参为空值。
相关示例如下:
public static void main(String[] args) throws Exception {
// 创建异步执行任务
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync1,执行完毕");
return "supplyAsync1";
});
CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync2,执行完毕");
return "supplyAsync2";
});
// 将多个任务,进行AND组合
CompletableFuture cf3 = CompletableFuture
.allOf(cf1, cf2)
.handle((r, e) -> {
System.out.println("所有任务都执行成功,result:" + r);
return "over";
});
System.out.println(cf3.get());
}
输出结果:
supplyAsync1,执行完毕
supplyAsync2,执行完毕
所有任务都执行成功,result:null
over
2.3.3、AnyOf组合
实现AnyOf组合
的操作,同样就一个方法anyOf()
,可以将多个任务进行组合,只要一个执行成功就会回调,回调入参有值。
相关示例如下:
public static void main(String[] args) throws Exception {
// 创建异步执行任务
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync1,执行完毕");
return "supplyAsync1";
});
CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync2,执行完毕");
return "supplyAsync2";
});
// 将多个任务,进行AND组合
CompletableFuture cf3 = CompletableFuture
.anyOf(cf1, cf2)
.handle((r, e) -> {
System.out.println("某个任务执行成功,返回值:" + r);
return "over";
});
System.out.println(cf3.get());
}
输出结果:
supplyAsync1,执行完毕
supplyAsync2,执行完毕
某个任务执行成功,返回值:supplyAsync1
over
三、小结
本文主要围绕CompletableFuture
类相关用法进行了一次知识总结,通过CompletableFuture
类可以简化异步编程,同时支持多种异步任务,按照条件组合处理,相比其它的并发工具类,操作更加强大、实用。
本篇内容比较多,如果有描述不对的地方,欢迎网友留言指出,希望本文知识总结能帮助到大家。
四、参考
1.https://www.liaoxuefeng.com/wiki/1252599548343744/1306581182447650
2.https://juejin.cn/post/6970558076642394142
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.fwqtg.net
关于这个分布式服务的幂等性,这是在使用分布式服务的时候会经常遇到的问题,比如,重复提交的问题。而幂等性,就是为了解决问题存在的一个概念了。 什么是幂等 幂等(idempotent、idempotence)是⼀个数学与计算机学概念,常⻅于抽象代数中。 在编程中⼀…