Python高级技术之:`Python`的`Observer`模式:如何实现发布-订阅模型。

各位观众老爷们,大家好!今天咱们来聊聊Python中一个非常有趣的设计模式——观察者模式(Observer Pattern),也叫做发布-订阅模式。这玩意儿听起来高大上,但实际上理解起来很简单,用起来也相当顺手。

什么是观察者模式?(别被名字吓跑!)

想象一下,你订阅了一个你喜欢的博主的博客。只要他一更新文章,你的邮箱就会收到通知。这,就是一个典型的观察者模式!

简单来说,观察者模式定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。当这个主题对象的状态发生改变时,所有依赖它的观察者对象都会收到通知并自动更新。

这么说可能有点抽象,咱们换个更接地气的例子:

角色 作用
主题(Subject) 相当于那个博主,拥有自己的状态(比如文章内容),并且负责维护一个观察者列表,当状态改变时,通知所有观察者。
观察者(Observer) 相当于你,订阅了博主,当博主更新文章时,你会收到通知。

为什么要用观察者模式?(它能解决什么问题?)

  • 解耦: 主题对象和观察者对象之间是松散耦合的。主题对象不需要知道观察者的具体实现,只需要知道它们实现了观察者接口即可。这使得系统更加灵活,易于扩展和维护。
  • 易于扩展: 你可以随时添加新的观察者,而不需要修改主题对象的代码。
  • 事件驱动: 观察者模式是事件驱动编程的基础。当某个事件发生时(比如主题对象的状态改变),会自动触发相应的处理逻辑。

Python 中的观察者模式实现(代码说话!)

Python 实现观察者模式的方式有很多种,这里我们先用最基本的方式来实现,然后再介绍一些更高级的技巧。

1. 基本实现

class Subject:
    """主题对象"""

    def __init__(self):
        self._observers = []  # 观察者列表
        self._state = None    # 主题的状态

    def attach(self, observer):
        """添加观察者"""
        self._observers.append(observer)

    def detach(self, observer):
        """移除观察者"""
        self._observers.remove(observer)

    def notify(self):
        """通知所有观察者"""
        for observer in self._observers:
            observer.update(self._state)

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, new_state):
        """设置主题状态,并通知观察者"""
        self._state = new_state
        self.notify()

class Observer:
    """观察者对象"""

    def update(self, state):
        """更新方法,当主题状态改变时被调用"""
        print(f"Observer received update: {state}")

# 示例
subject = Subject()

observer1 = Observer()
observer2 = Observer()

subject.attach(observer1)
subject.attach(observer2)

subject.state = "New article published!"  # 输出:Observer received update: New article published!
                                        #      Observer received update: New article published!

subject.detach(observer1)

subject.state = "Another update!"      # 输出:Observer received update: Another update!

这个例子非常简单,Subject 类维护了一个观察者列表,当状态改变时,调用 notify() 方法通知所有观察者。 Observer 类定义了一个 update() 方法,用于接收主题对象的状态更新。

2. 使用抽象基类(更规范!)

为了让代码更加规范,我们可以使用 Python 的 abc 模块来定义抽象基类。

import abc

class Subject(abc.ABC):
    """抽象主题类"""

    def __init__(self):
        self._observers = []

    def attach(self, observer):
        self._observers.append(observer)

    def detach(self, observer):
        self._observers.remove(observer)

    def notify(self):
        for observer in self._observers:
            observer.update(self.get_state())

    @abc.abstractmethod
    def get_state(self):
        """抽象方法,获取主题状态,子类必须实现"""
        pass

    @abc.abstractmethod
    def set_state(self, state):
        """抽象方法,设置主题状态,子类必须实现"""
        pass

class ConcreteSubject(Subject):
    """具体主题类"""

    def __init__(self):
        super().__init__()
        self._state = None

    def get_state(self):
        return self._state

    def set_state(self, state):
        self._state = state
        self.notify()

class Observer(abc.ABC):
    """抽象观察者类"""

    @abc.abstractmethod
    def update(self, state):
        """抽象方法,更新方法,子类必须实现"""
        pass

class ConcreteObserver(Observer):
    """具体观察者类"""

    def __init__(self, name):
        self._name = name

    def update(self, state):
        print(f"{self._name} received update: {state}")

# 示例
subject = ConcreteSubject()

observer1 = ConcreteObserver("Observer 1")
observer2 = ConcreteObserver("Observer 2")

subject.attach(observer1)
subject.attach(observer2)

subject.set_state("New article published!")
subject.detach(observer1)
subject.set_state("Another update!")

这里,我们定义了 SubjectObserver 的抽象基类,强制子类实现特定的方法。 这样做的好处是:

  • 明确接口: 确保所有主题和观察者都遵循相同的接口。
  • 类型检查: 可以在运行时进行类型检查,防止出现意外错误。

3. 使用信号(Signals)和槽(Slots)(更高级!)

在一些 GUI 框架(比如 PyQt、Tkinter)中,观察者模式通常使用信号和槽的概念来实现。 信号相当于主题对象发出的事件,槽相当于观察者对象接收事件并进行处理的函数。

虽然我们不能直接在纯 Python 代码中使用 GUI 框架的信号和槽机制,但我们可以模拟类似的功能。

class Signal:
    """信号类,用于发布事件"""

    def __init__(self):
        self._slots = []

    def connect(self, slot):
        """连接槽函数"""
        self._slots.append(slot)

    def disconnect(self, slot):
        """断开槽函数"""
        self._slots.remove(slot)

    def emit(self, *args, **kwargs):
        """发射信号,调用所有连接的槽函数"""
        for slot in self._slots:
            slot(*args, **kwargs)

class Subject:
    """主题对象"""

    def __init__(self):
        self.state_changed = Signal()  # 定义一个信号

    def set_state(self, state):
        self._state = state
        self.state_changed.emit(state)  # 发射信号

class Observer:
    """观察者对象"""

    def __init__(self, name):
        self._name = name

    def on_state_changed(self, state):
        """槽函数,当主题状态改变时被调用"""
        print(f"{self._name} received state changed signal: {state}")

# 示例
subject = Subject()

observer1 = Observer("Observer 1")
observer2 = Observer("Observer 2")

# 连接信号和槽
subject.state_changed.connect(observer1.on_state_changed)
subject.state_changed.connect(observer2.on_state_changed)

subject.set_state("New article published!")

# 断开连接
subject.state_changed.disconnect(observer1.on_state_changed)

subject.set_state("Another update!")

在这个例子中,Signal 类模拟了信号的功能,emit() 方法用于发射信号,调用所有连接的槽函数。 Subject 类定义了一个 state_changed 信号,当状态改变时,发射该信号。 Observer 类定义了一个 on_state_changed() 槽函数,用于接收 state_changed 信号并进行处理。

4. 使用 weakref 避免循环引用(很重要!)

在使用观察者模式时,需要特别注意循环引用的问题。 如果主题对象和观察者对象相互引用,可能会导致内存泄漏。

为了解决这个问题,可以使用 weakref 模块创建弱引用。弱引用不会增加对象的引用计数,当对象不再被其他对象引用时,会被垃圾回收器回收。

import weakref

class Subject:
    """主题对象"""

    def __init__(self):
        self._observers = []

    def attach(self, observer):
        # 使用 weakref.ref 创建弱引用
        self._observers.append(weakref.ref(observer))

    def detach(self, observer):
        # 移除弱引用
        for ref in self._observers:
            if ref() == observer:
                self._observers.remove(ref)
                break

    def notify(self, state):
        # 遍历弱引用列表,调用观察者的 update 方法
        for ref in self._observers:
            observer = ref()
            if observer is not None:  # 检查对象是否仍然存在
                observer.update(state)

class Observer:
    """观察者对象"""

    def __init__(self, name, subject):
        self._name = name
        self._subject = subject
        subject.attach(self)  # 将自身添加到主题的观察者列表中

    def update(self, state):
        print(f"{self._name} received update: {state}")

    def __del__(self):
        print(f"{self._name} is being deleted")
        self._subject.detach(self)

# 示例
subject = Subject()

observer1 = Observer("Observer 1", subject)
observer2 = Observer("Observer 2", subject)

subject.notify("New article published!")

del observer1  # 删除 observer1,触发垃圾回收
subject.notify("Another update!")

在这个例子中,Subject 类使用 weakref.ref() 创建观察者的弱引用。 当观察者对象被删除时,弱引用会自动失效,避免循环引用导致的内存泄漏。 __del__ 方法用于在对象被销毁时,从主题的观察者列表中移除自身。

观察者模式的应用场景(用武之地!)

  • GUI 编程: 比如按钮的点击事件、文本框的内容改变等。
  • 事件处理: 比如消息队列、日志系统等。
  • 模型-视图-控制器(MVC)架构: 模型对象的状态改变会通知视图对象进行更新。
  • 发布-订阅系统: 消息发布者发布消息,订阅者接收消息。

总结(敲黑板!)

观察者模式是一种非常有用的设计模式,它可以帮助我们实现松散耦合、易于扩展的系统。 在 Python 中,我们可以使用多种方式来实现观察者模式,包括基本实现、抽象基类、信号和槽等。 在使用观察者模式时,需要特别注意循环引用的问题,可以使用 weakref 模块创建弱引用来避免内存泄漏。

好了,今天的讲座就到这里。 咱们下期再见!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注