各位听众,大家好。今天我们来深入探讨一个在现代并发编程中日益重要且引人入胜的话题——“结构化并发”(Structured Concurrency),并着重讲解如何在 C++ 中,特别是结合 C++20 协程的背景下,管理协程的生命周期并实践结构化并发的最佳策略。
并发编程的挑战与结构化并发的兴起
在过去几十年里,并发编程一直是软件开发领域的一大难题。传统的并发模型,如基于线程和锁的模型,虽然强大,但也带来了诸多挑战:
- 复杂性与错误率: 手动管理线程的生命周期、同步原语(互斥量、信号量、条件变量)以及共享数据的访问,极易引入死锁、活锁、数据竞争等难以调试的并发错误。
- 资源管理: 线程是相对重量级的操作系统资源,频繁创建和销毁开销较大。此外,如何确保在并发操作完成或失败时,相关资源得到正确释放,是一个持续的挑战。
- 错误传播与取消: 当一个并发任务失败时,如何有效地将错误传播给其发起者?当一个操作不再需要时,又如何优雅地取消它及其所有相关的子任务?这些问题在传统模型中通常需要复杂的、手动协调的机制。
为了应对这些挑战,软件社区不断探索更高级的并发抽象。协程(Coroutines)的出现是其中一个重要里程碑。协程提供了轻量级、用户态的协作式多任务处理能力,它们允许程序在某个点暂停执行,并在稍后从同一点恢复,而无需像线程那样进行昂贵的上下文切换。这极大地提高了并发调度的效率和灵活性。
然而,仅仅拥有协程还不足以解决所有问题。裸协程本身只是一种底层机制,它并没有自带生命周期管理、错误传播和取消的语义。开发者仍然需要手动处理这些复杂的方面。正是在这样的背景下,“结构化并发”的概念应运而生。
什么是结构化并发?核心概念与原则
结构化并发是一种编程范式,旨在将并发操作组织成具有清晰作用域和生命周期的结构,类似于我们熟悉的同步函数调用栈。它的核心思想可以概括为:
当一个父操作(或一个并发作用域)启动了多个子操作时,该父操作必须在其所有子操作完成或终止之前不完成。
这听起来简单,但其蕴含的原则对并发编程的健壮性和可预测性至关重要。我们可以将其与结构化编程中的 if 语句、for 循环或函数调用进行类比:一个 if 语句的执行流总是在其内部的条件分支完成后才继续;一个函数在其所有内部逻辑执行完毕后才返回。结构化并发将这种“等待子任务完成”的语义扩展到了并发领域。
结构化并发的关键特性包括:
- 作用域绑定(Scope Binding): 并发操作的生命周期被限制在一个明确的作用域内。当这个作用域退出时,所有由它启动的子任务要么已经完成,要么会被强制终止(取消)。这通常通过 RAII(Resource Acquisition Is Initialization)风格的结构来实现,例如一个
task_group或concurrency_scope对象。 - 父子关系(Parent-Child Relationship): 任务之间形成明确的父子层次结构。父任务负责管理其子任务,并在其析构时隐式或显式地等待所有子任务完成。
- 错误传播(Error Propagation): 如果任何一个子任务失败(例如抛出异常),错误会向上冒泡并通知其父任务。父任务通常会收集这些错误,或者在遇到第一个错误时取消所有其他子任务并重新抛出。
- 取消传播(Cancellation Propagation): 当父任务决定取消时,这个取消请求会级联地传播给所有正在运行的子任务。子任务被期望能够响应取消请求并优雅地终止。
| 特性 | 传统并发模型(线程/锁) | 结构化并发模型(协程/结构化API) |
|---|---|---|
| 生命周期管理 | 手动、复杂,易导致悬挂任务或资源泄漏。 | 自动、作用域绑定,RAII 确保任务生命周期与作用域同步。 |
| 错误处理 | 需要手动传递异常或错误码,复杂且易漏。 | 错误自动向上冒泡,父任务可统一处理或聚合。 |
| 取消机制 | 需要手动设计和实现取消令牌、标志或中断机制。 | 取消请求可级联传播给所有子任务,简化取消逻辑。 |
| 可推理性 | 程序的并发行为难以预测和调试。 | 并发流与同步流类似,行为更可预测,易于调试。 |
| 资源管理 | 需小心处理共享资源,确保所有路径都释放。 | 作用域确保在所有子任务完成后,资源才被安全释放。 |
结构化并发的优势
采纳结构化并发范式能带来多方面显著的优势:
- 可预测性与安全性: 消除了悬挂任务(dangling tasks)的风险。当一个并发作用域退出时,可以保证所有由它启动的任务要么已完成,要么已被取消。这确保了资源(如内存、文件句柄、网络连接)在父作用域生命周期结束时能被正确释放,避免了资源泄漏和未定义行为。
- 简化错误处理: 错误不再在并发任务之间“迷失”。当一个子任务抛出异常时,这个异常会结构化地传播到其父任务。父任务可以决定如何处理这个异常,例如,它可以捕获它,记录它,或者重新抛出,甚至可以导致所有其他兄弟任务被取消。
- 高效的取消机制: 结构化并发提供了一种优雅的方式来取消一组相关的并发操作。当父操作被取消时,取消请求会自动传播到其所有子任务。这使得实现超时、用户取消或早期退出等功能变得更加简单和可靠。
- 易于调试和推理: 程序的并发行为变得更易于理解。由于并发任务的生命周期与代码的结构(作用域)紧密绑定,开发者可以更容易地推理程序的执行流程、数据流和错误路径。调试器也可以更好地理解和展示这种结构。
- 更好的资源管理: 结构化并发是 RAII 原则在并发领域的自然延伸。它鼓励开发者将并发操作的生命周期与 C++ 对象的生命周期绑定,从而利用 C++ 的析构函数来执行清理逻辑,极大地简化了复杂的资源管理问题。
C++ 协程(C++20 coroutines)基础回顾
C++20 引入了对协程的语言级支持,这是一项强大的特性,允许开发者编写非阻塞、异步的代码,而无需使用传统的回调地狱(callback hell)或复杂的线程管理。C++ 协程是底层的、无栈的,并且是协作式的。
核心关键字:
co_await: 暂停当前协程,并将控制权返回给调用者,等待一个可等待对象(awaitable)完成。co_yield: 暂停当前协程,返回一个值给调用者,并在下次恢复时从暂停点继续。主要用于实现生成器。co_return: 结束协程的执行,可选地返回一个值。
一个 C++ 协程的生命周期由其 promise_type 管理,这是一个模板参数,由协程的返回类型(Awaitable)定义。promise_type 负责:
get_return_object(): 获取协程的返回对象(例如std::future或自定义的Task类型)。initial_suspend(): 定义协程的初始暂停行为(立即执行还是先暂停)。final_suspend(): 定义协程的最终暂停行为(结束时立即销毁还是先暂停)。return_value()/return_void(): 处理co_return返回的值。unhandled_exception(): 处理协程内部未捕获的异常。
一个简单的协程返回类型 Task 可能看起来像这样:
#include <coroutine>
#include <iostream>
#include <stdexcept>
#include <optional>
#include <thread>
#include <chrono>
// 1. 定义一个简单的Task返回类型
template <typename T = void>
struct Task {
struct promise_type {
T value_; // 用于co_return返回值
std::exception_ptr exception_; // 用于捕获异常
Task get_return_object() { return Task{std::coroutine_handle<promise_type>::from_promise(*this)}; }
std::suspend_always initial_suspend() { return {}; } // 初始暂停
std::suspend_always final_suspend() noexcept { return {}; } // 最终暂停
void unhandled_exception() { exception_ = std::current_exception(); }
void return_value(T value) { value_ = std::move(value); }
void return_void() { /* for Task<void> */ }
};
std::coroutine_handle<promise_type> handle_;
Task(std::coroutine_handle<promise_type> h) : handle_(h) {}
Task(Task&& other) noexcept : handle_(std::exchange(other.handle_, nullptr)) {}
Task& operator=(Task&& other) noexcept {
if (this != &other) {
if (handle_) handle_.destroy();
handle_ = std::exchange(other.handle_, nullptr);
}
return *this;
}
~Task() { if (handle_) handle_.destroy(); } // RAII 销毁协程帧
// Awaiter 接口
bool await_ready() { return handle_.done(); }
void await_suspend(std::coroutine_handle<> awaiting_coroutine) {
// 在实际应用中,这里需要将 awaiting_coroutine 存储起来,
// 在 Task 完成时恢复它。这里简化处理,直接恢复。
// 但对于一个真正的 Task,应该由调度器或完成回调来做。
// 为了演示,这里假设在某些场景下,我们可以直接恢复。
// 更准确地说,这里应该注册一个回调,当handle_完成时,
// 调度awaiting_coroutine恢复。
}
T await_resume() {
if (handle_.promise().exception_) {
std::rethrow_exception(handle_.promise().exception_);
}
if constexpr (!std::is_void_v<T>) {
return std::move(handle_.promise().value_);
}
}
// 运行协程直到完成
void run() {
while (!handle_.done()) {
handle_.resume();
}
// 检查异常并在run结束时抛出
if (handle_.promise().exception_) {
std::rethrow_exception(handle_.promise().exception_);
}
}
};
// 简单的awaitable,用于模拟异步操作
struct Delay {
std::chrono::milliseconds duration_;
Delay(std::chrono::milliseconds d) : duration_(d) {}
bool await_ready() const { return false; }
void await_suspend(std::coroutine_handle<> h) {
std::thread([h, d = duration_]() {
std::this_thread::sleep_for(d);
h.resume(); // 在另一个线程中恢复协程
}).detach(); // 注意:detach在这里是演示,实际生产中需要更好的线程管理
}
void await_resume() const {}
};
Task<int> compute_async(int id) {
std::cout << "Task " << id << " started on thread " << std::this_thread::get_id() << std::endl;
co_await Delay(std::chrono::milliseconds(100 * id)); // 模拟异步工作
if (id == 3) {
throw std::runtime_error("Error in task 3!");
}
std::cout << "Task " << id << " finished on thread " << std::this_thread::get_id() << std::endl;
co_return id * 10;
}
// int main() {
// std::cout << "Main thread " << std::this_thread::get_id() << std::endl;
// Task<int> t1 = compute_async(1);
// t1.run(); // 运行协程直到完成
// Task<int> t2 = compute_async(2);
// t2.run();
// try {
// Task<int> t3 = compute_async(3);
// t3.run();
// } catch (const std::exception& e) {
// std::cerr << "Caught exception: " << e.what() << std::endl;
// }
// return 0;
// }
痛点: 尽管协程提供了灵活性,但上述 Task 的 run() 方法是阻塞的,并且手动调用 run() 来驱动协程,这不符合异步编程的初衷。更重要的是,如果我们要启动多个 Task,如何等待它们全部完成?如何处理其中一个失败的情况?这些都需要更高层级的抽象来管理,而这正是结构化并发的目标。
在 C++ 中实现结构化并发的策略与模式
C++20 协程是语言的底层机制,它本身不提供开箱即用的结构化并发支持。这意味着我们需要在标准库之上,或者通过第三方库,来构建我们自己的结构化并发抽象。最常见的模式是实现一个 task_group 或 concurrency_scope 类。
核心模式:task_group 或 concurrency_scope
一个 RAII 风格的 task_group 类是实现结构化并发的基石。它的设计思路如下:
- RAII 封装:
task_group是一个对象,在构造时初始化其内部状态,在析构时执行关键的同步操作(等待所有子任务完成)。 - 任务启动接口: 提供一个方法(例如
spawn或add)来启动新的协程子任务,并将其添加到组中。 - 生命周期管理: 内部存储所有启动的子任务的句柄或代表其生命周期的对象(例如
std::future或自定义的Task对象)。 - 同步等待: 在
task_group的析构函数中,它会显式或隐式地join()所有已启动的子任务,确保它们全部完成。 - 错误聚合与传播: 在等待子任务完成时,它会检查是否有子任务抛出了异常。它可以选择捕获第一个异常并重新抛出,或者聚合所有异常。
- 取消机制集成:
task_group可以与取消令牌(Cancellation Token)机制集成,当task_group被销毁或显式请求取消时,向所有子任务发送取消信号。
C++ 中的实现策略
由于 C++ 标准库目前没有直接的 task_group 实现,我们需要自己构建。以下是一些关键组件和实现思路:
- 协程包装类型 (
Task或Awaitable): 我们需要一个协程的返回类型,它不仅能表示协程的执行结果,还能处理异常和取消。它通常会封装std::coroutine_handle,并提供await_ready,await_suspend,await_resume方法。 - 并发容器:
task_group需要一个容器来存储所有正在运行的子任务。std::vector<Task>或std::vector<std::future<T>>都是不错的选择。 - 同步机制: 为了在析构函数中等待所有任务完成,我们需要同步原语。
std::latch(C++20),std::counting_semaphore(C++20), 或者简单的std::mutex+std::condition_variable都可以用来实现等待。 - 线程管理: 协程本身是无栈的,需要在某个执行器(Executor)或线程上下文中运行。
task_group可以选择在当前线程上调度子任务,或者使用线程池(例如std::jthread或自定义线程池)来并行执行。
C++ 结构化并发中的关键技术点与挑战
构建一个健壮的结构化并发框架,需要仔细处理以下几个关键技术点:
错误传播与异常处理
在并发环境中,异常处理比同步代码更复杂。一个子任务抛出的异常不应默默地消失,而应该被父任务感知并处理。
策略:
std::exception_ptr: C++ 标准库提供了std::exception_ptr,它允许我们捕获一个异常的副本,并在稍后重新抛出。每个Task的promise_type应该包含一个std::exception_ptr成员来存储未处理的异常。- 父任务聚合:
task_group在等待所有子任务完成时,需要遍历所有子任务,检查它们的exception_ptr。- “第一个失败者获胜”: 遇到第一个异常时,立即取消所有其他任务,并重新抛出这个异常。
- “聚合所有异常”: 收集所有子任务的异常,并在父任务中抛出一个包含所有这些异常的复合异常类型。
std::terminate的风险: 如果协程内的异常没有被promise_type::unhandled_exception()处理,或者处理不当,可能会导致std::terminate被调用。
// 示例:Task的promise_type中处理异常
template <typename T>
struct Task {
struct promise_type {
// ... 其他成员 ...
std::exception_ptr exception_;
void unhandled_exception() {
exception_ = std::current_exception(); // 捕获异常
}
// ...
};
// ...
// 在await_resume() 或 run() 中检查并重新抛出
T await_resume() {
if (handle_.promise().exception_) {
std::rethrow_exception(handle_.promise().exception_);
}
// ...
}
// ...
};
取消机制
优雅地取消正在运行的并发任务是结构化并发的另一个核心优势。
策略:
- 取消令牌(Cancellation Token): 设计一对
cancellation_source和cancellation_token。cancellation_source用于发起取消,cancellation_token传递给子任务以供查询。 - 协作式取消: C++ 协程默认是协作式的。子任务需要定期检查
cancellation_token是否被请求取消,并在接收到请求时,执行清理工作并提前退出(例如,通过抛出std::runtime_error("Task cancelled")或返回一个特殊的取消状态)。 - 可等待对象(Awaitable)与取消:
co_await操作本身可以设计为响应取消。例如,一个Delayawaitable 可以在等待过程中检查取消,如果被取消则立即返回。 task_group中的取消: 当task_group的析构函数被调用,或者task_group所在的函数抛出异常时,task_group可以自动触发其cancellation_source,从而取消所有子任务。
#include <atomic>
#include <vector>
#include <mutex>
#include <condition_variable>
// 简单的取消令牌实现
class cancellation_token;
class cancellation_source {
friend class cancellation_token;
std::atomic<bool> cancelled_ = false;
std::mutex mtx_;
std::condition_variable cv_;
public:
void request_cancellation() {
cancelled_.store(true, std::memory_order_release);
cv_.notify_all(); // 通知所有等待的协程
}
cancellation_token get_token() const; // 定义在 cancellation_token 之后
};
class cancellation_token {
const cancellation_source* source_;
public:
cancellation_token(const cancellation_source& source) : source_(&source) {}
bool is_cancellation_requested() const {
return source_->cancelled_.load(std::memory_order_acquire);
}
void throw_if_cancellation_requested() const {
if (is_cancellation_requested()) {
throw std::runtime_error("Operation cancelled.");
}
}
// Awaitable for waiting on cancellation
struct Awaiter {
const cancellation_token& token_;
std::coroutine_handle<> awaiting_coroutine_;
bool await_ready() const { return token_.is_cancellation_requested(); }
void await_suspend(std::coroutine_handle<> h) {
awaiting_coroutine_ = h;
// 注册回调或等待条件变量
std::unique_lock<std::mutex> lock(token_.source_->mtx_);
token_.source_->cv_.wait(lock, [&]{
return token_.is_cancellation_requested();
});
h.resume(); // 取消时立即恢复
}
void await_resume() const { token_.throw_if_cancellation_requested(); }
};
Awaiter operator co_await() const { return Awaiter{*this}; }
};
inline cancellation_token cancellation_source::get_token() const {
return cancellation_token(*this);
}
// 示例协程如何使用取消令牌
Task<void> cancellable_task(int id, cancellation_token token) {
std::cout << "Cancellable task " << id << " started." << std::endl;
for (int i = 0; i < 5; ++i) {
co_await Delay(std::chrono::milliseconds(100)); // 模拟工作
if (token.is_cancellation_requested()) {
std::cout << "Cancellable task " << id << " detected cancellation." << std::endl;
token.throw_if_cancellation_requested(); // 抛出异常退出
}
std::cout << "Cancellable task " << id << " working..." << std::endl;
}
std::cout << "Cancellable task " << id << " finished." << std::endl;
co_return;
}
协程调度与执行器(Executors)
协程本身不包含线程信息,它需要在某个线程上下文中被 resume()。结构化并发与调度器紧密相关。
策略:
- 自定义执行器: 可以创建一个简单的线程池作为执行器,
task_group的spawn方法可以将协程提交给这个线程池执行。 std::jthread: C++20 的std::jthread提供了自动join()功能,可以简化线程管理。但它不直接提供协程调度功能。- 上下文切换:
co_await操作通常会涉及到上下文切换,即将控制权返回给调用者。如果await_suspend决定在另一个线程上恢复协程,那么协程的后续执行就会切换到那个线程。 - 线程亲和性: 某些任务可能需要绑定到特定线程。结构化并发框架应允许指定任务的执行器。
资源管理与 RAII
结构化并发的核心思想就是将并发操作的生命周期与 C++ 对象的生命周期绑定,充分利用 RAII。
策略:
task_group作为 RAII 守卫:task_group对象在栈上创建,其析构函数负责等待所有子任务完成,从而确保在其作用域退出时所有资源得到清理。- 协程返回类型: 协程的返回类型(
Task)也应该是 RAII 风格的,在其析构函数中销毁协程帧(如果尚未销毁)。
返回类型与句柄管理
- 统一的
Task类型: 为所有协程定义一个通用的Task<T>返回类型,它封装了协程句柄、异常指针等。 - 句柄的生命周期:
std::coroutine_handle本身只是一个指针,它不拥有协程帧的内存。协程帧的内存由promise_type的get_return_object返回的Task对象拥有。确保Task对象的生命周期足够长,直到协程完成并被销毁。task_group负责管理这些Task实例。
实践案例:构建一个简易的 task_group
现在,让我们通过一个实际的 C++ 代码示例,来构建一个简易的 task_group,它将结合我们前面讨论的取消、异常传播和 RAII 原则。
我们将构建以下组件:
cancellation_source和cancellation_token:用于取消机制。Task:一个基础协程返回类型,支持异常传播和外部恢复。thread_pool_executor:一个简单的线程池,用于调度协程。task_group:我们的结构化并发核心,用于启动和管理Task。
#include <coroutine>
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <future> // 用于简化Task的等待
#include <atomic>
#include <stdexcept>
#include <optional>
#include <chrono>
// --- 1. Cancellation Mechanism ---
class cancellation_token;
class cancellation_source {
friend class cancellation_token;
std::atomic<bool> cancelled_ = false;
std::mutex mtx_;
std::condition_variable cv_;
public:
void request_cancellation() {
cancelled_.store(true, std::memory_order_release);
cv_.notify_all();
}
cancellation_token get_token() const; // Forward declaration
};
class cancellation_token {
const cancellation_source* source_;
public:
cancellation_token(const cancellation_source& source) : source_(&source) {}
bool is_cancellation_requested() const {
return source_->cancelled_.load(std::memory_order_acquire);
}
void throw_if_cancellation_requested() const {
if (is_cancellation_requested()) {
throw std::runtime_error("Operation cancelled.");
}
}
// Awaitable: co_await token; will suspend until cancelled or throws
struct Awaiter {
const cancellation_token& token_;
std::coroutine_handle<> awaiting_coroutine_;
bool await_ready() const { return token_.is_cancellation_requested(); }
void await_suspend(std::coroutine_handle<> h) {
awaiting_coroutine_ = h;
std::unique_lock<std::mutex> lock(token_.source_->mtx_);
// Wait for cancellation or spurious wakeup, then resume
token_.source_->cv_.wait(lock, [&]{
return token_.is_cancellation_requested();
});
// If we are here, cancellation was requested. Resume the awaiting coroutine.
if (awaiting_coroutine_) awaiting_coroutine_.resume();
}
void await_resume() const { token_.throw_if_cancellation_requested(); }
};
Awaiter operator co_await() const { return Awaiter{*this}; }
};
inline cancellation_token cancellation_source::get_token() const {
return cancellation_token(*this);
}
// --- 2. Simple Thread Pool Executor ---
class thread_pool_executor {
std::vector<std::thread> workers_;
std::queue<std::coroutine_handle<>> tasks_;
std::mutex mtx_;
std::condition_variable cv_;
std::atomic<bool> stop_ = false;
public:
thread_pool_executor(size_t num_threads = std::thread::hardware_concurrency()) {
for (size_t i = 0; i < num_threads; ++i) {
workers_.emplace_back([this] {
while (true) {
std::coroutine_handle<> task;
{
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock, [this] {
return stop_ || !tasks_.empty();
});
if (stop_ && tasks_.empty()) {
return;
}
task = tasks_.front();
tasks_.pop();
}
task.resume(); // Resume the coroutine
}
});
}
}
~thread_pool_executor() {
stop_.store(true, std::memory_order_release);
cv_.notify_all();
for (std::thread& worker : workers_) {
if (worker.joinable()) {
worker.join();
}
}
}
void schedule(std::coroutine_handle<> h) {
{
std::unique_lock<std::mutex> lock(mtx_);
tasks_.push(h);
}
cv_.notify_one();
}
};
// Global thread pool for simplicity in this example
thread_pool_executor global_executor(4);
// --- 3. Task Coroutine Return Type ---
template <typename T = void>
struct Task {
struct promise_type {
T value_; // For co_return value
std::exception_ptr exception_; // For exceptions
std::coroutine_handle<> continuation_; // For awaiting coroutine
thread_pool_executor* executor_ = &global_executor; // Default executor
Task get_return_object() {
return Task{std::coroutine_handle<promise_type>::from_promise(*this)};
}
std::suspend_always initial_suspend() { return {}; }
std::suspend_always final_suspend() noexcept {
// If there's a continuation, schedule it on the executor
if (continuation_) {
executor_->schedule(continuation_);
}
return {};
}
void unhandled_exception() { exception_ = std::current_exception(); }
void return_value(T value) { value_ = std::move(value); }
void return_void() { /* for Task<void> */ }
};
std::coroutine_handle<promise_type> handle_;
Task(std::coroutine_handle<promise_type> h) : handle_(h) {}
Task(Task&& other) noexcept : handle_(std::exchange(other.handle_, nullptr)) {}
Task& operator=(Task&& other) noexcept {
if (this != &other) {
if (handle_) handle_.destroy();
handle_ = std::exchange(other.handle_, nullptr);
}
return *this;
}
~Task() { if (handle_) handle_.destroy(); }
// Awaiter interface
bool await_ready() { return handle_.done(); }
void await_suspend(std::coroutine_handle<> awaiting_coroutine) {
handle_.promise().continuation_ = awaiting_coroutine; // Store who to resume
global_executor.schedule(handle_); // Schedule this task to run on the pool
}
T await_resume() {
if (handle_.promise().exception_) {
std::rethrow_exception(handle_.promise().exception_);
}
if constexpr (!std::is_void_v<T>) {
return std::move(handle_.promise().value_);
}
}
// Blocking wait for the task to complete (for debugging/testing)
void join() {
while (!handle_.done()) {
std::this_thread::sleep_for(std::chrono::milliseconds(1)); // Busy wait, not ideal
// In a real system, you'd have a mechanism to wait for a specific task completion
// like a future or a condition variable associated with the task.
// For now, we rely on the executor eventually finishing the task.
}
// Check for exception after task completion
if (handle_.promise().exception_) {
std::rethrow_exception(handle_.promise().exception_);
}
}
};
// Awaitable for simulating asynchronous delay
struct Delay {
std::chrono::milliseconds duration_;
Delay(std::chrono::milliseconds d) : duration_(d) {}
bool await_ready() const { return false; }
void await_suspend(std::coroutine_handle<> h) {
std::thread([h, d = duration_, exec = &global_executor]() {
std::this_thread::sleep_for(d);
exec->schedule(h); // Schedule continuation on the executor
}).detach();
}
void await_resume() const {}
};
// --- 4. task_group: The Structured Concurrency Core ---
class task_group {
cancellation_source cancellation_src_;
std::vector<Task<void>> tasks_; // Store tasks as void for simplicity
std::vector<std::exception_ptr> exceptions_;
std::mutex mtx_; // For protecting exceptions_ and tasks_ in join_all
public:
task_group() = default;
// Disallow copy and move for safety with RAII
task_group(const task_group&) = delete;
task_group& operator=(const task_group&) = delete;
task_group(task_group&&) = delete;
task_group& operator=(task_group&&) = delete;
~task_group() noexcept(false) {
try {
join_all(); // Wait for all tasks to complete or be cancelled
} catch (...) {
// Rethrow the collected exceptions
// Note: If multiple exceptions are collected, only the first one is rethrown here.
// A more robust solution might use std::nested_exception or a custom aggregate exception.
if (!exceptions_.empty()) {
std::rethrow_exception(exceptions_[0]);
}
throw; // Re-throw if join_all itself throws something unexpected
}
}
// Spawn a new task within this group
// Note: This simplified version only accepts Task<void>
// A more generic version would return Task<T> and collect results.
template<typename F, typename... Args>
void spawn(F&& f, Args&&... args) {
std::unique_lock<std::mutex> lock(mtx_);
// Create a task that captures the cancellation token and runs the user's coroutine
tasks_.emplace_back(
[&, coroutine_func = std::forward<F>(f), ...captured_args = std::forward<Args>(args)]() -> Task<void> {
try {
co_await coroutine_func(cancellation_src_.get_token(), captured_args...);
} catch (const std::exception& e) {
std::cerr << "Task caught exception: " << e.what() << std::endl;
// Store the exception for the parent to handle
std::unique_lock<std::mutex> exc_lock(mtx_);
exceptions_.push_back(std::current_exception());
cancellation_src_.request_cancellation(); // Cancel other tasks on first error
} catch (...) {
std::cerr << "Task caught unknown exception." << std::endl;
std::unique_lock<std::mutex> exc_lock(mtx_);
exceptions_.push_back(std::current_exception());
cancellation_src_.request_cancellation();
}
co_return;
}()
);
// We don't need to explicitly schedule the 'wrapper' Task,
// as its initial_suspend will cause it to be picked up by the executor
// when 'await_suspend' is called internally.
// For the outermost task_group, we might need an initial resume if initial_suspend() is std::suspend_never.
// Here, Task's initial_suspend is std::suspend_always, so it needs to be awaited or resumed.
// We are implicitly awaiting it by putting it into the vector and letting its await_suspend
// be called when we try to await it. This is a bit tricky with Task<void> directly.
// A better design might have spawn return a future or explicitly schedule.
// For this example, we'll ensure the `Task`s are eventually run via `join_all`.
}
void join_all() {
// Request cancellation for any tasks still running
cancellation_src_.request_cancellation();
// Wait for all tasks to complete
for (auto& task : tasks_) {
try {
task.join(); // Blocking wait for each task
} catch (const std::exception& e) {
// This catch block will catch exceptions from tasks that didn't use the
// cancellation token or had a bug.
// Exceptions propagated via `task.promise().exception_` are handled by `task.join()`.
// This is mostly a safeguard.
std::unique_lock<std::mutex> lock(mtx_);
exceptions_.push_back(std::current_exception());
}
}
// Re-throw the first collected exception if any
if (!exceptions_.empty()) {
std::rethrow_exception(exceptions_[0]);
}
}
};
// Example Coroutine for task_group
Task<void> my_concurrent_work(int id, cancellation_token token) {
std::cout << "Task " << id << " started on thread " << std::this_thread::get_id() << std::endl;
for (int i = 0; i < 5; ++i) {
co_await Delay(std::chrono::milliseconds(100));
token.throw_if_cancellation_requested(); // Check for cancellation
std::cout << "Task " << id << " working iteration " << i << " on thread " << std::this_thread::get_id() << std::endl;
if (id == 2 && i == 2) {
throw std::runtime_error("Task 2 failed prematurely!");
}
}
std::cout << "Task " << id << " finished." << std::endl;
co_return;
}
Task<void> parent_task() {
std::cout << "Parent task started." << std::endl;
try {
task_group tg;
tg.spawn(my_concurrent_work, 1);
tg.spawn(my_concurrent_work, 2); // This one will fail
tg.spawn(my_concurrent_work, 3);
tg.spawn(my_concurrent_work, 4);
// tg's destructor will call join_all()
// and handle exceptions/cancellation
} catch (const std::exception& e) {
std::cerr << "Parent task caught exception from task_group: " << e.what() << std::endl;
}
std::cout << "Parent task finished." << std::endl;
co_return;
}
int main() {
std::cout << "Main thread " << std::this_thread::get_id() << std::endl;
Task<void> root_task = parent_task();
root_task.join(); // Run the parent task until completion
std::cout << "All tasks completed." << std::endl;
return 0;
}
代码解析与改进思考:
cancellation_source/cancellation_token: 实现了基本的取消机制。cancellation_token::Awaiter允许协程通过co_await token;来暂停,直到取消被请求。thread_pool_executor: 一个简单的线程池,用于调度协程。在实际生产环境中,这会是一个更复杂的执行器,可能支持优先级、调度策略等。Task<T>: 协程的返回类型。它的promise_type负责捕获异常,并在final_suspend处调度后续协程。await_suspend中,我们将当前Task调度到线程池运行。join()方法是阻塞的,用于等待Task完成,并重新抛出内部异常。task_group:- RAII 风格:
task_group的生命周期与其作用域绑定。在析构函数中,它会调用join_all()。 spawn方法: 接受一个可调用对象(通常是 lambda 表达式),该 lambda 内部会调用用户提供的协程,并捕获cancellation_token。它还包含了异常处理逻辑,将子任务的异常存储起来,并在第一个异常发生时请求取消所有其他任务。join_all(): 这是一个阻塞方法,它遍历所有Task并调用它们的join()方法,等待它们完成。它还会检查并重新抛出任何收集到的异常。- 异常处理策略: 这里采用了“第一个失败者获胜”的策略,即当一个任务失败时,立即取消所有其他任务,并在
task_group析构时重新抛出第一个异常。更复杂的策略可以聚合所有异常。 - 线程安全性:
tasks_和exceptions_需要mtx_保护,尤其是在spawn和join_all之间存在并发访问时。
- RAII 风格:
局限性与改进空间:
Task::join()的忙等待: 当前的Task::join()使用std::this_thread::sleep_for进行忙等待,效率低下。一个更优的实现会使用条件变量或std::future来实现真正的阻塞等待,或者更好的,提供一个非阻塞的co_await task;接口。task_group::spawn的返回类型: 目前spawn仅接受返回Task<void>的协程。如果需要收集不同类型的返回值,task_group需要更复杂的模板和存储机制(例如std::vector<std::any_task>或类型擦除)。- 异常聚合: 多个子任务可能同时失败。目前的
task_group只重新抛出第一个捕获到的异常。一个更完善的方案可能需要一个自定义的aggregate_exception类型。 - 调度细节:
thread_pool_executor比较基础。实际应用中,调度器可能需要更复杂的任务队列、优先级、亲和性等。 - 所有权语义:
task_group目前不支持移动语义,这在某些场景下可能不方便。 Task::await_suspend的实现: 目前Task的await_suspend只是简单地将自己调度到线程池,并设置continuation_。一个更完整的Task实现需要一个回调机制,在Task真正完成时通知其continuation_恢复。
进阶议题与未来展望
结构化并发在 C++ 中仍处于演进阶段。
- 语言和标准库的演进:
- C++ 标准委员会正在积极讨论在未来的版本中(例如 C++26 或 C++29)引入结构化并发的语言特性或标准库组件。例如,提案 P2300R7
std::execution旨在提供统一的执行器和调度器模型,这为结构化并发提供了更坚实的基础。 - 社区对
std::task,std::async_scope等概念的讨论,表明了对标准库结构化并发支持的强烈需求。
- C++ 标准委员会正在积极讨论在未来的版本中(例如 C++26 或 C++29)引入结构化并发的语言特性或标准库组件。例如,提案 P2300R7
- 现有库:
- Boost.Asio: 作为 C++ 异步编程的先行者,Boost.Asio 的
co_spawn函数结合use_awaitable已经提供了类似结构化并发的强大能力。它允许你在一个io_context上启动多个协程,并在父协程中co_await这些子协程,其生命周期和错误传播都得到了很好的管理。 - Folly (Meta): Meta 的开源库 Folly 提供了
folly::coro::collectAll,folly::coro::collectAny等工具,以及folly::Executor抽象,这些都非常符合结构化并发的思想。 - Seastar: 高性能异步框架 Seastar 也内建了其自己的协程和任务管理机制,其设计哲学与结构化并发高度契合。
- Boost.Asio: 作为 C++ 异步编程的先行者,Boost.Asio 的
理解这些库的设计模式和原则,可以帮助我们更好地设计自己的结构化并发抽象,并为未来 C++ 标准库的演进做好准备。
结论性思考
结构化并发为 C++ 中的异步和并发编程带来了急需的秩序和可预测性。通过将并发操作与清晰的作用域和生命周期绑定,它极大地提升了代码的健壮性、可维护性和可理解性。虽然 C++ 标准库目前尚未提供开箱即用的结构化并发支持,但通过精心设计的抽象和模式,如我们今天探讨的 task_group,开发者可以构建出强大而可靠的并发系统。理解并采纳结构化并发的原则,将是构建现代 C++ 高性能、高并发应用的关键。它不仅简化了错误处理和取消逻辑,更重要的是,它改变了我们思考并发程序的方式,使其从复杂的、容易出错的“并发线程”转变为更易于推理和管理的“并发作用域”。