
Java高级主题:基于AQS驱动的ReentrantLock公平锁和非公平锁实现原理解析

本文是入门和理解AQS框架的重要文章,尽管AQS还有共享模式以及条件Condition等设计,但重入锁仍然是最适合理解AQS底层数据结构及其算法设计的切入点。

单线程使用可重入锁的内部简单工作机制

分别在以下两个断点位置进行debug,断点条件i==5,并且在variables窗口watch一个特殊的变量state

import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockDemo {
public static ReentrantLock lock=new ReentrantLock();
public static void main(String[] args) {
for (int i = 1; i <= 5; i++) {
lock.lock(); // 断点位置
}
for (int i = 1; i <= 5; i++) {
lock.unlock();// 断点位置
}
}
}

可以看到lock.lock()的内部执行流程如下所示,

// 1、java.util.concurrent.locks.ReentrantLock$NonfairSync@29444d75[State = 4, empty queue]
public void lock() {
sync.lock();
}

// 2、java.util.concurrent.locks.ReentrantLock$NonfairSync(ReentrantLock内部的非公平同步器)
final void lock() {
if (compareAndSetState(0, 1)) // 重点:使用cas更新state的值
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

// 3、java.util.concurrent.locks.AbstractQueuedSynchronizer
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

// 4、java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 调用下面5在ReentrantLock定义的tryAcquire方法
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

// 5、java.util.concurrent.locks.ReentrantLock$NonfairSync
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

// 6、java.util.concurrent.locks.ReentrantLock$Sync
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();// 重点:获取state变量当前值
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc); // 重点:更新state变量值
return true;
}
return false;
}

经过这么一轮debug,可以观察到lock.lock()是通过更改state变量值来实现"可重入性"的:例如这里for循环5次,使得该线程在同一锁对象上加锁了5次,对应的state累加计数等于5。基于此可以推出,lock.unlock()操作则是每次对state减1(此时只有持有锁的线程会修改state,所以源码里用的是普通的setState写回而非CAS),如下:


//1、java.util.concurrent.locks.ReentrantLock
public void unlock() {
sync.release(1);
}

// 2、java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

//3、java.util.concurrent.locks.ReentrantLock$Sync
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 当i=5次循环时,这里getState=1,releases=1
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 所有的重入都退出后,此刻不再有线程持有独占锁。
free = true;
setExclusiveOwnerThread(null);
}
setState(c); // 当i=5次循环时,state被置为0
return free;
}
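结合上面lock/unlock对state的加减,其实不借助debug也可以粗略观察这个计数:ReentrantLock对外暴露的getHoldCount()返回的正是持有线程的重入次数(与内部state一致)。下面是一个简单的观察示例(类名HoldCountDemo为示意命名,并非JDK代码):

import java.util.concurrent.locks.ReentrantLock;

public class HoldCountDemo {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        for (int i = 1; i <= 5; i++) {
            lock.lock();
            // getHoldCount()返回当前线程在该锁上的重入次数,对应内部state的累加
            System.out.println("第" + i + "次lock后 holdCount = " + lock.getHoldCount());
        }
        for (int i = 1; i <= 5; i++) {
            lock.unlock();
            // 每次unlock对应state减1
            System.out.println("第" + i + "次unlock后 holdCount = " + lock.getHoldCount());
        }
    }
}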

其实这个state变量是一个非常重要的“同步状态”,记录了当前持有独占锁的线程的重入加锁次数,它在AbstractQueuedSynchronizer内部定义:


/**
* The synchronization state.
*/
private volatile int state;

从ReentrantLock以上两个流程来看,重点需要解析的是sync.lock()和sync.release(1),而这里的sync实例来自ReentrantLock内部定义的Sync类,该类继承自AbstractQueuedSynchronizer。因此如果要真正理解ReentrantLock的可重入性,则需要深入底层的AbstractQueuedSynchronizer,这是JUC众多锁工具的底层实现。

从Sync类了解AQS

在真正解析AQS之前,可以先看看在ReentrantLock内部定义的Sync,以下按ReentrantLock默认构造器进入分析流程

public ReentrantLock() {
sync = new NonfairSync();
}

NonfairSync类实现了非公平锁的主要逻辑

/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

/**
* 这里是说:新来的线程可以直接去争抢锁资源而无需排队,也即不是先到先得,所以才称为非公平模式;抢不到才去阻塞队列里面排队
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

Sync类定义了获取锁的分配逻辑,这里涉及到对同步状态state更改、同一线程的锁重入、锁重入释放


/**

* Base of synchronization control for this lock. Subclassed
* into fair and nonfair versions below. Uses AQS state to
* represent the number of holds on the lock.
*/

abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;

/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();

/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}

final ConditionObject newCondition() {
return new ConditionObject();
}

// Methods relayed from outer class

final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}

final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}

final boolean isLocked() {
return getState() != 0;
}

/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}

1、AQS工作原理

源码给出的官方设计思路:

Even though this class is based on an internal FIFO queue, it does not automatically enforce FIFO acquisition policies. The core of exclusive synchronization takes the form:
Acquire:
while (!tryAcquire(arg)) {
enqueue thread if it is not already queued;
possibly block current thread;
}

Release:
if (tryRelease(arg))
unblock the first queued thread;

(Shared mode is similar but may involve cascading signals.)

等待队列(又称阻塞队列、阻塞链表)内部工作原理:建议在深入掌握AQS设计之前先预读几遍下面这段说明,在掌握AQS源代码设计之后再回头理解它,你会发现能完全掌握AQS的整体设计。

Wait queue node class.
The wait queue is a variant of a "CLH" (Craig, Landin, and Hagersten) lock queue. CLH locks are normally used for spinlocks. We instead use them for blocking synchronizers, but use the same basic tactic of holding some of the control information about a thread in the predecessor of its node. A "status" field in each node keeps track of whether a thread should block. A node is signalled when its predecessor releases. Each node of the queue otherwise serves as a specific-notification-style monitor holding a single waiting thread. The status field does NOT control whether threads are granted locks etc though. A thread may try to acquire if it is first in the queue. But being first does not guarantee success; it only gives the right to contend. So the currently released contender thread may need to rewait.
To enqueue into a CLH lock, you atomically splice it in as new tail. To dequeue, you just set the head field.
      +------+  prev  +-----+        +-----+
 head |      |  <---- |     |  <---- |     |  tail
      +------+        +-----+        +-----+

Insertion into a CLH queue requires only a single atomic operation on "tail", so there is a simple atomic point of demarcation from unqueued to queued. Similarly, dequeuing involves only updating the "head". However, it takes a bit more work for nodes to determine who their successors are, in part to deal with possible cancellation due to timeouts and interrupts.
The "prev" links (not used in original CLH locks), are mainly needed to handle cancellation. If a node is cancelled, its successor is (normally) relinked to a non-cancelled predecessor. For explanation of similar mechanics in the case of spin locks, see the papers by Scott and Scherer at http://www.cs.rochester.edu/u/scott/synchronization/
We also use "next" links to implement blocking mechanics. The thread id for each node is kept in its own node, so a predecessor signals the next node to wake up by traversing next link to determine which thread it is. Determination of successor must avoid races with newly queued nodes to set the "next" fields of their predecessors. This is solved when necessary by checking backwards from the atomically updated "tail" when a node's successor appears to be null. (Or, said differently, the next-links are an optimization so that we don't usually need a backward scan.)
Cancellation introduces some conservatism to the basic algorithms. Since we must poll for cancellation of other nodes, we can miss noticing whether a cancelled node is ahead or behind us. This is dealt with by always unparking successors upon cancellation, allowing them to stabilize on a new predecessor, unless we can identify an uncancelled predecessor who will carry this responsibility.
CLH queues need a dummy header node to get started. But we don't create them on construction, because it would be wasted effort if there is never contention. Instead, the node is constructed and head and tail pointers are set upon first contention.

Threads waiting on Conditions use the same nodes, but use an additional link. Conditions only need to link nodes in simple (non-concurrent) linked queues because they are only accessed when exclusively held. Upon await, a node is inserted into a condition queue. Upon signal, the node is transferred to the main queue. A special value of status field is used to mark which queue a node is on.
Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill Scherer and Michael Scott, along with members of JSR-166 expert group, for helpful ideas, discussions, and critiques on the design of this class.

这里当然最权威的方式是解析AQS源代码的开发注释

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state. Subclasses must define the protected methods that change this state, and which define what that state means in terms of this object being acquired or released. Given these, the other methods in this class carry out all queuing and blocking mechanics. Subclasses can maintain other state fields, but only the atomically updated int value manipulated using methods getState, setState and compareAndSetState is tracked with respect to synchronization.

AQS为众多同步器(例如Semaphore、CountDownLatch、ReentrantLock等)提供了一个实现框架,该框架的底层基于FIFO等待队列的一个变体(注意:此等待队列又称为阻塞队列)实现。大部分同步器只需通过原子地更改一个名为state的int变量即可完成相关同步状态控制。AQS的子类(也即自行设计的同步器)必须重写相关的protected方法来更新state状态变量,使state值的变化能够表征锁的获取或释放,AQS的其他方法则实现了排队和阻塞机制。AQS的子类可以维护其他状态字段,但只有使用getState、setState和compareAndSetState方法原子更新的这个state值才会被用于同步状态的跟踪。
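为了直观体会"子类只需重写protected方法并操作state,排队与阻塞交给AQS"这句话,下面给出一个参考AQS官方注释中Mutex思路的极简不可重入互斥锁草图(SimpleMutex为示意命名,仅用于理解,不是JDK源码):

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleMutex {
    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            // state从0 CAS到1才算拿到锁;入队、阻塞、唤醒全部由AQS完成
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (getState() == 0) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0); // 独占模式下由持有线程直接写回即可
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    public void lock()   { sync.acquire(1); }
    public void unlock() { sync.release(1); }
}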

上面所说的FIFO等待队列其实就是CLH队列(由Craig、Landin、Hagersten三人提出的数据结构,因此以他们的名字命名),它在AQS内部使用双向链表+CAS原子操作实现,功能如下:

  • FIFO的设计,保证线程在阻塞队列中的公平性,也即先进去阻塞队列等待抢锁资源的线程,也将是最先被唤醒出队

  • 未成功拿到独占锁的线程将通过自旋和CAS把自己插入到队尾,这是非阻塞设计,能够在短时间内实现这些线程的无锁并发插入(下面给出一个简化的入队示意)。
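以下是一个高度简化的"自旋 + CAS追加到队尾"示意(仅演示无锁入队的基本套路,并非AQS源码,类名与字段均为示意):

import java.util.concurrent.atomic.AtomicReference;

class SimpleCasQueue {
    static class Node {
        final Thread thread;
        volatile Node prev;
        Node(Thread thread) { this.thread = thread; }
    }

    private final AtomicReference<Node> tail = new AtomicReference<>();

    Node enqueue(Thread t) {
        Node node = new Node(t);
        for (;;) {                        // 自旋直到CAS成功
            Node oldTail = tail.get();
            node.prev = oldTail;          // 先挂好prev指针
            if (tail.compareAndSet(oldTail, node)) {
                return node;              // CAS成功,node成为新的队尾
            }
        }
    }
}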

2、从构造方法解析

public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

private static final long serialVersionUID = 7373984972572414691L;

/**
* Creates a new {@code AbstractQueuedSynchronizer} instance
* with initial synchronization state of zero.
*/
protected AbstractQueuedSynchronizer() { }

/**
* Wait queue node class.
*
# 这里就解释了CLH阻塞队列的设计细节
* <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
* Hagersten) lock queue. CLH locks are normally used for
* spinlocks. We instead use them for blocking synchronizers, but
* use the same basic tactic of holding some of the control
* information about a thread in the predecessor of its node. A
* "status" field in each node keeps track of whether a thread
* should block. A node is signalled when its predecessor
* releases. Each node of the queue otherwise serves as a
* specific-notification-style monitor holding a single waiting
* thread. The status field does NOT control whether threads are
* granted locks etc though. A thread may try to acquire if it is
* first in the queue. But being first does not guarantee success;
* it only gives the right to contend. So the currently released
* contender thread may need to rewait.
*
* <p>To enqueue into a CLH lock, you atomically splice it in as new
* tail. To dequeue, you just set the head field.
* <pre>
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
* </pre>
*
* <p>Insertion into a CLH queue requires only a single atomic
* operation on "tail", so there is a simple atomic point of
* demarcation from unqueued to queued. Similarly, dequeuing
* involves only updating the "head". However, it takes a bit
* more work for nodes to determine who their successors are,
* in part to deal with possible cancellation due to timeouts
* and interrupts.
*
* <p>The "prev" links (not used in original CLH locks), are mainly
* needed to handle cancellation. If a node is cancelled, its
* successor is (normally) relinked to a non-cancelled
* predecessor. For explanation of similar mechanics in the case
* of spin locks, see the papers by Scott and Scherer at
* http://www.cs.rochester.edu/u/scott/synchronization/
*
* <p>We also use "next" links to implement blocking mechanics.
* The thread id for each node is kept in its own node, so a
* predecessor signals the next node to wake up by traversing
* next link to determine which thread it is. Determination of
* successor must avoid races with newly queued nodes to set
* the "next" fields of their predecessors. This is solved
* when necessary by checking backwards from the atomically
* updated "tail" when a node's successor appears to be null.
* (Or, said differently, the next-links are an optimization
* so that we don't usually need a backward scan.)
*
* <p>Cancellation introduces some conservatism to the basic
* algorithms. Since we must poll for cancellation of other
* nodes, we can miss noticing whether a cancelled node is
* ahead or behind us. This is dealt with by always unparking
* successors upon cancellation, allowing them to stabilize on
* a new predecessor, unless we can identify an uncancelled
* predecessor who will carry this responsibility.
*
* <p>CLH queues need a dummy header node to get started. But
* we don't create them on construction, because it would be wasted
* effort if there is never contention. Instead, the node
* is constructed and head and tail pointers are set upon first
* contention.
*
* <p>Threads waiting on Conditions use the same nodes, but
* use an additional link. Conditions only need to link nodes
* in simple (non-concurrent) linked queues because they are
* only accessed when exclusively held. Upon await, a node is
* inserted into a condition queue. Upon signal, the node is
* transferred to the main queue. A special value of status
* field is used to mark which queue a node is on.
*
* <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
* Scherer and Michael Scott, along with members of JSR-166
* expert group, for helpful ideas, discussions, and critiques
* on the design of this class.
*/
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
// SHARED变量表示节点被标记为共享模式,可以看到是一个空的Node()对象,用于Semaphore这种共享模式的加锁工具
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
// EXCLUSIVE为null表示节点被标记为独占模式,用于类似ReentrantLock这种独占锁工具
static final Node EXCLUSIVE = null;

/** waitStatus value to indicate thread has cancelled */
// waitStatus变量值若为1,表示当前线程节点已经被取消排队(注意虽然被标记为取消状态,但还未出队)
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
// waitStatus变量值若为-1,表示当前线程节点的后驱线程节点需要被唤醒
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
// waitStatus变量值若为-2,表示当前线程节点处在条件等待当中
static final int CONDITION = -2;

// 用于共享模式的锁工具的连续唤醒操作设计,ReentrantLock用不上此设计,Semaphore这种工具可以用上。
static final int PROPAGATE = -3;

/**
* 以下是关于waitStatus变量取不同值时对应的场景说明
* waitStatus用来表示当前线程节点的状态,仅取以下5个值
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* SIGNAL的说明:当前节点的后驱节点已经(或即将)被park阻塞,因此当前节点在释放资源或被取消时,必须唤醒(unpark)它的后驱节点。为了避免无谓的线程竞争,实现acquire功能的方法必须先表明自己需要被signal,然后重试原子acquire,如果仍然失败,才阻塞自己(而不是一直去CAS重试)
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CANCELLED的说明:由于超时或中断,节点被取消。此类节点不会再脱离取消状态。取消节点对应的线程也不会再次阻塞。
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* CONDITION的说明:节点当前位于“condition queue”,在被放到CLH队列之前,它不会用作同步队列节点,此时状态将设置为0。表示节点在等待队列上,当其他线程调用了Condition的signal方法后,CONDITION状态的节点将从等待队列转移到同步队列中,等待获取资源。
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* PROPAGATE的说明:应将releaseShared操作传播到其他节点,这是在doReleaseShared中设置的(仅针对头部节点),以确保releaseShared操作能够传播继续进行,即使其他操作已经介入。
* 换句话说:在共享模式下,前驱节点线程节点不仅要唤醒其后驱线程节点,同时也会唤醒后驱线程节点的后驱线程节点,类似一路唤醒下去。
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
* waitStatus采用这几个值得设计是为了方便使用,非负值表示节点不需要signal,所以,大多数代码不需要检查特定的值,只需要检查符号即可。
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
* waitStatus状态值,对于正常同步节点,它会被初始化为0,而对于condition nodes,则会被置为“CONDITION”,一般使用CAS去更新waitStatus的值。
*/
volatile int waitStatus;

/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
/*
结合前面的prev指针用于处理线程被取消的情形。
The "prev" links (not used in original CLH locks), are mainly
needed to handle cancellation. If a node is cancelled, its
successor is (normally) relinked to a non-cancelled
predecessor.
non-cancelled node -> cancelled node -> successor
non-cancelled node -> successor
*/

volatile Node prev;

/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
/*
next指针被设计为"We also use "next" links to implement blocking mechanics"。已经被取消的节点会将next指针指向自己而不是指向null,这是为了简化isOnSyncQueue的判断;这一点很像SkipList里面remove的设计逻辑,同时也有利于GC。
*/
volatile Node next;

/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
/*
用于指向获取锁的线程,可以看到其实Node节点就是包装需要获取锁的线程,因此也可以称为线程节点。
*/
volatile Thread thread;

/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/

/*
nextWaiter是一个复用字段:Node在CLH队列中时,nextWaiter用作共享/独占模式的标记(共享模式为SHARED,独占模式为EXCLUSIVE也即null);Node在条件队列中时,nextWaiter则表示指向下一个等待节点的指针。对于单纯的lock/unlock流程,ReentrantLock用不到条件队列,只有使用newCondition()时才会用到;CountDownLatch、Semaphore等共享模式工具则会用到SHARED标记。
*/
Node nextWaiter;

/**
* Returns true if node is waiting in shared mode.
判断线程节点是否处于共享模式
*/
final boolean isShared() {
return nextWaiter == SHARED;
}

/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}

// 没有成功获取资源(例如锁)的线程会被包装为一个Node,此构造方法由addWaiter使用,也即用于将节点入队(此队列称为wait queue)
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

其他属性


/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/

// 阻塞队列(双向链表)的头结点,惰性创建,除初始化外仅能通过setHead方法调整head的指向
private transient volatile Node head;

/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
// 阻塞队列(双向链表)的尾结点,惰性创建,仅能通过enq方法添加新的等待节点来调整tail的指向
private transient volatile Node tail;


/**
* The synchronization state.
*/
// 这个state变量就是AQS子类需要去更改的同步状态,用于控制资源
private volatile int state;


/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState() {
return state;
}

3、线程是如何被"Queuing"入队的

AQS本身并不会主动触发线程入队的操作,那么是在什么时机才会发生入队?这里不妨还是以ReentrantLock的使用为例,先看基本的demo用法,如下

public static ReentrantLock lock=new ReentrantLock();

public static void main(String[] args) {
lock.lock();
try{
// 创建新线程执行任务
} catch ( Exception e){
}finally {
lock.unlock();
}
}

从构造器可以看出,默认创建的是非公平的同步锁,具体逻辑由NonfairSync类实现

public ReentrantLock() {
sync = new NonfairSync();
}
3.1 NonfairSync

Sync object for non-fair locks,也即NonfairSync,如下:

static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
// 线程执行lock.lock()也即对应以下逻辑
final void lock() {
// 这个动词短语immediate barge用得很贴切:线程可以立即"乱闯",也即新线程使用lock.lock()时,可以不排队直接参加竞争资源,如何竞争? 只要将state状态CAS更新为1则抢占成功,这就是所谓的非公平锁。
if (compareAndSetState(0, 1))
//该线程竞争成功后,将自己设为独占线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 新来的线程compareAndSetState(0, 1)失败,那么进入acquire(1)逻辑
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
3.2 acquire(1)

acquire(1) 是AQS内部的方法,如下:


/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
/*
从该方法可以看出:线程在NonfairSync里面抢占更新state失败后,还有机会通过下面的tryAcquire再次申请资源;
如果tryAcquire成功就直接返回;如果仍然失败,线程就会被排队:acquireQueued(addWaiter(Node.EXCLUSIVE), arg),
并且只有当线程在排队等待期间被中断过(acquireQueued返回true)时,才会执行selfInterrupt()
*/
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

由于AQS的tryAcquire需要子类实现具体的逻辑,也即ReentrantLock内部的tryAcquire,如下:

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
3.3 nonfairTryAcquire

ReentrantLock内部的tryAcquire内部是由ReentrantLock内部的Sync类的nonfairTryAcquire方法实现,如下:


/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair versions below. Uses AQS state to
* represent the number of holds on the lock.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;

/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
// NonfairSync类的lock()方法实现了该抽象方法
abstract void lock();

/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
// 这里就是AQS的tryAcquire在子类Sync的具体实现逻辑
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取同步状态state值
int c = getState();
//①如果同步状态此时为0,说明此刻没有线程持有锁,那么当前线程果断使用CAS尝试将state从0更新为1。这里也再次证明:新线程使用lock.lock()时第一次if (compareAndSetState(0, 1))不成功,在tryAcquire里面还有机会再尝试一次compareAndSetState,如果本次CAS成功,就将自己设为独占线程,获得锁资源,返回true。
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
/*
② 如果当前线程就是之前设置的独占线程,说明当前线程再次"重入"获取锁,只要重入次数不溢出,就可以再次"重入"并更新同步状态state的值,这就是ReentrantLock可重入锁设计的原理。最多可以重入多少次?state是int类型,因此最大值为:
max=(1<<31)-1,也即Integer.MAX_VALUE
同一线程反复lock.lock(),内部就是对state不断累加(setState(nextc),持有锁时无需CAS),一旦累加溢出(nextc < 0)就会抛出"Maximum lock count exceeded",例如:
for (long i = 0; i <= Integer.MAX_VALUE; i++) {
    lock.lock(); // 同一线程重入加锁,直到state溢出抛出Error
}
*/
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
/*③ 若无法满足①、②条件,说明当前线程面临激烈锁资源竞争,那么只能返回false,接着就被安排去入队,也即AQS里面的
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
*/
return false;
}

到此我们已经找出了线程从开始使用lock.lock()到被Queuing入队的时机,因此接下来的入队操作就是核心逻辑了

4、AQS的acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

这里接着第3章节的内容,此时线程需要被入队,执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) ,这里的arg就是acquire(1)里面的1,表示需要对state加1或者需要获取1个锁资源

4.1 addWaiter(Node.EXCLUSIVE)

/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
/*对于独占模式来说,addWaiter的功能就是将当前线程包装为线程节点,并使用CAS将自己添加到阻塞队列的尾部,分两个步骤:
1、优先快速尝试将自己通过CAS加到阻塞队列尾部,如果入队失败就进入2
2、使用enq(node)确保自己最后可以加入到阻塞队列尾部(或者入队成功)
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//1、 node<=>node<=>pred 变成 node<=>node<=>pred<=>new Node
if (pred != null) {
// 新线程节点的前驱节点指针指向pred,如果pred节点没有被其他线程更改,那么CAS成功就会将pred的next指针指向新线程节点,从而使得新线程节点成功入队。
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
//返回线程节点给到外部调用方,也即acquireQueued方法
return node;
}
}
//2、enq内部使用自旋(for循环)保证线程一定能入队
enq(node);
//返回线程节点给到外部调用方,也即acquireQueued方法
return node;
}
4.2 enq(node)

/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
// 将线程节点插入双向链表(阻塞队列)尾部,如果链表为空,则需要先初始化后再插入节点
private Node enq(final Node node) {
// 自旋,保证线程一定能在某次循环中CAS入队成功
for (;;) {
Node t = tail;
// 如果链表尾节点为空,说明阻塞队列还未创建,因此需要初始化,这时新建一个非线程节点放在链表头部。
if (t == null) { // Must initialize
/* 这里非常关键:双向链表的头节点不是线程节点,而是一个无参构造方法的节点,这里可以称为辅助节点,就像ConcurrentSkipList底层的数据链表头节点(BASE_HEADER)也是辅助节点的设计:
head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null),null, null, 1);
*/
if (compareAndSetHead(new Node()))
tail = head;
// 如果双向链表已经存在,则线程节点尝试使用CAS将自己添加到队列尾部
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
// 返回node的前驱节点t(enq的这个返回值在addWaiter中并未使用,addWaiter最终返回的是node本身给acquireQueued)
return t;
}
}
}
}

未能获取到锁资源(未能成功更新state)的新线程,通过addWaiter或enq方法把自己添加到阻塞队列尾部;或者这样理解:在addWaiter里面先快速尝试一次,也就是代码注释所表达的意思:Try the fast path of enq; backup to full enq on failure。

接下来还需要做什么?线程加入阻塞队列以后,是马上把自己阻塞起来,还是在恰好位于队列第一个线程节点位置时马上尝试获取锁资源呢?这就是acquireQueued(addWaiter(Node.EXCLUSIVE), 1)的核心设计

4.3 acquireQueued(addWaiter(Node.EXCLUSIVE), 1)

从上面4.1和4.2可知,阻塞队列里面放的都是未能成功获取锁资源的线程节点(除了头节点辅助节点),这些节点已经入队,但总不能放着它们在队列里面就不管了,接下来还要做一件事情:

从队列取出线程节点,让它重新去尝试获取已经被外面活动线程释放的锁资源

/**
已在队列中的线程以独占且不可中断的模式获取锁资源
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
// failed表示是否未能成功获得锁资源,初始为true,获取成功后置为false
boolean failed = true;
try {
// 表示线程是否被中断
boolean interrupted = false;
for (;;) {
// 核心逻辑:取出当前入队线程节点的前驱节点
final Node p = node.predecessor();
/*
(注意区分阻塞队列的head节点(辅助节点又称哨兵节点)和阻塞队列的第一个线程节点)
如果当前线程节点前驱节点恰好是head节点,说明当前线程节点就是阻塞队列的第一个线程节点,当然有资格且第一个先去尝试获取锁资源,如下结构
head(辅助节点) <-> node(第一个线程节点) -> null
*/
if (p == head && tryAcquire(arg)) {
// 这里的tryAcquire表示你(阻塞队列的第一个线程节点)有资格去抢锁资源,但也要看看能否抢赢外界线程
/* 如果线程节点获取资源成功,那么就可以做出队操作,这里出队很巧妙:
经过setHead之后,线程节点node.thread不再指向线程,它变成了新的辅助头节点(成为阻塞队列的新head),原head节点通过p.next = null帮助GC回收,这就是为何阻塞队列将head节点设计为辅助节点new Node()的原因:第一个线程节点获取锁资源后需要出队并且方便回收
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
*/
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
/*
两种情况会进入if逻辑:
①如果当前节点是第一个线程节点或者说其前驱节点是head节点,但在tryAcquire(arg)竞争失败(因为外面有新的线程在lock.lock()里面CAS成功)
②当前线程节点不是第一个线程节点
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
4.4 shouldParkAfterFailedAcquire(p, node)

接4.3内容,这部分内容可以解释阻塞队列为何被称为"阻塞"队列:从shouldParkAfterFailedAcquire的方法名字也可以看出其设计目的,既然第一个线程节点获取锁资源失败,那么就不能一直无限去acquire,而是在阻塞队列中让自己变成"阻塞状态"去等待,这样就不会白白消耗CPU。
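shouldParkAfterFailedAcquire返回true后,线程最终是靠LockSupport.park把自己挂起的。下面是一个与AQS无关的park/unpark最小示例(类名为示意),用来说明"阻塞等待而不是空转"的效果:

import java.util.concurrent.locks.LockSupport;

public class ParkDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread waiter = new Thread(() -> {
            System.out.println("waiter: 准备park,挂起期间不消耗CPU");
            LockSupport.park();   // 作用等同于parkAndCheckInterrupt里的LockSupport.park(this)
            System.out.println("waiter: 被unpark唤醒,继续执行");
        });
        waiter.start();

        Thread.sleep(1000);          // 让waiter先进入park
        LockSupport.unpark(waiter);  // 类似unparkSuccessor里的LockSupport.unpark(s.thread)
        waiter.join();
    }
}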


/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
/*非常重要的源码注释:根据4.3,如果线程节点未能p == head && tryAcquire(arg),则检查并更新该节点里面的Node.SIGNAL状态值。如果线程节点应该被阻塞,则返回true。
为了理解以下设计逻辑,这里不妨先假设当前链表结构为:
head(waitStatus=0) <-> node(waitStatus=0) -> null
那么这里pred显然指向head(waitStatus=0) ,node指向第一个线程节点node(waitStatus=0)
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// waitStatus也即线程节点的等待状态,新线程节点创建时waitStatus默认为0,
int ws = pred.waitStatus;
// ①waitStatus变量值若为Node.SIGNAL也即等于-1,说明对于pred(-1)<->node(ws=0)来说,pred节点其实已经是阻塞状态了,那么node作为pred的后驱节点也肯定要被阻塞,因此返回true后,在parkAndCheckInterrupt里面node就会被阻塞起来
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
/* ② 如果waitStatus的值>0,也即waitStatus=Node.CANCELLED=1,说明pred线程节点取消排队,考察以下阻塞队列结构
head(ws=-1)<->node1(ws=-1)<->node2(ws=1)<->node3(ws=0)-> null
假设pred=node2,node=node3,显然node2已经取消排队,那么node3不能跟在它后面,因此需要将node2出队,也即node.prev = pred = pred.prev,有:
head(ws=-1)<->node1(ws=-1)<->node3(ws=0)-> null

如果链表中不止一个node2取消排队,还有很多节点也是处于“取消排队状态”,那么就使用前向遍历,直到找到有一个前驱线程节点不是取消排队的节点(说明此节点状态可靠),然后把它作为node3的前驱节点。
*/
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
// 将原写法 node.prev = pred = pred.prev 拆分为下面两句等价写法
pred = pred.prev;
node.prev = pred;
} while (pred.waitStatus > 0);
// 经过前向遍历后,所有取消状态的节点都被出队,那么到这里就可以找到一个可靠状态的pred前驱节点,将要处理的node挂在它后面即可
pred.next = node;
} else {
/*注意这里的注释,非常重要:
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//③ 执行流来到这里说明前驱节点waitStatus要么是0要么是PROPAGATE,此时需要将待处理的node节点前驱节点pred的waitStatus设为SIGNAL值,以表示我作为pred的后驱节点需要等待被唤醒,但还没进入park阻塞状态。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

流程:

1、构造器使用非公平同步类对象new NonfairSync()

2、lock.lock()调用1中定义的lock()方法,尝试对父类AQS的state变量使用CAS从0更新为1,成功就将自己设为独占锁的持有线程,失败就进入3

3、使用acquire(1)继续去竞争资源,其内部调用AQS的acquire方法,

4、AQS的acquire方法是模板方法,因此会调用子类的tryAcquire(arg)再次尝试更新state

5、子类tryAcquire里面的nonfairTryAcquire(1),再次读取state:如果为0就尝试CAS加1,成功则与2类似;如果CAS失败但当前线程就是独占线程(重入),也可以再次对state加1

6、否则AQS的tryAcquire(1)返回false,表示当前线程两次获取锁资源失败,那么就开始7

7、将当前线程包装为线程节点,将在addWaiter尝试加入双向链表:如果链表已经存在,快速执行CASTail,成功就返回,失败则使用enq使用for循环入队:第一次循环判断是否需要初始化双向链表,如果需要则新建一个辅助节点头结点,第二次循环才是CASTail,由于使用自旋,因此enq一定可以使得线程节点加入双向链表尾部。

8、在acquireQueued中,刚入队的线程节点如果恰好是第一个线程节点,就会马上尝试tryAcquire(1);如果成功获取锁资源,就会执行setHead操作,将当前线程节点变为新的辅助头节点(也即线程出队);如果tryAcquire(1)失败则来到9

9、使用shouldParkAfterFailedAcquire(pred, node):第一个线程节点尝试tryAcquire(1)失败后不能无限循环再获取资源,因此将它的前驱节点的waitStatus设为SIGNAL状态(要求pred == node.prev的读一致性),表示当前节点正在等待阻塞,需要被唤醒

10、由于当前节点的前驱节点可能处于"取消排队状态",当前节点不能把此类节点作为自己的前驱节点,因此需要不断前向遍历,直到找到waitStatus<=0的前驱节点pred(找到一个可以"挂靠"的、非取消状态的前驱节点);由于使用了node.prev = pred = pred.prev,沿途处于"取消排队状态"的前驱节点都会被从链表中摘除。

11、当前驱节点的waitStatus已经是Node.SIGNAL时,shouldParkAfterFailedAcquire返回true(首次只是CAS把前驱设为SIGNAL并返回false,下一轮循环再返回true),于是来到parkAndCheckInterrupt

if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;

线程节点就可以自己阻塞了


/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

12、回到acquire(1)


/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

以上流程完成了线程入队、尝试获取锁资源、阻塞自己等待唤醒。接下来需要了解清楚:线程节点在阻塞自己之前使用的shouldParkAfterFailedAcquire,会向前遍历阻塞队列并产生一个side-effect:沿途清除"处在取消状态的线程节点"。那么为何会出现取消状态的节点?又是在什么时机变成取消状态的呢?

4.5 acquireQueued里面的cancelAcquire(node)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

从队列中释放节点的疑虑打消了,那么又有新问题了:

  • shouldParkAfterFailedAcquire中遇到的取消状态节点是怎么生成的呢?什么时候会把一个节点的waitStatus设置为CANCELLED(1)?
  • 是在什么时间释放节点通知到被挂起的线程呢?

通过cancelAcquire方法,将Node的状态标记为CANCELLED。接下来,我们逐行来分析这个方法的原理:

// Utilities for various versions of acquire

/**
* Cancels an ongoing attempt to acquire.
*
* @param node the node
*/
private void cancelAcquire(Node node) {
/* cancelAcquire流程将结合以下链表结构作为解释,其中node4就是当前要被设为取消排队的线程节点,如果node5是null,那么说明node4是tail节点,如果node5是线程节点,说明node4不在链表尾部:
head<->node1(非取消)<->node2(CANCELLED)<->node3(CANCELLED)<->node4(非取消)<->node5
*/
// Ignore if node doesn't exist
// 准备执行时发现node4已经被回收(为null),则可以直接返回。
if (node == null)
return;

// 既然当前节点取消排队,那么node.thread不再指向线程,转而指向null,以便GC
node.thread = null;

// Skip cancelled predecessors

/*

例如按上面的链表节点结构假设,node4被设为CANCELLED状态前,必须先找到一个非CANCELLED的前驱节点,以便node4取消排队后,其后驱节点node5才可以挂靠在一个“可靠的前驱节点”pred,显然这个pred就是node1节点:
也即:head<->node1(非取消)<->node4(非取消)<->node5
而node2和node3线程节点都是取消排队状态,因此需要跳过它们:Skip cancelled predecessors
*/
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
// pred.next=node ,pred=node.prev
Node predNext = pred.next;

// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
/*

继续上面已经跳过cancelled predecessors,有:
head<->node1(非取消)<->node4(非取消)<->node5,此时可以把node4设为取消状态
也即:
head<->node1(非取消)<->node4(CANCELLED)<->node5
做这个操作有个收益: other Nodes can skip past us,也就是说其他当外面有新线程入队需要做shouldParkAfterFailedAcquire等操作时,都会忽略这个已经是CANCELLED状态的node4节点,这样cancelAcquire操作就不会被外界线程节点干扰到。
acquireQueued和shouldParkAfterFailedAcquire回答了准备设置“CANCELLED”线程节点时机,而cancelAcquire的设计回答在什么条件下将“准备取消节点”设为“CANCELLED”
*/
node.waitStatus = Node.CANCELLED;
/*

对于②链表结构:head<->node1(非取消)<->node4(CANCELLED)<->node5,如果node4本身就是tail节点(此时node5其实是null),那么先CAS将tail指向pred(也即node1),再CAS将node1.next设为null,链表结构变为:
head<->node1(非取消)->null,也即注释里面说到的:If we are the tail, remove ourselves.
*/
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
/*

根据③得到的链表结构且node4不是tail节点(此时node5是线程节点),
有:head<->node1(非取消)<->node4(CANCELLED)<->node5
if条件1:如果pred不是头节点而且指向具体线程(也即node1不是头节点)且pred节点也即node1.ws是SIGNAL,那么则进入if
if条件2:如果pred不是头节点而且指向具体线程(也即node1不是头节点)且pred节点也即node1.ws<=0且经过CAS将node1.ws设为SIGNAL,那么则进入if。
条件1意思是说如果node1.ws已经是SIGNAL,那么直接将node5(不是null且未取消)设为node1的后驱节点,这样就能正确的删除node4节点,而且能正确的将后面的线程节点挂在node1后面
head<->node1(ws=SIGNAL)<->node5(ws非取消)
条件2的意思是说如果node1.ws是未取消状态值那么就用CAS将node1.ws设为SIGNAL,然后再把node5(要求不是null且未取消)挂在node1后面
*/
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
// 要求即将被删除的node4节点的后驱节点node5既不是null且未取消状态才能将node5挂在node4前驱节点pred的后面
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
/*

若不满足④,说明node4的前驱节点就是head节点,也即:
head<->node4(CANCELLED)<->node5
那么node4很快会被删除,node5(如果node5不为null且不是取消状态节点)就成为阻塞队列的第一个线程节点,当然可以优先出队去获取外面其他线程释放的锁资源。
因此node4删除前,需要先唤醒后驱节点node5,接下来就来到4.6章节的逻辑
*/
unparkSuccessor(node);
}

node.next = node; // help GC
}
}

4.6 unparkSuccessor

/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
// 接上面4.5的⑤内容,如果node4有后驱节点,那么node4就需要唤醒这个后驱节点(假设后驱节点不是null也不是取消状态)
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
// 由于接上面4.5流程,node4已经设为CANCELLED,那么以下if逻辑会被跳过
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
/*
node4的后驱节点一定是个正常的线程节点吗? node4.next可能是以下几种情况:
① node4->null
② head<->node4(CANCELLED)<->node5(CANCELLED)<->node6(CANCELLED)<->node7(非取消状态)....
③ head<->node4(CANCELLED)<->node5(非取消)....
*/
Node s = node.next; // 先取出node4的后驱节点
// 针对后驱节点为null或者后驱节点是取消状态的情况,如①、②链表结构所示
if (s == null || s.waitStatus > 0) {
s = null;
// 从阻塞链表(阻塞队列)尾部向前遍历,找出一个waitStatus不是取消状态、且离node最近的后驱节点,这个节点就是node4要唤醒的正常状态节点(总不能让node4唤醒一个已经取消排队或者为null的后驱节点)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 针对情况③ node4的直接后驱节点就是正常状态节点或者经过"traverse backwards"找到的the actual non-cancelled successor,唤醒之
if (s != null)
LockSupport.unpark(s.thread);
}

以上完成了ReentrantLock的加锁lock.lock()过程

unparkSuccessor为何采用从链表尾部向前遍历,而不是正序遍历去找一个the actual non-cancelled successor ?

首先在AQS的开头源代码注释里面有提到:

The “prev” links (not used in original CLH locks), are mainly needed to handle cancellation. If a node is cancelled, its successor is (normally) relinked to a non-cancelled predecessor.

prev前驱指针主要用于解决线程节点取消排队的情况(注意CLH locks并没有使用prev前驱指针),如果一个线程节点已经取消,那么它的后驱节点就会重新链接到一个非取消状态的前驱节点

例如 现有阻塞队列结构:node1(非取消)<—>node2(取消)<—>node3(非取消)<—>node4(非取消),假设现在要将node3设为取消节点,最终队列肯定要变成:node1(非取消)<—>node4(非取消),过程就是按照cancelAcquire的流程:

① 跳过node3“已经是取消状态”的节点,也即node2

② 此时node1就是node3的后驱节点node4需要挂靠的前驱节点

③ 根据以下逻辑compareAndSetNext(pred, predNext, next),也即compareAndSetNext(pred=node1, predNext=node3, next=node4);,可以推出:在产生CANCELLED状态节点的时候,先断开的是Next指针,Prev指针并未断开,因此只能利用prev指针从后往前遍历才能够遍历完全部的Node。

if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
// 要求即将被删除的node4节点的后驱节点node5既不是null且未取消状态才能将node5挂在node4前驱节点pred的后面
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
4.7 关于阻塞队列本身的其他方法:Queue inspection methods

/**
* Queries whether any threads are waiting to acquire. Note that
* because cancellations due to interrupts and timeouts may occur
* at any time, a {@code true} return does not guarantee that any
* other thread will ever acquire.
*
* <p>In this implementation, this operation returns in
* constant time.
*
* @return {@code true} if there may be other threads waiting to acquire
*/
// 返回队列里面是否有正在等待获取锁资源的线程节点,返回true也不代表存在“正在等待获取锁资源的线程节点”,因为“线程取消排队或者timeout”的情况随时可以发生,这会导致head != tail有二义性。
public final boolean hasQueuedThreads() {
return head != tail;
}

/**
* Queries whether any threads have ever contended to acquire this
* synchronizer; that is if an acquire method has ever blocked.
*
* <p>In this implementation, this operation returns in
* constant time.
*
* @return {@code true} if there has ever been contention
*/
// 返回是否曾经有线程出现过竞争同步器以获取锁资源,或者说acquire方法被阻塞过。简单的说:若head不为null,说明有线程节点执行enq方法里面的`compareAndSetHead(new Node())`,也即有竞争
public final boolean hasContended() {
return head != null;
}

/**
* Returns the first (longest-waiting) thread in the queue, or
* {@code null} if no threads are currently queued.
*
* <p>In this implementation, this operation normally returns in
* constant time, but may iterate upon contention if other threads are
* concurrently modifying the queue.
*
* @return the first (longest-waiting) thread in the queue, or
* {@code null} if no threads are currently queued
*/
// 返回阻塞队列中的第一个(等待时间最长的)线程节点,注意这可不是返回head这个头结点(辅助节点)。如果当前没有线程节点排队,则为null
public final Thread getFirstQueuedThread() {
// handle only fast path, else relay
// 先快速判断。head==tail说明阻塞队列还未创建,直接返回null即可,否则使用fullGetFirstQueuedThread去获取第一个线程节点。这种“投机性先实施A失败再实施B”的设计其实在ConcurrentHashMap里面的addCount:先尝试对baseCount累加失败则使用fullAddCount确保能够累加计数成功。
return (head == tail) ? null : fullGetFirstQueuedThread();
}

/**
* Version of getFirstQueuedThread called when fastpath fails
*/
private Thread fullGetFirstQueuedThread() {
/*
* The first node is normally head.next. Try to get its
* thread field, ensuring consistent reads: If thread
* field is nulled out or s.prev is no longer head, then
* some other thread(s) concurrently performed setHead in
* between some of our reads. We try this twice before
* resorting to traversal.
*/
// h:head节点的临时变量,s:head节点的后驱节点(successor)临时变量
Node h, s;
Thread st;// 后驱节点指向的线程:successorthread

/*
此设计非常巧妙:
正常来说,阻塞队列的第一个线程节点就是我们要找的"first queued thread"节点,为了确保拿到的确实是开始读取时的那个线程节点,必须保证前后一致性读。不一致性读是如何出现的呢?考察这种情况:第一次读node.thread不为空但第二次读node.thread已经是null,或者第一次读s.prev是原来的head节点而第二次读s.prev已经不是原来的head节点,这是因为在这两次读的过程中,有其他线程正在并发执行setHead操作。那么如何快速实现一致性读呢?直接实施两次尝试即可!
if ( (第一次尝试)|| (第二次尝试))
第一次尝试:要求head节点不是null且后驱节点s不为null且s.prev还是指向原head节点且后驱节点的thread引用不为null
第二次尝试: 要求head节点不是null且后驱节点s不为null且s.prev还是指向原head节点且后驱节点的thread引用不为null
第一次尝试成功说明没有其他线程干扰确实是一致性读,那么head的后驱节点s就是要返回的first queued thread
第一次尝试失败说明有其他线程正在进行setHead操作,那么需要再投机性的进行第二次尝试。
如果第一次尝试失败、第二次尝试也失败,那么只能通过遍历阻塞队列的方式去找`first queued thread`
*/
if (((h = head) != null && (s = h.next) != null &&
s.prev == head && (st = s.thread) != null) ||
((h = head) != null && (s = h.next) != null &&
s.prev == head && (st = s.thread) != null))
return st;

/*
* Head's next field might not have been set yet, or may have
* been unset after setHead. So we must check to see if tail
* is actually first node. If not, we continue on, safely
* traversing from tail back to head to find first,
* guaranteeing termination.
*/

/*
因为head节点的next指针可能还未设置,又或者在setHead之后被重新置空,因此无法直接利用next指针从链表头部往后遍历。
所以需要这么做:先检查tail节点不为空时且tail节点不是head节点,说明此时有阻塞队列且有线程节点队列里面,这样就可以safely从链表尾部开始遍历,直到t指针指向head节点时说明t的前驱节点已经来到head位置即可结束遍历,
*/
Node t = tail;
Thread firstThread = null;
while (t != null && t != head) {
Thread tt = t.thread;
// firstThread不断更新指向途中经过的“非取消”的线程节点的线程引用
// 或者说跳过“取消排队”的线程节点
if (tt != null)
firstThread = tt;
t = t.prev;
}
return firstThread;
}

/**

* Returns true if the given thread is currently queued.
*
* <p>This implementation traverses the queue to determine
* presence of the given thread.
*
* @param thread the thread
* @return {@code true} if the given thread is on the queue
* @throws NullPointerException if the thread is null
*/
// 判断给定线程是否在阻塞队列里面
public final boolean isQueued(Thread thread) {
if (thread == null)
throw new NullPointerException();
// 从链表尾部开始遍历(safely traversing from tail back to head),只要遍历节点的thread==给定的thread引用,返回true。
for (Node p = tail; p != null; p = p.prev)
if (p.thread == thread)
return true;
return false;
}
// 还有其他共享模式下的队列方法,这里不在讨论范围。

5、解锁过程

其实只要掌握了加锁lock.lock()的过程,也即ReentrantLock和AQS之间的交互,那么解锁过程就相对简单

调用者:

lock.unlock() 

ReentrantLock内部的unlock:

 public void unlock() {
sync.release(1); // 加锁每次对state加1,释放锁则对state减1
}

AQS内部的release方法:这里说明真正释放锁的所有操作都是在AQS内部完成

/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
// 这里的tryRelease就是子类Sync具体要实现的方法:将state值减1(由持有锁的线程用setState写回即可,无需CAS)
// (1)如果子类的tryRelease更新state后state值为0,就会返回true,说明当前线程已完全释放锁资源,此时需要将阻塞队列里面等待的第一个线程节点唤醒
// (2) 如果子类的tryRelease更新state成功但state值不为0,则返回false,说明同一线程多次重入加锁,还需要继续多次释放
if (tryRelease(arg)) {
Node h = head;
/*
取出头节点(辅助节点),如果阻塞队列存在且头节点的waitStatus不为0,则需要唤醒其后驱节点
①对于h=null,说明阻塞队列还不存在或者还未创建完成,还没有线程节点入队,显然不需要做"唤醒操作"
②对于h不为null且h.waitStatus == 0,说明阻塞队列刚完成初始化,仅有一个head辅助节点,但还未有后驱线程节点声明自己需要被唤醒,因此也不需要做"唤醒操作"
③对于h不为null且h.waitStatus != 0:ReentrantLock是独占模式,head的waitStatus要么是0要么是SIGNAL(-1)(源码保证head不会是CANCELLED,而-2、-3属于条件队列/共享模式,不在讨论范围)。当h.waitStatus=-1时,说明后面有线程节点在等待被唤醒,于是调用unparkSuccessor;如果head的直接后驱节点恰好为null或处于取消状态,unparkSuccessor内部会从尾部向前找到真正的非取消状态节点(the actual non-cancelled successor)作为要唤醒的节点
*/
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

AQS内部的release方法中的tryRelease调用子类Sync的tryRelease

protected final boolean tryRelease(int releases) {
// 读取最新的state值并扣减1
int c = getState() - releases;
// 因为是独占模式,要求必须持有锁资源的线程才能去释放锁资源
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 表示是否成功释放资源
boolean free = false;
// 如果c==0,说明当前线程的所有重入都已释放,不再有线程持有锁资源,将独占线程引用设为null,并将释放成功标志设为true
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 将state更新为扣减后的c值:state=getState() - releases
setState(c);
return free; // true或者false将影响AQS的release流程
}

6、唤醒线程节点后的执行流


/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 这里是线程节点在acquireQueued里面被阻塞自己的地方,
// 唤醒后,将会执行下面这一句:返回线程是否中断状态
return Thread.interrupted();
}


// 关于Thread.interrupted();

/**
* Tests whether the current thread has been interrupted. The
* <i>interrupted status</i> of the thread is cleared by this method. In
* other words, if this method were to be called twice in succession, the
* second call would return false (unless the current thread were
* interrupted again, after the first call had cleared its interrupted
* status and before the second call had examined it).
*
* <p>A thread interruption ignored because a thread was not alive
* at the time of the interrupt will be reflected by this method
* returning false.
*
* @return <code>true</code> if the current thread has been interrupted;
* <code>false</code> otherwise.
* @see #isInterrupted()
* @revised 6.0
*/
// 返回当前线程是否被中断过,并清除中断状态:若线程曾被中断,第一次调用返回true并清除标记;紧接着第二次调用则返回false(除非期间再次被中断),所以要注意其源码注释说明:after the first call had cleared its interrupted status and before the second call had examined it
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}

这里线程被唤醒后,parkAndCheckInterrupt通过Thread.interrupted()检查并清除中断状态:如果线程在park等待期间曾被中断过(而不仅仅是被unpark唤醒),则返回true;无论返回true还是false,都会继续acquireQueued以下的for流程:

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true; // 若线程在park期间被中断过,parkAndCheckInterrupt返回true,这里记录中断标记;回到for循环后,当线程最终成功获取锁资源,这个interrupted标记会由acquireQueued返回给外部的acquire调用者。
}
} finally {
if (failed)
cancelAcquire(node);
}
}

经过以上流程,如果线程在排队等待期间被中断过,acquireQueued返回true,回到调用acquireQueued的流程 lock.lock()-->acquire(1)-->acquireQueued-->selfInterrupt


/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

那么将继续执行selfInterrupt逻辑:也即线程进入阻塞队列,被阻塞然后被唤醒(且期间被中断过),最终拿到锁,最后线程再自己中断一下。为何拿到锁后还要设计线程中断自己?这是什么逻辑?


/**
* Convenience method to interrupt current thread.
*/
static void selfInterrupt() {
Thread.currentThread().interrupt();
}

解释:

(1)先考察一个线程使用lock.lock且在compareAndSetState就直接拿到锁资源的情况(意味着它没有进入阻塞队列),显然整个过程线程自己没有被LockSupport.park(this)阻塞,因此这种线程无需执行selfInterrupt 操作,这样外界有其他逻辑检查这个线程的中断状态时就会知道它未中断过。

(2)由于Doug Lea采用这样的设计:Acquires in exclusive uninterruptible mode for thread already in queue(已在阻塞队列中的线程节点以独占且不响应中断的模式去等待获取锁资源)。线程在park期间即使被中断也不会立刻退出,而是由parkAndCheckInterrupt里的Thread.interrupted()检测到中断并顺手清除了中断标记。因此,如果线程在排队等待期间确实被中断过,那么它拿到锁后需要通过selfInterrupt给自己补回一个"中断过"的标记,这样外界其他逻辑检查这个线程的中断状态时才能知道它曾经被中断过。
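下面用一个与AQS无关的小例子(类名为示意)说明"Thread.interrupted()会清除中断标记,所以要补一次中断"这件事:

import java.util.concurrent.locks.LockSupport;

public class SelfInterruptDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            LockSupport.park();                             // 模拟线程在队列中阻塞
            boolean wasInterrupted = Thread.interrupted();  // 检查并清除中断标记,同parkAndCheckInterrupt
            System.out.println("park期间是否被中断: " + wasInterrupted);
            System.out.println("清除后 isInterrupted = " + Thread.currentThread().isInterrupted());
            if (wasInterrupted) {
                Thread.currentThread().interrupt();         // 同selfInterrupt:把中断标记补回去
            }
            System.out.println("补回后 isInterrupted = " + Thread.currentThread().isInterrupted());
        });
        worker.start();
        Thread.sleep(500);
        worker.interrupt(); // 中断会使park提前返回,并设置中断标记
        worker.join();
    }
}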

7、ReentrantLock的公平锁模式

有了以上基础,那么分析公平锁则显得简单一些。内部使用的是 FairSync内部同步器实现相关逻辑

public static ReentrantLock lock=new ReentrantLock(true);  // 构造器指定公平锁方式
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

FairSync的实现:


/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1);
}

/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
// 以下是体现公平锁的设计思想:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//若state同步状态变量此时为0,那么说明线程可以直接去竞争锁资源
if (c == 0) {
/* ① 请求独占锁前先问问阻塞队列有无阻塞线程正在等待锁(这就体现公平的原则,后者当然要等先到者)
如果阻塞队列有正在排队的线程节点,那么当前新来的线程就无法设置state值,只能去到③(假设非重入线程) 返回false,最终可能是需要进入acquireQueued逻辑把自己入队处理,因此这里的逻辑正是体现了获取锁资源的“公平性”
*/
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// ② 如果当前线程就是持有锁的线程,当然可以让它多次重入,只要不超过重入次数最大值即可。
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// ③
return false;
}
}

加锁时请求锁资源的acquire(1)是AQS内部的方法,它再调用子类的tryAcquire

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

因此完整过程是:

lock.lock() --> lock内部的acquire(1) --> FairSync内部的tryAcquire--> 查询阻塞队列是否有线程节点正在排队 --> 如果tryAcquire失败则进入AQS的acquireQueued内部逻辑

hasQueuedPredecessors的设计逻辑:


/**
Queries whether any threads have been waiting to acquire longer than the current thread.
查询是否有线程节点的等待获取锁资源时间长于当前线程。
或者用以下表达式也可以达到相同目的,但是hasQueuedPredecessors比它更高效
An invocation of this method is equivalent to (but may be more efficient than):
如果第一个排队线程 != 当前线程且阻塞队列不为空,则返回true,说明当前线程还不能马上去抢锁资源,需要排在阻塞队列中已有的线程节点之后
如果阻塞队列为空,则返回false,当前节点可以马上去获取锁资源
getFirstQueuedThread() != Thread.currentThread() && hasQueuedThreads()

由于中断或者timeout导致线程节点的“取消排队”的情况在任何时候都可以发生,因此返回true也不能保证其他线程就是比当前线程更早申请锁资源
同理,返回false也不能说明现在阻塞队列是空,因为return false的瞬间可能会有其他线程成功竞争到进入排队
Note that because cancellations due to interrupts and
timeouts may occur at any time, a {@code true} return does not
guarantee that some other thread will acquire before the current
thread. Likewise, it is possible for another thread to win a
race to enqueue after this method has returned {@code false},
due to the queue being empty.

hasQueuedPredecessors方法设计给公平同步器使用,以避免新来的线程插队(barging)。
This method is designed to be used by a fair synchronizer to avoid barging. Such a synchronizer's tryAcquire method should return false, and its tryAcquireShared method should return a negative value, if this method returns true (unless this is a reentrant acquire). For example, the tryAcquire method for a fair, reentrant, exclusive mode synchronizer might look like this:
公平的、可重入的、独占模式的同步器内部定义的tryAcquire应该是以下流程:
protected boolean tryAcquire(int arg) {
if (isHeldExclusively()) {
// 表示同一线程重入获取锁资源,只需要增加state值即可
// A reentrant acquire; increment hold count
return true;
} else if (hasQueuedPredecessors()) {
//如果阻塞队列里面有线程节点那么当前线程不能马上acquire到锁资源,要等其他线程节点,因此返回false
return false;
} else {
// 其他情况说明锁资源没有其他线程竞争,当前线程节点可以直接去acquire
// try to acquire normally
}
}
以上的设计流程就是ReentrantLock里面公平锁FairSync的tryAcquire实现流程

(1)如果有一个排队线程在当前线程之前则返回true,例如公平锁的FairSync,外面新来的当前线程使用tryAcquire则需要判断阻塞队列是否有阻塞线程,如果有,那么当前线程就没办法马上拿到锁,只能等,这就体现公平原则:先来先得,等得越久的那个线程节点优先获取锁资源
(2)如果当前线程恰好位于第一个线程节点或者阻塞队列没有线程节点,则返回false
true if there is a queued thread preceding the current thread, and false if the current thread is at the head of the queue or the queue is empty
*/
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.

Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// 返回true条件:
// 情况1:head辅助节点已经完成初始化也即:compareAndSetHead(new Node()) 且 head的后驱节点第一个线程节点是null
// 情况2:head辅助节点已经完成初始化也即:compareAndSetHead(new Node()) 且 head的后驱节点第一个线程节点不是当前节点
return h != t &&((s = h.next) == null || s.thread != Thread.currentThread());
}

8、到此可以总结公平和非公平的锁的区别

非公平:直接CAS竞争state这个锁资源,不需要询问阻塞队列是否有线程节点正在等待获取锁资源

final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

公平:当state为0时,需要询问阻塞队列是否有线程节点正在等待获取锁资源,如果有则无法马上获取锁资源,需要等待

protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 省略:重入分支与最终的return false,同前文FairSync完整源码
}

非公平的优点:线程并发性能高,因为所有新来的线程先通过compareAndSetState(0, 1)竞争锁资源,如果成功就能马上返回,无需经过排队。

非公平的缺点:因为新来的线程很快通过compareAndSetState(0, 1)提前获得锁资源,导致阻塞队列的线程节点可能一直没有机会出队获取锁资源,也即出现“线程饥饿”

线程饿死的解释:

If a thread is not granted CPU time because other threads grab it all, it is called “starvation”. The thread is “starved to death” because other threads are allowed the CPU time instead of it. The solution to starvation is called “fairness” - that all threads are fairly granted a chance to execute.

公平的优点:不会出现线程饿死,保证每个在队列中等待的线程最终都有机会获得锁资源

公平的缺点:线程并发性能可能比非公平锁低一些。
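最后给出一个公平/非公平锁构造方式的小示例(输出顺序与线程调度有关,仅作示意,不保证固定的获取顺序):

import java.util.concurrent.locks.ReentrantLock;

public class FairnessDemo {
    public static void main(String[] args) {
        ReentrantLock fairLock = new ReentrantLock(true); // 公平锁:state为0时先问hasQueuedPredecessors
        ReentrantLock nonfairLock = new ReentrantLock();  // 默认非公平锁:新线程先直接CAS抢一次

        System.out.println("fairLock.isFair()    = " + fairLock.isFair());
        System.out.println("nonfairLock.isFair() = " + nonfairLock.isFair());

        Runnable task = () -> {
            fairLock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + " 获得公平锁");
            } finally {
                fairLock.unlock();
            }
        };
        for (int i = 0; i < 3; i++) {
            new Thread(task, "t" + i).start();
        }
    }
}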