观察者模式与发布/订阅模式:一场关于解耦的深度探索
大家好,今天我们来聊聊两种密切相关,但又常常被混淆的设计模式:观察者模式和发布/订阅模式。它们都旨在实现对象之间的松耦合,但实现方式和适用场景却有所不同。我们将深入探讨它们的区别,并动手实现一个功能完善的事件总线(Event Bus),进一步理解发布/订阅模式的强大之处。
观察者模式:直接的依赖关系
观察者模式定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。当主题对象的状态发生改变时,所有观察者对象都会收到通知并自动更新。
核心要素:
- 主题 (Subject): 维护一个观察者列表,并提供添加、删除观察者的方法。在状态改变时,负责通知观察者。
- 观察者 (Observer): 定义一个更新接口,当接收到主题的通知时,执行相应的更新操作。
- 具体主题 (ConcreteSubject): 主题的具体实现,存储主题状态,并在状态改变时通知观察者。
- 具体观察者 (ConcreteObserver): 观察者的具体实现,实现更新接口,响应主题的通知。
代码示例(Python):
from abc import ABC, abstractmethod
# 抽象观察者
class Observer(ABC):
@abstractmethod
def update(self, subject):
pass
# 抽象主题
class Subject(ABC):
def __init__(self):
self._observers = []
def attach(self, observer):
self._observers.append(observer)
def detach(self, observer):
self._observers.remove(observer)
def notify(self):
for observer in self._observers:
observer.update(self)
# 具体主题
class ConcreteSubject(Subject):
def __init__(self):
super().__init__()
self._state = None
@property
def state(self):
return self._state
@state.setter
def state(self, value):
self._state = value
self.notify()
# 具体观察者
class ConcreteObserver(Observer):
def __init__(self, name, subject):
self._name = name
self._subject = subject
self._subject.attach(self)
def update(self, subject):
print(f"{self._name} 收到通知,主题状态变为:{subject.state}")
# 客户端代码
if __name__ == "__main__":
subject = ConcreteSubject()
observer1 = ConcreteObserver("Observer 1", subject)
observer2 = ConcreteObserver("Observer 2", subject)
subject.state = "State 1"
subject.state = "State 2"
subject.detach(observer1)
subject.state = "State 3"
关键特征:
- 观察者直接订阅主题,主题知道所有观察者的存在。
- 观察者和主题之间存在直接的依赖关系。
- 通常同步执行,主题状态改变后立即通知观察者。
发布/订阅模式:解耦的中间层
发布/订阅模式是一种消息传递模式,发布者(Publisher)发布消息,订阅者(Subscriber)订阅感兴趣的消息类型。消息通过一个消息代理(Message Broker)或事件总线(Event Bus)进行路由。发布者和订阅者彼此不知道对方的存在,从而实现了解耦。
核心要素:
- 发布者 (Publisher): 发布消息到消息代理/事件总线。
- 订阅者 (Subscriber): 订阅特定类型的消息,并接收来自消息代理/事件总线的消息。
- 消息代理/事件总线 (Message Broker/Event Bus): 负责接收发布者的消息,并根据订阅关系将消息路由到相应的订阅者。
关键区别:
特性 | 观察者模式 | 发布/订阅模式 |
---|---|---|
耦合度 | 较高,观察者直接依赖主题 | 较低,发布者和订阅者通过中间件解耦 |
知晓对方存在 | 主题知道所有观察者 | 发布者和订阅者互不知晓对方 |
消息传递方式 | 主题直接调用观察者的更新方法 | 通过消息代理/事件总线传递消息 |
中间件 | 无 | 有,消息代理/事件总线 |
复杂性 | 相对简单 | 相对复杂,需要维护消息队列和订阅关系 |
适用场景 | 对象之间的紧密协作,同步更新 | 对象之间松耦合,异步消息传递 |
可扩展性 | 较差,添加观察者需要修改主题代码 | 较好,添加发布者和订阅者无需修改其他组件 |
实现一个事件总线(Event Bus)
现在,让我们动手实现一个事件总线,来更好地理解发布/订阅模式。
import threading
from collections import defaultdict
class EventBus:
def __init__(self):
self._topics = defaultdict(list) # 存储主题和订阅者的映射关系
self._lock = threading.Lock() # 线程锁,保证线程安全
def subscribe(self, topic, callback):
"""
订阅主题。
Args:
topic: 主题名称。
callback: 订阅者接收到消息后执行的回调函数。
"""
with self._lock:
if callback not in self._topics[topic]:
self._topics[topic].append(callback)
else:
print(f"Warning: Callback already subscribed to topic '{topic}'.")
def unsubscribe(self, topic, callback):
"""
取消订阅主题。
Args:
topic: 主题名称。
callback: 要取消订阅的回调函数。
"""
with self._lock:
try:
self._topics[topic].remove(callback)
except ValueError:
print(f"Warning: Callback not subscribed to topic '{topic}'.")
def publish(self, topic, message):
"""
发布消息到指定主题。
Args:
topic: 主题名称。
message: 要发布的消息。
"""
with self._lock:
if topic in self._topics:
for callback in self._topics[topic]:
try:
# 使用线程异步执行回调函数
threading.Thread(target=callback, args=(message,)).start()
except Exception as e:
print(f"Error executing callback for topic '{topic}': {e}")
def clear_topic(self, topic):
"""
清除指定主题的所有订阅者。
Args:
topic: 要清除的主题名称。
"""
with self._lock:
if topic in self._topics:
self._topics[topic].clear()
else:
print(f"Warning: Topic '{topic}' does not exist.")
def get_subscribers(self, topic):
"""
获取指定主题的订阅者列表。
Args:
topic: 主题名称。
Returns:
一个包含订阅者的列表,如果主题不存在则返回空列表。
"""
with self._lock:
if topic in self._topics:
return list(self._topics[topic]) # 返回一个拷贝,防止外部修改
else:
return []
def has_subscriber(self, topic, callback):
"""
检查指定主题是否包含某个订阅者。
Args:
topic: 主题名称。
callback: 要检查的回调函数。
Returns:
如果主题包含该订阅者,则返回 True,否则返回 False。
"""
with self._lock:
if topic in self._topics:
return callback in self._topics[topic]
else:
return False
代码解释:
EventBus
类是事件总线的核心实现。_topics
字典用于存储主题和订阅者(回调函数)的映射关系。 使用defaultdict(list)
可以方便地处理新主题的添加,避免了每次都要检查主题是否存在的情况。subscribe(topic, callback)
方法用于订阅主题,将回调函数添加到对应主题的订阅者列表中。 检查callback是否存在避免重复订阅。unsubscribe(topic, callback)
方法用于取消订阅主题,从对应主题的订阅者列表中移除回调函数。publish(topic, message)
方法用于发布消息,遍历对应主题的订阅者列表,并异步执行每个订阅者的回调函数。使用threading.Thread
确保回调函数不会阻塞发布者的执行。_lock
线程锁用于保证线程安全,防止多个线程同时修改_topics
字典导致数据竞争。clear_topic(topic)
清除某个主题的所有订阅者,方便测试和资源清理。get_subscribers(topic)
获取某个topic的所有订阅者,返回一个拷贝防止外部修改。has_subscriber(topic, callback)
检查某个主题是否包含某个订阅者。
使用示例:
import time
# 创建事件总线实例
event_bus = EventBus()
# 定义订阅者回调函数
def subscriber1(message):
print(f"Subscriber 1 接收到消息:{message}")
time.sleep(1) # 模拟耗时操作
print(f"Subscriber 1 处理完毕")
def subscriber2(message):
print(f"Subscriber 2 接收到消息:{message}")
def subscriber3(message):
print(f"Subscriber 3 接收到消息:{message}")
# 订阅主题
event_bus.subscribe("topic1", subscriber1)
event_bus.subscribe("topic1", subscriber2)
event_bus.subscribe("topic2", subscriber3)
# 发布消息
print("发布消息到 topic1")
event_bus.publish("topic1", "Hello, topic1!")
print("发布消息到 topic2")
event_bus.publish("topic2", "Hello, topic2!")
# 等待一段时间,确保异步回调函数执行完毕
time.sleep(2)
#取消订阅
event_bus.unsubscribe("topic1", subscriber1)
print("发布消息到 topic1 (subscriber1 已取消订阅)")
event_bus.publish("topic1", "Hello again, topic1!")
time.sleep(1)
# 清除topic1的所有订阅者
event_bus.clear_topic("topic1")
print("发布消息到 topic1 (所有订阅者已清除)")
event_bus.publish("topic1", "Another message for topic1!")
time.sleep(1)
# 检查订阅者
print(f"topic1 是否包含 subscriber2: {event_bus.has_subscriber('topic1', subscriber2)}") #False, 因为之前clear_topic了
print(f"topic2 是否包含 subscriber3: {event_bus.has_subscriber('topic2', subscriber3)}") #True
运行结果 (输出顺序可能因线程调度而不同):
发布消息到 topic1
发布消息到 topic2
Subscriber 1 接收到消息:Hello, topic1!
Subscriber 2 接收到消息:Hello, topic1!
Subscriber 3 接收到消息:Hello, topic2!
Subscriber 1 处理完毕
发布消息到 topic1 (subscriber1 已取消订阅)
Subscriber 2 接收到消息:Hello again, topic1!
发布消息到 topic1 (所有订阅者已清除)
topic1 是否包含 subscriber2: False
topic2 是否包含 subscriber3: True
事件总线的优势:
- 解耦: 发布者和订阅者之间完全解耦,互不知晓对方的存在。
- 可扩展性: 可以轻松地添加新的发布者和订阅者,而无需修改现有代码。
- 灵活性: 可以动态地订阅和取消订阅主题。
- 异步性: 发布消息后,发布者可以继续执行其他任务,而无需等待订阅者处理消息。
总结:解耦之道,各有所长
观察者模式和发布/订阅模式都是用于实现对象之间松耦合的设计模式。观察者模式适用于对象之间的紧密协作和同步更新,而发布/订阅模式适用于对象之间的松耦合和异步消息传递。选择哪种模式取决于具体的应用场景和需求。
进一步的思考
事件总线只是一个简单的实现,在实际应用中,还需要考虑以下问题:
- 消息持久化: 将消息存储到数据库或消息队列中,防止消息丢失。
- 消息过滤: 根据消息内容或其他属性,对消息进行过滤,只传递给感兴趣的订阅者。
- 消息优先级: 为消息设置优先级,确保重要消息能够及时处理。
- 错误处理: 处理订阅者回调函数执行过程中出现的错误。
- 分布式事件总线: 在分布式系统中,需要使用专门的消息队列服务,例如 RabbitMQ、Kafka 等。
希望今天的分享能够帮助大家更好地理解观察者模式和发布/订阅模式,并能够灵活运用它们来构建更加松耦合、可扩展的应用程序。