C++ 观察者模式:在多线程环境中实现事件通知

好的,各位观众老爷们,大家好!今天咱就来聊聊C++里那个有点意思的“观察者模式”,而且得是在“多线程”这个热闹的舞台上玩。这俩词儿一凑一块儿,就有点像在嘈杂的菜市场里试图用广播通知大家“猪肉降价了!”——既要保证消息传到,又不能把市场搞得更乱。

啥是观察者模式?(别跟我说你不知道!)

简单来说,观察者模式就像订阅报纸。你(观察者)订阅了《头条日报》(主题),一旦《头条日报》有新内容(事件发生),它就会自动把消息推送给你。你不用天天打电话问编辑“今天有啥新闻没?”。

用更技术的话说:

  • 主题(Subject): 维护一个观察者列表,当状态发生变化时,通知所有观察者。
  • 观察者(Observer): 定义一个更新接口,用于接收主题的通知。
  • 具体主题(Concrete Subject): 继承主题,实现状态变化时通知观察者的逻辑。
  • 具体观察者(Concrete Observer): 继承观察者,实现接收到通知后的处理逻辑。

C++代码来一波!(别光说不练!)

先来个简单的单线程版本,让大家热热身:

#include <iostream>
#include <vector>
#include <string>

// 观察者接口
class Observer {
public:
    virtual void update(const std::string& message) = 0;
    virtual ~Observer() {}
};

// 具体观察者
class ConcreteObserver : public Observer {
public:
    ConcreteObserver(const std::string& name) : name_(name) {}

    void update(const std::string& message) override {
        std::cout << name_ << " received message: " << message << std::endl;
    }

private:
    std::string name_;
};

// 主题接口
class Subject {
public:
    virtual void attach(Observer* observer) = 0;
    virtual void detach(Observer* observer) = 0;
    virtual void notify(const std::string& message) = 0;
    virtual ~Subject() {}
};

// 具体主题
class ConcreteSubject : public Subject {
public:
    void attach(Observer* observer) override {
        observers_.push_back(observer);
    }

    void detach(Observer* observer) override {
        for (auto it = observers_.begin(); it != observers_.end(); ++it) {
            if (*it == observer) {
                observers_.erase(it);
                return;
            }
        }
    }

    void notify(const std::string& message) override {
        for (Observer* observer : observers_) {
            observer->update(message);
        }
    }

    void sendMessage(const std::string& message) {
        notify(message);
    }

private:
    std::vector<Observer*> observers_;
};

int main() {
    ConcreteSubject subject;
    ConcreteObserver observer1("小明");
    ConcreteObserver observer2("小红");

    subject.attach(&observer1);
    subject.attach(&observer2);

    subject.sendMessage("今天猪肉降价啦!");

    subject.detach(&observer1);
    subject.sendMessage("明天猪肉可能涨价!");

    return 0;
}

这段代码展示了一个简单的主题和两个观察者。主题发送消息时,所有注册的观察者都会收到通知。

多线程的坑!(一不小心就翻车!)

现在,咱们把这个模式搬到多线程的环境里。想象一下,主题在多个线程中同时发送消息,观察者也在不同的线程中接收消息。如果没有处理好并发问题,就会出现以下几种情况:

  • 数据竞争(Data Race): 多个线程同时访问和修改observers_列表,导致数据损坏。比如,一个线程正在添加观察者,另一个线程正在遍历列表,这可能会导致崩溃或者错误。
  • 条件竞争(Race Condition): 消息的顺序不确定。一个线程发送的消息可能被另一个线程发送的消息覆盖。
  • 死锁(Deadlock): 多个线程相互等待对方释放资源,导致程序卡死。

多线程安全方案!(不能怂,就是干!)

为了解决这些问题,我们需要使用一些同步机制来保护共享资源。常用的方法包括:

  1. 互斥锁(Mutex): 保护observers_列表的访问。
  2. 读写锁(Read-Write Lock): 允许多个线程同时读取observers_列表,但只允许一个线程写入。
  3. 原子操作(Atomic Operations): 对于简单的状态变量,可以使用原子操作来保证线程安全。
  4. 线程安全队列(Thread-Safe Queue): 将消息放入队列,由一个单独的线程来处理消息。

代码升级!(加点料!)

我们来修改上面的代码,使用互斥锁来保护observers_列表:

#include <iostream>
#include <vector>
#include <string>
#include <mutex>
#include <thread>

// 观察者接口
class Observer {
public:
    virtual void update(const std::string& message) = 0;
    virtual ~Observer() {}
};

// 具体观察者
class ConcreteObserver : public Observer {
public:
    ConcreteObserver(const std::string& name) : name_(name) {}

    void update(const std::string& message) override {
        std::cout << name_ << " received message: " << message << " in thread " << std::this_thread::get_id() << std::endl;
    }

private:
    std::string name_;
};

// 主题接口
class Subject {
public:
    virtual void attach(Observer* observer) = 0;
    virtual void detach(Observer* observer) = 0;
    virtual void notify(const std::string& message) = 0;
    virtual ~Subject() {}
};

// 具体主题
class ConcreteSubject : public Subject {
public:
    void attach(Observer* observer) override {
        std::lock_guard<std::mutex> lock(mutex_);
        observers_.push_back(observer);
    }

    void detach(Observer* observer) override {
        std::lock_guard<std::mutex> lock(mutex_);
        for (auto it = observers_.begin(); it != observers_.end(); ++it) {
            if (*it == observer) {
                observers_.erase(it);
                return;
            }
        }
    }

    void notify(const std::string& message) override {
        std::lock_guard<std::mutex> lock(mutex_);
        for (Observer* observer : observers_) {
            observer->update(message);
        }
    }

    void sendMessage(const std::string& message) {
        notify(message);
    }

private:
    std::vector<Observer*> observers_;
    std::mutex mutex_;
};

int main() {
    ConcreteSubject subject;
    ConcreteObserver observer1("小明");
    ConcreteObserver observer2("小红");

    subject.attach(&observer1);
    subject.attach(&observer2);

    std::thread t1([&]() {
        subject.sendMessage("线程1:今天猪肉降价啦!");
    });

    std::thread t2([&]() {
        subject.sendMessage("线程2:明天猪肉可能涨价!");
    });

    t1.join();
    t2.join();

    subject.detach(&observer1);
    subject.sendMessage("主线程:后天猪肉肯定要涨价!");

    return 0;
}

在这个版本中,我们使用std::mutex来保护observers_列表。attachdetachnotify方法都使用std::lock_guard来自动加锁和解锁。这样可以保证在任何时候只有一个线程可以访问observers_列表,从而避免数据竞争。

读写锁优化!(更上一层楼!)

如果读操作远多于写操作,使用读写锁可以提高性能。读写锁允许多个线程同时读取共享资源,但只允许一个线程写入。

#include <iostream>
#include <vector>
#include <string>
#include <shared_mutex> // C++17
#include <thread>

// 观察者接口
class Observer {
public:
    virtual void update(const std::string& message) = 0;
    virtual ~Observer() {}
};

// 具体观察者
class ConcreteObserver : public Observer {
public:
    ConcreteObserver(const std::string& name) : name_(name) {}

    void update(const std::string& message) override {
        std::cout << name_ << " received message: " << message << " in thread " << std::this_thread::get_id() << std::endl;
    }

private:
    std::string name_;
};

// 主题接口
class Subject {
public:
    virtual void attach(Observer* observer) = 0;
    virtual void detach(Observer* observer) = 0;
    virtual void notify(const std::string& message) = 0;
    virtual ~Subject() {}
};

// 具体主题
class ConcreteSubject : public Subject {
public:
    void attach(Observer* observer) override {
        std::lock_guard<std::shared_mutex> lock(mutex_); //Exclusive lock for writing
        observers_.push_back(observer);
    }

    void detach(Observer* observer) override {
        std::lock_guard<std::shared_mutex> lock(mutex_); //Exclusive lock for writing
        for (auto it = observers_.begin(); it != observers_.end(); ++it) {
            if (*it == observer) {
                observers_.erase(it);
                return;
            }
        }
    }

    void notify(const std::string& message) override {
        std::shared_lock<std::shared_mutex> lock(mutex_); //Shared lock for reading
        for (Observer* observer : observers_) {
            observer->update(message);
        }
    }

    void sendMessage(const std::string& message) {
        notify(message);
    }

private:
    std::vector<Observer*> observers_;
    std::shared_mutex mutex_;
};

int main() {
    ConcreteSubject subject;
    ConcreteObserver observer1("小明");
    ConcreteObserver observer2("小红");

    subject.attach(&observer1);
    subject.attach(&observer2);

    std::thread t1([&]() {
        subject.sendMessage("线程1:今天猪肉降价啦!");
    });

    std::thread t2([&]() {
        subject.sendMessage("线程2:明天猪肉可能涨价!");
    });

    t1.join();
    t2.join();

    subject.detach(&observer1);
    subject.sendMessage("主线程:后天猪肉肯定要涨价!");

    return 0;
}

在这个版本中,我们使用std::shared_mutex(C++17引入)作为读写锁。attachdetach方法使用std::lock_guard来获取独占锁(写锁),而notify方法使用std::shared_lock来获取共享锁(读锁)。

线程安全队列!(异步处理!)

如果消息处理比较耗时,可以使用线程安全队列来异步处理消息。主题将消息放入队列,一个单独的线程从队列中取出消息并通知观察者。

#include <iostream>
#include <vector>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>

// 观察者接口
class Observer {
public:
    virtual void update(const std::string& message) = 0;
    virtual ~Observer() {}
};

// 具体观察者
class ConcreteObserver : public Observer {
public:
    ConcreteObserver(const std::string& name) : name_(name) {}

    void update(const std::string& message) override {
        std::cout << name_ << " received message: " << message << " in thread " << std::this_thread::get_id() << std::endl;
    }

private:
    std::string name_;
};

// 线程安全队列
class ThreadSafeQueue {
public:
    void enqueue(const std::string& message) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push(message);
        condition_.notify_one();
    }

    std::string dequeue() {
        std::unique_lock<std::mutex> lock(mutex_);
        condition_.wait(lock, [this] { return !queue_.empty(); });
        std::string message = queue_.front();
        queue_.pop();
        return message;
    }

private:
    std::queue<std::string> queue_;
    std::mutex mutex_;
    std::condition_variable condition_;
};

// 主题接口
class Subject {
public:
    virtual void attach(Observer* observer) = 0;
    virtual void detach(Observer* observer) = 0;
    virtual ~Subject() {}
};

// 具体主题
class ConcreteSubject : public Subject {
public:
    ConcreteSubject() : running_(true) {
        worker_thread_ = std::thread([this]() {
            while (running_) {
                std::string message = queue_.dequeue();
                notify(message);
            }
        });
    }

    ~ConcreteSubject() {
        running_ = false;
        queue_.enqueue(""); // Signal the worker thread to exit
        worker_thread_.join();

        //Clean up observers
        std::lock_guard<std::mutex> lock(mutex_);
        for(auto observer : observers_){
            delete observer;
        }
    }

    void attach(Observer* observer) override {
        std::lock_guard<std::mutex> lock(mutex_);
        observers_.push_back(observer);
    }

    void detach(Observer* observer) override {
        std::lock_guard<std::mutex> lock(mutex_);
        for (auto it = observers_.begin(); it != observers_.end(); ++it) {
            if (*it == observer) {
                observers_.erase(it);
                return;
            }
        }
    }

    void sendMessage(const std::string& message) {
        queue_.enqueue(message);
    }

private:
    void notify(const std::string& message) {
        std::lock_guard<std::mutex> lock(mutex_);
        for (Observer* observer : observers_) {
            observer->update(message);
        }
    }
    std::vector<Observer*> observers_;
    std::mutex mutex_;
    ThreadSafeQueue queue_;
    std::thread worker_thread_;
    std::atomic<bool> running_;

};

int main() {
    ConcreteSubject subject;
    Observer* observer1 = new ConcreteObserver("小明");
    Observer* observer2 = new ConcreteObserver("小红");

    subject.attach(observer1);
    subject.attach(observer2);

    std::thread t1([&]() {
        subject.sendMessage("线程1:今天猪肉降价啦!");
    });

    std::thread t2([&]() {
        subject.sendMessage("线程2:明天猪肉可能涨价!");
    });

    t1.join();
    t2.join();

    subject.detach(observer1);
    subject.sendMessage("主线程:后天猪肉肯定要涨价!");

    return 0;
}

在这个版本中,ConcreteSubject内部维护了一个ThreadSafeQueuesendMessage方法将消息放入队列,一个单独的线程(worker_thread_)从队列中取出消息并调用notify方法通知观察者。这样可以避免在发送消息时阻塞主线程。使用了条件变量来避免忙等待。

各种方案的优缺点对比!(选哪个好呢?)

方案 优点 缺点 适用场景
互斥锁 简单易用,适用于各种场景。 并发度较低,所有操作都需要互斥访问。 读写操作都比较频繁,且对性能要求不高的场景。
读写锁 允许多个线程同时读取,提高并发度。 写操作会阻塞所有读操作,可能导致写饥饿。 读操作远多于写操作,且对性能要求较高的场景。
线程安全队列 异步处理消息,避免阻塞发送线程。 增加了额外的线程和队列维护开销。 消息处理比较耗时,且不希望阻塞发送线程的场景。
原子操作 对于简单的状态变量,可以实现无锁并发,提高性能。 只能用于简单的状态变量,不适用于复杂的数据结构。 只需要对单个变量进行原子操作的场景。

总结!(划重点啦!)

在多线程环境下使用观察者模式需要特别注意线程安全问题。选择合适的同步机制可以避免数据竞争、条件竞争和死锁等问题,从而保证程序的正确性和性能。

记住,没有银弹。选择哪种方案取决于具体的应用场景和性能需求。

好了,今天的分享就到这里。希望大家以后在多线程的菜市场里也能优雅地通知“猪肉降价了!”。 咱们下期再见!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注