JS `Atomics.wait` 与 `Atomics.notify`:等待/通知模式在 Worker 间的同步

各位观众老爷,大家好!欢迎来到今天的并发编程小剧场。今天我们不讲高深的理论,只聊聊如何在 JavaScript 的 Worker 之间玩转 "等待/通知" 模式,也就是 Atomics.waitAtomics.notify 这两位哥们儿。

咱们先来个“灵魂拷问”:你有没有遇到过这样的场景?

  • Worker A: “哥们儿,数据准备好了没?好了吱一声!”
  • Worker B: “还没呢,等等哈…”
  • Worker A: (疯狂轮询) “好了没?好了没?好了没?” (CPU: 我裂开了!)

这种场景是不是很熟悉?Worker A 像个催债的一样,不停地问 Worker B 数据是否准备好。这种疯狂轮询 (busy-waiting) 会浪费大量的 CPU 资源,而且 Worker A 在没收到通知之前啥也干不了,简直是“躺平式等待”。

Atomics.waitAtomics.notify 的出现就是为了解决这个问题。它们提供了一种高效的等待/通知机制,让 Worker 可以安心等待,直到收到通知再醒过来干活。

啥是 SharedArrayBuffer?

在深入 Atomics.waitAtomics.notify 之前,我们必须先认识一位老朋友:SharedArrayBuffer

SharedArrayBuffer 允许在多个 Worker 之间共享一块内存区域。这块内存区域就像一个公共的白板,Worker 们可以在上面读写数据。而 Atomics 对象则提供了一系列原子操作,可以安全地读写 SharedArrayBuffer 中的数据,避免出现竞态条件 (race condition)。

简单来说,SharedArrayBuffer 提供了共享内存,Atomics 提供了安全访问共享内存的手段。

Atomics.wait:沉睡的猛兽

Atomics.wait 让 Worker 进入休眠状态,直到满足特定条件才会被唤醒。它的语法如下:

Atomics.wait(typedArray, index, value, timeout);
  • typedArray: 一个共享的 Int32ArrayBigInt64Array 实例。
  • index: typedArray 中要检查的元素的索引。
  • value: typedArray[index] 期望的值。只有当 typedArray[index] 等于 value 时,Worker 才会进入休眠状态。
  • timeout: 可选参数,等待的超时时间,单位是毫秒。如果超过超时时间,Worker 会自动醒来。

Atomics.wait 的返回值有三种:

  • "ok": Worker 被 Atomics.notify 唤醒。
  • "timed-out": Worker 等待超时。
  • "not-equal": typedArray[index] 的值不等于 value,Worker 没有进入休眠状态。

Atomics.notify:唤醒沉睡者

Atomics.notify 用于唤醒等待在 SharedArrayBuffer 上的 Worker。它的语法如下:

Atomics.notify(typedArray, index, count);
  • typedArray: 一个共享的 Int32ArrayBigInt64Array 实例。
  • index: typedArray 中要唤醒的元素的索引。
  • count: 可选参数,要唤醒的 Worker 的数量。如果省略,则唤醒所有等待的 Worker。

Atomics.notify 的返回值是被唤醒的 Worker 的数量。

实战演练:生产者-消费者模型

咱们来用 Atomics.waitAtomics.notify 实现一个经典的生产者-消费者模型。

场景:

  • 生产者 (Producer): 负责生产数据,并将数据放入共享缓冲区。
  • 消费者 (Consumer): 负责从共享缓冲区取出数据,并进行消费。
  • 共享缓冲区 (Shared Buffer): 一个固定大小的数组,用于存储生产者生产的数据。

代码实现:

main.js (主线程):

// 定义缓冲区大小
const BUFFER_SIZE = 10;

// 创建 SharedArrayBuffer
const sharedBuffer = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT * (BUFFER_SIZE + 2)); // +2 用于控制状态

// 创建视图
const buffer = new Int32Array(sharedBuffer);

// 定义状态常量
const EMPTY = 0;
const FULL = BUFFER_SIZE;

// 初始化状态:缓冲区为空
Atomics.store(buffer, BUFFER_SIZE, EMPTY); // buffer[BUFFER_SIZE] 用于记录缓冲区中数据的数量
Atomics.store(buffer, BUFFER_SIZE + 1, 0); // buffer[BUFFER_SIZE+1] 用于记录当前写入位置

// 创建 Worker
const producerWorker = new Worker('producer.js');
const consumerWorker = new Worker('consumer.js');

// 将 SharedArrayBuffer 发送给 Worker
producerWorker.postMessage({ buffer: sharedBuffer, bufferSize: BUFFER_SIZE });
consumerWorker.postMessage({ buffer: sharedBuffer, bufferSize: BUFFER_SIZE });

producer.js (生产者 Worker):

let buffer;
let bufferSize;

self.onmessage = function(event) {
  buffer = new Int32Array(event.data.buffer);
  bufferSize = event.data.bufferSize;
  produce();
};

function produce() {
  setInterval(() => {
    // 检查缓冲区是否已满
    const currentSize = Atomics.load(buffer, bufferSize);
    if (currentSize === bufferSize) {
      console.log('Producer: Buffer is full, waiting...');
      Atomics.wait(buffer, bufferSize, bufferSize); // 等待消费者消费
    }

    // 生产数据
    const data = Math.floor(Math.random() * 100);
    const writeIndex = Atomics.load(buffer, bufferSize + 1);
    Atomics.store(buffer, writeIndex, data);
    console.log(`Producer: Produced ${data} at index ${writeIndex}`);

    // 更新写入位置
    const newWriteIndex = (writeIndex + 1) % bufferSize;
    Atomics.store(buffer, bufferSize + 1, newWriteIndex);

    // 增加缓冲区大小
    Atomics.add(buffer, bufferSize, 1);

    // 通知消费者
    Atomics.notify(buffer, bufferSize, 1);
  }, 500); // 每 500 毫秒生产一个数据
}

consumer.js (消费者 Worker):

let buffer;
let bufferSize;

self.onmessage = function(event) {
  buffer = new Int32Array(event.data.buffer);
  bufferSize = event.data.bufferSize;
  consume();
};

function consume() {
  setInterval(() => {
    // 检查缓冲区是否为空
    const currentSize = Atomics.load(buffer, bufferSize);
    if (currentSize === 0) {
      console.log('Consumer: Buffer is empty, waiting...');
      Atomics.wait(buffer, bufferSize, 0); // 等待生产者生产
    }

    // 消费数据
    const readIndex = (Atomics.load(buffer, bufferSize + 1) - Atomics.load(buffer, bufferSize) + bufferSize) % bufferSize;
    const data = Atomics.load(buffer, readIndex);
    console.log(`Consumer: Consumed ${data} from index ${readIndex}`);

    // 减少缓冲区大小
    Atomics.sub(buffer, bufferSize, 1);

    // 通知生产者
    Atomics.notify(buffer, bufferSize, 1);
  }, 1000); // 每 1000 毫秒消费一个数据
}

代码解释:

  1. main.js:

    • 创建 SharedArrayBuffer,大小为 BUFFER_SIZE + 2。其中 BUFFER_SIZE 个元素用于存储数据,一个元素用于记录缓冲区中数据的数量 (buffer[BUFFER_SIZE]),另一个元素用于记录当前写入位置 (buffer[BUFFER_SIZE + 1])。
    • 初始化状态:缓冲区为空 (buffer[BUFFER_SIZE] = 0),写入位置为 0 (buffer[BUFFER_SIZE + 1] = 0)。
    • 创建生产者和消费者 Worker,并将 SharedArrayBuffer 发送给它们。
  2. producer.js:

    • 生产者 Worker 接收到 SharedArrayBuffer 后,会定期检查缓冲区是否已满。
    • 如果缓冲区已满,则调用 Atomics.wait(buffer, bufferSize, bufferSize) 进入休眠状态,等待消费者消费。
    • 如果缓冲区未满,则生产数据,并将数据放入缓冲区。
    • 更新写入位置,并增加缓冲区大小。
    • 最后,调用 Atomics.notify(buffer, bufferSize, 1) 通知消费者。
  3. consumer.js:

    • 消费者 Worker 接收到 SharedArrayBuffer 后,会定期检查缓冲区是否为空。
    • 如果缓冲区为空,则调用 Atomics.wait(buffer, bufferSize, 0) 进入休眠状态,等待生产者生产。
    • 如果缓冲区不为空,则从缓冲区取出数据,并进行消费。
    • 减少缓冲区大小。
    • 最后,调用 Atomics.notify(buffer, bufferSize, 1) 通知生产者。

运行结果:

运行这段代码,你会看到生产者和消费者交替工作,生产者生产数据,消费者消费数据,并且当缓冲区满或空时,Worker 会进入休眠状态,避免了无谓的轮询。

注意事项:

  • Atomics.wait 只能用于 Int32ArrayBigInt64Array
  • Atomics.wait 可能会出现 “虚假唤醒 (spurious wakeup)”。 也就是说,即使没有调用 Atomics.notify,Worker 也有可能被唤醒。因此,在 Atomics.wait 之后,需要再次检查等待的条件是否满足。
  • Atomics.notify 唤醒的 Worker 的顺序是不确定的。

表格总结:

函数 描述 参数 返回值
Atomics.wait 让 Worker 进入休眠状态,直到满足特定条件才会被唤醒。 typedArray: Int32ArrayBigInt64Array 实例;index: 要检查的元素的索引;value: 期望的值;timeout: 超时时间 (可选)。 "ok": Worker 被唤醒;"timed-out": 等待超时;"not-equal": 值不相等。
Atomics.notify 唤醒等待在 SharedArrayBuffer 上的 Worker。 typedArray: Int32ArrayBigInt64Array 实例;index: 要唤醒的元素的索引;count: 要唤醒的 Worker 的数量 (可选)。 被唤醒的 Worker 的数量。
SharedArrayBuffer 允许多个 Worker 之间共享一块内存区域。 length: 缓冲区的大小(以字节为单位)。 新创建的 SharedArrayBuffer 对象。

高级用法:信号量 (Semaphore)

Atomics.waitAtomics.notify 还可以用来实现信号量,用于控制对共享资源的访问。

场景:

  • 多个 Worker 需要访问同一个共享资源,但同一时刻只能有一个 Worker 访问。

代码实现 (简化版):

// 创建 SharedArrayBuffer
const sharedBuffer = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);
const semaphore = new Int32Array(sharedBuffer);

// 初始化信号量为 1 (表示资源可用)
Atomics.store(semaphore, 0, 1);

function acquire() {
  while (Atomics.compareExchange(semaphore, 0, 1, 0) !== 1) {
    console.log("Waiting for semaphore...");
    Atomics.wait(semaphore, 0, 0); // 等待信号量变为 1
  }
  console.log("Acquired semaphore!");
}

function release() {
  Atomics.store(semaphore, 0, 1); // 释放信号量
  Atomics.notify(semaphore, 0, 1); // 通知等待的 Worker
  console.log("Released semaphore!");
}

// 使用示例:
acquire();
// 访问共享资源
// ...
release();

代码解释:

  • acquire() 函数尝试获取信号量。如果信号量当前为 1 (表示资源可用),则将其设置为 0 (表示资源已被占用),并返回。如果信号量当前为 0 (表示资源已被占用),则调用 Atomics.wait 进入休眠状态,等待信号量变为 1。
  • release() 函数释放信号量,将其设置为 1,并调用 Atomics.notify 通知等待的 Worker。

总结:

Atomics.waitAtomics.notify 是 JavaScript 中强大的并发编程工具,它们提供了一种高效的等待/通知机制,可以用于实现各种复杂的并发模式,例如生产者-消费者模型、信号量等。

当然,并发编程是一个复杂的话题,需要深入理解其原理和注意事项,才能编写出安全、高效的代码。希望今天的讲解能帮助你更好地理解 Atomics.waitAtomics.notify,并在实际项目中灵活运用。

记住,并发编程就像玩火,玩得好可以温暖自己,玩不好就会引火烧身。所以,在编写并发代码时,一定要小心谨慎,多加测试,确保代码的正确性和安全性。

好了,今天的并发编程小剧场就到这里,感谢大家的观看!下次再见!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注