JS `Worker` 线程池的实现与管理:提升并发任务处理效率

各位观众老爷们,大家好!今天咱们来聊聊 JavaScript 里一个挺有意思的东西:Worker 线程池。别害怕“线程池”这个词,听起来好像很高大上,其实没那么复杂。咱们的目标是让 JS 在浏览器里也能像后端语言一样,并发处理任务,提升效率。

为什么我们需要 Worker 线程池?

首先,我们要明白 JS 是一门单线程语言。这意味着它一次只能执行一个任务。如果你让 JS 做一些耗时的操作,比如复杂的计算、图像处理、网络请求等等,浏览器就会卡住,用户体验会很糟糕。

这时候,Worker 就派上用场了。Worker 可以在后台线程中运行 JS 代码,不会阻塞主线程。但问题是,如果需要同时处理大量的任务,频繁地创建和销毁 Worker 也是很耗资源的。

所以,我们需要一个 Worker 线程池。线程池可以预先创建一些 Worker,然后将任务分配给这些 Worker 去执行。任务执行完毕后,Worker 不会被销毁,而是等待下一个任务。这样可以避免频繁创建和销毁 Worker 的开销,提高并发任务处理效率。

Worker 线程池的核心概念

在深入代码之前,我们需要了解几个核心概念:

  • Worker: 独立于主线程运行的 JS 脚本。
  • 任务队列: 存放待执行任务的队列。
  • 线程池大小: 线程池中 Worker 的数量。
  • 任务分配: 将任务从任务队列分配给空闲 Worker。
  • 消息传递: 主线程和 Worker 之间通过 postMessage 方法进行通信。

Worker 线程池的实现

接下来,我们就来一步一步地实现一个简单的 Worker 线程池。

1. Worker 代码 (worker.js)

首先,我们需要一个 Worker 的脚本,它负责接收任务、执行任务,然后将结果返回给主线程。

// worker.js
self.onmessage = function(event) {
  const data = event.data;
  const taskId = data.taskId;
  const taskData = data.taskData;

  // 模拟耗时操作
  const result = longRunningTask(taskData);

  self.postMessage({
    taskId: taskId,
    result: result
  });
};

function longRunningTask(data) {
  // 模拟耗时计算
  let sum = 0;
  for (let i = 0; i < data; i++) {
    sum += i;
  }
  return sum;
}
  • self.onmessage: 监听主线程发来的消息。
  • event.data: 包含主线程发来的数据。
  • taskId: 任务 ID,用于标识任务。
  • taskData: 任务数据,Worker 需要处理的数据。
  • longRunningTask: 模拟一个耗时的任务。这里是一个简单的求和操作,你可以根据实际需求替换成更复杂的任务。
  • self.postMessage: 将结果发送回主线程。

2. 线程池类 (ThreadPool.js)

接下来,我们创建一个 ThreadPool 类,用于管理 Worker 线程池。

// ThreadPool.js
class ThreadPool {
  constructor(size) {
    this.size = size; // 线程池大小
    this.workers = []; // Worker 数组
    this.taskQueue = []; // 任务队列
    this.idleWorkers = []; // 空闲 Worker 队列
    this.taskIdCounter = 0; // 任务 ID 计数器
    this.taskPromises = {}; // 存储任务 promise 的对象

    this.initialize();
  }

  initialize() {
    for (let i = 0; i < this.size; i++) {
      const worker = new Worker('worker.js');
      this.workers.push(worker);
      this.idleWorkers.push(worker);

      worker.onmessage = (event) => {
        const data = event.data;
        const taskId = data.taskId;
        const result = data.result;

        // Resolve 对应的 promise
        this.taskPromises[taskId].resolve(result);
        delete this.taskPromises[taskId];

        // 将 Worker 放回空闲队列
        this.idleWorkers.push(worker);
        this.processTaskQueue();
      };

      worker.onerror = (error) => {
        console.error('Worker error:', error);
        const taskId = error.message.match(/Task ID: (d+)/)?.[1]; // 尝试从错误消息中提取 Task ID
        if (taskId && this.taskPromises[taskId]) {
           this.taskPromises[taskId].reject(new Error(`Worker 执行失败,任务 ID: ${taskId}`));
           delete this.taskPromises[taskId];
        }
        // 移除错误的worker, 并新建一个
        const index = this.workers.indexOf(worker);
        if(index > -1){
            this.workers.splice(index,1);
        }
        const idleIndex = this.idleWorkers.indexOf(worker);
        if(idleIndex > -1){
            this.idleWorkers.splice(idleIndex, 1);
        }

        const newWorker = new Worker('worker.js');
        this.workers.push(newWorker);
        this.idleWorkers.push(newWorker);

        newWorker.onmessage = (event) => {
             const data = event.data;
             const taskId = data.taskId;
             const result = data.result;

              // Resolve 对应的 promise
              this.taskPromises[taskId].resolve(result);
              delete this.taskPromises[taskId];

              // 将 Worker 放回空闲队列
              this.idleWorkers.push(newWorker);
              this.processTaskQueue();

        };

        newWorker.onerror = (error) => {
             console.error('New Worker error:', error);
             const taskId = error.message.match(/Task ID: (d+)/)?.[1]; // 尝试从错误消息中提取 Task ID
             if (taskId && this.taskPromises[taskId]) {
                this.taskPromises[taskId].reject(new Error(`New Worker 执行失败,任务 ID: ${taskId}`));
                delete this.taskPromises[taskId];
             }
             // 移除错误的worker, 并新建一个
             const index = this.workers.indexOf(newWorker);
             if(index > -1){
                 this.workers.splice(index,1);
             }
             const idleIndex = this.idleWorkers.indexOf(newWorker);
             if(idleIndex > -1){
                 this.idleWorkers.splice(idleIndex, 1);
             }

        };
        this.processTaskQueue();

      };
    }
  }

  submitTask(taskData) {
    return new Promise((resolve, reject) => {
      const taskId = this.taskIdCounter++;
      this.taskPromises[taskId] = { resolve, reject };
      this.taskQueue.push({ taskId, taskData });
      this.processTaskQueue();
    });
  }

  processTaskQueue() {
    if (this.taskQueue.length > 0 && this.idleWorkers.length > 0) {
      const worker = this.idleWorkers.shift();
      const task = this.taskQueue.shift();
      const taskId = task.taskId;
      const taskData = task.taskData;

      worker.postMessage({ taskId: taskId, taskData: taskData });
    }
  }

  terminate() {
    this.workers.forEach(worker => worker.terminate());
    this.workers = [];
    this.idleWorkers = [];
    this.taskQueue = [];
    this.taskPromises = {};
  }
}
  • constructor(size): 构造函数,接收线程池大小作为参数。
    • size: 线程池中 Worker 的数量。
    • workers: 存储 Worker 对象的数组。
    • taskQueue: 存储待执行任务的队列。
    • idleWorkers: 存储空闲 Worker 对象的队列。
    • taskIdCounter: 任务 ID 计数器,用于生成唯一的任务 ID。
    • taskPromises: 一个对象,用于存储每个任务对应的 Promise 的 resolve 和 reject 函数。 这样,当 Worker 完成任务后,就可以通过 resolve 函数将结果传递给 Promise。
  • initialize(): 初始化线程池,创建指定数量的 Worker,并设置 onmessage 事件监听器。
    • initialize 方法中,我们循环创建指定数量的 Worker。
    • worker.onmessage: 监听 Worker 发回的消息。当 Worker 完成任务后,会触发 onmessage 事件。
      • data: 包含 Worker 发回的数据,包括 taskIdresult
      • taskId: 任务 ID,用于标识任务。
      • result: 任务执行结果。
      • onmessage 事件处理函数中,我们首先根据 taskId 找到对应的 Promise 的 resolve 函数,然后将 result 传递给 resolve 函数,从而完成 Promise。
      • 最后,我们将 Worker 放回空闲队列 idleWorkers,并调用 processTaskQueue() 方法,尝试从任务队列中取出新的任务分配给该 Worker。
    • worker.onerror: 监听worker错误,用于处理worker内部错误并重建worker。同时reject掉响应的Promise。
  • submitTask(taskData): 提交任务到线程池。
    • taskData: 任务数据,Worker 需要处理的数据。
    • submitTask 方法返回一个 Promise 对象。
    • 我们首先生成一个唯一的 taskId,然后创建一个 Promise 对象,并将它的 resolvereject 函数存储在 taskPromises 对象中。
    • 然后,我们将任务数据 taskDatataskId 封装成一个任务对象,并将其放入任务队列 taskQueue 中。
    • 最后,我们调用 processTaskQueue() 方法,尝试从任务队列中取出新的任务分配给空闲的 Worker。
  • processTaskQueue(): 从任务队列中取出任务,分配给空闲的 Worker。
    • processTaskQueue 方法首先判断任务队列 taskQueue 是否为空,以及空闲 Worker 队列 idleWorkers 是否为空。只有当两个队列都不为空时,才说明有任务需要处理,并且有空闲的 Worker 可以执行任务。
    • 如果条件满足,我们从 idleWorkers 队列中取出一个 Worker,并从 taskQueue 队列中取出一个任务。
    • 然后,我们将任务数据 taskDatataskId 通过 worker.postMessage() 方法发送给 Worker。
  • terminate(): 销毁线程池,终止所有 Worker。
    • 用于清理线程池,终止所有 Worker 线程。

3. 使用线程池 (index.html)

最后,我们在 HTML 文件中使用 ThreadPool 类。

<!DOCTYPE html>
<html>
<head>
  <title>Worker 线程池示例</title>
</head>
<body>
  <button id="startBtn">开始任务</button>
  <div id="result"></div>

  <script src="ThreadPool.js"></script>
  <script>
    const threadPoolSize = 4; // 设置线程池大小
    const threadPool = new ThreadPool(threadPoolSize);
    const resultDiv = document.getElementById('result');
    const startBtn = document.getElementById('startBtn');

    startBtn.addEventListener('click', async () => {
      const numTasks = 10; // 设置任务数量
      const taskData = 100000000; // 设置任务数据

      resultDiv.innerHTML = '任务开始...';
      const startTime = Date.now();

      const promises = [];
      for (let i = 0; i < numTasks; i++) {
        promises.push(threadPool.submitTask(taskData));
      }

      const results = await Promise.all(promises);
      const endTime = Date.now();
      const totalTime = endTime - startTime;

      resultDiv.innerHTML = `任务完成,总耗时:${totalTime}ms,结果:${results.join(', ')}`;

      // 可选:在不再需要线程池时,终止它
      // threadPool.terminate();
    });
  </script>
</body>
</html>
  • threadPoolSize: 设置线程池大小。
  • numTasks: 设置任务数量。
  • taskData: 设置任务数据。
  • 我们创建了一个 ThreadPool 实例,并提交了多个任务。
  • Promise.all(promises): 等待所有任务完成。
  • threadPool.terminate(): 销毁线程池。

代码解释

  • index.html 中,我们首先创建了一个 ThreadPool 实例,并设置线程池大小为 4。
  • 然后,我们定义了任务数量 numTasks 和任务数据 taskData
  • 当用户点击“开始任务”按钮时,我们循环 numTasks 次,每次调用 threadPool.submitTask(taskData) 方法提交一个任务。
  • threadPool.submitTask(taskData) 方法返回一个 Promise 对象,我们将所有 Promise 对象放入一个数组 promises 中。
  • 然后,我们使用 Promise.all(promises) 方法等待所有 Promise 对象完成。Promise.all() 方法返回一个新的 Promise 对象,该 Promise 对象在所有 Promise 对象都完成时才会完成,并且会将所有 Promise 对象的结果放入一个数组中。
  • 当所有任务都完成后,我们计算总耗时,并将结果显示在页面上。
  • 最后,我们调用 threadPool.terminate() 方法销毁线程池。

优化与改进

上面的代码只是一个简单的 Worker 线程池的实现,还有很多可以优化和改进的地方:

  • 任务优先级: 可以根据任务的优先级来分配任务。
  • Worker 重用: 可以对 Worker 进行更细粒度的管理,例如,可以根据 Worker 的负载情况来分配任务。
  • 错误处理: 可以添加更完善的错误处理机制。
  • 动态调整线程池大小: 可以根据任务的数量和 Worker 的负载情况,动态地调整线程池的大小。
  • 心跳检测: 可以定期检测 Worker 的状态,如果 Worker 出现故障,可以自动重启。
  • 代码模块化: 可以将代码拆分成多个模块,提高代码的可维护性。

表格总结

功能 实现 优点 缺点
Worker 创建 new Worker('worker.js') 创建独立的执行线程,不阻塞主线程。 创建和销毁 Worker 有一定的开销。
任务提交 threadPool.submitTask(taskData) 将任务放入队列,等待 Worker 处理。 需要管理任务队列和 Worker 状态。
任务分配 processTaskQueue() 将任务分配给空闲的 Worker。 需要考虑任务分配的策略,例如优先级、Worker 负载等。
结果返回 worker.postMessage({ taskId: taskId, result: result })resolve(result) Worker 完成任务后,将结果返回给主线程,并通过 Promise 的 resolve 方法通知调用方。 需要确保消息传递的可靠性。
错误处理 worker.onerror 捕获 Worker 内部的错误,并进行相应的处理,例如重启 Worker。 需要考虑各种可能的错误情况,并进行适当的处理。
线程池销毁 threadPool.terminate() 释放 Worker 占用的资源。 必须在不再需要线程池时调用,否则会导致资源泄漏。

幽默一下

Worker 线程池就像一个流水线工厂,每个 Worker 都是一个工人,负责处理特定的任务。老板(主线程)只需要把任务扔进流水线,工人们就会自动完成任务,然后把结果交还给老板。这样,老板就不用自己吭哧吭哧地干活了,可以悠闲地喝着咖啡,等着收钱就行了。

总结

Worker 线程池是 JavaScript 中一个非常有用的工具,可以有效地提高并发任务处理效率。虽然实现起来稍微有点复杂,但是只要掌握了核心概念,就可以轻松地使用它来优化你的代码。

希望今天的讲座对大家有所帮助! 感谢各位观众老爷的耐心观看! 散会!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注