C++ 发布-订阅模式:基于并发原语实现高效事件系统

各位观众,各位朋友,欢迎来到今天的“C++并发原语与高效发布-订阅模式”讲座!今天咱们不搞虚的,直接上干货,用C++并发原语给你撸一个高性能的事件系统,让你的代码跑得飞起,告别卡顿!

啥是发布-订阅模式?

在咱们开始造轮子之前,先简单聊聊“发布-订阅”模式。这就像你订阅了一个报纸,报社(发布者)一有新消息(事件),就会自动送到你的邮箱(订阅者)。简单来说:

  • 发布者 (Publisher): 负责产生事件。
  • 订阅者 (Subscriber): 负责接收特定类型的事件。
  • 事件 (Event): 包含特定信息的对象。
  • 消息队列/事件总线 (Message Queue/Event Bus): 连接发布者和订阅者的中间件。

好处嘛,那就是解耦!发布者和订阅者不需要知道彼此的存在,全靠事件总线这个“媒婆”牵线搭桥。

为啥要用并发原语?

单线程的发布-订阅模式?那还不如用函数指针回调呢!我们追求的是高性能,充分利用多核CPU的优势。所以,并发原语是关键。我们要保证:

  • 线程安全: 多个线程同时发布和订阅事件,数据不能乱。
  • 高效: 尽可能减少锁的竞争,提高吞吐量。

并发原语选型:

并发原语 优点 缺点 适用场景
std::mutex 简单易用,适用性广。 竞争激烈时性能下降明显。 少量线程竞争,对性能要求不高的场景。
std::shared_mutex 允许多个读线程同时访问,写线程独占访问。 写线程饥饿问题,读线程也可能阻塞。 读多写少的场景。
std::atomic 提供原子操作,无需加锁,性能高。 只能用于简单的数据类型,不支持复杂的同步操作。 计数器、标志位等简单同步场景。
std::condition_variable 允许线程在特定条件下等待,避免忙等待。 使用复杂,容易出错。 需要等待特定条件满足才能继续执行的场景。
无锁数据结构 通过CAS (Compare-and-Swap) 等原子操作实现无锁并发,性能极高。 实现复杂,调试困难。 高并发,对性能要求极高的场景。

咱们今天的例子,为了兼顾简单性和性能,主要使用 std::mutexstd::shared_mutex。 对于锁的粒度,采用的是读写锁, 订阅者可以同时进行,发布者独占, 从而提高并发性

代码实现:一个简易的事件系统

好,废话不多说,直接上代码!

#include <iostream>
#include <vector>
#include <string>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <algorithm>

// 事件基类
class Event {
public:
    virtual ~Event() = default;
    virtual std::string getType() const = 0; // 获取事件类型
};

// 示例事件:消息事件
class MessageEvent : public Event {
public:
    MessageEvent(std::string message) : message_(std::move(message)) {}

    std::string getType() const override { return "MessageEvent"; }
    std::string getMessage() const { return message_; }

private:
    std::string message_;
};

// 订阅者接口
class Subscriber {
public:
    virtual ~Subscriber() = default;
    virtual void onEvent(const std::shared_ptr<Event>& event) = 0;
};

// 事件总线 (发布-订阅中心)
class EventBus {
public:
    // 注册订阅者
    void subscribe(const std::string& eventType, Subscriber* subscriber) {
        std::lock_guard<std::mutex> lock(mutex_);
        subscribers_[eventType].push_back(subscriber);
    }

    // 取消订阅
    void unsubscribe(const std::string& eventType, Subscriber* subscriber) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto& subs = subscribers_[eventType];
        subs.erase(std::remove(subs.begin(), subs.end(), subscriber), subs.end());
    }

    // 发布事件
    void publish(std::shared_ptr<Event> event) {
        std::shared_lock<std::shared_mutex> lock(shared_mutex_);
        const std::string& eventType = event->getType();
        auto it = subscribers_.find(eventType);
        if (it != subscribers_.end()) {
            for (Subscriber* subscriber : it->second) {
                subscriber->onEvent(event);
            }
        }
    }

private:
    std::mutex mutex_; // 保护 subscribers_ 的互斥锁
    std::shared_mutex shared_mutex_; //用于读写锁,允许多个读, 单个写
    std::unordered_map<std::string, std::vector<Subscriber*>> subscribers_;
};

// 示例订阅者
class MessageSubscriber : public Subscriber {
public:
    void onEvent(const std::shared_ptr<Event>& event) override {
        if (event->getType() == "MessageEvent") {
            auto messageEvent = std::dynamic_pointer_cast<MessageEvent>(event);
            std::cout << "MessageSubscriber received: " << messageEvent->getMessage() << std::endl;
        }
    }
};

int main() {
    EventBus eventBus;

    // 创建订阅者
    MessageSubscriber subscriber1;
    MessageSubscriber subscriber2;

    // 订阅事件
    eventBus.subscribe("MessageEvent", &subscriber1);
    eventBus.subscribe("MessageEvent", &subscriber2);

    // 发布事件
    auto event1 = std::make_shared<MessageEvent>("Hello, world!");
    eventBus.publish(event1);

    auto event2 = std::make_shared<MessageEvent>("Goodbye, world!");
    eventBus.publish(event2);

    // 取消订阅
    eventBus.unsubscribe("MessageEvent", &subscriber1);

    // 再次发布事件
    auto event3 = std::make_shared<MessageEvent>("Only subscriber2 will receive this.");
    eventBus.publish(event3);

    return 0;
}

代码讲解:

  1. Event 类: 所有事件的基类,getType() 方法用于获取事件类型。
  2. MessageEvent 类: 一个简单的消息事件,包含一个字符串消息。
  3. Subscriber 类: 订阅者接口,onEvent() 方法用于处理接收到的事件。
  4. MessageSubscriber 类: 一个简单的订阅者,接收 MessageEvent 并打印消息。
  5. EventBus 类: 事件总线,负责管理订阅者和发布事件。
    • subscribe() 方法:注册订阅者,将订阅者添加到对应事件类型的订阅者列表中。
    • unsubscribe() 方法:取消订阅,从订阅者列表中移除订阅者。
    • publish() 方法:发布事件,遍历对应事件类型的订阅者列表,调用每个订阅者的 onEvent() 方法。
  6. main() 函数: 创建事件总线、订阅者,订阅事件,发布事件,取消订阅,再次发布事件。

并发安全分析:

  • mutex_: 用于保护 subscribers_ 成员变量,保证在注册、取消订阅时线程安全。使用场景是写操作,加锁保护写操作
  • shared_mutex_: 用于保护 subscribers_ 成员变量,保证在发布事件时线程安全。 使用场景是读操作,多个线程可以同时读取订阅者列表,但只有一个线程可以写入。

进一步优化:

上面的代码只是一个简单的示例,还有很多地方可以优化。

  1. 线程池: publish() 方法中直接调用订阅者的 onEvent() 方法,如果 onEvent() 方法耗时较长,会阻塞发布线程。可以使用线程池来异步执行 onEvent() 方法,提高吞吐量。
#include <thread>
#include <future>

class EventBus {
public:
    // ... (之前的代码)

    // 发布事件 (使用线程池)
    void publish(std::shared_ptr<Event> event) {
        std::shared_lock<std::shared_mutex> lock(shared_mutex_);
        const std::string& eventType = event->getType();
        auto it = subscribers_.find(eventType);
        if (it != subscribers_.end()) {
            for (Subscriber* subscriber : it->second) {
                // 使用线程池异步执行 onEvent()
                thread_pool_.enqueue([subscriber, event]() {
                    subscriber->onEvent(event);
                });
            }
        }
    }

private:
    // ... (之前的代码)
    //简易线程池
    class ThreadPool {
    public:
        ThreadPool(size_t num_threads) : stop_(false) {
            threads_.reserve(num_threads);
            for (size_t i = 0; i < num_threads; ++i) {
                threads_.emplace_back([this]() {
                    while (true) {
                        std::function<void()> task;

                        {
                            std::unique_lock<std::mutex> lock(queue_mutex_);
                            condition_.wait(lock, [this]() { return stop_ || !tasks_.empty(); });
                            if (stop_ && tasks_.empty()) return;
                            task = std::move(tasks_.front());
                            tasks_.pop();
                        }

                        task();
                    }
                });
            }
        }

        ~ThreadPool() {
            {
                std::unique_lock<std::mutex> lock(queue_mutex_);
                stop_ = true;
            }
            condition_.notify_all();
            for (std::thread& thread : threads_) {
                thread.join();
            }
        }

        template<typename F, typename... Args>
        auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
            using return_type = typename std::result_of<F(Args...)>::type;

            auto task = std::make_shared<std::packaged_task<return_type()>>(
                std::bind(std::forward<F>(f), std::forward<Args>(args)...)
                );

            std::future<return_type> res = task->get_future();
            {
                std::unique_lock<std::mutex> lock(queue_mutex_);

                // don't allow enqueueing after stopping the pool
                if (stop_)
                    throw std::runtime_error("enqueue on stopped ThreadPool");

                tasks_.emplace([task]() { (*task)(); });
            }
            condition_.notify_one();
            return res;
        }

    private:
        std::vector<std::thread> threads_;
        std::queue<std::function<void()>> tasks_;
        std::mutex queue_mutex_;
        std::condition_variable condition_;
        bool stop_;
    };

    ThreadPool thread_pool_{4}; // 创建一个包含 4 个线程的线程池
};
  1. 无锁数据结构: 对于高并发的场景,可以考虑使用无锁数据结构,例如无锁队列,来存储订阅者列表。但这会增加实现的复杂度。
  2. 事件过滤: 可以为订阅者添加事件过滤功能,只接收满足特定条件的事件。
  3. 事件优先级: 可以为事件设置优先级,优先级高的事件优先处理。
  4. 更灵活的订阅方式: 不仅仅根据事件类型订阅,还可以根据事件内容进行订阅。
  5. 更强的错误处理机制: 当订阅者处理事件失败时,可以进行重试或者记录日志。

代码示例:事件过滤

// 订阅者接口 (添加事件过滤器)
class Subscriber {
public:
    virtual ~Subscriber() = default;
    virtual void onEvent(const std::shared_ptr<Event>& event) = 0;
    virtual bool accept(const std::shared_ptr<Event>& event) const { return true; } // 默认接受所有事件
};

// 事件总线 (添加事件过滤)
class EventBus {
public:
    // 发布事件 (添加事件过滤)
    void publish(std::shared_ptr<Event> event) {
        std::shared_lock<std::shared_mutex> lock(shared_mutex_);
        const std::string& eventType = event->getType();
        auto it = subscribers_.find(eventType);
        if (it != subscribers_.end()) {
            for (Subscriber* subscriber : it->second) {
                if (subscriber->accept(event)) { // 添加事件过滤
                    subscriber->onEvent(event);
                }
            }
        }
    }
  // 注册订阅者
    void subscribe(const std::string& eventType, Subscriber* subscriber) {
        std::lock_guard<std::mutex> lock(mutex_);
        subscribers_[eventType].push_back(subscriber);
    }

    // 取消订阅
    void unsubscribe(const std::string& eventType, Subscriber* subscriber) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto& subs = subscribers_[eventType];
        subs.erase(std::remove(subs.begin(), subs.end(), subscriber), subs.end());
    }
private:
    std::mutex mutex_; // 保护 subscribers_ 的互斥锁
    std::shared_mutex shared_mutex_; //用于读写锁,允许多个读, 单个写
    std::unordered_map<std::string, std::vector<Subscriber*>> subscribers_;
};

// 示例订阅者 (添加事件过滤器)
class FilteredMessageSubscriber : public Subscriber {
public:
    FilteredMessageSubscriber(std::string keyword) : keyword_(std::move(keyword)) {}

    void onEvent(const std::shared_ptr<Event>& event) override {
        if (event->getType() == "MessageEvent") {
            auto messageEvent = std::dynamic_pointer_cast<MessageEvent>(event);
            std::cout << "FilteredMessageSubscriber received: " << messageEvent->getMessage() << std::endl;
        }
    }

    bool accept(const std::shared_ptr<Event>& event) const override {
        if (event->getType() == "MessageEvent") {
            auto messageEvent = std::dynamic_pointer_cast<MessageEvent>(event);
            return messageEvent->getMessage().find(keyword_) != std::string::npos; // 只接受包含关键字的消息
        }
        return false;
    }

private:
    std::string keyword_;
};

int main() {
    EventBus eventBus;

    // 创建订阅者 (添加事件过滤器)
    FilteredMessageSubscriber subscriber("important");

    // 订阅事件
    eventBus.subscribe("MessageEvent", &subscriber);

    // 发布事件
    auto event1 = std::make_shared<MessageEvent>("This is an important message.");
    eventBus.publish(event1);

    auto event2 = std::make_shared<MessageEvent>("This is a normal message.");
    eventBus.publish(event2);

    return 0;
}

总结:

今天我们一起用C++并发原语实现了一个简单的事件系统。虽然只是一个雏形,但已经具备了发布、订阅、取消订阅等基本功能,并且考虑了线程安全。记住,并发编程的重点在于:

  • 选择合适的并发原语: 根据实际场景选择最合适的并发原语,避免过度使用锁。
  • 控制锁的粒度: 尽可能缩小锁的范围,减少锁的竞争。
  • 避免死锁: 注意锁的获取顺序,避免死锁。

希望今天的讲座对你有所帮助。记住,编程是一门实践的艺术,多写代码,多踩坑,才能真正掌握并发编程的精髓。 下课!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注