目 录CONTENT

文章目录

ForkJoin框架

FatFish1
2024-10-28 / 0 评论 / 0 点赞 / 56 阅读 / 0 字 / 正在检测是否收录...

什么是Fork/Join框架

出现于jdk7,它的作用是把一个大任务分割成若干小任务,最终汇总每个小任务结果后得到大任务结果。其中:

  • Fork:切分大任务成若干子任务执行

  • Join:合并子任务结果

含义其实比较类似于MapReduce

工作窃取算法

工作窃取算法指的是某个线程从其他队列里窃取任务来执行

有时处理一个大任务,可能把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放在不同的队列里面,用单独的线程执行。

但有些时候不是所有任务的执行压力都一样大,可能有的队列任务都执行完了,有的还有很多,这时候执行完的可以从没执行的队列里面捞任务

工作窃取算法一般会用到双端队列,被窃取任务的线程永远从双端队列头

Fork/Join框架编程流程

ForkJoinTask

Fork/Join框架中执行的Task是ForkJoinTask,它的两个子类分别是RecursiveActionRecursiveTask,分别用于没有返回和有返回的任务,类似线程池中的runnablecallable

ForkJoinPool

ForkJoinTask使用的线程池是ForkJoinPool,固定使用submit方法,返回值是ForkJoinTask,使用类似于Future接口,使用ForkJoinTask#get方法可以获取异步执行结果

编写ForkJoinTask

public static class CountTask extends RecursiveTask<Integer> {
    private int start;
    private int end;
    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }
    @Override
    protected Integer compute() {
        int sum = 0;
        if (end - start <= 10) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            int middle = (start + end) / 2;
            CountTask left = new CountTask(start, middle);
            CountTask right = new CountTask(middle, end);
            left.fork();
            right.fork();
            sum = left.join() + right.join();
        }
        return sum;
    }
}

使用线程池执行

public static void main(String[] args) throws Exception{
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    ForkJoinTask<Integer> submit = forkJoinPool.submit(new CountTask(0, 1000));
    System.out.println(submit.get());
}

除了上面的submit,还有invoke、execute提交,区别如下:

  • invoke:invoke提交阻塞并等待结果返回

  • submit:提交但不等待结果返回,可以通过Future的实现类ForkJoinTask&get方法阻塞并等待结果获取

  • execute:只提交任务,无结果返回

源码分析

ForkJoinPool

常量定义

首先是一些界限定义:

static final int SMASK        = 0xffff;        // short bits == max index             0000 0000 0000 0000 1111 1111 1111 1111
static final int MAX_CAP      = 0x7fff;        // max #workers - 1理论最大工作线程数  0000 0000 0000 0000 0111 1111 1111 1111
static final int EVENMASK     = 0xfffe;        // even short bits                     0000 0000 0000 0000 1111 1111 1111 1110
static final int SQMASK       = 0x007e;        // max 64 (even) slots                 0000 0000 0000 0000 0000 0000 0111 1110

也有线程池状态定义

private static final int  RSLOCK     = 1;             // 0000 0000 0000 0000 0000 0000 0000 0001
private static final int  RSIGNAL    = 1 << 1;        // 0000 0000 0000 0000 0000 0000 0000 0010
private static final int  STARTED    = 1 << 2;        // 0000 0000 0000 0000 0000 0000 0000 0100
private static final int  STOP       = 1 << 29;       // 0010 0000 0000 0000 0000 0000 0000 0000
private static final int  TERMINATED = 1 << 30;       // 0100 0000 0000 0000 0000 0000 0000 0000
private static final int  SHUTDOWN   = 1 << 31;       // 1000 0000 0000 0000 0000 0000 0000 0000

WorkQueue - 内部类

ForkJoinPool的工作队列

核心成员变量包括:

ForkJoinTask<?>[] array;   // WorkQueue封装的数组
final ForkJoinWorkerThread owner; // 表示这个工作队列归属哪个线程

与其他线程池不同的是,ForkJoinPool的任务队列是一个队列数组,正常ThreadPoolExecutor是一个队列对象

WorkQueue[] queues;                  // main registry

这是由ForkJoinPool的特性决定的:

因为ForkJoinPool核心在于拆分任务,拆分后的任务有依赖关系,还得减少线程间竞争,因此就让线程池中的线程只执行自己的任务。这种情况下,必然出现有的任务池任务复杂,有的简单,导致木桶效应失衡。因此ForkJoinPool还引入了工作窃取机制

几个跟工作偷取相关的变量:

int nsteals;   // number of steals  偷取的任务个数
int hint;     // randomization and stealer index hint  记录偷取者的索引,方便后面顺藤摸瓜
volatile ForkJoinTask<?> currentJoin;  // 记录当前join来的任务
volatile ForkJoinTask<?> currentSteal; // 记录从其他工作队列偷取过来的任务

问题又来了,既然ForkJoinPool的工作队列也是个队列,那来了一个任务,应该往哪里放呢?

ForkJoinPool定义了一套路由规则,将Task分成了外部提交(submission Task)和自己拆分的(worker task), 将submission task放到WorkQueue[]的「偶数」下标中,将 workertask 放在 WorkQueue[]的「奇数」下标中,并且只有奇数下标才有线程( worker )与之相对

空参构造函数

this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
     defaultForkJoinWorkerThreadFactory, null, false,
     0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);

可以看到这里面取了一些默认值,调用全量构造函数。

  • 参数1并行度:取的MAX_CAP(理论最大线程数)和当前环境可用线程数的最小值

  • 参数2:线程池工厂;参数3:异常处理器,和ThreadPoolExecutor类似

全参构造

this.bounds = ((minAvail - p) & SMASK) | (maxSpares << SWIDTH);
this.mode = p | (asyncMode ? FIFO : 0);
this.ctl = ((((long)(-corep) << TC_SHIFT) & TC_MASK) |
            (((long)(-p)     << RC_SHIFT) & RC_MASK));
this.registrationLock = new ReentrantLock();
this.queues = new WorkQueue[size];
String pid = Integer.toString(getAndAddPoolIds(1) + 1);
this.workerNamePrefix = "ForkJoinPool-" + pid + "-worker-";

做了一些边界值计算,生成工作队列,设置线程名

ctl状态计算比较复杂,以后再研究

invoke、submit、execute - 三种任务提交方法

public <T> T invoke(ForkJoinTask<T> task) {
    externalSubmit(task);

invoke方法直接调用到externalSubmit,入参只能是ForkJoinTask实现类

public void execute(Runnable task) {
    externalSubmit((task instanceof ForkJoinTask<?>)
                       ? (ForkJoinTask<Void>) task // avoid re-wrap
                       : new ForkJoinTask.RunnableExecuteAction(task));
}

execute方法也是直接调用到externalSubmit,但是传入的时候判断了一下task的类型,如果是ForkJoinTask的实现类,直接传入,如果不是,先包装成ForkJoinTask

需要注意的是execute这里直接传入的是ForkJoinTask<Void>类型,说明没有结果返回

public <T> ForkJoinTask<T> submit(Callable<T> task) {
    return externalSubmit(new ForkJoinTask.AdaptedCallable<T>(task));

submit方法提交的则是封装了返回的ForkJoinTask,同时也调用到externalSubmit

externalSubmit - 方法提交核心

if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
    (q = (wt = (ForkJoinWorkerThread)t).workQueue) != null &&
    wt.pool == this)
    q.push(task, this);
else
    externalPush(task);

方法核心内容在这部分

首先判断条件是t是当前线程,并且它是ForkJoinWorker线程即ForkJoinPool的工作线程(这代表这个task是由fork拆分出来的,而非外部提交),同时ForkJoinPool中的workQueue非null,直接取q=workQueue这里是因为如果是由ForkJoinWorkerThread做fork拆分出来的任务,这个thread在构造的时候就已经绑定workqueue了,这里直接取就好了

这里如果满足条件,调用push方法,不满足调用externalPush

这里区分两个方法的原因是ForkJoinPool自身的特性决定的:

ForkJoinPool的任务来源是两方面:外部提交任务(走externalPush方法)和自身fork拆分的任务(走push方法),因此走两个方法做不同处理

WorkQueue#push

由workQueue通过fork方法内部提交任务的方法

if (a != null && pool != null && (cap = a.length) > 0) {
    setSlotVolatile(a, (m = cap - 1) & s, task);
    if (d == m)
        growArray();
    if (d == m || a[m & (s - 1)] == null)
        pool.signalWork(); // signal if was empty or resized
}

核心是这一部分,比较难理解。growArray用于扩容队列。signalWork用于唤醒工作线程。

externalPush - 外部提交任务

if ((q = submissionQueue()) == null)

首先调用submissionQueue方法获取要提交的workqueue

else if (q.lockedPush(task))

这里调用到WorkQueue#lockedPush方法。

submissionQueue

WorkQueue[] qs = queues;
……
else if ((q = qs[i = (n - 1) & id]) == null) {
……

qs是Pool持有的WorkQueue[]数组

q是经过位运算取出来的一个元素位,用于获取WorkQueue

WorkQueue#lockedPush

if (a != null && (cap = a.length) > 0) {
    a[(m = cap - 1) & s] = task;
    if (d == m)
        growArray();
    source = 0; // unlock
    if (d == m || a[m & (s - 1)] == null)
        return true;
}

与push方法还是有区别的

signalWork - 激活工作线程

createWorker();

除了一些阈值判断,核心调用到这里

createWorker

if (fac != null && (wt = fac.newThread(this)) != null) {
    wt.start();
    return true;
}

这里构造的是ForkJoinPool特殊的线程实现ForkJoinWorkerThread,调用线程的start方法

DefaultForkJoinWorkerThreadFactory#newThread

public ForkJoinWorkerThread run() {
    return new ForkJoinWorkerThread(null, pool, true, false);
}},

可以看到调用的是ForkJoinWorkerThread的哪一个构造函数

fork - 任务拆分

if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
    (w = (ForkJoinWorkerThread)t).workQueue.push(this, w.pool);
else
    ForkJoinPool.common.externalPush(this);

核心也是这个判断,判断条件与submit时差不多,判断当前线程

join - 任务合并

if ((s = status) >= 0)
    s = awaitDone(null, false, false, false, 0L);

执行阻塞等待

ForkJoinWorkerThread

线程的start方法调用本地方法start0,然后调用自己的run方法,这里已经重写了run方法

核心成员变量

final ForkJoinPool pool;                // the pool this thread works in
final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics

工作线程中持有线程池pool的引用,方便回溯,同时还持有workQueue引用,可见ForkJoinPool中的WorkQueue[]中的小queue是线程独立的

构造函数

this.workQueue = new ForkJoinPool.WorkQueue(this, isInnocuous);

这里可以看到,WorkQueue在构造的时候塞入了归属的thread引用

run

p.registerWorker(w);
onStart();
p.runWorker(w);
……
onTermination(exception);
……
p.deregisterWorker(this, exception);

run方法中的核心包括:

  • registerWorker:注册Worker

  • onStart:预留方法

  • runWorker:启动任务

  • onTermination:处理异常

  • deregiserWorker:注销Worker

0

评论区