各位编程领域的同仁们,大家好!
今天,我们将深入探讨Java并发编程的核心基石之一:Runnable接口。这个看似简单的接口,却是Java多线程世界的起点,它与线程池、任务调度以及更高级的并发结构如invoke、batch和stream的底层线程分发机制紧密相连。作为一名编程专家,我将带领大家穿透表象,揭示这些机制在底层是如何协同工作,高效地管理和分发任务的。
一、Runnable接口:并发编程的基石
在Java中,如果你想让一段代码独立于主程序流运行,即并发执行,你就需要用到线程。而定义线程所执行的任务,最基本的方式就是实现Runnable接口。
1.1 什么是Runnable接口?
java.lang.Runnable是一个函数式接口,它只有一个抽象方法:
@FunctionalInterface
public interface Runnable {
/**
* 当一个实现了Runnable接口的对象作为参数传递给Thread构造函数时,
* 并调用Thread的start方法时,这个run方法就会在单独的线程中执行。
*/
public abstract void run();
}
它的核心职责是封装一个要在线程中执行的任务逻辑。run()方法不接受任何参数,也不返回任何结果(返回类型为void)。这种简洁性使其成为定义轻量级、无返回结果任务的理想选择。
1.2 Runnable与Thread的初次邂逅
最直接的使用Runnable的方式是将其传递给Thread类的构造函数,然后调用Thread对象的start()方法:
public class MyRunnableTask implements Runnable {
private String taskName;
public MyRunnableTask(String taskName) {
this.taskName = taskName;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 正在执行任务: " + taskName);
try {
// 模拟任务执行耗时
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " 的任务 " + taskName + " 被中断。");
Thread.currentThread().interrupt(); // 重新设置中断标志
}
System.out.println(Thread.currentThread().getName() + " 完成任务: " + taskName);
}
public static void main(String[] args) {
System.out.println("主线程开始...");
// 创建并启动多个线程
for (int i = 0; i < 3; i++) {
Runnable task = new MyRunnableTask("任务-" + (i + 1));
Thread thread = new Thread(task, "工作线程-" + (i + 1));
thread.start();
}
System.out.println("主线程结束。");
}
}
输出示例 (顺序可能不同):
主线程开始...
主线程结束。
工作线程-1 正在执行任务: 任务-1
工作线程-2 正在执行任务: 任务-2
工作线程-3 正在执行任务: 任务-3
工作线程-2 完成任务: 任务-2
工作线程-3 完成任务: 任务-3
工作线程-1 完成任务: 任务-1
从输出可以看出,主线程在启动了其他线程后就继续执行,而不会等待它们完成。这是并发的本质。
1.3 Runnable与Callable的对比
虽然Runnable是基础,但在很多场景下,我们希望任务执行后能返回一个结果,或者在执行过程中抛出异常。这时,java.util.concurrent.Callable接口就派上用场了。
| 特性 | Runnable |
Callable<V> |
|---|---|---|
| 方法签名 | void run() |
V call() throws Exception |
| 返回结果 | 无(void) |
有(泛型类型V) |
| 异常处理 | 只能通过try-catch在run()方法内部处理 |
可以声明抛出异常(throws Exception) |
| 用途 | 定义不需要返回结果的任务 | 定义需要返回结果或抛出检查异常的任务 |
| 配合接口 | 通常与Thread或ExecutorService.execute() |
通常与ExecutorService.submit()配合使用,返回Future |
尽管Callable提供了更丰富的功能,但Runnable仍然是许多底层并发机制(尤其是线程池)的核心,因为线程池中的工作线程本质上执行的就是Runnable任务。当提交Callable任务时,线程池通常会将其包装成一个特殊的Runnable(例如FutureTask),以便在线程池中执行。
二、从手动创建线程到线程池:效率与管理的飞跃
直接创建Thread对象并启动虽然简单,但存在诸多问题:
- 资源消耗大:每次创建和销毁线程都有不小的开销。
- 管理复杂:难以控制并发线程的数量,过多线程可能导致系统资源耗尽。
- 任务调度:手动管理任务的提交和执行非常繁琐。
为了解决这些问题,Java 5引入了Executor框架,其中最核心的是ExecutorService。
2.1 ExecutorService:线程池的核心接口
ExecutorService是Executor的子接口,它提供了管理任务生命周期的方法,以及关闭线程池的方法。它将任务提交与任务执行解耦,由线程池来负责线程的创建、复用和管理。
主要的提交任务方法:
void execute(Runnable command): 提交一个Runnable任务,不返回结果。<T> Future<T> submit(Callable<T> task): 提交一个Callable任务,返回一个Future对象,用于获取任务结果或取消任务。Future<?> submit(Runnable task): 提交一个Runnable任务,返回一个Future对象,其get()方法在任务完成时返回null。<T> Future<T> submit(Runnable task, T result): 提交一个Runnable任务,返回一个Future对象,其get()方法在任务完成时返回指定的result。
2.2 Executors工厂类:快速创建线程池
java.util.concurrent.Executors类提供了一系列静态工厂方法,用于方便地创建不同类型的ExecutorService:
| 方法 | 描述 | 适用场景 |
|---|---|---|
newFixedThreadPool(int nThreads) |
创建固定大小的线程池。池中的线程数量始终不变。 | 适用于负载相对稳定、任务执行时间相近的场景,避免资源过度消耗。 |
newCachedThreadPool() |
创建一个可缓存的线程池。如果池中没有可用线程,则创建新线程;如果线程空闲时间超过60秒,则终止并从缓存中移除。 | 适用于执行大量短期任务的场景,能有效减少线程创建开销。 |
newSingleThreadExecutor() |
创建一个单线程的执行器。所有任务都按顺序在一个线程中执行。 | 适用于需要保证任务顺序执行的场景,或不希望任务并行执行。 |
newScheduledThreadPool(int corePoolSize) |
创建一个定长的线程池,支持定时及周期性任务执行。 | 适用于需要定时或周期性执行任务的场景(如心跳检测、定时刷新)。 |
newWorkStealingPool() (Java 8+) |
创建一个ForkJoinPool,使用工作窃取算法。 |
适用于并行处理大量小任务,如并行流操作。 |
使用 ExecutorService 提交 Runnable 任务示例:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ExecutorServiceRunnableDemo {
public static void main(String[] args) throws InterruptedException {
System.out.println("主线程开始...");
// 创建一个固定大小为3的线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 提交10个Runnable任务到线程池
for (int i = 0; i < 10; i++) {
final int taskId = i + 1;
executor.execute(() -> { // 使用Lambda表达式作为Runnable
System.out.println(Thread.currentThread().getName() + " 正在执行任务: " + taskId);
try {
Thread.sleep((long) (Math.random() * 500));
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " 的任务 " + taskId + " 被中断。");
Thread.currentThread().interrupt();
}
System.out.println(Thread.currentThread().getName() + " 完成任务: " + taskId);
});
}
// 关闭线程池,不再接受新任务,但会等待已提交任务完成
executor.shutdown();
// 等待所有任务执行完毕,最多等待10秒
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
System.err.println("部分任务未在指定时间内完成,强制关闭。");
executor.shutdownNow(); // 尝试立即停止所有正在执行的任务
}
System.out.println("主线程结束。所有任务已完成或被终止。");
}
}
输出示例 (顺序可能不同):
主线程开始...
pool-1-thread-1 正在执行任务: 1
pool-1-thread-2 正在执行任务: 2
pool-1-thread-3 正在执行任务: 3
pool-1-thread-1 完成任务: 1
pool-1-thread-1 正在执行任务: 4
pool-1-thread-2 完成任务: 2
pool-1-thread-2 正在执行任务: 5
pool-1-thread-3 完成任务: 3
pool-1-thread-3 正在执行任务: 6
...
主线程结束。所有任务已完成或被终止。
可以看到,线程池中的3个线程复用执行了10个任务,有效管理了线程资源。
三、深入探讨invoke、batch、stream在底层如何处理线程池分发
现在,我们将聚焦到更高级的任务分发模式:invoke、batch和stream。虽然它们不都是直接作用于Runnable的API,但其核心思想都是如何高效地利用线程池来并行处理任务。
3.1 invoke系列:批量提交与等待
ExecutorService提供了invokeAll()和invokeAny()两个方法,用于批量提交Callable任务并等待它们的结果。尽管它们直接接收Callable而不是Runnable,但理解它们如何与线程池交互,对于理解任务分发至关重要。如果需要提交Runnable任务并使用invoke系列的语义,可以将其包装成Callable。
3.1.1 invokeAll(Collection<? extends Callable<T>> tasks)
这个方法会执行给定的所有任务,并返回一个Future列表,其中每个Future对应一个任务的结果。它会阻塞直到所有任务都完成(正常完成或异常终止),或者达到超时时间。
底层线程池分发:
- 任务提交:当调用
invokeAll()时,ExecutorService会遍历传入的Callable集合,将每个Callable包装成一个FutureTask(FutureTask本身实现了RunnableFuture,而RunnableFuture继承了Runnable和Future),然后提交到线程池的内部工作队列中。 - 线程调度:线程池中的工作线程会从队列中取出这些
FutureTask并执行它们的run()方法。 - 结果收集:
invokeAll()会创建一个内部的CompletionService或类似机制来跟踪所有FutureTask的完成状态。它会等待所有FutureTask的isDone()方法返回true,然后按照原始任务的顺序返回对应的Future对象列表。这意味着即使某些任务先完成,invokeAll也会等待所有任务都完成后才返回。
代码示例 (包装 Runnable 为 Callable):
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class InvokeAllDemo {
public static void main(String[] args) throws InterruptedException {
System.out.println("主线程开始...");
ExecutorService executor = Executors.newFixedThreadPool(3);
List<Callable<String>> tasks = new ArrayList<>();
for (int i = 0; i < 5; i++) {
final int taskId = i + 1;
// 将Runnable任务包装成Callable,返回一个固定结果
Callable<String> callableTask = () -> {
System.out.println(Thread.currentThread().getName() + " 正在执行 invokeAll 任务: " + taskId);
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " 的 invokeAll 任务 " + taskId + " 被中断。");
Thread.currentThread().interrupt();
throw new InterruptedException("任务中断"); // 抛出异常
}
System.out.println(Thread.currentThread().getName() + " 完成 invokeAll 任务: " + taskId);
return "任务 " + taskId + " 完成";
};
tasks.add(callableTask);
}
try {
// 提交所有任务并等待它们完成,最多等待5秒
System.out.println("提交 invokeAll 任务,等待所有完成...");
List<Future<String>> futures = executor.invokeAll(tasks, 5, TimeUnit.SECONDS);
System.out.println("所有 invokeAll 任务已完成或超时,开始获取结果:");
for (int i = 0; i < futures.size(); i++) {
Future<String> future = futures.get(i);
try {
if (future.isDone()) {
System.out.println("任务 " + (i + 1) + " 结果: " + future.get());
} else if (future.isCancelled()) {
System.out.println("任务 " + (i + 1) + " 被取消。");
} else {
System.out.println("任务 " + (i + 1) + " 未完成。");
}
} catch (CancellationException e) {
System.err.println("任务 " + (i + 1) + " 被取消: " + e.getMessage());
} catch (ExecutionException e) {
System.err.println("任务 " + (i + 1) + " 执行异常: " + e.getCause().getMessage());
}
}
} catch (InterruptedException e) {
System.err.println("主线程等待任务时被中断。");
Thread.currentThread().interrupt();
} finally {
executor.shutdown();
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
System.out.println("主线程结束。");
}
}
}
3.1.2 invokeAny(Collection<? extends Callable<T>> tasks)
这个方法会执行给定的所有任务,并返回其中一个成功完成的任务的结果(如果多个任务同时成功,返回其中一个)。一旦有一个任务成功完成,其他所有未完成的任务都会被取消。它会阻塞直到有任务成功完成,或者所有任务都失败/超时。
底层线程池分发:
- 任务提交与跟踪:与
invokeAll()类似,每个Callable都会被提交到线程池。但invokeAny()会使用一个CompletionService来异步地跟踪任务的完成情况。 - 竞争与取消:
CompletionService会按完成顺序将Future放入一个队列。当第一个成功的Future出现时,invokeAny()会获取其结果,然后尝试取消所有其他尚未完成的任务。 - 效率:对于大量任务中只需要一个结果的场景,
invokeAny()可以显著提高效率,因为它不需要等待所有任务完成。
3.2 batch处理:按批次分发任务
batch(批处理)并非Java并发API中的一个具体方法,而是一种常见的编程模式。它指的是将大量同类型或相关联的任务,按照一定的数量或逻辑分组,然后将这些分组(或组内的任务)提交给线程池进行并发处理。这种模式在处理大数据集、文件上传下载、数据库操作等场景中非常普遍。
底层线程池分发:
- 任务切分:首先,一个大的任务列表会被逻辑上或物理上切分成多个较小的子列表(批次)。
- 批次提交:每个批次可以作为一个整体(例如,通过一个
Callable包装器来处理整个批次),或者批次内的每个独立任务(Runnable或Callable)分别提交给ExecutorService。 - 资源利用:批处理有助于平衡任务提交的开销与线程池的利用率。如果任务太小,频繁提交会增加调度开销;如果任务太大,则可能导致某些批次长时间占用线程,降低并行度。
- 控制流:批处理通常结合
invokeAll()或CompletionService来等待批次任务的完成,以便进行聚合、错误处理或启动下一个批次。
代码示例 (手动批处理 Runnable 任务):
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class BatchProcessingDemo {
public static void main(String[] args) throws InterruptedException {
System.out.println("主线程开始...");
// 模拟100个Runnable任务
List<Runnable> allTasks = new ArrayList<>();
for (int i = 0; i < 100; i++) {
final int taskId = i + 1;
allTasks.add(() -> {
System.out.println(Thread.currentThread().getName() + " 正在处理批次任务: " + taskId);
try {
Thread.sleep((long) (Math.random() * 200)); // 模拟较短的执行时间
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " 的批次任务 " + taskId + " 被中断。");
Thread.currentThread().interrupt();
}
// System.out.println(Thread.currentThread().getName() + " 完成批次任务: " + taskId); // 频繁打印会影响性能
});
}
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); // 使用CPU核心数
int batchSize = 20; // 定义批次大小
System.out.println("开始进行批处理,每批次 " + batchSize + " 个任务...");
for (int i = 0; i < allTasks.size(); i += batchSize) {
int end = Math.min(i + batchSize, allTasks.size());
List<Runnable> currentBatch = allTasks.subList(i, end);
// 将批次中的每个Runnable任务提交给线程池
List<Future<Void>> futures = new ArrayList<>();
for (Runnable task : currentBatch) {
// submit(Runnable task, T result) 可以返回一个Future
futures.add(executor.submit(task, null)); // result可以是null,表示不需要具体返回值
}
// 等待当前批次的所有任务完成
for (Future<Void> future : futures) {
try {
future.get(); // 阻塞直到当前任务完成
} catch (InterruptedException | ExecutionException e) {
System.err.println("批次任务执行异常: " + e.getMessage());
Thread.currentThread().interrupt();
}
}
System.out.println("批次 " + ((i / batchSize) + 1) + " 的 " + currentBatch.size() + " 个任务已完成。");
}
executor.shutdown();
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
System.err.println("部分批次任务未在指定时间内完成,强制关闭。");
executor.shutdownNow();
}
System.out.println("主线程结束。所有批次任务已完成。");
}
}
这个示例展示了如何将100个Runnable任务分成5批,每批20个。每提交完一个批次的所有任务后,主线程会等待该批次的所有任务完成,然后再处理下一个批次。这种方式可以有效控制并发度,并允许在批次级别进行错误处理和进度报告。
3.3 stream处理:并行流与ForkJoinPool
Java 8引入的Stream API,特别是并行流(parallelStream()),为集合数据的并行处理提供了声明式、函数式的方式。虽然并行流不直接接受Runnable对象,但其底层机制——ForkJoinPool——正是线程池的一种高级形式,用于高效地分发和执行任务。并行流中的每个操作(如map、filter、forEach等)都可以看作是一个个小型的“任务”,这些任务会被ForkJoinPool分发到不同的线程上执行。
底层线程池分发 (ForkJoinPool):
- 工作窃取 (Work-Stealing):
ForkJoinPool与传统的ThreadPoolExecutor最大的不同在于其工作窃取算法。每个工作线程都有一个双端队列(Deque),用于存放它自己的任务。当一个线程完成自己的任务后,它不会闲置,而是会“窃取”其他线程队列尾部的任务来执行。这最大限度地减少了线程空闲时间,提高了资源利用率。 - 分治法 (Divide and Conquer):
ForkJoinPool专门设计用于执行ForkJoinTask(例如RecursiveTask和RecursiveAction)。这些任务通常通过递归地将大问题分解为小问题来解决(fork),然后并行执行这些小问题,最后将结果合并(join)。 - 默认公共池:并行流默认使用
ForkJoinPool.commonPool()。这个公共池是JVM共享的,其线程数通常等于CPU核心数,以避免上下文切换的开销。 Runnable的间接参与:当你在并行流中使用Lambda表达式(如forEach中的Consumer,map中的Function)时,这些Lambda所代表的操作逻辑,实际上就是ForkJoinPool中的工作线程所执行的“任务”,类似于Runnable所封装的无返回值任务。
代码示例 (并行流处理数据):
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class ParallelStreamDemo {
public static void main(String[] args) {
System.out.println("主线程开始...");
// 创建一个包含1000个数字的列表
List<Integer> numbers = IntStream.rangeClosed(1, 1000)
.boxed()
.collect(Collectors.toList());
System.out.println("开始使用并行流处理数据...");
long startTime = System.currentTimeMillis();
// 使用并行流对每个数字进行平方操作,并收集结果
List<Integer> squaredNumbers = numbers.parallelStream() // 开启并行流
.map(number -> {
// 这里的Lambda表达式代表了一个任务单元
// 它会在ForkJoinPool中的不同线程上执行
String threadName = Thread.currentThread().getName();
// System.out.println(threadName + " 处理数字: " + number); // 频繁打印会影响性能
try {
Thread.sleep(1); // 模拟少量计算
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return number * number;
})
.collect(Collectors.toList()); // 收集结果
long endTime = System.currentTimeMillis();
System.out.println("并行流处理完成。耗时: " + (endTime - startTime) + " ms");
// System.out.println("部分平方结果: " + squaredNumbers.subList(0, 10)); // 打印前10个结果
System.out.println("主线程结束。");
}
}
通过parallelStream(),map操作中定义的Lambda表达式会被自动地分解并分发到ForkJoinPool的多个工作线程上并行执行。我们不需要手动管理线程,ForkJoinPool会负责任务的切分和合并。
3.4 核心对比表格
| 特性 | ExecutorService.execute(Runnable) |
ExecutorService.invokeAll(Callable) |
Batch Processing (自定义) |
Stream.parallel() (ForkJoinPool) |
|---|---|---|---|---|
| 任务类型 | Runnable |
Callable (需包装Runnable获取结果) |
Runnable或Callable |
函数式接口 (Consumer, Function) |
| 返回结果 | 无 | 列表Future (阻塞等待所有) |
依赖具体实现 | 收集后的数据集合 |
| 阻塞特性 | 非阻塞 (任务提交后立即返回) | 阻塞 (等待所有任务完成) | 灵活 (可阻塞等待批次完成) | 阻塞 (等待所有并行操作完成) |
| 任务管理 | 简单提交 | 批量提交,等待所有结果 | 灵活的任务分组与提交 | 自动分解与执行 |
| 线程池 | ThreadPoolExecutor及其子类 |
ThreadPoolExecutor及其子类 |
ThreadPoolExecutor及其子类 |
ForkJoinPool (commonPool) |
| 适用场景 | 独立、无结果的任务 | 需要所有任务结果的批量操作 | 大量任务分阶段处理,控制并发 | 大规模数据集合的并行转换或聚合 |
| 核心分发机制 | 工作队列 (FIFO/LIFO) | 工作队列 + 内部CompletionService | 工作队列 (手动控制批次) | 双端队列 + 工作窃取算法 |
四、底层线程池分发机制:ThreadPoolExecutor的秘密
前面我们多次提到了线程池,现在是时候深入了解其核心实现:java.util.concurrent.ThreadPoolExecutor,以及它是如何进行任务分发的。Executors工厂方法创建的各种线程池,其底层都是ThreadPoolExecutor的定制化实例。
4.1 ThreadPoolExecutor的核心参数
理解ThreadPoolExecutor的工作原理,首先要掌握它的构造函数参数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
| 参数名称 | 描述 | 对任务分发的影响 |
|---|---|---|
corePoolSize |
核心线程数。即使空闲,也保持这些线程存活。 | 当提交任务时,如果当前运行线程数小于corePoolSize,则会创建新线程来执行任务,直到达到corePoolSize。 |
maximumPoolSize |
最大线程数。线程池中允许存在的最大线程数量。 | 当任务队列已满,且当前运行线程数小于maximumPoolSize时,会创建新线程来执行任务。 |
keepAliveTime |
当线程数大于corePoolSize时,空闲线程在终止前等待新任务的最长时间。 |
影响非核心线程的生命周期。如果任务量波动大,keepAliveTime过小可能导致频繁创建销毁线程。 |
unit |
keepAliveTime的单位。 |
同上。 |
workQueue |
用于存放待执行任务的阻塞队列。 | 这是任务分发的关键。 当核心线程都在忙碌时,新提交的任务会进入此队列等待。队列的选择直接影响线程池行为。 |
threadFactory |
用于创建新线程的工厂。 | 可以自定义线程的命名、优先级、是否为守护线程等,便于调试和管理。 |
handler |
当线程池拒绝任务时(队列已满且达到maximumPoolSize)的处理策略。 |
当线程池过载时,决定如何处理新来的任务,是丢弃、阻塞、还是交给提交者执行。 |
4.2 workQueue:任务的缓冲区
workQueue是ThreadPoolExecutor实现任务分发的核心组件。它是一个BlockingQueue,这意味着它在队列为空时会阻塞生产者(工作线程获取任务),在队列已满时会阻塞消费者(任务提交者)。常见的阻塞队列类型:
ArrayBlockingQueue(有界队列):基于数组实现,固定容量。如果队列满了,新任务会被拒绝。- 分发影响:任务提交者在核心线程忙碌且队列已满时,会尝试创建新线程(直到
maximumPoolSize),如果还不行,则拒绝任务。适合控制内存使用,避免过多的待处理任务。
- 分发影响:任务提交者在核心线程忙碌且队列已满时,会尝试创建新线程(直到
LinkedBlockingQueue(无界队列):基于链表实现,默认容量为Integer.MAX_VALUE。- 分发影响:当核心线程忙碌时,新任务会无限制地排队。这意味着
maximumPoolSize参数将形同虚设,因为队列永远不会“满”到需要创建新线程来处理(除非线程数低于corePoolSize)。这可能导致内存溢出,应谨慎使用。
- 分发影响:当核心线程忙碌时,新任务会无限制地排队。这意味着
SynchronousQueue(同步队列):一个没有容量的队列。每个插入操作必须等待一个相应的移除操作,反之亦然。- 分发影响:当核心线程忙碌时,新任务不会进入队列,而是直接尝试创建新线程(直到
maximumPoolSize)。如果达到最大线程数,则拒绝任务。适合任务提交速度快,但任务处理时间短的场景,可以避免任务排队延迟。
- 分发影响:当核心线程忙碌时,新任务不会进入队列,而是直接尝试创建新线程(直到
PriorityBlockingQueue(优先级队列):一个支持优先级的无界队列。- 分发影响:任务按照优先级高低执行,而不是提交顺序。
任务分发流程概览:
- 当一个任务通过
execute()或submit()方法提交给ThreadPoolExecutor时: - 如果当前运行线程数小于
corePoolSize,即使有空闲的核心线程,也会创建一个新线程来执行任务(通常是为了快速启动)。 - 如果当前运行线程数大于或等于
corePoolSize,任务会被尝试放入workQueue。 - 如果
workQueue已满(对于有界队列),且当前运行线程数小于maximumPoolSize,则会创建一个新线程来执行任务。 - 如果
workQueue已满,且当前运行线程数已达到maximumPoolSize,则根据RejectedExecutionHandler策略拒绝任务。
4.3 RejectedExecutionHandler:应对过载
当线程池和工作队列都已满,无法接受新任务时,RejectedExecutionHandler会介入。Java提供了四种内置策略:
ThreadPoolExecutor.AbortPolicy(默认):直接抛出RejectedExecutionException异常。ThreadPoolExecutor.CallerRunsPolicy:由提交任务的线程(调用execute()或submit()的线程)自己来执行这个任务。这会减缓新任务的提交速度,从而为线程池争取处理现有任务的时间。ThreadPoolExecutor.DiscardPolicy:默默地丢弃新提交的任务,不抛出任何异常。ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列中等待时间最长的任务,然后尝试重新提交当前任务。
选择合适的拒绝策略对于系统在高负载下的行为至关重要。
4.4 ThreadFactory:定制你的工作线程
ThreadFactory是一个接口,它只有一个方法newThread(Runnable r)。通过实现这个接口并将其传递给ThreadPoolExecutor构造函数,你可以自定义线程池创建的每个线程。这对于设置线程名称、优先级、守护状态以及捕获未捕获异常等场景非常有用。
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class CustomThreadFactory implements ThreadFactory {
private final String namePrefix;
private final AtomicInteger nextId = new AtomicInteger(1);
public CustomThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
}
@Override
public Thread newThread(Runnable r) {
String name = namePrefix + nextId.getAndIncrement();
Thread thread = new Thread(r, name);
System.out.println("创建新线程: " + name);
// 可以设置线程为守护线程,优先级等
// thread.setDaemon(true);
// thread.setPriority(Thread.NORM_PRIORITY);
return thread;
}
}
将这个工厂传递给ThreadPoolExecutor,你就能看到自定义的线程名称出现在日志中。
五、实践考量与最佳实践
5.1 选择合适的线程池类型和参数
- CPU密集型任务:
corePoolSize和maximumPoolSize通常设为CPU核心数 + 1或CPU核心数。因为这类任务大部分时间都在执行计算,过多的线程会导致频繁上下文切换,降低效率。队列通常选择SynchronousQueue,直接尝试创建线程或拒绝任务。 - IO密集型任务:
corePoolSize和maximumPoolSize通常可以设置得比CPU核心数大很多(例如2 * CPU核心数到N * CPU核心数,N取决于IO等待时间),因为线程在等待IO时不会占用CPU。队列通常选择LinkedBlockingQueue或ArrayBlockingQueue。 - 混合型任务:需要仔细分析任务特性,可能需要将任务拆分为CPU密集型和IO密集型子任务,分别使用不同的线程池。
- 定时任务:使用
ScheduledThreadPoolExecutor。 - 大量短期任务:
CachedThreadPool。
5.2 优雅地关闭线程池
始终记得在应用程序生命周期结束时调用executor.shutdown()。这会停止接受新任务,并等待所有已提交任务完成。如果需要更紧急的关闭,可以使用executor.shutdownNow(),它会尝试中断所有正在执行的任务并清空队列。
executor.shutdown(); // 不再接受新任务,等待现有任务完成
try {
// 最多等待60秒,看所有任务是否完成
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow(); // 强制关闭
System.err.println("线程池强制关闭。");
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
5.3 异常处理
Runnable的run()方法不抛出受检异常,这意味着任何运行时异常都必须在内部捕获。如果未捕获,它们会导致线程终止,但不会传播到提交任务的线程。为了更好地处理这种情况,可以:
- 在
run()方法内部进行try-catch。 - 为线程池提供一个
UncaughtExceptionHandler。 - 使用
submit()方法并获取Future,通过Future.get()来捕获ExecutionException(它包装了实际的任务异常)。
5.4 线程安全与同步
当多个Runnable任务并发访问共享资源时,必须确保线程安全。这通常通过synchronized关键字、java.util.concurrent.locks包中的锁、java.util.concurrent.atomic包中的原子操作以及并发集合(如ConcurrentHashMap)来实现。
5.5 监控与调优
在高并发系统中,监控线程池的状态至关重要。ThreadPoolExecutor提供了一些方法来获取当前状态信息,如getActiveCount()(当前活跃线程数)、getCompletedTaskCount()(已完成任务数)、getQueue().size()(队列中等待的任务数)等。结合日志和监控工具,可以及时发现并解决性能瓶颈。
结语
从最基础的Runnable接口,到复杂的ThreadPoolExecutor参数配置,再到invoke、batch和stream等高级任务分发模式,我们一同探索了Java并发编程的深度和广度。理解这些底层机制,特别是线程池如何根据其配置和工作队列来分发和调度任务,是构建高效、健壮并发应用程序的关键。希望今天的讲座能为您在并发编程的道路上点亮一盏明灯,助您设计出更加优异的系统。