C++ Thread Pool 实现:高效管理线程资源与任务调度

好的,各位观众老爷们,今天咱们就来聊聊C++里的“线程池”这个神奇的东西。别害怕,虽然听起来像个很专业的名词,但其实它就是一个能帮你管理线程,让你的程序跑得更快更流畅的小管家。想象一下,你开了一家餐厅,线程池就是你餐厅里的服务员团队,而你要处理的任务就是顾客的点单。没有线程池的时候,来一个顾客你就临时招一个服务员,顾客走了服务员就没事干了,是不是很浪费?有了线程池,你就可以提前雇好一批服务员,顾客来了直接让他们去服务,顾客走了他们还可以继续服务下一位,效率嗖嗖嗖就上去了!

一、 啥是线程池?为啥要用它?

线程池,顾名思义,就是一个装满了线程的池子。它是一个预先创建好的线程集合,可以用来执行并发任务。

为啥要用线程池?

  • 减少线程创建和销毁的开销: 线程的创建和销毁是很耗费资源的。如果每个任务都创建一个线程,任务结束后销毁线程,就会导致大量的资源浪费。线程池可以避免频繁的创建和销毁线程,提高程序的性能。就像你不用每次顾客来都重新培训一个服务员,而是直接用现成的。

  • 提高响应速度: 当有新任务到达时,线程池可以立即分配一个空闲线程来执行任务,无需等待线程的创建。这样可以大大缩短任务的响应时间。想象一下,顾客来了,马上就有服务员过去点单,是不是很爽?

  • 控制并发线程的数量: 线程池可以限制并发执行的线程数量,防止过多的线程占用系统资源,导致系统崩溃。就像你的餐厅座位有限,不能让所有顾客都进来,否则会乱套的。

  • 提高线程的可管理性: 线程池可以统一管理线程的生命周期,方便监控和调优。你可以随时查看服务员的工作状态,调整他们的工作安排。

二、 线程池的核心组件

一个基本的线程池通常包含以下几个核心组件:

  • 任务队列 (Task Queue): 用于存放待执行的任务。可以是一个 FIFO 队列,也可以是一个优先级队列。就像餐厅的点单本,记录了所有顾客的点单。

  • 线程管理器 (Thread Manager): 负责创建、销毁和管理线程。就像餐厅的经理,负责招聘、培训和服务员的管理。

  • 工作线程 (Worker Threads): 线程池中的线程,负责从任务队列中取出任务并执行。就像餐厅的服务员,负责点单、上菜。

  • 结果队列 (Result Queue,可选): 用于存放任务的执行结果。就像餐厅的厨房,负责将做好的菜品交给服务员。

三、 C++ 线程池的简单实现

下面我们来用 C++ 实现一个简单的线程池。为了让大家更容易理解,我们先写一个最基础的版本,然后再逐步完善。

#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>

class ThreadPool {
public:
    ThreadPool(size_t num_threads) : num_threads_(num_threads), stop_(false) {
        threads_.reserve(num_threads_);
        for (size_t i = 0; i < num_threads_; ++i) {
            threads_.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(queue_mutex_);
                        condition_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
                        if (stop_ && tasks_.empty())
                            return;
                        task = std::move(tasks_.front());
                        tasks_.pop();
                    }
                    task();
                }
            });
        }
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            stop_ = true;
        }
        condition_.notify_all();
        for (std::thread &thread : threads_) {
            thread.join();
        }
    }

    template<typename F, typename... Args>
    auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
        using return_type = typename std::result_of<F(Args...)>::type;

        auto task = std::make_shared<std::packaged_task<return_type()>>(
                std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            if (stop_)
                throw std::runtime_error("enqueue on stopped ThreadPool");

            tasks_.emplace([task]() { (*task)(); });
        }
        condition_.notify_one();
        return res;
    }

private:
    size_t num_threads_;
    std::vector<std::thread> threads_;
    std::queue<std::function<void()>> tasks_;
    std::mutex queue_mutex_;
    std::condition_variable condition_;
    bool stop_;
};

int main() {
    ThreadPool pool(4); // 创建一个包含4个线程的线程池

    std::vector<std::future<int>> results;

    for (int i = 0; i < 8; ++i) {
        results.emplace_back(
            pool.enqueue([i]{
                std::cout << "Task " << i << " running on thread " << std::this_thread::get_id() << std::endl;
                std::this_thread::sleep_for(std::chrono::seconds(1));
                return i * i;
            })
        );
    }

    for (auto &result : results)
        std::cout << result.get() << " ";
    std::cout << std::endl;

    return 0;
}

代码解释:

  1. ThreadPool 类: 这是线程池的核心类。

    • num_threads_: 线程池中线程的数量。
    • threads_: 存储线程对象的 std::vector
    • tasks_: 任务队列,存储待执行的函数对象。这里使用了 std::queue,先进先出。
    • queue_mutex_: 保护任务队列的互斥锁。
    • condition_: 条件变量,用于线程间的同步,当任务队列为空时,线程会进入等待状态。
    • stop_: 标志位,用于通知线程池停止工作。
  2. 构造函数 ThreadPool(size_t num_threads):

    • 初始化线程池,创建指定数量的线程。
    • 每个线程执行一个循环,不断从任务队列中取出任务并执行。
    • 使用了 std::unique_lockstd::condition_variable 来实现线程的同步和等待。
    • condition_.wait(lock, [this] { return stop_ || !tasks_.empty(); }); 这行代码是关键,它会让线程在任务队列为空时进入等待状态,直到有新的任务加入或者线程池被停止。
  3. 析构函数 ~ThreadPool():

    • 停止线程池,通知所有线程退出。
    • 使用 condition_.notify_all() 唤醒所有等待的线程。
    • 使用 thread.join() 等待所有线程执行完毕。
  4. enqueue(F&& f, Args&&... args) 函数:

    • 用于向任务队列中添加任务。
    • 使用了模板和完美转发,可以接受任意类型的函数对象和参数。
    • 使用 std::bind 将函数对象和参数绑定在一起,创建一个新的函数对象。
    • 使用 std::packaged_task 封装任务,可以获取任务的执行结果。
    • 将任务添加到任务队列中,并使用 condition_.notify_one() 唤醒一个等待的线程。
  5. main() 函数:

    • 创建一个包含 4 个线程的线程池。
    • 向线程池中添加 8 个任务。
    • 每个任务会打印一条消息,并休眠 1 秒。
    • 获取每个任务的执行结果并打印。

四、 线程池的改进和优化

上面的代码只是一个简单的线程池实现,还有很多可以改进和优化的地方。

  1. 异常处理:

    • 在线程执行任务时,可能会抛出异常。如果没有处理异常,可能会导致程序崩溃。
    • 可以使用 try...catch 块来捕获异常,并进行处理。
    • 可以将异常信息记录到日志中,或者将异常重新抛出。
  2. 任务优先级:

    • 可以为任务设置优先级,让优先级高的任务先执行。
    • 可以使用 std::priority_queue 来实现优先级队列。
    • 需要自定义比较函数,根据任务的优先级进行排序.
  3. 线程池大小的动态调整:

    • 可以根据任务的负载情况,动态调整线程池的大小。
    • 当任务队列过长时,可以增加线程池的大小。
    • 当任务队列很短时,可以减少线程池的大小。
    • 需要注意的是,动态调整线程池的大小可能会带来一些性能开销。
  4. 空闲线程的回收:

    • 当线程池中的线程长时间处于空闲状态时,可以将其回收,释放系统资源。
    • 可以使用定时器来检测空闲线程,并将其回收。
  5. 拒绝策略:

    • 当任务队列已满时,需要采取一定的拒绝策略,防止任务丢失。
    • 常见的拒绝策略有:
      • 丢弃任务: 直接丢弃新来的任务。
      • 抛出异常: 抛出一个异常,通知调用者任务无法执行。
      • 阻塞等待: 阻塞调用者,直到任务队列有空闲位置。
      • 调用者运行: 让调用者自己执行任务。

五、 一个更完善的线程池实现

下面是一个更完善的线程池实现,包含了异常处理、任务优先级和拒绝策略等功能。

#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
#include <algorithm>

enum class RejectPolicy {
    Discard,
    ThrowException,
    CallerRuns
};

class ThreadPool {
public:
    ThreadPool(size_t num_threads, RejectPolicy reject_policy = RejectPolicy::Discard)
        : num_threads_(num_threads), stop_(false), reject_policy_(reject_policy) {
        threads_.reserve(num_threads_);
        for (size_t i = 0; i < num_threads_; ++i) {
            threads_.emplace_back([this] {
                while (true) {
                    Task task;
                    {
                        std::unique_lock<std::mutex> lock(queue_mutex_);
                        condition_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
                        if (stop_ && tasks_.empty())
                            return;
                        task = std::move(tasks_.top());
                        tasks_.pop();
                    }
                    try {
                        task.func(); // 执行任务
                    } catch (const std::exception& e) {
                        std::cerr << "Exception in thread: " << e.what() << std::endl;
                    } catch (...) {
                        std::cerr << "Unknown exception in thread" << std::endl;
                    }
                }
            });
        }
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            stop_ = true;
        }
        condition_.notify_all();
        for (std::thread& thread : threads_) {
            thread.join();
        }
    }

    template<typename F, typename... Args>
    auto enqueue(F&& f, Args&&... args, int priority = 0)
        -> std::future<typename std::result_of<F(Args...)>::type> {
        using return_type = typename std::result_of<F(Args...)>::type;

        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

        std::future<return_type> res = task->get_future();

        Task t;
        t.priority = priority;
        t.func = [task]() { (*task)(); };

        {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            if (stop_) {
                throw std::runtime_error("enqueue on stopped ThreadPool");
            }

            if (tasks_.size() >= max_queue_size_ && reject_policy_ != RejectPolicy::CallerRuns) {
                switch (reject_policy_) {
                case RejectPolicy::Discard:
                    std::cerr << "Task discarded due to queue full." << std::endl;
                    return std::future<return_type>(); // 返回一个空的 future
                case RejectPolicy::ThrowException:
                    throw std::runtime_error("Task queue is full");
                default:
                    break;
                }
            }

            tasks_.push(t);
        }
        condition_.notify_one();

        if (tasks_.size() >= max_queue_size_ && reject_policy_ == RejectPolicy::CallerRuns) {
            (*task)(); // 在调用者线程中运行
        }
        return res;
    }

private:
    size_t num_threads_;
    std::vector<std::thread> threads_;

    struct Task {
        int priority;
        std::function<void()> func;

        bool operator<(const Task& other) const {
            return priority < other.priority; // 优先级高的先执行
        }
    };

    std::priority_queue<Task, std::vector<Task>> tasks_; // 优先级队列
    std::mutex queue_mutex_;
    std::condition_variable condition_;
    bool stop_;
    RejectPolicy reject_policy_;
    static const size_t max_queue_size_ = 10; // 最大任务队列大小
};

int main() {
    ThreadPool pool(4, RejectPolicy::ThrowException); // 创建一个包含4个线程的线程池, 拒绝策略为抛出异常

    std::vector<std::future<int>> results;

    try {
        for (int i = 0; i < 12; ++i) {
            results.emplace_back(
                pool.enqueue([i] {
                    std::cout << "Task " << i << " running on thread " << std::this_thread::get_id() << std::endl;
                    std::this_thread::sleep_for(std::chrono::seconds(1));
                    return i * i;
                }, i) // 优先级为 i
            );
        }
    } catch (const std::exception& e) {
        std::cerr << "Exception: " << e.what() << std::endl;
    }

    for (auto& result : results)
        if(result.valid()) // 检查 future 是否有效,避免获取空 future 的值
            std::cout << result.get() << " ";
    std::cout << std::endl;

    return 0;
}

代码解释:

  • RejectPolicy 枚举: 定义了拒绝策略的类型。
  • 构造函数 ThreadPool(size_t num_threads, RejectPolicy reject_policy): 接受一个 RejectPolicy 参数,用于设置拒绝策略。
  • enqueue(F&& f, Args&&... args, int priority) 函数: 接受一个 priority 参数,用于设置任务的优先级。
  • Task 结构体: 包含任务的优先级和函数对象。
  • tasks_: 使用 std::priority_queue 作为任务队列,实现优先级队列。
  • 在线程执行任务时,使用 try...catch 块捕获异常。
  • 当任务队列已满时,根据拒绝策略进行处理。

六、总结

线程池是一个非常有用的工具,可以帮助你提高程序的性能和可管理性。虽然实现一个线程池需要一定的技巧,但是只要理解了线程池的核心原理,就可以轻松地实现一个适合自己需求的线程池。

希望今天的讲解对大家有所帮助,下次再见!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注