实现一个可控的并发请求函数:保证同时最多只有 N 个请求在处理

各位同仁,各位技术爱好者,大家好。

今天,我们将深入探讨一个在现代软件开发中极为常见且至关重要的话题:如何实现一个可控的并发请求函数,确保在任何时刻,正在处理的请求数量都不会超过我们预设的上限 N。这不仅仅是一个技术实现细节,它更是一种对系统资源负责、提升用户体验、保障服务稳定性的设计哲学。

并发控制的必要性与挑战

在互联网应用中,我们经常需要向后端服务器发起大量的异步请求,例如批量上传图片、获取数据列表、发送通知等。如果没有适当的并发控制,一股脑地发出所有请求,可能会带来一系列严重问题:

  1. 服务器过载 (Server Overload):瞬时涌入的大量请求可能超出服务器的处理能力,导致请求响应变慢、错误率升高,甚至服务崩溃。这如同对服务器进行了一次“非恶意”的分布式拒绝服务(DDoS)攻击。
  2. 客户端性能下降 (Client-side Performance Degradation):在浏览器环境中,过多的网络请求会占用大量的网络带宽、CPU 和内存资源,导致页面卡顿、响应迟缓,严重影响用户体验。
  3. API 限流触发 (API Rate Limiting):许多第三方服务或内部 API 都设有请求限流机制。如果短时间内发送的请求数量超过了限制,后续请求将被拒绝,导致业务中断。
  4. 资源争抢与死锁 (Resource Contention and Deadlock):在某些场景下,并发请求可能涉及到共享资源的访问。不加控制的并发可能导致资源争抢,甚至在设计不当的情况下引发死锁,使系统陷入停滞。
  5. 内存消耗 (Memory Consumption):每个待处理或正在处理的请求都需要占用一定的内存。如果并发量过高,可能导致内存溢出,尤其是在资源受限的环境中(如移动设备或嵌入式系统)。

因此,实现一个可控的并发请求函数,其核心目标是像交通管制员一样,对汹涌的请求洪流进行疏导和调度,确保交通(请求)既能高效通行,又不会造成拥堵。挑战在于,我们需要在异步环境中,精确地追踪和管理每个请求的状态,并在请求完成或失败后,及时释放“通行证”,允许新的请求进入。

核心概念:信号量与任务队列

要解决并发控制问题,我们首先需要理解两个核心概念:信号量 (Semaphore)任务队列 (Task Queue)

信号量 (Semaphore)

信号量是计算机科学中一个经典的同步原语,由荷兰计算机科学家 Edsger W. Dijkstra 提出。它本质上是一个计数器,用于控制对共享资源的访问。

  • P 操作 (Wait/Acquire):尝试获取一个资源。如果计数器大于 0,则减 1 并继续执行;否则,进程/线程会被阻塞,直到计数器大于 0。
  • V 操作 (Signal/Release):释放一个资源。计数器加 1。如果有进程/线程因 P 操作而被阻塞,其中一个将被唤醒。

在我们的并发请求场景中,这个“资源”就是允许执行的请求槽位。信号量的计数值就是我们设定的最大并发数 N。每次一个请求开始执行时,我们执行 P 操作(减少可用槽位);每次请求完成(无论成功或失败)时,我们执行 V 操作(增加可用槽位)。当没有可用槽位时,新的请求就必须等待。

任务队列 (Task Queue)

当信号量指示当前没有可用槽位时,新来的请求不能立即执行。它们需要被“排队”,等待有空闲槽位时再被调度。这就是任务队列的作用。

  • 先进先出 (FIFO):最常见的队列策略是先进先出。请求按照它们被添加的顺序依次执行。
  • 优先级队列 (Priority Queue):在某些高级场景中,我们可能希望某些请求比其他请求更早地执行。这时可以使用优先级队列,允许高优先级的请求插队。

结合这两个概念,我们的并发控制器将维护一个任务队列来存储待处理的请求,并利用一个内部计数器(模拟信号量)来跟踪当前正在执行的请求数量。当计数器未达到上限且队列中有任务时,就从队列中取出任务并执行。

基础实现:构建一个简单的并发控制器

让我们从一个最基础的并发控制器开始。我们的目标是创建一个类,它能接受一个异步任务函数(返回 Promise),并确保同时执行的任务不超过指定数量 limit

需求分析

  1. 接受异步任务: 控制器需要能够接受一个返回 Promise 的函数。
  2. 限制并发数: 核心功能,确保 activeCount <= limit
  3. 任务排队: 当达到并发上限时,新任务应进入队列等待。
  4. 任务调度: 当有任务完成时,从队列中取出下一个任务执行。
  5. 结果返回: 每个任务的 Promise 应该能正确地 resolve 或 reject。

设计思路

我们定义一个 ConcurrentLimiter 类,它包含以下核心属性和方法:

  • limit: 允许的最大并发数。
  • activeCount: 当前正在执行的任务数量。
  • taskQueue: 存储等待执行的任务的队列。每个任务都是一个包含原始函数和其对应的 Promise resolve/reject 方法的对象。
  • add(taskFn): 添加一个新任务到队列。这个方法返回一个 Promise,代表 taskFn 的执行结果。
  • _runNext(): 内部调度方法,负责检查是否有空闲槽位和待执行任务,并启动下一个任务。

代码实现 (版本 1 – 基础 Promise 调度)

/**
 * @typedef {Object} QueuedTask
 * @property {Function} fn - 实际要执行的异步函数。
 * @property {(value: any) => void} resolve - 任务 Promise 的 resolve 方法。
 * @property {(reason?: any) => void} reject - 任务 Promise 的 reject 方法。
 */

class ConcurrentLimiter {
    /**
     * 最大并发数。
     * @private
     * @type {number}
     */
    private limit: number;

    /**
     * 当前正在执行的任务数量。
     * @private
     * @type {number}
     */
    private activeCount: number = 0;

    /**
     * 等待执行的任务队列。
     * @private
     * @type {QueuedTask[]}
     */
    private taskQueue: Array<{
        fn: (...args: any[]) => Promise<any>;
        resolve: (value: any) => void;
        reject: (reason?: any) => void;
    }> = [];

    /**
     * 创建一个并发请求限制器。
     * @param {number} limit - 最大并发数。
     * @throws {Error} 如果 limit 小于 1。
     */
    constructor(limit: number) {
        if (limit < 1) {
            throw new Error('Concurrency limit must be at least 1.');
        }
        this.limit = limit;
        console.log(`ConcurrentLimiter initialized with limit: ${this.limit}`);
    }

    /**
     * 添加一个异步任务到队列中。
     * 如果当前并发数未达到上限,任务会立即执行;否则会进入队列等待。
     * @param {(...args: any[]) => Promise<any>} taskFn - 要执行的异步函数,必须返回一个 Promise。
     * @returns {Promise<any>} - 一个 Promise,它会解析或拒绝为 taskFn 的结果。
     */
    public add<T>(taskFn: (...args: any[]) => Promise<T>): Promise<T> {
        return new Promise<T>((resolve, reject) => {
            this.taskQueue.push({ fn: taskFn, resolve, reject });
            console.log(`Task added. Queue length: ${this.taskQueue.length}`);
            this._runNext(); // 尝试运行下一个任务
        });
    }

    /**
     * 内部方法,尝试从队列中取出任务并执行。
     * 只有当当前活跃任务数小于限制且队列中有任务时才会执行。
     * @private
     */
    private _runNext(): void {
        console.log(`_runNext called. Active: ${this.activeCount}, Limit: ${this.limit}, Queue: ${this.taskQueue.length}`);
        if (this.activeCount >= this.limit || this.taskQueue.length === 0) {
            console.log('Conditions not met to run next task.');
            return; // 达到并发上限或队列为空,不执行
        }

        const task = this.taskQueue.shift(); // 从队列头部取出一个任务
        if (!task) {
            return; // 队列可能在其他地方被清空
        }

        this.activeCount++; // 增加活跃任务计数
        console.log(`Starting task. Active count: ${this.activeCount}`);

        // 执行任务,并处理其结果
        task.fn()
            .then(result => {
                task.resolve(result); // 任务成功,解析对应的 Promise
            })
            .catch(error => {
                task.reject(error); // 任务失败,拒绝对应的 Promise
            })
            .finally(() => {
                this.activeCount--; // 任务完成(无论成功失败),减少活跃任务计数
                console.log(`Task finished. Active count: ${this.activeCount}`);
                this._runNext(); // 任务完成后,再次尝试运行下一个任务
            });
    }

    /**
     * 获取当前正在执行的任务数量。
     * @returns {number}
     */
    public getActiveCount(): number {
        return this.activeCount;
    }

    /**
     * 获取当前等待中的任务数量。
     * @returns {number}
     */
    public getQueueLength(): number {
        return this.taskQueue.length;
    }
}

使用示例

// 模拟一个异步请求
const mockRequest = (id: number, duration: number = 1000): Promise<string> => {
    const startTime = Date.now();
    return new Promise((resolve, reject) => {
        setTimeout(() => {
            const endTime = Date.now();
            if (id % 5 === 0) { // 模拟一些任务失败
                console.error(`Task ${id} FAILED after ${endTime - startTime}ms`);
                reject(new Error(`Request ${id} failed`));
            } else {
                console.log(`Task ${id} completed in ${endTime - startTime}ms`);
                resolve(`Data from request ${id}`);
            }
        }, duration);
    });
};

const limiter = new ConcurrentLimiter(3); // 最多同时处理 3 个请求

const tasks = [];
for (let i = 1; i <= 10; i++) {
    tasks.push(limiter.add(() => mockRequest(i, Math.random() * 2000 + 500)));
}

Promise.allSettled(tasks)
    .then(results => {
        console.log('nAll tasks settled:');
        results.forEach((result, index) => {
            if (result.status === 'fulfilled') {
                console.log(`Task ${index + 1} Succeeded: ${result.value}`);
            } else {
                console.log(`Task ${index + 1} Failed: ${result.reason}`);
            }
        });
        console.log(`Final active count: ${limiter.getActiveCount()}`);
        console.log(`Final queue length: ${limiter.getQueueLength()}`);
    });

// 预期输出:
// - 最多同时有 3 个 "Starting task" 日志。
// - 任务完成和开始是交替的,保持 activeCount <= 3。
// - 最终所有任务都会被处理。

这个基础实现已经能够满足“最多 N 个请求在处理”的核心需求。它通过 activeCounttaskQueue 实现了信号量和任务队列的功能。_runNext 方法是调度的核心,在每次任务添加和任务完成时被调用,以确保系统始终尝试保持最大并发效率。

进阶功能与优化

基础的并发控制器虽然可用,但在实际生产环境中,我们往往需要更强大的功能来处理各种复杂情况。

1. 错误处理与重试机制

任务失败是常态。一个健壮的并发控制器应该能够优雅地处理任务失败,并提供可选的重试机制。

  • 错误捕获: Promise.catch 可以捕获任务执行中的错误。
  • 重试逻辑: 当任务失败时,可以根据配置决定是否重试。重试次数、重试间隔(例如指数退避)都是重要的参数。

让我们扩展 QueuedTask 接口和 add 方法,允许为每个任务指定重试选项。

/**
 * @typedef {Object} TaskOptions
 * @property {number} [priority=0] - 任务的优先级,数字越大优先级越高。
 * @property {number} [retries=0] - 任务失败时的最大重试次数。
 * @property {number} [retryDelay=0] - 重试前的等待毫秒数。
 * @property {(error: any, attempt: number) => boolean} [shouldRetry] - 自定义重试判断函数。
 */

/**
 * @typedef {Object} QueuedTask
 * @property {Function} fn - 实际要执行的异步函数。
 * @property {(value: any) => void} resolve - 任务 Promise 的 resolve 方法。
 * @property {(reason?: any) => void} reject - 任务 Promise 的 reject 方法。
 * @property {TaskOptions} options - 任务的配置选项。
 * @property {number} attempt - 当前重试的次数(从 0 开始)。
 * @property {number} id - 任务的唯一标识符。
 */

class ConcurrentLimiterWithRetry extends ConcurrentLimiter { // 继承基础类
    private taskIdCounter: number = 0;

    constructor(limit: number) {
        super(limit);
    }

    public add<T>(
        taskFn: (...args: any[]) => Promise<T>,
        options?: TaskOptions
    ): Promise<T> {
        const taskId = ++this.taskIdCounter;
        return new Promise<T>((resolve, reject) => {
            this.taskQueue.push({
                id: taskId,
                fn: taskFn,
                resolve,
                reject,
                options: { retries: 0, retryDelay: 0, priority: 0, ...options },
                attempt: 0,
            });
            // 在添加任务时,考虑优先级,这里暂时只做简单处理
            // 后续会详细讲解优先级队列
            this.taskQueue.sort((a, b) => (b.options?.priority || 0) - (a.options?.priority || 0));

            this._runNext();
        });
    }

    // 重写 _runNext 方法以支持重试
    protected _runNext(): void {
        if (this.activeCount >= this.limit || this.taskQueue.length === 0) {
            return;
        }

        const taskIndex = this.taskQueue.findIndex(
            t => t.attempt === 0 || t.options.retries > 0 // 寻找可以执行的任务
        );

        if (taskIndex === -1) {
            return; // 没有可以执行的任务
        }

        const [task] = this.taskQueue.splice(taskIndex, 1); // 取出任务

        this.activeCount++;

        this._executeTask(task); // 抽离任务执行逻辑
    }

    /**
     * 执行单个任务,包括重试逻辑。
     * @private
     * @param {QueuedTask} task - 要执行的任务对象。
     */
    private _executeTask(task: QueuedTask): void {
        console.log(`[Task ${task.id}] Starting (Attempt: ${task.attempt + 1}/${(task.options?.retries || 0) + 1}), Active: ${this.activeCount}`);
        task.fn()
            .then(result => {
                task.resolve(result);
            })
            .catch(error => {
                const maxRetries = task.options?.retries || 0;
                const shouldRetryFn = task.options?.shouldRetry;
                const currentAttempt = task.attempt;

                let doRetry = false;
                if (shouldRetryFn) {
                    doRetry = shouldRetryFn(error, currentAttempt + 1);
                } else {
                    doRetry = currentAttempt < maxRetries;
                }

                if (doRetry) {
                    task.attempt++;
                    const retryDelay = task.options?.retryDelay || 0;
                    console.warn(`[Task ${task.id}] Failed, retrying in ${retryDelay}ms. Attempt ${task.attempt}/${maxRetries + 1}. Error: ${error.message}`);
                    setTimeout(() => {
                        this.taskQueue.unshift(task); // 重新放入队列头部,或者考虑优先级
                        this._runNext(); // 再次尝试运行
                    }, retryDelay);
                } else {
                    console.error(`[Task ${task.id}] Permanently failed after ${currentAttempt + 1} attempts. Error: ${error.message}`);
                    task.reject(error);
                }
            })
            .finally(() => {
                this.activeCount--;
                console.log(`[Task ${task.id}] Finished. Active: ${this.activeCount}`);
                // 如果是重试任务,_runNext 会在 setTimeout 中被调用
                // 如果是最终完成或失败的任务,在此调用_runNext
                if (task.attempt === (task.options?.retries || 0) || !this.taskQueue.includes(task)) { // 确保不是正在等待重试的任务
                     this._runNext();
                }
            });
    }
}

重试逻辑的考量点:

  • 指数退避 (Exponential Backoff):重试间隔随着重试次数的增加而增长,这是一种常见的策略,可以避免在服务暂时不可用时,不断地给服务器施加压力。例如,retryDelay * (2 ** attempt)
  • 自定义重试条件: shouldRetry 函数允许开发者根据错误类型或业务逻辑决定是否重试。

2. 任务优先级

在某些场景下,我们可能希望某些任务能够优先执行,即使它们是后加入队列的。例如,用户主动触发的操作可能比后台数据同步操作更重要。

实现优先级队列通常有两种方式:

  1. 每次添加任务时排序: 效率较低,如果队列很长,排序开销会很大。
  2. 使用专门的优先级队列数据结构: 更高效,如二叉堆 (Binary Heap)。

对于大多数情况,如果队列不是特别大,简单的插入排序或在 add 时进行一次排序就足够了。我们可以在 add 方法中,根据任务的 priority 属性将任务插入到队列的正确位置。

// 在 ConcurrentLimiterWithRetry 的 add 方法中:
public add<T>(
    taskFn: (...args: any[]) => Promise<T>,
    options?: TaskOptions
): Promise<T> {
    const taskId = ++this.taskIdCounter;
    return new Promise<T>((resolve, reject) => {
        const newTask = {
            id: taskId,
            fn: taskFn,
            resolve,
            reject,
            options: { retries: 0, retryDelay: 0, priority: 0, ...options },
            attempt: 0,
        };

        // 插入到正确的位置以保持优先级顺序
        let inserted = false;
        for (let i = 0; i < this.taskQueue.length; i++) {
            if ((newTask.options?.priority || 0) > (this.taskQueue[i].options?.priority || 0)) {
                this.taskQueue.splice(i, 0, newTask);
                inserted = true;
                break;
            }
        }
        if (!inserted) {
            this.taskQueue.push(newTask); // 如果是最低优先级或队列为空,则添加到末尾
        }

        console.log(`[Task ${taskId}] added with priority ${newTask.options?.priority}. Queue length: ${this.taskQueue.length}`);
        this._runNext();
    });
}

// _runNext 方法现在直接从队列头部取任务即可,因为队列已经排序
protected _runNext(): void {
    if (this.activeCount >= this.limit || this.taskQueue.length === 0) {
        return;
    }

    // 确保取出的任务不是正在等待重试的
    const task = this.taskQueue.shift(); // 队列头部已经是最高优先级的任务
    if (!task) {
        return;
    }

    // ... (rest of _runNext logic remains similar, calling _executeTask)
    this.activeCount++;
    this._executeTask(task);
}

3. 取消任务 (Task Cancellation)

有时,用户可能在任务完成前就离开了页面,或者业务逻辑不再需要某个任务的结果。这时,能够取消正在等待或正在执行的任务就变得很重要。

实现任务取消的关键是:

  1. 任务可感知取消: 任务函数本身需要能够检查是否已被取消。
  2. 取消信号传播: 控制器需要提供一种机制来通知任务其已被取消。

AbortController 是现代 JavaScript 中用于取消异步操作的标准机制。

// 扩展 TaskOptions 和 QueuedTask
interface TaskOptions {
    // ... 其他选项
    signal?: AbortSignal; // 用于任务取消的 AbortSignal
}

interface QueuedTask {
    // ... 其他属性
    abortController?: AbortController; // 用于控制器内部取消
}

class ConcurrentLimiterWithCancel extends ConcurrentLimiterWithRetry {
    // ... 构造函数等

    public add<T>(
        taskFn: (signal?: AbortSignal) => Promise<T>, // 任务函数现在可以接收一个 signal
        options?: TaskOptions
    ): Promise<T> {
        const taskId = ++this.taskIdCounter;
        const abortController = new AbortController(); // 为每个任务创建 AbortController

        // 如果外部提供了 signal,则连接起来
        if (options?.signal) {
            options.signal.addEventListener('abort', () => {
                abortController.abort();
            });
        }

        return new Promise<T>((resolve, reject) => {
            const newTask: QueuedTask = {
                id: taskId,
                fn: () => taskFn(abortController.signal), // 将 signal 传递给任务函数
                resolve,
                reject,
                options: { retries: 0, retryDelay: 0, priority: 0, ...options },
                attempt: 0,
                abortController: abortController, // 保存 controller 实例
            };

            // ... (优先级排序逻辑,与之前相同)
            // 假设已添加到 this.taskQueue
            this.taskQueue.push(newTask);
            this.taskQueue.sort((a, b) => (b.options?.priority || 0) - (a.options?.priority || 0));

            console.log(`[Task ${taskId}] added. Queue length: ${this.taskQueue.length}`);
            this._runNext();
        });
    }

    /**
     * 取消一个等待中的任务。
     * @param {number} taskId - 要取消的任务的 ID。
     * @returns {boolean} - 如果任务被找到并取消,返回 true;否则返回 false。
     */
    public cancelTask(taskId: number): boolean {
        const taskIndex = this.taskQueue.findIndex(task => task.id === taskId);
        if (taskIndex !== -1) {
            const [task] = this.taskQueue.splice(taskIndex, 1);
            task.abortController?.abort(); // 发送取消信号
            task.reject(new DOMException('Task cancelled by user.', 'AbortError')); // 拒绝任务 Promise
            console.log(`[Task ${taskId}] cancelled from queue.`);
            return true;
        }
        // 对于正在执行的任务,我们只能发送 signal,任务自身需要响应
        // 这里暂时不处理正在执行的任务的直接移除,因为它们已经占用了 activeCount
        // 任务内部接收到 signal 后会自行处理
        console.warn(`[Task ${taskId}] not found in queue or already running.`);
        return false;
    }

    // _executeTask 中无需特别改动,因为任务函数自身会监听 signal
    // 如果任务函数内部能够响应 signal 并抛出 AbortError,那么它就会进入 catch 块
    // 我们可以在 catch 块中检查错误类型
    private _executeTask(task: QueuedTask): void {
        console.log(`[Task ${task.id}] Starting (Attempt: ${task.attempt + 1}), Active: ${this.activeCount}`);
        task.fn(task.abortController?.signal) // 传递 signal 给任务函数
            .then(result => {
                task.resolve(result);
            })
            .catch(error => {
                if (error instanceof DOMException && error.name === 'AbortError') {
                    console.warn(`[Task ${task.id}] was aborted.`);
                    task.reject(error); // 任务被取消,直接拒绝
                } else {
                    // ... (原有重试逻辑)
                    // ... 如果不是 AbortError,执行重试或永久失败逻辑
                }
            })
            .finally(() => {
                this.activeCount--;
                console.log(`[Task ${task.id}] Finished. Active: ${this.activeCount}`);
                if (task.attempt === (task.options?.retries || 0) || !this.taskQueue.includes(task)) {
                     this._runNext();
                }
            });
    }
}

// 模拟一个可取消的请求
const cancellableMockRequest = (id: number, signal?: AbortSignal, duration: number = 1000): Promise<string> => {
    return new Promise((resolve, reject) => {
        const timeoutId = setTimeout(() => {
            if (signal?.aborted) {
                reject(new DOMException('Request aborted during delay', 'AbortError'));
                return;
            }
            console.log(`Task ${id} completed.`);
            resolve(`Data from request ${id}`);
        }, duration);

        signal?.addEventListener('abort', () => {
            clearTimeout(timeoutId);
            reject(new DOMException('Request aborted by signal', 'AbortError'));
        }, { once: true });
    });
};

// 使用示例
const limiterWithCancel = new ConcurrentLimiterWithCancel(2);
const task1Promise = limiterWithCancel.add(() => cancellableMockRequest(1, undefined, 2000));
const task2Promise = limiterWithCancel.add(() => cancellableMockRequest(2, undefined, 1000));
const task3Promise = limiterWithCancel.add(() => cancellableMockRequest(3, undefined, 3000)); // 这个任务会排队

// 假设我们有一个外部的 AbortController 来管理一组任务
const externalAbortController = new AbortController();
const task4Promise = limiterWithCancel.add(() => cancellableMockRequest(4, externalAbortController.signal, 4000));

setTimeout(() => {
    // 假设我们想取消 task3,因为它还在队列中
    limiterWithCancel.cancelTask(3);
}, 500);

setTimeout(() => {
    // 假设我们想取消 task4,它可能正在运行或等待
    externalAbortController.abort(); // 触发 task4 的 AbortSignal
}, 1500);

Promise.allSettled([task1Promise, task2Promise, task3Promise, task4Promise])
    .then(results => {
        console.log('nAll cancellable tasks settled:');
        results.forEach((result, index) => {
            console.log(`Task ${index + 1} status: ${result.status}`);
            if (result.status === 'rejected') {
                console.log(`  Reason: ${result.reason?.name || result.reason}`);
            }
        });
    });

4. 进度反馈与状态管理

在长时间运行的并发任务中,用户或上层应用可能需要了解当前的状态,例如:有多少任务等待中,有多少任务已完成,总任务数是多少等。这可以通过事件发射器 (EventEmitter) 来实现。

import EventEmitter from 'events'; // Node.js 环境,浏览器可以使用自定义的 EventEmitter 实现

// 扩展 ConcurrentLimiterWithCancel
class ConcurrentLimiterWithEvents extends ConcurrentLimiterWithCancel {
    private eventEmitter: EventEmitter;

    constructor(limit: number) {
        super(limit);
        this.eventEmitter = new EventEmitter();
    }

    /**
     * 注册事件监听器。
     * @param {string} eventName - 事件名称。
     * @param {(...args: any[]) => void} listener - 事件监听函数。
     * @returns {this}
     */
    public on(eventName: string, listener: (...args: any[]) => void): this {
        this.eventEmitter.on(eventName, listener);
        return this;
    }

    /**
     * 注册只触发一次的事件监听器。
     * @param {string} eventName - 事件名称。
     * @param {(...args: any[]) => void} listener - 事件监听函数。
     * @returns {this}
     */
    public once(eventName: string, listener: (...args: any[]) => void): this {
        this.eventEmitter.once(eventName, listener);
        return this;
    }

    /**
     * 移除事件监听器。
     * @param {string} eventName - 事件名称。
     * @param {(...args: any[]) => void} listener - 事件监听函数。
     * @returns {this}
     */
    public off(eventName: string, listener: (...args: any[]) => void): this {
        this.eventEmitter.off(eventName, listener);
        return this;
    }

    // 在关键操作点触发事件
    public add<T>(taskFn: (signal?: AbortSignal) => Promise<T>, options?: TaskOptions): Promise<T> {
        // ... (原有 add 逻辑)
        const promise = super.add(taskFn, options);
        this.eventEmitter.emit('task:added', {
            id: this.taskIdCounter,
            queueLength: this.getQueueLength(),
            options
        });
        this._checkIdleState(); // 检查是否从空闲状态变为非空闲
        return promise;
    }

    protected _executeTask(task: QueuedTask): void {
        this.eventEmitter.emit('task:start', { id: task.id, attempt: task.attempt + 1 });
        super._executeTask(task); // 调用父类的执行逻辑
    }

    // 重写父类的 _executeTask 中的 finally 块,以触发完成/失败事件
    private _executeTaskWithEvents(task: QueuedTask): void {
        task.fn(task.abortController?.signal)
            .then(result => {
                task.resolve(result);
                this.eventEmitter.emit('task:complete', { id: task.id, result });
            })
            .catch(error => {
                if (error instanceof DOMException && error.name === 'AbortError') {
                    task.reject(error);
                    this.eventEmitter.emit('task:cancelled', { id: task.id, reason: error });
                } else {
                    const maxRetries = task.options?.retries || 0;
                    const shouldRetryFn = task.options?.shouldRetry;
                    const currentAttempt = task.attempt;

                    let doRetry = false;
                    if (shouldRetryFn) {
                        doRetry = shouldRetryFn(error, currentAttempt + 1);
                    } else {
                        doRetry = currentAttempt < maxRetries;
                    }

                    if (doRetry) {
                        task.attempt++;
                        const retryDelay = task.options?.retryDelay || 0;
                        this.eventEmitter.emit('task:retry', { id: task.id, attempt: task.attempt, error });
                        setTimeout(() => {
                            this.taskQueue.unshift(task);
                            this._runNext();
                        }, retryDelay);
                    } else {
                        task.reject(error);
                        this.eventEmitter.emit('task:error', { id: task.id, error, finalAttempt: true });
                    }
                }
            })
            .finally(() => {
                this.activeCount--;
                this.eventEmitter.emit('status:change', {
                    active: this.activeCount,
                    queue: this.getQueueLength(),
                    limit: this.limit
                });
                // Check if we are truly idle after a task finishes
                this._checkIdleState();
                if (task.attempt === (task.options?.retries || 0) || !this.taskQueue.includes(task)) {
                     this._runNext();
                }
            });
    }

    // 覆盖父类的 _executeTask 方法
    protected _executeTask(task: QueuedTask): void {
        this._executeTaskWithEvents(task);
    }

    // 辅助方法检查是否进入/离开空闲状态
    private _isIdle: boolean = true;
    private _checkIdleState(): void {
        const currentIdle = this.activeCount === 0 && this.taskQueue.length === 0;
        if (currentIdle && !this._isIdle) {
            this._isIdle = true;
            this.eventEmitter.emit('idle');
            console.log('Limiter is now idle.');
        } else if (!currentIdle && this._isIdle) {
            this._isIdle = false;
            this.eventEmitter.emit('not:idle');
            console.log('Limiter is no longer idle.');
        }
    }

    // 清空队列,并触发事件
    public clear(): void {
        const cancelledTasks = this.taskQueue.splice(0, this.taskQueue.length);
        cancelledTasks.forEach(task => {
            task.abortController?.abort();
            task.reject(new DOMException('Limiter cleared, task cancelled.', 'AbortError'));
            this.eventEmitter.emit('task:cancelled', { id: task.id, reason: 'Limiter cleared' });
        });
        this.eventEmitter.emit('queue:cleared', { count: cancelledTasks.length });
        this._checkIdleState();
        console.log(`Queue cleared. ${cancelledTasks.length} tasks cancelled.`);
    }

    /**
     * 等待所有当前和待处理任务完成。
     * @returns {Promise<void>} 当所有任务完成且队列为空时 resolve。
     */
    public async waitIdle(): Promise<void> {
        if (this.activeCount === 0 && this.taskQueue.length === 0) {
            return Promise.resolve();
        }
        return new Promise(resolve => {
            this.eventEmitter.once('idle', resolve);
        });
    }
}

// 示例:监听事件
const limiterWithEvents = new ConcurrentLimiterWithEvents(2);

limiterWithEvents.on('task:added', ({ id, queueLength }) => console.log(`[Event] Task ${id} added. Queue: ${queueLength}`));
limiterWithEvents.on('task:start', ({ id, attempt }) => console.log(`[Event] Task ${id} started (Attempt ${attempt}).`));
limiterWithEvents.on('task:complete', ({ id, result }) => console.log(`[Event] Task ${id} completed with result: ${result}`));
limiterWithEvents.on('task:error', ({ id, error, finalAttempt }) => console.error(`[Event] Task ${id} failed (finalAttempt: ${finalAttempt}): ${error.message}`));
limiterWithEvents.on('task:retry', ({ id, attempt, error }) => console.warn(`[Event] Task ${id} retrying (Attempt ${attempt}): ${error.message}`));
limiterWithEvents.on('task:cancelled', ({ id, reason }) => console.warn(`[Event] Task ${id} cancelled: ${reason.message || reason}`));
limiterWithEvents.on('status:change', (status) => console.log(`[Event] Status: Active: ${status.active}, Queue: ${status.queue}`));
limiterWithEvents.on('idle', () => console.log('[Event] Limiter is completely idle.'));

const taskPromises = [];
taskPromises.push(limiterWithEvents.add(() => mockRequest(1, 1500)));
taskPromises.push(limiterWithEvents.add(() => mockRequest(2, 1000), { retries: 1, retryDelay: 200 }));
taskPromises.push(limiterWithEvents.add(() => mockRequest(3, 2500))); // Queue
taskPromises.push(limiterWithEvents.add(() => mockRequest(4, 500), { priority: 1 })); // 高优先级,可能插队

// 模拟任务 2 首次失败
const originalMockRequest = mockRequest;
mockRequest = (id, duration) => {
    if (id === 2 && limiterWithEvents.getActiveCount() === 1) { // 第一次执行 task 2 失败
        return Promise.reject(new Error('Simulated failure for Task 2'));
    }
    return originalMockRequest(id, duration);
};

Promise.allSettled(taskPromises).then(() => {
    console.log('nAll tasks submitted to limiterWithEvents have settled.');
    limiterWithEvents.waitIdle().then(() => {
        console.log('All tasks are truly finished and limiter is idle.');
    });
});

5. 生命周期管理

控制器还需要一些方法来管理其自身的生命周期,例如暂停、恢复、清空队列和等待所有任务完成。

  • pause(): 暂停调度,不再启动新的任务,但已在运行的任务会继续。
  • resume(): 恢复调度。
  • clear(): 清空等待队列,取消所有待处理任务。
  • waitIdle(): 返回一个 Promise,当所有任务完成(包括当前正在运行和队列中的任务)且控制器进入空闲状态时 resolve。

pause()resume() 可以通过一个 isPaused 标志来实现。

// 扩展 ConcurrentLimiterWithEvents
class ConcurrentLimiterFullFeatures extends ConcurrentLimiterWithEvents {
    private isPaused: boolean = false;
    private isDraining: boolean = false; // 是否正在等待所有任务完成并清空

    constructor(limit: number) {
        super(limit);
    }

    /**
     * 暂停任务调度。
     * 正在执行的任务将继续,但不会从队列中取出新任务。
     */
    public pause(): void {
        this.isPaused = true;
        this.eventEmitter.emit('paused');
        console.log('Limiter paused.');
    }

    /**
     * 恢复任务调度。
     * 如果有待处理任务且有空闲槽位,将继续执行。
     */
    public resume(): void {
        this.isPaused = false;
        this.eventEmitter.emit('resumed');
        console.log('Limiter resumed.');
        this._runNext(); // 恢复后尝试立即运行
    }

    // 重写 _runNext 以考虑暂停状态
    protected _runNext(): void {
        if (this.isPaused || this.activeCount >= this.limit || this.taskQueue.length === 0) {
            return;
        }
        super._runNext(); // 调用父类的调度逻辑
    }

    /**
     * 清空等待队列,取消所有待处理任务。
     * 正在执行的任务不受影响。
     */
    public clear(): void {
        // ... (与 ConcurrentLimiterWithEvents 中的 clear 逻辑相同)
        super.clear();
    }

    /**
     * 等待所有当前和待处理任务完成。
     * @returns {Promise<void>} 当所有任务完成且队列为空时 resolve。
     */
    public async waitIdle(): Promise<void> {
        return super.waitIdle();
    }

    /**
     * 停止接受新任务,等待所有现有任务完成,然后进入空闲状态。
     * @returns {Promise<void>} 当所有任务完成时 resolve。
     */
    public async drain(): Promise<void> {
        if (this.isDraining) {
            return Promise.resolve(); // 已经处于 draining 状态
        }
        this.isDraining = true;
        this.eventEmitter.emit('drain:start');
        console.log('Limiter draining: no new tasks will be accepted, waiting for existing tasks.');

        // 可以选择清空队列中尚未开始的任务,或者等待它们完成
        // 这里选择等待它们完成,如果需要清空,可以先调用 clear()
        await this.waitIdle();
        this.isDraining = false;
        this.eventEmitter.emit('drain:complete');
        console.log('Limiter drain complete.');
    }

    // 重写 add 方法,在 draining 状态下拒绝新任务
    public add<T>(taskFn: (signal?: AbortSignal) => Promise<T>, options?: TaskOptions): Promise<T> {
        if (this.isDraining) {
            return Promise.reject(new Error('Limiter is draining, no new tasks allowed.'));
        }
        return super.add(taskFn, options);
    }
}

设计模式与最佳实践

在构建和使用并发控制器时,遵循一些设计模式和最佳实践可以提高代码质量、可维护性和健壮性。

  1. Promise 封装: 确保所有任务函数都返回 Promise。这使得异步操作结果的处理变得统一,无论是成功还是失败。
  2. 职责分离: ConcurrentLimiter 类的核心职责是调度任务。任务本身的业务逻辑应该由传入的 taskFn 负责。这样,控制器可以专注于其调度功能,而无需关心具体任务的实现细节。
  3. TypeScript 增强: 使用 TypeScript 能够提供类型安全,减少运行时错误,并提高代码的可读性和可维护性。我们上面的示例已经广泛使用了 TypeScript。
  4. 可测试性: 将逻辑封装在类中,并暴露公共方法,使得单元测试变得容易。可以模拟异步任务,然后断言 activeCounttaskQueue 的状态以及事件是否正确触发。
  5. 内存管理: 长期运行的队列如果不断有任务失败并重试,或者任务卡住,可能会导致队列无限增长,造成内存泄漏。在设计时要考虑:
    • 为队列设置最大长度,超出则拒绝新任务。
    • 对重试次数进行严格限制。
    • 提供 clear() 方法来强制清空队列。
  6. 错误分类: 对于不同类型的错误(如网络错误、业务逻辑错误、取消错误),应进行分类处理。这有助于决定是否重试、是否需要通知用户,或者只是默默地记录日志。

实际应用场景

这种并发控制器在各种应用中都有广泛的用途:

前端应用

  • 图片懒加载与预加载: 控制同时加载的图片数量,防止网络拥堵,提高页面渲染速度。
  • 批量 API 请求: 当需要一次性发送大量数据到后端(如保存表单、同步数据)时,限制并发数可以避免服务器过载和浏览器网络栈的压力。
  • 文件上传/下载队列: 限制同时上传或下载的文件数量,提供更好的用户体验和带宽管理。
  • 轮询与长连接管理: 限制同时进行的轮询请求,或者当长连接断开后,控制重连的频率。

Node.js 后端服务

  • 数据库批量操作: 限制同时写入或查询数据库的连接数,防止数据库连接池耗尽或数据库服务器过载。
  • 第三方 API 调用: 严格遵守第三方服务的 API 限流策略,避免被封禁 IP 或拒绝请求。
  • 文件处理: 在处理大量文件(如图像处理、文件压缩、数据导入导出)时,控制并发数以利用 CPU 核心数,同时避免内存溢出。
  • 消息队列消费者: 当从消息队列中拉取消息并进行处理时,限制并发处理的消息数量,以匹配下游服务的处理能力。

微服务架构

  • 服务间调用限流: 在客户端(调用方)实现并发限制,作为熔断和降级策略的一部分,保护被调用服务不受突发流量冲击。
  • 批量数据同步: 不同微服务之间进行数据同步时,控制数据传输和处理的并发量。

对比其他并发控制方案

在 JavaScript 生态中,有几种方式可以进行并发控制,但它们各有侧重。

Promise.all / Promise.allSettled

  • 特点: 这两个函数适用于“一次性”地并行执行一组已知的 Promise。它们会等待所有 Promise 完成(或拒绝)后,统一返回结果。
  • 区别: 它们是静态的,不具备动态调度和限制并发的能力。如果你有 1000 个任务,Promise.all 会立即启动所有 1000 个任务,这正是我们想要避免的。它们适用于已知数量且可以全部同时启动的场景。

第三方库

社区中已经有许多成熟的并发控制库,它们通常提供了比我们手动实现更完善的功能和更鲁棒的错误处理。

  • p-limit: 一个轻量级的库,专注于限制 Promise 并发数,功能与我们基础实现类似,但更加精炼和经过测试。
  • async (Node.js): 提供了一套丰富的异步流程控制工具,包括 async.queueasync.mapLimit 等,功能强大且稳定。
  • bottleneck: 一个功能全面的限流和并发控制库,支持基于时间窗口的速率限制、并发限制、优先级、重试、集群模式等。

为什么还要自己实现?
理解其内部原理至关重要。通过手动实现,我们可以:

  1. 深入理解异步调度和资源管理的机制
  2. 根据特定业务需求进行定制化,而无需引入一个可能过于庞大或不完全匹配需求的第三方库。
  3. 提升解决复杂问题的能力

一个更完整的并发控制器实现

以下整合了上述所有进阶特性,形成一个更完整的并发控制器。

import EventEmitter from 'events';

/**
 * @typedef {Object} TaskOptions
 * @property {number} [priority=0] - 任务的优先级,数字越大优先级越高。
 * @property {number} [retries=0] - 任务失败时的最大重试次数。
 * @property {number} [retryDelay=0] - 重试前的等待毫秒数。
 * @property {(error: any, attempt: number) => boolean} [shouldRetry] - 自定义重试判断函数。
 * @property {AbortSignal} [signal] - 外部提供的用于取消任务的 AbortSignal。
 */

/**
 * @typedef {Object} QueuedTaskInternal
 * @property {number} id - 任务的唯一标识符。
 * @property {(signal?: AbortSignal) => Promise<any>} fn - 实际要执行的异步函数。
 * @property {(value: any) => void} resolve - 任务 Promise 的 resolve 方法。
 * @property {(reason?: any) => void} reject - 任务 Promise 的 reject 方法。
 * @property {TaskOptions} options - 任务的配置选项。
 * @property {number} attempt - 当前重试的次数(从 0 开始)。
 * @property {AbortController} abortController - 用于任务取消的内部 AbortController 实例。
 */

class FullFeaturedConcurrentLimiter {
    private limit: number;
    private activeCount: number = 0;
    private taskQueue: QueuedTaskInternal[] = [];
    private taskIdCounter: number = 0;
    private eventEmitter: EventEmitter;
    private isPaused: boolean = false;
    private isDraining: boolean = false;
    private isIdle: boolean = true; // 初始状态为空闲

    constructor(limit: number) {
        if (limit < 1) {
            throw new Error('Concurrency limit must be at least 1.');
        }
        this.limit = limit;
        this.eventEmitter = new EventEmitter();
        console.log(`FullFeaturedConcurrentLimiter initialized with limit: ${this.limit}`);
        this._checkIdleState(); // 初始化时检查一次空闲状态
    }

    /**
     * 添加一个异步任务到队列中。
     * @param {(signal?: AbortSignal) => Promise<T>} taskFn - 要执行的异步函数,必须返回一个 Promise,且可接收一个 AbortSignal。
     * @param {TaskOptions} [options] - 任务的配置选项。
     * @returns {Promise<T>} - 一个 Promise,它会解析或拒绝为 taskFn 的结果。
     */
    public add<T>(
        taskFn: (signal?: AbortSignal) => Promise<T>,
        options?: TaskOptions
    ): Promise<T> {
        if (this.isDraining) {
            return Promise.reject(new Error('Limiter is draining, no new tasks allowed.'));
        }

        const taskId = ++this.taskIdCounter;
        const abortController = new AbortController();

        // 绑定外部 AbortSignal (如果存在)
        if (options?.signal) {
            options.signal.addEventListener('abort', () => {
                abortController.abort();
            }, { once: true });
        }

        return new Promise<T>((resolve, reject) => {
            const newTask: QueuedTaskInternal = {
                id: taskId,
                fn: taskFn,
                resolve,
                reject,
                options: { retries: 0, retryDelay: 0, priority: 0, ...options },
                attempt: 0,
                abortController: abortController,
            };

            // 插入到正确的位置以保持优先级顺序
            let inserted = false;
            for (let i = 0; i < this.taskQueue.length; i++) {
                if ((newTask.options?.priority || 0) > (this.taskQueue[i].options?.priority || 0)) {
                    this.taskQueue.splice(i, 0, newTask);
                    inserted = true;
                    break;
                }
            }
            if (!inserted) {
                this.taskQueue.push(newTask);
            }

            this.eventEmitter.emit('task:added', {
                id: taskId,
                queueLength: this.taskQueue.length,
                options
            });
            this._checkIdleState();
            this._runNext();
        });
    }

    /**
     * 内部方法,尝试从队列中取出任务并执行。
     * 只有当当前活跃任务数小于限制、队列中有任务且控制器未暂停时才会执行。
     * @private
     */
    private _runNext(): void {
        if (this.isPaused || this.activeCount >= this.limit || this.taskQueue.length === 0) {
            return;
        }

        const task = this.taskQueue.shift();
        if (!task) {
            return;
        }

        this.activeCount++;
        this._executeTask(task);
    }

    /**
     * 执行单个任务,包括重试和取消逻辑。
     * @private
     * @param {QueuedTaskInternal} task - 要执行的任务对象。
     */
    private _executeTask(task: QueuedTaskInternal): void {
        this.eventEmitter.emit('task:start', { id: task.id, attempt: task.attempt + 1 });
        console.log(`[Task ${task.id}] Starting (Attempt: ${task.attempt + 1}/${(task.options?.retries || 0) + 1}), Active: ${this.activeCount}, Queue: ${this.taskQueue.length}`);

        task.fn(task.abortController.signal)
            .then(result => {
                task.resolve(result);
                this.eventEmitter.emit('task:complete', { id: task.id, result });
            })
            .catch(error => {
                if (error instanceof DOMException && error.name === 'AbortError') {
                    task.reject(error);
                    this.eventEmitter.emit('task:cancelled', { id: task.id, reason: error });
                    console.warn(`[Task ${task.id}] was aborted.`);
                } else {
                    const maxRetries = task.options?.retries || 0;
                    const shouldRetryFn = task.options?.shouldRetry;
                    const currentAttempt = task.attempt;

                    let doRetry = false;
                    if (shouldRetryFn) {
                        doRetry = shouldRetryFn(error, currentAttempt + 1);
                    } else {
                        doRetry = currentAttempt < maxRetries;
                    }

                    if (doRetry) {
                        task.attempt++;
                        const retryDelay = task.options?.retryDelay || 0;
                        this.eventEmitter.emit('task:retry', { id: task.id, attempt: task.attempt, error });
                        console.warn(`[Task ${task.id}] Failed, retrying in ${retryDelay}ms. Attempt ${task.attempt}/${maxRetries + 1}. Error: ${error.message}`);
                        setTimeout(() => {
                            this.taskQueue.unshift(task); // 重新放入队列头部
                            this.taskQueue.sort((a, b) => (b.options?.priority || 0) - (a.options?.priority || 0)); // 重新排序以维护优先级
                            this._runNext();
                        }, retryDelay);
                    } else {
                        task.reject(error);
                        this.eventEmitter.emit('task:error', { id: task.id, error, finalAttempt: true });
                        console.error(`[Task ${task.id}] Permanently failed after ${currentAttempt + 1} attempts. Error: ${error.message}`);
                    }
                }
            })
            .finally(() => {
                this.activeCount--;
                this.eventEmitter.emit('status:change', {
                    active: this.activeCount,
                    queue: this.getQueueLength(),
                    limit: this.limit
                });
                console.log(`[Task ${task.id}] Finished. Active: ${this.activeCount}, Queue: ${this.taskQueue.length}`);

                // 如果任务不是因重试而重新入队,则调度下一个任务
                if (!this.taskQueue.includes(task)) {
                    this._runNext();
                }
                this._checkIdleState(); // 每次任务完成都检查空闲状态
            });
    }

    /**
     * 取消一个等待中的任务。
     * @param {number} taskId - 要取消的任务的 ID。
     * @returns {boolean} - 如果任务被找到并取消,返回 true;否则返回 false。
     */
    public cancelTask(taskId: number): boolean {
        const taskIndex = this.taskQueue.findIndex(task => task.id === taskId);
        if (taskIndex !== -1) {
            const [task] = this.taskQueue.splice(taskIndex, 1);
            task.abortController.abort();
            task.reject(new DOMException('Task cancelled by limiter.', 'AbortError'));
            this.eventEmitter.emit('task:cancelled', { id: task.id, reason: 'Limiter cancelled' });
            console.log(`[Task ${taskId}] cancelled from queue.`);
            this._checkIdleState();
            return true;
        }
        // 对于正在执行的任务,我们只能触发其 AbortSignal,任务自身需要响应。
        // 这里不直接从 activeCount 中移除,等待任务自行终止。
        console.warn(`[Task ${taskId}] not found in queue or already running. Cannot directly cancel.`);
        return false;
    }

    /**
     * 暂停任务调度。
     * 正在执行的任务将继续,但不会从队列中取出新任务。
     */
    public pause(): void {
        this.isPaused = true;
        this.eventEmitter.emit('paused');
        console.log('Limiter paused.');
    }

    /**
     * 恢复任务调度。
     * 如果有待处理任务且有空闲槽位,将继续执行。
     */
    public resume(): void {
        this.isPaused = false;
        this.eventEmitter.emit('resumed');
        console.log('Limiter resumed.');
        this._runNext(); // 恢复后尝试立即运行
    }

    /**
     * 清空等待队列,取消所有待处理任务。
     * 正在执行的任务不受影响。
     */
    public clear(): void {
        const cancelledTasks = this.taskQueue.splice(0, this.taskQueue.length);
        cancelledTasks.forEach(task => {
            task.abortController.abort();
            task.reject(new DOMException('Limiter cleared, task cancelled.', 'AbortError'));
            this.eventEmitter.emit('task:cancelled', { id: task.id, reason: 'Limiter cleared' });
        });
        this.eventEmitter.emit('queue:cleared', { count: cancelledTasks.length });
        this._checkIdleState();
        console.log(`Queue cleared. ${cancelledTasks.length} tasks cancelled.`);
    }

    /**
     * 停止接受新任务,等待所有现有任务完成,然后进入空闲状态。
     * @returns {Promise<void>} 当所有任务完成时 resolve。
     */
    public async drain(): Promise<void> {
        if (this.isDraining) {
            return Promise.resolve();
        }
        this.isDraining = true;
        this.eventEmitter.emit('drain:start');
        console.log('Limiter draining: no new tasks will be accepted, waiting for existing tasks.');

        await this.waitIdle();
        this.isDraining = false;
        this.eventEmitter.emit('drain:complete');
        console.log('Limiter drain complete.');
    }

    /**
     * 等待所有当前和待处理任务完成。
     * @returns {Promise<void>} 当所有任务完成且队列为空时 resolve。
     */
    public async waitIdle(): Promise<void> {
        if (this.activeCount === 0 && this.taskQueue.length === 0) {
            return Promise.resolve();
        }
        return new Promise(resolve => {
            this.eventEmitter.once('idle', resolve);
        });
    }

    /**
     * 注册事件监听器。
     * @param {string} eventName - 事件名称。
     * @param {(...args: any[]) => void} listener - 事件监听函数。
     * @returns {this}
     */
    public on(eventName: string, listener: (...args: any[]) => void): this {
        this.eventEmitter.on(eventName, listener);
        return this;
    }

    /**
     * 注册只触发一次的事件监听器。
     * @param {string} eventName - 事件名称。
     * @param {(...args: any[]) => void} listener - 事件监听函数。
     * @returns {this}
     */
    public once(eventName: string, listener: (...args: any[]) => void): this {
        this.eventEmitter.once(eventName, listener);
        return this;
    }

    /**
     * 移除事件监听器。
     * @param {string} eventName - 事件名称。
     * @param {(...args: any[]) => void} listener - 事件监听函数。
     * @returns {this}
     */
    public off(eventName: string, listener: (...args: any[]) => void): this {
        this.eventEmitter.off(eventName, listener);
        return this;
    }

    /**
     * 获取当前正在执行的任务数量。
     * @returns {number}
     */
    public getActiveCount(): number {
        return this.activeCount;
    }

    /**
     * 获取当前等待中的任务数量。
     * @returns {number}
     */
    public getQueueLength(): number {
        return this.taskQueue.length;
    }

    /**
     * 内部方法,检查并触发 'idle' 或 'not:idle' 事件。
     * @private
     */
    private _checkIdleState(): void {
        const currentIdle = this.activeCount === 0 && this.taskQueue.length === 0;
        if (currentIdle && !this.isIdle) {
            this.isIdle = true;
            this.eventEmitter.emit('idle');
            console.log('[Status] Limiter is now idle.');
        } else if (!currentIdle && this.isIdle) {
            this.isIdle = false;
            this.eventEmitter.emit('not:idle');
            console.log('[Status] Limiter is no longer idle.');
        }
    }
}

FullFeaturedConcurrentLimiter 方法速查表

方法名称 描述 参数 返回值
constructor 初始化并发控制器。 limit: number – 最大并发数。 FullFeaturedConcurrentLimiter
add 添加一个异步任务。 taskFn: (signal?: AbortSignal) => Promise<T>, options?: TaskOptions Promise<T>
cancelTask 取消一个等待中的任务。 taskId: number boolean
pause 暂停任务调度(已运行任务继续)。 void
resume 恢复任务调度。 void
clear 清空等待队列,取消所有待处理任务。 void
drain 停止接受新任务,等待所有现有任务完成。 Promise<void>
waitIdle 等待所有任务完成且控制器空闲。 Promise<void>
on 注册事件监听器。 eventName: string, listener: Function this
once 注册只触发一次的事件监听器。 eventName: string, listener: Function this
off 移除事件监听器。 eventName: string, listener: Function this
getActiveCount 获取当前正在执行的任务数量。 number
getQueueLength 获取当前等待中的任务数量。 number

FullFeaturedConcurrentLimiter 事件速查表

事件名称 描述 携带数据
task:added 任务被添加到队列。 { id: number, queueLength: number, options: TaskOptions }
task:start 任务开始执行。 { id: number, attempt: number }
task:complete 任务成功完成。 { id: number, result: any }
task:error 任务最终失败(不再重试)。 { id: number, error: any, finalAttempt: boolean }
task:retry 任务失败并准备重试。 { id: number, attempt: number, error: any }
task:cancelled 任务被取消(无论是从队列中移除还是执行中收到信号)。 { id: number, reason: any }
status:change 控制器状态(活跃数、队列长度)发生变化。 { active: number, queue: number, limit: number }
paused 控制器进入暂停状态。
resumed 控制器从暂停状态恢复。
queue:cleared 任务队列被清空。 { count: number }
drain:start 控制器开始进入 draining 状态。
drain:complete 控制器完成 draining 状态。
idle 控制器进入完全空闲状态(无活跃任务,无等待任务)。
not:idle 控制器从空闲状态变为非空闲。

资源与效率的平衡,系统稳定的基石

通过这次深入的探讨,我们不仅构建了一个功能强大的并发请求控制器,更重要的是,我们理解了其背后的核心原理和设计哲学。并发控制并非仅仅是限制请求数量,它代表着对系统资源的精细管理,是对用户体验的深切关怀,更是保障服务稳定运行的基石。在日益复杂的分布式系统和高并发场景下,掌握这种控制能力,将使我们能够设计出更健壮、更高效、更具弹性的应用程序。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注