JAVA 实战:如何用 CompletableFuture 构建异步任务流水线?

JAVA 实战:如何用 CompletableFuture 构建异步任务流水线?

各位同学,今天我们来聊聊如何使用 CompletableFuture 构建异步任务流水线。在并发编程中,我们经常需要将多个任务串联起来,一个任务的输出作为另一个任务的输入,形成一个流水线。传统的同步方式会阻塞线程,效率低下。CompletableFuture 提供了一种优雅的方式来构建异步流水线,充分利用多核 CPU,提高程序的吞吐量和响应速度。

1. CompletableFuture 基础:理解异步编程的核心

CompletableFuture 代表一个异步计算的结果。它允许我们注册回调函数,在计算完成时执行这些回调函数。与 Future 相比,CompletableFuture 更加灵活和强大,提供了丰富的 API 来处理异步任务的完成、异常和组合。

让我们从一个简单的例子开始:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExample {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建一个 CompletableFuture 对象
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("执行异步任务...");
            try {
                Thread.sleep(1000); // 模拟耗时操作
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return "Hello, CompletableFuture!";
        });

        // 获取异步任务的结果 (会阻塞直到任务完成)
        String result = future.get();
        System.out.println("异步任务的结果: " + result);
    }
}

在这个例子中,CompletableFuture.supplyAsync() 创建了一个异步任务,该任务返回一个字符串 "Hello, CompletableFuture!"。 future.get() 方法会阻塞当前线程,直到异步任务完成并返回结果。

2. 构建简单的任务流水线:thenApplythenCompose

thenApplythenComposeCompletableFuture 中构建任务流水线的两个核心方法。

  • thenApply(Function<T, R> fn): 接收一个函数 fn,该函数将上一个任务的结果 T 作为输入,并返回一个新的结果 RthenApply 返回一个新的 CompletableFuture<R>,代表这个新的异步计算。

  • thenCompose(Function<T, CompletionStage<R>> fn): 与 thenApply 类似,但 fn 返回的是一个 CompletionStage<R>,而不是直接返回 RthenCompose 会将上一个任务的结果 T 作为输入,并创建一个新的异步任务 CompletionStage<R>。 它的作用是将两个 CompletableFuture 连接起来,形成一个整体的异步流程。

下面是一个使用 thenApply 的例子:

import java.util.concurrent.CompletableFuture;

public class ThenApplyExample {

    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");

        CompletableFuture<String> upperCaseFuture = future.thenApply(String::toUpperCase);

        upperCaseFuture.thenAccept(result -> System.out.println("转换后的结果: " + result)); // 输出: 转换后的结果: HELLO
    }
}

在这个例子中,我们首先创建一个 CompletableFuture<String>,返回 "Hello"。 然后,我们使用 thenApply 将结果转换为大写。 最后,我们使用 thenAccept 消费结果。

接下来,看一个使用 thenCompose 的例子:

import java.util.concurrent.CompletableFuture;

public class ThenComposeExample {

    public static CompletableFuture<String> getUserName(String userId) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟从数据库获取用户名
            System.out.println("正在获取用户名...");
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return "Alice";
        });
    }

    public static CompletableFuture<String> getEmail(String userName) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟从数据库获取用户邮箱
            System.out.println("正在获取邮箱...");
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return userName + "@example.com";
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> emailFuture = CompletableFuture.supplyAsync(() -> "123") // 模拟用户ID
                .thenCompose(ThenComposeExample::getUserName)
                .thenCompose(ThenComposeExample::getEmail);

        emailFuture.thenAccept(email -> System.out.println("用户的邮箱: " + email));
    }
}

在这个例子中,我们首先使用 supplyAsync 创建一个 CompletableFuture,模拟用户 ID。 然后,我们使用 thenCompose 依次获取用户名和邮箱。 thenCompose 确保了获取邮箱的操作只有在获取用户名操作完成后才会执行,并且将两个异步操作组合成一个整体。

thenApply vs thenCompose 的区别

特性 thenApply thenCompose
输入函数返回值 直接返回结果 R 返回 CompletionStage<R>
作用 对上一个任务的结果进行转换 将两个 CompletableFuture 连接起来,形成一个整体的异步流程
适用场景 简单的转换操作,不需要创建新的异步任务 需要创建新的异步任务,并将它们串联起来的情况

3. 处理异常:exceptionallyhandle

在异步编程中,异常处理至关重要。 CompletableFuture 提供了 exceptionallyhandle 方法来处理异常。

  • exceptionally(Function<Throwable, T> fn): 接收一个函数 fn,该函数接收一个 Throwable 对象作为输入,并返回一个替代的结果 T。 如果上一个任务抛出异常,则会执行 fn

  • handle(BiFunction<T, Throwable, R> fn): 接收一个函数 fn,该函数接收上一个任务的结果 T 和一个 Throwable 对象作为输入,并返回一个新的结果 R。 无论上一个任务是否抛出异常,都会执行 fn

下面是一个使用 exceptionally 的例子:

import java.util.concurrent.CompletableFuture;

public class ExceptionallyExample {

    public static void main(String[] args) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("执行可能抛出异常的任务...");
            if (true) {
                throw new RuntimeException("任务执行失败!");
            }
            return 100;
        });

        CompletableFuture<Integer> recoveredFuture = future.exceptionally(ex -> {
            System.err.println("捕获到异常: " + ex.getMessage());
            return 0; // 返回一个默认值
        });

        recoveredFuture.thenAccept(result -> System.out.println("最终结果: " + result)); // 输出: 最终结果: 0
    }
}

在这个例子中,我们故意抛出一个异常。 exceptionally 方法捕获到异常,并返回一个默认值 0。

接下来,看一个使用 handle 的例子:

import java.util.concurrent.CompletableFuture;

public class HandleExample {

    public static void main(String[] args) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("执行可能抛出异常的任务...");
            if (true) {
                throw new RuntimeException("任务执行失败!");
            }
            return 100;
        });

        CompletableFuture<Integer> handledFuture = future.handle((result, ex) -> {
            if (ex != null) {
                System.err.println("捕获到异常: " + ex.getMessage());
                return 0; // 返回一个默认值
            } else {
                return result;
            }
        });

        handledFuture.thenAccept(result -> System.out.println("最终结果: " + result)); // 输出: 最终结果: 0
    }
}

在这个例子中, handle 方法接收到异常,并返回一个默认值 0。 如果没有异常,则返回原始结果。

exceptionally vs handle 的区别

特性 exceptionally handle
执行时机 仅在发生异常时执行 无论是否发生异常都会执行
输入参数 Throwable 对象 结果 TThrowable 对象
适用场景 只需要在发生异常时提供一个替代结果的场景 需要根据是否发生异常来执行不同逻辑的场景

4. 组合多个 CompletableFuturethenCombineallOf

除了串联任务,我们还可以并行执行多个任务,并将它们的结果组合起来。 CompletableFuture 提供了 thenCombineallOf 方法来实现这个功能。

  • thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U, ? extends V> fn): 接收另一个 CompletionStage<? extends U> 和一个函数 BiFunction<? super T,? super U, ? extends V> fn。 当两个 CompletionStage 都完成时,fn 会被执行,并将两个 CompletionStage 的结果作为输入,返回一个新的结果 V

  • allOf(CompletableFuture<?>... futures): 接收一个 CompletableFuture 数组。 当所有 CompletableFuture 都完成时,返回一个新的 CompletableFuture<Void>

下面是一个使用 thenCombine 的例子:

import java.util.concurrent.CompletableFuture;

public class ThenCombineExample {

    public static CompletableFuture<String> fetchUserName() {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("正在获取用户名...");
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return "Alice";
        });
    }

    public static CompletableFuture<String> fetchUserEmail() {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("正在获取用户邮箱...");
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return "[email protected]";
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> userNameFuture = fetchUserName();
        CompletableFuture<String> userEmailFuture = fetchUserEmail();

        CompletableFuture<String> combinedFuture = userNameFuture.thenCombine(userEmailFuture, (userName, userEmail) -> {
            System.out.println("正在组合用户名和邮箱...");
            return "用户名: " + userName + ", 邮箱: " + userEmail;
        });

        combinedFuture.thenAccept(result -> System.out.println("组合结果: " + result));
    }
}

在这个例子中,我们并行获取用户名和邮箱,然后使用 thenCombine 将它们组合成一个字符串。

接下来,看一个使用 allOf 的例子:

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;

public class AllOfExample {

    public static CompletableFuture<String> processTask(String taskName) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("正在执行任务: " + taskName);
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return "任务 " + taskName + " 完成!";
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> task1 = processTask("任务1");
        CompletableFuture<String> task2 = processTask("任务2");
        CompletableFuture<String> task3 = processTask("任务3");

        CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2, task3);

        allTasks.thenRun(() -> {
            System.out.println("所有任务都已完成!");
            // 获取每个任务的结果 (需要手动获取,因为 allOf 返回 CompletableFuture<Void>)
            try {
                System.out.println(task1.get());
                System.out.println(task2.get());
                System.out.println(task3.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
}

在这个例子中,我们并行执行三个任务,然后使用 allOf 等待所有任务完成。 注意,allOf 返回的是 CompletableFuture<Void>,如果需要获取每个任务的结果,需要手动调用 get() 方法。

thenCombine vs allOf 的区别

特性 thenCombine allOf
组合数量 两个 CompletableFuture 多个 CompletableFuture
返回值类型 一个新的 CompletableFuture,其结果是两个任务结果的组合 一个 CompletableFuture<Void>,表示所有任务都已完成
适用场景 需要将两个任务的结果组合起来的场景 需要等待多个任务都完成的场景,但不一定需要组合它们的結果

5. 实战案例:构建一个电商订单处理流水线

假设我们有一个电商系统,需要处理用户下单的流程。 这个流程包括以下步骤:

  1. 验证用户身份。
  2. 检查库存。
  3. 创建订单。
  4. 扣减库存。
  5. 发送通知。

我们可以使用 CompletableFuture 构建一个异步任务流水线来处理这个流程:

import java.util.concurrent.CompletableFuture;

public class OrderProcessingPipeline {

    public static CompletableFuture<Boolean> validateUser(String userId) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("验证用户身份: " + userId);
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return true; // 模拟验证成功
        });
    }

    public static CompletableFuture<Boolean> checkInventory(String productId, int quantity) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("检查库存: 产品 " + productId + ", 数量 " + quantity);
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return true; // 模拟库存充足
        });
    }

    public static CompletableFuture<String> createOrder(String userId, String productId, int quantity) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("创建订单: 用户 " + userId + ", 产品 " + productId + ", 数量 " + quantity);
            try {
                Thread.sleep(400);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return "订单号: 123456"; // 模拟创建订单成功
        });
    }

    public static CompletableFuture<Boolean> deductInventory(String productId, int quantity) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("扣减库存: 产品 " + productId + ", 数量 " + quantity);
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return true; // 模拟扣减库存成功
        });
    }

    public static CompletableFuture<Void> sendNotification(String orderId) {
        return CompletableFuture.runAsync(() -> {
            System.out.println("发送通知: 订单 " + orderId);
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        });
    }

    public static void main(String[] args) {
        String userId = "user123";
        String productId = "product456";
        int quantity = 2;

        CompletableFuture<String> orderFuture = validateUser(userId)
                .thenCompose(isValidUser -> isValidUser ? checkInventory(productId, quantity) : CompletableFuture.completedFuture(false))
                .thenCompose(isSufficientInventory -> isSufficientInventory ? createOrder(userId, productId, quantity) : CompletableFuture.completedFuture("库存不足"))
                .thenCompose(orderId -> {
                    if (orderId.startsWith("订单号")) {
                        return deductInventory(productId, quantity).thenApply(deducted -> orderId);
                    } else {
                        return CompletableFuture.completedFuture(orderId); // Return the error message
                    }
                });

        CompletableFuture<Void> notificationFuture = orderFuture.thenAccept(orderId -> {
            if (orderId.startsWith("订单号")) {
                sendNotification(orderId);
            } else {
                System.out.println("订单处理失败: " + orderId);
            }
        });

        notificationFuture.join(); // 等待整个流程完成
    }
}

在这个例子中,我们使用 thenCompose 将各个步骤串联起来,形成一个异步任务流水线。 如果任何一个步骤失败,整个流程都会停止。 使用 CompletableFuture.completedFuture() 来提前返回一个已完成的 CompletableFuture,避免继续执行后续步骤。

6. 线程池的选择:避免资源饥饿

CompletableFuture 默认使用 ForkJoinPool.commonPool() 作为线程池。 在高并发场景下,共享线程池可能会导致资源竞争,影响性能。 因此,建议为 CompletableFuture 创建自定义的线程池。

import java.util.concurrent.*;

public class CustomThreadPoolExample {

    public static void main(String[] args) {
        // 创建一个自定义的线程池
        ExecutorService executor = Executors.newFixedThreadPool(10);

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("执行异步任务...");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return "Hello, CompletableFuture!";
        }, executor); // 使用自定义线程池

        future.thenAccept(result -> System.out.println("异步任务的结果: " + result));

        executor.shutdown();
    }
}

在上面的例子中,我们创建了一个固定大小为 10 的线程池,并将其传递给 supplyAsync 方法。

7. 避免阻塞:使用非阻塞 API

尽量避免在 CompletableFuture 的回调函数中使用阻塞 API。 阻塞 API 会阻塞线程池中的线程,降低程序的并发能力。 如果必须使用阻塞 API,建议将其放在独立的线程中执行。

8. 一些补充建议

  • 使用 orTimeoutcompleteOnTimeout 设置超时时间: 防止任务无限期阻塞。
  • 使用 completecompleteExceptionally 手动完成 CompletableFuture: 在某些场景下,需要手动控制 CompletableFuture 的完成。
  • 使用 joinget 获取结果的区别: join 不会抛出 checked exception,而 get 会抛出 InterruptedExceptionExecutionException。通常建议使用 join,并处理 CompletionException
  • 考虑使用反应式编程框架: 如果需要处理更复杂的异步场景,可以考虑使用 Reactor 或 RxJava 等反应式编程框架。

构建高效的异步流水线

CompletableFuture 提供了一套强大的 API 用于构建异步任务流水线。 通过合理地使用 thenApplythenComposeexceptionallyhandlethenCombineallOf 等方法,我们可以构建高效、灵活的异步程序。 记住,选择合适的线程池、避免阻塞 API 以及充分理解各个方法的特性是构建成功的关键。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注