观察者模式与发布/订阅模式:区分这两种设计模式,并实现一个完整的事件总线(Event Bus)。

观察者模式与发布/订阅模式:一场关于解耦的深度探索

大家好,今天我们来聊聊两种密切相关,但又常常被混淆的设计模式:观察者模式和发布/订阅模式。它们都旨在实现对象之间的松耦合,但实现方式和适用场景却有所不同。我们将深入探讨它们的区别,并动手实现一个功能完善的事件总线(Event Bus),进一步理解发布/订阅模式的强大之处。

观察者模式:直接的依赖关系

观察者模式定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。当主题对象的状态发生改变时,所有观察者对象都会收到通知并自动更新。

核心要素:

  • 主题 (Subject): 维护一个观察者列表,并提供添加、删除观察者的方法。在状态改变时,负责通知观察者。
  • 观察者 (Observer): 定义一个更新接口,当接收到主题的通知时,执行相应的更新操作。
  • 具体主题 (ConcreteSubject): 主题的具体实现,存储主题状态,并在状态改变时通知观察者。
  • 具体观察者 (ConcreteObserver): 观察者的具体实现,实现更新接口,响应主题的通知。

代码示例(Python):

from abc import ABC, abstractmethod

# 抽象观察者
class Observer(ABC):
    @abstractmethod
    def update(self, subject):
        pass

# 抽象主题
class Subject(ABC):
    def __init__(self):
        self._observers = []

    def attach(self, observer):
        self._observers.append(observer)

    def detach(self, observer):
        self._observers.remove(observer)

    def notify(self):
        for observer in self._observers:
            observer.update(self)

# 具体主题
class ConcreteSubject(Subject):
    def __init__(self):
        super().__init__()
        self._state = None

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value):
        self._state = value
        self.notify()

# 具体观察者
class ConcreteObserver(Observer):
    def __init__(self, name, subject):
        self._name = name
        self._subject = subject
        self._subject.attach(self)

    def update(self, subject):
        print(f"{self._name} 收到通知,主题状态变为:{subject.state}")

# 客户端代码
if __name__ == "__main__":
    subject = ConcreteSubject()

    observer1 = ConcreteObserver("Observer 1", subject)
    observer2 = ConcreteObserver("Observer 2", subject)

    subject.state = "State 1"
    subject.state = "State 2"

    subject.detach(observer1)

    subject.state = "State 3"

关键特征:

  • 观察者直接订阅主题,主题知道所有观察者的存在。
  • 观察者和主题之间存在直接的依赖关系。
  • 通常同步执行,主题状态改变后立即通知观察者。

发布/订阅模式:解耦的中间层

发布/订阅模式是一种消息传递模式,发布者(Publisher)发布消息,订阅者(Subscriber)订阅感兴趣的消息类型。消息通过一个消息代理(Message Broker)或事件总线(Event Bus)进行路由。发布者和订阅者彼此不知道对方的存在,从而实现了解耦。

核心要素:

  • 发布者 (Publisher): 发布消息到消息代理/事件总线。
  • 订阅者 (Subscriber): 订阅特定类型的消息,并接收来自消息代理/事件总线的消息。
  • 消息代理/事件总线 (Message Broker/Event Bus): 负责接收发布者的消息,并根据订阅关系将消息路由到相应的订阅者。

关键区别:

特性 观察者模式 发布/订阅模式
耦合度 较高,观察者直接依赖主题 较低,发布者和订阅者通过中间件解耦
知晓对方存在 主题知道所有观察者 发布者和订阅者互不知晓对方
消息传递方式 主题直接调用观察者的更新方法 通过消息代理/事件总线传递消息
中间件 有,消息代理/事件总线
复杂性 相对简单 相对复杂,需要维护消息队列和订阅关系
适用场景 对象之间的紧密协作,同步更新 对象之间松耦合,异步消息传递
可扩展性 较差,添加观察者需要修改主题代码 较好,添加发布者和订阅者无需修改其他组件

实现一个事件总线(Event Bus)

现在,让我们动手实现一个事件总线,来更好地理解发布/订阅模式。

import threading
from collections import defaultdict

class EventBus:
    def __init__(self):
        self._topics = defaultdict(list)  # 存储主题和订阅者的映射关系
        self._lock = threading.Lock()   # 线程锁,保证线程安全

    def subscribe(self, topic, callback):
        """
        订阅主题。

        Args:
            topic: 主题名称。
            callback: 订阅者接收到消息后执行的回调函数。
        """
        with self._lock:
            if callback not in self._topics[topic]:
                self._topics[topic].append(callback)
            else:
                print(f"Warning: Callback already subscribed to topic '{topic}'.")

    def unsubscribe(self, topic, callback):
        """
        取消订阅主题。

        Args:
            topic: 主题名称。
            callback: 要取消订阅的回调函数。
        """
        with self._lock:
            try:
                self._topics[topic].remove(callback)
            except ValueError:
                print(f"Warning: Callback not subscribed to topic '{topic}'.")

    def publish(self, topic, message):
        """
        发布消息到指定主题。

        Args:
            topic: 主题名称。
            message: 要发布的消息。
        """
        with self._lock:
            if topic in self._topics:
                for callback in self._topics[topic]:
                    try:
                        # 使用线程异步执行回调函数
                        threading.Thread(target=callback, args=(message,)).start()
                    except Exception as e:
                        print(f"Error executing callback for topic '{topic}': {e}")

    def clear_topic(self, topic):
        """
        清除指定主题的所有订阅者。

        Args:
            topic: 要清除的主题名称。
        """
        with self._lock:
            if topic in self._topics:
                self._topics[topic].clear()
            else:
                print(f"Warning: Topic '{topic}' does not exist.")

    def get_subscribers(self, topic):
        """
        获取指定主题的订阅者列表。

        Args:
            topic: 主题名称。

        Returns:
            一个包含订阅者的列表,如果主题不存在则返回空列表。
        """
        with self._lock:
            if topic in self._topics:
                return list(self._topics[topic])  # 返回一个拷贝,防止外部修改
            else:
                return []

    def has_subscriber(self, topic, callback):
        """
        检查指定主题是否包含某个订阅者。

        Args:
            topic: 主题名称。
            callback: 要检查的回调函数。

        Returns:
            如果主题包含该订阅者,则返回 True,否则返回 False。
        """
        with self._lock:
            if topic in self._topics:
                return callback in self._topics[topic]
            else:
                return False

代码解释:

  • EventBus 类是事件总线的核心实现。
  • _topics 字典用于存储主题和订阅者(回调函数)的映射关系。 使用 defaultdict(list) 可以方便地处理新主题的添加,避免了每次都要检查主题是否存在的情况。
  • subscribe(topic, callback) 方法用于订阅主题,将回调函数添加到对应主题的订阅者列表中。 检查callback是否存在避免重复订阅。
  • unsubscribe(topic, callback) 方法用于取消订阅主题,从对应主题的订阅者列表中移除回调函数。
  • publish(topic, message) 方法用于发布消息,遍历对应主题的订阅者列表,并异步执行每个订阅者的回调函数。使用 threading.Thread 确保回调函数不会阻塞发布者的执行。
  • _lock 线程锁用于保证线程安全,防止多个线程同时修改 _topics 字典导致数据竞争。
  • clear_topic(topic) 清除某个主题的所有订阅者,方便测试和资源清理。
  • get_subscribers(topic) 获取某个topic的所有订阅者,返回一个拷贝防止外部修改。
  • has_subscriber(topic, callback) 检查某个主题是否包含某个订阅者。

使用示例:

import time

# 创建事件总线实例
event_bus = EventBus()

# 定义订阅者回调函数
def subscriber1(message):
    print(f"Subscriber 1 接收到消息:{message}")
    time.sleep(1) # 模拟耗时操作
    print(f"Subscriber 1 处理完毕")

def subscriber2(message):
    print(f"Subscriber 2 接收到消息:{message}")

def subscriber3(message):
    print(f"Subscriber 3 接收到消息:{message}")

# 订阅主题
event_bus.subscribe("topic1", subscriber1)
event_bus.subscribe("topic1", subscriber2)
event_bus.subscribe("topic2", subscriber3)

# 发布消息
print("发布消息到 topic1")
event_bus.publish("topic1", "Hello, topic1!")
print("发布消息到 topic2")
event_bus.publish("topic2", "Hello, topic2!")

# 等待一段时间,确保异步回调函数执行完毕
time.sleep(2)

#取消订阅
event_bus.unsubscribe("topic1", subscriber1)
print("发布消息到 topic1 (subscriber1 已取消订阅)")
event_bus.publish("topic1", "Hello again, topic1!")

time.sleep(1)

# 清除topic1的所有订阅者
event_bus.clear_topic("topic1")
print("发布消息到 topic1 (所有订阅者已清除)")
event_bus.publish("topic1", "Another message for topic1!")

time.sleep(1)

# 检查订阅者
print(f"topic1 是否包含 subscriber2: {event_bus.has_subscriber('topic1', subscriber2)}") #False, 因为之前clear_topic了
print(f"topic2 是否包含 subscriber3: {event_bus.has_subscriber('topic2', subscriber3)}") #True

运行结果 (输出顺序可能因线程调度而不同):

发布消息到 topic1
发布消息到 topic2
Subscriber 1 接收到消息:Hello, topic1!
Subscriber 2 接收到消息:Hello, topic1!
Subscriber 3 接收到消息:Hello, topic2!
Subscriber 1 处理完毕
发布消息到 topic1 (subscriber1 已取消订阅)
Subscriber 2 接收到消息:Hello again, topic1!
发布消息到 topic1 (所有订阅者已清除)
topic1 是否包含 subscriber2: False
topic2 是否包含 subscriber3: True

事件总线的优势:

  • 解耦: 发布者和订阅者之间完全解耦,互不知晓对方的存在。
  • 可扩展性: 可以轻松地添加新的发布者和订阅者,而无需修改现有代码。
  • 灵活性: 可以动态地订阅和取消订阅主题。
  • 异步性: 发布消息后,发布者可以继续执行其他任务,而无需等待订阅者处理消息。

总结:解耦之道,各有所长

观察者模式和发布/订阅模式都是用于实现对象之间松耦合的设计模式。观察者模式适用于对象之间的紧密协作和同步更新,而发布/订阅模式适用于对象之间的松耦合和异步消息传递。选择哪种模式取决于具体的应用场景和需求。

进一步的思考

事件总线只是一个简单的实现,在实际应用中,还需要考虑以下问题:

  • 消息持久化: 将消息存储到数据库或消息队列中,防止消息丢失。
  • 消息过滤: 根据消息内容或其他属性,对消息进行过滤,只传递给感兴趣的订阅者。
  • 消息优先级: 为消息设置优先级,确保重要消息能够及时处理。
  • 错误处理: 处理订阅者回调函数执行过程中出现的错误。
  • 分布式事件总线: 在分布式系统中,需要使用专门的消息队列服务,例如 RabbitMQ、Kafka 等。

希望今天的分享能够帮助大家更好地理解观察者模式和发布/订阅模式,并能够灵活运用它们来构建更加松耦合、可扩展的应用程序。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注