C++ `std::counting_semaphore` (C++20):计数信号量的灵活运用

好的,各位观众老爷们,今天咱们来聊聊 C++20 引入的 std::counting_semaphore,这玩意儿可不是你奶奶用的缝纫机,它是一个线程同步的利器,让你在多线程的世界里游刃有余。

什么是信号量?

首先,我们要搞清楚什么是信号量。你可以把它想象成一个停车场管理员,他手里拿着一定数量的停车位钥匙。

  • acquire() (停车): 线程想停车(访问共享资源)的时候,就向管理员要一把钥匙,如果管理员手里还有钥匙,就给它一把,线程就可以停车了。如果管理员手里没钥匙了,线程就只能乖乖等待,直到有车开走(释放钥匙)。
  • release() (还车): 线程停完车(使用完共享资源)后,就把钥匙还给管理员,这样其他等待的线程就可以拿到钥匙停车了。

std::counting_semaphore 就是这个停车场管理员,它管理着一个计数器,这个计数器就代表着可用的资源数量。

std::counting_semaphore 的基本用法

std::counting_semaphore 有两个主要的成员函数:

  • acquire(): 减少计数器,如果计数器为 0,则阻塞当前线程,直到计数器大于 0。
  • release(): 增加计数器,如果有一个或多个线程在等待,则唤醒其中一个线程。

下面是一个简单的例子:

#include <iostream>
#include <thread>
#include <semaphore>

std::counting_semaphore<3> semaphore(3); // 初始化信号量,计数器为 3

void worker(int id) {
    std::cout << "线程 " << id << " 尝试获取资源..." << std::endl;
    semaphore.acquire(); // 获取资源,如果计数器为 0,则阻塞
    std::cout << "线程 " << id << " 获取到资源!" << std::endl;

    // 模拟使用资源
    std::this_thread::sleep_for(std::chrono::seconds(2));

    std::cout << "线程 " << id << " 释放资源!" << std::endl;
    semaphore.release(); // 释放资源,增加计数器
}

int main() {
    std::thread t1(worker, 1);
    std::thread t2(worker, 2);
    std::thread t3(worker, 3);
    std::thread t4(worker, 4);  // 超过信号量初始值的线程

    t1.join();
    t2.join();
    t3.join();
    t4.join();

    std::cout << "所有线程执行完毕!" << std::endl;
    return 0;
}

在这个例子中,我们创建了一个计数器初始值为 3 的 std::counting_semaphore。这意味着最多可以有 3 个线程同时访问共享资源。线程 4 会被阻塞,直到前 3 个线程中的某个线程释放资源。

std::counting_semaphore 的构造函数

std::counting_semaphore 有两种构造函数:

  • counting_semaphore(long long initial_count);: 使用给定的初始计数器值构造信号量。
  • counting_semaphore(long long initial_count, long long max_count);: 使用给定的初始计数器值和最大计数器值构造信号量。 如果只提供一个参数,最大计数器值默认等于 initial_count

例如:

std::counting_semaphore<5> semaphore1(5); // 初始计数器为 5,最大计数器为 5
std::counting_semaphore<10, 20> semaphore2(10); // 初始计数器为 10,最大计数器为 20

try_acquire() 的妙用

有时候,我们不想让线程一直阻塞在那里,而是希望它尝试获取资源,如果获取不到就立即返回。这时候,try_acquire() 就派上用场了。

try_acquire() 尝试减少计数器。如果计数器大于 0,则减少计数器并返回 true。如果计数器为 0,则立即返回 false,不会阻塞线程。

#include <iostream>
#include <thread>
#include <semaphore>

std::counting_semaphore<1> semaphore(1);

void worker(int id) {
    std::cout << "线程 " << id << " 尝试获取资源..." << std::endl;
    if (semaphore.try_acquire()) {
        std::cout << "线程 " << id << " 成功获取到资源!" << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(2));
        std::cout << "线程 " << id << " 释放资源!" << std::endl;
        semaphore.release();
    } else {
        std::cout << "线程 " << id << " 获取资源失败,稍后重试!" << std::endl;
    }
}

int main() {
    std::thread t1(worker, 1);
    std::thread t2(worker, 2);

    t1.join();
    t2.join();

    std::cout << "所有线程执行完毕!" << std::endl;
    return 0;
}

在这个例子中,线程 2 尝试获取资源,但由于线程 1 已经占用了资源,try_acquire() 返回 false,线程 2 不会被阻塞,而是直接执行后面的代码。

try_acquire_for()try_acquire_until() 的超时机制

有时候,我们希望线程在等待资源一段时间后,如果仍然获取不到资源,就放弃等待。try_acquire_for()try_acquire_until() 提供了超时机制。

  • try_acquire_for(duration): 尝试在给定的时间内获取资源。如果在这段时间内获取到资源,则返回 true,否则返回 false
  • try_acquire_until(time_point): 尝试在给定的时间点之前获取资源。如果在该时间点之前获取到资源,则返回 true,否则返回 false
#include <iostream>
#include <thread>
#include <semaphore>
#include <chrono>

std::counting_semaphore<1> semaphore(1);

void worker(int id) {
    std::cout << "线程 " << id << " 尝试获取资源..." << std::endl;
    if (semaphore.try_acquire_for(std::chrono::seconds(1))) {
        std::cout << "线程 " << id << " 成功获取到资源!" << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(2));
        std::cout << "线程 " << id << " 释放资源!" << std::endl;
        semaphore.release();
    } else {
        std::cout << "线程 " << id << " 获取资源超时,放弃等待!" << std::endl;
    }
}

int main() {
    // 模拟线程 1 先占用资源
    std::thread t1([](){
        semaphore.acquire();
        std::this_thread::sleep_for(std::chrono::seconds(3));
        semaphore.release();
    });

    std::thread t2(worker, 2);

    t1.join();
    t2.join();

    std::cout << "所有线程执行完毕!" << std::endl;
    return 0;
}

在这个例子中,线程 1 先获取资源并持有 3 秒。线程 2 尝试在 1 秒内获取资源,但由于资源被线程 1 占用,所以超时了,try_acquire_for() 返回 false

release(n) 一次释放多个资源

release() 函数还有一个重载版本 release(n),可以一次释放多个资源。这在某些场景下非常有用,例如:

#include <iostream>
#include <thread>
#include <semaphore>
#include <vector>

std::counting_semaphore<0, 10> semaphore(0); // 初始计数器为 0,最大计数器为 10
std::vector<int> data;

void producer() {
    for (int i = 0; i < 10; ++i) {
        data.push_back(i);
        std::cout << "生产者生产了数据: " << i << std::endl;
    }
    semaphore.release(10); // 一次释放 10 个资源
}

void consumer() {
    semaphore.acquire(10); // 消费者需要 10 个资源才能开始消费
    std::cout << "消费者开始消费数据..." << std::endl;
    for (int i = 0; i < data.size(); ++i) {
        std::cout << "消费者消费了数据: " << data[i] << std::endl;
    }
}

int main() {
    std::thread t1(producer);
    std::thread t2(consumer);

    t1.join();
    t2.join();

    std::cout << "所有线程执行完毕!" << std::endl;
    return 0;
}

在这个例子中,生产者生产 10 个数据后,一次性释放 10 个资源,然后消费者一次性获取 10 个资源并消费数据。注意这里初始值设为0,最大值设为10。

std::binary_semaphore:二元信号量

std::binary_semaphorestd::counting_semaphore 的一个特例,它的计数器只能是 0 或 1。 它等价于 std::counting_semaphore<1>,但是更高效。 通常用于互斥锁的场景。

#include <iostream>
#include <thread>
#include <semaphore>

std::binary_semaphore semaphore(1); // 初始化信号量,计数器为 1

void worker(int id) {
    std::cout << "线程 " << id << " 尝试获取资源..." << std::endl;
    semaphore.acquire(); // 获取资源,如果计数器为 0,则阻塞
    std::cout << "线程 " << id << " 获取到资源!" << std::endl;

    // 模拟使用资源
    std::this_thread::sleep_for(std::chrono::seconds(2));

    std::cout << "线程 " << id << " 释放资源!" << std::endl;
    semaphore.release(); // 释放资源,增加计数器
}

int main() {
    std::thread t1(worker, 1);
    std::thread t2(worker, 2);

    t1.join();
    t2.join();

    std::cout << "所有线程执行完毕!" << std::endl;
    return 0;
}

std::counting_semaphore 的使用场景

std::counting_semaphore 可以用于以下场景:

  • 限制并发访问数量: 例如,限制同时访问数据库的连接数。
  • 生产者-消费者模型: 生产者生产数据,消费者消费数据,信号量用于控制生产和消费的速度。
  • 资源池: 管理一组可重用的资源,例如线程池。
  • 事件通知: 一个线程等待某个事件发生,另一个线程在事件发生时释放信号量。

std::counting_semaphore vs std::mutex

特性 std::counting_semaphore std::mutex
功能 控制对资源的并发访问数量 提供互斥访问,保证同一时间只有一个线程访问资源
计数器 维护一个计数器,表示可用资源数量 没有计数器,只有锁定和解锁状态
所有权 没有所有权的概念,任何线程都可以 release() 具有所有权的概念,只有锁定它的线程才能 unlock()
使用场景 限制并发访问数量、生产者-消费者模型、资源池等 保护共享数据,防止数据竞争
灵活性 更加灵活,可以控制并发访问的数量,支持超时等待等 更加简单,适用于简单的互斥访问场景

死锁的防范

使用信号量也需要注意死锁问题。死锁是指两个或多个线程互相等待对方释放资源,导致所有线程都无法继续执行的情况。

以下是一些防范死锁的技巧:

  • 避免循环等待: 不要让线程互相等待对方持有的资源。
  • 按相同的顺序获取资源: 如果多个线程需要获取多个资源,确保它们按相同的顺序获取资源。
  • 使用超时机制: 如果线程在等待资源一段时间后仍然获取不到资源,就放弃等待。

实际案例:下载限速器

让我们来一个实际的例子,实现一个下载限速器。假设我们有一个下载程序,需要限制其下载速度,防止占用过多的网络带宽。

#include <iostream>
#include <thread>
#include <semaphore>
#include <chrono>
#include <random>

// 定义每秒允许下载的字节数
const int BYTES_PER_SECOND = 1024 * 1024; // 1MB/s

// 定义信号量,初始值为每秒允许下载的字节数
std::counting_semaphore<BYTES_PER_SECOND> download_semaphore(BYTES_PER_SECOND);

// 模拟下载函数
void download(int bytes_to_download) {
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> distrib(1, 100); // 模拟下载速率波动

    for (int i = 0; i < bytes_to_download; ++i) {
        // 尝试获取一个字节的下载许可
        download_semaphore.acquire();

        // 模拟下载过程
        //std::cout << "."; // 可以打开看效果,但是会刷屏
        std::this_thread::sleep_for(std::chrono::microseconds(distrib(gen))); //模拟下载速率波动
    }
    std::cout << "下载完成,下载大小: " << bytes_to_download << " 字节" << std::endl;
}

int main() {
    // 创建多个线程模拟并发下载
    std::thread t1(download, 5 * 1024 * 1024); // 5MB
    std::thread t2(download, 3 * 1024 * 1024); // 3MB

    t1.join();
    t2.join();

    std::cout << "所有下载任务完成!" << std::endl;

    // 模拟定时释放下载许可,保持下载速度
    while (true) {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        download_semaphore.release(BYTES_PER_SECOND);
        //std::cout << "重新填充信号量" << std::endl; // 可以打开看效果
    }

    return 0;
}

这个例子中,我们使用 std::counting_semaphore 来控制下载速度。每秒钟释放一定数量的下载许可,下载线程需要获取下载许可才能下载数据。这样就可以限制下载速度,防止占用过多的网络带宽。需要注意的是,这个例子简化了下载过程,实际的下载程序需要更复杂的逻辑。为了模拟下载过程中的速率波动,这里使用了随机数。

总结

std::counting_semaphore 是一个强大的线程同步工具,可以用于控制并发访问数量、实现生产者-消费者模型、管理资源池等。熟练掌握 std::counting_semaphore 的使用方法,可以让你在多线程编程中更加得心应手。 记住,灵活运用,才能让你的代码更加高效稳定。 希望这篇文章能帮助你更好地理解和使用 std::counting_semaphore

当然,这只是冰山一角,std::counting_semaphore 还有很多高级用法,等待你去探索。希望你能通过本文的学习,掌握 std::counting_semaphore 的基本用法,并在实际项目中灵活运用。 祝你编程愉快!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注