大家好,欢迎来到本次技术讲座。今天我们将深入探讨一个令人兴奋且极具挑战性的话题:在 JavaScript 中构建一个无锁(Lock-free)并发哈希表。我们将利用现代 Web 平台提供的强大原语——SharedArrayBuffer 和 Futex——来实现这一目标。
在多核处理器日益普及的今天,前端应用也开始面临传统后端领域才有的并发编程挑战。Web Workers 允许我们将计算密集型任务卸载到后台线程,避免阻塞主线程。然而,当这些 Workers 需要共享和修改同一份数据时,问题就来了。传统的做法是使用消息传递,但这通常涉及到数据的序列化和反序列化,效率低下。更高级的解决方案是共享内存,但共享内存带来了数据竞争的风险。
这就是无锁编程的用武之地。通过避免传统的互斥锁(mutexes),我们可以消除死锁的风险,提高在高并发下的吞吐量和响应速度。当然,无锁编程的实现难度也相应增加。
1. 并发编程的基石:SharedArrayBuffer 与 Atomics
要实现无锁数据结构,我们首先需要两个核心工具:SharedArrayBuffer 和 Atomics 对象。
1.1 SharedArrayBuffer:共享内存的门户
SharedArrayBuffer 是一种特殊类型的 ArrayBuffer,它的内容可以被多个执行上下文(例如,主线程和多个 Web Workers)共享。这意味着,不同 Workers 可以直接读写同一块内存区域,而无需通过消息传递来复制数据。
// 主线程或一个 Worker
const sharedBuffer = new SharedArrayBuffer(1024); // 创建一个1KB的共享缓冲区
const sharedInt32Array = new Int32Array(sharedBuffer); // 在其上创建一个32位整数视图
// 现在,sharedBuffer 或 sharedInt32Array 可以通过 postMessage 传递给其他 Workers
// 但传递的是引用,而不是副本
worker.postMessage({ buffer: sharedBuffer });
// 在另一个 Worker 中接收
self.onmessage = (event) => {
const receivedBuffer = event.data.buffer;
const receivedInt32Array = new Int32Array(receivedBuffer);
// 两个 Workers 现在可以读写相同的 receivedInt32Array / sharedInt32Array
// 但必须使用 Atomics 操作来确保安全和正确性
console.log('Worker received shared array:', receivedInt32Array);
};
SharedArrayBuffer 是实现并发数据结构的基础,因为它提供了共享的数据存储。然而,仅仅共享内存是不够的。当多个 Workers 同时读写同一内存位置时,可能会发生数据竞争,导致不可预测的结果。这就是 Atomics 对象发挥作用的地方。
1.2 Atomics:原子操作的守护者
Atomics 对象提供了一组原子操作,用于在 SharedArrayBuffer 上执行读、写和读-修改-写操作。原子操作保证了操作的不可中断性:在任何给定的时间,只有一个 Worker 可以执行该操作,并且该操作要么完全完成,要么根本不发生。这解决了数据竞争问题,是构建无锁数据结构的关键。
以下是一些我们将用到的 Atomics 方法:
Atomics.load(typedArray, index): 原子地读取typedArray在index处的值。Atomics.store(typedArray, index, value): 原子地写入value到typedArray在index处。Atomics.compareExchange(typedArray, index, expectedValue, replacementValue): 这是一个强大的读-修改-写操作。它原子地检查typedArray在index处的值是否等于expectedValue。如果相等,则将其替换为replacementValue。无论是否替换,它都会返回index处的原始值。这是实现 CAS (Compare-And-Swap) 语义的核心。Atomics.add(typedArray, index, value): 原子地将value加到typedArray在index处的值上,并返回原始值。Atomics.sub(typedArray, index, value): 原子地从typedArray在index处的值中减去value,并返回原始值。
这些操作确保了即使在高度并发的环境中,对共享内存的修改也是可预测和正确的。
2. Futex:高效的线程同步原语
虽然 Atomics 解决了数据竞争,但有时 Workers 需要等待某个条件变为真才能继续执行。如果使用忙等待(busy-waiting),即在一个循环中反复检查条件,会浪费大量的 CPU 周期。Futex (Fast User-space Mutex) 原语就是为了解决这个问题而设计的。
Futex 在 JavaScript 中通过 Atomics.wait 和 Atomics.notify 实现,它们允许 Workers 阻塞(暂停执行)直到被另一个 Worker 唤醒,从而实现高效的线程同步。
-
Atomics.wait(typedArray, index, value, [timeout]):- 原子地检查
typedArray在index处的值是否等于value。 - 如果相等,则 Worker 进入休眠状态,直到被
Atomics.notify唤醒,或发生超时。 - 如果不相等,则立即返回 "not-equal"。
- 返回
ok(被唤醒),timed-out(超时), 或not-equal。 - 重要:
wait操作必须在一个SharedArrayBuffer的视图上进行。
- 原子地检查
-
Atomics.notify(typedArray, index, [count]):- 唤醒最多
count个正在typedArray在index处等待的 Workers。 count默认为Infinity(唤醒所有等待的 Workers)。- 返回被唤醒的 Workers 数量。
- 唤醒最多
Futex 的工作原理简述:
- 一个 Worker 想要等待某个条件。它会检查
SharedArrayBuffer中的一个特定内存位置(通常是一个整数)。 - 如果条件不满足(例如,该整数的值不符合预期),Worker 调用
Atomics.wait,传入该内存位置、期望值,并进入休眠。 - 另一个 Worker 改变了该内存位置的值,使其满足条件。
- 该 Worker 调用
Atomics.notify,传入相同的内存位置,唤醒一个或多个正在等待的 Worker。 - 被唤醒的 Worker 从
Atomics.wait返回,并可以继续执行。
Futex 是实现复杂无锁数据结构中协调的关键,例如在哈希表扩容时,可以用来协调 Workers 帮助完成扩容任务或等待扩容完成。
3. 无锁并发哈希表的设计挑战
构建一个无锁哈希表比实现一个简单的无锁计数器或队列要复杂得多。主要挑战包括:
- 冲突解决(Collision Resolution): 哈希冲突是不可避免的。我们需要一种无锁的方式来处理它们。
- 链式(Chaining): 每个桶是一个链表。在无锁环境中,实现无锁链表本身就很复杂(需要无锁指针修改,涉及到 ABA 问题)。
- 开放寻址(Open Addressing): 当发生冲突时,探测(probe)哈希表中的其他位置。这对于无锁实现更友好,因为我们主要操作数组索引,避免了复杂的指针操作。我们将采用线性探测。
- 动态扩容(Resizing/Rehashing): 当哈希表中的元素数量达到一定阈值时,需要扩容以保持性能。在无锁环境中,这尤其困难,因为扩容需要移动大量数据,并且必须确保在扩容过程中,读写操作仍然能够正确进行。
- 内存管理与回收: 在
SharedArrayBuffer中,我们不能像普通对象那样依赖 JavaScript 的垃圾回收。我们需要管理键值对的存储,以及如何表示它们的生命周期(空、已存在、已删除)。 - 键值对的存储:
SharedArrayBuffer只能存储原始字节。我们需要一种方式将 JavaScript 对象(键和值)序列化到SharedArrayBuffer中,并在读取时反序列化。为了简化,本讲座将主要关注存储数值类型的键和值,或者使用简单的编码方式存储字符串。
3.1 哈希表的内部结构
我们将使用开放寻址与线性探测来解决冲突。每个槽位(slot)在 SharedArrayBuffer 中将存储:
- 状态(State): 表示该槽位是空的、已占用还是已删除。
- 键的哈希值(Key Hash): 存储键的哈希值,用于快速比较和探测。
- 键(Key): 实际的键数据。
- 值(Value): 实际的值数据。
为了简化,我们假设键和值都是32位整数。如果需要存储字符串或其他复杂对象,则需要更复杂的序列化逻辑,例如在 SharedArrayBuffer 中预留字符串池,并存储字符串的偏移量和长度。
槽位状态定义:
| 状态常量 | 值 | 描述 |
|---|---|---|
EMPTY |
0 | 槽位为空,可以写入新数据。 |
OCCUPIED |
1 | 槽位已被占用,包含有效的键值对。 |
DELETED |
2 | 槽位曾被占用但已被删除。探测时需跳过此槽位。 |
RESIZING |
3 | 扩容过程中使用的临时状态,通常用于标记正在迁移的槽位。 |
我们将使用一个 Int32Array 视图来表示哈希表的主体,每个槽位占用固定的多个 Int32 单元。
例如,一个槽位可能结构如下:
[STATE, KEY_HASH, KEY, VALUE]
3.2 扩容策略 (Help-on-the-fly Resizing)
扩容是无锁哈希表最复杂的方面之一。我们将采用一种“辅助式”的扩容策略:
- 当哈希表的负载因子超过阈值时,某个 Worker 决定启动扩容。
- 它会创建一个新的、更大的
SharedArrayBuffer来存储新哈希表,并将其链接到旧哈希表。 - 旧哈希表的元数据中会设置一个
RESIZE_IN_PROGRESS标志,并记录新表的引用。 - 后续的任何操作(
put、get、delete)如果访问到旧哈希表,会检测到RESIZE_IN_PROGRESS标志。 - 这些操作在执行自己的任务之前,会“帮助”移动旧哈希表中的一些元素到新哈希表。这通常被称为“帮助-随-飞” (help-on-the-fly) 或“多步” (multi-step) 扩容。
- 当所有元素都移动完毕后,旧哈希表被废弃,新哈希表成为当前的主哈希表。
为了协调扩容,我们可能需要额外的 Futex 位置来等待扩容完成,或者协调多个 Workers 共同迁移数据。
4. 实现无锁并发哈希表
让我们开始构建这个哈希表。
4.1 核心数据结构和常量
我们首先定义一些常量和哈希表的元数据结构。
// shared-hash-table.js (或一个模块)
// 定义槽位状态
const SlotState = {
EMPTY: 0,
OCCUPIED: 1,
DELETED: 2,
RESERVED_FOR_RESIZE: 3, // 扩容时可能用于标记正在迁移
};
// 哈希表元数据偏移量 (假设我们用一个 Int32Array 视图来管理所有数据)
// 我们可以将元数据放在共享缓冲区的开头
const META_OFFSET = {
CAPACITY: 0, // 表格当前容量
SIZE: 1, // 当前元素数量
RESIZE_ACTIVE: 2, // 0: 无扩容, 1: 扩容进行中
NEW_TABLE_PTR: 3, // 新表的起始索引 (如果扩容进行中)
RESIZE_INDEX: 4, // 扩容时,下一个要迁移的旧表索引
FUTEX_WAIT_LOCATION: 5, // 用于Futex等待扩容完成的地址
// ... 其他元数据
TOTAL_META_FIELDS: 6, // 元数据字段总数
};
// 每个槽位占用的 Int32 单元数量
const SLOT_SIZE_INT32 = 4; // [STATE, KEY_HASH, KEY, VALUE]
// 负载因子阈值
const LOAD_FACTOR_THRESHOLD = 0.7;
// 一个简单的哈希函数 (用于整数键)
function simpleHash(key, capacity) {
// 确保哈希结果是非负数
return (key % capacity + capacity) % capacity;
}
// 模拟字符串哈希(如果需要)
function stringHash(str, capacity) {
let hash = 0;
for (let i = 0; i < str.length; i++) {
const char = str.charCodeAt(i);
hash = ((hash << 5) - hash) + char; // hash * 31 + char
hash |= 0; // Convert to 32bit integer
}
return (hash % capacity + capacity) % capacity;
}
4.2 哈希表类结构
现在,我们定义 LockFreeHashTable 类。
class LockFreeHashTable {
constructor(initialCapacity = 16, sharedBuffer = null, isNewTable = true) {
if (!Number.isInteger(initialCapacity) || initialCapacity < 1) {
throw new Error("Initial capacity must be a positive integer.");
}
this.initialCapacity = initialCapacity;
this.slotSize = SLOT_SIZE_INT32; // 每个槽位占用的 Int32 单元数
if (isNewTable) {
// 计算需要的 SharedArrayBuffer 大小
// 元数据 + (容量 * 每个槽位的大小)
const bufferSize = (META_OFFSET.TOTAL_META_FIELDS + initialCapacity * this.slotSize) * Int32Array.BYTES_PER_ELEMENT;
this.buffer = new SharedArrayBuffer(bufferSize);
this.data = new Int32Array(this.buffer);
// 初始化元数据
Atomics.store(this.data, META_OFFSET.CAPACITY, initialCapacity);
Atomics.store(this.data, META_OFFSET.SIZE, 0);
Atomics.store(this.data, META_OFFSET.RESIZE_ACTIVE, 0); // 0: not resizing
Atomics.store(this.data, META_OFFSET.RESIZE_INDEX, 0); // For tracking resize progress
Atomics.store(this.data, META_OFFSET.FUTEX_WAIT_LOCATION, 0); // Initial value for futex
// 初始化所有槽位状态为 EMPTY
for (let i = 0; i < initialCapacity; i++) {
const slotOffset = this._getSlotOffset(i);
Atomics.store(this.data, slotOffset, SlotState.EMPTY);
}
} else {
// 从已有的 SharedArrayBuffer 视图创建
if (!sharedBuffer || !(sharedBuffer instanceof SharedArrayBuffer)) {
throw new Error("Must provide a SharedArrayBuffer for existing tables.");
}
this.buffer = sharedBuffer;
this.data = new Int32Array(this.buffer);
this.initialCapacity = Atomics.load(this.data, META_OFFSET.CAPACITY); // Load actual capacity
}
}
_getSlotOffset(index) {
return META_OFFSET.TOTAL_META_FIELDS + index * this.slotSize;
}
_getSlotStateOffset(index) {
return this._getSlotOffset(index);
}
_getSlotKeyHashOffset(index) {
return this._getSlotOffset(index) + 1;
}
_getSlotKeyOffset(index) {
return this._getSlotOffset(index) + 2;
}
_getSlotValueOffset(index) {
return this._getSlotOffset(index) + 3;
}
// 哈希函数(使用我们之前定义的 simpleHash)
_hash(key) {
// 使用一个伪随机种子,或者更复杂的哈希函数
return simpleHash(key, Atomics.load(this.data, META_OFFSET.CAPACITY));
}
// ... 其他方法 (put, get, delete, resize) 将在此处添加
}
4.3 辅助函数:处理扩容
在 put, get, delete 之前,我们需要检查是否正在扩容,并提供帮助。
class LockFreeHashTable {
// ... constructor and helper methods ...
/**
* 帮助迁移旧表中的一些元素到新表。
* 这是一个协作式的扩容机制。
* @returns {boolean} 如果扩容完成,返回 true。
*/
_helpResize() {
const oldTable = this; // 当前 Worker 正在操作的表 (可能已经是旧表)
const oldCapacity = Atomics.load(oldTable.data, META_OFFSET.CAPACITY);
// 如果没有扩容,或者扩容已完成,直接返回
if (Atomics.load(oldTable.data, META_OFFSET.RESIZE_ACTIVE) === 0) {
return true; // No resize in progress or already finished
}
// 获取新表的引用
const newTablePtr = Atomics.load(oldTable.data, META_OFFSET.NEW_TABLE_PTR);
if (newTablePtr === 0) {
// 新表尚未完全初始化,等待或稍后重试
// 这是一个简化,实际可能需要更复杂的等待机制
return false;
}
const newTableBuffer = new SharedArrayBuffer(oldTable.data.buffer.byteLength * 2); // 假设新表是旧表两倍大
// 这里的 newTableBuffer 应该通过某种机制被初始化并存储在 oldTable.data[NEW_TABLE_PTR] 中
// 但为了演示,我们假设 newTablePtr 是一个全局注册的 SharedArrayBuffer 的 ID 或索引,
// 或者我们直接传入一个新表的 LockFreeHashTable 实例。
// 实际应用中,`NEW_TABLE_PTR` 可能存储新表的 `SharedArrayBuffer` 在某个全局注册表中的索引。
// 这里简化为直接引用。
const newTable = new LockFreeHashTable(oldCapacity * 2, this._getNewTableBufferFromPtr(newTablePtr), false);
// 迁移一批元素 (例如,16个)
const BATCH_SIZE = 16;
let elementsMoved = 0;
while (elementsMoved < BATCH_SIZE) {
// 原子地获取并递增 ResizeIndex
const oldIndex = Atomics.fetchAdd(oldTable.data, META_OFFSET.RESIZE_INDEX, 1);
if (oldIndex >= oldCapacity) {
// 所有元素都已处理完毕
// 确保只有最后一个完成的 Worker 标记扩容完成
if (Atomics.compareExchange(oldTable.data, META_OFFSET.RESIZE_ACTIVE, 1, 0) === 1) {
// 扩容完成,通知所有等待的 Worker
Atomics.notify(oldTable.data, META_OFFSET.FUTEX_WAIT_LOCATION, Infinity);
}
return true; // Resize completed
}
const oldSlotStateOffset = oldTable._getSlotStateOffset(oldIndex);
const oldSlotKeyHashOffset = oldTable._getSlotKeyHashOffset(oldIndex);
const oldSlotKeyOffset = oldTable._getSlotKeyOffset(oldIndex);
const oldSlotValueOffset = oldTable._getSlotValueOffset(oldIndex);
const state = Atomics.load(oldTable.data, oldSlotStateOffset);
if (state === SlotState.OCCUPIED) {
// 尝试将旧槽位标记为 RESERVED_FOR_RESIZE
// 确保只有一个 Worker 迁移这个槽位
if (Atomics.compareExchange(oldTable.data, oldSlotStateOffset, SlotState.OCCUPIED, SlotState.RESERVED_FOR_RESIZE) === SlotState.OCCUPIED) {
const keyHash = Atomics.load(oldTable.data, oldSlotKeyHashOffset);
const key = Atomics.load(oldTable.data, oldSlotKeyOffset);
const value = Atomics.load(oldTable.data, oldSlotValueOffset);
// 将元素插入新表 (这里需要调用新表的 put 方法,但要避免递归触发 resize)
// 为了简化,我们直接在这里进行一次性的插入
this._insertIntoNewTable(newTable, keyHash, key, value);
elementsMoved++;
}
} else if (state === SlotState.DELETED) {
// 如果旧槽位是 DELETED,直接跳过,不需要迁移
elementsMoved++;
}
// 如果是 EMPTY 或 RESERVED_FOR_RESIZE,则不做任何事,等待下一个槽位
}
return false; // Not yet completed
}
// 假设有一个机制,根据 NEW_TABLE_PTR 获取实际的 SharedArrayBuffer
// 在实际应用中,这可能是一个全局的 SharedArrayBuffer 注册表
_getNewTableBufferFromPtr(ptr) {
// 这是一个简化示例,实际中需要一个更健壮的机制来管理共享缓冲区的生命周期和引用。
// 例如,一个中央注册表,根据 ptr 返回 SharedArrayBuffer 实例。
// For now, let's just return a dummy SharedArrayBuffer for conceptual clarity.
// In a real system, `ptr` would be an index into an array of SharedArrayBuffers or similar.
// This is a placeholder for demonstration.
// For a robust solution, one might pass the entire new LockFreeHashTable instance
// or its SharedArrayBuffer through `postMessage` to all workers, and store references.
return new SharedArrayBuffer(this.data.buffer.byteLength * 2); // Dummy buffer
}
// 专门用于扩容时将元素插入新表,不进行负载因子检查或递归扩容
_insertIntoNewTable(targetTable, keyHash, key, value) {
const capacity = Atomics.load(targetTable.data, META_OFFSET.CAPACITY);
let index = simpleHash(key, capacity); // 使用原始键重新哈希到新表
// 线性探测
for (let i = 0; i < capacity; i++) {
const currentProbeIndex = (index + i) % capacity;
const slotStateOffset = targetTable._getSlotStateOffset(currentProbeIndex);
const slotKeyHashOffset = targetTable._getSlotKeyHashOffset(currentProbeIndex);
const slotKeyOffset = targetTable._getSlotKeyOffset(currentProbeIndex);
const slotValueOffset = targetTable._getSlotValueOffset(currentProbeIndex);
const currentState = Atomics.load(targetTable.data, slotStateOffset);
if (currentState === SlotState.EMPTY || currentState === SlotState.DELETED) {
// 尝试写入该槽位
if (Atomics.compareExchange(targetTable.data, slotStateOffset, currentState, SlotState.OCCUPIED) === currentState) {
Atomics.store(targetTable.data, slotKeyHashOffset, keyHash);
Atomics.store(targetTable.data, slotKeyOffset, key);
Atomics.store(targetTable.data, slotValueOffset, value);
Atomics.add(targetTable.data, META_OFFSET.SIZE, 1); // 原子递增新表大小
return true;
}
}
// 如果是 OCCUPIED 且哈希值/键不匹配,继续探测
}
// 如果到达这里,说明新表也满了,这通常不应该发生在一个足够大的新表中。
// 在实际系统中,这可能意味着需要立即再次扩容或抛出错误。
return false;
}
// 启动扩容流程
_startResize() {
// 尝试将 RESIZE_ACTIVE 从 0 变为 1
if (Atomics.compareExchange(this.data, META_OFFSET.RESIZE_ACTIVE, 0, 1) === 0) {
// 成功启动扩容
const oldCapacity = Atomics.load(this.data, META_OFFSET.CAPACITY);
const newCapacity = oldCapacity * 2; // 通常是两倍
// 创建一个新的 SharedArrayBuffer 用于新表
const newTableBufferSize = (META_OFFSET.TOTAL_META_FIELDS + newCapacity * this.slotSize) * Int32Array.BYTES_PER_ELEMENT;
const newTableBuffer = new SharedArrayBuffer(newTableBufferSize);
const newTableData = new Int32Array(newTableBuffer);
// 初始化新表的元数据
Atomics.store(newTableData, META_OFFSET.CAPACITY, newCapacity);
Atomics.store(newTableData, META_OFFSET.SIZE, 0);
Atomics.store(newTableData, META_OFFSET.RESIZE_ACTIVE, 0); // 新表本身不处于扩容状态
Atomics.store(newTableData, META_OFFSET.RESIZE_INDEX, 0);
Atomics.store(newTableData, META_OFFSET.FUTEX_WAIT_LOCATION, 0);
// 初始化新表槽位状态为 EMPTY
for (let i = 0; i < newCapacity; i++) {
Atomics.store(newTableData, META_OFFSET.TOTAL_META_FIELDS + i * this.slotSize, SlotState.EMPTY);
}
// 将新表的引用存储在旧表的元数据中
// 实际中可能存储 newTableBuffer 的一个唯一 ID 或索引
// 这里我们简化为直接存储一个整数表示新表已存在
Atomics.store(this.data, META_OFFSET.NEW_TABLE_PTR, 1); // Placeholder for new table reference
// 实际可能需要一个全局注册表来管理 SharedArrayBuffer 实例。
// 这里为了演示,我们假设 newTableBuffer 可以通过某种方式被其他 Worker 访问。
// 真实场景下,LockFreeHashTable 实例本身会被传递给 Workers,或者通过 `postMessage` 传递 `newTableBuffer`。
// 如果新表是完全独立的 SharedArrayBuffer,那么需要一个机制让其他 Worker 知道如何获取它。
// 鉴于 JS 的 Worker 模型,最直接的方式是:
// 1. 主线程创建并管理所有 SharedArrayBuffer,并将其 postMessage 给 Workers。
// 2. Workers 之间通过消息传递 SharedArrayBuffer 的引用。
// 为了简化,我们假设 `this` 始终代表当前的活动表,且新表实例会被正确传递或引用。
// 此时,_helpResize 会开始工作,将元素从旧表迁移到新表。
// 扩容完成后,需要更新“根”哈希表引用,指向新表。
// 这是一个全局协调点,可能由主线程或一个专门的协调 Worker 来完成。
// 假设我们有一个全局的 `currentHashTable` 引用,需要原子更新。
// 例如:
// `globalCurrentHashTable = newTable;` (非原子,需要 CAS)
// 更好的方式是,在旧表末尾放置一个指向新表的指针,并在所有迁移完成后原子更新。
}
}
// 获取当前活动的哈希表实例
// 在扩容过程中,它会返回新表,并可能帮助迁移
_getCurrentTable() {
let currentTable = this;
while (Atomics.load(currentTable.data, META_OFFSET.RESIZE_ACTIVE) === 1) {
// 发现正在扩容,帮助迁移
currentTable._helpResize();
// 如果扩容完成,需要获取新的哈希表实例
// 这是一个复杂之处,因为 `this` 可能仍然是旧表。
// 理想情况下,全局有一个指向当前表的原子指针。
// 让我们简化一下:如果 `RESIZE_ACTIVE` 变为 0,则当前表就是 `this`。
// 否则,我们需要一个方法来获取新表。
// 假设在全局上下文有一个 `_rootHashTable` 变量,并在扩容完成后原子更新。
// 为了本例,我们假设 Worker 总是操作最新的表,或者在发现扩容时,直接等待扩容完成。
// 简化为:如果扩容完成,`this` 就会被替换为新表。
// 否则,Worker 应该等待 `FUTEX_WAIT_LOCATION` 被唤醒,然后重新获取表。
Atomics.wait(currentTable.data, META_OFFSET.FUTEX_WAIT_LOCATION, 1); // 等待扩容完成信号
// 醒来后,重新检查 `RESIZE_ACTIVE`。如果为 0,则 `this` 是正确的。
// 否则,意味着扩容还没完全完成,或者又开始了新的扩容,继续等待或帮助。
// 这种等待是阻塞的,但比忙等待高效。
// 实际上,更优雅的方案是 `_helpResize` 返回新表的引用,或者有一个全局原子指针。
// For now, let's assume if RESIZE_ACTIVE is 0, we are good.
if (Atomics.load(currentTable.data, META_OFFSET.RESIZE_ACTIVE) === 0) {
break; // Resize finished, continue with current table (this)
}
}
return currentTable;
}
}
关于 _getCurrentTable 和 _startResize 的进一步说明:
在实际的无锁哈希表中,_startResize 之后,需要一个全局的原子指针来指向当前的“活动”哈希表。所有的 put/get/delete 操作首先会原子地读取这个全局指针来确定它们应该操作哪个哈希表。当扩容完成时,这个全局指针会被原子地更新为指向新哈希表。这个全局指针本身也需要存储在 SharedArrayBuffer 中。
为了简化本讲座的示例,我们假设 LockFreeHashTable 实例本身就是通过某种方式被 Workers 访问的当前表,并且 _startResize 会在后台启动扩容过程,而 _helpResize 负责将数据迁移。真正的全局原子指针管理会增加代码的复杂性。
4.4 put(key, value) 方法
插入操作需要找到一个空槽位或现有键的槽位,并原子地更新它。
class LockFreeHashTable {
// ... constructor, helper methods, _helpResize, _startResize ...
put(key, value) {
// 在操作之前,检查是否正在扩容,并帮助迁移一些元素
let currentTable = this._getCurrentTable(); // 获取最新的活动表
if (Atomics.load(currentTable.data, META_OFFSET.RESIZE_ACTIVE) === 1) {
// 如果仍然在扩容,等待它完成
Atomics.wait(currentTable.data, META_OFFSET.FUTEX_WAIT_LOCATION, 1);
currentTable = this._getCurrentTable(); // 重新获取表
}
const capacity = Atomics.load(currentTable.data, META_OFFSET.CAPACITY);
const keyHash = simpleHash(key, capacity); // 先计算哈希值,避免重复计算
let index = currentTable._hash(key);
for (let i = 0; i < capacity; i++) {
const currentProbeIndex = (index + i) % capacity;
const slotStateOffset = currentTable._getSlotStateOffset(currentProbeIndex);
const slotKeyHashOffset = currentTable._getSlotKeyHashOffset(currentProbeIndex);
const slotKeyOffset = currentTable._getSlotKeyOffset(currentProbeIndex);
const slotValueOffset = currentTable._getSlotValueOffset(currentProbeIndex);
const currentState = Atomics.load(currentTable.data, slotStateOffset);
if (currentState === SlotState.EMPTY || currentState === SlotState.DELETED) {
// 尝试将 EMPTY 或 DELETED 槽位标记为 OCCUPIED
if (Atomics.compareExchange(currentTable.data, slotStateOffset, currentState, SlotState.OCCUPIED) === currentState) {
// 成功占有槽位,写入数据
Atomics.store(currentTable.data, slotKeyHashOffset, keyHash);
Atomics.store(currentTable.data, slotKeyOffset, key);
Atomics.store(currentTable.data, slotValueOffset, value);
Atomics.add(currentTable.data, META_OFFSET.SIZE, 1); // 原子递增大小
// 检查是否需要扩容
const currentSize = Atomics.load(currentTable.data, META_OFFSET.SIZE);
if (currentSize / capacity > LOAD_FACTOR_THRESHOLD) {
currentTable._startResize();
}
return true;
}
} else if (currentState === SlotState.OCCUPIED) {
// 槽位已被占用,检查是否是相同的键
const existingKeyHash = Atomics.load(currentTable.data, slotKeyHashOffset);
const existingKey = Atomics.load(currentTable.data, slotKeyOffset);
if (existingKeyHash === keyHash && existingKey === key) {
// 找到相同的键,更新值
// 注意:这里需要考虑“写-写”冲突,如果只是简单 `store`,可能覆盖旧值。
// 为了简单起见,我们允许直接覆盖。
// 更严格的实现可能需要版本号或额外的 CAS。
Atomics.store(currentTable.data, slotValueOffset, value);
return true;
}
// 否则,键不匹配,继续探测
}
// 如果遇到 RESERVED_FOR_RESIZE,表示这个槽位正在被迁移,需要等待或跳过
// 实际中,如果 `_getCurrentTable` 设计得当,应该不会直接遇到这个状态,
// 而是在新表上操作。为了健壮性,这里我们可以简单地继续探测。
}
// 如果循环结束仍未找到位置,说明哈希表已满或探测路径太长
// 这通常意味着负载因子过高导致探测失败,需要强制扩容或抛出错误
console.warn("Hash table is full or probe path too long, forcing resize.");
currentTable._startResize();
// 尝试再次插入,或者等待扩容完成并重试
Atomics.wait(currentTable.data, META_OFFSET.FUTEX_WAIT_LOCATION, 1);
return this.put(key, value); // 递归重试
}
}
4.5 get(key) 方法
读取操作需要找到键对应的槽位,并原子地读取值。
class LockFreeHashTable {
// ... all previous methods ...
get(key) {
let currentTable = this._getCurrentTable(); // 获取最新的活动表
if (Atomics.load(currentTable.data, META_OFFSET.RESIZE_ACTIVE) === 1) {
// 如果仍然在扩容,等待它完成
Atomics.wait(currentTable.data, META_OFFSET.FUTEX_WAIT_LOCATION, 1);
currentTable = this._getCurrentTable(); // 重新获取表
}
const capacity = Atomics.load(currentTable.data, META_OFFSET.CAPACITY);
const keyHash = simpleHash(key, capacity);
let index = currentTable._hash(key);
for (let i = 0; i < capacity; i++) {
const currentProbeIndex = (index + i) % capacity;
const slotStateOffset = currentTable._getSlotStateOffset(currentProbeIndex);
const slotKeyHashOffset = currentTable._getSlotKeyHashOffset(currentProbeIndex);
const slotKeyOffset = currentTable._getSlotKeyOffset(currentProbeIndex);
const slotValueOffset = currentTable._getSlotValueOffset(currentProbeIndex);
const currentState = Atomics.load(currentTable.data, slotStateOffset);
if (currentState === SlotState.EMPTY) {
// 遇到空槽位,说明键不存在
return undefined;
} else if (currentState === SlotState.OCCUPIED) {
// 槽位被占用,检查键是否匹配
const existingKeyHash = Atomics.load(currentTable.data, slotKeyHashOffset);
const existingKey = Atomics.load(currentTable.data, slotKeyOffset);
if (existingKeyHash === keyHash && existingKey === key) {
// 找到匹配的键,原子地读取值
return Atomics.load(currentTable.data, slotValueOffset);
}
}
// 如果是 DELETED 或 RESERVED_FOR_RESIZE,继续探测
}
// 遍历所有槽位仍未找到
return undefined;
}
}
4.6 delete(key) 方法
删除操作需要找到键对应的槽位,并原子地将其标记为 DELETED。
class LockFreeHashTable {
// ... all previous methods ...
delete(key) {
let currentTable = this._getCurrentTable(); // 获取最新的活动表
if (Atomics.load(currentTable.data, META_OFFSET.RESIZE_ACTIVE) === 1) {
// 如果仍然在扩容,等待它完成
Atomics.wait(currentTable.data, META_OFFSET.FUTEX_WAIT_LOCATION, 1);
currentTable = this._getCurrentTable(); // 重新获取表
}
const capacity = Atomics.load(currentTable.data, META_OFFSET.CAPACITY);
const keyHash = simpleHash(key, capacity);
let index = currentTable._hash(key);
for (let i = 0; i < capacity; i++) {
const currentProbeIndex = (index + i) % capacity;
const slotStateOffset = currentTable._getSlotStateOffset(currentProbeIndex);
const slotKeyHashOffset = currentTable._getSlotKeyHashOffset(currentProbeIndex);
const slotKeyOffset = currentTable._getSlotKeyOffset(currentProbeIndex);
const currentState = Atomics.load(currentTable.data, slotStateOffset);
if (currentState === SlotState.EMPTY) {
// 遇到空槽位,说明键不存在
return false;
} else if (currentState === SlotState.OCCUPIED) {
// 槽位被占用,检查键是否匹配
const existingKeyHash = Atomics.load(currentTable.data, slotKeyHashOffset);
const existingKey = Atomics.load(currentTable.data, slotKeyOffset);
if (existingKeyHash === keyHash && existingKey === key) {
// 找到匹配的键,尝试将其状态从 OCCUPIED 变为 DELETED
if (Atomics.compareExchange(currentTable.data, slotStateOffset, SlotState.OCCUPIED, SlotState.DELETED) === SlotState.OCCUPIED) {
Atomics.sub(currentTable.data, META_OFFSET.SIZE, 1); // 原子递减大小
return true;
}
// 如果 CAS 失败,说明其他 Worker 已经修改了这个槽位
// 例如,可能已经将其删除,或者在扩容过程中被标记为 RESERVED_FOR_RESIZE
// 在这种情况下,我们可以认为删除操作已经完成(或被其他操作覆盖),或者需要重试
// 为了简化,我们直接返回 true,假设最终状态是 DELETED
return true;
}
}
// 如果是 DELETED 或 RESERVED_FOR_RESIZE,继续探测
}
// 遍历所有槽位仍未找到
return false;
}
}
5. 如何在 Workers 中使用
现在我们有了 LockFreeHashTable 类,如何在 Web Workers 中使用它呢?
main.js (主线程)
// main.js
import { LockFreeHashTable } from './shared-hash-table.js';
const initialCapacity = 16;
const hashTable = new LockFreeHashTable(initialCapacity);
console.log('Main thread created hash table with buffer:', hashTable.buffer);
// 创建多个 Worker
const workerCount = 4;
const workers = [];
for (let i = 0; i < workerCount; i++) {
const worker = new Worker('./worker.js', { type: 'module' });
workers.push(worker);
// 将 SharedArrayBuffer 传递给 Worker
// 注意:SharedArrayBuffer 是通过引用传递的,而不是复制
worker.postMessage({ type: 'init', buffer: hashTable.buffer });
worker.onmessage = (event) => {
console.log(`Worker ${i} message:`, event.data);
};
}
// 主线程也可以执行操作
hashTable.put(100, 1000);
console.log('Main thread: put(100, 1000)');
console.log('Main thread: get(100) ->', hashTable.get(100));
// 等待一段时间,让 Workers 执行操作
setTimeout(() => {
console.log('nFinal state of hash table from main thread:');
for (let i = 0; i < 20; i++) { // 检查一些键
const value = hashTable.get(i);
if (value !== undefined) {
console.log(`get(${i}) -> ${value}`);
}
}
console.log('Final size:', Atomics.load(hashTable.data, META_OFFSET.SIZE));
}, 2000); // 2秒后检查
worker.js (Web Worker)
// worker.js
import { LockFreeHashTable, SlotState, META_OFFSET } from './shared-hash-table.js';
let hashTableInstance;
let workerId;
self.onmessage = (event) => {
const { type, buffer, id } = event.data;
if (type === 'init') {
workerId = id; // 假设传递 Worker ID
hashTableInstance = new LockFreeHashTable(0, buffer, false); // 从现有 buffer 创建
console.log(`Worker ${workerId} initialized with shared buffer.`);
// Worker 开始执行并发操作
performConcurrentOperations();
}
};
function performConcurrentOperations() {
const startKey = workerId * 100; // 每个 Worker 操作不同的键范围
const endKey = startKey + 50;
for (let i = startKey; i < endKey; i++) {
// 模拟一些并发操作
if (i % 3 === 0) {
hashTableInstance.put(i, i * 10 + workerId);
// self.postMessage(`Worker ${workerId}: put(${i}, ${i * 10 + workerId})`);
} else if (i % 5 === 0) {
hashTableInstance.delete(i - 1); // 尝试删除前一个键
// self.postMessage(`Worker ${workerId}: delete(${i - 1})`);
} else {
const value = hashTableInstance.get(i);
// if (value !== undefined) {
// self.postMessage(`Worker ${workerId}: get(${i}) -> ${value}`);
// }
}
}
self.postMessage(`Worker ${workerId} finished operations.`);
}
请注意,为了让 worker.js 能够导入 LockFreeHashTable,你需要确保 shared-hash-table.js 是一个 ES 模块。在 worker.js 的 new Worker('./worker.js', { type: 'module' }); 中,type: 'module' 是关键。
6. 挑战与考量
虽然我们实现了一个基本的无锁并发哈希表,但无锁编程的复杂性远不止于此。
- ABA 问题:
compareExchange操作只能保证值在操作瞬间没有改变,但不能保证它没有从 A 变为 B 再变回 A。对于我们当前的整数键值对,这通常不是问题。但在更复杂的场景(例如,使用指针或索引来链接数据结构时),ABA 问题可能导致逻辑错误。解决办法通常是使用版本号(tagging)或双字 CAS (DCAS) 来确保状态的唯一性。 - 内存回收: 我们的哈希表只是将槽位标记为
DELETED。这些槽位不会被真正回收,导致空间浪费。在 C/C++ 等语言中,无锁内存回收是一个非常复杂的问题(如 RCU, Hazard Pointers)。在 JavaScript 中,由于有垃圾回收器,我们通常不需要手动管理SharedArrayBuffer中已删除槽位的字节,但如果内部数据结构是复杂对象引用,那么如何安全地回收这些对象的空间仍需仔细设计。 - 字符串和其他复杂类型: 我们的示例只处理了整数键和值。存储字符串需要额外的序列化/反序列化逻辑,例如将字符串编码为 UTF-8 字节数组,并将其存储在
SharedArrayBuffer的单独区域,然后在槽位中存储其起始偏移量和长度。这将显著增加实现复杂性。 - 死锁 vs. 活锁/饥饿: 无锁编程消除了死锁,但可能引入活锁(多个 Worker 不断重试但都无法取得进展)或饥饿(某个 Worker 总是输掉竞争)。良好的设计可以最小化这些风险。我们的扩容机制通过
_helpResize协作式地完成任务,减少了单个 Worker 饥饿的可能性。Atomics.wait/notify缓解了忙等待带来的 CPU 浪费,但如果notify机制设计不当,仍然可能导致某些 Worker 长期等待。 - 性能考量:
Atomics操作通常比普通的内存访问开销更大,因为它们需要硬件级别的同步。在高竞争环境下,无锁数据结构通常表现优异;但在低竞争或单线程场景下,其性能可能不如简单的有锁数据结构甚至非并发数据结构。 - 错误处理和调试: 无锁并发代码的调试极其困难,因为错误可能非确定性地出现。仔细的单元测试、并发测试和日志记录至关重要。
7. 总结
本讲座深入探讨了在 JavaScript 中利用 SharedArrayBuffer 和 Atomics 原语构建无锁并发哈希表的方法。我们了解了 SharedArrayBuffer 如何提供共享内存,Atomics 如何保证操作的原子性以避免数据竞争,以及 Futex (Atomics.wait 和 Atomics.notify) 如何实现高效的线程间同步。通过一个协作式扩容的线性探测哈希表示例,我们展示了如何在实践中应用这些概念。
无锁编程是现代并发系统中的一项强大技术,它能有效提高多线程应用的性能和响应能力,并消除死锁风险。然而,其实现复杂性和调试难度也要求开发者具备深入的并发编程知识和严谨的设计思维。希望本次讲座能为您开启无锁并发编程的大门,激发您在 JavaScript 中探索更高效并发解决方案的兴趣。