什么是 ‘Async Generator’?利用 C++ 协程实现基于流(Streaming)的高性能并行计算排队逻辑

各位同仁,女士们,先生们,

欢迎来到今天的技术讲座。今天我们将深入探讨一个在现代C++异步编程中日益重要的概念:Async Generator。我们将不仅仅停留在理论层面,更将利用C++协程的强大能力,设计并实现一个基于流(Streaming)的高性能并行计算排队逻辑。这对于处理大规模数据流、构建响应式系统以及优化资源利用率至关重要。

引言:高性能流式处理的挑战与协程的答案

在当今数据密集型应用中,我们经常面临处理海量数据流的挑战。这些数据可能源源不断地从网络、文件系统或其他服务涌入。传统的处理方式,如阻塞I/O、回调函数嵌套("callback hell")或简单的线程池模型,往往暴露出效率低下、难以维护、背压(backpressure)机制缺失等问题。

想象一个场景:您正在构建一个实时数据分析系统,需要从传感器或消息队列持续接收数据包,对每个数据包执行复杂的计算,并将结果按原始顺序输出。

  • 挑战1:吞吐量。 数据包到达速度可能非常快,单一线程无法及时处理。
  • 挑战2:延迟。 即使并行处理,如果处理逻辑是阻塞的,也会导致整个系统响应迟钝。
  • 挑战3:背压。 如果数据生产者速度远超消费者,内存会迅速耗尽。我们需要一种机制让下游压力反馈到上游,从而减缓生产速度。
  • 挑战4:代码可读性与维护性。 复杂的异步逻辑容易导致代码难以理解和调试。

C++20引入的协程(Coroutines)为解决这些问题提供了优雅而强大的工具。协程允许函数在执行过程中暂停(co_await, co_yield),并在稍后从暂停点恢复,而无需保存整个调用栈。这使得编写异步代码变得像编写同步代码一样直观,极大地提高了代码的可读性和可维护性。

Async Generator,顾名思义,是异步版本的生成器。它是一个可以异步地 co_yield 值,同时也可以异步地 co_await 其他异步操作的协程。它将异步操作和按需生成数据流的特性结合起来,是构建高性能、响应式流处理系统的理想选择。

什么是Generator (同步生成器)?

在深入Async Generator之前,我们先快速回顾一下同步生成器(Generator)。生成器是一种特殊的函数,它能够暂停执行并返回一个值(co_yield),然后在下次请求时从上次暂停的地方继续执行。它实现了迭代器模式,但不是一次性生成所有数据,而是按需生成。

核心概念:

  • co_yield: 暂停协程并返回一个值。
  • promise_type: 协程的“骨架”,定义了协程的生命周期行为(何时暂停、何时恢复、如何返回值、如何处理异常等)。
  • std::coroutine_handle: 协程的句柄,用于控制协程的执行(恢复、销毁)。

示例:一个简单的同步Fibonacci生成器

#include <coroutine>
#include <iostream>
#include <optional>
#include <stdexcept>
#include <thread>
#include <deque>
#include <map>
#include <functional>
#include <future>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <vector>

// 1. 同步生成器 Generator<T> 的 Promise 类型
template<typename T>
struct GeneratorPromise {
    T value_; // 存储 co_yield 产生的值
    std::coroutine_handle<> caller_handle_; // 用于恢复调用者(这里是迭代器)

    // 获取返回对象,即 Generator<T>
    auto get_return_object() {
        return Generator<T>{std::coroutine_handle<GeneratorPromise>::from_promise(*this)};
    }

    // 初始暂停:总是暂停,由调用者显式启动
    std::suspend_always initial_suspend() { return {}; }

    // 最终暂停:总是暂停,允许消费者在协程结束后获取最终状态或处理
    std::suspend_always final_suspend() noexcept { return {}; }

    // 处理 co_yield T value
    std::suspend_always yield_value(T value) {
        value_ = std::move(value);
        return {}; // 暂停协程,等待下次迭代器请求
    }

    // 处理 co_return; (通常在生成器中不直接使用,通过协程结束隐式返回void)
    void return_void() {}

    // 异常处理
    void unhandled_exception() {
        // 简单起见,这里直接重新抛出,实际应用中可能更复杂
        throw;
    }
};

// 2. 同步生成器 Generator<T> 类型 (返回对象)
template<typename T>
struct Generator {
    using promise_type = GeneratorPromise<T>;
    std::coroutine_handle<promise_type> handle_;

    // 构造函数
    Generator(std::coroutine_handle<promise_type> h) : handle_(h) {}

    // 移动语义
    Generator(Generator&& other) noexcept : handle_(std::exchange(other.handle_, {})) {}
    Generator& operator=(Generator&& other) noexcept {
        if (this != &other) {
            if (handle_) handle_.destroy();
            handle_ = std::exchange(other.handle_, {});
        }
        return *this;
    }

    // 析构函数:确保协程句柄被销毁
    ~Generator() {
        if (handle_) handle_.destroy();
    }

    // 禁止拷贝
    Generator(const Generator&) = delete;
    Generator& operator=(const Generator&) = delete;

    // 迭代器接口
    struct iterator {
        std::coroutine_handle<promise_type> handle_;

        iterator(std::coroutine_handle<promise_type> h) : handle_(h) {}
        iterator() : handle_(nullptr) {}

        // 前置递增运算符
        iterator& operator++() {
            if (handle_ && !handle_.done()) {
                handle_.resume(); // 恢复协程,生成下一个值
            }
            if (handle_.done()) {
                handle_ = nullptr; // 协程结束,标记为 end 迭代器
            }
            return *this;
        }

        // 解引用运算符
        T operator*() const {
            return handle_.promise().value_;
        }

        // 比较运算符
        bool operator!=(const iterator& other) const {
            return handle_ != other.handle_;
        }
    };

    // begin() 方法
    iterator begin() {
        if (handle_) {
            handle_.resume(); // 启动协程
            if (handle_.done()) {
                return {}; // 如果协程立即结束,返回 end 迭代器
            }
        }
        return iterator{handle_};
    }

    // end() 方法
    iterator end() {
        return {}; // 默认构造的迭代器代表结束
    }
};

// 3. Fibonacci 生成器函数
Generator<int> fibonacci_generator(int count) {
    int a = 0, b = 1;
    for (int i = 0; i < count; ++i) {
        co_yield a; // 暂停并返回当前Fibonacci数
        int next = a + b;
        a = b;
        b = next;
    }
}

/*
int main() {
    std::cout << "Synchronous Fibonacci Generator:" << std::endl;
    for (int value : fibonacci_generator(10)) {
        std::cout << value << " ";
    }
    std::cout << std::endl;
    return 0;
}
*/

同步生成器总结:

  • 通过 co_yield 暂停并返回一个值。
  • 通过 resume() 恢复执行。
  • 本质上是同步的,每次 operator++ 都会立即执行协程直到下一个 co_yield 或结束。
  • 适用于按需生成数据,但数据源本身是同步的或已经准备好的场景。

从同步到异步:Async Generator的诞生

同步生成器在数据源就绪的情况下表现出色,但如果数据生成过程本身是异步的呢?例如,从网络接收数据、等待数据库查询结果、或者等待另一个并行任务完成。这时,简单地 co_yield 一个值然后立即恢复协程就会导致阻塞,或者无法充分利用异步I/O的优势。

Async Generator 应运而生。它将 Generator 的按需生成特性与异步编程的非阻塞特性结合起来。一个Async Generator可以:

  • co_await 一个异步操作,等待其完成。
  • co_yield 一个值,这个值的生成可能涉及异步操作。当值准备好后,它会通知消费者可以 co_await 并获取下一个值了。

核心挑战:

  • 如何让 co_yield 暂停协程,并让消费者在值准备好时异步地 co_await 获得它?
  • 如何管理协程的生命周期和状态,使其能够与事件循环或线程池无缝集成?

构建 Async Generator 的核心组件

我们将定义一个 AsyncGenerator<T> 类型,它代表一个异步的值流。这个 AsyncGenerator<T> 将是一个Awaitable,意味着消费者可以 co_await 它来获取下一个值。

1. Task<T>:基础异步操作的表示

为了更好地集成协程和异步操作(例如线程池任务),我们需要一个统一的 Awaitable 类型。std::future 在与协程集成时存在局限性(无法直接注册回调),因此我们通常会定义自己的 Task<T> 类型。

Task<T> 是一个轻量级的异步操作句柄,它封装了对一个异步结果的承诺。当结果可用时,它能够恢复等待它的协程。

// 辅助类:包装协程句柄,用于在完成时恢复
struct CoroutineResumer {
    std::coroutine_handle<> handle_to_resume;
    void operator()() {
        if (handle_to_resume) {
            handle_to_resume.resume();
        }
    }
};

// Task 的 Promise 类型
template<typename T>
struct TaskPromise {
    std::optional<T> result_;
    std::exception_ptr exception_;
    std::coroutine_handle<> awaiting_coroutine_; // 等待此任务完成的协程

    auto get_return_object() {
        return Task<T>{std::coroutine_handle<TaskPromise>::from_promise(*this)};
    }

    std::suspend_always initial_suspend() { return {}; }
    std::suspend_always final_suspend() noexcept { return {}; }

    void return_value(T value) {
        result_ = std::move(value);
        if (awaiting_coroutine_) {
            awaiting_coroutine_.resume(); // 任务完成,恢复等待者
        }
    }

    void unhandled_exception() {
        exception_ = std::current_exception();
        if (awaiting_coroutine_) {
            awaiting_coroutine_.resume(); // 异常发生,恢复等待者
        }
    }
};

// Task<T> 类型:协程返回的对象,也是一个 awaitable
template<typename T>
struct Task {
    using promise_type = TaskPromise<T>;
    std::coroutine_handle<promise_type> handle_;

    Task(std::coroutine_handle<promise_type> h) : handle_(h) {}
    Task(Task&& other) noexcept : handle_(std::exchange(other.handle_, {})) {}
    Task& operator=(Task&& other) noexcept {
        if (this != &other) {
            if (handle_) handle_.destroy();
            handle_ = std::exchange(other.handle_, {});
        }
        return *this;
    }
    ~Task() { if (handle_) handle_.destroy(); }

    bool await_ready() const {
        return !handle_ || handle_.promise().result_.has_value() || handle_.promise().exception_ != nullptr;
    }

    // 当任务尚未完成时,暂停当前协程,并将当前协程的句柄存储起来
    void await_suspend(std::coroutine_handle<> awaiting_coroutine) {
        handle_.promise().awaiting_coroutine_ = awaiting_coroutine;
        // 如果任务协程还没有开始执行,现在开始
        if (handle_ && !handle_.done() && !handle_.promise().result_.has_value() && handle_.promise().exception_ == nullptr) {
            handle_.resume();
        }
    }

    T await_resume() {
        if (handle_.promise().exception_ != nullptr) {
            std::rethrow_exception(handle_.promise().exception_);
        }
        return std::move(handle_.promise().result_.value());
    }
};

// 特化 Task<void>
template<>
struct Task<void> {
    using promise_type = TaskPromise<void>;
    std::coroutine_handle<promise_type> handle_;

    Task(std::coroutine_handle<promise_type> h) : handle_(h) {}
    Task(Task&& other) noexcept : handle_(std::exchange(other.handle_, {})) {}
    Task& operator=(Task&& other) noexcept {
        if (this != &other) {
            if (handle_) handle_.destroy();
            handle_ = std::exchange(other.handle_, {});
        }
        return *this;
    }
    ~Task() { if (handle_) handle_.destroy(); }

    bool await_ready() const {
        return !handle_ || handle_.promise().exception_ != nullptr; // For void task, only check exception or if already done
    }

    void await_suspend(std::coroutine_handle<> awaiting_coroutine) {
        handle_.promise().awaiting_coroutine_ = awaiting_coroutine;
        if (handle_ && !handle_.done() && handle_.promise().exception_ == nullptr) {
            handle_.resume();
        }
    }

    void await_resume() {
        if (handle_.promise().exception_ != nullptr) {
            std::rethrow_exception(handle_.promise().exception_);
        }
    }
};

// TaskPromise<void> 需要一个 return_void()
template<>
struct TaskPromise<void> {
    std::exception_ptr exception_;
    std::coroutine_handle<> awaiting_coroutine_;

    auto get_return_object() {
        return Task<void>{std::coroutine_handle<TaskPromise>::from_promise(*this)};
    }

    std::suspend_always initial_suspend() { return {}; }
    std::suspend_always final_suspend() noexcept { return {}; }

    void return_void() {
        if (awaiting_coroutine_) {
            awaiting_coroutine_.resume();
        }
    }

    void unhandled_exception() {
        exception_ = std::current_exception();
        if (awaiting_coroutine_) {
            awaiting_coroutine_.resume();
        }
    }
};

Task<T>作为基础,我们将用它来包装线程池的计算结果,以及作为AsyncGeneratornext()方法的返回类型。

2. AsyncGenerator<T>:异步生成器本身

AsyncGenerator<T>promise_type 将会复杂一些,因为它不仅要处理 co_yield,还要与 Task<T> 机制配合,实现异步暂停和恢复。

// AsyncGenerator 的 Promise 类型
template<typename T>
struct AsyncGeneratorPromise {
    std::optional<T> value_; // co_yield 产生的值
    std::exception_ptr exception_;
    std::coroutine_handle<> awaiting_consumer_; // 等待下一个值的消费者协程
    std::coroutine_handle<> producer_handle_; // 协程句柄自身

    AsyncGeneratorPromise() : producer_handle_(nullptr) {}

    auto get_return_object() {
        producer_handle_ = std::coroutine_handle<AsyncGeneratorPromise>::from_promise(*this);
        return AsyncGenerator<T>{producer_handle_};
    }

    // 初始暂停:总是暂停,等待消费者调用 next() 启动
    std::suspend_always initial_suspend() { return {}; }

    // 最终暂停:总是暂停,允许消费者在协程结束后检查状态
    std::suspend_always final_suspend() noexcept { return {}; }

    // 处理 co_yield T value
    // 当值准备好时,存储值,并暂停,等待消费者来取
    std::suspend_always yield_value(T value) {
        value_ = std::move(value);
        if (awaiting_consumer_) {
            awaiting_consumer_.resume(); // 唤醒消费者
        }
        return {}; // 暂停生成器协程
    }

    // 处理 co_return; 标记流结束
    void return_void() {
        value_.reset(); // 清空值,表示结束
        if (awaiting_consumer_) {
            awaiting_consumer_.resume(); // 唤醒消费者,通知流已结束
        }
    }

    void unhandled_exception() {
        exception_ = std::current_exception();
        if (awaiting_consumer_) {
            awaiting_consumer_.resume();
        }
    }
};

// AsyncGenerator<T> 类型 (返回对象)
template<typename T>
struct AsyncGenerator {
    using promise_type = AsyncGeneratorPromise<T>;
    std::coroutine_handle<promise_type> handle_;

    AsyncGenerator(std::coroutine_handle<promise_type> h) : handle_(h) {}
    AsyncGenerator(AsyncGenerator&& other) noexcept : handle_(std::exchange(other.handle_, {})) {}
    AsyncGenerator& operator=(AsyncGenerator&& other) noexcept {
        if (this != &other) {
            if (handle_) handle_.destroy();
            handle_ = std::exchange(other.handle_, {});
        }
        return *this;
    }
    ~AsyncGenerator() { if (handle_) handle_.destroy(); }

    AsyncGenerator(const AsyncGenerator&) = delete;
    AsyncGenerator& operator=(const AsyncGenerator&) = delete;

    // Awaitable 结构,用于 co_await generator.next()
    struct NextAwaitable {
        AsyncGenerator<T>& generator_;
        std::optional<T> value_; // 存储获取到的值
        bool is_done_;

        NextAwaitable(AsyncGenerator<T>& gen) : generator_(gen), is_done_(false) {}

        bool await_ready() const {
            if (!generator_.handle_ || generator_.handle_.done()) {
                return true; // 协程已结束
            }
            return generator_.handle_.promise().value_.has_value() || generator_.handle_.promise().exception_ != nullptr;
        }

        void await_suspend(std::coroutine_handle<> awaiting_coroutine) {
            generator_.handle_.promise().awaiting_consumer_ = awaiting_coroutine;
            // 恢复生成器以产生下一个值
            generator_.handle_.resume();
        }

        bool await_resume() {
            if (generator_.handle_.promise().exception_ != nullptr) {
                std::rethrow_exception(generator_.handle_.promise().exception_);
            }
            if (generator_.handle_.promise().value_.has_value()) {
                value_ = std::move(generator_.handle_.promise().value_.value());
                generator_.handle_.promise().value_.reset(); // 值已被消费,清空
                return true; // 成功获取到值
            }
            is_done_ = true; // 流已结束
            return false;
        }

        // 获取值的方法
        T get_value() {
            if (value_.has_value()) {
                return std::move(value_.value());
            }
            throw std::runtime_error("No value available or generator finished.");
        }

        // 检查流是否结束
        bool done() const { return is_done_; }
    };

    NextAwaitable next() {
        return NextAwaitable(*this);
    }
};

AsyncGenerator 的设计中,next() 方法返回一个 NextAwaitable 对象。消费者会 co_await 这个 NextAwaitable

  • await_suspend 被调用时,它会存储消费者协程的句柄,然后 resume() 生成器协程。
  • 生成器协程执行,直到 co_yield 一个值或 co_return (表示结束)。
  • 当生成器 co_yield 值时,它会调用 awaiting_consumer_.resume() 唤醒消费者。
  • 消费者被唤醒后,其 await_resume() 方法被调用,它会从 promise 中取出值。

基于流的高性能并行计算排队逻辑

现在,我们将利用 AsyncGeneratorTask 来实现一个高性能的并行计算排队逻辑。

场景描述:
我们有一个异步的输入流,源源不断地提供 ComputationTask。我们需要将这些任务提交给一个线程池进行并行处理,然后将处理结果收集起来,并以原始任务的顺序通过另一个异步流 co_yield 出去。整个过程必须是非阻塞的,并具备背压机制。

核心组件:

  1. ComputationTaskComputationResult: 定义计算任务和结果的数据结构。
  2. ThreadPool: 用于并行执行计算任务的线程池。它将返回我们的 Task<T> 类型。
  3. AsyncSourceGenerator: 模拟异步的任务输入流。
  4. ParallelProcessorGenerator: 核心逻辑,它将 co_await 输入任务,提交给线程池,管理在途任务,实现背压和结果排序,然后 co_yield 出结果。
  5. AsyncConsumer: 消费最终的异步结果流。
  6. Scheduler: 一个简单的调度器,用于驱动所有协程的执行。

1. ComputationTaskComputationResult

// 计算任务结构
struct ComputationTask {
    size_t id;
    int data; // 模拟任务数据
    // 构造函数
    ComputationTask(size_t i, int d) : id(i), data(d) {}
};

// 计算结果结构
struct ComputationResult {
    size_t id;
    long long result_data; // 模拟结果数据
    // 构造函数
    ComputationResult(size_t i, long long r) : id(i), result_data(r) {}
};

2. ThreadPool:返回 Task<T> 的线程池

我们的线程池将接收一个函数对象,并在一个工作线程上执行它。执行完成后,它会通过我们自定义的 Task<T> 机制来恢复等待结果的协程。

class ThreadPool {
public:
    ThreadPool(size_t num_threads) : stop_(false) {
        for (size_t i = 0; i < num_threads; ++i) {
            workers_.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(queue_mutex_);
                        condition_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
                        if (stop_ && tasks_.empty()) {
                            return;
                        }
                        task = std::move(tasks_.front());
                        tasks_.pop_front();
                    }
                    task(); // 执行任务
                }
            });
        }
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            stop_ = true;
        }
        condition_.notify_all();
        for (std::thread& worker : workers_) {
            worker.join();
        }
    }

    // 提交一个任务,返回一个 Task<T>
    template<typename F, typename... Args>
    auto submit(F&& f, Args&&... args) -> Task<std::invoke_result_t<F, Args...>> {
        using result_type = std::invoke_result_t<F, Args...>;

        // 创建 Task 对应的 Promise
        auto task_promise = new TaskPromise<result_type>();
        Task<result_type> task_obj{std::coroutine_handle<TaskPromise<result_type>>::from_promise(*task_promise)};

        // 包装实际的函数,在执行后设置 Promise 的值
        std::function<void()> wrapped_task = [
            func = std::bind(std::forward<F>(f), std::forward<Args>(args)...),
            promise_ptr = task_promise
        ]() mutable {
            try {
                if constexpr (std::is_void_v<result_type>) {
                    func();
                    promise_ptr->return_void();
                } else {
                    promise_ptr->return_value(func());
                }
            } catch (...) {
                promise_ptr->unhandled_exception();
            }
        };

        {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            tasks_.emplace_back(std::move(wrapped_task));
        }
        condition_.notify_one();
        return task_obj;
    }

private:
    std::vector<std::thread> workers_;
    std::deque<std::function<void()>> tasks_;
    std::mutex queue_mutex_;
    std::condition_variable condition_;
    bool stop_;
};

这里的 ThreadPool::submit 返回 Task<T>。当 submit 返回时,计算任务可能尚未开始执行。调用者可以 co_await 这个 Task<T>,当线程池完成计算并设置了 Promise 的结果时,等待的协程就会被恢复。

3. AsyncSourceGenerator:模拟输入流

这个生成器模拟一个外部异步数据源,例如从网络读取或从文件中分块读取。它会 co_yield ComputationTask 对象。

// 模拟一个异步任务源
AsyncGenerator<ComputationTask> async_task_source(int num_tasks, int delay_ms) {
    for (size_t i = 0; i < num_tasks; ++i) {
        // 模拟异步I/O或等待
        std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); // 实际应 co_await 异步操作
        std::cout << "[Source] Yielding task " << i << std::endl;
        co_yield ComputationTask(i, static_cast<int>(i * 10));
    }
    std::cout << "[Source] All tasks yielded." << std::endl;
    co_return;
}

注意: 这里的 std::this_thread::sleep_for 在实际的异步生成器中应该替换为真正的异步等待(例如 co_await 一个定时器或网络读取操作)。为了简化示例,我们使用同步睡眠来模拟延迟。在一个真正的事件循环中,std::this_thread::sleep_for 会阻塞整个调度器。

4. ParallelProcessorGenerator:核心并行处理与排队逻辑

这是我们系统的核心。它将从 AsyncSourceGeneratorco_await 任务,将它们提交给 ThreadPool,然后管理在途任务的生命周期和结果的排序,最终将结果 co_yield 出去。

它需要实现:

  • 并行性:同时处理多个任务。
  • 背压:限制同时在线程池中执行的任务数量,防止过载。
  • 结果排序:确保 co_yield 出的结果与输入任务的 id 顺序一致。
// 核心并行处理器,带有背压和结果排序
AsyncGenerator<ComputationResult> parallel_processor_generator(
    AsyncGenerator<ComputationTask> input_tasks,
    ThreadPool& pool,
    size_t max_in_flight_tasks)
{
    // 存储正在执行的任务及其对应的 Task 对象
    std::map<size_t, Task<ComputationResult>> in_flight_tasks;
    // 存储已经完成但尚未按序 co_yield 的结果
    std::map<size_t, ComputationResult> completed_results_buffer;
    size_t next_expected_id = 0; // 下一个期望输出的任务ID

    bool source_exhausted = false;

    while (!source_exhausted || !in_flight_tasks.empty() || !completed_results_buffer.empty()) {
        // 1. 尝试从输入流获取新任务并提交到线程池
        // 只有当“在途任务”数量未达到上限且输入流未耗尽时才拉取新任务
        if (!source_exhausted && in_flight_tasks.size() < max_in_flight_tasks) {
            auto next_task_awaitable = input_tasks.next();
            if (co_await next_task_awaitable) { // 尝试获取下一个任务
                ComputationTask task = next_task_awaitable.get_value();
                std::cout << "[Processor] Submitting task " << task.id << " to thread pool." << std::endl;
                // 模拟耗时计算
                auto compute_func = [task]() -> ComputationResult {
                    std::this_thread::sleep_for(std::chrono::milliseconds(100 + (task.id % 5) * 20)); // 模拟不同耗时
                    long long res = static_cast<long long>(task.data) * 2;
                    std::cout << "[Worker] Task " << task.id << " computed, result: " << res << std::endl;
                    return ComputationResult(task.id, res);
                };
                in_flight_tasks.emplace(task.id, pool.submit(compute_func));
            } else {
                source_exhausted = true; // 输入流已耗尽
                std::cout << "[Processor] Input source exhausted." << std::endl;
            }
        }

        // 2. 检查是否有已完成且按序的结果可以 co_yield
        while (completed_results_buffer.count(next_expected_id)) {
            ComputationResult result = std::move(completed_results_buffer.at(next_expected_id));
            completed_results_buffer.erase(next_expected_id);
            std::cout << "[Processor] Yielding result for task " << result.id << std::endl;
            co_yield result;
            next_expected_id++;
        }

        // 3. 如果有在途任务,并且当前处理器协程没有任何其他事情可做(例如,输入流已耗尽,或在途任务已满,或没有按序结果),
        //    则等待最早的在途任务完成。这提供了隐式背压。
        if (!in_flight_tasks.empty()) {
            // 找到ID最小的在途任务(最早提交的)
            auto it = in_flight_tasks.begin();
            // 在这里,我们需要一个机制来 co_await *any* of the in-flight tasks.
            // std::map<size_t, Task<ComputationResult>> 不直接支持 await_any。
            // 为了简化,我们暂时只 await 最早的那个,这会阻塞直到它完成。
            // 更高级的调度器会实现一个 select/poll 机制来 co_await 多个 Task。
            // 对于本示例,我们假定等待最早的 in_flight_tasks.begin()->second 完成。
            // 实际中,会有一个全局的调度器来管理所有 active 的 Task,并在任何一个 Task 完成时唤醒处理器。

            // 临时解决方案:如果 in_flight_tasks 满了,或者输入源耗尽,就等待一个在途任务完成。
            // 否则,如果还有空间和输入,就不阻塞,直接进入下一轮循环尝试拉取任务。
            if (in_flight_tasks.size() >= max_in_flight_tasks || source_exhausted) {
                size_t current_waiting_id = it->first;
                std::cout << "[Processor] Waiting for task " << current_waiting_id << " to complete..." << std::endl;
                ComputationResult result = co_await it->second; // 等待最早的 Task 完成
                std::cout << "[Processor] Task " << result.id << " completed. Buffering." << std::endl;
                completed_results_buffer.emplace(result.id, std::move(result));
                in_flight_tasks.erase(it);
            } else {
                // 如果还有空间,并且输入源未耗尽,我们不阻塞,继续循环尝试拉取任务
                // 但为了避免忙等,这里需要一个机制让协程在没有立即可做的事情时暂停。
                // 暂时,我们可以通过检查是否有任何任务已完成,然后立即处理。
                // 否则,如果没有任何任务完成,并且没有新的输入,我们可能需要一个 co_await_any_task() 机制。
                // 对于这个简化示例,如果没有任何任务立即完成,并且没有新的输入,
                // 协程会空转一轮,然后在下一个循环中再次检查。
                // 实际的调度器会在这里插入一个 `co_await scheduler.next_ready_event()` 这样的调用。
            }
        }

        // 关键逻辑:如果没有任何事情可以做(没有新的输入,没有完成的任务,也没有可输出的按序结果),
        // 那么此协程必须暂停,否则会忙等。
        // 一个更完善的调度器会通过一个全局的 `co_await scheduler.wait_for_any_event()` 来实现。
        // 这里,我们通过一个简单的条件来模拟:
        if (source_exhausted && in_flight_tasks.empty() && completed_results_buffer.empty()) {
            break; // 所有任务处理完毕,退出循环
        }
    }
    std::cout << "[Processor] All results processed and yielded." << std::endl;
    co_return;
}

ParallelProcessorGenerator 的复杂性:
上面代码中的注释已经指出,co_await *any* of the in-flight tasks 是一个挑战。C++标准库的协程原语本身不提供 co_await_any 这样的功能。一个完整的异步框架(如 asio)会提供这样的机制。
在我们的简化实现中,if (in_flight_tasks.size() >= max_in_flight_tasks || source_exhausted) 这一段,如果条件满足,我们会 co_await it->second,即等待最早提交的那个任务。这确实能实现背压,但当有很多在途任务时,如果最早的任务迟迟不完成,即使后面的任务已经完成了,也无法被处理和 co_yield。这会导致一定程度的效率损失,因为它不是真正的 await_any

为了实现更高效的 await_any 行为:
我们需要一个自定义的 Awaitable,它能够监控一个 Task<T> 集合,并在其中任何一个 Task 完成时恢复等待的协程。这通常通过在 TaskPromise 中注册一个回调到调度器来实现,当 Task 完成时,调度器会遍历所有等待它的协程,并唤醒相应的处理器。然而,实现一个完整的 await_any 机制超出了本次讲座的范围,它通常需要一个完整的事件循环/调度器框架。

为了保持示例的简洁性,我们当前的设计将通过定期检查 in_flight_taskscompleted_results_buffer 并有条件地 co_await 最早的在途任务来实现。这意味着协程可能会在没有立即可做的事情时短暂地恢复,这在单线程事件循环中不是最优的(可能导致忙等),但在多线程环境中(如我们这里有 ThreadPool 在后台工作)可以接受,或者在更完善的调度器中,它会被替换为一个等待事件的机制。

5. AsyncConsumer:消费结果流

// 异步消费者
Task<void> async_consumer(AsyncGenerator<ComputationResult> results) {
    std::cout << "n[Consumer] Starting to consume results." << std::endl;
    auto awaitable_next = results.next();
    while (co_await awaitable_next) { // co_await 直到有下一个结果或流结束
        ComputationResult res = awaitable_next.get_value();
        std::cout << "[Consumer] Received result for task " << res.id << ": " << res.result_data << std::endl;
        // 模拟消费处理时间
        // std::this_thread::sleep_for(std::chrono::milliseconds(50));
        awaitable_next = results.next(); // 准备获取下一个
    }
    std::cout << "[Consumer] All results consumed." << std::endl;
    co_return;
}

6. Scheduler:驱动协程执行

由于C++标准库没有内置的协程调度器,我们需要一个简单的 Scheduler 来启动和驱动我们的 Task<void> 协程。

// 简单调度器
class Scheduler {
public:
    // 启动一个 Task<void>
    void start(Task<void> task) {
        // Task<void> 的 await_suspend 默认会恢复其句柄,
        // 但如果它已经完成,则不会。
        // 我们需要确保 task 协程的 initial_suspend 之后被 resume 一次。
        // 通常,协程在 get_return_object 后会执行 initial_suspend,然后暂停。
        // 我们需要手动恢复它来启动它。
        if (task.handle_ && !task.handle_.done()) {
            task.handle_.resume();
        }
        // 对于 Task<void>,它本身就是驱动者,当它完成时,它会恢复等待它的协程。
        // main 函数会 co_await 这个 task。
        // 这里只是为了确保它能启动。
    }
};

在我们的例子中,main 函数本身将作为最顶层的调度器,直接 co_await async_consumer 返回的 Task<void>。这意味着 main 函数所在的线程将是协程的执行上下文。

7. 整合:main 函数中的编排

// main 函数
int main() {
    std::cout << "--- Starting High-Performance Streaming Parallel Computation ---" << std::endl;

    // 1. 初始化线程池
    size_t num_worker_threads = 4;
    ThreadPool thread_pool(num_worker_threads);
    std::cout << "ThreadPool initialized with " << num_worker_threads << " threads." << std::endl;

    // 2. 创建异步任务源
    int total_tasks = 20;
    int source_delay_ms = 10; // 模拟源生成任务的延迟
    AsyncGenerator<ComputationTask> source_gen = async_task_source(total_tasks, source_delay_ms);

    // 3. 创建并行处理器
    size_t max_concurrent_tasks = 8; // 最大在途任务数,用于背压
    AsyncGenerator<ComputationResult> processor_gen = 
        parallel_processor_generator(std::move(source_gen), thread_pool, max_concurrent_tasks);

    // 4. 创建异步消费者
    Task<void> final_consumer_task = async_consumer(std::move(processor_gen));

    // 5. 启动调度器并等待最终消费者任务完成
    // 在这里,main 作为一个协程驱动者,直接 co_await 最终任务。
    // 这意味着 main 函数在等待 final_consumer_task 完成时会暂停。
    // 在实际的异步框架中,这里会启动一个事件循环。
    std::cout << "n[Main] Starting the final consumer task (and implicitly, the entire pipeline)." << std::endl;

    // 为了让 main 函数可以 co_await,它本身也需要是一个协程。
    // 但 main 不能是协程。通常的做法是封装在一个立即执行的协程中。
    // 或者,对于演示目的,我们可以让 main 阻塞等待一个 future。
    // 更好的方式是使用一个驱动器来运行 Task<void>。

    // 为了演示,我们创建一个简单的同步等待机制来运行顶层 Task
    // 这是一个简化的驱动器,实际应用中会是一个事件循环
    std::atomic<bool> task_done = false;
    std::mutex m;
    std::condition_variable cv;

    // 为了让 Task<void> 能够被外部的同步代码等待,我们需要一个桥接。
    // 我们可以创建一个新的 TaskPromise,让它的 return_void() 唤醒一个 condition_variable。
    struct MainTaskPromise : TaskPromise<void> {
        std::condition_variable* cv_ptr;
        std::mutex* mutex_ptr;

        void return_void() {
            TaskPromise<void>::return_void();
            if (cv_ptr) {
                std::unique_lock<std::mutex> lock(*mutex_ptr);
                cv_ptr->notify_one();
            }
        }
        void unhandled_exception() {
            TaskPromise<void>::unhandled_exception();
            if (cv_ptr) {
                std::unique_lock<std::mutex> lock(*mutex_ptr);
                cv_ptr->notify_one();
            }
        }
    };

    std::coroutine_handle<MainTaskPromise> main_coro_handle;

    // 启动一个 lambda 协程来 co_await final_consumer_task,并将其 Promise 与 main 线程的 cv 绑定
    auto wrapper_task = [&]() -> Task<void> {
        main_coro_handle = std::coroutine_handle<MainTaskPromise>::from_promise(co_await std::suspend_always{});
        main_coro_handle.promise().cv_ptr = &cv;
        main_coro_handle.promise().mutex_ptr = &m;
        co_await final_consumer_task;
        co_return;
    }();

    {
        std::unique_lock<std::mutex> lock(m);
        // 恢复 wrapper_task 协程,它会立即暂停到其 initial_suspend
        wrapper_task.handle_.resume(); 
        // 等待 wrapper_task 协程的 promise (MainTaskPromise) 被完成时唤醒
        cv.wait(lock, [&]{ return wrapper_task.handle_.done(); });
    }

    if (wrapper_task.handle_.promise().exception_ != nullptr) {
        std::cout << "[Main] Consumer task finished with exception." << std::endl;
        std::rethrow_exception(wrapper_task.handle_.promise().exception_);
    } else {
        std::cout << "[Main] Consumer task finished successfully." << std::endl;
    }

    std::cout << "--- All computations completed ---" << std::endl;

    return 0;
}

main 函数的驱动方式说明:
由于 main 函数不能直接是一个C++协程,我们通常需要一个外部的“事件循环”或“调度器”来驱动顶层的 Task<void>。在上面的例子中,我创建了一个小的 wrapper_task 协程,它会 co_await final_consumer_task。这个 wrapper_taskpromise 被修改,使其在完成时通过 std::condition_variable 唤醒 main 线程。这模拟了一个最简单的事件循环,main 线程在等待所有异步操作完成时阻塞。

流程概览表格

阶段 组件 职责 关键技术点
输入流 AsyncSourceGenerator 异步生成 ComputationTask co_yield ComputationTask
并行处理 ThreadPool 异步执行计算任务 submit 返回 Task<T>,在后台线程完成 Task
核心逻辑 ParallelProcessorGenerator 1. co_await 输入任务 AsyncGeneratornext() 方法
2. 提交任务到 ThreadPool,获取 Task<Result> ThreadPool::submit
3. 管理在途任务和背压 std::map<ID, Task<Result>>, max_in_flight
4. 确保结果按序 co_yield std::map<ID, Result> 缓冲,next_expected_id
输出流 AsyncConsumer 异步消费 ComputationResult co_await AsyncGeneratornext()
调度 main (简化调度器) 驱动顶层协程的执行 std::condition_variable 阻塞等待顶层 Task 完成

性能考量与高级话题

  1. 协程开销: C++协程是无栈的,这意味着它们的内存占用通常比传统线程小。协程帧的分配可能在堆上,但现代编译器和分配器会优化这个过程。对于高性能应用,考虑使用自定义的内存池来管理协程帧的分配。
  2. 调度器集成: 示例中的调度器非常简陋。在实际生产环境中,您会使用更成熟的异步框架,如 Boost.Asio, libuvfolly::fibers,它们提供了完整的事件循环、I/O多路复用以及更高级的协程调度策略(例如 await_any)。
  3. 背压机制优化: 我们的 ParallelProcessorGenerator 通过限制 max_in_flight_tasks 来实现背压。当达到上限时,它会 co_await 最早的在途任务。更精细的背压机制可能包括:
    • 动态调整 max_in_flight_tasks 基于系统负载。
    • AsyncSourceGenerator 层面,如果下游处理器满了,源应该暂停生产。这要求 co_yield 能够感知下游的就绪状态。
  4. 错误处理: Promise 类型中的 unhandled_exception() 是处理协程内部异常的关键。异常会沿着协程调用链传播,直到被捕获或到达顶层 Taskawait_resume()
  5. 内存管理与生命周期: 协程句柄 (std::coroutine_handle) 的生命周期管理至关重要。确保在协程不再需要时调用 handle_.destroy(),以避免内存泄漏。AsyncGeneratorTask 的移动语义和析构函数已经考虑了这一点。
  6. std::future 的局限性: 再次强调,std::future 不直接支持协程的 await_suspend 所需的回调注册机制。因此,我们创建了自己的 Task<T> 类型来弥补这一不足,并实现与 ThreadPool 的无缝集成。

结语

通过本次讲座,我们深入探讨了C++协程和Async Generator的强大能力。我们从同步生成器开始,逐步过渡到异步模型,并构建了一个基于流的高性能并行计算排队逻辑。这个系统演示了如何有效地结合C++协程、线程池和自定义异步原语,实现非阻塞、带背压且结果有序的数据流处理。

Async Generator 不仅提高了异步代码的可读性,更提供了一种灵活且高效的方式来管理复杂的数据流和并发操作。随着C++协程在标准中的不断演进和更广泛的应用,掌握这些技术将是构建现代高性能C++系统的必备技能。未来的C++异步编程,将因协程而变得更加优雅和强大。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注