各位同仁,大家好!
今天,我们将深入探讨 C++ 协程中一个至关重要的概念——’Awaitable’ 协议。协程的引入,彻底改变了 C++ 在异步编程领域的面貌,使得我们能够以同步代码的直观方式来编写复杂的异步逻辑,告别了回调地狱和状态机手动管理。而 co_await 操作符,正是这一切魔法的核心,它允许我们暂停一个协程的执行,等待某个异步操作完成,并在操作完成后恢复。
但 co_await 背后究竟发生了什么?它又是如何做到这一切的?答案就隐藏在 ‘Awaitable’ 协议及其三个关键方法:await_ready、await_suspend 和 await_resume 中。今天,我将带领大家深度拆解 co_await 操作符的展开过程,理解这些状态流转的精妙之处。
引言:协程与异步编程的基石
在传统的异步编程中,我们常常面临两大挑战:
- 回调地狱(Callback Hell):当多个异步操作需要串联执行时,代码会嵌套多层回调函数,导致逻辑难以阅读、理解和维护。
- 线程模型开销:虽然多线程可以实现并发,但线程的创建、销毁和上下文切换都有不小的开销,对于大量轻量级并发任务并不高效。
C++20 引入的协程(Coroutines)为这些问题提供了一个优雅的解决方案。协程是一种用户态的轻量级并发机制,它允许函数在执行过程中暂停,并在稍后从暂停点恢复执行,而无需阻塞底层线程。C++ 协程通过三个关键字实现其核心功能:
co_await:暂停当前协程,等待一个 Awaitable 对象完成。co_yield:暂停当前协程,并返回一个值,类似于生成器。co_return:结束当前协程,并返回一个值或表示完成。
今天,我们的焦点将完全集中在 co_await。理解 co_await 的工作机制,特别是它如何与 Awaitable 协议交互,是掌握 C++ 协程的关键。
Awaitable 协议的概述
在 C++ 协程中,任何可以作为 co_await 操作数表达式(expr)的类型,都被称为是 Awaitable 的。Awaitable 并不是一个具体的接口或基类,而是一个概念协议。编译器通过对 expr 进行模式匹配来判断它是否满足 Awaitable 协议。
一个表达式 expr 被认为是 Awaitable 的,如果它或者由它返回的 Awaiter 类型,满足以下条件之一:
expr本身是一个 Awaiter,也就是说,它直接提供了await_ready()、await_suspend()和await_resume()这三个成员函数。expr有一个operator co_await()成员函数或自由函数,该函数返回一个 Awaiter 对象。
通常情况下,我们倾向于第二种模式,即让 Awaitable 类型通过 operator co_await() 返回一个独立的 Awaiter 对象。这样做的好处是:
- 分离关注点:Awaitable 对象可以携带任务的描述信息,而 Awaiter 则专注于协程的挂起和恢复逻辑。
- 生命周期管理:Awaiter 对象可以在
co_await表达式的生命周期内存在,而不会影响原始 Awaitable 对象的生命周期。
无论是哪种情况,核心都在于 Awaiter 类型必须提供以下三个方法:
| 方法签名 | 描述 |
|---|---|
bool await_ready() const noexcept |
检查异步操作是否已经完成。如果返回 true,表示操作已就绪,协程无需挂起,直接跳到 await_resume()。如果返回 false,表示操作未就绪,协程需要挂起,将继续调用 await_suspend()。 |
void await_suspend(std::coroutine_handle<> h) noexcept bool await_suspend(std::coroutine_handle<> h) noexcept std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept |
当 await_ready() 返回 false 时调用。负责:1. 挂起当前协程 h。2. 将协程句柄 h 保存起来,通常在异步操作完成时用于恢复协程。3. 安排恢复协程的机制(例如,将 h 注册到事件循环或调度器)。返回值有三种形式,将在后续详细解释。 |
ReturnType await_resume() noexcept |
当协程从挂起状态恢复时调用。负责: 1. 获取异步操作的最终结果。 2. 处理可能发生的异常。 其返回值将成为 co_await 表达式的结果。 |
理解这三个方法的调用顺序和作用,是理解 co_await 展开机制的关键。
co_await 操作符的语义展开
现在,我们来揭开 co_await expr; 或 ResultType res = co_await expr; 背后编译器是如何将其转换为一系列对 Awaiter 方法调用的。
假设我们有一个协程 MyCoroutine,并在其中使用了 co_await expr;。编译器会将其大致展开为以下伪代码:
// 假设 expr 是一个 Awaitable 对象
// 1. 获取 Awaiter 对象
auto&& awaiter = get_awaiter(expr); // 如果 expr 是 Awaiter,则直接是 expr;
// 否则是 expr.operator co_await() 或 operator co_await(expr) 的结果。
// 2. 调用 await_ready() 检查是否需要挂起
if (!awaiter.await_ready()) {
// 3. 如果未就绪,则需要挂起。
// 将当前协程的句柄传递给 await_suspend()。
// 此处的 h 代表当前正在执行的协程的 std::coroutine_handle<P>。
// 编译器在这里插入一个关键的“挂起点”:
// 当前协程帧的状态被保存,控制权暂时移交给 await_suspend。
// 伪代码表示的挂起逻辑:
// auto&& await_suspend_result = awaiter.await_suspend(std::coroutine_handle<P>::from_promise(this->promise));
// 根据 await_suspend_result 的类型和值,可能会有不同的行为:
// - void: 协程被挂起,控制权返回给 awaiter.await_suspend 的调用者(通常是协程的调度器或父协程)。
// - bool: 如果返回 true,协程被挂起;如果返回 false,协程不挂起,直接继续执行 await_resume。
// - coroutine_handle<>: 协程被挂起,控制权转移到 await_suspend 返回的 coroutine_handle 指定的协程。
// 这里我们先假设最常见的 void 返回值,表示协程被挂起。
// 控制流会在此处暂停,直到外部机制(例如事件循环或另一个线程)
// 通过调用 h.resume() 来恢复当前协程。
// 当协程被恢复时,它会从这里继续执行。
}
// 4. 协程恢复后(或者如果 await_ready() 返回 true,则直接),调用 await_resume() 获取结果。
// 如果 `co_await expr;` 有返回值,则赋值给 `res`。
ResultType res = awaiter.await_resume(); // 如果没有返回值,则直接调用。
这张图景清晰地展示了 co_await 并非一个简单的函数调用,而是一个复杂的控制流转移和状态管理的机制。接下来,我们将逐一深入探讨 await_ready、await_suspend 和 await_resume 的细节。
深入拆解:await_ready() – 准备就绪检查
await_ready() 方法是 Awaitable 协议的第一个入口点。它的核心职责是:判断异步操作是否已经完成,或者是否可以同步完成,从而决定当前协程是否需要挂起。
- 签名:
bool await_ready() const noexcept; - 返回值:
true:表示异步操作已经就绪,或者可以立即完成,协程无需挂起。编译器将跳过await_suspend(),直接调用await_resume()。这是一种优化,避免了不必要的上下文切换开销。false:表示异步操作尚未就绪,协程需要挂起。编译器将继续调用await_suspend()。
典型场景:
- 缓存命中:如果你正在等待的数据可能已经存在于缓存中,
await_ready()可以检查缓存。如果命中,则返回true,直接从缓存中获取数据。 - 同步完成的操作:有些“异步”操作实际上可能在调用时就已经完成了(例如,一个线程池任务如果线程池已满,可能会同步执行)。
- 调度优化:在某些调度器中,如果任务可以立即执行(例如,没有其他高优先级任务),
await_ready()也可以返回true。
代码示例:一个简单的同步 Awaitable
假设我们有一个 SyncOperation,它总是能够立即提供结果,无需真正挂起。
#include <iostream>
#include <string>
#include <coroutine> // 包含协程相关的头文件
// 1. 定义一个 Awaiter 类
class SyncAwaiter {
public:
explicit SyncAwaiter(const std::string& msg) : message_(msg) {}
// 总是返回 true,表示操作已就绪,无需挂起
bool await_ready() const noexcept {
std::cout << "SyncAwaiter::await_ready() called. Returning true (no suspend)." << std::endl;
return true;
}
// 如果 await_ready() 返回 true,此方法将不会被调用
void await_suspend(std::coroutine_handle<> h) noexcept {
// 通常这里会保存句柄并安排恢复,但对于 SyncAwaiter,这不会发生
std::cout << "SyncAwaiter::await_suspend() called. (This should not happen if await_ready() is true)" << std::endl;
// 理论上这里不应该被执行到,如果被执行到,说明逻辑有问题
h.resume(); // 仅为演示,如果真的被调用了,立即恢复
}
// 获取操作结果
std::string await_resume() noexcept {
std::cout << "SyncAwaiter::await_resume() called. Returning result." << std::endl;
return "Synchronous result for: " + message_;
}
private:
std::string message_;
};
// 2. 定义一个 Awaitable 类,它通过 operator co_await 返回 SyncAwaiter
class SyncAwaitable {
public:
explicit SyncAwaitable(const std::string& msg) : message_(msg) {}
// 返回一个 SyncAwaiter 实例
SyncAwaiter operator co_await() const noexcept {
std::cout << "SyncAwaitable::operator co_await() called." << std::endl;
return SyncAwaiter(message_);
}
private:
std::string message_;
};
// 3. 定义一个 Task 类型作为协程的返回对象
// 这里为了简化,我们只定义一个基本框架
template<typename T>
struct Task {
struct promise_type {
T value_;
std::exception_ptr exception_;
std::coroutine_handle<promise_type> get_return_object() {
return std::coroutine_handle<promise_type>::from_promise(*this);
}
std::suspend_always initial_suspend() {
std::cout << "Task::initial_suspend()" << std::endl;
return {};
}
std::suspend_always final_suspend() noexcept {
std::cout << "Task::final_suspend()" << std::endl;
return {};
}
void return_value(T value) { value_ = value; }
void unhandled_exception() { exception_ = std::current_exception(); }
};
std::coroutine_handle<promise_type> handle_;
explicit Task(std::coroutine_handle<promise_type> h) : handle_(h) {}
Task(Task&& other) noexcept : handle_(std::exchange(other.handle_, nullptr)) {}
~Task() {
if (handle_) {
std::cout << "Task destructor: destroying coroutine handle." << std::endl;
handle_.destroy();
}
}
T get_result() {
if (handle_) {
// 确保协程已经执行完毕
if (!handle_.done()) {
handle_.resume(); // 恢复以完成执行
}
if (handle_.promise().exception_) {
std::rethrow_exception(handle_.promise().exception_);
}
return handle_.promise().value_;
}
throw std::runtime_error("Task is empty or moved.");
}
};
// 4. 协程函数
Task<std::string> perform_sync_task() {
std::cout << "perform_sync_task: before co_await" << std::endl;
std::string result = co_await SyncAwaitable("Hello Sync");
std::cout << "perform_sync_task: after co_await, result = " << result << std::endl;
co_return result;
}
int main() {
std::cout << "main: Calling perform_sync_task()" << std::endl;
Task<std::string> task = perform_sync_task();
std::cout << "main: perform_sync_task() returned, now getting result." << std::endl;
// 协程初始挂起,需要手动resume
task.handle_.resume();
std::string final_result = task.get_result();
std::cout << "main: Final result from task: " << final_result << std::endl;
return 0;
}
输出:
main: Calling perform_sync_task()
Task::initial_suspend()
main: perform_sync_task() returned, now getting result.
perform_sync_task: before co_await
SyncAwaitable::operator co_await() called.
SyncAwaiter::await_ready() called. Returning true (no suspend).
SyncAwaiter::await_resume() called. Returning result.
perform_sync_task: after co_await, result = Synchronous result for: Hello Sync
Task::final_suspend()
main: Final result from task: Synchronous result for: Hello Sync
Task destructor: destroying coroutine handle.
从输出可以看出,await_ready() 返回 true 后,await_suspend() 确实被跳过了,直接进入了 await_resume()。这完美展示了 await_ready() 的优化作用。
深入拆解:await_suspend() – 挂起与调度
当 await_ready() 返回 false 时,表示异步操作尚未完成,协程需要挂起。此时,编译器会调用 await_suspend()。这个方法是 co_await 机制的核心,因为它负责实际的协程挂起和调度。
-
签名与返回值:
await_suspend有三种可能的返回值类型,这提供了极大的灵活性:-
void await_suspend(std::coroutine_handle<> h) noexcept;- 最常见的用法。表示当前协程
h被挂起,控制权将返回给调用await_suspend的上下文(通常是协程的调度器或父协程)。await_suspend的实现者负责在异步操作完成后,通过调用h.resume()来恢复协程。
- 最常见的用法。表示当前协程
-
bool await_suspend(std::coroutine_handle<> h) noexcept;- 返回
true:与void返回值行为相同,协程h被挂起,控制权返回给调用者。 - 返回
false:这是一个高级优化。它表示虽然await_ready()返回了false,但在await_suspend的执行过程中,异步操作奇迹般地完成了,或者出于某种原因,协程不应该被挂起,而是应该立即恢复。此时,编译器会跳过真正的挂起操作,直接继续执行await_resume()。这在某些竞态条件下非常有用,可以避免一次不必要的上下文切换。
- 返回
-
std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept;- 返回一个
std::coroutine_handle<>句柄next_h。这表示当前协程h被挂起,但控制权不会返回给调用者,而是直接转移到next_h所代表的协程。这种机制可以用于实现协程间的直接切换,例如工作窃取(work-stealing)调度器。如果返回h本身,则表示不挂起当前协程,直接恢复它(类似于bool返回false)。如果返回一个空句柄,行为未定义。
- 返回一个
-
-
参数:
std::coroutine_handle<P> hh是一个指向当前正在执行co_await的协程的句柄。P是当前协程的promise_type。这个句柄是协程的唯一标识,也是恢复协程的唯一途径。
典型场景:
- 异步I/O操作:当发起一个网络请求或文件读取时,操作不会立即完成。
await_suspend()会将当前协程句柄保存起来,注册到I/O事件循环中,然后返回,让出CPU。当I/O操作完成后,事件循环会通过保存的句柄来恢复协程。 - 线程池调度:将协程任务提交到线程池。
await_suspend()会将协程句柄封装成一个可执行的任务,添加到线程池的队列中。当线程池中的某个线程空闲时,会取出任务并调用h.resume()。 - 延迟操作:等待一个定时器到期。
await_suspend()会将协程句柄注册到定时器管理器中,当时间到达时,定时器管理器会恢复协程。
代码示例:一个简单的异步 Awaitable(使用 void 返回值)
我们将模拟一个延迟操作,使用一个简单的全局调度器来恢复协程。
#include <iostream>
#include <string>
#include <coroutine>
#include <thread>
#include <chrono>
#include <vector>
#include <functional>
#include <queue>
#include <mutex>
// 1. 简单的全局调度器
class SimpleScheduler {
public:
static SimpleScheduler& get_instance() {
static SimpleScheduler instance;
return instance;
}
void schedule(std::function<void()> task) {
std::lock_guard<std::mutex> lock(mtx_);
tasks_.push(std::move(task));
cv_.notify_one();
}
void run() {
std::unique_lock<std::mutex> lock(mtx_);
while (!stop_) {
cv_.wait(lock, [this]{ return !tasks_.empty() || stop_; });
if (stop_) break;
auto task = tasks_.front();
tasks_.pop();
lock.unlock(); // 释放锁以执行任务,避免死锁或长时间持有锁
task();
lock.lock(); // 重新获取锁
}
std::cout << "Scheduler stopped." << std::endl;
}
void stop() {
std::lock_guard<std::mutex> lock(mtx_);
stop_ = true;
cv_.notify_all();
}
private:
SimpleScheduler() = default;
~SimpleScheduler() = default;
SimpleScheduler(const SimpleScheduler&) = delete;
SimpleScheduler& operator=(const SimpleScheduler&) = delete;
std::queue<std::function<void()>> tasks_;
std::mutex mtx_;
std::condition_variable cv_;
bool stop_ = false;
};
// 2. 定义一个异步 Awaiter
class DelayAwaiter {
public:
explicit DelayAwaiter(std::chrono::milliseconds delay_ms) : delay_(delay_ms) {}
bool await_ready() const noexcept {
// 对于模拟的延迟,通常不会立即就绪,除非延迟为0
std::cout << "DelayAwaiter::await_ready() called." << std::endl;
return delay_.count() == 0;
}
void await_suspend(std::coroutine_handle<> h) noexcept {
std::cout << "DelayAwaiter::await_suspend() called. Suspending for "
<< delay_.count() << "ms. Coroutine handle: " << h.address() << std::endl;
// 保存协程句柄,并在延迟后恢复
std::thread([h, delay = delay_] {
std::this_thread::sleep_for(delay);
std::cout << "Delay finished. Resuming coroutine handle: " << h.address() << std::endl;
SimpleScheduler::get_instance().schedule([h]{
if (h) h.resume(); // 在调度器线程中恢复协程
});
}).detach(); // 启动一个独立线程来等待并调度恢复
}
// 假设延迟操作没有返回值,或者返回一个确认状态
void await_resume() noexcept {
std::cout << "DelayAwaiter::await_resume() called." << std::endl;
}
private:
std::chrono::milliseconds delay_;
};
// 3. 定义一个 Awaitable 类
class Delay {
public:
explicit Delay(std::chrono::milliseconds delay_ms) : delay_ms_(delay_ms) {}
DelayAwaiter operator co_await() const noexcept {
std::cout << "Delay::operator co_await() called." << std::endl;
return DelayAwaiter(delay_ms_);
}
private:
std::chrono::milliseconds delay_ms_;
};
// 4. 定义一个 Task 类型作为协程的返回对象
// 这里为了简化,我们只定义一个基本框架
template<typename T>
struct Task {
struct promise_type {
T value_;
std::exception_ptr exception_;
std::coroutine_handle<promise_type> get_return_object() {
return std::coroutine_handle<promise_type>::from_promise(*this);
}
std::suspend_always initial_suspend() {
std::cout << "Task::initial_suspend()" << std::endl;
return {};
}
std::suspend_always final_suspend() noexcept {
std::cout << "Task::final_suspend()" << std::endl;
return {};
}
void return_value(T value) { value_ = value; }
void unhandled_exception() { exception_ = std::current_exception(); }
};
std::coroutine_handle<promise_type> handle_;
explicit Task(std::coroutine_handle<promise_type> h) : handle_(h) {}
Task(Task&& other) noexcept : handle_(std::exchange(other.handle_, nullptr)) {}
~Task() {
if (handle_) {
std::cout << "Task destructor: destroying coroutine handle." << std::endl;
handle_.destroy();
}
}
T get_result() {
if (handle_) {
// 协程可能还在挂起,需要确保它运行完毕
// 在实际应用中,这里会通过调度器等待任务完成
while (!handle_.done()) {
// For this example, we'll actively resume if not done.
// In a real system, the scheduler would handle this.
std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 防止忙等待
if (!handle_.done()) { // 再次检查,可能已经由其他线程恢复
SimpleScheduler::get_instance().schedule([h = handle_]{
if (h && !h.done()) h.resume(); // 确保在调度器线程中恢复
});
}
}
if (handle_.promise().exception_) {
std::rethrow_exception(handle_.promise().exception_);
}
return handle_.promise().value_;
}
throw std::runtime_error("Task is empty or moved.");
}
// 用于启动协程的方便方法
void start() {
if (handle_ && !handle_.done()) {
SimpleScheduler::get_instance().schedule([h = handle_]{
if (h && !h.done()) h.resume();
});
}
}
};
// 5. 协程函数
Task<std::string> perform_async_delay_task() {
std::cout << "perform_async_delay_task: Before first co_await (500ms)" << std::endl;
co_await Delay(std::chrono::milliseconds(500));
std::cout << "perform_async_delay_task: After first co_await (500ms)" << std::endl;
std::cout << "perform_async_delay_task: Before second co_await (200ms)" << std::endl;
co_await Delay(std::chrono::milliseconds(200));
std::cout << "perform_async_delay_task: After second co_await (200ms)" << std::endl;
co_return "Async tasks completed!";
}
int main() {
std::cout << "main: Starting scheduler thread." << std::endl;
std::thread scheduler_thread([]{ SimpleScheduler::get_instance().run(); });
std::cout << "main: Calling perform_async_delay_task()" << std::endl;
Task<std::string> task = perform_async_delay_task();
task.start(); // 恢复初始挂起的协程
std::cout << "main: perform_async_delay_task() returned, main thread continues..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 给调度器一些时间
std::string final_result = task.get_result(); // 等待并获取结果
std::cout << "main: Final result from task: " << final_result << std::endl;
SimpleScheduler::get_instance().stop();
scheduler_thread.join();
return 0;
}
输出(大致顺序,实际可能因线程调度而异):
main: Starting scheduler thread.
main: Calling perform_async_delay_task()
Task::initial_suspend()
main: perform_async_delay_task() returned, main thread continues...
perform_async_delay_task: Before first co_await (500ms)
Delay::operator co_await() called.
DelayAwaiter::await_ready() called.
DelayAwaiter::await_suspend() called. Suspending for 500ms. Coroutine handle: 0x...
// main thread sleeps and then calls get_result() which might resume the task if it hasn't completed
// Scheduler thread is running in the background...
// ... (some delay) ...
Delay finished. Resuming coroutine handle: 0x...
DelayAwaiter::await_resume() called.
perform_async_delay_task: After first co_await (500ms)
perform_async_delay_task: Before second co_await (200ms)
Delay::operator co_await() called.
DelayAwaiter::await_ready() called.
DelayAwaiter::await_suspend() called. Suspending for 200ms. Coroutine handle: 0x...
// ... (some delay) ...
Delay finished. Resuming coroutine handle: 0x...
DelayAwaiter::await_resume() called.
perform_async_delay_task: After second co_await (200ms)
Task::final_suspend()
main: Final result from task: Async tasks completed!
Task destructor: destroying coroutine handle.
Scheduler stopped.
从输出中可以看到,当 await_ready() 返回 false 后,await_suspend() 被调用,协程被挂起。主线程继续执行,而一个新的线程被创建用于模拟异步延迟。当延迟结束后,该线程将协程句柄提交给调度器,调度器最终调用 h.resume() 恢复了协程的执行,从而继续执行 co_await 之后的代码。这清晰地展示了 await_suspend() 如何实现协程的挂起和外部调度。
深入拆解:await_resume() – 恢复与结果获取
当协程从挂起状态被恢复时,或者当 await_ready() 返回 true 导致协程无需挂起时,await_resume() 方法就会被调用。它是 co_await 表达式的最后一个阶段。
- 签名:
ReturnType await_resume() noexcept; - 返回值:
ReturnType:await_resume()的返回值就是整个co_await expr表达式的最终结果。如果co_await表达式没有被赋值给任何变量,那么ReturnType可以是void。- 这个返回值通常是从异步操作中获取的实际数据,或者是一个表示操作成功的状态。
典型场景:
- 获取异步操作结果:例如,从一个
std::future或std::promise中获取异步计算的结果。 - 错误处理:检查异步操作是否成功。如果操作失败,
await_resume()可以抛出异常,让协程内部的try-catch块捕获处理,或者返回一个错误码/状态。 - 资源清理:在某些情况下,可以在获取结果后进行一些轻量级的清理工作。
代码示例:在异步 Awaitable 中获取结果
我们将修改之前的 DelayAwaiter,让它在延迟完成后返回一个字符串消息。
#include <iostream>
#include <string>
#include <coroutine>
#include <thread>
#include <chrono>
#include <vector>
#include <functional>
#include <queue>
#include <mutex>
#include <future> // 用于从异步线程获取结果
// ... (SimpleScheduler, Task promise_type and Task class remain the same) ...
// 为了保持文章长度和焦点,这里省略重复的 SimpleScheduler 和 Task 的定义。
// 假设它们已经定义如上文。
// 定义一个异步 Awaiter,能够返回结果
class DelayResultAwaiter {
public:
explicit DelayResultAwaiter(std::chrono::milliseconds delay_ms, const std::string& message)
: delay_(delay_ms), message_(message) {}
bool await_ready() const noexcept {
std::cout << "DelayResultAwaiter::await_ready() called." << std::endl;
return delay_.count() == 0;
}
void await_suspend(std::coroutine_handle<> h) noexcept {
std::cout << "DelayResultAwaiter::await_suspend() called. Suspending for "
<< delay_.count() << "ms. Coroutine handle: " << h.address() << std::endl;
// 使用 std::promise 和 std::future 来在异步线程中设置结果
// 并在主协程恢复时通过 await_resume 获取
std::promise<std::string> p;
future_ = p.get_future(); // 保存 future 以便 await_resume 使用
std::thread([h, delay = delay_, msg = message_, p = std::move(p)]() mutable {
std::this_thread::sleep_for(delay);
std::cout << "Delay finished. Setting result and resuming coroutine handle: " << h.address() << std::endl;
p.set_value("Delayed result for: " + msg); // 设置结果
SimpleScheduler::get_instance().schedule([h]{
if (h) h.resume(); // 在调度器线程中恢复协程
});
}).detach();
}
// 获取操作结果
std::string await_resume() noexcept {
std::cout << "DelayResultAwaiter::await_resume() called. Getting result from future." << std::endl;
// 等待并获取异步线程设置的结果
// 注意:这里调用 .get() 会阻塞当前(恢复协程的)线程,
// 在实际应用中,如果 future 还没有就绪,可能需要更复杂的非阻塞等待机制。
// 但由于我们是在 await_suspend 中确保了恢复时结果已就绪,这里是安全的。
return future_.get();
}
private:
std::chrono::milliseconds delay_;
std::string message_;
std::shared_future<std::string> future_; // 使用 shared_future 以便可以拷贝或多次访问
};
// 3. 定义一个 Awaitable 类
class DelayWithResult {
public:
explicit DelayWithResult(std::chrono::milliseconds delay_ms, const std::string& message)
: delay_ms_(delay_ms), message_(message) {}
DelayResultAwaiter operator co_await() const noexcept {
std::cout << "DelayWithResult::operator co_await() called." << std::endl;
return DelayResultAwaiter(delay_ms_, message_);
}
private:
std::chrono::milliseconds delay_ms_;
std::string message_;
};
// 4. 协程函数
Task<std::string> perform_async_task_with_result() {
std::cout << "perform_async_task_with_result: Before first co_await (300ms)" << std::endl;
std::string result1 = co_await DelayWithResult(std::chrono::milliseconds(300), "First delay");
std::cout << "perform_async_task_with_result: After first co_await, result1 = " << result1 << std::endl;
std::cout << "perform_async_task_with_result: Before second co_await (100ms)" << std::endl;
std::string result2 = co_await DelayWithResult(std::chrono::milliseconds(100), "Second delay");
std::cout << "perform_async_task_with_result: After second co_await, result2 = " << result2 << std::endl;
co_return result1 + " | " + result2;
}
int main() {
std::cout << "main: Starting scheduler thread." << std::endl;
std::thread scheduler_thread([]{ SimpleScheduler::get_instance().run(); });
std::cout << "main: Calling perform_async_task_with_result()" << std::endl;
Task<std::string> task = perform_async_task_with_result();
task.start(); // 恢复初始挂起的协程
std::cout << "main: perform_async_task_with_result() returned, main thread continues..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 给调度器一些时间
std::string final_result = task.get_result(); // 等待并获取结果
std::cout << "main: Final result from task: " << final_result << std::endl;
SimpleScheduler::get_instance().stop();
scheduler_thread.join();
return 0;
}
输出(大致顺序):
main: Starting scheduler thread.
main: Calling perform_async_task_with_result()
Task::initial_suspend()
main: perform_async_task_with_result() returned, main thread continues...
perform_async_task_with_result: Before first co_await (300ms)
DelayWithResult::operator co_await() called.
DelayResultAwaiter::await_ready() called.
DelayResultAwaiter::await_suspend() called. Suspending for 300ms. Coroutine handle: 0x...
// ... (some delay) ...
Delay finished. Setting result and resuming coroutine handle: 0x...
DelayResultAwaiter::await_resume() called. Getting result from future.
perform_async_task_with_result: After first co_await, result1 = Delayed result for: First delay
perform_async_task_with_result: Before second co_await (100ms)
DelayWithResult::operator co_await() called.
DelayResultAwaiter::await_ready() called.
DelayResultAwaiter::await_suspend() called. Suspending for 100ms. Coroutine handle: 0x...
// ... (some delay) ...
Delay finished. Setting result and resuming coroutine handle: 0x...
DelayResultAwaiter::await_resume() called. Getting result from future.
perform_async_task_with_result: After second co_await, result2 = Delayed result for: Second delay
Task::final_suspend()
main: Final result from task: Delayed result for: First delay | Delayed result for: Second delay
Task destructor: destroying coroutine handle.
Scheduler stopped.
这个例子清晰地展示了 await_resume() 如何从异步操作中获取最终结果,并将其作为 co_await 表达式的值返回。
Awaiter 类型与定制 Awaitable
在上面的例子中,我们看到了 Awaitable 类型 (如 Delay) 通过 operator co_await() 返回一个 Awaiter 类型 (如 DelayAwaiter)。这是一种推荐的模式,因为它将 Awaitable 的描述与 Awaiter 的行为逻辑解耦。
- Awaitable:通常是一个轻量级的对象,它描述了一个异步操作,例如一个网络请求的参数、一个文件路径和一个操作类型。它本身可能不直接包含协程挂起/恢复的逻辑。
- Awaiter:这是一个更重量级的对象,它拥有实际的
await_ready、await_suspend和await_resume方法。它通常会捕获 Awaitable 对象的信息,并管理协程句柄的生命周期。
通过 operator co_await 机制,我们可以:
- 惰性创建 Awaiter:只有当
expr真正被co_await时,operator co_await才会被调用,从而创建 Awaiter 对象。 - 资源管理:Awaiter 可以持有异步操作所需的临时资源,并在
await_resume完成后或析构时释放。 - 泛型 Awaitable:一个 Awaitable 类型可以为不同的协程上下文(通过
std::coroutine_traits)提供不同的operator co_await重载,从而返回定制的 Awaiter。
协程句柄 (std::coroutine_handle) 详解
std::coroutine_handle 是 C++ 协程的核心抽象,它是对协程帧(coroutine frame)的非拥有型引用。协程帧是编译器为协程创建的一块内存区域,用于存储协程的局部变量、参数、当前暂停点以及 promise_type 实例。
std::coroutine_handle<P>:这是一个带有promise_type模板参数的协程句柄。P是协程的promise_type类型。通过它可以访问协程的promise_type实例 (h.promise())。std::coroutine_handle<void>或std::coroutine_handle<>:这是一个不带promise_type信息的通用协程句柄,只能用于恢复 (resume()) 或销毁 (destroy()) 协程,但不能访问promise_type。在await_suspend中,通常使用std::coroutine_handle<>作为参数,因为它不关心具体的promise_type。
关键方法:
h.resume():恢复协程的执行。协程将从上次暂停的位置继续。h.destroy():销毁协程帧,释放其占用的内存。调用h.destroy()后,h变得无效。通常在协程完全结束后(通过final_suspend或co_return)由返回对象负责调用。h.done():检查协程是否已经执行完毕。返回true表示协程已完成,不能再被恢复。std::coroutine_handle<P>::from_promise(P& promise):从promise_type实例获取其对应的协程句柄。这是协程内部获取自身句柄的唯一方式。
await_suspend 接收 std::coroutine_handle<> 参数,意味着它可以在不了解协程具体类型的情况下,调度任何协程的恢复。这是协程调度器设计的基础。
promise_type 与 Awaitable 的协同
promise_type 是协程帧中的一个关键组成部分,它定义了协程的生命周期行为以及与外部世界的交互方式。Awaitable 协议与 promise_type 紧密协作,共同管理协程的执行。
promise_type 提供了一系列定制点:
get_return_object():在协程函数调用时立即调用,用于创建并返回协程的“返回对象”(例如Task<T>、std::future等)。这个对象是协程与外部世界通信的桥梁。initial_suspend():协程体真正执行前调用。它的返回值必须是一个 Awaitable。- 如果
initial_suspend().await_ready()为true,协程会立即开始执行。 - 如果
initial_suspend().await_ready()为false,协程会立即挂起(通常返回std::suspend_always{}),等待外部调用handle.resume()来启动。这使得我们可以在协程创建后,延迟其执行,进行一些初始化或调度。
- 如果
final_suspend():协程体执行完毕(无论是通过co_return、异常还是执行到末尾)后调用。它的返回值也必须是一个 Awaitable。- 如果
final_suspend().await_ready()为true,协程帧会被立即销毁。 - 如果
final_suspend().await_ready()为false,协程会再次挂起(通常返回std::suspend_always{}),允许返回对象在销毁协程帧之前,获取最终结果或进行清理。这对于实现类似于std::future的语义非常重要,因为它允许在结果被消费后才销毁协程。
- 如果
return_value(T val)/return_void():处理co_return语句。return_value用于有返回值的协程,return_void用于返回void的协程。它们通常用于将协程的结果存储在promise_type中,以便get_return_object返回的对象能够访问。unhandled_exception():当协程内部抛出未捕获的异常时调用。它通常用于将异常存储在promise_type中,以便返回对象能够重新抛出该异常。
initial_suspend() 和 final_suspend() 本身就是 Awaitable,这意味着它们遵循 await_ready、await_suspend 和 await_resume 的协议。这为协程的启动和结束提供了极大的灵活性和可定制性。
Awaitable 协议的强大与精妙
通过深入拆解 co_await 展开后的 await_ready、await_suspend 和 await_resume 状态流转,我们看到了 C++ 协程如何将复杂的异步操作封装成简洁、线性的代码。Awaitable 协议是 C++ 协程机制的基石,它提供了一个强大的可扩展框架,使得开发者能够:
- 实现各种异步原语:无论是异步I/O、定时器、线程池任务,还是更复杂的并发模式,都可以通过实现 Awaitable 协议来与
co_await无缝集成。 - 细粒度控制协程行为:通过
await_ready进行同步路径优化,通过await_suspend控制挂起和调度,通过await_resume处理结果和异常。 - 构建高效的调度器:
await_suspend中的coroutine_handle使得调度器能够完全控制协程的生命周期和执行顺序。
C++ 协程的引入,标志着 C++ 在异步和并发编程领域迈出了重要一步。理解 Awaitable 协议的内在机制,将帮助我们更好地设计和实现高性能、高可维护性的并发系统。它提供了一种优雅的方式,让我们能够以人类更易理解的方式,来表达那些原本需要复杂状态机或回调链条才能实现的异步逻辑。