C++读写锁(RWLock)的饥饿问题预防与公平性保证
各位同学们,大家好。今天我们来深入探讨一个并发编程中常见但又容易被忽视的问题:读写锁(RWLock)的饥饿问题,以及如何通过一些策略来预防和保证读写锁的公平性。
读写锁是一种并发控制机制,它允许多个线程同时读取共享资源,但只允许一个线程写入共享资源。这种锁在读操作远多于写操作的场景下,可以显著提高并发性能。然而,如果设计不当,读写锁可能会导致写线程饥饿,甚至完全无法获取锁,从而影响系统的稳定性和响应速度。
什么是饥饿问题?
饥饿问题(Starvation)是指一个或多个线程因为某种原因,长时间甚至永远无法获得所需的资源,导致其无法执行。在读写锁的上下文中,写线程饥饿通常发生在以下情况:
-
读者优先策略: 当有读线程正在持有读锁时,如果有新的读线程请求读锁,那么它可以立即获得锁,而无需等待。这种策略在读操作频繁的情况下可以提高并发性,但如果读线程持续不断地到达,写线程可能永远无法获得锁,导致写线程饥饿。
-
锁释放机制不公平: 即使采用某种公平策略,如果锁的释放机制不合理,也可能导致某些线程一直无法获得锁。
读者优先策略下的饥饿问题案例
为了更直观地理解读者优先策略下的饥饿问题,我们来看一个简单的例子:
#include <iostream>
#include <thread>
#include <shared_mutex>
#include <chrono>
std::shared_mutex rw_mutex;
void reader(int id) {
while (true) {
rw_mutex.lock_shared();
std::cout << "Reader " << id << " is reading" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
rw_mutex.unlock_shared();
}
}
void writer(int id) {
while (true) {
rw_mutex.lock();
std::cout << "Writer " << id << " is writing" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(500));
rw_mutex.unlock();
}
}
int main() {
std::thread readers[5];
std::thread writer1(writer, 1);
for (int i = 0; i < 5; ++i) {
readers[i] = std::thread(reader, i + 1);
}
writer1.join(); // 永远无法join,因为writer可能一直无法获取锁
for (int i = 0; i < 5; ++i) {
readers[i].join();
}
return 0;
}
在这个例子中,我们创建了5个读线程和一个写线程。读线程不断地获取读锁并读取数据,写线程不断地获取写锁并写入数据。由于读线程的持续存在,写线程很可能永远无法获得写锁,导致写线程饥饿。
预防饥饿问题的策略
为了解决读写锁的饥饿问题,我们可以采用以下几种策略:
-
写者优先策略(Writer Preference): 当有读线程正在持有读锁时,如果有写线程请求写锁,那么后续的读线程必须等待,直到写线程完成。这种策略可以保证写线程不会被饿死,但可能会降低并发性。
-
公平策略(Fairness): 按照线程请求锁的先后顺序来分配锁。这种策略可以保证每个线程都有机会获得锁,但可能会降低并发性。
-
条件变量(Condition Variable): 使用条件变量来显式地控制读写锁的获取和释放,从而实现更复杂的公平性策略。
下面我们将详细介绍这几种策略,并给出相应的 C++ 代码实现。
1. 写者优先策略的实现
写者优先策略的核心思想是:当有写线程等待时,阻止新的读线程获取读锁。一种简单的实现方式是使用一个计数器来记录等待写线程的数量。
#include <iostream>
#include <thread>
#include <mutex>
#include <shared_mutex>
#include <condition_variable>
class WriterPreferredRWLock {
private:
std::mutex mutex_;
std::condition_variable read_cv_;
std::condition_variable write_cv_;
int readers_ = 0;
int writers_waiting_ = 0;
bool writer_active_ = false;
public:
void lock_shared() {
std::unique_lock<std::mutex> lock(mutex_);
while (writer_active_ || writers_waiting_ > 0) {
read_cv_.wait(lock);
}
readers_++;
}
void unlock_shared() {
std::unique_lock<std::mutex> lock(mutex_);
readers_--;
if (readers_ == 0) {
write_cv_.notify_one();
}
}
void lock() {
std::unique_lock<std::mutex> lock(mutex_);
writers_waiting_++;
while (readers_ > 0 || writer_active_) {
write_cv_.wait(lock);
}
writers_waiting_--;
writer_active_ = true;
}
void unlock() {
std::unique_lock<std::mutex> lock(mutex_);
writer_active_ = false;
write_cv_.notify_one();
read_cv_.notify_all();
}
};
代码解释:
readers_: 记录当前持有读锁的线程数量。writers_waiting_: 记录正在等待写锁的线程数量。writer_active_: 标记是否有写线程正在持有写锁。read_cv_: 读线程等待的条件变量。write_cv_: 写线程等待的条件变量。
lock_shared() (获取读锁):
- 首先获取互斥锁
mutex_。 - 如果当前有写线程正在活动 (
writer_active_) 或者有写线程正在等待 (writers_waiting_ > 0),则读线程进入等待状态,直到read_cv_被唤醒。 - 如果可以获取读锁,则
readers_计数器加 1。
unlock_shared() (释放读锁):
- 首先获取互斥锁
mutex_。 readers_计数器减 1。- 如果
readers_变为 0,表示没有读线程持有读锁,则唤醒一个等待的写线程 (write_cv_.notify_one())。
lock() (获取写锁):
- 首先获取互斥锁
mutex_。 writers_waiting_计数器加 1。- 如果当前有读线程持有读锁 (
readers_ > 0) 或者有写线程正在活动 (writer_active_),则写线程进入等待状态,直到write_cv_被唤醒。 - 如果可以获取写锁,则
writers_waiting_计数器减 1,并设置writer_active_为true。
unlock() (释放写锁):
- 首先获取互斥锁
mutex_。 - 设置
writer_active_为false。 - 唤醒一个等待的写线程 (
write_cv_.notify_one()) 和所有等待的读线程 (read_cv_.notify_all())。
优点: 保证写线程不会被饿死。
缺点: 降低了并发性,因为即使没有写线程等待,新的读线程也必须等待已经存在的读线程释放锁。
2. 公平策略的实现
公平策略的实现通常需要维护一个等待队列,按照线程请求锁的先后顺序来分配锁。一种简单的实现方式是使用 std::queue 来维护等待队列。
#include <iostream>
#include <thread>
#include <mutex>
#include <shared_mutex>
#include <condition_variable>
#include <queue>
class FairRWLock {
private:
std::mutex mutex_;
std::condition_variable read_cv_;
std::condition_variable write_cv_;
int readers_ = 0;
bool writer_active_ = false;
enum class LockType { READ, WRITE };
std::queue<LockType> waiting_queue_;
public:
void lock_shared() {
std::unique_lock<std::mutex> lock(mutex_);
waiting_queue_.push(LockType::READ);
while (writer_active_ || (!waiting_queue_.empty() && waiting_queue_.front() != LockType::READ)) {
read_cv_.wait(lock);
}
waiting_queue_.pop();
readers_++;
}
void unlock_shared() {
std::unique_lock<std::mutex> lock(mutex_);
readers_--;
if (readers_ == 0) {
if (!waiting_queue_.empty()) {
if (waiting_queue_.front() == LockType::WRITE) {
write_cv_.notify_one();
} else {
read_cv_.notify_all(); // Notify all waiting readers if next in queue is also a reader.
}
}
}
}
void lock() {
std::unique_lock<std::mutex> lock(mutex_);
waiting_queue_.push(LockType::WRITE);
while (readers_ > 0 || writer_active_ || (!waiting_queue_.empty() && waiting_queue_.front() != LockType::WRITE)) {
write_cv_.wait(lock);
}
waiting_queue_.pop();
writer_active_ = true;
}
void unlock() {
std::unique_lock<std::mutex> lock(mutex_);
writer_active_ = false;
if (!waiting_queue_.empty()) {
if (waiting_queue_.front() == LockType::WRITE) {
write_cv_.notify_one();
} else {
read_cv_.notify_all();
}
}
}
};
代码解释:
waiting_queue_: 维护一个请求锁的队列,按照请求的先后顺序排列。LockType: 枚举类型,表示读锁或写锁。
lock_shared() (获取读锁):
- 将
LockType::READ加入等待队列。 - 如果当前有写线程正在活动,或者等待队列的头部不是读锁,则等待。
- 从等待队列中移除读锁请求,
readers_加 1。
unlock_shared() (释放读锁):
readers_减 1。- 如果
readers_为 0,检查等待队列,唤醒下一个等待的线程(读线程或写线程)。
lock() (获取写锁):
- 将
LockType::WRITE加入等待队列。 - 如果当前有读线程正在活动,或者有写线程正在活动,或者等待队列的头部不是写锁,则等待。
- 从等待队列中移除写锁请求,设置
writer_active_为true。
unlock() (释放写锁):
- 设置
writer_active_为false。 - 检查等待队列,唤醒下一个等待的线程(读线程或写线程)。
优点: 保证每个线程都有机会获得锁,避免饥饿问题。
缺点: 降低了并发性,因为即使可以并发执行读操作,也需要按照请求的先后顺序来分配锁。
3. 使用条件变量的更细粒度控制
除了上述两种策略外,我们还可以使用条件变量来显式地控制读写锁的获取和释放,从而实现更复杂的公平性策略。这种方法更加灵活,可以根据具体的应用场景进行定制。
#include <iostream>
#include <thread>
#include <mutex>
#include <shared_mutex>
#include <condition_variable>
class AdvancedRWLock {
private:
std::mutex mutex_;
std::condition_variable read_cv_;
std::condition_variable write_cv_;
int readers_ = 0;
int writers_waiting_ = 0;
bool writer_active_ = false;
public:
void lock_shared() {
std::unique_lock<std::mutex> lock(mutex_);
while (writer_active_ || writers_waiting_ > 0) {
read_cv_.wait(lock);
}
readers_++;
}
void unlock_shared() {
std::unique_lock<std::mutex> lock(mutex_);
readers_--;
if (readers_ == 0) {
write_cv_.notify_one(); // Give priority to waiting writers
}
}
void lock() {
std::unique_lock<std::mutex> lock(mutex_);
writers_waiting_++;
while (readers_ > 0 || writer_active_) {
write_cv_.wait(lock);
}
writers_waiting_--;
writer_active_ = true;
}
void unlock() {
std::unique_lock<std::mutex> lock(mutex_);
writer_active_ = false;
if (writers_waiting_ > 0) {
write_cv_.notify_one(); // Give priority to waiting writers
} else {
read_cv_.notify_all(); // Allow multiple readers to proceed
}
}
};
这个例子和写者优先策略的实现非常相似,但是更加灵活,可以根据具体的需求进行调整。例如,可以根据等待时间来调整线程的优先级,或者根据系统负载来动态调整策略。
各种策略的对比
为了更清晰地了解各种策略的优缺点,我们用一个表格来总结一下:
| 策略 | 优点 | 缺点 |
|---|---|---|
| 读者优先 | 并发性高,读操作性能好 | 写线程容易饥饿 |
| 写者优先 | 保证写线程不会被饿死 | 降低并发性,即使没有写线程等待,新的读线程也必须等待 |
| 公平策略 | 保证每个线程都有机会获得锁,避免饥饿问题 | 降低并发性,需要维护等待队列 |
| 条件变量控制 | 灵活性高,可以根据具体的应用场景进行定制 | 实现复杂,需要仔细设计 |
选择合适的策略
选择合适的读写锁策略需要根据具体的应用场景进行权衡。
-
读操作远多于写操作,且对写操作的响应时间要求不高: 可以选择读者优先策略。但是需要注意监控写线程的饥饿情况,并采取相应的措施。
-
读操作和写操作的比例接近,或者对写操作的响应时间要求较高: 可以选择写者优先策略或公平策略。
-
需要更复杂的公平性策略,或者需要根据系统负载来动态调整策略: 可以使用条件变量来显式地控制读写锁的获取和释放。
小结
今天我们讨论了读写锁的饥饿问题,以及如何通过写者优先策略、公平策略和条件变量控制等方法来预防饥饿问题,保证读写锁的公平性。选择合适的策略需要根据具体的应用场景进行权衡,以达到最佳的并发性能和公平性。希望今天的讲解对大家有所帮助。
通过本文的学习,我们了解了读写锁的饥饿问题,以及如何通过不同的策略来解决这个问题。关键在于理解各种策略的优缺点,并根据实际的应用场景做出选择。
我们学习了三种策略:读者优先、写者优先和公平策略。每种策略都有其适用的场景,我们需要根据实际情况选择合适的策略,以达到最佳的并发性能和公平性。
希望大家在实际的并发编程中,能够灵活运用这些知识,写出高效、稳定的代码。
更多IT精英技术系列讲座,到智猿学院