各位同事,各位技术爱好者,大家好!
今天我们将深入探讨一个在高性能并发系统中至关重要的无锁技术——Read-Copy-Update (RCU)。特别地,我们将聚焦于如何在用户态 C++ 中实现 RCU,并将其应用于构建一个支持万级并发读的配置系统。
在现代分布式服务和高并发应用中,配置系统扮演着核心角色。它需要提供极高的读取性能,因为几乎每一个请求处理流程都可能需要访问配置。同时,配置的更新频率相对较低,但更新时必须保证数据一致性,且不能长时间阻塞读取操作。传统的锁机制,如互斥锁(mutex),在读多写少的场景下会成为性能瓶颈,因为即使是读操作也可能需要获取共享锁,从而引入上下文切换和锁竞争开销。
为了解决这个问题,我们需要一种机制,允许读者几乎无障碍地访问数据,而写者则在后台进行更新,并在一个“安全”的时间点回收旧数据。RCU 正是为此而生。
一、RCU 核心思想与适用场景
RCU 的全称是 "Read-Copy-Update",顾名思义,它包含三个核心操作:
- Read (读):读者直接访问共享数据,无需获取锁。这是 RCU 能够实现极高读性能的关键。
- Copy (复制):当需要修改数据时,写者首先复制一份当前数据。
- Update (更新):写者修改复制出来的新数据,然后原子地将共享指针指向新数据。
RCU 的精髓在于,在写者原子更新共享指针后,旧版本的数据并不会立即被销毁。它需要等待一个“宽限期(Grace Period)”,确保所有在更新前开始读取旧数据的读者都已完成读取,不再持有对旧数据的引用。只有在宽限期结束后,旧数据才能安全地回收。
RCU 的优势:
- 极高的读性能:读者几乎无锁(或只进行极轻量的原子操作),只需简单的指针解引用。
- 低写者开销(相对):写者通过复制和原子更新指针完成数据修改,避免了长时间持有锁。
- 适用于读多写少场景:这是 RCU 发挥最大价值的领域,如配置管理、路由表、DNS 缓存等。
RCU 的限制:
- 写者开销:写者需要复制整个数据结构,这可能导致较高的内存和 CPU 开销,尤其对于大型数据结构。
- 内存占用:在宽限期内,旧版本的数据会持续存在,增加了内存占用。
- 复杂性:实现用户态 RCU 需要精心设计宽限期机制和内存回收策略。
- 不适用于通用锁替代:RCU 不解决写写冲突,也不适用于需要频繁修改数据或数据结构拓扑变化的场景。
为什么 RCU 适合配置系统?
配置系统通常具有以下特点:
- 读操作极其频繁:每个服务请求都可能读取配置。
- 写操作相对稀疏:配置更新不频繁,但在更新时需要平滑过渡。
- 数据结构相对稳定:配置数据通常是键值对、树或列表,其结构变化不大。
- 一致性要求高:读者必须看到一致的配置视图。
RCU 完美契合这些需求,它能确保读者总是看到一个完整的、一致的配置快照,而不会被更新操作中断。
二、用户态 RCU 的挑战
RCU 最初是 Linux 内核中的一种同步机制,它利用了内核对进程调度和上下文切换的深刻理解来定义“宽限期”和“静止状态(Quiescent State)”。然而,在用户态实现 RCU,我们面临着独特的挑战:
- 缺乏内核支持:我们无法直接使用
synchronize_rcu()这样的内核 API,也无法依赖内核来定义静止状态。在用户态,我们不能简单地假定一个线程在离开 CPU 后就进入了静止状态。 - 自定义宽限期机制:我们需要在用户态自行实现宽限期检测逻辑。这通常涉及线程注册、线程状态跟踪和全局计数器/ epoch 管理。
- 内存回收管理:内核 RCU 有
call_rcu()这样的机制来在宽限期结束后异步释放内存。在用户态,我们需要实现类似的延迟回收机制。 - 线程生命周期管理:当线程启动或退出时,它必须正确地注册或注销其 RCU 状态,以避免僵尸线程状态影响宽限期检测。
为了应对这些挑战,我们将设计一套用户态 RCU 基础设施,它由一个全局的 RcuManager 和每个线程私有的 RcuState 组成。
三、用户态 RCU 核心设计与实现
我们将采用一种基于 epoch(纪元)的机制来管理 RCU 的宽限期。
3.1 核心组件
1. RcuState (线程本地 RCU 状态)
每个参与 RCU 读操作的线程都将拥有一个 thread_local 的 RcuState 实例。
#include <atomic>
#include <vector>
#include <functional>
#include <mutex> // For global manager's registration/reclamation queues
#include <thread> // For std::this_thread::yield
#include <chrono> // For std::chrono::microseconds
#include <memory> // For std::shared_ptr
#include <map> // For ConfigData
// Forward declaration for RcuManager to use in RcuState
class RcuManager;
// Thread-local RCU state
struct RcuState {
// Stores the epoch when the thread *entered* its RCU read section.
// A value of 0 indicates the thread is not currently in an RCU read section.
std::atomic<uint64_t> thread_current_epoch;
int read_section_depth; // To handle nested rcu_read_lock calls
std::vector<std::function<void()>> thread_local_reclamation_queue; // Callbacks deferred by this thread
RcuState() : thread_current_epoch(0), read_section_depth(0) {}
// Method to register this thread's state with the global manager
void register_with_manager(RcuManager* manager);
};
thread_current_epoch: 这是一个原子变量,用于存储线程进入 RCU 读区时RcuManager的全局 epoch 值。当线程退出 RCU 读区时,它会被设置为 0。synchronize_rcu将等待所有活跃线程的thread_current_epoch值。read_section_depth: 用于支持 RCU 读区的嵌套。只有当深度为 0 时,才真正进入或退出 RCU 读区。thread_local_reclamation_queue: 用于存储由该线程调用call_rcu延迟的内存回收回调。当线程退出最外层 RCU 读区时,这些回调会被转移到全局的回收队列中。
2. RcuManager (全局 RCU 管理器)
RcuManager 是一个单例,负责管理全局 epoch、注册的线程状态以及处理内存回收。
class RcuManager {
public:
static RcuManager& get_instance() {
static RcuManager instance;
return instance;
}
// Register a thread's RcuState. This is typically called once per thread
// when it first enters an RCU read section.
void register_thread_epoch(std::atomic<uint64_t>* thread_epoch_ptr) {
std::lock_guard<std::mutex> lock(_registration_mutex);
// Avoid duplicate registration if a thread re-registers (e.g., if we didn't track it locally)
// For simplicity, we assume one-time registration per thread's lifetime here.
// In a robust system, one might use a std::set or a map with thread_id as key.
_registered_thread_epochs.push_back(thread_epoch_ptr);
}
// Get the current global epoch. Used by readers to mark their entry.
uint64_t get_global_epoch() const {
return _global_epoch.load(std::memory_order_relaxed);
}
// Schedule a callback for execution after a grace period.
// This function is typically called by a writer or by rcu_read_unlock
// to transfer thread-local deferred callbacks.
void call_rcu_deferred(std::function<void()> callback) {
std::lock_guard<std::mutex> lock(_global_reclamation_mutex);
_global_reclamation_queue_next_epoch.push_back(std::move(callback));
}
// Block until all RCU read sections that started before the current
// grace period have completed. Then, reclaim memory.
void synchronize_rcu() {
// 1. Advance the global epoch. This marks the start of a new grace period.
// `target_epoch` will be the epoch value *before* the increment.
// Readers starting *after* this point will see a higher epoch.
uint64_t target_epoch = _global_epoch.fetch_add(1, std::memory_order_acq_rel);
// 2. Wait for all registered threads to pass the 'target_epoch'.
// This means they have either exited their RCU read section (thread_current_epoch == 0)
// or have started a new RCU read section after target_epoch was set.
std::vector<std::atomic<uint64_t>*> current_registered_threads;
{
std::lock_guard<std::mutex> lock(_registration_mutex);
current_registered_threads = _registered_thread_epochs;
}
for (auto thread_epoch_ptr : current_registered_threads) {
// Spin-wait until the thread's epoch marker is either 0 (not in RCU)
// or greater than the target_epoch (started a new RCU section after target_epoch was set).
while (true) {
uint64_t thread_epoch = thread_epoch_ptr->load(std::memory_order_acquire);
if (thread_epoch == 0 || thread_epoch > target_epoch) {
break;
}
// Yield to other threads to avoid busy-waiting too aggressively
std::this_thread::yield();
}
}
// 3. All readers that were active at or before `target_epoch` have now completed or moved on.
// It's safe to execute reclamation callbacks associated with the completed grace period.
{
std::lock_guard<std::mutex> lock(_global_reclamation_mutex);
for (auto& callback : _global_reclamation_queue) {
callback(); // Execute the deferred deletion
}
_global_reclamation_queue.clear();
// Move callbacks from the "next epoch" queue to the current queue.
// These callbacks were scheduled *during* the grace period that just completed,
// so they need to wait for the *next* grace period to be safe.
_global_reclamation_queue.swap(_global_reclamation_queue_next_epoch);
}
}
private:
RcuManager() : _global_epoch(1) {} // Start epoch at 1
~RcuManager() = default;
RcuManager(const RcuManager&) = delete;
RcuManager& operator=(const RcuManager&) = delete;
std::atomic<uint64_t> _global_epoch;
std::vector<std::atomic<uint64_t>*> _registered_thread_epochs;
std::mutex _registration_mutex; // Protects _registered_thread_epochs
// Callbacks for the current grace period (to be processed *after* the current grace period)
std::vector<std::function<void()>> _global_reclamation_queue;
// Callbacks for the next grace period (to be added to _global_reclamation_queue after current is processed)
std::vector<std::function<void()>> _global_reclamation_queue_next_epoch;
std::mutex _global_reclamation_mutex; // Protects reclamation queues
};
_global_epoch: 全局原子计数器,每次synchronize_rcu调用时递增。_registered_thread_epochs: 存储所有已注册线程的thread_current_epoch指针。_registration_mutex保护其访问。_global_reclamation_queue和_global_reclamation_queue_next_epoch: 这是两个回收队列,采用双缓冲机制。_global_reclamation_queue存储需要在当前宽限期结束后回收的旧数据回调,而_global_reclamation_queue_next_epoch存储在当前宽限期进行中时新加入的回收回调,它们将在下一个宽限期结束后才被处理。这确保了足够长的延迟,以覆盖所有可能的读者。synchronize_rcu(): 这是核心函数。它递增_global_epoch,然后遍历所有已注册线程的thread_current_epoch。它会自旋等待,直到所有线程的thread_current_epoch都> target_epoch(表示它们已经看到了新的 epoch 并进入了新的 RCU 读区)或等于0(表示它们已退出 RCU 读区)。一旦所有线程都满足条件,就表示宽限期已过,可以安全地执行_global_reclamation_queue中的回调,然后交换两个回收队列。
3.2 RCU 操作宏/函数
接下来,我们定义用户态 RCU 的基本操作:
// Define thread_local RcuState
thread_local RcuState t_rcu_state;
// Implement RcuState::register_with_manager
void RcuState::register_with_manager(RcuManager* manager) {
manager->register_thread_epoch(&thread_current_epoch);
}
// RCU read-side critical section entry
inline void rcu_read_lock() {
if (t_rcu_state.read_section_depth == 0) {
// First entry into an RCU read section for this thread
// Lazily register this thread's RCU state if not already registered.
// `registered_this_thread` is thread_local, ensuring it's set once per thread.
static thread_local bool registered_this_thread = false;
if (!registered_this_thread) {
t_rcu_state.register_with_manager(&RcuManager::get_instance());
registered_this_thread = true;
}
// Store the current global epoch into the thread's marker.
// This signifies that this thread is now active in an RCU read section
// associated with this epoch.
t_rcu_state.thread_current_epoch.store(
RcuManager::get_instance().get_global_epoch(),
std::memory_order_relaxed // Relaxed is fine here, as synchronization is handled by synchronize_rcu
);
}
t_rcu_state.read_section_depth++;
}
// RCU read-side critical section exit
inline void rcu_read_unlock() {
t_rcu_state.read_section_depth--;
if (t_rcu_state.read_section_depth == 0) {
// Last exit from an RCU read section for this thread
// Set thread_current_epoch to 0 to signal that this thread is no longer
// in an RCU read section. This is a release operation to ensure prior
// memory accesses are visible.
t_rcu_state.thread_current_epoch.store(0, std::memory_order_release);
// Move thread-local deferred callbacks to the global queue for the *next* grace period.
// This ensures they are processed with a sufficient delay.
if (!t_rcu_state.thread_local_reclamation_queue.empty()) {
RcuManager::get_instance().call_rcu_deferred_internal(
std::make_move_iterator(t_rcu_state.thread_local_reclamation_queue.begin()),
std::make_move_iterator(t_rcu_state.thread_local_reclamation_queue.end())
);
t_rcu_state.thread_local_reclamation_queue.clear();
}
}
}
// Defer a reclamation callback
inline void call_rcu(std::function<void()> callback) {
t_rcu_state.thread_local_reclamation_queue.push_back(std::move(callback));
}
rcu_read_lock():- 如果这是该线程第一次进入 RCU 读区(
read_section_depth == 0),它会确保线程的RcuState已注册到RcuManager。 - 然后,它将
_global_epoch的当前值存储到t_rcu_state.thread_current_epoch中,表示该线程正在读取此 epoch 的数据。 read_section_depth递增。
- 如果这是该线程第一次进入 RCU 读区(
rcu_read_unlock():read_section_depth递减。- 如果深度变为 0,表示线程完全离开了 RCU 读区。此时,
t_rcu_state.thread_current_epoch被设置为 0(一个release操作,确保所有读操作在退出前完成),表示线程不再活跃。 - 同时,将线程本地的回收队列中的回调移动到
RcuManager的_global_reclamation_queue_next_epoch中。
call_rcu(): 允许任何线程(通常是读者在 RCU 读区内,或写者在更新操作中)将一个回调函数添加到其线程本地的回收队列中。
RcuManager::call_rcu_deferred_internal
为了批量移动 std::vector 的内容到另一个 std::vector,需要一个内部辅助函数:
// Add to RcuManager class:
// Internal helper to add multiple callbacks efficiently
void call_rcu_deferred_internal(std::vector<std::function<void()>>::iterator begin,
std::vector<std::function<void()>>::iterator end) {
std::lock_guard<std::mutex> lock(_global_reclamation_mutex);
_global_reclamation_queue_next_epoch.insert(_global_reclamation_queue_next_epoch.end(),
std::make_move_iterator(begin),
std::make_move_iterator(end));
}
3.3 内存顺序与原子性
RCU 的正确性高度依赖于 C++11 引入的内存模型和原子操作。
_current_config.load(std::memory_order_acquire)(读者):acquire语义确保在加载_current_config指针之后的所有内存访问,都不能被重排序到加载操作之前。- 这意味着读者在获取到新指针后,能够看到写者在更新指针之前对新数据结构的所有修改。
_current_config.store(new_config_data, std::memory_order_release)(写者):release语义确保在存储_current_config指针之前的所有内存访问(即对new_config_data的修改),都不能被重排序到存储操作之后。- 这意味着当读者通过
acquire语义看到新指针时,它也能看到写者在存储新指针之前所做的所有修改。
_global_epoch.fetch_add(1, std::memory_order_acq_rel)(synchronize_rcu):acquire-release语义确保_global_epoch的递增操作本身是原子且有序的。- 它在
fetch_add之前对所有操作起到release作用,在fetch_add之后对所有操作起到acquire作用,从而建立了一个全局的同步点。
thread_epoch_ptr->load(std::memory_order_acquire)(synchronize_rcu等待):- 当
synchronize_rcu读取线程的thread_current_epoch时,使用acquire语义。这与rcu_read_unlock中将thread_current_epoch设置为 0 的release语义形成同步对。 - 这意味着一旦
synchronize_rcu看到线程的thread_current_epoch为 0 或> target_epoch,它就能保证该线程在退出或更新thread_current_epoch之前的所有操作都已经完成并可见。
- 当
这些内存屏障和原子操作共同确保了 RCU 的三个关键属性:
- 数据一致性:读者总是看到一个完整且一致的数据快照。
- 写后读可见性:一旦写者更新了指针,新读者将看到新数据。
- 旧数据回收安全性:只有在所有旧读者都已完成其读操作后,旧数据才会被回收。
四、基于 RCU 的配置系统实现
现在,我们把 RCU 机制应用到配置系统上。
4.1 配置数据结构
// Example configuration data structure
struct ConfigData {
std::map<std::string, std::string> settings;
uint64_t version; // To track configuration version
ConfigData(uint64_t v = 0) : version(v) {}
// Deep copy constructor for copy-on-write
ConfigData(const ConfigData& other) : settings(other.settings), version(other.version) {}
void print() const {
std::cout << "Config Version: " << version << std::endl;
for (const auto& pair : settings) {
std::cout << " " << pair.first << ": " << pair.second << std::endl;
}
}
};
// Smart pointer alias for configuration data managed by RCU
using ConfigPtr = ConfigData*;
ConfigData 是我们的配置负载,它包含一个 std::map 来存储键值对,以及一个 version 字段来追踪配置版本。提供一个深拷贝构造函数是实现“Copy-on-Write”的关键。
4.2 ConfigManager 类
ConfigManager 是配置系统的核心,它持有指向当前配置的 std::atomic 指针,并提供读写接口。
class ConfigManager {
public:
ConfigManager() : _current_config(new ConfigData()) {}
// Read configuration (RCU read-side)
std::shared_ptr<const ConfigData> get_config() const {
rcu_read_lock();
// The pointer itself is atomic, but the data it points to is immutable for readers.
// We use a custom deleter for shared_ptr to integrate with RCU reclamation.
ConfigPtr raw_ptr = _current_config.load(std::memory_order_acquire);
// Create a shared_ptr from the raw pointer with an RCU-aware deleter.
// The shared_ptr takes ownership, and when its last instance is destructed,
// RcuDeleter schedules the actual deletion after a grace period via call_rcu.
struct RcuDeleter {
void operator()(const ConfigData* ptr) const {
// When the last shared_ptr goes out of scope, defer deletion via RCU
call_rcu([ptr]() {
delete ptr;
});
}
};
std::shared_ptr<const ConfigData> config_snapshot(raw_ptr, RcuDeleter());
rcu_read_unlock(); // Unlock after creating the shared_ptr, which now manages lifetime
return config_snapshot;
}
// Update configuration (RCU write-side)
void update_config(std::function<void(ConfigData&)> update_func) {
// 1. Create a new version of the configuration
ConfigPtr old_config = _current_config.load(std::memory_order_relaxed);
ConfigData* new_config_data = new ConfigData(*old_config); // Copy-on-write
new_config_data->version++; // Increment version
update_func(*new_config_data); // Apply updates
// 2. Atomically swap the pointer
_current_config.store(new_config_data, std::memory_order_release);
// 3. Schedule old_config for reclamation after a grace period.
// This is a writer deferring, so it goes directly to RcuManager's global queue.
RcuManager::get_instance().call_rcu_deferred([old_config]() {
delete old_config;
});
// 4. Wait for a grace period. This ensures no reader is still using old_config.
// This is the blocking part for writers, but readers are never blocked.
RcuManager::get_instance().synchronize_rcu();
}
private:
std::atomic<ConfigPtr> _current_config;
};
_current_config:std::atomic<ConfigPtr>存储指向当前活跃配置数据的指针。这是 RCU 机制的核心共享可变状态。get_config()(读操作):- 调用
rcu_read_lock()进入 RCU 读区。 - 原子地加载
_current_config指针。由于 RCU 保证了旧数据在宽限期内有效,我们无需担心raw_ptr立即失效。 - 关键步骤:创建一个
std::shared_ptr,并为其提供一个自定义的RcuDeleter。当这个shared_ptr的引用计数降为 0 时,RcuDeleter不会立即delete原始指针,而是通过call_rcu将删除操作延迟到宽限期结束后。这样,shared_ptr就能在 RCU 读区之外安全地管理配置对象的生命周期。 - 调用
rcu_read_unlock()退出 RCU 读区。
- 调用
update_config()(写操作):- 加载当前活跃的配置指针
old_config。 - 基于
old_config创建一个全新的ConfigData副本new_config_data(Copy-on-Write)。 - 对
new_config_data应用所有必要的更新,例如修改设置、递增版本号。 - 原子地将
_current_config指针更新为new_config_data。这是一个release操作,确保所有对new_config_data的修改在指针更新前完成。 - 调用
RcuManager::get_instance().call_rcu_deferred()将old_config的删除操作延迟到宽限期结束后。 - 调用
RcuManager::get_instance().synchronize_rcu()。这是写者唯一的阻塞点,它会等待所有在指针更新前开始读取old_config的读者都完成。
- 加载当前活跃的配置指针
4.3 整体流程总结
RCU 配置系统的读写流程如下表所示:
| 角色 | 操作 | 步骤 | 内存顺序/同步点 |
|---|---|---|---|
| 读者 | get_config() |
1. 调用 rcu_read_lock() 进入 RCU 读区。 |
thread_current_epoch 写入 relaxed |
2. 原子加载 _current_config 指针。 |
_current_config 加载 acquire |
||
3. 基于加载的指针创建 std::shared_ptr,并附带 RCU 感知的自定义删除器。 |
无 | ||
4. 调用 rcu_read_unlock() 退出 RCU 读区。 |
thread_current_epoch 写入 release |
||
| 写者 | update_config() |
1. 加载 _current_config 指针(old_config)。 |
_current_config 加载 relaxed |
2. 深拷贝 old_config 到 new_config_data,并进行修改。 |
无 | ||
3. 原子更新 _current_config 指针为 new_config_data。 |
_current_config 存储 release |
||
4. 将 old_config 的删除操作通过 RcuManager::call_rcu_deferred 延迟到宽限期结束后。 |
_global_reclamation_mutex 保护 |
||
5. 调用 RcuManager::synchronize_rcu() 等待宽限期结束:a. 递增 _global_epoch (target_epoch)。b. 自旋等待所有注册线程的 thread_current_epoch 变为 0 或 > target_epoch。c. 执行 _global_reclamation_queue 中的回调,释放旧数据。d. 交换回收队列。 |
_global_epoch acq_rel;thread_epoch_ptr 加载 acquire |
4.4 示例与测试
// --- Test Code ---
ConfigManager g_config_manager;
// This function needs to be called by each thread that might perform RCU operations.
// It ensures thread_local RcuState is properly initialized and registered.
void initialize_rcu_thread() {
// Calling rcu_read_lock once will register the thread's RcuState
// even if it immediately unlocks.
rcu_read_lock();
rcu_read_unlock();
}
void reader_thread_func(int id, int num_reads) {
initialize_rcu_thread(); // Initialize RCU for this thread
for (int i = 0; i < num_reads; ++i) {
std::shared_ptr<const ConfigData> config = g_config_manager.get_config();
// Simulate work with configuration data
// For demonstration, avoid heavy I/O in reader loop to focus on RCU performance
// std::cout << "Reader " << id << ": Config version " << config->version << ", Key 'app.name': " << config->settings.at("app.name") << std::endl;
(void)config->version; // Just access to ensure it's valid
if (config->settings.count("app.name")) {
(void)config->settings.at("app.name");
}
std::this_thread::sleep_for(std::chrono::microseconds(10)); // Simulate some work
}
}
void writer_thread_func(int id, int num_writes) {
initialize_rcu_thread(); // Initialize RCU for this thread
for (int i = 0; i < num_writes; ++i) {
g_config_manager.update_config([&](ConfigData& data) {
data.settings["app.name"] = "MyCoolApp_v" + std::to_string(data.version + 1);
data.settings["feature.toggle"] = (data.version % 2 == 0) ? "enabled" : "disabled";
data.settings["writer.id"] = std::to_string(id);
});
std::cout << "Writer " << id << ": Updated config to version " << g_config_manager.get_config()->version << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(50)); // Simulate some work
}
}
int main() {
std::cout << "Starting RCU-based configuration system simulation." << std::endl;
const int num_readers = 10000; // 万级并发读
const int num_writers = 2;
const int reads_per_reader = 100;
const int writes_per_writer = 10;
// Initial configuration
g_config_manager.update_config([](ConfigData& data){
data.settings["app.name"] = "InitialApp";
data.settings["feature.toggle"] = "enabled";
});
std::cout << "Initial config version: " << g_config_manager.get_config()->version << std::endl;
std::vector<std::thread> readers;
for (int i = 0; i < num_readers; ++i) {
readers.emplace_back(reader_thread_func, i, reads_per_reader);
}
std::vector<std::thread> writers;
for (int i = 0; i < num_writers; ++i) {
writers.emplace_back(writer_thread_func, i, writes_per_writer);
}
for (auto& t : readers) {
t.join();
}
for (auto& t : writers) {
t.join();
}
std::cout << "Simulation finished. Final config version: " << g_config_manager.get_config()->version << std::endl;
// Ensure all remaining deferred callbacks are processed
RcuManager::get_instance().synchronize_rcu();
RcuManager::get_instance().synchronize_rcu(); // Run twice to catch all double-buffered callbacks
std::cout << "All deferred reclamations processed." << std::endl;
return 0;
}
在 main 函数中,我们创建了 10000 个读者线程和 2 个写者线程。读者线程会频繁地读取配置,而写者线程则周期性地更新配置。通过运行此代码,您会观察到读者线程在几乎没有停顿的情况下持续读取配置,而写者线程在更新时虽然会经历 synchronize_rcu 的阻塞,但不会影响其他读者。
注意:为了确保所有线程都能正确注册 RCU 状态,每个线程在其生命周期中第一次执行 RCU 操作前,都应该调用 initialize_rcu_thread()。这可以确保 thread_local RcuState 被正确地添加到 RcuManager 的跟踪列表中。
五、性能考量与扩展性
RCU 在读性能方面表现卓越,但其写性能和内存占用需要仔细评估:
- 读性能:
rcu_read_lock()和rcu_read_unlock()仅涉及轻量级的原子操作和thread_local变量访问,开销极低。get_config()主要开销在于shared_ptr的构造和析构。 - 写性能:
update_config()的开销主要来自:- 数据复制:`ConfigData