SharedArrayBuffer 上的原子操作:`Atomics.wait` 与 `Atomics.notify` 的底层互斥量实现

SharedArrayBuffer 上的原子操作:Atomics.waitAtomics.notify 的底层互斥量实现

各位编程爱好者、系统架构师以及对 Web 并发编程充满好奇的朋友们,大家好。在当今这个多核处理器普及的时代,并发编程已不再是后端或系统级语言的专属领域。随着 Web 技术的飞速发展,浏览器环境也对高性能、响应式的并发处理提出了越来越高的要求。JavaScript,作为 Web 的核心语言,传统上以其单线程、事件循环模型而著称。然而,这种模型在面对大量计算密集型任务时,往往会导致 UI 卡顿,用户体验下降。

为了突破这一瓶颈,Web Workers 应运而生,它允许 JavaScript 在后台线程中执行计算,从而避免阻塞主线程。但 Worker 之间的数据共享并非易事,传统的通过 postMessage 传递数据的方式,实际上是对数据进行序列化和反序列化,传递的是数据的副本,而非共享同一份内存。这种机制对于大量数据的共享或需要频繁同步的场景来说,效率低下且复杂。

正是在这样的背景下,SharedArrayBuffer 登上了历史舞台。它提供了一种在多个 Worker 线程以及主线程之间共享同一块内存区域的能力,极大地提升了 Web 应用处理复杂并发任务的潜力。然而,共享内存这把“双刃剑”也带来了新的挑战:如何确保在并发访问共享数据时的正确性与一致性?这正是原子操作 (Atomic Operations) 的用武之地,特别是 Atomics.waitAtomics.notify,它们是构建高级同步原语(如互斥量、条件变量)的基石。

今天的讲座,我们将深入探讨 SharedArrayBuffer 上的原子操作,特别是 Atomics.waitAtomics.notify 如何模拟底层互斥量和条件变量,实现线程间的协同与同步。我们将从基础概念入手,逐步深入,辅以丰富的代码示例,力求让您对这一复杂而强大的机制有一个全面且深刻的理解。

1. SharedArrayBuffer 基础:跨线程共享内存的基石

在深入原子操作之前,我们首先需要理解 SharedArrayBuffer 是什么,以及它为何如此重要。

1.1 ArrayBufferSharedArrayBuffer 的区别

要理解 SharedArrayBuffer,我们得先从 ArrayBuffer 说起。ArrayBuffer 是 JavaScript 中用于表示通用、固定长度的二进制数据缓冲区的一种类型。它本身并不能直接操作数据,而是作为底层二进制数据的一个“容器”,需要通过 TypedArray(如 Int32Array, Uint8Array, Float64Array 等)或 DataView 来创建视图,进而读写其中的数据。

ArrayBuffer 的一个关键特性是,当它被 postMessage 传递给 Worker 时,它会被“转移”(transfer)。这意味着原始 ArrayBuffer 在发送方会变得不可用,所有权转移到了接收方。这保证了在任何给定时间点,只有一方能够修改数据,从而避免了并发修改的问题。然而,这也意味着数据无法真正共享,每次传递都是所有权的转移。

SharedArrayBuffer 则打破了这一限制。顾名思义,它是一个“可共享的” ArrayBuffer。一旦创建,SharedArrayBuffer 可以通过 postMessage 传递给多个 Worker(或主线程),但与 ArrayBuffer 不同的是,它不会被转移,而是所有线程都获得了对同一个底层内存区域的引用。这意味着所有线程都可以同时读写这块共享内存。

核心区别总结:

特性 ArrayBuffer SharedArrayBuffer
共享性 不可共享,postMessage 导致所有权转移 可共享,多个线程可同时访问同一内存区域
并发访问 单线程访问(通过所有权转移保证) 多线程并发访问
同步需求 无需显式同步原语(数据副本或所有权转移) 必须 使用原子操作和同步原语(如 Atomics
用途 单线程处理、数据传输 多线程并发计算、高性能数据共享

1.2 创建与使用 SharedArrayBuffer

创建 SharedArrayBuffer 的方式与 ArrayBuffer 类似,只是构造函数不同:

// 在主线程中创建 SharedArrayBuffer
const sab = new SharedArrayBuffer(1024); // 创建一个 1024 字节的共享缓冲区

// 通过 TypedArray 创建视图来操作数据
// Atomics.wait 和 Atomics.notify 只能在 Int32Array 或 BigInt64Array 视图上操作
const intArray = new Int32Array(sab);

console.log("SharedArrayBuffer size:", sab.byteLength, "bytes");
console.log("Int32Array length:", intArray.length, "elements"); // 1024 / 4 = 256

// 可以在主线程中写入数据
intArray[0] = 123;
console.log("Initial value at index 0:", intArray[0]);

要将 SharedArrayBuffer 传递给 Worker,只需将其作为 postMessage 的第一个参数:

// index.html (主线程)
const worker = new Worker('worker.js');

const sab = new SharedArrayBuffer(1024);
const intArray = new Int32Array(sab);

// 将 SharedArrayBuffer 及其视图发送给 Worker
worker.postMessage({ sab, intArray });

// 此时主线程仍然可以访问和修改 intArray
intArray[1] = 456;
console.log("Main thread modified index 1:", intArray[1]);
// worker.js (Worker 线程)
self.onmessage = function(event) {
    const { sab, intArray } = event.data; // Worker 接收到的是对同一块共享内存的引用

    console.log("Worker received SharedArrayBuffer:", sab);
    console.log("Worker received Int32Array view:", intArray);

    // Worker 也可以访问和修改 intArray
    console.log("Worker reads index 0:", intArray[0]); // 应该看到主线程写入的 123
    console.log("Worker reads index 1:", intArray[1]); // 应该看到主线程写入的 456

    intArray[2] = 789; // Worker 修改数据
    console.log("Worker modified index 2:", intArray[2]);

    // 可以选择向主线程发送消息通知
    self.postMessage("Worker finished modifying SharedArrayBuffer.");
};

通过上述示例,您可以看到主线程和 Worker 线程都能够操作同一个 intArray 视图,从而读写同一块 SharedArrayBuffer。这正是 SharedArrayBuffer 强大的地方。然而,这种共享也带来了潜在的危险。

2. 原子操作的必要性:竞态条件与数据完整性

当多个线程同时访问和修改共享内存时,如果不加以适当的同步控制,就可能出现一种被称为“竞态条件”(Race Condition)的问题。竞态条件是指多个线程以不可预测的顺序访问和操作共享资源时,最终结果的正确性取决于这些线程的执行时序。

2.1 竞态条件示例:非原子操作的陷阱

考虑一个看似简单的操作:i++。在高级语言中,这看起来是一个单一的、不可分割的操作。但在底层,它通常会被分解为至少三个步骤:

  1. 从内存中加载 i 的当前值。
  2. 将加载的值加 1。
  3. 将新值写回内存中的 i

现在,假设有两个 Worker 线程 A 和 B,它们都尝试对共享内存中的同一个变量 i 执行 i++ 操作,且 i 的初始值为 0。

时间点 Worker A 操作 Worker B 操作 i 的值 说明
T1 从内存加载 i (0) 0 A 读取 i
T2 从内存加载 i (0) 0 B 读取 i
T3 i 加 1 (0 -> 1) 0 A 计算新值
T4 i 加 1 (0 -> 1) 0 B 计算新值
T5 将新值 1 写回内存 1 A 将 i 更新为 1
T6 将新值 1 写回内存 1 B 将 i 更新为 1 (覆盖了 A 的结果)

在这个例子中,尽管两个 Worker 都执行了 i++,但最终 i 的值却是 1,而不是我们期望的 2。这就是一个典型的竞态条件,由于操作的交错执行,导致了数据不一致和结果错误。

2.2 原子操作:不可中断的保证

为了解决竞态条件,我们需要原子操作(Atomic Operations)。原子操作是指在执行过程中不会被其他线程中断的操作。它要么完全执行成功,要么完全不执行,不存在中间状态。对于 i++ 这样的操作,如果能够保证它在底层是原子性的,那么上述的交错执行就不可能发生。当一个线程执行原子 i++ 时,另一个线程必须等待,直到前一个操作完成。

JavaScript 通过 Atomics 对象为 SharedArrayBuffer 提供了各种原子操作,包括原子读写、原子算术运算、原子比较交换,以及我们今天的主角——原子等待和通知操作。

3. Atomics 对象概览

Atomics 是一个全局对象,提供了一系列静态方法,用于在 SharedArrayBuffer 上的 Int32ArrayBigInt64Array 视图中执行原子操作。这些操作确保了对共享内存的访问是不可中断的,从而避免了竞态条件。

Atomics 对象提供的主要方法可以分为以下几类:

  1. 原子读写操作:

    • Atomics.load(typedArray, index): 原子地读取指定索引的值。
    • Atomics.store(typedArray, index, value): 原子地将值写入指定索引。
  2. 原子算术和位操作:

    • Atomics.add(typedArray, index, value): 原子地将 value 加到指定索引的值上,并返回旧值。
    • Atomics.sub(typedArray, index, value): 原子地从指定索引的值中减去 value,并返回旧值。
    • Atomics.and(typedArray, index, value): 原子地对指定索引的值执行按位 AND 操作,并返回旧值。
    • Atomics.or(typedArray, index, value): 原子地对指定索引的值执行按位 OR 操作,并返回旧值。
    • Atomics.xor(typedArray, index, value): 原子地对指定索引的值执行按位 XOR 操作,并返回旧值。
  3. 原子比较交换操作:

    • Atomics.compareExchange(typedArray, index, expectedValue, replacementValue): 原子地检查指定索引的值是否等于 expectedValue。如果是,则将其替换为 replacementValue,并返回旧值。这是一个非常强大的原语,是构建许多高级同步机制的基础。
  4. 等待和通知操作(本讲座核心):

    • Atomics.wait(typedArray, index, value, [timeout]): 如果 typedArray[index] 等于 value,则使当前 Worker 暂停执行,进入等待状态。
    • Atomics.notify(typedArray, index, [count]): 唤醒正在 typedArray[index] 上等待的一个或多个 Worker。
  5. 内存同步屏障(Memory Fences):

    • Atomics.fence(): 确保内存操作的顺序性。这通常在实现复杂的无锁数据结构时使用,以保证不同线程观察到的内存写入顺序符合预期。

所有 Atomics 方法都要求操作的 typedArray 必须是 Int32ArrayBigInt64Array 的实例,这是因为这些整数类型能够映射到处理器级别的原子指令,并且 wait/notify 需要固定大小的整数来作为“地址”进行协调。

4. Atomics.waitAtomics.notify 深度剖析:底层互斥量实现

原子读写和算术操作解决了单一内存位置的竞态问题,但它们不足以解决更复杂的线程协调问题,例如“等待一个条件变为真”或“通知其他线程某个事件已发生”。这就是 Atomics.waitAtomics.notify 登场的原因。它们提供了一种高效的、低级的线程间通信机制,是模拟操作系统级互斥量(Mutex)和条件变量(Condition Variable)的关键。

4.1 核心概念:用户态互斥量与条件变量的模拟

在操作系统层面,线程同步通常依赖于内核提供的互斥量和条件变量。互斥量用于保护共享资源,确保在任何时刻只有一个线程可以访问它。条件变量则用于线程间的通知,允许线程在特定条件不满足时挂起,并在条件满足时被唤醒。

Atomics.waitAtomics.notify 为 JavaScript 在用户态(User-Space)提供了类似的功能:

  • Atomics.wait 相当于条件变量的 wait() 操作。它让当前 Worker 线程进入休眠状态,直到被 Atomics.notify 唤醒,或者达到超时时间。
  • Atomics.notify 相当于条件变量的 signal()broadcast() 操作。它唤醒一个或多个在特定内存地址上等待的 Worker 线程。

它们的底层实现通常依赖于操作系统的 Futex (Fast Userspace mUTEX) 机制,这种机制允许在用户空间进行线程同步,只有在发生争用时才需要陷入内核,从而提高了效率。

4.2 Atomics.wait 签名与参数

Atomics.wait(typedArray, index, value, [timeout])

  • typedArray: 必须是 Int32ArrayBigInt64Array 的实例,用于操作共享内存。
  • index: typedArray 中要等待的元素的索引。这是所有等待和通知操作的“同步点”。
  • value: 期望的值。这是 Atomics.wait 的一个关键特性。 如果 typedArray[index] 的当前值不等于 value,那么 Atomics.wait 将立即返回 "not-equal",而不会使线程进入等待状态。这个参数起到了一个“快速失败”的作用,避免了在条件已经满足时进行不必要的等待,也防止了“丢失唤醒”(lost wakeup)的问题(即在检查条件后、进入等待前,条件被另一个线程满足并唤醒)。
  • timeout (可选): 一个表示等待毫秒数的整数。如果指定了 timeout 且在指定时间内未被唤醒,Atomics.wait 将返回 "timed-out"。如果 timeoutInfinity,则表示无限期等待。

返回值:
Atomics.wait 返回一个字符串,表示等待的结果:

  • "ok": 成功被 Atomics.notify 唤醒。
  • "not-equal": 在调用 Atomics.wait 时,typedArray[index] 的值与 value 不匹配。
  • "timed-out": 在 timeout 指定的时间内未被唤醒。

4.3 Atomics.notify 签名与参数

Atomics.notify(typedArray, index, [count])

  • typedArray: 必须是 Int32ArrayBigInt64Array 的实例。
  • index: typedArray 中要唤醒等待线程的元素的索引。
  • count (可选): 一个整数,表示要唤醒的 Worker 线程的最大数量。
    • 如果 count0,则不唤醒任何线程。
    • 如果 count1,则唤醒一个线程。
    • 如果 countInfinity 或省略,则唤醒所有在该 index 上等待的线程。

返回值:
Atomics.notify 返回一个整数,表示实际被唤醒的 Worker 线程的数量。

4.4 工作机制:伪唤醒与循环检查条件

理解 Atomics.wait 的正确使用方式至关重要。由于存在“伪唤醒”(Spurious Wakeups)的可能性,以及为了确保条件真正满足,Atomics.wait 必须在一个循环中调用,并在每次从等待中返回时重新检查条件。

伪唤醒 (Spurious Wakeups): 即使没有 Atomics.notify 调用,一个线程也可能从 Atomics.wait 中返回。这通常是操作系统调度器的一个特性,为了简单和效率,它可能会在条件尚未满足时唤醒线程。因此,不能假设从 wait 返回就意味着条件已经满足。

正确的使用模式:

// Worker 线程等待某个条件
while (Atomics.load(typedArray, index) === EXPECTED_VALUE) {
    // 只有当值仍然是 EXPECTED_VALUE 时才进入等待
    const result = Atomics.wait(typedArray, index, EXPECTED_VALUE);
    // 即使 Atomics.wait 返回 "ok",也需要重新检查条件,因为可能存在伪唤醒
    // 或者在被唤醒后,其他线程又改变了条件
    if (result === "timed-out") {
        console.warn("Wait timed out, condition still not met.");
        // 可以选择处理超时逻辑
        break; // 或者继续等待
    }
}
// 此时,条件已经满足,可以执行后续操作

Atomics.waitvalue 参数是防御性的。它防止了所谓的“丢失唤醒”:如果在线程检查条件为 EXPECTED_VALUE 之后,但在它实际进入等待队列之前,另一个线程改变了值并通过 notify 唤醒,那么这个 notify 就会被“丢失”,导致前一个线程无限期等待。Atomics.wait 只有在 typedArray[index] 确实等于 value 时才会进入等待状态,否则会立即返回 "not-equal",从而避免了这种情况。

惊群效应 (Thundering Herd): 当多个 Worker 在同一个 index 上等待时,如果一个 notify 唤醒了所有等待者 (count: Infinity),那么所有被唤醒的 Worker 都会尝试去检查条件或竞争资源。如果实际上只有一个 Worker 能够成功处理该条件,那么其他 Worker 就会立即发现条件不满足,然后再次进入等待状态,这会造成不必要的 CPU 资源浪费。因此,count 参数的选择需要根据具体场景谨慎考虑。如果只有一个任务可以被处理,通常 notify(typedArray, index, 1) 更高效。

4.5 死锁 (Deadlock) 风险

与所有并发编程中的同步原语一样,Atomics.waitAtomics.notify 的不当使用也可能导致死锁。死锁是指两个或多个线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力干涉,它们都将无法继续执行。

例如,如果一个 Worker 线程在未释放其持有的锁的情况下调用 Atomics.wait,而另一个 Worker 线程又需要这个锁来执行 Atomics.notify,那么就会发生死锁。因此,在使用 wait/notify 时,必须仔细设计同步逻辑,确保资源的获取和释放顺序正确,并避免循环等待。

4.6 代码示例:简单的生产者-消费者模型

我们将通过一个经典的生产者-消费者模型来演示 Atomics.waitAtomics.notify 的用法。在这个模型中,一个或多个生产者向共享缓冲区中添加数据,一个或多个消费者从缓冲区中取出数据。我们需要确保:

  • 生产者不能在缓冲区满时写入。
  • 消费者不能在缓冲区空时读取。
  • 共享缓冲区的访问是原子的,以避免数据损坏。

这里我们简化模型,假设只有一个生产者和一个消费者,共享一个 SharedArrayBuffer 作为缓冲区。缓冲区中,一个位置存储数据,另一个位置作为状态标志。

<!-- index.html -->
<!DOCTYPE html>
<html>
<head>
    <title>Atomics.wait/notify Producer-Consumer Demo</title>
</head>
<body>
    <h1>生产者-消费者模型 (Atomics.wait/notify)</h1>
    <p>打开控制台查看输出。</p>
    <script>
        // 共享缓冲区结构:
        // index 0: 状态标志 (0: 空, 1: 有数据)
        // index 1: 实际数据
        const BUFFER_SIZE = 2; // 两个 Int32 元素
        const STATUS_INDEX = 0;
        const DATA_INDEX = 1;

        const sab = new SharedArrayBuffer(BUFFER_SIZE * Int32Array.BYTES_PER_ELEMENT);
        const sharedArray = new Int32Array(sab);

        // 初始化状态:缓冲区为空
        Atomics.store(sharedArray, STATUS_INDEX, 0);
        Atomics.store(sharedArray, DATA_INDEX, 0); // 初始化数据为 0

        console.log("主线程: SharedArrayBuffer 初始化完成.");
        console.log(`主线程: 初始状态 - sharedArray[${STATUS_INDEX}]=${sharedArray[STATUS_INDEX]}, sharedArray[${DATA_INDEX}]=${sharedArray[DATA_INDEX]}`);

        // --- 生产者 Worker ---
        const producerWorker = new Worker(URL.createObjectURL(new Blob([`
            self.onmessage = function(event) {
                const { sharedArray, STATUS_INDEX, DATA_INDEX } = event.data;
                let produceCount = 0;

                function produce() {
                    // 生产者尝试写入数据
                    // 只有当状态为 0 (空) 时才写入
                    while (true) {
                        // 1. 等待缓冲区为空 (STATUS_INDEX === 0)
                        console.log(`[生产者] 尝试等待缓冲区为空... (当前状态: ${Atomics.load(sharedArray, STATUS_INDEX)})`);
                        const waitResult = Atomics.wait(sharedArray, STATUS_INDEX, 0, 5000); // 最多等待5秒

                        if (waitResult === "timed-out") {
                            console.warn("[生产者] 等待缓冲区为空超时,退出生产。");
                            break;
                        } else if (waitResult === "not-equal") {
                            // 伪唤醒或在检查后立即被其他线程填充
                            // 继续循环,重新检查条件
                            console.log("[生产者] 伪唤醒或条件不匹配,重新检查。");
                            continue;
                        }

                        // 如果到达这里,说明 Atomics.wait 成功返回 "ok",并且在进入等待前 STATUS_INDEX 确实是 0
                        // 但是,为了鲁棒性,仍然需要再次检查(伪唤醒的可能)
                        if (Atomics.load(sharedArray, STATUS_INDEX) === 0) {
                            produceCount++;
                            const newData = produceCount * 100;
                            console.log(`[生产者] 缓冲区为空,写入数据: ${newData}`);
                            Atomics.store(sharedArray, DATA_INDEX, newData);
                            Atomics.store(sharedArray, STATUS_INDEX, 1); // 设置状态为 1 (有数据)

                            // 2. 通知消费者有新数据
                            console.log("[生产者] 通知消费者有新数据.");
                            Atomics.notify(sharedArray, STATUS_INDEX, 1); // 唤醒一个等待的消费者

                            // 模拟生产间隔
                            setTimeout(produce, Math.random() * 1000 + 500); // 0.5 - 1.5秒后再次生产
                            break; // 成功生产一次后退出当前循环,等待下一次生产
                        } else {
                            // 再次检查发现条件不满足,可能是伪唤醒或者被其他生产者抢先
                            console.log("[生产者] 伪唤醒后条件不满足,重新进入等待。");
                        }
                    }
                }

                produce(); // 启动生产
            };
        `])));

        producerWorker.postMessage({ sharedArray, STATUS_INDEX, DATA_INDEX });

        // --- 消费者 Worker ---
        const consumerWorker = new Worker(URL.createObjectURL(new Blob([`
            self.onmessage = function(event) {
                const { sharedArray, STATUS_INDEX, DATA_INDEX } = event.data;
                let consumeCount = 0;

                function consume() {
                    // 消费者尝试读取数据
                    // 只有当状态为 1 (有数据) 时才读取
                    while (true) {
                        // 1. 等待缓冲区有数据 (STATUS_INDEX === 1)
                        console.log(`[消费者] 尝试等待缓冲区有数据... (当前状态: ${Atomics.load(sharedArray, STATUS_INDEX)})`);
                        const waitResult = Atomics.wait(sharedArray, STATUS_INDEX, 1, 5000); // 最多等待5秒

                        if (waitResult === "timed-out") {
                            console.warn("[消费者] 等待缓冲区有数据超时,退出消费。");
                            break;
                        } else if (waitResult === "not-equal") {
                            // 伪唤醒或在检查后立即被其他线程清空
                            // 继续循环,重新检查条件
                            console.log("[消费者] 伪唤醒或条件不匹配,重新检查。");
                            continue;
                        }

                        // 如果到达这里,说明 Atomics.wait 成功返回 "ok",并且在进入等待前 STATUS_INDEX 确实是 1
                        if (Atomics.load(sharedArray, STATUS_INDEX) === 1) {
                            consumeCount++;
                            const consumedData = Atomics.load(sharedArray, DATA_INDEX);
                            console.log(`[消费者] 缓冲区有数据,读取数据: ${consumedData}`);
                            Atomics.store(sharedArray, DATA_INDEX, 0); // 清空数据
                            Atomics.store(sharedArray, STATUS_INDEX, 0); // 设置状态为 0 (空)

                            // 2. 通知生产者缓冲区已空
                            console.log("[消费者] 通知生产者缓冲区已空.");
                            Atomics.notify(sharedArray, STATUS_INDEX, 1); // 唤醒一个等待的生产者

                            // 模拟消费间隔
                            setTimeout(consume, Math.random() * 1000 + 500); // 0.5 - 1.5秒后再次消费
                            break; // 成功消费一次后退出当前循环,等待下一次消费
                        } else {
                            // 再次检查发现条件不满足,可能是伪唤醒或者被其他消费者抢先
                            console.log("[消费者] 伪唤醒后条件不满足,重新进入等待。");
                        }
                    }
                }

                consume(); // 启动消费
            };
        `])));

        consumerWorker.postMessage({ sharedArray, STATUS_INDEX, DATA_INDEX });

        // 可以在主线程中偶尔检查共享状态
        setInterval(() => {
            console.log(`[主线程] 当前共享状态: sharedArray[${STATUS_INDEX}]=${sharedArray[STATUS_INDEX]}, sharedArray[${DATA_INDEX}]=${sharedArray[DATA_INDEX]}`);
        }, 3000); // 每3秒检查一次
    </script>
</body>
</html>

代码解析:

  1. 共享内存布局: 我们使用 sharedArray[0] 作为状态标志 (STATUS_INDEX),0 表示缓冲区为空,1 表示有数据。sharedArray[1] (DATA_INDEX) 用于存储实际的生产/消费数据。
  2. 生产者逻辑:
    • 生产者在一个 while(true) 循环中运行,不断尝试生产。
    • 它首先调用 Atomics.wait(sharedArray, STATUS_INDEX, 0)。这意味着它会等待 sharedArray[STATUS_INDEX] 的值变为 0(即缓冲区为空)。
    • 如果 wait 返回 "ok",并且再次检查 STATUS_INDEX 确认确实为 0,生产者就会写入新数据到 DATA_INDEX,并将 STATUS_INDEX 更新为 1
    • 写入完成后,它会调用 Atomics.notify(sharedArray, STATUS_INDEX, 1),唤醒一个在 STATUS_INDEX 上等待的消费者。
    • 生产者在成功生产一次后,通过 setTimeout 模拟生产间隔,然后再次启动生产流程。
  3. 消费者逻辑:
    • 消费者也在一个 while(true) 循环中运行,不断尝试消费。
    • 它调用 Atomics.wait(sharedArray, STATUS_INDEX, 1),等待 sharedArray[STATUS_INDEX] 的值变为 1(即缓冲区有数据)。
    • 如果 wait 返回 "ok",并且再次检查 STATUS_INDEX 确认确实为 1,消费者就会读取 DATA_INDEX 的数据,清空 DATA_INDEX 并将 STATUS_INDEX 更新为 0
    • 读取完成后,它会调用 Atomics.notify(sharedArray, STATUS_INDEX, 1),唤醒一个在 STATUS_INDEX 上等待的生产者。
    • 消费者在成功消费一次后,通过 setTimeout 模拟消费间隔,然后再次启动消费流程。
  4. waitResult 处理: Atomics.wait 的返回值 timed-outnot-equal 都需要妥善处理。not-equal 意味着在调用 wait 时条件不满足,或者发生了伪唤醒。因此,即使返回 ok,也必须在 while 循环中再次检查条件,这是使用 Atomics.wait 的黄金法则。

这个示例清晰地展示了 Atomics.waitAtomics.notify 如何协同工作,实现线程间的同步和通信,确保共享资源的正确访问。

5. 高级主题与最佳实践

在掌握了 Atomics.waitAtomics.notify 的基本用法后,我们可以进一步探讨如何利用它们构建更复杂的同步原语,并考虑一些高级问题。

5.1 构建更复杂的同步原语

Atomics.waitAtomics.notify 是低级原语,它们可以作为构建更高级同步机制的基石。

5.1.1 互斥锁 (Mutex) 的实现

互斥锁是一种最基本的同步原语,它保证在任何时刻只有一个线程能够进入“临界区”并访问共享资源。我们可以使用 Atomics.compareExchangeAtomics.wait/notify 来实现一个简单的互斥锁。

互斥锁状态:

  • 0: 解锁 (Unlocked)
  • 1: 锁定 (Locked)
// Mutex 类的实现(在主线程或 Worker 中都可以定义和使用)
class Mutex {
    constructor(sharedArray, index) {
        // sharedArray 必须是 Int32Array 或 BigInt64Array
        // index 是 Mutex 状态在 sharedArray 中的位置
        this.sharedArray = sharedArray;
        this.index = index;

        // 确保 Mutex 初始状态为解锁
        if (Atomics.load(this.sharedArray, this.index) !== 0) {
            Atomics.store(this.sharedArray, this.index, 0);
        }
    }

    // 获取锁
    lock() {
        while (true) {
            // 尝试将状态从 0 (解锁) 变为 1 (锁定)
            const oldValue = Atomics.compareExchange(this.sharedArray, this.index, 0, 1);

            if (oldValue === 0) {
                // 成功获取锁,状态从 0 变为 1
                return;
            }

            // 如果 oldValue 不是 0,说明锁已经被其他线程持有
            // 等待锁被释放 (即状态变为 0)
            console.log(`[Thread ${self.name || 'Main'}] 锁已被持有,等待释放...`);
            const waitResult = Atomics.wait(this.sharedArray, this.index, 1); // 等待状态从 1 变为其他值 (通常是 0)

            // 即使 wait 成功返回,也可能存在伪唤醒或锁再次被其他线程获取
            // 循环会再次尝试 compareExchange
            if (waitResult === "timed-out") {
                console.warn(`[Thread ${self.name || 'Main'}] 获取锁超时,继续尝试。`);
            }
        }
    }

    // 释放锁
    unlock() {
        // 确保当前线程确实持有锁 (即状态为 1)
        if (Atomics.load(this.sharedArray, this.index) !== 1) {
            console.error(`[Thread ${self.name || 'Main'}] 尝试释放一个未持有的锁!`);
            return;
        }

        // 将状态从 1 (锁定) 变为 0 (解锁)
        Atomics.store(this.sharedArray, this.index, 0);
        console.log(`[Thread ${self.name || 'Main'}] 释放锁。`);

        // 通知一个等待的线程
        Atomics.notify(this.sharedArray, this.index, 1);
    }
}

使用示例:

// main.js 或 worker.js
// 共享缓冲区
const mutexSab = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);
const mutexSharedArray = new Int32Array(mutexSab);

// Mutex 实例,状态存储在 mutexSharedArray[0]
const myMutex = new Mutex(mutexSharedArray, 0);

// 共享计数器,受 Mutex 保护
const counterSab = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);
const counterSharedArray = new Int32Array(counterSab);
Atomics.store(counterSharedArray, 0, 0); // 计数器初始值

// 在主线程中模拟一个任务
async function runTask(threadName, count) {
    self.name = threadName; // 方便日志识别
    for (let i = 0; i < count; i++) {
        myMutex.lock(); // 获取锁
        try {
            // 临界区:安全地访问共享计数器
            const currentValue = Atomics.load(counterSharedArray, 0);
            const newValue = currentValue + 1;
            Atomics.store(counterSharedArray, 0, newValue);
            console.log(`[${threadName}] Counter: ${newValue}`);
        } finally {
            myMutex.unlock(); // 释放锁
        }
        await new Promise(resolve => setTimeout(resolve, Math.random() * 50)); // 模拟一些工作
    }
}

// 主线程作为第一个并发任务
runTask('Main Thread', 10);

// 启动一个 Worker 线程作为第二个并发任务
const workerMutex = new Worker(URL.createObjectURL(new Blob([`
    self.onmessage = async function(event) {
        const { mutexSab, counterSab } = event.data;
        const mutexSharedArray = new Int32Array(mutexSab);
        const counterSharedArray = new Int32Array(counterSab);

        class Mutex {
            constructor(sharedArray, index) {
                this.sharedArray = sharedArray;
                this.index = index;
            }
            lock() {
                while (true) {
                    const oldValue = Atomics.compareExchange(this.sharedArray, this.index, 0, 1);
                    if (oldValue === 0) return;
                    console.log(`[Worker Thread] 锁已被持有,等待释放...`);
                    const waitResult = Atomics.wait(this.sharedArray, this.index, 1);
                    if (waitResult === "timed-out") {
                        console.warn(`[Worker Thread] 获取锁超时,继续尝试。`);
                    }
                }
            }
            unlock() {
                if (Atomics.load(this.sharedArray, this.index) !== 1) {
                    console.error(`[Worker Thread] 尝试释放一个未持有的锁!`);
                    return;
                }
                Atomics.store(this.sharedArray, this.index, 0);
                console.log(`[Worker Thread] 释放锁。`);
                Atomics.notify(this.sharedArray, this.index, 1);
            }
        }
        const myMutex = new Mutex(mutexSharedArray, 0);

        self.name = 'Worker Thread';
        for (let i = 0; i < 10; i++) {
            myMutex.lock();
            try {
                const currentValue = Atomics.load(counterSharedArray, 0);
                const newValue = currentValue + 1;
                Atomics.store(counterSharedArray, 0, newValue);
                console.log(`[Worker Thread] Counter: ${newValue}`);
            } finally {
                myMutex.unlock();
            }
            await new Promise(resolve => setTimeout(resolve, Math.random() * 50));
        }
        self.postMessage('Worker finished.');
    };
`])));

workerMutex.postMessage({ mutexSab, counterSab });

在这个 Mutex 实现中:

  • lock() 方法使用 Atomics.compareExchange 尝试将锁的状态从 0 (解锁) 原子地变为 1 (锁定)。
  • 如果 compareExchange 返回 0,说明成功获取锁。
  • 如果返回 1,说明锁已被其他线程持有,当前线程进入 Atomics.wait(..., 1) 状态,等待锁被释放。
  • unlock() 方法将锁状态设回 0,并调用 Atomics.notify 唤醒一个等待的线程。
5.1.2 信号量 (Semaphore) 的实现

信号量是互斥锁的推广,它允许同时有 N 个线程访问共享资源。我们可以用一个整数来表示可用资源的数量,当资源数量为 0 时,线程等待。

// Semaphore 类的实现
class Semaphore {
    constructor(sharedArray, index, initialCount) {
        this.sharedArray = sharedArray;
        this.index = index;
        Atomics.store(this.sharedArray, this.index, initialCount);
    }

    // 获取资源
    acquire() {
        while (true) {
            const currentValue = Atomics.load(this.sharedArray, this.index);
            if (currentValue > 0) {
                // 如果有可用资源,尝试原子地减 1
                const oldValue = Atomics.compareExchange(this.sharedArray, this.index, currentValue, currentValue - 1);
                if (oldValue === currentValue) {
                    // 成功获取资源
                    return;
                }
                // 否则,说明在检查和 compareExchange 之间有其他线程抢先,重新尝试
                continue;
            }
            // 如果没有可用资源,则等待
            console.log(`[Thread ${self.name || 'Main'}] 没有可用资源,等待...`);
            const waitResult = Atomics.wait(this.sharedArray, this.index, 0); // 等待资源数变为非零
            if (waitResult === "timed-out") {
                console.warn(`[Thread ${self.name || 'Main'}] 获取信号量超时,继续尝试。`);
            }
            // 伪唤醒或条件不匹配,重新检查
        }
    }

    // 释放资源
    release() {
        // 原子地将资源数量加 1
        Atomics.add(this.sharedArray, this.index, 1);
        console.log(`[Thread ${self.name || 'Main'}] 释放资源,当前可用: ${Atomics.load(this.sharedArray, this.index)}`);
        // 通知一个等待的线程
        Atomics.notify(this.sharedArray, this.index, 1);
    }
}

信号量的实现与互斥锁类似,只是状态变量代表的是可用资源的数量,acquire() 操作会尝试减少资源数量,release() 操作会增加资源数量。当资源数量为 0 时,acquire() 会进入等待。

5.2 内存模型与顺序一致性 (Memory Model and Sequential Consistency)

并发编程中一个复杂但至关重要的概念是内存模型。它定义了多处理器系统如何以及何时将内存操作(读和写)反映给其他处理器。不同的处理器架构和编程语言有不同的内存模型。

JavaScript 的 Atomics 操作提供了强大的内存同步保证。具体来说,Atomics 操作(如 load, store, add, compareExchange 等)都具有“顺序一致性”(Sequential Consistency)或更弱的“获取-释放”(Acquire-Release)语义。

  • 顺序一致性:所有线程观察到的内存操作顺序都是相同的,并且与程序代码中指定的顺序一致。这是最直观但通常开销最大的模型。
  • 获取-释放语义
    • 获取操作 (Acquire):例如 Atomics.loadAtomics.compareExchange(当读取时),它确保在其之后的所有内存操作都不能被重排到它之前。
    • 释放操作 (Release):例如 Atomics.storeAtomics.compareExchange(当写入时),它确保在其之前的所有内存操作都不能被重排到它之后。
    • 当一个线程的释放操作与另一个线程的获取操作配对时,它能保证“发生在……之前”的顺序,确保数据在线程间正确同步。

Atomics.wait 隐式地包含了一个获取操作,而 Atomics.notify 隐式地包含了一个释放操作。这意味着当一个线程被 notify 唤醒并从 wait 返回时,它能够“看到”所有在 notify 之前发生的内存写入。这种保证对于正确同步共享数据至关重要,它防止了编译器和处理器为了优化而对内存操作进行重排,从而导致数据可见性问题。

虽然 Atomics.fence() 也存在,但对于大多数日常的同步需求,Atomics 的其他操作(特别是 wait/notifycompareExchange)已经提供了足够的内存同步保证,通常不需要显式使用 fence

5.3 性能考量

Atomics.waitAtomics.notify 在实现上通常利用操作系统的 Futex 机制,这意味着它们在没有竞争的情况下(即不需要实际等待时)开销非常小,几乎是用户态操作。只有当线程确实需要等待时,它们才会陷入内核并进入休眠状态,这会带来一些上下文切换的开销。

性能最佳实践:

  • 避免不必要的等待和唤醒: 只有当确实需要等待某个条件时才调用 Atomics.wait。频繁的唤醒和等待会增加上下文切换的开销。
  • 谨慎使用 Atomics.notify(..., Infinity) 唤醒所有等待者(惊群效应)可能导致不必要的竞争和开销。如果只需要一个线程处理,优先使用 Atomics.notify(..., 1)
  • 使用 timeout 参数: 对于可能长时间等待或需要及时响应的场景,设置一个合理的 timeout 可以防止线程无限期阻塞,并允许应用程序在超时时执行回退逻辑或报告错误。
  • 忙等待 (Busy Waiting) vs. 阻塞等待: Atomics.wait 是一种阻塞等待机制,它让线程进入休眠,不消耗 CPU 周期。相比之下,如果在一个循环中不断地 Atomics.load 检查条件而不调用 wait,这就是忙等待,会浪费大量的 CPU 资源,应尽量避免。

5.4 错误处理与鲁棒性

  • Atomics.wait 的返回值: 始终检查 Atomics.wait 的返回值 ("ok", "not-equal", "timed-out"),并根据不同的结果采取相应的处理逻辑。特别是 timed-out,可能需要重试、报告错误或采取其他策略。
  • Worker 异常终止: 如果一个 Worker 在持有锁或处于特定状态时异常终止,可能会导致其他 Worker 永久等待,形成死锁。在实际应用中,需要考虑更健壮的错误恢复机制,例如超时重试、清理机制或监控 Worker 状态。JavaScript 层面很难直接处理 Worker 的非正常终止对共享内存状态的影响,这通常需要更高级的协调逻辑。
  • 共享内存的初始化: 确保 SharedArrayBuffer 及其视图在所有线程中都得到正确的初始化,并且同步变量(如互斥锁、信号量的状态)具有正确的初始值。

6. 实际应用场景

SharedArrayBufferAtomics 机制为 Web 应用带来了前所未有的并发处理能力,使其能够胜任更复杂的任务:

  • WebAssembly 与 JavaScript 线程间通信: WebAssembly 模块可以在 Worker 中运行,并且可以直接访问 SharedArrayBuffer。这使得高性能的 WebAssembly 代码可以与 JavaScript 线程高效地共享数据和同步执行。
  • 高性能数据处理与并行计算: 例如,图像处理、视频编码/解码、科学计算等计算密集型任务可以被分解成小块,由多个 Worker 并行处理,并将结果汇集到共享内存中。
  • 游戏引擎中的并发任务: 游戏逻辑、物理模拟、AI 计算等可以分配给不同的 Worker 线程,通过 SharedArrayBuffer 共享游戏状态,实现更流畅的游戏体验。
  • 复杂的用户界面渲染: 在一些高级 UI 框架中,可以使用 Worker 线程进行布局计算或部分渲染,然后通过 SharedArrayBuffer 将结果数据传递给主线程进行最终渲染,避免 UI 阻塞。
  • 构建 Web 数据库或缓存: 可以在 Worker 中维护一个共享的内存数据库或缓存,多个 Worker 和主线程都可以通过原子操作安全地读写数据。

7. 共享内存与原子操作的演进

SharedArrayBufferAtomics 的引入标志着 Web 平台并发模型的一个重要里程碑。尽管它们提供了强大的能力,但也带来了更高的复杂性,要求开发者对并发编程的基本原理有深刻的理解。未来,Web 平台可能会继续探索更高级的并发原语,例如更完善的锁、条件变量,甚至是一些无锁数据结构的支持,以进一步简化并发编程的难度,同时保持高性能。随着 WebAssembly System Interface (WASI) 等技术的发展,Web 环境中的线程和内存管理将与底层系统能力更加紧密地结合,为构建更强大的 Web 应用开辟新的道路。

通过今天的讲座,我们深入探讨了 SharedArrayBuffer 上的原子操作,特别是 Atomics.waitAtomics.notify 如何作为用户态互斥量和条件变量的实现基础。我们理解了它们的参数、工作机制,以及如何通过它们构建生产者-消费者模型和互斥锁等同步原语。这些强大的工具为 JavaScript 带来了真正的共享内存并发能力,使 Web 开发者能够构建出更复杂、更高效的应用程序。掌握这些知识,您将能够更好地驾驭现代 Web 应用的并发挑战。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注