C++20 Coroutines 深度解析:为高并发网络库手写一个轻量级调度器
各位同仁,下午好!今天我们齐聚一堂,共同探讨 C++20 Coroutines 这一现代 C++ 的里程碑式特性。我的目标是深入剖析 Coroutines 的底层机制,并以此为基石,一步步带大家手写一个轻量级、高效的协程调度器,专门服务于高并发网络库的场景。这不仅仅是理论讲解,更是一场实践的演练,我们将看到 C++20 如何让我们以同步的思维编写异步代码,从而彻底改变高并发编程的范式。
1. 为什么是 C++20 Coroutines?高并发网络编程的痛点与机遇
在高并发网络服务领域,性能和可伸缩性始终是核心挑战。传统的编程模型主要有以下几种:
- 多线程/进程模型(Thread/Process Per Connection): 每个客户端连接分配一个独立的线程或进程。
- 优点: 编程模型直观,易于理解。
- 缺点: 操作系统线程/进程的创建、销毁和上下文切换开销巨大。当连接数达到数千甚至数万时,资源消耗和调度开销将成为瓶颈。内存占用高,且受限于操作系统对线程数的限制。
- 异步回调模型(Asynchronous Callbacks): 使用事件驱动和回调函数处理网络事件。
- 优点: 单线程或少量线程即可处理大量连接,资源开销小。
- 缺点: “回调地狱”(Callback Hell)问题严重,代码逻辑分散、难以追踪和维护。错误处理复杂,不易实现顺序逻辑。
- Future/Promise 模型: 引入 Future/Promise 模式来管理异步操作的结果。
- 优点: 比纯回调模型有所改善,提供了更好的组合性。
- 缺点: 仍然需要显式地链式调用,控制流仍然分散,特别是对于复杂的业务逻辑。
C++20 Coroutines 的出现,为解决这些痛点带来了革命性的机遇。它允许我们:
- 以同步风格编写异步代码: 协程能够在不阻塞当前线程的情况下,暂停自身的执行,并在适当的时机恢复。这使得原本复杂且分散的异步逻辑,可以写得像传统的同步代码一样直观、易读、易维护。
- 极低的上下文切换开销: 协程的上下文切换由用户态控制,无需陷入内核,其开销远低于操作系统线程。
- 高效利用系统资源: 少量操作系统线程即可驱动大量协程,显著降低内存和 CPU 占用。
因此,手写一个轻量级调度器,将 C++20 Coroutines 与 I/O 多路复用(如 epoll, io_uring, kqueue, IOCP)相结合,是构建高性能、高并发网络库的理想选择。
2. C++20 Coroutines 核心概念解析
在着手实现调度器之前,我们必须对 C++20 Coroutines 的核心组件和工作原理有深入的理解。C++20 Coroutines 是一种“无栈协程”(Stackless Coroutine),其状态全部存储在堆上分配的协程帧(Coroutine Frame)中。
2.1. 协程的构成要素
一个 C++20 协程由以下几个关键部分组成:
- 协程函数 (Coroutine Function): 任何包含
co_await,co_yield或co_return关键字的函数都是协程函数。 - Promise 类型 (Promise Type): 协程函数返回类型(例如
Task<T>) 必须有一个嵌套的promise_type类型定义。这个promise_type是协程与外部世界交互的接口,它定义了协程的生命周期行为(何时暂停、何时恢复、如何返回值或异常)。 - 协程句柄 (
std::coroutine_handle): 这是协程帧的类型擦除指针。通过std::coroutine_handle<PromiseType>或std::coroutine_handle<void>,我们可以操作协程(恢复、销毁)。 - Awaitable (可等待对象): 任何实现了
await_ready(),await_suspend(),await_resume()三个成员函数的类型都是 Awaitable。await_ready(): 检查是否可以立即继续执行,如果为true,则不暂停协程。await_suspend(std::coroutine_handle<>): 执行暂停操作。通常会注册一个回调或将协程句柄传递给调度器,然后返回一个bool(表示是否真的暂停) 或void。await_resume(): 在协程恢复后执行,返回co_await表达式的结果。
- 协程帧 (Coroutine Frame): 当协程函数被调用时,编译器会生成一个内部数据结构,通常在堆上分配,用于存储协程的局部变量、参数、返回值以及协程的当前状态(即恢复点)。
std::coroutine_handle就是指向这个帧的指针。
2.2. 协程生命周期与关键字
C++20 提供了三个核心关键字来控制协程的生命周期:
co_await: 用于暂停当前协程,并等待一个 Awaitable 对象的结果。- 当遇到
co_await时,会调用 Awaitable 的await_ready()。 - 如果
await_ready()返回false,则调用await_suspend(),当前协程暂停,控制权返回给调用者(或调度器)。 - 当 Awaitable 操作完成,协程被恢复时,调用
await_resume()。
- 当遇到
co_yield: 用于生成一个值并暂停当前协程。主要用于实现生成器(Generator)。co_return: 用于结束协程,并返回一个值(如果协程有返回值)。这会触发promise_type的return_value()或return_void()方法,并可能导致协程最终暂停(通过final_suspend())。
Promise Type 的关键方法:
promise_type 是协程行为的“配置中心”。以下是其关键方法:
get_return_object(): 在协程函数开始时被调用,返回协程的“外部接口”对象(例如Task<T>),这个对象通常包含一个std::coroutine_handle。initial_suspend(): 在协程函数体执行前被调用,决定协程是否在启动时立即暂停。- 通常返回
std::suspend_always{}(立即暂停,等待调度器恢复) 或std::suspend_never{}(立即执行)。
- 通常返回
final_suspend(): 在协程函数体执行完毕(包括co_return或异常)后被调用,决定协程是否在结束时暂停。- 通常返回
std::suspend_always{}(暂停,等待调度器销毁或重新调度) 或std::suspend_never{}(立即销毁协程帧)。
- 通常返回
return_value(T value)/return_void(): 当协程执行到co_return时被调用,将返回值传递给 promise。unhandled_exception(): 当协程内部抛出未捕获的异常时被调用。
#include <iostream>
#include <coroutine>
#include <thread>
#include <chrono>
// 1. 定义一个简单的 Task 类型,用于演示
template<typename T>
struct Task {
struct promise_type;
using handle_type = std::coroutine_handle<promise_type>;
handle_type handle;
Task(handle_type h) : handle(h) {}
Task(Task&& other) noexcept : handle(std::exchange(other.handle, nullptr)) {}
~Task() {
if (handle) {
handle.destroy();
}
}
T get_result() {
// 在实际调度器中,这里会等待协程完成
// 为了演示,我们直接恢复并获取结果
if (!handle.done()) {
handle.resume(); // 强制恢复,直到完成
}
return handle.promise().result;
}
struct promise_type {
T result;
Task get_return_object() {
return Task{handle_type::from_promise(*this)};
}
// 协程启动时立即暂停,等待外部调度
std::suspend_always initial_suspend() { return {}; }
// 协程结束时也暂停,等待外部销毁
std::suspend_always final_suspend() noexcept { return {}; }
void return_value(T value) {
result = value;
}
void unhandled_exception() {
std::terminate(); // 简单处理未捕获异常
}
};
};
// 2. 一个简单的 Awaitable,用于模拟异步操作
struct Delay {
std::chrono::milliseconds duration;
bool await_ready() const noexcept { return duration.count() == 0; }
void await_suspend(std::coroutine_handle<> h) {
std::thread([h, this]() {
std::this_thread::sleep_for(duration);
h.resume(); // 延迟结束后恢复协程
}).detach(); // 为了演示方便,直接分离线程
}
void await_resume() const noexcept {}
};
// 3. 协程函数示例
Task<int> my_coroutine() {
std::cout << "Coroutine started on thread " << std::this_thread::get_id() << std::endl;
co_await Delay{std::chrono::milliseconds(100)};
std::cout << "Coroutine resumed after 100ms on thread " << std::this_thread::get_id() << std::endl;
co_await Delay{std::chrono::milliseconds(200)};
std::cout << "Coroutine resumed after another 200ms on thread " << std::this_thread::get_id() << std::endl;
co_return 42;
}
// int main() {
// std::cout << "Main thread started on thread " << std::this_thread::get_id() << std::endl;
// Task<int> task = my_coroutine();
// std::cout << "Coroutine created, now waiting for it to complete..." << std::endl;
// // 在调度器中,我们会通过事件循环来驱动 task.handle.resume()
// // 这里我们手动调用 get_result() 来触发第一次 resume 并等待
// int result = task.get_result(); // 第一次 resume 由 get_result() 触发
// std::cout << "Coroutine finished with result: " << result << std::endl;
// std::cout << "Main thread finished." << std::endl;
// // 注意:由于 await_suspend 是在分离线程中恢复,main 函数可能先于协程结束。
// // 实际调度器会管理这些。
// std::this_thread::sleep_for(std::chrono::seconds(1)); // 确保所有线程有时间完成
// return 0;
// }
上述代码演示了 Task、promise_type 和一个简单的 Delay Awaitable。main 函数中我们手动触发协程的创建和恢复,但在真实的高并发场景中,我们需要一个统一的调度器来管理这些协程的生命周期和执行。
3. 设计轻量级协程调度器:核心理念与组件
我们的目标是手写一个轻量级调度器,它能够在一个或少量 OS 线程上高效地管理大量协程,特别适用于网络 I/O。其核心理念是:将协程的执行与 I/O 等待分离。当协程等待 I/O 时,它暂停执行,将控制权交还给调度器。调度器则利用 I/O 多路复用机制(如 epoll)监听所有等待中的 I/O 事件。当某个 I/O 事件就绪时,调度器会恢复相应的协程。
3.1. 调度器核心组件
一个轻量级协程调度器通常包含以下关键组件:
| 组件名称 | 描述 |
|---|---|
Task<T> (协程包装器) |
用户编写协程时使用的返回类型。它封装了 std::coroutine_handle<promise_type>,并提供了 promise_type 的定义。promise_type 会与调度器交互,例如在 initial_suspend() 和 final_suspend() 中将协程句柄推送到调度器的待执行队列。 |
Scheduler / EventLoop (调度器核心) |
这是调度器的主要执行实体,通常在一个 OS 线程中运行。它维护一个“可运行队列”(Ready Queue)来存放等待执行的协程,以及一个“等待 I/O 队列”(Pending I/O Map)来存放等待 I/O 事件的协程。它包含一个 I/O 多路复用器(如 epoll 实例),负责监听文件描述符上的事件。 |
IoAwaiter (I/O 等待器) |
这是一个特殊的 Awaitable 对象。当协程需要等待某个文件描述符(Socket)的 I/O 事件(读、写、接受连接等)时,它会 co_await IoAwaiter。IoAwaiter 的 await_suspend() 方法会将当前协程的句柄注册到调度器的 I/O 监听器中,并暂停协程。当 I/O 事件就绪时,调度器会找到对应的协程句柄并恢复它。 |
| 可运行队列 (Ready Queue) | 一个线程安全的队列(例如 std::deque 或 moodycamel::ConcurrentQueue),用于存储已准备好恢复执行的协程句柄。调度器在空闲时或 I/O 事件处理后,会从这个队列中取出协程并恢复它们。 |
| 等待 I/O 映射 (Pending I/O Map) | 一个将文件描述符 (fd) 映射到 std::coroutine_handle<> 的数据结构(例如 std::unordered_map<int, std::coroutine_handle<>>)。当 IoAwaiter 暂停协程时,它会将协程句柄与对应的 fd 关联起来。当 epoll_wait 报告 fd 就绪时,调度器就能通过这个映射找到并恢复正确的协程。 |
| 计时器管理 (Timer Management) | 用于处理延时任务和超时。一个优先级队列或红黑树可以用来存储定时任务及其对应的协程句柄。调度器在每次循环中检查是否有定时任务到期,并调度相应的协程。 |
| 线程池 (可选) | 对于计算密集型任务,或者为了利用多核 CPU,调度器可以运行在一个线程池中。每个线程运行一个 EventLoop 实例,或者共享一个 EventLoop 并通过互斥锁保护共享队列。对于网络 I/O,通常一个 EventLoop 运行在一个线程中就足够高效了,多个 EventLoop 实例可以绑定不同的 CPU 核心。 |
3.2. 调度器的工作流程
- 协程启动: 用户调用协程函数,返回一个
Task对象。Task的promise_type在initial_suspend()中将协程句柄添加到调度器的可运行队列。 - 调度器循环 (
EventLoop::run()):- 处理可运行任务: 从可运行队列中取出所有就绪的协程,并逐一
resume()它们。 - 等待 I/O 事件: 调用
epoll_wait(或等效的 I/O 多路复用 API) 阻塞等待 I/O 事件发生。epoll_wait的超时时间可以根据最近的计时器事件来设置。 - 处理 I/O 事件: 当
epoll_wait返回时,遍历所有就绪的事件。对于每个事件,根据文件描述符从“等待 I/O 映射”中找到对应的协程句柄,并将其添加到可运行队列中。 - 处理计时器事件: 检查是否有到期的计时器任务,如果有,将对应的协程句柄添加到可运行队列。
- 重复: 循环往复,直到调度器被显式停止。
- 处理可运行任务: 从可运行队列中取出所有就绪的协程,并逐一
- 协程暂停 (通过
co_await IoAwaiter):- 协程执行到
co_await IoAwaiter。 IoAwaiter::await_suspend()被调用,它将当前协程句柄注册到调度器的“等待 I/O 映射”中,并通知 I/O 多路复用器监听此文件描述符。await_suspend()返回,协程暂停,控制权返回给调度器。
- 协程执行到
- 协程恢复: 当 I/O 事件就绪时,调度器从“等待 I/O 映射”中取出对应的协程句柄,将其放入可运行队列。在下一轮循环中,调度器会
resume()该协程,使其从co_await点之后继续执行。
4. 手写轻量级调度器:代码实现
我们将以 Linux 上的 epoll 为例,逐步构建我们的调度器。
4.1. 基础工具类和宏定义
#include <iostream>
#include <coroutine>
#include <thread>
#include <chrono>
#include <deque>
#include <vector>
#include <map>
#include <unordered_map>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <optional>
#include <memory>
#include <atomic>
// For epoll
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <string.h> // For strerror
// 错误处理辅助宏
#define THROW_IF_NEGATIVE(val, msg)
do {
if ((val) < 0) {
perror(msg);
throw std::runtime_error(std::string(msg) + ": " + strerror(errno));
}
} while (0)
enum class IoEvent {
READ = EPOLLIN,
WRITE = EPOLLOUT,
EDGE_TRIGGERED = EPOLLET,
ONE_SHOT = EPOLLONESHOT
};
// 简单的日志函数
void log_info(const std::string& msg) {
std::cout << "[INFO][" << std::this_thread::get_id() << "] " << msg << std::endl;
}
void log_error(const std::string& msg) {
std::cerr << "[ERROR][" << std::this_thread::get_id() << "] " << msg << std::endl;
}
// 设置非阻塞文件描述符
int set_nonblocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
THROW_IF_NEGATIVE(flags, "fcntl(F_GETFL)");
THROW_IF_NEGATIVE(fcntl(fd, F_SETFL, flags | O_NONBLOCK), "fcntl(F_SETFL, O_NONBLOCK)");
return 0;
}
4.2. Scheduler / EventLoop 类
这是调度器的核心,负责管理协程队列和 I/O 事件。
class EventLoop {
public:
using CoroHandle = std::coroutine_handle<>;
using TaskFunc = std::function<void()>;
private:
std::deque<CoroHandle> runnable_tasks;
std::mutex tasks_mutex;
std::condition_variable tasks_cv;
// fd -> CoroutineHandle 映射,用于I/O事件就绪时恢复协程
std::unordered_map<int, CoroHandle> pending_io_reads;
std::unordered_map<int, CoroHandle> pending_io_writes;
std::mutex io_mutex; // 保护 pending_io_reads/writes
int epoll_fd;
std::atomic<bool> running;
std::thread event_thread;
// 计时器管理: 时间点 -> CoroHandle
// 使用 std::multimap 允许不同协程在同一时间点触发
std::multimap<std::chrono::steady_clock::time_point, CoroHandle> timers;
std::mutex timers_mutex;
std::condition_variable timers_cv; // 用于唤醒 epoll_wait 或等待新任务
public:
EventLoop() : running(false) {
epoll_fd = epoll_create1(0);
THROW_IF_NEGATIVE(epoll_fd, "epoll_create1");
log_info("EventLoop created with epoll_fd: " + std::to_string(epoll_fd));
}
~EventLoop() {
stop();
if (epoll_fd != -1) {
close(epoll_fd);
}
log_info("EventLoop destroyed.");
}
void start() {
if (running.exchange(true)) {
return; // Already running
}
event_thread = std::thread(&EventLoop::run, this);
log_info("EventLoop started.");
}
void stop() {
if (!running.exchange(false)) {
return; // Not running
}
// Wake up epoll_wait if it's blocked, so it can exit its loop
// A common way is to write to a pipe monitored by epoll.
// For simplicity, we just hope it eventually times out or processes existing events.
// A more robust shutdown mechanism would use a pipe.
log_info("EventLoop stopping...");
if (event_thread.joinable()) {
event_thread.join();
}
log_info("EventLoop stopped.");
}
// 将协程句柄添加到可运行队列
void schedule(CoroHandle handle) {
{
std::lock_guard<std::mutex> lock(tasks_mutex);
runnable_tasks.push_back(handle);
}
tasks_cv.notify_one(); // 唤醒调度线程
}
// 从其他线程提交一个任务 (lambda) 到主循环执行
void post(TaskFunc func) {
// 创建一个无返回值的协程来包装这个函数
// 实际应用中,可以考虑更直接的 post 机制
schedule([func]() -> CoroHandle { // 这是一个立即执行的 lambda,返回 CoroHandle
struct FuncPromise {
EventLoop* loop_ptr;
CoroHandle get_return_object() { return CoroHandle::from_promise(*this); }
std::suspend_always initial_suspend() { return {}; }
void return_void() {}
void unhandled_exception() { std::terminate(); }
// final_suspend 默认是 suspend_always
};
auto handle = CoroHandle::from_promise(*static_cast<FuncPromise*>(nullptr)); // Dummy handle for return type
func(); // 执行实际的函数
co_return; // 结束协程
}()); // 立即调用 lambda 以创建协程并获取句柄
}
// 注册I/O事件,并暂停当前协程
void register_io_await(int fd, IoEvent event_type, CoroHandle handle) {
std::lock_guard<std::mutex> lock(io_mutex);
// 记录协程句柄
if ((static_cast<int>(event_type) & EPOLLIN) && pending_io_reads.find(fd) == pending_io_reads.end()) {
pending_io_reads[fd] = handle;
}
if ((static_cast<int>(event_type) & EPOLLOUT) && pending_io_writes.find(fd) == pending_io_writes.end()) {
pending_io_writes[fd] = handle;
}
epoll_event event{};
event.data.fd = fd;
// 关注的事件,通常是 EPOLLET (边缘触发)
event.events = static_cast<uint32_t>(event_type) | EPOLLERR | EPOLLHUP;
// EPOLL_CTL_ADD 或 EPOLL_CTL_MOD
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
if (errno == EEXIST) {
// 如果fd已经存在,则尝试修改
THROW_IF_NEGATIVE(epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &event), "epoll_ctl(MOD)");
} else {
THROW_IF_NEGATIVE(-1, "epoll_ctl(ADD)");
}
}
// log_info("Registered fd " + std::to_string(fd) + " for event " + std::to_string(static_cast<int>(event_type)));
}
// 取消注册I/O事件
void unregister_io(int fd, IoEvent event_type) {
std::lock_guard<std::mutex> lock(io_mutex);
bool removed_read = false;
if (static_cast<int>(event_type) & EPOLLIN) {
removed_read = pending_io_reads.erase(fd) > 0;
}
bool removed_write = false;
if (static_cast<int>(event_type) & EPOLLOUT) {
removed_write = pending_io_writes.erase(fd) > 0;
}
// 只有当该fd不再被任何协程等待时才从epoll中移除
if (pending_io_reads.find(fd) == pending_io_reads.end() &&
pending_io_writes.find(fd) == pending_io_writes.end()) {
epoll_event event{}; // data.fd is irrelevant for DEL
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, &event) == -1 && errno != ENOENT) {
// ENOENT means it wasn't there, which is fine if already handled or fd closed
log_error("Failed to unregister fd " + std::to_string(fd) + " from epoll: " + strerror(errno));
} else {
// log_info("Unregistered fd " + std::to_string(fd) + " from epoll.");
}
}
}
// 添加一个定时器任务
void add_timer(std::chrono::steady_clock::time_point deadline, CoroHandle handle) {
{
std::lock_guard<std::mutex> lock(timers_mutex);
timers.emplace(deadline, handle);
log_info("Added timer for coroutine at " + std::to_string(deadline.time_since_epoch().count()));
}
timers_cv.notify_one(); // 唤醒 EventLoop 线程,重新计算 epoll_wait 超时时间
}
private:
void run() {
log_info("EventLoop thread started.");
constexpr int MAX_EVENTS = 64;
epoll_event events[MAX_EVENTS];
while (running) {
// 1. 处理可运行队列中的协程
process_runnable_tasks();
// 2. 计算 epoll_wait 的超时时间
int timeout_ms = -1; // 默认无限等待
{
std::lock_guard<std::mutex> lock(timers_mutex);
if (!timers.empty()) {
auto now = std::chrono::steady_clock::now();
auto next_deadline = timers.begin()->first;
if (next_deadline <= now) {
timeout_ms = 0; // 立即处理计时器
} else {
timeout_ms = std::chrono::duration_cast<std::chrono::milliseconds>(next_deadline - now).count();
if (timeout_ms < 0) timeout_ms = 0; // 避免负数
}
}
}
// 3. 等待 I/O 事件
int num_events = epoll_wait(epoll_fd, events, MAX_EVENTS, timeout_ms);
if (!running) break; // 及时退出
if (num_events == -1) {
if (errno == EINTR) {
continue; // 被信号中断,重新等待
}
log_error("epoll_wait failed: " + std::string(strerror(errno)));
// 致命错误,退出循环
running = false;
break;
}
// 4. 处理 I/O 就绪事件
process_io_events(events, num_events);
// 5. 处理到期的计时器
process_timers();
}
log_info("EventLoop thread exiting.");
}
void process_runnable_tasks() {
std::deque<CoroHandle> current_tasks;
{
std::unique_lock<std::mutex> lock(tasks_mutex);
// 等待直到有任务或EventLoop停止
tasks_cv.wait(lock, [this]{ return !runnable_tasks.empty() || !running; });
if (!running && runnable_tasks.empty()) return;
current_tasks.swap(runnable_tasks);
}
for (CoroHandle handle : current_tasks) {
if (!handle.done()) {
// log_info("Resuming scheduled coroutine.");
handle.resume();
} else {
// log_info("Coroutine already done, destroying.");
handle.destroy();
}
}
}
void process_io_events(epoll_event* events, int num_events) {
std::lock_guard<std::mutex> lock(io_mutex);
for (int i = 0; i < num_events; ++i) {
int fd = events[i].data.fd;
uint32_t event_flags = events[i].events;
CoroHandle handle;
bool found = false;
if (event_flags & (EPOLLIN | EPOLLERR | EPOLLHUP)) {
auto it = pending_io_reads.find(fd);
if (it != pending_io_reads.end()) {
handle = it->second;
pending_io_reads.erase(it); // 移除,因为协程即将恢复
found = true;
}
}
// 如果同时监听读写,并且读事件处理后仍需要处理写事件
if (!found && (event_flags & (EPOLLOUT | EPOLLERR | EPOLLHUP))) {
auto it = pending_io_writes.find(fd);
if (it != pending_io_writes.end()) {
handle = it->second;
pending_io_writes.erase(it); // 移除
found = true;
}
}
if (found) {
// log_info("I/O event on fd " + std::to_string(fd) + ", scheduling coroutine for resume.");
schedule(handle); // 将协程重新放入可运行队列
} else {
// 可能是未注册的fd,或者已经处理过的fd
// log_info("I/O event on fd " + std::to_string(fd) + " but no pending coroutine found.");
}
}
}
void process_timers() {
std::lock_guard<std::mutex> lock(timers_mutex);
auto now = std::chrono::steady_clock::now();
auto it = timers.begin();
while (it != timers.end() && it->first <= now) {
schedule(it->second);
it = timers.erase(it);
}
}
};
4.3. Task<T> (协程包装器)
我们需要一个 Task<T> 类型来作为协程的返回类型,它将协程句柄与调度器关联起来。
template<typename T>
struct Task {
struct promise_type;
using handle_type = std::coroutine_handle<promise_type>;
handle_type handle;
EventLoop* scheduler_ptr = nullptr; // 指向调度器实例
Task(handle_type h, EventLoop* sched) : handle(h), scheduler_ptr(sched) {}
Task(Task&& other) noexcept : handle(std::exchange(other.handle, nullptr)), scheduler_ptr(std::exchange(other.scheduler_ptr, nullptr)) {}
Task(const Task&) = delete;
Task& operator=(const Task&) = delete;
Task& operator=(Task&& other) noexcept {
if (this != &other) {
if (handle) handle.destroy();
handle = std::exchange(other.handle, nullptr);
scheduler_ptr = std::exchange(other.scheduler_ptr, nullptr);
}
return *this;
}
~Task() {
if (handle) {
// log_info("Task destructor: destroying coroutine handle.");
handle.destroy();
}
}
// Awaitable 接口,允许 `co_await Task<T>`
bool await_ready() const noexcept { return !handle || handle.done(); }
void await_suspend(std::coroutine_handle<> awaiting_coro) noexcept {
// 当一个协程等待另一个 Task<T> 完成时,将等待者注册到被等待的 Task 的 promise 中
// 这样当被等待的 Task 完成时,可以恢复等待者
handle.promise().set_continuation(awaiting_coro);
// 如果 Task 还没有开始执行 (initial_suspend), 调度它
if (!handle.promise().started) {
scheduler_ptr->schedule(handle);
}
}
T await_resume() {
if (handle.promise().exception_ptr) {
std::rethrow_exception(handle.promise().exception_ptr);
}
return handle.promise().result;
}
struct promise_type {
T result;
std::exception_ptr exception_ptr;
EventLoop* scheduler_ptr = nullptr;
std::coroutine_handle<> continuation_handle; // 等待当前Task完成的协程
bool started = false; // 标记协程是否已开始执行
Task get_return_object() {
return Task{handle_type::from_promise(*this), scheduler_ptr};
}
std::suspend_always initial_suspend() {
// 协程启动时暂停,等待调度器调度
started = false;
return {};
}
std::suspend_always final_suspend() noexcept {
// 协程结束时暂停
if (continuation_handle) {
// 如果有等待者,将等待者调度
scheduler_ptr->schedule(continuation_handle);
}
return {};
}
void return_value(T value) {
result = value;
}
void unhandled_exception() {
exception_ptr = std::current_exception();
}
void set_scheduler(EventLoop* sched) {
scheduler_ptr = sched;
}
void set_continuation(std::coroutine_handle<> h) {
continuation_handle = h;
}
};
};
// Task<void> 的特化
template<>
struct Task<void> {
struct promise_type;
using handle_type = std::coroutine_handle<promise_type>;
handle_type handle;
EventLoop* scheduler_ptr = nullptr;
Task(handle_type h, EventLoop* sched) : handle(h), scheduler_ptr(sched) {}
Task(Task&& other) noexcept : handle(std::exchange(other.handle, nullptr)), scheduler_ptr(std::exchange(other.scheduler_ptr, nullptr)) {}
Task(const Task&) = delete;
Task& operator=(const Task&) = delete;
Task& operator=(Task&& other) noexcept {
if (this != &other) {
if (handle) handle.destroy();
handle = std::exchange(other.handle, nullptr);
scheduler_ptr = std::exchange(other.scheduler_ptr, nullptr);
}
return *this;
}
~Task() {
if (handle) {
// log_info("Task<void> destructor: destroying coroutine handle.");
handle.destroy();
}
}
bool await_ready() const noexcept { return !handle || handle.done(); }
void await_suspend(std::coroutine_handle<> awaiting_coro) noexcept {
handle.promise().set_continuation(awaiting_coro);
if (!handle.promise().started) {
scheduler_ptr->schedule(handle);
}
}
void await_resume() {
if (handle.promise().exception_ptr) {
std::rethrow_exception(handle.promise().exception_ptr);
}
}
struct promise_type {
std::exception_ptr exception_ptr;
EventLoop* scheduler_ptr = nullptr;
std::coroutine_handle<> continuation_handle;
bool started = false;
Task get_return_object() {
return Task{handle_type::from_promise(*this), scheduler_ptr};
}
std::suspend_always initial_suspend() {
started = false;
return {};
}
std::suspend_always final_suspend() noexcept {
if (continuation_handle) {
scheduler_ptr->schedule(continuation_handle);
}
return {};
}
void return_void() {}
void unhandled_exception() {
exception_ptr = std::current_exception();
}
void set_scheduler(EventLoop* sched) {
scheduler_ptr = sched;
}
void set_continuation(std::coroutine_handle<> h) {
continuation_handle = h;
}
};
};
// 辅助函数,设置协程的调度器
template<typename T, typename... Args>
Task<T> make_task(EventLoop& scheduler, Args&&... args) {
Task<T> task = args...; // 构造Task
task.handle.promise().set_scheduler(&scheduler); // 设置promise的scheduler
scheduler.schedule(task.handle); // 调度Task的首次执行
task.handle.promise().started = true; // 标记已调度
return task;
}
4.4. IoAwaiter (I/O 等待器)
这是实现非阻塞 I/O 的关键。
struct IoAwaiter {
int fd;
IoEvent event_type;
EventLoop* scheduler_ptr;
IoAwaiter(int _fd, IoEvent _event_type, EventLoop* sched)
: fd(_fd), event_type(_event_type), scheduler_ptr(sched) {}
bool await_ready() const noexcept {
// 总是返回 false,确保协程暂停并将控制权交给调度器
// 除非有特殊情况,例如数据已在缓冲区中,可以直接读取
return false;
}
void await_suspend(std::coroutine_handle<> h) {
// 注册 I/O 事件到调度器
scheduler_ptr->register_io_await(fd, event_type, h);
}
void await_resume() {
// I/O 事件已就绪,协程恢复执行。
// 在这里可以进行清理工作,例如从epoll中移除fd(如果是一次性事件)。
// 对于边缘触发,通常不会立即移除,而是等待下一次co_await
// log_info("IoAwaiter: I/O event ready for fd " + std::to_string(fd) + " event " + std::to_string(static_cast<int>(event_type)));
}
};
// 辅助函数,方便创建 IoAwaiter
IoAwaiter io_wait(int fd, IoEvent event_type, EventLoop& scheduler) {
return IoAwaiter(fd, event_type, &scheduler);
}
4.5. TimerAwaiter (计时器等待器)
用于实现 co_await delay(milliseconds)。
struct DelayAwaiter {
std::chrono::milliseconds duration;
EventLoop* scheduler_ptr;
DelayAwaiter(std::chrono::milliseconds d, EventLoop* sched)
: duration(d), scheduler_ptr(sched) {}
bool await_ready() const noexcept {
return duration.count() == 0;
}
void await_suspend(std::coroutine_handle<> h) {
auto deadline = std::chrono::steady_clock::now() + duration;
scheduler_ptr->add_timer(deadline, h);
}
void await_resume() const noexcept {}
};
DelayAwaiter delay(std::chrono::milliseconds d, EventLoop& scheduler) {
return DelayAwaiter(d, &scheduler);
}
4.6. 示例:Echo Server
现在,我们使用上述组件构建一个简单的 TCP Echo Server。
// 在所有 Task 和 EventLoop 定义之后
// Acceptor 协程:监听并接受新连接
Task<void> acceptor_coroutine(int listen_fd, EventLoop& scheduler) {
log_info("Acceptor coroutine started on listen_fd: " + std::to_string(listen_fd));
set_nonblocking(listen_fd);
while (true) {
co_await io_wait(listen_fd, IoEvent::READ | IoEvent::EDGE_TRIGGERED, scheduler);
sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
int client_fd = accept(listen_fd, (sockaddr*)&client_addr, &client_len);
if (client_fd == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 没有更多连接了,继续等待
continue;
} else {
log_error("Accept failed: " + std::string(strerror(errno)));
co_return; // 致命错误,退出acceptor
}
}
set_nonblocking(client_fd);
char client_ip[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &client_addr.sin_addr, client_ip, INET_ADDRSTRLEN);
log_info("Accepted connection from " + std::string(client_ip) + ":" + std::to_string(ntohs(client_addr.sin_port)) + " on fd " + std::to_string(client_fd));
// 启动一个新的协程来处理客户端连接
make_task<void>(scheduler, client_handler_coroutine(client_fd, scheduler));
}
}
// Client Handler 协程:处理单个客户端连接的读写
Task<void> client_handler_coroutine(int client_fd, EventLoop& scheduler) {
log_info("Client handler coroutine started for fd: " + std::to_string(client_fd));
char buffer[1024];
try {
while (true) {
// 等待可读事件
co_await io_wait(client_fd, IoEvent::READ | IoEvent::EDGE_TRIGGERED, scheduler);
ssize_t bytes_read = recv(client_fd, buffer, sizeof(buffer), 0);
if (bytes_read == 0) {
log_info("Client disconnected from fd: " + std::to_string(client_fd));
break; // 客户端关闭连接
} else if (bytes_read == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 没有数据可读了,继续等待
continue;
} else {
log_error("Recv failed for fd " + std::to_string(client_fd) + ": " + std::string(strerror(errno)));
break;
}
}
log_info("Received " + std::to_string(bytes_read) + " bytes from fd " + std::to_string(client_fd));
// 等待可写事件
co_await io_wait(client_fd, IoEvent::WRITE | IoEvent::EDGE_TRIGGERED, scheduler);
ssize_t bytes_sent = send(client_fd, buffer, bytes_read, 0);
if (bytes_sent == -1) {
log_error("Send failed for fd " + std::to_string(client_fd) + ": " + std::string(strerror(errno)));
break;
}
log_info("Sent " + std::to_string(bytes_sent) + " bytes to fd " + std::to_string(client_fd));
}
} catch (const std::exception& e) {
log_error("Exception in client_handler_coroutine for fd " + std::to_string(client_fd) + ": " + e.what());
}
// 清理工作
scheduler.unregister_io(client_fd, IoEvent::READ | IoEvent::WRITE);
close(client_fd);
log_info("Client handler coroutine finished for fd: " + std::to_string(client_fd));
}
// 主函数
int main() {
log_info("Server starting...");
EventLoop scheduler;
scheduler.start(); // 启动调度器线程
// 创建监听 socket
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
THROW_IF_NEGATIVE(listen_fd, "socket");
set_nonblocking(listen_fd);
int opt = 1;
THROW_IF_NEGATIVE(setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)), "setsockopt");
sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY; // 监听所有可用接口
server_addr.sin_port = htons(8080); // 监听端口8080
THROW_IF_NEGATIVE(bind(listen_fd, (sockaddr*)&server_addr, sizeof(server_addr)), "bind");
THROW_IF_NEGATIVE(listen(listen_fd, SOMAXCONN), "listen");
log_info("Listening on port 8080...");
// 启动 acceptor 协程
make_task<void>(scheduler, acceptor_coroutine(listen_fd, scheduler));
// 保持主线程运行,直到按Enter键
std::cout << "Press Enter to stop the server..." << std::endl;
std::cin.ignore();
log_info("Stopping server...");
scheduler.stop(); // 停止调度器线程
close(listen_fd);
log_info("Server stopped.");
return 0;
}
5. 进一步的考量与优化
我们已经构建了一个功能完备但相对简单的协程调度器。在实际生产环境中,还需要考虑以下高级特性和优化:
- 错误处理与异常传播: 当前的
Task简单地std::terminate或std::rethrow_exception。更健壮的系统需要更细致的异常处理策略,例如将异常包装在std::expected或std::variant中返回,或提供协程级别的错误回调。 - 取消机制: 如何优雅地取消一个正在运行或等待中的协程?这通常需要一个
cancellation_token,并在await_suspend中注册取消回调,在await_resume中检查是否被取消。 - 内存管理: 协程帧通常在堆上分配,长时间运行的协程可能会导致内存碎片。可以考虑使用自定义内存分配器(例如对象池)来管理协程帧的生命周期。
- 线程模型:
- 单线程
EventLoop: 适用于 I/O 密集型任务,避免锁竞争。 - 多线程
EventLoop: 多个EventLoop实例,每个运行在一个 OS 线程上,可以绑定不同的 CPU 核心,通过std::hash(fd)或其他负载均衡策略将文件描述符分配给不同的EventLoop。 - 计算密集型任务: 对于需要大量 CPU 计算的协程,不应直接在
EventLoop线程中执行。可以引入一个独立的线程池,将计算任务post到线程池中执行,待计算完成后再resume回 I/OEventLoop。
- 单线程
- I/O 多路复用 API 扩展:
io_uring: Linux 5.1+ 提供的异步 I/O 接口,能够提供比epoll更高的性能和更丰富的异步操作。将其集成到调度器中将是一个显著的升级。kqueue(macOS/FreeBSD),IOCP(Windows): 跨平台支持需要抽象 I/O 接口。
- 协程局部存储 (Coroutine Local Storage): 类似于线程局部存储,但作用于协程。这对于某些上下文信息(如请求 ID、用户身份)在协程生命周期内传递非常有用。
- 调试: 协程的调试比传统线程更复杂,因为堆栈帧不连续。需要专门的调试工具或日志记录来追踪协程的执行路径。
6. 结语
C++20 Coroutines 为我们打开了构建高性能、高并发系统的全新大门。通过手写一个轻量级调度器,我们不仅深入理解了 Coroutines 的工作机制,更掌握了如何将这一强大特性与底层系统 I/O 机制相结合,以同步的直观性编写出异步的高效代码。这仅仅是一个起点,未来在 io_uring、更精细的调度策略和更完善的库支持下,C++ Coroutines 的潜力将得到更充分的释放。希望今天的讲座能为大家在 C++ 高并发网络编程的探索之路上提供有益的启发和实践指导。