什么是 ‘Awaitable’ 协议?深度拆解 `co_await` 展开后的 `ready`, `suspend`, `resume` 状态流转

各位同仁,大家好!

今天,我们将深入探讨 C++ 协程中一个至关重要的概念——’Awaitable’ 协议。协程的引入,彻底改变了 C++ 在异步编程领域的面貌,使得我们能够以同步代码的直观方式来编写复杂的异步逻辑,告别了回调地狱和状态机手动管理。而 co_await 操作符,正是这一切魔法的核心,它允许我们暂停一个协程的执行,等待某个异步操作完成,并在操作完成后恢复。

co_await 背后究竟发生了什么?它又是如何做到这一切的?答案就隐藏在 ‘Awaitable’ 协议及其三个关键方法:await_readyawait_suspendawait_resume 中。今天,我将带领大家深度拆解 co_await 操作符的展开过程,理解这些状态流转的精妙之处。


引言:协程与异步编程的基石

在传统的异步编程中,我们常常面临两大挑战:

  1. 回调地狱(Callback Hell):当多个异步操作需要串联执行时,代码会嵌套多层回调函数,导致逻辑难以阅读、理解和维护。
  2. 线程模型开销:虽然多线程可以实现并发,但线程的创建、销毁和上下文切换都有不小的开销,对于大量轻量级并发任务并不高效。

C++20 引入的协程(Coroutines)为这些问题提供了一个优雅的解决方案。协程是一种用户态的轻量级并发机制,它允许函数在执行过程中暂停,并在稍后从暂停点恢复执行,而无需阻塞底层线程。C++ 协程通过三个关键字实现其核心功能:

  • co_await:暂停当前协程,等待一个 Awaitable 对象完成。
  • co_yield:暂停当前协程,并返回一个值,类似于生成器。
  • co_return:结束当前协程,并返回一个值或表示完成。

今天,我们的焦点将完全集中在 co_await。理解 co_await 的工作机制,特别是它如何与 Awaitable 协议交互,是掌握 C++ 协程的关键。


Awaitable 协议的概述

在 C++ 协程中,任何可以作为 co_await 操作数表达式(expr)的类型,都被称为是 Awaitable 的。Awaitable 并不是一个具体的接口或基类,而是一个概念协议。编译器通过对 expr 进行模式匹配来判断它是否满足 Awaitable 协议。

一个表达式 expr 被认为是 Awaitable 的,如果它或者由它返回的 Awaiter 类型,满足以下条件之一:

  1. expr 本身是一个 Awaiter,也就是说,它直接提供了 await_ready()await_suspend()await_resume() 这三个成员函数。
  2. expr 有一个 operator co_await() 成员函数或自由函数,该函数返回一个 Awaiter 对象。

通常情况下,我们倾向于第二种模式,即让 Awaitable 类型通过 operator co_await() 返回一个独立的 Awaiter 对象。这样做的好处是:

  • 分离关注点:Awaitable 对象可以携带任务的描述信息,而 Awaiter 则专注于协程的挂起和恢复逻辑。
  • 生命周期管理:Awaiter 对象可以在 co_await 表达式的生命周期内存在,而不会影响原始 Awaitable 对象的生命周期。

无论是哪种情况,核心都在于 Awaiter 类型必须提供以下三个方法:

方法签名 描述
bool await_ready() const noexcept 检查异步操作是否已经完成。如果返回 true,表示操作已就绪,协程无需挂起,直接跳到 await_resume()。如果返回 false,表示操作未就绪,协程需要挂起,将继续调用 await_suspend()
void await_suspend(std::coroutine_handle<> h) noexcept
bool await_suspend(std::coroutine_handle<> h) noexcept
std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept
await_ready() 返回 false 时调用。负责:
1. 挂起当前协程 h
2. 将协程句柄 h 保存起来,通常在异步操作完成时用于恢复协程。
3. 安排恢复协程的机制(例如,将 h 注册到事件循环或调度器)。
返回值有三种形式,将在后续详细解释。
ReturnType await_resume() noexcept 当协程从挂起状态恢复时调用。负责:
1. 获取异步操作的最终结果。
2. 处理可能发生的异常。
其返回值将成为 co_await 表达式的结果。

理解这三个方法的调用顺序和作用,是理解 co_await 展开机制的关键。


co_await 操作符的语义展开

现在,我们来揭开 co_await expr;ResultType res = co_await expr; 背后编译器是如何将其转换为一系列对 Awaiter 方法调用的。

假设我们有一个协程 MyCoroutine,并在其中使用了 co_await expr;。编译器会将其大致展开为以下伪代码:

// 假设 expr 是一个 Awaitable 对象
// 1. 获取 Awaiter 对象
auto&& awaiter = get_awaiter(expr); // 如果 expr 是 Awaiter,则直接是 expr;
                                    // 否则是 expr.operator co_await() 或 operator co_await(expr) 的结果。

// 2. 调用 await_ready() 检查是否需要挂起
if (!awaiter.await_ready()) {
    // 3. 如果未就绪,则需要挂起。
    //    将当前协程的句柄传递给 await_suspend()。
    //    此处的 h 代表当前正在执行的协程的 std::coroutine_handle<P>。

    // 编译器在这里插入一个关键的“挂起点”:
    // 当前协程帧的状态被保存,控制权暂时移交给 await_suspend。

    // 伪代码表示的挂起逻辑:
    // auto&& await_suspend_result = awaiter.await_suspend(std::coroutine_handle<P>::from_promise(this->promise));

    // 根据 await_suspend_result 的类型和值,可能会有不同的行为:
    // - void: 协程被挂起,控制权返回给 awaiter.await_suspend 的调用者(通常是协程的调度器或父协程)。
    // - bool: 如果返回 true,协程被挂起;如果返回 false,协程不挂起,直接继续执行 await_resume。
    // - coroutine_handle<>: 协程被挂起,控制权转移到 await_suspend 返回的 coroutine_handle 指定的协程。

    // 这里我们先假设最常见的 void 返回值,表示协程被挂起。
    // 控制流会在此处暂停,直到外部机制(例如事件循环或另一个线程)
    // 通过调用 h.resume() 来恢复当前协程。

    // 当协程被恢复时,它会从这里继续执行。
}

// 4. 协程恢复后(或者如果 await_ready() 返回 true,则直接),调用 await_resume() 获取结果。
//    如果 `co_await expr;` 有返回值,则赋值给 `res`。
ResultType res = awaiter.await_resume(); // 如果没有返回值,则直接调用。

这张图景清晰地展示了 co_await 并非一个简单的函数调用,而是一个复杂的控制流转移和状态管理的机制。接下来,我们将逐一深入探讨 await_readyawait_suspendawait_resume 的细节。


深入拆解:await_ready() – 准备就绪检查

await_ready() 方法是 Awaitable 协议的第一个入口点。它的核心职责是:判断异步操作是否已经完成,或者是否可以同步完成,从而决定当前协程是否需要挂起。

  • 签名bool await_ready() const noexcept;
  • 返回值
    • true:表示异步操作已经就绪,或者可以立即完成,协程无需挂起。编译器将跳过 await_suspend(),直接调用 await_resume()。这是一种优化,避免了不必要的上下文切换开销。
    • false:表示异步操作尚未就绪,协程需要挂起。编译器将继续调用 await_suspend()

典型场景:

  1. 缓存命中:如果你正在等待的数据可能已经存在于缓存中,await_ready() 可以检查缓存。如果命中,则返回 true,直接从缓存中获取数据。
  2. 同步完成的操作:有些“异步”操作实际上可能在调用时就已经完成了(例如,一个线程池任务如果线程池已满,可能会同步执行)。
  3. 调度优化:在某些调度器中,如果任务可以立即执行(例如,没有其他高优先级任务),await_ready() 也可以返回 true

代码示例:一个简单的同步 Awaitable

假设我们有一个 SyncOperation,它总是能够立即提供结果,无需真正挂起。

#include <iostream>
#include <string>
#include <coroutine> // 包含协程相关的头文件

// 1. 定义一个 Awaiter 类
class SyncAwaiter {
public:
    explicit SyncAwaiter(const std::string& msg) : message_(msg) {}

    // 总是返回 true,表示操作已就绪,无需挂起
    bool await_ready() const noexcept {
        std::cout << "SyncAwaiter::await_ready() called. Returning true (no suspend)." << std::endl;
        return true;
    }

    // 如果 await_ready() 返回 true,此方法将不会被调用
    void await_suspend(std::coroutine_handle<> h) noexcept {
        // 通常这里会保存句柄并安排恢复,但对于 SyncAwaiter,这不会发生
        std::cout << "SyncAwaiter::await_suspend() called. (This should not happen if await_ready() is true)" << std::endl;
        // 理论上这里不应该被执行到,如果被执行到,说明逻辑有问题
        h.resume(); // 仅为演示,如果真的被调用了,立即恢复
    }

    // 获取操作结果
    std::string await_resume() noexcept {
        std::cout << "SyncAwaiter::await_resume() called. Returning result." << std::endl;
        return "Synchronous result for: " + message_;
    }

private:
    std::string message_;
};

// 2. 定义一个 Awaitable 类,它通过 operator co_await 返回 SyncAwaiter
class SyncAwaitable {
public:
    explicit SyncAwaitable(const std::string& msg) : message_(msg) {}

    // 返回一个 SyncAwaiter 实例
    SyncAwaiter operator co_await() const noexcept {
        std::cout << "SyncAwaitable::operator co_await() called." << std::endl;
        return SyncAwaiter(message_);
    }

private:
    std::string message_;
};

// 3. 定义一个 Task 类型作为协程的返回对象
//    这里为了简化,我们只定义一个基本框架
template<typename T>
struct Task {
    struct promise_type {
        T value_;
        std::exception_ptr exception_;
        std::coroutine_handle<promise_type> get_return_object() {
            return std::coroutine_handle<promise_type>::from_promise(*this);
        }
        std::suspend_always initial_suspend() { 
            std::cout << "Task::initial_suspend()" << std::endl;
            return {}; 
        }
        std::suspend_always final_suspend() noexcept { 
            std::cout << "Task::final_suspend()" << std::endl;
            return {}; 
        }
        void return_value(T value) { value_ = value; }
        void unhandled_exception() { exception_ = std::current_exception(); }
    };

    std::coroutine_handle<promise_type> handle_;
    explicit Task(std::coroutine_handle<promise_type> h) : handle_(h) {}
    Task(Task&& other) noexcept : handle_(std::exchange(other.handle_, nullptr)) {}
    ~Task() { 
        if (handle_) {
            std::cout << "Task destructor: destroying coroutine handle." << std::endl;
            handle_.destroy(); 
        }
    }

    T get_result() {
        if (handle_) {
            // 确保协程已经执行完毕
            if (!handle_.done()) {
                handle_.resume(); // 恢复以完成执行
            }
            if (handle_.promise().exception_) {
                std::rethrow_exception(handle_.promise().exception_);
            }
            return handle_.promise().value_;
        }
        throw std::runtime_error("Task is empty or moved.");
    }
};

// 4. 协程函数
Task<std::string> perform_sync_task() {
    std::cout << "perform_sync_task: before co_await" << std::endl;
    std::string result = co_await SyncAwaitable("Hello Sync");
    std::cout << "perform_sync_task: after co_await, result = " << result << std::endl;
    co_return result;
}

int main() {
    std::cout << "main: Calling perform_sync_task()" << std::endl;
    Task<std::string> task = perform_sync_task();
    std::cout << "main: perform_sync_task() returned, now getting result." << std::endl;
    // 协程初始挂起,需要手动resume
    task.handle_.resume(); 
    std::string final_result = task.get_result();
    std::cout << "main: Final result from task: " << final_result << std::endl;
    return 0;
}

输出:

main: Calling perform_sync_task()
Task::initial_suspend()
main: perform_sync_task() returned, now getting result.
perform_sync_task: before co_await
SyncAwaitable::operator co_await() called.
SyncAwaiter::await_ready() called. Returning true (no suspend).
SyncAwaiter::await_resume() called. Returning result.
perform_sync_task: after co_await, result = Synchronous result for: Hello Sync
Task::final_suspend()
main: Final result from task: Synchronous result for: Hello Sync
Task destructor: destroying coroutine handle.

从输出可以看出,await_ready() 返回 true 后,await_suspend() 确实被跳过了,直接进入了 await_resume()。这完美展示了 await_ready() 的优化作用。


深入拆解:await_suspend() – 挂起与调度

await_ready() 返回 false 时,表示异步操作尚未完成,协程需要挂起。此时,编译器会调用 await_suspend()。这个方法是 co_await 机制的核心,因为它负责实际的协程挂起和调度。

  • 签名与返回值await_suspend 有三种可能的返回值类型,这提供了极大的灵活性:

    1. void await_suspend(std::coroutine_handle<> h) noexcept;

      • 最常见的用法。表示当前协程 h 被挂起,控制权将返回给调用 await_suspend 的上下文(通常是协程的调度器或父协程)。await_suspend 的实现者负责在异步操作完成后,通过调用 h.resume() 来恢复协程。
    2. bool await_suspend(std::coroutine_handle<> h) noexcept;

      • 返回 true:与 void 返回值行为相同,协程 h 被挂起,控制权返回给调用者。
      • 返回 false:这是一个高级优化。它表示虽然 await_ready() 返回了 false,但在 await_suspend 的执行过程中,异步操作奇迹般地完成了,或者出于某种原因,协程不应该被挂起,而是应该立即恢复。此时,编译器会跳过真正的挂起操作,直接继续执行 await_resume()。这在某些竞态条件下非常有用,可以避免一次不必要的上下文切换。
    3. std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept;

      • 返回一个 std::coroutine_handle<> 句柄 next_h。这表示当前协程 h 被挂起,但控制权不会返回给调用者,而是直接转移next_h 所代表的协程。这种机制可以用于实现协程间的直接切换,例如工作窃取(work-stealing)调度器。如果返回 h 本身,则表示不挂起当前协程,直接恢复它(类似于 bool 返回 false)。如果返回一个空句柄,行为未定义。
  • 参数std::coroutine_handle<P> h

    • h 是一个指向当前正在执行 co_await 的协程的句柄。P 是当前协程的 promise_type。这个句柄是协程的唯一标识,也是恢复协程的唯一途径。

典型场景:

  1. 异步I/O操作:当发起一个网络请求或文件读取时,操作不会立即完成。await_suspend() 会将当前协程句柄保存起来,注册到I/O事件循环中,然后返回,让出CPU。当I/O操作完成后,事件循环会通过保存的句柄来恢复协程。
  2. 线程池调度:将协程任务提交到线程池。await_suspend() 会将协程句柄封装成一个可执行的任务,添加到线程池的队列中。当线程池中的某个线程空闲时,会取出任务并调用 h.resume()
  3. 延迟操作:等待一个定时器到期。await_suspend() 会将协程句柄注册到定时器管理器中,当时间到达时,定时器管理器会恢复协程。

代码示例:一个简单的异步 Awaitable(使用 void 返回值)

我们将模拟一个延迟操作,使用一个简单的全局调度器来恢复协程。

#include <iostream>
#include <string>
#include <coroutine>
#include <thread>
#include <chrono>
#include <vector>
#include <functional>
#include <queue>
#include <mutex>

// 1. 简单的全局调度器
class SimpleScheduler {
public:
    static SimpleScheduler& get_instance() {
        static SimpleScheduler instance;
        return instance;
    }

    void schedule(std::function<void()> task) {
        std::lock_guard<std::mutex> lock(mtx_);
        tasks_.push(std::move(task));
        cv_.notify_one();
    }

    void run() {
        std::unique_lock<std::mutex> lock(mtx_);
        while (!stop_) {
            cv_.wait(lock, [this]{ return !tasks_.empty() || stop_; });
            if (stop_) break;

            auto task = tasks_.front();
            tasks_.pop();
            lock.unlock(); // 释放锁以执行任务,避免死锁或长时间持有锁
            task();
            lock.lock(); // 重新获取锁
        }
        std::cout << "Scheduler stopped." << std::endl;
    }

    void stop() {
        std::lock_guard<std::mutex> lock(mtx_);
        stop_ = true;
        cv_.notify_all();
    }

private:
    SimpleScheduler() = default;
    ~SimpleScheduler() = default;
    SimpleScheduler(const SimpleScheduler&) = delete;
    SimpleScheduler& operator=(const SimpleScheduler&) = delete;

    std::queue<std::function<void()>> tasks_;
    std::mutex mtx_;
    std::condition_variable cv_;
    bool stop_ = false;
};

// 2. 定义一个异步 Awaiter
class DelayAwaiter {
public:
    explicit DelayAwaiter(std::chrono::milliseconds delay_ms) : delay_(delay_ms) {}

    bool await_ready() const noexcept {
        // 对于模拟的延迟,通常不会立即就绪,除非延迟为0
        std::cout << "DelayAwaiter::await_ready() called." << std::endl;
        return delay_.count() == 0;
    }

    void await_suspend(std::coroutine_handle<> h) noexcept {
        std::cout << "DelayAwaiter::await_suspend() called. Suspending for " 
                  << delay_.count() << "ms. Coroutine handle: " << h.address() << std::endl;
        // 保存协程句柄,并在延迟后恢复
        std::thread([h, delay = delay_] {
            std::this_thread::sleep_for(delay);
            std::cout << "Delay finished. Resuming coroutine handle: " << h.address() << std::endl;
            SimpleScheduler::get_instance().schedule([h]{
                if (h) h.resume(); // 在调度器线程中恢复协程
            });
        }).detach(); // 启动一个独立线程来等待并调度恢复
    }

    // 假设延迟操作没有返回值,或者返回一个确认状态
    void await_resume() noexcept {
        std::cout << "DelayAwaiter::await_resume() called." << std::endl;
    }

private:
    std::chrono::milliseconds delay_;
};

// 3. 定义一个 Awaitable 类
class Delay {
public:
    explicit Delay(std::chrono::milliseconds delay_ms) : delay_ms_(delay_ms) {}

    DelayAwaiter operator co_await() const noexcept {
        std::cout << "Delay::operator co_await() called." << std::endl;
        return DelayAwaiter(delay_ms_);
    }

private:
    std::chrono::milliseconds delay_ms_;
};

// 4. 定义一个 Task 类型作为协程的返回对象
//    这里为了简化,我们只定义一个基本框架
template<typename T>
struct Task {
    struct promise_type {
        T value_;
        std::exception_ptr exception_;
        std::coroutine_handle<promise_type> get_return_object() {
            return std::coroutine_handle<promise_type>::from_promise(*this);
        }
        std::suspend_always initial_suspend() { 
            std::cout << "Task::initial_suspend()" << std::endl;
            return {}; 
        }
        std::suspend_always final_suspend() noexcept { 
            std::cout << "Task::final_suspend()" << std::endl;
            return {}; 
        }
        void return_value(T value) { value_ = value; }
        void unhandled_exception() { exception_ = std::current_exception(); }
    };

    std::coroutine_handle<promise_type> handle_;
    explicit Task(std::coroutine_handle<promise_type> h) : handle_(h) {}
    Task(Task&& other) noexcept : handle_(std::exchange(other.handle_, nullptr)) {}
    ~Task() { 
        if (handle_) {
            std::cout << "Task destructor: destroying coroutine handle." << std::endl;
            handle_.destroy(); 
        }
    }

    T get_result() {
        if (handle_) {
            // 协程可能还在挂起,需要确保它运行完毕
            // 在实际应用中,这里会通过调度器等待任务完成
            while (!handle_.done()) {
                 // For this example, we'll actively resume if not done.
                 // In a real system, the scheduler would handle this.
                 std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 防止忙等待
                 if (!handle_.done()) { // 再次检查,可能已经由其他线程恢复
                    SimpleScheduler::get_instance().schedule([h = handle_]{
                        if (h && !h.done()) h.resume(); // 确保在调度器线程中恢复
                    });
                 }
            }
            if (handle_.promise().exception_) {
                std::rethrow_exception(handle_.promise().exception_);
            }
            return handle_.promise().value_;
        }
        throw std::runtime_error("Task is empty or moved.");
    }

    // 用于启动协程的方便方法
    void start() {
        if (handle_ && !handle_.done()) {
            SimpleScheduler::get_instance().schedule([h = handle_]{
                if (h && !h.done()) h.resume();
            });
        }
    }
};

// 5. 协程函数
Task<std::string> perform_async_delay_task() {
    std::cout << "perform_async_delay_task: Before first co_await (500ms)" << std::endl;
    co_await Delay(std::chrono::milliseconds(500));
    std::cout << "perform_async_delay_task: After first co_await (500ms)" << std::endl;

    std::cout << "perform_async_delay_task: Before second co_await (200ms)" << std::endl;
    co_await Delay(std::chrono::milliseconds(200));
    std::cout << "perform_async_delay_task: After second co_await (200ms)" << std::endl;

    co_return "Async tasks completed!";
}

int main() {
    std::cout << "main: Starting scheduler thread." << std::endl;
    std::thread scheduler_thread([]{ SimpleScheduler::get_instance().run(); });

    std::cout << "main: Calling perform_async_delay_task()" << std::endl;
    Task<std::string> task = perform_async_delay_task();
    task.start(); // 恢复初始挂起的协程

    std::cout << "main: perform_async_delay_task() returned, main thread continues..." << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 给调度器一些时间

    std::string final_result = task.get_result(); // 等待并获取结果
    std::cout << "main: Final result from task: " << final_result << std::endl;

    SimpleScheduler::get_instance().stop();
    scheduler_thread.join();

    return 0;
}

输出(大致顺序,实际可能因线程调度而异):

main: Starting scheduler thread.
main: Calling perform_async_delay_task()
Task::initial_suspend()
main: perform_async_delay_task() returned, main thread continues...
perform_async_delay_task: Before first co_await (500ms)
Delay::operator co_await() called.
DelayAwaiter::await_ready() called.
DelayAwaiter::await_suspend() called. Suspending for 500ms. Coroutine handle: 0x...
// main thread sleeps and then calls get_result() which might resume the task if it hasn't completed
// Scheduler thread is running in the background...
// ... (some delay) ...
Delay finished. Resuming coroutine handle: 0x...
DelayAwaiter::await_resume() called.
perform_async_delay_task: After first co_await (500ms)
perform_async_delay_task: Before second co_await (200ms)
Delay::operator co_await() called.
DelayAwaiter::await_ready() called.
DelayAwaiter::await_suspend() called. Suspending for 200ms. Coroutine handle: 0x...
// ... (some delay) ...
Delay finished. Resuming coroutine handle: 0x...
DelayAwaiter::await_resume() called.
perform_async_delay_task: After second co_await (200ms)
Task::final_suspend()
main: Final result from task: Async tasks completed!
Task destructor: destroying coroutine handle.
Scheduler stopped.

从输出中可以看到,当 await_ready() 返回 false 后,await_suspend() 被调用,协程被挂起。主线程继续执行,而一个新的线程被创建用于模拟异步延迟。当延迟结束后,该线程将协程句柄提交给调度器,调度器最终调用 h.resume() 恢复了协程的执行,从而继续执行 co_await 之后的代码。这清晰地展示了 await_suspend() 如何实现协程的挂起和外部调度。


深入拆解:await_resume() – 恢复与结果获取

当协程从挂起状态被恢复时,或者当 await_ready() 返回 true 导致协程无需挂起时,await_resume() 方法就会被调用。它是 co_await 表达式的最后一个阶段。

  • 签名ReturnType await_resume() noexcept;
  • 返回值
    • ReturnTypeawait_resume() 的返回值就是整个 co_await expr 表达式的最终结果。如果 co_await 表达式没有被赋值给任何变量,那么 ReturnType 可以是 void
    • 这个返回值通常是从异步操作中获取的实际数据,或者是一个表示操作成功的状态。

典型场景:

  1. 获取异步操作结果:例如,从一个 std::futurestd::promise 中获取异步计算的结果。
  2. 错误处理:检查异步操作是否成功。如果操作失败,await_resume() 可以抛出异常,让协程内部的 try-catch 块捕获处理,或者返回一个错误码/状态。
  3. 资源清理:在某些情况下,可以在获取结果后进行一些轻量级的清理工作。

代码示例:在异步 Awaitable 中获取结果

我们将修改之前的 DelayAwaiter,让它在延迟完成后返回一个字符串消息。

#include <iostream>
#include <string>
#include <coroutine>
#include <thread>
#include <chrono>
#include <vector>
#include <functional>
#include <queue>
#include <mutex>
#include <future> // 用于从异步线程获取结果

// ... (SimpleScheduler, Task promise_type and Task class remain the same) ...
// 为了保持文章长度和焦点,这里省略重复的 SimpleScheduler 和 Task 的定义。
// 假设它们已经定义如上文。

// 定义一个异步 Awaiter,能够返回结果
class DelayResultAwaiter {
public:
    explicit DelayResultAwaiter(std::chrono::milliseconds delay_ms, const std::string& message)
        : delay_(delay_ms), message_(message) {}

    bool await_ready() const noexcept {
        std::cout << "DelayResultAwaiter::await_ready() called." << std::endl;
        return delay_.count() == 0;
    }

    void await_suspend(std::coroutine_handle<> h) noexcept {
        std::cout << "DelayResultAwaiter::await_suspend() called. Suspending for " 
                  << delay_.count() << "ms. Coroutine handle: " << h.address() << std::endl;

        // 使用 std::promise 和 std::future 来在异步线程中设置结果
        // 并在主协程恢复时通过 await_resume 获取
        std::promise<std::string> p;
        future_ = p.get_future(); // 保存 future 以便 await_resume 使用

        std::thread([h, delay = delay_, msg = message_, p = std::move(p)]() mutable {
            std::this_thread::sleep_for(delay);
            std::cout << "Delay finished. Setting result and resuming coroutine handle: " << h.address() << std::endl;
            p.set_value("Delayed result for: " + msg); // 设置结果
            SimpleScheduler::get_instance().schedule([h]{
                if (h) h.resume(); // 在调度器线程中恢复协程
            });
        }).detach();
    }

    // 获取操作结果
    std::string await_resume() noexcept {
        std::cout << "DelayResultAwaiter::await_resume() called. Getting result from future." << std::endl;
        // 等待并获取异步线程设置的结果
        // 注意:这里调用 .get() 会阻塞当前(恢复协程的)线程,
        // 在实际应用中,如果 future 还没有就绪,可能需要更复杂的非阻塞等待机制。
        // 但由于我们是在 await_suspend 中确保了恢复时结果已就绪,这里是安全的。
        return future_.get(); 
    }

private:
    std::chrono::milliseconds delay_;
    std::string message_;
    std::shared_future<std::string> future_; // 使用 shared_future 以便可以拷贝或多次访问
};

// 3. 定义一个 Awaitable 类
class DelayWithResult {
public:
    explicit DelayWithResult(std::chrono::milliseconds delay_ms, const std::string& message)
        : delay_ms_(delay_ms), message_(message) {}

    DelayResultAwaiter operator co_await() const noexcept {
        std::cout << "DelayWithResult::operator co_await() called." << std::endl;
        return DelayResultAwaiter(delay_ms_, message_);
    }

private:
    std::chrono::milliseconds delay_ms_;
    std::string message_;
};

// 4. 协程函数
Task<std::string> perform_async_task_with_result() {
    std::cout << "perform_async_task_with_result: Before first co_await (300ms)" << std::endl;
    std::string result1 = co_await DelayWithResult(std::chrono::milliseconds(300), "First delay");
    std::cout << "perform_async_task_with_result: After first co_await, result1 = " << result1 << std::endl;

    std::cout << "perform_async_task_with_result: Before second co_await (100ms)" << std::endl;
    std::string result2 = co_await DelayWithResult(std::chrono::milliseconds(100), "Second delay");
    std::cout << "perform_async_task_with_result: After second co_await, result2 = " << result2 << std::endl;

    co_return result1 + " | " + result2;
}

int main() {
    std::cout << "main: Starting scheduler thread." << std::endl;
    std::thread scheduler_thread([]{ SimpleScheduler::get_instance().run(); });

    std::cout << "main: Calling perform_async_task_with_result()" << std::endl;
    Task<std::string> task = perform_async_task_with_result();
    task.start(); // 恢复初始挂起的协程

    std::cout << "main: perform_async_task_with_result() returned, main thread continues..." << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 给调度器一些时间

    std::string final_result = task.get_result(); // 等待并获取结果
    std::cout << "main: Final result from task: " << final_result << std::endl;

    SimpleScheduler::get_instance().stop();
    scheduler_thread.join();

    return 0;
}

输出(大致顺序):

main: Starting scheduler thread.
main: Calling perform_async_task_with_result()
Task::initial_suspend()
main: perform_async_task_with_result() returned, main thread continues...
perform_async_task_with_result: Before first co_await (300ms)
DelayWithResult::operator co_await() called.
DelayResultAwaiter::await_ready() called.
DelayResultAwaiter::await_suspend() called. Suspending for 300ms. Coroutine handle: 0x...
// ... (some delay) ...
Delay finished. Setting result and resuming coroutine handle: 0x...
DelayResultAwaiter::await_resume() called. Getting result from future.
perform_async_task_with_result: After first co_await, result1 = Delayed result for: First delay
perform_async_task_with_result: Before second co_await (100ms)
DelayWithResult::operator co_await() called.
DelayResultAwaiter::await_ready() called.
DelayResultAwaiter::await_suspend() called. Suspending for 100ms. Coroutine handle: 0x...
// ... (some delay) ...
Delay finished. Setting result and resuming coroutine handle: 0x...
DelayResultAwaiter::await_resume() called. Getting result from future.
perform_async_task_with_result: After second co_await, result2 = Delayed result for: Second delay
Task::final_suspend()
main: Final result from task: Delayed result for: First delay | Delayed result for: Second delay
Task destructor: destroying coroutine handle.
Scheduler stopped.

这个例子清晰地展示了 await_resume() 如何从异步操作中获取最终结果,并将其作为 co_await 表达式的值返回。


Awaiter 类型与定制 Awaitable

在上面的例子中,我们看到了 Awaitable 类型 (如 Delay) 通过 operator co_await() 返回一个 Awaiter 类型 (如 DelayAwaiter)。这是一种推荐的模式,因为它将 Awaitable 的描述与 Awaiter 的行为逻辑解耦。

  • Awaitable:通常是一个轻量级的对象,它描述了一个异步操作,例如一个网络请求的参数、一个文件路径和一个操作类型。它本身可能不直接包含协程挂起/恢复的逻辑。
  • Awaiter:这是一个更重量级的对象,它拥有实际的 await_readyawait_suspendawait_resume 方法。它通常会捕获 Awaitable 对象的信息,并管理协程句柄的生命周期。

通过 operator co_await 机制,我们可以:

  1. 惰性创建 Awaiter:只有当 expr 真正被 co_await 时,operator co_await 才会被调用,从而创建 Awaiter 对象。
  2. 资源管理:Awaiter 可以持有异步操作所需的临时资源,并在 await_resume 完成后或析构时释放。
  3. 泛型 Awaitable:一个 Awaitable 类型可以为不同的协程上下文(通过 std::coroutine_traits)提供不同的 operator co_await 重载,从而返回定制的 Awaiter。

协程句柄 (std::coroutine_handle) 详解

std::coroutine_handle 是 C++ 协程的核心抽象,它是对协程帧(coroutine frame)的非拥有型引用。协程帧是编译器为协程创建的一块内存区域,用于存储协程的局部变量、参数、当前暂停点以及 promise_type 实例。

  • std::coroutine_handle<P>:这是一个带有 promise_type 模板参数的协程句柄。P 是协程的 promise_type 类型。通过它可以访问协程的 promise_type 实例 (h.promise())。
  • std::coroutine_handle<void>std::coroutine_handle<>:这是一个不带 promise_type 信息的通用协程句柄,只能用于恢复 (resume()) 或销毁 (destroy()) 协程,但不能访问 promise_type。在 await_suspend 中,通常使用 std::coroutine_handle<> 作为参数,因为它不关心具体的 promise_type

关键方法:

  • h.resume():恢复协程的执行。协程将从上次暂停的位置继续。
  • h.destroy():销毁协程帧,释放其占用的内存。调用 h.destroy() 后,h 变得无效。通常在协程完全结束后(通过 final_suspendco_return)由返回对象负责调用。
  • h.done():检查协程是否已经执行完毕。返回 true 表示协程已完成,不能再被恢复。
  • std::coroutine_handle<P>::from_promise(P& promise):从 promise_type 实例获取其对应的协程句柄。这是协程内部获取自身句柄的唯一方式。

await_suspend 接收 std::coroutine_handle<> 参数,意味着它可以在不了解协程具体类型的情况下,调度任何协程的恢复。这是协程调度器设计的基础。


promise_type 与 Awaitable 的协同

promise_type 是协程帧中的一个关键组成部分,它定义了协程的生命周期行为以及与外部世界的交互方式。Awaitable 协议与 promise_type 紧密协作,共同管理协程的执行。

promise_type 提供了一系列定制点:

  • get_return_object():在协程函数调用时立即调用,用于创建并返回协程的“返回对象”(例如 Task<T>std::future 等)。这个对象是协程与外部世界通信的桥梁。
  • initial_suspend():协程体真正执行前调用。它的返回值必须是一个 Awaitable。
    • 如果 initial_suspend().await_ready()true,协程会立即开始执行。
    • 如果 initial_suspend().await_ready()false,协程会立即挂起(通常返回 std::suspend_always{}),等待外部调用 handle.resume() 来启动。这使得我们可以在协程创建后,延迟其执行,进行一些初始化或调度。
  • final_suspend():协程体执行完毕(无论是通过 co_return、异常还是执行到末尾)后调用。它的返回值也必须是一个 Awaitable。
    • 如果 final_suspend().await_ready()true,协程帧会被立即销毁。
    • 如果 final_suspend().await_ready()false,协程会再次挂起(通常返回 std::suspend_always{}),允许返回对象在销毁协程帧之前,获取最终结果或进行清理。这对于实现类似于 std::future 的语义非常重要,因为它允许在结果被消费后才销毁协程。
  • return_value(T val) / return_void():处理 co_return 语句。return_value 用于有返回值的协程,return_void 用于返回 void 的协程。它们通常用于将协程的结果存储在 promise_type 中,以便 get_return_object 返回的对象能够访问。
  • unhandled_exception():当协程内部抛出未捕获的异常时调用。它通常用于将异常存储在 promise_type 中,以便返回对象能够重新抛出该异常。

initial_suspend()final_suspend() 本身就是 Awaitable,这意味着它们遵循 await_readyawait_suspendawait_resume 的协议。这为协程的启动和结束提供了极大的灵活性和可定制性。


Awaitable 协议的强大与精妙

通过深入拆解 co_await 展开后的 await_readyawait_suspendawait_resume 状态流转,我们看到了 C++ 协程如何将复杂的异步操作封装成简洁、线性的代码。Awaitable 协议是 C++ 协程机制的基石,它提供了一个强大的可扩展框架,使得开发者能够:

  • 实现各种异步原语:无论是异步I/O、定时器、线程池任务,还是更复杂的并发模式,都可以通过实现 Awaitable 协议来与 co_await 无缝集成。
  • 细粒度控制协程行为:通过 await_ready 进行同步路径优化,通过 await_suspend 控制挂起和调度,通过 await_resume 处理结果和异常。
  • 构建高效的调度器await_suspend 中的 coroutine_handle 使得调度器能够完全控制协程的生命周期和执行顺序。

C++ 协程的引入,标志着 C++ 在异步和并发编程领域迈出了重要一步。理解 Awaitable 协议的内在机制,将帮助我们更好地设计和实现高性能、高可维护性的并发系统。它提供了一种优雅的方式,让我们能够以人类更易理解的方式,来表达那些原本需要复杂状态机或回调链条才能实现的异步逻辑。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注