C++ 线程安全队列:手把手实现高效的无锁或有锁队列

各位观众,各位听众,欢迎来到今天的“C++线程安全队列:手把手实现高效的无锁或有锁队列”讲座。我是你们的老朋友,今天就带大家深入浅出地搞定这个并发编程里的重要角色——线程安全队列。

咱先说说,为啥需要线程安全队列?想象一下,你开了个煎饼摊,一个窗口负责擀面,一个窗口负责放料,一个窗口负责收钱。如果没个靠谱的流程(也就是队列),那还不乱套了?线程安全队列就是这个流程,保证多个线程能安全、有序地访问共享数据,避免出现数据损坏、死锁等幺蛾子。

今天咱们主要讲两种实现方式:有锁队列和无锁队列。有锁队列就像煎饼摊的阿姨明确规定:“下一个!下一个!”,保证同一时间只有一个线程能操作队列。无锁队列就像阿姨练就了眼观六路耳听八方的神功,不用排队也能高效地处理所有订单。

一、有锁队列:简单粗暴,稳定可靠

有锁队列的思路很简单:加锁!就像煎饼摊阿姨喊号一样,保证同一时间只有一个线程能操作队列。C++里常用的锁就是std::mutex

1.1 基本结构

#include <queue>
#include <mutex>
#include <condition_variable>

template <typename T>
class ThreadSafeQueue {
private:
    std::queue<T> queue_;  // 内部队列
    std::mutex mutex_;     // 互斥锁,保护队列的访问
    std::condition_variable cv_; // 条件变量,用于线程间的同步

public:
    ThreadSafeQueue() = default;
    ~ThreadSafeQueue() = default;

    // 入队
    void enqueue(T value) {
        std::lock_guard<std::mutex> lock(mutex_); // RAII 锁,自动加锁解锁
        queue_.push(value);
        cv_.notify_one(); // 通知等待的线程
    }

    // 出队
    T dequeue() {
        std::unique_lock<std::mutex> lock(mutex_); // unique_lock 更灵活
        cv_.wait(lock, [this]{ return !queue_.empty(); }); // 等待队列非空
        T value = queue_.front();
        queue_.pop();
        return value;
    }

    // 尝试出队(非阻塞)
    bool try_dequeue(T& value) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) {
            return false;
        }
        value = queue_.front();
        queue_.pop();
        return true;
    }

    // 判空
    bool empty() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.empty();
    }

    // 大小
    size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.size();
    }
};

代码解释:

  • std::queue<T> queue_: 这是队列的核心,用来存储数据。
  • std::mutex mutex_: 互斥锁,保证对队列的互斥访问。
  • std::condition_variable cv_: 条件变量,用于在队列为空时,让消费者线程等待。

1.2 入队 (enqueue)

void enqueue(T value) {
    std::lock_guard<std::mutex> lock(mutex_); // RAII 锁
    queue_.push(value);
    cv_.notify_one(); // 通知等待的线程
}
  • std::lock_guard<std::mutex> lock(mutex_): 这是一种 RAII (Resource Acquisition Is Initialization) 风格的锁,在构造时自动加锁,析构时自动解锁,避免忘记解锁导致死锁。
  • queue_.push(value): 将数据放入队列。
  • cv_.notify_one(): 通知一个等待在条件变量上的线程,队列里有新数据了,可以起来干活了。

1.3 出队 (dequeue)

T dequeue() {
    std::unique_lock<std::mutex> lock(mutex_); // unique_lock 更灵活
    cv_.wait(lock, [this]{ return !queue_.empty(); }); // 等待队列非空
    T value = queue_.front();
    queue_.pop();
    return value;
}
  • std::unique_lock<std::mutex> lock(mutex_): unique_locklock_guard 更灵活,可以手动加锁和解锁,也可以传递给条件变量的 wait 函数。
  • cv_.wait(lock, [this]{ return !queue_.empty(); }): 这是最重要的部分。线程会一直等待,直到队列非空。 wait 函数会原子地释放锁,并让线程进入睡眠状态。当被 notify_onenotify_all 唤醒时,它会重新获取锁,并检查条件是否满足(queue_.empty()是否为false)。如果条件不满足,它会再次释放锁并进入睡眠状态。
  • queue_.front(): 获取队首元素。
  • queue_.pop(): 移除队首元素。

1.4 尝试出队 (try_dequeue)

bool try_dequeue(T& value) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (queue_.empty()) {
        return false;
    }
    value = queue_.front();
    queue_.pop();
    return true;
}
  • 这个函数是非阻塞的,如果队列为空,立即返回 false,不会等待。

1.5 判空 (empty) 和 大小 (size)

bool empty() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return queue_.empty();
}

size_t size() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return queue_.size();
}
  • 这两个函数都需要加锁,保证在多线程环境下获取的队列状态是正确的。

1.6 有锁队列的优缺点

特点 优点 缺点
实现难度 简单 性能较低,锁竞争会导致线程阻塞,降低吞吐量
线程安全 绝对安全,互斥锁保证了数据的一致性 可能出现死锁(如果锁的使用不当)
适用场景 数据量不大,对性能要求不高的场景,或者对线程安全要求极高的场景 高并发、高性能要求的场景

二、无锁队列:风驰电掣,极致性能

无锁队列就像煎饼摊阿姨不用排队系统,直接根据经验和眼力见儿来处理订单。它使用原子操作(Atomic Operations)来实现线程安全,避免了锁的开销。

2.1 原子操作

在深入无锁队列之前,先了解一下原子操作。原子操作是不可分割的操作,要么全部执行,要么完全不执行。C++11 提供了 std::atomic 模板类,用于实现原子操作。

常用的原子操作有:

  • load(): 原子地读取一个值。
  • store(): 原子地写入一个值。
  • exchange(): 原子地交换一个值。
  • compare_exchange_weak()compare_exchange_strong(): 原子地比较并交换一个值。

2.2 无锁队列的实现思路

无锁队列的实现通常基于循环队列,并使用原子操作来管理队列的头尾指针。

常见的无锁队列实现方式有:

  • 单生产者单消费者 (SPSC) 队列: 只有一个生产者线程和一个消费者线程。
  • 多生产者多消费者 (MPSC) 队列: 多个生产者线程和多个消费者线程。

SPSC 队列相对简单,MPSC 队列更复杂,需要解决更多的并发问题。

2.3 SPSC 无锁队列

#include <atomic>
#include <cstdint>
#include <memory>
#include <stdexcept>

template <typename T, size_t Capacity>
class SPSCLockFreeQueue {
private:
    std::unique_ptr<T[]> buffer_;
    std::atomic<uint64_t> head_{0};  // 读指针
    std::atomic<uint64_t> tail_{0};  // 写指针
    static constexpr size_t kCapacity = Capacity + 1; // 环形buffer,需要多一个位置来区分空和满

public:
    SPSCLockFreeQueue() : buffer_(std::make_unique<T[]>(kCapacity)) {}
    ~SPSCLockFreeQueue() = default;

    // 入队
    bool enqueue(const T& value) {
        uint64_t current_tail = tail_.load(std::memory_order_relaxed);
        uint64_t next_tail = (current_tail + 1) % kCapacity;

        // 队列已满
        if (next_tail == head_.load(std::memory_order_acquire)) {
            return false;
        }

        buffer_[current_tail] = value;
        tail_.store(next_tail, std::memory_order_release);
        return true;
    }

    // 出队
    bool dequeue(T& value) {
        uint64_t current_head = head_.load(std::memory_order_relaxed);

        // 队列为空
        if (current_head == tail_.load(std::memory_order_acquire)) {
            return false;
        }

        value = buffer_[current_head];
        uint64_t next_head = (current_head + 1) % kCapacity;
        head_.store(next_head, std::memory_order_release);
        return true;
    }

    bool empty() const {
        return head_.load(std::memory_order_acquire) == tail_.load(std::memory_order_acquire);
    }

    size_t size() const {
        uint64_t head = head_.load(std::memory_order_acquire);
        uint64_t tail = tail_.load(std::memory_order_acquire);
        if(tail >= head) {
            return tail - head;
        } else {
            return tail + kCapacity - head;
        }
    }

};

代码解释:

  • std::unique_ptr<T[]> buffer_: 循环缓冲区,用于存储数据。
  • std::atomic<uint64_t> head_: 读指针,指向队首元素。
  • std::atomic<uint64_t> tail_: 写指针,指向队尾元素的下一个位置。
  • kCapacity: 缓冲区的大小,需要比实际容量大 1,用于区分队列空和满的状态。

2.4 入队 (enqueue)

bool enqueue(const T& value) {
    uint64_t current_tail = tail_.load(std::memory_order_relaxed);
    uint64_t next_tail = (current_tail + 1) % kCapacity;

    // 队列已满
    if (next_tail == head_.load(std::memory_order_acquire)) {
        return false;
    }

    buffer_[current_tail] = value;
    tail_.store(next_tail, std::memory_order_release);
    return true;
}
  • tail_.load(std::memory_order_relaxed): 原子地读取写指针的值。std::memory_order_relaxed 表示宽松的内存顺序,只保证原子性,不保证线程间的同步。
  • next_tail = (current_tail + 1) % kCapacity: 计算下一个写指针的位置。
  • head_.load(std::memory_order_acquire): 原子地读取读指针的值。std::memory_order_acquire 表示获取内存屏障,保证在读取读指针之前,所有之前的写操作都对当前线程可见。
  • buffer_[current_tail] = value: 将数据写入缓冲区。
  • tail_.store(next_tail, std::memory_order_release): 原子地更新写指针的值。std::memory_order_release 表示释放内存屏障,保证在更新写指针之后,所有之前的写操作都对其他线程可见。

2.5 出队 (dequeue)

bool dequeue(T& value) {
    uint64_t current_head = head_.load(std::memory_order_relaxed);

    // 队列为空
    if (current_head == tail_.load(std::memory_order_acquire)) {
        return false;
    }

    value = buffer_[current_head];
    uint64_t next_head = (current_head + 1) % kCapacity;
    head_.store(next_head, std::memory_order_release);
    return true;
}
  • head_.load(std::memory_order_relaxed): 原子地读取读指针的值。
  • tail_.load(std::memory_order_acquire): 原子地读取写指针的值。std::memory_order_acquire 表示获取内存屏障,保证在读取写指针之前,所有之前的写操作都对当前线程可见。
  • value = buffer_[current_head]: 从缓冲区读取数据。
  • head_.store(next_head, std::memory_order_release): 原子地更新读指针的值。std::memory_order_release 表示释放内存屏障,保证在更新读指针之后,所有之前的写操作都对其他线程可见。

2.6 内存顺序 (Memory Order)

无锁队列的实现中,内存顺序非常重要。它决定了线程间的可见性和同步。

常用的内存顺序有:

  • std::memory_order_relaxed: 宽松的内存顺序,只保证原子性,不保证线程间的同步。
  • std::memory_order_acquire: 获取内存屏障,保证在读取之前,所有之前的写操作都对当前线程可见。
  • std::memory_order_release: 释放内存屏障,保证在写入之后,所有之前的写操作都对其他线程可见。
  • std::memory_order_acq_rel: 同时具有获取和释放的语义。
  • std::memory_order_seq_cst: 顺序一致性,最强的内存顺序,保证所有线程以相同的顺序看到所有操作。

选择合适的内存顺序需要仔细考虑,错误的内存顺序可能导致数据竞争和未定义的行为。

2.7 MPSC 无锁队列

MPSC 无锁队列的实现比 SPSC 队列复杂得多,需要解决多个生产者和消费者之间的竞争问题。通常使用 CAS (Compare and Swap) 操作来实现。

由于篇幅限制,这里只给出 MPSC 队列的基本思路,不提供完整的代码实现。

  • 使用链表作为底层数据结构,每个节点包含一个数据和一个指向下一个节点的指针。
  • 使用原子操作来管理链表的头尾指针。
  • 使用 CAS 操作来添加和删除节点。

MPSC 队列的实现需要仔细处理 ABA 问题,并使用内存屏障来保证线程间的同步。

2.8 无锁队列的优缺点

特点 优点 缺点
实现难度 复杂 实现难度高,容易出错,需要对原子操作和内存顺序有深入的理解
线程安全 依赖于原子操作和内存顺序的正确使用 容易出现 ABA 问题,需要仔细处理
性能 极高,避免了锁的开销,吞吐量高 在高竞争情况下,CAS 操作可能会失败并重试,导致性能下降
适用场景 高并发、高性能要求的场景,对延迟敏感的应用,或者需要避免死锁的场景 对代码的正确性要求极高,需要进行充分的测试和验证

三、总结与选择

到这里,咱们就把有锁队列和无锁队列的基本原理和实现方式都过了一遍。 最后,咱们来总结一下,该如何选择合适的队列:

  • 如果对性能要求不高,或者对线程安全要求极高,可以选择有锁队列。 它的实现简单,容易理解,而且可以保证数据的一致性。
  • 如果对性能要求极高,或者需要避免死锁,可以选择无锁队列。 但是,无锁队列的实现非常复杂,需要对原子操作和内存顺序有深入的理解,并且需要进行充分的测试和验证。
  • 在实际应用中,可以根据具体的需求选择合适的队列。 例如,可以使用有锁队列来处理一些不重要的任务,而使用无锁队列来处理一些关键的任务。

四、代码示例:测试

#include <iostream>
#include <thread>
#include <vector>
#include <chrono>

// 使用有锁队列
void test_locked_queue() {
    ThreadSafeQueue<int> queue;
    std::vector<std::thread> threads;
    int num_threads = 4;
    int num_items = 100000;

    // 生产者线程
    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back([&queue, num_items, i]() {
            for (int j = i * (num_items / num_threads); j < (i + 1) * (num_items / num_threads); ++j) {
                queue.enqueue(j);
            }
        });
    }

    // 消费者线程
    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back([&queue, num_items, i]() {
            int value;
            for (int j = i * (num_items / num_threads); j < (i + 1) * (num_items / num_threads); ++j) {
                value = queue.dequeue();
            }
        });
    }

    for (auto& thread : threads) {
        thread.join();
    }

    std::cout << "Locked Queue Test Finished. Queue Size: " << queue.size() << std::endl;
}

// 使用 SPSC 无锁队列
void test_spsc_queue() {
    SPSCLockFreeQueue<int, 1024> queue;
    int num_items = 1000000;
    int consumed = 0;

    std::thread producer([&queue, num_items]() {
        for (int i = 0; i < num_items; ++i) {
            while (!queue.enqueue(i)); // Spin until enqueue succeeds
        }
    });

    std::thread consumer([&queue, num_items, &consumed]() {
        int value;
        for (int i = 0; i < num_items; ++i) {
            while (!queue.dequeue(value)); // Spin until dequeue succeeds
            consumed++;
        }
    });

    producer.join();
    consumer.join();

    std::cout << "SPSC Queue Test Finished. Consumed: " << consumed << std::endl;
}

int main() {
    std::cout << "Starting Locked Queue Test..." << std::endl;
    auto start_locked = std::chrono::high_resolution_clock::now();
    test_locked_queue();
    auto end_locked = std::chrono::high_resolution_clock::now();
    auto duration_locked = std::chrono::duration_cast<std::chrono::milliseconds>(end_locked - start_locked);
    std::cout << "Locked Queue Test Duration: " << duration_locked.count() << " ms" << std::endl;

    std::cout << "nStarting SPSC Queue Test..." << std::endl;
    auto start_spsc = std::chrono::high_resolution_clock::now();
    test_spsc_queue();
    auto end_spsc = std::chrono::high_resolution_clock::now();
    auto duration_spsc = std::chrono::duration_cast<std::chrono::milliseconds>(end_spsc - start_spsc);
    std::cout << "SPSC Queue Test Duration: " << duration_spsc.count() << " ms" << std::endl;

    return 0;
}

这段代码展示了如何使用有锁队列和 SPSC 无锁队列,并通过简单的测试来比较它们的性能。记住,这只是一个简单的示例,实际应用中需要根据具体情况进行调整。

五、总结

今天咱们一起学习了线程安全队列的两种实现方式:有锁队列和无锁队列。 咱们了解了它们的原理、优缺点,以及适用场景。 希望今天的讲解能帮助大家更好地理解并发编程,并在实际工作中灵活运用线程安全队列。 记住,没有银弹,选择最适合你的才是最好的!

感谢大家的观看,下次再见!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注