各位技术同仁,下午好!
今天,我们将深入探讨一个在现代软件架构中无处不在,却又常常被低估其复杂性的设计模式——发布-订阅(Pub/Sub)模式。我将带领大家手写实现一个功能完备的 Pub/Sub 系统,它不仅支持基本的事件发布与订阅,更将融入命名空间管理与异步事件发布这两大核心特性,以满足真实世界中复杂系统的需求。
1. 发布-订阅模式的基石:解耦与协作
发布-订阅模式的核心理念在于解耦。它允许系统的不同组件在不直接知晓彼此存在的情况下进行通信。想象一下,一个报社(发布者)发布新闻,而读者(订阅者)阅读新闻。报社不需要知道每个读者的姓名和地址,读者也不需要直接联系报社获取新闻,他们都通过一个中间媒介——报纸(事件总线)进行交互。
在软件领域,这种模式的优势显而易见:
- 降低耦合度: 发布者和订阅者之间没有直接依赖,它们只依赖于事件总线和预定义的事件类型。这使得系统模块化,更易于开发、测试和维护。
- 提高可扩展性: 可以在不修改现有发布者代码的情况下,轻松添加新的订阅者来响应特定事件。反之亦然。
- 增强灵活性: 订阅者可以按需订阅感兴趣的事件,发布者可以专注于发布事件,无需关心事件的处理逻辑。
- 支持异步处理: 事件的发布和处理可以是异步的,这对于非阻塞操作、后台任务和性能优化至关重要。
我们的目标是构建一个名为 AsyncEventBus 的类,它将作为事件总线,负责管理事件的注册、发布和分发。
2. 构建核心 Pub/Sub 机制(同步版)
在引入命名空间和异步特性之前,我们先从最基础的同步 Pub/Sub 机制开始。这有助于我们理解其基本的数据结构和操作。
核心组件:
_subscribers字典: 用于存储所有订阅者。键是事件主题(topic),值是一个列表,包含所有订阅了该主题的回调函数(handler)。subscribe(topic, handler)方法: 注册一个订阅者。unsubscribe(topic, handler)方法: 移除一个订阅者。- *`publish(topic, args, kwargs)` 方法: 发布一个事件,并同步通知所有相关订阅者。
import collections
class BasicEventBus:
"""
一个基础的同步发布-订阅事件总线。
"""
def __init__(self):
# 使用 defaultdict 简化主题不存在时的处理,自动创建空列表
self._subscribers = collections.defaultdict(list)
def subscribe(self, topic: str, handler):
"""
订阅一个事件主题。
:param topic: 事件主题的字符串标识符。
:param handler: 当事件发布时将被调用的回调函数。
"""
if not callable(handler):
raise TypeError("Handler must be a callable function or method.")
if handler not in self._subscribers[topic]:
self._subscribers[topic].append(handler)
print(f"Subscribed: Handler {handler.__name__} to topic '{topic}'")
def unsubscribe(self, topic: str, handler):
"""
取消订阅一个事件主题。
:param topic: 事件主题的字符串标识符。
:param handler: 之前订阅过的回调函数。
"""
if topic in self._subscribers and handler in self._subscribers[topic]:
self._subscribers[topic].remove(handler)
print(f"Unsubscribed: Handler {handler.__name__} from topic '{topic}'")
# 如果主题下没有订阅者了,可以清理空列表,但 defaultdict 会自动处理
if not self._subscribers[topic]:
del self._subscribers[topic]
else:
print(f"Warning: Handler {handler.__name__} not found for topic '{topic}'")
def publish(self, topic: str, *args, **kwargs):
"""
发布一个事件到指定主题。
:param topic: 事件主题的字符串标识符。
:param args: 传递给订阅者回调函数的定位参数。
:param kwargs: 传递给订阅者回调函数的关键字参数。
"""
print(f"Publishing event to topic '{topic}' with args={args}, kwargs={kwargs}")
if topic in self._subscribers:
# 迭代订阅者列表的副本,防止在迭代过程中修改列表(例如通过 unsubscribe)
for handler in list(self._subscribers[topic]):
try:
handler(*args, **kwargs)
except Exception as e:
print(f"Error in handler {handler.__name__} for topic '{topic}': {e}")
else:
print(f"No subscribers for topic '{topic}'")
# 示例用法
if __name__ == "__main__":
event_bus = BasicEventBus()
def handler_a(message):
print(f"Handler A received: {message}")
def handler_b(data1, data2):
print(f"Handler B received: {data1} and {data2}")
def handler_c():
print("Handler C received an event with no data.")
event_bus.subscribe("user:created", handler_a)
event_bus.subscribe("data:updated", handler_b)
event_bus.subscribe("user:created", handler_c) # 订阅同一个主题
print("n--- Publishing 'user:created' ---")
event_bus.publish("user:created", "New user John Doe registered.")
print("n--- Publishing 'data:updated' ---")
event_bus.publish("data:updated", 123, data2="hello")
print("n--- Publishing 'non:existent' ---")
event_bus.publish("non:existent", "Some message")
print("n--- Unsubscribing handler_a from 'user:created' ---")
event_bus.unsubscribe("user:created", handler_a)
print("n--- Publishing 'user:created' again ---")
event_bus.publish("user:created", "Another user Jane Doe registered.")
print("n--- Unsubscribing a non-existent handler ---")
event_bus.unsubscribe("user:created", handler_a) # 再次尝试取消
代码解释:
collections.defaultdict(list)是一个非常方便的数据结构,当访问一个不存在的键时,它会自动创建一个新的空列表。这避免了在使用_subscribers[topic].append(handler)之前手动检查topic是否存在的麻烦。subscribe方法确保同一个handler不会被多次添加到同一个topic的订阅列表中。publish方法遍历_subscribers[topic]中的所有handler并调用它们。为了防止在迭代过程中列表被修改(例如,一个handler内部调用了unsubscribe),我们使用list(self._subscribers[topic])创建一个副本进行迭代。- 错误处理:每个
handler的调用都包裹在try-except块中,确保一个handler的失败不会阻止其他handler的执行。
这个 BasicEventBus 实现了最基础的 Pub/Sub 功能。但正如我们前面提到的,它有局限性:所有事件处理都是同步的,并且没有提供任何命名空间管理机制。
3. 引入异步事件发布
同步事件处理在某些场景下是可接受的,但它有一个显著的缺点:当一个事件被发布时,发布者必须等待所有订阅者处理完事件才能继续执行。如果某个订阅者执行耗时操作(如数据库查询、网络请求),那么整个系统都会被阻塞,导致性能瓶颈和用户体验下降。
为了解决这个问题,我们需要引入异步事件发布。这意味着当一个事件被发布时,订阅者的处理函数将在后台执行,而发布者可以立即返回,继续其主流程。
我们将使用 Python 的 concurrent.futures.ThreadPoolExecutor 来实现异步。这是一个高级的接口,用于在单独的线程中执行可调用对象。
修改策略:
- 引入
ThreadPoolExecutor: 在EventBus的构造函数中创建一个线程池。 - 修改
publish方法: 不再直接调用handler,而是将handler提交到线程池中执行。
import collections
import concurrent.futures
import threading # 用于线程安全
class AsyncEventBus:
"""
一个支持异步事件发布、命名空间和通配符订阅的事件总线。
"""
WILDCARD = '*'
NAMESPACE_DELIMITER = ':'
def __init__(self, max_workers: int = 5):
# 存储所有订阅者。键是订阅模式(topic或namespace:*, *:event等),值是回调函数列表。
self._subscribers = collections.defaultdict(list)
# 线程池用于异步执行事件处理器
self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
# 保护 _subscribers 字典的线程锁,防止并发修改
self._lock = threading.Lock()
print(f"AsyncEventBus initialized with {max_workers} worker threads.")
def _validate_topic(self, topic: str):
"""验证主题或订阅模式的格式是否有效。"""
if not isinstance(topic, str) or not topic:
raise ValueError("Topic or subscription pattern must be a non-empty string.")
return True
def subscribe(self, pattern: str, handler):
"""
订阅一个事件模式。
模式可以包含命名空间和通配符。
例如: "user:created", "user:*", "*:deleted", "*"
:param pattern: 订阅模式的字符串标识符。
:param handler: 当事件发布时将被异步调用的回调函数。
"""
self._validate_topic(pattern)
if not callable(handler):
raise TypeError("Handler must be a callable function or method.")
with self._lock:
if handler not in self._subscribers[pattern]:
self._subscribers[pattern].append(handler)
print(f"Subscribed: Handler {handler.__name__} to pattern '{pattern}'")
else:
print(f"Handler {handler.__name__} already subscribed to pattern '{pattern}'")
def unsubscribe(self, pattern: str, handler):
"""
取消订阅一个事件模式。
:param pattern: 之前订阅过的事件模式。
:param handler: 之前订阅过的回调函数。
"""
self._validate_topic(pattern)
with self._lock:
if pattern in self._subscribers and handler in self._subscribers[pattern]:
self._subscribers[pattern].remove(handler)
print(f"Unsubscribed: Handler {handler.__name__} from pattern '{pattern}'")
if not self._subscribers[pattern]:
del self._subscribers[pattern]
else:
print(f"Warning: Handler {handler.__name__} not found for pattern '{pattern}'")
def _get_matching_patterns(self, topic: str):
"""
根据发布的事件主题,查找所有匹配的订阅模式。
支持单层通配符。
例如,如果主题是 "user:created",它会匹配 "user:created", "user:*", "*:created", "*"
:param topic: 发布的事件主题。
:return: 匹配的订阅模式列表。
"""
self._validate_topic(topic)
matching_patterns = set()
# 1. 精确匹配
if topic in self._subscribers:
matching_patterns.add(topic)
# 2. 通配符匹配
parts = topic.split(self.NAMESPACE_DELIMITER)
if len(parts) == 1: # 只有一层,例如 "event"
# 匹配 "*:event" (不适用,因为没有分隔符)
# 匹配 "*"
if self.WILDCARD in self._subscribers:
matching_patterns.add(self.WILDCARD)
# 匹配 "event" (已在精确匹配中处理)
elif len(parts) == 2: # 典型命名空间格式,例如 "namespace:event"
namespace, event_name = parts[0], parts[1]
# 匹配 "namespace:*"
ns_wildcard_pattern = f"{namespace}{self.NAMESPACE_DELIMITER}{self.WILDCARD}"
if ns_wildcard_pattern in self._subscribers:
matching_patterns.add(ns_wildcard_pattern)
# 匹配 "*:event_name"
event_wildcard_pattern = f"{self.WILDCARD}{self.NAMESPACE_DELIMITER}{event_name}"
if event_wildcard_pattern in self._subscribers:
matching_patterns.add(event_wildcard_pattern)
# 匹配 "*" (全局通配符)
if self.WILDCARD in self._subscribers:
matching_patterns.add(self.WILDCARD)
else: # 更多层级的命名空间,例如 "a:b:c"
# 为了简化,我们目前只支持单层通配符。
# 如果需要多层通配符,例如 "a:*:c" 或 "a:**", 匹配逻辑会更复杂
# 暂时只匹配 "a:b:c" (精确匹配) 和 "*"
if self.WILDCARD in self._subscribers:
matching_patterns.add(self.WILDCARD)
return list(matching_patterns)
def publish(self, topic: str, *args, **kwargs):
"""
异步发布一个事件到指定主题。
所有匹配该主题的订阅者都将在单独的线程中被调用。
:param topic: 事件主题的字符串标识符。
:param args: 传递给订阅者回调函数的定位参数。
:param kwargs: 传递给订阅者回调函数的关键字参数。
"""
self._validate_topic(topic)
print(f"nPublishing event to topic '{topic}' (asynchronously)")
# 获取所有匹配的模式
matching_patterns = self._get_matching_patterns(topic)
# 收集所有需要调用的处理器
handlers_to_execute = []
with self._lock:
for pattern in matching_patterns:
handlers_to_execute.extend(list(self._subscribers[pattern])) # 复制列表
if not handlers_to_execute:
print(f"No subscribers found for topic '{topic}' or matching patterns.")
return
futures = []
for handler in handlers_to_execute:
try:
# 提交任务到线程池,并捕获 Future 对象
future = self._executor.submit(handler, *args, **kwargs)
futures.append(future)
except Exception as e:
print(f"Error submitting handler {handler.__name__} for topic '{topic}' to executor: {e}")
# 可以选择等待所有 Future 完成,或者在后台处理结果
# 对于异步发布,通常不等待,但为了演示,我们可以打印结果
# 注意:这里打印 Future 的结果会阻塞发布者,实际应用中可能不会这么做
# 而是通过 Future.add_done_callback() 来处理结果
print(f"Event '{topic}' published. {len(futures)} handlers submitted to thread pool.")
for future in futures:
# 在实际生产系统中,通常不会在这里阻塞等待结果
# 而是使用 `add_done_callback` 或者将 Future 存储起来供后续处理
# 这里的 `result()` 只是为了演示异步执行的完成情况和潜在异常
try:
future.result() # 阻塞获取结果,如果处理器抛出异常,这里会重新抛出
except Exception as e:
print(f"Error during async execution of a handler for topic '{topic}': {e}")
def shutdown(self):
"""
关闭事件总线,等待所有挂起的异步任务完成。
"""
print("Shutting down AsyncEventBus...")
self._executor.shutdown(wait=True)
print("AsyncEventBus shut down successfully.")
# 异步和命名空间示例用法
if __name__ == "__main__":
bus = AsyncEventBus(max_workers=3)
def user_created_logger(username):
import time
print(f" [Handler] User created: {username} (logging)")
time.sleep(0.1) # Simulate some work
def user_profile_updater(username):
import time
print(f" [Handler] Updating profile for: {username} (DB op)")
time.sleep(0.5) # Simulate a longer DB operation
def notification_sender(event_data):
import time
print(f" [Handler] Sending notification: {event_data} (network op)")
time.sleep(0.3) # Simulate a network request
def analytics_tracker(event_name, data):
import time
print(f" [Handler] Tracking analytics for '{event_name}': {data}")
time.sleep(0.2)
def global_error_handler(topic, error_message):
print(f" [Handler] GLOBAL ERROR: {topic} - {error_message}")
def specific_error_handler(error_code):
print(f" [Handler] Specific error handler received code: {error_code}")
raise ValueError("Simulated error in handler!") # This handler will fail
print("n--- Subscriptions ---")
bus.subscribe("user:created", user_created_logger)
bus.subscribe("user:created", user_profile_updater)
bus.subscribe("system:error", global_error_handler)
bus.subscribe("system:error", specific_error_handler) # This one will raise an exception
bus.subscribe("analytics:pageview", analytics_tracker)
bus.subscribe("analytics:click", analytics_tracker)
# 命名空间通配符订阅
bus.subscribe("user:*", notification_sender) # 订阅所有user命名空间下的事件
bus.subscribe("*:error", global_error_handler) # 订阅所有命名空间下的 error 事件
bus.subscribe(bus.WILDCARD, lambda *args, **kwargs: print(f" [Handler] Global listener caught: {args} {kwargs}"))
print("n--- Publishing 'user:created' ---")
bus.publish("user:created", "Alice")
print("Publisher continues immediately after 'user:created' event.")
print("n--- Publishing 'system:error' ---")
bus.publish("system:error", "DB_CONNECTION_FAILED", error_message="Failed to connect to primary DB.")
print("Publisher continues immediately after 'system:error' event.")
print("n--- Publishing 'analytics:pageview' ---")
bus.publish("analytics:pageview", "homepage", data={"user_id": 123})
print("n--- Publishing 'user:deleted' (matches user:* and *) ---")
bus.publish("user:deleted", "Bob")
print("n--- Publishing 'payment:failed' (matches *:error and *) ---")
bus.publish("payment:failed", "TRANSACTION_TIMEOUT", error_message="Payment gateway timed out.")
print("n--- Unsubscribing specific_error_handler ---")
bus.unsubscribe("system:error", specific_error_handler)
print("n--- Publishing 'system:error' again (specific_error_handler is gone) ---")
bus.publish("system:error", "CACHE_INVALIDATION_FAILED", error_message="Failed to invalidate cache.")
# 确保所有异步任务完成
print("n--- Waiting for all events to be processed ---")
bus.shutdown()
print("All events processed. Application exiting.")
关键改进:
ThreadPoolExecutor:__init__中创建了一个线程池。max_workers参数控制了可以并行执行的线程数量。publish方法异步化: 当调用publish时,它不再直接调用handler,而是使用self._executor.submit(handler, *args, **kwargs)将handler的执行提交到线程池。submit方法会立即返回一个Future对象,而handler会在后台线程中执行。发布者无需等待。shutdown方法: 这是一个非常重要的方法。在应用程序退出之前,必须调用bus.shutdown()来优雅地关闭线程池,等待所有已提交但尚未完成的任务完成。- 线程安全:
_subscribers字典是一个共享资源,可能在多个线程中被subscribe、unsubscribe和publish方法访问。为了避免竞态条件和数据不一致,我们使用threading.Lock来保护对_subscribers的修改和读取操作。with self._lock:语句确保在访问共享资源时获取锁并在离开代码块时释放锁。 - 异步错误处理:
future.result()方法可以用来获取异步任务的返回值,如果任务在执行过程中抛出异常,result()调用也会重新抛出该异常。在实际应用中,通常会使用future.add_done_callback()来注册一个回调函数,在异步任务完成时处理结果或异常,而不是阻塞发布者。这里为了演示,简单地在发布者侧try-except future.result()。
4. 引入命名空间与通配符订阅
随着系统规模的增长,事件的数量也会急剧增加。扁平化的主题列表(如 user_created, order_placed)很快就会变得难以管理。命名空间提供了一种组织事件的方式,可以将相关事件分组,例如 user:created, user:deleted, order:placed, order:cancelled。
更进一步,我们希望能够订阅一个命名空间下的所有事件,或者所有命名空间中特定类型的事件。这就引入了通配符订阅。
命名空间与通配符设计:
- 分隔符: 我们将使用
:作为命名空间分隔符,例如user:created。 - 通配符: 我们将使用
*作为通配符,表示匹配任何单个段。user:created: 精确订阅user命名空间下的created事件。user:*: 订阅user命名空间下的所有事件(例如user:created,user:deleted)。*:created: 订阅所有命名空间下的created事件(例如user:created,order:created)。*: 全局订阅,匹配所有事件。
修改策略:
subscribe 和 unsubscribe 方法的接口保持不变,但它们的 pattern 参数现在可以接受通配符。
核心的改变在于 publish 方法:它需要一个机制来识别发布的 topic 能够匹配哪些 pattern。为此,我们将引入一个辅助方法 _get_matching_patterns。
_get_matching_patterns(topic) 实现细节:
假设发布的 topic 是 namespace:event_name。
它需要匹配以下几种模式:
- 精确匹配:
namespace:event_name - 命名空间通配符:
namespace:* - 事件名通配符:
*:event_name - 全局通配符:
*
对于更复杂的多层命名空间(例如 a:b:c),如果我们希望支持 a:*:c 这样的匹配,_get_matching_patterns 的逻辑会变得更加复杂,可能需要递归或更高级的模式匹配算法。为了保持实现的简洁性和实用性,我们目前的实现将主要关注单层命名空间和通配符。
代码已在上一节的 AsyncEventBus 中集成。
_get_matching_patterns 逻辑分析:
- 精确匹配: 总是检查
topic本身是否被订阅。 - 分割主题: 将
topic按照NAMESPACE_DELIMITER分割成parts。 - 单层主题处理: 如果
parts只有一个元素(例如event_name),则它只能匹配event_name(精确匹配) 和*(全局通配符)。 - 双层主题处理: 如果
parts有两个元素(例如namespace:event_name),则除了精确匹配,还需要检查namespace:*和*:event_name以及*。 - 多层主题处理: 对于
a:b:c这样的主题,为了简化,当前实现只匹配精确主题a:b:c和全局通配符*。要支持a:*:c或a:b:*,需要更复杂的模式树或正则表达式匹配。
通配符匹配逻辑表格:
| 发布主题 (Topic) | 匹配模式 (Pattern) | 匹配结果举例 |
|---|---|---|
user:created |
user:created |
精确匹配 |
user:created |
user:* |
user 命名空间下的所有事件 |
user:created |
*:created |
所有 created 事件 |
user:created |
* |
所有事件 |
system:error |
system:error |
精确匹配 |
system:error |
system:* |
system 命名空间下的所有事件 |
system:error |
*:error |
所有 error 事件 |
system:error |
* |
所有事件 |
simple_event |
simple_event |
精确匹配 (无命名空间) |
simple_event |
* |
所有事件 (全局通配符) |
another:sub:event |
another:sub:event |
精确匹配 (多层命名空间,当前通配符机制不处理 a:*:c) |
another:sub:event |
* |
所有事件 (全局通配符) |
publish 方法中的匹配:
publish 方法首先通过 _get_matching_patterns(topic) 获取所有匹配当前发布主题的订阅模式。然后,它会遍历这些模式,从 _subscribers 中收集所有对应的 handler,并将它们提交到线程池中执行。
5. 精炼与高级考量
我们的 AsyncEventBus 已经相当强大,但作为编程专家,我们还需要考虑一些精炼和高级特性。
5.1 错误处理与日志
异步事件处理器中的错误不应该静默失败。虽然我们已经通过 try-except 包裹了 handler 的调用,并打印了错误信息,但在生产环境中,我们需要更健壮的错误处理:
- 集中式错误日志: 将错误信息记录到日志系统,而不是直接打印到控制台。
- 重试机制: 对于某些瞬时错误,可以考虑实现自动重试。
- 死信队列(Dead Letter Queue): 对于无法处理的事件,将其发送到一个特殊的队列,以便后续人工检查或处理。
- 错误回调: 允许订阅者注册一个专门的错误处理回调,当它们的异步任务失败时被调用。
# 示例:更高级的错误处理(可以在 Future.add_done_callback 中使用)
def _handle_future_result(future, topic, handler_name):
"""
一个用于处理异步任务结果的回调函数。
"""
try:
result = future.result() # 获取任务结果,如果失败会抛出异常
# print(f" [Async Result] Handler '{handler_name}' for topic '{topic}' completed successfully.")
# 可以在这里处理成功的结果
except Exception as e:
print(f" [Async Error] Handler '{handler_name}' for topic '{topic}' failed: {type(e).__name__}: {e}")
# 记录到日志系统
# 触发一个专门的错误事件,例如 bus.publish("event:error", topic, handler_name, str(e))
5.2 单次订阅 (once)
有时,订阅者只对某个事件的第一次发生感兴趣。once 方法可以实现这一点:在事件被触发并处理后,自动取消订阅。
class AsyncEventBus:
# ... (其他代码保持不变) ...
def once(self, pattern: str, handler):
"""
订阅一个事件模式,但只接收一次通知。
在事件触发后,该处理器将自动取消订阅。
:param pattern: 订阅模式的字符串标识符。
:param handler: 当事件发布时将被调用的回调函数。
"""
# 创建一个包装器,在执行后自动取消订阅
def once_wrapper(*args, **kwargs):
try:
handler(*args, **kwargs)
finally:
# 确保在执行后尝试取消订阅,即使 handler 失败
self.unsubscribe(pattern, once_wrapper)
# 将原始 handler 存储在 wrapper 属性中,以便后续匹配和取消订阅
once_wrapper._original_handler = handler
self.subscribe(pattern, once_wrapper)
print(f"Subscribed once: Handler {handler.__name__} to pattern '{pattern}'")
# `unsubscribe` 方法需要稍微调整,以便能够正确取消 `once_wrapper`
# ...
# def unsubscribe(self, pattern: str, handler):
# with self._lock:
# if pattern in self._subscribers:
# # 检查原始 handler 或 wrapper 本身
# handlers_to_remove = []
# for existing_handler in self._subscribers[pattern]:
# if existing_handler == handler: # 精确匹配
# handlers_to_remove.append(existing_handler)
# elif hasattr(existing_handler, '_original_handler') and existing_handler._original_handler == handler:
# handlers_to_remove.append(existing_handler)
#
# for h in handlers_to_remove:
# self._subscribers[pattern].remove(h)
# print(f"Unsubscribed: Handler {handler.__name__} from pattern '{pattern}'")
#
# if not self._subscribers[pattern]:
# del self._subscribers[pattern]
# else:
# print(f"Warning: Handler {handler.__name__} not found for pattern '{pattern}'")
注意: once 的实现需要 unsubscribe 能够识别包装器 once_wrapper 和原始 handler 的关系。一个简单的做法是在 once_wrapper 上添加一个属性来指向原始 handler,以便在 unsubscribe 时能够通过原始 handler 来找到并移除 wrapper。为了文章主线清晰,这里不深入展开修改 unsubscribe,仅提供 once 的概念。
5.3 事件数据结构化
目前我们使用 *args 和 **kwargs 传递事件数据。在更复杂的系统中,将事件数据封装成一个统一的事件对象会更有利于类型检查、数据一致性和未来扩展。
from typing import Any, Dict
class Event:
"""
一个结构化的事件对象。
"""
def __init__(self, topic: str, payload: Dict[str, Any] = None, timestamp: float = None):
self.topic = topic
self.payload = payload if payload is not None else {}
self.timestamp = timestamp if timestamp is not None else time.time()
def __repr__(self):
return f"Event(topic='{self.topic}', payload={self.payload}, timestamp={self.timestamp})"
# 那么 publish 方法可以变为:
# def publish(self, event: Event):
# # ...
# for handler in handlers_to_execute:
# self._executor.submit(handler, event) # 只传递 event 对象
#
# 订阅者则接收 event 对象:
# def my_handler(event: Event):
# print(f"Received event {event.topic} with payload {event.payload}")
这种方式将事件的元数据(如 topic, timestamp)与业务数据 (payload) 分离,使事件处理更加规范。
5.4 性能考量
对于高并发、高吞吐量的系统,需要更深入地考虑性能:
- 线程池大小:
max_workers的选择至关重要。过小会导致任务积压,过大可能导致线程切换开销。通常根据 CPU 核数和 I/O 密集型任务的比例进行调整。 - 避免阻塞线程池: 确保订阅者处理器本身不会执行长时间的 CPU 密集型计算或长时间的阻塞 I/O 操作。如果确实有这类任务,考虑使用
ProcessPoolExecutor或专门的异步 I/O 框架(如asyncio)。 - 订阅者数据结构优化: 对于海量订阅者,
_subscribers的查找效率可能会成为瓶颈。如果通配符模式非常复杂,可能需要使用 Trie 树或其他高级数据结构来优化匹配过程。
6. 完整的 AsyncEventBus 实现
我们将把所有的考量和特性整合到一个最终的 AsyncEventBus 类中。
import collections
import concurrent.futures
import threading
import time
from typing import Any, Dict, List, Callable, Set
# 定义一个结构化的事件对象,方便传递和处理事件数据
class Event:
"""
一个结构化的事件对象,包含事件主题、负载和时间戳。
"""
def __init__(self, topic: str, payload: Dict[str, Any] = None, timestamp: float = None):
if not isinstance(topic, str) or not topic:
raise ValueError("Event topic must be a non-empty string.")
self.topic = topic
self.payload = payload if payload is not None else {}
self.timestamp = timestamp if timestamp is not None else time.time()
def __repr__(self):
return f"Event(topic='{self.topic}', payload={self.payload}, timestamp={self.timestamp}, ts_iso='{time.ctime(self.timestamp)}')"
def __str__(self):
return self.__repr__()
class AsyncEventBus:
"""
一个支持命名空间、通配符订阅和异步事件发布的事件总线。
它使用线程池来异步执行事件处理器,并包含线程安全机制。
"""
WILDCARD = '*'
NAMESPACE_DELIMITER = ':'
def __init__(self, max_workers: int = 5):
# 存储所有订阅者。键是订阅模式(topic或namespace:*, *:event等),值是回调函数列表。
# 使用 defaultdict 简化主题不存在时的处理,自动创建空列表
self._subscribers: Dict[str, List[Callable]] = collections.defaultdict(list)
# 线程池用于异步执行事件处理器
self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
# 保护 _subscribers 字典的线程锁,防止并发修改
self._lock = threading.Lock()
print(f"[{time.ctime()}] AsyncEventBus initialized with {max_workers} worker threads.")
def _validate_pattern(self, pattern: str):
"""验证主题或订阅模式的格式是否有效。"""
if not isinstance(pattern, str) or not pattern:
raise ValueError("Pattern (topic or subscription) must be a non-empty string.")
# 进一步可以添加规则,例如不允许连续的通配符或分隔符等
return True
def subscribe(self, pattern: str, handler: Callable):
"""
订阅一个事件模式。
模式可以包含命名空间和通配符。
例如: "user:created", "user:*", "*:deleted", "*"
:param pattern: 订阅模式的字符串标识符。
:param handler: 当事件发布时将被异步调用的回调函数。它应该接受一个 Event 对象作为参数。
"""
self._validate_pattern(pattern)
if not callable(handler):
raise TypeError("Handler must be a callable function or method.")
with self._lock:
if handler not in self._subscribers[pattern]:
self._subscribers[pattern].append(handler)
print(f"[{time.ctime()}] Subscribed: Handler '{handler.__name__}' to pattern '{pattern}'")
else:
print(f"[{time.ctime()}] Warning: Handler '{handler.__name__}' already subscribed to pattern '{pattern}'")
def unsubscribe(self, pattern: str, handler: Callable):
"""
取消订阅一个事件模式。
:param pattern: 之前订阅过的事件模式。
:param handler: 之前订阅过的回调函数。
"""
self._validate_pattern(pattern)
with self._lock:
if pattern in self._subscribers:
# 考虑到 once 订阅可能使用了包装器,我们需要查找原始 handler
handlers_to_remove = []
for existing_handler in self._subscribers[pattern]:
if existing_handler == handler: # 精确匹配
handlers_to_remove.append(existing_handler)
# 如果是 once_wrapper,检查其内部是否包装了目标 handler
elif hasattr(existing_handler, '_original_handler') and existing_handler._original_handler == handler:
handlers_to_remove.append(existing_handler)
for h in handlers_to_remove:
self._subscribers[pattern].remove(h)
print(f"[{time.ctime()}] Unsubscribed: Handler '{handler.__name__}' from pattern '{pattern}'")
if not self._subscribers[pattern]:
del self._subscribers[pattern] # 清理空列表
else:
print(f"[{time.ctime()}] Warning: Pattern '{pattern}' has no subscribers to unsubscribe '{handler.__name__}'.")
def once(self, pattern: str, handler: Callable):
"""
订阅一个事件模式,但只接收一次通知。
在事件触发并成功处理后,该处理器将自动取消订阅。
:param pattern: 订阅模式的字符串标识符。
:param handler: 当事件发布时将被调用的回调函数。它应该接受一个 Event 对象作为参数。
"""
def once_wrapper(event: Event):
try:
handler(event)
finally:
# 即使 handler 失败,也尝试取消订阅,确保只执行一次
self.unsubscribe(pattern, once_wrapper)
# 将原始 handler 存储在 wrapper 属性中,以便 unsubscribe 能够识别
once_wrapper._original_handler = handler
self.subscribe(pattern, once_wrapper)
print(f"[{time.ctime()}] Subscribed once: Handler '{handler.__name__}' to pattern '{pattern}'")
def _get_matching_patterns(self, topic: str) -> List[str]:
"""
根据发布的事件主题,查找所有匹配的订阅模式。
支持单层通配符。
例如,如果主题是 "user:created",它会匹配 "user:created", "user:*", "*:created", "*"
:param topic: 发布的事件主题。
:return: 匹配的订阅模式列表。
"""
self._validate_pattern(topic)
matching_patterns: Set[str] = set()
# 1. 精确匹配
if topic in self._subscribers:
matching_patterns.add(topic)
# 2. 通配符匹配
parts = topic.split(self.NAMESPACE_DELIMITER)
if len(parts) == 1: # 只有一层,例如 "event_name"
# 匹配 "event_name" (已在精确匹配中处理)
# 匹配 "*"
if self.WILDCARD in self._subscribers:
matching_patterns.add(self.WILDCARD)
elif len(parts) == 2: # 典型命名空间格式,例如 "namespace:event_name"
namespace, event_name = parts[0], parts[1]
# 匹配 "namespace:*"
ns_wildcard_pattern = f"{namespace}{self.NAMESPACE_DELIMITER}{self.WILDCARD}"
if ns_wildcard_pattern in self._subscribers:
matching_patterns.add(ns_wildcard_pattern)
# 匹配 "*:event_name"
event_wildcard_pattern = f"{self.WILDCARD}{self.NAMESPACE_DELIMITER}{event_name}"
if event_wildcard_pattern in self._subscribers:
matching_patterns.add(event_wildcard_pattern)
# 匹配 "*" (全局通配符)
if self.WILDCARD in self._subscribers:
matching_patterns.add(self.WILDCARD)
else: # 更多层级的命名空间,例如 "a:b:c"
# 为了简化,我们目前只支持单层通配符。
# 暂时只匹配精确主题和 "*"
if self.WILDCARD in self._subscribers:
matching_patterns.add(self.WILDCARD)
return list(matching_patterns)
def publish(self, topic: str, **payload: Any):
"""
异步发布一个事件到指定主题。
所有匹配该主题的订阅者都将在单独的线程中被调用。
事件数据被封装在 Event 对象中传递给处理器。
:param topic: 事件主题的字符串标识符。
:param payload: 传递给订阅者 Event 对象的负载数据。
"""
self._validate_pattern(topic)
event = Event(topic=topic, payload=payload)
print(f"n[{time.ctime()}] Publishing event: {event.topic} (asynchronously)")
# 获取所有匹配的模式
matching_patterns = self._get_matching_patterns(topic)
# 收集所有需要调用的处理器
handlers_to_execute = []
with self._lock:
for pattern in matching_patterns:
# 使用 list() 创建副本,防止在迭代时修改列表
handlers_to_execute.extend(list(self._subscribers[pattern]))
if not handlers_to_execute:
print(f"[{time.ctime()}] No subscribers found for event '{event.topic}' or matching patterns.")
return
futures = []
for handler in handlers_to_execute:
try:
# 提交任务到线程池,并捕获 Future 对象
future = self._executor.submit(handler, event)
future.add_done_callback(
lambda f, h=handler.__name__, t=event.topic: self._handle_future_result(f, t, h)
)
futures.append(future)
except Exception as e:
print(f"[{time.ctime()}] Error submitting handler '{handler.__name__}' for topic '{event.topic}' to executor: {e}")
print(f"[{time.ctime()}] Event '{event.topic}' published. {len(futures)} handlers submitted to thread pool.")
# 发布者立即返回,不等待 Future 完成
def _handle_future_result(self, future: concurrent.futures.Future, topic: str, handler_name: str):
"""
一个私有回调函数,用于处理异步任务的完成(成功或失败)。
这个函数将在任务完成时由线程池调用。
"""
try:
future.result() # 尝试获取结果,如果任务失败,这里会重新抛出异常
# print(f"[{time.ctime()}] [Async Result] Handler '{handler_name}' for topic '{topic}' completed successfully.")
except Exception as e:
print(f"[{time.ctime()}] [Async Error] Handler '{handler_name}' for topic '{topic}' failed: {type(e).__name__}: {e}")
# 这里可以集成更高级的错误处理,如记录到日志、发送警报、发布一个错误事件等
# self.publish("bus:error", original_topic=topic, handler=handler_name, error_message=str(e)) # 避免无限循环
def shutdown(self, wait: bool = True, timeout: float = None):
"""
关闭事件总线,等待所有挂起的异步任务完成。
:param wait: 如果为 True,则在所有挂起的工作完成之前阻塞。
:param timeout: 如果 wait 为 True,则在超时秒后强制关闭。
"""
print(f"n[{time.ctime()}] Shutting down AsyncEventBus (wait={wait}, timeout={timeout})...")
self._executor.shutdown(wait=wait, timeout=timeout)
print(f"[{time.ctime()}] AsyncEventBus shut down successfully.")
# --- 完整示例用法 ---
if __name__ == "__main__":
bus = AsyncEventBus(max_workers=3)
def user_created_logger(event: Event):
print(f"[{time.ctime()}] [Handler: Logger] User created: {event.payload.get('username')} (logging for {event.topic})")
time.sleep(0.1)
def user_profile_updater(event: Event):
print(f"[{time.ctime()}] [Handler: ProfileUpdater] Updating profile for: {event.payload.get('username')} (DB op for {event.topic})")
time.sleep(0.5)
def notification_sender(event: Event):
print(f"[{time.ctime()}] [Handler: Notifier] Sending notification for: {event.topic} event with data: {event.payload} (network op)")
time.sleep(0.3)
def analytics_tracker(event: Event):
print(f"[{time.ctime()}] [Handler: Analytics] Tracking '{event.topic}': {event.payload}")
time.sleep(0.2)
def global_error_handler(event: Event):
print(f"[{time.ctime()}] [Handler: GlobalError] GLOBAL ERROR: {event.topic} - {event.payload.get('error_message')}")
def specific_error_handler(event: Event):
print(f"[{time.ctime()}] [Handler: SpecificError] Specific error handler received code: {event.payload.get('error_code')}")
raise ValueError("Simulated error in specific_error_handler!") # This handler will fail
def once_handler(event: Event):
print(f"[{time.ctime()}] [Handler: Once] This should only run once for {event.topic}!")
time.sleep(0.05)
print("n--- Subscriptions ---")
bus.subscribe("user:created", user_created_logger)
bus.subscribe("user:created", user_profile_updater)
bus.subscribe("system:error", global_error_handler)
bus.subscribe("system:error", specific_error_handler) # This one will raise an exception
bus.subscribe("analytics:pageview", analytics_tracker)
bus.subscribe("analytics:click", analytics_tracker)
bus.once("user:deleted", once_handler) # 订阅一次性事件
# 命名空间通配符订阅
bus.subscribe("user:*", notification_sender) # 订阅所有user命名空间下的事件
bus.subscribe("*:error", global_error_handler) # 订阅所有命名空间下的 error 事件 (注意与 system:error 的重复)
bus.subscribe(bus.WILDCARD, lambda event: print(f"[{time.ctime()}] [Handler: GlobalListener] Caught ALL: {event.topic}"))
print("n--- Publishing 'user:created' ---")
bus.publish("user:created", username="Alice", email="[email protected]")
print(f"[{time.ctime()}] Publisher continues immediately after 'user:created' event.")
print("n--- Publishing 'system:error' ---")
bus.publish("system:error", error_code="DB_CONNECTION_FAILED", error_message="Failed to connect to primary DB.")
print(f"[{time.ctime()}] Publisher continues immediately after 'system:error' event.")
print("n--- Publishing 'analytics:pageview' ---")
bus.publish("analytics:pageview", page_url="/home", user_id=123)
print("n--- Publishing 'user:deleted' (matches user:* and * and once_handler) ---")
bus.publish("user:deleted", username="Bob", reason="left company")
print(f"[{time.ctime()}] Publisher continues immediately after 'user:deleted' event.")
print("n--- Publishing 'user:deleted' again (once_handler should not run) ---")
bus.publish("user:deleted", username="Charlie", reason="privacy request")
print(f"[{time.ctime()}] Publisher continues immediately after 'user:deleted' event.")
print("n--- Publishing 'payment:failed' (matches *:error and *) ---")
bus.publish("payment:failed", error_code="TRANSACTION_TIMEOUT", error_message="Payment gateway timed out.")
print("n--- Unsubscribing specific_error_handler ---")
bus.unsubscribe("system:error", specific_error_handler)
print("n--- Publishing 'system:error' again (specific_error_handler is gone) ---")
bus.publish("system:error", error_code="CACHE_INVALIDATION_FAILED", error_message="Failed to invalidate cache.")
# 确保所有异步任务完成
bus.shutdown()
print(f"[{time.ctime()}] All events processed. Application exiting.")
代码总结:
Event类: 定义了标准化的事件载体,包含topic、payload和timestamp。所有处理器都将接收一个Event实例。AsyncEventBus初始化: 在构造函数中创建ThreadPoolExecutor和threading.Lock。subscribe和once:subscribe负责将handler注册到_subscribers字典中。once方法通过一个包装器once_wrapper实现,它在执行handler后自动调用unsubscribe。为了让unsubscribe能正确识别,once_wrapper上附加了_original_handler属性。
unsubscribe: 增强了逻辑,不仅能移除原始handler,也能移除once创建的包装器。_get_matching_patterns: 核心的模式匹配逻辑,负责识别发布的topic能够触发哪些通配符订阅。publish:- 创建
Event对象。 - 调用
_get_matching_patterns获取所有匹配的订阅模式。 - 收集所有匹配模式下的
handler。 - 将每个
handler和Event对象提交给_executor线程池。 - 为每个
Future添加_handle_future_result回调,用于在任务完成时处理结果或错误。 - 发布者立即返回,不会等待事件处理完成。
- 创建
_handle_future_result: 异步任务完成后,由线程池调用。负责捕获并打印处理器中发生的异常,可以扩展为更复杂的错误报告机制。shutdown: 优雅地关闭线程池,等待所有提交的任务完成,防止数据丢失或资源泄露。
7. 结语
我们从 Pub/Sub 模式的基础概念出发,逐步构建了一个功能强大的 AsyncEventBus。它实现了事件的发布与订阅、支持命名空间和通配符匹配,并通过线程池实现了异步事件处理,同时兼顾了线程安全和基本错误处理。
这个实现为构建解耦、可扩展和响应迅速的应用程序提供了一个坚实的基础。在实际项目中,您可以根据具体需求进一步扩展,例如支持更复杂的通配符规则、事件优先级、持久化事件、分布式 Pub/Sub 消息队列集成,或是与 asyncio 等异步框架结合,以适应更广泛的场景。理解并掌握这样的模式,是成为一名优秀编程专家的必经之路。