JAVA使用ThreadPoolExecutor线程回收异常导致线程拒绝问题排查

Java ThreadPoolExecutor 线程回收异常导致线程拒绝问题排查

各位朋友,大家好!今天我们来聊聊在使用 ThreadPoolExecutor 时,线程回收异常导致线程拒绝的问题。这个问题在实际开发中比较常见,而且往往排查起来比较棘手。我会从原理、现象、原因、排查方法和解决方案几个方面,结合代码示例,为大家详细讲解。

1. ThreadPoolExecutor 的基本原理

ThreadPoolExecutor 是 Java 并发包 java.util.concurrent 中一个非常重要的类,它提供了一个线程池的实现。理解 ThreadPoolExecutor 的工作原理是解决问题的关键。

线程池的核心参数:

参数名 类型 含义
corePoolSize int 核心线程数。即使线程池空闲,也会保持的线程数量。
maximumPoolSize int 最大线程数。线程池允许创建的最大线程数量。
keepAliveTime long 空闲线程存活时间。当线程池中的线程数量超过 corePoolSize 时,空闲线程在超过这个时间后会被销毁。
unit TimeUnit keepAliveTime 的时间单位。
workQueue BlockingQueue<Runnable> 任务队列。用于存放等待执行的任务。常用的队列有 ArrayBlockingQueueLinkedBlockingQueueSynchronousQueue 等。
threadFactory ThreadFactory 线程工厂。用于创建线程。可以自定义线程的名称、优先级等。
rejectedExecutionHandler RejectedExecutionHandler 拒绝策略。当任务队列已满,且线程池中的线程数量达到 maximumPoolSize 时,新提交的任务会被拒绝。常用的拒绝策略有 AbortPolicyCallerRunsPolicyDiscardPolicyDiscardOldestPolicy

ThreadPoolExecutor 的工作流程:

  1. 当提交一个新任务时,线程池首先会检查当前线程池中的线程数量是否小于 corePoolSize
  2. 如果小于,则创建一个新的线程来执行该任务。
  3. 如果大于等于 corePoolSize,则将该任务添加到 workQueue 中。
  4. 如果 workQueue 已满,且当前线程池中的线程数量小于 maximumPoolSize,则创建一个新的线程来执行该任务。
  5. 如果 workQueue 已满,且当前线程池中的线程数量大于等于 maximumPoolSize,则执行 rejectedExecutionHandler 指定的拒绝策略。

2. 线程回收异常导致线程拒绝的现象

线程回收异常导致线程拒绝,通常表现为以下几种现象:

  • 任务拒绝执行: 新提交的任务无法被线程池执行,rejectedExecutionHandler 被调用。
  • 线程池饱和: 线程池中的线程数量达到 maximumPoolSize,并且 workQueue 已满。
  • 程序运行缓慢: 由于任务被拒绝,整体处理速度下降。
  • 异常日志: 可能会出现与线程池拒绝策略相关的异常日志,例如 RejectedExecutionException

3. 线程回收异常的原因分析

线程回收异常的根本原因是:线程在执行任务的过程中抛出了未捕获的异常,导致线程死亡,但线程池并没有及时感知到,或者感知到后没有正确处理,最终导致线程池中的可用线程数量减少,当任务提交速度超过线程池的处理速度时,就会触发拒绝策略。

更详细的分析:

  • 任务代码存在未捕获的异常: 这是最常见的原因。如果任务的 run() 方法中抛出了未捕获的异常,线程会终止,但线程池本身可能不知道这个线程已经死了。

    ExecutorService executor = Executors.newFixedThreadPool(1);
    executor.execute(() -> {
        System.out.println("Task started");
        throw new RuntimeException("Simulated exception in task");
        //System.out.println("Task finished"); // 这行代码不会被执行
    });
    executor.shutdown();

    在这个例子中,RuntimeException 没有被捕获,线程会直接终止。

  • 自定义 ThreadFactory 的问题: 如果使用了自定义的 ThreadFactory,并且在创建线程时没有正确处理异常,也可能导致线程死亡。

    ThreadFactory threadFactory = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, throwable) -> {
                System.err.println("Uncaught exception in thread: " + thread.getName() + ", " + throwable.getMessage());
            });
            return t;
        }
    };
    
    ExecutorService executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
    executor.execute(() -> {
        System.out.println("Task started");
        throw new RuntimeException("Simulated exception in task");
        //System.out.println("Task finished"); // 这行代码不会被执行
    });
    executor.shutdown();

    这个例子展示了如何使用 UncaughtExceptionHandler 来捕获线程中的未捕获异常,但即使捕获了异常,线程仍然会终止。关键在于,我们需要在捕获异常后,通知线程池,让线程池重新创建一个新的线程。

  • 线程池配置不合理: 线程池的 corePoolSizemaximumPoolSize 配置过小,或者 workQueue 容量太小,在高并发场景下容易导致线程池饱和,从而触发拒绝策略。

  • 拒绝策略选择不当: 如果选择了 AbortPolicy,当任务被拒绝时,会直接抛出 RejectedExecutionException,这可能会中断程序的正常流程。

4. 排查方法

排查线程回收异常导致线程拒绝的问题,需要从以下几个方面入手:

  1. 查看日志: 首先要查看程序的日志,特别是与线程池相关的日志,看看是否有 RejectedExecutionException 或其他异常信息。

  2. 监控线程池状态: 使用 ThreadPoolExecutor 提供的方法,监控线程池的状态,例如:

    • getPoolSize():获取线程池中当前的线程数量。
    • getActiveCount():获取正在执行任务的线程数量。
    • getQueue().size():获取任务队列中等待执行的任务数量。
    • getCompletedTaskCount():获取已完成的任务数量。
    • getTaskCount():获取已提交的任务总数。

    通过监控这些指标,可以了解线程池的运行状况,判断是否存在线程池饱和的情况。

    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
    
    // 模拟提交任务
    for (int i = 0; i < 10; i++) {
        final int taskNumber = i;
        executor.execute(() -> {
            try {
                System.out.println("Task " + taskNumber + " started by " + Thread.currentThread().getName());
                Thread.sleep(1000); // 模拟耗时操作
                System.out.println("Task " + taskNumber + " finished by " + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
    
    // 定时打印线程池状态
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    scheduler.scheduleAtFixedRate(() -> {
        System.out.println("Pool Size: " + executor.getPoolSize());
        System.out.println("Active Count: " + executor.getActiveCount());
        System.out.println("Queue Size: " + executor.getQueue().size());
        System.out.println("Completed Task Count: " + executor.getCompletedTaskCount());
        System.out.println("Task Count: " + executor.getTaskCount());
    }, 0, 1, TimeUnit.SECONDS);
    
    // 关闭线程池
    executor.shutdown();
    try {
        executor.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    scheduler.shutdown();

    这个例子展示了如何使用 ScheduledExecutorService 定时打印线程池的状态信息。

  3. 检查任务代码: 仔细检查任务的 run() 方法,确保所有可能抛出异常的地方都进行了捕获和处理。可以使用 try-catch 块来捕获异常,并记录异常信息。

  4. 自定义 ThreadFactory 和 UncaughtExceptionHandler: 使用自定义的 ThreadFactory,并设置 UncaughtExceptionHandler,以便捕获线程中的未捕获异常。在 UncaughtExceptionHandler 中,可以记录异常信息,并尝试重新启动线程。

  5. 使用线程池监控工具: 可以使用一些线程池监控工具,例如 VisualVMJConsole 等,来监控线程池的运行状态。

5. 解决方案

针对线程回收异常导致线程拒绝的问题,可以采取以下解决方案:

  1. 在任务代码中捕获异常: 这是最基本的解决方案。在任务的 run() 方法中使用 try-catch 块来捕获异常,并记录异常信息。

    executor.execute(() -> {
        try {
            System.out.println("Task started");
            // 可能会抛出异常的代码
            int result = 10 / 0; // 模拟除零异常
            System.out.println("Task finished");
        } catch (Exception e) {
            System.err.println("Exception in task: " + e.getMessage());
            // 处理异常,例如记录日志、发送告警等
        }
    });
  2. 使用自定义 ThreadFactory 和 UncaughtExceptionHandler: 使用自定义的 ThreadFactory,并设置 UncaughtExceptionHandler,以便捕获线程中的未捕获异常。在 UncaughtExceptionHandler 中,可以记录异常信息,并尝试重新启动线程。

    ThreadFactory threadFactory = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, throwable) -> {
                System.err.println("Uncaught exception in thread: " + thread.getName() + ", " + throwable.getMessage());
                // 重新提交任务,或者创建一个新的线程来执行任务
                // 这是一个简化示例,实际应用中需要考虑线程池的状态和任务的特性
                executor.execute(r);
            });
            return t;
        }
    };
    
    ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
    executor.execute(() -> {
        System.out.println("Task started");
        throw new RuntimeException("Simulated exception in task");
        //System.out.println("Task finished"); // 这行代码不会被执行
    });
    executor.shutdown();

    注意:UncaughtExceptionHandler 中重新提交任务或者创建新的线程时,需要谨慎考虑,避免出现死循环或者线程池过度膨胀的情况。需要根据实际情况,设置合适的重试次数或者延迟时间。

  3. 调整线程池配置: 根据实际情况,调整线程池的 corePoolSizemaximumPoolSizeworkQueue 的大小,以提高线程池的处理能力。

    • 如果任务是 CPU 密集型的,可以适当增加 corePoolSizemaximumPoolSize
    • 如果任务是 I/O 密集型的,可以适当增加 maximumPoolSize,并减少 corePoolSize
    • 如果任务的提交速度很快,可以适当增加 workQueue 的大小。
  4. 选择合适的拒绝策略: 根据实际情况,选择合适的拒绝策略。

    • AbortPolicy:直接抛出 RejectedExecutionException
    • CallerRunsPolicy:由提交任务的线程来执行被拒绝的任务。
    • DiscardPolicy:直接丢弃被拒绝的任务。
    • DiscardOldestPolicy:丢弃任务队列中最旧的任务,然后将新任务添加到队列中。

    一般来说,CallerRunsPolicy 是一个比较好的选择,它可以避免任务被直接丢弃,并且可以减缓任务的提交速度,从而缓解线程池的压力。

  5. 使用监控工具: 使用线程池监控工具,可以实时监控线程池的运行状态,及时发现问题。

6. 代码示例:一个更完整的解决方案

下面是一个更完整的解决方案,它结合了自定义 ThreadFactoryUncaughtExceptionHandler 和合适的拒绝策略,并加入了日志记录功能。

import java.util.concurrent.*;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ThreadPoolExceptionHandler {

    private static final Logger LOGGER = Logger.getLogger(ThreadPoolExceptionHandler.class.getName());

    public static void main(String[] args) {
        ThreadFactory threadFactory = new ThreadFactory() {
            private int counter = 0;

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "Task-Thread-" + counter++);
                thread.setUncaughtExceptionHandler(new CustomUncaughtExceptionHandler());
                return thread;
            }
        };

        RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy(); // 使用 CallerRunsPolicy

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, // corePoolSize
                4, // maximumPoolSize
                60, // keepAliveTime
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(100), // workQueue
                threadFactory,
                rejectedExecutionHandler
        );

        // 提交任务
        for (int i = 0; i < 20; i++) {
            final int taskNumber = i;
            executor.execute(() -> {
                try {
                    LOGGER.info("Task " + taskNumber + " started by " + Thread.currentThread().getName());
                    // 模拟耗时操作
                    Thread.sleep(500);
                    if (taskNumber % 5 == 0) {
                        throw new RuntimeException("Simulated exception in task " + taskNumber);
                    }
                    LOGGER.info("Task " + taskNumber + " finished by " + Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    LOGGER.log(Level.WARNING, "Task " + taskNumber + " interrupted", e);
                } catch (Exception e) {
                    LOGGER.log(Level.SEVERE, "Exception in task " + taskNumber, e);
                    throw e; // 抛出异常,让 UncaughtExceptionHandler 处理
                }
            });
        }

        executor.shutdown();

        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            LOGGER.log(Level.WARNING, "Interrupted while waiting for termination", e);
            Thread.currentThread().interrupt();
        }

        LOGGER.info("All tasks submitted.");
    }

    static class CustomUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            LOGGER.log(Level.SEVERE, "Uncaught exception in thread " + t.getName(), e);
            // 可以尝试重新提交任务,但是需要谨慎处理,避免死循环
            // 例如,可以记录重试次数,超过一定次数后不再重试
        }
    }
}

这个例子中,我们使用了 CallerRunsPolicy 作为拒绝策略,这意味着当线程池饱和时,提交任务的线程会直接执行被拒绝的任务。同时,我们使用了自定义的 ThreadFactoryUncaughtExceptionHandler 来捕获线程中的未捕获异常,并记录异常信息。

7. 总结一些重点

ThreadPoolExecutor的线程回收异常会导致线程拒绝。需要从日志分析,监控线程池状态,检查任务代码,自定义ThreadFactory和UncaughtExceptionHandler等步骤进行排查。解决方案包括在任务代码中捕获异常,调整线程池配置,选择合适的拒绝策略等。

希望今天的讲解对大家有所帮助!谢谢大家!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注