使用CompletableFuture实现Java多线程任务编排与结果合并的高级技巧

CompletableFuture:Java多线程任务编排与结果合并的高级技巧

大家好,今天我们来深入探讨Java并发编程中一个非常强大的工具——CompletableFuture。它不仅简化了异步编程模型,还提供了丰富的API,让我们能够更优雅地进行多线程任务的编排和结果合并。本次讲座将从CompletableFuture的基本概念入手,逐步讲解其高级用法,并结合实例代码,帮助大家掌握利用CompletableFuture构建高效并发应用的技巧。

1. 基础概念与创建方式

CompletableFuture代表一个异步计算的结果。它允许你在任务完成时异步地执行后续操作,而无需阻塞当前线程。我们可以通过多种方式创建CompletableFuture

  • CompletableFuture.supplyAsync(Supplier<U> supplier): 使用 Supplier 异步执行一个任务并返回结果。 常用于执行耗时的计算任务。
  • CompletableFuture.runAsync(Runnable runnable): 使用 Runnable 异步执行一个任务,没有返回值。 常用于执行一些不需要返回值的操作。
  • CompletableFuture.completedFuture(T value): 创建一个已经完成的 CompletableFuture,其结果为给定的值。 用于快速创建一个已完成的任务。
  • new CompletableFuture<T>(): 创建一个空的 CompletableFuture,需要手动调用 complete(T value) 或者 completeExceptionally(Throwable ex) 来完成或者抛出异常。 用于更精细的控制任务的完成状态。

以下是一些代码示例:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class CompletableFutureExample {

    public static void main(String[] args) throws Exception {

        // 1. 使用 supplyAsync 创建 CompletableFuture
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Running task in thread: " + Thread.currentThread().getName());
            try {
                Thread.sleep(1000); // 模拟耗时操作
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return "Hello from supplyAsync";
        });

        System.out.println("supplyAsync Result: " + future1.get()); // get()会阻塞,直到任务完成

        // 2. 使用 runAsync 创建 CompletableFuture
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
            System.out.println("Running task in thread: " + Thread.currentThread().getName());
            System.out.println("Hello from runAsync");
        });

        future2.get(); // 等待任务完成

        // 3. 使用 completedFuture 创建 CompletableFuture
        CompletableFuture<String> future3 = CompletableFuture.completedFuture("Hello from completedFuture");
        System.out.println("completedFuture Result: " + future3.get());

        // 4. 使用 new CompletableFuture() 创建 CompletableFuture
        CompletableFuture<String> future4 = new CompletableFuture<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> {
            try {
                Thread.sleep(500);
                future4.complete("Hello from manual completion");
            } catch (InterruptedException e) {
                future4.completeExceptionally(e); // 发生异常时需要手动处理
                Thread.currentThread().interrupt();
            }
        });
        System.out.println("manual completion Result: " + future4.get());

        executor.shutdown();
    }
}

2. 任务编排:串行、并行与组合

CompletableFuture 提供了强大的API,可以灵活地编排异步任务。主要包括串行执行、并行执行和组合执行三种方式。

  • 串行执行 (Then Compose & Then Apply)

    • thenApply(Function<T,U> fn): 在前一个 CompletableFuture 完成后,将它的结果作为参数传递给 Function,并返回一个新的 CompletableFuture,其结果是 Function 的返回值。
    • thenApplyAsync(Function<T,U> fn): 与 thenApply 类似,但异步执行 Function
    • thenCompose(Function<T,CompletableFuture<U>> fn): 在前一个 CompletableFuture 完成后,将它的结果作为参数传递给 Function,该 Function 返回一个新的 CompletableFuturethenCompose 用于连接两个依赖于彼此的异步操作,避免嵌套的 CompletableFuture
    • thenComposeAsync(Function<T,CompletableFuture<U>> fn): 与 thenCompose 类似,但异步执行 Function
    • thenAccept(Consumer<T> consumer): 在前一个 CompletableFuture 完成后,将它的结果作为参数传递给 Consumer,没有返回值。
    • thenAcceptAsync(Consumer<T> consumer): 与 thenAccept 类似,但异步执行 Consumer
    • thenRun(Runnable runnable): 在前一个 CompletableFuture 完成后,执行 Runnable,没有返回值。
    • thenRunAsync(Runnable runnable): 与 thenRun 类似,但异步执行 Runnable

    thenApplythenCompose 的区别在于,thenApply 的参数是一个返回普通值的函数,而 thenCompose 的参数是一个返回 CompletableFuture 的函数。 thenCompose 通常用于处理依赖于前一个任务结果的异步操作。

  • 并行执行 (Then Combine & Then Accept Both)

    • thenCombine(CompletionStage<U> other, BiFunction<T,U,V> fn): 当两个 CompletableFuture 都完成后,将它们的结果作为参数传递给 BiFunction,并返回一个新的 CompletableFuture,其结果是 BiFunction 的返回值。
    • thenCombineAsync(CompletionStage<U> other, BiFunction<T,U,V> fn): 与 thenCombine 类似,但异步执行 BiFunction
    • thenAcceptBoth(CompletionStage<?> other, BiConsumer<T,U> consumer): 当两个 CompletableFuture 都完成后,将它们的结果作为参数传递给 BiConsumer,没有返回值。
    • thenAcceptBothAsync(CompletionStage<?> other, BiConsumer<T,U> consumer): 与 thenAcceptBoth 类似,但异步执行 BiConsumer
    • runAfterBoth(CompletionStage<?> other, Runnable runnable): 当两个 CompletableFuture 都完成后,执行 Runnable,没有返回值。
    • runAfterBothAsync(CompletionStage<?> other, Runnable runnable): 与 runAfterBoth 类似,但异步执行 Runnable
  • 组合执行 (Any Of & All Of)

    • anyOf(CompletableFuture<?>... cfs): 返回一个新的 CompletableFuture,当参数中的任何一个 CompletableFuture 完成时,新的 CompletableFuture 也会完成,其结果是第一个完成的 CompletableFuture 的结果。
    • allOf(CompletableFuture<?>... cfs): 返回一个新的 CompletableFuture,当参数中的所有 CompletableFuture 都完成时,新的 CompletableFuture 才会完成,其结果是一个空的 CompletableFuture。 可以使用 join() 方法等待所有任务完成并获取结果 (如果需要)。

以下是一些代码示例,展示了如何使用这些方法进行任务编排:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureComposeExample {

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);

        // 1. 串行执行 (thenApply, thenCompose)
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello", executor)
                .thenApply(s -> s + " World")
                .thenApply(String::toUpperCase);
        System.out.println("串行执行 Result: " + future1.get()); // 输出: HELLO WORLD

        CompletableFuture<String> futureCompose = CompletableFuture.supplyAsync(() -> "Initial Value", executor)
                .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " - Transformed", executor));
        System.out.println("thenCompose Result: " + futureCompose.get()); // 输出: Initial Value - Transformed

        // 2. 并行执行 (thenCombine)
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Hello", executor);
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> " World", executor);
        CompletableFuture<String> combinedFuture = future2.thenCombine(future3, (s1, s2) -> s1 + s2);
        System.out.println("并行执行 Result: " + combinedFuture.get()); // 输出: Hello World

        // 3. 组合执行 (allOf)
        List<String> websites = Arrays.asList("https://www.google.com", "https://www.baidu.com", "https://www.bing.com");
        List<CompletableFuture<String>> futures = websites.stream()
                .map(url -> CompletableFuture.supplyAsync(() -> {
                    try {
                        // Simulate fetching content from a website
                        Thread.sleep(500);
                        return "Content from " + url;
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return "Error fetching from " + url;
                    }
                }, executor))
                .toList();

        CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

        allFutures.thenRun(() -> {
            System.out.println("All websites fetched.");
            futures.forEach(future -> {
                try {
                    System.out.println(future.get());
                } catch (Exception e) {
                    System.err.println("Error getting result: " + e.getMessage());
                }
            });
        });
        allFutures.get(); // Wait for all futures to complete

        // 4. 组合执行 (anyOf)
        CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Result A";
        }, executor);

        CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Result B";
        }, executor);

        CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(futureA, futureB);

        System.out.println("anyOf Result: " + anyFuture.get());  // Will print "Result B" because it completes first

        executor.shutdown();
    }
}

3. 异常处理

在异步编程中,异常处理至关重要。CompletableFuture 提供了多种机制来处理任务执行过程中可能出现的异常:

  • exceptionally(Function<Throwable,T> fn): 当 CompletableFuture 抛出异常时,调用 Function 处理异常,并返回一个新的 CompletableFuture,其结果是 Function 的返回值。
  • handle(BiFunction<T,Throwable,U> fn): 无论 CompletableFuture 正常完成还是抛出异常,都会调用 BiFunction 处理结果或异常,并返回一个新的 CompletableFuture,其结果是 BiFunction 的返回值。
  • whenComplete(BiConsumer<T,Throwable> action): 无论 CompletableFuture 正常完成还是抛出异常,都会调用 BiConsumer 处理结果或异常,没有返回值。
  • completeExceptionally(Throwable ex): 手动设置 CompletableFuture 抛出异常。
  • obtrudeValue(T value): 强行设置 CompletableFuture 的值为给定值。此方法会忽略任何已存在的完成状态,包括异常。 通常不建议使用,因为它会破坏 CompletableFuture 的正常状态转换。
  • obtrudeException(Throwable ex): 强行设置 CompletableFuture 抛出给定异常。 与 obtrudeValue 类似,不推荐使用。

以下是一些代码示例:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExceptionHandling {

    public static void main(String[] args) throws Exception {

        // 1. exceptionally
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            if (true) {
                throw new RuntimeException("Something went wrong!");
            }
            return "Result";
        }).exceptionally(ex -> {
            System.err.println("Exception occurred: " + ex.getMessage());
            return "Recovered Result";
        });

        System.out.println("exceptionally Result: " + future1.get()); // 输出: Recovered Result

        // 2. handle
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            if (true) {
                throw new RuntimeException("Another error!");
            }
            return "Result";
        }).handle((result, ex) -> {
            if (ex != null) {
                System.err.println("Exception occurred: " + ex.getMessage());
                return "Handle Recovered Result";
            } else {
                return result;
            }
        });

        System.out.println("handle Result: " + future2.get()); // 输出: Handle Recovered Result

        // 3. whenComplete
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            if (true) {
                throw new RuntimeException("Yet another error!");
            }
            return "Result";
        }).whenComplete((result, ex) -> {
            if (ex != null) {
                System.err.println("Exception occurred: " + ex.getMessage());
            } else {
                System.out.println("Result: " + result);
            }
        });

        try {
            future3.get(); // 会抛出异常,因为 whenComplete 不会恢复异常
        } catch (InterruptedException | ExecutionException e) {
            System.err.println("Exception caught: " + e.getMessage());
        }

        // 4. completeExceptionally
        CompletableFuture<String> future4 = new CompletableFuture<>();
        new Thread(() -> {
            try {
                Thread.sleep(500);
                future4.completeExceptionally(new IllegalStateException("Manually completed with exception"));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                future4.completeExceptionally(e);
            }
        }).start();

        try {
            future4.get();
        } catch (InterruptedException | ExecutionException e) {
            System.err.println("Exception caught: " + e.getMessage()); // 输出: Manually completed with exception
        }
    }
}

4. 线程池的选择与配置

CompletableFuture 默认使用 ForkJoinPool.commonPool() 作为其默认的线程池。 但为了更好地控制并发度,避免线程饥饿,以及更精细地管理线程资源,通常建议使用自定义的线程池。

  • Executor 接口: CompletableFuture 提供了接受 Executor 参数的方法,例如 supplyAsync(Supplier<U> supplier, Executor executor)

选择线程池时需要考虑以下因素:

  • CPU 密集型任务: 对于 CPU 密集型任务,线程池的大小通常设置为 CPU 核心数 + 1。
  • I/O 密集型任务: 对于 I/O 密集型任务,线程池的大小可以设置为 CPU 核心数的 2 倍甚至更多,具体取决于 I/O 操作的阻塞时间。
  • 任务的优先级: 可以使用优先级队列的线程池来执行不同优先级的任务。
  • 任务的隔离: 可以使用不同的线程池来隔离不同类型的任务,避免相互干扰。

以下是一些线程池的类型:

线程池类型 描述
FixedThreadPool 固定大小的线程池,适用于任务数量相对稳定,需要限制并发数的场景。
CachedThreadPool 可缓存的线程池,线程数量动态调整,适用于任务数量变化频繁,执行时间较短的场景。
ScheduledThreadPool 可以执行定时任务的线程池,适用于需要周期性执行任务的场景。
SingleThreadExecutor 单线程的线程池,保证任务按顺序执行,适用于需要串行执行任务的场景。
ForkJoinPool JDK 7 引入的线程池,适用于可以分解成小任务的场景,例如递归算法。
ThreadPoolExecutor (自定义) 允许更细粒度的配置,例如核心线程数、最大线程数、空闲线程存活时间、任务队列等。

以下是一个使用自定义线程池的示例:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureWithExecutor {

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Running task in thread: " + Thread.currentThread().getName());
            return "Hello from custom executor";
        }, executor);

        System.out.println("Result: " + future.get());

        executor.shutdown();
    }
}

5. 异步编程的最佳实践

  • 避免阻塞操作: 在 CompletableFuture 的回调函数中,尽量避免执行阻塞操作,否则会影响异步执行的效率。
  • 合理选择线程池: 根据任务的类型和特点,选择合适的线程池,并进行合理的配置。
  • 充分利用组合API: 利用 thenComposethenCombineallOf 等组合API,可以简化任务编排,提高代码的可读性和可维护性。
  • 注意异常处理: 在异步编程中,异常处理尤为重要,需要充分考虑各种异常情况,并进行妥善处理。
  • 避免回调地狱: 过多的回调嵌套会导致代码难以理解和维护,可以使用 thenCompose 等方法来避免回调地狱。
  • 监控和调优: 使用监控工具来观察 CompletableFuture 的执行情况,并根据实际情况进行调优。

代码演示:利用CompletableFuture处理订单流程

假设有一个在线购物平台,需要处理订单创建、支付、库存扣减和物流通知等多个步骤。 我们可以使用 CompletableFuture 来异步地执行这些步骤,提高系统的响应速度。

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OrderProcessingExample {

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);

        // 1. 创建订单
        CompletableFuture<Order> createOrderFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Creating order in thread: " + Thread.currentThread().getName());
            // Simulate creating an order
            try {
                Thread.sleep(new Random().nextInt(500));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return new Order(123, "Product XYZ", 1, 99.99);
        }, executor);

        // 2. 支付
        CompletableFuture<PaymentResult> paymentFuture = createOrderFuture.thenApplyAsync(order -> {
            System.out.println("Processing payment for order " + order.orderId + " in thread: " + Thread.currentThread().getName());
            // Simulate payment processing
            try {
                Thread.sleep(new Random().nextInt(1000));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            boolean success = new Random().nextBoolean();
            return new PaymentResult(order.orderId, success, success ? "Payment successful" : "Payment failed");
        }, executor);

        // 3. 扣减库存
        CompletableFuture<InventoryResult> inventoryFuture = createOrderFuture.thenApplyAsync(order -> {
            System.out.println("Deducting inventory for order " + order.orderId + " in thread: " + Thread.currentThread().getName());
            // Simulate inventory deduction
            try {
                Thread.sleep(new Random().nextInt(300));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            boolean success = new Random().nextBoolean();
            return new InventoryResult(order.orderId, success, success ? "Inventory deducted" : "Inventory deduction failed");
        }, executor);

        // 4. 物流通知 (依赖于支付和库存结果)
        CompletableFuture<ShippingNotification> shippingFuture = paymentFuture.thenCombineAsync(inventoryFuture, (paymentResult, inventoryResult) -> {
            System.out.println("Sending shipping notification for order " + paymentResult.orderId + " in thread: " + Thread.currentThread().getName());
            // Simulate sending shipping notification
            try {
                Thread.sleep(new Random().nextInt(200));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            boolean readyForShipping = paymentResult.success && inventoryResult.success;
            String message = readyForShipping ? "Order ready for shipping" : "Order cannot be shipped";
            return new ShippingNotification(paymentResult.orderId, readyForShipping, message);
        }, executor);

        // 5. 处理最终结果
        shippingFuture.thenAccept(notification -> {
            System.out.println("Shipping notification: " + notification);
        });

        // 等待所有任务完成
        shippingFuture.get();
        executor.shutdown();
    }

    static class Order {
        int orderId;
        String product;
        int quantity;
        double price;

        public Order(int orderId, String product, int quantity, double price) {
            this.orderId = orderId;
            this.product = product;
            this.quantity = quantity;
            this.price = price;
        }
    }

    static class PaymentResult {
        int orderId;
        boolean success;
        String message;

        public PaymentResult(int orderId, boolean success, String message) {
            this.orderId = orderId;
            this.success = success;
            this.message = message;
        }
    }

    static class InventoryResult {
        int orderId;
        boolean success;
        String message;

        public InventoryResult(int orderId, boolean success, String message) {
            this.orderId = orderId;
            this.success = success;
            this.message = message;
        }
    }

    static class ShippingNotification {
        int orderId;
        boolean readyForShipping;
        String message;

        public ShippingNotification(int orderId, boolean readyForShipping, String message) {
            this.orderId = orderId;
            this.readyForShipping = readyForShipping;
            this.message = message;
        }

        @Override
        public String toString() {
            return "ShippingNotification{" +
                    "orderId=" + orderId +
                    ", readyForShipping=" + readyForShipping +
                    ", message='" + message + ''' +
                    '}';
        }
    }
}

通过这个例子,我们可以看到 CompletableFuture 如何方便地处理复杂的异步流程,提高系统的并发能力和响应速度。

总结:异步任务编排,高效并发编程

CompletableFuture 是 Java 并发编程中一个强大的工具,它通过提供丰富的 API 和灵活的编排方式,简化了异步编程模型,使得我们可以更优雅地构建高效的并发应用。 掌握 CompletableFuture 的高级用法,能够帮助我们更好地处理复杂的异步任务,提高系统的性能和可维护性。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注