publicinterfaceBlockingQueue<E> extendsQueue<E> { /** * Inserts the specified element into this queue, waiting if necessary * for space to become available. * * @param e the element to add * @throws InterruptedException if interrupted while waiting * @throws ClassCastException if the class of the specified element * prevents it from being added to this queue * @throws NullPointerException if the specified element is null * @throws IllegalArgumentException if some property of the specified * element prevents it from being added to this queue */ voidput(E e)throws InterruptedException; /** * Retrieves and removes the head of this queue, waiting if necessary * until an element becomes available. * * @return the head of this queue * @throws InterruptedException if interrupted while waiting */ E take()throws InterruptedException; }
/** * Serialization ID. This class relies on default serialization * even for the items array, which is default-serialized, even if * it is empty. Otherwise it could not be declared final, which is * necessary here. */ privatestaticfinallong serialVersionUID = -817911632652898426L;
/** The queued items ArrayBlockingQueue存放元素的容器是基于数组实现的 */ final Object[] items;
/** items index for next take, poll, peek or remove */ int takeIndex;
/** items index for next put, offer, or add */ int putIndex;
/** Number of elements in the queue ArrayBlockingQueue当前含有元素的个数,每次向队列里面放入1个元素,则count++,每次向队列里面取走1个元素,则count-- */ int count;
/* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /* 可以看到,ArrayBlockingQueue的相关put、take也即读写操作都是基于全局一把ReentrantLock,也即是互斥的,而非无锁实现,性能当然会比无锁队列要低。阻塞机制实现:使用一把锁和两个条件队列实现 notEmpty:消费者线程调用take发现队列元素为空时,被阻塞所在的条件队列 notFull:生产者线程调用put发现队列元素满时,被阻塞所在的条件队列 */ /** Main lock guarding all access */ final ReentrantLock lock;
/** Condition for waiting takes */ privatefinal Condition notEmpty;
/** Condition for waiting puts */ privatefinal Condition notFull; publicArrayBlockingQueue(int capacity, boolean fair){ if (capacity <= 0) thrownew IllegalArgumentException(); this.items = new Object[capacity]; // 初始化时可以选择存放元素的个数,以及ReentrantLock的公平模式,默认采用性能相对较高的非公平模式 lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
/** * Inserts the specified element at the tail of this queue, waiting * for space to become available if the queue is full. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ // 在队列尾部添加元素,且不支持添加null对象 publicvoidput(E e)throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; // 1、如果有多个put线程,它们之间需要竞争互斥锁,因此没lock成功的写线程都会进入lock对象关联的CLH阻塞队列里面等着 lock.lockInterruptibly(); try { // 2、就算此put线程抢到独占锁资源,但因为条件中“队列的元素个数已经满了”,此时put线程会被notFull.await()放入 notFull对象关联的条件队列,这既是ArrayBlockingQueue的put的阻塞设计实现 while (count == items.length) notFull.await(); // 3、执行流到这里,说明队列还没满,因此可以把元素放入队列,使用enqueue放入元素 enqueue(e); } finally { lock.unlock(); } }
/** * Inserts element at current put position, advances, and signals. * Call only when holding lock. */ // 注意:将元素入队,是指将元素放在队尾,而取出元素也即出队,是从队列头取出 privatevoidenqueue(E x){ // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; // 1、将元素放入putIndex下标指向的桶位,putIndex默认值为0,如果下一次要放入的下标已经达到数组长度,那么将putIndex重置为0,以便实现循环使用底层数组放置元素 items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; // 2、入队一个元素,计数器加1 count++; // 3、既然队列已经有刚添加的元素,那么可以通知在notEmpty条件队列等待的“消费者线程们” notEmpty.signal(); }
classBoundedBuffer{ // 设计一个有界的阻塞队列:一个独占锁关联两个条件队列 final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100]; // 内部指定此队列只能放置100个元素 int putptr, takeptr, count; // 这里的putptr 也即上面的putIndex,takeptr对应takeIndex
/** * Linked list node class */ staticclassNode<E> { E item;
/** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ // 注意由于容器链表的head节点是一个辅助节点,真正的数据头节点是first=head.next Node<E> next;
Node(E x) { item = x; } }
/** The capacity bound, or Integer.MAX_VALUE if none */ privatefinalint capacity; // 可以指定默认存放元素个数,最多只能到Integer.MAX_VALUE个
/** Current number of elements */ // 当前容器列表的元素个数,这里为何使用线程安全的原子计数器,而不是像ArrayBlockingQueue那样使用普通的count? 这里需要理解LinkedBlockingQueue双互斥锁设计才能明白其设计意图! privatefinal AtomicInteger count = new AtomicInteger();
/** * Head of linked list. * Invariant: head.item == null */ // 容器链表的头结点是head = new Node<E>(null),因此只需要判断head.item==null,说明此时位于head就是位于头节点位置 transient Node<E> head;
/** 源码注释清楚说明:采用了尾插法,也即生产者线程将元素放入容器链表尾部 * Inserts the specified element at the tail of this queue, waiting if * necessary for space to become available. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ publicvoidput(E e)throws InterruptedException { // 这里也说明LinkedBlockingQueue不支持放入null对象 if (e == null) thrownew NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node<E>(e); // 当然如果有多个生产者线程使用put,那么那些没抢到独占锁的生产者线程将阻塞在putLock关联的CLH阻塞队列里面。 final ReentrantLock putLock = this.putLock; // 1、因为存在两把锁:putLock和takeLock,意味着生产者线程和消费者可以同一时刻操作计数器count,因此采用AtomicInteger才能计数操作的线程安全 final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { // 2、通过当前容器链表的元素数量达到指定长度,说明已经满了,则调用put生产者线程将被阻塞在notFull关联的条件队列 while (count.get() == capacity) { notFull.await(); } // 3、执行流来到这里,说明此时容器链表可以放入元素,则在链表尾部追加即可 enqueue(node); // 4、添加元素后,对计时器加1并返回上一次计数值 c = count.getAndIncrement(); // 5、如果计时器+1还未超过指定长度,则唤醒可能在notFull条件队列阻塞的的生产者线程:当前容器链表还没满,你们可以醒来向此容器put入元素。 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } /* 6、首先c = count.getAndIncrement(),说明c是在容器链表添加元素之前的计数,如果c等于0,说明容器链表是空的,也说明可能在c=0时可能已经有take消费者线程阻塞在notEmpty的条件队列中, 而经过上面的3步骤后,既然已经入队了1个元素,那么就需要唤醒在c=0那段时刻被阻塞的消费者线程。 */ if (c == 0) signalNotEmpty(); }
/** * Links node at end of queue. */ privatevoidenqueue(Node<E> node){ // assert putLock.isHeldByCurrentThread(); // assert last.next == null; // 可以看到如果是链表结构,入队操作比基于数组实现的效率快很多 // last.next=node 将当前链表尾指针的next指向新节点node,从而使得node入队 // last=node 更新尾指针的指向最新的尾节点。 last = last.next = node; }
/** * Signals a waiting take. Called only from put/offer (which do not * otherwise ordinarily lock takeLock.) */ // 此方法也很简单,唤醒阻塞在takeLock对象下的条件队列的消费者线程节点 privatevoidsignalNotEmpty(){ final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
// 从容器链表头部的第一个数据节点取出元素(注意:这里说的是第一个数据节点,而不是head,因为head是一个辅助节点) public E take()throws InterruptedException { E x; int c = -1; // 1、因为存在两把锁:putLock和takeLock,意味着生产者线程和消费者可以同一时刻操作计数器count,因此采用AtomicInteger才能计数操作的线程安全 final AtomicInteger count = this.count; // 2、消费线程线程使用take锁,说明容器链表的put操作和take操作可以同时进行,而ArrayBlockingQueue是互斥的。当然如果有多个消费者线程使用take,那么那些没抢到独占锁的消费者线程将阻塞在takeLock关联的CLH阻塞队列里面。 final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { //3、 如果当前容器链表元素为0,也即无元素可取,那么消费者线程只能去notEmpty的条件队列阻塞等待 while (count.get() == 0) { notEmpty.await(); } // 4、来到这里说明容器是有元素的,从“队头”取出一个 x = dequeue(); // 5、从容器链表取出1个元素后,对计时器减1并返回上一次计数值 c = count.getAndDecrement(); /* 6、如果上一次计数值大于0,说明此刻容器队列还有元素可以去继续取,那么唤醒那些可能在notEmpty条件队列等待的消费线程:你们可以过来take元素。 注意:这里说的唤醒要求你确实掌握AQS对于signal的内部实现:signal只是将notEmpty条件队列中阻塞的第一个线程节点转移到takeLock关联的CLH阻塞队列,也即那个节点被转移到CLH阻塞队列还处于阻塞状态,直到当前线程执行完finally的unlock,此操作才会在AQS使用unparkSuccessor唤醒那个在takeLock关联的CLH阻塞队列的线程节点。 */ if (c > 1) notEmpty.signal(); } finally { // 7、 takeLock.unlock(); } /* 6、首先c = count.getAndIncrement(),说明c是在消费者取出元素之前的计数,如果c已经是指定容器长度,说明容器链表此时已满,也说明在c=capacity那一刻可能已经有put生产者线程阻塞在notFull的条件队列中, 而经过上面的4的出队操作后,既然已经容器已经扣减了1个元素,那么就需要唤醒c=capacity容器满的那段时刻被阻塞的生产者线程。 */ if (c == capacity) signalNotFull(); return x; }
/** * Removes a node from head of queue. * * @return the node */ private E dequeue(){ // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; // 取出第一个数据节点:h.next,注意:不是head,因为head=new Node(null)是一个辅助节点 Node<E> first = h.next; h.next = h; // help GC // 将head指针指向第一个数据节点并取出该节点存放的数据,然后将节点的item置为null,那么此时head指向的first就是一个新的辅助节点:first(item=null,next=下一个数据节点) head = first; E x = first.item; first.item = null; return x; }