Java `Phaser` `CountDownLatch` `CyclicBarrier` 复杂并发协调器

各位观众老爷,大家好!我是今天的主讲人,咱们今天聊点并发编程里比较高级的玩具:PhaserCountDownLatchCyclicBarrier。别怕,这玩意儿虽然名字唬人,但理解起来其实挺有意思的,就像拆盲盒,拆开了就觉得“就这?”。咱们尽量用大白话,结合代码示例,把它们扒个精光,让大家以后遇到并发协调问题,能像老中医一样,精准把脉,药到病除。

一、并发协调的那些事儿

首先,我们要搞明白,为啥需要这些并发协调器。想象一下,你和几个朋友一起组队打游戏。

  • CountDownLatch (倒计时门闩): 你们约定好,必须所有人都加载完游戏资源,才能开始游戏。只有等待所有玩家准备完毕,游戏才能开始。

  • CyclicBarrier (循环栅栏): 游戏每进行一轮,都需要所有人确认准备好,才能进入下一轮。就像玩大富翁,所有人走到起点才能掷骰子。

  • Phaser (相位器): 游戏有多个阶段,比如准备阶段、战斗阶段、结算阶段。每个阶段都需要所有玩家完成才能进入下一个阶段。而且,玩家可以动态加入或退出游戏(比如有人掉线了)。

简单来说,这些工具就是用来同步多个线程的,让它们按照我们设定的步骤,协同工作,避免出现“你跑太快,等等我”的情况。

二、CountDownLatch:一锤定音的倒计时

CountDownLatch 的原理很简单,就像一个倒计时器。你给它设置一个初始值,然后每个线程完成任务后,就调用 countDown() 方法,让计数器减一。当计数器减到零时,所有等待 await() 方法的线程就会被唤醒。

适用场景:

  • 主线程等待多个子线程完成任务。
  • 测试用例中,等待多个异步操作完成。
  • 并发初始化:确保所有服务都启动完毕后再提供服务。

代码示例:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchExample {

    public static void main(String[] args) throws InterruptedException {
        int numberOfThreads = 3;
        CountDownLatch latch = new CountDownLatch(numberOfThreads); // 初始化计数器为 3

        ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);

        for (int i = 0; i < numberOfThreads; i++) {
            final int taskId = i;
            executor.submit(() -> {
                try {
                    System.out.println("线程 " + taskId + " 开始执行任务...");
                    Thread.sleep((long) (Math.random() * 2000)); // 模拟任务执行时间
                    System.out.println("线程 " + taskId + " 任务执行完毕!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    latch.countDown(); // 计数器减一
                }
            });
        }

        System.out.println("主线程等待所有子线程完成...");
        latch.await(); // 主线程阻塞,直到计数器为零
        System.out.println("所有子线程执行完毕,主线程继续执行!");

        executor.shutdown(); // 关闭线程池
    }
}

代码解释:

  1. CountDownLatch latch = new CountDownLatch(numberOfThreads);:创建一个 CountDownLatch 对象,初始计数器值为 numberOfThreads
  2. latch.countDown();:每个子线程完成任务后,调用 countDown() 方法,计数器减一。
  3. latch.await();:主线程调用 await() 方法,进入阻塞状态,直到计数器减到零。
  4. executor.shutdown();:关闭线程池,释放资源。

注意事项:

  • CountDownLatch 是一次性的,计数器减到零之后就不能重置了。
  • 如果某个线程在 await() 期间被中断,会抛出 InterruptedException 异常。

三、CyclicBarrier:周而复始的同步

CyclicBarrier 就像一个栅栏,当所有线程都到达栅栏处时,栅栏才会打开,允许所有线程继续执行。与 CountDownLatch 不同的是,CyclicBarrier 可以重复使用,每次所有线程到达栅栏后,栅栏会重置,等待下一批线程。

适用场景:

  • 多线程分段计算:将一个大的计算任务分成多个阶段,每个阶段都需要所有线程完成才能进入下一个阶段。
  • 模拟并发:模拟多个用户同时访问服务器。
  • 并行算法:例如,并行遗传算法,每一代都需要所有个体完成评估才能进入下一代。

代码示例:

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierExample {

    public static void main(String[] args) {
        int numberOfThreads = 3;
        CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, () -> {
            System.out.println("所有线程到达栅栏,开始下一轮!"); // Runnable 在所有线程到达栅栏后执行
        });

        ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);

        for (int i = 0; i < numberOfThreads; i++) {
            final int taskId = i;
            executor.submit(() -> {
                try {
                    for (int round = 1; round <= 3; round++) {
                        System.out.println("线程 " + taskId + " 正在执行第 " + round + " 轮任务...");
                        Thread.sleep((long) (Math.random() * 2000)); // 模拟任务执行时间
                        System.out.println("线程 " + taskId + " 第 " + round + " 轮任务执行完毕,等待其他线程...");
                        barrier.await(); // 等待其他线程到达栅栏
                        System.out.println("线程 " + taskId + " 开始执行第 " + (round + 1) + " 轮任务(如果存在)...");
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }

        executor.shutdown();
    }
}

代码解释:

  1. CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, () -> { ... });:创建一个 CyclicBarrier 对象,numberOfThreads 指定参与的线程数量,第二个参数是一个 Runnable 对象,当所有线程到达栅栏后,会执行这个 Runnable 对象。
  2. barrier.await();:每个线程调用 await() 方法,进入等待状态,直到所有线程都到达栅栏。
  3. 当所有线程都到达栅栏后,会执行 Runnable 对象,然后唤醒所有线程,栅栏重置,等待下一批线程。

注意事项:

  • 如果某个线程在 await() 期间被中断,或者栅栏被重置,会抛出 InterruptedExceptionBrokenBarrierException 异常。
  • 可以指定一个 Runnable 对象,在所有线程到达栅栏后执行一些额外的操作。

四、Phaser:灵活多变的相位器

PhaserCountDownLatchCyclicBarrier 更加灵活,它支持动态地注册和注销参与者,并且可以定义多个阶段(phase)。每个阶段都需要所有参与者完成才能进入下一个阶段。

适用场景:

  • 复杂的并发任务,需要动态地调整参与者数量。
  • 多阶段的计算任务,每个阶段都需要同步。
  • 模拟游戏中的不同阶段,例如准备阶段、战斗阶段、结算阶段。

Phaser 的核心概念:

  • Phase (阶段): Phaser 的核心是阶段,每个阶段都需要所有注册的参与者完成才能进入下一个阶段。
  • Participant (参与者): 线程需要在 Phaser 中注册才能参与同步。可以使用 register() 方法注册,arriveAndDeregister() 方法注销。
  • arriveAndAwaitAdvance(): 线程到达当前阶段的终点,并等待其他线程。
  • arrive(): 线程到达当前阶段的终点,但不等待其他线程。
  • arriveAndDeregister(): 线程到达当前阶段的终点,并注销自己。
  • onAdvance(): Phaser 进入下一个阶段时,会调用 onAdvance() 方法。可以重写这个方法来执行一些额外的操作,例如更新状态、判断是否需要结束。

代码示例:

import java.util.concurrent.Phaser;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PhaserExample {

    public static void main(String[] args) {
        int numberOfThreads = 3;
        Phaser phaser = new Phaser(1) { // 初始注册一个参与者(主线程)
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("==== Phase " + phase + " 完成,准备进入下一个 Phase ====");
                return registeredParties == 0; // 如果没有参与者,则结束 Phaser
            }
        };

        ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);

        System.out.println("主线程注册了一个参与者,当前 Phase: " + phaser.getPhase());

        for (int i = 0; i < numberOfThreads; i++) {
            final int taskId = i;
            phaser.register(); // 注册一个参与者
            executor.submit(() -> {
                try {
                    System.out.println("线程 " + taskId + " 已注册,当前 Phase: " + phaser.getPhase());
                    Thread.sleep((long) (Math.random() * 2000));
                    System.out.println("线程 " + taskId + " 完成第一阶段任务,等待其他线程...");
                    phaser.arriveAndAwaitAdvance(); // 到达第一阶段终点,并等待其他线程

                    System.out.println("线程 " + taskId + " 开始执行第二阶段任务...");
                    Thread.sleep((long) (Math.random() * 2000));
                    System.out.println("线程 " + taskId + " 完成第二阶段任务,准备退出...");
                    phaser.arriveAndDeregister(); // 到达第二阶段终点,并注销自己
                    System.out.println("线程 " + taskId + " 已注销,当前 Phase: " + phaser.getPhase());

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        phaser.arriveAndDeregister(); // 主线程完成任务,注销自己
        System.out.println("主线程已注销,当前 Phase: " + phaser.getPhase());

        executor.shutdown();
    }
}

代码解释:

  1. Phaser phaser = new Phaser(1) { ... };:创建一个 Phaser 对象,初始注册一个参与者(主线程)。onAdvance() 方法在每个阶段完成后被调用,可以重写这个方法来执行一些额外的操作。
  2. phaser.register();:注册一个新的参与者。
  3. phaser.arriveAndAwaitAdvance();:线程到达当前阶段的终点,并等待其他线程。当所有线程都到达终点后,Phaser 会进入下一个阶段,并唤醒所有等待的线程。
  4. phaser.arriveAndDeregister();:线程到达当前阶段的终点,并注销自己。

注意事项:

  • PhaserCountDownLatchCyclicBarrier 更加灵活,但使用起来也更复杂。
  • 需要仔细考虑参与者的注册和注销时机,避免出现死锁或错误。
  • onAdvance() 方法可以用来控制 Phaser 的行为,例如判断是否需要结束。

五、三剑客对比:选哪个好呢?

为了方便大家选择,我把这三个家伙的特点总结成一张表格:

特性 CountDownLatch CyclicBarrier Phaser
功能 一次性倒计时同步 循环栅栏同步 动态相位同步
可重用性
参与者数量 初始化时固定 初始化时固定 可动态调整
阶段 单一阶段 单一阶段 多阶段
适用场景 主线程等待子线程完成任务 多线程分段计算,模拟并发 复杂的并发任务,多阶段计算任务
灵活性
复杂度
主要方法 countDown(), await() await(), reset() register(), arriveAndAwaitAdvance(), arriveAndDeregister(), onAdvance()

总结:

  • 如果你的需求很简单,只需要等待所有线程完成某个任务,CountDownLatch 就足够了。
  • 如果你的任务需要分成多个阶段,每个阶段都需要所有线程同步,CyclicBarrier 是一个不错的选择。
  • 如果你的任务非常复杂,需要动态地调整参与者数量,并且需要支持多个阶段,那么 Phaser 就是你的最佳选择。

六、总结与展望

今天我们一起学习了 CountDownLatchCyclicBarrierPhaser 这三个并发协调器。它们各有特点,适用于不同的场景。希望通过今天的讲解,大家能够对它们有更深入的理解,在实际开发中能够灵活运用,写出更高效、更可靠的并发程序。

记住,并发编程就像玩火,玩得好能给你带来光明,玩不好就可能把自己烧伤。所以,一定要慎之又慎,多学习,多实践,才能真正掌握并发编程的精髓。

好了,今天的分享就到这里,感谢大家的收听!下次再见!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注