Informal Essay By English
It is always a pleasure to learn
背景
在某一个风和日丽的早上,小组同事说昨晚线上服务有20分钟左右的不可用,当时内心一紧,不会是我写的代码有bug导致的吧,我正了正心态,故作轻松地说有定位到是什么原因导致的吗?(内心慌的一批)他开始滔滔不绝地说了一大堆是如何排查问题的(技术人的特性,对于解决问题非常热忱),虽然我当时一直保持着很认真的神态,但其实心里非常煎熬(是谁的代码导致的一直没有说!!)。10分钟后…..,同事语重心长地说,这次这个线上问题暴露我们以前写的代码还是欠缺一些场景的考虑。听到这里我大概已经知道不是我的代码导致的(注:我刚进这个小组不久),这个时候我也开始语重心长地附和道“是啊,以前我们在这块的考量还是有些不足的…………然后就开始和他就‘代码质量如何保证’话题讨论了半小时”。
问题描述
问题最终定位到是一个并发问题。线上有一个接口是通过CompletableFuture与线程池结合使用去获取下游的数据(注:使用异步的方式去获取下游数据则是因为调用的下游的接口是一个耗时高的soa接口),大致代码如下:
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
ThreadPoolExecutor flowContractThreadPool = new ThreadPoolExecutor(2,
5, 15, TimeUnit.MINUTES,
new ArrayBlockingQueue(5),
new ThreadPoolExecutor.DiscardPolicy());
List threadList = Lists.newArrayList();
for (int i = 0; i future = CompletableFuture.runAsync(() -> {
log.info("main for");
// do something
}, flowContractThreadPool);
threadList.add(future);
}
CompletableFuture.allOf(threadList.toArray(new CompletableFuture[threadList.size()])).join();
//获取结果,然后做相关业务处理
log.info("end");
}
现在暂时不去分析代码,先描述现象。出现问题的那个晚上,有一波突刺流量,由于我们没有针对接口的请求失败做短信告警(虽然有钉钉异常告警群,但是大家都不是很关心群里消息),因此一开始出现问题的接口出现大面积的请求超时而我们都没有感知到,直到最终服务出现不可用,值班同事才发现这个问题(经过此事件,我们很自然地加上请求失败告警️)。从结合链路和日志分析定位问题出现的原因到服务恢复的MTTR大概花了10分钟(在此特意表扬一下此同事☀️☀️)。
问题分析
通过上面背景与问题的描述,已经知道整个问题的全貌,现在从技术的角度去分析一下CompletableFuture结合线程池的原理与使用注意事项。
CompletableFuture
从上述的案例代码里面,涉及到CompletableFuture中的三个方法,他们分别是runAsync、join、allOf,下面我们逐步去分析这几个方法:
runAsync
这个方法的效果返回一个新的CompletableFuture,该CompletableFuture是在给定执行器中运行的任务在运行给定动作后异步完成的。
public static CompletableFuture runAsync(Runnable runnable,
Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
//此方法是为了保证操作系统是多核的情况下走线程池,单核情况下不走线程池,由单个线程去跑相应的逻辑
static Executor screenExecutor(Executor e) {
if (!useCommonPool && e == ForkJoinPool.commonPool())
return asyncPool;
if (e == null) throw new NullPointerException();
return e;
}
static CompletableFuture asyncRunStage(Executor e, Runnable f) {
if (f == null) throw new NullPointerException();
CompletableFuture d = new CompletableFuture();
e.execute(new AsyncRun(d, f));
return d;
}
runAsync方法中对CompletableFuture进行了一层封装,通过AsyncRun对象组装一个空的CompletableFuture与Runnable,然后将空的CompletableFuture返回,我们再来看看AsyncRun的一个结构:
static final class AsyncRun extends ForkJoinTask
implements Runnable, AsynchronousCompletionTask {
CompletableFuture dep; Runnable fn;
AsyncRun(CompletableFuture dep, Runnable fn) {
this.dep = dep; this.fn = fn;
}
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
public final boolean exec() { run(); return true; }
public void run() {
CompletableFuture d; Runnable f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
try {
//由于f
f.run();
d.completeNull();
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
}
AsyncRun和AsyncSupply的实现略有不同,AsyncRun的run中,计算的执行是通过调用传入的Runnable(源码中的 f 变量)的run方法进行的。由于没有返回值,所以这里在设置CompletableFuture的值时,使用其completeNull()方法,设置一个特殊的空值标记。
AsyncRun的继承结构大致如下:
allOf
allOf的方法的作用是当所有给定的CompletableFutures完成时,返回一个新的CompletableFuture。如果给定的任何一个CompletableFuture异常完成,那么返回的CompletableFuture也会异常完成,并使用CompletionException将此异常作为其原因。否则,给定的CompletableFuture的结果(如果有的话)不会反映在返回的CompletableFuture中,而是可以通过单独检查它们来获得。如果没有提供CompletableFutures,则返回一个完整的CompletableFuture,其值为null。该方法的应用之一是在继续一个程序之前等待一组独立的CompletableFuture的完成,如:allOf(c1, c2, c3).join();
public static CompletableFuture allOf(CompletableFuture>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}
static CompletableFuture andTree(CompletableFuture>[] cfs,
int lo, int hi) {
CompletableFuture d = new CompletableFuture();
//对传入的CompletableFuture的参数校验,如果没有通过则返回AltResult
if (lo > hi) // empty
d.result = NIL;
else {
CompletableFuture> a, b;
//通过右移操作获取中值
int mid = (lo + hi) >>> 1;
//通过递归的方式a、b的赋值操作(这个逻辑有点抽象,大家可以通过花图去理解一下)
if ((a = (lo == mid ? cfs[lo] :
andTree(cfs, lo, mid))) == null ||
(b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
andTree(cfs, mid+1, hi))) == null)
throw new NullPointerException服务器托管网();
//判断任务是否执行
if (!d.biRelay(a, b)) {
BiRelay,?> c = new BiRelay(d, a, b);
a.bipush(b, c);
c.tryFire(SYNC);
}
}
return d;
}
//判断任务是否执行,可简单理解为:result 是 null 任务没执行,不是 null 任务已执行。
boolean biRelay(CompletableFuture> a, CompletableFuture> b) {
Object r, s; Throwable x;
if (a == null || (r = a.result) == null ||
b == null || (s = b.result) == null)
return false;
if (result == null) {
if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
completeThrowable(x, r);
else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null)
completeThrowable(x, s);
else
completeNull();
}
return服务器托管网 true;
}
//这个方法是用于对象编排
final void bipush(CompletableFuture> b, BiCompletion,?,?> c) {
if (c != null) {
Object r;
while ((r = result) == null && !tryPushStack(c))
lazySetNext(c, null); // clear on failure
if (b != null && b != this && b.result == null) {
Completion q = (r != null) ? c : new CoCompletion(c);
while (b.result == null && !b.tryPushStack(q))
lazySetNext(q, null); // clear on failure
}
}
}
//用于获取future执行完的返回值
final CompletableFuture tryFire(int mode) {
CompletableFuture d;
CompletableFuture a;
CompletableFuture b;
if ((d = dep) == null || !d.biRelay(a = src, b = snd))
return null;
src = null; snd = null; dep = null;
return d.postFire(a, b, mode);
}
join
完成时返回结果值,如果异常完成则抛出(未检查的)异常。为了更好地符合通用函数形式的使用,如果在CompletableFuture的完成过程中涉及的计算抛出了异常,则该方法抛出一个(未检查的)CompletionException,并将底层异常作为其原因。
public T join() {
Object r;
return reportJoin((r = result) == null ? waitingGet(false) : r);
}
//此方法的的作用就是用于判断AltResult的result中是否有异常,如果有抛出来
private static T reportJoin(Object r) {
if (r instanceof AltResult) {
Throwable x;
if ((x = ((AltResult)r).ex) == null)
return null;
if (x instanceof CancellationException)
throw (CancellationException)x;
if (x instanceof CompletionException)
throw (CompletionException)x;
throw new CompletionException(x);
}
@SuppressWarnings("unchecked") T t = (T) r;
return t;
}
//等待后返回原始结果,如果可中断且已中断则返回null。
private Object waitingGet(boolean interruptible) {
Signaller q = null;
boolean queued = false;
int spins = -1;
Object r;
while ((r = result) == null) {
//spins用于自旋,可不用关注
if (spins 0) {
if (ThreadLocalRandom.nextSecondarySeed() >= 0)
--spins;
}
// 必会执行的分支,会将q的值赋值new Signaller(interruptible, 0L, 0L);
else if (q == null)
q = new Signaller(interruptible, 0L, 0L);
//必会执行的分支,把stack 设置为 q
else if (!queued)
queued = tryPushStack(q);
//线程中断时会执行
else if (interruptible && q.interruptControl
上面我们对于CompletableFuture有了一个粗略的认识,想了解更多的话,推荐去看看CompletableFuture 的 allOf 方法底层原理是什么。
案例分析
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
ThreadPoolExecutor flowContractThreadPool = new ThreadPoolExecutor(2,
5, 15, TimeUnit.MINUTES,
new ArrayBlockingQueue(5),
new ThreadPoolExecutor.DiscardPolicy());
List threadList = Lists.newArrayList();
for (int i = 0; i future = CompletableFuture.runAsync(() -> {
log.info("main for");
// do something
}, flowContractThreadPool);
threadList.add(future);
}
CompletableFuture.allOf(threadList.toArray(new CompletableFuture[threadList.size()])).join();
//获取结果,然后做相关业务处理
log.info("end");
}
再回到我们的案例代码里面,案例中我们给线程池设置的拒绝策略是DiscardPolicy(造成此次线上问题的罪魁祸首),此策略的作用是当线程池中的队列满时再来任务会静默丢弃task,现在问题来了,这个丢弃的任务就有可能是某些阻塞等待线程的FutureTask,那么这些调用了get()的无限时等待api的线程将无限时阻塞了,没人去唤醒他,如下图:
FutureTask被丢弃的话,CompletableFuture.allOf(threadList.toArray(new CompletableFuture[threadList.size()])).join();这段代码就会一直阻塞线程获取FutureTask结果,此时结果是永远无法获取到(task丢弃,因此FutureTask 的引用无法获取数据),因此会一直夯住主线程,随着夯住的线程越来越多,tomcat的线程也会被打满,整个服务就瘫痪了。
问题处理
那么如何去处理这种现象呢?如果认真看完上面的内容的小伙伴其实已经有答案了,我这里提供两种思路去处理。第一种就是根据线上流量峰值去增大队列长度(这种方式不推荐,治标不治本);第二种是通过修改线程池的拒绝策略避免这种情况(最好是自己实现拒绝策略,这种方式对于业务的扩展更佳灵活)。
最后提出一个问题,如果是你碰到这个问题,你会怎么去处理呢?
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.fwqtg.net
相关推荐: 年终收官!华为云开发者日·2023年度创享峰会成功举办
12月20日,华为云开发者日2023年度创享峰会成功举办,众多开发者与技术爱好者齐聚一堂,在现场,有600余名开发者与华为云技术专家共同就大模型应用、CodeArts软件开发等技术话题进行深入探讨,分享实战技巧与解决方案。此外,华为云还精心设置了KooLabs…