各位同仁,各位技术爱好者,大家好。
今天,我们将深入探讨一个在现代软件开发中极为常见且至关重要的话题:如何实现一个可控的并发请求函数,确保在任何时刻,正在处理的请求数量都不会超过我们预设的上限 N。这不仅仅是一个技术实现细节,它更是一种对系统资源负责、提升用户体验、保障服务稳定性的设计哲学。
并发控制的必要性与挑战
在互联网应用中,我们经常需要向后端服务器发起大量的异步请求,例如批量上传图片、获取数据列表、发送通知等。如果没有适当的并发控制,一股脑地发出所有请求,可能会带来一系列严重问题:
- 服务器过载 (Server Overload):瞬时涌入的大量请求可能超出服务器的处理能力,导致请求响应变慢、错误率升高,甚至服务崩溃。这如同对服务器进行了一次“非恶意”的分布式拒绝服务(DDoS)攻击。
- 客户端性能下降 (Client-side Performance Degradation):在浏览器环境中,过多的网络请求会占用大量的网络带宽、CPU 和内存资源,导致页面卡顿、响应迟缓,严重影响用户体验。
- API 限流触发 (API Rate Limiting):许多第三方服务或内部 API 都设有请求限流机制。如果短时间内发送的请求数量超过了限制,后续请求将被拒绝,导致业务中断。
- 资源争抢与死锁 (Resource Contention and Deadlock):在某些场景下,并发请求可能涉及到共享资源的访问。不加控制的并发可能导致资源争抢,甚至在设计不当的情况下引发死锁,使系统陷入停滞。
- 内存消耗 (Memory Consumption):每个待处理或正在处理的请求都需要占用一定的内存。如果并发量过高,可能导致内存溢出,尤其是在资源受限的环境中(如移动设备或嵌入式系统)。
因此,实现一个可控的并发请求函数,其核心目标是像交通管制员一样,对汹涌的请求洪流进行疏导和调度,确保交通(请求)既能高效通行,又不会造成拥堵。挑战在于,我们需要在异步环境中,精确地追踪和管理每个请求的状态,并在请求完成或失败后,及时释放“通行证”,允许新的请求进入。
核心概念:信号量与任务队列
要解决并发控制问题,我们首先需要理解两个核心概念:信号量 (Semaphore) 和 任务队列 (Task Queue)。
信号量 (Semaphore)
信号量是计算机科学中一个经典的同步原语,由荷兰计算机科学家 Edsger W. Dijkstra 提出。它本质上是一个计数器,用于控制对共享资源的访问。
- P 操作 (Wait/Acquire):尝试获取一个资源。如果计数器大于 0,则减 1 并继续执行;否则,进程/线程会被阻塞,直到计数器大于 0。
- V 操作 (Signal/Release):释放一个资源。计数器加 1。如果有进程/线程因 P 操作而被阻塞,其中一个将被唤醒。
在我们的并发请求场景中,这个“资源”就是允许执行的请求槽位。信号量的计数值就是我们设定的最大并发数 N。每次一个请求开始执行时,我们执行 P 操作(减少可用槽位);每次请求完成(无论成功或失败)时,我们执行 V 操作(增加可用槽位)。当没有可用槽位时,新的请求就必须等待。
任务队列 (Task Queue)
当信号量指示当前没有可用槽位时,新来的请求不能立即执行。它们需要被“排队”,等待有空闲槽位时再被调度。这就是任务队列的作用。
- 先进先出 (FIFO):最常见的队列策略是先进先出。请求按照它们被添加的顺序依次执行。
- 优先级队列 (Priority Queue):在某些高级场景中,我们可能希望某些请求比其他请求更早地执行。这时可以使用优先级队列,允许高优先级的请求插队。
结合这两个概念,我们的并发控制器将维护一个任务队列来存储待处理的请求,并利用一个内部计数器(模拟信号量)来跟踪当前正在执行的请求数量。当计数器未达到上限且队列中有任务时,就从队列中取出任务并执行。
基础实现:构建一个简单的并发控制器
让我们从一个最基础的并发控制器开始。我们的目标是创建一个类,它能接受一个异步任务函数(返回 Promise),并确保同时执行的任务不超过指定数量 limit。
需求分析
- 接受异步任务: 控制器需要能够接受一个返回 Promise 的函数。
- 限制并发数: 核心功能,确保
activeCount <= limit。 - 任务排队: 当达到并发上限时,新任务应进入队列等待。
- 任务调度: 当有任务完成时,从队列中取出下一个任务执行。
- 结果返回: 每个任务的 Promise 应该能正确地 resolve 或 reject。
设计思路
我们定义一个 ConcurrentLimiter 类,它包含以下核心属性和方法:
limit: 允许的最大并发数。activeCount: 当前正在执行的任务数量。taskQueue: 存储等待执行的任务的队列。每个任务都是一个包含原始函数和其对应的 Promiseresolve/reject方法的对象。add(taskFn): 添加一个新任务到队列。这个方法返回一个 Promise,代表taskFn的执行结果。_runNext(): 内部调度方法,负责检查是否有空闲槽位和待执行任务,并启动下一个任务。
代码实现 (版本 1 – 基础 Promise 调度)
/**
* @typedef {Object} QueuedTask
* @property {Function} fn - 实际要执行的异步函数。
* @property {(value: any) => void} resolve - 任务 Promise 的 resolve 方法。
* @property {(reason?: any) => void} reject - 任务 Promise 的 reject 方法。
*/
class ConcurrentLimiter {
/**
* 最大并发数。
* @private
* @type {number}
*/
private limit: number;
/**
* 当前正在执行的任务数量。
* @private
* @type {number}
*/
private activeCount: number = 0;
/**
* 等待执行的任务队列。
* @private
* @type {QueuedTask[]}
*/
private taskQueue: Array<{
fn: (...args: any[]) => Promise<any>;
resolve: (value: any) => void;
reject: (reason?: any) => void;
}> = [];
/**
* 创建一个并发请求限制器。
* @param {number} limit - 最大并发数。
* @throws {Error} 如果 limit 小于 1。
*/
constructor(limit: number) {
if (limit < 1) {
throw new Error('Concurrency limit must be at least 1.');
}
this.limit = limit;
console.log(`ConcurrentLimiter initialized with limit: ${this.limit}`);
}
/**
* 添加一个异步任务到队列中。
* 如果当前并发数未达到上限,任务会立即执行;否则会进入队列等待。
* @param {(...args: any[]) => Promise<any>} taskFn - 要执行的异步函数,必须返回一个 Promise。
* @returns {Promise<any>} - 一个 Promise,它会解析或拒绝为 taskFn 的结果。
*/
public add<T>(taskFn: (...args: any[]) => Promise<T>): Promise<T> {
return new Promise<T>((resolve, reject) => {
this.taskQueue.push({ fn: taskFn, resolve, reject });
console.log(`Task added. Queue length: ${this.taskQueue.length}`);
this._runNext(); // 尝试运行下一个任务
});
}
/**
* 内部方法,尝试从队列中取出任务并执行。
* 只有当当前活跃任务数小于限制且队列中有任务时才会执行。
* @private
*/
private _runNext(): void {
console.log(`_runNext called. Active: ${this.activeCount}, Limit: ${this.limit}, Queue: ${this.taskQueue.length}`);
if (this.activeCount >= this.limit || this.taskQueue.length === 0) {
console.log('Conditions not met to run next task.');
return; // 达到并发上限或队列为空,不执行
}
const task = this.taskQueue.shift(); // 从队列头部取出一个任务
if (!task) {
return; // 队列可能在其他地方被清空
}
this.activeCount++; // 增加活跃任务计数
console.log(`Starting task. Active count: ${this.activeCount}`);
// 执行任务,并处理其结果
task.fn()
.then(result => {
task.resolve(result); // 任务成功,解析对应的 Promise
})
.catch(error => {
task.reject(error); // 任务失败,拒绝对应的 Promise
})
.finally(() => {
this.activeCount--; // 任务完成(无论成功失败),减少活跃任务计数
console.log(`Task finished. Active count: ${this.activeCount}`);
this._runNext(); // 任务完成后,再次尝试运行下一个任务
});
}
/**
* 获取当前正在执行的任务数量。
* @returns {number}
*/
public getActiveCount(): number {
return this.activeCount;
}
/**
* 获取当前等待中的任务数量。
* @returns {number}
*/
public getQueueLength(): number {
return this.taskQueue.length;
}
}
使用示例
// 模拟一个异步请求
const mockRequest = (id: number, duration: number = 1000): Promise<string> => {
const startTime = Date.now();
return new Promise((resolve, reject) => {
setTimeout(() => {
const endTime = Date.now();
if (id % 5 === 0) { // 模拟一些任务失败
console.error(`Task ${id} FAILED after ${endTime - startTime}ms`);
reject(new Error(`Request ${id} failed`));
} else {
console.log(`Task ${id} completed in ${endTime - startTime}ms`);
resolve(`Data from request ${id}`);
}
}, duration);
});
};
const limiter = new ConcurrentLimiter(3); // 最多同时处理 3 个请求
const tasks = [];
for (let i = 1; i <= 10; i++) {
tasks.push(limiter.add(() => mockRequest(i, Math.random() * 2000 + 500)));
}
Promise.allSettled(tasks)
.then(results => {
console.log('nAll tasks settled:');
results.forEach((result, index) => {
if (result.status === 'fulfilled') {
console.log(`Task ${index + 1} Succeeded: ${result.value}`);
} else {
console.log(`Task ${index + 1} Failed: ${result.reason}`);
}
});
console.log(`Final active count: ${limiter.getActiveCount()}`);
console.log(`Final queue length: ${limiter.getQueueLength()}`);
});
// 预期输出:
// - 最多同时有 3 个 "Starting task" 日志。
// - 任务完成和开始是交替的,保持 activeCount <= 3。
// - 最终所有任务都会被处理。
这个基础实现已经能够满足“最多 N 个请求在处理”的核心需求。它通过 activeCount 和 taskQueue 实现了信号量和任务队列的功能。_runNext 方法是调度的核心,在每次任务添加和任务完成时被调用,以确保系统始终尝试保持最大并发效率。
进阶功能与优化
基础的并发控制器虽然可用,但在实际生产环境中,我们往往需要更强大的功能来处理各种复杂情况。
1. 错误处理与重试机制
任务失败是常态。一个健壮的并发控制器应该能够优雅地处理任务失败,并提供可选的重试机制。
- 错误捕获:
Promise.catch可以捕获任务执行中的错误。 - 重试逻辑: 当任务失败时,可以根据配置决定是否重试。重试次数、重试间隔(例如指数退避)都是重要的参数。
让我们扩展 QueuedTask 接口和 add 方法,允许为每个任务指定重试选项。
/**
* @typedef {Object} TaskOptions
* @property {number} [priority=0] - 任务的优先级,数字越大优先级越高。
* @property {number} [retries=0] - 任务失败时的最大重试次数。
* @property {number} [retryDelay=0] - 重试前的等待毫秒数。
* @property {(error: any, attempt: number) => boolean} [shouldRetry] - 自定义重试判断函数。
*/
/**
* @typedef {Object} QueuedTask
* @property {Function} fn - 实际要执行的异步函数。
* @property {(value: any) => void} resolve - 任务 Promise 的 resolve 方法。
* @property {(reason?: any) => void} reject - 任务 Promise 的 reject 方法。
* @property {TaskOptions} options - 任务的配置选项。
* @property {number} attempt - 当前重试的次数(从 0 开始)。
* @property {number} id - 任务的唯一标识符。
*/
class ConcurrentLimiterWithRetry extends ConcurrentLimiter { // 继承基础类
private taskIdCounter: number = 0;
constructor(limit: number) {
super(limit);
}
public add<T>(
taskFn: (...args: any[]) => Promise<T>,
options?: TaskOptions
): Promise<T> {
const taskId = ++this.taskIdCounter;
return new Promise<T>((resolve, reject) => {
this.taskQueue.push({
id: taskId,
fn: taskFn,
resolve,
reject,
options: { retries: 0, retryDelay: 0, priority: 0, ...options },
attempt: 0,
});
// 在添加任务时,考虑优先级,这里暂时只做简单处理
// 后续会详细讲解优先级队列
this.taskQueue.sort((a, b) => (b.options?.priority || 0) - (a.options?.priority || 0));
this._runNext();
});
}
// 重写 _runNext 方法以支持重试
protected _runNext(): void {
if (this.activeCount >= this.limit || this.taskQueue.length === 0) {
return;
}
const taskIndex = this.taskQueue.findIndex(
t => t.attempt === 0 || t.options.retries > 0 // 寻找可以执行的任务
);
if (taskIndex === -1) {
return; // 没有可以执行的任务
}
const [task] = this.taskQueue.splice(taskIndex, 1); // 取出任务
this.activeCount++;
this._executeTask(task); // 抽离任务执行逻辑
}
/**
* 执行单个任务,包括重试逻辑。
* @private
* @param {QueuedTask} task - 要执行的任务对象。
*/
private _executeTask(task: QueuedTask): void {
console.log(`[Task ${task.id}] Starting (Attempt: ${task.attempt + 1}/${(task.options?.retries || 0) + 1}), Active: ${this.activeCount}`);
task.fn()
.then(result => {
task.resolve(result);
})
.catch(error => {
const maxRetries = task.options?.retries || 0;
const shouldRetryFn = task.options?.shouldRetry;
const currentAttempt = task.attempt;
let doRetry = false;
if (shouldRetryFn) {
doRetry = shouldRetryFn(error, currentAttempt + 1);
} else {
doRetry = currentAttempt < maxRetries;
}
if (doRetry) {
task.attempt++;
const retryDelay = task.options?.retryDelay || 0;
console.warn(`[Task ${task.id}] Failed, retrying in ${retryDelay}ms. Attempt ${task.attempt}/${maxRetries + 1}. Error: ${error.message}`);
setTimeout(() => {
this.taskQueue.unshift(task); // 重新放入队列头部,或者考虑优先级
this._runNext(); // 再次尝试运行
}, retryDelay);
} else {
console.error(`[Task ${task.id}] Permanently failed after ${currentAttempt + 1} attempts. Error: ${error.message}`);
task.reject(error);
}
})
.finally(() => {
this.activeCount--;
console.log(`[Task ${task.id}] Finished. Active: ${this.activeCount}`);
// 如果是重试任务,_runNext 会在 setTimeout 中被调用
// 如果是最终完成或失败的任务,在此调用_runNext
if (task.attempt === (task.options?.retries || 0) || !this.taskQueue.includes(task)) { // 确保不是正在等待重试的任务
this._runNext();
}
});
}
}
重试逻辑的考量点:
- 指数退避 (Exponential Backoff):重试间隔随着重试次数的增加而增长,这是一种常见的策略,可以避免在服务暂时不可用时,不断地给服务器施加压力。例如,
retryDelay * (2 ** attempt)。 - 自定义重试条件:
shouldRetry函数允许开发者根据错误类型或业务逻辑决定是否重试。
2. 任务优先级
在某些场景下,我们可能希望某些任务能够优先执行,即使它们是后加入队列的。例如,用户主动触发的操作可能比后台数据同步操作更重要。
实现优先级队列通常有两种方式:
- 每次添加任务时排序: 效率较低,如果队列很长,排序开销会很大。
- 使用专门的优先级队列数据结构: 更高效,如二叉堆 (Binary Heap)。
对于大多数情况,如果队列不是特别大,简单的插入排序或在 add 时进行一次排序就足够了。我们可以在 add 方法中,根据任务的 priority 属性将任务插入到队列的正确位置。
// 在 ConcurrentLimiterWithRetry 的 add 方法中:
public add<T>(
taskFn: (...args: any[]) => Promise<T>,
options?: TaskOptions
): Promise<T> {
const taskId = ++this.taskIdCounter;
return new Promise<T>((resolve, reject) => {
const newTask = {
id: taskId,
fn: taskFn,
resolve,
reject,
options: { retries: 0, retryDelay: 0, priority: 0, ...options },
attempt: 0,
};
// 插入到正确的位置以保持优先级顺序
let inserted = false;
for (let i = 0; i < this.taskQueue.length; i++) {
if ((newTask.options?.priority || 0) > (this.taskQueue[i].options?.priority || 0)) {
this.taskQueue.splice(i, 0, newTask);
inserted = true;
break;
}
}
if (!inserted) {
this.taskQueue.push(newTask); // 如果是最低优先级或队列为空,则添加到末尾
}
console.log(`[Task ${taskId}] added with priority ${newTask.options?.priority}. Queue length: ${this.taskQueue.length}`);
this._runNext();
});
}
// _runNext 方法现在直接从队列头部取任务即可,因为队列已经排序
protected _runNext(): void {
if (this.activeCount >= this.limit || this.taskQueue.length === 0) {
return;
}
// 确保取出的任务不是正在等待重试的
const task = this.taskQueue.shift(); // 队列头部已经是最高优先级的任务
if (!task) {
return;
}
// ... (rest of _runNext logic remains similar, calling _executeTask)
this.activeCount++;
this._executeTask(task);
}
3. 取消任务 (Task Cancellation)
有时,用户可能在任务完成前就离开了页面,或者业务逻辑不再需要某个任务的结果。这时,能够取消正在等待或正在执行的任务就变得很重要。
实现任务取消的关键是:
- 任务可感知取消: 任务函数本身需要能够检查是否已被取消。
- 取消信号传播: 控制器需要提供一种机制来通知任务其已被取消。
AbortController 是现代 JavaScript 中用于取消异步操作的标准机制。
// 扩展 TaskOptions 和 QueuedTask
interface TaskOptions {
// ... 其他选项
signal?: AbortSignal; // 用于任务取消的 AbortSignal
}
interface QueuedTask {
// ... 其他属性
abortController?: AbortController; // 用于控制器内部取消
}
class ConcurrentLimiterWithCancel extends ConcurrentLimiterWithRetry {
// ... 构造函数等
public add<T>(
taskFn: (signal?: AbortSignal) => Promise<T>, // 任务函数现在可以接收一个 signal
options?: TaskOptions
): Promise<T> {
const taskId = ++this.taskIdCounter;
const abortController = new AbortController(); // 为每个任务创建 AbortController
// 如果外部提供了 signal,则连接起来
if (options?.signal) {
options.signal.addEventListener('abort', () => {
abortController.abort();
});
}
return new Promise<T>((resolve, reject) => {
const newTask: QueuedTask = {
id: taskId,
fn: () => taskFn(abortController.signal), // 将 signal 传递给任务函数
resolve,
reject,
options: { retries: 0, retryDelay: 0, priority: 0, ...options },
attempt: 0,
abortController: abortController, // 保存 controller 实例
};
// ... (优先级排序逻辑,与之前相同)
// 假设已添加到 this.taskQueue
this.taskQueue.push(newTask);
this.taskQueue.sort((a, b) => (b.options?.priority || 0) - (a.options?.priority || 0));
console.log(`[Task ${taskId}] added. Queue length: ${this.taskQueue.length}`);
this._runNext();
});
}
/**
* 取消一个等待中的任务。
* @param {number} taskId - 要取消的任务的 ID。
* @returns {boolean} - 如果任务被找到并取消,返回 true;否则返回 false。
*/
public cancelTask(taskId: number): boolean {
const taskIndex = this.taskQueue.findIndex(task => task.id === taskId);
if (taskIndex !== -1) {
const [task] = this.taskQueue.splice(taskIndex, 1);
task.abortController?.abort(); // 发送取消信号
task.reject(new DOMException('Task cancelled by user.', 'AbortError')); // 拒绝任务 Promise
console.log(`[Task ${taskId}] cancelled from queue.`);
return true;
}
// 对于正在执行的任务,我们只能发送 signal,任务自身需要响应
// 这里暂时不处理正在执行的任务的直接移除,因为它们已经占用了 activeCount
// 任务内部接收到 signal 后会自行处理
console.warn(`[Task ${taskId}] not found in queue or already running.`);
return false;
}
// _executeTask 中无需特别改动,因为任务函数自身会监听 signal
// 如果任务函数内部能够响应 signal 并抛出 AbortError,那么它就会进入 catch 块
// 我们可以在 catch 块中检查错误类型
private _executeTask(task: QueuedTask): void {
console.log(`[Task ${task.id}] Starting (Attempt: ${task.attempt + 1}), Active: ${this.activeCount}`);
task.fn(task.abortController?.signal) // 传递 signal 给任务函数
.then(result => {
task.resolve(result);
})
.catch(error => {
if (error instanceof DOMException && error.name === 'AbortError') {
console.warn(`[Task ${task.id}] was aborted.`);
task.reject(error); // 任务被取消,直接拒绝
} else {
// ... (原有重试逻辑)
// ... 如果不是 AbortError,执行重试或永久失败逻辑
}
})
.finally(() => {
this.activeCount--;
console.log(`[Task ${task.id}] Finished. Active: ${this.activeCount}`);
if (task.attempt === (task.options?.retries || 0) || !this.taskQueue.includes(task)) {
this._runNext();
}
});
}
}
// 模拟一个可取消的请求
const cancellableMockRequest = (id: number, signal?: AbortSignal, duration: number = 1000): Promise<string> => {
return new Promise((resolve, reject) => {
const timeoutId = setTimeout(() => {
if (signal?.aborted) {
reject(new DOMException('Request aborted during delay', 'AbortError'));
return;
}
console.log(`Task ${id} completed.`);
resolve(`Data from request ${id}`);
}, duration);
signal?.addEventListener('abort', () => {
clearTimeout(timeoutId);
reject(new DOMException('Request aborted by signal', 'AbortError'));
}, { once: true });
});
};
// 使用示例
const limiterWithCancel = new ConcurrentLimiterWithCancel(2);
const task1Promise = limiterWithCancel.add(() => cancellableMockRequest(1, undefined, 2000));
const task2Promise = limiterWithCancel.add(() => cancellableMockRequest(2, undefined, 1000));
const task3Promise = limiterWithCancel.add(() => cancellableMockRequest(3, undefined, 3000)); // 这个任务会排队
// 假设我们有一个外部的 AbortController 来管理一组任务
const externalAbortController = new AbortController();
const task4Promise = limiterWithCancel.add(() => cancellableMockRequest(4, externalAbortController.signal, 4000));
setTimeout(() => {
// 假设我们想取消 task3,因为它还在队列中
limiterWithCancel.cancelTask(3);
}, 500);
setTimeout(() => {
// 假设我们想取消 task4,它可能正在运行或等待
externalAbortController.abort(); // 触发 task4 的 AbortSignal
}, 1500);
Promise.allSettled([task1Promise, task2Promise, task3Promise, task4Promise])
.then(results => {
console.log('nAll cancellable tasks settled:');
results.forEach((result, index) => {
console.log(`Task ${index + 1} status: ${result.status}`);
if (result.status === 'rejected') {
console.log(` Reason: ${result.reason?.name || result.reason}`);
}
});
});
4. 进度反馈与状态管理
在长时间运行的并发任务中,用户或上层应用可能需要了解当前的状态,例如:有多少任务等待中,有多少任务已完成,总任务数是多少等。这可以通过事件发射器 (EventEmitter) 来实现。
import EventEmitter from 'events'; // Node.js 环境,浏览器可以使用自定义的 EventEmitter 实现
// 扩展 ConcurrentLimiterWithCancel
class ConcurrentLimiterWithEvents extends ConcurrentLimiterWithCancel {
private eventEmitter: EventEmitter;
constructor(limit: number) {
super(limit);
this.eventEmitter = new EventEmitter();
}
/**
* 注册事件监听器。
* @param {string} eventName - 事件名称。
* @param {(...args: any[]) => void} listener - 事件监听函数。
* @returns {this}
*/
public on(eventName: string, listener: (...args: any[]) => void): this {
this.eventEmitter.on(eventName, listener);
return this;
}
/**
* 注册只触发一次的事件监听器。
* @param {string} eventName - 事件名称。
* @param {(...args: any[]) => void} listener - 事件监听函数。
* @returns {this}
*/
public once(eventName: string, listener: (...args: any[]) => void): this {
this.eventEmitter.once(eventName, listener);
return this;
}
/**
* 移除事件监听器。
* @param {string} eventName - 事件名称。
* @param {(...args: any[]) => void} listener - 事件监听函数。
* @returns {this}
*/
public off(eventName: string, listener: (...args: any[]) => void): this {
this.eventEmitter.off(eventName, listener);
return this;
}
// 在关键操作点触发事件
public add<T>(taskFn: (signal?: AbortSignal) => Promise<T>, options?: TaskOptions): Promise<T> {
// ... (原有 add 逻辑)
const promise = super.add(taskFn, options);
this.eventEmitter.emit('task:added', {
id: this.taskIdCounter,
queueLength: this.getQueueLength(),
options
});
this._checkIdleState(); // 检查是否从空闲状态变为非空闲
return promise;
}
protected _executeTask(task: QueuedTask): void {
this.eventEmitter.emit('task:start', { id: task.id, attempt: task.attempt + 1 });
super._executeTask(task); // 调用父类的执行逻辑
}
// 重写父类的 _executeTask 中的 finally 块,以触发完成/失败事件
private _executeTaskWithEvents(task: QueuedTask): void {
task.fn(task.abortController?.signal)
.then(result => {
task.resolve(result);
this.eventEmitter.emit('task:complete', { id: task.id, result });
})
.catch(error => {
if (error instanceof DOMException && error.name === 'AbortError') {
task.reject(error);
this.eventEmitter.emit('task:cancelled', { id: task.id, reason: error });
} else {
const maxRetries = task.options?.retries || 0;
const shouldRetryFn = task.options?.shouldRetry;
const currentAttempt = task.attempt;
let doRetry = false;
if (shouldRetryFn) {
doRetry = shouldRetryFn(error, currentAttempt + 1);
} else {
doRetry = currentAttempt < maxRetries;
}
if (doRetry) {
task.attempt++;
const retryDelay = task.options?.retryDelay || 0;
this.eventEmitter.emit('task:retry', { id: task.id, attempt: task.attempt, error });
setTimeout(() => {
this.taskQueue.unshift(task);
this._runNext();
}, retryDelay);
} else {
task.reject(error);
this.eventEmitter.emit('task:error', { id: task.id, error, finalAttempt: true });
}
}
})
.finally(() => {
this.activeCount--;
this.eventEmitter.emit('status:change', {
active: this.activeCount,
queue: this.getQueueLength(),
limit: this.limit
});
// Check if we are truly idle after a task finishes
this._checkIdleState();
if (task.attempt === (task.options?.retries || 0) || !this.taskQueue.includes(task)) {
this._runNext();
}
});
}
// 覆盖父类的 _executeTask 方法
protected _executeTask(task: QueuedTask): void {
this._executeTaskWithEvents(task);
}
// 辅助方法检查是否进入/离开空闲状态
private _isIdle: boolean = true;
private _checkIdleState(): void {
const currentIdle = this.activeCount === 0 && this.taskQueue.length === 0;
if (currentIdle && !this._isIdle) {
this._isIdle = true;
this.eventEmitter.emit('idle');
console.log('Limiter is now idle.');
} else if (!currentIdle && this._isIdle) {
this._isIdle = false;
this.eventEmitter.emit('not:idle');
console.log('Limiter is no longer idle.');
}
}
// 清空队列,并触发事件
public clear(): void {
const cancelledTasks = this.taskQueue.splice(0, this.taskQueue.length);
cancelledTasks.forEach(task => {
task.abortController?.abort();
task.reject(new DOMException('Limiter cleared, task cancelled.', 'AbortError'));
this.eventEmitter.emit('task:cancelled', { id: task.id, reason: 'Limiter cleared' });
});
this.eventEmitter.emit('queue:cleared', { count: cancelledTasks.length });
this._checkIdleState();
console.log(`Queue cleared. ${cancelledTasks.length} tasks cancelled.`);
}
/**
* 等待所有当前和待处理任务完成。
* @returns {Promise<void>} 当所有任务完成且队列为空时 resolve。
*/
public async waitIdle(): Promise<void> {
if (this.activeCount === 0 && this.taskQueue.length === 0) {
return Promise.resolve();
}
return new Promise(resolve => {
this.eventEmitter.once('idle', resolve);
});
}
}
// 示例:监听事件
const limiterWithEvents = new ConcurrentLimiterWithEvents(2);
limiterWithEvents.on('task:added', ({ id, queueLength }) => console.log(`[Event] Task ${id} added. Queue: ${queueLength}`));
limiterWithEvents.on('task:start', ({ id, attempt }) => console.log(`[Event] Task ${id} started (Attempt ${attempt}).`));
limiterWithEvents.on('task:complete', ({ id, result }) => console.log(`[Event] Task ${id} completed with result: ${result}`));
limiterWithEvents.on('task:error', ({ id, error, finalAttempt }) => console.error(`[Event] Task ${id} failed (finalAttempt: ${finalAttempt}): ${error.message}`));
limiterWithEvents.on('task:retry', ({ id, attempt, error }) => console.warn(`[Event] Task ${id} retrying (Attempt ${attempt}): ${error.message}`));
limiterWithEvents.on('task:cancelled', ({ id, reason }) => console.warn(`[Event] Task ${id} cancelled: ${reason.message || reason}`));
limiterWithEvents.on('status:change', (status) => console.log(`[Event] Status: Active: ${status.active}, Queue: ${status.queue}`));
limiterWithEvents.on('idle', () => console.log('[Event] Limiter is completely idle.'));
const taskPromises = [];
taskPromises.push(limiterWithEvents.add(() => mockRequest(1, 1500)));
taskPromises.push(limiterWithEvents.add(() => mockRequest(2, 1000), { retries: 1, retryDelay: 200 }));
taskPromises.push(limiterWithEvents.add(() => mockRequest(3, 2500))); // Queue
taskPromises.push(limiterWithEvents.add(() => mockRequest(4, 500), { priority: 1 })); // 高优先级,可能插队
// 模拟任务 2 首次失败
const originalMockRequest = mockRequest;
mockRequest = (id, duration) => {
if (id === 2 && limiterWithEvents.getActiveCount() === 1) { // 第一次执行 task 2 失败
return Promise.reject(new Error('Simulated failure for Task 2'));
}
return originalMockRequest(id, duration);
};
Promise.allSettled(taskPromises).then(() => {
console.log('nAll tasks submitted to limiterWithEvents have settled.');
limiterWithEvents.waitIdle().then(() => {
console.log('All tasks are truly finished and limiter is idle.');
});
});
5. 生命周期管理
控制器还需要一些方法来管理其自身的生命周期,例如暂停、恢复、清空队列和等待所有任务完成。
pause(): 暂停调度,不再启动新的任务,但已在运行的任务会继续。resume(): 恢复调度。clear(): 清空等待队列,取消所有待处理任务。waitIdle(): 返回一个 Promise,当所有任务完成(包括当前正在运行和队列中的任务)且控制器进入空闲状态时 resolve。
pause() 和 resume() 可以通过一个 isPaused 标志来实现。
// 扩展 ConcurrentLimiterWithEvents
class ConcurrentLimiterFullFeatures extends ConcurrentLimiterWithEvents {
private isPaused: boolean = false;
private isDraining: boolean = false; // 是否正在等待所有任务完成并清空
constructor(limit: number) {
super(limit);
}
/**
* 暂停任务调度。
* 正在执行的任务将继续,但不会从队列中取出新任务。
*/
public pause(): void {
this.isPaused = true;
this.eventEmitter.emit('paused');
console.log('Limiter paused.');
}
/**
* 恢复任务调度。
* 如果有待处理任务且有空闲槽位,将继续执行。
*/
public resume(): void {
this.isPaused = false;
this.eventEmitter.emit('resumed');
console.log('Limiter resumed.');
this._runNext(); // 恢复后尝试立即运行
}
// 重写 _runNext 以考虑暂停状态
protected _runNext(): void {
if (this.isPaused || this.activeCount >= this.limit || this.taskQueue.length === 0) {
return;
}
super._runNext(); // 调用父类的调度逻辑
}
/**
* 清空等待队列,取消所有待处理任务。
* 正在执行的任务不受影响。
*/
public clear(): void {
// ... (与 ConcurrentLimiterWithEvents 中的 clear 逻辑相同)
super.clear();
}
/**
* 等待所有当前和待处理任务完成。
* @returns {Promise<void>} 当所有任务完成且队列为空时 resolve。
*/
public async waitIdle(): Promise<void> {
return super.waitIdle();
}
/**
* 停止接受新任务,等待所有现有任务完成,然后进入空闲状态。
* @returns {Promise<void>} 当所有任务完成时 resolve。
*/
public async drain(): Promise<void> {
if (this.isDraining) {
return Promise.resolve(); // 已经处于 draining 状态
}
this.isDraining = true;
this.eventEmitter.emit('drain:start');
console.log('Limiter draining: no new tasks will be accepted, waiting for existing tasks.');
// 可以选择清空队列中尚未开始的任务,或者等待它们完成
// 这里选择等待它们完成,如果需要清空,可以先调用 clear()
await this.waitIdle();
this.isDraining = false;
this.eventEmitter.emit('drain:complete');
console.log('Limiter drain complete.');
}
// 重写 add 方法,在 draining 状态下拒绝新任务
public add<T>(taskFn: (signal?: AbortSignal) => Promise<T>, options?: TaskOptions): Promise<T> {
if (this.isDraining) {
return Promise.reject(new Error('Limiter is draining, no new tasks allowed.'));
}
return super.add(taskFn, options);
}
}
设计模式与最佳实践
在构建和使用并发控制器时,遵循一些设计模式和最佳实践可以提高代码质量、可维护性和健壮性。
- Promise 封装: 确保所有任务函数都返回 Promise。这使得异步操作结果的处理变得统一,无论是成功还是失败。
- 职责分离:
ConcurrentLimiter类的核心职责是调度任务。任务本身的业务逻辑应该由传入的taskFn负责。这样,控制器可以专注于其调度功能,而无需关心具体任务的实现细节。 - TypeScript 增强: 使用 TypeScript 能够提供类型安全,减少运行时错误,并提高代码的可读性和可维护性。我们上面的示例已经广泛使用了 TypeScript。
- 可测试性: 将逻辑封装在类中,并暴露公共方法,使得单元测试变得容易。可以模拟异步任务,然后断言
activeCount、taskQueue的状态以及事件是否正确触发。 - 内存管理: 长期运行的队列如果不断有任务失败并重试,或者任务卡住,可能会导致队列无限增长,造成内存泄漏。在设计时要考虑:
- 为队列设置最大长度,超出则拒绝新任务。
- 对重试次数进行严格限制。
- 提供
clear()方法来强制清空队列。
- 错误分类: 对于不同类型的错误(如网络错误、业务逻辑错误、取消错误),应进行分类处理。这有助于决定是否重试、是否需要通知用户,或者只是默默地记录日志。
实际应用场景
这种并发控制器在各种应用中都有广泛的用途:
前端应用
- 图片懒加载与预加载: 控制同时加载的图片数量,防止网络拥堵,提高页面渲染速度。
- 批量 API 请求: 当需要一次性发送大量数据到后端(如保存表单、同步数据)时,限制并发数可以避免服务器过载和浏览器网络栈的压力。
- 文件上传/下载队列: 限制同时上传或下载的文件数量,提供更好的用户体验和带宽管理。
- 轮询与长连接管理: 限制同时进行的轮询请求,或者当长连接断开后,控制重连的频率。
Node.js 后端服务
- 数据库批量操作: 限制同时写入或查询数据库的连接数,防止数据库连接池耗尽或数据库服务器过载。
- 第三方 API 调用: 严格遵守第三方服务的 API 限流策略,避免被封禁 IP 或拒绝请求。
- 文件处理: 在处理大量文件(如图像处理、文件压缩、数据导入导出)时,控制并发数以利用 CPU 核心数,同时避免内存溢出。
- 消息队列消费者: 当从消息队列中拉取消息并进行处理时,限制并发处理的消息数量,以匹配下游服务的处理能力。
微服务架构
- 服务间调用限流: 在客户端(调用方)实现并发限制,作为熔断和降级策略的一部分,保护被调用服务不受突发流量冲击。
- 批量数据同步: 不同微服务之间进行数据同步时,控制数据传输和处理的并发量。
对比其他并发控制方案
在 JavaScript 生态中,有几种方式可以进行并发控制,但它们各有侧重。
Promise.all / Promise.allSettled
- 特点: 这两个函数适用于“一次性”地并行执行一组已知的 Promise。它们会等待所有 Promise 完成(或拒绝)后,统一返回结果。
- 区别: 它们是静态的,不具备动态调度和限制并发的能力。如果你有 1000 个任务,
Promise.all会立即启动所有 1000 个任务,这正是我们想要避免的。它们适用于已知数量且可以全部同时启动的场景。
第三方库
社区中已经有许多成熟的并发控制库,它们通常提供了比我们手动实现更完善的功能和更鲁棒的错误处理。
p-limit: 一个轻量级的库,专注于限制 Promise 并发数,功能与我们基础实现类似,但更加精炼和经过测试。async(Node.js): 提供了一套丰富的异步流程控制工具,包括async.queue和async.mapLimit等,功能强大且稳定。bottleneck: 一个功能全面的限流和并发控制库,支持基于时间窗口的速率限制、并发限制、优先级、重试、集群模式等。
为什么还要自己实现?
理解其内部原理至关重要。通过手动实现,我们可以:
- 深入理解异步调度和资源管理的机制。
- 根据特定业务需求进行定制化,而无需引入一个可能过于庞大或不完全匹配需求的第三方库。
- 提升解决复杂问题的能力。
一个更完整的并发控制器实现
以下整合了上述所有进阶特性,形成一个更完整的并发控制器。
import EventEmitter from 'events';
/**
* @typedef {Object} TaskOptions
* @property {number} [priority=0] - 任务的优先级,数字越大优先级越高。
* @property {number} [retries=0] - 任务失败时的最大重试次数。
* @property {number} [retryDelay=0] - 重试前的等待毫秒数。
* @property {(error: any, attempt: number) => boolean} [shouldRetry] - 自定义重试判断函数。
* @property {AbortSignal} [signal] - 外部提供的用于取消任务的 AbortSignal。
*/
/**
* @typedef {Object} QueuedTaskInternal
* @property {number} id - 任务的唯一标识符。
* @property {(signal?: AbortSignal) => Promise<any>} fn - 实际要执行的异步函数。
* @property {(value: any) => void} resolve - 任务 Promise 的 resolve 方法。
* @property {(reason?: any) => void} reject - 任务 Promise 的 reject 方法。
* @property {TaskOptions} options - 任务的配置选项。
* @property {number} attempt - 当前重试的次数(从 0 开始)。
* @property {AbortController} abortController - 用于任务取消的内部 AbortController 实例。
*/
class FullFeaturedConcurrentLimiter {
private limit: number;
private activeCount: number = 0;
private taskQueue: QueuedTaskInternal[] = [];
private taskIdCounter: number = 0;
private eventEmitter: EventEmitter;
private isPaused: boolean = false;
private isDraining: boolean = false;
private isIdle: boolean = true; // 初始状态为空闲
constructor(limit: number) {
if (limit < 1) {
throw new Error('Concurrency limit must be at least 1.');
}
this.limit = limit;
this.eventEmitter = new EventEmitter();
console.log(`FullFeaturedConcurrentLimiter initialized with limit: ${this.limit}`);
this._checkIdleState(); // 初始化时检查一次空闲状态
}
/**
* 添加一个异步任务到队列中。
* @param {(signal?: AbortSignal) => Promise<T>} taskFn - 要执行的异步函数,必须返回一个 Promise,且可接收一个 AbortSignal。
* @param {TaskOptions} [options] - 任务的配置选项。
* @returns {Promise<T>} - 一个 Promise,它会解析或拒绝为 taskFn 的结果。
*/
public add<T>(
taskFn: (signal?: AbortSignal) => Promise<T>,
options?: TaskOptions
): Promise<T> {
if (this.isDraining) {
return Promise.reject(new Error('Limiter is draining, no new tasks allowed.'));
}
const taskId = ++this.taskIdCounter;
const abortController = new AbortController();
// 绑定外部 AbortSignal (如果存在)
if (options?.signal) {
options.signal.addEventListener('abort', () => {
abortController.abort();
}, { once: true });
}
return new Promise<T>((resolve, reject) => {
const newTask: QueuedTaskInternal = {
id: taskId,
fn: taskFn,
resolve,
reject,
options: { retries: 0, retryDelay: 0, priority: 0, ...options },
attempt: 0,
abortController: abortController,
};
// 插入到正确的位置以保持优先级顺序
let inserted = false;
for (let i = 0; i < this.taskQueue.length; i++) {
if ((newTask.options?.priority || 0) > (this.taskQueue[i].options?.priority || 0)) {
this.taskQueue.splice(i, 0, newTask);
inserted = true;
break;
}
}
if (!inserted) {
this.taskQueue.push(newTask);
}
this.eventEmitter.emit('task:added', {
id: taskId,
queueLength: this.taskQueue.length,
options
});
this._checkIdleState();
this._runNext();
});
}
/**
* 内部方法,尝试从队列中取出任务并执行。
* 只有当当前活跃任务数小于限制、队列中有任务且控制器未暂停时才会执行。
* @private
*/
private _runNext(): void {
if (this.isPaused || this.activeCount >= this.limit || this.taskQueue.length === 0) {
return;
}
const task = this.taskQueue.shift();
if (!task) {
return;
}
this.activeCount++;
this._executeTask(task);
}
/**
* 执行单个任务,包括重试和取消逻辑。
* @private
* @param {QueuedTaskInternal} task - 要执行的任务对象。
*/
private _executeTask(task: QueuedTaskInternal): void {
this.eventEmitter.emit('task:start', { id: task.id, attempt: task.attempt + 1 });
console.log(`[Task ${task.id}] Starting (Attempt: ${task.attempt + 1}/${(task.options?.retries || 0) + 1}), Active: ${this.activeCount}, Queue: ${this.taskQueue.length}`);
task.fn(task.abortController.signal)
.then(result => {
task.resolve(result);
this.eventEmitter.emit('task:complete', { id: task.id, result });
})
.catch(error => {
if (error instanceof DOMException && error.name === 'AbortError') {
task.reject(error);
this.eventEmitter.emit('task:cancelled', { id: task.id, reason: error });
console.warn(`[Task ${task.id}] was aborted.`);
} else {
const maxRetries = task.options?.retries || 0;
const shouldRetryFn = task.options?.shouldRetry;
const currentAttempt = task.attempt;
let doRetry = false;
if (shouldRetryFn) {
doRetry = shouldRetryFn(error, currentAttempt + 1);
} else {
doRetry = currentAttempt < maxRetries;
}
if (doRetry) {
task.attempt++;
const retryDelay = task.options?.retryDelay || 0;
this.eventEmitter.emit('task:retry', { id: task.id, attempt: task.attempt, error });
console.warn(`[Task ${task.id}] Failed, retrying in ${retryDelay}ms. Attempt ${task.attempt}/${maxRetries + 1}. Error: ${error.message}`);
setTimeout(() => {
this.taskQueue.unshift(task); // 重新放入队列头部
this.taskQueue.sort((a, b) => (b.options?.priority || 0) - (a.options?.priority || 0)); // 重新排序以维护优先级
this._runNext();
}, retryDelay);
} else {
task.reject(error);
this.eventEmitter.emit('task:error', { id: task.id, error, finalAttempt: true });
console.error(`[Task ${task.id}] Permanently failed after ${currentAttempt + 1} attempts. Error: ${error.message}`);
}
}
})
.finally(() => {
this.activeCount--;
this.eventEmitter.emit('status:change', {
active: this.activeCount,
queue: this.getQueueLength(),
limit: this.limit
});
console.log(`[Task ${task.id}] Finished. Active: ${this.activeCount}, Queue: ${this.taskQueue.length}`);
// 如果任务不是因重试而重新入队,则调度下一个任务
if (!this.taskQueue.includes(task)) {
this._runNext();
}
this._checkIdleState(); // 每次任务完成都检查空闲状态
});
}
/**
* 取消一个等待中的任务。
* @param {number} taskId - 要取消的任务的 ID。
* @returns {boolean} - 如果任务被找到并取消,返回 true;否则返回 false。
*/
public cancelTask(taskId: number): boolean {
const taskIndex = this.taskQueue.findIndex(task => task.id === taskId);
if (taskIndex !== -1) {
const [task] = this.taskQueue.splice(taskIndex, 1);
task.abortController.abort();
task.reject(new DOMException('Task cancelled by limiter.', 'AbortError'));
this.eventEmitter.emit('task:cancelled', { id: task.id, reason: 'Limiter cancelled' });
console.log(`[Task ${taskId}] cancelled from queue.`);
this._checkIdleState();
return true;
}
// 对于正在执行的任务,我们只能触发其 AbortSignal,任务自身需要响应。
// 这里不直接从 activeCount 中移除,等待任务自行终止。
console.warn(`[Task ${taskId}] not found in queue or already running. Cannot directly cancel.`);
return false;
}
/**
* 暂停任务调度。
* 正在执行的任务将继续,但不会从队列中取出新任务。
*/
public pause(): void {
this.isPaused = true;
this.eventEmitter.emit('paused');
console.log('Limiter paused.');
}
/**
* 恢复任务调度。
* 如果有待处理任务且有空闲槽位,将继续执行。
*/
public resume(): void {
this.isPaused = false;
this.eventEmitter.emit('resumed');
console.log('Limiter resumed.');
this._runNext(); // 恢复后尝试立即运行
}
/**
* 清空等待队列,取消所有待处理任务。
* 正在执行的任务不受影响。
*/
public clear(): void {
const cancelledTasks = this.taskQueue.splice(0, this.taskQueue.length);
cancelledTasks.forEach(task => {
task.abortController.abort();
task.reject(new DOMException('Limiter cleared, task cancelled.', 'AbortError'));
this.eventEmitter.emit('task:cancelled', { id: task.id, reason: 'Limiter cleared' });
});
this.eventEmitter.emit('queue:cleared', { count: cancelledTasks.length });
this._checkIdleState();
console.log(`Queue cleared. ${cancelledTasks.length} tasks cancelled.`);
}
/**
* 停止接受新任务,等待所有现有任务完成,然后进入空闲状态。
* @returns {Promise<void>} 当所有任务完成时 resolve。
*/
public async drain(): Promise<void> {
if (this.isDraining) {
return Promise.resolve();
}
this.isDraining = true;
this.eventEmitter.emit('drain:start');
console.log('Limiter draining: no new tasks will be accepted, waiting for existing tasks.');
await this.waitIdle();
this.isDraining = false;
this.eventEmitter.emit('drain:complete');
console.log('Limiter drain complete.');
}
/**
* 等待所有当前和待处理任务完成。
* @returns {Promise<void>} 当所有任务完成且队列为空时 resolve。
*/
public async waitIdle(): Promise<void> {
if (this.activeCount === 0 && this.taskQueue.length === 0) {
return Promise.resolve();
}
return new Promise(resolve => {
this.eventEmitter.once('idle', resolve);
});
}
/**
* 注册事件监听器。
* @param {string} eventName - 事件名称。
* @param {(...args: any[]) => void} listener - 事件监听函数。
* @returns {this}
*/
public on(eventName: string, listener: (...args: any[]) => void): this {
this.eventEmitter.on(eventName, listener);
return this;
}
/**
* 注册只触发一次的事件监听器。
* @param {string} eventName - 事件名称。
* @param {(...args: any[]) => void} listener - 事件监听函数。
* @returns {this}
*/
public once(eventName: string, listener: (...args: any[]) => void): this {
this.eventEmitter.once(eventName, listener);
return this;
}
/**
* 移除事件监听器。
* @param {string} eventName - 事件名称。
* @param {(...args: any[]) => void} listener - 事件监听函数。
* @returns {this}
*/
public off(eventName: string, listener: (...args: any[]) => void): this {
this.eventEmitter.off(eventName, listener);
return this;
}
/**
* 获取当前正在执行的任务数量。
* @returns {number}
*/
public getActiveCount(): number {
return this.activeCount;
}
/**
* 获取当前等待中的任务数量。
* @returns {number}
*/
public getQueueLength(): number {
return this.taskQueue.length;
}
/**
* 内部方法,检查并触发 'idle' 或 'not:idle' 事件。
* @private
*/
private _checkIdleState(): void {
const currentIdle = this.activeCount === 0 && this.taskQueue.length === 0;
if (currentIdle && !this.isIdle) {
this.isIdle = true;
this.eventEmitter.emit('idle');
console.log('[Status] Limiter is now idle.');
} else if (!currentIdle && this.isIdle) {
this.isIdle = false;
this.eventEmitter.emit('not:idle');
console.log('[Status] Limiter is no longer idle.');
}
}
}
FullFeaturedConcurrentLimiter 方法速查表
| 方法名称 | 描述 | 参数 | 返回值 |
|---|---|---|---|
constructor |
初始化并发控制器。 | limit: number – 最大并发数。 |
FullFeaturedConcurrentLimiter |
add |
添加一个异步任务。 | taskFn: (signal?: AbortSignal) => Promise<T>, options?: TaskOptions |
Promise<T> |
cancelTask |
取消一个等待中的任务。 | taskId: number |
boolean |
pause |
暂停任务调度(已运行任务继续)。 | 无 | void |
resume |
恢复任务调度。 | 无 | void |
clear |
清空等待队列,取消所有待处理任务。 | 无 | void |
drain |
停止接受新任务,等待所有现有任务完成。 | 无 | Promise<void> |
waitIdle |
等待所有任务完成且控制器空闲。 | 无 | Promise<void> |
on |
注册事件监听器。 | eventName: string, listener: Function |
this |
once |
注册只触发一次的事件监听器。 | eventName: string, listener: Function |
this |
off |
移除事件监听器。 | eventName: string, listener: Function |
this |
getActiveCount |
获取当前正在执行的任务数量。 | 无 | number |
getQueueLength |
获取当前等待中的任务数量。 | 无 | number |
FullFeaturedConcurrentLimiter 事件速查表
| 事件名称 | 描述 | 携带数据 |
|---|---|---|
task:added |
任务被添加到队列。 | { id: number, queueLength: number, options: TaskOptions } |
task:start |
任务开始执行。 | { id: number, attempt: number } |
task:complete |
任务成功完成。 | { id: number, result: any } |
task:error |
任务最终失败(不再重试)。 | { id: number, error: any, finalAttempt: boolean } |
task:retry |
任务失败并准备重试。 | { id: number, attempt: number, error: any } |
task:cancelled |
任务被取消(无论是从队列中移除还是执行中收到信号)。 | { id: number, reason: any } |
status:change |
控制器状态(活跃数、队列长度)发生变化。 | { active: number, queue: number, limit: number } |
paused |
控制器进入暂停状态。 | 无 |
resumed |
控制器从暂停状态恢复。 | 无 |
queue:cleared |
任务队列被清空。 | { count: number } |
drain:start |
控制器开始进入 draining 状态。 | 无 |
drain:complete |
控制器完成 draining 状态。 | 无 |
idle |
控制器进入完全空闲状态(无活跃任务,无等待任务)。 | 无 |
not:idle |
控制器从空闲状态变为非空闲。 | 无 |
资源与效率的平衡,系统稳定的基石
通过这次深入的探讨,我们不仅构建了一个功能强大的并发请求控制器,更重要的是,我们理解了其背后的核心原理和设计哲学。并发控制并非仅仅是限制请求数量,它代表着对系统资源的精细管理,是对用户体验的深切关怀,更是保障服务稳定运行的基石。在日益复杂的分布式系统和高并发场景下,掌握这种控制能力,将使我们能够设计出更健壮、更高效、更具弹性的应用程序。