并发限制调度器:如何实现一个同时只能运行 N 个任务的 Scheduler(字节跳动高频题)

并发限制调度器:如何实现一个同时只能运行 N 个任务的 Scheduler(字节跳动高频题)

大家好,今天我们来深入探讨一个在高并发系统中非常常见也极其重要的设计模式——并发限制调度器(Concurrency-Limited Scheduler)。这个问题不仅出现在字节跳动的面试中,也是很多大型互联网公司如阿里、腾讯、美团等高频考察点。

为什么这个话题这么重要?因为现实中我们经常遇到这样的场景:

  • 后端服务要调用第三方 API,但对方只允许每秒最多 10 次请求;
  • 批量数据处理任务不能一次性启动全部线程,否则会压垮服务器;
  • 异步任务队列需要控制最大并发数以避免资源耗尽;
  • Web 应用中的图片压缩或视频转码任务需要限制并行数量防止 CPU 占满。

这些问题的核心本质就是:如何让一批异步任务按指定的最大并发数顺序执行?


一、问题定义与目标

我们要实现一个 Scheduler 类,它具备以下能力:

功能 描述
支持任意数量的任务提交 可以动态添加多个任务
控制最大并发数 例如最多同时运行 3 个任务
自动排队等待 超出限制的任务自动挂起,直到有空闲槽位
保证执行顺序 先进先出(FIFO),不乱序
错误处理机制 单个任务失败不影响其他任务

这正是字节跳动这类大厂喜欢问的问题:“给你一个任务列表和最大并发数,怎么写一个调度器让它刚好最多跑 N 个任务?”


二、核心思路解析

核心思想:信号量 + 队列

我们可以用两个关键组件来解决这个问题:

  1. 信号量(Semaphore):用于控制并发数量。

    • 初始值为 N(比如 3)
    • 每次执行任务前 acquire() 获取许可
    • 任务完成后 release() 释放许可
  2. 任务队列(Queue):存储待执行的任务,先进先出。

    • 使用标准库的 queue.Queue 或自定义链表结构均可
    • 新任务进来时如果信号量不足就入队等待

当某个任务完成并释放信号量后,从队列里取出下一个任务继续执行。

这样就能做到:
✅ 最多只有 N 个任务同时运行
✅ 所有任务最终都会被执行
✅ 不会出现饥饿(所有任务都能被处理)
✅ 易于扩展支持超时、取消等功能


三、Python 实现版本(推荐用于面试)

下面是完整可运行的代码示例(基于 Python 的 asyncio 和 threading):

import asyncio
import queue
from typing import Callable, Any, Optional

class ConcurrencyLimitedScheduler:
    def __init__(self, max_concurrent: int):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.task_queue = queue.Queue()
        self._running_tasks = set()

    async def submit(self, func: Callable[[], Any], *args, **kwargs) -> asyncio.Task:
        """
        提交一个异步函数到调度器中,返回 Task 对象
        """
        task = asyncio.create_task(self._run_with_semaphore(func, *args, **kwargs))
        self._running_tasks.add(task)

        def done_callback(fut: asyncio.Task):
            self._running_tasks.discard(fut)

        task.add_done_callback(done_callback)
        return task

    async def _run_with_semaphore(self, func: Callable[[], Any], *args, **kwargs):
        """
        内部方法:获取信号量后执行具体任务
        """
        async with self.semaphore:
            try:
                result = await func(*args, **kwargs)
                return result
            except Exception as e:
                print(f"Task failed: {e}")
                raise

    def submit_sync(self, func: Callable[[], Any], *args, **kwargs) -> None:
        """
        如果是同步函数,也可以通过此接口提交(内部自动包装成协程)
        """
        async def wrapper():
            return await self._run_with_semaphore(func, *args, **kwargs)

        asyncio.create_task(wrapper())

    async def wait_all(self):
        """
        等待所有任务完成
        """
        while self._running_tasks:
            await asyncio.sleep(0.1)

# 示例使用方式
async def example_task(name: str, delay: float):
    print(f"[{name}] 开始执行")
    await asyncio.sleep(delay)
    print(f"[{name}] 执行完毕")

async def main():
    scheduler = ConcurrencyLimitedScheduler(max_concurrent=2)

    # 添加多个任务
    tasks = [
        scheduler.submit(example_task, "A", 1),
        scheduler.submit(example_task, "B", 2),
        scheduler.submit(example_task, "C", 0.5),
        scheduler.submit(example_task, "D", 3),
        scheduler.submit(example_task, "E", 1),
    ]

    await scheduler.wait_all()

    print("所有任务已完成!")

if __name__ == "__main__":
    asyncio.run(main())

输出结果示例(模拟并发行为):

[A] 开始执行
[B] 开始执行
[C] 开始执行   # 这时候 A/B 已经在跑,C 被阻塞,等 A 或 B 完成
[D] 开始执行   # C 执行完后 D 接着上
[E] 开始执行   # D 执行完后 E 上
[A] 执行完毕
[B] 执行完毕
[C] 执行完毕
[D] 执行完毕
[E] 执行完毕
所有任务已完成!

✅ 符合预期:最多同时运行 2 个任务,其余排队等待。


四、为什么不用简单的线程池?

很多人可能会想到用 ThreadPoolExecutorProcessPoolExecutor 来做这件事,但这不是最优解!

方案 是否满足需求 缺陷
ThreadPoolExecutor(max_workers=N) ❌ 不满足 无法动态调整并发数;任务一旦提交就不能撤销;无法优雅地处理异常
自己维护线程池 + 队列 ✅ 可行 复杂度高,容易出错(死锁、资源泄漏)
我们提出的 Semaphore + Queue 方案 ✅ 完美匹配 简洁、清晰、易扩展、健壮性强

关键在于:信号量提供了“令牌”的概念,而队列负责缓冲和排序 —— 这正是并发控制的经典组合。


五、进阶功能扩展(适合面试加分项)

如果你能在面试中提到这些扩展点,会让面试官眼前一亮:

1. 支持任务超时

async def _run_with_semaphore(self, func: Callable[[], Any], timeout: Optional[float] = None, *args, **kwargs):
    try:
        if timeout is not None:
            return await asyncio.wait_for(self._run_with_semaphore(func, *args, **kwargs), timeout=timeout)
        else:
            return await self._run_with_semaphore(func, *args, **kwargs)
    except asyncio.TimeoutError:
        print("任务超时")
        raise

2. 支持取消任务

def cancel_all(self):
    for task in self._running_tasks:
        if not task.done():
            task.cancel()

3. 支持优先级队列(如高优任务插队)

可以用 heapq 实现优先级队列,比如给每个任务打标签(0~9),数值越小优先级越高。

4. 监控指标收集

可以记录:

  • 总共执行了多少任务
  • 平均等待时间
  • 最大排队长度
  • 成功率统计

这对生产环境非常重要!


六、Java 版本参考(如果你熟悉 Java)

虽然 Python 更适合快速演示,但很多企业用 Java,这里给出一个简化版 Java 实现:

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrencyLimitedScheduler {
    private final ExecutorService executor;
    private final Semaphore semaphore;
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final AtomicInteger runningCount = new AtomicInteger(0);

    public ConcurrencyLimitedScheduler(int maxConcurrent) {
        this.semaphore = new Semaphore(maxConcurrent);
        this.executor = Executors.newFixedThreadPool(maxConcurrent);
    }

    public void submit(Runnable task) {
        queue.offer(() -> {
            try {
                semaphore.acquire();
                runningCount.incrementAndGet();
                try {
                    task.run();
                } finally {
                    runningCount.decrementAndGet();
                    semaphore.release();
                    // 继续尝试从队列取下一个任务
                    scheduleNext();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    private void scheduleNext() {
        Runnable next = queue.poll();
        if (next != null) {
            executor.execute(next);
        }
    }

    public void shutdown() {
        executor.shutdown();
    }
}

这个 Java 版本逻辑一致,只是语言层面略有不同。注意:这里用了 LinkedBlockingQueue 来做任务缓冲,并且通过 executor.execute() 触发真正的执行。


七、性能对比与优化建议

场景 建议做法
小规模任务(< 100 个) 直接使用上述方案即可,无需复杂优化
中大规模任务(1k+) 加入日志追踪、监控埋点、熔断机制
高吞吐要求 使用更高效的队列(如 Disruptor)替代 Queue
分布式场景 结合 Redis 或 Kafka 实现跨节点协调调度

⚠️ 注意:不要盲目追求极致性能,而是要根据业务需求选择合适粒度。例如,在微服务中,通常只需要控制单机并发即可。


八、常见陷阱与避坑指南

误区 正确做法
认为直接用线程池就够了 必须配合信号量才能精确控制并发数
把任务放进线程池后不管了 必须跟踪任务状态,防止内存泄漏
忽略错误处理 每个任务都应该 try-catch 包裹,防止一个任务崩溃导致整个调度器瘫痪
使用全局锁保护队列 不要用 synchronized,应该用并发安全的数据结构(如 BlockingQueue)
不测试边界情况 测试 0 并发、负数、大量任务等情况是否能正常工作

九、总结:为什么这是高频考点?

这个问题之所以频繁出现,是因为它融合了以下几个高级知识点:

知识点 在本题中的体现
并发编程基础 Semaphore 控制并发数
异步编程模型 asyncio / CompletableFuture
数据结构应用 Queue 作为缓冲区
错误恢复机制 try-except / try-finally
设计模式 责任链 + 工厂 + 调度器模式

掌握这个题目,不仅能帮你拿下字节跳动、快手、滴滴等公司的 offer,更能让你在未来构建高性能服务时游刃有余。


十、课后练习(建议动手实践)

  1. ✅ 实现一个带优先级的任务调度器(比如紧急任务插队)
  2. ✅ 添加日志输出:记录每个任务的等待时间和执行时间
  3. ✅ 支持任务中断(用户主动取消某个任务)
  4. ✅ 将调度器封装成独立模块,支持配置文件读取最大并发数
  5. ✅ 模拟压力测试:提交 1000 个任务,观察是否有内存泄露或性能瓶颈

希望这篇讲解能帮你彻底理解并发限制调度器的设计原理和实现细节。记住:优秀的程序员不是只会写代码的人,而是懂得如何设计系统的人。这个调度器看似简单,实则背后藏着无数工程智慧。

祝你面试顺利,早日拿到心仪 offer!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注