CompletableFuture实现异步编排

来自版块 问答
355
1
媒介

为什么必要异步实行?
场景:电商体系中获取一个完备的商品信息大概分为以下几步:①获取商品根本信息 ②获取商品图片信息 ③获取商品促销运动信息 ④获取商品各种类的根本信息 等操纵,假如利用串行方式去实行这些操纵,假设每个操纵实行1s,那么用户看到完备的商品详情就必要4s的时间,假如利用并行方式实行这些操纵,大概只必要1s就可以完成。以是这就是异步实行的利益。
JDK5的Future接口
Future接口用于代表异步盘算的效果,通过Future接口提供的方法可以检察异步盘算是否实行完成,大概等候实行效果并获取实行效果,同时还可以取消实行。
枚举Future接口的方法:

  • get():获取使命实行效果,假如使命还没完成则会壅闭等候直到使命实行完成。假如使命被取消则会抛出CancellationException非常,假如使命实行过程发生非常则会抛出ExecutionException非常,假如壅闭等候过程中被停止则会抛出InterruptedException非常。
  • get(long timeout,Timeunit unit):带超时时间的get()方法,假如壅闭等候过程中超时则会抛出TimeoutException非常。
  • cancel():用于取消异步使命的实行。假如异步使命已经完成大概已经被取消,大概由于某些缘故原由不能取消,则会返回false。假如使命还没有被实行,则会返回true而且异步使命不会被实行。假如使命已经开始实行了但是还没有实行完成,若mayInterruptIfRunning为true,则会立刻停止实行使命的线程并返回true,若mayInterruptIfRunning为false,则会返回true且不会停止使命实行线程。
  • isCanceled():判定使命是否被取消,假如使命在竣事(正常实行竣事大概实行非常竣事)前被取消则返回true,否则返回false。
  • isDone():判定使命是否已经完成,假如完成则返回true,否则返回false。必要留意的是:使命实行过程中发生非常、使命被取消也属于使命已完成,也会返回true。
利用Future接口和Callable接口实现异步实行:
public static void main(String[] args) {        // 快速创建线程池        ExecutorService executorService = Executors.newFixedThreadPool(4);        // 获取商品根本信息(可以利用Lambda表达式简化Callable接口,这里为了便于观察不利用)        Future<String> future1 = executorService.submit(new Callable<String>() {                @Override                public String call() throws Exception {                        return &quot;获取到商品根本信息&quot;;                }        });        // 获取商品图片信息        Future<String> future2 = executorService.submit(new Callable<String>() {                @Override                public String call() throws Exception {                        return &quot;获取商品图片信息&quot;;                }        });        // 获取商品促销信息        Future<String> future3 = executorService.submit(new Callable<String>() {                @Override                public String call() throws Exception {                        return &quot;获取商品促销信息&quot;;                }        });        // 获取商品各种类根本信息        Future<String> future4 = executorService.submit(new Callable<String>() {                @Override                public String call() throws Exception {                        return &quot;获取商品各种类根本信息&quot;;                }        });        // 获取效果        try {                System.out.println(future1.get());                System.out.println(future2.get());                System.out.println(future3.get());                System.out.println(future4.get());        } catch (InterruptedException | ExecutionException e) {                e.printStackTrace();        }finally {                executorService.shutdown();        }}复制代码
既然Future可以实现异步实行并获取效果,为什么还会必要CompletableFuture?
简述一下Future接口的毛病:

  • 不支持手动完成 当提交了一个使命,但是实行太慢了,通过其他路径已经获取到了使命效果,如今没法把这个使命效果关照到正在实行的线程,以是必须自动取消大概不停等候它实行完成。
  • 不支持进一步的非壅闭调用 通过Future的get()方法会不停壅闭到使命完成,但是想在获取使命之后实行额外的使命,由于 Future 不支持回调函数,以是无法实现这个功能。
  • 不支持链式调用 对于Future的实行效果,想继承传到下一个Future处置惩罚利用,从而形成一个链式的pipline调用,这在 Future中无法实现。
  • 不支持多个 Future 归并 好比有10个Future并行实行,想在全部的Future运行完毕之后,实行某些函数,是无法通过Future实现的。
  • 不支持非常处置惩罚 Future的API没有任何的非常处置惩罚的api,以是在异步运行时,假如出了非常题目欠好定位。
利用Future接口可以通过get()壅闭式获取效果大概通过轮询+isDone()非壅闭式获取效果,但是前一种方法会壅闭,后一种会泯灭CPU资源,以是JDK的Future接口实现异步实行对获取效果不太友爱,以是在JDK8时推出了CompletableFuture实现异步编排

CompletableFuture的利用

CompletableFuture概述
JDK8中新增长了一个包罗50个方法左右的类CompletableFuture,提供了非常强盛的Future的扩展功能,可以资助昨们简化异步编程的复杂性,提供了函数式编程的本领,可以通过回调的方式处置惩罚盘算效果,而且提供了转换和组合CompletableFuture的方法
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>复制代码CompletableFuture类实现了Future接口和CompletionStage接口,即除了可以利用Future接口的全部方法之外,CompletionStage<T>接口提供了更多方法来更好的实现异步编排,而且大量的利用了JDK8引入的函数式编程概念。背面会过细的先容常用的API。



① 创建CompletableFuture的方式

利用new关键字创建
// 无返回效果CompletableFuture<String> completableFuture = new CompletableFuture<>();// 已知返回效果CompletableFuture<String> completableFuture = new CompletableFuture<>(&quot;result&quot;);// 已知返回效果(底层实在也是带参数的构造器赋值)CompletableFuture<String> completedFuture = CompletableFuture.completedFuture(&quot;result&quot;);复制代码创建一个返回效果范例为String的CompletableFuture,可以利用Future接口的get()方法获取该值(同样也会壅闭)。
可以利用无参构造器返回一个没有效果的CompletableFuture,也可以通过构造器的传参CompletableFuture设置好返回效果,大概利用
CompletableFuture.completedFuture(U value)构造一个已知效果的CompletableFuture。
利用CompletableFuture类的静态工厂方法(常用)

  • runAsync() 无返回值
// 利用默认线程池public static CompletableFuture<Void> runAsync(Runnable runnable)// 利用自界说线程池(保举)public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) 复制代码runAsync()方法的参数是Runnable接口,这是一个函数式接口,不答应返回值。当必要异步操纵且不关心返回效果的时间可以利用runAsync()方法。
// 例子public static void main(String[] args) {    // 快速创建线程池    ExecutorService executor = Executors.newFixedThreadPool(4);    try {        // 通过Lambda表达式实现Runnable接口        CompletableFuture.runAsync(()-> System.out.println(&quot;获取商品根本信息乐成&quot;), executor).get();    } catch (InterruptedException | ExecutionException e) {        e.printStackTrace();    }finally {        executor.shutdown();    }}复制代码

  • supplyAsync() 有返回值
// 利用默认线程池public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)// 利用自界说线程池(保举)public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)复制代码supplyAsync()方法的参数是Supplier<U>供给型接口(无参有返回值),这也是一个函数式接口,U是返回效果值的范例当必要异步操纵且关心返回效果的时间,可以利用supplyAsync()方法。
// 例子public static void main(String[] args) {        // 快速创建线程池        ExecutorService executor = Executors.newFixedThreadPool(4);        try {                // 通过Lambda表达式实现实行内容,并返回效果通过CompletableFuture吸收                CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {                        System.out.println(&quot;获取商品信息乐成&quot;);                        return &quot;信息&quot;;                }, executor);                // 输出效果                System.out.println(completableFuture.get());        } catch (InterruptedException | ExecutionException e) {                e.printStackTrace();        }finally {                executor.shutdown();        }}  复制代码
关于第二个参数Executor executor阐明
在没有指定第二个参数(即没有指定线程池)时,CompletableFuture直接利用默认的ForkJoinPool.commonPool()作为它的线程池实行异步代码。
在现实生产中会利用自界说的线程池来实行异步代码,详细可以参考另一篇文章深入明白线程池ThreadPoolExecutor - 掘金 (juejin.cn),内里的第二节有生产中怎么创建自界说线程的例子,可以参考一下。

② 得到异步实行效果

get() 壅闭式获取实行效果
public T get() throws InterruptedException, ExecutionException复制代码该方法调用后假如使命还没完成则会壅闭等候直到使命实行完成。假如使命实行过程发生非常则会抛出ExecutionException非常,假如壅闭等候过程中被停止则会抛出InterruptedException非常。
get(long timeout, TimeUnit unit) 带超时的壅闭式获取实行效果
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException复制代码该方法调用后假如假如使命还没完成则会壅闭等候直到使命实行完成大概超出timeout时间,假如壅闭等候过程中超时则会抛出TimeoutException非常。
getNow(T valueIfAbsent) 立即获取实行效果
public T getNow(T valueIfAbsent)复制代码该方法调用后,会立即获取效果不会壅闭等候。假如使命完成则直接返回实行完成后的效果,假如使命没有完成,则返回调用方法时传入的参数valueIfAbsent值。
join() 不抛非常的壅闭时获取实行效果
public T join()复制代码该方法和get()方法作用一样,只是不会抛出非常
complete(T value) 自动触发盘算,返回异步是否实行完毕
public boolean complete(T value)复制代码该方法调用后,会自动触发盘算效果,假如此时异步实行并没有完成(此时boolean值返回true),则通过get()拿到的数据会是complete()设置的参数value值,假如此时异步实行已经完成(此时boolean值返回false),则通过get()拿到的就是实行完成的效果。
// 例子public static void main(String[] args) {    // 快速创建线程池    ExecutorService executor = Executors.newFixedThreadPool(4);    try {        // 通过Lambda表达式实现实行内容,并返回效果通过CompletableFuture吸收        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {            // 休眠2秒,使得异步实行变慢,会导致自动触发盘算先实行,此时返回的get就是555            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }            return 666;        }, executor);        // 自动触发盘算,判定异步实行是否完成        System.out.println(completableFuture.complete(555));        // 输出效果        System.out.println(completableFuture.get());    } catch (InterruptedException | ExecutionException e) {        e.printStackTrace();    }finally {        executor.shutdown();    }}/**输出效果:    true    555**/复制代码
③ 对实行效果举行处置惩罚

whenComplete 等候前面使命实行完再实行当前处置惩罚
public CompletableFuture<T> whenComplete(        BiConsumer<? super T, ? super Throwable> action)复制代码在创建好的初始使命大概是上一个使命后通过链式调用该方法,会在之前使命实行完成后继承实行whenComplete里的内容(whenComplete传入的action只是对之前使命的效果举行处置惩罚),纵然用该方法可以制止前面说到的Future接口的题目,不再必要通过壅闭大概轮询的方式去获取效果,而是通过调用该方法等使命实行完毕主动调用。
该方法的参数为BiConsumer<? super T, ? super Throwable> action消耗者接口,可以吸收两个参数,一个是使命实行完的效果,一个是实行使命时的非常
// 例子public static void main(String[] args) {    // 快速创建线程池    ExecutorService executor = Executors.newFixedThreadPool(4);    try {        CompletableFuture.supplyAsync(() -> 666, executor)                .whenComplete((res, ex) -> System.out.println(&quot;使命实行完毕,效果为&quot; + res + &quot; 非常为&quot; + ex)                );    } catch (Exception e) {        e.printStackTrace();    }finally {        executor.shutdown();    }}/**输出效果:    使命实行完毕,效果为666 非常为null**/复制代码
除了上述的方法外,另有一些雷同的方法如XXXAsync()大概是XXXAsync(XX,Executor executor),对于这些方法,这里同一阐明,后续文章中将不会再枚举
public CompletableFuture<T> whenCompleteAsync(        BiConsumer<? super T, ? super Throwable> action)public CompletableFuture<T> whenCompleteAsync(        BiConsumer<? super T, ? super Throwable> action, Executor executor)复制代码XXXAsync():表现上一个使命实行完成后,不会再利用之前使命中的线程,而是重新利用从默认线程(ForkJoinPool 线程池)中重新获取新的线程实行当前使命
XXXAsync(XX,Executor executor):表现不会相沿之前使命的线程,而是利用本身第二个参数指定的线程池重新获取线程实行当前使命

④ 对实行效果举行消耗

thenRun 前面使命实行完后实行当前使命,不关心前面使命的效果,也没返回值
public CompletableFuture<Void> thenRun(Runnable action)复制代码
CompletableFuture.supplyAsync(actionA).thenRun(actionB)像如许链式调用该方法表现:实行使命A完成后接着实行使命B,但是使命B不必要A的效果,而且实行完使命B也不会返回效果
thenRun(Runnable action)的参数为Runnable接口即没有传入参数
// 例子public static void main(String[] args) {        // 快速创建线程池        ExecutorService executor = Executors.newFixedThreadPool(4);        try {                CompletableFuture.supplyAsync(() -> 666, executor)                    .thenRun(() -> System.out.println(&quot;我都没有参数怎么拿到之前的效果,我也没有返回值。&quot;)                );        } catch (Exception e) {                e.printStackTrace();        }finally {                executor.shutdown();        }}/**输出效果:    我都没有参数怎么拿到之前的效果,我也没有返回值。**/复制代码
thenAccept 前面使命实行完后实行当前使命,消耗前面的效果,没有返回值
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)复制代码
CompletableFuture.supplyAsync(actionA).thenRun(actionB)像如许链式调用该方法表现:实行使命A完成后接着实行使命B,而且使命B必要A的效果,但是实行完使命B不会返回效果
thenAccept(Consumer<? super T> action)的参数为消耗者接口,即可以传入一个参数,该参数为上一个使命的实行效果。
// 例子public static void main(String[] args) {    // 快速创建线程池    ExecutorService executor = Executors.newFixedThreadPool(4);    try {        CompletableFuture.supplyAsync(() -> 666, executor)                .thenAccept((res) -> System.out.println(&quot;我能拿到上一个的效果&quot; + res + &quot;,但是我没法传出去。&quot;)                );    } catch (Exception e) {        e.printStackTrace();    }finally {        executor.shutdown();    }}/**输出效果:    我能拿到上一个的效果666,但是我没法传出去。**/复制代码
thenApply 前面使命实行完后实行当前使命,消耗前面的效果,具有返回值
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)复制代码
CompletableFuture.supplyAsync(actionA).thenRun(actionB)像如许链式调用该方法表现:实行使命A完成后接着实行使命B,而且使命B必要A的效果,而且实行完使命B必要有返回效果
thenApply(Function<? super T,? extends U> fn)的参数为函数式接口,即可以传入一个参数范例为T,该参数是上一个使命的实行效果,而且函数式接口必要有返回值,范例为U。
// 例子public static void main(String[] args) {    // 快速创建线程池    ExecutorService executor = Executors.newFixedThreadPool(4);    try {        CompletableFuture.supplyAsync(() -> 666, executor)                .thenApply((res) -> {                        System.out.println(&quot;我能拿到上一个的效果&quot; + res + &quot;而且我要将效果传出去&quot;);                        return res;                    }                ).whenComplete((res, ex) -> System.out.println(&quot;效果&quot; + res));    } catch (Exception e) {        e.printStackTrace();    }finally {        executor.shutdown();    }}/**输出效果:    我能拿到上一个的效果666而且我要将效果传出去    效果666**/复制代码
⑤ 非常处置惩罚

exceptionally 非常捕捉,只消耗前面使命中出现的非常信息,具有返回值
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)复制代码可以通过链式调用该方法来获取非常信息,而且具有返回值。假如某一个使命出现非常被exceptionally捕捉到则剩余的使命将不会再实行。雷同于Java非常处置惩罚的catch。
exceptionally(Function<Throwable, ? extends T> fn)的参数是函数式接口,具有一个参数以及返回值,该参数为前面使命的非常信息。
// 例子public static void main(String[] args) {    // 快速创建线程池    ExecutorService executor = Executors.newFixedThreadPool(4);    try {        CompletableFuture.supplyAsync(() -> {                    if (Math.random() < 0.5) throw new RuntimeException(&quot;error&quot;);                    return 666;                }, executor)                .thenApply((res) -> {                    System.out.println(&quot;不出现非常,效果为&quot; + res);                    return res;                }).exceptionally((ex) -> {                    ex.printStackTrace();                    return -1;                });    } catch (Exception e) {        e.printStackTrace();    }finally {        executor.shutdown();    }}/**输出效果:// 这是不出现非常的环境不出现非常,效果为666// 这是出现非常的环境java.util.concurrent.CompletionException: java.lang.RuntimeException: error        at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)        at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)        at java.base/java.lang.Thread.run(Thread.java:834)Caused by: java.lang.RuntimeException: error        at com.xqsr.review.thread.ThreadTest.lambda$main$0(ThreadTest.java:15)        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)        ... 3 more**/复制代码
handle 非常处置惩罚,消耗前面的效果及非常信息,具有返回值,不会停止后续使命
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)复制代码可以通过链式调用该方法可以跟thenApply()一样可以消耗前面使命的效果并完成本身使命内容,而且具有返回值。差别之处在于出现非常也可以接着往下实行,根据非常参数做进一步处置惩罚。
handle(BiFunction<? super T, Throwable, ? extends U> fn)的参数是消耗者接口,一个参数是使命实行效果,一个是非常信息,而且具有返回值。
// 例子public static void main(String[] args) {    // 快速创建线程池    ExecutorService executor = Executors.newFixedThreadPool(4);    try {        CompletableFuture.supplyAsync(() -> 666, executor)                .thenApply((res) -> {                    if (Math.random() < 0.5) throw new RuntimeException(&quot;error&quot;);                    return res;                }).handle((res, ex) -> {                    System.out.println(&quot;效果&quot; + res + &quot;(null表现之前出现非常导致效果无法传过来)&quot;);                    return res == null ? -1 : res;                }).thenApply((res) -> {                    System.out.println(&quot;效果为&quot; + res + &quot;(-1表现之前出现非常,颠末handler使得效果处置惩罚成-1)&quot;);                    return res;                }).exceptionally((ex) -> {                    ex.printStackTrace();                    return -1;                });    } catch (Exception e) {        e.printStackTrace();    }finally {        executor.shutdown();    }}/**输出效果:// 这是不出现非常的环境效果666(null表现之前出现非常导致效果无法传过来)效果为666(-1表现之前出现非常,颠末handler使得效果处置惩罚成-1)// 这是出现非常的环境效果null(null表现之前出现非常导致效果无法传过来)效果为-1(-1表现之前出现非常,颠末handler使得效果处置惩罚成-1)**/复制代码可以看到通过handle雷同于Java非常处置惩罚的finally,出现非常并不会像利用exceptionally那样停止后续的使命,而是继承实行,可以通过handle为之前出现非常无法得到的效果重新赋值(根据业务需求设置安全值之类的)。

⑥ 两组使命按次序实行

thenCompose 实现两组使命按前后次序实行
public <U> CompletableFuture<U> thenCompose(    Function<? super T, ? extends CompletionStage<U>> fn)复制代码A.thenCompose(B)相称于使命A要排在使命B前面,即次序的实行使命A、使命B。该方法的参数是函数式接口,函数式接口的参数是调用者的实行效果,返回值是另一个使命B。
public static void main(String[] args) {    // 快速创建线程池    ExecutorService executor = Executors.newFixedThreadPool(4);    try {        CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> {            System.out.println(&quot;使命A先实行效果为666&quot;);            return 666;        }, executor);        actionA.thenCompose((res) ->  CompletableFuture.supplyAsync(() -> {            System.out.println(&quot;使命B后实行效果加上333&quot;);            return 333 + res;        })).whenComplete((res, ex) -> System.out.println(res));    } catch (Exception e) {        e.printStackTrace();    }finally {        executor.shutdown();    }}/**输出效果:    使命A先实行效果为666    使命B后实行效果加上333    999**/复制代码
⑦ 两组使命谁快用谁

applyToEither 比力两组使命实行速率,谁快消耗谁的实行效果
public <U> CompletableFuture<U> applyToEither(        CompletionStage<? extends T> other, Function<? super T, U> fn)复制代码该方法用于比力两组使命的实行速率,谁先实行完就用谁的实行效果
传入参数阐明:第一个参数传入的是另一个使命的实行内容,第二个参数传入的是终极这两个使命谁快返回谁的效果,并通过当前函数式接口举行吸收和处置惩罚(利用函数式接口,有参且有返回值)。
// 例子public static void main(String[] args) {    // 快速创建线程池    ExecutorService executor = Executors.newFixedThreadPool(4);    try {        CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> {            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }            System.out.println(&quot;使命A等候久一点,实行效果为555&quot;);            return 555;        }, executor);        actionA.applyToEither(CompletableFuture.supplyAsync(() -> {            System.out.println(&quot;使命B很快,实行效果为666&quot;);            return 666;        }), (res) -> {            System.out.println(&quot;终极利用的实行效果为&quot; + res);            return res;        });    } catch (Exception e) {        e.printStackTrace();    }finally {        executor.shutdown();    }}/**输出效果:    使命B很快,实行效果为666    终极利用的实行效果为666    使命A等候久一点,实行效果为555**/复制代码
除了applyToEither对使命终极效果举行获取并消耗,而且具有返回值的方法外,另有两个雷同的方法。
// 这个方法结果和上面的一样,比谁快拿谁的效果,差别的是这个方法只消耗不具有返回值public CompletableFuture<Void> acceptEither(        CompletionStage<? extends T> other, Consumer<? super T> action)复制代码// 这个方法结果和上面的一样,比谁快拿谁的效果,差别的是这个方法不消耗效果也不具有返回值public CompletableFuture<Void> runAfterEither(        CompletionStage<?> other, Runnable action)复制代码
⑧ 两组使命完成后归并

thenCombine 等候两组使命实行完毕后,归并两组使命的实行效果
public <U,V> CompletableFuture<V> thenCombine(        CompletionStage<? extends U> other,        BiFunction<? super T,? super U,? extends V> fn)复制代码该方法用于两组使命都完成后,将两组使命的实行效果一起交给当火线法的BiFunction处置惩罚。先完成的使命会等候后者使命完成。
传入参数阐明:第一个参数传入的是另一个使命的实行内容,第二个参数传入的是带两个参数的函数式接口(第一个参数是使命1的实行效果,第二个参数是使命2的实行效果,具有返回值)。
// 例子public static void main(String[] args) {        // 快速创建线程池        ExecutorService executor = Executors.newFixedThreadPool(4);        try {                CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> {                        try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }                        System.out.println(&quot;使命A等候久一点,实行效果为333&quot;);                        return 333;                }, executor);                CompletableFuture<Integer> actionB = CompletableFuture.supplyAsync(() -> {                        System.out.println(&quot;使命B很快,实行效果为666&quot;);                        return 666;                }, executor);                actionA.thenCombine(actionB, (res1, res2) -> {                        System.out.println(&quot;终极利用的实行效果为&quot; + (res1 + res2));                        return res1 + res2;                });        } catch (Exception e) {                e.printStackTrace();        }finally {                executor.shutdown();        }}/**输出效果:    使命B很快,实行效果为666    使命A等候久一点,实行效果为333    终极利用的实行效果为999**/复制代码
除了thenCombine对使命终极效果举行获取并消耗,而且具有返回值的方法外,另有两个雷同的方法。
// 这个方法结果和上面的一样,获取归并效果,差别的是这个方法只消耗不具有返回值public <U> CompletableFuture<Void> thenAcceptBoth(        CompletionStage<? extends U> other,        BiConsumer<? super T, ? super U> action)复制代码// 这个方法结果和上面的一样,获取归并效果,差别的是这个方法不消耗效果也不具有返回值public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,                                                Runnable action)复制代码
⑨ 多使命组合

allOf 实现并行地实行多个使命,等候全部使命实行完成(无需思量实行次序)
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)复制代码该方法可以实现并行地实行多个使命,实用于多个使命没有依靠关系,可以相互独立实行的,传入参数为多个使命,没有返回值。
allOf()方法会等候全部的使命实行完毕再返回,可以通过get()壅闭确保全部使命实行完毕
// 例子public static void main(String[] args) {    // 快速创建线程池    ExecutorService executor = Executors.newFixedThreadPool(4);    try {        CompletableFuture<Void> actionA = CompletableFuture.runAsync(() -> {            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }            System.out.println(&quot;使命A等候2秒后实行完毕&quot;);        }, executor);        CompletableFuture<Void> actionB = CompletableFuture.runAsync(() -> {            System.out.println(&quot;使命B很快实行完毕&quot;);        }, executor);        CompletableFuture<Void> actionC = CompletableFuture.runAsync(() -> {            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }            System.out.println(&quot;使命C等候1秒后实行完毕&quot;);        }, executor);        CompletableFuture<Void> actionD = CompletableFuture.runAsync(() -> {            try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }            System.out.println(&quot;使命D等候5秒后实行完毕&quot;);        }, executor);        CompletableFuture.allOf(actionA, actionB, actionC, actionD).get();    } catch (Exception e) {        e.printStackTrace();    }finally {        executor.shutdown();    }}/**输出效果:    使命B很快实行完毕    使命C等候1秒后实行完毕    使命A等候2秒后实行完毕    使命D等候5秒后实行完毕**/复制代码
anyOf 实现并行地实行多个使命,只要有个一个完成的便会返回实行效果
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)复制代码该方法可以实现并行地实行多个使命,传入参数为多个使命,具有返回值。该方法不会等候全部使命实行完成后再返回效果,而是当有一个使命完成时,便会返回谁人使命的实行效果
// 例子public static void main(String[] args) {    // 快速创建线程池    ExecutorService executor = Executors.newFixedThreadPool(4);    try {        CompletableFuture<Integer> actionA = CompletableFuture.supplyAsync(() -> {            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }            System.out.println(&quot;使命A等候2秒后实行完毕&quot;);            return 555;        }, executor);        CompletableFuture<Integer> actionB = CompletableFuture.supplyAsync(() -> {            System.out.println(&quot;使命B很快实行完毕&quot;);            return 666;        }, executor);        CompletableFuture<Integer> actionC = CompletableFuture.supplyAsync(() -> {            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }            System.out.println(&quot;使命C等候1秒后实行完毕&quot;);            return 999;        }, executor);        CompletableFuture<Integer> actionD = CompletableFuture.supplyAsync(() -> {            try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }            System.out.println(&quot;使命D等候5秒后实行完毕&quot;);            return 888;        }, executor);        System.out.println(&quot;开始实行完的返回效果为&quot; + CompletableFuture.anyOf(actionA, actionB, actionC, actionD).get());    } catch (Exception e) {        e.printStackTrace();    }finally {        executor.shutdown();    }}/**输出效果:    使命B很快实行完毕    开始实行完的返回效果为666    使命C等候1秒后实行完毕    使命A等候2秒后实行完毕    使命D等候5秒后实行完毕**/复制代码
一个利用CompletableFuture异步编排的例子

不必要关心例子中的业务内容,利用时按照本身业务的需求,对差别的需求调用差别API即可。
编写使命时重要关心以下几点: ① 是否必要消耗之前使命的效果 ② 是否必要返回效果给其他使命消耗 ③ 是否要求次序实行(是否答应并行,有没有前置要求)
/** * 该方法用于获取单个商品的全部信息 * 1. 商品的根本信息 * 2. 商品的图片信息 * 3. 商品的贩卖属性组合 * 4. 商品的各种分类根本信息 * 5. 商品的促销信息 */@Overridepublic SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {        // 创建商品Vo通过各个使命去美满Vo的信息        SkuItemVo skuItemVo = new SkuItemVo();                // 获取商品根本信息 查询到后设置进Vo中,返回根本信息给后续使命消耗 (利用自界说的线程池举行异步)        CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {                SkuInfoEntity info = this.getById(skuId);                skuItemVo.setInfo(info);                return info;        }, executor);        // 获取商品的图片信息 获取后设置进Vo中,此处不必要消耗图片信息,也不必要返回效果。以是利用runAsync即可        CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {                List<SkuImagesEntity> imagesEntities = skuImagesService.getImagesBySkuId(skuId);                skuItemVo.setImages(imagesEntities);        }, executor);        // 获取商品贩卖属性 由于要使用之前查询到的根本信息,但后续使命不必要消耗贩卖属性(不必要返回效果),以是利用thenAcceptAsync消耗之前的根本信息,不返回贩卖信息。        CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync((res) -> {                List<SkuItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.getSaleAttrBySpuId(res.getSpuId());                skuItemVo.setSaleAttr(saleAttrVos);        }, executor);        // 获取商品各分类根本信息,同样要消耗之前的根本信息,但无需返回,以是利用thenAcceptAsync即可        CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync((res) -> {                SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId());                skuItemVo.setDesc(spuInfoDescEntity);        }, executor);        // 获取商品的促销信息 这个也不必要消耗之前使命的效果,也不必要返回效果。以是直接利用runAsync即可        CompletableFuture<Void> seckillFuture = CompletableFuture.runAsync(() -> {                R skuSeckilInfo = seckillFeignService.getSkuSeckilInfo(skuId);                if (skuSeckilInfo.getCode() == 0) {                        SeckillSkuVo seckilInfoData = skuSeckilInfo.getData(&quot;data&quot;, new TypeReference<SeckillSkuVo>() {                        });                        skuItemVo.setSeckillSkuVo(seckilInfoData);                        if (seckilInfoData != null) {                                long currentTime = System.currentTimeMillis();                                if (currentTime > seckilInfoData.getEndTime()) {                                        skuItemVo.setSeckillSkuVo(null);                                }                        }                }        }, executor);        // 利用allOf()组合全部使命,而且利用get()壅闭,等候全部使命完成。        CompletableFuture.allOf(saleAttrFuture,descFuture,baseAttrFuture,imageFuture,seckillFuture).get();                // 末了返回商品Vo        return skuItemVo;}

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

使用道具 举报

全部评论 1

转发了
6 天前

热文

所属版块

您需要登录后才可以回帖 立即登录
说说你的想法......
0
1
0
返回顶部