C++ 与 GPU 通信:通过 C++ 封装 CUDA Stream 实现异构计算的异步编排

各位好,

今天我们将深入探讨一个在高性能计算领域至关重要的话题:如何通过C++封装CUDA Stream,实现异构计算的异步编排。随着计算任务的复杂性日益增加,我们不再满足于顺序执行,而是追求极致的并行与吞吐量。GPU作为强大的并行计算单元,其潜能的充分发挥,离恰当的异步编程模式密不可分。C++作为系统级编程语言的佼佼者,为我们提供了构建高效、可维护的抽象层面的强大工具。

1. 异构计算与异步编程的必要性

在现代计算体系结构中,CPU和GPU协同工作,形成异构计算环境。CPU擅长串行逻辑控制,而GPU则以其众多的计算核心,在数据并行任务上展现出无与伦比的优势。然而,CPU与GPU之间的通信(如数据传输)以及GPU内部不同计算任务的执行,都存在固有的延迟。如果采用纯粹的同步模式,即每一步操作都等待前一步完成,那么这些延迟将严重制约整体性能,导致GPU利用率低下。

异步编程的核心思想是,当一个耗时操作(如数据传输或核函数执行)被提交后,CPU不必等待其完成,而是可以立即执行后续操作。这样,多个操作可以在时间上重叠,从而:

  1. 隐藏延迟 (Latency Hiding):例如,在GPU执行计算时,CPU可以准备下一批数据,或者将前一批计算结果从GPU传输回主机。
  2. 提高资源利用率 (Resource Utilization):GPU的多个计算单元可以同时执行不同的任务,或者在一个任务的计算过程中,利用空闲的内存带宽进行数据传输。
  3. 提升吞吐量 (Throughput):通过最大化并行和重叠,单位时间内完成的工作量显著增加。

CUDA Stream正是NVIDIA GPU上实现异步行为的基石。

2. CUDA Stream 基础

CUDA Stream是GPU上操作的序列。CUDA编程模型中的所有操作,无论是内存传输(cudaMemcpy),核函数启动(<<<...>>>),还是事件操作(cudaEventRecord),都发生在一个特定的Stream中。

2.1 默认 Stream (Stream 0)

在没有明确指定Stream的情况下,所有的CUDA操作都默认在NULL Stream (或 Stream 0) 上执行。Stream 0有一个特殊行为:它隐式地与设备上的所有其他Streams同步。这意味着,当一个操作在Stream 0上完成时,设备上所有其他非默认Stream上已经提交的操作也必须完成。反之,当一个非默认Stream上的操作完成时,Stream 0上的后续操作可以开始执行。这种行为简化了编程,但也限制了并行性。为了实现真正的异步和重叠,我们必须使用非默认Stream

2.2 非默认 Stream

非默认Stream是独立的操作序列。在不同的非默认Stream中提交的操作,只要硬件资源允许,可以并发执行。这正是实现异步编排的关键。

表 1: CUDA Stream 相关核心 API 函数

函数名称 描述
cudaStreamCreate 创建一个新的CUDA Stream。
cudaStreamDestroy 销毁一个CUDA Stream,释放相关资源。
cudaMemcpyAsync 异步地进行内存拷贝,不阻塞CPU。需要指定目标Stream。
kernel<<<grid, block, shmem, stream>>> 异步地启动一个核函数,不阻塞CPU。需要指定目标Stream。
cudaStreamSynchronize 阻塞CPU,直到指定Stream中的所有操作完成。
cudaDeviceSynchronize 阻塞CPU,直到设备上所有Stream中的所有操作都完成。
cudaEventCreate 创建一个CUDA事件。
cudaEventDestroy 销毁一个CUDA事件。
cudaEventRecord 在指定Stream中记录一个事件。当此Stream中的所有先前操作完成时,事件被记录。
cudaStreamWaitEvent 使一个Stream等待另一个Stream中记录的事件完成。
cudaEventSynchronize 阻塞CPU,直到指定事件被记录。
cudaEventElapsedTime 计算两个事件之间的时间差。

3. 同步编程模式的局限性

让我们通过一个简单的向量加法示例来体会同步编程的局限性。

#include <iostream>
#include <vector>
#include <chrono>

// CUDA核函数:向量加法
__global__ void addVectors(float* a, float* b, float* c, int N) {
    int idx = blockIdx.x * blockDim.x + threadIdx.x;
    if (idx < N) {
        c[idx] = a[idx] + b[idx];
    }
}

// 错误检查宏
#define CUDA_CHECK(call) 
    do { 
        cudaError_t err = call; 
        if (err != cudaSuccess) { 
            std::cerr << "CUDA Error at " << __FILE__ << ":" << __LINE__ 
                      << " - " << cudaGetErrorString(err) << std::endl; 
            exit(EXIT_FAILURE); 
        } 
    } while (0)

int main() {
    const int N = 1 << 20; // 1M elements
    size_t size = N * sizeof(float);

    // 1. 主机内存分配与初始化
    std::vector<float> h_a(N), h_b(N), h_c(N);
    for (int i = 0; i < N; ++i) {
        h_a[i] = static_cast<float>(i);
        h_b[i] = static_cast<float>(i * 2);
    }

    // 2. 设备内存分配
    float *d_a, *d_b, *d_c;
    CUDA_CHECK(cudaMalloc(&d_a, size));
    CUDA_CHECK(cudaMalloc(&d_b, size));
    CUDA_CHECK(cudaMalloc(&d_c, size));

    auto start_time = std::chrono::high_resolution_clock::now();

    // 3. 数据从主机传输到设备 (同步)
    CUDA_CHECK(cudaMemcpy(d_a, h_a.data(), size, cudaMemcpyHostToDevice));
    CUDA_CHECK(cudaMemcpy(d_b, h_b.data(), size, cudaMemcpyHostToDevice));

    // 4. 启动核函数 (同步)
    int blockSize = 256;
    int gridSize = (N + blockSize - 1) / blockSize;
    addVectors<<<gridSize, blockSize>>>(d_a, d_b, d_c, N);
    CUDA_CHECK(cudaGetLastError()); // 检查核函数启动错误

    // 5. 等待核函数完成 (隐式同步:通常在下一个cudaMemcpyHostToDevice或cudaMemcpyDeviceToHost前)
    // 或者显式同步,例如:
    CUDA_CHECK(cudaDeviceSynchronize()); // 强制等待所有GPU任务完成

    // 6. 数据从设备传输回主机 (同步)
    CUDA_CHECK(cudaMemcpy(h_c.data(), d_c, size, cudaMemcpyDeviceToHost));

    auto end_time = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> elapsed_time = end_time - start_time;
    std::cout << "Synchronous execution time: " << elapsed_time.count() << " seconds" << std::endl;

    // 7. 验证结果 (可选)
    // ...

    // 8. 释放设备内存
    CUDA_CHECK(cudaFree(d_a));
    CUDA_CHECK(cudaFree(d_b));
    CUDA_CHECK(cudaFree(d_c));

    return 0;
}

在这个同步例子中:

  • cudaMemcpy 阻塞CPU,直到数据传输完成。
  • 核函数启动后,虽然是异步的,但紧随其后的 cudaDeviceSynchronize 会强制CPU等待核函数完成。
  • 从设备传输回主机的数据传输也阻塞CPU。

这意味着CPU在大部分时间都在等待GPU完成任务,无法利用这段时间做其他有意义的工作。

4. C++ 封装 CUDA Stream:构建抽象层

为了更好地管理CUDA Stream和Event,并遵循C++的RAII(Resource Acquisition Is Initialization)原则,我们可以创建C++类来封装底层的CUDA API。这不仅能简化代码,提高可读性,还能自动处理资源的创建和销毁,降低内存泄漏的风险。

我们将封装 cudaStream_tcudaEvent_t,并提供方便的接口。为了更好地管理设备内存,我们甚至可以封装 cudaMalloccudaFree

4.1 CudaError 类 (可选,但推荐)

一个简单的错误处理类,用于封装CUDA错误。

#include <stdexcept>
#include <string>
#include <cuda_runtime.h> // For cudaError_t and cudaGetErrorString

class CudaError : public std::runtime_error {
public:
    explicit CudaError(cudaError_t err, const std::string& msg = "")
        : std::runtime_error(msg + (msg.empty() ? "" : ": ") + cudaGetErrorString(err)),
          error_code_(err) {}

    cudaError_t get_error_code() const { return error_code_; }

private:
    cudaError_t error_code_;
};

// 辅助函数,用于检查CUDA API调用结果
inline void checkCudaError(cudaError_t err, const std::string& msg = "") {
    if (err != cudaSuccess) {
        throw CudaError(err, msg);
    }
}

4.2 CudaStream 类

这个类将封装 cudaStream_t 句柄,并在构造时创建Stream,在析构时销毁Stream。

#include <cuda_runtime.h>
#include <string>
#include <iostream>

// Forward declaration for CudaEvent
class CudaEvent;

class CudaStream {
public:
    // 构造函数:创建CUDA Stream
    explicit CudaStream(unsigned int flags = cudaStreamDefault) {
        checkCudaError(cudaStreamCreateWithFlags(&stream_, flags), "Failed to create CUDA stream");
        // std::cout << "CudaStream created: " << stream_ << std::endl; // For debugging
    }

    // 析构函数:销毁CUDA Stream
    ~CudaStream() {
        if (stream_) {
            // 注意:cudaStreamDestroy是异步的,可能在销毁前还有任务未完成。
            // 生产环境中可能需要先调用cudaStreamSynchronize()
            // 或者确保在所有使用此Stream的任务完成后才销毁
            cudaError_t err = cudaStreamDestroy(stream_);
            if (err != cudaSuccess) {
                std::cerr << "Warning: Failed to destroy CUDA stream " << stream_
                          << ": " << cudaGetErrorString(err) << std::endl;
            }
            // std::cout << "CudaStream destroyed: " << stream_ << std::endl; // For debugging
        }
    }

    // 禁用拷贝构造和拷贝赋值,避免双重释放
    CudaStream(const CudaStream&) = delete;
    CudaStream& operator=(const CudaStream&) = delete;

    // 移动构造和移动赋值
    CudaStream(CudaStream&& other) noexcept : stream_(other.stream_) {
        other.stream_ = nullptr;
    }
    CudaStream& operator=(CudaStream&& other) noexcept {
        if (this != &other) {
            if (stream_) {
                // 销毁当前Stream
                cudaError_t err = cudaStreamDestroy(stream_);
                if (err != cudaSuccess) {
                    std::cerr << "Warning: Failed to destroy old CUDA stream " << stream_
                              << " during move assignment: " << cudaGetErrorString(err) << std::endl;
                }
            }
            stream_ = other.stream_;
            other.stream_ = nullptr;
        }
        return *this;
    }

    // 获取底层的cudaStream_t句柄
    operator cudaStream_t() const {
        return stream_;
    }

    // 显式同步此Stream
    void synchronize() const {
        checkCudaError(cudaStreamSynchronize(stream_), "Failed to synchronize CUDA stream");
    }

    // 异步内存拷贝 (Host to Device)
    template<typename T>
    void memcpyHtoDAsync(T* dst, const T* src, size_t count) const {
        checkCudaError(cudaMemcpyAsync(dst, src, count * sizeof(T), cudaMemcpyHostToDevice, stream_),
                       "Failed to async memcpy HtoD");
    }

    // 异步内存拷贝 (Device to Host)
    template<typename T>
    void memcpyDtoHAsync(T* dst, const T* src, size_t count) const {
        checkCudaError(cudaMemcpyAsync(dst, src, count * sizeof(T), cudaMemcpyDeviceToHost, stream_),
                       "Failed to async memcpy DtoH");
    }

    // 异步内存拷贝 (Device to Device)
    template<typename T>
    void memcpyDtoDAsync(T* dst, const T* src, size_t count) const {
        checkCudaError(cudaMemcpyAsync(dst, src, count * sizeof(T), cudaMemcpyDeviceToDevice, stream_),
                       "Failed to async memcpy DtoD");
    }

    // 核函数启动封装 (简化版,实际可能需要更复杂的模板参数推导)
    // 这里的核函数需要接受cudaStream_t作为最后一个参数
    template<typename... Args>
    void launchKernel(void (*kernel_func)(Args...), dim3 grid, dim3 block, size_t shmem_bytes, Args... args) const {
        kernel_func<<<grid, block, shmem_bytes, stream_>>>(args...);
        checkCudaError(cudaGetLastError(), "Failed to launch kernel");
    }

    // 记录事件
    void recordEvent(CudaEvent& event) const; // 定义在CudaEvent之后

    // 等待事件
    void waitEvent(const CudaEvent& event) const; // 定义在CudaEvent之后

private:
    cudaStream_t stream_;
};

4.3 CudaEvent 类

这个类将封装 cudaEvent_t 句柄,用于Stream之间的同步和时间测量。

#include <cuda_runtime.h>
#include <string>
#include <iostream>

// Forward declaration for CudaStream
class CudaStream;

class CudaEvent {
public:
    // 构造函数:创建CUDA Event
    explicit CudaEvent(unsigned int flags = cudaEventDefault) {
        checkCudaError(cudaEventCreateWithFlags(&event_, flags), "Failed to create CUDA event");
        // std::cout << "CudaEvent created: " << event_ << std::endl; // For debugging
    }

    // 析构函数:销毁CUDA Event
    ~CudaEvent() {
        if (event_) {
            cudaError_t err = cudaEventDestroy(event_);
            if (err != cudaSuccess) {
                std::cerr << "Warning: Failed to destroy CUDA event " << event_
                          << ": " << cudaGetErrorString(err) << std::endl;
            }
            // std::cout << "CudaEvent destroyed: " << event_ << std::endl; // For debugging
        }
    }

    // 禁用拷贝构造和拷贝赋值
    CudaEvent(const CudaEvent&) = delete;
    CudaEvent& operator=(const CudaEvent&) = delete;

    // 移动构造和移动赋值
    CudaEvent(CudaEvent&& other) noexcept : event_(other.event_) {
        other.event_ = nullptr;
    }
    CudaEvent& operator=(CudaEvent&& other) noexcept {
        if (this != &other) {
            if (event_) {
                // 销毁当前Event
                cudaError_t err = cudaEventDestroy(event_);
                if (err != cudaSuccess) {
                    std::cerr << "Warning: Failed to destroy old CUDA event " << event_
                              << " during move assignment: " << cudaGetErrorString(err) << std::endl;
                }
            }
            event_ = other.event_;
            other.event_ = nullptr;
        }
        return *this;
    }

    // 获取底层的cudaEvent_t句柄
    operator cudaEvent_t() const {
        return event_;
    }

    // 记录事件到指定的Stream
    void record(const CudaStream& stream) {
        checkCudaError(cudaEventRecord(event_, stream), "Failed to record event");
    }

    // 阻塞CPU直到事件完成
    void synchronize() const {
        checkCudaError(cudaEventSynchronize(event_), "Failed to synchronize event");
    }

    // 检查事件是否完成 (非阻塞)
    bool query() const {
        cudaError_t err = cudaEventQuery(event_);
        if (err == cudaSuccess) return true;
        if (err == cudaErrorNotReady) return false;
        throw CudaError(err, "Failed to query event");
    }

    // 计算两个事件之间的时间差 (ms)
    float elapsedTime(const CudaEvent& start_event) const {
        float ms;
        checkCudaError(cudaEventElapsedTime(&ms, start_event.event_, event_), "Failed to calculate elapsed time");
        return ms;
    }

private:
    cudaEvent_t event_;
};

// CudaStream的成员函数定义 (需要CudaEvent完整定义后才能定义)
inline void CudaStream::recordEvent(CudaEvent& event) const {
    event.record(*this);
}

inline void CudaStream::waitEvent(const CudaEvent& event) const {
    checkCudaError(cudaStreamWaitEvent(stream_, event, 0), "Failed to wait for event");
}

4.4 CudaMemory 类 (可选,但非常推荐)

为了完整的RAII体验,封装设备内存的分配和释放。

#include <cuda_runtime.h>
#include <cstddef> // For size_t
#include <memory>  // For std::unique_ptr

template<typename T>
class CudaMemory {
public:
    // 构造函数:分配设备内存
    explicit CudaMemory(size_t count) : count_(count) {
        checkCudaError(cudaMalloc(&ptr_, count * sizeof(T)), "Failed to allocate device memory");
        // std::cout << "CudaMemory allocated: " << ptr_ << std::endl; // For debugging
    }

    // 析构函数:释放设备内存
    ~CudaMemory() {
        if (ptr_) {
            cudaError_t err = cudaFree(ptr_);
            if (err != cudaSuccess) {
                std::cerr << "Warning: Failed to free device memory " << ptr_
                          << ": " << cudaGetErrorString(err) << std::endl;
            }
            // std::cout << "CudaMemory freed: " << ptr_ << std::endl; // For debugging
        }
    }

    // 禁用拷贝构造和拷贝赋值
    CudaMemory(const CudaMemory&) = delete;
    CudaMemory& operator=(const CudaMemory&) = delete;

    // 移动构造和移动赋值
    CudaMemory(CudaMemory&& other) noexcept
        : ptr_(other.ptr_), count_(other.count_) {
        other.ptr_ = nullptr;
        other.count_ = 0;
    }

    CudaMemory& operator=(CudaMemory&& other) noexcept {
        if (this != &other) {
            if (ptr_) {
                // 销毁当前内存
                cudaError_t err = cudaFree(ptr_);
                if (err != cudaSuccess) {
                    std::cerr << "Warning: Failed to free old device memory " << ptr_
                              << " during move assignment: " << cudaGetErrorString(err) << std::endl;
                }
            }
            ptr_ = other.ptr_;
            count_ = other.count_;
            other.ptr_ = nullptr;
            other.count_ = 0;
        }
        return *this;
    }

    // 获取指向设备内存的指针
    T* get() const {
        return ptr_;
    }

    // 隐式转换为T*,方便直接传递给CUDA函数
    operator T*() const {
        return ptr_;
    }

    // 获取元素数量
    size_t size() const {
        return count_;
    }

    // 获取总字节数
    size_t byteSize() const {
        return count_ * sizeof(T);
    }

private:
    T* ptr_;
    size_t count_;
};

通过这些C++封装,我们的CUDA代码将变得更加简洁、安全,并且符合C++编程习惯。

5. 基于Stream的异步编排:流水线化计算

现在,我们利用封装好的C++类,来实现一个更高效的异步向量加法。我们将数据分成多个块,并采用流水线(pipeline)的方式进行处理:

  1. 将数据块A从主机传输到设备。
  2. 在GPU上处理数据块A。
  3. 将数据块A的结果从设备传输回主机。

这三个步骤可以通过不同的Stream并行执行,形成一个三阶段的流水线。

核函数 (与之前相同):

__global__ void addVectors(float* a, float* b, float* c, int N) {
    int idx = blockIdx.x * blockDim.x + threadIdx.x;
    if (idx < N) {
        c[idx] = a[idx] + b[idx];
    }
}

主程序逻辑:

#include <iostream>
#include <vector>
#include <chrono>
#include <numeric> // For std::iota

// 包含我们之前定义的C++封装类
// #include "CudaError.h"
// #include "CudaStream.h"
// #include "CudaEvent.h"
// #include "CudaMemory.h"
// 为了演示方便,这里假设它们都在同一个文件或已正确包含

int main() {
    const int TOTAL_N = 1 << 22; // 4M elements total
    const int CHUNK_SIZE = 1 << 20; // 1M elements per chunk
    const int NUM_CHUNKS = TOTAL_N / CHUNK_SIZE;

    if (TOTAL_N % CHUNK_SIZE != 0) {
        std::cerr << "Error: TOTAL_N must be divisible by CHUNK_SIZE." << std::endl;
        return 1;
    }

    size_t chunk_byte_size = CHUNK_SIZE * sizeof(float);

    // 1. 主机内存分配与初始化
    std::vector<float> h_a_total(TOTAL_N), h_b_total(TOTAL_N), h_c_total(TOTAL_N);
    for (int i = 0; i < TOTAL_N; ++i) {
        h_a_total[i] = static_cast<float>(i);
        h_b_total[i] = static_cast<float>(i * 2);
    }

    // 2. 创建CudaStream对象
    // 我们将使用3个Stream来形成流水线:
    // stream_h2d: 负责主机到设备的传输
    // stream_kernel: 负责核函数计算
    // stream_d2h: 负责设备到主机的传输
    CudaStream stream_h2d;
    CudaStream stream_kernel;
    CudaStream stream_d2h;

    // 3. 创建CudaEvent对象
    // 事件用于Stream之间的同步,确保操作的依赖关系
    // h2d_done[i]: 标记第i个chunk的h2d传输完成
    // kernel_done[i]: 标记第i个chunk的核函数计算完成
    std::vector<CudaEvent> h2d_done(NUM_CHUNKS);
    std::vector<CudaEvent> kernel_done(NUM_CHUNKS);

    // 4. 分配设备内存
    // 为每个chunk分配独立的设备内存区域,或者使用3个区域轮流使用
    // 这里我们为简化起见,使用3个设备内存区域进行循环使用,以适应流水线
    const int NUM_DEVICE_BUFFERS = 3; // 对应 h2d, kernel, d2h 阶段
    std::vector<CudaMemory<float>> d_a_buffers;
    std::vector<CudaMemory<float>> d_b_buffers;
    std::vector<CudaMemory<float>> d_c_buffers;
    for (int i = 0; i < NUM_DEVICE_BUFFERS; ++i) {
        d_a_buffers.emplace_back(CHUNK_SIZE);
        d_b_buffers.emplace_back(CHUNK_SIZE);
        d_c_buffers.emplace_back(CHUNK_SIZE);
    }

    auto start_time = std::chrono::high_resolution_clock::now();

    // 5. 流水线循环
    int blockSize = 256;
    int gridSize = (CHUNK_SIZE + blockSize - 1) / blockSize;

    for (int i = 0; i < NUM_CHUNKS + NUM_DEVICE_BUFFERS - 1; ++i) {
        int current_input_chunk = i;
        int current_process_chunk = i - 1;
        int current_output_chunk = i - 2;

        // 计算当前循环使用的设备缓冲区索引
        int buffer_idx = i % NUM_DEVICE_BUFFERS;

        // 阶段 1: 将数据传输到设备 (HtoD)
        if (current_input_chunk < NUM_CHUNKS) {
            // 等待此缓冲区之前的D2H传输完成(如果缓冲区被重复使用)
            if (current_input_chunk >= NUM_DEVICE_BUFFERS) {
                stream_h2d.waitEvent(kernel_done[current_input_chunk - NUM_DEVICE_BUFFERS]);
            }

            const float* h_a_ptr = h_a_total.data() + current_input_chunk * CHUNK_SIZE;
            const float* h_b_ptr = h_b_total.data() + current_input_chunk * CHUNK_SIZE;

            stream_h2d.memcpyHtoDAsync(d_a_buffers[buffer_idx].get(), h_a_ptr, CHUNK_SIZE);
            stream_h2d.memcpyHtoDAsync(d_b_buffers[buffer_idx].get(), h_b_ptr, CHUNK_SIZE);
            stream_h2d.recordEvent(h2d_done[current_input_chunk]); // 标记H2D完成
        }

        // 阶段 2: 核函数计算 (Kernel)
        if (current_process_chunk >= 0 && current_process_chunk < NUM_CHUNKS) {
            stream_kernel.waitEvent(h2d_done[current_process_chunk]); // 等待H2D传输完成

            stream_kernel.launchKernel(addVectors, dim3(gridSize), dim3(blockSize), 0,
                                       d_a_buffers[buffer_idx].get(),
                                       d_b_buffers[buffer_idx].get(),
                                       d_c_buffers[buffer_idx].get(),
                                       CHUNK_SIZE);
            stream_kernel.recordEvent(kernel_done[current_process_chunk]); // 标记核函数计算完成
        }

        // 阶段 3: 将结果传输回主机 (DtoH)
        if (current_output_chunk >= 0 && current_output_chunk < NUM_CHUNKS) {
            stream_d2h.waitEvent(kernel_done[current_output_chunk]); // 等待核函数计算完成

            float* h_c_ptr = h_c_total.data() + current_output_chunk * CHUNK_SIZE;
            stream_d2h.memcpyDtoHAsync(h_c_ptr, d_c_buffers[buffer_idx].get(), CHUNK_SIZE);
            // 这里不需要记录事件,因为这是流水线的最后一步,直接等待所有任务完成即可
        }
    }

    // 等待所有Stream中的任务完成
    stream_h2d.synchronize();
    stream_kernel.synchronize();
    stream_d2h.synchronize();
    // 也可以直接 cudaDeviceSynchronize();

    auto end_time = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> elapsed_time = end_time - start_time;
    std::cout << "Asynchronous pipelined execution time: " << elapsed_time.count() << " seconds" << std::endl;

    // 验证结果 (可选)
    // for (int i = 0; i < TOTAL_N; ++i) {
    //     if (h_c_total[i] != h_a_total[i] + h_b_total[i]) {
    //         std::cerr << "Mismatch at index " << i << ": "
    //                   << h_c_total[i] << " != " << h_a_total[i] + h_b_total[i] << std::endl;
    //         return 1;
    //     }
    // }
    // std::cout << "Result verification successful." << std::endl;

    return 0;
}

流水线工作原理分析:

这个示例构建了一个三阶段的流水线:

  • 阶段1 (HtoD): stream_h2d 负责将新的数据块从主机内存传输到设备内存。
  • 阶段2 (Kernel): stream_kernel 负责在GPU上执行核函数。它会等待相应数据块的HtoD传输完成事件 (h2d_done[i]) 后才开始计算。
  • 阶段3 (DtoH): stream_d2h 负责将已处理的数据块结果从设备内存传输回主机内存。它会等待相应数据块的核函数计算完成事件 (kernel_done[i]) 后才开始传输。

表 2: 流水线各阶段操作及依赖关系

迭代 i stream_h2d (当前输入 i) stream_kernel (当前处理 i-1) stream_d2h (当前输出 i-2)
0 HtoD (chunk 0)
1 HtoD (chunk 1) Kernel (chunk 0)
2 HtoD (chunk 2) Kernel (chunk 1) DtoH (chunk 0)
N-1 HtoD (chunk N-1) Kernel (chunk N-2) DtoH (chunk N-3)
N Kernel (chunk N-1) DtoH (chunk N-2)
N+1 DtoH (chunk N-1)

在循环的每一次迭代中,我们尝试提交三个不同阶段的任务。通过 waitEvent 机制,我们确保了数据依赖的正确性,同时允许不同Stream上的独立操作并发执行。例如,当 stream_kernel 正在处理 chunk 0 时,stream_h2d 已经可以开始传输 chunk 1 的数据了,甚至 stream_d2h 也可以开始传输 chunk -1(如果存在的话)的结果。这种重叠极大地提高了GPU的利用率。

需要注意的是,设备缓冲区 (d_a_buffers, d_b_buffers, d_c_buffers) 的数量应至少为流水线阶段数,这里是3。它们被循环使用,因此在传输新数据到某个缓冲区之前,必须确保该缓冲区上之前的计算或传输已经完成,这正是 stream_h2d.waitEvent(kernel_done[current_input_chunk - NUM_DEVICE_BUFFERS]); 的作用。

6. 进阶考量

6.1 Stream优先级

CUDA允许为Stream设置优先级,这在多任务、资源竞争的场景下非常有用。例如,你可以让一个对延迟敏感的Stream拥有更高的优先级。

int leastPriority, greatestPriority;
checkCudaError(cudaDeviceGetStreamPriorityRange(&leastPriority, &greatestPriority));

CudaStream highPriorityStream(cudaStreamNonBlocking, greatestPriority);
CudaStream lowPriorityStream(cudaStreamNonBlocking, leastPriority);

这里的 CudaStream 构造函数需要扩展以支持优先级参数。

6.2 CUDA Host Function 回调

cudaLaunchHostFn 允许你在GPU Stream中的某个点插入一个主机函数回调。这使得CPU可以在GPU Stream的特定操作完成后,在不阻塞整个Stream的情况下执行一些操作。这对于实现更复杂的CPU-GPU同步逻辑,或者在GPU完成部分任务后立即进行CPU端的数据处理非常有用。

void CUDART_CB hostCallback(void* userData) {
    std::cout << "Host function callback executed after GPU ops in stream." << std::endl;
    int* data = static_cast<int*>(userData);
    // ... do some CPU work with data ...
    delete data; // Free allocated data if necessary
}

// In main or a CudaStream method:
// int* callback_data = new int(123);
// stream_kernel.recordHostFn(hostCallback, callback_data);

CudaStream 类可以添加一个 recordHostFn 方法来封装 cudaLaunchHostFn

6.3 Unified Memory 与 Stream

CUDA Unified Memory (UM) 简化了CPU和GPU之间的数据管理。当结合Stream使用时,UM可以提供更灵活的异步数据访问模式。通过 cudaMemPrefetchAsynccudaStreamAttachMemAsync,你可以指示运行时在哪个Stream中预取数据或管理内存访问,进一步优化性能。

6.4 多 GPU 编排

如果系统中有多个GPU,你可以为每个GPU创建独立的Stream和Context,并通过 cudaSetDevice 在不同的设备之间切换,提交任务。对于需要跨GPU通信的场景,NVIDIA的NCCL库提供了优化的集体通信原语,而CUDA的Peer-to-Peer (P2P) 访问则允许GPU直接访问彼此的内存。

6.5 错误处理和调试

在异步编程中,错误可能不易发现。cudaGetLastError() 只能检查最近一次核函数启动或异步操作的错误。对于长时间运行的异步Stream,最好在关键点使用 cudaStreamSynchronize()cudaEventSynchronize() 来强制同步并检查错误。

Profiling 是异步编程中不可或缺的一环。使用NVIDIA Nsight Systems或旧版NVPROF等工具,可以可视化GPU上的Stream活动、核函数执行、数据传输,以及它们之间的重叠情况。这能帮助你验证异步行为是否按预期工作,并找出性能瓶颈。

7. C++ 封装与工程实践

通过C++封装CUDA Stream和Event,我们获得了以下显著优势:

  • RAII (Resource Acquisition Is Initialization):Stream和Event的生命周期与C++对象的生命周期绑定,自动管理资源的创建和销毁,有效避免资源泄漏。
  • 类型安全和抽象:封装隐藏了底层的 cudaStream_tcudaEvent_t 句柄,提供了更高级别的、类型安全的接口,降低了误用的可能性。
  • 代码可读性与维护性:清晰的类和方法命名使得代码意图更明确,易于理解和维护。
  • 错误处理集中化:通过 CudaError 类和 checkCudaError 辅助函数,可以集中处理CUDA API调用可能产生的错误,提高代码的健壮性。
  • 灵活性:C++模板和多态性可以进一步扩展这些封装,例如,为不同数据类型或更复杂的核函数签名提供通用的启动方法。

在实际工程中,除了Stream和Event,还可以考虑封装设备内存、纹理内存、常数内存等,构建一套完整的CUDA C++工具集。然而,过度封装也可能引入不必要的复杂性或性能开销,关键在于找到抽象级别和性能之间的最佳平衡点。

通过C++对CUDA Stream的巧妙封装,我们能够以更加现代、安全且高效的方式,驾驭GPU的强大并行能力,实现复杂异构计算任务的异步编排。这不仅提升了代码的质量,也为高性能计算应用的开发带来了前所未有的灵活性和生产力。记住,异步编程的艺术在于理解任务依赖、合理划分工作负载,并利用Stream和Event进行精细协调,最终的目标是最大化硬件资源的利用率,从而达到卓越的性能表现。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注