SharedArrayBuffer 上的原子操作:Atomics.wait 与 Atomics.notify 的底层互斥量实现
各位编程爱好者、系统架构师以及对 Web 并发编程充满好奇的朋友们,大家好。在当今这个多核处理器普及的时代,并发编程已不再是后端或系统级语言的专属领域。随着 Web 技术的飞速发展,浏览器环境也对高性能、响应式的并发处理提出了越来越高的要求。JavaScript,作为 Web 的核心语言,传统上以其单线程、事件循环模型而著称。然而,这种模型在面对大量计算密集型任务时,往往会导致 UI 卡顿,用户体验下降。
为了突破这一瓶颈,Web Workers 应运而生,它允许 JavaScript 在后台线程中执行计算,从而避免阻塞主线程。但 Worker 之间的数据共享并非易事,传统的通过 postMessage 传递数据的方式,实际上是对数据进行序列化和反序列化,传递的是数据的副本,而非共享同一份内存。这种机制对于大量数据的共享或需要频繁同步的场景来说,效率低下且复杂。
正是在这样的背景下,SharedArrayBuffer 登上了历史舞台。它提供了一种在多个 Worker 线程以及主线程之间共享同一块内存区域的能力,极大地提升了 Web 应用处理复杂并发任务的潜力。然而,共享内存这把“双刃剑”也带来了新的挑战:如何确保在并发访问共享数据时的正确性与一致性?这正是原子操作 (Atomic Operations) 的用武之地,特别是 Atomics.wait 与 Atomics.notify,它们是构建高级同步原语(如互斥量、条件变量)的基石。
今天的讲座,我们将深入探讨 SharedArrayBuffer 上的原子操作,特别是 Atomics.wait 和 Atomics.notify 如何模拟底层互斥量和条件变量,实现线程间的协同与同步。我们将从基础概念入手,逐步深入,辅以丰富的代码示例,力求让您对这一复杂而强大的机制有一个全面且深刻的理解。
1. SharedArrayBuffer 基础:跨线程共享内存的基石
在深入原子操作之前,我们首先需要理解 SharedArrayBuffer 是什么,以及它为何如此重要。
1.1 ArrayBuffer 与 SharedArrayBuffer 的区别
要理解 SharedArrayBuffer,我们得先从 ArrayBuffer 说起。ArrayBuffer 是 JavaScript 中用于表示通用、固定长度的二进制数据缓冲区的一种类型。它本身并不能直接操作数据,而是作为底层二进制数据的一个“容器”,需要通过 TypedArray(如 Int32Array, Uint8Array, Float64Array 等)或 DataView 来创建视图,进而读写其中的数据。
ArrayBuffer 的一个关键特性是,当它被 postMessage 传递给 Worker 时,它会被“转移”(transfer)。这意味着原始 ArrayBuffer 在发送方会变得不可用,所有权转移到了接收方。这保证了在任何给定时间点,只有一方能够修改数据,从而避免了并发修改的问题。然而,这也意味着数据无法真正共享,每次传递都是所有权的转移。
SharedArrayBuffer 则打破了这一限制。顾名思义,它是一个“可共享的” ArrayBuffer。一旦创建,SharedArrayBuffer 可以通过 postMessage 传递给多个 Worker(或主线程),但与 ArrayBuffer 不同的是,它不会被转移,而是所有线程都获得了对同一个底层内存区域的引用。这意味着所有线程都可以同时读写这块共享内存。
核心区别总结:
| 特性 | ArrayBuffer |
SharedArrayBuffer |
|---|---|---|
| 共享性 | 不可共享,postMessage 导致所有权转移 |
可共享,多个线程可同时访问同一内存区域 |
| 并发访问 | 单线程访问(通过所有权转移保证) | 多线程并发访问 |
| 同步需求 | 无需显式同步原语(数据副本或所有权转移) | 必须 使用原子操作和同步原语(如 Atomics) |
| 用途 | 单线程处理、数据传输 | 多线程并发计算、高性能数据共享 |
1.2 创建与使用 SharedArrayBuffer
创建 SharedArrayBuffer 的方式与 ArrayBuffer 类似,只是构造函数不同:
// 在主线程中创建 SharedArrayBuffer
const sab = new SharedArrayBuffer(1024); // 创建一个 1024 字节的共享缓冲区
// 通过 TypedArray 创建视图来操作数据
// Atomics.wait 和 Atomics.notify 只能在 Int32Array 或 BigInt64Array 视图上操作
const intArray = new Int32Array(sab);
console.log("SharedArrayBuffer size:", sab.byteLength, "bytes");
console.log("Int32Array length:", intArray.length, "elements"); // 1024 / 4 = 256
// 可以在主线程中写入数据
intArray[0] = 123;
console.log("Initial value at index 0:", intArray[0]);
要将 SharedArrayBuffer 传递给 Worker,只需将其作为 postMessage 的第一个参数:
// index.html (主线程)
const worker = new Worker('worker.js');
const sab = new SharedArrayBuffer(1024);
const intArray = new Int32Array(sab);
// 将 SharedArrayBuffer 及其视图发送给 Worker
worker.postMessage({ sab, intArray });
// 此时主线程仍然可以访问和修改 intArray
intArray[1] = 456;
console.log("Main thread modified index 1:", intArray[1]);
// worker.js (Worker 线程)
self.onmessage = function(event) {
const { sab, intArray } = event.data; // Worker 接收到的是对同一块共享内存的引用
console.log("Worker received SharedArrayBuffer:", sab);
console.log("Worker received Int32Array view:", intArray);
// Worker 也可以访问和修改 intArray
console.log("Worker reads index 0:", intArray[0]); // 应该看到主线程写入的 123
console.log("Worker reads index 1:", intArray[1]); // 应该看到主线程写入的 456
intArray[2] = 789; // Worker 修改数据
console.log("Worker modified index 2:", intArray[2]);
// 可以选择向主线程发送消息通知
self.postMessage("Worker finished modifying SharedArrayBuffer.");
};
通过上述示例,您可以看到主线程和 Worker 线程都能够操作同一个 intArray 视图,从而读写同一块 SharedArrayBuffer。这正是 SharedArrayBuffer 强大的地方。然而,这种共享也带来了潜在的危险。
2. 原子操作的必要性:竞态条件与数据完整性
当多个线程同时访问和修改共享内存时,如果不加以适当的同步控制,就可能出现一种被称为“竞态条件”(Race Condition)的问题。竞态条件是指多个线程以不可预测的顺序访问和操作共享资源时,最终结果的正确性取决于这些线程的执行时序。
2.1 竞态条件示例:非原子操作的陷阱
考虑一个看似简单的操作:i++。在高级语言中,这看起来是一个单一的、不可分割的操作。但在底层,它通常会被分解为至少三个步骤:
- 从内存中加载
i的当前值。 - 将加载的值加 1。
- 将新值写回内存中的
i。
现在,假设有两个 Worker 线程 A 和 B,它们都尝试对共享内存中的同一个变量 i 执行 i++ 操作,且 i 的初始值为 0。
| 时间点 | Worker A 操作 | Worker B 操作 | i 的值 |
说明 |
|---|---|---|---|---|
| T1 | 从内存加载 i (0) |
0 | A 读取 i |
|
| T2 | 从内存加载 i (0) |
0 | B 读取 i |
|
| T3 | i 加 1 (0 -> 1) |
0 | A 计算新值 | |
| T4 | i 加 1 (0 -> 1) |
0 | B 计算新值 | |
| T5 | 将新值 1 写回内存 | 1 | A 将 i 更新为 1 |
|
| T6 | 将新值 1 写回内存 | 1 | B 将 i 更新为 1 (覆盖了 A 的结果) |
在这个例子中,尽管两个 Worker 都执行了 i++,但最终 i 的值却是 1,而不是我们期望的 2。这就是一个典型的竞态条件,由于操作的交错执行,导致了数据不一致和结果错误。
2.2 原子操作:不可中断的保证
为了解决竞态条件,我们需要原子操作(Atomic Operations)。原子操作是指在执行过程中不会被其他线程中断的操作。它要么完全执行成功,要么完全不执行,不存在中间状态。对于 i++ 这样的操作,如果能够保证它在底层是原子性的,那么上述的交错执行就不可能发生。当一个线程执行原子 i++ 时,另一个线程必须等待,直到前一个操作完成。
JavaScript 通过 Atomics 对象为 SharedArrayBuffer 提供了各种原子操作,包括原子读写、原子算术运算、原子比较交换,以及我们今天的主角——原子等待和通知操作。
3. Atomics 对象概览
Atomics 是一个全局对象,提供了一系列静态方法,用于在 SharedArrayBuffer 上的 Int32Array 或 BigInt64Array 视图中执行原子操作。这些操作确保了对共享内存的访问是不可中断的,从而避免了竞态条件。
Atomics 对象提供的主要方法可以分为以下几类:
-
原子读写操作:
Atomics.load(typedArray, index): 原子地读取指定索引的值。Atomics.store(typedArray, index, value): 原子地将值写入指定索引。
-
原子算术和位操作:
Atomics.add(typedArray, index, value): 原子地将value加到指定索引的值上,并返回旧值。Atomics.sub(typedArray, index, value): 原子地从指定索引的值中减去value,并返回旧值。Atomics.and(typedArray, index, value): 原子地对指定索引的值执行按位 AND 操作,并返回旧值。Atomics.or(typedArray, index, value): 原子地对指定索引的值执行按位 OR 操作,并返回旧值。Atomics.xor(typedArray, index, value): 原子地对指定索引的值执行按位 XOR 操作,并返回旧值。
-
原子比较交换操作:
Atomics.compareExchange(typedArray, index, expectedValue, replacementValue): 原子地检查指定索引的值是否等于expectedValue。如果是,则将其替换为replacementValue,并返回旧值。这是一个非常强大的原语,是构建许多高级同步机制的基础。
-
等待和通知操作(本讲座核心):
Atomics.wait(typedArray, index, value, [timeout]): 如果typedArray[index]等于value,则使当前 Worker 暂停执行,进入等待状态。Atomics.notify(typedArray, index, [count]): 唤醒正在typedArray[index]上等待的一个或多个 Worker。
-
内存同步屏障(Memory Fences):
Atomics.fence(): 确保内存操作的顺序性。这通常在实现复杂的无锁数据结构时使用,以保证不同线程观察到的内存写入顺序符合预期。
所有 Atomics 方法都要求操作的 typedArray 必须是 Int32Array 或 BigInt64Array 的实例,这是因为这些整数类型能够映射到处理器级别的原子指令,并且 wait/notify 需要固定大小的整数来作为“地址”进行协调。
4. Atomics.wait 与 Atomics.notify 深度剖析:底层互斥量实现
原子读写和算术操作解决了单一内存位置的竞态问题,但它们不足以解决更复杂的线程协调问题,例如“等待一个条件变为真”或“通知其他线程某个事件已发生”。这就是 Atomics.wait 和 Atomics.notify 登场的原因。它们提供了一种高效的、低级的线程间通信机制,是模拟操作系统级互斥量(Mutex)和条件变量(Condition Variable)的关键。
4.1 核心概念:用户态互斥量与条件变量的模拟
在操作系统层面,线程同步通常依赖于内核提供的互斥量和条件变量。互斥量用于保护共享资源,确保在任何时刻只有一个线程可以访问它。条件变量则用于线程间的通知,允许线程在特定条件不满足时挂起,并在条件满足时被唤醒。
Atomics.wait 和 Atomics.notify 为 JavaScript 在用户态(User-Space)提供了类似的功能:
Atomics.wait相当于条件变量的wait()操作。它让当前 Worker 线程进入休眠状态,直到被Atomics.notify唤醒,或者达到超时时间。Atomics.notify相当于条件变量的signal()或broadcast()操作。它唤醒一个或多个在特定内存地址上等待的 Worker 线程。
它们的底层实现通常依赖于操作系统的 Futex (Fast Userspace mUTEX) 机制,这种机制允许在用户空间进行线程同步,只有在发生争用时才需要陷入内核,从而提高了效率。
4.2 Atomics.wait 签名与参数
Atomics.wait(typedArray, index, value, [timeout])
typedArray: 必须是Int32Array或BigInt64Array的实例,用于操作共享内存。index:typedArray中要等待的元素的索引。这是所有等待和通知操作的“同步点”。value: 期望的值。这是Atomics.wait的一个关键特性。 如果typedArray[index]的当前值不等于value,那么Atomics.wait将立即返回"not-equal",而不会使线程进入等待状态。这个参数起到了一个“快速失败”的作用,避免了在条件已经满足时进行不必要的等待,也防止了“丢失唤醒”(lost wakeup)的问题(即在检查条件后、进入等待前,条件被另一个线程满足并唤醒)。timeout(可选): 一个表示等待毫秒数的整数。如果指定了timeout且在指定时间内未被唤醒,Atomics.wait将返回"timed-out"。如果timeout为Infinity,则表示无限期等待。
返回值:
Atomics.wait 返回一个字符串,表示等待的结果:
"ok": 成功被Atomics.notify唤醒。"not-equal": 在调用Atomics.wait时,typedArray[index]的值与value不匹配。"timed-out": 在timeout指定的时间内未被唤醒。
4.3 Atomics.notify 签名与参数
Atomics.notify(typedArray, index, [count])
typedArray: 必须是Int32Array或BigInt64Array的实例。index:typedArray中要唤醒等待线程的元素的索引。count(可选): 一个整数,表示要唤醒的 Worker 线程的最大数量。- 如果
count为0,则不唤醒任何线程。 - 如果
count为1,则唤醒一个线程。 - 如果
count为Infinity或省略,则唤醒所有在该index上等待的线程。
- 如果
返回值:
Atomics.notify 返回一个整数,表示实际被唤醒的 Worker 线程的数量。
4.4 工作机制:伪唤醒与循环检查条件
理解 Atomics.wait 的正确使用方式至关重要。由于存在“伪唤醒”(Spurious Wakeups)的可能性,以及为了确保条件真正满足,Atomics.wait 必须在一个循环中调用,并在每次从等待中返回时重新检查条件。
伪唤醒 (Spurious Wakeups): 即使没有 Atomics.notify 调用,一个线程也可能从 Atomics.wait 中返回。这通常是操作系统调度器的一个特性,为了简单和效率,它可能会在条件尚未满足时唤醒线程。因此,不能假设从 wait 返回就意味着条件已经满足。
正确的使用模式:
// Worker 线程等待某个条件
while (Atomics.load(typedArray, index) === EXPECTED_VALUE) {
// 只有当值仍然是 EXPECTED_VALUE 时才进入等待
const result = Atomics.wait(typedArray, index, EXPECTED_VALUE);
// 即使 Atomics.wait 返回 "ok",也需要重新检查条件,因为可能存在伪唤醒
// 或者在被唤醒后,其他线程又改变了条件
if (result === "timed-out") {
console.warn("Wait timed out, condition still not met.");
// 可以选择处理超时逻辑
break; // 或者继续等待
}
}
// 此时,条件已经满足,可以执行后续操作
Atomics.wait 的 value 参数是防御性的。它防止了所谓的“丢失唤醒”:如果在线程检查条件为 EXPECTED_VALUE 之后,但在它实际进入等待队列之前,另一个线程改变了值并通过 notify 唤醒,那么这个 notify 就会被“丢失”,导致前一个线程无限期等待。Atomics.wait 只有在 typedArray[index] 确实等于 value 时才会进入等待状态,否则会立即返回 "not-equal",从而避免了这种情况。
惊群效应 (Thundering Herd): 当多个 Worker 在同一个 index 上等待时,如果一个 notify 唤醒了所有等待者 (count: Infinity),那么所有被唤醒的 Worker 都会尝试去检查条件或竞争资源。如果实际上只有一个 Worker 能够成功处理该条件,那么其他 Worker 就会立即发现条件不满足,然后再次进入等待状态,这会造成不必要的 CPU 资源浪费。因此,count 参数的选择需要根据具体场景谨慎考虑。如果只有一个任务可以被处理,通常 notify(typedArray, index, 1) 更高效。
4.5 死锁 (Deadlock) 风险
与所有并发编程中的同步原语一样,Atomics.wait 和 Atomics.notify 的不当使用也可能导致死锁。死锁是指两个或多个线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力干涉,它们都将无法继续执行。
例如,如果一个 Worker 线程在未释放其持有的锁的情况下调用 Atomics.wait,而另一个 Worker 线程又需要这个锁来执行 Atomics.notify,那么就会发生死锁。因此,在使用 wait/notify 时,必须仔细设计同步逻辑,确保资源的获取和释放顺序正确,并避免循环等待。
4.6 代码示例:简单的生产者-消费者模型
我们将通过一个经典的生产者-消费者模型来演示 Atomics.wait 和 Atomics.notify 的用法。在这个模型中,一个或多个生产者向共享缓冲区中添加数据,一个或多个消费者从缓冲区中取出数据。我们需要确保:
- 生产者不能在缓冲区满时写入。
- 消费者不能在缓冲区空时读取。
- 共享缓冲区的访问是原子的,以避免数据损坏。
这里我们简化模型,假设只有一个生产者和一个消费者,共享一个 SharedArrayBuffer 作为缓冲区。缓冲区中,一个位置存储数据,另一个位置作为状态标志。
<!-- index.html -->
<!DOCTYPE html>
<html>
<head>
<title>Atomics.wait/notify Producer-Consumer Demo</title>
</head>
<body>
<h1>生产者-消费者模型 (Atomics.wait/notify)</h1>
<p>打开控制台查看输出。</p>
<script>
// 共享缓冲区结构:
// index 0: 状态标志 (0: 空, 1: 有数据)
// index 1: 实际数据
const BUFFER_SIZE = 2; // 两个 Int32 元素
const STATUS_INDEX = 0;
const DATA_INDEX = 1;
const sab = new SharedArrayBuffer(BUFFER_SIZE * Int32Array.BYTES_PER_ELEMENT);
const sharedArray = new Int32Array(sab);
// 初始化状态:缓冲区为空
Atomics.store(sharedArray, STATUS_INDEX, 0);
Atomics.store(sharedArray, DATA_INDEX, 0); // 初始化数据为 0
console.log("主线程: SharedArrayBuffer 初始化完成.");
console.log(`主线程: 初始状态 - sharedArray[${STATUS_INDEX}]=${sharedArray[STATUS_INDEX]}, sharedArray[${DATA_INDEX}]=${sharedArray[DATA_INDEX]}`);
// --- 生产者 Worker ---
const producerWorker = new Worker(URL.createObjectURL(new Blob([`
self.onmessage = function(event) {
const { sharedArray, STATUS_INDEX, DATA_INDEX } = event.data;
let produceCount = 0;
function produce() {
// 生产者尝试写入数据
// 只有当状态为 0 (空) 时才写入
while (true) {
// 1. 等待缓冲区为空 (STATUS_INDEX === 0)
console.log(`[生产者] 尝试等待缓冲区为空... (当前状态: ${Atomics.load(sharedArray, STATUS_INDEX)})`);
const waitResult = Atomics.wait(sharedArray, STATUS_INDEX, 0, 5000); // 最多等待5秒
if (waitResult === "timed-out") {
console.warn("[生产者] 等待缓冲区为空超时,退出生产。");
break;
} else if (waitResult === "not-equal") {
// 伪唤醒或在检查后立即被其他线程填充
// 继续循环,重新检查条件
console.log("[生产者] 伪唤醒或条件不匹配,重新检查。");
continue;
}
// 如果到达这里,说明 Atomics.wait 成功返回 "ok",并且在进入等待前 STATUS_INDEX 确实是 0
// 但是,为了鲁棒性,仍然需要再次检查(伪唤醒的可能)
if (Atomics.load(sharedArray, STATUS_INDEX) === 0) {
produceCount++;
const newData = produceCount * 100;
console.log(`[生产者] 缓冲区为空,写入数据: ${newData}`);
Atomics.store(sharedArray, DATA_INDEX, newData);
Atomics.store(sharedArray, STATUS_INDEX, 1); // 设置状态为 1 (有数据)
// 2. 通知消费者有新数据
console.log("[生产者] 通知消费者有新数据.");
Atomics.notify(sharedArray, STATUS_INDEX, 1); // 唤醒一个等待的消费者
// 模拟生产间隔
setTimeout(produce, Math.random() * 1000 + 500); // 0.5 - 1.5秒后再次生产
break; // 成功生产一次后退出当前循环,等待下一次生产
} else {
// 再次检查发现条件不满足,可能是伪唤醒或者被其他生产者抢先
console.log("[生产者] 伪唤醒后条件不满足,重新进入等待。");
}
}
}
produce(); // 启动生产
};
`])));
producerWorker.postMessage({ sharedArray, STATUS_INDEX, DATA_INDEX });
// --- 消费者 Worker ---
const consumerWorker = new Worker(URL.createObjectURL(new Blob([`
self.onmessage = function(event) {
const { sharedArray, STATUS_INDEX, DATA_INDEX } = event.data;
let consumeCount = 0;
function consume() {
// 消费者尝试读取数据
// 只有当状态为 1 (有数据) 时才读取
while (true) {
// 1. 等待缓冲区有数据 (STATUS_INDEX === 1)
console.log(`[消费者] 尝试等待缓冲区有数据... (当前状态: ${Atomics.load(sharedArray, STATUS_INDEX)})`);
const waitResult = Atomics.wait(sharedArray, STATUS_INDEX, 1, 5000); // 最多等待5秒
if (waitResult === "timed-out") {
console.warn("[消费者] 等待缓冲区有数据超时,退出消费。");
break;
} else if (waitResult === "not-equal") {
// 伪唤醒或在检查后立即被其他线程清空
// 继续循环,重新检查条件
console.log("[消费者] 伪唤醒或条件不匹配,重新检查。");
continue;
}
// 如果到达这里,说明 Atomics.wait 成功返回 "ok",并且在进入等待前 STATUS_INDEX 确实是 1
if (Atomics.load(sharedArray, STATUS_INDEX) === 1) {
consumeCount++;
const consumedData = Atomics.load(sharedArray, DATA_INDEX);
console.log(`[消费者] 缓冲区有数据,读取数据: ${consumedData}`);
Atomics.store(sharedArray, DATA_INDEX, 0); // 清空数据
Atomics.store(sharedArray, STATUS_INDEX, 0); // 设置状态为 0 (空)
// 2. 通知生产者缓冲区已空
console.log("[消费者] 通知生产者缓冲区已空.");
Atomics.notify(sharedArray, STATUS_INDEX, 1); // 唤醒一个等待的生产者
// 模拟消费间隔
setTimeout(consume, Math.random() * 1000 + 500); // 0.5 - 1.5秒后再次消费
break; // 成功消费一次后退出当前循环,等待下一次消费
} else {
// 再次检查发现条件不满足,可能是伪唤醒或者被其他消费者抢先
console.log("[消费者] 伪唤醒后条件不满足,重新进入等待。");
}
}
}
consume(); // 启动消费
};
`])));
consumerWorker.postMessage({ sharedArray, STATUS_INDEX, DATA_INDEX });
// 可以在主线程中偶尔检查共享状态
setInterval(() => {
console.log(`[主线程] 当前共享状态: sharedArray[${STATUS_INDEX}]=${sharedArray[STATUS_INDEX]}, sharedArray[${DATA_INDEX}]=${sharedArray[DATA_INDEX]}`);
}, 3000); // 每3秒检查一次
</script>
</body>
</html>
代码解析:
- 共享内存布局: 我们使用
sharedArray[0]作为状态标志 (STATUS_INDEX),0表示缓冲区为空,1表示有数据。sharedArray[1](DATA_INDEX) 用于存储实际的生产/消费数据。 - 生产者逻辑:
- 生产者在一个
while(true)循环中运行,不断尝试生产。 - 它首先调用
Atomics.wait(sharedArray, STATUS_INDEX, 0)。这意味着它会等待sharedArray[STATUS_INDEX]的值变为0(即缓冲区为空)。 - 如果
wait返回"ok",并且再次检查STATUS_INDEX确认确实为0,生产者就会写入新数据到DATA_INDEX,并将STATUS_INDEX更新为1。 - 写入完成后,它会调用
Atomics.notify(sharedArray, STATUS_INDEX, 1),唤醒一个在STATUS_INDEX上等待的消费者。 - 生产者在成功生产一次后,通过
setTimeout模拟生产间隔,然后再次启动生产流程。
- 生产者在一个
- 消费者逻辑:
- 消费者也在一个
while(true)循环中运行,不断尝试消费。 - 它调用
Atomics.wait(sharedArray, STATUS_INDEX, 1),等待sharedArray[STATUS_INDEX]的值变为1(即缓冲区有数据)。 - 如果
wait返回"ok",并且再次检查STATUS_INDEX确认确实为1,消费者就会读取DATA_INDEX的数据,清空DATA_INDEX并将STATUS_INDEX更新为0。 - 读取完成后,它会调用
Atomics.notify(sharedArray, STATUS_INDEX, 1),唤醒一个在STATUS_INDEX上等待的生产者。 - 消费者在成功消费一次后,通过
setTimeout模拟消费间隔,然后再次启动消费流程。
- 消费者也在一个
waitResult处理:Atomics.wait的返回值timed-out和not-equal都需要妥善处理。not-equal意味着在调用wait时条件不满足,或者发生了伪唤醒。因此,即使返回ok,也必须在while循环中再次检查条件,这是使用Atomics.wait的黄金法则。
这个示例清晰地展示了 Atomics.wait 和 Atomics.notify 如何协同工作,实现线程间的同步和通信,确保共享资源的正确访问。
5. 高级主题与最佳实践
在掌握了 Atomics.wait 和 Atomics.notify 的基本用法后,我们可以进一步探讨如何利用它们构建更复杂的同步原语,并考虑一些高级问题。
5.1 构建更复杂的同步原语
Atomics.wait 和 Atomics.notify 是低级原语,它们可以作为构建更高级同步机制的基石。
5.1.1 互斥锁 (Mutex) 的实现
互斥锁是一种最基本的同步原语,它保证在任何时刻只有一个线程能够进入“临界区”并访问共享资源。我们可以使用 Atomics.compareExchange 和 Atomics.wait/notify 来实现一个简单的互斥锁。
互斥锁状态:
0: 解锁 (Unlocked)1: 锁定 (Locked)
// Mutex 类的实现(在主线程或 Worker 中都可以定义和使用)
class Mutex {
constructor(sharedArray, index) {
// sharedArray 必须是 Int32Array 或 BigInt64Array
// index 是 Mutex 状态在 sharedArray 中的位置
this.sharedArray = sharedArray;
this.index = index;
// 确保 Mutex 初始状态为解锁
if (Atomics.load(this.sharedArray, this.index) !== 0) {
Atomics.store(this.sharedArray, this.index, 0);
}
}
// 获取锁
lock() {
while (true) {
// 尝试将状态从 0 (解锁) 变为 1 (锁定)
const oldValue = Atomics.compareExchange(this.sharedArray, this.index, 0, 1);
if (oldValue === 0) {
// 成功获取锁,状态从 0 变为 1
return;
}
// 如果 oldValue 不是 0,说明锁已经被其他线程持有
// 等待锁被释放 (即状态变为 0)
console.log(`[Thread ${self.name || 'Main'}] 锁已被持有,等待释放...`);
const waitResult = Atomics.wait(this.sharedArray, this.index, 1); // 等待状态从 1 变为其他值 (通常是 0)
// 即使 wait 成功返回,也可能存在伪唤醒或锁再次被其他线程获取
// 循环会再次尝试 compareExchange
if (waitResult === "timed-out") {
console.warn(`[Thread ${self.name || 'Main'}] 获取锁超时,继续尝试。`);
}
}
}
// 释放锁
unlock() {
// 确保当前线程确实持有锁 (即状态为 1)
if (Atomics.load(this.sharedArray, this.index) !== 1) {
console.error(`[Thread ${self.name || 'Main'}] 尝试释放一个未持有的锁!`);
return;
}
// 将状态从 1 (锁定) 变为 0 (解锁)
Atomics.store(this.sharedArray, this.index, 0);
console.log(`[Thread ${self.name || 'Main'}] 释放锁。`);
// 通知一个等待的线程
Atomics.notify(this.sharedArray, this.index, 1);
}
}
使用示例:
// main.js 或 worker.js
// 共享缓冲区
const mutexSab = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);
const mutexSharedArray = new Int32Array(mutexSab);
// Mutex 实例,状态存储在 mutexSharedArray[0]
const myMutex = new Mutex(mutexSharedArray, 0);
// 共享计数器,受 Mutex 保护
const counterSab = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);
const counterSharedArray = new Int32Array(counterSab);
Atomics.store(counterSharedArray, 0, 0); // 计数器初始值
// 在主线程中模拟一个任务
async function runTask(threadName, count) {
self.name = threadName; // 方便日志识别
for (let i = 0; i < count; i++) {
myMutex.lock(); // 获取锁
try {
// 临界区:安全地访问共享计数器
const currentValue = Atomics.load(counterSharedArray, 0);
const newValue = currentValue + 1;
Atomics.store(counterSharedArray, 0, newValue);
console.log(`[${threadName}] Counter: ${newValue}`);
} finally {
myMutex.unlock(); // 释放锁
}
await new Promise(resolve => setTimeout(resolve, Math.random() * 50)); // 模拟一些工作
}
}
// 主线程作为第一个并发任务
runTask('Main Thread', 10);
// 启动一个 Worker 线程作为第二个并发任务
const workerMutex = new Worker(URL.createObjectURL(new Blob([`
self.onmessage = async function(event) {
const { mutexSab, counterSab } = event.data;
const mutexSharedArray = new Int32Array(mutexSab);
const counterSharedArray = new Int32Array(counterSab);
class Mutex {
constructor(sharedArray, index) {
this.sharedArray = sharedArray;
this.index = index;
}
lock() {
while (true) {
const oldValue = Atomics.compareExchange(this.sharedArray, this.index, 0, 1);
if (oldValue === 0) return;
console.log(`[Worker Thread] 锁已被持有,等待释放...`);
const waitResult = Atomics.wait(this.sharedArray, this.index, 1);
if (waitResult === "timed-out") {
console.warn(`[Worker Thread] 获取锁超时,继续尝试。`);
}
}
}
unlock() {
if (Atomics.load(this.sharedArray, this.index) !== 1) {
console.error(`[Worker Thread] 尝试释放一个未持有的锁!`);
return;
}
Atomics.store(this.sharedArray, this.index, 0);
console.log(`[Worker Thread] 释放锁。`);
Atomics.notify(this.sharedArray, this.index, 1);
}
}
const myMutex = new Mutex(mutexSharedArray, 0);
self.name = 'Worker Thread';
for (let i = 0; i < 10; i++) {
myMutex.lock();
try {
const currentValue = Atomics.load(counterSharedArray, 0);
const newValue = currentValue + 1;
Atomics.store(counterSharedArray, 0, newValue);
console.log(`[Worker Thread] Counter: ${newValue}`);
} finally {
myMutex.unlock();
}
await new Promise(resolve => setTimeout(resolve, Math.random() * 50));
}
self.postMessage('Worker finished.');
};
`])));
workerMutex.postMessage({ mutexSab, counterSab });
在这个 Mutex 实现中:
lock()方法使用Atomics.compareExchange尝试将锁的状态从0(解锁) 原子地变为1(锁定)。- 如果
compareExchange返回0,说明成功获取锁。 - 如果返回
1,说明锁已被其他线程持有,当前线程进入Atomics.wait(..., 1)状态,等待锁被释放。 unlock()方法将锁状态设回0,并调用Atomics.notify唤醒一个等待的线程。
5.1.2 信号量 (Semaphore) 的实现
信号量是互斥锁的推广,它允许同时有 N 个线程访问共享资源。我们可以用一个整数来表示可用资源的数量,当资源数量为 0 时,线程等待。
// Semaphore 类的实现
class Semaphore {
constructor(sharedArray, index, initialCount) {
this.sharedArray = sharedArray;
this.index = index;
Atomics.store(this.sharedArray, this.index, initialCount);
}
// 获取资源
acquire() {
while (true) {
const currentValue = Atomics.load(this.sharedArray, this.index);
if (currentValue > 0) {
// 如果有可用资源,尝试原子地减 1
const oldValue = Atomics.compareExchange(this.sharedArray, this.index, currentValue, currentValue - 1);
if (oldValue === currentValue) {
// 成功获取资源
return;
}
// 否则,说明在检查和 compareExchange 之间有其他线程抢先,重新尝试
continue;
}
// 如果没有可用资源,则等待
console.log(`[Thread ${self.name || 'Main'}] 没有可用资源,等待...`);
const waitResult = Atomics.wait(this.sharedArray, this.index, 0); // 等待资源数变为非零
if (waitResult === "timed-out") {
console.warn(`[Thread ${self.name || 'Main'}] 获取信号量超时,继续尝试。`);
}
// 伪唤醒或条件不匹配,重新检查
}
}
// 释放资源
release() {
// 原子地将资源数量加 1
Atomics.add(this.sharedArray, this.index, 1);
console.log(`[Thread ${self.name || 'Main'}] 释放资源,当前可用: ${Atomics.load(this.sharedArray, this.index)}`);
// 通知一个等待的线程
Atomics.notify(this.sharedArray, this.index, 1);
}
}
信号量的实现与互斥锁类似,只是状态变量代表的是可用资源的数量,acquire() 操作会尝试减少资源数量,release() 操作会增加资源数量。当资源数量为 0 时,acquire() 会进入等待。
5.2 内存模型与顺序一致性 (Memory Model and Sequential Consistency)
并发编程中一个复杂但至关重要的概念是内存模型。它定义了多处理器系统如何以及何时将内存操作(读和写)反映给其他处理器。不同的处理器架构和编程语言有不同的内存模型。
JavaScript 的 Atomics 操作提供了强大的内存同步保证。具体来说,Atomics 操作(如 load, store, add, compareExchange 等)都具有“顺序一致性”(Sequential Consistency)或更弱的“获取-释放”(Acquire-Release)语义。
- 顺序一致性:所有线程观察到的内存操作顺序都是相同的,并且与程序代码中指定的顺序一致。这是最直观但通常开销最大的模型。
- 获取-释放语义:
- 获取操作 (Acquire):例如
Atomics.load或Atomics.compareExchange(当读取时),它确保在其之后的所有内存操作都不能被重排到它之前。 - 释放操作 (Release):例如
Atomics.store或Atomics.compareExchange(当写入时),它确保在其之前的所有内存操作都不能被重排到它之后。 - 当一个线程的释放操作与另一个线程的获取操作配对时,它能保证“发生在……之前”的顺序,确保数据在线程间正确同步。
- 获取操作 (Acquire):例如
Atomics.wait 隐式地包含了一个获取操作,而 Atomics.notify 隐式地包含了一个释放操作。这意味着当一个线程被 notify 唤醒并从 wait 返回时,它能够“看到”所有在 notify 之前发生的内存写入。这种保证对于正确同步共享数据至关重要,它防止了编译器和处理器为了优化而对内存操作进行重排,从而导致数据可见性问题。
虽然 Atomics.fence() 也存在,但对于大多数日常的同步需求,Atomics 的其他操作(特别是 wait/notify 和 compareExchange)已经提供了足够的内存同步保证,通常不需要显式使用 fence。
5.3 性能考量
Atomics.wait 和 Atomics.notify 在实现上通常利用操作系统的 Futex 机制,这意味着它们在没有竞争的情况下(即不需要实际等待时)开销非常小,几乎是用户态操作。只有当线程确实需要等待时,它们才会陷入内核并进入休眠状态,这会带来一些上下文切换的开销。
性能最佳实践:
- 避免不必要的等待和唤醒: 只有当确实需要等待某个条件时才调用
Atomics.wait。频繁的唤醒和等待会增加上下文切换的开销。 - 谨慎使用
Atomics.notify(..., Infinity): 唤醒所有等待者(惊群效应)可能导致不必要的竞争和开销。如果只需要一个线程处理,优先使用Atomics.notify(..., 1)。 - 使用
timeout参数: 对于可能长时间等待或需要及时响应的场景,设置一个合理的timeout可以防止线程无限期阻塞,并允许应用程序在超时时执行回退逻辑或报告错误。 - 忙等待 (Busy Waiting) vs. 阻塞等待:
Atomics.wait是一种阻塞等待机制,它让线程进入休眠,不消耗 CPU 周期。相比之下,如果在一个循环中不断地Atomics.load检查条件而不调用wait,这就是忙等待,会浪费大量的 CPU 资源,应尽量避免。
5.4 错误处理与鲁棒性
Atomics.wait的返回值: 始终检查Atomics.wait的返回值 ("ok","not-equal","timed-out"),并根据不同的结果采取相应的处理逻辑。特别是timed-out,可能需要重试、报告错误或采取其他策略。- Worker 异常终止: 如果一个 Worker 在持有锁或处于特定状态时异常终止,可能会导致其他 Worker 永久等待,形成死锁。在实际应用中,需要考虑更健壮的错误恢复机制,例如超时重试、清理机制或监控 Worker 状态。JavaScript 层面很难直接处理 Worker 的非正常终止对共享内存状态的影响,这通常需要更高级的协调逻辑。
- 共享内存的初始化: 确保
SharedArrayBuffer及其视图在所有线程中都得到正确的初始化,并且同步变量(如互斥锁、信号量的状态)具有正确的初始值。
6. 实际应用场景
SharedArrayBuffer 和 Atomics 机制为 Web 应用带来了前所未有的并发处理能力,使其能够胜任更复杂的任务:
- WebAssembly 与 JavaScript 线程间通信: WebAssembly 模块可以在 Worker 中运行,并且可以直接访问
SharedArrayBuffer。这使得高性能的 WebAssembly 代码可以与 JavaScript 线程高效地共享数据和同步执行。 - 高性能数据处理与并行计算: 例如,图像处理、视频编码/解码、科学计算等计算密集型任务可以被分解成小块,由多个 Worker 并行处理,并将结果汇集到共享内存中。
- 游戏引擎中的并发任务: 游戏逻辑、物理模拟、AI 计算等可以分配给不同的 Worker 线程,通过
SharedArrayBuffer共享游戏状态,实现更流畅的游戏体验。 - 复杂的用户界面渲染: 在一些高级 UI 框架中,可以使用 Worker 线程进行布局计算或部分渲染,然后通过
SharedArrayBuffer将结果数据传递给主线程进行最终渲染,避免 UI 阻塞。 - 构建 Web 数据库或缓存: 可以在 Worker 中维护一个共享的内存数据库或缓存,多个 Worker 和主线程都可以通过原子操作安全地读写数据。
7. 共享内存与原子操作的演进
SharedArrayBuffer 和 Atomics 的引入标志着 Web 平台并发模型的一个重要里程碑。尽管它们提供了强大的能力,但也带来了更高的复杂性,要求开发者对并发编程的基本原理有深刻的理解。未来,Web 平台可能会继续探索更高级的并发原语,例如更完善的锁、条件变量,甚至是一些无锁数据结构的支持,以进一步简化并发编程的难度,同时保持高性能。随着 WebAssembly System Interface (WASI) 等技术的发展,Web 环境中的线程和内存管理将与底层系统能力更加紧密地结合,为构建更强大的 Web 应用开辟新的道路。
通过今天的讲座,我们深入探讨了 SharedArrayBuffer 上的原子操作,特别是 Atomics.wait 和 Atomics.notify 如何作为用户态互斥量和条件变量的实现基础。我们理解了它们的参数、工作机制,以及如何通过它们构建生产者-消费者模型和互斥锁等同步原语。这些强大的工具为 JavaScript 带来了真正的共享内存并发能力,使 Web 开发者能够构建出更复杂、更高效的应用程序。掌握这些知识,您将能够更好地驾驭现代 Web 应用的并发挑战。