CompletableFuture:Java多线程任务编排与结果合并的高级技巧
大家好,今天我们来深入探讨Java并发编程中一个非常强大的工具——CompletableFuture。它不仅简化了异步编程模型,还提供了丰富的API,让我们能够更优雅地进行多线程任务的编排和结果合并。本次讲座将从CompletableFuture的基本概念入手,逐步讲解其高级用法,并结合实例代码,帮助大家掌握利用CompletableFuture构建高效并发应用的技巧。
1. 基础概念与创建方式
CompletableFuture代表一个异步计算的结果。它允许你在任务完成时异步地执行后续操作,而无需阻塞当前线程。我们可以通过多种方式创建CompletableFuture:
CompletableFuture.supplyAsync(Supplier<U> supplier): 使用Supplier异步执行一个任务并返回结果。 常用于执行耗时的计算任务。CompletableFuture.runAsync(Runnable runnable): 使用Runnable异步执行一个任务,没有返回值。 常用于执行一些不需要返回值的操作。CompletableFuture.completedFuture(T value): 创建一个已经完成的CompletableFuture,其结果为给定的值。 用于快速创建一个已完成的任务。new CompletableFuture<T>(): 创建一个空的CompletableFuture,需要手动调用complete(T value)或者completeExceptionally(Throwable ex)来完成或者抛出异常。 用于更精细的控制任务的完成状态。
以下是一些代码示例:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
public class CompletableFutureExample {
public static void main(String[] args) throws Exception {
// 1. 使用 supplyAsync 创建 CompletableFuture
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("Running task in thread: " + Thread.currentThread().getName());
try {
Thread.sleep(1000); // 模拟耗时操作
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return "Hello from supplyAsync";
});
System.out.println("supplyAsync Result: " + future1.get()); // get()会阻塞,直到任务完成
// 2. 使用 runAsync 创建 CompletableFuture
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
System.out.println("Running task in thread: " + Thread.currentThread().getName());
System.out.println("Hello from runAsync");
});
future2.get(); // 等待任务完成
// 3. 使用 completedFuture 创建 CompletableFuture
CompletableFuture<String> future3 = CompletableFuture.completedFuture("Hello from completedFuture");
System.out.println("completedFuture Result: " + future3.get());
// 4. 使用 new CompletableFuture() 创建 CompletableFuture
CompletableFuture<String> future4 = new CompletableFuture<>();
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
try {
Thread.sleep(500);
future4.complete("Hello from manual completion");
} catch (InterruptedException e) {
future4.completeExceptionally(e); // 发生异常时需要手动处理
Thread.currentThread().interrupt();
}
});
System.out.println("manual completion Result: " + future4.get());
executor.shutdown();
}
}
2. 任务编排:串行、并行与组合
CompletableFuture 提供了强大的API,可以灵活地编排异步任务。主要包括串行执行、并行执行和组合执行三种方式。
-
串行执行 (Then Compose & Then Apply)
thenApply(Function<T,U> fn): 在前一个CompletableFuture完成后,将它的结果作为参数传递给Function,并返回一个新的CompletableFuture,其结果是Function的返回值。thenApplyAsync(Function<T,U> fn): 与thenApply类似,但异步执行Function。thenCompose(Function<T,CompletableFuture<U>> fn): 在前一个CompletableFuture完成后,将它的结果作为参数传递给Function,该Function返回一个新的CompletableFuture。thenCompose用于连接两个依赖于彼此的异步操作,避免嵌套的CompletableFuture。thenComposeAsync(Function<T,CompletableFuture<U>> fn): 与thenCompose类似,但异步执行Function。thenAccept(Consumer<T> consumer): 在前一个CompletableFuture完成后,将它的结果作为参数传递给Consumer,没有返回值。thenAcceptAsync(Consumer<T> consumer): 与thenAccept类似,但异步执行Consumer。thenRun(Runnable runnable): 在前一个CompletableFuture完成后,执行Runnable,没有返回值。thenRunAsync(Runnable runnable): 与thenRun类似,但异步执行Runnable。
thenApply和thenCompose的区别在于,thenApply的参数是一个返回普通值的函数,而thenCompose的参数是一个返回CompletableFuture的函数。thenCompose通常用于处理依赖于前一个任务结果的异步操作。 -
并行执行 (Then Combine & Then Accept Both)
thenCombine(CompletionStage<U> other, BiFunction<T,U,V> fn): 当两个CompletableFuture都完成后,将它们的结果作为参数传递给BiFunction,并返回一个新的CompletableFuture,其结果是BiFunction的返回值。thenCombineAsync(CompletionStage<U> other, BiFunction<T,U,V> fn): 与thenCombine类似,但异步执行BiFunction。thenAcceptBoth(CompletionStage<?> other, BiConsumer<T,U> consumer): 当两个CompletableFuture都完成后,将它们的结果作为参数传递给BiConsumer,没有返回值。thenAcceptBothAsync(CompletionStage<?> other, BiConsumer<T,U> consumer): 与thenAcceptBoth类似,但异步执行BiConsumer。runAfterBoth(CompletionStage<?> other, Runnable runnable): 当两个CompletableFuture都完成后,执行Runnable,没有返回值。runAfterBothAsync(CompletionStage<?> other, Runnable runnable): 与runAfterBoth类似,但异步执行Runnable。
-
组合执行 (Any Of & All Of)
anyOf(CompletableFuture<?>... cfs): 返回一个新的CompletableFuture,当参数中的任何一个CompletableFuture完成时,新的CompletableFuture也会完成,其结果是第一个完成的CompletableFuture的结果。allOf(CompletableFuture<?>... cfs): 返回一个新的CompletableFuture,当参数中的所有CompletableFuture都完成时,新的CompletableFuture才会完成,其结果是一个空的CompletableFuture。 可以使用join()方法等待所有任务完成并获取结果 (如果需要)。
以下是一些代码示例,展示了如何使用这些方法进行任务编排:
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureComposeExample {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(4);
// 1. 串行执行 (thenApply, thenCompose)
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello", executor)
.thenApply(s -> s + " World")
.thenApply(String::toUpperCase);
System.out.println("串行执行 Result: " + future1.get()); // 输出: HELLO WORLD
CompletableFuture<String> futureCompose = CompletableFuture.supplyAsync(() -> "Initial Value", executor)
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " - Transformed", executor));
System.out.println("thenCompose Result: " + futureCompose.get()); // 输出: Initial Value - Transformed
// 2. 并行执行 (thenCombine)
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Hello", executor);
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> " World", executor);
CompletableFuture<String> combinedFuture = future2.thenCombine(future3, (s1, s2) -> s1 + s2);
System.out.println("并行执行 Result: " + combinedFuture.get()); // 输出: Hello World
// 3. 组合执行 (allOf)
List<String> websites = Arrays.asList("https://www.google.com", "https://www.baidu.com", "https://www.bing.com");
List<CompletableFuture<String>> futures = websites.stream()
.map(url -> CompletableFuture.supplyAsync(() -> {
try {
// Simulate fetching content from a website
Thread.sleep(500);
return "Content from " + url;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "Error fetching from " + url;
}
}, executor))
.toList();
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allFutures.thenRun(() -> {
System.out.println("All websites fetched.");
futures.forEach(future -> {
try {
System.out.println(future.get());
} catch (Exception e) {
System.err.println("Error getting result: " + e.getMessage());
}
});
});
allFutures.get(); // Wait for all futures to complete
// 4. 组合执行 (anyOf)
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Result A";
}, executor);
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Result B";
}, executor);
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(futureA, futureB);
System.out.println("anyOf Result: " + anyFuture.get()); // Will print "Result B" because it completes first
executor.shutdown();
}
}
3. 异常处理
在异步编程中,异常处理至关重要。CompletableFuture 提供了多种机制来处理任务执行过程中可能出现的异常:
exceptionally(Function<Throwable,T> fn): 当CompletableFuture抛出异常时,调用Function处理异常,并返回一个新的CompletableFuture,其结果是Function的返回值。handle(BiFunction<T,Throwable,U> fn): 无论CompletableFuture正常完成还是抛出异常,都会调用BiFunction处理结果或异常,并返回一个新的CompletableFuture,其结果是BiFunction的返回值。whenComplete(BiConsumer<T,Throwable> action): 无论CompletableFuture正常完成还是抛出异常,都会调用BiConsumer处理结果或异常,没有返回值。completeExceptionally(Throwable ex): 手动设置CompletableFuture抛出异常。obtrudeValue(T value): 强行设置CompletableFuture的值为给定值。此方法会忽略任何已存在的完成状态,包括异常。 通常不建议使用,因为它会破坏CompletableFuture的正常状态转换。obtrudeException(Throwable ex): 强行设置CompletableFuture抛出给定异常。 与obtrudeValue类似,不推荐使用。
以下是一些代码示例:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureExceptionHandling {
public static void main(String[] args) throws Exception {
// 1. exceptionally
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
if (true) {
throw new RuntimeException("Something went wrong!");
}
return "Result";
}).exceptionally(ex -> {
System.err.println("Exception occurred: " + ex.getMessage());
return "Recovered Result";
});
System.out.println("exceptionally Result: " + future1.get()); // 输出: Recovered Result
// 2. handle
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
if (true) {
throw new RuntimeException("Another error!");
}
return "Result";
}).handle((result, ex) -> {
if (ex != null) {
System.err.println("Exception occurred: " + ex.getMessage());
return "Handle Recovered Result";
} else {
return result;
}
});
System.out.println("handle Result: " + future2.get()); // 输出: Handle Recovered Result
// 3. whenComplete
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
if (true) {
throw new RuntimeException("Yet another error!");
}
return "Result";
}).whenComplete((result, ex) -> {
if (ex != null) {
System.err.println("Exception occurred: " + ex.getMessage());
} else {
System.out.println("Result: " + result);
}
});
try {
future3.get(); // 会抛出异常,因为 whenComplete 不会恢复异常
} catch (InterruptedException | ExecutionException e) {
System.err.println("Exception caught: " + e.getMessage());
}
// 4. completeExceptionally
CompletableFuture<String> future4 = new CompletableFuture<>();
new Thread(() -> {
try {
Thread.sleep(500);
future4.completeExceptionally(new IllegalStateException("Manually completed with exception"));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
future4.completeExceptionally(e);
}
}).start();
try {
future4.get();
} catch (InterruptedException | ExecutionException e) {
System.err.println("Exception caught: " + e.getMessage()); // 输出: Manually completed with exception
}
}
}
4. 线程池的选择与配置
CompletableFuture 默认使用 ForkJoinPool.commonPool() 作为其默认的线程池。 但为了更好地控制并发度,避免线程饥饿,以及更精细地管理线程资源,通常建议使用自定义的线程池。
Executor接口:CompletableFuture提供了接受Executor参数的方法,例如supplyAsync(Supplier<U> supplier, Executor executor)。
选择线程池时需要考虑以下因素:
- CPU 密集型任务: 对于 CPU 密集型任务,线程池的大小通常设置为 CPU 核心数 + 1。
- I/O 密集型任务: 对于 I/O 密集型任务,线程池的大小可以设置为 CPU 核心数的 2 倍甚至更多,具体取决于 I/O 操作的阻塞时间。
- 任务的优先级: 可以使用优先级队列的线程池来执行不同优先级的任务。
- 任务的隔离: 可以使用不同的线程池来隔离不同类型的任务,避免相互干扰。
以下是一些线程池的类型:
| 线程池类型 | 描述 |
|---|---|
FixedThreadPool |
固定大小的线程池,适用于任务数量相对稳定,需要限制并发数的场景。 |
CachedThreadPool |
可缓存的线程池,线程数量动态调整,适用于任务数量变化频繁,执行时间较短的场景。 |
ScheduledThreadPool |
可以执行定时任务的线程池,适用于需要周期性执行任务的场景。 |
SingleThreadExecutor |
单线程的线程池,保证任务按顺序执行,适用于需要串行执行任务的场景。 |
ForkJoinPool |
JDK 7 引入的线程池,适用于可以分解成小任务的场景,例如递归算法。 |
ThreadPoolExecutor (自定义) |
允许更细粒度的配置,例如核心线程数、最大线程数、空闲线程存活时间、任务队列等。 |
以下是一个使用自定义线程池的示例:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureWithExecutor {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(2);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Running task in thread: " + Thread.currentThread().getName());
return "Hello from custom executor";
}, executor);
System.out.println("Result: " + future.get());
executor.shutdown();
}
}
5. 异步编程的最佳实践
- 避免阻塞操作: 在
CompletableFuture的回调函数中,尽量避免执行阻塞操作,否则会影响异步执行的效率。 - 合理选择线程池: 根据任务的类型和特点,选择合适的线程池,并进行合理的配置。
- 充分利用组合API: 利用
thenCompose、thenCombine、allOf等组合API,可以简化任务编排,提高代码的可读性和可维护性。 - 注意异常处理: 在异步编程中,异常处理尤为重要,需要充分考虑各种异常情况,并进行妥善处理。
- 避免回调地狱: 过多的回调嵌套会导致代码难以理解和维护,可以使用
thenCompose等方法来避免回调地狱。 - 监控和调优: 使用监控工具来观察
CompletableFuture的执行情况,并根据实际情况进行调优。
代码演示:利用CompletableFuture处理订单流程
假设有一个在线购物平台,需要处理订单创建、支付、库存扣减和物流通知等多个步骤。 我们可以使用 CompletableFuture 来异步地执行这些步骤,提高系统的响应速度。
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class OrderProcessingExample {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(4);
// 1. 创建订单
CompletableFuture<Order> createOrderFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Creating order in thread: " + Thread.currentThread().getName());
// Simulate creating an order
try {
Thread.sleep(new Random().nextInt(500));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return new Order(123, "Product XYZ", 1, 99.99);
}, executor);
// 2. 支付
CompletableFuture<PaymentResult> paymentFuture = createOrderFuture.thenApplyAsync(order -> {
System.out.println("Processing payment for order " + order.orderId + " in thread: " + Thread.currentThread().getName());
// Simulate payment processing
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
boolean success = new Random().nextBoolean();
return new PaymentResult(order.orderId, success, success ? "Payment successful" : "Payment failed");
}, executor);
// 3. 扣减库存
CompletableFuture<InventoryResult> inventoryFuture = createOrderFuture.thenApplyAsync(order -> {
System.out.println("Deducting inventory for order " + order.orderId + " in thread: " + Thread.currentThread().getName());
// Simulate inventory deduction
try {
Thread.sleep(new Random().nextInt(300));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
boolean success = new Random().nextBoolean();
return new InventoryResult(order.orderId, success, success ? "Inventory deducted" : "Inventory deduction failed");
}, executor);
// 4. 物流通知 (依赖于支付和库存结果)
CompletableFuture<ShippingNotification> shippingFuture = paymentFuture.thenCombineAsync(inventoryFuture, (paymentResult, inventoryResult) -> {
System.out.println("Sending shipping notification for order " + paymentResult.orderId + " in thread: " + Thread.currentThread().getName());
// Simulate sending shipping notification
try {
Thread.sleep(new Random().nextInt(200));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
boolean readyForShipping = paymentResult.success && inventoryResult.success;
String message = readyForShipping ? "Order ready for shipping" : "Order cannot be shipped";
return new ShippingNotification(paymentResult.orderId, readyForShipping, message);
}, executor);
// 5. 处理最终结果
shippingFuture.thenAccept(notification -> {
System.out.println("Shipping notification: " + notification);
});
// 等待所有任务完成
shippingFuture.get();
executor.shutdown();
}
static class Order {
int orderId;
String product;
int quantity;
double price;
public Order(int orderId, String product, int quantity, double price) {
this.orderId = orderId;
this.product = product;
this.quantity = quantity;
this.price = price;
}
}
static class PaymentResult {
int orderId;
boolean success;
String message;
public PaymentResult(int orderId, boolean success, String message) {
this.orderId = orderId;
this.success = success;
this.message = message;
}
}
static class InventoryResult {
int orderId;
boolean success;
String message;
public InventoryResult(int orderId, boolean success, String message) {
this.orderId = orderId;
this.success = success;
this.message = message;
}
}
static class ShippingNotification {
int orderId;
boolean readyForShipping;
String message;
public ShippingNotification(int orderId, boolean readyForShipping, String message) {
this.orderId = orderId;
this.readyForShipping = readyForShipping;
this.message = message;
}
@Override
public String toString() {
return "ShippingNotification{" +
"orderId=" + orderId +
", readyForShipping=" + readyForShipping +
", message='" + message + ''' +
'}';
}
}
}
通过这个例子,我们可以看到 CompletableFuture 如何方便地处理复杂的异步流程,提高系统的并发能力和响应速度。
总结:异步任务编排,高效并发编程
CompletableFuture 是 Java 并发编程中一个强大的工具,它通过提供丰富的 API 和灵活的编排方式,简化了异步编程模型,使得我们可以更优雅地构建高效的并发应用。 掌握 CompletableFuture 的高级用法,能够帮助我们更好地处理复杂的异步任务,提高系统的性能和可维护性。