各位同仁,女士们,先生们,
欢迎来到今天的技术讲座。今天我们将深入探讨一个在现代C++异步编程中日益重要的概念:Async Generator。我们将不仅仅停留在理论层面,更将利用C++协程的强大能力,设计并实现一个基于流(Streaming)的高性能并行计算排队逻辑。这对于处理大规模数据流、构建响应式系统以及优化资源利用率至关重要。
引言:高性能流式处理的挑战与协程的答案
在当今数据密集型应用中,我们经常面临处理海量数据流的挑战。这些数据可能源源不断地从网络、文件系统或其他服务涌入。传统的处理方式,如阻塞I/O、回调函数嵌套("callback hell")或简单的线程池模型,往往暴露出效率低下、难以维护、背压(backpressure)机制缺失等问题。
想象一个场景:您正在构建一个实时数据分析系统,需要从传感器或消息队列持续接收数据包,对每个数据包执行复杂的计算,并将结果按原始顺序输出。
- 挑战1:吞吐量。 数据包到达速度可能非常快,单一线程无法及时处理。
- 挑战2:延迟。 即使并行处理,如果处理逻辑是阻塞的,也会导致整个系统响应迟钝。
- 挑战3:背压。 如果数据生产者速度远超消费者,内存会迅速耗尽。我们需要一种机制让下游压力反馈到上游,从而减缓生产速度。
- 挑战4:代码可读性与维护性。 复杂的异步逻辑容易导致代码难以理解和调试。
C++20引入的协程(Coroutines)为解决这些问题提供了优雅而强大的工具。协程允许函数在执行过程中暂停(co_await, co_yield),并在稍后从暂停点恢复,而无需保存整个调用栈。这使得编写异步代码变得像编写同步代码一样直观,极大地提高了代码的可读性和可维护性。
Async Generator,顾名思义,是异步版本的生成器。它是一个可以异步地 co_yield 值,同时也可以异步地 co_await 其他异步操作的协程。它将异步操作和按需生成数据流的特性结合起来,是构建高性能、响应式流处理系统的理想选择。
什么是Generator (同步生成器)?
在深入Async Generator之前,我们先快速回顾一下同步生成器(Generator)。生成器是一种特殊的函数,它能够暂停执行并返回一个值(co_yield),然后在下次请求时从上次暂停的地方继续执行。它实现了迭代器模式,但不是一次性生成所有数据,而是按需生成。
核心概念:
co_yield: 暂停协程并返回一个值。promise_type: 协程的“骨架”,定义了协程的生命周期行为(何时暂停、何时恢复、如何返回值、如何处理异常等)。std::coroutine_handle: 协程的句柄,用于控制协程的执行(恢复、销毁)。
示例:一个简单的同步Fibonacci生成器
#include <coroutine>
#include <iostream>
#include <optional>
#include <stdexcept>
#include <thread>
#include <deque>
#include <map>
#include <functional>
#include <future>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <vector>
// 1. 同步生成器 Generator<T> 的 Promise 类型
template<typename T>
struct GeneratorPromise {
T value_; // 存储 co_yield 产生的值
std::coroutine_handle<> caller_handle_; // 用于恢复调用者(这里是迭代器)
// 获取返回对象,即 Generator<T>
auto get_return_object() {
return Generator<T>{std::coroutine_handle<GeneratorPromise>::from_promise(*this)};
}
// 初始暂停:总是暂停,由调用者显式启动
std::suspend_always initial_suspend() { return {}; }
// 最终暂停:总是暂停,允许消费者在协程结束后获取最终状态或处理
std::suspend_always final_suspend() noexcept { return {}; }
// 处理 co_yield T value
std::suspend_always yield_value(T value) {
value_ = std::move(value);
return {}; // 暂停协程,等待下次迭代器请求
}
// 处理 co_return; (通常在生成器中不直接使用,通过协程结束隐式返回void)
void return_void() {}
// 异常处理
void unhandled_exception() {
// 简单起见,这里直接重新抛出,实际应用中可能更复杂
throw;
}
};
// 2. 同步生成器 Generator<T> 类型 (返回对象)
template<typename T>
struct Generator {
using promise_type = GeneratorPromise<T>;
std::coroutine_handle<promise_type> handle_;
// 构造函数
Generator(std::coroutine_handle<promise_type> h) : handle_(h) {}
// 移动语义
Generator(Generator&& other) noexcept : handle_(std::exchange(other.handle_, {})) {}
Generator& operator=(Generator&& other) noexcept {
if (this != &other) {
if (handle_) handle_.destroy();
handle_ = std::exchange(other.handle_, {});
}
return *this;
}
// 析构函数:确保协程句柄被销毁
~Generator() {
if (handle_) handle_.destroy();
}
// 禁止拷贝
Generator(const Generator&) = delete;
Generator& operator=(const Generator&) = delete;
// 迭代器接口
struct iterator {
std::coroutine_handle<promise_type> handle_;
iterator(std::coroutine_handle<promise_type> h) : handle_(h) {}
iterator() : handle_(nullptr) {}
// 前置递增运算符
iterator& operator++() {
if (handle_ && !handle_.done()) {
handle_.resume(); // 恢复协程,生成下一个值
}
if (handle_.done()) {
handle_ = nullptr; // 协程结束,标记为 end 迭代器
}
return *this;
}
// 解引用运算符
T operator*() const {
return handle_.promise().value_;
}
// 比较运算符
bool operator!=(const iterator& other) const {
return handle_ != other.handle_;
}
};
// begin() 方法
iterator begin() {
if (handle_) {
handle_.resume(); // 启动协程
if (handle_.done()) {
return {}; // 如果协程立即结束,返回 end 迭代器
}
}
return iterator{handle_};
}
// end() 方法
iterator end() {
return {}; // 默认构造的迭代器代表结束
}
};
// 3. Fibonacci 生成器函数
Generator<int> fibonacci_generator(int count) {
int a = 0, b = 1;
for (int i = 0; i < count; ++i) {
co_yield a; // 暂停并返回当前Fibonacci数
int next = a + b;
a = b;
b = next;
}
}
/*
int main() {
std::cout << "Synchronous Fibonacci Generator:" << std::endl;
for (int value : fibonacci_generator(10)) {
std::cout << value << " ";
}
std::cout << std::endl;
return 0;
}
*/
同步生成器总结:
- 通过
co_yield暂停并返回一个值。 - 通过
resume()恢复执行。 - 本质上是同步的,每次
operator++都会立即执行协程直到下一个co_yield或结束。 - 适用于按需生成数据,但数据源本身是同步的或已经准备好的场景。
从同步到异步:Async Generator的诞生
同步生成器在数据源就绪的情况下表现出色,但如果数据生成过程本身是异步的呢?例如,从网络接收数据、等待数据库查询结果、或者等待另一个并行任务完成。这时,简单地 co_yield 一个值然后立即恢复协程就会导致阻塞,或者无法充分利用异步I/O的优势。
Async Generator 应运而生。它将 Generator 的按需生成特性与异步编程的非阻塞特性结合起来。一个Async Generator可以:
co_await一个异步操作,等待其完成。co_yield一个值,这个值的生成可能涉及异步操作。当值准备好后,它会通知消费者可以co_await并获取下一个值了。
核心挑战:
- 如何让
co_yield暂停协程,并让消费者在值准备好时异步地co_await获得它? - 如何管理协程的生命周期和状态,使其能够与事件循环或线程池无缝集成?
构建 Async Generator 的核心组件
我们将定义一个 AsyncGenerator<T> 类型,它代表一个异步的值流。这个 AsyncGenerator<T> 将是一个Awaitable,意味着消费者可以 co_await 它来获取下一个值。
1. Task<T>:基础异步操作的表示
为了更好地集成协程和异步操作(例如线程池任务),我们需要一个统一的 Awaitable 类型。std::future 在与协程集成时存在局限性(无法直接注册回调),因此我们通常会定义自己的 Task<T> 类型。
Task<T> 是一个轻量级的异步操作句柄,它封装了对一个异步结果的承诺。当结果可用时,它能够恢复等待它的协程。
// 辅助类:包装协程句柄,用于在完成时恢复
struct CoroutineResumer {
std::coroutine_handle<> handle_to_resume;
void operator()() {
if (handle_to_resume) {
handle_to_resume.resume();
}
}
};
// Task 的 Promise 类型
template<typename T>
struct TaskPromise {
std::optional<T> result_;
std::exception_ptr exception_;
std::coroutine_handle<> awaiting_coroutine_; // 等待此任务完成的协程
auto get_return_object() {
return Task<T>{std::coroutine_handle<TaskPromise>::from_promise(*this)};
}
std::suspend_always initial_suspend() { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void return_value(T value) {
result_ = std::move(value);
if (awaiting_coroutine_) {
awaiting_coroutine_.resume(); // 任务完成,恢复等待者
}
}
void unhandled_exception() {
exception_ = std::current_exception();
if (awaiting_coroutine_) {
awaiting_coroutine_.resume(); // 异常发生,恢复等待者
}
}
};
// Task<T> 类型:协程返回的对象,也是一个 awaitable
template<typename T>
struct Task {
using promise_type = TaskPromise<T>;
std::coroutine_handle<promise_type> handle_;
Task(std::coroutine_handle<promise_type> h) : handle_(h) {}
Task(Task&& other) noexcept : handle_(std::exchange(other.handle_, {})) {}
Task& operator=(Task&& other) noexcept {
if (this != &other) {
if (handle_) handle_.destroy();
handle_ = std::exchange(other.handle_, {});
}
return *this;
}
~Task() { if (handle_) handle_.destroy(); }
bool await_ready() const {
return !handle_ || handle_.promise().result_.has_value() || handle_.promise().exception_ != nullptr;
}
// 当任务尚未完成时,暂停当前协程,并将当前协程的句柄存储起来
void await_suspend(std::coroutine_handle<> awaiting_coroutine) {
handle_.promise().awaiting_coroutine_ = awaiting_coroutine;
// 如果任务协程还没有开始执行,现在开始
if (handle_ && !handle_.done() && !handle_.promise().result_.has_value() && handle_.promise().exception_ == nullptr) {
handle_.resume();
}
}
T await_resume() {
if (handle_.promise().exception_ != nullptr) {
std::rethrow_exception(handle_.promise().exception_);
}
return std::move(handle_.promise().result_.value());
}
};
// 特化 Task<void>
template<>
struct Task<void> {
using promise_type = TaskPromise<void>;
std::coroutine_handle<promise_type> handle_;
Task(std::coroutine_handle<promise_type> h) : handle_(h) {}
Task(Task&& other) noexcept : handle_(std::exchange(other.handle_, {})) {}
Task& operator=(Task&& other) noexcept {
if (this != &other) {
if (handle_) handle_.destroy();
handle_ = std::exchange(other.handle_, {});
}
return *this;
}
~Task() { if (handle_) handle_.destroy(); }
bool await_ready() const {
return !handle_ || handle_.promise().exception_ != nullptr; // For void task, only check exception or if already done
}
void await_suspend(std::coroutine_handle<> awaiting_coroutine) {
handle_.promise().awaiting_coroutine_ = awaiting_coroutine;
if (handle_ && !handle_.done() && handle_.promise().exception_ == nullptr) {
handle_.resume();
}
}
void await_resume() {
if (handle_.promise().exception_ != nullptr) {
std::rethrow_exception(handle_.promise().exception_);
}
}
};
// TaskPromise<void> 需要一个 return_void()
template<>
struct TaskPromise<void> {
std::exception_ptr exception_;
std::coroutine_handle<> awaiting_coroutine_;
auto get_return_object() {
return Task<void>{std::coroutine_handle<TaskPromise>::from_promise(*this)};
}
std::suspend_always initial_suspend() { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void return_void() {
if (awaiting_coroutine_) {
awaiting_coroutine_.resume();
}
}
void unhandled_exception() {
exception_ = std::current_exception();
if (awaiting_coroutine_) {
awaiting_coroutine_.resume();
}
}
};
Task<T>作为基础,我们将用它来包装线程池的计算结果,以及作为AsyncGenerator的next()方法的返回类型。
2. AsyncGenerator<T>:异步生成器本身
AsyncGenerator<T> 的 promise_type 将会复杂一些,因为它不仅要处理 co_yield,还要与 Task<T> 机制配合,实现异步暂停和恢复。
// AsyncGenerator 的 Promise 类型
template<typename T>
struct AsyncGeneratorPromise {
std::optional<T> value_; // co_yield 产生的值
std::exception_ptr exception_;
std::coroutine_handle<> awaiting_consumer_; // 等待下一个值的消费者协程
std::coroutine_handle<> producer_handle_; // 协程句柄自身
AsyncGeneratorPromise() : producer_handle_(nullptr) {}
auto get_return_object() {
producer_handle_ = std::coroutine_handle<AsyncGeneratorPromise>::from_promise(*this);
return AsyncGenerator<T>{producer_handle_};
}
// 初始暂停:总是暂停,等待消费者调用 next() 启动
std::suspend_always initial_suspend() { return {}; }
// 最终暂停:总是暂停,允许消费者在协程结束后检查状态
std::suspend_always final_suspend() noexcept { return {}; }
// 处理 co_yield T value
// 当值准备好时,存储值,并暂停,等待消费者来取
std::suspend_always yield_value(T value) {
value_ = std::move(value);
if (awaiting_consumer_) {
awaiting_consumer_.resume(); // 唤醒消费者
}
return {}; // 暂停生成器协程
}
// 处理 co_return; 标记流结束
void return_void() {
value_.reset(); // 清空值,表示结束
if (awaiting_consumer_) {
awaiting_consumer_.resume(); // 唤醒消费者,通知流已结束
}
}
void unhandled_exception() {
exception_ = std::current_exception();
if (awaiting_consumer_) {
awaiting_consumer_.resume();
}
}
};
// AsyncGenerator<T> 类型 (返回对象)
template<typename T>
struct AsyncGenerator {
using promise_type = AsyncGeneratorPromise<T>;
std::coroutine_handle<promise_type> handle_;
AsyncGenerator(std::coroutine_handle<promise_type> h) : handle_(h) {}
AsyncGenerator(AsyncGenerator&& other) noexcept : handle_(std::exchange(other.handle_, {})) {}
AsyncGenerator& operator=(AsyncGenerator&& other) noexcept {
if (this != &other) {
if (handle_) handle_.destroy();
handle_ = std::exchange(other.handle_, {});
}
return *this;
}
~AsyncGenerator() { if (handle_) handle_.destroy(); }
AsyncGenerator(const AsyncGenerator&) = delete;
AsyncGenerator& operator=(const AsyncGenerator&) = delete;
// Awaitable 结构,用于 co_await generator.next()
struct NextAwaitable {
AsyncGenerator<T>& generator_;
std::optional<T> value_; // 存储获取到的值
bool is_done_;
NextAwaitable(AsyncGenerator<T>& gen) : generator_(gen), is_done_(false) {}
bool await_ready() const {
if (!generator_.handle_ || generator_.handle_.done()) {
return true; // 协程已结束
}
return generator_.handle_.promise().value_.has_value() || generator_.handle_.promise().exception_ != nullptr;
}
void await_suspend(std::coroutine_handle<> awaiting_coroutine) {
generator_.handle_.promise().awaiting_consumer_ = awaiting_coroutine;
// 恢复生成器以产生下一个值
generator_.handle_.resume();
}
bool await_resume() {
if (generator_.handle_.promise().exception_ != nullptr) {
std::rethrow_exception(generator_.handle_.promise().exception_);
}
if (generator_.handle_.promise().value_.has_value()) {
value_ = std::move(generator_.handle_.promise().value_.value());
generator_.handle_.promise().value_.reset(); // 值已被消费,清空
return true; // 成功获取到值
}
is_done_ = true; // 流已结束
return false;
}
// 获取值的方法
T get_value() {
if (value_.has_value()) {
return std::move(value_.value());
}
throw std::runtime_error("No value available or generator finished.");
}
// 检查流是否结束
bool done() const { return is_done_; }
};
NextAwaitable next() {
return NextAwaitable(*this);
}
};
在 AsyncGenerator 的设计中,next() 方法返回一个 NextAwaitable 对象。消费者会 co_await 这个 NextAwaitable。
- 当
await_suspend被调用时,它会存储消费者协程的句柄,然后resume()生成器协程。 - 生成器协程执行,直到
co_yield一个值或co_return(表示结束)。 - 当生成器
co_yield值时,它会调用awaiting_consumer_.resume()唤醒消费者。 - 消费者被唤醒后,其
await_resume()方法被调用,它会从promise中取出值。
基于流的高性能并行计算排队逻辑
现在,我们将利用 AsyncGenerator 和 Task 来实现一个高性能的并行计算排队逻辑。
场景描述:
我们有一个异步的输入流,源源不断地提供 ComputationTask。我们需要将这些任务提交给一个线程池进行并行处理,然后将处理结果收集起来,并以原始任务的顺序通过另一个异步流 co_yield 出去。整个过程必须是非阻塞的,并具备背压机制。
核心组件:
ComputationTask和ComputationResult: 定义计算任务和结果的数据结构。ThreadPool: 用于并行执行计算任务的线程池。它将返回我们的Task<T>类型。AsyncSourceGenerator: 模拟异步的任务输入流。ParallelProcessorGenerator: 核心逻辑,它将co_await输入任务,提交给线程池,管理在途任务,实现背压和结果排序,然后co_yield出结果。AsyncConsumer: 消费最终的异步结果流。Scheduler: 一个简单的调度器,用于驱动所有协程的执行。
1. ComputationTask 和 ComputationResult
// 计算任务结构
struct ComputationTask {
size_t id;
int data; // 模拟任务数据
// 构造函数
ComputationTask(size_t i, int d) : id(i), data(d) {}
};
// 计算结果结构
struct ComputationResult {
size_t id;
long long result_data; // 模拟结果数据
// 构造函数
ComputationResult(size_t i, long long r) : id(i), result_data(r) {}
};
2. ThreadPool:返回 Task<T> 的线程池
我们的线程池将接收一个函数对象,并在一个工作线程上执行它。执行完成后,它会通过我们自定义的 Task<T> 机制来恢复等待结果的协程。
class ThreadPool {
public:
ThreadPool(size_t num_threads) : stop_(false) {
for (size_t i = 0; i < num_threads; ++i) {
workers_.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queue_mutex_);
condition_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
if (stop_ && tasks_.empty()) {
return;
}
task = std::move(tasks_.front());
tasks_.pop_front();
}
task(); // 执行任务
}
});
}
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex_);
stop_ = true;
}
condition_.notify_all();
for (std::thread& worker : workers_) {
worker.join();
}
}
// 提交一个任务,返回一个 Task<T>
template<typename F, typename... Args>
auto submit(F&& f, Args&&... args) -> Task<std::invoke_result_t<F, Args...>> {
using result_type = std::invoke_result_t<F, Args...>;
// 创建 Task 对应的 Promise
auto task_promise = new TaskPromise<result_type>();
Task<result_type> task_obj{std::coroutine_handle<TaskPromise<result_type>>::from_promise(*task_promise)};
// 包装实际的函数,在执行后设置 Promise 的值
std::function<void()> wrapped_task = [
func = std::bind(std::forward<F>(f), std::forward<Args>(args)...),
promise_ptr = task_promise
]() mutable {
try {
if constexpr (std::is_void_v<result_type>) {
func();
promise_ptr->return_void();
} else {
promise_ptr->return_value(func());
}
} catch (...) {
promise_ptr->unhandled_exception();
}
};
{
std::unique_lock<std::mutex> lock(queue_mutex_);
tasks_.emplace_back(std::move(wrapped_task));
}
condition_.notify_one();
return task_obj;
}
private:
std::vector<std::thread> workers_;
std::deque<std::function<void()>> tasks_;
std::mutex queue_mutex_;
std::condition_variable condition_;
bool stop_;
};
这里的 ThreadPool::submit 返回 Task<T>。当 submit 返回时,计算任务可能尚未开始执行。调用者可以 co_await 这个 Task<T>,当线程池完成计算并设置了 Promise 的结果时,等待的协程就会被恢复。
3. AsyncSourceGenerator:模拟输入流
这个生成器模拟一个外部异步数据源,例如从网络读取或从文件中分块读取。它会 co_yield ComputationTask 对象。
// 模拟一个异步任务源
AsyncGenerator<ComputationTask> async_task_source(int num_tasks, int delay_ms) {
for (size_t i = 0; i < num_tasks; ++i) {
// 模拟异步I/O或等待
std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); // 实际应 co_await 异步操作
std::cout << "[Source] Yielding task " << i << std::endl;
co_yield ComputationTask(i, static_cast<int>(i * 10));
}
std::cout << "[Source] All tasks yielded." << std::endl;
co_return;
}
注意: 这里的 std::this_thread::sleep_for 在实际的异步生成器中应该替换为真正的异步等待(例如 co_await 一个定时器或网络读取操作)。为了简化示例,我们使用同步睡眠来模拟延迟。在一个真正的事件循环中,std::this_thread::sleep_for 会阻塞整个调度器。
4. ParallelProcessorGenerator:核心并行处理与排队逻辑
这是我们系统的核心。它将从 AsyncSourceGenerator 中 co_await 任务,将它们提交给 ThreadPool,然后管理在途任务的生命周期和结果的排序,最终将结果 co_yield 出去。
它需要实现:
- 并行性:同时处理多个任务。
- 背压:限制同时在线程池中执行的任务数量,防止过载。
- 结果排序:确保
co_yield出的结果与输入任务的id顺序一致。
// 核心并行处理器,带有背压和结果排序
AsyncGenerator<ComputationResult> parallel_processor_generator(
AsyncGenerator<ComputationTask> input_tasks,
ThreadPool& pool,
size_t max_in_flight_tasks)
{
// 存储正在执行的任务及其对应的 Task 对象
std::map<size_t, Task<ComputationResult>> in_flight_tasks;
// 存储已经完成但尚未按序 co_yield 的结果
std::map<size_t, ComputationResult> completed_results_buffer;
size_t next_expected_id = 0; // 下一个期望输出的任务ID
bool source_exhausted = false;
while (!source_exhausted || !in_flight_tasks.empty() || !completed_results_buffer.empty()) {
// 1. 尝试从输入流获取新任务并提交到线程池
// 只有当“在途任务”数量未达到上限且输入流未耗尽时才拉取新任务
if (!source_exhausted && in_flight_tasks.size() < max_in_flight_tasks) {
auto next_task_awaitable = input_tasks.next();
if (co_await next_task_awaitable) { // 尝试获取下一个任务
ComputationTask task = next_task_awaitable.get_value();
std::cout << "[Processor] Submitting task " << task.id << " to thread pool." << std::endl;
// 模拟耗时计算
auto compute_func = [task]() -> ComputationResult {
std::this_thread::sleep_for(std::chrono::milliseconds(100 + (task.id % 5) * 20)); // 模拟不同耗时
long long res = static_cast<long long>(task.data) * 2;
std::cout << "[Worker] Task " << task.id << " computed, result: " << res << std::endl;
return ComputationResult(task.id, res);
};
in_flight_tasks.emplace(task.id, pool.submit(compute_func));
} else {
source_exhausted = true; // 输入流已耗尽
std::cout << "[Processor] Input source exhausted." << std::endl;
}
}
// 2. 检查是否有已完成且按序的结果可以 co_yield
while (completed_results_buffer.count(next_expected_id)) {
ComputationResult result = std::move(completed_results_buffer.at(next_expected_id));
completed_results_buffer.erase(next_expected_id);
std::cout << "[Processor] Yielding result for task " << result.id << std::endl;
co_yield result;
next_expected_id++;
}
// 3. 如果有在途任务,并且当前处理器协程没有任何其他事情可做(例如,输入流已耗尽,或在途任务已满,或没有按序结果),
// 则等待最早的在途任务完成。这提供了隐式背压。
if (!in_flight_tasks.empty()) {
// 找到ID最小的在途任务(最早提交的)
auto it = in_flight_tasks.begin();
// 在这里,我们需要一个机制来 co_await *any* of the in-flight tasks.
// std::map<size_t, Task<ComputationResult>> 不直接支持 await_any。
// 为了简化,我们暂时只 await 最早的那个,这会阻塞直到它完成。
// 更高级的调度器会实现一个 select/poll 机制来 co_await 多个 Task。
// 对于本示例,我们假定等待最早的 in_flight_tasks.begin()->second 完成。
// 实际中,会有一个全局的调度器来管理所有 active 的 Task,并在任何一个 Task 完成时唤醒处理器。
// 临时解决方案:如果 in_flight_tasks 满了,或者输入源耗尽,就等待一个在途任务完成。
// 否则,如果还有空间和输入,就不阻塞,直接进入下一轮循环尝试拉取任务。
if (in_flight_tasks.size() >= max_in_flight_tasks || source_exhausted) {
size_t current_waiting_id = it->first;
std::cout << "[Processor] Waiting for task " << current_waiting_id << " to complete..." << std::endl;
ComputationResult result = co_await it->second; // 等待最早的 Task 完成
std::cout << "[Processor] Task " << result.id << " completed. Buffering." << std::endl;
completed_results_buffer.emplace(result.id, std::move(result));
in_flight_tasks.erase(it);
} else {
// 如果还有空间,并且输入源未耗尽,我们不阻塞,继续循环尝试拉取任务
// 但为了避免忙等,这里需要一个机制让协程在没有立即可做的事情时暂停。
// 暂时,我们可以通过检查是否有任何任务已完成,然后立即处理。
// 否则,如果没有任何任务完成,并且没有新的输入,我们可能需要一个 co_await_any_task() 机制。
// 对于这个简化示例,如果没有任何任务立即完成,并且没有新的输入,
// 协程会空转一轮,然后在下一个循环中再次检查。
// 实际的调度器会在这里插入一个 `co_await scheduler.next_ready_event()` 这样的调用。
}
}
// 关键逻辑:如果没有任何事情可以做(没有新的输入,没有完成的任务,也没有可输出的按序结果),
// 那么此协程必须暂停,否则会忙等。
// 一个更完善的调度器会通过一个全局的 `co_await scheduler.wait_for_any_event()` 来实现。
// 这里,我们通过一个简单的条件来模拟:
if (source_exhausted && in_flight_tasks.empty() && completed_results_buffer.empty()) {
break; // 所有任务处理完毕,退出循环
}
}
std::cout << "[Processor] All results processed and yielded." << std::endl;
co_return;
}
ParallelProcessorGenerator 的复杂性:
上面代码中的注释已经指出,co_await *any* of the in-flight tasks 是一个挑战。C++标准库的协程原语本身不提供 co_await_any 这样的功能。一个完整的异步框架(如 asio)会提供这样的机制。
在我们的简化实现中,if (in_flight_tasks.size() >= max_in_flight_tasks || source_exhausted) 这一段,如果条件满足,我们会 co_await it->second,即等待最早提交的那个任务。这确实能实现背压,但当有很多在途任务时,如果最早的任务迟迟不完成,即使后面的任务已经完成了,也无法被处理和 co_yield。这会导致一定程度的效率损失,因为它不是真正的 await_any。
为了实现更高效的 await_any 行为:
我们需要一个自定义的 Awaitable,它能够监控一个 Task<T> 集合,并在其中任何一个 Task 完成时恢复等待的协程。这通常通过在 TaskPromise 中注册一个回调到调度器来实现,当 Task 完成时,调度器会遍历所有等待它的协程,并唤醒相应的处理器。然而,实现一个完整的 await_any 机制超出了本次讲座的范围,它通常需要一个完整的事件循环/调度器框架。
为了保持示例的简洁性,我们当前的设计将通过定期检查 in_flight_tasks 和 completed_results_buffer 并有条件地 co_await 最早的在途任务来实现。这意味着协程可能会在没有立即可做的事情时短暂地恢复,这在单线程事件循环中不是最优的(可能导致忙等),但在多线程环境中(如我们这里有 ThreadPool 在后台工作)可以接受,或者在更完善的调度器中,它会被替换为一个等待事件的机制。
5. AsyncConsumer:消费结果流
// 异步消费者
Task<void> async_consumer(AsyncGenerator<ComputationResult> results) {
std::cout << "n[Consumer] Starting to consume results." << std::endl;
auto awaitable_next = results.next();
while (co_await awaitable_next) { // co_await 直到有下一个结果或流结束
ComputationResult res = awaitable_next.get_value();
std::cout << "[Consumer] Received result for task " << res.id << ": " << res.result_data << std::endl;
// 模拟消费处理时间
// std::this_thread::sleep_for(std::chrono::milliseconds(50));
awaitable_next = results.next(); // 准备获取下一个
}
std::cout << "[Consumer] All results consumed." << std::endl;
co_return;
}
6. Scheduler:驱动协程执行
由于C++标准库没有内置的协程调度器,我们需要一个简单的 Scheduler 来启动和驱动我们的 Task<void> 协程。
// 简单调度器
class Scheduler {
public:
// 启动一个 Task<void>
void start(Task<void> task) {
// Task<void> 的 await_suspend 默认会恢复其句柄,
// 但如果它已经完成,则不会。
// 我们需要确保 task 协程的 initial_suspend 之后被 resume 一次。
// 通常,协程在 get_return_object 后会执行 initial_suspend,然后暂停。
// 我们需要手动恢复它来启动它。
if (task.handle_ && !task.handle_.done()) {
task.handle_.resume();
}
// 对于 Task<void>,它本身就是驱动者,当它完成时,它会恢复等待它的协程。
// main 函数会 co_await 这个 task。
// 这里只是为了确保它能启动。
}
};
在我们的例子中,main 函数本身将作为最顶层的调度器,直接 co_await async_consumer 返回的 Task<void>。这意味着 main 函数所在的线程将是协程的执行上下文。
7. 整合:main 函数中的编排
// main 函数
int main() {
std::cout << "--- Starting High-Performance Streaming Parallel Computation ---" << std::endl;
// 1. 初始化线程池
size_t num_worker_threads = 4;
ThreadPool thread_pool(num_worker_threads);
std::cout << "ThreadPool initialized with " << num_worker_threads << " threads." << std::endl;
// 2. 创建异步任务源
int total_tasks = 20;
int source_delay_ms = 10; // 模拟源生成任务的延迟
AsyncGenerator<ComputationTask> source_gen = async_task_source(total_tasks, source_delay_ms);
// 3. 创建并行处理器
size_t max_concurrent_tasks = 8; // 最大在途任务数,用于背压
AsyncGenerator<ComputationResult> processor_gen =
parallel_processor_generator(std::move(source_gen), thread_pool, max_concurrent_tasks);
// 4. 创建异步消费者
Task<void> final_consumer_task = async_consumer(std::move(processor_gen));
// 5. 启动调度器并等待最终消费者任务完成
// 在这里,main 作为一个协程驱动者,直接 co_await 最终任务。
// 这意味着 main 函数在等待 final_consumer_task 完成时会暂停。
// 在实际的异步框架中,这里会启动一个事件循环。
std::cout << "n[Main] Starting the final consumer task (and implicitly, the entire pipeline)." << std::endl;
// 为了让 main 函数可以 co_await,它本身也需要是一个协程。
// 但 main 不能是协程。通常的做法是封装在一个立即执行的协程中。
// 或者,对于演示目的,我们可以让 main 阻塞等待一个 future。
// 更好的方式是使用一个驱动器来运行 Task<void>。
// 为了演示,我们创建一个简单的同步等待机制来运行顶层 Task
// 这是一个简化的驱动器,实际应用中会是一个事件循环
std::atomic<bool> task_done = false;
std::mutex m;
std::condition_variable cv;
// 为了让 Task<void> 能够被外部的同步代码等待,我们需要一个桥接。
// 我们可以创建一个新的 TaskPromise,让它的 return_void() 唤醒一个 condition_variable。
struct MainTaskPromise : TaskPromise<void> {
std::condition_variable* cv_ptr;
std::mutex* mutex_ptr;
void return_void() {
TaskPromise<void>::return_void();
if (cv_ptr) {
std::unique_lock<std::mutex> lock(*mutex_ptr);
cv_ptr->notify_one();
}
}
void unhandled_exception() {
TaskPromise<void>::unhandled_exception();
if (cv_ptr) {
std::unique_lock<std::mutex> lock(*mutex_ptr);
cv_ptr->notify_one();
}
}
};
std::coroutine_handle<MainTaskPromise> main_coro_handle;
// 启动一个 lambda 协程来 co_await final_consumer_task,并将其 Promise 与 main 线程的 cv 绑定
auto wrapper_task = [&]() -> Task<void> {
main_coro_handle = std::coroutine_handle<MainTaskPromise>::from_promise(co_await std::suspend_always{});
main_coro_handle.promise().cv_ptr = &cv;
main_coro_handle.promise().mutex_ptr = &m;
co_await final_consumer_task;
co_return;
}();
{
std::unique_lock<std::mutex> lock(m);
// 恢复 wrapper_task 协程,它会立即暂停到其 initial_suspend
wrapper_task.handle_.resume();
// 等待 wrapper_task 协程的 promise (MainTaskPromise) 被完成时唤醒
cv.wait(lock, [&]{ return wrapper_task.handle_.done(); });
}
if (wrapper_task.handle_.promise().exception_ != nullptr) {
std::cout << "[Main] Consumer task finished with exception." << std::endl;
std::rethrow_exception(wrapper_task.handle_.promise().exception_);
} else {
std::cout << "[Main] Consumer task finished successfully." << std::endl;
}
std::cout << "--- All computations completed ---" << std::endl;
return 0;
}
main 函数的驱动方式说明:
由于 main 函数不能直接是一个C++协程,我们通常需要一个外部的“事件循环”或“调度器”来驱动顶层的 Task<void>。在上面的例子中,我创建了一个小的 wrapper_task 协程,它会 co_await final_consumer_task。这个 wrapper_task 的 promise 被修改,使其在完成时通过 std::condition_variable 唤醒 main 线程。这模拟了一个最简单的事件循环,main 线程在等待所有异步操作完成时阻塞。
流程概览表格
| 阶段 | 组件 | 职责 | 关键技术点 |
|---|---|---|---|
| 输入流 | AsyncSourceGenerator |
异步生成 ComputationTask |
co_yield ComputationTask |
| 并行处理 | ThreadPool |
异步执行计算任务 | submit 返回 Task<T>,在后台线程完成 Task |
| 核心逻辑 | ParallelProcessorGenerator |
1. co_await 输入任务 |
AsyncGenerator 的 next() 方法 |
2. 提交任务到 ThreadPool,获取 Task<Result> |
ThreadPool::submit |
||
| 3. 管理在途任务和背压 | std::map<ID, Task<Result>>, max_in_flight |
||
4. 确保结果按序 co_yield |
std::map<ID, Result> 缓冲,next_expected_id |
||
| 输出流 | AsyncConsumer |
异步消费 ComputationResult |
co_await AsyncGenerator 的 next() |
| 调度 | main (简化调度器) |
驱动顶层协程的执行 | std::condition_variable 阻塞等待顶层 Task 完成 |
性能考量与高级话题
- 协程开销: C++协程是无栈的,这意味着它们的内存占用通常比传统线程小。协程帧的分配可能在堆上,但现代编译器和分配器会优化这个过程。对于高性能应用,考虑使用自定义的内存池来管理协程帧的分配。
- 调度器集成: 示例中的调度器非常简陋。在实际生产环境中,您会使用更成熟的异步框架,如
Boost.Asio,libuv或folly::fibers,它们提供了完整的事件循环、I/O多路复用以及更高级的协程调度策略(例如await_any)。 - 背压机制优化: 我们的
ParallelProcessorGenerator通过限制max_in_flight_tasks来实现背压。当达到上限时,它会co_await最早的在途任务。更精细的背压机制可能包括:- 动态调整
max_in_flight_tasks基于系统负载。 - 在
AsyncSourceGenerator层面,如果下游处理器满了,源应该暂停生产。这要求co_yield能够感知下游的就绪状态。
- 动态调整
- 错误处理:
Promise类型中的unhandled_exception()是处理协程内部异常的关键。异常会沿着协程调用链传播,直到被捕获或到达顶层Task的await_resume()。 - 内存管理与生命周期: 协程句柄 (
std::coroutine_handle) 的生命周期管理至关重要。确保在协程不再需要时调用handle_.destroy(),以避免内存泄漏。AsyncGenerator和Task的移动语义和析构函数已经考虑了这一点。 std::future的局限性: 再次强调,std::future不直接支持协程的await_suspend所需的回调注册机制。因此,我们创建了自己的Task<T>类型来弥补这一不足,并实现与ThreadPool的无缝集成。
结语
通过本次讲座,我们深入探讨了C++协程和Async Generator的强大能力。我们从同步生成器开始,逐步过渡到异步模型,并构建了一个基于流的高性能并行计算排队逻辑。这个系统演示了如何有效地结合C++协程、线程池和自定义异步原语,实现非阻塞、带背压且结果有序的数据流处理。
Async Generator 不仅提高了异步代码的可读性,更提供了一种灵活且高效的方式来管理复杂的数据流和并发操作。随着C++协程在标准中的不断演进和更广泛的应用,掌握这些技术将是构建现代高性能C++系统的必备技能。未来的C++异步编程,将因协程而变得更加优雅和强大。