CompletableFuture超时后资源无法释放?orTimeout算子与自定义ThreadPoolExecutor回收

CompletableFuture 超时后资源释放:orTimeout 与自定义 ThreadPoolExecutor

各位同学,大家好。今天我们来深入探讨 CompletableFuture 中的超时机制,特别是 orTimeout 算子,以及它与自定义 ThreadPoolExecutor 的协同工作,以及如何确保超时后资源的正确释放。

CompletableFuture 是 Java 并发编程中一个强大的工具,它允许我们以非阻塞的方式进行异步编程。orTimeout 算子为我们提供了一种优雅的方式来处理超时场景,但如果不小心,它可能会导致资源泄露。

1. CompletableFutureorTimeout 简介

CompletableFuture 代表一个异步计算的结果。它允许我们在计算完成时执行回调,组合多个异步操作,并处理异常。

orTimeout(long timeout, TimeUnit unit)CompletableFuture 提供的一个方法,它返回一个新的 CompletableFuture,该 CompletableFuture 在原始 CompletableFuture 完成之前,如果超过指定的 timeout 时间,则会以 TimeoutException 异常完成。

简单示例:

import java.util.concurrent.*;

public class OrTimeoutExample {

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                // 模拟一个耗时操作
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Result";
        });

        CompletableFuture<String> timeoutFuture = future.orTimeout(2, TimeUnit.SECONDS);

        try {
            String result = timeoutFuture.get();
            System.out.println("Result: " + result);
        } catch (InterruptedException | ExecutionException e) {
            System.err.println("Exception: " + e.getMessage());
            if (e.getCause() instanceof TimeoutException) {
                System.err.println("Timeout occurred!");
            }
        }
    }
}

在这个例子中,如果 future 在 2 秒内没有完成,timeoutFuture 将抛出 TimeoutException

2. 资源泄露的潜在问题

虽然 orTimeout 看起来很简单,但它隐藏了一个潜在的问题:原始 CompletableFuture 对应的任务可能仍然在后台运行,即使 orTimeout 已经触发了超时异常。 这会导致资源泄露,例如线程资源、数据库连接、文件句柄等。

考虑以下场景:

import java.util.concurrent.*;

public class ResourceLeakExample {

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(1);

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Starting long-running task...");
            try {
                // 模拟一个耗时操作,例如数据库查询
                Thread.sleep(10000);
                System.out.println("Long-running task completed."); //可能永远不会执行
            } catch (InterruptedException e) {
                System.out.println("Long-running task interrupted."); //可能执行
                Thread.currentThread().interrupt();
            }
            return "Result";
        }, executor);

        CompletableFuture<String> timeoutFuture = future.orTimeout(2, TimeUnit.SECONDS);

        try {
            String result = timeoutFuture.get();
            System.out.println("Result: " + result);
        } catch (InterruptedException | ExecutionException e) {
            System.err.println("Exception: " + e.getMessage());
            if (e.getCause() instanceof TimeoutException) {
                System.err.println("Timeout occurred!");
            }
        }

        // 等待一段时间,观察后台线程是否还在运行
        Thread.sleep(5000);
        System.out.println("Main thread completed.");
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}

在这个例子中,即使 timeoutFuture 抛出了 TimeoutException,原始的 future 对应的任务仍然在 executor 中运行。这意味着线程资源被占用,即使我们不再需要它的结果。

3. 如何解决资源泄露问题

解决资源泄露问题的关键是:orTimeout 触发超时后,我们需要主动取消原始的 CompletableFuture 对应的任务。

CompletableFuture 提供了 cancel(boolean mayInterruptIfRunning) 方法来取消任务。我们需要确保在超时发生时调用此方法。

以下是一些常用的解决方案:

3.1. 使用 completeOnTimeout 取代 orTimeout 并手动取消

completeOnTimeout(T value, long timeout, TimeUnit unit) 方法在超时时不会抛出异常,而是使用给定的 value 完成 CompletableFuture。 我们可以结合 completeOnTimeoutcancel 方法来实现超时取消。

import java.util.concurrent.*;

public class CompleteOnTimeoutExample {

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(1);

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Starting long-running task...");
            try {
                Thread.sleep(10000);
                System.out.println("Long-running task completed.");
            } catch (InterruptedException e) {
                System.out.println("Long-running task interrupted.");
                Thread.currentThread().interrupt();
            }
            return "Result";
        }, executor);

        CompletableFuture<String> timeoutFuture = future.completeOnTimeout("Timeout Value", 2, TimeUnit.SECONDS);

        try {
            String result = timeoutFuture.get();
            System.out.println("Result: " + result);
        } catch (InterruptedException | ExecutionException e) {
            System.err.println("Exception: " + e.getMessage());
        }

        if (!future.isDone()) {
            System.out.println("Cancelling the task...");
            future.cancel(true); // 尝试中断任务
        }

        Thread.sleep(5000);
        System.out.println("Main thread completed.");
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}

在这个例子中,如果 future 在 2 秒内没有完成,timeoutFuture 将使用 "Timeout Value" 完成。 然后,我们检查 future 是否已完成,如果未完成,则调用 cancel(true) 尝试中断任务。

3.2. 使用 exceptionally 结合 cancel

我们可以使用 exceptionally 方法来处理 TimeoutException,并在其中调用 cancel 方法。

import java.util.concurrent.*;

public class ExceptionallyCancelExample {

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(1);

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Starting long-running task...");
            try {
                Thread.sleep(10000);
                System.out.println("Long-running task completed.");
            } catch (InterruptedException e) {
                System.out.println("Long-running task interrupted.");
                Thread.currentThread().interrupt();
            }
            return "Result";
        }, executor);

        CompletableFuture<String> timeoutFuture = future.orTimeout(2, TimeUnit.SECONDS)
                .exceptionally(ex -> {
                    if (ex instanceof TimeoutException) {
                        System.out.println("Timeout occurred. Cancelling the task...");
                        future.cancel(true); // 尝试中断任务
                        return "Timeout Value"; // 提供一个默认值
                    } else {
                        // 处理其他异常
                        System.err.println("Other exception: " + ex.getMessage());
                        return null; // 或者抛出异常
                    }
                });

        try {
            String result = timeoutFuture.get();
            System.out.println("Result: " + result);
        } catch (InterruptedException | ExecutionException e) {
            System.err.println("Exception: " + e.getMessage());
        }

        Thread.sleep(5000);
        System.out.println("Main thread completed.");
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}

在这个例子中,exceptionally 方法会在 orTimeout 抛出 TimeoutException 时被调用。 我们在 exceptionally 方法中调用 cancel(true) 尝试中断任务,并返回一个默认值。

3.3. 使用自定义 ThreadPoolExecutor 进行更精细的控制

如果我们需要对线程池进行更精细的控制,例如监控线程池的状态、记录任务执行时间等,可以使用自定义 ThreadPoolExecutor

我们可以创建一个自定义的 ThreadPoolExecutor,并在任务超时后,从线程池中移除该任务。

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomThreadPoolExecutorExample {

    public static void main(String[] args) throws Exception {
        // 创建一个自定义的 ThreadPoolExecutor
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, // corePoolSize
                1, // maximumPoolSize
                60L, // keepAliveTime
                TimeUnit.SECONDS, // unit
                new LinkedBlockingQueue<>(), // workQueue
                new ThreadFactory() {
                    private final AtomicInteger counter = new AtomicInteger(0);

                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "CustomThread-" + counter.incrementAndGet());
                    }
                }
        ) {
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                if (t != null) {
                    System.err.println("Task threw an exception: " + t.getMessage());
                } else {
                    //  任务完成后的处理逻辑
                    System.out.println("Task completed successfully.");
                }
            }
        };

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Starting long-running task...");
            try {
                Thread.sleep(10000);
                System.out.println("Long-running task completed.");
            } catch (InterruptedException e) {
                System.out.println("Long-running task interrupted.");
                Thread.currentThread().interrupt();
            }
            return "Result";
        }, executor);

        CompletableFuture<String> timeoutFuture = future.orTimeout(2, TimeUnit.SECONDS)
                .exceptionally(ex -> {
                    if (ex instanceof TimeoutException) {
                        System.out.println("Timeout occurred. Cancelling the task...");
                        future.cancel(true); // 尝试中断任务

                        // 从线程池中移除该任务 (不推荐,因为 future.cancel 已经做了中断操作)
                        // executor.remove((Runnable) future);
                        return "Timeout Value"; // 提供一个默认值
                    } else {
                        // 处理其他异常
                        System.err.println("Other exception: " + ex.getMessage());
                        return null; // 或者抛出异常
                    }
                });

        try {
            String result = timeoutFuture.get();
            System.out.println("Result: " + result);
        } catch (InterruptedException | ExecutionException e) {
            System.err.println("Exception: " + e.getMessage());
        }

        Thread.sleep(5000);
        System.out.println("Main thread completed.");
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}

在这个例子中,我们创建了一个自定义的 ThreadPoolExecutor,并重写了 afterExecute 方法,以便在任务执行完成后进行一些清理工作。 虽然这里尝试 executor.remove 操作,但是通常情况下,future.cancel(true) 已经尽力中断线程了, remove 并不总是有效,因为任务可能已经执行完成或正在执行中。 更好的做法是在任务内部,根据中断标志 Thread.currentThread().isInterrupted() 来提前结束任务。

4. 最佳实践

以下是一些使用 orTimeoutCompletableFuture 的最佳实践:

  • 始终考虑超时后资源的释放问题。 确保在超时发生时,能够及时取消原始任务,释放占用的资源。
  • 优先使用 completeOnTimeout 结合 cancel completeOnTimeout 避免了抛出异常,使得代码更加简洁。
  • 使用 exceptionally 处理 TimeoutExceptionexceptionally 中调用 cancel 方法,并提供一个默认值。
  • 使用自定义 ThreadPoolExecutor 进行更精细的控制。 如果需要对线程池进行更精细的控制,可以使用自定义 ThreadPoolExecutor
  • 在长时间运行的任务中,定期检查中断标志。 即使使用了 cancel(true) 尝试中断任务,也无法保证任务立即停止。 因此,在长时间运行的任务中,应该定期检查中断标志 Thread.currentThread().isInterrupted(),并及时结束任务。
  • 避免在 finally 块中执行耗时操作。 如果需要在 finally 块中执行耗时操作,应该使用单独的线程来执行,以避免阻塞主线程。

5. 代码示例:结合中断标志的完整示例

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class InterruptFlagExample {

    public static void main(String[] args) throws Exception {
        // 创建一个自定义的 ThreadPoolExecutor
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, // corePoolSize
                1, // maximumPoolSize
                60L, // keepAliveTime
                TimeUnit.SECONDS, // unit
                new LinkedBlockingQueue<>(), // workQueue
                new ThreadFactory() {
                    private final AtomicInteger counter = new AtomicInteger(0);

                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "CustomThread-" + counter.incrementAndGet());
                    }
                }
        ) {
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                if (t != null) {
                    System.err.println("Task threw an exception: " + t.getMessage());
                } else {
                    //  任务完成后的处理逻辑
                    System.out.println("Task completed successfully.");
                }
            }
        };

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Starting long-running task...");
            long startTime = System.currentTimeMillis();
            try {
                // 模拟一个耗时操作,例如数据库查询
                while (!Thread.currentThread().isInterrupted()) {
                    if (System.currentTimeMillis() - startTime > 10000) {
                        System.out.println("Long-running task completed normally (after timeout period).");
                        return "Result";
                    }
                    Thread.sleep(100); // 模拟工作
                }
                System.out.println("Long-running task interrupted."); // 任务被中断
                return "Interrupted Result";

            } catch (InterruptedException e) {
                System.out.println("Long-running task interrupted.");
                Thread.currentThread().interrupt(); // 重新设置中断标志
                return "Interrupted Result";
            } finally {
                System.out.println("Long-running task finally block executed.");
            }
        }, executor);

        CompletableFuture<String> timeoutFuture = future.orTimeout(2, TimeUnit.SECONDS)
                .exceptionally(ex -> {
                    if (ex instanceof TimeoutException) {
                        System.out.println("Timeout occurred. Cancelling the task...");
                        boolean cancelled = future.cancel(true); // 尝试中断任务
                        System.out.println("Cancel result: " + cancelled);
                        return "Timeout Value"; // 提供一个默认值
                    } else {
                        // 处理其他异常
                        System.err.println("Other exception: " + ex.getMessage());
                        return null; // 或者抛出异常
                    }
                });

        try {
            String result = timeoutFuture.get();
            System.out.println("Result: " + result);
        } catch (InterruptedException | ExecutionException e) {
            System.err.println("Exception: " + e.getMessage());
        }

        Thread.sleep(5000);
        System.out.println("Main thread completed.");
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}

6. orTimeoutcancel(true) 的关系

orTimeout 内部实现会创建一个新的 CompletableFuture,如果原始 CompletableFuture 超时,它会使用 completeExceptionally(new TimeoutException()) 完成新的 CompletableFuture。 而 cancel(true) 则是尝试中断执行任务的线程。 cancel(true) 并不保证任务立即停止,它只是向线程发送一个中断信号。 线程是否响应中断信号取决于任务的具体实现。 因此,我们需要在任务内部定期检查中断标志 Thread.currentThread().isInterrupted(),并及时结束任务。

7. 总结

orTimeoutCompletableFuture 中一个方便的超时处理工具,但需要注意超时后的资源释放问题。 通过使用 completeOnTimeoutexceptionally 结合 cancel,或者自定义 ThreadPoolExecutor,并结合中断标志,我们可以有效地解决资源泄露问题,确保程序的健壮性。

确保任务取消与资源释放

总而言之,使用 CompletableFutureorTimeout 时,必须仔细考虑超时后的资源释放问题。 结合 cancel(true) 中断线程,并定期检查中断标志是确保任务及时停止的关键。 结合自定义 ThreadPoolExecutor 可以进行更精细的控制,但需要谨慎使用 remove 方法。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注