C++ 任务调度策略:工作窃取、优先级队列与负载均衡

各位观众老爷,各位程序猿、程序媛们,大家好!今天咱们来聊聊C++里的任务调度,这可是个挺有意思的话题。你想想,你写了个多线程程序,噼里啪啦扔出一堆任务,但这些任务到底谁先执行,谁后执行,最终能不能把CPU的算力榨干,这都得靠任务调度来安排。

我们今天主要讲三种策略:工作窃取(Work Stealing)、优先级队列(Priority Queue)以及负载均衡(Load Balancing)。这三种策略各有千秋,适用场景也不同。咱争取用大白话,加上代码示例,让大家伙儿听明白、能上手。

一、工作窃取(Work Stealing):懒人有懒福,忙人别累死

想象一下,你开了一家餐馆,雇了几个厨师。如果有的厨师手脚麻利,很快就把自己的菜做完了,而有的厨师慢吞吞的,订单都排到门外了,这肯定不行。工作窃取就是解决这个问题的。

核心思想:每个线程都有自己的任务队列(局部队列),当线程空闲时,它会尝试从其他线程的任务队列里“偷”一些任务过来执行。这样就能保证每个线程尽量都处于忙碌状态,提高CPU利用率。

优点:

  • 自动负载均衡: 忙的线程自动把任务分给闲的线程,不用人工干预。
  • 减少线程同步开销: 每个线程主要操作自己的局部队列,减少了锁竞争。

缺点:

  • 窃取任务有开销: 线程需要尝试去其他线程的队列里偷任务,这本身也需要消耗一些资源。
  • 可能存在不公平: 总是慢的线程可能一直被“偷”,而快的线程总是忙个不停。

代码示例(伪代码):

class Task {
public:
    virtual void execute() = 0; // 虚函数,定义任务执行的具体内容
    virtual ~Task() {}
};

class WorkerThread {
public:
    WorkerThread(int id) : id_(id) {}

    void run() {
        while (true) {
            Task* task = getTaskFromLocalQueue(); // 从自己的任务队列获取任务
            if (task) {
                task->execute();
                delete task;
            } else {
                // 如果自己的队列空了,就尝试去其他线程的队列里偷任务
                task = stealTaskFromOtherQueue();
                if (task) {
                    task->execute();
                    delete task;
                } else {
                    // 如果偷不到任务,就休息一下
                    std::this_thread::sleep_for(std::chrono::milliseconds(1));
                }
            }
        }
    }

private:
    Task* getTaskFromLocalQueue() {
        std::lock_guard<std::mutex> lock(local_queue_mutex_);
        if (!local_queue_.empty()) {
            Task* task = local_queue_.front();
            local_queue_.pop();
            return task;
        }
        return nullptr;
    }

    Task* stealTaskFromOtherQueue() {
        // 遍历其他线程的任务队列,尝试偷取任务
        for (int i = 0; i < num_threads; ++i) {
            if (i != id_) { // 不偷自己的
                std::lock_guard<std::mutex> lock(other_queues_mutexes_[i]);
                if (!other_queues_[i]->empty()) {
                    // 从队列尾部偷取任务 (LIFO)
                    Task* task = other_queues_[i]->back();
                    other_queues_[i]->pop_back();
                    return task;
                }
            }
        }
        return nullptr;
    }

public:
    void pushTask(Task* task) {
        std::lock_guard<std::mutex> lock(local_queue_mutex_);
        local_queue_.push(task);
    }

private:
    int id_; // 线程ID
    std::queue<Task*> local_queue_; // 局部任务队列
    std::mutex local_queue_mutex_;

    // 假设我们有多个 WorkerThread 实例, 用来访问其他线程的队列
    std::vector<std::queue<Task*>*> other_queues_;
    std::vector<std::mutex> other_queues_mutexes_;
    int num_threads;
};

// 示例Task
class MyTask : public Task {
public:
    MyTask(int id) : task_id(id) {}
    void execute() override {
        std::cout << "Task " << task_id << " executed by thread " << std::this_thread::get_id() << std::endl;
        // 模拟耗时操作
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }

private:
    int task_id;
};

int main() {
    const int num_threads = std::thread::hardware_concurrency(); // 获取硬件支持的线程数
    std::vector<WorkerThread> workers;
    std::vector<std::thread> threads;

    // 初始化 WorkerThread 和 队列
    std::vector<std::queue<Task*>*> queues(num_threads);
    std::vector<std::mutex> queue_mutexes(num_threads);

    for(int i = 0; i < num_threads; ++i) {
        queues[i] = new std::queue<Task*>();
        workers.emplace_back(i);
        workers[i].other_queues_ = queues;
        workers[i].other_queues_mutexes_ = queue_mutexes;
        workers[i].num_threads = num_threads;
    }

    // 创建线程
    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back([&workers, i]() { workers[i].run(); });
    }

    // 创建一些任务并分配给不同的线程
    for (int i = 0; i < 20; ++i) {
        workers[i % num_threads].pushTask(new MyTask(i));
    }

    // 等待一段时间,让任务执行完毕
    std::this_thread::sleep_for(std::chrono::seconds(5));

    // 停止线程 (这里只是一个简单示例,实际应用需要更优雅的停止机制)
    exit(0);

    // 清理资源
    for(int i = 0; i < num_threads; ++i) {
        delete queues[i];
    }

    return 0;
}

代码解释:

  • Task 是一个抽象基类,定义了任务的接口,execute() 方法是任务执行的具体内容。
  • WorkerThread 类代表一个工作线程,它维护一个 local_queue_ 局部任务队列。
  • run() 方法是线程的主循环,它会不断地从局部队列获取任务执行,如果局部队列为空,就尝试从其他线程的队列里偷取任务。
  • getTaskFromLocalQueue() 方法从局部队列获取任务,并使用 std::mutex 保证线程安全。
  • stealTaskFromOtherQueue() 方法遍历其他线程的队列,尝试偷取任务。注意,这里使用了 std::lock_guard 来保证线程安全。
  • 主函数创建了一些 WorkerThread 实例,并将它们运行在独立的线程中。然后,主函数创建了一些 MyTask 任务,并将它们分配给不同的线程。

注意事项:

  • 任务队列的锁竞争: 多个线程可能会同时尝试从同一个队列里偷任务,因此需要使用锁来保证线程安全。但是,锁竞争会降低性能,所以需要尽量减少锁的粒度。
  • 窃取策略: 线程可以从其他线程的队列头部或尾部偷取任务。从头部偷取任务(FIFO)可能会导致饥饿,而从尾部偷取任务(LIFO)可以提高缓存命中率。
  • 停止机制: 在实际应用中,需要一种优雅的停止机制来停止工作线程。上面的代码直接使用了 exit(0),这在生产环境中是不可接受的。

适用场景:

  • 任务数量多且大小不均匀。
  • 对延迟不敏感,允许一定的任务窃取开销。

二、优先级队列(Priority Queue):皇上说了算,谁重要谁先来

有些时候,任务的重要性是不一样的。比如,用户界面的渲染任务肯定比后台的数据备份任务更重要,因为用户能直接感受到界面的卡顿。这时候,就需要使用优先级队列来调度任务。

核心思想: 每个任务都有一个优先级,优先级高的任务先执行。

优点:

  • 保证重要任务的及时执行: 优先级高的任务可以优先获得CPU资源。
  • 可以根据任务类型调整优先级: 可以根据实际需求动态调整任务的优先级。

缺点:

  • 可能导致低优先级任务饥饿: 如果一直有高优先级任务,低优先级任务可能永远无法执行。
  • 优先级设置需要仔细考虑: 优先级设置不合理可能会导致系统性能下降。

代码示例:

#include <iostream>
#include <queue>
#include <thread>
#include <chrono>
#include <mutex>
#include <condition_variable>

class Task {
public:
    virtual void execute() = 0;
    virtual int getPriority() const = 0;
    virtual ~Task() {}
};

class MyTask : public Task {
public:
    MyTask(int id, int priority) : task_id(id), priority_(priority) {}
    void execute() override {
        std::cout << "Task " << task_id << " executed by thread " << std::this_thread::get_id()
                  << " with priority " << priority_ << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Simulate work
    }

    int getPriority() const override {
        return priority_;
    }

private:
    int task_id;
    int priority_;
};

// 自定义比较函数,用于优先级队列
struct TaskComparator {
    bool operator()(Task* a, Task* b) {
        return a->getPriority() > b->getPriority(); // 优先级高的排在前面
    }
};

class PriorityScheduler {
public:
    PriorityScheduler(int num_threads) : num_threads_(num_threads), running_(true) {
        threads_.resize(num_threads_);
        for (int i = 0; i < num_threads_; ++i) {
            threads_[i] = std::thread(&PriorityScheduler::workerThread, this);
        }
    }

    ~PriorityScheduler() {
        stop();
        for (auto& thread : threads_) {
            thread.join();
        }
    }

    void submitTask(Task* task) {
        {
            std::lock_guard<std::mutex> lock(queue_mutex_);
            task_queue_.push(task);
        }
        condition_.notify_one(); // 通知一个等待的线程
    }

    void stop() {
        running_ = false;
        condition_.notify_all(); // 通知所有线程停止
    }

private:
    void workerThread() {
        while (running_) {
            Task* task = nullptr;
            {
                std::unique_lock<std::mutex> lock(queue_mutex_);
                condition_.wait(lock, [this]() { return !task_queue_.empty() || !running_; });

                if (!running_ && task_queue_.empty()) {
                    return;
                }

                task = task_queue_.top();
                task_queue_.pop();
            }

            if (task) {
                task->execute();
                delete task;
            }
        }
    }

private:
    std::priority_queue<Task*, std::vector<Task*>, TaskComparator> task_queue_;
    std::mutex queue_mutex_;
    std::condition_variable condition_;
    std::vector<std::thread> threads_;
    int num_threads_;
    bool running_;
};

int main() {
    PriorityScheduler scheduler(4); // 4个线程

    // 提交一些任务,优先级不同
    scheduler.submitTask(new MyTask(1, 3)); // 高优先级
    scheduler.submitTask(new MyTask(2, 1)); // 低优先级
    scheduler.submitTask(new MyTask(3, 2)); // 中优先级
    scheduler.submitTask(new MyTask(4, 3)); // 高优先级
    scheduler.submitTask(new MyTask(5, 1)); // 低优先级
    scheduler.submitTask(new MyTask(6, 2)); // 中优先级

    std::this_thread::sleep_for(std::chrono::seconds(3)); // 让任务执行一段时间

    return 0;
}

代码解释:

  • TaskComparator 是一个自定义的比较函数,用于优先级队列。它比较两个 Task 对象的优先级,优先级高的排在前面。
  • std::priority_queue 是C++标准库提供的优先级队列,它会根据比较函数自动排序。
  • PriorityScheduler 类负责管理任务队列和工作线程。
  • submitTask() 方法将任务添加到优先级队列中。
  • workerThread() 方法是线程的主循环,它会不断地从优先级队列获取任务执行。这里使用了 std::unique_lockstd::condition_variable 来实现线程同步。

注意事项:

  • 优先级反转: 当一个高优先级任务需要等待一个低优先级任务释放资源时,可能会发生优先级反转。为了解决这个问题,可以使用优先级继承或优先级天花板等技术。
  • 动态优先级调整: 可以根据任务的执行情况动态调整优先级。比如,如果一个任务长时间没有执行,可以提高它的优先级,防止饥饿。
  • 实时性要求: 如果对任务的实时性要求很高,需要使用实时操作系统提供的调度机制。

适用场景:

  • 任务的重要性不同。
  • 需要保证重要任务的及时执行。

三、负载均衡(Load Balancing):雨露均沾,资源不浪费

在分布式系统中,通常会有多个计算节点。为了充分利用每个节点的算力,需要将任务合理地分配到不同的节点上,这就是负载均衡。

核心思想: 将任务均匀地分配到不同的计算节点上,避免某些节点过载,而其他节点空闲。

常见的负载均衡算法:

  • 轮询(Round Robin): 将任务依次分配到不同的节点上。
  • 加权轮询(Weighted Round Robin): 根据节点的性能分配不同权重的任务。
  • 最少连接(Least Connections): 将任务分配到当前连接数最少的节点上。
  • 哈希(Hash): 根据任务的某些属性(比如用户ID)计算哈希值,并将任务分配到对应的节点上。
  • 随机(Random): 随机选择一个节点分配任务。

代码示例(简单轮询):

#include <iostream>
#include <vector>
#include <thread>
#include <chrono>

class Task {
public:
    virtual void execute() = 0;
    virtual ~Task() {}
};

class MyTask : public Task {
public:
    MyTask(int id) : task_id(id) {}
    void execute() override {
        std::cout << "Task " << task_id << " executed by node " << node_id << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Simulate work
    }

    int node_id;
private:
    int task_id;
};

class LoadBalancer {
public:
    LoadBalancer(int num_nodes) : num_nodes_(num_nodes), current_node_(0) {
        nodes_.resize(num_nodes_);
        for (int i = 0; i < num_nodes_; ++i) {
            nodes_[i] = i; // 模拟节点ID
        }
    }

    int getNode() {
        int node = nodes_[current_node_];
        current_node_ = (current_node_ + 1) % num_nodes_; // 轮询
        return node;
    }

private:
    std::vector<int> nodes_;
    int num_nodes_;
    int current_node_;
};

void executeTaskOnNode(int node_id, Task* task) {
  MyTask* myTask = dynamic_cast<MyTask*>(task);
  if(myTask){
    myTask->node_id = node_id;
    myTask->execute();
    delete task;
  }
}

int main() {
    const int num_nodes = 3;
    LoadBalancer loadBalancer(num_nodes);

    std::vector<std::thread> node_threads(num_nodes);
    std::vector<std::vector<Task*>> tasks_per_node(num_nodes);

    // 创建一些任务,并根据负载均衡算法分配到不同的节点上
    for (int i = 0; i < 10; ++i) {
        int node = loadBalancer.getNode();
        tasks_per_node[node].push_back(new MyTask(i));
    }

    // 启动线程模拟节点处理任务
    for(int i = 0; i < num_nodes; ++i){
      node_threads[i] = std::thread([i, &tasks_per_node](){
        for(Task* task : tasks_per_node[i]){
          executeTaskOnNode(i, task);
        }
      });
    }

    // 等待线程执行完毕
    for(auto& thread : node_threads){
      thread.join();
    }

    return 0;
}

代码解释:

  • LoadBalancer 类实现了简单的轮询负载均衡算法。
  • getNode() 方法返回下一个节点的ID,并使用 current_node_ 变量来维护当前节点的索引。
  • 主函数创建了一些 MyTask 任务,并根据负载均衡算法将它们分配到不同的节点上。

注意事项:

  • 节点健康检查: 需要定期检查节点的健康状态,如果节点出现故障,需要将其从负载均衡列表中移除。
  • 会话保持: 对于需要保持会话状态的应用,需要使用会话保持机制,保证同一个用户的请求被分配到同一个节点上。
  • 动态伸缩: 可以根据节点的负载情况动态增加或减少节点数量。

适用场景:

  • 分布式系统。
  • 需要充分利用每个节点的算力。

总结:

调度策略 核心思想 优点 缺点 适用场景
工作窃取 闲的线程偷忙的线程的任务 自动负载均衡,减少线程同步开销 窃取任务有开销,可能存在不公平 任务数量多且大小不均匀,对延迟不敏感
优先级队列 优先级高的任务先执行 保证重要任务的及时执行,可以根据任务类型调整优先级 可能导致低优先级任务饥饿,优先级设置需要仔细考虑 任务的重要性不同,需要保证重要任务的及时执行
负载均衡 将任务均匀地分配到不同的计算节点上 充分利用每个节点的算力,避免某些节点过载 需要考虑节点健康检查、会话保持、动态伸缩等问题 分布式系统,需要充分利用每个节点的算力

好了,今天就先聊到这儿。希望大家通过今天的讲解,能够对C++的任务调度策略有一个更清晰的认识。记住,没有最好的策略,只有最适合你的策略。在实际应用中,需要根据具体的需求选择合适的调度策略,才能让你的程序跑得更快、更稳! 感谢各位的观看!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注