各位同行,大家好。
在当今高性能计算领域,多核、多处理器(Multi-socket)服务器已成为常态。它们提供了强大的计算能力,但也引入了新的性能挑战,其中之一便是非统一内存访问(Non-Uniform Memory Access,简称 NUMA)架构。理解并优化 NUMA 架构下的应用性能,对于追求极致低延迟和高吞吐量的系统至关重要。
今天,我们将深入探讨 ‘NUMA-aware Scheduling’ 这一主题,特别是它在 Go 语言环境下的应用。我们的目标是,通过强制 Goroutine 绑定到本地内存节点,以显著降低内存访问延迟,从而提升整体应用性能。
NUMA 架构的本质
从 SMP 到 NUMA:演进之路
在理解 NUMA 之前,我们先回顾一下传统的对称多处理器(Symmetric Multi-Processor, SMP)架构。在 SMP 系统中,所有 CPU 共享同一块内存,对任何 CPU 来说,访问任何内存地址的延迟都是相同的。这简化了编程,但也带来了瓶颈:随着 CPU 核心数量的增加,所有 CPU 争抢同一套内存总线和内存控制器,导致内存带宽和延迟成为瓶颈。
为了突破这一瓶颈,NUMA 架构应运而生。NUMA 的核心思想是将系统内存划分为多个“本地”区域,每个 CPU 或 CPU 组(通常是一个物理处理器插槽上的所有核心)都有自己的本地内存控制器和一部分物理内存。这些 CPU 和其本地内存构成一个 NUMA 节点。
NUMA 节点的结构
在一个 NUMA 系统中,通常会有多个这样的 NUMA 节点。每个节点内的 CPU 访问其本地内存速度非常快,延迟低。然而,当一个 CPU 需要访问另一个 NUMA 节点上的内存时,它必须通过特殊的互联总线(如 Intel 的 QPI 或 UPI,AMD 的 Infinity Fabric)进行通信。这种跨节点访问的延迟明显高于本地访问,并且会消耗互联总线的带宽。
我们可以用一个简化的表格来描述这种访问延迟差异:
| 内存访问类型 | 访问路径 | 相对延迟 | 相对带宽 |
|---|---|---|---|
| 本地访问 | CPU -> 本地内存控制器 -> 本地内存 | 最低 | 最高 |
| 远程访问 | CPU -> 互联总线 -> 远程内存控制器 -> 远程内存 | 较高 (2-5x) | 较低 |
如何识别 NUMA 拓扑
在 Linux 系统中,我们可以使用一些工具来查看 NUMA 拓扑结构:
-
lscpu: 提供 CPU 架构的概述,包括 NUMA 节点数量和每个节点上的 CPU 列表。$ lscpu | grep NUMA NUMA node(s): 2 NUMA node0 CPU(s): 0-11,24-35 NUMA node1 CPU(s): 12-23,36-47从这个输出可以看出,系统有 2 个 NUMA 节点。
node0包含 CPU 0-11 和 24-35,node1包含 CPU 12-23 和 36-47。这通常意味着每个物理 CPU 插槽对应一个 NUMA 节点,并且每个核心有超线程(Hyper-threading)或同步多线程(SMT)开启。 -
numactl --hardware: 提供更详细的 NUMA 节点信息,包括每个节点的距离(用于计算远程访问成本)和内存大小。$ numactl --hardware available: 2 nodes (0-1) node 0 cpus: 0 1 2 3 4 5 6 7 8 9 10 11 24 25 26 27 28 29 30 31 32 33 34 35 node 0 size: 64402 MB node 0 free: 63688 MB node 1 cpus: 12 13 14 15 16 17 18 19 20 21 22 23 36 37 38 39 40 41 42 43 44 45 46 47 node 1 size: 64510 MB node 1 free: 63935 MB node distances: node 0 1 0: 10 21 1: 21 10这里的
node distances表明了节点间的“距离”或访问成本。node 0访问node 0的内存成本是 10(基准),访问node 1的内存成本是 21,大约是本地访问的两倍。
NUMA 对性能的影响
当应用程序在一个 NUMA 节点上运行,但频繁访问另一个 NUMA 节点上的数据时,就会发生“远程内存访问”。这会导致:
- 更高的延迟:数据需要跨越互联总线传输,增加了访问时间。
- 带宽瓶颈:互联总线成为共享资源,多个远程访问可能导致争用。
- 缓存效率降低:远程数据可能在其他节点的缓存中,导致更多的缓存同步开销。
对于内存密集型、延迟敏感型应用(如数据库、高性能网络服务、实时分析系统),这些性能损失是不可接受的。因此,确保 Goroutine 及其操作的数据尽可能地位于同一个 NUMA 节点上,是提升性能的关键。这就是 NUMA-aware scheduling 的核心目标。
Go 语言的调度器与 NUMA 挑战
Go 调度器模型回顾
Go 语言的运行时(runtime)包含一个高度优化的调度器,它实现了 M:N 的调度模型:
- G (Goroutine): 轻量级用户态线程,Go 语言并发的基本单位。
- M (Machine): 操作系统线程,由 Go runtime 管理。
- P (Processor): 逻辑处理器,代表 Go 调度器可以使用的 CPU 核心。每个 P 都有一个本地 Goroutine 队列。
Go 调度器负责将 G 映射到 M 上执行,P 充当 M 的上下文。通常,GOMAXPROCS 环境变量决定了 P 的数量,默认等于系统 CPU 核心数。
Go 调度器的一个显著特点是其“工作窃取”(work-stealing)机制:当一个 M 绑定的 P 的本地队列为空时,它会尝试从其他 P 的本地队列中“窃取” Goroutine 来执行。这使得 Goroutine 能够高效地在不同的 M 和 P 之间迁移,充分利用所有可用的 CPU 资源。
Go 调度器的 NUMA “无感知”特性
默认情况下,Go 调度器是 NUMA 无感知的。它将所有可用的 CPU 核心视为一个扁平的池,不区分它们属于哪个 NUMA 节点。这意味着:
- Goroutine 可以在任何 CPU 上执行:一个 Goroutine 可能会在运行时在不同的 NUMA 节点之间迁移。
- 内存分配由操作系统决定:当 Go runtime 分配内存时,它会向操作系统请求。操作系统通常遵循“首次触碰”(first-touch)策略:内存页会被分配到首次访问它的 CPU 所在的 NUMA 节点上。
这带来了潜在的 NUMA 性能问题:
- 数据本地性丢失:如果一个 Goroutine 在 NUMA 节点 A 上分配了一块内存,然后被调度到 NUMA 节点 B 上继续执行并访问这块内存,那么节点 B 上的 CPU 将不得不进行远程内存访问。
- 缓存污染:跨节点访问还会导致缓存行在节点之间来回迁移,降低缓存效率。
对于大多数应用程序,Go 调度器的这种默认行为是完全可以接受的,甚至是有益的,因为它提供了高度的灵活性和资源利用率。然而,对于那些对延迟极度敏感、处理大量数据、且运行在多路 NUMA 服务器上的应用程序来说,这种“无感知”可能会成为性能瓶颈。
模拟 NUMA 性能问题
为了直观理解 NUMA 带来的性能影响,我们编写一个 Go 程序来模拟跨 NUMA 节点访问内存的情况。由于 Go 默认不提供直接的 NUMA 内存分配接口,我们将通过控制 Goroutine 的 CPU 亲和性,并利用操作系统的“首次触碰”策略来间接模拟。
我们将创建两个 Goroutine:
- 生产者 Goroutine:绑定到某个 NUMA 节点(例如 Node 0)的 CPU 上,并在此处分配和初始化一个大数组。
- 消费者 Goroutine:
- 情况 A (本地访问模拟):绑定到与生产者相同的 NUMA 节点(Node 0)的 CPU 上,然后访问该数组。
- 情况 B (远程访问模拟):绑定到另一个 NUMA 节点(Node 1)的 CPU 上,然后访问该数组。
通过比较这两种情况下的访问时间,我们可以看到 NUMA 效应带来的延迟差异。
首先,我们需要一些辅助函数来获取 NUMA 节点信息和设置 CPU 亲和性。
package main
import (
"bytes"
"fmt"
"io/ioutil"
"runtime"
"strconv"
"strings"
"sync"
"syscall"
"time"
)
const arraySize = 1024 * 1024 * 256 // 256MB,确保数据量足够大,不易完全缓存
// getCPUsForNUMANode 解析 /sys/devices/system/node/nodeX/cpulist 文件
// 获取指定 NUMA 节点上可用的 CPU 列表
func getCPUsForNUMANode(nodeID int) ([]int, error) {
path := fmt.Sprintf("/sys/devices/system/node/node%d/cpulist", nodeID)
content, err := ioutil.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("failed to read %s: %w", path, err)
}
cpuListStr := strings.TrimSpace(string(content))
var cpus []int
ranges := strings.Split(cpuListStr, ",")
for _, r := range ranges {
if strings.Contains(r, "-") {
parts := strings.Split(r, "-")
start, _ := strconv.Atoi(parts[0])
end, _ := strconv.Atoi(parts[1])
for i := start; i <= end; i++ {
cpus = append(cpus, i)
}
} else {
cpuID, _ := strconv.Atoi(r)
cpus = append(cpus, cpuID)
}
}
return cpus, nil
}
// setCPUAffinity 将当前 OS 线程绑定到指定的 CPU 核心。
// pid 设为 0 表示当前线程。
func setCPUAffinity(cpuID int) error {
var cpuset syscall.CPUSet
cpuset.Zero()
cpuset.Set(cpuID)
pid := 0 // Current thread
return syscall.SchedSetaffinity(pid, &cpuset)
}
// getGID 是一个非官方但常用的获取 Goroutine ID 的方法,仅用于日志输出。
func getGID() uint64 {
b := make([]byte, 64)
b = b[:runtime.Stack(b, false)]
b = b[bytes.IndexByte(b, ' ')+1:]
b = b[:bytes.IndexByte(b, ' ')]
n, _ := strconv.ParseUint(string(b), 10, 64)
return n
}
func main() {
// 确保 Go 调度器可以使用所有 CPU,但不限制它。
// 实际绑定 Goroutine 到 OS 线程和特定 CPU 核心会覆盖 Go 调度器的部分控制。
runtime.GOMAXPROCS(runtime.NumCPU())
// 1. 发现 NUMA 节点和其 CPU
fmt.Println("Discovering NUMA topology...")
numaNodes := make(map[int][]int)
for i := 0; ; i++ {
cpus, err := getCPUsForNUMANode(i)
if err != nil {
if strings.Contains(err.Error(), "no such file or directory") {
break // 没有更多的 NUMA 节点了
}
fmt.Printf("Error getting CPUs for node %d: %vn", i, err)
return
}
numaNodes[i] = cpus
fmt.Printf("NUMA Node %d has CPUs: %vn", i, cpus)
}
if len(numaNodes) < 2 {
fmt.Println("Warning: Less than 2 NUMA nodes detected. NUMA-aware scheduling benefits will be minimal or non-existent.")
fmt.Println("This program is designed for multi-socket NUMA systems to show performance difference.")
fmt.Println("Exiting as a meaningful comparison requires multiple NUMA nodes.")
return
}
// 确保我们至少有两个节点用于演示
node0CPUs, ok0 := numaNodes[0]
node1CPUs, ok1 := numaNodes[1]
if !ok0 || !ok1 || len(node0CPUs) == 0 || len(node1CPUs) == 0 {
fmt.Println("Error: Could not find enough NUMA nodes/CPUs for demonstration.")
return
}
// 从前两个 NUMA 节点各选择一个 CPU
cpu0 := node0CPUs[0]
cpu1 := node1CPUs[0]
fmt.Printf("nRunning NUMA-aware benchmark with CPU %d (Node 0) and CPU %d (Node 1)...n", cpu0, cpu1)
var wg sync.WaitGroup
var dataOnNode0 []byte // 将由绑定到 Node 0 的 Goroutine 分配的内存
// --- 场景 1: 数据分配在 Node 0,由绑定到 Node 0 的 Goroutine 访问 (本地访问) ---
fmt.Println("n--- 场景 1: 本地访问 (数据在 Node 0,Goroutine 在 Node 0) ---")
wg.Add(1)
go func() {
defer wg.Done()
runtime.LockOSThread() // 将当前 Goroutine 锁定到其 OS 线程
defer runtime.UnlockOSThread()
tid := syscall.Gettid()
fmt.Printf("Goroutine (Producer/Consumer) GID: %d, OS Thread ID: %d. Binding to CPU %d (Node 0).n", getGID(), tid, cpu0)
if err := setCPUAffinity(cpu0); err != nil {
fmt.Printf("Error setting affinity for producer on CPU %d: %vn", cpu0, err)
return
}
// 在绑定后分配和初始化数据。依赖 'first-touch' 策略。
dataOnNode0 = make([]byte, arraySize)
for i := 0; i < arraySize; i++ {
dataOnNode0[i] = byte(i % 256)
}
fmt.Printf("Producer on Node 0 finished allocating %d bytes.n", arraySize)
// 立即在本地读取它
fmt.Println("Goroutine on Node 0 (Consumer) reading its local data.")
start := time.Now()
sum := 0
for i := 0; i < arraySize; i++ {
sum += int(dataOnNode0[i])
}
_ = sum // 防止编译器优化掉循环
elapsed := time.Since(start)
fmt.Printf("本地访问 (Node 0 -> Node 0) 耗时: %sn", elapsed)
}()
wg.Wait()
// 确保 dataOnNode0 已被填充,以便进行远程访问
if dataOnNode0 == nil {
fmt.Println("Error: Data on Node 0 was not allocated. Exiting.")
return
}
// --- 场景 2: 数据分配在 Node 0,由绑定到 Node 1 的 Goroutine 访问 (远程访问) ---
fmt.Println("n--- 场景 2: 远程访问 (数据在 Node 0,Goroutine 在 Node 1) ---")
wg.Add(1)
go func() {
defer wg.Done()
runtime.LockOSThread() // 锁定当前 Goroutine 到其 OS 线程
defer runtime.UnlockOSThread()
tid := syscall.Gettid()
fmt.Printf("Goroutine (Consumer) GID: %d, OS Thread ID: %d. Binding to CPU %d (Node 1).n", getGID(), tid, cpu1)
if err := setCPUAffinity(cpu1); err != nil {
fmt.Printf("Error setting affinity for consumer on CPU %d: %vn", cpu1, err)
return
}
// 访问 dataOnNode0,它是由绑定到 Node 0 的 Goroutine 分配的
fmt.Println("Goroutine on Node 1 (Consumer) reading data from Node 0.")
start := time.Now()
sum := 0
for i := 0; i < arraySize; i++ {
sum += int(dataOnNode0[i]) // 这是跨 NUMA 访问
}
_ = sum
elapsed := time.Since(start)
fmt.Printf("远程访问 (Node 1 -> Node 0) 耗时: %sn", elapsed)
}()
wg.Wait()
fmt.Println("nBenchmark finished.")
}
运行上述代码前,请确保您的系统:
- 是一个多路(Multi-socket)NUMA 服务器。
- 安装了
numactl工具(通常通过sudo apt install numactl或sudo yum install numactl安装)。 - 允许 Go 程序使用
syscall.SchedSetaffinity(通常需要 CAP_SYS_NICE 权限,或者以 root 身份运行,但一般用户也可以)。
预期输出(示例,实际时间会因硬件而异):
Discovering NUMA topology...
NUMA Node 0 has CPUs: [0 1 2 3 4 5 6 7 8 9 10 11 24 25 26 27 28 29 30 31 32 33 34 35]
NUMA Node 1 has CPUs: [12 13 14 15 16 17 18 19 20 21 22 23 36 37 38 39 40 41 42 43 44 45 46 47]
Running NUMA-aware benchmark with CPU 0 (Node 0) and CPU 12 (Node 1)...
--- 场景 1: 本地访问 (数据在 Node 0,Goroutine 在 Node 0) ---
Goroutine (Producer/Consumer) GID: 5, OS Thread ID: 12345. Binding to CPU 0 (Node 0).
Producer on Node 0 finished allocating 268435456 bytes.
Goroutine on Node 0 (Consumer) reading its local data.
本地访问 (Node 0 -> Node 0) 耗时: 15.234567ms
--- 场景 2: 远程访问 (数据在 Node 0,Goroutine 在 Node 1) ---
Goroutine (Consumer) GID: 7, OS Thread ID: 12346. Binding to CPU 12 (Node 1).
Goroutine on Node 1 (Consumer) reading data from Node 0.
远程访问 (Node 1 -> Node 0) 耗时: 32.876543ms
Benchmark finished.
从这个示例输出中,我们可以清楚地看到远程访问的耗时几乎是本地访问的两倍。这直接证明了 NUMA 架构对内存访问延迟的显著影响。
Go 语言中的 NUMA-aware Scheduling 策略
既然我们已经理解了 NUMA 的挑战,那么如何在 Go 应用程序中实现 NUMA-aware scheduling 呢?Go 运行时本身并没有提供高级的 NUMA 亲和性 API,但我们可以利用操作系统提供的机制,并通过 Go 的 runtime.LockOSThread 等功能进行间接控制。
以下是几种主要的策略,从粗粒度到细粒度:
1. 操作系统级别的进程绑定 (粗粒度)
这是最简单也是最粗暴的方法,通过 numactl 命令将整个 Go 进程绑定到一个或多个 NUMA 节点。
# 将整个 Go 程序绑定到 NUMA 节点 0 的 CPU 和内存
numactl --cpunodebind=0 --membind=0 go run your_numa_app.go
# 绑定到节点 0 和 1 的 CPU,但只在节点 0 上分配内存
numactl --cpunodebind=0,1 --membind=0 go run your_numa_app.go
优点:
- 实现简单,无需修改 Go 代码。
- 对整个进程生效,所有 Goroutine 和内存分配都将受限。
缺点:
- 粒度过粗:将整个进程限制在一个或少数 NUMA 节点上,可能无法充分利用其他节点的 CPU 和内存资源。
- 不适用于复杂场景:如果你的应用需要将不同的 Goroutine 工作负载分配到不同的 NUMA 节点,这种方法无法实现。
- 潜在的资源浪费:如果你的应用实际上只在一个节点上繁忙,而其他节点空闲,那么这种绑定可能会导致其他节点的 CPU 和内存无法被利用。
适用场景:
- 应用程序本身是单线程或 Goroutine 数量较少,并且其大部分工作负载集中在一个 NUMA 节点内。
- 你希望确保应用程序不会因跨节点访问而出现意外性能下降,即使这意味着牺牲一些并行度。
2. Go Goroutine 绑定到 OS 线程,OS 线程绑定到 CPU (中等粒度)
这是在 Go 语言中实现 NUMA-aware scheduling 的主要手段,它涉及以下两个关键步骤:
runtime.LockOSThread(): Go 运行时的一个函数,它将当前正在执行的 Goroutine 永久地绑定到它当前所运行的操作系统线程(M)。一旦绑定,这个 Goroutine 将始终在这个特定的 OS 线程上运行,直到调用runtime.UnlockOSThread()。syscall.SchedSetaffinity(): 这是一个 Linux 系统调用(在其他操作系统上可能需要使用相应的 API),用于设置一个进程或线程的 CPU 亲和性。我们可以用它将runtime.LockOSThread()绑定的 OS 线程进一步限制到特定的 CPU 核心,从而间接将其限制到特定的 NUMA 节点。
结合这两个机制,我们可以实现将特定的 Goroutine 及其相关的 OS 线程“钉”在某个 NUMA 节点上。当这个 Goroutine 在这个被绑定的 OS 线程上进行内存分配时,操作系统的“首次触碰”策略会倾向于将内存分配到该 OS 线程当前所在的 NUMA 节点上。
让我们扩展之前的示例,展示如何使用这种方法:
package main
import (
"bytes"
"fmt"
"io/ioutil"
"runtime"
"strconv"
"strings"
"sync"
"syscall"
"time"
"unsafe" // For potential CGO later
)
const arraySize = 1024 * 1024 * 256 // 256MB
// (getCPUsForNUMANode, setCPUAffinity, getGID functions are the same as before)
// getCPUsForNUMANode 解析 /sys/devices/system/node/nodeX/cpulist 文件
// 获取指定 NUMA 节点上可用的 CPU 列表
func getCPUsForNUMANode(nodeID int) ([]int, error) {
path := fmt.Sprintf("/sys/devices/system/node/node%d/cpulist", nodeID)
content, err := ioutil.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("failed to read %s: %w", path, err)
}
cpuListStr := strings.TrimSpace(string(content))
var cpus []int
ranges := strings.Split(cpuListStr, ",")
for _, r := range ranges {
if strings.Contains(r, "-") {
parts := strings.Split(r, "-")
start, _ := strconv.Atoi(parts[0])
end, _ := strconv.Atoi(parts[1])
for i := start; i <= end; i++ {
cpus = append(cpus, i)
}
} else {
cpuID, _ := strconv.Atoi(r)
cpus = append(cpus, cpuID)
}
}
return cpus, nil
}
// setCPUAffinity 将当前 OS 线程绑定到指定的 CPU 核心。
// pid 设为 0 表示当前线程。
func setCPUAffinity(cpuID int) error {
var cpuset syscall.CPUSet
cpuset.Zero()
cpuset.Set(cpuID)
pid := 0 // Current thread
return syscall.SchedSetaffinity(pid, &cpuset)
}
// getGID 是一个非官方但常用的获取 Goroutine ID 的方法,仅用于日志输出。
func getGID() uint64 {
b := make([]byte, 64)
b = b[:runtime.Stack(b, false)]
b = b[:bytes.IndexByte(b, ' ')]
n, _ := strconv.ParseUint(string(b), 10, 64)
return n
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
fmt.Println("Discovering NUMA topology...")
numaNodes := make(map[int][]int)
for i := 0; ; i++ {
cpus, err := getCPUsForNUMANode(i)
if err != nil {
if strings.Contains(err.Error(), "no such file or directory") {
break
}
fmt.Printf("Error getting CPUs for node %d: %vn", i, err)
return
}
numaNodes[i] = cpus
fmt.Printf("NUMA Node %d has CPUs: %vn", i, cpus)
}
if len(numaNodes) < 2 {
fmt.Println("Warning: Less than 2 NUMA nodes detected. NUMA-aware scheduling benefits will be minimal or non-existent.")
fmt.Println("This program is designed for multi-socket NUMA systems.")
return
}
node0CPUs, ok0 := numaNodes[0]
node1CPUs, ok1 := numaNodes[1]
if !ok0 || !ok1 || len(node0CPUs) == 0 || len(node1CPUs) == 0 {
fmt.Println("Error: Could not find enough NUMA nodes/CPUs for demonstration.")
return
}
cpu0 := node0CPUs[0] // Node 0 的第一个 CPU
cpu1 := node1CPUs[0] // Node 1 的第一个 CPU
fmt.Printf("nRunning NUMA-aware benchmark with CPU %d (Node 0) and CPU %d (Node 1)...n", cpu0, cpu1)
var wg sync.WaitGroup
var dataOnNode0 []byte // 由绑定到 Node 0 的 Goroutine 分配的内存
var dataOnNode1 []byte // 由绑定到 Node 1 的 Goroutine 分配的内存
// --- 场景 1: 生产 Goroutine 绑定到 Node 0,分配数据。消费 Goroutine 绑定到 Node 0,访问数据。 ---
fmt.Println("n--- 场景 1: 本地访问 (数据在 Node 0,消费 Goroutine 在 Node 0) ---")
wg.Add(1)
go func() {
defer wg.Done()
runtime.LockOSThread()
defer runtime.UnlockOSThread()
tid := syscall.Gettid()
fmt.Printf("Producer/Consumer GID: %d, OS Thread ID: %d. Binding to CPU %d (Node 0).n", getGID(), tid, cpu0)
if err := setCPUAffinity(cpu0); err != nil {
fmt.Printf("Error setting affinity for Goroutine on CPU %d: %vn", cpu0, err)
return
}
dataOnNode0 = make([]byte, arraySize) // 内存将在 Node 0 上分配 (first-touch)
for i := 0; i < arraySize; i++ {
dataOnNode0[i] = byte(i % 256)
}
fmt.Printf("Producer on Node 0 finished allocating %d bytes.n", arraySize)
fmt.Println("Consumer on Node 0 reading its local data.")
start := time.Now()
sum := 0
for i := 0; i < arraySize; i++ {
sum += int(dataOnNode0[i])
}
_ = sum
elapsed := time.Since(start)
fmt.Printf("本地访问 (Node 0 -> Node 0) 耗时: %sn", elapsed)
}()
wg.Wait()
if dataOnNode0 == nil {
fmt.Println("Error: Data on Node 0 was not allocated. Exiting.")
return
}
// --- 场景 2: 消费 Goroutine 绑定到 Node 1,访问 Node 0 上的数据。 (远程访问) ---
fmt.Println("n--- 场景 2: 远程访问 (数据在 Node 0,消费 Goroutine 在 Node 1) ---")
wg.Add(1)
go func() {
defer wg.Done()
runtime.LockOSThread()
defer runtime.UnlockOSThread()
tid := syscall.Gettid()
fmt.Printf("Consumer GID: %d, OS Thread ID: %d. Binding to CPU %d (Node 1).n", getGID(), tid, cpu1)
if err := setCPUAffinity(cpu1); err != nil {
fmt.Printf("Error setting affinity for Goroutine on CPU %d: %vn", cpu1, err)
return
}
fmt.Println("Consumer on Node 1 reading data from Node 0.")
start := time.Now()
sum := 0
for i := 0; i < arraySize; i++ {
sum += int(dataOnNode0[i]) // 跨 NUMA 访问
}
_ = sum
elapsed := time.Since(start)
fmt.Printf("远程访问 (Node 1 -> Node 0) 耗时: %sn", elapsed)
}()
wg.Wait()
// --- 场景 3: 生产 Goroutine 绑定到 Node 1,分配数据。消费 Goroutine 绑定到 Node 1,访问数据。 ---
fmt.Println("n--- 场景 3: 本地访问 (数据在 Node 1,消费 Goroutine 在 Node 1) ---")
wg.Add(1)
go func() {
defer wg.Done()
runtime.LockOSThread()
defer runtime.UnlockOSThread()
tid := syscall.Gettid()
fmt.Printf("Producer/Consumer GID: %d, OS Thread ID: %d. Binding to CPU %d (Node 1).n", getGID(), tid, cpu1)
if err := setCPUAffinity(cpu1); err != nil {
fmt.Printf("Error setting affinity for Goroutine on CPU %d: %vn", cpu1, err)
return
}
dataOnNode1 = make([]byte, arraySize) // 内存将在 Node 1 上分配 (first-touch)
for i := 0; i < arraySize; i++ {
dataOnNode1[i] = byte(i % 256)
}
fmt.Printf("Producer on Node 1 finished allocating %d bytes.n", arraySize)
fmt.Println("Consumer on Node 1 reading its local data.")
start := time.Now()
sum := 0
for i := 0; i < arraySize; i++ {
sum += int(dataOnNode1[i])
}
_ = sum
elapsed := time.Since(start)
fmt.Printf("本地访问 (Node 1 -> Node 1) 耗时: %sn", elapsed)
}()
wg.Wait()
fmt.Println("nBenchmark finished.")
}
运行和分析:
运行此程序,您将观察到:
- 绑定到同一 NUMA 节点的 Goroutine 访问其本地分配的数据时,延迟最低。
- 绑定到不同 NUMA 节点的 Goroutine 访问远程数据时,延迟会显著增加。
这验证了通过 runtime.LockOSThread 和 syscall.SchedSetaffinity 组合,我们可以有效地控制 Goroutine 的执行位置和其“首次触碰”的内存分配位置,从而实现 NUMA 优化。
注意事项:
runtime.GOMAXPROCS的影响:当使用runtime.LockOSThread时,它会从 Go 调度器的 P 队列中移除一个 M,并将其专门用于绑定的 Goroutine。这意味着它会减少 Go 调度器可以使用的 P 数量。如果过度使用LockOSThread,可能会导致 Go 调度器无法高效地调度其他 Goroutine,甚至出现死锁。因此,谨慎使用,只对需要 NUMA 优化的关键 Goroutine 进行绑定。- 内存“首次触碰”策略:这种方法依赖于操作系统的内存分配策略。在 Linux 上,默认的“首次触碰”通常是有效的,但这不是 Go 语言层面的保证。
- 资源管理:绑定的 OS 线程不会被 Go 运行时回收,直到 Goroutine 结束并调用
runtime.UnlockOSThread()。确保 Goroutine 最终会结束,或者使用一个线程池来管理这些绑定的 OS 线程。
3. CGO 结合 libnuma 进行内存分配 (细粒度/高级)
前一种方法通过绑定 Goroutine 来影响内存分配,但并没有直接控制内存分配的位置。如果需要更精细、更确定的内存 NUMA 亲和性控制,可以直接使用 CGO 调用 libnuma 库提供的函数。libnuma 是 Linux 上用于 NUMA 内存管理的标准库,它提供了直接在特定 NUMA 节点上分配内存的 API。
核心 libnuma 函数:
numa_alloc_onnode(size, node): 在指定的 NUMA 节点上分配size字节的内存。numa_free(ptr, size): 释放numa_alloc_onnode分配的内存。numa_set_preferred(node): 设置当前线程的优先 NUMA 节点,后续内存分配将优先在该节点进行。
CGO 实现示例:
首先,需要一个 C 语言文件(例如 numa_alloc.c)来封装 libnuma 函数:
// numa_alloc.c
#include <numa.h> // 需要安装 libnuma-dev
#include <stdlib.h>
#include <stdio.h>
// 在指定的 NUMA 节点上分配内存
void* allocate_on_numa_node(size_t size, int node) {
if (numa_available() == -1) {
fprintf(stderr, "NUMA functions not available.n");
return NULL;
}
void* ptr = numa_alloc_onnode(size, node);
if (ptr == NULL) {
perror("numa_alloc_onnode failed");
}
return ptr;
}
// 释放由 numa_alloc_onnode 分配的内存
void free_on_numa_node(void* ptr, size_t size) {
if (ptr) {
numa_free(ptr, size);
}
}
// 设置当前线程的 NUMA 偏好节点
void set_numa_preferred(int node) {
if (numa_available() != -1) {
numa_set_preferred(node);
}
}
然后,在 Go 代码中通过 CGO 调用这些 C 函数:
package main
/*
#cgo LDFLAGS: -lnuma
#include <numa.h>
#include <stdlib.h>
#include <stdio.h> // For fprintf and perror
extern void* allocate_on_numa_node(size_t size, int node);
extern void free_on_numa_node(void* ptr, size_t size);
extern void set_numa_preferred(int node);
*/
import "C" // import "C" 必须紧邻 CGO 注释
import (
"bytes"
"fmt"
"io/ioutil"
"runtime"
"strconv"
"strings"
"sync"
"syscall"
"time"
"unsafe"
)
const arraySize = 1024 * 1024 * 256 // 256MB
// (getCPUsForNUMANode, setCPUAffinity, getGID functions are the same as before)
func getCPUsForNUMANode(nodeID int) ([]int, error) {
path := fmt.Sprintf("/sys/devices/system/node/node%d/cpulist", nodeID)
content, err := ioutil.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("failed to read %s: %w", path, err)
}
cpuListStr := strings.TrimSpace(string(content))
var cpus []int
ranges := strings.Split(cpuListStr, ",")
for _, r := range ranges {
if strings.Contains(r, "-") {
parts := strings.Split(r, "-")
start, _ := strconv.Atoi(parts[0])
end, _ := strconv.Atoi(parts[1])
for i := start; i <= end; i++ {
cpus = append(cpus, i)
}
} else {
cpuID, _ := strconv.Atoi(r)
cpus = append(cpus, cpuID)
}
}
return cpus, nil
}
func setCPUAffinity(cpuID int) error {
var cpuset syscall.CPUSet
cpuset.Zero()
cpuset.Set(cpuID)
pid := 0 // Current thread
return syscall.SchedSetaffinity(pid, &cpuset)
}
func getGID() uint64 {
b := make([]byte, 64)
b = b[:runtime.Stack(b, false)]
b = b[:bytes.IndexByte(b, ' ')]
n, _ := strconv.ParseUint(string(b), 10, 64)
return n
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
fmt.Println("Discovering NUMA topology...")
numaNodes := make(map[int][]int)
for i := 0; ; i++ {
cpus, err := getCPUsForNUMANode(i)
if err != nil {
if strings.Contains(err.Error(), "no such file or directory") {
break
}
fmt.Printf("Error getting CPUs for node %d: %vn", i, err)
return
}
numaNodes[i] = cpus
fmt.Printf("NUMA Node %d has CPUs: %vn", i, cpus)
}
if len(numaNodes) < 2 {
fmt.Println("Warning: Less than 2 NUMA nodes detected. NUMA-aware scheduling benefits will be minimal or non-existent.")
fmt.Println("This program is designed for multi-socket NUMA systems.")
return
}
node0CPUs, ok0 := numaNodes[0]
node1CPUs, ok1 := numaNodes[1]
if !ok0 || !ok1 || len(node0CPUs) == 0 || len(node1CPUs) == 0 {
fmt.Println("Error: Could not find enough NUMA nodes/CPUs for demonstration.")
return
}
cpu0 := node0CPUs[0]
cpu1 := node1CPUs[0]
fmt.Printf("nRunning NUMA-aware benchmark with CPU %d (Node 0) and CPU %d (Node 1)...n", cpu0, cpu1)
var wg sync.WaitGroup
var cDataOnNode0 unsafe.Pointer // 使用 CGO 分配的内存指针
var cDataOnNode1 unsafe.Pointer
// --- 场景 4: 使用 CGO 和 libnuma 在特定节点分配内存,并在同一节点访问 ---
fmt.Println("n--- 场景 4: CGO + libnuma 本地访问 (数据在 Node 0,消费 Goroutine 在 Node 0) ---")
wg.Add(1)
go func() {
defer wg.Done()
runtime.LockOSThread()
defer runtime.UnlockOSThread()
tid := syscall.Gettid()
fmt.Printf("Producer/Consumer GID: %d, OS Thread ID: %d. Binding to CPU %d (Node 0).n", getGID(), tid, cpu0)
if err := setCPUAffinity(cpu0); err != nil {
fmt.Printf("Error setting affinity for Goroutine on CPU %d: %vn", cpu0, err)
return
}
// 可以选择设置线程的 NUMA 偏好,但这只影响后续常规 malloc,libnuma_alloc_onnode 更直接
// C.set_numa_preferred(C.int(0))
// 使用 libnuma 在 Node 0 上分配内存
cSize := C.size_t(arraySize)
cNodeID := C.int(0)
cDataOnNode0 = C.allocate_on_numa_node(cSize, cNodeID)
if cDataOnNode0 == nil {
fmt.Println("Error: numa_alloc_onnode failed for Node 0.")
return
}
defer C.free_on_numa_node(cDataOnNode0, cSize) // 确保内存被释放
// 将 C 指针转换为 Go 切片进行操作 (unsafe!)
numaAllocatedData := (*[arraySize]byte)(cDataOnNode0)[:]
for i := 0; i < arraySize; i++ {
numaAllocatedData[i] = byte(i % 256)
}
fmt.Printf("Producer on Node 0 finished allocating %d bytes using libnuma.n", arraySize)
fmt.Println("Consumer on Node 0 reading its libnuma-allocated local data.")
start := time.Now()
sum := 0
for i := 0; i < arraySize; i++ {
sum += int(numaAllocatedData[i])
}
_ = sum
elapsed := time.Since(start)
fmt.Printf("CGO 本地访问 (Node 0 -> Node 0) 耗时: %sn", elapsed)
}()
wg.Wait()
// --- 场景 5: 使用 CGO 和 libnuma 在特定节点分配内存,并在不同节点访问 (远程) ---
// 为了演示远程访问,我们需要先在 Node 0 分配数据,然后让 Node 1 的 Goroutine 访问
fmt.Println("n--- 场景 5: CGO + libnuma 远程访问 (数据在 Node 0,消费 Goroutine 在 Node 1) ---")
// 先在 Node 0 分配数据
cSize := C.size_t(arraySize)
cNodeID := C.int(0)
cDataOnNode0 = C.allocate_on_numa_node(cSize, cNodeID)
if cDataOnNode0 == nil {
fmt.Println("Error: numa_alloc_onnode failed for Node 0 in preparation for remote access.")
return
}
defer C.free_on_numa_node(cDataOnNode0, cSize)
numaAllocatedDataForRemote := (*[arraySize]byte)(cDataOnNode0)[:]
for i := 0; i < arraySize; i++ {
numaAllocatedDataForRemote[i] = byte(i % 256)
}
fmt.Printf("Prepared %d bytes on Node 0 using libnuma for remote access test.n", arraySize)
wg.Add(1)
go func() {
defer wg.Done()
runtime.LockOSThread()
defer runtime.UnlockOSThread()
tid := syscall.Gettid()
fmt.Printf("Consumer GID: %d, OS Thread ID: %d. Binding to CPU %d (Node 1).n", getGID(), tid, cpu1)
if err := setCPUAffinity(cpu1); err != nil {
fmt.Printf("Error setting affinity for Goroutine on CPU %d: %vn", cpu1, err)
return
}
fmt.Println("Consumer on Node 1 reading libnuma-allocated data from Node 0.")
start := time.Now()
sum := 0
for i := 0; i < arraySize; i++ {
sum += int(numaAllocatedDataForRemote[i]) // 跨 NUMA 访问
}
_ = sum
elapsed := time.Since(start)
fmt.Printf("CGO 远程访问 (Node 1 -> Node 0) 耗时: %sn", elapsed)
}()
wg.Wait()
fmt.Println("nBenchmark finished.")
}
编译和运行 CGO 示例:
- 安装
libnuma-dev:sudo apt-get install libnuma-dev # Debian/Ubuntu sudo yum install numactl-devel # CentOS/RHEL - 保存 C 文件:将
numa_alloc.c放在与 Go 文件相同的目录下。 - 编译 Go 程序:
go build -gcflags=-N -gcflags=-l -tags cgo -o numa_cgo_app . # -gcflags=-N -gcflags=-l 用于禁用 Go 编译器的优化,以减少对基准测试结果的影响。 # -tags cgo 启用 CGO - 运行:
./numa_cgo_app
优点:
- 最精确的内存控制:可以直接指定内存在哪个 NUMA 节点上分配,与 CPU 亲和性解耦。
- 避免“首次触碰”策略的不确定性:不再依赖 OS 的首次触碰,内存分配位置是确定的。
缺点:
- 增加了复杂性:需要 CGO,涉及 C 语言代码、编译配置(
#cgo LDFLAGS),以及 Go 与 C 数据类型转换的unsafe操作。 - 降低可移植性:
libnuma是 Linux 特有的,且 CGO 会增加跨平台编译的难度。 - 内存管理负担:需要手动
numa_free,Go GC 无法管理这些内存。
适用场景:
- 对内存布局有严格要求,需要极致性能优化,且愿意承担 CGO 带来的复杂性。
- 构建自定义内存池或高性能数据结构,需要精细控制内存的 NUMA 本地性。
4. 线程池与 NUMA-aware Worker (高级模式)
对于更复杂的、需要动态调度任务的应用程序,可以构建一个 NUMA-aware 的 Goroutine 线程池。这个池中的每个工作 Goroutine 都被绑定到特定的 NUMA 节点(通过 LockOSThread 和 SetAffinity),并负责处理分配给该节点的工作。
基本思想:
- 初始化固定数量的 Worker Goroutine:每个 NUMA 节点启动一个或多个 Worker Goroutine。
- 绑定 Worker:每个 Worker Goroutine 在启动时调用
runtime.LockOSThread()并设置其 OS 线程的 CPU 亲和性到其目标 NUMA 节点的某个 CPU。 - 任务队列:为每个 NUMA 节点维护一个独立的任务队列。
- 任务分发:当有新任务到来时,根据任务涉及的数据(如果已知其 NUMA 节点)或负载均衡策略,将其分发到相应 NUMA 节点的任务队列。
- Worker 执行:Worker 从其本地队列中取出任务执行,确保数据和计算的本地性。
示例结构 (概念性代码):
package main
import (
"fmt"
"io/ioutil"
"runtime"
"strconv"
"strings"
"sync"
"syscall"
"time"
)
// (getCPUsForNUMANode, setCPUAffinity, getGID functions are the same as before)
// ... (omitted for brevity, assume they are present)
type Task struct {
ID int
Data []byte // 假设任务会操作这块数据
TargetNode int // 提示任务应该在哪一个NUMA节点执行
}
type NumaWorker struct {
nodeID int
cpuID int
tasks chan Task
quit chan struct{}
wg *sync.WaitGroup
}
func NewNumaWorker(nodeID, cpuID int, wg *sync.WaitGroup) *NumaWorker {
return &NumaWorker{
nodeID: nodeID,
cpuID: cpuID,
tasks: make(chan Task, 100), // 有缓冲的任务队列
quit: make(chan struct{}),
wg: wg,
}
}
func (nw *NumaWorker) Start() {
nw.wg.Add(1)
go func() {
defer nw.wg.Done()
runtime.LockOSThread() // 绑定 Goroutine 到 OS 线程
defer runtime.UnlockOSThread()
tid := syscall.Gettid()
fmt.Printf("Worker GID: %d, OS Thread ID: %d. Binding to CPU %d (Node %d).n", getGID(), tid, nw.cpuID, nw.nodeID)
if err := setCPUAffinity(nw.cpuID); err != nil {
fmt.Printf("Worker on Node %d failed to set affinity to CPU %d: %vn", nw.nodeID, nw.cpuID, err)
return
}
for {
select {
case task := <-nw.tasks:
fmt.Printf("Worker on Node %d (CPU %d) processing Task %d. Data len: %dn", nw.nodeID, nw.cpuID, task.ID, len(task.Data))
// 模拟数据处理,确保内存访问是本地的
sum := 0
for i := 0; i < len(task.Data); i++ {
sum += int(task.Data[i])
}
_ = sum
time.Sleep(10 * time.Millisecond) // 模拟工作
fmt.Printf("Worker on Node %d (CPU %d) finished Task %d.n", nw.nodeID, nw.cpuID, task.ID)
case <-nw.quit:
fmt.Printf("Worker on Node %d (CPU %d) quitting.n", nw.nodeID, nw.cpuID)
return
}
}
}()
}
func (nw *NumaWorker) Stop() {
close(nw.quit)
}
func (nw *NumaWorker) Submit(task Task) {
nw.tasks <- task
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
fmt.Println("Discovering NUMA topology...")
numaNodes := make(map[int][]int)
for i := 0; ; i++ {
cpus, err := getCPUsForNUMANode(i)
if err != nil {
if strings.Contains(err.Error(), "no such file or directory") {
break
}
fmt.Printf("Error getting CPUs for node %d: %vn", i, err)
return
}
numaNodes[i] = cpus
fmt.Printf("NUMA Node %d has CPUs: %vn", i, cpus)
}
if len(numaNodes) < 2 {
fmt.Println("Warning: Less than 2 NUMA nodes detected. NUMA-aware scheduling benefits will be minimal or non-existent.")
fmt.Println("This program is designed for multi-socket NUMA systems.")
return
}
var wg sync.WaitGroup
workers := make(map[int]*NumaWorker)
// 为每个 NUMA 节点创建一个 Worker
for nodeID, cpus := range numaNodes {
if len(cpus) > 0 {
workerCPU := cpus[0] // 使用每个节点的第一个 CPU
worker := NewNumaWorker(nodeID, workerCPU, &wg)
worker.Start()
workers[nodeID] = worker
}
}
// 模拟任务分发
fmt.Println("nSimulating task distribution...")
for i := 0; i < 10; i++ {
targetNode := i % len(numaNodes) // 轮流分配到不同节点
data := make([]byte, 1024*1024) // 1MB 数据
// 模拟数据在目标节点分配 (first-touch)
wg.Add(1)
go func(taskID, node int, data []byte) {
defer wg.Done()
runtime.LockOSThread()
defer runtime.UnlockOSThread()
if err := setCPUAffinity(numaNodes[node][0]); err != nil { // 绑定到目标节点的CPU
fmt.Printf("Error binding for data init for Task %d: %vn", taskID, err)
return
}
for j := 0; j < len(data); j++ {
data[j] = byte(j % 256)
}
fmt.Printf("Task %d data initialized on Node %d.n", taskID, node)
workers[node].Submit(Task{ID: taskID, Data: data, TargetNode: node})
}(i, targetNode, data)
}
wg.Wait() // 等待所有数据初始化并提交
time.Sleep(2 * time.Second) // 等待 worker 处理一些任务
// 停止所有 worker
for _, worker := range workers {
worker.Stop()
}
wg.Wait() // 等待所有 worker 退出
fmt.Println("nNUMA-aware worker pool demonstration finished.")
}
优点:
- 高度优化:能够最大限度地利用 NUMA 架构的优势,确保数据和计算的本地性。
- 可控性强:应用程序可以根据自身的逻辑,精确控制任务在哪个 NUMA 节点上执行。
- 资源隔离:不同 NUMA 节点上的工作负载可以更好地隔离,减少相互干扰。
缺点:
- 开发复杂性高:需要应用程序本身具有 NUMA-aware 的设计,包括任务分发、数据结构布局等。
- Go 调度器干预减少:由于大量 Goroutine 被
LockOSThread绑定,Go 调度器的工作窃取等优化机制会受限。 - 调试难度大:涉及 Go 运行时、操作系统调度、NUMA 内存管理等多层面。
适用场景:
- 高性能网络服务、数据处理引擎、科学计算等,对延迟和吞吐量有极高要求,且工作负载可以自然地划分为 NUMA 本地任务的场景。
策略总结与比较
| 策略 | 粒度 | 内存分配控制 | CPU 亲和性控制 | 复杂性 | 适用场景 | 优势 | 劣势 |
|---|---|---|---|---|---|---|---|
OS 进程绑定 (numactl) |
进程级 | 整个进程内存 | 整个进程 CPU | 低 | 简单应用,单 NUMA 节点工作负载 | 易于使用,无需代码修改 | 粒度过粗,无法细粒度优化,可能浪费资源 |
Go Goroutine 绑定 (LockOSThread + SetAffinity) |
Goroutine | 依赖“首次触碰”,间接控制 | Goroutine 绑定的 OS 线程 | 中 | 特定 Goroutine 关键路径优化,数据局部性强 |