yield-bytes

沉淀、分享与无限进步

Java高级主题:基于AQS条件队列实现的各种XXBlockingQueue分析

有了《基于AQS实现的Condition底层原理解析》文章关于条件队列底层设计的讨论后,那么关于使用BlockingQueue接口和AQS设计各种阻塞队列实现——ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue的数据结构设计和源代码实现则非常好理解。注意本文的讨论的“阻塞队列”是指代BlockingQueue接口,不是指代AQS源码分析文章中所说的CLH的FIFO阻塞队列。

BlockingQueue

BlockingQueue是一个接口,它有Queue接口常见的方法:add、offer,而put方法和take方法则是BlockingQueue特有的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public interface BlockingQueue<E> extends Queue<E> {

/**
* Inserts the specified element into this queue, waiting if necessary
* for space to become available.
*
* @param e the element to add
* @throws InterruptedException if interrupted while waiting
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
void put(E e) throws InterruptedException;

/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element becomes available.
*
* @return the head of this queue
* @throws InterruptedException if interrupted while waiting
*/
E take() throws InterruptedException;

}

其实put和take就是经典的生产者-消费者模型中的逻辑:生产者使用put操作不断向“容器”中放入元素,消费者使用take操作不断从“容器”中取出(消费)元素。只不过对于BlockingQueue接口来说,put和take都要求实现此BlockingQueue接口的子类需实现“阻塞机制”,也即:

1、对于put操作来说:生产者线程(1个或者多线程并发)向容器(该容器可以是基于数组实现或者基于链表实现)put一个元素(item),如果此容器已满,则所有生产者线程的put操作都会被阻塞直到“容器不满notFull”时。

结合AQS的条件队列的阻塞机制设计可推出:对于put的阻塞,其实就是让执行put操作的生产者线程在一个“特定的”条件队列中阻塞,这里的阻塞实现就是使用该条件队列的await方法达成。

2、对于take操作来说:消费者线程(1个或者多线程并发)从容器(该容器可以是基于数组实现或者基于链表实现)中take一个元素(item),如果此容器为空,则所有消费者线程的take操作都会被阻塞直到“容器不空notEmtpy”时。

结合AQS的条件队列的阻塞机制设计可推出:对于take的阻塞,其实就是让执行take操作的消费者线程在一个“特定的”条件队列中阻塞,这里的阻塞实现就是使用该条件队列的await方法达成。

ArrayBlockingQueue基本使用

ArrayBlockingQueue的put阻塞demo

1
2
3
4
5
6
7
8
9
10
11
12
BlockingQueue<String> q=new ArrayBlockingQueue(3,true); // 这里的true表示使用ArrayBlockingQueue内部ReentrantLock的公平锁模式,目的是为了使得多个线程有序抢锁资源,以便观察输出情况
for (int i = 0; i < 5; i++) {
int ii=i;
new Thread(()->{
try {
q.put("foo");
System.out.println(Thread.currentThread().getName()+"生产了foo:"+ii);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}

输出以下三句后主线程进入阻塞状态,因为队列长度为3,此时有5个生产线程向q并发生产,因此当第3个线程完成后,q队列已经满了,当后面的第4个、第5个线程向q生产元素时,就会被阻塞:

Thread-0生产了foo:0
Thread-1生产了foo:1
Thread-2生产了foo:2

ArrayBlockingQueue的take操作阻塞demo

1
2
3
4
5
6
7
8
9
10
11
12
BlockingQueue<String> q=new ArrayBlockingQueue(3,true);
for (int i = 0; i < 5; i++) {
int ii=i;
new Thread(()->{
try {
q.take();
System.out.println("foo");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}

此时没有输出:因为q队列没有元素,因此5个消费者线程只能被阻塞。

ArrayBlockingQueue核心设计

本文并不定打算对ArrayBlockingQueue做全面的源代码的解析,本文仅挑出其核心的阻塞设计作为解析

关键属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {

/**
* Serialization ID. This class relies on default serialization
* even for the items array, which is default-serialized, even if
* it is empty. Otherwise it could not be declared final, which is
* necessary here.
*/
private static final long serialVersionUID = -817911632652898426L;

/** The queued items ArrayBlockingQueue存放元素的容器是基于数组实现的 */
final Object[] items;

/** items index for next take, poll, peek or remove */
int takeIndex;

/** items index for next put, offer, or add */
int putIndex;

/** Number of elements in the queue
ArrayBlockingQueue当前含有元素的个数,每次向队列里面放入1个元素,则count++,每次向队列里面取走1个元素,则count--
*/
int count;

/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/* 可以看到,ArrayBlockingQueue的相关put、take也即读写操作都是基于全局一把ReentrantLock,也即是互斥的,而非无锁实现,性能当然会比无锁队列要低。阻塞机制实现:使用一把锁和两个条件队列实现
notEmpty:消费者线程调用take发现队列元素为空时,被阻塞所在的条件队列
notFull:生产者线程调用put发现队列元素满时,被阻塞所在的条件队列
*/
/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity]; // 初始化时可以选择存放元素的个数,以及ReentrantLock的公平模式,默认采用性能相对较高的非公平模式
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
put方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
 /**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
// 在队列尾部添加元素,且不支持添加null对象
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 1、如果有多个put线程,它们之间需要竞争互斥锁,因此没lock成功的写线程都会进入lock对象关联的CLH阻塞队列里面等着
lock.lockInterruptibly();
try {
// 2、就算此put线程抢到独占锁资源,但因为条件中“队列的元素个数已经满了”,此时put线程会被notFull.await()放入 notFull对象关联的条件队列,这既是ArrayBlockingQueue的put的阻塞设计实现
while (count == items.length)
notFull.await();
// 3、执行流到这里,说明队列还没满,因此可以把元素放入队列,使用enqueue放入元素
enqueue(e);
} finally {
lock.unlock();
}
}


/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
// 注意:将元素入队,是指将元素放在队尾,而取出元素也即出队,是从队列头取出
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
// 1、将元素放入putIndex下标指向的桶位,putIndex默认值为0,如果下一次要放入的下标已经达到数组长度,那么将putIndex重置为0,以便实现循环使用底层数组放置元素
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
// 2、入队一个元素,计数器加1
count++;
// 3、既然队列已经有刚添加的元素,那么可以通知在notEmpty条件队列等待的“消费者线程们”
notEmpty.signal();
}

take方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 1、显然take操作也是互斥锁操作,同理如果有多个消费者线程,未能抢到lock的消费者线程则进入lock关联的CLH阻塞队列,结合上面put方法,因此CLH阻塞队列的线程节点可能同时存在消费者线程节点和生产者线程节点
lock.lockInterruptibly();
try {
// 2、只要当前队列没有元素,就将当前消费者线程阻塞在notEmpty的条件队列中
while (count == 0)
notEmpty.await();
// 3、说明此时队列里面还有元素可供取出,使用dequeue
return dequeue();
} finally {
lock.unlock();
}
}

/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
// 1、取出当前位置的元素后马上置为null
items[takeIndex] = null;
// 2、如果下一个要取出的元素下标已经达到数组长度,则重置为0,以便重复利用items这个数组
if (++takeIndex == items.length)
takeIndex = 0;
// 3、既然取出了一个元素,且items[takeIndex] 已经置为null,当然对元素个数的计数器减1
count--;
/*
4、若使用了ArrayBlockingQueue的迭代器,在每次从队列取出一个元素时还需删除那些关联此队列的、“过时”的迭代器上对应的元素,以实现每个迭代器和底层容器数据数据的一致性:
例如下面,同一个队列,开启了两个迭代器
BlockingQueue<String> q=new ArrayBlockingQueue(3,true);
Iterator iterator1=q.iterator();
Iterator iterator2=q.iterator();
由于ArrayBlockingQueue的迭代器设计比普通的ArrayList迭代器设计要复杂,本文在此不再展开。
*/

if (itrs != null)
itrs.elementDequeued();
// 4、既然消费者线程已经从队列取出一个元素,说明队列肯定不满,因此可以通知阻塞在notFull条件队列的生产者线程。
notFull.signal();
return x;
}

其实关于put、take的设计思想,在java.util.concurrent.locks.Condition的源码设计中就有给出相应的demo说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class BoundedBuffer {
// 设计一个有界的阻塞队列:一个独占锁关联两个条件队列
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();

final Object[] items = new Object[100]; // 内部指定此队列只能放置100个元素
int putptr, takeptr, count; // 这里的putptr 也即上面的putIndex,takeptr对应takeIndex

public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}

public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}

使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

BoundedBuffer buffer=new BoundedBuffer();
new Thread(()->{
try {
buffer.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();


new Thread(()->{
try {
buffer.put("foo");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

LinkedBlockingQueue基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
BlockingQueue<String> q=new LinkedBlockingQueue<String>(3);
Iterator iterator1=q.iterator();
Iterator iterator2=q.iterator();
for (int i = 0; i < 5; i++) {
int ii=i;
new Thread(()->{
try {
q.put("foo"); // 观察put的阻塞
System.out.println(Thread.currentThread().getName()+"生产了foo:"+ii);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}

LinkedBlockingQueue基本使用其实跟ArrayBlockingQueue一样的,只不过内部存放元素的容器是基于链表实现,当然也可以在初始化就指定可存放元素的数量,这样原本的链表阻塞队列即可变为有界阻塞队列。

LinkedBlockingQueue核心设计

以下将存放元素的链表统称为容器链表,也即存放元素的队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

/**
* Linked list node class
*/
static class Node<E> {
E item;

/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
// 注意由于容器链表的head节点是一个辅助节点,真正的数据头节点是first=head.next
Node<E> next;

Node(E x) { item = x; }
}


/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity; // 可以指定默认存放元素个数,最多只能到Integer.MAX_VALUE个

/** Current number of elements */
// 当前容器列表的元素个数,这里为何使用线程安全的原子计数器,而不是像ArrayBlockingQueue那样使用普通的count? 这里需要理解LinkedBlockingQueue双互斥锁设计才能明白其设计意图!
private final AtomicInteger count = new AtomicInteger();

/**
* Head of linked list.
* Invariant: head.item == null
*/
// 容器链表的头结点是head = new Node<E>(null),因此只需要判断head.item==null,说明此时位于head就是位于头节点位置
transient Node<E> head;

/**
* Tail of linked list.
* Invariant: last.next == null
*/
// 如果last.next=null(注意区别head.next=item),说明last执行的是容器链表的最后一个数据节点
private transient Node<E> last;

/** Lock held by take, poll, etc */
// 这里就是体现了双锁设计:一把take锁,用于消费者线程,意味着消费者线程的take操作并不会阻塞生产者线程的put操作
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
// take锁关联的条件队列,用于存放被阻塞的消费者线程
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
// 这里就是体现了双锁设计:一把put锁,用于生产者线程,意味着产者线程的put操作并不会阻塞消费者线程的take操作
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
// put锁关联的条件队列,用于存放被阻塞的生产者线程
private final Condition notFull = putLock.newCondition();

构造器

1
2
3
4
5
6
7
8
9
10
11
12
13
14

// 默认构造器的容器链表长度使用了最大正整数值
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
/* 这里可以看到LinkedBlockingQueue存放元素的链表跟普通的链表的头节点不同,
普通链表的头节点在初始化是head=null,意味着当head指向节点时,此节点就是真正的数据头节点,而LinkedBlockingQueue的头节点是一个辅助节点,在初始化时head指向new Node<E>(null),显然head不是真正的数据头节点
*/
last = head = new Node<E>(null);
}

含有数据节点的链表结构示意如下:

head(null)->node0(item0)->node1(item1)->node2(item2)->node3(item3)->null

put方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72

/**
源码注释清楚说明:采用了尾插法,也即生产者线程将元素放入容器链表尾部
* Inserts the specified element at the tail of this queue, waiting if
* necessary for space to become available.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
// 这里也说明LinkedBlockingQueue不支持放入null对象
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);

// 当然如果有多个生产者线程使用put,那么那些没抢到独占锁的生产者线程将阻塞在putLock关联的CLH阻塞队列里面。
final ReentrantLock putLock = this.putLock;
// 1、因为存在两把锁:putLock和takeLock,意味着生产者线程和消费者可以同一时刻操作计数器count,因此采用AtomicInteger才能计数操作的线程安全
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 2、通过当前容器链表的元素数量达到指定长度,说明已经满了,则调用put生产者线程将被阻塞在notFull关联的条件队列
while (count.get() == capacity) {
notFull.await();
}
// 3、执行流来到这里,说明此时容器链表可以放入元素,则在链表尾部追加即可
enqueue(node);
// 4、添加元素后,对计时器加1并返回上一次计数值
c = count.getAndIncrement();
// 5、如果计时器+1还未超过指定长度,则唤醒可能在notFull条件队列阻塞的的生产者线程:当前容器链表还没满,你们可以醒来向此容器put入元素。
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
/* 6、首先c = count.getAndIncrement(),说明c是在容器链表添加元素之前的计数,如果c等于0,说明容器链表是空的,也说明可能在c=0时可能已经有take消费者线程阻塞在notEmpty的条件队列中,
而经过上面的3步骤后,既然已经入队了1个元素,那么就需要唤醒在c=0那段时刻被阻塞的消费者线程。
*/
if (c == 0)
signalNotEmpty();
}


/**
* Links node at end of queue.
*/
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
// 可以看到如果是链表结构,入队操作比基于数组实现的效率快很多
// last.next=node 将当前链表尾指针的next指向新节点node,从而使得node入队
// last=node 更新尾指针的指向最新的尾节点。
last = last.next = node;
}


/**
* Signals a waiting take. Called only from put/offer (which do not
* otherwise ordinarily lock takeLock.)
*/
// 此方法也很简单,唤醒阻塞在takeLock对象下的条件队列的消费者线程节点
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
take方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
  // 从容器链表头部的第一个数据节点取出元素(注意:这里说的是第一个数据节点,而不是head,因为head是一个辅助节点) 
public E take() throws InterruptedException {
E x;
int c = -1;
// 1、因为存在两把锁:putLock和takeLock,意味着生产者线程和消费者可以同一时刻操作计数器count,因此采用AtomicInteger才能计数操作的线程安全
final AtomicInteger count = this.count;
// 2、消费线程线程使用take锁,说明容器链表的put操作和take操作可以同时进行,而ArrayBlockingQueue是互斥的。当然如果有多个消费者线程使用take,那么那些没抢到独占锁的消费者线程将阻塞在takeLock关联的CLH阻塞队列里面。
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//3、 如果当前容器链表元素为0,也即无元素可取,那么消费者线程只能去notEmpty的条件队列阻塞等待
while (count.get() == 0) {
notEmpty.await();
}
// 4、来到这里说明容器是有元素的,从“队头”取出一个
x = dequeue();
// 5、从容器链表取出1个元素后,对计时器减1并返回上一次计数值
c = count.getAndDecrement();
/* 6、如果上一次计数值大于0,说明此刻容器队列还有元素可以去继续取,那么唤醒那些可能在notEmpty条件队列等待的消费线程:你们可以过来take元素。
注意:这里说的唤醒要求你确实掌握AQS对于signal的内部实现:signal只是将notEmpty条件队列中阻塞的第一个线程节点转移到takeLock关联的CLH阻塞队列,也即那个节点被转移到CLH阻塞队列还处于阻塞状态,直到当前线程执行完finally的unlock,此操作才会在AQS使用unparkSuccessor唤醒那个在takeLock关联的CLH阻塞队列的线程节点。
*/
if (c > 1)
notEmpty.signal();
} finally {
// 7、
takeLock.unlock();
}
/* 6、首先c = count.getAndIncrement(),说明c是在消费者取出元素之前的计数,如果c已经是指定容器长度,说明容器链表此时已满,也说明在c=capacity那一刻可能已经有put生产者线程阻塞在notFull的条件队列中,
而经过上面的4的出队操作后,既然已经容器已经扣减了1个元素,那么就需要唤醒c=capacity容器满的那段时刻被阻塞的生产者线程。
*/
if (c == capacity)
signalNotFull();
return x;
}

/**
* Removes a node from head of queue.
*
* @return the node
*/
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
// 取出第一个数据节点:h.next,注意:不是head,因为head=new Node(null)是一个辅助节点
Node<E> first = h.next;
h.next = h; // help GC
// 将head指针指向第一个数据节点并取出该节点存放的数据,然后将节点的item置为null,那么此时head指向的first就是一个新的辅助节点:first(item=null,next=下一个数据节点)
head = first;
E x = first.item;
first.item = null;
return x;
}