各位同仁,大家好。今天我们将深入探讨一个在现代异步编程中至关重要的概念——“Continuable Promises”,并以C++为舞台,构建一个类似JavaScript中强大的Promise.all组合子。作为一名编程专家,我将以讲座的形式,逐步解析其设计理念、实现细节以及在C++复杂环境下的考量。
一、 承诺的本质:异步操作的优雅管理
在进入“Continuable Promises”的具体实现之前,我们首先要理解“Promise”这一概念的核心。在传统的同步编程中,当一个函数被调用时,它会立即执行并返回结果。然而,在许多现代应用场景中,我们不得不面对耗时操作,如网络请求、文件I/O或复杂的计算。这些操作如果同步执行,会阻塞主线程,导致用户界面卡顿甚至程序无响应。
异步编程应运而生,它允许我们在后台执行这些耗时操作,并在操作完成时通知我们结果。但传统的异步编程往往伴随着“回调地狱”(Callback Hell),即多层嵌套的回调函数导致代码难以阅读、维护和错误处理。
Promise(承诺)正是为了解决这些问题而诞生的。它代表了一个异步操作的最终结果,这个结果可能在未来某个时间点可用,也可能永远不会可用(因为操作失败)。一个Promise对象有以下几个核心特性:
-
状态(State):
- Pending (待定):初始状态,既没有成功,也没有失败。
- Fulfilled (已成功):操作成功完成,并返回了一个结果值。
- Rejected (已失败):操作失败,并返回了一个错误原因。
一旦一个Promise进入Fulfilled或Rejected状态,它就变成了Settled (已落定)状态,且其状态不可再改变。
-
值(Value)或原因(Reason):
- 当Promise Fulfilled时,它会带上一个成功的值。
- 当Promise Rejected时,它会带上一个失败的原因(通常是一个异常对象)。
-
链式调用(Chaining):
Promise最强大的特性之一是其可链式调用。通过then方法,我们可以在一个Promise完成之后,执行另一个操作,这个操作本身也可以返回一个新的Promise,从而形成一个异步操作序列。
Continuable Promises,顾名思义,就是那些可以被“延续”的Promise。它们允许我们指定当Promise成功或失败时应该执行哪些后续操作,并且这些后续操作的结果又可以生成新的Promise,形成一个无缝的异步工作流。这正是我们今天构建C++ Promise库的核心目标。
二、 C++ Promise 的基础构建:状态、值与回调机制
为了在C++中构建Continuable Promises,我们需要一个核心的Promise类。这个类需要管理Promise的状态、存储结果或异常,并提供注册回调函数的机制。
2.1 Promise 状态和数据结构
我们首先定义Promise的状态。为了通用性,Promise应该是一个模板类,可以处理任何类型的结果。
#include <functional>
#include <memory>
#include <mutex>
#include <vector>
#include <exception>
#include <optional>
#include <future> // For std::async, though a custom thread pool is better for production
// Forward declaration for mutual dependency
template <typename T> class Promise;
// Type erasure for callbacks returning void
template <typename R>
struct OnFulfilledCallback {
std::function<void(R)> func;
};
// Type erasure for callbacks returning Promise<U>
template <typename R, typename U>
struct OnFulfilledCallbackPromise {
std::function<Promise<U>(R)> func;
};
// Type erasure for error callbacks
struct OnRejectedCallback {
std::function<void(std::exception_ptr)> func;
};
// Internal state of a promise, shared across copies
template <typename T>
struct PromiseState {
enum State { PENDING, FULFILLED, REJECTED };
State state = PENDING;
std::optional<T> value; // Using std::optional to hold value only if fulfilled
std::exception_ptr exception; // Holds exception if rejected
// Callbacks waiting for this promise to settle
// Using std::vector<std::function<void()>> to store type-erased callbacks
// These callbacks will trigger the resolution/rejection of the *next* promise in the chain
std::vector<std::function<void()>> on_settled_callbacks;
std::mutex mutex; // Protects state, value, exception, and callback vector
std::condition_variable cv; // For synchronous waiting (e.g., in a `get()` method, not part of core promise chaining)
// For `Promise<void>` specialization
// This is a common pattern: `Promise<void>` doesn't hold a value.
// For simplicity, we'll keep `std::optional<T>` and just ignore `value` for `void`.
// A full implementation would specialize `PromiseState<void>`.
};
表格:Promise 状态
| 状态 | 描述 | 结果 |
|---|---|---|
PENDING |
初始状态,操作正在进行中 | 无 |
FULFILLED |
操作成功完成 | 成功值 T |
REJECTED |
操作失败,通常带有错误信息 exception_ptr |
错误原因 std::exception_ptr |
2.2 Promise 类结构
现在我们来定义Promise类。为了允许多个Promise对象引用同一个异步操作的结果(例如,当一个Promise被传递给多个then调用时),我们需要使用std::shared_ptr来管理其内部状态。
template <typename T>
class Promise {
private:
std::shared_ptr<PromiseState<T>> state_;
// Private constructor for internal use (e.g., by then() method)
Promise(std::shared_ptr<PromiseState<T>> state) : state_(std::move(state)) {}
// Helper to resolve/reject a promise, handling state transitions and callbacks
void internal_resolve(const T& val) {
{
std::lock_guard<std::mutex> lock(state_->mutex);
if (state_->state != PromiseState<T>::PENDING) {
return; // Already settled
}
state_->state = PromiseState<T>::FULFILLED;
state_->value = val;
}
state_->cv.notify_all(); // Notify any threads waiting on `get()`
// Execute callbacks asynchronously to avoid blocking the resolver
for (const auto& cb : state_->on_settled_callbacks) {
// In a real system, this would dispatch to a thread pool
// For this example, we'll use std::async for simplicity,
// but beware of its limitations (e.g., potential for new threads per callback)
std::async(std::launch::async, cb);
}
state_->on_settled_callbacks.clear(); // Callbacks consumed
}
void internal_reject(std::exception_ptr ex) {
{
std::lock_guard<std::mutex> lock(state_->mutex);
if (state_->state != PromiseState<T>::PENDING) {
return; // Already settled
}
state_->state = PromiseState<T>::REJECTED;
state_->exception = std::move(ex);
}
state_->cv.notify_all();
for (const auto& cb : state_->on_settled_callbacks) {
std::async(std::launch::async, cb);
}
state_->on_settled_callbacks.clear();
}
public:
// Default constructor for an unresolved promise
Promise() : state_(std::make_shared<PromiseState<T>>()) {}
// Public method to resolve the promise
void resolve(const T& val) {
internal_resolve(val);
}
// Public method to reject the promise
void reject(std::exception_ptr ex) {
internal_reject(ex);
}
// Overload for rejecting with an exception object
template <typename E>
void reject(const E& ex) {
internal_reject(std::make_exception_ptr(ex));
}
// `then` method for chaining
// This is where the "continuable" aspect comes in.
// It returns a *new* promise, which will be resolved/rejected based on the callback's result.
template <typename OnFulfilled, typename OnRejected = std::function<void(std::exception_ptr)>>
auto then(OnFulfilled on_fulfilled, OnRejected on_rejected = nullptr) {
using ReturnType = decltype(on_fulfilled(std::declval<T>()));
// If OnFulfilled returns void, the next promise is Promise<void>
// If OnFulfilled returns U, the next promise is Promise<U>
// If OnFulfilled returns Promise<U>, the next promise is Promise<U>
// This requires careful SFINAE or template metaprogramming for robust return type deduction.
// For simplicity in this lecture, let's assume `on_fulfilled` returns a value `U` or `Promise<U>`.
// Let's assume on_fulfilled returns a plain value U for now,
// and we will handle Promise<U> return type later for full spec compliance.
// The most general case is that `then` returns a `Promise<U>` where `U` is the result of `on_fulfilled`.
using NewPromiseValueType = typename std::conditional<
std::is_same_v<ReturnType, void>,
void,
typename std::conditional<
is_promise_v<ReturnType>, // Custom type trait to check if ReturnType is Promise<U>
typename ReturnType::value_type, // If Promise<U>, extract U
ReturnType // If U, then U
>::type
>::type;
// Custom type trait for checking if a type is a Promise
// This is a simplified version. A robust one would check for Promise<T> templates.
template <typename U>
struct is_promise : std::false_type {};
template <typename U>
struct is_promise<Promise<U>> : std::true_type {};
template <typename U>
inline constexpr bool is_promise_v = is_promise<U>::value;
// The new promise that this `then` call will return
auto next_promise = Promise<NewPromiseValueType>();
// We need to capture `state_` by value (shared_ptr) to ensure it lives
// as long as the lambda callback might be executed.
auto current_state = state_;
// The actual logic of `then`
// Register a callback on the current promise's state.
// This callback will be executed when the current promise settles.
std::function<void()> callback = [
current_state,
next_promise_state = next_promise.state_, // Capture the state of the next promise
on_fulfilled_copy = std::move(on_fulfilled),
on_rejected_copy = std::move(on_rejected)
]() mutable {
std::lock_guard<std::mutex> lock(current_state->mutex); // Protect current_state access
if (current_state->state == PromiseState<T>::FULFILLED) {
if (on_fulfilled_copy) {
try {
// Execute on_fulfilled and resolve/reject the next promise based on its result
// This part needs careful handling of return types (value vs. Promise)
// For simplicity, let's assume on_fulfilled returns a value U or void for now
if constexpr (std::is_void_v<NewPromiseValueType>) {
// If on_fulfilled returns void
on_fulfilled_copy(current_state->value.value());
Promise<void>(next_promise_state).resolve(); // Resolve the next void promise
} else if constexpr (is_promise_v<ReturnType>) {
// If on_fulfilled returns Promise<U>
ReturnType inner_promise = on_fulfilled_copy(current_state->value.value());
// Link the inner_promise to next_promise
inner_promise.then(
[next_promise_state](const typename ReturnType::value_type& val) {
Promise<typename ReturnType::value_type>(next_promise_state).resolve(val);
},
[next_promise_state](std::exception_ptr ex) {
Promise<typename ReturnType::value_type>(next_promise_state).reject(ex);
}
);
} else {
// If on_fulfilled returns a value U
next_promise.resolve(on_fulfilled_copy(current_state->value.value()));
}
} catch (...) {
next_promise.reject(std::current_exception());
}
} else {
// No on_fulfilled handler, propagate value to next promise
next_promise.resolve(current_state->value.value());
}
} else if (current_state->state == PromiseState<T>::REJECTED) {
if (on_rejected_copy) {
try {
// An on_rejected handler can "recover" from an error by returning a value (or void)
// If it throws, the error propagates.
// If it returns a value, the next promise is resolved.
// Here, we assume on_rejected returns void or a value that resolves the next promise.
// For a full spec, on_rejected can return Promise<U> too.
if constexpr (std::is_void_v<NewPromiseValueType>) {
on_rejected_copy(current_state->exception);
Promise<void>(next_promise_state).resolve();
} else if constexpr (is_promise_v<ReturnType>) {
ReturnType inner_promise = on_rejected_copy(current_state->exception);
inner_promise.then(
[next_promise_state](const typename ReturnType::value_type& val) {
Promise<typename ReturnType::value_type>(next_promise_state).resolve(val);
},
[next_promise_state](std::exception_ptr ex) {
Promise<typename ReturnType::value_type>(next_promise_state).reject(ex);
}
);
} else {
next_promise.resolve(on_rejected_copy(current_state->exception)); // This implies on_rejected returns T
}
} catch (...) {
next_promise.reject(std::current_exception());
}
} else {
// No on_rejected handler, propagate error to next promise
next_promise.reject(current_state->exception);
}
}
};
// Add the callback to the current promise's state
{
std::lock_guard<std::mutex> lock(current_state->mutex);
if (current_state->state == PromiseState<T>::PENDING) {
current_state->on_settled_callbacks.push_back(std::move(callback));
} else {
// If the promise is already settled, execute callback immediately (asynchronously)
std::async(std::launch::async, std::move(callback));
}
}
return next_promise;
}
// `catch` method for error handling (syntactic sugar for then(nullptr, on_rejected))
template <typename OnRejected>
auto catch_error(OnRejected on_rejected) {
return then(nullptr, on_rejected);
}
// Synchronous wait for result (discouraged in async patterns, but useful for testing/blocking scenarios)
T get() {
std::unique_lock<std::mutex> lock(state_->mutex);
state_->cv.wait(lock, [this]{ return state_->state != PromiseState<T>::PENDING; });
if (state_->state == PromiseState<T>::FULFILLED) {
return state_->value.value();
} else {
std::rethrow_exception(state_->exception);
}
}
// Specialization for Promise<void>
void resolve() requires std::is_void_v<T> {
internal_resolve(); // Needs a void-specific internal_resolve
}
void get() requires std::is_void_v<T> {
std::unique_lock<std::mutex> lock(state_->mutex);
state_->cv.wait(lock, [this]{ return state_->state != PromiseState<T>::PENDING; });
if (state_->state == PromiseState<T>::REJECTED) {
std::rethrow_exception(state_->exception);
}
}
};
// Specialization for Promise<void> (simplified, would ideally be a full specialization)
template <>
struct PromiseState<void> {
enum State { PENDING, FULFILLED, REJECTED };
State state = PENDING;
std::exception_ptr exception;
std::vector<std::function<void()>> on_settled_callbacks;
std::mutex mutex;
std::condition_variable cv;
};
template <>
class Promise<void> {
private:
std::shared_ptr<PromiseState<void>> state_;
Promise(std::shared_ptr<PromiseState<void>> state) : state_(std::move(state)) {}
void internal_resolve() {
{
std::lock_guard<std::mutex> lock(state_->mutex);
if (state_->state != PromiseState<void>::PENDING) return;
state_->state = PromiseState<void>::FULFILLED;
}
state_->cv.notify_all();
for (const auto& cb : state_->on_settled_callbacks) {
std::async(std::launch::async, cb);
}
state_->on_settled_callbacks.clear();
}
void internal_reject(std::exception_ptr ex) {
{
std::lock_guard<std::mutex> lock(state_->mutex);
if (state_->state != PromiseState<void>::PENDING) return;
state_->state = PromiseState<void>::REJECTED;
state_->exception = std::move(ex);
}
state_->cv.notify_all();
for (const auto& cb : state_->on_settled_callbacks) {
std::async(std::launch::async, cb);
}
state_->on_settled_callbacks.clear();
}
public:
Promise() : state_(std::make_shared<PromiseState<void>>()) {}
void resolve() { internal_resolve(); }
template <typename E> void reject(const E& ex) { internal_reject(std::make_exception_ptr(ex)); }
void reject(std::exception_ptr ex) { internal_reject(ex); }
// Simplified then for Promise<void>
template <typename OnFulfilled, typename OnRejected = std::function<void(std::exception_ptr)>>
auto then(OnFulfilled on_fulfilled, OnRejected on_rejected = nullptr) {
using ReturnType = decltype(on_fulfilled()); // Call with no args for void promise
using NewPromiseValueType = typename std::conditional<
std::is_same_v<ReturnType, void>,
void,
typename std::conditional<
is_promise_v<ReturnType>,
typename ReturnType::value_type,
ReturnType
>::type
>::type;
auto next_promise = Promise<NewPromiseValueType>();
auto current_state = state_;
std::function<void()> callback = [
current_state,
next_promise_state = next_promise.state_,
on_fulfilled_copy = std::move(on_fulfilled),
on_rejected_copy = std::move(on_rejected)
]() mutable {
std::lock_guard<std::mutex> lock(current_state->mutex);
if (current_state->state == PromiseState<void>::FULFILLED) {
if (on_fulfilled_copy) {
try {
if constexpr (std::is_void_v<NewPromiseValueType>) {
on_fulfilled_copy();
Promise<void>(next_promise_state).resolve();
} else if constexpr (is_promise_v<ReturnType>) {
ReturnType inner_promise = on_fulfilled_copy();
inner_promise.then(
[next_promise_state](const typename ReturnType::value_type& val) { Promise<typename ReturnType::value_type>(next_promise_state).resolve(val); },
[next_promise_state](std::exception_ptr ex) { Promise<typename ReturnType::value_type>(next_promise_state).reject(ex); }
);
} else {
next_promise.resolve(on_fulfilled_copy());
}
} catch (...) {
next_promise.reject(std::current_exception());
}
} else {
Promise<void>(next_promise_state).resolve(); // Propagate void fulfillment
}
} else if (current_state->state == PromiseState<void>::REJECTED) {
if (on_rejected_copy) {
try {
if constexpr (std::is_void_v<NewPromiseValueType>) {
on_rejected_copy(current_state->exception);
Promise<void>(next_promise_state).resolve();
} else if constexpr (is_promise_v<ReturnType>) {
ReturnType inner_promise = on_rejected_copy(current_state->exception);
inner_promise.then(
[next_promise_state](const typename ReturnType::value_type& val) { Promise<typename ReturnType::value_type>(next_promise_state).resolve(val); },
[next_promise_state](std::exception_ptr ex) { Promise<typename ReturnType::value_type>(next_promise_state).reject(ex); }
);
} else {
next_promise.resolve(on_rejected_copy(current_state->exception));
}
} catch (...) {
next_promise.reject(std::current_exception());
}
} else {
next_promise.reject(current_state->exception);
}
}
};
{
std::lock_guard<std::mutex> lock(current_state->mutex);
if (current_state->state == PromiseState<void>::PENDING) {
current_state->on_settled_callbacks.push_back(std::move(callback));
} else {
std::async(std::launch::async, std::move(callback));
}
}
return next_promise;
}
template <typename OnRejected> auto catch_error(OnRejected on_rejected) { return then(nullptr, on_rejected); }
void get() {
std::unique_lock<std::mutex> lock(state_->mutex);
state_->cv.wait(lock, [this]{ return state_->state != PromiseState<void>::PENDING; });
if (state_->state == PromiseState<void>::REJECTED) {
std::rethrow_exception(state_->exception);
}
}
};
代码解析要点:
PromiseState<T>:这是一个辅助结构体,包含了Promise的实际状态、值/异常和等待回调。它通过std::shared_ptr在多个Promise实例之间共享,确保所有引用同一异步操作的Promise都能看到相同的最新状态。std::mutex和std::condition_variable:std::mutex用于保护PromiseState的所有共享成员,防止多线程访问冲突。std::condition_variable主要用于get()方法,允许线程等待Promise完成。resolve(const T& val)和reject(std::exception_ptr ex):这些是设置Promise最终状态的方法。它们会:- 加锁以修改状态。
- 如果Promise已落定,则忽略后续调用。
- 设置状态和值/异常。
- 通知所有等待的线程(通过
cv.notify_all())。 - 异步执行所有注册的回调。这里我们使用
std::async,但在生产环境中,一个专门的线程池会是更好的选择,以控制资源和避免无限创建新线程。
then(OnFulfilled on_fulfilled, OnRejected on_rejected):这是Promise链的核心。- 它总是返回一个新的
Promise实例,这使得链式调用成为可能。 - 它将一个Lambda函数(捕获了
on_fulfilled和on_rejected)注册到当前Promise的on_settled_callbacks列表中。 - 当当前Promise落定后,这个Lambda会被执行。
- 在Lambda内部,根据当前Promise是Fulfilled还是Rejected,调用相应的
on_fulfilled或on_rejected回调。 - 回调函数的返回值决定了下一个Promise的状态:
- 如果回调返回一个值,下一个Promise就会用这个值来Resolve。
- 如果回调返回另一个Promise(即Promise扁平化),则下一个Promise会“跟随”这个返回的Promise的状态。
- 如果回调抛出异常,下一个Promise就会被Reject。
- 如果没有提供相应的回调,Promise的状态(值或异常)会直接传递给下一个Promise。
- 它总是返回一个新的
Promise<void>特化:为了处理不返回任何值的异步操作(如文件写入),我们需要Promise<void>的特化。它的行为类似,只是不存储具体的值。is_promise_v:一个简化的类型特性,用于在编译时检查一个类型是否是Promise<T>。在实际项目中,这会更复杂。
2.3 线程安全与异步调度
C++ Promise的关键挑战之一是线程安全。由于Promise的状态和回调队列可能在多个线程中被访问和修改(例如,一个线程resolve,另一个线程then),所以必须使用互斥锁 (std::mutex) 来保护共享数据。
此外,回调的调度方式也至关重要。直接在resolve/reject的同一线程中执行回调可能会导致死锁或长时间阻塞生产者线程。因此,我们选择将回调异步调度到另一个线程执行(这里使用std::async)。一个更健壮的方案是使用一个固定大小的线程池,避免std::async可能带来的性能开销和资源浪费。
三、 复合承诺:构建 Promise.all
现在我们有了基本的Promise机制,可以开始构建更高级的组合子。Promise.all是其中一个非常实用的模式,它接受一个Promise数组(或可迭代对象),并返回一个新的Promise。这个新的Promise会在所有输入Promise都成功时成功,其结果是一个包含所有输入Promise结果的数组,顺序与输入Promise的顺序一致。一旦任何一个输入Promise失败,Promise.all返回的Promise就会立即失败,并带上第一个失败Promise的错误。
3.1 Promise.all 的行为模式
表格:Promise.all 行为
| 输入 Promises 状态 | Promise.all 返回的 Promise 状态 |
结果 |
|---|---|---|
| 所有 Fulfilled | Fulfilled | std::vector<T>,包含所有成功值,顺序不变 |
| 任意一个 Rejected | Rejected | 第一个 Rejected Promise 的错误原因 |
3.2 Promise::all 的设计与实现
Promise::all将作为一个静态方法,接收一个std::vector<Promise<T>>并返回Promise<std::vector<T>>。
// Add this static method inside the Promise class (or as a freestanding function)
template <typename T>
class Promise {
// ... existing Promise class members ...
public:
// Static factory method for Promise::all
static Promise<std::vector<T>> all(std::vector<Promise<T>> promises) {
// The promise that will be returned by Promise::all
auto aggregated_promise = Promise<std::vector<T>>();
if (promises.empty()) {
// If no promises are passed, resolve immediately with an empty vector
aggregated_promise.resolve({});
return aggregated_promise;
}
// Shared state for the aggregation logic
struct AllState {
std::mutex mutex;
int pending_count;
std::vector<std::optional<T>> results; // Use optional to distinguish unset from default-constructed
bool rejected_early = false; // Flag to stop processing further rejections
};
auto all_state = std::make_shared<AllState>();
all_state->pending_count = promises.size();
all_state->results.resize(promises.size());
// Iterate through each input promise and attach a then handler
for (size_t i = 0; i < promises.size(); ++i) {
promises[i].then(
// OnFulfilled handler
[i, all_state, aggregated_promise_state = aggregated_promise.state_](const T& val) {
std::lock_guard<std::mutex> lock(all_state->mutex);
if (all_state->rejected_early) {
return; // Already rejected, ignore further completions
}
all_state->results[i] = val;
all_state->pending_count--;
if (all_state->pending_count == 0) {
// All promises fulfilled, aggregate results and resolve the main promise
std::vector<T> final_results;
final_results.reserve(all_state->results.size());
for (const auto& opt_val : all_state->results) {
final_results.push_back(opt_val.value()); // .value() is safe here as all are fulfilled
}
Promise<std::vector<T>>(aggregated_promise_state).resolve(final_results);
}
},
// OnRejected handler
[all_state, aggregated_promise_state = aggregated_promise.state_](std::exception_ptr ex) {
std::lock_guard<std::mutex> lock(all_state->mutex);
if (all_state->rejected_early) {
return; // Already rejected by another promise
}
all_state->rejected_early = true; // Mark as rejected
Promise<std::vector<T>>(aggregated_promise_state).reject(ex); // Reject the main promise
}
);
}
return aggregated_promise;
}
// Specialization for Promise<void>::all
static Promise<void> all(std::vector<Promise<void>> promises) {
auto aggregated_promise = Promise<void>();
if (promises.empty()) {
aggregated_promise.resolve();
return aggregated_promise;
}
struct AllState {
std::mutex mutex;
int pending_count;
bool rejected_early = false;
};
auto all_state = std::make_shared<AllState>();
all_state->pending_count = promises.size();
for (size_t i = 0; i < promises.size(); ++i) {
promises[i].then(
// OnFulfilled handler (for void, no value)
[all_state, aggregated_promise_state = aggregated_promise.state_]() {
std::lock_guard<std::mutex> lock(all_state->mutex);
if (all_state->rejected_early) return;
all_state->pending_count--;
if (all_state->pending_count == 0) {
Promise<void>(aggregated_promise_state).resolve();
}
},
// OnRejected handler
[all_state, aggregated_promise_state = aggregated_promise.state_](std::exception_ptr ex) {
std::lock_guard<std::mutex> lock(all_state->mutex);
if (all_state->rejected_early) return;
all_state->rejected_early = true;
Promise<void>(aggregated_promise_state).reject(ex);
}
);
}
return aggregated_promise;
}
};
Promise::all 实现要点:
- 返回聚合Promise:
Promise::all会立即返回一个新的Promise<std::vector<T>>(或Promise<void>),这个Promise将代表所有输入Promise的聚合结果。 AllState结构体:我们定义了一个内部的AllState结构体来管理Promise.all的聚合逻辑所需的状态:pending_count:记录还有多少个输入Promise尚未落定。results:一个std::vector<std::optional<T>>,用于按索引顺序存储每个成功Promise的结果。std::optional在这里很重要,因为它允许我们区分一个位置是“未设置”还是“已被一个默认构造的值填充”(如果T是可默认构造的)。rejected_early:一个布尔标志,用于一旦有任何一个Promise被拒绝,就立即停止处理其他Promise的成功。
- 循环附加
then处理器:对于promises向量中的每个Promise,我们都附加一个then处理器。- 成功回调 (
on_fulfilled):当一个输入Promise成功时,它的结果会被存储到all_state->results中对应的位置。pending_count减一。如果pending_count变为零,说明所有Promise都已成功,此时aggregated_promise会被解析为收集到的结果向量。 - 失败回调 (
on_rejected):当一个输入Promise失败时,rejected_early标志会被设置为true,并且aggregated_promise会立即被拒绝,并带上这个失败Promise的异常。后续其他Promise的成功或失败都将被忽略。
- 成功回调 (
- 线程安全:
AllState中的所有成员都通过std::mutex进行保护,因为多个then回调可能在不同的线程中并发执行,同时修改pending_count、results或rejected_early。 - 空输入处理:如果输入的
promises向量为空,Promise::all会立即返回一个已成功的Promise,其结果是一个空向量。 Promise<void>::all特化:与Promise<T>类似,但不需要存储结果值,只需追踪完成数量。
3.3 示例用法
#include <iostream>
#include <thread>
#include <chrono>
// Assume Promise class and PromiseState are defined as above
// Helper function to simulate an async operation
Promise<int> async_add(int a, int b, int delay_ms, bool should_fail = false) {
Promise<int> p;
std::thread([=, captured_p = p]() mutable { // Capture p by value to extend its lifetime
std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
if (should_fail) {
captured_p.reject(std::runtime_error("Failed to add!"));
} else {
captured_p.resolve(a + b);
}
}).detach(); // Detach the thread, it will run independently
return p;
}
Promise<std::string> async_fetch_data(const std::string& url, int delay_ms, bool should_fail = false) {
Promise<std::string> p;
std::thread([=, captured_p = p]() mutable {
std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
if (should_fail) {
captured_p.reject(std::runtime_error("Network error fetching " + url));
} else {
captured_p.resolve("Data from " + url);
}
}).detach();
return p;
}
Promise<void> async_log(const std::string& message, int delay_ms) {
Promise<void> p;
std::thread([=, captured_p = p]() mutable {
std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
std::cout << "[LOG] " << message << std::endl;
captured_p.resolve();
}).detach();
return p;
}
int main() {
std::cout << "--- Promise Chaining Example ---" << std::endl;
async_add(5, 3, 1000) // Returns Promise<int>
.then([](int result) {
std::cout << "First add result: " << result << std::endl; // result is 8
return async_add(result, 10, 500); // Returns Promise<int>
})
.then([](int result) {
std::cout << "Second add result: " << result << std::endl; // result is 18
return async_fetch_data("api.example.com/data", 700); // Returns Promise<string>
})
.then([](const std::string& data) {
std::cout << "Fetched data: " << data << std::endl;
return async_log("Data processed successfully!", 200); // Returns Promise<void>
})
.then([]() { // For Promise<void>, the fulfilled handler takes no arguments
std::cout << "All operations completed!" << std::endl;
})
.catch_error([](std::exception_ptr ex) {
try {
std::rethrow_exception(ex);
} catch (const std::exception& e) {
std::cerr << "Caught error in chain: " << e.what() << std::endl;
}
});
std::cout << "n--- Promise.all Example (Success) ---" << std::endl;
std::vector<Promise<int>> int_promises;
int_promises.push_back(async_add(1, 2, 100));
int_promises.push_back(async_add(3, 4, 300));
int_promises.push_back(async_add(5, 6, 200));
Promise<int>::all(std::move(int_promises))
.then([](const std::vector<int>& results) {
std::cout << "All int promises fulfilled. Results: ";
for (int r : results) {
std::cout << r << " ";
}
std::cout << std::endl;
})
.catch_error([](std::exception_ptr ex) {
try {
std::rethrow_exception(ex);
} catch (const std::exception& e) {
std::cerr << "Promise.all(int) failed: " << e.what() << std::endl;
}
});
std::cout << "n--- Promise.all Example (Failure) ---" << std::endl;
std::vector<Promise<std::string>> string_promises;
string_promises.push_back(async_fetch_data("url1", 500));
string_promises.push_back(async_fetch_data("url2", 100, true)); // This one will fail
string_promises.push_back(async_fetch_data("url3", 800));
Promise<std::string>::all(std::move(string_promises))
.then([](const std::vector<std::string>& results) {
std::cout << "All string promises fulfilled. This should not happen." << std::endl;
})
.catch_error([](std::exception_ptr ex) {
try {
std::rethrow_exception(ex);
} catch (const std::exception& e) {
std::cerr << "Promise.all(string) failed as expected: " << e.what() << std::endl;
}
});
std::cout << "n--- Promise.all Example (void) ---" << std::endl;
std::vector<Promise<void>> void_promises;
void_promises.push_back(async_log("Log message 1", 400));
void_promises.push_back(async_log("Log message 2", 200));
void_promises.push_back(async_log("Log message 3", 600));
Promise<void>::all(std::move(void_promises))
.then([]() {
std::cout << "All void promises completed successfully." << std::endl;
})
.catch_error([](std::exception_ptr ex) {
try {
std::rethrow_exception(ex);
} catch (const std::exception& e) {
std::cerr << "Promise.all(void) failed: " << e.what() << std::endl;
}
});
// Keep main thread alive long enough for async operations to complete
std::this_thread::sleep_for(std::chrono::seconds(5));
std::cout << "Main finished." << std::endl;
return 0;
}
四、 进阶考量与优化
4.1 执行器/调度器
在我们的示例中,回调的异步执行简单地使用了std::async(std::launch::async, cb)。这虽然方便,但在生产环境中并不理想:
- 每次
std::async都可能创建新线程,导致线程创建销毁开销大,资源管理混乱。 - 无法控制并发度,可能导致系统过载。
一个更优的方案是引入一个执行器(Executor)或调度器(Scheduler)。这是一个负责管理线程池并向其提交任务的组件。resolve/reject方法会将回调任务提交给执行器,由执行器决定何时何地执行这些任务。
// Example of a simple Executor interface
class Executor {
public:
virtual ~Executor() = default;
virtual void post(std::function<void()> task) = 0;
};
// A simple thread pool executor (simplified for brevity)
class ThreadPoolExecutor : public Executor {
// ... implementation with std::thread, std::queue, std::condition_variable ...
public:
void post(std::function<void()> task) override {
// Add task to queue and notify worker thread
}
};
// Then, in Promise, pass an Executor instance:
// Promise::resolve(const T& val, Executor& executor) {
// ...
// executor.post(cb);
// ...
// }
通过这种方式,我们可以集中管理异步任务的执行,提高效率和控制力。
4.2 错误处理链的精细化
Promise 的错误处理模型非常强大。catch_error 方法实际上是 then(nullptr, on_rejected) 的语法糖。一个 on_rejected 回调可以:
- 恢复(Recover):如果
on_rejected成功执行并返回一个值(或一个成功的 Promise),那么后续的 Promise 链将从这个值开始继续执行,就好像之前的错误从未发生过一样。 - 传播(Propagate):如果
on_rejected没有被提供,或者它再次抛出异常(或返回一个被拒绝的 Promise),错误将继续向下传递,直到遇到下一个on_rejected回调。
这种机制使得我们可以将错误处理集中在一个地方,或者在链的不同阶段进行局部错误恢复。
4.3 泛型 Promise.all(异构类型)
我们当前实现的 Promise::all 要求所有输入的 Promise 具有相同的类型 T。如果我们需要聚合不同类型的 Promise(例如 Promise<int> 和 Promise<std::string>),情况会变得复杂。
一种解决方案是使用 std::any 来存储结果:
// Example: Promise<std::any>::all for heterogeneous types
static Promise<std::vector<std::any>> all_heterogeneous(std::vector<Promise<std::any>> promises) {
// ... similar logic as Promise<T>::all, but store std::any values
// This requires input promises to already be Promise<std::any>
// Or, a more complex template that converts input Promise<T> to Promise<std::any>
}
这要求在从 std::any 中取出结果时进行类型转换,增加了运行时开销和潜在的类型错误。更常见且类型安全的方法是,如果结果在逻辑上是同构的,但类型略有不同,则可以将它们映射到一个共同的基类或接口。
4.4 资源管理和生命周期
C++ 中 std::shared_ptr 的使用是管理 Promise 状态生命周期的关键。当一个 Promise 被链式调用或传递时,shared_ptr 确保只要有任何 Promise 实例仍然需要访问其状态,该状态就不会被销毁。这避免了悬空指针和内存泄漏。然而,过度使用 shared_ptr 也可能导致循环引用,需要注意。
在我们的实现中,std::thread::detach() 使得工作线程独立于创建它的线程。这意味着如果主线程退出,这些 detached 的线程可能仍在运行,或者如果它们尝试访问已被销毁的资源,可能会导致未定义行为。在实际应用中,应使用 std::jthread 或自定义线程池来管理线程的生命周期,确保在程序退出前所有工作线程都能优雅地完成或停止。
五、 展望:C++ 协程与异步编程的未来
尽管我们构建的 Promise 库非常强大且灵活,但它仍然基于回调和 lambda 表达式,可能导致代码结构相对复杂。C++20 引入的协程(Coroutines)为异步编程带来了更革命性的方法。
协程允许我们编写看起来像同步的异步代码,通过 co_await、co_yield 和 co_return 等关键字暂停和恢复函数的执行。一个 Promise 库可以作为协程的底层实现,将协程的暂停/恢复操作映射到 Promise 的状态变化。
例如,一个基于协程的异步函数可能看起来像这样:
// Imaginary C++20+ Coroutine example
// (Requires a coroutine-enabled Promise/Future library)
Promise<int> co_async_sum(int a, int b) {
int res1 = co_await async_add(a, b, 1000);
int res2 = co_await async_add(res1, 5, 500);
co_return res2;
}
// Usage:
co_async_sum(1, 2).then([](int final_result){
std::cout << "Coroutine sum: " << final_result << std::endl;
});
这种模式极大地提高了异步代码的可读性和可维护性,是 C++ 异步编程的未来方向。然而,底层的 Promise 机制仍然是支撑这些高级抽象的基石。
构建Continuable Promises,特别是像Promise.all这样的组合子,是掌握现代异步编程范式的关键一步。它教会我们如何优雅地处理异步操作的生命周期、结果传递和错误恢复。通过细致的线程安全设计和对C++语言特性的灵活运用,我们可以在C++中实现与JavaScript等其他语言同样强大且富有表现力的异步编程模型。随着C++标准的不断演进,尤其是协程的引入,异步编程在C++中的体验将变得越来越流畅和高效。