各位观众,各位朋友,欢迎来到今天的“C++并发原语与高效发布-订阅模式”讲座!今天咱们不搞虚的,直接上干货,用C++并发原语给你撸一个高性能的事件系统,让你的代码跑得飞起,告别卡顿!
啥是发布-订阅模式?
在咱们开始造轮子之前,先简单聊聊“发布-订阅”模式。这就像你订阅了一个报纸,报社(发布者)一有新消息(事件),就会自动送到你的邮箱(订阅者)。简单来说:
- 发布者 (Publisher): 负责产生事件。
- 订阅者 (Subscriber): 负责接收特定类型的事件。
- 事件 (Event): 包含特定信息的对象。
- 消息队列/事件总线 (Message Queue/Event Bus): 连接发布者和订阅者的中间件。
好处嘛,那就是解耦!发布者和订阅者不需要知道彼此的存在,全靠事件总线这个“媒婆”牵线搭桥。
为啥要用并发原语?
单线程的发布-订阅模式?那还不如用函数指针回调呢!我们追求的是高性能,充分利用多核CPU的优势。所以,并发原语是关键。我们要保证:
- 线程安全: 多个线程同时发布和订阅事件,数据不能乱。
- 高效: 尽可能减少锁的竞争,提高吞吐量。
并发原语选型:
并发原语 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
std::mutex |
简单易用,适用性广。 | 竞争激烈时性能下降明显。 | 少量线程竞争,对性能要求不高的场景。 |
std::shared_mutex |
允许多个读线程同时访问,写线程独占访问。 | 写线程饥饿问题,读线程也可能阻塞。 | 读多写少的场景。 |
std::atomic |
提供原子操作,无需加锁,性能高。 | 只能用于简单的数据类型,不支持复杂的同步操作。 | 计数器、标志位等简单同步场景。 |
std::condition_variable |
允许线程在特定条件下等待,避免忙等待。 | 使用复杂,容易出错。 | 需要等待特定条件满足才能继续执行的场景。 |
无锁数据结构 | 通过CAS (Compare-and-Swap) 等原子操作实现无锁并发,性能极高。 | 实现复杂,调试困难。 | 高并发,对性能要求极高的场景。 |
咱们今天的例子,为了兼顾简单性和性能,主要使用 std::mutex
和 std::shared_mutex
。 对于锁的粒度,采用的是读写锁, 订阅者可以同时进行,发布者独占, 从而提高并发性
代码实现:一个简易的事件系统
好,废话不多说,直接上代码!
#include <iostream>
#include <vector>
#include <string>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <algorithm>
// 事件基类
class Event {
public:
virtual ~Event() = default;
virtual std::string getType() const = 0; // 获取事件类型
};
// 示例事件:消息事件
class MessageEvent : public Event {
public:
MessageEvent(std::string message) : message_(std::move(message)) {}
std::string getType() const override { return "MessageEvent"; }
std::string getMessage() const { return message_; }
private:
std::string message_;
};
// 订阅者接口
class Subscriber {
public:
virtual ~Subscriber() = default;
virtual void onEvent(const std::shared_ptr<Event>& event) = 0;
};
// 事件总线 (发布-订阅中心)
class EventBus {
public:
// 注册订阅者
void subscribe(const std::string& eventType, Subscriber* subscriber) {
std::lock_guard<std::mutex> lock(mutex_);
subscribers_[eventType].push_back(subscriber);
}
// 取消订阅
void unsubscribe(const std::string& eventType, Subscriber* subscriber) {
std::lock_guard<std::mutex> lock(mutex_);
auto& subs = subscribers_[eventType];
subs.erase(std::remove(subs.begin(), subs.end(), subscriber), subs.end());
}
// 发布事件
void publish(std::shared_ptr<Event> event) {
std::shared_lock<std::shared_mutex> lock(shared_mutex_);
const std::string& eventType = event->getType();
auto it = subscribers_.find(eventType);
if (it != subscribers_.end()) {
for (Subscriber* subscriber : it->second) {
subscriber->onEvent(event);
}
}
}
private:
std::mutex mutex_; // 保护 subscribers_ 的互斥锁
std::shared_mutex shared_mutex_; //用于读写锁,允许多个读, 单个写
std::unordered_map<std::string, std::vector<Subscriber*>> subscribers_;
};
// 示例订阅者
class MessageSubscriber : public Subscriber {
public:
void onEvent(const std::shared_ptr<Event>& event) override {
if (event->getType() == "MessageEvent") {
auto messageEvent = std::dynamic_pointer_cast<MessageEvent>(event);
std::cout << "MessageSubscriber received: " << messageEvent->getMessage() << std::endl;
}
}
};
int main() {
EventBus eventBus;
// 创建订阅者
MessageSubscriber subscriber1;
MessageSubscriber subscriber2;
// 订阅事件
eventBus.subscribe("MessageEvent", &subscriber1);
eventBus.subscribe("MessageEvent", &subscriber2);
// 发布事件
auto event1 = std::make_shared<MessageEvent>("Hello, world!");
eventBus.publish(event1);
auto event2 = std::make_shared<MessageEvent>("Goodbye, world!");
eventBus.publish(event2);
// 取消订阅
eventBus.unsubscribe("MessageEvent", &subscriber1);
// 再次发布事件
auto event3 = std::make_shared<MessageEvent>("Only subscriber2 will receive this.");
eventBus.publish(event3);
return 0;
}
代码讲解:
Event
类: 所有事件的基类,getType()
方法用于获取事件类型。MessageEvent
类: 一个简单的消息事件,包含一个字符串消息。Subscriber
类: 订阅者接口,onEvent()
方法用于处理接收到的事件。MessageSubscriber
类: 一个简单的订阅者,接收MessageEvent
并打印消息。EventBus
类: 事件总线,负责管理订阅者和发布事件。subscribe()
方法:注册订阅者,将订阅者添加到对应事件类型的订阅者列表中。unsubscribe()
方法:取消订阅,从订阅者列表中移除订阅者。publish()
方法:发布事件,遍历对应事件类型的订阅者列表,调用每个订阅者的onEvent()
方法。
main()
函数: 创建事件总线、订阅者,订阅事件,发布事件,取消订阅,再次发布事件。
并发安全分析:
mutex_
: 用于保护subscribers_
成员变量,保证在注册、取消订阅时线程安全。使用场景是写操作,加锁保护写操作shared_mutex_
: 用于保护subscribers_
成员变量,保证在发布事件时线程安全。 使用场景是读操作,多个线程可以同时读取订阅者列表,但只有一个线程可以写入。
进一步优化:
上面的代码只是一个简单的示例,还有很多地方可以优化。
- 线程池:
publish()
方法中直接调用订阅者的onEvent()
方法,如果onEvent()
方法耗时较长,会阻塞发布线程。可以使用线程池来异步执行onEvent()
方法,提高吞吐量。
#include <thread>
#include <future>
class EventBus {
public:
// ... (之前的代码)
// 发布事件 (使用线程池)
void publish(std::shared_ptr<Event> event) {
std::shared_lock<std::shared_mutex> lock(shared_mutex_);
const std::string& eventType = event->getType();
auto it = subscribers_.find(eventType);
if (it != subscribers_.end()) {
for (Subscriber* subscriber : it->second) {
// 使用线程池异步执行 onEvent()
thread_pool_.enqueue([subscriber, event]() {
subscriber->onEvent(event);
});
}
}
}
private:
// ... (之前的代码)
//简易线程池
class ThreadPool {
public:
ThreadPool(size_t num_threads) : stop_(false) {
threads_.reserve(num_threads);
for (size_t i = 0; i < num_threads; ++i) {
threads_.emplace_back([this]() {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queue_mutex_);
condition_.wait(lock, [this]() { return stop_ || !tasks_.empty(); });
if (stop_ && tasks_.empty()) return;
task = std::move(tasks_.front());
tasks_.pop();
}
task();
}
});
}
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex_);
stop_ = true;
}
condition_.notify_all();
for (std::thread& thread : threads_) {
thread.join();
}
}
template<typename F, typename... Args>
auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex_);
// don't allow enqueueing after stopping the pool
if (stop_)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks_.emplace([task]() { (*task)(); });
}
condition_.notify_one();
return res;
}
private:
std::vector<std::thread> threads_;
std::queue<std::function<void()>> tasks_;
std::mutex queue_mutex_;
std::condition_variable condition_;
bool stop_;
};
ThreadPool thread_pool_{4}; // 创建一个包含 4 个线程的线程池
};
- 无锁数据结构: 对于高并发的场景,可以考虑使用无锁数据结构,例如无锁队列,来存储订阅者列表。但这会增加实现的复杂度。
- 事件过滤: 可以为订阅者添加事件过滤功能,只接收满足特定条件的事件。
- 事件优先级: 可以为事件设置优先级,优先级高的事件优先处理。
- 更灵活的订阅方式: 不仅仅根据事件类型订阅,还可以根据事件内容进行订阅。
- 更强的错误处理机制: 当订阅者处理事件失败时,可以进行重试或者记录日志。
代码示例:事件过滤
// 订阅者接口 (添加事件过滤器)
class Subscriber {
public:
virtual ~Subscriber() = default;
virtual void onEvent(const std::shared_ptr<Event>& event) = 0;
virtual bool accept(const std::shared_ptr<Event>& event) const { return true; } // 默认接受所有事件
};
// 事件总线 (添加事件过滤)
class EventBus {
public:
// 发布事件 (添加事件过滤)
void publish(std::shared_ptr<Event> event) {
std::shared_lock<std::shared_mutex> lock(shared_mutex_);
const std::string& eventType = event->getType();
auto it = subscribers_.find(eventType);
if (it != subscribers_.end()) {
for (Subscriber* subscriber : it->second) {
if (subscriber->accept(event)) { // 添加事件过滤
subscriber->onEvent(event);
}
}
}
}
// 注册订阅者
void subscribe(const std::string& eventType, Subscriber* subscriber) {
std::lock_guard<std::mutex> lock(mutex_);
subscribers_[eventType].push_back(subscriber);
}
// 取消订阅
void unsubscribe(const std::string& eventType, Subscriber* subscriber) {
std::lock_guard<std::mutex> lock(mutex_);
auto& subs = subscribers_[eventType];
subs.erase(std::remove(subs.begin(), subs.end(), subscriber), subs.end());
}
private:
std::mutex mutex_; // 保护 subscribers_ 的互斥锁
std::shared_mutex shared_mutex_; //用于读写锁,允许多个读, 单个写
std::unordered_map<std::string, std::vector<Subscriber*>> subscribers_;
};
// 示例订阅者 (添加事件过滤器)
class FilteredMessageSubscriber : public Subscriber {
public:
FilteredMessageSubscriber(std::string keyword) : keyword_(std::move(keyword)) {}
void onEvent(const std::shared_ptr<Event>& event) override {
if (event->getType() == "MessageEvent") {
auto messageEvent = std::dynamic_pointer_cast<MessageEvent>(event);
std::cout << "FilteredMessageSubscriber received: " << messageEvent->getMessage() << std::endl;
}
}
bool accept(const std::shared_ptr<Event>& event) const override {
if (event->getType() == "MessageEvent") {
auto messageEvent = std::dynamic_pointer_cast<MessageEvent>(event);
return messageEvent->getMessage().find(keyword_) != std::string::npos; // 只接受包含关键字的消息
}
return false;
}
private:
std::string keyword_;
};
int main() {
EventBus eventBus;
// 创建订阅者 (添加事件过滤器)
FilteredMessageSubscriber subscriber("important");
// 订阅事件
eventBus.subscribe("MessageEvent", &subscriber);
// 发布事件
auto event1 = std::make_shared<MessageEvent>("This is an important message.");
eventBus.publish(event1);
auto event2 = std::make_shared<MessageEvent>("This is a normal message.");
eventBus.publish(event2);
return 0;
}
总结:
今天我们一起用C++并发原语实现了一个简单的事件系统。虽然只是一个雏形,但已经具备了发布、订阅、取消订阅等基本功能,并且考虑了线程安全。记住,并发编程的重点在于:
- 选择合适的并发原语: 根据实际场景选择最合适的并发原语,避免过度使用锁。
- 控制锁的粒度: 尽可能缩小锁的范围,减少锁的竞争。
- 避免死锁: 注意锁的获取顺序,避免死锁。
希望今天的讲座对你有所帮助。记住,编程是一门实践的艺术,多写代码,多踩坑,才能真正掌握并发编程的精髓。 下课!