CyclicBarrier源码学习
Java并发编程(十六):CyclicBarrier源码分析
CyclicBarrier执行流程:
最后一个就位线程负责把所有条件队列中的线程添加到同步队列,然后在finally中执行ReentrantLock的unlock方法唤醒同步队列中的head.next,本节点成为新的head节点,然后被唤醒线程同样会到finally的unlock方法中唤醒下一个线程,这样传递唤醒
一、初始化
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties 0) throw new IllegalArgumentException();
// 需要多少数量的线程到达才可以打碎屏障
this.parties = parties;
// 到达屏障线程的计数器
this.count = parties;
// 线程到达屏障时的任务方法
this.barrierCommand = barrierAction;
}
二、设置屏障:public int await()
public int await() throws InterruptedException, BrokenBarrierException {
try {
// 调用为设置超时时间的等待
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 设置ReentrantLock 非公平锁 lock
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 每次使用屏障的都表示为一个生成实例,当屏障跳闸时generation会重置
final Generation g = generation;
// 检测当前屏障是否被打碎,下方调用breakBarrier()地方都会打碎
if (g.broken)
throw new BrokenBarrierException();
// 当前线程被哦中断,则调用breakBarrier(),中断屏障
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 每次有一个线程到达屏障,则将屏障等待线程计数器-1
int index = --count;
// 到达屏障的线程数已满足条件
if (index == 0) { // tripped
// 标识打碎屏障后的后续动作是否执行完成
boolean ranAction = false;
try {
// 执行到达屏障后的汇总任务
final Runnable command = barrierCommand;
if (command != null)
command.run();
// 标识打碎屏障后的后续动作已经执行完成
ranAction = true;
// 屏障跳闸时generation会重置
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// 到达屏障的前n-1个线程会走到
for (;;) {
try {
// 是否超时等待
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
// 异常情况处理
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
private voi服务器托管网d nextGeneration() {
// 此处是将条件队列中的所有节点转移到同步队列,当此线程(到达屏障的第n个线程)执行到finally 中调用 lock.unlock();,唤醒下一个节点,被唤醒节点成为新的head节点,同样会到finally的unlock方法中唤醒下一个线程,这样传递唤醒
trip.signalAll();
// 重置计数器
count = parties;
// 重新生成屏障实例
generation = new Generation();
}
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程生成node添加到条件队列,被lastWaiter引用
Node node = addConditionWaiter();
// 释放全部锁资源
int savedState = fullyRelease(node);
int interruptMode = 0;
// 判断是否在等待队列上
// 前n-1个线程都会执行到次数,此时都不在等待队列上,因此进入循环进行阻塞
// 第n个线程到达屏障后会将所有条件队列中的节点放到等待队列,详细参考方法nextGeneration()注解
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 如果线程被打断,则break
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// ReentrantLock入队阻塞逻辑,正常情况因为唤醒是在 finally 中调用 lock.unlock();,唤醒下一个节点的情况
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// 此节点已被取消,从条件队列中移除
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 构建条件节点,添加到条件队列末尾
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = no服务器托管网de;
lastWaiter = node;
return node;
}
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
// 移除条件队列中的非条件节点
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
// 释放AQS中的state,此处其实是当前线程已经加入条件队列,那么可以"释放锁",但是没有执行释放锁的逻辑
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
// 此时head为null,因为第n个线程到达屏障时才会将条件队列中的节点转移到同步队列,此时同步队列为空
Node h = head;
if (h != null && h.waitStatus != 0)
// 执行不到
unparkSuccessor(h);
return true;
}
return false;
}
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
// 线程挂起到的时间点
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 挂起超时则从条件队列中移除,添加到同步队列
if (nanosTimeout 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net