手写实现一个 EventBus(事件总线):处理组件间的全局事件通信与解耦

各位同学,大家好!

今天,我们将深入探讨一个在现代软件架构中极其重要的设计模式和工具——事件总线(EventBus)。在复杂的应用中,组件间的通信往往是痛点。紧耦合的代码不仅难以测试,更难以维护和扩展。事件总线正是为了解决这一难题而生,它提供了一种优雅的、解耦的通信机制。

我们将从零开始,手写实现一个功能完备的 EventBus。在这个过程中,不仅会讲解其核心原理,还会一步步添加高级功能,并深入探讨设计考量、潜在问题及最佳实践。我的目标是,通过这次讲座,让大家不仅能实现一个 EventBus,更能理解其背后的设计哲学,并在未来的项目中灵活运用。


第一章:引言 – 为什么我们需要事件总线?

在分布式系统、单页应用(SPA)乃至简单的桌面应用中,各个组件或模块之间经常需要交换信息。例如,用户点击了一个按钮(组件 A),需要通知另一个组件(组件 B)更新其显示;数据加载完成(模块 C),需要通知多个组件(组件 D、E、F)进行刷新。

传统的通信方式可能包括:

  1. 直接调用(Direct Invocation):组件 A 直接调用组件 B 的方法。这导致 A 强依赖于 B,两者耦合紧密。如果 B 的接口改变,A 必须随之修改。
  2. 回调函数(Callbacks):组件 A 将一个函数传递给组件 B,当 B 完成某个操作后,调用这个函数通知 A。这在简单的场景下可行,但在多对多通信时会演变成“回调地狱”,难以管理。
  3. 共享状态(Shared State):组件通过一个共享的状态对象进行通信。这可能导致状态管理复杂,且难以追踪状态变化触发的副作用。

这些方式在小型项目中尚可接受,但随着项目规模的扩大,组件数量的增加,它们的问题会逐渐暴露:

  • 紧耦合(Tight Coupling):组件之间相互依赖,任何一方的变更都可能影响另一方。
  • 难以维护(Hard to Maintain):事件流和数据流变得复杂,难以追踪问题。
  • 可测试性差(Poor Testability):测试一个组件时,可能需要模拟其所有依赖,增加了测试的复杂性。
  • 可扩展性差(Poor Scalability):增加新的组件或功能时,需要修改现有组件的通信逻辑。

为了解决这些问题,我们引入了 发布-订阅(Publish-Subscribe)模式。事件总线正是这种模式的一种实现。

发布-订阅模式的核心思想

  • 发布者(Publisher):当某个事件发生时,发布者不直接通知任何特定的订阅者,而是向一个中介(事件总线)“发布”一个事件。
  • 订阅者(Subscriber):订阅者对它感兴趣的特定事件进行“订阅”。当事件总线接收到它订阅的事件时,就会通知它。
  • 事件总线(EventBus):作为发布者和订阅者之间的中介,负责接收事件并将其分发给所有相关的订阅者。

通过这种模式,发布者和订阅者之间不再直接通信,而是通过事件总线间接通信。它们不再需要知道彼此的存在,从而实现了 松散耦合(Loose Coupling)。发布者只知道事件总线,订阅者也只知道事件总线。这极大地提高了代码的模块化、可维护性和可扩展性。


第二章:EventBus 的核心原理与基础构建

现在,让我们开始构建我们自己的 EventBus。一个 EventBus 的核心功能非常简单:注册监听器和触发事件。

2.1 核心数据结构

首先,我们需要一个数据结构来存储所有注册的事件监听器。最直观的方式是使用一个 Map,其中键是事件的名称(字符串),值是一个数组,包含所有订阅了该事件的监听器函数。

// 定义事件类型和监听器函数类型
type EventName = string;
type EventListener<T = any> = (payload: T) => void;

// 存储所有事件监听器的数据结构
// Key: 事件名称 (string)
// Value: 监听该事件的所有监听器函数数组 (EventListener[])
private listeners: Map<EventName, EventListener[]>;

2.2 EventBus 类骨架

我们将 EventBus 封装成一个类,这样可以创建多个实例,或者作为单例使用。

/**
 * EventBus 类:实现发布-订阅模式的事件总线
 */
class EventBus {
    // 存储所有事件监听器
    private listeners: Map<EventName, EventListener[]>;

    constructor() {
        this.listeners = new Map<EventName, EventListener[]>();
        console.log("EventBus 初始化完成。");
    }

    /**
     * 注册事件监听器
     * 当指定事件触发时,会调用此监听器。
     * @param eventName 事件的名称
     * @param listener 监听器函数,接收事件载荷作为参数
     */
    subscribe<T = any>(eventName: EventName, listener: EventListener<T>): void {
        // 待实现
    }

    /**
     * 触发事件
     * 会通知所有订阅了该事件的监听器。
     * @param eventName 事件的名称
     * @param payload 事件的载荷(数据)
     */
    publish<T = any>(eventName: EventName, payload?: T): void {
        // 待实现
    }

    /**
     * 移除事件监听器
     * @param eventName 事件的名称
     * @param listener 要移除的监听器函数
     */
    unsubscribe<T = any>(eventName: EventName, listener: EventListener<T>): void {
        // 待实现
    }

    /**
     * 注册只触发一次的事件监听器
     * @param eventName 事件的名称
     * @param listener 监听器函数
     */
    once<T = any>(eventName: EventName, listener: EventListener<T>): void {
        // 待实现
    }
}

2.3 实现 subscribe 方法:注册事件监听器

subscribe 方法负责将一个监听器函数与一个事件名称关联起来。当一个事件被订阅时,我们需要检查该事件是否已经被注册过。如果没有,就创建一个新的数组来存储监听器;如果已经存在,就将新的监听器添加到现有数组中。

class EventBus {
    // ... (其他属性和构造函数)

    /**
     * 注册事件监听器
     * 当指定事件触发时,会调用此监听器。
     * @param eventName 事件的名称
     * @param listener 监听器函数,接收事件载荷作为参数
     */
    subscribe<T = any>(eventName: EventName, listener: EventListener<T>): void {
        if (!this.listeners.has(eventName)) {
            // 如果事件名称不存在,则创建一个新的监听器数组
            this.listeners.set(eventName, []);
        }
        // 获取该事件名称对应的监听器数组,并添加新的监听器
        // 确保同一个监听器不会被重复添加,尽管这通常不是subscribe的职责,
        // 但为了健壮性,这里可以考虑加一个检查,或者依赖调用者避免重复订阅。
        // 在实际应用中,如果同一个函数实例被多次subscribe,它也会被多次调用,这通常是预期行为。
        // 这里我们简单地直接添加。
        this.listeners.get(eventName)!.push(listener);
        console.log(`事件 '${eventName}' 添加了一个新的监听器。当前监听器数量: ${this.listeners.get(eventName)!.length}`);
    }

    // ... (其他方法)
}

subscribe 方法的逻辑流程

  1. 检查事件是否存在:使用 this.listeners.has(eventName) 判断 Map 中是否已经存在 eventName 对应的条目。
  2. 创建新列表:如果不存在,说明这是该事件的第一个监听器,因此我们用 this.listeners.set(eventName, []) 创建一个空的数组,并将其作为值与 eventName 关联。
  3. 添加监听器:无论事件是新创建的还是已经存在的,我们都通过 this.listeners.get(eventName)! 获取到对应的监听器数组,然后使用 push() 方法将新的 listener 函数添加到数组的末尾。这里的 ! 是 TypeScript 的非空断言操作符,因为我们前面已经检查过或创建过,所以可以确定它不会是 undefined

2.4 实现 publish 方法:触发事件并通知所有监听器

publish 方法是 EventBus 的核心。当它被调用时,它会查找所有订阅了指定事件的监听器,并依次执行它们。

class EventBus {
    // ... (其他属性和方法)

    /**
     * 触发事件
     * 会通知所有订阅了该事件的监听器。
     * @param eventName 事件的名称
     * @param payload 事件的载荷(数据)
     */
    publish<T = any>(eventName: EventName, payload?: T): void {
        // 获取所有订阅了该事件的监听器
        const eventListeners = this.listeners.get(eventName);

        if (eventListeners && eventListeners.length > 0) {
            console.log(`事件 '${eventName}' 被触发,共 ${eventListeners.length} 个监听器将被调用。`);
            // 遍历并执行所有监听器
            // 使用 for...of 循环,以确保在遍历过程中即使有监听器被移除(如 once 监听器),
            // 也不会影响到当前循环的正确性(因为我们是遍历一个副本或者在循环前获取到所有引用)。
            // 复制一份数组可以避免在监听器内部对数组进行修改时导致的问题,例如 `once` 监听器会在调用后移除自身。
            // 如果不复制,直接在原数组上迭代并在回调中移除元素,会导致索引错乱。
            [...eventListeners].forEach(listener => {
                try {
                    listener(payload as T); // 执行监听器,并传递事件载荷
                } catch (error) {
                    console.error(`在处理事件 '${eventName}' 时,监听器抛出错误:`, error);
                    // 错误处理策略:记录错误,但不中断其他监听器的执行
                }
            });
        } else {
            console.log(`事件 '${eventName}' 被触发,但没有找到任何监听器。`);
        }
    }

    // ... (其他方法)
}

publish 方法的逻辑流程

  1. 获取监听器列表:使用 this.listeners.get(eventName) 获取与 eventName 关联的监听器数组。
  2. 检查是否存在监听器:如果 eventListeners 为空或数组长度为 0,则说明没有监听器订阅该事件,直接返回。
  3. 遍历并执行:如果存在监听器,我们通过 [...eventListeners] 创建一个监听器数组的浅拷贝。这样做是为了避免在事件处理过程中,某个监听器(例如 once 监听器,我们稍后会实现)移除自身时,导致原数组的长度或索引发生变化,从而影响当前 forEach 循环的正确性。然后,我们遍历这个拷贝数组,对每一个 listener 函数,调用它并传入 payload 作为参数。
  4. 错误处理:每个监听器的执行都被 try-catch 块包裹。这意味着如果某个监听器在执行过程中抛出异常,它不会中断其他监听器的执行,只会将错误记录下来。这是一个非常重要的健壮性设计。

2.5 基础 EventBus 示例

现在,我们有了一个可以运行的 EventBus 基础版本。

// 完整的基础 EventBus 类
type EventName = string;
type EventListener<T = any> = (payload: T) => void;

class EventBus {
    private listeners: Map<EventName, EventListener[]>;

    constructor() {
        this.listeners = new Map<EventName, EventListener[]>();
        console.log("EventBus 初始化完成。");
    }

    subscribe<T = any>(eventName: EventName, listener: EventListener<T>): void {
        if (!this.listeners.has(eventName)) {
            this.listeners.set(eventName, []);
        }
        this.listeners.get(eventName)!.push(listener);
        console.log(`[EventBus] 事件 '${eventName}' 添加了一个新的监听器。当前监听器数量: ${this.listeners.get(eventName)!.length}`);
    }

    publish<T = any>(eventName: EventName, payload?: T): void {
        const eventListeners = this.listeners.get(eventName);

        if (eventListeners && eventListeners.length > 0) {
            console.log(`[EventBus] 事件 '${eventName}' 被触发,共 ${eventListeners.length} 个监听器将被调用。`);
            [...eventListeners].forEach(listener => {
                try {
                    listener(payload as T);
                } catch (error) {
                    console.error(`[EventBus Error] 在处理事件 '${eventName}' 时,监听器抛出错误:`, error);
                }
            });
        } else {
            console.log(`[EventBus Info] 事件 '${eventName}' 被触发,但没有找到任何监听器。`);
        }
    }

    // 待实现: unsubscribe, once
    unsubscribe<T = any>(eventName: EventName, listener: EventListener<T>): void { /* ... */ }
    once<T = any>(eventName: EventName, listener: EventListener<T>): void { /* ... */ }
}

// 使用示例
const bus = new EventBus();

// 订阅事件
const handler1 = (data: { message: string }) => {
    console.log(`处理器 1 收到消息: ${data.message}`);
};
const handler2 = (data: { message: string }) => {
    console.log(`处理器 2 收到消息: ${data.message.toUpperCase()}`);
};
const handler3 = (data: string) => {
    console.log(`处理器 3 收到原始字符串: ${data}`);
};

bus.subscribe('userLoggedIn', handler1);
bus.subscribe('userLoggedIn', handler2);
bus.subscribe('messageReceived', handler3);

// 触发事件
console.log("n--- 第一次触发 userLoggedIn 事件 ---");
bus.publish('userLoggedIn', { message: 'Alice 登录成功' });

console.log("n--- 触发 messageReceived 事件 ---");
bus.publish('messageReceived', 'Hello EventBus!');

console.log("n--- 第二次触发 userLoggedIn 事件 ---");
bus.publish('userLoggedIn', { message: 'Bob 登录成功' });

/*
预期输出:
EventBus 初始化完成。
[EventBus] 事件 'userLoggedIn' 添加了一个新的监听器。当前监听器数量: 1
[EventBus] 事件 'userLoggedIn' 添加了一个新的监听器。当前监听器数量: 2
[EventBus] 事件 'messageReceived' 添加了一个新的监听器。当前监听器数量: 1

--- 第一次触发 userLoggedIn 事件 ---
[EventBus] 事件 'userLoggedIn' 被触发,共 2 个监听器将被调用。
处理器 1 收到消息: Alice 登录成功
处理器 2 收到消息: ALICE 登录成功

--- 触发 messageReceived 事件 ---
[EventBus] 事件 'messageReceived' 被触发,共 1 个监听器将被调用。
处理器 3 收到原始字符串: Hello EventBus!

--- 第二次触发 userLoggedIn 事件 ---
[EventBus] 事件 'userLoggedIn' 被触发,共 2 个监听器将被调用。
处理器 1 收到消息: Bob 登录成功
处理器 2 收到消息: BOB 登录成功
*/

讨论:同步执行的优缺点

当前我们的 publish 方法是同步执行的,即当 publish 方法被调用时,所有监听器会立即按顺序执行,直到所有监听器都执行完毕,publish 方法才会返回。

优点

  • 简单直观:事件的发生与处理之间的关系清晰,易于理解。
  • 即时响应:监听器会立即响应事件,没有额外的延迟。
  • 顺序保证:在 publish 调用期间,所有监听器按注册顺序执行,这在某些场景下很重要。

缺点

  • 阻塞主线程:如果某个监听器执行时间过长(例如,进行复杂的计算或同步 I/O 操作),它会阻塞 publish 方法的调用者以及整个 JavaScript 主线程,导致 UI 卡顿或响应迟缓。
  • 错误传播:虽然我们使用了 try-catch,但如果在监听器中出现未经捕获的异步错误,或者它导致了严重的运行时问题,可能会影响整个应用。
  • 解耦不彻底:虽然发布者和订阅者之间解耦了,但发布者仍然需要等待所有订阅者处理完事件才能继续。

在大多数前端应用场景中,为了保持 UI 响应性,我们通常倾向于异步事件处理。我们将在后续章节中探讨如何实现异步处理。


第三章:完善 EventBus 功能 – 解耦与健壮性

一个实用的 EventBus 需要更多功能来处理各种复杂的场景。本章我们将实现 unsubscribeonce 方法,并进一步完善错误处理和 this 上下文。

3.1 实现 unsubscribe 方法:移除监听器

随着应用的运行,组件可能会被销毁,或者某些监听器不再需要。如果这些监听器不被移除,它们将继续占用内存,并在事件触发时被无意义地调用,导致 内存泄漏 和不必要的性能开销。因此,unsubscribe 方法至关重要。

移除监听器的关键在于,我们必须能够精确地识别出要移除的是哪个监听器。这意味着在 unsubscribe 时,需要提供与 subscribe完全相同的函数引用

class EventBus {
    // ... (其他属性和方法)

    /**
     * 移除事件监听器
     * 必须提供与 subscribe 时完全相同的函数引用才能成功移除。
     * @param eventName 事件的名称
     * @param listener 要移除的监听器函数
     */
    unsubscribe<T = any>(eventName: EventName, listener: EventListener<T>): void {
        const eventListeners = this.listeners.get(eventName);

        if (!eventListeners || eventListeners.length === 0) {
            console.log(`[EventBus Info] 事件 '${eventName}' 没有找到监听器,无需移除。`);
            return;
        }

        // 过滤掉匹配的监听器
        const newListeners = eventListeners.filter(l => l !== listener);

        if (newListeners.length < eventListeners.length) {
            // 如果有监听器被移除了
            this.listeners.set(eventName, newListeners);
            console.log(`[EventBus] 事件 '${eventName}' 的一个监听器已被移除。当前监听器数量: ${newListeners.length}`);
            // 如果该事件的所有监听器都已移除,可以进一步清理 Map 中的条目
            if (newListeners.length === 0) {
                this.listeners.delete(eventName);
                console.log(`[EventBus] 事件 '${eventName}' 的所有监听器均已移除,已清理 Map 条目。`);
            }
        } else {
            console.log(`[EventBus Info] 事件 '${eventName}' 未找到匹配的监听器,无法移除。`);
        }
    }

    // ... (其他方法)
}

unsubscribe 方法的逻辑流程

  1. 获取监听器列表:同样,先获取 eventName 对应的监听器数组。
  2. 检查是否存在:如果数组不存在或为空,则直接返回,没有监听器可移除。
  3. 过滤移除:使用 filter() 方法创建一个新的数组 newListeners,其中只包含那些与传入的 listener 不相同 的监听器。这里利用了 JavaScript 函数是引用类型这一特性,只有完全相同的函数引用才能被移除。
  4. 更新 Map:如果 newListeners 的长度小于 eventListeners 的长度,说明至少有一个监听器被成功移除了。我们将更新后的 newListeners 重新设置回 Map
  5. 清理空数组:如果 newListeners 的长度变为 0,意味着该事件已经没有监听器了,此时可以从 Map 中彻底删除这个事件条目,进一步节省内存。

重要提示unsubscribe 的关键在于函数引用的匹配。这意味着匿名函数无法被移除,因为每次创建的匿名函数都是不同的引用。

// 错误示例:匿名函数无法被移除
bus.subscribe('myEvent', (data) => console.log('This is an anonymous listener:', data));
// 尝试移除此匿名函数将失败,因为传入的是一个新的函数引用
bus.unsubscribe('myEvent', (data) => console.log('This is an anonymous listener:', data)); // 无法移除

为了能够移除,必须保存函数引用:

// 正确示例:保存函数引用以便移除
const myListener = (data: any) => console.log('This is a named listener:', data);
bus.subscribe('myEvent', myListener);
bus.unsubscribe('myEvent', myListener); // 可以成功移除

3.2 处理 this 上下文

在 JavaScript 中,函数的 this 上下文是一个常见的“陷阱”。当一个对象的方法作为事件监听器被传递时,如果直接传递 obj.method,那么在事件触发时,method 内部的 this 可能不再指向 obj,而是指向 EventBus 实例或者 undefined(在严格模式下)。

为了确保监听器内部的 this 指向正确的对象,有几种常见的解决方案:

  1. 使用箭头函数:箭头函数没有自己的 this,它会捕获其定义时的 this 上下文。
  2. 使用 Function.prototype.bind():显式地将监听器绑定到其上下文。
  3. subscribe 时传入上下文:EventBus 可以在注册时接收一个 context 参数,并在调用监听器时使用 listener.call(context, payload) 来设置 this

我们选择在 subscribe 时不直接处理 this,而是要求调用者在传入监听器时自行处理。这使得 EventBus 保持简洁,并将 this 管理的责任留给组件自身。

推荐做法

class MyComponent {
    private name: string = 'MyComponent';
    private eventBus: EventBus;

    constructor(bus: EventBus) {
        this.eventBus = bus;
        // 推荐做法 1: 使用箭头函数,this 自动绑定到 MyComponent 实例
        this.eventBus.subscribe('dataReady', (data: string) => this.handleData(data));

        // 推荐做法 2: 使用 bind 显式绑定 this
        this.eventBus.subscribe('componentInit', this.onInit.bind(this));

        // 如果要移除,需要保存绑定后的引用
        const boundHandler = this.onSpecificEvent.bind(this);
        this.eventBus.subscribe('specificEvent', boundHandler);
        // ... 稍后移除
        // this.eventBus.unsubscribe('specificEvent', boundHandler);
    }

    private handleData(data: string): void {
        console.log(`${this.name} 收到数据: ${data}`);
    }

    private onInit(timestamp: number): void {
        console.log(`${this.name} 在 ${new Date(timestamp).toLocaleTimeString()} 初始化。`);
    }

    private onSpecificEvent(payload: any): void {
        console.log(`${this.name} 处理特定事件:`, payload);
    }
}

这种处理方式将 this 上下文的复杂性从 EventBus 内部移除,保持了其职责的单一性。

3.3 实现 once 方法:只触发一次的监听器

有些场景下,我们只需要监听某个事件发生一次,例如用户第一次登录成功后执行某个初始化操作。once 方法就是为此而生。它注册一个监听器,该监听器在事件被触发一次后,就会自动从 EventBus 中移除。

实现 once 的思路是,在内部创建一个包装函数,这个包装函数在被调用后,除了执行原始监听器,还会立即调用 unsubscribe 方法将自身移除。

class EventBus {
    // ... (其他属性和方法)

    /**
     * 注册只触发一次的事件监听器
     * @param eventName 事件的名称
     * @param listener 监听器函数
     */
    once<T = any>(eventName: EventName, listener: EventListener<T>): void {
        // 创建一个包装函数
        const onceWrapper: EventListener<T> = (payload: T) => {
            // 在执行原始监听器之前,先将包装函数从 EventBus 中移除
            this.unsubscribe(eventName, onceWrapper);
            // 然后执行原始监听器
            listener(payload);
        };

        // 将包装函数注册到 EventBus
        this.subscribe(eventName, onceWrapper);
        console.log(`[EventBus] 事件 '${eventName}' 添加了一个 'once' 监听器。`);
    }

    // ... (其他方法)
}

once 方法的逻辑流程

  1. 创建包装函数onceWrapper 是一个新函数,它接收 payload 参数。
  2. 自移除逻辑:在 onceWrapper 内部,当它被调用时,首先调用 this.unsubscribe(eventName, onceWrapper)onceWrapper 自己从 EventBus 中移除。
  3. 执行原始监听器:然后,它再调用原始的 listener 函数,并传入 payload
  4. 注册包装函数:最后,将这个 onceWrapper 注册到 EventBus 中,而不是原始的 listener

这样,当 eventName 事件第一次触发时,onceWrapper 会被调用,它会执行原始监听器,并立即将自己从订阅列表中移除,确保下次事件触发时不再被调用。

3.4 增强错误处理机制

我们已经在 publish 方法中为每个监听器添加了 try-catch 块,以防止单个监听器出错导致整个事件分发中断。这是 EventBus 健壮性的一个重要方面。

错误处理策略

  • 捕获并记录:当监听器抛出错误时,捕获它并将其记录到控制台或日志服务中。
  • 不中断其他监听器:确保一个监听器的错误不会阻止其他监听器的执行。
  • 可选的错误回调:更高级的 EventBus 可以提供一个全局的错误处理回调函数,允许应用自定义错误报告或恢复逻辑。

当前实现已经满足了前两点。如果需要全局错误回调,可以这样扩展:

// 在 EventBus 类中添加一个属性来存储错误处理器
private errorHandler: ((eventName: EventName, error: Error, payload: any) => void) | null = null;

// 提供一个方法来设置全局错误处理器
public setErrorHandler(handler: (eventName: EventName, error: Error, payload: any) => void): void {
    this.errorHandler = handler;
}

// 修改 publish 方法
publish<T = any>(eventName: EventName, payload?: T): void {
    const eventListeners = this.listeners.get(eventName);

    if (eventListeners && eventListeners.length > 0) {
        console.log(`[EventBus] 事件 '${eventName}' 被触发,共 ${eventListeners.length} 个监听器将被调用。`);
        [...eventListeners].forEach(listener => {
            try {
                listener(payload as T);
            } catch (error: any) { // 使用 any 类型来捕获所有可能的错误类型
                if (this.errorHandler) {
                    this.errorHandler(eventName, error, payload);
                } else {
                    console.error(`[EventBus Error] 在处理事件 '${eventName}' 时,监听器抛出错误:`, error);
                }
            }
        });
    } else {
        console.log(`[EventBus Info] 事件 '${eventName}' 被触发,但没有找到任何监听器。`);
    }
}

现在,应用可以自定义如何处理 EventBus 内部的监听器错误:

const bus = new EventBus();

bus.setErrorHandler((eventName, error, payload) => {
    console.warn(`自定义错误处理:事件 '${eventName}' 发生错误,错误信息: ${error.message}`);
    // 可以上报到错误监控系统,或者进行其他恢复操作
});

bus.subscribe('errorEvent', () => {
    throw new Error('这是一个测试错误!');
});

bus.publish('errorEvent');
// 预期输出:
// [EventBus] 事件 'errorEvent' 添加了一个新的监听器。当前监听器数量: 1
// [EventBus] 事件 'errorEvent' 被触发,共 1 个监听器将被调用。
// 自定义错误处理:事件 'errorEvent' 发生错误,错误信息: 这是一个测试错误!

通过这种方式,EventBus 变得更加灵活和健壮,允许应用根据自身需求来管理错误。

3.5 完善后的 EventBus 示例

type EventName = string;
type EventListener<T = any> = (payload: T) => void;
type ErrorHandler = (eventName: EventName, error: Error, payload: any) => void;

class EventBus {
    private listeners: Map<EventName, EventListener[]>;
    private errorHandler: ErrorHandler | null = null;

    constructor() {
        this.listeners = new Map<EventName, EventListener[]>();
        console.log("[EventBus] 初始化完成。");
    }

    public setErrorHandler(handler: ErrorHandler): void {
        this.errorHandler = handler;
        console.log("[EventBus] 已设置全局错误处理器。");
    }

    subscribe<T = any>(eventName: EventName, listener: EventListener<T>): void {
        if (!this.listeners.has(eventName)) {
            this.listeners.set(eventName, []);
        }
        // 避免重复订阅同一个监听器,这在某些场景下可能是期望的,但通常会避免。
        // 如果需要严格防止重复,可以在这里添加检查:
        // if (!this.listeners.get(eventName)!.includes(listener)) {
        //     this.listeners.get(eventName)!.push(listener);
        // }
        this.listeners.get(eventName)!.push(listener);
        console.log(`[EventBus] 事件 '${eventName}' 添加了一个新的监听器。当前监听器数量: ${this.listeners.get(eventName)!.length}`);
    }

    unsubscribe<T = any>(eventName: EventName, listener: EventListener<T>): void {
        const eventListeners = this.listeners.get(eventName);

        if (!eventListeners || eventListeners.length === 0) {
            console.log(`[EventBus Info] 事件 '${eventName}' 没有找到监听器,无需移除。`);
            return;
        }

        const initialLength = eventListeners.length;
        const newListeners = eventListeners.filter(l => l !== listener);

        if (newListeners.length < initialLength) {
            this.listeners.set(eventName, newListeners);
            console.log(`[EventBus] 事件 '${eventName}' 的一个监听器已被移除。当前监听器数量: ${newListeners.length}`);
            if (newListeners.length === 0) {
                this.listeners.delete(eventName);
                console.log(`[EventBus] 事件 '${eventName}' 的所有监听器均已移除,已清理 Map 条目。`);
            }
        } else {
            console.log(`[EventBus Info] 事件 '${eventName}' 未找到匹配的监听器,无法移除。`);
        }
    }

    once<T = any>(eventName: EventName, listener: EventListener<T>): void {
        const onceWrapper: EventListener<T> = (payload: T) => {
            this.unsubscribe(eventName, onceWrapper); // 移除自身
            listener(payload); // 执行原始监听器
        };
        this.subscribe(eventName, onceWrapper);
        console.log(`[EventBus] 事件 '${eventName}' 添加了一个 'once' 监听器。`);
    }

    publish<T = any>(eventName: EventName, payload?: T): void {
        const eventListeners = this.listeners.get(eventName);

        if (eventListeners && eventListeners.length > 0) {
            console.log(`[EventBus] 事件 '${eventName}' 被触发,共 ${eventListeners.length} 个监听器将被调用。`);
            // 遍历时复制数组,防止在循环中修改数组(如 once 监听器)导致迭代问题
            [...eventListeners].forEach(listener => {
                try {
                    listener(payload as T);
                } catch (error: any) {
                    if (this.errorHandler) {
                        this.errorHandler(eventName, error, payload);
                    } else {
                        console.error(`[EventBus Error] 在处理事件 '${eventName}' 时,监听器抛出错误:`, error);
                    }
                }
            });
        } else {
            console.log(`[EventBus Info] 事件 '${eventName}' 被触发,但没有找到任何监听器。`);
        }
    }

    /**
     * 清空所有事件和监听器
     */
    clearAll(): void {
        this.listeners.clear();
        console.log("[EventBus] 所有事件和监听器已被清空。");
    }

    /**
     * 获取指定事件的所有监听器数量
     */
    getListenerCount(eventName: EventName): number {
        return this.listeners.has(eventName) ? this.listeners.get(eventName)!.length : 0;
    }

    /**
     * 获取所有注册的事件名称
     */
    getEventNames(): EventName[] {
        return Array.from(this.listeners.keys());
    }
}

// 使用示例
const bus = new EventBus();

// 设置一个全局错误处理器
bus.setErrorHandler((eventName, error, payload) => {
    console.error(`[Custom Error Handler] 事件 '${eventName}' 监听器执行失败,原因: ${error.message}`, { payload, error });
});

// 定义一些监听器
const userLoginHandler = (user: { id: string, name: string }) => {
    console.log(`User ${user.name} (ID: ${user.id}) logged in.`);
};
const greetUserHandler = (user: { id: string, name: string }) => {
    console.log(`Welcome, ${user.name}!`);
};
const logoutLogger = () => {
    console.log('User logged out.');
};
const errorHandlerTest = () => {
    throw new Error('Something went wrong in this handler!');
};
const onceMessageHandler = (msg: string) => {
    console.log(`This message will only be seen once: ${msg}`);
};

// 订阅事件
bus.subscribe('user.login', userLoginHandler);
bus.subscribe('user.login', greetUserHandler);
bus.subscribe('user.logout', logoutLogger);
bus.subscribe('error.test', errorHandlerTest);
bus.once('initialLoad', onceMessageHandler);

console.log('n--- Initial State ---');
console.log('Listeners for user.login:', bus.getListenerCount('user.login'));
console.log('Listeners for initialLoad:', bus.getListenerCount('initialLoad'));

// 触发事件
console.log('n--- Publishing Events ---');
bus.publish('user.login', { id: '123', name: 'Alice' });
bus.publish('user.login', { id: '456', name: 'Bob' }); // 再次触发

bus.publish('user.logout');

bus.publish('error.test'); // 测试错误处理

console.log('n--- Publishing once event ---');
bus.publish('initialLoad', 'Application started!');
bus.publish('initialLoad', 'This should not appear!'); // 再次触发,但 once 监听器已被移除

console.log('n--- State after once event ---');
console.log('Listeners for initialLoad:', bus.getListenerCount('initialLoad')); // 应该为 0

// 移除监听器
console.log('n--- Unsubscribing ---');
bus.unsubscribe('user.login', userLoginHandler);
console.log('Listeners for user.login after unsubscribe:', bus.getListenerCount('user.login'));

// 尝试移除不存在的监听器
bus.unsubscribe('user.login', () => console.log('I am a new function'));

// 清空所有
console.log('n--- Clearing all listeners ---');
bus.clearAll();
console.log('All event names after clear:', bus.getEventNames());

/*
预期输出概览:
[EventBus] 初始化完成。
[EventBus] 已设置全局错误处理器。
[EventBus] 事件 'user.login' 添加了一个新的监听器。当前监听器数量: 1
[EventBus] 事件 'user.login' 添加了一个新的监听器。当前监听器数量: 2
[EventBus] 事件 'user.logout' 添加了一个新的监听器。当前监听器数量: 1
[EventBus] 事件 'error.test' 添加了一个新的监听器。当前监听器数量: 1
[EventBus] 事件 'initialLoad' 添加了一个 'once' 监听器。当前监听器数量: 1

--- Initial State ---
Listeners for user.login: 2
Listeners for initialLoad: 1

--- Publishing Events ---
[EventBus] 事件 'user.login' 被触发,共 2 个监听器将被调用。
User Alice (ID: 123) logged in.
Welcome, Alice!
[EventBus] 事件 'user.login' 被触发,共 2 个监听器将被调用。
User Bob (ID: 456) logged in.
Welcome, Bob!
[EventBus] 事件 'user.logout' 被触发,共 1 个监听器将被调用。
User logged out.
[EventBus] 事件 'error.test' 被触发,共 1 个监听器将被调用。
[Custom Error Handler] 事件 'error.test' 监听器执行失败,原因: Something went wrong in this handler! { payload: undefined, error: Error: Something went wrong in this handler! ... }

--- Publishing once event ---
[EventBus] 事件 'initialLoad' 被触发,共 1 个监听器将被调用。
[EventBus] 事件 'initialLoad' 的一个监听器已被移除。当前监听器数量: 0
[EventBus] 事件 'initialLoad' 的所有监听器均已移除,已清理 Map 条目。
This message will only be seen once: Application started!
[EventBus Info] 事件 'initialLoad' 被触发,但没有找到任何监听器。

--- State after once event ---
Listeners for initialLoad: 0

--- Unsubscribing ---
[EventBus] 事件 'user.login' 的一个监听器已被移除。当前监听器数量: 1
Listeners for user.login after unsubscribe: 1
[EventBus Info] 事件 'user.login' 未找到匹配的监听器,无法移除。

--- Clearing all listeners ---
[EventBus] 所有事件和监听器已被清空。
All event names after clear: []
*/

第四章:进阶 EventBus 特性 – 灵活性与可扩展性

现在我们已经有了一个功能完善且健壮的 EventBus。但作为编程专家,我们不能止步于此。接下来,我们将探讨一些进阶特性,它们能让 EventBus 在更复杂的场景中表现出色,并提供更高的灵活性和可扩展性。

4.1 异步事件处理

前面提到,同步事件处理可能会阻塞主线程。在许多场景下,尤其是当监听器可能执行耗时操作时,异步处理是更优的选择。异步处理意味着 publish 方法会立即返回,而监听器在后台(或在下一个事件循环周期)执行。

实现异步 publish 的几种方式

  1. setTimeout(fn, 0):将监听器放入宏任务队列,在当前事件循环结束后执行。
  2. Promise.resolve().then(fn):将监听器放入微任务队列,在当前同步代码执行后,宏任务之前执行。通常比 setTimeout(0) 更快。
  3. queueMicrotask(fn) (现代浏览器和 Node.js):与 Promise.resolve().then(fn) 类似,也是放入微任务队列。

我们选择 Promise.resolve().then() 作为异步处理的方式,因为它在大多数现代环境中表现稳定且性能较好。

class EventBus {
    // ... (其他属性和方法)

    /**
     * 异步触发事件
     * 会在下一个微任务队列中通知所有订阅了该事件的监听器,不会阻塞当前主线程。
     * @param eventName 事件的名称
     * @param payload 事件的载荷(数据)
     */
    publishAsync<T = any>(eventName: EventName, payload?: T): Promise<void[]> {
        const eventListeners = this.listeners.get(eventName);

        if (!eventListeners || eventListeners.length === 0) {
            console.log(`[EventBus Info] 异步事件 '${eventName}' 被触发,但没有找到任何监听器。`);
            return Promise.resolve([]); // 返回一个已解决的 Promise
        }

        console.log(`[EventBus] 异步事件 '${eventName}' 被触发,共 ${eventListeners.length} 个监听器将被异步调用。`);

        // 创建一个 Promise 数组,每个 Promise 代表一个异步监听器的执行
        const listenerPromises = [...eventListeners].map(listener => {
            return Promise.resolve().then(() => {
                try {
                    listener(payload as T);
                } catch (error: any) {
                    if (this.errorHandler) {
                        this.errorHandler(eventName, error, payload);
                    } else {
                        console.error(`[EventBus Error] 在异步处理事件 '${eventName}' 时,监听器抛出错误:`, error);
                    }
                }
            });
        });

        // 返回一个 Promise,该 Promise 在所有监听器都执行完毕(无论成功或失败)后解决
        return Promise.all(listenerPromises) as Promise<void[]>;
    }

    // ... (其他方法)
}

publishAsync 方法的逻辑流程

  1. 获取监听器列表:与同步 publish 相同。
  2. 创建 Promise 数组:对于每个监听器,我们不直接执行它,而是创建一个 Promise.resolve().then(...)。这会将监听器包装在一个微任务中。
  3. 异步执行与错误处理:每个微任务中依然包含 try-catch 块,确保异步执行时的错误也能被捕获和处理。
  4. 返回 Promise.allpublishAsync 方法返回 Promise.all(listenerPromises)。这意味着调用者可以 await 这个 Promise,以等待所有异步监听器执行完毕。如果不需要等待,可以简单地调用它而不 await

同步与异步的权衡

特性 同步 publish 异步 publish
性能 可能阻塞主线程,影响 UI 响应性。 不阻塞主线程,提高 UI 响应性。
时序 监听器按注册顺序立即执行。 监听器在下一个事件循环周期执行,顺序可能不严格保证(取决于 Promise 调度),但通常保持注册顺序。
复杂度 实现简单。 引入 Promise,略微增加实现和理解的复杂性。
错误 错误立即抛出,可以在当前调用栈中捕获。 错误在异步上下文中抛出,需要通过 Promise 链捕获或 EventBus 的错误处理器。
适用场景 对响应时间要求极高且监听器操作轻量级的场景。 大多数前端应用,监听器可能耗时或涉及 IO 操作的场景。

在实际应用中,通常会提供同步和异步两种 publish 方法,让开发者根据具体需求选择。

4.2 事件优先级 (Event Prioritization)

在某些情况下,我们希望某些监听器在其他监听器之前或之后执行。例如,一个日志记录器可能希望在任何业务逻辑之前捕获事件,或者一个数据校验器希望在数据处理之前运行。这可以通过为监听器分配优先级来实现。

实现思路

  1. subscribe 增强subscribe 方法接受一个额外的 priority 参数(例如,一个数字,数字越大优先级越高)。
  2. 存储结构变更:监听器不再是简单地存储在数组中,而是存储为包含监听器函数和优先级的对象。
  3. publish 排序:在 publish 时,先根据优先级对监听器进行排序,然后再依次执行。
// 定义一个包含监听器函数和优先级的接口
interface PriorityEventListener<T = any> {
    listener: EventListener<T>;
    priority: number; // 优先级,数字越大优先级越高
    originalListener?: EventListener<T>; // 用于 once 包装函数的原始引用
}

class EventBus {
    // 存储结构修改为 Map<EventName, PriorityEventListener[]>
    private listeners: Map<EventName, PriorityEventListener[]>;

    constructor() {
        this.listeners = new Map<EventName, PriorityEventListener[]>();
        // ...
    }

    // 修改 subscribe 方法以接受优先级
    subscribe<T = any>(eventName: EventName, listener: EventListener<T>, priority: number = 0): void {
        if (!this.listeners.has(eventName)) {
            this.listeners.set(eventName, []);
        }
        const eventListeners = this.listeners.get(eventName)!;
        eventListeners.push({ listener, priority });
        // 每次添加后重新排序,或者只在 publish 时排序
        eventListeners.sort((a, b) => b.priority - a.priority); // 降序排列,高优先级在前
        console.log(`[EventBus] 事件 '${eventName}' 添加了一个新的监听器 (优先级: ${priority})。`);
    }

    // 修改 unsubscribe 方法以匹配 PriorityEventListener
    unsubscribe<T = any>(eventName: EventName, listener: EventListener<T>): void {
        const eventListeners = this.listeners.get(eventName);
        if (!eventListeners || eventListeners.length === 0) return;

        const initialLength = eventListeners.length;
        // 过滤时匹配内部的 listener 函数
        const newListeners = eventListeners.filter(item => {
            // 需要考虑 once 包装的情况,如果 item 是包装函数,则比较 originalListener
            return item.listener !== listener && item.originalListener !== listener;
        });

        if (newListeners.length < initialLength) {
            this.listeners.set(eventName, newListeners);
            if (newListeners.length === 0) {
                this.listeners.delete(eventName);
            }
            console.log(`[EventBus] 事件 '${eventName}' 的一个监听器已被移除。`);
        } else {
            console.log(`[EventBus Info] 事件 '${eventName}' 未找到匹配的监听器,无法移除。`);
        }
    }

    // 修改 once 方法以支持优先级
    once<T = any>(eventName: EventName, listener: EventListener<T>, priority: number = 0): void {
        const onceWrapper: EventListener<T> = (payload: T) => {
            this.unsubscribe(eventName, listener); // 注意这里移除的是原始监听器,而不是 onceWrapper
            listener(payload);
        };
        // 为了让 unsubscribe 能识别 onceWrapper 的原始 listener,我们需要在 onceWrapper 对象中保存原始 listener
        const onceItem: PriorityEventListener<T> = {
            listener: onceWrapper,
            priority,
            originalListener: listener // 保存原始监听器引用
        };
        if (!this.listeners.has(eventName)) {
            this.listeners.set(eventName, []);
        }
        this.listeners.get(eventName)!.push(onceItem);
        this.listeners.get(eventName)!.sort((a, b) => b.priority - a.priority);
        console.log(`[EventBus] 事件 '${eventName}' 添加了一个 'once' 监听器 (优先级: ${priority})。`);
    }

    // 修改 publish 方法以遍历 PriorityEventListener
    publish<T = any>(eventName: EventName, payload?: T): void {
        const eventListeners = this.listeners.get(eventName);

        if (eventListeners && eventListeners.length > 0) {
            console.log(`[EventBus] 事件 '${eventName}' 被触发,共 ${eventListeners.length} 个监听器将被调用。`);
            // 遍历时复制数组,防止在循环中修改数组
            [...eventListeners].forEach(item => { // item 是 { listener, priority }
                try {
                    item.listener(payload as T); // 执行监听器
                } catch (error: any) {
                    if (this.errorHandler) {
                        this.errorHandler(eventName, error, payload);
                    } else {
                        console.error(`[EventBus Error] 在处理事件 '${eventName}' 时,监听器抛出错误:`, error);
                    }
                }
            });
        } else {
            // ...
        }
    }

    // publishAsync 同样需要修改以遍历 PriorityEventListener
    publishAsync<T = any>(eventName: EventName, payload?: T): Promise<void[]> {
        const eventListeners = this.listeners.get(eventName);
        if (!eventListeners || eventListeners.length === 0) return Promise.resolve([]);

        console.log(`[EventBus] 异步事件 '${eventName}' 被触发,共 ${eventListeners.length} 个监听器将被异步调用。`);

        const listenerPromises = [...eventListeners].map(item => { // item 是 { listener, priority }
            return Promise.resolve().then(() => {
                try {
                    item.listener(payload as T);
                } catch (error: any) {
                    if (this.errorHandler) {
                        this.errorHandler(eventName, error, payload);
                    } else {
                        console.error(`[EventBus Error] 在异步处理事件 '${eventName}' 时,监听器抛出错误:`, error);
                    }
                }
            });
        });
        return Promise.all(listenerPromises) as Promise<void[]>;
    }
}

优先级实现的关键点

  1. 数据结构变化:监听器不再是函数本身,而是 { listener: Function, priority: number } 对象。
  2. 排序:在 subscribepublish 时对监听器数组进行排序,确保高优先级的监听器在前。这里我们选择在每次 subscribe 后都进行一次排序,这样 publish 就可以直接遍历已排序的数组。
  3. unsubscribe 逻辑调整:由于 listeners 数组中存储的是对象,unsubscribe 也要相应地修改,以正确识别和移除包装在对象中的监听器。对于 once 监听器,unsubscribe 需要能够识别原始的 listener,因此在 PriorityEventListener 中添加了 originalListener 字段来辅助。

通过引入优先级,我们可以更精细地控制事件处理的顺序,这在复杂的业务逻辑和模块间协作中非常有用。

4.3 通配符事件 (Wildcard Events)

通配符事件允许订阅者监听一系列相关的事件,而无需为每个具体事件单独订阅。例如,订阅 user.* 可以监听 user.createduser.updateduser.deleted 等所有以 user. 开头的事件。

实现思路

  1. 事件名称解析:在 subscribepublish 时,需要解析事件名称。
  2. subscribe 增强:当订阅 user.* 时,将其作为一种特殊的模式存储。
  3. publish 匹配:当发布 user.created 时,不仅要查找精确匹配 user.created 的监听器,还要查找匹配 user.* 的监听器。

这是一个相对复杂的特性,因为它涉及到模式匹配算法。对于一个 EventBus 的核心功能来说,通常不直接实现通配符,而是通过更高级的事件系统(如消息队列、RxJS)来处理。但我们可以简单地说明其概念和挑战。

挑战

  • 匹配算法:如何高效地匹配 user.*user.profile.updated 这样的模式?可能需要树形结构或正则表达式。
  • 性能:如果有很多通配符订阅,每次 publish 都进行大量模式匹配可能会影响性能。
  • 优先级冲突:如果 user.*user.created 都被订阅,且都有优先级,如何确定它们的执行顺序?

示例(概念性,不深入代码实现)

假设一个简化版,只支持 event.subeventevent.*

// 内部可能需要维护两类监听器:
// private exactListeners: Map<string, PriorityEventListener[]>; // 精确匹配
// private wildcardListeners: Map<string, PriorityEventListener[]>; // 通配符匹配,如 'user' -> [listener for 'user.*']

// publish 方法逻辑:
// 1. 查找 exactListeners.get(eventName)
// 2. 解析 eventName,例如 'user.created' -> 提取 'user'
// 3. 查找 wildcardListeners.get('user')
// 4. 合并并去重所有匹配的监听器,然后按优先级排序执行。

通配符事件增加了 EventBus 的灵活性,但也增加了内部复杂性和潜在的性能开销。在设计时需要权衡其带来的好处与实现成本。

4.4 EventBus 实例管理:单例与多实例

在应用中,我们 EventBus 的实例化方式会影响其作用范围和管理方式。

1. 单例模式 (Singleton EventBus)

  • 特点:整个应用只存在一个 EventBus 实例。
  • 优点
    • 全局通信:任何组件都可以访问这个唯一的实例,方便进行全局范围的事件通信。
    • 简单:无需考虑多个实例的传递和管理。
  • 缺点
    • 命名冲突:所有事件都在同一个命名空间下,可能导致事件名称冲突。
    • 调试困难:事件流可能变得复杂,难以追踪某个事件是由哪个组件触发,又影响了哪些组件。
    • 测试困难:全局单例可能导致测试之间相互影响,需要每次测试后清理状态。
  • 实现:通常通过一个全局变量或模块导出一个 EventBus 实例。
// singletonEventBus.ts
import { EventBus } from './EventBus'; // 假设你的 EventBus 类在一个单独的文件中

export const globalEventBus = new EventBus();

// 其他模块
// import { globalEventBus } from './singletonEventBus';
// globalEventBus.subscribe(...);

2. 多实例模式 (Multiple EventBus Instances)

  • 特点:应用可以拥有多个 EventBus 实例,每个实例负责特定模块或组件的事件通信。
  • 优点
    • 作用域明确:每个 EventBus 实例只处理特定范围内的事件,避免了全局命名冲突。
    • 解耦更彻底:模块间的事件通信更加清晰,一个模块的 EventBus 不会影响其他模块。
    • 易于测试:测试时可以为每个模块创建独立的 EventBus 实例,避免相互影响。
  • 缺点
    • 管理开销:需要决定何时创建、销毁和传递这些实例。
    • 跨模块通信:如果两个模块需要通信,但它们有各自的 EventBus,可能需要引入一个“桥接” EventBus 或更高级的通信机制。
  • 实现:根据需要实例化 EventBus。
// moduleA.ts
import { EventBus } from './EventBus';

export class ModuleA {
    private eventBus: EventBus;
    constructor() {
        this.eventBus = new EventBus(); // 模块 A 自己的 EventBus
        this.eventBus.subscribe('moduleA.init', () => console.log('Module A initialized.'));
    }
    getBus() { return this.eventBus; }
}

// moduleB.ts
import { EventBus } from './EventBus';

export class ModuleB {
    private eventBus: EventBus;
    constructor() {
        this.eventBus = new EventBus(); // 模块 B 自己的 EventBus
        this.eventBus.subscribe('moduleB.start', () => console.log('Module B started.'));
    }
    getBus() { return this.eventBus; }
}

// main.ts
const moduleA = new ModuleA();
const moduleB = new ModuleB();

// 如果 ModuleA 需要通知 ModuleB,它们不能直接通过各自的 bus,
// 可能需要一个共同的父级 EventBus 或者显式地传递
// moduleA.getBus().publish('moduleA.init');
// moduleB.getBus().publish('moduleB.start');

何时选择哪种模式

  • 小型应用或全局性事件:选择单例模式,简单方便。
  • 中大型应用,强模块划分:选择多实例模式,配合明确的事件命名规范和通信策略,可以带来更好的可维护性和扩展性。
  • 混合模式:可以有一个全局的 EventBus 处理应用范围的事件,同时每个主要模块也可以有自己的局部 EventBus 处理模块内部事件。

第五章:深度探讨 – 设计考量与最佳实践

EventBus 虽好,但并非银弹。在使用和设计时,我们需要考虑更多因素,以确保其能真正带来价值,而不是引入新的复杂性。

5.1 类型安全 (TypeScript)

在 TypeScript 项目中,为 EventBus 引入类型安全可以极大地提高开发效率和代码质量,减少运行时错误。我们可以定义事件的类型及其载荷的结构。

实现思路

  1. 定义事件映射接口:一个接口来映射事件名称到其载荷类型。
  2. 泛型 EventBus:让 EventBus 类本身成为泛型,以接受这个事件映射接口。
// 定义一个事件映射接口
interface AppEvents {
    'user.login': { id: string; name: string; timestamp: number };
    'user.logout': undefined; // 或者 void
    'data.loaded': string[];
    'error.critical': { code: number; message: string; details?: any };
}

// 修改 EventBus 类,使其成为泛型
class TypedEventBus<Events extends Record<string, any>> {
    private listeners: Map<keyof Events, PriorityEventListener<any>>; // keyof Events 限制事件名称
    private errorHandler: ErrorHandler | null = null;

    constructor() {
        this.listeners = new Map<keyof Events, PriorityEventListener<any>>();
        console.log("[TypedEventBus] 初始化完成。");
    }

    public setErrorHandler(handler: ErrorHandler): void {
        this.errorHandler = handler;
        console.log("[TypedEventBus] 已设置全局错误处理器。");
    }

    // subscribe 方法现在可以推断 payload 类型
    subscribe<K extends keyof Events>(eventName: K, listener: EventListener<Events[K]>, priority: number = 0): void {
        if (!this.listeners.has(eventName)) {
            this.listeners.set(eventName, []);
        }
        const eventListeners = this.listeners.get(eventName)! as PriorityEventListener<Events[K]>[]; // 类型断言
        eventListeners.push({ listener: listener as EventListener<any>, priority });
        eventListeners.sort((a, b) => b.priority - a.priority);
        console.log(`[TypedEventBus] 事件 '${String(eventName)}' 添加了一个新的监听器 (优先级: ${priority})。`);
    }

    // unsubscribe 方法同样推断 payload 类型
    unsubscribe<K extends keyof Events>(eventName: K, listener: EventListener<Events[K]>): void {
        const eventListeners = this.listeners.get(eventName);
        if (!eventListeners || eventListeners.length === 0) return;

        const initialLength = eventListeners.length;
        const newListeners = eventListeners.filter(item => {
            return (item.listener !== (listener as EventListener<any>)) && (item.originalListener !== (listener as EventListener<any>));
        });

        if (newListeners.length < initialLength) {
            this.listeners.set(eventName, newListeners as PriorityEventListener<any>[]);
            if (newListeners.length === 0) {
                this.listeners.delete(eventName);
            }
            console.log(`[TypedEventBus] 事件 '${String(eventName)}' 的一个监听器已被移除。`);
        } else {
            console.log(`[TypedEventBus Info] 事件 '${String(eventName)}' 未找到匹配的监听器,无法移除。`);
        }
    }

    // once 方法同样推断 payload 类型
    once<K extends keyof Events>(eventName: K, listener: EventListener<Events[K]>, priority: number = 0): void {
        const onceWrapper: EventListener<Events[K]> = (payload: Events[K]) => {
            this.unsubscribe(eventName, listener); // 移除原始监听器
            listener(payload);
        };
        const onceItem: PriorityEventListener<Events[K]> = {
            listener: onceWrapper as EventListener<any>,
            priority,
            originalListener: listener as EventListener<any>
        };
        if (!this.listeners.has(eventName)) {
            this.listeners.set(eventName, []);
        }
        (this.listeners.get(eventName)! as PriorityEventListener<Events[K]>[]).push(onceItem);
        (this.listeners.get(eventName)! as PriorityEventListener<Events[K]>[]).sort((a, b) => b.priority - a.priority);
        console.log(`[TypedEventBus] 事件 '${String(eventName)}' 添加了一个 'once' 监听器 (优先级: ${priority})。`);
    }

    // publish 方法现在可以推断 payload 类型
    publish<K extends keyof Events>(eventName: K, payload: Events[K]): void {
        const eventListeners = this.listeners.get(eventName);

        if (eventListeners && eventListeners.length > 0) {
            console.log(`[TypedEventBus] 事件 '${String(eventName)}' 被触发,共 ${eventListeners.length} 个监听器将被调用。`);
            [...eventListeners].forEach(item => {
                try {
                    item.listener(payload);
                } catch (error: any) {
                    if (this.errorHandler) {
                        this.errorHandler(String(eventName), error, payload);
                    } else {
                        console.error(`[TypedEventBus Error] 在处理事件 '${String(eventName)}' 时,监听器抛出错误:`, error);
                    }
                }
            });
        } else {
            console.log(`[TypedEventBus Info] 事件 '${String(eventName)}' 被触发,但没有找到任何监听器。`);
        }
    }

    // publishAsync 同样推断 payload 类型
    publishAsync<K extends keyof Events>(eventName: K, payload: Events[K]): Promise<void[]> {
        const eventListeners = this.listeners.get(eventName);
        if (!eventListeners || eventListeners.length === 0) return Promise.resolve([]);

        console.log(`[TypedEventBus] 异步事件 '${String(eventName)}' 被触发,共 ${eventListeners.length} 个监听器将被异步调用。`);

        const listenerPromises = [...eventListeners].map(item => {
            return Promise.resolve().then(() => {
                try {
                    item.listener(payload);
                } catch (error: any) {
                    if (this.errorHandler) {
                        this.errorHandler(String(eventName), error, payload);
                    } else {
                        console.error(`[TypedEventBus Error] 在异步处理事件 '${String(eventName)}' 时,监听器抛出错误:`, error);
                    }
                }
            });
        });
        return Promise.all(listenerPromises) as Promise<void[]>;
    }

    // ... (clearAll, getListenerCount, getEventNames 等辅助方法)
    clearAll(): void {
        this.listeners.clear();
        console.log("[TypedEventBus] 所有事件和监听器已被清空。");
    }

    getListenerCount(eventName: keyof Events): number {
        return this.listeners.has(eventName) ? this.listeners.get(eventName)!.length : 0;
    }

    getEventNames(): (keyof Events)[] {
        return Array.from(this.listeners.keys());
    }
}

// 使用示例
const bus = new TypedEventBus<AppEvents>();

bus.subscribe('user.login', (data) => {
    // data 会被自动推断为 { id: string, name: string, timestamp: number }
    console.log(`User ${data.name} logged in at ${new Date(data.timestamp).toLocaleTimeString()}`);
});

bus.subscribe('user.logout', () => {
    // data 会被自动推断为 undefined
    console.log('User logged out.');
});

// 尝试订阅一个不存在的事件,TypeScript 会报错
// bus.subscribe('nonexistent.event', () => {}); // 编译错误!

// 尝试发布事件时,payload 类型不匹配也会报错
// bus.publish('user.login', { id: '123', name: 'Alice' }); // 编译错误,缺少 timestamp
bus.publish('user.login', { id: '123', name: 'Alice', timestamp: Date.now() });

bus.publish('user.logout', undefined); // 正确

// bus.publish('user.logout', { some: 'data' }); // 编译错误,期望 undefined

通过泛型和 keyof 操作符,EventBus 获得了强大的类型检查能力。这在大型项目中尤为重要,它能在编译阶段捕获错误,而不是等到运行时。

5.2 性能优化

虽然 EventBus 的核心操作相对简单,但在高并发事件或大量监听器的场景下,性能依然值得关注。

  • 监听器数量:避免单个事件有过多的监听器(例如几百上千个)。如果存在,考虑是否可以合并监听器或优化业务逻辑。
  • 事件触发频率:如果某些事件触发非常频繁(例如鼠标移动、滚动),而监听器又执行耗时操作,可能会导致性能问题。此时可以考虑使用 节流(throttle)防抖(debounce) 来限制监听器的执行频率。
  • 内部数据结构Map 和数组 filterpushsort 操作在大多数情况下性能良好。但如果监听器数组非常庞大,filtersort 操作可能会有性能开销。
    • 优化 unsubscribe:如果 unsubscribe 调用频繁,filter 每次都创建新数组可能效率不高。可以考虑使用链表结构,或者在数组中标记为“已移除”并在 publish 时跳过,定期进行清理("compact")。但对于大多数应用,目前的 filter 方案足够。
    • 优化 sort:如果 subscribe 频繁且监听器数量多,每次 sort 也会有开销。可以只在 publish 时才进行排序,或者使用插入排序保持有序。
  • 避免不必要的事件:只发布真正需要被监听的事件。

5.3 内存管理与调试

  • 及时 unsubscribe:这是防止内存泄漏最关键的一点。当组件被销毁或不再需要监听时,必须调用 unsubscribe。在 React/Vue 等框架中,这通常在组件的 componentWillUnmountonUnmounted 生命周期钩子中完成。
  • 提供调试工具
    • getListenerCount(eventName):获取某个事件的监听器数量,帮助检查是否有未移除的监听器。
    • getEventNames():获取所有已注册的事件名称。
    • listAllListeners():一个更高级的调试方法,可以打印出所有事件及其对应的监听器(可能需要修改 PriorityEventListener 来存储监听器函数的名称或一个 ID)。

这些辅助方法在我们调试 EventBus 相关的内存泄漏或事件流问题时非常有用。

5.4 与现有框架/库的集成与对比

EventBus 模式在许多框架和库中都有体现,或者有替代方案:

  • React/Vue/Angular:这些框架本身提供了组件间通信的机制(props down, events up, Context API, Vuex/Redux 状态管理,服务等)。在这些框架中,EventBus 常常用于跨组件层级、无直接父子关系的通信,或者作为一种全局通知机制。过度使用 EventBus 可能会使得数据流难以追踪,因此在框架内部通信时,应优先考虑框架自身提供的机制。
  • RxJS (ReactiveX for JavaScript):RxJS 是一个强大的响应式编程库,其 SubjectObservable 非常适合作为 EventBus 的替代品。它提供了更强大的功能,如操作符(map, filter, debounce, throttle, merge, concat)、错误处理、完成通知等。
    • 对比:我们的手写 EventBus 相对轻量,易于理解和控制。RxJS 功能强大但学习曲线较陡峭,引入的体积也更大。对于简单的事件通信,手写 EventBus 足矣;对于复杂的事件流和数据转换,RxJS 更具优势。
  • Node.js EventEmitter:Node.js 内置了 EventEmitter 类,其 API 与我们的 EventBus 类似,也是基于发布-订阅模式。在 Node.js 环境下,可以直接使用它。

选择 EventBus 还是其他方案,取决于项目的规模、复杂性、团队熟悉度以及对功能和性能的需求。

5.5 何时不使用 EventBus

尽管 EventBus 带来了强大的解耦能力,但它并非万能,甚至在某些情况下会引入新的问题:

  • 过度使用导致事件流混乱:如果所有通信都通过 EventBus,系统会变成一个“大泥球”,事件满天飞,很难追踪一个事件的源头和它触发的所有副作用。这被称为“事件风暴”或“事件地狱”。
  • 难以追踪事件源和影响:当一个 bug 出现时,如果事件流不清晰,很难找出是哪个组件发布了错误的事件,或者哪个监听器处理不当。
  • 简单的父子通信:对于简单的父子组件通信,直接使用 props/emit 或回调函数更直接、更易于理解和维护,无需引入 EventBus 的额外抽象。
  • 共享状态管理:EventBus 适合通知“发生了什么”,但不适合管理和同步“当前是什么状态”。对于复杂的状态管理,应该使用专门的状态管理库(如 Redux, Vuex, MobX)。

最佳实践

  • 明确事件命名规范:使用有意义的、层级化的事件名称(如 user.loggedIn, order.created, ui.modal.opened)。
  • 事件载荷清晰:事件 payload 应该是一个结构化的数据对象,包含所有必要的信息。
  • 适度使用:将其用于解耦跨模块、跨层级且无直接依赖的通信。
  • 文档化:记录所有事件的名称、载荷结构和预期行为,方便团队成员理解和使用。

第六章:EventBus 在现代应用中的定位与展望

EventBus 作为发布-订阅模式的经典实现,在现代应用架构中扮演着重要角色。它在实现组件解耦、提升系统可维护性和扩展性方面发挥着不可替代的作用。从小型工具到大型单页应用,再到微服务架构中的消息队列,其核心思想无处不在。

展望未来,随着前端框架和状态管理库的不断发展,EventBus 的使用场景可能会更加聚焦于那些难以通过传统组件通信模式解决的“全局性”或“跨领域”的通知需求。同时,结合 TypeScript 的类型安全、异步处理以及严格的事件命名和文档化,EventBus 将继续成为构建健壮、可维护应用的强大工具。然而,如同任何强大的工具一样,它的价值最大化依赖于开发者对其原理的深入理解和审慎的使用。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注