好的,咱们今天来聊聊C++里一个经典的并发问题,也是死锁的典型案例:哲学家就餐问题。这问题听起来挺高大上,其实本质上就是一群哲学家抢筷子的故事,咱们把它扒开来,看看里面到底藏着什么猫腻,以及如何避免这些猫腻。
一、哲学家就餐问题:一个哲学家的吃饭难题
想象一下,有五位哲学家围着一张圆桌坐着,桌子上放着五个盘子,每个盘子之间都有一根筷子。哲学家们肚子饿了,想吃饭,但是他们必须同时拿到左右两边的筷子才能吃东西。问题来了,如果每个哲学家都先拿起左边的筷子,然后等待右边的筷子,结果会怎么样?
Bingo!所有哲学家都拿着左边的筷子,等待右边的筷子,谁也吃不成饭,进入了无尽的等待,这就是死锁。
这个问题的关键在于资源的竞争和占有。哲学家需要两根筷子才能吃饭,而筷子是有限的资源。当多个哲学家同时请求资源,并且每个哲学家都占有部分资源,等待其他哲学家释放自己需要的资源时,就可能发生死锁。
二、C++代码模拟哲学家就餐问题
咱们用C++代码来模拟一下这个场景,让大家更直观地感受死锁的发生。
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
#include <chrono>
#include <random>
using namespace std;
// 哲学家数量
const int NUM_PHILOSOPHERS = 5;
// 筷子(互斥锁)
vector<mutex> chopsticks(NUM_PHILOSOPHERS);
// 随机数生成器,用于模拟思考时间
random_device rd;
mt19937 gen(rd());
uniform_int_distribution<> distrib(1, 5); // 思考时间 1-5 秒
// 哲学家类
class Philosopher {
public:
Philosopher(int id) : id_(id) {}
void dine() {
while (true) {
// 思考
think();
// 尝试拿起筷子
takeChopsticks();
// 吃饭
eat();
// 放下筷子
putChopsticks();
}
}
private:
void think() {
cout << "Philosopher " << id_ << " is thinking." << endl;
this_thread::sleep_for(chrono::seconds(distrib(gen))); // 模拟思考
}
void takeChopsticks() {
int left_chopstick = id_;
int right_chopstick = (id_ + 1) % NUM_PHILOSOPHERS;
cout << "Philosopher " << id_ << " is trying to take chopstick " << left_chopstick << "." << endl;
chopsticks[left_chopstick].lock(); // 拿起左边的筷子
cout << "Philosopher " << id_ << " took chopstick " << left_chopstick << "." << endl;
// 短暂的延迟,增加死锁发生的概率
this_thread::sleep_for(chrono::milliseconds(50));
cout << "Philosopher " << id_ << " is trying to take chopstick " << right_chopstick << "." << endl;
chopsticks[right_chopstick].lock(); // 拿起右边的筷子
cout << "Philosopher " << id_ << " took chopstick " << right_chopstick << "." << endl;
}
void eat() {
cout << "Philosopher " << id_ << " is eating." << endl;
this_thread::sleep_for(chrono::seconds(distrib(gen))); // 模拟吃饭
}
void putChopsticks() {
int left_chopstick = id_;
int right_chopstick = (id_ + 1) % NUM_PHILOSOPHERS;
chopsticks[left_chopstick].unlock(); // 放下左边的筷子
cout << "Philosopher " << id_ << " put down chopstick " << left_chopstick << "." << endl;
chopsticks[right_chopstick].unlock(); // 放下右边的筷子
cout << "Philosopher " << id_ << " put down chopstick " << right_chopstick << "." << endl;
}
private:
int id_;
};
int main() {
vector<thread> philosophers;
vector<Philosopher> philosopher_objects;
for (int i = 0; i < NUM_PHILOSOPHERS; ++i) {
philosopher_objects.emplace_back(i);
philosophers.emplace_back(&Philosopher::dine, &philosopher_objects[i]);
}
for (auto& philosopher : philosophers) {
philosopher.join();
}
return 0;
}
这段代码模拟了哲学家就餐的过程。每个哲学家都是一个线程,他们会不断地思考、尝试拿起筷子、吃饭、放下筷子。 仔细观察 takeChopsticks()
函数,会发现哲学家先拿起左边的筷子,然后再拿起右边的筷子。这就是死锁发生的根源。
运行这段代码,你会发现程序很快就卡住了,哲学家们都在等待对方释放筷子,谁也吃不成饭。
三、死锁产生的原因:四大金刚
要避免死锁,首先要了解死锁产生的原因。死锁的产生需要同时满足以下四个条件,我们可以称它们为“死锁四大金刚”:
条件 | 描述 |
---|---|
互斥条件 | 资源必须处于独占模式,即一次只能被一个线程占用。如果资源可以被多个线程共享,就不会出现死锁。 |
占有且等待条件 | 线程必须至少占有一个资源,并等待获取其他线程占用的资源。也就是说,线程不能释放已经占有的资源,必须等待获取所有需要的资源后才能释放。 |
不可剥夺条件 | 资源只能由占有它的线程主动释放,不能被其他线程强行剥夺。如果资源可以被剥夺,那么当一个线程占有资源但无法继续执行时,其他线程可以抢占该资源,从而避免死锁。 |
环路等待条件 | 必须存在一个线程-资源的环路链,链中的每个线程都在等待下一个线程占用的资源。例如,线程A等待线程B占用的资源,线程B等待线程C占用的资源,线程C等待线程A占用的资源,形成一个环路。 |
只有当这四个条件同时满足时,才会发生死锁。因此,要避免死锁,只需要破坏其中一个条件即可。
四、避免死锁的策略:十八般武艺
针对死锁的四大金刚,我们可以采用以下策略来避免死锁:
-
破坏互斥条件:
- 资源共享: 尽量使用可共享的资源,避免使用独占资源。例如,使用读写锁代替互斥锁,允许多个线程同时读取资源。
- 虚拟化: 将独占资源虚拟化为多个可共享的资源。例如,使用线程池来管理线程,将线程资源虚拟化为多个可用的线程。
但是,在哲学家就餐问题中,筷子本身就是互斥资源,必须被独占使用,所以这个策略不适用。
-
破坏占有且等待条件:
- 一次性申请所有资源: 线程在开始执行前,一次性申请所有需要的资源。如果资源不足,则线程阻塞等待,直到所有资源都可用。
- 资源预分配: 在程序启动时,预先分配所有需要的资源给线程。
在哲学家就餐问题中,我们可以让每个哲学家在开始吃饭前,同时申请两根筷子。如果两根筷子都可用,则哲学家可以吃饭;否则,哲学家释放已经占有的筷子,并等待一段时间后重新尝试。
// 修改后的 takeChopsticks 函数 void takeChopsticks() { int left_chopstick = id_; int right_chopstick = (id_ + 1) % NUM_PHILOSOPHERS; while (true) { // 使用 try_lock() 尝试同时获取两把筷子 if (chopsticks[left_chopstick].try_lock()) { cout << "Philosopher " << id_ << " took chopstick " << left_chopstick << "." << endl; if (chopsticks[right_chopstick].try_lock()) { cout << "Philosopher " << id_ << " took chopstick " << right_chopstick << "." << endl; // 成功获取两把筷子,退出循环 return; } else { // 未成功获取右边的筷子,释放左边的筷子 chopsticks[left_chopstick].unlock(); cout << "Philosopher " << id_ << " released chopstick " << left_chopstick << "." << endl; } } // 等待一段时间后重新尝试 this_thread::sleep_for(chrono::milliseconds(100)); } } // 修改后的 putChopsticks 函数 void putChopsticks() { int left_chopstick = id_; int right_chopstick = (id_ + 1) % NUM_PHILOSOPHERS; chopsticks[left_chopstick].unlock(); // 放下左边的筷子 cout << "Philosopher " << id_ << " put down chopstick " << left_chopstick << "." << endl; chopsticks[right_chopstick].unlock(); // 放下右边的筷子 cout << "Philosopher " << id_ << " put down chopstick " << right_chopstick << "." << endl; }
在这个修改后的代码中,我们使用了
try_lock()
函数来尝试同时获取两把筷子。如果两把筷子都可用,则try_lock()
函数返回true
,哲学家可以吃饭;否则,try_lock()
函数返回false
,哲学家释放已经占有的筷子,并等待一段时间后重新尝试。 -
破坏不可剥夺条件:
- 资源抢占: 允许线程抢占其他线程占用的资源。当一个线程占有资源但无法继续执行时,其他线程可以抢占该资源,从而避免死锁。
在哲学家就餐问题中,这个策略不太适用,因为筷子是物理资源,无法被强行剥夺。
-
破坏环路等待条件:
- 资源排序: 对所有资源进行排序,并要求线程按照固定的顺序申请资源。这样可以避免形成环路等待。
在哲学家就餐问题中,我们可以给筷子编号,并要求哲学家总是先拿起编号较小的筷子,然后再拿起编号较大的筷子。
// 修改后的 takeChopsticks 函数 void takeChopsticks() { int left_chopstick = id_; int right_chopstick = (id_ + 1) % NUM_PHILOSOPHERS; // 确保总是先拿起编号较小的筷子 if (left_chopstick > right_chopstick) { swap(left_chopstick, right_chopstick); } cout << "Philosopher " << id_ << " is trying to take chopstick " << left_chopstick << "." << endl; chopsticks[left_chopstick].lock(); // 拿起左边的筷子 cout << "Philosopher " << id_ << " took chopstick " << left_chopstick << "." << endl; // 短暂的延迟,增加死锁发生的概率 this_thread::sleep_for(chrono::milliseconds(50)); cout << "Philosopher " << id_ << " is trying to take chopstick " << right_chopstick << "." << endl; chopsticks[right_chopstick].lock(); // 拿起右边的筷子 cout << "Philosopher " << id_ << " took chopstick " << right_chopstick << "." << endl; } // 修改后的 putChopsticks 函数 void putChopsticks() { int left_chopstick = id_; int right_chopstick = (id_ + 1) % NUM_PHILOSOPHERS; // 确保总是先放下编号较小的筷子 if (left_chopstick > right_chopstick) { swap(left_chopstick, right_chopstick); } chopsticks[left_chopstick].unlock(); // 放下左边的筷子 cout << "Philosopher " << id_ << " put down chopstick " << left_chopstick << "." << endl; chopsticks[right_chopstick].unlock(); // 放下右边的筷子 cout << "Philosopher " << id_ << " put down chopstick " << right_chopstick << "." << endl; }
在这个修改后的代码中,我们首先比较左右两根筷子的编号,确保总是先拿起编号较小的筷子。这样就避免了形成环路等待,从而避免了死锁。
-
死锁检测与恢复:
- 死锁检测: 定期检测系统中是否存在死锁。
- 死锁恢复: 当检测到死锁时,采取措施恢复系统,例如,终止一个或多个线程,或者回滚事务。
死锁检测与恢复是一种事后补救的策略,需要在系统运行时进行监控和干预。这种策略比较复杂,一般用于对死锁容忍度较低的系统中。
五、总结:避免死锁,人人有责
死锁是一个复杂的并发问题,但只要我们理解死锁产生的原因,并采取相应的策略,就可以有效地避免死锁的发生。
- 预防胜于治疗: 尽量在设计阶段就考虑到死锁的可能性,并采取预防措施。
- 代码审查: 定期进行代码审查,检查是否存在潜在的死锁风险。
- 测试: 编写并发测试用例,模拟死锁场景,验证死锁预防措施的有效性。
总而言之,避免死锁需要我们时刻保持警惕,并不断学习和实践。只有这样,才能写出健壮、可靠的并发程序。
希望今天的讲座能帮助大家更好地理解和避免C++中的死锁问题。 记住,哲学家们能不能吃上饭,就看你们的代码写得怎么样了!