AQS概述
AQS(AbstractQueuedSynchronizer),所谓的AQS即是抽象的队列式的同步器,内部定义了很多锁相关的方法。AQS框架包括以下部分:
countDownLatch:计数器
reentrantLock:锁
reentrantReadWriteLock:读写锁
cyclleBarrier:计数器
semaphore:信号量
AQS的实现原共享资源状态(volatile int state)+FIFO线程等待队列。这里volatile能够保证多线程下的可见性,当state=1则代表当前对象锁已经被占有,其他线程来加锁时则会失败,加锁失败的线程会被放入一个FIFO的等待队列中,被UNSAFE.park()操作挂起,等待其他获取锁的线程释放锁才能够被唤醒。
AbstractQueuedSynchronizer
AQS的核心设计思路就是一个volatile的共享变量state和一个volatile修饰的FIFO队列,这两个都是线程安全的,能够在释放和获取锁时被正确改变。
// 阻塞队列的头节点
private transient volatile Node head;
// 阻塞队列的尾节点
private transient volatile Node tail;
// 锁状态
private volatile int state;
AbstractQueuedSynchronizer继承自AbstractOwnableSynchronizer,继承了持锁对象成员变量
private transient Thread exclusiveOwnerThread;
但由于AQS本身提供的是锁竞争的逻辑,不是真正的锁,所以此对象少有体现。
AQS有一个内部类Node,观察Node的成员变量:
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
private volatile int state;
private transient volatile Node head;
private transient volatile Node tail;
可以看到node的设计是一个双向的链表,同时保有其加锁线程,以及状态。同时还有一个直接指向头尾的head、tail。
这里的waitStatus是FIFO队列里面等待加锁的元素的状态,有以下几种:
SIGNAL:-1,表示待加锁任务的线程处于park状态
CANCELLED:1,表示待加锁线程已取消
CONDITION:-2,表示待加锁线程条件等待
PROPAGATE:-3
独占锁的申请流程和阻塞队列元素存入
在锁的申请流程中,acquire通过调用子类(真正的锁)的tryAcquire方法获取锁,具体怎么获取,看子类怎么实现的。如果获取不到,则把当前的任务塞到阻塞队列中做队尾。同时还要拿到这个node的前驱,如果前驱是head,尝试再次加锁。如果加锁加不上,或前驱不是head,则将前驱prev的状态修改为SIGNAL,同时线程挂起。
acquire - 获取锁方法架构
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquire方法调用tryAcquire尝试获取锁,如果获取不到,则调用加入阻塞队列。tryAcquire就是父类的抽象方法,留给子类去实现具体怎么获取锁。
如果获取锁失败了,先执行addWaiter添加阻塞队列。添加队列完成后执行acquireQueued方法让这个新节点以死循环的方式获取锁,获取不到进入阻塞状态。
addWaiter - 把当前节点添加到阻塞队列尾部
Node pred = tail;
if (pred != null) {
……
}
enq(node);
这里比较简单就是拿到tail,然后让新加入的node做tail,新node的prev是前tail。如果tail是null,执行enq插入,enq方法执行的是死循环的形式插入,确保Node一定能正确添加到尾部。
for (;;) {
……
compareAndSetTail(t, node)
}
acquireQueued - 自旋申请锁
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
node是addWaiter添加到队列里面的那个没拿到锁的新任务。
node.predecessor是为了获取node的前驱p。这里判断,如果当前节点的前驱p是头结点head,然后让当前线程执行tryAcquire不阻塞申请锁。
如果申请到了,让node自己变成head,同时执行p.next=null是为了让p引用释放,便于垃圾回收。
如果没申请到,执行shouldParkAfterFailedAcquire方法修改状态。然后暂停线程。
这里是个永不停的for循环,一旦par掉,for循环暂停。直到后面release流程把head的后继激活,如果就是它,则继续推动它的head进行获取锁,如果不是它,那它还是park在这,继续等着。
这里为什么前驱是头节点才能获取锁?答:
头节点是已经成功获取到同步状态的节点,而头节点释放同步状态后,会唤醒后继节点,每个被唤醒的节点都检查自己的前驱是否是头
维护同步队列的FIFO原则,因为是先进先出。
shouldParkAfterFailedAcquire - 修改队列元素状态并判断是否满足park条件
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
这里拿pred的waitStatus,如果以及是signal了,就不再执行了直接return true。
如果waitStatus > 0,意味着前驱已经被取消了,则循环找前驱的前驱,把前面所有状态>0的全部丢弃,直到非取消状态的,配置为node的前驱,并且修改waitStatus为signal
parkAndCheckInterrupt暂停线程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
这里就是把当前线程挂起了。park方法内部执行了线程的暂停。
独占锁的释放流程和阻塞队列元素的清理
执行release操作也是调用到子类锁中的tryRelease,判断锁释放后,执行唤醒当前线程的方法,被唤醒的等锁线程会继续其park的地方继续执行,即又回到了acquireQueued方法的无限for循环中。
release - 锁释放骨架
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
这里的tryRelease还是留给子类锁去实现的。如果释放锁成功了,则拿阻塞队列中的head,如果head的状态非0(大概率为SIGNAL -1)执行unparkSuccessor方法激活它。
unparkSuccessor - 激活后继节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
传进来的node是head,如果head的waitStatus是<0的,可以直接set为0,然后往后找。
取head的后继s,如果s为null或s的waitStatus>0,证明为cancel状态,则从tail往前推,直到推到t为head,或t为null停止,找到的最后一个t就是head需要指向的后继。
此时调用unpark方法激活s的线程,从前面挂起的地方(acquireQueued)继续执行
共享锁的获取和释放操作
共享锁获取与独占锁的区别在于,同一时刻是否能有多个线程同时获取到同步状态。
acquireShared - 共享锁获取锁骨架
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
这里也是先调用到锁自行实现的tryAcquireShared方法,区别就在于try方法的返回,共享锁返回>0说明是获取到,如果<0说明获取不到,然后调用doAcquireShared尝试自旋阻塞获取。
doAcquireShared - 共享锁自旋阻塞获取锁
for (;;) {
……
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
……
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
共享锁的自旋阻塞流程,可以看到,如果p是头节点,就可以去非阻塞获取锁,如果获取到了,这里调用setHeadAndPropagate方法把它设置为头节点,但这里会判断后面是否都是共享,如果是,立刻释放锁,给下一个获取
如果p不是头,才进入Park逻辑
setHeadAndPropagate - 共享锁设置node状态
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
这里与独占锁的区别在于,独占锁只是做setHead操作,但是共享锁还需要判断下一个节点也是共享,就直接通过doReleaseShared方法通知后面的节点继续抢锁
releaseShared - 共享锁释放锁骨架
if (tryReleaseShared(arg)) {
doReleaseShared();
……
非阻塞释放,如果成功,调用doReleaseShared方法唤醒后续节点,在doReleaseShared方法中也是获取头节点,通过无限自旋和CAS操作做状态修改,调用unparkSuccessor唤醒后置节点
独占式超时锁获取和释放
doAcquireNanos - 阻塞获取独占式超时锁
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
addWaiter的流程都一样,但是这里生成了一个deadline,用于判断超时
获取锁调用tryAcquire都一样,到获取不到判断超时的逻辑
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
首先如果deadline - 当前时间是负的,说明已经超时了,不用自旋直接返回了
否则,判断是否阻塞,跟独占锁一样,如果需要阻塞,还需要判断一下超时时间和超时时间阈值,如果nanosTimeout小于等于spinForTimeoutThreshold(1000纳秒)时,将不会使该线程进行超时等待,而是进入快速的自旋过程。原因在于,非常短的超时等待无法做到十分精确,如果这时再进行超时等待,相反会让nanosTimeout的超时从整体上表现得反而不精确。
因此超时时间很短的场景下,同步器会进入无条件的快速自旋。
AQS阻塞队列的其他操作
hasQueuedPredecessors - 判断队列是否有节点
判断思路如下:
第一个if判断head是否为null,为null直接返回fale没有节点,如果head非null判断第二个if
第二个if判断head.next(s)为null,或head.next的waitStatus是取消状态,如果第二个if判断失败,则判断第三个if。如果判断成功,从tail往前找,直到找到head或找到一个不为null的节点给s,再判断第三个if。
第三个if判断s非null,且s的所属线程不是当前线程,说明是等待状态的,证明队列有节点。这里s要么是第二个if中的head.next,要么是从tail往前找到的非null的节点。
if (s != null && s.thread != Thread.currentThread())
这里第三个if再次判断s非空,是为了防止enq存在并发操作。因为前面的入对操作,第一个线程先让tail = node,再让head.next = node,执行完第一步,第二步还没执行的时候,第二个线程来了正好判断head.next=null
transferAfterCancelledWait - 判断中断状态
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
final boolean transferAfterCancelledWait(Node node) {
//使用cas修改节点状态,如果还能修改成功,说明线程被中断时,signal还没有被调用。
// 这里要注意,就是线程被唤醒,并不一定是在java层面执行了locksupport.unpark
// 也可能是调用了线程的interrupt()方法,这个方法会更新一个中断标识,并且会唤醒处于阻塞状态下的线程。
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node); //如果cas成功,则把
node添加到AQS队列
return true;
}
//如果cas失败,则判断当前node是否已经在AQS队列上,如果不在,则让给其他线程执行
//当node被触发了signal方法时,node就会被加到 aqs队列上 while (!isOnSyncQueue(node))
//循环检测node是否已经成功添加到AQS队列中。
// 如果没有,则通过yield, 释放资源
Thread.yield();
return false;
}
check方法是判断线程是否中断,中断的话执行transfer方法判断抛异常还是继续中断
isHeldExclusively - 判断持锁线程是不是当前线程
是一个留给锁实现类自己去实现的判断方法。它用于判断持锁线程是不是当前的线程。
acquireInterruptibly - 监测当前线程是否中断的加锁方法
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
该方法会先判断当前线程是否是中断状态,如果是直接抛出中断异常,放弃取锁。如果拿不到锁,这里的入队和等待方法同样也是支持线程中断监测的。
doAcquireInterruptibly - 监测当前线程是否中断的入队方法
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
与acquire方法不同的点在于,该方法在park时会检查线程的Interrupt状态,如果是interrupt的,同样抛出中断异常。
评论区