java8 CompletableFuture异步调用与lamda结合

前言:jdk1.8lamda记录

异步执行动作

static void thenApplyAsyncExample() { CompletableFuture<String>cf = CompletableFuture.completedFuture(“message”).thenApplyAsync(s -> { assertTrue(Thread.currentThread().isDaemon()); randomSleep(); returns.toUpperCase(); }); assertNull(cf.getNow(null)); assertEquals(“MESSAGE”, cf.join());}

使用固定的线程池完成异步执行动作

可以通过使用线程池方式来管理异步动作申请,以下代码基于固定的线程池,也是做一个大写字母转换动作

staticExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() { int count = 1; @Override public Thread newThread(Runnable runnable) { return new Thread(runnable, “custom-executor-” + count++); } }); static void thenApplyAsyncWithExecutorExample() { CompletableFuture<String>cf = CompletableFuture.completedFuture(“message”).thenApplyAsync(s -> { assertTrue(Thread.currentThread().getName().startsWith(“custom-executor-“)); assertFalse(Thread.currentThread().isDaemon()); randomSleep(); returns.toUpperCase(); }, executor); assertNull(cf.getNow(null)); assertEquals(“MESSAGE”, cf.join());}

作为消费者消费计算结果

假设我们本次计算只需要前一次的计算结果,而不需要返回本次计算结果,那就有点类似于生产者(前一次计算)-消费者(本次计算)模式了

static void thenAcceptExample() { StringBuilder result = new StringBuilder(); CompletableFuture.completedFuture(“thenAccept message”) .thenAccept(s ->result.append(s)); assertTrue(“Result was empty”, result.length() > 0);}

消费者是同步执行的,所以不需要在 CompletableFuture 里对结果进行合并。

异步消费

相较于前一个示例的同步方式,我们也对应有异步方式,代码如清单 11 所示。

static void thenAcceptAsyncExample() { StringBuilder result = new StringBuilder(); CompletableFuture<Void>cf = CompletableFuture.completedFuture(“thenAcceptAsync message”) .thenAcceptAsync(s ->result.append(s)); cf.join(); assertTrue(“Result was empty”, result.length() > 0);}

计算过程中的异常接下来介绍异步操作过程中的异常情况处理。下面这个示例中我们会在字符转换异步请求中刻意延迟 1 秒钟,然后才会提交到 ForkJoinPool 里面去执行。

static void completeExceptionallyExample() { CompletableFuture<String>cf = CompletableFuture.completedFuture(“message”).thenApplyAsync(String::toUpperCase, CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)); CompletableFuture<String>exceptionHandler = cf.handle((s, th) -> { return (th != null) ? “message upon cancel” : “”; }); cf.completeExceptionally(new RuntimeException(“completed exceptionally”)); assertTrue(“Was not completed exceptionally”, cf.isCompletedExceptionally()); try { cf.join(); fail(“Should have thrown an exception”); } catch(CompletionException ex) { // just for testing assertEquals(“completed exceptionally”, ex.getCause().getMessage()); } assertEquals(“message upon cancel”, exceptionHandler.join());}

首先我们创建一个 CompletableFuture(计算完毕),然后调用 thenApplyAsync 返回一个新的 CompletableFuture,接着通过使用 delayedExecutor(timeout, timeUnit)方法延迟 1 秒钟执行。然后我们创建一个 handler(exceptionHandler),它会处理异常,返回另一个字符串”message upon cancel”。接下来进入 join()方法,执行大写转换操作,并且抛出 CompletionException 异常。

取消计算任务

与前面一个异常处理的示例类似,我们可以通过调用 cancel(boolean mayInterruptIfRunning)方法取消计算任务。此外,cancel()方法与 completeExceptionally(new CancellationException())等价。

static void cancelExample() { CompletableFuture cf = CompletableFuture.completedFuture(“message”).thenApplyAsync(String::toUpperCase, CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)); CompletableFuture cf2 = cf.exceptionally(throwable -> “canceled message”); assertTrue(“Was not canceled”, cf.cancel(true)); assertTrue(“Was not completed exceptionally”, cf.isCompletedExceptionally()); assertEquals(“canceled message”, cf2.join());}

一个 CompletableFuture VS 两个异步计算

我们可以创建一个 CompletableFuture 接收两个异步计算的结果,下面代码首先创建了一个 String 对象,接下来分别创建了两个 CompletableFuture 对象 cf1 和 cf2,cf2 通过调用 applyToEither 方法实现我们的需求。

static void applyToEitherExample() { String original = “Message”; CompletableFuture cf1 = CompletableFuture.completedFuture(original) .thenApplyAsync(s -> delayedUpperCase(s)); CompletableFuture cf2 = cf1.applyToEither( CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)), s -> s + ” from applyToEither”); assertTrue(cf2.join().endsWith(” from applyToEither”));}

使用消费者替换用于处理异步计算结果

static void acceptEitherExample() { String original = “Message”; StringBuilder result = new StringBuilder(); CompletableFuture cf = CompletableFuture.completedFuture(original) .thenApplyAsync(s -> delayedUpperCase(s)) .acceptEither(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)), s -> result.append(s).append(“acceptEither”)); cf.join(); assertTrue(“Result was empty”, result.toString().endsWith(“acceptEither”));}

运行两个阶段后执行

下面这个示例程序两个阶段执行完毕后返回结果,首先将字符转为大写,然后将字符转为小写,在两个计算阶段都结束之后触发 ??CompletableFuture??。

static void runAfterBothExample() { String original = “Message”; StringBuilder result = new StringBuilder(); CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).runAfterBoth( CompletableFuture.completedFuture(original).thenApply(String::toLowerCase), () -> result.append(“done”)); assertTrue(“Result was empty”, result.length() > 0);}

也可以通过以下方式处理异步计算结果,

static void thenAcceptBothExample() { String original = “Message”; StringBuilder result = new StringBuilder(); CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).thenAcceptBoth( CompletableFuture.completedFuture(original).thenApply(String::toLowerCase), (s1, s2) -> result.append(s1 + s2)); assertEquals(“MESSAGEmessage”, result.toString());}

整合两个计算结果

我们可以通过 thenCombine()方法整合两个异步计算的结果,注意,以下代码的整个程序过程是同步的,getNow()方法最终会输出整合后的结果,也就是说大写字符和小写字符的串联值。

static void thenCombineExample() { String original = “Message”; CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s)) .thenCombine(CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)), (s1, s2) -> s1 + s2); assertEquals(“MESSAGEmessage”, cf.getNow(null));}

按照同步方式执行两个方法后再合成字符串,以下代码采用异步方式同步执行两个方法,由于异步方式情况下不能够确定哪一个方法最终执行完毕,所以我们需要调用 join()方法等待后一个方法结束后再合成字符串,这一点和线程的 join()方法是一致的,主线程生成并起动了子线程,如果子线程里要进行大量的耗时的运算,主线程往往将于子线程之前结束,但是如果主线程处理完其他的事务后,需要用到子线程的处理结果,也就是主线程需要等待子线程执行完成之后再结束,这个时候就要用到 join()方法了,即 join()的作用是:”等待该线程终止”。

static void thenCombineAsyncExample() { String original = “Message”; CompletableFuture cf = CompletableFuture.completedFuture(original) .thenApplyAsync(s -> delayedUpperCase(s)) .thenCombine(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)), (s1, s2) -> s1 + s2); assertEquals(“MESSAGEmessage”, cf.join());}

除了 thenCombine()方法以外,还有另外一种方法-thenCompose(),这个方法也会实现两个方法执行后的返回结果的连接。

static void thenComposeExample() { String original = “Message”; CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s)) .thenCompose(upper -> CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)) .thenApply(s -> upper + s)); assertEquals(“MESSAGEmessage”, cf.join());}

anyOf()方法

以下代码模拟了如何在几个计算过程中任意一个完成后创建 CompletableFuture,在这个例子中,我们创建了几个计算过程,然后转换字符串到大写字符。由于这些 CompletableFuture 是同步执行的(下面这个例子使用的是 thenApply()方法,而不是 thenApplyAsync()方法),使用 anyOf()方法后返回的任何一个值都会立即触发 CompletableFuture。然后我们使用 whenComplete(BiConsumer<? super Object, ? super Throwable> action)方法处理结果。

static void anyOfExample() { StringBuilder result = new StringBuilder(); List messages = Arrays.asList(“a”, “b”, “c”); List<CompletableFuture> futures = messages.stream() .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s))) .collect(Collectors.toList()); CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((res, th) -> { if(th == null) { assertTrue(isUpperCase((String) res)); result.append(res); }}); assertTrue(“Result was empty”, result.length() > 0);}

当所有的 CompletableFuture 完成后创建 CompletableFuture清单 22 所示我们会以同步方式执行多个异步计算过程,在所有计算过程都完成后,创建一个 CompletableFuture。

static void allOfExample() { StringBuilder result = new StringBuilder(); List messages = Arrays.asList(“a”, “b”, “c”); List<CompletableFuture> futures = messages.stream() .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s))) .collect(Collectors.toList()); CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((v, th) -> { futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null)))); result.append(“done”);}); assertTrue(“Result was empty”, result.length() > 0);}

相较于前一个同步示例,我们也可以异步执行

static void allOfAsyncExample() { StringBuilder result = new StringBuilder(); List messages = Arrays.asList(“a”, “b”, “c”); List<CompletableFuture> futures = messages.stream() .map(msg -> CompletableFuture.completedFuture(msg).thenApplyAsync(s -> delayedUpperCase(s))) .collect(Collectors.toList()); CompletableFuture allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((v, th) -> { futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null)))); result.append(“done”);}); allOf.join(); assertTrue(“Result was empty”, result.length() > 0);}

首先异步地通过调用 cars()方法获取 Car 对象,返回一个 CompletionStage<List>实例。Cars()方法可以在内部使用调用远端服务器上的 REST 服务等类似场景。然后和其他的 CompletionStage<List>组合,通过调用 rating(manufacturerId)方法异步地返回 CompletionStage 实例。当所有的 Car 对象都被填充了 rating 后,调用 allOf()方法获取最终值。调用 whenComplete()方法打印最终的评分(rating)。

cars().thenCompose(cars -> { List<CompletionStage> updatedCars = cars.stream() .map(car -> rating(car.manufacturerId).thenApply(r -> { car.setRating(r); return car; })).collect(Collectors.toList()); CompletableFuture done = CompletableFuture .allOf(updatedCars.toArray(new CompletableFuture[updatedCars.size()])); return done.thenApply(v -> updatedCars.stream().map(CompletionStage::toCompletableFuture) .map(CompletableFuture::join).collect(Collectors.toList())); }).whenComplete((cars, th) -> { if (th == null) { cars.forEach(System.out::println); } else { throw new RuntimeException(th); }}).toCompletableFuture().join();

自己改造的lamda+CompletableFuture调用模板

public class FurtureTest { public static void allOfExample() { StringBuilder result = new StringBuilder(); List<String> messages = Arrays.asList(“a”, “b”, “c”); List<CompletableFuture> futures = messages.stream() .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s ->result.append(s))) .collect(Collectors.toList()); CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((v, th) -> { futures.forEach(cf -> { System.out.println(“a”);}); result.append(“done”); }); } public static String delayedLowerCase(String s){ return s.toString(); } public static void main(String[] argas){ allOfExample(); }}

我喜欢出发。凡是到达了的地方,

java8 CompletableFuture异步调用与lamda结合

相关文章:

你感兴趣的文章:

标签云: