引言:并发控制的艺术与挑战
在现代前端与后端开发中,异步操作无处不在。从网络请求到文件读写,从数据库查询到复杂计算,我们频繁地与需要时间才能完成的任务打交道。JavaScript作为一门天生异步的语言,通过回调函数、事件循环、Promise以及async/await等机制,极大地增强了处理异步任务的能力。
然而,仅仅能够处理异步任务是不够的。当面临大量异步任务需要同时执行时,我们常常会遇到一个核心问题:并发控制。想象一下,您的应用需要同时向服务器请求数百张图片、批量上传上千个文件,或者并行处理数十个耗时的数据转换任务。如果不加以限制,所有这些任务可能会在短时间内同时启动,导致以下问题:
- 服务器过载:短时间内接收到大量请求,超出服务器处理能力,可能导致服务变慢甚至崩溃。
- 客户端资源耗尽:浏览器或Node.js进程可能因为同时维护大量网络连接、Promise实例和内存占用而变得卡顿甚至崩溃。
- API限流:许多第三方服务对API调用有严格的速率限制。无限制的并发请求很可能触发限流机制,导致请求失败。
- 用户体验下降:过多的并发操作可能阻塞UI渲染,或使应用响应迟钝。
正因如此,我们需要一种机制来限制同时运行的异步操作的数量,这就是并发控制调度器(Scheduler)的核心职责。它就像一个交通警察,管理着异步任务的“车流”,确保系统稳定、高效地运行。
这个主题在大厂面试中是“必考”的重点,因为它不仅考察你对JavaScript异步编程深度的理解,更考验你对数据结构、算法、系统设计以及资源管理的综合能力。一个优秀的并发调度器,是衡量一个开发者能否构建健壮、高性能应用的关键指标之一。
本讲座将深入探讨如何手写实现这样一个并发控制调度器,从基础概念到高级特性,带你一步步构建一个功能完善、易于扩展的解决方案。
理解核心概念:Promise、并发与调度
在深入实现之前,我们首先需要对几个核心概念有清晰的理解。
JavaScript与事件循环:单线程的并发魔法
JavaScript是单线程的。这意味着在任何给定时间点,JavaScript引擎只能执行一个任务。那么,它是如何实现“并发”的呢?答案是事件循环(Event Loop)。
事件循环机制允许JavaScript将耗时的操作(如网络请求、定时器)委托给宿主环境(浏览器或Node.js)的底层线程去执行,而JavaScript主线程则可以继续执行其他任务。当这些耗时操作完成后,它们的回调函数会被放入一个任务队列(Task Queue),等待JavaScript主线程空闲时再取出执行。Promise的微任务队列(Microtask Queue)甚至拥有更高的优先级。
因此,JavaScript的“并发”并非真正的并行(parallelism,多核CPU同时执行多个任务),而是一种并发(concurrency)——通过在不同任务之间快速切换,宏观上看起来像是在同时进行。我们的调度器将利用这一特性来管理异步任务。
Promise:异步操作的优雅封装
Promise是JavaScript中处理异步操作的利器。它代表了一个异步操作的最终完成(或失败)及其结果值。一个Promise有三种状态:
- Pending(待定):初始状态,既没有成功,也没有失败。
- Fulfilled(已成功):操作成功完成。
- Rejected(已失败):操作失败。
Promise通过.then()和.catch()方法链式调用,使得异步代码更易读、更易维护,避免了“回调地狱”。async/await语法糖更是将Promise的强大能力提升到了新的层次,让异步代码看起来像同步代码一样直观。
我们的调度器将围绕Promise来构建,因为大多数现代JavaScript异步操作都返回或可以封装成Promise。
调度器:资源与任务的智能管家
调度器是一个软件组件,负责管理和协调任务的执行。在并发控制的语境下,它有以下职责:
- 任务排队:当任务数量超过最大并发限制时,将多余的任务放入等待队列。
- 任务调度:当有资源(即并发槽位)可用时,从等待队列中取出任务并启动执行。
- 状态监控:跟踪当前正在运行的任务数量,以及等待队列中的任务数量。
- 结果收集与传递:确保每个任务的执行结果(成功或失败)能够正确地返回给调用者。
- 整体完成通知:提供机制让调用者知道所有任务何时全部完成。
一个设计良好的调度器,能够有效地平衡系统性能与资源利用,提升应用的健壮性和用户体验。
初探问题:无限制并发的陷阱
在构建调度器之前,我们先来看一个没有并发控制的典型场景,以及它可能带来的问题。
假设我们有一个需要从API获取大量用户数据的任务。每个用户的数据请求都是一个独立的异步操作。
// 模拟一个异步请求用户数据的函数
function fetchUserData(userId) {
console.log(`[START] Fetching data for user: ${userId}`);
return new Promise(resolve => {
const delay = Math.random() * 2000 + 500; // 模拟网络延迟
setTimeout(() => {
console.log(`[END] Fetched data for user: ${userId}`);
resolve({ id: userId, name: `User ${userId}`, data: `Some data for ${userId}` });
}, delay);
});
}
// 假设我们有10个用户需要获取数据
const userIds = Array.from({ length: 10 }, (_, i) => i + 1);
console.log("--- 开始无限制并发请求 ---");
// 使用Promise.all无限制地发起所有请求
Promise.all(userIds.map(id => fetchUserData(id)))
.then(results => {
console.log("--- 所有用户数据请求完成 (无限制) ---");
// console.log(results);
})
.catch(error => {
console.error("--- 某些请求失败 (无限制) ---", error);
});
console.log("--- 请求已全部发起,等待结果 (无限制) ---");
问题分析:
当我们运行上述代码时,你会立即看到所有10个[START]日志几乎同时打印出来,这意味着所有的fetchUserData Promise都在同一时间被创建并开始执行。虽然JavaScript是单线程的,但它会立即将所有这些网络请求(在浏览器中可能是XHR或Fetch API,在Node.js中可能是http模块)发送出去。
如果userIds的数量是1000而不是10,会发生什么?
- 浏览器环境:浏览器可能会在短时间内打开数百个甚至上千个TCP连接。这会消耗大量的内存和CPU资源,可能导致浏览器卡顿、崩溃,甚至影响其他网站的正常运行。同时,由于浏览器的同源策略和每个域名下的最大连接数限制,过多的并发请求可能会被浏览器内部限制,但其尝试发起的数量仍然是巨大的。
- Node.js环境:Node.js进程会迅速创建大量网络请求,消耗大量文件描述符和内存。这可能导致进程性能急剧下降,甚至因为资源耗尽而崩溃。
- 服务器端:服务器会同时处理这1000个请求,如果其处理能力有限,可能会导致响应变慢,甚至拒绝服务(DoS)。
- API限流:如果
fetchUserData是调用一个有速率限制的API,那么很大概率会因为短时间内请求过多而被限流,导致大量请求失败。
显然,这种“无限制并发”的方式在处理大量异步任务时是不可取的。我们需要一个机制来控制同时进行的任务数量。
构建基础调度器:核心逻辑与数据结构
现在,让我们开始构建一个能够限制并发数量的调度器。我们的目标是创建一个Scheduler类,它能够接收任务(Promise或返回Promise的函数),并确保在任何时候,最多只有指定数量的任务正在运行。
设计思路:队列、计数器与循环
一个基本的并发调度器需要以下核心组件和逻辑:
- 最大并发数 (
maxConcurrency):一个数字,定义了同时运行的最大任务数量。 - 任务队列 (
queue):一个数组或链表,用于存储等待执行的任务。当正在运行的任务数量达到maxConcurrency时,新任务会被添加到这个队列中。 - 运行中任务计数器 (
runningCount):一个数字,跟踪当前正在执行的任务数量。 - 调度循环 (
_runNext):一个内部方法,负责检查是否有空闲的并发槽位,并从队列中取出任务执行。这个方法会在任务完成(成功或失败)后被调用,以尝试启动下一个任务。
调度器组件:maxConcurrency、queue、runningCount
我们来定义Scheduler类的基本结构。
class Scheduler {
constructor(maxConcurrency) {
if (typeof maxConcurrency !== 'number' || maxConcurrency <= 0) {
throw new Error('maxConcurrency must be a positive number.');
}
this.maxConcurrency = maxConcurrency; // 最大并发数
this.queue = []; // 等待执行的任务队列
this.runningCount = 0; // 当前正在运行的任务数
this.totalTasks = 0; // 记录总共添加了多少任务
this.completedTasks = 0; // 记录已经完成的任务数
// 用于等待所有任务完成的Promise
this._allTasksPromise = null;
this._resolveAllTasks = null;
this._rejectAllTasks = null;
}
// 添加一个任务到调度器
add(taskFn) {
// taskFn 必须是一个返回 Promise 的函数
if (typeof taskFn !== 'function') {
return Promise.reject(new Error('Task must be a function that returns a Promise.'));
}
// 创建一个Promise来代表这个任务的执行结果
return new Promise((resolve, reject) => {
// 将任务及其对应的resolve/reject函数封装成一个对象,放入队列
this.queue.push({ taskFn, resolve, reject });
this.totalTasks++;
// 尝试运行下一个任务
this._runNext();
});
}
// 内部方法:尝试运行下一个任务
_runNext() {
// 只有当有空闲的并发槽位 并且 队列中有等待任务时才运行
while (this.runningCount < this.maxConcurrency && this.queue.length > 0) {
const { taskFn, resolve, reject } = this.queue.shift(); // 从队列头部取出一个任务
this.runningCount++; // 增加正在运行的任务计数
// 执行任务函数,并确保它返回一个Promise
// 使用 Promise.resolve().then() 包装,以防 taskFn 不是严格返回 Promise
Promise.resolve().then(() => taskFn())
.then(result => {
resolve(result); // 任务成功,解决外部Promise
})
.catch(error => {
reject(error); // 任务失败,拒绝外部Promise
})
.finally(() => {
this.runningCount--; // 任务完成(无论成功失败),减少运行中计数
this.completedTasks++; // 增加已完成任务计数
// 检查是否所有任务都已完成
this._checkAllTasksCompletion();
// 递归调用_runNext,尝试启动下一个任务
// 这一步非常关键,它形成了调度循环
this._runNext();
});
}
}
// 检查是否所有任务都已完成,并解决_allTasksPromise
_checkAllTasksCompletion() {
if (this.totalTasks > 0 && this.completedTasks === this.totalTasks) {
if (this._resolveAllTasks) {
this._resolveAllTasks();
// 清理,防止重复触发
this._allTasksPromise = null;
this._resolveAllTasks = null;
this._rejectAllTasks = null;
}
}
}
// 等待所有任务完成
waitAll() {
if (!this._allTasksPromise) {
this._allTasksPromise = new Promise((resolve, reject) => {
this._resolveAllTasks = resolve;
this._rejectAllTasks = reject;
});
// 如果所有任务已经完成(例如,在调用waitAll之前就已完成),则立即解决
this._checkAllTasksCompletion();
}
return this._allTasksPromise;
}
}
详细解释调度流程:任务入队、出队、执行、完成与递归调度
-
constructor(maxConcurrency):- 初始化
maxConcurrency、queue、runningCount。 totalTasks和completedTasks用于跟踪所有任务的整体完成状态。_allTasksPromise及其相关的_resolveAllTasks,_rejectAllTasks是为了实现waitAll()方法,允许我们等待调度器中所有任务的最终完成。
- 初始化
-
add(taskFn):- 接收一个
taskFn,它是一个返回Promise的函数。这是调度器管理的基本任务单元。 - 立即返回一个Promise。这个Promise代表了
taskFn的最终结果。调用者可以通过await scheduler.add(myTaskFn())或scheduler.add(myTaskFn()).then(...)来获取单个任务的结果。 - 创建一个内部Promise,并将其
resolve和reject函数与taskFn一起封装成一个对象{ taskFn, resolve, reject }。这个对象被推入queue数组。 totalTasks增加,记录新添加的任务。- 调用
_runNext()尝试立即启动任务。
- 接收一个
-
_runNext():这是调度器的核心逻辑。- 条件检查:
while (this.runningCount < this.maxConcurrency && this.queue.length > 0)。这个循环会持续检查:- 当前正在运行的任务数量是否小于最大并发数。
- 等待队列中是否还有任务。
- 只有这两个条件都满足时,循环才会继续,意味着有空闲槽位且有任务可执行。
- 任务出队:
const { taskFn, resolve, reject } = this.queue.shift();从队列头部取出一个任务。shift()方法是高效的,因为它直接从数组头部移除元素。 - 增加计数:
this.runningCount++;增加正在运行任务的计数。 - 执行任务:
Promise.resolve().then(() => taskFn()):这一步非常重要。它确保taskFn总是被当作一个Promise来处理,即使taskFn本身不是严格返回Promise(例如,它可能直接返回一个值,或者是一个同步函数)。Promise.resolve()将任何值封装成Promise。.then(result => { resolve(result); }):当taskFn成功完成时,调用之前封装的resolve函数,从而解决add方法返回的那个Promise。.catch(error => { reject(error); }):当taskFn失败时,调用之前封装的reject函数,拒绝add方法返回的那个Promise。
- 任务完成后的清理与调度:
.finally(() => { ... })块会在任务无论成功或失败后都执行:this.runningCount--;:减少正在运行任务的计数,释放一个并发槽位。this.completedTasks++;:增加已完成任务的计数。this._checkAllTasksCompletion();:检查是否所有任务都已完成,以解决waitAll()的Promise。this._runNext();:这是最关键的一步。任务完成后,会立即再次调用_runNext()。这意味着:- 如果队列中还有等待任务且有空闲槽位,它将立即启动下一个任务。
- 如果队列为空或者没有空闲槽位,
while循环的条件将不满足,_runNext函数会自然退出。
- 条件检查:
-
_checkAllTasksCompletion():- 在每次任务完成时被调用。
- 检查
completedTasks是否等于totalTasks。如果相等,并且_allTasksPromise已经被创建,就调用_resolveAllTasks来解决整个调度器的waitAll()Promise。
-
waitAll():- 返回一个Promise,这个Promise会在调度器中所有任务都完成时被解决。
- 如果
_allTasksPromise尚未创建,则创建一个新的Promise,并保存其resolve/reject回调。 - 立即调用
_checkAllTasksCompletion(),以防在waitAll()被调用之前所有任务已经完成。
代码示例:基础调度器运行
让我们使用这个Scheduler来重新运行之前的用户数据请求示例,但这次限制并发数为3。
// 模拟一个异步请求用户数据的函数
function fetchUserData(userId) {
console.log(`[START] Fetching data for user: ${userId}, Running: ${scheduler.runningCount}`);
return new Promise(resolve => {
const delay = Math.random() * 2000 + 500; // 模拟网络延迟
setTimeout(() => {
console.log(`[END] Fetched data for user: ${userId}`);
resolve({ id: userId, name: `User ${userId}`, data: `Some data for ${userId}` });
}, delay);
});
}
const userIds = Array.from({ length: 10 }, (_, i) => i + 1);
const MAX_CONCURRENCY = 3;
const scheduler = new Scheduler(MAX_CONCURRENCY);
console.log(`--- 开始并发限制请求 (并发数: ${MAX_CONCURRENCY}) ---`);
const taskPromises = userIds.map(id => {
// 将 fetchUserData(id) 封装成一个函数,因为 add 方法需要一个函数
return scheduler.add(() => fetchUserData(id));
});
// 等待所有任务完成
Promise.all(taskPromises)
.then(results => {
console.log("--- 所有用户数据请求完成 (并发限制) ---");
// console.log(results);
})
.catch(error => {
console.error("--- 某些请求失败 (并发限制) ---", error);
});
// 或者使用调度器自带的 waitAll 方法
// scheduler.waitAll()
// .then(() => {
// console.log("--- 所有用户数据请求完成 (通过 scheduler.waitAll) ---");
// })
// .catch(error => {
// console.error("--- 某些请求失败 (通过 scheduler.waitAll) ---", error);
// });
console.log(`--- 请求已全部添加到调度器,等待结果 (并发数: ${MAX_CONCURRENCY}) ---`);
运行结果分析:
当你运行这段代码时,你会观察到以下现象:
- 最开始,只有3个
[START]日志被打印出来(因为MAX_CONCURRENCY是3)。 - 当其中一个任务完成(打印
[END])后,调度器会自动从队列中取出下一个任务并启动它,然后打印其[START]日志。 - 这个过程会一直持续,直到所有10个任务都完成。
scheduler.runningCount会实时反映当前正在运行的任务数量,它将始终保持在maxConcurrency或更少。
这表明我们的基础调度器已经成功地实现了并发控制。
完善调度器:错误处理与整体完成
上面的基础调度器已经可以工作,但它还有一些可以改进的地方,使其更加健壮和易用。
add方法返回Promise:跟踪单个任务状态
我们当前add方法已经返回了一个Promise,这允许我们独立地跟踪每个任务的成功或失败。
// 示例:跟踪单个任务
scheduler.add(() => fetchUserData(11))
.then(data => console.log(`Task 11 completed:`, data.id))
.catch(error => console.error(`Task 11 failed:`, error));
处理单个任务失败:不影响整体运行
在_runNext方法中,我们已经正确地使用了.then().catch().finally(),这意味着如果一个任务taskFn内部抛出错误或返回一个rejected的Promise,它只会拒绝add方法返回的那个Promise,而不会中断整个调度器的运行。其他正在运行或等待的任务将继续它们的生命周期。
等待所有任务完成:waitAll方法的设计与实现
我们已经为waitAll()方法设计了骨架,并通过_allTasksPromise和_checkAllTasksCompletion进行管理。
让我们考虑一个场景:如果所有任务都成功,waitAll()应该解决;如果至少有一个任务失败,waitAll()应该拒绝。这与Promise.all的行为相似。
当前的_checkAllTasksCompletion只处理了成功的情况,没有考虑如何将失败的状态传递给waitAll()。为了实现Promise.all的行为,我们需要收集所有任务的结果和错误。但是,如果waitAll()只是简单地等待所有任务完成,而不关心具体是成功还是失败(即类似于Promise.allSettled或只关心Promise.all的成功路径),那么当前实现是没问题的,它在所有任务都finally之后解决。
如果waitAll需要像Promise.all一样,一旦有任务失败,它就立即拒绝,那么我们需要修改_checkAllTasksCompletion和_runNext中的逻辑,捕获第一个失败的任务,并拒绝_allTasksPromise。
为了保持调度器的通用性,通常waitAll只会等待所有任务最终完成(无论成功或失败),并且如果所有任务都成功,它就解决;如果至少有一个任务失败,它就拒绝。这需要一个数组来保存所有任务的Promise,然后对这个数组调用Promise.all。
改进waitAll方法:
与其在调度器内部维护一个复杂的_allTasksPromise来模仿Promise.all,不如让调度器专注于任务的调度,而让调用者负责聚合结果。add方法已经返回了每个任务的Promise。调用者可以收集这些Promise并自己使用Promise.all或Promise.allSettled。
但是,如果调度器本身需要提供一个等待所有任务完成的入口,那么它需要知道所有任务的Promise。
让我们修改Scheduler,使其能够更自然地与Promise.all结合,或者提供一个更明确的waitAll行为。
方案一:调度器只负责调度,聚合交给外部
这是最简洁的方式。add方法返回任务Promise,外部收集后用Promise.all。
// ... Scheduler class as defined before ...
// 外部调用:
const taskPromises = userIds.map(id => {
return scheduler.add(() => fetchUserData(id));
});
Promise.all(taskPromises) // 外部负责 Promise.all
.then(results => { /* ... */ })
.catch(error => { /* ... */ });
这种方式清晰地分离了关注点:调度器负责并发控制,外部负责结果聚合。这是推荐的做法。
方案二:调度器内部实现类似Promise.all的waitAll
如果我们真的需要在Scheduler内部实现一个能像Promise.all一样,一旦有任务失败就立即拒绝的waitAll,我们需要:
- 在
add方法中,除了将任务推入队列,还要将返回的Promise推入一个内部数组。 waitAll方法就直接对这个内部数组调用Promise.all。
这将使得Scheduler变得稍微复杂,因为它需要维护所有任务的Promise引用。
class Scheduler {
constructor(maxConcurrency) {
// ... (同前) ...
this.allAddedTaskPromises = []; // 存储所有通过add添加的任务的Promise
this.hasFailedTask = false; // 标记是否有任务失败
}
add(taskFn) {
if (typeof taskFn !== 'function') {
return Promise.reject(new Error('Task must be a function that returns a Promise.'));
}
const taskPromise = new Promise((resolve, reject) => {
this.queue.push({ taskFn, resolve, reject });
this.totalTasks++;
this._runNext();
});
// 将每个任务的Promise添加到列表中
this.allAddedTaskPromises.push(taskPromise);
// 监听每个任务的失败,更新hasFailedTask标记
taskPromise.catch(() => {
this.hasFailedTask = true;
});
return taskPromise;
}
_runNext() {
while (this.runningCount < this.maxConcurrency && this.queue.length > 0) {
const { taskFn, resolve, reject } = this.queue.shift();
this.runningCount++;
Promise.resolve().then(() => taskFn())
.then(result => {
resolve(result);
})
.catch(error => {
reject(error);
})
.finally(() => {
this.runningCount--;
this.completedTasks++;
// 不需要在这里调用 _checkAllTasksCompletion 了,因为 waitAll 将使用 Promise.all
this._runNext();
});
}
}
// 等待所有任务完成,行为类似 Promise.all
waitAll() {
// 如果还没有添加任何任务,直接返回一个已解决的Promise
if (this.allAddedTaskPromises.length === 0) {
return Promise.resolve([]);
}
// 使用 Promise.all 来等待所有任务的完成
return Promise.all(this.allAddedTaskPromises);
}
}
这个修改后的Scheduler的waitAll()方法就具有了Promise.all的语义:所有任务都成功则解决,任何一个任务失败则拒绝。
为什么这个版本更好?
- 语义清晰:
waitAll直接映射到Promise.all的功能,易于理解。 - 简单实现:调度器不需要自己管理
_allTasksPromise的resolve/reject逻辑,而是利用了Promise.all的内置能力。 - 健壮性:
Promise.all本身就考虑了所有任务的成功和失败情况。
使用示例:
// ... (Scheduler 类定义如上) ...
const userIds = Array.from({ length: 10 }, (_, i) => i + 1);
const MAX_CONCURRENCY = 3;
const scheduler = new Scheduler(MAX_CONCURRENCY);
console.log(`--- 开始并发限制请求 (并发数: ${MAX_CONCURRENCY}) ---`);
userIds.forEach(id => {
scheduler.add(() => fetchUserData(id));
});
scheduler.waitAll()
.then(results => {
console.log("--- 所有用户数据请求完成 (通过 scheduler.waitAll) ---");
// console.log(results); // 包含所有任务的成功结果
})
.catch(error => {
console.error("--- 某些请求失败 (通过 scheduler.waitAll) ---", error);
// 如果有一个任务失败,这里会捕获到第一个失败任务的错误
});
console.log(`--- 请求已全部添加到调度器,等待结果 (并发数: ${MAX_CONCURRENCY}) ---`);
// 模拟一个失败的任务
const failingScheduler = new Scheduler(2);
failingScheduler.add(() => Promise.resolve('Success 1'));
failingScheduler.add(() => Promise.reject('Error during task 2'));
failingScheduler.add(() => Promise.resolve('Success 3'));
failingScheduler.waitAll()
.then(results => console.log('Failing scheduler all tasks success:', results))
.catch(error => console.error('Failing scheduler encountered an error:', error)); // 会捕获到 'Error during task 2'
通过这个改进,我们的调度器不仅能控制并发,还能方便地等待所有任务的最终完成状态。
高级特性与生产级考量
为了让我们的调度器更加强大和适应生产环境,我们可以考虑添加一些高级特性。
1. 任务优先级
有时,并不是所有任务都具有相同的紧急程度。例如,在一个图像处理队列中,用户上传的图片可能比系统后台自动处理的图片优先级更高。
实现任务优先级可以通过改变queue的数据结构来实现:
- 多队列:为不同优先级设置独立的队列,
_runNext优先从高优先级队列中取任务。 - 优先级队列(Min-heap/Max-heap):使用堆结构来存储任务,每次
_runNext直接取出优先级最高的任务。这需要实现一个堆数据结构,或者利用外部库。 - 排序队列:每次
add任务时,根据优先级插入到队列的正确位置,保持队列有序。或者在_runNext每次shift之前,对queue进行排序(性能开销较大,不推荐)。
基于排序的简单优先级实现(仅作示例,性能一般):
class PriorityScheduler extends Scheduler {
constructor(maxConcurrency) {
super(maxConcurrency);
}
add(taskFn, priority = 0) { // 增加 priority 参数,数字越大优先级越高
if (typeof taskFn !== 'function') {
return Promise.reject(new Error('Task must be a function that returns a Promise.'));
}
const taskPromise = new Promise((resolve, reject) => {
const taskItem = { taskFn, resolve, reject, priority };
// 根据优先级插入到队列的正确位置
let inserted = false;
for (let i = 0; i < this.queue.length; i++) {
if (taskItem.priority > this.queue[i].priority) {
this.queue.splice(i, 0, taskItem); // 插入到当前元素之前
inserted = true;
break;
}
}
if (!inserted) {
this.queue.push(taskItem); // 如果优先级最低,则添加到末尾
}
this.totalTasks++;
this._runNext();
});
this.allAddedTaskPromises.push(taskPromise);
taskPromise.catch(() => { this.hasFailedTask = true; });
return taskPromise;
}
// _runNext 和 waitAll 保持不变,因为 _runNext 总是从 queue.shift() 取任务,
// 而 queue 已经通过 add 方法维护了优先级顺序。
}
// 示例:
const priorityScheduler = new PriorityScheduler(2);
priorityScheduler.add(() => new Promise(res => setTimeout(() => { console.log('Task A (P0)'); res('A'); }, 1000)), 0);
priorityScheduler.add(() => new Promise(res => setTimeout(() => { console.log('Task B (P1)'); res('B'); }, 500)), 1);
priorityScheduler.add(() => new Promise(res => setTimeout(() => { console.log('Task C (P0)'); res('C'); }, 1200)), 0);
priorityScheduler.add(() => new Promise(res => setTimeout(() => { console.log('Task D (P2)'); res('D'); }, 200)), 2); // 最高优先级
priorityScheduler.waitAll().then(results => console.log('Priority tasks completed:', results));
// 预期输出顺序:D, B, A/C (取决于A和C的完成时间)
2. 任务取消
有时用户可能不再需要一个等待中的任务(例如,切换页面后不再需要之前的API请求)。取消一个任务意味着:
- 如果任务还在队列中等待,将其从队列中移除。
- 如果任务正在执行,尝试中断它(这通常取决于任务本身是否支持取消,例如
AbortController)。
实现取消通常需要为每个任务添加一个唯一的ID,并在add方法返回的Promise上添加一个cancel方法,或者提供一个独立的cancelTask(taskId)方法。
// 简化的取消示例 (只取消队列中的任务)
class CancellableScheduler extends Scheduler {
constructor(maxConcurrency) {
super(maxConcurrency);
this.nextTaskId = 0;
}
add(taskFn) {
const taskId = this.nextTaskId++;
const taskPromise = new Promise((resolve, reject) => {
const taskItem = { taskFn, resolve, reject, taskId };
this.queue.push(taskItem);
this.totalTasks++;
this._runNext();
});
this.allAddedTaskPromises.push(taskPromise);
taskPromise.catch(() => { this.hasFailedTask = true; });
// 增强 Promise,使其支持取消
const cancellablePromise = Object.assign(taskPromise, {
cancel: () => {
const index = this.queue.findIndex(item => item.taskId === taskId);
if (index !== -1) {
this.queue.splice(index, 1);
// 标记为取消,不计入完成计数,或计入取消计数
this.totalTasks--; // 如果取消,不应算作 totalTasks
console.log(`Task ${taskId} cancelled from queue.`);
// 拒绝这个Promise,表明它被取消
reject(new Error(`Task ${taskId} cancelled.`));
return true;
}
// 如果任务已在运行或已完成,则无法取消队列中的状态
return false;
}
});
return cancellablePromise;
}
}
// 示例
const cancellableScheduler = new CancellableScheduler(1);
const task1 = cancellableScheduler.add(() => new Promise(res => setTimeout(() => { console.log('Task 1 done'); res(1); }, 2000)));
const task2 = cancellableScheduler.add(() => new Promise(res => setTimeout(() => { console.log('Task 2 done'); res(2); }, 1000)));
const task3 = cancellableScheduler.add(() => new Promise(res => setTimeout(() => { console.log('Task 3 done'); res(3); }, 500)));
// task1 正在运行
// task2, task3 在队列中等待
setTimeout(() => {
task3.cancel(); // 应该能取消 task3
}, 100);
cancellableScheduler.waitAll().then(results => console.log('Cancellable tasks completed:', results))
.catch(err => console.error('Cancellable tasks error:', err));
3. 任务超时与重试
- 超时:每个任务可以设置一个最大执行时间。如果任务超过这个时间仍未完成,则自动将其标记为失败。这可以通过
Promise.race与一个带有setTimeout的Promise实现。 - 重试:如果任务失败,调度器可以尝试重新执行它几次。这需要跟踪每个任务的重试次数,并在
catch块中根据重试策略决定是重试还是最终标记为失败。
// 任务超时和重试 (只提供思路,不完全实现到Scheduler中,因为它会增加Scheduler的复杂性)
// 任务超时函数
function withTimeout(promiseFn, timeout) {
return new Promise((resolve, reject) => {
const timeoutId = setTimeout(() => {
reject(new Error(`Task timed out after ${timeout}ms`));
}, timeout);
promiseFn().then(result => {
clearTimeout(timeoutId);
resolve(result);
}).catch(error => {
clearTimeout(timeoutId);
reject(error);
});
});
}
// 任务重试函数
function withRetry(promiseFn, retries = 3, delay = 100) {
return new Promise((resolve, reject) => {
const attempt = (count) => {
promiseFn()
.then(resolve)
.catch(error => {
if (count < retries) {
console.warn(`Retrying task (attempt ${count + 1}/${retries})...`);
setTimeout(() => attempt(count + 1), delay);
} else {
reject(new Error(`Task failed after ${retries} retries: ${error.message}`));
}
});
};
attempt(0);
});
}
// 在 Scheduler.add 中使用:
// scheduler.add(() => withRetry(() => withTimeout(() => fetchUserData(id), 5000), 2, 200));
4. 事件通知
调度器可以通过EventEmitter模式,在关键生命周期事件(如任务开始、任务完成、任务失败、调度器空闲等)触发事件,允许外部系统监听和响应这些事件。
const EventEmitter = require('events'); // Node.js
// 或者自定义一个简单的 EventEmitter 类
class EventfulScheduler extends Scheduler {
constructor(maxConcurrency) {
super(maxConcurrency);
this.events = new EventEmitter();
}
add(taskFn) {
// ... (同 Scheduler.add) ...
const taskPromise = new Promise((resolve, reject) => {
const taskId = this.nextTaskId++; // 假设有 taskId
const taskItem = { taskFn, resolve, reject, taskId };
this.queue.push(taskItem);
this.totalTasks++;
this.events.emit('taskAdded', taskId); // 任务添加事件
this._runNext();
});
// ... (同 Scheduler.add) ...
return taskPromise;
}
_runNext() {
while (this.runningCount < this.maxConcurrency && this.queue.length > 0) {
const { taskFn, resolve, reject, taskId } = this.queue.shift(); // 假设有 taskId
this.runningCount++;
this.events.emit('taskStarted', taskId); // 任务开始事件
Promise.resolve().then(() => taskFn())
.then(result => {
resolve(result);
this.events.emit('taskCompleted', taskId, result); // 任务完成事件
})
.catch(error => {
reject(error);
this.events.emit('taskFailed', taskId, error); // 任务失败事件
})
.finally(() => {
this.runningCount--;
this.completedTasks++;
this._runNext();
if (this.runningCount === 0 && this.queue.length === 0 && this.completedTasks === this.totalTasks) {
this.events.emit('idle'); // 调度器空闲事件
}
});
}
}
}
// 示例
const eventfulScheduler = new EventfulScheduler(2);
eventfulScheduler.events.on('taskStarted', taskId => console.log(`Event: Task ${taskId} started.`));
eventfulScheduler.events.on('taskCompleted', (taskId, result) => console.log(`Event: Task ${taskId} completed with result: ${result}`));
eventfulScheduler.events.on('taskFailed', (taskId, error) => console.error(`Event: Task ${taskId} failed with error: ${error.message}`));
eventfulScheduler.events.on('idle', () => console.log('Event: Scheduler is idle.'));
eventfulScheduler.add(() => new Promise(res => setTimeout(() => res('T1'), 1000)));
eventfulScheduler.add(() => new Promise(res => setTimeout(() => res('T2'), 800)));
eventfulScheduler.add(() => new Promise((_, rej) => setTimeout(() => rej(new Error('T3 Error')), 1500)));
5. 灵活的任务类型
目前我们的add方法要求传入一个返回Promise的函数。在实际应用中,我们可能希望更灵活,例如直接传入一个Promise实例,或者一个async函数,甚至一个同步函数。调度器可以内部进行适配。
// 在 add 方法中进行适配
// function add(task) {
// let taskFn;
// if (typeof task === 'function') {
// // 已经是函数,可能是返回Promise的函数或普通函数
// taskFn = task;
// } else if (task instanceof Promise) {
// // 如果直接传入Promise,则包装成一个返回该Promise的函数
// taskFn = () => task;
// } else {
// // 其他类型,例如直接一个值,也包装成返回Promise的函数
// taskFn = () => Promise.resolve(task);
// }
// // ... 后续处理 taskFn ...
// }
对比与选择:Promise.all与并发调度器的适用场景
| 特性/场景 | Promise.all |
并发调度器(Scheduler) |
|---|---|---|
| 并发控制 | 无限制,所有Promise同时开始执行 | 有明确限制,由maxConcurrency决定 |
| 适用任务量 | 少量或中等数量的任务(几十个),且资源充足 | 大量任务(几百、几千个),需要精细控制资源使用 |
| 资源消耗 | 可能导致短时高CPU/内存/网络连接消耗 | 平稳的资源消耗,避免系统过载 |
| API限流 | 极易触发API限流 | 有效避免API限流,可根据API限制调整maxConcurrency |
| 错误处理 | 任意一个Promise拒绝,Promise.all立即拒绝 |
单个任务失败不影响其他任务,waitAll行为可配置(类似all或allSettled) |
| 任务排队 | 无排队机制,所有任务立即发起 | 内置排队机制,等待队列中的任务按序执行 |
| 任务优先级 | 不支持 | 可扩展支持,通过修改队列数据结构实现 |
| 任务取消 | 不支持 | 可扩展支持,取消等待中的任务或信号中断运行中的任务 |
| 复杂性 | 简单易用,内置于JS标准库 | 需要手写实现,引入额外的类和逻辑 |
| 主要应用场景 | 同时等待几个不相关联的异步操作完成 | 批量处理、数据抓取、文件上传/下载、API限流、资源密集型任务 |
何时选择?
- 如果您需要同时启动的异步操作数量不多(例如,少于10个),并且这些操作对系统资源消耗不大,那么
Promise.all通常是更简洁、更直接的选择。 - 如果您面对的是数百甚至数千个异步操作,或者这些操作非常耗费资源,或者您需要严格遵守API速率限制,那么一个并发调度器是必不可少的。它能帮助您构建更稳定、更高效、更具容错性的应用。
实战应用场景与面试技巧
并发控制调度器不仅仅是一个技术难题,更是一个在实际开发中广泛应用的模式。
实战应用场景
- API限流与批量请求:最经典的场景。当需要从第三方服务获取大量数据时(如爬虫、数据同步),使用调度器可以确保在不违反API速率限制的前提下,高效地完成所有请求。
- 文件上传/下载:在批量上传或下载文件时,限制同时进行的上传/下载任务数量,可以避免网络拥堵,提高用户体验,并减少服务器压力。
- 图片懒加载与预加载:可以调度图片的加载顺序和数量,优先加载视口内的图片,同时限制并发加载数量,优化页面性能。
- 数据抓取与爬虫:这是调度器发挥巨大作用的领域。通过限制并发请求,可以模拟用户行为,避免被目标网站识别为恶意爬虫而被封禁IP。同时,可以结合代理池、重试机制等,构建强大的爬虫系统。
- 离线数据同步:在离线应用中,当网络恢复时,可能需要将本地积累的大量数据同步到服务器。调度器可以控制同步任务的数量,保证数据传输的稳定性。
- 后台任务处理:在Node.js后端服务中,可能有很多CPU密集型或IO密集型的后台任务需要处理。调度器可以限制同时运行的任务数量,防止服务过载。
面试官视角:考察点深度解析
当大厂面试官提出“实现一个并发控制调度器”时,他们通常会考察以下几个核心能力:
-
对JavaScript异步编程的理解:
- 是否熟悉Promise的生命周期、
then/catch/finally、async/await。 - 是否理解事件循环机制,以及JavaScript的单线程模型如何实现并发。
- 能否正确处理Promise的链式调用和错误冒泡。
- 是否熟悉Promise的生命周期、
-
数据结构与算法的应用:
- 能否想到使用队列(数组)来存储等待任务。
- 如何高效地从队列中添加和移除任务(
push和shift)。 - 如果涉及到优先级,能否考虑优先级队列(堆)或有序数组的实现。
-
异常处理与健壮性:
- 单个任务失败是否会影响其他任务的执行(不应影响)。
- 整个调度器是否能捕获并传递所有任务的错误。
- 如何处理
add方法传入非函数的情况。 maxConcurrency为0或负数时的处理(应抛出错误)。
-
系统设计与可扩展性:
- 代码结构是否清晰,模块化程度如何。
Scheduler类是否职责单一,易于理解和维护。- 是否考虑了未来可能的功能扩展(如优先级、取消、重试、事件通知)。
waitAll方法的设计是否合理,是等待所有完成还是像Promise.all一样一旦失败就拒绝。
-
代码的严谨性与细节:
- 变量命名是否规范,代码风格是否一致。
- 是否有不必要的内存泄漏(例如,是否及时清理已完成任务的引用)。
_runNext中的循环和递归调用是否正确,不会导致栈溢出(在JS事件循环中,异步递归不会导致栈溢出)。
在面试中,除了实现基本功能,主动提出并讨论这些高级特性和边缘情况,将极大地加深面试官对你能力的印象。这表明你不仅能解决问题,还能预见问题、优化方案。
理解并发控制的精髓
并发控制调度器是现代异步编程中不可或缺的工具。它不仅是解决实际应用中资源管理和性能瓶颈的关键,更是衡量一个开发者对异步编程、系统设计和问题解决能力深度的试金石。通过手写实现它,我们不仅掌握了一个强大的模式,更深入理解了JavaScript异步机制的本质和如何构建健壮可扩展的系统。