深度解析 ‘Read-Copy-Update’ (RCU):在用户态 C++ 中实现支持万级并发读的无锁配置系统

各位同事,各位技术爱好者,大家好!

今天我们将深入探讨一个在高性能并发系统中至关重要的无锁技术——Read-Copy-Update (RCU)。特别地,我们将聚焦于如何在用户态 C++ 中实现 RCU,并将其应用于构建一个支持万级并发读的配置系统。

在现代分布式服务和高并发应用中,配置系统扮演着核心角色。它需要提供极高的读取性能,因为几乎每一个请求处理流程都可能需要访问配置。同时,配置的更新频率相对较低,但更新时必须保证数据一致性,且不能长时间阻塞读取操作。传统的锁机制,如互斥锁(mutex),在读多写少的场景下会成为性能瓶颈,因为即使是读操作也可能需要获取共享锁,从而引入上下文切换和锁竞争开销。

为了解决这个问题,我们需要一种机制,允许读者几乎无障碍地访问数据,而写者则在后台进行更新,并在一个“安全”的时间点回收旧数据。RCU 正是为此而生。

一、RCU 核心思想与适用场景

RCU 的全称是 "Read-Copy-Update",顾名思义,它包含三个核心操作:

  1. Read (读):读者直接访问共享数据,无需获取锁。这是 RCU 能够实现极高读性能的关键。
  2. Copy (复制):当需要修改数据时,写者首先复制一份当前数据。
  3. Update (更新):写者修改复制出来的新数据,然后原子地将共享指针指向新数据。

RCU 的精髓在于,在写者原子更新共享指针后,旧版本的数据并不会立即被销毁。它需要等待一个“宽限期(Grace Period)”,确保所有在更新前开始读取旧数据的读者都已完成读取,不再持有对旧数据的引用。只有在宽限期结束后,旧数据才能安全地回收。

RCU 的优势:

  • 极高的读性能:读者几乎无锁(或只进行极轻量的原子操作),只需简单的指针解引用。
  • 低写者开销(相对):写者通过复制和原子更新指针完成数据修改,避免了长时间持有锁。
  • 适用于读多写少场景:这是 RCU 发挥最大价值的领域,如配置管理、路由表、DNS 缓存等。

RCU 的限制:

  • 写者开销:写者需要复制整个数据结构,这可能导致较高的内存和 CPU 开销,尤其对于大型数据结构。
  • 内存占用:在宽限期内,旧版本的数据会持续存在,增加了内存占用。
  • 复杂性:实现用户态 RCU 需要精心设计宽限期机制和内存回收策略。
  • 不适用于通用锁替代:RCU 不解决写写冲突,也不适用于需要频繁修改数据或数据结构拓扑变化的场景。

为什么 RCU 适合配置系统?

配置系统通常具有以下特点:

  • 读操作极其频繁:每个服务请求都可能读取配置。
  • 写操作相对稀疏:配置更新不频繁,但在更新时需要平滑过渡。
  • 数据结构相对稳定:配置数据通常是键值对、树或列表,其结构变化不大。
  • 一致性要求高:读者必须看到一致的配置视图。

RCU 完美契合这些需求,它能确保读者总是看到一个完整的、一致的配置快照,而不会被更新操作中断。

二、用户态 RCU 的挑战

RCU 最初是 Linux 内核中的一种同步机制,它利用了内核对进程调度和上下文切换的深刻理解来定义“宽限期”和“静止状态(Quiescent State)”。然而,在用户态实现 RCU,我们面临着独特的挑战:

  1. 缺乏内核支持:我们无法直接使用 synchronize_rcu() 这样的内核 API,也无法依赖内核来定义静止状态。在用户态,我们不能简单地假定一个线程在离开 CPU 后就进入了静止状态。
  2. 自定义宽限期机制:我们需要在用户态自行实现宽限期检测逻辑。这通常涉及线程注册、线程状态跟踪和全局计数器/ epoch 管理。
  3. 内存回收管理:内核 RCU 有 call_rcu() 这样的机制来在宽限期结束后异步释放内存。在用户态,我们需要实现类似的延迟回收机制。
  4. 线程生命周期管理:当线程启动或退出时,它必须正确地注册或注销其 RCU 状态,以避免僵尸线程状态影响宽限期检测。

为了应对这些挑战,我们将设计一套用户态 RCU 基础设施,它由一个全局的 RcuManager 和每个线程私有的 RcuState 组成。

三、用户态 RCU 核心设计与实现

我们将采用一种基于 epoch(纪元)的机制来管理 RCU 的宽限期。

3.1 核心组件

1. RcuState (线程本地 RCU 状态)

每个参与 RCU 读操作的线程都将拥有一个 thread_localRcuState 实例。

#include <atomic>
#include <vector>
#include <functional>
#include <mutex> // For global manager's registration/reclamation queues
#include <thread> // For std::this_thread::yield
#include <chrono> // For std::chrono::microseconds
#include <memory> // For std::shared_ptr
#include <map>    // For ConfigData

// Forward declaration for RcuManager to use in RcuState
class RcuManager;

// Thread-local RCU state
struct RcuState {
    // Stores the epoch when the thread *entered* its RCU read section.
    // A value of 0 indicates the thread is not currently in an RCU read section.
    std::atomic<uint64_t> thread_current_epoch;
    int read_section_depth; // To handle nested rcu_read_lock calls
    std::vector<std::function<void()>> thread_local_reclamation_queue; // Callbacks deferred by this thread

    RcuState() : thread_current_epoch(0), read_section_depth(0) {}

    // Method to register this thread's state with the global manager
    void register_with_manager(RcuManager* manager);
};
  • thread_current_epoch: 这是一个原子变量,用于存储线程进入 RCU 读区时 RcuManager 的全局 epoch 值。当线程退出 RCU 读区时,它会被设置为 0。synchronize_rcu 将等待所有活跃线程的 thread_current_epoch 值。
  • read_section_depth: 用于支持 RCU 读区的嵌套。只有当深度为 0 时,才真正进入或退出 RCU 读区。
  • thread_local_reclamation_queue: 用于存储由该线程调用 call_rcu 延迟的内存回收回调。当线程退出最外层 RCU 读区时,这些回调会被转移到全局的回收队列中。

2. RcuManager (全局 RCU 管理器)

RcuManager 是一个单例,负责管理全局 epoch、注册的线程状态以及处理内存回收。

class RcuManager {
public:
    static RcuManager& get_instance() {
        static RcuManager instance;
        return instance;
    }

    // Register a thread's RcuState. This is typically called once per thread
    // when it first enters an RCU read section.
    void register_thread_epoch(std::atomic<uint64_t>* thread_epoch_ptr) {
        std::lock_guard<std::mutex> lock(_registration_mutex);
        // Avoid duplicate registration if a thread re-registers (e.g., if we didn't track it locally)
        // For simplicity, we assume one-time registration per thread's lifetime here.
        // In a robust system, one might use a std::set or a map with thread_id as key.
        _registered_thread_epochs.push_back(thread_epoch_ptr);
    }

    // Get the current global epoch. Used by readers to mark their entry.
    uint64_t get_global_epoch() const {
        return _global_epoch.load(std::memory_order_relaxed);
    }

    // Schedule a callback for execution after a grace period.
    // This function is typically called by a writer or by rcu_read_unlock
    // to transfer thread-local deferred callbacks.
    void call_rcu_deferred(std::function<void()> callback) {
        std::lock_guard<std::mutex> lock(_global_reclamation_mutex);
        _global_reclamation_queue_next_epoch.push_back(std::move(callback));
    }

    // Block until all RCU read sections that started before the current
    // grace period have completed. Then, reclaim memory.
    void synchronize_rcu() {
        // 1. Advance the global epoch. This marks the start of a new grace period.
        // `target_epoch` will be the epoch value *before* the increment.
        // Readers starting *after* this point will see a higher epoch.
        uint64_t target_epoch = _global_epoch.fetch_add(1, std::memory_order_acq_rel);

        // 2. Wait for all registered threads to pass the 'target_epoch'.
        // This means they have either exited their RCU read section (thread_current_epoch == 0)
        // or have started a new RCU read section after target_epoch was set.
        std::vector<std::atomic<uint64_t>*> current_registered_threads;
        {
            std::lock_guard<std::mutex> lock(_registration_mutex);
            current_registered_threads = _registered_thread_epochs;
        }

        for (auto thread_epoch_ptr : current_registered_threads) {
            // Spin-wait until the thread's epoch marker is either 0 (not in RCU)
            // or greater than the target_epoch (started a new RCU section after target_epoch was set).
            while (true) {
                uint64_t thread_epoch = thread_epoch_ptr->load(std::memory_order_acquire);
                if (thread_epoch == 0 || thread_epoch > target_epoch) {
                    break;
                }
                // Yield to other threads to avoid busy-waiting too aggressively
                std::this_thread::yield();
            }
        }

        // 3. All readers that were active at or before `target_epoch` have now completed or moved on.
        // It's safe to execute reclamation callbacks associated with the completed grace period.
        {
            std::lock_guard<std::mutex> lock(_global_reclamation_mutex);
            for (auto& callback : _global_reclamation_queue) {
                callback(); // Execute the deferred deletion
            }
            _global_reclamation_queue.clear();

            // Move callbacks from the "next epoch" queue to the current queue.
            // These callbacks were scheduled *during* the grace period that just completed,
            // so they need to wait for the *next* grace period to be safe.
            _global_reclamation_queue.swap(_global_reclamation_queue_next_epoch);
        }
    }

private:
    RcuManager() : _global_epoch(1) {} // Start epoch at 1
    ~RcuManager() = default;
    RcuManager(const RcuManager&) = delete;
    RcuManager& operator=(const RcuManager&) = delete;

    std::atomic<uint64_t> _global_epoch;
    std::vector<std::atomic<uint64_t>*> _registered_thread_epochs;
    std::mutex _registration_mutex; // Protects _registered_thread_epochs

    // Callbacks for the current grace period (to be processed *after* the current grace period)
    std::vector<std::function<void()>> _global_reclamation_queue;
    // Callbacks for the next grace period (to be added to _global_reclamation_queue after current is processed)
    std::vector<std::function<void()>> _global_reclamation_queue_next_epoch;
    std::mutex _global_reclamation_mutex; // Protects reclamation queues
};
  • _global_epoch: 全局原子计数器,每次 synchronize_rcu 调用时递增。
  • _registered_thread_epochs: 存储所有已注册线程的 thread_current_epoch 指针。_registration_mutex 保护其访问。
  • _global_reclamation_queue_global_reclamation_queue_next_epoch: 这是两个回收队列,采用双缓冲机制。_global_reclamation_queue 存储需要在当前宽限期结束后回收的旧数据回调,而 _global_reclamation_queue_next_epoch 存储在当前宽限期进行中时新加入的回收回调,它们将在下一个宽限期结束后才被处理。这确保了足够长的延迟,以覆盖所有可能的读者。
  • synchronize_rcu(): 这是核心函数。它递增 _global_epoch,然后遍历所有已注册线程的 thread_current_epoch。它会自旋等待,直到所有线程的 thread_current_epoch> target_epoch(表示它们已经看到了新的 epoch 并进入了新的 RCU 读区)或等于 0(表示它们已退出 RCU 读区)。一旦所有线程都满足条件,就表示宽限期已过,可以安全地执行 _global_reclamation_queue 中的回调,然后交换两个回收队列。

3.2 RCU 操作宏/函数

接下来,我们定义用户态 RCU 的基本操作:

// Define thread_local RcuState
thread_local RcuState t_rcu_state;

// Implement RcuState::register_with_manager
void RcuState::register_with_manager(RcuManager* manager) {
    manager->register_thread_epoch(&thread_current_epoch);
}

// RCU read-side critical section entry
inline void rcu_read_lock() {
    if (t_rcu_state.read_section_depth == 0) {
        // First entry into an RCU read section for this thread
        // Lazily register this thread's RCU state if not already registered.
        // `registered_this_thread` is thread_local, ensuring it's set once per thread.
        static thread_local bool registered_this_thread = false;
        if (!registered_this_thread) {
            t_rcu_state.register_with_manager(&RcuManager::get_instance());
            registered_this_thread = true;
        }

        // Store the current global epoch into the thread's marker.
        // This signifies that this thread is now active in an RCU read section
        // associated with this epoch.
        t_rcu_state.thread_current_epoch.store(
            RcuManager::get_instance().get_global_epoch(),
            std::memory_order_relaxed // Relaxed is fine here, as synchronization is handled by synchronize_rcu
        );
    }
    t_rcu_state.read_section_depth++;
}

// RCU read-side critical section exit
inline void rcu_read_unlock() {
    t_rcu_state.read_section_depth--;
    if (t_rcu_state.read_section_depth == 0) {
        // Last exit from an RCU read section for this thread
        // Set thread_current_epoch to 0 to signal that this thread is no longer
        // in an RCU read section. This is a release operation to ensure prior
        // memory accesses are visible.
        t_rcu_state.thread_current_epoch.store(0, std::memory_order_release); 

        // Move thread-local deferred callbacks to the global queue for the *next* grace period.
        // This ensures they are processed with a sufficient delay.
        if (!t_rcu_state.thread_local_reclamation_queue.empty()) {
            RcuManager::get_instance().call_rcu_deferred_internal(
                std::make_move_iterator(t_rcu_state.thread_local_reclamation_queue.begin()),
                std::make_move_iterator(t_rcu_state.thread_local_reclamation_queue.end())
            );
            t_rcu_state.thread_local_reclamation_queue.clear();
        }
    }
}

// Defer a reclamation callback
inline void call_rcu(std::function<void()> callback) {
    t_rcu_state.thread_local_reclamation_queue.push_back(std::move(callback));
}
  • rcu_read_lock():
    • 如果这是该线程第一次进入 RCU 读区(read_section_depth == 0),它会确保线程的 RcuState 已注册到 RcuManager
    • 然后,它将 _global_epoch 的当前值存储到 t_rcu_state.thread_current_epoch 中,表示该线程正在读取此 epoch 的数据。
    • read_section_depth 递增。
  • rcu_read_unlock():
    • read_section_depth 递减。
    • 如果深度变为 0,表示线程完全离开了 RCU 读区。此时,t_rcu_state.thread_current_epoch 被设置为 0(一个 release 操作,确保所有读操作在退出前完成),表示线程不再活跃。
    • 同时,将线程本地的回收队列中的回调移动到 RcuManager_global_reclamation_queue_next_epoch 中。
  • call_rcu(): 允许任何线程(通常是读者在 RCU 读区内,或写者在更新操作中)将一个回调函数添加到其线程本地的回收队列中。

RcuManager::call_rcu_deferred_internal
为了批量移动 std::vector 的内容到另一个 std::vector,需要一个内部辅助函数:

// Add to RcuManager class:
    // Internal helper to add multiple callbacks efficiently
    void call_rcu_deferred_internal(std::vector<std::function<void()>>::iterator begin, 
                                    std::vector<std::function<void()>>::iterator end) {
        std::lock_guard<std::mutex> lock(_global_reclamation_mutex);
        _global_reclamation_queue_next_epoch.insert(_global_reclamation_queue_next_epoch.end(),
                                                      std::make_move_iterator(begin),
                                                      std::make_move_iterator(end));
    }

3.3 内存顺序与原子性

RCU 的正确性高度依赖于 C++11 引入的内存模型和原子操作。

  • _current_config.load(std::memory_order_acquire) (读者)
    • acquire 语义确保在加载 _current_config 指针之后的所有内存访问,都不能被重排序到加载操作之前。
    • 这意味着读者在获取到新指针后,能够看到写者在更新指针之前对新数据结构的所有修改。
  • _current_config.store(new_config_data, std::memory_order_release) (写者)
    • release 语义确保在存储 _current_config 指针之前的所有内存访问(即对 new_config_data 的修改),都不能被重排序到存储操作之后。
    • 这意味着当读者通过 acquire 语义看到新指针时,它也能看到写者在存储新指针之前所做的所有修改。
  • _global_epoch.fetch_add(1, std::memory_order_acq_rel) (synchronize_rcu )
    • acquire-release 语义确保 _global_epoch 的递增操作本身是原子且有序的。
    • 它在 fetch_add 之前对所有操作起到 release 作用,在 fetch_add 之后对所有操作起到 acquire 作用,从而建立了一个全局的同步点。
  • thread_epoch_ptr->load(std::memory_order_acquire) (synchronize_rcu 等待)
    • synchronize_rcu 读取线程的 thread_current_epoch 时,使用 acquire 语义。这与 rcu_read_unlock 中将 thread_current_epoch 设置为 0 的 release 语义形成同步对。
    • 这意味着一旦 synchronize_rcu 看到线程的 thread_current_epoch 为 0 或 > target_epoch,它就能保证该线程在退出或更新 thread_current_epoch 之前的所有操作都已经完成并可见。

这些内存屏障和原子操作共同确保了 RCU 的三个关键属性:

  1. 数据一致性:读者总是看到一个完整且一致的数据快照。
  2. 写后读可见性:一旦写者更新了指针,新读者将看到新数据。
  3. 旧数据回收安全性:只有在所有旧读者都已完成其读操作后,旧数据才会被回收。

四、基于 RCU 的配置系统实现

现在,我们把 RCU 机制应用到配置系统上。

4.1 配置数据结构

// Example configuration data structure
struct ConfigData {
    std::map<std::string, std::string> settings;
    uint64_t version; // To track configuration version

    ConfigData(uint64_t v = 0) : version(v) {}

    // Deep copy constructor for copy-on-write
    ConfigData(const ConfigData& other) : settings(other.settings), version(other.version) {}

    void print() const {
        std::cout << "Config Version: " << version << std::endl;
        for (const auto& pair : settings) {
            std::cout << "  " << pair.first << ": " << pair.second << std::endl;
        }
    }
};

// Smart pointer alias for configuration data managed by RCU
using ConfigPtr = ConfigData*;

ConfigData 是我们的配置负载,它包含一个 std::map 来存储键值对,以及一个 version 字段来追踪配置版本。提供一个深拷贝构造函数是实现“Copy-on-Write”的关键。

4.2 ConfigManager

ConfigManager 是配置系统的核心,它持有指向当前配置的 std::atomic 指针,并提供读写接口。

class ConfigManager {
public:
    ConfigManager() : _current_config(new ConfigData()) {}

    // Read configuration (RCU read-side)
    std::shared_ptr<const ConfigData> get_config() const {
        rcu_read_lock();
        // The pointer itself is atomic, but the data it points to is immutable for readers.
        // We use a custom deleter for shared_ptr to integrate with RCU reclamation.
        ConfigPtr raw_ptr = _current_config.load(std::memory_order_acquire);

        // Create a shared_ptr from the raw pointer with an RCU-aware deleter.
        // The shared_ptr takes ownership, and when its last instance is destructed,
        // RcuDeleter schedules the actual deletion after a grace period via call_rcu.
        struct RcuDeleter {
            void operator()(const ConfigData* ptr) const {
                // When the last shared_ptr goes out of scope, defer deletion via RCU
                call_rcu([ptr]() {
                    delete ptr;
                });
            }
        };
        std::shared_ptr<const ConfigData> config_snapshot(raw_ptr, RcuDeleter());
        rcu_read_unlock(); // Unlock after creating the shared_ptr, which now manages lifetime
        return config_snapshot;
    }

    // Update configuration (RCU write-side)
    void update_config(std::function<void(ConfigData&)> update_func) {
        // 1. Create a new version of the configuration
        ConfigPtr old_config = _current_config.load(std::memory_order_relaxed);
        ConfigData* new_config_data = new ConfigData(*old_config); // Copy-on-write
        new_config_data->version++; // Increment version
        update_func(*new_config_data); // Apply updates

        // 2. Atomically swap the pointer
        _current_config.store(new_config_data, std::memory_order_release);

        // 3. Schedule old_config for reclamation after a grace period.
        // This is a writer deferring, so it goes directly to RcuManager's global queue.
        RcuManager::get_instance().call_rcu_deferred([old_config]() {
            delete old_config;
        });

        // 4. Wait for a grace period. This ensures no reader is still using old_config.
        // This is the blocking part for writers, but readers are never blocked.
        RcuManager::get_instance().synchronize_rcu();
    }

private:
    std::atomic<ConfigPtr> _current_config;
};
  • _current_config: std::atomic<ConfigPtr> 存储指向当前活跃配置数据的指针。这是 RCU 机制的核心共享可变状态。
  • get_config() (读操作):
    1. 调用 rcu_read_lock() 进入 RCU 读区。
    2. 原子地加载 _current_config 指针。由于 RCU 保证了旧数据在宽限期内有效,我们无需担心 raw_ptr 立即失效。
    3. 关键步骤:创建一个 std::shared_ptr,并为其提供一个自定义的 RcuDeleter。当这个 shared_ptr 的引用计数降为 0 时,RcuDeleter 不会立即 delete 原始指针,而是通过 call_rcu 将删除操作延迟到宽限期结束后。这样,shared_ptr 就能在 RCU 读区之外安全地管理配置对象的生命周期。
    4. 调用 rcu_read_unlock() 退出 RCU 读区。
  • update_config() (写操作):
    1. 加载当前活跃的配置指针 old_config
    2. 基于 old_config 创建一个全新的 ConfigData 副本 new_config_data(Copy-on-Write)。
    3. new_config_data 应用所有必要的更新,例如修改设置、递增版本号。
    4. 原子地将 _current_config 指针更新为 new_config_data。这是一个 release 操作,确保所有对 new_config_data 的修改在指针更新前完成。
    5. 调用 RcuManager::get_instance().call_rcu_deferred()old_config 的删除操作延迟到宽限期结束后。
    6. 调用 RcuManager::get_instance().synchronize_rcu()。这是写者唯一的阻塞点,它会等待所有在指针更新前开始读取 old_config 的读者都完成。

4.3 整体流程总结

RCU 配置系统的读写流程如下表所示:

角色 操作 步骤 内存顺序/同步点
读者 get_config() 1. 调用 rcu_read_lock() 进入 RCU 读区。 thread_current_epoch 写入 relaxed
2. 原子加载 _current_config 指针。 _current_config 加载 acquire
3. 基于加载的指针创建 std::shared_ptr,并附带 RCU 感知的自定义删除器。
4. 调用 rcu_read_unlock() 退出 RCU 读区。 thread_current_epoch 写入 release
写者 update_config() 1. 加载 _current_config 指针(old_config)。 _current_config 加载 relaxed
2. 深拷贝 old_confignew_config_data,并进行修改。
3. 原子更新 _current_config 指针为 new_config_data _current_config 存储 release
4. 将 old_config 的删除操作通过 RcuManager::call_rcu_deferred 延迟到宽限期结束后。 _global_reclamation_mutex 保护
5. 调用 RcuManager::synchronize_rcu() 等待宽限期结束:
a. 递增 _global_epoch (target_epoch)。
b. 自旋等待所有注册线程的 thread_current_epoch 变为 0> target_epoch
c. 执行 _global_reclamation_queue 中的回调,释放旧数据。
d. 交换回收队列。
_global_epoch acq_relthread_epoch_ptr 加载 acquire

4.4 示例与测试

// --- Test Code ---

ConfigManager g_config_manager;

// This function needs to be called by each thread that might perform RCU operations.
// It ensures thread_local RcuState is properly initialized and registered.
void initialize_rcu_thread() {
    // Calling rcu_read_lock once will register the thread's RcuState
    // even if it immediately unlocks.
    rcu_read_lock();
    rcu_read_unlock();
}

void reader_thread_func(int id, int num_reads) {
    initialize_rcu_thread(); // Initialize RCU for this thread
    for (int i = 0; i < num_reads; ++i) {
        std::shared_ptr<const ConfigData> config = g_config_manager.get_config();
        // Simulate work with configuration data
        // For demonstration, avoid heavy I/O in reader loop to focus on RCU performance
        // std::cout << "Reader " << id << ": Config version " << config->version << ", Key 'app.name': " << config->settings.at("app.name") << std::endl;
        (void)config->version; // Just access to ensure it's valid
        if (config->settings.count("app.name")) {
            (void)config->settings.at("app.name");
        }
        std::this_thread::sleep_for(std::chrono::microseconds(10)); // Simulate some work
    }
}

void writer_thread_func(int id, int num_writes) {
    initialize_rcu_thread(); // Initialize RCU for this thread
    for (int i = 0; i < num_writes; ++i) {
        g_config_manager.update_config([&](ConfigData& data) {
            data.settings["app.name"] = "MyCoolApp_v" + std::to_string(data.version + 1);
            data.settings["feature.toggle"] = (data.version % 2 == 0) ? "enabled" : "disabled";
            data.settings["writer.id"] = std::to_string(id);
        });
        std::cout << "Writer " << id << ": Updated config to version " << g_config_manager.get_config()->version << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(50)); // Simulate some work
    }
}

int main() {
    std::cout << "Starting RCU-based configuration system simulation." << std::endl;

    const int num_readers = 10000; // 万级并发读
    const int num_writers = 2;
    const int reads_per_reader = 100;
    const int writes_per_writer = 10;

    // Initial configuration
    g_config_manager.update_config([](ConfigData& data){
        data.settings["app.name"] = "InitialApp";
        data.settings["feature.toggle"] = "enabled";
    });
    std::cout << "Initial config version: " << g_config_manager.get_config()->version << std::endl;

    std::vector<std::thread> readers;
    for (int i = 0; i < num_readers; ++i) {
        readers.emplace_back(reader_thread_func, i, reads_per_reader);
    }

    std::vector<std::thread> writers;
    for (int i = 0; i < num_writers; ++i) {
        writers.emplace_back(writer_thread_func, i, writes_per_writer);
    }

    for (auto& t : readers) {
        t.join();
    }
    for (auto& t : writers) {
        t.join();
    }

    std::cout << "Simulation finished. Final config version: " << g_config_manager.get_config()->version << std::endl;
    // Ensure all remaining deferred callbacks are processed
    RcuManager::get_instance().synchronize_rcu();
    RcuManager::get_instance().synchronize_rcu(); // Run twice to catch all double-buffered callbacks
    std::cout << "All deferred reclamations processed." << std::endl;

    return 0;
}

main 函数中,我们创建了 10000 个读者线程和 2 个写者线程。读者线程会频繁地读取配置,而写者线程则周期性地更新配置。通过运行此代码,您会观察到读者线程在几乎没有停顿的情况下持续读取配置,而写者线程在更新时虽然会经历 synchronize_rcu 的阻塞,但不会影响其他读者。

注意:为了确保所有线程都能正确注册 RCU 状态,每个线程在其生命周期中第一次执行 RCU 操作前,都应该调用 initialize_rcu_thread()。这可以确保 thread_local RcuState 被正确地添加到 RcuManager 的跟踪列表中。

五、性能考量与扩展性

RCU 在读性能方面表现卓越,但其写性能和内存占用需要仔细评估:

  • 读性能rcu_read_lock()rcu_read_unlock() 仅涉及轻量级的原子操作和 thread_local 变量访问,开销极低。get_config() 主要开销在于 shared_ptr 的构造和析构。
  • 写性能update_config() 的开销主要来自:
    • 数据复制:`ConfigData

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注