如何实现一个带并发限制的 Promise 调度器:大厂面试高频手写题

并发限制的 Promise 调度器:深度解析与实践

各位编程爱好者、技术同行,大家好!今天我们将深入探讨一个在现代前端和后端开发中都极为重要的话题:如何实现一个带并发限制的 Promise 调度器。这不仅是解决实际工程问题的一把利器,更是大厂面试中衡量候选人对异步编程、并发控制和系统设计理解程度的高频手写题。

我们将以讲座的形式,从基础概念入手,逐步构建一个功能完善、逻辑严谨的并发调度器,并探讨其高级特性、应用场景及面试考点。

1. 引言:为何需要 Promise 调度器与并发限制?

在 JavaScript 异步编程日益普及的今天,Promise 已经成为处理异步操作的标准。然而,当我们需要同时处理大量异步任务时,无限制地启动所有任务可能会带来一系列问题:

  • 资源耗尽: 例如,在浏览器中同时发起数百个网络请求,可能导致浏览器内存占用过高,甚至崩溃;在 Node.js 环境中,过多的并发数据库连接或文件操作会迅速耗尽系统资源。
  • 性能下降: 尽管并发看起来能提高效率,但过高的并发数往往会导致上下文切换开销增大,甚至触发“拥塞”效应,使得整体吞吐量反而下降。
  • API 速率限制: 许多第三方服务对 API 调用有严格的速率限制。如果不加以控制,很容易因超出限制而被封禁 IP 或拒绝服务。
  • 用户体验: 在前端应用中,如果一次性发起大量图片加载请求,可能会阻塞主线程,导致页面卡顿,影响用户体验。

因此,我们需要一个机制来优雅地管理这些异步任务,限制同时运行的任务数量,这就是并发限制的 Promise 调度器的职责。它就像一个交通管理员,确保车辆(任务)能够高效、有序地通过,避免交通堵塞。

2. Promise 基础回顾

在深入调度器实现之前,我们快速回顾一下 Promise 的基本概念和常用的静态方法,这将为我们后续的实现打下基础。

Promise 核心状态:

  • pending (待定): 初始状态,既不是成功也不是失败。
  • fulfilled (已成功): 操作成功完成。
  • rejected (已失败): 操作失败。

一个 Promise 只能从 pending 变为 fulfilledrejected 一次,并且状态改变后不可逆。

Promise 常用方法:

  • new Promise((resolve, reject) => { /* 异步操作 */ }): 构造函数,用于创建 Promise。
  • .then(onFulfilled, onRejected): 注册成功和失败的回调函数。
  • .catch(onRejected): .then(null, onRejected) 的语法糖,专门处理失败。
  • .finally(onFinally): 无论 Promise 成功或失败,都会执行的回调。

Promise 静态方法:

  • Promise.resolve(value): 返回一个已成功状态的 Promise。
  • Promise.reject(reason): 返回一个已失败状态的 Promise。
  • Promise.all(iterable): 等待所有 Promise 都成功才成功,有一个失败则失败。
  • Promise.race(iterable): 只要有一个 Promise 成功或失败,就返回该 Promise 的结果。
  • Promise.allSettled(iterable): 等待所有 Promise 都完成(无论成功或失败),返回包含每个 Promise 状态和结果的对象数组。
  • Promise.any(iterable): 只要有一个 Promise 成功,就返回该 Promise 的结果。如果所有 Promise 都失败,则返回一个 AggregateError

在我们的调度器中,Promise.allSettled 将尤其有用,因为它能帮助我们收集所有任务的最终状态,无论它们成功与否。

3. 无限制并发的问题示例

为了更直观地理解并发限制的必要性,我们来看一个简单的例子。假设我们有一组模拟的网络请求,每个请求都有一定的延迟。

function simulateAsyncTask(id, delay) {
    console.log(`任务 ${id} 开始执行`);
    return new Promise(resolve => {
        setTimeout(() => {
            console.log(`任务 ${id} 执行完成`);
            resolve(`任务 ${id} 的结果`);
        }, delay);
    });
}

const taskFactories = [];
for (let i = 0; i < 100; i++) {
    // 创建100个任务,每个任务延迟随机,模拟真实场景
    taskFactories.push(() => simulateAsyncTask(i, Math.random() * 2000 + 500));
}

// 无限制并发执行
// Promise.all(taskFactories.map(factory => factory()))
//     .then(results => {
//         console.log("所有任务完成,结果:", results);
//     })
//     .catch(error => {
//         console.error("有任务失败:", error);
//     });

// 上述代码会立即启动所有100个任务。
// 如果 simulateAsyncTask 内部是真实的 HTTP 请求,100个同时发出的请求可能会:
// 1. 导致浏览器或Node.js进程的连接池迅速耗尽。
// 2. 触发服务器的速率限制。
// 3. 占用大量内存,尤其是在处理大量数据时。
// 4. 由于网络拥塞,单个请求的响应时间反而变长。

取消注释并运行 Promise.all 的代码,你会看到控制台瞬间打印出大量的“任务 X 开始执行”,这直观地展示了所有任务同时启动的情况。

4. 核心思想:基于计数器的并发控制

并发限制的核心思想并不复杂,它基于一个简单的计数器模型:

  1. 最大并发数 (maxConcurrency): 预设一个上限,表示允许同时运行的任务数量。
  2. 当前运行任务数 (runningTasks): 记录当前正在执行的任务数量。
  3. 任务队列 (taskQueue): 存储等待执行的任务。

调度逻辑:

  • 当一个新任务被添加到调度器时:
    • 如果 runningTasks 小于 maxConcurrency,则立即启动该任务,并将 runningTasks 加一。
    • 如果 runningTasks 已经达到 maxConcurrency,则将该任务放入 taskQueue 等待。
  • 当一个任务执行完成(无论成功或失败)时:
    • runningTasks 减一。
    • 检查 taskQueue 中是否有等待的任务。如果有,取出队首任务并启动它,重复上述检查 runningTasks 的过程。

这个过程就像一个有固定车位的停车场。当有空车位时,新来的车辆可以直接进入;如果没有空车位,车辆就需要在入口处排队等待。当有车辆离开时,排队的车辆才能进入。

5. 实现一个基础的并发 Promise 调度器

现在,我们开始构建我们的 Scheduler 类。

设计思路:

  1. Scheduler 类: 封装并发控制逻辑。
  2. 构造函数: 接收 maxConcurrency 参数。
  3. addTask 方法: 用户通过此方法添加任务。关键点在于,addTask 应该接收一个返回 Promise 的函数(promiseFactory),而不是一个已经执行的 Promise。 这样可以确保任务在被调度器真正选中执行时才启动,实现“懒加载”。同时,addTask 自身也应该返回一个 Promise,以便调用者能获取到它所添加任务的结果。
  4. 内部状态: 维护 maxConcurrencyrunningTaskstaskQueue
  5. _runNextTask 方法 (或 scheduleTasks): 这是一个内部方法,负责从队列中取出任务并执行,同时管理 runningTasks

5.1 骨架搭建

class Scheduler {
    constructor(maxConcurrency) {
        if (typeof maxConcurrency !== 'number' || maxConcurrency <= 0) {
            throw new Error("maxConcurrency must be a positive number.");
        }
        this.maxConcurrency = maxConcurrency; // 最大并发数
        this.runningTasks = 0;             // 当前正在运行的任务数
        this.taskQueue = [];               // 等待执行的任务队列
        this.allTaskPromises = [];         // 存储所有 addTask 返回的 Promise,用于 waitAll
    }

    /**
     * 添加一个任务到调度器。
     * @param {Function} promiseFactory - 一个返回 Promise 的函数。
     * @returns {Promise} - 返回一个 Promise,该 Promise 将在任务完成后解析或拒绝。
     */
    addTask(promiseFactory) {
        // 每一个 addTask 调用都应该返回一个 Promise,代表这个特定的任务
        const taskPromise = new Promise((resolve, reject) => {
            const taskWrapper = {
                promiseFactory, // 任务工厂函数
                resolve,        // 任务 Promise 的 resolve 函数
                reject,         // 任务 Promise 的 reject 函数
            };
            this.taskQueue.push(taskWrapper); // 将任务包装器加入队列
            this._scheduleTasks();            // 尝试调度任务
        });

        this.allTaskPromises.push(taskPromise); // 将此任务的 Promise 存储起来
        return taskPromise;
    }

    /**
     * 内部方法:尝试调度队列中的任务。
     * 当有空闲资源或新任务加入时调用。
     */
    _scheduleTasks() {
        // 只要有空闲资源且任务队列不为空,就不断启动任务
        while (this.runningTasks < this.maxConcurrency && this.taskQueue.length > 0) {
            const { promiseFactory, resolve, reject } = this.taskQueue.shift(); // 取出队首任务
            this.runningTasks++; // 运行任务数加一

            // 执行任务工厂函数,并处理其生命周期
            // 使用 Promise.resolve().then(promiseFactory) 确保 promiseFactory 是异步调用的
            // 并且可以处理 promiseFactory 直接返回非 Promise 值的情况
            Promise.resolve()
                .then(promiseFactory) // 调用任务工厂函数,获取其返回的 Promise
                .then(result => {
                    resolve(result); // 任务成功,解析 addTask 返回的 Promise
                })
                .catch(error => {
                    reject(error); // 任务失败,拒绝 addTask 返回的 Promise
                })
                .finally(() => {
                    this.runningTasks--; // 任务完成,运行任务数减一
                    this._scheduleTasks(); // 递归调用,尝试调度下一个任务
                });
        }
    }

    /**
     * 等待所有已添加的任务完成,无论成功或失败。
     * @returns {Promise<Array<{status: 'fulfilled'|'rejected', value?: any, reason?: any}>>}
     *          返回一个 Promise,解析为所有任务的 Promise.allSettled 风格的结果数组。
     */
    waitAll() {
        // 使用 Promise.allSettled 来等待所有任务完成,并收集它们的结果和状态
        // 这样即使有任务失败,waitAll 也会解析,而不是拒绝
        return Promise.allSettled(this.allTaskPromises);
    }
}

5.2 代码解释与设计考量

  1. constructor(maxConcurrency):

    • 初始化 maxConcurrencyrunningTaskstaskQueue
    • allTaskPromises 数组用于存储 addTask 每次调用时返回的 Promise。这个数组是实现 waitAll 方法的关键,它允许我们像 Promise.allSettled 那样等待所有任务完成,并获取它们的最终状态。
  2. addTask(promiseFactory):

    • 为什么接收 promiseFactory 而不是直接的 Promise? 这是调度器最核心的设计之一。如果 addTask 接收一个已经执行的 Promise,那么该 Promise 会立即开始执行,调度器就无法控制其并发了。通过接收一个函数,我们可以在调度器决定执行它时才调用这个函数,从而真正实现“懒启动”和并发控制。
    • taskWrapper 对象:我们创建了一个内部对象来包装用户的 promiseFactory 以及该任务对应的 resolvereject 函数。这样,当任务被执行时,我们可以通过这些 resolve/reject 函数来通知 addTask 返回的 Promise 它的最终状态。
    • this.allTaskPromises.push(taskPromise): 将每个 addTask 调用返回的 Promise 存入 allTaskPromises 数组,为 waitAll 方法做准备。
    • this._scheduleTasks(): 每当添加一个新任务,我们就立即尝试调度任务,看是否有空闲资源可以启动它。
  3. _scheduleTasks():

    • 循环 while (this.runningTasks < this.maxConcurrency && this.taskQueue.length > 0): 这是并发控制的核心循环。它会不断检查是否有空闲资源 (runningTasks < this.maxConcurrency) 并且任务队列中是否有等待的任务 (taskQueue.length > 0)。满足条件就立即启动任务。
    • this.taskQueue.shift(): 从队列头部取出下一个要执行的任务。
    • this.runningTasks++: 启动任务前,增加正在运行的任务计数。
    • Promise.resolve().then(promiseFactory):
      • promiseFactory() 调用会返回一个 Promise。
      • Promise.resolve() 包装确保即使 promiseFactory 返回的是同步值而非 Promise,也能被 then 链式处理。
      • 此处的 then 链用于处理 promiseFactory 返回的 Promise 的成功或失败。
    • resolve(result) / reject(error): 这些是 addTask 方法中为当前任务创建的 Promise 的 resolvereject 函数。通过它们,我们将实际任务的结果或错误传递给调用者。
    • finally(() => { ... }): 无论任务成功或失败,finally 块都会执行。
      • this.runningTasks--: 任务完成后,减少正在运行的任务计数。
      • this._scheduleTasks(): 递归调用自身。 这是一个非常关键的步骤。当一个任务完成并释放了资源后,我们需要再次尝试调度任务,看看队列中是否有等待的任务可以启动。这个递归调用确保了只要有空闲资源和等待任务,调度器就能持续工作。
  4. waitAll():

    • Promise.allSettled(this.allTaskPromises): 这是实现等待所有任务完成并收集其结果的最佳方式。Promise.allSettled 不会因为某个任务失败而立即拒绝,而是会等待所有任务都 settle(成功或失败),然后返回一个包含每个任务状态和结果的数组。这对于需要处理所有任务结果的场景非常有用。

5.3 示例使用

// 模拟一个异步任务,它可能成功也可能失败
function createMockTask(id, delay, shouldFail = false) {
    return new Promise((resolve, reject) => {
        setTimeout(() => {
            if (shouldFail) {
                console.error(`❌ 任务 ${id} 失败`);
                reject(new Error(`Task ${id} failed`));
            } else {
                console.log(`✅ 任务 ${id} 完成`);
                resolve(`Task ${id} result`);
            }
        }, delay);
    });
}

console.log("--- 启动并发调度器示例 ---");

const scheduler = new Scheduler(3); // 设置最大并发数为3

const taskPromises = [];

// 添加10个任务,其中一些会失败
for (let i = 0; i < 10; i++) {
    const delay = Math.random() * 1000 + 500; // 500ms - 1500ms 随机延迟
    const shouldFail = i % 4 === 0; // 每4个任务中有一个失败
    taskPromises.push(scheduler.addTask(() => createMockTask(i, delay, shouldFail)));
}

console.log("所有任务已添加到调度器,等待执行...");

// 等待所有任务完成,并处理最终结果
scheduler.waitAll()
    .then(results => {
        console.log("n--- 所有任务最终状态 ---");
        results.forEach((result, index) => {
            if (result.status === 'fulfilled') {
                console.log(`任务 ${index} 成功:`, result.value);
            } else {
                console.error(`任务 ${index} 失败:`, result.reason.message);
            }
        });
        console.log("调度器所有任务已完成。");
    })
    .catch(error => {
        // waitAll 使用了 allSettled,所以这里通常不会被调用,除非 Promise.allSettled 自身出错
        console.error("waitAll 出现意外错误:", error);
    });

// 也可以单独处理每个任务的结果
// taskPromises[0].then(res => console.log("单独任务0结果:", res)).catch(err => console.error("单独任务0错误:", err.message));

运行上述代码,你会观察到:

  1. 控制台会先打印“所有任务已添加到调度器,等待执行…”。
  2. 然后,任务会一批批地开始执行,每次最多只有3个任务在同时运行。
  3. 当一个任务完成时,调度器会立即从队列中取出下一个等待任务并启动它。
  4. 最终,waitAll 会在所有任务都完成后解析,并打印出每个任务的最终状态(成功或失败)。

这完美地展示了并发限制的效果。

6. 进阶特性与优化

我们已经实现了一个基础且功能完善的并发调度器。但在实际应用中,我们可能需要更多的灵活性和控制能力。

6.1 错误处理策略与重试机制

当前的调度器会简单地将任务的错误传递给 addTask 返回的 Promise。但在某些场景下,我们可能希望有更复杂的错误处理策略。

  • 失败即停止 (Fail-Fast): 当任何一个任务失败时,立即停止所有正在运行和等待的任务。
  • 继续执行: 即使有任务失败,也继续执行其他任务,直到所有任务都完成。这正是我们当前实现所采用的策略,通过 Promise.allSettled 来实现。
  • 重试机制: 对于某些可重试的错误(如网络瞬时故障),我们可能希望任务在失败后自动重试几次。

实现重试机制的思路:

可以在 taskWrapper 中增加重试次数的计数器,并在 _scheduleTasks 中捕获错误时判断是否重试。

// 改造 createMockTask,增加重试逻辑
function createRetryableMockTask(id, delay, attempt = 1, maxAttempts = 3) {
    return new Promise((resolve, reject) => {
        setTimeout(() => {
            // 模拟随机失败,但最后一次尝试总是成功
            if (attempt < maxAttempts && Math.random() < 0.7) { // 70% 概率失败,如果不是最后一次尝试
                console.warn(`⚠️ 任务 ${id} 第 ${attempt} 次尝试失败,准备重试...`);
                reject(new Error(`Task ${id} failed on attempt ${attempt}`));
            } else {
                console.log(`✅ 任务 ${id} 第 ${attempt} 次尝试成功!`);
                resolve(`Task ${id} result (attempt ${attempt})`);
            }
        }, delay);
    });
}

// 改造 addTask,支持重试
class SchedulerWithRetry extends Scheduler {
    addTask(promiseFactory, options = {}) {
        const { maxRetries = 0, retryDelay = 100 } = options;
        let currentAttempt = 0;

        const taskPromise = new Promise((resolve, reject) => {
            const executeTask = () => {
                currentAttempt++;
                Promise.resolve()
                    .then(() => promiseFactory(currentAttempt)) // 传入当前尝试次数
                    .then(result => {
                        resolve(result);
                    })
                    .catch(error => {
                        if (currentAttempt <= maxRetries) {
                            console.log(`任务重试中: ${currentAttempt}/${maxRetries}`);
                            setTimeout(() => {
                                this.taskQueue.unshift({ promiseFactory: () => executeTask(), resolve, reject }); // 重新放回队列头部进行重试
                                this._scheduleTasks(); // 立即尝试调度
                            }, retryDelay);
                        } else {
                            reject(error); // 达到最大重试次数,最终失败
                        }
                    })
                    .finally(() => {
                        // 注意:如果任务是重试,runningTasks 不应减少,因为它还在“运行”中
                        // 只有当任务最终成功或失败时才减少
                        if (this.taskQueue.find(t => t.resolve === resolve)) { // 如果任务还在队列中等待重试
                            // 不减少 runningTasks
                        } else {
                             this.runningTasks--; // 任务真正结束,减少 runningTasks
                             this._scheduleTasks(); // 尝试调度下一个任务
                        }
                    });
            };

            const taskWrapper = {
                promiseFactory: executeTask, // 包装后的执行函数
                resolve,
                reject,
                isRetryable: true // 标记为可重试任务
            };
            this.taskQueue.push(taskWrapper);
            this._scheduleTasks();
        });

        this.allTaskPromises.push(taskPromise);
        return taskPromise;
    }

    // _scheduleTasks 还需要微调,确保重试任务不会立即释放运行槽位
    _scheduleTasks() {
        while (this.runningTasks < this.maxConcurrency && this.taskQueue.length > 0) {
            const taskWrapper = this.taskQueue.shift();
            const { promiseFactory, resolve, reject, isRetryable } = taskWrapper;

            this.runningTasks++;

            Promise.resolve()
                .then(promiseFactory)
                .then(result => {
                    resolve(result);
                })
                .catch(error => {
                    // 如果是重试任务,它的 catch 逻辑已经处理了是否重新入队
                    // 如果不是重试任务或者重试已达上限,就直接 reject
                    if (!isRetryable) { // 如果不是由 addTaskWithRetry 添加的,或者重试逻辑已处理
                        reject(error);
                    }
                })
                .finally(() => {
                    // 仅当任务最终完成(成功或失败,且不再重试)时才减少 runningTasks
                    // 这里需要更精细的判断,或者将 runningTasks 的减操作放到 promiseFactory 的 finally 中,
                    // 并在重试时避免执行。
                    // 简化处理:如果任务在 finally 之后没有再次进入队列,则认为其结束。
                    // (更严谨的做法是让 promiseFactory 内部管理 runningTasks 的增减,或者在 taskWrapper 中增加一个状态标志)
                    // 为了保持 _scheduleTasks 简洁,我们假设 promiseFactory 最终会resolve/reject,并且在finally中处理runningTasks的减法
                    // 但重试机制需要更复杂的协调
                    // 这里为了避免大幅改动核心逻辑,我们假设重试逻辑在 promiseFactory 内部完成对 runningTasks 的管理
                    // 或者,更简单但略有偏差的实现是,每次失败都立刻减少 runningTasks,然后重试任务作为新任务重新排队
                    // 但那样会额外占用一个并发槽位,直到重试任务真正开始
                    // 最佳实践:将 runningTasks 减法逻辑完全移入 taskWrapper 的 `finally` 逻辑中,并且只在 `finally` 中
                    // 当任务最终不再重试时,才减少 runningTasks。
                    // 重新设计:将 runningTasks-- 和 _scheduleTasks() 移到 taskPromise resolve/reject 之后
                });
        }
    }
}
// 鉴于重试机制会使 _scheduleTasks 变得复杂,尤其是 `finally` 的执行时机和 `runningTasks` 的管理,
// 更好的做法是将重试逻辑封装在 promiseFactory 内部,或者在 `addTask` 返回的 Promise 的 `.catch` 链中处理。
// 这里我们先跳过在调度器内部直接实现重试的复杂版本,因为它会显著增加代码量和理解难度。
// 我们可以通过在 `addTask` 外部包装一个重试函数来模拟:

function retry(promiseFactory, maxRetries = 3, delay = 100) {
    let attempt = 0;
    return function retryWrapper() {
        attempt++;
        return promiseFactory(attempt).catch(error => {
            if (attempt <= maxRetries) {
                console.warn(`任务重试: 第 ${attempt} 次失败,${delay}ms 后重试...`);
                return new Promise(res => setTimeout(res, delay))
                    .then(retryWrapper); // 递归调用自身进行重试
            } else {
                throw error; // 达到最大重试次数,抛出最终错误
            }
        });
    };
}

// 示例使用重试
// const schedulerWithRetry = new Scheduler(3);
// for (let i = 0; i < 5; i++) {
//     const taskFn = retry(() => createRetryableMockTask(i, 500), 2, 200);
//     schedulerWithRetry.addTask(taskFn);
// }
// schedulerWithRetry.waitAll().then(results => console.log("所有重试任务完成:", results));

说明: 在调度器内部实现重试机制会使 _scheduleTasks 的逻辑变得非常复杂,尤其是 runningTasks 的增减和 finally 块的执行时机。一种更简洁且推荐的方式是将重试逻辑包装在 promiseFactory 外部,或者在任务失败后,由外部逻辑决定是否将其作为新任务重新添加到调度器。上述 retry 函数示例展示了如何在外部实现重试。

6.2 生命周期事件/钩子

在大型应用中,我们可能需要监听调度器的状态变化,例如:

  • onTaskStart(taskId): 任务开始执行时。
  • onTaskComplete(taskId, result): 任务成功完成时。
  • onTaskFail(taskId, error): 任务失败时。
  • onIdle(): 当所有任务都完成,调度器变为空闲状态时。
  • onDrain(): 当任务队列清空,但仍有任务在运行时(即 runningTasks > 0taskQueue.length === 0)。

可以通过 EventEmitter 或回调函数的方式添加这些钩子。

// 简单地通过回调函数实现 onIdle
class SchedulerWithHooks extends Scheduler {
    constructor(maxConcurrency) {
        super(maxConcurrency);
        this.onIdleCallback = null;
    }

    onIdle(callback) {
        this.onIdleCallback = callback;
    }

    _scheduleTasks() {
        super._scheduleTasks(); // 调用父类的调度逻辑

        // 检查是否空闲
        if (this.runningTasks === 0 && this.taskQueue.length === 0 && this.onIdleCallback) {
            this.onIdleCallback();
            this.onIdleCallback = null; // 只触发一次,或者根据需求决定是否重复触发
        }
    }
}

// 示例使用
// const schedulerWithIdle = new SchedulerWithHooks(2);
// schedulerWithIdle.onIdle(() => {
//     console.log("调度器空闲了!所有任务都已完成。");
// });
// schedulerWithIdle.addTask(() => createMockTask(1, 1000));
// schedulerWithIdle.addTask(() => createMockTask(2, 500));
// schedulerWithIdle.addTask(() => createMockTask(3, 1200));
// schedulerWithIdle.waitAll(); // 确保所有任务都完成

6.3 优先级队列

默认情况下,任务是按照 FIFO(先进先出)的顺序执行的。如果某些任务比其他任务更重要,我们可能需要实现一个优先级队列。

实现优先级队列通常需要:

  • addTask 接收一个 priority 参数。
  • taskQueue 不再是简单的数组,而是一个能够按优先级排序的数据结构(如最小堆或插入排序数组)。
  • _scheduleTasks 每次取任务时,总是取出优先级最高的任务。

这是一个相对复杂的改动,超出了基础调度器的范畴,但思路是清晰的。

6.4 可暂停/恢复与取消

  • 暂停 (pause()): 停止调度新任务,但已运行的任务继续执行。
  • 恢复 (resume()): 重新开始调度新任务。
  • 取消 (cancelTask(taskId) / cancelAll()): 停止一个或所有任务。这通常需要任务本身支持取消机制(例如,AbortController)。

实现这些功能会使调度器内部状态管理更加复杂,例如需要一个 isPaused 标志,并且 _scheduleTasksisPausedtrue 时不执行 while 循环。取消任务则需要能够中断正在进行的 Promise,这在 JavaScript 中本身就比较复杂,通常需要任务内部主动检查取消信号。

7. 完整代码示例 (Scheduler V2.0)

结合了 waitAllPromise.allSettled 风格结果收集和更健壮的结构。

class Scheduler {
    /**
     * 创建一个带并发限制的 Promise 调度器。
     * @param {number} maxConcurrency - 最大并发数。必须是正整数。
     */
    constructor(maxConcurrency) {
        if (typeof maxConcurrency !== 'number' || maxConcurrency <= 0) {
            throw new Error("maxConcurrency 必须是一个正整数。");
        }
        this.maxConcurrency = maxConcurrency;   // 最大并发数
        this.runningTasks = 0;                  // 当前正在运行的任务数
        this.taskQueue = [];                    // 等待执行的任务队列
        this.allTaskPromises = [];              // 存储所有 addTask 返回的 Promise,用于 waitAll
        this.isIdle = true;                     // 调度器是否空闲
        this.onIdleCallbacks = [];              // 存储空闲回调函数
        this.isPaused = false;                  // 调度器是否暂停
    }

    /**
     * 添加一个任务到调度器。
     * 任务工厂函数会在调度器有空闲资源时才被调用。
     * @param {Function} promiseFactory - 一个返回 Promise 的函数。
     * @returns {Promise} - 返回一个 Promise,该 Promise 将在任务完成后解析或拒绝。
     */
    addTask(promiseFactory) {
        // 确保 promiseFactory 是一个函数
        if (typeof promiseFactory !== 'function') {
            return Promise.reject(new Error("addTask 期望一个返回 Promise 的函数。"));
        }

        const taskPromise = new Promise((resolve, reject) => {
            const taskWrapper = {
                promiseFactory,
                resolve,
                reject,
            };
            this.taskQueue.push(taskWrapper);
            this.isIdle = false; // 有新任务加入,不再空闲
            this._scheduleTasks(); // 尝试调度任务
        });

        this.allTaskPromises.push(taskPromise);
        return taskPromise;
    }

    /**
     * 内部方法:尝试调度队列中的任务。
     * 当有空闲资源或新任务加入时调用。
     */
    _scheduleTasks() {
        if (this.isPaused) {
            // 如果调度器暂停,则不启动新任务
            return;
        }

        // 只要有空闲资源且任务队列不为空,就不断启动任务
        while (this.runningTasks < this.maxConcurrency && this.taskQueue.length > 0) {
            const { promiseFactory, resolve, reject } = this.taskQueue.shift();
            this.runningTasks++;

            // 执行任务工厂函数,并处理其生命周期
            // 使用 Promise.resolve().then(promiseFactory) 确保 promiseFactory 是异步调用的
            // 并且可以处理 promiseFactory 直接返回非 Promise 值的情况
            Promise.resolve()
                .then(promiseFactory) // 调用任务工厂函数,获取其返回的 Promise
                .then(result => {
                    resolve(result); // 任务成功,解析 addTask 返回的 Promise
                })
                .catch(error => {
                    reject(error); // 任务失败,拒绝 addTask 返回的 Promise
                })
                .finally(() => {
                    this.runningTasks--; // 任务完成,运行任务数减一
                    this._scheduleTasks(); // 递归调用,尝试调度下一个任务
                    this._checkIdleStatus(); // 检查调度器是否变为空闲
                });
        }
    }

    /**
     * 检查调度器是否空闲,并在空闲时触发回调。
     */
    _checkIdleStatus() {
        if (this.runningTasks === 0 && this.taskQueue.length === 0) {
            if (!this.isIdle) { // 避免重复触发
                this.isIdle = true;
                this.onIdleCallbacks.forEach(cb => cb());
                this.onIdleCallbacks = []; // 触发后清空,如果需要每次空闲都触发则不清除
            }
        }
    }

    /**
     * 注册一个回调函数,当调度器所有任务完成且队列为空时触发。
     * @param {Function} callback - 空闲时触发的回调函数。
     */
    onIdle(callback) {
        if (typeof callback === 'function') {
            this.onIdleCallbacks.push(callback);
            // 如果当前已经空闲,立即触发
            if (this.isIdle && this.runningTasks === 0 && this.taskQueue.length === 0) {
                callback();
                this.onIdleCallbacks.pop(); // 立即触发的移除,避免重复
            }
        }
    }

    /**
     * 暂停调度器,不会启动新的任务,但已运行的任务会继续。
     */
    pause() {
        this.isPaused = true;
        console.log("调度器已暂停。");
    }

    /**
     * 恢复调度器,继续启动新的任务。
     */
    resume() {
        this.isPaused = false;
        console.log("调度器已恢复。");
        this._scheduleTasks(); // 恢复后立即尝试调度
    }

    /**
     * 等待所有已添加的任务完成,无论成功或失败。
     * 返回一个 Promise,解析为所有任务的 Promise.allSettled 风格的结果数组。
     * @returns {Promise<Array<{status: 'fulfilled'|'rejected', value?: any, reason?: any}>>}
     */
    waitAll() {
        if (this.allTaskPromises.length === 0) {
            return Promise.resolve([]); // 如果没有添加任何任务,直接返回空数组
        }
        return Promise.allSettled(this.allTaskPromises);
    }
}

// --- 再次使用示例 ---
console.log("n--- 启动调度器 V2.0 示例 ---");
const schedulerV2 = new Scheduler(3); // 最大并发数3

schedulerV2.onIdle(() => {
    console.log("--- 调度器 V2.0 已进入空闲状态!所有任务都已完成。---");
});

const v2TaskPromises = [];
for (let i = 0; i < 10; i++) {
    const delay = Math.random() * 1000 + 300;
    const shouldFail = i % 5 === 0; // 每5个任务有一个失败
    v2TaskPromises.push(schedulerV2.addTask(() => createMockTask(i, delay, shouldFail)));
}

console.log("所有任务已添加到调度器 V2.0,等待执行...");

// 模拟在任务执行过程中暂停和恢复
setTimeout(() => {
    schedulerV2.pause();
    console.log("调度器暂停 2 秒...");
}, 2500);

setTimeout(() => {
    schedulerV2.resume();
    console.log("调度器恢复...");
}, 4500);

schedulerV2.waitAll()
    .then(results => {
        console.log("n--- 调度器 V2.0 所有任务最终状态 ---");
        results.forEach((result, index) => {
            if (result.status === 'fulfilled') {
                console.log(`任务 ${index} 成功:`, result.value);
            } else {
                console.error(`任务 ${index} 失败:`, result.reason.message);
            }
        });
        console.log("调度器 V2.0 的 waitAll 已完成。");
    })
    .catch(error => {
        console.error("waitAll 出现意外错误:", error);
    });

// 辅助函数 (与之前相同)
function createMockTask(id, delay, shouldFail = false) {
    return new Promise((resolve, reject) => {
        setTimeout(() => {
            if (shouldFail) {
                // console.error(`❌ 任务 ${id} 失败`);
                reject(new Error(`Task ${id} failed`));
            } else {
                // console.log(`✅ 任务 ${id} 完成`);
                resolve(`Task ${id} result`);
            }
        }, delay);
    });
}

8. 应用场景

并发限制的 Promise 调度器在多种场景下都非常有用:

  • 前端 Web 应用:
    • 图片懒加载/预加载: 限制同时加载的图片数量,避免阻塞网络和渲染。
    • 批量文件上传: 限制同时上传的文件数量,防止服务器压力过大。
    • 批量 API 请求: 对后端 API 进行批量数据更新或查询时,控制请求并发数,避免触发后端限流或过载。
    • 数据同步: 当需要从多个数据源同步数据时,控制并发以优化性能和资源使用。
  • Node.js 后端服务:
    • 网络爬虫: 限制同时发起的 HTTP 请求,遵守网站爬取规则,避免被封禁 IP。
    • 文件处理: 批量读取、写入或转换文件时,控制并发数,避免 I/O 瓶颈。
    • 数据库批量操作: 对数据库进行批量插入、更新或查询时,控制并发连接数,保护数据库。
    • 消息队列消费者: 当从消息队列中消费消息并进行处理时,限制并发处理能力。
  • 桌面应用 (Electron):
    • 本地文件操作: 类似 Node.js,控制本地文件系统操作的并发。
    • 后台数据处理: 执行耗时计算或数据转换时,避免阻塞 UI 线程。

9. 对比其他并发控制方法

特性/方法 Promise.all / Promise.allSettled async/await 循环 Scheduler (本文实现) 第三方库 (如 p-limit)
并发限制 无限制 (所有 Promise 立即启动) 默认串行 可配置的限制 可配置的限制
任务执行时机 立即执行 逐个执行 根据调度器空闲情况执行 根据调度器空闲情况执行
错误处理 Promise.all 失败即止,allSettled 收集所有 try/catch 逐个处理 waitAll 类似 allSettled 库提供不同策略
暂停/恢复 不支持 不支持 支持 部分支持
优先级 不支持 不支持 可扩展支持 部分支持
代码复杂性 简单 简单 中等 低 (开箱即用)
适用场景 少量且无并发冲突任务 简单顺序任务 需要精细并发控制的场景 快速实现并发控制

Promise.allasync/await 适用于相对简单或少量异步操作,当需要更精细的并发控制、错误处理和生命周期管理时,自定义调度器或使用第三方库是更好的选择。

10. 高频面试考点分析

这个题目之所以高频,因为它考察了面试者对多个核心概念的理解和应用:

  1. Promise 基础: 对 Promise 的状态、链式调用、静态方法的理解。
  2. 异步编程和事件循环: 了解 Promise 的执行机制,setTimeout 等宏任务/微任务。
  3. 并发控制原理: 如何通过计数器和队列实现并发限制。
  4. 面向对象设计: 如何设计 Scheduler 类,封装状态和行为。
  5. 懒加载机制: 为什么要接收 promiseFactory 而不是直接的 Promise。这是区分普通实现和优秀实现的关键点。
  6. 错误处理: 如何处理单个任务的错误,以及如何汇总所有任务的结果(Promise.allSettled 的应用)。
  7. 边缘情况考虑: 空队列、maxConcurrency 为1、所有任务都失败/成功等。
  8. 代码健壮性: 对输入参数的校验,对异步操作的容错。
  9. 可扩展性: 如果要增加优先级、重试、暂停/恢复等功能,如何设计。

在面试中,通常会从最简单的并发控制开始,然后逐步引导你添加更多高级功能,观察你如何迭代和优化设计。

11. 扩展思考

  • 背压 (Backpressure): 当任务生产者生成任务的速度远快于调度器处理任务的速度时,如何通知生产者减慢速度。这在流处理或消息队列集成中尤为重要。
  • 分布式调度: 在微服务架构中,如果任务需要在多台服务器上分布式执行,如何实现一个跨进程的调度器。这通常涉及消息队列、共享状态存储和分布式锁。
  • 与 Reactive Programming (如 RxJS) 结合: 将调度器与响应式编程库结合,可以构建更强大、更灵活的异步数据流处理管道。
  • 资源池管理: 例如数据库连接池、线程池等,调度器可以作为这些资源池的上层抽象,管理对底层资源的访问。

12. 结语

一个带并发限制的 Promise 调度器,不仅仅是一个技术题目,它更是我们在实际开发中管理异步操作、优化资源利用和提升系统稳定性的重要工具。通过今天的深入探讨和代码实践,希望大家不仅掌握了它的实现细节,更能理解其背后的设计哲学和应用价值。在未来的编程实践和面试挑战中,愿你能够游刃有余,构建出更加健壮、高效的异步系统。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注