Java并发之Lock功能分析

Java并发之Lock功能分析

可重入锁

ReentrantLock实现了Lock接口,Lock接口中定义了lock与unlock相关操作,并且还存在newCondition方法,表示生成一个条件。

重入锁相比 synchronized 有哪些优势:

  • 可以在线程等待锁的时候中断线程(lockInterruptibly),synchronized 是做不到的。
  • 可以尝试获取锁,如果获取不到就放弃,或者设置一定的时间,这也是 synchroized 做不到的。
  • 可以设置公平锁,synchronized 默认是非公平锁,无法实现公平锁。

公平与非公平

ReentrantLock类内部总共存在Sync、NonfairSync、FairSync三个类,NonfairSync与FairSync类继承自Sync类,Sync类继承自AbstractQueuedSynchronizer抽象类。ReentrantLock还提供了公平锁和非公平锁的选择,构造方法接受一个可选的公平参数(默认非公平锁),当设置为true时,表示公平锁,否则为非公平锁。公平锁与非公平锁的区别在于公平锁的锁获取是有顺序的。但是公平锁的效率往往没有非公平锁的效率高,在许多线程访问的情况下,公平锁表现出较低的吞吐量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
abstract static class Sync extends AbstractQueuedSynchronizer {
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();

/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 可以获得锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 重入
int nextc = c + acquires;
if (nextc < 0) // overflow 锁的最大上限int.max
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
...
}

NonfairSync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread()); // 非公平锁,不参与队列排队,尝试直接插队
else
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

FairSync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1); // 公平锁,从队列head开始
}

/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

读写锁

所谓读写锁,是对访问资源共享锁和排斥锁,一般的重入性语义为 如果对资源加了写锁,其他线程无法再获得写锁与读锁,但是持有写锁的线程,可以对资源加读锁(锁降级);如果一个线程对资源加了读锁,其他线程可以继续加读锁。java.util.concurrent.locks中关于多写锁的接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading.
*/
Lock readLock();

/**
* Returns the lock used for writing.
*
* @return the lock used for writing.
*/
Lock writeLock();
}

ReentrantReadWriterLock通过两个内部类实现Lock接口,分别是ReadLock,WriterLock类。与ReentrantLock一样,ReentrantReadWriterLock同样使用自己的内部类Sync(继承AbstractQueuedSynchronizer)实现CLH算法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/*
* Read vs write count extraction constants and functions.
* Lock state is logically divided into two unsigned shorts:
* The lower one representing the exclusive (writer) lock hold count,
* and the upper the shared (reader) hold count.
*/

static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

ReentrantReadWriterLock使用一个32位的int类型来表示锁被占用的线程数(AbstractQueuedSynchronizer中的state),高16位用来表示读锁占有的线程数量,用低16位表示写锁被同一个线程申请的次数。

  • SHARED_SHIFT,表示读锁占用的位数,常量16
  • SHARED_UNIT,增加一个读锁,按照上述设计,就相当于增加 SHARED_UNIT;
  • MAX_COUNT,表示申请读锁最大的线程数量,为65535
  • EXCLUSIVE_MASK,表示计算写锁的具体值时,该值为 15个1,用 getState & EXCLUSIVE_MASK算出写锁的线程数,大于1表示重入。

比如,现在当前,申请读锁的线程数为13个,写锁一个,那state怎么表示?

上文说过,用一个32位的int类型的高16位表示读锁线程数,13的二进制为1101,那state的二进制表示为 00000000 00001101 00000000 00000001,十进制数为851969, 接下在具体获取锁时,需要根据这个851968这个值得出上文中的 13 与 1。要算成13,只需要将state 无符号向左移位16位置,得出00000000 00001101,就出13,根据851969要算成低16位置,只需要用该00000000 00001101 00000000 00000001 & 111111111111111(15位),就可以得出00000001,就是利用了1&1得1,1&0得0这个技巧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* The number of reentrant read locks held by current thread.
* Initialized only in constructor and readObject.
* Removed whenever a thread's read hold count drops to 0.
*/
private transient ThreadLocalHoldCounter readHolds;

/**
* The hold count of the last thread to successfully acquire
* readLock. This saves ThreadLocal lookup in the common case
* where the next thread to release is the last one to
* acquire. This is non-volatile since it is just used
* as a heuristic, and would be great for threads to cache.
*
* <p>Can outlive the Thread for which it is caching the read
* hold count, but avoids garbage retention by not retaining a
* reference to the Thread.
*
* <p>Accessed via a benign data race; relies on the memory
* model's final field and out-of-thin-air guarantees.
*/
private transient HoldCounter cachedHoldCounter;

/**
* firstReader is the first thread to have acquired the read lock.
* firstReaderHoldCount is firstReader's hold count.
*
* <p>More precisely, firstReader is the unique thread that last
* changed the shared count from 0 to 1, and has not released the
* read lock since then; null if there is no such thread.
*
* <p>Cannot cause garbage retention unless the thread terminated
* without relinquishing its read locks, since tryReleaseShared
* sets it to null.
*
* <p>Accessed via a benign data race; relies on the memory
* model's out-of-thin-air guarantees for references.
*
* <p>This allows tracking of read holds for uncontended read
* locks to be very cheap.
*/
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;

上述这4个变量,其实就是完成一件事情,将获取读锁的线程放入线程本地变量(ThreadLocal),方便从整个上 下文,根据当前线程获取持有锁的次数信息。其实 firstReader,firstReaderHoldCount ,cachedHoldCounter 这三个变量就是为readHolds变量服务的,是一个优化手段,尽量减少直接使用readHolds.get方法的次数,firstReader与firstReadHoldCount保存第一个获取读锁的线程,也就是readHolds中并不会保存第一个获取读锁的线程;cachedHoldCounter 缓存的是最后一个获取线程的HolderCount信息,该变量主要是在如果当前线程多次获取读锁时,减少从readHolds中获取HoldCounter的次数。

sync

1
2
3
4
5
6
7
8
9
10
11
12
13
 /**
* Returns true if the current thread, when trying to acquire
* the read lock, and otherwise eligible to do so, should block
* because of policy for overtaking other waiting threads.
*/
abstract boolean readerShouldBlock();

/**
* Returns true if the current thread, when trying to acquire
* the write lock, and otherwise eligible to do so, should block
* because of policy for overtaking other waiting threads.
*/
abstract boolean writerShouldBlock();

Sync中提供了很多方法,但是有两个方法是抽象的,子类必须实现。

公平

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
//// --> call AbstractQueuedSynchronizer.hasQueuedPredecessors
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

writerShouldBlock和readerShouldBlock方法都表示当有别的线程也在尝试获取锁时,是否应该阻塞。
对于公平模式,hasQueuedPredecessors()方法表示前面是否有等待线程。一旦前面有等待线程,那么为了遵循公平,当前线程也就应该被挂起。

非公平

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive(); //该方法,具体又是在 AbstractQueuedSynchronizer中
}
//// --> call AbstractQueuedSynchronizer.apparentlyFirstQueuedIsExclusive
/**
* Returns {@code true} if the apparent first queued thread, if one
* exists, is waiting in exclusive mode. If this method returns
* {@code true}, and the current thread is attempting to acquire in
* shared mode (that is, this method is invoked from {@link
* #tryAcquireShared}) then it is guaranteed that the current thread
* is not the first queued thread. Used only as a heuristic in
* ReentrantReadWriteLock.
*/
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}

可以看到,非公平模式下,writerShouldBlock直接返回false,说明不需要阻塞;

而readShouldBlock调用了apparentFirstQueuedIsExcluisve()方法。即判断阻塞队列中 head 的第一个后继节点是否是来获取写锁的,如果是的话,让这个写锁先来,避免写锁饥饿。

如果当前占据的锁是读锁,那么紧接着的下一个读线程不应排队,排队s若不为null,!s.isShared()一定是true。当前线程需要排队。
如果当前占据的锁是写锁,那么一定是当前线程占据了写锁,同时在申请读锁。如果此时下一个是写线程在等待,当前线程需要排队。

排队的原因,在注释中有说,为了防止等待写锁线程的无限饥饿

ReadLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
* Acquires the read lock.
*
* <p>Acquires the read lock if the write lock is not held by
* another thread and returns immediately.
*
* <p>If the write lock is held by another thread then
* the current thread becomes disabled for thread scheduling
* purposes and lies dormant until the read lock has been acquired.
*/
public void lock() {
sync.acquireShared(1);
}
//// --> call AbstractQueuedSynchronizer.acquireShared
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
// 在 AQS 中,如果 tryAcquireShared(arg) 方法返回值小于 0 代表没有获取到共享锁(读锁),大于 0 代表获取到
}
//// --> call ReentrantReadWriteLock.Sync.tryAcquireShared
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState(); // 获取当前锁状态
if (exclusiveCount(c) != 0 && // 有线程已经抢占了写锁
getExclusiveOwnerThread() != current) // 写线程不是当前线程
return -1; // 返回-1,进入lock等待队列
int r = sharedCount(c); // 获取当前读线程数
if (!readerShouldBlock() && // 当前读线程是否应当排队
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) { // cas 新增读状态成功
if (r == 0) { // 如果r=0, 表示,当前线程为第一个获取读锁的线程
firstReader = current;
firstReaderHoldCount = 1; // 如果第一个获取读锁的对象为当前对象,将firstReaderHoldCount 加一
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// 如果不是第一个获取多锁的线程,将该线程持有锁的次数信息,放入线程本地变量中,方便在整个请求上下文(请求锁、释放锁等过程中)使用持有锁次数信息
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1; // 成功获取锁
}
return fullTryAcquireShared(current);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
* Full version of acquire for reads, that handles CAS misses
* and reentrant reads not dealt with in tryAcquireShared.
*/
// 第一次尝试获取读锁失败后
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// 如果当前线程不是写锁的持有者,直接返回-1,
// 结束尝试获取读锁,需要排队去申请读锁。
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// readerShouldBlock() 为 true,说明阻塞队列中有其他线程在等待
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// firstReader 线程重入读锁,直接到下面的 CAS
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
// 如果发现 count == 0,也就是说,纯属上一行代码初始化的,那么执行 remove
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) { // cas获取读锁
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// 下面这几行,就是将 cachedHoldCounter 设置为当前线程
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++; // 计数+1
cachedHoldCounter = rh; // cache for release
}
return 1; // 成功获得锁
}
}

下面来看看读锁的释放:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
* Attempts to release this lock.
*
* <p>If the number of readers is now zero then the lock
* is made available for write lock attempts.
*/
public void unlock() {
sync.releaseShared(1);
}
//// --> call AbstractQueuedSynchronizer.releaseShared
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared(); // 唤醒后继节点
return true;
}
return false;
}
//// --> call ReentrantReadWriteLock.Sync.tryReleaseShared
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1) //如果等于 1,那么这次解锁后就不再持有锁了,把 firstReader 置为 null,给后来的线程用
firstReader = null;
else
firstReaderHoldCount--;
} else {
// 判断 cachedHoldCounter 是否缓存的是当前线程,不是的话要到 ThreadLocal 中取
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
// 这一步将 ThreadLocal remove 掉,防止内存泄漏。因为已经不再持有读锁了
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
//nextc 是 state 高 16 位减 1 后的值
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
// 如果 nextc == 0,那就是 state 全部 32 位都为 0,也就是读锁和写锁都空了
// 此时这里返回 true 的话,其实是帮助唤醒后继节点中的获取写锁的线程
return nextc == 0;
}
}

WriteLock

写锁是独占锁。如果有读锁被占用,写锁获取是要进入到阻塞队列中等待的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// WriteLock
public void lock() {
sync.acquire(1);
}
// AQS
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
// 如果 tryAcquire 失败,那么进入到阻塞队列等待
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

// Sync
protected final boolean tryAcquire(int acquires) {

Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {

// 看下这里返回 false 的情况:
// c != 0 && w == 0: 写锁可用,但是有线程持有读锁(也可能是自己持有)
// c != 0 && w !=0 && current != getExclusiveOwnerThread(): 其他线程持有写锁
// 也就是说,只要有读锁或写锁被占用,这次就不能获取到写锁
if (w == 0 || current != getExclusiveOwnerThread())
return false;

if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");

// 这里不需要 CAS,仔细看就知道了,能到这里的,只可能是写锁重入,不然在上面的 if 就拦截了
setState(c + acquires);
return true;
}

// 如果写锁获取不需要 block,那么进行 CAS,成功就代表获取到了写锁
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}

// unlock
public void unlock() {
sync.release(1);
}

// AQS
public final boolean release(int arg) {
// 1. 释放锁
if (tryRelease(arg)) {
// 2. 如果独占锁释放"完全",唤醒后继节点
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

// Sync
// 释放锁,是线程安全的,因为写锁是独占锁,具有排他性
// 实现很简单,state 减 1 就是了
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
// 如果 exclusiveCount(nextc) == 0,也就是说包括重入的,所有的写锁都释放了,
// 那么返回 true,这样会进行唤醒后继节点的操作。
return free;
}

锁降级

Doug Lea 没有说写锁更高级,如果有线程持有读锁,那么写锁获取也需要等待。

不过从源码中也可以看出,确实会给写锁一些特殊照顾,如非公平模式下,为了提高吞吐量,lock 的时候会先 CAS 竞争一下,能成功就代表读锁获取成功了,但是如果发现 head.next 是获取写锁的线程,就不会去做 CAS 操作。

Doug Lea 将持有写锁的线程,去获取读锁的过程称为锁降级(Lock downgrading)。这样,此线程就既持有写锁又持有读锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void processCachedData() {
// 获取读锁
rwl.readLock().lock();
if (!cacheValid) { // 如果缓存过期了,或者为 null
// 释放掉读锁,然后获取写锁 (后面会看到,没释放掉读锁就获取写锁,会发生死锁情况)
rwl.readLock().unlock();
rwl.writeLock().lock();

try {
if (!cacheValid) { // 重新判断,因为在等待写锁的过程中,可能前面有其他写线程执行过了
data = ...
cacheValid = true;
}
// 获取读锁 (持有写锁的情况下,是允许获取读锁的,称为 “锁降级”,反之不行。)
rwl.readLock().lock();
} finally {
// 释放写锁,此时还剩一个读锁
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}
try {
use(data);
} finally {
// 释放读锁
rwl.readLock().unlock();
}
}

Condition

条件(也称为条件队列 或条件变量)为线程提供了一个含义,以便在某个状态条件现在可能为 true 的另一个线程通知它之前,一直挂起该线程(即让其“等待”)。因为访问此共享状态信息发生在不同的线程中,所以它必须受保护,因此要将某种形式的锁与该条件相关联。

1
2
3
4
5
6
7
8
9
10
public interface Condition {
void await() throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
void awaitUninterruptibly();
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}

以上是Condition接口定义的方法,await*对应于Object.wait,signal对应于Object.notify,signalAll对应于Object.notifyAll。特别说明的是Condition的接口改变名称就是为了避免与Object中的wait/notify/notifyAll的语义和使用上混淆,因为Condition同样有wait/notify/notifyAll方法。

每一个Lock可以有任意数据的Condition对象,Condition是与Lock绑定的,所以就有Lock的公平性特性:如果是公平锁,线程为按照FIFO的顺序从Condition.await中释放,如果是非公平锁,那么后续的锁竞争就不保证FIFO顺序了。

等待队列

等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。

一个Condition包含一个等待队列,Condition拥有首节点(firstWaiter)和尾节点(lastWaiter)。当前线程调用Condition.await()方法,将会以当前线程构造节点,并将节点从尾部加入等待队列。Condition拥有首尾节点的引用,而新增节点只需要将原有的尾节点nextWaiter指向它,并且更新尾节点即可。上述节点引用更新的过程并没有使用CAS保证,原因在于调用await()方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的Lock(更确切地说是同步器)拥有一个同步队列和多个等待队列。

condition

await & signal|signalAll

await-signal

线程awaitThread先通过lock.lock()方法获取锁成功后调用了condition.await方法进入等待队列,而另一个线程signalThread通过lock.lock()方法获取锁成功后调用了condition.signal或者signalAll方法,使得线程awaitThread能够有机会移入到同步队列中,当其他线程释放lock后使得线程awaitThread能够有机会获取lock,从而使得线程awaitThread能够从await方法中退出执行后续操作。如果awaitThread获取lock失败会直接进入到同步队列。

condition 源码分析

Condition实例的产生方式是newCondition()方法,通过追踪方法。我们可以发现Condition的生产方法调用链是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// ReentrantLock
public Condition newCondition() {
return sync.newCondition();
}
// --> sync.newCondition()
final ConditionObject newCondition() {
return new ConditionObject();
}
// --> AbstractQueuedSynchronizer.ConditionObject.<init>
public class ConditionObject implements Condition, java.io.Serializable {
// 头结点
private transient Node firstWaiter;
// 尾节点
private transient Node lastWaiter;
public ConditionObject() { }
// ...
}

先来看看 await() 方法

调用Condition的await()方法(或者以await开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()方法返回时,当前线程一定获取了Condition相关联的锁。如果从队列(同步队列和等待队列)的角度看await()方法,当调用await()方法时,相当于同步队列的首节点(获取了锁的节点)移动到Condition的等待队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 当前线程加入等待队列
Node node = addConditionWaiter();
// 释放同步状态,也就是释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 释放完毕后,遍历AQS的队列,看当前节点是否在队列中,
//不在,说明它还没有竞争锁的资格,所以继续将自己沉睡。
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 被唤醒后,重新开始正式竞争锁,同样,如果竞争不到还是会将自己沉睡,等待唤醒重新开始竞争
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

addConditionWaiter

addConditionWaiter方法主要用于调用Condition.await 时将当前节点封装成 一个Node, 加入到 Condition Queue里面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
// 调用 unlinkCancelledWaiters 对 "waitStatus != Node.CONDITION" 的节点进行删除
//(在Condition里面的Node的waitStatus 要么是CONDITION(正常), 要么就是 (signal/timeout/interrupt))
unlinkCancelledWaiters();
t = lastWaiter;
}
// 封装node并放到尾节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

这里有1个问题, 何时出现 t.waitStatus != Node.CONDITION

  • ConditionObject.await ->
  • checkInterruptWhileWaiting ->
  • transferAfterCancelledWait “compareAndSetWaitStatus(node, Node.CONDITION, 0)”

导致这种情况一般是 线程中断或 await 超时。注意: 当Condition进行 awiat 超时或被中断时, Condition里面的节点是没有被删除掉的, 需要其他 await 在将线程加入 Condition Queue 时调用addConditionWaiter而进而删除, 或 await 操作差不多结束时, 调用 “node.nextWaiter != null” 进行判断而删除 (通过 signal 进行唤醒时 node.nextWaiter 会被置空, 而中断和超时时不会)。

unlinkCancelledWaiters

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void unlinkCancelledWaiters(){
Node t = firstWaiter;
Node trail = null;
while(t != null){
Node next = t.nextWaiter;
// 1. 先初始化 next 节点
if(t.waitStatus != Node.CONDITION){
// 2. 节点无效
t.nextWaiter = null;
// 3. Node.nextWaiter 置空
if(trail == null){
// 4. 一次都没有遇到有效的节点
firstWaiter = next;
// 5. 将 next 赋值给 firstWaiter(此时 next 可能也是无效的, 这只是一个临时处理)
}else{
trail.nextWaiter = next;
// 6. next 赋值给 trail.nextWaiter, 这一步其实就是删除节点 t
}
if(next == null){
// 7. next == null 说明 已经遍历 完了 Condition Queue
lastWaiter = trail;
}
}else{
trail = t;
// 8. 将有效节点赋值给 trail
}
t = next;
}
}

fullyRelease

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState; // 保存节点状态
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

signal()

调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中.调用该方法的前置条件是当前线程必须获取了锁,可以看到signal()方法进行了isHeldExclusively()检查,也就是当前线程必须是获取了锁的线程。接着获取等待队列的首节点,将其移动到同步队列并使用LockSupport唤醒节点中的线程.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
//firstWaiter为condition自己维护的一个链表的头结点,
//取出第一个节点后开始唤醒操作
if (first != null)
doSignal(first);
}
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}

doSignal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void doSignal(Node first) {
do {
// 将 first.nextWaiter 赋值给 firstWaiter
if ( (firstWaiter = first.nextWaiter) == null)
//这时若 firstWaiter == null, 则说明 Condition 为空了, 所以直接置空 lastWaiter
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
final boolean transferForSignal(Node node) {
// 无法CAS改变 waitStatus为0, 节点已经终止
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// AQS 入队,返回前驱节点
Node p = enq(node);
int ws = p.waitStatus;
//如果前驱节点结点的状态为cancel 或者修改waitStatus失败,则直接唤醒。
// 否则等待AQS竞争锁时唤醒
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

signal()方法只是将Condition等待队列头结点移出队列,此时该线程节点还是阻塞的,同时将该节点的线程重新包装加入AQS等待队列,当调用unlock方法时,会唤醒AQS等待队列的第二个节点,假如这个新节点是处于第二个位置,那么它将会被唤醒,否则,继续阻塞。

await 不响应中断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public final void awaitUninterruptibly(){
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
// 线程中断标识
while(!isOnSyncQueue(node)){
//这里是一个 while loop, 调用 isOnSyncQueue 判断当前的 Node 是否已经被转移到 Sync Queue 里面
LockSupport.park(this);
// 若当前 node 不在 sync queue 里面, 则先 block 一下等待其他线程调用 signal 进行唤醒;
if(Thread.interrupted()){
//判断这是唤醒是 signal 还是 interrupted(Thread.interrupted()会清楚线程的中断标记, interrupted进行记录)
interrupted = true;
// 说明这次唤醒是被中断而唤醒的,这个标记若是true的话, 在 awiat 离开时还要 自己中断一下(selfInterrupt), 其他的函数可能需要线程的中断标识
}
}
if(acquireQueued(node, savedState) || interrupted){
//acquireQueued 返回 true 说明线程在 block 的过程中式被 inetrrupt 过
selfInterrupt();
//自我中断, 外面的线程可以通过这个标识知道, 整个 awaitUninterruptibly 运行过程中 是否被中断过
}
}
// 下面两个是用于追踪 调用 awaitXXX 方法时线程有没有被中断过
// 主要的区别是
// 1. REINTERRUPT: 代表线程是在 signal 后被中断的 (REINTERRUPT = re-interrupt 再次中断 最后会调用 selfInterrupt)
// 2. THROW_IE: 代表在接受 signal 前被中断的, 则直接抛出异常 (Throw_IE = throw inner exception)
-------------本文结束感谢您的阅读-------------
坚持分享,您的支持将鼓励我继续创作!
0%