各位同学,大家下午好。
今天,我们将深入探讨C++并发编程领域一个日益重要且充满变革性潜力的概念——“结构化并发”(Structured Concurrency),并结合C++23中备受期待的std::execution (P2300) 提案,解析其背后的调度哲学和实践意义。
在现代软件系统中,并发已是无处不在的需求。从响应灵敏的用户界面到高吞吐量的服务器,再到利用多核硬件的计算密集型任务,我们都离不开并发。然而,并发编程的复杂性也常常令人望而却步,它充满了竞态条件、死锁、资源泄漏和难以追踪的错误。结构化并发正是为了应对这些挑战而生。
1. 并发编程的困境与结构化并发的崛起
让我们从并发编程的常见问题开始。想象一下,你正在编写一个需要同时执行多个独立任务的程序:例如,从多个网络源下载数据,处理图像的不同区域,或并行计算一个大型矩阵的不同部分。
传统的并发模型,如直接使用std::thread,要求程序员手动管理线程的生命周期。如果你启动了一个线程,但忘记join()或detach()它,程序可能会崩溃或泄漏资源。更糟的是,detach()一个线程后,它就变成了“野马”,其生命周期与父线程完全解耦。如果它访问了父线程栈上的数据,而父线程已经退出,那么就会导致未定义行为。
std::future和std::promise提供了一种传递结果和异常的机制,但它们本质上是关于“值”的,而不是关于“任务生命周期”的。它们没有提供一种自然的方式来表达一组相关任务的层次结构,也没有自动的错误传播或取消机制。std::async虽然可以返回std::future,但其默认的launch::deferred策略可能导致任务根本不执行,或者在get()时才执行,这与真正的并发背道而驰。
这些“非结构化”的并发操作带来了一系列痛点:
- 资源泄漏: 如果任务崩溃或被忽略,相关资源(如线程、文件句柄、内存)可能无法及时释放。
- 错误处理复杂: 异常在线程边界上难以可靠传播。你可能需要手动捕获异常,并通过某种队列或共享状态将其传递回主线程。
- 取消操作困难: 中断一个正在运行的并发任务通常需要复杂的协同机制(如检查原子标志、使用条件变量或
std::stop_token),并且需要任务本身进行配合。 - 生命周期管理混乱: 难以判断一个并发操作何时真正完成,或者其结果何时可用。
- 调试困难: 缺乏清晰的调用栈和执行路径,使得定位并发问题成为噩梦。
结构化并发正是为了解决这些问题而提出的一种编程范式。其核心思想是将并发操作视为类似于结构化控制流(如if/else语句、for循环、函数调用)的构造。
结构化并发的几个关键原则包括:
- 父子关系(Parent-Child Relationship): 每个并发任务都由一个“父”操作启动,并在该父操作的明确作用域内运行。父操作通常会在其所有子操作完成(或被取消)后才退出。
- 明确的生命周期(Clear Lifecycle): 并发任务的开始和结束时间点与其父操作的生命周期紧密关联。当父操作的范围结束时,所有子操作要么已经完成,要么被取消。
- 资源自动管理(Automatic Resource Management): 伴随并发任务启动的资源(如线程、内存)与任务的生命周期绑定,并在任务完成或取消时自动释放。
- 错误和取消传播(Error and Cancellation Propagation): 错误和取消请求能够沿着父子层次结构可靠地传播,避免任务“迷失”或挂起。
我们可以将结构化并发类比为函数调用。当你调用一个函数时,你知道它将在当前函数的栈帧中执行,并且在它返回之前,当前函数不会继续。如果被调用函数抛出异常,异常会沿着调用栈传播。结构化并发旨在将这种清晰的语义带入并发领域。
C++20引入的std::jthread是一个很好的起点,它通过RAII(资源获取即初始化)机制自动在析构时join()线程,并提供了一个默认的std::stop_token用于协作式取消。这解决了std::thread的一些痛点,使其更接近结构化并发的思想。然而,std::jthread仍然是线程(而不是任务)中心的,对于复杂的异步工作流和异构执行环境,它显得力不从心。
2. Senders/Receivers模型:P2300的核心思想
为了实现更强大、更灵活的结构化并发,C++社区一直在探索新的模型。std::execution提案(P2300,通常被称为Sender/Receiver模型)正是这一探索的成果,它被认为是C++并发编程的未来基石。
Sender/Receiver模型的核心哲学是:
- 解耦(Decoupling): 将“做什么”(任务逻辑)、“在哪里做”(调度器/执行器)和“如何处理结果”(接收器)彻底分离。
- 声明式编程(Declarative Programming): 你描述你想要执行的操作序列,而不是命令式地控制线程。
- 非阻塞(Non-blocking): 大多数操作是非阻塞的,允许高效地利用I/O和计算资源。
- 可组合性(Composability): 简单的操作可以像乐高积木一样组合成复杂的异步工作流。
- 背压(Backpressure): 通过显式的连接和启动机制,可以自然地管理生产者和消费者之间的速率。
让我们深入了解这个模型中的几个核心概念:
2.1 Sender(发送器)
Sender是一个描述异步操作的类型。它本身不执行任何工作,只是一个“食谱”或者“蓝图”。Sender告诉我们:当这个操作完成时,它会产生什么类型的值,或者它可能抛出什么类型的错误,或者它可能被取消。
Sender通过提供以下三种信号之一来完成其工作:
set_value(Receiver, Args...):操作成功完成,并产生零个或多个值。set_error(Receiver, Error):操作失败,并产生一个错误。set_stopped(Receiver):操作被取消。
Sender是可组合的。你可以使用各种适配器(如then、transfer、when_all)将多个Sender连接起来,形成一个复杂的异步操作图。
2.2 Receiver(接收器)
Receiver是一个等待异步操作结果的类型。它是一个回调接口,由Sender在操作完成后调用。Receiver必须能够处理Sender发出的三种信号:set_value、set_error和set_stopped。
Receiver通常由库内部实现,或者由用户自定义以处理特定结果。当我们将一个Sender连接到一个Receiver时,我们实际上是在告诉Sender:“当你完成时,请调用这个Receiver的相应方法。”
2.3 Scheduler(调度器)
Scheduler是一个抽象,代表一个执行上下文,例如一个线程池、一个事件循环、一个I/O完成端口或一个GPU队列。Scheduler知道如何在它所代表的上下文中安排工作。
Scheduler通过提供一个或多个“调度器Sender”来与模型集成。这些调度器Sender通常只做一件事:将后续操作转移到该调度器的上下文中执行。例如,on(my_scheduler, some_sender)将确保some_sender(或其后续操作)在my_scheduler上执行。
2.4 Operation State(操作状态)
当一个Sender和一个Receiver通过connect(Sender, Receiver)操作连接在一起时,它们会产生一个Operation State对象。这个对象代表了异步操作的“活动状态”。它包含了Sender和Receiver的内部状态,以及执行操作所需的所有信息。
Operation State是“被动”的。它只是一个状态机,直到你调用它的start()方法,实际的异步工作才会开始。一旦工作开始,Operation State就会管理整个异步操作的生命周期,直到它调用Receiver的set_value、set_error或set_stopped方法为止。
Operation State的重要性在于:
- 它是管理异步任务生命周期的RAII句柄。它的存在意味着任务正在进行或等待开始。
- 它的析构可以被用来触发任务的取消(如果任务支持的话)。
- 它封装了任务的内部状态,使得任务可以是非阻塞和可恢复的。
2.5 调度哲学:拉式(Pull-based)而非推式(Push-based)
传统的基于回调或std::async的模型往往是“推式”的:你启动一个任务,它在某个时候完成,然后“推”出结果。这种模型在复杂场景下容易导致“回调地狱”,并且难以控制任务的执行上下文和资源。
Sender/Receiver模型是“拉式”的:你构建一个Sender链,但它什么都不做,直到你显式地连接一个Receiver并调用start()。Receiver是主动的,它“拉”取Sender的最终结果。这种拉式模型带来了巨大的灵活性:
- 延迟执行: Sender可以被构建和传递,但只有在需要时才执行。
- 执行上下文控制: 可以精确地控制任务的哪一部分在哪个调度器上运行。
- 资源管理: Operation State的生命周期与执行密切相关,使得资源管理更加自动化和可靠。
3. std::execution (P2300) 如何实现结构化并发
现在,让我们结合P2300中的具体适配器,看看它如何将这些概念转化为结构化并发的实践。
3.1 核心概念的抽象表示
在P2300中,这些概念都是通过C++的Concept(概念)来实现的,这意味着它们是编译期检查的接口,而不是具体基类。
// 简化概念示意
namespace std::execution {
// Sender概念:描述一个异步操作
template <typename S>
concept sender = requires(S s) {
// S必须可连接到一个Receiver,产生一个operation_state
{ connect(std::move(s), receiver_of<S>{}) } -> operation_state;
// ... 还需要其他成员类型和特征,如value_types, error_types, sends_stopped
};
// Receiver概念:接收异步操作的结果
template <typename R, typename S>
concept receiver_of = requires(R r) {
// 必须能接收S的value_types
// set_value(r, Args...)
// set_error(r, Error)
// set_stopped(r)
// ...
};
// Scheduler概念:提供执行上下文
template <typename Sch>
concept scheduler = requires(Sch sch) {
// 必须能产生一个调度器Sender
{ schedule(sch) } -> sender;
};
// Operation State概念:连接Sender和Receiver后产生的活动状态
template <typename Op>
concept operation_state = requires(Op op) {
{ start(op) } -> void; // 必须能被启动
};
} // namespace std::execution
3.2 结构化并发的关键适配器
P2300提供了一系列强大的“适配器”(或称“算法”),用于组合和转换Sender。正是这些适配器,使得结构化并发成为可能。
a) just(...):创建立即完成的Sender
just创建一个立即调用set_value的Sender。它常用于作为异步链的起点。
#include <iostream>
#include <string>
#include <std_execution> // 假设已包含P2300头文件
namespace ex = std::execution;
// 简单的同步等待函数,用于演示
template <ex::sender S>
auto sync_wait(S&& s) {
// 实际的std::execution::sync_wait 会更复杂,这里仅作演示
// 它会创建一个临时的Receiver和operation_state,在当前线程阻塞等待
// 直到S完成,然后返回结果。
std::optional<typename ex::sender_traits<S>::value_types::template at<0>> result;
std::exception_ptr error;
bool stopped = false;
struct my_receiver {
std::optional<typename ex::sender_traits<S>::value_types::template at<0>>& result_;
std::exception_ptr& error_;
bool& stopped_;
std::condition_variable cv_;
std::mutex m_;
bool done_ = false;
void set_value(typename ex::sender_traits<S>::value_types::template at<0> val) {
std::lock_guard lk(m_);
result_.emplace(std::move(val));
done_ = true;
cv_.notify_one();
}
void set_error(std::exception_ptr e) {
std::lock_guard lk(m_);
error_ = e;
done_ = true;
cv_.notify_one();
}
void set_stopped() {
std::lock_guard lk(m_);
stopped_ = true;
done_ = true;
cv_.notify_one();
}
void wait() {
std::unique_lock lk(m_);
cv_.wait(lk, [&]{ return done_; });
}
};
my_receiver r{result, error, stopped};
auto op_state = ex::connect(std::forward<S>(s), r);
ex::start(op_state);
r.wait();
if (error) {
std::rethrow_exception(error);
}
if (stopped) {
throw std::runtime_error("Operation was stopped.");
}
return *result;
}
void demo_just() {
std::cout << "--- Demo: just ---" << std::endl;
auto s1 = ex::just(42);
int value = sync_wait(s1);
std::cout << "just(42) resulted in: " << value << std::endl; // 输出 42
auto s2 = ex::just("Hello", 123); // 可以发送多个值
std::tuple<const char*, int> values = sync_wait(s2);
std::cout << "just("Hello", 123) resulted in: "
<< std::get<0>(values) << ", " << std::get<1>(values) << std::endl;
}
b) then(Sender, F):链式操作
then是构建异步操作链的基础。当前一个Sender完成并产生值后,then会调用提供的函数F,F的返回值将作为下一个Sender的值。如果前一个Sender产生错误或被取消,F不会被调用,错误/取消信号会继续向下传播。
void demo_then() {
std::cout << "--- Demo: then ---" << std::endl;
auto s = ex::just(10)
| ex::then([](int x) {
std::cout << "First then: " << x << std::endl;
return x + 5;
})
| ex::then([](int x) {
std::cout << "Second then: " << x << std::endl;
return std::to_string(x * 2);
});
std::string result = sync_wait(s);
std::cout << "Final result: " << result << std::endl; // 输出 "30"
}
c) transfer(Sender, Scheduler) / on(Scheduler, Sender):切换执行上下文
这些适配器允许你在异步操作链的特定点切换到不同的调度器。transfer将前一个Sender的执行上下文(如果需要)转移到指定的调度器,并确保后续操作在该调度器上执行。on是另一个类似的适配器,它将一个Sender包装起来,以便它在给定的调度器上执行。
// 假设我们有模拟的调度器
struct my_thread_pool_scheduler {
// 实际实现会管理一个线程池
auto schedule() const {
// 返回一个Sender,它会在线程池中安排后续任务
// 简化表示
return ex::just(); // 实际会返回一个在线程池中执行的Sender
}
};
void demo_transfer_on() {
std::cout << "--- Demo: transfer/on ---" << std::endl;
my_thread_pool_scheduler tp_scheduler;
auto s = ex::just("Start")
| ex::then([](const char* s_val) {
std::cout << "Initial work on current thread: " << s_val << std::endl;
return std::string(s_val) + " -> Transferred";
})
| ex::transfer(tp_scheduler) // 将后续操作转移到线程池
| ex::then([](std::string s_val) {
// 模拟在线程池中执行
std::cout << "Work on thread pool: " << s_val << std::endl;
return s_val + " -> Done";
});
std::string result = sync_wait(s);
std::cout << "Final result (back on main thread after sync_wait): " << result << std::endl;
}
通过transfer或on,我们可以清晰地定义任务的哪个部分在哪个执行上下文运行,这对于优化性能和管理资源至关重要。
d) when_all(Sender1, Sender2, ...):并行执行并等待所有任务完成
when_all是实现结构化并发的关键适配器之一。它接收多个Sender,并返回一个新Sender。这个新Sender只有在所有输入Sender都成功完成后才会完成,并产生一个包含所有结果的std::tuple。如果任何一个输入Sender失败或被取消,when_all会立即停止所有未完成的Sender,并传播第一个遇到的错误或取消信号。
这完美地体现了父子关系:when_all是父操作,它启动并等待其所有子操作完成。
#include <thread>
#include <chrono>
void demo_when_all() {
std::cout << "--- Demo: when_all ---" << std::endl;
// 模拟一个耗时任务
auto long_task = ex::just(1)
| ex::then([](int x) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::cout << "Long task done." << std::endl;
return x + 10;
});
// 模拟另一个耗时任务
auto short_task = ex::just("Hello")
| ex::then([](const char* s) {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
std::cout << "Short task done." << std::endl;
return std::string(s) + " World";
});
// 同时启动这两个任务,并等待它们都完成
auto combined_task = ex::when_all(long_task, short_task)
| ex::then([](std::tuple<int, std::string> results) {
std::cout << "All tasks finished. Processing results..." << std::endl;
int num_res = std::get<0>(results);
std::string str_res = std::get<1>(results);
return "Combined: " + std::to_string(num_res) + " and " + str_res;
});
std::string final_message = sync_wait(combined_task);
std::cout << "Final combined message: " << final_message << std::endl;
}
在这个例子中,combined_task是父操作,long_task和short_task是子操作。combined_task会等待两个子任务并行执行并完成,然后才继续执行其then回调。如果long_task或short_task中的任何一个抛出错误,when_all会捕获该错误,并取消另一个未完成的任务(如果可能),然后将错误传播下去。
e) upon_error(Sender, F) / upon_stopped(Sender, F):错误和取消处理
这些适配器允许你在异步链中插入错误恢复逻辑或取消处理逻辑。
void demo_error_handling() {
std::cout << "--- Demo: error handling ---" << std::endl;
auto fallible_task = ex::just(true)
| ex::then([](bool succeed) -> ex::sender auto {
if (succeed) {
std::cout << "Fallible task succeeded." << std::endl;
return ex::just(42);
} else {
std::cout << "Fallible task failed." << std::endl;
// ex::just_error 返回一个立即调用 set_error 的 Sender
return ex::just_error(std::make_exception_ptr(std::runtime_error("Something went wrong!")));
}
});
auto recovery_task = fallible_task
| ex::then([](int value) {
std::cout << "Value received: " << value << std::endl;
return value;
})
| ex::upon_error([](std::exception_ptr eptr) {
try {
std::rethrow_exception(eptr);
} catch (const std::runtime_error& e) {
std::cerr << "Caught error: " << e.what() << ". Recovering..." << std::endl;
return ex::just(-1); // 恢复,返回一个默认值
}
});
int result1 = sync_wait(recovery_task | ex::let_value([](int x){ return ex::just(x); })); // 成功路径
std::cout << "Result after potential recovery (succeed): " << result1 << std::endl;
// 模拟失败路径
auto fallible_task_fail = ex::just(false) // 传入false使其失败
| ex::then([](bool succeed) -> ex::sender auto {
if (succeed) {
return ex::just(42);
} else {
return ex::just_error(std::make_exception_ptr(std::runtime_error("Something went wrong!")));
}
});
auto recovery_task_fail = fallible_task_fail
| ex::then([](int value) {
std::cout << "Value received: " << value << std::endl;
return value;
})
| ex::upon_error([](std::exception_ptr eptr) {
try {
std::rethrow_exception(eptr);
} catch (const std::runtime_error& e) {
std::cerr << "Caught error: " << e.what() << ". Recovering..." << std::endl;
return ex::just(-1); // 恢复,返回一个默认值
}
});
int result2 = sync_wait(recovery_task_fail | ex::let_value([](int x){ return ex::just(x); })); // 失败路径,但被恢复
std::cout << "Result after potential recovery (fail): " << result2 << std::endl;
}
upon_error和upon_stopped提供了一种结构化的方式来处理异常和取消,而无需在每个回调中手动检查状态或使用try-catch块。它们允许在特定的点上集中处理这些控制流事件,并可以返回一个新的Sender来继续执行,从而实现错误恢复。
f) start_detached(Sender):非结构化但有用的火花
虽然P2300旨在推广结构化并发,但它也提供了start_detached用于“火与忘”(fire-and-forget)场景。这是一种非结构化的操作,因为它启动一个Sender,但不等待其完成,也不捕获其结果、错误或取消状态。它适用于那些你真的不关心结果的任务,例如发送日志消息。
void demo_start_detached() {
std::cout << "--- Demo: start_detached ---" << std::endl;
auto s = ex::just("Logging message")
| ex::then([](const char* msg) {
std::cout << "Detached task processing: " << msg << std::endl;
// 模拟一些耗时操作,主线程不会等待它
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::cout << "Detached task finished." << std::endl;
});
ex::start_detached(s); // 启动任务但不等待
std::cout << "Main thread continues immediately after start_detached." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // 确保detached任务有机会执行
}
尽管start_detached是“非结构化”的,但它的存在是务实的。它允许程序员在明确知道后果的情况下,选择这种简单的并发模式。
3.3 P2300调度哲学总结
P2300的调度哲学可以用以下几点概括:
- 惰性执行(Lazy Evaluation): Sender只是描述,
connect创建operation_state,start才真正触发执行。这允许在调度前进行优化和转换。 - 细粒度控制(Fine-grained Control): 通过
transfer和on,程序员可以精确控制任务的每个阶段在哪个执行上下文运行。 - 层次化取消(Hierarchical Cancellation):
set_stopped信号可以沿着Sender链传播。父操作可以取消子操作,反之亦然,如果子操作被取消,父操作可以感知。这与std::stop_token机制天然契合。 - 统一的错误处理(Unified Error Handling): 异常通过
set_error信号传播,并且可以被upon_error统一处理。 - 资源管理作为一等公民(Resource Management as First-Class Citizen):
operation_state的RAII特性确保了异步操作的生命周期得到妥善管理,减少了资源泄漏的风险。 - 异构计算支持(Heterogeneous Computing Support): Schedulers可以抽象任何执行环境,从CPU线程池到GPU队列,为C++进入更广泛的异构计算领域铺平道路。
4. 结构化并发与P2300的优势
1. 消除回调地狱,提升代码可读性:
通过链式调用的适配器,异步代码可以写成接近同步代码的线性流程,避免了多层嵌套回调带来的复杂性。
2. 强大的错误和取消传播机制:
set_error和set_stopped提供了标准化的错误和取消信号,适配器(如when_all)能够自动处理这些信号,并在整个异步任务图中传播。这意味着你不再需要手动编写大量的错误检查和取消逻辑。
3. 资源管理的自动化和可靠性:
operation_state作为RAII句柄,确保了与异步任务相关的资源在任务完成、失败或取消时能够被自动清理。这极大地降低了资源泄漏的风险。
4. 易于组合和重用:
Sender是高度可组合的。你可以创建小的、独立的Sender,然后将它们组合成复杂的异步工作流,就像使用函数一样。这种模块化使得代码更易于维护和重用。
5. 灵活的调度策略:
通过Scheduler抽象,你可以轻松地将任务调度到不同的执行上下文,而无需改变任务本身的逻辑。这对于性能优化、负载均衡和异构计算至关重要。
6. 编译器优化潜力:
由于Sender/Receiver模型是声明式的,并且基于Concept和类型擦除(在需要时),编译器可以有更多的机会进行优化,例如避免不必要的内存分配、内联代码,甚至静态分析任务图。
7. 与C++20协程的协同:
虽然P2300可以独立使用,但它与C++20协程有着天然的协同潜力。协程可以用来实现复杂的Sender或Receiver,使得异步操作的内部逻辑更加简洁。P2300可以作为协程的调度器和结果传递机制。
5. 挑战与展望
尽管std::execution带来了诸多优势,但它也带来了一些挑战:
- 学习曲线: Sender/Receiver模型是一种全新的思维方式,需要时间来适应和掌握。
- 概念的复杂性: 尽管设计优雅,但其背后的概念(Concept、定制点查找等)对于初学者来说可能比较抽象。
- 调试: 异步任务的调试本身就比同步任务复杂,虽然结构化并发有所改善,但仍然需要新的工具和技术。
- 标准库生态的成熟: P2300只是一个基础框架,需要其他标准库组件(如
std::net、std::filesystem的异步版本)来充分发挥其潜力。
展望未来,随着std::execution在C++23中获得采纳并逐渐成熟,我们可以期待C++并发编程的范式将发生深刻的变革。它将不仅仅是关于“如何启动一个线程”,更是关于“如何优雅地编排复杂的异步操作流”。结构化并发将使我们的并发代码更健壮、更易于理解、更易于维护,从而释放C++在高性能和高并发领域更强大的潜力。
6. 总结与展望
结构化并发,以C++23 std::execution提案的Sender/Receiver模型为代表,为C++带来了前所未有的并发编程范式。它通过明确的任务生命周期、自动的资源管理以及强大的错误和取消传播机制,将并发操作提升到结构化控制流的水平,显著提高了异步代码的可靠性、可读性和可维护性。随着这一模型的普及,C++开发者将能够更自信地构建复杂、高效且健壮的并发系统。
演示代码示例 (需要 P2300 库支持):
#include <iostream>
#include <string>
#include <vector>
#include <thread>
#include <chrono>
#include <tuple>
#include <optional>
#include <exception>
#include <stdexcept>
#include <condition_variable>
#include <mutex>
// --- 模拟 P2300 概念和适配器 ---
// 实际的 P2300 实现会非常复杂,这里只提供一个极简的、能运行示例的骨架
// 完整且正确的 P2300 实现需要 extensive use of C++20 concepts,
// customized point lookups (CPOs), and type erasure/template metaprogramming.
namespace std::execution {
// Forward declarations for concepts and traits
template <typename S> struct sender_traits;
template <typename S> concept sender = requires(S s) {
typename sender_traits<S>::value_types;
typename sender_traits<S>::error_types;
{ sender_traits<S>::sends_stopped } -> std::same_as<bool>;
// More complex requirements would be here
};
template <typename R, typename... Args>
concept receiver_of_value = requires(R r, Args... args) {
{ r.set_value(std::forward<Args>(args)...) } -> std::same_as<void>;
};
template <typename R>
concept receiver_of_error = requires(R r, std::exception_ptr e) {
{ r.set_error(e) } -> std::same_as<void>;
};
template <typename R>
concept receiver_of_stopped = requires(R r) {
{ r.set_stopped() } -> std::same_as<void>;
};
template <typename R, typename S>
concept receiver = receiver_of_value<R, typename sender_traits<S>::value_types::template at<0>> &&
receiver_of_error<R> &&
receiver_of_stopped<R>;
template <typename Op>
concept operation_state = requires(Op op) {
{ op.start() } -> std::same_as<void>;
};
// Simplified type list for sender_traits
template<typename... Ts> struct type_list {};
template<typename... Ts> struct single_value_list { template<int I> using at = std::tuple_element_t<I, std::tuple<Ts...>>; };
// --- Simplified `connect` and `start` ---
template <sender S, typename R>
operation_state auto connect(S&& s, R&& r); // Defined later
void start(operation_state auto& op_state) {
op_state.start();
}
// --- `just` sender ---
template <typename... Args>
struct just_sender {
std::tuple<Args...> values_;
template <typename R>
struct operation_state_impl {
R receiver_;
std::tuple<Args...> values_;
void start() {
std::apply([&](auto&&... args) {
receiver_.set_value(std::forward<decltype(args)>(args)...);
}, values_);
}
};
template <typename R>
operation_state_impl<R> connect(R&& r) && {
return {std::forward<R>(r), std::move(values_)};
}
};
template <typename... Args>
struct sender_traits<just_sender<Args...>> {
using value_types = single_value_list<Args...>;
using error_types = type_list<>;
static constexpr bool sends_stopped = false;
};
template <typename... Args>
just_sender<Args...> just(Args... args) {
return {std::make_tuple(std::forward<Args>(args)...)};
}
// --- `then` adaptor ---
template <sender BaseSender, typename F>
struct then_sender {
BaseSender base_sender_;
F func_;
template <typename R>
struct operation_state_impl {
R receiver_;
F func_;
// Inner receiver for the base_sender_
struct inner_receiver {
operation_state_impl* outer_;
template <typename... Args>
void set_value(Args&&... args) {
try {
// Call the user function F
if constexpr (std::is_void_v<std::invoke_result_t<F, Args...>>) {
std::invoke(outer_->func_, std::forward<Args>(args)...);
outer_->receiver_.set_value(); // If F returns void, then_sender returns void
} else {
outer_->receiver_.set_value(std::invoke(outer_->func_, std::forward<Args>(args)...));
}
} catch (...) {
outer_->receiver_.set_error(std::current_exception());
}
}
void set_error(std::exception_ptr e) { outer_->receiver_.set_error(e); }
void set_stopped() { outer_->receiver_.set_stopped(); }
} inner_receiver_;
// Operation state for the base_sender_
std::remove_cvref_t<decltype(connect(std::declval<BaseSender>(), std::declval<inner_receiver>()))> base_op_state_;
operation_state_impl(R&& r, BaseSender&& s, F&& f)
: receiver_(std::forward<R>(r)), func_(std::forward<F>(f)), inner_receiver_{this},
base_op_state_(connect(std::forward<BaseSender>(s), std::move(inner_receiver_))) {}
void start() {
std::execution::start(base_op_state_);
}
};
template <typename R>
operation_state_impl<R> connect(R&& r) && {
return {std::forward<R>(r), std::move(base_sender_), std::move(func_)};
}
};
template <sender BaseSender, typename F>
struct sender_traits<then_sender<BaseSender, F>> {
// Determine the value type of the result
using base_value_types = typename sender_traits<BaseSender>::value_types;
using result_type = std::invoke_result_t<F, typename base_value_types::template at<0>>;
using value_types = single_value_list<result_type>;
using error_types = typename sender_traits<BaseSender>::error_types; // Error types from base, plus any F might throw
static constexpr bool sends_stopped = sender_traits<BaseSender>::sends_stopped;
};
template <sender BaseSender, typename F>
then_sender<BaseSender, F> then(BaseSender&& s, F&& f) {
return {std::forward<BaseSender>(s), std::forward<F>(f)};
}
// --- `transfer` adaptor (simplified, no actual scheduler implementation) ---
struct scheduler_base {}; // Just a tag for now
template <sender BaseSender, typename Scheduler>
struct transfer_sender {
BaseSender base_sender_;
Scheduler scheduler_;
template <typename R>
struct operation_state_impl {
R receiver_;
Scheduler scheduler_;
// Inner receiver for the base_sender_
struct inner_receiver {
operation_state_impl* outer_;
template <typename... Args>
void set_value(Args&&... args) {
// In a real implementation, this would enqueue a task on scheduler_
// and that task would call outer_->receiver_.set_value
outer_->receiver_.set_value(std::forward<Args>(args)...);
}
void set_error(std::exception_ptr e) { outer_->receiver_.set_error(e); }
void set_stopped() { outer_->receiver_.set_stopped(); }
} inner_receiver_;
std::remove_cvref_t<decltype(connect(std::declval<BaseSender>(), std::declval<inner_receiver>()))> base_op_state_;
operation_state_impl(R&& r, BaseSender&& s, Scheduler&& sch)
: receiver_(std::forward<R>(r)), scheduler_(std::forward<Scheduler>(sch)), inner_receiver_{this},
base_op_state_(connect(std::forward<BaseSender>(s), std::move(inner_receiver_))) {}
void start() {
// In a real implementation, the scheduler would be used here.
// For this demo, we just start the base immediately.
std::execution::start(base_op_state_);
}
};
template <typename R>
operation_state_impl<R> connect(R&& r) && {
return {std::forward<R>(r), std::move(base_sender_), std::move(scheduler_)};
}
};
template <sender BaseSender, typename Scheduler>
struct sender_traits<transfer_sender<BaseSender, Scheduler>> {
using value_types = typename sender_traits<BaseSender>::value_types;
using error_types = typename sender_traits<BaseSender>::error_types;
static constexpr bool sends_stopped = sender_traits<BaseSender>::sends_stopped;
};
template <sender BaseSender, typename Scheduler>
transfer_sender<BaseSender, Scheduler> transfer(BaseSender&& s, Scheduler&& sch) {
return {std::forward<BaseSender>(s), std::forward<Scheduler>(sch)};
}
// --- `when_all` adaptor ---
template <sender... Senders>
struct when_all_sender {
std::tuple<Senders...> senders_;
template <typename R>
struct operation_state_impl {
R receiver_;
std::tuple<Senders...> senders_;
std::tuple<typename sender_traits<Senders>::value_types::template at<0>...> results_;
std::atomic<int> completed_count_{0};
std::atomic<bool> cancelled_{false};
std::exception_ptr stored_exception_;
std::mutex m_; // For protecting shared state on completion
// Inner receiver for each sub-sender
template <int I>
struct inner_receiver_impl {
operation_state_impl* outer_;
template <typename... Args>
void set_value(Args&&... args) {
if (outer_->cancelled_.load()) return; // If parent already cancelled
std::get<I>(outer_->results_) = std::tuple<Args...>{std::forward<Args>(args)...};
if (++outer_->completed_count_ == sizeof...(Senders)) {
outer_->receiver_.set_value(std::move(outer_->results_));
}
}
void set_error(std::exception_ptr e) {
std::lock_guard lk(outer_->m_);
if (!outer_->cancelled_.exchange(true)) { // Only store the first error
outer_->stored_exception_ = e;
// In a real implementation, would try to cancel other ops
outer_->receiver_.set_error(e);
}
}
void set_stopped() {
std::lock_guard lk(outer_->m_);
if (!outer_->cancelled_.exchange(true)) { // Only propagate first stop
outer_->receiver_.set_stopped();
}
}
};
std::tuple<inner_receiver_impl<0>, inner_receiver_impl<1>...> inner_receivers_; // For two senders for simplicity
// Operation states for each sub-sender
std::tuple<
std::remove_cvref_t<decltype(connect(std::declval<Senders>(), std::declval<inner_receiver_impl<0>>()))>...
> base_op_states_;
operation_state_impl(R&& r, std::tuple<Senders...>&& s_tuple)
: receiver_(std::forward<R>(r)), senders_(std::move(s_tuple)),
inner_receivers_{inner_receiver_impl<0>{this}, inner_receiver_impl<1>{this}} // For 2 senders
{
// Connect each sender
// This part is tricky to generalize for N senders without recursion or boost.hana
base_op_states_ = std::make_tuple(
connect(std::get<0>(senders_), std::get<0>(inner_receivers_)),
connect(std::get<1>(senders_), std::get<1>(inner_receivers_))
);
}
void start() {
// Start all sub-senders
std::apply([](auto&... ops){ (std::execution::start(ops), ...); }, base_op_states_);
}
};
template <typename R>
operation_state_impl<R> connect(R&& r) && {
return {std::forward<R>(r), std::move(senders_)};
}
};
template <sender... Senders>
struct sender_traits<when_all_sender<Senders...>> {
using value_types = single_value_list<std::tuple<typename sender_traits<Senders>::value_types::template at<0>...>>;
using error_types = type_list<std::exception_ptr>; // Can propagate any error
static constexpr bool sends_stopped = (sender_traits<Senders>::sends_stopped || ...);
};
template <sender... Senders>
when_all_sender<Senders...> when_all(Senders&&... s) {
return {std::make_tuple(std::forward<Senders>(s)...)};
}
// --- `just_error` sender ---
struct just_error_sender {
std::exception_ptr error_;
template <typename R>
struct operation_state_impl {
R receiver_;
std::exception_ptr error_;
void start() {
receiver_.set_error(error_);
}
};
template <typename R>
operation_state_impl<R> connect(R&& r) && {
return {std::forward<R>(r), std::move(error_)};
}
};
template <>
struct sender_traits<just_error_sender> {
using value_types = single_value_list<>;
using error_types = type_list<std::exception_ptr>;
static constexpr bool sends_stopped = false;
};
just_error_sender just_error(std::exception_ptr e) {
return {std::move(e)};
}
// --- `upon_error` adaptor ---
template <sender BaseSender, typename F>
struct upon_error_sender {
BaseSender base_sender_;
F func_;
template <typename R>
struct operation_state_impl {
R receiver_;
F func_;
std::optional<std::remove_cvref_t<decltype(connect(std::declval<decltype(std::invoke(std::declval<F>(), std::declval<std::exception_ptr>()))>(), std::declval<R>()))>> recovery_op_state_;
struct inner_receiver {
operation_state_impl* outer_;
template <typename... Args> void set_value(Args&&... args) { outer_->receiver_.set_value(std::forward<Args>(args)...); }
void set_stopped() { outer_->receiver_.set_stopped(); }
void set_error(std::exception_ptr e) {
try {
// Call F to get a recovery sender
auto recovery_sender = std::invoke(outer_->func_, e);
outer_->recovery_op_state_.emplace(connect(std::move(recovery_sender), outer_->receiver_));
std::execution::start(*outer_->recovery_op_state_);
} catch (...) {
outer_->receiver_.set_error(std::current_exception()); // F itself threw
}
}
} inner_receiver_;
std::remove_cvref_t<decltype(connect(std::declval<BaseSender>(), std::declval<inner_receiver>()))> base_op_state_;
operation_state_impl(R&& r, BaseSender&& s, F&& f)
: receiver_(std::forward<R>(r)), func_(std::forward<F>(f)), inner_receiver_{this},
base_op_state_(connect(std::forward<BaseSender>(s), std::move(inner_receiver_))) {}
void start() {
std::execution::start(base_op_state_);
}
};
template <typename R>
operation_state_impl<R> connect(R&& r) && {
return {std::forward<R>(r), std::move(base_sender_), std::move(func_)};
}
};
template <sender BaseSender, typename F>
struct sender_traits<upon_error_sender<BaseSender, F>> {
using value_types = typename sender_traits<BaseSender>::value_types; // Same value types
using error_types = type_list<std::exception_ptr>; // F might throw, or the recovery sender might error
static constexpr bool sends_stopped = sender_traits<BaseSender>::sends_stopped;
};
template <sender BaseSender, typename F>
upon_error_sender<BaseSender, F> upon_error(BaseSender&& s, F&& f) {
return {std::forward<BaseSender>(s), std::forward<F>(f)};
}
// --- `let_value` adaptor (simplified, needed for chaining senders from `then` or `upon_error` that return senders) ---
template <sender BaseSender, typename F>
struct let_value_sender {
BaseSender base_sender_;
F func_;
template <typename R>
struct operation_state_impl {
R receiver_;
F func_;
std::optional<std::remove_cvref_t<decltype(connect(std::declval<decltype(std::invoke(std::declval<F>(), std::declval<typename sender_traits<BaseSender>::value_types::template at<0>>()))>(), std::declval<R>()))>> next_op_state_;
struct inner_receiver {
operation_state_impl* outer_;
template <typename... Args>
void set_value(Args&&... args) {
try {
auto next_sender = std::invoke(outer_->func_, std::forward<Args>(args)...);
outer_->next_op_state_.emplace(connect(std::move(next_sender), outer_->receiver_));
std::execution::start(*outer_->next_op_state_);
} catch (...) {
outer_->receiver_.set_error(std::current_exception());
}
}
void set_error(std::exception_ptr e) { outer_->receiver_.set_error(e); }
void set_stopped() { outer_->receiver_.set_stopped(); }
} inner_receiver_;
std::remove_cvref_t<decltype(connect(std::declval<BaseSender>(), std::declval<inner_receiver>()))> base_op_state_;
operation_state_impl(R&& r, BaseSender&& s, F&& f)
: receiver_(std::forward<R>(r)), func_(std::forward<F>(f)), inner_receiver_{this},
base_op_state_(connect(std::forward<BaseSender>(s), std::move(inner_receiver_))) {}
void start() {
std::execution::start(base_op_state_);
}
};
template <typename R>
operation_state_impl<R> connect(R&& r) && {
return {std::forward<R>(r), std::move(base_sender_), std::move(func_)};
}
};
template <sender BaseSender, typename F>
struct sender_traits<let_value_sender<BaseSender, F>> {
using next_sender_type = std::invoke_result_t<F, typename sender_traits<BaseSender>::value_types::template at<0>>;
using value_types = typename sender_traits<next_sender_type>::value_types;
using error_types = typename sender_traits<BaseSender>::error_types; // Plus error types from next_sender_type
static constexpr bool sends_stopped = sender_traits<BaseSender>::sends_stopped || sender_traits<next_sender_type>::sends_stopped;
};
template <sender BaseSender, typename F>
let_value_sender<BaseSender, F> let_value(BaseSender&& s, F&& f) {
return {std::forward<BaseSender>(s), std::forward<F>(f)};
}
// --- `start_detached` ---
template <sender S>
struct detached_receiver {
// Does nothing with the results
template <typename... Args> void set_value(Args&&...) {}
void set_error(std::exception_ptr) {
// In a real system, you'd log this error
std::cerr << "Detached task encountered an error!" << std::endl;
}
void set_stopped() {
std::cout << "Detached task was stopped." << std::endl;
}
};
template <sender S>
void start_detached(S&& s) {
// In a real system, op_state would be moved to a thread/task manager
// to manage its lifetime. Here, it's just a local variable.
// This is explicitly unstructured, so resource management is basic.
auto op_state = connect(std::forward<S>(s), detached_receiver<S>{});
start(op_state);
}
// --- Simplified `sync_wait` implementation ---
template <sender S>
auto sync_wait(S&& s) {
// Define a basic receiver for sync_wait
struct sync_wait_receiver {
std::optional<typename sender_traits<S>::value_types::template at<0>> result_storage;
std::exception_ptr error_storage;
bool stopped_flag = false;
std::condition_variable cv;
std::mutex m;
bool done = false;
template <typename... Args>
void set_value(Args&&... args) {
std::lock_guard lk(m);
result_storage.emplace(std::forward<Args>(args)...);
done = true;
cv.notify_one();
}
void set_error(std::exception_ptr e) {
std::lock_guard lk(m);
error_storage = e;
done = true;
cv.notify_one();
}
void set_stopped() {
std::lock_guard lk(m);
stopped_flag = true;
done = true;
cv.notify_one();
}
void wait() {
std::unique_lock lk(m);
cv.wait(lk, [this]{ return done; });
}
};
sync_wait_receiver r;
auto op_state = connect(std::forward<S>(s), r);
start(op_state);
r.wait();
if (r.error_storage) {
std::rethrow_exception(r.error_storage);
}
if (r.stopped_flag) {
throw std::runtime_error("Operation was stopped.");
}
return *r.result_storage;
}
// --- Simplified `connect` definition (requires types to be fully defined) ---
template <sender S, typename R>
operation_state auto connect(S&& s, R&& r) {
return std::forward<S>(s).connect(std::forward<R>(r));
}
} // namespace std::execution
// --- Pipe operator for chaining senders ---
template <std::execution::sender S, typename F>
auto operator|(S&& s, F&& f) {
return std::forward<F>(f)(std::forward<S>(s));
}
// --- Demo Functions ---
// (The demo functions from the lecture body are placed here)
void demo_just() {
std::cout << "--- Demo: just ---" << std::endl;
auto s1 = std::execution::just(42);
int value = std::execution::sync_wait(s1);
std::cout << "just(42) resulted in: " << value << std::endl;
auto s2 = std::execution::just("Hello", 123);
std::tuple<const char*, int> values = std::execution::sync_wait(s2);
std::cout << "just("Hello", 123) resulted in: "
<< std::get<0>(values) << ", " << std::get<1>(values) << std::endl;
}
void demo_then() {
std::cout << "--- Demo: then ---" << std::endl;
auto s = std::execution::just(10)
| std::execution::then([](int x) {
std::cout << "First then: " << x << std::endl;
return x + 5;
})
| std::execution::then([](int x) {
std::cout << "Second then: " << x << std::endl;
return std::to_string(x * 2);
});
std::string result = std::execution::sync_wait(s);
std::cout << "Final result: " << result << std::endl;
}
struct my_thread_pool_scheduler : std::execution::scheduler_base {
auto schedule() const {
return std::execution::just(); // Simplified, real one would schedule on a thread pool
}
};
void demo_transfer_on() {
std::cout << "--- Demo: transfer/on ---" << std::endl;
my_thread_pool_scheduler tp_scheduler;
auto s = std::execution::just("Start")
| std::execution::then([](const char* s_val) {
std::cout << "Initial work on current thread: " << s_val << std::endl;
return std::string(s_val) + " -> Transferred";
})
| std::execution::transfer(tp_scheduler)
| std::execution::then([](std::string s_val) {
std::cout << "Work on thread pool: " << s_val << std::endl;
return s_val + " -> Done";
});
std::string result = std::execution::sync_wait(s);
std::cout << "Final result (back on main thread after sync_wait): " << result << std::endl;
}
void demo_when_all() {
std::cout << "--- Demo: when_all ---" << std::endl;
auto long_task = std::execution::just(1)
| std::execution::then([](int x) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::cout << "Long task done." << std::endl;
return x + 10;
});
auto short_task = std::execution::just("Hello")
| std::execution::then([](const char* s) {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
std::cout << "Short task done." << std::endl;
return std::string(s) + " World";
});
auto combined_task = std::execution::when_all(long_task, short_task)
| std::execution::then([](std::tuple<int, std::string> results) {
std::cout << "All tasks finished. Processing results..." << std::endl;
int num_res = std::get<0>(results);
std::string str_res = std::get<1>(results);
return "Combined: " + std::to_string(num_res) + " and " + str_res;
});
std::string final_message = std::execution::sync_wait(combined_task);
std::cout << "Final combined message: " << final_message << std::endl;
}
void demo_error_handling() {
std::cout << "--- Demo: error handling ---" << std::endl;
auto fallible_task_succeed = std::execution::just(true)
| std::execution::let_value([](bool succeed) -> std::execution::sender auto {
if (succeed) {
std::cout << "Fallible task succeeded." << std::endl;
return std::execution::just(42);
} else {
std::cout << "Fallible task failed." << std::endl;
return std::execution::just_error(std::make_exception_ptr(std::runtime_error("Something went wrong!")));
}
});
auto recovery_task_succeed = fallible_task_succeed
| std::execution::upon_error([](std::exception_ptr eptr) {
try {
std::rethrow_exception(eptr);
} catch (const std::runtime_error& e) {
std::cerr << "Caught error: " << e.what() << ". Recovering..." << std::endl;
return std::execution::just(-1);
}
});
int result1 = std::execution::sync_wait(recovery_task_succeed);
std::cout << "Result after potential recovery (succeed): " << result1 << std::endl;
auto fallible_task_fail = std::execution::just(false)
| std::execution::let_value([](bool succeed) -> std::execution::sender auto {
if (succeed) {
return std::execution::just(42);
} else {
return std::execution::just_error(std::make_exception_ptr(std::runtime_error("Something went wrong!")));
}
});
auto recovery_task_fail = fallible_task_fail
| std::execution::upon_error([](std::exception_ptr eptr) {
try {
std::rethrow_exception(eptr);
} catch (const std::runtime_error& e) {
std::cerr << "Caught error: " << e.what() << ". Recovering..." << std::endl;
return std::execution::just(-1);
}
});
int result2 = std::execution::sync_wait(recovery_task_fail);
std::cout << "Result after potential recovery (fail): " << result2 << std::endl;
}
void demo_start_detached() {
std::cout << "--- Demo: start_detached ---" << std::endl;
auto s = std::execution::just("Logging message")
| std::execution::then([](const char* msg) {
std::cout << "Detached task processing: " << msg << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::cout << "Detached task finished." << std::endl;
});
std::execution::start_detached(s);
std::cout << "Main thread continues immediately after start_detached." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
int main() {
demo_just();
std::cout << std::endl;
demo_then();
std::cout << std::endl;
demo_transfer_on();
std::cout << std::endl;
demo_when_all();
std::cout << std::endl;
demo_error_handling();
std::cout << std::endl;
demo_start_detached();
std::cout << std::endl;
return 0;
}