JAVA Executors工厂方法默认线程池坑点详解与替代创建方式
大家好,今天我们来聊聊Java并发编程中一个经常被提及但又容易被忽视的话题:Executors 工厂方法创建的默认线程池及其潜在的坑点,以及更安全、更可控的替代方案。
java.util.concurrent.Executors 类提供了一系列静态工厂方法,用于创建不同类型的线程池。虽然这些方法使用起来非常方便,但如果对其内部实现和适用场景不了解,很容易掉进坑里,导致系统性能下降甚至崩溃。
Executors 工厂方法提供的常见线程池类型
| 工厂方法 | 描述 | 适用场景 |
|---|---|---|
newFixedThreadPool(int nThreads) |
创建一个固定大小的线程池。线程池维护固定数量的线程,如果所有线程都在忙碌,则新提交的任务将在队列中等待,直到有线程空闲。 | 适用于执行CPU密集型任务,任务数量相对稳定,希望控制并发线程数量的场景。 |
newCachedThreadPool() |
创建一个可缓存的线程池。线程池的大小是不固定的,可以根据需要动态增加或减少线程的数量。如果线程池中的线程空闲超过一定时间(默认60秒),则会被回收。如果线程池中没有可用的线程,则会创建一个新的线程来执行任务。 | 适用于执行大量耗时较少的任务,任务到达率高,但执行时间短的场景。线程池可以动态地调整线程数量,以适应任务负载的变化。 |
newSingleThreadExecutor() |
创建一个单线程的线程池。线程池只有一个线程,所有提交的任务都将按照FIFO(先进先出)的顺序执行。 | 适用于需要保证任务顺序执行,并且不需要并发执行的场景。例如,处理GUI事件、单生产者单消费者模式等。 |
newScheduledThreadPool(int corePoolSize) |
创建一个可以执行延迟任务和周期性任务的线程池。线程池维护固定数量的线程,可以安排任务在指定的时间后执行,或者按照固定的频率重复执行。 | 适用于需要定时执行任务或者周期性执行任务的场景。例如,定时备份数据、定时发送邮件等。 |
默认线程池的坑点
虽然 Executors 提供的线程池使用起来非常方便,但它们都存在一些潜在的坑点,主要集中在以下两个方面:
-
newFixedThreadPool和newSingleThreadExecutor的LinkedBlockingQueue导致的OOM风险这两个方法都使用
LinkedBlockingQueue作为任务队列。LinkedBlockingQueue如果在构造时没有指定容量,默认是一个无界队列。这意味着,当任务提交速度远大于任务执行速度时,队列会无限增长,最终导致OutOfMemoryError(OOM)。// newFixedThreadPool 的源码片段 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } // newSingleThreadExecutor 的源码片段 public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }示例代码:
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class FixedThreadPoolOOM { public static void main(String[] args) throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(10); try { for (int i = 0; i < Integer.MAX_VALUE; i++) { final int taskNumber = i; executor.submit(() -> { try { // 模拟耗时任务 TimeUnit.MILLISECONDS.sleep(10); System.out.println("Task " + taskNumber + " completed by " + Thread.currentThread().getName()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); } } catch (Exception e) { System.err.println("Exception occurred: " + e.getMessage()); e.printStackTrace(); } finally { executor.shutdown(); executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); } } }在这个例子中,我们创建了一个固定大小为10的线程池,然后提交了大量的任务。由于任务执行速度慢于提交速度,任务队列会迅速增长,最终导致OOM。
-
newCachedThreadPool的线程无限制增长导致的资源耗尽newCachedThreadPool使用SynchronousQueue作为任务队列。SynchronousQueue不存储任何任务,每个提交的任务都必须立即找到一个可用的线程来执行,否则任务将被拒绝。这意味着,当任务到达率非常高时,newCachedThreadPool会不断创建新的线程,而空闲的线程会在60秒后被回收。如果任务持续高并发,线程池可能会无限制地增长,最终导致CPU、内存等资源耗尽。// newCachedThreadPool 的源码片段 public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }示例代码:
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class CachedThreadPoolResourceExhaustion { public static void main(String[] args) throws InterruptedException { ExecutorService executor = Executors.newCachedThreadPool(); try { for (int i = 0; i < 100000; i++) { // 模拟大量请求 final int taskNumber = i; executor.submit(() -> { try { // 模拟短耗时任务 TimeUnit.MILLISECONDS.sleep(1); System.out.println("Task " + taskNumber + " completed by " + Thread.currentThread().getName()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); } } catch (Exception e) { System.err.println("Exception occurred: " + e.getMessage()); e.printStackTrace(); } finally { executor.shutdown(); executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); } } }在这个例子中,我们创建了一个可缓存的线程池,然后提交了大量快速完成的任务。 由于任务到达率很高,线程池会迅速创建大量的线程来处理任务,如果长时间保持高并发,可能导致系统资源耗尽。
更安全的替代方案:手动创建 ThreadPoolExecutor
为了避免上述坑点,更安全的方式是手动创建 ThreadPoolExecutor,并显式地指定线程池的各项参数,包括核心线程数、最大线程数、队列类型、拒绝策略等。
ThreadPoolExecutor 的构造方法如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数说明:
corePoolSize: 核心线程数,线程池中始终保持的线程数量,即使线程处于空闲状态。maximumPoolSize: 最大线程数,线程池中允许的最大线程数量。keepAliveTime: 线程空闲时间,当线程池中的线程数量超过corePoolSize时,如果线程空闲的时间超过keepAliveTime,则会被回收,直到线程池中的线程数量等于corePoolSize。unit:keepAliveTime的时间单位。workQueue: 任务队列,用于存储等待执行的任务。threadFactory: 线程工厂,用于创建新的线程。handler: 拒绝策略,当任务队列已满且线程池中的线程数量达到maximumPoolSize时,用于处理新提交的任务。
选择合适的 BlockingQueue
选择合适的 BlockingQueue 非常重要,它直接影响线程池的性能和稳定性。常见的 BlockingQueue 类型包括:
| BlockingQueue 类型 | 描述 | 适用场景 |
|---|---|---|
LinkedBlockingQueue |
基于链表的阻塞队列,可以指定容量,也可以不指定容量(无界队列)。如果指定了容量,当队列已满时,新提交的任务将被阻塞,直到队列中有空闲位置。如果不指定容量,队列可以无限增长,但可能导致OOM。 | 适用于生产者速度远大于消费者速度,并且可以接受一定程度的延迟的场景。需要谨慎使用无界队列,避免OOM。 |
ArrayBlockingQueue |
基于数组的阻塞队列,必须指定容量。当队列已满时,新提交的任务将被阻塞,直到队列中有空闲位置。 | 适用于生产者速度和消费者速度相对平衡,并且对延迟比较敏感的场景。 |
PriorityBlockingQueue |
具有优先级的阻塞队列,可以根据任务的优先级来决定任务的执行顺序。 | 适用于需要根据任务的优先级来执行任务的场景。例如,需要优先处理紧急任务。 |
SynchronousQueue |
同步队列,不存储任何元素。每个插入操作必须等待一个相应的移除操作,反之亦然。 | 适用于线程之间需要直接传递数据,并且不需要缓冲的场景。通常用于构建高性能的线程池,例如 newCachedThreadPool。 |
DelayQueue |
延迟队列,队列中的元素都必须实现 Delayed 接口,可以指定元素的延迟时间。队列中的元素只有在延迟时间到期后才能被取出。 |
适用于需要延迟执行任务的场景。例如,定时清理过期数据、定时发送消息等。 |
选择合适的 RejectedExecutionHandler
RejectedExecutionHandler 用于处理当任务队列已满且线程池中的线程数量达到 maximumPoolSize 时,新提交的任务。常见的 RejectedExecutionHandler 类型包括:
| RejectedExecutionHandler 类型 | 描述 | 适用场景 |
|---|---|---|
AbortPolicy |
默认策略,直接抛出 RejectedExecutionException 异常。 |
适用于不能接受任务被拒绝的场景。 |
CallerRunsPolicy |
将任务交给提交任务的线程来执行。 | 适用于不希望丢弃任何任务,但可以接受任务执行时间延长的场景。 |
DiscardPolicy |
直接丢弃任务,不做任何处理。 | 适用于可以接受任务被丢弃,并且不需要进行任何处理的场景。 |
DiscardOldestPolicy |
丢弃队列中最老的任务,然后尝试执行新提交的任务。 | 适用于希望优先处理新提交的任务,并且可以接受旧任务被丢弃的场景。 |
示例代码:手动创建 ThreadPoolExecutor
import java.util.concurrent.*;
public class CustomThreadPool {
public static void main(String[] args) throws InterruptedException {
int corePoolSize = 5;
int maximumPoolSize = 10;
long keepAliveTime = 60L;
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100); // 使用有界队列
ThreadFactory threadFactory = Executors.defaultThreadFactory();
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy(); // 使用CallerRunsPolicy
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
threadFactory,
handler);
try {
for (int i = 0; i < 200; i++) {
final int taskNumber = i;
executor.submit(() -> {
try {
TimeUnit.MILLISECONDS.sleep(10);
System.out.println("Task " + taskNumber + " completed by " + Thread.currentThread().getName());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
} catch (Exception e) {
System.err.println("Exception occurred: " + e.getMessage());
e.printStackTrace();
} finally {
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
}
}
}
在这个例子中,我们手动创建了一个 ThreadPoolExecutor,并指定了核心线程数、最大线程数、空闲时间、时间单位、有界队列、线程工厂和拒绝策略。 通过使用有界队列和 CallerRunsPolicy 拒绝策略,我们可以避免OOM和资源耗尽的风险。
监控线程池状态
手动创建 ThreadPoolExecutor 之后,我们可以通过以下方法来监控线程池的状态:
getPoolSize(): 返回线程池中当前线程的数量。getActiveCount(): 返回线程池中正在执行任务的线程数量。getQueue().size(): 返回任务队列中等待执行的任务数量。getCompletedTaskCount(): 返回线程池中已完成的任务数量。getTaskCount(): 返回线程池中已提交的任务总数。
通过监控这些指标,我们可以及时发现线程池的性能瓶颈,并进行相应的调整。
总结:谨慎使用默认线程池,手动配置更安全
Executors 工厂方法提供的默认线程池使用方便,但存在OOM和资源耗尽的风险。 为了避免这些风险,更安全的方式是手动创建 ThreadPoolExecutor,并显式地指定线程池的各项参数。 同时,选择合适的 BlockingQueue 和 RejectedExecutionHandler,并监控线程池的状态,可以帮助我们构建更稳定、更可靠的并发程序。
务必了解线程池的参数才能有效避免坑
理解 ThreadPoolExecutor 的参数和工作原理,选择合适的队列和拒绝策略,才能有效避免潜在的OOM和资源耗尽风险。
手动配置线程池,更安全更可控
手动创建并配置 ThreadPoolExecutor,可以更好地控制线程池的行为,并根据实际需求进行优化。
监控线程池状态,及时发现问题
监控线程池的状态,可以及时发现性能瓶颈和潜在问题,并采取相应的措施进行调整和优化。