Java中的Phaser同步器:实现多阶段、可重用同步屏障的底层机制

好的,我们开始今天的讲座,主题是Java中的Phaser同步器:实现多阶段、可重用同步屏障的底层机制。

Phaser同步器:多阶段同步的利器

在并发编程中,我们经常需要多个线程在执行到某个特定点时进行同步,确保所有线程都到达该点后,才能继续执行后续操作。Java提供了多种同步工具,例如CountDownLatch、CyclicBarrier等,但它们在处理多阶段同步或需要动态调整参与线程数量的场景下,显得有些力不从心。这时,Phaser就派上了用场。

Phaser是一个灵活且强大的同步器,它提供了一种可重用、多阶段的同步屏障机制。它允许一组线程在多个阶段内协调工作,并且可以动态地注册和注销参与者,这使得它非常适合处理复杂并发场景,例如并行迭代、分而治之算法等。

Phaser的核心概念

要理解Phaser,我们需要掌握几个关键概念:

  • Phase(阶段): Phaser的核心是阶段的概念。每个Phaser对象维护一个内部的阶段计数器,初始值为0。当所有已注册的参与者都到达当前阶段的同步点时,Phaser会将阶段计数器递增,进入下一个阶段。
  • Parties(参与者): 参与者是指注册到Phaser中并参与同步的线程。每个线程在完成当前阶段的工作后,会调用Phaser的arriveAndAwaitAdvance()方法,等待其他参与者到达。
  • Registration(注册): 线程可以通过register()方法将自己注册为Phaser的参与者。注册后,线程就可以参与到Phaser的同步过程中。
  • Deregistration(注销): 线程可以通过arriveAndDeregister()方法注销自己,表示不再参与后续的同步。
  • Termination(终止): Phaser可以被终止,终止后,所有arriveAndAwaitAdvance()方法都会立即返回,不会再等待。

Phaser的主要方法

Phaser提供了一系列方法来控制同步过程,其中最常用的包括:

方法名 描述
Phaser() 构造函数,创建一个没有参与者的Phaser。
Phaser(int parties) 构造函数,创建一个具有指定数量参与者的Phaser。
register() 将当前线程注册为Phaser的参与者,增加参与者数量。
bulkRegister(int parties) 批量注册指定数量的参与者。
arrive() 通知Phaser当前线程已经到达当前阶段的同步点,但不等待其他线程。
arriveAndAwaitAdvance() 通知Phaser当前线程已经到达当前阶段的同步点,并等待其他线程到达。当所有线程都到达后,Phaser进入下一个阶段,该方法返回下一个阶段的编号。
arriveAndDeregister() 通知Phaser当前线程已经到达当前阶段的同步点,并注销自己。这通常用于处理那些只需要参与部分阶段同步的线程。
awaitAdvance(int phase) 显式等待指定的阶段完成。如果指定的阶段已经完成,则立即返回。
awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) awaitAdvance类似,但允许在等待期间被中断,并且可以设置超时时间。
getPhase() 返回当前Phaser的阶段编号。
getRegisteredParties() 返回已注册的参与者数量。
getArrivedParties() 返回已到达当前阶段的参与者数量。
getUnarrivedParties() 返回尚未到达当前阶段的参与者数量。
isTerminated() 判断Phaser是否已经终止。
forceTermination() 强制终止Phaser,所有等待线程都会立即返回。
onAdvance(int phase, int registeredParties) 当Phaser进入下一个阶段时,会调用此方法。子类可以重写此方法来执行一些额外的操作,例如更新共享数据、检查终止条件等。

Phaser的使用示例:并行数据处理

假设我们有一个大型数据集,需要将其分成多个块,然后由多个线程并行处理。每个线程处理完自己的数据块后,需要等待其他线程完成,才能进行下一步的汇总操作。这个过程可以重复多次,直到所有数据处理完毕。

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class PhaserExample {

    private static final int NUM_THREADS = 5;
    private static final int NUM_PHASES = 3;

    public static void main(String[] args) throws InterruptedException {
        Phaser phaser = new Phaser(NUM_THREADS) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("Phase " + phase + " completed. Registered parties: " + registeredParties);
                return phase >= NUM_PHASES - 1 || registeredParties == 0; // Terminate after NUM_PHASES or if no parties are registered
            }
        };

        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < NUM_THREADS; i++) {
            final int threadId = i;
            Thread thread = new Thread(() -> {
                System.out.println("Thread " + threadId + " started.");
                for (int phase = 0; phase < NUM_PHASES; phase++) {
                    // Simulate some work
                    try {
                        TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(500, 1500));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }

                    System.out.println("Thread " + threadId + " completed phase " + phase + ".  Arriving and awaiting.");
                    phaser.arriveAndAwaitAdvance(); // Wait for all threads to complete this phase
                    System.out.println("Thread " + threadId + " continues after phase " + phase);
                }
                System.out.println("Thread " + threadId + " finished all phases.");
            });
            threads.add(thread);
            thread.start();
        }

        for (Thread thread : threads) {
            thread.join();
        }

        System.out.println("All threads finished. Phaser terminated: " + phaser.isTerminated());
    }
}

在这个例子中,我们创建了一个Phaser对象,并注册了5个参与者。每个线程模拟执行3个阶段的任务。在每个阶段完成后,线程调用arriveAndAwaitAdvance()方法,等待其他线程完成。当所有线程都到达当前阶段的同步点后,Phaser会进入下一个阶段,并唤醒所有等待的线程。onAdvance方法在每个阶段完成后被调用,我们可以在这里执行一些额外的操作,例如打印阶段信息或检查终止条件。

Phaser的动态注册和注销

Phaser的一个重要特性是可以动态地注册和注销参与者。这使得它非常适合处理那些参与线程数量不固定的场景。

import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class PhaserDynamicExample {

    public static void main(String[] args) throws InterruptedException {
        Phaser phaser = new Phaser(1); // Initial party is the main thread
        System.out.println("Phaser started by main thread.");

        // Simulate adding and removing parties dynamically
        for (int i = 0; i < 3; i++) {
            // Add a new worker
            int workerId = i + 1;
            phaser.register();
            System.out.println("Main thread registered worker " + workerId + ". Total parties: " + phaser.getRegisteredParties());

            new Thread(() -> {
                System.out.println("Worker " + workerId + " started.");
                try {
                    // Simulate some work
                    TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(500, 1500));

                    System.out.println("Worker " + workerId + " arriving and awaiting phase " + phaser.getPhase());
                    phaser.arriveAndAwaitAdvance(); // Wait for the main thread and other workers

                    System.out.println("Worker " + workerId + " continuing after phase " + phaser.getPhase());

                    // Simulate more work
                    TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(500, 1500));
                    System.out.println("Worker " + workerId + " arriving and deregistering. Phase: " + phaser.getPhase());
                    phaser.arriveAndDeregister(); // Deregister after completing the work
                    System.out.println("Worker " + workerId + " finished.");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();

            // Main thread does some work
            TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(200, 800));
            System.out.println("Main thread completed its work for phase " + phaser.getPhase() + ". Awaiting workers.");
            phaser.arriveAndAwaitAdvance(); // Wait for all workers
            System.out.println("Main thread continues after phase " + phaser.getPhase());
        }

        // Main thread deregisters itself and terminates the phaser.
        phaser.arriveAndDeregister();
        System.out.println("Main thread finished and deregistered.  Phaser terminated: " + phaser.isTerminated());

    }
}

在这个例子中,主线程首先注册自己作为Phaser的参与者。然后,它循环三次,每次都注册一个新的工作线程,并启动该线程。工作线程在完成自己的任务后,会注销自己。主线程在每次循环结束后,也会等待所有工作线程完成。最后,主线程注销自己,并结束程序。

Phaser的onAdvance()方法

onAdvance()方法是一个protected方法,子类可以重写它来执行一些额外的操作,例如更新共享数据、检查终止条件等。该方法在每个阶段完成后被调用,并且接收两个参数:当前阶段的编号和已注册的参与者数量。

import java.util.concurrent.Phaser;

public class PhaserOnAdvanceExample {

    public static void main(String[] args) throws InterruptedException {
        Phaser phaser = new Phaser(2) { // Main thread and worker thread
            private int counter = 0;

            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                counter++;
                System.out.println("onAdvance called. Phase: " + phase + ", Registered parties: " + registeredParties + ", Counter: " + counter);
                return counter >= 3; // Terminate after 3 phases
            }
        };

        new Thread(() -> {
            while (!phaser.isTerminated()) {
                System.out.println("Worker thread arriving and awaiting. Phase: " + phaser.getPhase());
                phaser.arriveAndAwaitAdvance();
                System.out.println("Worker thread continues after phase " + phaser.getPhase());
            }
            System.out.println("Worker thread finished.");
        }).start();

        while (!phaser.isTerminated()) {
            System.out.println("Main thread arriving and awaiting. Phase: " + phaser.getPhase());
            phaser.arriveAndAwaitAdvance();
            System.out.println("Main thread continues after phase " + phaser.getPhase());
            Thread.sleep(100);
        }
        System.out.println("Main thread finished. Phaser terminated.");

    }
}

在这个例子中,我们重写了onAdvance()方法,并在其中增加了一个计数器。每次onAdvance()方法被调用时,计数器都会递增。当计数器达到3时,onAdvance()方法返回true,表示Phaser应该终止。

Phaser与CountDownLatch、CyclicBarrier的比较

虽然Phaser、CountDownLatch和CyclicBarrier都可以用于线程同步,但它们之间存在一些关键区别:

特性 CountDownLatch CyclicBarrier Phaser
重用性 一次性 可重用 可重用
阶段性 单阶段 多阶段
参与者数量 初始化时固定 初始化时固定 动态可调
终止 无法重置或终止 可以重置 可以强制终止
适用场景 一组线程等待一个事件发生 一组线程在某个点同步 多阶段同步,参与者数量动态变化的复杂场景

Phaser的底层机制

Phaser的底层实现依赖于一个内部的状态变量,该变量包含了当前阶段、已注册的参与者数量、已到达的参与者数量等信息。当线程调用arriveAndAwaitAdvance()方法时,Phaser会将已到达的参与者数量递增,并检查是否所有已注册的参与者都已到达。如果所有参与者都已到达,Phaser会进入下一个阶段,并唤醒所有等待的线程。

为了避免竞争条件,Phaser使用了CAS(Compare and Swap)操作来更新内部状态变量。当多个线程同时尝试更新状态变量时,只有一个线程能够成功,其他线程需要重新尝试。

onAdvance方法的调用也需要同步处理,以确保在进入下一个阶段之前,所有必要的清理和更新操作都已完成。

使用Phaser的注意事项

  • 避免在onAdvance()方法中执行耗时操作,因为这会阻塞所有等待的线程。
  • 确保所有参与者最终都会调用arriveAndAwaitAdvance()arriveAndDeregister()方法,否则可能会导致死锁。
  • 合理设置Phaser的终止条件,避免程序无限期地运行下去。
  • 理解onAdvance的返回值,返回true会终止后续阶段,返回false会继续执行。

总结:掌握Phaser,应对复杂并发场景

Phaser是一个功能强大且灵活的同步器,它提供了一种可重用、多阶段的同步屏障机制。通过动态地注册和注销参与者,以及重写onAdvance()方法,我们可以使用Phaser来处理各种复杂的并发场景。 理解Phaser的核心概念、常用方法和底层机制,能够帮助我们更好地利用它来构建高效、可靠的并发程序。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注