JAVA 使用 CompletableFuture 合并多接口调用的最佳实践

JAVA CompletableFuture 合并多接口调用的最佳实践

各位同学们,大家好!今天我们来聊聊在 Java 开发中,如何使用 CompletableFuture 优雅地合并多个接口调用。在微服务架构日益流行的今天,一个业务流程往往需要调用多个不同的服务接口,并将这些接口的结果进行整合,才能最终返回给用户。传统的同步阻塞方式不仅效率低下,而且容易造成线程资源的浪费。CompletableFuture 作为 Java 8 引入的异步编程利器,为我们提供了一种更加高效、灵活的解决方案。

1. 为什么需要 CompletableFuture?

在深入 CompletableFuture 之前,我们先来分析一下传统方式处理多接口调用的痛点。

  • 同步阻塞: 传统的做法通常是顺序调用每个接口,直到所有接口都返回结果。这会导致调用线程阻塞,无法处理其他请求,降低了系统的吞吐量。

  • 错误处理困难: 如果某个接口调用失败,我们需要编写复杂的错误处理逻辑,并且很难保证所有资源都能被正确释放。

  • 代码可读性差: 大量的嵌套回调和同步代码使得代码难以阅读和维护。

CompletableFuture 旨在解决以上问题,它具有以下优点:

  • 异步非阻塞: 允许我们以非阻塞的方式执行接口调用,充分利用 CPU 资源。
  • 链式调用: 支持链式调用,可以轻松地将多个异步操作组合在一起。
  • 强大的错误处理: 提供了丰富的异常处理机制,可以优雅地处理各种异常情况。
  • 并行执行: 可以并行执行多个接口调用,显著缩短整体响应时间。
  • 代码简洁易懂: 使用 CompletableFuture 可以编写出更加简洁、易于维护的代码。

2. CompletableFuture 核心概念

CompletableFuture 代表一个异步计算的结果。它可以处于以下三种状态:

  • Pending: 异步计算正在进行中。
  • Done: 异步计算已完成,并返回结果或抛出异常。
  • Cancelled: 异步计算被取消。

CompletableFuture 提供了许多方法来处理异步计算的结果,例如:

  • thenApply(Function): 对结果进行转换。
  • thenAccept(Consumer): 对结果进行消费。
  • thenRun(Runnable): 在结果完成后执行一些操作。
  • exceptionally(Function): 处理异常。
  • whenComplete(BiConsumer): 在结果完成时执行一些操作,无论成功或失败。
  • allOf(CompletableFuture...): 等待所有 CompletableFuture 完成。
  • anyOf(CompletableFuture...): 只要有一个 CompletableFuture 完成就返回。

3. CompletableFuture 合并多接口调用的实战

现在,我们通过一个具体的例子来演示如何使用 CompletableFuture 合并多个接口调用。假设我们需要调用三个接口:

  • getUserInfo(userId): 获取用户信息。
  • getUserOrders(userId): 获取用户订单列表。
  • getUserCoupons(userId): 获取用户优惠券列表。

我们需要将这三个接口的结果合并成一个包含用户信息、订单列表和优惠券列表的 UserDetail 对象。

3.1 定义相关类

首先,我们定义相关的数据类:

import java.util.List;

class UserInfo {
    private String userId;
    private String userName;
    // 省略其他属性和 getter/setter 方法
    public UserInfo(String userId, String userName) {
        this.userId = userId;
        this.userName = userName;
    }

    public String getUserId() {
        return userId;
    }

    public String getUserName() {
        return userName;
    }
}

class Order {
    private String orderId;
    private String orderName;
    // 省略其他属性和 getter/setter 方法

    public Order(String orderId, String orderName) {
        this.orderId = orderId;
        this.orderName = orderName;
    }
}

class Coupon {
    private String couponId;
    private String couponName;
    // 省略其他属性和 getter/setter 方法
    public Coupon(String couponId, String couponName) {
        this.couponId = couponId;
        this.couponName = couponName;
    }
}

class UserDetail {
    private UserInfo userInfo;
    private List<Order> orders;
    private List<Coupon> coupons;

    public UserDetail(UserInfo userInfo, List<Order> orders, List<Coupon> coupons) {
        this.userInfo = userInfo;
        this.orders = orders;
        this.coupons = coupons;
    }

    public UserInfo getUserInfo() {
        return userInfo;
    }

    public List<Order> getOrders() {
        return orders;
    }

    public List<Coupon> getCoupons() {
        return coupons;
    }
    // 省略 getter/setter 方法
}

3.2 定义接口调用方法

接下来,我们定义模拟的接口调用方法。为了模拟异步调用,我们使用 CompletableFuture.supplyAsync() 方法:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ApiService {

    private static final ExecutorService executor = Executors.newFixedThreadPool(10); // 创建一个线程池

    public static CompletableFuture<UserInfo> getUserInfo(String userId) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟接口调用,耗时操作
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return new UserInfo(userId, "张三");
        }, executor);
    }

    public static CompletableFuture<List<Order>> getUserOrders(String userId) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟接口调用,耗时操作
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return Arrays.asList(new Order("1", "订单1"), new Order("2", "订单2"));
        }, executor);
    }

    public static CompletableFuture<List<Coupon>> getUserCoupons(String userId) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟接口调用,耗时操作
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return Arrays.asList(new Coupon("100", "优惠券1"), new Coupon("200", "优惠券2"));
        }, executor);
    }
}

注意:为了模拟真实场景,我们在每个方法中都添加了 Thread.sleep() 来模拟接口调用的耗时。 CompletableFuture.supplyAsync() 的第二个参数,传入了线程池,这样异步任务就会提交到线程池中执行,避免了使用默认的 ForkJoinPool.commonPool() 导致的资源竞争问题。

3.3 使用 CompletableFuture 合并接口调用

现在,我们可以使用 CompletableFuture 来合并这三个接口的调用:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String userId = "123";

        CompletableFuture<UserInfo> userInfoFuture = ApiService.getUserInfo(userId);
        CompletableFuture<List<Order>> userOrdersFuture = ApiService.getUserOrders(userId);
        CompletableFuture<List<Coupon>> userCouponsFuture = ApiService.getUserCoupons(userId);

        CompletableFuture<UserDetail> userDetailFuture = CompletableFuture.allOf(userInfoFuture, userOrdersFuture, userCouponsFuture)
                .thenApply(Void -> {
                    try {
                        UserInfo userInfo = userInfoFuture.get();
                        List<Order> orders = userOrdersFuture.get();
                        List<Coupon> coupons = userCouponsFuture.get();
                        return new UserDetail(userInfo, orders, coupons);
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e); // 包装成RuntimeException,方便后续处理
                    }
                });

        UserDetail userDetail = userDetailFuture.get();
        System.out.println("UserDetail: " + userDetail.getUserInfo().getUserName() + ", orders: " + userDetail.getOrders().size() + ", coupons: " + userDetail.getCoupons().size());
    }
}

在这个例子中,我们首先分别调用三个接口,得到三个 CompletableFuture 对象。然后,我们使用 CompletableFuture.allOf() 方法来等待所有 CompletableFuture 对象完成。allOf 方法返回一个新的 CompletableFuture<Void>,当所有输入的 CompletableFuture 都完成时,这个新的 CompletableFuture 也会完成。

接着,我们使用 thenApply() 方法来将三个接口的结果合并成一个 UserDetail 对象。在 thenApply() 方法中,我们使用 get() 方法来获取每个 CompletableFuture 的结果。注意,这里调用 get() 方法会阻塞,直到 CompletableFuture 完成。 但是由于allOf 已经保证了所有 futures 都已经完成,所以这里的get方法不会造成长时间阻塞。

最后,我们调用 userDetailFuture.get() 方法来获取最终的 UserDetail 对象。同样需要注意,这里也会阻塞直到userDetailFuture完成。

3.4 优化:避免阻塞,使用 thenCombine

上面的代码虽然可以实现合并接口调用的功能,但是它在 thenApply() 方法中使用了 get() 方法,这会导致阻塞。为了避免阻塞,我们可以使用 thenCombine() 方法。 thenCombine会在两个CompletableFuture都完成时,将它们的结果作为参数传递给一个BiFunction,该BiFunction负责组合结果。

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String userId = "123";

        CompletableFuture<UserInfo> userInfoFuture = ApiService.getUserInfo(userId);
        CompletableFuture<List<Order>> userOrdersFuture = ApiService.getUserOrders(userId);
        CompletableFuture<List<Coupon>> userCouponsFuture = ApiService.getUserCoupons(userId);

        CompletableFuture<UserDetail> userDetailFuture = userInfoFuture.thenCombine(userOrdersFuture, (userInfo, orders) -> {
            return userCouponsFuture.thenApply(coupons -> new UserDetail(userInfo, orders, coupons));
        }).thenCompose(future -> future);

        UserDetail userDetail = userDetailFuture.get();
        System.out.println("UserDetail: " + userDetail.getUserInfo().getUserName() + ", orders: " + userDetail.getOrders().size() + ", coupons: " + userDetail.getCoupons().size());
    }
}

在这个例子中,我们首先使用 userInfoFuture.thenCombine(userOrdersFuture)userInfoFutureuserOrdersFuture 的结果合并成一个包含 UserInfoList<Order> 的元组。然后,我们使用 thenApply 方法将这个元组和 userCouponsFuture 的结果合并成一个 UserDetail 对象。由于 thenCombine 返回的是 CompletableFuture<CompletableFuture<UserDetail>>,所以我们需要使用 thenCompose 来将它扁平化成 CompletableFuture<UserDetail>。 这样避免了阻塞,并且代码也更加简洁易懂。

3.5 错误处理

在实际开发中,接口调用可能会失败。我们需要编写相应的错误处理逻辑。 CompletableFuture 提供了 exceptionally()whenComplete() 方法来处理异常。

  • exceptionally(Function): 如果 CompletableFuture 抛出异常,则使用 Function 来处理异常,并返回一个默认值。
  • whenComplete(BiConsumer): 无论 CompletableFuture 成功或失败,都会执行 BiConsumerBiConsumer 接收两个参数:结果和异常。
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String userId = "123";

        CompletableFuture<UserInfo> userInfoFuture = ApiService.getUserInfo(userId)
                .exceptionally(ex -> {
                    System.err.println("获取用户信息失败: " + ex.getMessage());
                    return new UserInfo(userId, "default_user"); // 返回默认值
                });

        CompletableFuture<List<Order>> userOrdersFuture = ApiService.getUserOrders(userId)
                .exceptionally(ex -> {
                    System.err.println("获取用户订单失败: " + ex.getMessage());
                    return List.of(); // 返回空列表
                });

        CompletableFuture<List<Coupon>> userCouponsFuture = ApiService.getUserCoupons(userId)
                .exceptionally(ex -> {
                    System.err.println("获取用户优惠券失败: " + ex.getMessage());
                    return List.of(); // 返回空列表
                });

        CompletableFuture<UserDetail> userDetailFuture = userInfoFuture.thenCombine(userOrdersFuture, (userInfo, orders) -> {
            return userCouponsFuture.thenApply(coupons -> new UserDetail(userInfo, orders, coupons));
        }).thenCompose(future -> future)
                .whenComplete((userDetail, ex) -> {
                    if (ex != null) {
                        System.err.println("合并接口调用失败: " + ex.getMessage());
                    }
                });

        UserDetail userDetail = userDetailFuture.get();
        System.out.println("UserDetail: " + userDetail.getUserInfo().getUserName() + ", orders: " + userDetail.getOrders().size() + ", coupons: " + userDetail.getCoupons().size());
    }
}

在这个例子中,我们使用 exceptionally() 方法来处理每个接口调用可能抛出的异常。如果某个接口调用失败,我们会返回一个默认值,保证程序的正常运行。同时,我们使用 whenComplete() 方法来记录合并接口调用是否成功。

4. CompletableFuture 最佳实践

  • 使用线程池: 避免使用默认的 ForkJoinPool.commonPool(),因为它可能会被其他任务占用,导致性能下降。使用自定义的线程池可以更好地控制线程资源。

  • 避免阻塞: 尽量避免在 CompletableFuture 的回调方法中使用 get() 方法,因为它会导致阻塞。使用 thenCombine(), thenCompose() 等方法可以实现非阻塞的异步操作。

  • 处理异常: 编写完善的错误处理逻辑,使用 exceptionally()whenComplete() 方法来处理异常情况。

  • 链式调用: 充分利用 CompletableFuture 的链式调用特性,将多个异步操作组合在一起,使代码更加简洁易懂。

  • 设置超时:CompletableFuture 设置超时时间,避免长时间等待。可以使用 completeOnTimeout() 方法。

  • 避免过度使用: 虽然 CompletableFuture 功能强大,但过度使用也会导致代码复杂性增加。在简单的场景下,使用传统的同步方式可能更加合适。

5. 总结:巧妙利用 CompletableFuture,让并发编程更简单

CompletableFuture 为我们提供了一种高效、灵活的方式来合并多个接口调用。通过使用 CompletableFuture,我们可以编写出更加简洁、易于维护的异步代码,并显著提高系统的性能和吞吐量。 合理地运用线程池,避免阻塞,处理异常,设置超时等等是编写高质量并发代码的关键。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注