各位观众,各位听众,欢迎来到今天的“C++线程安全队列:手把手实现高效的无锁或有锁队列”讲座。我是你们的老朋友,今天就带大家深入浅出地搞定这个并发编程里的重要角色——线程安全队列。
咱先说说,为啥需要线程安全队列?想象一下,你开了个煎饼摊,一个窗口负责擀面,一个窗口负责放料,一个窗口负责收钱。如果没个靠谱的流程(也就是队列),那还不乱套了?线程安全队列就是这个流程,保证多个线程能安全、有序地访问共享数据,避免出现数据损坏、死锁等幺蛾子。
今天咱们主要讲两种实现方式:有锁队列和无锁队列。有锁队列就像煎饼摊的阿姨明确规定:“下一个!下一个!”,保证同一时间只有一个线程能操作队列。无锁队列就像阿姨练就了眼观六路耳听八方的神功,不用排队也能高效地处理所有订单。
一、有锁队列:简单粗暴,稳定可靠
有锁队列的思路很简单:加锁!就像煎饼摊阿姨喊号一样,保证同一时间只有一个线程能操作队列。C++里常用的锁就是std::mutex
。
1.1 基本结构
#include <queue>
#include <mutex>
#include <condition_variable>
template <typename T>
class ThreadSafeQueue {
private:
std::queue<T> queue_; // 内部队列
std::mutex mutex_; // 互斥锁,保护队列的访问
std::condition_variable cv_; // 条件变量,用于线程间的同步
public:
ThreadSafeQueue() = default;
~ThreadSafeQueue() = default;
// 入队
void enqueue(T value) {
std::lock_guard<std::mutex> lock(mutex_); // RAII 锁,自动加锁解锁
queue_.push(value);
cv_.notify_one(); // 通知等待的线程
}
// 出队
T dequeue() {
std::unique_lock<std::mutex> lock(mutex_); // unique_lock 更灵活
cv_.wait(lock, [this]{ return !queue_.empty(); }); // 等待队列非空
T value = queue_.front();
queue_.pop();
return value;
}
// 尝试出队(非阻塞)
bool try_dequeue(T& value) {
std::lock_guard<std::mutex> lock(mutex_);
if (queue_.empty()) {
return false;
}
value = queue_.front();
queue_.pop();
return true;
}
// 判空
bool empty() const {
std::lock_guard<std::mutex> lock(mutex_);
return queue_.empty();
}
// 大小
size_t size() const {
std::lock_guard<std::mutex> lock(mutex_);
return queue_.size();
}
};
代码解释:
std::queue<T> queue_
: 这是队列的核心,用来存储数据。std::mutex mutex_
: 互斥锁,保证对队列的互斥访问。std::condition_variable cv_
: 条件变量,用于在队列为空时,让消费者线程等待。
1.2 入队 (enqueue
)
void enqueue(T value) {
std::lock_guard<std::mutex> lock(mutex_); // RAII 锁
queue_.push(value);
cv_.notify_one(); // 通知等待的线程
}
std::lock_guard<std::mutex> lock(mutex_)
: 这是一种 RAII (Resource Acquisition Is Initialization) 风格的锁,在构造时自动加锁,析构时自动解锁,避免忘记解锁导致死锁。queue_.push(value)
: 将数据放入队列。cv_.notify_one()
: 通知一个等待在条件变量上的线程,队列里有新数据了,可以起来干活了。
1.3 出队 (dequeue
)
T dequeue() {
std::unique_lock<std::mutex> lock(mutex_); // unique_lock 更灵活
cv_.wait(lock, [this]{ return !queue_.empty(); }); // 等待队列非空
T value = queue_.front();
queue_.pop();
return value;
}
std::unique_lock<std::mutex> lock(mutex_)
:unique_lock
比lock_guard
更灵活,可以手动加锁和解锁,也可以传递给条件变量的wait
函数。cv_.wait(lock, [this]{ return !queue_.empty(); })
: 这是最重要的部分。线程会一直等待,直到队列非空。wait
函数会原子地释放锁,并让线程进入睡眠状态。当被notify_one
或notify_all
唤醒时,它会重新获取锁,并检查条件是否满足(queue_.empty()
是否为false)。如果条件不满足,它会再次释放锁并进入睡眠状态。queue_.front()
: 获取队首元素。queue_.pop()
: 移除队首元素。
1.4 尝试出队 (try_dequeue
)
bool try_dequeue(T& value) {
std::lock_guard<std::mutex> lock(mutex_);
if (queue_.empty()) {
return false;
}
value = queue_.front();
queue_.pop();
return true;
}
- 这个函数是非阻塞的,如果队列为空,立即返回
false
,不会等待。
1.5 判空 (empty
) 和 大小 (size
)
bool empty() const {
std::lock_guard<std::mutex> lock(mutex_);
return queue_.empty();
}
size_t size() const {
std::lock_guard<std::mutex> lock(mutex_);
return queue_.size();
}
- 这两个函数都需要加锁,保证在多线程环境下获取的队列状态是正确的。
1.6 有锁队列的优缺点
特点 | 优点 | 缺点 |
---|---|---|
实现难度 | 简单 | 性能较低,锁竞争会导致线程阻塞,降低吞吐量 |
线程安全 | 绝对安全,互斥锁保证了数据的一致性 | 可能出现死锁(如果锁的使用不当) |
适用场景 | 数据量不大,对性能要求不高的场景,或者对线程安全要求极高的场景 | 高并发、高性能要求的场景 |
二、无锁队列:风驰电掣,极致性能
无锁队列就像煎饼摊阿姨不用排队系统,直接根据经验和眼力见儿来处理订单。它使用原子操作(Atomic Operations)来实现线程安全,避免了锁的开销。
2.1 原子操作
在深入无锁队列之前,先了解一下原子操作。原子操作是不可分割的操作,要么全部执行,要么完全不执行。C++11 提供了 std::atomic
模板类,用于实现原子操作。
常用的原子操作有:
load()
: 原子地读取一个值。store()
: 原子地写入一个值。exchange()
: 原子地交换一个值。compare_exchange_weak()
和compare_exchange_strong()
: 原子地比较并交换一个值。
2.2 无锁队列的实现思路
无锁队列的实现通常基于循环队列,并使用原子操作来管理队列的头尾指针。
常见的无锁队列实现方式有:
- 单生产者单消费者 (SPSC) 队列: 只有一个生产者线程和一个消费者线程。
- 多生产者多消费者 (MPSC) 队列: 多个生产者线程和多个消费者线程。
SPSC 队列相对简单,MPSC 队列更复杂,需要解决更多的并发问题。
2.3 SPSC 无锁队列
#include <atomic>
#include <cstdint>
#include <memory>
#include <stdexcept>
template <typename T, size_t Capacity>
class SPSCLockFreeQueue {
private:
std::unique_ptr<T[]> buffer_;
std::atomic<uint64_t> head_{0}; // 读指针
std::atomic<uint64_t> tail_{0}; // 写指针
static constexpr size_t kCapacity = Capacity + 1; // 环形buffer,需要多一个位置来区分空和满
public:
SPSCLockFreeQueue() : buffer_(std::make_unique<T[]>(kCapacity)) {}
~SPSCLockFreeQueue() = default;
// 入队
bool enqueue(const T& value) {
uint64_t current_tail = tail_.load(std::memory_order_relaxed);
uint64_t next_tail = (current_tail + 1) % kCapacity;
// 队列已满
if (next_tail == head_.load(std::memory_order_acquire)) {
return false;
}
buffer_[current_tail] = value;
tail_.store(next_tail, std::memory_order_release);
return true;
}
// 出队
bool dequeue(T& value) {
uint64_t current_head = head_.load(std::memory_order_relaxed);
// 队列为空
if (current_head == tail_.load(std::memory_order_acquire)) {
return false;
}
value = buffer_[current_head];
uint64_t next_head = (current_head + 1) % kCapacity;
head_.store(next_head, std::memory_order_release);
return true;
}
bool empty() const {
return head_.load(std::memory_order_acquire) == tail_.load(std::memory_order_acquire);
}
size_t size() const {
uint64_t head = head_.load(std::memory_order_acquire);
uint64_t tail = tail_.load(std::memory_order_acquire);
if(tail >= head) {
return tail - head;
} else {
return tail + kCapacity - head;
}
}
};
代码解释:
std::unique_ptr<T[]> buffer_
: 循环缓冲区,用于存储数据。std::atomic<uint64_t> head_
: 读指针,指向队首元素。std::atomic<uint64_t> tail_
: 写指针,指向队尾元素的下一个位置。kCapacity
: 缓冲区的大小,需要比实际容量大 1,用于区分队列空和满的状态。
2.4 入队 (enqueue
)
bool enqueue(const T& value) {
uint64_t current_tail = tail_.load(std::memory_order_relaxed);
uint64_t next_tail = (current_tail + 1) % kCapacity;
// 队列已满
if (next_tail == head_.load(std::memory_order_acquire)) {
return false;
}
buffer_[current_tail] = value;
tail_.store(next_tail, std::memory_order_release);
return true;
}
tail_.load(std::memory_order_relaxed)
: 原子地读取写指针的值。std::memory_order_relaxed
表示宽松的内存顺序,只保证原子性,不保证线程间的同步。next_tail = (current_tail + 1) % kCapacity
: 计算下一个写指针的位置。head_.load(std::memory_order_acquire)
: 原子地读取读指针的值。std::memory_order_acquire
表示获取内存屏障,保证在读取读指针之前,所有之前的写操作都对当前线程可见。buffer_[current_tail] = value
: 将数据写入缓冲区。tail_.store(next_tail, std::memory_order_release)
: 原子地更新写指针的值。std::memory_order_release
表示释放内存屏障,保证在更新写指针之后,所有之前的写操作都对其他线程可见。
2.5 出队 (dequeue
)
bool dequeue(T& value) {
uint64_t current_head = head_.load(std::memory_order_relaxed);
// 队列为空
if (current_head == tail_.load(std::memory_order_acquire)) {
return false;
}
value = buffer_[current_head];
uint64_t next_head = (current_head + 1) % kCapacity;
head_.store(next_head, std::memory_order_release);
return true;
}
head_.load(std::memory_order_relaxed)
: 原子地读取读指针的值。tail_.load(std::memory_order_acquire)
: 原子地读取写指针的值。std::memory_order_acquire
表示获取内存屏障,保证在读取写指针之前,所有之前的写操作都对当前线程可见。value = buffer_[current_head]
: 从缓冲区读取数据。head_.store(next_head, std::memory_order_release)
: 原子地更新读指针的值。std::memory_order_release
表示释放内存屏障,保证在更新读指针之后,所有之前的写操作都对其他线程可见。
2.6 内存顺序 (Memory Order)
无锁队列的实现中,内存顺序非常重要。它决定了线程间的可见性和同步。
常用的内存顺序有:
std::memory_order_relaxed
: 宽松的内存顺序,只保证原子性,不保证线程间的同步。std::memory_order_acquire
: 获取内存屏障,保证在读取之前,所有之前的写操作都对当前线程可见。std::memory_order_release
: 释放内存屏障,保证在写入之后,所有之前的写操作都对其他线程可见。std::memory_order_acq_rel
: 同时具有获取和释放的语义。std::memory_order_seq_cst
: 顺序一致性,最强的内存顺序,保证所有线程以相同的顺序看到所有操作。
选择合适的内存顺序需要仔细考虑,错误的内存顺序可能导致数据竞争和未定义的行为。
2.7 MPSC 无锁队列
MPSC 无锁队列的实现比 SPSC 队列复杂得多,需要解决多个生产者和消费者之间的竞争问题。通常使用 CAS (Compare and Swap) 操作来实现。
由于篇幅限制,这里只给出 MPSC 队列的基本思路,不提供完整的代码实现。
- 使用链表作为底层数据结构,每个节点包含一个数据和一个指向下一个节点的指针。
- 使用原子操作来管理链表的头尾指针。
- 使用 CAS 操作来添加和删除节点。
MPSC 队列的实现需要仔细处理 ABA 问题,并使用内存屏障来保证线程间的同步。
2.8 无锁队列的优缺点
特点 | 优点 | 缺点 |
---|---|---|
实现难度 | 复杂 | 实现难度高,容易出错,需要对原子操作和内存顺序有深入的理解 |
线程安全 | 依赖于原子操作和内存顺序的正确使用 | 容易出现 ABA 问题,需要仔细处理 |
性能 | 极高,避免了锁的开销,吞吐量高 | 在高竞争情况下,CAS 操作可能会失败并重试,导致性能下降 |
适用场景 | 高并发、高性能要求的场景,对延迟敏感的应用,或者需要避免死锁的场景 | 对代码的正确性要求极高,需要进行充分的测试和验证 |
三、总结与选择
到这里,咱们就把有锁队列和无锁队列的基本原理和实现方式都过了一遍。 最后,咱们来总结一下,该如何选择合适的队列:
- 如果对性能要求不高,或者对线程安全要求极高,可以选择有锁队列。 它的实现简单,容易理解,而且可以保证数据的一致性。
- 如果对性能要求极高,或者需要避免死锁,可以选择无锁队列。 但是,无锁队列的实现非常复杂,需要对原子操作和内存顺序有深入的理解,并且需要进行充分的测试和验证。
- 在实际应用中,可以根据具体的需求选择合适的队列。 例如,可以使用有锁队列来处理一些不重要的任务,而使用无锁队列来处理一些关键的任务。
四、代码示例:测试
#include <iostream>
#include <thread>
#include <vector>
#include <chrono>
// 使用有锁队列
void test_locked_queue() {
ThreadSafeQueue<int> queue;
std::vector<std::thread> threads;
int num_threads = 4;
int num_items = 100000;
// 生产者线程
for (int i = 0; i < num_threads; ++i) {
threads.emplace_back([&queue, num_items, i]() {
for (int j = i * (num_items / num_threads); j < (i + 1) * (num_items / num_threads); ++j) {
queue.enqueue(j);
}
});
}
// 消费者线程
for (int i = 0; i < num_threads; ++i) {
threads.emplace_back([&queue, num_items, i]() {
int value;
for (int j = i * (num_items / num_threads); j < (i + 1) * (num_items / num_threads); ++j) {
value = queue.dequeue();
}
});
}
for (auto& thread : threads) {
thread.join();
}
std::cout << "Locked Queue Test Finished. Queue Size: " << queue.size() << std::endl;
}
// 使用 SPSC 无锁队列
void test_spsc_queue() {
SPSCLockFreeQueue<int, 1024> queue;
int num_items = 1000000;
int consumed = 0;
std::thread producer([&queue, num_items]() {
for (int i = 0; i < num_items; ++i) {
while (!queue.enqueue(i)); // Spin until enqueue succeeds
}
});
std::thread consumer([&queue, num_items, &consumed]() {
int value;
for (int i = 0; i < num_items; ++i) {
while (!queue.dequeue(value)); // Spin until dequeue succeeds
consumed++;
}
});
producer.join();
consumer.join();
std::cout << "SPSC Queue Test Finished. Consumed: " << consumed << std::endl;
}
int main() {
std::cout << "Starting Locked Queue Test..." << std::endl;
auto start_locked = std::chrono::high_resolution_clock::now();
test_locked_queue();
auto end_locked = std::chrono::high_resolution_clock::now();
auto duration_locked = std::chrono::duration_cast<std::chrono::milliseconds>(end_locked - start_locked);
std::cout << "Locked Queue Test Duration: " << duration_locked.count() << " ms" << std::endl;
std::cout << "nStarting SPSC Queue Test..." << std::endl;
auto start_spsc = std::chrono::high_resolution_clock::now();
test_spsc_queue();
auto end_spsc = std::chrono::high_resolution_clock::now();
auto duration_spsc = std::chrono::duration_cast<std::chrono::milliseconds>(end_spsc - start_spsc);
std::cout << "SPSC Queue Test Duration: " << duration_spsc.count() << " ms" << std::endl;
return 0;
}
这段代码展示了如何使用有锁队列和 SPSC 无锁队列,并通过简单的测试来比较它们的性能。记住,这只是一个简单的示例,实际应用中需要根据具体情况进行调整。
五、总结
今天咱们一起学习了线程安全队列的两种实现方式:有锁队列和无锁队列。 咱们了解了它们的原理、优缺点,以及适用场景。 希望今天的讲解能帮助大家更好地理解并发编程,并在实际工作中灵活运用线程安全队列。 记住,没有银弹,选择最适合你的才是最好的!
感谢大家的观看,下次再见!