好的,我们开始今天的讲座,主题是Java中的Phaser同步器:实现多阶段、可重用同步屏障的底层机制。
Phaser同步器:多阶段同步的利器
在并发编程中,我们经常需要多个线程在执行到某个特定点时进行同步,确保所有线程都到达该点后,才能继续执行后续操作。Java提供了多种同步工具,例如CountDownLatch、CyclicBarrier等,但它们在处理多阶段同步或需要动态调整参与线程数量的场景下,显得有些力不从心。这时,Phaser就派上了用场。
Phaser是一个灵活且强大的同步器,它提供了一种可重用、多阶段的同步屏障机制。它允许一组线程在多个阶段内协调工作,并且可以动态地注册和注销参与者,这使得它非常适合处理复杂并发场景,例如并行迭代、分而治之算法等。
Phaser的核心概念
要理解Phaser,我们需要掌握几个关键概念:
- Phase(阶段): Phaser的核心是阶段的概念。每个Phaser对象维护一个内部的阶段计数器,初始值为0。当所有已注册的参与者都到达当前阶段的同步点时,Phaser会将阶段计数器递增,进入下一个阶段。
- Parties(参与者): 参与者是指注册到Phaser中并参与同步的线程。每个线程在完成当前阶段的工作后,会调用Phaser的
arriveAndAwaitAdvance()方法,等待其他参与者到达。 - Registration(注册): 线程可以通过
register()方法将自己注册为Phaser的参与者。注册后,线程就可以参与到Phaser的同步过程中。 - Deregistration(注销): 线程可以通过
arriveAndDeregister()方法注销自己,表示不再参与后续的同步。 - Termination(终止): Phaser可以被终止,终止后,所有
arriveAndAwaitAdvance()方法都会立即返回,不会再等待。
Phaser的主要方法
Phaser提供了一系列方法来控制同步过程,其中最常用的包括:
| 方法名 | 描述 |
|---|---|
Phaser() |
构造函数,创建一个没有参与者的Phaser。 |
Phaser(int parties) |
构造函数,创建一个具有指定数量参与者的Phaser。 |
register() |
将当前线程注册为Phaser的参与者,增加参与者数量。 |
bulkRegister(int parties) |
批量注册指定数量的参与者。 |
arrive() |
通知Phaser当前线程已经到达当前阶段的同步点,但不等待其他线程。 |
arriveAndAwaitAdvance() |
通知Phaser当前线程已经到达当前阶段的同步点,并等待其他线程到达。当所有线程都到达后,Phaser进入下一个阶段,该方法返回下一个阶段的编号。 |
arriveAndDeregister() |
通知Phaser当前线程已经到达当前阶段的同步点,并注销自己。这通常用于处理那些只需要参与部分阶段同步的线程。 |
awaitAdvance(int phase) |
显式等待指定的阶段完成。如果指定的阶段已经完成,则立即返回。 |
awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) |
与awaitAdvance类似,但允许在等待期间被中断,并且可以设置超时时间。 |
getPhase() |
返回当前Phaser的阶段编号。 |
getRegisteredParties() |
返回已注册的参与者数量。 |
getArrivedParties() |
返回已到达当前阶段的参与者数量。 |
getUnarrivedParties() |
返回尚未到达当前阶段的参与者数量。 |
isTerminated() |
判断Phaser是否已经终止。 |
forceTermination() |
强制终止Phaser,所有等待线程都会立即返回。 |
onAdvance(int phase, int registeredParties) |
当Phaser进入下一个阶段时,会调用此方法。子类可以重写此方法来执行一些额外的操作,例如更新共享数据、检查终止条件等。 |
Phaser的使用示例:并行数据处理
假设我们有一个大型数据集,需要将其分成多个块,然后由多个线程并行处理。每个线程处理完自己的数据块后,需要等待其他线程完成,才能进行下一步的汇总操作。这个过程可以重复多次,直到所有数据处理完毕。
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
public class PhaserExample {
private static final int NUM_THREADS = 5;
private static final int NUM_PHASES = 3;
public static void main(String[] args) throws InterruptedException {
Phaser phaser = new Phaser(NUM_THREADS) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("Phase " + phase + " completed. Registered parties: " + registeredParties);
return phase >= NUM_PHASES - 1 || registeredParties == 0; // Terminate after NUM_PHASES or if no parties are registered
}
};
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < NUM_THREADS; i++) {
final int threadId = i;
Thread thread = new Thread(() -> {
System.out.println("Thread " + threadId + " started.");
for (int phase = 0; phase < NUM_PHASES; phase++) {
// Simulate some work
try {
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(500, 1500));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
System.out.println("Thread " + threadId + " completed phase " + phase + ". Arriving and awaiting.");
phaser.arriveAndAwaitAdvance(); // Wait for all threads to complete this phase
System.out.println("Thread " + threadId + " continues after phase " + phase);
}
System.out.println("Thread " + threadId + " finished all phases.");
});
threads.add(thread);
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
System.out.println("All threads finished. Phaser terminated: " + phaser.isTerminated());
}
}
在这个例子中,我们创建了一个Phaser对象,并注册了5个参与者。每个线程模拟执行3个阶段的任务。在每个阶段完成后,线程调用arriveAndAwaitAdvance()方法,等待其他线程完成。当所有线程都到达当前阶段的同步点后,Phaser会进入下一个阶段,并唤醒所有等待的线程。onAdvance方法在每个阶段完成后被调用,我们可以在这里执行一些额外的操作,例如打印阶段信息或检查终止条件。
Phaser的动态注册和注销
Phaser的一个重要特性是可以动态地注册和注销参与者。这使得它非常适合处理那些参与线程数量不固定的场景。
import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
public class PhaserDynamicExample {
public static void main(String[] args) throws InterruptedException {
Phaser phaser = new Phaser(1); // Initial party is the main thread
System.out.println("Phaser started by main thread.");
// Simulate adding and removing parties dynamically
for (int i = 0; i < 3; i++) {
// Add a new worker
int workerId = i + 1;
phaser.register();
System.out.println("Main thread registered worker " + workerId + ". Total parties: " + phaser.getRegisteredParties());
new Thread(() -> {
System.out.println("Worker " + workerId + " started.");
try {
// Simulate some work
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(500, 1500));
System.out.println("Worker " + workerId + " arriving and awaiting phase " + phaser.getPhase());
phaser.arriveAndAwaitAdvance(); // Wait for the main thread and other workers
System.out.println("Worker " + workerId + " continuing after phase " + phaser.getPhase());
// Simulate more work
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(500, 1500));
System.out.println("Worker " + workerId + " arriving and deregistering. Phase: " + phaser.getPhase());
phaser.arriveAndDeregister(); // Deregister after completing the work
System.out.println("Worker " + workerId + " finished.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// Main thread does some work
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(200, 800));
System.out.println("Main thread completed its work for phase " + phaser.getPhase() + ". Awaiting workers.");
phaser.arriveAndAwaitAdvance(); // Wait for all workers
System.out.println("Main thread continues after phase " + phaser.getPhase());
}
// Main thread deregisters itself and terminates the phaser.
phaser.arriveAndDeregister();
System.out.println("Main thread finished and deregistered. Phaser terminated: " + phaser.isTerminated());
}
}
在这个例子中,主线程首先注册自己作为Phaser的参与者。然后,它循环三次,每次都注册一个新的工作线程,并启动该线程。工作线程在完成自己的任务后,会注销自己。主线程在每次循环结束后,也会等待所有工作线程完成。最后,主线程注销自己,并结束程序。
Phaser的onAdvance()方法
onAdvance()方法是一个protected方法,子类可以重写它来执行一些额外的操作,例如更新共享数据、检查终止条件等。该方法在每个阶段完成后被调用,并且接收两个参数:当前阶段的编号和已注册的参与者数量。
import java.util.concurrent.Phaser;
public class PhaserOnAdvanceExample {
public static void main(String[] args) throws InterruptedException {
Phaser phaser = new Phaser(2) { // Main thread and worker thread
private int counter = 0;
@Override
protected boolean onAdvance(int phase, int registeredParties) {
counter++;
System.out.println("onAdvance called. Phase: " + phase + ", Registered parties: " + registeredParties + ", Counter: " + counter);
return counter >= 3; // Terminate after 3 phases
}
};
new Thread(() -> {
while (!phaser.isTerminated()) {
System.out.println("Worker thread arriving and awaiting. Phase: " + phaser.getPhase());
phaser.arriveAndAwaitAdvance();
System.out.println("Worker thread continues after phase " + phaser.getPhase());
}
System.out.println("Worker thread finished.");
}).start();
while (!phaser.isTerminated()) {
System.out.println("Main thread arriving and awaiting. Phase: " + phaser.getPhase());
phaser.arriveAndAwaitAdvance();
System.out.println("Main thread continues after phase " + phaser.getPhase());
Thread.sleep(100);
}
System.out.println("Main thread finished. Phaser terminated.");
}
}
在这个例子中,我们重写了onAdvance()方法,并在其中增加了一个计数器。每次onAdvance()方法被调用时,计数器都会递增。当计数器达到3时,onAdvance()方法返回true,表示Phaser应该终止。
Phaser与CountDownLatch、CyclicBarrier的比较
虽然Phaser、CountDownLatch和CyclicBarrier都可以用于线程同步,但它们之间存在一些关键区别:
| 特性 | CountDownLatch | CyclicBarrier | Phaser |
|---|---|---|---|
| 重用性 | 一次性 | 可重用 | 可重用 |
| 阶段性 | 无 | 单阶段 | 多阶段 |
| 参与者数量 | 初始化时固定 | 初始化时固定 | 动态可调 |
| 终止 | 无法重置或终止 | 可以重置 | 可以强制终止 |
| 适用场景 | 一组线程等待一个事件发生 | 一组线程在某个点同步 | 多阶段同步,参与者数量动态变化的复杂场景 |
Phaser的底层机制
Phaser的底层实现依赖于一个内部的状态变量,该变量包含了当前阶段、已注册的参与者数量、已到达的参与者数量等信息。当线程调用arriveAndAwaitAdvance()方法时,Phaser会将已到达的参与者数量递增,并检查是否所有已注册的参与者都已到达。如果所有参与者都已到达,Phaser会进入下一个阶段,并唤醒所有等待的线程。
为了避免竞争条件,Phaser使用了CAS(Compare and Swap)操作来更新内部状态变量。当多个线程同时尝试更新状态变量时,只有一个线程能够成功,其他线程需要重新尝试。
onAdvance方法的调用也需要同步处理,以确保在进入下一个阶段之前,所有必要的清理和更新操作都已完成。
使用Phaser的注意事项
- 避免在
onAdvance()方法中执行耗时操作,因为这会阻塞所有等待的线程。 - 确保所有参与者最终都会调用
arriveAndAwaitAdvance()或arriveAndDeregister()方法,否则可能会导致死锁。 - 合理设置Phaser的终止条件,避免程序无限期地运行下去。
- 理解
onAdvance的返回值,返回true会终止后续阶段,返回false会继续执行。
总结:掌握Phaser,应对复杂并发场景
Phaser是一个功能强大且灵活的同步器,它提供了一种可重用、多阶段的同步屏障机制。通过动态地注册和注销参与者,以及重写onAdvance()方法,我们可以使用Phaser来处理各种复杂的并发场景。 理解Phaser的核心概念、常用方法和底层机制,能够帮助我们更好地利用它来构建高效、可靠的并发程序。