JS Node.js `Worker Threads` (Node 10+):多线程 CPU 密集型任务

各位观众,大家好!今天咱们来聊聊Node.js的Worker Threads,这玩意儿就像给你的Node.js程序装了个涡轮增压,专门解决CPU密集型任务,让你的服务器不再卡成PPT。

一、Node.js的单线程困境

Node.js以其非阻塞I/O和事件循环而闻名,这使得它在处理高并发I/O密集型任务时表现出色。但是,当遇到需要大量CPU运算的任务(比如图像处理、密码破解、大数据分析)时,单线程的Node.js就会被阻塞,导致整个应用程序的响应速度下降,就像高速公路上突然出现了一个堵车点,后面的车全得跟着遭殃。

想象一下,你在做一个在线图像编辑器,用户上传一张图片,你需要对图片进行各种复杂的滤镜处理。如果这些处理都在主线程中进行,那么用户在等待处理结果的时候,整个网站都会卡顿,用户体验瞬间跌落谷底。

二、Worker Threads:拯救Node.js的英雄

Worker Threads就像是Node.js的救星,它允许你创建多个线程,将CPU密集型任务分配给这些线程并行执行,从而避免阻塞主线程。这样,即使有复杂的计算任务,你的应用程序也能保持流畅的响应速度。

简单来说,Worker Threads就是给Node.js开了个分身术,让它能同时处理多个任务,不再是一个人单打独斗。

三、Worker Threads的基本用法

  1. 引入worker_threads模块

    首先,你需要引入Node.js的worker_threads模块,这个模块提供了创建和管理Worker Threads所需的API。

    const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
    • Worker: 用于创建新的Worker线程。
    • isMainThread: 用于判断当前代码是否运行在主线程中。
    • parentPort: 用于在Worker线程中与主线程进行通信。
    • workerData: 用于在主线程中向Worker线程传递数据。
  2. 判断当前线程是否为主线程

    使用isMainThread可以判断当前代码是否运行在主线程中。如果是主线程,则创建Worker线程;如果是Worker线程,则执行具体的任务。

    if (isMainThread) {
      // 主线程
      console.log('This is the main thread.');
    } else {
      // Worker线程
      console.log('This is a worker thread.');
    }
  3. 创建Worker线程

    在主线程中,使用new Worker()创建一个新的Worker线程。new Worker()的参数是一个JavaScript文件,这个文件包含了Worker线程需要执行的代码。

    if (isMainThread) {
      const worker = new Worker('./worker.js', { workerData: { value: 10 } }); // 传递数据给worker线程
    }
    • './worker.js': Worker线程执行的JavaScript文件路径。
    • workerData: 一个可选的配置对象,用于向Worker线程传递数据。这里我们传递了一个包含value属性的对象。
  4. 在Worker线程中执行任务

    在Worker线程中,你可以访问workerData获取主线程传递的数据,并执行具体的任务。

    if (!isMainThread) {
      const { value } = workerData;
      console.log(`Worker thread received data: ${value}`);
      // 执行一些耗时的计算
      let result = 0;
      for (let i = 0; i < value * 10000000; i++) {
          result += i;
      }
      parentPort.postMessage(result); // 将结果发送回主线程
    }
    • workerData: 包含了主线程传递的数据。
    • parentPort.postMessage(): 用于将数据发送回主线程。
  5. 主线程接收Worker线程的结果

    在主线程中,你可以监听Worker线程的message事件,接收Worker线程返回的结果。

    if (isMainThread) {
      const worker = new Worker('./worker.js', { workerData: { value: 10 } });
    
      worker.on('message', (result) => {
        console.log(`Main thread received result: ${result}`);
      });
    
      worker.on('error', (err) => {
          console.error(`Worker thread encountered an error: ${err}`);
      });
    
      worker.on('exit', (code) => {
          console.log(`Worker thread exited with code: ${code}`);
      });
    }
    • worker.on('message'): 监听message事件,接收Worker线程发送的数据。
    • worker.on('error'): 监听error事件,处理Worker线程的错误。
    • worker.on('exit'): 监听exit事件,Worker线程退出时的处理。

四、一个完整的例子:计算斐波那契数列

咱们来写一个完整的例子,用Worker Threads来计算斐波那契数列。这是一个经典的CPU密集型任务。

主线程 (index.js):

const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

if (isMainThread) {
  const num = 40; // 计算第40个斐波那契数
  console.log(`Starting Fibonacci calculation for ${num} in the main thread...`);

  const startTime = Date.now();
  const worker = new Worker('./worker.js', { workerData: { n: num } });

  worker.on('message', (result) => {
    const endTime = Date.now();
    console.log(`Fibonacci(${num}) = ${result}`);
    console.log(`Calculation took ${endTime - startTime}ms`);
  });

  worker.on('error', (err) => {
    console.error(`Worker thread encountered an error: ${err}`);
  });

  worker.on('exit', (code) => {
    console.log(`Worker thread exited with code: ${code}`);
  });

  console.log("Main thread continues executing other tasks..."); // 主线程继续执行其他任务
} else {
  // This is intentionally left empty. The worker logic is in worker.js
}

Worker线程 (worker.js):

const { parentPort, workerData } = require('worker_threads');

function fibonacci(n) {
  if (n <= 1) {
    return n;
  }
  return fibonacci(n - 1) + fibonacci(n - 2);
}

const { n } = workerData;
const result = fibonacci(n);

parentPort.postMessage(result);

运行 node index.js,你会看到主线程创建了一个Worker线程来计算斐波那契数列,而主线程并没有被阻塞,仍然可以执行其他任务。

五、Worker Threads的注意事项

  1. 数据共享: Worker Threads之间不共享内存,它们通过消息传递进行通信。这意味着你需要序列化和反序列化数据,这可能会带来性能开销。不过,Node.js提供了一些高效的序列化方法,比如Buffer

  2. 线程安全: 你需要注意线程安全问题。避免多个线程同时修改共享的数据,否则可能会导致数据竞争和死锁。可以使用锁或其他同步机制来保护共享数据。当然,最好的方法是尽量避免共享数据。

  3. 资源消耗: 创建和管理线程会消耗一定的系统资源。不要创建过多的线程,否则可能会导致系统性能下降。通常,线程的数量应该与CPU核心的数量相匹配。

  4. 错误处理: 你需要妥善处理Worker线程中可能出现的错误。可以使用worker.on('error')监听error事件,捕获Worker线程的错误。

  5. 调试: 调试Worker Threads可能会比较困难。可以使用Node.js的调试器,或者使用console.log进行调试。

六、Worker Threads与cluster模块的区别

Node.js还提供了一个cluster模块,也可以用于创建多个进程来处理并发请求。那么,Worker Threads和cluster模块有什么区别呢?

特性 Worker Threads cluster模块
进程/线程 多线程 多进程
内存共享 不共享(需要消息传递) 不共享(每个进程有独立的内存空间)
资源消耗 相对较低 相对较高
适用场景 CPU密集型任务,需要细粒度的并发控制 I/O密集型任务,需要负载均衡
复杂性 相对较高 相对较低
通信方式 postMessage IPC (进程间通信)

简单来说,Worker Threads适用于CPU密集型任务,而cluster模块适用于I/O密集型任务。Worker Threads的资源消耗更低,但编程模型更复杂。cluster模块的编程模型更简单,但资源消耗更高。

七、更高级的用法:使用Transferable对象

为了进一步提高Worker Threads的性能,Node.js还提供了Transferable对象。Transferable对象允许你将内存的所有权从一个线程转移到另一个线程,而无需复制数据。这对于处理大型数据非常有用。

比如,你可以使用Transferable对象来传递ArrayBufferMessagePort

// 主线程
const { Worker, isMainThread } = require('worker_threads');

if (isMainThread) {
  const buffer = new Uint8Array(1024 * 1024 * 100); // 100MB
  const worker = new Worker('./worker.js');

  worker.postMessage(buffer.buffer, [buffer.buffer]); // 传递ArrayBuffer的所有权
}

// Worker线程
const { parentPort } = require('worker_threads');

parentPort.on('message', (buffer) => {
  console.log('Worker thread received ArrayBuffer');
  // 现在Worker线程拥有buffer的所有权
  // ...
});

在这个例子中,主线程将buffer.buffer的所有权传递给了Worker线程,而无需复制数据。这样可以避免大量的内存拷贝,从而提高性能。

八、总结

Worker Threads是Node.js中一个非常强大的工具,可以让你充分利用多核CPU的优势,提高应用程序的性能。但是,使用Worker Threads也需要注意一些问题,比如线程安全、资源消耗和错误处理。希望今天的讲解能够帮助你更好地理解和使用Worker Threads。

记住,选择合适的工具取决于你的具体需求。如果你的应用程序需要处理大量的CPU密集型任务,那么Worker Threads绝对值得你尝试。如果你的应用程序主要是I/O密集型任务,那么cluster模块可能更适合你。

好了,今天的讲座就到这里。 感谢各位的收看,希望下次有机会再和大家分享更多的技术知识!

代码示例汇总:

index.js (主线程)

const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

if (isMainThread) {
  const num = 40; // 计算第40个斐波那契数
  console.log(`Starting Fibonacci calculation for ${num} in the main thread...`);

  const startTime = Date.now();
  const worker = new Worker('./worker.js', { workerData: { n: num } });

  worker.on('message', (result) => {
    const endTime = Date.now();
    console.log(`Fibonacci(${num}) = ${result}`);
    console.log(`Calculation took ${endTime - startTime}ms`);
  });

  worker.on('error', (err) => {
    console.error(`Worker thread encountered an error: ${err}`);
  });

  worker.on('exit', (code) => {
    console.log(`Worker thread exited with code: ${code}`);
  });

  console.log("Main thread continues executing other tasks..."); // 主线程继续执行其他任务
} else {
  // This is intentionally left empty. The worker logic is in worker.js
}

worker.js (Worker线程)

const { parentPort, workerData } = require('worker_threads');

function fibonacci(n) {
  if (n <= 1) {
    return n;
  }
  return fibonacci(n - 1) + fibonacci(n - 2);
}

const { n } = workerData;
const result = fibonacci(n);

parentPort.postMessage(result);

ArrayBuffer 示例

// 主线程
const { Worker, isMainThread } = require('worker_threads');

if (isMainThread) {
  const buffer = new Uint8Array(1024 * 1024 * 100); // 100MB
  const worker = new Worker('./worker.js');

  worker.postMessage(buffer.buffer, [buffer.buffer]); // 传递ArrayBuffer的所有权
}

// Worker线程
const { parentPort } = require('worker_threads');

parentPort.on('message', (buffer) => {
  console.log('Worker thread received ArrayBuffer');
  // 现在Worker线程拥有buffer的所有权
  // ...
});

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注