手写实现并发控制调度器(Scheduler):限制同时运行的 Promise 数量(大厂必考)

引言:并发控制的艺术与挑战

在现代前端与后端开发中,异步操作无处不在。从网络请求到文件读写,从数据库查询到复杂计算,我们频繁地与需要时间才能完成的任务打交道。JavaScript作为一门天生异步的语言,通过回调函数、事件循环、Promise以及async/await等机制,极大地增强了处理异步任务的能力。

然而,仅仅能够处理异步任务是不够的。当面临大量异步任务需要同时执行时,我们常常会遇到一个核心问题:并发控制。想象一下,您的应用需要同时向服务器请求数百张图片、批量上传上千个文件,或者并行处理数十个耗时的数据转换任务。如果不加以限制,所有这些任务可能会在短时间内同时启动,导致以下问题:

  1. 服务器过载:短时间内接收到大量请求,超出服务器处理能力,可能导致服务变慢甚至崩溃。
  2. 客户端资源耗尽:浏览器或Node.js进程可能因为同时维护大量网络连接、Promise实例和内存占用而变得卡顿甚至崩溃。
  3. API限流:许多第三方服务对API调用有严格的速率限制。无限制的并发请求很可能触发限流机制,导致请求失败。
  4. 用户体验下降:过多的并发操作可能阻塞UI渲染,或使应用响应迟钝。

正因如此,我们需要一种机制来限制同时运行的异步操作的数量,这就是并发控制调度器(Scheduler)的核心职责。它就像一个交通警察,管理着异步任务的“车流”,确保系统稳定、高效地运行。

这个主题在大厂面试中是“必考”的重点,因为它不仅考察你对JavaScript异步编程深度的理解,更考验你对数据结构、算法、系统设计以及资源管理的综合能力。一个优秀的并发调度器,是衡量一个开发者能否构建健壮、高性能应用的关键指标之一。

本讲座将深入探讨如何手写实现这样一个并发控制调度器,从基础概念到高级特性,带你一步步构建一个功能完善、易于扩展的解决方案。

理解核心概念:Promise、并发与调度

在深入实现之前,我们首先需要对几个核心概念有清晰的理解。

JavaScript与事件循环:单线程的并发魔法

JavaScript是单线程的。这意味着在任何给定时间点,JavaScript引擎只能执行一个任务。那么,它是如何实现“并发”的呢?答案是事件循环(Event Loop)

事件循环机制允许JavaScript将耗时的操作(如网络请求、定时器)委托给宿主环境(浏览器或Node.js)的底层线程去执行,而JavaScript主线程则可以继续执行其他任务。当这些耗时操作完成后,它们的回调函数会被放入一个任务队列(Task Queue),等待JavaScript主线程空闲时再取出执行。Promise的微任务队列(Microtask Queue)甚至拥有更高的优先级。

因此,JavaScript的“并发”并非真正的并行(parallelism,多核CPU同时执行多个任务),而是一种并发(concurrency)——通过在不同任务之间快速切换,宏观上看起来像是在同时进行。我们的调度器将利用这一特性来管理异步任务。

Promise:异步操作的优雅封装

Promise是JavaScript中处理异步操作的利器。它代表了一个异步操作的最终完成(或失败)及其结果值。一个Promise有三种状态:

  • Pending(待定):初始状态,既没有成功,也没有失败。
  • Fulfilled(已成功):操作成功完成。
  • Rejected(已失败):操作失败。

Promise通过.then().catch()方法链式调用,使得异步代码更易读、更易维护,避免了“回调地狱”。async/await语法糖更是将Promise的强大能力提升到了新的层次,让异步代码看起来像同步代码一样直观。

我们的调度器将围绕Promise来构建,因为大多数现代JavaScript异步操作都返回或可以封装成Promise。

调度器:资源与任务的智能管家

调度器是一个软件组件,负责管理和协调任务的执行。在并发控制的语境下,它有以下职责:

  1. 任务排队:当任务数量超过最大并发限制时,将多余的任务放入等待队列。
  2. 任务调度:当有资源(即并发槽位)可用时,从等待队列中取出任务并启动执行。
  3. 状态监控:跟踪当前正在运行的任务数量,以及等待队列中的任务数量。
  4. 结果收集与传递:确保每个任务的执行结果(成功或失败)能够正确地返回给调用者。
  5. 整体完成通知:提供机制让调用者知道所有任务何时全部完成。

一个设计良好的调度器,能够有效地平衡系统性能与资源利用,提升应用的健壮性和用户体验。

初探问题:无限制并发的陷阱

在构建调度器之前,我们先来看一个没有并发控制的典型场景,以及它可能带来的问题。

假设我们有一个需要从API获取大量用户数据的任务。每个用户的数据请求都是一个独立的异步操作。

// 模拟一个异步请求用户数据的函数
function fetchUserData(userId) {
    console.log(`[START] Fetching data for user: ${userId}`);
    return new Promise(resolve => {
        const delay = Math.random() * 2000 + 500; // 模拟网络延迟
        setTimeout(() => {
            console.log(`[END] Fetched data for user: ${userId}`);
            resolve({ id: userId, name: `User ${userId}`, data: `Some data for ${userId}` });
        }, delay);
    });
}

// 假设我们有10个用户需要获取数据
const userIds = Array.from({ length: 10 }, (_, i) => i + 1);

console.log("--- 开始无限制并发请求 ---");

// 使用Promise.all无限制地发起所有请求
Promise.all(userIds.map(id => fetchUserData(id)))
    .then(results => {
        console.log("--- 所有用户数据请求完成 (无限制) ---");
        // console.log(results);
    })
    .catch(error => {
        console.error("--- 某些请求失败 (无限制) ---", error);
    });

console.log("--- 请求已全部发起,等待结果 (无限制) ---");

问题分析:

当我们运行上述代码时,你会立即看到所有10个[START]日志几乎同时打印出来,这意味着所有的fetchUserData Promise都在同一时间被创建并开始执行。虽然JavaScript是单线程的,但它会立即将所有这些网络请求(在浏览器中可能是XHR或Fetch API,在Node.js中可能是http模块)发送出去。

如果userIds的数量是1000而不是10,会发生什么?

  • 浏览器环境:浏览器可能会在短时间内打开数百个甚至上千个TCP连接。这会消耗大量的内存和CPU资源,可能导致浏览器卡顿、崩溃,甚至影响其他网站的正常运行。同时,由于浏览器的同源策略和每个域名下的最大连接数限制,过多的并发请求可能会被浏览器内部限制,但其尝试发起的数量仍然是巨大的。
  • Node.js环境:Node.js进程会迅速创建大量网络请求,消耗大量文件描述符和内存。这可能导致进程性能急剧下降,甚至因为资源耗尽而崩溃。
  • 服务器端:服务器会同时处理这1000个请求,如果其处理能力有限,可能会导致响应变慢,甚至拒绝服务(DoS)。
  • API限流:如果fetchUserData是调用一个有速率限制的API,那么很大概率会因为短时间内请求过多而被限流,导致大量请求失败。

显然,这种“无限制并发”的方式在处理大量异步任务时是不可取的。我们需要一个机制来控制同时进行的任务数量。

构建基础调度器:核心逻辑与数据结构

现在,让我们开始构建一个能够限制并发数量的调度器。我们的目标是创建一个Scheduler类,它能够接收任务(Promise或返回Promise的函数),并确保在任何时候,最多只有指定数量的任务正在运行。

设计思路:队列、计数器与循环

一个基本的并发调度器需要以下核心组件和逻辑:

  1. 最大并发数 (maxConcurrency):一个数字,定义了同时运行的最大任务数量。
  2. 任务队列 (queue):一个数组或链表,用于存储等待执行的任务。当正在运行的任务数量达到maxConcurrency时,新任务会被添加到这个队列中。
  3. 运行中任务计数器 (runningCount):一个数字,跟踪当前正在执行的任务数量。
  4. 调度循环 (_runNext):一个内部方法,负责检查是否有空闲的并发槽位,并从队列中取出任务执行。这个方法会在任务完成(成功或失败)后被调用,以尝试启动下一个任务。

调度器组件:maxConcurrencyqueuerunningCount

我们来定义Scheduler类的基本结构。

class Scheduler {
    constructor(maxConcurrency) {
        if (typeof maxConcurrency !== 'number' || maxConcurrency <= 0) {
            throw new Error('maxConcurrency must be a positive number.');
        }
        this.maxConcurrency = maxConcurrency; // 最大并发数
        this.queue = [];                    // 等待执行的任务队列
        this.runningCount = 0;              // 当前正在运行的任务数
        this.totalTasks = 0;                // 记录总共添加了多少任务
        this.completedTasks = 0;            // 记录已经完成的任务数

        // 用于等待所有任务完成的Promise
        this._allTasksPromise = null;
        this._resolveAllTasks = null;
        this._rejectAllTasks = null;
    }

    // 添加一个任务到调度器
    add(taskFn) {
        // taskFn 必须是一个返回 Promise 的函数
        if (typeof taskFn !== 'function') {
            return Promise.reject(new Error('Task must be a function that returns a Promise.'));
        }

        // 创建一个Promise来代表这个任务的执行结果
        return new Promise((resolve, reject) => {
            // 将任务及其对应的resolve/reject函数封装成一个对象,放入队列
            this.queue.push({ taskFn, resolve, reject });
            this.totalTasks++;
            // 尝试运行下一个任务
            this._runNext();
        });
    }

    // 内部方法:尝试运行下一个任务
    _runNext() {
        // 只有当有空闲的并发槽位 并且 队列中有等待任务时才运行
        while (this.runningCount < this.maxConcurrency && this.queue.length > 0) {
            const { taskFn, resolve, reject } = this.queue.shift(); // 从队列头部取出一个任务

            this.runningCount++; // 增加正在运行的任务计数

            // 执行任务函数,并确保它返回一个Promise
            // 使用 Promise.resolve().then() 包装,以防 taskFn 不是严格返回 Promise
            Promise.resolve().then(() => taskFn())
                .then(result => {
                    resolve(result); // 任务成功,解决外部Promise
                })
                .catch(error => {
                    reject(error); // 任务失败,拒绝外部Promise
                })
                .finally(() => {
                    this.runningCount--;     // 任务完成(无论成功失败),减少运行中计数
                    this.completedTasks++;   // 增加已完成任务计数

                    // 检查是否所有任务都已完成
                    this._checkAllTasksCompletion();

                    // 递归调用_runNext,尝试启动下一个任务
                    // 这一步非常关键,它形成了调度循环
                    this._runNext();
                });
        }
    }

    // 检查是否所有任务都已完成,并解决_allTasksPromise
    _checkAllTasksCompletion() {
        if (this.totalTasks > 0 && this.completedTasks === this.totalTasks) {
            if (this._resolveAllTasks) {
                this._resolveAllTasks();
                // 清理,防止重复触发
                this._allTasksPromise = null;
                this._resolveAllTasks = null;
                this._rejectAllTasks = null;
            }
        }
    }

    // 等待所有任务完成
    waitAll() {
        if (!this._allTasksPromise) {
            this._allTasksPromise = new Promise((resolve, reject) => {
                this._resolveAllTasks = resolve;
                this._rejectAllTasks = reject;
            });
            // 如果所有任务已经完成(例如,在调用waitAll之前就已完成),则立即解决
            this._checkAllTasksCompletion();
        }
        return this._allTasksPromise;
    }
}

详细解释调度流程:任务入队、出队、执行、完成与递归调度

  1. constructor(maxConcurrency)

    • 初始化maxConcurrencyqueuerunningCount
    • totalTaskscompletedTasks用于跟踪所有任务的整体完成状态。
    • _allTasksPromise及其相关的_resolveAllTasks, _rejectAllTasks是为了实现waitAll()方法,允许我们等待调度器中所有任务的最终完成。
  2. add(taskFn)

    • 接收一个taskFn,它是一个返回Promise的函数。这是调度器管理的基本任务单元。
    • 立即返回一个Promise。这个Promise代表了taskFn的最终结果。调用者可以通过await scheduler.add(myTaskFn())scheduler.add(myTaskFn()).then(...)来获取单个任务的结果。
    • 创建一个内部Promise,并将其resolvereject函数与taskFn一起封装成一个对象{ taskFn, resolve, reject }。这个对象被推入queue数组。
    • totalTasks增加,记录新添加的任务。
    • 调用_runNext()尝试立即启动任务。
  3. _runNext():这是调度器的核心逻辑。

    • 条件检查while (this.runningCount < this.maxConcurrency && this.queue.length > 0)。这个循环会持续检查:
      • 当前正在运行的任务数量是否小于最大并发数。
      • 等待队列中是否还有任务。
      • 只有这两个条件都满足时,循环才会继续,意味着有空闲槽位且有任务可执行。
    • 任务出队const { taskFn, resolve, reject } = this.queue.shift(); 从队列头部取出一个任务。shift()方法是高效的,因为它直接从数组头部移除元素。
    • 增加计数this.runningCount++; 增加正在运行任务的计数。
    • 执行任务
      • Promise.resolve().then(() => taskFn()):这一步非常重要。它确保taskFn总是被当作一个Promise来处理,即使taskFn本身不是严格返回Promise(例如,它可能直接返回一个值,或者是一个同步函数)。Promise.resolve()将任何值封装成Promise。
      • .then(result => { resolve(result); }):当taskFn成功完成时,调用之前封装的resolve函数,从而解决add方法返回的那个Promise。
      • .catch(error => { reject(error); }):当taskFn失败时,调用之前封装的reject函数,拒绝add方法返回的那个Promise。
    • 任务完成后的清理与调度.finally(() => { ... })块会在任务无论成功或失败后都执行:
      • this.runningCount--;:减少正在运行任务的计数,释放一个并发槽位。
      • this.completedTasks++;:增加已完成任务的计数。
      • this._checkAllTasksCompletion();:检查是否所有任务都已完成,以解决waitAll()的Promise。
      • this._runNext();这是最关键的一步。任务完成后,会立即再次调用_runNext()。这意味着:
        • 如果队列中还有等待任务且有空闲槽位,它将立即启动下一个任务。
        • 如果队列为空或者没有空闲槽位,while循环的条件将不满足,_runNext函数会自然退出。
  4. _checkAllTasksCompletion()

    • 在每次任务完成时被调用。
    • 检查completedTasks是否等于totalTasks。如果相等,并且_allTasksPromise已经被创建,就调用_resolveAllTasks来解决整个调度器的waitAll() Promise。
  5. waitAll()

    • 返回一个Promise,这个Promise会在调度器中所有任务都完成时被解决。
    • 如果_allTasksPromise尚未创建,则创建一个新的Promise,并保存其resolve/reject回调。
    • 立即调用_checkAllTasksCompletion(),以防在waitAll()被调用之前所有任务已经完成。

代码示例:基础调度器运行

让我们使用这个Scheduler来重新运行之前的用户数据请求示例,但这次限制并发数为3。

// 模拟一个异步请求用户数据的函数
function fetchUserData(userId) {
    console.log(`[START] Fetching data for user: ${userId}, Running: ${scheduler.runningCount}`);
    return new Promise(resolve => {
        const delay = Math.random() * 2000 + 500; // 模拟网络延迟
        setTimeout(() => {
            console.log(`[END] Fetched data for user: ${userId}`);
            resolve({ id: userId, name: `User ${userId}`, data: `Some data for ${userId}` });
        }, delay);
    });
}

const userIds = Array.from({ length: 10 }, (_, i) => i + 1);
const MAX_CONCURRENCY = 3;
const scheduler = new Scheduler(MAX_CONCURRENCY);

console.log(`--- 开始并发限制请求 (并发数: ${MAX_CONCURRENCY}) ---`);

const taskPromises = userIds.map(id => {
    // 将 fetchUserData(id) 封装成一个函数,因为 add 方法需要一个函数
    return scheduler.add(() => fetchUserData(id));
});

// 等待所有任务完成
Promise.all(taskPromises)
    .then(results => {
        console.log("--- 所有用户数据请求完成 (并发限制) ---");
        // console.log(results);
    })
    .catch(error => {
        console.error("--- 某些请求失败 (并发限制) ---", error);
    });

// 或者使用调度器自带的 waitAll 方法
// scheduler.waitAll()
//     .then(() => {
//         console.log("--- 所有用户数据请求完成 (通过 scheduler.waitAll) ---");
//     })
//     .catch(error => {
//         console.error("--- 某些请求失败 (通过 scheduler.waitAll) ---", error);
//     });

console.log(`--- 请求已全部添加到调度器,等待结果 (并发数: ${MAX_CONCURRENCY}) ---`);

运行结果分析:

当你运行这段代码时,你会观察到以下现象:

  1. 最开始,只有3个[START]日志被打印出来(因为MAX_CONCURRENCY是3)。
  2. 当其中一个任务完成(打印[END])后,调度器会自动从队列中取出下一个任务并启动它,然后打印其[START]日志。
  3. 这个过程会一直持续,直到所有10个任务都完成。
  4. scheduler.runningCount会实时反映当前正在运行的任务数量,它将始终保持在maxConcurrency或更少。

这表明我们的基础调度器已经成功地实现了并发控制。

完善调度器:错误处理与整体完成

上面的基础调度器已经可以工作,但它还有一些可以改进的地方,使其更加健壮和易用。

add方法返回Promise:跟踪单个任务状态

我们当前add方法已经返回了一个Promise,这允许我们独立地跟踪每个任务的成功或失败。

// 示例:跟踪单个任务
scheduler.add(() => fetchUserData(11))
    .then(data => console.log(`Task 11 completed:`, data.id))
    .catch(error => console.error(`Task 11 failed:`, error));

处理单个任务失败:不影响整体运行

_runNext方法中,我们已经正确地使用了.then().catch().finally(),这意味着如果一个任务taskFn内部抛出错误或返回一个rejected的Promise,它只会拒绝add方法返回的那个Promise,而不会中断整个调度器的运行。其他正在运行或等待的任务将继续它们的生命周期。

等待所有任务完成:waitAll方法的设计与实现

我们已经为waitAll()方法设计了骨架,并通过_allTasksPromise_checkAllTasksCompletion进行管理。

让我们考虑一个场景:如果所有任务都成功,waitAll()应该解决;如果至少有一个任务失败,waitAll()应该拒绝。这与Promise.all的行为相似。

当前的_checkAllTasksCompletion只处理了成功的情况,没有考虑如何将失败的状态传递给waitAll()。为了实现Promise.all的行为,我们需要收集所有任务的结果和错误。但是,如果waitAll()只是简单地等待所有任务完成,而不关心具体是成功还是失败(即类似于Promise.allSettled或只关心Promise.all的成功路径),那么当前实现是没问题的,它在所有任务都finally之后解决。

如果waitAll需要像Promise.all一样,一旦有任务失败,它就立即拒绝,那么我们需要修改_checkAllTasksCompletion_runNext中的逻辑,捕获第一个失败的任务,并拒绝_allTasksPromise

为了保持调度器的通用性,通常waitAll只会等待所有任务最终完成(无论成功或失败),并且如果所有任务都成功,它就解决;如果至少有一个任务失败,它就拒绝。这需要一个数组来保存所有任务的Promise,然后对这个数组调用Promise.all

改进waitAll方法:

与其在调度器内部维护一个复杂的_allTasksPromise来模仿Promise.all,不如让调度器专注于任务的调度,而让调用者负责聚合结果。add方法已经返回了每个任务的Promise。调用者可以收集这些Promise并自己使用Promise.allPromise.allSettled

但是,如果调度器本身需要提供一个等待所有任务完成的入口,那么它需要知道所有任务的Promise。

让我们修改Scheduler,使其能够更自然地与Promise.all结合,或者提供一个更明确的waitAll行为。

方案一:调度器只负责调度,聚合交给外部

这是最简洁的方式。add方法返回任务Promise,外部收集后用Promise.all

// ... Scheduler class as defined before ...

// 外部调用:
const taskPromises = userIds.map(id => {
    return scheduler.add(() => fetchUserData(id));
});

Promise.all(taskPromises) // 外部负责 Promise.all
    .then(results => { /* ... */ })
    .catch(error => { /* ... */ });

这种方式清晰地分离了关注点:调度器负责并发控制,外部负责结果聚合。这是推荐的做法。

方案二:调度器内部实现类似Promise.allwaitAll

如果我们真的需要在Scheduler内部实现一个能像Promise.all一样,一旦有任务失败就立即拒绝的waitAll,我们需要:

  1. add方法中,除了将任务推入队列,还要将返回的Promise推入一个内部数组。
  2. waitAll方法就直接对这个内部数组调用Promise.all

这将使得Scheduler变得稍微复杂,因为它需要维护所有任务的Promise引用。

class Scheduler {
    constructor(maxConcurrency) {
        // ... (同前) ...
        this.allAddedTaskPromises = []; // 存储所有通过add添加的任务的Promise
        this.hasFailedTask = false;     // 标记是否有任务失败
    }

    add(taskFn) {
        if (typeof taskFn !== 'function') {
            return Promise.reject(new Error('Task must be a function that returns a Promise.'));
        }

        const taskPromise = new Promise((resolve, reject) => {
            this.queue.push({ taskFn, resolve, reject });
            this.totalTasks++;
            this._runNext();
        });

        // 将每个任务的Promise添加到列表中
        this.allAddedTaskPromises.push(taskPromise);

        // 监听每个任务的失败,更新hasFailedTask标记
        taskPromise.catch(() => {
            this.hasFailedTask = true;
        });

        return taskPromise;
    }

    _runNext() {
        while (this.runningCount < this.maxConcurrency && this.queue.length > 0) {
            const { taskFn, resolve, reject } = this.queue.shift();

            this.runningCount++;

            Promise.resolve().then(() => taskFn())
                .then(result => {
                    resolve(result);
                })
                .catch(error => {
                    reject(error);
                })
                .finally(() => {
                    this.runningCount--;
                    this.completedTasks++;
                    // 不需要在这里调用 _checkAllTasksCompletion 了,因为 waitAll 将使用 Promise.all
                    this._runNext();
                });
        }
    }

    // 等待所有任务完成,行为类似 Promise.all
    waitAll() {
        // 如果还没有添加任何任务,直接返回一个已解决的Promise
        if (this.allAddedTaskPromises.length === 0) {
            return Promise.resolve([]);
        }
        // 使用 Promise.all 来等待所有任务的完成
        return Promise.all(this.allAddedTaskPromises);
    }
}

这个修改后的SchedulerwaitAll()方法就具有了Promise.all的语义:所有任务都成功则解决,任何一个任务失败则拒绝。

为什么这个版本更好?

  1. 语义清晰waitAll直接映射到Promise.all的功能,易于理解。
  2. 简单实现:调度器不需要自己管理_allTasksPromiseresolve/reject逻辑,而是利用了Promise.all的内置能力。
  3. 健壮性Promise.all本身就考虑了所有任务的成功和失败情况。

使用示例:

// ... (Scheduler 类定义如上) ...

const userIds = Array.from({ length: 10 }, (_, i) => i + 1);
const MAX_CONCURRENCY = 3;
const scheduler = new Scheduler(MAX_CONCURRENCY);

console.log(`--- 开始并发限制请求 (并发数: ${MAX_CONCURRENCY}) ---`);

userIds.forEach(id => {
    scheduler.add(() => fetchUserData(id));
});

scheduler.waitAll()
    .then(results => {
        console.log("--- 所有用户数据请求完成 (通过 scheduler.waitAll) ---");
        // console.log(results); // 包含所有任务的成功结果
    })
    .catch(error => {
        console.error("--- 某些请求失败 (通过 scheduler.waitAll) ---", error);
        // 如果有一个任务失败,这里会捕获到第一个失败任务的错误
    });

console.log(`--- 请求已全部添加到调度器,等待结果 (并发数: ${MAX_CONCURRENCY}) ---`);

// 模拟一个失败的任务
const failingScheduler = new Scheduler(2);
failingScheduler.add(() => Promise.resolve('Success 1'));
failingScheduler.add(() => Promise.reject('Error during task 2'));
failingScheduler.add(() => Promise.resolve('Success 3'));

failingScheduler.waitAll()
    .then(results => console.log('Failing scheduler all tasks success:', results))
    .catch(error => console.error('Failing scheduler encountered an error:', error)); // 会捕获到 'Error during task 2'

通过这个改进,我们的调度器不仅能控制并发,还能方便地等待所有任务的最终完成状态。

高级特性与生产级考量

为了让我们的调度器更加强大和适应生产环境,我们可以考虑添加一些高级特性。

1. 任务优先级

有时,并不是所有任务都具有相同的紧急程度。例如,在一个图像处理队列中,用户上传的图片可能比系统后台自动处理的图片优先级更高。

实现任务优先级可以通过改变queue的数据结构来实现:

  • 多队列:为不同优先级设置独立的队列,_runNext优先从高优先级队列中取任务。
  • 优先级队列(Min-heap/Max-heap):使用堆结构来存储任务,每次_runNext直接取出优先级最高的任务。这需要实现一个堆数据结构,或者利用外部库。
  • 排序队列:每次add任务时,根据优先级插入到队列的正确位置,保持队列有序。或者在_runNext每次shift之前,对queue进行排序(性能开销较大,不推荐)。

基于排序的简单优先级实现(仅作示例,性能一般):

class PriorityScheduler extends Scheduler {
    constructor(maxConcurrency) {
        super(maxConcurrency);
    }

    add(taskFn, priority = 0) { // 增加 priority 参数,数字越大优先级越高
        if (typeof taskFn !== 'function') {
            return Promise.reject(new Error('Task must be a function that returns a Promise.'));
        }

        const taskPromise = new Promise((resolve, reject) => {
            const taskItem = { taskFn, resolve, reject, priority };
            // 根据优先级插入到队列的正确位置
            let inserted = false;
            for (let i = 0; i < this.queue.length; i++) {
                if (taskItem.priority > this.queue[i].priority) {
                    this.queue.splice(i, 0, taskItem); // 插入到当前元素之前
                    inserted = true;
                    break;
                }
            }
            if (!inserted) {
                this.queue.push(taskItem); // 如果优先级最低,则添加到末尾
            }

            this.totalTasks++;
            this._runNext();
        });

        this.allAddedTaskPromises.push(taskPromise);
        taskPromise.catch(() => { this.hasFailedTask = true; });

        return taskPromise;
    }
    // _runNext 和 waitAll 保持不变,因为 _runNext 总是从 queue.shift() 取任务,
    // 而 queue 已经通过 add 方法维护了优先级顺序。
}

// 示例:
const priorityScheduler = new PriorityScheduler(2);
priorityScheduler.add(() => new Promise(res => setTimeout(() => { console.log('Task A (P0)'); res('A'); }, 1000)), 0);
priorityScheduler.add(() => new Promise(res => setTimeout(() => { console.log('Task B (P1)'); res('B'); }, 500)), 1);
priorityScheduler.add(() => new Promise(res => setTimeout(() => { console.log('Task C (P0)'); res('C'); }, 1200)), 0);
priorityScheduler.add(() => new Promise(res => setTimeout(() => { console.log('Task D (P2)'); res('D'); }, 200)), 2); // 最高优先级

priorityScheduler.waitAll().then(results => console.log('Priority tasks completed:', results));
// 预期输出顺序:D, B, A/C (取决于A和C的完成时间)

2. 任务取消

有时用户可能不再需要一个等待中的任务(例如,切换页面后不再需要之前的API请求)。取消一个任务意味着:

  • 如果任务还在队列中等待,将其从队列中移除。
  • 如果任务正在执行,尝试中断它(这通常取决于任务本身是否支持取消,例如AbortController)。

实现取消通常需要为每个任务添加一个唯一的ID,并在add方法返回的Promise上添加一个cancel方法,或者提供一个独立的cancelTask(taskId)方法。

// 简化的取消示例 (只取消队列中的任务)
class CancellableScheduler extends Scheduler {
    constructor(maxConcurrency) {
        super(maxConcurrency);
        this.nextTaskId = 0;
    }

    add(taskFn) {
        const taskId = this.nextTaskId++;
        const taskPromise = new Promise((resolve, reject) => {
            const taskItem = { taskFn, resolve, reject, taskId };
            this.queue.push(taskItem);
            this.totalTasks++;
            this._runNext();
        });

        this.allAddedTaskPromises.push(taskPromise);
        taskPromise.catch(() => { this.hasFailedTask = true; });

        // 增强 Promise,使其支持取消
        const cancellablePromise = Object.assign(taskPromise, {
            cancel: () => {
                const index = this.queue.findIndex(item => item.taskId === taskId);
                if (index !== -1) {
                    this.queue.splice(index, 1);
                    // 标记为取消,不计入完成计数,或计入取消计数
                    this.totalTasks--; // 如果取消,不应算作 totalTasks
                    console.log(`Task ${taskId} cancelled from queue.`);
                    // 拒绝这个Promise,表明它被取消
                    reject(new Error(`Task ${taskId} cancelled.`));
                    return true;
                }
                // 如果任务已在运行或已完成,则无法取消队列中的状态
                return false;
            }
        });
        return cancellablePromise;
    }
}

// 示例
const cancellableScheduler = new CancellableScheduler(1);
const task1 = cancellableScheduler.add(() => new Promise(res => setTimeout(() => { console.log('Task 1 done'); res(1); }, 2000)));
const task2 = cancellableScheduler.add(() => new Promise(res => setTimeout(() => { console.log('Task 2 done'); res(2); }, 1000)));
const task3 = cancellableScheduler.add(() => new Promise(res => setTimeout(() => { console.log('Task 3 done'); res(3); }, 500)));

// task1 正在运行
// task2, task3 在队列中等待

setTimeout(() => {
    task3.cancel(); // 应该能取消 task3
}, 100);

cancellableScheduler.waitAll().then(results => console.log('Cancellable tasks completed:', results))
    .catch(err => console.error('Cancellable tasks error:', err));

3. 任务超时与重试

  • 超时:每个任务可以设置一个最大执行时间。如果任务超过这个时间仍未完成,则自动将其标记为失败。这可以通过Promise.race与一个带有setTimeout的Promise实现。
  • 重试:如果任务失败,调度器可以尝试重新执行它几次。这需要跟踪每个任务的重试次数,并在catch块中根据重试策略决定是重试还是最终标记为失败。
// 任务超时和重试 (只提供思路,不完全实现到Scheduler中,因为它会增加Scheduler的复杂性)

// 任务超时函数
function withTimeout(promiseFn, timeout) {
    return new Promise((resolve, reject) => {
        const timeoutId = setTimeout(() => {
            reject(new Error(`Task timed out after ${timeout}ms`));
        }, timeout);

        promiseFn().then(result => {
            clearTimeout(timeoutId);
            resolve(result);
        }).catch(error => {
            clearTimeout(timeoutId);
            reject(error);
        });
    });
}

// 任务重试函数
function withRetry(promiseFn, retries = 3, delay = 100) {
    return new Promise((resolve, reject) => {
        const attempt = (count) => {
            promiseFn()
                .then(resolve)
                .catch(error => {
                    if (count < retries) {
                        console.warn(`Retrying task (attempt ${count + 1}/${retries})...`);
                        setTimeout(() => attempt(count + 1), delay);
                    } else {
                        reject(new Error(`Task failed after ${retries} retries: ${error.message}`));
                    }
                });
        };
        attempt(0);
    });
}

// 在 Scheduler.add 中使用:
// scheduler.add(() => withRetry(() => withTimeout(() => fetchUserData(id), 5000), 2, 200));

4. 事件通知

调度器可以通过EventEmitter模式,在关键生命周期事件(如任务开始、任务完成、任务失败、调度器空闲等)触发事件,允许外部系统监听和响应这些事件。

const EventEmitter = require('events'); // Node.js
// 或者自定义一个简单的 EventEmitter 类

class EventfulScheduler extends Scheduler {
    constructor(maxConcurrency) {
        super(maxConcurrency);
        this.events = new EventEmitter();
    }

    add(taskFn) {
        // ... (同 Scheduler.add) ...
        const taskPromise = new Promise((resolve, reject) => {
            const taskId = this.nextTaskId++; // 假设有 taskId
            const taskItem = { taskFn, resolve, reject, taskId };
            this.queue.push(taskItem);
            this.totalTasks++;
            this.events.emit('taskAdded', taskId); // 任务添加事件
            this._runNext();
        });
        // ... (同 Scheduler.add) ...
        return taskPromise;
    }

    _runNext() {
        while (this.runningCount < this.maxConcurrency && this.queue.length > 0) {
            const { taskFn, resolve, reject, taskId } = this.queue.shift(); // 假设有 taskId

            this.runningCount++;
            this.events.emit('taskStarted', taskId); // 任务开始事件

            Promise.resolve().then(() => taskFn())
                .then(result => {
                    resolve(result);
                    this.events.emit('taskCompleted', taskId, result); // 任务完成事件
                })
                .catch(error => {
                    reject(error);
                    this.events.emit('taskFailed', taskId, error); // 任务失败事件
                })
                .finally(() => {
                    this.runningCount--;
                    this.completedTasks++;
                    this._runNext();
                    if (this.runningCount === 0 && this.queue.length === 0 && this.completedTasks === this.totalTasks) {
                        this.events.emit('idle'); // 调度器空闲事件
                    }
                });
        }
    }
}

// 示例
const eventfulScheduler = new EventfulScheduler(2);
eventfulScheduler.events.on('taskStarted', taskId => console.log(`Event: Task ${taskId} started.`));
eventfulScheduler.events.on('taskCompleted', (taskId, result) => console.log(`Event: Task ${taskId} completed with result: ${result}`));
eventfulScheduler.events.on('taskFailed', (taskId, error) => console.error(`Event: Task ${taskId} failed with error: ${error.message}`));
eventfulScheduler.events.on('idle', () => console.log('Event: Scheduler is idle.'));

eventfulScheduler.add(() => new Promise(res => setTimeout(() => res('T1'), 1000)));
eventfulScheduler.add(() => new Promise(res => setTimeout(() => res('T2'), 800)));
eventfulScheduler.add(() => new Promise((_, rej) => setTimeout(() => rej(new Error('T3 Error')), 1500)));

5. 灵活的任务类型

目前我们的add方法要求传入一个返回Promise的函数。在实际应用中,我们可能希望更灵活,例如直接传入一个Promise实例,或者一个async函数,甚至一个同步函数。调度器可以内部进行适配。

// 在 add 方法中进行适配
// function add(task) {
//     let taskFn;
//     if (typeof task === 'function') {
//         // 已经是函数,可能是返回Promise的函数或普通函数
//         taskFn = task;
//     } else if (task instanceof Promise) {
//         // 如果直接传入Promise,则包装成一个返回该Promise的函数
//         taskFn = () => task;
//     } else {
//         // 其他类型,例如直接一个值,也包装成返回Promise的函数
//         taskFn = () => Promise.resolve(task);
//     }
//     // ... 后续处理 taskFn ...
// }

对比与选择:Promise.all与并发调度器的适用场景

特性/场景 Promise.all 并发调度器(Scheduler)
并发控制 无限制,所有Promise同时开始执行 有明确限制,由maxConcurrency决定
适用任务量 少量或中等数量的任务(几十个),且资源充足 大量任务(几百、几千个),需要精细控制资源使用
资源消耗 可能导致短时高CPU/内存/网络连接消耗 平稳的资源消耗,避免系统过载
API限流 极易触发API限流 有效避免API限流,可根据API限制调整maxConcurrency
错误处理 任意一个Promise拒绝,Promise.all立即拒绝 单个任务失败不影响其他任务,waitAll行为可配置(类似allallSettled
任务排队 无排队机制,所有任务立即发起 内置排队机制,等待队列中的任务按序执行
任务优先级 不支持 可扩展支持,通过修改队列数据结构实现
任务取消 不支持 可扩展支持,取消等待中的任务或信号中断运行中的任务
复杂性 简单易用,内置于JS标准库 需要手写实现,引入额外的类和逻辑
主要应用场景 同时等待几个不相关联的异步操作完成 批量处理、数据抓取、文件上传/下载、API限流、资源密集型任务

何时选择?

  • 如果您需要同时启动的异步操作数量不多(例如,少于10个),并且这些操作对系统资源消耗不大,那么Promise.all通常是更简洁、更直接的选择。
  • 如果您面对的是数百甚至数千个异步操作,或者这些操作非常耗费资源,或者您需要严格遵守API速率限制,那么一个并发调度器是必不可少的。它能帮助您构建更稳定、更高效、更具容错性的应用。

实战应用场景与面试技巧

并发控制调度器不仅仅是一个技术难题,更是一个在实际开发中广泛应用的模式。

实战应用场景

  1. API限流与批量请求:最经典的场景。当需要从第三方服务获取大量数据时(如爬虫、数据同步),使用调度器可以确保在不违反API速率限制的前提下,高效地完成所有请求。
  2. 文件上传/下载:在批量上传或下载文件时,限制同时进行的上传/下载任务数量,可以避免网络拥堵,提高用户体验,并减少服务器压力。
  3. 图片懒加载与预加载:可以调度图片的加载顺序和数量,优先加载视口内的图片,同时限制并发加载数量,优化页面性能。
  4. 数据抓取与爬虫:这是调度器发挥巨大作用的领域。通过限制并发请求,可以模拟用户行为,避免被目标网站识别为恶意爬虫而被封禁IP。同时,可以结合代理池、重试机制等,构建强大的爬虫系统。
  5. 离线数据同步:在离线应用中,当网络恢复时,可能需要将本地积累的大量数据同步到服务器。调度器可以控制同步任务的数量,保证数据传输的稳定性。
  6. 后台任务处理:在Node.js后端服务中,可能有很多CPU密集型或IO密集型的后台任务需要处理。调度器可以限制同时运行的任务数量,防止服务过载。

面试官视角:考察点深度解析

当大厂面试官提出“实现一个并发控制调度器”时,他们通常会考察以下几个核心能力:

  1. 对JavaScript异步编程的理解

    • 是否熟悉Promise的生命周期、then/catch/finallyasync/await
    • 是否理解事件循环机制,以及JavaScript的单线程模型如何实现并发。
    • 能否正确处理Promise的链式调用和错误冒泡。
  2. 数据结构与算法的应用

    • 能否想到使用队列(数组)来存储等待任务。
    • 如何高效地从队列中添加和移除任务(pushshift)。
    • 如果涉及到优先级,能否考虑优先级队列(堆)或有序数组的实现。
  3. 异常处理与健壮性

    • 单个任务失败是否会影响其他任务的执行(不应影响)。
    • 整个调度器是否能捕获并传递所有任务的错误。
    • 如何处理add方法传入非函数的情况。
    • maxConcurrency为0或负数时的处理(应抛出错误)。
  4. 系统设计与可扩展性

    • 代码结构是否清晰,模块化程度如何。
    • Scheduler类是否职责单一,易于理解和维护。
    • 是否考虑了未来可能的功能扩展(如优先级、取消、重试、事件通知)。
    • waitAll方法的设计是否合理,是等待所有完成还是像Promise.all一样一旦失败就拒绝。
  5. 代码的严谨性与细节

    • 变量命名是否规范,代码风格是否一致。
    • 是否有不必要的内存泄漏(例如,是否及时清理已完成任务的引用)。
    • _runNext中的循环和递归调用是否正确,不会导致栈溢出(在JS事件循环中,异步递归不会导致栈溢出)。

在面试中,除了实现基本功能,主动提出并讨论这些高级特性和边缘情况,将极大地加深面试官对你能力的印象。这表明你不仅能解决问题,还能预见问题、优化方案。

理解并发控制的精髓

并发控制调度器是现代异步编程中不可或缺的工具。它不仅是解决实际应用中资源管理和性能瓶颈的关键,更是衡量一个开发者对异步编程、系统设计和问题解决能力深度的试金石。通过手写实现它,我们不仅掌握了一个强大的模式,更深入理解了JavaScript异步机制的本质和如何构建健壮可扩展的系统。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注