45 利用CompletableFuture进行组合式异步编程
Diego38 61 1

1. 前言

上一节讲了Executors工具类,可以帮助很方便的创建线程池。

CompletableFuture是JDK1.8的新引入的类,和Executors一样,目标也是为了简化线程池异步操作的,主要是为了简化任务提交和FutureTask的get的。

2. 为什么要使用CompletableFuture

CompletableFuture主要为了让Future操作能够更加灵活,在实际业务场景中,当提交任务后,需要对结果进行再处理,做submit + get + callback 这一系列操作能否封装起来呢?

首先看下面的使用Future的例子:

public class FutureTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ExecutorService es = Executors.newFixedThreadPool(10);
        Future<Integer> feture = es.submit(() -> {
            Thread.sleep(100);
            return 1000;
        });
        //任务执行完成后,对结果进行输出
        System.out.println(feture.get());
    }
}

提交任务 + 获取结果 + 对结果进行处理,通过CompletableFuture可以合并成一个操作的,代码如下:

public class FutureTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
            }
            return 1000;
        }).thenAccept(System.out::println);
        voidCompletableFuture.join();

    }
}

上述代码很方便的将计算和打印组合在了一起,简化了操作future的处理流程。CompletableFuture默认的Executor是ForkJoinPool.commonPool(), 如果要自定义可以在supplyAsync传入时指定Executor,比如自定义的ThreadPoolExecutor。ForkJoinPool.commonPool()有个坏处,就是内部也是不限制线程数量的,当线程数达到很大阈值,容易产生OOM。

3. CompletableFuture的一些核心API

CompletableFuture的核心API,可以分为四类。

3.1 阻塞获取结果

此类API和Future的核心API类似,都是阻塞获取结果

  • complete(T value) 主动触发一个结果的返回,比如f.complete(“hello world”)

  • completeExceptionally(Throwable ex) 主动触发一个异常,比如f.completeExceptionally(new NullPointerException());

  • T get() 类似于future.get,遇到中断或执行失败,会抛出异常,异常会通过ExecutionException进行包装

  • T get(long timeout, TimeUnit unit) 类似于future.get(long timeout, TimeUnit unit)

  • T getNow(T valueIfAbsent)

  • T join() 相比get操作,只是不抛出异常

举例:

 CompletableFuture<String> helloFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
            }
            return "hello world";
        });
        System.out.println(helloFuture.get());

3.2 创建CompletableFuture

CompletableFuture无需通过new构建新对象,只需要借助静态方法进行创建,常见的有以下几种

  • CompletableFuture runAsync(Runnable runnable) 默认线程池异步执行,Future内的返回结果为Void

  • CompletableFuture runAsync(Runnable runnable, Executor executor) 指定线程池执行runnable任务

  • CompletableFuture<U> supplyAsync(Supplier<U> supplier) 顾名思义,默认线程池异步执行,Future内的返回结果为指定的结果类型U

  • CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

  • CompletableFuture<U> completedFuture(U value) 直接返回计算好的对象。

runAsync与supplyAsync的区别类似于Runnable和Callable的区别,一个无结果,一个有结果。

3.3 监听结果处理

这类API对CompletableFuture返回的结果再进行处理,前面thenAccept就是一种处理类型。

这一类按返回结果、输入参数由可以分为几类。

  • 仅仅执行另一个无关的任务 不会对CompletableFuture的结果进行处理,也不会返回执行结果

  • CompletableFuture thenRun(Runnable action)

  • CompletableFuture thenRunAsync(Runnable action)

  • CompletableFuture thenRunAsync(Runnable action, Executor executor) 举例

    CompletableFuture.supplyAsync(() -> "hello world").thenRun(() -> System.out.println("success"));
  • 纯消费

  • CompletableFuture thenAccept(Consumer<? super T> action)

  • CompletableFuture thenAcceptAsync(Consumer<? super T> action)

  • CompletableFuture thenAcceptAsync(Consumer<? super T> action, Executor executor)

仅仅对结果进行消费,不返回消费结果,比如:

CompletableFuture.supplyAsync(() -> "hello world").thenAccept(System.out::println);
  • 对结果进行处理,返回处理后的结果

提供异常参数,可以对异常进行处理

  • CompletableFuture whenComplete(BiConsumer<? super T,? super Throwable> action)
  • CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
  • CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)

比如:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello world").whenComplete((r, t) -> {
            if (t != null) {
                t.printStackTrace();
            } else {
                System.out.println(r);
            }
        });
System.out.println("原始结果" + future.get());
  • 对结果进行处理,并返回新类型的结果
  • CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
  • CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
  • CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

其中带Async是指交给线程池异步处理,不带Async是当前线程进行处理。比如:

 CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> "hello world").thenApplyAsync(x ->{
            System.out.println(x);
            return 1L;
        });
        //这里变成了long类型
        System.out.println(future.get());
  • 对结果进行处理,并返回新类型的结果, 并且支持异常参数的传参

与thenApply*类似,只不过增加了异常参数

  • CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)
  • CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
  • CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)
    CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
              throw new IllegalArgumentException();
          }).handleAsync((x, e) ->{
              if (e != null) {
                  e.printStackTrace();
              }
              System.out.println(x);
              return 1L;
          });
          future.join();
    相比thenApplyAsync会直接抛出异常,handleAsync提供了异常处理

3.4 与其他CompletableFuture组合

  • allOf 和 anyOf 类似于ThreadPoolExecutor的invokeAll和invokeAny,接收CompletableFuture参数

  • CompletableFuture allOf(CompletableFuture<?>… cfs) 所有cfs执行完后,返回Void,

  • CompletableFuture anyOf(CompletableFuture<?>… cfs) 有一个CompletableFuture执行完就返回,并且返回该CompletableFuture的执行结果

相比ThreadPoolExecutor的invokeAll和invokeAny,allOf和anyOf会阻塞执行, 但anyOf至少会返回结果,allOf就比较鸡肋,仅仅阻塞执行,结果的合并还需要交给监听方法来处理,比如:

CompletableFuture.allOf(future, future2).whenComplete((v, th) -> {
           if (th == null) {
               try {
                   System.out.println(future.get() * future2.get());
               } catch (InterruptedException | ExecutionException e) {
                   e.printStackTrace();
               }
           }

       });
  • combine-组合其他CompletableFuture

  • CompletableFuture thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)

  • CompletableFuture thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)

  • CompletableFuture thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

    CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
              return 1L;
          });
    
          CompletableFuture<Long> future2 = CompletableFuture.supplyAsync(() -> {
              return 2L;
          });
    
          CompletableFuture<String> f =  future.thenCombine(future2, (ret1 ,ret2 ) ->  ret1 * ret2 + "");
          System.out.println(f.get());

    combine的好处是两个CompletableFuture可以并行执行,最终将结果进行计算,下面介绍的compose是两个CompletableFuture相互之间有依赖关系。

  • compose-组合结果返回一个新的CompletableFuture

  • CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)

  • CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)

  • CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)

 CompletableFuture<String> f =  future.thenCompose(x -> CompletableFuture.supplyAsync(() -> (x * 10) + ""));
        System.out.println(f.get());

compose完全可以翻译为thenApply,只不过往往会遇到复用CompletableFuture的情况,因此compose也是有用武之地的,使用场景相对比较少。

  • thenAcceptBoth 和 runAfterBoth

thenAcceptBoth和combine比较类似,只不过前者不返回结果,后者返回执行结果

  • CompletableFuture thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)

  • CompletableFuture thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)

  • CompletableFuture thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)

  • Either进行多CompletableFuture的组合

以下是无返回结果的Either

  • CompletableFuture acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
  • CompletableFuture acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
  • CompletableFuture acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)

以下是有返回结果的Either

  • CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
  • CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
  • CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)

Either和anyOf的区别是,前者执行两个CompletableFuture,而后者可以执行多个。

举例:

CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
            return 1L;
        });

CompletableFuture<Long> future2 = CompletableFuture.supplyAsync(() -> {
            return 2L;
        });

future.acceptEither(future2, System.out::println);

4. 总结

CompletableFuture是一个方便任务提交和回调的工具类,个人觉得它的API过于丰富了,重复度有些高,令人眼花缭乱,大家在接触时只需要掌握常见API即可,在学习更多API时,记得与核心API方法做对比,方便理解和记忆。

CompletableFuture使用的ForkJoin线程池,它与ThreadPoolExecutor有一定的区别,一般CompletableFuture个人倾向于使用ThreadPoolExecutor来替代默认的ForkJoin线程池,CompletableFuture仅仅是一个工具,内部也是使用线程池实现的,所以我们也没必要专研它的实现原理,只需要掌握线程池原理就足够了。

预览图
评论区

索引目录