概述
链式队列,此队列按照先进先出FIFO原则对元素进行排序,默认长度为Integer.MAX_VALUE。作为链表队列,就会有Node的概念,linkedBlockingQueue的node比较简单,只有元素本身及其下一个节点:
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
node固定有一个head和一个tail,head.item=null,tail.next=null。分别提供了take/poll方法对head进行取操作,和offer/put操作对tail进行存操作,以及takeLock和putLock对应取存锁,获取锁的条件即notEmpty和notFull,分别提供了signal和await方法来判断条件。
链式阻塞队列的模型如下:
补充链式无界阻塞队列模型
链式阻塞队列的常用操作
自动阻塞的put和take
队列容量不足或为0时自动阻塞
void put(E e)
:自动阻塞,队列容量满后,自动阻塞。E take()
:自动阻塞,队列容量为0后,自动阻塞。
非阻塞不抛异常返回结果的offer、poll、peek、drainTo方法
源码分析
LinkedBlockingQueue的核心成员变量
// 当前队列元素数量
private final AtomicInteger count = new AtomicInteger();
// 头节点、尾节点
transient Node<E> head;
private transient Node<E> last;
// 取锁和AQS条件-非空
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
// 存锁和AQS条件-非满
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
take方法
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
先拿了队列长度和取锁。
这里先尝试对取锁进行加锁,防止同一时间有别的线程从队列中取。
lockInterruptibly方法即ReentrantLock中的监测线程中断的获取锁方法,调用点是AQS的acquireInterruptibly方法。该方法判断线程中断时终止获取锁。
如果获取不到锁,有AQS的存在,线程会直接park阻塞,直到获取锁后继续执行。
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
这里因为要从队列里面取任务,因此先取出count变量判断队列中有没有元素。如果没有,执行notEmpty.await,这里就到了AQS的condition中,如果条件不满足,执行await后进入condition队列,等待条件达成后唤醒。
被唤醒后说明此线程已经可以做take操作,因此完成出队和数量减1的操作,这里getAndDecrement方法的返回值是上一个值。此时再判断count数量,如果还有存货,调用signal方法唤醒下一个take线程。最后释放锁。
最后同时因为已经做过take了,这里再判断如果已经消费完了,同时唤醒一个存流程。
if (c == capacity)
signalNotFull();
这里c为最大容量时,唤醒一个存线程。为什么c是最大容量时唤醒,因为上面getAndDecrement返回值是上一个值,说明是由满减1。
put方法
与take方法基本一致,都是先判断数量,同时获取存锁。在存锁加锁成功的条件下,判断存锁条件已满
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
这里就是如果数量已达上限,put阻塞,被唤醒后入队,同时完成存入操作。如果还有余量,再唤醒一个put线程。
if (c == 0)
signalNotEmpty();
最后如果c为0,因为getAndIncrement拿的也是上一个值,说明现在里面有等待的,唤醒一个取take线程。
poll和offer
代码与take、put基本一致,差别就在于是否通过notFull和notEmpty做await操作,因此产生了阻塞操作和非阻塞操作的差异。虽然不阻塞,但是同样会唤醒阻塞线程。
评论区