C++ 自定义分配器(Allocators):在高性能计算中适配 NUMA 亲和性的内存分配策略

各位编程专家和高性能计算爱好者,大家好!

今天,我们将深入探讨一个在高性能计算(HPC)领域至关重要的话题:C++ 自定义分配器(Allocators)及其在处理 NUMA(Non-Uniform Memory Access,非统一内存访问)亲和性方面的应用。随着现代处理器架构的演进,尤其是多核、多插槽系统的普及,内存访问模式对程序性能的影响日益显著。默认的内存分配策略往往无法充分利用硬件特性,甚至可能成为性能瓶颈。通过定制内存分配器,我们能够精细控制内存布局,实现与 NUMA 架构的深度适配,从而在数据密集型应用中获得显著的性能提升。

1. C++ 内存管理基石与 NUMA 挑战

在 C++ 中,我们通常使用 newdelete 运算符来动态分配和释放内存。这些操作符底层通常依赖于 C 语言的 mallocfree 函数,而 mallocfree 又会与操作系统进行交互,请求或归还内存页。对于大多数通用应用程序而言,这种默认的内存管理方式是足够高效且方便的。

然而,在高性能计算场景下,默认分配器的局限性开始显现:

  1. 性能开销: 频繁的小对象分配和释放可能导致高昂的系统调用开销和内存碎片。
  2. 预测性差: 无法保证内存分配的局部性,这在缓存敏感型应用中是致命的。
  3. 硬件无感知: 默认分配器通常对底层硬件架构(如缓存层次、NUMA 拓扑)一无所知,无法做出优化的决策。

这正是 NUMA 架构带来的挑战。NUMA 是一种多处理器系统架构,其中每个处理器或一组处理器都有其自己的本地内存控制器和内存。这意味着处理器访问本地内存的速度要比访问其他处理器(或 NUMA 节点)的远程内存快得多。这种速度差异可能高达数倍甚至一个数量级。

NUMA 架构核心概念:

  • NUMA 节点 (NUMA Node): 一个包含一个或多个 CPU 核、一个内存控制器和其直接连接的本地内存的单元。
  • 本地内存 (Local Memory): CPU 访问其所在 NUMA 节点上的内存。访问速度最快,延迟最低。
  • 远程内存 (Remote Memory): CPU 访问其他 NUMA 节点上的内存。访问速度较慢,延迟较高,因为数据需要通过互连总线(如 Intel QPI/UPI, AMD Infinity Fabric)传输。

图示 NUMA 拓扑 (概念性):
假设我们有一个双插槽系统:

  • NUMA Node 0: CPU 0, Memory 0
  • NUMA Node 1: CPU 1, Memory 1

如果运行在 CPU 0 上的线程尝试访问分配在 Memory 1 上的数据,这就是一次远程内存访问。这种跨节点访问会导致额外的延迟,降低程序的整体性能。在 HPC 应用中,数据量通常非常庞大,远程内存访问的累积效应可能导致计算效率大幅下降。

因此,我们的目标是:确保运行在某个 NUMA 节点上的线程,尽可能地访问分配在该节点上的内存。 这就是所谓的 NUMA 亲和性内存分配策略。为了实现这一目标,我们需要 C++ 自定义分配器。

2. C++ 标准库分配器模型:std::allocator 及其接口

C++ 标准库容器(如 std::vector, std::list, std::map 等)都接受一个模板参数,用于指定内存分配器。默认情况下,它们使用 std::allocator<T>,这是一个简单地包装了 ::operator new::operator delete 的分配器。

一个符合 C++ 标准的自定义分配器必须提供一套特定的接口,以便与标准库容器无缝协作。这个接口在 C++11 之后主要由 std::allocator_traits 辅助定义和使用,但在 C++03 时代以及为了理解底层机制,直接实现这些成员函数仍然是最佳实践。

核心接口要求 (以 C++11/14/17 风格):

成员类型或函数 描述
value_type 分配器所能分配的单个元素的类型。
pointer 指向 value_type 的指针类型,通常是 T*
const_pointer 指向 const value_type 的指针类型,通常是 const T*
reference value_type 的引用类型,通常是 T&
const_reference const value_type 的引用类型,通常是 const T&
size_type 无符号整数类型,足以表示最大可分配对象的大小。通常是 std::size_t
difference_type 有符号整数类型,足以表示两个指针之间的距离。通常是 std::ptrdiff_t
propagate_on_container_copy_assignment 布尔值,指示容器在复制赋值时是否应复制其分配器。通常为 std::false_type
propagate_on_container_move_assignment 布尔值,指示容器在移动赋值时是否应移动其分配器。通常为 std::true_type
propagate_on_container_swap 布尔值,指示容器在交换时是否应交换其分配器。通常为 std::false_type
is_always_equal C++17 新增。布尔值,表示此分配器类型的所有实例是否总是相等。如果为 true_type,则容器可以优化其行为,例如避免存储分配器或在某些操作中避免复制它。
rebind<U>::other C++11 之前的标准要求。一个嵌套的模板结构,用于获取一个可以分配 U 类型对象的分配器。在 C++11 及以后,std::allocator_traits 会自动处理 rebind 逻辑,通常无需显式提供。
allocate(n) 分配 nvalue_type 对象的原始内存,并返回指向第一个对象的指针。不进行对象构造。可能抛出 std::bad_alloc
deallocate(p, n) 释放由 allocate 分配的 p 指向的 nvalue_type 对象的内存。不进行对象析构。
construct(p, args...) p 指向的内存处构造一个 value_type 对象,使用 args 进行初始化。C++17 之后,std::allocator 默认不提供 constructdestroy,而是通过 std::allocator_traits 调用全局的 ::new::deletestd::construct_at。但在自定义分配器中提供它们是常见的。
destroy(p) 销毁 p 指向的 value_type 对象,但不释放内存。C++17 之后同上。
max_size() 返回分配器可以分配的最大元素数量。
operator==(const Alloc&, const Alloc&) 比较两个分配器实例是否相等。如果相等,则它们可以互相释放对方分配的内存。
operator!=(const Alloc&, const Alloc&) 比较两个分配器实例是否不相等。

一个最简单的自定义分配器示例 (包装 malloc/free):

#include <cstddef> // For std::size_t, std::ptrdiff_t
#include <new>     // For placement new, std::bad_alloc
#include <limits>  // For std::numeric_limits
#include <iostream>

template <typename T>
class MallocAllocator {
public:
    // 类型定义
    using value_type = T;
    using pointer = T*;
    using const_pointer = const T*;
    using reference = T&;
    using const_reference = const T&;
    using size_type = std::size_t;
    using difference_type = std::ptrdiff_t;

    // 默认构造函数
    MallocAllocator() = default;

    // 拷贝构造函数,允许从不同类型的分配器构造
    template <typename U>
    MallocAllocator(const MallocAllocator<U>&) noexcept {}

    // 内存分配
    pointer allocate(size_type n) {
        if (n == 0) return nullptr;
        if (n > std::numeric_limits<size_type>::max() / sizeof(T)) {
            throw std::bad_alloc(); // 避免乘法溢出
        }
        void* p = std::malloc(n * sizeof(T));
        if (!p) {
            throw std::bad_alloc();
        }
        std::cout << "Allocated " << n * sizeof(T) << " bytes at " << p << std::endl;
        return static_cast<pointer>(p);
    }

    // 内存释放
    void deallocate(pointer p, size_type n) noexcept {
        (void)n; // n在这里可能不会被使用,但标准要求参数存在
        std::cout << "Deallocated " << n * sizeof(T) << " bytes at " << p << std::endl;
        std::free(p);
    }

    // 对象构造 (C++17 之前需要,之后可选)
    template <typename U, typename... Args>
    void construct(U* p, Args&&... args) {
        new (p) U(std::forward<Args>(args)...); // placement new
    }

    // 对象析构 (C++17 之前需要,之后可选)
    template <typename U>
    void destroy(U* p) {
        p->~U();
    }

    // 最大可分配大小
    size_type max_size() const noexcept {
        return std::numeric_limits<size_type>::max() / sizeof(T);
    }

    // 分配器比较 (所有 MallocAllocator 实例都等价)
    bool operator==(const MallocAllocator& other) const noexcept { return true; }
    bool operator!=(const MallocAllocator& other) const noexcept { return false; }

    // Rebind 机制 (C++11 及以上版本通常通过 std::allocator_traits 自动处理)
    // 但为了兼容旧代码或清晰性,可以提供
    template <typename U>
    struct rebind {
        using other = MallocAllocator<U>;
    };
};

// 示例:使用 MallocAllocator
/*
#include <vector>
int main() {
    std::vector<int, MallocAllocator<int>> myVec;
    myVec.reserve(5); // allocate will be called
    for (int i = 0; i < 5; ++i) {
        myVec.push_back(i * 10); // construct will be called
    }
    std::cout << "Vector elements: ";
    for (int x : myVec) {
        std::cout << x << " ";
    }
    std::cout << std::endl;
    // When myVec goes out of scope, destruct and deallocate will be called.
    return 0;
}
*/

这个 MallocAllocator 只是 std::allocator 的一个简化版本,它演示了自定义分配器需要实现的基本接口。

3. NUMA 架构深度解析与操作系统 API

为了构建一个 NUMA 亲和性分配器,我们首先需要深入理解 NUMA 架构,并掌握操作系统提供的相关 API。

3.1 NUMA 架构的物理与逻辑视图

在多插槽服务器中,CPU 和内存被物理地组织成多个 NUMA 节点。每个节点都有其独立的内存总线和内存控制器。节点之间通过高速互连总线(如 Intel 的 QPI/UPI, AMD 的 Infinity Fabric)进行通信。

  • 物理视图:

    • CPU Die (包含多个核心)
    • 集成内存控制器 (IMC)
    • DRAM 模块 (通过 IMC 连接)
    • 互连链路 (连接不同 CPU Die)
  • 逻辑视图 (操作系统抽象):

    • Node 0: CPU(s) 0, 1, …, k-1; Memory 0
    • Node 1: CPU(s) k, k+1, …, m-1; Memory 1

当一个线程在 CPU 0 上运行时,如果它访问 Memory 0,这是本地访问,速度快。如果它访问 Memory 1,这是远程访问,速度慢。远程访问的延迟不仅包括数据传输时间,还可能包括额外的缓存一致性协议开销。

3.2 Linux 上的 NUMA API (libnuma)

Linux 提供了一套强大的 libnuma 库(通常在 <numa.h><numaif.h> 中声明)来查询和控制 NUMA 策略。

核心功能:

  1. 查询 NUMA 拓扑:

    • numa_available(): 检查系统是否支持 NUMA。
    • numa_num_configured_nodes(): 获取配置的 NUMA 节点总数。
    • numa_num_configured_cpus(): 获取配置的 CPU 核总数。
    • numa_node_of_cpu(cpu_id): 获取指定 CPU 核所属的 NUMA 节点 ID。
    • numa_preferred(): 获取当前进程的默认首选 NUMA 节点。
    • numa_node_size(node_id, &total, &free): 获取指定节点的内存大小。
  2. 内存分配策略:

    • numa_alloc(size): 在当前进程的首选节点上分配内存。
    • numa_alloc_onnode(size, node_id): 在指定 NUMA 节点上分配 size 字节的内存。这是我们实现 NUMA 亲和性分配器的核心函数。
    • numa_free(addr, size): 释放由 numa_allocnuma_alloc_onnode 分配的内存。
    • numa_alloc_interleave(size, nodemask): 在指定的 NUMA 节点掩码上交错分配内存。页面将轮流从每个节点分配。适用于需要在所有节点上均匀分布数据的情况。
    • numa_alloc_local(size): 在当前 CPU 所在的 NUMA 节点上分配内存。
    • numa_alloc_current(size): 在当前进程绑定的 NUMA 节点上分配内存。
  3. 进程/线程内存策略:

    • set_mempolicy(mode, nodemask, maxnode): 设置当前进程或线程的默认内存分配策略。
      • MPOL_DEFAULT: 使用系统默认策略(通常是“首次触摸”)。
      • MPOL_PREFERRED: 优先在指定节点分配,如果失败则回退到其他节点。
      • MPOL_BIND: 严格绑定到指定节点集,只能从这些节点分配。如果这些节点内存不足,分配失败。
      • MPOL_INTERLEAVE: 在指定节点集上交错分配。
      • MPOL_LOCAL: 始终在当前 CPU 所在的 NUMA 节点上分配。
    • mbind(addr, len, mode, nodemask, maxnode, flags): 对指定内存区域设置 NUMA 策略。这允许更细粒度的控制,例如将一个大数组的不同部分绑定到不同节点。
    • numa_set_preferred(node_id): 设置当前进程的首选 NUMA 节点。
  4. CPU 亲和性:

    • sched_setaffinity(pid, cpusetsize, mask): 将进程/线程绑定到指定的 CPU 核集。
    • pthread_setaffinity_np(pthread_t thread, size_t cpusetsize, const cpu_set_t *cpuset): 将指定线程绑定到 CPU 核集。这与内存亲和性协同工作,确保线程和它访问的内存都在同一个 NUMA 节点上。

获取当前线程的 NUMA 节点 ID:
这是构建 NUMA 分配器的一个关键步骤。我们可以通过以下方式获取:

  • 先获取当前线程运行的 CPU ID:sched_getcpu() (Linux 独有,非常快,但不是标准 C++)。
  • 再通过 CPU ID 获取 NUMA 节点 ID:numa_node_of_cpu(sched_getcpu())

3.3 Windows 上的 NUMA API

Windows 也提供了相应的 NUMA API,主要通过 <windows.h> 中的函数实现:

  • GetNumaProcessorNode(UCHAR Processor, PUSHORT NodeNumber): 获取指定处理器(逻辑处理器索引)所在的 NUMA 节点。
  • GetNumaNodeProcessorMask(UCHAR Node, PULONG_PTR ProcessorMask): 获取指定 NUMA 节点的所有处理器掩码。
  • GetNumaAvailableMemoryNode(UCHAR Node, PULONG64 AvailableBytes): 获取指定 NUMA 节点的可用内存。
  • VirtualAllocExNuma(HANDLE hProcess, LPVOID lpAddress, SIZE_T dwSize, DWORD flAllocationType, DWORD flProtect, UCHAR nndPreferred): 在指定 NUMA 节点上分配虚拟内存。这是 Windows 上实现 NUMA 亲和性分配器的核心函数。
  • SetThreadAffinityMask(HANDLE hThread, DWORD_PTR dwThreadAffinityMask): 设置线程的处理器亲和性掩码。
  • SetProcessAffinityMask(HANDLE hProcess, DWORD_PTR dwProcessAffinityMask): 设置进程的处理器亲和性掩码。

由于本文主要关注 Linux HPC 环境,后续的实现将以 libnuma 为主。

4. 设计一个 NUMA 亲和性自定义分配器

现在,我们有了理论基础和操作系统 API,可以着手设计我们的 NUMAAllocator 了。

4.1 核心设计理念

  1. 线程-NUMA 节点绑定: 每个线程在启动时或首次分配内存时,确定其当前运行的 NUMA 节点 ID。
  2. 节点本地分配: 当线程请求内存时,分配器应尝试从该线程所在的 NUMA 节点分配内存。
  3. 故障回退: 如果本地节点内存不足,可以配置为回退到其他节点(例如,通过 MPOL_PREFERRED 策略)或直接失败。在 HPC 中,通常更倾向于失败或更严格的控制。
  4. 内存池优化: 为了减少 numa_alloc_onnode / numa_free 的系统调用开销,每个 NUMA 节点可以维护一个独立的内存池。
  5. 线程安全: 如果多个线程可能同时请求分配内存,内存池需要线程安全机制(如互斥锁)。

4.2 基本 NUMAAllocator 结构

我们将构建一个 NUMAAllocator 类模板,它将包装 numa_alloc_onnodenuma_free。为了简单起见,我们首先实现一个不带内存池的版本,直接调用 libnuma 函数。

#ifndef NUMA_ALLOCATOR_HPP
#define NUMA_ALLOCATOR_HPP

#include <cstddef>     // std::size_t, std::ptrdiff_t
#include <new>         // std::bad_alloc, placement new
#include <limits>      // std::numeric_limits
#include <iostream>
#include <stdexcept>   // std::runtime_error

#ifdef __linux__
#include <numa.h>      // numa_alloc_onnode, numa_free, numa_available etc.
#include <sched.h>     // sched_getcpu
#include <thread>      // std::this_thread::get_id
#include <map>         // For node-specific allocators (if using pools later)
#include <mutex>       // For thread safety
#include <vector>      // For node_cpu_map
#endif

// 辅助函数:获取当前线程所在的 NUMA 节点 ID
static int get_current_numa_node() {
#ifdef __linux__
    if (!numa_available() || numa_num_configured_nodes() < 2) {
        // NUMA not available or only one node, fallback to default behavior
        // or signal an error if strict NUMA is required.
        // For simplicity, we'll return a default node 0 or -1 to indicate no NUMA
        return 0; 
    }

    // sched_getcpu() returns the current CPU core ID.
    // numa_node_of_cpu() maps a CPU ID to a NUMA node ID.
    int cpu_id = sched_getcpu();
    if (cpu_id == -1) {
        std::cerr << "Warning: sched_getcpu() failed, returning default NUMA node 0." << std::endl;
        return 0; // Fallback
    }

    int node_id = numa_node_of_cpu(cpu_id);
    if (node_id == -1) {
        std::cerr << "Warning: numa_node_of_cpu() failed for CPU " << cpu_id << ", returning default NUMA node 0." << std::endl;
        return 0; // Fallback
    }
    return node_id;
#else
    // Non-Linux systems: return a default node or throw an error
    std::cerr << "Warning: NUMAAllocator is primarily designed for Linux. Returning default node 0." << std::endl;
    return 0;
#endif
}

// 线程局部存储,用于缓存当前线程的 NUMA 节点 ID
// 避免每次分配时都调用 sched_getcpu() 和 numa_node_of_cpu()
thread_local int current_thread_numa_node_id = -1;

// 初始化线程局部 NUMA 节点 ID
static void initialize_thread_numa_node() {
    if (current_thread_numa_node_id == -1) {
        current_thread_numa_node_id = get_current_numa_node();
        std::cout << "[Thread " << std::this_thread::get_id() 
                  << "] Initialized NUMA node to: " << current_thread_numa_node_id << std::endl;
    }
}

template <typename T>
class NUMAAllocator {
public:
    // 类型定义
    using value_type = T;
    using pointer = T*;
    using const_pointer = const T*;
    using reference = T&;
    using const_reference = const T&;
    using size_type = std::size_t;
    using difference_type = std::ptrdiff_t;

    // 默认构造函数
    NUMAAllocator() noexcept {
        initialize_thread_numa_node();
    }

    // 拷贝构造函数,允许从不同类型的分配器构造
    template <typename U>
    NUMAAllocator(const NUMAAllocator<U>&) noexcept {
        initialize_thread_numa_node();
    }

    // 内存分配
    pointer allocate(size_type n) {
        if (n == 0) return nullptr;
        if (n > std::numeric_limits<size_type>::max() / sizeof(T)) {
            throw std::bad_alloc(); // 避免乘法溢出
        }

        void* p = nullptr;
#ifdef __linux__
        if (numa_available() >= 0 && numa_num_configured_nodes() > 1) {
            int node_id = current_thread_numa_node_id;
            if (node_id == -1) { // Fallback if TLS was not initialized for some reason
                 node_id = get_current_numa_node();
                 current_thread_numa_node_id = node_id; // Cache it
            }

            p = numa_alloc_onnode(n * sizeof(T), node_id);
            if (!p) {
                // 如果在指定节点分配失败,可以考虑回退策略,例如在任意节点分配,
                // 但为了严格的 NUMA 亲和性,这里选择抛出异常。
                // 实际应用中可以根据需求调整。
                std::cerr << "Error: Failed to allocate " << n * sizeof(T) 
                          << " bytes on NUMA node " << node_id << ". Trying fallback to any node." << std::endl;
                p = std::malloc(n * sizeof(T)); // Fallback to default malloc
                if (!p) {
                    throw std::bad_alloc();
                }
            }
            std::cout << "[Thread " << std::this_thread::get_id() 
                      << "] NUMAAllocator: Allocated " << n * sizeof(T) << " bytes for " << n 
                      << " elements of type " << typeid(T).name() << " on node " << node_id << " at " << p << std::endl;
        } else {
            // NUMA not available or single node, use default malloc
            p = std::malloc(n * sizeof(T));
            if (!p) {
                throw std::bad_alloc();
            }
            std::cout << "[Thread " << std::this_thread::get_id() 
                      << "] NUMAAllocator: Allocated " << n * sizeof(T) << " bytes (via malloc) at " << p << std::endl;
        }
#else
        // Non-Linux systems, fall back to default malloc
        p = std::malloc(n * sizeof(T));
        if (!p) {
            throw std::bad_alloc();
        }
        std::cout << "[Thread " << std::this_thread::get_id() 
                  << "] NUMAAllocator: Allocated " << n * sizeof(T) << " bytes (via malloc) at " << p << std::endl;
#endif
        return static_cast<pointer>(p);
    }

    // 内存释放
    void deallocate(pointer p, size_type n) noexcept {
        if (!p) return;
        (void)n; // n在这里可能不会被使用,但标准要求参数存在

#ifdef __linux__
        if (numa_available() >= 0 && numa_num_configured_nodes() > 1) {
            // numa_free doesn't need the node ID, it automatically finds it
            std::cout << "[Thread " << std::this_thread::get_id() 
                      << "] NUMAAllocator: Deallocated " << n * sizeof(T) << " bytes at " << p << std::endl;
            numa_free(p, n * sizeof(T));
        } else {
            std::cout << "[Thread " << std::this_thread::get_id() 
                      << "] NUMAAllocator: Deallocated " << n * sizeof(T) << " bytes (via free) at " << p << std::endl;
            std::free(p);
        }
#else
        std::cout << "[Thread " << std::this_thread::get_id() 
                  << "] NUMAAllocator: Deallocated " << n * sizeof(T) << " bytes (via free) at " << p << std::endl;
        std::free(p);
#endif
    }

    // 对象构造 (C++17 之前需要,之后可选)
    template <typename U, typename... Args>
    void construct(U* p, Args&&... args) {
        new (p) U(std::forward<Args>(args)...); // placement new
    }

    // 对象析构 (C++17 之前需要,之后可选)
    template <typename U>
    void destroy(U* p) {
        p->~U();
    }

    // 最大可分配大小
    size_type max_size() const noexcept {
        return std::numeric_limits<size_type>::max() / sizeof(T);
    }

    // 分配器比较 (所有 NUMAAllocator 实例都等价,因为它们都基于当前线程的NUMA节点)
    // 但如果分配器实例内部持有状态(例如指向特定内存池的指针),则需要更复杂的比较逻辑。
    bool operator==(const NUMAAllocator& other) const noexcept { return true; }
    bool operator!=(const NUMAAllocator& other) const noexcept { return false; }

    // Rebind 机制
    template <typename U>
    struct rebind {
        using other = NUMAAllocator<U>;
    };
};

#endif // NUMA_ALLOCATOR_HPP

代码解析:

  1. 平台判断: 使用 #ifdef __linux__ 确保 libnuma 相关的代码只在 Linux 系统上编译。在其他系统上,它会回退到 std::mallocstd::free
  2. get_current_numa_node(): 这个静态辅助函数负责获取当前线程运行的 CPU ID,然后将其映射到 NUMA 节点 ID。它包含基本的错误检查和在 NUMA 不可用或单节点系统上的回退逻辑。
  3. thread_local int current_thread_numa_node_id: 这是一个关键优化。thread_local 变量意味着每个线程都有其独立的 current_thread_numa_node_id 副本。这样,initialize_thread_numa_node() 只会在每个线程首次使用 NUMAAllocator 时调用一次 get_current_numa_node(),从而避免了在每次 allocate 调用中重复进行系统调用(sched_getcpu()numa_node_of_cpu() )。
  4. allocate():
    • 首先进行大小检查,防止溢出。
    • 在 Linux 系统且 NUMA 可用时,获取当前线程的 NUMA 节点 ID。
    • 调用 numa_alloc_onnode(n * sizeof(T), node_id) 在指定节点上分配内存。
    • 如果 numa_alloc_onnode 失败,当前实现会打印错误并回退到 std::malloc。在严格的 HPC 场景下,这可能需要更精细的错误处理或直接抛出异常。
  5. deallocate():
    • 在 Linux 系统且 NUMA 可用时,调用 numa_free(p, n * sizeof(T)) 释放内存。numa_free 会自动识别 p 所属的 NUMA 节点并释放。
    • 在其他情况下,调用 std::free(p)
  6. construct()destroy(): 提供了标准的 placement new 和显式析构调用,以便容器正确管理对象的生命周期。
  7. operator==: 简单地返回 true,表示所有 NUMAAllocator 实例都是等价的。这是因为它们都基于当前线程的 NUMA 节点进行分配,不持有内部状态来区分彼此的分配策略。

4.3 使用 NUMAAllocator

现在我们可以在标准库容器中使用它了:

// main.cpp
#include "numa_allocator.hpp"
#include <vector>
#include <list>
#include <map>
#include <thread>
#include <chrono> // For std::this_thread::sleep_for
#include <numeric> // For std::iota

// 用于模拟工作负载的结构体
struct MyData {
    int id;
    double value;
    char buffer[64]; // 模拟一些数据

    MyData(int i = 0, double v = 0.0) : id(i), value(v) {
        std::iota(std::begin(buffer), std::end(buffer), 0); // 填充一些数据
    }
};

// 线程函数,模拟在不同NUMA节点上运行并使用NUMAAllocator
void numa_worker(int thread_id, int node_to_bind_to) {
#ifdef __linux__
    // 尝试将线程绑定到指定的CPU核心,进而绑定到NUMA节点
    // 假设每个NUMA节点至少有一个CPU核心
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(node_to_bind_to, &cpuset); // 绑定到与NUMA节点ID相同的CPU核心ID(简化处理)
    int s = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
    if (s != 0) {
        std::cerr << "Error: pthread_setaffinity_np failed for thread " << thread_id << ", node " << node_to_bind_to << ": " << strerror(s) << std::endl;
    } else {
        std::cout << "Thread " << thread_id << " successfully bound to CPU " << node_to_bind_to << std::endl;
    }
#endif

    // 此时,current_thread_numa_node_id 会被初始化为 node_to_bind_to (或其对应的NUMA节点)
    // 且后续的 NUMAAllocator 分配将尝试在该节点进行

    std::cout << "Thread " << thread_id << " (NUMA node " << get_current_numa_node() << ") starting work..." << std::endl;

    // 使用 NUMAAllocator 的 std::vector
    std::vector<MyData, NUMAAllocator<MyData>> vec;
    vec.reserve(100); // 预分配100个MyData对象

    for (int i = 0; i < 100; ++i) {
        vec.emplace_back(thread_id * 100 + i, (double)i / 10.0);
    }

    std::cout << "Thread " << thread_id << " (NUMA node " << get_current_numa_node() << ") vector size: " << vec.size() << std::endl;

    // 模拟一些计算,确保数据被访问
    double sum_val = 0.0;
    for (const auto& data : vec) {
        sum_val += data.value;
    }
    std::cout << "Thread " << thread_id << " (NUMA node " << get_current_numa_node() << ") sum_val: " << sum_val << std::endl;

    // 另一个容器:std::list
    std::list<int, NUMAAllocator<int>> my_list;
    for (int i = 0; i < 50; ++i) {
        my_list.push_back(i * 2);
    }
    std::cout << "Thread " << thread_id << " (NUMA node " << get_current_numa_node() << ") list size: " << my_list.size() << std::endl;

    std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟工作
    std::cout << "Thread " << thread_id << " finished." << std::endl;
}

int main() {
#ifdef __linux__
    if (numa_available() < 0) {
        std::cerr << "NUMA support not available on this system." << std::endl;
        return 1;
    }
    int num_nodes = numa_num_configured_nodes();
    if (num_nodes < 2) {
        std::cout << "Only one NUMA node configured, NUMAAllocator will behave like default allocator." << std::endl;
        // Still run the test to show it works, but without NUMA benefits
    }
    std::cout << "System has " << num_nodes << " NUMA nodes." << std::endl;

    std::vector<std::thread> threads;
    // 为每个 NUMA 节点启动一个线程,并尝试将其绑定到该节点
    for (int i = 0; i < num_nodes; ++i) {
        threads.emplace_back(numa_worker, i, i); // 假设node ID和CPU ID可以简单映射
    }

    for (auto& t : threads) {
        t.join();
    }
#else
    std::cout << "NUMAAllocator example is primarily for Linux. Running a basic test without NUMA affinity." << std::endl;
    // Still run a single thread test
    numa_worker(0, 0); 
#endif
    std::cout << "Main program finished." << std::endl;
    return 0;
}

编译和运行 (Linux):

  1. 保存 numa_allocator.hppmain.cpp
  2. 编译:
    g++ -std=c++17 -Wall -Wextra -pthread -lnuma main.cpp -o numa_test
    • -std=c++17: 使用 C++17 标准。
    • -Wall -Wextra: 开启警告。
    • -pthread: 链接 pthread 库(用于 pthread_setaffinity_npstd::thread)。
    • -lnuma: 链接 libnuma 库。
  3. 运行:
    ./numa_test

    你会看到输出日志显示内存是在哪个 NUMA 节点上分配的。通过 numactl --hardware 命令可以查看系统的 NUMA 拓扑。

这个简单的 NUMAAllocator 已经能够实现基本的 NUMA 亲和性分配。然而,它直接调用 numa_alloc_onnodenuma_free,这仍然是系统调用,可能带来一定的开销。在高性能场景下,我们通常会结合内存池技术来进一步优化。

5. 高级 NUMA 分配策略与优化

5.1 内存池与 NUMA 结合

为了减少系统调用和内存碎片,我们可以在每个 NUMA 节点上实现一个独立的内存池。当线程需要内存时,它首先从其本地 NUMA 节点的内存池中获取;当释放内存时,返回到相应的内存池。

设计思路:

  1. 节点池管理: 一个全局的 NUMAPoolManager 负责创建和管理每个 NUMA 节点的内存池。
  2. 线程本地访问: NUMAAllocator 通过 current_thread_numa_node_id 获取本地节点 ID,然后向 NUMAPoolManager 请求该节点的内存池。
  3. 内存池实现: 每个节点内存池可以是简单的固定大小块分配器(Block Allocator)或更复杂的自由列表分配器(Free List Allocator)。
  4. 线程安全: 每个节点内存池需要内部锁来保证并发访问的线程安全。

示例:一个简化的节点内存池 (Block Allocator)

// numa_pool_allocator.hpp
#ifndef NUMA_POOL_ALLOCATOR_HPP
#define NUMA_POOL_ALLOCATOR_HPP

#include <cstddef>
#include <new>
#include <limits>
#include <iostream>
#include <stdexcept>
#include <vector>
#include <mutex>
#include <map>
#include <thread>
#include <atomic>

#ifdef __linux__
#include <numa.h>
#include <sched.h>
#endif

// 辅助函数:获取当前线程所在的 NUMA 节点 ID
static int get_current_numa_node_fast() {
#ifdef __linux__
    if (!numa_available() || numa_num_configured_nodes() < 2) {
        return 0; // Fallback
    }
    int cpu_id = sched_getcpu();
    if (cpu_id == -1) {
        return 0; 
    }
    int node_id = numa_node_of_cpu(cpu_id);
    if (node_id == -1) {
        return 0;
    }
    return node_id;
#else
    return 0;
#endif
}

// 线程局部存储,用于缓存当前线程的 NUMA 节点 ID
thread_local int current_thread_numa_node_id_pool = -1;

static void initialize_thread_numa_node_pool() {
    if (current_thread_numa_node_id_pool == -1) {
        current_thread_numa_node_id_pool = get_current_numa_node_fast();
        // std::cout << "[Pool Thread " << std::this_thread::get_id() 
        //           << "] Initialized NUMA node to: " << current_thread_numa_node_id_pool << std::endl;
    }
}

// --- 简单的固定块内存池实现 ---
class FixedBlockPool {
private:
    std::vector<void*> blocks;
    std::vector<char> memory_arena;
    std::atomic<std::size_t> next_block_idx;
    std::size_t block_size;
    std::size_t num_blocks;
    std::mutex mtx; // For pool management

public:
    FixedBlockPool(std::size_t blk_sz, std::size_t num_blk, int numa_node_id)
        : block_size(blk_sz), num_blocks(num_blk), next_block_idx(0) {

        std::size_t total_size = block_size * num_blocks;
#ifdef __linux__
        if (numa_available() >= 0 && numa_num_configured_nodes() > 1) {
            memory_arena.resize(total_size); // Allocate space first
            void* p = numa_alloc_onnode(total_size, numa_node_id);
            if (!p) {
                std::cerr << "Error: Failed to allocate " << total_size << " bytes on NUMA node " << numa_node_id << " for FixedBlockPool. Falling back to malloc." << std::endl;
                p = std::malloc(total_size);
                if (!p) {
                    throw std::bad_alloc();
                }
            }
            // Manually copy the allocated memory to memory_arena, or better, directly use p
            // For simplicity here, we assume memory_arena can hold it and we manage pointers
            // A more robust design would have memory_arena be a pointer to the numa_alloc'd memory.
            // Let's simplify: directly manage the raw pointer from numa_alloc_onnode.
            // This means `memory_arena` won't hold the actual memory, but `numa_alloc_ptr` will.
            // For a simple fixed block pool, we might not even need `std::vector<char> memory_arena`.

            // Better: use a raw pointer and manually manage.
            // This example directly allocates blocks from numa_alloc_onnode one by one for simplicity.
            // A true pool would pre-allocate a large chunk.

            // For now, let's just make it a simple "allocate-on-demand from numa_alloc_onnode"
            // And use this pool for tracking. This simplifies the initial pool setup.
            // Re-evaluating: A FixedBlockPool should pre-allocate.
            // Let's make it a pre-allocated pool of raw memory.
            void* base_ptr = numa_alloc_onnode(total_size, numa_node_id);
            if (!base_ptr) {
                std::cerr << "Error: FixedBlockPool failed to pre-allocate on NUMA node " << numa_node_id << std::endl;
                base_ptr = std::malloc(total_size); // Fallback
                if (!base_ptr) throw std::bad_alloc();
            }
            // Initialize blocks
            for (std::size_t i = 0; i < num_blocks; ++i) {
                blocks.push_back(static_cast<char*>(base_ptr) + i * block_size);
            }
            std::cout << "FixedBlockPool: Pre-allocated " << total_size << " bytes on NUMA node " << numa_node_id << std::endl;

        } else {
            // Fallback to malloc for non-NUMA systems or single node
            void* base_ptr = std::malloc(total_size);
            if (!base_ptr) throw std::bad_alloc();
            for (std::size_t i = 0; i < num_blocks; ++i) {
                blocks.push_back(static_cast<char*>(base_ptr) + i * block_size);
            }
            std::cout << "FixedBlockPool: Pre-allocated " << total_size << " bytes via malloc." << std::endl;
        }
#else
        void* base_ptr = std::malloc(total_size);
        if (!base_ptr) throw std::bad_alloc();
        for (std::size_t i = 0; i < num_blocks; ++i) {
            blocks.push_back(static_cast<char*>(base_ptr) + i * block_size);
        }
        std::cout << "FixedBlockPool: Pre-allocated " << total_size << " bytes via malloc." << std::endl;
#endif
    }

    ~FixedBlockPool() {
        if (!blocks.empty()) {
            // Only free the base pointer if it was allocated by numa_alloc_onnode or malloc
            // For simplicity, we assume the first block's address is the base.
#ifdef __linux__
            if (numa_available() >= 0 && numa_num_configured_nodes() > 1) {
                numa_free(blocks[0], block_size * num_blocks);
            } else {
                std::free(blocks[0]);
            }
#else
            std::free(blocks[0]);
#endif
            std::cout << "FixedBlockPool: Deallocated base memory." << std::endl;
        }
    }

    void* allocate_block() {
        // Use a mutex for simplicity, a lock-free approach would be faster
        std::lock_guard<std::mutex> lock(mtx);
        if (next_block_idx < num_blocks) {
            void* p = blocks[next_block_idx++];
            // std::cout << "Allocating block from pool: " << p << std::endl;
            return p;
        }
        return nullptr; // Pool exhausted
    }

    // For a simple fixed block pool, deallocate might not immediately return to the pool
    // unless we implement a free list. For this example, we assume blocks are not returned
    // individually, or the pool is designed for one-time allocation then full clear.
    // A more advanced pool would manage a free list of blocks.
    void deallocate_block(void* p) {
        // For a simple fixed-size block allocator without a free list, this is a no-op
        // or would require more complex tracking.
        // For demonstration, let's just acknowledge it.
        // std::cout << "Deallocating block to pool (no-op for this simple pool): " << p << std::endl;
    }
};

// --- NUMA 内存池管理器 ---
class NUMAPoolManager {
private:
    std::map<int, std::unique_ptr<FixedBlockPool>> node_pools;
    std::mutex manager_mtx; // Protects access to node_pools
    std::size_t default_block_size;
    std::size_t default_num_blocks;

    NUMAPoolManager() : default_block_size(128), default_num_blocks(1024) { // Default values
        // Initialize pools for all available NUMA nodes
#ifdef __linux__
        if (numa_available() >= 0 && numa_num_configured_nodes() > 1) {
            int num_nodes = numa_num_configured_nodes();
            for (int i = 0; i < num_nodes; ++i) {
                node_pools[i] = std::make_unique<FixedBlockPool>(default_block_size, default_num_blocks, i);
            }
        } else {
            // For non-NUMA systems, create a single pool on node 0
            node_pools[0] = std::make_unique<FixedBlockPool>(default_block_size, default_num_blocks, 0);
        }
#else
        node_pools[0] = std::make_unique<FixedBlockPool>(default_block_size, default_num_blocks, 0);
#endif
    }

public:
    static NUMAPoolManager& instance() {
        static NUMAPoolManager mgr;
        return mgr;
    }

    FixedBlockPool* get_pool_for_node(int node_id) {
        std::lock_guard<std::mutex> lock(manager_mtx);
        auto it = node_pools.find(node_id);
        if (it != node_pools.end()) {
            return it->second.get();
        }
        // If node_id not found (e.g., node_id is invalid or system has fewer nodes), return pool for node 0
        std::cerr << "Warning: NUMA pool for node " << node_id << " not found. Falling back to node 0." << std::endl;
        return node_pools[0].get();
    }

    // Setters for pool parameters if needed
    void set_pool_params(std::size_t blk_sz, std::size_t num_blk) {
        std::lock_guard<std::mutex> lock(manager_mtx);
        default_block_size = blk_sz;
        default_num_blocks = num_blk;
        // Note: Changing params after initialization won't affect existing pools.
        // This would require re-initializing all pools, which is complex.
    }
};

// --- NUMAAllocator with Pool Support ---
template <typename T>
class NUMAPoolAllocator {
public:
    // Type definitions
    using value_type = T;
    using pointer = T*;
    using const_pointer = const T*;
    using reference = T&;
    using const_reference = const T&;
    using size_type = std::size_t;
    using difference_type = std::ptrdiff_t;

    NUMAPoolAllocator() noexcept {
        initialize_thread_numa_node_pool();
    }

    template <typename U>
    NUMAPoolAllocator(const NUMAPoolAllocator<U>&) noexcept {
        initialize_thread_numa_node_pool();
    }

    pointer allocate(size_type n) {
        if (n == 0) return nullptr;
        std::size_t bytes_to_allocate = n * sizeof(T);

        // Check if allocation size matches pool block size
        // For simplicity, this pool allocator only handles requests for its exact block size.
        // A real pool allocator would support various sizes or be a general-purpose allocator.
        if (bytes_to_allocate > NUMAPoolManager::instance().default_block_size) {
            std::cerr << "Warning: Request for " << bytes_to_allocate << " bytes exceeds FixedBlockPool block size ("
                      << NUMAPoolManager::instance().default_block_size << "). Falling back to direct NUMA alloc/malloc." << std::endl;
#ifdef __linux__
            if (numa_available() >= 0 && numa_num_configured_nodes() > 1) {
                int node_id = current_thread_numa_node_id_pool;
                if (node_id == -1) node_id = get_current_numa_node_fast();
                void* p = numa_alloc_onnode(bytes_to_allocate, node_id);
                if (!p) throw std::bad_alloc();
                return static_cast<pointer>(p);
            }
#endif
            void* p = std::malloc(bytes_to_allocate);
            if (!p) throw std::bad_alloc();
            return static_cast<pointer>(p);
        }

        int node_id = current_thread_numa_node_id_pool;
        if (node_id == -1) node_id = get_current_numa_node_fast();

        FixedBlockPool* pool = NUMAPoolManager::instance().get_pool_for_node(node_id);
        void* p = pool->allocate_block();
        if (!p) {
            std::cerr << "Error: NUMA Pool for node " << node_id << " exhausted. Falling back to direct NUMA alloc/malloc." << std::endl;
#ifdef __linux__
            if (numa_available() >= 0 && numa_num_configured_nodes() > 1) {
                void* direct_p = numa_alloc_onnode(bytes_to_allocate, node_id);
                if (!direct_p) throw std::bad_alloc();
                return static_cast<pointer>(direct_p);
            }
#endif
            void* direct_p = std::malloc(bytes_to_allocate);
            if (!direct_p) throw std::bad_alloc();
            return static_cast<pointer>(direct_p);
        }
        // std::cout << "Allocated " << bytes_to_allocate << " bytes from pool on node " << node_id << " at " << p << std::endl;
        return static_cast<pointer>(p);
    }

    void deallocate(pointer p, size_type n) noexcept {
        if (!p) return;
        std::size_t bytes_to_deallocate = n * sizeof(T);

        // This pool allocator only handles blocks that it allocated.
        // If it was allocated via direct numa_alloc_onnode or malloc, it should be freed accordingly.
        // Determining if `p` came from the pool or direct allocation is complex without tracking.
        // For simplicity, we assume if `bytes_to_deallocate` matches default block size, it came from pool.
        // Otherwise, it was a direct allocation. This is a simplification and not robust.
        if (bytes_to_deallocate == NUMAPoolManager::instance().default_block_size) {
            int node_id = current_thread_numa_node_id_pool; // Could be incorrect if allocated from another node, but it's a heuristic
            if (node_id == -1) node_id = get_current_numa_node_fast();
            FixedBlockPool* pool = NUMAPoolManager::instance().get_pool_for_node(node_id);
            pool->deallocate_block(p); // This simple FixedBlockPool does not really re-use blocks.
            // std::cout << "Deallocated " << bytes_to_deallocate << " bytes to pool on node " << node_id << " at " << p << std::endl;
        } else {
            // Assume it was a direct allocation (numa_alloc_onnode or malloc)
#ifdef __linux__
            if (numa_available() >= 0 && numa_num_configured_nodes() > 1) {
                numa_free(p, bytes_to_deallocate);
            } else {
                std::free(p);
            }
#else
            std::free(p);
#endif
            // std::cout << "Deallocated " << bytes_to_deallocate << " bytes via direct free at " << p << std::endl;
        }
    }

    template <typename U, typename... Args>
    void construct(U* p, Args&&... args) {
        new (p) U(std::forward<Args>(args)...);
    }

    template <typename U>
    void destroy(U* p) {
        p->~U();
    }

    size_type max_size() const noexcept {
        return std::numeric_limits<size_type>::max() / sizeof(T);
    }

    bool operator==(const NUMAPoolAllocator& other) const noexcept { return true; }
    bool operator!=(const NUMAPoolAllocator& other) const noexcept { return false; }

    template <typename U>
    struct rebind {
        using other = NUMAPoolAllocator<U>;
    };
};

#endif // NUMA_POOL_ALLOCATOR_HPP

注意:
上述 FixedBlockPoolNUMAPoolAllocator 是一个非常简化的内存池实现,主要用于演示概念。

  • FixedBlockPool 仅支持固定大小的块分配,并且其 deallocate_block 实际上是空操作,不将块返回到池中(这意味着池会耗尽)。一个真正的池分配器需要一个自由列表来管理已释放的块。
  • NUMAPoolAllocator 对不同大小的请求有简单的回退逻辑,并且在 deallocate 时通过启发式方法判断内存来源,这在生产环境中是不够鲁棒的。
  • 一个健壮的 NUMA 内存池分配器通常会采用更复杂的策略,如 Buddy Allocator 或 Slab Allocator,并且能够处理不同大小的分配请求。

5.2 线程亲和性 (Thread Affinity)

NUMA 亲和性内存分配策略的最大优势在于它与线程亲和性(Thread Affinity)的结合。如果一个线程总是运行在同一个 NUMA 节点上的 CPU 核上,那么它访问分配在该节点上的内存时,将始终是本地访问。

Linux (pthread_setaffinity_np):
我们已经在 numa_worker 示例中展示了如何使用 pthread_setaffinity_np 将线程绑定到特定的 CPU 核。

// 在 numa_worker 函数中
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(node_to_bind_to, &cpuset); // 绑定到与NUMA节点ID相同的CPU核心ID(简化处理)
pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);

这需要仔细规划:你需要知道哪个 CPU 核属于哪个 NUMA 节点,并据此将线程绑定到正确的核。numactl --hardwarelscpu -p 可以帮助你了解系统的 CPU 和 NUMA 拓扑。

5.3 懒惰分配 (Lazy Allocation) 与策略性绑定

除了 numa_alloc_onnode 这种显式分配方式,我们还可以利用操作系统的内存策略 API 来实现更灵活的 NUMA 内存管理。

  • “首次触摸” (First-touch) 策略: 这是许多操作系统(包括 Linux)的默认 NUMA 内存分配策略。内存页面在首次被访问时,会被分配到访问它的 CPU 所在的 NUMA 节点上。这意味着如果你希望数据在某个节点上,你需要确保该节点上的线程是第一个访问这些数据的线程。
  • mbind(): 允许你将一个已经分配的内存区域绑定到特定的 NUMA 节点或节点集。这对于已经通过 malloc 分配但希望后续访问具有 NUMA 亲和性的内存很有用。
    // 示例:使用 mbind 将现有内存绑定到 NUMA 节点
    void* data = std::malloc(1024 * 1024); // 1MB
    int node_id = get_current_numa_node_fast(); // 获取当前线程的NUMA节点
    unsigned long nodemask = (1UL << node_id);
    long status = mbind(data, 1024 * 1024, MPOL_BIND, &nodemask, 32, 0);
    if (status != 0) {
        std::cerr << "mbind failed: " << strerror(errno) << std::endl;
    }
  • numa_set_preferred(): 设置当前线程的“首选”NUMA 节点。后续的 mallocnew 操作会尝试在该节点分配内存,如果失败则回退到其他节点。这比 numa_alloc_onnode 更宽松,但能提供一定程度的亲和性。

5.4 异构 NUMA 分配策略

有时,我们可能需要更复杂的 NUMA 策略:

  • MPOL_INTERLEAVE (交错分配): 将内存页轮流分配到指定的 NUMA 节点集。适用于需要将数据均匀分布在多个节点上以提高带宽,或者避免单个节点内存瓶颈的情况。例如,一个大型只读查找表,被多个节点上的线程频繁访问,交错分配可以提高整体访问效率。
    // 示例:交错分配
    void* data = numa_alloc_interleave(1024 * 1024, numa_all_nodes_ptr); // 在所有可用节点上交错分配
  • MPOL_LOCAL: 始终在当前 CPU 所在的 NUMA 节点上分配。这与我们 NUMAAllocator 的核心思想一致。

这些高级策略提供了极大的灵活性,但同时也增加了复杂性。选择哪种策略取决于具体的应用需求、数据访问模式和 NUMA 拓扑。

5.5 性能测量与工具

为了验证 NUMA 亲和性分配器的效果,性能测量至关重要。

  • numactl:
    • numactl --hardware: 查看 NUMA 拓扑和每个节点的内存信息。
    • numactl --show: 查看当前进程的 NUMA 策略。
    • numactl --membind=0 --cpunodebind=0 ./my_program: 运行程序,并将其内存和 CPU 绑定到 NUMA 节点 0。这是一种无需修改代码即可测试 NUMA 策略的强大工具。
  • perf: Linux 上的性能分析工具。可以用来检测缓存未命中、TLB 未命中等性能事件,间接反映远程内存访问的开销。
  • 自定义计时器: 在代码中测量关键部分的执行时间,比较使用和不使用 NUMA 分配器时的性能差异。
  • 操作系统统计: vmstat -s 可以查看系统的 NUMA 统计信息,如 numa_foreign (远程内存访问) 等。

6. C++ 标准库容器与自定义分配器

自定义分配器通过模板参数与标准库容器无缝集成。

#include <vector>
#include <list>
#include <map>
#include "numa_allocator.hpp" // 或者 numa_pool_allocator.hpp

// 使用 NUMAAllocator 的 std::vector
std::vector<int, NUMAAllocator<int>> my_numa_vector;

// 使用 NUMAAllocator 的 std::list
std::list<double, NUMAAllocator<double>> my_numa_list;

// 使用 NUMAAllocator 的 std::map (键和值都使用分配器分配)
std::map<std::string, MyData, std::less<std::string>, NUMAAllocator<std::pair<const std::string, MyData>>> my_numa_map;

C++17 std::pmr (Polymorphic Memory Resources)

C++17 引入了 std::pmr 命名空间,提供了一套多态内存资源(Polymorphic Memory Resources)的抽象。这使得内存分配策略可以在运行时选择和切换,而不是在编译时通过模板参数确定。

std::pmr 的优势:

  • 类型擦除: 容器不再需要作为模板参数接受分配器类型,而是接受一个指向 std::pmr::memory_resource 接口的指针。
  • 运行时多态: 可以在运行时根据需求(如 NUMA 节点负载、内存大小)动态选择不同的内存分配策略。
  • 简化接口: std::pmr::polymorphic_allocator 是一个通用的分配器,它内部持有一个 memory_resource 指针。

将 NUMA 逻辑集成到 std::pmr:
我们可以实现一个自定义的 std::pmr::memory_resource 派生类,将 libnuma 的分配逻辑封装在其中。

// numa_pmr_resource.hpp
#ifndef NUMA_PMR_RESOURCE_HPP
#define NUMA_PMR_RESOURCE_HPP

#include <memory_resource> // For std::pmr::memory_resource
#include <iostream>
#include <mutex> // For thread safety

#ifdef __linux__
#include <numa.h>
#include <sched.h>
#include <thread>
#endif

// 辅助函数:获取当前线程所在的 NUMA 节点 ID
static int get_current_numa_node_pmr() {
#ifdef __linux__
    if (numa_available() < 0 || numa_num_configured_nodes() < 2) {
        return 0; 
    }
    int cpu_id = sched_getcpu();
    if (cpu_id == -1) {
        return 0; 
    }
    int node_id = numa_node_of_cpu(cpu_id);
    if (node_id == -1) {
        return 0;
    }
    return node_id;
#else
    return 0;
#endif
}

// 线程局部存储,用于缓存当前线程的 NUMA 节点 ID
thread_local int current_thread_numa_node_id_pmr = -1;

static void initialize_thread_numa_node_pmr() {
    if (current_thread_numa_node_id_pmr == -1) {
        current_thread_numa_node_id_pmr = get_current_numa_node_pmr();
        // std::cout << "[PMR Thread " << std::this_thread::get_id() 
        //           << "] Initialized NUMA node for PMR to: " << current_thread_numa_node_id_pmr << std::endl;
    }
}

class NUMAPmrResource : public std::pmr::memory_resource {
private:
    // This example resource directly uses numa_alloc_onnode/numa_free.
    // In a real scenario, it might wrap a NUMA-aware pool.
protected:
    void* do_allocate(std::size_t bytes, std::size_t alignment) override {
        initialize_thread_numa_node_pmr();
        void* p = nullptr;
#ifdef __linux__
        if (numa_available() >= 0 && numa_num_configured_nodes() > 1) {
            int node_id = current_thread_numa_node_id_pmr;
            if (node_id == -1) { // Fallback if TLS was not initialized
                 node_id = get_current_numa_node_pmr();
                 current_thread_numa_node_id_pmr = node_id;
            }

            p = numa_alloc_onnode(bytes, node_id);
            if (!p) {
                std::cerr << "PMR Error: Failed to allocate " << bytes << " bytes on NUMA node " << node_id << ". Fallback to default." << std::endl;
                p = std::pmr::get_default_resource()->allocate(bytes, alignment); // Fallback to default PMR resource
            }
            // std::cout << "[PMR Thread " << std::this_thread::get_id() 
            //           << "] PMR NUMA Alloc: " << bytes << " bytes on node " << node_id << " at " << p << std::endl;
        } else {
            p = std::pmr::get_default_resource()->allocate(bytes, alignment);
        }
#else
        p = std::pmr::get_default_resource()->allocate(bytes, alignment);
#endif
        return p;
    }

    void do_deallocate(void* p, std::size_t bytes, std::size_t alignment) override {
        if (!p) return;
#ifdef __linux__
        if (numa_available() >= 0 && numa_num_configured_nodes() > 1) {
            numa_free(p, bytes);
            // std::cout << "[PMR Thread " << std::this_thread::get_id() 
            //           << "] PMR NUMA Dealloc: " << bytes << " bytes at " << p << std::endl;
        } else {
            std::pmr::get_default_resource()->deallocate(p, bytes, alignment);
        }
#else
        std::pmr::get_default_resource()->deallocate(p, bytes, alignment);
#endif
    }

    bool do_is_equal(const std::pmr::memory_resource& other) const noexcept override {
        return this == &other; // Identity comparison for unique resources
    }

public:
    static NUMAPmrResource& instance() {
        static NUMAPmrResource res;
        return res;
    }
};

#endif // NUMA_PMR_RESOURCE_HPP

使用 NUMAPmrResource:

// main_pmr.cpp
#include "numa_pmr_resource.hpp"
#include <vector>
#include <string>
#include <map>
#include <thread>
#include <chrono>

// 模拟工作负载的结构体
struct MyPmrData {
    int id;
    double value;
    char buffer[64];

    MyPmrData(int i = 0, double v = 0.0) : id(i), value(v) {
        std::iota(std::begin(buffer), std::end(buffer), 0);
    }
};

void numa_pmr_worker(int thread_id, int node_to_bind_to) {
#ifdef __linux__
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(node_to_bind_to, &cpuset); 
    int s = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
    if (s != 0) {
        std::cerr << "Error: pthread_setaffinity_np failed for PMR thread " << thread_id << ", node " << node_to_bind_to << ": " << strerror(s) << std::endl;
    } else {
        std::cout << "PMR Thread " << thread_id << " successfully bound to CPU " << node_to_bind_to << std::endl;
    }
#endif

    std::cout << "PMR Thread " << thread_id << " (NUMA node " << get_current_numa_node_pmr() << ") starting work..." << std::endl;

    // 创建一个多态分配器,使用我们的 NUMAPmrResource
    std::pmr::polymorphic_allocator<MyPmrData> pmr_alloc(&NUMAPmrResource::instance());

    // 使用 std::pmr::vector
    std::pmr::vector<MyPmrData> vec(pmr_alloc);
    vec.reserve(100); 

    for (int i = 0; i < 100; ++i) {
        vec.emplace_back(thread_id * 100 + i, (double)i / 10.0);
    }
    std::cout << "PMR Thread " << thread_id << " (NUMA node " << get_current_numa_node_pmr() << ") vector size: " << vec.size() << std::endl;

    // 模拟一些计算
    double sum_val = 0.0;
    for (const auto& data : vec) {
        sum_val += data.value;
    }
    std::cout << "PMR Thread " << thread_id << " (NUMA node " << get_current_numa_node_pmr() << ") sum_val: " << sum_val << std::endl;

    std::pmr::map<std::string, int> my_map(pmr_alloc);
    my_map["one"] = 1;
    my_map["two"] = 2;
    std::cout << "PMR Thread " << thread_id << " (NUMA node " << get_current_numa_node_pmr() << ") map size: " << my_map.size() << std::endl;

    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    std::cout << "PMR Thread " << thread_id << " finished." << std::endl;
}

int main() {
#ifdef __linux__
    if (numa_available() < 0) {
        std::cerr << "NUMA support not available on this system." << std::endl;
        return 1;
    }
    int num_nodes = numa_num_configured_nodes();
    if (num_nodes < 2) {
        std::cout << "Only one NUMA node configured, PMR NUMA allocator will behave like default allocator." << std::endl;
    }
    std::cout << "System has " << num_nodes << " NUMA nodes." << std::endl;

    std::vector<std::thread> threads;
    for (int i = 0; i < num_nodes; ++i) {
        threads.emplace_back(numa_pmr_worker, i, i); 
    }

    for (auto& t : threads) {
        t.join();
    }
#else
    std::cout << "PMR NUMAAllocator example is primarily for Linux. Running a basic test without NUMA affinity." << std::endl;
    numa_pmr_worker(0, 0); 
#endif
    std::cout << "Main PMR program finished." << std::endl;
    return 0;
}

编译和运行:

g++ -std=c++17 -Wall -Wextra -pthread -lnuma main_pmr.cpp -o numa_pmr_test
./numa_pmr_test

std::pmr 提供了更大的灵活性,尤其适用于需要运行时内存策略调整的复杂系统。

7. 实际案例与注意事项

7.1 何时使用自定义

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注