各位观众老爷,晚上好!我是你们的老朋友,今天咱来聊聊JavaScript并发控制这档子事儿,保证让各位听得明白,用得溜溜的。
开场白:为啥要搞并发限制?
话说,咱们写代码,尤其是在前端,经常要跟服务器打交道,发请求拿数据。要是用户一顿操作猛如虎,一下子发了十几个请求,浏览器或者服务器可能就懵逼了,轻则卡顿,重则崩溃。这就好比一根水管,同时往里灌太多水,那不得爆了?所以,我们需要一个“阀门”,控制一下并发请求的数量,保证系统平稳运行。
并发限制的几种姿势
并发限制的手段有很多,比如:
- 队列 + 定时器: 先把请求放到队列里,然后用定时器每次从队列里取一个请求执行。
- Promise.all + 分片: 把请求分成若干批次,用
Promise.all
并行执行每个批次。 - Semaphore(信号量): 咱今天的主角,一种更优雅、更灵活的并发控制方案。
什么是信号量?
信号量,英文叫 Semaphore,你可以把它想象成一个停车场。
- 停车场有固定数量的停车位(并发数)。
- 每来一辆车(发起一个请求),就占用一个停车位。
- 车走了(请求完成),就释放一个停车位。
- 如果停车场满了(达到并发限制),后面的车就只能在外面等着(阻塞)。
信号量模式的JavaScript实现
咱们用JavaScript来实现一个简单的信号量:
class Semaphore {
constructor(maxConcurrent) {
this.maxConcurrent = maxConcurrent; // 最大并发数
this.currentConcurrent = 0; // 当前并发数
this.queue = []; // 等待队列
}
/**
* 获取许可,如果当前并发数未达到最大值,则立即执行,否则加入等待队列
* @returns {Promise<() => void>} 返回一个Promise,resolve时返回一个release函数,用于释放许可
*/
acquire() {
return new Promise((resolve) => {
if (this.currentConcurrent < this.maxConcurrent) {
this.currentConcurrent++;
resolve(this.release.bind(this)); // 立即resolve,并返回release函数
} else {
// 加入等待队列
this.queue.push(resolve);
}
});
}
/**
* 释放许可,如果等待队列中有等待者,则唤醒一个等待者
*/
release() {
this.currentConcurrent--;
if (this.queue.length > 0) {
const next = this.queue.shift();
this.currentConcurrent++;
next(this.release.bind(this)); // 唤醒等待者,并传递release函数
}
}
/**
* 包装一个异步函数,使其具有并发限制
* @param {Function} fn 需要执行的异步函数
* @returns {Function} 返回一个包装后的函数
*/
wrap(fn) {
return async (...args) => {
const release = await this.acquire(); // 获取许可
try {
const result = await fn(...args); // 执行异步函数
return result;
} finally {
release(); // 释放许可,无论成功失败都要释放
}
};
}
}
代码解释:
constructor(maxConcurrent)
: 构造函数,接收一个参数maxConcurrent
,表示最大并发数。acquire()
: 获取许可的方法。如果当前并发数小于最大并发数,则立即增加当前并发数,并返回一个release
函数,用于释放许可。如果当前并发数已经达到最大并发数,则将resolve
函数放入等待队列。release()
: 释放许可的方法。减少当前并发数,如果等待队列中有等待者,则从队列中取出一个resolve
函数并执行,唤醒一个等待者。wrap(fn)
: 包装一个异步函数,使其具有并发限制。返回一个包装后的函数,该函数在执行之前会先调用acquire()
获取许可,执行完毕后调用release()
释放许可。
使用示例:
假设我们有一个异步请求函数 fetchData
:
async function fetchData(url) {
// 模拟网络请求延迟
await new Promise(resolve => setTimeout(resolve, Math.random() * 1000));
console.log(`Fetching data from ${url}`);
return `Data from ${url}`;
}
现在,我们使用信号量来限制并发请求的数量:
const semaphore = new Semaphore(3); // 最大并发数为3
const wrappedFetchData = semaphore.wrap(fetchData);
async function main() {
const urls = [
"https://example.com/data1",
"https://example.com/data2",
"https://example.com/data3",
"https://example.com/data4",
"https://example.com/data5",
"https://example.com/data6"
];
const promises = urls.map(url => wrappedFetchData(url));
const results = await Promise.all(promises);
console.log("All data fetched:", results);
}
main();
运行结果分析:
你会发现,同一时刻最多只有3个 fetchData
函数在执行,其他的请求都在等待。等到有请求完成后,才会从等待队列中取出一个请求执行。这样就保证了并发请求的数量不会超过最大值,避免了系统崩溃。
信号量的优点:
- 简单易懂: 信号量的概念比较直观,容易理解和使用。
- 灵活可控: 可以根据实际情况调整最大并发数,灵活控制并发程度。
- 避免阻塞: 使用等待队列,避免了无意义的阻塞,提高了资源利用率。
- 适用场景广: 适用于各种需要并发控制的场景,比如网络请求、数据库连接等。
信号量的应用场景:
- 限制API请求频率: 防止短时间内发送大量请求,避免被服务器封禁。
- 控制数据库连接数: 避免连接池耗尽,导致数据库崩溃。
- 限制上传/下载速度: 避免占用过多带宽,影响其他用户的体验。
- 任务调度: 控制任务的并发执行数量,提高系统效率。
信号量的进阶用法
- 带超时的信号量: 可以设置一个超时时间,如果等待时间超过了超时时间,则放弃获取许可,避免死锁。
- 可重入的信号量: 允许同一个任务多次获取许可,而不会阻塞自己。
示例:带超时的信号量
class TimeoutSemaphore extends Semaphore {
constructor(maxConcurrent) {
super(maxConcurrent);
}
async acquire(timeout = 0) {
return new Promise((resolve, reject) => {
if (this.currentConcurrent < this.maxConcurrent) {
this.currentConcurrent++;
resolve(this.release.bind(this));
return;
}
let timeoutId;
const timeoutHandler = () => {
this.queue = this.queue.filter(r => r !== resolve); //移除队列中的resolve
reject(new Error("Timeout acquiring semaphore"));
this.currentConcurrent--; // 释放许可,避免资源泄露
};
if (timeout > 0) {
timeoutId = setTimeout(timeoutHandler, timeout);
}
this.queue.push((release) => {
clearTimeout(timeoutId); //如果获取到许可,清除timeout
resolve(release);
});
});
}
wrap(fn, timeout = 0) {
return async (...args) => {
try {
const release = await this.acquire(timeout); // 获取许可,带有超时
try {
const result = await fn(...args);
return result;
} finally {
release();
}
} catch (error) {
console.error("Error executing wrapped function:", error);
throw error; // 继续抛出异常
}
};
}
}
// 使用示例
const timeoutSemaphore = new TimeoutSemaphore(2);
const wrappedFetchDataTimeout = timeoutSemaphore.wrap(fetchData, 500); // 设置超时时间为500ms
async function mainWithTimeout() {
const urls = [
"https://example.com/data1",
"https://example.com/data2",
"https://example.com/data3",
"https://example.com/data4"
];
const promises = urls.map(url => wrappedFetchDataTimeout(url).catch(error => {
console.error(`Failed to fetch ${url}:`, error);
return null; // 或者返回一个默认值
}));
const results = await Promise.all(promises);
console.log("All data fetched (with timeout):", results);
}
mainWithTimeout();
这个改进版本增加了以下功能:
- 超时机制:
acquire
方法现在接受一个可选的timeout
参数。如果超过指定的超时时间仍未获取到许可,则Promise
将被拒绝,并抛出一个错误。 - 错误处理:
wrap
方法现在包含一个try...catch
块,用于捕获异步函数执行期间可能发生的错误,并将其记录到控制台。同时,它会继续抛出异常,以便调用者可以处理该错误。 - 资源清理: 在超时的情况下,从队列中移除对应的
resolve
函数,防止内存泄漏。 - 错误处理: 在
mainWithTimeout
函数中,对每个wrappedFetchDataTimeout
调用使用.catch
方法来捕获并处理任何错误。
表格总结:信号量与其他并发控制方法的比较
特性 | 信号量 | 队列 + 定时器 | Promise.all + 分片 |
---|---|---|---|
实现难度 | 中等 | 简单 | 中等 |
灵活性 | 高 | 较低 | 中等 |
资源利用率 | 较高 | 较低 | 较高 |
适用场景 | 各种并发控制场景 | 简单任务队列 | 批量任务处理 |
维护性 | 较高 | 简单 | 较高 |
可扩展性 | 较高 | 较低 | 较高 |
实时性 | 较好 | 较差 | 较好 |
错误处理 | 灵活 | 简单 | 较为复杂 |
是否需要等待队列 | 是 | 是 | 否 |
注意事项:
- 避免死锁: 在使用信号量时,要特别注意避免死锁的发生。比如,一个任务同时需要获取多个信号量的许可,如果获取顺序不当,可能会导致死锁。
- 资源释放: 务必保证在任务完成后释放许可,无论任务是成功还是失败。否则,可能会导致信号量一直处于占用状态,影响其他任务的执行。
- 选择合适的并发数: 最大并发数的选择需要根据实际情况进行调整。如果并发数太小,可能会导致资源利用率不高;如果并发数太大,可能会导致系统崩溃。
总结:
信号量是一种强大的并发控制工具,可以帮助我们更好地管理JavaScript中的并发请求,提高系统性能和稳定性。掌握信号量的使用方法,可以让我们在编写高并发应用时更加得心应手。希望今天的分享对大家有所帮助!
各位,下课!