各位编程专家和技术爱好者们,大家好!
今天,我们将深入探讨一个在现代并发编程中至关重要的话题:如何在C++协程任务流中实现优雅且安全的终止。具体来说,我们将围绕“取消令牌”(Cancellation Tokens)这一核心概念,剖析其设计哲学、实现细节,以及如何将其无缝集成到C++20协程中,从而构建出更健壮、响应更快的并发系统。
在复杂的并发应用中,我们常常需要启动长时间运行的任务。然而,这些任务并非总能一帆风顺地执行到结束。用户可能关闭应用,外部事件可能使其不再需要,或者系统资源可能需要回收。在这种情况下,我们不能简单地粗暴中断一个正在执行的线程或协程,那将导致资源泄露、数据损坏或未定义的行为。我们需要一种机制,能够以合作的方式,通知正在执行的任务“请你尽快停止”,并允许它在停止前完成必要的清理工作。这就是取消令牌的用武之地。
开篇:并发任务的困境与优雅终止的需求
在多线程和异步编程的世界里,任务的生命周期管理是一个永恒的挑战。考虑一个典型的场景:你启动了一个后台任务,例如从网络下载大文件、执行复杂的科学计算、或者处理一连串的数据库事务。如果用户在任务完成前点击了“取消”按钮,或者程序的某个逻辑分支决定不再需要这个任务的结果,我们应该如何处理?
最糟糕的方案是直接杀死线程或协程。这就像在手术进行到一半时,突然切断病人的生命支持系统,后果不堪设想。被强制终止的任务可能:
- 泄露资源:文件句柄、网络连接、内存分配等可能没有被正确释放。
- 破坏数据一致性:如果任务正在修改共享数据结构,突然终止可能导致数据处于中间状态。
- 死锁或活锁:如果任务持有锁但未能释放就被终止,其他线程可能永远无法获取到该锁。
- 未定义的行为:在C++标准中,强制终止一个线程除了
std::terminate之外,没有标准的、安全的方法。协程也面临类似问题。
因此,我们需要一种“软终止”机制,即任务能够主动响应终止请求,并在退出前执行清理工作。这种机制的核心思想是“合作式取消”(Cooperative Cancellation)。任务本身需要周期性地检查一个“取消信号”,一旦发现信号被触发,便开始优雅地收尾并退出。取消令牌正是这种信号的一种标准且高效的封装。
理解取消令牌(Cancellation Tokens)的核心理念
取消令牌并非一个C++标准库直接提供的概念,但其思想在许多现代并发框架(如.NET的CancellationTokenSource/CancellationToken,以及许多C++异步库)中都有体现。它的核心理念是将取消请求抽象为两个主要组件:
CancellationTokenSource:取消信号的“发布者”或“源头”。它负责触发取消操作,并管理所有与之关联的取消令牌。当CancellationTokenSource被通知取消时,它会向所有关联的CancellationToken广播这个信号。CancellationToken:取消信号的“接收者”或“消费者”。它是一个轻量级的对象,可以传递给各种任务和协程。任务通过检查CancellationToken的状态来判断是否应该停止。它还可以注册回调函数,以便在取消发生时执行特定操作。
这种设计将取消的“发起”与“响应”解耦,使得任务的取消逻辑可以被清晰地隔离和复用。
基本工作流程概览:
- 创建一个
CancellationTokenSource实例。 - 从
CancellationTokenSource获取一个或多个CancellationToken实例。 - 将
CancellationToken传递给需要被取消的任务或协程。 - 任务/协程在执行过程中,周期性地检查
CancellationToken的is_cancellation_requested()方法。 - 如果任务发现取消被请求,它会执行清理工作,然后以适当的方式(例如抛出异常、返回特定值)退出。
- 在需要取消任务时,调用
CancellationTokenSource的cancel()方法。 cancel()方法会设置内部状态,并通知所有注册到CancellationToken上的回调函数。
| 组件 | 职责 | 主要方法 |
|---|---|---|
CancellationTokenSource |
发起取消请求,管理取消状态和所有关联的令牌 | cancel(): 触发取消get_token(): 获取关联的CancellationToken~CancellationTokenSource(): 析构时解除所有回调注册 |
CancellationToken |
检查取消状态,注册/注销取消回调 | is_cancellation_requested(): 检查是否已请求取消throw_if_cancellation_requested(): 如果已请求取消则抛出异常register_callback(): 注册一个在取消时执行的回调 |
C++并发基础与协程:构建取消机制的基石
在深入实现之前,我们快速回顾一下C++中与并发和异步相关的关键概念。
C++并发原语:
std::atomic<bool>:用于表示取消状态的原子布尔值,确保跨线程的可见性和修改的原子性。std::mutex:用于保护共享数据(例如注册的回调列表)的访问,防止竞态条件。std::condition_variable:在需要等待取消信号或通知取消时非常有用,但对于我们这里以轮询为主的取消机制,它的直接作用可能不如回调机制显著。然而,在实现await_suspend等阻塞操作时,它仍是必不可少的。std::future/std::promise:虽然它们提供了异步结果的传递,但它们本身不直接提供取消机制。一个std::promise一旦set_value或set_exception,就无法再“取消”其关联的std::future。
C++20协程基础:
C++20协程(Coroutines)提供了一种编写异步非阻塞代码的新范式,它允许函数在执行过程中暂停(co_await)和恢复,而无需显式的回调或状态机。理解以下几个核心概念对于集成取消令牌至关重要:
co_await:暂停当前协程,等待一个“可等待对象”(Awaitable)完成。co_return:从协程返回一个值。co_yield:暂停协程并产生一个值(用于生成器协程)。promise_type:每个协程都关联一个promise_type,它定义了协程的生命周期行为,包括如何创建协程、如何处理返回值和异常、以及如何处理co_await操作。- Awaitable / Awaiter:一个可等待对象需要实现
await_ready()、await_suspend()和await_resume()三个方法。await_ready():如果立即可以恢复,返回true,不暂停。await_suspend(std::coroutine_handle<P>):暂停协程。在这里可以调度协程到线程池,或者注册唤醒回调。await_resume():协程恢复后执行,返回co_await表达式的结果。
为何标准库没有内置的取消机制?
C++标准库通常倾向于提供底层原语,而不是高层策略。取消机制的设计往往与特定的异步模型(例如基于事件循环、线程池或特定IO库)紧密耦合。一个通用的、适用于所有场景的取消模型很难标准化,因此C++标准将这个责任留给了库的实现者。这也正是我们今天自己构建取消令牌系统的原因。
设计一个C++风格的取消令牌系统
现在,让我们着手设计并实现我们自己的CancellationTokenSource和CancellationToken。
CancellationTokenSource 的设计
CancellationTokenSource是取消请求的源头。它需要:
- 一个原子布尔变量来存储取消状态。
- 一个互斥量来保护对回调列表的并发访问。
- 一个回调函数列表,用于存储当取消发生时需要执行的操作。
#include <atomic>
#include <vector>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <stdexcept>
#include <memory>
#include <coroutine>
#include <thread>
#include <chrono>
#include <iostream>
// 前向声明,因为CancellationToken需要访问CancellationTokenSource的内部状态
class CancellationToken;
// 用于存储CancellationTokenSource和CancellationToken共享的状态
struct CancellationTokenState {
std::atomic<bool> cancelled_ = false;
std::mutex mutex_;
std::condition_variable cv_;
// 回调列表,存储std::function<void()>,当取消发生时执行
std::vector<std::function<void()>> callbacks_;
void add_callback(std::function<void()> callback) {
if (cancelled_.load(std::memory_order_acquire)) {
// 如果已经取消,立即执行回调
callback();
} else {
std::lock_guard<std::mutex> lock(mutex_);
if (cancelled_.load(std::memory_order_acquire)) {
// 双重检查,防止在锁获取前被取消
callback();
} else {
callbacks_.push_back(std::move(callback));
}
}
}
void remove_callback(void* callback_id) {
// 实际实现中,callback_id需要更健壮的方式来标识和移除特定的回调
// 简单示例中,我们暂不实现精确移除,而是依赖shared_ptr的生命周期管理
// 对于高级场景,可以考虑返回一个Registration对象,其析构函数执行移除
}
void cancel() {
if (!cancelled_.exchange(true, std::memory_order_acq_rel)) {
// 只有当状态从false变为true时才执行通知
std::vector<std::function<void()>> local_callbacks;
{
std::lock_guard<std::mutex> lock(mutex_);
local_callbacks.swap(callbacks_); // 移动回调到局部变量,减少锁持有时间
}
// 释放锁后执行回调,避免回调内部再次获取锁导致死锁
for (const auto& callback : local_callbacks) {
try {
callback();
} catch (const std::exception& e) {
// 记录回调中发生的异常,避免影响其他回调
std::cerr << "CancellationToken callback threw exception: " << e.what() << std::endl;
}
}
cv_.notify_all(); // 通知所有等待取消信号的线程/协程
}
}
};
class CancellationTokenSource {
private:
std::shared_ptr<CancellationTokenState> state_;
public:
CancellationTokenSource() : state_(std::make_shared<CancellationTokenState>()) {}
// 禁用拷贝构造和赋值,因为CancellationTokenSource通常是唯一的控制点
CancellationTokenSource(const CancellationTokenSource&) = delete;
CancellationTokenSource& operator=(const CancellationTokenSource&) = delete;
// 允许移动语义
CancellationTokenSource(CancellationTokenSource&&) noexcept = default;
CancellationTokenSource& operator=(CancellationTokenSource&&) noexcept = default;
CancellationToken get_token(); // 在CancellationToken定义后实现
void cancel() {
state_->cancel();
}
bool is_cancellation_requested() const {
return state_->cancelled_.load(std::memory_order_acquire);
}
};
CancellationToken 的设计
CancellationToken是轻量级的,它仅仅是CancellationTokenState的一个弱引用或共享引用。它不拥有状态,只是访问状态。
// 自定义异常,用于表示操作被取消
class OperationCanceledException : public std::runtime_error {
public:
OperationCanceledException() : std::runtime_error("Operation was canceled.") {}
explicit OperationCanceledException(const std::string& message) : std::runtime_error(message) {}
};
class CancellationToken {
private:
std::shared_ptr<CancellationTokenState> state_;
// 允许CancellationTokenSource访问私有构造函数
friend class CancellationTokenSource;
// 私有构造函数,只能通过CancellationTokenSource::get_token()创建
explicit CancellationToken(std::shared_ptr<CancellationTokenState> state)
: state_(std::move(state)) {}
public:
CancellationToken() = default; // 允许默认构造空令牌 (不关联任何源)
bool is_cancellation_requested() const {
return state_ && state_->cancelled_.load(std::memory_order_acquire);
}
void throw_if_cancellation_requested() const {
if (is_cancellation_requested()) {
throw OperationCanceledException();
}
}
// 注册一个回调函数,当取消发生时执行。
// 返回一个可用于取消注册的对象(此处简化,实际可能返回一个unique_ptr<Registration>)
// 为了简化示例,我们暂时不提供精确的注销机制,而是依赖shared_ptr的生命周期
template<typename Func>
void register_callback(Func&& callback) const {
if (!state_) {
// 如果没有关联状态,则永远不会取消
return;
}
state_->add_callback(std::forward<Func>(callback));
}
// 等待取消的辅助函数 (通常用于阻塞线程,但在协程中我们会有更优雅的方式)
void wait_for_cancellation() const {
if (!state_) return;
std::unique_lock<std::mutex> lock(state_->mutex_);
state_->cv_.wait(lock, [this]{ return state_->cancelled_.load(std::memory_order_acquire); });
}
};
// CancellationTokenSource::get_token() 的实现
inline CancellationToken CancellationTokenSource::get_token() {
return CancellationToken(state_);
}
设计要点分析:
std::shared_ptr<CancellationTokenState>:CancellationTokenSource和CancellationToken共享同一个CancellationTokenState实例。当所有关联的CancellationToken和CancellationTokenSource都销毁时,CancellationTokenState才会被释放。- 原子操作:
cancelled_使用std::atomic<bool>确保了多线程环境下取消状态的正确读写。 - 互斥量与条件变量:
mutex_保护了callbacks_列表的并发访问。cv_用于通知等待取消的线程(尽管在协程中,我们会倾向于使用co_await而非阻塞等待)。 - 回调机制:
add_callback允许任务注册在取消发生时执行的清理函数。这对于那些需要在取消时执行特定逻辑的组件非常有用,例如关闭文件、释放网络连接等。 - 异常:
OperationCanceledException是推荐的取消通知方式。当任务检查到取消信号时,抛出此异常,可以沿着调用栈传播,让上层代码捕获并处理。 throw_if_cancellation_requested():这是一个非常方便的辅助函数,简化了在任务中检查取消状态的代码。
将取消令牌集成到协程中
现在我们有了取消令牌的基本框架,是时候将其与C++20协程结合起来了。目标是让协程能够:
- 在
co_await一个可等待对象时,如果取消已被请求,则不暂停,而是立即退出。 - 在协程内部执行计算密集型任务时,定期检查取消状态。
- 将取消令牌有效地传递给嵌套的子协程。
使Awaitable可取消
一个典型的可等待对象,例如一个延迟操作(delay),在没有取消机制时会简单地暂停。现在,我们希望它在暂停前和暂停期间都能响应取消。
// 协程任务的返回类型,通常是一个future-like对象
template<typename T>
struct CoroutineFuture {
struct promise_type {
T value_;
std::exception_ptr exception_;
std::coroutine_handle<> continuation_;
CoroutineFuture get_return_object() {
return CoroutineFuture{std::coroutine_handle<promise_type>::from_promise(*this)};
}
std::suspend_always initial_suspend() { return {}; }
std::suspend_always final_suspend() noexcept { return {}; } // 协程结束后暂停,等待调用者resume
void return_value(T value) { value_ = std::move(value); }
void unhandled_exception() { exception_ = std::current_exception(); }
// 当协程被co_await时,会调用此函数
auto await_transform(std::suspend_always) { return std::suspend_always{}; }
auto await_transform(std::suspend_never) { return std::suspend_never{}; }
// 处理CancellationToken,使其能够作为co_await表达式的一部分
CancellationToken await_transform(CancellationToken token) {
return token;
}
};
std::coroutine_handle<promise_type> handle_;
explicit CoroutineFuture(std::coroutine_handle<promise_type> h) : handle_(h) {}
CoroutineFuture(CoroutineFuture&& other) noexcept : handle_(std::exchange(other.handle_, {})) {}
~CoroutineFuture() { if (handle_) handle_.destroy(); }
T get() {
if (!handle_) throw std::runtime_error("CoroutineFuture moved or invalid.");
while (!handle_.done()) {
handle_.resume(); // 简单示例,实际可能需要调度器
}
if (handle_.promise().exception_) {
std::rethrow_exception(handle_.promise().exception_);
}
return std::move(handle_.promise().value_);
}
};
// -----------------------------------------------------------------------------
// 可取消的延时Awaitable
// -----------------------------------------------------------------------------
struct CancellableDelay {
std::chrono::milliseconds duration_;
CancellationToken token_;
CancellableDelay(std::chrono::milliseconds duration, CancellationToken token)
: duration_(duration), token_(std::move(token)) {}
bool await_ready() const noexcept {
return token_.is_cancellation_requested(); // 如果已取消,立即就绪,不暂停
}
void await_suspend(std::coroutine_handle<> h) {
if (token_.is_cancellation_requested()) {
// 在await_ready返回false后,但在await_suspend被调用时,
// 令牌可能已被取消。此时我们不暂停协程,而是立即恢复。
// 注意:在这种情况下,h.resume()会导致协程立即恢复,
// 但如果await_ready已经返回true,此方法根本不会被调用。
// 这里的逻辑主要是为了处理await_ready和await_suspend之间的竞态
h.resume();
return;
}
// 注册一个回调,当取消发生时,恢复协程。
// 注意:这里需要确保回调能够安全地恢复协程句柄。
// 一个常见模式是将coroutine_handle存储在shared_ptr中,确保其生命周期。
// 或者使用一个外部调度器来处理resume。
// 为了简化,我们假设CancellationTokenState的生命周期足够长,
// 且协程句柄在回调执行时仍然有效。
// 实际应用中,需要更复杂的生命周期管理。
// 模拟异步操作:在另一个线程中等待或被取消时恢复
std::thread([h, token_copy = token_, duration_ = duration_]() mutable {
// 等待取消或延时结束
std::unique_lock<std::mutex> lock(token_copy.state_->mutex_);
token_copy.state_->cv_.wait_for(lock, duration_, [&]{
return token_copy.is_cancellation_requested();
});
if (h) {
h.resume(); // 唤醒协程
}
}).detach(); // 实际生产代码中不建议detach,应管理线程生命周期
}
void await_resume() {
token_.throw_if_cancellation_requested(); // 恢复时检查取消状态,如果已取消则抛出异常
}
};
// 辅助函数,用于创建CancellableDelay
inline CancellableDelay delay(std::chrono::milliseconds duration, CancellationToken token) {
return CancellableDelay{duration, std::move(token)};
}
CancellableDelay的关键点:
await_ready():这是协程暂停前的第一个检查点。如果token_.is_cancellation_requested()为true,协程将不会暂停,而是直接进入await_resume()。await_suspend():如果await_ready()返回false(即当前未取消),协程将在此处暂停。- 我们注册了一个回调,当取消发生时,它会通过
h.resume()来唤醒协程。 - 为了防止在
await_ready返回false但await_suspend执行前,取消信号到达的竞态条件,我们在await_suspend内部再次检查token_.is_cancellation_requested()。如果此时已取消,则立即恢复协程。 - 为了模拟实际的异步操作,这里启动了一个
std::thread,它会等待指定的duration_或者直到token_被取消。
- 我们注册了一个回调,当取消发生时,它会通过
await_resume():当协程被唤醒时,await_resume()会被调用。在这里,我们再次调用token_.throw_if_cancellation_requested()。如果此时取消已被请求,将抛出OperationCanceledException,协程的执行流将转到异常处理。
协程内部的取消检查
对于那些不涉及co_await的计算密集型任务,协程需要主动在内部检查取消状态。
// -----------------------------------------------------------------------------
// 可取消的长时间运行计算任务
// -----------------------------------------------------------------------------
CoroutineFuture<long long> long_running_computation(int iterations, CancellationToken token) {
long long sum = 0;
for (int i = 0; i < iterations; ++i) {
// 每隔一定迭代次数检查一次取消状态
if (i % 100000 == 0) {
token.throw_if_cancellation_requested();
// 也可以选择 co_await 一个检查点,让调度器有机会切换
// co_await std::suspend_always{}; // 每次检查都暂停一下,让出控制权
}
sum += i;
}
co_return sum;
}
在这个例子中,long_running_computation协程会定期调用token.throw_if_cancellation_requested()。这确保了即使没有co_await操作,协程也能响应取消。
取消的传播与嵌套协程
将CancellationToken从父协程传递给子协程是实现取消传播的关键。
// -----------------------------------------------------------------------------
// 嵌套协程示例
// -----------------------------------------------------------------------------
CoroutineFuture<std::string> sub_task(std::string name, std::chrono::milliseconds delay_ms, CancellationToken token) {
try {
std::cout << "[" << name << "] Sub-task started." << std::endl;
co_await delay(delay_ms, token); // 使用可取消的延时
std::cout << "[" << name << "] Sub-task finished." << std::endl;
co_return "Sub-task " + name + " completed.";
} catch (const OperationCanceledException& e) {
std::cout << "[" << name << "] Sub-task caught cancellation: " << e.what() << std::endl;
throw; // 重新抛出,让上层知道取消
} catch (const std::exception& e) {
std::cerr << "[" << name << "] Sub-task caught unexpected exception: " << e.what() << std::endl;
throw;
}
}
CoroutineFuture<std::string> main_task(CancellationToken token) {
try {
std::cout << "[Main] Main task started." << std::endl;
// 传递相同的token给子任务
auto result1_future = sub_task("A", std::chrono::milliseconds(2000), token);
auto result2_future = sub_task("B", std::chrono::milliseconds(3000), token);
// 尝试等待子任务
std::cout << "[Main] Awaiting sub-task A..." << std::endl;
std::string res1 = co_await result1_future.handle_; // co_await CoroutineFuture的handle
std::cout << "[Main] Sub-task A result: " << res1 << std::endl;
std::cout << "[Main] Awaiting sub-task B..." << std::endl;
std::string res2 = co_await result2_future.handle_;
std::cout << "[Main] Sub-task B result: " << res2 << std::endl;
co_return "Main task completed successfully.";
} catch (const OperationCanceledException& e) {
std::cout << "[Main] Main task caught cancellation: " << e.what() << std::endl;
co_return "Main task canceled.";
} catch (const std::exception& e) {
std::cerr << "[Main] Main task caught unexpected exception: " << e.what() << std::endl;
throw;
}
}
在main_task中,我们创建了两个sub_task,并将同一个token传递给它们。当token被取消时,所有关联的子任务都会收到取消信号,并有机会进行清理。这种模式使得取消能够沿着协程调用链向下传播。
完整的示例程序:
int main() {
std::cout << "Starting cancellation token example." << std::endl;
CancellationTokenSource cts;
CancellationToken token = cts.get_token();
// 启动主任务
auto main_future = main_task(token);
// 启动一个线程,在一段时间后触发取消
std::thread canceller_thread([&cts]() {
std::this_thread::sleep_for(std::chrono::milliseconds(2500));
std::cout << "n--- Requesting cancellation ---n" << std::endl;
cts.cancel();
});
try {
// 获取主任务的结果
std::string final_result = main_future.get();
std::cout << "nFinal Result: " << final_result << std::endl;
} catch (const OperationCanceledException& e) {
std::cout << "nMain future caught cancellation: " << e.what() << std::endl;
} catch (const std::exception& e) {
std::cerr << "nMain future caught unexpected exception: " << e.what() << std::endl;
}
canceller_thread.join();
std::cout << "Example finished." << std::endl;
return 0;
}
运行结果预期:
程序启动后,main_task会启动sub_task A和sub_task B。sub_task A需要2秒,sub_task B需要3秒。
canceller_thread会在2.5秒后调用cts.cancel()。
这意味着sub_task A将在完成前被取消,sub_task B也将在启动后不久被取消。
main_task在co_await result1_future.handle_或co_await result2_future.handle_时,会因为底层的CancellableDelay抛出OperationCanceledException而被中断,并捕获该异常。
Starting cancellation token example.
[Main] Main task started.
[A] Sub-task started.
[B] Sub-task started.
[Main] Awaiting sub-task A...
--- Requesting cancellation ---
[A] Sub-task caught cancellation: Operation was canceled.
[Main] Main task caught cancellation: Operation was canceled.
Example finished.
这正是我们期望的合作式取消行为。
取消策略:合作式与(有限的)抢占式
合作式取消的优势与实践
我们目前所实现的取消机制就是典型的合作式取消。它的核心特点是:
- 任务主动检查:被取消的任务必须在其代码中显式地检查取消状态。
- 可控的清理:任务可以在响应取消之前执行必要的清理工作,如释放锁、关闭文件、回滚事务等。
- 确定性:任务何时停止、以何种状态停止是可预测的,避免了资源泄露和数据损坏。
合作式取消是C++并发编程中推荐的做法,因为它提供了最高的安全性、可控性和可预测性。
抢占式取消的困境与C++的限制
抢占式取消意味着外部实体(例如操作系统或运行时)可以在任务没有显式检查的情况下强制中断其执行。在大多数情况下,C++中实现通用的、安全的抢占式取消是非常困难甚至不可能的,原因如下:
- 资源清理问题:如果一个任务在持有锁、分配内存、或者进行文件I/O时被突然中断,它将没有机会释放这些资源,导致泄露或死锁。
- 栈展开与析构:在C++中,析构函数是清理资源的自然场所。抢占式终止通常不会安全地执行栈展开,因此析构函数可能不会被调用。
- C++标准缺少支持:C++标准没有提供跨平台的、安全地终止任意线程或协程的机制(除了致命错误导致的
std::terminate)。
std::jthread的尝试:
C++20引入了std::jthread,它在析构时会自动join(),并提供了一个stop_token机制,可以向线程发送停止请求。std::jthread的stop_token是合作式取消的一个很好的例子,它允许线程在自己的代码中检查stop_token并决定何时停止。然而,这仍然是合作式的,它不会强制终止线程。它为线程提供了一个request_stop()信号,但线程必须自己响应。
对于协程,抢占式取消的挑战更大,因为协程的执行上下文可以被挂起和恢复,其生命周期与线程不完全绑定。没有标准的方法可以外部强行“销毁”一个正在运行的协程而保证安全。
结论:在C++中,我们应该始终优先考虑和设计合作式取消机制。
错误处理、资源管理与异常安全
在取消场景下,妥善的错误处理和资源管理至关重要。
OperationCanceledException 的设计与处理
我们自定义的OperationCanceledException是一种特殊的异常类型,它表示一个预期的、非错误的终止。
- 捕获和区分:调用者应该能够捕获
OperationCanceledException,并将其与真正的错误(如std::runtime_error)区分开来。通常,当捕获到OperationCanceledException时,不需要记录为错误日志,只需进行清理并正常退出。 - 传播:当子任务被取消时,它应该抛出
OperationCanceledException,让父任务能够感知并做出相应。 std::exception_ptr:在协程中,如果一个协程抛出异常,它的promise_type会捕获这个异常并存储在std::exception_ptr中。当CoroutineFuture::get()被调用时,这个异常会被重新抛出。这对于取消异常同样适用,确保取消信号能够正确传播。
RAII 在取消场景下的重要性
RAII (Resource Acquisition Is Initialization) 是C++中管理资源的核心范式。它在取消场景中尤为重要:
- 自动清理:当协程因取消而抛出异常时,栈会展开。所有在栈上创建的局部对象(如
std::unique_lock,std::fstream,std::vector等)的析构函数都会被调用。这意味着即使任务被取消,这些资源也能得到及时和正确的释放。 - 锁管理:使用
std::unique_lock或std::lock_guard可以确保即使在任务被取消并抛出异常时,锁也能被正确释放。
// 示例:RAII在取消时的应用
CoroutineFuture<void> task_with_resource(CancellationToken token) {
std::cout << "[ResourceTask] Acquiring resource (e.g., file lock)." << std::endl;
std::unique_lock<std::mutex> lock(some_global_mutex); // 模拟获取资源锁
try {
token.throw_if_cancellation_requested(); // 检查取消
// 模拟一些工作
co_await delay(std::chrono::milliseconds(1000), token);
std::cout << "[ResourceTask] Releasing resource." << std::endl;
lock.unlock(); // 正常完成时释放
co_return;
} catch (const OperationCanceledException&) {
std::cout << "[ResourceTask] Cancellation requested. Resource (lock) will be released by RAII." << std::endl;
// lock的析构函数会在栈展开时自动释放互斥量
throw; // 重新抛出,让上层知道取消
}
}
部分完成工作的处理
当任务被取消时,它可能已经完成了一部分工作。如何处理这些部分完成的工作取决于具体业务逻辑:
- 原子性操作:如果任务涉及多步操作,且这些操作需要原子性(要么全做,要么全不做),那么在取消时应该回滚所有已完成的部分。例如,数据库事务可以通过
rollback()来撤销。 - 可恢复性:如果部分完成的工作可以保存并在以后恢复,那么在取消时可以保存当前进度,以便后续从中断点继续。
- 丢弃:如果部分完成的工作没有副作用,或者其结果不重要,那么在取消时可以直接丢弃。
取消令牌本身不提供这些业务逻辑,但它提供了触发这些逻辑的机制。任务在捕获OperationCanceledException时,应根据业务需求执行相应的清理和回滚操作。
高级取消场景与考量
超时取消
超时是一种特殊的取消形式。我们希望一个任务在指定时间内未能完成时自动取消。这可以通过结合CancellationTokenSource和计时器来实现:
// 结合超时和CancellationTokenSource
CoroutineFuture<std::string> task_with_timeout(std::chrono::milliseconds timeout_ms, CancellationToken external_token) {
CancellationTokenSource internal_cts;
CancellationToken internal_token = internal_cts.get_token();
// 链接外部令牌和内部令牌
// 当外部令牌取消时,内部令牌也会取消
// 实际的链接实现可能更复杂,需要CancellationTokenSource支持注册其他CancellationToken
// 这里简化为:我们只依赖内部CTS来触发取消,而外部令牌则作为另一个独立的检查点
std::cout << "[TimeoutTask] Starting with timeout of " << timeout_ms.count() << "ms." << std::endl;
// 启动一个线程,在超时后或外部取消时触发内部取消
std::thread timer_thread([&internal_cts, timeout_ms, external_token_copy = external_token]() {
// 等待超时,或者外部令牌被取消
// 这是一个简化的等待,更鲁棒的实现会使用条件变量
auto start_time = std::chrono::steady_clock::now();
while (std::chrono::steady_clock::now() - start_time < timeout_ms && !external_token_copy.is_cancellation_requested()) {
std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 轮询检查
}
if (!internal_cts.is_cancellation_requested()) { // 避免重复取消
std::cout << "[TimeoutTask] Timeout or external cancellation triggered, cancelling internal CTS." << std::endl;
internal_cts.cancel();
}
});
timer_thread.detach(); // 实际应管理线程生命周期
try {
// 执行一个子任务,使用内部令牌
std::string result = co_await sub_task("Timed", timeout_ms + std::chrono::milliseconds(500), internal_token); // 故意让子任务时间长于超时
co_return result;
} catch (const OperationCanceledException& e) {
std::cout << "[TimeoutTask] Caught cancellation: " << e.what() << std::endl;
co_return "Timeout task canceled.";
}
}
注意:上述timer_thread的实现是简化的轮询,在生产环境中,应使用std::condition_variable或asio等库提供的异步计时器来避免忙等待。
链接令牌(Linking Tokens)
有时,一个任务可能需要响应多个取消源。例如,它可能需要响应用户界面上的“取消”按钮,或者一个全局的“关机”信号。我们可以创建一个链接令牌源,它在任何一个源被取消时都会触发自身的取消。这需要CancellationTokenSource能够注册其他CancellationToken作为其触发器。
// 假设 CancellationTokenSource 有一个 add_linked_token 方法
// CancellationTokenSource::add_linked_token(CancellationToken other_token) {
// other_token.register_callback([this]{ this->cancel(); });
// }
// 示例:链接令牌的逻辑
void demonstrate_linked_tokens() {
CancellationTokenSource global_cts;
CancellationTokenSource user_cts;
CancellationTokenSource linked_cts;
// linked_cts 链接到 global_cts 和 user_cts
// 当 global_cts 或 user_cts 中的任何一个被取消时,linked_cts 也会被取消
// 这需要CancellationTokenSource提供相应的接口,例如:
// linked_cts.add_linked_token(global_cts.get_token());
// linked_cts.add_linked_token(user_cts.get_token());
// 这里我们直接注册回调模拟
global_cts.get_token().register_callback([&linked_cts]{ linked_cts.cancel(); });
user_cts.get_token().register_callback([&linked_cts]{ linked_cts.cancel(); });
CancellationToken task_token = linked_cts.get_token();
std::cout << "nDemonstrating Linked Tokens:" << std::endl;
auto linked_task_future = sub_task("Linked", std::chrono::milliseconds(5000), task_token);
std::thread t1([&global_cts]{
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::cout << "Global CTS cancels." << std::endl;
global_cts.cancel();
});
std::thread t2([&user_cts]{
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
std::cout << "User CTS cancels." << std::endl;
user_cts.cancel();
});
try {
linked_task_future.get();
} catch (const OperationCanceledException& e) {
std::cout << "Linked task caught cancellation: " << e.what() << std::endl;
}
t1.join();
t2.join();
}
在上述例子中,global_cts会在1秒后取消,user_cts会在2秒后取消。linked_cts会在global_cts取消时立即收到通知并触发取消,从而使sub_task("Linked", ...)被取消。
取消与I/O操作
对于阻塞式I/O操作,取消通常需要操作系统的支持。例如,在Windows上可以使用CancelSynchronousIo或异步I/O的CancelIoEx。在Linux上,某些I/O操作(如read, write)在被信号中断时会返回EINTR。
然而,对于异步I/O库(如Boost.Asio),它们通常会提供自己的取消机制,例如async_cancel()。在这种情况下,我们的CancellationToken可以作为触发这些库特定取消操作的信号。例如,当CancellationToken被取消时,我们可以注册一个回调,在该回调中调用asio::cancel()。
线程池与任务队列中的取消
在基于线程池的任务调度系统中,任务通常被放入一个队列。当任务被取消时,如果它还在队列中等待执行,最好的做法是将其从队列中移除,而不是执行它。如果任务已经开始执行,则按照我们上述的合作式取消原则处理。
实践中的性能与线程安全
- 原子操作的开销:
std::atomic<bool>的加载和存储操作通常比互斥量轻量,但并非零开销。在性能关键路径上频繁检查取消状态,需要权衡其开销。 - 锁的粒度:
CancellationTokenState中的std::mutex保护回调列表。cancel()方法将回调移动到局部变量再执行,以最小化锁的持有时间,这是良好的实践。 - 回调函数的执行上下文:当
CancellationTokenSource::cancel()被调用时,回调函数会在调用cancel()的线程上同步执行。如果回调函数执行时间过长或包含阻塞操作,可能会阻塞取消操作的完成。对于耗时回调,应将其调度到单独的线程或线程池中执行。 - 协程句柄的生命周期:在
await_suspend中注册回调来恢复协程时,必须确保协程句柄h在回调执行时仍然有效。这通常需要将h包装在std::shared_ptr中,或者确保其生命周期由外部管理(例如通过调度器)。我们示例中的std::thread([h, ...])虽然工作,但在实际生产环境中,h的生命周期管理是一个复杂的挑战。通常会有一个Executor或Scheduler来负责管理协程的生命周期和恢复。
构建弹性并发系统:取消机制的价值
通过今天对取消令牌的深入探讨,我们看到它不仅仅是一个简单的布尔标志。它是一个功能丰富的协作机制,能够使我们的C++并发任务流更加健壮、响应更快。通过CancellationTokenSource和CancellationToken的精心设计,结合C++20协程的强大能力,我们可以在任务的生命周期中注入优雅的终止点。这使得我们能够安全地管理资源,避免数据损坏,并为用户提供更流畅、更可控的体验。记住,在并发编程中,合作式取消是实现复杂任务优雅退出的黄金法则。它的价值在于将控制权交还给任务本身,允许它在被要求停止时,以最负责任和最安全的方式完成其使命。