Java 多线程任务依赖结果:CompletableFuture.thenCompose 链式处理
各位朋友,大家好!今天我们来深入探讨 Java 多线程编程中一个非常重要的概念:任务依赖以及如何使用 CompletableFuture.thenCompose 进行优雅的链式处理。在实际的软件开发中,很多任务并非孤立存在,而是存在着复杂的依赖关系,一个任务的执行可能需要依赖另一个任务的结果。处理这种依赖关系,是保证程序正确性和效率的关键。
任务依赖的场景分析
在深入代码之前,我们先来看看一些常见的任务依赖场景:
- 数据准备和处理: 假设我们需要从数据库获取用户信息,然后根据用户信息调用第三方服务获取用户画像。获取用户画像的操作显然依赖于获取用户信息的成功。
- 订单处理: 在电商系统中,订单的创建可能依赖于库存的检查。只有库存足够,才能创建订单。
- 异步服务调用: 多个异步服务调用,后面的服务调用需要使用前一个服务的返回值作为参数。
这些场景都体现了任务之间的依赖关系。如果我们不合理地处理这些依赖,可能会导致以下问题:
- 阻塞: 主线程等待某个任务完成,导致程序响应缓慢。
- 资源浪费: 多个线程同时等待同一个资源,导致资源竞争激烈。
- 代码复杂: 使用回调函数或者复杂的同步机制,导致代码难以维护。
CompletableFuture 简介
CompletableFuture 是 Java 8 引入的一个强大的异步编程工具。它实现了 Future 和 CompletionStage 接口,提供了丰富的 API 来处理异步任务。CompletableFuture 的核心优势在于:
- 异步执行: 允许在后台线程中执行任务,避免阻塞主线程。
- 链式调用: 提供了
thenApply,thenAccept,thenRun,thenCompose等方法,可以方便地将多个异步任务串联起来。 - 异常处理: 提供了
exceptionally,handle,whenComplete等方法,可以优雅地处理异步任务中的异常。 - 组合操作: 提供了
allOf,anyOf等方法,可以组合多个CompletableFuture对象。
thenCompose:处理任务依赖的核心
thenCompose 是 CompletableFuture 中一个非常重要的方法,它专门用于处理任务依赖的场景。thenCompose 的作用是:将一个 CompletableFuture 的结果作为参数,传递给另一个返回 CompletableFuture 的函数,并将这两个 CompletableFuture 组合成一个新的 CompletableFuture。
thenCompose 的方法签名如下:
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
T:当前CompletableFuture的结果类型。U:thenCompose返回的CompletableFuture的结果类型。fn:一个函数,接收T作为参数,返回一个CompletionStage<U>。
重点:fn 必须返回一个 CompletableFuture 或者 CompletionStage,这是 thenCompose 的关键。
thenCompose 与 thenApply 的区别:
| 方法 | 功能 | 返回值类型 |
|---|---|---|
thenApply |
将一个 CompletableFuture 的结果作为参数,传递给一个函数,并将该函数的结果作为新的 CompletableFuture 的结果。 |
CompletableFuture<U> |
thenCompose |
将一个 CompletableFuture 的结果作为参数,传递给一个返回 CompletableFuture 的函数,并将这两个 CompletableFuture 组合成一个新的 CompletableFuture。 |
CompletableFuture<U> |
简单来说,thenApply 用于转换结果,而 thenCompose 用于处理依赖关系。thenCompose 会 “解开” 嵌套的 CompletableFuture,最终返回一个扁平的 CompletableFuture。
代码示例:用户数据和用户画像
我们用一个实际的例子来演示 thenCompose 的用法。假设我们有两个方法:
getUserInfo(String userId): 异步地从数据库获取用户信息,返回一个CompletableFuture<UserInfo>。getUserProfile(UserInfo userInfo): 异步地根据用户信息获取用户画像,返回一个CompletableFuture<UserProfile>。
代码如下:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class UserInfo {
private String userId;
private String userName;
public UserInfo(String userId, String userName) {
this.userId = userId;
this.userName = userName;
}
public String getUserId() {
return userId;
}
public String getUserName() {
return userName;
}
@Override
public String toString() {
return "UserInfo{" +
"userId='" + userId + ''' +
", userName='" + userName + ''' +
'}';
}
}
class UserProfile {
private String userId;
private String profileData;
public UserProfile(String userId, String profileData) {
this.userId = userId;
this.profileData = profileData;
}
public String getUserId() {
return userId;
}
public String getProfileData() {
return profileData;
}
@Override
public String toString() {
return "UserProfile{" +
"userId='" + userId + ''' +
", profileData='" + profileData + ''' +
'}';
}
}
public class CompletableFutureThenComposeExample {
private static final ExecutorService executor = Executors.newFixedThreadPool(10);
public static CompletableFuture<UserInfo> getUserInfo(String userId) {
return CompletableFuture.supplyAsync(() -> {
// 模拟从数据库获取用户信息
try {
TimeUnit.MILLISECONDS.sleep(200); // 模拟数据库查询延迟
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("获取用户信息, thread: " + Thread.currentThread().getName());
return new UserInfo(userId, "用户" + userId);
}, executor);
}
public static CompletableFuture<UserProfile> getUserProfile(UserInfo userInfo) {
return CompletableFuture.supplyAsync(() -> {
// 模拟调用第三方服务获取用户画像
try {
TimeUnit.MILLISECONDS.sleep(300); // 模拟第三方服务调用延迟
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("获取用户画像, thread: " + Thread.currentThread().getName());
return new UserProfile(userInfo.getUserId(), "画像数据 for " + userInfo.getUserId());
}, executor);
}
public static void main(String[] args) throws Exception {
String userId = "123";
CompletableFuture<UserProfile> userProfileFuture = getUserInfo(userId)
.thenCompose(userInfo -> getUserProfile(userInfo));
UserProfile userProfile = userProfileFuture.get(); // 阻塞直到结果返回
System.out.println("最终结果: " + userProfile);
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
}
}
在这个例子中,我们首先使用 getUserInfo 异步地获取用户信息,然后使用 thenCompose 将 getUserInfo 的结果传递给 getUserProfile,从而获取用户画像。thenCompose 保证了 getUserProfile 只有在 getUserInfo 完成后才会执行。
分析:
getUserInfo(userId)返回一个CompletableFuture<UserInfo>。thenCompose(userInfo -> getUserProfile(userInfo))接收UserInfo作为参数,并返回一个CompletableFuture<UserProfile>。thenCompose将这两个CompletableFuture组合成一个新的CompletableFuture<UserProfile>。userProfileFuture.get()阻塞主线程,直到UserProfile的结果返回。- 我们使用了
ExecutorService来管理线程池,避免创建过多的线程。
异常处理
在使用 CompletableFuture 时,异常处理至关重要。我们可以使用 exceptionally 方法来处理异步任务中的异常。
例如,如果 getUserInfo 或者 getUserProfile 抛出异常,我们可以使用 exceptionally 来提供一个默认值:
CompletableFuture<UserProfile> userProfileFuture = getUserInfo(userId)
.thenCompose(userInfo -> getUserProfile(userInfo))
.exceptionally(ex -> {
System.err.println("发生异常: " + ex.getMessage());
return new UserProfile(userId, "默认画像数据");
});
或者,我们可以使用 handle 方法来处理异常并返回一个结果:
CompletableFuture<UserProfile> userProfileFuture = getUserInfo(userId)
.thenCompose(userInfo -> getUserProfile(userInfo))
.handle((userProfile, ex) -> {
if (ex != null) {
System.err.println("发生异常: " + ex.getMessage());
return new UserProfile(userId, "默认画像数据");
} else {
return userProfile;
}
});
exceptionally 接收一个 Function<Throwable, ? extends T>,用于处理异常并返回一个默认值。handle 接收一个 BiFunction<? super T, Throwable, ? extends U>,用于处理正常结果和异常,并返回一个结果。
进一步的优化:使用不同的 Executor
默认情况下,CompletableFuture 使用 ForkJoinPool.commonPool() 作为默认的执行器。在某些情况下,我们可能需要使用自定义的 Executor 来执行任务,以提高程序的性能和可控性。
我们可以在 supplyAsync 方法中指定 Executor:
public static CompletableFuture<UserInfo> getUserInfo(String userId) {
return CompletableFuture.supplyAsync(() -> {
// 模拟从数据库获取用户信息
try {
TimeUnit.MILLISECONDS.sleep(200); // 模拟数据库查询延迟
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("获取用户信息, thread: " + Thread.currentThread().getName());
return new UserInfo(userId, "用户" + userId);
}, executor); // 使用自定义的 Executor
}
通过使用不同的 Executor,我们可以将不同的任务分配到不同的线程池中,从而避免线程之间的竞争,提高程序的并发性能。
实际案例:订单处理流程
现在,让我们考虑一个更复杂的实际案例:订单处理流程。假设订单处理流程包含以下步骤:
- 库存检查: 检查商品库存是否足够。
- 创建订单: 如果库存足够,则创建订单。
- 支付: 尝试进行支付。
- 发送通知: 订单创建成功后,发送通知给用户。
我们可以使用 CompletableFuture 和 thenCompose 来实现这个流程:
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class Order {
private String orderId;
private String userId;
private String productId;
private int quantity;
private boolean paid;
public Order(String orderId, String userId, String productId, int quantity) {
this.orderId = orderId;
this.userId = userId;
this.productId = productId;
this.quantity = quantity;
this.paid = false;
}
public String getOrderId() {
return orderId;
}
public String getUserId() {
return userId;
}
public String getProductId() {
return productId;
}
public int getQuantity() {
return quantity;
}
public boolean isPaid() {
return paid;
}
public void setPaid(boolean paid) {
this.paid = paid;
}
@Override
public String toString() {
return "Order{" +
"orderId='" + orderId + ''' +
", userId='" + userId + ''' +
", productId='" + productId + ''' +
", quantity=" + quantity +
", paid=" + paid +
'}';
}
}
public class OrderProcessingExample {
private static final ExecutorService executor = Executors.newFixedThreadPool(10);
private static final Random random = new Random();
public static CompletableFuture<Boolean> checkInventory(String productId, int quantity) {
return CompletableFuture.supplyAsync(() -> {
// 模拟库存检查
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
boolean hasEnoughInventory = random.nextBoolean(); // 随机模拟库存是否足够
System.out.println("检查库存, productId: " + productId + ", quantity: " + quantity + ", hasEnoughInventory: " + hasEnoughInventory + ", thread: " + Thread.currentThread().getName());
return hasEnoughInventory;
}, executor);
}
public static CompletableFuture<Order> createOrder(String userId, String productId, int quantity) {
return CompletableFuture.supplyAsync(() -> {
// 模拟创建订单
try {
TimeUnit.MILLISECONDS.sleep(150);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
String orderId = "ORDER-" + System.currentTimeMillis();
Order order = new Order(orderId, userId, productId, quantity);
System.out.println("创建订单, orderId: " + orderId + ", thread: " + Thread.currentThread().getName());
return order;
}, executor);
}
public static CompletableFuture<Order> processPayment(Order order) {
return CompletableFuture.supplyAsync(() -> {
// 模拟支付
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
boolean paymentSuccessful = random.nextBoolean(); // 随机模拟支付是否成功
if (paymentSuccessful) {
order.setPaid(true);
System.out.println("支付成功, orderId: " + order.getOrderId() + ", thread: " + Thread.currentThread().getName());
} else {
System.out.println("支付失败, orderId: " + order.getOrderId() + ", thread: " + Thread.currentThread().getName());
throw new RuntimeException("支付失败");
}
return order;
}, executor);
}
public static CompletableFuture<Void> sendNotification(Order order) {
return CompletableFuture.runAsync(() -> {
// 模拟发送通知
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("发送通知, orderId: " + order.getOrderId() + ", thread: " + Thread.currentThread().getName());
}, executor);
}
public static void main(String[] args) throws Exception {
String userId = "USER-123";
String productId = "PRODUCT-456";
int quantity = 2;
CompletableFuture<Order> orderFuture = checkInventory(productId, quantity)
.thenCompose(hasEnoughInventory -> {
if (hasEnoughInventory) {
return createOrder(userId, productId, quantity);
} else {
return CompletableFuture.failedFuture(new RuntimeException("库存不足"));
}
})
.thenCompose(order -> processPayment(order))
.thenCompose(order -> sendNotification(order).thenApply(v -> order)) // sendNotification 返回 Void, 这里需要转换成 Order
.exceptionally(ex -> {
System.err.println("订单处理失败: " + ex.getMessage());
return null; // 返回 null 表示订单处理失败
});
Order finalOrder = orderFuture.get();
if (finalOrder != null) {
System.out.println("订单处理完成: " + finalOrder);
} else {
System.out.println("订单处理失败");
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
}
}
代码分析:
checkInventory检查库存,返回CompletableFuture<Boolean>。- 如果库存足够,
createOrder创建订单,返回CompletableFuture<Order>。 如果库存不足,返回一个失败的CompletableFuture。 processPayment处理支付,返回CompletableFuture<Order>。sendNotification发送通知,返回CompletableFuture<Void>。 由于sendNotification返回Void,我们需要使用thenApply(v -> order)将其转换为CompletableFuture<Order>,以便后续的链式调用。exceptionally处理异常。
这个例子展示了如何使用 CompletableFuture 和 thenCompose 来构建一个复杂的异步流程,并处理各种异常情况。
避免死锁
在使用 CompletableFuture 时,需要特别注意避免死锁。死锁通常发生在以下情况:
- 互相依赖: 两个或多个
CompletableFuture互相等待对方完成。 - 阻塞等待: 在
CompletableFuture的回调函数中使用get()方法阻塞等待结果。
为了避免死锁,应该尽量避免阻塞等待,并使用 CompletableFuture 提供的异步方法来处理结果。
总结:使用 thenCompose 优雅地处理任务依赖
今天我们深入探讨了 Java 多线程编程中任务依赖的概念,以及如何使用 CompletableFuture.thenCompose 进行链式处理。thenCompose 允许我们优雅地将多个异步任务串联起来,处理任务之间的依赖关系,并进行有效的异常处理。 通过实际案例,我们展示了 thenCompose 在用户数据处理和订单处理流程中的应用。希望这些内容能帮助大家更好地理解和使用 CompletableFuture,写出更高效、更健壮的并发程序。
关键要点回顾
thenCompose专门用于处理异步任务间的依赖关系。thenCompose接收一个返回CompletableFuture的函数作为参数。- 要避免在使用
CompletableFuture时出现死锁。