C++ 线程模型:基于 Work-stealing 算法的高性能任务并行库构建

C++ 线程模型:基于 Work-stealing 算法的高性能任务并行库构建

各位技术同仁,大家好!

在现代计算环境中,多核处理器已成为主流,如何充分利用这些硬件资源,榨取程序的并行性能,是每个C++开发者面临的核心挑战。传统的 std::thread 编程模型虽然提供了线程抽象,但在处理复杂任务图、动态负载均衡以及追求极致性能时,其局限性日益凸显。今天,我们将深入探讨一种先进的并行计算范式——基于 Work-stealing 算法的任务并行库,并逐步构建一个高性能的C++实现。

1. 现代多核架构下的挑战与机遇

随着摩尔定律的放缓,CPU制造商转向了多核架构以提升计算能力。这意味着,如果我们的应用程序依然是单线程的,那么它将无法充分利用现代硬件的潜力。然而,并行编程并非易事。它引入了死锁、竞态条件、内存可见性问题以及难以调试的复杂性。

传统的线程模型,如直接使用 std::thread 和互斥锁,存在以下几个显著问题:

  • 上下文切换开销: 操作系统级别的线程是重量级的。频繁的线程创建、销毁和上下文切换会带来显著的性能开销。
  • 锁竞争与死锁: 粗粒度的互斥锁(std::mutex)可能导致严重的性能瓶颈,而细粒度锁的管理又极其复杂,容易引入死锁。
  • 负载不均衡: 静态的任务分配往往无法适应程序运行时的动态变化,导致部分核心空闲,而另一些核心过载。
  • 缓存局部性差: 线程频繁访问非本地数据可能导致缓存失效,增加内存访问延迟。
  • 编程模型复杂性: 开发者需要手动管理线程生命周期、同步机制,将业务逻辑与并行控制紧密耦合,降低了代码的可读性和可维护性。

为了克服这些挑战,我们需要一种更高级别的抽象,能够将并行控制从业务逻辑中解耦,并自动优化资源利用。任务并行(Task Parallelism)应运而生,它将计算分解为一系列独立的任务,由运行时系统负责调度和执行。而 Work-stealing 算法,正是实现高性能任务并行的核心驱动力之一。

2. Work-stealing 算法:原理与优势

Work-stealing 是一种动态负载均衡算法,它在分布式和并行计算系统中表现出色。其核心思想是:每个工作线程(Worker Thread)维护一个私有的任务队列。当工作线程完成自己的所有任务后,它不会立即空闲,而是尝试从其他工作线程的任务队列中“窃取”任务来执行。

2.1 基本机制

一个典型的 Work-stealing 系统通常包含以下组件和行为:

  • 工作线程 (Worker Thread): 实际执行计算任务的线程。通常,系统会创建与CPU核心数相同或略多的工作线程。
  • 任务双端队列 (Task Deque): 每个工作线程都拥有一个私有的双端队列(Deque)。
    • 本地操作: 工作线程将新创建的任务推入自己 Deque 的一端(通常是顶部),并从同一端弹出任务来执行(LIFO,后进先出)。这种 LIFO 行为有利于缓存局部性,因为最近创建的任务往往与当前任务的数据更相关。
    • 窃取操作: 当一个工作线程的本地 Deque 为空时,它会变成一个“窃取者”(stealer)。它会随机选择一个其他工作线程(“受害者”,victim),并尝试从其 Deque 的另一端(通常是底部)窃取任务(FIFO,先进先出)。FIFO 窃取行为旨在窃取“老”任务,这些任务可能与其他任务的依赖性较小,且能最大程度地减少与受害者本地操作的冲突。

2.2 Work-stealing 的优势

Work-stealing 算法带来了诸多显著的优势:

  1. 自动负载均衡: 当某个线程的任务量过大时,其他空闲线程会自动帮助其分担任务,无需程序员显式干预。这使得系统能够高效地适应动态变化的工作负载。
  2. 改善缓存局部性: 工作线程优先处理自己本地队列中的任务,这些任务通常由该线程创建,数据很可能还在其CPU缓存中。这大大减少了缓存失效和内存访问延迟。
  3. 降低锁竞争: 大多数任务操作(本地任务的推入和弹出)都发生在线程的私有队列上,通常是无锁或使用非常轻量级的锁。只有在窃取操作时,才需要跨线程访问共享队列,这部分竞争被设计得非常精巧,以最小化开销。
  4. 高吞吐量: 减少了线程空闲时间,提高了CPU利用率,从而增加了整体任务吞吐量。
  5. 简化编程模型: 开发者只需定义任务并提交给调度器,无需关心底层线程管理和负载均衡的复杂性。

Work-stealing 算法在 Intel TBB (Threading Building Blocks)、Microsoft PPL (Parallel Patterns Library)、Go 语言运行时调度器以及许多高性能并行库中得到了广泛应用,充分证明了其有效性。

3. Work-stealing 任务并行库的架构设计

构建一个 Work-stealing 任务并行库,我们需要精心设计以下核心组件:

组件名称 职责 关键设计考量
Task (任务) 抽象的计算单元,可执行的函数对象、lambda。 泛型化,支持返回值和异常处理。
LockFreeDeque 每个工作线程私有的无锁双端队列,支持本地推拉和远程窃取。 保证线程安全,最小化竞争,使用原子操作。
WorkerThread 实际执行任务的工作线程,包含一个 LockFreeDeque 和执行循环。 管理线程生命周期,实现任务调度逻辑(本地优先,再窃取)。
Scheduler 任务调度器,管理 WorkerThread 池,提供任务提交接口。 线程池管理,任务分发,支持等待任务完成。
Future / Promise 任务结果的异步获取机制。 提供非阻塞的结果获取和异常传播。

3.1 任务 (Task) 的抽象

任务是并行库的基本单元。在 C++ 中,std::function<void()>std::function<R()> 是一个很好的选择,因为它能够封装任何可调用对象(函数指针、lambda、函数对象)。为了支持任务返回值和异常传播,我们可以结合 std::promisestd::future

#include <functional>
#include <future>
#include <memory>
#include <utility>

// 基础任务接口
struct ITask {
    virtual void run() = 0;
    virtual ~ITask() = default;
};

// 带有返回值的任务
template <typename R>
class PackagedTask : public ITask {
public:
    // 构造函数接受一个可调用对象
    template <typename F>
    PackagedTask(F&& func) : func_(std::forward<F>(func)) {}

    void run() override {
        try {
            if constexpr (!std::is_void_v<R>) {
                promise_.set_value(func_());
            } else {
                func_();
                promise_.set_value();
            }
        } catch (...) {
            promise_.set_exception(std::current_exception());
        }
    }

    // 获取 future,用于异步获取结果
    std::future<R> get_future() {
        return promise_.get_future();
    }

private:
    std::function<R()> func_;
    std::promise<R> promise_;
};

// 辅助函数,用于创建 PackagedTask
template <typename F, typename R = std::invoke_result_t<F>>
std::unique_ptr<PackagedTask<R>> make_packaged_task(F&& func) {
    return std::make_unique<PackagedTask<R>>(std::forward<F>(func));
}

通过 ITask 接口,我们将不同类型、不同返回值的任务统一抽象。PackagedTask 利用 std::promisestd::future 实现了任务执行结果的异步传递和异常处理。

3.2 无锁双端队列 (LockFreeDeque) 的实现要点

LockFreeDeque 是 Work-stealing 算法的心脏。它的挑战在于如何高效、无锁地支持以下操作:

  1. push_back (本地推入): 工作线程将任务推入其队列的“顶部”。
  2. pop_back (本地弹出): 工作线程从其队列的“顶部”弹出任务。
  3. steal (远程窃取): 其他工作线程从其队列的“底部”窃取任务。

通常,LockFreeDeque 会使用一个固定大小的环形缓冲区(circular buffer)作为底层存储,并依赖 C++11 引入的原子操作(std::atomic)和内存模型(std::memory_order)来保证线程安全。

实现关键点:

  • 头尾指针/索引: 使用 std::atomic<size_t> 来维护队列的头(head_)和尾(tail_)索引。
  • CAS (Compare-And-Swap): 这是实现无锁数据结构的核心。例如,当一个线程尝试修改 tail_head_ 时,它会读取当前值,计算新值,然后使用 CAS 操作尝试更新。如果期间有其他线程修改了相同的值,CAS 会失败,线程需要重试。
  • 内存序 (Memory Order):
    • std::memory_order_relaxed: 最宽松的内存序,不保证顺序性,只保证原子操作本身的原子性。
    • std::memory_order_acquire: 读操作,确保此操作之后的所有内存访问不会被重排到此操作之前。
    • std::memory_order_release: 写操作,确保此操作之前的所有内存访问不会被重排到此操作之后。
    • std::memory_order_acq_rel: 读-改-写操作(如 CAS),兼具 acquire 和 release 语义。
    • std::memory_order_seq_cst: 最严格的内存序,提供全局的顺序一致性,但开销最大。
      在 Work-stealing Deque 中,acquirerelease 语义对于正确同步头尾指针以及任务数据的可见性至关重要。

简化的 LockFreeDeque 伪代码及关键操作:

我们以一个固定大小的 std::vector<std::unique_ptr<ITask>> 作为环形缓冲区为例。

#include <atomic>
#include <vector>
#include <numeric> // for std::iota
#include <optional> // C++17 for optional return value

// 假设 ITask 已经定义如上
// 为了简化,这里使用 raw pointer 演示,实际生产代码应使用 unique_ptr/shared_ptr
// 并处理好内存管理,例如在 Deque 析构时清空所有任务。
class LockFreeDeque {
public:
    explicit LockFreeDeque(size_t capacity) : capacity_(capacity),
                                              buffer_(capacity),
                                              head_(0),
                                              tail_(0) {}

    // 本地线程推入任务 (LIFO from top/tail)
    bool push_back(std::unique_ptr<ITask> task) {
        // 先读取尾部索引
        size_t current_tail = tail_.load(std::memory_order_relaxed);
        // 计算新的尾部索引
        size_t next_tail = (current_tail + 1) % capacity_;

        // 检查队列是否已满
        if (next_tail == head_.load(std::memory_order_acquire)) {
            // 队列已满
            return false;
        }

        // 放入任务
        buffer_[current_tail] = std::move(task);
        // 更新尾部索引,使用 release 语义,确保任务数据在更新tail之前可见
        tail_.store(next_tail, std::memory_order_release);
        return true;
    }

    // 本地线程弹出任务 (LIFO from top/tail)
    std::unique_ptr<ITask> pop_back() {
        // 先读取尾部索引
        size_t current_tail = tail_.load(std::memory_order_relaxed);
        // 计算“逻辑上”的前一个尾部索引
        size_t prev_tail = (current_tail + capacity_ - 1) % capacity_;

        // 尝试 CAS 减小 tail 指针
        // 如果 CAS 成功,说明我们独占了 prev_tail 这个位置
        if (!tail_.compare_exchange_strong(current_tail, prev_tail,
                                           std::memory_order_release, // 成功:release 语义,确保任务数据在 tail 更新前可见
                                           std::memory_order_relaxed)) { // 失败:relaxed 语义
            // 有其他线程在修改 tail,重试或返回空
            return nullptr; // 或者循环重试
        }

        // 检查队列是否为空 (在 CAS 成功之后)
        if (prev_tail == head_.load(std::memory_order_acquire)) {
            // 队列为空,或者在 CAS 之后变空,需要回滚 tail
            // 这里为了简化,直接返回 nullptr,实际需要更复杂的处理
            // 例如,把 tail_ 重新设置为 current_tail,然后返回 nullptr
            // 或者使用更复杂的双重检查模式来处理空队列情况
            // 为了避免 ABA 问题,这里需要更精细的 CAS 逻辑或版本计数
            tail_.store(current_tail, std::memory_order_relaxed); // 回滚
            return nullptr;
        }

        // 取出任务
        std::unique_ptr<ITask> task = std::move(buffer_[prev_tail]);
        return task;
    }

    // 远程线程窃取任务 (FIFO from bottom/head)
    std::unique_ptr<ITask> steal() {
        // 先读取头部索引
        size_t current_head = head_.load(std::memory_order_acquire); // acquire 语义,确保能看到最新的 head
        // 读取尾部索引
        size_t current_tail = tail_.load(std::memory_order_relaxed);

        // 检查队列是否为空
        if (current_head == current_tail) {
            return nullptr; // 队列为空
        }

        // 取出任务
        std::unique_ptr<ITask> task = std::move(buffer_[current_head]);

        // 尝试 CAS 增加 head 指针
        if (!head_.compare_exchange_strong(current_head, (current_head + 1) % capacity_,
                                           std::memory_order_release, // 成功:release 语义
                                           std::memory_order_relaxed)) { // 失败:relaxed 语义
            // CAS 失败,说明有其他线程修改了 head (可能是另一个窃取者)
            // 此时任务可能已经被偷走,或者 head 已被更新,返回 nullptr
            // 注意:这里没有回滚 buffer_[current_head] 的所有权,需要确保如果 CAS 失败,task 仍然是 nullptr
            // 实际实现中,要先 CAS head_,成功后才取出任务。
            return nullptr;
        }
        return task;
    }

    // 检查队列是否为空 (近似值,可能不准确,仅供参考)
    bool empty() const {
        return head_.load(std::memory_order_relaxed) == tail_.load(std::memory_order_relaxed);
    }

private:
    size_t capacity_;
    std::vector<std::unique_ptr<ITask>> buffer_;
    std::atomic<size_t> head_; // 窃取者操作
    std::atomic<size_t> tail_; // 本地线程操作
};

关于 pop_backsteal 的原子操作细节:

  • pop_back (本地线程):

    • 首先原子读取 tail_
    • 尝试使用 CAS 来递减 tail_。如果 CAS 成功,说明该线程成功地“预订”了队列中的一个位置,并且这个位置现在被视为可用的。
    • 然后检查 head_ 和更新后的 tail_ 来判断队列是否为空。如果为空,说明在尝试弹出时队列已空,需要回滚 tail_ 并返回空。
    • 最后取出任务。
    • 这种设计避免了 ABA 问题,因为 tail_ 是递减的,不会回滚到相同的值,除非队列被完全清空又重新填满。
  • steal (远程线程):

    • 原子读取 head_
    • 原子读取 tail_
    • 如果 head_ == tail_,队列为空。
    • 如果队列不为空,则取出 buffer_[current_head] 中的任务。
    • 然后使用 CAS 尝试递增 head_。如果 CAS 成功,表示成功窃取任务,并更新了 head_。如果失败,说明在读取 head_ 后有其他窃取者先一步更新了 head_,则本次窃取失败。

内存序的抉择:

  • head_tail_load(std::memory_order_relaxed) 可以在不影响正确性的前提下提高性能,因为它们只是获取当前值进行比较或计算。
  • tail_.store(..., std::memory_order_release)push_back 中是关键,确保任务数据在 tail_ 更新之前完全写入。
  • head_.load(..., std::memory_order_acquire)steal 中是关键,确保窃取者能看到 tail_ 之前写入的所有任务数据。
  • compare_exchange_strongsuccess 路径通常使用 release 语义,failure 路径通常使用 relaxed 语义。

这是一个简化版的 LockFreeDeque,一个生产级别的无锁双端队列实现要复杂得多,需要处理更多的边缘情况,如 ABA 问题(通常通过版本计数器解决)、内存回收、动态扩容等。这里的目标是展示其核心原理。

3.3 工作线程 (WorkerThread) 的实现

每个 WorkerThread 都有自己的 LockFreeDeque,并在一个循环中执行任务。

#include <thread>
#include <deque> // For stealing attempts
#include <random> // For random victim selection
#include <chrono> // For sleep

// Forward declaration
class Scheduler;

class WorkerThread {
public:
    WorkerThread(size_t id, Scheduler* scheduler, size_t deque_capacity)
        : id_(id), scheduler_(scheduler), tasks_(deque_capacity), running_(true) {}

    void start() {
        thread_ = std::thread(&WorkerThread::run_loop, this);
    }

    void stop() {
        running_ = false;
        if (thread_.joinable()) {
            thread_.join();
        }
    }

    // 提供给 Scheduler 提交任务到本地队列
    bool push_task(std::unique_ptr<ITask> task) {
        return tasks_.push_back(std::move(task));
    }

    // 获取本地 Deque 的引用,供 Scheduler 访问(例如窃取)
    LockFreeDeque& get_deque() {
        return tasks_;
    }

private:
    void run_loop(); // 实现核心调度逻辑

    size_t id_;
    Scheduler* scheduler_; // 允许访问调度器,进行窃取等操作
    LockFreeDeque tasks_;
    std::thread thread_;
    std::atomic<bool> running_;
};

run_loopWorkerThread 的核心,它定义了任务的调度策略:

// WorkerThread::run_loop 的实现
void WorkerThread::run_loop() {
    std::random_device rd;
    std::mt19937 generator(rd()); // 每个线程独立的随机数生成器

    while (running_.load(std::memory_order_acquire)) {
        std::unique_ptr<ITask> task = nullptr;

        // 1. 尝试从本地队列弹出任务 (LIFO)
        task = tasks_.pop_back();

        if (task) {
            task->run();
            continue; // 继续处理本地任务
        }

        // 2. 如果本地队列为空,尝试从其他工作线程窃取任务 (FIFO)
        size_t num_workers = scheduler_->get_num_workers();
        if (num_workers > 1) { // 只有存在其他工作线程时才窃取
            // 随机选择一个受害者线程,但不选择自己
            std::uniform_int_distribution<size_t> distribution(0, num_workers - 1);
            size_t victim_id;
            do {
                victim_id = distribution(generator);
            } while (victim_id == id_);

            WorkerThread* victim_worker = scheduler_->get_worker(victim_id);
            if (victim_worker) {
                task = victim_worker->get_deque().steal();
                if (task) {
                    task->run();
                    continue; // 成功窃取并执行
                }
            }
        }

        // 3. 如果本地队列和窃取都失败,则空闲,等待新任务
        // 实际中可能需要更复杂的等待机制 (条件变量、信号量)
        // 这里为了简化,使用短暂休眠
        std::this_thread::sleep_for(std::chrono::microseconds(10));
    }
}

3.4 调度器 (Scheduler) 的实现

Scheduler 负责管理 WorkerThread 池,并提供任务提交的公共接口。

#include <vector>
#include <memory>
#include <thread>
#include <stdexcept>
#include <numeric> // for std::iota

// Forward declaration
class WorkerThread; // Defined above

class Scheduler {
public:
    explicit Scheduler(size_t num_workers = std::thread::hardware_concurrency(),
                       size_t deque_capacity = 256)
        : num_workers_(num_workers), deque_capacity_(deque_capacity) {
        if (num_workers_ == 0) {
            num_workers_ = 1; // 至少一个工作线程
        }
        workers_.reserve(num_workers_);
        for (size_t i = 0; i < num_workers_; ++i) {
            workers_.push_back(std::make_unique<WorkerThread>(i, this, deque_capacity_));
        }
    }

    ~Scheduler() {
        stop_all_workers();
    }

    void start_all_workers() {
        for (auto& worker : workers_) {
            worker->start();
        }
    }

    void stop_all_workers() {
        for (auto& worker : workers_) {
            worker->stop(); // 发送停止信号并等待线程结束
        }
    }

    // 提交任务到调度器
    template <typename F, typename R = std::invoke_result_t<F>>
    std::future<R> submit(F&& func) {
        auto packaged_task = make_packaged_task<F, R>(std::forward<F>(func));
        std::future<R> future = packaged_task->get_future();

        // 尝试将任务提交给当前线程的本地队列(如果当前线程是工作线程)
        // 或者随机选择一个工作线程的队列
        // 这是一个简化的策略,实际生产中可能需要一个全局任务队列
        // 或者更智能的负载感知策略。
        WorkerThread* current_worker = get_current_worker(); // 获取当前线程关联的 WorkerThread
        if (current_worker && current_worker->push_task(std::move(packaged_task))) {
            return future;
        }

        // 如果当前线程不是工作线程,或者本地队列已满
        // 随机选择一个工作线程来提交任务
        std::random_device rd;
        std::mt19937 generator(rd());
        std::uniform_int_distribution<size_t> distribution(0, num_workers_ - 1);

        bool pushed = false;
        // 循环尝试,直到成功推入一个队列
        // 实际中可能设置重试次数或回退到全局队列
        while (!pushed) {
            size_t target_worker_id = distribution(generator);
            if (workers_[target_worker_id]->push_task(std::move(packaged_task))) {
                pushed = true;
            } else {
                // 目标队列已满,短暂等待或尝试下一个随机队列
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            }
        }
        return future;
    }

    // 获取工作线程数量
    size_t get_num_workers() const {
        return num_workers_;
    }

    // 根据 ID 获取工作线程指针
    WorkerThread* get_worker(size_t id) {
        if (id < num_workers_) {
            return workers_[id].get();
        }
        return nullptr;
    }

private:
    size_t num_workers_;
    size_t deque_capacity_;
    std::vector<std::unique_ptr<WorkerThread>> workers_;

    // 线程局部存储,用于快速获取当前线程对应的 WorkerThread
    static thread_local WorkerThread* current_worker_instance;

    // 获取当前线程对应的 WorkerThread 指针
    WorkerThread* get_current_worker() {
        // 实际实现需要更复杂的注册机制,
        // 例如 WorkerThread 在 run_loop 开始时注册自己到 thread_local 变量
        // 并保存 Scheduler 的指针,以便 Scheduler 可以通过 thread_local 访问
        return current_worker_instance; // 这是一个简化,需要额外机制设置
    }
};

// 静态成员变量的定义
thread_local WorkerThread* Scheduler::current_worker_instance = nullptr;

// 为了让 WorkerThread 能够设置 thread_local 变量,需要修改 WorkerThread::run_loop
/*
void WorkerThread::run_loop() {
    Scheduler::current_worker_instance = this; // 注册当前工作线程
    // ... 原有逻辑 ...
    Scheduler::current_worker_instance = nullptr; // 解除注册
}
*/

任务提交策略:
submit 方法中,我们首先尝试将任务提交给当前线程的本地队列(如果当前线程本身就是一个 WorkerThread)。这种“任务窃取”的逆过程被称为“任务推入”,如果任务是由正在执行其他任务的工作线程创建的,那么将其推入本地队列能够更好地利用缓存。如果当前线程不是工作线程,或者本地队列已满,则随机选择一个工作线程的队列进行提交。在生产级别的库中,可能会有更复杂的策略,例如:

  • 全局任务队列: 当所有本地队列都满时,任务可以被推入一个全局的、受保护的队列。
  • 负载感知: 调度器可以根据各个工作线程的负载情况(例如,队列长度)来选择目标队列。
  • 通知机制: 提交任务后,如果工作线程处于休眠状态,需要通过条件变量或信号量唤醒它。

4. 任务粒度与性能考量

高性能任务并行库的构建,除了算法和架构设计,还需要在实际应用中考虑以下性能因素:

  • 任务粒度 (Task Granularity):

    • 过小: 如果任务的计算量过小,调度和同步的开销可能超过实际计算的收益。例如,一个只执行几次加法操作的任务,其创建、提交、执行和结果获取的开销可能远大于其本身的计算。
    • 过大: 如果任务粒度过大,可能导致并行度不足,部分工作线程空闲,从而影响负载均衡。
    • 最佳实践: 寻找一个平衡点,使任务的计算量足够大,以摊销调度开销,同时又足够小,以提供足够的并行度和负载均衡能力。通常,经验法则是任务的执行时间应在微秒到毫秒级别。对于递归算法,可以通过“分治”策略,当子任务足够小(低于某个阈值)时,直接在当前线程同步执行,而不是提交为新的异步任务。
  • 缓存局部性 (Cache Locality): Work-stealing 算法通过优先处理本地任务来改善缓存局部性,但任务本身的设计也很重要。尽量让相关数据在处理过程中保持在CPU缓存中。

  • 线程数 (Number of Workers): 通常,工作线程的数量应设置为与CPU的物理核心数或逻辑核心数(包括超线程)相同。过多的线程会导致频繁的上下文切换开销,过少则无法充分利用硬件资源。

  • 伪共享 (False Sharing): 当两个或更多个独立的原子变量(或被并发访问的数据)位于同一个缓存行中时,即使它们本身没有共享,也会因为缓存一致性协议而导致性能下降。一个线程修改了其中一个变量,会导致整个缓存行失效,迫使其他核心重新加载。在 LockFreeDeque 中,head_tail_ 应该被填充(padding)到不同的缓存行,以避免伪共享。

  • 内存分配器 (Memory Allocator): 频繁的任务创建和销毁会导致大量的内存分配和释放。使用高性能的内存池(Memory Pool)或定制的内存分配器可以显著减少这些开销。

5. 代码示例:使用 Work-Stealing 库计算斐波那契数列

我们来演示如何使用上述 Scheduler 提交一个典型的递归任务:计算斐波那契数列。这是一个很好的例子,因为它具有天然的并行结构。

#include <iostream>
#include <numeric> // For std::iota
#include <vector>

// 完整包含之前的 LockFreeDeque, ITask, PackagedTask, WorkerThread, Scheduler 的定义

// 定义一个计算斐波那契数列的函数
long long fibonacci(int n) {
    if (n <= 1) {
        return n;
    }
    return fibonacci(n - 1) + fibonacci(n - 2);
}

// 演示如何使用 Scheduler
int main() {
    std::cout << "Starting Work-Stealing Fibonacci example..." << std::endl;

    // 创建调度器,使用默认的工作线程数量 (通常是硬件并发数)
    Scheduler scheduler;
    scheduler.start_all_workers(); // 启动所有工作线程

    int n_to_calc = 40; // 计算第40个斐波那契数,这是一个耗时任务

    // 提交斐波那契计算任务
    std::cout << "Submitting fibonacci(" << n_to_calc << ") task..." << std::endl;
    std::future<long long> fib_future = scheduler.submit([n_to_calc]() {
        return fibonacci(n_to_calc);
    });

    // 等待任务完成并获取结果
    long long result = fib_future.get();
    std::cout << "fibonacci(" << n_to_calc << ") = " << result << std::endl;

    // 提交多个任务
    std::vector<std::future<long long>> futures;
    for (int i = 0; i < 10; ++i) {
        int current_n = 30 + i;
        futures.push_back(scheduler.submit([current_n]() {
            return fibonacci(current_n);
        }));
    }

    std::cout << "Submitted multiple Fibonacci tasks. Waiting for results..." << std::endl;
    for (int i = 0; i < 10; ++i) {
        long long res = futures[i].get();
        std::cout << "fibonacci(" << (30 + i) << ") = " << res << std::endl;
    }

    scheduler.stop_all_workers(); // 停止所有工作线程
    std::cout << "Work-Stealing Fibonacci example finished." << std::endl;

    return 0;
}

注意事项:
上述 fibonacci 函数是一个简单的递归实现,它在 n 较小时直接计算。为了更好地展示并行性,我们通常会将递归深度达到某个阈值时,将子任务提交给调度器。例如:

// 改进的并行斐波那契函数
long long parallel_fibonacci(int n, Scheduler& scheduler) {
    if (n <= 20) { // 阈值,小于20直接计算,避免任务粒度过小
        return fibonacci(n); // 调用非并行版本
    }

    // 提交两个子任务
    std::future<long long> f1 = scheduler.submit([&scheduler, n]() {
        return parallel_fibonacci(n - 1, scheduler);
    });
    std::future<long long> f2 = scheduler.submit([&scheduler, n]() {
        return parallel_fibonacci(n - 2, scheduler);
    });

    return f1.get() + f2.get();
}

// 在 main 中调用:
// std::future<long long> fib_future = scheduler.submit([&scheduler, n_to_calc]() {
//     return parallel_fibonacci(n_to_calc, scheduler);
// });

通过这种方式,只有当任务足够大时才进行并行化,避免了过多的调度开销,同时保持了良好的负载均衡。

6. 实际应用与进阶话题

构建一个健壮、高性能的并行库是一个复杂而持续的过程。我们所构建的 Work-stealing 框架只是一个起点,还有许多进阶话题值得探讨:

  • 任务依赖管理 (Task Dependencies): 许多复杂算法涉及任务之间的依赖关系(例如,任务 B 必须在任务 A 完成后才能开始)。这需要构建一个有向无环图 (DAG) 调度器,并在任务就绪时将其提交给工作线程。
  • 任务优先级 (Task Priorities): 支持不同优先级的任务,确保关键任务能够尽快执行。这可能需要在 Deque 中引入优先级队列,或者在窃取策略中考虑优先级。
  • 取消机制 (Cancellation): 如何优雅地取消正在执行或等待的任务?这需要任务能够响应取消请求,并在适当的时机中止执行。
  • 异常处理与传播 (Exception Handling): 任务内部抛出的异常如何捕获、传递并最终由原始调用者处理?std::promisestd::future 提供了基础的异常传播机制。
  • 调试挑战: 并发程序的调试是臭名昭著的困难。竞态条件、死锁和内存序问题可能难以复现和诊断。需要利用专门的并发调试工具和技术。
  • 与现有库的比较:
    • Intel TBB (Threading Building Blocks): 提供了丰富的并行算法和数据结构,其任务调度器就是基于 Work-stealing 算法。
    • OpenMP: 编译器指令驱动的并行编程模型,易于使用,但灵活性相对较低。
    • C++20 Executors: C++20 引入了 Executors 概念,旨在提供一个标准化的方式来抽象任务的执行策略。未来,Work-stealing 库可能会作为 Executors 的一个实现。

7. 构建高性能并行库的思考

构建一个高性能的并行库,不仅仅是实现一个算法,更是一种设计哲学。它要求我们:

  • 平衡性能与可用性: 极致的性能往往伴随着复杂的实现,但一个好的库也需要易于使用和理解。
  • 拥抱并发原语: 深入理解 C++ 的内存模型和原子操作,是编写正确、高效并发代码的基础。
  • 持续优化与测试: 并行性能受多种因素影响,需要反复测试、分析瓶颈并进行迭代优化。
  • 理解底层硬件: 了解 CPU 缓存、内存层次结构等硬件特性,有助于做出更明智的设计决策。

Work-stealing 算法为我们提供了一个强大的工具,能够构建出动态、高效且易于使用的任务并行库。它代表了现代并行编程的一个重要方向,使得开发者能够更专注于业务逻辑本身,而将底层的并行化细节交给强大的运行时系统处理。希望本次讲座能为您在C++高性能并行计算领域提供新的视角和实践指导。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注