Java CompletableFuture进阶:异步流处理、异常处理与定制化线程池

Java CompletableFuture进阶:异步流处理、异常处理与定制化线程池

大家好,今天我们来深入探讨Java CompletableFuture,重点关注异步流处理、异常处理以及定制化线程池的使用。CompletableFuture是Java 8引入的一个强大的异步编程工具,它极大地简化了异步编程的复杂性,并提供了丰富的功能来处理并发任务。

一、CompletableFuture基础回顾

在深入高级用法之前,我们先简单回顾CompletableFuture的基础知识。CompletableFuture代表一个异步计算的结果,这个结果可能已经完成,也可能尚未完成。它提供了一系列方法来创建、组合、转换和处理异步计算的结果。

创建CompletableFuture:

  • CompletableFuture.supplyAsync(Supplier<U> supplier): 使用提供的Supplier异步执行计算,并返回一个CompletableFuture。
  • CompletableFuture.runAsync(Runnable runnable): 使用提供的Runnable异步执行任务,不返回结果。
  • CompletableFuture.completedFuture(T value): 创建一个已经完成的CompletableFuture,其结果为给定的值。

组合CompletableFuture:

  • thenApply(Function<T, U> fn): 在CompletableFuture完成后,对结果应用一个函数。
  • thenAccept(Consumer<T> consumer): 在CompletableFuture完成后,对结果应用一个Consumer。
  • thenRun(Runnable runnable): 在CompletableFuture完成后,执行一个Runnable。
  • thenCompose(Function<T, CompletableFuture<U>> fn): 在CompletableFuture完成后,应用一个函数,该函数返回另一个CompletableFuture。
  • thenCombine(CompletionStage<U> other, BiFunction<T, U, V> fn): 将当前CompletableFuture与另一个CompletionStage组合,当两者都完成后,应用一个BiFunction。
  • thenAcceptBoth(CompletionStage<U> other, BiConsumer<T, U> consumer): 将当前CompletableFuture与另一个CompletionStage组合,当两者都完成后,应用一个BiConsumer。
  • runAfterBoth(CompletionStage<?> other, Runnable runnable): 将当前CompletableFuture与另一个CompletionStage组合,当两者都完成后,执行一个Runnable。
  • applyToEither(CompletionStage<T> other, Function<T, U> fn): 当当前CompletableFuture或另一个CompletionStage中的任何一个完成时,应用一个函数。
  • acceptEither(CompletionStage<T> other, Consumer<T> consumer): 当当前CompletableFuture或另一个CompletionStage中的任何一个完成时,应用一个Consumer。
  • runAfterEither(CompletionStage<?> other, Runnable runnable): 当当前CompletableFuture或另一个CompletionStage中的任何一个完成时,执行一个Runnable。

二、异步流处理

CompletableFuture可以与Java 8的Stream API结合使用,实现高效的异步流处理。这种方式特别适合处理需要并行执行的任务,例如从多个数据源获取数据,或者对大量数据进行并行处理。

示例:并行下载多个URL的内容

假设我们需要从多个URL下载内容,并对下载的内容进行处理。使用CompletableFuture和Stream API,我们可以并行地下载这些内容,并异步地处理结果。

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class AsyncStreamProcessing {

    public static void main(String[] args) {
        List<String> urls = Arrays.asList(
                "https://www.example.com",
                "https://www.google.com",
                "https://www.baidu.com"
        );

        List<CompletableFuture<String>> futures = urls.stream()
                .map(url -> CompletableFuture.supplyAsync(() -> downloadContent(url)))
                .collect(Collectors.toList());

        CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

        CompletableFuture<List<String>> contentsFuture = allFutures.thenApply(v -> {
            return futures.stream()
                    .map(CompletableFuture::join) //join()等待并获取结果,会阻塞线程
                    .collect(Collectors.toList());
        });

        contentsFuture.thenAccept(contents -> {
            contents.forEach(content -> System.out.println("Content Length: " + content.length()));
        }).join();
    }

    private static String downloadContent(String url) {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder().uri(URI.create(url)).build();
        try {
            HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
            return response.body();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

代码解释:

  1. 创建CompletableFuture列表: urls.stream().map(url -> CompletableFuture.supplyAsync(() -> downloadContent(url))).collect(Collectors.toList()); 这部分代码将URL列表转换为CompletableFuture列表。 CompletableFuture.supplyAsync() 用于异步下载每个URL的内容。
  2. 等待所有CompletableFuture完成: CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); CompletableFuture.allOf() 返回一个新的CompletableFuture,它在所有输入的CompletableFuture都完成后完成。
  3. 获取所有内容: futures.stream().map(CompletableFuture::join).collect(Collectors.toList()); 这部分代码获取所有已完成的CompletableFuture的结果。 join() 方法会阻塞当前线程,直到CompletableFuture完成。
  4. 处理内容: contentsFuture.thenAccept(contents -> { contents.forEach(content -> System.out.println("Content Length: " + content.length())); }).join(); 这部分代码在所有内容下载完成后,打印每个内容的长度。

关键点:

  • 使用CompletableFuture.supplyAsync()可以异步地执行下载任务。
  • CompletableFuture.allOf()用于等待所有异步任务完成。
  • CompletableFuture::join用于获取异步任务的结果,注意join()会阻塞线程,在异步流处理中一般用在最后的结果获取处。

三、异常处理

CompletableFuture提供了强大的异常处理机制,可以优雅地处理异步计算中可能出现的异常。

异常处理方法:

  • exceptionally(Function<Throwable, T> fn): 如果CompletableFuture抛出异常,则应用提供的函数来返回一个备用值。
  • handle(BiFunction<T, Throwable, U> fn): 无论CompletableFuture是正常完成还是抛出异常,都应用提供的函数。
  • whenComplete(BiConsumer<T, Throwable> action): 无论CompletableFuture是正常完成还是抛出异常,都执行提供的操作。

示例:处理下载内容时可能发生的异常

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class AsyncExceptionHandling {

    public static void main(String[] args) {
        List<String> urls = Arrays.asList(
                "https://www.example.com",
                "https://www.google.com",
                "https://www.baidu.com",
                "https://invalid-url" // 模拟一个无效的URL
        );

        List<CompletableFuture<String>> futures = urls.stream()
                .map(url -> CompletableFuture.supplyAsync(() -> downloadContent(url))
                        .exceptionally(ex -> {
                            System.err.println("Error downloading " + url + ": " + ex.getMessage());
                            return "Error: " + ex.getMessage(); // 返回一个错误消息作为备用值
                        }))
                .collect(Collectors.toList());

        CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

        CompletableFuture<List<String>> contentsFuture = allFutures.thenApply(v -> {
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        });

        contentsFuture.thenAccept(contents -> {
            contents.forEach(content -> System.out.println("Content: " + content));
        }).join();
    }

    private static String downloadContent(String url) {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder().uri(URI.create(url)).build();
        try {
            HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
            return response.body();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

代码解释:

  1. 使用exceptionally()处理异常: .exceptionally(ex -> { ... }) 这部分代码使用exceptionally()方法来处理downloadContent()方法中可能抛出的异常。如果下载过程中发生异常,exceptionally()方法会捕获该异常,并返回一个错误消息作为备用值。
  2. 打印错误信息:exceptionally()方法中,我们打印了错误信息,以便于调试。
  3. 返回备用值: return "Error: " + ex.getMessage(); 如果发生异常,我们返回一个错误消息作为备用值,这样可以保证CompletableFuture始终返回一个结果,即使该结果是一个错误消息。

其他异常处理方式:

  • 使用handle() handle()方法可以同时处理正常结果和异常。
CompletableFuture.supplyAsync(() -> {
    if (Math.random() > 0.5) {
        throw new RuntimeException("Simulated error");
    }
    return "Success";
}).handle((result, ex) -> {
    if (ex != null) {
        System.err.println("Error: " + ex.getMessage());
        return "Fallback Value";
    } else {
        return result;
    }
}).thenAccept(System.out::println).join();
  • 使用whenComplete() whenComplete()方法可以执行一个操作,无论CompletableFuture是正常完成还是抛出异常。 它主要用于记录日志或执行清理操作。
CompletableFuture.supplyAsync(() -> {
    if (Math.random() > 0.5) {
        throw new RuntimeException("Simulated error");
    }
    return "Success";
}).whenComplete((result, ex) -> {
    if (ex != null) {
        System.err.println("Completed with error: " + ex.getMessage());
    } else {
        System.out.println("Completed with result: " + result);
    }
}).join();
方法 描述 用途
exceptionally 如果CompletableFuture抛出异常,则应用提供的函数来返回一个备用值。 提供备用值,防止程序崩溃。
handle 无论CompletableFuture是正常完成还是抛出异常,都应用提供的函数。 同时处理正常结果和异常。
whenComplete 无论CompletableFuture是正常完成还是抛出异常,都执行提供的操作。 记录日志、执行清理操作。

四、定制化线程池

CompletableFuture默认使用ForkJoinPool.commonPool()作为其执行线程池。虽然默认线程池在很多情况下都足够使用,但在某些情况下,我们可能需要使用定制化的线程池,以便更好地控制并发度、线程优先级等。

创建定制化线程池:

我们可以使用ExecutorService接口来创建定制化的线程池。例如,我们可以使用ThreadPoolExecutor来创建一个固定大小的线程池。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CustomThreadPool {

    public static void main(String[] args) throws InterruptedException {
        // 创建一个固定大小的线程池,包含5个线程
        ExecutorService executor = Executors.newFixedThreadPool(5);

        // 提交10个任务到线程池
        for (int i = 0; i < 10; i++) {
            int taskNumber = i;
            executor.submit(() -> {
                System.out.println("Task " + taskNumber + " is running on thread: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000); // 模拟耗时操作
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task " + taskNumber + " is completed.");
            });
        }

        // 关闭线程池
        executor.shutdown();

        // 等待所有任务完成
        executor.awaitTermination(10, TimeUnit.SECONDS);

        System.out.println("All tasks are completed.");
    }
}

在CompletableFuture中使用定制化线程池:

在创建CompletableFuture时,我们可以将定制化的线程池作为参数传递给supplyAsync()runAsync()等方法。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureWithCustomThreadPool {

    public static void main(String[] args) {
        // 创建一个包含2个线程的线程池
        ExecutorService executor = Executors.newFixedThreadPool(2);

        // 使用定制化的线程池异步执行任务
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Task is running on thread: " + Thread.currentThread().getName());
            try {
                Thread.sleep(2000); // 模拟耗时操作
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Task completed";
        }, executor);

        // 处理结果
        future.thenAccept(result -> System.out.println("Result: " + result + " processed on thread: " + Thread.currentThread().getName()));

        // 关闭线程池
        executor.shutdown();
    }
}

选择合适的线程池类型:

选择合适的线程池类型取决于具体的应用场景。

  • newFixedThreadPool(int nThreads) 创建一个固定大小的线程池,包含指定数量的线程。 适合处理CPU密集型任务,可以有效地限制并发度。
  • newCachedThreadPool() 创建一个可缓存的线程池,线程数量可以动态增长。 适合处理IO密集型任务,可以充分利用系统资源。
  • newSingleThreadExecutor() 创建一个单线程的线程池,所有任务按顺序执行。 适合处理需要顺序执行的任务。
  • newScheduledThreadPool(int corePoolSize) 创建一个可以调度任务的线程池,可以定时或周期性地执行任务。

监控线程池状态:

可以使用ThreadPoolExecutor提供的方法来监控线程池的状态,例如:

  • getPoolSize():获取线程池的当前大小。
  • getActiveCount():获取正在执行任务的线程数量。
  • getCompletedTaskCount():获取已完成的任务数量。
  • getQueue().size():获取等待队列中的任务数量。

五、实战案例:构建一个异步的数据聚合服务

现在,我们结合前面所学的知识,来实现一个稍微复杂一点的实战案例:构建一个异步的数据聚合服务。该服务从多个数据源获取数据,并将这些数据聚合在一起,最终返回给客户端。

场景描述:

假设我们需要从三个不同的数据源获取用户信息:

  1. DatabaseService: 从数据库获取用户基本信息。
  2. RemoteApiService: 从远程API获取用户详细信息。
  3. CacheService: 从缓存获取用户偏好设置。

我们需要并行地从这三个数据源获取数据,并将这些数据聚合在一起,最终返回一个包含所有用户信息的UserProfile对象。

代码实现:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncDataAggregation {

    public static void main(String[] args) {
        // 创建定制化的线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);

        // 创建数据源服务
        DatabaseService databaseService = new DatabaseService();
        RemoteApiService remoteApiService = new RemoteApiService();
        CacheService cacheService = new CacheService();

        // 异步获取数据
        CompletableFuture<UserBasicInfo> basicInfoFuture = CompletableFuture.supplyAsync(() -> databaseService.getUserBasicInfo(123), executor);
        CompletableFuture<UserDetailInfo> detailInfoFuture = CompletableFuture.supplyAsync(() -> remoteApiService.getUserDetailInfo(123), executor);
        CompletableFuture<UserPreferences> preferencesFuture = CompletableFuture.supplyAsync(() -> cacheService.getUserPreferences(123), executor);

        // 聚合数据
        CompletableFuture<UserProfile> userProfileFuture = basicInfoFuture.thenCombine(detailInfoFuture, (basicInfo, detailInfo) -> {
            UserProfile profile = new UserProfile();
            profile.setUserId(basicInfo.getUserId());
            profile.setUserName(basicInfo.getUserName());
            profile.setAge(basicInfo.getAge());
            profile.setEmail(detailInfo.getEmail());
            profile.setPhoneNumber(detailInfo.getPhoneNumber());
            return profile;
        }).thenCombine(preferencesFuture, (profile, preferences) -> {
            profile.setPreferences(preferences);
            return profile;
        });

        // 处理结果
        userProfileFuture.thenAccept(profile -> {
            System.out.println("User Profile: " + profile);
        }).exceptionally(ex -> {
            System.err.println("Error aggregating data: " + ex.getMessage());
            return null;
        }).join();

        // 关闭线程池
        executor.shutdown();
    }
}

// 模拟数据源服务
class DatabaseService {
    public UserBasicInfo getUserBasicInfo(int userId) {
        System.out.println("Fetching basic info from database for user: " + userId + " on thread: " + Thread.currentThread().getName());
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        UserBasicInfo info = new UserBasicInfo();
        info.setUserId(userId);
        info.setUserName("John Doe");
        info.setAge(30);
        return info;
    }
}

class RemoteApiService {
    public UserDetailInfo getUserDetailInfo(int userId) {
        System.out.println("Fetching detail info from remote API for user: " + userId + " on thread: " + Thread.currentThread().getName());
        try {
            Thread.sleep(800);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        UserDetailInfo info = new UserDetailInfo();
        info.setEmail("[email protected]");
        info.setPhoneNumber("123-456-7890");
        return info;
    }
}

class CacheService {
    public UserPreferences getUserPreferences(int userId) {
        System.out.println("Fetching preferences from cache for user: " + userId + " on thread: " + Thread.currentThread().getName());
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        UserPreferences preferences = new UserPreferences();
        preferences.setTheme("dark");
        preferences.setLanguage("en");
        return preferences;
    }
}

// 定义数据模型
class UserBasicInfo {
    private int userId;
    private String userName;
    private int age;

    // Getters and setters
    public int getUserId() { return userId; }
    public void setUserId(int userId) { this.userId = userId; }
    public String getUserName() { return userName; }
    public void setUserName(String userName) { this.userName = userName; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }

    @Override
    public String toString() {
        return "UserBasicInfo{" +
                "userId=" + userId +
                ", userName='" + userName + ''' +
                ", age=" + age +
                '}';
    }
}

class UserDetailInfo {
    private String email;
    private String phoneNumber;

    // Getters and setters
    public String getEmail() { return email; }
    public void setEmail(String email) { this.email = email; }
    public String getPhoneNumber() { return phoneNumber; }
    public void setPhoneNumber(String phoneNumber) { this.phoneNumber = phoneNumber; }

    @Override
    public String toString() {
        return "UserDetailInfo{" +
                "email='" + email + ''' +
                ", phoneNumber='" + phoneNumber + ''' +
                '}';
    }
}

class UserPreferences {
    private String theme;
    private String language;

    // Getters and setters
    public String getTheme() { return theme; }
    public void setTheme(String theme) { this.theme = theme; }
    public String getLanguage() { return language; }
    public void setLanguage(String language) { this.language = language; }

    @Override
    public String toString() {
        return "UserPreferences{" +
                "theme='" + theme + ''' +
                ", language='" + language + ''' +
                '}';
    }
}

class UserProfile {
    private int userId;
    private String userName;
    private int age;
    private String email;
    private String phoneNumber;
    private UserPreferences preferences;

    // Getters and setters
    public int getUserId() { return userId; }
    public void setUserId(int userId) { this.userId = userId; }
    public String getUserName() { return userName; }
    public void setUserName(String userName) { this.userName = userName; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
    public String getEmail() { return email; }
    public void setEmail(String email) { this.email = email; }
    public String getPhoneNumber() { return phoneNumber; }
    public void setPhoneNumber(String phoneNumber) { this.phoneNumber = phoneNumber; }
    public UserPreferences getPreferences() { return preferences; }
    public void setPreferences(UserPreferences preferences) { this.preferences = preferences; }

    @Override
    public String toString() {
        return "UserProfile{" +
                "userId=" + userId +
                ", userName='" + userName + ''' +
                ", age=" + age +
                ", email='" + email + ''' +
                ", phoneNumber='" + phoneNumber + ''' +
                ", preferences=" + preferences +
                '}';
    }
}

代码解释:

  1. 创建定制化的线程池: ExecutorService executor = Executors.newFixedThreadPool(3); 我们创建了一个包含3个线程的线程池,用于并行地执行数据获取任务。
  2. 异步获取数据: 使用CompletableFuture.supplyAsync()从三个数据源异步获取数据。
  3. 聚合数据: 使用thenCombine()方法将三个CompletableFuture的结果聚合在一起。 thenCombine()方法会在两个CompletableFuture都完成后执行,并将两个结果作为参数传递给BiFunction。
  4. 处理结果: 使用thenAccept()方法处理最终的UserProfile对象。 使用exceptionally()方法处理可能发生的异常。
  5. 关闭线程池: executor.shutdown(); 在所有任务完成后,关闭线程池。

总结

CompletableFuture为Java异步编程提供了强大的工具,能够简化异步流处理,优雅地处理异常,并支持定制化的线程池配置。掌握这些高级特性,可以编写出更高效、更健壮的并发程序,提高系统的性能和响应能力。

核心要点回顾

  • CompletableFuture结合Stream API 可以高效进行异步流处理。
  • 使用exceptionally、handle、whenComplete等方法可以优雅地处理异步任务中的异常。
  • 通过定制化线程池,可以更好地控制并发度,优化资源利用,提升系统性能。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注