各位观众老爷,各位程序猿、程序媛们,大家好!今天咱们来聊聊C++里的任务调度,这可是个挺有意思的话题。你想想,你写了个多线程程序,噼里啪啦扔出一堆任务,但这些任务到底谁先执行,谁后执行,最终能不能把CPU的算力榨干,这都得靠任务调度来安排。
我们今天主要讲三种策略:工作窃取(Work Stealing)、优先级队列(Priority Queue)以及负载均衡(Load Balancing)。这三种策略各有千秋,适用场景也不同。咱争取用大白话,加上代码示例,让大家伙儿听明白、能上手。
一、工作窃取(Work Stealing):懒人有懒福,忙人别累死
想象一下,你开了一家餐馆,雇了几个厨师。如果有的厨师手脚麻利,很快就把自己的菜做完了,而有的厨师慢吞吞的,订单都排到门外了,这肯定不行。工作窃取就是解决这个问题的。
核心思想:每个线程都有自己的任务队列(局部队列),当线程空闲时,它会尝试从其他线程的任务队列里“偷”一些任务过来执行。这样就能保证每个线程尽量都处于忙碌状态,提高CPU利用率。
优点:
- 自动负载均衡: 忙的线程自动把任务分给闲的线程,不用人工干预。
- 减少线程同步开销: 每个线程主要操作自己的局部队列,减少了锁竞争。
缺点:
- 窃取任务有开销: 线程需要尝试去其他线程的队列里偷任务,这本身也需要消耗一些资源。
- 可能存在不公平: 总是慢的线程可能一直被“偷”,而快的线程总是忙个不停。
代码示例(伪代码):
class Task {
public:
virtual void execute() = 0; // 虚函数,定义任务执行的具体内容
virtual ~Task() {}
};
class WorkerThread {
public:
WorkerThread(int id) : id_(id) {}
void run() {
while (true) {
Task* task = getTaskFromLocalQueue(); // 从自己的任务队列获取任务
if (task) {
task->execute();
delete task;
} else {
// 如果自己的队列空了,就尝试去其他线程的队列里偷任务
task = stealTaskFromOtherQueue();
if (task) {
task->execute();
delete task;
} else {
// 如果偷不到任务,就休息一下
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
}
}
private:
Task* getTaskFromLocalQueue() {
std::lock_guard<std::mutex> lock(local_queue_mutex_);
if (!local_queue_.empty()) {
Task* task = local_queue_.front();
local_queue_.pop();
return task;
}
return nullptr;
}
Task* stealTaskFromOtherQueue() {
// 遍历其他线程的任务队列,尝试偷取任务
for (int i = 0; i < num_threads; ++i) {
if (i != id_) { // 不偷自己的
std::lock_guard<std::mutex> lock(other_queues_mutexes_[i]);
if (!other_queues_[i]->empty()) {
// 从队列尾部偷取任务 (LIFO)
Task* task = other_queues_[i]->back();
other_queues_[i]->pop_back();
return task;
}
}
}
return nullptr;
}
public:
void pushTask(Task* task) {
std::lock_guard<std::mutex> lock(local_queue_mutex_);
local_queue_.push(task);
}
private:
int id_; // 线程ID
std::queue<Task*> local_queue_; // 局部任务队列
std::mutex local_queue_mutex_;
// 假设我们有多个 WorkerThread 实例, 用来访问其他线程的队列
std::vector<std::queue<Task*>*> other_queues_;
std::vector<std::mutex> other_queues_mutexes_;
int num_threads;
};
// 示例Task
class MyTask : public Task {
public:
MyTask(int id) : task_id(id) {}
void execute() override {
std::cout << "Task " << task_id << " executed by thread " << std::this_thread::get_id() << std::endl;
// 模拟耗时操作
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
private:
int task_id;
};
int main() {
const int num_threads = std::thread::hardware_concurrency(); // 获取硬件支持的线程数
std::vector<WorkerThread> workers;
std::vector<std::thread> threads;
// 初始化 WorkerThread 和 队列
std::vector<std::queue<Task*>*> queues(num_threads);
std::vector<std::mutex> queue_mutexes(num_threads);
for(int i = 0; i < num_threads; ++i) {
queues[i] = new std::queue<Task*>();
workers.emplace_back(i);
workers[i].other_queues_ = queues;
workers[i].other_queues_mutexes_ = queue_mutexes;
workers[i].num_threads = num_threads;
}
// 创建线程
for (int i = 0; i < num_threads; ++i) {
threads.emplace_back([&workers, i]() { workers[i].run(); });
}
// 创建一些任务并分配给不同的线程
for (int i = 0; i < 20; ++i) {
workers[i % num_threads].pushTask(new MyTask(i));
}
// 等待一段时间,让任务执行完毕
std::this_thread::sleep_for(std::chrono::seconds(5));
// 停止线程 (这里只是一个简单示例,实际应用需要更优雅的停止机制)
exit(0);
// 清理资源
for(int i = 0; i < num_threads; ++i) {
delete queues[i];
}
return 0;
}
代码解释:
Task
是一个抽象基类,定义了任务的接口,execute()
方法是任务执行的具体内容。WorkerThread
类代表一个工作线程,它维护一个local_queue_
局部任务队列。run()
方法是线程的主循环,它会不断地从局部队列获取任务执行,如果局部队列为空,就尝试从其他线程的队列里偷取任务。getTaskFromLocalQueue()
方法从局部队列获取任务,并使用std::mutex
保证线程安全。stealTaskFromOtherQueue()
方法遍历其他线程的队列,尝试偷取任务。注意,这里使用了std::lock_guard
来保证线程安全。- 主函数创建了一些
WorkerThread
实例,并将它们运行在独立的线程中。然后,主函数创建了一些MyTask
任务,并将它们分配给不同的线程。
注意事项:
- 任务队列的锁竞争: 多个线程可能会同时尝试从同一个队列里偷任务,因此需要使用锁来保证线程安全。但是,锁竞争会降低性能,所以需要尽量减少锁的粒度。
- 窃取策略: 线程可以从其他线程的队列头部或尾部偷取任务。从头部偷取任务(FIFO)可能会导致饥饿,而从尾部偷取任务(LIFO)可以提高缓存命中率。
- 停止机制: 在实际应用中,需要一种优雅的停止机制来停止工作线程。上面的代码直接使用了
exit(0)
,这在生产环境中是不可接受的。
适用场景:
- 任务数量多且大小不均匀。
- 对延迟不敏感,允许一定的任务窃取开销。
二、优先级队列(Priority Queue):皇上说了算,谁重要谁先来
有些时候,任务的重要性是不一样的。比如,用户界面的渲染任务肯定比后台的数据备份任务更重要,因为用户能直接感受到界面的卡顿。这时候,就需要使用优先级队列来调度任务。
核心思想: 每个任务都有一个优先级,优先级高的任务先执行。
优点:
- 保证重要任务的及时执行: 优先级高的任务可以优先获得CPU资源。
- 可以根据任务类型调整优先级: 可以根据实际需求动态调整任务的优先级。
缺点:
- 可能导致低优先级任务饥饿: 如果一直有高优先级任务,低优先级任务可能永远无法执行。
- 优先级设置需要仔细考虑: 优先级设置不合理可能会导致系统性能下降。
代码示例:
#include <iostream>
#include <queue>
#include <thread>
#include <chrono>
#include <mutex>
#include <condition_variable>
class Task {
public:
virtual void execute() = 0;
virtual int getPriority() const = 0;
virtual ~Task() {}
};
class MyTask : public Task {
public:
MyTask(int id, int priority) : task_id(id), priority_(priority) {}
void execute() override {
std::cout << "Task " << task_id << " executed by thread " << std::this_thread::get_id()
<< " with priority " << priority_ << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Simulate work
}
int getPriority() const override {
return priority_;
}
private:
int task_id;
int priority_;
};
// 自定义比较函数,用于优先级队列
struct TaskComparator {
bool operator()(Task* a, Task* b) {
return a->getPriority() > b->getPriority(); // 优先级高的排在前面
}
};
class PriorityScheduler {
public:
PriorityScheduler(int num_threads) : num_threads_(num_threads), running_(true) {
threads_.resize(num_threads_);
for (int i = 0; i < num_threads_; ++i) {
threads_[i] = std::thread(&PriorityScheduler::workerThread, this);
}
}
~PriorityScheduler() {
stop();
for (auto& thread : threads_) {
thread.join();
}
}
void submitTask(Task* task) {
{
std::lock_guard<std::mutex> lock(queue_mutex_);
task_queue_.push(task);
}
condition_.notify_one(); // 通知一个等待的线程
}
void stop() {
running_ = false;
condition_.notify_all(); // 通知所有线程停止
}
private:
void workerThread() {
while (running_) {
Task* task = nullptr;
{
std::unique_lock<std::mutex> lock(queue_mutex_);
condition_.wait(lock, [this]() { return !task_queue_.empty() || !running_; });
if (!running_ && task_queue_.empty()) {
return;
}
task = task_queue_.top();
task_queue_.pop();
}
if (task) {
task->execute();
delete task;
}
}
}
private:
std::priority_queue<Task*, std::vector<Task*>, TaskComparator> task_queue_;
std::mutex queue_mutex_;
std::condition_variable condition_;
std::vector<std::thread> threads_;
int num_threads_;
bool running_;
};
int main() {
PriorityScheduler scheduler(4); // 4个线程
// 提交一些任务,优先级不同
scheduler.submitTask(new MyTask(1, 3)); // 高优先级
scheduler.submitTask(new MyTask(2, 1)); // 低优先级
scheduler.submitTask(new MyTask(3, 2)); // 中优先级
scheduler.submitTask(new MyTask(4, 3)); // 高优先级
scheduler.submitTask(new MyTask(5, 1)); // 低优先级
scheduler.submitTask(new MyTask(6, 2)); // 中优先级
std::this_thread::sleep_for(std::chrono::seconds(3)); // 让任务执行一段时间
return 0;
}
代码解释:
TaskComparator
是一个自定义的比较函数,用于优先级队列。它比较两个Task
对象的优先级,优先级高的排在前面。std::priority_queue
是C++标准库提供的优先级队列,它会根据比较函数自动排序。PriorityScheduler
类负责管理任务队列和工作线程。submitTask()
方法将任务添加到优先级队列中。workerThread()
方法是线程的主循环,它会不断地从优先级队列获取任务执行。这里使用了std::unique_lock
和std::condition_variable
来实现线程同步。
注意事项:
- 优先级反转: 当一个高优先级任务需要等待一个低优先级任务释放资源时,可能会发生优先级反转。为了解决这个问题,可以使用优先级继承或优先级天花板等技术。
- 动态优先级调整: 可以根据任务的执行情况动态调整优先级。比如,如果一个任务长时间没有执行,可以提高它的优先级,防止饥饿。
- 实时性要求: 如果对任务的实时性要求很高,需要使用实时操作系统提供的调度机制。
适用场景:
- 任务的重要性不同。
- 需要保证重要任务的及时执行。
三、负载均衡(Load Balancing):雨露均沾,资源不浪费
在分布式系统中,通常会有多个计算节点。为了充分利用每个节点的算力,需要将任务合理地分配到不同的节点上,这就是负载均衡。
核心思想: 将任务均匀地分配到不同的计算节点上,避免某些节点过载,而其他节点空闲。
常见的负载均衡算法:
- 轮询(Round Robin): 将任务依次分配到不同的节点上。
- 加权轮询(Weighted Round Robin): 根据节点的性能分配不同权重的任务。
- 最少连接(Least Connections): 将任务分配到当前连接数最少的节点上。
- 哈希(Hash): 根据任务的某些属性(比如用户ID)计算哈希值,并将任务分配到对应的节点上。
- 随机(Random): 随机选择一个节点分配任务。
代码示例(简单轮询):
#include <iostream>
#include <vector>
#include <thread>
#include <chrono>
class Task {
public:
virtual void execute() = 0;
virtual ~Task() {}
};
class MyTask : public Task {
public:
MyTask(int id) : task_id(id) {}
void execute() override {
std::cout << "Task " << task_id << " executed by node " << node_id << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Simulate work
}
int node_id;
private:
int task_id;
};
class LoadBalancer {
public:
LoadBalancer(int num_nodes) : num_nodes_(num_nodes), current_node_(0) {
nodes_.resize(num_nodes_);
for (int i = 0; i < num_nodes_; ++i) {
nodes_[i] = i; // 模拟节点ID
}
}
int getNode() {
int node = nodes_[current_node_];
current_node_ = (current_node_ + 1) % num_nodes_; // 轮询
return node;
}
private:
std::vector<int> nodes_;
int num_nodes_;
int current_node_;
};
void executeTaskOnNode(int node_id, Task* task) {
MyTask* myTask = dynamic_cast<MyTask*>(task);
if(myTask){
myTask->node_id = node_id;
myTask->execute();
delete task;
}
}
int main() {
const int num_nodes = 3;
LoadBalancer loadBalancer(num_nodes);
std::vector<std::thread> node_threads(num_nodes);
std::vector<std::vector<Task*>> tasks_per_node(num_nodes);
// 创建一些任务,并根据负载均衡算法分配到不同的节点上
for (int i = 0; i < 10; ++i) {
int node = loadBalancer.getNode();
tasks_per_node[node].push_back(new MyTask(i));
}
// 启动线程模拟节点处理任务
for(int i = 0; i < num_nodes; ++i){
node_threads[i] = std::thread([i, &tasks_per_node](){
for(Task* task : tasks_per_node[i]){
executeTaskOnNode(i, task);
}
});
}
// 等待线程执行完毕
for(auto& thread : node_threads){
thread.join();
}
return 0;
}
代码解释:
LoadBalancer
类实现了简单的轮询负载均衡算法。getNode()
方法返回下一个节点的ID,并使用current_node_
变量来维护当前节点的索引。- 主函数创建了一些
MyTask
任务,并根据负载均衡算法将它们分配到不同的节点上。
注意事项:
- 节点健康检查: 需要定期检查节点的健康状态,如果节点出现故障,需要将其从负载均衡列表中移除。
- 会话保持: 对于需要保持会话状态的应用,需要使用会话保持机制,保证同一个用户的请求被分配到同一个节点上。
- 动态伸缩: 可以根据节点的负载情况动态增加或减少节点数量。
适用场景:
- 分布式系统。
- 需要充分利用每个节点的算力。
总结:
调度策略 | 核心思想 | 优点 | 缺点 | 适用场景 |
---|---|---|---|---|
工作窃取 | 闲的线程偷忙的线程的任务 | 自动负载均衡,减少线程同步开销 | 窃取任务有开销,可能存在不公平 | 任务数量多且大小不均匀,对延迟不敏感 |
优先级队列 | 优先级高的任务先执行 | 保证重要任务的及时执行,可以根据任务类型调整优先级 | 可能导致低优先级任务饥饿,优先级设置需要仔细考虑 | 任务的重要性不同,需要保证重要任务的及时执行 |
负载均衡 | 将任务均匀地分配到不同的计算节点上 | 充分利用每个节点的算力,避免某些节点过载 | 需要考虑节点健康检查、会话保持、动态伸缩等问题 | 分布式系统,需要充分利用每个节点的算力 |
好了,今天就先聊到这儿。希望大家通过今天的讲解,能够对C++的任务调度策略有一个更清晰的认识。记住,没有最好的策略,只有最适合你的策略。在实际应用中,需要根据具体的需求选择合适的调度策略,才能让你的程序跑得更快、更稳! 感谢各位的观看!