/** 因为阻塞队列第一个线程节点使用 tryAcquireShared获取可用资源后,剩余量r>=0,因此它可以出队,并且检查后驱节点是否以共享模式等待,如果是那么只要满足剩余量大于0或者后驱节点的PROPAGATE已经设置,那么就要讲唤醒动作传播下去,也即调用doReleaseShared来实现唤醒动作接力。注意到Doug lea有提到该方法会引起不必要的唤醒动作。 * Sets head of queue, and checks if successor may be waiting * in shared mode, if so propagating if either propagate > 0 or * PROPAGATE status was set. * * @param node the node * @param propagate the return value from a tryAcquireShared */ privatevoidsetHeadAndPropagate(Node node, int propagate){ Node h = head; // Record old head for check below 保留旧头节点,用于之后检查 setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ /* 这里设计很机智: 条件① 如果可用资源量propagate>0,当然可以和后驱节点(如果存在)说:你们可以醒来啦,state这边还有资源可以抢占 条件②条件③是联合设计:如果旧头节点不为null那么就可以判断h.waitStatus头结点同步状态值,如果<0,说明waitStatus可能是SIGNAL或者PROPAGATE,故考虑唤醒传播逻辑 条件④:其实条件④就是Doug Lea经常用的一个trick写法,对于高并发场景下,可以在判断条件里面进行双重检查,也即前一刻读取一次,在后一刻马上读取一次 在这里,直接在条件里面重新读取新的阻塞队列头节点,如果为不空,就会跟前一刻的条件②条件③联合起来的逻辑一样。 */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; // 取出第一个线程节点的后驱节点,它可能null也可能是一个正在等待的线程节点 /* 唤醒后继共享节点 ① s后驱节点为null,为何还要将唤醒动作接力下去? 这是因为虽然上一课刻读取node.next为null,但是在下一刻如果恰好有新的线程节点入队,那么此时doReleaseShared就可以派上用场,但如果下一刻还是没有新线程入队,那么doReleaseShared也会被调用,这就是注释提到的“may cause unnecessary wake-ups”,也即引起不必要的唤醒调用。 ② 若s不为null,恰好保证第二个条件s.isShared()不会发生空指针异常且可以只要s.isShared()为true,说明后驱节点是共享模式,需要被唤醒去抢锁资源的。 这里之所以加入判断后驱节点s是共享模式节点的前提下才能执行唤醒操作,是因为在ReentrantReadWriteLock的读写锁同步器设计内部的阻塞队列中可能同时存在读线程节点(共享模式,需要继续被唤醒)和写线程节点(独占模式),而对于Semaphore同步器,只有s.isShared()为true才会将唤醒动作接力下去,否则说明此s节点是独占模式的节点(或者说此时用户使用的是ReentrantReadWriteLock同步器场景),不需要执行唤醒传播逻辑。 */ if (s == null || s.isShared()) // 如果s不是null且s.isShared()不是true,说明当前使用setHeadAndPropagate是ReentrantReadWriteLock使用场景,那么此时该节点是一个写节点,不需要传播唤醒,因为这个写线程会等之前那个获得独占锁的写线程去唤醒。 // 当然这部分内容最好有了ReentrantReadWriteLock设计原理的基础后再来理解则更容易明白。 doReleaseShared(); } }
6、doReleaseShared()
正如注释里面的说明:signals successor and ensures propagation,通知后驱节点以及保证唤醒传播(接力)下去
/** * Release action for shared mode -- signals successor and ensures * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) */ privatevoiddoReleaseShared(){ /* 由于在ReentrantReadWriteLock的阻塞队列里面可能同时存在独占模式的线程节点和共享模式的线程节点,因此该唤醒方法分为两种情况:如果遇到阻塞队列第一个线程节点是需要唤醒的 * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { // 注意到这里用到自旋,确保唤醒动作能够传播到下一个节点 Node h = head; // 只要阻塞队列不是空链表(至少存在一个线程节点) if (h != null && h != tail) { int ws = h.waitStatus; // ① 共享模式下,阻塞队列里面的线程节点的前驱节点waitStatus都会被设置SIGNAL,因此可以到这里如果直到head的ws是SIGNAL,就可以进行唤醒操作 if (ws == Node.SIGNAL) { // 确保能将头节点从SIGNAL改为0才能进行唤醒后驱节点的操作 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } // ② 如果此时头节点ws是0,就将其设为PROPAGATE,以保证唤醒操作能够传播到下一个节点 elseif (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 如果头节点已经被更新(也即inconsistent read),只能回到for循环继续重试 if (h == head) // loop if head changed break; } }
// Semaphore内部tryReleaseShared实现逻辑,对state进行CAS累加 protectedfinalbooleantryReleaseShared(int releases){ for (;;) { int current=getState(); int next=current+releases; if (next<current) thrownew Error("Permit count underflow"); if (compareAndSetState(current,next)) returntrue;
} }
// 如果释放资源成功,也即state值加1(假设每次请求1个资源),那么进入AQS的共享模式下的唤醒操作: if (tryReleaseShared(arg)) { doReleaseShared();
protectedinttryAcquireShared(int acquires){ for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }