countdown:倒计时,latch:门闩;插销
其实CountDownLatch是一种共享锁资源的同步器,用于协调多个线程并行执行(注意不是并发),也即控制多个线程完成后接着在同一时刻去完成某事。当然了,这种协调的底层实现是基于AQS的共享模式实现。
CountDownLatch一般用法如下
1 2 countdownLatch.countDown()   countdownLatch.await()  
第一种用法:主线程等所有子线程完成后再统一做某事 这里用开会作为例子,假设有A、B、C三个人参与开会,主持人是AA,开会的规则:要求三个人到场后,主持人才可以开始开会。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 public  class  CountDownLatchDemo      public  static  void  main (String[] args)  throws  InterruptedException          CountDownLatch countDownLatch =new  CountDownLatch(3 );         new  A(countDownLatch).start();         new  B(countDownLatch).start();         new  C(countDownLatch).start();         countDownLatch.await();          System.out.println("三位都到了会场,现在可以开会了" );     } } class  A  extends  Thread     private  final  CountDownLatch countDownLatch;     A (CountDownLatch countDownLatch){         this .countDownLatch=countDownLatch;     }     @Override      public  void  run ()           try  {             Thread.sleep(1000 );             System.out.println("A已到达会场" );             countDownLatch.countDown();         } catch  (InterruptedException e) {             e.printStackTrace();         }     } } class  B  extends  Thread     private  final  CountDownLatch countDownLatch;     B (CountDownLatch countDownLatch){         this .countDownLatch=countDownLatch;     }     @Override      public  void  run ()           try  {             Thread.sleep(2000 );             System.out.println("B已到达会场" );             countDownLatch.countDown();         } catch  (InterruptedException e) {             e.printStackTrace();         }     } } class  C  extends  Thread     private  final  CountDownLatch countDownLatch;     C (CountDownLatch countDownLatch){         this .countDownLatch=countDownLatch;     }     @Override      public  void  run ()           try  {             Thread.sleep(3000 );             System.out.println("C已到达会场" );             countDownLatch.countDown();         } catch  (InterruptedException e) {             e.printStackTrace();         }     } } 
输出:
A已到达会场
countDown的底层AQS解析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 countDownLatch.countDown(); public  void  countDown ()    sync.releaseShared(1 ); }     public  final  boolean  releaseShared (int  arg)           if  (tryReleaseShared(arg)) {              doReleaseShared();             return  true ;         }         return  false ;     } protected  boolean  tryReleaseShared (int  releases)       for  (;;) {     int  c = getState();     if  (c == 0 )       return  false ;      int  nextc = c-1 ;      if  (compareAndSetState(c, nextc))       return  nextc == 0 ;    } } 
await()的底层AQS解析 注意,调用countDownLatch.await()的线程可能会马上进入阻塞状态,已经掌握AQS的底层设计原理的同学应该可以看出所谓的“阻塞”其实就是进入了CLH阻塞队列后将自己park起来,并等待释放锁的线程来唤醒。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 countDownLatch.await();      public  void  await ()  throws  InterruptedException          sync.acquireSharedInterruptibly(1 );     } public  final  void  acquireSharedInterruptibly (int  arg)   throws  InterruptedException  {     if  (Thread.interrupted())     throw  new  InterruptedException();   if  (tryAcquireShared(arg) < 0 )      doAcquireSharedInterruptibly(arg); } protected  int  tryAcquireShared (int  acquires)              return  (getState() == 0 ) ? 1  : -1 ;         }     private  void  doAcquireSharedInterruptibly (int  arg)          throws  InterruptedException  {        final  Node node = addWaiter(Node.SHARED);          boolean  failed = true ;         try  {             for  (;;) {                	                 final  Node p = node.predecessor();                 if  (p == head) {                   	                     int  r = tryAcquireShared(arg);                     if  (r >= 0 ) {                         setHeadAndPropagate(node, r);                         p.next = null ;                          failed = false ;                         return ;                     }                 }               	               	                 if  (shouldParkAfterFailedAcquire(p, node) &&                     parkAndCheckInterrupt())                     throw  new  InterruptedException();             }         } finally  {             if  (failed)                 cancelAcquire(node);         }     } 
总结以上“等三个人到齐后再开会”的例子:
虽然A、B、C以及主线程都是同时start,但因为A、B、C线程分别要执行一定时间后才能进行countDown减1,因此在主线程执行await()方法时马上有getState()==3,也即tryAcquireShared返回-1,故主线程进入了doAcquireSharedInterruptibly逻辑。
谁去唤醒已经在阻塞队列里面“主线程节点” 上一小节已经分析了调用await()的主线程先被AQS“调度”进入了阻塞队列里面,那么谁去负责把这个位于阻塞队列里面的主线程唤醒呢?这里不妨给一个拟人化的演绎推理:
假设A先到,他说我到了主持人可以开始开会,但显然后面还有B、C没到,因此A需要等齐人再去“唤醒主持人”,这样才显得“开会流程”是合理的,接着B到了,他说我到了主持人可以开始开会,但显然后面还有C没到,因此B也不适合去“唤醒主持人”,最后C到了,C看到前面的A、B都到了,这时符合三人都到齐了的条件,因此由最后一个到达会场的C来执行doReleaseShared唤醒逻辑才是合理的,也即让在“等候间睡眠的主持人醒来”开始主持会议。
 
对应到CountDownLatch—AQS算法层面的流程如下:
A、B、C以及主线程同一时刻start
1、主线程先进入AQS的阻塞队列并park上(阻塞自己)。
2、接着线程A执行1秒结束后使用countDown,计数器从初始给定的3减为2,因为getState !=0 ,此时不会执行doReleaseShared 唤醒操作
2、线程B执行2秒结束后使用countDown,计数器从上面的2减为1,因为getState !=0 ,因此此时不会执行doReleaseShared 唤醒操作
3、线程C执行3秒结束后使用countDown,计数器从上面的1减为0,因为getState ==0成立 ,此时会进入doReleaseShared唤醒操作操作流程,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 protected  boolean  tryReleaseShared (1 )       for  (;;) {     int  c = getState();      if  (c == 0 )       return  false ;      int  nextc = c-1 ;      if  (compareAndSetState(c, nextc))        return  nextc == 0 ;    } }          public  final  boolean  releaseShared (arg)                    if  (tryReleaseShared(arg)) {              doReleaseShared();             return  true ;         }         return  false ;     }     private  void  doReleaseShared ()           for  (;;) {             Node h = head;             if  (h != null  && h != tail) {                 int  ws = h.waitStatus;                                  if  (ws == Node.SIGNAL) {                      if  (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 ))                         continue ;                                 unparkSuccessor(h);                 }                 else  if  (ws == 0  &&                          !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE))                     continue ;                             }             if  (h == head)                                    break ;         }     } 
以上的唤醒流程其实可以得出这样的结论:虽然有多个线程使用的countDown,但最终是由最后一个完成自身任务的线程C(因为它可以使得state减为0)去唤醒“调用await被放入阻塞队列”的主线线程。
有了以上解析,那么针对CountDownLatch的第二种场景使用则可以很好理解其背后的工作机制。
第二种用法:要求多个线程在同一时刻“开跑” 这里我们用了Doug Lea在源码给出的demo代码自行设计一个场景 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 class  Driver    void  main ()  throws  InterruptedException      CountDownLatch startSignal = new  CountDownLatch(1 );     CountDownLatch doneSignal = new  CountDownLatch(N);     for  (int  i = 0 ; i < N; ++i)        new  Thread(new  Worker(startSignal, doneSignal)).start();     doSomethingElse();                 startSignal.countDown();           doSomethingElse();     doneSignal.await();              } } class  Worker  implements  Runnable    private  final  CountDownLatch startSignal;   private  final  CountDownLatch doneSignal;   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {     this .startSignal = startSignal;     this .doneSignal = doneSignal;   }   public  void  run ()       try  {       startSignal.await();       doWork();       doneSignal.countDown();     } catch  (InterruptedException ex) {}    }   void  doWork ()   } 
对应的Demo:
场景:三个跑者线程在同一起跑线上
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 public  class  CountDownLatchDemo      public  static  void  main (String[] args)  throws  InterruptedException          CountDownLatch waitSignal=new  CountDownLatch(1 );         CountDownLatch launchSignal=new  CountDownLatch(3 );         new  RunnerA(waitSignal,launchSignal).start();         new  RunnerB(waitSignal,launchSignal).start();         new  RunnerC(waitSignal,launchSignal).start();              waitSignal.countDown();          launchSignal.await();     } } class  RunnerA  extends  Thread     private  final  CountDownLatch waitSignal;     private  final  CountDownLatch launchSignal;     RunnerA (CountDownLatch startSignal,CountDownLatch doneSignal ){         this .waitSignal =startSignal;         this .launchSignal =doneSignal;     }     @Override      public  void  run ()           try  {             System.out.println("A在起跑线等待裁判吹哨" );             waitSignal.await();             doRun();             launchSignal.countDown();         }catch  (InterruptedException e){             e.printStackTrace();         }     }     private  void  doRun ()  throws  InterruptedException          Thread.sleep(1000 );         System.out.println("A 到达了终点" );     } } class  RunnerB  extends  Thread     private  final  CountDownLatch startSignal;     private  final  CountDownLatch doneSignal;     RunnerB (CountDownLatch startSignal,CountDownLatch doneSignal ){         this .startSignal=startSignal;         this .doneSignal=doneSignal;     }     @Override      public  void  run ()           try  {             System.out.println("B在起跑线等待裁判吹哨" );             startSignal.await();             doRun();             doneSignal.countDown();         }catch  (InterruptedException e){             e.printStackTrace();         }     }     private  void  doRun ()  throws  InterruptedException          Thread.sleep(3000 );         System.out.println("B 到达了终点" );     } } class  RunnerC  extends  Thread     private  final  CountDownLatch startSignal;     private  final  CountDownLatch doneSignal;     RunnerC (CountDownLatch startSignal,CountDownLatch doneSignal ){         this .startSignal=startSignal;         this .doneSignal=doneSignal;     }     @Override      public  void  run ()           try  {             System.out.println("C在起跑线等待裁判吹哨" );             startSignal.await();             doRun();             doneSignal.countDown();         }catch  (InterruptedException e){             e.printStackTrace();         }     }     private  void  doRun ()  throws  InterruptedException          Thread.sleep(5000 );         System.out.println("C 到达了终点" );     } } 
输出:
1 2 3 4 5 6 7 A在起跑线等待裁判吹哨 B在起跑线等待裁判吹哨 C在起跑线等待裁判吹哨 A 到达了终点  B 到达了终点  C 到达了终点  
以上线程调度涉及到两个AQS底层的阻塞队列:
第一个阻塞队列的形成(waitSignal背后对应的阻塞队列): RunnerA、RunnerB、RunnerC这三个线程都在“run”里面先调用waitSignal.await(),且同一时刻启动,由于比主线程waitSignal.countDown()先执行,因此这三个线程的await的getSate !=0 会进入阻塞队列操作doAcquireSharedInterruptibly,也即在waitSignal对象上形成以下CLH阻塞队列:
1 head(null,-1) <-> node(RunnerA,-1)  <-> node(RunnerB,-1)  <-> node(RunnerC,0)->null  
(假设按A、B、C入队顺序做的阻塞队列示意图)
什么时刻由谁去唤醒他们呢?
这里就是为何在主线程下使用waitSignal.countDown()的原因:对应以下1、2执行流
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18    protected  boolean  tryReleaseShared (int  releases)              for  (;;) {          int  c = getState();           if  (c == 0 )              return  false ;          int  nextc = c-1 ;           if  (compareAndSetState(c, nextc))              return  nextc == 0 ;       }       public  final  boolean  releaseShared (int  arg)   if  (tryReleaseShared(arg)) {       doReleaseShared();       return  true ;  } 
从上面可以看出,在主线程执行waitSignal.countDown()后会在AQS内部先通过doReleaseShared里面的unparkSuccessor唤醒RunnerA,那么之后的RunnerB、RunnerC又是怎么被唤醒呢?这就需要使用AQS共享模式的唤醒传播设计去解释了,以下便是RunnerA唤醒之后的执行流程:
RunnerA通过setHeadAndPropagate(node, r) 唤醒了RunnerB,同时RunnerA也会出队
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private  void  doAcquireSharedInterruptibly (int  arg)     throws  InterruptedException  {    final  Node node = addWaiter(Node.SHARED);     boolean  failed = true ;     try  {         for  (;;) {           	             final  Node p = node.predecessor();             if  (p == head) {                 int  r = tryAcquireShared(arg);                  if  (r >= 0 ) {                   	                     setHeadAndPropagate(node, r);                     p.next = null ;                      failed = false ;                     return ;                	 
以此类推,RunnerB通过setHeadAndPropagate(node, r) 唤醒了RunnerC,同时RunnerC也会出队
最终实现了以下唤醒流:
1、主线程唤醒RunnerA
2、RunnerA使用setHeadAndPropagate传播唤醒RunnerB
3、RunnerB使用setHeadAndPropagate传播唤醒RunnerC
如果没有看本博客之前关于AQS的共享模式设计的解析,则无法理解背后多个线程入队阻塞之后被唤醒的流程,以及所谓的传播唤醒设计原理。
第二个阻塞队列的形成(launchSignal背后对应的阻塞队列): 虽然三个线程和主线程都是同一时刻执行,但因为每个线程需要“跑步一段时间后”才执行launchSignal.countDown(),故主线程执行launchSignal.await()时,getState=3显然不是0,因此会执行doAcquireSharedInterruptibly进入另外一条CLH阻塞队列,队列结构如下:
1 head(null,-1) <-> node(主线程,0)->null 
当主线程进入阻塞队列且被阻塞后,此时对应的用户线程是这样的:RunnerA运行自己的任务、RunnerB运行自己的任务、RunnerC运行自己的任务,且主程序还未退出,那么在什么时间且最终由谁去唤醒位于launchSignal对象内部的阻塞队列的主线程呢?
由于RunnerA是第一个结束任务(执行1秒)、RunnerB是第二个结束任务(执行3秒),RunnerC是作为最后一个结束任务(运行5秒),因此容易推出:由RunnerC在众多线程中作为最后一个结束任务的线程去唤醒位于launchSignal对象内部的阻塞队列的主线程。这里就不再具体解释其唤醒过程了,参考第一种用法里面的部分解析即可。