好的,让我们开始这场关于 C++ 线程池的“脱口秀”吧!
大家好!欢迎来到“线程池奇妙夜”!
今天我们要聊的是 C++ 中如何用 std::mutex
和 std::condition_variable
这对黄金搭档,打造一个高效、稳定的线程池。
开场白:为什么要线程池?
想象一下,你开了一家餐厅。每来一个顾客,你就临时雇佣一个厨师。顾客走了,厨师也走了。这效率高吗?当然不高!频繁地雇佣和解雇厨师(创建和销毁线程)会浪费大量时间和资源。
线程池就像你的餐厅里有一个固定的厨师团队。当有顾客来(任务)时,他们会立即开始工作,而不是从头开始准备。这样可以显著提高效率,尤其是在任务量大且任务执行时间短的情况下。
第一幕:基础概念回顾
在深入代码之前,让我们快速回顾一下 std::mutex
和 std::condition_variable
这两位主角:
std::mutex
(互斥锁): 就像餐厅厨房的门锁。同一时间只能允许一个厨师进入厨房操作(访问共享资源)。它可以防止多个线程同时访问共享数据,避免数据竞争。std::condition_variable
(条件变量): 就像餐厅里的服务员。当厨房里没有顾客点餐时,厨师可以休息(等待)。当有新的顾客点餐时,服务员会通知厨师开始工作。条件变量允许线程在特定条件下等待,并在条件满足时被唤醒。
第二幕:线程池的骨架
我们先来搭建线程池的基本框架:
#include <iostream>
#include <vector>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional> // std::function
#include <future> // std::packaged_task
class ThreadPool {
public:
ThreadPool(size_t numThreads);
~ThreadPool();
template<typename F, typename... Args>
std::future<typename std::result_of<F(Args...)>::type> enqueue(F&& f, Args&&... args);
private:
std::vector<std::thread> workers;
std::queue<std::packaged_task<typename std::result_of<F(Args...)>::type()>> tasks;
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
让我们逐行解读:
workers
: 一个std::thread
类型的vector
,用于存储线程池中的所有线程。tasks
: 一个std::packaged_task
类型的queue
,用于存储待执行的任务。std::packaged_task
可以将函数和参数打包成一个可调用的对象,并且可以获取函数的返回值。queue_mutex
: 一个std::mutex
,用于保护任务队列tasks
的访问。condition
: 一个std::condition_variable
,用于线程间的通信。当任务队列为空时,线程会等待在这个条件变量上。当有新的任务加入队列时,会唤醒一个等待的线程。stop
: 一个bool
类型的变量,用于控制线程池的停止。
第三幕:构造函数和析构函数
接下来,我们实现构造函数和析构函数:
ThreadPool::ThreadPool(size_t numThreads) : stop(false) {
for (size_t i = 0; i < numThreads; ++i) {
workers.emplace_back([this] {
while (true) {
std::packaged_task<typename std::result_of<void()>::type> task;
{
std::unique_lock<std::mutex> lock(queue_mutex);
condition.wait(lock, [this] { return stop || !tasks.empty(); });
if (stop && tasks.empty())
return;
task = std::move(tasks.front());
tasks.pop();
}
task();
}
});
}
}
ThreadPool::~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for (std::thread &worker : workers)
worker.join();
}
-
构造函数: 构造函数会创建指定数量的线程,并将它们添加到
workers
向量中。每个线程都运行一个循环,不断从任务队列中获取任务并执行。std::unique_lock<std::mutex> lock(queue_mutex);
: 创建一个unique_lock
来锁定互斥锁queue_mutex
。unique_lock
在离开作用域时会自动释放锁,防止死锁。condition.wait(lock, [this] { return stop || !tasks.empty(); });
: 线程会等待在条件变量condition
上,直到满足以下两个条件之一:stop
为true
(线程池被停止)。tasks
不为空 (任务队列中有新的任务)。
if (stop && tasks.empty()) return;
: 如果线程池被停止,并且任务队列为空,线程会退出循环。task = std::move(tasks.front()); tasks.pop();
: 从任务队列中取出一个任务,并将其从队列中移除。std::move
用于将任务的所有权转移给task
变量,避免不必要的拷贝。task();
: 执行任务。
-
析构函数: 析构函数负责停止线程池中的所有线程,并等待它们完成。
stop = true;
: 设置stop
标志为true
,通知所有线程停止工作。condition.notify_all();
: 唤醒所有等待在条件变量condition
上的线程。worker.join();
: 等待每个线程完成。
第四幕:添加任务
接下来,我们实现 enqueue
方法,用于向线程池中添加任务:
template<typename F, typename... Args>
std::future<typename std::result_of<F(Args...)>::type> ThreadPool::enqueue(F&& f, Args&&... args) {
using return_type = typename std::result_of<F(Args...)>::type;
std::packaged_task<return_type()> task(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<return_type> res = task.get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
if (stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace(std::move(task));
}
condition.notify_one();
return res;
}
让我们再次逐行解读:
using return_type = typename std::result_of<F(Args...)>::type;
: 使用std::result_of
获取函数f
的返回值类型。std::packaged_task<return_type()> task(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
: 创建一个std::packaged_task
对象,将函数f
和参数args
打包成一个可调用的对象。std::bind
用于将函数f
和参数args
绑定在一起。std::forward
用于完美转发参数,保留参数的原始类型 (左值或右值)。std::future<return_type> res = task.get_future();
: 获取std::future
对象,用于获取函数的返回值。if (stop) throw std::runtime_error("enqueue on stopped ThreadPool");
: 如果线程池已经被停止,则抛出一个异常。tasks.emplace(std::move(task));
: 将任务添加到任务队列中。condition.notify_one();
: 唤醒一个等待在条件变量condition
上的线程。return res;
: 返回std::future
对象,用于获取函数的返回值。
第五幕:完整代码示例
#include <iostream>
#include <vector>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional> // std::function
#include <future> // std::packaged_task
#include <chrono>
#include <random>
class ThreadPool {
public:
ThreadPool(size_t numThreads);
~ThreadPool();
template<typename F, typename... Args>
std::future<typename std::result_of<F(Args...)>::type> enqueue(F&& f, Args&&... args);
private:
std::vector<std::thread> workers;
std::queue<std::packaged_task<typename std::result_of<void()>::type()>> tasks;
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
ThreadPool::ThreadPool(size_t numThreads) : stop(false) {
for (size_t i = 0; i < numThreads; ++i) {
workers.emplace_back([this] {
while (true) {
std::packaged_task<void()> task;
{
std::unique_lock<std::mutex> lock(queue_mutex);
condition.wait(lock, [this] { return stop || !tasks.empty(); });
if (stop && tasks.empty())
return;
task = std::move(tasks.front());
tasks.pop();
}
task();
}
});
}
}
ThreadPool::~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for (std::thread &worker : workers)
worker.join();
}
template<typename F, typename... Args>
std::future<typename std::result_of<F(Args...)>::type> ThreadPool::enqueue(F&& f, Args&&... args) {
using return_type = typename std::result_of<F(Args...)>::type;
std::packaged_task<return_type()> task(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<return_type> res = task.get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
if (stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace(std::move(task));
}
condition.notify_one();
return res;
}
int main() {
ThreadPool pool(4);
std::vector< std::future<int> > results;
for (int i = 0; i < 8; ++i) {
results.emplace_back(
pool.enqueue([i] {
std::cout << "hello " << i << std::endl;
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> distrib(1, 5);
std::this_thread::sleep_for(std::chrono::seconds(distrib(gen)));
std::cout << "world " << i << std::endl;
return i*i;
})
);
}
for (auto && result : results)
std::cout << result.get() << ' ';
std::cout << std::endl;
return 0;
}
第六幕:代码解释和一些小技巧
-
std::packaged_task
的妙用:std::packaged_task
就像一个任务包裹,它可以把一个函数和它的参数打包起来,同时还能返回一个std::future
对象,让你可以在稍后的某个时刻获取函数的返回值。 -
std::bind
的威力:std::bind
可以将函数和参数绑定在一起,创建一个可调用对象。这在线程池中非常有用,因为我们需要将任务添加到队列中,但并不立即执行它们。 -
std::forward
的完美转发:std::forward
可以完美转发参数,保留参数的原始类型 (左值或右值)。这可以避免不必要的拷贝,提高效率。 -
避免死锁: 在多线程编程中,死锁是一个常见的问题。为了避免死锁,我们需要小心地管理锁的获取和释放。
std::unique_lock
在离开作用域时会自动释放锁,可以帮助我们避免死锁。 -
异常处理: 在多线程环境中,异常处理尤为重要。我们需要确保即使在任务执行过程中发生异常,线程池也能正常工作。
第七幕:性能考量
线程池的大小对性能有很大影响。如果线程池太小,任务可能会排队等待,导致响应时间变长。如果线程池太大,会消耗过多的系统资源,导致性能下降。
一般来说,线程池的大小应该设置为 CPU 核心数的 1 到 2 倍。但是,最佳的线程池大小取决于具体的应用场景。
第八幕:优化建议
-
任务窃取: 当一个线程的任务队列为空时,它可以从其他线程的任务队列中“窃取”任务来执行。这可以提高线程池的利用率,减少任务的等待时间。
-
优先级队列: 可以使用优先级队列来存储任务,优先执行优先级较高的任务。
-
动态调整线程池大小: 可以根据任务的负载情况,动态调整线程池的大小。
第九幕:总结
我们今天一起探索了如何使用 std::mutex
和 std::condition_variable
构建一个功能完备的 C++ 线程池。我们了解了线程池的基本原理、实现细节以及一些优化技巧。
希望今天的“线程池奇妙夜”能帮助你更好地理解和应用线程池技术。记住,线程池就像一个高效的餐厅厨房,可以帮助你更快、更好地处理任务!
感谢大家的收看!我们下期再见!
表格总结
组件 | 作用 |
---|---|
std::mutex |
保护共享资源,防止多个线程同时访问。 |
std::condition_variable |
允许线程在特定条件下等待,并在条件满足时被唤醒,实现线程间的同步。 |
std::thread |
代表一个执行线程。 |
std::queue |
存储待执行的任务。 |
std::packaged_task |
将函数和参数打包成一个可调用的对象,并提供获取函数返回值的能力。 |
std::future |
用于获取异步操作的结果。 |
std::bind |
将函数和参数绑定在一起,创建一个可调用对象。 |
std::forward |
完美转发参数,保留参数的原始类型 (左值或右值)。 |
一些补充说明:
-
异常安全: 确保你的代码在抛出异常时仍然能够正确地释放资源 (例如,锁)。 使用 RAII (Resource Acquisition Is Initialization) 技术,例如
std::unique_lock
,可以帮助你管理资源,确保它们在异常发生时被正确地释放。 -
避免忙等待: 不要使用循环检查条件来代替条件变量。 忙等待会消耗大量的 CPU 资源。
-
测试: 编写单元测试来验证你的线程池是否正确工作。 测试应该包括并发测试,以确保你的线程池能够正确地处理并发任务。
希望这些补充说明能帮助你更好地理解和应用线程池技术。