手写实现一个支持多 Promise 并发限制的调度器:基于信号量(Semaphore)的算法控制

各位同仁,各位技术爱好者,大家好。

在现代异步编程中,我们经常会遇到这样的场景:需要处理大量的异步任务,例如并发请求多个API接口、批量上传文件、处理图片或视频、执行数据抓取任务等等。这些任务虽然是异步的,但它们并非总是可以无限制地并发执行。过度的并发可能导致资源耗尽(如内存、CPU)、网络拥堵、API限流,甚至服务崩溃。此时,我们需要一种机制来有效地调度这些异步任务,限制其并发数量,以确保系统的稳定性和资源的合理利用。

今天,我们将深入探讨如何手写实现一个支持多 Promise 并发限制的调度器,并以信号量(Semaphore)作为核心的算法控制机制。我们将从信号量的基本概念出发,逐步构建我们的调度器,并详细解析其内部运作原理。

1. 异步任务与并发控制的必要性

JavaScript,作为一门单线程语言,其并发模型主要依赖于事件循环(Event Loop)和异步非阻塞I/O。这意味着虽然JavaScript引擎本身一次只能执行一个操作,但它可以发起多个异步操作(如网络请求、定时器、文件读写),并在这些操作完成后,将相应的回调函数放入任务队列,等待事件循环执行。

Promise 是 JavaScript 中处理异步操作的强大工具,它代表一个异步操作的最终完成(或失败)及其结果值。async/await 语法糖进一步简化了 Promise 的使用,使异步代码看起来更像是同步代码,提高了可读性。

然而,当我们需要处理成百上千个 Promise 任务时,简单地使用 Promise.all 或直接启动所有任务,可能会带来以下问题:

  1. 资源耗尽:例如,同时打开过多的网络连接,可能耗尽操作系统的文件描述符或内存。
  2. API限流:许多第三方API都有严格的请求速率限制。如果短时间内发送大量请求,可能会被服务器拒绝服务。
  3. 性能下降:过多的并发任务切换上下文,反而可能导致整体性能下降,而不是提升。
  4. 系统不稳定:高负载可能使服务变得不稳定,甚至崩溃。

因此,我们需要一种智能的调度器,能够接收任意数量的异步任务,但只允许其中一部分任务同时运行,当有任务完成时,再从等待队列中取出新的任务启动,从而实现对并发度的精确控制。

2. 信号量(Semaphore)原理详解

在并发编程领域,信号量(Semaphore)是一种非常经典的同步原语,由荷兰计算机科学家 Edsger W. Dijkstra 于1960年代提出。它是一个计数器,用于控制对共享资源的访问。

2.1 信号量的核心思想

想象一个停车场,它有固定数量的停车位。当一辆车进入停车场时,它会占用一个停车位;当一辆车离开时,它会释放一个停车位。如果所有停车位都被占满了,新的车辆就必须在入口处等待,直到有车辆离开。

信号量的工作原理与此类似。它维护一个内部计数器,代表可用资源的数量。

2.2 信号量的关键操作

信号量主要提供两个原子操作:

  1. acquire (P 操作或 wait 操作)
    • 尝试获取一个资源(或一个许可)。
    • 如果计数器大于零,则将其减一,并允许当前操作继续执行。
    • 如果计数器等于零,则表示没有可用资源,当前操作必须阻塞(或等待),直到有资源被释放。
  2. release (V 操作或 signal 操作)
    • 释放一个资源(或一个许可)。
    • 将计数器加一。
    • 如果当前有其他操作因等待资源而阻塞,则选择其中一个使其解除阻塞,允许其获取资源并继续执行。

2.3 信号量的分类

  • 二值信号量(Binary Semaphore):计数器只能是0或1,常用于实现互斥锁(Mutex),确保只有一个线程/进程能访问临界区。
  • 计数信号量(Counting Semaphore):计数器可以是一个任意的非负整数,用于控制对具有N个相同资源的访问。这正是我们实现并发限制调度器所需的类型。

在我们的 Promise 调度器中,计数信号量将用于限制同时运行的 Promise 任务数量。每一个正在运行的 Promise 任务会“占用”一个许可(acquire),任务完成后会“释放”一个许可(release)。

3. 设计我们的 Semaphore

为了使我们的调度器模块化且易于理解,我们首先实现一个独立的 Semaphore 类。这个类将封装信号量的核心逻辑。

3.1 Semaphore 类结构设计

我们的 Semaphore 类需要包含以下几个关键部分:

  • _capacity: 信号量的最大容量,即允许同时持有的许可数量。
  • _currentCount: 当前可用的许可数量。初始时等于 _capacity
  • _waiters: 一个队列,用于存储那些因为没有可用许可而被阻塞的 Promise 的 resolve 函数。当许可被释放时,我们会从这个队列中取出最前面的 resolve 函数并调用它,从而解除一个等待者的阻塞状态。

3.2 Semaphore 类的代码实现

/**
 * @class Semaphore
 * @description 计数信号量实现,用于控制并发访问。
 */
class Semaphore {
    /**
     * 创建一个 Semaphore 实例。
     * @param {number} capacity - 信号量的最大容量,即允许同时持有的许可数量。
     *                          必须是非负整数。
     * @throws {Error} 如果 capacity 小于 0 或不是整数。
     */
    constructor(capacity) {
        if (typeof capacity !== 'number' || capacity < 0 || !Number.isInteger(capacity)) {
            throw new Error('Semaphore capacity must be a non-negative integer.');
        }

        this._capacity = capacity;
        this._currentCount = capacity; // 当前可用的许可数量,初始时等于最大容量
        this._waiters = []; // 等待队列,存储因无法获取许可而阻塞的Promise的resolve函数
    }

    /**
     * 尝试获取一个许可。
     * 如果当前有可用许可,则立即获取并返回一个已解决的Promise。
     * 如果没有可用许可,则当前操作将阻塞(通过返回一个未解决的Promise),
     * 直到有许可被释放并分配给它。
     * @returns {Promise<void>} 一个Promise,当成功获取到许可时解决。
     */
    acquire() {
        // 如果当前有可用许可
        if (this._currentCount > 0) {
            this._currentCount--; // 减少可用许可数量
            // 立即返回一个已解决的Promise,表示成功获取
            return Promise.resolve();
        } else {
            // 如果没有可用许可,则创建一个新的Promise,并将其resolve函数加入等待队列
            // 当许可被释放时,这个Promise会被解决
            return new Promise(resolve => {
                this._waiters.push(resolve);
            });
        }
    }

    /**
     * 释放一个许可。
     * 这会增加可用许可的数量。如果等待队列中有等待者,
     * 则会从队列中取出第一个等待者的Promise并解决它,从而允许它继续执行。
     */
    release() {
        // 增加可用许可数量,但不能超过最大容量
        if (this._currentCount < this._capacity) {
            this._currentCount++;
        }

        // 如果等待队列中有等待者,并且当前有可用许可(虽然我们刚刚增加了,但这是为了确保逻辑严谨性)
        // 实际上,只要有等待者,且_currentCount > 0,就应该唤醒
        if (this._waiters.length > 0 && this._currentCount > 0) {
            const resolve = this._waiters.shift(); // 取出最早的等待者
            resolve(); // 解决其Promise,使其解除阻塞
        }
    }

    /**
     * 获取当前可用的许可数量。
     * @returns {number}
     */
    get currentCount() {
        return this._currentCount;
    }

    /**
     * 获取信号量的最大容量。
     * @returns {number}
     */
    get capacity() {
        return this._capacity;
    }

    /**
     * 获取当前等待的Promise数量。
     * @returns {number}
     */
    get pendingWaiters() {
        return this._waiters.length;
    }
}

3.3 Semaphore 类代码解析

  1. constructor(capacity):

    • 初始化 _capacity 为传入的并发限制数。
    • _currentCount 初始化为 _capacity,表示初始时所有许可都可用。
    • _waiters 是一个数组,用于存储 acquire 调用时因无许可而阻塞的 Promise 的 resolve 函数。
    • 进行了严格的参数类型和值校验,确保 capacity 是一个非负整数。
  2. acquire():

    • 这是获取许可的核心方法。
    • 如果 _currentCount > 0,说明有可用许可,则 _currentCount 减一,并立即返回一个 Promise.resolve()。这意味着调用者可以立即执行后续操作。
    • 如果 _currentCount 为0,说明没有可用许可。此时,我们创建一个新的 Promise,并将其 resolve 函数推入 _waiters 队列。这个 Promise 会在后续 release 被调用时解决,从而解除当前 acquire 调用的阻塞。
  3. release():

    • 这是释放许可的核心方法。
    • _currentCount 加一,但会检查是否超过 _capacity,以防止不合理的释放导致计数器溢出。
    • 如果 _waiters 队列不为空,并且 _currentCount 大于0(即至少有一个许可可用),则从 _waiters 队列中取出最前面的 resolve 函数并调用它。这会解决之前被阻塞的 acquire 调用所返回的 Promise,允许一个等待中的任务继续执行。

通过这个 Semaphore 类,我们有了一个强大的工具来控制任意数量的“东西”的并发访问。接下来,我们将它集成到我们的 Promise 调度器中。

4. 设计 PromiseScheduler

现在我们有了信号量,可以开始构建我们的 PromiseScheduler 了。这个调度器将负责接收 Promise 生成函数,并将它们排队,然后利用信号量按顺序执行它们,同时遵守并发限制。

4.1 PromiseScheduler 类结构设计

我们的 PromiseScheduler 需要实现以下功能:

  • 初始化:设置最大并发数。
  • 添加任务:接收一个函数,该函数在执行时返回一个 Promise。
  • 启动调度:开始处理等待队列中的任务。
  • 任务管理:内部维护一个任务队列和当前正在运行的任务计数。

为了实现这些功能,PromiseScheduler 类将包含以下成员:

  • _semaphore: 一个 Semaphore 实例,用于控制实际的并发限制。
  • _taskQueue: 一个数组,存储待执行的任务函数。每个任务函数都应该返回一个 Promise。
  • _runningTasks: 一个 Set 或 Map,用于跟踪当前正在运行的 Promise 实例,以便我们可以知道它们何时完成。
  • _isStarted: 一个布尔值,指示调度器是否已经开始运行。
  • _results: 一个数组,用于收集所有任务的结果。
  • _onIdleCallbacks: 一个数组,存储当所有任务完成时需要调用的回调函数。

4.2 PromiseScheduler 类的代码实现

/**
 * @class PromiseScheduler
 * @description 支持并发限制的 Promise 调度器。
 *              使用 Semaphore 来控制同时运行的 Promise 数量。
 */
class PromiseScheduler {
    /**
     * 创建一个 PromiseScheduler 实例。
     * @param {number} maxConcurrency - 最大并发数,即同时运行的 Promise 数量上限。
     * @throws {Error} 如果 maxConcurrency 小于 1 或不是整数。
     */
    constructor(maxConcurrency) {
        if (typeof maxConcurrency !== 'number' || maxConcurrency < 1 || !Number.isInteger(maxConcurrency)) {
            throw new Error('maxConcurrency must be a positive integer.');
        }

        this._semaphore = new Semaphore(maxConcurrency); // 核心:使用信号量控制并发
        this._taskQueue = []; // 任务队列,存储待执行的 Promise 生成函数
        this._runningTasks = new Set(); // 存储当前正在运行的 Promise 实例
        this._isStarted = false; // 调度器是否已启动
        this._results = []; // 存储所有任务的结果
        this._onIdleCallbacks = []; // 当调度器空闲时(所有任务完成)调用的回调函数
        this._totalTasksAdded = 0; // 记录总共添加了多少任务
        this._totalTasksCompleted = 0; // 记录总共完成了多少任务
    }

    /**
     * 添加一个 Promise 生成函数到调度器。
     * 这个函数应该返回一个 Promise。
     * @param {Function} taskFn - 一个无参数函数,调用时返回一个 Promise。
     * @returns {Promise<any>} 一个 Promise,当任务完成时解决,返回任务的结果。
     */
    addTask(taskFn) {
        if (typeof taskFn !== 'function') {
            throw new Error('Task must be a function that returns a Promise.');
        }

        this._totalTasksAdded++;
        // 返回一个新的 Promise,它的解决/拒绝状态将与实际任务Promise的解决/拒绝状态同步。
        // 这样,外部调用者可以 await addTask() 并获得最终结果。
        return new Promise((resolve, reject) => {
            // 将包含原始任务函数和其对应的 resolve/reject 的对象放入队列
            this._taskQueue.push({ taskFn, resolve, reject, id: this._totalTasksAdded });
            // 如果调度器已经启动,尝试立即处理任务。
            // 否则,任务会在 start() 被调用时处理。
            if (this._isStarted) {
                this._processNextTask();
            }
        });
    }

    /**
     * 启动调度器。
     * 如果调度器已经启动,则不执行任何操作。
     * @returns {Promise<any[]>} 一个Promise,当所有任务完成时解决,返回所有任务的结果数组。
     */
    start() {
        if (this._isStarted) {
            console.warn('Scheduler already started.');
            return Promise.resolve(this._results); // 或者返回一个表示已完成的Promise
        }
        this._isStarted = true;

        // 如果任务队列不为空,则开始处理任务
        if (this._taskQueue.length > 0) {
            // 尝试填充到最大并发数
            for (let i = 0; i < this._semaphore.capacity; i++) {
                this._processNextTask();
            }
        }

        // 返回一个Promise,当所有任务完成且调度器空闲时解决
        return new Promise(resolve => {
            this._onIdleCallbacks.push(resolve);
            // 如果所有任务已经完成,立即解决
            if (this._totalTasksAdded === this._totalTasksCompleted && this._runningTasks.size === 0) {
                this._notifyIdle();
            }
        });
    }

    /**
     * 暂停调度器。
     * 已经运行的任务会继续运行直到完成,但不会再从队列中取出新任务。
     */
    pause() {
        this._isStarted = false;
        console.log('Scheduler paused. No new tasks will be picked from queue.');
    }

    /**
     * 停止调度器。
     * 暂停并清空任务队列。已经运行的任务会继续运行直到完成。
     * @returns {Promise<void>} 一个Promise,当所有运行中的任务完成时解决。
     */
    async stop() {
        this.pause(); // 暂停新任务的调度
        this._taskQueue = []; // 清空待处理任务队列
        console.log('Scheduler stopped. Task queue cleared.');

        // 等待所有正在运行的任务完成
        if (this._runningTasks.size > 0) {
            console.log(`Waiting for ${this._runningTasks.size} running tasks to complete...`);
            await Promise.allSettled(Array.from(this._runningTasks));
        }
        console.log('All running tasks completed after stop.');
        this._notifyIdle(); // 即使停止了,也需要通知可能在等待的 start() Promise
    }

    /**
     * 内部方法:尝试处理下一个任务。
     * 只有当调度器已启动且任务队列不为空时才执行。
     */
    async _processNextTask() {
        if (!this._isStarted || this._taskQueue.length === 0) {
            return;
        }

        // 尝试获取信号量许可。如果没有可用许可,此 await 会阻塞。
        await this._semaphore.acquire();

        // 成功获取许可后,从队列中取出一个任务
        const taskEntry = this._taskQueue.shift();
        if (!taskEntry) { // 理论上不会发生,因为前面检查了队列长度
            this._semaphore.release(); // 释放刚刚获取的许可
            return;
        }

        const { taskFn, resolve, reject, id } = taskEntry;
        let taskPromise;

        try {
            console.log(`[Task ${id}] Acquiring permit, current running: ${this._runningTasks.size}, queue: ${this._taskQueue.length}, semaphore available: ${this._semaphore.currentCount}`);
            taskPromise = taskFn(); // 执行任务函数,获取实际的 Promise
            if (!(taskPromise instanceof Promise)) {
                throw new Error('Task function must return a Promise.');
            }
            this._runningTasks.add(taskPromise); // 将实际的 Promise 加入运行中集合
            console.log(`[Task ${id}] Started. Running tasks: ${this._runningTasks.size}`);

            const result = await taskPromise; // 等待任务完成
            this._results.push({ id, status: 'fulfilled', value: result });
            resolve(result); // 解决 addTask 返回的 Promise
        } catch (error) {
            this._results.push({ id, status: 'rejected', reason: error });
            reject(error); // 拒绝 addTask 返回的 Promise
        } finally {
            if (taskPromise) {
                this._runningTasks.delete(taskPromise); // 从运行中集合移除
            }
            this._semaphore.release(); // 释放信号量许可
            this._totalTasksCompleted++;
            console.log(`[Task ${id}] Finished. Running tasks: ${this._runningTasks.size}, semaphore available: ${this._semaphore.currentCount}, completed: ${this._totalTasksCompleted}/${this._totalTasksAdded}`);

            // 任务完成后,立即尝试处理下一个任务,以保持并发度
            this._processNextTask();

            // 检查是否所有任务都已完成
            if (this._totalTasksAdded === this._totalTasksCompleted && this._runningTasks.size === 0) {
                this._notifyIdle();
            }
        }
    }

    /**
     * 内部方法:通知所有等待调度器空闲的回调。
     */
    _notifyIdle() {
        if (this._onIdleCallbacks.length > 0) {
            console.log('Scheduler is idle. Notifying waiting Promises.');
            // 解决所有等待 start() 完成的 Promise
            while (this._onIdleCallbacks.length > 0) {
                const resolve = this._onIdleCallbacks.shift();
                resolve(this._results); // 传递收集到的所有结果
            }
        }
    }

    /**
     * 获取当前正在运行的任务数量。
     * @returns {number}
     */
    get runningCount() {
        return this._runningTasks.size;
    }

    /**
     * 获取当前等待队列中的任务数量。
     * @returns {number}
     */
    get pendingTaskCount() {
        return this._taskQueue.length;
    }

    /**
     * 获取总共添加的任务数量。
     * @returns {number}
     */
    get totalTasksAdded() {
        return this._totalTasksAdded;
    }

    /**
     * 获取总共完成的任务数量。
     * @returns {number}
     */
    get totalTasksCompleted() {
        return this._totalTasksCompleted;
    }
}

4.3 PromiseScheduler 类代码解析

  1. constructor(maxConcurrency):

    • 初始化 _semaphore 实例,将 maxConcurrency 传递给它。这是并发控制的核心。
    • _taskQueue 存储待处理的任务,每个元素是一个包含 taskFn(返回 Promise 的函数)以及其对应 resolvereject 函数的对象。
    • _runningTasks 是一个 Set,用于跟踪当前正在执行的 Promise 实例,方便在任务完成时移除。
    • _isStarted 标志调度器是否处于运行状态。
    • _results 收集所有任务的最终结果。
    • _onIdleCallbacks 用于存储当所有任务完成时需要通知的回调(主要是 start() 方法返回的 Promise 的 resolve)。
    • _totalTasksAdded_totalTasksCompleted 用于统计,也作为判断调度器是否空闲的依据。
  2. addTask(taskFn):

    • 接收一个 taskFn 函数,它必须返回一个 Promise。
    • 返回一个新的 Promise。这个 Promise 是外部调用者 await scheduler.addTask(...) 的结果。
    • taskFn 和这个新 Promise 的 resolve/reject 函数一起封装成一个对象,推入 _taskQueue
    • 如果调度器已启动,立即调用 _processNextTask() 尝试处理。
  3. start():

    • _isStarted 设置为 true
    • 如果任务队列不为空,循环调用 _processNextTask(),尝试立即填充到 maxConcurrency 个任务。
    • 返回一个 Promise,这个 Promise 将在所有任务完成后解决。其 resolve 函数被添加到 _onIdleCallbacks
  4. pause() / stop():

    • pause() 简单地将 _isStarted 设置为 false,阻止 _processNextTask 从队列中取出新任务。已经运行的任务会继续。
    • stop() 会先 pause(),然后清空 _taskQueue,并 await Promise.allSettled(Array.from(this._runningTasks)),等待所有当前正在运行的任务完成。
  5. _processNextTask() (核心调度逻辑):

    • 检查条件:确保调度器已启动且任务队列不为空。
    • await this._semaphore.acquire():这是关键!它尝试获取一个许可。
      • 如果 _semaphore 有可用许可 (_currentCount > 0),则 _currentCount 减一,并立即继续执行。
      • 如果 _semaphore 没有可用许可 (_currentCount === 0),则当前 _processNextTask 的执行会在这里“暂停”(实际上是 async/await 的特性,它会 yield 控制权给事件循环),直到 _semaphore.release() 被调用并解决了对应的 acquire Promise。
    • 取出任务:一旦获得许可,就从 _taskQueueshift 出一个任务。
    • 执行任务:调用 taskFn() 得到实际的 Promise,并将其添加到 _runningTasks 集合。
    • 等待任务完成await taskPromise 等待任务的实际完成(成功或失败)。
    • 处理结果:根据任务的成功或失败,调用 taskEntry 中存储的 resolvereject 函数,从而解决/拒绝 addTask 返回给外部的 Promise。
    • finally
      • _runningTasks 中移除已完成的任务。
      • this._semaphore.release():释放一个许可,这可能唤醒一个正在 acquire 处等待的 _processNextTask 调用。
      • _totalTasksCompleted 计数器增加。
      • 递归调用 this._processNextTask():这是保持并发度的关键。一个任务完成后,调度器会立即尝试处理下一个任务,如果此时有可用许可,就会立即启动新任务,否则会继续等待许可。
    • 检查空闲:每次任务完成时,都会检查 _totalTasksAdded === _totalTasksCompleted && this._runningTasks.size === 0 来判断调度器是否已完全空闲,并调用 _notifyIdle()
  6. _notifyIdle():

    • 遍历 _onIdleCallbacks 队列,调用其中的所有 resolve 函数,将所有任务的结果数组传递给它们。这会解决 start() 方法返回的 Promise。

5. 调度器执行流程详解

为了更好地理解调度器的运作,我们通过一个具体的例子来跟踪其执行流程。

假设我们设置 maxConcurrency = 3,并添加了 5 个任务,每个任务模拟一个耗时操作。

// 模拟一个异步任务
function mockAsyncTask(id, duration) {
    console.log(`Task ${id}: Started, will complete in ${duration}ms.`);
    return new Promise(resolve => {
        setTimeout(() => {
            console.log(`Task ${id}: Completed.`);
            resolve(`Task ${id} Result`);
        }, duration);
    });
}

async function runSchedulerExample() {
    const scheduler = new PromiseScheduler(3); // 最大并发数设置为 3

    console.log('Adding tasks...');
    const taskPromises = [];
    taskPromises.push(scheduler.addTask(() => mockAsyncTask(1, 2000))); // 任务1 (2s)
    taskPromises.push(scheduler.addTask(() => mockAsyncTask(2, 1000))); // 任务2 (1s)
    taskPromises.push(scheduler.addTask(() => mockAsyncTask(3, 3000))); // 任务3 (3s)
    taskPromises.push(scheduler.addTask(() => mockAsyncTask(4, 1500))); // 任务4 (1.5s)
    taskPromises.push(scheduler.addTask(() => mockAsyncTask(5, 500)));  // 任务5 (0.5s)
    console.log('All tasks added. Scheduler current pending:', scheduler.pendingTaskCount);

    console.log('Starting scheduler...');
    const allResultsPromise = scheduler.start(); // 启动调度器

    const results = await allResultsPromise; // 等待所有任务完成
    console.log('n--- All tasks completed ---');
    console.log('Final results:', results);
    console.log('Scheduler total completed:', scheduler.totalTasksCompleted);
}

runSchedulerExample();

执行日志(简化版)及流程分析:

  1. new PromiseScheduler(3): 创建调度器,_semaphore 容量为3。_isStarted = false

  2. scheduler.addTask(...) (5次):

    • 每个 addTask 调用都会将一个 { taskFn, resolve, reject, id } 对象推入 _taskQueue
    • _isStarted 此时为 false,所以 _processNextTask() 不会被立即调用。
    • 此时 _taskQueue 包含 5 个任务。
  3. scheduler.start():

    • _isStarted 设置为 true

    • for (let i = 0; i < 3; i++) { this._processNextTask(); } 循环执行 3 次 _processNextTask()

    • 第一次 _processNextTask() (for 循环):

      • await this._semaphore.acquire(): 成功获取许可(_currentCount 从 3 变为 2)。
      • _taskQueue 取出 任务1
      • taskFn() 执行 mockAsyncTask(1, 2000),返回 Promise。
      • _runningTasks 添加 任务1 的 Promise。
      • console.log('Task 1: Started...')
      • await taskPromise: 任务1 开始计时 2000ms。_processNextTask 在这里暂停,控制权返回事件循环。
    • 第二次 _processNextTask() (for 循环):

      • await this._semaphore.acquire(): 成功获取许可(_currentCount 从 2 变为 1)。
      • _taskQueue 取出 任务2
      • taskFn() 执行 mockAsyncTask(2, 1000),返回 Promise。
      • _runningTasks 添加 任务2 的 Promise。
      • console.log('Task 2: Started...')
      • await taskPromise: 任务2 开始计时 1000ms。_processNextTask 在这里暂停。
    • 第三次 _processNextTask() (for 循环):

      • await this._semaphore.acquire(): 成功获取许可(_currentCount 从 1 变为 0)。
      • _taskQueue 取出 任务3
      • taskFn() 执行 mockAsyncTask(3, 3000),返回 Promise。
      • _runningTasks 添加 任务3 的 Promise。
      • console.log('Task 3: Started...')
      • await taskPromise: 任务3 开始计时 3000ms。_processNextTask 在这里暂停。
    • 此时,_runningTasks.size 为 3,_semaphore.currentCount 为 0。_taskQueue 中还剩下 任务4 和 任务5。

    • scheduler.start() 将其返回的 Promise 的 resolve 推入 _onIdleCallbacks,并返回这个 Promise。

  4. 1000ms 后:

    • 任务2 完成 (console.log('Task 2: Completed.'))。
    • 任务2 对应的 _processNextTask 恢复执行,继续 await taskPromise 之后的代码。
    • _runningTasks 移除 任务2 的 Promise。
    • this._semaphore.release(): 释放许可(_currentCount 从 0 变为 1)。
    • this._totalTasksCompleted 变为 1。
    • 递归调用 this._processNextTask():
      • _isStartedtrue_taskQueue.length (2) > 0。
      • await this._semaphore.acquire(): 成功获取许可(_currentCount 从 1 变为 0)。
      • _taskQueue 取出 任务4
      • taskFn() 执行 mockAsyncTask(4, 1500)
      • _runningTasks 添加 任务4 的 Promise。
      • console.log('Task 4: Started...')
      • await taskPromise: 任务4 开始计时 1500ms。_processNextTask 在这里暂停。
  5. 2000ms 后 (从开始算):

    • 任务1 完成 (console.log('Task 1: Completed.'))。
    • 任务1 对应的 _processNextTask 恢复执行。
    • _runningTasks 移除 任务1 的 Promise。
    • this._semaphore.release(): 释放许可(_currentCount 从 0 变为 1)。
    • this._totalTasksCompleted 变为 2。
    • 递归调用 this._processNextTask():
      • _isStartedtrue_taskQueue.length (1) > 0。
      • await this._semaphore.acquire(): 成功获取许可(_currentCount 从 1 变为 0)。
      • _taskQueue 取出 任务5
      • taskFn() 执行 mockAsyncTask(5, 500)
      • _runningTasks 添加 任务5 的 Promise。
      • console.log('Task 5: Started...')
      • await taskPromise: 任务5 开始计时 500ms。_processNextTask 在这里暂停。
  6. 2500ms 后 (从开始算):

    • 任务5 完成 (console.log('Task 5: Completed.'))。
    • 任务5 对应的 _processNextTask 恢复执行。
    • _runningTasks 移除 任务5 的 Promise。
    • this._semaphore.release(): 释放许可(_currentCount 从 0 变为 1)。
    • this._totalTasksCompleted 变为 3。
    • 递归调用 this._processNextTask():
      • _isStartedtrue,但 _taskQueue.length (0) 为 0。此调用立即返回。
    • 检查空闲: _totalTasksAdded (5) != _totalTasksCompleted (3),所以 _notifyIdle 不会执行。
  7. 2500ms + 500ms = 3000ms 后 (从开始算):

    • 任务3 完成 (console.log('Task 3: Completed.'))。
    • 任务3 对应的 _processNextTask 恢复执行。
    • _runningTasks 移除 任务3 的 Promise。
    • this._semaphore.release(): 释放许可(_currentCount 从 1 变为 2)。
    • this._totalTasksCompleted 变为 4。
    • 递归调用 this._processNextTask():
      • _isStartedtrue,但 _taskQueue.length (0) 为 0。此调用立即返回。
    • 检查空闲: _totalTasksAdded (5) != _totalTasksCompleted (4),所以 _notifyIdle 不会执行。
  8. 3000ms + 1500ms = 4500ms 后 (从开始算):

    • 任务4 完成 (console.log('Task 4: Completed.'))。
    • 任务4 对应的 _processNextTask 恢复执行。
    • _runningTasks 移除 任务4 的 Promise。
    • this._semaphore.release(): 释放许可(_currentCount 从 2 变为 3)。
    • this._totalTasksCompleted 变为 5。
    • 递归调用 this._processNextTask():
      • _isStartedtrue,但 _taskQueue.length (0) 为 0。此调用立即返回。
    • 检查空闲: _totalTasksAdded (5) === _totalTasksCompleted (5) 且 _runningTasks.size (0) === 0。
    • _notifyIdle() 执行: 解决 start() 方法返回的 Promise。
    • runSchedulerExample 中的 await allResultsPromise 完成,打印最终结果。

通过这个详细的流程,我们可以清晰地看到信号量是如何精确控制并发任务数量的。当许可不足时,acquire 会“阻塞”后续任务的启动;当任务完成时,release 会释放许可并唤醒等待中的任务,确保并发度始终维持在设定值以内。

6. 进阶考量与扩展

我们构建的 PromiseScheduler 已经非常强大和实用,但在实际应用中,我们可能还需要考虑一些更复杂的场景和功能。

6.1 错误处理与重试机制

当前调度器能够捕获并传递任务的拒绝状态。但对于某些临时性错误(如网络瞬断、API限流),我们可能希望自动重试任务。

  • 实现方式
    • addTask 中,除了 taskFn,还可以接受 retryCountretryDelay 参数。
    • _processNextTaskcatch 块中,如果任务失败且 retryCount > 0,则不立即 reject 外部 Promise,而是将任务重新包装(递减 retryCount),并重新推入任务队列(或一个特殊重试队列),并可能添加一个延迟。

6.2 任务优先级

如果任务具有不同的重要性,我们可能希望高优先级的任务能够插队执行。

  • 实现方式
    • _taskQueue 不再是简单的数组,而是一个优先级队列(如基于堆实现),或者多个独立的队列(高优、中优、低优)。
    • _processNextTask 在选择任务时,优先从高优先级队列中取出。

6.3 任务取消

有时用户可能希望取消一个正在等待或正在执行的任务。

  • 实现方式
    • 对于等待中的任务:可以在 addTask 返回的 Promise 上添加一个 cancel 方法,调用时从 _taskQueue 中移除该任务。
    • 对于运行中的任务:JavaScript 的 AbortController API 是一个很好的选择。taskFn 应该接收一个 signal 对象,并用它来监听取消事件。调度器在取消时调用 abortController.abort()

6.4 调度器状态监控

为了更好地了解调度器的运行状况,可以添加更多的监控指标。

  • 实现方式
    • getRunningCount(), getPendingCount(), getTotalCompleted(), getTotalFailed(), getAverageLatency() 等方法。
    • 可以集成事件发射器(EventEmitter),在任务启动、完成、失败、队列空闲等事件发生时发出通知。

6.5 动态调整并发数

在某些情况下,我们可能希望在运行时根据系统负载或外部条件动态调整 maxConcurrency

  • 实现方式
    • Semaphore 类添加一个 setCapacity(newCapacity) 方法。
    • 这个方法需要小心处理:如果 newCapacity 小于当前正在运行的任务数,可能需要等到一些任务完成才能达到新的限制。如果 newCapacity 大于当前运行数,则可以立即尝试启动更多任务。

6.6 背压(Backpressure)机制

如果任务被添加的速度远超它们被处理的速度,_taskQueue 可能会无限增长,导致内存问题。

  • 实现方式
    • 限制 _taskQueue 的最大长度。当队列满时,addTask 可以选择:
      • 立即拒绝新的任务(返回一个 Promise.reject)。
      • 阻塞 addTask 调用,直到队列有空位(这需要一个额外的信号量来控制 addTask 本身的并发)。

6.7 任务超时

为每个任务设置一个超时时间,如果任务在规定时间内未完成,则视为失败。

  • 实现方式
    • _processNextTask 中,将 taskPromise 与一个 setTimeout 包装的 Promise.reject 进行 Promise.race

7. 实际应用场景

基于信号量的 Promise 调度器在许多实际场景中都非常有用:

  1. Web 爬虫与数据抓取:限制对目标网站的请求频率,避免被封禁IP,同时提高抓取效率。
  2. 批量 API 请求:例如,需要从第三方服务获取大量数据,但服务有每秒请求次数限制。调度器可以确保不会超过这个限制。
  3. 文件处理队列:在前端上传多个文件时,可能希望同时只处理少量文件的压缩、预览生成等操作,而不是一次性处理所有文件。
  4. 图片/视频处理服务:后端服务处理用户上传的图片或视频时,为了避免服务器过载,可以限制同时进行的编码、缩放或水印添加任务。
  5. 数据库连接池(概念类比):虽然 JavaScript 通常不直接管理数据库连接,但在其他语言中,信号量是实现连接池的关键,限制同时活跃的数据库连接数。
  6. CI/CD 流水线:限制同时运行的构建或部署任务的数量。

8. 思考与展望

通过本次讲座,我们深入探讨了如何利用信号量这一经典并发控制原语,在 JavaScript 异步环境中构建一个健壮的 Promise 调度器。我们从信号量的基本概念、核心操作入手,逐步实现了 Semaphore 类,并将其巧妙地集成到 PromiseScheduler 中,最终形成了一个功能完善、逻辑严谨的并发控制解决方案。

这个调度器不仅解决了过度并发带来的资源消耗和稳定性问题,还为我们提供了一个灵活的框架,可以根据实际需求进行扩展,如添加重试、优先级、取消和监控等高级功能。掌握这种基于信号量的并发控制模式,对于构建高性能、高稳定性的异步应用至关重要。我鼓励大家在自己的项目中尝试使用和改进这个调度器,探索其在各种复杂场景下的应用潜力。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注