Java高阶同步器:Phaser、Exchanger在复杂任务协作中的灵活应用

Java 高阶同步器:Phaser、Exchanger 在复杂任务协作中的灵活应用

大家好,今天我们来深入探讨 Java 并发包中两个相对高级的同步器:PhaserExchanger。 相较于 CountDownLatchCyclicBarrierSemaphore 这些我们常用的同步工具,PhaserExchanger 在处理更复杂、更灵活的任务协作场景时,能发挥更大的作用。 接下来,我将通过代码示例和实际场景,详细讲解它们的用法和优势。

1. Phaser:灵活可变的同步屏障

Phaser 是一个比 CyclicBarrier 更灵活的同步屏障。 它可以动态地调整参与者的数量,并且可以分阶段执行任务。 这使得它非常适合处理迭代计算、分而治之等复杂场景。

1.1 Phaser 的基本概念

  • Phase (阶段): Phaser 的核心概念,代表一个执行阶段。 每次调用 arriveAndAwaitAdvance() 方法,都会使 Phaser 进入下一个阶段。
  • Parties (参与者): 参与同步的线程数量。 Phaser 允许动态地增加或减少参与者。
  • Registration (注册): 线程通过 register()bulkRegister() 方法向 Phaser 注册,成为参与者。
  • Arrival (到达): 线程通过 arrive()arriveAndAwaitAdvance() 方法通知 Phaser 自己已完成当前阶段的任务。
  • Termination (终止): Phaser 可以通过 isTerminated() 方法判断是否已终止,通常由最后一个到达的线程调用 onAdvance() 方法来控制。

1.2 Phaser 的基本用法

import java.util.concurrent.Phaser;

public class PhaserExample {

    public static void main(String[] args) throws InterruptedException {
        Phaser phaser = new Phaser(3); // 初始注册 3 个参与者

        for (int i = 0; i < 3; i++) {
            final int threadId = i;
            new Thread(() -> {
                System.out.println("Thread " + threadId + " started, phase: " + phaser.getPhase());
                phaser.arriveAndAwaitAdvance(); // 等待其他线程到达

                System.out.println("Thread " + threadId + " executing task in phase: " + phaser.getPhase());
                try {
                    Thread.sleep(1000); // 模拟任务执行
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                phaser.arriveAndAwaitAdvance(); // 等待其他线程到达

                System.out.println("Thread " + threadId + " completed task in phase: " + phaser.getPhase());
                phaser.arriveAndDeregister(); // 线程完成任务,注销自己
            }).start();
        }

        // 主线程等待所有线程完成
        while (!phaser.isTerminated()) {
            Thread.sleep(100);
        }

        System.out.println("All threads completed.");
    }
}

在这个例子中,我们创建了一个 Phaser,初始注册了 3 个参与者。 每个线程启动后,首先等待其他线程到达,然后执行任务,最后等待其他线程完成任务。 每个线程在完成任务后,都会注销自己。 arriveAndDeregister() 方法将到达并注销线程,减少参与者数量。 当所有线程都注销后,Phaser 将终止。

1.3 Phaser 的高级用法:onAdvance() 方法

onAdvance() 方法是 Phaser 的一个关键特性。 它允许我们在每个阶段结束时执行一些自定义逻辑,例如检查是否需要终止 Phaser

import java.util.concurrent.Phaser;
import java.util.Random;

public class PhaserTerminationExample {

    public static void main(String[] args) throws InterruptedException {
        Phaser phaser = new Phaser(3) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("Phase " + phase + " completed. Registered parties: " + registeredParties);
                return registeredParties == 0 || phase > 5; // 如果没有参与者或超过 5 个阶段,则终止 Phaser
            }
        };

        Random random = new Random();

        for (int i = 0; i < 3; i++) {
            final int threadId = i;
            new Thread(() -> {
                while (!phaser.isTerminated()) {
                    System.out.println("Thread " + threadId + " started, phase: " + phaser.getPhase());
                    phaser.arriveAndAwaitAdvance(); // 等待其他线程到达

                    if (phaser.isTerminated()) {
                        System.out.println("Thread " + threadId + " exiting because Phaser is terminated.");
                        break;
                    }

                    System.out.println("Thread " + threadId + " executing task in phase: " + phaser.getPhase());
                    try {
                        Thread.sleep(random.nextInt(500)); // 模拟任务执行
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                phaser.arriveAndDeregister();
                System.out.println("Thread " + threadId + " deregistered.");
            }).start();
        }
    }
}

在这个例子中,我们重写了 onAdvance() 方法。 onAdvance() 方法在每个阶段结束后被调用,并返回一个 boolean 值,指示 Phaser 是否应该终止。 如果返回 true,则 Phaser 将终止,否则将继续进行下一个阶段。 这里,我们设置了两个终止条件:如果没有参与者(registeredParties == 0)或超过 5 个阶段(phase > 5)。

1.4 Phaser 的应用场景:迭代计算

Phaser 非常适合用于迭代计算,例如图像处理、机器学习等。 在每次迭代中,多个线程并行处理数据,然后同步到下一个迭代。

import java.util.concurrent.Phaser;
import java.util.Arrays;
import java.util.Random;

public class PhaserIterationExample {

    public static void main(String[] args) throws InterruptedException {
        int numThreads = 4;
        int iterations = 10;
        int arraySize = 100;

        int[] data = new int[arraySize];
        Random random = new Random();
        for (int i = 0; i < arraySize; i++) {
            data[i] = random.nextInt(100);
        }

        Phaser phaser = new Phaser(numThreads) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("Iteration " + phase + " completed.");
                return phase >= iterations || registeredParties == 0;
            }
        };

        for (int i = 0; i < numThreads; i++) {
            final int threadId = i;
            new Thread(() -> {
                int start = threadId * (arraySize / numThreads);
                int end = (threadId == numThreads - 1) ? arraySize : (threadId + 1) * (arraySize / numThreads);

                while (!phaser.isTerminated()) {
                    // 每个线程处理一部分数据
                    for (int j = start; j < end; j++) {
                        data[j] = data[j] * 2; // 模拟数据处理
                    }

                    System.out.println("Thread " + threadId + " processed data in iteration " + phaser.getPhase());

                    // 等待其他线程完成
                    phaser.arriveAndAwaitAdvance();
                }
                phaser.arriveAndDeregister();
                System.out.println("Thread " + threadId + " deregistered.");
            }).start();
        }

        // 主线程等待所有线程完成
        while (!phaser.isTerminated()) {
            Thread.sleep(100);
        }

        System.out.println("All threads completed " + iterations + " iterations.");
    }
}

在这个例子中,我们使用 Phaser 来同步多个线程,每个线程负责处理一部分数据。 在每次迭代中,每个线程将自己的数据乘以 2。 Phaser 确保所有线程都完成当前迭代后,才会进入下一个迭代。

1.5 Phaser 的优势

  • 动态参与者数量: Phaser 允许动态地增加或减少参与者,这使得它非常适合处理任务数量不确定的场景。
  • 分阶段执行: Phaser 可以分阶段执行任务,每个阶段可以执行不同的逻辑。
  • 自定义终止条件: Phaser 允许自定义终止条件,这使得它可以灵活地控制任务的执行。

1.6 Phaser 和 CyclicBarrier 的区别

特性 Phaser CyclicBarrier
参与者数量 可动态调整 固定
阶段执行 可分阶段执行,onAdvance() 方法可自定义逻辑 仅同步等待,到达屏障后执行 Runnable
灵活性 更高 较低
应用场景 迭代计算、分而治之等复杂场景 简单同步等待

2. Exchanger:线程间安全交换数据

Exchanger 允许两个线程安全地交换数据。 它提供了一个同步点,当两个线程都到达该点时,它们将交换各自的数据。

2.1 Exchanger 的基本概念

  • Exchange Point (交换点): Exchanger 的核心概念。 线程到达交换点后,将等待另一个线程到达。
  • Data Exchange (数据交换): 当两个线程都到达交换点时,它们将交换各自的数据。

2.2 Exchanger 的基本用法

import java.util.concurrent.Exchanger;

public class ExchangerExample {

    public static void main(String[] args) throws InterruptedException {
        Exchanger<String> exchanger = new Exchanger<>();

        new Thread(() -> {
            String data = "Message from Thread 1";
            try {
                System.out.println("Thread 1: Sending data: " + data);
                String receivedData = exchanger.exchange(data); // 交换数据
                System.out.println("Thread 1: Received data: " + receivedData);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            String data = "Message from Thread 2";
            try {
                System.out.println("Thread 2: Sending data: " + data);
                String receivedData = exchanger.exchange(data); // 交换数据
                System.out.println("Thread 2: Received data: " + receivedData);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

在这个例子中,我们创建了一个 Exchanger,用于交换字符串类型的数据。 两个线程分别发送一条消息,并等待接收另一条消息。 当两个线程都到达 exchange() 方法时,它们将交换各自的消息。

2.3 Exchanger 的应用场景:生产者-消费者模式

Exchanger 可以用于实现生产者-消费者模式,其中一个线程负责生产数据,另一个线程负责消费数据。

import java.util.concurrent.Exchanger;
import java.util.Random;

public class ExchangerProducerConsumer {

    public static void main(String[] args) throws InterruptedException {
        Exchanger<DataBuffer> exchanger = new Exchanger<>();
        DataBuffer initialBuffer = new DataBuffer();

        // 生产者线程
        new Thread(() -> {
            DataBuffer buffer = initialBuffer;
            Random random = new Random();
            try {
                for (int i = 0; i < 10; i++) {
                    // 生产数据
                    buffer.setData("Data " + i);
                    System.out.println("Producer: Produced data: " + buffer.getData());

                    // 交换数据
                    buffer = exchanger.exchange(buffer);

                    // 模拟生产耗时
                    Thread.sleep(random.nextInt(500));
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        // 消费者线程
        new Thread(() -> {
            DataBuffer buffer = new DataBuffer();
            try {
                for (int i = 0; i < 10; i++) {
                    // 交换数据
                    buffer = exchanger.exchange(buffer);

                    // 消费数据
                    System.out.println("Consumer: Consumed data: " + buffer.getData());
                    buffer.setData(null); // 清空数据

                    // 模拟消费耗时
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }

    static class DataBuffer {
        private String data;

        public String getData() {
            return data;
        }

        public void setData(String data) {
            this.data = data;
        }
    }
}

在这个例子中,生产者线程负责生产数据,并将数据存储在 DataBuffer 对象中。 消费者线程负责消费数据,并从 DataBuffer 对象中读取数据。 Exchanger 用于在生产者和消费者之间交换 DataBuffer 对象。

2.4 Exchanger 的优势

  • 线程间安全交换数据: Exchanger 确保线程间安全地交换数据,避免了数据竞争和并发问题。
  • 简单易用: Exchanger 的 API 非常简单,易于使用。
  • 高效: Exchanger 的实现经过优化,性能较高。

2.5 Exchanger 的局限性

  • 只能用于两个线程: Exchanger 只能用于两个线程之间交换数据。
  • 必须同时到达: 两个线程必须同时到达交换点,否则将一直等待。

3. 总结与应用场景选择

  • Phaser: 适用于需要动态调整参与者数量、分阶段执行任务的复杂同步场景,例如迭代计算、分而治之等。 通过onAdvance()可以自定义终止条件,灵活控制任务执行流程。
  • Exchanger: 适用于两个线程之间安全交换数据的场景,例如生产者-消费者模式。 确保数据安全交换,避免并发问题。

希望今天的分享能够帮助大家更好地理解和应用 PhaserExchanger 这两个强大的同步器。 谢谢大家!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注