Java高阶并发结构:CyclicBarrier与Phaser的灵活任务同步应用
大家好,今天我们来深入探讨Java并发编程中两个重要的同步工具:CyclicBarrier
和 Phaser
。它们都用于协调多个线程的任务执行,但适用场景和灵活性各有不同。我们将通过实际例子和代码分析,了解它们的原理、用法以及在不同场景下的选择。
CyclicBarrier:循环栅栏
CyclicBarrier
,顾名思义,是一个循环使用的栅栏。它允许一组线程互相等待,直到所有线程都到达一个公共点(栅栏点)后,再一起继续执行。它的循环性体现在所有线程通过栅栏后,可以再次使用该栅栏进行下一轮的同步。
1. 原理与机制
CyclicBarrier
的核心在于计数器。初始化时,需要指定参与同步的线程数量(parties)。每个线程调用 await()
方法时,计数器减一。当计数器归零时,表示所有线程都已到达栅栏点,此时:
- 计数器重置为初始值(parties)。
- 可以选择执行一个 Runnable 任务(barrier action)。这个任务由最后一个到达的线程执行,可以在所有线程释放前进行一些公共的处理,例如汇总结果等。
- 所有等待的线程被释放,可以继续执行。
2. 基本用法
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CyclicBarrierExample {
private static final int NUMBER_OF_THREADS = 3;
private static final CyclicBarrier barrier = new CyclicBarrier(NUMBER_OF_THREADS, () -> {
System.out.println("所有线程到达栅栏点,执行汇总任务...");
});
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
for (int i = 0; i < NUMBER_OF_THREADS; i++) {
final int threadId = i;
executor.execute(() -> {
try {
System.out.println("线程 " + threadId + " 开始执行...");
Thread.sleep((long) (Math.random() * 3000)); // 模拟线程的执行时间
System.out.println("线程 " + threadId + " 到达栅栏点,等待其他线程...");
barrier.await(); // 等待其他线程到达
System.out.println("线程 " + threadId + " 越过栅栏,继续执行...");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
}
在这个例子中,我们创建了一个 CyclicBarrier
,指定了 3 个线程参与同步,并且定义了一个 barrier action,在所有线程到达栅栏点后执行。每个线程模拟执行一段时间,然后调用 await()
方法等待其他线程。当所有线程都到达时,barrier action 被执行,所有线程被释放,继续执行。
3. 异常处理
CyclicBarrier
的 await()
方法会抛出两种异常:
InterruptedException
:当线程在等待过程中被中断时抛出。BrokenBarrierException
:当栅栏被破坏时抛出。栅栏被破坏的原因可能是:- 某个线程在等待过程中被中断。
- barrier action 执行过程中抛出异常。
reset()
方法被调用。
当栅栏被破坏时,所有等待的线程都会抛出 BrokenBarrierException
。需要注意的是,栅栏被破坏后,不能再继续使用,需要重新创建。
4. reset()
方法
CyclicBarrier
提供了 reset()
方法,可以重置栅栏。调用 reset()
方法后,计数器会被重置为初始值,并且所有等待的线程都会抛出 BrokenBarrierException
。通常在发生异常需要重新开始同步时使用。
5. 应用场景
CyclicBarrier
适用于以下场景:
- 并行计算: 将一个大任务分解成多个子任务,每个线程执行一个子任务,然后使用
CyclicBarrier
等待所有子任务完成,再进行结果汇总。 - 多阶段计算: 将一个计算过程分成多个阶段,每个线程执行一个阶段的任务,然后使用
CyclicBarrier
等待所有线程完成当前阶段,再进入下一个阶段。 - 游戏模拟: 模拟多个玩家同时进入游戏场景,需要等待所有玩家加载完成才能开始游戏。
6. 示例:并行计算
假设我们需要计算一个大型数组的平均值,可以将数组分成多个部分,每个线程计算一部分的平均值,然后使用 CyclicBarrier
等待所有线程完成,再将各个部分的平均值合并,得到最终的平均值。
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ParallelAverage {
private static final int NUMBER_OF_THREADS = 4;
private static final int ARRAY_SIZE = 1000000;
private static final int CHUNK_SIZE = ARRAY_SIZE / NUMBER_OF_THREADS;
private static final double[] array = new double[ARRAY_SIZE];
private static double[] partialSums = new double[NUMBER_OF_THREADS];
private static double totalSum = 0;
private static final CyclicBarrier barrier = new CyclicBarrier(NUMBER_OF_THREADS, () -> {
System.out.println("所有线程完成局部求和,开始汇总...");
for (double sum : partialSums) {
totalSum += sum;
}
System.out.println("总和为:" + totalSum);
System.out.println("平均值为:" + totalSum / ARRAY_SIZE);
});
public static void main(String[] args) {
// 初始化数组
Arrays.fill(array, 1.0);
ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
for (int i = 0; i < NUMBER_OF_THREADS; i++) {
final int threadId = i;
final int start = threadId * CHUNK_SIZE;
final int end = (threadId == NUMBER_OF_THREADS - 1) ? ARRAY_SIZE : (threadId + 1) * CHUNK_SIZE;
executor.execute(() -> {
try {
double sum = 0;
for (int j = start; j < end; j++) {
sum += array[j];
}
partialSums[threadId] = sum;
System.out.println("线程 " + threadId + " 完成局部求和,结果为:" + sum);
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
}
Phaser:更灵活的同步器
Phaser
是 Java 7 引入的一个更灵活的同步器,它提供了比 CyclicBarrier
更强大的功能。Phaser
可以动态地调整参与同步的线程数量,并且可以支持多个同步阶段。
1. 原理与机制
Phaser
的核心概念是 phase(阶段)。Phaser
维护一个 phase number,表示当前所处的阶段。每个线程可以 register(注册)到 Phaser
,表示参与同步。当线程完成当前阶段的任务后,可以调用 arriveAndAwaitAdvance()
方法,表示到达当前阶段的终点,并等待其他线程。当所有注册的线程都到达当前阶段的终点后,Phaser
会自动进入下一个阶段(phase number 加 1),并唤醒所有等待的线程。
Phaser
的主要特点:
- 动态注册: 可以随时注册新的线程参与同步。
- 可配置终止: 可以配置当所有线程都取消注册后,自动终止
Phaser
,不再进入下一个阶段。 - 分层结构: 可以创建分层的
Phaser
,实现更复杂的同步逻辑。
2. 基本用法
import java.util.concurrent.Phaser;
public class PhaserExample {
private static final int NUMBER_OF_THREADS = 3;
private static final Phaser phaser = new Phaser(1) { // 初始注册一个线程 (主线程)
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("Phase " + phase + " 完成,开始 Phase " + (phase + 1));
return registeredParties == 0; // 当没有线程注册时,终止 Phaser
}
};
public static void main(String[] args) {
System.out.println("Phaser 开始...");
for (int i = 0; i < NUMBER_OF_THREADS; i++) {
phaser.register(); // 注册线程
final int threadId = i;
new Thread(() -> {
try {
System.out.println("线程 " + threadId + " 开始执行 Phase " + phaser.getPhase());
Thread.sleep((long) (Math.random() * 3000));
System.out.println("线程 " + threadId + " 完成 Phase " + phaser.getPhase() + ",等待其他线程...");
phaser.arriveAndAwaitAdvance(); // 等待其他线程
System.out.println("线程 " + threadId + " 继续执行 Phase " + phaser.getPhase());
System.out.println("线程 " + threadId + " 开始执行 Phase " + phaser.getPhase());
Thread.sleep((long) (Math.random() * 3000));
System.out.println("线程 " + threadId + " 完成 Phase " + phaser.getPhase() + ",等待其他线程...");
phaser.arriveAndAwaitAdvance(); // 等待其他线程
System.out.println("线程 " + threadId + " 继续执行 Phase " + phaser.getPhase());
phaser.arriveAndDeregister(); // 注销线程
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
// 主线程也参与同步
phaser.arriveAndDeregister();
}
}
在这个例子中,我们创建了一个 Phaser
,初始注册了一个线程(主线程)。在 onAdvance()
方法中,我们定义了每个 phase 完成后的操作,并且判断是否终止 Phaser
。每个子线程注册到 Phaser
后,执行两个 phase 的任务,然后注销自己。主线程也参与同步,并在最后注销自己。
3. onAdvance()
方法
onAdvance()
方法是 Phaser
的一个关键方法。它在每个 phase 完成后被调用,用于执行一些公共的任务,例如更新状态、检查终止条件等。onAdvance()
方法的返回值是一个 boolean 值,表示是否终止 Phaser
。如果返回 true,则 Phaser
终止,否则 Phaser
进入下一个 phase。
4. 常用方法
方法 | 描述 |
---|---|
register() |
注册一个线程参与同步。 |
bulkRegister(int parties) |
批量注册多个线程。 |
arrive() |
线程到达当前阶段终点,但不等待其他线程。 |
arriveAndAwaitAdvance() |
线程到达当前阶段终点,并等待其他线程。 |
arriveAndDeregister() |
线程到达当前阶段终点,并注销自己。 |
getPhase() |
获取当前 phase number。 |
getRegisteredParties() |
获取当前注册的线程数量。 |
getArrivedParties() |
获取当前已到达当前阶段终点的线程数量。 |
getUnarrivedParties() |
获取当前未到达当前阶段终点的线程数量。 |
isTerminated() |
判断 Phaser 是否已终止。 |
5. 分层 Phaser
Phaser
支持分层结构,可以创建树状的 Phaser
。父 Phaser
的 advance 会触发子 Phaser
的 advance,可以实现更复杂的同步逻辑。
6. 应用场景
Phaser
适用于以下场景:
- 动态线程池: 当线程池中的线程数量动态变化时,可以使用
Phaser
来协调线程的执行。 - 迭代计算: 当计算过程需要进行多次迭代,并且每次迭代的线程数量可能不同时,可以使用
Phaser
来进行同步。 - 复杂的任务分解: 当任务可以分解成多个阶段,并且每个阶段的线程数量和任务逻辑都不同时,可以使用
Phaser
来进行同步。
7. 示例:模拟赛马
import java.util.Random;
import java.util.concurrent.Phaser;
public class HorseRace {
private static final int NUMBER_OF_HORSES = 7;
private static final int PHASES = 3;
private static final Phaser phaser = new Phaser(1); // 注册主线程
public static void main(String[] args) {
Random random = new Random();
for (int i = 0; i < NUMBER_OF_HORSES; i++) {
phaser.register(); // 注册马
new Thread(() -> {
try {
String horseName = Thread.currentThread().getName();
System.out.println(horseName + " 准备就绪!");
phaser.arriveAndAwaitAdvance(); // 等待所有马准备就绪
for (int phase = 1; phase <= PHASES; phase++) {
System.out.println(horseName + " 开始冲刺! (Phase " + phase + ")");
Thread.sleep((long) (random.nextDouble() * 3000)); // 模拟冲刺时间
System.out.println(horseName + " 完成冲刺! (Phase " + phase + ")");
phaser.arriveAndAwaitAdvance(); // 等待所有马完成当前阶段
}
System.out.println(horseName + " 比赛结束!");
phaser.arriveAndDeregister(); // 注销马
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Horse-" + i).start();
}
// 主线程等待所有马完成比赛
phaser.arriveAndAwaitAdvance(); // 等待所有马准备就绪
for (int i = 1; i <= PHASES; i++) {
System.out.println("=== Phase " + i + " 结束 ===");
phaser.arriveAndAwaitAdvance(); // 等待所有马完成当前阶段
}
System.out.println("所有马完成比赛,Phaser 结束。");
phaser.arriveAndDeregister(); // 注销主线程
}
}
CyclicBarrier vs. Phaser:选择合适的工具
特性 | CyclicBarrier | Phaser |
---|---|---|
线程数量 | 固定,初始化时指定。 | 动态,可以随时注册和注销。 |
阶段数量 | 只有一个阶段,所有线程到达栅栏后,可以循环使用。 | 可以有多个阶段,每个阶段完成后,自动进入下一个阶段。 |
灵活性 | 较低,适用于线程数量固定、只有一个同步点的场景。 | 较高,适用于线程数量动态变化、需要多个同步阶段的场景。 |
异常处理 | 当栅栏被破坏时,所有等待的线程都会抛出 BrokenBarrierException ,需要重新创建。 |
没有专门的异常处理机制,需要通过 onAdvance() 方法来判断是否终止 Phaser 。 |
适用场景 | 并行计算、多阶段计算、游戏模拟等。 | 动态线程池、迭代计算、复杂的任务分解等。 |
是否支持分层结构 | 不支持 | 支持 |
简单来说,如果你的场景是线程数量固定,且只需要一个同步点,那么 CyclicBarrier
是一个不错的选择。如果你的场景是线程数量动态变化,或者需要多个同步阶段,那么 Phaser
提供了更大的灵活性。
总结
CyclicBarrier
和 Phaser
都是 Java 并发编程中强大的同步工具。CyclicBarrier
适用于线程数量固定,同步点单一的场景;而 Phaser
则更为灵活,能够适应线程数量动态变化和多阶段同步的复杂情况。理解它们的原理和适用场景,能够帮助我们编写更高效、更健壮的并发程序。