C++ 基于 `std::mutex` 和 `std::condition_variable` 实现线程池

好的,让我们开始这场关于 C++ 线程池的“脱口秀”吧!

大家好!欢迎来到“线程池奇妙夜”!

今天我们要聊的是 C++ 中如何用 std::mutexstd::condition_variable 这对黄金搭档,打造一个高效、稳定的线程池。

开场白:为什么要线程池?

想象一下,你开了一家餐厅。每来一个顾客,你就临时雇佣一个厨师。顾客走了,厨师也走了。这效率高吗?当然不高!频繁地雇佣和解雇厨师(创建和销毁线程)会浪费大量时间和资源。

线程池就像你的餐厅里有一个固定的厨师团队。当有顾客来(任务)时,他们会立即开始工作,而不是从头开始准备。这样可以显著提高效率,尤其是在任务量大且任务执行时间短的情况下。

第一幕:基础概念回顾

在深入代码之前,让我们快速回顾一下 std::mutexstd::condition_variable 这两位主角:

  • std::mutex (互斥锁): 就像餐厅厨房的门锁。同一时间只能允许一个厨师进入厨房操作(访问共享资源)。它可以防止多个线程同时访问共享数据,避免数据竞争。
  • std::condition_variable (条件变量): 就像餐厅里的服务员。当厨房里没有顾客点餐时,厨师可以休息(等待)。当有新的顾客点餐时,服务员会通知厨师开始工作。条件变量允许线程在特定条件下等待,并在条件满足时被唤醒。

第二幕:线程池的骨架

我们先来搭建线程池的基本框架:

#include <iostream>
#include <vector>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional> // std::function
#include <future> // std::packaged_task

class ThreadPool {
public:
    ThreadPool(size_t numThreads);
    ~ThreadPool();

    template<typename F, typename... Args>
    std::future<typename std::result_of<F(Args...)>::type> enqueue(F&& f, Args&&... args);

private:
    std::vector<std::thread> workers;
    std::queue<std::packaged_task<typename std::result_of<F(Args...)>::type()>> tasks;
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

让我们逐行解读:

  • workers: 一个 std::thread 类型的 vector,用于存储线程池中的所有线程。
  • tasks: 一个 std::packaged_task 类型的 queue,用于存储待执行的任务。std::packaged_task 可以将函数和参数打包成一个可调用的对象,并且可以获取函数的返回值。
  • queue_mutex: 一个 std::mutex,用于保护任务队列 tasks 的访问。
  • condition: 一个 std::condition_variable,用于线程间的通信。当任务队列为空时,线程会等待在这个条件变量上。当有新的任务加入队列时,会唤醒一个等待的线程。
  • stop: 一个 bool 类型的变量,用于控制线程池的停止。

第三幕:构造函数和析构函数

接下来,我们实现构造函数和析构函数:

ThreadPool::ThreadPool(size_t numThreads) : stop(false) {
    for (size_t i = 0; i < numThreads; ++i) {
        workers.emplace_back([this] {
            while (true) {
                std::packaged_task<typename std::result_of<void()>::type> task;

                {
                    std::unique_lock<std::mutex> lock(queue_mutex);
                    condition.wait(lock, [this] { return stop || !tasks.empty(); });
                    if (stop && tasks.empty())
                        return;
                    task = std::move(tasks.front());
                    tasks.pop();
                }

                task();
            }
        });
    }
}

ThreadPool::~ThreadPool() {
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for (std::thread &worker : workers)
        worker.join();
}
  • 构造函数: 构造函数会创建指定数量的线程,并将它们添加到 workers 向量中。每个线程都运行一个循环,不断从任务队列中获取任务并执行。

    • std::unique_lock<std::mutex> lock(queue_mutex);: 创建一个 unique_lock 来锁定互斥锁 queue_mutexunique_lock 在离开作用域时会自动释放锁,防止死锁。
    • condition.wait(lock, [this] { return stop || !tasks.empty(); });: 线程会等待在条件变量 condition 上,直到满足以下两个条件之一:
      • stoptrue (线程池被停止)。
      • tasks 不为空 (任务队列中有新的任务)。
    • if (stop && tasks.empty()) return;: 如果线程池被停止,并且任务队列为空,线程会退出循环。
    • task = std::move(tasks.front()); tasks.pop();: 从任务队列中取出一个任务,并将其从队列中移除。 std::move 用于将任务的所有权转移给 task 变量,避免不必要的拷贝。
    • task();: 执行任务。
  • 析构函数: 析构函数负责停止线程池中的所有线程,并等待它们完成。

    • stop = true;: 设置 stop 标志为 true,通知所有线程停止工作。
    • condition.notify_all();: 唤醒所有等待在条件变量 condition 上的线程。
    • worker.join();: 等待每个线程完成。

第四幕:添加任务

接下来,我们实现 enqueue 方法,用于向线程池中添加任务:

template<typename F, typename... Args>
std::future<typename std::result_of<F(Args...)>::type> ThreadPool::enqueue(F&& f, Args&&... args) {
    using return_type = typename std::result_of<F(Args...)>::type;

    std::packaged_task<return_type()> task(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    std::future<return_type> res = task.get_future();

    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        if (stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");
        tasks.emplace(std::move(task));
    }
    condition.notify_one();
    return res;
}

让我们再次逐行解读:

  • using return_type = typename std::result_of<F(Args...)>::type;: 使用 std::result_of 获取函数 f 的返回值类型。
  • std::packaged_task<return_type()> task(std::bind(std::forward<F>(f), std::forward<Args>(args)...));: 创建一个 std::packaged_task 对象,将函数 f 和参数 args 打包成一个可调用的对象。 std::bind 用于将函数 f 和参数 args 绑定在一起。 std::forward 用于完美转发参数,保留参数的原始类型 (左值或右值)。
  • std::future<return_type> res = task.get_future();: 获取 std::future 对象,用于获取函数的返回值。
  • if (stop) throw std::runtime_error("enqueue on stopped ThreadPool");: 如果线程池已经被停止,则抛出一个异常。
  • tasks.emplace(std::move(task));: 将任务添加到任务队列中。
  • condition.notify_one();: 唤醒一个等待在条件变量 condition 上的线程。
  • return res;: 返回 std::future 对象,用于获取函数的返回值。

第五幕:完整代码示例

#include <iostream>
#include <vector>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional> // std::function
#include <future> // std::packaged_task
#include <chrono>
#include <random>

class ThreadPool {
public:
    ThreadPool(size_t numThreads);
    ~ThreadPool();

    template<typename F, typename... Args>
    std::future<typename std::result_of<F(Args...)>::type> enqueue(F&& f, Args&&... args);

private:
    std::vector<std::thread> workers;
    std::queue<std::packaged_task<typename std::result_of<void()>::type()>> tasks;
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

ThreadPool::ThreadPool(size_t numThreads) : stop(false) {
    for (size_t i = 0; i < numThreads; ++i) {
        workers.emplace_back([this] {
            while (true) {
                std::packaged_task<void()> task;

                {
                    std::unique_lock<std::mutex> lock(queue_mutex);
                    condition.wait(lock, [this] { return stop || !tasks.empty(); });
                    if (stop && tasks.empty())
                        return;
                    task = std::move(tasks.front());
                    tasks.pop();
                }

                task();
            }
        });
    }
}

ThreadPool::~ThreadPool() {
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for (std::thread &worker : workers)
        worker.join();
}

template<typename F, typename... Args>
std::future<typename std::result_of<F(Args...)>::type> ThreadPool::enqueue(F&& f, Args&&... args) {
    using return_type = typename std::result_of<F(Args...)>::type;

    std::packaged_task<return_type()> task(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    std::future<return_type> res = task.get_future();

    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        if (stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");
        tasks.emplace(std::move(task));
    }
    condition.notify_one();
    return res;
}

int main() {
    ThreadPool pool(4);

    std::vector< std::future<int> > results;

    for (int i = 0; i < 8; ++i) {
        results.emplace_back(
            pool.enqueue([i] {
                std::cout << "hello " << i << std::endl;
                std::random_device rd;
                std::mt19937 gen(rd());
                std::uniform_int_distribution<> distrib(1, 5);
                std::this_thread::sleep_for(std::chrono::seconds(distrib(gen)));
                std::cout << "world " << i << std::endl;
                return i*i;
            })
        );
    }

    for (auto && result : results)
        std::cout << result.get() << ' ';
    std::cout << std::endl;

    return 0;
}

第六幕:代码解释和一些小技巧

  1. std::packaged_task 的妙用: std::packaged_task 就像一个任务包裹,它可以把一个函数和它的参数打包起来,同时还能返回一个 std::future 对象,让你可以在稍后的某个时刻获取函数的返回值。

  2. std::bind 的威力: std::bind 可以将函数和参数绑定在一起,创建一个可调用对象。这在线程池中非常有用,因为我们需要将任务添加到队列中,但并不立即执行它们。

  3. std::forward 的完美转发: std::forward 可以完美转发参数,保留参数的原始类型 (左值或右值)。这可以避免不必要的拷贝,提高效率。

  4. 避免死锁: 在多线程编程中,死锁是一个常见的问题。为了避免死锁,我们需要小心地管理锁的获取和释放。std::unique_lock 在离开作用域时会自动释放锁,可以帮助我们避免死锁。

  5. 异常处理: 在多线程环境中,异常处理尤为重要。我们需要确保即使在任务执行过程中发生异常,线程池也能正常工作。

第七幕:性能考量

线程池的大小对性能有很大影响。如果线程池太小,任务可能会排队等待,导致响应时间变长。如果线程池太大,会消耗过多的系统资源,导致性能下降。

一般来说,线程池的大小应该设置为 CPU 核心数的 1 到 2 倍。但是,最佳的线程池大小取决于具体的应用场景。

第八幕:优化建议

  • 任务窃取: 当一个线程的任务队列为空时,它可以从其他线程的任务队列中“窃取”任务来执行。这可以提高线程池的利用率,减少任务的等待时间。

  • 优先级队列: 可以使用优先级队列来存储任务,优先执行优先级较高的任务。

  • 动态调整线程池大小: 可以根据任务的负载情况,动态调整线程池的大小。

第九幕:总结

我们今天一起探索了如何使用 std::mutexstd::condition_variable 构建一个功能完备的 C++ 线程池。我们了解了线程池的基本原理、实现细节以及一些优化技巧。

希望今天的“线程池奇妙夜”能帮助你更好地理解和应用线程池技术。记住,线程池就像一个高效的餐厅厨房,可以帮助你更快、更好地处理任务!

感谢大家的收看!我们下期再见!

表格总结

组件 作用
std::mutex 保护共享资源,防止多个线程同时访问。
std::condition_variable 允许线程在特定条件下等待,并在条件满足时被唤醒,实现线程间的同步。
std::thread 代表一个执行线程。
std::queue 存储待执行的任务。
std::packaged_task 将函数和参数打包成一个可调用的对象,并提供获取函数返回值的能力。
std::future 用于获取异步操作的结果。
std::bind 将函数和参数绑定在一起,创建一个可调用对象。
std::forward 完美转发参数,保留参数的原始类型 (左值或右值)。

一些补充说明:

  • 异常安全: 确保你的代码在抛出异常时仍然能够正确地释放资源 (例如,锁)。 使用 RAII (Resource Acquisition Is Initialization) 技术,例如 std::unique_lock,可以帮助你管理资源,确保它们在异常发生时被正确地释放。

  • 避免忙等待: 不要使用循环检查条件来代替条件变量。 忙等待会消耗大量的 CPU 资源。

  • 测试: 编写单元测试来验证你的线程池是否正确工作。 测试应该包括并发测试,以确保你的线程池能够正确地处理并发任务。

希望这些补充说明能帮助你更好地理解和应用线程池技术。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注