关于AQS的独占模式同步器设计原理(以ReentrantLock为例)以及共享模式的同步器设计原理(以Semaphore为例)在前面文章的已经讨论完毕,这两种模式让我们理解了AQS通过底层的FIFO阻塞队列(又称同步队列/变体的CLH队列/Sync queue)实现了相当巧妙的多线程协调调度的复杂逻辑。当然AQS还有一个更为关键的设计:结合FIFO阻塞队列+条件队列(又称condition queue/wait queue)实现一种基于条件的await和signal的多线程间的协调机制,也即本文内容。
关于条件队列和阻塞队列的说明
对于阻塞队列,这里只给出独占模式的线程节点说明:阻塞队列其实有AQS内部定义的双向链表节点的属性如下:
1 | static final class Node { |
而条件队列中,它是有单向链表实现,该单向链表的节点的属性为:
1 | Node firstWaiter; // First node of condition queue. |
可以看到三个属性用于构成单向链表,而条件队里的nextWaiter指针是为了区别阻塞队列中的next指针。
阻塞队列的作用在前面的文章已经给出很明确的说明:只要当前线程没有请求到锁资源(state),则需要进入CLH阻塞队列进行排队等候,那么AQS给出的条件队列到底是解决什么场景呢?
这里不妨考虑这种场景:
消费者线程:消费者线程原本在阻塞队列等待,当外界有线程释放了锁资源,那么此消费者线程从阻塞队列被唤醒后出队并拿到锁资源后,发现“存放货物的队列是空的”,这种未加入“条件限制”的等待线程调度策略则显得不够明智。
因此可以这么设计:给当前独占锁lock对象添加添加一个条件队列:如果有一个或者多消费者线程过来取“货物”,当遇到“仓库没有货物可取”这种条件时,那么这些消费者线程先被安排在条件队列等待(阻塞自己):
firstWaiter(c0)->c1->c2-c3->c4->lastWaiter(c5)->null
直到条件“仓库有货物可取时”,那么条件队列的消费者线程再转移到阻塞队列里面排队等候被唤醒去抢占锁资源以实施消费行为
基于Condition实现的生产者和消费者模型
这里将以一个经典多线程间协调案例作为分析基于AQS实现的Condition底层工作机制
生产者线程:每次向“仓库”生产1个产品,等待2秒后,并使用signal通知消费者线程
消费者线程:在启动后如果条件“仓库为空”成立则进入await阻塞状态。
1 | import java.util.ArrayList; |
运行结果如下:生产者线程循环一次生产一台车,消费者线程有序的“消费”对应的一台车,由于只启动5个消费者线程,因此当生产者线程生产到第6台车时,此时已经没有其他新来的消费者线程来“消费车”,因此不断打印生产者生产车的信息。
1 | 生产了一台modelY |
以上的多个消费者线程await和生产者线程signal并发协调机制及其源码解析将在下面给出。
消费者线程的await底层设计解析
首先,注释new Producer(condition,teslaWarehouse,lock).start()
这一行代码,以考察多个消费者线程lock.lock()->await()
内部逻辑,main启动后会发现主程序阻塞了,你会好奇到底消费者线程在AQS的哪个地方阻塞了?是因为都进入条件队列导致的main阻塞,还是在阻塞队列里面导致main阻塞?根据之前关于ReentrantLock
的独占模式原理解析,可知:
(1)首先,考察5个线程并发执行时都使用lock.lock()
争抢独占锁资源,由于lock对象使用公平模式实例化,因此Thread-0首先成功占有锁资源,而其他4个线程将有序进入lock对象的阻塞队列,state锁资源和阻塞队列的结构如下:
Thread-0作为优先抢占了锁资源的独占线程,可以继续执行自己的逻辑
1 | exclusiveOwnerThread=Thread-0 |
Thread-1到Thread-4因为并发执行lock.lock()
因此进入lock对象关联的阻塞队列且线程状态处于waiting阻塞状态:
1 | head(null,-1)<->node(Thread1,-1)<->node(Thread2,-1)<->node(Thread3,-1)<->node(Thread4,0)->null |
显然位于阻塞队列的第一个线程节点Thread-1正在等待Thread-0的唤醒。
注意:此时条件队列还未出现!此时条件队列还未出现!
(2) Thread-0可以继续执行自己逻辑,也即下面执行流(注意,此时锁资源还是由Thread-0占有!):
1 | if (list.isEmpty()) // 由于消费者线程早于生产者线程启动,因此一开始“仓库列表”是空的,因此进入以下await逻辑 |
condition.await()
调用的是AQS内部类ConditionObject的await()方法,继续以消费者线程Thread-0作为分析对象:
1 |
|
以上就是消费者线程首次调用condition.await()的内部工作流程,此时阻塞队列结构和条件队列结构如下:
Thread-0进入条件队列—>释放自己占有的锁资源(此操作会唤醒阻塞队列的Thread-1)—>在条件队列阻塞自己
同理被唤醒的Thread-1会继续执行await()方法,跟Thread-0执行流类似,以此类推,最后,5个消费者线程都在条件队列里面阻塞了:
firstWaiter(Thread-0,-2)->node(Thread1,-2)->node(Thread2,-2)->node(Thread3,-2)->node(Thread4,-2)->null
支持await的内部几个核心方法
- addConditionWaiter
1 | // ①将Thread-0添加到条件队列(单向链表) |
- fullyRelease
当Thread-0线程使用addConditionWaiter进入条件队列后,需要释放自己占有的独占锁资源,但为何要安排释放独占锁资源的逻辑?
以下可通过反证法考察其设计意图:
如果Thread-0进入条件队列后且不释放独占锁资源就进入阻塞状态,那么就会生产者线程lock.lock()加锁失败从而进入阻塞队列队尾,而阻塞队列里面第一个线程节点Thread-1显然已经没有外面线程来唤醒,结果就是:整个main程序直接stuck(卡住了),而且此时阻塞队列结构和条件队列结构如下:
head(null,-1)<->node(Thread1,-1)<->node(Thread2,-1)<->node(Thread3,-1)<->node(Thread4,-1)<->node(生产者线程,0)->null
条件队列结构:firstWaiter(Thread-0,-2)->null
以上的流程可以这样通俗理解:你拿了办公室钥匙(唯一一把钥匙),你直接去休息室“休眠”了,且没有把钥匙“释放”给后面的同事,那么你的同事1、同事2….只能等你,如果你一直“休眠”,那么他们一直无法进入办公室。
那么考察加入释放的独占锁资源逻辑后,其工作过程将如何?
(1)Thread-0的fullyRelease会调用AQS的release方法,从而唤醒了阻塞队列里面的Thread-1,此时Thread-1独占锁资源,Thread-1继续执行业务代码await(),同理进入条件队列,并使用fullyRelease独占锁资源同时该操作也会唤醒阻塞队列里面的Thread-2,以此类推,这种进入条件队后释放独占锁资源的流程可以这样通俗理解:
你拿了办公室钥匙(唯一一把钥匙),你直接去休息室“休眠”前,把钥匙“释放”给后面的同事1,接着,同事1也要去休息室“休眠”,去之前把钥匙“释放”给后面的同事2,以此类推
(2)显然此时5个消费者线程都在条件队列里面阻塞着(5个同事都在休息室“休眠”),直到生产者线程使用使用signal后,条件队列的Thread-0将被唤醒,之后的逻辑就是signal支持实现。
这就是fullyRelease的设计意图!
1 | // 进入条件队列的线程释放自己占有的独占锁,可能释放一个或者多个锁资源,如何理解多个锁资源?由于是condition对象是关联ReentrantLock对象,支持同一线程的一次(state=1)或者多次重入占有锁(state>1),因此这里fully是指:不管同一线程占用一个锁资源或者同一线程多次重入占用多个锁资源,都可以在这里一次性释放此线程占有的所有锁资源。 |
- isOnSyncQueue
判断刚进入条件队列的线程节点是否已经被转移到阻塞队列中,若不在,则使用 LockSupport.park(this)将当前条件队列的线程节点阻塞,避免浪费cpu时间片。对于Thread-0,显然刚进入条件队列,一定不在阻塞队列,因此将其阻塞在条件队列里等待,isOnSyncQueue就控制了条件队列节点的阻塞。
1 |
|
经过以上的lock.lock()到condition.await()
的线程节点工作原理分析,现在可以清晰看到线程们在阻塞队列结构过渡到条件队列结构的过程:
(1)5个消费者线程并发执行lock.lock()
,形成的阻塞队列结构如下:
Thread-0
1 | exclusiveOwnerThread=Thread-0 |
Thread-1到Thread-4因为并发执行lock.lock()
因此进入lock对象关联的阻塞队列且线程状态处于waiting阻塞状态:
head(null,-1)<->node(Thread1,-1)<->node(Thread2,-1)<->node(Thread3,-1)<->node(Thread4,0)->null
(2)5个消费者线程的执行流来到condition.await()
,形成的条件队列结构如下(单向链表),且每个消费者线程都处于阻塞状态:
firstWaiter(Thread-0,-2)->node(Thread1,-2)->node(Thread2,-2)->node(Thread3,-2)->node(Thread4,-2)->null
在前面demo的mian方法,我们给出注释new Producer(condition,teslaWarehouse,lock).start()
,也即没有生产者线程启动,也即没有调用condition.signal方法,那么消费者线程Thread-0只能在条件队列一直被阻塞,当然后面的Thread-1到Thread-4也同样处于阻塞状态。
下面将加入生产者的signal后的工作过程分析
生产者线程的signal底层设计解析
1 | for (int i = 0; i < 5; i++) { |
Producer线程的运行逻辑
1 |
|
- signal()设计逻辑
1 |
|
有了对signal的底层设计解析,现在分析这几个线程之间的内部协调
(1)生产者线程调用signal前,独占锁资源、阻塞队列和条件队列结构如下:
独占锁资源:生产者持有
阻塞队列:还未形成
条件队列如下:
firstWaiter(Thread-0,-2)->node(Thread1,-2)->node(Thread2,-2)->node(Thread3,-2)->node(Thread4,-2)->null
(2)生产者线程调用signal后,上面的独占锁资源、阻塞队列和条件队列结构如下:
独占锁资源:生产者持有
阻塞队列:head(null,-1)<->node(Thread-0,0) ,因为signal内部会将条件队列的头节点Thread-0转移(enq)到阻塞队列,此时Thread-0等待生产者线程使用unlock唤醒
条件队列如下:
firstWaiter(Thread1,-2)->node(Thread2,-2)->node(Thread3,-2)->node(Thread4,-2)->null
(3)生产者线程调用signal后且执行了lock.unlock(),唤醒了阻塞队列里面的Thread-0消费者线程,唤醒流程,lock.unlock—>release—>tryRelease—>unparkSuccessor。上面的独占锁资源、阻塞队列和条件队列结构如下:
独占锁资源::Thread-0持有
阻塞队列:head(null,0)
条件队列如下:
firstWaiter(Thread1,-2)->node(Thread2,-2)->node(Thread3,-2)->node(Thread4,-2)->null
(4)显然此时Thread-0可以从await唤醒的位置继续运行
1 |
|
当Thread-0消费了一台车并在finally释放锁资源前、后过程中,生产者线程同时会进行第二次循环,执行流程如下:
1 |
|
await响应中断的逻辑分析
在上面await设计分析中,重点放在消费者线程入队、释放锁资源、阻塞操作的原理分析。在这一小节,则重点分析await是被设计为可响应外界中断的,await响应中断的逻辑相对复杂,两种特别的中断处理策略:
(1)中断处理策略1:如果条件队列的线程节点(例如上面提到的Thread-0)在waiting过程中被外界中断过,且还未被生产者signal过,那么此线程调用await就需要抛出InterruptedException
(2)中断处理策略2:如果条件队列的线程节点(例如上面提到的Thread-0)在waiting过程中没被外界中断,而是在signal后,再被外界中断过,那么此线程调用await只需补一次selfInterrupt()
当然此方法的注释也给出详细的说明(4、6):
1 | /* |
所以到此你应该猜测“await响应中断的设计中”是通过什么方式判断线程节点到底是被signal过还是没被signal呢?
1 |
|
小结
有了以上完整的条件队列设计原理,本文的demo的内部线程之间的协调则很好理解:
- (1)5个消费者线程公平模式lock.lock()之后在await执行前
条件队列:空
独占锁资源:Thread-0
其他4个线程因为Thread-0占有锁资源,只能先进入lock关联的CLH阻塞队列:
head(null,-1)<->node(Thread1,-1)<->node(Thread2,-1)<->node(Thread3,-1)<->node(Thread4,0)->null
- (2)Thread-0执行await操作后,await操作包括的动作:进入条件队列、释放锁资源、阻塞自己
条件队列:firstWaiter(Thread-0,-2)->null (因为Thread-0进入条件队列)
独占线程:Thread-1 (因为Thread-0 fullyRelease()内部使用unparkSuccessor唤醒了阻塞队列的第一个线程节点Thread-1)
阻塞队列:head(null,-1)<->node(Thread2,-1)<->node(Thread3,-1)<->node(Thread4,0)->null
- (3)接着Thread-1唤醒后执行await,重复以上(2)流程,直到Thread-4也执行await后有:
阻塞队列:head(null,0)
条件队列:firstWaiter(Thread-0,-2)->node(Thread1,-2)->node(Thread2,-2)->node(Thread3,-2)->node(Thread4,-2)->null
独占锁资源:null
可以看到,此时5个消费者线程都阻塞在lock关联下的条件队列,如果main程序没有安排生产者线程去signal此条件队列,那么main程序就会这种情况下挂起。
- (4)接着生产者线程启动,加锁成功,执行到signal,将条件队列的中阻塞状态的Thread-0转移(transferForSignal)到阻塞队列
独占锁资源:生产者线程持有
阻塞队列:head(null,-1)<->node(Thread-0,0)<->null (Thread-0等待生产者释放锁资源)
条件队列:firstWaiter(Thread1,-2)->node(Thread2,-2)->node(Thread3,-2)->node(Thread4,-2)->null
- (5)生产者完成signal后,执行到lock.unlock()后,也即使用unparkSuccessor唤醒阻塞队列的Thread-0
(注意:由于生产者此时可以进行第二次循环再次抢占锁但因为Thread-0已经持有锁,因此生产者线程此时会阻塞在阻塞队列)
独占锁资源:Thread-0持有
阻塞队列:head(null,-1)<-> 生产者线程
条件队列:firstWaiter(Thread1,-2)->node(Thread2,-2)->node(Thread3,-2)->node(Thread4,-2)->null
Thread-0继续执行“消费”代码:打印”生产了一台modelY”,后unlock释放锁资源
- (6)生产者进行第二次循环后,因为Thread-0释放锁可以唤醒阻塞队列生产者线程,因此它能加锁成功,之后又来到signal,将条件队列的中阻塞状态的Thread-1转移到阻塞队列
独占锁资源:生产者线程持
阻塞队列:head(null,-1)<->node(Thread-1,0)<->null (等待生产者释放锁资源)
条件队列:firstWaiter(Thread2,-2)->node(Thread3,-2)->node(Thread4,-2)->null
- (7)生产者完成signal后,执行到lock.unlock(),也即使用unparkSuccessor唤醒阻塞队列的Thread-1
独占锁资源:Thread-1持有
(注意:由于生产者此时可以进行第二次循环再次抢占锁但因为Thread-1已经持有锁,因此生产者线程此时会阻塞在阻塞队列)
阻塞队列:head(null,-1)<-> 生产者线程
条件队列:firstWaiter(Thread2,-2)->node(Thread3,-2)->node(Thread4,-2)->null
Thread-1继续执行“消费”代码:打印”生产了一台modelY”,后unlock释放锁资源
之后的循环逻辑类似,这里不再累赘。
如果你能清晰描述上述多个消费者和生产者线程的内部协调机制,那么才算是真正理解了AQS的底层设计及其源代码实现。