countdown: counting down; latch: a door latch or bolt.
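CountDownLatch is a synchronizer built around a shared lock resource. It coordinates multiple threads running in parallel (note: parallel, not concurrent); in other words, it lets several threads finish their work and then has something happen at one and the same moment. Under the hood, this coordination is implemented on top of the shared mode of AQS (AbstractQueuedSynchronizer).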
Typical usage of CountDownLatch:

```java
countdownLatch.countDown(); // decrement the count by 1
countdownLatch.await();     // block until the count reaches 0
```
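A minimal sketch of the two calls, assuming a hypothetical `LatchBasics` class and `runWorkers` helper; the `CountDownLatch` constructor, `countDown()`, `await()` and `getCount()` are the real `java.util.concurrent` API. It also shows that calling `countDown()` on a latch that is already at 0 does nothing.

```java
import java.util.concurrent.CountDownLatch;

class LatchBasics {
    // Hypothetical helper: starts `workers` threads that each call countDown() once,
    // waits for all of them with await(), and returns the remaining count.
    static long runWorkers(int workers) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(workers);
        for (int i = 0; i < workers; i++) {
            new Thread(latch::countDown).start(); // each worker decrements the count by 1
        }
        latch.await();           // blocks until the count reaches 0
        latch.countDown();       // extra call when the count is 0: nothing happens
        return latch.getCount(); // 0 once await() has returned normally
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runWorkers(3)); // prints 0
    }
}
```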
Usage 1: the main thread waits for all child threads to finish, then does something
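Take a meeting as the example. Suppose A, B, and C attend a meeting hosted by AA. The rule is that the host may open the meeting only after all three have arrived.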
```java
import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(3);
        new A(countDownLatch).start();
        new B(countDownLatch).start();
        new C(countDownLatch).start();
        countDownLatch.await(); // the host (main thread) waits for all three
        System.out.println("All three have arrived; the meeting can start now");
    }
}

class A extends Thread {
    private final CountDownLatch countDownLatch;

    A(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(1000); // A takes 1 second to arrive
            System.out.println("A has arrived at the meeting room");
            countDownLatch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class B extends Thread {
    private final CountDownLatch countDownLatch;

    B(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(2000); // B takes 2 seconds to arrive
            System.out.println("B has arrived at the meeting room");
            countDownLatch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class C extends Thread {
    private final CountDownLatch countDownLatch;

    C(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(3000); // C takes 3 seconds to arrive
            System.out.println("C has arrived at the meeting room");
            countDownLatch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
```
Output:
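```
A has arrived at the meeting room
B has arrived at the meeting room
C has arrived at the meeting room
All three have arrived; the meeting can start now
```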
How countDown() works in AQS

```java
// Caller side
countDownLatch.countDown();

// CountDownLatch
public void countDown() {
    sync.releaseShared(1);
}

// AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) { // true only when this call brings the count to 0
        doReleaseShared();       // wake up the threads queued in await()
        return true;
    }
    return false;
}

// CountDownLatch.Sync
protected boolean tryReleaseShared(int releases) {
    // Decrement the count; signal when it transitions to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c - 1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}
```
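To see the state logic in isolation, here is a minimal latch built directly on `AbstractQueuedSynchronizer`. `SimpleLatch` is a hypothetical class for illustration only, not the JDK class; its `Sync` repeats the `tryAcquireShared`/`tryReleaseShared` logic quoted above, and its `countDown()` returns the boolean from `releaseShared` so you can see which call would trigger `doReleaseShared()`.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical minimal latch mirroring the JDK code above; not java.util.concurrent.CountDownLatch.
class SimpleLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) { setState(count); } // the AQS state holds the remaining count

        int getCount() { return getState(); }

        @Override
        protected int tryAcquireShared(int acquires) {
            return getState() == 0 ? 1 : -1; // >= 0 means "acquired": await() may return
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {                       // CAS retry loop
                int c = getState();
                if (c == 0) return false;    // already 0: nothing to release
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;       // true only for the 1 -> 0 transition
            }
        }
    }

    private final Sync sync;

    SimpleLatch(int count) { sync = new Sync(count); }

    // Returns whether this call took the count to 0 (and therefore ran doReleaseShared()).
    boolean countDown() { return sync.releaseShared(1); }

    void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }

    int getCount() { return sync.getCount(); }

    public static void main(String[] args) throws InterruptedException {
        SimpleLatch latch = new SimpleLatch(3);
        System.out.println(latch.countDown()); // false (3 -> 2)
        System.out.println(latch.countDown()); // false (2 -> 1)
        System.out.println(latch.countDown()); // true  (1 -> 0)
        System.out.println(latch.countDown()); // false (already 0)
        latch.await();                          // returns at once: tryAcquireShared gives 1
    }
}
```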
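How await() works in AQS

Note that a thread calling countDownLatch.await() may block right away. Readers who already know the AQS internals will recognize that this "blocking" simply means the thread enqueues itself in the CLH wait queue, parks itself, and waits to be woken by a thread that releases the shared resource.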
```java
// Caller side
countDownLatch.await();

// CountDownLatch
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

// AbstractQueuedSynchronizer
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)     // count not yet 0: go and wait in the queue
        doAcquireSharedInterruptibly(arg);
}

// CountDownLatch.Sync
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1; // 1: latch open; -1: must wait
}

// AbstractQueuedSynchronizer
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED); // enqueue a shared-mode node
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {                   // first in line: try again
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())         // park until unparked or interrupted
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
```
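One way to observe the "park" described above from outside AQS is the thread state: per the `Thread.State` documentation, a thread parked by `LockSupport.park` is in the `WAITING` state. The sketch below (`AwaitParks` and its method name are hypothetical) starts a thread that awaits a latch of count 1, polls until that thread is `WAITING`, and only then calls `countDown()`.

```java
import java.util.concurrent.CountDownLatch;

class AwaitParks {
    // Returns true if the waiter was still blocked while the count was 1
    // and finished after the count reached 0.
    static boolean awaitParksUntilCountDown() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        Thread waiter = new Thread(() -> {
            try {
                latch.await(); // state is 1, so tryAcquireShared returns -1 and the thread parks
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        // The waiter cannot get past await() before countDown(), so it must eventually park.
        while (waiter.getState() != Thread.State.WAITING) {
            Thread.sleep(10);
        }
        boolean stillBlocked = waiter.isAlive(); // parked in the AQS wait queue
        latch.countDown();                       // 1 -> 0: doReleaseShared unparks the waiter
        waiter.join(5000);
        return stillBlocked && !waiter.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(awaitParksUntilCountDown()); // prints true
    }
}
```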
To sum up the "start the meeting once all three have arrived" example:
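Although A, B, C, and the main thread are all started at about the same time, A, B, and C each need some time before they call countDown. So when the main thread calls await(), getState() is almost certainly still 3, tryAcquireShared returns -1, and the main thread enters doAcquireSharedInterruptibly.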
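Who wakes the "main thread node" already sitting in the wait queue?

The previous subsection showed that the main thread, after calling await(), was "scheduled" by AQS into the wait queue. So who is responsible for waking it up? Here is a personified line of reasoning: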
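Suppose A arrives first. He could announce that he is here and the host may start, but B and C clearly have not arrived yet, so A should wait until everyone is present before "waking the host"; only then is the meeting procedure sensible. Next B arrives, but C still has not, so B is not the right one to wake the host either. Finally C arrives and sees that A and B are already here: the condition "all three are present" now holds. It is therefore reasonable for C, the last to arrive, to run the doReleaseShared wakeup logic, that is, to wake "the host sleeping in the waiting room" so he can open the meeting.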
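At the CountDownLatch/AQS level, the flow is as follows. A, B, C, and the main thread are started at the same moment.

1. The main thread enters the AQS wait queue first and parks (blocks itself).
2. Thread A finishes its 1-second task and calls countDown. The count drops from its initial value 3 to 2; since the new state is not 0, tryReleaseShared returns false and doReleaseShared is not executed.
3. Thread B finishes its 2-second task and calls countDown. The count drops from 2 to 1; the state is still not 0, so doReleaseShared is not executed either.
4. Thread C finishes its 3-second task and calls countDown. The count drops from 1 to 0; tryReleaseShared now returns true, so C enters doReleaseShared and performs the wakeup, as follows: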
```java
// CountDownLatch.Sync: called by C with releases = 1
protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c - 1;
        if (compareAndSetState(c, nextc))
            return nextc == 0; // C takes the state from 1 to 0, so this returns true
    }
}

// AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

// AbstractQueuedSynchronizer
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);      // wake the main thread queued behind head
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
```
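From this wakeup flow we can conclude: although several threads call countDown, it is C, the last thread to finish its task (the one whose countDown brings the state to 0), that wakes the main thread, which was placed in the wait queue when it called await.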
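The conclusion that the last `countDown()` does the waking can be checked with an instrumented latch. `RecordingLatch` is a hypothetical class, not part of the JDK: its `Sync` copies the `CountDownLatch.Sync` logic shown above and additionally records the name of the thread whose release takes the state from 1 to 0. To keep the result deterministic, the sketch runs attendees A, B, C strictly one after another with `join()` instead of relying on sleep times.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical instrumented latch: CountDownLatch.Sync logic plus a record of the opener.
class RecordingLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        volatile String opener; // name of the thread whose release reached 0

        Sync(int count) { setState(count); }

        @Override
        protected int tryAcquireShared(int acquires) {
            return getState() == 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0) return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc)) {
                    if (nextc == 0) opener = Thread.currentThread().getName();
                    return nextc == 0; // only this caller goes on to doReleaseShared()
                }
            }
        }
    }

    private final Sync sync;

    RecordingLatch(int count) { sync = new Sync(count); }

    void countDown() { sync.releaseShared(1); }

    void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }

    String opener() { return sync.opener; }

    // The host awaits; attendees A, B, C arrive in that order (join() enforces it).
    static String meeting() throws InterruptedException {
        RecordingLatch latch = new RecordingLatch(3);
        Thread host = new Thread(() -> {
            try {
                latch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "host");
        host.start();
        for (String name : new String[] {"A", "B", "C"}) {
            Thread attendee = new Thread(latch::countDown, name);
            attendee.start();
            attendee.join();
        }
        host.join();
        return latch.opener();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(meeting()); // prints C
    }
}
```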
With this analysis in hand, the mechanism behind the second use of CountDownLatch is easy to understand.
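Usage 2: several threads must "start running" at the same moment

Here we take the demo code Doug Lea gives in the CountDownLatch source (its Javadoc) and design a scenario of our own from it: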
```java
// N, doSomethingElse() and doWork() are placeholders, as in the Javadoc
class Driver {
    void main() throws InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(N);

        for (int i = 0; i < N; ++i) // create and start threads
            new Thread(new Worker(startSignal, doneSignal)).start();

        doSomethingElse();          // don't let run yet
        startSignal.countDown();    // let all threads proceed
        doSomethingElse();
        doneSignal.await();         // wait for all to finish
    }
}

class Worker implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;

    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
    }

    public void run() {
        try {
            startSignal.await();
            doWork();
            doneSignal.countDown();
        } catch (InterruptedException ex) {} // return;
    }

    void doWork() { ... }
}
```
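The Javadoc snippet leaves `N`, `doSomethingElse()` and `doWork()` as placeholders. Below is a compact runnable adaptation, assuming a hypothetical `StartGate` class in which a single lambda plays the role of `Worker` and an `AtomicBoolean` flag stands in for real work. Each worker checks that the driver had already set the flag before opening `startSignal`; this is guaranteed because, per the `CountDownLatch` memory-consistency rules, actions before `countDown()` happen-before a successful return from `await()` in another thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class StartGate {
    // Returns how many of the n workers saw the start flag set after passing startSignal.
    static int run(int n) throws InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(n);
        AtomicBoolean started = new AtomicBoolean(false);
        AtomicInteger sawStart = new AtomicInteger();

        for (int i = 0; i < n; i++) {
            new Thread(() -> {
                try {
                    startSignal.await();                           // every worker blocks here first
                    if (started.get()) sawStart.incrementAndGet(); // stand-in for doWork()
                    doneSignal.countDown();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }

        started.set(true);       // done before the gate opens
        startSignal.countDown(); // let all workers proceed at once
        doneSignal.await();      // wait for all n workers to finish
        return sawStart.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(5)); // prints 5
    }
}
```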
The corresponding demo:
Scenario: three runner threads on the same starting line
```java
import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch waitSignal = new CountDownLatch(1);   // the referee's whistle
        CountDownLatch launchSignal = new CountDownLatch(3); // counts runners reaching the finish
        new RunnerA(waitSignal, launchSignal).start();
        new RunnerB(waitSignal, launchSignal).start();
        new RunnerC(waitSignal, launchSignal).start();
        waitSignal.countDown(); // blow the whistle: all runners start at once
        launchSignal.await();   // wait until every runner has finished
    }
}

class RunnerA extends Thread {
    private final CountDownLatch waitSignal;
    private final CountDownLatch launchSignal;

    RunnerA(CountDownLatch startSignal, CountDownLatch doneSignal) {
        this.waitSignal = startSignal;
        this.launchSignal = doneSignal;
    }

    @Override
    public void run() {
        try {
            System.out.println("A is at the starting line, waiting for the referee's whistle");
            waitSignal.await();
            doRun();
            launchSignal.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void doRun() throws InterruptedException {
        Thread.sleep(1000); // A runs for 1 second
        System.out.println("A reached the finish line");
    }
}

class RunnerB extends Thread {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;

    RunnerB(CountDownLatch startSignal, CountDownLatch doneSignal) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
    }

    @Override
    public void run() {
        try {
            System.out.println("B is at the starting line, waiting for the referee's whistle");
            startSignal.await();
            doRun();
            doneSignal.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void doRun() throws InterruptedException {
        Thread.sleep(3000); // B runs for 3 seconds
        System.out.println("B reached the finish line");
    }
}

class RunnerC extends Thread {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;

    RunnerC(CountDownLatch startSignal, CountDownLatch doneSignal) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
    }

    @Override
    public void run() {
        try {
            System.out.println("C is at the starting line, waiting for the referee's whistle");
            startSignal.await();
            doRun();
            doneSignal.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void doRun() throws InterruptedException {
        Thread.sleep(5000); // C runs for 5 seconds
        System.out.println("C reached the finish line");
    }
}
```
Output:
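```
A is at the starting line, waiting for the referee's whistle
B is at the starting line, waiting for the referee's whistle
C is at the starting line, waiting for the referee's whistle
A reached the finish line
B reached the finish line
C reached the finish line
```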
The thread scheduling above involves two AQS wait queues:
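Formation of the first wait queue (the one behind waitSignal): RunnerA, RunnerB, and RunnerC each call waitSignal.await() first inside run(). They start at the same time and, in this walkthrough, reach await() before the main thread executes waitSignal.countDown() (typical, though not guaranteed; a runner that arrives later simply sees state 0 and passes straight through). Since getState() != 0, each of them enters the queueing logic in doAcquireSharedInterruptibly, which forms the following CLH wait queue on waitSignal: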
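```
head(null,-1) <-> node(RunnerA,-1) <-> node(RunnerB,-1) <-> node(RunnerC,0) -> null
```

(Queue diagram assuming the threads enqueue in the order A, B, C. Each node shows its thread and its waitStatus, where -1 is Node.SIGNAL.)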
When, and by whom, are they woken?

That is exactly why the main thread calls waitSignal.countDown(); it corresponds to steps 1 and 2 below:
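```java
// Step 1: CountDownLatch.Sync.tryReleaseShared, reached from waitSignal.countDown()
protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c - 1;
        if (compareAndSetState(c, nextc))
            return nextc == 0; // 1 -> 0, so this returns true
    }
}

// Step 2: AbstractQueuedSynchronizer.releaseShared
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared(); // unparkSuccessor(head) wakes RunnerA
        return true;
    }
    return false;
}
```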
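As shown above, once the main thread executes waitSignal.countDown(), AQS first wakes RunnerA through unparkSuccessor inside doReleaseShared. How, then, are RunnerB and RunnerC woken? That is explained by the wakeup-propagation design of AQS shared mode. Here is what happens after RunnerA wakes up: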
RunnerA wakes RunnerB through setHeadAndPropagate(node, r), and RunnerA itself leaves the queue: its node becomes the new head and the old head is unlinked.
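```java
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);    // the state is now 0, so r == 1
                if (r >= 0) {
                    setHeadAndPropagate(node, r); // become head and propagate the wakeup
                    p.next = null; // help GC: unlink the old head
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
```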
In the same way, RunnerB wakes RunnerC through setHeadAndPropagate(node, r), and RunnerB leaves the queue.
The resulting wakeup chain is:

1. The main thread wakes RunnerA.
2. RunnerA propagates the wakeup to RunnerB via setHeadAndPropagate.
3. RunnerB propagates the wakeup to RunnerC via setHeadAndPropagate.
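The propagation chain itself happens inside AQS and is not visible through the public `CountDownLatch` API, but its effect is: a single `countDown()` on a latch of count 1 releases every parked waiter, not just the first one. The sketch below (`PropagationDemo` and its method name are hypothetical) first waits, on a best-effort basis, until every runner reports `Thread.State.WAITING`, the state `LockSupport.park` puts a thread in, and then blows the whistle once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class PropagationDemo {
    // Parks `runners` threads on a latch of count 1, opens it with one countDown(),
    // and returns how many runners got past await().
    static int releasedBySingleCountDown(int runners) throws InterruptedException {
        CountDownLatch waitSignal = new CountDownLatch(1);
        AtomicInteger released = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < runners; i++) {
            Thread t = new Thread(() -> {
                try {
                    waitSignal.await(); // parks in the AQS queue while the count is 1
                    released.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(t);
            t.start();
        }
        // Wait until every runner is parked before the whistle.
        for (Thread t : threads) {
            while (t.getState() != Thread.State.WAITING) Thread.sleep(10);
        }
        waitSignal.countDown(); // one release; the other waiters are woken by propagation
        for (Thread t : threads) t.join(5000);
        return released.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(releasedBySingleCountDown(3)); // prints 3
    }
}
```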
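Without the earlier post on this blog analyzing the design of AQS shared mode, it is hard to follow how multiple queued and blocked threads are woken, or how the so-called wakeup propagation works.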
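Formation of the second wait queue (the one behind launchSignal): although the three runner threads and the main thread all start at the same time, each runner calls launchSignal.countDown() only after "running" for a while. So when the main thread calls launchSignal.await(), getState() is 3, clearly not 0, and the main thread executes doAcquireSharedInterruptibly and enters a different CLH wait queue, structured as follows: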
```
head(null,-1) <-> node(main thread,0) -> null
```
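Once the main thread has been queued and blocked, the user threads are in this situation: RunnerA, RunnerB, and RunnerC are each running their own task, and the program has not exited. So when, and by whom, is the main thread in launchSignal's internal wait queue woken?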
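RunnerA finishes first (1 second), RunnerB second (3 seconds), and RunnerC last (5 seconds). It follows that RunnerC, as the last of these threads to finish its task, wakes the main thread in launchSignal's internal wait queue. The wakeup process is not repeated here; see the analysis under usage 1.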
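The claim that the main thread is released only by the slowest runner can be checked by timing. In this hypothetical `FinishLine` sketch the 1 s / 3 s / 5 s runs are scaled down to milliseconds. Because `await()` returns only after every runner has called `countDown()`, the elapsed time from starting the runners until `await()` returns is at least the longest run time; with well-separated run times, the slowest runner is the one whose `countDown()` takes the count from 1 to 0.

```java
import java.util.concurrent.CountDownLatch;

class FinishLine {
    // Starts one runner per entry in runMillis and returns the elapsed time (ms)
    // from starting the runners until doneSignal.await() returns.
    static long awaitAllRunners(long... runMillis) throws InterruptedException {
        CountDownLatch doneSignal = new CountDownLatch(runMillis.length);
        long start = System.nanoTime();
        for (long ms : runMillis) {
            new Thread(() -> {
                try {
                    Thread.sleep(ms); // hypothetical "running" time
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                doneSignal.countDown(); // the slowest runner performs the 1 -> 0 step
            }).start();
        }
        doneSignal.await();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) throws InterruptedException {
        // Scaled-down versions of the 1 s / 3 s / 5 s runs in the demo above.
        System.out.println(awaitAllRunners(100, 300, 500) + " ms");
    }
}
```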