CompletableFuture:Java 多线程任务编排与结果合并的高级技巧
各位朋友,大家好!今天我们来深入探讨 Java 并发编程中一个非常强大的工具:CompletableFuture
。它不仅能够简化异步编程,还能让我们以声明式的方式编排复杂的并发任务,并高效地合并最终结果。本讲座将从 CompletableFuture
的基础概念入手,逐步深入到高级应用,并通过丰富的代码示例,帮助大家掌握这项关键技术。
1. CompletableFuture 的基础:Promise 与 Future 的进化
在传统的 Java 多线程编程中,我们通常使用 Future
接口来表示异步计算的结果。Future
允许我们提交一个任务到线程池,然后通过 get()
方法阻塞地等待结果。但是,这种方式存在几个明显的缺点:
- 阻塞等待:
get()
方法会阻塞当前线程,直到结果可用。这会导致线程资源的浪费,降低程序的响应性。 - 缺乏回调机制:
Future
本身不提供回调机制,当结果可用时,无法主动通知调用者。 - 组合困难: 如果需要将多个
Future
的结果组合起来,逻辑会变得非常复杂,容易出错。
CompletableFuture
正是为了解决这些问题而诞生的。它实现了 Future
和 CompletionStage
接口,提供了更加灵活和强大的异步编程模型。可以将 CompletableFuture
看作是 Future
的一个增强版本,它引入了 Promise
的概念,允许我们以非阻塞的方式处理异步计算的结果,并提供了丰富的回调机制和组合方法。
简单来说,CompletableFuture
代表着一个最终可能完成的异步操作,并且提供了各种方法来注册在操作完成时执行的回调函数。
2. 创建 CompletableFuture 对象
创建 CompletableFuture
对象有多种方式,最常用的包括以下几种:
-
CompletableFuture.completedFuture(value)
: 创建一个已经完成的CompletableFuture
,并返回指定的值。CompletableFuture<String> completedFuture = CompletableFuture.completedFuture("Hello World"); try { String result = completedFuture.get(); // 直接返回 "Hello World",不会阻塞 System.out.println(result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
-
CompletableFuture.runAsync(Runnable)
: 创建一个异步执行的CompletableFuture
,执行一个Runnable
任务,没有返回值。CompletableFuture<Void> runAsyncFuture = CompletableFuture.runAsync(() -> { System.out.println("Running task in separate thread: " + Thread.currentThread().getName()); });
-
CompletableFuture.supplyAsync(Supplier)
: 创建一个异步执行的CompletableFuture
,执行一个Supplier
任务,并返回一个结果。CompletableFuture<String> supplyAsyncFuture = CompletableFuture.supplyAsync(() -> { System.out.println("Running task in separate thread: " + Thread.currentThread().getName()); return "Result from Supplier"; }); try { String result = supplyAsyncFuture.get(); System.out.println(result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
-
new CompletableFuture()
: 创建一个新的CompletableFuture
对象,需要手动调用complete()
、completeExceptionally()
方法来设置结果或抛出异常。CompletableFuture<String> future = new CompletableFuture<>(); new Thread(() -> { try { Thread.sleep(2000); // 模拟耗时操作 future.complete("Task completed"); // 设置结果 } catch (InterruptedException e) { future.completeExceptionally(e); // 设置异常 } }).start(); try { String result = future.get(); System.out.println(result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
runAsync
和 supplyAsync
方法默认使用 ForkJoinPool.commonPool()
作为默认线程池。我们也可以通过传入 Executor
参数来指定自定义线程池。
3. CompletableFuture 的核心方法:回调与转换
CompletableFuture
提供了丰富的回调方法,用于在异步任务完成时执行特定的操作。这些回调方法可以分为以下几类:
-
thenApply(Function)
: 对结果进行转换,返回一个新的CompletableFuture
,其结果是应用Function
转换后的结果。CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<String> upperCaseFuture = future.thenApply(String::toUpperCase); // 转换为大写 try { System.out.println(upperCaseFuture.get()); // 输出: HELLO } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
-
thenAccept(Consumer)
: 消费结果,返回CompletableFuture<Void>
,不返回新的结果。CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "World"); CompletableFuture<Void> printFuture = future.thenAccept(System.out::println); // 打印结果 try { printFuture.get(); // 等待打印完成,返回 null } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
-
thenRun(Runnable)
: 在结果完成后执行一个Runnable
任务,返回CompletableFuture<Void>
,不关心结果。CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Task Done"); CompletableFuture<Void> runFuture = future.thenRun(() -> System.out.println("Task finished!")); try { runFuture.get(); // 等待任务完成,返回 null } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
-
thenApplyAsync(Function)
、thenAcceptAsync(Consumer)
、thenRunAsync(Runnable)
: 与对应的同步方法类似,但是异步执行回调函数。CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Async"); CompletableFuture<String> upperCaseFuture = future.thenApplyAsync(String::toUpperCase); // 异步转换为大写 try { System.out.println(upperCaseFuture.get()); // 输出: ASYNC } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
-
exceptionally(Function)
: 当CompletableFuture
抛出异常时,执行Function
来处理异常,并返回一个新的CompletableFuture
,其结果是处理后的结果。CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Something went wrong"); }); CompletableFuture<String> recoveredFuture = future.exceptionally(ex -> { System.err.println("Exception caught: " + ex.getMessage()); return "Recovered value"; }); try { System.out.println(recoveredFuture.get()); // 输出: Recovered value } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
-
whenComplete(BiConsumer)
: 无论CompletableFuture
正常完成还是抛出异常,都会执行BiConsumer
,可以用于记录日志或清理资源。CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { if (Math.random() > 0.5) { return "Success"; } else { throw new RuntimeException("Random failure"); } }); CompletableFuture<String> completedFuture = future.whenComplete((result, ex) -> { if (ex == null) { System.out.println("Task completed successfully: " + result); } else { System.err.println("Task completed with exception: " + ex.getMessage()); } }); try { System.out.println(completedFuture.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
-
handle(BiFunction)
: 与whenComplete
类似,但是handle
接收一个BiFunction
,可以根据结果或异常返回一个新的结果,并封装到新的CompletableFuture
中。CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { if (Math.random() > 0.5) { return "Success"; } else { throw new RuntimeException("Random failure"); } }); CompletableFuture<String> handledFuture = future.handle((result, ex) -> { if (ex == null) { return "Handled: " + result; } else { System.err.println("Exception caught: " + ex.getMessage()); return "Handled: Error occurred"; } }); try { System.out.println(handledFuture.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
4. CompletableFuture 的组合:编排复杂任务
CompletableFuture
提供了强大的组合方法,允许我们将多个 CompletableFuture
组合起来,构建复杂的异步任务流程。常用的组合方法包括:
-
thenCompose(Function)
: 将一个CompletableFuture
的结果作为参数,传递给另一个返回CompletableFuture
的Function
,并将两个CompletableFuture
组合起来。适用于依赖关系的任务。CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() -> "user123"); CompletableFuture<String> orderFuture = userFuture.thenCompose(userId -> { return CompletableFuture.supplyAsync(() -> "order456 for user " + userId); }); try { System.out.println(orderFuture.get()); // 输出: order456 for user user123 } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
-
thenCombine(CompletableFuture, BiFunction)
: 将两个CompletableFuture
的结果合并起来,应用BiFunction
生成一个新的结果。两个CompletableFuture
是并行执行的。CompletableFuture<String> nameFuture = CompletableFuture.supplyAsync(() -> "John"); CompletableFuture<String> lastNameFuture = CompletableFuture.supplyAsync(() -> "Doe"); CompletableFuture<String> fullNameFuture = nameFuture.thenCombine(lastNameFuture, (firstName, lastName) -> firstName + " " + lastName); try { System.out.println(fullNameFuture.get()); // 输出: John Doe } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
-
thenAcceptBoth(CompletableFuture, BiConsumer)
: 与thenCombine
类似,但是接收一个BiConsumer
,用于消费两个CompletableFuture
的结果,不返回新的结果。CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World"); CompletableFuture<Void> acceptBothFuture = future1.thenAcceptBoth(future2, (str1, str2) -> { System.out.println(str1 + " " + str2); // 输出: Hello World }); try { acceptBothFuture.get(); // 等待任务完成,返回 null } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
-
runAfterBoth(CompletableFuture, Runnable)
: 当两个CompletableFuture
都完成后,执行一个Runnable
任务。CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Task 1"); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Task 2"); CompletableFuture<Void> runAfterBothFuture = future1.runAfterBoth(future2, () -> { System.out.println("Both tasks are completed"); }); try { runAfterBothFuture.get(); // 等待任务完成,返回 null } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
-
applyToEither(CompletableFuture, Function)
: 当两个CompletableFuture
中任何一个完成时,将它的结果应用Function
生成一个新的结果。CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "Result from Future 1"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Result from Future 2"); CompletableFuture<String> eitherFuture = future1.applyToEither(future2, result -> "The result is: " + result); try { System.out.println(eitherFuture.get()); // 可能输出: The result is: Result from Future 2 } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
-
acceptEither(CompletableFuture, Consumer)
: 与applyToEither
类似,但是接收一个Consumer
,用于消费先完成的CompletableFuture
的结果,不返回新的结果。CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "Result from Future 1"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Result from Future 2"); CompletableFuture<Void> acceptEitherFuture = future1.acceptEither(future2, result -> System.out.println("First completed result: " + result)); try { acceptEitherFuture.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
-
runAfterEither(CompletableFuture, Runnable)
: 当两个CompletableFuture
中任何一个完成后,执行一个Runnable
任务。CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "Result from Future 1"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Result from Future 2"); CompletableFuture<Void> runAfterEitherFuture = future1.runAfterEither(future2, () -> System.out.println("One of the tasks is completed")); try { runAfterEitherFuture.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
5. 处理多个 CompletableFuture:allOf 和 anyOf
CompletableFuture
提供了 allOf
和 anyOf
方法,用于处理多个 CompletableFuture
的聚合结果。
-
CompletableFuture.allOf(CompletableFuture...)
: 返回一个新的CompletableFuture
,当所有传入的CompletableFuture
都完成时,该CompletableFuture
才会完成。 返回的CompletableFuture
的结果是Void
。CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Task 1"); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Task 2"); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "Task 3"); CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(future1, future2, future3); try { allOfFuture.get(); // 等待所有任务完成 System.out.println("All tasks are completed"); // 获取每个 CompletableFuture 的结果 (需要确保每个 future 都已完成) String result1 = future1.get(); String result2 = future2.get(); String result3 = future3.get(); System.out.println("Result 1: " + result1); System.out.println("Result 2: " + result2); System.out.println("Result 3: " + result3); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
-
CompletableFuture.anyOf(CompletableFuture...)
: 返回一个新的CompletableFuture
,当任何一个传入的CompletableFuture
完成时,该CompletableFuture
就会完成。 返回的CompletableFuture
的结果是第一个完成的CompletableFuture
的结果。CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "Task 1"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Task 2"); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "Task 3"); CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3); try { Object result = anyOfFuture.get(); // 等待任何一个任务完成 System.out.println("One of the tasks is completed: " + result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
6. 案例分析:电商平台商品详情页加载
假设我们需要加载一个电商平台商品详情页,需要从不同的微服务获取商品信息、价格、库存、评价等数据。我们可以使用 CompletableFuture
来并行地获取这些数据,并最终合并成一个商品详情对象。
class ProductDetail {
private String name;
private double price;
private int stock;
private double rating;
// 省略构造函数、Getter 和 Setter
public ProductDetail(String name, double price, int stock, double rating) {
this.name = name;
this.price = price;
this.stock = stock;
this.rating = rating;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}
public int getStock() {
return stock;
}
public void setStock(int stock) {
this.stock = stock;
}
public double getRating() {
return rating;
}
public void setRating(double rating) {
this.rating = rating;
}
@Override
public String toString() {
return "ProductDetail{" +
"name='" + name + ''' +
", price=" + price +
", stock=" + stock +
", rating=" + rating +
'}';
}
}
public class ProductDetailLoader {
public static CompletableFuture<String> getNameAsync() {
return CompletableFuture.supplyAsync(() -> {
// 模拟从商品信息微服务获取商品名称
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Awesome Product";
});
}
public static CompletableFuture<Double> getPriceAsync() {
return CompletableFuture.supplyAsync(() -> {
// 模拟从价格微服务获取商品价格
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 99.99;
});
}
public static CompletableFuture<Integer> getStockAsync() {
return CompletableFuture.supplyAsync(() -> {
// 模拟从库存微服务获取商品库存
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 100;
});
}
public static CompletableFuture<Double> getRatingAsync() {
return CompletableFuture.supplyAsync(() -> {
// 模拟从评价微服务获取商品评价
try {
Thread.sleep(400);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 4.5;
});
}
public static CompletableFuture<ProductDetail> loadProductDetail() {
CompletableFuture<String> nameFuture = getNameAsync();
CompletableFuture<Double> priceFuture = getPriceAsync();
CompletableFuture<Integer> stockFuture = getStockAsync();
CompletableFuture<Double> ratingFuture = getRatingAsync();
return CompletableFuture.allOf(nameFuture, priceFuture, stockFuture, ratingFuture)
.thenApply(v -> {
try {
return new ProductDetail(nameFuture.get(), priceFuture.get(), stockFuture.get(), ratingFuture.get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
long startTime = System.currentTimeMillis();
CompletableFuture<ProductDetail> productDetailFuture = loadProductDetail();
ProductDetail productDetail = productDetailFuture.get();
long endTime = System.currentTimeMillis();
System.out.println("Product Detail: " + productDetail);
System.out.println("Time taken: " + (endTime - startTime) + "ms"); // 预计耗时略大于 500ms
}
}
在这个例子中,我们使用 CompletableFuture.supplyAsync
并行地从不同的微服务获取数据,然后使用 CompletableFuture.allOf
等待所有数据都获取完成,最后将所有数据合并成一个 ProductDetail
对象。
7. 最佳实践与注意事项
- 避免阻塞: 尽可能使用异步回调方法,避免在
CompletableFuture
中进行阻塞操作。 - 选择合适的线程池: 根据任务的性质选择合适的线程池,例如 CPU 密集型任务使用
ForkJoinPool
,IO 密集型任务使用自定义的线程池。 - 处理异常: 使用
exceptionally
、whenComplete
或handle
方法来处理异常,保证程序的健壮性。 - 避免死锁: 在使用组合方法时,要注意避免死锁,例如避免在同一个
CompletableFuture
的回调函数中等待另一个CompletableFuture
的结果。 - 监控和调试: 使用 Java 的监控工具来监控
CompletableFuture
的执行情况,例如线程池的使用率、任务的执行时间等。
8. CompletableFuture的优势简述
CompletableFuture
的核心优势在于它提供了一种声明式、非阻塞的方式来编排并发任务。它通过回调机制和丰富的组合方法,简化了异步编程的复杂性,提高了程序的响应性和吞吐量。相比传统的 Future
接口,CompletableFuture
更加灵活和强大,是构建高性能并发应用的重要工具。
掌握要点:异步编排,高效并发
希望今天的讲座能够帮助大家更好地理解和使用 CompletableFuture
。 谢谢大家!