目 录CONTENT

文章目录

Condition

FatFish1
2024-10-23 / 0 评论 / 0 点赞 / 60 阅读 / 0 字 / 正在检测是否收录...

condition的概念和使用

condition用于实现线程之间的合作,执行Condition::awiat方法可以让某个线程进入condition等待状态,并等待其他线程释放Condition::signal信号。示例如下:

public class ReentrantLockDemo {
    static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        Condition condition = lock.newCondition();

        new Thread(() -> {
            lock.lock();
            try {
                System.out.println("线程一加锁成功");
                System.out.println("线程一执行await被挂起");
                condition.await();
                System.out.println("线程一被唤醒成功");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
                System.out.println("线程一释放锁成功");
            }
        }).start();

        new Thread(() -> {
            lock.lock();
            try {
                System.out.println("线程二加锁成功");
                condition.signal();
                System.out.println("线程二唤醒线程一");
            } finally {
                lock.unlock();
                System.out.println("线程二释放锁成功");
            }
        }).start();
    }
}

condition源码分析

condition也有一个FIFO的等待队列,是跟AQS的同步队列并存的。

private transient Node firstWaiter;
private transient Node lastWaiter;

他也有自己的队头指针和队尾指针。但是对于这个队列的处理没有使用CAS保证,这是因为condition是和lock绑定的,能操作这个队列那必定是获取了lock

condition的方法

condition提供的方法包括:

  • await():线程进入等待状态直到被通知(signal)或中断,当前线程将进入运行状态且从await()方法返回。两种情况:要么是通过interrupt()中断的,要么是获取到了Condition对象锁对应的锁。进入await状态的线程会释放持有的lock,这一点和Object的wait一致

  • awaitUninterruptibly():对中断不敏感的等待

  • awaitUntil(Date ddl):进入等待状态直到被通知或者到达某个设定时间

  • signal():唤醒一个在Condition上等待的线程,返回前必须拿到锁,这一点和Object的notify一致

  • signalAll():唤醒所有线程,但是只有拿到锁的才能真正返回

await - 执行condition等待

await方法首先拿当前线程构造节点,把生成的node添加到condition队列中。一个Condition对象创建一个condition队列。

Node node = addConditionWaiter();

int savedState = fullyRelease(node);

然后执行fullyRelease方法释放这个节点的锁。这个方法就是先获取state,然后调用锁自己实现的release方法释放。如果释放失败了,节点要变成cancelled状态。可见一旦触发condition::await,哪怕已经拿到锁的线程也要立即释放。

然后进入一段循环,循环条件是node还不在AQS同步队列中。

    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }

    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    return findNodeFromTail(node);

简单看下面的判断方法:

如果node的status是condition那一定不在,前驱为null那也一定不在,如果后继非null那一定在,否则就从尾向头找。

while循环的内容是park当前的线程,等待signal。

这里为什么要用while+判断队列,因为要防止本线程condition后,一定是不在AQS等待队列里面的,如果发现这时候又在了,那肯定是已经被signal并且插入到AQS等待队列了,具体逻辑见后面signal,这时就不需要park了,而是直接继续走就好了

在被signal信号唤醒后,调用checkInterruptWhileWaiting方法判断状态同时插入AQS等待队列。

这里判断中断是在判断什么?因为中断和signal都可能唤醒线程,因此该方法判断的是当前线程中断和signal哪个执行导致的唤醒

然后持续开始自旋获取锁,可以跳转AQS的部分

    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;

最后处理中断场景。

addConditionWaiter - 添加nodecondition队列

    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }

在这里先取等待队列最后一个node,t。如果t非空,且t的状态不是condition的,执行unlinkCancelledWaiters方法把所有的cancelled状态节点全干掉。干掉后让t指向最新的last节点。

然后把当前线程创建成一个condition节点插入到队列中。最后返回这个节点。

unlinkCancelledWaiters - 干掉condition中的非condition节点

    private void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != Node.CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            }
            else
                trail = t;
            t = next;
        }
    }

这里就好理解了,取first和移动标志trail,然后取first的next,first本身状态,决定trail怎么移动,如果t的状态不对,trail移动到next,如果t的状态对,trail指向t,t指向下一个。这样一直到最后把非condition的干掉。

signal - 唤醒condition队列中的第一个节点

    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);

首先判断持锁线程是不是当前线程。如果是,直接取condition队列中的头节点,通过doSignal给他做解锁。

doSignal - 唤醒单个contition节点

do {
    if ( (firstWaiter = first.nextWaiter) == null)
        lastWaiter = null;
    first.nextWaiter = null;
} while (!transferForSignal(first) &&
         (first = firstWaiter) != null);

调用transferForSignal,当执行不成功且头节点非空,重复执行

如果获取到的first非null,继续取后继节点

然后执行transferForSignal方法解锁,解不了重新执行上面的判断。

transferForSignal - 唤醒单个contition节点

这里面的逻辑就是把condition状态改为0状态,改成功后把这个头节点插入到AQS等待队列中

Node p = enq(node);

这里执行的是AQS的安全自旋入队的enq方法

然后执行unpark,让他从await方法的park部分继续执行。这里状态因为改为0,回到await的暂停位置判断循环条件就不会通过,从而能过跳出循环,开始申请锁。

    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus  Node.CONDITION || node.prev  null)
            return false;

唤醒完成。

condition的代码案例

阻塞队列的入队和出队操作就是condition的典型应用。

以LinkedBlockingQueue为例,阻塞的出队入队方法:take、put方法都各自持有一个Condition对象:notFull、notEmpty。

take方法先获取AQS锁,获取到锁后判断如果当前队空,立刻触发notEmpty.await对当前线程释放锁并且当前线程在等待队列中排队,直到有一个入队方法例如put、offer等执行singal方法,才唤醒Condition队列中的第一个等待线程去做取操作。

put方法同理。

具体代码参考LinkedBlockingQueue部分。

0

评论区