各位编程领域的同仁们,大家好!
今天,我们将深入探讨一个在高性能并发编程中至关重要且极具挑战性的话题:无锁内存回收(Lock-free Memory Reclamation)。在构建高度可伸缩的并发数据结构时,我们常常追求无锁(lock-free)或无等待(wait-free)的算法,以避免传统锁机制带来的上下文切换、死锁和优先级反转等问题。然而,一旦我们摆脱了锁,就必须面对一个棘手的问题:当一个线程从数据结构中移除一个节点并打算释放它时,如何确保没有其他线程仍然持有该节点的指针,并且可能在访问它?这就是“使用后释放”(use-after-free)错误,它是并发编程中的一个经典难题。
传统的 delete 或 free 操作是同步的、立即的。但在无锁环境中,一个线程可能已经将一个节点从链表中摘除,但在它完成 delete 之前,另一个线程可能正在遍历链表,并且仍然持有对该节点的引用。如果此时 delete 发生,那么正在遍历的线程就会访问到一块已经被释放的内存,这会导致程序崩溃、数据损坏或更难以追踪的未定义行为。
为了解决这个问题,我们不能直接释放内存。我们需要一种机制来“延迟”内存的释放,直到所有可能持有该内存引用的线程都已不再使用它。这正是无锁内存回收技术所要解决的核心问题。今天,我们将重点对比两种最常用且有效的无锁内存回收策略:危险指针(Hazard Pointers) 和 基于纪元回收(Epoch-based Reclamation, EBR)。
1. 无锁编程的挑战与内存回收的必要性
在深入具体技术之前,我们先来回顾一下无锁编程的基本理念和它带来的挑战。无锁编程的目标是设计并发算法,使得即使在多个线程同时访问共享数据时,也能保证程序正确性,并且至少有一个线程能持续地向前推进,而不会因为等待锁而阻塞。这通常通过原子操作(Atomic Operations)和内存屏障(Memory Barriers/Fences)来实现。
例如,一个简单的无锁栈的 pop 操作可能如下:
template <typename T>
class LockFreeStack {
private:
struct Node {
T data;
Node* next;
Node(const T& d) : data(d), next(nullptr) {}
};
std::atomic<Node*> head;
public:
LockFreeStack() : head(nullptr) {}
void push(const T& value) {
Node* new_node = new Node(value);
Node* old_head;
do {
old_head = head.load(std::memory_order_relaxed);
new_node->next = old_head;
} while (!head.compare_exchange_weak(old_head, new_node,
std::memory_order_release,
std::memory_order_relaxed));
}
// 假设的 pop 操作,尚未考虑内存回收
T pop_unsafe() {
Node* old_head;
Node* new_head;
do {
old_head = head.load(std::memory_order_acquire);
if (!old_head) {
// Stack is empty
throw std::runtime_error("Stack is empty");
}
new_head = old_head->next;
} while (!head.compare_exchange_weak(old_head, new_head,
std::memory_order_release,
std::memory_order_relaxed));
T result = old_head->data; // Danger! old_head might be freed by another pop_unsafe() if multiple pops happen concurrently
// delete old_head; // This is the problematic part!
return result;
}
};
在上述 pop_unsafe 函数中,如果线程 A 成功地通过 compare_exchange_weak 将 head 更新为 new_head,那么 old_head 指向的节点理论上就从栈中移除了。此时,线程 A 想要 delete old_head;。
但是,考虑以下场景:
- 线程 A 读到
head为N1。 - 线程 B 读到
head为N1。 - 线程 A 成功地将
head更新为N2(N1->next)。 - 线程 A 继续执行
T result = old_head->data;,此时old_head仍然是N1。 - 在线程 A 访问
N1->data之前,另一个线程 C 也调用pop_unsafe,并且它成功地pop掉了N1,然后delete N1;。 - 线程 A 现在尝试访问
old_head->data(即N1->data),但N1已经被释放!这会导致经典的“使用后释放”错误。
这个问题被称为“ABA 问题”的一个变种,但更直接的危险是“使用后释放”。解决这类问题的核心思路就是:一个节点一旦从数据结构中逻辑移除,不能立即物理释放。它必须进入一个“待回收”状态,直到系统确认所有可能持有其引用的线程都已不再使用它。
2. 危险指针(Hazard Pointers)
危险指针是一种由 Maged M. Michael 在 2004 年提出的内存回收技术。其核心思想是:每个活跃的读线程都维护一个或多个特殊的指针,称为“危险指针”,用来指向它当前正在访问的那些可能被其他线程删除的共享数据结构节点。当一个线程想要删除一个节点时,它首先检查所有其他线程的危险指针。如果某个节点的地址出现在任何一个危险指针中,那么该节点就不能被立即删除,必须等待。
2.1 机制详解
- 线程本地危险指针数组: 每个参与无锁数据结构操作的线程,都拥有一个固定大小的危险指针数组(通常是 1-3 个指针)。这些数组是共享的,其他线程可以读取它们。
- 发布危险指针: 当一个读线程要访问一个从共享数据结构中读取到的指针
P时,它会先将P的地址写入自己的一个危险指针槽位中。这个写入必须是原子操作,并且确保写入对其他线程可见(通常通过std::atomic_store配合memory_order_release)。 - 验证指针有效性: 在发布危险指针后,读线程必须重新读取共享数据结构中指向
P的那个指针。如果重新读取的值与P不一致(这意味着P已经被其他线程移除了),那么读线程必须放弃当前的P,重新开始操作。这个步骤是防止 ABA 问题的关键。如果一致,则P是安全的,可以被当前线程安全地访问。 - 退休节点: 当一个线程从数据结构中逻辑上移除了一个节点
N时,它不能立即释放N。相反,它将N添加到一个线程本地的“待回收列表”(retired list)中。 - 回收(Reclamation): 周期性地(例如,当待回收列表达到一定大小时,或者在某个固定时间间隔后),某个线程(可以是任何线程,也可以是专门的回收器线程)会触发回收过程。
- 它会收集当前所有活跃线程的危险指针数组中的所有指针,形成一个全局的“危险指针集合”。
- 然后,它遍历自己线程的待回收列表中的每个节点。如果一个节点
M的地址不在全局危险指针集合中,那么就说明没有其他线程正在访问M,此时M可以被安全地释放。 - 如果
M的地址在危险指针集合中,它必须继续留在待回收列表中,等待下一次回收尝试。
2.2 算法步骤
假设我们有一个全局的 HazardPointerManager 管理所有线程的危险指针。
读者线程 T_read 的操作流程:
- 获取一个危险指针槽位
hp_slot(例如,hp_slot = hazard_pointers[thread_id][0])。 - 循环:
a.ptr = shared_ptr.load(std::memory_order_acquire);// 从共享数据结构中读取指针
b.hp_slot.store(ptr, std::memory_order_release);// 发布 ptr 到危险指针
c.std::atomic_thread_fence(std::memory_order_seq_cst);// 内存屏障,确保发布在重读之前完成
d.if (shared_ptr.load(std::memory_order_acquire) != ptr)// 重新检查共享指针是否仍指向 ptr
continue;// 如果不一致,说明 ptr 可能已被修改,重试
e.break;// 否则,ptr 是安全的,可以访问 - 使用
ptr进行操作。 - 操作完成后,
hp_slot.store(nullptr, std::memory_order_release);// 清除危险指针
写入线程 T_write (或回收线程) 的操作流程:
- 当
T_write从数据结构中逻辑移除节点N后,将其添加到线程本地的retired_list。
retired_list.push_back(N); - 当
retired_list达到某个阈值或周期性地:
a. 收集所有活跃线程的危险指针,构建一个hazard_set(例如,std::unordered_set<void*>)。
for each thread_id:
for each hp_slot in hazard_pointers[thread_id]:
hazard_set.insert(hp_slot.load(std::memory_order_acquire));
b. 创建新的new_retired_list。
c.for each node M in retired_list:
if (hazard_set.find(M) == hazard_set.end()):// 如果 M 不在危险指针集合中
delete M;// 安全释放
else:
new_retired_list.push_back(M);// 否则,保留待下次回收
d.retired_list = new_retired_list;
2.3 代码示例 (简化版 C++)
为了演示核心机制,我们省略了线程管理和复杂的内存管理,只关注危险指针的发布和回收。
#include <atomic>
#include <vector>
#include <thread>
#include <set>
#include <iostream>
#include <stdexcept> // For std::runtime_error
#include <map> // For thread-local hazard pointers and retired lists
// --- Hazard Pointer Global Management ---
class HazardPointerManager {
public:
static constexpr int MAX_HAZARD_POINTERS_PER_THREAD = 2; // Typically small
static constexpr int MAX_THREADS = 16; // Max number of threads we expect
// Global array of hazard pointers for all threads
// Each thread gets MAX_HAZARD_POINTERS_PER_THREAD slots
std::atomic<void*> hazard_pointers[MAX_THREADS][MAX_HAZARD_POINTERS_PER_THREAD];
std::atomic<int> thread_counter; // To assign unique thread IDs
HazardPointerManager() : thread_counter(0) {
for (int i = 0; i < MAX_THREADS; ++i) {
for (int j = 0; j < MAX_HAZARD_POINTERS_PER_THREAD; ++j) {
hazard_pointers[i][j].store(nullptr, std::memory_order_relaxed);
}
}
}
// Register a thread and get its ID
int register_thread() {
int id = thread_counter.fetch_add(1, std::memory_order_relaxed);
if (id >= MAX_THREADS) {
throw std::runtime_error("Too many threads registered for HazardPointerManager");
}
return id;
}
// Get a specific hazard pointer slot for a thread
std::atomic<void*>& get_hazard_pointer_slot(int thread_id, int slot_idx) {
if (thread_id < 0 || thread_id >= MAX_THREADS || slot_idx < 0 || slot_idx >= MAX_HAZARD_POINTERS_PER_THREAD) {
throw std::out_of_range("Invalid thread_id or slot_idx for hazard pointer");
}
return hazard_pointers[thread_id][slot_idx];
}
// Collect all active hazard pointers
std::set<void*> collect_all_hazard_pointers() {
std::set<void*> active_hazards;
for (int i = 0; i < thread_counter.load(std::memory_order_acquire); ++i) {
for (int j = 0; j < MAX_HAZARD_POINTERS_PER_THREAD; ++j) {
void* ptr = hazard_pointers[i][j].load(std::memory_order_acquire);
if (ptr) {
active_hazards.insert(ptr);
}
}
}
return active_hazards;
}
};
// Global instance of the manager
HazardPointerManager g_hp_manager;
// Thread-local data for managing retired nodes
thread_local int tl_thread_id = -1; // Assigned on first use
thread_local std::vector<void*> tl_retired_nodes;
// Helper to get/assign thread ID
int get_or_assign_thread_id() {
if (tl_thread_id == -1) {
tl_thread_id = g_hp_manager.register_thread();
}
return tl_thread_id;
}
// Retire a node (add to thread-local retired list)
void retire_node(void* node) {
tl_retired_nodes.push_back(node);
}
// Scan and reclaim retired nodes
void scan_and_reclaim() {
if (tl_retired_nodes.empty()) {
return;
}
std::set<void*> active_hazards = g_hp_manager.collect_all_hazard_pointers();
std::vector<void*> new_retired_nodes;
for (void* node : tl_retired_nodes) {
if (active_hazards.find(node) == active_hazards.end()) {
// No one is pointing to this node, safe to delete
delete static_cast<char*>(node); // Assuming generic deletion, adjust for specific types
} else {
new_retired_nodes.push_back(node);
}
}
tl_retired_nodes = new_retired_nodes;
}
// --- Example Lock-Free Stack using Hazard Pointers ---
template <typename T>
class LockFreeStackHP {
private:
struct Node {
T data;
Node* next;
Node(const T& d) : data(d), next(nullptr) {}
};
std::atomic<Node*> head;
public:
LockFreeStackHP() : head(nullptr) {}
~LockFreeStackHP() {
// Clean up remaining nodes (simplified, not fully robust for concurrent shutdown)
Node* current = head.load();
while (current) {
Node* next = current->next;
delete current;
current = next;
}
}
void push(const T& value) {
Node* new_node = new Node(value);
Node* old_head;
do {
old_head = head.load(std::memory_order_relaxed);
new_node->next = old_head;
} while (!head.compare_exchange_weak(old_head, new_node,
std::memory_order_release,
std::memory_order_relaxed));
}
bool pop(T& result) {
int thread_id = get_or_assign_thread_id();
std::atomic<void*>& hp_slot = g_hp_manager.get_hazard_pointer_slot(thread_id, 0); // Use slot 0
Node* old_head;
Node* next_node;
do {
old_head = head.load(std::memory_order_acquire);
if (!old_head) {
hp_slot.store(nullptr, std::memory_order_release); // Clear hazard pointer
return false; // Stack is empty
}
hp_slot.store(old_head, std::memory_order_release); // Publish old_head as hazard
std::atomic_thread_fence(std::memory_order_seq_cst); // Ensure hazard is visible before re-reading head
// Re-read head to check if it changed (ABA problem mitigation)
if (head.load(std::memory_order_acquire) != old_head) {
continue; // head changed, retry
}
next_node = old_head->next;
} while (!head.compare_exchange_weak(old_head, next_node,
std::memory_order_release,
std::memory_order_relaxed));
result = old_head->data; // Now it's safe to access old_head->data
hp_slot.store(nullptr, std::memory_order_release); // Clear hazard pointer
retire_node(old_head); // Retire the node instead of deleting immediately
// Trigger reclamation periodically (e.g., after every N pops, or when retired_nodes list is large)
// For simplicity, let's trigger it every 10 pops or if list size > 100
if (tl_retired_nodes.size() > 100) { // Arbitrary threshold
scan_and_reclaim();
}
return true;
}
};
// Example usage in main function (not part of the lecture, but for completeness)
/*
void hp_test_thread(LockFreeStackHP<int>& stack, int num_ops) {
for (int i = 0; i < num_ops; ++i) {
stack.push(i);
int val;
if (stack.pop(val)) {
// std::cout << "Popped: " << val << std::endl;
}
}
// Ensure any remaining retired nodes are attempted for reclamation
scan_and_reclaim();
}
int main() {
LockFreeStackHP<int> stack;
std::vector<std::thread> threads;
int num_threads = 4;
int ops_per_thread = 10000;
for (int i = 0; i < num_threads; ++i) {
threads.emplace_back(hp_test_thread, std::ref(stack), ops_per_thread);
}
for (auto& t : threads) {
t.join();
}
std::cout << "Hazard Pointer stack test finished." << std::endl;
// Final scan to clean up any remaining retired nodes
scan_and_reclaim();
return 0;
}
*/
2.4 优缺点分析
优点:
- 细粒度保护: 危险指针只保护那些当前被线程实际引用的节点。一旦线程不再引用,节点就可以被回收。
- 可以处理任意数据结构: 只要你能识别出可能被回收的指针,就可以使用危险指针。
- 易于理解的核心思想: 概念上相对直观,即“我正在看这个,别动它”。
- 每个线程的危险指针数量固定且通常很小: 这限制了每个线程的内存开销。
缺点:
- 性能开销:
- 发布/清除开销: 每次读取共享指针并打算使用时,线程都需要执行原子写操作来发布危险指针,并在使用完毕后清除它。这可能涉及多次内存屏障,导致显著的 CPU 开销和缓存失效。
- 重读验证开销: 为了防止 ABA 问题,需要发布后立即重读共享指针并比较,这增加了额外的读取操作。
- 回收开销: 回收器需要收集所有活跃线程的所有危险指针,并构建一个
std::set(或哈希表)来快速查找。这个过程是 O(N * M),其中 N 是线程数,M 是每个线程的危险指针数。随着线程数增加,这个全局扫描开销会显著增加。
- 内存占用: 待回收列表可能会累积大量节点,直到回收器运行。如果回收频率不够高或线程长时间持有危险指针,内存占用可能很高。
- 扩展性挑战: 全局扫描危险指针的开销随着线程数的增加而线性增长,这在超大规模并发场景下可能成为瓶颈。
- 复杂性: 管理线程 ID、危险指针槽位以及何时触发回收等机制,增加了实现的复杂性。
3. 基于纪元回收(Epoch-based Reclamation, EBR)
基于纪元回收(EBR)是一种更粗粒度的内存回收策略,由 Dave Dice 和 Mark Moir 在 2005 年提出。它不跟踪单个指针,而是跟踪一个全局的“纪元”(epoch)和每个线程当前的“活动纪元”。一个节点只有在它被“退休”的纪元足够老,并且所有活跃线程都已从那个老纪元前进时,才能被安全释放。
3.1 机制详解
- 全局纪元计数器: 系统维护一个全局的原子计数器,表示当前的纪元(例如
std::atomic<long> global_epoch)。这个计数器会周期性地递增。 - 线程本地纪元注册: 每个参与无锁操作的线程,在进入其“关键区”(critical section)时,会读取当前的
global_epoch并将其注册到自己的线程本地槽位(例如thread_epochs[thread_id])中。这个注册操作通常是原子写入。当线程离开关键区时,它会清除自己的注册(例如写入 0 或 -1)。 - 退休节点: 当一个线程从数据结构中逻辑上移除了一个节点
N时,它会将N添加到一个线程本地的“待回收列表”中,并记录下N被退休时的global_epoch。 - 回收(Reclamation): 周期性地,某个线程(或专用回收器)会触发回收过程:
- 它首先原子地递增
global_epoch。 - 然后,它会检查所有活跃线程的
thread_epochs注册。目标是确定是否存在一个“安全纪元”(quiescent epoch),例如global_epoch - K(K 通常为 2 或 3)。 - 一个线程被认为是“安静的”(quiescent)相对于
global_epoch - K,如果它的thread_epochs槽位是 0(表示它不在关键区),或者它的thread_epochs槽位大于等于global_epoch - K。 - 如果所有活跃线程都对
global_epoch - K保持安静,那么所有在global_epoch - K或更早纪元退休的节点都可以被安全释放。 - 核心思想: 如果一个节点在纪元
E被退休,并且现在所有线程都已进入纪元E+K或更晚的纪元(或已离开关键区),那么可以断定没有线程会再持有对该节点的引用。这个K值(通常为 2 或 3)是为了确保足够的内存可见性和同步延迟。例如,K=2意味着一个节点在E退休,如果当前纪元是E+2且所有线程都报告其纪元>= E+2或不在关键区,那么E退休的节点是安全的。
- 它首先原子地递增
3.2 算法步骤
假设我们有一个全局的 EpochManager 管理所有线程的纪元。
全局状态:
std::atomic<long> global_epoch;std::atomic<long> thread_epoch_registry[MAX_THREADS];// 每个线程注册其当前纪元
读者线程 T_read 的操作流程:
-
int thread_id = get_or_assign_thread_id(); -
long current_global_epoch = global_epoch.load(std::memory_order_acquire); -
thread_epoch_registry[thread_id].store(current_global_epoch, std::memory_order_release);// 注册当前纪元 -
std::atomic_thread_fence(std::memory_order_seq_cst);// 内存屏障,确保注册在访问共享数据之前完成 -
访问共享数据结构: 进行读操作,例如
Node* ptr = shared_ptr.load();- 注意:这里不需要像危险指针那样为每个
ptr发布/清除。
- 注意:这里不需要像危险指针那样为每个
-
thread_epoch_registry[thread_id].store(0, std::memory_order_release);// 离开关键区,清除注册(0表示不在关键区)
写入线程 T_write (或回收线程) 的操作流程:
- 当
T_write从数据结构中逻辑移除节点N后,将其添加到线程本地的retired_list,并记录当前纪元。
retired_list.push_back({N, global_epoch.load(std::memory_order_relaxed)}); -
当
retired_list达到某个阈值或周期性地:
a.long current_epoch = global_epoch.fetch_add(1, std::memory_order_acq_rel);// 递增全局纪元
b.long safe_to_reclaim_epoch = current_epoch - 2;// 例如,K=2c.
bool all_quiescent = true;
d.for each thread_id:
long registered_epoch = thread_epoch_registry[thread_id].load(std::memory_order_acquire);
if (registered_epoch > 0 && registered_epoch < safe_to_reclaim_epoch):
all_quiescent = false;
break;// 有线程仍停留在太老的纪元
e.if (all_quiescent):
std::vector<void*> new_retired_nodes;
for each item {node, retired_at_epoch} in retired_list:
if (retired_at_epoch <= safe_to_reclaim_epoch):
delete node;// 安全释放
else:
new_retired_nodes.push_back({node, retired_at_epoch});
retired_list = new_retired_nodes;
3.3 代码示例 (简化版 C++)
#include <atomic>
#include <vector>
#include <thread>
#include <iostream>
#include <stdexcept>
#include <map>
// --- Epoch Global Management ---
class EpochManager {
public:
static constexpr int MAX_THREADS = 16;
static constexpr long EPOCH_QUIESCENT_GAP = 2; // K value for reclamation safety
std::atomic<long> global_epoch;
std::atomic<long> thread_epoch_registry[MAX_THREADS];
std::atomic<int> thread_counter;
EpochManager() : global_epoch(0), thread_counter(0) {
for (int i = 0; i < MAX_THREADS; ++i) {
thread_epoch_registry[i].store(0, std::memory_order_relaxed); // 0 means not in critical section
}
}
int register_thread() {
int id = thread_counter.fetch_add(1, std::memory_order_relaxed);
if (id >= MAX_THREADS) {
throw std::runtime_error("Too many threads registered for EpochManager");
}
return id;
}
// Enter critical section: register current epoch
void enter_epoch(int thread_id) {
long current_epoch = global_epoch.load(std::memory_order_acquire);
thread_epoch_registry[thread_id].store(current_epoch, std::memory_order_release);
// A fence might be needed here to ensure all reads *after* this point
// are ordered after the epoch registration.
// std::atomic_thread_fence(std::memory_order_seq_cst);
}
// Exit critical section: clear epoch registration
void exit_epoch(int thread_id) {
thread_epoch_registry[thread_id].store(0, std::memory_order_release);
}
// For reclamation: checks if all threads are quiescent relative to a target_epoch
bool are_all_quiescent_to(long target_epoch) {
for (int i = 0; i < thread_counter.load(std::memory_order_acquire); ++i) {
long registered_epoch = thread_epoch_registry[i].load(std::memory_order_acquire);
// If a thread is active (registered_epoch > 0) AND its registered epoch is older than target_epoch,
// then it's not quiescent relative to target_epoch.
if (registered_epoch > 0 && registered_epoch < target_epoch) {
return false;
}
}
return true;
}
// Advance global epoch and check for reclamation
// This function typically called by a dedicated reclaimer or a "lucky" thread
void advance_epoch_and_reclaim(std::vector<std::pair<void*, long>>& global_retired_list) {
long current_epoch = global_epoch.fetch_add(1, std::memory_order_acq_rel);
long safe_to_reclaim_epoch = current_epoch - EPOCH_QUIESCENT_GAP;
if (safe_to_reclaim_epoch < 0) return; // Not enough epochs have passed yet
if (are_all_quiescent_to(safe_to_reclaim_epoch)) {
// Collect all retired nodes that are now safe to delete
std::vector<std::pair<void*, long>> new_global_retired_list;
for (auto it = global_retired_list.begin(); it != global_retired_list.end(); ) {
if (it->second <= safe_to_reclaim_epoch) {
delete static_cast<char*>(it->first); // Delete the node
it = global_retired_list.erase(it); // Remove from list
} else {
++it;
}
}
// For simplicity, we are modifying global_retired_list directly here.
// In a real system, each thread would have its own retired list,
// and the reclaimer would collect them.
}
}
};
// Global instance of the manager
EpochManager g_epoch_manager;
// Thread-local data for managing retired nodes
thread_local int tl_thread_id_ebr = -1;
thread_local std::vector<std::pair<void*, long>> tl_retired_nodes_ebr; // Node pointer and epoch it was retired in
// Helper to get/assign thread ID
int get_or_assign_thread_id_ebr() {
if (tl_thread_id_ebr == -1) {
tl_thread_id_ebr = g_epoch_manager.register_thread();
}
return tl_thread_id_ebr;
}
// Retire a node (add to thread-local retired list with current epoch)
void retire_node_ebr(void* node) {
long current_epoch = g_epoch_manager.global_epoch.load(std::memory_order_relaxed);
tl_retired_nodes_ebr.push_back({node, current_epoch});
}
// Global retired list for simplicity in demonstration (in real EBR, reclaimer collects from thread-locals)
std::vector<std::pair<void*, long>> g_global_retired_list_ebr;
std::mutex g_global_retired_list_mutex; // For protecting g_global_retired_list_ebr in this simplified demo
// --- Example Lock-Free Stack using EBR ---
template <typename T>
class LockFreeStackEBR {
private:
struct Node {
T data;
Node* next;
Node(const T& d) : data(d), next(nullptr) {}
};
std::atomic<Node*> head;
public:
LockFreeStackEBR() : head(nullptr) {}
~LockFreeStackEBR() {
// Simplified cleanup
Node* current = head.load();
while (current) {
Node* next = current->next;
delete current;
current = next;
}
// Also need to clean up g_global_retired_list_ebr if any
// (This part is tricky in a full concurrent shutdown scenario)
std::lock_guard<std::mutex> lock(g_global_retired_list_mutex);
for(auto& pair : g_global_retired_list_ebr) {
delete static_cast<char*>(pair.first);
}
g_global_retired_list_ebr.clear();
}
void push(const T& value) {
Node* new_node = new Node(value);
Node* old_head;
do {
old_head = head.load(std::memory_order_relaxed);
new_node->next = old_head;
} while (!head.compare_exchange_weak(old_head, new_node,
std::memory_order_release,
std::memory_order_relaxed));
}
bool pop(T& result) {
int thread_id = get_or_assign_thread_id_ebr();
g_epoch_manager.enter_epoch(thread_id); // Enter critical section
Node* old_head;
Node* next_node;
do {
old_head = head.load(std::memory_order_acquire);
if (!old_head) {
g_epoch_manager.exit_epoch(thread_id); // Exit critical section
return false; // Stack is empty
}
next_node = old_head->next;
} while (!head.compare_exchange_weak(old_head, next_node,
std::memory_order_release,
std::memory_order_relaxed));
result = old_head->data; // Safe to access, as we are in a protected epoch
g_epoch_manager.exit_epoch(thread_id); // Exit critical section
retire_node_ebr(old_head); // Retire the node
// Periodically attempt reclamation
// In a real system, the reclaimer would gather all tl_retired_nodes_ebr
// For this demo, let's just push to a global list and let a "lucky" thread try to reclaim
if (tl_retired_nodes_ebr.size() > 100) { // Arbitrary threshold
std::lock_guard<std::mutex> lock(g_global_retired_list_mutex);
g_global_retired_list_ebr.insert(g_global_retired_list_ebr.end(),
tl_retired_nodes_ebr.begin(),
tl_retired_nodes_ebr.end());
tl_retired_nodes_ebr.clear(); // Clear thread-local list after moving
g_epoch_manager.advance_epoch_and_reclaim(g_global_retired_list_ebr);
}
return true;
}
};
// Example usage in main function (not part of the lecture, but for completeness)
/*
void ebr_test_thread(LockFreeStackEBR<int>& stack, int num_ops) {
for (int i = 0; i < num_ops; ++i) {
stack.push(i);
int val;
if (stack.pop(val)) {
// std::cout << "Popped: " << val << std::endl;
}
}
// Attempt final reclamation for this thread's retired nodes
std::lock_guard<std::mutex> lock(g_global_retired_list_mutex);
g_global_retired_list_ebr.insert(g_global_retired_list_ebr.end(),
tl_retired_nodes_ebr.begin(),
tl_retired_nodes_ebr.end());
tl_retired_nodes_ebr.clear();
g_epoch_manager.advance_epoch_and_reclaim(g_global_retired_list_ebr);
}
int main() {
LockFreeStackEBR<int> stack;
std::vector<std::thread> threads;
int num_threads = 4;
int ops_per_thread = 10000;
for (int i = 0; i < num_threads; ++i) {
threads.emplace_back(ebr_test_thread, std::ref(stack), ops_per_thread);
}
for (auto& t : threads) {
t.join();
}
std::cout << "Epoch-based Reclamation stack test finished." << std::endl;
// Final reclamation for any remaining nodes
std::lock_guard<std::mutex> lock(g_global_retired_list_mutex);
g_epoch_manager.advance_epoch_and_reclaim(g_global_retired_list_ebr);
return 0;
}
*/
3.4 优缺点分析
优点:
- 低读取器开销: 读取线程在访问共享数据时,只需要在进入/退出关键区时进行一次原子操作来注册/清除其纪元。这比危险指针的“每指针访问”开销低得多。
- 更好的读取器扩展性: 由于读取器开销低,EBR 在高并发读取场景下通常表现更好,不会因为频繁的原子操作和内存屏障而导致缓存行争用。
- 避免 ABA 问题: EBR 不直接关心指针是否被重用(ABA)。它通过纪元保证了在特定时间段内,所有可能持有旧引用的线程都已离开那个纪元,从而规避了 ABA 问题对内存回收的影响。
- 实现逻辑相对简单: 核心逻辑是基于纪元的整数比较,而不是复杂的指针集合管理。
缺点:
- 更高的内存占用: 节点可能需要在待回收列表中停留更长时间,等待多个纪元过去以及所有线程都达到“安静状态”。这会导致待回收列表(以及总内存)显著增长。
- 回收延迟: 如果有一个或多个线程长时间处于关键区,或由于调度原因长时间未能更新其纪元,那么回收过程可能会被阻塞,导致大量节点无法被释放,内存持续增长。这被称为“慢线程问题”。
- 粗粒度回收: EBR 只能回收一个完整纪元内的所有已退休节点,而不是像危险指针那样可以单独回收一个不再被引用的节点。
- “关键区”定义: 需要仔细定义哪些操作构成一个“关键区”,以便线程正确地注册和清除其纪元。错误的定义可能导致安全问题或不必要的性能损失。
- “假阳性”: 即使一个节点实际上已经安全,EBR 也可能因为某个线程仍处于“老”纪元而无法回收它,即使那个线程根本没有引用这个特定节点。
4. 两种策略的对比
现在,让我们用一个表格来清晰地对比这两种无锁内存回收策略的优劣。
| Feature/Criterion | 危险指针 (Hazard Pointers) | 基于纪元回收 (Epoch-based Reclamation, EBR) |
|---|---|---|
| 核心机制 | 线程注册它正在访问的特定指针。回收时检查所有危险指针。 | 线程注册它当前所属的“纪元”。回收时等待所有线程离开一个旧纪元。 |
| 回收粒度 | 细粒度:单个节点一旦不再被任何危险指针引用,就可以被回收。 | 粗粒度:批量回收在某个旧纪元退休的所有节点。 |
| 读者线程开销 | 高:每次读取共享指针并使用时,需要发布/清除危险指针(原子操作、内存屏障),并进行重读验证。 | 低:在进入/退出“关键区”时各一次原子操作(注册/清除纪元),与访问指针数量无关。 |
| 回收器开销 | 高:需要扫描所有活跃线程的所有危险指针(O(N*M),N=线程数,M=每线程危险指针数),并构建查找集合。 | 中等:需要扫描所有活跃线程的纪元注册(O(N)),然后遍历待回收列表。 |
| 回收延迟 | 低-中等:取决于回收频率。如果危险指针被清除,节点可以相对迅速被回收。 | 中等-高:受限于最慢的线程。如果某个线程长时间停留在旧纪元,大量节点可能被阻塞无法回收。 |
| 内存占用 | 中等:待回收列表可能增长;每个线程有少量危险指针。 | 高:待回收列表可能累积大量节点,等待纪元前进和所有线程安静。 |
| 可扩展性 (读取) | 中等:每次指针访问的原子操作和缓存行争用会随线程数增加而影响性能。 | 高:读取器开销与线程数无关,只在进入/退出关键区时有固定开销。 |
| 可扩展性 (回收) | 低:全局扫描危险指针是 O(N*M) 操作,随着线程数增加会成为瓶颈。 | 中等:全局扫描线程纪元是 O(N) 操作,比危险指针的回收更具扩展性。 |
| ABA 问题处理 | 通过“发布危险指针 + 重读验证”机制有效缓解。 | 通过纪元概念间接处理:节点在旧纪元退休,只要所有线程都通过该纪元,节点即安全。 |
| 实现复杂度 | 中等,需要仔细管理危险指针槽位和全局集合。 | 中等,需要仔细定义关键区和纪元推进逻辑。 |
| 适用场景 | 需要低回收延迟、对内存占用敏感、或者读取操作中对单个指针的保护需求强烈的场景。 | 读多写少、对读取器吞吐量要求高、对回收延迟有一定容忍度、内存资源相对充足的场景。 |
5. 混合方法与高级话题
除了这两种基本策略,还有一些相关的或者更高级的内存回收技术:
- RCU (Read-Copy Update): 广泛应用于 Linux 内核,是一种优化读取操作的并发机制。它与 EBR 有异曲同工之妙,也是通过“宽限期”(grace period)来确保在数据更新时,旧数据仍然对现有读者可见。RCU 的关键在于写入方复制数据、修改副本、然后原子地更新指针指向新副本,而读取方则在宽限期内继续访问旧数据。
- QSBR (Quiescent State Based Reclamation): 是 EBR 的一个变种,它将线程的“安静状态”(quiescent state)定义得更灵活。线程不是注册一个纪元,而是简单地周期性地通知系统它已经进入了一个安静状态(即它当前没有访问任何被保护的数据)。回收器等待所有线程都至少经历一次安静状态后,才能回收在之前某个时间点退休的节点。
- 引用计数(Reference Counting): 例如
std::shared_ptr。虽然它能解决内存回收问题,但通常不被认为是“无锁”的。因为shared_ptr的引用计数器本身是一个原子变量,其增减操作会引入原子操作和缓存一致性开销。在高度竞争的场景下,引用计数的原子操作可能成为瓶颈,且最后一个引用归零时的delete操作仍然可能触发问题。
6. 选择正确的策略
在实际应用中,选择危险指针还是 EBR 取决于具体的场景和需求:
- 如果你的应用程序对内存回收的延迟非常敏感,或者希望尽快释放内存以减少内存占用,并且读取操作相对不频繁,那么危险指针可能更适合。它能够更精细地控制单个节点的生命周期。
- 如果你的应用程序以高并发读取为主,对读取器的吞吐量要求极高,并且可以容忍一定的内存占用增长和回收延迟,那么EBR 通常是更好的选择。它显著降低了每个读取操作的开销,从而提高了整体吞吐量。
- 线程数量也是一个重要因素。如果线程数量很多,危险指针在回收时的全局扫描开销会非常大,EBR 的 O(N) 扫描可能更具优势。
- 数据结构的性质也会影响选择。有些数据结构可能更容易适应 EBR 的粗粒度纪元保护,而另一些则可能需要危险指针的精细控制。
无锁内存回收是构建高性能并发数据结构不可或缺的一部分。危险指针和基于纪元回收作为两种主流技术,各自在性能、复杂性和资源利用方面有着独特的权衡。深入理解它们的机制与优缺点,是设计健壮且高效的并发系统的关键。根据具体的应用场景,审慎选择或甚至结合这些技术,才能达到最佳效果。