目 录CONTENT

文章目录

Future - 异步任务和结果

FatFish1
2024-10-28 / 0 评论 / 0 点赞 / 68 阅读 / 0 字 / 正在检测是否收录...

Future接口

Future接口定义了操作异步任务执行的一些方法,提供了一种异步并行计算的功能,例如:获取异步任务的执行结果、取消异步任务的执行、判断任务是否被取消、判断任务执行是否完毕等

boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;

RunnableFuture - 可执行的Future

继承了Future和Runnable两个接口,它出现的意义在于:

  • 异步计算:在某些情况下,我们希望在后台执行某些任务,然后在需要的时候再获取结果。FutureTask 允许程序在提交任务后立即返回,并可以在将来的某个时间点获取结果。

  • 任务取消:FutureTask 提供了取消任务的功能,你可以决定一个任务是否应该被中止。

  • 避免回调地狱:通过 FutureTask,我们可以更清晰地编排代码,而不是使用复杂的回调机制。

FutureTask

是什么

FutureTask实现了RunnableFuture接口。因此,FutureTask可以交给Executor执行,也可以由调用线程直接执行(FutureTask.run())。

它存在的意义是给耗时任务增加一个并行执行其他任务的方案,同时能够获取结果

根据FutureTask.run()方法被执行的时机,FutureTask可以处于下面3种状态:

  • 未启动:还没执行FutureTask#run方法

  • 已启动:正在执行FutureTask#run方法

  • 已完成:FutureTask#run方法正常结束/被取消/抛出了异常

用法

提交FutureTask:

// 使用线程池执行
executorService.submit(futureTask);
// 自行执行
futureTask.run();

获取futureTask结果

futureTask.get();

多线程执行和获取,get方法会阻塞直到执行线程完成

源码分析

构造函数

public FutureTask(Callable<V> callable) {
    ……
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

两个构造函数,会将无论callable还是runnable类型的入参统一封装成callable来执行

run

if (state != NEW ||
    !RUNNER.compareAndSet(this, null, Thread.currentThread()))

首先通过CAS修改状态

Callable<V> c = callable;
……
result = c.call();

执行Callable类的call方法,拿到结果

set(result);

执行set方法处理结果

set

outcome = v;
STATE.setRelease(this, NORMAL); // final state

首先把结果v给到outcome,然后CAS修改状态

finishCompletion();

激活阻塞的等待结果的线程

finishCompletion - 激活等待线程

for (WaitNode q; (q = waiters) != null;) {

是一个自旋,当有线程等待,开启自旋

Thread t = q.thread;
……
q.thread = null;
LockSupport.unpark(t);

激活线程

get - 获取结果

s = awaitDone(false, 0L);
return report(s);

两个,方法,一个阻塞等待,一个返回结果

awaitDone - 阻塞等待

for (;;) {
    // 场景1
    int s = state;
    if (s > COMPLETING) {
        if (q != null)
            q.thread = null;
        return s;
    }

也是一个自旋,如果当前状态已经完成,直接返回完成状态

场景1:当state>COMPLETING,说明已经完成,直接返回

// 场景2
else if (s == COMPLETING)
    Thread.yield();

场景2:COMPLETING状态,说明已经完成了,正在设置结果,这里就不做阻塞了,先让出cpu资源,再开启下一轮循环,减少线程上下文切换

else if (q == null) {
    if (timed && nanos <= 0L)
        return s;
    q = new WaitNode();

如果没获取到值,且没构造过node,这里先构造node,但是本轮不阻塞,再循环一次,下次阻塞,也是为了减少上下文切换

// 场景3:超时阻塞
else if (timed) {
    ……
}
// 场景4:直接阻塞
else
    LockSupport.park(this);

场景3、4就是阻塞,再循环一圈都没获取到值,就把自己挂起,等待执行线程调用finishCompletion方法唤醒自己

CompletableFuture

CompletableFuture不仅实现了Future相关接口,还实现了CompletionStage接口,是对Future的非常强大的扩展,提供函数式编程的能力。

CompletableFuture可以代表一个明确完成的 Future,也可以代表一个完成阶段 CompletionStage

ComletionStage

提供了许多函数式编程相关的能力

public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenRun(Runnable action);
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
……

ComletableFuture源码分析

构造

四个静态方法基本可以分成两组,一组是无返回值的runAsync,一组是有返回值的supplyAsync

// 无返回值的runAsync
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
// 有返回值的supplyAsync
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

whenComplete - 处理结果

exceptionally - 处理异常

0

评论区