目 录CONTENT

文章目录

LinkedBlockingQueue - 基于链表的无界阻塞队列

FatFish1
2024-10-24 / 0 评论 / 0 点赞 / 66 阅读 / 0 字 / 正在检测是否收录...

概述

链式队列,此队列按照先进先出FIFO原则对元素进行排序,默认长度为Integer.MAX_VALUE。作为链表队列,就会有Node的概念,linkedBlockingQueuenode比较简单,只有元素本身及其下一个节点:

static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
}

node固定有一个head和一个tailhead.item=nulltail.next=null。分别提供了take/poll方法对head进行取操作,和offer/put操作对tail进行存操作,以及takeLockputLock对应取存锁,获取锁的条件即notEmptynotFull,分别提供了signalawait方法来判断条件。

链式阻塞队列的模型如下:

补充链式无界阻塞队列模型

链式阻塞队列的常用操作

自动阻塞的put和take

队列容量不足或为0时自动阻塞

  • void put(E e):自动阻塞,队列容量满后,自动阻塞。

  • E take():自动阻塞,队列容量为0后,自动阻塞。

非阻塞不抛异常返回结果的offer、poll、peek、drainTo方法

源码分析

LinkedBlockingQueue的核心成员变量

// 当前队列元素数量
private final AtomicInteger count = new AtomicInteger();
// 头节点、尾节点
transient Node<E> head;
private transient Node<E> last;
// 取锁和AQS条件-非空
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
// 存锁和AQS条件-非满
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();

take方法

final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();

先拿了队列长度和取锁。

这里先尝试对取锁进行加锁,防止同一时间有别的线程从队列中取。

lockInterruptibly方法即ReentrantLock中的监测线程中断的获取锁方法,调用点是AQS的acquireInterruptibly方法。该方法判断线程中断时终止获取锁。

如果获取不到锁,有AQS的存在,线程会直接park阻塞,直到获取锁后继续执行。

try {
    while (count.get() == 0) {
        notEmpty.await();
    }
    x = dequeue();
    c = count.getAndDecrement();
    if (c > 1)
        notEmpty.signal();
} finally {
    takeLock.unlock();
}

这里因为要从队列里面取任务,因此先取出count变量判断队列中有没有元素。如果没有,执行notEmpty.await,这里就到了AQS的condition中,如果条件不满足,执行await后进入condition队列,等待条件达成后唤醒。

被唤醒后说明此线程已经可以做take操作,因此完成出队和数量减1的操作,这里getAndDecrement方法的返回值是上一个值。此时再判断count数量,如果还有存货,调用signal方法唤醒下一个take线程。最后释放锁。

最后同时因为已经做过take了,这里再判断如果已经消费完了,同时唤醒一个存流程。

if (c == capacity)
    signalNotFull();

这里c为最大容量时,唤醒一个存线程。为什么c是最大容量时唤醒,因为上面getAndDecrement返回值是上一个值,说明是由满减1。

put方法

与take方法基本一致,都是先判断数量,同时获取存锁。在存锁加锁成功的条件下,判断存锁条件已满

try {
     while (count.get() == capacity) {
         notFull.await();
     }
     enqueue(node);
     c = count.getAndIncrement();
     if (c + 1 < capacity)
         notFull.signal();
 } finally {
     putLock.unlock();
 }

这里就是如果数量已达上限,put阻塞,被唤醒后入队,同时完成存入操作。如果还有余量,再唤醒一个put线程。

if (c == 0)
    signalNotEmpty();

最后如果c为0,因为getAndIncrement拿的也是上一个值,说明现在里面有等待的,唤醒一个取take线程。

polloffer

代码与take、put基本一致,差别就在于是否通过notFull和notEmpty做await操作,因此产生了阻塞操作和非阻塞操作的差异。虽然不阻塞,但是同样会唤醒阻塞线程。

0

评论区