什么是 ‘Structured Concurrency’ (结构化并发)?在 C++ 中如何保证父协程退出前子协程全部关闭?

各位同仁,各位对现代C++并发编程充满热情的开发者们,下午好!

今天,我们将深入探讨一个在现代并发编程领域日益受到关注,并被视为最佳实践的核心概念——“结构化并发”(Structured Concurrency)。尤其在C++20协程(Coroutines)的引入之后,如何有效地管理异步任务的生命周期、错误处理和取消,成为了我们必须面对的挑战。本次讲座,我将围绕C++中的结构化并发,为您详细阐述其理念、实现机制,以及如何确保父协程在退出前,所有子协程都能被妥善关闭。

为什么我们需要结构化并发?

在深入结构化并发的细节之前,让我们先回顾一下传统并发模型所带来的挑战。长期以来,并发编程一直是一个充满陷阱的领域。

我们来看一个简单的场景:你有一个主任务,需要启动几个子任务并行执行,然后等待它们全部完成,或者在某个子任务失败时,能够取消其他所有子任务并向上报告错误。在传统的线程模型中,这通常意味着:

  1. 手动管理线程生命周期:使用 std::thread,你必须显式地 join()detach() 线程。忘记 join() 会导致资源泄露(std::terminate),而 detach() 又会使得子任务的生命周期与父任务完全脱钩,难以追踪和管理。
  2. 复杂的错误传播:线程之间默认不共享异常。一个子线程抛出的异常,如果不在其内部捕获处理,通常只会导致该线程终止,而不会自动传播到父线程。这使得错误检测和恢复变得异常困难。
  3. 困难的取消机制:取消一个正在运行的线程通常需要复杂的合作机制(如轮询 std::atomic<bool> 标志),而且很难保证所有资源都能被安全释放。
  4. 资源泄露风险:如果子任务在执行过程中打开了文件、网络连接或分配了内存,但在父任务完成前未能妥善关闭,就可能导致资源泄露。
  5. 难以推理和调试:当并发操作的生命周期交织在一起,并且没有明确的父子关系时,理解程序的执行流程,特别是追踪 bug,会变得异常困难。

这些问题,使得并发编程成为了一种高风险、高复杂度的活动。我们需要一种更高级别的抽象,一种能够将并发操作的复杂性封装起来,使其像我们熟悉的顺序编程一样易于推理和管理。这就是结构化并发诞生的原因。

什么是结构化并发?

结构化并发是一种编程范式,旨在将并发操作的生命周期与代码的结构(通常是词法作用域)绑定起来。它的核心思想是:任何被启动的并发操作,都必须是某个明确的“父”操作的“子”操作,并且子操作的生命周期不能超出其父操作的生命周期。

我们可以将结构化并发类比为顺序编程中的函数调用栈。当函数A调用函数B时,函数B是函数A的“子”函数。函数B必须在函数A返回之前完成执行(或抛出异常)。如果函数B抛出异常,它会传播到函数A。这种清晰的层次结构,使得顺序编程非常容易理解和调试。结构化并发的目标,就是将这种清晰性带入并发编程领域。

结构化并发的关键特性:

  1. 父子关系与作用域绑定:并发任务在其创建的词法作用域内执行。当该作用域退出时,所有由该作用域启动的子任务都必须完成、被取消或终止。
  2. 生命周期管理:子任务的生命周期受父作用域的严格约束。父作用域负责等待所有子任务完成,从而防止“悬空”任务和资源泄露。
  3. 错误传播:子任务中发生的任何错误都会被捕获并传播到父作用域。父作用域可以决定如何处理这些错误(例如,重新抛出、记录日志或尝试恢复)。
  4. 取消传播:如果父作用域被取消,其取消信号应能自动传播到所有子任务,促使它们优雅地终止。
  5. 确定性完成:父作用域在退出前,能够确定性地知道所有子任务的状态(完成、失败或被取消)。

结构化并发带来的好处:

  • 提高可靠性:通过强制生命周期管理和错误传播,减少了资源泄露、未处理异常和僵尸任务的风险。
  • 简化错误处理:错误不再是“防火墙”问题,而是沿着父子链传播,使得集中式错误处理成为可能。
  • 易于取消:统一的取消机制使得在复杂任务中实现优雅终止变得简单。
  • 更好的可读性和可维护性:代码的并发结构与逻辑结构相匹配,更容易理解和推理。
  • 简化调试:由于任务生命周期和错误传播路径清晰,调试变得更加直接。

C++ Coroutines 与结构化并发

C++20引入的协程(Coroutines)为异步编程带来了革命性的改变,它提供了构建异步操作的基本构件:co_await, co_yield, co_return。协程使得编写看起来像同步代码的异步逻辑成为可能,极大地改善了异步代码的可读性。

然而,需要明确的是,C++协程本身并不直接提供结构化并发。它们是低层级的原语,类似于线程是并发的基本原语。你可以使用 std::thread 编写非结构化的并发代码,你同样可以使用 C++协程编写非结构化的异步代码(例如,启动一个协程而不 co_await 它,使其“飞出去”)。

结构化并发需要建立在协程之上,通过特定的库和设计模式来实现。它要求我们设计出能够管理协程生命周期的“容器”或“作用域”对象。

在 C++ 中实现结构化并发的关键机制

要在 C++ 中实现结构化并发,我们需要利用 RAII (Resource Acquisition Is Initialization) 原则,并结合 C++20 协程的特性,设计出能够充当“并发作用域”的抽象。这个抽象通常被称为 task_groupconcurrency_scope 或类似的名称。

一个理想的 concurrency_scope 应该具备以下能力:

  1. spawn 方法:用于启动新的子任务(协程),并将其纳入当前作用域的管理。
  2. co_await 接口:允许父协程 co_await 整个并发作用域,从而暂停自身,直到所有子任务完成。
  3. 析构函数:作为最后的保障,如果在没有 co_await 的情况下作用域被销毁,它应该尝试清理或等待所有子任务。
  4. 错误聚合与传播:收集子任务中抛出的异常,并在父协程 co_await 作用域时重新抛出。
  5. 取消传播:当父作用域(或其更上层的祖先)被取消时,能够将取消信号传递给所有子任务。

为了演示,我们首先需要一个简化的 Task 类型,它能够代表一个可 co_await 的协程,并能够携带结果或异常。

1. 简化版 Task<T> 协程类型

这是一个基于 C++20 协程的简化版 Task 类型,它提供 co_await 接口,并能处理返回值和异常。在实际的异步库中,这会是一个更复杂的实现(例如 cppcoro::taskboost::asio::awaitable)。

#include <iostream>
#include <vector>
#include <string>
#include <chrono>
#include <thread>
#include <exception>
#include <coroutine> // C++20 coroutine header
#include <numeric>   // For std::iota
#include <future>    // For std::async and std::future for comparison

// --- Simplified Task Type (Minimal Coroutine for Demonstration) ---
// This Task type is designed to be co_awaitable and to carry a result or exception.
// It uses std::suspend_always for initial_suspend and final_suspend,
// meaning the coroutine won't run until explicitly resumed (initial)
// and won't destroy its frame until explicitly resumed by its awaiter (final).

template<typename T = void>
class Task;

namespace detail {

// Tag type for void tasks to distinguish from non-void in promise_type
struct empty_result_tag {};

template<typename T>
struct task_promise_base {
    std::exception_ptr exception_;
    std::coroutine_handle<> awaiting_coroutine_; // The coroutine that co_awaited this task

    void unhandled_exception() {
        exception_ = std::current_exception();
    }

    // Coroutine starts suspended
    std::suspend_always initial_suspend() noexcept { return {}; }

    // Coroutine ends suspended, allowing its awaiter to resume it and get result/exception
    std::suspend_always final_suspend() noexcept { return {}; }

    // Sets the continuation handle for when this task completes
    void set_continuation(std::coroutine_handle<> awaiting_handle) {
        awaiting_coroutine_ = awaiting_handle;
    }

    bool has_exception() const noexcept {
        return static_cast<bool>(exception_);
    }
};

template<typename T>
struct task_promise : task_promise_base<T> {
    T value_;
    bool has_value_ = false;

    Task<T> get_return_object() {
        return Task<T>{std::coroutine_handle<task_promise>::from_promise(*this)};
    }

    void return_value(T value) {
        value_ = std::move(value);
        has_value_ = true;
    }

    bool has_value() const noexcept {
        return has_value_;
    }

    T get_result() {
        if (this->exception_) {
            std::rethrow_exception(this->exception_);
        }
        if (!has_value_) {
            throw std::runtime_error("Task did not return a value. Possibly still running or logic error.");
        }
        return std::move(value_);
    }
};

template<>
struct task_promise<empty_result_tag> : task_promise_base<empty_result_tag> {
    Task<void> get_return_object() {
        return Task<void>{std::coroutine_handle<task_promise>::from_promise(*this)};
    }

    void return_void() {
        // Nothing to do for void return
    }

    bool has_value() const noexcept {
        return !this->has_exception(); // For void, 'has_value' means it completed without exception
    }

    void get_result() {
        if (this->exception_) {
            std::rethrow_exception(this->exception_);
        }
    }
};

// Awaiter for any Task<T>
template<typename T>
struct task_awaiter {
    std::coroutine_handle<task_promise<T>> coro_handle_;

    task_awaiter(std::coroutine_handle<task_promise<T>> handle) : coro_handle_(handle) {}

    bool await_ready() const noexcept {
        // If the coroutine is already done, or has a result/exception, no need to suspend
        return coro_handle_.done() || coro_handle_.promise().has_value() || coro_handle_.promise().has_exception();
    }

    void await_suspend(std::coroutine_handle<> awaiting_handle) noexcept {
        coro_handle_.promise().set_continuation(awaiting_handle);
        // Resume the task coroutine itself if it hasn't started yet
        // In a real scheduler, this would enqueue it. Here, we just run it.
        if (!coro_handle_.done()) {
            coro_handle_.resume();
        }
    }

    T await_resume() {
        return coro_handle_.promise().get_result();
    }
};

template<>
struct task_awaiter<empty_result_tag> {
    std::coroutine_handle<task_promise<empty_result_tag>> coro_handle_;

    task_awaiter(std::coroutine_handle<task_promise<empty_result_tag>> handle) : coro_handle_(handle) {}

    bool await_ready() const noexcept {
        return coro_handle_.done() || coro_handle_.promise().has_value() || coro_handle_.promise().has_exception();
    }

    void await_suspend(std::coroutine_handle<> awaiting_handle) noexcept {
        coro_handle_.promise().set_continuation(awaiting_handle);
        if (!coro_handle_.done()) {
            coro_handle_.resume();
        }
    }

    void await_resume() {
        coro_handle_.promise().get_result();
    }
};

} // namespace detail

template<typename T>
class Task {
public:
    using promise_type = detail::task_promise<T>;
    using CoroHandle = std::coroutine_handle<promise_type>;

    Task(CoroHandle handle) : handle_(handle) {}

    Task(Task&& other) noexcept : handle_(std::exchange(other.handle_, nullptr)) {}
    Task& operator=(Task&& other) no_except {
        if (this != &other) {
            if (handle_) handle_.destroy();
            handle_ = std::exchange(other.handle_, nullptr);
        }
        return *this;
    }

    ~Task() {
        if (handle_) {
            // If a Task is destructed without being awaited, it's a fire-and-forget,
            // which structured concurrency tries to avoid.
            // In a real system, this might log an error or attempt to cancel/join.
            if (!handle_.done()) {
                // For demonstration, we'll try to resume it to completion,
                // but this is not a robust cancellation/cleanup mechanism.
                // handle_.resume();
            }
            handle_.destroy();
        }
    }

    Task(const Task&) = delete;
    Task& operator=(const Task&) = delete;

    // Awaitable interface
    auto operator co_await() const& {
        return detail::task_awaiter<T>(handle_);
    }
    auto operator co_await() && {
        return detail::task_awaiter<T>(handle_);
    }

    bool is_done() const {
        return handle_.done();
    }

    // For debugging or specific scenarios, to start the coroutine without awaiting
    void resume() {
        if (handle_ && !handle_.done()) {
            handle_.resume();
        }
    }

    // Get the coroutine handle (for advanced use, e.g., external scheduler)
    CoroHandle get_handle() const { return handle_; }

private:
    CoroHandle handle_;
};

template<>
class Task<void> {
public:
    using promise_type = detail::task_promise<detail::empty_result_tag>;
    using CoroHandle = std::coroutine_handle<promise_type>;

    Task(CoroHandle handle) : handle_(handle) {}

    Task(Task&& other) noexcept : handle_(std::exchange(other.handle_, nullptr)) {}
    Task& operator=(Task&& other) noexcept {
        if (this != &other) {
            if (handle_) handle_.destroy();
            handle_ = std::exchange(other.handle_, nullptr);
        }
        return *this;
    }

    ~Task() {
        if (handle_) {
            if (!handle_.done()) {
                // Same logic as above: potential issue if not awaited.
                // handle_.resume();
            }
            handle_.destroy();
        }
    }

    Task(const Task&) = delete;
    Task& operator=(const Task&) = delete;

    auto operator co_await() const& {
        return detail::task_awaiter<detail::empty_result_tag>(handle_);
    }
    auto operator co_await() && {
        return detail::task_awaiter<detail::empty_result_tag>(handle_);
    }

    bool is_done() const {
        return handle_.done();
    }

    void resume() {
        if (handle_ && !handle_.done()) {
            handle_.resume();
        }
    }

    CoroHandle get_handle() const { return handle_; }

private:
    CoroHandle handle_;
};

// --- Sync_Wait: A simple blocking runner for Tasks ---
// This is a minimal implementation, primarily for testing and demonstration.
// A real-world `sync_wait` would typically involve an event loop or a more
// sophisticated scheduler to handle task resumption efficiently.
template<typename T>
T sync_wait(Task<T> task) {
    if (!task.get_handle()) {
        throw std::runtime_error("Attempted to sync_wait an empty task.");
    }

    if (task.is_done()) {
        return task.get_handle().promise().get_result();
    }

    struct MainAwaiterPromise {
        T result;
        std::exception_ptr exception;
        bool done = false;

        Task<T> get_return_object() {
            return Task<T>{std::coroutine_handle<MainAwaiterPromise>::from_promise(*this)};
        }

        std::suspend_always initial_suspend() noexcept { return {}; }
        std::suspend_always final_suspend() noexcept {
            done = true;
            return {};
        }

        void return_value(T val) { result = std::move(val); }
        void unhandled_exception() { exception = std::current_exception(); }
        void return_void() {}
    };

    // Allocate promise on heap to avoid stack issues with coroutine_handle
    auto main_awaiter_handle = std::coroutine_handle<MainAwaiterPromise>::from_promise(*new MainAwaiterPromise());

    task.get_handle().promise().set_continuation(main_awaiter_handle);
    task.resume(); // Start the task

    // Busy-wait until the task completes and resumes our main_awaiter_handle
    while (!main_awaiter_handle.promise().done) {
        std::this_thread::sleep_for(std::chrono::microseconds(100));
    }

    // Now the task has completed, get its result or rethrow exception
    std::exception_ptr task_exception = task.get_handle().promise().exception_;
    T task_result = T{};
    if (!task_exception) {
        task_result = task.get_handle().promise().get_result();
    }

    main_awaiter_handle.destroy(); // Clean up the temporary coroutine frame

    if (task_exception) {
        std::rethrow_exception(task_exception);
    }
    return task_result;
}

template<>
void sync_wait<void>(Task<void> task) {
    if (!task.get_handle()) {
        throw std::runtime_error("Attempted to sync_wait an empty task.");
    }

    if (task.is_done()) {
        task.get_handle().promise().get_result();
        return;
    }

    struct MainAwaiterPromise {
        std::exception_ptr exception;
        bool done = false;

        Task<void> get_return_object() {
            return Task<void>{std::coroutine_handle<MainAwaiterPromise>::from_promise(*this)};
        }

        std::suspend_always initial_suspend() noexcept { return {}; }
        std::suspend_always final_suspend() noexcept {
            done = true;
            return {};
        }

        void return_void() {}
        void unhandled_exception() { exception = std::current_exception(); }
    };

    auto main_awaiter_handle = std::coroutine_handle<MainAwaiterPromise>::from_promise(*new MainAwaiterPromise());

    task.get_handle().promise().set_continuation(main_awaiter_handle);
    task.resume();

    while (!main_awaiter_handle.promise().done) {
        std::this_thread::sleep_for(std::chrono::microseconds(100));
    }

    std::exception_ptr task_exception = task.get_handle().promise().exception_;

    main_awaiter_handle.destroy();
    if (task_exception) {
        std::rethrow_exception(task_exception);
    }
    task.get_handle().promise().get_result(); // Re-throw any exception
}

Task 类型说明:

  • promise_type:这是 C++ 协程的核心。Taskpromise_type 定义了协程的生命周期行为(何时暂停、何时恢复、如何处理返回值和异常)。
    • initial_suspend()final_suspend() 都返回 std::suspend_always,这意味着协程在创建时和完成时都会暂停,需要外部显式地 resume() 或通过 co_await 机制来恢复。
    • set_continuation() 用于记录 co_awaitTask 的父协程句柄,以便在 Task 完成时恢复父协程。
    • unhandled_exception() 捕获协程内部未处理的异常。
    • get_result() 用于获取协程的返回值,如果协程抛出了异常,则重新抛出该异常。
  • Task 类本身:封装了 std::coroutine_handle,提供了 co_await 运算符重载,使其可以被 co_await。移动语义确保协程句柄的正确转移和销毁。
  • sync_wait 函数:一个简化的同步等待函数,用于在非协程上下文(如 main 函数)中运行并阻塞等待一个 Task 完成。它通过将自身设置为 Task 的延续,并在一个循环中等待 Task 完成来模拟阻塞。

2. concurrency_scope 实现:结构化并发的核心

现在,我们有了基础 Task 类型,可以构建 concurrency_scope 了。这个类将作为父协程和子协程之间的桥梁,强制执行结构化并发的原则。

// --- Concurrency_Scope for Structured Concurrency ---
// This class demonstrates the core principles of structured concurrency.
// It acts as a parent scope, managing the lifetime and completion of child Tasks.

class concurrency_scope {
public:
    concurrency_scope() = default;

    // Not copyable or movable, as it manages its own set of children
    concurrency_scope(const concurrency_scope&) = delete;
    concurrency_scope& operator=(const concurrency_scope&) = delete;
    concurrency_scope(concurrency_scope&&) = delete;
    concurrency_scope& operator=(concurrency_scope&&) = delete;

    // Destructor: Ensures all children are completed.
    // In a real async system, if the scope is destructed without being co_awaited,
    // this might imply unhandled tasks, which could be an error or require explicit cancellation.
    // For this example, we'll try to sync_wait them, though that's generally not
    // ideal in an async context. The primary mechanism is co_awaiting the scope itself.
    ~concurrency_scope() {
        if (!children_.empty()) {
            std::cerr << "Warning: concurrency_scope being destructed with unawaited children. Attempting to sync_wait." << std::endl;
            // A more robust library would likely throw an error here or force cancellation.
            // For demonstration, we'll try to wait, but this can lead to blocking.
            // The ideal usage is always to co_await the scope.
            for (auto& child : children_) {
                if (!child.is_done()) {
                    try {
                        sync_wait(std::move(child)); // Will block until child completes
                    } catch (const std::exception& e) {
                        std::cerr << "Exception in unawaited child during scope destruction: " << e.what() << std::endl;
                    } catch (...) {
                        std::cerr << "Unknown exception in unawaited child during scope destruction." << std::endl;
                    }
                }
            }
        }
    }

    // Spawn a new child Task<void> within this scope
    template<typename F>
    void spawn(F&& func) {
        // Create a Task from the callable.
        // This Task starts suspended.
        children_.push_back(std::forward<F>(func)());
    }

    // Awaiter for concurrency_scope itself
    // When a parent coroutine co_awaits this scope, it waits for all children.
    struct awaiter {
        concurrency_scope& scope_;
        std::coroutine_handle<> awaiting_handle_;
        std::vector<std::exception_ptr> child_exceptions_;

        awaiter(concurrency_scope& scope) : scope_(scope) {}

        bool await_ready() const noexcept {
            // If there are no children, or all children are already done, we are ready.
            for (const auto& child : scope_.children_) {
                if (!child.is_done()) {
                    return false;
                }
            }
            return true;
        }

        void await_suspend(std::coroutine_handle<> awaiting_handle) noexcept {
            awaiting_handle_ = awaiting_handle;
            // Set this awaiting_handle as the continuation for all children.
            // When a child completes, it will resume this parent's awaiter.
            for (auto& child_task : scope_.children_) {
                child_task.get_handle().promise().set_continuation(awaiting_handle_);
                child_task.resume(); // Start the child task if it hasn't started
            }
        }

        void await_resume() {
            // All children should have completed and resumed this awaiter.
            // Now, collect any exceptions and rethrow if necessary.
            for (auto& child_task : scope_.children_) {
                try {
                    child_task.get_handle().promise().get_result(); // Will rethrow if child had an exception
                } catch (...) {
                    child_exceptions_.push_back(std::current_exception());
                }
            }

            // Clear children after they have been processed and their results/exceptions handled
            scope_.children_.clear();

            if (!child_exceptions_.empty()) {
                // For simplicity, rethrow the first exception.
                // A real system might aggregate them (e.g., using std::nested_exception)
                // or provide a way to access all of them.
                std::rethrow_exception(child_exceptions_[0]);
            }
        }
    };

    awaiter operator co_await() {
        return awaiter(*this);
    }

private:
    std::vector<Task<void>> children_; // Stores the child coroutine tasks
};

// --- Helper for structured concurrency within a function ---
// This is a common pattern to ensure a concurrency_scope is always awaited.
// It works like a `std::jthread` for coroutines.
template<typename F>
Task<void> co_scope(F&& func) {
    concurrency_scope scope;
    co_await std::forward<F>(func)(scope); // Pass the scope to the function
    co_await scope; // Ensure the scope itself is awaited before this co_scope task completes
    co_return;
}

concurrency_scope 说明:

  • RAII 原则concurrency_scope 是一个典型的 RAII 对象。它的生命周期定义了它所管理的子任务的并发作用域。当 concurrency_scope 对象被销毁时,它所管理的资源(即子任务)也会被清理。
  • spawn 方法:接受一个可调用对象(通常是一个 lambda 表达式),该对象会返回一个 Task<void>spawn 将这个 Task 添加到内部的 children_ 列表中。
  • co_await 运算符重载:这是 concurrency_scope 实现结构化并发的关键。当一个父协程 co_await 一个 concurrency_scope 对象时:
    • await_ready():检查是否所有子任务都已完成。如果都已完成,父协程无需暂停。
    • await_suspend():如果子任务未完成,父协程将在此处暂停 (awaiting_handle_)。同时,concurrency_scope 会将父协程的句柄注册为所有子任务的延续。这意味着,当每个子任务完成时,它都会尝试恢复这个父协程。
    • await_resume():当所有子任务都完成并恢复了父协程后,父协程会在这里继续执行。await_resume() 负责遍历所有子任务,收集并处理它们可能抛出的任何异常。为简单起见,这里只重新抛出第一个异常。
  • 析构函数:作为一个防御性编程措施,如果 concurrency_scope 在没有被 co_await 的情况下被销毁(例如,在同步函数中创建了一个 concurrency_scope 对象,但没有等待它),析构函数会尝试 sync_wait 剩余的子任务。这通常不是异步编程的最佳实践,因为它会导致阻塞。理想情况下,concurrency_scope 应该总是被 co_await
  • co_scope 辅助函数:这是一个非常有用的模式,它接受一个 lambda 函数,该函数接收一个 concurrency_scope 对象。co_scope 确保在 lambda 执行完毕后,concurrency_scope 会被 co_await,从而保证所有子任务都被等待。这类似于 std::jthread 自动 join 线程的行为。

保证父协程退出前子协程全部关闭

现在我们来到了讲座的核心问题:在 C++ 中如何保证父协程退出前子协程全部关闭?

通过上述 concurrency_scope 的设计,我们可以清晰地回答这个问题:

  1. 通过 co_await concurrency_scope 显式等待:
    这是最主要、最推荐的机制。当父协程执行 co_await my_scope; 时,它会暂停自身的执行,直到 my_scope 中所有通过 spawn 启动的子协程都完成。只有所有子协程都完成(或抛出异常),父协程才能继续执行。这确保了父协程不会在子协程之前退出。

    工作原理:

    • concurrency_scope::operator co_await() 返回一个 awaiter 对象。
    • awaiter::await_suspend() 被调用,父协程句柄 (awaiting_handle_) 被传递给它。
    • awaiter 将此父协程句柄注册为所有子 Task 的延续。
    • 当所有子 Task 完成(无论是正常返回还是抛出异常)时,它们会尝试恢复这个父协程。
    • 一旦所有子 Task 完成,父协程在 awaiter::await_resume() 处恢复,并处理子 Task 可能抛出的异常。

    这种机制完美地实现了结构化并发的父子生命周期管理。

  2. 通过 co_scope 辅助函数强制等待:
    co_scope 函数通过封装 concurrency_scope 对象并强制对其进行 co_await,确保了在其作用域内启动的所有子协程都会被等待。这提供了一种简洁、安全的方式来创建结构化并发块。

    Task<void> parent_task() {
        std::cout << "Parent: Starting child tasks..." << std::endl;
        co_await co_scope([](concurrency_scope& scope) -> Task<void> {
            scope.spawn([]() -> Task<void> {
                std::cout << "Child 1: Working..." << std::endl;
                std::this_thread::sleep_for(std::chrono::milliseconds(500));
                std::cout << "Child 1: Done." << std::endl;
                co_return;
            });
    
            scope.spawn([]() -> Task<void> {
                std::cout << "Child 2: Working..." << std::endl;
                std::this_thread::sleep_for(std::chrono::milliseconds(800));
                std::cout << "Child 2: Done." << std::endl;
                co_return;
            });
            co_return;
        });
        std::cout << "Parent: All child tasks completed." << std::endl;
        co_return;
    }
    
    // sync_wait(parent_task()); // 在main函数中运行
  3. concurrency_scope 析构函数作为最后的安全网(非推荐):
    虽然 concurrency_scope 的析构函数尝试 sync_wait 任何未完成的子任务,但这通常被视为一种“应急”措施,而不是推荐的异步编程方式。在真正的异步系统中,阻塞 sync_wait 会导致事件循环停滞,性能下降。它主要用于在非协程上下文或异常路径中,防止任务完全泄露。在设计良好的异步代码中,所有 concurrency_scope 都应该通过 co_await 被显式等待。

表格总结:保证子协程关闭的机制

机制 描述 优点 缺点/注意事项
co_await my_scope; 父协程显式 co_await concurrency_scope 对象,暂停自身直到所有子协程完成。 推荐方式,符合结构化并发理念。异步、非阻塞,错误和取消传播清晰。 要求父协程本身是协程。
co_scope 辅助函数 封装 concurrency_scope 的创建和 co_await 过程,提供 RAII 风格的结构化并发块。 提供简洁、安全的结构化并发语法糖。确保 co_await 发生。 同上,要求父协程是协程。
concurrency_scope 析构函数 concurrency_scope 对象生命周期结束时,如果仍有未完成的子协程,尝试阻塞等待它们完成。 作为安全网,防止任务完全“飞出”失控。 不推荐作为异步编程的主要机制。会阻塞当前线程/事件循环,可能导致性能问题或死锁。在异步代码中应尽量避免阻塞。如果频繁触发,通常表明设计缺陷。

错误处理与取消

结构化并发不仅关乎生命周期管理,还对错误处理和取消有着明确的语义。

错误处理

在我们的 concurrency_scope 实现中,错误处理机制如下:

  • 子协程抛出异常:如果一个子 Task 内部抛出异常,它的 promise_type::unhandled_exception() 会捕获该异常并存储为 std::exception_ptrTask 仍然会完成,但其内部状态会标记为异常。
  • 父协程 co_await 作用域:当父协程 co_await concurrency_scope 时,concurrency_scope::awaiter::await_resume() 会遍历所有子 Task。它会调用每个子 Taskget_result()。如果子 Task 内部存有异常,get_result() 会重新抛出该异常。
  • 异常传播concurrency_scopeawaiter::await_resume() 会收集所有子任务的异常。在本示例中,它会重新抛出第一个遇到的异常,从而将错误传播到父协程。在更复杂的场景中,可以设计一个异常聚合器,将所有子任务的异常打包成一个 std::nested_exception 或自定义的异常类型。

这使得错误处理变得像顺序代码一样直观:子任务的错误会向上冒泡到其父作用域。

取消

取消机制需要更多的设计。通常,这涉及到一个取消令牌(Cancellation Token)的概念。

  1. 取消令牌源(Cancellation Source):一个对象,可以被设置为“已取消”状态。
  2. 取消令牌(Cancellation Token):可以从取消令牌源获取,用于查询当前是否已被取消。

集成到 concurrency_scope

  • concurrency_scope 可以内部持有一个 cancellation_source
  • spawn 子任务时,可以将 cancellation_sourcecancellation_token 传递给子任务。
  • 子任务在其执行逻辑中,定期检查 cancellation_token 是否已被取消,如果取消,则提前 co_return 或抛出 std::operation_canceled 异常。
  • concurrency_scope 也可以提供一个 cancel() 方法。当调用 cancel() 时,它会设置其内部 cancellation_source 为已取消状态,从而通知所有子任务终止。
  • 当父协程 co_await concurrency_scope 时,如果父协程本身被取消,concurrency_scope 也可以响应这种取消,并向其子任务传播。

由于取消机制相对复杂且需要额外的 cancellation_token 实现,为了保持本讲座的简洁性和核心焦点,我们的 concurrency_scope 示例中没有包含完整的取消实现,但它的扩展方向是明确的。

实际应用场景

结构化并发在现代软件开发中具有广泛的应用,尤其是在以下领域:

  • Web 服务器和 API 处理器:处理单个 HTTP 请求时,可能需要并发地从数据库读取数据、调用其他微服务、处理文件等。一个 concurrency_scope 可以管理这些子操作,确保请求处理完整,并统一报告错误。
  • 并行数据处理:将一个大型数据集的处理任务分解为多个子任务并行执行,例如图像处理、数据分析。concurrency_scope 可以等待所有子任务完成,然后聚合结果。
  • UI 编程:在后台执行耗时操作(如网络请求、文件I/O)而不会阻塞用户界面。一旦后台任务完成或失败,结果会以结构化的方式返回到UI线程进行更新。
  • 资源密集型计算:在科学计算或金融建模中,常常需要并行执行多个复杂的计算步骤。
  • 游戏开发:加载资源、处理AI逻辑、物理模拟等都可以通过结构化并发来组织。

现有库与框架

虽然 C++ 标准库目前还没有直接提供像 concurrency_scope 这样的结构化并发原语,但一些流行的异步库已经采纳了这些思想:

  • Boost.Asio:作为 C++ 异步编程的基石,Boost.Asio 的 boost::asio::awaitableco_spawn 机制,结合其 executor 模型,可以用来构建结构化并发。asio::experimental::make_parallel_group 是一个实验性特性,旨在实现类似的功能。
  • CPPGCT (C++ Generic Concurrency Toolkit):这是一个旨在提供结构化并发原语的库,其设计深受 Go 语言 goroutineselect 的启发。
  • cppcoro (Lewis Baker):提供了高质量的 task 和其他协程构建块,虽然没有直接提供 concurrency_scope,但其设计理念和工具集是构建结构化并发的良好基础。
  • std::jthread (C++20):虽然是针对线程而非协程,但 std::jthread 的自动 join 行为正是结构化并发思想在线程层面的体现。这表明 C++ 标准库也在朝着这个方向发展。

展望未来

结构化并发是现代并发编程领域的一个重要里程碑,它将并发的复杂性转化为可管理的层次结构,使得异步编程更加安全、可预测和易于维护。C++20 协程为我们提供了实现这一目标的强大工具,但它需要我们在库层面进行精心的设计和封装。

随着 C++ 标准的不断演进,我们有理由期待未来标准库能够提供更完善、更直接的结构化并发原语。在此之前,通过像 concurrency_scope 这样的自定义实现,我们已经可以

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注