C++ 线程池设计模式:固定大小、动态大小与任务队列

各位观众老爷们,大家好!欢迎来到今天的C++线程池“脱口秀”!今天咱们要聊聊C++线程池的那些事儿,保证让大家听得明白,看得有趣,用得顺手。

咱们今天的主题是:C++线程池设计模式:固定大小、动态大小与任务队列。

线程池是个啥?为啥要用它?

想象一下,你开了一家小餐馆,来一个客人就临时雇一个厨师,客人走了厨师也走了。要是客人不多还好,客人多了,你雇厨师的速度赶不上客人点的速度,厨房就得瘫痪。而且,频繁的雇佣和解雇厨师也很费劲,对吧?

线程池就像一个“厨师中介”,你提前雇好一批厨师(线程),让他们随时待命。客人(任务)来了,直接分配给空闲的厨师做,做完后厨师继续待命,等待下一个任务。这样就避免了频繁创建和销毁线程的开销,提高了效率,稳定了性能。

线程池的核心组件

一个线程池,至少得有这几个核心组件:

  • 线程管理器(ThreadPool): 负责线程的创建、销毁、分配任务等核心管理工作。
  • 工作线程(WorkerThread): 真正干活的线程,从任务队列中取出任务并执行。
  • 任务队列(TaskQueue): 存放待执行任务的队列,相当于“订单列表”。
  • 任务(Task): 需要执行的具体工作,相当于“菜谱”。

固定大小线程池:简单粗暴,胜在稳定

固定大小线程池,顾名思义,就是线程的数量一开始就确定了,并且在整个生命周期内都不会改变。 这种线程池实现起来比较简单,适合任务量相对稳定、对响应时间要求不高的场景。

代码示例:固定大小线程池

#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional> // std::function

class ThreadPool {
public:
    ThreadPool(size_t numThreads) : stop(false) {
        threads.resize(numThreads);
        for (size_t i = 0; i < numThreads; ++i) {
            threads[i] = std::thread([this]() {
                while (true) {
                    std::function<void()> task;

                    {
                        std::unique_lock<std::mutex> lock(queueMutex);
                        condition.wait(lock, [this]() { return stop || !tasks.empty(); });

                        if (stop && tasks.empty()) {
                            return;
                        }

                        task = tasks.front();
                        tasks.pop();
                    }

                    task();
                }
            });
        }
    }

    template<typename F>
    void enqueue(F f) {
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            tasks.emplace(f);
        }
        condition.notify_one();
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread &thread : threads) {
            thread.join();
        }
    }

private:
    std::vector<std::thread> threads;
    std::queue<std::function<void()>> tasks;
    std::mutex queueMutex;
    std::condition_variable condition;
    bool stop;
};

int main() {
    ThreadPool pool(4); // 创建一个包含 4 个线程的线程池

    for (int i = 0; i < 8; ++i) {
        pool.enqueue([i]() {
            std::cout << "Task " << i << " is running in thread " << std::this_thread::get_id() << std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟耗时任务
        });
    }

    std::this_thread::sleep_for(std::chrono::seconds(3)); // 等待任务完成
    std::cout << "All tasks submitted." << std::endl;

    return 0;
}

代码解读:

  • ThreadPool(size_t numThreads): 构造函数,初始化线程池的大小,创建指定数量的线程,并让它们开始等待任务。
  • enqueue(F f): 将任务 f 加入任务队列,并通知一个等待的线程。 这里的 F 可以是任何可调用对象 (函数、lambda 表达式、函数对象)。
  • ~ThreadPool(): 析构函数,设置 stop 标志,通知所有线程退出,并等待所有线程结束。
  • tasks: 任务队列,使用 std::queue 存储待执行的任务。
  • queueMutex: 互斥锁,用于保护任务队列的线程安全。
  • condition: 条件变量,用于线程间的同步,当任务队列为空时,线程进入等待状态,当有新任务加入时,线程被唤醒。
  • stop: 一个标志,用于通知线程池停止工作。

优点:

  • 实现简单,易于理解。
  • 线程数量固定,资源占用可控。

缺点:

  • 线程数量固定,无法根据任务量动态调整,可能造成资源浪费或任务积压。
  • 如果任务量突然增大,可能会导致响应时间变长。

动态大小线程池:灵活应变,适应性强

动态大小线程池,允许线程的数量根据任务量动态调整。 当任务量增加时,线程池可以创建新的线程来处理任务;当任务量减少时,线程池可以销毁空闲的线程,从而节省资源。 这种线程池适合任务量波动较大、对响应时间要求较高的场景。

设计思路:

  1. 设定线程池的最大和最小线程数: 限制线程池的规模,防止无限增长或过度收缩。
  2. 监控任务队列的长度: 根据任务队列的长度来判断是否需要创建或销毁线程。
  3. 设置线程的空闲时间: 如果一个线程在一段时间内没有执行任何任务,则认为该线程是空闲的,可以销毁。

代码示例:动态大小线程池

#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <chrono>

class DynamicThreadPool {
public:
    DynamicThreadPool(size_t minThreads, size_t maxThreads, std::chrono::seconds idleTime)
        : minThreads_(minThreads), maxThreads_(maxThreads), idleTime_(idleTime), stop(false), currentThreads_(0) {
        // 初始化最小数量的线程
        for (size_t i = 0; i < minThreads_; ++i) {
            createWorkerThread();
        }
    }

    template<typename F>
    void enqueue(F f) {
        {
            std::unique_lock<std::mutex> lock(queueMutex_);
            tasks_.emplace(f);
        }
        condition_.notify_one();

        // 尝试创建新的线程,如果任务队列过长
        if (tasks_.size() > (currentThreads_ * 2) && currentThreads_ < maxThreads_) {
            createWorkerThread();
        }
    }

    ~DynamicThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queueMutex_);
            stop = true;
        }
        condition_.notify_all();
        for (std::thread &thread : threads_) {
            thread.join();
        }
    }

private:
    void createWorkerThread() {
        if (currentThreads_ >= maxThreads_) return;

        threads_.emplace_back(std::thread([this]() {
            while (true) {
                std::function<void()> task;
                std::unique_lock<std::mutex> lock(queueMutex_);

                // 使用带超时的 wait
                if (condition_.wait_for(lock, idleTime_, [this]() { return stop || !tasks_.empty(); })) {
                    if (stop && tasks_.empty()) {
                        break; // 退出线程
                    }

                    task = tasks_.front();
                    tasks_.pop();
                    lock.unlock(); // 释放锁,允许其他线程访问任务队列

                    task(); // 执行任务
                } else {
                    // 超时,线程空闲时间过长
                    if (currentThreads_ > minThreads_) {
                        std::cout << "Thread " << std::this_thread::get_id() << " is idle, exiting." << std::endl;
                        break; // 退出线程
                    }
                }
            }

            // 线程退出时,减少线程计数器
            {
                std::unique_lock<std::mutex> lock(queueMutex_);
                --currentThreads_;
            }
        }));

        ++currentThreads_;
        std::cout << "Thread created, current thread count: " << currentThreads_ << std::endl;
    }

    std::vector<std::thread> threads_;
    std::queue<std::function<void()>> tasks_;
    std::mutex queueMutex_;
    std::condition_variable condition_;
    size_t minThreads_;
    size_t maxThreads_;
    std::chrono::seconds idleTime_;
    bool stop;
    size_t currentThreads_; // 当前线程数量
};

int main() {
    DynamicThreadPool pool(2, 5, std::chrono::seconds(2)); // 最小 2 个线程,最大 5 个线程,空闲 2 秒

    for (int i = 0; i < 10; ++i) {
        pool.enqueue([i]() {
            std::cout << "Task " << i << " is running in thread " << std::this_thread::get_id() << std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟耗时任务
        });
    }

    std::this_thread::sleep_for(std::chrono::seconds(5)); // 等待任务完成
    std::cout << "All tasks submitted." << std::endl;

    return 0;
}

代码解读:

  • DynamicThreadPool(size_t minThreads, size_t maxThreads, std::chrono::seconds idleTime): 构造函数,初始化最小线程数、最大线程数和空闲时间。
  • createWorkerThread(): 创建新的工作线程,并增加线程计数器。
  • condition_.wait_for(lock, idleTime_, ...): 使用带超时的 wait_for 函数,如果线程在指定时间内没有收到任务,则认为该线程是空闲的,可以退出。
  • currentThreads_: 记录当前线程池中的线程数量。
  • *`if (tasks.size() > (currentThreads 2) && currentThreads < maxThreads)`:** 这是一个简单的策略,当任务队列的长度超过当前线程数的两倍时,并且当前线程数小于最大线程数,则创建新的线程。 你可以根据实际情况调整这个策略。

优点:

  • 能够根据任务量动态调整线程数量,更好地利用资源。
  • 能够应对任务量波动较大的场景,保持较好的响应时间。

缺点:

  • 实现相对复杂,需要考虑线程的创建和销毁策略。
  • 线程数量的动态调整可能会带来一定的开销。

任务队列:先进先出,保障公平

任务队列是线程池中存放待执行任务的容器。 它负责将任务按照一定的顺序传递给工作线程。 最常用的任务队列是 FIFO (First-In, First-Out) 队列,也就是先进先出队列。 这样可以保证任务按照提交的顺序执行,避免某些任务被饿死。

选择合适的任务队列

特性 std::queue std::priority_queue
顺序 FIFO (先进先出) 基于优先级排序
适用场景 需要按照提交顺序执行任务的场景 需要优先处理某些任务的场景,例如紧急任务
线程安全 需要手动加锁保护 需要手动加锁保护
复杂性 简单 相对复杂

扩展:优先级队列

除了 FIFO 队列,还可以使用优先级队列来存放任务。 优先级队列允许为每个任务设置一个优先级,线程池会优先执行优先级高的任务。 这种方式适用于需要优先处理某些任务的场景,例如紧急任务。

代码示例:使用优先级队列

#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>

class PriorityTask {
public:
    int priority;
    std::function<void()> task;

    PriorityTask(int priority, std::function<void()> task) : priority(priority), task(task) {}

    // 重载小于运算符,用于优先级队列排序
    bool operator>(const PriorityTask& other) const {
        return priority > other.priority; // 优先级数值越小,优先级越高
    }
};

class PriorityThreadPool {
public:
    PriorityThreadPool(size_t numThreads) : stop(false) {
        threads.resize(numThreads);
        for (size_t i = 0; i < numThreads; ++i) {
            threads[i] = std::thread([this]() {
                while (true) {
                    PriorityTask task;

                    {
                        std::unique_lock<std::mutex> lock(queueMutex);
                        condition.wait(lock, [this]() { return stop || !tasks.empty(); });

                        if (stop && tasks.empty()) {
                            return;
                        }

                        task = tasks.top();
                        tasks.pop();
                    }

                    task.task();
                }
            });
        }
    }

    template<typename F>
    void enqueue(int priority, F f) {
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            tasks.emplace(priority, f);
        }
        condition.notify_one();
    }

    ~PriorityThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread &thread : threads) {
            thread.join();
        }
    }

private:
    std::vector<std::thread> threads;
    std::priority_queue<PriorityTask, std::vector<PriorityTask>, std::greater<PriorityTask>> tasks; // 优先级队列
    std::mutex queueMutex;
    std::condition_variable condition;
    bool stop;
};

int main() {
    PriorityThreadPool pool(4);

    pool.enqueue(2, []() { // 优先级为 2
        std::cout << "Task with priority 2 is running in thread " << std::this_thread::get_id() << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(1));
    });

    pool.enqueue(1, []() { // 优先级为 1
        std::cout << "Task with priority 1 is running in thread " << std::this_thread::get_id() << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(1));
    });

    pool.enqueue(3, []() { // 优先级为 3
        std::cout << "Task with priority 3 is running in thread " << std::this_thread::get_id() << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(1));
    });

    std::this_thread::sleep_for(std::chrono::seconds(3));
    std::cout << "All tasks submitted." << std::endl;

    return 0;
}

代码解读:

  • PriorityTask: 定义一个包含优先级和任务的类,并重载了 operator> 运算符,用于优先级队列的排序。 注意,这里使用 std::greater,使得优先级数值越小的任务优先级越高。
  • std::priority_queue<PriorityTask, std::vector<PriorityTask>, std::greater<PriorityTask>> tasks;: 声明一个优先级队列,用于存放 PriorityTask 对象。

总结:

线程池的设计模式有很多种,选择哪种方式取决于具体的应用场景。

  • 固定大小线程池: 简单稳定,适合任务量相对稳定的场景。
  • 动态大小线程池: 灵活应变,适合任务量波动较大的场景。
  • 任务队列: 可以选择 FIFO 队列或优先级队列,根据需要选择合适的策略。

希望今天的“脱口秀”能帮助大家更好地理解和使用C++线程池。 记住,没有最好的线程池,只有最适合你的线程池! 谢谢大家!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注