yield-bytes

沉淀、分享与无限进步

Java高级主题:基于AQS驱动的CountDownLatch实现原理解析

countdown:倒计时,latch:门闩;插销

其实CountDownLatch是一种共享锁资源的同步器,用于协调多个线程并行执行(注意不是并发),也即控制多个线程完成后接着在同一时刻去完成某事。当然了,这种协调的底层实现是基于AQS的共享模式实现。

CountDownLatch一般用法如下

1
2
countdownLatch.countDown()  // 使得内部的“计数器值”减1,一般由子线程去调用
countdownLatch.await() // 当内部的“计算器值”为0时,说明所有子线程都已经完成任务,那么阻塞的主线程就会被唤醒再去执行其他任务

第一种用法:主线程等所有子线程完成后再统一做某事

这里用开会作为例子,假设有A、B、C三个人参与开会,主持人是AA,开会的规则:要求三个人到场后,主持人才可以开始开会。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch =new CountDownLatch(3);
new A(countDownLatch).start();
new B(countDownLatch).start();
new C(countDownLatch).start();
countDownLatch.await(); // 主持人在等待三位到达会场
System.out.println("三位都到了会场,现在可以开会了");
}
}

class A extends Thread{
private final CountDownLatch countDownLatch;
A (CountDownLatch countDownLatch){
this.countDownLatch=countDownLatch;
}

@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println("A已到达会场");
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class B extends Thread{
private final CountDownLatch countDownLatch;
B (CountDownLatch countDownLatch){
this.countDownLatch=countDownLatch;
}

@Override
public void run() {
try {
Thread.sleep(2000);
System.out.println("B已到达会场");
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class C extends Thread{
private final CountDownLatch countDownLatch;
C (CountDownLatch countDownLatch){
this.countDownLatch=countDownLatch;
}

@Override
public void run() {
try {
Thread.sleep(3000);
System.out.println("C已到达会场");
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

输出:

A已到达会场
B已到达会场
C已到达会场
三位都到了会场,现在可以开会了

countDown的底层AQS解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//1、 用户代码(子线程调用)
countDownLatch.countDown();

//2、 CountDownLatch同步器的countDown方法
public void countDown() {
sync.releaseShared(1);
}


//3、 AQS的releaseShared方法,
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 减1成功后,如果state值不为0则不会进入唤醒传播操作。实际场景中一般由最后一个完成任务的子线程到这边一步就会tryReleaseShared==0,进入唤醒逻辑
doReleaseShared();
return true;
}
return false;
}


//4、 CountDownLatch同步器的tryReleaseShared方法,也即AQS定义的模板方法的子类实现,注意:该“释放方法”是将state值(计数器)减1,而在信号量Semaphore同步器的tryReleaseShared是对可用资源加1操作,注意区别。
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1; // 这里就体现用户代码调用countDownLatch.countDown()使得内部计数器state减1的核心设计
if (compareAndSetState(c, nextc))
return nextc == 0; // 如果计时器的值为0,那么调用此逻辑线程的就会接着执行doReleaseShared()唤醒传播操作:实际场景中一般由最后一个执行countDownLatch.countDown()的子线程来唤醒阻塞队列里面的线程节点
}
}

await()的底层AQS解析

注意,调用countDownLatch.await()的线程可能会马上进入阻塞状态,已经掌握AQS的底层设计原理的同学应该可以看出所谓的“阻塞”其实就是进入了CLH阻塞队列后将自己park起来,并等待释放锁的线程来唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
//1、 主线程调用
countDownLatch.await(); // 主持人在等待三位到达会场

/*2、CountDownLatch的await方法
Causes the current thread to wait until the latch has counted down to zero, unless the thread is interrupted.
这里的官方注释:此方法会使得当前线程处于等待状态直到内部计数器的值为0或者当前线程被外界中断
*/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

//3、AQS的获取资源
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 如果调用await前就已经被其他线程中断,那么会直接抛出异常,不再进入阻塞队列排队。
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) // 注意:这里不是Semaphore对state可用资源进行减1操作,而是判断只要内部计时器不为0,tryAcquireShared就会返回-1,会导致调用await()的线程进入CLH阻塞队列:这里经常会这样使用,其他子线程做完任务后才能调用countDown(),而主线程作为和子线程接近同一时刻启动,因此对于主线程来说它会先读取到的state肯定不为0,使得tryAcquireShared(arg) < 0成立,主线程就会优先执行阻塞队列的流程
doAcquireSharedInterruptibly(arg);
}

// 4、CountDownLatch内部的tryAcquireShared,注意第3点所提,主线程先运行后await()这里返回getState()返回-1
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}


// 5、AQS内部获取资源失败则进入阻塞队列进行阻塞
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); // 调用await()的主线程作为节点进入阻塞队列的队尾
boolean failed = true;
try {
for (;;) {
// 由于此刻阻塞队列里面仅有主线程在排队,因此它可以再去尝试获取锁资源
final Node p = node.predecessor();
if (p == head) {
// 根据上面的第3、第4说明,这里tryAcquireShared返回-1,因此主线程节点进入下面if阻塞自己
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 主线程在这里会阻塞自己(当然是了避免cpu空转),此时阻塞队列里面仅有一个正在等待外面线程唤醒的线程节点。
// 当然被唤醒后,继续在parkAndCheckInterrupt()内执行,进行自我中断检查:如果线程节点在阻塞队列的等待过程被外界中断过,则醒来后,直接抛出中断异常,然后就会进入finally的取消节点逻辑。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

总结以上“等三个人到齐后再开会”的例子:

虽然A、B、C以及主线程都是同时start,但因为A、B、C线程分别要执行一定时间后才能进行countDown减1,因此在主线程执行await()方法时马上有getState()==3,也即tryAcquireShared返回-1,故主线程进入了doAcquireSharedInterruptibly逻辑。

谁去唤醒已经在阻塞队列里面“主线程节点”

上一小节已经分析了调用await()的主线程先被AQS“调度”进入了阻塞队列里面,那么谁去负责把这个位于阻塞队列里面的主线程唤醒呢?这里不妨给一个拟人化的演绎推理:

假设A先到,他说我到了主持人可以开始开会,但显然后面还有B、C没到,因此A需要等齐人再去“唤醒主持人”,这样才显得“开会流程”是合理的,接着B到了,他说我到了主持人可以开始开会,但显然后面还有C没到,因此B也不适合去“唤醒主持人”,最后C到了,C看到前面的A、B都到了,这时符合三人都到齐了的条件,因此由最后一个到达会场的C来执行doReleaseShared唤醒逻辑才是合理的,也即让在“等候间睡眠的主持人醒来”开始主持会议。

对应到CountDownLatch—AQS算法层面的流程如下:

A、B、C以及主线程同一时刻start

1、主线程先进入AQS的阻塞队列并park上(阻塞自己)。

2、接着线程A执行1秒结束后使用countDown,计数器从初始给定的3减为2,因为getState !=0 ,此时不会执行doReleaseShared 唤醒操作

2、线程B执行2秒结束后使用countDown,计数器从上面的2减为1,因为getState !=0 ,因此此时不会执行doReleaseShared 唤醒操作

3、线程C执行3秒结束后使用countDown,计数器从上面的1减为0,因为getState ==0成立 ,此时会进入doReleaseShared唤醒操作操作流程,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// C线程使用countDown对应内部以下流程:
protected boolean tryReleaseShared(1) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState(); // C读取的计数器值为1
if (c == 0)
return false;
int nextc = c-1; // nextc=1-1=0
if (compareAndSetState(c, nextc)) // 此时CAS后,state变为0
return nextc == 0; // nextc=0显然返回true
}
}

public final boolean releaseShared(arg) {
// 显然上面的tryReleaseShared返回true,C线程可以去执行doReleaseShared唤醒传播操作,对应的逻辑就是让最后一个完成任务的C去唤醒在阻塞队列等待锁资源的主线程。
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}


private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
/*
显然对于阻塞队列结构: head(waitStatus=-1) <-> 主线程(waitStatus=0),满足
h.waitStatus == Node.SIGNAL,因此进入unparkSuccessor唤醒主线程。
*/
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

以上的唤醒流程其实可以得出这样的结论:虽然有多个线程使用的countDown,但最终是由最后一个完成自身任务的线程C(因为它可以使得state减为0)去唤醒“调用await被放入阻塞队列”的主线线程。

有了以上解析,那么针对CountDownLatch的第二种场景使用则可以很好理解其背后的工作机制。

第二种用法:要求多个线程在同一时刻“开跑”

这里我们用了Doug Lea在源码给出的demo代码自行设计一个场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Doug Lea在源码给出的demo代码
class Driver { // ...
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);

for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();

doSomethingElse(); // don't let run yet
startSignal.countDown(); // let all threads proceed
doSomethingElse();
doneSignal.await(); // wait for all to finish
}
}

class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}

void doWork() { ... }
}

对应的Demo:

场景:三个跑者线程在同一起跑线上

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch waitSignal=new CountDownLatch(1);
CountDownLatch launchSignal=new CountDownLatch(3);
new RunnerA(waitSignal,launchSignal).start();
new RunnerB(waitSignal,launchSignal).start();
new RunnerC(waitSignal,launchSignal).start();

waitSignal.countDown();
launchSignal.await();

}
}

class RunnerA extends Thread{
private final CountDownLatch waitSignal;
private final CountDownLatch launchSignal;
RunnerA (CountDownLatch startSignal,CountDownLatch doneSignal ){
this.waitSignal =startSignal;
this.launchSignal =doneSignal;
}

@Override
public void run() {
try {
System.out.println("A在起跑线等待裁判吹哨");
waitSignal.await();
doRun();
launchSignal.countDown();
}catch (InterruptedException e){
e.printStackTrace();
}
}
private void doRun() throws InterruptedException {
Thread.sleep(1000);
System.out.println("A 到达了终点");

}
}
class RunnerB extends Thread{
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
RunnerB (CountDownLatch startSignal,CountDownLatch doneSignal ){
this.startSignal=startSignal;
this.doneSignal=doneSignal;
}

@Override
public void run() {
try {
System.out.println("B在起跑线等待裁判吹哨");
startSignal.await();
doRun();
doneSignal.countDown();
}catch (InterruptedException e){
e.printStackTrace();
}
}
private void doRun() throws InterruptedException {
Thread.sleep(3000);
System.out.println("B 到达了终点");

}
}
class RunnerC extends Thread{
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
RunnerC (CountDownLatch startSignal,CountDownLatch doneSignal ){
this.startSignal=startSignal;
this.doneSignal=doneSignal;
}

@Override
public void run() {
try {
System.out.println("C在起跑线等待裁判吹哨");
startSignal.await();
doRun();
doneSignal.countDown();
}catch (InterruptedException e){
e.printStackTrace();
}
}
private void doRun() throws InterruptedException {
Thread.sleep(5000);
System.out.println("C 到达了终点");

}
}

输出:

1
2
3
4
5
6
7
A在起跑线等待裁判吹哨// 同一时间启动
B在起跑线等待裁判吹哨// 同一时间启动
C在起跑线等待裁判吹哨// 同一时间启动

A 到达了终点 // 1秒后
B 到达了终点 // 3秒后
C 到达了终点 // 5秒后

以上线程调度涉及到两个AQS底层的阻塞队列:

第一个阻塞队列的形成(waitSignal背后对应的阻塞队列):

RunnerA、RunnerB、RunnerC这三个线程都在“run”里面先调用waitSignal.await(),且同一时刻启动,由于比主线程waitSignal.countDown()先执行,因此这三个线程的await的getSate !=0 会进入阻塞队列操作doAcquireSharedInterruptibly,也即在waitSignal对象上形成以下CLH阻塞队列:

1
head(null,-1) <-> node(RunnerA,-1)  <-> node(RunnerB,-1)  <-> node(RunnerC,0)->null 

(假设按A、B、C入队顺序做的阻塞队列示意图)

什么时刻由谁去唤醒他们呢?

这里就是为何在主线程下使用waitSignal.countDown()的原因:对应以下1、2执行流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 //CountDownLatch内部
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState(); // waitSignal=new CountDownLatch(1),此时state值为1
if (c == 0)
return false;
int nextc = c-1; // 主线程waitSignal.countDown()先执行,因此这里nextc=1-1=0
if (compareAndSetState(c, nextc))
return nextc == 0; // 显然返回true
}

// AQS
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 上面返回true后,当然进入唤醒操作
doReleaseShared(); // 内部的unparkSuccessor(h)会先唤醒RunnerA线程节点
return true;
}

从上面可以看出,在主线程执行waitSignal.countDown()后会在AQS内部先通过doReleaseShared里面的unparkSuccessor唤醒RunnerA,那么之后的RunnerB、RunnerC又是怎么被唤醒呢?这就需要使用AQS共享模式的唤醒传播设计去解释了,以下便是RunnerA唤醒之后的执行流程:

RunnerA通过setHeadAndPropagate(node, r) 唤醒了RunnerB,同时RunnerA也会出队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// ①RunnerA被唤醒后,会来到此位置继续循环
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//② RunnerA通过此方法实现将唤醒传播到RunnerB
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
//省略部分

以此类推,RunnerB通过setHeadAndPropagate(node, r) 唤醒了RunnerC,同时RunnerC也会出队

最终实现了以下唤醒流:

1、主线程唤醒RunnerA

2、RunnerA使用setHeadAndPropagate传播唤醒RunnerB

3、RunnerB使用setHeadAndPropagate传播唤醒RunnerC

如果没有看本博客之前关于AQS的共享模式设计的解析,则无法理解背后多个线程入队阻塞之后被唤醒的流程,以及所谓的传播唤醒设计原理。

第二个阻塞队列的形成(launchSignal背后对应的阻塞队列):

虽然三个线程和主线程都是同一时刻执行,但因为每个线程需要“跑步一段时间后”才执行launchSignal.countDown(),故主线程执行launchSignal.await()时,getState=3显然不是0,因此会执行doAcquireSharedInterruptibly进入另外一条CLH阻塞队列,队列结构如下:

1
head(null,-1) <-> node(主线程,0)->null

当主线程进入阻塞队列且被阻塞后,此时对应的用户线程是这样的:RunnerA运行自己的任务、RunnerB运行自己的任务、RunnerC运行自己的任务,且主程序还未退出,那么在什么时间且最终由谁去唤醒位于launchSignal对象内部的阻塞队列的主线程呢?

由于RunnerA是第一个结束任务(执行1秒)、RunnerB是第二个结束任务(执行3秒),RunnerC是作为最后一个结束任务(运行5秒),因此容易推出:由RunnerC在众多线程中作为最后一个结束任务的线程去唤醒位于launchSignal对象内部的阻塞队列的主线程。这里就不再具体解释其唤醒过程了,参考第一种用法里面的部分解析即可。