JAVA 实战:如何用 CompletableFuture 构建异步任务流水线?
各位同学,今天我们来聊聊如何使用 CompletableFuture 构建异步任务流水线。在并发编程中,我们经常需要将多个任务串联起来,一个任务的输出作为另一个任务的输入,形成一个流水线。传统的同步方式会阻塞线程,效率低下。CompletableFuture 提供了一种优雅的方式来构建异步流水线,充分利用多核 CPU,提高程序的吞吐量和响应速度。
1. CompletableFuture 基础:理解异步编程的核心
CompletableFuture 代表一个异步计算的结果。它允许我们注册回调函数,在计算完成时执行这些回调函数。与 Future 相比,CompletableFuture 更加灵活和强大,提供了丰富的 API 来处理异步任务的完成、异常和组合。
让我们从一个简单的例子开始:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建一个 CompletableFuture 对象
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("执行异步任务...");
try {
Thread.sleep(1000); // 模拟耗时操作
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return "Hello, CompletableFuture!";
});
// 获取异步任务的结果 (会阻塞直到任务完成)
String result = future.get();
System.out.println("异步任务的结果: " + result);
}
}
在这个例子中,CompletableFuture.supplyAsync() 创建了一个异步任务,该任务返回一个字符串 "Hello, CompletableFuture!"。 future.get() 方法会阻塞当前线程,直到异步任务完成并返回结果。
2. 构建简单的任务流水线:thenApply 和 thenCompose
thenApply 和 thenCompose 是 CompletableFuture 中构建任务流水线的两个核心方法。
-
thenApply(Function<T, R> fn): 接收一个函数fn,该函数将上一个任务的结果T作为输入,并返回一个新的结果R。thenApply返回一个新的CompletableFuture<R>,代表这个新的异步计算。 -
thenCompose(Function<T, CompletionStage<R>> fn): 与thenApply类似,但fn返回的是一个CompletionStage<R>,而不是直接返回R。thenCompose会将上一个任务的结果T作为输入,并创建一个新的异步任务CompletionStage<R>。 它的作用是将两个CompletableFuture连接起来,形成一个整体的异步流程。
下面是一个使用 thenApply 的例子:
import java.util.concurrent.CompletableFuture;
public class ThenApplyExample {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> upperCaseFuture = future.thenApply(String::toUpperCase);
upperCaseFuture.thenAccept(result -> System.out.println("转换后的结果: " + result)); // 输出: 转换后的结果: HELLO
}
}
在这个例子中,我们首先创建一个 CompletableFuture<String>,返回 "Hello"。 然后,我们使用 thenApply 将结果转换为大写。 最后,我们使用 thenAccept 消费结果。
接下来,看一个使用 thenCompose 的例子:
import java.util.concurrent.CompletableFuture;
public class ThenComposeExample {
public static CompletableFuture<String> getUserName(String userId) {
return CompletableFuture.supplyAsync(() -> {
// 模拟从数据库获取用户名
System.out.println("正在获取用户名...");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return "Alice";
});
}
public static CompletableFuture<String> getEmail(String userName) {
return CompletableFuture.supplyAsync(() -> {
// 模拟从数据库获取用户邮箱
System.out.println("正在获取邮箱...");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return userName + "@example.com";
});
}
public static void main(String[] args) {
CompletableFuture<String> emailFuture = CompletableFuture.supplyAsync(() -> "123") // 模拟用户ID
.thenCompose(ThenComposeExample::getUserName)
.thenCompose(ThenComposeExample::getEmail);
emailFuture.thenAccept(email -> System.out.println("用户的邮箱: " + email));
}
}
在这个例子中,我们首先使用 supplyAsync 创建一个 CompletableFuture,模拟用户 ID。 然后,我们使用 thenCompose 依次获取用户名和邮箱。 thenCompose 确保了获取邮箱的操作只有在获取用户名操作完成后才会执行,并且将两个异步操作组合成一个整体。
thenApply vs thenCompose 的区别
| 特性 | thenApply |
thenCompose |
|---|---|---|
| 输入函数返回值 | 直接返回结果 R |
返回 CompletionStage<R> |
| 作用 | 对上一个任务的结果进行转换 | 将两个 CompletableFuture 连接起来,形成一个整体的异步流程 |
| 适用场景 | 简单的转换操作,不需要创建新的异步任务 | 需要创建新的异步任务,并将它们串联起来的情况 |
3. 处理异常:exceptionally 和 handle
在异步编程中,异常处理至关重要。 CompletableFuture 提供了 exceptionally 和 handle 方法来处理异常。
-
exceptionally(Function<Throwable, T> fn): 接收一个函数fn,该函数接收一个Throwable对象作为输入,并返回一个替代的结果T。 如果上一个任务抛出异常,则会执行fn。 -
handle(BiFunction<T, Throwable, R> fn): 接收一个函数fn,该函数接收上一个任务的结果T和一个Throwable对象作为输入,并返回一个新的结果R。 无论上一个任务是否抛出异常,都会执行fn。
下面是一个使用 exceptionally 的例子:
import java.util.concurrent.CompletableFuture;
public class ExceptionallyExample {
public static void main(String[] args) {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("执行可能抛出异常的任务...");
if (true) {
throw new RuntimeException("任务执行失败!");
}
return 100;
});
CompletableFuture<Integer> recoveredFuture = future.exceptionally(ex -> {
System.err.println("捕获到异常: " + ex.getMessage());
return 0; // 返回一个默认值
});
recoveredFuture.thenAccept(result -> System.out.println("最终结果: " + result)); // 输出: 最终结果: 0
}
}
在这个例子中,我们故意抛出一个异常。 exceptionally 方法捕获到异常,并返回一个默认值 0。
接下来,看一个使用 handle 的例子:
import java.util.concurrent.CompletableFuture;
public class HandleExample {
public static void main(String[] args) {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("执行可能抛出异常的任务...");
if (true) {
throw new RuntimeException("任务执行失败!");
}
return 100;
});
CompletableFuture<Integer> handledFuture = future.handle((result, ex) -> {
if (ex != null) {
System.err.println("捕获到异常: " + ex.getMessage());
return 0; // 返回一个默认值
} else {
return result;
}
});
handledFuture.thenAccept(result -> System.out.println("最终结果: " + result)); // 输出: 最终结果: 0
}
}
在这个例子中, handle 方法接收到异常,并返回一个默认值 0。 如果没有异常,则返回原始结果。
exceptionally vs handle 的区别
| 特性 | exceptionally |
handle |
|---|---|---|
| 执行时机 | 仅在发生异常时执行 | 无论是否发生异常都会执行 |
| 输入参数 | Throwable 对象 |
结果 T 和 Throwable 对象 |
| 适用场景 | 只需要在发生异常时提供一个替代结果的场景 | 需要根据是否发生异常来执行不同逻辑的场景 |
4. 组合多个 CompletableFuture:thenCombine 和 allOf
除了串联任务,我们还可以并行执行多个任务,并将它们的结果组合起来。 CompletableFuture 提供了 thenCombine 和 allOf 方法来实现这个功能。
-
thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U, ? extends V> fn): 接收另一个CompletionStage<? extends U>和一个函数BiFunction<? super T,? super U, ? extends V> fn。 当两个CompletionStage都完成时,fn会被执行,并将两个CompletionStage的结果作为输入,返回一个新的结果V。 -
allOf(CompletableFuture<?>... futures): 接收一个CompletableFuture数组。 当所有CompletableFuture都完成时,返回一个新的CompletableFuture<Void>。
下面是一个使用 thenCombine 的例子:
import java.util.concurrent.CompletableFuture;
public class ThenCombineExample {
public static CompletableFuture<String> fetchUserName() {
return CompletableFuture.supplyAsync(() -> {
System.out.println("正在获取用户名...");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return "Alice";
});
}
public static CompletableFuture<String> fetchUserEmail() {
return CompletableFuture.supplyAsync(() -> {
System.out.println("正在获取用户邮箱...");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return "[email protected]";
});
}
public static void main(String[] args) {
CompletableFuture<String> userNameFuture = fetchUserName();
CompletableFuture<String> userEmailFuture = fetchUserEmail();
CompletableFuture<String> combinedFuture = userNameFuture.thenCombine(userEmailFuture, (userName, userEmail) -> {
System.out.println("正在组合用户名和邮箱...");
return "用户名: " + userName + ", 邮箱: " + userEmail;
});
combinedFuture.thenAccept(result -> System.out.println("组合结果: " + result));
}
}
在这个例子中,我们并行获取用户名和邮箱,然后使用 thenCombine 将它们组合成一个字符串。
接下来,看一个使用 allOf 的例子:
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
public class AllOfExample {
public static CompletableFuture<String> processTask(String taskName) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("正在执行任务: " + taskName);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return "任务 " + taskName + " 完成!";
});
}
public static void main(String[] args) {
CompletableFuture<String> task1 = processTask("任务1");
CompletableFuture<String> task2 = processTask("任务2");
CompletableFuture<String> task3 = processTask("任务3");
CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2, task3);
allTasks.thenRun(() -> {
System.out.println("所有任务都已完成!");
// 获取每个任务的结果 (需要手动获取,因为 allOf 返回 CompletableFuture<Void>)
try {
System.out.println(task1.get());
System.out.println(task2.get());
System.out.println(task3.get());
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
在这个例子中,我们并行执行三个任务,然后使用 allOf 等待所有任务完成。 注意,allOf 返回的是 CompletableFuture<Void>,如果需要获取每个任务的结果,需要手动调用 get() 方法。
thenCombine vs allOf 的区别
| 特性 | thenCombine |
allOf |
|---|---|---|
| 组合数量 | 两个 CompletableFuture |
多个 CompletableFuture |
| 返回值类型 | 一个新的 CompletableFuture,其结果是两个任务结果的组合 |
一个 CompletableFuture<Void>,表示所有任务都已完成 |
| 适用场景 | 需要将两个任务的结果组合起来的场景 | 需要等待多个任务都完成的场景,但不一定需要组合它们的結果 |
5. 实战案例:构建一个电商订单处理流水线
假设我们有一个电商系统,需要处理用户下单的流程。 这个流程包括以下步骤:
- 验证用户身份。
- 检查库存。
- 创建订单。
- 扣减库存。
- 发送通知。
我们可以使用 CompletableFuture 构建一个异步任务流水线来处理这个流程:
import java.util.concurrent.CompletableFuture;
public class OrderProcessingPipeline {
public static CompletableFuture<Boolean> validateUser(String userId) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("验证用户身份: " + userId);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return true; // 模拟验证成功
});
}
public static CompletableFuture<Boolean> checkInventory(String productId, int quantity) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("检查库存: 产品 " + productId + ", 数量 " + quantity);
try {
Thread.sleep(300);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return true; // 模拟库存充足
});
}
public static CompletableFuture<String> createOrder(String userId, String productId, int quantity) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("创建订单: 用户 " + userId + ", 产品 " + productId + ", 数量 " + quantity);
try {
Thread.sleep(400);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return "订单号: 123456"; // 模拟创建订单成功
});
}
public static CompletableFuture<Boolean> deductInventory(String productId, int quantity) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("扣减库存: 产品 " + productId + ", 数量 " + quantity);
try {
Thread.sleep(300);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return true; // 模拟扣减库存成功
});
}
public static CompletableFuture<Void> sendNotification(String orderId) {
return CompletableFuture.runAsync(() -> {
System.out.println("发送通知: 订单 " + orderId);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
});
}
public static void main(String[] args) {
String userId = "user123";
String productId = "product456";
int quantity = 2;
CompletableFuture<String> orderFuture = validateUser(userId)
.thenCompose(isValidUser -> isValidUser ? checkInventory(productId, quantity) : CompletableFuture.completedFuture(false))
.thenCompose(isSufficientInventory -> isSufficientInventory ? createOrder(userId, productId, quantity) : CompletableFuture.completedFuture("库存不足"))
.thenCompose(orderId -> {
if (orderId.startsWith("订单号")) {
return deductInventory(productId, quantity).thenApply(deducted -> orderId);
} else {
return CompletableFuture.completedFuture(orderId); // Return the error message
}
});
CompletableFuture<Void> notificationFuture = orderFuture.thenAccept(orderId -> {
if (orderId.startsWith("订单号")) {
sendNotification(orderId);
} else {
System.out.println("订单处理失败: " + orderId);
}
});
notificationFuture.join(); // 等待整个流程完成
}
}
在这个例子中,我们使用 thenCompose 将各个步骤串联起来,形成一个异步任务流水线。 如果任何一个步骤失败,整个流程都会停止。 使用 CompletableFuture.completedFuture() 来提前返回一个已完成的 CompletableFuture,避免继续执行后续步骤。
6. 线程池的选择:避免资源饥饿
CompletableFuture 默认使用 ForkJoinPool.commonPool() 作为线程池。 在高并发场景下,共享线程池可能会导致资源竞争,影响性能。 因此,建议为 CompletableFuture 创建自定义的线程池。
import java.util.concurrent.*;
public class CustomThreadPoolExample {
public static void main(String[] args) {
// 创建一个自定义的线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("执行异步任务...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return "Hello, CompletableFuture!";
}, executor); // 使用自定义线程池
future.thenAccept(result -> System.out.println("异步任务的结果: " + result));
executor.shutdown();
}
}
在上面的例子中,我们创建了一个固定大小为 10 的线程池,并将其传递给 supplyAsync 方法。
7. 避免阻塞:使用非阻塞 API
尽量避免在 CompletableFuture 的回调函数中使用阻塞 API。 阻塞 API 会阻塞线程池中的线程,降低程序的并发能力。 如果必须使用阻塞 API,建议将其放在独立的线程中执行。
8. 一些补充建议
- 使用
orTimeout和completeOnTimeout设置超时时间: 防止任务无限期阻塞。 - 使用
complete和completeExceptionally手动完成CompletableFuture: 在某些场景下,需要手动控制CompletableFuture的完成。 - 使用
join和get获取结果的区别:join不会抛出 checked exception,而get会抛出InterruptedException和ExecutionException。通常建议使用join,并处理CompletionException。 - 考虑使用反应式编程框架: 如果需要处理更复杂的异步场景,可以考虑使用 Reactor 或 RxJava 等反应式编程框架。
构建高效的异步流水线
CompletableFuture 提供了一套强大的 API 用于构建异步任务流水线。 通过合理地使用 thenApply、thenCompose、exceptionally、handle、thenCombine 和 allOf 等方法,我们可以构建高效、灵活的异步程序。 记住,选择合适的线程池、避免阻塞 API 以及充分理解各个方法的特性是构建成功的关键。