好的,没问题。让我们开始这场关于C++线程池异常处理与任务结果收集的讲座吧!
各位观众,各位朋友,欢迎来到今天的“C++线程池异常处理与任务结果收集”特别节目!我是你们的老朋友,代码界的段子手,BUG克星,今天咱们就来聊聊这个听起来高大上,实则接地气的话题。
开场白:线程池的那些事儿
线程池,顾名思义,就是一堆线程的“澡堂子”。 想象一下,你开了一家餐厅,顾客(任务)络绎不绝。如果每来一个顾客,你就现招一个厨师(线程),炒完菜就让厨师走人,那效率得有多低? 线程池就像你餐厅里预先雇好的一批厨师,顾客来了直接分配,省时省力。
但是,厨师(线程)在炒菜(执行任务)的过程中,万一不小心把盐当成糖放了(抛出异常),或者菜做糊了(任务失败),我们该怎么办? 这就是今天我们要重点讨论的问题:如何在线程池中优雅地处理异常,并收集任务的结果。
第一部分:异常处理,避免线程池“爆炸”
在多线程环境中,异常处理可不是一件小事。一个线程抛出未捕获的异常,轻则导致该线程崩溃,重则可能导致整个程序挂掉。 所以,我们需要一套完善的异常处理机制,确保线程池的稳定运行。
1. 捕获并处理任务中的异常
最直接的方法,就是在任务函数内部进行try-catch。 就像给厨师配上灭火器,一旦着火,立刻扑灭。
#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <future>
class ThreadPool {
public:
ThreadPool(size_t numThreads) : numThreads_(numThreads), stop_(false) {
threads_.reserve(numThreads_);
for (size_t i = 0; i < numThreads_; ++i) {
threads_.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queueMutex_);
condition_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
if (stop_ && tasks_.empty()) {
return;
}
task = std::move(tasks_.front());
tasks_.pop();
}
try {
task(); // 执行任务,并捕获异常
} catch (const std::exception& e) {
std::cerr << "Task threw an exception: " << e.what() << std::endl;
// 这里可以添加更复杂的错误处理逻辑,例如记录日志
} catch (...) {
std::cerr << "Task threw an unknown exception." << std::endl;
// 处理未知异常
}
}
});
}
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queueMutex_);
stop_ = true;
}
condition_.notify_all();
for (std::thread& thread : threads_) {
thread.join();
}
}
template<typename Func, typename... Args>
auto enqueue(Func&& func, Args&&... args) -> std::future<typename std::result_of<Func(Args...)>::type> {
using return_type = typename std::result_of<Func(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<Func>(func), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queueMutex_);
if (stop_) {
throw std::runtime_error("enqueue on stopped ThreadPool");
}
tasks_.emplace([task]() { (*task)(); });
}
condition_.notify_one();
return res;
}
private:
std::vector<std::thread> threads_;
std::queue<std::function<void()>> tasks_;
std::mutex queueMutex_;
std::condition_variable condition_;
size_t numThreads_;
bool stop_;
};
int main() {
ThreadPool pool(4);
// 提交一个可能抛出异常的任务
auto future = pool.enqueue([]() -> int {
std::cout << "Task running..." << std::endl;
throw std::runtime_error("Something went wrong in the task!");
return 42; // 这行代码不会被执行
});
// 提交一个正常运行的任务
auto future2 = pool.enqueue([]() -> int {
std::cout << "Another task running..." << std::endl;
return 100;
});
try {
std::cout << "Result of future2: " << future2.get() << std::endl;
future.get(); // 获取 future 的结果,可能会抛出异常
} catch (const std::exception& e) {
std::cerr << "Exception caught in main: " << e.what() << std::endl;
}
return 0;
}
在这个例子中,ThreadPool
类创建了一个线程池。 enqueue
方法接受一个函数 func
和参数 args
,并将它们绑定到一个 std::packaged_task
中。 packaged_task
将任务的返回值或异常存储在一个 std::future
中,允许调用者获取任务的结果或捕获异常。 在线程池的线程中,任务被包装在一个 try-catch
块中,以捕获任务中可能抛出的任何异常。
2. 使用std::future
传递异常
std::future
不仅可以用来获取任务的结果,还可以用来传递异常。 当任务抛出异常时,std::future::get()
会重新抛出该异常,让调用者可以捕获并处理。
// (接上面的代码)
// 在 enqueue 方法中,已经使用了 std::future 来获取任务结果
// 在 main 函数中,也演示了如何通过 future.get() 捕获异常
3. 自定义异常处理策略
对于某些特定的异常,我们可能需要采取不同的处理策略。 例如,对于一些可以重试的异常,我们可以尝试重新执行任务;对于一些致命的异常,我们可以选择终止程序。
// (接上面的代码,修改 ThreadPool 类的构造函数)
ThreadPool(size_t numThreads, std::function<void(const std::exception&)> errorHandler = nullptr)
: numThreads_(numThreads), stop_(false), errorHandler_(errorHandler) {
// ... 其他代码 ...
threads_.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queueMutex_);
condition_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
if (stop_ && tasks_.empty()) {
return;
}
task = std::move(tasks_.front());
tasks_.pop();
}
try {
task(); // 执行任务,并捕获异常
} catch (const std::exception& e) {
if (errorHandler_) {
errorHandler_(e); // 调用自定义的错误处理函数
} else {
std::cerr << "Task threw an exception: " << e.what() << std::endl;
}
} catch (...) {
std::cerr << "Task threw an unknown exception." << std::endl;
}
}
});
}
// 添加一个成员变量用于存储错误处理函数
std::function<void(const std::exception&)> errorHandler_;
// 在 main 函数中使用自定义的错误处理函数
int main() {
// 创建一个自定义的错误处理函数
auto myErrorHandler = [](const std::exception& e) {
std::cerr << "Custom error handler: " << e.what() << std::endl;
// 可以在这里添加重试逻辑或其他处理
};
ThreadPool pool(4, myErrorHandler); // 传递自定义的错误处理函数
// ... 提交任务的代码 ...
}
在这个改进后的 ThreadPool
类中,构造函数接受一个可选的 errorHandler
参数,它是一个 std::function
,用于处理任务中抛出的异常。 如果提供了 errorHandler
,则在捕获到异常时调用它。 这样,你就可以灵活地定义不同的错误处理策略,而无需修改线程池的核心代码。
第二部分:任务结果收集,一个都不能少
光处理异常还不够,我们还得把任务的结果收集起来。 就像餐厅上菜,不能只管炒菜,还得把菜端到客人面前。
1. 使用std::future
获取结果
std::future
是收集任务结果的最佳搭档。 任务执行完毕后,结果会保存在std::future
中,我们可以通过std::future::get()
来获取结果。
// (接上面的代码,继续使用 std::future)
// ThreadPool 类的 enqueue 方法已经返回 std::future
// main 函数中已经演示了如何通过 future.get() 获取结果
2. 处理std::future
的异常
前面已经提到,std::future::get()
在任务抛出异常时会重新抛出该异常。 因此,在使用std::future::get()
获取结果时,务必使用try-catch块。
// (接上面的代码,main 函数中已经包含了 try-catch 块)
3. 批量获取结果
如果我们需要收集多个任务的结果,可以使用std::vector<std::future>
来保存这些future
对象,然后循环调用std::future::get()
来获取结果。
#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <future>
class ThreadPool {
public:
ThreadPool(size_t numThreads) : numThreads_(numThreads), stop_(false) {
threads_.reserve(numThreads_);
for (size_t i = 0; i < numThreads_; ++i) {
threads_.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queueMutex_);
condition_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
if (stop_ && tasks_.empty()) {
return;
}
task = std::move(tasks_.front());
tasks_.pop();
}
try {
task(); // 执行任务,并捕获异常
} catch (const std::exception& e) {
std::cerr << "Task threw an exception: " << e.what() << std::endl;
// 这里可以添加更复杂的错误处理逻辑,例如记录日志
} catch (...) {
std::cerr << "Task threw an unknown exception." << std::endl;
// 处理未知异常
}
}
});
}
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queueMutex_);
stop_ = true;
}
condition_.notify_all();
for (std::thread& thread : threads_) {
thread.join();
}
}
template<typename Func, typename... Args>
auto enqueue(Func&& func, Args&&... args) -> std::future<typename std::result_of<Func(Args...)>::type> {
using return_type = typename std::result_of<Func(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<Func>(func), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queueMutex_);
if (stop_) {
throw std::runtime_error("enqueue on stopped ThreadPool");
}
tasks_.emplace([task]() { (*task)(); });
}
condition_.notify_one();
return res;
}
private:
std::vector<std::thread> threads_;
std::queue<std::function<void()>> tasks_;
std::mutex queueMutex_;
std::condition_variable condition_;
size_t numThreads_;
bool stop_;
};
int main() {
ThreadPool pool(4);
std::vector<std::future<int>> futures;
// 提交多个任务
for (int i = 0; i < 5; ++i) {
futures.emplace_back(pool.enqueue([i]() -> int {
std::cout << "Task " << i << " running..." << std::endl;
if (i == 2) {
throw std::runtime_error("Task 2 failed!");
}
return i * 10;
}));
}
// 收集所有任务的结果
for (int i = 0; i < futures.size(); ++i) {
try {
std::cout << "Result of task " << i << ": " << futures[i].get() << std::endl;
} catch (const std::exception& e) {
std::cerr << "Exception caught for task " << i << ": " << e.what() << std::endl;
}
}
return 0;
}
第三部分:高级技巧,让线程池更上一层楼
掌握了基本的异常处理和结果收集,我们还可以学习一些高级技巧,让线程池更加强大。
1. 使用std::shared_future
共享结果
std::shared_future
允许多个线程同时访问同一个任务的结果。 这在某些场景下非常有用,例如,多个线程需要依赖同一个计算结果。
#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <future>
class ThreadPool {
public:
ThreadPool(size_t numThreads) : numThreads_(numThreads), stop_(false) {
threads_.reserve(numThreads_);
for (size_t i = 0; i < numThreads_; ++i) {
threads_.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queueMutex_);
condition_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
if (stop_ && tasks_.empty()) {
return;
}
task = std::move(tasks_.front());
tasks_.pop();
}
try {
task(); // 执行任务,并捕获异常
} catch (const std::exception& e) {
std::cerr << "Task threw an exception: " << e.what() << std::endl;
// 这里可以添加更复杂的错误处理逻辑,例如记录日志
} catch (...) {
std::cerr << "Task threw an unknown exception." << std::endl;
// 处理未知异常
}
}
});
}
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queueMutex_);
stop_ = true;
}
condition_.notify_all();
for (std::thread& thread : threads_) {
thread.join();
}
}
template<typename Func, typename... Args>
auto enqueue(Func&& func, Args&&... args) -> std::shared_future<typename std::result_of<Func(Args...)>::type> {
using return_type = typename std::result_of<Func(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<Func>(func), std::forward<Args>(args)...)
);
std::shared_future<return_type> res = task->get_future().share(); // 创建 shared_future
{
std::unique_lock<std::mutex> lock(queueMutex_);
if (stop_) {
throw std::runtime_error("enqueue on stopped ThreadPool");
}
tasks_.emplace([task]() { (*task)(); });
}
condition_.notify_one();
return res;
}
private:
std::vector<std::thread> threads_;
std::queue<std::function<void()>> tasks_;
std::mutex queueMutex_;
std::condition_variable condition_;
size_t numThreads_;
bool stop_;
};
int main() {
ThreadPool pool(4);
// 提交一个任务
std::shared_future<int> sharedFuture = pool.enqueue([]() -> int {
std::cout << "Task running..." << std::endl;
return 42;
});
// 多个线程同时获取结果
std::vector<std::thread> threads;
for (int i = 0; i < 3; ++i) {
threads.emplace_back([sharedFuture, i]() {
try {
std::cout << "Thread " << i << " Result: " << sharedFuture.get() << std::endl;
} catch (const std::exception& e) {
std::cerr << "Thread " << i << " Exception: " << e.what() << std::endl;
}
});
}
for (auto& thread : threads) {
thread.join();
}
return 0;
}
2. 使用std::promise
手动设置结果
std::promise
允许我们在任务执行的任何时候手动设置结果,或者设置一个异常。 这在某些需要异步设置结果的场景下非常有用。
#include <iostream>
#include <thread>
#include <future>
int main() {
std::promise<int> promise;
std::future<int> future = promise.get_future();
std::thread t([&promise]() {
try {
// 模拟一些耗时操作
std::this_thread::sleep_for(std::chrono::seconds(2));
// 设置结果
promise.set_value(42);
} catch (...) {
// 设置异常
promise.set_exception(std::current_exception());
}
});
try {
std::cout << "Result: " << future.get() << std::endl;
} catch (const std::exception& e) {
std::cerr << "Exception: " << e.what() << std::endl;
}
t.join();
return 0;
}
3. 使用回调函数处理结果
除了使用std::future
,我们还可以使用回调函数来处理任务的结果。 任务执行完毕后,直接调用回调函数,将结果传递给调用者。
// (接上面的 ThreadPool 代码, 添加一个回调函数参数到 enqueue 方法)
template<typename Func, typename... Args>
void enqueue(Func&& func, Args&&... args, std::function<void(typename std::result_of<Func(Args...)>::type)> callback) {
using return_type = typename std::result_of<Func(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<Func>(func), std::forward<Args>(args)...)
);
{
std::unique_lock<std::mutex> lock(queueMutex_);
if (stop_) {
throw std::runtime_error("enqueue on stopped ThreadPool");
}
tasks_.emplace([task, callback]() {
try {
(*task)();
callback(task->get_future().get()); // 调用回调函数,传递结果
} catch (const std::exception& e) {
std::cerr << "Task threw an exception: " << e.what() << std::endl;
// 在这里也可以调用一个错误处理回调函数
}
});
}
condition_.notify_one();
}
// 在 main 函数中使用回调函数
int main() {
ThreadPool pool(4);
pool.enqueue([]() -> int {
std::cout << "Task running..." << std::endl;
return 42;
}, [](int result) {
std::cout << "Result from callback: " << result << std::endl;
});
// ... 其他代码 ...
}
总结:线程池的“葵花宝典”
今天的讲座,我们深入探讨了C++线程池中的异常处理和任务结果收集。 记住以下几点:
- 异常处理是关键: 使用try-catch块捕获任务中的异常,避免线程池崩溃。
std::future
是好帮手: 使用std::future
获取任务结果,并处理可能抛出的异常。- 灵活运用高级技巧: 使用
std::shared_future
共享结果,使用std::promise
手动设置结果,使用回调函数处理结果。
掌握了这些技巧,你就可以打造一个稳定、高效、可靠的C++线程池,让你的程序如虎添翼!
结束语:代码之路,永无止境
代码的世界充满挑战,也充满乐趣。 希望今天的讲座能对你有所帮助。 记住,学习永无止境,让我们一起在代码的海洋里乘风破浪,勇往直前! 感谢各位的收看,咱们下期再见!