什么是 ‘Structured Concurrency’ 在 C++ 中的体现?解析 `std::execution` (P2300) 提案的调度哲学

各位同学,大家下午好。

今天,我们将深入探讨C++并发编程领域一个日益重要且充满变革性潜力的概念——“结构化并发”(Structured Concurrency),并结合C++23中备受期待的std::execution (P2300) 提案,解析其背后的调度哲学和实践意义。

在现代软件系统中,并发已是无处不在的需求。从响应灵敏的用户界面到高吞吐量的服务器,再到利用多核硬件的计算密集型任务,我们都离不开并发。然而,并发编程的复杂性也常常令人望而却步,它充满了竞态条件、死锁、资源泄漏和难以追踪的错误。结构化并发正是为了应对这些挑战而生。

1. 并发编程的困境与结构化并发的崛起

让我们从并发编程的常见问题开始。想象一下,你正在编写一个需要同时执行多个独立任务的程序:例如,从多个网络源下载数据,处理图像的不同区域,或并行计算一个大型矩阵的不同部分。

传统的并发模型,如直接使用std::thread,要求程序员手动管理线程的生命周期。如果你启动了一个线程,但忘记join()detach()它,程序可能会崩溃或泄漏资源。更糟的是,detach()一个线程后,它就变成了“野马”,其生命周期与父线程完全解耦。如果它访问了父线程栈上的数据,而父线程已经退出,那么就会导致未定义行为。

std::futurestd::promise提供了一种传递结果和异常的机制,但它们本质上是关于“值”的,而不是关于“任务生命周期”的。它们没有提供一种自然的方式来表达一组相关任务的层次结构,也没有自动的错误传播或取消机制。std::async虽然可以返回std::future,但其默认的launch::deferred策略可能导致任务根本不执行,或者在get()时才执行,这与真正的并发背道而驰。

这些“非结构化”的并发操作带来了一系列痛点:

  1. 资源泄漏: 如果任务崩溃或被忽略,相关资源(如线程、文件句柄、内存)可能无法及时释放。
  2. 错误处理复杂: 异常在线程边界上难以可靠传播。你可能需要手动捕获异常,并通过某种队列或共享状态将其传递回主线程。
  3. 取消操作困难: 中断一个正在运行的并发任务通常需要复杂的协同机制(如检查原子标志、使用条件变量或std::stop_token),并且需要任务本身进行配合。
  4. 生命周期管理混乱: 难以判断一个并发操作何时真正完成,或者其结果何时可用。
  5. 调试困难: 缺乏清晰的调用栈和执行路径,使得定位并发问题成为噩梦。

结构化并发正是为了解决这些问题而提出的一种编程范式。其核心思想是将并发操作视为类似于结构化控制流(如if/else语句、for循环、函数调用)的构造。

结构化并发的几个关键原则包括:

  • 父子关系(Parent-Child Relationship): 每个并发任务都由一个“父”操作启动,并在该父操作的明确作用域内运行。父操作通常会在其所有子操作完成(或被取消)后才退出。
  • 明确的生命周期(Clear Lifecycle): 并发任务的开始和结束时间点与其父操作的生命周期紧密关联。当父操作的范围结束时,所有子操作要么已经完成,要么被取消。
  • 资源自动管理(Automatic Resource Management): 伴随并发任务启动的资源(如线程、内存)与任务的生命周期绑定,并在任务完成或取消时自动释放。
  • 错误和取消传播(Error and Cancellation Propagation): 错误和取消请求能够沿着父子层次结构可靠地传播,避免任务“迷失”或挂起。

我们可以将结构化并发类比为函数调用。当你调用一个函数时,你知道它将在当前函数的栈帧中执行,并且在它返回之前,当前函数不会继续。如果被调用函数抛出异常,异常会沿着调用栈传播。结构化并发旨在将这种清晰的语义带入并发领域。

C++20引入的std::jthread是一个很好的起点,它通过RAII(资源获取即初始化)机制自动在析构时join()线程,并提供了一个默认的std::stop_token用于协作式取消。这解决了std::thread的一些痛点,使其更接近结构化并发的思想。然而,std::jthread仍然是线程(而不是任务)中心的,对于复杂的异步工作流和异构执行环境,它显得力不从心。

2. Senders/Receivers模型:P2300的核心思想

为了实现更强大、更灵活的结构化并发,C++社区一直在探索新的模型。std::execution提案(P2300,通常被称为Sender/Receiver模型)正是这一探索的成果,它被认为是C++并发编程的未来基石。

Sender/Receiver模型的核心哲学是:

  • 解耦(Decoupling): 将“做什么”(任务逻辑)、“在哪里做”(调度器/执行器)和“如何处理结果”(接收器)彻底分离。
  • 声明式编程(Declarative Programming): 你描述你想要执行的操作序列,而不是命令式地控制线程。
  • 非阻塞(Non-blocking): 大多数操作是非阻塞的,允许高效地利用I/O和计算资源。
  • 可组合性(Composability): 简单的操作可以像乐高积木一样组合成复杂的异步工作流。
  • 背压(Backpressure): 通过显式的连接和启动机制,可以自然地管理生产者和消费者之间的速率。

让我们深入了解这个模型中的几个核心概念:

2.1 Sender(发送器)

Sender是一个描述异步操作的类型。它本身不执行任何工作,只是一个“食谱”或者“蓝图”。Sender告诉我们:当这个操作完成时,它会产生什么类型的值,或者它可能抛出什么类型的错误,或者它可能被取消。

Sender通过提供以下三种信号之一来完成其工作:

  • set_value(Receiver, Args...):操作成功完成,并产生零个或多个值。
  • set_error(Receiver, Error):操作失败,并产生一个错误。
  • set_stopped(Receiver):操作被取消。

Sender是可组合的。你可以使用各种适配器(如thentransferwhen_all)将多个Sender连接起来,形成一个复杂的异步操作图。

2.2 Receiver(接收器)

Receiver是一个等待异步操作结果的类型。它是一个回调接口,由Sender在操作完成后调用。Receiver必须能够处理Sender发出的三种信号:set_valueset_errorset_stopped

Receiver通常由库内部实现,或者由用户自定义以处理特定结果。当我们将一个Sender连接到一个Receiver时,我们实际上是在告诉Sender:“当你完成时,请调用这个Receiver的相应方法。”

2.3 Scheduler(调度器)

Scheduler是一个抽象,代表一个执行上下文,例如一个线程池、一个事件循环、一个I/O完成端口或一个GPU队列。Scheduler知道如何在它所代表的上下文中安排工作。

Scheduler通过提供一个或多个“调度器Sender”来与模型集成。这些调度器Sender通常只做一件事:将后续操作转移到该调度器的上下文中执行。例如,on(my_scheduler, some_sender)将确保some_sender(或其后续操作)在my_scheduler上执行。

2.4 Operation State(操作状态)

当一个Sender和一个Receiver通过connect(Sender, Receiver)操作连接在一起时,它们会产生一个Operation State对象。这个对象代表了异步操作的“活动状态”。它包含了Sender和Receiver的内部状态,以及执行操作所需的所有信息。

Operation State是“被动”的。它只是一个状态机,直到你调用它的start()方法,实际的异步工作才会开始。一旦工作开始,Operation State就会管理整个异步操作的生命周期,直到它调用Receiver的set_valueset_errorset_stopped方法为止。

Operation State的重要性在于:

  • 它是管理异步任务生命周期的RAII句柄。它的存在意味着任务正在进行或等待开始。
  • 它的析构可以被用来触发任务的取消(如果任务支持的话)。
  • 它封装了任务的内部状态,使得任务可以是非阻塞和可恢复的。

2.5 调度哲学:拉式(Pull-based)而非推式(Push-based)

传统的基于回调或std::async的模型往往是“推式”的:你启动一个任务,它在某个时候完成,然后“推”出结果。这种模型在复杂场景下容易导致“回调地狱”,并且难以控制任务的执行上下文和资源。

Sender/Receiver模型是“拉式”的:你构建一个Sender链,但它什么都不做,直到你显式地连接一个Receiver并调用start()。Receiver是主动的,它“拉”取Sender的最终结果。这种拉式模型带来了巨大的灵活性:

  • 延迟执行: Sender可以被构建和传递,但只有在需要时才执行。
  • 执行上下文控制: 可以精确地控制任务的哪一部分在哪个调度器上运行。
  • 资源管理: Operation State的生命周期与执行密切相关,使得资源管理更加自动化和可靠。

3. std::execution (P2300) 如何实现结构化并发

现在,让我们结合P2300中的具体适配器,看看它如何将这些概念转化为结构化并发的实践。

3.1 核心概念的抽象表示

在P2300中,这些概念都是通过C++的Concept(概念)来实现的,这意味着它们是编译期检查的接口,而不是具体基类。

// 简化概念示意
namespace std::execution {

// Sender概念:描述一个异步操作
template <typename S>
concept sender = requires(S s) {
    // S必须可连接到一个Receiver,产生一个operation_state
    { connect(std::move(s), receiver_of<S>{}) } -> operation_state;
    // ... 还需要其他成员类型和特征,如value_types, error_types, sends_stopped
};

// Receiver概念:接收异步操作的结果
template <typename R, typename S>
concept receiver_of = requires(R r) {
    // 必须能接收S的value_types
    // set_value(r, Args...)
    // set_error(r, Error)
    // set_stopped(r)
    // ...
};

// Scheduler概念:提供执行上下文
template <typename Sch>
concept scheduler = requires(Sch sch) {
    // 必须能产生一个调度器Sender
    { schedule(sch) } -> sender;
};

// Operation State概念:连接Sender和Receiver后产生的活动状态
template <typename Op>
concept operation_state = requires(Op op) {
    { start(op) } -> void; // 必须能被启动
};

} // namespace std::execution

3.2 结构化并发的关键适配器

P2300提供了一系列强大的“适配器”(或称“算法”),用于组合和转换Sender。正是这些适配器,使得结构化并发成为可能。

a) just(...):创建立即完成的Sender

just创建一个立即调用set_value的Sender。它常用于作为异步链的起点。

#include <iostream>
#include <string>
#include <std_execution> // 假设已包含P2300头文件

namespace ex = std::execution;

// 简单的同步等待函数,用于演示
template <ex::sender S>
auto sync_wait(S&& s) {
    // 实际的std::execution::sync_wait 会更复杂,这里仅作演示
    // 它会创建一个临时的Receiver和operation_state,在当前线程阻塞等待
    // 直到S完成,然后返回结果。
    std::optional<typename ex::sender_traits<S>::value_types::template at<0>> result;
    std::exception_ptr error;
    bool stopped = false;

    struct my_receiver {
        std::optional<typename ex::sender_traits<S>::value_types::template at<0>>& result_;
        std::exception_ptr& error_;
        bool& stopped_;
        std::condition_variable cv_;
        std::mutex m_;
        bool done_ = false;

        void set_value(typename ex::sender_traits<S>::value_types::template at<0> val) {
            std::lock_guard lk(m_);
            result_.emplace(std::move(val));
            done_ = true;
            cv_.notify_one();
        }
        void set_error(std::exception_ptr e) {
            std::lock_guard lk(m_);
            error_ = e;
            done_ = true;
            cv_.notify_one();
        }
        void set_stopped() {
            std::lock_guard lk(m_);
            stopped_ = true;
            done_ = true;
            cv_.notify_one();
        }

        void wait() {
            std::unique_lock lk(m_);
            cv_.wait(lk, [&]{ return done_; });
        }
    };

    my_receiver r{result, error, stopped};
    auto op_state = ex::connect(std::forward<S>(s), r);
    ex::start(op_state);
    r.wait();

    if (error) {
        std::rethrow_exception(error);
    }
    if (stopped) {
        throw std::runtime_error("Operation was stopped.");
    }
    return *result;
}

void demo_just() {
    std::cout << "--- Demo: just ---" << std::endl;
    auto s1 = ex::just(42);
    int value = sync_wait(s1);
    std::cout << "just(42) resulted in: " << value << std::endl; // 输出 42

    auto s2 = ex::just("Hello", 123); // 可以发送多个值
    std::tuple<const char*, int> values = sync_wait(s2);
    std::cout << "just("Hello", 123) resulted in: "
              << std::get<0>(values) << ", " << std::get<1>(values) << std::endl;
}

b) then(Sender, F):链式操作

then是构建异步操作链的基础。当前一个Sender完成并产生值后,then会调用提供的函数FF的返回值将作为下一个Sender的值。如果前一个Sender产生错误或被取消,F不会被调用,错误/取消信号会继续向下传播。

void demo_then() {
    std::cout << "--- Demo: then ---" << std::endl;
    auto s = ex::just(10)
             | ex::then([](int x) {
                   std::cout << "First then: " << x << std::endl;
                   return x + 5;
               })
             | ex::then([](int x) {
                   std::cout << "Second then: " << x << std::endl;
                   return std::to_string(x * 2);
               });

    std::string result = sync_wait(s);
    std::cout << "Final result: " << result << std::endl; // 输出 "30"
}

c) transfer(Sender, Scheduler) / on(Scheduler, Sender):切换执行上下文

这些适配器允许你在异步操作链的特定点切换到不同的调度器。transfer将前一个Sender的执行上下文(如果需要)转移到指定的调度器,并确保后续操作在该调度器上执行。on是另一个类似的适配器,它将一个Sender包装起来,以便它在给定的调度器上执行。

// 假设我们有模拟的调度器
struct my_thread_pool_scheduler {
    // 实际实现会管理一个线程池
    auto schedule() const {
        // 返回一个Sender,它会在线程池中安排后续任务
        // 简化表示
        return ex::just(); // 实际会返回一个在线程池中执行的Sender
    }
};

void demo_transfer_on() {
    std::cout << "--- Demo: transfer/on ---" << std::endl;
    my_thread_pool_scheduler tp_scheduler;

    auto s = ex::just("Start")
             | ex::then([](const char* s_val) {
                   std::cout << "Initial work on current thread: " << s_val << std::endl;
                   return std::string(s_val) + " -> Transferred";
               })
             | ex::transfer(tp_scheduler) // 将后续操作转移到线程池
             | ex::then([](std::string s_val) {
                   // 模拟在线程池中执行
                   std::cout << "Work on thread pool: " << s_val << std::endl;
                   return s_val + " -> Done";
               });

    std::string result = sync_wait(s);
    std::cout << "Final result (back on main thread after sync_wait): " << result << std::endl;
}

通过transferon,我们可以清晰地定义任务的哪个部分在哪个执行上下文运行,这对于优化性能和管理资源至关重要。

d) when_all(Sender1, Sender2, ...):并行执行并等待所有任务完成

when_all是实现结构化并发的关键适配器之一。它接收多个Sender,并返回一个新Sender。这个新Sender只有在所有输入Sender都成功完成后才会完成,并产生一个包含所有结果的std::tuple。如果任何一个输入Sender失败或被取消,when_all会立即停止所有未完成的Sender,并传播第一个遇到的错误或取消信号。

这完美地体现了父子关系:when_all是父操作,它启动并等待其所有子操作完成。

#include <thread>
#include <chrono>

void demo_when_all() {
    std::cout << "--- Demo: when_all ---" << std::endl;

    // 模拟一个耗时任务
    auto long_task = ex::just(1)
                     | ex::then([](int x) {
                           std::this_thread::sleep_for(std::chrono::milliseconds(500));
                           std::cout << "Long task done." << std::endl;
                           return x + 10;
                       });

    // 模拟另一个耗时任务
    auto short_task = ex::just("Hello")
                      | ex::then([](const char* s) {
                           std::this_thread::sleep_for(std::chrono::milliseconds(200));
                           std::cout << "Short task done." << std::endl;
                           return std::string(s) + " World";
                       });

    // 同时启动这两个任务,并等待它们都完成
    auto combined_task = ex::when_all(long_task, short_task)
                         | ex::then([](std::tuple<int, std::string> results) {
                               std::cout << "All tasks finished. Processing results..." << std::endl;
                               int num_res = std::get<0>(results);
                               std::string str_res = std::get<1>(results);
                               return "Combined: " + std::to_string(num_res) + " and " + str_res;
                           });

    std::string final_message = sync_wait(combined_task);
    std::cout << "Final combined message: " << final_message << std::endl;
}

在这个例子中,combined_task是父操作,long_taskshort_task是子操作。combined_task会等待两个子任务并行执行并完成,然后才继续执行其then回调。如果long_taskshort_task中的任何一个抛出错误,when_all会捕获该错误,并取消另一个未完成的任务(如果可能),然后将错误传播下去。

e) upon_error(Sender, F) / upon_stopped(Sender, F):错误和取消处理

这些适配器允许你在异步链中插入错误恢复逻辑或取消处理逻辑。

void demo_error_handling() {
    std::cout << "--- Demo: error handling ---" << std::endl;

    auto fallible_task = ex::just(true)
                         | ex::then([](bool succeed) -> ex::sender auto {
                               if (succeed) {
                                   std::cout << "Fallible task succeeded." << std::endl;
                                   return ex::just(42);
                               } else {
                                   std::cout << "Fallible task failed." << std::endl;
                                   // ex::just_error 返回一个立即调用 set_error 的 Sender
                                   return ex::just_error(std::make_exception_ptr(std::runtime_error("Something went wrong!")));
                               }
                           });

    auto recovery_task = fallible_task
                         | ex::then([](int value) {
                               std::cout << "Value received: " << value << std::endl;
                               return value;
                           })
                         | ex::upon_error([](std::exception_ptr eptr) {
                               try {
                                   std::rethrow_exception(eptr);
                               } catch (const std::runtime_error& e) {
                                   std::cerr << "Caught error: " << e.what() << ". Recovering..." << std::endl;
                                   return ex::just(-1); // 恢复,返回一个默认值
                               }
                           });

    int result1 = sync_wait(recovery_task | ex::let_value([](int x){ return ex::just(x); })); // 成功路径
    std::cout << "Result after potential recovery (succeed): " << result1 << std::endl;

    // 模拟失败路径
    auto fallible_task_fail = ex::just(false) // 传入false使其失败
                         | ex::then([](bool succeed) -> ex::sender auto {
                               if (succeed) {
                                   return ex::just(42);
                               } else {
                                   return ex::just_error(std::make_exception_ptr(std::runtime_error("Something went wrong!")));
                               }
                           });

    auto recovery_task_fail = fallible_task_fail
                         | ex::then([](int value) {
                               std::cout << "Value received: " << value << std::endl;
                               return value;
                           })
                         | ex::upon_error([](std::exception_ptr eptr) {
                               try {
                                   std::rethrow_exception(eptr);
                               } catch (const std::runtime_error& e) {
                                   std::cerr << "Caught error: " << e.what() << ". Recovering..." << std::endl;
                                   return ex::just(-1); // 恢复,返回一个默认值
                               }
                           });
    int result2 = sync_wait(recovery_task_fail | ex::let_value([](int x){ return ex::just(x); })); // 失败路径,但被恢复
    std::cout << "Result after potential recovery (fail): " << result2 << std::endl;
}

upon_errorupon_stopped提供了一种结构化的方式来处理异常和取消,而无需在每个回调中手动检查状态或使用try-catch块。它们允许在特定的点上集中处理这些控制流事件,并可以返回一个新的Sender来继续执行,从而实现错误恢复。

f) start_detached(Sender):非结构化但有用的火花

虽然P2300旨在推广结构化并发,但它也提供了start_detached用于“火与忘”(fire-and-forget)场景。这是一种非结构化的操作,因为它启动一个Sender,但不等待其完成,也不捕获其结果、错误或取消状态。它适用于那些你真的不关心结果的任务,例如发送日志消息。

void demo_start_detached() {
    std::cout << "--- Demo: start_detached ---" << std::endl;
    auto s = ex::just("Logging message")
             | ex::then([](const char* msg) {
                   std::cout << "Detached task processing: " << msg << std::endl;
                   // 模拟一些耗时操作,主线程不会等待它
                   std::this_thread::sleep_for(std::chrono::milliseconds(100));
                   std::cout << "Detached task finished." << std::endl;
               });
    ex::start_detached(s); // 启动任务但不等待
    std::cout << "Main thread continues immediately after start_detached." << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(200)); // 确保detached任务有机会执行
}

尽管start_detached是“非结构化”的,但它的存在是务实的。它允许程序员在明确知道后果的情况下,选择这种简单的并发模式。

3.3 P2300调度哲学总结

P2300的调度哲学可以用以下几点概括:

  1. 惰性执行(Lazy Evaluation): Sender只是描述,connect创建operation_statestart才真正触发执行。这允许在调度前进行优化和转换。
  2. 细粒度控制(Fine-grained Control): 通过transferon,程序员可以精确控制任务的每个阶段在哪个执行上下文运行。
  3. 层次化取消(Hierarchical Cancellation): set_stopped信号可以沿着Sender链传播。父操作可以取消子操作,反之亦然,如果子操作被取消,父操作可以感知。这与std::stop_token机制天然契合。
  4. 统一的错误处理(Unified Error Handling): 异常通过set_error信号传播,并且可以被upon_error统一处理。
  5. 资源管理作为一等公民(Resource Management as First-Class Citizen): operation_state的RAII特性确保了异步操作的生命周期得到妥善管理,减少了资源泄漏的风险。
  6. 异构计算支持(Heterogeneous Computing Support): Schedulers可以抽象任何执行环境,从CPU线程池到GPU队列,为C++进入更广泛的异构计算领域铺平道路。

4. 结构化并发与P2300的优势

1. 消除回调地狱,提升代码可读性:
通过链式调用的适配器,异步代码可以写成接近同步代码的线性流程,避免了多层嵌套回调带来的复杂性。

2. 强大的错误和取消传播机制:
set_errorset_stopped提供了标准化的错误和取消信号,适配器(如when_all)能够自动处理这些信号,并在整个异步任务图中传播。这意味着你不再需要手动编写大量的错误检查和取消逻辑。

3. 资源管理的自动化和可靠性:
operation_state作为RAII句柄,确保了与异步任务相关的资源在任务完成、失败或取消时能够被自动清理。这极大地降低了资源泄漏的风险。

4. 易于组合和重用:
Sender是高度可组合的。你可以创建小的、独立的Sender,然后将它们组合成复杂的异步工作流,就像使用函数一样。这种模块化使得代码更易于维护和重用。

5. 灵活的调度策略:
通过Scheduler抽象,你可以轻松地将任务调度到不同的执行上下文,而无需改变任务本身的逻辑。这对于性能优化、负载均衡和异构计算至关重要。

6. 编译器优化潜力:
由于Sender/Receiver模型是声明式的,并且基于Concept和类型擦除(在需要时),编译器可以有更多的机会进行优化,例如避免不必要的内存分配、内联代码,甚至静态分析任务图。

7. 与C++20协程的协同:
虽然P2300可以独立使用,但它与C++20协程有着天然的协同潜力。协程可以用来实现复杂的Sender或Receiver,使得异步操作的内部逻辑更加简洁。P2300可以作为协程的调度器和结果传递机制。

5. 挑战与展望

尽管std::execution带来了诸多优势,但它也带来了一些挑战:

  • 学习曲线: Sender/Receiver模型是一种全新的思维方式,需要时间来适应和掌握。
  • 概念的复杂性: 尽管设计优雅,但其背后的概念(Concept、定制点查找等)对于初学者来说可能比较抽象。
  • 调试: 异步任务的调试本身就比同步任务复杂,虽然结构化并发有所改善,但仍然需要新的工具和技术。
  • 标准库生态的成熟: P2300只是一个基础框架,需要其他标准库组件(如std::netstd::filesystem的异步版本)来充分发挥其潜力。

展望未来,随着std::execution在C++23中获得采纳并逐渐成熟,我们可以期待C++并发编程的范式将发生深刻的变革。它将不仅仅是关于“如何启动一个线程”,更是关于“如何优雅地编排复杂的异步操作流”。结构化并发将使我们的并发代码更健壮、更易于理解、更易于维护,从而释放C++在高性能和高并发领域更强大的潜力。

6. 总结与展望

结构化并发,以C++23 std::execution提案的Sender/Receiver模型为代表,为C++带来了前所未有的并发编程范式。它通过明确的任务生命周期、自动的资源管理以及强大的错误和取消传播机制,将并发操作提升到结构化控制流的水平,显著提高了异步代码的可靠性、可读性和可维护性。随着这一模型的普及,C++开发者将能够更自信地构建复杂、高效且健壮的并发系统。


演示代码示例 (需要 P2300 库支持):

#include <iostream>
#include <string>
#include <vector>
#include <thread>
#include <chrono>
#include <tuple>
#include <optional>
#include <exception>
#include <stdexcept>
#include <condition_variable>
#include <mutex>

// --- 模拟 P2300 概念和适配器 ---
// 实际的 P2300 实现会非常复杂,这里只提供一个极简的、能运行示例的骨架
// 完整且正确的 P2300 实现需要 extensive use of C++20 concepts,
// customized point lookups (CPOs), and type erasure/template metaprogramming.

namespace std::execution {

// Forward declarations for concepts and traits
template <typename S> struct sender_traits;
template <typename S> concept sender = requires(S s) {
    typename sender_traits<S>::value_types;
    typename sender_traits<S>::error_types;
    { sender_traits<S>::sends_stopped } -> std::same_as<bool>;
    // More complex requirements would be here
};

template <typename R, typename... Args>
concept receiver_of_value = requires(R r, Args... args) {
    { r.set_value(std::forward<Args>(args)...) } -> std::same_as<void>;
};

template <typename R>
concept receiver_of_error = requires(R r, std::exception_ptr e) {
    { r.set_error(e) } -> std::same_as<void>;
};

template <typename R>
concept receiver_of_stopped = requires(R r) {
    { r.set_stopped() } -> std::same_as<void>;
};

template <typename R, typename S>
concept receiver = receiver_of_value<R, typename sender_traits<S>::value_types::template at<0>> &&
                   receiver_of_error<R> &&
                   receiver_of_stopped<R>;

template <typename Op>
concept operation_state = requires(Op op) {
    { op.start() } -> std::same_as<void>;
};

// Simplified type list for sender_traits
template<typename... Ts> struct type_list {};
template<typename... Ts> struct single_value_list { template<int I> using at = std::tuple_element_t<I, std::tuple<Ts...>>; };

// --- Simplified `connect` and `start` ---
template <sender S, typename R>
operation_state auto connect(S&& s, R&& r); // Defined later

void start(operation_state auto& op_state) {
    op_state.start();
}

// --- `just` sender ---
template <typename... Args>
struct just_sender {
    std::tuple<Args...> values_;

    template <typename R>
    struct operation_state_impl {
        R receiver_;
        std::tuple<Args...> values_;

        void start() {
            std::apply([&](auto&&... args) {
                receiver_.set_value(std::forward<decltype(args)>(args)...);
            }, values_);
        }
    };

    template <typename R>
    operation_state_impl<R> connect(R&& r) && {
        return {std::forward<R>(r), std::move(values_)};
    }
};

template <typename... Args>
struct sender_traits<just_sender<Args...>> {
    using value_types = single_value_list<Args...>;
    using error_types = type_list<>;
    static constexpr bool sends_stopped = false;
};

template <typename... Args>
just_sender<Args...> just(Args... args) {
    return {std::make_tuple(std::forward<Args>(args)...)};
}

// --- `then` adaptor ---
template <sender BaseSender, typename F>
struct then_sender {
    BaseSender base_sender_;
    F func_;

    template <typename R>
    struct operation_state_impl {
        R receiver_;
        F func_;
        // Inner receiver for the base_sender_
        struct inner_receiver {
            operation_state_impl* outer_;

            template <typename... Args>
            void set_value(Args&&... args) {
                try {
                    // Call the user function F
                    if constexpr (std::is_void_v<std::invoke_result_t<F, Args...>>) {
                        std::invoke(outer_->func_, std::forward<Args>(args)...);
                        outer_->receiver_.set_value(); // If F returns void, then_sender returns void
                    } else {
                        outer_->receiver_.set_value(std::invoke(outer_->func_, std::forward<Args>(args)...));
                    }
                } catch (...) {
                    outer_->receiver_.set_error(std::current_exception());
                }
            }
            void set_error(std::exception_ptr e) { outer_->receiver_.set_error(e); }
            void set_stopped() { outer_->receiver_.set_stopped(); }
        } inner_receiver_;

        // Operation state for the base_sender_
        std::remove_cvref_t<decltype(connect(std::declval<BaseSender>(), std::declval<inner_receiver>()))> base_op_state_;

        operation_state_impl(R&& r, BaseSender&& s, F&& f)
            : receiver_(std::forward<R>(r)), func_(std::forward<F>(f)), inner_receiver_{this},
              base_op_state_(connect(std::forward<BaseSender>(s), std::move(inner_receiver_))) {}

        void start() {
            std::execution::start(base_op_state_);
        }
    };

    template <typename R>
    operation_state_impl<R> connect(R&& r) && {
        return {std::forward<R>(r), std::move(base_sender_), std::move(func_)};
    }
};

template <sender BaseSender, typename F>
struct sender_traits<then_sender<BaseSender, F>> {
    // Determine the value type of the result
    using base_value_types = typename sender_traits<BaseSender>::value_types;
    using result_type = std::invoke_result_t<F, typename base_value_types::template at<0>>;
    using value_types = single_value_list<result_type>;
    using error_types = typename sender_traits<BaseSender>::error_types; // Error types from base, plus any F might throw
    static constexpr bool sends_stopped = sender_traits<BaseSender>::sends_stopped;
};

template <sender BaseSender, typename F>
then_sender<BaseSender, F> then(BaseSender&& s, F&& f) {
    return {std::forward<BaseSender>(s), std::forward<F>(f)};
}

// --- `transfer` adaptor (simplified, no actual scheduler implementation) ---
struct scheduler_base {}; // Just a tag for now

template <sender BaseSender, typename Scheduler>
struct transfer_sender {
    BaseSender base_sender_;
    Scheduler scheduler_;

    template <typename R>
    struct operation_state_impl {
        R receiver_;
        Scheduler scheduler_;
        // Inner receiver for the base_sender_
        struct inner_receiver {
            operation_state_impl* outer_;
            template <typename... Args>
            void set_value(Args&&... args) {
                // In a real implementation, this would enqueue a task on scheduler_
                // and that task would call outer_->receiver_.set_value
                outer_->receiver_.set_value(std::forward<Args>(args)...);
            }
            void set_error(std::exception_ptr e) { outer_->receiver_.set_error(e); }
            void set_stopped() { outer_->receiver_.set_stopped(); }
        } inner_receiver_;

        std::remove_cvref_t<decltype(connect(std::declval<BaseSender>(), std::declval<inner_receiver>()))> base_op_state_;

        operation_state_impl(R&& r, BaseSender&& s, Scheduler&& sch)
            : receiver_(std::forward<R>(r)), scheduler_(std::forward<Scheduler>(sch)), inner_receiver_{this},
              base_op_state_(connect(std::forward<BaseSender>(s), std::move(inner_receiver_))) {}

        void start() {
            // In a real implementation, the scheduler would be used here.
            // For this demo, we just start the base immediately.
            std::execution::start(base_op_state_);
        }
    };

    template <typename R>
    operation_state_impl<R> connect(R&& r) && {
        return {std::forward<R>(r), std::move(base_sender_), std::move(scheduler_)};
    }
};

template <sender BaseSender, typename Scheduler>
struct sender_traits<transfer_sender<BaseSender, Scheduler>> {
    using value_types = typename sender_traits<BaseSender>::value_types;
    using error_types = typename sender_traits<BaseSender>::error_types;
    static constexpr bool sends_stopped = sender_traits<BaseSender>::sends_stopped;
};

template <sender BaseSender, typename Scheduler>
transfer_sender<BaseSender, Scheduler> transfer(BaseSender&& s, Scheduler&& sch) {
    return {std::forward<BaseSender>(s), std::forward<Scheduler>(sch)};
}

// --- `when_all` adaptor ---
template <sender... Senders>
struct when_all_sender {
    std::tuple<Senders...> senders_;

    template <typename R>
    struct operation_state_impl {
        R receiver_;
        std::tuple<Senders...> senders_;
        std::tuple<typename sender_traits<Senders>::value_types::template at<0>...> results_;
        std::atomic<int> completed_count_{0};
        std::atomic<bool> cancelled_{false};
        std::exception_ptr stored_exception_;
        std::mutex m_; // For protecting shared state on completion

        // Inner receiver for each sub-sender
        template <int I>
        struct inner_receiver_impl {
            operation_state_impl* outer_;

            template <typename... Args>
            void set_value(Args&&... args) {
                if (outer_->cancelled_.load()) return; // If parent already cancelled

                std::get<I>(outer_->results_) = std::tuple<Args...>{std::forward<Args>(args)...};
                if (++outer_->completed_count_ == sizeof...(Senders)) {
                    outer_->receiver_.set_value(std::move(outer_->results_));
                }
            }

            void set_error(std::exception_ptr e) {
                std::lock_guard lk(outer_->m_);
                if (!outer_->cancelled_.exchange(true)) { // Only store the first error
                    outer_->stored_exception_ = e;
                    // In a real implementation, would try to cancel other ops
                    outer_->receiver_.set_error(e);
                }
            }

            void set_stopped() {
                std::lock_guard lk(outer_->m_);
                if (!outer_->cancelled_.exchange(true)) { // Only propagate first stop
                    outer_->receiver_.set_stopped();
                }
            }
        };
        std::tuple<inner_receiver_impl<0>, inner_receiver_impl<1>...> inner_receivers_; // For two senders for simplicity

        // Operation states for each sub-sender
        std::tuple<
            std::remove_cvref_t<decltype(connect(std::declval<Senders>(), std::declval<inner_receiver_impl<0>>()))>...
        > base_op_states_;

        operation_state_impl(R&& r, std::tuple<Senders...>&& s_tuple)
            : receiver_(std::forward<R>(r)), senders_(std::move(s_tuple)),
              inner_receivers_{inner_receiver_impl<0>{this}, inner_receiver_impl<1>{this}} // For 2 senders
        {
            // Connect each sender
            // This part is tricky to generalize for N senders without recursion or boost.hana
            base_op_states_ = std::make_tuple(
                connect(std::get<0>(senders_), std::get<0>(inner_receivers_)),
                connect(std::get<1>(senders_), std::get<1>(inner_receivers_))
            );
        }

        void start() {
            // Start all sub-senders
            std::apply([](auto&... ops){ (std::execution::start(ops), ...); }, base_op_states_);
        }
    };

    template <typename R>
    operation_state_impl<R> connect(R&& r) && {
        return {std::forward<R>(r), std::move(senders_)};
    }
};

template <sender... Senders>
struct sender_traits<when_all_sender<Senders...>> {
    using value_types = single_value_list<std::tuple<typename sender_traits<Senders>::value_types::template at<0>...>>;
    using error_types = type_list<std::exception_ptr>; // Can propagate any error
    static constexpr bool sends_stopped = (sender_traits<Senders>::sends_stopped || ...);
};

template <sender... Senders>
when_all_sender<Senders...> when_all(Senders&&... s) {
    return {std::make_tuple(std::forward<Senders>(s)...)};
}

// --- `just_error` sender ---
struct just_error_sender {
    std::exception_ptr error_;

    template <typename R>
    struct operation_state_impl {
        R receiver_;
        std::exception_ptr error_;
        void start() {
            receiver_.set_error(error_);
        }
    };

    template <typename R>
    operation_state_impl<R> connect(R&& r) && {
        return {std::forward<R>(r), std::move(error_)};
    }
};

template <>
struct sender_traits<just_error_sender> {
    using value_types = single_value_list<>;
    using error_types = type_list<std::exception_ptr>;
    static constexpr bool sends_stopped = false;
};

just_error_sender just_error(std::exception_ptr e) {
    return {std::move(e)};
}

// --- `upon_error` adaptor ---
template <sender BaseSender, typename F>
struct upon_error_sender {
    BaseSender base_sender_;
    F func_;

    template <typename R>
    struct operation_state_impl {
        R receiver_;
        F func_;
        std::optional<std::remove_cvref_t<decltype(connect(std::declval<decltype(std::invoke(std::declval<F>(), std::declval<std::exception_ptr>()))>(), std::declval<R>()))>> recovery_op_state_;

        struct inner_receiver {
            operation_state_impl* outer_;
            template <typename... Args> void set_value(Args&&... args) { outer_->receiver_.set_value(std::forward<Args>(args)...); }
            void set_stopped() { outer_->receiver_.set_stopped(); }
            void set_error(std::exception_ptr e) {
                try {
                    // Call F to get a recovery sender
                    auto recovery_sender = std::invoke(outer_->func_, e);
                    outer_->recovery_op_state_.emplace(connect(std::move(recovery_sender), outer_->receiver_));
                    std::execution::start(*outer_->recovery_op_state_);
                } catch (...) {
                    outer_->receiver_.set_error(std::current_exception()); // F itself threw
                }
            }
        } inner_receiver_;

        std::remove_cvref_t<decltype(connect(std::declval<BaseSender>(), std::declval<inner_receiver>()))> base_op_state_;

        operation_state_impl(R&& r, BaseSender&& s, F&& f)
            : receiver_(std::forward<R>(r)), func_(std::forward<F>(f)), inner_receiver_{this},
              base_op_state_(connect(std::forward<BaseSender>(s), std::move(inner_receiver_))) {}

        void start() {
            std::execution::start(base_op_state_);
        }
    };

    template <typename R>
    operation_state_impl<R> connect(R&& r) && {
        return {std::forward<R>(r), std::move(base_sender_), std::move(func_)};
    }
};

template <sender BaseSender, typename F>
struct sender_traits<upon_error_sender<BaseSender, F>> {
    using value_types = typename sender_traits<BaseSender>::value_types; // Same value types
    using error_types = type_list<std::exception_ptr>; // F might throw, or the recovery sender might error
    static constexpr bool sends_stopped = sender_traits<BaseSender>::sends_stopped;
};

template <sender BaseSender, typename F>
upon_error_sender<BaseSender, F> upon_error(BaseSender&& s, F&& f) {
    return {std::forward<BaseSender>(s), std::forward<F>(f)};
}

// --- `let_value` adaptor (simplified, needed for chaining senders from `then` or `upon_error` that return senders) ---
template <sender BaseSender, typename F>
struct let_value_sender {
    BaseSender base_sender_;
    F func_;

    template <typename R>
    struct operation_state_impl {
        R receiver_;
        F func_;
        std::optional<std::remove_cvref_t<decltype(connect(std::declval<decltype(std::invoke(std::declval<F>(), std::declval<typename sender_traits<BaseSender>::value_types::template at<0>>()))>(), std::declval<R>()))>> next_op_state_;

        struct inner_receiver {
            operation_state_impl* outer_;
            template <typename... Args>
            void set_value(Args&&... args) {
                try {
                    auto next_sender = std::invoke(outer_->func_, std::forward<Args>(args)...);
                    outer_->next_op_state_.emplace(connect(std::move(next_sender), outer_->receiver_));
                    std::execution::start(*outer_->next_op_state_);
                } catch (...) {
                    outer_->receiver_.set_error(std::current_exception());
                }
            }
            void set_error(std::exception_ptr e) { outer_->receiver_.set_error(e); }
            void set_stopped() { outer_->receiver_.set_stopped(); }
        } inner_receiver_;

        std::remove_cvref_t<decltype(connect(std::declval<BaseSender>(), std::declval<inner_receiver>()))> base_op_state_;

        operation_state_impl(R&& r, BaseSender&& s, F&& f)
            : receiver_(std::forward<R>(r)), func_(std::forward<F>(f)), inner_receiver_{this},
              base_op_state_(connect(std::forward<BaseSender>(s), std::move(inner_receiver_))) {}

        void start() {
            std::execution::start(base_op_state_);
        }
    };

    template <typename R>
    operation_state_impl<R> connect(R&& r) && {
        return {std::forward<R>(r), std::move(base_sender_), std::move(func_)};
    }
};

template <sender BaseSender, typename F>
struct sender_traits<let_value_sender<BaseSender, F>> {
    using next_sender_type = std::invoke_result_t<F, typename sender_traits<BaseSender>::value_types::template at<0>>;
    using value_types = typename sender_traits<next_sender_type>::value_types;
    using error_types = typename sender_traits<BaseSender>::error_types; // Plus error types from next_sender_type
    static constexpr bool sends_stopped = sender_traits<BaseSender>::sends_stopped || sender_traits<next_sender_type>::sends_stopped;
};

template <sender BaseSender, typename F>
let_value_sender<BaseSender, F> let_value(BaseSender&& s, F&& f) {
    return {std::forward<BaseSender>(s), std::forward<F>(f)};
}

// --- `start_detached` ---
template <sender S>
struct detached_receiver {
    // Does nothing with the results
    template <typename... Args> void set_value(Args&&...) {}
    void set_error(std::exception_ptr) {
        // In a real system, you'd log this error
        std::cerr << "Detached task encountered an error!" << std::endl;
    }
    void set_stopped() {
        std::cout << "Detached task was stopped." << std::endl;
    }
};

template <sender S>
void start_detached(S&& s) {
    // In a real system, op_state would be moved to a thread/task manager
    // to manage its lifetime. Here, it's just a local variable.
    // This is explicitly unstructured, so resource management is basic.
    auto op_state = connect(std::forward<S>(s), detached_receiver<S>{});
    start(op_state);
}

// --- Simplified `sync_wait` implementation ---
template <sender S>
auto sync_wait(S&& s) {
    // Define a basic receiver for sync_wait
    struct sync_wait_receiver {
        std::optional<typename sender_traits<S>::value_types::template at<0>> result_storage;
        std::exception_ptr error_storage;
        bool stopped_flag = false;
        std::condition_variable cv;
        std::mutex m;
        bool done = false;

        template <typename... Args>
        void set_value(Args&&... args) {
            std::lock_guard lk(m);
            result_storage.emplace(std::forward<Args>(args)...);
            done = true;
            cv.notify_one();
        }
        void set_error(std::exception_ptr e) {
            std::lock_guard lk(m);
            error_storage = e;
            done = true;
            cv.notify_one();
        }
        void set_stopped() {
            std::lock_guard lk(m);
            stopped_flag = true;
            done = true;
            cv.notify_one();
        }

        void wait() {
            std::unique_lock lk(m);
            cv.wait(lk, [this]{ return done; });
        }
    };

    sync_wait_receiver r;
    auto op_state = connect(std::forward<S>(s), r);
    start(op_state);
    r.wait();

    if (r.error_storage) {
        std::rethrow_exception(r.error_storage);
    }
    if (r.stopped_flag) {
        throw std::runtime_error("Operation was stopped.");
    }
    return *r.result_storage;
}

// --- Simplified `connect` definition (requires types to be fully defined) ---
template <sender S, typename R>
operation_state auto connect(S&& s, R&& r) {
    return std::forward<S>(s).connect(std::forward<R>(r));
}

} // namespace std::execution

// --- Pipe operator for chaining senders ---
template <std::execution::sender S, typename F>
auto operator|(S&& s, F&& f) {
    return std::forward<F>(f)(std::forward<S>(s));
}

// --- Demo Functions ---
// (The demo functions from the lecture body are placed here)

void demo_just() {
    std::cout << "--- Demo: just ---" << std::endl;
    auto s1 = std::execution::just(42);
    int value = std::execution::sync_wait(s1);
    std::cout << "just(42) resulted in: " << value << std::endl;

    auto s2 = std::execution::just("Hello", 123);
    std::tuple<const char*, int> values = std::execution::sync_wait(s2);
    std::cout << "just("Hello", 123) resulted in: "
              << std::get<0>(values) << ", " << std::get<1>(values) << std::endl;
}

void demo_then() {
    std::cout << "--- Demo: then ---" << std::endl;
    auto s = std::execution::just(10)
             | std::execution::then([](int x) {
                   std::cout << "First then: " << x << std::endl;
                   return x + 5;
               })
             | std::execution::then([](int x) {
                   std::cout << "Second then: " << x << std::endl;
                   return std::to_string(x * 2);
               });

    std::string result = std::execution::sync_wait(s);
    std::cout << "Final result: " << result << std::endl;
}

struct my_thread_pool_scheduler : std::execution::scheduler_base {
    auto schedule() const {
        return std::execution::just(); // Simplified, real one would schedule on a thread pool
    }
};

void demo_transfer_on() {
    std::cout << "--- Demo: transfer/on ---" << std::endl;
    my_thread_pool_scheduler tp_scheduler;

    auto s = std::execution::just("Start")
             | std::execution::then([](const char* s_val) {
                   std::cout << "Initial work on current thread: " << s_val << std::endl;
                   return std::string(s_val) + " -> Transferred";
               })
             | std::execution::transfer(tp_scheduler)
             | std::execution::then([](std::string s_val) {
                   std::cout << "Work on thread pool: " << s_val << std::endl;
                   return s_val + " -> Done";
               });

    std::string result = std::execution::sync_wait(s);
    std::cout << "Final result (back on main thread after sync_wait): " << result << std::endl;
}

void demo_when_all() {
    std::cout << "--- Demo: when_all ---" << std::endl;

    auto long_task = std::execution::just(1)
                     | std::execution::then([](int x) {
                           std::this_thread::sleep_for(std::chrono::milliseconds(500));
                           std::cout << "Long task done." << std::endl;
                           return x + 10;
                       });

    auto short_task = std::execution::just("Hello")
                      | std::execution::then([](const char* s) {
                           std::this_thread::sleep_for(std::chrono::milliseconds(200));
                           std::cout << "Short task done." << std::endl;
                           return std::string(s) + " World";
                       });

    auto combined_task = std::execution::when_all(long_task, short_task)
                         | std::execution::then([](std::tuple<int, std::string> results) {
                               std::cout << "All tasks finished. Processing results..." << std::endl;
                               int num_res = std::get<0>(results);
                               std::string str_res = std::get<1>(results);
                               return "Combined: " + std::to_string(num_res) + " and " + str_res;
                           });

    std::string final_message = std::execution::sync_wait(combined_task);
    std::cout << "Final combined message: " << final_message << std::endl;
}

void demo_error_handling() {
    std::cout << "--- Demo: error handling ---" << std::endl;

    auto fallible_task_succeed = std::execution::just(true)
                         | std::execution::let_value([](bool succeed) -> std::execution::sender auto {
                               if (succeed) {
                                   std::cout << "Fallible task succeeded." << std::endl;
                                   return std::execution::just(42);
                               } else {
                                   std::cout << "Fallible task failed." << std::endl;
                                   return std::execution::just_error(std::make_exception_ptr(std::runtime_error("Something went wrong!")));
                               }
                           });

    auto recovery_task_succeed = fallible_task_succeed
                         | std::execution::upon_error([](std::exception_ptr eptr) {
                               try {
                                   std::rethrow_exception(eptr);
                               } catch (const std::runtime_error& e) {
                                   std::cerr << "Caught error: " << e.what() << ". Recovering..." << std::endl;
                                   return std::execution::just(-1);
                               }
                           });

    int result1 = std::execution::sync_wait(recovery_task_succeed);
    std::cout << "Result after potential recovery (succeed): " << result1 << std::endl;

    auto fallible_task_fail = std::execution::just(false)
                         | std::execution::let_value([](bool succeed) -> std::execution::sender auto {
                               if (succeed) {
                                   return std::execution::just(42);
                               } else {
                                   return std::execution::just_error(std::make_exception_ptr(std::runtime_error("Something went wrong!")));
                               }
                           });

    auto recovery_task_fail = fallible_task_fail
                         | std::execution::upon_error([](std::exception_ptr eptr) {
                               try {
                                   std::rethrow_exception(eptr);
                               } catch (const std::runtime_error& e) {
                                   std::cerr << "Caught error: " << e.what() << ". Recovering..." << std::endl;
                                   return std::execution::just(-1);
                               }
                           });
    int result2 = std::execution::sync_wait(recovery_task_fail);
    std::cout << "Result after potential recovery (fail): " << result2 << std::endl;
}

void demo_start_detached() {
    std::cout << "--- Demo: start_detached ---" << std::endl;
    auto s = std::execution::just("Logging message")
             | std::execution::then([](const char* msg) {
                   std::cout << "Detached task processing: " << msg << std::endl;
                   std::this_thread::sleep_for(std::chrono::milliseconds(100));
                   std::cout << "Detached task finished." << std::endl;
               });
    std::execution::start_detached(s);
    std::cout << "Main thread continues immediately after start_detached." << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(200));
}

int main() {
    demo_just();
    std::cout << std::endl;
    demo_then();
    std::cout << std::endl;
    demo_transfer_on();
    std::cout << std::endl;
    demo_when_all();
    std::cout << std::endl;
    demo_error_handling();
    std::cout << std::endl;
    demo_start_detached();
    std::cout << std::endl;

    return 0;
}

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注