Java并发包中的Future/CompletableFuture:异步任务结果的优雅组合与错误处理
大家好,今天我们深入探讨Java并发包中Future
和CompletableFuture
这两个强大的工具,重点关注它们在异步任务结果的组合与错误处理方面的应用。Future
接口作为Java 5引入的并发特性,为我们提供了一种获取异步任务结果的方式。而CompletableFuture
则是在Java 8中引入的,它是Future
接口的扩展和增强,提供了更加丰富和灵活的异步编程模型。
Future
接口:异步计算的基石
Future
接口代表异步计算的结果。它允许我们启动一个任务,并在稍后的某个时间点获取其结果。Future
接口定义了以下主要方法:
get()
: 阻塞当前线程,直到异步任务完成并返回结果。如果任务抛出异常,get()
方法会抛出ExecutionException
,包含原始异常。get(long timeout, TimeUnit unit)
: 与get()
方法类似,但设置了超时时间。如果在指定时间内任务未完成,则抛出TimeoutException
。cancel(boolean mayInterruptIfRunning)
: 尝试取消任务。如果任务已经完成、已被取消或由于某些其他原因无法取消,则此尝试将失败。isDone()
: 如果任务已完成、已被取消或由于某些其他原因无法再继续执行,则返回true
。isCancelled()
: 如果任务在正常完成之前被取消,则返回true
。
一个简单的Future
使用例子:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
public class FutureExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(1);
Callable<String> task = () -> {
Thread.sleep(2000); // 模拟耗时操作
return "Task completed!";
};
Future<String> future = executor.submit(task);
try {
System.out.println("Task submitted. Waiting for result...");
String result = future.get(1, TimeUnit.SECONDS); // 设置超时时间
System.out.println("Result: " + result);
} catch (InterruptedException e) {
System.out.println("Task interrupted!");
} catch (ExecutionException e) {
System.out.println("Task execution failed: " + e.getCause());
} catch (TimeoutException e) {
System.out.println("Task timed out!");
future.cancel(true); // 取消任务
} finally {
executor.shutdown();
}
}
}
在这个例子中,我们使用ExecutorService
提交一个Callable
任务。Future
对象允许我们异步获取任务的结果。我们设置了超时时间,以防止无限期地等待。如果任务超时,我们会取消它。
Future
的局限性:
尽管Future
接口提供了一种基本的异步编程模型,但它也存在一些局限性:
- 阻塞性:
get()
方法是阻塞的,这意味着当前线程必须等待任务完成才能继续执行。这可能导致性能瓶颈。 - 缺乏组合能力: 很难将多个
Future
对象组合在一起,以实现复杂的异步流程。例如,如果我们需要在一个Future
完成后执行另一个Future
,我们需要手动编写复杂的代码。 - 错误处理的复杂性: 处理异步任务中的异常比较繁琐。我们需要捕获
ExecutionException
,并从中提取原始异常。 - 缺乏完成通知:
Future
接口没有提供一种方便的方式来在任务完成后立即执行某些操作。我们需要轮询isDone()
方法来检查任务是否完成。
CompletableFuture
:异步编程的进化
CompletableFuture
是Java 8引入的,它解决了Future
接口的许多局限性。CompletableFuture
实现了Future
和CompletionStage
接口,提供了更强大和灵活的异步编程模型。
CompletableFuture
的主要特性:
- 非阻塞性:
CompletableFuture
提供了许多非阻塞的方法,允许我们在不阻塞当前线程的情况下处理异步任务的结果。 - 强大的组合能力:
CompletableFuture
提供了丰富的组合方法,例如thenApply()
,thenCompose()
,thenCombine()
等,允许我们构建复杂的异步流程。 - 完善的错误处理:
CompletableFuture
提供了exceptionally()
和handle()
等方法,允许我们方便地处理异步任务中的异常。 - 完成通知:
CompletableFuture
允许我们在任务完成后立即执行某些操作,例如使用thenAccept()
方法。 - 手动完成:
CompletableFuture
允许我们手动设置任务的结果或抛出异常。
CompletableFuture
的核心方法:
为了更好地理解CompletableFuture
,我们先来了解一下其核心的方法,这些方法可以大致分为以下几类:
-
创建
CompletableFuture
对象:CompletableFuture.supplyAsync(Supplier<U> supplier)
: 使用提供的Supplier
异步地计算一个值。CompletableFuture.runAsync(Runnable runnable)
: 异步地运行一个Runnable
任务,不返回任何值。CompletableFuture.completedFuture(U value)
: 创建一个已经完成的CompletableFuture
,其值为给定的value
。new CompletableFuture<U>()
: 创建一个新的、未完成的CompletableFuture
实例。
-
结果处理:
thenApply(Function<? super T,? extends U> fn)
: 当前CompletableFuture
完成时,将结果传递给fn
函数进行处理,返回一个新的CompletableFuture
,其结果是fn
函数的返回值。thenApplyAsync(Function<? super T,? extends U> fn)
: 与thenApply
类似,但fn
函数在异步线程中执行。thenAccept(Consumer<? super T> action)
: 当当前CompletableFuture
完成时,将结果传递给action
消费,不返回任何值。thenAcceptAsync(Consumer<? super T> action)
: 与thenAccept
类似,但action
在异步线程中执行。thenRun(Runnable action)
: 当当前CompletableFuture
完成时,执行action
,不关心结果,也不返回任何值。thenRunAsync(Runnable action)
: 与thenRun
类似,但action
在异步线程中执行。get()
: 阻塞直到CompletableFuture
完成,然后返回结果。get(long timeout, TimeUnit unit)
: 与get()
类似,但设置了超时时间。
-
组合:
thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
: 将当前CompletableFuture
的结果传递给fn
函数,fn
函数返回一个新的CompletionStage
,然后将两个CompletionStage
合并为一个CompletableFuture
。thenCompose
用于连接依赖的任务。thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
: 与thenCompose
类似,但fn
函数在异步线程中执行。thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
: 当当前CompletableFuture
和other
都完成时,将它们的结果传递给fn
函数进行处理,返回一个新的CompletableFuture
,其结果是fn
函数的返回值。thenCombine
用于并行执行两个独立的任务,然后合并它们的结果。thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
: 与thenCombine
类似,但fn
函数在异步线程中执行。allOf(CompletableFuture<?>... cfs)
: 创建一个新的CompletableFuture
,当所有给定的CompletableFuture
都完成时,这个新的CompletableFuture
也会完成。anyOf(CompletableFuture<?>... cfs)
: 创建一个新的CompletableFuture
,当任何一个给定的CompletableFuture
完成时,这个新的CompletableFuture
也会完成。
-
异常处理:
exceptionally(Function<Throwable, ? extends T> fn)
: 当当前CompletableFuture
抛出异常时,将异常传递给fn
函数进行处理,返回一个新的CompletableFuture
,其结果是fn
函数的返回值。handle(BiFunction<? super T, Throwable, ? extends U> fn)
: 当当前CompletableFuture
完成时,无论是否抛出异常,都将结果和异常传递给fn
函数进行处理,返回一个新的CompletableFuture
,其结果是fn
函数的返回值。whenComplete(BiConsumer<? super T, ? super Throwable> action)
: 当当前CompletableFuture
完成时,无论是否抛出异常,都将结果和异常传递给action
消费,不返回任何值。whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
: 与whenComplete
类似,但action
在异步线程中执行。
-
手动控制:
complete(T value)
: 手动设置CompletableFuture
的结果。completeExceptionally(Throwable ex)
: 手动设置CompletableFuture
抛出异常。
CompletableFuture
的使用示例:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class CompletableFutureExample {
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
// 1. 创建 CompletableFuture
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1); // 模拟耗时操作
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "Task interrupted";
}
return "Hello, CompletableFuture!";
});
// 2. 结果处理
CompletableFuture<String> thenApplyFuture = future.thenApply(result -> result + " - Processed");
System.out.println("ThenApply result: " + thenApplyFuture.get(2, TimeUnit.SECONDS));
// 3. 组合
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);
CompletableFuture<Integer> combinedFuture = future1.thenCombine(future2, (x, y) -> x + y);
System.out.println("Combined result: " + combinedFuture.get(2, TimeUnit.SECONDS));
// 4. 异常处理
CompletableFuture<String> exceptionFuture = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Something went wrong!");
});
CompletableFuture<String> recoveredFuture = exceptionFuture.exceptionally(ex -> "Recovered from exception: " + ex.getMessage());
System.out.println("Recovered result: " + recoveredFuture.get(2, TimeUnit.SECONDS));
// 5. 完成时执行
future.whenComplete((result, ex) -> {
if (ex == null) {
System.out.println("Task completed successfully with result: " + result);
} else {
System.err.println("Task completed with exception: " + ex.getMessage());
}
});
TimeUnit.SECONDS.sleep(3); // 等待所有任务完成
}
}
这个例子展示了CompletableFuture
的一些常见用法,包括创建、结果处理、组合和异常处理。
异步任务结果的优雅组合
CompletableFuture
提供了强大的组合方法,允许我们构建复杂的异步流程。以下是一些常见的组合模式:
1. 串行组合 (thenApply, thenCompose):
串行组合是指一个任务的完成依赖于另一个任务的完成。thenApply
和thenCompose
都用于实现串行组合,但它们之间有一个重要的区别:
thenApply
: 用于对结果进行转换。它接收一个Function
,该Function
将前一个CompletableFuture
的结果作为输入,并返回一个新的结果。thenApply
返回一个新的CompletableFuture
,其结果是Function
的返回值。thenCompose
: 用于连接两个相互依赖的异步任务。它接收一个Function
,该Function
将前一个CompletableFuture
的结果作为输入,并返回一个新的CompletableFuture
。thenCompose
返回一个新的CompletableFuture
,该CompletableFuture
代表第二个异步任务的结果。
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = future1.thenApply(s -> s + " World"); // 转换结果
CompletableFuture<String> future3 = future1.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World")); // 连接异步任务
System.out.println("thenApply result: " + future2.get()); // 输出: Hello World
System.out.println("thenCompose result: " + future3.get()); // 输出: Hello World
2. 并行组合 (thenCombine, allOf, anyOf):
并行组合是指多个任务可以同时执行,而不需要等待彼此完成。thenCombine
, allOf
和anyOf
都用于实现并行组合:
thenCombine
: 用于合并两个独立的异步任务的结果。它接收另一个CompletableFuture
和一个BiFunction
。当两个CompletableFuture
都完成时,BiFunction
将被调用,并将两个CompletableFuture
的结果作为输入,返回一个新的结果。thenCombine
返回一个新的CompletableFuture
,其结果是BiFunction
的返回值。allOf
: 用于等待所有给定的CompletableFuture
完成。它接收一个CompletableFuture
数组,并返回一个新的CompletableFuture
。当所有给定的CompletableFuture
都完成时,新的CompletableFuture
也将完成。allOf
返回的CompletableFuture
没有结果,但我们可以使用join()
方法来阻塞当前线程,直到所有给定的CompletableFuture
完成。anyOf
: 用于等待任何一个给定的CompletableFuture
完成。它接收一个CompletableFuture
数组,并返回一个新的CompletableFuture
。当任何一个给定的CompletableFuture
完成时,新的CompletableFuture
也将完成。anyOf
返回的CompletableFuture
的结果是第一个完成的CompletableFuture
的结果。
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Hello";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (s1, s2) -> s1 + " " + s2);
System.out.println("thenCombine result: " + combinedFuture.get()); // 输出: Hello World
CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(future1, future2);
allOfFuture.join(); // 等待所有任务完成
System.out.println("All tasks completed");
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2);
System.out.println("Any task completed with result: " + anyOfFuture.get()); // 输出: Hello 或 World (取决于哪个先完成)
表格对比:组合方法
方法 | 描述 | 适用场景 |
---|---|---|
thenApply |
将当前 CompletableFuture 的结果传递给一个 Function 进行转换,返回一个新的 CompletableFuture ,其结果是 Function 的返回值。 |
对异步任务的结果进行简单转换。 |
thenCompose |
将当前 CompletableFuture 的结果传递给一个 Function ,该 Function 返回一个新的 CompletableFuture ,然后将两个 CompletableFuture 合并为一个。 |
连接依赖的异步任务,其中第二个任务的启动依赖于第一个任务的结果。 |
thenCombine |
当两个 CompletableFuture 都完成时,将它们的结果传递给一个 BiFunction 进行处理,返回一个新的 CompletableFuture ,其结果是 BiFunction 的返回值。 |
并行执行两个独立的异步任务,然后合并它们的结果。 |
allOf |
创建一个新的 CompletableFuture ,当所有给定的 CompletableFuture 都完成时,这个新的 CompletableFuture 也会完成。 |
等待多个异步任务完成。 |
anyOf |
创建一个新的 CompletableFuture ,当任何一个给定的 CompletableFuture 完成时,这个新的 CompletableFuture 也会完成。 |
等待多个异步任务中的任何一个完成。 |
异步任务的错误处理
CompletableFuture
提供了多种方式来处理异步任务中的错误:
exceptionally
: 用于在发生异常时提供一个备用结果。它接收一个Function
,该Function
将异常作为输入,并返回一个备用结果。如果CompletableFuture
正常完成,则exceptionally
方法不会被调用。handle
: 用于处理正常结果和异常。它接收一个BiFunction
,该BiFunction
将结果和异常作为输入,并返回一个新的结果。无论CompletableFuture
是正常完成还是抛出异常,handle
方法都会被调用。whenComplete
: 用于在CompletableFuture
完成时执行一些操作,无论它是正常完成还是抛出异常。它接收一个BiConsumer
,该BiConsumer
将结果和异常作为输入。whenComplete
方法不会返回任何值。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("Something went wrong!");
}
return "Success!";
});
CompletableFuture<String> exceptionallyFuture = future.exceptionally(ex -> "Recovered: " + ex.getMessage());
CompletableFuture<String> handleFuture = future.handle((result, ex) -> (ex != null) ? "Handle Exception: " + ex.getMessage() : "Handle Result: " + result);
future.whenComplete((result, ex) -> {
if (ex != null) {
System.err.println("WhenComplete Exception: " + ex.getMessage());
} else {
System.out.println("WhenComplete Result: " + result);
}
});
System.out.println("exceptionally result: " + exceptionallyFuture.get());
System.out.println("handle result: " + handleFuture.get());
表格对比:错误处理方法
方法 | 描述 | 适用场景 |
---|---|---|
exceptionally |
当当前 CompletableFuture 抛出异常时,将异常传递给一个 Function 进行处理,返回一个新的 CompletableFuture ,其结果是 Function 的返回值。 |
提供备用结果,从异常中恢复。 |
handle |
当当前 CompletableFuture 完成时,无论是否抛出异常,都将结果和异常传递给一个 BiFunction 进行处理,返回一个新的 CompletableFuture ,其结果是 BiFunction 的返回值。 |
处理正常结果和异常,可以根据情况返回不同的结果。 |
whenComplete |
当当前 CompletableFuture 完成时,无论是否抛出异常,都将结果和异常传递给一个 BiConsumer 进行处理,不返回任何值。 |
在任务完成后执行一些操作,例如记录日志或释放资源。 |
异步编程的要点
- 选择合适的线程池: 使用
ExecutorService
来管理线程。根据任务的类型和数量,选择合适的线程池。例如,对于CPU密集型任务,可以使用固定大小的线程池。对于IO密集型任务,可以使用更大的线程池或使用ForkJoinPool
。 - 避免阻塞操作: 尽量避免在异步任务中使用阻塞操作。如果必须使用阻塞操作,请考虑将其放在单独的线程中执行。
- 处理异常: 务必处理异步任务中的异常。可以使用
exceptionally
,handle
或whenComplete
方法来处理异常。 - 取消任务: 如果不再需要异步任务的结果,请取消它。可以使用
cancel
方法来取消任务。 - 避免死锁: 在使用
CompletableFuture
的组合方法时,要小心避免死锁。例如,避免在一个CompletableFuture
的回调函数中等待另一个CompletableFuture
完成。
Future
与CompletableFuture
的对比
特性 | Future |
CompletableFuture |
---|---|---|
阻塞性 | get() 方法阻塞 |
提供非阻塞方法,例如thenApplyAsync ,thenComposeAsync |
组合能力 | 缺乏组合能力 | 提供丰富的组合方法,例如thenApply ,thenCompose ,thenCombine ,allOf ,anyOf |
错误处理 | 错误处理复杂 | 提供exceptionally ,handle ,whenComplete 等方法,方便进行错误处理 |
完成通知 | 缺乏完成通知 | 提供thenAccept ,runAfterEither ,whenComplete 等方法,可以在任务完成后立即执行某些操作 |
手动完成 | 不支持手动完成 | 支持手动完成,可以使用complete 和completeExceptionally 方法 |
适用场景 | 简单异步任务,不需要复杂的组合和错误处理 | 复杂的异步流程,需要灵活的组合和完善的错误处理 |
灵活运用异步工具
Future
和CompletableFuture
是Java并发包中强大的工具,它们可以帮助我们构建高效和可伸缩的异步应用程序。Future
提供了基本的异步编程模型,而CompletableFuture
则提供了更强大和灵活的异步编程模型。在实际开发中,我们需要根据具体的需求选择合适的工具。正确使用它们能够显著提高应用程序的性能和响应能力。