yield-bytes

沉淀、分享与无限进步

Java高级主题:基于AQS实现的ReentrantReadWriteLock源代码深入分析

在ReentrantLock的独占模式下,当需要在一个读写高度竞争场景中使用它的lock.lock()时,你会发现这是独占锁,它会让大量同时请求锁的线程们都不得不进入CLH阻塞队列中等待,如果在“读多写少”的场景中,ReentrantLock的这种独占锁方式显然会降低并发性能,因此ReentrantReadWriteLock就是为了解决这种“读多写少”的场景:一个线程正在请求锁进行读操作可以不影响其他线程同时请求读锁(共享锁),意味着多个线程可以并发读。

这里首先通过Doug Lea在源码注释给出的demo作为对ReentrantReadWriteLock一般用法说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class CachedData {
// 读线程需要对data读取,写线程可以对data进行更新,那么data显然是线程不安全的,需要借助锁进行相关操作
Object data;
volatile boolean cacheValid; // 判断data是否已经被缓存
// 1、创建一个读写锁
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
//该方式的功能就是对已经缓存的数据进行预处理
void processCachedData() {
// 2、由于一开始,我们乐观认为data可能已经被缓存,因此我们不急着申请独占锁,而是申请并发的共享读锁
rwl.readLock().lock();
// 3、结果发现:我们太乐观了,原来data没缓存,因此还不能读取数据(如果非得去读,只能读到旧数据)
if (!cacheValid) {
// Must release read lock before acquiring write lock
// 4、既然乐观错估了情况,那么只能用上“强大的独占锁”,保证自己能独占的实施“写操作、更新操作、删除操作”:因为读写锁是互斥的,需先释放读锁,然后在升级为写锁。
//
rwl.readLock().unlock();
// 这就是所谓的“锁升级”。虽然说升级,但这里是当前线程请求独占锁,若同一时刻有其他线程已经拿到写锁,那么在这里当前线程会被阻塞在AQS里面的CLH阻塞队列
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
// 5、由于我们获取写锁的时机也许比其他线程晚一步拿到,因此在这里拿到独占锁后,还得重新检查是否data已经提前被其他线程更新过
if (!cacheValid) {
data = ... // 例如从数据库重新读取最新的data,然后将cacheValid标记为true,表示data已经更新(或已经更新缓存)
cacheValid = true;
}
// 6、由于我们还得使用更新后的data,因此可以申请读锁。注意,由于当前线程还持有写锁,因此其他线程不可能获得写锁进行写操作,因此当前线程此时可以申请读锁
// Downgrade by acquiring read lock before releasing write lock
rwl.readLock().lock();
} finally {
// 7、显然data已经被我“独占式”地更新过,可以释放写锁了
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}

try {
// 8、在第6点获得共享的读锁后,在这里可以使用已经缓存的新data
use(data);
} finally {
// 9、释放读锁
rwl.readLock().unlock();
}
}
}

关于读锁、写锁、读线程、写线程的一些说明

rwl.readLock().lock() `:很明显,线程在请求读锁,此锁是AQS的共享模式

rwl.writeLock().lock():很明显,线程在请求写锁,此锁是AQS的独占模式

严格来说:对于ReentrantReadWriteLock这种读写锁的设计,请求读锁的线程直接称为“读线程”可以吗,例如,请求写锁的线程,由于已经获取独占锁的线程可以去请求读锁,那么这个线程是应该称为“写线程”?“读线程”?还是“写、读线程”?还是“读写线程”呢? 这么看来,请求到读锁的线程似乎也不适合称为读线程?成功请求到写锁的线程也不适合称为写线程。也许换一个角度可以更容易区分:

在用户代码层面的角度出发:

1、用户设计此方法具有明显“读取”数据的逻辑时,该方法首先使用`rwl.readLock().lock() `,过程中不管是否需要再次请求写锁,都可以把执行用户方法的线程称为“读线程”

2、用户设计此方法具有明显“更新、删除、改、插入”数据的逻辑时,该方法首先使用`rwl.writeLock().lock() `,过程中不管是否需要再次请求写锁、还是读锁,都可以把执行用户方法的线程称为“写线程”

但是从ReentrantReadWriteLock内部设计来看,可能按以下思路去想会更清晰:

1、仅在ReentrantReadWriteLock内部调用tryAcquire方法来看,调用tryAcquire方法的线程可以看成是写线程,因为它是从rwl.writeLock().lock()来的

2、仅在ReentrantReadWriteLock内部调用tryAcquireShared方法来看,调用tryAcquireShared方法的线程可以看成是读线程,因为它是从rwl.readLock().lock()来的

如果不按”用户代码”或者不按ReentrantReadWriteLock内部设计的角度,那么也可以按以下说明进行统一化描述:

“请求读锁的线程”、“持有读锁的线程”

“请求写锁的线程”、“持有写锁的线程”

在持有写锁的情况下,请求读锁的线程。

不存在“持有读锁的情况下,还能同时持有写锁的线程”的情形! (否则此读锁已不是共享锁,而是变相成为了独占锁)或者说不满足“读写互斥设计”

关于同步状态值的设计

在ReentrantLock中,如何记录线程已经获得锁资源的次数呢?回顾代码设计如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
    final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 只有state为0时,其他线程才有机会争抢独占锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 同一线程再次获取独占锁,同一线程重入锁
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

ReentrantLock仅有一个state值,对于同一线程执行lock.lock()n次,那么state变为n,表示此线程获取独占锁n次或者说重入n次(当然也要求此线程要释放n次),既然是独占锁,那么就不会出现这种情况:有多个不同线程同时成功更改state,也即同一个时刻,仅能有个线程CAS更改成功。

而在ReentrantReadWriteLock的同步状态值中,state的设计非常巧妙,使用高16位作为读锁重入的计数,低16位作为线程写锁重入的计数

对于一个int值通过位运算实现不同场景下的计数,在Doug Lea的并发设计里面一个高级的手段:例如在ConcurrentHashMap里面的resizeStamp也采用这种移位计算状态的策略,还有ConcurrentHashMap的TreeBin读写锁设计也是采用位运算策略。

高16位作为读锁重入的计数有两种情况:

(1)此高16位的数值可表示同一读线程的请求的读锁总数(总重入数)

(2)此高16位的数值可表示多个读线程同时请求到读锁总数

低16位作为写锁计数的情况仅有一种:

因为写锁是独占锁,因此低16位一定是表示同一写线程请求的总独占锁数量(总重入数)

以下是读写锁的同步计数移位计算的设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
SHARED_SHIFT   = 16
(1)读锁设计
static final int SHARED_UNIT = (1 << SHARED_SHIFT); 用于获取state高16的读锁总数
0000 0000 0000 0000 0000 0000 0000 0001
// 1右移16位,对应的就是SHARED_UNIT
0000 0000 0000 0001 0000 0000 0000 0000

如何获取读锁总数?
0000 0000 0000 0110 0000 0000 0000 0010;某一时刻state的值
对其左移16位:
0000 0000 0000 0000 0000 0000 0000 0110
因此可以快速得出读锁数量:6个读锁(当然也同时存在某个线程重入2次的写锁)
以上就是sharedCount的设计
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }

如何实现读锁加1呢?如果直接state+1,那么这个加1是加在低位上,显然不合理,其实也很简单:
state+(1<<SHARED_SHIFT),这就实现在在高16位的读锁加1

(2) 写锁设计
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; 用于获取state低16位的写线程数量
任意的state值和这个独占锁掩码相与后即可得到state的低16位置,例如下面
0000 0000 0000 0110 0000 0000 0000 0010 ;;某一时刻state的值
0000 0000 0000 0000 1111 1111 1111 1111 ;独占锁掩码
两者取与后,得到写线程数量:
0000 0000 0000 0000 0000 0000 0000 0010
因此可以快速得出写锁数量:2个写锁(当然也同时存在6个读锁)
以上exclusiveCount的设计
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

写锁如何加1?
state+(1 & EXCLUSIVE_MASK),显然state+1就是在state的低16位上做累加,计算公式合理
写锁如何加n?
state+(n & EXCLUSIVE_MASK),显然state+n就是在state的低16位上做累加,计算公式合理

(3)读锁最大可重入数量为65535,当然写锁也是
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1
0000 0000 0000 0001 0000 0000 0000 0000 减1 也即:
0000 0000 0000 0000 1111 1111 1111 1111

写锁实现

1
2
3
4
// 1、创建一个读写锁
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
// 2、请求写锁
rwl.writeLock().lock();

原来写锁的获取并不像ReentrantLock使用lock.lock()的方式,而是通过writeLock().lock()获取,从方法名就知道这样设计为了方便使用者知道当前使用什么类型的锁。

首先看其构造器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

/**
* Creates a new {@code ReentrantReadWriteLock} with
* default (nonfair) ordering properties.
*/
public ReentrantReadWriteLock() { // 默认构造器使用的是非公平模式
this(false);
}

/**
* Creates a new {@code ReentrantReadWriteLock} with
* the given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this); // new时就已经实例化一个readerLock 对象和 writerLock 对象
writerLock = new WriteLock(this);
}


// Sync当然是ReentrantReadWriteLock内部核心实现类,实现了AQS的tryAcquire、tryRelease、tryAcquireShared、tryReleaseShared的关键逻辑,以及其他辅助方法
abstract static class Sync extends AbstractQueuedSynchronizer
static final class NonfairSync extends Sync
static final class FairSync extends Sync


// WriteLockReadLock 都是ReentrantReadWriteLock内部类
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }

public static class ReadLock implements Lock, java.io.Serializable
public static class WriteLock implements Lock, java.io.Serializable
WriteLock实现类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47


public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
private final Sync sync;

/**
* Constructor for use by subclasses
*
* @param lock the outer lock object
* @throws NullPointerException if the lock is null
*/
// WriteLock的调用点是在ReentrantReadWriteLock的构造器中:
/*
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
*/
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}

/**
* Acquires the write lock.
*
* <p>Acquires the write lock if neither the read nor write lock
* are held by another thread
* and returns immediately, setting the write lock hold count to
* one.
*
* <p>If the current thread already holds the write lock then the
* hold count is incremented by one and the method returns
* immediately.
*
* <p>If the lock is held by another thread then the current
* thread becomes disabled for thread scheduling purposes and
* lies dormant until the write lock has been acquired, at which
* time the write lock hold count is set to one.
*/
// 从这里可以看出:类似的用户程序使用rwl.writeLock().lock()时,其实就是调用AQS的独占锁的获取锁逻辑:acquire(1),显然既然是独占锁,那么获取逻辑自然跟ReentrantLock的lock()是类似的,也即AQS的acquire方法.
public void lock() {
sync.acquire(1);
}
...//WriteLock类其他省略部分

显然sync.acquire(1)对应内部的AQS逻辑如下:

1
2
3
4
5
6
public final void acquire(int arg) {
// tryAcquire由ReentrantReadWriteLock的sync实现
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
写锁获取-tryAcquire

要知道所谓的写锁获取,其实就是独占锁的请求,要么仅能一个线程拥有此独占锁,要么同一线程重入独占锁多次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
protected final boolean tryAcquire(int acquires) {
/* Walkthrough:攻略或玩法,也即tryAcquire的成功与否的策略
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState(); // 当前同步状态值
int w = exclusiveCount(c); // 当前写锁计数(需要利用移位来计算出)
// 由于同步状态可以表征读锁计数和写锁计数,因为当前线程想获取写锁,因此它需要判断:同步状态值不为0到底是指有其他线程获取到了写锁还是获取了读锁还是都被获取?
if (c != 0) {
// 1、c != 0且写锁计数为0,说明当前线程持有读锁,不允许请求写锁(否则就是持有读锁情况下拿到写锁,那么这个读锁就不是共享模式读锁,而是变相的独占锁,显然违背了ReentrantReadWriteLock的功能设计初衷);或者说“当前线程持有读锁且不释放读锁的情况下,不允许升级为写锁,否则读锁就变相的成为了独占锁”
// 2、写锁计数不为了0,说明是当前线程正在重入独占锁,如果不是同一线程请求,写锁当然请求失败,返回false
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 3、是当前线程重入的写锁请求,但重入次数超过最大值,直接抛出异常提示
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
// 4、说明是同一线程的写锁重入,累加计数后,允许通过请求锁。
setState(c + acquires);
return true;
}
// 5、执行流来到这里说明c等于0,说明此刻暂时还没其他线程在竞争读、写锁,那么当前线程可以去CAS抢锁
/* writerShouldBlock() 是用于是否使用公平模式抢锁策略,对于写锁获取策略来说,显然是false,如下所示
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() { return false; // writers can always barge ,写锁获取只能直接争抢
}
因此这里的if第一个writerShouldBlock()返回false然后第二个条件就是去跟其他写线程CAS抢锁了,失败那么只能返回false
*/
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 6、对于c != 0,说明此写线程重入成功,对于c=0,说明此线程是第一个获得独占锁的线程
setExclusiveOwnerThread(current);
return true;
}
写锁释放-tryRelease

用户代码层面,只需调用rwl.writeLock().unlock()来释放写锁,需要注意的是,这里指代独占锁的释放,意味着要么已经拥有独占锁的写线程释放此锁,要么已经重入独占锁的写线程退出重入锁

(读锁的释放也会使用此方法)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

/**
* Attempts to release this lock.
*
* <p>If the current thread is the holder of this lock then
* the hold count is decremented. If the hold count is now
* zero then the lock is released. If the current thread is
* not the holder of this lock then {@link
* IllegalMonitorStateException} is thrown.
*
* @throws IllegalMonitorStateException if the current thread does not
* hold this lock
*/
//按照注释提示:
// 1、如果当前线程是重入的,那么每次unlock就是对重入次数减1,也即此线程内部写锁计数hold count 减1 (注意,如果hold count减1后不为0,此写线程是不会释放独占锁的)
// 2、如果当前线程的不是重入,仅是获取独占锁一次,那么当它调用unlock时,显然state变为0,意味着是释放独占锁,其他线程可以争抢此独占锁了。
public void unlock() {
sync.release(1);
}

/*
* Note that tryRelease and tryAcquire can be called by
* Conditions. So it is possible that their arguments contain
* both read and write holds that are all released during a
* condition wait and re-established in tryAcquire.
*/
// 释放锁的逻辑
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases; // 扣减释放量
boolean free = exclusiveCount(nextc) == 0; // 这里的free是指写锁释放完全释放,如果等于0,说写锁完全被释放。
// 如果为0,说明写锁完全被释放,其他线程(含读、写线程)可以争抢了
if (free)
setExclusiveOwnerThread(null);
// free不为0,说明是同一线程重入写锁后,现在是释放自己重入的写锁,也即“重入-退出”操作,将state设为扣减后的值
setState(nextc);
return free;
}

读锁实现

读锁的实现类最关键的还是lockunlock方法,具体实现由ReentrantReadWriteLock 内部的Sync类的tryAcquireSharedtryReleaseShared对应

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79

/**
* The lock returned by method {@link ReentrantReadWriteLock#readLock}.
*/
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
private final Sync sync;

/**
* Constructor for use by subclasses
*
* @param lock the outer lock object
* @throws NullPointerException if the lock is null
*/
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}


/**
* Acquires the read lock.
*
* <p>Acquires the read lock if the write lock is not held by
* another thread and returns immediately.
*
* <p>If the write lock is held by another thread then
* the current thread becomes disabled for thread scheduling
* purposes and lies dormant until the read lock has been acquired.
*/
public void lock() {
sync.acquireShared(1); // 具体获取读锁的逻辑在tryAcquireShared实现
}



// 响应中断的获取读锁方式
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

// 即时返回是否成功获取读锁,显然如果写锁别其他线程持有,那么此时tryLock() 立即返回false
public boolean tryLock() {
return sync.tryReadLock();
}


public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

// 是否读锁,具体释放读锁的逻辑在tryReleaseShared实现
public void unlock() {
sync.releaseShared(1);
}

/**
* Throws {@code UnsupportedOperationException} because
* {@code ReadLocks} do not support conditions.
*
* @throws UnsupportedOperationException always
*/
public Condition newCondition() {
throw new UnsupportedOperationException();
}

/**
* Returns a string identifying this lock, as well as its lock state.
* The state, in brackets, includes the String {@code "Read locks ="}
* followed by the number of held read locks.
*
* @return a string identifying this lock, as well as its lock state
*/
public String toString() {
int r = sync.getReadLockCount();
return super.toString() +
"[Read locks = " + r + "]";
}
}
读锁获取(tryAcquireShared)

调用者使用rwl.readLock().lock() 获取读锁,内部调用是acquireShared方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14

/**
* Acquires the read lock.
*
* <p>Acquires the read lock if the write lock is not held by
* another thread and returns immediately.
*
* <p>If the write lock is held by another thread then
* the current thread becomes disabled for thread scheduling
* purposes and lies dormant until the read lock has been acquired.
*/
public void lock() {
sync.acquireShared(1);
}

acquireShared当然是AQS的模板方法:

1
2
3
4
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0) // 只要ReentrantReadWriteLock的tryAcquireShared获取读锁方法返回-1,就说明当前线程没能马上获得读锁,得去CLH排队等待
doAcquireShared(arg);
}

tryAcquireShared则是ReentrantReadWriteLock实现请求读锁的关键逻辑

其实tryAcquireShared总逻辑分为4部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
protected final int tryAcquireShared(int unused) {

Thread current = Thread.currentThread();
int c = getState();
// 第1部分:全局存在写锁且不是当前线程持有,因此当前线程不能获取读锁
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
// 第2部分:全局层面获得读锁
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {

// 第3部分:{第2部分的全局层面获得读锁后,还得对当前线程自己持有的读锁计数进行相关管理},这部分最为核心,也是最难理解的部分。

return 1; // 第2部分的全局层面获得读锁,必然返回1
}
// 第4部分:在全局层面获取读锁失败,使用以下全面尝试获取读锁的逻辑去获取读锁,增加当前线程获取读锁的概率。
return fullTryAcquireShared(current);
}

完整解析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62

protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
//1、 当前state存在写锁计数但不是当前线程持有,那么不容许获取读锁,因为两者被设计为是互斥的,换句话说:在有写锁存在的条件下,只能是已经持有写锁的当前线程才能获取当前读锁,当然这种情况被称为“锁降级”。想想为何?
// 因为如果存在写锁的条件下但不是当前线程持有,还去允许当前线程获取读锁的话,这不就等价于将别人占有的写锁“抢了过来”,这显然是不符合ReentrantReadWriteLock的读写互斥设计的。
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//2、当前state拥有的读锁数量
int r = sharedCount(c);
//3、如果此时读线程是非公平模式且读锁计数小于最大值且CAS更新读锁计数加1成功,那么就执行此if里面的条件
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 在state的高16位累加1,也即使得所有线程总的读锁计数加1,执行流来到这里,说明当前线程已经成功获得读锁,下面就是要完成当前线程持有读锁的计数管理相关逻辑。

//4、在2读取的r值如果为0,说明当前线程是第一个进来请求读锁的线程,记为firstReader
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1; // 记录第一个读线程的重入数量,首次当然是1
// 5、 如果在第2点读取的读锁数量不为0且当前线程就是之前的第一个读线程,说明是重入,对第一个读线程的重入进行计数加1
} else if (firstReader == current) {
firstReaderHoldCount++;
// 6、以下的读锁设计比较难理解,参考后面的内容分析。
} else {
// 读取“全局缓存计数器”,注意到此“全局缓存计数器”只缓存“最新成功获取读锁的那个线程”:The hold count of the last thread to successfully acquire readLock,目的是This saves ThreadLocal lookup,避免回到ThreadLocalHoldCounter的readHolds去查找。
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
/* 6.1
情况1 如果此“全局缓存计数器”变量为空,说明在此刻之前没有“最新成功获取读锁的那个线程””出现,那么当前线程自然可以将自己设为“最新成功获取读锁的那个线程”这个角色
情况2 如果此“全局缓存计数器”变量不为空但已经缓存“最新成功获取读锁的那个线程”,而这个缓存线程又不是当前线程,说明当前线程从此刻起将成为“最新成功获取读锁的线程”角色。
针对情况1和情况2,既然当前线程在此刻已经成为“最新成功获取读锁的线程”角色,那么当前线程取出自己的计数器并放入“全局缓存计数器”:cachedHoldCounter = rh = readHolds.get()。这样就保证了cachedHoldCounter会一直指向“最新成功获取读锁的线程”
*/
cachedHoldCounter = rh = readHolds.get();
/*6.2
说明6.1两个条件都不成立,说明“全局缓存计数器”缓存的恰好是当前线程,如果缓存的读锁计数为0,那么说明这个线程是在上一刻释放了自己持有的最后一个读锁且将“全局缓存计数器”计数减至0(并且它会调用readHolds.remove()移除了rh计数器对象,这一操作可以发生在tryReleaseShared中),现在这一刻此线程又再次进来作为“最新成功获取读锁的线程”,而此刻当前线程自己readHolds并没有放置计时器,于是作为“最新成功获取读锁的线程”,当前线程再将计数器放入到自己的“ThreadLocalHoldCounter的readHolds中”。
*/
else if (rh.count == 0)
readHolds.set(rh);
//
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
关于“当前线程自己持有的读锁计数进行相关管理”的设计(ReentrantReadWriteLock最核心的设计)

首先,读锁的计数需要在两个层面上进行去理解:

所有线程请求的总读锁计数的加1操作,使用以下方式记录:

1
2
3
4
     // 也即每个线程,只要请求读锁成功,那么读锁总数对应加1(state的高16位加1)  
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {

其次,还需要记录每个线程自己请求读锁的数量(或者重入读锁的次数),也即“当前线程自己持有的读锁计数进行相关管理”。这里解释为何每个线程需要记录自己读锁的数量,这是因为ReentrantReadWriteLock设计的读锁是可重入的,那么每个线程管理自己的读锁数量后,可以方便进行且准确的加锁、释放锁操作(自己重入多少次,就需要释放多少次)以及是否需要进入排队操作。而compareAndSetState(c, c + SHARED_UNIT)是记录所有线程的总读锁数量的设计,显然无法实现“当前线程自己持有的读锁计数进行相关管理”的功能。

这种要实现“每个线程自己的状态管理和线程关联起来”的场景,ThreadLocal无疑是最适合的,这在“ThreadLocal的源代码设计的深度解析”的文章中已提及过。在这里通过设计一个计数器HoldCounter并放入到ThreadLocal,也即ThreadLocalHoldCounter,即可实现线程安全方式实现“当前线程自己持有的读锁计数进行相关管理”的需求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
// 理解本节内容的前提是掌握ThreadLocal的底层设计原理及其源代码实现,否则无法掌握其设计内涵。	
/**
* A counter for per-thread read hold counts.
* Maintained as a ThreadLocal; cached in cachedHoldCounter
*/
// 1、此辅助对象就是为了记录当前线程请求读锁的次数,目的是为了知道此线程第一有没有持有读锁,第二持有读锁情况下重入多少次。交由ThreadLocal来维护。
static final class HoldCounter {
int count = 0; // 记录当前线程的读锁请求次数
// Use id, not reference, to avoid garbage retention
// 当前线程的内存地址ID号,用于一些特别场景例如下面的cachedHoldCounter
// 使用线程ID号而不是对象可以有效优化GC
final long tid = getThreadId(Thread.currentThread());
}
// 2、通过ThreadLocal维护HoldCounter的更新,实现每个线程能线程安全的方式去管理持有读锁的计数。
// 这里是设定HoldCounter的初始化值
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}

/**
* The number of reentrant read locks held by current thread.
* Initialized only in constructor and readObject.
* Removed whenever a thread's read hold count drops to 0.
*/
/*
3、当前线程持有读锁的重入次数,如果不为0,说明当前线程肯定持有读锁,需要注意它的灵活用法:
总体上一定要遵守“当前线程不再持有读锁时,当前线程的ThreadLocal对象一定不能还存放着HoldCounter对象,否则HoldCounter是强引用,会造成当前线程的ThreadLocal出现内存泄露的风险”

3.1 一旦当前线程释放自己持有的最后一个读锁,说明当前线程不再需要持有“读锁计时器”引用来给自己计算读锁数量,那么当前线程还需要做多一个操作: readHolds.remove(),保证当前线程不再引用此“ThreadLocal变量”的读锁计数器,从而避免当前线程的ThreadLocal内存泄露,对应的代码逻辑(此逻辑一般在释放锁的操作中实施):
if (count <= 1) {
readHolds.remove();

readHolds = new ThreadLocalHoldCounter() ,当然此初始化已经被上面initialValue取代了。
每个线程持有的readHolds初始值:count=0,tid=此线程内存地址ID

3.2 如果当前线程查询自己readHolds的rh.count=0,说明当前线程是第一次成功获取读锁,考虑当前线程以后可能有重入读锁,那么此时可以将自己“读锁计数器” rh放到readHolds里面,对应的代码逻辑(此逻辑一般在请求读锁的操作中实施):
else if (rh.count == 0)
readHolds.set(rh);

*/
private transient ThreadLocalHoldCounter readHolds;

/**
* The hold count of the last thread to successfully acquire
* readLock. This saves ThreadLocal lookup in the common case
* where the next thread to release is the last one to
* acquire. This is non-volatile since it is just used
* as a heuristic, and would be great for threads to cache.
*
* <p>Can outlive the Thread for which it is caching the read
* hold count, but avoids garbage retention by not retaining a
* reference to the Thread.
*
* <p>Accessed via a benign data race; relies on the memory
* model's final field and out-of-thin-air guarantees.
*/

// 注意此cachedHoldCounter从名字可以直到它是一个缓存的读锁计数器,那么它是缓存哪个线程的读锁计时器呢?
// 缓存的是“最近(最新)获得读锁的那个线程的读锁计数器”,例如Thread-0、Thread-1、Thread-2三个线程并发竞争读锁,如果Thread-1是最后一个成功获取读锁的线程,那么cachedHoldCounter就会缓存Thread-1的读锁计数count=1和Thread-1对应的内存地址ID号。当然如果后面还有最新的来的Thread-xx,那么cachedHoldCounter就会马上更新缓存Thread-xx的信息。
/* 这个缓存cachedHoldCounter有何作用呢?
(1)在前面ThreadLocalHoldCounter中,每个线程HoldCounter由ThreadLocal维护,如果此线程需要查询自己的读锁计数,需要使用ThreadLocal里面的get方法,而此get方法会调用ThreadLocal里面ThreadLocalMap的getEntry方法,了解ThreadLocal的底层设计都知道这一get操作还可能引起ThreadLocalMap的额外清理操作,这无疑降低查询效率,因此干脆设计用于“具有缓存功能、快速查询”的cachedHoldCounter,如果当前线程恰好就是设置cachedHoldCounter的线程,那么它直接在cachedHoldCounter就可以拿到自己的读锁数,完全不需要调用ThreadLocal里面get方法来查询(避免了在ThreadLocalMap内部的一系列操作)通过这种方式,当前线程在读锁重入时能提高加锁效率。
(2)当前如果读锁竞争激烈,那么假设线程加锁后缓存了Thread-1,在Thread-1释放锁前,又有其他线程Thread-2、Thread-3请求了读锁,那么cachedHoldCounter就会一直更新,这种情况导致Thread-1需要调用ThreadLocal查询自己的读锁计数。可以说Thread-1在请求读锁重入锁期间没有“享受到缓存带来的性能提升”
正如源码注释里面的说的:This is non-volatile since it is just used as a heuristic, and would be great for threads to cache.
因为这是被设计为一个试探性的缓存,因此不需要设为volatile类型,如果当前线程试探性去查询发现此缓存是自己加锁后设置的缓存,那么就可以提高此线程的本次加锁和释放读锁的性能,如果不是自己设置缓存,那就稍微牺牲一点性能ThreadLocal(在内部的ThreadLocalMap)去查询。
*/
private transient HoldCounter cachedHoldCounter;


/**
* firstReader is the first thread to have acquired the read lock.
* firstReaderHoldCount is firstReader's hold count.
*
* <p>More precisely, firstReader is the unique thread that last
* changed the shared count from 0 to 1, and has not released the
* read lock since then; null if there is no such thread.
*
* <p>Cannot cause garbage retention unless the thread terminated
* without relinquishing its read locks, since tryReleaseShared
* sets it to null.
*
* <p>Accessed via a benign data race; relies on the memory
* model's out-of-thin-air guarantees for references.
*
* <p>This allows tracking of read holds for uncontended read
* locks to be very cheap.
*/
/* 以下两个成员也是为了提升读锁加速的性能。认真看其源码注释说:更准确地说,firstReader是指向一个线程———这个线程是让总读锁计数从0变为1的线程。注意需要用动态的思维看待这样的解释。简单来说就是第一个获得读锁的线程,firstReaderHoldCount显然就是第一个获得读锁线程的重入计数
那么在什么场景下firstReader和firstReaderHoldCount起到提升性能的作用呢?
对于这种有序获取读锁的场景,优化效果明显,例如,
Thread-0加锁-释放锁后,接着Thread-1才开始请求读锁,Thread-1释放读锁,然后Thread-2才开始请求读锁....
可以这么理解:
当Thraed-0作为第一个请求读锁的线程,它从加锁到释放锁的代码执行期间,此时Thread-1还没开始启动请求读锁。那么当Thread-0再次重入锁时(而且此刻还没其他线程来请求读锁),显然只需查询firstReaderHoldCount自己的读锁计数即可,无需进入自己的ThreadLocal里面去查询读锁计数,因为进入ThreadLocal去查询意味着要去ThreadLocalMap查询(get操作还可能引起清理stale entry)意味着降低查询效率。
这就是注释所说的:This allows tracking of read holds for uncontended read locks to be very cheap. 能够使用最小代价去追踪那种非竞争线程获取的读锁计数
uncontended read locks:一个线程拥有读锁的时候,没有其他线程企图获得读锁(也就是非并发竞争的)
*/
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;

可以看到,读锁的获取设计确实较为复杂,因为Doug Lea为了实现在某些非竞争情况下请求锁性能的提升,采用了非常高技巧的“fast path”策略——先试探性执行某个“快速”操作,如果没命中那就再执行那个“相对耗时”的操作。(这种设计思想其实在ConcurrentHashMap里面的addCountfullAddCount已经体现过)

其次可以通过一个内部方法快速理解线程自己管理的读锁计数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
  // 获取当前线程想获取自己持有读锁的数量
final int getReadHoldCount() {
// 1. 如果总读锁数量为0,那么当前线程持有的读锁数量肯定也为0
if (getReadLockCount() == 0)
return 0;

Thread current = Thread.currentThread();
// 2. 看看自己是否为第一个获取读锁的角色,如果是,好办直接使用firstReaderHoldCount存储的数量即可,无需进入到ThreadLocal里面查询
if (firstReader == current)
return firstReaderHoldCount;
// 3. 说明当前线程不是“第一个获取读锁”的线程,那就看看“全局缓存计时器”缓存的是哪个线程的
HoldCounter rh = cachedHoldCounter;
// 4. 如果存在“全局缓存计时器”且缓存的就是当前线程自己,那么说明当前线程一定是“最新获取读锁的那个线程”,返回缓存的计数即可
if (rh != null && rh.tid == getThreadId(current))
return rh.count;
// 5. 说明自己不是“最新获取读锁的那个线程”,当然也不是“第一个获取读锁的角色”,因此要读取自己持有的读锁计数,显然要先取出自己在ThreadLocal放置的计时器
int count = readHolds.get().count;
// 6. 自己在ThreadLocal放置的计时器的读锁数量已经变为0,说明当前线程接下来已经不再持有读锁,也即不需要再借助“计数器”,那么应该及时在自己的ThreadLocal变量删除此“计数器对象”——ThreadLocalHoldCounter
if (count == 0) readHolds.remove();
return count;
}

有了以上缓存和快速试探的算法设计以及线程自己是如何管理自己持有的读锁的分析后,那么fullTryAcquireShared的算法流程才相对好理解。

fullTryAcquireShared

处理“快速尝试tryAcquireShared中CAS失败更新总读锁”的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96

/**
* Full version of acquire for reads, that handles CAS misses
* and reentrant reads not dealt with in tryAcquireShared.
*/
// 正如注释里面说的:专为快速尝试tryAcquireShared中CAS失败更新总读锁、重入读锁的情况
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
// 注释也说到fullTryAcquireShared的总体代码实现跟tryAcquireShared大体类似(有部分冗余)
HoldCounter rh = null;
// 自旋+内部CAS,保证了当前线程一定会在某个时机上成功拿到读锁,或在某个时机拿不到锁然后被放入CLH阻塞队列里面排队
for (;;) {
int c = getState();
// 1、同tryAcquireShared的设计,同步状态存在写锁,且是其他线程持有的写锁,显然当前线程无法获取读锁,因为不同线程的写、读锁请求是互斥的。
if (exclusiveCount(c) != 0) {
// A
if (getExclusiveOwnerThread() != current)
return -1;
// else return -1
//补充技术点:注意到这里有个死锁的解释:假设给A配一个else分支,返回-1,那么就使得持有写锁的当前线程进入阻塞队列,它持有的写锁一直不会释放,而外面的线程又等着这个写锁的释放,因此进入了死锁困境,所以不能在这里return -1。换句话说,不要试图阻塞已持有的独占锁线程,否则会出现死锁意外。其实这种合理的设计在AQS的条件队列await里面的“addConditionWaiter然后fullyRelease”有提到(进了条件队列后,阻塞前把自己持有的所有独占锁完全释放掉)
// else we hold the exclusive lock; blocking here
// would cause deadlock.
// 2、来到这里说明此时写锁计数为0,暂时没有其他线程请求写锁,当前线程可以去请求读锁但遗憾的是readerShouldBlock为true时,表示当前线程需要被阻塞,而if里面的逻辑并没有关于“进入CLH阻塞队列排队的操作”,原来这里的if是这么设计的:既然当前线程待会要进去阻塞队列,那么在它进去之前,要求当前线程先处理好自己的读锁计数情况
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly

// 如果存在这样的场合:假设某线程AA在某次tryAcquireShared请求读锁成功,然后在某次重入读锁时tryAcquireShared没有抢到读锁只能调用fullTryAcquireShared来处理重入锁时,线程AA来到if (readerShouldBlock()) 为true情况,接下里怎么安排呢? 以下就是对应的处理策略:
// 3.1 如果当前线程就是第一个加读锁的线程,说明是重入读锁,即使readerShouldBlock为true,当前线程也不需要去阻塞队列排队(也即不需要在这里return -1),因为对于重入锁情况,直接它进入下面的第11位置执行重入锁逻辑
if (firstReader == current) {
// 并没有直接返回-1
// assert firstReaderHoldCount > 0;
} else {
// 3.2 说明当前线程不是firstReader也不是firstReader的重入,而是此线程是第一次竞争读锁,但因为大前提readerShouldBlock是true,因此此线程一定会进入CLH阻塞队列。
// 4、
if (rh == null) {
rh = cachedHoldCounter;
//5、
// 5.1 当前线程(显然是非firstReader角色)如果是第一次请求读锁,发现cachedHoldCounter还是空或者里面缓存不是自己的,从自己的ThreadLocal取出HoldCounter对象,然后进入6.1
// 5.2 当前线程是重入锁的情况,显然rh不是null,且6.1的判断不成立,会走入7.2逻辑
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
// 6、
// 6.1 对接5.1 由于当前线程是第一次请求读锁,但此时并没有拿到读锁,需要清空自己的ThreadLocal里面的HoldCounter对象,然后会来到7.1。因为这个线程将被送入CLH阻塞队列中,既然它准备要在阻塞队列了,那么它的ThreadLocal存放的HoldCounter对象要移除,避免ThreadLocal造成此线程的内存泄露。然后会进入7.1步骤
if (rh.count == 0)
readHolds.remove();
}
}
// 7、
// 7.1 当前线程第一次请求读锁,此时它持有读锁数量是0,由于readerShouldBlock为true,因此当前线程不能直接竞争读锁要去阻塞队列排队等待,因此返回-1
// 7.2 当前线程是重入情况,显然rh.count不等于0,因此会进入第12的重入锁累加
if (rh.count == 0)
return -1;
}
}
//8、 读锁总数达到最大值
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 9、如果当前线程CAS对总读锁加1成功,那么结果一定是返回1,中间逻辑就是线程对自己的计数器加1
if (compareAndSetState(c, c + SHARED_UNIT)) {
//10、发现此时读锁为0,那么当前线程就变成firstReader角色。
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 11、firstReader的重入
} else if (firstReader == current) {
firstReaderHoldCount++;
// 12、 当前线程(其他线程,非firstReader角色)的第一次请求读锁或者重入锁的情况
} else {
// 12.1 rh为空,说明准备要将缓冲计时器cachedHoldCounter指向当前“最新成功获取读锁的线程”
if (rh == null)
rh = cachedHoldCounter;
/* 12.2
情况1 如果此“全局缓存计数器”变量为空,说明在此刻之前没有“最新成功获取读锁的那个线程””出现,那么当前线程自然可以将自己设为“最新成功获取读锁的那个线程”这个角色
情况2 如果此“全局缓存计数器”变量不为空但已经缓存“最新成功获取读锁的那个线程”,而这个缓存线程又不是当前线程,说明当前线程从此刻起将成为“最新成功获取读锁的线程”角色。
针对情况1和情况2,既然当前线程在此刻已经成为“最新成功获取读锁的线程”角色,那么当前线程取出自己的计数器并放入“全局缓存计数器”:放入的前提是rh.count == 0,说明当前线程是“最新成功获取读锁的线程”且不是重入的,cachedHoldCounter = rh = readHolds.get()。这样就保证了cachedHoldCounter会一直指向“最新成功获取读锁的线程” 。
如果当前线程是重入请求读锁,那么这里if (rh.count == 0)就不会成立,自然不会有readHolds.set(rh)
*/
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
// 当前线程(其他线程,非firstReader角色)增加自己的读锁计数器,并将缓存设置为自己的
rh.count++;

cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}

其实fullTryAcquireShared考虑到两种角色的线程处理,而每种角色有对应着“第一次请求读锁”和“重入读锁”的两种情况,这导致了fullTryAcquireShared设计的复杂性

第一种角色的线程:firstReader

readerShouldBlock为true时(意味着此线程要进行排队,但有例外:重入锁情况不需要排队):

(1)如果firstReader 是重入的,不需要返回-1,也即不需要进入CLH阻塞排队,直接进入第11位置的重入锁加1逻辑

第二种角色的线程:非firstReader角色,也即不是第一个请求读锁的线程

readerShouldBlock为true时(意味着此线程要进行排队,但有例外:重入锁情况不需要排队):

(1)如果当前线程(非firstReader角色)是重入的,此时rh.count!=0不需要返回-1,也即不需要进入CLH阻塞排队,直接进入第11位置的重入锁加1逻辑

(2)如果当前线程(非firstReader角色)是第一次请求读锁,但因为readerShouldBlock为true,表示需要进入阻塞队列排队,因此会满足rh.count=0返回-1,此线程进入阻塞队列

或者用更为简洁的话:

如果其他线程是第一次获取读锁,因为readerShouldBlock为true,因此就要返回-1,这样这种线程才会被安排到CLH队列去排队,符合“ShouldBlock的要求”。又因为这些线程第一次获取读锁就需要被安排到CLH排队,因此他们持有的计数器对象如果rh.count=0,就需要readHolds.remove(),这样保证了入队后,这个线程首先没有持有读锁且能对应没有持有读锁计数器,保证了这些线程不会发生“ThreadLocal内存泄露”。

如果是firstReader是重入锁或者其他线程做重入锁,那么对自己的HoldCount进行加1计数后返回1,表示重入锁成功。

关于读锁的非公平设计readerShouldBlock

如果ReentrantReadWriteLock在初始化给定的是非公平模式,那么其设计思路如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

/**
* Nonfair version of Sync
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}
}

主要到这里的readerShouldBlock方法都会在tryAcquireSharedfullTryAcquireShared使用,具体实现是apparentlyFirstQueuedIsExclusive,注释说它可以“试探性”的(或者说在一定概率上)防止在阻塞队列等待的写线程节点发生“锁饥饿”情况。

为何会出现“阻塞队列等待的写线程节点发生“锁饥饿”情况”?

假设现在有一个写锁节点已经在阻塞队列,结构如下:

head(null,-1)<->node(写线程,0)

fullTryAcquireShared中,如果readerShouldBlock的非公平模式设计为直接返回false:

1
2
3
final boolean readerShouldBlock() {
return false
}

(1)假设现在有新来的10个线程Thread-1~Thread-10并发请求读锁的线程,那么按照readerShouldBlock返回false的逻辑,他们可以直接竞争第一次请求锁或者重入锁,不妨假设这些请求读锁的10个线程都恰好领先于阻塞队列的写线程获取锁(毕竟新来的线程拿到CPU时间片的概率要比阻塞队列里面的写线程要高),那么就会导致此写线程根本无机会去请求锁,只能一直“委屈的在阻塞队列里面等待”,也即发生了“锁饥饿”情况。

那么如何优化这种极限情况?这就是apparentlyFirstQueuedIsExclusive方法要解决的。

(2)从方法的命名也可以看出它的含义:阻塞队列的第一个排队节点是否就是独占模式节点(在本文可以理解为:阻塞队列的第一个排队节点是否就是写线程节点),如果是独占模式节点,那么readerShouldBlock就会返回true,就会使得那些新来的请求读锁的线程们不能直接去竞争锁资源,而是被安排到阻塞队列去排队等候,这就使得阻塞队列的第一个写线程节点能够出队去竞争锁资源了,从而避免在(1)提到的“锁饥饿”情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

/**
* Returns {@code true} if the apparent first queued thread, if one
* exists, is waiting in exclusive mode. If this method returns
* {@code true}, and the current thread is attempting to acquire in
* shared mode (that is, this method is invoked from {@link
* #tryAcquireShared}) then it is guaranteed that the current thread
* is not the first queued thread. Used only as a heuristic in
* ReentrantReadWriteLock.
*/
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
// 1、存在阻塞队列
// 2、阻塞队列至少有1个线程节点在排队
// 3、且第一个线程节点是独占模式节点
// 4、且第一个线程节点线程未取消
// 如果以上4个条件同时成立,那么就会返回true,也即readerShouldBlock返回true
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}

以上逻辑可以结合以下两种情况来理解:

第一种情况:

外面的新来的线程们:Thread-1~Thread10

阻塞队列的情况:head(null,-1)<->node(写线程,0)

因此,根据apparentlyFirstQueuedIsExclusive的第三个条件,阻塞队列的第一个节点是独占模式节点(请求写锁的线程),那么readerShouldBlock就会返回true,要求外面的新来的线程们去阻塞队列排队等待,保证了此写线程能够出队抢锁。

第二种情况:

外面的新来的线程们:Thread-1~Thread10

阻塞队列的情况:head(null,-1)<->node(读线程,-1)<->node(写线程,0)

或者:head(null,-1)<->node(读线程,-1)…….<->node(写线程,0)

可以看到,阻塞队列里面的第一个线程节点为读线程节点,写线程排在第二或者其他更后面的位置,这时apparentlyFirstQueuedIsExclusive 显然因为!s.isShared()是共享模式节点而返回false,也即`readerShouldBlock就会返回false,那么外面新来的10个线程就会直接争抢锁资源而不需要排队,这会导致排在第二位置或者更后位置的写线程发生了一定程度的“锁饥饿”,因此只能说这种设计能够在一定程度上减少writer starvation,但不能保证一定杜绝writer starvation

因此,正如readerShouldBlock里面的注释:a probabilistic effect 或者As a heuristic to avoid indefinite writer starvation,也即有一定概率性的。

这里不得不赞叹Dung Lea设计的读锁请求算法是如此的精妙和高超!

读锁释放
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// 1、用户代码
rwl.readLock().unlock();

// 2、ReentrantReadWriteLock的unlock

/**
* Attempts to release this lock.
*
* <p>If the number of readers is now zero then the lock
* is made available for write lock attempts.
*/
public void unlock() {
sync.releaseShared(1);
}

// 3、调用的是AQS的releaseShared:共享锁释放
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

// 4、ReentrantReadWriteLock内部Sync定义的释放读锁逻辑
// 从读锁的加锁分析中可以推理出:有几种释放锁情况
// 第一:请求读锁然后马上释放锁的情况,
// 第二:已经是重入锁的情况去释放自己的重入锁
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 1、如果当前线程恰好是第一个拿到读锁的线程,那么对于首次加锁那么释放锁就是直接将firstReader置为null,否则说明是重入锁的释放,只需将自己持有的读锁计数减1
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;

// 2、 非firstReader角色释放自己持有的读锁
} else {
HoldCounter rh = cachedHoldCounter;
// 2.1 如果缓存为空或者缓存的HoldCounter对象不是当前线程,那么就从当前线程的ThreadLocal取出它的HoldCounter对象赋给rh变量
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();

int count = rh.count;
// 2.2 如果原来持有的读锁<=1,那么因为本次释放就会使得自己持有读锁计数变为0,对应要清除自己ThreadLocal存放的HoldCounter对象,这是因为考虑到ThreadLocal里面的entry是WeakReference类型,而HoldCounter是放在entry中,那么显然调用remove后,使得这个entry没有对象引用,加速GC回收。
// 此外,当前线程因此已经准备要释放自己持有的最后一个读锁,说明释放后它就不不再需要计数,因此通过主动调用remove来避免ThreadLocal出现内存泄漏。
if (count <= 1) {
readHolds.remove(); // 务必已经掌握了ThreadLocal的底层源码设计,否则这段将无法理解为何要使用remove。
if (count <= 0)
throw unmatchedUnlockException();
}
// 2.3 如果原来持有的读锁计数大于1,那么当前线程的本次释放并不是释放自己持有的最后一个读锁,而是重入锁的退出,只需对自己持有的重入锁计数器减1即可
--rh.count;
}
//3、以上1和2步骤都是为了完成线程自己持有读锁的计数减1,那么下面使用自旋保证当前线程一定能使得总读锁计数state-1,如果释放后总锁(含读、写锁)剩余量刚好为0,那么当前下线程就会使用doReleaseShared去唤醒阻塞队里面正在等待的线程节点,尤其是被阻塞的写线程节点。
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
/* 这里也说到:如果是读线程释放读锁,对于持有读锁的线程来说是无影响的,如何理解这句话?
想象一下:假设有65535个线程并发使用lock请求读锁,并假设都可以拿到读锁(意味着每个线程拿到1个读锁),此时state的高16位计数为65535,假设在下一刻有65534个线程同一时刻释放自己的读锁,可以容易推出:每次有线程成功CAS:compareAndSetState(c, nextc),且nextc不等于0,对应tryReleaseShared返回false,那么在外面此线程并不需要进入doReleaseShared()逻辑,这其实在说明:65534个线程在释放锁的过程中,tryReleaseShared都是返回false,显然都不影响其他读线程,这就是“Releasing the read lock has no effect on readers”的含义。
当tryReleaseShared返回true时,说明第65535个线程也释放自己的锁,此时nextc==0成立,这时当前线程才会进入doReleaseShared()唤醒阻塞队列里面的线程节点。 因此这一刻,读锁和写锁都没人请求,阻塞队列的线程节点当然可以出队去请求锁。
*/
return nextc == 0;
}
}

读写锁的公平性

我们知道,ReentrantReadWriteLock在创建时默认使用的非公平模式

1
2
3
4
5
6
7
8
9
10
// 默认构造器使用的是非公平模式
public ReentrantReadWriteLock() {
this(false);
}

public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this); // new时就已经实例化一个readerLock 对象和 writerLock 对象
writerLock = new WriteLock(this);
}

非公平模式实现:

请求写锁时:当前准备请求写锁的线程可以直接CAS写锁,不需要询问CLH阻塞队列是否有线程在等待拿锁。

请求读锁时:返回apparentlyFirstQueuedIsExclusive的策略,解析参考上文。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

/**
* Nonfair version of Sync
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}
}

公平模式:

请求写锁时:当前准备请求写锁的线程要去查询CLH阻塞队列是否有线程正在排队

请求读锁时:当前准备请求读锁的线程要去查询CLH阻塞队列是否有线程正在排队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74

/**
* Fair version of Sync
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}



/**
* Queries whether any threads have been waiting to acquire longer
* than the current thread.
*
* <p>An invocation of this method is equivalent to (but may be
* more efficient than):
* <pre> {@code
* getFirstQueuedThread() != Thread.currentThread() &&
* hasQueuedThreads()}</pre>
*
* <p>Note that because cancellations due to interrupts and
* timeouts may occur at any time, a {@code true} return does not
* guarantee that some other thread will acquire before the current
* thread. Likewise, it is possible for another thread to win a
* race to enqueue after this method has returned {@code false},
* due to the queue being empty.
*
* <p>This method is designed to be used by a fair synchronizer to
* avoid <a href="AbstractQueuedSynchronizer#barging">barging</a>.
* Such a synchronizer's {@link #tryAcquire} method should return
* {@code false}, and its {@link #tryAcquireShared} method should
* return a negative value, if this method returns {@code true}
* (unless this is a reentrant acquire). For example, the {@code
* tryAcquire} method for a fair, reentrant, exclusive mode
* synchronizer might look like this:
*
* <pre> {@code
// 这里给出当前线程请求独占锁前需要询问阻塞队列的是否有线程正在排队情况
* protected boolean tryAcquire(int arg) {
* if (isHeldExclusively()) {
* // A reentrant acquire; increment hold count
* return true;
* } else if (hasQueuedPredecessors()) {
* return false;
* } else {
* // try to acquire normally
* }
* }}</pre>
*
* @return {@code true} if there is a queued thread preceding the
* current thread, and {@code false} if the current thread
* is at the head of the queue or the queue is empty
* @since 1.7
*/

// 1. 如果当前准备请求读写锁的线程调用hasQueuedPredecessors()返回false,说明当前线程要么已经在队列且作为第一个排队节点,要么就是没有CLH阻塞队列,也即没有其他线程在排队,这两种情况都可以说明当前线程不需要排队可以直接去竞争锁。
// 2. s = h.next且s.thread = Thread.currentThread(),就是说明CLH阻塞队列里面第一个排队线程就是当前线程本身。
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

小结

基于以上全文,显然ReentrantReadWriteLock的一些特定如下:

读读不互斥:持有读锁的线程A读数据显然不影响持有读锁的线程B读数据,这样的读读不互斥才是ReentrantReadWriteLock的高性能设计,如果采用ReentrantLock,那么持有lock的线程A读数据显然会导致线程B需要等待线程A释放了ReentrantLock的lock才能读数据,这就是为何ReentrantLock读读互斥不再符合读多写少的高并发需求的场景。

读写互斥:设想,如果允许读写不互斥,那么持有写锁的线程A正在写数据时,持有读锁的线程B读数据就会读到旧数据,这不是ReentrantReadWriteLock的设计初衷。因此持有写锁的线程A正在写数据时,线程B将无法请求到读锁或者写锁。持有读锁的线程A正在读数据时,线程B将无法请求到写锁来执行相关写操作,这样就保证了线程A在写时,其他线程不会读到乱数据。

写写互斥:持有写锁的线程A正在写数据时,线程B显然无法请求到写锁。

对于同一线程持有读锁的情况下,不允许持有写锁:否则就是持当前线程有读锁情况下竟然可以请求到写锁,那么这个读锁就不是共享模式读锁,而是变相称为了独占锁。

对于同一线程持有写锁的情况下,允许持有读锁:这是允许的,因此例如持有写锁的线程A正在写数据(外面的其他线程无法请求读锁和写锁),写完数据后,可以请求读锁再去读数据,这样不管是线程A自己还是外面的线程都能正确的读取到正确的数据。