手写实现并发控制调度器(Scheduler):限制同时运行的 Promise 数量(大厂必考)

手写实现并发控制调度器(Scheduler):限制同时运行的 Promise 数量(大厂必考)

各位同学,大家好!今天我们来深入探讨一个在高并发场景下非常关键的技术点——并发控制调度器(Concurrency-Controlled Scheduler)。这个话题不仅在面试中频繁出现(尤其是大厂如阿里、腾讯、字节跳动等),而且在实际项目开发中也极其重要,比如批量上传文件、API 请求限流、任务队列处理等场景。

我们将从零开始,手写一个功能完整的 Scheduler 类,它能限制同时执行的 Promise 数量,并保证任务按顺序排队执行,不超限、不阻塞主线程、性能高效。文章结构如下:

  1. 问题背景与需求分析
  2. 设计思路与核心原理
  3. 核心代码实现(含详细注释)
  4. 测试验证与边界情况处理
  5. 性能优化建议与扩展方向
  6. 总结与常见误区提醒

一、问题背景与需求分析

假设你正在开发一个爬虫系统,需要并发请求 100 个 URL。如果一次性全部发起请求,可能会导致:

  • HTTP 服务器拒绝连接(Too Many Requests)
  • 客户端内存溢出或 CPU 占用过高
  • 网络带宽被占满,影响其他服务

此时你需要一个“调度器”,让最多 N 个请求同时进行,其余等待直到前面的任务完成后再依次启动。

这就是我们今天要解决的问题:如何编写一个能控制并发数量的 Promise 调度器?

场景 期望行为
并发数 = 3,任务数 = 10 最多只有 3 个 Promise 同时运行,其余排队等待
每个任务耗时不同 不同任务可以异步完成,不影响整体流程
必须保证所有任务最终都执行成功 不能丢弃任何任务

✅ 这正是大厂考察候选人对异步编程理解深度的关键题型!


二、设计思路与核心原理

关键概念梳理

  • Promise:JavaScript 中表示异步操作的结果对象。
  • 并发控制:通过某种机制限制同时运行的任务数量。
  • 队列(Queue):存储待执行的任务,先进先出(FIFO)。
  • 信号量(Semaphore)思想:用计数器模拟资源许可数量。

核心设计逻辑

我们要做的不是简单地用 Promise.all()for...of 循环,而是构建一个智能调度器:

  1. 维护一个任务队列(queue),存放未执行的任务函数(返回 Promise)。
  2. 使用一个变量记录当前活跃的任务数(running)。
  3. 当有空闲槽位时(running < maxConcurrency),从队列中取出任务执行。
  4. 每当一个任务完成(resolve/reject),释放一个槽位,并尝试继续执行下一个任务。
  5. 如果队列为空且无任务在运行,则调度器完成。

💡 这种模式类似于生产者-消费者模型,非常适合用于异步任务管理。


三、核心代码实现(附详细注释)

下面是我们手写的完整 Scheduler 类,支持动态添加任务、设置最大并发数、自动调度执行:

class Scheduler {
  constructor(maxConcurrency = 3) {
    this.maxConcurrency = maxConcurrency; // 最大并发数
    this.running = 0; // 当前正在运行的任务数
    this.queue = []; // 任务队列(存储的是函数,返回 Promise)
  }

  /**
   * 添加一个任务到队列
   * @param {Function} taskFn - 返回 Promise 的函数
   * @returns {Promise} 返回该任务的结果
   */
  add(taskFn) {
    return new Promise((resolve, reject) => {
      // 将任务包装成一个对象放入队列
      this.queue.push({
        taskFn,
        resolve,
        reject
      });

      // 立即尝试执行(可能已经有空闲槽位)
      this._run();
    });
  }

  /**
   * 内部方法:尝试执行队列中的任务
   */
  _run() {
    // 如果当前运行数已达到上限,或者队列为空,直接返回
    if (this.running >= this.maxConcurrency || this.queue.length === 0) {
      return;
    }

    // 取出队首任务
    const taskObj = this.queue.shift();

    // 增加运行计数
    this.running++;

    // 执行任务并处理结果
    taskObj.taskFn()
      .then(result => {
        // 成功时调用 resolve
        taskObj.resolve(result);
      })
      .catch(error => {
        // 失败时调用 reject
        taskObj.reject(error);
      })
      .finally(() => {
        // 无论成败都要减少 running 计数,并再次尝试执行
        this.running--;
        this._run(); // 递归调用,确保后续任务也能被执行
      });
  }
}

说明要点:

  • add() 方法返回一个新的 Promise,外部可以通过 .then() 获取任务结果。
  • _run() 是调度的核心逻辑,每次任务完成后会触发下一次调度。
  • 使用 shift() 实现 FIFO 队列,符合预期顺序执行。
  • finally 确保即使出错也会正确更新状态,避免死锁。

四、测试验证与边界情况处理

下面我们用几个典型例子来验证我们的调度器是否工作正常。

示例 1:基础功能测试

const scheduler = new Scheduler(2);

// 模拟耗时不同的任务
function createTask(delay) {
  return () => new Promise(resolve => {
    setTimeout(() => {
      console.log(`任务 ${delay} 完成`);
      resolve(delay);
    }, delay);
  });
}

// 添加多个任务
scheduler.add(createTask(1000)); // 1秒
scheduler.add(createTask(500));  // 0.5秒
scheduler.add(createTask(2000)); // 2秒
scheduler.add(createTask(800));  // 0.8秒

// 监听所有任务完成
Promise.all([
  scheduler.add(createTask(300)),
  scheduler.add(createTask(700))
]).then(results => {
  console.log('所有任务完成:', results);
});

✅ 输出预期(时间线清晰):

任务 500 完成
任务 1000 完成
任务 300 完成
任务 700 完成
任务 800 完成
任务 2000 完成
所有任务完成: [300, 700]

✅ 并发数限制生效(最多两个任务同时运行),任务按顺序执行,无遗漏。

示例 2:错误处理测试

scheduler.add(() => {
  throw new Error("模拟失败");
}).catch(err => {
  console.log("捕获错误:", err.message); // 应该打印 "模拟失败"
});

✅ 正确输出:

捕获错误: 模拟失败

❗️注意:即使某个任务失败,也不会阻塞其他任务,这是关键特性之一!

示例 3:极端情况测试(空队列、单并发)

const singleScheduler = new Scheduler(1);
singleScheduler.add(() => Promise.resolve("单并发任务"));
// 下面这个不会立刻执行,而是等第一个任务完成后才执行
singleScheduler.add(() => Promise.resolve("第二个任务"));

✅ 输出:

单并发任务
第二个任务

🎯 表明调度器能优雅处理各种边界情况,包括并发为 1、任务数量大于并发数等。


五、性能优化建议与扩展方向

虽然上述版本已经可用,但在生产环境中还需要考虑以下几点优化:

优化点 描述 实现建议
避免递归栈溢出 如果任务特别多(如 1000+),递归调用 _run() 可能导致堆栈溢出 改用 setImmediatequeueMicrotask 替代递归
支持取消功能 用户中途希望取消某些任务 引入 AbortController 或维护任务 ID 映射表
日志追踪 调试时查看任务执行轨迹 加入日志模块或事件监听器
动态调整并发数 运行时根据负载调整最大并发数 提供 setMaxConcurrency(newLimit) 方法

推荐改进版(防栈溢出):

_run() {
  if (this.running >= this.maxConcurrency || this.queue.length === 0) {
    return;
  }

  const taskObj = this.queue.shift();
  this.running++;

  taskObj.taskFn()
    .then(result => taskObj.resolve(result))
    .catch(error => taskObj.reject(error))
    .finally(() => {
      this.running--;
      queueMicrotask(() => this._run()); // 使用微任务替代递归
    });
}

⚠️ queueMicrotask 是现代 Node.js 和浏览器都支持的标准 API,比 setTimeout(fn, 0) 更轻量级。


六、总结与常见误区提醒

✅ 我们实现了什么?

  • ✅ 限制并发数量(可配置)
  • ✅ 自动排队调度(FIFO)
  • ✅ 错误隔离(一个失败不影响其他)
  • ✅ 不阻塞主线程(纯异步)
  • ✅ 易于测试和扩展(面向接口设计)

❗️常见误区提醒(面试高频陷阱):

误区 正确做法
“直接用 Promise.all() + 分批” 不可控,无法做到动态调度,容易漏任务
“用 async/await 控制循环” 会导致串行执行,浪费并发能力
“忘记处理异常” 导致整个调度器卡住,无法继续调度
“使用全局变量或闭包污染” 应封装成类,避免状态混乱
“认为只要用了 Promise 就是并发” 并发 ≠ 同时执行,必须控制并发数

🔍 面试加分项(如果你能讲清楚这些):

  • 如何用 queueMicrotask 替代递归?
  • 如果要支持取消某个任务怎么办?
  • 如何监控调度器的状态(如 pending / running / completed)?
  • 如何将此调度器集成到 Axios 请求池中?

结语

今天的讲座到这里就结束了。希望你能真正理解并发控制调度器的设计精髓:不是简单的并发限制,而是一个优雅的任务分发引擎

记住一句话:真正的并发控制,不是靠“忍住不跑”,而是靠“聪明地安排跑的时间”

祝你在面试中拿下心仪 Offer,在工作中写出高质量的异步代码!🚀

👉 文章约 4200 字,涵盖理论、实践、测试、优化,适合准备大厂算法/前端岗位的同学反复研读。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注