yield-bytes

沉淀、分享与无限进步

Java高级主题:AQS核心源代码实现之共享模式的深入解析

信号量是典型的共享模式:

何为共享模式:同一时间可以有多个线程同时持有锁资源,强调的是线程并行概念。

独占模式:同一时间只能有一个线程持有锁资源

1、Semaphore信号量说明

底层实现原理:使用AQS同步状态的state来保存信号量的当前计数。release(1)表示线程释放了1个锁资源,对应的state值加1,acquire(1)表示线程消耗1个数量的锁资源,对应的state值减1。一般用于限制线程并发工作数量。

2、Semaphore使用acquire()的小demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class SemaphoreDemo {
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(5);
List<Thread> threadList=new ArrayList<>();
for (int i = 0; i < 10; i++) {
Thread t= new Thread(()->{
try {
semaphore.acquire();
System.out.println("线程" + Thread.currentThread().getId() + "已拿到锁资源");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();
}
});
threadList.add(t);
}
for (Thread t : threadList) {
t.start();
}
for (Thread t : threadList) {
t.join();
}
}
}

可以观察到结果:5个线程能够同时成功抢到锁资源并执行了5秒,后面5个线程因为已经没有可用资源,只能等待前面5个线程运行结束后才能继续。

注意Semaphore在初始化的构造器也是可以指定公平模式和非公平模式,以下将以默认的公平模式去讨论AQS是如何支撑Semaphore的工作过程

3、Semaphore获取锁资源acquire的工作流程

构造器默认使用非公平模式,指定最大可用资源数(也可以称为可用信号量、许可证permits、最大可用锁资源等)

1
2
3
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

其实Semaphore类也是跟ReentrantLock设计如出一辙,内部定义了AQS的子类Sync类,并分别定义非公平模式实现类NonfairSync和公平模式的实现类FairSync

流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100

// ① 用户代码:
Semaphore semaphore =new Semaphore(10); // 默认非公平模式
semaphore.acquire();


// ② Semaphore 内部的acquire方法其实调用的是可立即响应中断的获取锁资源方法:acquireSharedInterruptibly
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}


// ③ AQS定义的acquireSharedInterruptibly方法:在共享模式下获取锁资源,如果当前正在抢锁资源的线程被外界中断,则该线程会马上抛出中断异常。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 这里通过读取线程自己的中断标记来判断释放需要响应外界中断
if (Thread.interrupted())
throw new InterruptedException();
// 如果没有外界中断当前线程,且当前线程使用子类的tryAcquireShared具体逻辑去尝试获取锁资源时,已无剩余量,则进入排队逻辑
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}


//④ 上面的tryAcquireShared(arg)显然就是Semaphore子类Sync要实现的模板方法,也即NonfairSync的tryAcquireShared方法
/**
* NonFair version
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;

NonfairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}


//⑤ 来自Semaphore子类Sync的非公平方式抢占共享锁资源
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 非公平性是指:所有新来的线程都通过自旋去竞争锁资源
int available = getState(); // 可以看到,state的值表示当前可用资源数
int remaining = available - acquires; // 剩余量=当前可用数量-请求量
// 使用CAS更新扣减之后的剩余量,将剩余量返回给外面调用方,例如`tryAcquireShared`方法
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}


/* ⑥
如果tryAcquireShared没有可用资源量,则线程可能会放入阻塞队列里面,具体由AQS里面的doAcquireSharedInterruptibly方法实现
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
*/
// AQS的可响应中断的共享模式获取锁资源方法,注意区别于doAcquireShared
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); // new一个有共享标记的节点,将其添加到阻塞队列的队尾
boolean failed = true;
// 以下的设计流程其实跟独占模式的acquireQueued类似:当前线程节点node进入队尾后,立即检查它的前驱节点p,若p是head节点,说明node是阻塞队列的第一个线程节点,则可以再利用tryAcquireShared去尝试请求可用资源。这种设计就是所谓的“fast try,failed acquire then do enq”
try {
for (;;) {
final Node p = node.predecessor();
// ① 前驱节点是头节点,则可以进入尝试获取资源逻辑
if (p == head) {
// 这里还是使用Semaphore内部的NonfairSync的tryAcquireShared方法实现
int r = tryAcquireShared(arg);
/* ② 如果剩余的可用资源>=0,那么将node设为head,并且将唤醒动作传播到下一个线程节点上。这里要重点解释为何共享模式下不能用setHead,而是需要使用setHeadAndPropagate(node, r):
例如有10个线程,5个可用资源,假设前面5个线程抢用完资源后,第6到10个线程将进入阻塞队列并在等待中,在某一刻,前面5个线程同时释放资源,那么第6个线程唤醒后使用r = tryAcquireShared(arg)就会剩下4个可用资源,意味着第7、8、9、10这四个在阻塞队列的线程根本无需再等待,外面已经有4个资源够他们抢用了(除非此时外界又来了新线程直接跟7到10号线程竞争资源),因此需要在第6个线程唤醒获得资源后,马上将唤醒动作传播给第7个线程(第7个线程传播给第8个线程,类似多米诺骨牌效益),这就是共享模式为何采用Propagating wake up——将唤醒动作传播持续下去的设计思路
*/
if (r >= 0) {
setHeadAndPropagate(node, r); // 获取到锁资源的第6号线程可以出队,同时将第7号线程唤醒,完成“唤醒传播”操作
p.next = null; // help GC
failed = false;
return;
}
}

//③ 说明线程节点不是第一个线程节点,阻塞自己,避免浪费cpu时间片,这一点设计跟acquireQueued是一致的。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// ④在进入对尾阻塞自己后,parkAndCheckInterrupt()唤醒后,若发现自己阻塞期间被外界中断过,那么就通过抛出中断异常来响应“外界的中断请求”,try捕获到此异常后进入finally的“取消排队处理”
throw new InterruptedException();
}
} finally {
//⑤ 既然当前线程唤醒后,发现自己阻塞期间被外界中断过,那么线程取消排队,不再和阻塞队列里面的线程竞争共享锁资源
if (failed)
cancelAcquire(node);
}
}

4、doAcquireShared

注意其源码解释:Acquires in shared uninterruptible mode,强调了该方法是不响应中断的,而上面的doAcquireSharedInterruptibly 能响应外界设置中断标志且会抛出中断异常。

doAcquireShared在什么情况下会被调用呢?

用户层代码使用semaphore.acquireUninterruptibly()请求锁资源

1
2
3
4
5
6
7
8
9
  // Semaphore
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
// AQS
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg); // 此处可以看到semaphore.acquireUninterruptibly()底层调用的是不响应中断的doAcquireShared
}

以下是doAcquireShared的设计解析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/

private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// ① 假设node的前驱节点还不是head节点,则进入④流程,等到该node唤醒后重新来到①且此时p==head
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
// ③ 接④的interrupted = true的标志后,唤醒的线程将外界中断信号在这里补充了一个自我中断,注意:别错误认为此时线程就是抛出异常,doAcquireShared是被设计为“获取锁资源的过程中都不会响应中断处理”
// 如果线程是被unpark正常唤醒正常唤醒,那么就会跳过以下if的selfInterrupt()逻辑。
if (interrupted)
selfInterrupt();
// 虽然补充了自我中断标志,但是线程已经成功获得锁资源,因此不是“failed”,可直接返回。
failed = false;
return;
}
}
// ④
// 说明线程节点不是第一个线程节点,那么入队后,阻塞自己,避免浪费cpu时间片。
// shouldParkAfterFailedAcquire(p, node) 主要负责将node的前驱节点waitStatus改为SIGNAL,parkAndCheckInterrupt()主要负责阻塞线程并且在线程唤醒后,返回该线程有被中断的标志
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true; // 这里指明了线程节点使用doAcquireShared成功获取锁资源前,是不会抛出中断异常的。当线程节点被唤醒时,对于被唤醒的情况有两种:一种是使用unpark正常唤醒,一种是被外界线程使用interrupt()中断,对于后面这种中断情况,当前唤醒线程并不会马上响应它(并将interrupted标志设为ture),而是继续后面的流程:在下一轮自旋将回到①处,如果p==head成立,则尝试获取锁资源,如果剩余的可用资源>=0,那么就会进入②位置,然后在③位置去处理“外界的中断信号”
}
// 如果线程节点node请求可用资源出现其他异常(显然不会是中断异常),则取消该线程节点排队,这一点设计跟acquireQueued是一致的。
} finally {
if (failed)
cancelAcquire(node);
}
}

5、setHeadAndPropagate

setHeadAndPropagate方法是共享模式获取锁资源的的一个核心设计,需要独立再给出详细分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
	    /**
因为阻塞队列第一个线程节点使用 tryAcquireShared获取可用资源后,剩余量r>=0,因此它可以出队,并且检查后驱节点是否以共享模式等待,如果是那么只要满足剩余量大于0或者后驱节点的PROPAGATE已经设置,那么就要讲唤醒动作传播下去,也即调用doReleaseShared来实现唤醒动作接力。注意到Doug lea有提到该方法会引起不必要的唤醒动作。
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below 保留旧头节点,用于之后检查
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/

/*
这里设计很机智:
条件① 如果可用资源量propagate>0,当然可以和后驱节点(如果存在)说:你们可以醒来啦,state这边还有资源可以抢占
条件②条件③是联合设计:如果旧头节点不为null那么就可以判断h.waitStatus头结点同步状态值,如果<0,说明waitStatus可能是SIGNAL或者PROPAGATE,故考虑唤醒传播逻辑
条件④:其实条件④就是Doug Lea经常用的一个trick写法,对于高并发场景下,可以在判断条件里面进行双重检查,也即前一刻读取一次,在后一刻马上读取一次
在这里,直接在条件里面重新读取新的阻塞队列头节点,如果为不空,就会跟前一刻的条件②条件③联合起来的逻辑一样。
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next; // 取出第一个线程节点的后驱节点,它可能null也可能是一个正在等待的线程节点
/*
唤醒后继共享节点
① s后驱节点为null,为何还要将唤醒动作接力下去? 这是因为虽然上一课刻读取node.next为null,但是在下一刻如果恰好有新的线程节点入队,那么此时doReleaseShared就可以派上用场,但如果下一刻还是没有新线程入队,那么doReleaseShared也会被调用,这就是注释提到的“may cause unnecessary wake-ups”,也即引起不必要的唤醒调用。
② 若s不为null,恰好保证第二个条件s.isShared()不会发生空指针异常且可以只要s.isShared()为true,说明后驱节点是共享模式,需要被唤醒去抢锁资源的。
这里之所以加入判断后驱节点s是共享模式节点的前提下才能执行唤醒操作,是因为在ReentrantReadWriteLock的读写锁同步器设计内部的阻塞队列中可能同时存在读线程节点(共享模式,需要继续被唤醒)和写线程节点(独占模式),而对于Semaphore同步器,只有s.isShared()为true才会将唤醒动作接力下去,否则说明此s节点是独占模式的节点(或者说此时用户使用的是ReentrantReadWriteLock同步器场景),不需要执行唤醒传播逻辑。
*/
if (s == null || s.isShared()) // 如果s不是null且s.isShared()不是true,说明当前使用setHeadAndPropagate是ReentrantReadWriteLock使用场景,那么此时该节点是一个写节点,不需要传播唤醒,因为这个写线程会等之前那个获得独占锁的写线程去唤醒。
// 当然这部分内容最好有了ReentrantReadWriteLock设计原理的基础后再来理解则更容易明白。
doReleaseShared();
}
}

6、doReleaseShared()

正如注释里面的说明:signals successor and ensures propagation,通知后驱节点以及保证唤醒传播(接力)下去

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/* 由于在ReentrantReadWriteLock的阻塞队列里面可能同时存在独占模式的线程节点和共享模式的线程节点,因此该唤醒方法分为两种情况:如果遇到阻塞队列第一个线程节点是需要唤醒的
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) { // 注意到这里用到自旋,确保唤醒动作能够传播到下一个节点
Node h = head;
// 只要阻塞队列不是空链表(至少存在一个线程节点)
if (h != null && h != tail) {
int ws = h.waitStatus;
// ① 共享模式下,阻塞队列里面的线程节点的前驱节点waitStatus都会被设置SIGNAL,因此可以到这里如果直到head的ws是SIGNAL,就可以进行唤醒操作
if (ws == Node.SIGNAL) {
// 确保能将头节点从SIGNAL改为0才能进行唤醒后驱节点的操作
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// ② 如果此时头节点ws是0,就将其设为PROPAGATE,以保证唤醒操作能够传播到下一个节点
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果头节点已经被更新(也即inconsistent read),只能回到for循环继续重试
if (h == head) // loop if head changed
break;
}
}

7、Semaphore的释放资源release()

release的逻辑相对简单,这里不再过多说明。对比doAcquireShared方法,原来共享模式的同步器,不仅在释放资源时要求去唤醒阻塞队列的节点,被唤醒的线程节点在请求资源后,也要将唤醒操作传播下去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
 // 用户代码
semaphore.release();

// 在成功是否锁资源后,底层是调用AQS的doReleaseShared唤醒阻塞队列的首个等待的线程节点,tryReleaseShared(arg)是模板方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}


// Semaphore内部tryReleaseShared实现逻辑,对state进行CAS累加
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current=getState();
int next=current+releases;
if (next<current)
throw new Error("Permit count underflow");
if (compareAndSetState(current,next))
return true;

}
}

// 如果释放资源成功,也即state值加1(假设每次请求1个资源),那么进入AQS的共享模式下的唤醒操作:
if (tryReleaseShared(arg)) {
doReleaseShared();

其实释放资源的release()方法内部的doReleaseShared的唤醒设计在CountDownLatch的countDown里面也是同样的设计。

8、共享模式下的公平模式

此设计相对简单,和ReentrantLock的公平模式如出一辙,通过hasQueuedPredecessors()判断阻塞队列是否有正在排队的线程节点,如果有,那么只能乖乖的入队:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;

FairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}