C++ 线程池中的异常处理与任务结果收集

好的,没问题。让我们开始这场关于C++线程池异常处理与任务结果收集的讲座吧!

各位观众,各位朋友,欢迎来到今天的“C++线程池异常处理与任务结果收集”特别节目!我是你们的老朋友,代码界的段子手,BUG克星,今天咱们就来聊聊这个听起来高大上,实则接地气的话题。

开场白:线程池的那些事儿

线程池,顾名思义,就是一堆线程的“澡堂子”。 想象一下,你开了一家餐厅,顾客(任务)络绎不绝。如果每来一个顾客,你就现招一个厨师(线程),炒完菜就让厨师走人,那效率得有多低? 线程池就像你餐厅里预先雇好的一批厨师,顾客来了直接分配,省时省力。

但是,厨师(线程)在炒菜(执行任务)的过程中,万一不小心把盐当成糖放了(抛出异常),或者菜做糊了(任务失败),我们该怎么办? 这就是今天我们要重点讨论的问题:如何在线程池中优雅地处理异常,并收集任务的结果。

第一部分:异常处理,避免线程池“爆炸”

在多线程环境中,异常处理可不是一件小事。一个线程抛出未捕获的异常,轻则导致该线程崩溃,重则可能导致整个程序挂掉。 所以,我们需要一套完善的异常处理机制,确保线程池的稳定运行。

1. 捕获并处理任务中的异常

最直接的方法,就是在任务函数内部进行try-catch。 就像给厨师配上灭火器,一旦着火,立刻扑灭。

#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <future>

class ThreadPool {
public:
    ThreadPool(size_t numThreads) : numThreads_(numThreads), stop_(false) {
        threads_.reserve(numThreads_);
        for (size_t i = 0; i < numThreads_; ++i) {
            threads_.emplace_back([this] {
                while (true) {
                    std::function<void()> task;

                    {
                        std::unique_lock<std::mutex> lock(queueMutex_);
                        condition_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
                        if (stop_ && tasks_.empty()) {
                            return;
                        }
                        task = std::move(tasks_.front());
                        tasks_.pop();
                    }

                    try {
                        task(); // 执行任务,并捕获异常
                    } catch (const std::exception& e) {
                        std::cerr << "Task threw an exception: " << e.what() << std::endl;
                        // 这里可以添加更复杂的错误处理逻辑,例如记录日志
                    } catch (...) {
                        std::cerr << "Task threw an unknown exception." << std::endl;
                        // 处理未知异常
                    }
                }
            });
        }
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queueMutex_);
            stop_ = true;
        }
        condition_.notify_all();
        for (std::thread& thread : threads_) {
            thread.join();
        }
    }

    template<typename Func, typename... Args>
    auto enqueue(Func&& func, Args&&... args) -> std::future<typename std::result_of<Func(Args...)>::type> {
        using return_type = typename std::result_of<Func(Args...)>::type;

        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<Func>(func), std::forward<Args>(args)...)
        );

        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queueMutex_);
            if (stop_) {
                throw std::runtime_error("enqueue on stopped ThreadPool");
            }
            tasks_.emplace([task]() { (*task)(); });
        }
        condition_.notify_one();
        return res;
    }

private:
    std::vector<std::thread> threads_;
    std::queue<std::function<void()>> tasks_;
    std::mutex queueMutex_;
    std::condition_variable condition_;
    size_t numThreads_;
    bool stop_;
};

int main() {
    ThreadPool pool(4);

    // 提交一个可能抛出异常的任务
    auto future = pool.enqueue([]() -> int {
        std::cout << "Task running..." << std::endl;
        throw std::runtime_error("Something went wrong in the task!");
        return 42; // 这行代码不会被执行
    });

    // 提交一个正常运行的任务
    auto future2 = pool.enqueue([]() -> int {
        std::cout << "Another task running..." << std::endl;
        return 100;
    });

    try {
        std::cout << "Result of future2: " << future2.get() << std::endl;
        future.get(); // 获取 future 的结果,可能会抛出异常
    } catch (const std::exception& e) {
        std::cerr << "Exception caught in main: " << e.what() << std::endl;
    }

    return 0;
}

在这个例子中,ThreadPool 类创建了一个线程池。 enqueue 方法接受一个函数 func 和参数 args,并将它们绑定到一个 std::packaged_task 中。 packaged_task 将任务的返回值或异常存储在一个 std::future 中,允许调用者获取任务的结果或捕获异常。 在线程池的线程中,任务被包装在一个 try-catch 块中,以捕获任务中可能抛出的任何异常。

2. 使用std::future传递异常

std::future不仅可以用来获取任务的结果,还可以用来传递异常。 当任务抛出异常时,std::future::get()会重新抛出该异常,让调用者可以捕获并处理。

// (接上面的代码)
// 在 enqueue 方法中,已经使用了 std::future 来获取任务结果
// 在 main 函数中,也演示了如何通过 future.get() 捕获异常

3. 自定义异常处理策略

对于某些特定的异常,我们可能需要采取不同的处理策略。 例如,对于一些可以重试的异常,我们可以尝试重新执行任务;对于一些致命的异常,我们可以选择终止程序。

// (接上面的代码,修改 ThreadPool 类的构造函数)
ThreadPool(size_t numThreads, std::function<void(const std::exception&)> errorHandler = nullptr)
        : numThreads_(numThreads), stop_(false), errorHandler_(errorHandler) {
    // ... 其他代码 ...

     threads_.emplace_back([this] {
                while (true) {
                    std::function<void()> task;

                    {
                         std::unique_lock<std::mutex> lock(queueMutex_);
                        condition_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
                        if (stop_ && tasks_.empty()) {
                            return;
                        }
                        task = std::move(tasks_.front());
                        tasks_.pop();
                    }

                    try {
                        task(); // 执行任务,并捕获异常
                    } catch (const std::exception& e) {
                         if (errorHandler_) {
                            errorHandler_(e); // 调用自定义的错误处理函数
                        } else {
                            std::cerr << "Task threw an exception: " << e.what() << std::endl;
                        }

                    } catch (...) {
                        std::cerr << "Task threw an unknown exception." << std::endl;
                    }
                }
            });
}

// 添加一个成员变量用于存储错误处理函数
std::function<void(const std::exception&)> errorHandler_;

// 在 main 函数中使用自定义的错误处理函数
int main() {
    // 创建一个自定义的错误处理函数
    auto myErrorHandler = [](const std::exception& e) {
        std::cerr << "Custom error handler: " << e.what() << std::endl;
        // 可以在这里添加重试逻辑或其他处理
    };

    ThreadPool pool(4, myErrorHandler); // 传递自定义的错误处理函数

   // ... 提交任务的代码 ...
}

在这个改进后的 ThreadPool 类中,构造函数接受一个可选的 errorHandler 参数,它是一个 std::function,用于处理任务中抛出的异常。 如果提供了 errorHandler,则在捕获到异常时调用它。 这样,你就可以灵活地定义不同的错误处理策略,而无需修改线程池的核心代码。

第二部分:任务结果收集,一个都不能少

光处理异常还不够,我们还得把任务的结果收集起来。 就像餐厅上菜,不能只管炒菜,还得把菜端到客人面前。

1. 使用std::future获取结果

std::future是收集任务结果的最佳搭档。 任务执行完毕后,结果会保存在std::future中,我们可以通过std::future::get()来获取结果。

// (接上面的代码,继续使用 std::future)
//  ThreadPool 类的 enqueue 方法已经返回 std::future
//  main 函数中已经演示了如何通过 future.get() 获取结果

2. 处理std::future的异常

前面已经提到,std::future::get()在任务抛出异常时会重新抛出该异常。 因此,在使用std::future::get()获取结果时,务必使用try-catch块。

// (接上面的代码,main 函数中已经包含了 try-catch 块)

3. 批量获取结果

如果我们需要收集多个任务的结果,可以使用std::vector<std::future>来保存这些future对象,然后循环调用std::future::get()来获取结果。

#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <future>

class ThreadPool {
public:
    ThreadPool(size_t numThreads) : numThreads_(numThreads), stop_(false) {
        threads_.reserve(numThreads_);
        for (size_t i = 0; i < numThreads_; ++i) {
            threads_.emplace_back([this] {
                while (true) {
                    std::function<void()> task;

                    {
                        std::unique_lock<std::mutex> lock(queueMutex_);
                        condition_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
                        if (stop_ && tasks_.empty()) {
                            return;
                        }
                        task = std::move(tasks_.front());
                        tasks_.pop();
                    }

                    try {
                        task(); // 执行任务,并捕获异常
                    } catch (const std::exception& e) {
                        std::cerr << "Task threw an exception: " << e.what() << std::endl;
                        // 这里可以添加更复杂的错误处理逻辑,例如记录日志
                    } catch (...) {
                        std::cerr << "Task threw an unknown exception." << std::endl;
                        // 处理未知异常
                    }
                }
            });
        }
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queueMutex_);
            stop_ = true;
        }
        condition_.notify_all();
        for (std::thread& thread : threads_) {
            thread.join();
        }
    }

    template<typename Func, typename... Args>
    auto enqueue(Func&& func, Args&&... args) -> std::future<typename std::result_of<Func(Args...)>::type> {
        using return_type = typename std::result_of<Func(Args...)>::type;

        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<Func>(func), std::forward<Args>(args)...)
        );

        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queueMutex_);
            if (stop_) {
                throw std::runtime_error("enqueue on stopped ThreadPool");
            }
            tasks_.emplace([task]() { (*task)(); });
        }
        condition_.notify_one();
        return res;
    }

private:
    std::vector<std::thread> threads_;
    std::queue<std::function<void()>> tasks_;
    std::mutex queueMutex_;
    std::condition_variable condition_;
    size_t numThreads_;
    bool stop_;
};

int main() {
    ThreadPool pool(4);

    std::vector<std::future<int>> futures;

    // 提交多个任务
    for (int i = 0; i < 5; ++i) {
        futures.emplace_back(pool.enqueue([i]() -> int {
            std::cout << "Task " << i << " running..." << std::endl;
            if (i == 2) {
                throw std::runtime_error("Task 2 failed!");
            }
            return i * 10;
        }));
    }

    // 收集所有任务的结果
    for (int i = 0; i < futures.size(); ++i) {
        try {
            std::cout << "Result of task " << i << ": " << futures[i].get() << std::endl;
        } catch (const std::exception& e) {
            std::cerr << "Exception caught for task " << i << ": " << e.what() << std::endl;
        }
    }

    return 0;
}

第三部分:高级技巧,让线程池更上一层楼

掌握了基本的异常处理和结果收集,我们还可以学习一些高级技巧,让线程池更加强大。

1. 使用std::shared_future共享结果

std::shared_future允许多个线程同时访问同一个任务的结果。 这在某些场景下非常有用,例如,多个线程需要依赖同一个计算结果。

#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <future>

class ThreadPool {
public:
    ThreadPool(size_t numThreads) : numThreads_(numThreads), stop_(false) {
        threads_.reserve(numThreads_);
        for (size_t i = 0; i < numThreads_; ++i) {
            threads_.emplace_back([this] {
                while (true) {
                    std::function<void()> task;

                    {
                        std::unique_lock<std::mutex> lock(queueMutex_);
                        condition_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
                        if (stop_ && tasks_.empty()) {
                            return;
                        }
                        task = std::move(tasks_.front());
                        tasks_.pop();
                    }

                    try {
                        task(); // 执行任务,并捕获异常
                    } catch (const std::exception& e) {
                        std::cerr << "Task threw an exception: " << e.what() << std::endl;
                        // 这里可以添加更复杂的错误处理逻辑,例如记录日志
                    } catch (...) {
                        std::cerr << "Task threw an unknown exception." << std::endl;
                        // 处理未知异常
                    }
                }
            });
        }
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queueMutex_);
            stop_ = true;
        }
        condition_.notify_all();
        for (std::thread& thread : threads_) {
            thread.join();
        }
    }

    template<typename Func, typename... Args>
    auto enqueue(Func&& func, Args&&... args) -> std::shared_future<typename std::result_of<Func(Args...)>::type> {
        using return_type = typename std::result_of<Func(Args...)>::type;

        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<Func>(func), std::forward<Args>(args)...)
        );

        std::shared_future<return_type> res = task->get_future().share(); // 创建 shared_future
        {
            std::unique_lock<std::mutex> lock(queueMutex_);
            if (stop_) {
                throw std::runtime_error("enqueue on stopped ThreadPool");
            }
            tasks_.emplace([task]() { (*task)(); });
        }
        condition_.notify_one();
        return res;
    }

private:
    std::vector<std::thread> threads_;
    std::queue<std::function<void()>> tasks_;
    std::mutex queueMutex_;
    std::condition_variable condition_;
    size_t numThreads_;
    bool stop_;
};

int main() {
    ThreadPool pool(4);

    // 提交一个任务
    std::shared_future<int> sharedFuture = pool.enqueue([]() -> int {
        std::cout << "Task running..." << std::endl;
        return 42;
    });

    // 多个线程同时获取结果
    std::vector<std::thread> threads;
    for (int i = 0; i < 3; ++i) {
        threads.emplace_back([sharedFuture, i]() {
            try {
                std::cout << "Thread " << i << " Result: " << sharedFuture.get() << std::endl;
            } catch (const std::exception& e) {
                std::cerr << "Thread " << i << " Exception: " << e.what() << std::endl;
            }
        });
    }

    for (auto& thread : threads) {
        thread.join();
    }

    return 0;
}

2. 使用std::promise手动设置结果

std::promise允许我们在任务执行的任何时候手动设置结果,或者设置一个异常。 这在某些需要异步设置结果的场景下非常有用。

#include <iostream>
#include <thread>
#include <future>

int main() {
    std::promise<int> promise;
    std::future<int> future = promise.get_future();

    std::thread t([&promise]() {
        try {
            // 模拟一些耗时操作
            std::this_thread::sleep_for(std::chrono::seconds(2));
            // 设置结果
            promise.set_value(42);
        } catch (...) {
            // 设置异常
            promise.set_exception(std::current_exception());
        }
    });

    try {
        std::cout << "Result: " << future.get() << std::endl;
    } catch (const std::exception& e) {
        std::cerr << "Exception: " << e.what() << std::endl;
    }

    t.join();

    return 0;
}

3. 使用回调函数处理结果

除了使用std::future,我们还可以使用回调函数来处理任务的结果。 任务执行完毕后,直接调用回调函数,将结果传递给调用者。

// (接上面的 ThreadPool 代码, 添加一个回调函数参数到 enqueue 方法)
template<typename Func, typename... Args>
void enqueue(Func&& func, Args&&... args, std::function<void(typename std::result_of<Func(Args...)>::type)> callback) {
    using return_type = typename std::result_of<Func(Args...)>::type;

    auto task = std::make_shared<std::packaged_task<return_type()>>(
        std::bind(std::forward<Func>(func), std::forward<Args>(args)...)
    );

    {
        std::unique_lock<std::mutex> lock(queueMutex_);
        if (stop_) {
            throw std::runtime_error("enqueue on stopped ThreadPool");
        }
        tasks_.emplace([task, callback]() {
            try {
                (*task)();
                callback(task->get_future().get()); // 调用回调函数,传递结果
            } catch (const std::exception& e) {
                std::cerr << "Task threw an exception: " << e.what() << std::endl;
                // 在这里也可以调用一个错误处理回调函数
            }
        });
    }
    condition_.notify_one();
}

// 在 main 函数中使用回调函数
int main() {
    ThreadPool pool(4);

    pool.enqueue([]() -> int {
        std::cout << "Task running..." << std::endl;
        return 42;
    }, [](int result) {
        std::cout << "Result from callback: " << result << std::endl;
    });

    // ... 其他代码 ...
}

总结:线程池的“葵花宝典”

今天的讲座,我们深入探讨了C++线程池中的异常处理和任务结果收集。 记住以下几点:

  • 异常处理是关键: 使用try-catch块捕获任务中的异常,避免线程池崩溃。
  • std::future是好帮手: 使用std::future获取任务结果,并处理可能抛出的异常。
  • 灵活运用高级技巧: 使用std::shared_future共享结果,使用std::promise手动设置结果,使用回调函数处理结果。

掌握了这些技巧,你就可以打造一个稳定、高效、可靠的C++线程池,让你的程序如虎添翼!

结束语:代码之路,永无止境

代码的世界充满挑战,也充满乐趣。 希望今天的讲座能对你有所帮助。 记住,学习永无止境,让我们一起在代码的海洋里乘风破浪,勇往直前! 感谢各位的收看,咱们下期再见!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注