含义
CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
与CountDownLatch的区别
CountDownLatch只能用一次,CyclicBarrier可以使用reset()方法重置
CountDownLatch强调的是一等多,即一个线程阻塞,等一堆子线程执行完,CyclicBarrier强调的是多等一,即多个线程阻塞,等最后一个线程执行完再一起执行
CyclicBarrier源码分析
核心成员变量
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
基于AQS的ReentrantLock和Condition
private int count;
计数
private Generation generation = new Generation();
代际标志,因为CyclicBarrier是可以重置的,因此需要一个generation对象标志是不是还在这一代内。
构造函数
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
……
this.barrierCommand = barrierAction;
}
提供了两个构造函数,一个带Runnable,一个不带
将Runnable类型对象barrierAction给到barrierCommand变量,用于后续激活阻塞线程,如果有这个变量,优先激活barrierAction,用于一些特殊处理
await/dowait - 阻塞方法
int index = --count;
if (index == 0) { // tripped
Runnable command = barrierCommand;
if (command != null) {
try {
command.run();
} catch (Throwable ex) {
breakBarrier();
throw ex;
}
}
nextGeneration();
return 0;
}
可以看到,每个线程执行await时,都会让计数index减1,直到最后一个线程过来,index减为0,执行if语句段的内容。
if语句内其实是调用barrierCommand的run方法,因此由最后一个到达屏障的线程执行barrierCommand,这里有两个结果:
如果barrierCommand抛异常了,调用breakBarrier方法,唤醒前面阻塞的线程,同时重置屏障计数,同时将当前这一个generation设置为broken
如果barrierCommand正常完成,调用nextGeneration方法,更新代际标志,重置计数器,唤醒前面阻塞的线程
如果来的不是最后一个线程呢,又会做什么?
跳过上面的if语句,执行阻塞流程:
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// part1.
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
// part2.
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
因为刚刚判断count非0,因此自己不是最后一个到达屏障的,先执行trip.await()阻塞自己,并且释放锁。等最后一个线程来了唤醒所有线程,拿到锁,继续跑,如果正常没有InterruptedException,走到part2
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
首先判断自己所处的代际是不是已经被改成broken了,如果是,自己也不处理了,直接抛异常;如果不是,判断当前generation是不是和自己是一个代际,如果不是,说明最后一个线程已经跑完了开始执行下一个代际了,自己就正常返回,结束阻塞即可
如果自己在阻塞过程中出现了InterruptedException,走到part1
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
这里看下自己所处的代际是不是当前代际,且自己所处的代际不能broken,这时判断是因为自己导致的当前代际失败,因此要主动调用breakBarrier方法,释放broken信号。否则,直接中断自己就可以了。
注意以下场景:
// 场景1
CyclicBarrier cb = new CyclicBarrier(2)
new Thread({ cb.await() }).start();
cb.await()
// 场景2
CyclicBarrier cb = new CyclicBarrier(2, new Runnable({}))
new Thread({ cb.await() }).start();
cb.await()
场景1中,CyclicBarrier没有action,因此最后一个到达屏障的线程直接唤醒前面阻塞的,主线程和子线程谁先谁后,完全由CPU分配时间片决定
场景2中,有action,最后一个达到屏障的线程需要执行Runnable中的方法,具体谁最后一个达到,由CPU分配时间片决定,因此最终顺序是Runnable最先完成,然后最后到达的线程执行完,然后第一个到达的执行完成
评论区