C++ `std::condition_variable`:条件变量的原理与生产者-消费者模型

好的,没问题。让我们开始一场关于 C++ std::condition_variable 的技术讲座吧!

C++ std::condition_variable:条件变量的原理与生产者-消费者模型

大家好!欢迎来到今天的“并发编程小课堂”。今天我们要聊的是一个非常重要,但也经常让人挠头的家伙:C++ 的 std::condition_variable。别怕,我会尽量用大白话把它讲清楚,再结合经典的生产者-消费者模型,让大家彻底理解它的作用和用法。

一、什么是条件变量?(Condition Variable:等等党的福音)

想象一下,你是一位等待通知的员工。你的老板(一个线程)告诉你:“任务还没准备好,你先等着,准备好了我会通知你。” 你就只能乖乖地坐在一旁,啥也干不了,直到老板跑过来拍拍你的肩膀说:“活来了,开工!”

std::condition_variable 就像这个“老板拍肩膀”的机制。它允许一个或多个线程 等待 某个条件变为真,然后 被另一个线程唤醒

核心概念:

  • 等待 (Wait): 线程主动放弃 CPU,进入休眠状态,等待被唤醒。
  • 唤醒 (Notify): 另一个线程改变了条件,并通知等待的线程。
  • 条件 (Condition): 指的是某个布尔表达式,只有当它为真时,等待的线程才应该继续执行。

为什么要用条件变量?

你可能会问:“我用 while 循环轮询检查条件不行吗?” 当然可以,但那太傻了!

  • 浪费 CPU 资源: 轮询会占用 CPU 时间,即使条件没有变化。
  • 效率低下: 轮询的频率难以控制,频率太低可能错过通知,频率太高则浪费资源。

条件变量可以精确地在条件满足时唤醒线程,避免了不必要的 CPU 消耗,提高了程序的效率。

二、std::condition_variable 的基本用法(三板斧)

std::condition_variable 经常和 std::mutex 一起使用,以保证线程安全。它主要有三个方法:

  • wait(std::unique_lock<std::mutex>& lock): 等待直到被其他线程唤醒。
  • notify_one(): 唤醒一个等待的线程。
  • notify_all(): 唤醒所有等待的线程。

使用步骤:

  1. 获取互斥锁 (Lock): 使用 std::unique_lock<std::mutex> 保护共享数据。
  2. 检查条件 (Check Condition): 检查条件是否满足,如果满足则继续执行,否则进入等待。
  3. 等待 (Wait): 调用 condition_variable.wait(lock) 释放互斥锁并进入休眠状态。
  4. 唤醒 (Notify): 当条件变为真时,另一个线程获取互斥锁,修改共享数据,然后调用 condition_variable.notify_one()condition_variable.notify_all() 唤醒等待的线程。
  5. 重新检查条件 (Re-check Condition): 被唤醒的线程重新获取互斥锁,并再次检查条件是否真的满足,因为可能存在虚假唤醒 (spurious wakeup)。

代码示例:

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

std::mutex mtx;
std::condition_variable cv;
bool data_ready = false;

void worker_thread() {
    std::unique_lock<std::mutex> lock(mtx);
    cv.wait(lock, []{ return data_ready; }); // 等待 data_ready 变为 true
    std::cout << "Worker thread is processing data.n";
}

void signal_thread() {
    std::this_thread::sleep_for(std::chrono::seconds(2));
    {
        std::lock_guard<std::mutex> lock(mtx);
        data_ready = true;
        std::cout << "Signal thread is notifying worker thread.n";
    }
    cv.notify_one(); // 唤醒一个等待的线程
}

int main() {
    std::thread worker(worker_thread);
    std::thread signal(signal_thread);

    worker.join();
    signal.join();

    return 0;
}

代码解释:

  • worker_thread 函数:先获取互斥锁,然后调用 cv.wait(lock, []{ return data_ready; }) 等待 data_ready 变为 true。注意,这里的 lambda 表达式是 wait 函数的第二个参数,用于检查条件是否满足。
  • signal_thread 函数:休眠 2 秒后,设置 data_readytrue,然后调用 cv.notify_one() 唤醒 worker_thread
  • main 函数:创建并启动两个线程,然后等待它们结束。

三、生产者-消费者模型(Producer-Consumer Pattern:经典案例)

生产者-消费者模型是一个非常经典的并发编程模型,它描述了生产者线程生产数据,消费者线程消费数据的场景。 std::condition_variable 在这个模型中扮演着至关重要的角色。

模型描述:

  • 生产者 (Producer): 生产数据,并将数据放入缓冲区。
  • 消费者 (Consumer): 从缓冲区取出数据,并进行消费。
  • 缓冲区 (Buffer): 用于存储生产者生产的数据,供消费者使用。

问题:

  • 缓冲区为空: 消费者不能从空缓冲区中取出数据,需要等待生产者生产数据。
  • 缓冲区已满: 生产者不能向满缓冲区中放入数据,需要等待消费者消费数据。

解决方案:

使用 std::condition_variable 来解决这个问题。

  • 生产者: 当缓冲区满时,等待一个条件变量。当有空闲空间时,唤醒等待的消费者。
  • 消费者: 当缓冲区为空时,等待一个条件变量。当有新的数据时,唤醒等待的生产者。

代码示例:

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>

std::mutex mtx;
std::condition_variable cv;
std::queue<int> buffer;
const int buffer_size = 10;

void producer() {
    for (int i = 0; i < 20; ++i) {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, []{ return buffer.size() < buffer_size; }); // 等待缓冲区不满

        buffer.push(i);
        std::cout << "Producer produced: " << i << std::endl;

        cv.notify_one(); // 唤醒一个等待的消费者
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

void consumer() {
    for (int i = 0; i < 20; ++i) {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, []{ return !buffer.empty(); }); // 等待缓冲区不空

        int data = buffer.front();
        buffer.pop();
        std::cout << "Consumer consumed: " << data << std::endl;

        cv.notify_one(); // 唤醒一个等待的生产者
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
}

int main() {
    std::thread producer_thread(producer);
    std::thread consumer_thread(consumer);

    producer_thread.join();
    consumer_thread.join();

    return 0;
}

代码解释:

  • producer 函数:生成数据并放入 buffer 中,如果 buffer 满了,则等待 cv,直到 buffer 不满。
  • consumer 函数:从 buffer 中取出数据并消费,如果 buffer 空了,则等待 cv,直到 buffer 不空。
  • cv 用于在 producerconsumer 之间同步,确保 producer 不会在 buffer 满时继续生产,consumer 不会在 buffer 空时继续消费。

四、wait 函数的进阶用法(超时等待)

std::condition_variablewait 函数还有一些进阶用法,例如超时等待。

wait_forwait_until

  • wait_for(lock, duration): 等待一段时间,如果在指定的时间内没有被唤醒,则返回。
  • wait_until(lock, time_point): 等待到指定的时间点,如果在指定的时间点之前没有被唤醒,则返回。

代码示例:

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>

std::mutex mtx;
std::condition_variable cv;
bool data_ready = false;

void worker_thread() {
    std::unique_lock<std::mutex> lock(mtx);
    if (cv.wait_for(lock, std::chrono::seconds(3), []{ return data_ready; })) {
        std::cout << "Worker thread is processing data.n";
    } else {
        std::cout << "Worker thread timed out waiting for data.n";
    }
}

void signal_thread() {
    std::this_thread::sleep_for(std::chrono::seconds(5)); // 休眠 5 秒
    {
        std::lock_guard<std::mutex> lock(mtx);
        data_ready = true;
        std::cout << "Signal thread is notifying worker thread.n";
    }
    cv.notify_one();
}

int main() {
    std::thread worker(worker_thread);
    std::thread signal(signal_thread);

    worker.join();
    signal.join();

    return 0;
}

代码解释:

  • worker_thread 函数:使用 cv.wait_for(lock, std::chrono::seconds(3), []{ return data_ready; }) 等待 3 秒,如果 3 秒内 data_ready 变为 true,则继续执行,否则输出超时信息。
  • signal_thread 函数:休眠 5 秒后才设置 data_readytrue,因此 worker_thread 会超时。

五、虚假唤醒 (Spurious Wakeup):一个需要注意的坑

虚假唤醒是指线程在没有被 notify_onenotify_all 显式唤醒的情况下,从 wait 函数返回。 这听起来很奇怪,但它确实可能发生,原因在于操作系统的调度机制。

如何处理虚假唤醒?

永远不要假设被唤醒后条件就一定满足了! 必须 在被唤醒后重新检查条件。 这就是为什么 wait 函数通常与一个 lambda 表达式一起使用,这个 lambda 表达式会检查条件是否真的满足。

正确的写法:

std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, []{ return condition; }); // 循环检查条件
// 现在可以安全地访问共享数据

错误的写法:

std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock); // 错误!没有检查条件
// 假设条件已经满足,但实际上可能没有

六、notify_one vs notify_all:如何选择?

  • notify_one(): 唤醒一个等待的线程。 适用于只有一个线程需要处理的情况,例如生产者-消费者模型中,只需要唤醒一个消费者或一个生产者。
  • notify_all(): 唤醒所有等待的线程。 适用于所有等待的线程都需要处理的情况,例如多个线程等待同一个条件变为真。

选择原则:

  • 如果只有一个线程能满足条件,使用 notify_one(),效率更高。
  • 如果有多个线程都能满足条件,使用 notify_all(),确保所有符合条件的线程都能被唤醒。
  • 如果不确定,或者为了避免死锁,也可以使用 notify_all(),但效率可能会降低。

七、总结 (Conclusion)

std::condition_variable 是 C++ 并发编程中一个非常有用的工具,它可以帮助我们实现线程间的同步和通信,避免忙等待,提高程序的效率。 但是,它也需要我们小心使用,避免虚假唤醒和死锁等问题。

核心要点:

  • 始终使用 std::unique_lock<std::mutex> 保护共享数据。
  • wait 函数中使用 lambda 表达式检查条件是否真的满足。
  • 根据实际情况选择 notify_one()notify_all()
  • 理解虚假唤醒并正确处理。

表格总结:

函数 作用 备注
wait(lock) 等待直到被唤醒 释放互斥锁并进入休眠状态,被唤醒后重新获取互斥锁。
wait(lock, predicate) 等待直到 predicate 返回 true 防止虚假唤醒,循环检查条件。
wait_for(lock, duration, predicate) 等待一段时间或直到 predicate 返回 true 超时等待,如果在指定的时间内没有被唤醒,则返回 false
wait_until(lock, time_point, predicate) 等待到指定的时间点或直到 predicate 返回 true 超时等待,如果在指定的时间点之前没有被唤醒,则返回 false
notify_one() 唤醒一个等待的线程
notify_all() 唤醒所有等待的线程

希望今天的讲座对大家有所帮助! 掌握 std::condition_variable,你就能写出更加高效、稳定的并发程序。 下次再见!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注