好的,各位观众老爷们,今天咱们就来聊聊C++里的“线程池”这个神奇的东西。别害怕,虽然听起来像个很专业的名词,但其实它就是一个能帮你管理线程,让你的程序跑得更快更流畅的小管家。想象一下,你开了一家餐厅,线程池就是你餐厅里的服务员团队,而你要处理的任务就是顾客的点单。没有线程池的时候,来一个顾客你就临时招一个服务员,顾客走了服务员就没事干了,是不是很浪费?有了线程池,你就可以提前雇好一批服务员,顾客来了直接让他们去服务,顾客走了他们还可以继续服务下一位,效率嗖嗖嗖就上去了!
一、 啥是线程池?为啥要用它?
线程池,顾名思义,就是一个装满了线程的池子。它是一个预先创建好的线程集合,可以用来执行并发任务。
为啥要用线程池?
-
减少线程创建和销毁的开销: 线程的创建和销毁是很耗费资源的。如果每个任务都创建一个线程,任务结束后销毁线程,就会导致大量的资源浪费。线程池可以避免频繁的创建和销毁线程,提高程序的性能。就像你不用每次顾客来都重新培训一个服务员,而是直接用现成的。
-
提高响应速度: 当有新任务到达时,线程池可以立即分配一个空闲线程来执行任务,无需等待线程的创建。这样可以大大缩短任务的响应时间。想象一下,顾客来了,马上就有服务员过去点单,是不是很爽?
-
控制并发线程的数量: 线程池可以限制并发执行的线程数量,防止过多的线程占用系统资源,导致系统崩溃。就像你的餐厅座位有限,不能让所有顾客都进来,否则会乱套的。
-
提高线程的可管理性: 线程池可以统一管理线程的生命周期,方便监控和调优。你可以随时查看服务员的工作状态,调整他们的工作安排。
二、 线程池的核心组件
一个基本的线程池通常包含以下几个核心组件:
-
任务队列 (Task Queue): 用于存放待执行的任务。可以是一个 FIFO 队列,也可以是一个优先级队列。就像餐厅的点单本,记录了所有顾客的点单。
-
线程管理器 (Thread Manager): 负责创建、销毁和管理线程。就像餐厅的经理,负责招聘、培训和服务员的管理。
-
工作线程 (Worker Threads): 线程池中的线程,负责从任务队列中取出任务并执行。就像餐厅的服务员,负责点单、上菜。
-
结果队列 (Result Queue,可选): 用于存放任务的执行结果。就像餐厅的厨房,负责将做好的菜品交给服务员。
三、 C++ 线程池的简单实现
下面我们来用 C++ 实现一个简单的线程池。为了让大家更容易理解,我们先写一个最基础的版本,然后再逐步完善。
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
class ThreadPool {
public:
ThreadPool(size_t num_threads) : num_threads_(num_threads), stop_(false) {
threads_.reserve(num_threads_);
for (size_t i = 0; i < num_threads_; ++i) {
threads_.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queue_mutex_);
condition_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
if (stop_ && tasks_.empty())
return;
task = std::move(tasks_.front());
tasks_.pop();
}
task();
}
});
}
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex_);
stop_ = true;
}
condition_.notify_all();
for (std::thread &thread : threads_) {
thread.join();
}
}
template<typename F, typename... Args>
auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex_);
if (stop_)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks_.emplace([task]() { (*task)(); });
}
condition_.notify_one();
return res;
}
private:
size_t num_threads_;
std::vector<std::thread> threads_;
std::queue<std::function<void()>> tasks_;
std::mutex queue_mutex_;
std::condition_variable condition_;
bool stop_;
};
int main() {
ThreadPool pool(4); // 创建一个包含4个线程的线程池
std::vector<std::future<int>> results;
for (int i = 0; i < 8; ++i) {
results.emplace_back(
pool.enqueue([i]{
std::cout << "Task " << i << " running on thread " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
return i * i;
})
);
}
for (auto &result : results)
std::cout << result.get() << " ";
std::cout << std::endl;
return 0;
}
代码解释:
-
ThreadPool
类: 这是线程池的核心类。num_threads_
: 线程池中线程的数量。threads_
: 存储线程对象的std::vector
。tasks_
: 任务队列,存储待执行的函数对象。这里使用了std::queue
,先进先出。queue_mutex_
: 保护任务队列的互斥锁。condition_
: 条件变量,用于线程间的同步,当任务队列为空时,线程会进入等待状态。stop_
: 标志位,用于通知线程池停止工作。
-
构造函数
ThreadPool(size_t num_threads)
:- 初始化线程池,创建指定数量的线程。
- 每个线程执行一个循环,不断从任务队列中取出任务并执行。
- 使用了
std::unique_lock
和std::condition_variable
来实现线程的同步和等待。 condition_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
这行代码是关键,它会让线程在任务队列为空时进入等待状态,直到有新的任务加入或者线程池被停止。
-
析构函数
~ThreadPool()
:- 停止线程池,通知所有线程退出。
- 使用
condition_.notify_all()
唤醒所有等待的线程。 - 使用
thread.join()
等待所有线程执行完毕。
-
enqueue(F&& f, Args&&... args)
函数:- 用于向任务队列中添加任务。
- 使用了模板和完美转发,可以接受任意类型的函数对象和参数。
- 使用
std::bind
将函数对象和参数绑定在一起,创建一个新的函数对象。 - 使用
std::packaged_task
封装任务,可以获取任务的执行结果。 - 将任务添加到任务队列中,并使用
condition_.notify_one()
唤醒一个等待的线程。
-
main()
函数:- 创建一个包含 4 个线程的线程池。
- 向线程池中添加 8 个任务。
- 每个任务会打印一条消息,并休眠 1 秒。
- 获取每个任务的执行结果并打印。
四、 线程池的改进和优化
上面的代码只是一个简单的线程池实现,还有很多可以改进和优化的地方。
-
异常处理:
- 在线程执行任务时,可能会抛出异常。如果没有处理异常,可能会导致程序崩溃。
- 可以使用
try...catch
块来捕获异常,并进行处理。 - 可以将异常信息记录到日志中,或者将异常重新抛出。
-
任务优先级:
- 可以为任务设置优先级,让优先级高的任务先执行。
- 可以使用
std::priority_queue
来实现优先级队列。 - 需要自定义比较函数,根据任务的优先级进行排序.
-
线程池大小的动态调整:
- 可以根据任务的负载情况,动态调整线程池的大小。
- 当任务队列过长时,可以增加线程池的大小。
- 当任务队列很短时,可以减少线程池的大小。
- 需要注意的是,动态调整线程池的大小可能会带来一些性能开销。
-
空闲线程的回收:
- 当线程池中的线程长时间处于空闲状态时,可以将其回收,释放系统资源。
- 可以使用定时器来检测空闲线程,并将其回收。
-
拒绝策略:
- 当任务队列已满时,需要采取一定的拒绝策略,防止任务丢失。
- 常见的拒绝策略有:
- 丢弃任务: 直接丢弃新来的任务。
- 抛出异常: 抛出一个异常,通知调用者任务无法执行。
- 阻塞等待: 阻塞调用者,直到任务队列有空闲位置。
- 调用者运行: 让调用者自己执行任务。
五、 一个更完善的线程池实现
下面是一个更完善的线程池实现,包含了异常处理、任务优先级和拒绝策略等功能。
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
#include <algorithm>
enum class RejectPolicy {
Discard,
ThrowException,
CallerRuns
};
class ThreadPool {
public:
ThreadPool(size_t num_threads, RejectPolicy reject_policy = RejectPolicy::Discard)
: num_threads_(num_threads), stop_(false), reject_policy_(reject_policy) {
threads_.reserve(num_threads_);
for (size_t i = 0; i < num_threads_; ++i) {
threads_.emplace_back([this] {
while (true) {
Task task;
{
std::unique_lock<std::mutex> lock(queue_mutex_);
condition_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
if (stop_ && tasks_.empty())
return;
task = std::move(tasks_.top());
tasks_.pop();
}
try {
task.func(); // 执行任务
} catch (const std::exception& e) {
std::cerr << "Exception in thread: " << e.what() << std::endl;
} catch (...) {
std::cerr << "Unknown exception in thread" << std::endl;
}
}
});
}
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex_);
stop_ = true;
}
condition_.notify_all();
for (std::thread& thread : threads_) {
thread.join();
}
}
template<typename F, typename... Args>
auto enqueue(F&& f, Args&&... args, int priority = 0)
-> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
Task t;
t.priority = priority;
t.func = [task]() { (*task)(); };
{
std::unique_lock<std::mutex> lock(queue_mutex_);
if (stop_) {
throw std::runtime_error("enqueue on stopped ThreadPool");
}
if (tasks_.size() >= max_queue_size_ && reject_policy_ != RejectPolicy::CallerRuns) {
switch (reject_policy_) {
case RejectPolicy::Discard:
std::cerr << "Task discarded due to queue full." << std::endl;
return std::future<return_type>(); // 返回一个空的 future
case RejectPolicy::ThrowException:
throw std::runtime_error("Task queue is full");
default:
break;
}
}
tasks_.push(t);
}
condition_.notify_one();
if (tasks_.size() >= max_queue_size_ && reject_policy_ == RejectPolicy::CallerRuns) {
(*task)(); // 在调用者线程中运行
}
return res;
}
private:
size_t num_threads_;
std::vector<std::thread> threads_;
struct Task {
int priority;
std::function<void()> func;
bool operator<(const Task& other) const {
return priority < other.priority; // 优先级高的先执行
}
};
std::priority_queue<Task, std::vector<Task>> tasks_; // 优先级队列
std::mutex queue_mutex_;
std::condition_variable condition_;
bool stop_;
RejectPolicy reject_policy_;
static const size_t max_queue_size_ = 10; // 最大任务队列大小
};
int main() {
ThreadPool pool(4, RejectPolicy::ThrowException); // 创建一个包含4个线程的线程池, 拒绝策略为抛出异常
std::vector<std::future<int>> results;
try {
for (int i = 0; i < 12; ++i) {
results.emplace_back(
pool.enqueue([i] {
std::cout << "Task " << i << " running on thread " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
return i * i;
}, i) // 优先级为 i
);
}
} catch (const std::exception& e) {
std::cerr << "Exception: " << e.what() << std::endl;
}
for (auto& result : results)
if(result.valid()) // 检查 future 是否有效,避免获取空 future 的值
std::cout << result.get() << " ";
std::cout << std::endl;
return 0;
}
代码解释:
RejectPolicy
枚举: 定义了拒绝策略的类型。- 构造函数
ThreadPool(size_t num_threads, RejectPolicy reject_policy)
: 接受一个RejectPolicy
参数,用于设置拒绝策略。 enqueue(F&& f, Args&&... args, int priority)
函数: 接受一个priority
参数,用于设置任务的优先级。Task
结构体: 包含任务的优先级和函数对象。tasks_
: 使用std::priority_queue
作为任务队列,实现优先级队列。- 在线程执行任务时,使用
try...catch
块捕获异常。 - 当任务队列已满时,根据拒绝策略进行处理。
六、总结
线程池是一个非常有用的工具,可以帮助你提高程序的性能和可管理性。虽然实现一个线程池需要一定的技巧,但是只要理解了线程池的核心原理,就可以轻松地实现一个适合自己需求的线程池。
希望今天的讲解对大家有所帮助,下次再见!