Java 高阶同步器:Phaser、Exchanger 在复杂任务协作中的灵活应用
大家好,今天我们来深入探讨 Java 并发包中两个相对高级的同步器:Phaser 和 Exchanger。 相较于 CountDownLatch、CyclicBarrier、Semaphore 这些我们常用的同步工具,Phaser 和 Exchanger 在处理更复杂、更灵活的任务协作场景时,能发挥更大的作用。 接下来,我将通过代码示例和实际场景,详细讲解它们的用法和优势。
1. Phaser:灵活可变的同步屏障
Phaser 是一个比 CyclicBarrier 更灵活的同步屏障。 它可以动态地调整参与者的数量,并且可以分阶段执行任务。 这使得它非常适合处理迭代计算、分而治之等复杂场景。
1.1 Phaser 的基本概念
- Phase (阶段):
Phaser的核心概念,代表一个执行阶段。 每次调用arriveAndAwaitAdvance()方法,都会使Phaser进入下一个阶段。 - Parties (参与者): 参与同步的线程数量。
Phaser允许动态地增加或减少参与者。 - Registration (注册): 线程通过
register()、bulkRegister()方法向Phaser注册,成为参与者。 - Arrival (到达): 线程通过
arrive()、arriveAndAwaitAdvance()方法通知Phaser自己已完成当前阶段的任务。 - Termination (终止):
Phaser可以通过isTerminated()方法判断是否已终止,通常由最后一个到达的线程调用onAdvance()方法来控制。
1.2 Phaser 的基本用法
import java.util.concurrent.Phaser;
public class PhaserExample {
public static void main(String[] args) throws InterruptedException {
Phaser phaser = new Phaser(3); // 初始注册 3 个参与者
for (int i = 0; i < 3; i++) {
final int threadId = i;
new Thread(() -> {
System.out.println("Thread " + threadId + " started, phase: " + phaser.getPhase());
phaser.arriveAndAwaitAdvance(); // 等待其他线程到达
System.out.println("Thread " + threadId + " executing task in phase: " + phaser.getPhase());
try {
Thread.sleep(1000); // 模拟任务执行
} catch (InterruptedException e) {
e.printStackTrace();
}
phaser.arriveAndAwaitAdvance(); // 等待其他线程到达
System.out.println("Thread " + threadId + " completed task in phase: " + phaser.getPhase());
phaser.arriveAndDeregister(); // 线程完成任务,注销自己
}).start();
}
// 主线程等待所有线程完成
while (!phaser.isTerminated()) {
Thread.sleep(100);
}
System.out.println("All threads completed.");
}
}
在这个例子中,我们创建了一个 Phaser,初始注册了 3 个参与者。 每个线程启动后,首先等待其他线程到达,然后执行任务,最后等待其他线程完成任务。 每个线程在完成任务后,都会注销自己。 arriveAndDeregister() 方法将到达并注销线程,减少参与者数量。 当所有线程都注销后,Phaser 将终止。
1.3 Phaser 的高级用法:onAdvance() 方法
onAdvance() 方法是 Phaser 的一个关键特性。 它允许我们在每个阶段结束时执行一些自定义逻辑,例如检查是否需要终止 Phaser。
import java.util.concurrent.Phaser;
import java.util.Random;
public class PhaserTerminationExample {
public static void main(String[] args) throws InterruptedException {
Phaser phaser = new Phaser(3) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("Phase " + phase + " completed. Registered parties: " + registeredParties);
return registeredParties == 0 || phase > 5; // 如果没有参与者或超过 5 个阶段,则终止 Phaser
}
};
Random random = new Random();
for (int i = 0; i < 3; i++) {
final int threadId = i;
new Thread(() -> {
while (!phaser.isTerminated()) {
System.out.println("Thread " + threadId + " started, phase: " + phaser.getPhase());
phaser.arriveAndAwaitAdvance(); // 等待其他线程到达
if (phaser.isTerminated()) {
System.out.println("Thread " + threadId + " exiting because Phaser is terminated.");
break;
}
System.out.println("Thread " + threadId + " executing task in phase: " + phaser.getPhase());
try {
Thread.sleep(random.nextInt(500)); // 模拟任务执行
} catch (InterruptedException e) {
e.printStackTrace();
}
}
phaser.arriveAndDeregister();
System.out.println("Thread " + threadId + " deregistered.");
}).start();
}
}
}
在这个例子中,我们重写了 onAdvance() 方法。 onAdvance() 方法在每个阶段结束后被调用,并返回一个 boolean 值,指示 Phaser 是否应该终止。 如果返回 true,则 Phaser 将终止,否则将继续进行下一个阶段。 这里,我们设置了两个终止条件:如果没有参与者(registeredParties == 0)或超过 5 个阶段(phase > 5)。
1.4 Phaser 的应用场景:迭代计算
Phaser 非常适合用于迭代计算,例如图像处理、机器学习等。 在每次迭代中,多个线程并行处理数据,然后同步到下一个迭代。
import java.util.concurrent.Phaser;
import java.util.Arrays;
import java.util.Random;
public class PhaserIterationExample {
public static void main(String[] args) throws InterruptedException {
int numThreads = 4;
int iterations = 10;
int arraySize = 100;
int[] data = new int[arraySize];
Random random = new Random();
for (int i = 0; i < arraySize; i++) {
data[i] = random.nextInt(100);
}
Phaser phaser = new Phaser(numThreads) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("Iteration " + phase + " completed.");
return phase >= iterations || registeredParties == 0;
}
};
for (int i = 0; i < numThreads; i++) {
final int threadId = i;
new Thread(() -> {
int start = threadId * (arraySize / numThreads);
int end = (threadId == numThreads - 1) ? arraySize : (threadId + 1) * (arraySize / numThreads);
while (!phaser.isTerminated()) {
// 每个线程处理一部分数据
for (int j = start; j < end; j++) {
data[j] = data[j] * 2; // 模拟数据处理
}
System.out.println("Thread " + threadId + " processed data in iteration " + phaser.getPhase());
// 等待其他线程完成
phaser.arriveAndAwaitAdvance();
}
phaser.arriveAndDeregister();
System.out.println("Thread " + threadId + " deregistered.");
}).start();
}
// 主线程等待所有线程完成
while (!phaser.isTerminated()) {
Thread.sleep(100);
}
System.out.println("All threads completed " + iterations + " iterations.");
}
}
在这个例子中,我们使用 Phaser 来同步多个线程,每个线程负责处理一部分数据。 在每次迭代中,每个线程将自己的数据乘以 2。 Phaser 确保所有线程都完成当前迭代后,才会进入下一个迭代。
1.5 Phaser 的优势
- 动态参与者数量:
Phaser允许动态地增加或减少参与者,这使得它非常适合处理任务数量不确定的场景。 - 分阶段执行:
Phaser可以分阶段执行任务,每个阶段可以执行不同的逻辑。 - 自定义终止条件:
Phaser允许自定义终止条件,这使得它可以灵活地控制任务的执行。
1.6 Phaser 和 CyclicBarrier 的区别
| 特性 | Phaser | CyclicBarrier |
|---|---|---|
| 参与者数量 | 可动态调整 | 固定 |
| 阶段执行 | 可分阶段执行,onAdvance() 方法可自定义逻辑 |
仅同步等待,到达屏障后执行 Runnable |
| 灵活性 | 更高 | 较低 |
| 应用场景 | 迭代计算、分而治之等复杂场景 | 简单同步等待 |
2. Exchanger:线程间安全交换数据
Exchanger 允许两个线程安全地交换数据。 它提供了一个同步点,当两个线程都到达该点时,它们将交换各自的数据。
2.1 Exchanger 的基本概念
- Exchange Point (交换点):
Exchanger的核心概念。 线程到达交换点后,将等待另一个线程到达。 - Data Exchange (数据交换): 当两个线程都到达交换点时,它们将交换各自的数据。
2.2 Exchanger 的基本用法
import java.util.concurrent.Exchanger;
public class ExchangerExample {
public static void main(String[] args) throws InterruptedException {
Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
String data = "Message from Thread 1";
try {
System.out.println("Thread 1: Sending data: " + data);
String receivedData = exchanger.exchange(data); // 交换数据
System.out.println("Thread 1: Received data: " + receivedData);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
String data = "Message from Thread 2";
try {
System.out.println("Thread 2: Sending data: " + data);
String receivedData = exchanger.exchange(data); // 交换数据
System.out.println("Thread 2: Received data: " + receivedData);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
在这个例子中,我们创建了一个 Exchanger,用于交换字符串类型的数据。 两个线程分别发送一条消息,并等待接收另一条消息。 当两个线程都到达 exchange() 方法时,它们将交换各自的消息。
2.3 Exchanger 的应用场景:生产者-消费者模式
Exchanger 可以用于实现生产者-消费者模式,其中一个线程负责生产数据,另一个线程负责消费数据。
import java.util.concurrent.Exchanger;
import java.util.Random;
public class ExchangerProducerConsumer {
public static void main(String[] args) throws InterruptedException {
Exchanger<DataBuffer> exchanger = new Exchanger<>();
DataBuffer initialBuffer = new DataBuffer();
// 生产者线程
new Thread(() -> {
DataBuffer buffer = initialBuffer;
Random random = new Random();
try {
for (int i = 0; i < 10; i++) {
// 生产数据
buffer.setData("Data " + i);
System.out.println("Producer: Produced data: " + buffer.getData());
// 交换数据
buffer = exchanger.exchange(buffer);
// 模拟生产耗时
Thread.sleep(random.nextInt(500));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 消费者线程
new Thread(() -> {
DataBuffer buffer = new DataBuffer();
try {
for (int i = 0; i < 10; i++) {
// 交换数据
buffer = exchanger.exchange(buffer);
// 消费数据
System.out.println("Consumer: Consumed data: " + buffer.getData());
buffer.setData(null); // 清空数据
// 模拟消费耗时
Thread.sleep(100);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
static class DataBuffer {
private String data;
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
}
}
在这个例子中,生产者线程负责生产数据,并将数据存储在 DataBuffer 对象中。 消费者线程负责消费数据,并从 DataBuffer 对象中读取数据。 Exchanger 用于在生产者和消费者之间交换 DataBuffer 对象。
2.4 Exchanger 的优势
- 线程间安全交换数据:
Exchanger确保线程间安全地交换数据,避免了数据竞争和并发问题。 - 简单易用:
Exchanger的 API 非常简单,易于使用。 - 高效:
Exchanger的实现经过优化,性能较高。
2.5 Exchanger 的局限性
- 只能用于两个线程:
Exchanger只能用于两个线程之间交换数据。 - 必须同时到达: 两个线程必须同时到达交换点,否则将一直等待。
3. 总结与应用场景选择
- Phaser: 适用于需要动态调整参与者数量、分阶段执行任务的复杂同步场景,例如迭代计算、分而治之等。 通过
onAdvance()可以自定义终止条件,灵活控制任务执行流程。 - Exchanger: 适用于两个线程之间安全交换数据的场景,例如生产者-消费者模式。 确保数据安全交换,避免并发问题。
希望今天的分享能够帮助大家更好地理解和应用 Phaser 和 Exchanger 这两个强大的同步器。 谢谢大家!