Node.js Worker Threads:实现 Node.js 应用的真正多线程

好的,各位听众,观众,以及屏幕前的各位码农朋友们,欢迎来到今天的“Node.js Worker Threads:让你的 Node.js 应用飞起来!”技术讲座现场。我是你们的老朋友,江湖人称“代码诗人”的程序猿老王。

今天咱们不聊诗和远方,就聊聊眼前苟且——啊,不对,是聊聊如何用 Node.js Worker Threads 把你那单线程堵得死死的 Node.js 应用,变成多线程的火箭🚀,嗖嗖嗖地飞起来!

第一幕:单线程的无奈,我们都懂

Node.js 以其非阻塞 I/O 和事件循环机制,在处理高并发 I/O 密集型任务时表现出色。但它有个致命的弱点:单线程

想象一下,你是一家餐厅的老板,只有一位厨师(Node.js 主线程)。客人点餐(请求)如潮水般涌来,厨师忙得焦头烂额。如果客人点的都是炒青菜(I/O 密集型),厨师还能勉强应付,毕竟炒菜很快。但如果客人点了红烧肉(CPU 密集型),厨师就得花费大量时间炖肉,其他客人就只能干等着,甚至怒而离席(应用响应慢,甚至崩溃)。

这就是单线程的弊端。当 Node.js 主线程被 CPU 密集型任务(比如复杂的计算、图像处理、加密解密等)阻塞时,整个应用就卡住了,无法响应其他请求。

// 一个典型的 CPU 密集型任务
function fibonacci(n) {
  if (n <= 1) {
    return n;
  }
  return fibonacci(n - 1) + fibonacci(n - 2);
}

// 阻塞主线程的调用
console.log("开始计算斐波那契数列...");
const result = fibonacci(40); // 计算斐波那契数列的第40项,非常耗时
console.log("计算完成,结果是:", result);
console.log("后续代码继续执行...");

运行这段代码,你会发现,“后续代码继续执行…”要等很久才能打印出来,这就是主线程被阻塞的铁证!

第二幕:Worker Threads 登场,拯救世界!

为了解决单线程的困境,Node.js 引入了 Worker Threads 模块。它允许你创建多个线程,并将 CPU 密集型任务分配给这些线程并行执行,从而释放主线程的压力,提高应用的整体性能。

Worker Threads 就像餐厅里新招的几个厨师,他们可以同时处理不同的红烧肉订单,而主厨(主线程)则可以继续处理其他炒青菜订单,或者负责协调整个餐厅的运作。

Worker Threads 的核心思想:

  • 隔离性: 每个 Worker Thread 都有自己的 JavaScript 虚拟机(V8 引擎实例),拥有独立的内存空间和事件循环。它们之间通过消息传递进行通信,避免了共享内存带来的线程安全问题。
  • 并行性: Worker Threads 可以在不同的 CPU 核心上并行执行,充分利用多核 CPU 的性能。
  • 易用性: Node.js 提供了简洁易用的 API,方便你创建和管理 Worker Threads。

如何使用 Worker Threads?

  1. 引入 worker_threads 模块:

    const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
  2. 判断是否为主线程:

    isMainThread 属性用于判断当前代码是否运行在主线程中。如果是主线程,则可以创建 Worker Thread;如果是 Worker Thread,则执行相应的任务。

  3. 创建 Worker Thread:

    使用 new Worker(filename[, options]) 创建一个新的 Worker Thread。filename 是 Worker Thread 执行的 JavaScript 文件路径,options 可以设置 Worker Thread 的一些属性,比如 workerData(传递给 Worker Thread 的数据)。

  4. 主线程和 Worker Thread 之间的通信:

    • 主线程 -> Worker Thread: 使用 worker.postMessage(message) 向 Worker Thread 发送消息。
    • Worker Thread -> 主线程: 使用 parentPort.postMessage(message) 向主线程发送消息。
    • 主线程监听 Worker Thread 的消息: 使用 worker.on('message', (message) => { ... }) 监听 Worker Thread 发送的消息。
    • Worker Thread 监听主线程的消息: 使用 parentPort.on('message', (message) => { ... }) 监听主线程发送的消息。
  5. 处理 Worker Thread 的事件:

    • worker.on('error', (err) => { ... }):监听 Worker Thread 抛出的错误。
    • worker.on('exit', (code) => { ... }):监听 Worker Thread 退出事件,code 是退出码。

第三幕:实战演练,代码说话

让我们用 Worker Threads 来改造一下上面的斐波那契数列计算示例,看看效果如何。

1. 创建 worker.js 文件 (Worker Thread 代码):

const { parentPort, workerData } = require('worker_threads');

function fibonacci(n) {
  if (n <= 1) {
    return n;
  }
  return fibonacci(n - 1) + fibonacci(n - 2);
}

const { number } = workerData;
console.log(`Worker Thread 开始计算斐波那契数列 ${number} ...`);
const result = fibonacci(number);
console.log(`Worker Thread 计算完成,结果是:${result}`);
parentPort.postMessage(result); // 将结果发送给主线程

2. 修改主线程代码 (主文件,比如 index.js):

const { Worker, isMainThread } = require('worker_threads');

if (isMainThread) {
  console.log("主线程开始执行...");

  const worker = new Worker('./worker.js', { workerData: { number: 40 } }); // 创建 Worker Thread,传递数据

  worker.on('message', (result) => {
    console.log("主线程接收到 Worker Thread 的结果:", result);
    console.log("后续代码继续执行...");
  });

  worker.on('error', (err) => {
    console.error("Worker Thread 发生错误:", err);
  });

  worker.on('exit', (code) => {
    console.log("Worker Thread 退出,退出码:", code);
  });

  console.log("主线程继续执行其他任务..."); // 主线程不会被阻塞
} else {
  // 这部分代码永远不会执行,因为已经通过 `worker.js` 定义了 Worker Thread 的逻辑
}

运行 index.js,你会发现,“主线程继续执行其他任务…”会立即打印出来,而斐波那契数列的计算是在 Worker Thread 中进行的,不会阻塞主线程。

第四幕:Worker Threads 的优势与劣势,理性分析

优势:

  • 解决 CPU 密集型任务阻塞主线程的问题: 这是 Worker Threads 最核心的优势。
  • 提高应用性能: 通过并行执行任务,充分利用多核 CPU 的性能,提高应用的整体吞吐量和响应速度。
  • 增强应用稳定性: 即使某个 Worker Thread 崩溃,也不会影响主线程和其他 Worker Thread 的运行。

劣势:

  • 增加了代码的复杂性: 需要编写额外的 Worker Thread 代码,并处理线程之间的通信。
  • 增加了内存消耗: 每个 Worker Thread 都有自己的内存空间,会增加应用的内存消耗。
  • 通信开销: 线程之间的通信需要进行数据序列化和反序列化,有一定的开销。
  • 调试难度增加: 多线程程序的调试比单线程程序更加困难。

何时使用 Worker Threads?

  • 存在 CPU 密集型任务,且这些任务会阻塞主线程。
  • 对应用性能有较高要求,需要充分利用多核 CPU 的性能。
  • 可以接受一定的代码复杂性和内存消耗。

不适用场景:

  • 应用主要处理 I/O 密集型任务,不需要进行大量的 CPU 计算。
  • 代码逻辑非常简单,不需要进行并行处理。
  • 对代码的简洁性要求极高,不希望引入额外的线程管理代码。

第五幕:进阶技巧,玩转 Worker Threads

  1. 线程池: 为了避免频繁创建和销毁 Worker Threads 带来的开销,可以使用线程池来管理 Worker Threads。线程池维护一个 Worker Threads 集合,当需要执行任务时,从线程池中获取一个空闲的 Worker Thread,任务完成后,将 Worker Thread 返回到线程池中。

    可以使用第三方库,比如 piscina 来实现线程池。

    const Piscina = require('piscina');
    
    const piscina = new Piscina({
      filename: './worker.js',
      minThreads: 4, // 最小线程数
      maxThreads: 8  // 最大线程数
    });
    
    // 调用 Worker Thread 的函数
    piscina.run({ number: 40 })
      .then((result) => {
        console.log("主线程接收到线程池的结果:", result);
      });
  2. 共享 ArrayBuffer: 虽然 Worker Threads 之间不能直接共享内存,但可以通过共享 ArrayBuffer 来实现数据共享。ArrayBuffer 是一种用于表示原始二进制数据的类型化数组,可以在主线程和 Worker Threads 之间传递。

    // 主线程
    const { Worker, isMainThread } = require('worker_threads');
    const { Int32Array } = global; // Node.js v16+
    
    if (isMainThread) {
      const sharedArrayBuffer = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT * 10); // 创建共享的 ArrayBuffer
      const sharedArray = new Int32Array(sharedArrayBuffer); // 创建 Int32Array 视图
    
      const worker = new Worker('./worker.js', { workerData: sharedArrayBuffer }); // 传递 ArrayBuffer
    
      worker.on('message', (message) => {
        console.log("主线程接收到 Worker Thread 的消息:", message);
        console.log("主线程共享数组的值:", sharedArray); // 可以看到 Worker Thread 修改了共享数组
      });
    
      sharedArray[0] = 10; // 主线程修改共享数组的值
    
    }
    
    // worker.js
    const { workerData } = require('worker_threads');
    const { Int32Array } = global;
    
    const sharedArrayBuffer = workerData;
    const sharedArray = new Int32Array(sharedArrayBuffer);
    
    console.log("Worker Thread 接收到共享数组的值:", sharedArray);
    sharedArray[1] = 20; // Worker Thread 修改共享数组的值
    parentPort.postMessage("Worker Thread 完成修改");
  3. Atomics API: 当多个 Worker Threads 同时访问和修改共享 ArrayBuffer 时,可能会出现数据竞争问题。Atomics API 提供了一组原子操作,可以保证对共享数据的操作是原子性的,避免数据竞争。

    // 假设 sharedArrayBuffer 和 sharedArray 已经创建
    // 使用 Atomics.add 原子性地增加 sharedArray[0] 的值
    Atomics.add(sharedArray, 0, 5);

第六幕:最佳实践,避免踩坑

  • 尽量减少线程之间的通信: 线程之间的通信会带来额外的开销,尽量将需要通信的数据量控制在最小范围内。
  • 避免在 Worker Threads 中执行 I/O 操作: Worker Threads 主要用于处理 CPU 密集型任务,避免在 Worker Threads 中执行 I/O 操作,以免阻塞 Worker Threads 的执行。
  • 合理设置 Worker Threads 的数量: Worker Threads 的数量并非越多越好,需要根据 CPU 核心数和任务的特性进行合理的设置。一般来说,Worker Threads 的数量等于 CPU 核心数或略大于 CPU 核心数即可。
  • 进行充分的测试: 多线程程序的测试比单线程程序更加复杂,需要进行充分的测试,确保程序的正确性和稳定性。

第七幕:总结与展望

Worker Threads 是 Node.js 中一个强大的多线程解决方案,可以帮助你解决 CPU 密集型任务阻塞主线程的问题,提高应用的性能和稳定性。但同时,它也增加了代码的复杂性,需要谨慎使用。

希望今天的讲座能够帮助你更好地理解和使用 Worker Threads,让你的 Node.js 应用飞起来!

最后,用一句代码界的至理名言来结束今天的讲座:

"Talk is cheap. Show me the code." (少说多做,代码为王!)

感谢各位的聆听,下次再见! 🍻

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注