简介
ConcurrentLinkedQueue是一个典型的非阻塞、无边界的线程安全队列,基于链接节点,采用CAS算法实现。CoucurrentLinkedQueue规定了如下几个不变形:
在入队的最后一个元素的next为null;
队列中所有未删除的节点的item都不能为null且都能从head节点遍历到;
对于要删除的节点,不能直接将其设置为null,而是先将其item域设置为null,迭代器会跳过item为null的节点;
允许head和tail更新滞后,就是说head、tail不总是指向第一个元素和最后一个元素;比如当前一个线程取到tail节点后,在操作前,另一个线程把tail更新了,此时第一个线程的tail就不是指向最后一个元素,也就是说tail更新滞后了。
head的不可变和可变性:
不可变
所有未删除的节点都可以通过head节点遍历到;
head不能为null;
head节点的next不能指向自身;
可变性
head的item可能为null,也可能不为null;
允许tail滞后head;也就是调用succ()方法
源码分析
Node内部类
无界队列的节点抽象
volatile E item;
volatile Node<E> next;
两个核心成员变量都是使用volatile声明的,说明其具备线程安全性
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
jdk8版本构造函数使用UNSAFE包进行约束,17版本已经切换到VarHandle
核心成员变量
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
一头一尾两个指针
ConcurrentLinkedQueue就是通过头尾指针移动进行链表的维护和遍历
offer - 添加方法
因为ConcurrentLinkedQueue是非阻塞的,因此添加失败是没有走park那套流程的,因此肯定不是基于AQS实现的,没有锁,也不存在唤醒。
为了保证线程安全,采用的是CAS操作+无限自旋。
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
……
}
自旋条件初始化是取tail尾,赋值给p,没有判断条件,没有增长算法,意味着除非CAS执行成功,否则永不返回。
if (q == null) {
if (p.casNext(null, newNode)) {
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
第一个对象入队,tail的next一定为null,做CAS操作,让p.next指向新节点。结果:head=tail,tail.next=newNode。因为这时p指向tail即p=t,不执行尾节点的更新操作,直接返回return true
补充图片concurrentqueue入队
第二个对象入队,这时q是p的next,肯定非null,且q != p,走最后一个else
else
p = (p != t && t != (t = tail)) ? t : q;
这一步是找队尾。核心是这个判断条件p != t && t != (t = tail)
首先p != t
初始的条件是p=t,这里就一定不成立,使p=q即往后移动一位
p做过移动,这里就成立
t != (t = tail)这个条件比较的是t和tail值,即t的初始值和tail,比完把tail赋值给t。但是初始化的时候t=tail,什么情况下t的初始值会不等于tail呢?其实是多线程,别的线程把tail改了,本线程的t还是老tail,这里其实就是判断多线程并发。
11: getstatic #10 // Field t:I
14: getstatic #10 // Field t:I
17: if_icmpeq 24
可见,这里是两次把t压入栈顶,比较两个栈顶元素,因此t = tail虽然执行了,但是也改变不了第一次入栈的t的值
综合来看,这个判断的逻辑是:
如果别的线程并发修改了tail,那么将tail赋值给p
如果没有并发修改,判断p和t,如果是第二次入队,p与t相等,就将q赋值给p
然后再走下一次循环执行q=p.next,如果这时p移动到了队尾,这里q就又是null了,执行第一个if,做CAS插入,如果不是队尾,再判断p != t && t != (t = tail),这一轮判断p!=t就成立了,在没有并发的情况下,后者也成立,就把t赋值给p,如果有并发,也把t赋值给p
然后再走下一次循环,继续判断q=p.next是否为null,直到找到队尾,走CAS插入
if (q == null) {
if (p.casNext(null, newNode)) {
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
插入后p!=t成立,因为这时p是真队尾,tail是假的,然后执行CAS操作修改tail指针。
补充图片concurrentqueue入队
可见入队操作,两个元素为1圈,tail不恒指向队尾。
为什么这样?
假如保证tail恒指向队尾,代码可以修改为
for (;;) {
Node<E> t = tail;
if (t.casNext(null, n) && casTail(t, n)) {
return true;
}
}
如果让tail恒指向队尾,每次插入就都需要指向两个操作:casNext、casTail,两个cas操作的性能势必不如1次/2次间隔,这样性能提升几乎50%
poll - 出队
出队因为没有阻塞,也是通过for循环死循环自旋实现的
首先假设一个场景,四个元素,一个head哨兵:
补充图片concurrentqueue出队
restartFromHead: for (;;) {
for (Node<E> h = head, p = h, q;; p = q) {
最外层是死循环强制自旋,第二层是指针移动的循环,初始条件是head值,让p=head,无循环条件,每次执行循环让p的指针指向q
if ((item = p.item) != null && p.casItem(item, null)) {
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
第一个if,即判断p指向的节点不是空节点(不是head哨兵),同时使用CAS设置p.item为null能成功,即取出了队头元素。如果这个时候h=head不指向对头,这里重置队头到下一个实际节点
补充图片concurrentqueue出队
但如果是第一圈循环,p一开始指向的是哨兵,这里就不成立,走第二个if
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
第二个if让q=p.next,然后判空,如果成立,即p没有下一个节点了,此时场景哨兵后面啥也没有,所以这里刷信头节点,返回null
else if (p == q)
continue restartFromHead;
如果判空不成立,走第三个if,这时肯定是不成立的,因为q=p.next,这时就不从外圈循环开始,而是从内圈开始。循环一次,让p=q。如果此时再跑第一个if成功取出了,这时p!=h成立,更新头节点并返回即可。
补充图片concurrentqueue出队
下次再弹节点,因为第二个for循环中,p=h指向一个实际节点,而非哨兵,第一个if判断p.item!=null能成立,直接就可以取出去了,这时p=h也成立,因此不需要再更新head直接返回。
从这个流程可以看出,两次弹出,移动一次head
评论区