Java CompletableFuture进阶:异步流处理、异常处理与定制化线程池
大家好,今天我们来深入探讨Java CompletableFuture,重点关注异步流处理、异常处理以及定制化线程池的使用。CompletableFuture是Java 8引入的一个强大的异步编程工具,它极大地简化了异步编程的复杂性,并提供了丰富的功能来处理并发任务。
一、CompletableFuture基础回顾
在深入高级用法之前,我们先简单回顾CompletableFuture的基础知识。CompletableFuture代表一个异步计算的结果,这个结果可能已经完成,也可能尚未完成。它提供了一系列方法来创建、组合、转换和处理异步计算的结果。
创建CompletableFuture:
CompletableFuture.supplyAsync(Supplier<U> supplier): 使用提供的Supplier异步执行计算,并返回一个CompletableFuture。CompletableFuture.runAsync(Runnable runnable): 使用提供的Runnable异步执行任务,不返回结果。CompletableFuture.completedFuture(T value): 创建一个已经完成的CompletableFuture,其结果为给定的值。
组合CompletableFuture:
thenApply(Function<T, U> fn): 在CompletableFuture完成后,对结果应用一个函数。thenAccept(Consumer<T> consumer): 在CompletableFuture完成后,对结果应用一个Consumer。thenRun(Runnable runnable): 在CompletableFuture完成后,执行一个Runnable。thenCompose(Function<T, CompletableFuture<U>> fn): 在CompletableFuture完成后,应用一个函数,该函数返回另一个CompletableFuture。thenCombine(CompletionStage<U> other, BiFunction<T, U, V> fn): 将当前CompletableFuture与另一个CompletionStage组合,当两者都完成后,应用一个BiFunction。thenAcceptBoth(CompletionStage<U> other, BiConsumer<T, U> consumer): 将当前CompletableFuture与另一个CompletionStage组合,当两者都完成后,应用一个BiConsumer。runAfterBoth(CompletionStage<?> other, Runnable runnable): 将当前CompletableFuture与另一个CompletionStage组合,当两者都完成后,执行一个Runnable。applyToEither(CompletionStage<T> other, Function<T, U> fn): 当当前CompletableFuture或另一个CompletionStage中的任何一个完成时,应用一个函数。acceptEither(CompletionStage<T> other, Consumer<T> consumer): 当当前CompletableFuture或另一个CompletionStage中的任何一个完成时,应用一个Consumer。runAfterEither(CompletionStage<?> other, Runnable runnable): 当当前CompletableFuture或另一个CompletionStage中的任何一个完成时,执行一个Runnable。
二、异步流处理
CompletableFuture可以与Java 8的Stream API结合使用,实现高效的异步流处理。这种方式特别适合处理需要并行执行的任务,例如从多个数据源获取数据,或者对大量数据进行并行处理。
示例:并行下载多个URL的内容
假设我们需要从多个URL下载内容,并对下载的内容进行处理。使用CompletableFuture和Stream API,我们可以并行地下载这些内容,并异步地处理结果。
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
public class AsyncStreamProcessing {
public static void main(String[] args) {
List<String> urls = Arrays.asList(
"https://www.example.com",
"https://www.google.com",
"https://www.baidu.com"
);
List<CompletableFuture<String>> futures = urls.stream()
.map(url -> CompletableFuture.supplyAsync(() -> downloadContent(url)))
.collect(Collectors.toList());
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
CompletableFuture<List<String>> contentsFuture = allFutures.thenApply(v -> {
return futures.stream()
.map(CompletableFuture::join) //join()等待并获取结果,会阻塞线程
.collect(Collectors.toList());
});
contentsFuture.thenAccept(contents -> {
contents.forEach(content -> System.out.println("Content Length: " + content.length()));
}).join();
}
private static String downloadContent(String url) {
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder().uri(URI.create(url)).build();
try {
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
return response.body();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
代码解释:
- 创建CompletableFuture列表:
urls.stream().map(url -> CompletableFuture.supplyAsync(() -> downloadContent(url))).collect(Collectors.toList());这部分代码将URL列表转换为CompletableFuture列表。CompletableFuture.supplyAsync()用于异步下载每个URL的内容。 - 等待所有CompletableFuture完成:
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));CompletableFuture.allOf()返回一个新的CompletableFuture,它在所有输入的CompletableFuture都完成后完成。 - 获取所有内容:
futures.stream().map(CompletableFuture::join).collect(Collectors.toList());这部分代码获取所有已完成的CompletableFuture的结果。join()方法会阻塞当前线程,直到CompletableFuture完成。 - 处理内容:
contentsFuture.thenAccept(contents -> { contents.forEach(content -> System.out.println("Content Length: " + content.length())); }).join();这部分代码在所有内容下载完成后,打印每个内容的长度。
关键点:
- 使用
CompletableFuture.supplyAsync()可以异步地执行下载任务。 CompletableFuture.allOf()用于等待所有异步任务完成。CompletableFuture::join用于获取异步任务的结果,注意join()会阻塞线程,在异步流处理中一般用在最后的结果获取处。
三、异常处理
CompletableFuture提供了强大的异常处理机制,可以优雅地处理异步计算中可能出现的异常。
异常处理方法:
exceptionally(Function<Throwable, T> fn): 如果CompletableFuture抛出异常,则应用提供的函数来返回一个备用值。handle(BiFunction<T, Throwable, U> fn): 无论CompletableFuture是正常完成还是抛出异常,都应用提供的函数。whenComplete(BiConsumer<T, Throwable> action): 无论CompletableFuture是正常完成还是抛出异常,都执行提供的操作。
示例:处理下载内容时可能发生的异常
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
public class AsyncExceptionHandling {
public static void main(String[] args) {
List<String> urls = Arrays.asList(
"https://www.example.com",
"https://www.google.com",
"https://www.baidu.com",
"https://invalid-url" // 模拟一个无效的URL
);
List<CompletableFuture<String>> futures = urls.stream()
.map(url -> CompletableFuture.supplyAsync(() -> downloadContent(url))
.exceptionally(ex -> {
System.err.println("Error downloading " + url + ": " + ex.getMessage());
return "Error: " + ex.getMessage(); // 返回一个错误消息作为备用值
}))
.collect(Collectors.toList());
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
CompletableFuture<List<String>> contentsFuture = allFutures.thenApply(v -> {
return futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
});
contentsFuture.thenAccept(contents -> {
contents.forEach(content -> System.out.println("Content: " + content));
}).join();
}
private static String downloadContent(String url) {
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder().uri(URI.create(url)).build();
try {
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
return response.body();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
代码解释:
- 使用
exceptionally()处理异常:.exceptionally(ex -> { ... })这部分代码使用exceptionally()方法来处理downloadContent()方法中可能抛出的异常。如果下载过程中发生异常,exceptionally()方法会捕获该异常,并返回一个错误消息作为备用值。 - 打印错误信息: 在
exceptionally()方法中,我们打印了错误信息,以便于调试。 - 返回备用值:
return "Error: " + ex.getMessage();如果发生异常,我们返回一个错误消息作为备用值,这样可以保证CompletableFuture始终返回一个结果,即使该结果是一个错误消息。
其他异常处理方式:
- 使用
handle():handle()方法可以同时处理正常结果和异常。
CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Simulated error");
}
return "Success";
}).handle((result, ex) -> {
if (ex != null) {
System.err.println("Error: " + ex.getMessage());
return "Fallback Value";
} else {
return result;
}
}).thenAccept(System.out::println).join();
- 使用
whenComplete():whenComplete()方法可以执行一个操作,无论CompletableFuture是正常完成还是抛出异常。 它主要用于记录日志或执行清理操作。
CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Simulated error");
}
return "Success";
}).whenComplete((result, ex) -> {
if (ex != null) {
System.err.println("Completed with error: " + ex.getMessage());
} else {
System.out.println("Completed with result: " + result);
}
}).join();
| 方法 | 描述 | 用途 |
|---|---|---|
exceptionally |
如果CompletableFuture抛出异常,则应用提供的函数来返回一个备用值。 | 提供备用值,防止程序崩溃。 |
handle |
无论CompletableFuture是正常完成还是抛出异常,都应用提供的函数。 | 同时处理正常结果和异常。 |
whenComplete |
无论CompletableFuture是正常完成还是抛出异常,都执行提供的操作。 | 记录日志、执行清理操作。 |
四、定制化线程池
CompletableFuture默认使用ForkJoinPool.commonPool()作为其执行线程池。虽然默认线程池在很多情况下都足够使用,但在某些情况下,我们可能需要使用定制化的线程池,以便更好地控制并发度、线程优先级等。
创建定制化线程池:
我们可以使用ExecutorService接口来创建定制化的线程池。例如,我们可以使用ThreadPoolExecutor来创建一个固定大小的线程池。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class CustomThreadPool {
public static void main(String[] args) throws InterruptedException {
// 创建一个固定大小的线程池,包含5个线程
ExecutorService executor = Executors.newFixedThreadPool(5);
// 提交10个任务到线程池
for (int i = 0; i < 10; i++) {
int taskNumber = i;
executor.submit(() -> {
System.out.println("Task " + taskNumber + " is running on thread: " + Thread.currentThread().getName());
try {
Thread.sleep(1000); // 模拟耗时操作
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task " + taskNumber + " is completed.");
});
}
// 关闭线程池
executor.shutdown();
// 等待所有任务完成
executor.awaitTermination(10, TimeUnit.SECONDS);
System.out.println("All tasks are completed.");
}
}
在CompletableFuture中使用定制化线程池:
在创建CompletableFuture时,我们可以将定制化的线程池作为参数传递给supplyAsync()、runAsync()等方法。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureWithCustomThreadPool {
public static void main(String[] args) {
// 创建一个包含2个线程的线程池
ExecutorService executor = Executors.newFixedThreadPool(2);
// 使用定制化的线程池异步执行任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Task is running on thread: " + Thread.currentThread().getName());
try {
Thread.sleep(2000); // 模拟耗时操作
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Task completed";
}, executor);
// 处理结果
future.thenAccept(result -> System.out.println("Result: " + result + " processed on thread: " + Thread.currentThread().getName()));
// 关闭线程池
executor.shutdown();
}
}
选择合适的线程池类型:
选择合适的线程池类型取决于具体的应用场景。
newFixedThreadPool(int nThreads): 创建一个固定大小的线程池,包含指定数量的线程。 适合处理CPU密集型任务,可以有效地限制并发度。newCachedThreadPool(): 创建一个可缓存的线程池,线程数量可以动态增长。 适合处理IO密集型任务,可以充分利用系统资源。newSingleThreadExecutor(): 创建一个单线程的线程池,所有任务按顺序执行。 适合处理需要顺序执行的任务。newScheduledThreadPool(int corePoolSize): 创建一个可以调度任务的线程池,可以定时或周期性地执行任务。
监控线程池状态:
可以使用ThreadPoolExecutor提供的方法来监控线程池的状态,例如:
getPoolSize():获取线程池的当前大小。getActiveCount():获取正在执行任务的线程数量。getCompletedTaskCount():获取已完成的任务数量。getQueue().size():获取等待队列中的任务数量。
五、实战案例:构建一个异步的数据聚合服务
现在,我们结合前面所学的知识,来实现一个稍微复杂一点的实战案例:构建一个异步的数据聚合服务。该服务从多个数据源获取数据,并将这些数据聚合在一起,最终返回给客户端。
场景描述:
假设我们需要从三个不同的数据源获取用户信息:
- DatabaseService: 从数据库获取用户基本信息。
- RemoteApiService: 从远程API获取用户详细信息。
- CacheService: 从缓存获取用户偏好设置。
我们需要并行地从这三个数据源获取数据,并将这些数据聚合在一起,最终返回一个包含所有用户信息的UserProfile对象。
代码实现:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class AsyncDataAggregation {
public static void main(String[] args) {
// 创建定制化的线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 创建数据源服务
DatabaseService databaseService = new DatabaseService();
RemoteApiService remoteApiService = new RemoteApiService();
CacheService cacheService = new CacheService();
// 异步获取数据
CompletableFuture<UserBasicInfo> basicInfoFuture = CompletableFuture.supplyAsync(() -> databaseService.getUserBasicInfo(123), executor);
CompletableFuture<UserDetailInfo> detailInfoFuture = CompletableFuture.supplyAsync(() -> remoteApiService.getUserDetailInfo(123), executor);
CompletableFuture<UserPreferences> preferencesFuture = CompletableFuture.supplyAsync(() -> cacheService.getUserPreferences(123), executor);
// 聚合数据
CompletableFuture<UserProfile> userProfileFuture = basicInfoFuture.thenCombine(detailInfoFuture, (basicInfo, detailInfo) -> {
UserProfile profile = new UserProfile();
profile.setUserId(basicInfo.getUserId());
profile.setUserName(basicInfo.getUserName());
profile.setAge(basicInfo.getAge());
profile.setEmail(detailInfo.getEmail());
profile.setPhoneNumber(detailInfo.getPhoneNumber());
return profile;
}).thenCombine(preferencesFuture, (profile, preferences) -> {
profile.setPreferences(preferences);
return profile;
});
// 处理结果
userProfileFuture.thenAccept(profile -> {
System.out.println("User Profile: " + profile);
}).exceptionally(ex -> {
System.err.println("Error aggregating data: " + ex.getMessage());
return null;
}).join();
// 关闭线程池
executor.shutdown();
}
}
// 模拟数据源服务
class DatabaseService {
public UserBasicInfo getUserBasicInfo(int userId) {
System.out.println("Fetching basic info from database for user: " + userId + " on thread: " + Thread.currentThread().getName());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
UserBasicInfo info = new UserBasicInfo();
info.setUserId(userId);
info.setUserName("John Doe");
info.setAge(30);
return info;
}
}
class RemoteApiService {
public UserDetailInfo getUserDetailInfo(int userId) {
System.out.println("Fetching detail info from remote API for user: " + userId + " on thread: " + Thread.currentThread().getName());
try {
Thread.sleep(800);
} catch (InterruptedException e) {
e.printStackTrace();
}
UserDetailInfo info = new UserDetailInfo();
info.setEmail("[email protected]");
info.setPhoneNumber("123-456-7890");
return info;
}
}
class CacheService {
public UserPreferences getUserPreferences(int userId) {
System.out.println("Fetching preferences from cache for user: " + userId + " on thread: " + Thread.currentThread().getName());
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
UserPreferences preferences = new UserPreferences();
preferences.setTheme("dark");
preferences.setLanguage("en");
return preferences;
}
}
// 定义数据模型
class UserBasicInfo {
private int userId;
private String userName;
private int age;
// Getters and setters
public int getUserId() { return userId; }
public void setUserId(int userId) { this.userId = userId; }
public String getUserName() { return userName; }
public void setUserName(String userName) { this.userName = userName; }
public int getAge() { return age; }
public void setAge(int age) { this.age = age; }
@Override
public String toString() {
return "UserBasicInfo{" +
"userId=" + userId +
", userName='" + userName + ''' +
", age=" + age +
'}';
}
}
class UserDetailInfo {
private String email;
private String phoneNumber;
// Getters and setters
public String getEmail() { return email; }
public void setEmail(String email) { this.email = email; }
public String getPhoneNumber() { return phoneNumber; }
public void setPhoneNumber(String phoneNumber) { this.phoneNumber = phoneNumber; }
@Override
public String toString() {
return "UserDetailInfo{" +
"email='" + email + ''' +
", phoneNumber='" + phoneNumber + ''' +
'}';
}
}
class UserPreferences {
private String theme;
private String language;
// Getters and setters
public String getTheme() { return theme; }
public void setTheme(String theme) { this.theme = theme; }
public String getLanguage() { return language; }
public void setLanguage(String language) { this.language = language; }
@Override
public String toString() {
return "UserPreferences{" +
"theme='" + theme + ''' +
", language='" + language + ''' +
'}';
}
}
class UserProfile {
private int userId;
private String userName;
private int age;
private String email;
private String phoneNumber;
private UserPreferences preferences;
// Getters and setters
public int getUserId() { return userId; }
public void setUserId(int userId) { this.userId = userId; }
public String getUserName() { return userName; }
public void setUserName(String userName) { this.userName = userName; }
public int getAge() { return age; }
public void setAge(int age) { this.age = age; }
public String getEmail() { return email; }
public void setEmail(String email) { this.email = email; }
public String getPhoneNumber() { return phoneNumber; }
public void setPhoneNumber(String phoneNumber) { this.phoneNumber = phoneNumber; }
public UserPreferences getPreferences() { return preferences; }
public void setPreferences(UserPreferences preferences) { this.preferences = preferences; }
@Override
public String toString() {
return "UserProfile{" +
"userId=" + userId +
", userName='" + userName + ''' +
", age=" + age +
", email='" + email + ''' +
", phoneNumber='" + phoneNumber + ''' +
", preferences=" + preferences +
'}';
}
}
代码解释:
- 创建定制化的线程池:
ExecutorService executor = Executors.newFixedThreadPool(3);我们创建了一个包含3个线程的线程池,用于并行地执行数据获取任务。 - 异步获取数据: 使用
CompletableFuture.supplyAsync()从三个数据源异步获取数据。 - 聚合数据: 使用
thenCombine()方法将三个CompletableFuture的结果聚合在一起。thenCombine()方法会在两个CompletableFuture都完成后执行,并将两个结果作为参数传递给BiFunction。 - 处理结果: 使用
thenAccept()方法处理最终的UserProfile对象。 使用exceptionally()方法处理可能发生的异常。 - 关闭线程池:
executor.shutdown();在所有任务完成后,关闭线程池。
总结
CompletableFuture为Java异步编程提供了强大的工具,能够简化异步流处理,优雅地处理异常,并支持定制化的线程池配置。掌握这些高级特性,可以编写出更高效、更健壮的并发程序,提高系统的性能和响应能力。
核心要点回顾
- CompletableFuture结合Stream API 可以高效进行异步流处理。
- 使用exceptionally、handle、whenComplete等方法可以优雅地处理异步任务中的异常。
- 通过定制化线程池,可以更好地控制并发度,优化资源利用,提升系统性能。