各位观众老爷们,大家好!欢迎来到今天的C++线程池“脱口秀”!今天咱们要聊聊C++线程池的那些事儿,保证让大家听得明白,看得有趣,用得顺手。
咱们今天的主题是:C++线程池设计模式:固定大小、动态大小与任务队列。
线程池是个啥?为啥要用它?
想象一下,你开了一家小餐馆,来一个客人就临时雇一个厨师,客人走了厨师也走了。要是客人不多还好,客人多了,你雇厨师的速度赶不上客人点的速度,厨房就得瘫痪。而且,频繁的雇佣和解雇厨师也很费劲,对吧?
线程池就像一个“厨师中介”,你提前雇好一批厨师(线程),让他们随时待命。客人(任务)来了,直接分配给空闲的厨师做,做完后厨师继续待命,等待下一个任务。这样就避免了频繁创建和销毁线程的开销,提高了效率,稳定了性能。
线程池的核心组件
一个线程池,至少得有这几个核心组件:
- 线程管理器(ThreadPool): 负责线程的创建、销毁、分配任务等核心管理工作。
- 工作线程(WorkerThread): 真正干活的线程,从任务队列中取出任务并执行。
- 任务队列(TaskQueue): 存放待执行任务的队列,相当于“订单列表”。
- 任务(Task): 需要执行的具体工作,相当于“菜谱”。
固定大小线程池:简单粗暴,胜在稳定
固定大小线程池,顾名思义,就是线程的数量一开始就确定了,并且在整个生命周期内都不会改变。 这种线程池实现起来比较简单,适合任务量相对稳定、对响应时间要求不高的场景。
代码示例:固定大小线程池
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional> // std::function
class ThreadPool {
public:
ThreadPool(size_t numThreads) : stop(false) {
threads.resize(numThreads);
for (size_t i = 0; i < numThreads; ++i) {
threads[i] = std::thread([this]() {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queueMutex);
condition.wait(lock, [this]() { return stop || !tasks.empty(); });
if (stop && tasks.empty()) {
return;
}
task = tasks.front();
tasks.pop();
}
task();
}
});
}
}
template<typename F>
void enqueue(F f) {
{
std::unique_lock<std::mutex> lock(queueMutex);
tasks.emplace(f);
}
condition.notify_one();
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queueMutex);
stop = true;
}
condition.notify_all();
for (std::thread &thread : threads) {
thread.join();
}
}
private:
std::vector<std::thread> threads;
std::queue<std::function<void()>> tasks;
std::mutex queueMutex;
std::condition_variable condition;
bool stop;
};
int main() {
ThreadPool pool(4); // 创建一个包含 4 个线程的线程池
for (int i = 0; i < 8; ++i) {
pool.enqueue([i]() {
std::cout << "Task " << i << " is running in thread " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟耗时任务
});
}
std::this_thread::sleep_for(std::chrono::seconds(3)); // 等待任务完成
std::cout << "All tasks submitted." << std::endl;
return 0;
}
代码解读:
ThreadPool(size_t numThreads)
: 构造函数,初始化线程池的大小,创建指定数量的线程,并让它们开始等待任务。enqueue(F f)
: 将任务f
加入任务队列,并通知一个等待的线程。 这里的F
可以是任何可调用对象 (函数、lambda 表达式、函数对象)。~ThreadPool()
: 析构函数,设置stop
标志,通知所有线程退出,并等待所有线程结束。tasks
: 任务队列,使用std::queue
存储待执行的任务。queueMutex
: 互斥锁,用于保护任务队列的线程安全。condition
: 条件变量,用于线程间的同步,当任务队列为空时,线程进入等待状态,当有新任务加入时,线程被唤醒。stop
: 一个标志,用于通知线程池停止工作。
优点:
- 实现简单,易于理解。
- 线程数量固定,资源占用可控。
缺点:
- 线程数量固定,无法根据任务量动态调整,可能造成资源浪费或任务积压。
- 如果任务量突然增大,可能会导致响应时间变长。
动态大小线程池:灵活应变,适应性强
动态大小线程池,允许线程的数量根据任务量动态调整。 当任务量增加时,线程池可以创建新的线程来处理任务;当任务量减少时,线程池可以销毁空闲的线程,从而节省资源。 这种线程池适合任务量波动较大、对响应时间要求较高的场景。
设计思路:
- 设定线程池的最大和最小线程数: 限制线程池的规模,防止无限增长或过度收缩。
- 监控任务队列的长度: 根据任务队列的长度来判断是否需要创建或销毁线程。
- 设置线程的空闲时间: 如果一个线程在一段时间内没有执行任何任务,则认为该线程是空闲的,可以销毁。
代码示例:动态大小线程池
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <chrono>
class DynamicThreadPool {
public:
DynamicThreadPool(size_t minThreads, size_t maxThreads, std::chrono::seconds idleTime)
: minThreads_(minThreads), maxThreads_(maxThreads), idleTime_(idleTime), stop(false), currentThreads_(0) {
// 初始化最小数量的线程
for (size_t i = 0; i < minThreads_; ++i) {
createWorkerThread();
}
}
template<typename F>
void enqueue(F f) {
{
std::unique_lock<std::mutex> lock(queueMutex_);
tasks_.emplace(f);
}
condition_.notify_one();
// 尝试创建新的线程,如果任务队列过长
if (tasks_.size() > (currentThreads_ * 2) && currentThreads_ < maxThreads_) {
createWorkerThread();
}
}
~DynamicThreadPool() {
{
std::unique_lock<std::mutex> lock(queueMutex_);
stop = true;
}
condition_.notify_all();
for (std::thread &thread : threads_) {
thread.join();
}
}
private:
void createWorkerThread() {
if (currentThreads_ >= maxThreads_) return;
threads_.emplace_back(std::thread([this]() {
while (true) {
std::function<void()> task;
std::unique_lock<std::mutex> lock(queueMutex_);
// 使用带超时的 wait
if (condition_.wait_for(lock, idleTime_, [this]() { return stop || !tasks_.empty(); })) {
if (stop && tasks_.empty()) {
break; // 退出线程
}
task = tasks_.front();
tasks_.pop();
lock.unlock(); // 释放锁,允许其他线程访问任务队列
task(); // 执行任务
} else {
// 超时,线程空闲时间过长
if (currentThreads_ > minThreads_) {
std::cout << "Thread " << std::this_thread::get_id() << " is idle, exiting." << std::endl;
break; // 退出线程
}
}
}
// 线程退出时,减少线程计数器
{
std::unique_lock<std::mutex> lock(queueMutex_);
--currentThreads_;
}
}));
++currentThreads_;
std::cout << "Thread created, current thread count: " << currentThreads_ << std::endl;
}
std::vector<std::thread> threads_;
std::queue<std::function<void()>> tasks_;
std::mutex queueMutex_;
std::condition_variable condition_;
size_t minThreads_;
size_t maxThreads_;
std::chrono::seconds idleTime_;
bool stop;
size_t currentThreads_; // 当前线程数量
};
int main() {
DynamicThreadPool pool(2, 5, std::chrono::seconds(2)); // 最小 2 个线程,最大 5 个线程,空闲 2 秒
for (int i = 0; i < 10; ++i) {
pool.enqueue([i]() {
std::cout << "Task " << i << " is running in thread " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟耗时任务
});
}
std::this_thread::sleep_for(std::chrono::seconds(5)); // 等待任务完成
std::cout << "All tasks submitted." << std::endl;
return 0;
}
代码解读:
DynamicThreadPool(size_t minThreads, size_t maxThreads, std::chrono::seconds idleTime)
: 构造函数,初始化最小线程数、最大线程数和空闲时间。createWorkerThread()
: 创建新的工作线程,并增加线程计数器。condition_.wait_for(lock, idleTime_, ...)
: 使用带超时的wait_for
函数,如果线程在指定时间内没有收到任务,则认为该线程是空闲的,可以退出。currentThreads_
: 记录当前线程池中的线程数量。- *`if (tasks.size() > (currentThreads 2) && currentThreads < maxThreads)`:** 这是一个简单的策略,当任务队列的长度超过当前线程数的两倍时,并且当前线程数小于最大线程数,则创建新的线程。 你可以根据实际情况调整这个策略。
优点:
- 能够根据任务量动态调整线程数量,更好地利用资源。
- 能够应对任务量波动较大的场景,保持较好的响应时间。
缺点:
- 实现相对复杂,需要考虑线程的创建和销毁策略。
- 线程数量的动态调整可能会带来一定的开销。
任务队列:先进先出,保障公平
任务队列是线程池中存放待执行任务的容器。 它负责将任务按照一定的顺序传递给工作线程。 最常用的任务队列是 FIFO (First-In, First-Out) 队列,也就是先进先出队列。 这样可以保证任务按照提交的顺序执行,避免某些任务被饿死。
选择合适的任务队列
特性 | std::queue |
std::priority_queue |
---|---|---|
顺序 | FIFO (先进先出) | 基于优先级排序 |
适用场景 | 需要按照提交顺序执行任务的场景 | 需要优先处理某些任务的场景,例如紧急任务 |
线程安全 | 需要手动加锁保护 | 需要手动加锁保护 |
复杂性 | 简单 | 相对复杂 |
扩展:优先级队列
除了 FIFO 队列,还可以使用优先级队列来存放任务。 优先级队列允许为每个任务设置一个优先级,线程池会优先执行优先级高的任务。 这种方式适用于需要优先处理某些任务的场景,例如紧急任务。
代码示例:使用优先级队列
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
class PriorityTask {
public:
int priority;
std::function<void()> task;
PriorityTask(int priority, std::function<void()> task) : priority(priority), task(task) {}
// 重载小于运算符,用于优先级队列排序
bool operator>(const PriorityTask& other) const {
return priority > other.priority; // 优先级数值越小,优先级越高
}
};
class PriorityThreadPool {
public:
PriorityThreadPool(size_t numThreads) : stop(false) {
threads.resize(numThreads);
for (size_t i = 0; i < numThreads; ++i) {
threads[i] = std::thread([this]() {
while (true) {
PriorityTask task;
{
std::unique_lock<std::mutex> lock(queueMutex);
condition.wait(lock, [this]() { return stop || !tasks.empty(); });
if (stop && tasks.empty()) {
return;
}
task = tasks.top();
tasks.pop();
}
task.task();
}
});
}
}
template<typename F>
void enqueue(int priority, F f) {
{
std::unique_lock<std::mutex> lock(queueMutex);
tasks.emplace(priority, f);
}
condition.notify_one();
}
~PriorityThreadPool() {
{
std::unique_lock<std::mutex> lock(queueMutex);
stop = true;
}
condition.notify_all();
for (std::thread &thread : threads) {
thread.join();
}
}
private:
std::vector<std::thread> threads;
std::priority_queue<PriorityTask, std::vector<PriorityTask>, std::greater<PriorityTask>> tasks; // 优先级队列
std::mutex queueMutex;
std::condition_variable condition;
bool stop;
};
int main() {
PriorityThreadPool pool(4);
pool.enqueue(2, []() { // 优先级为 2
std::cout << "Task with priority 2 is running in thread " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
});
pool.enqueue(1, []() { // 优先级为 1
std::cout << "Task with priority 1 is running in thread " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
});
pool.enqueue(3, []() { // 优先级为 3
std::cout << "Task with priority 3 is running in thread " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
});
std::this_thread::sleep_for(std::chrono::seconds(3));
std::cout << "All tasks submitted." << std::endl;
return 0;
}
代码解读:
PriorityTask
: 定义一个包含优先级和任务的类,并重载了operator>
运算符,用于优先级队列的排序。 注意,这里使用std::greater
,使得优先级数值越小的任务优先级越高。std::priority_queue<PriorityTask, std::vector<PriorityTask>, std::greater<PriorityTask>> tasks;
: 声明一个优先级队列,用于存放PriorityTask
对象。
总结:
线程池的设计模式有很多种,选择哪种方式取决于具体的应用场景。
- 固定大小线程池: 简单稳定,适合任务量相对稳定的场景。
- 动态大小线程池: 灵活应变,适合任务量波动较大的场景。
- 任务队列: 可以选择 FIFO 队列或优先级队列,根据需要选择合适的策略。
希望今天的“脱口秀”能帮助大家更好地理解和使用C++线程池。 记住,没有最好的线程池,只有最适合你的线程池! 谢谢大家!