C++ `libdispatch` (Grand Central Dispatch) 在 C++ 中的思想借鉴

哈喽,各位好!今天咱们来聊聊一个挺有意思的话题:C++ 如何借鉴 libdispatch (Grand Central Dispatch) 的思想。如果你写过 iOS 或 macOS 应用,那对 GCD 肯定不陌生;就算没写过,听过“并发编程”总该有印象吧?GCD 就是苹果家用来搞并发的一把利器。

C++ 标准库虽然不像 GCD 那样直接提供一个完整的调度系统,但它也在不断演进,吸收并发编程的精华。咱们就来看看 C++ 在哪些方面借鉴了 GCD 的思想,以及如何用 C++ 实现类似的功能。

一、GCD 的核心思想:抽象、解耦与调度

要理解 C++ 如何借鉴 GCD,首先得明白 GCD 的核心思想是什么。GCD 的本质在于:

  1. 任务抽象: 将要执行的代码块(通常是闭包或函数对象)抽象成任务(dispatch_block_t)。
  2. 执行解耦: 将任务的定义与执行解耦,提交任务到队列,由系统决定何时、何地执行。
  3. 智能调度: 系统根据队列的类型(串行、并行)、优先级以及系统资源,智能地调度任务的执行。

这种思想带来的好处是显而易见的:

  • 简化并发编程: 开发者无需关心线程管理、锁机制等底层细节,只需关注任务的逻辑。
  • 提高程序性能: 系统可以根据硬件资源和任务依赖关系,优化任务的执行顺序和并发度。
  • 增强代码可维护性: 任务的定义和执行分离,使代码更易于理解和修改。

二、C++ 中的并发编程工具:std::threadstd::futurestd::asyncstd::packaged_task

C++11 引入了一系列并发编程工具,虽然没有直接复制 GCD 的 API,但却体现了类似的思想。

  • std::thread 最基础的线程类,允许你创建和管理线程。虽然功能强大,但需要手动处理线程同步和资源管理,比较底层。

    #include <iostream>
    #include <thread>
    
    void task(int id) {
        std::cout << "Task " << id << " is running in thread " << std::this_thread::get_id() << std::endl;
    }
    
    int main() {
        std::thread t1(task, 1);
        std::thread t2(task, 2);
    
        t1.join();
        t2.join();
    
        std::cout << "Main thread finished." << std::endl;
        return 0;
    }
  • std::futurestd::promise 用于在线程之间传递结果。std::promise 用于设置结果,std::future 用于获取结果,它们共同实现了一种异步通信机制。

    #include <iostream>
    #include <future>
    #include <thread>
    
    int calculate_sum(int a, int b) {
        std::cout << "Calculating sum in thread " << std::this_thread::get_id() << std::endl;
        return a + b;
    }
    
    int main() {
        std::promise<int> promise;
        std::future<int> future = promise.get_future();
    
        std::thread t([&promise]() {
            int result = calculate_sum(10, 20);
            promise.set_value(result);
        });
    
        int sum = future.get(); // 阻塞,直到结果可用
        std::cout << "Sum is " << sum << std::endl;
    
        t.join();
        return 0;
    }
  • std::async 提供了一种更便捷的方式来异步执行任务。它会自动创建一个线程(或者使用线程池),执行给定的函数,并返回一个 std::future 对象,用于获取结果。

    #include <iostream>
    #include <future>
    
    int calculate_product(int a, int b) {
        std::cout << "Calculating product in thread " << std::this_thread::get_id() << std::endl;
        return a * b;
    }
    
    int main() {
        std::future<int> future = std::async(std::launch::async, calculate_product, 5, 8); // 强制异步执行
    
        std::cout << "Waiting for result..." << std::endl;
        int product = future.get(); // 阻塞,直到结果可用
        std::cout << "Product is " << product << std::endl;
    
        return 0;
    }
  • std::packaged_task 将函数或可调用对象包装成一个任务,并将其结果与 std::future 关联。这允许你将任务提交到线程池或自定义的调度器中执行。

    #include <iostream>
    #include <future>
    #include <thread>
    #include <queue>
    #include <mutex>
    #include <condition_variable>
    
    class ThreadPool {
    public:
        ThreadPool(size_t num_threads) : stop_(false) {
            for (size_t i = 0; i < num_threads; ++i) {
                threads_.emplace_back([this]() {
                    while (true) {
                        std::function<void()> task;
    
                        {
                            std::unique_lock<std::mutex> lock(queue_mutex_);
                            condition_.wait(lock, [this]() { return stop_ || !tasks_.empty(); });
                            if (stop_ && tasks_.empty()) {
                                return;
                            }
                            task = std::move(tasks_.front());
                            tasks_.pop();
                        }
    
                        task();
                    }
                });
            }
        }
    
        template<class F, class... Args>
        auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
            using return_type = typename std::result_of<F(Args...)>::type;
    
            auto task = std::make_shared<std::packaged_task<return_type()>>(
                    std::bind(std::forward<F>(f), std::forward<Args>(args)...)
            );
    
            std::future<return_type> res = task->get_future();
            {
                std::unique_lock<std::mutex> lock(queue_mutex_);
                if (stop_) {
                    throw std::runtime_error("enqueue on stopped ThreadPool");
                }
                tasks_.emplace([task]() { (*task)(); });
            }
            condition_.notify_one();
            return res;
        }
    
        ~ThreadPool() {
            {
                std::unique_lock<std::mutex> lock(queue_mutex_);
                stop_ = true;
            }
            condition_.notify_all();
            for (std::thread& thread : threads_) {
                thread.join();
            }
        }
    
    private:
        std::vector<std::thread> threads_;
        std::queue<std::function<void()>> tasks_;
        std::mutex queue_mutex_;
        std::condition_variable condition_;
        bool stop_;
    };
    
    int main() {
        ThreadPool pool(4);
    
        std::vector<std::future<int>> results;
        for (int i = 0; i < 8; ++i) {
            results.emplace_back(
                    pool.enqueue([i]() {
                        std::cout << "Task " << i << " is running in thread " << std::this_thread::get_id() << std::endl;
                        return i * i;
                    })
            );
        }
    
        for (auto& result : results) {
            std::cout << "Result: " << result.get() << std::endl;
        }
    
        return 0;
    }

三、C++ 如何借鉴 GCD 的思想:任务抽象与调度策略

虽然 C++ 没有像 GCD 那样直接提供队列的概念,但我们可以利用上述工具,模拟出类似的功能。

  • 任务抽象:使用 std::functionstd::packaged_task

    std::function 可以表示任何可调用对象(函数、lambda 表达式、函数对象),这使得我们可以将任务抽象成一个通用的类型。std::packaged_task 则更进一步,将任务与 std::future 关联,方便获取任务的结果。

    这就像 GCD 中的 dispatch_block_t,都可以代表一个要执行的代码块。

  • 执行解耦:使用线程池

    我们可以创建一个线程池,将任务提交到线程池中执行。线程池负责管理线程的创建和销毁,以及任务的调度。

    这类似于 GCD 中的队列,任务被提交到队列后,由 GCD 负责调度执行。

  • 智能调度:自定义调度器

    虽然 C++ 标准库没有提供内置的调度器,但我们可以自定义调度器,根据任务的优先级、依赖关系等因素,优化任务的执行顺序。

    这类似于 GCD 中不同类型的队列(串行、并行),以及不同的优先级。

四、C++ 实现类似 GCD 功能的示例

下面是一个简单的示例,演示如何使用 C++ 实现一个类似 GCD 的任务调度器。

#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <vector>
#include <future>

class TaskQueue {
public:
    using Task = std::function<void()>;

    void enqueue(Task task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(task);
        }
        condition_.notify_one();
    }

    Task dequeue() {
        std::unique_lock<std::mutex> lock(mutex_);
        condition_.wait(lock, [this]() { return !tasks_.empty(); });
        Task task = tasks_.front();
        tasks_.pop();
        return task;
    }

    bool empty() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return tasks_.empty();
    }

private:
    std::queue<Task> tasks_;
    std::mutex mutex_;
    std::condition_variable condition_;
};

class ThreadPool {
public:
    ThreadPool(size_t num_threads) : stop_(false) {
        for (size_t i = 0; i < num_threads; ++i) {
            threads_.emplace_back([this]() {
                while (true) {
                    if (stop_ && task_queue_.empty()) {
                        break;
                    }
                    TaskQueue::Task task;
                    try {
                        task = task_queue_.dequeue();
                        task();
                    } catch (const std::exception& e) {
                        std::cerr << "Exception in thread: " << e.what() << std::endl;
                    }

                }
            });
        }
    }

    template<typename F, typename... Args>
    auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
        using return_type = typename std::result_of<F(Args...)>::type;

        auto task = std::make_shared<std::packaged_task<return_type()>>(
                std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

        std::future<return_type> res = task->get_future();
        task_queue_.enqueue([task]() { (*task)(); });
        return res;
    }

    ~ThreadPool() {
        stop_ = true;
        for (auto& thread : threads_) {
            if (thread.joinable()) {
                thread.join();
            }
        }
    }

private:
    std::vector<std::thread> threads_;
    TaskQueue task_queue_;
    std::atomic<bool> stop_;
};

int main() {
    ThreadPool pool(4);

    for (int i = 0; i < 10; ++i) {
        pool.enqueue([i]() {
            std::cout << "Task " << i << " is running in thread " << std::this_thread::get_id() << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        });
    }

    std::this_thread::sleep_for(std::chrono::seconds(1)); // Wait for tasks to finish
    return 0;
}

五、C++20 和 C++23 的新特性:std::jthreadstd::stop_token 和协程

C++20 和 C++23 引入了一些新的特性,进一步增强了 C++ 的并发编程能力,也更贴近 GCD 的思想。

  • std::jthread 类似于 std::thread,但在析构时会自动 join,避免了忘记 join 导致的问题。此外,std::jthread 还支持 std::stop_token,可以优雅地停止线程。

    #include <iostream>
    #include <thread>
    #include <stop_token>
    
    void worker(std::stop_token stop_token) {
        while (!stop_token.stop_requested()) {
            std::cout << "Working..." << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
        std::cout << "Worker stopped." << std::endl;
    }
    
    int main() {
        std::jthread t(worker);
        std::this_thread::sleep_for(std::chrono::seconds(1));
        t.request_stop(); // 请求停止线程
        return 0;
    }
  • 协程: 协程是一种轻量级的并发机制,允许你在函数执行过程中暂停和恢复,而无需创建新的线程。协程可以提高程序的并发性和响应性,尤其是在 I/O 密集型应用中。

    C++ 的协程支持还在不断发展,但已经可以用于实现一些有趣的并发模式。

六、总结:C++ 并发编程的未来

C++ 正在不断吸收并发编程的精华,虽然没有直接复制 GCD 的 API,但却在思想上有很多共通之处。通过 std::threadstd::futurestd::asyncstd::packaged_taskstd::jthread 和协程等工具,我们可以构建出灵活、高效的并发应用。

特性 GCD C++
任务抽象 dispatch_block_t std::function, std::packaged_task
执行解耦 Dispatch Queues 线程池, std::async
智能调度 系统自动调度 可以自定义调度器,但不如 GCD 强大
线程管理 系统自动管理 需要手动管理,但 std::jthread 简化了线程管理
异步通信 Block 和 Dispatch Groups std::future, std::promise
线程停止 dispatch_block_cancel std::stop_token (C++20)
轻量级并发 N/A 协程 (C++20)

当然,C++ 的并发编程仍然面临一些挑战:

  • 复杂性: C++ 的并发编程工具比较底层,需要开发者对线程、锁等概念有深入的理解。
  • 缺乏统一的调度器: C++ 标准库没有提供内置的调度器,需要开发者自行实现。
  • 错误处理: 并发程序容易出现死锁、竞争条件等问题,需要仔细设计和测试。

尽管如此,C++ 仍然是构建高性能并发应用的重要选择。随着 C++ 标准的不断演进,相信 C++ 的并发编程能力会越来越强大,越来越易于使用。

好了,今天的分享就到这里。希望大家对 C++ 如何借鉴 libdispatch 的思想有了更深入的了解。下次再见!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注