Java并发包中的Future/CompletableFuture:异步任务结果的优雅组合与错误处理
大家好,今天我们深入探讨Java并发包中Future和CompletableFuture这两个强大的工具,重点关注它们在异步任务结果的组合与错误处理方面的应用。Future接口作为Java 5引入的并发特性,为我们提供了一种获取异步任务结果的方式。而CompletableFuture则是在Java 8中引入的,它是Future接口的扩展和增强,提供了更加丰富和灵活的异步编程模型。
Future接口:异步计算的基石
Future接口代表异步计算的结果。它允许我们启动一个任务,并在稍后的某个时间点获取其结果。Future接口定义了以下主要方法:
get(): 阻塞当前线程,直到异步任务完成并返回结果。如果任务抛出异常,get()方法会抛出ExecutionException,包含原始异常。get(long timeout, TimeUnit unit): 与get()方法类似,但设置了超时时间。如果在指定时间内任务未完成,则抛出TimeoutException。cancel(boolean mayInterruptIfRunning): 尝试取消任务。如果任务已经完成、已被取消或由于某些其他原因无法取消,则此尝试将失败。isDone(): 如果任务已完成、已被取消或由于某些其他原因无法再继续执行,则返回true。isCancelled(): 如果任务在正常完成之前被取消,则返回true。
一个简单的Future使用例子:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
public class FutureExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(1);
Callable<String> task = () -> {
Thread.sleep(2000); // 模拟耗时操作
return "Task completed!";
};
Future<String> future = executor.submit(task);
try {
System.out.println("Task submitted. Waiting for result...");
String result = future.get(1, TimeUnit.SECONDS); // 设置超时时间
System.out.println("Result: " + result);
} catch (InterruptedException e) {
System.out.println("Task interrupted!");
} catch (ExecutionException e) {
System.out.println("Task execution failed: " + e.getCause());
} catch (TimeoutException e) {
System.out.println("Task timed out!");
future.cancel(true); // 取消任务
} finally {
executor.shutdown();
}
}
}
在这个例子中,我们使用ExecutorService提交一个Callable任务。Future对象允许我们异步获取任务的结果。我们设置了超时时间,以防止无限期地等待。如果任务超时,我们会取消它。
Future的局限性:
尽管Future接口提供了一种基本的异步编程模型,但它也存在一些局限性:
- 阻塞性:
get()方法是阻塞的,这意味着当前线程必须等待任务完成才能继续执行。这可能导致性能瓶颈。 - 缺乏组合能力: 很难将多个
Future对象组合在一起,以实现复杂的异步流程。例如,如果我们需要在一个Future完成后执行另一个Future,我们需要手动编写复杂的代码。 - 错误处理的复杂性: 处理异步任务中的异常比较繁琐。我们需要捕获
ExecutionException,并从中提取原始异常。 - 缺乏完成通知:
Future接口没有提供一种方便的方式来在任务完成后立即执行某些操作。我们需要轮询isDone()方法来检查任务是否完成。
CompletableFuture:异步编程的进化
CompletableFuture是Java 8引入的,它解决了Future接口的许多局限性。CompletableFuture实现了Future和CompletionStage接口,提供了更强大和灵活的异步编程模型。
CompletableFuture的主要特性:
- 非阻塞性:
CompletableFuture提供了许多非阻塞的方法,允许我们在不阻塞当前线程的情况下处理异步任务的结果。 - 强大的组合能力:
CompletableFuture提供了丰富的组合方法,例如thenApply(),thenCompose(),thenCombine()等,允许我们构建复杂的异步流程。 - 完善的错误处理:
CompletableFuture提供了exceptionally()和handle()等方法,允许我们方便地处理异步任务中的异常。 - 完成通知:
CompletableFuture允许我们在任务完成后立即执行某些操作,例如使用thenAccept()方法。 - 手动完成:
CompletableFuture允许我们手动设置任务的结果或抛出异常。
CompletableFuture的核心方法:
为了更好地理解CompletableFuture,我们先来了解一下其核心的方法,这些方法可以大致分为以下几类:
-
创建
CompletableFuture对象:CompletableFuture.supplyAsync(Supplier<U> supplier): 使用提供的Supplier异步地计算一个值。CompletableFuture.runAsync(Runnable runnable): 异步地运行一个Runnable任务,不返回任何值。CompletableFuture.completedFuture(U value): 创建一个已经完成的CompletableFuture,其值为给定的value。new CompletableFuture<U>(): 创建一个新的、未完成的CompletableFuture实例。
-
结果处理:
thenApply(Function<? super T,? extends U> fn): 当前CompletableFuture完成时,将结果传递给fn函数进行处理,返回一个新的CompletableFuture,其结果是fn函数的返回值。thenApplyAsync(Function<? super T,? extends U> fn): 与thenApply类似,但fn函数在异步线程中执行。thenAccept(Consumer<? super T> action): 当当前CompletableFuture完成时,将结果传递给action消费,不返回任何值。thenAcceptAsync(Consumer<? super T> action): 与thenAccept类似,但action在异步线程中执行。thenRun(Runnable action): 当当前CompletableFuture完成时,执行action,不关心结果,也不返回任何值。thenRunAsync(Runnable action): 与thenRun类似,但action在异步线程中执行。get(): 阻塞直到CompletableFuture完成,然后返回结果。get(long timeout, TimeUnit unit): 与get()类似,但设置了超时时间。
-
组合:
thenCompose(Function<? super T, ? extends CompletionStage<U>> fn): 将当前CompletableFuture的结果传递给fn函数,fn函数返回一个新的CompletionStage,然后将两个CompletionStage合并为一个CompletableFuture。thenCompose用于连接依赖的任务。thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn): 与thenCompose类似,但fn函数在异步线程中执行。thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn): 当当前CompletableFuture和other都完成时,将它们的结果传递给fn函数进行处理,返回一个新的CompletableFuture,其结果是fn函数的返回值。thenCombine用于并行执行两个独立的任务,然后合并它们的结果。thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn): 与thenCombine类似,但fn函数在异步线程中执行。allOf(CompletableFuture<?>... cfs): 创建一个新的CompletableFuture,当所有给定的CompletableFuture都完成时,这个新的CompletableFuture也会完成。anyOf(CompletableFuture<?>... cfs): 创建一个新的CompletableFuture,当任何一个给定的CompletableFuture完成时,这个新的CompletableFuture也会完成。
-
异常处理:
exceptionally(Function<Throwable, ? extends T> fn): 当当前CompletableFuture抛出异常时,将异常传递给fn函数进行处理,返回一个新的CompletableFuture,其结果是fn函数的返回值。handle(BiFunction<? super T, Throwable, ? extends U> fn): 当当前CompletableFuture完成时,无论是否抛出异常,都将结果和异常传递给fn函数进行处理,返回一个新的CompletableFuture,其结果是fn函数的返回值。whenComplete(BiConsumer<? super T, ? super Throwable> action): 当当前CompletableFuture完成时,无论是否抛出异常,都将结果和异常传递给action消费,不返回任何值。whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action): 与whenComplete类似,但action在异步线程中执行。
-
手动控制:
complete(T value): 手动设置CompletableFuture的结果。completeExceptionally(Throwable ex): 手动设置CompletableFuture抛出异常。
CompletableFuture的使用示例:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class CompletableFutureExample {
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
// 1. 创建 CompletableFuture
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1); // 模拟耗时操作
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "Task interrupted";
}
return "Hello, CompletableFuture!";
});
// 2. 结果处理
CompletableFuture<String> thenApplyFuture = future.thenApply(result -> result + " - Processed");
System.out.println("ThenApply result: " + thenApplyFuture.get(2, TimeUnit.SECONDS));
// 3. 组合
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);
CompletableFuture<Integer> combinedFuture = future1.thenCombine(future2, (x, y) -> x + y);
System.out.println("Combined result: " + combinedFuture.get(2, TimeUnit.SECONDS));
// 4. 异常处理
CompletableFuture<String> exceptionFuture = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Something went wrong!");
});
CompletableFuture<String> recoveredFuture = exceptionFuture.exceptionally(ex -> "Recovered from exception: " + ex.getMessage());
System.out.println("Recovered result: " + recoveredFuture.get(2, TimeUnit.SECONDS));
// 5. 完成时执行
future.whenComplete((result, ex) -> {
if (ex == null) {
System.out.println("Task completed successfully with result: " + result);
} else {
System.err.println("Task completed with exception: " + ex.getMessage());
}
});
TimeUnit.SECONDS.sleep(3); // 等待所有任务完成
}
}
这个例子展示了CompletableFuture的一些常见用法,包括创建、结果处理、组合和异常处理。
异步任务结果的优雅组合
CompletableFuture提供了强大的组合方法,允许我们构建复杂的异步流程。以下是一些常见的组合模式:
1. 串行组合 (thenApply, thenCompose):
串行组合是指一个任务的完成依赖于另一个任务的完成。thenApply和thenCompose都用于实现串行组合,但它们之间有一个重要的区别:
thenApply: 用于对结果进行转换。它接收一个Function,该Function将前一个CompletableFuture的结果作为输入,并返回一个新的结果。thenApply返回一个新的CompletableFuture,其结果是Function的返回值。thenCompose: 用于连接两个相互依赖的异步任务。它接收一个Function,该Function将前一个CompletableFuture的结果作为输入,并返回一个新的CompletableFuture。thenCompose返回一个新的CompletableFuture,该CompletableFuture代表第二个异步任务的结果。
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = future1.thenApply(s -> s + " World"); // 转换结果
CompletableFuture<String> future3 = future1.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World")); // 连接异步任务
System.out.println("thenApply result: " + future2.get()); // 输出: Hello World
System.out.println("thenCompose result: " + future3.get()); // 输出: Hello World
2. 并行组合 (thenCombine, allOf, anyOf):
并行组合是指多个任务可以同时执行,而不需要等待彼此完成。thenCombine, allOf和anyOf都用于实现并行组合:
thenCombine: 用于合并两个独立的异步任务的结果。它接收另一个CompletableFuture和一个BiFunction。当两个CompletableFuture都完成时,BiFunction将被调用,并将两个CompletableFuture的结果作为输入,返回一个新的结果。thenCombine返回一个新的CompletableFuture,其结果是BiFunction的返回值。allOf: 用于等待所有给定的CompletableFuture完成。它接收一个CompletableFuture数组,并返回一个新的CompletableFuture。当所有给定的CompletableFuture都完成时,新的CompletableFuture也将完成。allOf返回的CompletableFuture没有结果,但我们可以使用join()方法来阻塞当前线程,直到所有给定的CompletableFuture完成。anyOf: 用于等待任何一个给定的CompletableFuture完成。它接收一个CompletableFuture数组,并返回一个新的CompletableFuture。当任何一个给定的CompletableFuture完成时,新的CompletableFuture也将完成。anyOf返回的CompletableFuture的结果是第一个完成的CompletableFuture的结果。
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Hello";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (s1, s2) -> s1 + " " + s2);
System.out.println("thenCombine result: " + combinedFuture.get()); // 输出: Hello World
CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(future1, future2);
allOfFuture.join(); // 等待所有任务完成
System.out.println("All tasks completed");
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2);
System.out.println("Any task completed with result: " + anyOfFuture.get()); // 输出: Hello 或 World (取决于哪个先完成)
表格对比:组合方法
| 方法 | 描述 | 适用场景 |
|---|---|---|
thenApply |
将当前 CompletableFuture 的结果传递给一个 Function 进行转换,返回一个新的 CompletableFuture,其结果是 Function 的返回值。 |
对异步任务的结果进行简单转换。 |
thenCompose |
将当前 CompletableFuture 的结果传递给一个 Function,该 Function 返回一个新的 CompletableFuture,然后将两个 CompletableFuture 合并为一个。 |
连接依赖的异步任务,其中第二个任务的启动依赖于第一个任务的结果。 |
thenCombine |
当两个 CompletableFuture 都完成时,将它们的结果传递给一个 BiFunction 进行处理,返回一个新的 CompletableFuture,其结果是 BiFunction 的返回值。 |
并行执行两个独立的异步任务,然后合并它们的结果。 |
allOf |
创建一个新的 CompletableFuture,当所有给定的 CompletableFuture 都完成时,这个新的 CompletableFuture 也会完成。 |
等待多个异步任务完成。 |
anyOf |
创建一个新的 CompletableFuture,当任何一个给定的 CompletableFuture 完成时,这个新的 CompletableFuture 也会完成。 |
等待多个异步任务中的任何一个完成。 |
异步任务的错误处理
CompletableFuture提供了多种方式来处理异步任务中的错误:
exceptionally: 用于在发生异常时提供一个备用结果。它接收一个Function,该Function将异常作为输入,并返回一个备用结果。如果CompletableFuture正常完成,则exceptionally方法不会被调用。handle: 用于处理正常结果和异常。它接收一个BiFunction,该BiFunction将结果和异常作为输入,并返回一个新的结果。无论CompletableFuture是正常完成还是抛出异常,handle方法都会被调用。whenComplete: 用于在CompletableFuture完成时执行一些操作,无论它是正常完成还是抛出异常。它接收一个BiConsumer,该BiConsumer将结果和异常作为输入。whenComplete方法不会返回任何值。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("Something went wrong!");
}
return "Success!";
});
CompletableFuture<String> exceptionallyFuture = future.exceptionally(ex -> "Recovered: " + ex.getMessage());
CompletableFuture<String> handleFuture = future.handle((result, ex) -> (ex != null) ? "Handle Exception: " + ex.getMessage() : "Handle Result: " + result);
future.whenComplete((result, ex) -> {
if (ex != null) {
System.err.println("WhenComplete Exception: " + ex.getMessage());
} else {
System.out.println("WhenComplete Result: " + result);
}
});
System.out.println("exceptionally result: " + exceptionallyFuture.get());
System.out.println("handle result: " + handleFuture.get());
表格对比:错误处理方法
| 方法 | 描述 | 适用场景 |
|---|---|---|
exceptionally |
当当前 CompletableFuture 抛出异常时,将异常传递给一个 Function 进行处理,返回一个新的 CompletableFuture,其结果是 Function 的返回值。 |
提供备用结果,从异常中恢复。 |
handle |
当当前 CompletableFuture 完成时,无论是否抛出异常,都将结果和异常传递给一个 BiFunction 进行处理,返回一个新的 CompletableFuture,其结果是 BiFunction 的返回值。 |
处理正常结果和异常,可以根据情况返回不同的结果。 |
whenComplete |
当当前 CompletableFuture 完成时,无论是否抛出异常,都将结果和异常传递给一个 BiConsumer 进行处理,不返回任何值。 |
在任务完成后执行一些操作,例如记录日志或释放资源。 |
异步编程的要点
- 选择合适的线程池: 使用
ExecutorService来管理线程。根据任务的类型和数量,选择合适的线程池。例如,对于CPU密集型任务,可以使用固定大小的线程池。对于IO密集型任务,可以使用更大的线程池或使用ForkJoinPool。 - 避免阻塞操作: 尽量避免在异步任务中使用阻塞操作。如果必须使用阻塞操作,请考虑将其放在单独的线程中执行。
- 处理异常: 务必处理异步任务中的异常。可以使用
exceptionally,handle或whenComplete方法来处理异常。 - 取消任务: 如果不再需要异步任务的结果,请取消它。可以使用
cancel方法来取消任务。 - 避免死锁: 在使用
CompletableFuture的组合方法时,要小心避免死锁。例如,避免在一个CompletableFuture的回调函数中等待另一个CompletableFuture完成。
Future与CompletableFuture的对比
| 特性 | Future |
CompletableFuture |
|---|---|---|
| 阻塞性 | get()方法阻塞 |
提供非阻塞方法,例如thenApplyAsync,thenComposeAsync |
| 组合能力 | 缺乏组合能力 | 提供丰富的组合方法,例如thenApply,thenCompose,thenCombine,allOf,anyOf |
| 错误处理 | 错误处理复杂 | 提供exceptionally,handle,whenComplete等方法,方便进行错误处理 |
| 完成通知 | 缺乏完成通知 | 提供thenAccept,runAfterEither,whenComplete等方法,可以在任务完成后立即执行某些操作 |
| 手动完成 | 不支持手动完成 | 支持手动完成,可以使用complete和completeExceptionally方法 |
| 适用场景 | 简单异步任务,不需要复杂的组合和错误处理 | 复杂的异步流程,需要灵活的组合和完善的错误处理 |
灵活运用异步工具
Future和CompletableFuture是Java并发包中强大的工具,它们可以帮助我们构建高效和可伸缩的异步应用程序。Future提供了基本的异步编程模型,而CompletableFuture则提供了更强大和灵活的异步编程模型。在实际开发中,我们需要根据具体的需求选择合适的工具。正确使用它们能够显著提高应用程序的性能和响应能力。