CompletableFuture 超时后资源释放:orTimeout 与自定义 ThreadPoolExecutor
各位同学,大家好。今天我们来深入探讨 CompletableFuture 中的超时机制,特别是 orTimeout 算子,以及它与自定义 ThreadPoolExecutor 的协同工作,以及如何确保超时后资源的正确释放。
CompletableFuture 是 Java 并发编程中一个强大的工具,它允许我们以非阻塞的方式进行异步编程。orTimeout 算子为我们提供了一种优雅的方式来处理超时场景,但如果不小心,它可能会导致资源泄露。
1. CompletableFuture 与 orTimeout 简介
CompletableFuture 代表一个异步计算的结果。它允许我们在计算完成时执行回调,组合多个异步操作,并处理异常。
orTimeout(long timeout, TimeUnit unit) 是 CompletableFuture 提供的一个方法,它返回一个新的 CompletableFuture,该 CompletableFuture 在原始 CompletableFuture 完成之前,如果超过指定的 timeout 时间,则会以 TimeoutException 异常完成。
简单示例:
import java.util.concurrent.*;
public class OrTimeoutExample {
public static void main(String[] args) throws Exception {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
// 模拟一个耗时操作
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Result";
});
CompletableFuture<String> timeoutFuture = future.orTimeout(2, TimeUnit.SECONDS);
try {
String result = timeoutFuture.get();
System.out.println("Result: " + result);
} catch (InterruptedException | ExecutionException e) {
System.err.println("Exception: " + e.getMessage());
if (e.getCause() instanceof TimeoutException) {
System.err.println("Timeout occurred!");
}
}
}
}
在这个例子中,如果 future 在 2 秒内没有完成,timeoutFuture 将抛出 TimeoutException。
2. 资源泄露的潜在问题
虽然 orTimeout 看起来很简单,但它隐藏了一个潜在的问题:原始 CompletableFuture 对应的任务可能仍然在后台运行,即使 orTimeout 已经触发了超时异常。 这会导致资源泄露,例如线程资源、数据库连接、文件句柄等。
考虑以下场景:
import java.util.concurrent.*;
public class ResourceLeakExample {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(1);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Starting long-running task...");
try {
// 模拟一个耗时操作,例如数据库查询
Thread.sleep(10000);
System.out.println("Long-running task completed."); //可能永远不会执行
} catch (InterruptedException e) {
System.out.println("Long-running task interrupted."); //可能执行
Thread.currentThread().interrupt();
}
return "Result";
}, executor);
CompletableFuture<String> timeoutFuture = future.orTimeout(2, TimeUnit.SECONDS);
try {
String result = timeoutFuture.get();
System.out.println("Result: " + result);
} catch (InterruptedException | ExecutionException e) {
System.err.println("Exception: " + e.getMessage());
if (e.getCause() instanceof TimeoutException) {
System.err.println("Timeout occurred!");
}
}
// 等待一段时间,观察后台线程是否还在运行
Thread.sleep(5000);
System.out.println("Main thread completed.");
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
}
}
在这个例子中,即使 timeoutFuture 抛出了 TimeoutException,原始的 future 对应的任务仍然在 executor 中运行。这意味着线程资源被占用,即使我们不再需要它的结果。
3. 如何解决资源泄露问题
解决资源泄露问题的关键是:在 orTimeout 触发超时后,我们需要主动取消原始的 CompletableFuture 对应的任务。
CompletableFuture 提供了 cancel(boolean mayInterruptIfRunning) 方法来取消任务。我们需要确保在超时发生时调用此方法。
以下是一些常用的解决方案:
3.1. 使用 completeOnTimeout 取代 orTimeout 并手动取消
completeOnTimeout(T value, long timeout, TimeUnit unit) 方法在超时时不会抛出异常,而是使用给定的 value 完成 CompletableFuture。 我们可以结合 completeOnTimeout 和 cancel 方法来实现超时取消。
import java.util.concurrent.*;
public class CompleteOnTimeoutExample {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(1);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Starting long-running task...");
try {
Thread.sleep(10000);
System.out.println("Long-running task completed.");
} catch (InterruptedException e) {
System.out.println("Long-running task interrupted.");
Thread.currentThread().interrupt();
}
return "Result";
}, executor);
CompletableFuture<String> timeoutFuture = future.completeOnTimeout("Timeout Value", 2, TimeUnit.SECONDS);
try {
String result = timeoutFuture.get();
System.out.println("Result: " + result);
} catch (InterruptedException | ExecutionException e) {
System.err.println("Exception: " + e.getMessage());
}
if (!future.isDone()) {
System.out.println("Cancelling the task...");
future.cancel(true); // 尝试中断任务
}
Thread.sleep(5000);
System.out.println("Main thread completed.");
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
}
}
在这个例子中,如果 future 在 2 秒内没有完成,timeoutFuture 将使用 "Timeout Value" 完成。 然后,我们检查 future 是否已完成,如果未完成,则调用 cancel(true) 尝试中断任务。
3.2. 使用 exceptionally 结合 cancel
我们可以使用 exceptionally 方法来处理 TimeoutException,并在其中调用 cancel 方法。
import java.util.concurrent.*;
public class ExceptionallyCancelExample {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(1);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Starting long-running task...");
try {
Thread.sleep(10000);
System.out.println("Long-running task completed.");
} catch (InterruptedException e) {
System.out.println("Long-running task interrupted.");
Thread.currentThread().interrupt();
}
return "Result";
}, executor);
CompletableFuture<String> timeoutFuture = future.orTimeout(2, TimeUnit.SECONDS)
.exceptionally(ex -> {
if (ex instanceof TimeoutException) {
System.out.println("Timeout occurred. Cancelling the task...");
future.cancel(true); // 尝试中断任务
return "Timeout Value"; // 提供一个默认值
} else {
// 处理其他异常
System.err.println("Other exception: " + ex.getMessage());
return null; // 或者抛出异常
}
});
try {
String result = timeoutFuture.get();
System.out.println("Result: " + result);
} catch (InterruptedException | ExecutionException e) {
System.err.println("Exception: " + e.getMessage());
}
Thread.sleep(5000);
System.out.println("Main thread completed.");
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
}
}
在这个例子中,exceptionally 方法会在 orTimeout 抛出 TimeoutException 时被调用。 我们在 exceptionally 方法中调用 cancel(true) 尝试中断任务,并返回一个默认值。
3.3. 使用自定义 ThreadPoolExecutor 进行更精细的控制
如果我们需要对线程池进行更精细的控制,例如监控线程池的状态、记录任务执行时间等,可以使用自定义 ThreadPoolExecutor。
我们可以创建一个自定义的 ThreadPoolExecutor,并在任务超时后,从线程池中移除该任务。
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class CustomThreadPoolExecutorExample {
public static void main(String[] args) throws Exception {
// 创建一个自定义的 ThreadPoolExecutor
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, // corePoolSize
1, // maximumPoolSize
60L, // keepAliveTime
TimeUnit.SECONDS, // unit
new LinkedBlockingQueue<>(), // workQueue
new ThreadFactory() {
private final AtomicInteger counter = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "CustomThread-" + counter.incrementAndGet());
}
}
) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t != null) {
System.err.println("Task threw an exception: " + t.getMessage());
} else {
// 任务完成后的处理逻辑
System.out.println("Task completed successfully.");
}
}
};
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Starting long-running task...");
try {
Thread.sleep(10000);
System.out.println("Long-running task completed.");
} catch (InterruptedException e) {
System.out.println("Long-running task interrupted.");
Thread.currentThread().interrupt();
}
return "Result";
}, executor);
CompletableFuture<String> timeoutFuture = future.orTimeout(2, TimeUnit.SECONDS)
.exceptionally(ex -> {
if (ex instanceof TimeoutException) {
System.out.println("Timeout occurred. Cancelling the task...");
future.cancel(true); // 尝试中断任务
// 从线程池中移除该任务 (不推荐,因为 future.cancel 已经做了中断操作)
// executor.remove((Runnable) future);
return "Timeout Value"; // 提供一个默认值
} else {
// 处理其他异常
System.err.println("Other exception: " + ex.getMessage());
return null; // 或者抛出异常
}
});
try {
String result = timeoutFuture.get();
System.out.println("Result: " + result);
} catch (InterruptedException | ExecutionException e) {
System.err.println("Exception: " + e.getMessage());
}
Thread.sleep(5000);
System.out.println("Main thread completed.");
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
}
}
在这个例子中,我们创建了一个自定义的 ThreadPoolExecutor,并重写了 afterExecute 方法,以便在任务执行完成后进行一些清理工作。 虽然这里尝试 executor.remove 操作,但是通常情况下,future.cancel(true) 已经尽力中断线程了, remove 并不总是有效,因为任务可能已经执行完成或正在执行中。 更好的做法是在任务内部,根据中断标志 Thread.currentThread().isInterrupted() 来提前结束任务。
4. 最佳实践
以下是一些使用 orTimeout 和 CompletableFuture 的最佳实践:
- 始终考虑超时后资源的释放问题。 确保在超时发生时,能够及时取消原始任务,释放占用的资源。
- 优先使用
completeOnTimeout结合cancel。completeOnTimeout避免了抛出异常,使得代码更加简洁。 - 使用
exceptionally处理TimeoutException。 在exceptionally中调用cancel方法,并提供一个默认值。 - 使用自定义
ThreadPoolExecutor进行更精细的控制。 如果需要对线程池进行更精细的控制,可以使用自定义ThreadPoolExecutor。 - 在长时间运行的任务中,定期检查中断标志。 即使使用了
cancel(true)尝试中断任务,也无法保证任务立即停止。 因此,在长时间运行的任务中,应该定期检查中断标志Thread.currentThread().isInterrupted(),并及时结束任务。 - 避免在
finally块中执行耗时操作。 如果需要在finally块中执行耗时操作,应该使用单独的线程来执行,以避免阻塞主线程。
5. 代码示例:结合中断标志的完整示例
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class InterruptFlagExample {
public static void main(String[] args) throws Exception {
// 创建一个自定义的 ThreadPoolExecutor
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, // corePoolSize
1, // maximumPoolSize
60L, // keepAliveTime
TimeUnit.SECONDS, // unit
new LinkedBlockingQueue<>(), // workQueue
new ThreadFactory() {
private final AtomicInteger counter = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "CustomThread-" + counter.incrementAndGet());
}
}
) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t != null) {
System.err.println("Task threw an exception: " + t.getMessage());
} else {
// 任务完成后的处理逻辑
System.out.println("Task completed successfully.");
}
}
};
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Starting long-running task...");
long startTime = System.currentTimeMillis();
try {
// 模拟一个耗时操作,例如数据库查询
while (!Thread.currentThread().isInterrupted()) {
if (System.currentTimeMillis() - startTime > 10000) {
System.out.println("Long-running task completed normally (after timeout period).");
return "Result";
}
Thread.sleep(100); // 模拟工作
}
System.out.println("Long-running task interrupted."); // 任务被中断
return "Interrupted Result";
} catch (InterruptedException e) {
System.out.println("Long-running task interrupted.");
Thread.currentThread().interrupt(); // 重新设置中断标志
return "Interrupted Result";
} finally {
System.out.println("Long-running task finally block executed.");
}
}, executor);
CompletableFuture<String> timeoutFuture = future.orTimeout(2, TimeUnit.SECONDS)
.exceptionally(ex -> {
if (ex instanceof TimeoutException) {
System.out.println("Timeout occurred. Cancelling the task...");
boolean cancelled = future.cancel(true); // 尝试中断任务
System.out.println("Cancel result: " + cancelled);
return "Timeout Value"; // 提供一个默认值
} else {
// 处理其他异常
System.err.println("Other exception: " + ex.getMessage());
return null; // 或者抛出异常
}
});
try {
String result = timeoutFuture.get();
System.out.println("Result: " + result);
} catch (InterruptedException | ExecutionException e) {
System.err.println("Exception: " + e.getMessage());
}
Thread.sleep(5000);
System.out.println("Main thread completed.");
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
}
}
6. orTimeout 与 cancel(true) 的关系
orTimeout 内部实现会创建一个新的 CompletableFuture,如果原始 CompletableFuture 超时,它会使用 completeExceptionally(new TimeoutException()) 完成新的 CompletableFuture。 而 cancel(true) 则是尝试中断执行任务的线程。 cancel(true) 并不保证任务立即停止,它只是向线程发送一个中断信号。 线程是否响应中断信号取决于任务的具体实现。 因此,我们需要在任务内部定期检查中断标志 Thread.currentThread().isInterrupted(),并及时结束任务。
7. 总结
orTimeout 是 CompletableFuture 中一个方便的超时处理工具,但需要注意超时后的资源释放问题。 通过使用 completeOnTimeout、exceptionally 结合 cancel,或者自定义 ThreadPoolExecutor,并结合中断标志,我们可以有效地解决资源泄露问题,确保程序的健壮性。
确保任务取消与资源释放
总而言之,使用 CompletableFuture 和 orTimeout 时,必须仔细考虑超时后的资源释放问题。 结合 cancel(true) 中断线程,并定期检查中断标志是确保任务及时停止的关键。 结合自定义 ThreadPoolExecutor 可以进行更精细的控制,但需要谨慎使用 remove 方法。