1 FutureTask实现原理
下图所示为FutureTask的继承关系,我们以此为基础分析一下Future接口的具体实现;
2 FutureTask七种任务状态
future task内部定义了任务执行的七种状态,如下所示:
状态的流转可以简单总结如下:
3 FutureTask的内部变量
本节主要聚焦FutureTask 的几个关键成员变量,从这里可以大致看出FutureTask的实现思路:
- callable,需要执行的callable接口;
- outcome: 返回对象
- runner : 执行callable的线程
waiters: 用于保存由于调用Future.get方法而阻塞的线程的Treiber椎引用成变量waiters
特别重视以下这里的waiters,是线程阻塞以及获取内容的关键!
/** The underlying callable; nulled out after running */
private Callable callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
4 get的实现?
get的关键就是检查task的状态,如果是在completing状态之前就新增一个waiters,这里重点关注awaitDone(true, unit.toNanos(timeout))
/**
* @throws CancellationException {@inheritDoc}
*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s
[外链图片转存失败(img-ZlYSkjNO-1568468795053)(assets/
这个循环的功能就是检查状态,然后由
LockSupport.park进行关闭操作
5 call如何执行?
上文看到get如果不成功,会调用park方法,使当前线程挂住,那么执行结果如何返回,从而退出执行状态的呢?这里需要重点看看call的执行!可以看到实际上call的执行仍然是封装在一个run thread内完成.当程序执行完成set(result)
会做一些额外工作,而get返回结果的原理也在于此!
下图是set的实现:
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
缓存线程依次从单项链表中提取出来,进行解锁,而第四节的get也会继续开启循环,执行,最终找到变量退出!
6 cancel的原理?
上文关注的了一个call线程的返回处理,那么future如何取消一个线程,实际上也非常容易:
- 找到工作任务,中断线程;
- 设置工作任务线程future的最终状态!
如下图所示为代码实现:
7 Future用例
这里给出一个线程池调用run和call进行线程提交的用例:
package com.xinquanv1.everyday.java.test.future;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
public class CompletableFutureUse implements Callable {
private String param;
public CompletableFutureUse(String param) {
this.param = param;
}
@Override
public String call() throws Exception {
//模拟执行业务逻辑的耗时
TimeUnit.SECONDS.sleep(3);
String result = this.param + " 处理完成!";
return result;
}
public static void main(String[] args) throws Exception{
String queryStr = "query1";
String queryStr2 = "query2";
FutureTask future1 = new FutureTask(new UseFuture(queryStr));
FutureTask future2 = new FutureTask(new UseFuture(queryStr2));
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(() -> {
System.out.println("---this is runnable test!");
});
Future valueFuture = executorService.submit(() -> {
System.out.println("---this is callable test!");
return "this is a callable test";
});
executorService.submit(future1);//异步操作
executorService.submit(future2);//异步操作
System.out.println("执行中...");
TimeUnit.SECONDS.sleep(2);//处理其他相关的任务。
String result1 = future1.get();
String result2 = future2.get();
String result3 = valueFuture.get();
System.out.println("数据处理完成。。" + result1);
System.out.println("数据处理完成。。" + result2);
System.out.println("数据处理完成。。" + result3);
}
}
这里给出用例主要是为了说明以下实际上线程池中关于run和call的使用都是在抽象类中生成了FutureTask的对象,因此所有的这些新线程的调度原理也是和上面的分析一样!
//callable
protected RunnableFuture newTaskFor(Callable callable) {
return new FutureTask(callable);
}
//run
protected RunnableFuture newTaskFor(Runnable runnable, T value) {
return new FutureTask(runnable, value);
}
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.fwqtg.net