好的,各位观众老爷们,今天咱们来聊聊 C++20 引入的 std::counting_semaphore
,这玩意儿可不是你奶奶用的缝纫机,它是一个线程同步的利器,让你在多线程的世界里游刃有余。
什么是信号量?
首先,我们要搞清楚什么是信号量。你可以把它想象成一个停车场管理员,他手里拿着一定数量的停车位钥匙。
- acquire() (停车): 线程想停车(访问共享资源)的时候,就向管理员要一把钥匙,如果管理员手里还有钥匙,就给它一把,线程就可以停车了。如果管理员手里没钥匙了,线程就只能乖乖等待,直到有车开走(释放钥匙)。
- release() (还车): 线程停完车(使用完共享资源)后,就把钥匙还给管理员,这样其他等待的线程就可以拿到钥匙停车了。
std::counting_semaphore
就是这个停车场管理员,它管理着一个计数器,这个计数器就代表着可用的资源数量。
std::counting_semaphore
的基本用法
std::counting_semaphore
有两个主要的成员函数:
acquire()
: 减少计数器,如果计数器为 0,则阻塞当前线程,直到计数器大于 0。release()
: 增加计数器,如果有一个或多个线程在等待,则唤醒其中一个线程。
下面是一个简单的例子:
#include <iostream>
#include <thread>
#include <semaphore>
std::counting_semaphore<3> semaphore(3); // 初始化信号量,计数器为 3
void worker(int id) {
std::cout << "线程 " << id << " 尝试获取资源..." << std::endl;
semaphore.acquire(); // 获取资源,如果计数器为 0,则阻塞
std::cout << "线程 " << id << " 获取到资源!" << std::endl;
// 模拟使用资源
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "线程 " << id << " 释放资源!" << std::endl;
semaphore.release(); // 释放资源,增加计数器
}
int main() {
std::thread t1(worker, 1);
std::thread t2(worker, 2);
std::thread t3(worker, 3);
std::thread t4(worker, 4); // 超过信号量初始值的线程
t1.join();
t2.join();
t3.join();
t4.join();
std::cout << "所有线程执行完毕!" << std::endl;
return 0;
}
在这个例子中,我们创建了一个计数器初始值为 3 的 std::counting_semaphore
。这意味着最多可以有 3 个线程同时访问共享资源。线程 4 会被阻塞,直到前 3 个线程中的某个线程释放资源。
std::counting_semaphore
的构造函数
std::counting_semaphore
有两种构造函数:
counting_semaphore(long long initial_count);
: 使用给定的初始计数器值构造信号量。counting_semaphore(long long initial_count, long long max_count);
: 使用给定的初始计数器值和最大计数器值构造信号量。 如果只提供一个参数,最大计数器值默认等于initial_count
。
例如:
std::counting_semaphore<5> semaphore1(5); // 初始计数器为 5,最大计数器为 5
std::counting_semaphore<10, 20> semaphore2(10); // 初始计数器为 10,最大计数器为 20
try_acquire()
的妙用
有时候,我们不想让线程一直阻塞在那里,而是希望它尝试获取资源,如果获取不到就立即返回。这时候,try_acquire()
就派上用场了。
try_acquire()
尝试减少计数器。如果计数器大于 0,则减少计数器并返回 true
。如果计数器为 0,则立即返回 false
,不会阻塞线程。
#include <iostream>
#include <thread>
#include <semaphore>
std::counting_semaphore<1> semaphore(1);
void worker(int id) {
std::cout << "线程 " << id << " 尝试获取资源..." << std::endl;
if (semaphore.try_acquire()) {
std::cout << "线程 " << id << " 成功获取到资源!" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "线程 " << id << " 释放资源!" << std::endl;
semaphore.release();
} else {
std::cout << "线程 " << id << " 获取资源失败,稍后重试!" << std::endl;
}
}
int main() {
std::thread t1(worker, 1);
std::thread t2(worker, 2);
t1.join();
t2.join();
std::cout << "所有线程执行完毕!" << std::endl;
return 0;
}
在这个例子中,线程 2 尝试获取资源,但由于线程 1 已经占用了资源,try_acquire()
返回 false
,线程 2 不会被阻塞,而是直接执行后面的代码。
try_acquire_for()
和 try_acquire_until()
的超时机制
有时候,我们希望线程在等待资源一段时间后,如果仍然获取不到资源,就放弃等待。try_acquire_for()
和 try_acquire_until()
提供了超时机制。
try_acquire_for(duration)
: 尝试在给定的时间内获取资源。如果在这段时间内获取到资源,则返回true
,否则返回false
。try_acquire_until(time_point)
: 尝试在给定的时间点之前获取资源。如果在该时间点之前获取到资源,则返回true
,否则返回false
。
#include <iostream>
#include <thread>
#include <semaphore>
#include <chrono>
std::counting_semaphore<1> semaphore(1);
void worker(int id) {
std::cout << "线程 " << id << " 尝试获取资源..." << std::endl;
if (semaphore.try_acquire_for(std::chrono::seconds(1))) {
std::cout << "线程 " << id << " 成功获取到资源!" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "线程 " << id << " 释放资源!" << std::endl;
semaphore.release();
} else {
std::cout << "线程 " << id << " 获取资源超时,放弃等待!" << std::endl;
}
}
int main() {
// 模拟线程 1 先占用资源
std::thread t1([](){
semaphore.acquire();
std::this_thread::sleep_for(std::chrono::seconds(3));
semaphore.release();
});
std::thread t2(worker, 2);
t1.join();
t2.join();
std::cout << "所有线程执行完毕!" << std::endl;
return 0;
}
在这个例子中,线程 1 先获取资源并持有 3 秒。线程 2 尝试在 1 秒内获取资源,但由于资源被线程 1 占用,所以超时了,try_acquire_for()
返回 false
。
release(n)
一次释放多个资源
release()
函数还有一个重载版本 release(n)
,可以一次释放多个资源。这在某些场景下非常有用,例如:
#include <iostream>
#include <thread>
#include <semaphore>
#include <vector>
std::counting_semaphore<0, 10> semaphore(0); // 初始计数器为 0,最大计数器为 10
std::vector<int> data;
void producer() {
for (int i = 0; i < 10; ++i) {
data.push_back(i);
std::cout << "生产者生产了数据: " << i << std::endl;
}
semaphore.release(10); // 一次释放 10 个资源
}
void consumer() {
semaphore.acquire(10); // 消费者需要 10 个资源才能开始消费
std::cout << "消费者开始消费数据..." << std::endl;
for (int i = 0; i < data.size(); ++i) {
std::cout << "消费者消费了数据: " << data[i] << std::endl;
}
}
int main() {
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();
std::cout << "所有线程执行完毕!" << std::endl;
return 0;
}
在这个例子中,生产者生产 10 个数据后,一次性释放 10 个资源,然后消费者一次性获取 10 个资源并消费数据。注意这里初始值设为0,最大值设为10。
std::binary_semaphore
:二元信号量
std::binary_semaphore
是 std::counting_semaphore
的一个特例,它的计数器只能是 0 或 1。 它等价于 std::counting_semaphore<1>
,但是更高效。 通常用于互斥锁的场景。
#include <iostream>
#include <thread>
#include <semaphore>
std::binary_semaphore semaphore(1); // 初始化信号量,计数器为 1
void worker(int id) {
std::cout << "线程 " << id << " 尝试获取资源..." << std::endl;
semaphore.acquire(); // 获取资源,如果计数器为 0,则阻塞
std::cout << "线程 " << id << " 获取到资源!" << std::endl;
// 模拟使用资源
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "线程 " << id << " 释放资源!" << std::endl;
semaphore.release(); // 释放资源,增加计数器
}
int main() {
std::thread t1(worker, 1);
std::thread t2(worker, 2);
t1.join();
t2.join();
std::cout << "所有线程执行完毕!" << std::endl;
return 0;
}
std::counting_semaphore
的使用场景
std::counting_semaphore
可以用于以下场景:
- 限制并发访问数量: 例如,限制同时访问数据库的连接数。
- 生产者-消费者模型: 生产者生产数据,消费者消费数据,信号量用于控制生产和消费的速度。
- 资源池: 管理一组可重用的资源,例如线程池。
- 事件通知: 一个线程等待某个事件发生,另一个线程在事件发生时释放信号量。
std::counting_semaphore
vs std::mutex
特性 | std::counting_semaphore |
std::mutex |
---|---|---|
功能 | 控制对资源的并发访问数量 | 提供互斥访问,保证同一时间只有一个线程访问资源 |
计数器 | 维护一个计数器,表示可用资源数量 | 没有计数器,只有锁定和解锁状态 |
所有权 | 没有所有权的概念,任何线程都可以 release() |
具有所有权的概念,只有锁定它的线程才能 unlock() |
使用场景 | 限制并发访问数量、生产者-消费者模型、资源池等 | 保护共享数据,防止数据竞争 |
灵活性 | 更加灵活,可以控制并发访问的数量,支持超时等待等 | 更加简单,适用于简单的互斥访问场景 |
死锁的防范
使用信号量也需要注意死锁问题。死锁是指两个或多个线程互相等待对方释放资源,导致所有线程都无法继续执行的情况。
以下是一些防范死锁的技巧:
- 避免循环等待: 不要让线程互相等待对方持有的资源。
- 按相同的顺序获取资源: 如果多个线程需要获取多个资源,确保它们按相同的顺序获取资源。
- 使用超时机制: 如果线程在等待资源一段时间后仍然获取不到资源,就放弃等待。
实际案例:下载限速器
让我们来一个实际的例子,实现一个下载限速器。假设我们有一个下载程序,需要限制其下载速度,防止占用过多的网络带宽。
#include <iostream>
#include <thread>
#include <semaphore>
#include <chrono>
#include <random>
// 定义每秒允许下载的字节数
const int BYTES_PER_SECOND = 1024 * 1024; // 1MB/s
// 定义信号量,初始值为每秒允许下载的字节数
std::counting_semaphore<BYTES_PER_SECOND> download_semaphore(BYTES_PER_SECOND);
// 模拟下载函数
void download(int bytes_to_download) {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> distrib(1, 100); // 模拟下载速率波动
for (int i = 0; i < bytes_to_download; ++i) {
// 尝试获取一个字节的下载许可
download_semaphore.acquire();
// 模拟下载过程
//std::cout << "."; // 可以打开看效果,但是会刷屏
std::this_thread::sleep_for(std::chrono::microseconds(distrib(gen))); //模拟下载速率波动
}
std::cout << "下载完成,下载大小: " << bytes_to_download << " 字节" << std::endl;
}
int main() {
// 创建多个线程模拟并发下载
std::thread t1(download, 5 * 1024 * 1024); // 5MB
std::thread t2(download, 3 * 1024 * 1024); // 3MB
t1.join();
t2.join();
std::cout << "所有下载任务完成!" << std::endl;
// 模拟定时释放下载许可,保持下载速度
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
download_semaphore.release(BYTES_PER_SECOND);
//std::cout << "重新填充信号量" << std::endl; // 可以打开看效果
}
return 0;
}
这个例子中,我们使用 std::counting_semaphore
来控制下载速度。每秒钟释放一定数量的下载许可,下载线程需要获取下载许可才能下载数据。这样就可以限制下载速度,防止占用过多的网络带宽。需要注意的是,这个例子简化了下载过程,实际的下载程序需要更复杂的逻辑。为了模拟下载过程中的速率波动,这里使用了随机数。
总结
std::counting_semaphore
是一个强大的线程同步工具,可以用于控制并发访问数量、实现生产者-消费者模型、管理资源池等。熟练掌握 std::counting_semaphore
的使用方法,可以让你在多线程编程中更加得心应手。 记住,灵活运用,才能让你的代码更加高效稳定。 希望这篇文章能帮助你更好地理解和使用 std::counting_semaphore
。
当然,这只是冰山一角,std::counting_semaphore
还有很多高级用法,等待你去探索。希望你能通过本文的学习,掌握 std::counting_semaphore
的基本用法,并在实际项目中灵活运用。 祝你编程愉快!