ForkJoinPool源码深度解析:工作窃取(Work Stealing)算法与并行计算
大家好,今天我们来深入探讨Java并发包中一个非常重要的组件:ForkJoinPool,以及它所依赖的核心算法——工作窃取(Work Stealing)。ForkJoinPool是Java 7引入的一个线程池,专门用于支持并行计算,特别是那些可以分解成更小任务的任务。理解ForkJoinPool的工作原理,对于编写高效的并发程序至关重要。
1. 并行计算的需求与传统线程池的局限性
在多核处理器日益普及的今天,充分利用硬件资源进行并行计算变得越来越重要。传统的线程池(如ThreadPoolExecutor)虽然能够管理线程的生命周期,并减少线程创建和销毁的开销,但它们在处理计算密集型、可分解的任务时存在一些局限性:
- 任务分配不均: 传统的线程池通常采用集中式的任务队列,容易造成某些线程空闲,而另一些线程忙于处理任务的现象,即负载不均衡。
- 上下文切换开销: 当任务执行时间较长,且线程数量较少时,线程可能会频繁地进行上下文切换,降低整体性能。
- 难以适应递归分解的任务: 对于可以递归分解成更小任务的任务(例如:归并排序),传统线程池处理起来比较繁琐,需要手动管理任务的依赖关系。
2. 工作窃取(Work Stealing)算法:解决负载均衡的利器
为了解决上述问题,ForkJoinPool引入了工作窃取(Work Stealing)算法。该算法的核心思想是:每个线程都有自己的工作队列(Deque),当一个线程完成自己队列中的任务后,它会尝试从其他线程的队列尾部“窃取”任务来执行。
工作窃取算法的优势:
- 负载均衡: 线程从其他线程的队列窃取任务,可以有效地实现负载均衡,避免某些线程空闲,而另一些线程过载的情况。
- 减少竞争: 每个线程主要操作自己的队列,只有在需要窃取任务时才访问其他线程的队列,从而减少了线程之间的竞争。
- 适应递归分解的任务: 工作窃取算法天然适合处理可以递归分解的任务,每个子任务都可以放入线程自己的队列中,方便高效地执行。
工作窃取算法的缺点:
- 增加了开销: 窃取任务的过程会带来一定的开销,例如需要获取锁或者进行原子操作。
- 可能存在延迟: 当所有线程都有任务时,窃取操作可能会失败,导致一些线程需要等待。
3. ForkJoinPool的核心组件与源码分析
ForkJoinPool由多个核心组件组成,它们协同工作来实现工作窃取算法:
ForkJoinPool: 这是ForkJoinPool的主要类,负责创建和管理线程池,以及提交任务。ForkJoinWorkerThread: 这是ForkJoinPool中的工作线程,它会执行任务,并尝试从其他线程的队列中窃取任务。ForkJoinTask: 这是所有提交给ForkJoinPool的任务的基类。它定义了fork()和join()方法,用于分解任务和等待子任务完成。WorkQueue: 这是每个ForkJoinWorkerThread维护的双端队列,用于存储待执行的任务。
3.1 ForkJoinPool的构造函数:
ForkJoinPool提供了多个构造函数,其中最常用的构造函数如下:
public ForkJoinPool(int parallelism) {
this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
parallelism:指定线程池的并行度,即线程的数量。通常情况下,建议将并行度设置为CPU的核心数。defaultForkJoinWorkerThreadFactory:默认的线程工厂,用于创建ForkJoinWorkerThread。uncaughtExceptionHandler:用于处理未捕获的异常。asyncMode:指定是否使用异步模式。
3.2 ForkJoinTask:任务的抽象
ForkJoinTask是一个抽象类,它定义了fork()和join()方法。
fork(): 用于将任务放入线程池的队列中,并异步执行。实际上是将任务放入当前线程的WorkQueue中。join(): 用于等待任务完成,并获取任务的结果。如果任务尚未完成,则会阻塞当前线程,直到任务完成。
ForkJoinTask有两个常用的子类:
RecursiveAction: 用于执行没有返回值的任务。RecursiveTask<V>: 用于执行有返回值的任务。
3.3 WorkQueue:工作队列
WorkQueue是每个ForkJoinWorkerThread维护的双端队列(Deque)。它用于存储待执行的任务。
push(ForkJoinTask<?> task): 将任务放入队列的头部。poll(): 从队列的头部取出一个任务。pollLast(): 从队列的尾部取出一个任务(用于工作窃取)。peek(): 返回队列头部的任务,但不移除。isEmpty(): 检查队列是否为空。
3.4 工作窃取的核心逻辑:
当一个ForkJoinWorkerThread完成自己队列中的任务后,它会尝试从其他线程的队列尾部“窃取”任务来执行。
final ForkJoinTask<?> steal(WorkQueue victim) {
ForkJoinTask<?> t = null;
for (;;) {
int h = victim.base;
int l = victim.top;
if (h >= l)
break;
if (U.compareAndSwapInt(victim, QBASE, h, h + 1)) {
t = victim.array[h & victim.mask];
if (t != null) {
victim.array[h & victim.mask] = null; // clear slot
victim.base = h + 1;
break;
} else {
victim.base = h + 1;
// try to recover
}
}
}
return t;
}
这段代码展示了ForkJoinPool中工作窃取的核心逻辑。它从目标线程的WorkQueue的尾部尝试窃取任务。使用CAS(Compare-and-Swap)操作来保证线程安全。
3.5 ForkJoinWorkerThread:工作线程
ForkJoinWorkerThread是ForkJoinPool中的工作线程,它负责执行任务,并尝试从其他线程的队列中窃取任务。ForkJoinWorkerThread的核心方法是run()方法,它会不断地从自己的队列中取出任务执行,或者从其他线程的队列中窃取任务执行。
protected void onStart() {
}
protected void onTermination(Throwable exception) {
}
public void run() {
Throwable exception = null;
try {
onStart();
ForkJoinPool p = pool;
if (p != null) {
for (ForkJoinTask<?> task = firstTask; task != null || (task = p.nextTask(this)) != null;) {
if (task.status < 0)
break;
task.exec();
}
}
} catch (Throwable ex) {
exception = ex;
} finally {
try {
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
pool.deregisterWorker(this, exception);
}
}
}
nextTask(this)负责从当前线程的WorkQueue或者其他线程的WorkQueue获取任务。
4. 一个简单的例子:使用ForkJoinPool计算斐波那契数列
下面是一个使用ForkJoinPool计算斐波那契数列的例子:
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
class Fibonacci extends RecursiveTask<Integer> {
final int n;
Fibonacci(int n) { this.n = n; }
@Override
protected Integer compute() {
if (n <= 1)
return n;
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
return f2.compute() + f1.join();
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
Fibonacci task = new Fibonacci(40);
long startTime = System.currentTimeMillis();
Integer result = pool.invoke(task);
long endTime = System.currentTimeMillis();
System.out.println("Fibonacci(" + 40 + ") = " + result);
System.out.println("Time taken: " + (endTime - startTime) + " ms");
}
}
在这个例子中,我们将计算斐波那契数列的任务分解成更小的子任务,并使用fork()方法将子任务放入线程池的队列中。join()方法用于等待子任务完成,并获取子任务的结果。
5. ForkJoinPool的源码分析:提交任务与任务执行
当我们向ForkJoinPool提交一个任务时,会发生什么?
- 任务提交: 使用
ForkJoinPool.invoke(ForkJoinTask<V> task)或者ForkJoinPool.submit(ForkJoinTask<V> task)提交任务。 - 任务放入队列: 提交的任务会被放入提交线程对应的
WorkQueue中。如果没有对应线程,则放入公共队列(common queue)。 - 线程执行任务:
ForkJoinWorkerThread会不断地从自己的WorkQueue中取出任务执行,或者从其他线程的WorkQueue中窃取任务执行。
5.1 任务提交的源码分析:
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
return task;
}
private void externalPush(ForkJoinTask<?> task) {
int r = ThreadLocalRandom.getProbe();
return; //TODO
}
externalPush方法将任务推入工作队列中,这里简化了实现,实际实现涉及到随机数生成和工作队列的选取。
5.2 任务执行的源码分析:
任务执行的核心逻辑在ForkJoinWorkerThread.run()方法中。
public void run() {
Throwable exception = null;
try {
onStart();
ForkJoinPool p = pool;
if (p != null) {
for (ForkJoinTask<?> task = firstTask; task != null || (task = p.nextTask(this)) != null;) {
if (task.status < 0)
break;
task.exec();
}
}
} catch (Throwable ex) {
exception = ex;
} finally {
try {
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
pool.deregisterWorker(this, exception);
}
}
}
p.nextTask(this)方法负责从当前线程的WorkQueue或者其他线程的WorkQueue获取任务。task.exec()方法执行任务。
6. ForkJoinPool的适用场景与注意事项
ForkJoinPool适用于以下场景:
- 计算密集型任务:
ForkJoinPool能够充分利用多核处理器的资源,加速计算密集型任务的执行。 - 可分解的任务:
ForkJoinPool特别适合处理可以递归分解成更小任务的任务,例如:归并排序、快速排序等。 - 需要并行执行的任务:
ForkJoinPool可以并行执行多个任务,提高整体性能。
使用ForkJoinPool的注意事项:
- 避免阻塞操作: 在
ForkJoinTask中避免进行阻塞操作,否则会影响线程池的性能。 - 合理设置并行度: 并行度应该设置为CPU的核心数,或者略大于CPU的核心数。
- 注意任务的粒度: 任务的粒度应该适中,太小会导致线程切换的开销增加,太大则无法充分利用多核处理器的资源。
- 避免共享可变状态: 尽量避免在不同的
ForkJoinTask之间共享可变状态,否则需要进行同步处理,增加程序的复杂性。
7. ForkJoinPool的配置参数
ForkJoinPool提供了一些配置参数,可以根据实际情况进行调整:
| 参数 | 描述 | 默认值 |
|---|---|---|
parallelism |
指定线程池的并行度,即线程的数量。 | CPU核心数 |
ForkJoinWorkerThreadFactory |
用于创建ForkJoinWorkerThread的工厂类。 |
defaultForkJoinWorkerThreadFactory |
UncaughtExceptionHandler |
用于处理未捕获的异常。 | null |
asyncMode |
指定是否使用异步模式。如果设置为true,则任务会以异步的方式执行。 |
false |
8. 使用得当,才能发挥最大效率
ForkJoinPool是一个强大的工具,可以帮助我们编写高效的并发程序。理解其工作原理,特别是工作窃取算法,对于充分利用多核处理器的资源至关重要。在使用ForkJoinPool时,需要注意任务的分解、并行度的设置、以及避免阻塞操作等问题。只有正确地使用ForkJoinPool,才能发挥其最大的优势。
9. 工作窃取算法与ForkJoinPool的价值
工作窃取算法通过动态地调整线程的任务分配,有效地实现了负载均衡,减少了线程间的竞争,并适应了递归分解的任务。ForkJoinPool作为工作窃取算法的实现,为Java开发者提供了一个便捷、高效的并行计算框架。