概述
PriorityBlockingQueue是支持线程优先级排序的无界阻塞队列,默认排序是按自然序进行排序,也可以自定义实现compareTo()方法指定排序规则。但这种排序无法保证同优先级元素顺序。还有个兄弟PriorityQueue是优先级无界非阻塞队列。
两者的实现方式是一模一样的,都是采用基于数组的平衡二叉堆实现,不论入队的顺序怎么样,take、poll出队的节点都是按优先级排序的。但是PriorityBlockingQueue/PriorityQueue队列中的所有元素并不是在入队之后就已经全部按优先级排好序了,而是只保证head节点即队列的首个元素是当前最小或者说最高优先级的,其它节点的顺序并不保证是按优先级排序的(堆排序),PriorityBlockingQueue/PriorityQueue队列只会在通过take、poll取走head之后才会再次决出新的最小或者说最高优先级的节点作为新的head,其它节点的顺序依然不保证。所以通过peek拿到的head节点就是当前队列中最高优先级的节点。
无界并不是真的无界,而是通过扩容实现的无界。因为底层是数组实现,而数组是必须有个初始大小的,预分配内存空间。
源码分析
构造函数
public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.comparator = comparator;
this.queue = new Object[Math.max(1, initialCapacity)];
}
从构造函数可以看出来是给数组做了预分配空间的,因此这里无界并不是真的无界。comparator是优先级比较器。
put/offer - 非阻塞存方法
因为是无界的,因此存方法是不需要阻塞的,但是需要唤醒取方法的阻塞条件,因此还是需要预先加锁
lock.lock();
……
while ((n = size) >= (cap = (es = queue).length))
tryGrow(es, cap);
加锁后首先判断大小,如果不够了,调用扩容方法
if ((cmp = comparator) == null)
siftUpComparable(n, e, es);
else
siftUpUsingComparator(n, e, es, cmp);
插入的时候有一组特殊方法,用于比较优先级
size = n + 1;
notEmpty.signal();
最后执行队列长度加1,同时唤醒取线程
tryGrow - 扩容
lock.unlock();
Object[] newArray = null;
if (allocationSpinLock == 0 &&
ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {
……
}
if (newArray == null) // back off if another thread is allocating
Thread.yield();
首先把锁释放掉,然后通过CAS操作加个小锁,进行扩容,这里判断CAS操作返回结果,如果返回的false,说明没执行成功,有其他线程调整了,就去执行下面的Thread.yield(),主动把资源释放掉。
为什么要这么做?
为了提升性能,扩容是个麻烦的事情,如果扩容的时候一直持有lock锁,其他出队操作就无法执行。如果释放锁改用CAS,其他出入队操作就可以执行。因此这时候有可能有多个线程一起执行扩容,但最终只有一个能执行成功,执行不成功的就去执行yield方法释放cpu时间片,是为了让扩容线程更容易抢到锁,但是这里是不保证的。
try {
int growth = (oldCap < 64)
? (oldCap + 2) // grow faster if small
: (oldCap >> 1);
int newCap = ArraysSupport.newLength(oldCap, 1, growth);
if (queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
扩容流程,判断老容量小于64,则长度加2,如果大于,则一次性扩容50%
siftUpUsingComparator - 入队构造小顶堆
while (k > 0) {
// 取父节点(k - 1) >>> 1和对应元素e
int parent = (k - 1) >>> 1;
Object e = es[parent];
// 通过比较器比较父节点和新节点,优先级越小越高,比较如果当前元素优先级低于父节点
// 就直接退出,插入最后了,如果高于父节点,让新节点变成父节点,k取父索引继续循环比
if (cmp.compare(x, (T) e) >= 0)
break;
es[k] = e;
k = parent;
}
es[k] = x;
入队构造小顶堆时,初始值k是queue的size,即倒着比
构造一个假的小顶堆,为什么说是假的,因为插入的时候其实只比了新的那一个节点,并没有比较其他被打乱的节点
同时注意这里优先级值是越小优先级越高
dequeue - 出队方法
take、poll都是在dequeue方法基础上实现的阻塞/非阻塞
if ((result = (E) ((es = queue)[0])) != null) {
出队的时候默认出头节点,因为入队的时候已经做过小顶堆转换了,起码保证了堆顶优先级最高
final E x = (E) es[(n = --size)];
es[n] = null;
if (n > 0) {
final Comparator<? super E> cmp;
if ((cmp = comparator) == null)
siftDownComparable(0, x, es, n);
else
siftDownUsingComparator(0, x, es, n, cmp);
}
然后取当前的最后一个元素,同时把最后一个位置置空,拿队尾元素做一次出队排序
siftDownUsingComparator - 出队构造小顶堆
int half = n >>> 1;
while (k < half) {
出队构造小顶堆,初始值k是0,n是出队后的新长度,先找中间节点
循环条件是k<中间索引
// 找到左右孩子索引,和左孩子的元素
int child = (k << 1) + 1;
Object c = es[child];
int right = child + 1;
// 右孩子(如果存在)小于size,且左孩子元素优先级高于右孩子
if (right < n && cmp.compare((T) c, (T) es[right]) > 0)
// 选中右孩子
c = es[child = right];
// 当队尾节点优先级低于刚刚选中的孩子节点,退出,否则置换
if (cmp.compare(x, (T) c) <= 0)
break;
es[k] = c;
k = child;
思路是从堆顶开始,找左右孩子中优先级最低的那个跟堆尾比,优先级更低,置换到堆尾。这里的目的是找到一个低优先级的往堆尾放。
评论区