各位编程专家和高性能计算爱好者,大家好!
今天,我们将深入探讨一个在高性能计算(HPC)领域至关重要的话题:C++ 自定义分配器(Allocators)及其在处理 NUMA(Non-Uniform Memory Access,非统一内存访问)亲和性方面的应用。随着现代处理器架构的演进,尤其是多核、多插槽系统的普及,内存访问模式对程序性能的影响日益显著。默认的内存分配策略往往无法充分利用硬件特性,甚至可能成为性能瓶颈。通过定制内存分配器,我们能够精细控制内存布局,实现与 NUMA 架构的深度适配,从而在数据密集型应用中获得显著的性能提升。
1. C++ 内存管理基石与 NUMA 挑战
在 C++ 中,我们通常使用 new 和 delete 运算符来动态分配和释放内存。这些操作符底层通常依赖于 C 语言的 malloc 和 free 函数,而 malloc 和 free 又会与操作系统进行交互,请求或归还内存页。对于大多数通用应用程序而言,这种默认的内存管理方式是足够高效且方便的。
然而,在高性能计算场景下,默认分配器的局限性开始显现:
- 性能开销: 频繁的小对象分配和释放可能导致高昂的系统调用开销和内存碎片。
- 预测性差: 无法保证内存分配的局部性,这在缓存敏感型应用中是致命的。
- 硬件无感知: 默认分配器通常对底层硬件架构(如缓存层次、NUMA 拓扑)一无所知,无法做出优化的决策。
这正是 NUMA 架构带来的挑战。NUMA 是一种多处理器系统架构,其中每个处理器或一组处理器都有其自己的本地内存控制器和内存。这意味着处理器访问本地内存的速度要比访问其他处理器(或 NUMA 节点)的远程内存快得多。这种速度差异可能高达数倍甚至一个数量级。
NUMA 架构核心概念:
- NUMA 节点 (NUMA Node): 一个包含一个或多个 CPU 核、一个内存控制器和其直接连接的本地内存的单元。
- 本地内存 (Local Memory): CPU 访问其所在 NUMA 节点上的内存。访问速度最快,延迟最低。
- 远程内存 (Remote Memory): CPU 访问其他 NUMA 节点上的内存。访问速度较慢,延迟较高,因为数据需要通过互连总线(如 Intel QPI/UPI, AMD Infinity Fabric)传输。
图示 NUMA 拓扑 (概念性):
假设我们有一个双插槽系统:
- NUMA Node 0: CPU 0, Memory 0
- NUMA Node 1: CPU 1, Memory 1
如果运行在 CPU 0 上的线程尝试访问分配在 Memory 1 上的数据,这就是一次远程内存访问。这种跨节点访问会导致额外的延迟,降低程序的整体性能。在 HPC 应用中,数据量通常非常庞大,远程内存访问的累积效应可能导致计算效率大幅下降。
因此,我们的目标是:确保运行在某个 NUMA 节点上的线程,尽可能地访问分配在该节点上的内存。 这就是所谓的 NUMA 亲和性内存分配策略。为了实现这一目标,我们需要 C++ 自定义分配器。
2. C++ 标准库分配器模型:std::allocator 及其接口
C++ 标准库容器(如 std::vector, std::list, std::map 等)都接受一个模板参数,用于指定内存分配器。默认情况下,它们使用 std::allocator<T>,这是一个简单地包装了 ::operator new 和 ::operator delete 的分配器。
一个符合 C++ 标准的自定义分配器必须提供一套特定的接口,以便与标准库容器无缝协作。这个接口在 C++11 之后主要由 std::allocator_traits 辅助定义和使用,但在 C++03 时代以及为了理解底层机制,直接实现这些成员函数仍然是最佳实践。
核心接口要求 (以 C++11/14/17 风格):
| 成员类型或函数 | 描述 |
|---|---|
value_type |
分配器所能分配的单个元素的类型。 |
pointer |
指向 value_type 的指针类型,通常是 T*。 |
const_pointer |
指向 const value_type 的指针类型,通常是 const T*。 |
reference |
value_type 的引用类型,通常是 T&。 |
const_reference |
const value_type 的引用类型,通常是 const T&。 |
size_type |
无符号整数类型,足以表示最大可分配对象的大小。通常是 std::size_t。 |
difference_type |
有符号整数类型,足以表示两个指针之间的距离。通常是 std::ptrdiff_t。 |
propagate_on_container_copy_assignment |
布尔值,指示容器在复制赋值时是否应复制其分配器。通常为 std::false_type。 |
propagate_on_container_move_assignment |
布尔值,指示容器在移动赋值时是否应移动其分配器。通常为 std::true_type。 |
propagate_on_container_swap |
布尔值,指示容器在交换时是否应交换其分配器。通常为 std::false_type。 |
is_always_equal |
C++17 新增。布尔值,表示此分配器类型的所有实例是否总是相等。如果为 true_type,则容器可以优化其行为,例如避免存储分配器或在某些操作中避免复制它。 |
rebind<U>::other |
C++11 之前的标准要求。一个嵌套的模板结构,用于获取一个可以分配 U 类型对象的分配器。在 C++11 及以后,std::allocator_traits 会自动处理 rebind 逻辑,通常无需显式提供。 |
allocate(n) |
分配 n 个 value_type 对象的原始内存,并返回指向第一个对象的指针。不进行对象构造。可能抛出 std::bad_alloc。 |
deallocate(p, n) |
释放由 allocate 分配的 p 指向的 n 个 value_type 对象的内存。不进行对象析构。 |
construct(p, args...) |
在 p 指向的内存处构造一个 value_type 对象,使用 args 进行初始化。C++17 之后,std::allocator 默认不提供 construct 和 destroy,而是通过 std::allocator_traits 调用全局的 ::new 和 ::delete 或 std::construct_at。但在自定义分配器中提供它们是常见的。 |
destroy(p) |
销毁 p 指向的 value_type 对象,但不释放内存。C++17 之后同上。 |
max_size() |
返回分配器可以分配的最大元素数量。 |
operator==(const Alloc&, const Alloc&) |
比较两个分配器实例是否相等。如果相等,则它们可以互相释放对方分配的内存。 |
operator!=(const Alloc&, const Alloc&) |
比较两个分配器实例是否不相等。 |
一个最简单的自定义分配器示例 (包装 malloc/free):
#include <cstddef> // For std::size_t, std::ptrdiff_t
#include <new> // For placement new, std::bad_alloc
#include <limits> // For std::numeric_limits
#include <iostream>
template <typename T>
class MallocAllocator {
public:
// 类型定义
using value_type = T;
using pointer = T*;
using const_pointer = const T*;
using reference = T&;
using const_reference = const T&;
using size_type = std::size_t;
using difference_type = std::ptrdiff_t;
// 默认构造函数
MallocAllocator() = default;
// 拷贝构造函数,允许从不同类型的分配器构造
template <typename U>
MallocAllocator(const MallocAllocator<U>&) noexcept {}
// 内存分配
pointer allocate(size_type n) {
if (n == 0) return nullptr;
if (n > std::numeric_limits<size_type>::max() / sizeof(T)) {
throw std::bad_alloc(); // 避免乘法溢出
}
void* p = std::malloc(n * sizeof(T));
if (!p) {
throw std::bad_alloc();
}
std::cout << "Allocated " << n * sizeof(T) << " bytes at " << p << std::endl;
return static_cast<pointer>(p);
}
// 内存释放
void deallocate(pointer p, size_type n) noexcept {
(void)n; // n在这里可能不会被使用,但标准要求参数存在
std::cout << "Deallocated " << n * sizeof(T) << " bytes at " << p << std::endl;
std::free(p);
}
// 对象构造 (C++17 之前需要,之后可选)
template <typename U, typename... Args>
void construct(U* p, Args&&... args) {
new (p) U(std::forward<Args>(args)...); // placement new
}
// 对象析构 (C++17 之前需要,之后可选)
template <typename U>
void destroy(U* p) {
p->~U();
}
// 最大可分配大小
size_type max_size() const noexcept {
return std::numeric_limits<size_type>::max() / sizeof(T);
}
// 分配器比较 (所有 MallocAllocator 实例都等价)
bool operator==(const MallocAllocator& other) const noexcept { return true; }
bool operator!=(const MallocAllocator& other) const noexcept { return false; }
// Rebind 机制 (C++11 及以上版本通常通过 std::allocator_traits 自动处理)
// 但为了兼容旧代码或清晰性,可以提供
template <typename U>
struct rebind {
using other = MallocAllocator<U>;
};
};
// 示例:使用 MallocAllocator
/*
#include <vector>
int main() {
std::vector<int, MallocAllocator<int>> myVec;
myVec.reserve(5); // allocate will be called
for (int i = 0; i < 5; ++i) {
myVec.push_back(i * 10); // construct will be called
}
std::cout << "Vector elements: ";
for (int x : myVec) {
std::cout << x << " ";
}
std::cout << std::endl;
// When myVec goes out of scope, destruct and deallocate will be called.
return 0;
}
*/
这个 MallocAllocator 只是 std::allocator 的一个简化版本,它演示了自定义分配器需要实现的基本接口。
3. NUMA 架构深度解析与操作系统 API
为了构建一个 NUMA 亲和性分配器,我们首先需要深入理解 NUMA 架构,并掌握操作系统提供的相关 API。
3.1 NUMA 架构的物理与逻辑视图
在多插槽服务器中,CPU 和内存被物理地组织成多个 NUMA 节点。每个节点都有其独立的内存总线和内存控制器。节点之间通过高速互连总线(如 Intel 的 QPI/UPI, AMD 的 Infinity Fabric)进行通信。
-
物理视图:
- CPU Die (包含多个核心)
- 集成内存控制器 (IMC)
- DRAM 模块 (通过 IMC 连接)
- 互连链路 (连接不同 CPU Die)
-
逻辑视图 (操作系统抽象):
- Node 0: CPU(s) 0, 1, …, k-1; Memory 0
- Node 1: CPU(s) k, k+1, …, m-1; Memory 1
- …
当一个线程在 CPU 0 上运行时,如果它访问 Memory 0,这是本地访问,速度快。如果它访问 Memory 1,这是远程访问,速度慢。远程访问的延迟不仅包括数据传输时间,还可能包括额外的缓存一致性协议开销。
3.2 Linux 上的 NUMA API (libnuma)
Linux 提供了一套强大的 libnuma 库(通常在 <numa.h> 或 <numaif.h> 中声明)来查询和控制 NUMA 策略。
核心功能:
-
查询 NUMA 拓扑:
numa_available(): 检查系统是否支持 NUMA。numa_num_configured_nodes(): 获取配置的 NUMA 节点总数。numa_num_configured_cpus(): 获取配置的 CPU 核总数。numa_node_of_cpu(cpu_id): 获取指定 CPU 核所属的 NUMA 节点 ID。numa_preferred(): 获取当前进程的默认首选 NUMA 节点。numa_node_size(node_id, &total, &free): 获取指定节点的内存大小。
-
内存分配策略:
numa_alloc(size): 在当前进程的首选节点上分配内存。numa_alloc_onnode(size, node_id): 在指定 NUMA 节点上分配size字节的内存。这是我们实现 NUMA 亲和性分配器的核心函数。numa_free(addr, size): 释放由numa_alloc或numa_alloc_onnode分配的内存。numa_alloc_interleave(size, nodemask): 在指定的 NUMA 节点掩码上交错分配内存。页面将轮流从每个节点分配。适用于需要在所有节点上均匀分布数据的情况。numa_alloc_local(size): 在当前 CPU 所在的 NUMA 节点上分配内存。numa_alloc_current(size): 在当前进程绑定的 NUMA 节点上分配内存。
-
进程/线程内存策略:
set_mempolicy(mode, nodemask, maxnode): 设置当前进程或线程的默认内存分配策略。MPOL_DEFAULT: 使用系统默认策略(通常是“首次触摸”)。MPOL_PREFERRED: 优先在指定节点分配,如果失败则回退到其他节点。MPOL_BIND: 严格绑定到指定节点集,只能从这些节点分配。如果这些节点内存不足,分配失败。MPOL_INTERLEAVE: 在指定节点集上交错分配。MPOL_LOCAL: 始终在当前 CPU 所在的 NUMA 节点上分配。
mbind(addr, len, mode, nodemask, maxnode, flags): 对指定内存区域设置 NUMA 策略。这允许更细粒度的控制,例如将一个大数组的不同部分绑定到不同节点。numa_set_preferred(node_id): 设置当前进程的首选 NUMA 节点。
-
CPU 亲和性:
sched_setaffinity(pid, cpusetsize, mask): 将进程/线程绑定到指定的 CPU 核集。pthread_setaffinity_np(pthread_t thread, size_t cpusetsize, const cpu_set_t *cpuset): 将指定线程绑定到 CPU 核集。这与内存亲和性协同工作,确保线程和它访问的内存都在同一个 NUMA 节点上。
获取当前线程的 NUMA 节点 ID:
这是构建 NUMA 分配器的一个关键步骤。我们可以通过以下方式获取:
- 先获取当前线程运行的 CPU ID:
sched_getcpu()(Linux 独有,非常快,但不是标准 C++)。 - 再通过 CPU ID 获取 NUMA 节点 ID:
numa_node_of_cpu(sched_getcpu())。
3.3 Windows 上的 NUMA API
Windows 也提供了相应的 NUMA API,主要通过 <windows.h> 中的函数实现:
GetNumaProcessorNode(UCHAR Processor, PUSHORT NodeNumber): 获取指定处理器(逻辑处理器索引)所在的 NUMA 节点。GetNumaNodeProcessorMask(UCHAR Node, PULONG_PTR ProcessorMask): 获取指定 NUMA 节点的所有处理器掩码。GetNumaAvailableMemoryNode(UCHAR Node, PULONG64 AvailableBytes): 获取指定 NUMA 节点的可用内存。VirtualAllocExNuma(HANDLE hProcess, LPVOID lpAddress, SIZE_T dwSize, DWORD flAllocationType, DWORD flProtect, UCHAR nndPreferred): 在指定 NUMA 节点上分配虚拟内存。这是 Windows 上实现 NUMA 亲和性分配器的核心函数。SetThreadAffinityMask(HANDLE hThread, DWORD_PTR dwThreadAffinityMask): 设置线程的处理器亲和性掩码。SetProcessAffinityMask(HANDLE hProcess, DWORD_PTR dwProcessAffinityMask): 设置进程的处理器亲和性掩码。
由于本文主要关注 Linux HPC 环境,后续的实现将以 libnuma 为主。
4. 设计一个 NUMA 亲和性自定义分配器
现在,我们有了理论基础和操作系统 API,可以着手设计我们的 NUMAAllocator 了。
4.1 核心设计理念
- 线程-NUMA 节点绑定: 每个线程在启动时或首次分配内存时,确定其当前运行的 NUMA 节点 ID。
- 节点本地分配: 当线程请求内存时,分配器应尝试从该线程所在的 NUMA 节点分配内存。
- 故障回退: 如果本地节点内存不足,可以配置为回退到其他节点(例如,通过
MPOL_PREFERRED策略)或直接失败。在 HPC 中,通常更倾向于失败或更严格的控制。 - 内存池优化: 为了减少
numa_alloc_onnode/numa_free的系统调用开销,每个 NUMA 节点可以维护一个独立的内存池。 - 线程安全: 如果多个线程可能同时请求分配内存,内存池需要线程安全机制(如互斥锁)。
4.2 基本 NUMAAllocator 结构
我们将构建一个 NUMAAllocator 类模板,它将包装 numa_alloc_onnode 和 numa_free。为了简单起见,我们首先实现一个不带内存池的版本,直接调用 libnuma 函数。
#ifndef NUMA_ALLOCATOR_HPP
#define NUMA_ALLOCATOR_HPP
#include <cstddef> // std::size_t, std::ptrdiff_t
#include <new> // std::bad_alloc, placement new
#include <limits> // std::numeric_limits
#include <iostream>
#include <stdexcept> // std::runtime_error
#ifdef __linux__
#include <numa.h> // numa_alloc_onnode, numa_free, numa_available etc.
#include <sched.h> // sched_getcpu
#include <thread> // std::this_thread::get_id
#include <map> // For node-specific allocators (if using pools later)
#include <mutex> // For thread safety
#include <vector> // For node_cpu_map
#endif
// 辅助函数:获取当前线程所在的 NUMA 节点 ID
static int get_current_numa_node() {
#ifdef __linux__
if (!numa_available() || numa_num_configured_nodes() < 2) {
// NUMA not available or only one node, fallback to default behavior
// or signal an error if strict NUMA is required.
// For simplicity, we'll return a default node 0 or -1 to indicate no NUMA
return 0;
}
// sched_getcpu() returns the current CPU core ID.
// numa_node_of_cpu() maps a CPU ID to a NUMA node ID.
int cpu_id = sched_getcpu();
if (cpu_id == -1) {
std::cerr << "Warning: sched_getcpu() failed, returning default NUMA node 0." << std::endl;
return 0; // Fallback
}
int node_id = numa_node_of_cpu(cpu_id);
if (node_id == -1) {
std::cerr << "Warning: numa_node_of_cpu() failed for CPU " << cpu_id << ", returning default NUMA node 0." << std::endl;
return 0; // Fallback
}
return node_id;
#else
// Non-Linux systems: return a default node or throw an error
std::cerr << "Warning: NUMAAllocator is primarily designed for Linux. Returning default node 0." << std::endl;
return 0;
#endif
}
// 线程局部存储,用于缓存当前线程的 NUMA 节点 ID
// 避免每次分配时都调用 sched_getcpu() 和 numa_node_of_cpu()
thread_local int current_thread_numa_node_id = -1;
// 初始化线程局部 NUMA 节点 ID
static void initialize_thread_numa_node() {
if (current_thread_numa_node_id == -1) {
current_thread_numa_node_id = get_current_numa_node();
std::cout << "[Thread " << std::this_thread::get_id()
<< "] Initialized NUMA node to: " << current_thread_numa_node_id << std::endl;
}
}
template <typename T>
class NUMAAllocator {
public:
// 类型定义
using value_type = T;
using pointer = T*;
using const_pointer = const T*;
using reference = T&;
using const_reference = const T&;
using size_type = std::size_t;
using difference_type = std::ptrdiff_t;
// 默认构造函数
NUMAAllocator() noexcept {
initialize_thread_numa_node();
}
// 拷贝构造函数,允许从不同类型的分配器构造
template <typename U>
NUMAAllocator(const NUMAAllocator<U>&) noexcept {
initialize_thread_numa_node();
}
// 内存分配
pointer allocate(size_type n) {
if (n == 0) return nullptr;
if (n > std::numeric_limits<size_type>::max() / sizeof(T)) {
throw std::bad_alloc(); // 避免乘法溢出
}
void* p = nullptr;
#ifdef __linux__
if (numa_available() >= 0 && numa_num_configured_nodes() > 1) {
int node_id = current_thread_numa_node_id;
if (node_id == -1) { // Fallback if TLS was not initialized for some reason
node_id = get_current_numa_node();
current_thread_numa_node_id = node_id; // Cache it
}
p = numa_alloc_onnode(n * sizeof(T), node_id);
if (!p) {
// 如果在指定节点分配失败,可以考虑回退策略,例如在任意节点分配,
// 但为了严格的 NUMA 亲和性,这里选择抛出异常。
// 实际应用中可以根据需求调整。
std::cerr << "Error: Failed to allocate " << n * sizeof(T)
<< " bytes on NUMA node " << node_id << ". Trying fallback to any node." << std::endl;
p = std::malloc(n * sizeof(T)); // Fallback to default malloc
if (!p) {
throw std::bad_alloc();
}
}
std::cout << "[Thread " << std::this_thread::get_id()
<< "] NUMAAllocator: Allocated " << n * sizeof(T) << " bytes for " << n
<< " elements of type " << typeid(T).name() << " on node " << node_id << " at " << p << std::endl;
} else {
// NUMA not available or single node, use default malloc
p = std::malloc(n * sizeof(T));
if (!p) {
throw std::bad_alloc();
}
std::cout << "[Thread " << std::this_thread::get_id()
<< "] NUMAAllocator: Allocated " << n * sizeof(T) << " bytes (via malloc) at " << p << std::endl;
}
#else
// Non-Linux systems, fall back to default malloc
p = std::malloc(n * sizeof(T));
if (!p) {
throw std::bad_alloc();
}
std::cout << "[Thread " << std::this_thread::get_id()
<< "] NUMAAllocator: Allocated " << n * sizeof(T) << " bytes (via malloc) at " << p << std::endl;
#endif
return static_cast<pointer>(p);
}
// 内存释放
void deallocate(pointer p, size_type n) noexcept {
if (!p) return;
(void)n; // n在这里可能不会被使用,但标准要求参数存在
#ifdef __linux__
if (numa_available() >= 0 && numa_num_configured_nodes() > 1) {
// numa_free doesn't need the node ID, it automatically finds it
std::cout << "[Thread " << std::this_thread::get_id()
<< "] NUMAAllocator: Deallocated " << n * sizeof(T) << " bytes at " << p << std::endl;
numa_free(p, n * sizeof(T));
} else {
std::cout << "[Thread " << std::this_thread::get_id()
<< "] NUMAAllocator: Deallocated " << n * sizeof(T) << " bytes (via free) at " << p << std::endl;
std::free(p);
}
#else
std::cout << "[Thread " << std::this_thread::get_id()
<< "] NUMAAllocator: Deallocated " << n * sizeof(T) << " bytes (via free) at " << p << std::endl;
std::free(p);
#endif
}
// 对象构造 (C++17 之前需要,之后可选)
template <typename U, typename... Args>
void construct(U* p, Args&&... args) {
new (p) U(std::forward<Args>(args)...); // placement new
}
// 对象析构 (C++17 之前需要,之后可选)
template <typename U>
void destroy(U* p) {
p->~U();
}
// 最大可分配大小
size_type max_size() const noexcept {
return std::numeric_limits<size_type>::max() / sizeof(T);
}
// 分配器比较 (所有 NUMAAllocator 实例都等价,因为它们都基于当前线程的NUMA节点)
// 但如果分配器实例内部持有状态(例如指向特定内存池的指针),则需要更复杂的比较逻辑。
bool operator==(const NUMAAllocator& other) const noexcept { return true; }
bool operator!=(const NUMAAllocator& other) const noexcept { return false; }
// Rebind 机制
template <typename U>
struct rebind {
using other = NUMAAllocator<U>;
};
};
#endif // NUMA_ALLOCATOR_HPP
代码解析:
- 平台判断: 使用
#ifdef __linux__确保libnuma相关的代码只在 Linux 系统上编译。在其他系统上,它会回退到std::malloc和std::free。 get_current_numa_node(): 这个静态辅助函数负责获取当前线程运行的 CPU ID,然后将其映射到 NUMA 节点 ID。它包含基本的错误检查和在 NUMA 不可用或单节点系统上的回退逻辑。thread_local int current_thread_numa_node_id: 这是一个关键优化。thread_local变量意味着每个线程都有其独立的current_thread_numa_node_id副本。这样,initialize_thread_numa_node()只会在每个线程首次使用NUMAAllocator时调用一次get_current_numa_node(),从而避免了在每次allocate调用中重复进行系统调用(sched_getcpu()和numa_node_of_cpu())。allocate():- 首先进行大小检查,防止溢出。
- 在 Linux 系统且 NUMA 可用时,获取当前线程的 NUMA 节点 ID。
- 调用
numa_alloc_onnode(n * sizeof(T), node_id)在指定节点上分配内存。 - 如果
numa_alloc_onnode失败,当前实现会打印错误并回退到std::malloc。在严格的 HPC 场景下,这可能需要更精细的错误处理或直接抛出异常。
deallocate():- 在 Linux 系统且 NUMA 可用时,调用
numa_free(p, n * sizeof(T))释放内存。numa_free会自动识别p所属的 NUMA 节点并释放。 - 在其他情况下,调用
std::free(p)。
- 在 Linux 系统且 NUMA 可用时,调用
construct()和destroy(): 提供了标准的placement new和显式析构调用,以便容器正确管理对象的生命周期。operator==: 简单地返回true,表示所有NUMAAllocator实例都是等价的。这是因为它们都基于当前线程的 NUMA 节点进行分配,不持有内部状态来区分彼此的分配策略。
4.3 使用 NUMAAllocator
现在我们可以在标准库容器中使用它了:
// main.cpp
#include "numa_allocator.hpp"
#include <vector>
#include <list>
#include <map>
#include <thread>
#include <chrono> // For std::this_thread::sleep_for
#include <numeric> // For std::iota
// 用于模拟工作负载的结构体
struct MyData {
int id;
double value;
char buffer[64]; // 模拟一些数据
MyData(int i = 0, double v = 0.0) : id(i), value(v) {
std::iota(std::begin(buffer), std::end(buffer), 0); // 填充一些数据
}
};
// 线程函数,模拟在不同NUMA节点上运行并使用NUMAAllocator
void numa_worker(int thread_id, int node_to_bind_to) {
#ifdef __linux__
// 尝试将线程绑定到指定的CPU核心,进而绑定到NUMA节点
// 假设每个NUMA节点至少有一个CPU核心
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(node_to_bind_to, &cpuset); // 绑定到与NUMA节点ID相同的CPU核心ID(简化处理)
int s = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
if (s != 0) {
std::cerr << "Error: pthread_setaffinity_np failed for thread " << thread_id << ", node " << node_to_bind_to << ": " << strerror(s) << std::endl;
} else {
std::cout << "Thread " << thread_id << " successfully bound to CPU " << node_to_bind_to << std::endl;
}
#endif
// 此时,current_thread_numa_node_id 会被初始化为 node_to_bind_to (或其对应的NUMA节点)
// 且后续的 NUMAAllocator 分配将尝试在该节点进行
std::cout << "Thread " << thread_id << " (NUMA node " << get_current_numa_node() << ") starting work..." << std::endl;
// 使用 NUMAAllocator 的 std::vector
std::vector<MyData, NUMAAllocator<MyData>> vec;
vec.reserve(100); // 预分配100个MyData对象
for (int i = 0; i < 100; ++i) {
vec.emplace_back(thread_id * 100 + i, (double)i / 10.0);
}
std::cout << "Thread " << thread_id << " (NUMA node " << get_current_numa_node() << ") vector size: " << vec.size() << std::endl;
// 模拟一些计算,确保数据被访问
double sum_val = 0.0;
for (const auto& data : vec) {
sum_val += data.value;
}
std::cout << "Thread " << thread_id << " (NUMA node " << get_current_numa_node() << ") sum_val: " << sum_val << std::endl;
// 另一个容器:std::list
std::list<int, NUMAAllocator<int>> my_list;
for (int i = 0; i < 50; ++i) {
my_list.push_back(i * 2);
}
std::cout << "Thread " << thread_id << " (NUMA node " << get_current_numa_node() << ") list size: " << my_list.size() << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟工作
std::cout << "Thread " << thread_id << " finished." << std::endl;
}
int main() {
#ifdef __linux__
if (numa_available() < 0) {
std::cerr << "NUMA support not available on this system." << std::endl;
return 1;
}
int num_nodes = numa_num_configured_nodes();
if (num_nodes < 2) {
std::cout << "Only one NUMA node configured, NUMAAllocator will behave like default allocator." << std::endl;
// Still run the test to show it works, but without NUMA benefits
}
std::cout << "System has " << num_nodes << " NUMA nodes." << std::endl;
std::vector<std::thread> threads;
// 为每个 NUMA 节点启动一个线程,并尝试将其绑定到该节点
for (int i = 0; i < num_nodes; ++i) {
threads.emplace_back(numa_worker, i, i); // 假设node ID和CPU ID可以简单映射
}
for (auto& t : threads) {
t.join();
}
#else
std::cout << "NUMAAllocator example is primarily for Linux. Running a basic test without NUMA affinity." << std::endl;
// Still run a single thread test
numa_worker(0, 0);
#endif
std::cout << "Main program finished." << std::endl;
return 0;
}
编译和运行 (Linux):
- 保存
numa_allocator.hpp和main.cpp。 - 编译:
g++ -std=c++17 -Wall -Wextra -pthread -lnuma main.cpp -o numa_test-std=c++17: 使用 C++17 标准。-Wall -Wextra: 开启警告。-pthread: 链接pthread库(用于pthread_setaffinity_np和std::thread)。-lnuma: 链接libnuma库。
- 运行:
./numa_test你会看到输出日志显示内存是在哪个 NUMA 节点上分配的。通过
numactl --hardware命令可以查看系统的 NUMA 拓扑。
这个简单的 NUMAAllocator 已经能够实现基本的 NUMA 亲和性分配。然而,它直接调用 numa_alloc_onnode 和 numa_free,这仍然是系统调用,可能带来一定的开销。在高性能场景下,我们通常会结合内存池技术来进一步优化。
5. 高级 NUMA 分配策略与优化
5.1 内存池与 NUMA 结合
为了减少系统调用和内存碎片,我们可以在每个 NUMA 节点上实现一个独立的内存池。当线程需要内存时,它首先从其本地 NUMA 节点的内存池中获取;当释放内存时,返回到相应的内存池。
设计思路:
- 节点池管理: 一个全局的
NUMAPoolManager负责创建和管理每个 NUMA 节点的内存池。 - 线程本地访问:
NUMAAllocator通过current_thread_numa_node_id获取本地节点 ID,然后向NUMAPoolManager请求该节点的内存池。 - 内存池实现: 每个节点内存池可以是简单的固定大小块分配器(Block Allocator)或更复杂的自由列表分配器(Free List Allocator)。
- 线程安全: 每个节点内存池需要内部锁来保证并发访问的线程安全。
示例:一个简化的节点内存池 (Block Allocator)
// numa_pool_allocator.hpp
#ifndef NUMA_POOL_ALLOCATOR_HPP
#define NUMA_POOL_ALLOCATOR_HPP
#include <cstddef>
#include <new>
#include <limits>
#include <iostream>
#include <stdexcept>
#include <vector>
#include <mutex>
#include <map>
#include <thread>
#include <atomic>
#ifdef __linux__
#include <numa.h>
#include <sched.h>
#endif
// 辅助函数:获取当前线程所在的 NUMA 节点 ID
static int get_current_numa_node_fast() {
#ifdef __linux__
if (!numa_available() || numa_num_configured_nodes() < 2) {
return 0; // Fallback
}
int cpu_id = sched_getcpu();
if (cpu_id == -1) {
return 0;
}
int node_id = numa_node_of_cpu(cpu_id);
if (node_id == -1) {
return 0;
}
return node_id;
#else
return 0;
#endif
}
// 线程局部存储,用于缓存当前线程的 NUMA 节点 ID
thread_local int current_thread_numa_node_id_pool = -1;
static void initialize_thread_numa_node_pool() {
if (current_thread_numa_node_id_pool == -1) {
current_thread_numa_node_id_pool = get_current_numa_node_fast();
// std::cout << "[Pool Thread " << std::this_thread::get_id()
// << "] Initialized NUMA node to: " << current_thread_numa_node_id_pool << std::endl;
}
}
// --- 简单的固定块内存池实现 ---
class FixedBlockPool {
private:
std::vector<void*> blocks;
std::vector<char> memory_arena;
std::atomic<std::size_t> next_block_idx;
std::size_t block_size;
std::size_t num_blocks;
std::mutex mtx; // For pool management
public:
FixedBlockPool(std::size_t blk_sz, std::size_t num_blk, int numa_node_id)
: block_size(blk_sz), num_blocks(num_blk), next_block_idx(0) {
std::size_t total_size = block_size * num_blocks;
#ifdef __linux__
if (numa_available() >= 0 && numa_num_configured_nodes() > 1) {
memory_arena.resize(total_size); // Allocate space first
void* p = numa_alloc_onnode(total_size, numa_node_id);
if (!p) {
std::cerr << "Error: Failed to allocate " << total_size << " bytes on NUMA node " << numa_node_id << " for FixedBlockPool. Falling back to malloc." << std::endl;
p = std::malloc(total_size);
if (!p) {
throw std::bad_alloc();
}
}
// Manually copy the allocated memory to memory_arena, or better, directly use p
// For simplicity here, we assume memory_arena can hold it and we manage pointers
// A more robust design would have memory_arena be a pointer to the numa_alloc'd memory.
// Let's simplify: directly manage the raw pointer from numa_alloc_onnode.
// This means `memory_arena` won't hold the actual memory, but `numa_alloc_ptr` will.
// For a simple fixed block pool, we might not even need `std::vector<char> memory_arena`.
// Better: use a raw pointer and manually manage.
// This example directly allocates blocks from numa_alloc_onnode one by one for simplicity.
// A true pool would pre-allocate a large chunk.
// For now, let's just make it a simple "allocate-on-demand from numa_alloc_onnode"
// And use this pool for tracking. This simplifies the initial pool setup.
// Re-evaluating: A FixedBlockPool should pre-allocate.
// Let's make it a pre-allocated pool of raw memory.
void* base_ptr = numa_alloc_onnode(total_size, numa_node_id);
if (!base_ptr) {
std::cerr << "Error: FixedBlockPool failed to pre-allocate on NUMA node " << numa_node_id << std::endl;
base_ptr = std::malloc(total_size); // Fallback
if (!base_ptr) throw std::bad_alloc();
}
// Initialize blocks
for (std::size_t i = 0; i < num_blocks; ++i) {
blocks.push_back(static_cast<char*>(base_ptr) + i * block_size);
}
std::cout << "FixedBlockPool: Pre-allocated " << total_size << " bytes on NUMA node " << numa_node_id << std::endl;
} else {
// Fallback to malloc for non-NUMA systems or single node
void* base_ptr = std::malloc(total_size);
if (!base_ptr) throw std::bad_alloc();
for (std::size_t i = 0; i < num_blocks; ++i) {
blocks.push_back(static_cast<char*>(base_ptr) + i * block_size);
}
std::cout << "FixedBlockPool: Pre-allocated " << total_size << " bytes via malloc." << std::endl;
}
#else
void* base_ptr = std::malloc(total_size);
if (!base_ptr) throw std::bad_alloc();
for (std::size_t i = 0; i < num_blocks; ++i) {
blocks.push_back(static_cast<char*>(base_ptr) + i * block_size);
}
std::cout << "FixedBlockPool: Pre-allocated " << total_size << " bytes via malloc." << std::endl;
#endif
}
~FixedBlockPool() {
if (!blocks.empty()) {
// Only free the base pointer if it was allocated by numa_alloc_onnode or malloc
// For simplicity, we assume the first block's address is the base.
#ifdef __linux__
if (numa_available() >= 0 && numa_num_configured_nodes() > 1) {
numa_free(blocks[0], block_size * num_blocks);
} else {
std::free(blocks[0]);
}
#else
std::free(blocks[0]);
#endif
std::cout << "FixedBlockPool: Deallocated base memory." << std::endl;
}
}
void* allocate_block() {
// Use a mutex for simplicity, a lock-free approach would be faster
std::lock_guard<std::mutex> lock(mtx);
if (next_block_idx < num_blocks) {
void* p = blocks[next_block_idx++];
// std::cout << "Allocating block from pool: " << p << std::endl;
return p;
}
return nullptr; // Pool exhausted
}
// For a simple fixed block pool, deallocate might not immediately return to the pool
// unless we implement a free list. For this example, we assume blocks are not returned
// individually, or the pool is designed for one-time allocation then full clear.
// A more advanced pool would manage a free list of blocks.
void deallocate_block(void* p) {
// For a simple fixed-size block allocator without a free list, this is a no-op
// or would require more complex tracking.
// For demonstration, let's just acknowledge it.
// std::cout << "Deallocating block to pool (no-op for this simple pool): " << p << std::endl;
}
};
// --- NUMA 内存池管理器 ---
class NUMAPoolManager {
private:
std::map<int, std::unique_ptr<FixedBlockPool>> node_pools;
std::mutex manager_mtx; // Protects access to node_pools
std::size_t default_block_size;
std::size_t default_num_blocks;
NUMAPoolManager() : default_block_size(128), default_num_blocks(1024) { // Default values
// Initialize pools for all available NUMA nodes
#ifdef __linux__
if (numa_available() >= 0 && numa_num_configured_nodes() > 1) {
int num_nodes = numa_num_configured_nodes();
for (int i = 0; i < num_nodes; ++i) {
node_pools[i] = std::make_unique<FixedBlockPool>(default_block_size, default_num_blocks, i);
}
} else {
// For non-NUMA systems, create a single pool on node 0
node_pools[0] = std::make_unique<FixedBlockPool>(default_block_size, default_num_blocks, 0);
}
#else
node_pools[0] = std::make_unique<FixedBlockPool>(default_block_size, default_num_blocks, 0);
#endif
}
public:
static NUMAPoolManager& instance() {
static NUMAPoolManager mgr;
return mgr;
}
FixedBlockPool* get_pool_for_node(int node_id) {
std::lock_guard<std::mutex> lock(manager_mtx);
auto it = node_pools.find(node_id);
if (it != node_pools.end()) {
return it->second.get();
}
// If node_id not found (e.g., node_id is invalid or system has fewer nodes), return pool for node 0
std::cerr << "Warning: NUMA pool for node " << node_id << " not found. Falling back to node 0." << std::endl;
return node_pools[0].get();
}
// Setters for pool parameters if needed
void set_pool_params(std::size_t blk_sz, std::size_t num_blk) {
std::lock_guard<std::mutex> lock(manager_mtx);
default_block_size = blk_sz;
default_num_blocks = num_blk;
// Note: Changing params after initialization won't affect existing pools.
// This would require re-initializing all pools, which is complex.
}
};
// --- NUMAAllocator with Pool Support ---
template <typename T>
class NUMAPoolAllocator {
public:
// Type definitions
using value_type = T;
using pointer = T*;
using const_pointer = const T*;
using reference = T&;
using const_reference = const T&;
using size_type = std::size_t;
using difference_type = std::ptrdiff_t;
NUMAPoolAllocator() noexcept {
initialize_thread_numa_node_pool();
}
template <typename U>
NUMAPoolAllocator(const NUMAPoolAllocator<U>&) noexcept {
initialize_thread_numa_node_pool();
}
pointer allocate(size_type n) {
if (n == 0) return nullptr;
std::size_t bytes_to_allocate = n * sizeof(T);
// Check if allocation size matches pool block size
// For simplicity, this pool allocator only handles requests for its exact block size.
// A real pool allocator would support various sizes or be a general-purpose allocator.
if (bytes_to_allocate > NUMAPoolManager::instance().default_block_size) {
std::cerr << "Warning: Request for " << bytes_to_allocate << " bytes exceeds FixedBlockPool block size ("
<< NUMAPoolManager::instance().default_block_size << "). Falling back to direct NUMA alloc/malloc." << std::endl;
#ifdef __linux__
if (numa_available() >= 0 && numa_num_configured_nodes() > 1) {
int node_id = current_thread_numa_node_id_pool;
if (node_id == -1) node_id = get_current_numa_node_fast();
void* p = numa_alloc_onnode(bytes_to_allocate, node_id);
if (!p) throw std::bad_alloc();
return static_cast<pointer>(p);
}
#endif
void* p = std::malloc(bytes_to_allocate);
if (!p) throw std::bad_alloc();
return static_cast<pointer>(p);
}
int node_id = current_thread_numa_node_id_pool;
if (node_id == -1) node_id = get_current_numa_node_fast();
FixedBlockPool* pool = NUMAPoolManager::instance().get_pool_for_node(node_id);
void* p = pool->allocate_block();
if (!p) {
std::cerr << "Error: NUMA Pool for node " << node_id << " exhausted. Falling back to direct NUMA alloc/malloc." << std::endl;
#ifdef __linux__
if (numa_available() >= 0 && numa_num_configured_nodes() > 1) {
void* direct_p = numa_alloc_onnode(bytes_to_allocate, node_id);
if (!direct_p) throw std::bad_alloc();
return static_cast<pointer>(direct_p);
}
#endif
void* direct_p = std::malloc(bytes_to_allocate);
if (!direct_p) throw std::bad_alloc();
return static_cast<pointer>(direct_p);
}
// std::cout << "Allocated " << bytes_to_allocate << " bytes from pool on node " << node_id << " at " << p << std::endl;
return static_cast<pointer>(p);
}
void deallocate(pointer p, size_type n) noexcept {
if (!p) return;
std::size_t bytes_to_deallocate = n * sizeof(T);
// This pool allocator only handles blocks that it allocated.
// If it was allocated via direct numa_alloc_onnode or malloc, it should be freed accordingly.
// Determining if `p` came from the pool or direct allocation is complex without tracking.
// For simplicity, we assume if `bytes_to_deallocate` matches default block size, it came from pool.
// Otherwise, it was a direct allocation. This is a simplification and not robust.
if (bytes_to_deallocate == NUMAPoolManager::instance().default_block_size) {
int node_id = current_thread_numa_node_id_pool; // Could be incorrect if allocated from another node, but it's a heuristic
if (node_id == -1) node_id = get_current_numa_node_fast();
FixedBlockPool* pool = NUMAPoolManager::instance().get_pool_for_node(node_id);
pool->deallocate_block(p); // This simple FixedBlockPool does not really re-use blocks.
// std::cout << "Deallocated " << bytes_to_deallocate << " bytes to pool on node " << node_id << " at " << p << std::endl;
} else {
// Assume it was a direct allocation (numa_alloc_onnode or malloc)
#ifdef __linux__
if (numa_available() >= 0 && numa_num_configured_nodes() > 1) {
numa_free(p, bytes_to_deallocate);
} else {
std::free(p);
}
#else
std::free(p);
#endif
// std::cout << "Deallocated " << bytes_to_deallocate << " bytes via direct free at " << p << std::endl;
}
}
template <typename U, typename... Args>
void construct(U* p, Args&&... args) {
new (p) U(std::forward<Args>(args)...);
}
template <typename U>
void destroy(U* p) {
p->~U();
}
size_type max_size() const noexcept {
return std::numeric_limits<size_type>::max() / sizeof(T);
}
bool operator==(const NUMAPoolAllocator& other) const noexcept { return true; }
bool operator!=(const NUMAPoolAllocator& other) const noexcept { return false; }
template <typename U>
struct rebind {
using other = NUMAPoolAllocator<U>;
};
};
#endif // NUMA_POOL_ALLOCATOR_HPP
注意:
上述 FixedBlockPool 和 NUMAPoolAllocator 是一个非常简化的内存池实现,主要用于演示概念。
FixedBlockPool仅支持固定大小的块分配,并且其deallocate_block实际上是空操作,不将块返回到池中(这意味着池会耗尽)。一个真正的池分配器需要一个自由列表来管理已释放的块。NUMAPoolAllocator对不同大小的请求有简单的回退逻辑,并且在deallocate时通过启发式方法判断内存来源,这在生产环境中是不够鲁棒的。- 一个健壮的 NUMA 内存池分配器通常会采用更复杂的策略,如 Buddy Allocator 或 Slab Allocator,并且能够处理不同大小的分配请求。
5.2 线程亲和性 (Thread Affinity)
NUMA 亲和性内存分配策略的最大优势在于它与线程亲和性(Thread Affinity)的结合。如果一个线程总是运行在同一个 NUMA 节点上的 CPU 核上,那么它访问分配在该节点上的内存时,将始终是本地访问。
Linux (pthread_setaffinity_np):
我们已经在 numa_worker 示例中展示了如何使用 pthread_setaffinity_np 将线程绑定到特定的 CPU 核。
// 在 numa_worker 函数中
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(node_to_bind_to, &cpuset); // 绑定到与NUMA节点ID相同的CPU核心ID(简化处理)
pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
这需要仔细规划:你需要知道哪个 CPU 核属于哪个 NUMA 节点,并据此将线程绑定到正确的核。numactl --hardware 和 lscpu -p 可以帮助你了解系统的 CPU 和 NUMA 拓扑。
5.3 懒惰分配 (Lazy Allocation) 与策略性绑定
除了 numa_alloc_onnode 这种显式分配方式,我们还可以利用操作系统的内存策略 API 来实现更灵活的 NUMA 内存管理。
- “首次触摸” (First-touch) 策略: 这是许多操作系统(包括 Linux)的默认 NUMA 内存分配策略。内存页面在首次被访问时,会被分配到访问它的 CPU 所在的 NUMA 节点上。这意味着如果你希望数据在某个节点上,你需要确保该节点上的线程是第一个访问这些数据的线程。
mbind(): 允许你将一个已经分配的内存区域绑定到特定的 NUMA 节点或节点集。这对于已经通过malloc分配但希望后续访问具有 NUMA 亲和性的内存很有用。// 示例:使用 mbind 将现有内存绑定到 NUMA 节点 void* data = std::malloc(1024 * 1024); // 1MB int node_id = get_current_numa_node_fast(); // 获取当前线程的NUMA节点 unsigned long nodemask = (1UL << node_id); long status = mbind(data, 1024 * 1024, MPOL_BIND, &nodemask, 32, 0); if (status != 0) { std::cerr << "mbind failed: " << strerror(errno) << std::endl; }numa_set_preferred(): 设置当前线程的“首选”NUMA 节点。后续的malloc或new操作会尝试在该节点分配内存,如果失败则回退到其他节点。这比numa_alloc_onnode更宽松,但能提供一定程度的亲和性。
5.4 异构 NUMA 分配策略
有时,我们可能需要更复杂的 NUMA 策略:
MPOL_INTERLEAVE(交错分配): 将内存页轮流分配到指定的 NUMA 节点集。适用于需要将数据均匀分布在多个节点上以提高带宽,或者避免单个节点内存瓶颈的情况。例如,一个大型只读查找表,被多个节点上的线程频繁访问,交错分配可以提高整体访问效率。// 示例:交错分配 void* data = numa_alloc_interleave(1024 * 1024, numa_all_nodes_ptr); // 在所有可用节点上交错分配MPOL_LOCAL: 始终在当前 CPU 所在的 NUMA 节点上分配。这与我们NUMAAllocator的核心思想一致。
这些高级策略提供了极大的灵活性,但同时也增加了复杂性。选择哪种策略取决于具体的应用需求、数据访问模式和 NUMA 拓扑。
5.5 性能测量与工具
为了验证 NUMA 亲和性分配器的效果,性能测量至关重要。
numactl:numactl --hardware: 查看 NUMA 拓扑和每个节点的内存信息。numactl --show: 查看当前进程的 NUMA 策略。numactl --membind=0 --cpunodebind=0 ./my_program: 运行程序,并将其内存和 CPU 绑定到 NUMA 节点 0。这是一种无需修改代码即可测试 NUMA 策略的强大工具。
perf: Linux 上的性能分析工具。可以用来检测缓存未命中、TLB 未命中等性能事件,间接反映远程内存访问的开销。- 自定义计时器: 在代码中测量关键部分的执行时间,比较使用和不使用 NUMA 分配器时的性能差异。
- 操作系统统计:
vmstat -s可以查看系统的 NUMA 统计信息,如numa_foreign(远程内存访问) 等。
6. C++ 标准库容器与自定义分配器
自定义分配器通过模板参数与标准库容器无缝集成。
#include <vector>
#include <list>
#include <map>
#include "numa_allocator.hpp" // 或者 numa_pool_allocator.hpp
// 使用 NUMAAllocator 的 std::vector
std::vector<int, NUMAAllocator<int>> my_numa_vector;
// 使用 NUMAAllocator 的 std::list
std::list<double, NUMAAllocator<double>> my_numa_list;
// 使用 NUMAAllocator 的 std::map (键和值都使用分配器分配)
std::map<std::string, MyData, std::less<std::string>, NUMAAllocator<std::pair<const std::string, MyData>>> my_numa_map;
C++17 std::pmr (Polymorphic Memory Resources)
C++17 引入了 std::pmr 命名空间,提供了一套多态内存资源(Polymorphic Memory Resources)的抽象。这使得内存分配策略可以在运行时选择和切换,而不是在编译时通过模板参数确定。
std::pmr 的优势:
- 类型擦除: 容器不再需要作为模板参数接受分配器类型,而是接受一个指向
std::pmr::memory_resource接口的指针。 - 运行时多态: 可以在运行时根据需求(如 NUMA 节点负载、内存大小)动态选择不同的内存分配策略。
- 简化接口:
std::pmr::polymorphic_allocator是一个通用的分配器,它内部持有一个memory_resource指针。
将 NUMA 逻辑集成到 std::pmr:
我们可以实现一个自定义的 std::pmr::memory_resource 派生类,将 libnuma 的分配逻辑封装在其中。
// numa_pmr_resource.hpp
#ifndef NUMA_PMR_RESOURCE_HPP
#define NUMA_PMR_RESOURCE_HPP
#include <memory_resource> // For std::pmr::memory_resource
#include <iostream>
#include <mutex> // For thread safety
#ifdef __linux__
#include <numa.h>
#include <sched.h>
#include <thread>
#endif
// 辅助函数:获取当前线程所在的 NUMA 节点 ID
static int get_current_numa_node_pmr() {
#ifdef __linux__
if (numa_available() < 0 || numa_num_configured_nodes() < 2) {
return 0;
}
int cpu_id = sched_getcpu();
if (cpu_id == -1) {
return 0;
}
int node_id = numa_node_of_cpu(cpu_id);
if (node_id == -1) {
return 0;
}
return node_id;
#else
return 0;
#endif
}
// 线程局部存储,用于缓存当前线程的 NUMA 节点 ID
thread_local int current_thread_numa_node_id_pmr = -1;
static void initialize_thread_numa_node_pmr() {
if (current_thread_numa_node_id_pmr == -1) {
current_thread_numa_node_id_pmr = get_current_numa_node_pmr();
// std::cout << "[PMR Thread " << std::this_thread::get_id()
// << "] Initialized NUMA node for PMR to: " << current_thread_numa_node_id_pmr << std::endl;
}
}
class NUMAPmrResource : public std::pmr::memory_resource {
private:
// This example resource directly uses numa_alloc_onnode/numa_free.
// In a real scenario, it might wrap a NUMA-aware pool.
protected:
void* do_allocate(std::size_t bytes, std::size_t alignment) override {
initialize_thread_numa_node_pmr();
void* p = nullptr;
#ifdef __linux__
if (numa_available() >= 0 && numa_num_configured_nodes() > 1) {
int node_id = current_thread_numa_node_id_pmr;
if (node_id == -1) { // Fallback if TLS was not initialized
node_id = get_current_numa_node_pmr();
current_thread_numa_node_id_pmr = node_id;
}
p = numa_alloc_onnode(bytes, node_id);
if (!p) {
std::cerr << "PMR Error: Failed to allocate " << bytes << " bytes on NUMA node " << node_id << ". Fallback to default." << std::endl;
p = std::pmr::get_default_resource()->allocate(bytes, alignment); // Fallback to default PMR resource
}
// std::cout << "[PMR Thread " << std::this_thread::get_id()
// << "] PMR NUMA Alloc: " << bytes << " bytes on node " << node_id << " at " << p << std::endl;
} else {
p = std::pmr::get_default_resource()->allocate(bytes, alignment);
}
#else
p = std::pmr::get_default_resource()->allocate(bytes, alignment);
#endif
return p;
}
void do_deallocate(void* p, std::size_t bytes, std::size_t alignment) override {
if (!p) return;
#ifdef __linux__
if (numa_available() >= 0 && numa_num_configured_nodes() > 1) {
numa_free(p, bytes);
// std::cout << "[PMR Thread " << std::this_thread::get_id()
// << "] PMR NUMA Dealloc: " << bytes << " bytes at " << p << std::endl;
} else {
std::pmr::get_default_resource()->deallocate(p, bytes, alignment);
}
#else
std::pmr::get_default_resource()->deallocate(p, bytes, alignment);
#endif
}
bool do_is_equal(const std::pmr::memory_resource& other) const noexcept override {
return this == &other; // Identity comparison for unique resources
}
public:
static NUMAPmrResource& instance() {
static NUMAPmrResource res;
return res;
}
};
#endif // NUMA_PMR_RESOURCE_HPP
使用 NUMAPmrResource:
// main_pmr.cpp
#include "numa_pmr_resource.hpp"
#include <vector>
#include <string>
#include <map>
#include <thread>
#include <chrono>
// 模拟工作负载的结构体
struct MyPmrData {
int id;
double value;
char buffer[64];
MyPmrData(int i = 0, double v = 0.0) : id(i), value(v) {
std::iota(std::begin(buffer), std::end(buffer), 0);
}
};
void numa_pmr_worker(int thread_id, int node_to_bind_to) {
#ifdef __linux__
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(node_to_bind_to, &cpuset);
int s = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
if (s != 0) {
std::cerr << "Error: pthread_setaffinity_np failed for PMR thread " << thread_id << ", node " << node_to_bind_to << ": " << strerror(s) << std::endl;
} else {
std::cout << "PMR Thread " << thread_id << " successfully bound to CPU " << node_to_bind_to << std::endl;
}
#endif
std::cout << "PMR Thread " << thread_id << " (NUMA node " << get_current_numa_node_pmr() << ") starting work..." << std::endl;
// 创建一个多态分配器,使用我们的 NUMAPmrResource
std::pmr::polymorphic_allocator<MyPmrData> pmr_alloc(&NUMAPmrResource::instance());
// 使用 std::pmr::vector
std::pmr::vector<MyPmrData> vec(pmr_alloc);
vec.reserve(100);
for (int i = 0; i < 100; ++i) {
vec.emplace_back(thread_id * 100 + i, (double)i / 10.0);
}
std::cout << "PMR Thread " << thread_id << " (NUMA node " << get_current_numa_node_pmr() << ") vector size: " << vec.size() << std::endl;
// 模拟一些计算
double sum_val = 0.0;
for (const auto& data : vec) {
sum_val += data.value;
}
std::cout << "PMR Thread " << thread_id << " (NUMA node " << get_current_numa_node_pmr() << ") sum_val: " << sum_val << std::endl;
std::pmr::map<std::string, int> my_map(pmr_alloc);
my_map["one"] = 1;
my_map["two"] = 2;
std::cout << "PMR Thread " << thread_id << " (NUMA node " << get_current_numa_node_pmr() << ") map size: " << my_map.size() << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::cout << "PMR Thread " << thread_id << " finished." << std::endl;
}
int main() {
#ifdef __linux__
if (numa_available() < 0) {
std::cerr << "NUMA support not available on this system." << std::endl;
return 1;
}
int num_nodes = numa_num_configured_nodes();
if (num_nodes < 2) {
std::cout << "Only one NUMA node configured, PMR NUMA allocator will behave like default allocator." << std::endl;
}
std::cout << "System has " << num_nodes << " NUMA nodes." << std::endl;
std::vector<std::thread> threads;
for (int i = 0; i < num_nodes; ++i) {
threads.emplace_back(numa_pmr_worker, i, i);
}
for (auto& t : threads) {
t.join();
}
#else
std::cout << "PMR NUMAAllocator example is primarily for Linux. Running a basic test without NUMA affinity." << std::endl;
numa_pmr_worker(0, 0);
#endif
std::cout << "Main PMR program finished." << std::endl;
return 0;
}
编译和运行:
g++ -std=c++17 -Wall -Wextra -pthread -lnuma main_pmr.cpp -o numa_pmr_test
./numa_pmr_test
std::pmr 提供了更大的灵活性,尤其适用于需要运行时内存策略调整的复杂系统。