condition的概念和使用
condition用于实现线程之间的合作,执行Condition::awiat方法可以让某个线程进入condition等待状态,并等待其他线程释放Condition::signal信号。示例如下:
public class ReentrantLockDemo {
static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
Condition condition = lock.newCondition();
new Thread(() -> {
lock.lock();
try {
System.out.println("线程一加锁成功");
System.out.println("线程一执行await被挂起");
condition.await();
System.out.println("线程一被唤醒成功");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
System.out.println("线程一释放锁成功");
}
}).start();
new Thread(() -> {
lock.lock();
try {
System.out.println("线程二加锁成功");
condition.signal();
System.out.println("线程二唤醒线程一");
} finally {
lock.unlock();
System.out.println("线程二释放锁成功");
}
}).start();
}
}
condition源码分析
condition也有一个FIFO的等待队列,是跟AQS的同步队列并存的。
private transient Node firstWaiter;
private transient Node lastWaiter;
他也有自己的队头指针和队尾指针。但是对于这个队列的处理没有使用CAS保证,这是因为condition是和lock绑定的,能操作这个队列那必定是获取了lock。
condition的方法
condition提供的方法包括:
await()
:线程进入等待状态直到被通知(signal)或中断,当前线程将进入运行状态且从await()方法返回。两种情况:要么是通过interrupt()中断的,要么是获取到了Condition对象锁对应的锁。进入await状态的线程会释放持有的lock,这一点和Object的wait一致awaitUninterruptibly()
:对中断不敏感的等待awaitUntil(Date ddl)
:进入等待状态直到被通知或者到达某个设定时间signal()
:唤醒一个在Condition上等待的线程,返回前必须拿到锁,这一点和Object的notify一致signalAll()
:唤醒所有线程,但是只有拿到锁的才能真正返回
await - 执行condition等待
await方法首先拿当前线程构造节点,把生成的node添加到condition队列中。一个Condition对象创建一个condition队列。
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
然后执行fullyRelease方法释放这个节点的锁。这个方法就是先获取state,然后调用锁自己实现的release方法释放。如果释放失败了,节点要变成cancelled状态。可见一旦触发condition::await,哪怕已经拿到锁的线程也要立即释放。
然后进入一段循环,循环条件是node还不在AQS同步队列中。
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
简单看下面的判断方法:
如果node的status是condition那一定不在,前驱为null那也一定不在,如果后继非null那一定在,否则就从尾向头找。
while循环的内容是park当前的线程,等待signal。
这里为什么要用while+判断队列,因为要防止本线程condition后,一定是不在AQS等待队列里面的,如果发现这时候又在了,那肯定是已经被signal并且插入到AQS等待队列了,具体逻辑见后面signal,这时就不需要park了,而是直接继续走就好了。
在被signal信号唤醒后,调用checkInterruptWhileWaiting方法判断状态同时插入AQS等待队列。
这里判断中断是在判断什么?因为中断和signal都可能唤醒线程,因此该方法判断的是当前线程中断和signal哪个执行导致的唤醒。
然后持续开始自旋获取锁,可以跳转AQS的部分
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
最后处理中断场景。
addConditionWaiter - 添加node到condition队列
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
在这里先取等待队列最后一个node,t。如果t非空,且t的状态不是condition的,执行unlinkCancelledWaiters方法把所有的cancelled状态节点全干掉。干掉后让t指向最新的last节点。
然后把当前线程创建成一个condition节点插入到队列中。最后返回这个节点。
unlinkCancelledWaiters - 干掉condition中的非condition节点
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
这里就好理解了,取first和移动标志trail,然后取first的next,first本身状态,决定trail怎么移动,如果t的状态不对,trail移动到next,如果t的状态对,trail指向t,t指向下一个。这样一直到最后把非condition的干掉。
signal - 唤醒condition队列中的第一个节点
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
首先判断持锁线程是不是当前线程。如果是,直接取condition队列中的头节点,通过doSignal给他做解锁。
doSignal - 唤醒单个contition节点
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
调用transferForSignal,当执行不成功且头节点非空,重复执行
如果获取到的first非null,继续取后继节点
然后执行transferForSignal方法解锁,解不了重新执行上面的判断。
transferForSignal - 唤醒单个contition节点
这里面的逻辑就是把condition状态改为0状态,改成功后把这个头节点插入到AQS等待队列中
Node p = enq(node);
这里执行的是AQS的安全自旋入队的enq方法
然后执行unpark,让他从await方法的park部分继续执行。这里状态因为改为0,回到await的暂停位置判断循环条件就不会通过,从而能过跳出循环,开始申请锁。
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus Node.CONDITION || node.prev null)
return false;
唤醒完成。
condition的代码案例
阻塞队列的入队和出队操作就是condition的典型应用。
以LinkedBlockingQueue为例,阻塞的出队入队方法:take、put方法都各自持有一个Condition对象:notFull、notEmpty。
take方法先获取AQS锁,获取到锁后判断如果当前队空,立刻触发notEmpty.await对当前线程释放锁并且当前线程在等待队列中排队,直到有一个入队方法例如put、offer等执行singal方法,才唤醒Condition队列中的第一个等待线程去做取操作。
put方法同理。
具体代码参考LinkedBlockingQueue部分。
评论区