yield-bytes

沉淀、分享与无限进步

Java高级主题:AQS核心源代码实现Condition的深入解析

关于AQS的独占模式同步器设计原理(以ReentrantLock为例)以及共享模式的同步器设计原理(以Semaphore为例)在前面文章的已经讨论完毕,这两种模式让我们理解了AQS通过底层的FIFO阻塞队列(又称同步队列/变体的CLH队列/Sync queue)实现了相当巧妙的多线程协调调度的复杂逻辑。当然AQS还有一个更为关键的设计:结合FIFO阻塞队列+条件队列(又称condition queue/wait queue)实现一种基于条件的await和signal的多线程间的协调机制,也即本文内容。

关于条件队列和阻塞队列的说明

对于阻塞队列,这里只给出独占模式的线程节点说明:阻塞队列其实有AQS内部定义的双向链表节点的属性如下:

1
2
3
4
5
6
7
8
   static final class Node {
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
}
Node head; // Head of the wait queue
Node tail; // Tail of the wait queue

而条件队列中,它是有单向链表实现,该单向链表的节点的属性为:

1
2
3
Node firstWaiter; // First node of condition queue.
Node lastWaiter; // Last node of condition queue.
Node nextWaiter; // Link to next node waiting on condition,

可以看到三个属性用于构成单向链表,而条件队里的nextWaiter指针是为了区别阻塞队列中的next指针。

阻塞队列的作用在前面的文章已经给出很明确的说明:只要当前线程没有请求到锁资源(state),则需要进入CLH阻塞队列进行排队等候,那么AQS给出的条件队列到底是解决什么场景呢?

这里不妨考虑这种场景:

消费者线程:消费者线程原本在阻塞队列等待,当外界有线程释放了锁资源,那么此消费者线程从阻塞队列被唤醒后出队并拿到锁资源后,发现“存放货物的队列是空的”,这种未加入“条件限制”的等待线程调度策略则显得不够明智。

因此可以这么设计:给当前独占锁lock对象添加添加一个条件队列:如果有一个或者多消费者线程过来取“货物”,当遇到“仓库没有货物可取”这种条件时,那么这些消费者线程先被安排在条件队列等待(阻塞自己):

firstWaiter(c0)->c1->c2-c3->c4->lastWaiter(c5)->null

直到条件“仓库有货物可取时”,那么条件队列的消费者线程再转移到阻塞队列里面排队等候被唤醒去抢占锁资源以实施消费行为

基于Condition实现的生产者和消费者模型

这里将以一个经典多线程间协调案例作为分析基于AQS实现的Condition底层工作机制

生产者线程:每次向“仓库”生产1个产品,等待2秒后,并使用signal通知消费者线程

消费者线程:在启动后如果条件“仓库为空”成立则进入await阻塞状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.ReentrantLock;
public class ConditionDemo {
public static ReentrantLock lock=new ReentrantLock(true); // 使用公平模式,以方便debug观察多个消费者线程的入队顺序和出队顺序等。
static Condition condition=lock.newCondition(); // 可以看到锁对象关联了一个条件对象
static List<ElectricCar> teslaWarehouse=new ArrayList<>(); // 存放车的“仓库”

public static void main(String[] args) throws InterruptedException {
// 启动5个消费者去“消费”车
for (int i = 0; i < 5; i++) {
new Consumer(condition,teslaWarehouse,lock).start();
}
//让消费者线程先启动,接着再启动1个生产者线程,每隔2秒生产一台车并放入“仓库”
Thread.sleep(1);
new Producer(condition,teslaWarehouse,lock).start();

}

}

interface ElectricCar {}
class ModelY implements ElectricCar{
}

class Producer extends Thread {
private Condition condition;
private List<ElectricCar> list;
private ReentrantLock lock;
Producer(Condition condition,List<ElectricCar> list,ReentrantLock lock){
this.condition=condition;
this.list=list;
this.lock=lock;
}

@Override
public void run() {
while (true){
lock.lock(); // 注意:Condition对象使用await或者signal方法前需要获得于条件对象关联的独占锁,否则抛出非法错误。至于为何这么设计,需要等到文章最后的解释。
this.list.add(new ModelY());
System.out.println("生产了一台modelY");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
condition.signal(); // 通知消费者线程可以“消费”车了,
lock.unlock();
}

}
}

class Consumer extends Thread {
private Condition condition;
private List<ElectricCar> list;
private ReentrantLock lock;

Consumer(Condition condition, List<ElectricCar> list, ReentrantLock lock) {
this.condition = condition;
this.list = list;
this.lock = lock;
}

@Override
public void run() {
lock.lock();// 注意:Condition对象使用await或者signal方法前需要获得于条件对象关联的独占锁,否则抛出非法错误。至于为何这么设计,需要等到文章最后的解释。
try {
if (list.isEmpty())
condition.await(); // 消费者线程会在此释放锁资源并进入条件队列且阻塞自己,直到收到生产者生产了车信号
list.remove(0);
System.out.println(Thread.currentThread().getName() + "消费一台modelY");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

运行结果如下:生产者线程循环一次生产一台车,消费者线程有序的“消费”对应的一台车,由于只启动5个消费者线程,因此当生产者线程生产到第6台车时,此时已经没有其他新来的消费者线程来“消费车”,因此不断打印生产者生产车的信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
生产了一台modelY
Thread-0消费一台modelY
生产了一台modelY
Thread-1消费一台modelY
生产了一台modelY
Thread-2消费一台modelY
生产了一台modelY
Thread-3消费一台modelY
生产了一台modelY
Thread-4消费一台modelY
生产了一台modelY
生产了一台modelY
生产了一台modelY
生产了一台modelY

以上的多个消费者线程await和生产者线程signal并发协调机制及其源码解析将在下面给出。

消费者线程的await底层设计解析

首先,注释new Producer(condition,teslaWarehouse,lock).start()这一行代码,以考察多个消费者线程lock.lock()->await()内部逻辑,main启动后会发现主程序阻塞了,你会好奇到底消费者线程在AQS的哪个地方阻塞了?是因为都进入条件队列导致的main阻塞,还是在阻塞队列里面导致main阻塞?根据之前关于ReentrantLock的独占模式原理解析,可知:

(1)首先,考察5个线程并发执行时都使用lock.lock()争抢独占锁资源,由于lock对象使用公平模式实例化,因此Thread-0首先成功占有锁资源,而其他4个线程将有序进入lock对象的阻塞队列,state锁资源和阻塞队列的结构如下:

Thread-0作为优先抢占了锁资源的独占线程,可以继续执行自己的逻辑

1
exclusiveOwnerThread=Thread-0

Thread-1到Thread-4因为并发执行lock.lock()因此进入lock对象关联的阻塞队列且线程状态处于waiting阻塞状态:

1
head(null,-1)<->node(Thread1,-1)<->node(Thread2,-1)<->node(Thread3,-1)<->node(Thread4,0)->null

显然位于阻塞队列的第一个线程节点Thread-1正在等待Thread-0的唤醒。

注意:此时条件队列还未出现!此时条件队列还未出现!

(2) Thread-0可以继续执行自己逻辑,也即下面执行流(注意,此时锁资源还是由Thread-0占有!):

1
2
if (list.isEmpty()) // 由于消费者线程早于生产者线程启动,因此一开始“仓库列表”是空的,因此进入以下await逻辑
condition.await(); // 这里是理解条件队列的关键入口

condition.await()调用的是AQS内部类ConditionObject的await()方法,继续以消费者线程Thread-0作为分析对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

// 由于Thread-0已经作为独占锁线程,意味着await方法是线程安全的,因此在这里你不会看到自旋和CAS。
public final void await() throws InterruptedException {
// await方法是响应外部中断的:当线程调用condition.await()前,被外部中断过,则这里马上响应中断
if (Thread.interrupted())
throw new InterruptedException();
// ① 将Thread-0添加到条件队列(单向链表)
Node node = addConditionWaiter();
/* ② Thread-0添加到条件队列后,需要释放占有的锁资源。内部其实调用release方法,也即Thread-0会唤醒阻塞队列里面的第一个线程节点Thread-1,此外:
进入条件队列后,为何还要释放自己占有的锁资源? 务必理解其设计原因,参考后面的解释。
*/
// 注意这里:一旦Thread-0释放成功,那么就会唤醒位于阻塞队列的Thread-1
int savedState = fullyRelease(node);
// ③ 记录Thread-0可能被外部中断的中断标记
int interruptMode = 0;
// ④ 显然Thread-0节点不在阻塞队列(这里SyncQueue就是上面Thread-1~Thread-4所在的阻塞队列),
while (!isOnSyncQueue(node)) {
//⑤ 将Thread-0进行阻塞处理,避免浪费cpu时间片,这里正是解释了为何在main在启动仅有5个消费者线程后,主程序一直被阻塞运行的原因
LockSupport.park(this);
// 以下的四个if都是为了处理“当线程被signal通知唤醒后响应中断或者取消在条件队列排队的情况,其设计思路会在文章后面给出,这里先分析其阻塞设计。
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

以上就是消费者线程首次调用condition.await()的内部工作流程,此时阻塞队列结构和条件队列结构如下:

Thread-0进入条件队列—>释放自己占有的锁资源(此操作会唤醒阻塞队列的Thread-1)—>在条件队列阻塞自己

同理被唤醒的Thread-1会继续执行await()方法,跟Thread-0执行流类似,以此类推,最后,5个消费者线程都在条件队列里面阻塞了:

firstWaiter(Thread-0,-2)->node(Thread1,-2)->node(Thread2,-2)->node(Thread3,-2)->node(Thread4,-2)->null

支持await的内部几个核心方法

  • addConditionWaiter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// ①将Thread-0添加到条件队列(单向链表)
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
// 只要条件队列的尾部线程节点是取消状态,那么将其剔除出队列。为何要从尾部判断呢?参考fullyRelease的解析
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter; // 从这里可以看出t的含义:指向非取消状态的尾部节点
}
// 这里即可表明条件队列节点构成:将调用await的线程包装为节点,且状态值node.waitStatus为CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node; // 因为上面已经知道t不为null,那么才能将await新来的线程节点放入条件队列
lastWaiter = node;
return node;
}


  • fullyRelease

当Thread-0线程使用addConditionWaiter进入条件队列后,需要释放自己占有的独占锁资源,但为何要安排释放独占锁资源的逻辑?

以下可通过反证法考察其设计意图:

如果Thread-0进入条件队列后且不释放独占锁资源就进入阻塞状态,那么就会生产者线程lock.lock()加锁失败从而进入阻塞队列队尾,而阻塞队列里面第一个线程节点Thread-1显然已经没有外面线程来唤醒,结果就是:整个main程序直接stuck(卡住了),而且此时阻塞队列结构和条件队列结构如下:

head(null,-1)<->node(Thread1,-1)<->node(Thread2,-1)<->node(Thread3,-1)<->node(Thread4,-1)<->node(生产者线程,0)->null

条件队列结构:firstWaiter(Thread-0,-2)->null

以上的流程可以这样通俗理解:你拿了办公室钥匙(唯一一把钥匙),你直接去休息室“休眠”了,且没有把钥匙“释放”给后面的同事,那么你的同事1、同事2….只能等你,如果你一直“休眠”,那么他们一直无法进入办公室。

那么考察加入释放的独占锁资源逻辑后,其工作过程将如何?

(1)Thread-0的fullyRelease会调用AQS的release方法,从而唤醒了阻塞队列里面的Thread-1,此时Thread-1独占锁资源,Thread-1继续执行业务代码await(),同理进入条件队列,并使用fullyRelease独占锁资源同时该操作也会唤醒阻塞队列里面的Thread-2,以此类推,这种进入条件队后释放独占锁资源的流程可以这样通俗理解:

你拿了办公室钥匙(唯一一把钥匙),你直接去休息室“休眠”前,把钥匙“释放”给后面的同事1,接着,同事1也要去休息室“休眠”,去之前把钥匙“释放”给后面的同事2,以此类推

(2)显然此时5个消费者线程都在条件队列里面阻塞着(5个同事都在休息室“休眠”),直到生产者线程使用使用signal后,条件队列的Thread-0将被唤醒,之后的逻辑就是signal支持实现。

这就是fullyRelease的设计意图!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 进入条件队列的线程释放自己占有的独占锁,可能释放一个或者多个锁资源,如何理解多个锁资源?由于是condition对象是关联ReentrantLock对象,支持同一线程的一次(state=1)或者多次重入占有锁(state>1),因此这里fully是指:不管同一线程占用一个锁资源或者同一线程多次重入占用多个锁资源,都可以在这里一次性释放此线程占有的所有锁资源。
// 如果释放失败,在finally里面将此节点标记为取消状态,从这里可以看出:条件队列的线程节点取消状态可以在这里产生。
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
// 如果Thread-0在release能成功释放锁资源,由于release方法(内部调用unparkSuccessor)会唤醒阻塞队列的第一个线程节点,因此Thread-0此时会唤醒阻塞队列里面的Thread-1
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
// 这里做了一个优化设计:每次新节点条件队列后,如果此节点出现异常,则马上将其标记为取消状态,意味着条件队列的队尾要么是正常节点要么就是被标记为取消状态的节点,这种设计将方便addConditionWaiter加入if (t != null && t.waitStatus != Node.CONDITION) 的逻辑,也即:每次添加新节点到条件队列前,只需判断队列尾部是否存在取消状态的节点,如果有则先使用unlinkCancelledWaiters删除这类节点,然后再new Node入队。
if (failed)
node.waitStatus = Node.CANCELLED;
}
}


public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

  • isOnSyncQueue

判断刚进入条件队列的线程节点是否已经被转移到阻塞队列中,若不在,则使用 LockSupport.park(this)将当前条件队列的线程节点阻塞,避免浪费cpu时间片。对于Thread-0,显然刚进入条件队列,一定不在阻塞队列,因此将其阻塞在条件队列里等待,isOnSyncQueue就控制了条件队列节点的阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

/**
* Returns true if a node, always one that was initially placed on
* a condition queue, is now waiting to reacquire on sync queue.
* @param node the node
* @return true if is reacquiring
*/
final boolean isOnSyncQueue(Node node) {
//① 节点状态为CONDITION,或者节点的prev为null(因为条件队列的节点只用到firstWaiter、lastWaiter、nextWaiter)说明该节点在条件队列,但一定不在阻塞队列
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//② 阻塞队列使用next指针,因此当前节点的next不为空,必然是在阻塞队列里面
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/

//③ 在①和②都不满足时,官方注释做了这么一个特殊情况说明:在第一个if判断线程节点的prev不为空时表明此刻它还未进入阻塞队列(因为casTail尾部入队失败导致),但可能在下一个时刻就会进入阻塞队列尾部,因此需要重新读取阻塞队列也即在阻塞队列的尾部开始向前遍历,来判断当前node是否在阻塞队列。
return findNodeFromTail(node);
}

经过以上的lock.lock()到condition.await()的线程节点工作原理分析,现在可以清晰看到线程们在阻塞队列结构过渡到条件队列结构的过程:

(1)5个消费者线程并发执行lock.lock(),形成的阻塞队列结构如下:

Thread-0

1
exclusiveOwnerThread=Thread-0

Thread-1到Thread-4因为并发执行lock.lock()因此进入lock对象关联的阻塞队列且线程状态处于waiting阻塞状态:

head(null,-1)<->node(Thread1,-1)<->node(Thread2,-1)<->node(Thread3,-1)<->node(Thread4,0)->null

(2)5个消费者线程的执行流来到condition.await(),形成的条件队列结构如下(单向链表),且每个消费者线程都处于阻塞状态:

firstWaiter(Thread-0,-2)->node(Thread1,-2)->node(Thread2,-2)->node(Thread3,-2)->node(Thread4,-2)->null

在前面demo的mian方法,我们给出注释new Producer(condition,teslaWarehouse,lock).start(),也即没有生产者线程启动,也即没有调用condition.signal方法,那么消费者线程Thread-0只能在条件队列一直被阻塞,当然后面的Thread-1到Thread-4也同样处于阻塞状态。

下面将加入生产者的signal后的工作过程分析

生产者线程的signal底层设计解析

1
2
3
4
5
6
7
    for (int i = 0; i < 5; i++) {
new Consumer(condition,teslaWarehouse,lock).start();
}
// 执行流到此,5个消费线程已经在条件队列阻塞中
Thread.sleep(1);
// 启动生产者线程
new Producer(condition,teslaWarehouse,lock).start();

Producer线程的运行逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public void run() {
while (true){
lock.lock(); // 此时没有其他线程和生产者线程抢锁资源,因此生产者线程成功持有独占锁
this.list.add(new ModelY());
System.out.println("生产了一台modelY");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
condition.signal(); //这里是关键:通知条件队列里面的第一个非取消节点:消费者Thread-0
lock.unlock();
}
  • signal()设计逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75

/**
将当前lock对象关联的条件队列中等待时间最长的线程节点,这里为何不是说头节点呢?因为头节点或者其他节点都有可能变成取消状态,那么真正处理的目标是那些非取消状态的且等待时间最长的线程节点转移到阻塞队列中
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
// 显然生产者线程确实是独占锁资源的线程,这里不会抛出非法状态异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}


/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
// 在将首个非取消状态的条件队列节点(也即Thread-0)转移到阻塞队列,且顺便在这一过程中移除条件队列中不是condition状态的节点。
// 需要注意的是:对于未曾研究过AQS工作原理的同学,也许会错以为条件队列的节点被signal后会马上去抢独占锁,实际并非如此。所谓的转移就是要求条件队列的节点被signal后要先移动到阻塞队列中去排队等候被唤醒,这种设计也保证了Condition的设计思路和AQS独占模式设计思路的一致性。
private void doSignal(Node first) {
do {
// 因为转移的是first节点,因此将firstWaiter后移指向第二个节点也即Thread-1,当然如果第二个节点已经是null说明当前条件队列的所有节点都转移到阻塞队列中。
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// first节点出队,也即Thread-0断开和后面的Thread-1链接。(如果first是取消状态,则正好可以在此过程删除这些取消状态的节点)
first.nextWaiter = null;
/*
转移节点也不是随便转移的:只有first节点是非取消节点才能转移到阻塞队列。或者说
只要转移操作失败(因为当前first节点是取消状态才会转移失败)且条件队列还存在下一个节点,就继续找一个非取消状态的节点去转移
*/
} while (!transferForSignal(first) && //真正转移的逻辑是在transferForSignal实施的
(first = firstWaiter) != null);
}



/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
// 这里正好回答了上面doSignal为何需要重试的原因:因为当前转移的节点已经变为取消状态了,因此在判断!transferForSignal(first)后需要继续找条件队列链表的下一个非取消节点来转移。
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// 这里就是条件队列节点转移到阻塞队列的秘密:条件队列节点头节点(假设first是非取消节点)维持自己阻塞状态出队,然后使用enq方法将此头节(注意此节点依旧处于阻塞状态)点放入阻塞队列的队尾。
Node p = enq(node);
int ws = p.waitStatus;
// 如果前一个节点是取消状态ws>0或者无法将前节点设为SIGNAL值,那么就无法让前节点来唤醒刚入阻塞队列的线程节点。为了处理这种特殊情况,直接唤醒此节点。
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
// 执行到这里,说明:条件队列节点头节点(带着阻塞状态)已经成功进入阻塞队列且还处于阻塞状态
return true;
}

有了对signal的底层设计解析,现在分析这几个线程之间的内部协调

(1)生产者线程调用signal前,独占锁资源、阻塞队列和条件队列结构如下:

独占锁资源:生产者持有

阻塞队列:还未形成

条件队列如下:

firstWaiter(Thread-0,-2)->node(Thread1,-2)->node(Thread2,-2)->node(Thread3,-2)->node(Thread4,-2)->null

(2)生产者线程调用signal后,上面的独占锁资源、阻塞队列和条件队列结构如下:

独占锁资源:生产者持有

阻塞队列:head(null,-1)<->node(Thread-0,0) ,因为signal内部会将条件队列的头节点Thread-0转移(enq)到阻塞队列,此时Thread-0等待生产者线程使用unlock唤醒

条件队列如下:

firstWaiter(Thread1,-2)->node(Thread2,-2)->node(Thread3,-2)->node(Thread4,-2)->null

(3)生产者线程调用signal后且执行了lock.unlock(),唤醒了阻塞队列里面的Thread-0消费者线程,唤醒流程,lock.unlock—>release—>tryRelease—>unparkSuccessor。上面的独占锁资源、阻塞队列和条件队列结构如下:

独占锁资源::Thread-0持有

阻塞队列:head(null,0)

条件队列如下:

firstWaiter(Thread1,-2)->node(Thread2,-2)->node(Thread3,-2)->node(Thread4,-2)->null

(4)显然此时Thread-0可以从await唤醒的位置继续运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public void run() {
lock.lock();
try {
if (list.isEmpty())
condition.await(); // Thread-0从这里唤醒后,继续执行

list.remove(0);
// Thread-0消费了生产者线程刚刚生产的车:也即对应的打印:Thread-0消费一台modelY
System.out.println(Thread.currentThread().getName() + "消费一台modelY");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// Thread-0释放独占锁
lock.unlock();

当Thread-0消费了一台车并在finally释放锁资源前、后过程中,生产者线程同时会进行第二次循环,执行流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public void run() {
//第二次循环:
while (true){
lock.lock(); // Thread-0执行lock.unlock()前,生产者线程在第二次循环执行到lock.lock()时,因为此时锁资源还被消费者Thread-0占用,因此生产者只能进入阻塞队列等待。直到Thread-0执行lock.unlock()后,生产者马上抢占锁成功(因为消费者线程Thread-1~Thread-4还是条件队列中等待)
this.list.add(new ModelY());
System.out.println("生产了一台modelY");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
condition.signal(); // 通知条件队列里面的消费者Thread-1线程,以后的执行流程就按(2)~(4)不断循环执行,直到条件队列的消费者Thread-4完成lock.unlock()后,之后就没有消费者线程去消费生产者线程新生产的车,对应main程序可以观察到不断打印"生产了一台modelY"
lock.unlock();
}

await响应中断的逻辑分析

在上面await设计分析中,重点放在消费者线程入队、释放锁资源、阻塞操作的原理分析。在这一小节,则重点分析await是被设计为可响应外界中断的,await响应中断的逻辑相对复杂,两种特别的中断处理策略:

(1)中断处理策略1:如果条件队列的线程节点(例如上面提到的Thread-0)在waiting过程中被外界中断过,且还未被生产者signal过,那么此线程调用await就需要抛出InterruptedException

(2)中断处理策略2:如果条件队列的线程节点(例如上面提到的Thread-0)在waiting过程中没被外界中断,而是在signal后,再被外界中断过,那么此线程调用await只需补一次selfInterrupt()

当然此方法的注释也给出详细的说明(4、6):

1
2
3
4
5
6
7
8
9
/*
Implements interruptible condition wait.
1. If current thread is interrupted, throw InterruptedException.
2. Save lock state returned by getState.
3. Invoke release with saved state as argument, throwing IllegalMonitorStateException if it fails.
4. Block until signalled or interrupted.
5. Reacquire by invoking specialized version of acquire with saved state as argument.
6. If interrupted while blocked in step 4, throw InterruptedException.
*/

所以到此你应该猜测“await响应中断的设计中”是通过什么方式判断线程节点到底是被signal过还是没被signal呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155

// 使用interruptMode表征阻塞队列的线程的中断模式,根据情况,取以下两种值
/**
对应上述的中断模式2,这里的exit是指线程从阻塞队列获取锁资源后采取的处理中断策略(别错把exit看成是条件队列的线程出队)
Mode meaning to reinterrupt on exit from wait
*/
private static final int REINTERRUPT = 1;
/**
中断模式2:对应上述的中断模式1
Mode meaning to throw InterruptedException on exit from wait
*/
private static final int THROW_IE = -1;

/**
如果当前线程节点在signal前就被外界中断,则标记为为THROW_IE。
如果当前线程节点在signal之后被外界中断,那么则标记为REINTERRUPT,
如果当前节点在阻塞队列都没有被外界中断,则标记为0
* Checks for interrupt, returning THROW_IE if interrupted
* before signalled, REINTERRUPT if after signalled, or
* 0 if not interrupted.
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}

/** 以下就是判断当前线程节点到底是有没有被signal过的核心设计
* Transfers node, if necessary, to sync queue after a cancelled wait.
* Returns true if thread was cancelled before being signalled.
*
* @param node the node
* @return true if cancelled before the node was signalled
*/
final boolean transferAfterCancelledWait(Node node) {
/* 1、在条件队列的线程节点,其waitStatus一定是CONDITION,因此如果这里能CAS成功就说明:在signal之前,该线程已经被取消(被外界中断),但仍然需要将此节点放入阻塞队列。
thread was cancelled before being signalled.
*/
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
/*
2、执行流来到这里,说明此时线程节点waitStatus不是CONDITION肯定被signal过,也即出了条件队列而且在转移到阻塞队列的过程中(可能还未进入阻塞队列),waitStatus在转移逻辑transferForSignal被更改过。换句话说:线程节点被signal过,已经出了条件队列但还未成功进入阻塞队列,因为enq()操作需要此线程和其他线程竞争入阻塞队列队尾,因此出现这种“incomplete transfer”不完整的转移是有可能的,但此情况即罕见又短暂,那么线程自己通过spin直到enq完成入阻塞队列就行,这就是源码注释要表达的含义。

* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}

/**
* Unlinks cancelled waiter nodes from condition queue.
* Called only while holding lock. This is called when
* cancellation occurred during condition wait, and upon
* insertion of a new waiter when lastWaiter is seen to have
* been cancelled. This method is needed to avoid garbage
* retention in the absence of signals. So even though it may
* require a full traversal, it comes into play only when
* timeouts or cancellations occur in the absence of
* signals. It traverses all nodes rather than stopping at a
* particular target to unlink all pointers to garbage nodes
* without requiring many re-traversals during cancellation
* storms.
*/
/*
两种情况会调用unlinkCancelledWaiters方法:
1、addConditionWaiter,添加一个新节点到条件队列队尾时,如果队尾节点恰好是取消状态的节点,则使用unlinkCancelledWaiters剔除它
2、在await方法中,当线程节点在条件队列中被中断(此时还未被signal,对应的interruptMode为THROW_IE),那么表示此节点取消在条件队列的排队,需要将其剔除。
*/
private void unlinkCancelledWaiters() {
Node t = firstWaiter; // 每次循环,t滑向当前要处理的节点,相当于遍历指针
Node trail = null; // trail指向非取消状态的节点
//
while (t != null) {
Node next = t.nextWaiter;
// 1、如果t指向的当前节点是取消状态,断开t和子链next的连接(也即将t剔除出队)
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
//1.1 如果trail还是null,说明条件队列的第一个节点是取消节点,把next作为条件队列的头节点即可
if (trail == null)
firstWaiter = next;
// 1.2 如果trial不为空,将trail-->t(cancelled)-->next变成trail-->next
else
trail.nextWaiter = next;
/*1.3 如果来到条件队列队尾,那么原条件队列的所有取消状态的节点都被剔除,之后的条件队列:
firstWaiter(CONDITION)-->全部都是CONDITION状态的节点-->lastWaiter(CONDITION)
*/
if (next == null)
lastWaiter = trail;
}
else
// 2、说明t当前处理节点是正常状态,trail滑向(指向)t
trail = t;
// 3、t滑向下一个条件队列节点
t = next;
}
}


/**
根据线程节点interruptMode被标记的状态:要么抛出异常、要么补一次自我中断、或者无需做其他操作
* Throws InterruptedException, reinterrupts current thread, or
* does nothing, depending on mode.
*/
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}


// 以下以Thread-0调用await后进行中断逻辑分析,从唤醒处开始:
public final void await() throws InterruptedException {
// 如果Thread-0在进入条件队列等待之前就已经被外界中断过,此线程直接抛出异常,不再参与条件队列等待
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
/* Thread-0被唤醒后继续之后执行流,被唤醒的原因有以下两种:
1、被生产者线程使用signal唤醒的
注意:很多人以为是被生产者线程signal方法唤醒,这是理解不透彻的表现:signal方法只是把条件队列的Thread-0转移到阻塞队列中,真正唤醒Thread-0的代码是:生产者线程执行lock.unlock()内部使用unparkSuccessor唤醒阻塞队列里面的Thread-0(前提假设Thread-0是此队列的第一个线程节点)
2、其他线程中断了Thread-0导致唤醒
为了找出是哪种中断,需要在checkInterruptWhileWaiting进行判断,以便让Thread-0从阻塞队列出队拿到锁资源后,补充对应的中断类型(或者抛出InterruptedException或者补一个selfInterrupt())
*/

/* ② 醒来后的Thread-0使用Thread.interrupted()进行自我中断状态检查,如果自己在等待(线程休眠)过程中被中断过,则会跳出循环去到下面③的逻辑。
当然如果Thread-0没被中断过,那么回到while就满足OnSyncQueue,因此也会跳出while循环,意味着interruptMode为0
*/
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
/* ③ 由于Thread-0还在阻塞队列,运行到这里,就需要使用独占不可中断的acquireQueued方法重新获取锁资源,由于生产者线程执行②所提到的unlock后,此时Thread-0一定能acquireQueued成功获取独占锁(假设没有外界线程和Thread-0抢锁):
(1)acquireQueued返回true表示线程节点在阻塞队列中又被中断过1次,因此可以将interruptMode设为REINTERRUPT表示此线程节点被中断两次
(2)当然对于Thread-0已经占有独占锁来说,acquireQueued返回的interrupted=false,因此会跳过此逻辑
*/
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// ④ 如果interruptMode是THROW_IE那么执行流从②跳过③来到这里,说明此线程节点在条件队列中就被中断过(还未被signal),因此需要将此线程节点剔除出条件队列
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// ⑤ Thread-0从条件队列到阻塞队列出队的全过程是否发生的中断,都在这里得到最终的响应:如果是在signal前被外界中断,则THROW_IE,也即抛出异常;否则就是在signal后在阻塞队列等待锁期间被外界中断过,补上一个 selfInterrupt()即可
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

小结

有了以上完整的条件队列设计原理,本文的demo的内部线程之间的协调则很好理解:

  • (1)5个消费者线程公平模式lock.lock()之后在await执行前

条件队列:空

独占锁资源:Thread-0

其他4个线程因为Thread-0占有锁资源,只能先进入lock关联的CLH阻塞队列:

head(null,-1)<->node(Thread1,-1)<->node(Thread2,-1)<->node(Thread3,-1)<->node(Thread4,0)->null

  • (2)Thread-0执行await操作后,await操作包括的动作:进入条件队列、释放锁资源、阻塞自己

条件队列:firstWaiter(Thread-0,-2)->null (因为Thread-0进入条件队列)

独占线程:Thread-1 (因为Thread-0 fullyRelease()内部使用unparkSuccessor唤醒了阻塞队列的第一个线程节点Thread-1)

阻塞队列:head(null,-1)<->node(Thread2,-1)<->node(Thread3,-1)<->node(Thread4,0)->null

  • (3)接着Thread-1唤醒后执行await,重复以上(2)流程,直到Thread-4也执行await后有:

阻塞队列:head(null,0)

条件队列:firstWaiter(Thread-0,-2)->node(Thread1,-2)->node(Thread2,-2)->node(Thread3,-2)->node(Thread4,-2)->null

独占锁资源:null

可以看到,此时5个消费者线程都阻塞在lock关联下的条件队列,如果main程序没有安排生产者线程去signal此条件队列,那么main程序就会这种情况下挂起。

  • (4)接着生产者线程启动,加锁成功,执行到signal,将条件队列的中阻塞状态的Thread-0转移(transferForSignal)到阻塞队列

独占锁资源:生产者线程持有

阻塞队列:head(null,-1)<->node(Thread-0,0)<->null (Thread-0等待生产者释放锁资源)

条件队列:firstWaiter(Thread1,-2)->node(Thread2,-2)->node(Thread3,-2)->node(Thread4,-2)->null

  • (5)生产者完成signal后,执行到lock.unlock()后,也即使用unparkSuccessor唤醒阻塞队列的Thread-0

(注意:由于生产者此时可以进行第二次循环再次抢占锁但因为Thread-0已经持有锁,因此生产者线程此时会阻塞在阻塞队列)

独占锁资源:Thread-0持有

阻塞队列:head(null,-1)<-> 生产者线程

条件队列:firstWaiter(Thread1,-2)->node(Thread2,-2)->node(Thread3,-2)->node(Thread4,-2)->null

Thread-0继续执行“消费”代码:打印”生产了一台modelY”,后unlock释放锁资源

  • (6)生产者进行第二次循环后,因为Thread-0释放锁可以唤醒阻塞队列生产者线程,因此它能加锁成功,之后又来到signal,将条件队列的中阻塞状态的Thread-1转移到阻塞队列

独占锁资源:生产者线程持

阻塞队列:head(null,-1)<->node(Thread-1,0)<->null (等待生产者释放锁资源)

条件队列:firstWaiter(Thread2,-2)->node(Thread3,-2)->node(Thread4,-2)->null

  • (7)生产者完成signal后,执行到lock.unlock(),也即使用unparkSuccessor唤醒阻塞队列的Thread-1

独占锁资源:Thread-1持有

(注意:由于生产者此时可以进行第二次循环再次抢占锁但因为Thread-1已经持有锁,因此生产者线程此时会阻塞在阻塞队列)

阻塞队列:head(null,-1)<-> 生产者线程

条件队列:firstWaiter(Thread2,-2)->node(Thread3,-2)->node(Thread4,-2)->null

Thread-1继续执行“消费”代码:打印”生产了一台modelY”,后unlock释放锁资源

之后的循环逻辑类似,这里不再累赘。

如果你能清晰描述上述多个消费者和生产者线程的内部协调机制,那么才算是真正理解了AQS的底层设计及其源代码实现。