Java并发之Lock原理解析

Java并发之Lock原理解析

同步器是实现锁的关键,利用同步器将锁的语义实现,然后在锁的实现中聚合同步器。可以这样理解:锁的API是面向使用者的,它定义了与锁交互的公共行为,而每个锁需要完成特定的操作也是透过这些行为来完成的(比如:可以允许两个线程进行加锁,排除两个以上的线程),但是实现是依托给同步器来完成;同步器面向的是线程访问和资源控制,它定义了线程对资源是否能够获取以及线程的排队等操作。锁和同步器很好的隔离了二者所需要关注的领域,严格意义上讲,同步器可以适用于除了锁以外的其他同步设施上(包括锁)。

AbstractQueuedSynchronizer提供了一个基于FIFO队列,可以用于构建锁或者其他相关同步装置(Lock, Semaphore, Latch, Barrier)的基础框架。AbstractQueuedSynchronizer的子类推荐作为为自定义同步装置的内部类,同步器自身没有实现任何同步接口,它仅仅是定义了若干acquire之类的方法来供使用。该同步器即可以作为互斥模式也可以作为共享模式,当它被定义为一个互斥模式时,其他线程对其的获取就被阻止,而共享模式对于多个线程获取都可以成功。

AbstractQueuedSynchronizer的数据结构

AbstractQueuedSynchronizer利用了一个int来表示状态,期望它能够成为实现大部分同步需求的基础。使用的方法是继承,子类通过继承同步器并需要实现它的方法来管理其状态,管理的方式就是通过类似acquire和release的方式来操纵状态。然而多线程环境中对状态的操纵必须确保原子性,因此子类对于状态的把握,需要使用这个同步器提供的以下三个方法对状态进行操作:

1
2
3
java.util.concurrent.locks.AbstractQueuedSynchronizer.getState()
java.util.concurrent.locks.AbstractQueuedSynchronizer.setState(int)
java.util.concurrent.locks.AbstractQueuedSynchronizer.compareAndSetState(int, int)

类似模板模式,AQS暴露给子类一些方法实现(如tryAcquire,tryRelease), 获取操作通常是依赖state状态的,当状态允许时获取成功否则加入等待队列一直等待到允许的状态发生时重新获取,(读锁的state是资源许可数,写锁是重入数)例如,Semaphore中当state(即许可数量)小于等于0时,获取不能成功。释放操作通过会修改状态并判断是否能让其他等待的线程能够重新获取,例如ReentrantLock中释放会减少重入次数并检查重入次数是否达到了0,达到0说明已经解锁,这时会通知其他等待线程重新获取。

同步器的实现依赖于一个FIFO队列,那么队列中的元素Node就是保存着线程引用和线程状态的容器,每个线程对同步器的访问,都可以看做是队列中的一个节点。Node的主要包含以下成员变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class Node {
// 一个标识符,指示节点使用共享模式等待
static final Node SHARED = new Node();
// 一个标识符,指示节点使用独占模式等待
static final Node EXCLUSIVE = null;
// 等待状态值,表示线程已被取消
static final int CANCELLED = 1;
// 等待状态值,表示继任的线程需要取消阻塞
// 当前节点的后继节点已经通过park阻塞,因此当当前节点释放或取消时必须unpark他的后继节点
static final int SIGNAL = -1;
// 等待状态值,表示线程由于阻塞而等待
static final int CONDITION = -2;
// 等待状态值,表示处于共享模式下,下一次的acquireShared无条件的传播
static final int PROPAGATE = -3;
// 节点当前等待状态值, 0:None of the above
volatile int waitStatus;
// 前驱节点
volatile Node prev;
// 后继节点
volatile Node next;
// 当前线程
volatile Thread thread;
// 指向下一个处于阻塞等待的节点
Node nextWaiter;
}

线程等待队列

AQS的核心是一个线程等待队列,采用的是一个先进先出FIFO队列,AQS基于CLH进行修改作为线程等待队列。

CLH队列使用pred引用前节点形成一个队列,入队enqueue和出队dequeue操作都可以通过原子操作完成。初始化时,head和tail都会指向一个dummy节点,CLH队列的入队一个新的Node可以用伪代码来表示:

1
2
3
do{
node.pred = tail;
} while(!tail.compareAndSet(pred, node));

每个节点的释放状态由前置节点存放,所以spinlock的自旋可以表示为:

1
2
3
while(node.pred.status != RELEASED) {
// spin
}

出队可以通过更新head来完成,通过将head设置为刚刚获得锁的节点head = node

CLH队列有很多优点,包括入队和出队非常快、无锁、非阻塞,并且是否有线程等待只需要判断head和tail是否相等。
CLH中并没有只有向后的指针引用后继节点,每个节点只需要修改自己的状态就能通知后继节点。但是在AQS这样的阻塞同步器中,需要主动唤醒(unpark)后继节点。所以在AQS中增加了next引用引用后继节点,但是并没有合适的方法使用CAS进行无锁插入双向链表的方法,所以节点的插入并不是原子完成的,需要在CAS替换tail完成后调用pred.next=node。

1
2
3
4
5
6
7
8
9
while(true) {
t = tail;
node.pred = t;
if (tail.compareAndSet(t, node)) {
t.next = node;
break;
}
}
// 详细代码见 enq())

所以next节点为null并不能表示这个节点是尾节点,next引用只能当做是一个优化路径,当next节点不存在或是被撤销时,需要从tail开始使用pred进行反向遍历来进行精确的check。

另外一个改动是使用Node中的status状态来控制阻塞而不是像CLH中控制自旋,但是在AQS中这个状态并不能表示这个线程能从acquire操作中返回,在AQS中排队的线程只有在通过了子类实现的tryAcquire方法后才能从acquire中返回,所以这个阻塞状态只能说明这个活跃线程队列的头结点的时候可以去调用tryAcquire方法,如果失败了还需要重新阻塞。

AbstractQueuedSynchronizer暴露的API

AQS仅仅只是提供独占锁和共享锁两种方式,但是每种方式都有响应中断和不响应中断的区别,所以说AQS锁的更细粒度的划分为:

  • 不响应中断的独占锁(acquire)
  • 响应中断的独占锁(acquireInterruptibly)
  • 不响应中断的共享锁(acquireShared)
  • 响应中断的共享锁(acquireSharedInterruptibly)

而释放锁的方式只有两种,独占锁的释放与共享锁的释放。分别为:

  • 独占锁的释放(release)
  • 共享锁的释放(releaseShared)

“不响应中断的独占锁”模式

在AQS中,“不响应中断的独占锁”模式的获取锁的的入口是以下方法:

1
2
3
4
5
6
7
8
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

static void selfInterrupt() {
Thread.currentThread().interrupt();
}

tryAcquire方法是子类实现的一个方法,如果tryAcquire返回的是true,即表明当前线程获得了锁,自然也就不需要构建数据结构进行阻塞等待。如果tryAcquire方法返回的是false,那么当前线程没有获得锁,接着执行”acquireQueued(addWaiter(Node.EXCLUSIVE), arg))”这句代码,这句话很明显,它是由两步构成的:

  1. addWaiter(Node.EXCLUSIVE), arg),将当前线程封装成一个节点,添加到“等待锁的线程队列”中去。
  2. acquireQueued,当前线程所在节点目前在“等待锁的线程队列”中,当前线程仍然尝试从等待队列中去获取锁。

具体分析addWaiter(Node.EXCLUSIVE), arg)流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

我们给addWaiter传递的参数是Node.EXCLUSIVE,从传入的参数我们知道这是独占模式的。并且获取当前线程thread,将当前线程thread以及EXCLUSIVE独占模式,构造成一个节点Node。构造完成之后,需要入队,即加入到“等待锁的线程队列CLH”中。

如何入队?首先尝试的是快速入队。何为快速入队?直接把我们刚才构造的node的前驱指针指向当前尾节点,然后通过CAS操作把我们刚才构造的node作为新的尾节点,最后再把原来老的尾节点的后继指针指向现在的新的尾节点。说了那么多,那么快速入队的大前提是什么?那就是这个“等待锁的线程队列CLH”必须先存在。

如果不存在,那么只能走常规的入队操作流程,也就是进入到enq(node)方法中。从这里我们也可以知道,其实enq(node)在AQS的整个生命周期中只会执行一次,因为只有第一次初始化“等待锁的线程队列CLH”才会走到这里,后来的入队都会走“快速入队”流程。假设我们这里还没有“等待锁的线程队列CLH”,即当前的tail节点为null,那么就会进入enq(node)方法。

下面我们看下enq(node)方法的源代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

如果尾节点为空,即当前数据结构中没有节点,那么new一个不带任何状态的Node作为头节点,并且将head赋值给tail。如果尾节点不为空,那么并发下使用CAS算法将当前Node追加成为尾节点,由于是一个for(;;)循环,因此所有没有成功acquire的Node最终都会被追加到数据结构中。

acquireQueued(final Node node, int arg)流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取node的前驱节点
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
//前驱是头结点且获得锁
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node); // Cancels an ongoing attempt to acquire.
}
}

private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}

如果当前线程所在节点的前继节点是head节点,那么当前节点就再次的tryAcquire了一次。如果当前线程所在节点tryAcquire成功了,那么执行setHead方法,将当前节点作为head、将当前节点中的thread设置为null、将当前节点的prev设置为null,这保证了数据结构中头结点永远是一个不带Thread的空节点。

如果当前线程所在节点的前继节点不是head节点或者当前线程所在节点tryAcquire失败了,那么它就需要判断自己需不需要进行阻塞了,因为至少到目前为止,它真的没有机会再去获取锁了。当前线程进行阻塞的大前提是,需要寻找一个前继节点的waitStatus为SIGNAL的节点,这是AQS约定的。只有自己节点的前继节点的waitStatus是SIGNAL,我这个节点才可以安心的去阻塞。因为我的前继节点的waitStatus是SIGNAL,就相当于我告诉了我的前继节点,我将要去阻塞了。

shouldParkAfterFailedAcquire(Node pred, Node node)源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* 前驱节点终止了. 跳过前驱节点,再尝试
* 此处删除了无效的线程终止节点
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 找到一个前驱节点waitStats不是CANCELLED的并且最靠近head节点的那一个为止
// 用CAS机制把前驱节点的waitStatus更新为SIGNAL状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
  1. 这里会判断当前节点的前驱节点的状态,如果当前节点的前驱节点的waitStatus是SIGNAL,返回true,表示当前节点应当park。这个时候就会调用parkAndCheckInterrupt()方法.当前线程就会被阻塞住。从这个方法还可以看出,如果这个线程被唤醒了,这个线程自己会返回在它阻塞期间有没有被中断过。需要注意的是,Thread.interrupted()会清除当前线程的中断标记位。在acquireQueued(final Node node, int arg)方法中,如果这个线程被唤醒了,并且曾经在阻塞期间被中断过,就将中断标识符interrupted置为true。接着线程又会进入acquireQueued(final Node node, int arg)的for循环中。如果当前这个被唤醒的线程是正常被唤醒的,那么它的前继节点就应该是head,这个时候当前被唤醒的线程就会执行tryAcquire方法去获取锁。如果假设它获取锁成功了,那么它会把自己设置为head节点,并且把head节点的持有线程设置为null,以保持head节点是dummy节点,接着当前线程就去做自己的业务了。

  2. 如果当前节点的前驱节点的waitStatus>0,相当于CANCELLED(因为状态值里面只有CANCELLED是大于0的),那么CANCELLED的节点作废,当前节点不断向前找并重新连接为双向队列,直到找到一个前驱节点waitStats不是CANCELLED的并且最靠近head节点的那一个为止。它的前驱节点不是SIGNAL状态且waitStatus<=0,利用CAS机制把前驱节点的waitStatus更新为SIGNAL状态。在这种情况下parkAndCheckInterrupt返回的是false,也就是说当前节点持有的线程还是不死心,它还需要最后一次tryAcquire,这也是它最后的一次挣扎的机会了。如果这一次失败了,就必须进行阻塞。

这里针对acquire做一下总结:

  1. 状态的维护;需要在锁定时,需要维护一个状态(int类型),而对状态的操作是原子和非阻塞的,通过同步器提供的对状态访问的方法对状态进行操纵,并且利用compareAndSet来确保原子性的修改。
  2. 状态的获取;一旦成功的修改了状态,当前线程或者说节点,就被设置为头节点。
  3. sync队列的维护。在获取资源未果的过程中条件不符合的情况下(不该自己,前驱节点不是头节点或者没有获取到资源)进入睡眠状态,停止线程调度器对当前节点线程的调度。这时引入的一个释放的问题,也就是说使睡眠中的Node或者说线程获得通知的关键,就是前驱节点的通知,而这一个过程就是释放,释放会通知它的后继节点从睡眠中返回准备运行。

abstractqueuedsynchronizer-acquire

如上图所示,其中的判定退出队列的条件,判定条件是否满足和休眠当前线程就是完成了自旋spin的过程。

下面来看看release()操作

此方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。下面是release()的源码:

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

调用tryRelease来释放资源。有一点需要注意的是,它是根据tryRelease的返回值来判断该线程是否已经完成释放掉资源了!所以自定义同步器在设计tryRelease的时候要明确这一点!!跟tryAcquire一样,这个方法是需要独占模式的自定义同步器去实现的。正常来说,tryRelease都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可(state-=arg),也不需要考虑线程安全的问题。但要注意它的返回值,上面已经提到了,release是根据tryRelease的返回值来判断该线程是否已经完成释放掉资源了!所以自定义同步器在实现时,如果已经彻底释放资源(state=0),要返回true,否则返回false。如果没有彻底释放资源,也就是出现了重入的情况,需要多次释放。如果释放成功了,我们就需要唤醒head节点的下一个节点所持有的线程。基本逻辑如下:

  1. 首先拿到head节点,判断head节点不等于null,并且head节点的waitStatus是不等于0的话,就去唤醒head节点的下一个节点所持有的线程。
  2. 调用unparkSuccessor(Node node)方法唤醒head节点的下一个节点所持有的线程。

接下来看下unparkSuccessor(Node node)方法的源代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

用unpark()唤醒等待队列中最前边的那个未放弃的线程,这里我们也用s来表示吧。此时,再和acquireQueued()联系起来,s被唤醒后,进入if (p == head && tryAcquire(arg))的判断(即使p!=head也没关系,它会再进入shouldParkAfterFailedAcquire()寻找一个安全点。这里既然s已经是等待队列中最前边的那个未放弃的线程了,那么通过shouldParkAfterFailedAcquire()的调整,s也必然会跑到head的next结点,下一次自旋p==head就成立啦),然后s把自己设置成head标杆结点,表示自己已经获取到资源了,acquire()也返回了!

“响应中断的独占锁”模式

1
2
3
4
5
6
7
8
// 用于实现Lock的lockInterruptibly方法
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}

tryAcquire方法前面说过了,是子类实现的一个方法,如果tryAcquire返回的是true,即表明当前线程获得了锁,自然也就不需要构建数据结构进行阻塞等待。如果tryAcquire方法返回的是false,那么当前线程没有获得锁,接着执行”doAcquireInterruptibly(int arg)”这句代码。所有的这一切都是基于当前线程没有被interrupted的。

doAcquireInterruptibly(int arg)流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException(); // 此处是区别,响应中断
}
} finally {
if (failed)
cancelAcquire(node);
}
}

与上面提到的acquireQueued()方法的区别在于阻塞的线程被唤醒后会抛出中断异常。

“不响应中断的共享锁”模式

共享模式和之前的独占模式有所区别。以文件的查看为例,如果一个程序在对其进行读取操作,那么这一时刻,对这个文件的写操作就被阻塞,相反,这一时刻另一个程序对其进行同样的读操作是可以进行的。如果一个程序在对其进行写操作,那么所有的读与写操作在这一时刻就被阻塞,直到这个程序完成写操作。

“不响应中断的共享锁”模式的获取锁的的入口是以下方法:

1
2
3
4
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

tryAcquireShared是共享方式尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。如果tryAcquireShared返回的是负数,即表明当前线程获取锁失败,自然也就需要构建数据结构进行阻塞等待,此时需要进入到doAcquireShared方法了。如果tryAcquireShared方法返回的是正数,那么当前线程已经获得了锁,则直接跳过,去执行自己的业务。

如果tryAcquireShared返回的是负数, 那么流程会走doAcquireShared方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
  • 在方法内部,我们将当前线程封装成了一个Node.SHARED节点。构造完成之后,将该节点入队;
  • 如果当前线程所在节点的前继节点是head节点,那么当前节点就再次的tryAcquireShared了一次。如果tryAcquireShared的返回值是大于等于0的,就证明当前线程成功的获取了共享锁,那么执行setHeadAndPropagate方法,将当前节点作为head、将当前节点中的thread设置为null、将当前节点的prev设置为null,这保证了数据结构中头结点永远是一个不带Thread的空节点。其中setHeadAndPropagate方法的第二个参数是tryAcquireShared方法的返回值,我们知道: 负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}

private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}

当propagate大于0的时候,就证明我们现在共享锁的资源充足,可能目前有线程阻塞在队列中,我们需要去唤醒当前节点的下一个节点,这就是共享锁唤醒的传播性。

  • 如果当前线程所在节点的前继节点不是head节点,那么它就需要判断自己需不需要进行阻塞了,因为至少到目前为止,它真的没有机会再去获取锁了。当前线程进行阻塞的大前提是,需要寻找一个前继节点的waitStatus为SIGNAL的节点,这是AQS约定的。只有自己节点的前继节点的waitStatus是SIGNAL,我这个节点才可以安心的去阻塞。因为我的前继节点的waitStatus是SIGNAL,就相当于我告诉了我的前继节点,我将要去阻塞了,到时候请唤醒我。

releaseShared()是共享模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

释放掉资源后,唤醒后继。跟独占模式下的release()相似,但有一点稍微需要注意:独占模式下的tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于独占下可重入的考量;而共享模式下的releaseShared()则没有这种要求,共享模式实质就是控制一定量的线程并发执行,那么拥有资源的线程在释放掉部分资源时就可以唤醒后继等待结点。

例如,资源总量是13,A(5)和B(7)分别获取到资源并发运行,C(4)来时只剩1个资源就需要等待。A在运行过程中释放掉2个资源量,然后tryReleaseShared(2)返回true唤醒C,C一看只有3个仍不够继续等待;随后B又释放2个,tryReleaseShared(2)返回true唤醒C,C一看有5个够自己用了,然后C就可以跟A和B一起运行。而ReentrantReadWriteLock读锁的tryReleaseShared()只有在完全释放掉资源(state=0)才返回true,所以自定义同步器可以根据需要决定tryReleaseShared()的返回值。

“响应中断的共享锁”模式

“响应中断的共享锁”模式的获取锁的的入口是以下方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException(); // 此处不同的处理
}
} finally {
if (failed)
cancelAcquire(node);
}
}

参考资料:

  • Doug Lea’s AQS paper
-------------本文结束感谢您的阅读-------------
坚持分享,您的支持将鼓励我继续创作!
0%