/** * Each use of the barrier is represented as a generation instance. * The generation changes whenever the barrier is tripped, or * is reset. There can be many generations associated with threads * using the barrier - due to the non-deterministic way the lock * may be allocated to waiting threads - but only one of these * can be active at a time (the one to which {@code count} applies) * and all the rest are either broken or tripped. * There need not be an active generation if there has been a break * but no subsequent reset. */ privatestaticclassGeneration{ boolean broken = false; }
/** The lock for guarding barrier entry */ // 从这里可以看到,内部使用一把独占锁和一个条件队列实现阻塞机制 privatefinal ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ // 这里tripped可以理解为:如果所有线程还未到达屏障,那么那些先来到屏障的线程会被阻塞在trip条件队列中 privatefinal Condition trip = lock.newCondition(); /** The number of parties */ // 到达同一屏障才给放行的线程数量,例如上面demo参与游戏的人数,要求3位同学到达关口才能进入下一关 privatefinalint parties; /* The command to run when tripped */ // 当所有线程都到达了屏障,由最晚到的线程去执行barrierCommand定义的具体任务,例如demo的C同学:最晚到关口的Thread-2说:A、B、C都齐了,可以开始下一关 privatefinal Runnable barrierCommand; /** The current generation */ // 这里的generation可以理解“当前屏障”、“本关”、“本局”、“本阶段”,就像游戏过关一样,如果在本局就是当前generation,如果通过本局就来到下一局,也即nextGeneration private Generation generation = new Generation();
/** * Number of parties still waiting. Counts down from parties to 0 * on each generation. It is reset to parties on each new * generation or when broken. */ /* (1)用于统计目前还差多少个线程到达屏障才能进行下一关任务,由barrier.await()对其扣减。 在new CyclicBarrier给定,count的初始值=parties 例如上面的demo: Thread-0执行barrier.await(),此时count=2,表示当前还差2个线程到达屏障才能进行下一关任务 Thread-1执行barrier.await(),此时count=1,表示当前还差1个线程到达屏障才能进行下一关任务 Thread-2执行barrier.await(),此时count=0,表示3个线程都到齐了屏障,可以开始任务 (2) 当“游戏”从本关进入到下一关(或者从本屏障通过,达到下一个屏障),count又恢复到parties的数量,这里体现了Cyclic重复、循环的意思。 (3) 当然还有其他情况,参考后面的解析。 */ privateint count;
/** 关于“when the barrier is tripped”,本文一律解释为“当屏障” * Creates a new {@code CyclicBarrier} that will trip when the * given number of parties (threads) are waiting upon it, and which * will execute the given barrier action when the barrier is tripped, * performed by the last thread entering the barrier. 注意这里已经很清楚说明:由最后到达barrier的线程来执行给定barrierAction(也即初始化传入一个Runnable的任务,类似demo里面“A、B、C都齐了,可以开始下一关”的打印任务),barrierAction也可以是null,意味着无任何操作。 * * @param parties the number of threads that must invoke {@link #await} * before the barrier is tripped * @param barrierAction the command to execute when the barrier is * tripped, or {@code null} if there is no action * @throws IllegalArgumentException if {@code parties} is less than 1 */ // publicCyclicBarrier(int parties, Runnable barrierAction){ if (parties <= 0) thrownew IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
/*在源码该方法注释中,说明调用await的线程阻塞在trip条件队列中,除非发生以下5个情况(这些情况都会让条件队列中全部阻塞的线程们唤醒): 1、最后一个到达屏障线程,此线程会signalAll条件队列中所有阻塞线程 1、The last thread arrives; or 2、其他线程中断当前执行await的线程 2、Some other thread interrupts the current thread; or 3、其他线程中断一个已经在条件队列阻塞的线程 3、Some other thread interrupts one of the other waiting threads; or 4、在条件队列等待状态的线程出现等待超时的情况 4、Some other thread times out while waiting for barrier; or 5、有些线程主动使用reset方法 5、Some other thread invokes reset on this barrier. */ publicintawait()throws InterruptedException, BrokenBarrierException { try { //具体的阻塞机制和唤醒机制是在dowait实现。 return dowait(false, 0L); } catch (TimeoutException toe) { thrownew Error(toe); // cannot happen } }
/** * Main barrier code, covering the various policies. */ // 注释说这里面有多种处理策略 privateintdowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); // 先获取独占锁的这种设计对于条件队列来说已经再熟悉不过 try { // 临时变量g指向当前代(或者本局游戏) final Generation g = generation; // 1、此线程执行await前,当前屏障就已经被本局中的其他线程提前打破,则此线程只能抛出BrokenBarrierException。 if (g.broken) thrownew BrokenBarrierException(); // 2、线程还未进入条件队列就已经被外界中断,那么“所有线程都到达当前屏障的情况一定不会满足了”, 那么采取需要这样的操作:Sets current barrier generation as broken and wakes up everyone,并抛出中断异常。 if (Thread.interrupted()) { breakBarrier(); thrownew InterruptedException(); } // 3、线程调用await会在这里对计时器减1,或者说:每有一个线程到达屏障,计时器就减1 int index = --count; // 4、这里是关键的逻辑:当计时器为0时,说明所有的线程都到达了屏障,那么最晚到的那个线程可以开始执行一个指定任务:barrierCommand,然后再唤醒所有线程 if (index == 0) { // tripped boolean ranAction = false; // 字面意思也可以猜出:是否成功运行了任务 try { // 5、这里就是demo里面的由最晚到的C同学来完成“通知可以进行下一关”的任务 final Runnable command = barrierCommand; if (command != null) command.run(); // 由最晚到屏障的线程来执行给定的任务 ranAction = true; // 6 "开启下一关":这最晚的到屏障的线程(C同学)通知那些已经在条件队列阻塞的线程们“开始进行下一局游戏” nextGeneration(); return0; } finally { // 7、如果最晚到达屏障的线程没能“成功执行给定的任务”,那么也要唤醒条件队列里面的线程,表明本局游戏中止 if (!ranAction) breakBarrier(); } }
// loop until tripped, broken, interrupted, or timed out // 8、来到这里,说明执行await的线程一定不是最晚到达屏障的,那就先进入条件队列里面阻塞等待着,直到triped、当前屏障被打翻、在条件队列阻塞过程中被中断、超时等待,只要其中一个发生,此线程都会被唤醒。 for (;;) { try { // 9、如果使用普通的await,则调用AQS条件队列里面普通await,如果使用超时的await,那么就调用AQS条件队列里面有超时计时的awaitNanos方法 if (!timed) trip.await(); elseif (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // 10、执行流来到这里,说明第9点提到线程在条件队列等待中被外界中断唤醒,如果唤醒线程发现还在本局“游戏”进度中且屏障还未被打翻,那么自己就去做breakBarrier这个操作:因为CyclicBarrier的设计要求只要任一在条件队列阻塞的线程被中断唤醒,那么“本局游戏”就认为是“输了”,表示本局需要中止。 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // 11 、来到这里,考察第10点提到的两个条件中,(1)如果g!=generation,说明这个醒来的线程发现自己处于在“下一局游戏环境中”,既然这样自己可以继续执行,不必打破已经处在“下一局”的环境,至多将自己标记为中断过 //(2)如果g == generation 且 g.broken=true,说明当前局的屏障被(其他醒来的线程)打翻过,自己只需补一个中断标记即可,不必重新又来一个breakBarrier()。 // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } /* 11、由于g.broken要么被当前线程自己来标记,要么就是本局中其他线程比当前线程提早标记g.broken,因此分为以下几种情况。也即对应dowait的上面各种策略 ① 此线程执行await前,当前屏障就已经被本局中的其他线程提前打破 ② 如果本局中最晚到达屏障的线程没能“成功执行给定的任务” ③ 如果本局中已经在条件队列中阻塞过程中被外界中断或者超时中断唤醒 ④ 本局中线程或者外界线程调用reset 或者参考await方法的注释 Some other thread interrupts the current thread; or Some other thread interrupts one of the other waiting threads; or Some other thread times out while waiting for barrier; or Some other thread invokes reset on this barrier. */ if (g.broken) thrownew BrokenBarrierException(); // 12、说明当前线程是被最晚到达屏障的线程(例如demo的C同学)唤醒来的,那么说明“本局所有线程在玩游戏过程中都是正常执行,没有任何异常发生”,因为最晚到达屏障的线程它会在dowait第6点逻辑中执行nextGeneration,从而使得g指向下一局(指向下一代) if (g != generation) return index; // 13、对于超时等待引起的中断从而唤醒正在条件队列等待的线程,那只能宣告“本局游戏中止,当前屏障要标记为打翻”然后抛出timeout异常 if (timed && nanos <= 0L) { breakBarrier(); thrownew TimeoutException(); } } } finally { lock.unlock(); } }
nextGeneration方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/** * Updates state on barrier trip and wakes up everyone. * Called only while holding lock. */ // 在dowait方法的第6个逻辑:唤醒那些已经到达屏障然后在条件队列中阻塞的线程们(wakes up everyone):“可以开始开启下一局” privatevoidnextGeneration(){ // signal completion of last generation trip.signalAll(); // set up next generation // 计时器重置为指定的线程数量,以便在下一局又可以重新对到达屏障的线程计数 count = parties; // “开始下一局”、“开始下一关”、“开始下一代” 这三种表达的意思都一样 generation = new Generation(); }
breakBarrier方法
1 2 3 4 5 6 7 8 9 10 11
/** broken有打翻、破碎的、中止了的意思,在这里可以理解本局中止了、本关中止了、当前代中止了。 * Sets current barrier generation as broken and wakes up everyone. * Called only while holding lock. */ // 在dowait方法的第7个逻辑:最晚到达屏障的线程没能“成功执行给定的任务”,这种情况称为“当前屏障被打翻”(不建议直接翻译,还是按current barrier generation as broken去理解显然更为自然) privatevoidbreakBarrier(){ generation.broken = true; // 将当前局(当前代、当前关口)设为中止了。 count = parties;// 重置计数器 trip.signalAll(); // 即使最晚到达屏障的线程没能“成功执行给定的任务”,它也要将阻塞在trip条件队列里面的所有线程唤醒。 }
/** * Resets the barrier to its initial state. If any parties are * currently waiting at the barrier, they will return with a * {@link BrokenBarrierException}. Note that resets <em>after</em> * a breakage has occurred for other reasons can be complicated to * carry out; threads need to re-synchronize in some other way, * and choose one to perform the reset. It may be preferable to * instead create a new barrier for subsequent use. */ publicvoidreset(){ final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } }