使用CompletableFuture实现Java多线程任务编排与结果合并的高级技巧

CompletableFuture:Java 多线程任务编排与结果合并的高级技巧

各位朋友,大家好!今天我们来深入探讨 Java 并发编程中一个非常强大的工具:CompletableFuture。它不仅能够简化异步编程,还能让我们以声明式的方式编排复杂的并发任务,并高效地合并最终结果。本讲座将从 CompletableFuture 的基础概念入手,逐步深入到高级应用,并通过丰富的代码示例,帮助大家掌握这项关键技术。

1. CompletableFuture 的基础:Promise 与 Future 的进化

在传统的 Java 多线程编程中,我们通常使用 Future 接口来表示异步计算的结果。Future 允许我们提交一个任务到线程池,然后通过 get() 方法阻塞地等待结果。但是,这种方式存在几个明显的缺点:

  • 阻塞等待: get() 方法会阻塞当前线程,直到结果可用。这会导致线程资源的浪费,降低程序的响应性。
  • 缺乏回调机制: Future 本身不提供回调机制,当结果可用时,无法主动通知调用者。
  • 组合困难: 如果需要将多个 Future 的结果组合起来,逻辑会变得非常复杂,容易出错。

CompletableFuture 正是为了解决这些问题而诞生的。它实现了 FutureCompletionStage 接口,提供了更加灵活和强大的异步编程模型。可以将 CompletableFuture 看作是 Future 的一个增强版本,它引入了 Promise 的概念,允许我们以非阻塞的方式处理异步计算的结果,并提供了丰富的回调机制和组合方法。

简单来说,CompletableFuture 代表着一个最终可能完成的异步操作,并且提供了各种方法来注册在操作完成时执行的回调函数。

2. 创建 CompletableFuture 对象

创建 CompletableFuture 对象有多种方式,最常用的包括以下几种:

  • CompletableFuture.completedFuture(value): 创建一个已经完成的 CompletableFuture,并返回指定的值。

    CompletableFuture<String> completedFuture = CompletableFuture.completedFuture("Hello World");
    try {
        String result = completedFuture.get(); // 直接返回 "Hello World",不会阻塞
        System.out.println(result);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
  • CompletableFuture.runAsync(Runnable): 创建一个异步执行的 CompletableFuture,执行一个 Runnable 任务,没有返回值。

    CompletableFuture<Void> runAsyncFuture = CompletableFuture.runAsync(() -> {
        System.out.println("Running task in separate thread: " + Thread.currentThread().getName());
    });
  • CompletableFuture.supplyAsync(Supplier): 创建一个异步执行的 CompletableFuture,执行一个 Supplier 任务,并返回一个结果。

    CompletableFuture<String> supplyAsyncFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("Running task in separate thread: " + Thread.currentThread().getName());
        return "Result from Supplier";
    });
    
    try {
        String result = supplyAsyncFuture.get();
        System.out.println(result);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
  • new CompletableFuture(): 创建一个新的 CompletableFuture 对象,需要手动调用 complete()completeExceptionally() 方法来设置结果或抛出异常。

    CompletableFuture<String> future = new CompletableFuture<>();
    
    new Thread(() -> {
        try {
            Thread.sleep(2000); // 模拟耗时操作
            future.complete("Task completed"); // 设置结果
        } catch (InterruptedException e) {
            future.completeExceptionally(e); // 设置异常
        }
    }).start();
    
    try {
        String result = future.get();
        System.out.println(result);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }

runAsyncsupplyAsync 方法默认使用 ForkJoinPool.commonPool() 作为默认线程池。我们也可以通过传入 Executor 参数来指定自定义线程池。

3. CompletableFuture 的核心方法:回调与转换

CompletableFuture 提供了丰富的回调方法,用于在异步任务完成时执行特定的操作。这些回调方法可以分为以下几类:

  • thenApply(Function): 对结果进行转换,返回一个新的 CompletableFuture,其结果是应用 Function 转换后的结果。

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
    CompletableFuture<String> upperCaseFuture = future.thenApply(String::toUpperCase); // 转换为大写
    try {
        System.out.println(upperCaseFuture.get()); // 输出: HELLO
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
  • thenAccept(Consumer): 消费结果,返回 CompletableFuture<Void>,不返回新的结果。

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "World");
    CompletableFuture<Void> printFuture = future.thenAccept(System.out::println); // 打印结果
    
    try {
        printFuture.get(); // 等待打印完成,返回 null
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
  • thenRun(Runnable): 在结果完成后执行一个 Runnable 任务,返回 CompletableFuture<Void>,不关心结果。

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Task Done");
    CompletableFuture<Void> runFuture = future.thenRun(() -> System.out.println("Task finished!"));
    
    try {
        runFuture.get(); // 等待任务完成,返回 null
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
  • thenApplyAsync(Function)thenAcceptAsync(Consumer)thenRunAsync(Runnable): 与对应的同步方法类似,但是异步执行回调函数。

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Async");
    CompletableFuture<String> upperCaseFuture = future.thenApplyAsync(String::toUpperCase); // 异步转换为大写
    
    try {
        System.out.println(upperCaseFuture.get()); // 输出: ASYNC
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
  • exceptionally(Function):CompletableFuture 抛出异常时,执行 Function 来处理异常,并返回一个新的 CompletableFuture,其结果是处理后的结果。

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        throw new RuntimeException("Something went wrong");
    });
    
    CompletableFuture<String> recoveredFuture = future.exceptionally(ex -> {
        System.err.println("Exception caught: " + ex.getMessage());
        return "Recovered value";
    });
    
    try {
        System.out.println(recoveredFuture.get()); // 输出: Recovered value
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
  • whenComplete(BiConsumer): 无论 CompletableFuture 正常完成还是抛出异常,都会执行 BiConsumer,可以用于记录日志或清理资源。

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        if (Math.random() > 0.5) {
            return "Success";
        } else {
            throw new RuntimeException("Random failure");
        }
    });
    
    CompletableFuture<String> completedFuture = future.whenComplete((result, ex) -> {
        if (ex == null) {
            System.out.println("Task completed successfully: " + result);
        } else {
            System.err.println("Task completed with exception: " + ex.getMessage());
        }
    });
    
    try {
        System.out.println(completedFuture.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
  • handle(BiFunction):whenComplete 类似,但是 handle 接收一个 BiFunction,可以根据结果或异常返回一个新的结果,并封装到新的 CompletableFuture 中。

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        if (Math.random() > 0.5) {
            return "Success";
        } else {
            throw new RuntimeException("Random failure");
        }
    });
    
    CompletableFuture<String> handledFuture = future.handle((result, ex) -> {
        if (ex == null) {
            return "Handled: " + result;
        } else {
            System.err.println("Exception caught: " + ex.getMessage());
            return "Handled: Error occurred";
        }
    });
    
    try {
        System.out.println(handledFuture.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }

4. CompletableFuture 的组合:编排复杂任务

CompletableFuture 提供了强大的组合方法,允许我们将多个 CompletableFuture 组合起来,构建复杂的异步任务流程。常用的组合方法包括:

  • thenCompose(Function): 将一个 CompletableFuture 的结果作为参数,传递给另一个返回 CompletableFutureFunction,并将两个 CompletableFuture 组合起来。适用于依赖关系的任务。

    CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() -> "user123");
    
    CompletableFuture<String> orderFuture = userFuture.thenCompose(userId -> {
        return CompletableFuture.supplyAsync(() -> "order456 for user " + userId);
    });
    
    try {
        System.out.println(orderFuture.get()); // 输出: order456 for user user123
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
  • thenCombine(CompletableFuture, BiFunction): 将两个 CompletableFuture 的结果合并起来,应用 BiFunction 生成一个新的结果。两个 CompletableFuture 是并行执行的。

    CompletableFuture<String> nameFuture = CompletableFuture.supplyAsync(() -> "John");
    CompletableFuture<String> lastNameFuture = CompletableFuture.supplyAsync(() -> "Doe");
    
    CompletableFuture<String> fullNameFuture = nameFuture.thenCombine(lastNameFuture, (firstName, lastName) -> firstName + " " + lastName);
    
    try {
        System.out.println(fullNameFuture.get()); // 输出: John Doe
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
  • thenAcceptBoth(CompletableFuture, BiConsumer):thenCombine 类似,但是接收一个 BiConsumer,用于消费两个 CompletableFuture 的结果,不返回新的结果。

    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
    
    CompletableFuture<Void> acceptBothFuture = future1.thenAcceptBoth(future2, (str1, str2) -> {
        System.out.println(str1 + " " + str2); // 输出: Hello World
    });
    
    try {
        acceptBothFuture.get(); // 等待任务完成,返回 null
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
  • runAfterBoth(CompletableFuture, Runnable): 当两个 CompletableFuture 都完成后,执行一个 Runnable 任务。

    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Task 1");
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Task 2");
    
    CompletableFuture<Void> runAfterBothFuture = future1.runAfterBoth(future2, () -> {
        System.out.println("Both tasks are completed");
    });
    
    try {
        runAfterBothFuture.get(); // 等待任务完成,返回 null
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
  • applyToEither(CompletableFuture, Function): 当两个 CompletableFuture 中任何一个完成时,将它的结果应用 Function 生成一个新的结果。

    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Result from Future 1";
    });
    
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Result from Future 2");
    
    CompletableFuture<String> eitherFuture = future1.applyToEither(future2, result -> "The result is: " + result);
    
    try {
        System.out.println(eitherFuture.get()); // 可能输出: The result is: Result from Future 2
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
  • acceptEither(CompletableFuture, Consumer):applyToEither 类似,但是接收一个 Consumer,用于消费先完成的 CompletableFuture 的结果,不返回新的结果。

    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Result from Future 1";
    });
    
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Result from Future 2");
    
    CompletableFuture<Void> acceptEitherFuture = future1.acceptEither(future2, result -> System.out.println("First completed result: " + result));
    
    try {
        acceptEitherFuture.get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
  • runAfterEither(CompletableFuture, Runnable): 当两个 CompletableFuture 中任何一个完成后,执行一个 Runnable 任务。

    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Result from Future 1";
    });
    
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Result from Future 2");
    
    CompletableFuture<Void> runAfterEitherFuture = future1.runAfterEither(future2, () -> System.out.println("One of the tasks is completed"));
    
    try {
        runAfterEitherFuture.get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }

5. 处理多个 CompletableFuture:allOf 和 anyOf

CompletableFuture 提供了 allOfanyOf 方法,用于处理多个 CompletableFuture 的聚合结果。

  • CompletableFuture.allOf(CompletableFuture...): 返回一个新的 CompletableFuture,当所有传入的 CompletableFuture 都完成时,该 CompletableFuture 才会完成。 返回的 CompletableFuture 的结果是 Void

    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Task 1");
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Task 2");
    CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "Task 3");
    
    CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(future1, future2, future3);
    
    try {
        allOfFuture.get(); // 等待所有任务完成
    
        System.out.println("All tasks are completed");
    
        // 获取每个 CompletableFuture 的结果 (需要确保每个 future 都已完成)
        String result1 = future1.get();
        String result2 = future2.get();
        String result3 = future3.get();
    
        System.out.println("Result 1: " + result1);
        System.out.println("Result 2: " + result2);
        System.out.println("Result 3: " + result3);
    
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
  • CompletableFuture.anyOf(CompletableFuture...): 返回一个新的 CompletableFuture,当任何一个传入的 CompletableFuture 完成时,该 CompletableFuture 就会完成。 返回的 CompletableFuture 的结果是第一个完成的 CompletableFuture 的结果。

    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Task 1";
    });
    
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Task 2");
    CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "Task 3");
    
    CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);
    
    try {
        Object result = anyOfFuture.get(); // 等待任何一个任务完成
        System.out.println("One of the tasks is completed: " + result);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }

6. 案例分析:电商平台商品详情页加载

假设我们需要加载一个电商平台商品详情页,需要从不同的微服务获取商品信息、价格、库存、评价等数据。我们可以使用 CompletableFuture 来并行地获取这些数据,并最终合并成一个商品详情对象。

class ProductDetail {
    private String name;
    private double price;
    private int stock;
    private double rating;

    // 省略构造函数、Getter 和 Setter
    public ProductDetail(String name, double price, int stock, double rating) {
        this.name = name;
        this.price = price;
        this.stock = stock;
        this.rating = rating;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }

    public int getStock() {
        return stock;
    }

    public void setStock(int stock) {
        this.stock = stock;
    }

    public double getRating() {
        return rating;
    }

    public void setRating(double rating) {
        this.rating = rating;
    }

    @Override
    public String toString() {
        return "ProductDetail{" +
                "name='" + name + ''' +
                ", price=" + price +
                ", stock=" + stock +
                ", rating=" + rating +
                '}';
    }
}

public class ProductDetailLoader {

    public static CompletableFuture<String> getNameAsync() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟从商品信息微服务获取商品名称
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Awesome Product";
        });
    }

    public static CompletableFuture<Double> getPriceAsync() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟从价格微服务获取商品价格
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 99.99;
        });
    }

    public static CompletableFuture<Integer> getStockAsync() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟从库存微服务获取商品库存
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 100;
        });
    }

    public static CompletableFuture<Double> getRatingAsync() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟从评价微服务获取商品评价
            try {
                Thread.sleep(400);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 4.5;
        });
    }

    public static CompletableFuture<ProductDetail> loadProductDetail() {
        CompletableFuture<String> nameFuture = getNameAsync();
        CompletableFuture<Double> priceFuture = getPriceAsync();
        CompletableFuture<Integer> stockFuture = getStockAsync();
        CompletableFuture<Double> ratingFuture = getRatingAsync();

        return CompletableFuture.allOf(nameFuture, priceFuture, stockFuture, ratingFuture)
                .thenApply(v -> {
                    try {
                        return new ProductDetail(nameFuture.get(), priceFuture.get(), stockFuture.get(), ratingFuture.get());
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                });
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        long startTime = System.currentTimeMillis();
        CompletableFuture<ProductDetail> productDetailFuture = loadProductDetail();
        ProductDetail productDetail = productDetailFuture.get();
        long endTime = System.currentTimeMillis();

        System.out.println("Product Detail: " + productDetail);
        System.out.println("Time taken: " + (endTime - startTime) + "ms"); // 预计耗时略大于 500ms
    }
}

在这个例子中,我们使用 CompletableFuture.supplyAsync 并行地从不同的微服务获取数据,然后使用 CompletableFuture.allOf 等待所有数据都获取完成,最后将所有数据合并成一个 ProductDetail 对象。

7. 最佳实践与注意事项

  • 避免阻塞: 尽可能使用异步回调方法,避免在 CompletableFuture 中进行阻塞操作。
  • 选择合适的线程池: 根据任务的性质选择合适的线程池,例如 CPU 密集型任务使用 ForkJoinPool,IO 密集型任务使用自定义的线程池。
  • 处理异常: 使用 exceptionallywhenCompletehandle 方法来处理异常,保证程序的健壮性。
  • 避免死锁: 在使用组合方法时,要注意避免死锁,例如避免在同一个 CompletableFuture 的回调函数中等待另一个 CompletableFuture 的结果。
  • 监控和调试: 使用 Java 的监控工具来监控 CompletableFuture 的执行情况,例如线程池的使用率、任务的执行时间等。

8. CompletableFuture的优势简述

CompletableFuture 的核心优势在于它提供了一种声明式、非阻塞的方式来编排并发任务。它通过回调机制和丰富的组合方法,简化了异步编程的复杂性,提高了程序的响应性和吞吐量。相比传统的 Future 接口,CompletableFuture 更加灵活和强大,是构建高性能并发应用的重要工具。

掌握要点:异步编排,高效并发

希望今天的讲座能够帮助大家更好地理解和使用 CompletableFuture。 谢谢大家!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注