JS 并发限制:使用信号量模式控制并发请求数量

各位观众老爷,晚上好!我是你们的老朋友,今天咱来聊聊JavaScript并发控制这档子事儿,保证让各位听得明白,用得溜溜的。

开场白:为啥要搞并发限制?

话说,咱们写代码,尤其是在前端,经常要跟服务器打交道,发请求拿数据。要是用户一顿操作猛如虎,一下子发了十几个请求,浏览器或者服务器可能就懵逼了,轻则卡顿,重则崩溃。这就好比一根水管,同时往里灌太多水,那不得爆了?所以,我们需要一个“阀门”,控制一下并发请求的数量,保证系统平稳运行。

并发限制的几种姿势

并发限制的手段有很多,比如:

  • 队列 + 定时器: 先把请求放到队列里,然后用定时器每次从队列里取一个请求执行。
  • Promise.all + 分片: 把请求分成若干批次,用 Promise.all 并行执行每个批次。
  • Semaphore(信号量): 咱今天的主角,一种更优雅、更灵活的并发控制方案。

什么是信号量?

信号量,英文叫 Semaphore,你可以把它想象成一个停车场。

  • 停车场有固定数量的停车位(并发数)。
  • 每来一辆车(发起一个请求),就占用一个停车位。
  • 车走了(请求完成),就释放一个停车位。
  • 如果停车场满了(达到并发限制),后面的车就只能在外面等着(阻塞)。

信号量模式的JavaScript实现

咱们用JavaScript来实现一个简单的信号量:

class Semaphore {
  constructor(maxConcurrent) {
    this.maxConcurrent = maxConcurrent; // 最大并发数
    this.currentConcurrent = 0; // 当前并发数
    this.queue = []; // 等待队列
  }

  /**
   * 获取许可,如果当前并发数未达到最大值,则立即执行,否则加入等待队列
   * @returns {Promise<() => void>} 返回一个Promise,resolve时返回一个release函数,用于释放许可
   */
  acquire() {
    return new Promise((resolve) => {
      if (this.currentConcurrent < this.maxConcurrent) {
        this.currentConcurrent++;
        resolve(this.release.bind(this)); // 立即resolve,并返回release函数
      } else {
        // 加入等待队列
        this.queue.push(resolve);
      }
    });
  }

  /**
   * 释放许可,如果等待队列中有等待者,则唤醒一个等待者
   */
  release() {
    this.currentConcurrent--;
    if (this.queue.length > 0) {
      const next = this.queue.shift();
      this.currentConcurrent++;
      next(this.release.bind(this)); // 唤醒等待者,并传递release函数
    }
  }

  /**
   * 包装一个异步函数,使其具有并发限制
   * @param {Function} fn 需要执行的异步函数
   * @returns {Function} 返回一个包装后的函数
   */
  wrap(fn) {
    return async (...args) => {
      const release = await this.acquire(); // 获取许可
      try {
        const result = await fn(...args); // 执行异步函数
        return result;
      } finally {
        release(); // 释放许可,无论成功失败都要释放
      }
    };
  }
}

代码解释:

  • constructor(maxConcurrent): 构造函数,接收一个参数 maxConcurrent,表示最大并发数。
  • acquire(): 获取许可的方法。如果当前并发数小于最大并发数,则立即增加当前并发数,并返回一个 release 函数,用于释放许可。如果当前并发数已经达到最大并发数,则将 resolve 函数放入等待队列。
  • release(): 释放许可的方法。减少当前并发数,如果等待队列中有等待者,则从队列中取出一个 resolve 函数并执行,唤醒一个等待者。
  • wrap(fn): 包装一个异步函数,使其具有并发限制。返回一个包装后的函数,该函数在执行之前会先调用 acquire() 获取许可,执行完毕后调用 release() 释放许可。

使用示例:

假设我们有一个异步请求函数 fetchData

async function fetchData(url) {
  // 模拟网络请求延迟
  await new Promise(resolve => setTimeout(resolve, Math.random() * 1000));
  console.log(`Fetching data from ${url}`);
  return `Data from ${url}`;
}

现在,我们使用信号量来限制并发请求的数量:

const semaphore = new Semaphore(3); // 最大并发数为3
const wrappedFetchData = semaphore.wrap(fetchData);

async function main() {
  const urls = [
    "https://example.com/data1",
    "https://example.com/data2",
    "https://example.com/data3",
    "https://example.com/data4",
    "https://example.com/data5",
    "https://example.com/data6"
  ];

  const promises = urls.map(url => wrappedFetchData(url));
  const results = await Promise.all(promises);

  console.log("All data fetched:", results);
}

main();

运行结果分析:

你会发现,同一时刻最多只有3个 fetchData 函数在执行,其他的请求都在等待。等到有请求完成后,才会从等待队列中取出一个请求执行。这样就保证了并发请求的数量不会超过最大值,避免了系统崩溃。

信号量的优点:

  • 简单易懂: 信号量的概念比较直观,容易理解和使用。
  • 灵活可控: 可以根据实际情况调整最大并发数,灵活控制并发程度。
  • 避免阻塞: 使用等待队列,避免了无意义的阻塞,提高了资源利用率。
  • 适用场景广: 适用于各种需要并发控制的场景,比如网络请求、数据库连接等。

信号量的应用场景:

  • 限制API请求频率: 防止短时间内发送大量请求,避免被服务器封禁。
  • 控制数据库连接数: 避免连接池耗尽,导致数据库崩溃。
  • 限制上传/下载速度: 避免占用过多带宽,影响其他用户的体验。
  • 任务调度: 控制任务的并发执行数量,提高系统效率。

信号量的进阶用法

  • 带超时的信号量: 可以设置一个超时时间,如果等待时间超过了超时时间,则放弃获取许可,避免死锁。
  • 可重入的信号量: 允许同一个任务多次获取许可,而不会阻塞自己。

示例:带超时的信号量

class TimeoutSemaphore extends Semaphore {
    constructor(maxConcurrent) {
        super(maxConcurrent);
    }

    async acquire(timeout = 0) {
        return new Promise((resolve, reject) => {
            if (this.currentConcurrent < this.maxConcurrent) {
                this.currentConcurrent++;
                resolve(this.release.bind(this));
                return;
            }

            let timeoutId;
            const timeoutHandler = () => {
                this.queue = this.queue.filter(r => r !== resolve); //移除队列中的resolve
                reject(new Error("Timeout acquiring semaphore"));
                this.currentConcurrent--; // 释放许可,避免资源泄露
            };

            if (timeout > 0) {
                timeoutId = setTimeout(timeoutHandler, timeout);
            }

            this.queue.push((release) => {
                clearTimeout(timeoutId); //如果获取到许可,清除timeout
                resolve(release);
            });
        });
    }

    wrap(fn, timeout = 0) {
        return async (...args) => {
            try {
                const release = await this.acquire(timeout); // 获取许可,带有超时
                try {
                    const result = await fn(...args);
                    return result;
                } finally {
                    release();
                }
            } catch (error) {
                console.error("Error executing wrapped function:", error);
                throw error; // 继续抛出异常
            }
        };
    }
}

// 使用示例
const timeoutSemaphore = new TimeoutSemaphore(2);
const wrappedFetchDataTimeout = timeoutSemaphore.wrap(fetchData, 500); // 设置超时时间为500ms

async function mainWithTimeout() {
    const urls = [
        "https://example.com/data1",
        "https://example.com/data2",
        "https://example.com/data3",
        "https://example.com/data4"
    ];

    const promises = urls.map(url => wrappedFetchDataTimeout(url).catch(error => {
        console.error(`Failed to fetch ${url}:`, error);
        return null; // 或者返回一个默认值
    }));

    const results = await Promise.all(promises);
    console.log("All data fetched (with timeout):", results);
}

mainWithTimeout();

这个改进版本增加了以下功能:

  • 超时机制: acquire 方法现在接受一个可选的 timeout 参数。如果超过指定的超时时间仍未获取到许可,则 Promise 将被拒绝,并抛出一个错误。
  • 错误处理: wrap 方法现在包含一个 try...catch 块,用于捕获异步函数执行期间可能发生的错误,并将其记录到控制台。同时,它会继续抛出异常,以便调用者可以处理该错误。
  • 资源清理: 在超时的情况下,从队列中移除对应的 resolve 函数,防止内存泄漏。
  • 错误处理:mainWithTimeout 函数中,对每个 wrappedFetchDataTimeout 调用使用 .catch 方法来捕获并处理任何错误。

表格总结:信号量与其他并发控制方法的比较

特性 信号量 队列 + 定时器 Promise.all + 分片
实现难度 中等 简单 中等
灵活性 较低 中等
资源利用率 较高 较低 较高
适用场景 各种并发控制场景 简单任务队列 批量任务处理
维护性 较高 简单 较高
可扩展性 较高 较低 较高
实时性 较好 较差 较好
错误处理 灵活 简单 较为复杂
是否需要等待队列

注意事项:

  • 避免死锁: 在使用信号量时,要特别注意避免死锁的发生。比如,一个任务同时需要获取多个信号量的许可,如果获取顺序不当,可能会导致死锁。
  • 资源释放: 务必保证在任务完成后释放许可,无论任务是成功还是失败。否则,可能会导致信号量一直处于占用状态,影响其他任务的执行。
  • 选择合适的并发数: 最大并发数的选择需要根据实际情况进行调整。如果并发数太小,可能会导致资源利用率不高;如果并发数太大,可能会导致系统崩溃。

总结:

信号量是一种强大的并发控制工具,可以帮助我们更好地管理JavaScript中的并发请求,提高系统性能和稳定性。掌握信号量的使用方法,可以让我们在编写高并发应用时更加得心应手。希望今天的分享对大家有所帮助!

各位,下课!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注