解析 ‘Cancellation Tokens’:如何在并发 C++ 任务流中优雅且安全地终止一个正在执行的协程?

各位编程专家和技术爱好者们,大家好!

今天,我们将深入探讨一个在现代并发编程中至关重要的话题:如何在C++协程任务流中实现优雅且安全的终止。具体来说,我们将围绕“取消令牌”(Cancellation Tokens)这一核心概念,剖析其设计哲学、实现细节,以及如何将其无缝集成到C++20协程中,从而构建出更健壮、响应更快的并发系统。

在复杂的并发应用中,我们常常需要启动长时间运行的任务。然而,这些任务并非总能一帆风顺地执行到结束。用户可能关闭应用,外部事件可能使其不再需要,或者系统资源可能需要回收。在这种情况下,我们不能简单地粗暴中断一个正在执行的线程或协程,那将导致资源泄露、数据损坏或未定义的行为。我们需要一种机制,能够以合作的方式,通知正在执行的任务“请你尽快停止”,并允许它在停止前完成必要的清理工作。这就是取消令牌的用武之地。

开篇:并发任务的困境与优雅终止的需求

在多线程和异步编程的世界里,任务的生命周期管理是一个永恒的挑战。考虑一个典型的场景:你启动了一个后台任务,例如从网络下载大文件、执行复杂的科学计算、或者处理一连串的数据库事务。如果用户在任务完成前点击了“取消”按钮,或者程序的某个逻辑分支决定不再需要这个任务的结果,我们应该如何处理?

最糟糕的方案是直接杀死线程或协程。这就像在手术进行到一半时,突然切断病人的生命支持系统,后果不堪设想。被强制终止的任务可能:

  1. 泄露资源:文件句柄、网络连接、内存分配等可能没有被正确释放。
  2. 破坏数据一致性:如果任务正在修改共享数据结构,突然终止可能导致数据处于中间状态。
  3. 死锁或活锁:如果任务持有锁但未能释放就被终止,其他线程可能永远无法获取到该锁。
  4. 未定义的行为:在C++标准中,强制终止一个线程除了std::terminate之外,没有标准的、安全的方法。协程也面临类似问题。

因此,我们需要一种“软终止”机制,即任务能够主动响应终止请求,并在退出前执行清理工作。这种机制的核心思想是“合作式取消”(Cooperative Cancellation)。任务本身需要周期性地检查一个“取消信号”,一旦发现信号被触发,便开始优雅地收尾并退出。取消令牌正是这种信号的一种标准且高效的封装。

理解取消令牌(Cancellation Tokens)的核心理念

取消令牌并非一个C++标准库直接提供的概念,但其思想在许多现代并发框架(如.NET的CancellationTokenSource/CancellationToken,以及许多C++异步库)中都有体现。它的核心理念是将取消请求抽象为两个主要组件:

  1. CancellationTokenSource:取消信号的“发布者”或“源头”。它负责触发取消操作,并管理所有与之关联的取消令牌。当CancellationTokenSource被通知取消时,它会向所有关联的CancellationToken广播这个信号。
  2. CancellationToken:取消信号的“接收者”或“消费者”。它是一个轻量级的对象,可以传递给各种任务和协程。任务通过检查CancellationToken的状态来判断是否应该停止。它还可以注册回调函数,以便在取消发生时执行特定操作。

这种设计将取消的“发起”与“响应”解耦,使得任务的取消逻辑可以被清晰地隔离和复用。

基本工作流程概览:

  1. 创建一个CancellationTokenSource实例。
  2. CancellationTokenSource获取一个或多个CancellationToken实例。
  3. CancellationToken传递给需要被取消的任务或协程。
  4. 任务/协程在执行过程中,周期性地检查CancellationTokenis_cancellation_requested()方法。
  5. 如果任务发现取消被请求,它会执行清理工作,然后以适当的方式(例如抛出异常、返回特定值)退出。
  6. 在需要取消任务时,调用CancellationTokenSourcecancel()方法。
  7. cancel()方法会设置内部状态,并通知所有注册到CancellationToken上的回调函数。
组件 职责 主要方法
CancellationTokenSource 发起取消请求,管理取消状态和所有关联的令牌 cancel(): 触发取消
get_token(): 获取关联的CancellationToken
~CancellationTokenSource(): 析构时解除所有回调注册
CancellationToken 检查取消状态,注册/注销取消回调 is_cancellation_requested(): 检查是否已请求取消
throw_if_cancellation_requested(): 如果已请求取消则抛出异常
register_callback(): 注册一个在取消时执行的回调

C++并发基础与协程:构建取消机制的基石

在深入实现之前,我们快速回顾一下C++中与并发和异步相关的关键概念。

C++并发原语:

  • std::atomic<bool>:用于表示取消状态的原子布尔值,确保跨线程的可见性和修改的原子性。
  • std::mutex:用于保护共享数据(例如注册的回调列表)的访问,防止竞态条件。
  • std::condition_variable:在需要等待取消信号或通知取消时非常有用,但对于我们这里以轮询为主的取消机制,它的直接作用可能不如回调机制显著。然而,在实现await_suspend等阻塞操作时,它仍是必不可少的。
  • std::future / std::promise:虽然它们提供了异步结果的传递,但它们本身不直接提供取消机制。一个std::promise一旦set_valueset_exception,就无法再“取消”其关联的std::future

C++20协程基础:
C++20协程(Coroutines)提供了一种编写异步非阻塞代码的新范式,它允许函数在执行过程中暂停(co_await)和恢复,而无需显式的回调或状态机。理解以下几个核心概念对于集成取消令牌至关重要:

  • co_await:暂停当前协程,等待一个“可等待对象”(Awaitable)完成。
  • co_return:从协程返回一个值。
  • co_yield:暂停协程并产生一个值(用于生成器协程)。
  • promise_type:每个协程都关联一个promise_type,它定义了协程的生命周期行为,包括如何创建协程、如何处理返回值和异常、以及如何处理co_await操作。
  • Awaitable / Awaiter:一个可等待对象需要实现await_ready()await_suspend()await_resume()三个方法。
    • await_ready():如果立即可以恢复,返回true,不暂停。
    • await_suspend(std::coroutine_handle<P>):暂停协程。在这里可以调度协程到线程池,或者注册唤醒回调。
    • await_resume():协程恢复后执行,返回co_await表达式的结果。

为何标准库没有内置的取消机制?
C++标准库通常倾向于提供底层原语,而不是高层策略。取消机制的设计往往与特定的异步模型(例如基于事件循环、线程池或特定IO库)紧密耦合。一个通用的、适用于所有场景的取消模型很难标准化,因此C++标准将这个责任留给了库的实现者。这也正是我们今天自己构建取消令牌系统的原因。

设计一个C++风格的取消令牌系统

现在,让我们着手设计并实现我们自己的CancellationTokenSourceCancellationToken

CancellationTokenSource 的设计

CancellationTokenSource是取消请求的源头。它需要:

  1. 一个原子布尔变量来存储取消状态。
  2. 一个互斥量来保护对回调列表的并发访问。
  3. 一个回调函数列表,用于存储当取消发生时需要执行的操作。
#include <atomic>
#include <vector>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <stdexcept>
#include <memory>
#include <coroutine>
#include <thread>
#include <chrono>
#include <iostream>

// 前向声明,因为CancellationToken需要访问CancellationTokenSource的内部状态
class CancellationToken;

// 用于存储CancellationTokenSource和CancellationToken共享的状态
struct CancellationTokenState {
    std::atomic<bool> cancelled_ = false;
    std::mutex mutex_;
    std::condition_variable cv_;
    // 回调列表,存储std::function<void()>,当取消发生时执行
    std::vector<std::function<void()>> callbacks_;

    void add_callback(std::function<void()> callback) {
        if (cancelled_.load(std::memory_order_acquire)) {
            // 如果已经取消,立即执行回调
            callback();
        } else {
            std::lock_guard<std::mutex> lock(mutex_);
            if (cancelled_.load(std::memory_order_acquire)) {
                // 双重检查,防止在锁获取前被取消
                callback();
            } else {
                callbacks_.push_back(std::move(callback));
            }
        }
    }

    void remove_callback(void* callback_id) {
        // 实际实现中,callback_id需要更健壮的方式来标识和移除特定的回调
        // 简单示例中,我们暂不实现精确移除,而是依赖shared_ptr的生命周期管理
        // 对于高级场景,可以考虑返回一个Registration对象,其析构函数执行移除
    }

    void cancel() {
        if (!cancelled_.exchange(true, std::memory_order_acq_rel)) {
            // 只有当状态从false变为true时才执行通知
            std::vector<std::function<void()>> local_callbacks;
            {
                std::lock_guard<std::mutex> lock(mutex_);
                local_callbacks.swap(callbacks_); // 移动回调到局部变量,减少锁持有时间
            }
            // 释放锁后执行回调,避免回调内部再次获取锁导致死锁
            for (const auto& callback : local_callbacks) {
                try {
                    callback();
                } catch (const std::exception& e) {
                    // 记录回调中发生的异常,避免影响其他回调
                    std::cerr << "CancellationToken callback threw exception: " << e.what() << std::endl;
                }
            }
            cv_.notify_all(); // 通知所有等待取消信号的线程/协程
        }
    }
};

class CancellationTokenSource {
private:
    std::shared_ptr<CancellationTokenState> state_;

public:
    CancellationTokenSource() : state_(std::make_shared<CancellationTokenState>()) {}

    // 禁用拷贝构造和赋值,因为CancellationTokenSource通常是唯一的控制点
    CancellationTokenSource(const CancellationTokenSource&) = delete;
    CancellationTokenSource& operator=(const CancellationTokenSource&) = delete;

    // 允许移动语义
    CancellationTokenSource(CancellationTokenSource&&) noexcept = default;
    CancellationTokenSource& operator=(CancellationTokenSource&&) noexcept = default;

    CancellationToken get_token(); // 在CancellationToken定义后实现

    void cancel() {
        state_->cancel();
    }

    bool is_cancellation_requested() const {
        return state_->cancelled_.load(std::memory_order_acquire);
    }
};

CancellationToken 的设计

CancellationToken是轻量级的,它仅仅是CancellationTokenState的一个弱引用或共享引用。它不拥有状态,只是访问状态。

// 自定义异常,用于表示操作被取消
class OperationCanceledException : public std::runtime_error {
public:
    OperationCanceledException() : std::runtime_error("Operation was canceled.") {}
    explicit OperationCanceledException(const std::string& message) : std::runtime_error(message) {}
};

class CancellationToken {
private:
    std::shared_ptr<CancellationTokenState> state_;

    // 允许CancellationTokenSource访问私有构造函数
    friend class CancellationTokenSource;

    // 私有构造函数,只能通过CancellationTokenSource::get_token()创建
    explicit CancellationToken(std::shared_ptr<CancellationTokenState> state)
        : state_(std::move(state)) {}

public:
    CancellationToken() = default; // 允许默认构造空令牌 (不关联任何源)

    bool is_cancellation_requested() const {
        return state_ && state_->cancelled_.load(std::memory_order_acquire);
    }

    void throw_if_cancellation_requested() const {
        if (is_cancellation_requested()) {
            throw OperationCanceledException();
        }
    }

    // 注册一个回调函数,当取消发生时执行。
    // 返回一个可用于取消注册的对象(此处简化,实际可能返回一个unique_ptr<Registration>)
    // 为了简化示例,我们暂时不提供精确的注销机制,而是依赖shared_ptr的生命周期
    template<typename Func>
    void register_callback(Func&& callback) const {
        if (!state_) {
            // 如果没有关联状态,则永远不会取消
            return;
        }
        state_->add_callback(std::forward<Func>(callback));
    }

    // 等待取消的辅助函数 (通常用于阻塞线程,但在协程中我们会有更优雅的方式)
    void wait_for_cancellation() const {
        if (!state_) return;
        std::unique_lock<std::mutex> lock(state_->mutex_);
        state_->cv_.wait(lock, [this]{ return state_->cancelled_.load(std::memory_order_acquire); });
    }
};

// CancellationTokenSource::get_token() 的实现
inline CancellationToken CancellationTokenSource::get_token() {
    return CancellationToken(state_);
}

设计要点分析:

  • std::shared_ptr<CancellationTokenState>CancellationTokenSourceCancellationToken共享同一个CancellationTokenState实例。当所有关联的CancellationTokenCancellationTokenSource都销毁时,CancellationTokenState才会被释放。
  • 原子操作cancelled_使用std::atomic<bool>确保了多线程环境下取消状态的正确读写。
  • 互斥量与条件变量mutex_保护了callbacks_列表的并发访问。cv_用于通知等待取消的线程(尽管在协程中,我们会倾向于使用co_await而非阻塞等待)。
  • 回调机制add_callback允许任务注册在取消发生时执行的清理函数。这对于那些需要在取消时执行特定逻辑的组件非常有用,例如关闭文件、释放网络连接等。
  • 异常OperationCanceledException是推荐的取消通知方式。当任务检查到取消信号时,抛出此异常,可以沿着调用栈传播,让上层代码捕获并处理。
  • throw_if_cancellation_requested():这是一个非常方便的辅助函数,简化了在任务中检查取消状态的代码。

将取消令牌集成到协程中

现在我们有了取消令牌的基本框架,是时候将其与C++20协程结合起来了。目标是让协程能够:

  1. co_await一个可等待对象时,如果取消已被请求,则不暂停,而是立即退出。
  2. 在协程内部执行计算密集型任务时,定期检查取消状态。
  3. 将取消令牌有效地传递给嵌套的子协程。

使Awaitable可取消

一个典型的可等待对象,例如一个延迟操作(delay),在没有取消机制时会简单地暂停。现在,我们希望它在暂停前和暂停期间都能响应取消。

// 协程任务的返回类型,通常是一个future-like对象
template<typename T>
struct CoroutineFuture {
    struct promise_type {
        T value_;
        std::exception_ptr exception_;
        std::coroutine_handle<> continuation_;

        CoroutineFuture get_return_object() {
            return CoroutineFuture{std::coroutine_handle<promise_type>::from_promise(*this)};
        }
        std::suspend_always initial_suspend() { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; } // 协程结束后暂停,等待调用者resume

        void return_value(T value) { value_ = std::move(value); }
        void unhandled_exception() { exception_ = std::current_exception(); }

        // 当协程被co_await时,会调用此函数
        auto await_transform(std::suspend_always) { return std::suspend_always{}; }
        auto await_transform(std::suspend_never) { return std::suspend_never{}; }

        // 处理CancellationToken,使其能够作为co_await表达式的一部分
        CancellationToken await_transform(CancellationToken token) {
            return token;
        }
    };

    std::coroutine_handle<promise_type> handle_;

    explicit CoroutineFuture(std::coroutine_handle<promise_type> h) : handle_(h) {}
    CoroutineFuture(CoroutineFuture&& other) noexcept : handle_(std::exchange(other.handle_, {})) {}
    ~CoroutineFuture() { if (handle_) handle_.destroy(); }

    T get() {
        if (!handle_) throw std::runtime_error("CoroutineFuture moved or invalid.");
        while (!handle_.done()) {
            handle_.resume(); // 简单示例,实际可能需要调度器
        }
        if (handle_.promise().exception_) {
            std::rethrow_exception(handle_.promise().exception_);
        }
        return std::move(handle_.promise().value_);
    }
};

// -----------------------------------------------------------------------------
// 可取消的延时Awaitable
// -----------------------------------------------------------------------------
struct CancellableDelay {
    std::chrono::milliseconds duration_;
    CancellationToken token_;

    CancellableDelay(std::chrono::milliseconds duration, CancellationToken token)
        : duration_(duration), token_(std::move(token)) {}

    bool await_ready() const noexcept {
        return token_.is_cancellation_requested(); // 如果已取消,立即就绪,不暂停
    }

    void await_suspend(std::coroutine_handle<> h) {
        if (token_.is_cancellation_requested()) {
            // 在await_ready返回false后,但在await_suspend被调用时,
            // 令牌可能已被取消。此时我们不暂停协程,而是立即恢复。
            // 注意:在这种情况下,h.resume()会导致协程立即恢复,
            // 但如果await_ready已经返回true,此方法根本不会被调用。
            // 这里的逻辑主要是为了处理await_ready和await_suspend之间的竞态
            h.resume();
            return;
        }

        // 注册一个回调,当取消发生时,恢复协程。
        // 注意:这里需要确保回调能够安全地恢复协程句柄。
        // 一个常见模式是将coroutine_handle存储在shared_ptr中,确保其生命周期。
        // 或者使用一个外部调度器来处理resume。
        // 为了简化,我们假设CancellationTokenState的生命周期足够长,
        // 且协程句柄在回调执行时仍然有效。
        // 实际应用中,需要更复杂的生命周期管理。

        // 模拟异步操作:在另一个线程中等待或被取消时恢复
        std::thread([h, token_copy = token_, duration_ = duration_]() mutable {
            // 等待取消或延时结束
            std::unique_lock<std::mutex> lock(token_copy.state_->mutex_);
            token_copy.state_->cv_.wait_for(lock, duration_, [&]{
                return token_copy.is_cancellation_requested();
            });

            if (h) {
                h.resume(); // 唤醒协程
            }
        }).detach(); // 实际生产代码中不建议detach,应管理线程生命周期
    }

    void await_resume() {
        token_.throw_if_cancellation_requested(); // 恢复时检查取消状态,如果已取消则抛出异常
    }
};

// 辅助函数,用于创建CancellableDelay
inline CancellableDelay delay(std::chrono::milliseconds duration, CancellationToken token) {
    return CancellableDelay{duration, std::move(token)};
}

CancellableDelay的关键点:

  • await_ready():这是协程暂停前的第一个检查点。如果token_.is_cancellation_requested()true,协程将不会暂停,而是直接进入await_resume()
  • await_suspend():如果await_ready()返回false(即当前未取消),协程将在此处暂停。
    • 我们注册了一个回调,当取消发生时,它会通过h.resume()来唤醒协程。
    • 为了防止在await_ready返回falseawait_suspend执行前,取消信号到达的竞态条件,我们在await_suspend内部再次检查token_.is_cancellation_requested()。如果此时已取消,则立即恢复协程。
    • 为了模拟实际的异步操作,这里启动了一个std::thread,它会等待指定的duration_或者直到token_被取消。
  • await_resume():当协程被唤醒时,await_resume()会被调用。在这里,我们再次调用token_.throw_if_cancellation_requested()。如果此时取消已被请求,将抛出OperationCanceledException,协程的执行流将转到异常处理。

协程内部的取消检查

对于那些不涉及co_await的计算密集型任务,协程需要主动在内部检查取消状态。

// -----------------------------------------------------------------------------
// 可取消的长时间运行计算任务
// -----------------------------------------------------------------------------
CoroutineFuture<long long> long_running_computation(int iterations, CancellationToken token) {
    long long sum = 0;
    for (int i = 0; i < iterations; ++i) {
        // 每隔一定迭代次数检查一次取消状态
        if (i % 100000 == 0) {
            token.throw_if_cancellation_requested();
            // 也可以选择 co_await 一个检查点,让调度器有机会切换
            // co_await std::suspend_always{}; // 每次检查都暂停一下,让出控制权
        }
        sum += i;
    }
    co_return sum;
}

在这个例子中,long_running_computation协程会定期调用token.throw_if_cancellation_requested()。这确保了即使没有co_await操作,协程也能响应取消。

取消的传播与嵌套协程

CancellationToken从父协程传递给子协程是实现取消传播的关键。

// -----------------------------------------------------------------------------
// 嵌套协程示例
// -----------------------------------------------------------------------------
CoroutineFuture<std::string> sub_task(std::string name, std::chrono::milliseconds delay_ms, CancellationToken token) {
    try {
        std::cout << "[" << name << "] Sub-task started." << std::endl;
        co_await delay(delay_ms, token); // 使用可取消的延时
        std::cout << "[" << name << "] Sub-task finished." << std::endl;
        co_return "Sub-task " + name + " completed.";
    } catch (const OperationCanceledException& e) {
        std::cout << "[" << name << "] Sub-task caught cancellation: " << e.what() << std::endl;
        throw; // 重新抛出,让上层知道取消
    } catch (const std::exception& e) {
        std::cerr << "[" << name << "] Sub-task caught unexpected exception: " << e.what() << std::endl;
        throw;
    }
}

CoroutineFuture<std::string> main_task(CancellationToken token) {
    try {
        std::cout << "[Main] Main task started." << std::endl;

        // 传递相同的token给子任务
        auto result1_future = sub_task("A", std::chrono::milliseconds(2000), token);
        auto result2_future = sub_task("B", std::chrono::milliseconds(3000), token);

        // 尝试等待子任务
        std::cout << "[Main] Awaiting sub-task A..." << std::endl;
        std::string res1 = co_await result1_future.handle_; // co_await CoroutineFuture的handle
        std::cout << "[Main] Sub-task A result: " << res1 << std::endl;

        std::cout << "[Main] Awaiting sub-task B..." << std::endl;
        std::string res2 = co_await result2_future.handle_;
        std::cout << "[Main] Sub-task B result: " << res2 << std::endl;

        co_return "Main task completed successfully.";

    } catch (const OperationCanceledException& e) {
        std::cout << "[Main] Main task caught cancellation: " << e.what() << std::endl;
        co_return "Main task canceled.";
    } catch (const std::exception& e) {
        std::cerr << "[Main] Main task caught unexpected exception: " << e.what() << std::endl;
        throw;
    }
}

main_task中,我们创建了两个sub_task,并将同一个token传递给它们。当token被取消时,所有关联的子任务都会收到取消信号,并有机会进行清理。这种模式使得取消能够沿着协程调用链向下传播。

完整的示例程序:

int main() {
    std::cout << "Starting cancellation token example." << std::endl;

    CancellationTokenSource cts;
    CancellationToken token = cts.get_token();

    // 启动主任务
    auto main_future = main_task(token);

    // 启动一个线程,在一段时间后触发取消
    std::thread canceller_thread([&cts]() {
        std::this_thread::sleep_for(std::chrono::milliseconds(2500));
        std::cout << "n--- Requesting cancellation ---n" << std::endl;
        cts.cancel();
    });

    try {
        // 获取主任务的结果
        std::string final_result = main_future.get();
        std::cout << "nFinal Result: " << final_result << std::endl;
    } catch (const OperationCanceledException& e) {
        std::cout << "nMain future caught cancellation: " << e.what() << std::endl;
    } catch (const std::exception& e) {
        std::cerr << "nMain future caught unexpected exception: " << e.what() << std::endl;
    }

    canceller_thread.join();
    std::cout << "Example finished." << std::endl;

    return 0;
}

运行结果预期:
程序启动后,main_task会启动sub_task Asub_task Bsub_task A需要2秒,sub_task B需要3秒。
canceller_thread会在2.5秒后调用cts.cancel()
这意味着sub_task A将在完成前被取消,sub_task B也将在启动后不久被取消。
main_taskco_await result1_future.handle_co_await result2_future.handle_时,会因为底层的CancellableDelay抛出OperationCanceledException而被中断,并捕获该异常。

Starting cancellation token example.
[Main] Main task started.
[A] Sub-task started.
[B] Sub-task started.
[Main] Awaiting sub-task A...
--- Requesting cancellation ---

[A] Sub-task caught cancellation: Operation was canceled.
[Main] Main task caught cancellation: Operation was canceled.
Example finished.

这正是我们期望的合作式取消行为。

取消策略:合作式与(有限的)抢占式

合作式取消的优势与实践

我们目前所实现的取消机制就是典型的合作式取消。它的核心特点是:

  • 任务主动检查:被取消的任务必须在其代码中显式地检查取消状态。
  • 可控的清理:任务可以在响应取消之前执行必要的清理工作,如释放锁、关闭文件、回滚事务等。
  • 确定性:任务何时停止、以何种状态停止是可预测的,避免了资源泄露和数据损坏。

合作式取消是C++并发编程中推荐的做法,因为它提供了最高的安全性、可控性和可预测性。

抢占式取消的困境与C++的限制

抢占式取消意味着外部实体(例如操作系统或运行时)可以在任务没有显式检查的情况下强制中断其执行。在大多数情况下,C++中实现通用的、安全的抢占式取消是非常困难甚至不可能的,原因如下:

  • 资源清理问题:如果一个任务在持有锁、分配内存、或者进行文件I/O时被突然中断,它将没有机会释放这些资源,导致泄露或死锁。
  • 栈展开与析构:在C++中,析构函数是清理资源的自然场所。抢占式终止通常不会安全地执行栈展开,因此析构函数可能不会被调用。
  • C++标准缺少支持:C++标准没有提供跨平台的、安全地终止任意线程或协程的机制(除了致命错误导致的std::terminate)。

std::jthread的尝试:
C++20引入了std::jthread,它在析构时会自动join(),并提供了一个stop_token机制,可以向线程发送停止请求。std::jthreadstop_token是合作式取消的一个很好的例子,它允许线程在自己的代码中检查stop_token并决定何时停止。然而,这仍然是合作式的,它不会强制终止线程。它为线程提供了一个request_stop()信号,但线程必须自己响应。

对于协程,抢占式取消的挑战更大,因为协程的执行上下文可以被挂起和恢复,其生命周期与线程不完全绑定。没有标准的方法可以外部强行“销毁”一个正在运行的协程而保证安全。

结论:在C++中,我们应该始终优先考虑和设计合作式取消机制。

错误处理、资源管理与异常安全

在取消场景下,妥善的错误处理和资源管理至关重要。

OperationCanceledException 的设计与处理

我们自定义的OperationCanceledException是一种特殊的异常类型,它表示一个预期的、非错误的终止。

  • 捕获和区分:调用者应该能够捕获OperationCanceledException,并将其与真正的错误(如std::runtime_error)区分开来。通常,当捕获到OperationCanceledException时,不需要记录为错误日志,只需进行清理并正常退出。
  • 传播:当子任务被取消时,它应该抛出OperationCanceledException,让父任务能够感知并做出相应。
  • std::exception_ptr:在协程中,如果一个协程抛出异常,它的promise_type会捕获这个异常并存储在std::exception_ptr中。当CoroutineFuture::get()被调用时,这个异常会被重新抛出。这对于取消异常同样适用,确保取消信号能够正确传播。

RAII 在取消场景下的重要性

RAII (Resource Acquisition Is Initialization) 是C++中管理资源的核心范式。它在取消场景中尤为重要:

  • 自动清理:当协程因取消而抛出异常时,栈会展开。所有在栈上创建的局部对象(如std::unique_lock, std::fstream, std::vector等)的析构函数都会被调用。这意味着即使任务被取消,这些资源也能得到及时和正确的释放。
  • 锁管理:使用std::unique_lockstd::lock_guard可以确保即使在任务被取消并抛出异常时,锁也能被正确释放。
// 示例:RAII在取消时的应用
CoroutineFuture<void> task_with_resource(CancellationToken token) {
    std::cout << "[ResourceTask] Acquiring resource (e.g., file lock)." << std::endl;
    std::unique_lock<std::mutex> lock(some_global_mutex); // 模拟获取资源锁

    try {
        token.throw_if_cancellation_requested(); // 检查取消

        // 模拟一些工作
        co_await delay(std::chrono::milliseconds(1000), token);

        std::cout << "[ResourceTask] Releasing resource." << std::endl;
        lock.unlock(); // 正常完成时释放
        co_return;
    } catch (const OperationCanceledException&) {
        std::cout << "[ResourceTask] Cancellation requested. Resource (lock) will be released by RAII." << std::endl;
        // lock的析构函数会在栈展开时自动释放互斥量
        throw; // 重新抛出,让上层知道取消
    }
}

部分完成工作的处理

当任务被取消时,它可能已经完成了一部分工作。如何处理这些部分完成的工作取决于具体业务逻辑:

  • 原子性操作:如果任务涉及多步操作,且这些操作需要原子性(要么全做,要么全不做),那么在取消时应该回滚所有已完成的部分。例如,数据库事务可以通过rollback()来撤销。
  • 可恢复性:如果部分完成的工作可以保存并在以后恢复,那么在取消时可以保存当前进度,以便后续从中断点继续。
  • 丢弃:如果部分完成的工作没有副作用,或者其结果不重要,那么在取消时可以直接丢弃。

取消令牌本身不提供这些业务逻辑,但它提供了触发这些逻辑的机制。任务在捕获OperationCanceledException时,应根据业务需求执行相应的清理和回滚操作。

高级取消场景与考量

超时取消

超时是一种特殊的取消形式。我们希望一个任务在指定时间内未能完成时自动取消。这可以通过结合CancellationTokenSource和计时器来实现:

// 结合超时和CancellationTokenSource
CoroutineFuture<std::string> task_with_timeout(std::chrono::milliseconds timeout_ms, CancellationToken external_token) {
    CancellationTokenSource internal_cts;
    CancellationToken internal_token = internal_cts.get_token();

    // 链接外部令牌和内部令牌
    // 当外部令牌取消时,内部令牌也会取消
    // 实际的链接实现可能更复杂,需要CancellationTokenSource支持注册其他CancellationToken
    // 这里简化为:我们只依赖内部CTS来触发取消,而外部令牌则作为另一个独立的检查点

    std::cout << "[TimeoutTask] Starting with timeout of " << timeout_ms.count() << "ms." << std::endl;

    // 启动一个线程,在超时后或外部取消时触发内部取消
    std::thread timer_thread([&internal_cts, timeout_ms, external_token_copy = external_token]() {
        // 等待超时,或者外部令牌被取消
        // 这是一个简化的等待,更鲁棒的实现会使用条件变量
        auto start_time = std::chrono::steady_clock::now();
        while (std::chrono::steady_clock::now() - start_time < timeout_ms && !external_token_copy.is_cancellation_requested()) {
            std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 轮询检查
        }

        if (!internal_cts.is_cancellation_requested()) { // 避免重复取消
            std::cout << "[TimeoutTask] Timeout or external cancellation triggered, cancelling internal CTS." << std::endl;
            internal_cts.cancel();
        }
    });
    timer_thread.detach(); // 实际应管理线程生命周期

    try {
        // 执行一个子任务,使用内部令牌
        std::string result = co_await sub_task("Timed", timeout_ms + std::chrono::milliseconds(500), internal_token); // 故意让子任务时间长于超时
        co_return result;
    } catch (const OperationCanceledException& e) {
        std::cout << "[TimeoutTask] Caught cancellation: " << e.what() << std::endl;
        co_return "Timeout task canceled.";
    }
}

注意:上述timer_thread的实现是简化的轮询,在生产环境中,应使用std::condition_variableasio等库提供的异步计时器来避免忙等待。

链接令牌(Linking Tokens)

有时,一个任务可能需要响应多个取消源。例如,它可能需要响应用户界面上的“取消”按钮,或者一个全局的“关机”信号。我们可以创建一个链接令牌源,它在任何一个源被取消时都会触发自身的取消。这需要CancellationTokenSource能够注册其他CancellationToken作为其触发器。

// 假设 CancellationTokenSource 有一个 add_linked_token 方法
// CancellationTokenSource::add_linked_token(CancellationToken other_token) {
//     other_token.register_callback([this]{ this->cancel(); });
// }

// 示例:链接令牌的逻辑
void demonstrate_linked_tokens() {
    CancellationTokenSource global_cts;
    CancellationTokenSource user_cts;
    CancellationTokenSource linked_cts;

    // linked_cts 链接到 global_cts 和 user_cts
    // 当 global_cts 或 user_cts 中的任何一个被取消时,linked_cts 也会被取消
    // 这需要CancellationTokenSource提供相应的接口,例如:
    // linked_cts.add_linked_token(global_cts.get_token());
    // linked_cts.add_linked_token(user_cts.get_token());
    // 这里我们直接注册回调模拟
    global_cts.get_token().register_callback([&linked_cts]{ linked_cts.cancel(); });
    user_cts.get_token().register_callback([&linked_cts]{ linked_cts.cancel(); });

    CancellationToken task_token = linked_cts.get_token();

    std::cout << "nDemonstrating Linked Tokens:" << std::endl;
    auto linked_task_future = sub_task("Linked", std::chrono::milliseconds(5000), task_token);

    std::thread t1([&global_cts]{
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        std::cout << "Global CTS cancels." << std::endl;
        global_cts.cancel();
    });

    std::thread t2([&user_cts]{
        std::this_thread::sleep_for(std::chrono::milliseconds(2000));
        std::cout << "User CTS cancels." << std::endl;
        user_cts.cancel();
    });

    try {
        linked_task_future.get();
    } catch (const OperationCanceledException& e) {
        std::cout << "Linked task caught cancellation: " << e.what() << std::endl;
    }

    t1.join();
    t2.join();
}

在上述例子中,global_cts会在1秒后取消,user_cts会在2秒后取消。linked_cts会在global_cts取消时立即收到通知并触发取消,从而使sub_task("Linked", ...)被取消。

取消与I/O操作

对于阻塞式I/O操作,取消通常需要操作系统的支持。例如,在Windows上可以使用CancelSynchronousIo或异步I/O的CancelIoEx。在Linux上,某些I/O操作(如read, write)在被信号中断时会返回EINTR

然而,对于异步I/O库(如Boost.Asio),它们通常会提供自己的取消机制,例如async_cancel()。在这种情况下,我们的CancellationToken可以作为触发这些库特定取消操作的信号。例如,当CancellationToken被取消时,我们可以注册一个回调,在该回调中调用asio::cancel()

线程池与任务队列中的取消

在基于线程池的任务调度系统中,任务通常被放入一个队列。当任务被取消时,如果它还在队列中等待执行,最好的做法是将其从队列中移除,而不是执行它。如果任务已经开始执行,则按照我们上述的合作式取消原则处理。

实践中的性能与线程安全

  • 原子操作的开销std::atomic<bool>的加载和存储操作通常比互斥量轻量,但并非零开销。在性能关键路径上频繁检查取消状态,需要权衡其开销。
  • 锁的粒度CancellationTokenState中的std::mutex保护回调列表。cancel()方法将回调移动到局部变量再执行,以最小化锁的持有时间,这是良好的实践。
  • 回调函数的执行上下文:当CancellationTokenSource::cancel()被调用时,回调函数会在调用cancel()的线程上同步执行。如果回调函数执行时间过长或包含阻塞操作,可能会阻塞取消操作的完成。对于耗时回调,应将其调度到单独的线程或线程池中执行。
  • 协程句柄的生命周期:在await_suspend中注册回调来恢复协程时,必须确保协程句柄h在回调执行时仍然有效。这通常需要将h包装在std::shared_ptr中,或者确保其生命周期由外部管理(例如通过调度器)。我们示例中的std::thread([h, ...])虽然工作,但在实际生产环境中,h的生命周期管理是一个复杂的挑战。通常会有一个ExecutorScheduler来负责管理协程的生命周期和恢复。

构建弹性并发系统:取消机制的价值

通过今天对取消令牌的深入探讨,我们看到它不仅仅是一个简单的布尔标志。它是一个功能丰富的协作机制,能够使我们的C++并发任务流更加健壮、响应更快。通过CancellationTokenSourceCancellationToken的精心设计,结合C++20协程的强大能力,我们可以在任务的生命周期中注入优雅的终止点。这使得我们能够安全地管理资源,避免数据损坏,并为用户提供更流畅、更可控的体验。记住,在并发编程中,合作式取消是实现复杂任务优雅退出的黄金法则。它的价值在于将控制权交还给任务本身,允许它在被要求停止时,以最负责任和最安全的方式完成其使命。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注