C++20 协程内核:探究无栈协程(Stackless Coroutines)的状态机拆解与堆对称性

C++20 协程内核:探究无栈协程的状态机拆解与堆对称性

引言:为什么需要协程?

在现代软件开发中,处理并发和异步操作已成为常态。传统的并发模型主要围绕线程展开,但线程存在一些固有的挑战。首先,线程的创建和上下文切换开销相对较大,尤其是在需要大量并发任务的场景下,性能瓶颈显而易见。其次,多线程编程常常伴随着复杂的同步机制(互斥锁、条件变量等),这不仅增加了编程的复杂性,也极易引入死锁、竞态条件等难以调试的问题。最后,基于回调(callback)的异步编程模式,虽然避免了线程的直接管理,却常常导致臭名昭著的“回调地狱”(Callback Hell),代码可读性和可维护性急剧下降。

为了应对这些挑战,协程(Coroutines)应运而生。协程是一种用户态的轻量级并发原语,它允许函数在执行过程中暂停,并在稍后从暂停点恢复执行,而无需阻塞调用线程。与线程不同,协程的调度完全由程序员控制,其上下文切换开销极小,通常只涉及少量寄存器和栈指针的保存与恢复。最重要的是,协程允许我们以近乎同步的、顺序的编码风格来表达异步逻辑,极大地提升了代码的清晰度和可维护性,有效解决了回调地狱问题。

C++20 标准正式引入了协程支持,这标志着协程从库级解决方案(如Boost.Context、libco)上升到了语言和编译器层面。C++220协程是一种“无栈协程”(Stackless Coroutines),这意味着协程的暂停和恢复状态不依赖于独立的调用栈,而是由编译器将协程函数转换为一个状态机,并将所有必要的上下文数据(如局部变量、参数、当前执行点)存储在一个特殊的内存区域——协程帧(Coroutine Frame)中,这个帧通常分配在堆上。本文将深入探讨C++20无栈协程的内部机制,特别是其状态机拆解原理和堆对称性。

C++20 协程基础概念

C++20 协程的实现依赖于三个核心关键字和一套库类型:

  • co_await: 用于暂停当前协程,等待一个可等待(Awaitable)对象的完成,并在完成后恢复执行。它是协程实现异步操作的关键。
  • co_yield: 用于暂停当前协程,并向调用者返回一个值。它主要用于实现生成器(Generator)模式。
  • co_return: 用于结束协程的执行,并返回一个最终结果(或无结果)。

除了关键字,C++20 协程还需要一些特定的类型来定义其行为:

  1. 协程句柄 (std::coroutine_handle):
    这是一个轻量级的、非拥有型的类型,用于操纵一个已暂停的协程。通过协程句柄,我们可以检查协程是否已经完成,或者恢复(resume())它的执行,甚至销毁(destroy())它。

    template<typename Promise = void>
    struct coroutine_handle;

    Promise 是协程承诺类型,通过它我们可以访问协程帧中的承诺对象。

  2. 承诺类型 (Promise Type):
    每个协程函数都关联一个承诺类型,它定义了协程与外部世界交互的接口和行为。编译器通过承诺类型的方法来控制协程的生命周期、返回值、异常处理以及暂停/恢复逻辑。承诺类型是协程机制的核心,它必须提供以下方法(或其变体):

    • initial_suspend(): 协程体执行前的初始暂停行为。
    • final_suspend(): 协程体执行完成后的最终暂停行为。
    • get_return_object(): 获取协程的返回对象(通常是 TaskFuture 类型),这是协程函数签名的实际返回类型。
    • return_void() / return_value(T): 处理 co_return 语句。
    • unhandled_exception(): 处理协程内部未捕获的异常。
    • yield_value(T): 处理 co_yield 语句。
  3. 可等待对象 (Awaitable) 与等待器 (Awaiter):
    co_await 一个表达式时,该表达式的结果必须是一个可等待对象。可等待对象负责提供等待器。等待器是实际定义 co_await 操作行为的类型,它必须实现以下三个方法:

    • await_ready(): 检查是否需要暂停。如果返回 true,协程不暂停,立即执行 await_resume()
    • await_suspend(std::coroutine_handle<>): 执行暂停操作。它接收当前协程的句柄,允许等待器在暂停期间保存该句柄,以便稍后恢复。
    • await_resume(): 在协程恢复后执行,用于获取 co_await 表达式的结果。
  4. 协程的返回类型 (Return Type):
    协程函数的声明必须返回一个可被 std::coroutine_traits 特化的类型,或者该类型本身就是 std::coroutine_handlevoid。这个返回类型通常是一个封装了协程状态或结果的 TaskFuture 对象。编译器会通过 Promise::get_return_object() 方法来构造这个返回对象。

无栈协程的本质:状态机拆解

无栈协程的核心思想在于,它不为每个协程分配一个独立的运行时栈。相反,编译器会深度介入,将协程函数转换为一个复杂的状态机。这个状态机包含了协程的所有暂停点、恢复逻辑以及局部变量和参数的存储。

栈帧的限制与无栈的必要性

在传统的函数调用中,每个函数调用都会在调用栈上创建一个栈帧,存储函数的参数、局部变量、返回地址等信息。当函数返回时,其栈帧被销毁。有栈协程(如Go语言的goroutines或Boost.Context的fibers)会为每个协程维护一个独立的栈,当协程切换时,整个栈的状态都会被保存和恢复。这种方式简单直观,但每个协程都需要一个相对较大的固定大小的栈空间(即使大部分时间是空的),并且栈的切换开销也比无栈协程大。

无栈协程之所以“无栈”,是因为它的生命周期可以独立于其调用者的栈帧。一个协程可能在其调用函数返回后很久才恢复执行,这意味着协程的局部变量和参数不能存储在调用者的栈上。为了解决这个问题,编译器会将所有需要在协程暂停和恢复之间持久化的数据(即协程的“上下文”)从调用栈中“提升”出来,统一存储在一个特殊的内存区域,这个区域被称为“协程帧”(Coroutine Frame)。

编译器魔法:协程函数到状态机的转换

当编译器遇到一个包含 co_awaitco_yieldco_return 关键字的函数时,它会识别出这是一个协程。然后,它会执行以下关键转换步骤:

  1. 识别暂停点: 编译器会找出所有的 co_awaitco_yield 语句,这些都是潜在的暂停点。
  2. 提取持久化状态: 所有在协程暂停后可能需要被访问的局部变量、参数以及 this 指针(对于成员函数)都会被识别出来,并从栈上移动到协程帧中。
  3. 生成状态机: 协程函数体被重写为一个隐式的状态机。每个暂停点对应一个状态。当协程暂停时,状态机记录下当前的状态(即下一个要执行的语句)。当协程恢复时,状态机根据记录的状态跳转到正确的执行点。
  4. 创建协程帧: 编译器会为每个协程实例生成一个协程帧结构。这个结构体包含了:
    • 协程的承诺对象(Promise)。
    • 所有需要持久化的局部变量和参数。
    • 一个表示当前协程状态的字段(通常是一个整数或枚举)。
    • 其他内部管理信息。

概念模型: 考虑一个简单的协程函数:

Task my_coroutine(int initial_value) {
    std::cout << "Coroutine started with: " << initial_value << std::endl;
    co_await std::suspend_always{}; // 暂停点1
    std::cout << "Coroutine resumed after suspend_always." << std::endl;
    int x = initial_value + 10;
    co_await std::suspend_never{}; // 暂停点2 (实际不暂停)
    std::cout << "Coroutine resumed after suspend_never. x = " << x << std::endl;
    co_return x * 2;
}

这个协程在编译器眼中可能被转换为类似于以下伪代码的结构:

// 伪协程帧结构
struct CoroutineFrame {
    // 承诺对象
    MyPromise promise;
    // 持久化参数
    int initial_value;
    // 持久化局部变量
    int x; // 在 co_await std::suspend_always{} 之后才定义,但编译器可能将其提升
    // 状态指示器
    int state; // 0: 初始, 1: suspend_always之后, 2: suspend_never之后, -1: 完成

    // 伪协程执行函数
    void resume() {
        switch (state) {
            case 0: // 初始状态
                std::cout << "Coroutine started with: " << initial_value << std::endl;
                // 执行 initial_suspend()
                // ...
                // co_await std::suspend_always{}; 的 await_suspend 部分
                state = 1; // 记录下一个状态
                return; // 暂停并返回
            case 1: // 从 suspend_always 恢复
                // co_await std::suspend_always{}; 的 await_resume 部分
                std::cout << "Coroutine resumed after suspend_always." << std::endl;
                x = initial_value + 10;
                // co_await std::suspend_never{}; 的 await_ready() 返回 true
                // 故不暂停,直接执行 await_resume()
                // state = 2; // 实际不会暂停到这里
            case 2: // 从 suspend_never 恢复 (理论上不会到这里)
                // co_await std::suspend_never{}; 的 await_resume 部分
                std::cout << "Coroutine resumed after suspend_never. x = " << x << std::endl;
                // co_return x * 2;
                promise.return_value(x * 2);
                state = -1; // 标记完成
                // 执行 final_suspend()
                // ...
                return; // 结束协程
        }
    }
};

上述伪代码展示了编译器如何通过一个 state 变量来跟踪协程的执行位置,并通过 switch 语句在恢复时跳转到正确的位置。所有需要在暂停点之间保持的变量都被放入 CoroutineFrame 中。

协程帧的内存布局与堆对称性

C++20 无栈协程的另一个关键特性是其“堆对称性”(Heap Symmetry)。这意味着协程帧的生命周期与堆内存的分配和释放紧密相关。

谁分配协程帧?

协程帧的内存分配是由协程的承诺类型(Promise Type)控制的。当一个协程函数被调用时,编译器会首先尝试创建一个承诺对象。如果承诺类型定义了静态成员函数 operator new,编译器会调用这个重载的 operator new 来为协程帧分配内存。否则,将使用全局 operator new 进行分配。

具体来说,编译器在协程函数调用时,会执行以下步骤:

  1. 分配协程帧内存: 调用 Promise::operator new (如果存在) 或全局 operator new 来分配足够的内存,以容纳协程帧(包括 Promise 对象本身、持久化变量、参数和内部状态)。
  2. 构造承诺对象: 在分配的内存中构造 Promise 对象。
  3. 获取返回对象: 调用 Promise::get_return_object() 方法,该方法通常返回一个 TaskFuture 对象,这个对象会被返回给协程的调用者。
  4. 执行 initial_suspend: 调用 Promise::initial_suspend() 方法,决定协程是否在开始执行前暂停。

如果内存分配失败,承诺类型还可以通过 Promise::get_return_object_on_allocation_failure() 提供一个备用的返回对象。

示例:自定义协程帧内存分配

为了观察和控制协程帧的内存分配,我们可以重载 Promise 类型的 operator newoperator delete

#include <iostream>
#include <coroutine>
#include <vector>
#include <string>
#include <memory> // For std::addressof

// 定义一个简单的 Task 类型,用于协程的返回
template <typename T>
struct MyTask {
    struct promise_type;
    using handle_type = std::coroutine_handle<promise_type>;

    handle_type coro_handle;

    MyTask(handle_type h) : coro_handle(h) {
        std::cout << "MyTask: Coroutine handle created." << std::endl;
    }

    MyTask(MyTask&& other) noexcept : coro_handle(other.coro_handle) {
        other.coro_handle = nullptr;
        std::cout << "MyTask: Coroutine handle moved." << std::endl;
    }

    ~MyTask() {
        if (coro_handle) {
            std::cout << "MyTask: Coroutine handle destroyed, attempting to destroy coroutine frame." << std::endl;
            coro_handle.destroy(); // 销毁协程帧
        }
    }

    T get_result() {
        if (!coro_handle || !coro_handle.done()) {
            throw std::runtime_error("Coroutine not finished yet!");
        }
        return coro_handle.promise().result_value;
    }

    void resume() {
        if (coro_handle && !coro_handle.done()) {
            coro_handle.resume();
        }
    }

    struct promise_type {
        T result_value;

        // 自定义内存分配器
        static void* operator new(std::size_t size) {
            std::cout << "Promise_type: Allocating " << size << " bytes for coroutine frame on heap." << std::endl;
            return ::operator new(size);
        }

        static void operator delete(void* ptr, std::size_t size) {
            std::cout << "Promise_type: Deallocating " << size << " bytes for coroutine frame from heap at " << ptr << std::endl;
            ::operator delete(ptr);
        }

        MyTask get_return_object() {
            std::cout << "Promise_type: get_return_object called." << std::endl;
            return MyTask{handle_type::from_promise(*this)};
        }

        std::suspend_always initial_suspend() {
            std::cout << "Promise_type: initial_suspend called. Coroutine will suspend immediately." << std::endl;
            return {};
        }

        std::suspend_always final_suspend() noexcept {
            std::cout << "Promise_type: final_suspend called. Coroutine will suspend at end." << std::endl;
            return {};
        }

        void return_value(T value) {
            std::cout << "Promise_type: return_value called with: " << value << std::endl;
            result_value = value;
        }

        void unhandled_exception() {
            std::cerr << "Promise_type: Unhandled exception caught in coroutine." << std::endl;
            std::terminate();
        }
    };
};

MyTask<int> compute_something(int a, int b) {
    std::cout << "Coroutine 'compute_something' started. Params: a=" << a << ", b=" << b << std::endl;
    int sum = a + b;
    std::cout << "Current sum: " << sum << std::endl;
    co_await std::suspend_always{}; // 第一次暂停
    std::cout << "Coroutine 'compute_something' resumed after first suspend." << std::endl;
    int product = a * b;
    std::cout << "Current product: " << product << std::endl;
    co_await std::suspend_always{}; // 第二次暂停
    std::cout << "Coroutine 'compute_something' resumed after second suspend." << std::endl;
    co_return sum + product;
}

int main() {
    std::cout << "Main: Calling compute_something(5, 3)..." << std::endl;
    MyTask<int> task = compute_something(5, 3);
    std::cout << "Main: compute_something returned. Task object created." << std::endl;

    // 此时协程已暂停在 initial_suspend
    // 协程帧的地址可以通过 handle.address() 获取,但直接访问其内部结构是 UB
    // 我们可以观察到 Promise 对象的地址与 operator new 返回的地址相近
    std::cout << "Main: Coroutine is initially suspended. Resuming..." << std::endl;
    task.resume(); // 恢复协程到第一个 co_await 之后

    std::cout << "Main: Coroutine resumed once. Suspending main for a bit..." << std::endl;
    // 此时协程已暂停在第一个 co_await 之后

    task.resume(); // 恢复协程到第二个 co_await 之后
    std::cout << "Main: Coroutine resumed twice. Suspending main for a bit..." << std::endl;
    // 此时协程已暂停在第二个 co_await 之后

    task.resume(); // 恢复协程到 co_return 之后,然后到 final_suspend
    std::cout << "Main: Coroutine resumed three times. Checking result..." << std::endl;

    std::cout << "Main: Result from coroutine: " << task.get_result() << std::endl;
    std::cout << "Main: Coroutine finished." << std::endl;

    // task 析构时会调用 coro_handle.destroy(),进而调用 Promise::operator delete
    std::cout << "Main: Exiting main function." << std::endl;
    return 0;
}

输出示例(略有简化,实际地址可能不同):

Main: Calling compute_something(5, 3)...
Promise_type: Allocating 128 bytes for coroutine frame on heap. // 协程帧分配
Promise_type: get_return_object called.
MyTask: Coroutine handle created.
Promise_type: initial_suspend called. Coroutine will suspend immediately.
Main: compute_something returned. Task object created.
Main: Coroutine is initially suspended. Resuming...
Coroutine 'compute_something' started. Params: a=5, b=3
Current sum: 8
Coroutine 'compute_something' resumed after first suspend.
Main: Coroutine resumed once. Suspending main for a bit...
Coroutine 'compute_something' resumed after second suspend.
Current product: 15
Coroutine 'compute_something' resumed after third suspend.
Main: Coroutine resumed twice. Suspending main for a bit...
Promise_type: return_value called with: 23
Promise_type: final_suspend called. Coroutine will suspend at end.
Main: Coroutine resumed three times. Checking result...
Main: Result from coroutine: 23
Main: Coroutine finished.
Main: Exiting main function.
MyTask: Coroutine handle destroyed, attempting to destroy coroutine frame.
Promise_type: Deallocating 128 bytes for coroutine frame from heap at 0x... // 协程帧释放

从输出中可以看到,Promise::operator newoperator delete 被调用,证明了协程帧确实是在堆上分配和释放的。协程帧的实际大小由编译器根据协程内部需要持久化的数据自动计算。

堆分配的必要性

堆分配对于无栈协程至关重要,因为它解决了协程生命周期与函数调用栈生命周期不一致的问题。一个协程可以被创建,然后暂停,其创建者函数可能早已返回,其栈帧也已销毁。但协程本身可能在未来的某个时间点被恢复。如果协程的上下文数据(局部变量、参数等)存储在创建者的栈上,那么当创建者返回后,这些数据就会失效,导致悬空引用和未定义行为。

通过将协程帧分配到堆上,协程的生命周期不再受限于任何特定的栈帧。协程帧的生命周期由其 std::coroutine_handle 管理,当不再需要协程时,调用 handle.destroy() 即可安全地释放协程帧的内存。

生命周期管理:协程句柄与协程帧的绑定

std::coroutine_handle 是协程帧的“智能指针”,但它是一个非拥有型句柄。这意味着它本身不负责协程帧的分配和释放。通常,协程的返回类型(例如我们上面定义的 MyTask)会封装 std::coroutine_handle,并在其析构函数中调用 handle.destroy() 来确保协程帧被正确释放。

协程帧的生命周期流程:

  1. 创建: 调用协程函数时,通过 Promise::operator new 分配协程帧,构造 Promise 对象,并返回 MyTask 对象(包含 coroutine_handle)。
  2. 暂停: 协程在 initial_suspendco_await 处暂停,将控制权返回给调用者。此时,协程帧及其内部状态保持不变。
  3. 恢复: 调用 coroutine_handle::resume(),协程从上次暂停点恢复执行。
  4. 完成: 协程执行到 co_return 或抛出异常时,进入 final_suspend。此时,协程被标记为完成(handle.done() 返回 true)。
  5. 销毁: 当 MyTask 对象被销毁时,其析构函数调用 coroutine_handle::destroy()。这会:
    • 调用 Promise 对象的析构函数。
    • 调用 Promise::operator delete 释放协程帧的内存。

这种堆对称性确保了协程的独立性和正确的资源管理,是无栈协程得以实现的关键基石。

深入 co_await:等待器模式

co_await 是 C++20 协程中最核心的关键字,它实现了协程的暂停和恢复。当协程 co_await 一个表达式时,该表达式的结果必须是一个“可等待对象”(Awaitable)。这个可等待对象会通过 operator co_awaitawait_transform 方法提供一个“等待器”(Awaiter)。等待器是实际执行暂停/恢复逻辑的对象。

等待器必须实现以下三个方法(或其变体):

  1. await_ready(): bool await_ready() const noexcept;
    在协程暂停之前调用。如果返回 true,表示等待的对象已经准备好,无需暂停,协程将直接执行 await_resume() 并继续。这通常用于优化,避免不必要的上下文切换。如果返回 false,协程将暂停。

  2. await_suspend(): void await_suspend(std::coroutine_handle<> caller_coro_handle) noexcept;bool await_suspend(...)std::coroutine_handle<> await_suspend(...)
    await_ready() 返回 false 时调用。这是协程实际暂停的地方。

    • caller_coro_handle 参数是当前正在 co_await 的协程的句柄。等待器应该保存这个句柄,以便在异步操作完成后恢复该协程。
    • 返回值行为:
      • void: await_suspend 必须将控制权转移到某个地方(例如,调度到另一个协程或线程),或者协程将永远不会恢复。
      • bool: 如果返回 false,表示暂停失败,协程会立即恢复。如果返回 true,表示成功暂停,协程将等待被恢复。
      • std::coroutine_handle<>: 返回一个句柄,控制权将转移到该句柄所代表的协程。这允许等待器在暂停当前协程的同时,立即切换到另一个协程。
  3. await_resume(): T await_resume() noexcept;
    当协程被恢复时调用。此方法返回 co_await 表达式的结果。如果异步操作失败,此方法可以在这里抛出异常。

自定义调度器示例:基于线程池的异步任务

为了更好地理解 co_await 的工作原理,我们来构建一个简单的异步任务调度器,将协程的恢复调度到线程池中执行。

#include <iostream>
#include <coroutine>
#include <thread>
#include <future>
#include <queue>
#include <mutex>
#include <functional>
#include <condition_variable>

// ------------------- 线程池实现 -------------------
class ThreadPool {
public:
    ThreadPool(size_t num_threads) : stop(false) {
        for (size_t i = 0; i < num_threads; ++i) {
            workers.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
                        if (this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    task();
                }
            });
        }
    }

    template <class F, class... Args>
    void enqueue(F&& f, Args&&... args) {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            if (stop)
                throw std::runtime_error("enqueue on stopped ThreadPool");
            tasks.emplace(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        }
        condition.notify_one();
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread& worker : workers)
            worker.join();
    }

private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

// 全局线程池实例
ThreadPool g_thread_pool(std::thread::hardware_concurrency() > 0 ? std::thread::hardware_concurrency() : 2);

// ------------------- 调度器 awaitable/awaiter -------------------
struct Scheduler {
    // Awaitable 自身就是 Awaiter
    bool await_ready() const noexcept {
        return false; // 总是暂停,以便调度
    }

    void await_suspend(std::coroutine_handle<> h) const {
        // 将协程的恢复操作提交到线程池
        g_thread_pool.enqueue([h]() mutable {
            std::cout << "[" << std::this_thread::get_id() << "] Resuming coroutine." << std::endl;
            h.resume();
        });
        std::cout << "[" << std::this_thread::get_id() << "] Coroutine suspended, scheduled for resume." << std::endl;
    }

    void await_resume() const noexcept {
        // 无返回值,只是恢复执行
    }
};

// ------------------- 协程 Task 类型 -------------------
template <typename T>
struct Task {
    struct promise_type;
    using handle_type = std::coroutine_handle<promise_type>;

    handle_type coro_handle;
    std::optional<T> result_value;
    std::exception_ptr exception;

    Task(handle_type h) : coro_handle(h) {}
    Task(Task&& other) noexcept : coro_handle(other.coro_handle) { other.coro_handle = nullptr; }
    Task& operator=(Task&& other) noexcept {
        if (this != &other) {
            if (coro_handle) coro_handle.destroy();
            coro_handle = other.coro_handle;
            other.coro_handle = nullptr;
        }
        return *this;
    }

    ~Task() {
        if (coro_handle) {
            coro_handle.destroy();
        }
    }

    T get() {
        if (!coro_handle) {
            throw std::runtime_error("Task has been moved or destroyed.");
        }
        while (!coro_handle.done()) {
            // 在实际应用中,这里应该有更复杂的等待机制,例如事件循环或同步等待
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
        if (exception) {
            std::rethrow_exception(exception);
        }
        return result_value.value();
    }

    void resume() {
        if (coro_handle && !coro_handle.done()) {
            coro_handle.resume();
        }
    }

    struct promise_type {
        Task get_return_object() {
            return Task{handle_type::from_promise(*this)};
        }

        std::suspend_always initial_suspend() { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; }

        void return_value(T value) {
            handle_type::from_promise(*this).coro_handle.result_value = value;
        }

        void unhandled_exception() {
            handle_type::from_promise(*this).coro_handle.exception = std::current_exception();
        }
    };
};

// ------------------- 协程函数 -------------------
Task<int> async_compute(int x, int y) {
    std::cout << "[" << std::this_thread::get_id() << "] async_compute: Starting, x=" << x << ", y=" << y << std::endl;
    co_await Scheduler{}; // 第一次暂停,调度到线程池
    std::cout << "[" << std::this_thread::get_id() << "] async_compute: Resumed after first suspend." << std::endl;
    int sum = x + y;
    co_await Scheduler{}; // 第二次暂停,调度到线程池
    std::cout << "[" << std::this_thread::get_id() << "] async_compute: Resumed after second suspend." << std::endl;
    int product = x * y;
    co_return sum + product;
}

int main() {
    std::cout << "[" << std::this_thread::get_id() << "] main: Calling async_compute." << std::endl;
    Task<int> result_task = async_compute(10, 20);

    std::cout << "[" << std::this_thread::get_id() << "] main: async_compute returned, initial suspend." << std::endl;

    // 第一次恢复,将调度到线程池执行
    result_task.resume();

    std::cout << "[" << std::this_thread::get_id() << "] main: After first resume call." << std::endl;

    // 第二次恢复,将调度到线程池执行
    // 注意:这里的resume是立即返回的,协程的实际执行在线程池中异步进行
    result_task.resume();

    std::cout << "[" << std::this_thread::get_id() << "] main: After second resume call." << std::endl;

    // 第三次恢复,将调度到线程池执行,直到 co_return
    result_task.resume();

    std::cout << "[" << std::this_thread::get_id() << "] main: After third resume call, waiting for result." << std::endl;

    // 等待并获取结果
    try {
        int final_result = result_task.get();
        std::cout << "[" << std::this_thread::get_id() << "] main: Final result: " << final_result << std::endl;
    } catch (const std::exception& e) {
        std::cerr << "[" << std::this_thread::get_id() << "] main: Error: " << e.what() << std::endl;
    }

    std::cout << "[" << std::this_thread::get_id() << "] main: Exiting." << std::endl;
    return 0;
}

在这个例子中:

  • Scheduler 是一个可等待对象,它的 await_suspend 方法将当前协程的句柄 h 捕获到一个 lambda 中,然后将这个 lambda 作为一个任务提交给 g_thread_poolawait_suspend 返回 void,表示它将控制权完全转移给线程池,当前协程在调用线程上暂停。
  • 当线程池中的某个工作线程执行这个 lambda 时,它会调用 h.resume(),从而恢复协程的执行。
  • await_resume 在协程恢复后立即执行,它在这里没有实际操作,因为 Scheduler 只是用于调度。

通过这种方式,我们实现了协程在不同线程之间的无缝切换和调度,极大地简化了异步编程的复杂性,避免了手动管理线程和同步原语。

co_yield:生成器模式

co_yield 关键字用于实现生成器(Generator)模式,它允许协程在每次产生一个值时暂停,并将控制权返回给调用者,待下次请求时再从暂停点恢复,继续生成下一个值。这与 co_await 的主要区别在于,co_yield 总是暂停并返回一个值,而 co_await 通常是等待一个异步操作完成。

为了支持 co_yield,协程的 Promise Type 必须实现 yield_value(T value) 方法。

表:co_yield 相关 Promise Type 方法

| 方法签名 | 描述

    #include <iostream>
    #include <coroutine>
    #include <thread>
    #include <chrono>
    #include <optional>
    #include <exception>

    // 1. 定义 Promise Type 所需的 Task 返回类型
    template <typename T>
    struct Generator {
        struct promise_type;
        using handle_type = std::coroutine_handle<promise_type>;

        handle_type coro_handle;
        std::optional<T> current_value; // 用于存储 co_yield 的值

        Generator(handle_type h) : coro_handle(h) {}
        Generator(Generator&& other) noexcept : coro_handle(other.coro_handle) {
            other.coro_handle = nullptr;
        }
        Generator& operator=(Generator&& other) noexcept {
            if (this != &other) {
                if (coro_handle) coro_handle.destroy();
                coro_handle = other.coro_handle;
                other.coro_handle = nullptr;
            }
            return *this;
        }
        ~Generator() {
            if (coro_handle) {
                coro_handle.destroy();
            }
        }

        // 迭代器接口,以便于在 range-based for 循环中使用
        struct Iterator {
            handle_type current_handle;

            // 构造函数:默认构造表示 end() 迭代器
            Iterator() : current_handle(nullptr) {}
            Iterator(handle_type h) : current_handle(h) {}

            // 前置递增操作符:恢复协程以获取下一个值
            Iterator& operator++() {
                current_handle.resume();
                if (current_handle.done()) {
                    current_handle = nullptr; // 标记为结束
                }
                return *this;
            }

            // 解引用操作符:获取当前产生的值
            T operator*() const {
                return current_handle.promise().value_storage;
            }

            // 判等操作符:判断是否到达结束
            bool operator==(const Iterator& other) const {
                return current_handle == other.current_handle;
            }
            bool operator!=(const Iterator& other) const {
                return !(*this == other);
            }
        };

        Iterator begin() {
            if (coro_handle) {
                coro_handle.resume(); // 第一次恢复,执行到第一个 co_yield 或 co_return
                if (coro_handle.done()) {
                    return Iterator{}; // 如果协程一上来就结束,返回 end 迭代器
                }
            }
            return Iterator{coro_handle};
        }

        Iterator end() {
            return Iterator{}; // 默认构造的迭代器表示结束
        }

        struct promise_type {
            T value_storage; // 用于存储 co_yield 产生的值

            Generator get_return_object() {
                return Generator{handle_type::from_promise(*this)};
            }

            // initial_suspend 总是暂停,等待 begin() 调用 resume()
            std::suspend_always initial_suspend() {
                std::cout << "  [Promise] initial_suspend: Suspending at start." << std::endl;
                return {};
            }

            // final_suspend 总是暂停,以便在 Generator 析构时安全销毁
            std::suspend_always final_suspend() noexcept {
                std::cout << "  [Promise] final_suspend: Suspending at end." << std::endl;
                return {};
            }

            // 处理 co_yield T;
            std::suspend_always yield_value(T value) {
                value_storage = value;
                std::cout << "  [Promise] yield_value: Storing " << value << ", suspending." << std::endl;
                return {}; // 暂停,将控制权返回给调用者
            }

            // 处理 co_return; (void)
            void return_void() {
                std::cout << "  [Promise] return_void: Coroutine finished." << std::endl;
            }

            void unhandled_exception() {
                std::cerr << "  [Promise] Unhandled exception in generator." << std::endl;
                std::terminate();
            }
        };
    };

    // 2. 协程函数使用 co_yield
    Generator<int> fibonacci(int n) {
        std::cout << "[Generator] fibonacci(" << n << "): Starting." << std::endl;
        if (n == 0) {
            co_return;
        }
        if (n == 1) {
            co_yield 0;
            co_return;
        }

        int a = 0, b = 1;
        co_yield a;
        co_yield b;

        for (int i = 2; i < n; ++i) {
            int next = a + b;
            co_yield next;
            a = b;
            b = next;
        }
        std::cout << "[Generator] fibonacci(" << n << "): Reached co_return." << std::endl;
    }

    int main() {
        std::cout << "[Main] Calling fibonacci(10)." << std::endl;
        Generator<int> gen = fibonacci(10); // 协程被创建,并立即在 initial_suspend 暂停
        std::cout << "[Main] Generator object created. Iterating..." << std::endl;

        int count = 0;
        for (int val : gen) { // begin() 会恢复协程到第一个 co_yield
            std::cout << "[Main] Received: " << val << std::endl;
            count++;
            if (count > 5) { // 提前退出循环,观察资源清理
                std::cout << "[Main] Breaking loop after 5 elements." << std::endl;
                break;
            }
        }
        std::cout << "[Main] Loop finished." << std::endl;

        // gen 析构时会调用 coro_handle.destroy(),释放协程帧
        std::cout << "[Main] Exiting main." << std::endl;
        return 0;
    }

输出示例:

[Main] Calling fibonacci(10).
  [Promise] initial_suspend: Suspending at start.
[Main] Generator object created. Iterating...
[Generator] fibonacci(10): Starting.
  [Promise] yield_value: Storing 0, suspending.
[Main] Received: 0
  [Promise] yield_value: Storing 1, suspending.
[Main] Received: 1
  [Promise] yield_value: Storing 1, suspending.
[Main] Received: 1
  [Promise] yield_value: Storing 2, suspending.
[Main] Received: 2
  [Promise] yield_value: Storing 3, suspending.
[Main] Received: 3
  [Promise] yield_value: Storing 5, suspending.
[Main] Received: 5
[Main] Breaking loop after 5 elements.
[Main] Loop finished.
[Main] Exiting main.
  [Promise] final_suspend: Suspending at end. // 如果协程提前销毁,这里的 final_suspend 可能不会被调用,取决于协程是否已完成。

注意: 如果在循环中提前 break,协程的 destroy() 会被调用,它会直接销毁协程帧,而不会让协程继续执行到 final_suspend。只有当协程自然执行到 co_return 并且 final_suspend 返回 std::suspend_always 时,协程才会暂停在 final_suspend

这个 Generator 示例展示了如何通过 co_yield 配合 promise_type::yield_value 实现一个可迭代的协程。yield_value 方法在存储值后返回 std::suspend_always,确保协程暂停并将控制权交还给调用者。调用者通过 ++ 运算符(即 Iterator::operator++)恢复协程,直到协程完成 (coro_handle.done())。

co_return:结果与异常处理

co_return 关键字用于结束协程的执行,并可以返回一个值。它对应于 Promise Type 中的 return_void()return_value(T value) 方法。同时,协程还需要一种机制来处理内部发生的未捕获异常,这通过 Promise Type 中的 unhandled_exception() 方法实现。

表:co_return 和异常处理相关 Promise Type 方法

方法签名 描述
void return_void(); 当协程执行 co_return; 时调用。用于不返回值的协程。
void return_value(T value); 当协程执行 co_return expression; 时调用。expression 的类型必须可转换为 T。用于返回值的协程。
void unhandled_exception(); 当协程内部抛出但未捕获的异常时调用。通常在这里捕获异常并存储 std::exception_ptr,以便在协程外部重新抛出。
std::suspend_always final_suspend() noexcept; 协程体执行完成(无论是通过 co_return 还是未处理异常)后,但协程帧被销毁之前调用。它决定协程是否在最终点暂停。通常返回 std::suspend_always,允许外部控制协程的销毁时机。

获取协程结果的策略:

由于协程是异步的,其结果不能像普通函数那样直接返回。通常,协程的返回类型(例如 Task<T>)会封装协程的最终结果,并在协程完成后提供一种机制来获取它。

  1. 直接存储在 Promise 中:
    如我们 MyTask 示例所示,Promise 对象内部可以有一个 result_value 字段来存储 co_return 的值。当协程完成时,return_value 方法将值写入此字段。外部通过 Task::get_result() 访问。

  2. std::future 适配:
    可以设计 Promise Type 来与 std::promisestd::future 结合,将协程的结果设置到 std::promise 中。

    // 伪代码
    template<typename T>
    struct FutureTask {
        struct promise_type {
            std::promise<T> p;
            FutureTask get_return_object() { return FutureTask{p.get_future()}; }
            std::suspend_never initial_suspend() { return {}; }
            std::suspend_always final_suspend() { return {}; }
            void return_value(T value) { p.set_value(value); }
            void unhandled_exception() { p.set_exception(std::current_exception()); }
        };
        std::future<T> fut;
        // ... 构造函数等
    };
  3. 自定义 Task 类型:
    这是最常见的方式,如本文示例中的 TaskGenerator。自定义 Task 类型可以封装协程句柄,并提供 get() 或迭代器接口来获取结果。它还可以包含 std::optional<T> 来存储值,以及 std::exception_ptr 来存储异常,从而实现完整的错误传播。

异常处理:

当协程内部发生未捕获的异常时,编译器会自动调用 Promise::unhandled_exception()。在这个方法中,我们通常会捕获当前的异常(std::current_exception())并将其存储起来,以便在协程的返回对象(如 Task<T>)被外部代码 get() 时重新抛出。这确保了协程中的异常不会静默丢失,而是能够被调用者处理。

// 示例中的 Task::promise_type::unhandled_exception 实现
void unhandled_exception() {
    handle_type::from_promise(*this).coro_handle.exception = std::current_exception();
}

通过这种机制,C++20 协程为异步操作提供了健壮的错误处理能力,使得异常能够像在同步代码中一样自然地传播。

协程与异步编程模式

C++20 协程为构建各种异步编程模式提供了强大的原语。它们可以与现有的异步库(如 Boost.ASIO)无缝集成,也可以用于构建全新的、高效的并发模型。

  1. 任务(Task)与未来(Future)
    这是协程最常见的应用模式。一个协程函数返回一个 Task<T> 对象,这个 Task 封装了协程的执行状态和最终结果(或异常)。调用者可以 co_await 这个 Task 来等待其完成,或者在单独的线程中阻塞等待 (Task::get())。这与 std::future 的概念非常相似,但协程的 Task 通常具有更细粒度的控制能力,例如可以暂停和恢复。

  2. 事件循环集成
    在高性能网络服务或GUI应用中,事件循环是核心。协程可以很容易地与事件循环集成。await_suspend 方法可以将协程的恢复操作提交到事件队列中,当相应的事件触发时,事件循环调度器会从队列中取出协程句柄并 resume() 它。这种模式使得在单线程事件循环中编写复杂的异步逻辑变得像编写同步代码一样简单。

    例如,一个 co_await network_read(socket, buffer) 表达式可以在 await_suspend 中注册一个套接字读事件到事件循环。当数据可用时,事件循环回调会恢复这个协程,并从 await_resume 中返回读取的数据。

  3. 并发与并行
    协程是并发的构建块,但它们本身不是并行的。并发意味着多个任务在逻辑上同时进行,但不一定在物理上同时执行(例如,通过时间片轮转)。并行意味着多个任务在物理上同时执行(例如,在多个CPU核心上)。
    协程可以用于在一个线程上实现高效的并发,减少线程切换开销。如果需要并行执行,可以将协程的恢复操作调度到不同的线程池中,如我们 Scheduler 示例所示。通过将协程与线程池结合,可以实现既高效又易于管理的并发和并行。

最佳实践与注意事项

虽然 C++20 协程带来了巨大的便利和能力,但在使用时仍需注意一些最佳实践和潜在问题:

  1. 生命周期管理
    协程帧是堆分配的,其生命周期必须被妥善管理。协程的返回对象(如 TaskGenerator)通常负责在其析构时调用 coroutine_handle::destroy()。确保协程句柄不会悬空,并且在协程不再需要时及时销毁,以避免内存泄漏。当协程被 co_await 时,如果 await_suspend 返回 std::coroutine_handle<>,则由返回的句柄所代表的协程负责管理当前协程的生命周期。

  2. 异常安全
    Promise::unhandled_exception() 是协程内部异常的最后一道防线。务必正确实现它,捕获 std::current_exception() 并将其存储在 Task 对象中,以便外部能够重新抛出和处理。否则,未处理的协程异常可能导致程序终止或静默错误。

  3. 性能考量

    • 堆分配开销:每个协程实例都需要一次堆分配(协程帧)。对于极度细粒度的协程,这可能成为性能瓶颈。考虑是否可以使用更粗粒度的协程,或者为协程帧实现自定义的内存池分配器。
    • 上下文切换:虽然协程上下文切换比线程轻量,但频繁的 co_await 仍然会引入一些开销。在性能敏感的代码中,应权衡 co_await 的粒度。
    • await_ready() 优化:如果一个 awaitable 能够很快地完成,await_ready() 返回 true 可以避免不必要的暂停和恢复,从而提升性能。
  4. 调试挑战
    协程的执行流是非线性的,一个协程可能在不同的时间点、甚至不同的线程上被恢复。这使得传统的栈回溯(stack trace)变得不再直观。调试协程需要更高级的工具支持,例如集成开发环境(IDE)对协程状态的显示,或者自定义的日志和追踪机制。

  5. 避免混合编程模型
    尽量避免在同一个逻辑流中混合使用协程、回调和阻塞式API。选择一种主要的异步编程模型,并坚持使用它,以保持代码的一致性和可读性。

  6. std::coroutine_handle 的线程安全性
    std::coroutine_handle 本身是线程安全的,但 resume()destroy() 操作不是。通常,对同一个协程句柄的 resume()destroy() 调用应该被外部同步(例如,通过互斥锁或确保只有一个线程拥有恢复/销毁的权利)。

展望与结束语

C++20 协程的引入是 C++ 语言发展的一个重要里程碑。它为 C++ 开发者提供了一种优雅而强大的工具,用于构建高性能、可读性强的并发和异步系统。通过深入理解其无栈状态机拆解的本质和堆对称性的内存管理,我们可以更好地利用协程的潜力。

协程在现代 C++ 应用中扮演着越来越重要的角色,尤其是在网络编程、GUI 事件处理、游戏开发和高性能计算等领域。它使得以顺序思维编写复杂的异步逻辑成为可能,极大地提升了开发效率,并有助于构建更加健壮和响应迅速的系统。随着 C++23 和 C++26 对协程生态系统的进一步完善,我们有理由相信,协程将成为 C++ 中处理并发和异步任务的首选范式,引领下一代 C++ 应用程序的开发模式。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注