什么是 ‘NUMA’ (非统一内存访问) 感知的 C++ 编程?如何利用 `libnuma` 优化多路服务器性能?

各位高性能计算的同仁们,大家好!

今天,我们将深入探讨一个在现代多核、多路服务器架构中至关重要的主题:NUMA(Non-Uniform Memory Access,非统一内存访问)感知的 C++ 编程。随着处理器核心数量的爆炸式增长,以及系统内存带宽瓶颈的日益凸显,简单地将所有 CPU 核心视为一个扁平的计算资源池已经不足以应对性能优化的挑战。NUMA 架构正是这一演进的产物,它在带来巨大计算能力的同时,也为软件开发者带来了新的挑战和机遇。

作为一名资深的编程专家,我将带领大家理解 NUMA 的本质,揭示其对 C++ 应用程序性能的影响,并详细讲解如何利用 libnuma 库以及高级编程策略来优化多路服务器上的程序性能。


第一章:NUMA 架构的兴起与核心概念

1.1 从 UMA 到 NUMA:历史的必然

在多核处理器发展早期,我们通常面对的是 UMA(Uniform Memory Access,统一内存访问)架构。在 UMA 系统中,所有 CPU 都能以相同的访问延迟和带宽访问系统中的任何一块内存。这简化了编程模型,开发者无需关心内存的物理位置。

然而,随着处理器核心数量的增加,以及为了满足日益增长的内存需求,单个内存控制器已经无法满足所有核心的并发访问需求。将所有内存都连接到单个总线上,会导致总线争用和带宽瓶颈。为了解决这个问题,处理器制造商引入了 NUMA 架构。

NUMA 架构的核心思想是:将系统内存分割成多个区域,每个区域都直接连接到一个或一组 CPU(通常是一个物理 CPU 插槽,也称为一个 NUMA 节点)。每个 CPU 可以直接访问其本地内存(连接到自己 NUMA 节点的内存),这种访问速度快、延迟低。但是,当 CPU 需要访问非本地内存(连接到其他 NUMA 节点的内存)时,它必须通过特殊的互连总线(如 Intel 的 QPI 或 UPI)进行跨节点通信,这会导致更高的延迟和更低的带宽。

1.2 NUMA 节点与内存距离

在一个典型的 NUMA 系统中,每个物理 CPU 插槽通常对应一个 NUMA 节点。每个节点包含一个或多个 CPU 核心、一个内存控制器以及与之相连的本地内存。节点之间通过高速互连总线通信。

内存距离(Memory Distance) 是衡量一个 CPU 访问不同 NUMA 节点内存的相对开销。操作系统和 libnuma 库通常会提供这些距离信息。例如,访问本地内存的距离通常定义为 10,而访问邻近节点的内存可能是 20,访问更远节点的内存可能是 30。这些数字是相对的,反映了访问延迟的差异。

核心概念总结:

  • NUMA 节点(NUMA Node):一个包含 CPU 核心、内存控制器和本地内存的物理单元。
  • 本地内存(Local Memory):与 CPU 处于同一 NUMA 节点的内存,访问速度最快。
  • 远程内存(Remote Memory):与 CPU 处于不同 NUMA 节点的内存,访问需要跨节点通信,速度较慢。
  • 内存距离(Memory Distance):衡量不同节点间内存访问开销的指标。

1.3 NUMA 对性能的影响

NUMA 架构对应用程序性能的影响是深远的,主要体现在以下几个方面:

  • 内存访问延迟和带宽:访问远程内存会导致显著的延迟增加和带宽下降。如果一个线程频繁地访问远程内存,其性能将远低于访问本地内存的线程。
  • 缓存一致性协议:跨 NUMA 节点访问或修改共享数据时,需要复杂的缓存一致性协议(如 MESI 协议)来确保所有核心看到的数据都是最新的。这会产生额外的通信开销和延迟。
  • 伪共享(False Sharing):在 NUMA 环境下,伪共享的问题更为突出。如果两个位于不同 NUMA 节点上的线程,分别访问同一个缓存行中不相干的数据,那么每次修改都会导致缓存行在节点间来回“弹跳”,从而引发大量的跨节点通信,严重降低性能。

理解这些基本概念是进行 NUMA 感知编程的第一步。


第二章:为什么 C++ 开发者需要关注 NUMA?

C++ 作为一种追求极致性能的语言,其开发者尤其需要关注 NUMA。许多 C++ 应用程序,特别是高性能计算、数据库、网络服务器和金融交易系统,都需要在多路服务器上运行,并充分利用其并行处理能力。然而,如果程序没有 NUMA 感知,可能会面临以下问题:

2.1 默认的内存分配策略:First-Touch

大多数操作系统采用“First-Touch”策略来分配内存。这意味着,当一个线程首次访问(写入或读取)一块通过 malloc、new 或 mmap 分配的内存页时,这块内存页会被分配到该线程当前运行的 NUMA 节点上。

这个策略本身并没有错,但在某些情况下会导致性能问题:

  • 初始化线程与工作线程分离:如果一个线程(例如主线程)分配并初始化了一个大型数据结构,但实际使用该数据结构的是其他线程,并且这些线程位于不同的 NUMA 节点上,那么所有工作线程都将不得不访问远程内存。
  • 数据结构跨节点访问:如果一个数据结构被多个线程共享,而这些线程分布在不同的 NUMA 节点上,那么无论内存被分配到哪个节点,总会有一些线程需要进行远程访问。

示例:First-Touch 问题

假设我们有一个主线程负责分配一个大数组,然后多个工作线程分别处理数组的不同部分。

#include <iostream>
#include <vector>
#include <thread>
#include <numeric>
#include <chrono>

// 模拟一个耗时操作
void do_work(double* data, size_t start_idx, size_t end_idx, int thread_id) {
    long double sum = 0;
    for (size_t i = start_idx; i < end_idx; ++i) {
        sum += data[i];
        // 模拟一些计算,让CPU保持活跃
        if (i % 1000 == 0) {
            std::this_thread::sleep_for(std::chrono::microseconds(1));
        }
    }
    // std::cout << "Thread " << thread_id << " finished, sum part: " << sum << std::endl;
}

int main() {
    const size_t array_size = 1024 * 1024 * 128; // 1GB double array
    const int num_threads = 8; // 假设有8个核心,可能分布在多个NUMA节点

    // 1. 主线程分配并初始化内存 (First-Touch 发生在主线程所在的NUMA节点)
    std::cout << "Allocating and initializing array..." << std::endl;
    double* data = new double[array_size];
    for (size_t i = 0; i < array_size; ++i) {
        data[i] = static_cast<double>(i);
    }
    std::cout << "Array initialized." << std::endl;

    std::vector<std::thread> threads;
    size_t chunk_size = array_size / num_threads;

    auto start_time = std::chrono::high_resolution_clock::now();

    // 2. 工作线程开始处理数据
    for (int i = 0; i < num_threads; ++i) {
        size_t start_idx = i * chunk_size;
        size_t end_idx = (i == num_threads - 1) ? array_size : (i + 1) * chunk_size;
        threads.emplace_back(do_work, data, start_idx, end_idx, i);
    }

    for (auto& t : threads) {
        t.join();
    }

    auto end_time = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> elapsed = end_time - start_time;
    std::cout << "Total time taken: " << elapsed.count() << " seconds." << std::endl;

    delete[] data;
    return 0;
}

在上述代码中,data 数组的内存将主要分配在 main 函数运行的 NUMA 节点上。如果 num_threads 跨越了多个 NUMA 节点,那么那些运行在远程节点的线程将不得不通过互连总线访问 data,导致性能下降。

2.2 伪共享与 NUMA 架构

伪共享在单 NUMA 节点内也存在,但在 NUMA 架构中,其危害更大。当不同 NUMA 节点上的线程修改同一个缓存行时,缓存行需要在这些节点之间进行迁移,每次迁移都会带来较高的延迟。

示例:NUMA 环境下的伪共享

#include <iostream>
#include <vector>
#include <thread>
#include <chrono>
#include <atomic>

// 假设我们有一个共享的计数器数组,每个线程更新自己的计数器
struct alignas(64) Counter {
    std::atomic<long long> value;
    // 填充并按 64 字节对齐,确保每个 Counter 实例独占一个缓存行
    //(假设缓存行为 64 字节),避免相邻计数器之间的伪共享
    char padding[64 - sizeof(std::atomic<long long>)];
};

Counter counters[4]; // 假设4个计数器,由4个线程更新

void increment_counter(int thread_id, int num_iterations) {
    for (int i = 0; i < num_iterations; ++i) {
        counters[thread_id].value.fetch_add(1, std::memory_order_relaxed);
    }
}

int main() {
    const int num_threads = 4;
    const int num_iterations = 1000 * 1000 * 100; // 1亿次

    for (int i = 0; i < num_threads; ++i) {
        counters[i].value = 0;
    }

    std::vector<std::thread> threads;
    auto start_time = std::chrono::high_resolution_clock::now();

    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back(increment_counter, i, num_iterations);
    }

    for (auto& t : threads) {
        t.join();
    }

    auto end_time = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> elapsed = end_time - start_time;
    std::cout << "Total time taken: " << elapsed.count() << " seconds." << std::endl;

    for (int i = 0; i < num_threads; ++i) {
        std::cout << "Counter " << i << ": " << counters[i].value << std::endl;
    }

    return 0;
}

虽然我们加入了 padding 来避免在同一个缓存行内出现多个 Counter 实例,从而减少单 NUMA 节点内的伪共享,但如果这些 Counter 实例分布在内存中,并且其所在的缓存行在不同 NUMA 节点之间被访问,仍然会产生跨节点的数据传输。更根本的解决方案是,让每个线程访问其本地 NUMA 节点上的数据。

这些问题表明,仅仅编写正确的并发代码是不够的,我们还需要考虑数据和线程的物理位置,这就是 NUMA 感知编程的精髓。


第三章:libnuma 库:NUMA 感知编程的基石

libnuma 是一个 Linux 库,它提供了一系列 API,允许应用程序查询 NUMA 系统的拓扑结构,并控制内存分配策略和进程/线程的 CPU 亲和性。它是进行 NUMA 优化不可或缺的工具。

3.1 libnuma 的安装与链接

在大多数基于 Debian/Ubuntu 的系统上,可以通过以下命令安装:

sudo apt-get install libnuma-dev

在基于 Red Hat/CentOS 的系统上:

sudo yum install numactl-devel

编译 C++ 程序时,需要链接 libnuma:

g++ your_program.cpp -o your_program -lnuma -pthread
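如果项目使用 CMake,也可以用类似下面的方式查找并链接 libnuma(target 名 your_program 仅为示意,变量名为 CMake 惯例写法):

```cmake
cmake_minimum_required(VERSION 3.10)
project(numa_demo CXX)

find_package(Threads REQUIRED)
find_library(NUMA_LIBRARY numa)        # 查找 libnuma.so
find_path(NUMA_INCLUDE_DIR numa.h)     # 查找 numa.h

add_executable(your_program your_program.cpp)
target_include_directories(your_program PRIVATE ${NUMA_INCLUDE_DIR})
target_link_libraries(your_program PRIVATE ${NUMA_LIBRARY} Threads::Threads)
```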

3.2 核心 libnuma 函数概览

libnuma 提供了丰富的函数来管理 NUMA 资源。以下是一些最常用的函数及其用途:

| 函数类别 | 函数名称 | 描述 |
| --- | --- | --- |
| 拓扑查询 | numa_available | 检查系统是否支持 NUMA,不支持时返回 -1 |
| 拓扑查询 | numa_num_configured_nodes / numa_max_node | 获取已配置的 NUMA 节点数量 / 最大节点编号 |
| 拓扑查询 | numa_distance | 查询两个节点之间的内存距离 |
| 拓扑查询 | numa_node_of_cpu | 查询某个 CPU 核心所属的 NUMA 节点 |
| 内存分配 | numa_alloc_onnode / numa_alloc_local | 在指定节点 / 当前节点上分配内存 |
| 内存分配 | numa_alloc_interleaved | 在各节点之间交错分配内存 |
| 内存分配 | numa_free | 释放由 numa_alloc_* 系列函数分配的内存 |
| 策略控制 | numa_set_membind / numa_set_preferred | 限制 / 优先在指定节点上分配内存 |
| 线程绑定 | numa_run_on_node / numa_run_on_node_mask | 将当前线程绑定到指定节点(集合)的 CPU 上 |

下面是一个完整的示例程序:它先查询并打印 NUMA 拓扑信息,然后对比三种内存放置策略(First-Touch、逐节点本地分配、交错分配)处理同一数组的耗时。

```cpp
#include <chrono>
#include <cstddef>
#include <cstdlib>
#include <iostream>
#include <thread>
#include <vector>

// NUMA 相关头文件
#ifdef __linux__
#include <numa.h>
#include <numaif.h>   // get_mempolicy
#include <sched.h>    // sched_setaffinity, sched_getcpu
#else
// 非 Linux 或未安装 libnuma 时的空实现:代码仍可编译运行,
// 但没有真正的 NUMA 功能,分配退化为普通 malloc/free
#define numa_available() (-1)
#define numa_num_configured_nodes() (1)
#define numa_max_node() (0)
#define numa_alloc_onnode(size, node) (malloc(size))
#define numa_alloc_interleaved(size) (malloc(size))
#define numa_free(ptr, size) (free(ptr))
#endif // __linux__

// 查询当前线程所在的 NUMA 节点
int get_current_numa_node() {
#ifdef __linux__
    if (numa_available() == -1) return -1;          // NUMA 不可用
    int cpu = sched_getcpu();
    return (cpu == -1) ? -1 : numa_node_of_cpu(cpu);
#else
    return 0;
#endif
}

// 工作线程的描述信息
struct WorkerData {
    double* data_ptr = nullptr;
    size_t start_idx = 0;
    size_t end_idx = 0;
    int thread_id = 0;
    int node_id = 0;   // 线程被指派的 NUMA 节点
};

// 工作线程:先把自己绑定到指派的节点,再遍历数据求和
void process_data_worker(WorkerData& w) {
#ifdef __linux__
    if (numa_available() != -1) {
        // 取得目标节点的 CPU 集合(numa_node_to_cpus 填充调用者提供的位图)
        struct bitmask* node_cpus = numa_allocate_cpumask();
        if (node_cpus && numa_node_to_cpus(w.node_id, node_cpus) == 0) {
            cpu_set_t cpuset;
            CPU_ZERO(&cpuset);
            for (unsigned i = 0; i < node_cpus->size; ++i)
                if (numa_bitmask_isbitset(node_cpus, i)) CPU_SET(i, &cpuset);
            if (sched_setaffinity(0, sizeof(cpu_set_t), &cpuset) != 0)
                std::cerr << "Warning: failed to bind thread " << w.thread_id
                          << " to node " << w.node_id << std::endl;
        }
        if (node_cpus) numa_free_cpumask(node_cpus);
    }
#endif
    volatile double sum = 0;   // volatile 防止求和循环被整体优化掉
    for (size_t i = w.start_idx; i < w.end_idx; ++i) sum += w.data_ptr[i];
}

// 查询并打印某个地址当前生效的内存分配策略
void print_memory_policy(void* addr) {
#ifdef __linux__
    int mode = 0;
    if (get_mempolicy(&mode, nullptr, 0, addr, MPOL_F_ADDR) == 0) {
        std::cout << "  Memory policy for " << addr << ": ";
        switch (mode) {
            case MPOL_DEFAULT:    std::cout << "DEFAULT"; break;
            case MPOL_BIND:       std::cout << "BIND"; break;
            case MPOL_INTERLEAVE: std::cout << "INTERLEAVE"; break;
            case MPOL_PREFERRED:  std::cout << "PREFERRED"; break;
            default:              std::cout << "UNKNOWN (" << mode << ")"; break;
        }
        std::cout << std::endl;
    } else {
        std::cerr << "  Failed to get memory policy for " << addr << std::endl;
    }
#else
    (void)addr;
    std::cout << "  Memory policy check not available on this system." << std::endl;
#endif
}

// 启动工作线程处理给定的数据块集合,返回总耗时(秒)
double run_workers(std::vector<WorkerData>& workers) {
    std::vector<std::thread> threads;
    auto t0 = std::chrono::high_resolution_clock::now();
    for (auto& w : workers) threads.emplace_back(process_data_worker, std::ref(w));
    for (auto& t : threads) t.join();
    return std::chrono::duration<double>(
        std::chrono::high_resolution_clock::now() - t0).count();
}

int main() {
    std::cout << "--- NUMA System Information ---" << std::endl;
    if (numa_available() == -1) {
        std::cout << "NUMA support is not available (or libnuma is not linked)." << std::endl;
        return 1;
    }
    const int num_nodes = numa_num_configured_nodes();
    std::cout << "Configured NUMA nodes: " << num_nodes << std::endl;
    std::cout << "Max NUMA node ID: " << numa_max_node() << std::endl;

#ifdef __linux__
    std::cout << "NUMA distances:" << std::endl;
    for (int i = 0; i < num_nodes; ++i)
        for (int j = 0; j < num_nodes; ++j)
            std::cout << "  Node " << i << " -> Node " << j << ": "
                      << numa_distance(i, j) << std::endl;

    std::cout << "CPU to Node mapping:" << std::endl;
    const int max_cpu = numa_num_configured_cpus();
    for (int n = 0; n < num_nodes; ++n) {
        struct bitmask* cpus = numa_allocate_cpumask();
        if (cpus && numa_node_to_cpus(n, cpus) == 0) {
            std::cout << "  Node " << n << " CPUs: [";
            bool first = true;
            for (int c = 0; c < max_cpu; ++c)
                if (numa_bitmask_isbitset(cpus, c)) {
                    if (!first) std::cout << ",";
                    std::cout << c;
                    first = false;
                }
            std::cout << "]" << std::endl;
        }
        if (cpus) numa_free_cpumask(cpus);
    }
#endif
    std::cout << std::endl;

    // --- 参数配置 ---
    const size_t array_size = 1024UL * 1024 * 512;  // 512M 个 double,约 4GB
    const unsigned hc = std::thread::hardware_concurrency();
    const int num_threads = hc ? static_cast<int>(hc) : 4;
    const size_t chunk = array_size / num_threads;
    std::cout << "Array size: " << array_size << " elements ("
              << (array_size * sizeof(double)) / (1024.0 * 1024.0 * 1024.0)
              << " GB), threads: " << num_threads << std::endl;
    if (num_threads < num_nodes)
        std::cout << "Warning: fewer threads (" << num_threads << ") than NUMA nodes ("
                  << num_nodes << "); some nodes may stay idle." << std::endl;

    double elapsed_non_numa = 0, elapsed_local = 0, elapsed_interleaved = 0;

    // --- 场景 1:非 NUMA 感知(First-Touch 全部落在主线程所在节点)---
    std::cout << "\n--- Scenario 1: Non-NUMA Aware (First-Touch) ---" << std::endl;
    {
        double* data = new double[array_size];
        std::cout << "  Initializing on node " << get_current_numa_node()
                  << " (first-touch)..." << std::endl;
        for (size_t i = 0; i < array_size; ++i) data[i] = double(i);
        print_memory_policy(data);

        std::vector<WorkerData> workers(num_threads);
        for (int i = 0; i < num_threads; ++i)
            workers[i] = {data, i * chunk,
                          (i == num_threads - 1) ? array_size : (i + 1) * chunk,
                          i, i % num_nodes};       // 轮转指派节点
        elapsed_non_numa = run_workers(workers);
        std::cout << "  Total time: " << elapsed_non_numa << " s" << std::endl;
        delete[] data;
    }

    // --- 场景 2:NUMA 感知(本地分配 + 线程亲和性)---
    std::cout << "\n--- Scenario 2: NUMA Aware (Local Allocation + Affinity) ---" << std::endl;
    if (num_nodes == 1) {
        std::cout << "  Skipped: only one NUMA node." << std::endl;
    } else {
        std::vector<double*> parts(num_threads, nullptr);
        std::vector<WorkerData> workers(num_threads);
        bool ok = true;
        for (int i = 0; i < num_threads && ok; ++i) {
            const int node = i % num_nodes;
            parts[i] = static_cast<double*>(
                numa_alloc_onnode(chunk * sizeof(double), node));
            if (!parts[i]) {
                std::cerr << "  numa_alloc_onnode failed on node " << node << std::endl;
                ok = false;
                break;
            }
            // 初始化本地块;由于 numa_alloc_onnode,first-touch 落在正确节点
            for (size_t j = 0; j < chunk; ++j)
                parts[i][j] = double(i * chunk + j);
            workers[i] = {parts[i], 0, chunk, i, node};
        }
        if (ok) {
            elapsed_local = run_workers(workers);
            std::cout << "  Total time: " << elapsed_local << " s" << std::endl;
        }
        for (int i = 0; i < num_threads; ++i)
            if (parts[i]) numa_free(parts[i], chunk * sizeof(double));
    }

    // --- 场景 3:NUMA 感知(交错分配 + 线程亲和性)---
    std::cout << "\n--- Scenario 3: NUMA Aware (Interleaved Allocation + Affinity) ---" << std::endl;
    if (num_nodes == 1) {
        std::cout << "  Skipped: only one NUMA node." << std::endl;
    } else {
        double* data = static_cast<double*>(
            numa_alloc_interleaved(array_size * sizeof(double)));
        if (!data) {
            std::cerr << "  numa_alloc_interleaved failed." << std::endl;
        } else {
            for (size_t i = 0; i < array_size; ++i) data[i] = double(i);
            print_memory_policy(data);

            std::vector<WorkerData> workers(num_threads);
            for (int i = 0; i < num_threads; ++i)
                workers[i] = {data, i * chunk,
                              (i == num_threads - 1) ? array_size : (i + 1) * chunk,
                              i, i % num_nodes};   // 仍然绑定线程以均衡负载
            elapsed_interleaved = run_workers(workers);
            std::cout << "  Total time: " << elapsed_interleaved << " s" << std::endl;
            numa_free(data, array_size * sizeof(double));
        }
    }

    std::cout << "\n--- Comparison ---" << std::endl;
    std::cout << "Non-NUMA Aware:        " << elapsed_non_numa << " s" << std::endl;
    if (num_nodes > 1) {
        std::cout << "NUMA Local Allocation: " << elapsed_local << " s" << std::endl;
        std::cout << "NUMA Interleaved:      " << elapsed_interleaved << " s" << std::endl;
    }
    return 0;
}
```

编译时同样需要链接 `-lnuma -pthread`(参见上文 3.1 节)。


### 第四章:NUMA 感知的 C++ 编程策略

NUMA 感知编程的核心原则是:**让数据与处理它的线程尽可能地位于同一个 NUMA 节点上。** 这通常通过以下几种策略实现:

#### 4.1 内存分配策略

`libnuma` 提供了多种内存分配策略,可以覆盖默认的 First-Touch 行为。

##### 4.1.1 显式本地分配 (`numa_alloc_onnode`, `numa_alloc_local`)

这是最直接的 NUMA 优化方式,适用于数据主要由特定 NUMA 节点上的线程访问的场景。

*   `void* numa_alloc_onnode(size_t size, int node)`:在指定的 `node` 上分配 `size` 字节的内存。
*   `void* numa_alloc_local(size_t size)`:在当前线程所在的 NUMA 节点上分配 `size` 字节的内存。

**适用场景**:
*   **分片数据结构**:如果一个大型数据结构可以被逻辑地划分为多个独立的部分,并且每个部分主要由一个或一组线程处理,那么可以将每个部分分配到这些线程所在的 NUMA 节点上。
*   **每线程数据**:为每个工作线程分配的私有缓冲区、栈空间或数据结构,应尽可能地分配到该线程的本地 NUMA 节点。

**示例:本地内存分配**

```cpp
#include <numa.h> // Assuming libnuma is installed and linked

// ... (main function and other includes)

void allocate_on_node_example(int node_id) {
    if (numa_available() == -1) {
        std::cerr << "NUMA support not available." << std::endl;
        return;
    }

    size_t buffer_size = 1024 * 1024; // 1MB
    int* buffer = static_cast<int*>(numa_alloc_onnode(buffer_size * sizeof(int), node_id));
    if (!buffer) {
        std::cerr << "Failed to allocate buffer on NUMA node " << node_id << std::endl;
        return;
    }

    std::cout << "Allocated " << buffer_size * sizeof(int) << " bytes on NUMA node " << node_id << std::endl;
    // Initialize buffer, first-touch will be on node_id
    for (size_t i = 0; i < buffer_size; ++i) {
        buffer[i] = i;
    }

    // After use, free the memory
    numa_free(buffer, buffer_size * sizeof(int));
    std::cout << "Freed buffer from NUMA node " << node_id << std::endl;
}
```
##### 4.1.2 交错式分配 (numa_alloc_interleaved, numa_set_interleave_mask)

当数据被多个 NUMA 节点上的线程均匀访问时,或者无法有效地将数据划分为本地部分时,交错式分配是一个好的选择。它将内存页轮流分配到指定的 NUMA 节点集合中。

  • void* numa_alloc_interleaved(size_t size):在所有可用 NUMA 节点上交错分配 size 字节的内存。
  • void numa_set_interleave_mask(struct bitmask *nodemask):为当前线程设置交错式内存分配策略。此后新分配并被首次访问的内存页(包括经由 malloc 或 new 获得的)将轮流落在 nodemask 指定的节点上,直到策略被更改。

适用场景

  • 共享队列:多个生产者和消费者线程可能分布在不同的节点上,共享一个队列。
  • 全局只读数据:大型只读查找表,被所有节点上的线程频繁访问。交错分配可以使每个节点都能从本地或近端获取部分数据。
  • 难以分区的数据:数据访问模式复杂,难以有效分区的共享数据。

示例:交错式内存分配

#include <numa.h> // Assuming libnuma is installed and linked

// ... (main function and other includes)

void allocate_interleaved_example() {
    if (numa_available() == -1) {
        std::cerr << "NUMA support not available." << std::endl;
        return;
    }

    size_t buffer_size = 1024 * 1024 * 16; // 16MB
    // Allocate interleaved memory across all available NUMA nodes
    int* buffer = static_cast<int*>(numa_alloc_interleaved(buffer_size * sizeof(int)));
    if (!buffer) {
        std::cerr << "Failed to allocate interleaved buffer." << std::endl;
        return;
    }

    std::cout << "Allocated " << buffer_size * sizeof(int) << " bytes with interleaved policy." << std::endl;
    // Initialize buffer
    for (size_t i = 0; i < buffer_size; ++i) {
        buffer[i] = i;
    }

    numa_free(buffer, buffer_size * sizeof(int));
    std::cout << "Freed interleaved buffer." << std::endl;
}
##### 4.1.3 内存绑定 (numa_set_membind) 和首选节点 (numa_set_preferred)

这两种策略影响一个进程或线程的默认内存分配行为。

  • void numa_set_membind(struct bitmask *nodemask):将当前进程/线程的内存分配限制在 nodemask 指定的节点集合中。如果这些节点内存不足,分配将失败。
  • void numa_set_preferred(int node):设置当前进程/线程优先从指定 node 分配内存。如果该节点内存不足,会尝试从其他节点分配(软策略)。

适用场景

  • numa_set_membind:用于严格控制内存位置的场景,例如将一个数据库实例的所有内存都限制在特定的 NUMA 节点上。
  • numa_set_preferred:提供一个性能优化的提示,同时保持灵活性,例如大多数数据应在某个节点,但允许在必要时溢出到其他节点。

#### 4.2 线程亲和性(Thread Affinity)

仅仅控制内存分配是不够的,还需要将处理数据的线程绑定到数据所在的 NUMA 节点上。这称为线程亲和性或 CPU 绑定。

  • int numa_run_on_node(int node):尝试将当前线程绑定到指定 node 上的任何一个 CPU。
  • int numa_run_on_node_mask(struct bitmask *nodemask):将当前线程限制在 nodemask 所指定的一组 NUMA 节点的 CPU 上运行。

除了 libnuma 提供的函数,还可以直接使用 POSIX 的 sched_setaffinity 函数来设置 CPU 亲和性。

示例:线程亲和性设置

#include <numa.h>
#include <sched.h>  // For sched_setaffinity
#include <chrono>
#include <iostream>
#include <thread>
#include <vector>

// ... (main function and other includes)

void worker_thread_with_affinity(int thread_id, int target_node) {
    if (numa_available() == -1) {
        std::cerr << "NUMA support not available for thread " << thread_id << std::endl;
        return;
    }

    // 取得目标节点上的 CPU 集合(numa_node_to_cpus 填充调用者提供的位图)
    struct bitmask* cpus = numa_allocate_cpumask();
    if (!cpus || numa_node_to_cpus(target_node, cpus) != 0) {
        std::cerr << "Failed to get CPUs for node " << target_node << std::endl;
        if (cpus) numa_free_cpumask(cpus);
        return;
    }

    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    for (unsigned i = 0; i < cpus->size; ++i) {
        if (numa_bitmask_isbitset(cpus, i)) {
            CPU_SET(i, &cpuset);
        }
    }
    numa_free_cpumask(cpus);

    // Set affinity
    if (sched_setaffinity(0, sizeof(cpu_set_t), &cpuset) == -1) {
        std::cerr << "Failed to set affinity for thread " << thread_id << " to node " << target_node << std::endl;
        return;
    }

    std::cout << "Thread " << thread_id << " now running on NUMA node " << numa_node_of_cpu(sched_getcpu()) << std::endl;
    // Perform computations...
    std::this_thread::sleep_for(std::chrono::seconds(1)); // Simulate work
}

void affinity_example() {
    if (numa_available() == -1) {
        std::cerr << "NUMA support not available." << std::endl;
        return;
    }

    int num_nodes = numa_num_configured_nodes();
    std::vector<std::thread> threads;

    for (int i = 0; i < num_nodes; ++i) {
        threads.emplace_back(worker_thread_with_affinity, i, i); // Bind thread i to node i
    }

    for (auto& t : threads) {
        t.join();
    }
}

线程亲和性与内存分配的协同作用
当一个线程被绑定到特定的 NUMA 节点时,如果它使用 numa_alloc_local 或默认的 First-Touch 策略分配内存,那么内存将很大概率分配到该节点。这是 NUMA 优化中最强大的组合。

#### 4.3 数据结构设计

NUMA 感知不仅是关于 API 调用,更是关于如何从根本上设计应用程序的数据结构和工作流。

##### 4.3.1 按 NUMA 节点分区数据

将大型共享数据结构(如哈希表、队列、树)分解为多个子结构,每个子结构都分配给一个特定的 NUMA 节点,并由该节点上的线程负责管理和访问。

  • 示例:分区的哈希表
    一个全局哈希表可以被设计为 std::vector<std::unique_ptr<HashTableNode>> tables_per_node;,其中每个 HashTableNode 及其数据都通过 numa_alloc_onnode 分配到不同的 NUMA 节点上。当线程需要访问哈希表时,它根据某种哈希或分区逻辑,确定应访问哪个 NUMA 节点上的子表。
##### 4.3.2 复制只读数据

对于频繁访问的只读数据,如果其大小不是特别巨大,可以考虑在每个 NUMA 节点上复制一份。这消除了跨节点访问的延迟。

##### 4.3.3 避免跨节点伪共享

伪共享在 NUMA 环境下会造成巨大的性能损失。除了经典的缓存行填充技术,更重要的是要确保不同 NUMA 节点上的线程不会频繁地访问同一个缓存行。这通常通过严格的数据分区来实现。

#### 4.4 NUMA 感知的生产者-消费者模式

考虑一个典型的生产者-消费者模式。一个非 NUMA 感知的实现可能会导致所有生产者将数据写入一个全局队列,而所有消费者从该队列读取,如果队列内存集中在某个 NUMA 节点,那么远程节点上的线程将遭受性能损失。

NUMA 感知优化方案

  1. 多队列设计:为每个 NUMA 节点创建一个独立的队列。生产者将数据写入其本地节点的队列,消费者从其本地节点的队列中读取。
  2. 跨节点窃取(Work Stealing):当一个节点的本地队列为空时,消费者可以尝试从其他 NUMA 节点的队列中“窃取”任务。这可以提高负载均衡,但要小心设计窃取策略,避免频繁的远程访问。
  3. 交错队列:如果数据无法有效分区,或者生产/消费负载不均衡,可以考虑使用一个通过 numa_alloc_interleaved 分配的共享队列。虽然仍可能存在远程访问,但交错分配可以使内存访问分布更均匀,减少单个节点的瓶颈。

示例:NUMA 感知多队列

#include <iostream>
#include <vector>
#include <thread>
#include <queue>
#include <memory>  // std::unique_ptr
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <chrono>

#ifdef __linux__
#include <numa.h>
#include <sched.h>
#endif

// ... (Helper functions like set_thread_affinity_to_node, get_current_numa_node from previous examples)

template<typename T>
class NumaAwareQueue {
public:
    NumaAwareQueue() : _stop(false) {}

    void push(T val) {
        std::unique_lock<std::mutex> lock(_mtx);
        _queue.push(val);
        _cv.notify_one();
    }

    bool pop(T& val) {
        std::unique_lock<std::mutex> lock(_mtx);
        _cv.wait(lock, [this]{ return !_queue.empty() || _stop.load(); });
        if (_stop.load() && _queue.empty()) {
            return false;
        }
        val = _queue.front();
        _queue.pop();
        return true;
    }

    void stop() {
        _stop.store(true);
        _cv.notify_all();
    }

private:
    std::queue<T> _queue;
    std::mutex _mtx;
    std::condition_variable _cv;
    std::atomic<bool> _stop;
};

// Global collection of per-NUMA-node queues
std::vector<std::unique_ptr<NumaAwareQueue<int>>> global_queues;

void producer_task(int producer_id, int num_items, int target_node) {
#ifdef __linux__
    // Set thread affinity for producer
    // set_thread_affinity_to_node(target_node); // Assuming this helper exists
#endif

    // std::cout << "Producer " << producer_id << " on node " << get_current_numa_node() 
    //           << " producing to queue on node " << target_node << std::endl;

    for (int i = 0; i < num_items; ++i) {
        global_queues[target_node]->push(producer_id * 1000 + i);
    }
}

void consumer_task(int consumer_id, int target_node, std::atomic<long long>& total_sum) {
#ifdef __linux__
    // Set thread affinity for consumer
    // set_thread_affinity_to_node(target_node); // Assuming this helper exists
#endif

    // std::cout << "Consumer " << consumer_id << " on node " << get_current_numa_node() 
    //           << " consuming from queue on node " << target_node << std::endl;

    int val;
    long long local_sum = 0;
    while (global_queues[target_node]->pop(val)) {
        local_sum += val;
    }
    total_sum.fetch_add(local_sum, std::memory_order_relaxed);
}

// In main:
/*
    int num_nodes = numa_num_configured_nodes();
    int items_per_producer = 100000;
    int num_producers = num_nodes; // One producer per node
    int num_consumers = num_nodes; // One consumer per node

    for (int i = 0; i < num_nodes; ++i) {
        global_queues.emplace_back(std::make_unique<NumaAwareQueue<int>>());
    }

    std::vector<std::thread> producer_threads;
    std::vector<std::thread> consumer_threads;
    std::atomic<long long> total_sum_val(0);

    auto start_time = std::chrono::high_resolution_clock::now();

    for (int i = 0; i < num_producers; ++i) {
        producer_threads.emplace_back(producer_task, i, items_per_producer, i % num_nodes);
    }

    for (int i = 0; i < num_consumers; ++i) {
        consumer_threads.emplace_back(consumer_task, i, i % num_nodes, std::ref(total_sum_val));
    }

    for (auto& t : producer_threads) {
        t.join();
    }
    for (int i = 0; i < num_nodes; ++i) {
        global_queues[i]->stop(); // Signal consumers to stop
    }
    for (auto& t : consumer_threads) {
        t.join();
    }

    auto end_time = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> elapsed = end_time - start_time;
    std::cout << "Total time (NUMA Aware P-C): " << elapsed.count() << " seconds." << std::endl;
    std::cout << "Total sum: " << total_sum_val << std::endl;
*/

上述代码片段展示了 NUMA 感知多队列的骨架。每个 NUMA 节点拥有自己的队列,生产者和消费者都尝试在其本地节点上操作。线程亲和性(注释掉的部分)将进一步确保生产者和消费者运行在它们操作的队列所在的 NUMA 节点上,从而最大化本地内存访问。

### 第五章:NUMA 优化工具与最佳实践

仅仅了解 libnuma API 是不够的,还需要结合工具进行分析和调试。

#### 5.1 诊断工具

  • numactl:与 libnuma 配套的命令行工具。
    • numactl --hardware:显示 NUMA 节点的拓扑、CPU 核心分布和内存大小。
    • numactl --show:显示当前进程的 NUMA 策略设置(分配策略、绑定的节点与 CPU)。
    • numactl --cpunodebind=0 --membind=0 ./app:将程序限制在节点 0 的 CPU 和内存上运行,无需修改代码即可进行 NUMA 实验。
