yield-bytes

沉淀、分享与无限进步

Java高级主题:基于AQS驱动的CyclicBarrier实现原理解析

CyclicBarrier的基本使用

这里的Barrier可以翻译为屏障、篱栅、栅栏、“关口”,本文统一称为屏障,因此CyclicBarrier如果非得翻译的话,可以理解为:一个可循环制造屏障的同步器。

CyclicBarrier是一个(用于多线程协调的)同步器,它允许一组线程实现互相等待,直到所有线程的执行流都来到一个公共屏障点 (然后再去做一个任务)

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point

要理解CyclicBarrier的使用或者其设计意图,可以通过以下简单的demo来了解:

场景: 三位同学A、B、C玩一个闯关游戏,总共有两关(每一关的关口称为屏障),每个同学在本关玩游戏花费时间不一定相同,但要求:当前仅当三位同学都到达关口位置时,由最晚到关口的同学去说:可以进行第二关的任务,然后三位同学继续下一关游戏…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class CyclicBarrierDemo {

static class Player extends Thread{
CyclicBarrier bar;
int duration;
public Player(CyclicBarrier bar,int duration){
this.bar=bar;
this.duration=duration;
}

@Override
public void run() {
System.out.println();
try {
Thread.sleep(1000*this.duration);
System.out.println(Thread.currentThread().getName()+"同学到达关口1");
bar.await();
Thread.sleep(1000*this.duration+1000);
System.out.println(Thread.currentThread().getName()+"同学到达关口2");
bar.await();

} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws Exception {
CyclicBarrier bar=new CyclicBarrier(3, new Runnable() {
@Override
public void run() {
String broadcast="最晚到关口的"+Thread.currentThread().getName()+"说:A、B、C都齐了,可以开始下一关";
System.out.println(broadcast);
}
});

new Player(bar,1).start(); // A同学,对应Thread-0
new Player(bar,2).start(); // B同学,对应Thread-1
new Player(bar,3).start(); // C同学,对应Thread-2

}
}

输出:

Thread-0同学到达关口1
Thread-1同学到达关口1
Thread-2同学到达关口1
最晚到关口的Thread-2说:A、B、C都齐了,可以开始下一关
Thread-0同学到达关口2
Thread-1同学到达关口2
Thread-2同学到达关口2
最晚到关口的Thread-2说:A、B、C都齐了,可以开始下一关

对输出的说明:

可以看到A、B、C三位依次到达关口1,最晚到关口的C同学来执行“通告工作”,接着三位同学又可以进行下一关游戏(第二关),同样A、B、C三位依次到达关口2,且由最晚到关口的C同学来执行“通告工作”。

可以结合以下示意图帮助理解
在这里插入图片描述

CyclicBarrier设计解析

重要的一些成员变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
  /**
* Each use of the barrier is represented as a generation instance.
* The generation changes whenever the barrier is tripped, or
* is reset. There can be many generations associated with threads
* using the barrier - due to the non-deterministic way the lock
* may be allocated to waiting threads - but only one of these
* can be active at a time (the one to which {@code count} applies)
* and all the rest are either broken or tripped.
* There need not be an active generation if there has been a break
* but no subsequent reset.
*/
private static class Generation {
boolean broken = false;
}

/** The lock for guarding barrier entry */
// 从这里可以看到,内部使用一把独占锁和一个条件队列实现阻塞机制
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
// 这里tripped可以理解为:如果所有线程还未到达屏障,那么那些先来到屏障的线程会被阻塞在trip条件队列中
private final Condition trip = lock.newCondition();
/** The number of parties */
// 到达同一屏障才给放行的线程数量,例如上面demo参与游戏的人数,要求3位同学到达关口才能进入下一关
private final int parties;
/* The command to run when tripped */
// 当所有线程都到达了屏障,由最晚到的线程去执行barrierCommand定义的具体任务,例如demo的C同学:最晚到关口的Thread-2说:A、B、C都齐了,可以开始下一关
private final Runnable barrierCommand;
/** The current generation */
// 这里的generation可以理解“当前屏障”、“本关”、“本局”、“本阶段”,就像游戏过关一样,如果在本局就是当前generation,如果通过本局就来到下一局,也即nextGeneration
private Generation generation = new Generation();

/**
* Number of parties still waiting. Counts down from parties to 0
* on each generation. It is reset to parties on each new
* generation or when broken.
*/
/*
(1)用于统计目前还差多少个线程到达屏障才能进行下一关任务,由barrier.await()对其扣减。
在new CyclicBarrier给定,count的初始值=parties
例如上面的demo:
Thread-0执行barrier.await(),此时count=2,表示当前还差2个线程到达屏障才能进行下一关任务
Thread-1执行barrier.await(),此时count=1,表示当前还差1个线程到达屏障才能进行下一关任务
Thread-2执行barrier.await(),此时count=0,表示3个线程都到齐了屏障,可以开始任务
(2) 当“游戏”从本关进入到下一关(或者从本屏障通过,达到下一个屏障),count又恢复到parties的数量,这里体现了Cyclic重复、循环的意思。
(3) 当然还有其他情况,参考后面的解析。
*/
private int count;


/**
关于“when the barrier is tripped”,本文一律解释为“当屏障”
* Creates a new {@code CyclicBarrier} that will trip when the
* given number of parties (threads) are waiting upon it, and which
* will execute the given barrier action when the barrier is tripped,
* performed by the last thread entering the barrier.
注意这里已经很清楚说明:由最后到达barrier的线程来执行给定barrierAction(也即初始化传入一个Runnable的任务,类似demo里面“A、B、C都齐了,可以开始下一关”的打印任务),barrierAction也可以是null,意味着无任何操作。
*
* @param parties the number of threads that must invoke {@link #await}
* before the barrier is tripped
* @param barrierAction the command to execute when the barrier is
* tripped, or {@code null} if there is no action
* @throws IllegalArgumentException if {@code parties} is less than 1
*/
//
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
核心方法await、dowait

对于await方法,它会响应中断异常和所谓的“打破屏障异常”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/*在源码该方法注释中,说明调用await的线程阻塞在trip条件队列中,除非发生以下5个情况(这些情况都会让条件队列中全部阻塞的线程们唤醒):
1、最后一个到达屏障线程,此线程会signalAll条件队列中所有阻塞线程
1、The last thread arrives; or
2、其他线程中断当前执行await的线程
2、Some other thread interrupts the current thread; or
3、其他线程中断一个已经在条件队列阻塞的线程
3、Some other thread interrupts one of the other waiting threads; or
4、在条件队列等待状态的线程出现等待超时的情况
4、Some other thread times out while waiting for barrier; or
5、有些线程主动使用reset方法
5、Some other thread invokes reset on this barrier.

*/
public int await() throws InterruptedException, BrokenBarrierException {
try {
//具体的阻塞机制和唤醒机制是在dowait实现。
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
dowait核心方法的设计
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92

/**
* Main barrier code, covering the various policies.
*/
// 注释说这里面有多种处理策略
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock(); // 先获取独占锁的这种设计对于条件队列来说已经再熟悉不过
try {
// 临时变量g指向当前代(或者本局游戏)
final Generation g = generation;
// 1、此线程执行await前,当前屏障就已经被本局中的其他线程提前打破,则此线程只能抛出BrokenBarrierException。
if (g.broken)
throw new BrokenBarrierException();
// 2、线程还未进入条件队列就已经被外界中断,那么“所有线程都到达当前屏障的情况一定不会满足了”, 那么采取需要这样的操作:Sets current barrier generation as broken and wakes up everyone,并抛出中断异常。
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 3、线程调用await会在这里对计时器减1,或者说:每有一个线程到达屏障,计时器就减1
int index = --count;
// 4、这里是关键的逻辑:当计时器为0时,说明所有的线程都到达了屏障,那么最晚到的那个线程可以开始执行一个指定任务:barrierCommand,然后再唤醒所有线程
if (index == 0) { // tripped
boolean ranAction = false; // 字面意思也可以猜出:是否成功运行了任务
try {
// 5、这里就是demo里面的由最晚到的C同学来完成“通知可以进行下一关”的任务
final Runnable command = barrierCommand;
if (command != null)
command.run(); // 由最晚到屏障的线程来执行给定的任务
ranAction = true;
// 6 "开启下一关":这最晚的到屏障的线程(C同学)通知那些已经在条件队列阻塞的线程们“开始进行下一局游戏”
nextGeneration();
return 0;
} finally {
// 7、如果最晚到达屏障的线程没能“成功执行给定的任务”,那么也要唤醒条件队列里面的线程,表明本局游戏中止
if (!ranAction)
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
// 8、来到这里,说明执行await的线程一定不是最晚到达屏障的,那就先进入条件队列里面阻塞等待着,直到triped、当前屏障被打翻、在条件队列阻塞过程中被中断、超时等待,只要其中一个发生,此线程都会被唤醒。
for (;;) {
try {
// 9、如果使用普通的await,则调用AQS条件队列里面普通await,如果使用超时的await,那么就调用AQS条件队列里面有超时计时的awaitNanos方法
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 10、执行流来到这里,说明第9点提到线程在条件队列等待中被外界中断唤醒,如果唤醒线程发现还在本局“游戏”进度中且屏障还未被打翻,那么自己就去做breakBarrier这个操作:因为CyclicBarrier的设计要求只要任一在条件队列阻塞的线程被中断唤醒,那么“本局游戏”就认为是“输了”,表示本局需要中止。
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// 11 、来到这里,考察第10点提到的两个条件中,(1)如果g!=generation,说明这个醒来的线程发现自己处于在“下一局游戏环境中”,既然这样自己可以继续执行,不必打破已经处在“下一局”的环境,至多将自己标记为中断过
//(2)如果g == generation 且 g.broken=true,说明当前局的屏障被(其他醒来的线程)打翻过,自己只需补一个中断标记即可,不必重新又来一个breakBarrier()。
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
/*
11、由于g.broken要么被当前线程自己来标记,要么就是本局中其他线程比当前线程提早标记g.broken,因此分为以下几种情况。也即对应dowait的上面各种策略
① 此线程执行await前,当前屏障就已经被本局中的其他线程提前打破
② 如果本局中最晚到达屏障的线程没能“成功执行给定的任务”
③ 如果本局中已经在条件队列中阻塞过程中被外界中断或者超时中断唤醒
④ 本局中线程或者外界线程调用reset
或者参考await方法的注释
Some other thread interrupts the current thread; or
Some other thread interrupts one of the other waiting threads; or
Some other thread times out while waiting for barrier; or
Some other thread invokes reset on this barrier.
*/
if (g.broken)
throw new BrokenBarrierException();
// 12、说明当前线程是被最晚到达屏障的线程(例如demo的C同学)唤醒来的,那么说明“本局所有线程在玩游戏过程中都是正常执行,没有任何异常发生”,因为最晚到达屏障的线程它会在dowait第6点逻辑中执行nextGeneration,从而使得g指向下一局(指向下一代)
if (g != generation)
return index;
// 13、对于超时等待引起的中断从而唤醒正在条件队列等待的线程,那只能宣告“本局游戏中止,当前屏障要标记为打翻”然后抛出timeout异常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
nextGeneration方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

/**
* Updates state on barrier trip and wakes up everyone.
* Called only while holding lock.
*/
// 在dowait方法的第6个逻辑:唤醒那些已经到达屏障然后在条件队列中阻塞的线程们(wakes up everyone):“可以开始开启下一局”
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
// 计时器重置为指定的线程数量,以便在下一局又可以重新对到达屏障的线程计数
count = parties;
// “开始下一局”、“开始下一关”、“开始下一代” 这三种表达的意思都一样
generation = new Generation();
}
breakBarrier方法
1
2
3
4
5
6
7
8
9
10
11
/**
broken有打翻、破碎的、中止了的意思,在这里可以理解本局中止了、本关中止了、当前代中止了。
* Sets current barrier generation as broken and wakes up everyone.
* Called only while holding lock.
*/
// 在dowait方法的第7个逻辑:最晚到达屏障的线程没能“成功执行给定的任务”,这种情况称为“当前屏障被打翻”(不建议直接翻译,还是按current barrier generation as broken去理解显然更为自然)
private void breakBarrier() {
generation.broken = true; // 将当前局(当前代、当前关口)设为中止了。
count = parties;// 重置计数器
trip.signalAll(); // 即使最晚到达屏障的线程没能“成功执行给定的任务”,它也要将阻塞在trip条件队列里面的所有线程唤醒。
}
reset方法

首先reset方法是public的,因此如果有一组线程正在当前的barrier进行某任务中,假设此时外面有一个线程(或者本组的其他线程)主动调用reset,那么就导致“本局游戏中止”,所有正在trip条件队列阻塞的线程都要醒来,然后再重新开始“新一局游戏”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Resets the barrier to its initial state. If any parties are
* currently waiting at the barrier, they will return with a
* {@link BrokenBarrierException}. Note that resets <em>after</em>
* a breakage has occurred for other reasons can be complicated to
* carry out; threads need to re-synchronize in some other way,
* and choose one to perform the reset. It may be preferable to
* instead create a new barrier for subsequent use.
*/
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
关于“打破本局游戏”的说明

理解了CyclicBarrier的await方法(dowait)核心设计思想后,可以看到“如果有一组线程想通过CyclicBarrier不中断的完成一局又一局游戏”,条件是严格的:

(1)先完成自己工作的线程先到达关口(屏障)后,都得乖乖的在屏障这里等待(也即在trip关联的条件队列中阻塞),外界或者本组线程最好不要出现中断它的操作,(如果给定了在屏障的等待时长,线程还不能等太长导致超时,或者组内的还未到达屏障的线程没有去中断已经在条件队列的其他线程)

(2)所有的线程都到达了屏障

(3)最晚到达关口(屏障)的线程能不出错地完成指定的任务(barrierCommand)

以上三点都能顺利的话,那么这一组线程们就可以顺利进入“下一关(下一个屏障)游戏”,也即使得线程们能继续执行下一阶段的工作任务。

如何设计有多个关口的“游戏任务”?

这里说的“游戏任务”是指一种类似流水线的工作任务。只需在每个线程定义自己的run方法时,每完成一个“特定工作”就调用barrier.await(),例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static class Player extends Thread{
CyclicBarrier bar;
int duration;
public Player(CyclicBarrier bar){
this.bar=bar;
}

@Override
public void run() {
System.out.println();
try {
doSomeWork1()
bar.await(); // 此线程在第一局关口等待其他线程
doSomeWork2()
bar.await();// 此线程在第二局关口等待其他线程
doSomeWork3()
bar.await();// 此线程在第三局关口等待其他线程
.... // 以此类推

} catch (Exception e) {
e.printStackTrace();
}
}
private void doSomeWork1(){}
private void doSomeWork2(){}
private void doSomeWork3(){}

CyclicBarrier和CountDownLatch的对比说明

1、首先回顾CountDownLatch的基本使用:

以下是“等A、B、C三个同学到齐后才可以开会”的伪代码

1
2
3
4
5
6
7
8
9
10
11
1.初始化一个CountDownLatch waitSignal=new CountDownLatch(3)

2.主线程先执行condition.await(),此时主线程在“CLH阻塞队列中阻塞”

3.随后
子线程A启动便调用condition.countDown(),AQS的内部state从3减到2
子线程B启动便调用condition.countDown(),AQS的内部state从2减到1
子线程C最晚启动调用condition.countDown(),AQS的内部state从1减到0,满足唤醒条件,子线程C使用unparkSuccessor唤醒CHL阻塞队列的主线程

4.最后
主线程打印:A、B、C三个同学已经到齐,现在可以开会

可以看到CountDownLatch也可以完成CyclicBarrier的这种“等人到齐了再去做某任务”的场景,但你可以清楚看到,CountDownLatch并不能持续进行“下一阶段的开会”,但是使用CyclicBarrier可以实现“A、B、C三个同学到齐了可以进行第一次开会,接着再来一轮A、B、C三个同学到齐了可以开第二次会,依次类推”,这就是两者在使用上的区别。

2、AQS的底层工作机制不同

CyclicBarrier显然是基于一把独占锁(支持独占模式和共享模式)、一个条件队列和一个CLH阻塞队列去实现的,唤醒方法是使用条件队列的signalAll

而CountDownLatch是基于一把独占锁(仅共享模式)、一个CLH阻塞队列实现的,唤醒方法使用传播唤醒方式去唤醒,用到doReleaseShared(unparkSuccessor)去传播唤醒。