各位观众老爷,大家好!欢迎来到今天的并发编程小剧场。今天我们不讲高深的理论,只聊聊如何在 JavaScript 的 Worker 之间玩转 "等待/通知" 模式,也就是 Atomics.wait
和 Atomics.notify
这两位哥们儿。
咱们先来个“灵魂拷问”:你有没有遇到过这样的场景?
- Worker A: “哥们儿,数据准备好了没?好了吱一声!”
- Worker B: “还没呢,等等哈…”
- Worker A: (疯狂轮询) “好了没?好了没?好了没?” (CPU: 我裂开了!)
这种场景是不是很熟悉?Worker A 像个催债的一样,不停地问 Worker B 数据是否准备好。这种疯狂轮询 (busy-waiting) 会浪费大量的 CPU 资源,而且 Worker A 在没收到通知之前啥也干不了,简直是“躺平式等待”。
Atomics.wait
和 Atomics.notify
的出现就是为了解决这个问题。它们提供了一种高效的等待/通知机制,让 Worker 可以安心等待,直到收到通知再醒过来干活。
啥是 SharedArrayBuffer?
在深入 Atomics.wait
和 Atomics.notify
之前,我们必须先认识一位老朋友:SharedArrayBuffer
。
SharedArrayBuffer
允许在多个 Worker 之间共享一块内存区域。这块内存区域就像一个公共的白板,Worker 们可以在上面读写数据。而 Atomics
对象则提供了一系列原子操作,可以安全地读写 SharedArrayBuffer
中的数据,避免出现竞态条件 (race condition)。
简单来说,SharedArrayBuffer
提供了共享内存,Atomics
提供了安全访问共享内存的手段。
Atomics.wait
:沉睡的猛兽
Atomics.wait
让 Worker 进入休眠状态,直到满足特定条件才会被唤醒。它的语法如下:
Atomics.wait(typedArray, index, value, timeout);
typedArray
: 一个共享的Int32Array
或BigInt64Array
实例。index
:typedArray
中要检查的元素的索引。value
:typedArray[index]
期望的值。只有当typedArray[index]
等于value
时,Worker 才会进入休眠状态。timeout
: 可选参数,等待的超时时间,单位是毫秒。如果超过超时时间,Worker 会自动醒来。
Atomics.wait
的返回值有三种:
"ok"
: Worker 被Atomics.notify
唤醒。"timed-out"
: Worker 等待超时。"not-equal"
:typedArray[index]
的值不等于value
,Worker 没有进入休眠状态。
Atomics.notify
:唤醒沉睡者
Atomics.notify
用于唤醒等待在 SharedArrayBuffer
上的 Worker。它的语法如下:
Atomics.notify(typedArray, index, count);
typedArray
: 一个共享的Int32Array
或BigInt64Array
实例。index
:typedArray
中要唤醒的元素的索引。count
: 可选参数,要唤醒的 Worker 的数量。如果省略,则唤醒所有等待的 Worker。
Atomics.notify
的返回值是被唤醒的 Worker 的数量。
实战演练:生产者-消费者模型
咱们来用 Atomics.wait
和 Atomics.notify
实现一个经典的生产者-消费者模型。
场景:
- 生产者 (Producer): 负责生产数据,并将数据放入共享缓冲区。
- 消费者 (Consumer): 负责从共享缓冲区取出数据,并进行消费。
- 共享缓冲区 (Shared Buffer): 一个固定大小的数组,用于存储生产者生产的数据。
代码实现:
main.js (主线程):
// 定义缓冲区大小
const BUFFER_SIZE = 10;
// 创建 SharedArrayBuffer
const sharedBuffer = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT * (BUFFER_SIZE + 2)); // +2 用于控制状态
// 创建视图
const buffer = new Int32Array(sharedBuffer);
// 定义状态常量
const EMPTY = 0;
const FULL = BUFFER_SIZE;
// 初始化状态:缓冲区为空
Atomics.store(buffer, BUFFER_SIZE, EMPTY); // buffer[BUFFER_SIZE] 用于记录缓冲区中数据的数量
Atomics.store(buffer, BUFFER_SIZE + 1, 0); // buffer[BUFFER_SIZE+1] 用于记录当前写入位置
// 创建 Worker
const producerWorker = new Worker('producer.js');
const consumerWorker = new Worker('consumer.js');
// 将 SharedArrayBuffer 发送给 Worker
producerWorker.postMessage({ buffer: sharedBuffer, bufferSize: BUFFER_SIZE });
consumerWorker.postMessage({ buffer: sharedBuffer, bufferSize: BUFFER_SIZE });
producer.js (生产者 Worker):
let buffer;
let bufferSize;
self.onmessage = function(event) {
buffer = new Int32Array(event.data.buffer);
bufferSize = event.data.bufferSize;
produce();
};
function produce() {
setInterval(() => {
// 检查缓冲区是否已满
const currentSize = Atomics.load(buffer, bufferSize);
if (currentSize === bufferSize) {
console.log('Producer: Buffer is full, waiting...');
Atomics.wait(buffer, bufferSize, bufferSize); // 等待消费者消费
}
// 生产数据
const data = Math.floor(Math.random() * 100);
const writeIndex = Atomics.load(buffer, bufferSize + 1);
Atomics.store(buffer, writeIndex, data);
console.log(`Producer: Produced ${data} at index ${writeIndex}`);
// 更新写入位置
const newWriteIndex = (writeIndex + 1) % bufferSize;
Atomics.store(buffer, bufferSize + 1, newWriteIndex);
// 增加缓冲区大小
Atomics.add(buffer, bufferSize, 1);
// 通知消费者
Atomics.notify(buffer, bufferSize, 1);
}, 500); // 每 500 毫秒生产一个数据
}
consumer.js (消费者 Worker):
let buffer;
let bufferSize;
self.onmessage = function(event) {
buffer = new Int32Array(event.data.buffer);
bufferSize = event.data.bufferSize;
consume();
};
function consume() {
setInterval(() => {
// 检查缓冲区是否为空
const currentSize = Atomics.load(buffer, bufferSize);
if (currentSize === 0) {
console.log('Consumer: Buffer is empty, waiting...');
Atomics.wait(buffer, bufferSize, 0); // 等待生产者生产
}
// 消费数据
const readIndex = (Atomics.load(buffer, bufferSize + 1) - Atomics.load(buffer, bufferSize) + bufferSize) % bufferSize;
const data = Atomics.load(buffer, readIndex);
console.log(`Consumer: Consumed ${data} from index ${readIndex}`);
// 减少缓冲区大小
Atomics.sub(buffer, bufferSize, 1);
// 通知生产者
Atomics.notify(buffer, bufferSize, 1);
}, 1000); // 每 1000 毫秒消费一个数据
}
代码解释:
-
main.js
:- 创建
SharedArrayBuffer
,大小为BUFFER_SIZE + 2
。其中BUFFER_SIZE
个元素用于存储数据,一个元素用于记录缓冲区中数据的数量 (buffer[BUFFER_SIZE]
),另一个元素用于记录当前写入位置 (buffer[BUFFER_SIZE + 1]
)。 - 初始化状态:缓冲区为空 (
buffer[BUFFER_SIZE] = 0
),写入位置为 0 (buffer[BUFFER_SIZE + 1] = 0
)。 - 创建生产者和消费者 Worker,并将
SharedArrayBuffer
发送给它们。
- 创建
-
producer.js
:- 生产者 Worker 接收到
SharedArrayBuffer
后,会定期检查缓冲区是否已满。 - 如果缓冲区已满,则调用
Atomics.wait(buffer, bufferSize, bufferSize)
进入休眠状态,等待消费者消费。 - 如果缓冲区未满,则生产数据,并将数据放入缓冲区。
- 更新写入位置,并增加缓冲区大小。
- 最后,调用
Atomics.notify(buffer, bufferSize, 1)
通知消费者。
- 生产者 Worker 接收到
-
consumer.js
:- 消费者 Worker 接收到
SharedArrayBuffer
后,会定期检查缓冲区是否为空。 - 如果缓冲区为空,则调用
Atomics.wait(buffer, bufferSize, 0)
进入休眠状态,等待生产者生产。 - 如果缓冲区不为空,则从缓冲区取出数据,并进行消费。
- 减少缓冲区大小。
- 最后,调用
Atomics.notify(buffer, bufferSize, 1)
通知生产者。
- 消费者 Worker 接收到
运行结果:
运行这段代码,你会看到生产者和消费者交替工作,生产者生产数据,消费者消费数据,并且当缓冲区满或空时,Worker 会进入休眠状态,避免了无谓的轮询。
注意事项:
Atomics.wait
只能用于Int32Array
或BigInt64Array
。Atomics.wait
可能会出现 “虚假唤醒 (spurious wakeup)”。 也就是说,即使没有调用Atomics.notify
,Worker 也有可能被唤醒。因此,在Atomics.wait
之后,需要再次检查等待的条件是否满足。Atomics.notify
唤醒的 Worker 的顺序是不确定的。
表格总结:
函数 | 描述 | 参数 | 返回值 |
---|---|---|---|
Atomics.wait |
让 Worker 进入休眠状态,直到满足特定条件才会被唤醒。 | typedArray : Int32Array 或 BigInt64Array 实例;index : 要检查的元素的索引;value : 期望的值;timeout : 超时时间 (可选)。 |
"ok" : Worker 被唤醒;"timed-out" : 等待超时;"not-equal" : 值不相等。 |
Atomics.notify |
唤醒等待在 SharedArrayBuffer 上的 Worker。 |
typedArray : Int32Array 或 BigInt64Array 实例;index : 要唤醒的元素的索引;count : 要唤醒的 Worker 的数量 (可选)。 |
被唤醒的 Worker 的数量。 |
SharedArrayBuffer |
允许多个 Worker 之间共享一块内存区域。 | length : 缓冲区的大小(以字节为单位)。 |
新创建的 SharedArrayBuffer 对象。 |
高级用法:信号量 (Semaphore)
Atomics.wait
和 Atomics.notify
还可以用来实现信号量,用于控制对共享资源的访问。
场景:
- 多个 Worker 需要访问同一个共享资源,但同一时刻只能有一个 Worker 访问。
代码实现 (简化版):
// 创建 SharedArrayBuffer
const sharedBuffer = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);
const semaphore = new Int32Array(sharedBuffer);
// 初始化信号量为 1 (表示资源可用)
Atomics.store(semaphore, 0, 1);
function acquire() {
while (Atomics.compareExchange(semaphore, 0, 1, 0) !== 1) {
console.log("Waiting for semaphore...");
Atomics.wait(semaphore, 0, 0); // 等待信号量变为 1
}
console.log("Acquired semaphore!");
}
function release() {
Atomics.store(semaphore, 0, 1); // 释放信号量
Atomics.notify(semaphore, 0, 1); // 通知等待的 Worker
console.log("Released semaphore!");
}
// 使用示例:
acquire();
// 访问共享资源
// ...
release();
代码解释:
acquire()
函数尝试获取信号量。如果信号量当前为 1 (表示资源可用),则将其设置为 0 (表示资源已被占用),并返回。如果信号量当前为 0 (表示资源已被占用),则调用Atomics.wait
进入休眠状态,等待信号量变为 1。release()
函数释放信号量,将其设置为 1,并调用Atomics.notify
通知等待的 Worker。
总结:
Atomics.wait
和 Atomics.notify
是 JavaScript 中强大的并发编程工具,它们提供了一种高效的等待/通知机制,可以用于实现各种复杂的并发模式,例如生产者-消费者模型、信号量等。
当然,并发编程是一个复杂的话题,需要深入理解其原理和注意事项,才能编写出安全、高效的代码。希望今天的讲解能帮助你更好地理解 Atomics.wait
和 Atomics.notify
,并在实际项目中灵活运用。
记住,并发编程就像玩火,玩得好可以温暖自己,玩不好就会引火烧身。所以,在编写并发代码时,一定要小心谨慎,多加测试,确保代码的正确性和安全性。
好了,今天的并发编程小剧场就到这里,感谢大家的观看!下次再见!