Java `CompletableFuture` `Composition` (`thenCompose`, `thenCombine`, `allOf`, `anyOf`) 异步编排

各位观众老爷,晚上好!今天咱们来聊聊Java CompletableFuture 里那些让人眼花缭乱的“组合技”——thenCompose, thenCombine, allOf, anyOf。 保证让你们听完,感觉自己也能玩转异步编排,成为异步世界里的Tony Stark!

开场白:异步世界的呼唤

话说,在单核CPU的年代,代码是线性的,你等着我,我等着他,大家排队执行,其乐融融。 但自从多核CPU横空出世,大家突然发现,排队效率太低了! 于是,异步编程应运而生,让大家可以并行执行,充分利用CPU资源。 然而,异步编程也不是那么容易驾驭的,尤其是在需要多个异步任务之间相互依赖的时候,代码很容易变得像一团乱麻。

CompletableFuture 就是Java为了解决这个问题而推出的利器,它提供了一系列强大的API,让我们能够以更优雅的方式进行异步编排。

第一部分:thenCompose – 异步任务的“链式反应”

thenCompose 就像异步任务的“多米诺骨牌”,一个任务完成后,它的结果会作为下一个任务的输入,形成一条链式反应。

原理讲解:

thenCompose 方法接收一个 Function 作为参数,这个 Function 的输入是上一个 CompletableFuture 的结果,输出是一个新的 CompletableFuture。 关键在于,thenCompose 会把两个 CompletableFuture “连接” 起来,只有当第一个 CompletableFuture 完成后,才会执行 Function 并创建第二个 CompletableFuture,并且最终返回的是第二个 CompletableFuture 的结果。

代码示例:

假设我们有两个异步任务:

  • getUserName(userId):根据用户ID获取用户名
  • getPhoneNumber(userName):根据用户名获取电话号码

如果我们要先获取用户名,然后再根据用户名获取电话号码,就可以使用 thenCompose

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ThenComposeExample {

    public static CompletableFuture<String> getUserName(String userId) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "User_" + userId;
        });
    }

    public static CompletableFuture<String> getPhoneNumber(String userName) {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "1380000" + userName.hashCode();
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> phoneNumberFuture = getUserName("123")
                .thenCompose(userName -> getPhoneNumber(userName));

        System.out.println("PhoneNumber: " + phoneNumberFuture.get()); // 输出电话号码
    }
}

代码解读:

  1. getUserName("123") 返回一个 CompletableFuture<String>,表示获取用户名的异步任务。
  2. .thenCompose(userName -> getPhoneNumber(userName))getUserName 的结果作为输入,传递给 getPhoneNumber 函数,并返回一个新的 CompletableFuture<String>,表示获取电话号码的异步任务。
  3. phoneNumberFuture.get() 会阻塞当前线程,直到 phoneNumberFuture 完成,然后返回电话号码。

thenComposethenApply 的区别:

很多小伙伴容易把 thenComposethenApply 搞混,它们最大的区别在于:

  • thenApply:接收一个 Function,对上一个 CompletableFuture 的结果进行转换,并返回一个新的 CompletableFuture但是,这个新的 CompletableFuture 包装的是 Function 返回的普通值,而不是另一个 CompletableFuture
  • thenCompose:接收一个 Function,对上一个 CompletableFuture 的结果进行转换,并返回一个新的 CompletableFuture,这个新的 CompletableFuture 包装的是 Function 返回的另一个 CompletableFuture

换句话说,thenApply 只是对结果进行简单的转换,而 thenCompose 则可以将多个异步任务串联起来,形成一个更复杂的异步流程。

第二部分:thenCombine – 异步任务的“强强联合”

thenCombine 就像两个超级英雄联手,各自完成自己的任务,然后合力完成最终的目标。

原理讲解:

thenCombine 方法接收两个参数:

  • 一个 CompletableFuture:表示另一个需要并行执行的异步任务。
  • 一个 BiFunction:接收两个 CompletableFuture 的结果作为输入,并返回最终的结果。

thenCombine 会同时执行两个 CompletableFuture,当它们都完成后,会将它们的结果传递给 BiFunction,并返回一个新的 CompletableFuture,包装 BiFunction 的返回值。

代码示例:

假设我们有两个异步任务:

  • getPrice():获取商品价格
  • getDiscount():获取折扣

如果我们要计算最终的商品价格(价格 * 折扣),就可以使用 thenCombine

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ThenCombineExample {

    public static CompletableFuture<Double> getPrice() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return 100.0;
        });
    }

    public static CompletableFuture<Double> getDiscount() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return 0.8;
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Double> finalPriceFuture = getPrice()
                .thenCombine(getDiscount(), (price, discount) -> price * discount);

        System.out.println("Final Price: " + finalPriceFuture.get()); // 输出最终价格
    }
}

代码解读:

  1. getPrice()getDiscount() 分别返回一个 CompletableFuture<Double>,表示获取价格和折扣的异步任务。
  2. .thenCombine(getDiscount(), (price, discount) -> price * discount)getPrice 的结果和 getDiscount 的结果作为输入,传递给 (price, discount) -> price * discount 这个 BiFunction,计算最终价格,并返回一个新的 CompletableFuture<Double>,包装最终价格。
  3. finalPriceFuture.get() 会阻塞当前线程,直到 finalPriceFuture 完成,然后返回最终价格。

thenCombine 的变体:

thenCombine 还有一些变体,例如:

  • thenAcceptBoth:和 thenCombine 类似,但是 BiFunction 的返回值是 void,适用于不需要返回结果的情况。
  • runAfterBoth:不接收任何参数,也不返回任何结果,只是在两个 CompletableFuture 都完成后执行一个 Runnable

第三部分:allOf – 异步任务的“集结号”

allOf 就像一个集结号,等待所有异步任务完成,然后才能进行下一步操作。

原理讲解:

allOf 方法接收一个 CompletableFuture 数组作为参数,返回一个新的 CompletableFuture<Void>。 只有当数组中的所有 CompletableFuture 都完成后,新的 CompletableFuture 才会完成。 注意,allOf 返回的 CompletableFuture 本身不包含任何结果,你需要自己去获取每个 CompletableFuture 的结果。

代码示例:

假设我们有三个异步任务:

  • task1()
  • task2()
  • task3()

我们要等待所有任务都完成后,才能打印 "All tasks completed"。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class AllOfExample {

    public static CompletableFuture<String> task1() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Task 1 completed";
        });
    }

    public static CompletableFuture<String> task2() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(150);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Task 2 completed";
        });
    }

    public static CompletableFuture<String> task3() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Task 3 completed";
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1(), task2(), task3());

        allTasks.get(); // 阻塞当前线程,直到所有任务完成

        System.out.println("All tasks completed");

        // 获取每个任务的结果
        System.out.println(task1().get());
        System.out.println(task2().get());
        System.out.println(task3().get());
    }
}

代码解读:

  1. CompletableFuture.allOf(task1(), task2(), task3()) 返回一个 CompletableFuture<Void>,表示等待所有任务完成。
  2. allTasks.get() 会阻塞当前线程,直到 allTasks 完成,也就是所有任务都完成。
  3. System.out.println("All tasks completed") 会在所有任务都完成后执行。
  4. task1().get(), task2().get(), task3().get() 分别获取每个任务的结果。

allOf 的应用场景:

allOf 适用于需要等待多个独立异步任务都完成后才能进行下一步操作的场景,例如:

  • 批量处理数据
  • 并行下载多个文件
  • 同时调用多个微服务

第四部分:anyOf – 异步任务的“赛跑”

anyOf 就像一场赛跑,只要有一个异步任务完成,就可以进行下一步操作。

原理讲解:

anyOf 方法接收一个 CompletableFuture 数组作为参数,返回一个新的 CompletableFuture<Object>。 只要数组中的任何一个 CompletableFuture 完成,新的 CompletableFuture 就会完成,并且返回第一个完成的 CompletableFuture 的结果。

代码示例:

假设我们有三个异步任务:

  • task1()
  • task2()
  • task3()

我们要只要有一个任务完成,就打印 "A task completed"。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class AnyOfExample {

    public static CompletableFuture<String> task1() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Task 1 completed";
        });
    }

    public static CompletableFuture<String> task2() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Task 2 completed";
        });
    }

    public static CompletableFuture<String> task3() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(150);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Task 3 completed";
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Object> anyTask = CompletableFuture.anyOf(task1(), task2(), task3());

        System.out.println("Result: " + anyTask.get()); // 输出第一个完成的任务的结果

        System.out.println("A task completed");
    }
}

代码解读:

  1. CompletableFuture.anyOf(task1(), task2(), task3()) 返回一个 CompletableFuture<Object>,表示等待任何一个任务完成。
  2. anyTask.get() 会阻塞当前线程,直到 anyTask 完成,也就是任何一个任务都完成。
  3. System.out.println("Result: " + anyTask.get()) 会输出第一个完成的任务的结果。
  4. System.out.println("A task completed") 会在任何一个任务都完成后执行。

anyOf 的应用场景:

anyOf 适用于只需要等待多个异步任务中的任何一个完成就可以进行下一步操作的场景,例如:

  • 从多个缓存中读取数据,只要从任何一个缓存中读取到数据就可以返回。
  • 向多个服务器发送请求,只要收到任何一个服务器的响应就可以返回。

第五部分:总结与表格对比

我们用一个表格来总结一下今天讲的这几个“组合技”:

方法 作用 输入参数 返回值类型 应用场景
thenCompose 将两个 CompletableFuture 连接起来,形成链式反应,前一个 CompletableFuture 的结果作为后一个 CompletableFuture 的输入。 Function<T, CompletableFuture<U>>:接收上一个 CompletableFuture 的结果作为输入,返回一个新的 CompletableFuture CompletableFuture<U> 异步任务之间存在依赖关系,需要按顺序执行。
thenCombine 并行执行两个 CompletableFuture,当它们都完成后,将它们的结果合并成一个结果。 CompletableFuture<U>, BiFunction<T, U, V>:另一个 CompletableFuture 和一个 BiFunction,接收两个 CompletableFuture 的结果作为输入,返回最终的结果。 CompletableFuture<V> 异步任务之间没有依赖关系,可以并行执行,最终需要将它们的结果合并。
allOf 等待所有 CompletableFuture 完成。 CompletableFuture<?>...:一个 CompletableFuture 数组。 CompletableFuture<Void> 需要等待多个独立的异步任务都完成后才能进行下一步操作。
anyOf 只要任何一个 CompletableFuture 完成即可。 CompletableFuture<?>...:一个 CompletableFuture 数组。 CompletableFuture<Object> 只需要等待多个异步任务中的任何一个完成就可以进行下一步操作。

结束语:异步编排的艺术

CompletableFuture 的这些“组合技” 就像乐高积木,可以灵活地组合成各种各样的异步流程,让我们的代码更加简洁、易读、易维护。 掌握它们,你就能在异步的世界里自由驰骋,编写出高效、强大的并发程序。

希望今天的讲解对大家有所帮助! 感谢各位的观看,下课!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注