完成上一篇文章的未尽事宜:
- ReentrantLock的lock、unlock源码分析
- Condition的await、signal源码分析
ReentrantLock#lock
lock方法最终是由sync实现的,公平锁的sync是FairSync,非公平锁是UnfairSync。
两者lock方法的区别是,公平锁FairSync直接调用acquire(1)方法,非公平锁UnfairSync则首先尝试获得锁资源(直接尝试修改锁状态)、获取不到才调用acquire(1)方法。
acquire方法由AQS实现。
AbstractQueuedSynchronizer#acquire
首先调用tryAcquire方法尝试获得锁资源,如果获取不成功的话,调用acquireQueued方法进入队列排队等待。
如果是通过acquireQueued方法经过排队获取到锁资源、并且方法返回true的话,说明在线程排队等待锁资源的过程中收到了中断信号,但是由于线程处于挂起状态、尚未获得锁资源,不能对CLH队列做操作,所以需要等到获取到锁资源之后、再调用selfInterrupt()中断。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
下面我们看一下tryAcquire及acquireQueued方法。
FairSync#tryAcquire以及UnFairSync#tryAcquire
公平锁与非公平锁的tryAcquire方法的实现逻辑不同。
公平锁FairSync#tryAcquire方法判断锁状态处于空闲的话,首先调用hasQueuedPredecessors方法,判断当前CLH队列中是否存在比当前线程等待时间更久的线程,没有的话才尝试获取锁资源(CAS方式修改锁状态)。
非公平锁UnFairSync#tryAcquire方法在判断锁处于空闲状态的话,直接尝试获取锁资源(CAS方式修改锁状态)。
不管是FairSync还是UnFairSync的tryAcquire方法,如果判断锁资源是被当前线程独占,则可以直接再次获取锁资源,锁状态state在当前值的基础上加1。这也就是可重入锁的意思,同一线程可以多次获得锁资源。
AbstractQueuedSynchronizer#acquireQueued
在tryAcquire获取锁资源失败后,调用acquireQueued方法,方法名很好的说明了方法的作用:通过排队获得锁资源。
方法的参数addWaiter(Node.EXCLUSIVE), arg),我们先看一下这个方法。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
代码其实很简单,首先将当前线程封装成Node,因为ReentrantLock是独占锁,所以Node的mode是独占。
然后如果队列不空的话,将当前节点的prev设置为尾结点(注意这个时候的操作不是CAS的,也没有获得锁),之后才利用CAS操作将尾结点设置为当前节点(到这一步当前节点才正式加入队列),返回。
否则如果队列空的话,调用enq方法把当前节点加入队列,enq方法通过自旋+CAS的方式将当前节点入队(加入到尾结点),如果当前队列空的话,为CLH队列创建一个空的头节点后再将当前节点作为尾结点加入到队列中。
值得关注的是enq方法的操作方式,与addWaiter在队列不空的时候尝试直接将当前节点加入队列尾部的操作逻辑一致(关注点1):操作是在没有获取到锁资源的前提下进行的,在不使用CAS的情况下首先将当前节点的prev指向尾结点,然后再尝试CAS改变队列的尾结点为当前节点。如果本次CAS操作失败(有其他线程已经加入队列,队列的尾结点有变化),则自旋再来一次,如此反复直到成功加入队列。
然后再来看acquireQueued方法:
先上代码:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//当前节点的前一个节点
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
定义了一个interrupted变量,表明当前线程排队挂起等待的过程中是否收到了中断信号。
判断当前节点的前一个节点如果是头节点的话,说明当前节点排队排到了,该出队获取锁资源了,所以就调用tryAcquire尝试,如果确实拿到锁资源了,则头节点出队,当前节点变成头节点,线程成功拿到了锁资源,返回interrupted。
否则,前一节点不是头节点的话,说明还得排队等待,所以首先调用shouldParkAfterFailedAcquire:方法名真的是太好了,建议大家学习一下,不要害怕方法名称太长,相比之下能够表达意思更重要!这个方法是判断在没有拿到锁资源后是否需要通过park挂起当前线程。
shouldParkAfterFailedAcquire的代码逻辑:
上一节点的waitStatus=Node.SIGNAL的话,说明上一节点处于等待状态,则返回true,可以挂起。
如果上一节点已经被取消(waitStatus>0)则循环检查所有取消状态的上一节点,将所有取消状态的上一节点全部出队列。
如果上一节点状态为0,则尝试CAS修改上一节点状态为Node.SIGNAL,返回false(还不能挂起,一直要等到上一节点状态为Node.SIGNAL才可以挂起),返回false之后调用方不挂起线程、在下次自旋过程中挂起。
修改上一节点状态为SIGNAL后才挂起是一种“负责任的挂起”态度(关注点2),是为了挂起之后能够顺利的被唤醒,因为锁占用线程在完成操作调用unlock释放锁资源之后,是针对队列中状态为Node.SIGNAL的节点做唤醒。
接下来,线程可以被挂起了,调用parkAndCheckInterrupt方法:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
很简单,挂起线程,(关注点3)被唤醒后返回线程的中断状态(挂起过程中如果收到中断信号则返回true,否则,没有收到中断信号则返回false)。
所以我们可以看到,如果线程排队、排队挂起等待锁资源的过程中如果收到了中断信号,则acquireQueued方法会返回true。
所以我们也就应该能够理解acquire方法中,调用acquireQueued方法返回为true的话,通过selfInterrupt()方法补发一个中断的原因了。
如果没有中断的话,继续自旋,再次判断当前节点的上一个节点是否是头节点,是的话就tryAcquire,不是的话就继续shouldParkAfterFailedAcquire、parkAndCheckInterrupt……如此反复,直到tryAcquire成功,获取到锁资源!
lock方法源码分析完毕!
ReentrantLock#unlock
unlock方法直接调用sync的release:
public void unlock() {
sync.release(1);
}
release方法是AQS实现的:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
首先调用ReentranceLock的tryRelease方法。之后检查CLH队列不空、且首节点waitStatus!=0的话(参见关注点2)则调用unparkSuccessor(h)。
先看tryRelease:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
tryRelease方法更新锁状态,如果更新后锁状态为0则释放锁setExclusiveOwnerThread(null),返回true。
释放锁资源成功,调用AQS的unparkSuccessor(h):
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus
如果当前节点(调用时传入的是头节点)waitStatus
一般来讲需要唤醒的节点就是当前节点的下一节点,但是如果下一节点被取消了,就只能找后面的没有被取消的节点。不过unparkSuccessor采用的算法是从尾结点向前找,一直找到当前节点,最后被找到的那个节点就是距离当前节点最近的那个未被取消的节点。
有一个疑问是:既然是双向队列,这里为什么要从尾结点向前查找,为什么不从当前节点向后查找?两者相比显然向后查找效率更高而且更容易理解。
建议结合关注点1来理解,因为节点加入队列的时候包含3步:
- 首先是设置当前节点的prev为原尾结点
- CAS修改尾结点为当前节点
- 修改原尾结点的next为当前节点
这3步操作并没有锁保护,3步一起看不是原子操作,只有单独看第2步是原子操作。所以执行完第1步、执行完第2步当前线程都有可能被cpu挂起。
为了方便说明问题,我们假设当前队列有H头节点,3个节点N1、N2、N3,N3是尾结点:
H -> N1 -> N2 -> N3(T)
假设线程1申请获取锁资源,但是锁资源被线程2独占了,线程1只能封装为N4申请加入队列。假设线程1的加入队列操作执行到上述第2步完成之后被挂起了。队列变成:
H -> N1 -> N2 -> N3 -> N4(T)
由于线程1还没有执行完加入队列的第3步,所以N4变成了尾结点,N4的prev是N3,但是N3的next是null。
假设这个时候碰巧,N1、N2、N3都被取消了。
好了,场景准备完成了。
线程2释放锁资源之后,该调用unparkSuccessor唤醒等待线程了。
如果向后查找,从头节点H开始,N1、N2、N3都被取消所以就被跳过了,N3的next是null,找不到尾结点N4……就出问题了。
推导一下向前查找应该是不存在这个问题的,这个算法应该是和加入队列算法(关注点1)匹配的。
好了,unparkSuccessor找到了应该被唤醒的排队节点后,唤醒该节点的线程,被唤醒的线程会从关注点3继续执行。
unlock分析完毕!
Condition#await
Conditon提供了await()、awaitUninterruptibly()、awaitNanos(long nanosTimeout)、await(long time, TimeUnit unit)、awaitUntil(Date deadline)等等待方法,其实主要也就是等待时长、可中断等待或不可中断等待的区别,代码逻辑大同小异。所以我们就以await()作为例子来分析。
AQS中包含了Condition接口的一个默认实现ConditionObject,await()方法是ConditionObject实现的:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
long savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
第一步,当前节点入Condition队列排队:调用addConditionWaiter方法,addConditionWaiter方法负责将当前线程封装为mode为CONDITION的Node,并将该Node放入Condition的队列中排队。
入Condition队列的逻辑也很简单,由lastWaiter指定的队尾节点的nextWaiter指定为当前节点,当前节点变为lastWaiter。addConditionWaiter同时也会清理队列中被取消的节点。
需要注意的是,Condition队列中的首节点不是空架子,是实实在在排队等待的线程!
第二步,用刚创建的节点node调用fullyRelease方法。
fullRelease方法的逻辑是:当前线程放弃对ReentrantLock的锁定,释放已经获取的锁资源。放弃锁资源的原因是,当前线程需要挂起等待,必须等其他线程唤醒之后才能继续执行操作,持有锁资源去睡觉显然是不合适的,是对锁资源的浪费,或许会造成死锁,所以就一定要释放锁资源,只有把锁资源交给其他线程,其他线程有机会获取锁资源,才有可能会对当前线程发出signal信号、重新唤醒当前线程。
fullRelease方法最终会调用release方法。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
首先调用tryRelease方法,成功则意味着当前线程释放掉了锁资源,直接检查CLH队列头节点如果不空、状态为等待唤醒的话,唤醒该节点。
第三步,循环检查刚刚加入到Condtion队列中的节点node如果不在CLH队列中,则挂起当前线程(关注点4)。直到当前线程被其他线程通过signal操作唤醒之后,则该节点node会被放入到CLH队列中(满足循环结束条件),结束循环。
第四步,已经结束Condition排队了,节点已经在CLH队列中了。我们知道这个时候当前线程必须要再次获取到锁资源才能继续。所以,使用当前节点node调用acquireQueued方法(这个方法前面分析过,我们已经很熟悉了)
第五步,如果Condition队列中当前节点node的下一节点nextWaiter不空的话,调用unlinkCancelledWaiters()清除被取消的节点。
第六步,挂起等待过程中如果发生中断的话,调用reportInterruptAfterWait处理中断。
await方法分析完成,接下来看signal方法。
Condition#signal
signal方法的调用场景是:当前业务完成操作之后,可能会导致在Condition对象的队列中排队的线程满足相应条件,因此需要调用该COndition对象的signal方法唤醒等待线程。
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
signal方法首先获取到队列中的第一个节点,然后调用doSignal方法:
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
首先处理好排队队列,首节点出队列,如果队列空的话,做相应的处理。
然后使用当前节点调用transferForSignal方法,如果transferForSignal返回false,则获取fisrt为firstWaiter(其实就是用队列中的下一个节点继续循环)。
transferForSignal方法
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
如果将当前节点从CONDITION状态修改为0失败,*表明当前节点状态不正确,可能被取消,或者已经被其他线程唤醒中(已经修改状态为0了),则返回false。
否则,如果状态修改成功,该节点入CLH队列,并检查CLH队列中该节点的上一节点如果已经被取消、或者修改上一节点状态为SIGNAL失败的话(特殊情况),直接唤醒当前节点的线程重新同步(关注点4),返回true。
上面所说的特殊情况下,上一节点可能已经被取消,或者已经被其他线程修改了状态,所以当前节点可能获得了出队列从而获得锁资源的机会,所以就唤醒线程做一次尝试。
否则,正常修改上一节点状态为SIGNAL后,当前节点加入CLH队列中正常排队,等待被ReentranceLock的unlock方法唤醒(唤醒后程序逻辑依然会到关注点4)!
小结
Finally,ReentranceLock以及Condition分析完了,欢迎指正错误!
Thanks a lot!
上一篇 Java并发编程 Lock Condition & ReentrantLock(一)
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net