目 录CONTENT

文章目录

PriorityBlockingQueue/PriorityQueue - 优先级无界队列

FatFish1
2024-10-24 / 0 评论 / 0 点赞 / 47 阅读 / 0 字 / 正在检测是否收录...

概述

PriorityBlockingQueue是支持线程优先级排序的无界阻塞队列,默认排序是按自然序进行排序,也可以自定义实现compareTo()方法指定排序规则。但这种排序无法保证同优先级元素顺序。还有个兄弟PriorityQueue是优先级无界非阻塞队列

两者的实现方式是一模一样的,都是采用基于数组的平衡二叉堆实现,不论入队的顺序怎么样,take、poll出队的节点都是按优先级排序的。但是PriorityBlockingQueue/PriorityQueue队列中的所有元素并不是在入队之后就已经全部按优先级排好序了,而是只保证head节点即队列的首个元素是当前最小或者说最高优先级的,其它节点的顺序并不保证是按优先级排序的(堆排序),PriorityBlockingQueue/PriorityQueue队列只会在通过take、poll取走head之后才会再次决出新的最小或者说最高优先级的节点作为新的head其它节点的顺序依然不保证。所以通过peek拿到的head节点就是当前队列中最高优先级的节点。

无界并不是真的无界,而是通过扩容实现的无界。因为底层是数组实现,而数组是必须有个初始大小的,预分配内存空间

源码分析

构造函数

public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.comparator = comparator;
    this.queue = new Object[Math.max(1, initialCapacity)];
}

从构造函数可以看出来是给数组做了预分配空间的,因此这里无界并不是真的无界。comparator是优先级比较器。

put/offer - 非阻塞存方法

因为是无界的,因此存方法是不需要阻塞的,但是需要唤醒取方法的阻塞条件,因此还是需要预先加锁

lock.lock();
……
while ((n = size) >= (cap = (es = queue).length))
    tryGrow(es, cap);

加锁后首先判断大小,如果不够了,调用扩容方法

if ((cmp = comparator) == null)
    siftUpComparable(n, e, es);
else
    siftUpUsingComparator(n, e, es, cmp);

插入的时候有一组特殊方法,用于比较优先级

size = n + 1;
notEmpty.signal();

最后执行队列长度加1,同时唤醒取线程

tryGrow - 扩容

lock.unlock();
Object[] newArray = null;
if (allocationSpinLock == 0 &&
    ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {
    ……
}
if (newArray == null) // back off if another thread is allocating
    Thread.yield();

首先把锁释放掉,然后通过CAS操作加个小锁,进行扩容,这里判断CAS操作返回结果,如果返回的false,说明没执行成功,有其他线程调整了,就去执行下面的Thread.yield(),主动把资源释放掉。

为什么要这么做?

为了提升性能,扩容是个麻烦的事情,如果扩容的时候一直持有lock锁,其他出队操作就无法执行。如果释放锁改用CAS,其他出入队操作就可以执行。因此这时候有可能有多个线程一起执行扩容,但最终只有一个能执行成功,执行不成功的就去执行yield方法释放cpu时间片,是为了让扩容线程更容易抢到锁,但是这里是不保证的

try {
    int growth = (oldCap < 64)
        ? (oldCap + 2) // grow faster if small
        : (oldCap >> 1);
    int newCap = ArraysSupport.newLength(oldCap, 1, growth);
    if (queue == array)
        newArray = new Object[newCap];
} finally {
    allocationSpinLock = 0;
}

扩容流程,判断老容量小于64,则长度加2,如果大于,则一次性扩容50%

siftUpUsingComparator - 入队构造小顶堆

while (k > 0) {
    // 取父节点(k - 1) >>> 1和对应元素e
    int parent = (k - 1) >>> 1;
    Object e = es[parent];
    // 通过比较器比较父节点和新节点,优先级越小越高,比较如果当前元素优先级低于父节点
// 就直接退出,插入最后了,如果高于父节点,让新节点变成父节点,k取父索引继续循环比
    if (cmp.compare(x, (T) e) >= 0)
        break;
    es[k] = e;
    k = parent;
}
es[k] = x;

入队构造小顶堆时,初始值k是queue的size,即倒着比

构造一个假的小顶堆,为什么说是假的,因为插入的时候其实只比了新的那一个节点,并没有比较其他被打乱的节

同时注意这里优先级值是越小优先级越高

dequeue - 出队方法

take、poll都是在dequeue方法基础上实现的阻塞/非阻塞

if ((result = (E) ((es = queue)[0])) != null) {

出队的时候默认出头节点,因为入队的时候已经做过小顶堆转换了,起码保证了堆顶优先级最高

final E x = (E) es[(n = --size)];
es[n] = null;
if (n > 0) {
    final Comparator<? super E> cmp;
    if ((cmp = comparator) == null)
        siftDownComparable(0, x, es, n);
    else
        siftDownUsingComparator(0, x, es, n, cmp);
}

然后取当前的最后一个元素,同时把最后一个位置置空,拿队尾元素做一次出队排序

siftDownUsingComparator - 出队构造小顶堆

int half = n >>> 1;
while (k < half) {

出队构造小顶堆,初始值k是0,n是出队后的新长度,先找中间节点

循环条件是k<中间索引

// 找到左右孩子索引,和左孩子的元素
int child = (k << 1) + 1;
Object c = es[child];
int right = child + 1;
// 右孩子(如果存在)小于size,且左孩子元素优先级高于右孩子
if (right < n && cmp.compare((T) c, (T) es[right]) > 0)
    // 选中右孩子
    c = es[child = right];
// 当队尾节点优先级低于刚刚选中的孩子节点,退出,否则置换
if (cmp.compare(x, (T) c) <= 0)
    break;
es[k] = c;
k = child;

思路是从堆顶开始,找左右孩子中优先级最低的那个跟堆尾比,优先级更低,置换到堆尾。这里的目的是找到一个低优先级的往堆尾放

0

评论区