什么是Fork/Join框架
出现于jdk7,它的作用是把一个大任务分割成若干小任务,最终汇总每个小任务结果后得到大任务结果。其中:
Fork:切分大任务成若干子任务执行
Join:合并子任务结果
含义其实比较类似于MapReduce
工作窃取算法
工作窃取算法指的是某个线程从其他队列里窃取任务来执行。
有时处理一个大任务,可能把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放在不同的队列里面,用单独的线程执行。
但有些时候不是所有任务的执行压力都一样大,可能有的队列任务都执行完了,有的还有很多,这时候执行完的可以从没执行的队列里面捞任务
工作窃取算法一般会用到双端队列,被窃取任务的线程永远从双端队列头
Fork/Join框架编程流程
ForkJoinTask
Fork/Join框架中执行的Task是ForkJoinTask,它的两个子类分别是RecursiveAction
和RecursiveTask
,分别用于没有返回和有返回的任务,类似线程池中的runnable
和callable
ForkJoinPool
ForkJoinTask使用的线程池是ForkJoinPool,固定使用submit方法,返回值是ForkJoinTask,使用类似于Future接口,使用ForkJoinTask#get
方法可以获取异步执行结果
编写ForkJoinTask
public static class CountTask extends RecursiveTask<Integer> {
private int start;
private int end;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
if (end - start <= 10) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
int middle = (start + end) / 2;
CountTask left = new CountTask(start, middle);
CountTask right = new CountTask(middle, end);
left.fork();
right.fork();
sum = left.join() + right.join();
}
return sum;
}
}
使用线程池执行
public static void main(String[] args) throws Exception{
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> submit = forkJoinPool.submit(new CountTask(0, 1000));
System.out.println(submit.get());
}
除了上面的submit,还有invoke、execute提交,区别如下:
invoke:invoke提交阻塞并等待结果返回
submit:提交但不等待结果返回,可以通过Future的实现类ForkJoinTask&get方法阻塞并等待结果获取
execute:只提交任务,无结果返回
源码分析
ForkJoinPool
常量定义
首先是一些界限定义:
static final int SMASK = 0xffff; // short bits == max index 0000 0000 0000 0000 1111 1111 1111 1111
static final int MAX_CAP = 0x7fff; // max #workers - 1理论最大工作线程数 0000 0000 0000 0000 0111 1111 1111 1111
static final int EVENMASK = 0xfffe; // even short bits 0000 0000 0000 0000 1111 1111 1111 1110
static final int SQMASK = 0x007e; // max 64 (even) slots 0000 0000 0000 0000 0000 0000 0111 1110
也有线程池状态定义
private static final int RSLOCK = 1; // 0000 0000 0000 0000 0000 0000 0000 0001
private static final int RSIGNAL = 1 << 1; // 0000 0000 0000 0000 0000 0000 0000 0010
private static final int STARTED = 1 << 2; // 0000 0000 0000 0000 0000 0000 0000 0100
private static final int STOP = 1 << 29; // 0010 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED = 1 << 30; // 0100 0000 0000 0000 0000 0000 0000 0000
private static final int SHUTDOWN = 1 << 31; // 1000 0000 0000 0000 0000 0000 0000 0000
WorkQueue - 内部类
ForkJoinPool的工作队列
核心成员变量包括:
ForkJoinTask<?>[] array; // WorkQueue封装的数组
final ForkJoinWorkerThread owner; // 表示这个工作队列归属哪个线程
与其他线程池不同的是,ForkJoinPool的任务队列是一个队列数组,正常ThreadPoolExecutor是一个队列对象
WorkQueue[] queues; // main registry
这是由ForkJoinPool的特性决定的:
因为ForkJoinPool核心在于拆分任务,拆分后的任务有依赖关系,还得减少线程间竞争,因此就让线程池中的线程只执行自己的任务。这种情况下,必然出现有的任务池任务复杂,有的简单,导致木桶效应失衡。因此ForkJoinPool还引入了工作窃取机制。
几个跟工作偷取相关的变量:
int nsteals; // number of steals 偷取的任务个数
int hint; // randomization and stealer index hint 记录偷取者的索引,方便后面顺藤摸瓜
volatile ForkJoinTask<?> currentJoin; // 记录当前join来的任务
volatile ForkJoinTask<?> currentSteal; // 记录从其他工作队列偷取过来的任务
问题又来了,既然ForkJoinPool的工作队列也是个队列,那来了一个任务,应该往哪里放呢?
ForkJoinPool定义了一套路由规则,将Task分成了外部提交(submission Task)和自己拆分的(worker task), 将submission task放到WorkQueue[]的「偶数」下标中,将 workertask 放在 WorkQueue[]的「奇数」下标中,并且只有奇数下标才有线程( worker )与之相对
空参构造函数
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false,
0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
可以看到这里面取了一些默认值,调用全量构造函数。
参数1并行度:取的MAX_CAP(理论最大线程数)和当前环境可用线程数的最小值
参数2:线程池工厂;参数3:异常处理器,和
ThreadPoolExecutor
类似
全参构造
this.bounds = ((minAvail - p) & SMASK) | (maxSpares << SWIDTH);
this.mode = p | (asyncMode ? FIFO : 0);
this.ctl = ((((long)(-corep) << TC_SHIFT) & TC_MASK) |
(((long)(-p) << RC_SHIFT) & RC_MASK));
this.registrationLock = new ReentrantLock();
this.queues = new WorkQueue[size];
String pid = Integer.toString(getAndAddPoolIds(1) + 1);
this.workerNamePrefix = "ForkJoinPool-" + pid + "-worker-";
做了一些边界值计算,生成工作队列,设置线程名
ctl状态计算比较复杂,以后再研究
invoke、submit、execute - 三种任务提交方法
public <T> T invoke(ForkJoinTask<T> task) {
externalSubmit(task);
invoke方法直接调用到externalSubmit
,入参只能是ForkJoinTask
实现类
public void execute(Runnable task) {
externalSubmit((task instanceof ForkJoinTask<?>)
? (ForkJoinTask<Void>) task // avoid re-wrap
: new ForkJoinTask.RunnableExecuteAction(task));
}
execute方法也是直接调用到externalSubmit,但是传入的时候判断了一下task的类型,如果是ForkJoinTask的实现类,直接传入,如果不是,先包装成ForkJoinTask
需要注意的是execute这里直接传入的是ForkJoinTask<Void>
类型,说明没有结果返回
public <T> ForkJoinTask<T> submit(Callable<T> task) {
return externalSubmit(new ForkJoinTask.AdaptedCallable<T>(task));
submit方法提交的则是封装了返回的ForkJoinTask,同时也调用到externalSubmit
externalSubmit - 方法提交核心
if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
(q = (wt = (ForkJoinWorkerThread)t).workQueue) != null &&
wt.pool == this)
q.push(task, this);
else
externalPush(task);
方法核心内容在这部分
首先判断条件是t是当前线程,并且它是ForkJoinWorker线程即ForkJoinPool的工作线程(这代表这个task是由fork拆分出来的,而非外部提交),同时ForkJoinPool中的workQueue非null,直接取q=workQueue
,这里是因为如果是由ForkJoinWorkerThread做fork拆分出来的任务,这个thread在构造的时候就已经绑定workqueue了,这里直接取就好了
这里如果满足条件,调用push方法,不满足调用externalPush
这里区分两个方法的原因是ForkJoinPool自身的特性决定的:
ForkJoinPool的任务来源是两方面:外部提交任务(走externalPush方法)和自身fork拆分的任务(走push方法),因此走两个方法做不同处理
WorkQueue#push
由workQueue通过fork方法内部提交任务的方法
if (a != null && pool != null && (cap = a.length) > 0) {
setSlotVolatile(a, (m = cap - 1) & s, task);
if (d == m)
growArray();
if (d == m || a[m & (s - 1)] == null)
pool.signalWork(); // signal if was empty or resized
}
核心是这一部分,比较难理解。growArray用于扩容队列。signalWork用于唤醒工作线程。
externalPush - 外部提交任务
if ((q = submissionQueue()) == null)
首先调用submissionQueue
方法获取要提交的workqueue
else if (q.lockedPush(task))
这里调用到WorkQueue#lockedPush
方法。
submissionQueue
WorkQueue[] qs = queues;
……
else if ((q = qs[i = (n - 1) & id]) == null) {
……
qs是Pool持有的WorkQueue[]
数组
q是经过位运算取出来的一个元素位,用于获取WorkQueue
WorkQueue#lockedPush
if (a != null && (cap = a.length) > 0) {
a[(m = cap - 1) & s] = task;
if (d == m)
growArray();
source = 0; // unlock
if (d == m || a[m & (s - 1)] == null)
return true;
}
与push方法还是有区别的
signalWork - 激活工作线程
createWorker();
除了一些阈值判断,核心调用到这里
createWorker
if (fac != null && (wt = fac.newThread(this)) != null) {
wt.start();
return true;
}
这里构造的是ForkJoinPool特殊的线程实现ForkJoinWorkerThread
,调用线程的start方法
DefaultForkJoinWorkerThreadFactory#newThread
public ForkJoinWorkerThread run() {
return new ForkJoinWorkerThread(null, pool, true, false);
}},
可以看到调用的是ForkJoinWorkerThread的哪一个构造函数
fork - 任务拆分
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
(w = (ForkJoinWorkerThread)t).workQueue.push(this, w.pool);
else
ForkJoinPool.common.externalPush(this);
核心也是这个判断,判断条件与submit时差不多,判断当前线程
join - 任务合并
if ((s = status) >= 0)
s = awaitDone(null, false, false, false, 0L);
执行阻塞等待
ForkJoinWorkerThread
线程的start方法调用本地方法start0,然后调用自己的run方法,这里已经重写了run方法
核心成员变量
final ForkJoinPool pool; // the pool this thread works in
final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
工作线程中持有线程池pool的引用,方便回溯,同时还持有workQueue引用,可见ForkJoinPool中的WorkQueue[]
中的小queue是线程独立的
构造函数
this.workQueue = new ForkJoinPool.WorkQueue(this, isInnocuous);
这里可以看到,WorkQueue在构造的时候塞入了归属的thread引用
run
p.registerWorker(w);
onStart();
p.runWorker(w);
……
onTermination(exception);
……
p.deregisterWorker(this, exception);
run方法中的核心包括:
registerWorker:注册Worker
onStart:预留方法
runWorker:启动任务
onTermination:处理异常
deregiserWorker:注销Worker
评论区