手写实现一个 Pub/Sub(发布-订阅)模式:支持命名空间与异步事件发布

各位技术同仁,下午好!

今天,我们将深入探讨一个在现代软件架构中无处不在,却又常常被低估其复杂性的设计模式——发布-订阅(Pub/Sub)模式。我将带领大家手写实现一个功能完备的 Pub/Sub 系统,它不仅支持基本的事件发布与订阅,更将融入命名空间管理与异步事件发布这两大核心特性,以满足真实世界中复杂系统的需求。

1. 发布-订阅模式的基石:解耦与协作

发布-订阅模式的核心理念在于解耦。它允许系统的不同组件在不直接知晓彼此存在的情况下进行通信。想象一下,一个报社(发布者)发布新闻,而读者(订阅者)阅读新闻。报社不需要知道每个读者的姓名和地址,读者也不需要直接联系报社获取新闻,他们都通过一个中间媒介——报纸(事件总线)进行交互。

在软件领域,这种模式的优势显而易见:

  • 降低耦合度: 发布者和订阅者之间没有直接依赖,它们只依赖于事件总线和预定义的事件类型。这使得系统模块化,更易于开发、测试和维护。
  • 提高可扩展性: 可以在不修改现有发布者代码的情况下,轻松添加新的订阅者来响应特定事件。反之亦然。
  • 增强灵活性: 订阅者可以按需订阅感兴趣的事件,发布者可以专注于发布事件,无需关心事件的处理逻辑。
  • 支持异步处理: 事件的发布和处理可以是异步的,这对于非阻塞操作、后台任务和性能优化至关重要。

我们的目标是构建一个名为 AsyncEventBus 的类,它将作为事件总线,负责管理事件的注册、发布和分发。

2. 构建核心 Pub/Sub 机制(同步版)

在引入命名空间和异步特性之前,我们先从最基础的同步 Pub/Sub 机制开始。这有助于我们理解其基本的数据结构和操作。

核心组件:

  • _subscribers 字典: 用于存储所有订阅者。键是事件主题(topic),值是一个列表,包含所有订阅了该主题的回调函数(handler)。
  • subscribe(topic, handler) 方法: 注册一个订阅者。
  • unsubscribe(topic, handler) 方法: 移除一个订阅者。
  • *`publish(topic, args, kwargs)` 方法: 发布一个事件,并同步通知所有相关订阅者。
import collections

class BasicEventBus:
    """
    一个基础的同步发布-订阅事件总线。
    """
    def __init__(self):
        # 使用 defaultdict 简化主题不存在时的处理,自动创建空列表
        self._subscribers = collections.defaultdict(list)

    def subscribe(self, topic: str, handler):
        """
        订阅一个事件主题。
        :param topic: 事件主题的字符串标识符。
        :param handler: 当事件发布时将被调用的回调函数。
        """
        if not callable(handler):
            raise TypeError("Handler must be a callable function or method.")
        if handler not in self._subscribers[topic]:
            self._subscribers[topic].append(handler)
        print(f"Subscribed: Handler {handler.__name__} to topic '{topic}'")

    def unsubscribe(self, topic: str, handler):
        """
        取消订阅一个事件主题。
        :param topic: 事件主题的字符串标识符。
        :param handler: 之前订阅过的回调函数。
        """
        if topic in self._subscribers and handler in self._subscribers[topic]:
            self._subscribers[topic].remove(handler)
            print(f"Unsubscribed: Handler {handler.__name__} from topic '{topic}'")
            # 如果主题下没有订阅者了,可以清理空列表,但 defaultdict 会自动处理
            if not self._subscribers[topic]:
                del self._subscribers[topic]
        else:
            print(f"Warning: Handler {handler.__name__} not found for topic '{topic}'")

    def publish(self, topic: str, *args, **kwargs):
        """
        发布一个事件到指定主题。
        :param topic: 事件主题的字符串标识符。
        :param args: 传递给订阅者回调函数的定位参数。
        :param kwargs: 传递给订阅者回调函数的关键字参数。
        """
        print(f"Publishing event to topic '{topic}' with args={args}, kwargs={kwargs}")
        if topic in self._subscribers:
            # 迭代订阅者列表的副本,防止在迭代过程中修改列表(例如通过 unsubscribe)
            for handler in list(self._subscribers[topic]):
                try:
                    handler(*args, **kwargs)
                except Exception as e:
                    print(f"Error in handler {handler.__name__} for topic '{topic}': {e}")
        else:
            print(f"No subscribers for topic '{topic}'")

# 示例用法
if __name__ == "__main__":
    event_bus = BasicEventBus()

    def handler_a(message):
        print(f"Handler A received: {message}")

    def handler_b(data1, data2):
        print(f"Handler B received: {data1} and {data2}")

    def handler_c():
        print("Handler C received an event with no data.")

    event_bus.subscribe("user:created", handler_a)
    event_bus.subscribe("data:updated", handler_b)
    event_bus.subscribe("user:created", handler_c) # 订阅同一个主题

    print("n--- Publishing 'user:created' ---")
    event_bus.publish("user:created", "New user John Doe registered.")

    print("n--- Publishing 'data:updated' ---")
    event_bus.publish("data:updated", 123, data2="hello")

    print("n--- Publishing 'non:existent' ---")
    event_bus.publish("non:existent", "Some message")

    print("n--- Unsubscribing handler_a from 'user:created' ---")
    event_bus.unsubscribe("user:created", handler_a)

    print("n--- Publishing 'user:created' again ---")
    event_bus.publish("user:created", "Another user Jane Doe registered.")

    print("n--- Unsubscribing a non-existent handler ---")
    event_bus.unsubscribe("user:created", handler_a) # 再次尝试取消

代码解释:

  • collections.defaultdict(list) 是一个非常方便的数据结构,当访问一个不存在的键时,它会自动创建一个新的空列表。这避免了在使用 _subscribers[topic].append(handler) 之前手动检查 topic 是否存在的麻烦。
  • subscribe 方法确保同一个 handler 不会被多次添加到同一个 topic 的订阅列表中。
  • publish 方法遍历 _subscribers[topic] 中的所有 handler 并调用它们。为了防止在迭代过程中列表被修改(例如,一个 handler 内部调用了 unsubscribe),我们使用 list(self._subscribers[topic]) 创建一个副本进行迭代。
  • 错误处理:每个 handler 的调用都包裹在 try-except 块中,确保一个 handler 的失败不会阻止其他 handler 的执行。

这个 BasicEventBus 实现了最基础的 Pub/Sub 功能。但正如我们前面提到的,它有局限性:所有事件处理都是同步的,并且没有提供任何命名空间管理机制。

3. 引入异步事件发布

同步事件处理在某些场景下是可接受的,但它有一个显著的缺点:当一个事件被发布时,发布者必须等待所有订阅者处理完事件才能继续执行。如果某个订阅者执行耗时操作(如数据库查询、网络请求),那么整个系统都会被阻塞,导致性能瓶颈和用户体验下降。

为了解决这个问题,我们需要引入异步事件发布。这意味着当一个事件被发布时,订阅者的处理函数将在后台执行,而发布者可以立即返回,继续其主流程。

我们将使用 Python 的 concurrent.futures.ThreadPoolExecutor 来实现异步。这是一个高级的接口,用于在单独的线程中执行可调用对象。

修改策略:

  1. 引入 ThreadPoolExecutorEventBus 的构造函数中创建一个线程池。
  2. 修改 publish 方法: 不再直接调用 handler,而是将 handler 提交到线程池中执行。
import collections
import concurrent.futures
import threading # 用于线程安全

class AsyncEventBus:
    """
    一个支持异步事件发布、命名空间和通配符订阅的事件总线。
    """
    WILDCARD = '*'
    NAMESPACE_DELIMITER = ':'

    def __init__(self, max_workers: int = 5):
        # 存储所有订阅者。键是订阅模式(topic或namespace:*, *:event等),值是回调函数列表。
        self._subscribers = collections.defaultdict(list)
        # 线程池用于异步执行事件处理器
        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
        # 保护 _subscribers 字典的线程锁,防止并发修改
        self._lock = threading.Lock()
        print(f"AsyncEventBus initialized with {max_workers} worker threads.")

    def _validate_topic(self, topic: str):
        """验证主题或订阅模式的格式是否有效。"""
        if not isinstance(topic, str) or not topic:
            raise ValueError("Topic or subscription pattern must be a non-empty string.")
        return True

    def subscribe(self, pattern: str, handler):
        """
        订阅一个事件模式。
        模式可以包含命名空间和通配符。
        例如: "user:created", "user:*", "*:deleted", "*"
        :param pattern: 订阅模式的字符串标识符。
        :param handler: 当事件发布时将被异步调用的回调函数。
        """
        self._validate_topic(pattern)
        if not callable(handler):
            raise TypeError("Handler must be a callable function or method.")

        with self._lock:
            if handler not in self._subscribers[pattern]:
                self._subscribers[pattern].append(handler)
                print(f"Subscribed: Handler {handler.__name__} to pattern '{pattern}'")
            else:
                print(f"Handler {handler.__name__} already subscribed to pattern '{pattern}'")

    def unsubscribe(self, pattern: str, handler):
        """
        取消订阅一个事件模式。
        :param pattern: 之前订阅过的事件模式。
        :param handler: 之前订阅过的回调函数。
        """
        self._validate_topic(pattern)
        with self._lock:
            if pattern in self._subscribers and handler in self._subscribers[pattern]:
                self._subscribers[pattern].remove(handler)
                print(f"Unsubscribed: Handler {handler.__name__} from pattern '{pattern}'")
                if not self._subscribers[pattern]:
                    del self._subscribers[pattern]
            else:
                print(f"Warning: Handler {handler.__name__} not found for pattern '{pattern}'")

    def _get_matching_patterns(self, topic: str):
        """
        根据发布的事件主题,查找所有匹配的订阅模式。
        支持单层通配符。
        例如,如果主题是 "user:created",它会匹配 "user:created", "user:*", "*:created", "*"
        :param topic: 发布的事件主题。
        :return: 匹配的订阅模式列表。
        """
        self._validate_topic(topic)
        matching_patterns = set()

        # 1. 精确匹配
        if topic in self._subscribers:
            matching_patterns.add(topic)

        # 2. 通配符匹配
        parts = topic.split(self.NAMESPACE_DELIMITER)

        if len(parts) == 1: # 只有一层,例如 "event"
            # 匹配 "*:event" (不适用,因为没有分隔符)
            # 匹配 "*"
            if self.WILDCARD in self._subscribers:
                matching_patterns.add(self.WILDCARD)

            # 匹配 "event" (已在精确匹配中处理)

        elif len(parts) == 2: # 典型命名空间格式,例如 "namespace:event"
            namespace, event_name = parts[0], parts[1]

            # 匹配 "namespace:*"
            ns_wildcard_pattern = f"{namespace}{self.NAMESPACE_DELIMITER}{self.WILDCARD}"
            if ns_wildcard_pattern in self._subscribers:
                matching_patterns.add(ns_wildcard_pattern)

            # 匹配 "*:event_name"
            event_wildcard_pattern = f"{self.WILDCARD}{self.NAMESPACE_DELIMITER}{event_name}"
            if event_wildcard_pattern in self._subscribers:
                matching_patterns.add(event_wildcard_pattern)

            # 匹配 "*" (全局通配符)
            if self.WILDCARD in self._subscribers:
                matching_patterns.add(self.WILDCARD)
        else: # 更多层级的命名空间,例如 "a:b:c"
            # 为了简化,我们目前只支持单层通配符。
            # 如果需要多层通配符,例如 "a:*:c" 或 "a:**", 匹配逻辑会更复杂
            # 暂时只匹配 "a:b:c" (精确匹配) 和 "*"
            if self.WILDCARD in self._subscribers:
                matching_patterns.add(self.WILDCARD)

        return list(matching_patterns)

    def publish(self, topic: str, *args, **kwargs):
        """
        异步发布一个事件到指定主题。
        所有匹配该主题的订阅者都将在单独的线程中被调用。
        :param topic: 事件主题的字符串标识符。
        :param args: 传递给订阅者回调函数的定位参数。
        :param kwargs: 传递给订阅者回调函数的关键字参数。
        """
        self._validate_topic(topic)
        print(f"nPublishing event to topic '{topic}' (asynchronously)")

        # 获取所有匹配的模式
        matching_patterns = self._get_matching_patterns(topic)

        # 收集所有需要调用的处理器
        handlers_to_execute = []
        with self._lock:
            for pattern in matching_patterns:
                handlers_to_execute.extend(list(self._subscribers[pattern])) # 复制列表

        if not handlers_to_execute:
            print(f"No subscribers found for topic '{topic}' or matching patterns.")
            return

        futures = []
        for handler in handlers_to_execute:
            try:
                # 提交任务到线程池,并捕获 Future 对象
                future = self._executor.submit(handler, *args, **kwargs)
                futures.append(future)
            except Exception as e:
                print(f"Error submitting handler {handler.__name__} for topic '{topic}' to executor: {e}")

        # 可以选择等待所有 Future 完成,或者在后台处理结果
        # 对于异步发布,通常不等待,但为了演示,我们可以打印结果
        # 注意:这里打印 Future 的结果会阻塞发布者,实际应用中可能不会这么做
        # 而是通过 Future.add_done_callback() 来处理结果
        print(f"Event '{topic}' published. {len(futures)} handlers submitted to thread pool.")
        for future in futures:
            # 在实际生产系统中,通常不会在这里阻塞等待结果
            # 而是使用 `add_done_callback` 或者将 Future 存储起来供后续处理
            # 这里的 `result()` 只是为了演示异步执行的完成情况和潜在异常
            try:
                future.result() # 阻塞获取结果,如果处理器抛出异常,这里会重新抛出
            except Exception as e:
                print(f"Error during async execution of a handler for topic '{topic}': {e}")

    def shutdown(self):
        """
        关闭事件总线,等待所有挂起的异步任务完成。
        """
        print("Shutting down AsyncEventBus...")
        self._executor.shutdown(wait=True)
        print("AsyncEventBus shut down successfully.")

# 异步和命名空间示例用法
if __name__ == "__main__":
    bus = AsyncEventBus(max_workers=3)

    def user_created_logger(username):
        import time
        print(f"  [Handler] User created: {username} (logging)")
        time.sleep(0.1) # Simulate some work

    def user_profile_updater(username):
        import time
        print(f"  [Handler] Updating profile for: {username} (DB op)")
        time.sleep(0.5) # Simulate a longer DB operation

    def notification_sender(event_data):
        import time
        print(f"  [Handler] Sending notification: {event_data} (network op)")
        time.sleep(0.3) # Simulate a network request

    def analytics_tracker(event_name, data):
        import time
        print(f"  [Handler] Tracking analytics for '{event_name}': {data}")
        time.sleep(0.2)

    def global_error_handler(topic, error_message):
        print(f"  [Handler] GLOBAL ERROR: {topic} - {error_message}")

    def specific_error_handler(error_code):
        print(f"  [Handler] Specific error handler received code: {error_code}")
        raise ValueError("Simulated error in handler!") # This handler will fail

    print("n--- Subscriptions ---")
    bus.subscribe("user:created", user_created_logger)
    bus.subscribe("user:created", user_profile_updater)
    bus.subscribe("system:error", global_error_handler)
    bus.subscribe("system:error", specific_error_handler) # This one will raise an exception
    bus.subscribe("analytics:pageview", analytics_tracker)
    bus.subscribe("analytics:click", analytics_tracker)

    # 命名空间通配符订阅
    bus.subscribe("user:*", notification_sender) # 订阅所有user命名空间下的事件
    bus.subscribe("*:error", global_error_handler) # 订阅所有命名空间下的 error 事件
    bus.subscribe(bus.WILDCARD, lambda *args, **kwargs: print(f"  [Handler] Global listener caught: {args} {kwargs}"))

    print("n--- Publishing 'user:created' ---")
    bus.publish("user:created", "Alice")
    print("Publisher continues immediately after 'user:created' event.")

    print("n--- Publishing 'system:error' ---")
    bus.publish("system:error", "DB_CONNECTION_FAILED", error_message="Failed to connect to primary DB.")
    print("Publisher continues immediately after 'system:error' event.")

    print("n--- Publishing 'analytics:pageview' ---")
    bus.publish("analytics:pageview", "homepage", data={"user_id": 123})

    print("n--- Publishing 'user:deleted' (matches user:* and *) ---")
    bus.publish("user:deleted", "Bob")

    print("n--- Publishing 'payment:failed' (matches *:error and *) ---")
    bus.publish("payment:failed", "TRANSACTION_TIMEOUT", error_message="Payment gateway timed out.")

    print("n--- Unsubscribing specific_error_handler ---")
    bus.unsubscribe("system:error", specific_error_handler)

    print("n--- Publishing 'system:error' again (specific_error_handler is gone) ---")
    bus.publish("system:error", "CACHE_INVALIDATION_FAILED", error_message="Failed to invalidate cache.")

    # 确保所有异步任务完成
    print("n--- Waiting for all events to be processed ---")
    bus.shutdown()
    print("All events processed. Application exiting.")

关键改进:

  • ThreadPoolExecutor __init__ 中创建了一个线程池。max_workers 参数控制了可以并行执行的线程数量。
  • publish 方法异步化: 当调用 publish 时,它不再直接调用 handler,而是使用 self._executor.submit(handler, *args, **kwargs)handler 的执行提交到线程池。submit 方法会立即返回一个 Future 对象,而 handler 会在后台线程中执行。发布者无需等待。
  • shutdown 方法: 这是一个非常重要的方法。在应用程序退出之前,必须调用 bus.shutdown() 来优雅地关闭线程池,等待所有已提交但尚未完成的任务完成。
  • 线程安全: _subscribers 字典是一个共享资源,可能在多个线程中被 subscribeunsubscribepublish 方法访问。为了避免竞态条件和数据不一致,我们使用 threading.Lock 来保护对 _subscribers 的修改和读取操作。with self._lock: 语句确保在访问共享资源时获取锁并在离开代码块时释放锁。
  • 异步错误处理: future.result() 方法可以用来获取异步任务的返回值,如果任务在执行过程中抛出异常,result() 调用也会重新抛出该异常。在实际应用中,通常会使用 future.add_done_callback() 来注册一个回调函数,在异步任务完成时处理结果或异常,而不是阻塞发布者。这里为了演示,简单地在发布者侧 try-except future.result()

4. 引入命名空间与通配符订阅

随着系统规模的增长,事件的数量也会急剧增加。扁平化的主题列表(如 user_created, order_placed)很快就会变得难以管理。命名空间提供了一种组织事件的方式,可以将相关事件分组,例如 user:created, user:deleted, order:placed, order:cancelled

更进一步,我们希望能够订阅一个命名空间下的所有事件,或者所有命名空间中特定类型的事件。这就引入了通配符订阅

命名空间与通配符设计:

  • 分隔符: 我们将使用 : 作为命名空间分隔符,例如 user:created
  • 通配符: 我们将使用 * 作为通配符,表示匹配任何单个段。
    • user:created: 精确订阅 user 命名空间下的 created 事件。
    • user:*: 订阅 user 命名空间下的所有事件(例如 user:created, user:deleted)。
    • *:created: 订阅所有命名空间下的 created 事件(例如 user:created, order:created)。
    • *: 全局订阅,匹配所有事件。

修改策略:

subscribeunsubscribe 方法的接口保持不变,但它们的 pattern 参数现在可以接受通配符。
核心的改变在于 publish 方法:它需要一个机制来识别发布的 topic 能够匹配哪些 pattern。为此,我们将引入一个辅助方法 _get_matching_patterns

_get_matching_patterns(topic) 实现细节:

假设发布的 topicnamespace:event_name

它需要匹配以下几种模式:

  1. 精确匹配: namespace:event_name
  2. 命名空间通配符: namespace:*
  3. 事件名通配符: *:event_name
  4. 全局通配符: *

对于更复杂的多层命名空间(例如 a:b:c),如果我们希望支持 a:*:c 这样的匹配,_get_matching_patterns 的逻辑会变得更加复杂,可能需要递归或更高级的模式匹配算法。为了保持实现的简洁性和实用性,我们目前的实现将主要关注单层命名空间和通配符。

代码已在上一节的 AsyncEventBus 中集成。

_get_matching_patterns 逻辑分析:

  1. 精确匹配: 总是检查 topic 本身是否被订阅。
  2. 分割主题:topic 按照 NAMESPACE_DELIMITER 分割成 parts
  3. 单层主题处理: 如果 parts 只有一个元素(例如 event_name),则它只能匹配 event_name (精确匹配) 和 * (全局通配符)。
  4. 双层主题处理: 如果 parts 有两个元素(例如 namespace:event_name),则除了精确匹配,还需要检查 namespace:**:event_name 以及 *
  5. 多层主题处理: 对于 a:b:c 这样的主题,为了简化,当前实现只匹配精确主题 a:b:c 和全局通配符 *。要支持 a:*:ca:b:*,需要更复杂的模式树或正则表达式匹配。

通配符匹配逻辑表格:

发布主题 (Topic) 匹配模式 (Pattern) 匹配结果举例
user:created user:created 精确匹配
user:created user:* user 命名空间下的所有事件
user:created *:created 所有 created 事件
user:created * 所有事件
system:error system:error 精确匹配
system:error system:* system 命名空间下的所有事件
system:error *:error 所有 error 事件
system:error * 所有事件
simple_event simple_event 精确匹配 (无命名空间)
simple_event * 所有事件 (全局通配符)
another:sub:event another:sub:event 精确匹配 (多层命名空间,当前通配符机制不处理 a:*:c)
another:sub:event * 所有事件 (全局通配符)

publish 方法中的匹配:

publish 方法首先通过 _get_matching_patterns(topic) 获取所有匹配当前发布主题的订阅模式。然后,它会遍历这些模式,从 _subscribers 中收集所有对应的 handler,并将它们提交到线程池中执行。

5. 精炼与高级考量

我们的 AsyncEventBus 已经相当强大,但作为编程专家,我们还需要考虑一些精炼和高级特性。

5.1 错误处理与日志

异步事件处理器中的错误不应该静默失败。虽然我们已经通过 try-except 包裹了 handler 的调用,并打印了错误信息,但在生产环境中,我们需要更健壮的错误处理:

  • 集中式错误日志: 将错误信息记录到日志系统,而不是直接打印到控制台。
  • 重试机制: 对于某些瞬时错误,可以考虑实现自动重试。
  • 死信队列(Dead Letter Queue): 对于无法处理的事件,将其发送到一个特殊的队列,以便后续人工检查或处理。
  • 错误回调: 允许订阅者注册一个专门的错误处理回调,当它们的异步任务失败时被调用。
# 示例:更高级的错误处理(可以在 Future.add_done_callback 中使用)
def _handle_future_result(future, topic, handler_name):
    """
    一个用于处理异步任务结果的回调函数。
    """
    try:
        result = future.result() # 获取任务结果,如果失败会抛出异常
        # print(f"  [Async Result] Handler '{handler_name}' for topic '{topic}' completed successfully.")
        # 可以在这里处理成功的结果
    except Exception as e:
        print(f"  [Async Error] Handler '{handler_name}' for topic '{topic}' failed: {type(e).__name__}: {e}")
        # 记录到日志系统
        # 触发一个专门的错误事件,例如 bus.publish("event:error", topic, handler_name, str(e))

5.2 单次订阅 (once)

有时,订阅者只对某个事件的第一次发生感兴趣。once 方法可以实现这一点:在事件被触发并处理后,自动取消订阅。

class AsyncEventBus:
    # ... (其他代码保持不变) ...

    def once(self, pattern: str, handler):
        """
        订阅一个事件模式,但只接收一次通知。
        在事件触发后,该处理器将自动取消订阅。
        :param pattern: 订阅模式的字符串标识符。
        :param handler: 当事件发布时将被调用的回调函数。
        """
        # 创建一个包装器,在执行后自动取消订阅
        def once_wrapper(*args, **kwargs):
            try:
                handler(*args, **kwargs)
            finally:
                # 确保在执行后尝试取消订阅,即使 handler 失败
                self.unsubscribe(pattern, once_wrapper)

        # 将原始 handler 存储在 wrapper 属性中,以便后续匹配和取消订阅
        once_wrapper._original_handler = handler 

        self.subscribe(pattern, once_wrapper)
        print(f"Subscribed once: Handler {handler.__name__} to pattern '{pattern}'")

    # `unsubscribe` 方法需要稍微调整,以便能够正确取消 `once_wrapper`
    # ...
    # def unsubscribe(self, pattern: str, handler):
    #     with self._lock:
    #         if pattern in self._subscribers:
    #             # 检查原始 handler 或 wrapper 本身
    #             handlers_to_remove = []
    #             for existing_handler in self._subscribers[pattern]:
    #                 if existing_handler == handler: # 精确匹配
    #                     handlers_to_remove.append(existing_handler)
    #                 elif hasattr(existing_handler, '_original_handler') and existing_handler._original_handler == handler:
    #                     handlers_to_remove.append(existing_handler)
    #             
    #             for h in handlers_to_remove:
    #                 self._subscribers[pattern].remove(h)
    #                 print(f"Unsubscribed: Handler {handler.__name__} from pattern '{pattern}'")
    #             
    #             if not self._subscribers[pattern]:
    #                 del self._subscribers[pattern]
    #         else:
    #             print(f"Warning: Handler {handler.__name__} not found for pattern '{pattern}'")

注意: once 的实现需要 unsubscribe 能够识别包装器 once_wrapper 和原始 handler 的关系。一个简单的做法是在 once_wrapper 上添加一个属性来指向原始 handler,以便在 unsubscribe 时能够通过原始 handler 来找到并移除 wrapper。为了文章主线清晰,这里不深入展开修改 unsubscribe,仅提供 once 的概念。

5.3 事件数据结构化

目前我们使用 *args**kwargs 传递事件数据。在更复杂的系统中,将事件数据封装成一个统一的事件对象会更有利于类型检查、数据一致性和未来扩展。

from typing import Any, Dict

class Event:
    """
    一个结构化的事件对象。
    """
    def __init__(self, topic: str, payload: Dict[str, Any] = None, timestamp: float = None):
        self.topic = topic
        self.payload = payload if payload is not None else {}
        self.timestamp = timestamp if timestamp is not None else time.time()

    def __repr__(self):
        return f"Event(topic='{self.topic}', payload={self.payload}, timestamp={self.timestamp})"

# 那么 publish 方法可以变为:
# def publish(self, event: Event):
#     # ...
#     for handler in handlers_to_execute:
#         self._executor.submit(handler, event) # 只传递 event 对象
#
# 订阅者则接收 event 对象:
# def my_handler(event: Event):
#     print(f"Received event {event.topic} with payload {event.payload}")

这种方式将事件的元数据(如 topic, timestamp)与业务数据 (payload) 分离,使事件处理更加规范。

5.4 性能考量

对于高并发、高吞吐量的系统,需要更深入地考虑性能:

  • 线程池大小: max_workers 的选择至关重要。过小会导致任务积压,过大可能导致线程切换开销。通常根据 CPU 核数和 I/O 密集型任务的比例进行调整。
  • 避免阻塞线程池: 确保订阅者处理器本身不会执行长时间的 CPU 密集型计算或长时间的阻塞 I/O 操作。如果确实有这类任务,考虑使用 ProcessPoolExecutor 或专门的异步 I/O 框架(如 asyncio)。
  • 订阅者数据结构优化: 对于海量订阅者,_subscribers 的查找效率可能会成为瓶颈。如果通配符模式非常复杂,可能需要使用 Trie 树或其他高级数据结构来优化匹配过程。

6. 完整的 AsyncEventBus 实现

我们将把所有的考量和特性整合到一个最终的 AsyncEventBus 类中。

import collections
import concurrent.futures
import threading
import time
from typing import Any, Dict, List, Callable, Set

# 定义一个结构化的事件对象,方便传递和处理事件数据
class Event:
    """
    一个结构化的事件对象,包含事件主题、负载和时间戳。
    """
    def __init__(self, topic: str, payload: Dict[str, Any] = None, timestamp: float = None):
        if not isinstance(topic, str) or not topic:
            raise ValueError("Event topic must be a non-empty string.")
        self.topic = topic
        self.payload = payload if payload is not None else {}
        self.timestamp = timestamp if timestamp is not None else time.time()

    def __repr__(self):
        return f"Event(topic='{self.topic}', payload={self.payload}, timestamp={self.timestamp}, ts_iso='{time.ctime(self.timestamp)}')"

    def __str__(self):
        return self.__repr__()

class AsyncEventBus:
    """
    一个支持命名空间、通配符订阅和异步事件发布的事件总线。
    它使用线程池来异步执行事件处理器,并包含线程安全机制。
    """
    WILDCARD = '*'
    NAMESPACE_DELIMITER = ':'

    def __init__(self, max_workers: int = 5):
        # 存储所有订阅者。键是订阅模式(topic或namespace:*, *:event等),值是回调函数列表。
        # 使用 defaultdict 简化主题不存在时的处理,自动创建空列表
        self._subscribers: Dict[str, List[Callable]] = collections.defaultdict(list)

        # 线程池用于异步执行事件处理器
        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)

        # 保护 _subscribers 字典的线程锁,防止并发修改
        self._lock = threading.Lock()

        print(f"[{time.ctime()}] AsyncEventBus initialized with {max_workers} worker threads.")

    def _validate_pattern(self, pattern: str):
        """验证主题或订阅模式的格式是否有效。"""
        if not isinstance(pattern, str) or not pattern:
            raise ValueError("Pattern (topic or subscription) must be a non-empty string.")
        # 进一步可以添加规则,例如不允许连续的通配符或分隔符等
        return True

    def subscribe(self, pattern: str, handler: Callable):
        """
        订阅一个事件模式。
        模式可以包含命名空间和通配符。
        例如: "user:created", "user:*", "*:deleted", "*"
        :param pattern: 订阅模式的字符串标识符。
        :param handler: 当事件发布时将被异步调用的回调函数。它应该接受一个 Event 对象作为参数。
        """
        self._validate_pattern(pattern)
        if not callable(handler):
            raise TypeError("Handler must be a callable function or method.")

        with self._lock:
            if handler not in self._subscribers[pattern]:
                self._subscribers[pattern].append(handler)
                print(f"[{time.ctime()}] Subscribed: Handler '{handler.__name__}' to pattern '{pattern}'")
            else:
                print(f"[{time.ctime()}] Warning: Handler '{handler.__name__}' already subscribed to pattern '{pattern}'")

    def unsubscribe(self, pattern: str, handler: Callable):
        """
        取消订阅一个事件模式。
        :param pattern: 之前订阅过的事件模式。
        :param handler: 之前订阅过的回调函数。
        """
        self._validate_pattern(pattern)
        with self._lock:
            if pattern in self._subscribers:
                # 考虑到 once 订阅可能使用了包装器,我们需要查找原始 handler
                handlers_to_remove = []
                for existing_handler in self._subscribers[pattern]:
                    if existing_handler == handler: # 精确匹配
                        handlers_to_remove.append(existing_handler)
                    # 如果是 once_wrapper,检查其内部是否包装了目标 handler
                    elif hasattr(existing_handler, '_original_handler') and existing_handler._original_handler == handler:
                        handlers_to_remove.append(existing_handler)

                for h in handlers_to_remove:
                    self._subscribers[pattern].remove(h)
                    print(f"[{time.ctime()}] Unsubscribed: Handler '{handler.__name__}' from pattern '{pattern}'")

                if not self._subscribers[pattern]:
                    del self._subscribers[pattern] # 清理空列表
            else:
                print(f"[{time.ctime()}] Warning: Pattern '{pattern}' has no subscribers to unsubscribe '{handler.__name__}'.")

    def once(self, pattern: str, handler: Callable):
        """
        订阅一个事件模式,但只接收一次通知。
        在事件触发并成功处理后,该处理器将自动取消订阅。
        :param pattern: 订阅模式的字符串标识符。
        :param handler: 当事件发布时将被调用的回调函数。它应该接受一个 Event 对象作为参数。
        """
        def once_wrapper(event: Event):
            try:
                handler(event)
            finally:
                # 即使 handler 失败,也尝试取消订阅,确保只执行一次
                self.unsubscribe(pattern, once_wrapper)

        # 将原始 handler 存储在 wrapper 属性中,以便 unsubscribe 能够识别
        once_wrapper._original_handler = handler 

        self.subscribe(pattern, once_wrapper)
        print(f"[{time.ctime()}] Subscribed once: Handler '{handler.__name__}' to pattern '{pattern}'")

    def _get_matching_patterns(self, topic: str) -> List[str]:
        """
        根据发布的事件主题,查找所有匹配的订阅模式。
        支持单层通配符。
        例如,如果主题是 "user:created",它会匹配 "user:created", "user:*", "*:created", "*"
        :param topic: 发布的事件主题。
        :return: 匹配的订阅模式列表。
        """
        self._validate_pattern(topic)
        matching_patterns: Set[str] = set()

        # 1. 精确匹配
        if topic in self._subscribers:
            matching_patterns.add(topic)

        # 2. 通配符匹配
        parts = topic.split(self.NAMESPACE_DELIMITER)

        if len(parts) == 1: # 只有一层,例如 "event_name"
            # 匹配 "event_name" (已在精确匹配中处理)
            # 匹配 "*"
            if self.WILDCARD in self._subscribers:
                matching_patterns.add(self.WILDCARD)

        elif len(parts) == 2: # 典型命名空间格式,例如 "namespace:event_name"
            namespace, event_name = parts[0], parts[1]

            # 匹配 "namespace:*"
            ns_wildcard_pattern = f"{namespace}{self.NAMESPACE_DELIMITER}{self.WILDCARD}"
            if ns_wildcard_pattern in self._subscribers:
                matching_patterns.add(ns_wildcard_pattern)

            # 匹配 "*:event_name"
            event_wildcard_pattern = f"{self.WILDCARD}{self.NAMESPACE_DELIMITER}{event_name}"
            if event_wildcard_pattern in self._subscribers:
                matching_patterns.add(event_wildcard_pattern)

            # 匹配 "*" (全局通配符)
            if self.WILDCARD in self._subscribers:
                matching_patterns.add(self.WILDCARD)
        else: # 更多层级的命名空间,例如 "a:b:c"
            # 为了简化,我们目前只支持单层通配符。
            # 暂时只匹配精确主题和 "*"
            if self.WILDCARD in self._subscribers:
                matching_patterns.add(self.WILDCARD)

        return list(matching_patterns)

    def publish(self, topic: str, **payload: Any):
        """
        异步发布一个事件到指定主题。
        所有匹配该主题的订阅者都将在单独的线程中被调用。
        事件数据被封装在 Event 对象中传递给处理器。
        :param topic: 事件主题的字符串标识符。
        :param payload: 传递给订阅者 Event 对象的负载数据。
        """
        self._validate_pattern(topic)
        event = Event(topic=topic, payload=payload)

        print(f"n[{time.ctime()}] Publishing event: {event.topic} (asynchronously)")

        # 获取所有匹配的模式
        matching_patterns = self._get_matching_patterns(topic)

        # 收集所有需要调用的处理器
        handlers_to_execute = []
        with self._lock:
            for pattern in matching_patterns:
                # 使用 list() 创建副本,防止在迭代时修改列表
                handlers_to_execute.extend(list(self._subscribers[pattern])) 

        if not handlers_to_execute:
            print(f"[{time.ctime()}] No subscribers found for event '{event.topic}' or matching patterns.")
            return

        futures = []
        for handler in handlers_to_execute:
            try:
                # 提交任务到线程池,并捕获 Future 对象
                future = self._executor.submit(handler, event)
                future.add_done_callback(
                    lambda f, h=handler.__name__, t=event.topic: self._handle_future_result(f, t, h)
                )
                futures.append(future)
            except Exception as e:
                print(f"[{time.ctime()}] Error submitting handler '{handler.__name__}' for topic '{event.topic}' to executor: {e}")

        print(f"[{time.ctime()}] Event '{event.topic}' published. {len(futures)} handlers submitted to thread pool.")
        # 发布者立即返回,不等待 Future 完成

    def _handle_future_result(self, future: concurrent.futures.Future, topic: str, handler_name: str):
        """
        一个私有回调函数,用于处理异步任务的完成(成功或失败)。
        这个函数将在任务完成时由线程池调用。
        """
        try:
            future.result() # 尝试获取结果,如果任务失败,这里会重新抛出异常
            # print(f"[{time.ctime()}]   [Async Result] Handler '{handler_name}' for topic '{topic}' completed successfully.")
        except Exception as e:
            print(f"[{time.ctime()}]   [Async Error] Handler '{handler_name}' for topic '{topic}' failed: {type(e).__name__}: {e}")
            # 这里可以集成更高级的错误处理,如记录到日志、发送警报、发布一个错误事件等
            # self.publish("bus:error", original_topic=topic, handler=handler_name, error_message=str(e)) # 避免无限循环

    def shutdown(self, wait: bool = True, timeout: float = None):
        """
        关闭事件总线,等待所有挂起的异步任务完成。
        :param wait: 如果为 True,则在所有挂起的工作完成之前阻塞。
        :param timeout: 如果 wait 为 True,则在超时秒后强制关闭。
        """
        print(f"n[{time.ctime()}] Shutting down AsyncEventBus (wait={wait}, timeout={timeout})...")
        self._executor.shutdown(wait=wait, timeout=timeout)
        print(f"[{time.ctime()}] AsyncEventBus shut down successfully.")

# --- 完整示例用法 ---
if __name__ == "__main__":
    bus = AsyncEventBus(max_workers=3)

    def user_created_logger(event: Event):
        print(f"[{time.ctime()}]   [Handler: Logger] User created: {event.payload.get('username')} (logging for {event.topic})")
        time.sleep(0.1) 

    def user_profile_updater(event: Event):
        print(f"[{time.ctime()}]   [Handler: ProfileUpdater] Updating profile for: {event.payload.get('username')} (DB op for {event.topic})")
        time.sleep(0.5) 

    def notification_sender(event: Event):
        print(f"[{time.ctime()}]   [Handler: Notifier] Sending notification for: {event.topic} event with data: {event.payload} (network op)")
        time.sleep(0.3) 

    def analytics_tracker(event: Event):
        print(f"[{time.ctime()}]   [Handler: Analytics] Tracking '{event.topic}': {event.payload}")
        time.sleep(0.2)

    def global_error_handler(event: Event):
        print(f"[{time.ctime()}]   [Handler: GlobalError] GLOBAL ERROR: {event.topic} - {event.payload.get('error_message')}")

    def specific_error_handler(event: Event):
        print(f"[{time.ctime()}]   [Handler: SpecificError] Specific error handler received code: {event.payload.get('error_code')}")
        raise ValueError("Simulated error in specific_error_handler!") # This handler will fail

    def once_handler(event: Event):
        print(f"[{time.ctime()}]   [Handler: Once] This should only run once for {event.topic}!")
        time.sleep(0.05)

    print("n--- Subscriptions ---")
    bus.subscribe("user:created", user_created_logger)
    bus.subscribe("user:created", user_profile_updater)
    bus.subscribe("system:error", global_error_handler)
    bus.subscribe("system:error", specific_error_handler) # This one will raise an exception
    bus.subscribe("analytics:pageview", analytics_tracker)
    bus.subscribe("analytics:click", analytics_tracker)
    bus.once("user:deleted", once_handler) # 订阅一次性事件

    # 命名空间通配符订阅
    bus.subscribe("user:*", notification_sender) # 订阅所有user命名空间下的事件
    bus.subscribe("*:error", global_error_handler) # 订阅所有命名空间下的 error 事件 (注意与 system:error 的重复)
    bus.subscribe(bus.WILDCARD, lambda event: print(f"[{time.ctime()}]   [Handler: GlobalListener] Caught ALL: {event.topic}"))

    print("n--- Publishing 'user:created' ---")
    bus.publish("user:created", username="Alice", email="[email protected]")
    print(f"[{time.ctime()}] Publisher continues immediately after 'user:created' event.")

    print("n--- Publishing 'system:error' ---")
    bus.publish("system:error", error_code="DB_CONNECTION_FAILED", error_message="Failed to connect to primary DB.")
    print(f"[{time.ctime()}] Publisher continues immediately after 'system:error' event.")

    print("n--- Publishing 'analytics:pageview' ---")
    bus.publish("analytics:pageview", page_url="/home", user_id=123)

    print("n--- Publishing 'user:deleted' (matches user:* and * and once_handler) ---")
    bus.publish("user:deleted", username="Bob", reason="left company")
    print(f"[{time.ctime()}] Publisher continues immediately after 'user:deleted' event.")

    print("n--- Publishing 'user:deleted' again (once_handler should not run) ---")
    bus.publish("user:deleted", username="Charlie", reason="privacy request")
    print(f"[{time.ctime()}] Publisher continues immediately after 'user:deleted' event.")

    print("n--- Publishing 'payment:failed' (matches *:error and *) ---")
    bus.publish("payment:failed", error_code="TRANSACTION_TIMEOUT", error_message="Payment gateway timed out.")

    print("n--- Unsubscribing specific_error_handler ---")
    bus.unsubscribe("system:error", specific_error_handler)

    print("n--- Publishing 'system:error' again (specific_error_handler is gone) ---")
    bus.publish("system:error", error_code="CACHE_INVALIDATION_FAILED", error_message="Failed to invalidate cache.")

    # 确保所有异步任务完成
    bus.shutdown()
    print(f"[{time.ctime()}] All events processed. Application exiting.")

代码总结:

  1. Event 类: 定义了标准化的事件载体,包含 topicpayloadtimestamp。所有处理器都将接收一个 Event 实例。
  2. AsyncEventBus 初始化: 在构造函数中创建 ThreadPoolExecutorthreading.Lock
  3. subscribeonce
    • subscribe 负责将 handler 注册到 _subscribers 字典中。
    • once 方法通过一个包装器 once_wrapper 实现,它在执行 handler 后自动调用 unsubscribe。为了让 unsubscribe 能正确识别,once_wrapper 上附加了 _original_handler 属性。
  4. unsubscribe 增强了逻辑,不仅能移除原始 handler,也能移除 once 创建的包装器。
  5. _get_matching_patterns 核心的模式匹配逻辑,负责识别发布的 topic 能够触发哪些通配符订阅。
  6. publish
    • 创建 Event 对象。
    • 调用 _get_matching_patterns 获取所有匹配的订阅模式。
    • 收集所有匹配模式下的 handler
    • 将每个 handlerEvent 对象提交给 _executor 线程池。
    • 为每个 Future 添加 _handle_future_result 回调,用于在任务完成时处理结果或错误。
    • 发布者立即返回,不会等待事件处理完成。
  7. _handle_future_result 异步任务完成后,由线程池调用。负责捕获并打印处理器中发生的异常,可以扩展为更复杂的错误报告机制。
  8. shutdown 优雅地关闭线程池,等待所有提交的任务完成,防止数据丢失或资源泄露。

7. 结语

我们从 Pub/Sub 模式的基础概念出发,逐步构建了一个功能强大的 AsyncEventBus。它实现了事件的发布与订阅、支持命名空间和通配符匹配,并通过线程池实现了异步事件处理,同时兼顾了线程安全和基本错误处理。

这个实现为构建解耦、可扩展和响应迅速的应用程序提供了一个坚实的基础。在实际项目中,您可以根据具体需求进一步扩展,例如支持更复杂的通配符规则、事件优先级、持久化事件、分布式 Pub/Sub 消息队列集成,或是与 asyncio 等异步框架结合,以适应更广泛的场景。理解并掌握这样的模式,是成为一名优秀编程专家的必经之路。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注