各位观众老爷,大家好!我是今天的主讲人,咱们今天聊点并发编程里比较高级的玩具:Phaser
、CountDownLatch
和 CyclicBarrier
。别怕,这玩意儿虽然名字唬人,但理解起来其实挺有意思的,就像拆盲盒,拆开了就觉得“就这?”。咱们尽量用大白话,结合代码示例,把它们扒个精光,让大家以后遇到并发协调问题,能像老中医一样,精准把脉,药到病除。
一、并发协调的那些事儿
首先,我们要搞明白,为啥需要这些并发协调器。想象一下,你和几个朋友一起组队打游戏。
-
CountDownLatch (倒计时门闩): 你们约定好,必须所有人都加载完游戏资源,才能开始游戏。只有等待所有玩家准备完毕,游戏才能开始。
-
CyclicBarrier (循环栅栏): 游戏每进行一轮,都需要所有人确认准备好,才能进入下一轮。就像玩大富翁,所有人走到起点才能掷骰子。
-
Phaser (相位器): 游戏有多个阶段,比如准备阶段、战斗阶段、结算阶段。每个阶段都需要所有玩家完成才能进入下一个阶段。而且,玩家可以动态加入或退出游戏(比如有人掉线了)。
简单来说,这些工具就是用来同步多个线程的,让它们按照我们设定的步骤,协同工作,避免出现“你跑太快,等等我”的情况。
二、CountDownLatch:一锤定音的倒计时
CountDownLatch
的原理很简单,就像一个倒计时器。你给它设置一个初始值,然后每个线程完成任务后,就调用 countDown()
方法,让计数器减一。当计数器减到零时,所有等待 await()
方法的线程就会被唤醒。
适用场景:
- 主线程等待多个子线程完成任务。
- 测试用例中,等待多个异步操作完成。
- 并发初始化:确保所有服务都启动完毕后再提供服务。
代码示例:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
int numberOfThreads = 3;
CountDownLatch latch = new CountDownLatch(numberOfThreads); // 初始化计数器为 3
ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
for (int i = 0; i < numberOfThreads; i++) {
final int taskId = i;
executor.submit(() -> {
try {
System.out.println("线程 " + taskId + " 开始执行任务...");
Thread.sleep((long) (Math.random() * 2000)); // 模拟任务执行时间
System.out.println("线程 " + taskId + " 任务执行完毕!");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown(); // 计数器减一
}
});
}
System.out.println("主线程等待所有子线程完成...");
latch.await(); // 主线程阻塞,直到计数器为零
System.out.println("所有子线程执行完毕,主线程继续执行!");
executor.shutdown(); // 关闭线程池
}
}
代码解释:
CountDownLatch latch = new CountDownLatch(numberOfThreads);
:创建一个CountDownLatch
对象,初始计数器值为numberOfThreads
。latch.countDown();
:每个子线程完成任务后,调用countDown()
方法,计数器减一。latch.await();
:主线程调用await()
方法,进入阻塞状态,直到计数器减到零。executor.shutdown();
:关闭线程池,释放资源。
注意事项:
CountDownLatch
是一次性的,计数器减到零之后就不能重置了。- 如果某个线程在
await()
期间被中断,会抛出InterruptedException
异常。
三、CyclicBarrier:周而复始的同步
CyclicBarrier
就像一个栅栏,当所有线程都到达栅栏处时,栅栏才会打开,允许所有线程继续执行。与 CountDownLatch
不同的是,CyclicBarrier
可以重复使用,每次所有线程到达栅栏后,栅栏会重置,等待下一批线程。
适用场景:
- 多线程分段计算:将一个大的计算任务分成多个阶段,每个阶段都需要所有线程完成才能进入下一个阶段。
- 模拟并发:模拟多个用户同时访问服务器。
- 并行算法:例如,并行遗传算法,每一代都需要所有个体完成评估才能进入下一代。
代码示例:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CyclicBarrierExample {
public static void main(String[] args) {
int numberOfThreads = 3;
CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, () -> {
System.out.println("所有线程到达栅栏,开始下一轮!"); // Runnable 在所有线程到达栅栏后执行
});
ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
for (int i = 0; i < numberOfThreads; i++) {
final int taskId = i;
executor.submit(() -> {
try {
for (int round = 1; round <= 3; round++) {
System.out.println("线程 " + taskId + " 正在执行第 " + round + " 轮任务...");
Thread.sleep((long) (Math.random() * 2000)); // 模拟任务执行时间
System.out.println("线程 " + taskId + " 第 " + round + " 轮任务执行完毕,等待其他线程...");
barrier.await(); // 等待其他线程到达栅栏
System.out.println("线程 " + taskId + " 开始执行第 " + (round + 1) + " 轮任务(如果存在)...");
}
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
}
代码解释:
CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, () -> { ... });
:创建一个CyclicBarrier
对象,numberOfThreads
指定参与的线程数量,第二个参数是一个Runnable
对象,当所有线程到达栅栏后,会执行这个Runnable
对象。barrier.await();
:每个线程调用await()
方法,进入等待状态,直到所有线程都到达栅栏。- 当所有线程都到达栅栏后,会执行
Runnable
对象,然后唤醒所有线程,栅栏重置,等待下一批线程。
注意事项:
- 如果某个线程在
await()
期间被中断,或者栅栏被重置,会抛出InterruptedException
或BrokenBarrierException
异常。 - 可以指定一个
Runnable
对象,在所有线程到达栅栏后执行一些额外的操作。
四、Phaser:灵活多变的相位器
Phaser
比 CountDownLatch
和 CyclicBarrier
更加灵活,它支持动态地注册和注销参与者,并且可以定义多个阶段(phase)。每个阶段都需要所有参与者完成才能进入下一个阶段。
适用场景:
- 复杂的并发任务,需要动态地调整参与者数量。
- 多阶段的计算任务,每个阶段都需要同步。
- 模拟游戏中的不同阶段,例如准备阶段、战斗阶段、结算阶段。
Phaser 的核心概念:
- Phase (阶段):
Phaser
的核心是阶段,每个阶段都需要所有注册的参与者完成才能进入下一个阶段。 - Participant (参与者): 线程需要在
Phaser
中注册才能参与同步。可以使用register()
方法注册,arriveAndDeregister()
方法注销。 - arriveAndAwaitAdvance(): 线程到达当前阶段的终点,并等待其他线程。
- arrive(): 线程到达当前阶段的终点,但不等待其他线程。
- arriveAndDeregister(): 线程到达当前阶段的终点,并注销自己。
- onAdvance():
Phaser
进入下一个阶段时,会调用onAdvance()
方法。可以重写这个方法来执行一些额外的操作,例如更新状态、判断是否需要结束。
代码示例:
import java.util.concurrent.Phaser;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class PhaserExample {
public static void main(String[] args) {
int numberOfThreads = 3;
Phaser phaser = new Phaser(1) { // 初始注册一个参与者(主线程)
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("==== Phase " + phase + " 完成,准备进入下一个 Phase ====");
return registeredParties == 0; // 如果没有参与者,则结束 Phaser
}
};
ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
System.out.println("主线程注册了一个参与者,当前 Phase: " + phaser.getPhase());
for (int i = 0; i < numberOfThreads; i++) {
final int taskId = i;
phaser.register(); // 注册一个参与者
executor.submit(() -> {
try {
System.out.println("线程 " + taskId + " 已注册,当前 Phase: " + phaser.getPhase());
Thread.sleep((long) (Math.random() * 2000));
System.out.println("线程 " + taskId + " 完成第一阶段任务,等待其他线程...");
phaser.arriveAndAwaitAdvance(); // 到达第一阶段终点,并等待其他线程
System.out.println("线程 " + taskId + " 开始执行第二阶段任务...");
Thread.sleep((long) (Math.random() * 2000));
System.out.println("线程 " + taskId + " 完成第二阶段任务,准备退出...");
phaser.arriveAndDeregister(); // 到达第二阶段终点,并注销自己
System.out.println("线程 " + taskId + " 已注销,当前 Phase: " + phaser.getPhase());
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
phaser.arriveAndDeregister(); // 主线程完成任务,注销自己
System.out.println("主线程已注销,当前 Phase: " + phaser.getPhase());
executor.shutdown();
}
}
代码解释:
Phaser phaser = new Phaser(1) { ... };
:创建一个Phaser
对象,初始注册一个参与者(主线程)。onAdvance()
方法在每个阶段完成后被调用,可以重写这个方法来执行一些额外的操作。phaser.register();
:注册一个新的参与者。phaser.arriveAndAwaitAdvance();
:线程到达当前阶段的终点,并等待其他线程。当所有线程都到达终点后,Phaser
会进入下一个阶段,并唤醒所有等待的线程。phaser.arriveAndDeregister();
:线程到达当前阶段的终点,并注销自己。
注意事项:
Phaser
比CountDownLatch
和CyclicBarrier
更加灵活,但使用起来也更复杂。- 需要仔细考虑参与者的注册和注销时机,避免出现死锁或错误。
onAdvance()
方法可以用来控制Phaser
的行为,例如判断是否需要结束。
五、三剑客对比:选哪个好呢?
为了方便大家选择,我把这三个家伙的特点总结成一张表格:
特性 | CountDownLatch | CyclicBarrier | Phaser |
---|---|---|---|
功能 | 一次性倒计时同步 | 循环栅栏同步 | 动态相位同步 |
可重用性 | 否 | 是 | 是 |
参与者数量 | 初始化时固定 | 初始化时固定 | 可动态调整 |
阶段 | 单一阶段 | 单一阶段 | 多阶段 |
适用场景 | 主线程等待子线程完成任务 | 多线程分段计算,模拟并发 | 复杂的并发任务,多阶段计算任务 |
灵活性 | 低 | 中 | 高 |
复杂度 | 低 | 中 | 高 |
主要方法 | countDown() , await() |
await() , reset() |
register() , arriveAndAwaitAdvance() , arriveAndDeregister() , onAdvance() |
总结:
- 如果你的需求很简单,只需要等待所有线程完成某个任务,
CountDownLatch
就足够了。 - 如果你的任务需要分成多个阶段,每个阶段都需要所有线程同步,
CyclicBarrier
是一个不错的选择。 - 如果你的任务非常复杂,需要动态地调整参与者数量,并且需要支持多个阶段,那么
Phaser
就是你的最佳选择。
六、总结与展望
今天我们一起学习了 CountDownLatch
、CyclicBarrier
和 Phaser
这三个并发协调器。它们各有特点,适用于不同的场景。希望通过今天的讲解,大家能够对它们有更深入的理解,在实际开发中能够灵活运用,写出更高效、更可靠的并发程序。
记住,并发编程就像玩火,玩得好能给你带来光明,玩不好就可能把自己烧伤。所以,一定要慎之又慎,多学习,多实践,才能真正掌握并发编程的精髓。
好了,今天的分享就到这里,感谢大家的收听!下次再见!