Java ThreadPoolExecutor 线程回收异常导致线程拒绝问题排查
各位朋友,大家好!今天我们来聊聊在使用 ThreadPoolExecutor 时,线程回收异常导致线程拒绝的问题。这个问题在实际开发中比较常见,而且往往排查起来比较棘手。我会从原理、现象、原因、排查方法和解决方案几个方面,结合代码示例,为大家详细讲解。
1. ThreadPoolExecutor 的基本原理
ThreadPoolExecutor 是 Java 并发包 java.util.concurrent 中一个非常重要的类,它提供了一个线程池的实现。理解 ThreadPoolExecutor 的工作原理是解决问题的关键。
线程池的核心参数:
| 参数名 | 类型 | 含义 |
|---|---|---|
corePoolSize |
int |
核心线程数。即使线程池空闲,也会保持的线程数量。 |
maximumPoolSize |
int |
最大线程数。线程池允许创建的最大线程数量。 |
keepAliveTime |
long |
空闲线程存活时间。当线程池中的线程数量超过 corePoolSize 时,空闲线程在超过这个时间后会被销毁。 |
unit |
TimeUnit |
keepAliveTime 的时间单位。 |
workQueue |
BlockingQueue<Runnable> |
任务队列。用于存放等待执行的任务。常用的队列有 ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue 等。 |
threadFactory |
ThreadFactory |
线程工厂。用于创建线程。可以自定义线程的名称、优先级等。 |
rejectedExecutionHandler |
RejectedExecutionHandler |
拒绝策略。当任务队列已满,且线程池中的线程数量达到 maximumPoolSize 时,新提交的任务会被拒绝。常用的拒绝策略有 AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy。 |
ThreadPoolExecutor 的工作流程:
- 当提交一个新任务时,线程池首先会检查当前线程池中的线程数量是否小于
corePoolSize。 - 如果小于,则创建一个新的线程来执行该任务。
- 如果大于等于
corePoolSize,则将该任务添加到workQueue中。 - 如果
workQueue已满,且当前线程池中的线程数量小于maximumPoolSize,则创建一个新的线程来执行该任务。 - 如果
workQueue已满,且当前线程池中的线程数量大于等于maximumPoolSize,则执行rejectedExecutionHandler指定的拒绝策略。
2. 线程回收异常导致线程拒绝的现象
线程回收异常导致线程拒绝,通常表现为以下几种现象:
- 任务拒绝执行: 新提交的任务无法被线程池执行,
rejectedExecutionHandler被调用。 - 线程池饱和: 线程池中的线程数量达到
maximumPoolSize,并且workQueue已满。 - 程序运行缓慢: 由于任务被拒绝,整体处理速度下降。
- 异常日志: 可能会出现与线程池拒绝策略相关的异常日志,例如
RejectedExecutionException。
3. 线程回收异常的原因分析
线程回收异常的根本原因是:线程在执行任务的过程中抛出了未捕获的异常,导致线程死亡,但线程池并没有及时感知到,或者感知到后没有正确处理,最终导致线程池中的可用线程数量减少,当任务提交速度超过线程池的处理速度时,就会触发拒绝策略。
更详细的分析:
-
任务代码存在未捕获的异常: 这是最常见的原因。如果任务的
run()方法中抛出了未捕获的异常,线程会终止,但线程池本身可能不知道这个线程已经死了。ExecutorService executor = Executors.newFixedThreadPool(1); executor.execute(() -> { System.out.println("Task started"); throw new RuntimeException("Simulated exception in task"); //System.out.println("Task finished"); // 这行代码不会被执行 }); executor.shutdown();在这个例子中,
RuntimeException没有被捕获,线程会直接终止。 -
自定义 ThreadFactory 的问题: 如果使用了自定义的
ThreadFactory,并且在创建线程时没有正确处理异常,也可能导致线程死亡。ThreadFactory threadFactory = new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setUncaughtExceptionHandler((thread, throwable) -> { System.err.println("Uncaught exception in thread: " + thread.getName() + ", " + throwable.getMessage()); }); return t; } }; ExecutorService executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); executor.execute(() -> { System.out.println("Task started"); throw new RuntimeException("Simulated exception in task"); //System.out.println("Task finished"); // 这行代码不会被执行 }); executor.shutdown();这个例子展示了如何使用
UncaughtExceptionHandler来捕获线程中的未捕获异常,但即使捕获了异常,线程仍然会终止。关键在于,我们需要在捕获异常后,通知线程池,让线程池重新创建一个新的线程。 -
线程池配置不合理: 线程池的
corePoolSize和maximumPoolSize配置过小,或者workQueue容量太小,在高并发场景下容易导致线程池饱和,从而触发拒绝策略。 -
拒绝策略选择不当: 如果选择了
AbortPolicy,当任务被拒绝时,会直接抛出RejectedExecutionException,这可能会中断程序的正常流程。
4. 排查方法
排查线程回收异常导致线程拒绝的问题,需要从以下几个方面入手:
-
查看日志: 首先要查看程序的日志,特别是与线程池相关的日志,看看是否有
RejectedExecutionException或其他异常信息。 -
监控线程池状态: 使用
ThreadPoolExecutor提供的方法,监控线程池的状态,例如:getPoolSize():获取线程池中当前的线程数量。getActiveCount():获取正在执行任务的线程数量。getQueue().size():获取任务队列中等待执行的任务数量。getCompletedTaskCount():获取已完成的任务数量。getTaskCount():获取已提交的任务总数。
通过监控这些指标,可以了解线程池的运行状况,判断是否存在线程池饱和的情况。
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5); // 模拟提交任务 for (int i = 0; i < 10; i++) { final int taskNumber = i; executor.execute(() -> { try { System.out.println("Task " + taskNumber + " started by " + Thread.currentThread().getName()); Thread.sleep(1000); // 模拟耗时操作 System.out.println("Task " + taskNumber + " finished by " + Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } }); } // 定时打印线程池状态 ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); scheduler.scheduleAtFixedRate(() -> { System.out.println("Pool Size: " + executor.getPoolSize()); System.out.println("Active Count: " + executor.getActiveCount()); System.out.println("Queue Size: " + executor.getQueue().size()); System.out.println("Completed Task Count: " + executor.getCompletedTaskCount()); System.out.println("Task Count: " + executor.getTaskCount()); }, 0, 1, TimeUnit.SECONDS); // 关闭线程池 executor.shutdown(); try { executor.awaitTermination(10, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } scheduler.shutdown();这个例子展示了如何使用
ScheduledExecutorService定时打印线程池的状态信息。 -
检查任务代码: 仔细检查任务的
run()方法,确保所有可能抛出异常的地方都进行了捕获和处理。可以使用try-catch块来捕获异常,并记录异常信息。 -
自定义 ThreadFactory 和 UncaughtExceptionHandler: 使用自定义的
ThreadFactory,并设置UncaughtExceptionHandler,以便捕获线程中的未捕获异常。在UncaughtExceptionHandler中,可以记录异常信息,并尝试重新启动线程。 -
使用线程池监控工具: 可以使用一些线程池监控工具,例如
VisualVM、JConsole等,来监控线程池的运行状态。
5. 解决方案
针对线程回收异常导致线程拒绝的问题,可以采取以下解决方案:
-
在任务代码中捕获异常: 这是最基本的解决方案。在任务的
run()方法中使用try-catch块来捕获异常,并记录异常信息。executor.execute(() -> { try { System.out.println("Task started"); // 可能会抛出异常的代码 int result = 10 / 0; // 模拟除零异常 System.out.println("Task finished"); } catch (Exception e) { System.err.println("Exception in task: " + e.getMessage()); // 处理异常,例如记录日志、发送告警等 } }); -
使用自定义 ThreadFactory 和 UncaughtExceptionHandler: 使用自定义的
ThreadFactory,并设置UncaughtExceptionHandler,以便捕获线程中的未捕获异常。在UncaughtExceptionHandler中,可以记录异常信息,并尝试重新启动线程。ThreadFactory threadFactory = new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setUncaughtExceptionHandler((thread, throwable) -> { System.err.println("Uncaught exception in thread: " + thread.getName() + ", " + throwable.getMessage()); // 重新提交任务,或者创建一个新的线程来执行任务 // 这是一个简化示例,实际应用中需要考虑线程池的状态和任务的特性 executor.execute(r); }); return t; } }; ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); executor.execute(() -> { System.out.println("Task started"); throw new RuntimeException("Simulated exception in task"); //System.out.println("Task finished"); // 这行代码不会被执行 }); executor.shutdown();注意: 在
UncaughtExceptionHandler中重新提交任务或者创建新的线程时,需要谨慎考虑,避免出现死循环或者线程池过度膨胀的情况。需要根据实际情况,设置合适的重试次数或者延迟时间。 -
调整线程池配置: 根据实际情况,调整线程池的
corePoolSize、maximumPoolSize和workQueue的大小,以提高线程池的处理能力。- 如果任务是 CPU 密集型的,可以适当增加
corePoolSize和maximumPoolSize。 - 如果任务是 I/O 密集型的,可以适当增加
maximumPoolSize,并减少corePoolSize。 - 如果任务的提交速度很快,可以适当增加
workQueue的大小。
- 如果任务是 CPU 密集型的,可以适当增加
-
选择合适的拒绝策略: 根据实际情况,选择合适的拒绝策略。
AbortPolicy:直接抛出RejectedExecutionException。CallerRunsPolicy:由提交任务的线程来执行被拒绝的任务。DiscardPolicy:直接丢弃被拒绝的任务。DiscardOldestPolicy:丢弃任务队列中最旧的任务,然后将新任务添加到队列中。
一般来说,
CallerRunsPolicy是一个比较好的选择,它可以避免任务被直接丢弃,并且可以减缓任务的提交速度,从而缓解线程池的压力。 -
使用监控工具: 使用线程池监控工具,可以实时监控线程池的运行状态,及时发现问题。
6. 代码示例:一个更完整的解决方案
下面是一个更完整的解决方案,它结合了自定义 ThreadFactory、UncaughtExceptionHandler 和合适的拒绝策略,并加入了日志记录功能。
import java.util.concurrent.*;
import java.util.logging.Level;
import java.util.logging.Logger;
public class ThreadPoolExceptionHandler {
private static final Logger LOGGER = Logger.getLogger(ThreadPoolExceptionHandler.class.getName());
public static void main(String[] args) {
ThreadFactory threadFactory = new ThreadFactory() {
private int counter = 0;
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Task-Thread-" + counter++);
thread.setUncaughtExceptionHandler(new CustomUncaughtExceptionHandler());
return thread;
}
};
RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy(); // 使用 CallerRunsPolicy
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // corePoolSize
4, // maximumPoolSize
60, // keepAliveTime
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100), // workQueue
threadFactory,
rejectedExecutionHandler
);
// 提交任务
for (int i = 0; i < 20; i++) {
final int taskNumber = i;
executor.execute(() -> {
try {
LOGGER.info("Task " + taskNumber + " started by " + Thread.currentThread().getName());
// 模拟耗时操作
Thread.sleep(500);
if (taskNumber % 5 == 0) {
throw new RuntimeException("Simulated exception in task " + taskNumber);
}
LOGGER.info("Task " + taskNumber + " finished by " + Thread.currentThread().getName());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.log(Level.WARNING, "Task " + taskNumber + " interrupted", e);
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Exception in task " + taskNumber, e);
throw e; // 抛出异常,让 UncaughtExceptionHandler 处理
}
});
}
executor.shutdown();
try {
executor.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOGGER.log(Level.WARNING, "Interrupted while waiting for termination", e);
Thread.currentThread().interrupt();
}
LOGGER.info("All tasks submitted.");
}
static class CustomUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t, Throwable e) {
LOGGER.log(Level.SEVERE, "Uncaught exception in thread " + t.getName(), e);
// 可以尝试重新提交任务,但是需要谨慎处理,避免死循环
// 例如,可以记录重试次数,超过一定次数后不再重试
}
}
}
这个例子中,我们使用了 CallerRunsPolicy 作为拒绝策略,这意味着当线程池饱和时,提交任务的线程会直接执行被拒绝的任务。同时,我们使用了自定义的 ThreadFactory 和 UncaughtExceptionHandler 来捕获线程中的未捕获异常,并记录异常信息。
7. 总结一些重点
ThreadPoolExecutor的线程回收异常会导致线程拒绝。需要从日志分析,监控线程池状态,检查任务代码,自定义ThreadFactory和UncaughtExceptionHandler等步骤进行排查。解决方案包括在任务代码中捕获异常,调整线程池配置,选择合适的拒绝策略等。
希望今天的讲解对大家有所帮助!谢谢大家!