各位编程领域的同仁们,
今天,我们将深入探讨一个在高性能、高并发系统中至关重要的概念:Hazard Pointers。在无锁(lock-free)编程的世界里,我们追求极致的并发性能,规避传统锁机制带来的开销和死锁风险。然而,无锁编程并非没有挑战,其中最棘手的问题之一就是内存安全地销毁。当多个线程同时操作共享数据结构时,一个线程可能正在访问一个内存区域,而另一个线程却认为该区域已不再需要,并将其释放。这会导致经典的“使用后释放”(use-after-free)错误,轻则程序崩溃,重则引发难以追踪的数据损坏或安全漏洞。
Hazard Pointers 正是为解决这一核心难题而生。它提供了一种机制,允许读者线程“声明”它们当前正在访问哪些内存区域,从而阻止写者(或删除者)线程在读者线程完成访问之前释放这些内存。
1. 无锁编程的诱惑与挑战
无锁编程旨在通过使用原子操作而非互斥锁来协调线程间的访问,从而实现并发。其优势显而易见:
- 性能提升: 避免了锁的上下文切换、内核态/用户态切换以及锁竞争带来的等待时间。
- 可伸缩性: 在多核处理器上,无锁算法通常能更好地扩展。
- 死锁规避: 根本上消除了死锁的可能性,因为没有锁可以被持有。
- 容错性: 即使一个线程在关键区域崩溃,也不会像持有锁一样导致整个系统停滞。
然而,无锁编程的复杂性也令人望而却步。你需要深入理解内存模型、原子操作、指令重排等底层机制。而其中最大的绊脚石,就是我们今天的主题——内存管理。
考虑一个简单的无锁链表或队列。当一个线程将一个节点从链表中移除时,它如何知道此时没有其他线程正在读取这个节点?如果贸然释放,那么正在读取的线程就会访问到一块无效的内存,或者更糟的是,这块内存已经被操作系统重新分配给了其他用途,导致数据混乱。这就是“使用后释放”的典型场景。
传统的解决方案,如垃圾回收(GC),在某些语言中是内置的,但对于C++这类需要手动管理内存的语言,或者在对延迟和资源控制有极高要求的场景(如操作系统内核、嵌入式系统),GC往往不是一个可行的选项。引用计数在无锁环境中极难正确实现,且容易陷入ABA问题和循环引用。
2. 内存回收的困境:一个例子
让我们通过一个简化的无锁栈的pop操作来具体化这个问题:
template<typename T>
struct Node {
T value;
std::atomic<Node<T>*> next;
Node(T val) : value(val), next(nullptr) {}
};
template<typename T>
class LockFreeStack {
private:
std::atomic<Node<T>*> head;
public:
// ... 构造函数等
T pop() {
Node<T>* old_head = head.load(std::memory_order_relaxed);
Node<T>* new_head;
do {
if (old_head == nullptr) {
throw std::runtime_error("Stack is empty");
}
new_head = old_head->next.load(std::memory_order_relaxed);
} while (!head.compare_exchange_weak(old_head, new_head,
std::memory_order_release,
std::memory_order_relaxed));
// 问题出现:此时old_head指向的节点已经从栈中移除。
// 其他线程可能仍在访问 old_head。
// 如果在此处直接 delete old_head; 就会有 use-after-free 的风险。
T result = old_head->value; // 假设这里是安全的
// delete old_head; // 危险!
// 那么,何时才能安全地 delete old_head 呢?
return result;
}
// ... push 等
};
在上述pop操作中,一旦compare_exchange_weak成功,old_head所指向的节点就不再是栈的一部分了。从逻辑上讲,它可以被回收。但关键在于,另一个线程T2可能在T1执行old_head = head.load(...)之后,T1执行new_head = old_head->next.load(...)之前,也获取了同样的old_head指针并开始对其进行操作。如果T1此时直接delete old_head,那么T2的后续访问将是无效的,导致崩溃。
3. Hazard Pointers 的核心思想
Hazard Pointers 机制的核心思想非常直观:让读者线程声明它们正在访问的指针,删除者线程在回收内存之前检查这些声明。
具体来说:
- 声明危险(Hazard): 当一个线程(通常是读者或准备删除的线程)要访问一个可能被其他线程删除的共享数据结构中的节点时,它会将该节点的地址复制到一个特殊的、公开的“危险指针”槽中。这个槽是全局可见的,表示“我正在看这个地址,请勿删除”。
- 退休节点(Retire): 当一个线程完成对某个节点的逻辑移除(例如,从链表中解链),但不能立即删除它时,它会将该节点标记为“退休”。退休的节点不会被立即释放,而是放入一个待处理的“退休列表”中。
- 扫描与回收(Scan and Reclaim): 当退休列表积累到一定数量或者在特定时机(例如,每次操作后),负责回收的线程(通常是删除者线程自己,或者一个专门的回收线程)会遍历其退休列表。对于列表中的每个节点,它会检查所有活跃线程发布的所有危险指针。
- 如果一个退休节点没有被任何一个危险指针指向,这意味着没有任何活跃线程正在访问它,此时可以安全地释放该节点。
- 如果一个退休节点被至少一个危险指针指向,那么它暂时不能被释放,必须留在退休列表中等待下一次扫描。
通过这种机制,危险指针充当了“安全哨兵”,确保在任何线程仍在潜在访问某块内存时,这块内存不会被释放。
4. Hazard Pointers 机制的详细实现
为了实现 Hazard Pointers,我们需要以下几个核心组件:
4.1 全局危险指针数组 (Hazard Pointer Array)
这是最核心的部分。它是一个全局共享的数组,每个活跃线程都在其中拥有一个或多个槽位,用于发布其危险指针。
HP_MAX_THREADS: 系统中可能同时活跃的最大线程数。HP_MAX_POINTERS: 每个线程可能同时引用的最大“危险”指针数量。例如,在双向链表中,一个线程可能同时引用当前节点和其下一个节点,就需要两个危险指针。- *`std::atomic<void>`:** 每个危险指针槽位都应该是原子的,以确保线程安全地发布和读取。
我们可以用一个二维数组来表示:
// hazard_pointers_manager.h
#include <atomic>
#include <vector>
#include <thread>
#include <memory>
#include <stdexcept>
#include <functional> // For std::function
// 假设我们支持的最大线程数和每个线程可能持有的最大危险指针数量
// 实际应用中需要根据并发程度和数据结构复杂性调整
static constexpr int HP_MAX_THREADS = 128; // 最大线程数
static constexpr int HP_MAX_POINTERS = 3; // 每个线程的最大危险指针数
// 全局危险指针表
// hazard_pointers[thread_id][hp_index]
inline std::atomic<void*> hazard_pointers[HP_MAX_THREADS][HP_MAX_POINTERS];
// 线程ID分配器(简单的实现,实际可能需要更复杂的无锁ID分配器)
inline std::atomic<int> next_thread_id_counter{0};
// 线程本地数据,用于管理Hazard Pointers和退休列表
struct ThreadData {
int thread_id;
// 退休节点列表,存储待回收的指针
std::vector<void*> retired_nodes;
// 回收阈值:当retired_nodes大小达到此值时触发回收
static constexpr size_t RETIRED_LIST_THRESHOLD = 256;
// 存储节点的删除器,用于自定义类型或复杂析构
std::vector<std::pair<void*, std::function<void(void*)>>> retired_deleters;
ThreadData() : thread_id(-1) {} // 初始为无效ID
};
// 线程本地存储,用于获取当前线程的ThreadData
// thread_local 保证每个线程有自己的ThreadData实例
inline thread_local ThreadData current_thread_data;
// Hazard Pointers 管理器
class HazardPointersManager {
public:
HazardPointersManager() {
// 注册线程ID
if (current_thread_data.thread_id == -1) {
current_thread_data.thread_id = next_thread_id_counter.fetch_add(1, std::memory_order_relaxed);
if (current_thread_data.thread_id >= HP_MAX_THREADS) {
throw std::runtime_error("Exceeded maximum number of threads for Hazard Pointers.");
}
// 初始化当前线程的危险指针槽
for (int i = 0; i < HP_MAX_POINTERS; ++i) {
hazard_pointers[current_thread_data.thread_id][i].store(nullptr, std::memory_order_relaxed);
}
}
}
~HazardPointersManager() {
// 线程退出时清除其危险指针,并尝试回收其退休列表
int tid = current_thread_data.thread_id;
if (tid != -1) {
for (int i = 0; i < HP_MAX_POINTERS; ++i) {
hazard_pointers[tid][i].store(nullptr, std::memory_order_relaxed);
}
// 线程退出前执行一次回收,清理所有退休节点
scan_and_reclaim();
// 在实际系统中,next_thread_id_counter可能需要减少,或使用更复杂的ID重用机制
}
}
// 获取一个危险指针槽位的引用,用于设置/清除指针
std::atomic<void*>& get_hp_slot(int index) {
if (index < 0 || index >= HP_MAX_POINTERS) {
throw std::out_of_range("Hazard Pointer index out of bounds.");
}
return hazard_pointers[current_thread_data.thread_id][index];
}
// 设置危险指针
void set_hazard_pointer(int index, void* ptr) {
get_hp_slot(index).store(ptr, std::memory_order_release); // release ensures visibility
}
// 清除危险指针
void clear_hazard_pointer(int index) {
get_hp_slot(index).store(nullptr, std::memory_order_release); // release ensures visibility
}
// 将节点添加到当前线程的退休列表
template<typename T>
void retire_node(T* node) {
if (node == nullptr) return;
current_thread_data.retired_nodes.push_back(node);
if (current_thread_data.retired_nodes.size() >= ThreadData::RETIRED_LIST_THRESHOLD) {
scan_and_reclaim();
}
}
// 将节点添加到当前线程的退休列表,并指定自定义删除器
void retire_node_with_deleter(void* node, std::function<void(void*)> deleter) {
if (node == nullptr) return;
current_thread_data.retired_deleters.push_back({node, deleter});
if (current_thread_data.retired_deleters.size() >= ThreadData::RETIRED_LIST_THRESHOLD) {
scan_and_reclaim();
}
}
// 扫描所有危险指针并回收退休节点
void scan_and_reclaim() {
// 1. 收集所有活跃的危险指针值
std::vector<void*> active_hps;
active_hps.reserve(HP_MAX_THREADS * HP_MAX_POINTERS); // 预留空间
for (int i = 0; i < next_thread_id_counter.load(std::memory_order_relaxed); ++i) {
for (int j = 0; j < HP_MAX_POINTERS; ++j) {
void* hp_val = hazard_pointers[i][j].load(std::memory_order_acquire); // acquire ensures visibility of hazard pointers
if (hp_val != nullptr) {
active_hps.push_back(hp_val);
}
}
}
// 对活跃的危险指针排序,以便快速查找(可选,但通常有助于性能)
std::sort(active_hps.begin(), active_hps.end());
// 2. 遍历当前线程的退休节点列表,尝试回收
std::vector<void*> new_retired_nodes;
new_retired_nodes.reserve(current_thread_data.retired_nodes.size());
for (void* node : current_thread_data.retired_nodes) {
// 检查node是否在active_hps中
bool is_hazardous = std::binary_search(active_hps.begin(), active_hps.end(), node);
if (is_hazardous) {
// 仍然被引用,不能回收,放回新的退休列表
new_retired_nodes.push_back(node);
} else {
// 没有被引用,安全回收
delete static_cast<char*>(node); // 假设是new char[]分配的,或者简单delete
}
}
current_thread_data.retired_nodes = std::move(new_retired_nodes); // 更新退休列表
// 3. 处理带自定义删除器的退休节点
std::vector<std::pair<void*, std::function<void(void*)>>> new_retired_deleters;
new_retired_deleters.reserve(current_thread_data.retired_deleters.size());
for (auto& entry : current_thread_data.retired_deleters) {
void* node = entry.first;
std::function<void(void*)> deleter = entry.second;
bool is_hazardous = std::binary_search(active_hps.begin(), active_hps.end(), node);
if (is_hazardous) {
new_retired_deleters.push_back(entry);
} else {
deleter(node); // 使用自定义删除器
}
}
current_thread_data.retired_deleters = std::move(new_retired_deleters);
}
};
// 全局Hazard Pointers管理器实例,确保每个线程首次访问时进行初始化
// 推荐在每个需要使用HP的线程入口处创建或确保其存在
inline thread_local HazardPointersManager hp_manager;
4.2 内存序 (Memory Order)
理解内存序在Hazard Pointers中至关重要:
- 设置危险指针 (
set_hazard_pointer): 使用std::memory_order_release。这确保了在设置危险指针之前对共享数据的所有操作都已完成并对其他线程可见。 - 读取危险指针 (
loadinscan_and_reclaim): 使用std::memory_order_acquire。这确保了在读取危险指针之后,对共享数据的任何后续访问都将看到最新的值。 - 清除危险指针 (
clear_hazard_pointer): 使用std::memory_order_release。 - 读取共享数据结构中的指针 (
head.load(),node->next.load()等): 通常使用std::memory_order_acquire。这确保了在加载指针后,对该指针指向的数据的访问是可见的。 - 更新共享数据结构中的指针 (
head.compare_exchange_weak): 通常使用std::memory_order_release。这确保了在更新指针之前,所有相关的数据修改都已完成并对其他线程可见。
4.3 HazardPointersManager 的使用模式
一个典型的使用 Hazard Pointers 的操作流程(以pop为例):
-
获取危险指针槽: 调用
hp_manager.get_hp_slot(index)获取一个空闲的危险指针槽位。 -
加载头节点并设置危险指针:
Node* p = head.load(std::memory_order_acquire);hp_manager.set_hazard_pointer(0, p);// 声明我正在看p-
CAS循环: 在循环中重新加载
head,因为在设置危险指针和 CAS 之间head可能已经改变。需要确保p在设置危险指针时仍然是head的最新值。// 核心循环 Node<T>* old_head = nullptr; Node<T>* new_head = nullptr; do { old_head = head.load(std::memory_order_acquire); // 重新加载 hp_manager.set_hazard_pointer(0, old_head); // 再次设置危险指针 if (old_head == nullptr) { // Stack empty. Clear HP and return. hp_manager.clear_hazard_pointer(0); throw std::runtime_error("Stack is empty"); } // 关键检查:确保设置的危险指针仍然指向当前的 old_head // 这是一个双重检查机制,防止在设置HP和加载old_head之间发生ABA或其他问题 // 理论上,如果old_head在设置HP后被其他线程改变(并被回收), // 那么此时hp_manager.get_hp_slot(0).load()就不会等于old_head。 // 但更重要的是,如果old_head被回收,那么它将不会在retired_nodes中, // 并且hp_manager.get_hp_slot(0)将指向一个无效地址。 // 这里更安全的做法是,加载old_head之后立即设置HP,然后再次加载head, // 如果两次加载的head不同,则重新尝试。 if (head.load(std::memory_order_acquire) != old_head) { continue; // head changed, retry } new_head = old_head->next.load(std::memory_order_relaxed); // 访问 old_head->next } while (!head.compare_exchange_weak(old_head, new_head, std::memory_order_release, std::memory_order_relaxed));重要说明: 上述双重检查
if (head.load(...) != old_head)是为了防止在加载old_head和设置其为危险指针之间,head被另一个线程修改。如果没有这个检查,hp可能指向一个已经不是head的节点,但我们仍然尝试 CAS。虽然这个检查增加了健壮性,但真正的安全保证来自回收时的检查。一个更简洁且同样安全的模式是,在循环中不断尝试设置危险指针并加载head,直到hp指向的节点和head加载的节点一致。
-
使用节点: 此时可以安全地访问
old_head指向的节点,因为它的地址已经发布为危险指针,不会被回收。 -
退休节点:
hp_manager.retire_node(old_head);将old_head添加到当前线程的退休列表。 -
清除危险指针:
hp_manager.clear_hazard_pointer(0);任务完成后,清除危险指针,解除对old_head的保护。
4.4 改进的LockFreeStack示例
#include <iostream>
#include <thread>
#include <vector>
#include <atomic>
#include <stdexcept>
#include <functional>
#include <algorithm> // For std::sort, std::binary_search
// 引入Hazard Pointers管理器的定义
// (假设 hazard_pointers_manager.h 的内容已经包含在前面,为了演示方便,这里直接写在一起)
// 定义Hazard Pointers相关的静态和全局变量
static constexpr int HP_MAX_THREADS = 128;
static constexpr int HP_MAX_POINTERS = 3;
inline std::atomic<void*> hazard_pointers[HP_MAX_THREADS][HP_MAX_POINTERS];
inline std::atomic<int> next_thread_id_counter{0};
struct ThreadData {
int thread_id;
std::vector<void*> retired_nodes;
static constexpr size_t RETIRED_LIST_THRESHOLD = 256;
std::vector<std::pair<void*, std::function<void(void*)>>> retired_deleters;
ThreadData() : thread_id(-1) {}
};
inline thread_local ThreadData current_thread_data;
class HazardPointersManager {
public:
HazardPointersManager() {
if (current_thread_data.thread_id == -1) {
current_thread_data.thread_id = next_thread_id_counter.fetch_add(1, std::memory_order_relaxed);
if (current_thread_data.thread_id >= HP_MAX_THREADS) {
throw std::runtime_error("Exceeded maximum number of threads for Hazard Pointers.");
}
for (int i = 0; i < HP_MAX_POINTERS; ++i) {
hazard_pointers[current_thread_data.thread_id][i].store(nullptr, std::memory_order_relaxed);
}
}
}
~HazardPointersManager() {
int tid = current_thread_data.thread_id;
if (tid != -1) {
for (int i = 0; i < HP_MAX_POINTERS; ++i) {
hazard_pointers[tid][i].store(nullptr, std::memory_order_release); // Ensure cleared HPs are visible
}
scan_and_reclaim(); // Reclaim any remaining retired nodes
}
}
std::atomic<void*>& get_hp_slot(int index) {
if (index < 0 || index >= HP_MAX_POINTERS) {
throw std::out_of_range("Hazard Pointer index out of bounds.");
}
return hazard_pointers[current_thread_data.thread_id][index];
}
void set_hazard_pointer(int index, void* ptr) {
get_hp_slot(index).store(ptr, std::memory_order_release);
}
void clear_hazard_pointer(int index) {
get_hp_slot(index).store(nullptr, std::memory_order_release);
}
template<typename T>
void retire_node(T* node) {
if (node == nullptr) return;
// 使用 lambda 捕获 T 的类型信息,以便正确调用 delete
current_thread_data.retired_deleters.push_back({node, [](void* p){ delete static_cast<T*>(p); }});
if (current_thread_data.retired_deleters.size() >= ThreadData::RETIRED_LIST_THRESHOLD) {
scan_and_reclaim();
}
}
void scan_and_reclaim() {
std::vector<void*> active_hps;
active_hps.reserve(HP_MAX_THREADS * HP_MAX_POINTERS);
// Collect all active hazard pointers
int current_max_tid = next_thread_id_counter.load(std::memory_order_acquire); // Acquire for latest tid
for (int i = 0; i < current_max_tid; ++i) {
for (int j = 0; j < HP_MAX_POINTERS; ++j) {
void* hp_val = hazard_pointers[i][j].load(std::memory_order_acquire);
if (hp_val != nullptr) {
active_hps.push_back(hp_val);
}
}
}
std::sort(active_hps.begin(), active_hps.end()); // Sorting for binary search
// Reclaim nodes with custom deleters
std::vector<std::pair<void*, std::function<void(void*)>>> new_retired_deleters;
new_retired_deleters.reserve(current_thread_data.retired_deleters.size());
for (auto& entry : current_thread_data.retired_deleters) {
void* node = entry.first;
std::function<void(void*)> deleter = entry.second;
bool is_hazardous = std::binary_search(active_hps.begin(), active_hps.end(), node);
if (is_hazardous) {
new_retired_deleters.push_back(entry);
} else {
deleter(node); // Safe to delete
}
}
current_thread_data.retired_deleters = std::move(new_retired_deleters);
}
};
inline thread_local HazardPointersManager hp_manager;
// Lock-Free Stack using Hazard Pointers
template<typename T>
struct Node {
T value;
std::atomic<Node<T>*> next;
Node(T val) : value(val), next(nullptr) {}
};
template<typename T>
class LockFreeStack {
private:
std::atomic<Node<T>*> head;
public:
LockFreeStack() : head(nullptr) {}
~LockFreeStack() {
// 清理所有剩余节点
while (pop_raw() != nullptr);
// 确保所有退休节点都被回收
hp_manager.scan_and_reclaim();
}
void push(T value) {
Node<T>* new_node = new Node<T>(value);
new_node->next.store(head.load(std::memory_order_relaxed), std::memory_order_relaxed);
while (!head.compare_exchange_weak(new_node->next, new_node,
std::memory_order_release,
std::memory_order_relaxed));
}
T pop() {
Node<T>* node_to_delete = pop_raw();
if (node_to_delete == nullptr) {
throw std::runtime_error("Stack is empty");
}
T result = node_to_delete->value;
hp_manager.retire_node(node_to_delete); // 将节点退休
return result;
}
// 内部函数,用于获取节点指针,但不进行立即回收
Node<T>* pop_raw() {
Node<T>* old_head = nullptr;
Node<T>* new_head = nullptr;
// 获取并设置第一个危险指针
std::atomic<void*>& hp1 = hp_manager.get_hp_slot(0);
do {
old_head = head.load(std::memory_order_acquire);
hp1.store(old_head, std::memory_order_release); // 声明正在访问 old_head
// 关键的 ABA 检查:在设置危险指针后,head可能已经改变
// 如果 head 改变了,说明 old_head 已经不是当前的栈顶了
// 此时必须重新从 head 加载,并重新设置 hazard pointer
if (head.load(std::memory_order_acquire) != old_head) {
continue; // head changed, retry loop
}
if (old_head == nullptr) {
hp1.store(nullptr, std::memory_order_release); // 栈空,清除危险指针
return nullptr;
}
new_head = old_head->next.load(std::memory_order_relaxed);
} while (!head.compare_exchange_weak(old_head, new_head,
std::memory_order_release,
std::memory_order_relaxed));
hp1.store(nullptr, std::memory_order_release); // CAS成功,清除危险指针
return old_head; // 返回要处理的节点
}
bool empty() const {
return head.load(std::memory_order_acquire) == nullptr;
}
};
// 简单测试函数
void stack_worker(LockFreeStack<int>& stack, int num_ops, int thread_id) {
// 确保每个线程在使用 HazardPointersManager 前都进行了初始化
// (hp_manager 是 thread_local,首次访问会自动构造)
std::cout << "Thread " << thread_id << " started." << std::endl;
for (int i = 0; i < num_ops; ++i) {
if (i % 2 == 0) {
stack.push(thread_id * 1000 + i);
} else {
try {
stack.pop();
} catch (const std::runtime_error& e) {
// Stack might be empty due to other threads popping
}
}
}
std::cout << "Thread " << thread_id << " finished." << std::endl;
}
int main() {
LockFreeStack<int> stack;
int num_threads = 4;
int ops_per_thread = 100000;
std::vector<std::thread> threads;
for (int i = 0; i < num_threads; ++i) {
threads.emplace_back(stack_worker, std::ref(stack), ops_per_thread, i);
}
for (auto& t : threads) {
t.join();
}
std::cout << "All threads finished. Final stack state:" << std::endl;
int count = 0;
while (!stack.empty()) {
try {
stack.pop();
count++;
} catch (const std::runtime_error& e) {
break;
}
}
std::cout << "Remaining elements in stack: " << count << std::endl;
return 0;
}
代码解释:
-
HazardPointersManager:thread_local HazardPointersManager hp_manager;:每个线程都有一个独立的HazardPointersManager实例,它负责管理该线程的危险指针槽位和退休列表。- 构造函数中,每个线程会获取一个唯一的
thread_id,并将其危险指针槽位初始化为nullptr。 get_hp_slot(index):返回当前线程指定索引的原子指针引用。set_hazard_pointer/clear_hazard_pointer:设置/清除危险指针。retire_node<T>(T* node):将一个T类型的节点添加到当前线程的退休列表。这里使用了std::function来存储特定类型的删除器,使得scan_and_reclaim可以通用地处理不同类型的节点。scan_and_reclaim():这是核心的回收逻辑。- 它首先遍历所有线程(通过
next_thread_id_counter知道有多少个线程注册过),收集它们当前发布的所有非nullptr的危险指针。 - 将这些危险指针排序,以便后续使用二分查找提高效率。
- 然后,遍历当前线程的
retired_deleters列表。对于每个退休节点,检查它是否在active_hps中。 - 如果不在,则调用其关联的删除器安全地释放内存。
- 如果仍在,则将其保留在新的
retired_deleters列表中,等待下一次扫描。 - 当
retired_deleters达到RETIRED_LIST_THRESHOLD时,会自动触发一次扫描。
- 它首先遍历所有线程(通过
-
LockFreeStack:pop_raw():这是实际移除节点的逻辑。- 它在循环中不断尝试获取
head,并立即将其地址设置到危险指针hp1中。 if (head.load(...) != old_head)这个检查非常关键,它解决了pop操作中,在加载old_head和设置hp1之间,head被其他线程修改而导致hp1指向“旧的”head的问题。如果head改变了,说明old_head已经不是栈顶,需要重新尝试。- 一旦CAS成功,
old_head就从栈中逻辑移除。此时,由于old_head的地址仍在hp1中,它是安全的。pop_raw返回old_head。 - CAS成功后,必须清除危险指针! 否则该
old_head将永远无法被回收。
- 它在循环中不断尝试获取
pop():调用pop_raw()获取节点,然后调用hp_manager.retire_node()将其加入退休列表,最后返回节点的值。- 析构函数和
main函数中的测试代码展示了如何创建和使用栈,并确保在程序结束时清理所有资源。
5. Hazard Pointers 的性能考量与权衡
Hazard Pointers 提供了一个强大的安全机制,但它并非没有代价:
5.1 内存开销
- 危险指针数组:
HP_MAX_THREADS * HP_MAX_POINTERS个std::atomic<void*>指针。对于少量线程和少量危险指针,这个开销可以忽略不计。但如果线程数和每个线程的危险指针数很多,内存占用会增加。 - 退休列表: 每个线程都有自己的退休列表,用于存储待回收的节点。在回收不及时的情况下,这个列表可能会增长,导致内存使用量的临时增加。
5.2 CPU 开销
- 设置/清除危险指针: 每次关键操作(如
pop)都需要至少设置和清除一次危险指针。这些是原子操作,比普通指针操作开销大。 - 扫描与回收: 这是开销最大的部分。
- 收集活跃危险指针: 需要遍历所有活跃线程的危险指针槽,进行原子加载,并可能进行排序。这会随着线程数的增加而线性增长。
- 检查退休节点: 对于每个退休节点,都需要在活跃危险指针列表中进行查找(例如二分查找),这会增加回收的计算量。
- 回收频率: 如果回收频率过高(
RETIRED_LIST_THRESHOLD过小),会频繁进行昂贵的扫描操作。如果频率过低(RETIRED_LIST_THRESHOLD过大),则退休列表会增长,内存占用增加,且回收的延迟变大。
5.3 回收延迟
节点不会被立即回收,而是进入退休列表,等待后续的扫描。这意味着内存不会立即返回给系统。在某些对内存占用有严格限制的实时系统中,这可能是一个问题。
5.4 ABA 问题
Hazard Pointers 并不能直接解决 ABA 问题。ABA 问题是关于指针值循环利用的逻辑问题,即一个指针从 A 变为 B 又变回 A。Hazard Pointers 仅保证当一个线程正在访问地址 A 时,该地址不会被释放。如果地址 A 被释放后又被重新分配,并且新分配的内存又恰好等于 A,那么 Hazard Pointers 不会阻止这种重用。要解决 ABA 问题,通常需要结合版本计数器(tagged pointers)来实现。
例如,一个节点的指针可以是一个结构体,包含实际地址和版本号:struct TaggedPointer { Node* ptr; size_t tag; };。每次修改指针时,都增加tag。CAS操作需要同时检查ptr和tag。
5.5 调优
HP_MAX_POINTERS: 根据数据结构复杂性确定。例如,链表通常需要一个(当前节点),双向链表或需要同时访问父子节点可能需要两个或更多。RETIRED_LIST_THRESHOLD: 这是一个平衡点。过小会导致频繁回收(CPU开销),过大可能导致内存占用过高。通常通过性能测试来确定最佳值。- 回收时机: 除了达到阈值自动回收,也可以在特定操作(如线程退出、空闲时)手动触发回收。
6. 替代方案简述
虽然 Hazard Pointers 是一个非常有效的方案,但也有其他内存回收机制,它们各有优劣:
- Epoch-Based Reclamation (EBR): 基于“时代”(epoch)的概念。线程在进入和退出关键区时声明其当前时代。回收者只回收那些在所有活跃线程声明的时代之前退休的节点。通常比 Hazard Pointers 简单,且在许多场景下性能更优,因为它避免了对每个节点进行逐个检查。
- Read-Copy-Update (RCU): 主要用于 Linux 内核。它允许读者在不加锁的情况下并发访问数据,而写者则通过创建数据的副本、修改副本,然后原子地更新指针来替换旧数据。旧数据会在所有活跃读者都完成访问后才被回收。RCU 非常强大,但实现复杂,且并非所有场景都适用。
- Quiescent State Reclamation (QSR): 是 EBR 的泛化,线程通过进入“静止状态”来表明它们没有持有任何共享指针。
- Lock-Free Garbage Collection: 某些语言(如 Java, Go)内置的垃圾回收器就是无锁的,它们由运行时环境管理。
- Delegated Reclamation: 将内存回收的任务委托给一个或多个后台线程。删除者将节点放入一个队列,后台线程专门负责回收。这可以平摊回收开销,减少主线程的延迟。
7. 实践建议与适用场景
Hazard Pointers 适用于以下场景:
- C/C++ 等需要手动内存管理的语言环境。
- 构建高性能、低延迟的无锁数据结构,如链表、队列、哈希表等。
- 不希望引入语言运行时垃圾回收机制的系统。
- 对内存回收时机有一定容忍度,允许一定程度的内存延迟释放。
在决定使用 Hazard Pointers 时,务必进行详细的性能测试和基准测试,以确定其开销是否符合你的应用需求。同时,正确理解和实现内存序是确保算法正确性的关键。
结语
Hazard Pointers 是无锁编程工具箱中的一个强大而优雅的工具,它提供了一种在多线程环境中安全管理动态内存的机制。通过让读者主动声明其访问意图,并由删除者在回收前进行检查,我们有效地规避了“使用后释放”这一并发编程中的顽疾。理解其工作原理、性能权衡以及与其他技术的协同使用,将使您能够构建出更加健壮、高效和可伸缩的并发系统。