各位高性能计算的同仁们,大家好!
今天,我们将深入探讨一个在现代多核、多路服务器架构中至关重要的主题:NUMA(Non-Uniform Memory Access,非统一内存访问)感知的 C++ 编程。随着处理器核心数量的爆炸式增长,以及系统内存带宽瓶颈的日益凸显,简单地将所有 CPU 核心视为一个扁平的计算资源池已经不足以应对性能优化的挑战。NUMA 架构正是这一演进的产物,它在带来巨大计算能力的同时,也为软件开发者带来了新的挑战和机遇。
作为一名资深的编程专家,我将带领大家理解 NUMA 的本质,揭示其对 C++ 应用程序性能的影响,并详细讲解如何利用 libnuma 库以及高级编程策略来优化多路服务器上的程序性能。
第一章:NUMA 架构的兴起与核心概念
1.1 从 UMA 到 NUMA:历史的必然
在多核处理器发展早期,我们通常面对的是 UMA(Uniform Memory Access,统一内存访问)架构。在 UMA 系统中,所有 CPU 都能以相同的访问延迟和带宽访问系统中的任何一块内存。这简化了编程模型,开发者无需关心内存的物理位置。
然而,随着处理器核心数量的增加,以及为了满足日益增长的内存需求,单个内存控制器已经无法满足所有核心的并发访问需求。将所有内存都连接到单个总线上,会导致总线争用和带宽瓶颈。为了解决这个问题,处理器制造商引入了 NUMA 架构。
NUMA 架构的核心思想是:将系统内存分割成多个区域,每个区域都直接连接到一个或一组 CPU(通常是一个物理 CPU 插槽,也称为一个 NUMA 节点)。每个 CPU 可以直接访问其本地内存(连接到自己 NUMA 节点的内存),这种访问速度快、延迟低。但是,当 CPU 需要访问非本地内存(连接到其他 NUMA 节点的内存)时,它必须通过特殊的互连总线(如 Intel 的 QPI 或 UPI)进行跨节点通信,这会导致更高的延迟和更低的带宽。
1.2 NUMA 节点与内存距离
在一个典型的 NUMA 系统中,每个物理 CPU 插槽通常对应一个 NUMA 节点。每个节点包含一个或多个 CPU 核心、一个内存控制器以及与之相连的本地内存。节点之间通过高速互连总线通信。
内存距离(Memory Distance) 是衡量一个 CPU 访问不同 NUMA 节点内存的相对开销。操作系统和 libnuma 库通常会提供这些距离信息。例如,访问本地内存的距离通常定义为 10,而访问邻近节点的内存可能是 20,访问更远节点的内存可能是 30。这些数字是相对的,反映了访问延迟的差异。
核心概念总结:
- NUMA 节点(NUMA Node):一个包含 CPU 核心、内存控制器和本地内存的物理单元。
- 本地内存(Local Memory):与 CPU 处于同一 NUMA 节点的内存,访问速度最快。
- 远程内存(Remote Memory):与 CPU 处于不同 NUMA 节点的内存,访问需要跨节点通信,速度较慢。
- 内存距离(Memory Distance):衡量不同节点间内存访问开销的指标。
1.3 NUMA 对性能的影响
NUMA 架构对应用程序性能的影响是深远的,主要体现在以下几个方面:
- 内存访问延迟和带宽:访问远程内存会导致显著的延迟增加和带宽下降。如果一个线程频繁地访问远程内存,其性能将远低于访问本地内存的线程。
- 缓存一致性协议:跨 NUMA 节点访问或修改共享数据时,需要复杂的缓存一致性协议(如 MESI 协议)来确保所有核心看到的数据都是最新的。这会产生额外的通信开销和延迟。
- 伪共享(False Sharing):在 NUMA 环境下,伪共享的问题更为突出。如果两个位于不同 NUMA 节点上的线程,分别访问同一个缓存行中不相干的数据,那么每次修改都会导致缓存行在节点间来回“弹跳”,从而引发大量的跨节点通信,严重降低性能。
理解这些基本概念是进行 NUMA 感知编程的第一步。
第二章:为什么 C++ 开发者需要关注 NUMA?
C++ 作为一种追求极致性能的语言,其开发者尤其需要关注 NUMA。许多 C++ 应用程序,特别是高性能计算、数据库、网络服务器和金融交易系统,都需要在多路服务器上运行,并充分利用其并行处理能力。然而,如果程序没有 NUMA 感知,可能会面临以下问题:
2.1 默认的内存分配策略:First-Touch
大多数操作系统采用“First-Touch”策略来分配内存。这意味着,当一个线程首次访问(写入或读取)一块通过 malloc、new 或 mmap 分配的内存页时,这块内存页会被分配到该线程当前运行的 NUMA 节点上。
这个策略本身并没有错,但在某些情况下会导致性能问题:
- 初始化线程与工作线程分离:如果一个线程(例如主线程)分配并初始化了一个大型数据结构,但实际使用该数据结构的是其他线程,并且这些线程位于不同的 NUMA 节点上,那么所有工作线程都将不得不访问远程内存。
- 数据结构跨节点访问:如果一个数据结构被多个线程共享,而这些线程分布在不同的 NUMA 节点上,那么无论内存被分配到哪个节点,总会有一些线程需要进行远程访问。
示例:First-Touch 问题
假设我们有一个主线程负责分配一个大数组,然后多个工作线程分别处理数组的不同部分。
#include <iostream>
#include <vector>
#include <thread>
#include <numeric>
#include <chrono>
// 模拟一个耗时操作
void do_work(double* data, size_t start_idx, size_t end_idx, int thread_id) {
long double sum = 0;
for (size_t i = start_idx; i < end_idx; ++i) {
sum += data[i];
// 模拟一些计算,让CPU保持活跃
if (i % 1000 == 0) {
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
}
// std::cout << "Thread " << thread_id << " finished, sum part: " << sum << std::endl;
}
int main() {
const size_t array_size = 1024 * 1024 * 128; // 1GB double array
const int num_threads = 8; // 假设有8个核心,可能分布在多个NUMA节点
// 1. 主线程分配并初始化内存 (First-Touch 发生在主线程所在的NUMA节点)
std::cout << "Allocating and initializing array..." << std::endl;
double* data = new double[array_size];
for (size_t i = 0; i < array_size; ++i) {
data[i] = static_cast<double>(i);
}
std::cout << "Array initialized." << std::endl;
std::vector<std::thread> threads;
size_t chunk_size = array_size / num_threads;
auto start_time = std::chrono::high_resolution_clock::now();
// 2. 工作线程开始处理数据
for (int i = 0; i < num_threads; ++i) {
size_t start_idx = i * chunk_size;
size_t end_idx = (i == num_threads - 1) ? array_size : (i + 1) * chunk_size;
threads.emplace_back(do_work, data, start_idx, end_idx, i);
}
for (auto& t : threads) {
t.join();
}
auto end_time = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed = end_time - start_time;
std::cout << "Total time taken: " << elapsed.count() << " seconds." << std::endl;
delete[] data;
return 0;
}
在上述代码中,data 数组的内存将主要分配在 main 函数运行的 NUMA 节点上。如果 num_threads 跨越了多个 NUMA 节点,那么那些运行在远程节点的线程将不得不通过互连总线访问 data,导致性能下降。
2.2 伪共享与 NUMA 架构
伪共享在单 NUMA 节点内也存在,但在 NUMA 架构中,其危害更大。当不同 NUMA 节点上的线程修改同一个缓存行时,缓存行需要在这些节点之间进行迁移,每次迁移都会带来较高的延迟。
示例:NUMA 环境下的伪共享
#include <iostream>
#include <vector>
#include <thread>
#include <chrono>
#include <atomic>
// 假设我们有一个共享的计数器数组,每个线程更新自己的计数器
struct alignas(64) Counter {
std::atomic<long long> value;
// 填充并对齐到一个完整的缓存行,避免相邻 Counter 落入同一缓存行而产生伪共享;
// 但填充无法解决数据被放置在远程节点上的问题
char padding[64 - sizeof(std::atomic<long long>)];
};
Counter counters[4]; // 假设4个计数器,由4个线程更新
void increment_counter(int thread_id, int num_iterations) {
for (int i = 0; i < num_iterations; ++i) {
counters[thread_id].value.fetch_add(1, std::memory_order_relaxed);
}
}
int main() {
const int num_threads = 4;
const int num_iterations = 1000 * 1000 * 100; // 1亿次
for (int i = 0; i < num_threads; ++i) {
counters[i].value = 0;
}
std::vector<std::thread> threads;
auto start_time = std::chrono::high_resolution_clock::now();
for (int i = 0; i < num_threads; ++i) {
threads.emplace_back(increment_counter, i, num_iterations);
}
for (auto& t : threads) {
t.join();
}
auto end_time = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed = end_time - start_time;
std::cout << "Total time taken: " << elapsed.count() << " seconds." << std::endl;
for (int i = 0; i < num_threads; ++i) {
std::cout << "Counter " << i << ": " << counters[i].value << std::endl;
}
return 0;
}
加入 padding(或 alignas(64))之后,每个 Counter 独占一个缓存行,伪共享本身已被消除,跨节点亦然。但这并不能解决数据放置问题:整个 counters 数组仍按 First-Touch 策略分配在初始化它的线程(通常是主线程)所在的节点上,运行在其他节点的线程依然要为每次 fetch_add 付出远程访问的代价。更根本的解决方案是,让每个线程访问其本地 NUMA 节点上的数据。
这些问题表明,仅仅编写正确的并发代码是不够的,我们还需要考虑数据和线程的物理位置,这就是 NUMA 感知编程的精髓。
第三章:libnuma 库:NUMA 感知编程的基石
libnuma 是一个 Linux 库,它提供了一系列 API,允许应用程序查询 NUMA 系统的拓扑结构,并控制内存分配策略和进程/线程的 CPU 亲和性。它是进行 NUMA 优化不可或缺的工具。
3.1 libnuma 的安装与链接
在大多数基于 Debian/Ubuntu 的系统上,可以通过以下命令安装:
sudo apt-get install libnuma-dev
在基于 Red Hat/CentOS 的系统上:
sudo yum install numactl-devel
编译 C++ 程序时,需要链接 libnuma:
g++ your_program.cpp -o your_program -lnuma -pthread
3.2 核心 libnuma 函数概览
libnuma 提供了丰富的函数来管理 NUMA 资源。以下是一些最常用的函数及其用途:
| 函数类别 | 函数名称 | 描述 |
| --- | --- | --- |
| 环境检测 | numa_available | 检查系统是否支持 NUMA,返回 -1 表示不可用;调用其他 API 前必须先检查 |
| 拓扑查询 | numa_num_configured_nodes / numa_max_node | 获取系统中配置的 NUMA 节点数量 / 最大节点编号 |
| 拓扑查询 | numa_node_of_cpu / numa_node_to_cpus | 查询某个 CPU 所属的节点 / 某个节点包含的 CPU 集合 |
| 拓扑查询 | numa_distance | 查询两个节点之间的内存距离 |
| 内存分配 | numa_alloc_onnode / numa_alloc_local / numa_alloc_interleaved | 在指定节点 / 本地节点 / 所有节点上交错分配内存,配套使用 numa_free 释放 |
| 内存策略 | numa_set_membind / numa_set_preferred / numa_set_interleave_mask | 设置当前线程后续内存分配的绑定 / 首选 / 交错策略 |
| CPU 绑定 | numa_run_on_node / numa_run_on_node_mask | 将当前线程绑定到指定节点(或节点集合)的 CPU 上运行 |
3.3 综合示例:三种内存策略的对比
下面的完整示例将上述 API 串联起来:先打印系统的 NUMA 拓扑(节点数、节点间距离、CPU 归属),再用同一份计算负载依次对比三种策略:默认的 First-Touch 分配、numa_alloc_onnode 本地分配加线程亲和性、numa_alloc_interleaved 交错分配。在非 Linux 平台上,代码通过一组 mock 宏退化为普通的 malloc/free,以便编译运行(但不具备真实的 NUMA 行为)。

```cpp
#include <iostream>
#include <vector>
#include <thread>
#include <chrono>
#include <numeric>
#include <atomic>
#include <cstring>
#include <cstdlib>
#include <iomanip>   // For std::setprecision

// NUMA-related includes
#ifdef __linux__
#include <numa.h>
#include <numaif.h>  // For get_mempolicy, mbind
#include <sched.h>   // For sched_setaffinity
#else
// Mock NUMA functions for non-Linux systems or when libnuma is not available.
// This allows the code to compile and run, but without actual NUMA features.
#define numa_available() (-1)
#define numa_num_configured_nodes() (1)
#define numa_max_node() (0)
#define numa_alloc_onnode(size, node) (malloc(size))
#define numa_alloc_interleaved(size) (malloc(size))
#define numa_free(ptr, size) (free(ptr))

// Minimal cpu_set_t and helpers for non-Linux builds
#ifndef CPU_ZERO
#define CPU_SETSIZE 1024  // A reasonable default size
typedef struct { unsigned long __bits[CPU_SETSIZE / (8 * sizeof(unsigned long))]; } cpu_set_t;
#define CPU_ZERO(set)       (memset((set), 0, sizeof(cpu_set_t)))
#define CPU_SET(cpu, set)   ((set)->__bits[(cpu) / (8 * sizeof(unsigned long))] |= \
                             (1UL << ((cpu) % (8 * sizeof(unsigned long)))))
#define CPU_ISSET(cpu, set) (((set)->__bits[(cpu) / (8 * sizeof(unsigned long))] >> \
                              ((cpu) % (8 * sizeof(unsigned long)))) & 1)
#endif

// Mock memory-policy constants and functions for non-Linux
#define MPOL_DEFAULT 0
#define MPOL_BIND 1
#define MPOL_INTERLEAVE 2
#define MPOL_PREFERRED 3

inline int get_mempolicy(int* mode, unsigned long* nodemask, unsigned long maxnode,
                         void* addr, int flags) {
    if (mode) *mode = MPOL_DEFAULT;
    if (nodemask) *nodemask = 1;  // Pretend everything lives on node 0
    return 0;
}
inline int mbind(void* addr, unsigned long len, int mode, unsigned long* nodemask,
                 unsigned long maxnode, unsigned flags) {
    return 0;
}
#endif  // __linux__
// Utility function to get the NUMA node the current thread is running on
int get_current_numa_node() {
#ifdef __linux__
    if (numa_available() == -1) {
        return -1;  // NUMA not available
    }
    int cpu = sched_getcpu();
    if (cpu == -1) {
        return -1;  // Failed to query the current CPU
    }
    return numa_node_of_cpu(cpu);
#else
    return 0;  // Mock: pretend everything is node 0
#endif
}
// Helper to print a cpu_set_t
void print_cpu_set(const cpu_set_t& cpuset, int max_cpus) {
std::cout << "[";
bool first = true;
for (int i = 0; i < max_cpus; ++i) {
if (CPU_ISSET(i, &cpuset)) {
if (!first) std::cout << ",";
std::cout << i;
first = false;
}
}
std::cout << "]";
}
// A simple structure to hold worker thread data
struct WorkerData {
double* data_ptr;
size_t start_idx;
size_t end_idx;
int thread_id;
int node_id; // The NUMA node this worker is assigned to
std::chrono::duration<double> elapsed_time;
};
// Worker function for processing data
void process_data_worker(WorkerData& worker_info) {
    // Attempt to pin this thread to the CPUs of its assigned NUMA node
#ifdef __linux__
    if (numa_available() != -1) {
        cpu_set_t cpuset;
        CPU_ZERO(&cpuset);
        struct bitmask* node_cpus = numa_allocate_cpumask();
        if (node_cpus && numa_node_to_cpus(worker_info.node_id, node_cpus) == 0) {
            for (unsigned long i = 0; i < node_cpus->size; ++i) {
                if (numa_bitmask_isbitset(node_cpus, i)) {
                    CPU_SET(i, &cpuset);
                }
            }
            if (sched_setaffinity(0, sizeof(cpu_set_t), &cpuset) != 0) {
                std::cerr << "Warning: Failed to set affinity for thread " << worker_info.thread_id
                          << " to node " << worker_info.node_id << std::endl;
            }
        }
        if (node_cpus) numa_free_cpumask(node_cpus);
    }
#endif
auto start_time = std::chrono::high_resolution_clock::now();
long double sum = 0;
for (size_t i = worker_info.start_idx; i < worker_info.end_idx; ++i) {
sum += worker_info.data_ptr[i];
// Simulate some computation
if (i % 1000 == 0) {
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
}
auto end_time = std::chrono::high_resolution_clock::now();
worker_info.elapsed_time = end_time - start_time;
// std::cout << "Thread " << worker_info.thread_id << " on NUMA node " << get_current_numa_node()
// << " finished. Sum part: " << std::setprecision(10) << sum << std::endl;
}
// Function to print current memory policy for an address
void print_memory_policy(void* addr) {
#ifdef __linux__
    int mode;
    unsigned long nodemask_val = 0;
    if (get_mempolicy(&mode, &nodemask_val, 8 * sizeof(nodemask_val), addr, MPOL_F_ADDR) == 0) {
        std::cout << "  Memory policy for address " << addr << ": ";
        switch (mode) {
            case MPOL_DEFAULT:    std::cout << "DEFAULT"; break;
            case MPOL_BIND:       std::cout << "BIND (mask: 0x" << std::hex << nodemask_val << std::dec << ")"; break;
            case MPOL_INTERLEAVE: std::cout << "INTERLEAVE (mask: 0x" << std::hex << nodemask_val << std::dec << ")"; break;
            case MPOL_PREFERRED:  std::cout << "PREFERRED (node: " << (nodemask_val ? __builtin_ctzll(nodemask_val) : 0) << ")"; break;
            default:              std::cout << "UNKNOWN (" << mode << ")"; break;
        }
        std::cout << std::endl;
    } else {
        std::cerr << "  Failed to get memory policy for address " << addr << std::endl;
    }
#else
    std::cout << "  Memory policy check not available on this system." << std::endl;
#endif
}
int main(int argc, char* argv[]) {
// --- NUMA System Info ---
std::cout << "--- NUMA System Information ---" << std::endl;
if (numa_available() == -1) {
std::cout << "NUMA support is not available on this system or libnuma is not linked." << std::endl;
return 1;
}
int num_numa_nodes = numa_num_configured_nodes();
std::cout << "Configured NUMA nodes: " << num_numa_nodes << std::endl;
std::cout << "Max possible NUMA node ID: " << numa_max_node() << std::endl;
#ifdef __linux__
    std::cout << "NUMA distances:" << std::endl;
    for (int i = 0; i < num_numa_nodes; ++i) {
        for (int j = 0; j < num_numa_nodes; ++j) {
            std::cout << "  Node " << i << " to Node " << j << ": " << numa_distance(i, j) << std::endl;
        }
    }
    std::cout << "CPU to Node mapping:" << std::endl;
    int max_cpu_id = numa_num_configured_cpus();
    for (int i = 0; i < num_numa_nodes; ++i) {
        struct bitmask* cpus = numa_allocate_cpumask();
        if (cpus && numa_node_to_cpus(i, cpus) == 0) {
            std::cout << "  Node " << i << " has CPUs: [";
            bool first = true;
            for (int c = 0; c < max_cpu_id; ++c) {
                if (numa_bitmask_isbitset(cpus, c)) {
                    if (!first) std::cout << ",";
                    std::cout << c;
                    first = false;
                }
            }
            std::cout << "]" << std::endl;
        }
        if (cpus) numa_free_cpumask(cpus);
    }
#endif
std::cout << "---------------------------------" << std::endl << std::endl;
// --- Configuration ---
const size_t array_size = 1024 * 1024 * 512; // 4GB double array (512M elements)
const int num_threads = std::thread::hardware_concurrency(); // Use all available logical cores
std::cout << "Array size (elements): " << array_size << std::endl;
std::cout << "Number of threads: " << num_threads << std::endl;
std::cout << "Memory per thread (elements): " << array_size / num_threads << std::endl;
std::cout << "Total memory requested: " << (array_size * sizeof(double)) / (1024.0 * 1024.0 * 1024.0) << " GB" << std::endl;
if (num_threads < num_numa_nodes) {
std::cout << "Warning: Number of threads (" << num_threads << ") is less than NUMA nodes ("
<< num_numa_nodes << "). Some nodes might be idle." << std::endl;
}
// Durations recorded by each scenario, declared at main scope so the final comparison can read them
std::chrono::duration<double> elapsed_numa(0), elapsed_interleaved(0);
// --- Scenario 1: Non-NUMA Aware (First-Touch) ---
std::cout << "\n--- Scenario 1: Non-NUMA Aware (First-Touch) ---" << std::endl;
double* data_non_numa = new double[array_size];
std::cout << " Allocating and initializing array (First-Touch on current node: " << get_current_numa_node() << ")..." << std::endl;
for (size_t i = 0; i < array_size; ++i) {
data_non_numa[i] = static_cast<double>(i);
}
print_memory_policy(data_non_numa);
std::vector<WorkerData> worker_data_non_numa(num_threads);
std::vector<std::thread> threads_non_numa;
size_t chunk_size_non_numa = array_size / num_threads;
auto start_time_non_numa = std::chrono::high_resolution_clock::now();
for (int i = 0; i < num_threads; ++i) {
worker_data_non_numa[i].data_ptr = data_non_numa;
worker_data_non_numa[i].start_idx = i * chunk_size_non_numa;
worker_data_non_numa[i].end_idx = (i == num_threads - 1) ? array_size : (i + 1) * chunk_size_non_numa;
worker_data_non_numa[i].thread_id = i;
// Assign node for affinity, round-robin
worker_data_non_numa[i].node_id = i % num_numa_nodes;
threads_non_numa.emplace_back(process_data_worker, std::ref(worker_data_non_numa[i]));
}
for (auto& t : threads_non_numa) {
t.join();
}
auto end_time_non_numa = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed_non_numa = end_time_non_numa - start_time_non_numa;
std::cout << " Total time (Non-NUMA Aware): " << elapsed_non_numa.count() << " seconds." << std::endl;
delete[] data_non_numa;
// --- Scenario 2: NUMA Aware (Local Allocation + Affinity) ---
std::cout << "\n--- Scenario 2: NUMA Aware (Local Allocation + Affinity) ---" << std::endl;
if (num_numa_nodes == 1) {
std::cout << " Skipping NUMA-aware scenario: Only one NUMA node detected." << std::endl;
} else {
std::vector<double*> data_numa_parts(num_threads);
std::vector<WorkerData> worker_data_numa(num_threads);
std::vector<std::thread> threads_numa;
size_t chunk_size_numa = array_size / num_threads;
std::cout << " Allocating memory locally for each worker and setting affinity..." << std::endl;
for (int i = 0; i < num_threads; ++i) {
worker_data_numa[i].thread_id = i;
worker_data_numa[i].start_idx = 0; // Each worker gets its own local chunk starting from 0
worker_data_numa[i].end_idx = chunk_size_numa;
worker_data_numa[i].node_id = i % num_numa_nodes; // Assign worker to a specific NUMA node
// Allocate memory on the worker's assigned NUMA node
// Note: numa_alloc_onnode returns void*, need to cast
data_numa_parts[i] = static_cast<double*>(numa_alloc_onnode(chunk_size_numa * sizeof(double), worker_data_numa[i].node_id));
if (!data_numa_parts[i]) {
    std::cerr << "Error: numa_alloc_onnode failed for thread " << i << " on node "
              << worker_data_numa[i].node_id << std::endl;
    // 简化处理:直接退出,避免后面用 numa_free 释放非 numa_alloc 的内存
    return 1;
}
worker_data_numa[i].data_ptr = data_numa_parts[i];
// Initialize the local chunk (first-touch will be on the correct node due to numa_alloc_onnode)
for (size_t j = 0; j < chunk_size_numa; ++j) {
data_numa_parts[i][j] = static_cast<double>(i * chunk_size_numa + j);
}
// print_memory_policy(data_numa_parts[i]); // Can check policy for each part
}
std::cout << " Memory parts allocated and initialized." << std::endl;
auto start_time_numa = std::chrono::high_resolution_clock::now();
for (int i = 0; i < num_threads; ++i) {
threads_numa.emplace_back(process_data_worker, std::ref(worker_data_numa[i]));
}
for (auto& t : threads_numa) {
t.join();
}
auto end_time_numa = std::chrono::high_resolution_clock::now();
elapsed_numa = end_time_numa - start_time_numa;
std::cout << " Total time (NUMA Aware): " << elapsed_numa.count() << " seconds." << std::endl;
for (int i = 0; i < num_threads; ++i) {
if (data_numa_parts[i]) {
numa_free(data_numa_parts[i], chunk_size_numa * sizeof(double));
}
}
}
// --- Scenario 3: NUMA Aware (Interleaved Allocation + Affinity) ---
std::cout << "\n--- Scenario 3: NUMA Aware (Interleaved Allocation + Affinity) ---" << std::endl;
if (num_numa_nodes == 1) {
std::cout << " Skipping NUMA-aware scenario: Only one NUMA node detected." << std::endl;
} else {
// numa_alloc_interleaved 将这一次分配的内存页轮流放置在所有节点上;
// 若想让后续的 malloc/new 也遵循交错策略,可改用 numa_set_interleave_mask(numa_all_nodes_ptr)
double* data_interleaved =
    static_cast<double*>(numa_alloc_interleaved(array_size * sizeof(double)));
if (!data_interleaved) {
    std::cerr << "Error: numa_alloc_interleaved failed." << std::endl;
    return 1;
}
std::cout << " Allocating and initializing array with interleaved policy..." << std::endl;
for (size_t i = 0; i < array_size; ++i) {
data_interleaved[i] = static_cast<double>(i);
}
print_memory_policy(data_interleaved);
std::vector<WorkerData> worker_data_interleaved(num_threads);
std::vector<std::thread> threads_interleaved;
size_t chunk_size_interleaved = array_size / num_threads;
auto start_time_interleaved = std::chrono::high_resolution_clock::now();
for (int i = 0; i < num_threads; ++i) {
worker_data_interleaved[i].data_ptr = data_interleaved;
worker_data_interleaved[i].start_idx = i * chunk_size_interleaved;
worker_data_interleaved[i].end_idx = (i == num_threads - 1) ? array_size : (i + 1) * chunk_size_interleaved;
worker_data_interleaved[i].thread_id = i;
worker_data_interleaved[i].node_id = i % num_numa_nodes; // Still pin threads for even load
threads_interleaved.emplace_back(process_data_worker, std::ref(worker_data_interleaved[i]));
}
for (auto& t : threads_interleaved) {
t.join();
}
auto end_time_interleaved = std::chrono::high_resolution_clock::now();
elapsed_interleaved = end_time_interleaved - start_time_interleaved;
std::cout << " Total time (NUMA Interleaved): " << elapsed_interleaved.count() << " seconds." << std::endl;
if (data_interleaved) {
numa_free(data_interleaved, array_size * sizeof(double));
}
}
std::cout << "\n--- Comparison of scenarios ---" << std::endl;
std::cout << "Non-NUMA Aware time: " << elapsed_non_numa.count() << "s" << std::endl;
if (num_numa_nodes > 1) {
std::cout << "NUMA Aware (Local Alloc) time: " << elapsed_numa.count() << "s" << std::endl;
std::cout << "NUMA Aware (Interleaved) time: " << elapsed_interleaved.count() << "s" << std::endl;
}
return 0;
}
```
### 第四章:NUMA 感知的 C++ 编程策略
NUMA 感知编程的核心原则是:**让数据与处理它的线程尽可能地位于同一个 NUMA 节点上。** 这通常通过以下几种策略实现:
#### 4.1 内存分配策略
`libnuma` 提供了多种内存分配策略,可以覆盖默认的 First-Touch 行为。
##### 4.1.1 显式本地分配 (`numa_alloc_onnode`, `numa_alloc_local`)
这是最直接的 NUMA 优化方式,适用于数据主要由特定 NUMA 节点上的线程访问的场景。
* `void* numa_alloc_onnode(size_t size, int node)`:在指定的 `node` 上分配 `size` 字节的内存。
* `void* numa_alloc_local(size_t size)`:在当前线程所在的 NUMA 节点上分配 `size` 字节的内存。
**适用场景**:
* **分片数据结构**:如果一个大型数据结构可以被逻辑地划分为多个独立的部分,并且每个部分主要由一个或一组线程处理,那么可以将每个部分分配到这些线程所在的 NUMA 节点上。
* **每线程数据**:为每个工作线程分配的私有缓冲区、栈空间或数据结构,应尽可能地分配到该线程的本地 NUMA 节点。
**示例:本地内存分配**
```cpp
#include <numa.h> // Assuming libnuma is installed and linked
// ... (main function and other includes)
void allocate_on_node_example(int node_id) {
if (numa_available() == -1) {
std::cerr << "NUMA support not available." << std::endl;
return;
}
size_t buffer_size = 1024 * 1024; // 1MB
int* buffer = static_cast<int*>(numa_alloc_onnode(buffer_size * sizeof(int), node_id));
if (!buffer) {
std::cerr << "Failed to allocate buffer on NUMA node " << node_id << std::endl;
return;
}
std::cout << "Allocated " << buffer_size * sizeof(int) << " bytes on NUMA node " << node_id << std::endl;
// Initialize buffer, first-touch will be on node_id
for (size_t i = 0; i < buffer_size; ++i) {
buffer[i] = i;
}
// After use, free the memory
numa_free(buffer, buffer_size * sizeof(int));
std::cout << "Freed buffer from NUMA node " << node_id << std::endl;
}
```

##### 4.1.2 交错式分配 (`numa_alloc_interleaved`, `numa_set_interleave_mask`)
当数据被多个 NUMA 节点上的线程均匀访问,或者无法有效地将数据划分为本地部分时,交错式分配是一个好的选择。它将内存页轮流分配到指定的 NUMA 节点集合中。

* `void* numa_alloc_interleaved(size_t size)`:在所有可用 NUMA 节点上交错分配 `size` 字节的内存。
* `void numa_set_interleave_mask(struct bitmask *nodemask)`:为当前线程设置交错式内存分配策略。后续通过 `malloc` 或 `new` 分配的内存页将遵循此策略,直到策略被更改或线程退出。`nodemask` 指定了参与交错的节点集合。
**适用场景**:
- 共享队列:多个生产者和消费者线程可能分布在不同的节点上,共享一个队列。
- 全局只读数据:大型只读查找表,被所有节点上的线程频繁访问。交错分配可以使每个节点都能从本地或近端获取部分数据。
- 难以分区的数据:数据访问模式复杂,难以有效分区的共享数据。
**示例:交错式内存分配**

```cpp
#include <numa.h> // Assuming libnuma is installed and linked
// ... (main function and other includes)
void allocate_interleaved_example() {
    if (numa_available() == -1) {
        std::cerr << "NUMA support not available." << std::endl;
        return;
    }
    size_t buffer_size = 1024 * 1024 * 16; // 16M elements
    // Allocate interleaved memory across all available NUMA nodes
    int* buffer = static_cast<int*>(numa_alloc_interleaved(buffer_size * sizeof(int)));
    if (!buffer) {
        std::cerr << "Failed to allocate interleaved buffer." << std::endl;
        return;
    }
    std::cout << "Allocated " << buffer_size * sizeof(int) << " bytes with interleaved policy." << std::endl;
    // Initialize buffer
    for (size_t i = 0; i < buffer_size; ++i) {
        buffer[i] = i;
    }
    numa_free(buffer, buffer_size * sizeof(int));
    std::cout << "Freed interleaved buffer." << std::endl;
}
```
##### 4.1.3 内存绑定 (`numa_set_membind`) 和首选节点 (`numa_set_preferred`)
这两种策略影响一个进程或线程的默认内存分配行为。
* `void numa_set_membind(struct bitmask *nodemask)`:将当前进程/线程的内存分配限制在 `nodemask` 指定的节点集合中。如果这些节点内存不足,分配将失败。
* `void numa_set_preferred(int node)`:设置当前进程/线程优先从指定 `node` 分配内存。如果该节点内存不足,会尝试从其他节点分配(软策略)。
**适用场景**:
* `numa_set_membind`:用于严格控制内存位置的场景,例如将一个数据库实例的所有内存都限制在特定的 NUMA 节点上。
* `numa_set_preferred`:提供一个性能优化的提示,同时保持灵活性,例如大多数数据应在某个节点,但允许在必要时溢出到其他节点。
#### 4.2 线程亲和性(Thread Affinity)
仅仅控制内存分配是不够的,还需要将处理数据的线程绑定到数据所在的 NUMA 节点上。这称为线程亲和性或 CPU 绑定。
* `int numa_run_on_node(int node)`:将当前线程绑定到指定 `node` 上的任意一个 CPU。
* `int numa_run_on_node_mask(struct bitmask *nodemask)`:将当前线程绑定到 `nodemask` 所指定的一组 NUMA 节点的 CPU 上运行。
除了 libnuma 提供的函数,还可以直接使用 POSIX 的 sched_setaffinity 函数来设置 CPU 亲和性。
**示例:线程亲和性设置**

```cpp
#include <numa.h>
#include <sched.h>   // For sched_setaffinity
#include <thread>
#include <vector>
#include <iostream>
#include <chrono>
// ... (main function and other includes)
void worker_thread_with_affinity(int thread_id, int target_node) {
    if (numa_available() == -1) {
        std::cerr << "NUMA support not available for thread " << thread_id << std::endl;
        return;
    }
    // Get the CPU mask for the target node
    struct bitmask* cpus = numa_allocate_cpumask();
    if (!cpus || numa_node_to_cpus(target_node, cpus) != 0) {
        std::cerr << "Failed to get CPUs for node " << target_node << std::endl;
        if (cpus) numa_free_cpumask(cpus);
        return;
    }
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    for (unsigned long i = 0; i < cpus->size; ++i) {
        if (numa_bitmask_isbitset(cpus, i)) {
            CPU_SET(i, &cpuset);
        }
    }
    numa_free_cpumask(cpus);
    // Set affinity for the calling thread (pid 0 == current thread)
    if (sched_setaffinity(0, sizeof(cpu_set_t), &cpuset) == -1) {
        std::cerr << "Failed to set affinity for thread " << thread_id << " to node " << target_node << std::endl;
        return;
    }
    std::cout << "Thread " << thread_id << " now running on NUMA node " << numa_node_of_cpu(sched_getcpu()) << std::endl;
    // Perform computations...
    std::this_thread::sleep_for(std::chrono::seconds(1)); // Simulate work
}
void affinity_example() {
    if (numa_available() == -1) {
        std::cerr << "NUMA support not available." << std::endl;
        return;
    }
    int num_nodes = numa_num_configured_nodes();
    std::vector<std::thread> threads;
    for (int i = 0; i < num_nodes; ++i) {
        threads.emplace_back(worker_thread_with_affinity, i, i); // Bind thread i to node i
    }
    for (auto& t : threads) {
        t.join();
    }
}
```
**线程亲和性与内存分配的协同作用**:
当一个线程被绑定到特定的 NUMA 节点时,如果它使用 numa_alloc_local 或默认的 First-Touch 策略分配内存,那么内存将很大概率分配到该节点。这是 NUMA 优化中最强大的组合。
#### 4.3 数据结构设计
NUMA 感知不仅是关于 API 调用,更是关于如何从根本上设计应用程序的数据结构和工作流。
##### 4.3.1 按 NUMA 节点分区数据
将大型共享数据结构(如哈希表、队列、树)分解为多个子结构,每个子结构都分配给一个特定的 NUMA 节点,并由该节点上的线程负责管理和访问。
**示例:分区的哈希表**
一个全局哈希表可以被设计为 `std::vector<std::unique_ptr<HashTableNode>> tables_per_node;`,其中每个 `HashTableNode` 及其数据都通过 `numa_alloc_onnode` 分配到不同的 NUMA 节点上。当线程需要访问哈希表时,它根据某种哈希或分区逻辑,确定应访问哪个 NUMA 节点上的子表。
##### 4.3.2 复制只读数据
对于频繁访问的只读数据,如果其大小不是特别巨大,可以考虑在每个 NUMA 节点上复制一份。这消除了跨节点访问的延迟。
##### 4.3.3 避免跨节点伪共享
伪共享在 NUMA 环境下会造成巨大的性能损失。除了经典的缓存行填充技术,更重要的是要确保不同 NUMA 节点上的线程不会频繁地访问同一个缓存行。这通常通过严格的数据分区来实现。
#### 4.4 NUMA 感知的生产者-消费者模式
考虑一个典型的生产者-消费者模式。一个非 NUMA 感知的实现可能会导致所有生产者将数据写入一个全局队列,而所有消费者从该队列读取,如果队列内存集中在某个 NUMA 节点,那么远程节点上的线程将遭受性能损失。
NUMA 感知优化方案:
- 多队列设计:为每个 NUMA 节点创建一个独立的队列。生产者将数据写入其本地节点的队列,消费者从其本地节点的队列中读取。
- 跨节点窃取(Work Stealing):当一个节点的本地队列为空时,消费者可以尝试从其他 NUMA 节点的队列中“窃取”任务。这可以提高负载均衡,但要小心设计窃取策略,避免频繁的远程访问。
- 交错队列:如果数据无法有效分区,或者生产/消费负载不均衡,可以考虑使用一个通过 numa_alloc_interleaved 分配的共享队列。虽然仍可能存在远程访问,但交错分配可以使内存访问分布更均匀,减少单个节点的瓶颈。
**示例:NUMA 感知多队列**

```cpp
#include <iostream>
#include <vector>
#include <thread>
#include <queue>
#include <mutex>
#include <memory>              // For std::unique_ptr
#include <condition_variable>
#include <atomic>
#include <chrono>
#ifdef __linux__
#include <numa.h>
#include <sched.h>
#endif
// ... (Helper functions like set_thread_affinity_to_node, get_current_numa_node from previous examples)
template<typename T>
class NumaAwareQueue {
public:
    NumaAwareQueue() : _stop(false) {}
    void push(T val) {
        std::unique_lock<std::mutex> lock(_mtx);
        _queue.push(std::move(val));
        _cv.notify_one();
    }
    bool pop(T& val) {
        std::unique_lock<std::mutex> lock(_mtx);
        _cv.wait(lock, [this] { return !_queue.empty() || _stop.load(); });
        if (_stop.load() && _queue.empty()) {
            return false;
        }
        val = _queue.front();
        _queue.pop();
        return true;
    }
    void stop() {
        _stop.store(true);
        _cv.notify_all();
    }
private:
    std::queue<T> _queue;
    std::mutex _mtx;
    std::condition_variable _cv;
    std::atomic<bool> _stop;
};
// Global collection of per-NUMA-node queues
std::vector<std::unique_ptr<NumaAwareQueue<int>>> global_queues;
void producer_task(int producer_id, int num_items, int target_node) {
#ifdef __linux__
    // Set thread affinity for the producer, e.g.:
    // set_thread_affinity_to_node(target_node); // Assuming this helper exists
#endif
    for (int i = 0; i < num_items; ++i) {
        global_queues[target_node]->push(producer_id * 1000 + i);
    }
}
void consumer_task(int consumer_id, int target_node, std::atomic<long long>& total_sum) {
#ifdef __linux__
    // Set thread affinity for the consumer, e.g.:
    // set_thread_affinity_to_node(target_node); // Assuming this helper exists
#endif
    int val;
    long long local_sum = 0;
    while (global_queues[target_node]->pop(val)) {
        local_sum += val;
    }
    total_sum.fetch_add(local_sum, std::memory_order_relaxed);
}
// In main:
/*
int num_nodes = numa_num_configured_nodes();
int items_per_producer = 100000;
int num_producers = num_nodes; // One producer per node
int num_consumers = num_nodes; // One consumer per node
for (int i = 0; i < num_nodes; ++i) {
    global_queues.emplace_back(std::make_unique<NumaAwareQueue<int>>());
}
std::vector<std::thread> producer_threads;
std::vector<std::thread> consumer_threads;
std::atomic<long long> total_sum_val(0);
auto start_time = std::chrono::high_resolution_clock::now();
for (int i = 0; i < num_producers; ++i) {
    producer_threads.emplace_back(producer_task, i, items_per_producer, i % num_nodes);
}
for (int i = 0; i < num_consumers; ++i) {
    consumer_threads.emplace_back(consumer_task, i, i % num_nodes, std::ref(total_sum_val));
}
for (auto& t : producer_threads) {
    t.join();
}
for (int i = 0; i < num_nodes; ++i) {
    global_queues[i]->stop(); // Signal consumers to stop
}
for (auto& t : consumer_threads) {
    t.join();
}
auto end_time = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed = end_time - start_time;
std::cout << "Total time (NUMA Aware P-C): " << elapsed.count() << " seconds." << std::endl;
std::cout << "Total sum: " << total_sum_val << std::endl;
*/
```
上述代码片段展示了 NUMA 感知多队列的骨架。每个 NUMA 节点拥有自己的队列,生产者和消费者都尝试在其本地节点上操作。线程亲和性(注释掉的部分)将进一步确保生产者和消费者运行在它们操作的队列所在的 NUMA 节点上,从而最大化本地内存访问。
### 第五章:NUMA 优化工具与最佳实践
仅仅了解 libnuma API 是不够的,还需要结合工具进行分析和调试。
#### 5.1 诊断工具

* `numactl`:随 libnuma 一起发布的命令行工具,无需修改代码即可查看拓扑并控制进程的 NUMA 策略。
  * `numactl --hardware`:显示 NUMA 节点的拓扑、CPU 核心分布和内存大小。
  * `numactl --show`:显示当前进程的 NUMA 策略。