// 3. java.util.concurrent.locks.AbstractQueuedSynchronizer
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
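To make the compare-and-set semantics concrete, here is a minimal sketch. It assumes `AtomicInteger` as a stand-in for the `Unsafe`-based field update that AQS uses; the class `CasStateDemo` and its methods are hypothetical names for illustration only.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasStateDemo {
    // Stand-in for the AQS `state` field (assumption: AtomicInteger replaces Unsafe).
    private final AtomicInteger state = new AtomicInteger(0);

    // Mirrors compareAndSetState(expect, update): succeeds only if state == expect.
    boolean compareAndSetState(int expect, int update) {
        return state.compareAndSet(expect, update);
    }

    int getState() {
        return state.get();
    }

    public static void main(String[] args) {
        CasStateDemo s = new CasStateDemo();
        System.out.println(s.compareAndSetState(0, 1)); // first CAS from 0 succeeds
        System.out.println(s.compareAndSetState(0, 1)); // state is already 1, so this one fails
        System.out.println(s.getState());
    }
}
```

Only one of several racing threads can win the 0 to 1 transition, which is why the lock fast path can be a single CAS.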
/**
 * Base of synchronization control for this lock. Subclassed
 * into fair and nonfair versions below. Uses AQS state to
 * represent the number of holds on the lock.
 */
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;

    /**
     * Performs {@link Lock#lock}. The main reason for subclassing
     * is to allow fast path for nonfair version.
     */
    abstract void lock();

    /**
     * Performs non-fair tryLock. tryAcquire is implemented in
     * subclasses, but both need nonfair try for trylock method.
     */
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    protected final boolean isHeldExclusively() {
        // While we must in general read state before owner,
        // we don't need to do so to check if current thread is owner
        return getExclusiveOwnerThread() == Thread.currentThread();
    }

    final ConditionObject newCondition() {
        return new ConditionObject();
    }

    /**
     * Reconstitutes the instance from a stream (that is, deserializes it).
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        setState(0); // reset to unlocked state
    }
}
1. How AQS works
The official design rationale given in the source code:
Even though this class is based on an internal FIFO queue, it does not automatically enforce FIFO acquisition policies. The core of exclusive synchronization takes the form:

Acquire:
    while (!tryAcquire(arg)) {
        enqueue thread if it is not already queued;
        possibly block current thread;
    }

Release:
    if (tryRelease(arg))
        unblock the first queued thread;

(Shared mode is similar but may involve cascading signals.)
Wait queue node class. The wait queue is a variant of a "CLH" (Craig, Landin, and Hagersten) lock queue. CLH locks are normally used for spinlocks. We instead use them for blocking synchronizers, but use the same basic tactic of holding some of the control information about a thread in the predecessor of its node. A "status" field in each node keeps track of whether a thread should block. A node is signalled when its predecessor releases. Each node of the queue otherwise serves as a specific-notification-style monitor holding a single waiting thread. The status field does NOT control whether threads are granted locks etc though. A thread may try to acquire if it is first in the queue. But being first does not guarantee success; it only gives the right to contend. So the currently released contender thread may need to rewait. To enqueue into a CLH lock, you atomically splice it in as new tail. To dequeue, you just set the head field. +------+ prev +-----+ +-----+ head | | <---- | | <---- | | tail +------+ +-----+ +-----+ Insertion into a CLH queue requires only a single atomic operation on "tail", so there is a simple atomic point of demarcation from unqueued to queued. Similarly, dequeuing involves only updating the "head". However, it takes a bit more work for nodes to determine who their successors are, in part to deal with possible cancellation due to timeouts and interrupts. The "prev" links (not used in original CLH locks), are mainly needed to handle cancellation. If a node is cancelled, its successor is (normally) relinked to a non-cancelled predecessor. For explanation of similar mechanics in the case of spin locks, see the papers by Scott and Scherer at http://www.cs.rochester.edu/u/scott/synchronization/ We also use "next" links to implement blocking mechanics. The thread id for each node is kept in its own node, so a predecessor signals the next node to wake up by traversing next link to determine which thread it is. 
Determination of successor must avoid races with newly queued nodes to set the "next" fields of their predecessors. This is solved when necessary by checking backwards from the atomically updated "tail" when a node's successor appears to be null. (Or, said differently, the next-links are an optimization so that we don't usually need a backward scan.) Cancellation introduces some conservatism to the basic algorithms. Since we must poll for cancellation of other nodes, we can miss noticing whether a cancelled node is ahead or behind us. This is dealt with by always unparking successors upon cancellation, allowing them to stabilize on a new predecessor, unless we can identify an uncancelled predecessor who will carry this responsibility. CLH queues need a dummy header node to get started. But we don't create them on construction, because it would be wasted effort if there is never contention. Instead, the node is constructed and head and tail pointers are set upon first contention. Wait queue node class.
Threads waiting on Conditions use the same nodes, but use an additional link. Conditions only need to link nodes in simple (non-concurrent) linked queues because they are only accessed when exclusively held. Upon await, a node is inserted into a condition queue. Upon signal, the node is transferred to the main queue. A special value of status field is used to mark which queue a node is on. Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill Scherer and Michael Scott, along with members of JSR-166 expert group, for helpful ideas, discussions, and critiques on the design of this class.
The most authoritative approach here is, of course, to work through the developer comments in the AQS source code:
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state. Subclasses must define the protected methods that change this state, and which define what that state means in terms of this object being acquired or released. Given these, the other methods in this class carry out all queuing and blocking mechanics. Subclasses can maintain other state fields, but only the atomically updated int value manipulated using methods getState, setState and compareAndSetState is tracked with respect to synchronization.
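The subclass contract described above (define what `state` means through `tryAcquire` and `tryRelease`, and let AQS handle queuing and blocking) can be sketched as a small non-reentrant mutex. This is an illustrative sketch, not code from ReentrantLock; the names `SimpleMutexDemo` and `Sync` are hypothetical, and the encoding 0 = unlocked, 1 = locked is an assumption of the example.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleMutexDemo {
    // Assumed encoding for this sketch: state 0 = unlocked, 1 = locked (non-reentrant).
    static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false; // AQS.acquire will enqueue and park the caller
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0)
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true; // tells AQS.release to wake the first queued thread
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }
    boolean tryLock()  { return sync.tryAcquire(1); }
    void unlock()      { sync.release(1); }
    boolean isLocked() { return sync.isLocked(); }

    public static void main(String[] args) {
        SimpleMutexDemo m = new SimpleMutexDemo();
        m.lock();
        System.out.println(m.isLocked());
        System.out.println(m.tryLock()); // non-reentrant: a second attempt fails
        m.unlock();
        System.out.println(m.isLocked());
    }
}
```

Everything related to the wait queue stays inside `acquire` and `release`; the subclass only decides whether a state transition is allowed.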
/**
 * Creates a new {@code AbstractQueuedSynchronizer} instance
 * with initial synchronization state of zero.
 */
protected AbstractQueuedSynchronizer() { }
/** * Wait queue node class. * # 这里就解释了CLH阻塞队列的设计细节 * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and * Hagersten) lock queue. CLH locks are normally used for * spinlocks. We instead use them for blocking synchronizers, but * use the same basic tactic of holding some of the control * information about a thread in the predecessor of its node. A * "status" field in each node keeps track of whether a thread * should block. A node is signalled when its predecessor * releases. Each node of the queue otherwise serves as a * specific-notification-style monitor holding a single waiting * thread. The status field does NOT control whether threads are * granted locks etc though. A thread may try to acquire if it is * first in the queue. But being first does not guarantee success; * it only gives the right to contend. So the currently released * contender thread may need to rewait. * * <p>To enqueue into a CLH lock, you atomically splice it in as new * tail. To dequeue, you just set the head field. * <pre> * +------+ prev +-----+ +-----+ * head | | <---- | | <---- | | tail * +------+ +-----+ +-----+ * </pre> * * <p>Insertion into a CLH queue requires only a single atomic * operation on "tail", so there is a simple atomic point of * demarcation from unqueued to queued. Similarly, dequeuing * involves only updating the "head". However, it takes a bit * more work for nodes to determine who their successors are, * in part to deal with possible cancellation due to timeouts * and interrupts. * * <p>The "prev" links (not used in original CLH locks), are mainly * needed to handle cancellation. If a node is cancelled, its * successor is (normally) relinked to a non-cancelled * predecessor. For explanation of similar mechanics in the case * of spin locks, see the papers by Scott and Scherer at * http://www.cs.rochester.edu/u/scott/synchronization/ * * <p>We also use "next" links to implement blocking mechanics. 
* The thread id for each node is kept in its own node, so a * predecessor signals the next node to wake up by traversing * next link to determine which thread it is. Determination of * successor must avoid races with newly queued nodes to set * the "next" fields of their predecessors. This is solved * when necessary by checking backwards from the atomically * updated "tail" when a node's successor appears to be null. * (Or, said differently, the next-links are an optimization * so that we don't usually need a backward scan.) * * <p>Cancellation introduces some conservatism to the basic * algorithms. Since we must poll for cancellation of other * nodes, we can miss noticing whether a cancelled node is * ahead or behind us. This is dealt with by always unparking * successors upon cancellation, allowing them to stabilize on * a new predecessor, unless we can identify an uncancelled * predecessor who will carry this responsibility. * * <p>CLH queues need a dummy header node to get started. But * we don't create them on construction, because it would be wasted * effort if there is never contention. Instead, the node * is constructed and head and tail pointers are set upon first * contention. * * <p>Threads waiting on Conditions use the same nodes, but * use an additional link. Conditions only need to link nodes * in simple (non-concurrent) linked queues because they are * only accessed when exclusively held. Upon await, a node is * inserted into a condition queue. Upon signal, the node is * transferred to the main queue. A special value of status * field is used to mark which queue a node is on. * * <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill * Scherer and Michael Scott, along with members of JSR-166 * expert group, for helpful ideas, discussions, and critiques * on the design of this class. 
 */
static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    // SHARED marks a node as waiting in shared mode. It is just an empty Node() instance,
    // used by shared-mode synchronizers such as Semaphore.
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    // EXCLUSIVE is null and marks a node as waiting in exclusive mode,
    // used by exclusive locks such as ReentrantLock.
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    // waitStatus == 1: this thread node has cancelled its wait
    // (it is marked as cancelled but has not yet been unlinked from the queue).
    static final int CANCELLED = 1;
    /** waitStatus value to indicate successor's thread needs unparking */
    // waitStatus == -1: the successor of this thread node needs to be woken up.
    static final int SIGNAL = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    // waitStatus == -2: this thread node is waiting on a condition.
    static final int CONDITION = -2;
    // Supports cascading wake-ups in shared-mode synchronizers.
    // ReentrantLock does not use it; tools such as Semaphore do.
    static final int PROPAGATE = -3;
    /**
     * The scenarios for each waitStatus value are described below.
     * waitStatus represents the state of this thread node and takes only the following 5 values.
     * Status field, taking on only the values:
     *   SIGNAL:     The successor of this node is (or will soon be)
     *               blocked (via park), so the current node must
     *               unpark its successor when it releases or
     *               cancels. To avoid races, acquire methods must
     *               first indicate they need a signal,
     *               then retry the atomic acquire, and then,
     *               on failure, block.
     *   Note on SIGNAL: the successor has asked to be woken, so this node must unpark
     *   the successor queued behind it when it releases the resource or is cancelled.
     *   To avoid races, an acquire method must first announce that it needs a signal,
     *   then retry the atomic acquire, and block only if that retry fails
     *   (instead of retrying the CAS forever).
     *   CANCELLED:  This node is cancelled due to timeout or interrupt.
     *               Nodes never leave this state. In particular,
     *               a thread with cancelled node never again blocks.
     *   Note on CANCELLED: the node was cancelled because of a timeout or an interrupt.
     *   Such a node never leaves the cancelled state, and its thread never blocks again.
     *   CONDITION:  This node is currently on a condition queue.
     *               It will not be used as a sync queue node
     *               until transferred, at which time the status
     *               will be set to 0. (Use of this value here has
     *               nothing to do with the other uses of the
     *               field, but simplifies mechanics.)
     *   Note on CONDITION: the node is currently on a condition queue. It is not used as
     *   a sync queue node until it is transferred, at which point its status is set to 0.
     *   When another thread calls signal on the Condition, the CONDITION node moves from
     *   the condition queue to the sync queue and waits there to acquire the resource.
     *   PROPAGATE:  A releaseShared should be propagated to other
     *               nodes. This is set (for head node only) in
     *               doReleaseShared to ensure propagation
     *               continues, even if other operations have
     *               since intervened.
     *   Note on PROPAGATE: a releaseShared should be propagated to other nodes. The value
     *   is set in doReleaseShared (on the head node only) so that propagation continues
     *   even if other operations have intervened.
     *   In other words, in shared mode a wake-up does not stop at the immediate successor;
     *   it can cascade to that node's successor and on down the queue.
     *   0:          None of the above
     *
     * The values are arranged numerically to simplify use.
     * Non-negative values mean that a node doesn't need to
     * signal. So, most code doesn't need to check for particular
     * values, just for sign.
     * The values are chosen for convenience: a non-negative value means the node does not
     * need to signal, so most code only checks the sign rather than a specific value.
     * The field is initialized to 0 for normal sync nodes, and
     * CONDITION for condition nodes. It is modified using CAS
     * (or when possible, unconditional volatile writes).
     * waitStatus starts at 0 for normal sync nodes and at CONDITION for condition nodes,
     * and is normally updated with CAS.
     */
    volatile int waitStatus;
    /**
     * Link to predecessor node that current node/thread relies on
     * for checking waitStatus. Assigned during enqueuing, and nulled
     * out (for sake of GC) only upon dequeuing.  Also, upon
     * cancellation of a predecessor, we short-circuit while
     * finding a non-cancelled one, which will always exist
     * because the head node is never cancelled: A node becomes
     * head only as a result of successful acquire. A
     * cancelled thread never succeeds in acquiring, and a thread only
     * cancels itself, not any other node.
     */
    /*
      As noted earlier, the prev pointer exists to handle cancelled threads:
      The "prev" links (not used in original CLH locks), are mainly needed to handle cancellation.
      If a node is cancelled, its successor is (normally) relinked to a non-cancelled predecessor.
        before: non-cancelled node -> a node -> successor
        after:  non-cancelled node -> successor
    */
    volatile Node prev;

    /**
     * Link to the successor node that the current node/thread
     * unparks upon release. Assigned during enqueuing, adjusted
     * when bypassing cancelled predecessors, and nulled out (for
     * sake of GC) when dequeued.  The enq operation does not
     * assign next field of a predecessor until after attachment,
     * so seeing a null next field does not necessarily mean that
     * node is at end of queue. However, if a next field appears
     * to be null, we can scan prev's from the tail to
     * double-check.  The next field of cancelled nodes is set to
     * point to the node itself instead of null, to make life
     * easier for isOnSyncQueue.
     */
    /*
      The next pointer serves the purpose stated earlier:
      "We also use "next" links to implement blocking mechanics".
      A cancelled node points its next field at itself rather than at null. According to the
      comment above this makes life easier for isOnSyncQueue; the idea resembles the removal
      logic in a skip list, and it also helps GC.
    */
    volatile Node next;

    /**
     * The thread that enqueued this node.  Initialized on
     * construction and nulled out after use.
     */
    /*
      Refers to the thread that is trying to acquire the lock. A Node is essentially a wrapper
      around such a thread, which is why it is also called a thread node here.
    */
    volatile Thread thread;

    /**
     * Link to next node waiting on condition, or the special
     * value SHARED.  Because condition queues are accessed only
     * when holding in exclusive mode, we just need a simple
     * linked queue to hold nodes while they are waiting on
     * conditions. They are then transferred to the queue to
     * re-acquire. And because conditions can only be exclusive,
     * we save a field by using special value to indicate shared
     * mode.
     */
    /*
      nextWaiter plays two roles. While the Node is on the CLH sync queue, it is a mode marker:
      nextWaiter == SHARED for shared mode (for example CountDownLatch or Semaphore) and
      null (EXCLUSIVE) for exclusive mode such as ReentrantLock. While the Node is on a
      condition queue, nextWaiter is the pointer to the next waiting Node.
    */
    Node nextWaiter;
    /**
     * Returns true if node is waiting in shared mode.
     * (Checks whether this thread node is in shared mode.)
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    /**
     * Returns previous node, or throws NullPointerException if null.
     * Use when predecessor cannot be null.  The null check could
     * be elided, but is present to help the VM.
     *
     * @return the predecessor of this node
     */
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    // A thread that failed to acquire the resource (for example a lock) is wrapped in a Node
    // by this constructor. addWaiter uses it to enqueue the node (on the so-called wait queue).
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
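The sign convention of `waitStatus` described in the Node comments above can be sketched as follows. The constants carry the same values as in the source; the class `WaitStatusSketch` and the two helper methods are hypothetical and exist only to show why most AQS code tests the sign instead of a specific value.

```java
class WaitStatusSketch {
    // Same numeric values as AbstractQueuedSynchronizer.Node.
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    // Hypothetical helper: only CANCELLED is positive.
    static boolean isCancelled(int ws) {
        return ws > 0;
    }

    // Hypothetical helper: negative values are the ones that may involve signalling.
    static boolean mayNeedSignal(int ws) {
        return ws < 0;
    }

    public static void main(String[] args) {
        int[] values = { 0, CANCELLED, SIGNAL, CONDITION, PROPAGATE };
        for (int ws : values) {
            System.out.println(ws + " cancelled=" + isCancelled(ws)
                    + " mayNeedSignal=" + mayNeedSignal(ws));
        }
    }
}
```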
/**
 * Head of the wait queue, lazily initialized.  Except for
 * initialization, it is modified only via method setHead.  Note:
 * If head exists, its waitStatus is guaranteed not to be
 * CANCELLED.
 */
// Head of the wait queue (a doubly linked list). Created lazily; apart from initialization,
// only setHead may change what head points to.
private transient volatile Node head;

/**
 * Tail of the wait queue, lazily initialized.  Modified only via
 * method enq to add new wait node.
 */
// Tail of the wait queue (a doubly linked list). Created lazily and moved only when a new
// wait node is appended.
private transient volatile Node tail;

/**
 * The synchronization state.
 */
// This is the state variable that AQS subclasses modify to control access to the resource.
private volatile int state;

/**
 * Returns the current value of synchronization state.
 * This operation has memory semantics of a {@code volatile} read.
 * @return current state value
 */
protected final int getState() {
    return state;
}
/**
 * Acquires in exclusive mode, ignoring interrupts.  Implemented
 * by invoking at least once {@link #tryAcquire},
 * returning on success.  Otherwise the thread is queued, possibly
 * repeatedly blocking and unblocking, invoking {@link
 * #tryAcquire} until success.  This method can be used
 * to implement method {@link Lock#lock}.
 *
 * @param arg the acquire argument.  This value is conveyed to
 *        {@link #tryAcquire} but is otherwise uninterpreted and
 *        can represent anything you like.
 */
public final void acquire(int arg) {
    /*
      This method shows that a thread which lost the first CAS on state in NonfairSync still
      gets another chance through tryAcquire. If tryAcquire succeeds, acquire returns at once.
      If it fails again, the thread is queued via acquireQueued(addWaiter(Node.EXCLUSIVE), arg).
      selfInterrupt() runs only when acquireQueued reports that the thread was interrupted
      while it was waiting.
    */
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
/**
 * Base of synchronization control for this lock. Subclassed
 * into fair and nonfair versions below. Uses AQS state to
 * represent the number of holds on the lock.
 */
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;

    /**
     * Performs {@link Lock#lock}. The main reason for subclassing
     * is to allow fast path for nonfair version.
     */
    // The lock() method of NonfairSync implements this abstract method.
    abstract void lock();

    /**
     * Performs non-fair tryLock. tryAcquire is implemented in
     * subclasses, but both need nonfair try for trylock method.
     */
    // This is the concrete logic behind AQS's tryAcquire for the nonfair lock.
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        // Read the synchronization state.
        int c = getState();
        // ① state == 0 means the lock is currently free, so the thread immediately tries to CAS
        // state from 0 to acquires (1 for lock()). This also confirms that a new thread whose
        // first if (compareAndSetState(0, 1)) in lock.lock() failed gets one more
        // compareAndSetState attempt inside tryAcquire. If this CAS succeeds, the thread
        // records itself as the exclusive owner, holds the lock, and returns true.
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        /*
          ② If the current thread is already the exclusive owner, it is re-entering the lock.
          As long as the hold count does not overflow, the re-entry is allowed and state is
          updated. This is how ReentrantLock implements reentrancy. How many times can a
          thread re-enter? state is an int, so the maximum is:
              max = (1 << 31) - 1, that is Integer.MAX_VALUE
          which is what the following usage exercises:
              double tooLarge = Math.pow(2, 31);
              for (int i = 1; i <= tooLarge; i++) {
                  lock.lock(); // Same thread re-entering up to the maximum. Internally state is
                               // accumulated with a plain setState(nextc) (no CAS is needed,
                               // because only the owner thread reaches this branch) until the
                               // sum overflows and "Maximum lock count exceeded" is thrown.
              }
              for (int i = 1; i <= 5; i++) {
                  lock.unlock();
              }
        */
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        /* ③ If neither ① nor ② applies, another thread holds the lock, so the method returns
           false and the caller is sent to the queue, that is, in AQS:
               if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                   selfInterrupt();
        */
        return false;
    }
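The reentrancy counting in branch ② can be observed from outside through `ReentrantLock.getHoldCount()`. The sketch below is only an illustration; the class `ReentrancyDemo` and the helper `holdsAfter` are hypothetical names.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrancyDemo {
    // Hypothetical helper: lock `locks` times, unlock `unlocks` times,
    // and report how many holds the current thread still has.
    static int holdsAfter(int locks, int unlocks) {
        ReentrantLock lock = new ReentrantLock();
        for (int i = 0; i < locks; i++)
            lock.lock();   // each re-entry adds 1 to the AQS state
        for (int i = 0; i < unlocks; i++)
            lock.unlock(); // each unlock subtracts 1
        int holds = lock.getHoldCount();
        while (lock.isHeldByCurrentThread())
            lock.unlock(); // clean up so the lock ends fully released
        return holds;
    }

    public static void main(String[] args) {
        System.out.println(holdsAfter(3, 1)); // two holds remain
        System.out.println(holdsAfter(2, 2)); // fully released
    }
}
```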
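/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
/* In exclusive mode, addWaiter wraps the current thread in a thread node and uses CAS to
   append it to the tail of the wait queue, in two steps:
   1. Fast path: try once to CAS the node onto the tail. If that fails, go to step 2.
   2. Call enq(node), which guarantees that the node ends up enqueued at the tail.
*/
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    // 1. node<=>node<=>pred becomes node<=>node<=>pred<=>new Node
    if (pred != null) {
        // Point the new node's prev at pred. If no other thread has changed the tail in the
        // meantime, the CAS succeeds, pred.next is then pointed at the new node, and the new
        // thread node is enqueued.
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            // Return the thread node to the caller, that is, acquireQueued.
            return node;
        }
    }
    // 2. enq spins (for loop) until the node is enqueued.
    enq(node);
    // Return the thread node to the caller, that is, acquireQueued.
    return node;
}

/**
 * Inserts node into queue, initializing if necessary. See picture above.
 * @param node the node to insert
 * @return node's predecessor
 */
// Inserts the thread node at the tail of the doubly linked list (the wait queue). If the list
// is empty it is initialized first and the node is inserted afterwards.
private Node enq(final Node node) {
    // Spin until one iteration's CAS succeeds in enqueuing the node.
    for (;;) {
        Node t = tail;
        // A null tail means the wait queue does not exist yet, so it must be initialized by
        // placing a new non-thread node at the head of the list.
        if (t == null) { // Must initialize
            /* This is the key point: the head of the list is not a thread node but a node built
               with the no-arg constructor, which can be called a helper (dummy) node. The
               head of the base-level list in ConcurrentSkipListMap (BASE_HEADER) is a helper
               node in the same way:
               head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null), null, null, 1);
            */
            if (compareAndSetHead(new Node()))
                tail = head;
        // If the list already exists, try to CAS the thread node onto the tail.
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                // Return the node's predecessor (the old tail). addWaiter ignores this value
                // and returns the node itself.
                return t;
            }
        }
    }
}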
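The lazy dummy-head initialization and the CAS on `tail` can be tried out in isolation. The following is a simplified sketch that assumes `AtomicReference` in place of the `Unsafe`-based `compareAndSetHead` and `compareAndSetTail`; `EnqSketch`, its `Node` class and the accessor names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

class EnqSketch {
    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    // Assumption: AtomicReference replaces the Unsafe CAS helpers of AQS.
    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Returns the predecessor of the inserted node, like AQS enq.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // First contention: install a dummy head, then let tail follow it.
                if (head.compareAndSet(null, new Node("dummy")))
                    tail.set(head.get());
            } else {
                node.prev = t;                     // prev is set before the CAS
                if (tail.compareAndSet(t, node)) { // the single atomic enqueue point
                    t.next = node;                 // next is set only afterwards
                    return t;
                }
            }
        }
    }

    Node head() { return head.get(); }
    Node tail() { return tail.get(); }

    public static void main(String[] args) {
        EnqSketch q = new EnqSketch();
        Node a = new Node("a");
        Node predOfA = q.enq(a);
        System.out.println(predOfA.name + " <- " + a.name);
    }
}
```

Because `prev` is written before the CAS and `next` after it, a node reachable from `tail` always has a valid `prev` chain, while `next` may lag behind for a moment.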
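A new thread that failed to acquire the lock (that is, failed to update state) appends itself to the tail of the wait queue through addWaiter, with enq as the fallback. Put differently, addWaiter first makes a single CAS attempt and only then calls enq, which is exactly what the source comment says: Try the fast path of enq; backup to full enq on failure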
/**
 * Checks and updates status for a node that failed to acquire.
 * Returns true if thread should block. This is the main signal
 * control in all acquire loops.  Requires that pred == node.prev.
 *
 * @param pred node's predecessor holding status
 * @param node the node
 * @return {@code true} if thread should block
 */
/* A very important source comment. Following 4.3: if the thread node failed
   p == head && tryAcquire(arg), this method checks and updates the Node.SIGNAL status on its
   predecessor, and returns true if the thread node should block.
   To follow the logic below, assume the list currently looks like this:
       head(waitStatus=0) <-> node(waitStatus=0) -> null
   so pred points to head(waitStatus=0) and node points to the first thread node, node(waitStatus=0).
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // waitStatus is the wait state of a node; a newly created thread node has waitStatus 0.
    int ws = pred.waitStatus;
    // ① If pred's waitStatus is Node.SIGNAL (-1), then for pred(-1)<->node(ws=0) the predecessor
    // has already been asked to wake its successor when it releases. node can therefore park
    // safely: the method returns true and node blocks in parkAndCheckInterrupt.
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    /* ② If waitStatus > 0, that is waitStatus == Node.CANCELLED == 1, the pred thread node has
       cancelled. Consider this queue:
           head(ws=-1)<->node1(ws=-1)<->node2(ws=1)<->node3(ws=0)-> null
       With pred=node2 and node=node3, node2 has cancelled, so node3 must not stay behind it.
       node2 is skipped via node.prev = pred = pred.prev, which gives:
           head(ws=-1)<->node1(ws=-1)<->node3(ws=0)-> null
       If more nodes than node2 are cancelled, the loop keeps walking towards the head until it
       finds a predecessor that is not cancelled (a reliable node) and makes it node3's predecessor.
    */
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            // The original one-liner node.prev = pred = pred.prev is split into two statements here.
            pred = pred.prev;
            node.prev = pred;
        } while (pred.waitStatus > 0);
        // After the walk every cancelled node in between has been bypassed, so pred is now a
        // reliable predecessor and node is simply linked behind it.
        pred.next = node;
    } else {
        /* Note this comment, it is very important:
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        // ③ Reaching this point means pred's waitStatus is either 0 or PROPAGATE. pred's
        // waitStatus is set to SIGNAL to say: "as your successor I need to be woken", but the
        // node does not park yet.
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true;
Once shouldParkAfterFailedAcquire returns true, the thread node can park itself:
/**
 * Convenience method to park and then check if interrupted
 *
 * @return {@code true} if interrupted
 */
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
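The two ways `park` can return (a pending permit from `unpark`, or a pending interrupt) and the flag-clearing behaviour of `Thread.interrupted()` can be seen in a single-threaded sketch. `ParkDemo` and its methods are hypothetical names; the blocker object passed to `park` is arbitrary.

```java
import java.util.concurrent.locks.LockSupport;

class ParkDemo {
    // Same shape as AQS.parkAndCheckInterrupt, using the class object as blocker.
    static boolean parkAndCheckInterrupt() {
        LockSupport.park(ParkDemo.class);
        return Thread.interrupted(); // reports AND clears the interrupt flag
    }

    // Interrupt pending: park returns immediately and the method reports true.
    static boolean[] interruptCase() {
        Thread.currentThread().interrupt();
        boolean interrupted = parkAndCheckInterrupt();
        boolean flagStillSet = Thread.currentThread().isInterrupted();
        return new boolean[] { interrupted, flagStillSet };
    }

    // Permit pending: unpark before park makes park return immediately, no interrupt.
    static boolean permitCase() {
        LockSupport.unpark(Thread.currentThread());
        return parkAndCheckInterrupt();
    }

    public static void main(String[] args) {
        boolean[] r = interruptCase();
        System.out.println("interrupted=" + r[0] + ", flag still set=" + r[1]);
        System.out.println("interrupted after unpark=" + permitCase());
    }
}
```

Because the flag is cleared here, acquireQueued remembers the interrupt in a local variable and `acquire` re-asserts it afterwards through `selfInterrupt()`.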
12. Back to acquire(1)
/**
 * Performs lock.  Try immediate barge, backing up to normal
 * acquire on failure.
 */
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
// Skip cancelled predecessors
/* ① Following the list structure assumed above: before node4 is marked CANCELLED, a
   non-CANCELLED predecessor must be found first, so that after node4 cancels, its successor
   node5 can be attached to a reliable predecessor pred. Here that pred is node1:
       head<->node1(not cancelled)<->node4(not cancelled)<->node5
   node2 and node3 have already cancelled, so they are skipped: Skip cancelled predecessors
*/
Node pred = node.prev;
while (pred.waitStatus > 0)
    node.prev = pred = pred.prev;

// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
// When nothing was skipped: pred.next == node and pred == node.prev
Node predNext = pred.next;

// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
/* ② With the cancelled predecessors skipped we have
       head<->node1(not cancelled)<->node4(not cancelled)<->node5
   and node4 can now be marked as cancelled:
       head<->node1(not cancelled)<->node4(CANCELLED)<->node5
   The benefit: other Nodes can skip past us. When other threads enqueue and run
   shouldParkAfterFailedAcquire and similar operations, they ignore node4 because it is already
   CANCELLED, so they do not interfere with cancelAcquire.
   acquireQueued and shouldParkAfterFailedAcquire determine when a thread node gets cancelled,
   while cancelAcquire determines under which conditions the node is actually marked CANCELLED.
*/
node.waitStatus = Node.CANCELLED;

/* ③ For the structure from ②, head<->node1(not cancelled)<->node4(CANCELLED)<->node5:
   if node4 is itself the tail (so node5 is actually null), tail is CASed back to pred and
   pred.next is CASed to null, leaving
       head<->node1(not cancelled)->null
   which is what the comment means by: If we are the tail, remove ourselves.
*/
if (node == tail && compareAndSetTail(node, pred)) {
    compareAndSetNext(pred, predNext, null);
} else {
    // If successor needs signal, try to set pred's next-link
    // so it will get one. Otherwise wake it up to propagate.
    int ws;
    /* ④ Using the structure from ③ where node4 is not the tail (node5 is a thread node):
           head<->node1(not cancelled)<->node4(CANCELLED)<->node5
       Condition 1: pred is not the head, pred still refers to a thread, and pred's waitStatus
       (node1.ws) is SIGNAL.
       Condition 2: pred is not the head, pred still refers to a thread, node1.ws <= 0, and the
       CAS setting node1.ws to SIGNAL succeeds.
       Condition 1 says: if node1.ws is already SIGNAL, node5 (non-null and not cancelled) is
       made node1's successor directly. That unlinks node4 correctly and attaches the
       following thread nodes behind node1:
           head<->node1(ws=SIGNAL)<->node5(not cancelled)
       Condition 2 says: if node1.ws is a non-cancelled value, first CAS it to SIGNAL, then
       attach node5 (again non-null and not cancelled) behind node1.
    */
    if (pred != head &&
        ((ws = pred.waitStatus) == Node.SIGNAL ||
         (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
        pred.thread != null) {
        Node next = node.next;
        // node5, the successor of the node being removed (node4), is linked behind node4's
        // predecessor pred only if it is non-null and not cancelled.
        if (next != null && next.waitStatus <= 0)
            compareAndSetNext(pred, predNext, next);
    } else {
        /* ⑤ If ④ does not hold, the typical case is that node4's predecessor is the head:
               head<->node4(CANCELLED)<->node5
           node4 is about to be unlinked, so node5 (if it is non-null and not cancelled) becomes
           the first thread node in the wait queue and is next in line for a lock released by
           another thread. Before node4 is unlinked its successor node5 must therefore be
           woken, which leads to the logic in section 4.6 below.
        */
        unparkSuccessor(node);
    }
/**
 * Wakes up node's successor, if one exists.
 *
 * @param node the node
 */
// Continuing from ⑤ in 4.5: if node4 has a successor, node4 must wake it
// (assuming the successor is neither null nor cancelled).
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    // Coming from the 4.5 flow, node4 is already CANCELLED, so the if below is skipped.
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    /* Is node4's successor always a normal thread node? node4.next can be any of:
       ① node4->null
       ② head<->node4(CANCELLED)<->node5(CANCELLED)<->node6(CANCELLED)<->node7(not cancelled)....
       ③ head<->node4(CANCELLED)<->node5(not cancelled)....
    */
    Node s = node.next; // Take node4's successor first.
    // Successor is null or cancelled, as in structures ① and ②.
    if (s == null || s.waitStatus > 0) {
        s = null;
        // Walk backwards from the tail of the wait queue and keep the non-cancelled node
        // closest to node4. That is the node node4 should wake (it makes no sense to wake a
        // successor that has cancelled or is null).
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // Case ③: node4's direct successor is a normal node, or "traverse backwards" found
    // the actual non-cancelled successor. Wake it.
    if (s != null)
        LockSupport.unpark(s.thread);
}
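This completes the locking path of ReentrantLock, lock.lock().
Why does unparkSuccessor walk backwards from the tail of the list, rather than forwards, to find the actual non-cancelled successor?
First, the comments at the top of the AQS source mention: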
The “prev” links (not used in original CLH locks), are mainly needed to handle cancellation. If a node is cancelled, its successor is (normally) relinked to a non-cancelled predecessor.
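One way to see why the scan runs over `prev` links: during enqueue, `prev` is set before the CAS on `tail` and `next` only afterwards, so `next` can still be null (or point at a cancelled node) while the `prev` chain is already complete. The sketch below mirrors the loop in `unparkSuccessor` on plain objects; `BackwardScanSketch`, its `Node` class and `findSuccessor` are hypothetical names, and the half-linked list is built by hand to simulate that window.

```java
class BackwardScanSketch {
    static final class Node {
        final String name;
        final int waitStatus; // > 0 means cancelled, as in AQS
        Node prev;
        Node next;
        Node(String name, int waitStatus) {
            this.name = name;
            this.waitStatus = waitStatus;
        }
    }

    // Mirrors the successor search in unparkSuccessor: use next if it is usable,
    // otherwise walk prev links from tail and keep the non-cancelled node closest to `node`.
    static Node findSuccessor(Node node, Node tail) {
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        return s;
    }

    public static void main(String[] args) {
        Node head = new Node("head", -1);
        Node n1 = new Node("n1", 1); // cancelled
        Node n2 = new Node("n2", 0); // waiting normally
        head.next = n1;
        n1.prev = head;
        n2.prev = n1;                // n1.next is left null: enqueue of n2 is "half done"
        // A forward walk from head would stop at n1.next == null and miss n2.
        System.out.println(findSuccessor(head, n2).name); // prints n2
    }
}
```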
/**
 * Queries whether any threads are waiting to acquire. Note that
 * because cancellations due to interrupts and timeouts may occur
 * at any time, a {@code true} return does not guarantee that any
 * other thread will ever acquire.
 *
 * <p>In this implementation, this operation returns in
 * constant time.
 *
 * @return {@code true} if there may be other threads waiting to acquire
 */
// Reports whether the queue contains thread nodes waiting to acquire the lock. A true result
// does not prove that such a node exists, because cancellation by interrupt or timeout can
// happen at any moment, so head != tail is only a hint.
public final boolean hasQueuedThreads() {
    return head != tail;
}

/**
 * Queries whether any threads have ever contended to acquire this
 * synchronizer; that is if an acquire method has ever blocked.
 *
 * <p>In this implementation, this operation returns in
 * constant time.
 *
 * @return {@code true} if there has ever been contention
 */
// Reports whether any thread has ever contended for this synchronizer, in other words whether
// an acquire method has ever blocked. Put simply: a non-null head means some thread node
// executed compareAndSetHead(new Node()) inside enq, so there was contention.
public final boolean hasContended() {
    return head != null;
}

/**
 * Returns the first (longest-waiting) thread in the queue, or
 * {@code null} if no threads are currently queued.
 *
 * <p>In this implementation, this operation normally returns in
 * constant time, but may iterate upon contention if other threads are
 * concurrently modifying the queue.
 *
 * @return the first (longest-waiting) thread in the queue, or
 *         {@code null} if no threads are currently queued
 */
// Returns the thread of the first (longest-waiting) thread node in the wait queue. Note that
// this is not the head node (the helper node). Returns null if no thread node is queued.
public final Thread getFirstQueuedThread() {
    // handle only fast path, else relay
    // Quick check first. head == tail means the wait queue either has not been created yet
    // (both null) or contains only the helper head node, so null is returned directly.
    // Otherwise fullGetFirstQueuedThread looks up the first thread node. The same
    // "optimistically try A, fall back to B" design appears in ConcurrentHashMap.addCount,
    // which first tries to add to baseCount and falls back to fullAddCount to make sure the
    // count is applied.
    return (head == tail) ? null : fullGetFirstQueuedThread();
}
/**
 * Version of getFirstQueuedThread called when fastpath fails
 */
private Thread fullGetFirstQueuedThread() {
    /*
     * The first node is normally head.next. Try to get its
     * thread field, ensuring consistent reads: If thread
     * field is nulled out or s.prev is no longer head, then
     * some other thread(s) concurrently performed setHead in
     * between some of our reads. We try this twice before
     * resorting to traversal.
     */
    // h: local copy of the head node; s: local copy of head's successor
    Node h, s;
    Thread st; // the successor's thread
    /*
      A neat piece of design. Normally the first thread node of the wait queue is exactly the
      "first queued thread" we want. To be sure that what we read really belongs to that node,
      the reads must be consistent. How can they become inconsistent?
      For example, s.thread may already have been nulled out, or s.prev may no longer be the
      head that was read a moment ago, because another thread performed setHead between two of
      our reads. How can a consistent read be obtained cheaply?
      Simply try twice:
          if ((first attempt) || (second attempt))
      First attempt:  head is non-null, its successor s is non-null, s.prev still points to
                      head, and the successor's thread reference is non-null.
      Second attempt: the same four checks again.
      If the first attempt succeeds, no other thread interfered, the read was consistent, and
      the thread of head's successor s is the first queued thread to return.
      If the first attempt fails, some thread was running setHead, so a second optimistic
      attempt is made.
      If both attempts fail, the only option left is to traverse the wait queue to find the
      first queued thread.
    */
    if (((h = head) != null && (s = h.next) != null &&
         s.prev == head && (st = s.thread) != null) ||
        ((h = head) != null && (s = h.next) != null &&
         s.prev == head && (st = s.thread) != null))
        return st;

    /*
     * Head's next field might not have been set yet, or may have
     * been unset after setHead. So we must check to see if tail
     * is actually first node. If not, we continue on, safely
     * traversing from tail back to head to find first,
     * guaranteeing termination.
     */
    /*
      head's next pointer may not have been set yet, or may have been unset after a setHead,
      so the list cannot be walked forwards from head through next pointers.
      Instead: if tail is non-null and tail is not head, the wait queue exists and contains
      thread nodes, so it can be traversed safely from the tail. The walk ends when t reaches
      head.
    */
    Node t = tail;
    Thread firstThread = null;
    while (t != null && t != head) {
        Thread tt = t.thread;
        // firstThread keeps being updated to the thread of each node passed on the way that
        // still has a thread, so nodes whose thread has been nulled out (for example
        // cancelled ones) are skipped.
        if (tt != null)
            firstThread = tt;
        t = t.prev;
    }
    return firstThread;
}
/**
 * Returns true if the given thread is currently queued.
 *
 * <p>This implementation traverses the queue to determine
 * presence of the given thread.
 *
 * @param thread the thread
 * @return {@code true} if the given thread is on the queue
 * @throws NullPointerException if the thread is null
 */
// Checks whether the given thread is in the wait queue.
public final boolean isQueued(Thread thread) {
    if (thread == null)
        throw new NullPointerException();
    // Traverse from the tail (safely traversing from tail back to head) and return
    // true as soon as a node's thread field equals the given thread.
    for (Node p = tail; p != null; p = p.prev)
        if (p.thread == thread)
            return true;
    return false;
}
// There are further queue-inspection methods for shared mode; they are outside the scope of this discussion.
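To make the two queue-inspection methods above concrete, here is a minimal sketch. The `Mutex` class and the `QueueInspectionDemo` wrapper are hypothetical names introduced only for this illustration; the AQS calls themselves (`acquire`, `release`, `isQueued`, `getFirstQueuedThread`, `hasQueuedThreads`) are the public methods of `AbstractQueuedSynchronizer`. The main thread holds the mutex, a second thread queues up behind it, and the main thread then inspects the queue.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class QueueInspectionDemo {
    /** Hypothetical minimal non-reentrant mutex: state 0 = free, 1 = held. */
    static final class Mutex extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int ignored) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }

    /** Returns {firstQueuedIsWaiter, waiterIsQueued, mainIsQueued, queuedAfterRelease}. */
    static boolean[] run() {
        Mutex mutex = new Mutex();
        mutex.acquire(1);                        // main thread holds the mutex
        Thread waiter = new Thread(() -> {
            mutex.acquire(1);                    // fails tryAcquire, gets enqueued and parks
            mutex.release(1);
        }, "waiter");
        waiter.start();
        while (!mutex.isQueued(waiter))          // wait until the waiter's node is on the queue
            Thread.yield();

        boolean firstIsWaiter = mutex.getFirstQueuedThread() == waiter;
        boolean waiterQueued = mutex.isQueued(waiter);
        boolean mainQueued = mutex.isQueued(Thread.currentThread());

        mutex.release(1);                        // wakes the waiter
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        boolean queuedAfterRelease = mutex.hasQueuedThreads();
        return new boolean[] { firstIsWaiter, waiterQueued, mainQueued, queuedAfterRelease };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```

The lock holder itself is never reported as queued: it is the owner, and the queue only contains threads still waiting.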
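/**
 * Releases in exclusive mode. Implemented by unblocking one or
 * more threads if {@link #tryRelease} returns true.
 * This method can be used to implement method {@link Lock#unlock}.
 *
 * @param arg the release argument. This value is conveyed to
 *        {@link #tryRelease} but is otherwise uninterpreted and
 *        can represent anything you like.
 * @return the value returned from {@link #tryRelease}
 */
public final boolean release(int arg) {
    // tryRelease is the method the Sync subclass implements: it subtracts arg from
    // state. No CAS is needed here, because only the thread that owns the lock may
    // release it, so a plain setState is enough.
    // (1) If tryRelease brings state down to 0 it returns true: the lock is now free,
    //     so the first thread node in the wait queue has to be woken up.
    // (2) If state is still non-zero after the update, tryRelease returns false: the
    //     same thread has re-entered the lock several times and still has holds left
    //     to release.
    if (tryRelease(arg)) {
        Node h = head;
        /*
         * Take the head (dummy) node and decide whether its successor must be woken.
         * (a) h == null: the wait queue has never been created (no thread has ever had
         *     to queue), so there is nobody to wake.
         * (b) h != null and h.waitStatus == 0: the head exists but no successor has
         *     marked it SIGNAL, for example when the queue has just been initialized
         *     and holds only the dummy head. Again there is nothing to wake.
         * (c) h != null and h.waitStatus != 0: the possible waitStatus values are
         *     CANCELLED (1), SIGNAL (-1), CONDITION (-2, used by condition queues) and
         *     PROPAGATE (-3, used only in shared mode). For an exclusive lock such as
         *     ReentrantLock the value seen here is normally SIGNAL (-1), which means
         *     the successor is waiting to be unparked, so unparkSuccessor is called.
         *     If that direct successor has itself been cancelled (its own waitStatus
         *     is 1), unparkSuccessor scans backwards from tail to find the actual
         *     non-cancelled successor and wakes that one instead.
         */
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}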
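The effect of `tryRelease` returning `false` until the hold count reaches zero can be observed from the outside with the public `ReentrantLock` API. The sketch below is only an illustration; `ReentrantReleaseDemo` is a hypothetical class name, while `getHoldCount`, `isLocked`, `lock` and `unlock` are existing `ReentrantLock` methods.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrantReleaseDemo {
    /** Returns a trace of hold counts and lock states while re-entering and releasing. */
    static String run() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();                              // re-entrant acquire: state goes 1 -> 2
        int holdsAfterTwoLocks = lock.getHoldCount();

        lock.unlock();                            // state 2 -> 1, tryRelease returns false
        int holdsAfterOneUnlock = lock.getHoldCount();
        boolean lockedAfterOneUnlock = lock.isLocked();

        lock.unlock();                            // state 1 -> 0, tryRelease returns true
        int holdsAfterTwoUnlocks = lock.getHoldCount();
        boolean lockedAfterTwoUnlocks = lock.isLocked();

        boolean extraUnlockRejected = false;
        try {
            lock.unlock();                        // caller is no longer the owner
        } catch (IllegalMonitorStateException e) {
            extraUnlockRejected = true;
        }
        return holdsAfterTwoLocks + "," + holdsAfterOneUnlock + "," + lockedAfterOneUnlock
                + "," + holdsAfterTwoUnlocks + "," + lockedAfterTwoUnlocks
                + "," + extraUnlockRejected;
    }

    public static void main(String[] args) {
        System.out.println(run());                // prints 2,1,true,0,false,true
    }
}
```

The last step matches the owner check in `tryRelease`: a thread that does not own the lock gets an `IllegalMonitorStateException`.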
/**
 * Convenience method to park and then check if interrupted
 *
 * @return {@code true} if interrupted
 */
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    // This is where a thread node inside acquireQueued blocks itself.
    // After it is woken up, execution continues with the next statement, which
    // returns whether the thread was interrupted (and clears the flag).
    return Thread.interrupted();
}
// About Thread.interrupted():
/**
 * Tests whether the current thread has been interrupted. The
 * <i>interrupted status</i> of the thread is cleared by this method. In
 * other words, if this method were to be called twice in succession, the
 * second call would return false (unless the current thread were
 * interrupted again, after the first call had cleared its interrupted
 * status and before the second call had examined it).
 *
 * <p>A thread interruption ignored because a thread was not alive
 * at the time of the interrupt will be reflected by this method
 * returning false.
 *
 * @return <code>true</code> if the current thread has been interrupted;
 *         <code>false</code> otherwise.
 * @see #isInterrupted()
 * @revised 6.0
 */
// Returns whether the current thread has been interrupted, and clears the interrupt
// flag. If the thread was interrupted, the first call returns true; a second call
// right after it returns false, unless the thread is interrupted again in between
// ("after the first call had cleared its interrupted status and before the second
// call had examined it").
public static boolean interrupted() {
    return currentThread().isInterrupted(true);
}
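The difference between the clearing check `Thread.interrupted()` and the non-clearing check `isInterrupted()` is easy to verify. In this small sketch `InterruptedFlagDemo` is a hypothetical class name; everything else is standard `java.lang.Thread` API.

```java
class InterruptedFlagDemo {
    /** Returns {isInterrupted(), first interrupted(), second interrupted()}. */
    static boolean[] run() {
        Thread.currentThread().interrupt();                         // set the interrupt flag

        boolean peek = Thread.currentThread().isInterrupted();      // reads the flag, keeps it
        boolean first = Thread.interrupted();                       // reads the flag, clears it
        boolean second = Thread.interrupted();                      // flag already cleared
        return new boolean[] { peek, first, second };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));       // prints [true, true, false]
    }
}
```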
/**
 * Performs lock. Try immediate barge, backing up to normal
 * acquire on failure.
 */
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
/**
 * Acquires in exclusive mode, ignoring interrupts. Implemented
 * by invoking at least once {@link #tryAcquire},
 * returning on success. Otherwise the thread is queued, possibly
 * repeatedly blocking and unblocking, invoking {@link
 * #tryAcquire} until success. This method can be used
 * to implement method {@link Lock#lock}.
 *
 * @param arg the acquire argument. This value is conveyed to
 *        {@link #tryAcquire} but is otherwise uninterpreted and
 *        can represent anything you like.
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
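(2) Doug Lea designed acquireQueued as follows: "Acquires in exclusive uninterruptible mode for thread already in queue". A thread node that is already in the wait queue waits for the lock in exclusive mode and does not respond to interrupts while waiting. If the thread is interrupted while it is parked, parkAndCheckInterrupt notices this through Thread.interrupted(), which also clears the flag, and acquireQueued returns true once the lock has finally been acquired. In that case acquire calls selfInterrupt() to set the interrupt flag again, so that any later code that checks this thread's interrupt status can see that it was interrupted while waiting. If no interrupt occurred during the wait, acquireQueued returns false and selfInterrupt() is not called.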
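The behaviour described in (2) can be checked with a plain `ReentrantLock`. The sketch below is an illustration under the assumption that the waiter is interrupted while it is queued; `SelfInterruptDemo` is a hypothetical class name, and `hasQueuedThread` is the public `ReentrantLock` method used here to wait until the waiter has been enqueued.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class SelfInterruptDemo {
    /** Returns the waiter's interrupt flag as observed right after lock() returned. */
    static boolean run() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean flagAfterAcquire = new AtomicBoolean();

        lock.lock();                                   // main thread holds the lock
        Thread waiter = new Thread(() -> {
            lock.lock();                               // uninterruptible: no exception is thrown
            try {
                flagAfterAcquire.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        }, "waiter");
        waiter.start();

        while (!lock.hasQueuedThread(waiter))          // wait until the waiter is in the queue
            Thread.yield();
        waiter.interrupt();                            // interrupt it while it is still waiting
        lock.unlock();                                 // now let it acquire the lock

        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return flagAfterAcquire.get();
    }

    public static void main(String[] args) {
        System.out.println(run());                     // prints true
    }
}
```

The waiter keeps waiting despite the interrupt, and the flag is visible again once `lock()` returns. Code that needs to abort the wait instead would use `lockInterruptibly()`.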
/**
 * Creates an instance of {@code ReentrantLock} with the
 * given fairness policy.
 *
 * @param fair {@code true} if this lock should use a fair ordering policy
 */
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
    /**
     * Fair version of tryAcquire. Don't grant access unless
     * recursive call or no waiters or is first.
     */
    // The fair-lock design shows up in the following method:
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // If state is 0, the lock is free and the thread may compete for it.
        if (c == 0) {
            /*
             * (1) Before requesting the exclusive lock, first ask whether any thread is
             * already waiting in the queue. This is the fairness rule: a latecomer has
             * to wait behind those who arrived earlier. If there are queued thread
             * nodes, the newly arrived thread may not set state and falls through to
             * (3) (assuming it is not a re-entrant acquire), returns false, and will
             * normally end up enqueuing itself via acquireQueued.
             */
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // (2) If the current thread already holds the lock, it may re-enter as often
        //     as it likes, as long as the hold count does not overflow.
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        // (3)
        return false;
    }
}
/**
 * Queries whether any threads have been waiting to acquire longer than the current thread.
 *
 * An invocation of this method is equivalent to (but may be more efficient than):
 *
 *     getFirstQueuedThread() != Thread.currentThread() && hasQueuedThreads()
 *
 * In other words: if the queue is not empty and its first thread node is not the
 * current thread, the method returns true and the current thread must line up behind
 * the queued nodes instead of grabbing the lock. If the queue is empty, it returns
 * false and the current thread may try to acquire immediately.
 *
 * Note that because cancellations due to interrupts and timeouts may occur at any
 * time, a {@code true} return does not guarantee that some other thread will acquire
 * before the current thread. Likewise, it is possible for another thread to win a
 * race to enqueue after this method has returned {@code false}, due to the queue
 * being empty.
 *
 * This method is designed to be used by a fair synchronizer to avoid barging (a newly
 * arriving thread jumping ahead of threads that are already queued). Such a
 * synchronizer's tryAcquire method should return false, and its tryAcquireShared
 * method should return a negative value, if this method returns true (unless this is
 * a reentrant acquire).
 *
 * For example, the tryAcquire method for a fair, reentrant, exclusive mode
 * synchronizer might look like this:
 *
 *     protected boolean tryAcquire(int arg) {
 *         if (isHeldExclusively()) {
 *             // A reentrant acquire; increment hold count
 *             // (the same thread re-enters, so only state needs to be increased)
 *             return true;
 *         } else if (hasQueuedPredecessors()) {
 *             // Threads are queued ahead of the current thread, so it cannot
 *             // acquire right now and has to wait for them
 *             return false;
 *         } else {
 *             // try to acquire normally
 *             // (nobody is queued ahead, so the current thread may compete directly)
 *         }
 *     }
 *
 * This is the same flow that FairSync.tryAcquire in ReentrantLock follows.
 * (1) The method returns true if some queued thread precedes the current thread. With
 *     FairSync, a newly arrived thread calling tryAcquire must check whether threads
 *     are waiting in the queue; if so, it cannot take the lock immediately and has to
 *     wait. This is the fairness rule: first come, first served, so the thread that
 *     has waited longest gets the lock first.
 * (2) The method returns false if the current thread is itself the first thread node
 *     or the queue contains no thread nodes.
 *
 * Returns: true if there is a queued thread preceding the current thread, and false
 * if the current thread is at the head of the queue or the queue is empty
 */
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    // Conditions for returning true (in both cases h != t, i.e. the queue holds more
    // than the dummy head):
    // Case 1: head.next is still null. Another thread is in the middle of enqueuing:
    //         it has already become tail but has not yet been linked as head.next.
    // Case 2: head.next exists and its thread is not the current thread, so some
    //         other thread is first in line.
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
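As a rough illustration of the fair `tryAcquire` template described in the Javadoc above, the following sketch builds a small non-reentrant fair mutex directly on AQS. `FairMutex` and `FairMutexDemo` are hypothetical names introduced for this example, and the mutex deliberately omits the re-entrant branch that `ReentrantLock.FairSync` has.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class FairMutexDemo {
    /** Hypothetical fair, non-reentrant mutex: state 0 = free, 1 = held. */
    static final class FairMutex extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            if (hasQueuedPredecessors())
                return false;                     // someone is ahead of us: do not barge
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int ignored) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }

    /** Returns hasQueuedPredecessors() seen by main: {emptyQueue, waiterQueued, afterDrain}. */
    static boolean[] run() {
        FairMutex mutex = new FairMutex();
        boolean emptyQueue = mutex.hasQueuedPredecessors();     // nobody has queued yet

        mutex.acquire(1);                                       // main thread holds the mutex
        Thread waiter = new Thread(() -> {
            mutex.acquire(1);                                   // enqueued behind the holder
            mutex.release(1);
        }, "waiter");
        waiter.start();
        while (!mutex.isQueued(waiter))
            Thread.yield();
        boolean waiterQueued = mutex.hasQueuedPredecessors();   // the waiter is ahead of main

        mutex.release(1);
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        boolean afterDrain = mutex.hasQueuedPredecessors();     // queue has drained again
        return new boolean[] { emptyQueue, waiterQueued, afterDrain };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));   // prints [false, true, false]
    }
}
```

While the waiter is queued, a `tryAcquire` call from any other thread returns `false` even if the state happens to be 0 at that moment, which is exactly what prevents barging.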
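8. At this point we can summarize the difference between the fair and the non-fair lock
Non-fair: the thread competes for the lock by a direct CAS on state, without asking whether any thread nodes are already waiting in the queue.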
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
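Fair: before the CAS on state, the thread first calls hasQueuedPredecessors() to ask whether thread nodes are already waiting in the queue.
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    // ...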
If a thread is not granted CPU time because other threads grab it all, it is called “starvation”. The thread is “starved to death” because other threads are allowed the CPU time instead of it. The solution to starvation is called “fairness” - that all threads are fairly granted a chance to execute.