JAVA 多线程任务依赖结果?使用 CompletableFuture.thenCompose 链式处理

Java 多线程任务依赖结果:CompletableFuture.thenCompose 链式处理

各位朋友,大家好!今天我们来深入探讨 Java 多线程编程中一个非常重要的概念:任务依赖以及如何使用 CompletableFuture.thenCompose 进行优雅的链式处理。在实际的软件开发中,很多任务并非孤立存在,而是存在着复杂的依赖关系,一个任务的执行可能需要依赖另一个任务的结果。处理这种依赖关系,是保证程序正确性和效率的关键。

任务依赖的场景分析

在深入代码之前,我们先来看看一些常见的任务依赖场景:

  • 数据准备和处理: 假设我们需要从数据库获取用户信息,然后根据用户信息调用第三方服务获取用户画像。获取用户画像的操作显然依赖于获取用户信息的成功。
  • 订单处理: 在电商系统中,订单的创建可能依赖于库存的检查。只有库存足够,才能创建订单。
  • 异步服务调用: 多个异步服务调用,后面的服务调用需要使用前一个服务的返回值作为参数。

这些场景都体现了任务之间的依赖关系。如果我们不合理地处理这些依赖,可能会导致以下问题:

  • 阻塞: 主线程等待某个任务完成,导致程序响应缓慢。
  • 资源浪费: 多个线程同时等待同一个资源,导致资源竞争激烈。
  • 代码复杂: 使用回调函数或者复杂的同步机制,导致代码难以维护。

CompletableFuture 简介

CompletableFuture 是 Java 8 引入的一个强大的异步编程工具。它实现了 FutureCompletionStage 接口,提供了丰富的 API 来处理异步任务。CompletableFuture 的核心优势在于:

  • 异步执行: 允许在后台线程中执行任务,避免阻塞主线程。
  • 链式调用: 提供了 thenApply, thenAccept, thenRun, thenCompose 等方法,可以方便地将多个异步任务串联起来。
  • 异常处理: 提供了 exceptionally, handle, whenComplete 等方法,可以优雅地处理异步任务中的异常。
  • 组合操作: 提供了 allOf, anyOf 等方法,可以组合多个 CompletableFuture 对象。

thenCompose:处理任务依赖的核心

thenComposeCompletableFuture 中一个非常重要的方法,它专门用于处理任务依赖的场景。thenCompose 的作用是:将一个 CompletableFuture 的结果作为参数,传递给另一个返回 CompletableFuture 的函数,并将这两个 CompletableFuture 组合成一个新的 CompletableFuture

thenCompose 的方法签名如下:

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
  • T:当前 CompletableFuture 的结果类型。
  • UthenCompose 返回的 CompletableFuture 的结果类型。
  • fn:一个函数,接收 T 作为参数,返回一个 CompletionStage<U>

重点:fn 必须返回一个 CompletableFuture 或者 CompletionStage,这是 thenCompose 的关键。

thenComposethenApply 的区别:

方法 功能 返回值类型
thenApply 将一个 CompletableFuture 的结果作为参数,传递给一个函数,并将该函数的结果作为新的 CompletableFuture 的结果。 CompletableFuture<U>
thenCompose 将一个 CompletableFuture 的结果作为参数,传递给一个返回 CompletableFuture 的函数,并将这两个 CompletableFuture 组合成一个新的 CompletableFuture CompletableFuture<U>

简单来说,thenApply 用于转换结果,而 thenCompose 用于处理依赖关系。thenCompose 会 “解开” 嵌套的 CompletableFuture,最终返回一个扁平的 CompletableFuture

代码示例:用户数据和用户画像

我们用一个实际的例子来演示 thenCompose 的用法。假设我们有两个方法:

  • getUserInfo(String userId): 异步地从数据库获取用户信息,返回一个 CompletableFuture<UserInfo>
  • getUserProfile(UserInfo userInfo): 异步地根据用户信息获取用户画像,返回一个 CompletableFuture<UserProfile>

代码如下:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class UserInfo {
    private String userId;
    private String userName;

    public UserInfo(String userId, String userName) {
        this.userId = userId;
        this.userName = userName;
    }

    public String getUserId() {
        return userId;
    }

    public String getUserName() {
        return userName;
    }

    @Override
    public String toString() {
        return "UserInfo{" +
                "userId='" + userId + ''' +
                ", userName='" + userName + ''' +
                '}';
    }
}

class UserProfile {
    private String userId;
    private String profileData;

    public UserProfile(String userId, String profileData) {
        this.userId = userId;
        this.profileData = profileData;
    }

    public String getUserId() {
        return userId;
    }

    public String getProfileData() {
        return profileData;
    }

    @Override
    public String toString() {
        return "UserProfile{" +
                "userId='" + userId + ''' +
                ", profileData='" + profileData + ''' +
                '}';
    }
}

public class CompletableFutureThenComposeExample {

    private static final ExecutorService executor = Executors.newFixedThreadPool(10);

    public static CompletableFuture<UserInfo> getUserInfo(String userId) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟从数据库获取用户信息
            try {
                TimeUnit.MILLISECONDS.sleep(200); // 模拟数据库查询延迟
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("获取用户信息, thread: " + Thread.currentThread().getName());
            return new UserInfo(userId, "用户" + userId);
        }, executor);
    }

    public static CompletableFuture<UserProfile> getUserProfile(UserInfo userInfo) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟调用第三方服务获取用户画像
            try {
                TimeUnit.MILLISECONDS.sleep(300); // 模拟第三方服务调用延迟
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("获取用户画像, thread: " + Thread.currentThread().getName());
            return new UserProfile(userInfo.getUserId(), "画像数据 for " + userInfo.getUserId());
        }, executor);
    }

    public static void main(String[] args) throws Exception {
        String userId = "123";

        CompletableFuture<UserProfile> userProfileFuture = getUserInfo(userId)
                .thenCompose(userInfo -> getUserProfile(userInfo));

        UserProfile userProfile = userProfileFuture.get(); // 阻塞直到结果返回
        System.out.println("最终结果: " + userProfile);

        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
    }
}

在这个例子中,我们首先使用 getUserInfo 异步地获取用户信息,然后使用 thenComposegetUserInfo 的结果传递给 getUserProfile,从而获取用户画像。thenCompose 保证了 getUserProfile 只有在 getUserInfo 完成后才会执行。

分析:

  • getUserInfo(userId) 返回一个 CompletableFuture<UserInfo>
  • thenCompose(userInfo -> getUserProfile(userInfo)) 接收 UserInfo 作为参数,并返回一个 CompletableFuture<UserProfile>
  • thenCompose 将这两个 CompletableFuture 组合成一个新的 CompletableFuture<UserProfile>
  • userProfileFuture.get() 阻塞主线程,直到 UserProfile 的结果返回。
  • 我们使用了 ExecutorService 来管理线程池,避免创建过多的线程。

异常处理

在使用 CompletableFuture 时,异常处理至关重要。我们可以使用 exceptionally 方法来处理异步任务中的异常。

例如,如果 getUserInfo 或者 getUserProfile 抛出异常,我们可以使用 exceptionally 来提供一个默认值:

CompletableFuture<UserProfile> userProfileFuture = getUserInfo(userId)
        .thenCompose(userInfo -> getUserProfile(userInfo))
        .exceptionally(ex -> {
            System.err.println("发生异常: " + ex.getMessage());
            return new UserProfile(userId, "默认画像数据");
        });

或者,我们可以使用 handle 方法来处理异常并返回一个结果:

CompletableFuture<UserProfile> userProfileFuture = getUserInfo(userId)
        .thenCompose(userInfo -> getUserProfile(userInfo))
        .handle((userProfile, ex) -> {
            if (ex != null) {
                System.err.println("发生异常: " + ex.getMessage());
                return new UserProfile(userId, "默认画像数据");
            } else {
                return userProfile;
            }
        });

exceptionally 接收一个 Function<Throwable, ? extends T>,用于处理异常并返回一个默认值。handle 接收一个 BiFunction<? super T, Throwable, ? extends U>,用于处理正常结果和异常,并返回一个结果。

进一步的优化:使用不同的 Executor

默认情况下,CompletableFuture 使用 ForkJoinPool.commonPool() 作为默认的执行器。在某些情况下,我们可能需要使用自定义的 Executor 来执行任务,以提高程序的性能和可控性。

我们可以在 supplyAsync 方法中指定 Executor

public static CompletableFuture<UserInfo> getUserInfo(String userId) {
    return CompletableFuture.supplyAsync(() -> {
        // 模拟从数据库获取用户信息
        try {
            TimeUnit.MILLISECONDS.sleep(200); // 模拟数据库查询延迟
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("获取用户信息, thread: " + Thread.currentThread().getName());
        return new UserInfo(userId, "用户" + userId);
    }, executor); // 使用自定义的 Executor
}

通过使用不同的 Executor,我们可以将不同的任务分配到不同的线程池中,从而避免线程之间的竞争,提高程序的并发性能。

实际案例:订单处理流程

现在,让我们考虑一个更复杂的实际案例:订单处理流程。假设订单处理流程包含以下步骤:

  1. 库存检查: 检查商品库存是否足够。
  2. 创建订单: 如果库存足够,则创建订单。
  3. 支付: 尝试进行支付。
  4. 发送通知: 订单创建成功后,发送通知给用户。

我们可以使用 CompletableFuturethenCompose 来实现这个流程:

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class Order {
    private String orderId;
    private String userId;
    private String productId;
    private int quantity;
    private boolean paid;

    public Order(String orderId, String userId, String productId, int quantity) {
        this.orderId = orderId;
        this.userId = userId;
        this.productId = productId;
        this.quantity = quantity;
        this.paid = false;
    }

    public String getOrderId() {
        return orderId;
    }

    public String getUserId() {
        return userId;
    }

    public String getProductId() {
        return productId;
    }

    public int getQuantity() {
        return quantity;
    }

    public boolean isPaid() {
        return paid;
    }

    public void setPaid(boolean paid) {
        this.paid = paid;
    }

    @Override
    public String toString() {
        return "Order{" +
                "orderId='" + orderId + ''' +
                ", userId='" + userId + ''' +
                ", productId='" + productId + ''' +
                ", quantity=" + quantity +
                ", paid=" + paid +
                '}';
    }
}

public class OrderProcessingExample {

    private static final ExecutorService executor = Executors.newFixedThreadPool(10);
    private static final Random random = new Random();

    public static CompletableFuture<Boolean> checkInventory(String productId, int quantity) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟库存检查
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            boolean hasEnoughInventory = random.nextBoolean(); // 随机模拟库存是否足够
            System.out.println("检查库存, productId: " + productId + ", quantity: " + quantity + ", hasEnoughInventory: " + hasEnoughInventory + ", thread: " + Thread.currentThread().getName());
            return hasEnoughInventory;
        }, executor);
    }

    public static CompletableFuture<Order> createOrder(String userId, String productId, int quantity) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟创建订单
            try {
                TimeUnit.MILLISECONDS.sleep(150);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            String orderId = "ORDER-" + System.currentTimeMillis();
            Order order = new Order(orderId, userId, productId, quantity);
            System.out.println("创建订单, orderId: " + orderId + ", thread: " + Thread.currentThread().getName());
            return order;
        }, executor);
    }

    public static CompletableFuture<Order> processPayment(Order order) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟支付
            try {
                TimeUnit.MILLISECONDS.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            boolean paymentSuccessful = random.nextBoolean(); // 随机模拟支付是否成功
            if (paymentSuccessful) {
                order.setPaid(true);
                System.out.println("支付成功, orderId: " + order.getOrderId() + ", thread: " + Thread.currentThread().getName());
            } else {
                System.out.println("支付失败, orderId: " + order.getOrderId() + ", thread: " + Thread.currentThread().getName());
                throw new RuntimeException("支付失败");
            }
            return order;
        }, executor);
    }

    public static CompletableFuture<Void> sendNotification(Order order) {
        return CompletableFuture.runAsync(() -> {
            // 模拟发送通知
            try {
                TimeUnit.MILLISECONDS.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("发送通知, orderId: " + order.getOrderId() + ", thread: " + Thread.currentThread().getName());
        }, executor);
    }

    public static void main(String[] args) throws Exception {
        String userId = "USER-123";
        String productId = "PRODUCT-456";
        int quantity = 2;

        CompletableFuture<Order> orderFuture = checkInventory(productId, quantity)
                .thenCompose(hasEnoughInventory -> {
                    if (hasEnoughInventory) {
                        return createOrder(userId, productId, quantity);
                    } else {
                        return CompletableFuture.failedFuture(new RuntimeException("库存不足"));
                    }
                })
                .thenCompose(order -> processPayment(order))
                .thenCompose(order -> sendNotification(order).thenApply(v -> order)) // sendNotification 返回 Void, 这里需要转换成 Order
                .exceptionally(ex -> {
                    System.err.println("订单处理失败: " + ex.getMessage());
                    return null; // 返回 null 表示订单处理失败
                });

        Order finalOrder = orderFuture.get();

        if (finalOrder != null) {
            System.out.println("订单处理完成: " + finalOrder);
        } else {
            System.out.println("订单处理失败");
        }

        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
    }
}

代码分析:

  1. checkInventory 检查库存,返回 CompletableFuture<Boolean>
  2. 如果库存足够,createOrder 创建订单,返回 CompletableFuture<Order>。 如果库存不足,返回一个失败的 CompletableFuture
  3. processPayment 处理支付,返回 CompletableFuture<Order>
  4. sendNotification 发送通知,返回 CompletableFuture<Void>。 由于 sendNotification 返回 Void,我们需要使用 thenApply(v -> order) 将其转换为 CompletableFuture<Order>,以便后续的链式调用。
  5. exceptionally 处理异常。

这个例子展示了如何使用 CompletableFuturethenCompose 来构建一个复杂的异步流程,并处理各种异常情况。

避免死锁

在使用 CompletableFuture 时,需要特别注意避免死锁。死锁通常发生在以下情况:

  • 互相依赖: 两个或多个 CompletableFuture 互相等待对方完成。
  • 阻塞等待:CompletableFuture 的回调函数中使用 get() 方法阻塞等待结果。

为了避免死锁,应该尽量避免阻塞等待,并使用 CompletableFuture 提供的异步方法来处理结果。

总结:使用 thenCompose 优雅地处理任务依赖

今天我们深入探讨了 Java 多线程编程中任务依赖的概念,以及如何使用 CompletableFuture.thenCompose 进行链式处理。thenCompose 允许我们优雅地将多个异步任务串联起来,处理任务之间的依赖关系,并进行有效的异常处理。 通过实际案例,我们展示了 thenCompose 在用户数据处理和订单处理流程中的应用。希望这些内容能帮助大家更好地理解和使用 CompletableFuture,写出更高效、更健壮的并发程序。

关键要点回顾

  • thenCompose 专门用于处理异步任务间的依赖关系。
  • thenCompose 接收一个返回 CompletableFuture 的函数作为参数。
  • 要避免在使用 CompletableFuture 时出现死锁。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注