JAVA Executors工厂方法默认线程池坑点详解与替代创建方式

JAVA Executors工厂方法默认线程池坑点详解与替代创建方式

大家好,今天我们来聊聊Java并发编程中一个经常被提及但又容易被忽视的话题:Executors 工厂方法创建的默认线程池及其潜在的坑点,以及更安全、更可控的替代方案。

java.util.concurrent.Executors 类提供了一系列静态工厂方法,用于创建不同类型的线程池。虽然这些方法使用起来非常方便,但如果对其内部实现和适用场景不了解,很容易掉进坑里,导致系统性能下降甚至崩溃。

Executors 工厂方法提供的常见线程池类型

工厂方法 描述 适用场景
newFixedThreadPool(int nThreads) 创建一个固定大小的线程池。线程池维护固定数量的线程,如果所有线程都在忙碌,则新提交的任务将在队列中等待,直到有线程空闲。 适用于执行CPU密集型任务,任务数量相对稳定,希望控制并发线程数量的场景。
newCachedThreadPool() 创建一个可缓存的线程池。线程池的大小是不固定的,可以根据需要动态增加或减少线程的数量。如果线程池中的线程空闲超过一定时间(默认60秒),则会被回收。如果线程池中没有可用的线程,则会创建一个新的线程来执行任务。 适用于执行大量耗时较少的任务,任务到达率高,但执行时间短的场景。线程池可以动态地调整线程数量,以适应任务负载的变化。
newSingleThreadExecutor() 创建一个单线程的线程池。线程池只有一个线程,所有提交的任务都将按照FIFO(先进先出)的顺序执行。 适用于需要保证任务顺序执行,并且不需要并发执行的场景。例如,处理GUI事件、单生产者单消费者模式等。
newScheduledThreadPool(int corePoolSize) 创建一个可以执行延迟任务和周期性任务的线程池。线程池维护固定数量的线程,可以安排任务在指定的时间后执行,或者按照固定的频率重复执行。 适用于需要定时执行任务或者周期性执行任务的场景。例如,定时备份数据、定时发送邮件等。

默认线程池的坑点

虽然 Executors 提供的线程池使用起来非常方便,但它们都存在一些潜在的坑点,主要集中在以下两个方面:

  1. newFixedThreadPoolnewSingleThreadExecutorLinkedBlockingQueue 导致的OOM风险

    这两个方法都使用 LinkedBlockingQueue 作为任务队列。LinkedBlockingQueue 如果在构造时没有指定容量,默认是一个无界队列。这意味着,当任务提交速度远大于任务执行速度时,队列会无限增长,最终导致 OutOfMemoryError (OOM)。

    // newFixedThreadPool 的源码片段
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
    // newSingleThreadExecutor 的源码片段
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    示例代码:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    public class FixedThreadPoolOOM {
    
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executor = Executors.newFixedThreadPool(10);
    
            try {
                for (int i = 0; i < Integer.MAX_VALUE; i++) {
                    final int taskNumber = i;
                    executor.submit(() -> {
                        try {
                            // 模拟耗时任务
                            TimeUnit.MILLISECONDS.sleep(10);
                            System.out.println("Task " + taskNumber + " completed by " + Thread.currentThread().getName());
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                }
            } catch (Exception e) {
                System.err.println("Exception occurred: " + e.getMessage());
                e.printStackTrace();
            } finally {
                executor.shutdown();
                executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
            }
        }
    }

    在这个例子中,我们创建了一个固定大小为10的线程池,然后提交了大量的任务。由于任务执行速度慢于提交速度,任务队列会迅速增长,最终导致OOM。

  2. newCachedThreadPool 的线程无限制增长导致的资源耗尽

    newCachedThreadPool 使用 SynchronousQueue 作为任务队列。SynchronousQueue 不存储任何任务,每个提交的任务都必须立即找到一个可用的线程来执行,否则任务将被拒绝。这意味着,当任务到达率非常高时,newCachedThreadPool 会不断创建新的线程,而空闲的线程会在60秒后被回收。如果任务持续高并发,线程池可能会无限制地增长,最终导致CPU、内存等资源耗尽。

    // newCachedThreadPool 的源码片段
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    示例代码:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    public class CachedThreadPoolResourceExhaustion {
    
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executor = Executors.newCachedThreadPool();
    
            try {
                for (int i = 0; i < 100000; i++) { // 模拟大量请求
                    final int taskNumber = i;
                    executor.submit(() -> {
                        try {
                            // 模拟短耗时任务
                            TimeUnit.MILLISECONDS.sleep(1);
                            System.out.println("Task " + taskNumber + " completed by " + Thread.currentThread().getName());
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                }
            } catch (Exception e) {
                System.err.println("Exception occurred: " + e.getMessage());
                e.printStackTrace();
            } finally {
                executor.shutdown();
                executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
            }
        }
    }

    在这个例子中,我们创建了一个可缓存的线程池,然后提交了大量快速完成的任务。 由于任务到达率很高,线程池会迅速创建大量的线程来处理任务,如果长时间保持高并发,可能导致系统资源耗尽。

更安全的替代方案:手动创建 ThreadPoolExecutor

为了避免上述坑点,更安全的方式是手动创建 ThreadPoolExecutor,并显式地指定线程池的各项参数,包括核心线程数、最大线程数、队列类型、拒绝策略等。

ThreadPoolExecutor 的构造方法如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

参数说明:

  • corePoolSize: 核心线程数,线程池中始终保持的线程数量,即使线程处于空闲状态。
  • maximumPoolSize: 最大线程数,线程池中允许的最大线程数量。
  • keepAliveTime: 线程空闲时间,当线程池中的线程数量超过 corePoolSize 时,如果线程空闲的时间超过 keepAliveTime,则会被回收,直到线程池中的线程数量等于 corePoolSize
  • unit: keepAliveTime 的时间单位。
  • workQueue: 任务队列,用于存储等待执行的任务。
  • threadFactory: 线程工厂,用于创建新的线程。
  • handler: 拒绝策略,当任务队列已满且线程池中的线程数量达到 maximumPoolSize 时,用于处理新提交的任务。

选择合适的 BlockingQueue

选择合适的 BlockingQueue 非常重要,它直接影响线程池的性能和稳定性。常见的 BlockingQueue 类型包括:

BlockingQueue 类型 描述 适用场景
LinkedBlockingQueue 基于链表的阻塞队列,可以指定容量,也可以不指定容量(无界队列)。如果指定了容量,当队列已满时,新提交的任务将被阻塞,直到队列中有空闲位置。如果不指定容量,队列可以无限增长,但可能导致OOM。 适用于生产者速度远大于消费者速度,并且可以接受一定程度的延迟的场景。需要谨慎使用无界队列,避免OOM。
ArrayBlockingQueue 基于数组的阻塞队列,必须指定容量。当队列已满时,新提交的任务将被阻塞,直到队列中有空闲位置。 适用于生产者速度和消费者速度相对平衡,并且对延迟比较敏感的场景。
PriorityBlockingQueue 具有优先级的阻塞队列,可以根据任务的优先级来决定任务的执行顺序。 适用于需要根据任务的优先级来执行任务的场景。例如,需要优先处理紧急任务。
SynchronousQueue 同步队列,不存储任何元素。每个插入操作必须等待一个相应的移除操作,反之亦然。 适用于线程之间需要直接传递数据,并且不需要缓冲的场景。通常用于构建高性能的线程池,例如 newCachedThreadPool
DelayQueue 延迟队列,队列中的元素都必须实现 Delayed 接口,可以指定元素的延迟时间。队列中的元素只有在延迟时间到期后才能被取出。 适用于需要延迟执行任务的场景。例如,定时清理过期数据、定时发送消息等。

选择合适的 RejectedExecutionHandler

RejectedExecutionHandler 用于处理当任务队列已满且线程池中的线程数量达到 maximumPoolSize 时,新提交的任务。常见的 RejectedExecutionHandler 类型包括:

RejectedExecutionHandler 类型 描述 适用场景
AbortPolicy 默认策略,直接抛出 RejectedExecutionException 异常。 适用于不能接受任务被拒绝的场景。
CallerRunsPolicy 将任务交给提交任务的线程来执行。 适用于不希望丢弃任何任务,但可以接受任务执行时间延长的场景。
DiscardPolicy 直接丢弃任务,不做任何处理。 适用于可以接受任务被丢弃,并且不需要进行任何处理的场景。
DiscardOldestPolicy 丢弃队列中最老的任务,然后尝试执行新提交的任务。 适用于希望优先处理新提交的任务,并且可以接受旧任务被丢弃的场景。

示例代码:手动创建 ThreadPoolExecutor

import java.util.concurrent.*;

public class CustomThreadPool {

    public static void main(String[] args) throws InterruptedException {
        int corePoolSize = 5;
        int maximumPoolSize = 10;
        long keepAliveTime = 60L;
        TimeUnit unit = TimeUnit.SECONDS;
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100); // 使用有界队列
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy(); // 使用CallerRunsPolicy

        ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                workQueue,
                threadFactory,
                handler);

        try {
            for (int i = 0; i < 200; i++) {
                final int taskNumber = i;
                executor.submit(() -> {
                    try {
                        TimeUnit.MILLISECONDS.sleep(10);
                        System.out.println("Task " + taskNumber + " completed by " + Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        } catch (Exception e) {
            System.err.println("Exception occurred: " + e.getMessage());
            e.printStackTrace();
        } finally {
            executor.shutdown();
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        }
    }
}

在这个例子中,我们手动创建了一个 ThreadPoolExecutor,并指定了核心线程数、最大线程数、空闲时间、时间单位、有界队列、线程工厂和拒绝策略。 通过使用有界队列和 CallerRunsPolicy 拒绝策略,我们可以避免OOM和资源耗尽的风险。

监控线程池状态

手动创建 ThreadPoolExecutor 之后,我们可以通过以下方法来监控线程池的状态:

  • getPoolSize(): 返回线程池中当前线程的数量。
  • getActiveCount(): 返回线程池中正在执行任务的线程数量。
  • getQueue().size(): 返回任务队列中等待执行的任务数量。
  • getCompletedTaskCount(): 返回线程池中已完成的任务数量。
  • getTaskCount(): 返回线程池中已提交的任务总数。

通过监控这些指标,我们可以及时发现线程池的性能瓶颈,并进行相应的调整。

总结:谨慎使用默认线程池,手动配置更安全

Executors 工厂方法提供的默认线程池使用方便,但存在OOM和资源耗尽的风险。 为了避免这些风险,更安全的方式是手动创建 ThreadPoolExecutor,并显式地指定线程池的各项参数。 同时,选择合适的 BlockingQueueRejectedExecutionHandler,并监控线程池的状态,可以帮助我们构建更稳定、更可靠的并发程序。

务必了解线程池的参数才能有效避免坑

理解 ThreadPoolExecutor 的参数和工作原理,选择合适的队列和拒绝策略,才能有效避免潜在的OOM和资源耗尽风险。

手动配置线程池,更安全更可控

手动创建并配置 ThreadPoolExecutor,可以更好地控制线程池的行为,并根据实际需求进行优化。

监控线程池状态,及时发现问题

监控线程池的状态,可以及时发现性能瓶颈和潜在问题,并采取相应的措施进行调整和优化。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注