C++ 异构任务图(Task Graph):在 C++ 调度器中构建任务依赖链路以实现硬件单元的重叠执行(Overlapping)

C++ 异构任务图:构建调度链路以实现硬件单元重叠执行

在现代高性能计算领域,单一的处理器架构已无法满足日益增长的计算需求。从通用处理器(CPU)到图形处理器(GPU)、现场可编程门阵列(FPGA)、数字信号处理器(DSP)乃至各类定制加速器,异构计算系统已成为主流。这些不同的硬件单元各有所长,擅长处理特定类型的工作负载。然而,如何高效地协调这些异构资源,使它们能够协同工作并最大限度地发挥其潜力,是系统设计者面临的核心挑战。其中一个关键策略便是通过任务图(Task Graph)来构建任务依赖链路,从而实现硬件单元的重叠执行(Overlapping),有效隐藏延迟,提升系统吞吐量和资源利用率。

本讲座将深入探讨如何在 C++ 调度器中设计和实现异构任务图,以实现硬件单元之间的无缝协作和重叠执行。我们将从任务图的基本概念出发,逐步构建任务、依赖关系、调度器和执行机制,并着重讨论如何利用异步编程和硬件特性来实现真正的重叠。

1. 异构计算的挑战与重叠执行的必然性

1.1 异构计算的背景

异构计算系统由多种不同类型的处理器组成,每种处理器针对特定的计算模式进行了优化。

  • CPU (Central Processing Unit):通用性强,擅长串行逻辑控制、复杂分支预测和低延迟单线程操作。
  • GPU (Graphics Processing Unit):拥有数千个小核心,擅长大规模并行数据处理,尤其适用于矩阵运算、图像处理等计算密集型任务。
  • FPGA (Field-Programmable Gate Array):提供硬件级别的可编程性,允许用户定制电路逻辑,实现极低延迟和高吞吐量的特定算法。
  • DSP (Digital Signal Processor):专为数字信号处理优化,在音频、视频编解码等领域表现优异。
  • ASIC (Application-Specific Integrated Circuit):为特定应用定制的硬件,性能和能效最高,但缺乏灵活性。

将这些单元组合起来,可以实现“取长补短”,例如,CPU 处理控制流和数据预处理,GPU 执行大规模并行计算,FPGA 处理实时流数据。

1.2 为什么需要重叠执行?

异构系统虽然强大,但也带来了数据传输和同步的开销。例如,将数据从 CPU 主内存传输到 GPU 显存,或等待 GPU 计算完成后再传回 CPU。这些操作通常是串行执行的,会引入显著的延迟,导致硬件单元空闲,系统整体性能下降。

重叠执行的核心思想是:当一个硬件单元(如 CPU)在处理某个任务时,另一个硬件单元(如 GPU)可以同时处理另一个不相关的任务,或者在数据传输的同时进行计算。其主要优势包括:

  • 延迟隐藏 (Latency Hiding):将数据传输延迟、设备初始化延迟等隐藏在计算任务之后,或将计算延迟隐藏在数据传输之后。
  • 吞吐量最大化 (Throughput Maximization):多个硬件单元并行工作,单位时间内完成更多任务。
  • 资源利用率提升 (Resource Utilization Improvement):减少硬件单元的空闲时间,充分利用宝贵的计算资源。

例如,在图像处理管线中,当 CPU 正在处理图像 A 的结果时,GPU 可以同时处理图像 B 的卷积操作,而数据传输单元可以将图像 C 的数据异步传输到 GPU。这种多层面的并行与重叠是实现极致性能的关键。

1.3 任务图作为解决方案

为了实现复杂的重叠执行,我们需要一种机制来明确表达任务之间的依赖关系、它们应在哪个设备上执行以及数据如何流动。任务图(Task Graph)正是这样一种强大的抽象。

任务图是一个有向无环图(DAG),其中:

  • 节点(Nodes):代表独立的计算或数据传输任务。
  • 边(Edges):代表任务之间的依赖关系。如果任务 B 依赖于任务 A,则从 A 到 B 有一条有向边,表示 A 必须在 B 之前完成。

通过任务图,调度器可以:

  1. 显式管理依赖:确保任务按照正确的顺序执行。
  2. 自动发现并行性:没有依赖关系的节点可以并行执行。
  3. 灵活分配资源:根据任务类型将其分配给最合适的硬件单元。
  4. 支持异步操作:利用依赖关系在不阻碍其他任务的情况下启动异步操作。

接下来,我们将深入探讨如何构建这样的任务图和调度器。

2. 任务图基础:节点与边的定义

2.1 任务(Task)的抽象与分类

在异构任务图中,每一个可独立执行的工作单元都被抽象为一个 Task。这些任务可能在不同的设备上运行,处理不同的数据类型,并具有不同的执行特性。因此,我们需要一个通用的任务接口和针对特定设备类型的具体实现。

DeviceType 枚举:首先,定义一个枚举来标识任务将在哪个类型的设备上执行。

// Common.h
#pragma once
#include <string>
#include <set>
#include <vector>
#include <atomic>
#include <mutex>
#include <functional>
#include <iostream>
#include <chrono>
#include <thread>
#include <queue>
#include <map>
#include <condition_variable>
#include <memory>

// Task ID type
using TaskID = unsigned long long;

// Device types for heterogeneous tasks
enum class DeviceType {
    CPU,        // Central Processing Unit
    GPU,        // Graphics Processing Unit (e.g., CUDA, OpenCL)
    MEMORY,     // Data transfer tasks (e.g., Host-to-Device, Device-to-Host)
    // Add other device types as needed (e.g., FPGA, DSP)
};

// Task execution status
enum class TaskStatus {
    PENDING,    // Task has been added, waiting for predecessors to complete
    READY,      // All predecessors completed, ready for dispatch
    RUNNING,    // Currently being executed on a device
    COMPLETED,  // Execution finished successfully
    FAILED      // Execution failed
};

Task 基类:所有具体任务类的基础,定义了任务的通用属性和行为。

// Task.h
#pragma once
#include "Common.h"

class Task {
public:
    Task(std::string name, DeviceType type)
        : id_(generate_id()), name_(std::move(name)), type_(type), status_(TaskStatus::PENDING) {}
    virtual ~Task() = default;

    // Accessors
    TaskID get_id() const { return id_; }
    const std::string& get_name() const { return name_; }
    DeviceType get_device_type() const { return type_; }
    TaskStatus get_status() const { return status_; }

    // Mutators (thread-safe for status updates)
    void set_status(TaskStatus status) {
        std::lock_guard<std::mutex> lock(mtx_);
        status_ = status;
    }

    // Dependency management
    void add_predecessor(TaskID pred_id) {
        std::lock_guard<std::mutex> lock(mtx_);
        predecessors_.insert(pred_id);
    }
    void add_successor(TaskID succ_id) {
        std::lock_guard<std::mutex> lock(mtx_);
        successors_.insert(succ_id);
    }
    const std::set<TaskID>& get_predecessors() const { return predecessors_; }
    const std::set<TaskID>& get_successors() const { return successors_; }

    // Notifies this task that one of its predecessors has completed.
    // This method is called by the scheduler.
    void notify_predecessor_completed(TaskID pred_id) {
        std::lock_guard<std::mutex> lock(mtx_);
        completed_predecessors_.insert(pred_id);
        if (status_ == TaskStatus::PENDING && is_ready_internal()) {
            status_ = TaskStatus::READY; // Transition to READY
        }
    }

    // Checks if the task is ready to be executed (all predecessors completed).
    // This check is performed by the scheduler.
    bool is_ready() const {
        std::lock_guard<std::mutex> lock(mtx_);
        return status_ == TaskStatus::READY;
    }

    // Pure virtual method for task execution.
    // Concrete task types will implement this.
    virtual void execute() = 0;

protected:
    mutable std::mutex mtx_; // Protects status_ and dependency tracking
    TaskID id_;
    std::string name_;
    DeviceType type_;
    TaskStatus status_;

    std::set<TaskID> predecessors_; // Tasks that must complete before this task can start
    std::set<TaskID> successors_;   // Tasks that depend on this task
    std::set<TaskID> completed_predecessors_; // Track completed predecessors

    // Internal check for readiness without locking (assumes mtx_ is held)
    bool is_ready_internal() const {
        return completed_predecessors_.size() == predecessors_.size();
    }

    // Generates a unique ID for each task
    static TaskID generate_id() {
        static std::atomic<TaskID> next_id = 0;
        return next_id++;
    }
};

具体任务实现

  1. CpuTask: 在 CPU 上执行的通用任务。它封装一个 std::function

    // CpuTask.h
    #pragma once
    #include "Task.h"
    
    class CpuTask : public Task {
    public:
        using Func = std::function<void()>;
        CpuTask(std::string name, Func func)
            : Task(std::move(name), DeviceType::CPU), func_(std::move(func)) {}
    
        void execute() override {
            std::cout << "[CPU] Executing: " << get_name() << " (ID: " << get_id() << ")" << std::endl;
            func_(); // Call the encapsulated function
            std::cout << "[CPU] Completed: " << get_name() << " (ID: " << get_id() << ")" << std::endl;
        }
    
    private:
        Func func_;
    };
  2. GpuTask: 在 GPU 上执行的任务。在实际系统中,这将涉及 CUDA/OpenCL API 调用。为了简化示例,我们仍然使用 std::function,但其内部会模拟 GPU 工作。

    // GpuTask.h
    #pragma once
    #include "Task.h"
    
    // Forward declare CUDA stream type to avoid including CUDA headers everywhere
    struct CUstream_st;
    using cudaStream_t = CUstream_st*;
    
    class GpuTask : public Task {
    public:
        // KernelFunc takes a cudaStream_t for asynchronous execution
        using KernelFunc = std::function<void(cudaStream_t)>;
        GpuTask(std::string name, KernelFunc func)
            : Task(std::move(name), DeviceType::GPU), kernel_func_(std::move(func)) {}
    
        void execute() override {
            // In a real system, the scheduler's GPU_Executor would provide a stream.
            // For this example, we simulate the GPU work.
            // The actual CUDA launch would be something like:
            // kernel_func_(this->stream_); // assuming stream_ is assigned by executor
    
            // Simulate GPU kernel execution. This 'execute' method is called
            // by a dedicated GPU thread or asynchronous mechanism in the scheduler.
            std::cout << "[GPU] Launching: " << get_name() << " (ID: " << get_id() << ")" << std::endl;
            // The KernelFunc itself would encapsulate the actual CUDA kernel launch.
            // For this simplified demo, we just simulate the time.
            // A dummy stream value passed, as we're not doing real CUDA calls here.
            kernel_func_(nullptr); 
            std::cout << "[GPU] Completed: " << get_name() << " (ID: " << get_id() << ")" << std::endl;
        }
    
        // In a real implementation, you might need to set the stream
        // void set_stream(cudaStream_t stream) { stream_ = stream; }
    
    private:
        KernelFunc kernel_func_;
        // cudaStream_t stream_ = nullptr; // Managed by the GPU executor
    };
  3. DataTransferTask: 负责在不同设备内存之间传输数据。这是实现重叠执行的关键环节,因为数据传输通常可以与计算并行。

    // DataTransferTask.h
    #pragma once
    #include "Task.h"
    
    class DataTransferTask : public Task {
    public:
        enum class TransferDirection {
            HOST_TO_DEVICE,
            DEVICE_TO_HOST,
            DEVICE_TO_DEVICE // P2P transfer if supported
        };
    
        DataTransferTask(std::string name, TransferDirection dir, size_t size_bytes)
            : Task(std::move(name), DeviceType::MEMORY), direction_(dir), size_bytes_(size_bytes) {}
    
        void execute() override {
            std::string dir_str;
            switch (direction_) {
                case TransferDirection::HOST_TO_DEVICE: dir_str = "H2D"; break;
                case TransferDirection::DEVICE_TO_HOST: dir_str = "D2H"; break;
                case TransferDirection::DEVICE_TO_DEVICE: dir_str = "D2D"; break;
            }
            std::cout << "[MEM] Transferring (" << dir_str << ", " << size_bytes_ << " bytes): "
                      << get_name() << " (ID: " << get_id() << ")" << std::endl;
            // Simulate asynchronous memory transfer
            std::this_thread::sleep_for(std::chrono::milliseconds(size_bytes_ / (1024 * 1024) * 10 + 5)); // Simulate transfer time
            std::cout << "[MEM] Completed Transfer: " << get_name() << " (ID: " << get_id() << ")" << std::endl;
        }
    
    private:
        TransferDirection direction_;
        size_t size_bytes_;
        // Pointers to source/destination data buffers would be managed here
    };

2.2 数据依赖与数据句柄

任务之间除了控制流依赖(即 A 完成后 B 才能开始)外,更重要的是数据流依赖。一个任务的输出可能成为另一个任务的输入。为了安全、高效地管理这些数据,尤其是在异构内存空间中,我们通常采用数据句柄(Data Handle)或共享数据结构。

  • DataHandle 概念:一个抽象的引用,指向一段数据。这个句柄可以包含数据的类型、大小、以及它当前所在设备的内存地址(CPU 内存、GPU 显存等)。当任务需要数据时,它通过句柄获取。
  • 内存管理:调度器或专门的内存管理器负责在不同设备之间移动数据,并确保数据在被访问时位于正确的设备上。这通常涉及:
    • 统一内存 (Unified Memory):在 CUDA 中,允许 CPU 和 GPU 共享一个地址空间,简化数据管理,但性能可能不如显式传输。
    • Pinned Memory (页锁定内存):CPU 端的一种特殊内存,可以直接被 GPU 访问,加速 Host-to-Device 传输。
    • 异步传输:使用 cudaMemcpyAsync 等函数,允许数据传输与计算重叠。

在上述 Task 基类中,我们主要关注了控制流依赖 (add_predecessor, add_successor)。在更复杂的系统中,Task 类会包含 input_data_handles_output_data_handles_ 这样的成员,并且 execute 方法会利用这些句柄来访问和生成数据。

示例:一个简化的数据句柄

// DataHandle.h
#pragma once
#include "Common.h"
#include <variant> // For holding different types of pointers

// Forward declarations for device-specific pointers
void* get_cpu_ptr_from_data_handle(const TaskID data_id);
void* get_gpu_ptr_from_data_handle(const TaskID data_id);

// Represents a piece of data that can reside on different devices
class DataHandle {
public:
    DataHandle(size_t size_bytes, DeviceType initial_location)
        : id_(generate_id()), size_bytes_(size_bytes), current_location_(initial_location) {
        // Allocate initial memory based on initial_location
        if (initial_location == DeviceType::CPU) {
            cpu_ptr_ = std::make_shared<std::vector<char>>(size_bytes);
        } else if (initial_location == DeviceType::GPU) {
            // Simulate GPU memory allocation
            // cudaMalloc(&gpu_ptr_, size_bytes_);
            gpu_ptr_ = nullptr; // Placeholder for actual GPU pointer
        }
    }

    ~DataHandle() {
        // Free GPU memory if allocated
        // if (current_location_ == DeviceType::GPU && gpu_ptr_) {
        //     cudaFree(gpu_ptr_);
        // }
    }

    TaskID get_id() const { return id_; }
    size_t get_size_bytes() const { return size_bytes_; }
    DeviceType get_current_location() const { return current_location_; }
    void set_current_location(DeviceType new_location) { current_location_ = new_location; }

    // Get raw pointers (requires casting by the user)
    std::shared_ptr<std::vector<char>> get_cpu_buffer() const { return cpu_ptr_; }
    void* get_gpu_buffer() const { return gpu_ptr_; } // This would be the actual device pointer

private:
    TaskID id_; // Unique ID for this data handle
    size_t size_bytes_;
    DeviceType current_location_;

    std::shared_ptr<std::vector<char>> cpu_ptr_; // CPU memory
    void* gpu_ptr_; // Placeholder for GPU device pointer

    static TaskID generate_id() {
        static std::atomic<TaskID> next_id = 0;
        return next_id++;
    }
};

// Example of how a Task might use DataHandle (conceptual)
/*
class MyGpuComputeTask : public GpuTask {
public:
    MyGpuComputeTask(std::string name, std::shared_ptr<DataHandle> input, std::shared_ptr<DataHandle> output)
        : GpuTask(std::move(name), std::bind(&MyGpuComputeTask::run_kernel, this, std::placeholders::_1)),
          input_data_(input), output_data_(output) {}

private:
    std::shared_ptr<DataHandle> input_data_;
    std::shared_ptr<DataHandle> output_data_;

    void run_kernel(cudaStream_t stream) {
        // Ensure input data is on GPU (scheduler's job, or explicit transfer task)
        void* input_gpu_ptr = input_data_->get_gpu_buffer();
        void* output_gpu_ptr = output_data_->get_gpu_buffer();

        // Launch actual CUDA kernel
        // my_cuda_kernel<<<grid, block, 0, stream>>>(input_gpu_ptr, output_gpu_ptr, ...);
        std::this_thread::sleep_for(std::chrono::milliseconds(200)); // Simulate work
    }
};
*/

在实际应用中,DataHandle 的实现会更加复杂,需要考虑多种数据类型、数据生命周期、所有权以及跨设备数据一致性。调度器会识别 DataTransferTask,并根据 DataHandle 的当前位置和目标位置决定执行何种传输操作。

3. 调度器架构:异构资源与异步执行

调度器是任务图的“大脑”,负责解析任务图,管理异构资源,并根据依赖关系和资源可用性调度任务执行。实现重叠执行是调度器的核心目标。

3.1 调度器核心职责

  • 任务管理:存储所有任务,跟踪其状态(PENDING, READY, RUNNING, COMPLETED)。
  • 依赖解析:当一个任务完成时,通知其所有后继任务,并检查后继任务是否已就绪。
  • 资源管理:管理 CPU 线程池、GPU 流/队列等计算资源,确保任务被分配到正确的设备上。
  • 任务分发:从就绪队列中取出任务,将其调度到对应的硬件单元上执行。
  • 异步执行与同步:利用设备提供的异步机制(如 CUDA Stream、std::async),并在必要时插入同步点。
  • 性能监控:跟踪任务执行时间、资源利用率等,以便优化。

3.2 调度器组件

一个典型的异构调度器可能包含以下组件:

组件名称 职责
TaskRegistry 存储所有 Task 对象,通过 TaskID 进行查找。
ReadyQueue 维护不同设备类型的就绪任务队列(例如 cpu_ready_queue, gpu_ready_queue)。
CpuExecutor 管理一个 CPU 线程池,从 cpu_ready_queue 中取任务并在线程中执行。
GpuExecutor 管理 GPU 设备和 CUDA 流/OpenCL 队列,从 gpu_ready_queue 中取任务并异步启动 GPU 内核。
MemoryExecutor 负责数据传输任务,通常利用 GPU 异步传输或 DMA 机制。可以与 GpuExecutor 合并。
DependencyResolver 当任务完成时,更新依赖计数,并使依赖它的任务进入 READY 状态。
CompletionHandler 接收任务完成通知,调用 DependencyResolver,并可能触发下一个调度周期。
ResourceManager 跟踪 CPU 线程、GPU 流的可用性,避免资源超载。

3.3 实现一个简化的 Scheduler

// Scheduler.h
#pragma once
#include "Common.h"
#include "Task.h"
#include "CpuTask.h"
#include "GpuTask.h"
#include "DataTransferTask.h"

// For efficient concurrent queue for worker threads
#include <moodycamel/concurrentqueue.h> 

// Forward declare CUDA stream type
struct CUstream_st;
using cudaStream_t = CUstream_st*;

class Scheduler {
public:
    Scheduler(int num_cpu_threads, int num_gpu_streams);
    ~Scheduler();

    // Add a task to the scheduler
    void add_task(std::shared_ptr<Task> task);

    // Add a dependency between two tasks (predecessor must complete before successor)
    void add_dependency(TaskID predecessor_id, TaskID successor_id);

    // Starts the main scheduling loop
    void run();

    // Waits for all tasks to complete
    void wait_for_completion();

private:
    // Task storage
    std::map<TaskID, std::shared_ptr<Task>> tasks_;

    // Queues for ready tasks, protected by mtx_
    std::queue<TaskID> cpu_ready_queue_guarded_; // For tasks ready for CPU
    std::queue<TaskID> gpu_ready_queue_guarded_; // For tasks ready for GPU/Memory

    // Mutex for protecting shared scheduler state (tasks_, ready queues, counts)
    mutable std::mutex mtx_;
    // Condition variable to signal completion or new ready tasks
    std::condition_variable cv_;

    // Control for scheduler's main loop and worker threads
    std::atomic<bool> running_;
    std::atomic<int> completed_tasks_count_;
    std::atomic<int> total_tasks_count_;

    // CPU Worker Threads (CpuExecutor)
    std::vector<std::thread> cpu_worker_threads_;
    // Concurrent queue for CPU tasks, accessed by worker threads
    moodycamel::ConcurrentQueue<TaskID> cpu_task_queue_internal_; 

    // GPU Streams (GpuExecutor) - Simplified for this example
    // In a real system, this would be managed by a dedicated GPU_Executor class
    std::vector<cudaStream_t> gpu_streams_;
    // For simplicity, we'll use a single thread to simulate async GPU launches
    // and completions, but a real GPU_Executor would be more robust.
    std::vector<std::thread> gpu_worker_threads_; // One thread per stream (or a few)
    moodycamel::ConcurrentQueue<TaskID> gpu_task_queue_internal_; 

    // Worker loop for CPU threads
    void cpu_worker_loop();

    // Worker loop for GPU/Memory tasks (simulated async)
    void gpu_worker_loop(cudaStream_t stream);

    // Dispatches tasks from ready queues to internal worker queues
    void dispatch_ready_tasks();

    // Processes a completed task, updates status, and notifies successors
    void process_completed_task(TaskID task_id);

    // Notifies successors of a completed task and potentially makes them ready
    void notify_successors(TaskID completed_task_id);
};

3.4 Scheduler 实现细节

// Scheduler.cpp
#include "Scheduler.h"

// Constructor: Initializes worker threads and GPU streams
Scheduler::Scheduler(int num_cpu_threads, int num_gpu_streams)
    : running_(true), completed_tasks_count_(0), total_tasks_count_(0) {

    // Initialize CPU worker threads
    for (int i = 0; i < num_cpu_threads; ++i) {
        cpu_worker_threads_.emplace_back(&Scheduler::cpu_worker_loop, this);
    }

    // Initialize GPU streams (CUDA example) and corresponding worker threads
    // In a production system, error handling for CUDA API calls is crucial.
    for (int i = 0; i < num_gpu_streams; ++i) {
        cudaStream_t stream;
        // cudaStreamCreate(&stream); // Actual CUDA stream creation
        stream = reinterpret_cast<cudaStream_t>(new int()); // Mock stream for compilation
        gpu_streams_.push_back(stream);
        // Each GPU stream might have a dedicated worker thread to manage its launches and events
        gpu_worker_threads_.emplace_back(&Scheduler::gpu_worker_loop, this, stream);
    }
}

// Destructor: Cleans up resources
Scheduler::~Scheduler() {
    running_ = false; // Signal worker threads to stop
    cv_.notify_all(); // Wake up any waiting threads

    // Join CPU worker threads
    for (auto& t : cpu_worker_threads_) {
        if (t.joinable()) {
            t.join();
        }
    }

    // Join GPU worker threads and destroy streams
    for (size_t i = 0; i < gpu_worker_threads_.size(); ++i) {
        if (gpu_worker_threads_[i].joinable()) {
            gpu_worker_threads_[i].join();
        }
        // cudaStreamDestroy(gpu_streams_[i]); // Actual CUDA stream destruction
        delete reinterpret_cast<int*>(gpu_streams_[i]); // Clean up mock stream
    }
}

// Adds a task to the scheduler
void Scheduler::add_task(std::shared_ptr<Task> task) {
    std::lock_guard<std::mutex> lock(mtx_);
    tasks_[task->get_id()] = task;
    total_tasks_count_++;

    // If task has no predecessors, it's immediately ready
    if (task->get_predecessors().empty()) {
        task->set_status(TaskStatus::READY);
        if (task->get_device_type() == DeviceType::CPU) {
            cpu_ready_queue_guarded_.push(task->get_id());
        } else { // GPU or MEMORY tasks
            gpu_ready_queue_guarded_.push(task->get_id());
        }
        cv_.notify_one(); // Notify scheduler that new tasks might be ready
    }
}

// Adds a dependency between tasks
void Scheduler::add_dependency(TaskID predecessor_id, TaskID successor_id) {
    std::lock_guard<std::mutex> lock(mtx_);
    auto pred_it = tasks_.find(predecessor_id);
    auto succ_it = tasks_.find(successor_id);

    if (pred_it == tasks_.end() || succ_it == tasks_.end()) {
        throw std::runtime_error("Invalid task ID for dependency.");
    }

    pred_it->second->add_successor(successor_id);
    succ_it->second->add_predecessor(predecessor_id);
}

// CPU worker thread loop: continuously dequeues and executes CPU tasks
void Scheduler::cpu_worker_loop() {
    while (running_) {
        TaskID task_id;
        if (cpu_task_queue_internal_.try_dequeue(task_id)) {
            std::shared_ptr<Task> task;
            {
                std::lock_guard<std::mutex> lock(mtx_);
                task = tasks_[task_id];
                task->set_status(TaskStatus::RUNNING); // Mark as running
            }

            task->execute(); // Execute the CPU task

            process_completed_task(task->get_id());
        } else {
            // No tasks, sleep briefly to avoid busy-waiting, or wait on a condition variable
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }
    std::cout << "CPU Worker Thread Exiting." << std::endl;
}

// GPU worker thread loop: dequeues and simulates execution of GPU/Memory tasks
// In a real system, this would involve cudaLaunchKernel and cudaEventRecord/cudaStreamWaitEvent
void Scheduler::gpu_worker_loop(cudaStream_t stream) {
    while (running_) {
        TaskID task_id;
        if (gpu_task_queue_internal_.try_dequeue(task_id)) {
            std::shared_ptr<Task> task;
            {
                std::lock_guard<std::mutex> lock(mtx_);
                task = tasks_[task_id];
                task->set_status(TaskStatus::RUNNING); // Mark as running
            }

            // Execute the GPU/Memory task.
            // For GpuTask, the kernel_func_ would be launched asynchronously on 'stream'.
            // For DataTransferTask, cudaMemcpyAsync would be used on 'stream'.
            // The execute() method itself simulates the work.
            if (task->get_device_type() == DeviceType::GPU) {
                std::dynamic_pointer_cast<GpuTask>(task)->execute(); // Pass stream if needed
            } else if (task->get_device_type() == DeviceType::MEMORY) {
                std::dynamic_pointer_cast<DataTransferTask>(task)->execute(); // Pass stream if needed
            }

            // In a real system, we'd record a CUDA event here and wait for it
            // cudaEvent_t event;
            // cudaEventCreate(&event);
            // cudaEventRecord(event, stream);
            // cudaEventSynchronize(event); // This would block until GPU work is done
            // cudaEventDestroy(event);
            // To achieve overlapping, the completion notification would be asynchronous.
            // For this demo, we just directly process completion after simulated work.

            process_completed_task(task->get_id());
        } else {
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }
    std::cout << "GPU Worker Thread Exiting for stream: " << stream << std::endl;
}

// Main scheduling loop: dispatches ready tasks to appropriate worker queues
void Scheduler::dispatch_ready_tasks() {
    std::lock_guard<std::mutex> lock(mtx_);

    // Dispatch CPU tasks
    while (!cpu_ready_queue_guarded_.empty()) {
        TaskID task_id = cpu_ready_queue_guarded_.front();
        cpu_ready_queue_guarded_.pop();

        // Only dispatch if still READY (status might change if a dependency was revoked, though unlikely in DAG)
        if (tasks_[task_id]->get_status() == TaskStatus::READY) {
            cpu_task_queue_internal_.enqueue(task_id); // Move to concurrent queue for workers
        }
    }

    // Dispatch GPU/Memory tasks
    // This is simplified: a real GPU_Executor would manage streams more intelligently
    // For now, we just enqueue to a general GPU task queue.
    while (!gpu_ready_queue_guarded_.empty()) {
        TaskID task_id = gpu_ready_queue_guarded_.front();
        gpu_ready_queue_guarded_.pop();

        if (tasks_[task_id]->get_status() == TaskStatus::READY) {
             gpu_task_queue_internal_.enqueue(task_id); // Move to concurrent queue for workers
        }
    }
}

// Handles task completion, updates status, increments count, and notifies successors
void Scheduler::process_completed_task(TaskID task_id) {
    std::lock_guard<std::mutex> lock(mtx_);
    auto task = tasks_[task_id];
    task->set_status(TaskStatus::COMPLETED);
    completed_tasks_count_++;

    notify_successors(task_id); // Notify tasks that depend on this one
    cv_.notify_all(); // Notify run/wait_for_completion potentially
}

// Iterates through successors of a completed task and updates their readiness
void Scheduler::notify_successors(TaskID completed_task_id) {
    for (TaskID succ_id : tasks_[completed_task_id]->get_successors()) {
        auto successor_task = tasks_[succ_id];
        successor_task->notify_predecessor_completed(completed_task_id); // Update predecessor count

        if (successor_task->is_ready()) { // Check if successor is now fully ready
            if (successor_task->get_device_type() == DeviceType::CPU) {
                cpu_ready_queue_guarded_.push(succ_id);
            } else { // GPU or MEMORY tasks
                gpu_ready_queue_guarded_.push(succ_id);
            }
            cv_.notify_one(); // Notify scheduler that new tasks might be ready
        }
    }
}

// Main scheduler loop
void Scheduler::run() {
    std::unique_lock<std::mutex> lock(mtx_);
    while (running_ && completed_tasks_count_ < total_tasks_count_) {
        // Wait until there are ready tasks or tasks complete, or scheduler stops
        cv_.wait(lock, [this]{ 
            return !cpu_ready_queue_guarded_.empty() || !gpu_ready_queue_guarded_.empty() || 
                   completed_tasks_count_ == total_tasks_count_ || !running_; 
        });

        if (!running_ || completed_tasks_count_ == total_tasks_count_) {
            break; // Exit if done or stopped
        }

        // Release lock briefly to allow dispatch to enqueue tasks
        // This is a subtle point: dispatch needs to acquire the lock internally.
        // A better design might use a separate "dispatching thread" that wakes up
        // or a lock-free queue for ready tasks.
        lock.unlock(); 
        dispatch_ready_tasks();
        lock.lock(); // Re-acquire lock for the next wait cycle
    }
    running_ = false; // All tasks done or no more ready tasks
    std::cout << "Scheduler finished its main execution loop." << std::endl;
}

// Waits for all tasks to complete
void Scheduler::wait_for_completion() {
    std::unique_lock<std::mutex> lock(mtx_);
    cv_.wait(lock, [this]{ return completed_tasks_count_ == total_tasks_count_; });
    std::cout << "All " << total_tasks_count_ << " tasks completed." << std::endl;
}

关于并发队列的说明: moodycamel::ConcurrentQueue 是一个高性能的无锁/少锁并发队列,非常适合在生产者-消费者模式中(例如调度器线程与工作线程之间)高效地传递任务。使用它比 std::queue 配合 std::mutexstd::condition_variable 在高并发场景下能提供更好的性能。

关于 GPU 模拟执行的说明: 在 Scheduler::gpu_worker_loop 中,task->execute() 被直接调用并模拟了阻塞。为了实现真正的 GPU 异步和重叠,execute() 应该是非阻塞地启动 GPU 内核(例如 cudaLaunchKernel),然后 gpu_worker_loop 应该使用 cudaEventRecordcudaEventQuerycudaStreamWaitEvent 来非阻塞地等待内核完成,并在完成后再调用 process_completed_task。由于 C++ 侧无法直接等待 GPU 事件而不阻塞线程,通常会有一个专门的线程循环来查询事件状态,或者注册一个 CUDA 回调函数。本示例通过将 GPU 任务推送到一个单独的线程来模拟这种异步性,但其 execute() 方法内部仍然是阻塞模拟。

4. 构建任务图与重叠执行示例

现在,我们使用前面定义的组件来构建一个简单的图像处理任务图,并展示如何通过调度器实现重叠执行。

任务图场景:图像处理管线

  1. 加载数据 (CPU):从磁盘加载图像数据。
  2. CPU 预处理 (CPU):例如,图像解码、缩放、颜色空间转换。
  3. 数据传输 H2D (Memory):将预处理后的数据从 CPU 内存传输到 GPU 显存。
  4. GPU 计算 A (GPU):在 GPU 上执行第一个计算密集型操作,例如卷积。
  5. GPU 计算 B (GPU):在 GPU 上执行第二个计算密集型操作,例如滤波。
  6. 数据传输 D2H (Memory):将 GPU 计算结果从 GPU 显存传输回 CPU 内存。
  7. CPU 后处理 (CPU):例如,结果聚合、格式编码。
  8. 保存结果 (CPU):将最终结果写入磁盘。

依赖关系

  • 1 -> 2
  • 2 -> 3
  • 3 -> 4
  • 4 -> 5
  • 5 -> 6
  • 6 -> 7
  • 7 -> 8

这个链式依赖图虽然简单,但调度器仍然可以利用异步传输和 GPU 计算的特性进行重叠。例如,当 GPU 计算 A 正在运行时,如果 数据传输 H2D 是异步的,它们可以部分重叠。更重要的是,在更复杂的图中,可能存在独立的分支,可以完全并行执行。

C++ main 函数示例

// main.cpp
#include "Scheduler.h"
#include <iostream>
#include <vector>
#include <thread>
#include <chrono>
#include <functional>
#include <map>
#include <set>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <memory>

int main() {
    std::cout << "--- Starting Heterogeneous Task Graph Scheduler Example ---" << std::endl;

    // Initialize scheduler with 2 CPU threads and 1 GPU stream (simulated)
    // For true overlapping, num_gpu_streams > 0 is essential.
    Scheduler scheduler(2, 1); 

    // Define tasks
    auto task_load_data = std::make_shared<CpuTask>("Load Image Data", []() {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        // Simulate loading data, e.g., from disk
    });

    auto task_preprocess_cpu = std::make_shared<CpuTask>("CPU Preprocessing", []() {
        std::this_thread::sleep_for(std::chrono::milliseconds(150));
        // Simulate image decoding, resizing etc.
    });

    auto task_transfer_to_gpu = std::make_shared<DataTransferTask>("Transfer H2D", 
        DataTransferTask::TransferDirection::HOST_TO_DEVICE, 1024 * 1024 * 4); // 4MB data

    auto task_gpu_compute_A = std::make_shared<GpuTask>("GPU Compute A (Convolution)", [](cudaStream_t stream) {
        // Simulate CUDA kernel launch for convolution
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    });

    auto task_gpu_compute_B = std::make_shared<GpuTask>("GPU Compute B (Filter)", [](cudaStream_t stream) {
        // Simulate CUDA kernel launch for filtering
        std::this_thread::sleep_for(std::chrono::milliseconds(250));
    });

    auto task_transfer_from_gpu = std::make_shared<DataTransferTask>("Transfer D2H", 
        DataTransferTask::TransferDirection::DEVICE_TO_HOST, 1024 * 1024 * 4); // 4MB data

    auto task_postprocess_cpu = std::make_shared<CpuTask>("CPU Postprocessing", []() {
        std::this_thread::sleep_for(std::chrono::milliseconds(120));
        // Simulate result aggregation, encoding
    });

    auto task_save_results = std::make_shared<CpuTask>("Save Results", []() {
        std::this_thread::sleep_for(std::chrono::milliseconds(80));
        // Simulate writing results to disk
    });

    // Add tasks to the scheduler
    scheduler.add_task(task_load_data);
    scheduler.add_task(task_preprocess_cpu);
    scheduler.add_task(task_transfer_to_gpu);
    scheduler.add_task(task_gpu_compute_A);
    scheduler.add_task(task_gpu_compute_B);
    scheduler.add_task(task_transfer_from_gpu);
    scheduler.add_task(task_postprocess_cpu);
    scheduler.add_task(task_save_results);

    // Define dependencies (explicitly building the DAG)
    scheduler.add_dependency(task_load_data->get_id(), task_preprocess_cpu->get_id());
    scheduler.add_dependency(task_preprocess_cpu->get_id(), task_transfer_to_gpu->get_id());
    scheduler.add_dependency(task_transfer_to_gpu->get_id(), task_gpu_compute_A->get_id());
    scheduler.add_dependency(task_gpu_compute_A->get_id(), task_gpu_compute_B->get_id()); // Chained GPU tasks
    scheduler.add_dependency(task_gpu_compute_B->get_id(), task_transfer_from_gpu->get_id());
    scheduler.add_dependency(task_transfer_from_gpu->get_id(), task_postprocess_cpu->get_id());
    scheduler.add_dependency(task_postprocess_cpu->get_id(), task_save_results->get_id());

    // --- Demonstrate Overlapping Potential with a parallel branch ---
    // Let's add an independent CPU task that can run in parallel with the GPU pipeline
    auto task_independent_cpu_log = std::make_shared<CpuTask>("Independent CPU Logging", []() {
        std::this_thread::sleep_for(std::chrono::milliseconds(300));
        std::cout << "[CPU] Independent logging task completed." << std::endl;
    });
    scheduler.add_task(task_independent_cpu_log);
    // This task has no predecessors, so it becomes READY immediately and can run in parallel
    // with task_load_data and subsequent tasks, as long as CPU resources are available.

    // A slightly more complex parallel branch:
    // CPU Task: Initial setup for another branch
    auto task_branch_setup = std::make_shared<CpuTask>("Branch Setup CPU", []() {
        std::this_thread::sleep_for(std::chrono::milliseconds(70));
        std::cout << "[CPU] Branch setup completed." << std::endl;
    });
    scheduler.add_task(task_branch_setup);
    scheduler.add_dependency(task_load_data->get_id(), task_branch_setup->get_id()); // Depends on initial data load

    // GPU Task: Small GPU task in the parallel branch
    auto task_branch_gpu_small = std::make_shared<GpuTask>("Branch GPU Small Compute", [](cudaStream_t stream) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        std::cout << "[GPU] Branch small GPU compute completed." << std::endl;
    });
    scheduler.add_task(task_branch_gpu_small);
    scheduler.add_dependency(task_branch_setup->get_id(), task_branch_gpu_small->get_id());

    // CPU Task: Final processing for the parallel branch, depends on main pipeline's GPU compute B
    auto task_branch_final_cpu = std::make_shared<CpuTask>("Branch Final CPU", []() {
        std::this_thread::sleep_for(std::chrono::milliseconds(90));
        std::cout << "[CPU] Branch final CPU processing completed." << std::endl;
    });
    scheduler.add_task(task_branch_final_cpu);
    scheduler.add_dependency(task_branch_gpu_small->get_id(), task_branch_final_cpu->get_id());
    scheduler.add_dependency(task_gpu_compute_B->get_id(), task_branch_final_cpu->get_id()); // Merges with main pipeline

    // Run the scheduler in a separate thread to allow main to wait
    std::thread scheduler_thread([&]() {
        scheduler.run();
    });

    // Wait for all tasks to complete
    scheduler.wait_for_completion();

    // Ensure scheduler thread finishes
    scheduler_thread.join(); 

    std::cout << "--- Heterogeneous Task Graph Scheduler Example Finished ---" << std::endl;

    return 0;
}

编译和运行:
要编译此代码,您需要 C++17 或更高版本。moodycamel/concurrentqueue.h 是一个第三方库,您需要将其包含在项目中。最简单的方法是下载其头文件并将其放在您的包含路径中。
对于 cudaStream_t 的模拟,我们在 Scheduler.cpp 中使用了 reinterpret_cast<cudaStream_t>(new int())。在实际项目中,您需要安装 CUDA Toolkit 并链接 CUDA 库。

# 假设您已将 moodycamel/concurrentqueue.h 放在当前目录或-I指定的目录
# 假设您在Linux上使用g++
g++ -std=c++17 -O2 -Wall -I. Task.cpp CpuTask.cpp GpuTask.cpp DataTransferTask.cpp Scheduler.cpp main.cpp -o scheduler_demo -pthread 
# 如果是真正的CUDA项目,需要nvcc编译CUDA文件,并链接cuda运行时库
# nvcc -std=c++17 -x cu -c GpuTask.cu -o GpuTask.o # 假设GpuTask.cu包含真正的CUDA代码
# g++ -std=c++17 -O2 -Wall -I. Task.cpp CpuTask.cpp DataTransferTask.cpp Scheduler.cpp main.cpp GpuTask.o -o scheduler_demo -pthread -lcudart

运行 scheduler_demo 后,您会看到不同设备上的任务输出交错进行,这表明 CPU、GPU 和数据传输任务正在尝试重叠执行。例如,在 GPU 任务运行时,独立的 CPU 任务可能已经开始甚至完成。

5. 高级重叠与性能优化

为了最大限度地实现硬件单元的重叠,并提升整体性能,需要考虑以下高级方面:

5.1 异步执行与事件同步

  • CUDA Streams / OpenCL Command Queues: GPU 上的并发执行单元。不同的流可以独立执行内核和数据传输。通过在不同流中放置不相关的操作,可以实现 GPU 内部的重叠。
  • CUDA Events / OpenCL Events: 用于流内或流间同步的轻量级机制。一个事件可以在一个流中的某个点被记录,然后在另一个流中等待这个事件,从而建立精确的依赖关系而无需 CPU 介入。
  • CPU-GPU 同步:
    • cudaStreamSynchronize(): 阻塞 CPU 直到指定流中的所有操作完成。应尽量避免,因为它破坏了 CPU-GPU 重叠。
    • cudaEventSynchronize(): 阻塞 CPU 直到指定事件发生。比流同步更精细。
    • cudaStreamWaitEvent(): 让一个流等待另一个流中的事件,实现 GPU 内部同步,CPU 不阻塞。
    • cudaHostFn_t (CUDA Stream Callbacks): 在流中某个点插入一个 CPU 回调函数,当流执行到该点时,CPU 异步执行回调。这是通知调度器任务完成的理想方式,无需 CPU 轮询或阻塞。

5.2 数据局部性与传输优化

  • 零拷贝 (Zero-Copy) / 统一内存 (Unified Memory):减少甚至消除显式的数据复制。对于某些工作负载,统一内存可以简化编程模型,但在性能关键路径上,显式管理数据传输通常效果更好。
  • Pinned Memory (页锁定内存):CPU 主内存的一部分,操作系统保证其物理地址固定,从而允许 GPU 直接通过 DMA 访问,加速 Host-to-Device 传输。
  • 异步传输与计算重叠: cudaMemcpyAsync() 可以在一个流中启动数据传输,同时另一个流中的计算任务可以并行执行。调度器应智能地安排这些操作,例如:
    1. GPU 计算 A
    2. GPU 计算 A 完成后,在流 S1 中启动 cudaMemcpyAsync(D2H)
    3. 在流 S2 中启动 GPU 计算 B (如果它不依赖于 A 的输出)。
      这种方式可以在数据传输的同时,GPU 执行其他计算任务。

5.3 资源管理与调度策略

  • 动态线程池/流池: 根据系统负载动态调整 CPU 线程池大小或 GPU 流的数量。
  • 优先级调度: 对紧急任务或高优先级任务优先调度。
  • 亲和性调度: 尽量将任务分配给数据已经存在的设备,减少数据传输。
  • 负载均衡: 在多个同类型设备(例如多 GPU 系统)之间分配任务。
  • 内存感知调度: 考虑每个任务所需的内存量和可用内存,避免内存溢出。
  • 任务粒度: 任务的粒度会显著影响调度开销和并行性。太细的任务会增加调度开销,太粗的任务会限制并行性。找到最佳粒度是一个挑战。

5.4 动态任务图生成

在某些复杂应用中,任务图的结构可能不是静态已知的,而是根据运行时数据或条件动态生成的。例如,一个任务的输出可能会决定后续要执行哪些任务。这要求调度器能够动态地添加任务和依赖关系。

6. 挑战与最佳实践

6.1 挑战

  • 复杂性管理: 异构任务图和调度器本身就具有很高的复杂性。随着任务数量和设备类型的增加,图的构建、验证和调试变得更加困难。
  • 错误处理: 硬件错误、内存分配失败、内核执行失败等都必须妥善处理,并能将错误传播到上层应用。
  • 性能瓶颈分析: 确定系统性能瓶颈可能位于 CPU、GPU、内存带宽还是 PCIe 带宽,需要专业的分析工具(如 NVIDIA Nsight, Intel VTune)。
  • 可移植性: 针对特定硬件(如 CUDA)编写的代码通常不直接兼容其他硬件(如 OpenCL)。需要抽象层(如 SYCL, OpenMP offload, oneAPI)来提高代码可移植性。
  • 数据一致性: 在多设备、多线程环境中维护数据的一致性和正确性是一个严峻的挑战。

6.2 最佳实践

  • 模块化设计: 将任务定义、调度器、执行器、数据管理等职责分离,降低耦合度。
  • 利用异步 API: 尽可能使用硬件提供的异步 API (cudaMemcpyAsync, cudaLaunchKernel, std::async) 来实现重叠。
  • 最小化同步点: 只有在绝对必要时才进行同步,例如当一个任务确实需要另一个任务的输出时。
  • 细粒度任务与粗粒度任务的平衡: 任务粒度应足够细,以便发现并行性,但不能太细,以避免过高的调度开销。
  • 数据优先: 优先考虑数据传输的优化,包括使用 pinned memory、异步传输和数据局部性。
  • 日志与可视化: 详尽的日志记录和任务图的可视化工具对于调试和性能分析至关重要。可以利用 Graphviz 等工具将任务图导出为图片。
  • 单元测试与集成测试: 彻底测试任务的正确性和调度器的行为。

7. 异构任务调度:未来展望

C++ 异构任务图的构建和调度是现代高性能计算中的一项核心技术。通过精心设计的任务抽象、依赖管理和异步调度机制,我们能够充分利用系统中多样化的硬件资源,实现计算、数据传输和 I/O 的高度重叠,从而显著提升应用程序的性能和效率。

随着硬件异构性越来越高,以及对更低延迟和更高吞吐量的持续追求,智能的、自适应的异构任务调度器将变得更加关键。未来的发展方向可能包括:更强大的运行时优化、基于机器学习的调度决策、以及更高级的编程模型和工具链,以进一步简化异构编程的复杂性,使开发者能够更专注于算法本身。构建一个健壮且高效的异构任务图调度器,是解锁现代硬件潜力的必由之路。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注