各位同仁,各位对高性能计算和人工智能部署充满热情的工程师们,大家好。
在当前人工智能技术爆炸式发展的时代,我们面对着一个日益普遍且复杂的挑战:如何在一个有限的计算资源池上,高效、稳定地同时运行多个机器学习模型。这不仅仅是简单地将模型并行化的问题,更深层次的需求是实现“多模型并发推理”下的“算力隔离”。今天,我将以一名编程专家的视角,为大家深入剖析如何利用 C++ 的强大能力,结合操作系统和硬件层面的机制,实现这一目标。
1. 多模型并发推理的崛起与算力隔离的必要性
随着人工智能应用场景的不断拓展,我们不再满足于单个模型解决单一问题。一个复杂的智能系统,可能需要同时部署多个不同架构、不同功能的模型:例如,一个智能客服系统可能同时运行语义理解模型、情感分析模型和知识图谱检索模型;一个自动驾驶系统则可能同时处理目标检测、路径规划和驾驶决策模型。这些模型可能由不同的团队开发,使用不同的框架,对计算资源的需求也大相径庭。
在这种多模型并发推理的场景下,如果没有有效的资源管理和隔离机制,我们将面临一系列严峻的问题:
- 性能抖动与服务质量 (QoS) 下降: 某个资源密集型模型可能会抢占所有可用算力,导致其他模型的推理延迟急剧增加,甚至超时。
- 资源利用率低下: 为了避免相互干扰,我们可能被迫为每个模型预留远超实际需求的峰值资源,造成大量算力闲置。
- 系统稳定性与可靠性风险: 一个模型的内存泄漏或计算错误,可能会影响到整个进程或系统,导致所有模型崩溃。
- 安全性与公平性问题: 在多租户或共享资源环境中,缺乏隔离可能导致敏感数据泄露,或某些用户无法获得公平的算力分配。
因此,“算力隔离”成为了实现高效、稳定、安全多模型并发推理的关键。它旨在确保每个模型或模型组都能够获得其所需的计算资源,并防止它们之间相互干扰。
2. 核心问题剖析:资源争抢的本质
在深入探讨实现方法之前,我们必须清晰地理解多模型并发推理中,究竟哪些计算资源会发生争抢。
2.1 中央处理器 (CPU)
CPU 是大多数模型推理流程的起点和终点。它负责加载模型、预处理输入数据、调度计算任务、后处理输出结果,并且对于一些轻量级模型或特定层,CPU 也能直接执行推理。
- 核心争抢: 多个模型并发运行时,如果线程数量超过可用 CPU 核心数,就会发生频繁的上下文切换,增加调度开销。
- 缓存争抢: 各模型的数据和指令可能会互相冲刷 CPU 缓存(L1, L2, L3),导致缓存命中率下降,内存访问延迟增加。
- NUMA 效应: 在多路 CPU 系统中,如果模型的数据和执行线程分布在不同的 NUMA 节点上,跨节点内存访问会显著降低性能。
2.2 图形处理器 (GPU)
对于深度学习模型,GPU 几乎是不可或缺的加速器。它提供了海量的并行计算单元和高带宽显存。
- 计算单元争抢: 多个模型同时在 GPU 上执行核心运算时,会争夺 SMs (Streaming Multiprocessors) 等计算资源。
- 显存 (VRAM) 争抢: 每个模型都需要加载自身的权重参数和中间激活值到显存。显存容量有限,过度分配会导致 OOM (Out Of Memory) 错误,或者频繁的数据交换(在 Unified Memory 或虚拟内存机制下)。
- 显存带宽争抢: 模型前向推理涉及大量数据从显存加载到计算单元,以及计算结果写回显存。多个模型并发时会争夺有限的显存带宽。
- PCIe 带宽争抢: CPU 与 GPU 之间的数据传输通过 PCIe 总线,多个模型的数据传输会争抢 PCIe 带宽。
2.3 主内存 (RAM)
RAM 主要用于存储模型加载前的原始数据、CPU 侧的预处理数据、模型本身的权重副本(如果模型在 CPU 上运行或部分运行),以及操作系统和应用程序的其他数据。
- 容量争抢: 多个模型及其相关数据可能超出系统可用 RAM,导致操作系统进行页面置换 (paging/swapping),将数据写入磁盘,严重拖慢性能。
- 带宽争抢: 多个线程或进程同时访问主内存,会争夺内存总线带宽。
2.4 I/O (磁盘与网络)
- 磁盘 I/O: 模型加载、日志记录、输入/输出数据存储等都涉及磁盘访问。
- 网络 I/O: 分布式推理、远程数据获取、服务间通信等都依赖网络。
理解这些资源争抢的本质,是我们设计有效隔离方案的基础。
3. C++ 实现并发推理的常见架构模式
在 C++ 中实现多模型并发推理,可以采用多种架构模式,每种模式在隔离性、性能、复杂性等方面都有不同的权衡。
3.1 单进程多线程 (Single Process, Multiple Threads)
这是最常见的并发模型。所有模型都加载到同一个进程中,每个模型的推理任务由一个或多个线程执行。
- 优点:
- 低开销: 线程创建和切换的开销远小于进程。
- 共享内存: 线程之间共享进程的地址空间,数据传递非常高效。
- 易于实现: 利用 C++
std::thread、std::mutex、std::condition_variable等标准库即可构建。
- 缺点:
- 隔离性差: 所有线程共享 CPU 缓存、RAM、VRAM (如果使用同一 GPU)。一个线程的内存泄漏或崩溃可能导致整个进程崩溃。
- 资源争抢严重: 难以细粒度地控制每个线程的 CPU/GPU 资源配额。
- 调试复杂: 共享状态和同步问题可能导致难以复现的 Bug。
3.2 多进程 (Multiple Processes)
每个模型或一组模型运行在独立的进程中。进程间通过 IPC (Inter-Process Communication) 机制进行通信。
- 优点:
- 强大的 OS 级别隔离: 操作系统为每个进程提供独立的地址空间,一个进程的崩溃通常不会影响其他进程。
- 资源分配清晰: OS 可以更细粒度地为每个进程分配 CPU、内存等资源。
- 易于管理: 可以独立启动、停止、监控每个模型服务。
- 缺点:
- 高开销: 进程创建和切换的开销较大。
- 数据传递复杂: 进程间通信需要额外的机制(如共享内存、消息队列、管道、Socket),开销高于线程间直接内存访问。
- GPU 资源管理: 默认情况下,多个进程仍然会争抢同一个 GPU 上的计算资源和显存,需要专门的 GPU 隔离技术配合。
3.3 容器化 (Containerization,如 Docker)
将每个模型或模型服务打包成一个独立的容器。容器利用操作系统的 CGroup (Control Group) 和 Namespace 机制提供轻量级虚拟化。
- 优点:
- 极佳的隔离性: CGroup 可以精确限制 CPU、内存、I/O 等资源,Namespace 提供文件系统、网络、进程 ID 等隔离。
- 可移植性强: 容器镜像包含了所有依赖,可以在任何支持 Docker 的环境中运行。
- 易于部署和管理: 容器编排工具 (如 Kubernetes) 能够自动化部署、扩缩容和故障恢复。
- 缺点:
- 引入额外层: 增加了部署复杂性,对系统资源(如存储、CPU)有一定额外开销。
- GPU 虚拟化挑战: 虽有 GPU Passthrough 或 NVIDIA Container Runtime 等方案,但完全的 GPU 虚拟化和细粒度隔离仍有局限。
本讲座将主要聚焦于利用 C++ 结合操作系统 API 来实现细粒度的算力隔离,特别是在多线程和多进程架构下的实践。容器化技术虽然强大,但其核心隔离机制往往由操作系统和容器运行时提供,C++ 应用更多是作为容器内的负载。
4. C++ 实现算力隔离:CPU 与内存
我们将从操作系统层面的机制入手,探讨如何利用 C++ 调用这些机制来隔离 CPU 和主内存。
4.1 CPU 隔离
CPU 隔离的主要目标是确保每个模型或模型组能够独占或按比例使用特定的 CPU 核心,减少上下文切换和缓存争抢。
4.1.1 线程/进程亲和性 (CPU Affinity)
线程/进程亲和性允许我们将一个线程或进程绑定到指定的 CPU 核心集合上。这在多核系统中非常有效,可以避免关键任务在不同核心间频繁迁移,从而减少缓存失效和调度开销。
在 Linux 系统中,我们可以使用 sched_setaffinity 或 pthread_setaffinity_np 函数来设置 CPU 亲和性。
#include <iostream>
#include <thread>
#include <vector>
#include <sched.h> // For sched_setaffinity, cpu_set_t
#include <unistd.h> // For getpid
// Helper function to set CPU affinity for the current thread/process
bool set_cpu_affinity(const std::vector<int>& cpu_ids) {
cpu_set_t cpuset;
CPU_ZERO(&cpuset); // Clear the CPU set
for (int id : cpu_ids) {
if (id < 0 || id >= sysconf(_SC_NPROCESSORS_ONLN)) {
std::cerr << "Error: Invalid CPU ID " << id << std::endl;
return false;
}
CPU_SET(id, &cpuset); // Add CPU ID to the set
}
// Set affinity for the current thread
// pthread_setaffinity_np for threads, sched_setaffinity for current process
int result = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
if (result != 0) {
std::cerr << "Error setting thread CPU affinity: " << strerror(errno) << std::endl;
return false;
}
return true;
}
// A dummy inference function
void model_inference(int model_id, const std::vector<int>& cpu_ids) {
if (!set_cpu_affinity(cpu_ids)) {
std::cerr << "Model " << model_id << " failed to set CPU affinity." << std::endl;
return;
}
std::cout << "Model " << model_id << " (PID: " << getpid()
<< ", TID: " << std::this_thread::get_id()
<< ") running on CPU(s): ";
for (int id : cpu_ids) {
std::cout << id << " ";
}
std::cout << std::endl;
// Simulate some work
for (volatile long i = 0; i < 1e9; ++i); // Heavy computation
std::cout << "Model " << model_id << " inference complete." << std::endl;
}
int main() {
std::cout << "Total CPUs available: " << sysconf(_SC_NPROCESSORS_ONLN) << std::endl;
// Example 1: Two models, each pinned to a separate core
std::thread model1_thread(model_inference, 1, std::vector<int>{0}); // Model 1 on CPU 0
std::thread model2_thread(model_inference, 2, std::vector<int>{1}); // Model 2 on CPU 1
model1_thread.join();
model2_thread.join();
std::cout << "n--- Separate CPU cores example finished ---n" << std::endl;
// Example 2: Two models, one pinned to a single core, another to multiple cores
// This assumes at least 3 cores are available (0, 1, 2)
if (sysconf(_SC_NPROCESSORS_ONLN) >= 3) {
std::thread model3_thread(model_inference, 3, std::vector<int>{0}); // Model 3 on CPU 0
std::thread model4_thread(model_inference, 4, std::vector<int>{1, 2}); // Model 4 on CPU 1 and 2
model3_thread.join();
model4_thread.join();
std::cout << "n--- Mixed CPU cores example finished ---n" << std::endl;
} else {
std::cout << "Not enough CPU cores for the mixed example (need >= 3)." << std::endl;
}
// Example 3: Demonstrate process affinity (for the main process itself)
// To set affinity for a child process, you'd call sched_setaffinity in the child after fork().
std::cout << "nSetting main process affinity to CPU 0 and 1..." << std::endl;
if (set_cpu_affinity(std::vector<int>{0, 1})) {
std::cout << "Main process affinity set successfully." << std::endl;
} else {
std::cout << "Failed to set main process affinity." << std::endl;
}
return 0;
}
注意事项:
_SC_NPROCESSORS_ONLN获取在线 CPU 数量。pthread_self()获取当前线程 ID。sched_setaffinity可以用于设置进程的 CPU 亲和性,它作用于调用它的进程或由pid参数指定的进程。- CPU 亲和性是操作系统层面的建议,不是强制隔离。高优先级的任务仍可能抢占低优先级任务的 CPU 时间片。
- 在 NUMA 架构下,应考虑将线程绑定到与其数据最近的 NUMA 节点上的 CPU 核心,以减少跨节点内存访问延迟。
4.1.2 进程/线程优先级 (Scheduling Priority)
操作系统调度器根据进程/线程的优先级来决定它们的执行顺序和时间片。高优先级的任务将获得更多的 CPU 时间。
在 Linux 中,可以使用 setpriority 函数设置进程/线程的优先级 (nice 值),或者使用 sched_setscheduler 设置实时调度策略。
#include <iostream>
#include <thread>
#include <vector>
#include <sys/resource.h> // For setpriority, getpriority
#include <sched.h> // For sched_getscheduler, sched_setscheduler
#include <unistd.h> // For getpid
// Helper function to set thread/process nice value (lower nice means higher priority)
bool set_nice_priority(int priority_value) {
// priority_value typically ranges from -20 (highest) to 19 (lowest)
int result = setpriority(PRIO_PROCESS, 0, priority_value); // 0 means current process
if (result != 0) {
std::cerr << "Error setting nice priority: " << strerror(errno) << std::endl;
return false;
}
return true;
}
// Helper function to set thread/process real-time scheduling policy (more advanced)
bool set_realtime_priority(int policy, int rt_priority) {
struct sched_param param;
param.sched_priority = rt_priority; // For real-time policies, 1 (lowest) to 99 (highest)
// 0 means current process
int result = sched_setscheduler(0, policy, ¶m);
if (result != 0) {
std::cerr << "Error setting real-time priority: " << strerror(errno) << std::endl;
return false;
}
return true;
}
void model_inference_with_priority(int model_id, int nice_value) {
if (!set_nice_priority(nice_value)) {
std::cerr << "Model " << model_id << " failed to set nice priority." << std::endl;
return;
}
std::cout << "Model " << model_id << " (PID: " << getpid()
<< ", TID: " << std::this_thread::get_id()
<< ") running with nice priority: " << getpriority(PRIO_PROCESS, 0) << std::endl;
for (volatile long i = 0; i < 5e8; ++i); // Simulate some work
std::cout << "Model " << model_id << " inference complete." << std::endl;
}
int main() {
std::cout << "Main process default nice priority: " << getpriority(PRIO_PROCESS, 0) << std::endl;
// Model 1: Higher priority (lower nice value)
std::thread model1_thread(model_inference_with_priority, 1, -10); // -10 is higher priority
// Model 2: Lower priority (higher nice value)
std::thread model2_thread(model_inference_with_priority, 2, 10); // 10 is lower priority
model1_thread.join();
model2_thread.join();
std::cout << "n--- Priority example finished ---n" << std::endl;
// Demonstrating real-time priority (requires root privileges for SCHED_FIFO/SCHED_RR)
// Be careful with real-time priorities, as they can starve other processes.
std::cout << "Attempting to set real-time priority for main process..." << std::endl;
if (set_realtime_priority(SCHED_FIFO, 50)) { // SCHED_FIFO or SCHED_RR
std::cout << "Main process real-time priority set successfully." << std::endl;
} else {
std::cout << "Failed to set real-time priority. This often requires root privileges." << std::endl;
}
return 0;
}
注意事项:
nice值:范围通常是 -20 (最高优先级) 到 19 (最低优先级)。默认是 0。setpriority作用于进程。如果想设置线程优先级,需要通过pthread_setschedparam配合实时调度策略。- 实时调度策略 (
SCHED_FIFO,SCHED_RR) 提供更强的优先级保证,但通常需要CAP_SYS_NICE能力或 root 权限,并且使用不当可能导致系统不稳定。
4.1.3 资源限制 (Resource Limits – setrlimit)
setrlimit 系统调用允许设置进程的各种资源限制,包括 CPU 时间、内存大小、文件描述符数量等。虽然 CPU 时间限制 (RLIMIT_CPU) 可以防止一个进程无限期地占用 CPU,但它通常用于限制批处理任务的运行时间,而不是为持续推理服务提供细粒度隔离。
#include <iostream>
#include <sys/resource.h> // For setrlimit, getrlimit
#include <unistd.h> // For fork, sleep
#include <thread> // For std::this_thread::sleep_for
#include <chrono>
void consume_cpu() {
std::cout << "Child process (PID: " << getpid() << ") consuming CPU..." << std::endl;
long long count = 0;
while (true) {
count++;
// Avoid optimizing out the loop
if (count % 100000000 == 0) {
std::cout << "Child process (PID: " << getpid() << ") count: " << count << std::endl;
}
}
}
int main() {
struct rlimit cpu_limit;
// Get current CPU limit
getrlimit(RLIMIT_CPU, &cpu_limit);
std::cout << "Current CPU soft limit: " << cpu_limit.rlim_cur << ", hard limit: " << cpu_limit.rlim_max << std::endl;
// Set CPU soft limit to 5 seconds, hard limit to 10 seconds
cpu_limit.rlim_cur = 5; // Soft limit: 5 seconds
cpu_limit.rlim_max = 10; // Hard limit: 10 seconds
if (setrlimit(RLIMIT_CPU, &cpu_limit) != 0) {
std::cerr << "Error setting CPU limit: " << strerror(errno) << std::endl;
return 1;
}
std::cout << "CPU limit set to soft: 5s, hard: 10s for main process." << std::endl;
pid_t pid = fork();
if (pid == -1) {
std::cerr << "Fork failed." << std::endl;
return 1;
} else if (pid == 0) { // Child process
// Child inherits the CPU limits from the parent
consume_cpu();
// This line should not be reached if CPU limit is hit
std::cout << "Child process finished normally (should not happen if limit hit)." << std::endl;
exit(0);
} else { // Parent process
std::cout << "Parent process (PID: " << getpid() << ") spawned child (PID: " << pid << ")." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(7)); // Wait for the child to hit its soft limit
int status;
waitpid(pid, &status, 0); // Wait for child to terminate
if (WIFSIGNALED(status)) {
std::cout << "Child process terminated by signal: " << WTERMSIG(status)
<< " (likely SIGXCPU due to CPU limit)." << std::endl;
} else {
std::cout << "Child process terminated with status: " << WEXITSTATUS(status) << std::endl;
}
}
return 0;
}
注意事项:
RLIMIT_CPU限制的是 CPU 实际运行时间,而不是墙钟时间。- 当进程达到软限制时,它会收到
SIGXCPU信号。如果捕获并处理该信号,进程可以继续运行直到达到硬限制,届时会收到SIGKILL信号强制终止。 - 对于常驻的推理服务,
RLIMIT_CPU并不是理想的隔离机制,因为它会导致服务中断。更适合用于惩罚性限制或批处理作业。
4.2 内存 (RAM) 隔离
内存隔离旨在防止一个模型过度占用主内存,导致其他模型 OOM 或系统性能下降。
4.2.1 进程独立地址空间
这是多进程架构最主要的内存隔离优势。每个进程都有其独立的虚拟地址空间,由操作系统内存管理单元 (MMU) 映射到物理内存。一个进程的内存错误不会直接影响其他进程。
4.2.2 资源限制 (setrlimit for Memory)
我们可以使用 setrlimit 来限制进程可以使用的虚拟内存 (RLIMIT_AS) 或常驻内存 (RLIMIT_RSS)。
#include <iostream>
#include <sys/resource.h> // For setrlimit, getrlimit
#include <vector>
#include <memory>
#include <unistd.h> // For fork, sleep
#include <chrono>
#include <thread>
void allocate_memory(size_t size_mb) {
size_t size_bytes = size_mb * 1024 * 1024;
std::cout << "Child process (PID: " << getpid() << ") attempting to allocate " << size_mb << " MB..." << std::endl;
try {
// Using std::vector to ensure actual allocation and potential writing
std::vector<char> data(size_bytes);
// Touch memory to ensure it's actually mapped and potentially resident
for (size_t i = 0; i < size_bytes; i += 4096) { // Touch pages
data[i] = 'A';
}
std::cout << "Child process (PID: " << getpid() << ") successfully allocated and touched " << size_mb << " MB." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(10)); // Keep memory alive
} catch (const std::bad_alloc& e) {
std::cerr << "Child process (PID: " << getpid() << ") failed to allocate memory: " << e.what() << std::endl;
} catch (...) {
std::cerr << "Child process (PID: " << getpid() << ") encountered unknown error during allocation." << std::endl;
}
}
int main() {
struct rlimit mem_limit;
// Set virtual memory soft limit to 100 MB, hard limit to 200 MB
mem_limit.rlim_cur = 100 * 1024 * 1024; // 100 MB
mem_limit.rlim_max = 200 * 1024 * 1024; // 200 MB
if (setrlimit(RLIMIT_AS, &mem_limit) != 0) {
std::cerr << "Error setting virtual memory limit: " << strerror(errno) << std::endl;
return 1;
}
std::cout << "Virtual memory limit set to soft: 100MB, hard: 200MB for main process." << std::endl;
pid_t pid = fork();
if (pid == -1) {
std::cerr << "Fork failed." << std::endl;
return 1;
} else if (pid == 0) { // Child process
// Child inherits the memory limits from the parent
allocate_memory(150); // Try to allocate 150 MB, which is > soft limit (100MB) but < hard limit (200MB)
// If it exceeds hard limit, malloc will return nullptr or throw bad_alloc.
exit(0);
} else { // Parent process
std::cout << "Parent process (PID: " << getpid() << ") spawned child (PID: " << pid << ")." << std::endl;
int status;
waitpid(pid, &status, 0); // Wait for child to terminate
if (WIFEXITED(status)) {
std::cout << "Child process exited with status: " << WEXITSTATUS(status) << std::endl;
} else {
std::cout << "Child process terminated abnormally." << std::endl;
}
}
// Another example: Try to allocate less than soft limit
pid_t pid2 = fork();
if (pid2 == -1) {
std::cerr << "Fork 2 failed." << std::endl;
return 1;
} else if (pid2 == 0) {
allocate_memory(50); // Allocate 50MB, should succeed
exit(0);
} else {
std::cout << "Parent process (PID: " << getpid() << ") spawned second child (PID: " << pid2 << ")." << std::endl;
int status;
waitpid(pid2, &status, 0);
std::cout << "Second child process exited with status: " << WEXITSTATUS(status) << std::endl;
}
return 0;
}
注意事项:
RLIMIT_AS限制进程的虚拟地址空间大小。RLIMIT_RSS(Resident Set Size) 限制进程可以占用的物理内存大小。这个限制在 Linux 上通常是软限制,并且实际效果可能不如RLIMIT_AS严格。- 当进程尝试分配超过
RLIMIT_AS限制的内存时,malloc或new会失败。 - 内存限制对于防止单个模型耗尽系统内存非常有效,但需要精确估算每个模型的内存需求。
4.2.3 NUMA 内存分配
在多核多路 CPU 系统中,通常会有多个 NUMA (Non-Uniform Memory Access) 节点,每个节点包含一部分 CPU 核心和与之直接相连的本地内存。访问本地内存比访问其他 NUMA 节点的内存要快得多。
C++ 标准库本身不直接提供 NUMA 感知内存分配。我们需要借助 libnuma 库 (numactl 工具集的一部分)。
#include <iostream>
#include <vector>
#include <thread>
#include <numa.h> // For NUMA functions
#include <numaif.h> // For set_mempolicy
#include <unistd.h> // For getpid
#include <algorithm>
// Function to set thread affinity and NUMA memory policy
bool set_numa_policy(int node_id, const std::vector<int>& cpu_ids) {
// 1. Set CPU affinity
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
for (int id : cpu_ids) {
CPU_SET(id, &cpuset);
}
if (pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset) != 0) {
std::cerr << "Error setting CPU affinity for node " << node_id << ": " << strerror(errno) << std::endl;
return false;
}
// 2. Set NUMA memory policy (allocate memory on specified node)
// MPOL_BIND: Only allocate on node_id. If node_id is full, allocation fails.
// MPOL_PREFERRED: Prefer node_id, but fall back to others if needed.
// MPOL_INTERLEAVE: Interleave memory allocation across specified nodes.
nodemask_t nodemask;
unsigned long mask = 0;
mask |= (1UL << node_id);
numa_bitmask_clearall(&nodemask);
numa_bitmask_setbit(&nodemask, node_id);
// Apply policy to current thread and its children
if (set_mempolicy(MPOL_BIND, nodemask.maskp, nodemask.size / sizeof(unsigned long)) != 0) {
std::cerr << "Error setting NUMA memory policy for node " << node_id << ": " << strerror(errno) << std::endl;
return false;
}
return true;
}
void model_inference_numa(int model_id, int numa_node, const std::vector<int>& cpu_ids, size_t alloc_mb) {
if (!set_numa_policy(numa_node, cpu_ids)) {
std::cerr << "Model " << model_id << " failed to set NUMA policy." << std::endl;
return;
}
std::cout << "Model " << model_id << " (PID: " << getpid()
<< ", TID: " << std::this_thread::get_id()
<< ") assigned to NUMA node " << numa_node
<< ", CPU(s): ";
for (int id : cpu_ids) {
std::cout << id << " ";
}
std::cout << std::endl;
// Allocate memory and touch it to ensure it's on the local NUMA node
size_t size_bytes = alloc_mb * 1024 * 1024;
try {
std::vector<char> data(size_bytes);
for (size_t i = 0; i < size_bytes; i += 4096) { // Touch pages
data[i] = 'X';
}
std::cout << "Model " << model_id << " successfully allocated and touched " << alloc_mb << " MB on NUMA node " << numa_node << std::endl;
for (volatile long i = 0; i < 1e9; ++i); // Simulate some work
} catch (const std::bad_alloc& e) {
std::cerr << "Model " << model_id << " failed to allocate " << alloc_mb << " MB: " << e.what() << std::endl;
}
std::cout << "Model " << model_id << " inference complete." << std::endl;
}
int main() {
if (numa_available() == -1) {
std::cerr << "NUMA support not available on this system." << std::endl;
return 1;
}
std::cout << "NUMA nodes available: " << numa_num_configured_nodes() << std::endl;
if (numa_num_configured_nodes() < 2) {
std::cerr << "Less than 2 NUMA nodes configured. NUMA example might not be effective." << std::endl;
// Proceed anyway for demonstration, but results might not show NUMA benefits clearly.
}
// Assuming NUMA node 0 has CPUs 0,1 and NUMA node 1 has CPUs 2,3
// This mapping needs to be verified on your specific hardware (e.g., using `numactl --hardware`)
std::thread model_node0(model_inference_numa, 1, 0, std::vector<int>{0, 1}, 50); // Model 1 on Node 0, CPUs 0,1, 50MB
std::thread model_node1(model_inference_numa, 2, 1, std::vector<int>{2, 3}, 50); // Model 2 on Node 1, CPUs 2,3, 50MB
model_node0.join();
model_node1.join();
std::cout << "n--- NUMA-aware inference finished ---n" << std::endl;
return 0;
}
注意事项:
- 需要安装
libnuma-dev(或类似名称) 软件包并链接-lnuma。 numactl --hardware命令可以查看系统 NUMA 拓扑和 CPU 到节点的映射关系。set_mempolicy影响后续的内存分配。为了确保已分配内存的 NUMA 属性,可能需要重新分配或通过mbind调整。- NUMA 感知优化对于高性能计算至关重要,尤其是在多路服务器上。
5. C++ 实现算力隔离:GPU 与显存
GPU 资源的隔离更为复杂,因为 GPU 本身是高度并行的设备,其设计目标是最大化吞吐量,而非严格隔离。然而,借助 CUDA/ROCm 等 API 和 NVIDIA MIG 等硬件特性,我们仍然可以实现不同程度的隔离。
本节以 NVIDIA CUDA 为例。
5.1 GPU 设备选择 (Device Selection)
最直接的 GPU 隔离方法是将不同的模型部署到不同的物理 GPU 上。如果系统有多个 GPU,每个模型可以独占一个 GPU,从而实现最强的隔离。
#include <iostream>
#include <vector>
#include <thread>
#include <cuda_runtime.h> // For CUDA API
// Helper function to check CUDA errors
void checkCuda(cudaError_t result, const char* func) {
if (result != cudaSuccess) {
std::cerr << "CUDA error at " << func << ": " << cudaGetErrorString(result) << std::endl;
exit(1);
}
}
void model_inference_gpu(int model_id, int gpu_id, size_t vram_alloc_mb) {
// Set the target GPU device for this thread
checkCuda(cudaSetDevice(gpu_id), "cudaSetDevice");
std::cout << "Model " << model_id << " (PID: " << getpid()
<< ", TID: " << std::this_thread::get_id()
<< ") assigned to GPU " << gpu_id << std::endl;
// Allocate some VRAM to simulate model loading/data
size_t vram_size_bytes = vram_alloc_mb * 1024 * 1024;
void* device_ptr = nullptr;
cudaError_t alloc_status = cudaMalloc(&device_ptr, vram_size_bytes);
if (alloc_status == cudaSuccess) {
std::cout << "Model " << model_id << " successfully allocated " << vram_alloc_mb << " MB on GPU " << gpu_id << std::endl;
// Simulate GPU computation
// In a real scenario, this would involve kernel launches, data transfers etc.
std::this_thread::sleep_for(std::chrono::seconds(5));
checkCuda(cudaFree(device_ptr), "cudaFree");
std::cout << "Model " << model_id << " freed VRAM on GPU " << gpu_id << std::endl;
} else {
std::cerr << "Model " << model_id << " failed to allocate " << vram_alloc_mb << " MB on GPU " << gpu_id
<< ": " << cudaGetErrorString(alloc_status) << std::endl;
}
std::cout << "Model " << model_id << " inference complete." << std::endl;
}
int main() {
int device_count;
checkCuda(cudaGetDeviceCount(&device_count), "cudaGetDeviceCount");
std::cout << "Total CUDA devices found: " << device_count << std::endl;
if (device_count < 2) {
std::cerr << "Less than 2 GPUs found. Multi-GPU isolation example might not be effective." << std::endl;
// Proceed anyway for demonstration
}
// Model 1 on GPU 0
std::thread model1_thread(model_inference_gpu, 1, 0, 1000); // Allocate 1GB VRAM
// Model 2 on GPU 1 (if available)
std::thread model2_thread;
if (device_count >= 2) {
model2_thread = std::thread(model_inference_gpu, 2, 1, 1500); // Allocate 1.5GB VRAM
} else {
std::cerr << "Model 2 cannot run on a separate GPU as only 1 GPU is available." << std::endl;
}
model1_thread.join();
if (model2_thread.joinable()) {
model2_thread.join();
}
std::cout << "n--- Multi-GPU device selection example finished ---n" << std::endl;
return 0;
}
注意事项:
cudaSetDevice必须在任何 CUDA API 调用之前,并且是线程本地的。- 这种方法要求系统有足够的物理 GPU。
- 虽然物理 GPU 是独立的,但它们仍然共享 PCIe 总线(如果不是通过 NVLink 等点对点连接),以及 CPU 资源。
5.2 CUDA 流 (CUDA Streams)
CUDA 流允许在单个 GPU 上并发执行多个内核函数和内存操作。它们是 GPU 内部的调度机制,提供了一种在 GPU 上重叠计算和数据传输的方式。
虽然 CUDA 流可以提高 GPU 利用率,但它们并不是严格的隔离机制。所有流仍然共享 GPU 的计算单元、显存带宽和全局显存。如果多个流同时请求大量计算或显存带宽,它们仍然会相互影响。
#include <iostream>
#include <vector>
#include <thread>
#include <cuda_runtime.h> // For CUDA API
#include <chrono>
// Dummy CUDA kernel
__global__ void dummy_kernel(float* data, int N) {
int idx = blockIdx.x * blockDim.x + threadIdx.x;
if (idx < N) {
data[idx] = data[idx] * 2.0f + 1.0f;
}
}
void model_inference_stream(int model_id, int gpu_id) {
checkCuda(cudaSetDevice(gpu_id), "cudaSetDevice");
std::cout << "Model " << model_id << " (PID: " << getpid()
<< ", TID: " << std::this_thread::get_id()
<< ") using GPU " << gpu_id << std::endl;
cudaStream_t stream;
checkCuda(cudaStreamCreate(&stream), "cudaStreamCreate");
// Allocate device memory
int N = 1 << 20; // 1M elements
size_t data_size = N * sizeof(float);
float* d_data;
checkCuda(cudaMallocAsync(&d_data, data_size, stream), "cudaMallocAsync"); // CUDA 11.2+ async malloc
// Initialize host data
std::vector<float> h_data(N, 1.0f);
// Copy data to device asynchronously
checkCuda(cudaMemcpyAsync(d_data, h_data.data(), data_size, cudaMemcpyHostToDevice, stream), "cudaMemcpyAsync");
// Launch kernel asynchronously
int blocks = (N + 255) / 256;
dummy_kernel<<<blocks, 256, 0, stream>>>(d_data, N);
// Simulate more work or another kernel
// dummy_kernel<<<blocks, 256, 0, stream>>>(d_data, N);
// Wait for stream to complete
checkCuda(cudaStreamSynchronize(stream), "cudaStreamSynchronize");
// Copy result back (optional)
// checkCuda(cudaMemcpyAsync(h_data.data(), d_data, data_size, cudaMemcpyDeviceToHost, stream), "cudaMemcpyAsync");
// checkCuda(cudaStreamSynchronize(stream), "cudaStreamSynchronize");
checkCuda(cudaFreeAsync(d_data, stream), "cudaFreeAsync"); // CUDA 11.2+ async free
checkCuda(cudaStreamDestroy(stream), "cudaStreamDestroy");
std::cout << "Model " << model_id << " inference on stream complete." << std::endl;
}
int main() {
int device_count;
checkCuda(cudaGetDeviceCount(&device_count), "cudaGetDeviceCount");
if (device_count == 0) {
std::cerr << "No CUDA devices found." << std::endl;
return 1;
}
std::cout << "Running two models concurrently on streams on GPU 0." << std::endl;
// Both models run on GPU 0, but using separate streams
std::thread model1_thread(model_inference_stream, 1, 0);
std::thread model2_thread(model_inference_stream, 2, 0);
model1_thread.join();
model2_thread.join();
std::cout << "n--- CUDA stream example finished ---n" << std::endl;
return 0;
}
5.3 CUDA 多进程服务 (Multi-Process Service – MPS)
NVIDIA MPS 是一个 CUDA 驱动程序功能,允许在同一个 GPU 上,由多个进程并发执行 CUDA 内核,而无需进行完全上下文切换。它通过一个 MPS 控制守护进程来协调来自不同进程的 CUDA API 调用,并将它们提交给 GPU。
- 优点: 允许多个 CUDA 进程共享同一个 GPU,提高 GPU 利用率,同时提供比完全上下文切换更低的开销。
- 缺点: MPS 并没有提供严格的计算和显存隔离。所有进程仍然共享 GPU 的计算单元、显存和带宽。一个进程的过度使用可能会影响其他进程。它主要解决了多进程共享 GPU 的效率问题,而非资源隔离。
- 使用方式: MPS 需要手动启动 MPS 守护进程。C++ 应用程序代码本身无需特殊修改,只需像往常一样调用 CUDA API。
# How to start MPS (typically in a server environment)
# Start MPS daemon
nvidia-cuda-mps-control -d
# Check if MPS is running
ps aux | grep mps
# Now run your C++ applications that use CUDA.
# Each application will connect to the MPS server and share the GPU.
# For example:
# ./my_model_app_1 &
# ./my_model_app_2 &
# Stop MPS daemon
echo quit | nvidia-cuda-mps-control
5.4 NVIDIA Multi-Instance GPU (MIG)
MIG 是 NVIDIA Ampere 架构及更高版本 GPU 引入的一项硬件特性,它允许将单个物理 GPU 划分为多达七个完全独立的 GPU 实例(称为 "GPU Instances" 和 "Compute Instances")。每个实例都有自己独立的显存、缓存、计算单元和内存带宽。
- 优点: 提供了硬件级别的、真正的算力隔离。每个模型实例可以在自己的 MIG 实例上运行,互不干扰,拥有可预测的性能。这对于多租户和严格 QoS 要求的场景至关重要。
- 缺点:
- 仅适用于支持 MIG 的 NVIDIA GPU(如 A100)。
- 需要预先配置 MIG 实例,这通常通过
nvidia-smi工具完成,而不是在 C++ 应用程序内部动态控制。 - C++ 应用程序需要通过
cudaSetDevice选择对应的 MIG 设备(它们会作为独立的 CUDA 设备暴露给系统)。
// Example of C++ application using MIG (after MIG instances are configured by nvidia-smi)
#include <iostream>
#include <vector>
#include <thread>
#include <cuda_runtime.h>
void checkCuda(cudaError_t result, const char* func) {
if (result != cudaSuccess) {
std::cerr << "CUDA error at " << func << ": " << cudaGetErrorString(result) << std::endl;
exit(1);
}
}
void model_inference_mig(int model_id, int mig_device_id, size_t vram_alloc_mb) {
// Each MIG instance appears as a separate CUDA device
checkCuda(cudaSetDevice(mig_device_id), "cudaSetDevice");
cudaDeviceProp prop;
checkCuda(cudaGetDeviceProperties(&prop, mig_device_id), "cudaGetDeviceProperties");
std::cout << "Model " << model_id << " (PID: " << getpid()
<< ", TID: " << std::this_thread::get_id()
<< ") assigned to MIG Device ID " << mig_device_id
<< " (Name: " << prop.name << ")" << std::endl;
size_t vram_size_bytes = vram_alloc_mb * 1024 * 1024;
void* device_ptr = nullptr;
cudaError_t alloc_status = cudaMalloc(&device_ptr, vram_size_bytes);
if (alloc_status == cudaSuccess) {
std::cout << "Model " << model_id << " successfully allocated " << vram_alloc_mb << " MB on MIG Device " << mig_device_id << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(5)); // Simulate GPU computation
checkCuda(cudaFree(device_ptr), "cudaFree");
std::cout << "Model " << model_id << " freed VRAM on MIG Device " << mig_device_id << std::endl;
} else {
std::cerr << "Model " << model_id << " failed to allocate " << vram_alloc_mb << " MB on MIG Device " << mig_device_id
<< ": " << cudaGetErrorString(alloc_status) << std::endl;
}
std::cout << "Model " << model_id << " inference complete." << std::endl;
}
int main() {
int device_count;
checkCuda(cudaGetDeviceCount(&device_count), "cudaGetDeviceCount");
std::cout << "Total CUDA devices (including MIG instances) found: " << device_count << std::endl;
if (device_count < 2) {
std::cerr << "Less than 2 CUDA devices (or MIG instances) found. MIG example might not be effective." << std::endl;
return 1;
}
// Assuming MIG instances are configured and appear as device 0 and device 1
std::thread model1_thread(model_inference_mig, 1, 0, 1000); // Model 1 on MIG device 0
std::thread model2_thread(model_inference_mig, 2, 1, 1000); // Model 2 on MIG device 1
model1_thread.join();
model2_thread.join();
std::cout << "n--- MIG-enabled inference example finished ---n" << std::endl;
return 0;
}
MIG 配置示例 (Shell 命令):
- 检查 GPU 是否支持 MIG:
nvidia-smi -q | grep MIG - 启用 MIG 模式:
sudo nvidia-smi -i 0 -mig 1(对 GPU 0) - 创建 GPU 实例和计算实例 (这需要根据你的 GPU 型号和需求来配置,例如创建一个 1g.5gb 大小的实例):
sudo nvidia-smi mig -cgi 1g.5gb -C -i 0
sudo nvidia-smi mig -cci -gi 0 -C -i 0(为 GPU 实例 0 创建计算实例) - 查看新创建的 MIG 设备:
nvidia-smi(会显示新的 MIG 设备 ID) - 运行 C++ 应用程序,通过
cudaSetDevice选择这些新的设备 ID。 - 禁用 MIG 模式:
sudo nvidia-smi -i 0 -mig 0
5.5 显存限制 (CUDA Memory Pools / cudaMallocAsync)
CUDA 11.2 引入了新的内存分配 API,如 cudaMallocAsync 和内存池 (cudaMemPool)。虽然这些不是严格的隔离机制,但它们提供了更精细的显存管理,有助于减少显存碎片,提高分配效率,并间接辅助隔离。我们可以为每个模型创建独立的内存池,或者限制其分配行为。
#include <iostream>
#include <vector>
#include <thread>
#include <cuda_runtime.h>
#include <chrono>
void checkCuda(cudaError_t result, const char* func) {
if (result != cudaSuccess) {
std::cerr << "CUDA error at " << func << ": " << cudaGetErrorString(result) << std::endl;
exit(1);
}
}
void model_inference_mem_pool(int model_id, int gpu_id, size_t max_vram_mb) {
checkCuda(cudaSetDevice(gpu_id), "cudaSetDevice");
std::cout << "Model " << model_id << " (PID: " << getpid()
<< ", TID: " << std::this_thread::get_id()
<< ") on GPU " << gpu_id << ", max VRAM: " << max_vram_mb << " MB" << std::endl;
// Create a memory pool for this model
cudaMemPool_t memPool;
cudaMemPoolProps poolProps = {};
poolProps.allocType = cudaMemPoolAllocationTypePinnedIpc;
poolProps.handleType = cudaMemPoolHandleTypeNone;
poolProps.location.type = cudaMemLocationTypeDevice;
poolProps.location.id = gpu_id;
checkCuda(cudaMemPoolCreate(&memPool, &poolProps), "cudaMemPoolCreate");
// Set a maximum memory size for this pool
size_t max_bytes = max_vram_mb * 1024 * 1024;
checkCuda(cudaMemPoolSetAttribute(memPool, cudaMemPoolAttrReleaseThreshold, &max_bytes), "cudaMemPoolSetAttribute ReleaseThreshold");
// Also consider cudaMemPoolAttrMaxSize for a hard cap.
// Associate this pool with the current stream/thread
// By default, the current stream uses the thread's current pool.
// We can explicitly set it for a stream:
cudaStream_t stream;
checkCuda(cudaStreamCreate(&stream), "cudaStreamCreate");
checkCuda(cudaStreamSetAttribute(stream, cudaStreamAttributeMemPoolCreatedEvent, (void*)&memPool), "cudaStreamSetAttribute MemPoolCreatedEvent");
// Note: cudaStreamSetAttribute(stream, cudaStreamAttributeMemPoolCreatedEvent) requires CUDA 11.6+
// For older versions, you might need to set the pool globally for the device context or use a custom allocator.
// Allocate memory from the pool using cudaMallocAsync
size_t alloc_mb_1 = max_vram_mb / 2;
size_t alloc_bytes_1 = alloc_mb_1 * 1024 * 1024;
void* ptr1 = nullptr;
cudaError_t alloc_status_1 = cudaMallocAsync(&ptr1, alloc_bytes_1, stream);
if (alloc_status_1 == cudaSuccess) {
std::cout << "Model " << model_id << " allocated " << alloc_mb_1 << " MB from pool." << std::endl;
} else {
std::cerr << "Model " << model_id << " failed to allocate " << alloc_mb_1 << " MB from pool: " << cudaGetErrorString(alloc_status_1) << std::endl;
}
size_t alloc_mb_2 = max_vram_mb / 2 + 100; // Attempt to exceed the limit slightly
size_t alloc_bytes_2 = alloc_mb_2 * 1024 * 1024;
void* ptr2 = nullptr;
cudaError_t alloc_status_2 = cudaMallocAsync(&ptr2, alloc_bytes_2, stream);
if (alloc_status_2 == cudaSuccess) {
std::cout << "Model " << model_id << " allocated " << alloc_mb_2 << " MB from pool." << std::endl;
} else {
std::cerr << "Model " << model_id << " failed to allocate " << alloc_mb_2 << " MB from pool: " << cudaGetErrorString(alloc_status_2) << std::endl;
}
std::this_thread::sleep_for(std::chrono::seconds(5)); // Simulate work
if (ptr1) checkCuda(cudaFreeAsync(ptr1, stream), "cudaFreeAsync ptr1");
if (ptr2) checkCuda(cudaFreeAsync(ptr2, stream), "cudaFreeAsync ptr2");
checkCuda(cudaStreamDestroy(stream), "cudaStreamDestroy");
checkCuda(cudaMemPoolDestroy(memPool), "cudaMemPoolDestroy");
std::cout << "Model " << model_id << " inference complete." << std::endl;
}
int main() {
int device_count;
checkCuda(cudaGetDeviceCount(&device_count), "cudaGetDeviceCount");
if (device_count == 0) {
std::cerr << "No CUDA devices found." << std::endl;
return 1;
}
std::cout << "Running two models with VRAM pool limits on GPU 0." << std::endl;
// Both models on GPU 0, but with their own memory pools and limits
std::thread model1_thread(model_inference_mem_pool, 1, 0, 1024); // Model 1: Max 1GB VRAM
std::thread model2_thread(model_inference_mem_pool, 2, 0, 1536); // Model 2: Max 1.5GB VRAM
model1_thread.join();
model2_thread.join();
std::cout << "n--- CUDA memory pool example finished ---n" << std::endl;
return 0;
}
注意事项:
cudaMallocAsync和cudaMemPool是 CUDA 11.2+ 的特性。cudaMemPoolAttrReleaseThreshold属性设置了内存池在空闲时释放回驱动程序的内存量阈值。cudaMemPoolAttrMaxSize属性可以设置内存池的最大占用量,超出此限制的分配将失败。- 内存池可以减少碎片,提高分配速度,但它们本身不提供硬性隔离。如果一个池耗尽了 GPU 的物理显存,其他池的分配仍可能失败。
6. 整合方案:多进程管理器架构
为了实现更强大的隔离性、更好的容错性和可管理性,通常会采用多进程架构,由一个主管理器进程负责调度和资源分配,而每个模型或模型组运行在独立的子进程中。
6.1 架构概述
- 管理器进程 (Manager Process):
- 负责接收外部推理请求。
- 维护所有工作进程的状态(负载、可用资源、模型类型)。
- 根据负载均衡策略将请求分发给合适的工作进程。
- 监控工作进程的健康状态。
- 在启动时为每个工作进程配置资源隔离参数(CPU 亲和性、内存限制、GPU 设备 ID 等)。
- 工作进程 (Worker Process):
- 每个工作进程负责加载一个或多个特定模型。
- 在启动时应用管理器分配的资源隔离设置。
- 接收来自管理器的推理请求,执行推理,并将结果返回。
- 处理模型加载、预处理、推理、后处理逻辑。
- 进程间通信 (IPC):
- 用于管理器与工作进程之间的请求/响应传递、状态更新。
- 常见 IPC 机制:共享内存 (Shared Memory)、消息队列 (Message Queues)、Socket (Unix Domain Socket 或 TCP/IP)。
6.2 C++ 实现骨架 (Multi-Process with Shared Memory IPC)
下面是一个简化的 C++ 骨架,演示如何使用 fork() 创建工作进程,并通过共享内存进行简单的通信。
#include <iostream>
#include <vector>
#include <thread>
#include <chrono>
#include <sys/wait.h>
#include <sys/mman.h> // For shm_open, mmap, shm_unlink
#include <fcntl.h> // For O_CREAT, O_RDWR
#include <unistd.h> // For fork, getpid, ftruncate
#include <string>
#include <cstring> // For memcpy
#include <atomic> // For std::atomic
// Structure for shared memory communication
struct SharedData {
std::atomic<int> request_id;
int model_id;
char input_data[256]; // Simplified input data
char output_result[256]; // Simplified output result
std::atomic<bool> ready_for_request;
std::atomic<bool> request_processed;
};
const char* SHM_NAME = "/multi_model_shm";
const size_t SHM_SIZE = sizeof(SharedData);
// Function to set CPU affinity (re-using from previous example)
bool set_cpu_affinity(const std::vector<int>& cpu_ids) {
// ... (same as before)
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
for (int id : cpu_ids) {
CPU_SET(id, &cpuset);
}
int result = sched_setaffinity(0, sizeof(cpu_set_t), &cpuset); // 0 for current process
if (result != 0) {
std::cerr << "Error setting process CPU affinity: " << strerror(errno) << std::endl;
return false;
}
return true;
}
// Worker process function
void worker_main(int worker_id, int model_type, SharedData* shm_data,
const std::vector<int>& cpu_ids_for_worker, int gpu_id_for_worker) {
std::cout << "Worker " << worker_id << " (PID: " << getpid()
<< ") starting for Model Type " << model_type << "." << std::endl;
// Apply CPU isolation
if (!cpu_ids_for_worker.empty()) {
if (set_cpu_affinity(cpu_ids_for_worker)) {
std::cout << "Worker " << worker_id << " CPU affinity set to: ";
for (int id : cpu_ids_for_worker) std::cout << id << " ";
std::cout << std::endl;
}
}
// Apply GPU isolation (if needed, simplified for this example)
#ifdef USE_CUDA
if (gpu_id_for_worker != -1) {
cudaError_t err = cudaSetDevice(gpu_id_for_worker);
if (err == cudaSuccess) {
std::cout << "Worker " << worker_id << " GPU set to device " << gpu_id_for_worker << std::endl;
} else {
std::cerr << "Worker " << worker_id << " failed to set GPU to device " << gpu_id_for_worker
<< ": " << cudaGetErrorString(err) << std::endl;
}
}
#endif
// Simulate model loading
std::cout << "Worker " << worker_id << " loading Model Type " << model_type << "..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::cout << "Worker " << worker_id << " Model Type " << model_type << " loaded." << std::endl;
shm_data->model_id = model_type; // Announce which model this worker handles
shm_data->ready_for_request.store(true);
while (true) {
// Wait for a request
while (shm_data->ready_for_request.load()) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
int current_request_id = shm_data->request_id.load();
std::cout << "Worker " << worker_id << " (Model " << model_type
<< ") received request " << current_request_id
<< " with input: " << shm_data->input_data << std::endl;
// Simulate inference
std::this_thread::sleep_for(std::chrono::milliseconds(current_request_id * 100)); // Longer for higher IDs
std::string result = "Processed_by_Worker" + std::to_string(worker_id) + "_Req" + std::to_string(current_request_id);
strncpy(shm_data->output_result, result.c_str(), sizeof(shm_data->output_result) - 1);
shm_data->output_result[sizeof(shm_data->output_result) - 1] = '';
std::cout << "Worker " << worker_id << " finished request " << current_request_id << std::endl;
shm_data->request_processed.store(true);
shm_data->ready_for_request.store(true); // Signal ready for next request
}
}
int main() {
// 1. Create shared memory
int shm_fd = shm_open(SHM_NAME, O_CREAT | O_RDWR, 0666);
if (shm_fd == -1) {
std::cerr << "Failed to create shared memory: " << strerror(errno) << std::endl;
return 1;
}
if (ftruncate(shm_fd, SHM_SIZE) == -1) {
std::cerr << "Failed to truncate shared memory: " << strerror(errno) << std::endl;
shm_unlink(SHM_NAME);
return 1;
}
SharedData* shm_data = static_cast<SharedData*>(mmap(NULL, SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0));
if (shm_data == MAP_FAILED) {
std::cerr << "Failed to mmap shared memory: " << strerror(errno) << std::endl;
shm_unlink(SHM_NAME);
return 1;
}
close(shm_fd); // Close the file descriptor, shared memory object remains
// Initialize shared data
shm_data->request_id.store(0);
shm_data->ready_for_request.store(false); // Initially not ready for manager to send
shm_data->request_processed.store(false);
std::cout << "Manager (PID: " << getpid() << ") started." << std::endl;
std::vector<pid_t> worker_pids;
std::vector<SharedData*> worker_shm_data_ptrs; // In a real system, each worker would have its own SHM or a more complex IPC
// For simplicity, we'll use one shared memory segment for all workers
// In a robust system, each worker might have its own dedicated SHM segment for requests/responses
// or use a more sophisticated message queue system.
// 2. Spawn worker processes
// Worker 1: Model Type 1, on CPU 0, GPU 0
pid_t pid1 = fork();
if (pid1 == 0) { // Child process 1
worker_main(1, 1, shm_data, {0}, 0);
exit(0);
} else if (pid1 > 0) {
worker_pids.push_back(pid1);
worker_shm_data_ptrs.push_back(shm_data); // Point to the same SHM for this simplified example
}
// Worker 2: Model Type 2, on CPU 1, GPU 1 (if available)
pid_t pid2 = fork();
if (pid2 == 0) { // Child process 2
worker_main(2, 2, shm_data, {1}, 1);
exit(0);
} else if (pid2 > 0) {
worker_pids.push_back(pid2);
worker_shm_data_ptrs.push_back(shm_data);
}
// 3. Manager waits for workers to be ready
std::cout << "Manager waiting for workers to be ready..." << std::endl;
for (size_t i = 0; i < worker_pids.size(); ++i) {
// In a real system, each worker would signal its readiness independently
// Here, we just wait a bit and assume they are ready for this simple example.
std::this_thread::sleep_for(std::chrono::seconds(1));
}
std::cout << "All workers assumed ready." << std::endl;
// 4. Manager sends requests
for (int i = 1; i <= 5; ++i) {
std::cout << "nManager sending request " << i << "..." << std::endl;
// Wait for worker to be ready to receive (previous request processed)
// This is a very simplified load balancing: just wait for the single SHM to be free.
// A real system would pick an available worker from a pool.
while (!shm_data->ready_for_request.load()) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
shm_data->ready_for_request.store(false); // Mark not ready for new request
shm_data->request_id.store(i);
shm_data->model_id = (i % 2 == 0) ? 2 : 1; // Alternate model types
std::string input = "Input_for_Req" + std::to_string(i);
strncpy(shm_data->input_data, input.c_str(), sizeof(shm_data->input_data) - 1);
shm_data->input_data[sizeof(shm_data->input_data) - 1] = '';
shm_data->request_processed.store(false); // Reset processed flag
std::cout << "Manager sent request " << i << " for Model Type " << shm_data->model_id << std::endl;
// Wait for worker to process
while (!shm_data->request_processed.load()) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
std::cout << "Manager received response for request " << i << ": " << shm_data->output_result << std::endl;
}
// 5. Cleanup
std::cout << "nManager sending termination signal (not implemented in this simple example)..." << std::endl;
// In a real system, manager would send a termination signal/message to workers
// and wait for them to exit gracefully.
for (pid_t pid : worker_pids) {
kill(pid, SIGTERM); // Send terminate signal
}
for (pid_t pid : worker_pids) {
waitpid(pid, NULL, 0);
}
munmap(shm_data, SHM_SIZE);
shm_unlink(SHM_NAME);
std::cout << "Manager finished and cleaned up." << std::endl;
return 0;
}
编译命令 (Linux):
g++ -std=c++17 -O2 multi_model_inference.cpp -o multi_model_inference -pthread -lrt -lnuma
如果使用了 CUDA 相关代码,还需要:
nvcc -std=c++17 -O2 multi_model_inference.cpp -o multi_model_inference -pthread -lrt -lnuma
或者分步编译:
g++ -std=c++17 -O2 -c multi_model_inference.cpp -o multi_model_inference.o -pthread -lrt -lnuma
nvcc -dlink multi_model_inference.o -o multi_model_inference_link.o
g++ -std=c++17 -O2 multi_model_inference_link.o -o multi_model_inference -pthread -lrt -lnuma -lcudart
上述代码的简化说明:
- 共享内存: 仅用一个
SharedData结构体在主进程和所有子进程之间共享。在生产环境中,这通常意味着每个工作进程有其独立的共享内存区域,或者使用更复杂的 IPC 机制(如 Unix Domain Socket、gRPC over shared memory)。 - 负载均衡: 非常简单,主进程只是顺序发送请求,并等待共享内存被释放。实际系统会维护一个可用工作进程池,并根据负载、模型类型等选择最合适的工作进程。
- 错误处理与优雅退出: 简化了,实际系统需要更健壮的错误处理和信号处理。
6.3 隔离技术对比总结
| 特性 / 技术 | CPU 隔离 | GPU 隔离 |