What Are "Continuable Promises"? Building a JavaScript `Promise.all`-Style Combinator in C++

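Hello, everyone. Today we will take a close look at a concept that is central to modern asynchronous programming, "Continuable Promises", and, using C++ as our stage, build a combinator similar to JavaScript's powerful Promise.all. As a programming expert, I will present this as a lecture, walking step by step through the design rationale, the implementation details, and the extra considerations that C++'s complex environment brings.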

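I. The Essence of Promises: Managing Asynchronous Operations Cleanly

Before getting into the implementation of Continuable Promises, we first need to understand the core of the Promise concept. In traditional synchronous programming, a function runs as soon as it is called and returns its result. In many modern applications, however, we have to deal with time-consuming operations such as network requests, file I/O, or heavy computation. Executed synchronously, these operations block the main thread, making the user interface stutter or the program stop responding altogether.

Asynchronous programming addresses this: it lets us run such operations in the background and be notified of the result when they finish. Traditional asynchronous code, however, often ends up in "callback hell": deeply nested callbacks that make the code hard to read, hard to maintain, and hard to handle errors in.

Promises were created to solve exactly these problems. A Promise represents the eventual result of an asynchronous operation: the result may become available at some point in the future, or it may never become available (because the operation failed). A Promise object has the following core properties: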

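  1. State

    • Pending: the initial state; neither fulfilled nor rejected.
    • Fulfilled: the operation completed successfully and produced a result value.
    • Rejected: the operation failed and produced an error reason.
      Once a Promise becomes Fulfilled or Rejected, it is Settled, and its state can never change again.
  2. Value or Reason

    • A Fulfilled Promise carries a success value.
    • A Rejected Promise carries a failure reason (usually an exception object).
  3. Chaining
    Chaining is one of the most powerful features of Promises. With the then method we can run another operation after a Promise completes; that operation can itself return a new Promise, so a sequence of asynchronous operations can be expressed as one chain.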
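The three states and the one-way transition to Settled can be sketched as a tiny C++ state holder. `Outcome<T>` below is a hypothetical name, not part of the library built later; it assumes `std::variant`, with `std::monostate` standing for Pending, and it is single-threaded, so it only illustrates the rules, not a usable Promise.

```cpp
#include <exception>
#include <utility>
#include <variant>

// Hypothetical Outcome<T>: the three Promise states as one std::variant.
// Index 0 (std::monostate) = Pending, 1 (T) = Fulfilled, 2 (exception_ptr) = Rejected.
// Single-threaded illustration only; a real Promise also needs locking.
template <typename T>
class Outcome {
public:
    bool pending() const { return v_.index() == 0; }
    bool fulfilled() const { return v_.index() == 1; }
    bool rejected() const { return v_.index() == 2; }

    // Transitions succeed only from Pending; once settled, the state never changes.
    bool fulfill(T value) {
        if (!pending()) return false;
        v_.template emplace<1>(std::move(value));
        return true;
    }
    bool reject(std::exception_ptr reason) {
        if (!pending()) return false;
        v_.template emplace<2>(std::move(reason));
        return true;
    }

    const T& value() const { return std::get<1>(v_); }            // throws unless Fulfilled
    std::exception_ptr reason() const { return std::get<2>(v_); }  // throws unless Rejected

private:
    std::variant<std::monostate, T, std::exception_ptr> v_;  // starts as monostate (Pending)
};
```

Returning `bool` from the transitions makes the "settle once" rule observable: a second `fulfill` or `reject` is refused and the stored value stays as it was.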

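Continuable Promises, as the name suggests, are Promises that can be "continued": they let us specify what should run when the Promise succeeds or fails, and the results of those follow-up operations in turn produce new Promises, forming a seamless asynchronous workflow. That is the core goal of the C++ Promise library we build today.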
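To make "continuation" concrete before the full implementation, here is a minimal single-threaded sketch. `Continuable<T>` is a hypothetical name; it has no error path and no locking, and its handlers must return a value (there is no `Continuable<void>`). What it does show is the essential mechanism: `then` stores a continuation and returns a new object that the continuation resolves later.

```cpp
#include <functional>
#include <memory>
#include <optional>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical minimal "continuable": single-threaded, no error path, and handlers
// must return a value (so there is no Continuable<void>).
template <typename T>
class Continuable {
public:
    // Settle once, then run every continuation registered so far.
    void resolve(T v) {
        if (state_->value) return;
        state_->value = std::move(v);
        std::vector<std::function<void(const T&)>> pending;
        pending.swap(state_->continuations);
        for (auto& k : pending) k(*state_->value);
    }

    // Register f and return a new Continuable that f's result will resolve.
    template <typename F>
    auto then(F f) {
        using U = std::decay_t<std::invoke_result_t<F&, const T&>>;
        Continuable<U> next;
        auto k = [next, f](const T& v) mutable { next.resolve(f(v)); };
        if (state_->value) {
            k(*state_->value);  // already resolved: continue immediately
        } else {
            state_->continuations.push_back(std::move(k));
        }
        return next;
    }

private:
    struct State {
        std::optional<T> value;
        std::vector<std::function<void(const T&)>> continuations;
    };
    std::shared_ptr<State> state_ = std::make_shared<State>();  // shared by all copies
};
```

Because `then` returns a fresh `Continuable`, calls chain naturally: `p.then(f).then(g)` registers both steps up front, and `g(f(x))` runs once `p.resolve(x)` is called.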

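II. Building Blocks of a C++ Promise: State, Value, and the Callback Mechanism

To build Continuable Promises in C++, we need a core Promise class. It must manage the Promise's state, store the result or the exception, and provide a mechanism for registering callbacks.

2.1 Promise State and Data Structures

We start by defining the Promise's state. For generality, Promise should be a class template that can hold a result of any type.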

#include <condition_variable>
#include <cstddef>
#include <exception>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <thread>      // std::thread, used to dispatch callbacks
#include <type_traits>
#include <utility>
#include <vector>

// Forward declaration for mutual dependency
template <typename T> class Promise;

// Internal state of a promise, shared across copies
template <typename T>
struct PromiseState {
    enum State { PENDING, FULFILLED, REJECTED };
    State state = PENDING;
    std::optional<T> value;       // Engaged only once the promise is FULFILLED
    std::exception_ptr exception; // Non-null only once the promise is REJECTED

    // Callbacks waiting for this promise to settle, type-erased as std::function<void()>.
    // Each one settles the *next* promise in a chain.
    std::vector<std::function<void()>> on_settled_callbacks;

    std::mutex mutex;           // Protects state, value, exception, and the callback vector
    std::condition_variable cv; // Wakes threads blocked in get(); not needed for chaining

    // std::optional<void> is ill-formed, so PromiseState<void> is specialized
    // separately (next to the Promise<void> specialization).
};

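Table: Promise states

State     | Description                                         | Result
PENDING   | Initial state; the operation is still in progress   | (none yet)
FULFILLED | The operation completed successfully                | Success value of type T
REJECTED  | The operation failed, usually with error details    | Error reason, as std::exception_ptr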

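2.2 The Promise Class Structure

Now let's define the Promise class. To let several Promise objects refer to the result of the same asynchronous operation (for example, when one Promise is passed to several then calls), we manage its internal state with std::shared_ptr.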

// Declare the Promise<void> specialization before anything can instantiate Promise<void>.
template <> class Promise<void>;

// Type traits: is X a Promise<U>?  And Promise<U> -> U (other types unchanged).
template <typename X> struct is_promise : std::false_type {};
template <typename U> struct is_promise<Promise<U>> : std::true_type {};
template <typename X> inline constexpr bool is_promise_v = is_promise<X>::value;

template <typename X> struct unwrap_promise { using type = X; };
template <typename U> struct unwrap_promise<Promise<U>> { using type = U; };
template <typename X> using unwrap_promise_t = typename unwrap_promise<X>::type;

// Value type of the promise returned by then(): void -> void, U -> U, Promise<U> -> U
template <typename Handler, typename... Args>
using next_value_t = unwrap_promise_t<std::decay_t<std::invoke_result_t<Handler&, Args...>>>;

// Runs a callback on its own detached thread. A fixed-size thread pool is better in
// production. (std::async(std::launch::async, cb) with a discarded future would NOT be
// asynchronous: that future's destructor blocks until cb has finished.)
inline void dispatch(std::function<void()> cb) {
    std::thread(std::move(cb)).detach();
}

// Invokes fn (a user handler already bound to its argument) and settles next:
// a plain value resolves it, a returned Promise is followed (flattening),
// and a thrown exception rejects it.
template <typename U, typename Fn>
void settle_with(Promise<U> next, Fn&& fn) {
    using R = std::decay_t<std::invoke_result_t<Fn&>>;
    try {
        if constexpr (is_promise_v<R>) {
            R inner = fn();
            if constexpr (std::is_void_v<U>) {
                inner.then([next]() mutable { next.resolve(); },
                           [next](std::exception_ptr ex) mutable { next.reject(ex); });
            } else {
                inner.then([next](const U& val) mutable { next.resolve(val); },
                           [next](std::exception_ptr ex) mutable { next.reject(ex); });
            }
        } else if constexpr (std::is_void_v<R>) {
            fn();
            next.resolve();
        } else {
            next.resolve(fn());
        }
    } catch (...) {
        next.reject(std::current_exception());
    }
}

template <typename T>
class Promise {
public:
    using value_type = T;

    // Default constructor: a new, pending promise with its own shared state
    Promise() : state_(std::make_shared<PromiseState<T>>()) {}

    // Public methods to settle the promise; only the first call has any effect
    void resolve(const T& val) { internal_resolve(val); }
    void reject(std::exception_ptr ex) { internal_reject(std::move(ex)); }

    // Overload for rejecting with an exception object
    template <typename E>
    void reject(const E& ex) { internal_reject(std::make_exception_ptr(ex)); }

    // `then` is the "continuable" part: it returns a *new* promise, settled by whichever
    // handler runs. on_rejected, if given, must yield the same value type as
    // on_fulfilled; if it is omitted, a rejection propagates unchanged.
    template <typename OnFulfilled, typename OnRejected = std::nullptr_t>
    auto then(OnFulfilled on_fulfilled, OnRejected on_rejected = nullptr) {
        using U = next_value_t<OnFulfilled, const T&>;
        if constexpr (!std::is_null_pointer_v<OnRejected>) {
            static_assert(std::is_same_v<next_value_t<OnRejected, std::exception_ptr>, U>,
                          "on_rejected must yield the same type as on_fulfilled");
        }
        Promise<U> next;  // the promise this then() call returns

        // Capture the shared states by value (shared_ptr copies) so they outlive this call
        std::function<void()> callback =
            [current = state_, next, on_fulfilled = std::move(on_fulfilled),
             on_rejected = std::move(on_rejected)]() mutable {
                // Copy the outcome under the lock, then run user code without holding it,
                // so a handler may call then()/get() on this same promise safely.
                std::optional<T> value;
                std::exception_ptr error;
                {
                    std::lock_guard<std::mutex> lock(current->mutex);
                    if (current->state == PromiseState<T>::FULFILLED) {
                        value.emplace(*current->value);
                    } else {
                        error = current->exception;
                    }
                }
                if (value) {
                    const T& val = *value;
                    settle_with(next, [&] { return on_fulfilled(val); });
                } else if constexpr (std::is_null_pointer_v<OnRejected>) {
                    next.reject(error);  // no on_rejected handler: propagate the error
                } else {
                    settle_with(next, [&] { return on_rejected(error); });
                }
            };
        register_callback(std::move(callback));
        return next;
    }

    // Sugar for then(identity, on_rejected); on_rejected must return T or Promise<T>
    template <typename OnRejected>
    auto catch_error(OnRejected on_rejected) {
        return then([](const T& val) { return val; }, std::move(on_rejected));
    }

    // Synchronous wait (discouraged in async code, useful for tests and blocking scenarios)
    T get() {
        std::unique_lock<std::mutex> lock(state_->mutex);
        state_->cv.wait(lock, [this] { return state_->state != PromiseState<T>::PENDING; });
        if (state_->state == PromiseState<T>::REJECTED) {
            std::rethrow_exception(state_->exception);
        }
        return *state_->value;
    }

private:
    std::shared_ptr<PromiseState<T>> state_;

    // Queues cb until this promise settles, or dispatches it at once if already settled
    void register_callback(std::function<void()> cb) {
        {
            std::lock_guard<std::mutex> lock(state_->mutex);
            if (state_->state == PromiseState<T>::PENDING) {
                state_->on_settled_callbacks.push_back(std::move(cb));
                return;
            }
        }
        dispatch(std::move(cb));  // dispatched outside the lock
    }

    // Settle once: callbacks are moved out under the lock and dispatched after it is
    // released, so a concurrent then() is never lost and never races the iteration.
    void internal_resolve(const T& val) {
        std::vector<std::function<void()>> callbacks;
        {
            std::lock_guard<std::mutex> lock(state_->mutex);
            if (state_->state != PromiseState<T>::PENDING) {
                return;  // Already settled
            }
            state_->state = PromiseState<T>::FULFILLED;
            state_->value.emplace(val);
            callbacks.swap(state_->on_settled_callbacks);
        }
        state_->cv.notify_all();  // Notify any threads waiting in get()
        for (auto& cb : callbacks) {
            dispatch(std::move(cb));
        }
    }

    void internal_reject(std::exception_ptr ex) {
        std::vector<std::function<void()>> callbacks;
        {
            std::lock_guard<std::mutex> lock(state_->mutex);
            if (state_->state != PromiseState<T>::PENDING) {
                return;  // Already settled
            }
            state_->state = PromiseState<T>::REJECTED;
            state_->exception = std::move(ex);
            callbacks.swap(state_->on_settled_callbacks);
        }
        state_->cv.notify_all();
        for (auto& cb : callbacks) {
            dispatch(std::move(cb));
        }
    }
};

// PromiseState<void>: like the primary template, but without a value
template <>
struct PromiseState<void> {
    enum State { PENDING, FULFILLED, REJECTED };
    State state = PENDING;
    std::exception_ptr exception;

    std::vector<std::function<void()>> on_settled_callbacks;
    std::mutex mutex;
    std::condition_variable cv;
};

template <>
class Promise<void> {
public:
    using value_type = void;

    Promise() : state_(std::make_shared<PromiseState<void>>()) {}

    void resolve() { settle(PromiseState<void>::FULFILLED, nullptr); }
    void reject(std::exception_ptr ex) { settle(PromiseState<void>::REJECTED, std::move(ex)); }
    template <typename E>
    void reject(const E& ex) { reject(std::make_exception_ptr(ex)); }

    // Same contract as Promise<T>::then, except that on_fulfilled takes no arguments
    template <typename OnFulfilled, typename OnRejected = std::nullptr_t>
    auto then(OnFulfilled on_fulfilled, OnRejected on_rejected = nullptr) {
        using U = next_value_t<OnFulfilled>;
        if constexpr (!std::is_null_pointer_v<OnRejected>) {
            static_assert(std::is_same_v<next_value_t<OnRejected, std::exception_ptr>, U>,
                          "on_rejected must yield the same type as on_fulfilled");
        }
        Promise<U> next;

        std::function<void()> callback =
            [current = state_, next, on_fulfilled = std::move(on_fulfilled),
             on_rejected = std::move(on_rejected)]() mutable {
                bool fulfilled = false;
                std::exception_ptr error;
                {
                    std::lock_guard<std::mutex> lock(current->mutex);
                    fulfilled = (current->state == PromiseState<void>::FULFILLED);
                    error = current->exception;
                }
                if (fulfilled) {
                    settle_with(next, [&] { return on_fulfilled(); });
                } else if constexpr (std::is_null_pointer_v<OnRejected>) {
                    next.reject(error);  // no on_rejected handler: propagate the error
                } else {
                    settle_with(next, [&] { return on_rejected(error); });
                }
            };
        register_callback(std::move(callback));
        return next;
    }

    // Sugar for then(no-op, on_rejected); on_rejected must return void or Promise<void>
    template <typename OnRejected>
    auto catch_error(OnRejected on_rejected) {
        return then([] {}, std::move(on_rejected));
    }

    void get() {
        std::unique_lock<std::mutex> lock(state_->mutex);
        state_->cv.wait(lock, [this] { return state_->state != PromiseState<void>::PENDING; });
        if (state_->state == PromiseState<void>::REJECTED) {
            std::rethrow_exception(state_->exception);
        }
    }

private:
    std::shared_ptr<PromiseState<void>> state_;

    void register_callback(std::function<void()> cb) {
        {
            std::lock_guard<std::mutex> lock(state_->mutex);
            if (state_->state == PromiseState<void>::PENDING) {
                state_->on_settled_callbacks.push_back(std::move(cb));
                return;
            }
        }
        dispatch(std::move(cb));
    }

    // Settle once, with the same locking pattern as Promise<T>::internal_resolve
    void settle(PromiseState<void>::State new_state, std::exception_ptr ex) {
        std::vector<std::function<void()>> callbacks;
        {
            std::lock_guard<std::mutex> lock(state_->mutex);
            if (state_->state != PromiseState<void>::PENDING) return;
            state_->state = new_state;
            state_->exception = std::move(ex);
            callbacks.swap(state_->on_settled_callbacks);
        }
        state_->cv.notify_all();
        for (auto& cb : callbacks) {
            dispatch(std::move(cb));
        }
    }
};

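Key points of the code:

  • PromiseState<T>: a helper struct holding the Promise's actual state, its value or exception, and the callbacks waiting on it. It is shared among Promise instances through std::shared_ptr, so every Promise that refers to the same asynchronous operation sees the same, up-to-date state.
  • std::mutex and std::condition_variable: std::mutex protects every shared member of PromiseState against concurrent access from multiple threads; std::condition_variable is used mainly by get(), letting a thread wait until the Promise settles.
  • resolve(const T& val) and reject(std::exception_ptr ex): these set the Promise's final state. They:
    1. Lock the mutex to modify the state.
    2. Ignore the call if the Promise has already settled.
    3. Store the state and the value or exception.
    4. Notify all waiting threads (via cv.notify_all()).
    5. Dispatch every registered callback for asynchronous execution. This example simply gives each callback a thread of its own; in production, a dedicated thread pool is the better choice, because it bounds resource usage and avoids creating threads without limit.
  • then(OnFulfilled on_fulfilled, OnRejected on_rejected): the heart of the Promise chain.
    1. It always returns a new Promise instance, which is what makes chaining possible.
    2. It registers a lambda (capturing on_fulfilled and on_rejected) in the current Promise's on_settled_callbacks list, or dispatches it immediately if the current Promise has already settled.
    3. The lambda runs once the current Promise settles.
    4. Inside the lambda, on_fulfilled or on_rejected is called, depending on whether the current Promise was Fulfilled or Rejected.
    5. The handler's outcome determines the state of the next Promise:
      • If the handler returns a value, the next Promise is resolved with that value.
      • If the handler returns another Promise (Promise flattening), the next Promise "follows" the state of the returned Promise.
      • If the handler throws, the next Promise is rejected.
      • If no matching handler is provided, the value or exception passes straight through to the next Promise.
  • Promise<void> specialization: asynchronous operations that produce no value (such as writing a file) need a Promise<void> specialization. It behaves the same way but stores no value.
  • is_promise_v: a simplified type trait that checks at compile time whether a type is a Promise<T>. In a real project it would be more involved.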
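The three return-type rules in step 5 can be checked entirely at compile time. The sketch below uses a hypothetical stand-in, `StubPromise<T>`, so that it compiles on its own; `next_value_t` computes the value type of the Promise that `then` would return for a given handler (void stays void, a plain `U` stays `U`, and `StubPromise<U>` is flattened to `U`).

```cpp
#include <string>
#include <type_traits>

// Hypothetical stand-in for Promise<T>; only its type matters here.
template <typename T> struct StubPromise { using value_type = T; };

template <typename X> struct unwrap_promise { using type = X; };
template <typename U> struct unwrap_promise<StubPromise<U>> { using type = U; };

// Value type of the promise that then() would return for this handler.
template <typename Handler, typename... Args>
using next_value_t =
    typename unwrap_promise<std::decay_t<std::invoke_result_t<Handler&, Args...>>>::type;

inline auto returns_void = [](int) {};
inline auto returns_string = [](int x) { return std::to_string(x); };
inline auto returns_promise = [](int) { return StubPromise<double>{}; };

static_assert(std::is_void_v<next_value_t<decltype(returns_void), int>>);                 // void -> void
static_assert(std::is_same_v<next_value_t<decltype(returns_string), int>, std::string>); // U -> U
static_assert(std::is_same_v<next_value_t<decltype(returns_promise), int>, double>);     // Promise<U> -> U
```

Applying `std::decay_t` before unwrapping means a handler that returns a reference still yields a plain value type, which keeps `std::optional<U>` inside the next Promise well-formed.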

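2.3 Thread Safety and Asynchronous Scheduling

Thread safety is one of the key challenges of a C++ Promise. Because a Promise's state and callback queue can be read and modified from several threads (for example, one thread calls resolve while another calls then), the shared data must be protected with a mutex (std::mutex).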
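The locking discipline can be shown in isolation. `OneShotEvent` below is a hypothetical, simplified stand-in for `PromiseState`: shared data is only touched under the mutex, and pending callbacks are moved out while the lock is held but invoked after it is released, so a callback can re-enter the object without deadlocking on a non-recursive `std::mutex`. For brevity it runs callbacks inline rather than on another thread.

```cpp
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical one-shot event: fires at most once; callbacks registered after
// firing run immediately. No user code ever runs while mutex_ is held.
class OneShotEvent {
public:
    void on_fire(std::function<void()> cb) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (!fired_) {
                callbacks_.push_back(std::move(cb));
                return;
            }
        }
        cb();  // already fired: run now, outside the lock
    }

    void fire() {
        std::vector<std::function<void()>> to_run;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (fired_) return;        // settle once
            fired_ = true;
            to_run.swap(callbacks_);   // take ownership under the lock
        }
        for (auto& cb : to_run) cb();  // invoke without the lock
    }

private:
    std::mutex mutex_;
    bool fired_ = false;
    std::vector<std::function<void()>> callbacks_;
};
```

Had `fire()` invoked the callbacks while still holding `mutex_`, a callback that calls `on_fire()` would try to lock the same non-recursive mutex again, which is undefined behavior and in practice a deadlock.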

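How callbacks are scheduled matters just as much. Running callbacks directly on the thread that calls resolve/reject can block that producer thread for a long time, and can deadlock if a callback runs while the Promise's mutex is still held and then calls back into the same Promise. We therefore dispatch callbacks to run asynchronously on another thread; in this example, each callback simply gets a new thread. A more robust solution is a fixed-size thread pool, which avoids the overhead and wasted resources of creating a thread per callback.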
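A fixed-size thread pool can be sketched in a few dozen lines of standard C++. This `ThreadPool` is a hypothetical minimal version (no futures, no work stealing, and a task that throws terminates the program); its `submit` could serve as the dispatch function for settled-promise callbacks instead of starting one thread per callback.

```cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal thread pool: N workers drain a shared FIFO of tasks.
// The destructor lets the workers finish every queued task, then joins them.
class ThreadPool {
public:
    explicit ThreadPool(std::size_t n) {
        for (std::size_t i = 0; i < n; ++i) {
            workers_.emplace_back([this] { worker_loop(); });
        }
    }
    ~ThreadPool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        cv_.notify_all();
        for (auto& t : workers_) t.join();
    }
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    void submit(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        cv_.notify_one();
    }

private:
    void worker_loop() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return stopping_ || !tasks_.empty(); });
                if (tasks_.empty()) return;  // stopping and nothing left to run
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();  // run outside the lock
        }
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> tasks_;
    std::vector<std::thread> workers_;
    bool stopping_ = false;
};
```

The number of threads is now fixed up front, so a burst of settled promises queues work instead of spawning a thread per callback.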

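III. Composing Promises: Building Promise.all

With the basic Promise machinery in place, we can build higher-level combinators. Promise.all is one of the most useful: it takes an array (or iterable) of Promises and returns a new Promise. That Promise is fulfilled once every input Promise is fulfilled, and its result is an array of all the input results, in the same order as the inputs. As soon as any input Promise is rejected, the Promise returned by Promise.all is rejected with the error of that first failing Promise.

3.1 Behavior of Promise.all

Table: Promise.all behavior

Input Promises     | Promise returned by Promise.all | Result
All Fulfilled      | Fulfilled                       | std::vector<T> holding every success value, in input order
Any one Rejected   | Rejected                        | Error reason of the first Promise to reject

3.2 Designing and Implementing Promise::all

Promise::all will be a static method that takes a std::vector<Promise<T>> and returns a Promise<std::vector<T>>.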

// The two static members below belong inside the class definitions shown earlier:
// Promise<T>::all goes into `template <typename T> class Promise`, and
// Promise<void>::all goes into the `template <> class Promise<void>` specialization.

// ---- inside template <typename T> class Promise { public: ----
    // Static factory: fulfils with every value (in input order) or rejects with the first error
    static Promise<std::vector<T>> all(std::vector<Promise<T>> promises) {
        Promise<std::vector<T>> aggregated_promise;  // the promise handed back to the caller

        if (promises.empty()) {
            // No inputs: resolve immediately with an empty vector
            aggregated_promise.resolve({});
            return aggregated_promise;
        }

        // Shared bookkeeping for the aggregation, guarded by its own mutex
        struct AllState {
            std::mutex mutex;
            std::size_t pending_count = 0;
            std::vector<std::optional<T>> results;  // optional: "not yet set" vs. a real value
            bool rejected_early = false;            // set by the first rejection
        };
        auto all_state = std::make_shared<AllState>();
        all_state->pending_count = promises.size();
        all_state->results.resize(promises.size());

        for (std::size_t i = 0; i < promises.size(); ++i) {
            promises[i].then(
                // OnFulfilled: store the value at its input index
                [i, all_state, aggregated_promise](const T& val) mutable {
                    std::vector<T> final_results;
                    {
                        std::lock_guard<std::mutex> lock(all_state->mutex);
                        if (all_state->rejected_early) return;  // already rejected: ignore
                        all_state->results[i] = val;
                        if (--all_state->pending_count != 0) return;
                        // Last one in: every slot is engaged, so *opt is safe
                        final_results.reserve(all_state->results.size());
                        for (auto& opt : all_state->results) {
                            final_results.push_back(std::move(*opt));
                        }
                    }
                    aggregated_promise.resolve(final_results);
                },
                // OnRejected: the first rejection wins; later outcomes are ignored
                [all_state, aggregated_promise](std::exception_ptr ex) mutable {
                    {
                        std::lock_guard<std::mutex> lock(all_state->mutex);
                        if (all_state->rejected_early) return;
                        all_state->rejected_early = true;
                    }
                    aggregated_promise.reject(ex);
                });
        }
        return aggregated_promise;
    }
// ---- end of additions to Promise<T> ----

// ---- inside template <> class Promise<void> { public: ----
    static Promise<void> all(std::vector<Promise<void>> promises) {
        Promise<void> aggregated_promise;

        if (promises.empty()) {
            aggregated_promise.resolve();
            return aggregated_promise;
        }

        struct AllState {
            std::mutex mutex;
            std::size_t pending_count = 0;
            bool rejected_early = false;
        };
        auto all_state = std::make_shared<AllState>();
        all_state->pending_count = promises.size();

        for (auto& p : promises) {
            p.then(
                // OnFulfilled (no value): count down, resolve when the last one arrives
                [all_state, aggregated_promise]() mutable {
                    {
                        std::lock_guard<std::mutex> lock(all_state->mutex);
                        if (all_state->rejected_early || --all_state->pending_count != 0) return;
                    }
                    aggregated_promise.resolve();
                },
                // OnRejected: the first rejection wins
                [all_state, aggregated_promise](std::exception_ptr ex) mutable {
                    {
                        std::lock_guard<std::mutex> lock(all_state->mutex);
                        if (all_state->rejected_early) return;
                        all_state->rejected_early = true;
                    }
                    aggregated_promise.reject(ex);
                });
        }
        return aggregated_promise;
    }
// ---- end of additions to Promise<void> ----

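Promise::all implementation notes:

  1. Returning the aggregate Promise: Promise::all immediately returns a new Promise<std::vector<T>> (or Promise<void>) that represents the combined result of all the input Promises.
  2. The AllState struct: we define an internal AllState struct to hold the state that Promise.all's aggregation logic needs:
    • pending_count: how many input Promises have not yet settled.
    • results: a std::vector<std::optional<T>> that stores each successful result at its input index. std::optional matters here because it distinguishes a slot that is still "unset" from one filled with a default-constructed value (when T is default-constructible at all).
    • rejected_early: a boolean flag that, once any Promise has been rejected, stops further fulfillments from being processed.
  3. Attaching then handlers in a loop: we attach a then handler to every Promise in the promises vector.
    • Fulfillment handler (on_fulfilled): when an input Promise is fulfilled, its result is stored at the matching position in all_state->results and pending_count is decremented. When pending_count reaches zero, every Promise has succeeded, and aggregated_promise is resolved with the collected vector of results.
    • Rejection handler (on_rejected): when an input Promise is rejected, rejected_early is set to true and aggregated_promise is rejected immediately with that Promise's exception. Any later fulfillment or rejection is ignored.
  4. Thread safety: every member of AllState is protected by a std::mutex, because several then callbacks may run concurrently on different threads and modify pending_count, results, and rejected_early at the same time.
  5. Empty input: if the promises vector is empty, Promise::all immediately returns an already-fulfilled Promise whose result is an empty vector.
  6. Promise<void>::all: like the Promise<T> version, but it stores no result values and only tracks how many inputs have completed.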
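The AllState bookkeeping can be exercised without any threads. `AllAggregator<T>` below is a hypothetical single-threaded model of the logic described above (the real version needs the mutex from point 4): results are stored by input index, the aggregate is fulfilled only when the last slot is filled, and only the first rejection is reported.

```cpp
#include <cstddef>
#include <exception>
#include <functional>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical single-threaded model of Promise::all's AllState.
template <typename T>
class AllAggregator {
public:
    using OnDone = std::function<void(std::vector<T>)>;
    using OnFail = std::function<void(std::exception_ptr)>;

    AllAggregator(std::size_t n, OnDone done, OnFail fail)
        : pending_(n), results_(n), done_(std::move(done)), fail_(std::move(fail)) {
        if (n == 0) done_({});  // empty input: fulfil immediately with an empty vector
    }

    // Input `index` was fulfilled with `value`.
    void fulfilled(std::size_t index, T value) {
        if (rejected_ || pending_ == 0) return;  // aggregate already settled
        results_[index] = std::move(value);
        if (--pending_ == 0) {
            std::vector<T> out;
            out.reserve(results_.size());
            for (auto& r : results_) out.push_back(std::move(*r));  // every slot is set
            done_(std::move(out));
        }
    }

    // Some input was rejected; only the first rejection is forwarded.
    void rejected(std::exception_ptr ex) {
        if (rejected_ || pending_ == 0) return;
        rejected_ = true;
        fail_(std::move(ex));
    }

private:
    std::size_t pending_;
    std::vector<std::optional<T>> results_;
    bool rejected_ = false;
    OnDone done_;
    OnFail fail_;
};
```

Storing by index rather than appending is what keeps the output in input order even when the inputs complete in a different order.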

3.3 Example Usage

#include <chrono>
#include <iostream>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

// Assumes the Promise and PromiseState code above, including both all() members

// Helper function to simulate an async operation
Promise<int> async_add(int a, int b, int delay_ms, bool should_fail = false) {
    Promise<int> p;
    std::thread([=, captured_p = p]() mutable { // Capture p by value to extend its lifetime
        std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
        if (should_fail) {
            captured_p.reject(std::runtime_error("Failed to add!"));
        } else {
            captured_p.resolve(a + b);
        }
    }).detach(); // Detach the thread, it will run independently
    return p;
}

Promise<std::string> async_fetch_data(const std::string& url, int delay_ms, bool should_fail = false) {
    Promise<std::string> p;
    std::thread([=, captured_p = p]() mutable {
        std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
        if (should_fail) {
            captured_p.reject(std::runtime_error("Network error fetching " + url));
        } else {
            captured_p.resolve("Data from " + url);
        }
    }).detach();
    return p;
}

Promise<void> async_log(const std::string& message, int delay_ms) {
    Promise<void> p;
    std::thread([=, captured_p = p]() mutable {
        std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
        std::cout << "[LOG] " << message << std::endl;
        captured_p.resolve();
    }).detach();
    return p;
}

int main() {
    std::cout << "--- Promise Chaining Example ---" << std::endl;
    async_add(5, 3, 1000) // Returns Promise<int>
        .then([](int result) {
            std::cout << "First add result: " << result << std::endl; // result is 8
            return async_add(result, 10, 500); // Returns Promise<int>
        })
        .then([](int result) {
            std::cout << "Second add result: " << result << std::endl; // result is 18
            return async_fetch_data("api.example.com/data", 700); // Returns Promise<string>
        })
        .then([](const std::string& data) {
            std::cout << "Fetched data: " << data << std::endl;
            return async_log("Data processed successfully!", 200); // Returns Promise<void>
        })
        .then([]() { // For Promise<void>, the fulfilled handler takes no arguments
            std::cout << "All operations completed!" << std::endl;
        })
        .catch_error([](std::exception_ptr ex) {
            try {
                std::rethrow_exception(ex);
            } catch (const std::exception& e) {
                std::cerr << "Caught error in chain: " << e.what() << std::endl;
            }
        });

    std::cout << "\n--- Promise.all Example (Success) ---" << std::endl;
    std::vector<Promise<int>> int_promises;
    int_promises.push_back(async_add(1, 2, 100));
    int_promises.push_back(async_add(3, 4, 300));
    int_promises.push_back(async_add(5, 6, 200));

    Promise<int>::all(std::move(int_promises))
        .then([](const std::vector<int>& results) {
            std::cout << "All int promises fulfilled. Results: ";
            for (int r : results) {
                std::cout << r << " ";
            }
            std::cout << std::endl;
        })
        .catch_error([](std::exception_ptr ex) {
            try {
                std::rethrow_exception(ex);
            } catch (const std::exception& e) {
                std::cerr << "Promise.all(int) failed: " << e.what() << std::endl;
            }
        });

    std::cout << "\n--- Promise.all Example (Failure) ---" << std::endl;
    std::vector<Promise<std::string>> string_promises;
    string_promises.push_back(async_fetch_data("url1", 500));
    string_promises.push_back(async_fetch_data("url2", 100, true)); // This one will fail
    string_promises.push_back(async_fetch_data("url3", 800));

    Promise<std::string>::all(std::move(string_promises))
        .then([](const std::vector<std::string>& results) {
            std::cout << "All string promises fulfilled. This should not happen." << std::endl;
        })
        .catch_error([](std::exception_ptr ex) {
            try {
                std::rethrow_exception(ex);
            } catch (const std::exception& e) {
                std::cerr << "Promise.all(string) failed as expected: " << e.what() << std::endl;
            }
        });

    std::cout << "\n--- Promise.all Example (void) ---" << std::endl;
    std::vector<Promise<void>> void_promises;
    void_promises.push_back(async_log("Log message 1", 400));
    void_promises.push_back(async_log("Log message 2", 200));
    void_promises.push_back(async_log("Log message 3", 600));

    Promise<void>::all(std::move(void_promises))
        .then([]() {
            std::cout << "All void promises completed successfully." << std::endl;
        })
        .catch_error([](std::exception_ptr ex) {
            try {
                std::rethrow_exception(ex);
            } catch (const std::exception& e) {
                std::cerr << "Promise.all(void) failed: " << e.what() << std::endl;
            }
        });

    // Keep main thread alive long enough for async operations to complete
    std::this_thread::sleep_for(std::chrono::seconds(5));
    std::cout << "Main finished." << std::endl;
    return 0;
}

IV. Advanced Considerations and Optimizations

4.1 Executors / Schedulers

In our example, callbacks were run asynchronously simply by calling std::async(std::launch::async, cb). This is convenient, but it is not a good fit for production:

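  • Each std::async call may create a new thread, so thread creation and teardown are expensive and resources are hard to manage.
  • There is no control over the degree of concurrency, which can overload the system.
  • If the std::future returned by std::async(std::launch::async, ...) is discarded, its destructor blocks until the task finishes, so a "fire-and-forget" call quietly becomes synchronous.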

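A better approach is to introduce an Executor (also called a Scheduler): a component that owns a thread pool and accepts tasks for it. The resolve/reject methods hand the callback tasks to the executor, which decides when and on which thread they run.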

// Example of a simple Executor interface
class Executor {
public:
    virtual ~Executor() = default;
    virtual void post(std::function<void()> task) = 0;
};

// A simple thread pool executor (simplified for brevity)
class ThreadPoolExecutor : public Executor {
    // ... implementation with std::thread, std::queue, std::condition_variable ...
public:
    void post(std::function<void()> task) override {
        // Add task to queue and notify worker thread
    }
};

// Then, in Promise, pass an Executor instance:
// Promise::resolve(const T& val, Executor& executor) {
//     ...
//     executor.post(cb);
//     ...
// }

This centralizes the execution of asynchronous tasks and gives us better efficiency and control.
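For reference, here is a minimal sketch of what the elided ThreadPoolExecutor body might look like, assuming a fixed number of workers that share one FIFO queue guarded by a mutex and a condition variable. The Executor interface is repeated so the block compiles on its own; the worker count, the drain-on-destruction policy, and the helper sum_on_pool are illustrative choices, not part of the original design.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Same interface as the Executor above, repeated so this block compiles alone.
class Executor {
public:
    virtual ~Executor() = default;
    virtual void post(std::function<void()> task) = 0;
};

// Sketch: N worker threads pull tasks from a shared FIFO queue.
// Tasks must not throw here; a real Promise would catch and reject instead.
class ThreadPoolExecutor : public Executor {
public:
    explicit ThreadPoolExecutor(std::size_t n) {
        for (std::size_t i = 0; i < n; ++i)
            workers_.emplace_back([this] { run(); });
    }

    ~ThreadPoolExecutor() override {
        {
            std::lock_guard<std::mutex> lock(mu_);
            stopping_ = true;
        }
        cv_.notify_all();
        for (auto& t : workers_) t.join();  // workers drain the queue before exiting
    }

    void post(std::function<void()> task) override {
        {
            std::lock_guard<std::mutex> lock(mu_);
            tasks_.push(std::move(task));
        }
        cv_.notify_one();  // wake one idle worker
    }

private:
    void run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mu_);
                cv_.wait(lock, [this] { return stopping_ || !tasks_.empty(); });
                if (stopping_ && tasks_.empty()) return;
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();  // run outside the lock so other workers keep going
        }
    }

    std::mutex mu_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> tasks_;
    std::vector<std::thread> workers_;
    bool stopping_ = false;
};

// Usage: post n small tasks; the pool's destructor waits until all have run.
int sum_on_pool(int n) {
    std::atomic<int> sum{0};
    {
        ThreadPoolExecutor pool(4);
        for (int i = 1; i <= n; ++i) pool.post([&sum, i] { sum += i; });
    }
    return sum.load();
}
```

Running each task outside the lock keeps one slow callback from blocking post() or the other workers, and because the destructor drains the queue before joining, callbacks posted by resolve/reject are not silently dropped at shutdown.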

4.2 Fine-Grained Error-Handling Chains

The Promise error-handling model is very expressive. The catch_error method is, in effect, syntactic sugar for then(nullptr, on_rejected). An on_rejected callback can:

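  • Recover: if on_rejected runs successfully and returns a value (or a fulfilled Promise), the rest of the chain continues from that value as if the earlier error had never happened.
  • Propagate: if no on_rejected is provided, or if it throws again (or returns a rejected Promise), the error keeps flowing down the chain until it reaches the next on_rejected callback.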

This lets us handle errors in one central place, or recover locally at different stages of the chain.
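The recover/propagate rules can be checked without any threads. The following sketch uses a hypothetical Settled<T>, a synchronous stand-in for an already-settled Promise<T> that stores either a value or a std::exception_ptr. It models only how then and catch_error route values and errors; it is not the Promise class from this article.

```cpp
#include <exception>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <utility>

// Hypothetical synchronous stand-in for an already-settled Promise<T>.
template <typename T>
class Settled {
public:
    static Settled fulfilled(T v) { Settled s; s.value_ = std::move(v); s.ok_ = true; return s; }
    static Settled rejected(std::exception_ptr e) { Settled s; s.error_ = std::move(e); return s; }

    // then: runs f only on success; an exception thrown by f becomes a rejection.
    // On failure f is skipped and the same error is passed along (propagation).
    template <typename F>
    auto then(F f) const -> Settled<std::invoke_result_t<F, const T&>> {
        using U = std::invoke_result_t<F, const T&>;
        if (!ok_) return Settled<U>::rejected(error_);
        try { return Settled<U>::fulfilled(f(value_)); }
        catch (...) { return Settled<U>::rejected(std::current_exception()); }
    }

    // catch_error: runs h only on failure. Returning a T recovers the chain;
    // throwing (e.g. rethrowing) keeps it rejected. On success the value passes through.
    template <typename H>
    Settled catch_error(H h) const {
        if (ok_) return *this;
        try { return fulfilled(h(error_)); }
        catch (...) { return rejected(std::current_exception()); }
    }

    bool ok() const { return ok_; }
    const T& value() const { return value_; }
    std::exception_ptr error() const { return error_; }

private:
    Settled() = default;
    T value_{};  // sketch limitation: T must be default-constructible
    std::exception_ptr error_;
    bool ok_ = false;
};

// Recover: the handler supplies a fallback, so the following then() runs.
Settled<int> recover_demo() {
    return Settled<int>::rejected(std::make_exception_ptr(std::runtime_error("boom")))
        .catch_error([](std::exception_ptr) { return 0; })
        .then([](const int& v) { return v + 1; });
}

// Propagate: then() is skipped, the handler rethrows, and the chain stays rejected.
Settled<int> propagate_demo() {
    return Settled<int>::rejected(std::make_exception_ptr(std::runtime_error("boom")))
        .then([](const int& v) { return v * 2; })
        .catch_error([](std::exception_ptr e) -> int { std::rethrow_exception(e); });
}

// Helper to read the message of a rejected result.
std::string error_message(const Settled<int>& s) {
    if (s.ok()) return "";
    try { std::rethrow_exception(s.error()); }
    catch (const std::exception& e) { return e.what(); }
    catch (...) { return "unknown error"; }
}
```

In recover_demo the chain ends fulfilled with 1 (fallback 0, then + 1); in propagate_demo the doubling step never runs and the original "boom" error reaches the end of the chain.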

4.3 Generic Promise.all (Heterogeneous Types)

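Our current Promise::all requires every input Promise to have the same type T. Aggregating Promises of different types (for example Promise<int> and Promise<std::string>) is more complicated.
One solution is to store the results as std::any: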

// Example: Promise<std::any>::all for heterogeneous types
static Promise<std::vector<std::any>> all_heterogeneous(std::vector<Promise<std::any>> promises) {
    // ... similar logic as Promise<T>::all, but store std::any values
    // This requires input promises to already be Promise<std::any>
    // Or, a more complex template that converts input Promise<T> to Promise<std::any>
}

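This requires a type conversion (std::any_cast) whenever a result is taken out of std::any, which adds runtime overhead and the risk of type errors that surface only at run time. When the results are logically homogeneous but differ slightly in type, a more common and type-safe approach is to map them to a common base class or interface.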
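A small sketch of the std::any approach, with std::future<T> standing in for Promise<T> so the block compiles on its own. The helper names wait_all_as_any, holds, and demo_any are hypothetical. Note how reading a result back forces the caller to restate its type, and a wrong guess surfaces only at run time (std::any_cast throws std::bad_any_cast, or its pointer form returns nullptr).

```cpp
#include <any>
#include <cstddef>
#include <future>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper: std::future<T> stands in for Promise<T>.
// Waits for every input in argument order and type-erases each result into std::any.
template <typename... Ts>
std::vector<std::any> wait_all_as_any(std::future<Ts>... futures) {
    // Elements of a braced-init-list are evaluated left to right, so order is preserved.
    return {std::any(futures.get())...};
}

// Hypothetical check: the caller must restate the expected type at run time.
template <typename T>
bool holds(const std::vector<std::any>& results, std::size_t i) {
    // The pointer overload of std::any_cast returns nullptr instead of throwing.
    return std::any_cast<T>(&results.at(i)) != nullptr;
}

// Usage: aggregate an int and a std::string produced on other threads.
std::vector<std::any> demo_any() {
    auto a = std::async(std::launch::async, [] { return 42; });
    auto b = std::async(std::launch::async, [] { return std::string("hello"); });
    return wait_all_as_any(std::move(a), std::move(b));
}
```

For example, std::any_cast<int>(results[0]) succeeds for the int result, while std::any_cast<double>(results[0]) would throw std::bad_any_cast; the compiler cannot catch that mistake.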

4.4 Resource Management and Lifetimes

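In C++, std::shared_ptr is the key to managing the lifetime of a Promise's shared state. When a Promise is chained or passed around, shared_ptr guarantees that the state is not destroyed as long as any Promise instance still needs it. This rules out dangling pointers and, as long as there are no cycles, memory leaks. Overusing shared_ptr, however, can create reference cycles (for example, a callback stored in a state that captures a shared_ptr to that same state), and such a cycle keeps the state alive forever.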
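The cycle problem can be reproduced in a few lines. In this sketch, a hypothetical State struct plays the role of a Promise's shared state and counts its live instances. A callback that captures a shared_ptr to the state that stores it keeps that state alive forever; capturing a weak_ptr and calling lock() when the callback runs breaks the cycle.

```cpp
#include <functional>
#include <memory>

// Hypothetical stand-in for a Promise's shared state; `alive` counts live instances.
struct State {
    inline static int alive = 0;
    std::function<void()> callback;
    State() { ++alive; }
    ~State() { --alive; }
};

// Leaks on purpose: the stored callback owns a shared_ptr to the state that stores it.
void make_cycle() {
    auto s = std::make_shared<State>();
    s->callback = [s] { (void)s; };
}  // use_count drops from 2 to 1, never to 0

// No leak: the callback only observes the state through a weak_ptr.
void make_weak_callback() {
    auto s = std::make_shared<State>();
    std::weak_ptr<State> w = s;
    s->callback = [w] {
        if (auto locked = w.lock()) {
            // the state is still alive while `locked` is held
        }
    };
}  // the last shared_ptr goes away, so State is destroyed
```

After make_weak_callback() returns, State::alive is back to 0; after make_cycle() it stays at 1, because nothing outside the cycle can ever release that state.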

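In our implementation, std::thread::detach() makes each worker thread independent of the thread that created it. As a result, detached threads may still be running when the main thread exits, and if they touch resources that have already been destroyed, the behavior is undefined. In real applications, use std::jthread (C++20) or a custom thread pool to manage thread lifetimes, so that every worker thread finishes or stops cleanly before the program exits.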

V. Outlook: C++ Coroutines and the Future of Asynchronous Programming

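Powerful and flexible as our Promise library is, it is still built on callbacks and lambda expressions, which can make the code's structure fairly convoluted. Coroutines, introduced in C++20, bring a more fundamental change to asynchronous programming.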

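Coroutines let us write asynchronous code that reads like synchronous code, suspending and resuming a function's execution with the co_await, co_yield, and co_return keywords. A Promise library can serve as the underlying machinery for coroutines, mapping suspension and resumption onto Promise state changes.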

For example, a coroutine-based asynchronous function might look like this:

// Imaginary C++20+ Coroutine example
// (Requires a coroutine-enabled Promise/Future library)
Promise<int> co_async_sum(int a, int b) {
    int res1 = co_await async_add(a, b, 1000);
    int res2 = co_await async_add(res1, 5, 500);
    co_return res2;
}

// Usage:
co_async_sum(1, 2).then([](int final_result){
    std::cout << "Coroutine sum: " << final_result << std::endl;
});

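This style greatly improves the readability and maintainability of asynchronous code and is the direction C++ asynchronous programming is heading. Even so, the underlying Promise mechanism remains the foundation on which these higher-level abstractions rest.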

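Building Continuable Promises, and especially combinators such as Promise.all, is a key step toward mastering modern asynchronous programming. It teaches us how to manage the lifecycle of asynchronous operations, pass results along, and recover from errors cleanly. With careful thread-safety design and flexible use of C++ language features, we can build in C++ an asynchronous programming model as powerful and expressive as those of JavaScript and other languages. As the C++ standard continues to evolve, and with coroutines in particular, asynchronous programming in C++ will keep getting smoother and more efficient.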
