各位同仁,各位技术爱好者,大家好。
在现代异步编程中,我们经常会遇到这样的场景:需要处理大量的异步任务,例如并发请求多个API接口、批量上传文件、处理图片或视频、执行数据抓取任务等等。这些任务虽然是异步的,但它们并非总是可以无限制地并发执行。过度的并发可能导致资源耗尽(如内存、CPU)、网络拥堵、API限流,甚至服务崩溃。此时,我们需要一种机制来有效地调度这些异步任务,限制其并发数量,以确保系统的稳定性和资源的合理利用。
今天,我们将深入探讨如何手写实现一个支持多 Promise 并发限制的调度器,并以信号量(Semaphore)作为核心的算法控制机制。我们将从信号量的基本概念出发,逐步构建我们的调度器,并详细解析其内部运作原理。
1. 异步任务与并发控制的必要性
JavaScript,作为一门单线程语言,其并发模型主要依赖于事件循环(Event Loop)和异步非阻塞I/O。这意味着虽然JavaScript引擎本身一次只能执行一个操作,但它可以发起多个异步操作(如网络请求、定时器、文件读写),并在这些操作完成后,将相应的回调函数放入任务队列,等待事件循环执行。
Promise 是 JavaScript 中处理异步操作的强大工具,它代表一个异步操作的最终完成(或失败)及其结果值。async/await 语法糖进一步简化了 Promise 的使用,使异步代码看起来更像是同步代码,提高了可读性。
然而,当我们需要处理成百上千个 Promise 任务时,简单地使用 Promise.all 或直接启动所有任务,可能会带来以下问题:
- 资源耗尽:例如,同时打开过多的网络连接,可能耗尽操作系统的文件描述符或内存。
- API限流:许多第三方API都有严格的请求速率限制。如果短时间内发送大量请求,可能会被服务器拒绝服务。
- 性能下降:过多的并发任务切换上下文,反而可能导致整体性能下降,而不是提升。
- 系统不稳定:高负载可能使服务变得不稳定,甚至崩溃。
因此,我们需要一种智能的调度器,能够接收任意数量的异步任务,但只允许其中一部分任务同时运行,当有任务完成时,再从等待队列中取出新的任务启动,从而实现对并发度的精确控制。
2. 信号量(Semaphore)原理详解
在并发编程领域,信号量(Semaphore)是一种非常经典的同步原语,由荷兰计算机科学家 Edsger W. Dijkstra 于1960年代提出。它是一个计数器,用于控制对共享资源的访问。
2.1 信号量的核心思想
想象一个停车场,它有固定数量的停车位。当一辆车进入停车场时,它会占用一个停车位;当一辆车离开时,它会释放一个停车位。如果所有停车位都被占满了,新的车辆就必须在入口处等待,直到有车辆离开。
信号量的工作原理与此类似。它维护一个内部计数器,代表可用资源的数量。
2.2 信号量的关键操作
信号量主要提供两个原子操作:
acquire(P 操作或wait操作):- 尝试获取一个资源(或一个许可)。
- 如果计数器大于零,则将其减一,并允许当前操作继续执行。
- 如果计数器等于零,则表示没有可用资源,当前操作必须阻塞(或等待),直到有资源被释放。
release(V 操作或signal操作):- 释放一个资源(或一个许可)。
- 将计数器加一。
- 如果当前有其他操作因等待资源而阻塞,则选择其中一个使其解除阻塞,允许其获取资源并继续执行。
2.3 信号量的分类
- 二值信号量(Binary Semaphore):计数器只能是0或1,常用于实现互斥锁(Mutex),确保只有一个线程/进程能访问临界区。
- 计数信号量(Counting Semaphore):计数器可以是一个任意的非负整数,用于控制对具有N个相同资源的访问。这正是我们实现并发限制调度器所需的类型。
在我们的 Promise 调度器中,计数信号量将用于限制同时运行的 Promise 任务数量。每一个正在运行的 Promise 任务会“占用”一个许可(acquire),任务完成后会“释放”一个许可(release)。
3. 设计我们的 Semaphore 类
为了使我们的调度器模块化且易于理解,我们首先实现一个独立的 Semaphore 类。这个类将封装信号量的核心逻辑。
3.1 Semaphore 类结构设计
我们的 Semaphore 类需要包含以下几个关键部分:
_capacity: 信号量的最大容量,即允许同时持有的许可数量。_currentCount: 当前可用的许可数量。初始时等于_capacity。_waiters: 一个队列,用于存储那些因为没有可用许可而被阻塞的 Promise 的resolve函数。当许可被释放时,我们会从这个队列中取出最前面的resolve函数并调用它,从而解除一个等待者的阻塞状态。
3.2 Semaphore 类的代码实现
/**
* @class Semaphore
* @description 计数信号量实现,用于控制并发访问。
*/
class Semaphore {
/**
* 创建一个 Semaphore 实例。
* @param {number} capacity - 信号量的最大容量,即允许同时持有的许可数量。
* 必须是非负整数。
* @throws {Error} 如果 capacity 小于 0 或不是整数。
*/
constructor(capacity) {
if (typeof capacity !== 'number' || capacity < 0 || !Number.isInteger(capacity)) {
throw new Error('Semaphore capacity must be a non-negative integer.');
}
this._capacity = capacity;
this._currentCount = capacity; // 当前可用的许可数量,初始时等于最大容量
this._waiters = []; // 等待队列,存储因无法获取许可而阻塞的Promise的resolve函数
}
/**
* 尝试获取一个许可。
* 如果当前有可用许可,则立即获取并返回一个已解决的Promise。
* 如果没有可用许可,则当前操作将阻塞(通过返回一个未解决的Promise),
* 直到有许可被释放并分配给它。
* @returns {Promise<void>} 一个Promise,当成功获取到许可时解决。
*/
acquire() {
// 如果当前有可用许可
if (this._currentCount > 0) {
this._currentCount--; // 减少可用许可数量
// 立即返回一个已解决的Promise,表示成功获取
return Promise.resolve();
} else {
// 如果没有可用许可,则创建一个新的Promise,并将其resolve函数加入等待队列
// 当许可被释放时,这个Promise会被解决
return new Promise(resolve => {
this._waiters.push(resolve);
});
}
}
/**
* 释放一个许可。
* 这会增加可用许可的数量。如果等待队列中有等待者,
* 则会从队列中取出第一个等待者的Promise并解决它,从而允许它继续执行。
*/
release() {
// 增加可用许可数量,但不能超过最大容量
if (this._currentCount < this._capacity) {
this._currentCount++;
}
// 如果等待队列中有等待者,并且当前有可用许可(虽然我们刚刚增加了,但这是为了确保逻辑严谨性)
// 实际上,只要有等待者,且_currentCount > 0,就应该唤醒
if (this._waiters.length > 0 && this._currentCount > 0) {
const resolve = this._waiters.shift(); // 取出最早的等待者
resolve(); // 解决其Promise,使其解除阻塞
}
}
/**
* 获取当前可用的许可数量。
* @returns {number}
*/
get currentCount() {
return this._currentCount;
}
/**
* 获取信号量的最大容量。
* @returns {number}
*/
get capacity() {
return this._capacity;
}
/**
* 获取当前等待的Promise数量。
* @returns {number}
*/
get pendingWaiters() {
return this._waiters.length;
}
}
3.3 Semaphore 类代码解析
-
constructor(capacity):- 初始化
_capacity为传入的并发限制数。 _currentCount初始化为_capacity,表示初始时所有许可都可用。_waiters是一个数组,用于存储acquire调用时因无许可而阻塞的 Promise 的resolve函数。- 进行了严格的参数类型和值校验,确保
capacity是一个非负整数。
- 初始化
-
acquire():- 这是获取许可的核心方法。
- 如果
_currentCount > 0,说明有可用许可,则_currentCount减一,并立即返回一个Promise.resolve()。这意味着调用者可以立即执行后续操作。 - 如果
_currentCount为0,说明没有可用许可。此时,我们创建一个新的 Promise,并将其resolve函数推入_waiters队列。这个 Promise 会在后续release被调用时解决,从而解除当前acquire调用的阻塞。
-
release():- 这是释放许可的核心方法。
_currentCount加一,但会检查是否超过_capacity,以防止不合理的释放导致计数器溢出。- 如果
_waiters队列不为空,并且_currentCount大于0(即至少有一个许可可用),则从_waiters队列中取出最前面的resolve函数并调用它。这会解决之前被阻塞的acquire调用所返回的 Promise,允许一个等待中的任务继续执行。
通过这个 Semaphore 类,我们有了一个强大的工具来控制任意数量的“东西”的并发访问。接下来,我们将它集成到我们的 Promise 调度器中。
4. 设计 PromiseScheduler 类
现在我们有了信号量,可以开始构建我们的 PromiseScheduler 了。这个调度器将负责接收 Promise 生成函数,并将它们排队,然后利用信号量按顺序执行它们,同时遵守并发限制。
4.1 PromiseScheduler 类结构设计
我们的 PromiseScheduler 需要实现以下功能:
- 初始化:设置最大并发数。
- 添加任务:接收一个函数,该函数在执行时返回一个 Promise。
- 启动调度:开始处理等待队列中的任务。
- 任务管理:内部维护一个任务队列和当前正在运行的任务计数。
为了实现这些功能,PromiseScheduler 类将包含以下成员:
_semaphore: 一个Semaphore实例,用于控制实际的并发限制。_taskQueue: 一个数组,存储待执行的任务函数。每个任务函数都应该返回一个 Promise。_runningTasks: 一个 Set 或 Map,用于跟踪当前正在运行的 Promise 实例,以便我们可以知道它们何时完成。_isStarted: 一个布尔值,指示调度器是否已经开始运行。_results: 一个数组,用于收集所有任务的结果。_onIdleCallbacks: 一个数组,存储当所有任务完成时需要调用的回调函数。
4.2 PromiseScheduler 类的代码实现
/**
* @class PromiseScheduler
* @description 支持并发限制的 Promise 调度器。
* 使用 Semaphore 来控制同时运行的 Promise 数量。
*/
class PromiseScheduler {
/**
* 创建一个 PromiseScheduler 实例。
* @param {number} maxConcurrency - 最大并发数,即同时运行的 Promise 数量上限。
* @throws {Error} 如果 maxConcurrency 小于 1 或不是整数。
*/
constructor(maxConcurrency) {
if (typeof maxConcurrency !== 'number' || maxConcurrency < 1 || !Number.isInteger(maxConcurrency)) {
throw new Error('maxConcurrency must be a positive integer.');
}
this._semaphore = new Semaphore(maxConcurrency); // 核心:使用信号量控制并发
this._taskQueue = []; // 任务队列,存储待执行的 Promise 生成函数
this._runningTasks = new Set(); // 存储当前正在运行的 Promise 实例
this._isStarted = false; // 调度器是否已启动
this._results = []; // 存储所有任务的结果
this._onIdleCallbacks = []; // 当调度器空闲时(所有任务完成)调用的回调函数
this._totalTasksAdded = 0; // 记录总共添加了多少任务
this._totalTasksCompleted = 0; // 记录总共完成了多少任务
}
/**
* 添加一个 Promise 生成函数到调度器。
* 这个函数应该返回一个 Promise。
* @param {Function} taskFn - 一个无参数函数,调用时返回一个 Promise。
* @returns {Promise<any>} 一个 Promise,当任务完成时解决,返回任务的结果。
*/
addTask(taskFn) {
if (typeof taskFn !== 'function') {
throw new Error('Task must be a function that returns a Promise.');
}
this._totalTasksAdded++;
// 返回一个新的 Promise,它的解决/拒绝状态将与实际任务Promise的解决/拒绝状态同步。
// 这样,外部调用者可以 await addTask() 并获得最终结果。
return new Promise((resolve, reject) => {
// 将包含原始任务函数和其对应的 resolve/reject 的对象放入队列
this._taskQueue.push({ taskFn, resolve, reject, id: this._totalTasksAdded });
// 如果调度器已经启动,尝试立即处理任务。
// 否则,任务会在 start() 被调用时处理。
if (this._isStarted) {
this._processNextTask();
}
});
}
/**
* 启动调度器。
* 如果调度器已经启动,则不执行任何操作。
* @returns {Promise<any[]>} 一个Promise,当所有任务完成时解决,返回所有任务的结果数组。
*/
start() {
if (this._isStarted) {
console.warn('Scheduler already started.');
return Promise.resolve(this._results); // 或者返回一个表示已完成的Promise
}
this._isStarted = true;
// 如果任务队列不为空,则开始处理任务
if (this._taskQueue.length > 0) {
// 尝试填充到最大并发数
for (let i = 0; i < this._semaphore.capacity; i++) {
this._processNextTask();
}
}
// 返回一个Promise,当所有任务完成且调度器空闲时解决
return new Promise(resolve => {
this._onIdleCallbacks.push(resolve);
// 如果所有任务已经完成,立即解决
if (this._totalTasksAdded === this._totalTasksCompleted && this._runningTasks.size === 0) {
this._notifyIdle();
}
});
}
/**
* 暂停调度器。
* 已经运行的任务会继续运行直到完成,但不会再从队列中取出新任务。
*/
pause() {
this._isStarted = false;
console.log('Scheduler paused. No new tasks will be picked from queue.');
}
/**
* 停止调度器。
* 暂停并清空任务队列。已经运行的任务会继续运行直到完成。
* @returns {Promise<void>} 一个Promise,当所有运行中的任务完成时解决。
*/
async stop() {
this.pause(); // 暂停新任务的调度
this._taskQueue = []; // 清空待处理任务队列
console.log('Scheduler stopped. Task queue cleared.');
// 等待所有正在运行的任务完成
if (this._runningTasks.size > 0) {
console.log(`Waiting for ${this._runningTasks.size} running tasks to complete...`);
await Promise.allSettled(Array.from(this._runningTasks));
}
console.log('All running tasks completed after stop.');
this._notifyIdle(); // 即使停止了,也需要通知可能在等待的 start() Promise
}
/**
* 内部方法:尝试处理下一个任务。
* 只有当调度器已启动且任务队列不为空时才执行。
*/
async _processNextTask() {
if (!this._isStarted || this._taskQueue.length === 0) {
return;
}
// 尝试获取信号量许可。如果没有可用许可,此 await 会阻塞。
await this._semaphore.acquire();
// 成功获取许可后,从队列中取出一个任务
const taskEntry = this._taskQueue.shift();
if (!taskEntry) { // 理论上不会发生,因为前面检查了队列长度
this._semaphore.release(); // 释放刚刚获取的许可
return;
}
const { taskFn, resolve, reject, id } = taskEntry;
let taskPromise;
try {
console.log(`[Task ${id}] Acquiring permit, current running: ${this._runningTasks.size}, queue: ${this._taskQueue.length}, semaphore available: ${this._semaphore.currentCount}`);
taskPromise = taskFn(); // 执行任务函数,获取实际的 Promise
if (!(taskPromise instanceof Promise)) {
throw new Error('Task function must return a Promise.');
}
this._runningTasks.add(taskPromise); // 将实际的 Promise 加入运行中集合
console.log(`[Task ${id}] Started. Running tasks: ${this._runningTasks.size}`);
const result = await taskPromise; // 等待任务完成
this._results.push({ id, status: 'fulfilled', value: result });
resolve(result); // 解决 addTask 返回的 Promise
} catch (error) {
this._results.push({ id, status: 'rejected', reason: error });
reject(error); // 拒绝 addTask 返回的 Promise
} finally {
if (taskPromise) {
this._runningTasks.delete(taskPromise); // 从运行中集合移除
}
this._semaphore.release(); // 释放信号量许可
this._totalTasksCompleted++;
console.log(`[Task ${id}] Finished. Running tasks: ${this._runningTasks.size}, semaphore available: ${this._semaphore.currentCount}, completed: ${this._totalTasksCompleted}/${this._totalTasksAdded}`);
// 任务完成后,立即尝试处理下一个任务,以保持并发度
this._processNextTask();
// 检查是否所有任务都已完成
if (this._totalTasksAdded === this._totalTasksCompleted && this._runningTasks.size === 0) {
this._notifyIdle();
}
}
}
/**
* 内部方法:通知所有等待调度器空闲的回调。
*/
_notifyIdle() {
if (this._onIdleCallbacks.length > 0) {
console.log('Scheduler is idle. Notifying waiting Promises.');
// 解决所有等待 start() 完成的 Promise
while (this._onIdleCallbacks.length > 0) {
const resolve = this._onIdleCallbacks.shift();
resolve(this._results); // 传递收集到的所有结果
}
}
}
/**
* 获取当前正在运行的任务数量。
* @returns {number}
*/
get runningCount() {
return this._runningTasks.size;
}
/**
* 获取当前等待队列中的任务数量。
* @returns {number}
*/
get pendingTaskCount() {
return this._taskQueue.length;
}
/**
* 获取总共添加的任务数量。
* @returns {number}
*/
get totalTasksAdded() {
return this._totalTasksAdded;
}
/**
* 获取总共完成的任务数量。
* @returns {number}
*/
get totalTasksCompleted() {
return this._totalTasksCompleted;
}
}
4.3 PromiseScheduler 类代码解析
-
constructor(maxConcurrency):- 初始化
_semaphore实例,将maxConcurrency传递给它。这是并发控制的核心。 _taskQueue存储待处理的任务,每个元素是一个包含taskFn(返回 Promise 的函数)以及其对应resolve和reject函数的对象。_runningTasks是一个Set,用于跟踪当前正在执行的 Promise 实例,方便在任务完成时移除。_isStarted标志调度器是否处于运行状态。_results收集所有任务的最终结果。_onIdleCallbacks用于存储当所有任务完成时需要通知的回调(主要是start()方法返回的 Promise 的resolve)。_totalTasksAdded和_totalTasksCompleted用于统计,也作为判断调度器是否空闲的依据。
- 初始化
-
addTask(taskFn):- 接收一个
taskFn函数,它必须返回一个 Promise。 - 返回一个新的 Promise。这个 Promise 是外部调用者
await scheduler.addTask(...)的结果。 - 将
taskFn和这个新 Promise 的resolve/reject函数一起封装成一个对象,推入_taskQueue。 - 如果调度器已启动,立即调用
_processNextTask()尝试处理。
- 接收一个
-
start():- 将
_isStarted设置为true。 - 如果任务队列不为空,循环调用
_processNextTask(),尝试立即填充到maxConcurrency个任务。 - 返回一个 Promise,这个 Promise 将在所有任务完成后解决。其
resolve函数被添加到_onIdleCallbacks。
- 将
-
pause()/stop():pause()简单地将_isStarted设置为false,阻止_processNextTask从队列中取出新任务。已经运行的任务会继续。stop()会先pause(),然后清空_taskQueue,并await Promise.allSettled(Array.from(this._runningTasks)),等待所有当前正在运行的任务完成。
-
_processNextTask()(核心调度逻辑):- 检查条件:确保调度器已启动且任务队列不为空。
await this._semaphore.acquire():这是关键!它尝试获取一个许可。- 如果
_semaphore有可用许可 (_currentCount > 0),则_currentCount减一,并立即继续执行。 - 如果
_semaphore没有可用许可 (_currentCount === 0),则当前_processNextTask的执行会在这里“暂停”(实际上是async/await的特性,它会 yield 控制权给事件循环),直到_semaphore.release()被调用并解决了对应的acquirePromise。
- 如果
- 取出任务:一旦获得许可,就从
_taskQueue中shift出一个任务。 - 执行任务:调用
taskFn()得到实际的 Promise,并将其添加到_runningTasks集合。 - 等待任务完成:
await taskPromise等待任务的实际完成(成功或失败)。 - 处理结果:根据任务的成功或失败,调用
taskEntry中存储的resolve或reject函数,从而解决/拒绝addTask返回给外部的 Promise。 finally块:- 从
_runningTasks中移除已完成的任务。 this._semaphore.release():释放一个许可,这可能唤醒一个正在acquire处等待的_processNextTask调用。_totalTasksCompleted计数器增加。- 递归调用
this._processNextTask():这是保持并发度的关键。一个任务完成后,调度器会立即尝试处理下一个任务,如果此时有可用许可,就会立即启动新任务,否则会继续等待许可。
- 从
- 检查空闲:每次任务完成时,都会检查
_totalTasksAdded === _totalTasksCompleted && this._runningTasks.size === 0来判断调度器是否已完全空闲,并调用_notifyIdle()。
-
_notifyIdle():- 遍历
_onIdleCallbacks队列,调用其中的所有resolve函数,将所有任务的结果数组传递给它们。这会解决start()方法返回的 Promise。
- 遍历
5. 调度器执行流程详解
为了更好地理解调度器的运作,我们通过一个具体的例子来跟踪其执行流程。
假设我们设置 maxConcurrency = 3,并添加了 5 个任务,每个任务模拟一个耗时操作。
// 模拟一个异步任务
function mockAsyncTask(id, duration) {
console.log(`Task ${id}: Started, will complete in ${duration}ms.`);
return new Promise(resolve => {
setTimeout(() => {
console.log(`Task ${id}: Completed.`);
resolve(`Task ${id} Result`);
}, duration);
});
}
async function runSchedulerExample() {
const scheduler = new PromiseScheduler(3); // 最大并发数设置为 3
console.log('Adding tasks...');
const taskPromises = [];
taskPromises.push(scheduler.addTask(() => mockAsyncTask(1, 2000))); // 任务1 (2s)
taskPromises.push(scheduler.addTask(() => mockAsyncTask(2, 1000))); // 任务2 (1s)
taskPromises.push(scheduler.addTask(() => mockAsyncTask(3, 3000))); // 任务3 (3s)
taskPromises.push(scheduler.addTask(() => mockAsyncTask(4, 1500))); // 任务4 (1.5s)
taskPromises.push(scheduler.addTask(() => mockAsyncTask(5, 500))); // 任务5 (0.5s)
console.log('All tasks added. Scheduler current pending:', scheduler.pendingTaskCount);
console.log('Starting scheduler...');
const allResultsPromise = scheduler.start(); // 启动调度器
const results = await allResultsPromise; // 等待所有任务完成
console.log('n--- All tasks completed ---');
console.log('Final results:', results);
console.log('Scheduler total completed:', scheduler.totalTasksCompleted);
}
runSchedulerExample();
执行日志(简化版)及流程分析:
-
new PromiseScheduler(3): 创建调度器,_semaphore容量为3。_isStarted = false。 -
scheduler.addTask(...)(5次):- 每个
addTask调用都会将一个{ taskFn, resolve, reject, id }对象推入_taskQueue。 _isStarted此时为false,所以_processNextTask()不会被立即调用。- 此时
_taskQueue包含 5 个任务。
- 每个
-
scheduler.start():-
_isStarted设置为true。 -
for (let i = 0; i < 3; i++) { this._processNextTask(); }循环执行 3 次_processNextTask()。 -
第一次
_processNextTask()(for 循环):await this._semaphore.acquire(): 成功获取许可(_currentCount从 3 变为 2)。- 从
_taskQueue取出 任务1。 taskFn()执行mockAsyncTask(1, 2000),返回 Promise。_runningTasks添加 任务1 的 Promise。console.log('Task 1: Started...')。await taskPromise: 任务1 开始计时 2000ms。_processNextTask在这里暂停,控制权返回事件循环。
-
第二次
_processNextTask()(for 循环):await this._semaphore.acquire(): 成功获取许可(_currentCount从 2 变为 1)。- 从
_taskQueue取出 任务2。 taskFn()执行mockAsyncTask(2, 1000),返回 Promise。_runningTasks添加 任务2 的 Promise。console.log('Task 2: Started...')。await taskPromise: 任务2 开始计时 1000ms。_processNextTask在这里暂停。
-
第三次
_processNextTask()(for 循环):await this._semaphore.acquire(): 成功获取许可(_currentCount从 1 变为 0)。- 从
_taskQueue取出 任务3。 taskFn()执行mockAsyncTask(3, 3000),返回 Promise。_runningTasks添加 任务3 的 Promise。console.log('Task 3: Started...')。await taskPromise: 任务3 开始计时 3000ms。_processNextTask在这里暂停。
-
此时,
_runningTasks.size为 3,_semaphore.currentCount为 0。_taskQueue中还剩下 任务4 和 任务5。 -
scheduler.start()将其返回的 Promise 的resolve推入_onIdleCallbacks,并返回这个 Promise。
-
-
1000ms 后:
- 任务2 完成 (
console.log('Task 2: Completed.'))。 - 任务2 对应的
_processNextTask恢复执行,继续await taskPromise之后的代码。 _runningTasks移除 任务2 的 Promise。this._semaphore.release(): 释放许可(_currentCount从 0 变为 1)。this._totalTasksCompleted变为 1。- 递归调用
this._processNextTask():_isStarted为true,_taskQueue.length(2) > 0。await this._semaphore.acquire(): 成功获取许可(_currentCount从 1 变为 0)。- 从
_taskQueue取出 任务4。 taskFn()执行mockAsyncTask(4, 1500)。_runningTasks添加 任务4 的 Promise。console.log('Task 4: Started...')。await taskPromise: 任务4 开始计时 1500ms。_processNextTask在这里暂停。
- 任务2 完成 (
-
2000ms 后 (从开始算):
- 任务1 完成 (
console.log('Task 1: Completed.'))。 - 任务1 对应的
_processNextTask恢复执行。 _runningTasks移除 任务1 的 Promise。this._semaphore.release(): 释放许可(_currentCount从 0 变为 1)。this._totalTasksCompleted变为 2。- 递归调用
this._processNextTask():_isStarted为true,_taskQueue.length(1) > 0。await this._semaphore.acquire(): 成功获取许可(_currentCount从 1 变为 0)。- 从
_taskQueue取出 任务5。 taskFn()执行mockAsyncTask(5, 500)。_runningTasks添加 任务5 的 Promise。console.log('Task 5: Started...')。await taskPromise: 任务5 开始计时 500ms。_processNextTask在这里暂停。
- 任务1 完成 (
-
2500ms 后 (从开始算):
- 任务5 完成 (
console.log('Task 5: Completed.'))。 - 任务5 对应的
_processNextTask恢复执行。 _runningTasks移除 任务5 的 Promise。this._semaphore.release(): 释放许可(_currentCount从 0 变为 1)。this._totalTasksCompleted变为 3。- 递归调用
this._processNextTask():_isStarted为true,但_taskQueue.length(0) 为 0。此调用立即返回。
- 检查空闲:
_totalTasksAdded(5) !=_totalTasksCompleted(3),所以_notifyIdle不会执行。
- 任务5 完成 (
-
2500ms + 500ms = 3000ms 后 (从开始算):
- 任务3 完成 (
console.log('Task 3: Completed.'))。 - 任务3 对应的
_processNextTask恢复执行。 _runningTasks移除 任务3 的 Promise。this._semaphore.release(): 释放许可(_currentCount从 1 变为 2)。this._totalTasksCompleted变为 4。- 递归调用
this._processNextTask():_isStarted为true,但_taskQueue.length(0) 为 0。此调用立即返回。
- 检查空闲:
_totalTasksAdded(5) !=_totalTasksCompleted(4),所以_notifyIdle不会执行。
- 任务3 完成 (
-
3000ms + 1500ms = 4500ms 后 (从开始算):
- 任务4 完成 (
console.log('Task 4: Completed.'))。 - 任务4 对应的
_processNextTask恢复执行。 _runningTasks移除 任务4 的 Promise。this._semaphore.release(): 释放许可(_currentCount从 2 变为 3)。this._totalTasksCompleted变为 5。- 递归调用
this._processNextTask():_isStarted为true,但_taskQueue.length(0) 为 0。此调用立即返回。
- 检查空闲:
_totalTasksAdded(5) ===_totalTasksCompleted(5) 且_runningTasks.size(0) === 0。 _notifyIdle()执行: 解决start()方法返回的 Promise。runSchedulerExample中的await allResultsPromise完成,打印最终结果。
- 任务4 完成 (
通过这个详细的流程,我们可以清晰地看到信号量是如何精确控制并发任务数量的。当许可不足时,acquire 会“阻塞”后续任务的启动;当任务完成时,release 会释放许可并唤醒等待中的任务,确保并发度始终维持在设定值以内。
6. 进阶考量与扩展
我们构建的 PromiseScheduler 已经非常强大和实用,但在实际应用中,我们可能还需要考虑一些更复杂的场景和功能。
6.1 错误处理与重试机制
当前调度器能够捕获并传递任务的拒绝状态。但对于某些临时性错误(如网络瞬断、API限流),我们可能希望自动重试任务。
- 实现方式:
- 在
addTask中,除了taskFn,还可以接受retryCount和retryDelay参数。 - 在
_processNextTask的catch块中,如果任务失败且retryCount > 0,则不立即reject外部 Promise,而是将任务重新包装(递减retryCount),并重新推入任务队列(或一个特殊重试队列),并可能添加一个延迟。
- 在
6.2 任务优先级
如果任务具有不同的重要性,我们可能希望高优先级的任务能够插队执行。
- 实现方式:
_taskQueue不再是简单的数组,而是一个优先级队列(如基于堆实现),或者多个独立的队列(高优、中优、低优)。_processNextTask在选择任务时,优先从高优先级队列中取出。
6.3 任务取消
有时用户可能希望取消一个正在等待或正在执行的任务。
- 实现方式:
- 对于等待中的任务:可以在
addTask返回的 Promise 上添加一个cancel方法,调用时从_taskQueue中移除该任务。 - 对于运行中的任务:JavaScript 的
AbortControllerAPI 是一个很好的选择。taskFn应该接收一个signal对象,并用它来监听取消事件。调度器在取消时调用abortController.abort()。
- 对于等待中的任务:可以在
6.4 调度器状态监控
为了更好地了解调度器的运行状况,可以添加更多的监控指标。
- 实现方式:
getRunningCount(),getPendingCount(),getTotalCompleted(),getTotalFailed(),getAverageLatency()等方法。- 可以集成事件发射器(
EventEmitter),在任务启动、完成、失败、队列空闲等事件发生时发出通知。
6.5 动态调整并发数
在某些情况下,我们可能希望在运行时根据系统负载或外部条件动态调整 maxConcurrency。
- 实现方式:
- 为
Semaphore类添加一个setCapacity(newCapacity)方法。 - 这个方法需要小心处理:如果
newCapacity小于当前正在运行的任务数,可能需要等到一些任务完成才能达到新的限制。如果newCapacity大于当前运行数,则可以立即尝试启动更多任务。
- 为
6.6 背压(Backpressure)机制
如果任务被添加的速度远超它们被处理的速度,_taskQueue 可能会无限增长,导致内存问题。
- 实现方式:
- 限制
_taskQueue的最大长度。当队列满时,addTask可以选择:- 立即拒绝新的任务(返回一个
Promise.reject)。 - 阻塞
addTask调用,直到队列有空位(这需要一个额外的信号量来控制addTask本身的并发)。
- 立即拒绝新的任务(返回一个
- 限制
6.7 任务超时
为每个任务设置一个超时时间,如果任务在规定时间内未完成,则视为失败。
- 实现方式:
- 在
_processNextTask中,将taskPromise与一个setTimeout包装的Promise.reject进行Promise.race。
- 在
7. 实际应用场景
基于信号量的 Promise 调度器在许多实际场景中都非常有用:
- Web 爬虫与数据抓取:限制对目标网站的请求频率,避免被封禁IP,同时提高抓取效率。
- 批量 API 请求:例如,需要从第三方服务获取大量数据,但服务有每秒请求次数限制。调度器可以确保不会超过这个限制。
- 文件处理队列:在前端上传多个文件时,可能希望同时只处理少量文件的压缩、预览生成等操作,而不是一次性处理所有文件。
- 图片/视频处理服务:后端服务处理用户上传的图片或视频时,为了避免服务器过载,可以限制同时进行的编码、缩放或水印添加任务。
- 数据库连接池(概念类比):虽然 JavaScript 通常不直接管理数据库连接,但在其他语言中,信号量是实现连接池的关键,限制同时活跃的数据库连接数。
- CI/CD 流水线:限制同时运行的构建或部署任务的数量。
8. 思考与展望
通过本次讲座,我们深入探讨了如何利用信号量这一经典并发控制原语,在 JavaScript 异步环境中构建一个健壮的 Promise 调度器。我们从信号量的基本概念、核心操作入手,逐步实现了 Semaphore 类,并将其巧妙地集成到 PromiseScheduler 中,最终形成了一个功能完善、逻辑严谨的并发控制解决方案。
这个调度器不仅解决了过度并发带来的资源消耗和稳定性问题,还为我们提供了一个灵活的框架,可以根据实际需求进行扩展,如添加重试、优先级、取消和监控等高级功能。掌握这种基于信号量的并发控制模式,对于构建高性能、高稳定性的异步应用至关重要。我鼓励大家在自己的项目中尝试使用和改进这个调度器,探索其在各种复杂场景下的应用潜力。