手写一个 EventEmitter(发布订阅模式):支持 on, off, emit, once

各位开发者,下午好!

今天,我们将深入探讨一个在现代软件开发中无处不在,却又常常被忽视其内部机制的核心模式——发布-订阅模式(Publish-Subscribe Pattern),并亲手实现一个功能完备的 EventEmitter。作为一名编程专家,理解并能构建这样的基础组件,是衡量我们对系统设计和事件驱动架构掌握程度的重要标志。

第一章:发布-订阅模式的核心理念

1.1 什么是发布-订阅模式?

发布-订阅模式,通常简称为“Pub/Sub”,是一种消息传递模式,用于在系统组件之间实现松散耦合。它定义了这样一种机制:

  • 发布者 (Publisher):负责创建并发送事件或消息。发布者不知道哪些订阅者会接收这些消息,也不知道这些订阅者如何处理消息。
  • 订阅者 (Subscriber):注册对特定事件或消息的兴趣。当事件发生时,订阅者会被通知,并执行相应的处理逻辑。订阅者不知道是哪个发布者发送了消息。
  • 事件通道/代理 (Event Channel/Broker):这是模式的核心,充当发布者和订阅者之间的中介。发布者将事件发布到通道,订阅者从通道订阅事件。这个通道负责维护事件到订阅者的映射,并在事件发生时通知所有相关订阅者。

1.2 为什么需要发布-订阅模式?

这种模式带来了显著的优势:

  1. 解耦性 (Decoupling):发布者和订阅者之间没有直接依赖关系。它们只知道事件通道的存在。这意味着你可以独立地修改发布者或订阅者的实现,而不会影响另一方。
  2. 灵活性 (Flexibility):系统可以更容易地扩展。添加新的订阅者或发布者通常只需要少量配置,而无需修改现有代码。
  3. 可维护性 (Maintainability):由于模块之间的依赖性降低,代码库更容易理解、测试和维护。
  4. 异步处理 (Asynchronous Processing):事件发布和处理可以是异步的,这对于非阻塞操作和提高系统响应性至关重要。
  5. 实时性 (Real-time):在需要实时响应变化的场景(如UI交互、数据流处理)中非常有用。

1.3 常见的应用场景

发布-订阅模式无处不在,以下是一些典型例子:

  • 前端开发:浏览器事件(点击、键盘输入、加载完成)、组件间通信。
  • 后端开发:Node.js 的 EventEmitter、微服务架构中的消息队列(Kafka, RabbitMQ)、系统日志处理。
  • 操作系统:进程间通信、文件系统事件。
  • 游戏开发:事件系统,例如玩家行动、NPC 行为触发。

在今天的讲座中,我们将专注于构建一个类似于 Node.js EventEmitter 的自定义实现,它将作为我们理解发布-订阅模式的实践基础。

第二章:设计 EventEmitter 的核心数据结构

要构建一个 EventEmitter,首先需要解决的核心问题是如何存储和管理事件及其对应的监听器(订阅者)。我们需要一种高效的方式来:

  1. 根据事件名称快速查找所有相关的监听器。
  2. 添加、移除特定的监听器。

2.1 数据结构的选择

最直观且高效的数据结构是使用一个映射(Map 或 Object),其中键是事件名称(字符串),值是该事件对应的监听器列表。

为什么选择 Map 而不是普通 Object

在 JavaScript 中,Map 相比于普通 Object 有几个优势,尤其是在用作键值对存储时:

  • 键的类型Map 的键可以是任意类型(字符串、数字、对象等),而 Object 的键最终都会被转换为字符串。虽然我们的事件名称通常是字符串,但 Map 提供了更大的灵活性。
  • 顺序保证Map 会保留键的插入顺序,这在某些需要特定处理顺序的场景下很有用。
  • 性能:对于频繁的添加、删除和遍历操作,尤其是在键的数量非常大时,Map 通常比 Object 表现出更好的性能。
  • 迭代Map 提供了 keys(), values(), entries() 等迭代器方法,使用起来更方便。
  • 大小Map 可以通过 size 属性直接获取键值对的数量,而 Object 需要 Object.keys().length

因此,我们将使用 Map 来存储事件和监听器的关系。

监听器列表的选择:Array 还是 Set

对于每个事件名称对应的值,我们可以选择 ArraySet 来存储监听器函数。

  • Array<Function>
    • 优点:可以存储重复的监听器实例(尽管通常不推荐)。保留添加顺序。可以通过索引进行高效删除(如果知道索引)。
    • 缺点:查找特定监听器进行删除时可能需要遍历。
  • Set<Function>
    • 优点:自动去重,确保每个监听器只被添加一次。查找和删除操作通常是 O(1) 平均时间复杂度。
    • 缺点:不保证插入顺序。

考虑到 EventEmitter 的标准行为通常允许同一个函数被多次监听(虽然不常见,但为了通用性),并且 once 方法需要精确移除一个包装过的函数,Array 提供了更细粒度的控制,尤其是在处理 once 包装器时。因此,我们将选择 Array<Function>

最终数据结构设计:

// 伪代码
class EventEmitter {
    private listeners: Map<string, Function[]>;

    constructor() {
        this.listeners = new Map<string, Function[]>();
    }
    // ... 方法实现
}

每个事件名称将映射到一个函数数组,数组中的每个函数都是该事件的监听器。

第三章:实现核心方法

现在,我们有了数据结构,可以开始实现 EventEmitter 的核心方法:on, emit, off, once

3.1 on(eventName, listener):注册事件监听器

on 方法用于为指定的 eventName 注册一个 listener 函数。当 eventName 事件被触发时,该 listener 将会被调用。

  • 如果 eventName 尚未有任何监听器,则创建一个新的数组。
  • listener 添加到 eventName 对应的监听器数组中。
  • 为了支持链式调用,方法返回 this (EventEmitter 实例)。
class EventEmitter {
    private listeners: Map<string, Function[]>;
    private _maxListeners: number; // 用于限制每个事件的监听器数量,防止内存泄漏

    constructor() {
        this.listeners = new Map<string, Function[]>();
        this._maxListeners = EventEmitter.defaultMaxListeners; // 默认值
    }

    /**
     * 设置单个事件的最大监听器数量。
     * @param {number} n 新的最大监听器数量。
     * @returns {EventEmitter} 返回 EventEmitter 实例,支持链式调用。
     */
    setMaxListeners(n: number): EventEmitter {
        if (typeof n !== 'number' || n < 0 || isNaN(n)) {
            throw new TypeError('The value of "n" must be a non-negative number. Received ' + n + '.');
        }
        this._maxListeners = n;
        return this;
    }

    /**
     * 获取单个事件的最大监听器数量。
     * @returns {number} 当前的最大监听器数量。
     */
    getMaxListeners(): number {
        return this._maxListeners;
    }

    /**
     * 注册一个事件监听器。
     * @param {string} eventName 事件名称。
     * @param {Function} listener 监听器函数。
     * @returns {EventEmitter} 返回 EventEmitter 实例,支持链式调用。
     */
    on(eventName: string, listener: Function): EventEmitter {
        if (typeof listener !== 'function') {
            throw new TypeError('The "listener" argument must be of type Function. Received type ' + typeof listener);
        }

        if (!this.listeners.has(eventName)) {
            this.listeners.set(eventName, []);
        }

        const handlers = this.listeners.get(eventName)!;
        handlers.push(listener);

        // 检查监听器数量是否超过限制(Node.js EventEmitter 的一个特性)
        if (handlers.length > this._maxListeners && this._maxListeners !== 0) {
            console.warn(
                `Possible EventEmitter memory leak detected. ${handlers.length} ${eventName} listeners added. ` +
                `Use emitter.setMaxListeners() to increase limit.`
            );
        }

        return this;
    }

    /**
     * 别名方法:addListener
     */
    addListener(eventName: string, listener: Function): EventEmitter {
        return this.on(eventName, listener);
    }
}

// 静态属性,提供默认的最大监听器数量
EventEmitter.defaultMaxListeners = 10;

setMaxListenersgetMaxListeners 的说明:

这两个方法是 Node.js EventEmitter 的标准组成部分,用于帮助开发者在开发过程中发现潜在的内存泄漏问题。当某个事件的监听器数量超过设定的阈值时,会发出警告。这并不会阻止监听器的添加,而只是一个提醒。

3.2 emit(eventName, ...args):触发事件

emit 方法用于触发指定 eventName 的事件,并以任意数量的参数 ...args 调用所有注册到该事件的监听器。

  • 获取 eventName 对应的所有监听器。
  • 如果没有监听器,则不执行任何操作。
  • 遍历监听器数组,依次调用每个监听器,并传递 ...args
  • 监听器中的 this 上下文应指向 EventEmitter 实例。
  • 如果事件有监听器被调用,返回 true,否则返回 false
  • 错误处理:如果触发 error 事件且没有为该事件注册监听器,Node.js EventEmitter 会抛出一个错误。我们将模拟这个行为,以提高健壮性。
// 假设 EventEmitter 类已经定义了前面的部分

class EventEmitter {
    // ... 构造函数及 on, addListener, setMaxListeners, getMaxListeners 方法

    /**
     * 触发指定事件,并向所有监听器传递参数。
     * @param {string} eventName 事件名称。
     * @param {...any} args 传递给监听器的参数。
     * @returns {boolean} 如果有监听器被调用,返回 true;否则返回 false。
     */
    emit(eventName: string, ...args: any[]): boolean {
        const handlers = this.listeners.get(eventName);

        if (!handlers || handlers.length === 0) {
            // 如果是 'error' 事件且没有监听器,则抛出错误,这是 Node.js EventEmitter 的行为
            if (eventName === 'error') {
                const error = args[0] instanceof Error ? args[0] : new Error(`Unhandled 'error' event. (${args[0]})`);
                throw error;
            }
            return false;
        }

        // 遍历并调用所有监听器
        // 使用 slice() 创建一个副本,以防止在遍历过程中监听器被移除导致迭代问题
        const listenersToCall = handlers.slice();
        for (const listener of listenersToCall) {
            try {
                // 使用 apply 来确保监听器内部的 'this' 指向 EventEmitter 实例
                listener.apply(this, args);
            } catch (error) {
                // 如果监听器内部抛出错误,且有 'error' 事件监听器,则发布 'error' 事件
                // 否则,直接抛出错误。
                if (this.listeners.has('error') && this.listeners.get('error')!.length > 0) {
                    this.emit('error', error);
                } else {
                    console.error(`Error in listener for event "${eventName}":`, error);
                    // 如果没有专门的错误处理,也可以选择直接抛出,取决于具体需求
                    // throw error;
                }
            }
        }

        return true;
    }
}

this 上下文的说明:

emit 方法中,我们使用 listener.apply(this, args) 来调用监听器。这确保了在监听器函数内部,this 关键字将指向当前的 EventEmitter 实例。这与 Node.js EventEmitter 的行为保持一致,并且对于许多场景(例如,监听器需要访问 EventEmitter 实例上的其他方法或属性)来说至关重要。

3.3 off(eventName, listener):移除事件监听器

off 方法用于移除指定 eventName 的特定 listener

  • 如果未提供 listener,则移除 eventName 的所有监听器。
  • 如果未提供 eventNamelistener,则移除所有事件的所有监听器。
  • 如果 listener 是一个 once 包装器(我们在 once 方法中会创建),则需要特殊处理,移除其内部的原始监听器。
// 假设 EventEmitter 类已经定义了前面的部分

class EventEmitter {
    // ... 构造函数及 on, addListener, setMaxListeners, getMaxListeners, emit 方法

    /**
     * 移除指定事件的监听器。
     * 如果未指定 listener,则移除该事件的所有监听器。
     * 如果未指定 eventName 和 listener,则移除所有事件的所有监听器。
     * @param {string} [eventName] 事件名称。
     * @param {Function} [listener] 要移除的监听器函数。
     * @returns {EventEmitter} 返回 EventEmitter 实例,支持链式调用。
     */
    off(eventName?: string, listener?: Function): EventEmitter {
        // 如果没有指定 eventName,则移除所有事件的所有监听器
        if (eventName === undefined) {
            this.listeners.clear();
            return this;
        }

        const handlers = this.listeners.get(eventName);
        if (!handlers || handlers.length === 0) {
            return this; // 没有监听器,直接返回
        }

        // 如果指定了 eventName 但未指定 listener,则移除该事件的所有监听器
        if (listener === undefined) {
            this.listeners.delete(eventName);
            return this;
        }

        if (typeof listener !== 'function') {
            throw new TypeError('The "listener" argument must be of type Function. Received type ' + typeof listener);
        }

        // 移除特定的监听器
        let index = -1;
        for (let i = 0; i < handlers.length; i++) {
            // 考虑 once 包装器:如果当前处理器是 once 包装器,并且其 originalListener 属性与要移除的 listener 匹配
            // 或者处理器本身就是 listener
            if (handlers[i] === listener || (handlers[i] as any).originalListener === listener) {
                index = i;
                break;
            }
        }

        if (index !== -1) {
            handlers.splice(index, 1);
            if (handlers.length === 0) {
                this.listeners.delete(eventName); // 如果事件没有监听器了,就从 Map 中移除该事件
            }
        }

        return this;
    }

    /**
     * 别名方法:removeListener
     */
    removeListener(eventName: string, listener: Function): EventEmitter {
        return this.off(eventName, listener);
    }

    /**
     * 移除指定事件的所有监听器。
     * @param {string} [eventName] 事件名称。如果未指定,则移除所有事件的所有监听器。
     * @returns {EventEmitter} 返回 EventEmitter 实例,支持链式调用。
     */
    removeAllListeners(eventName?: string): EventEmitter {
        if (eventName === undefined) {
            this.listeners.clear();
        } else {
            this.listeners.delete(eventName);
        }
        return this;
    }
}

once 包装器处理的说明:

off 方法在查找要移除的监听器时,需要同时检查 handlers[i] === listener(handlers[i] as any).originalListener === listener。这是为了处理 once 方法注册的监听器。once 方法会创建一个包装器函数,并将原始监听器作为其 originalListener 属性。当我们尝试使用原始监听器来 off 时,我们需要识别并移除这个包装器。

3.4 once(eventName, listener):只触发一次的事件监听器

once 方法注册一个只会被触发一次的事件监听器。一旦事件被触发,该监听器就会自动从事件列表中移除。

  • 创建一个包装器函数 wrapper
  • wrapper 函数在被调用时,首先执行原始的 listener,然后立即通过 off 方法将自身从事件列表中移除。
  • 为了让 off 方法能够识别并移除这个包装器,我们需要将原始 listener 存储在 wrapper 的一个属性上(例如 wrapper.originalListener)。
  • wrapper 函数注册到 eventName
// 假设 EventEmitter 类已经定义了前面的部分

class EventEmitter {
    // ... 构造函数及 on, addListener, setMaxListeners, getMaxListeners, emit, off, removeListener, removeAllListeners 方法

    /**
     * 注册一个只触发一次的事件监听器。
     * @param {string} eventName 事件名称。
     * @param {Function} listener 监听器函数。
     * @returns {EventEmitter} 返回 EventEmitter 实例,支持链式调用。
     */
    once(eventName: string, listener: Function): EventEmitter {
        if (typeof listener !== 'function') {
            throw new TypeError('The "listener" argument must be of type Function. Received type ' + typeof listener);
        }

        // 创建一个包装器函数
        // 这里的 'this' 上下文将是 EventEmitter 实例,因为 emit 会用 apply 调用
        const wrapper = (...args: any[]) => {
            this.off(eventName, wrapper); // 在调用原始监听器之前先移除自身
            listener.apply(this, args); // 调用原始监听器,保持 this 上下文
        };

        // 将原始监听器存储在包装器上,以便 off 方法可以识别和移除它
        (wrapper as any).originalListener = listener;

        // 将包装器函数注册为事件监听器
        this.on(eventName, wrapper);

        return this;
    }
}

wrapper.originalListener 的重要性:

这个属性是 onceoff 协同工作的关键。当用户调用 emitter.once('foo', myFunc),然后又想通过 emitter.off('foo', myFunc) 来取消订阅时,off 方法需要知道它应该移除的是那个包裹着 myFuncwrapper 函数。通过将 myFunc 存储在 wrapper.originalListener 上,off 方法就能够找到并移除正确的 wrapper

第四章:辅助方法与高级考量

为了使 EventEmitter 更加完善和易于使用,我们还需要添加一些辅助方法,并讨论一些高级考量。

4.1 辅助方法

4.1.1 prependListener(eventName, listener)

on 类似,但将监听器添加到监听器数组的开头,使其在其他监听器之前被调用。

class EventEmitter {
    // ... 其他方法

    /**
     * 将监听器添加到指定事件的监听器数组的开头。
     * @param {string} eventName 事件名称。
     * @param {Function} listener 监听器函数。
     * @returns {EventEmitter} 返回 EventEmitter 实例,支持链式调用。
     */
    prependListener(eventName: string, listener: Function): EventEmitter {
        if (typeof listener !== 'function') {
            throw new TypeError('The "listener" argument must be of type Function. Received type ' + typeof listener);
        }

        if (!this.listeners.has(eventName)) {
            this.listeners.set(eventName, []);
        }

        const handlers = this.listeners.get(eventName)!;
        handlers.unshift(listener); // 添加到数组开头

        if (handlers.length > this._maxListeners && this._maxListeners !== 0) {
            console.warn(
                `Possible EventEmitter memory leak detected. ${handlers.length} ${eventName} listeners added. ` +
                `Use emitter.setMaxListeners() to increase limit.`
            );
        }
        return this;
    }
}

4.1.2 prependOnceListener(eventName, listener)

once 类似,但将 once 包装器添加到监听器数组的开头。

class EventEmitter {
    // ... 其他方法

    /**
     * 将只触发一次的监听器添加到指定事件的监听器数组的开头。
     * @param {string} eventName 事件名称。
     * @param {Function} listener 监听器函数。
     * @returns {EventEmitter} 返回 EventEmitter 实例,支持链式调用。
     */
    prependOnceListener(eventName: string, listener: Function): EventEmitter {
        if (typeof listener !== 'function') {
            throw new TypeError('The "listener" argument must be of type Function. Received type ' + typeof listener);
        }

        const wrapper = (...args: any[]) => {
            this.off(eventName, wrapper);
            listener.apply(this, args);
        };

        (wrapper as any).originalListener = listener;

        if (!this.listeners.has(eventName)) {
            this.listeners.set(eventName, []);
        }

        const handlers = this.listeners.get(eventName)!;
        handlers.unshift(wrapper); // 添加到数组开头

        if (handlers.length > this._maxListeners && this._maxListeners !== 0) {
            console.warn(
                `Possible EventEmitter memory leak detected. ${handlers.length} ${eventName} listeners added. ` +
                `Use emitter.setMaxListeners() to increase limit.`
            );
        }
        return this;
    }
}

4.1.3 eventNames()

返回一个包含所有已注册事件名称的数组。

class EventEmitter {
    // ... 其他方法

    /**
     * 返回一个包含所有已注册事件名称的数组。
     * @returns {string[]} 事件名称数组。
     */
    eventNames(): string[] {
        return Array.from(this.listeners.keys());
    }
}

4.1.4 listenerCount(eventName)

返回指定 eventName 的监听器数量。

class EventEmitter {
    // ... 其他方法

    /**
     * 返回指定事件的监听器数量。
     * @param {string} eventName 事件名称。
     * @returns {number} 监听器数量。
     */
    listenerCount(eventName: string): number {
        const handlers = this.listeners.get(eventName);
        return handlers ? handlers.length : 0;
    }
}

4.1.5 listeners(eventName)

返回指定 eventName 的所有监听器函数数组。对于 once 监听器,返回的是原始监听器函数。

class EventEmitter {
    // ... 其他方法

    /**
     * 返回指定事件的所有监听器函数数组。
     * 对于 once 监听器,返回的是原始监听器函数。
     * @param {string} eventName 事件名称。
     * @returns {Function[]} 监听器函数数组。
     */
    listeners(eventName: string): Function[] {
        const handlers = this.listeners.get(eventName);
        if (!handlers) {
            return [];
        }
        // 对于 once 包装器,返回其 originalListener,否则返回本身
        return handlers.map(handler => (handler as any).originalListener || handler);
    }

    /**
     * 别名方法:rawListeners
     * 返回指定事件的所有原始监听器函数数组,包括 once 包装器本身。
     * @param {string} eventName 事件名称。
     * @returns {Function[]} 监听器函数数组。
     */
    rawListeners(eventName: string): Function[] {
        const handlers = this.listeners.get(eventName);
        return handlers ? handlers.slice() : []; // 返回副本
    }
}

4.2 完整代码示例

// EventEmitter.ts
class EventEmitter {
    private listeners: Map<string, Function[]>;
    private _maxListeners: number;
    static defaultMaxListeners: number = 10;

    constructor() {
        this.listeners = new Map<string, Function[]>();
        this._maxListeners = EventEmitter.defaultMaxListeners;
    }

    setMaxListeners(n: number): EventEmitter {
        if (typeof n !== 'number' || n < 0 || isNaN(n)) {
            throw new TypeError('The value of "n" must be a non-negative number. Received ' + n + '.');
        }
        this._maxListeners = n;
        return this;
    }

    getMaxListeners(): number {
        return this._maxListeners;
    }

    on(eventName: string, listener: Function): EventEmitter {
        if (typeof listener !== 'function') {
            throw new TypeError('The "listener" argument must be of type Function. Received type ' + typeof listener);
        }

        if (!this.listeners.has(eventName)) {
            this.listeners.set(eventName, []);
        }

        const handlers = this.listeners.get(eventName)!;
        handlers.push(listener);

        if (handlers.length > this._maxListeners && this._maxListeners !== 0) {
            console.warn(
                `Possible EventEmitter memory leak detected. ${handlers.length} ${eventName} listeners added. ` +
                `Use emitter.setMaxListeners() to increase limit.`
            );
        }
        return this;
    }

    addListener(eventName: string, listener: Function): EventEmitter {
        return this.on(eventName, listener);
    }

    prependListener(eventName: string, listener: Function): EventEmitter {
        if (typeof listener !== 'function') {
            throw new TypeError('The "listener" argument must be of type Function. Received type ' + typeof listener);
        }

        if (!this.listeners.has(eventName)) {
            this.listeners.set(eventName, []);
        }

        const handlers = this.listeners.get(eventName)!;
        handlers.unshift(listener);

        if (handlers.length > this._maxListeners && this._maxListeners !== 0) {
            console.warn(
                `Possible EventEmitter memory leak detected. ${handlers.length} ${eventName} listeners added. ` +
                `Use emitter.setMaxListeners() to increase limit.`
            );
        }
        return this;
    }

    once(eventName: string, listener: Function): EventEmitter {
        if (typeof listener !== 'function') {
            throw new TypeError('The "listener" argument must be of type Function. Received type ' + typeof listener);
        }

        const wrapper = (...args: any[]) => {
            this.off(eventName, wrapper);
            listener.apply(this, args);
        };

        (wrapper as any).originalListener = listener;
        this.on(eventName, wrapper);
        return this;
    }

    prependOnceListener(eventName: string, listener: Function): EventEmitter {
        if (typeof listener !== 'function') {
            throw new TypeError('The "listener" argument must be of type Function. Received type ' + typeof listener);
        }

        const wrapper = (...args: any[]) => {
            this.off(eventName, wrapper);
            listener.apply(this, args);
        };

        (wrapper as any).originalListener = listener;

        if (!this.listeners.has(eventName)) {
            this.listeners.set(eventName, []);
        }

        const handlers = this.listeners.get(eventName)!;
        handlers.unshift(wrapper);

        if (handlers.length > this._maxListeners && this._maxListeners !== 0) {
            console.warn(
                `Possible EventEmitter memory leak detected. ${handlers.length} ${eventName} listeners added. ` +
                `Use emitter.setMaxListeners() to increase limit.`
            );
        }
        return this;
    }

    off(eventName?: string, listener?: Function): EventEmitter {
        if (eventName === undefined) {
            this.listeners.clear();
            return this;
        }

        const handlers = this.listeners.get(eventName);
        if (!handlers || handlers.length === 0) {
            return this;
        }

        if (listener === undefined) {
            this.listeners.delete(eventName);
            return this;
        }

        if (typeof listener !== 'function') {
            throw new TypeError('The "listener" argument must be of type Function. Received type ' + typeof listener);
        }

        let index = -1;
        for (let i = 0; i < handlers.length; i++) {
            if (handlers[i] === listener || (handlers[i] as any).originalListener === listener) {
                index = i;
                break;
            }
        }

        if (index !== -1) {
            handlers.splice(index, 1);
            if (handlers.length === 0) {
                this.listeners.delete(eventName);
            }
        }
        return this;
    }

    removeListener(eventName: string, listener: Function): EventEmitter {
        return this.off(eventName, listener);
    }

    removeAllListeners(eventName?: string): EventEmitter {
        if (eventName === undefined) {
            this.listeners.clear();
        } else {
            this.listeners.delete(eventName);
        }
        return this;
    }

    emit(eventName: string, ...args: any[]): boolean {
        const handlers = this.listeners.get(eventName);

        if (!handlers || handlers.length === 0) {
            if (eventName === 'error') {
                const error = args[0] instanceof Error ? args[0] : new Error(`Unhandled 'error' event. (${args[0]})`);
                throw error;
            }
            return false;
        }

        const listenersToCall = handlers.slice(); // 创建副本防止在迭代中修改
        for (const listener of listenersToCall) {
            try {
                listener.apply(this, args);
            } catch (error) {
                if (this.listeners.has('error') && this.listeners.get('error')!.length > 0) {
                    this.emit('error', error);
                } else {
                    console.error(`Error in listener for event "${eventName}":`, error);
                    // throw error; // 也可以选择直接抛出
                }
            }
        }
        return true;
    }

    eventNames(): string[] {
        return Array.from(this.listeners.keys());
    }

    listenerCount(eventName: string): number {
        const handlers = this.listeners.get(eventName);
        return handlers ? handlers.length : 0;
    }

    listeners(eventName: string): Function[] {
        const handlers = this.listeners.get(eventName);
        if (!handlers) {
            return [];
        }
        return handlers.map(handler => (handler as any).originalListener || handler);
    }

    rawListeners(eventName: string): Function[] {
        const handlers = this.listeners.get(eventName);
        return handlers ? handlers.slice() : [];
    }
}

4.3 EventEmitter 方法速览

以下表格总结了我们实现的 EventEmitter 的主要方法及其功能:

方法名称 参数 返回值 描述
on / addListener eventName: string, listener: Function EventEmitter 注册一个事件监听器。
prependListener eventName: string, listener: Function EventEmitter 在监听器列表的开头注册一个事件监听器。
once eventName: string, listener: Function EventEmitter 注册一个只触发一次的事件监听器。
prependOnceListener eventName: string, listener: Function EventEmitter 在监听器列表的开头注册一个只触发一次的事件监听器。
off / removeListener eventName?: string, listener?: Function EventEmitter 移除指定事件的特定监听器、所有监听器,或移除所有事件的所有监听器。
removeAllListeners eventName?: string EventEmitter 移除指定事件的所有监听器,或移除所有事件的所有监听器。
emit eventName: string, ...args: any[] boolean 触发指定事件,并向所有监听器传递参数。如果事件有监听器被调用,返回 true
eventNames string[] 返回一个包含所有已注册事件名称的数组。
listenerCount eventName: string number 返回指定事件的监听器数量。
listeners eventName: string Function[] 返回指定事件的所有原始监听器函数数组。
rawListeners eventName: string Function[] 返回指定事件的所有原始监听器函数数组,包括 once 包装器本身。
setMaxListeners n: number EventEmitter 设置单个事件的最大监听器数量警告阈值。
getMaxListeners number 获取当前最大监听器数量警告阈值。

4.4 实际使用示例

让我们通过几个例子来看看我们实现的 EventEmitter 是如何工作的。

// 假设 EventEmitter 类已经如上定义并导入
// import { EventEmitter } from './EventEmitter'; // 如果是模块化环境

const myEmitter = new EventEmitter();

// 1. 基本的 on 和 emit
myEmitter.on('greet', (name: string) => {
    console.log(`Hello, ${name}!`);
});

myEmitter.emit('greet', 'Alice'); // 输出: Hello, Alice!

// 2. 多个监听器
myEmitter.on('data', (payload: any) => {
    console.log('Received data (listener 1):', payload);
});
myEmitter.on('data', (payload: any) => {
    console.log('Processing data (listener 2):', payload.id);
});

myEmitter.emit('data', { id: 101, value: 'test' });
// 输出:
// Received data (listener 1): { id: 101, value: 'test' }
// Processing data (listener 2): 101

// 3. once 监听器
myEmitter.once('setup', () => {
    console.log('One-time setup complete.');
});
myEmitter.emit('setup'); // 输出: One-time setup complete.
myEmitter.emit('setup'); // 无输出,因为 once 监听器已被移除

// 4. off 移除特定监听器
function myCallback(message: string) {
    console.log('My callback:', message);
}
myEmitter.on('status', myCallback);
myEmitter.on('status', (msg: string) => console.log('Another status:', msg));

myEmitter.emit('status', 'initializing'); // 输出: My callback: initializing, Another status: initializing

myEmitter.off('status', myCallback); // 移除 myCallback
myEmitter.emit('status', 'running'); // 输出: Another status: running (myCallback 不再被调用)

// 5. off 移除所有监听器 for an event
myEmitter.removeAllListeners('data');
console.log('Data listeners count after removeAllListeners:', myEmitter.listenerCount('data')); // 输出: 0

// 6. 错误事件处理
myEmitter.on('error', (err: Error) => {
    console.error('Caught an error:', err.message);
});

myEmitter.on('task', () => {
    throw new Error('Something went wrong in task!');
});

myEmitter.emit('task'); // 输出: Caught an error: Something went wrong in task!

// 7. prependListener
myEmitter.on('log', (msg: string) => console.log(`[Default] ${msg}`));
myEmitter.prependListener('log', (msg: string) => console.log(`[Urgent] ${msg}`));
myEmitter.emit('log', 'System alert!');
// 输出:
// [Urgent] System alert!
// [Default] System alert!

// 8. 链式调用
myEmitter
    .on('chain', () => console.log('Chain 1'))
    .on('chain', () => console.log('Chain 2'))
    .emit('chain');
// 输出:
// Chain 1
// Chain 2

// 9. listenerCount, eventNames
console.log('Active event names:', myEmitter.eventNames());
console.log('Status event listeners:', myEmitter.listenerCount('status'));
console.log('Error event raw listeners:', myEmitter.rawListeners('error'));
console.log('Error event listeners (unwrapped):', myEmitter.listeners('error'));

// 10. setMaxListeners
myEmitter.setMaxListeners(1); // 设置最大监听器为1
myEmitter.on('warn_event', () => console.log('Handler 1'));
myEmitter.on('warn_event', () => console.log('Handler 2'));
// 此时会输出警告信息: Possible EventEmitter memory leak detected. 2 warn_event listeners added. ...

4.5 与 Node.js EventEmitter 的对比

我们构建的 EventEmitter 借鉴了 Node.js 的设计哲学和 API 签名。以下是一些主要相似点和差异:

相似点:

  • 核心 APIon, off, emit, once (以及它们的别名 addListener, removeListener) 的功能和行为高度一致。
  • this 上下文:监听器中的 this 默认指向 EventEmitter 实例。
  • 错误事件处理:对 error 事件的特殊处理(如果没有监听器则抛出)。
  • setMaxListeners:用于内存泄漏警告的机制。
  • 辅助方法prependListener, prependOnceListener, eventNames, listenerCount, listeners, rawListeners 等也与 Node.js 提供的功能相符。

差异点(Node.js EventEmitter 更高级的特性):

  • 域 (Domains):在旧版 Node.js 中,EventEmitter 可以与 domain 模块集成,用于在异步操作中捕获错误。这在现代 Node.js 中已不推荐使用。
  • 性能优化:Node.js 的内部实现可能包含更底层的 C++ 绑定和更复杂的优化,尤其是在处理大量事件和监听器时。
  • Symbol 作为事件名称:Node.js 允许使用 Symbol 作为事件名称,这有助于避免命名冲突。我们的实现目前只支持字符串。
  • EventEmitter.listenerCount(emitter, eventName) 静态方法:Node.js 提供了静态的 listenerCount 方法,可以直接通过类调用。我们的 listenerCount 是实例方法。
  • new EventTarget(): 现代浏览器和 Node.js 也支持 EventTarget 接口,它提供了 addEventListener, removeEventListener, dispatchEvent,这与 EventEmitter 略有不同,但理念相似。EventEmitter 更侧重于事件名称的字符串标识,而 EventTarget 通常用于 DOM 事件或 Web API。

总的来说,我们实现的 EventEmitter 已经非常接近 Node.js EventEmitter 的核心功能,足以满足大多数自定义事件系统的需求。

第五章:深入思考与最佳实践

5.1 异步监听器与错误处理

我们的 emit 方法是同步的。这意味着当 emit 被调用时,所有监听器会立即按顺序执行。如果某个监听器执行的是异步操作(例如网络请求、定时器),emit 不会等待这些异步操作完成。

myEmitter.on('asyncEvent', async () => {
    console.log('Async listener started');
    await new Promise(resolve => setTimeout(resolve, 100));
    console.log('Async listener finished');
});

console.log('Before emit');
myEmitter.emit('asyncEvent');
console.log('After emit');

// 输出顺序可能是:
// Before emit
// Async listener started
// After emit
// Async listener finished

这种行为通常是期望的,因为 EventEmitter 的职责是通知,而不是协调异步流程。如果需要协调异步操作,可以考虑使用 Promise.all 包装监听器,或引入更高级的事件流库(如 RxJS)。

对于监听器内部的错误,我们已经实现了基本的 try...catch 机制,并将错误重新 emit'error' 事件。这是一个健壮的实践,因为它防止了一个监听器的错误影响到其他监听器的执行,并提供了一个集中的错误处理点。

5.2 内存泄漏的考量

EventEmitter 最大的潜在陷阱之一就是内存泄漏。当事件监听器被注册,但从未被移除时,即使监听器函数本身或其闭包中的变量不再需要,它们也可能一直被 EventEmitter 实例引用,导致无法被垃圾回收。

常见的内存泄漏场景:

  • 长期存活的对象监听短期存活的事件:例如,一个全局的 EventEmitter 监听了许多在组件生命周期结束后没有被移除的组件事件。
  • 循环引用:监听器函数内部引用了 EventEmitter 实例,而 EventEmitter 实例又引用了监听器。
  • once 的滥用或误用:虽然 once 会自动移除自身,但如果事件从未被触发,或者包装器因为某种原因没有被移除,仍然可能导致泄漏。

预防策略:

  1. 始终移除监听器:在组件销毁、任务完成或不再需要监听时,调用 offremoveListener
  2. 使用 once:对于只需要一次通知的场景,优先使用 once
  3. setMaxListeners:利用 setMaxListeners 提供的警告机制,帮助发现潜在问题。
  4. 审查代码:定期审查 ononce 的使用,确保有匹配的 off 调用或合理的生命周期管理。

5.3 事件命名约定

清晰一致的事件命名对于维护大型系统至关重要。建议遵循以下约定:

  • 语义化:事件名称应清晰地描述发生的事情,例如 userLoggedIn, orderProcessed, dataReceived
  • 过去式或完成式:通常事件表示已经发生的事情,使用过去式动词更合适。
  • 命名空间:如果事件可能来自不同模块或组件,可以使用点分隔符创建命名空间,例如 user.loggedIn, payment.failed
  • 避免过于通用:例如 changeupdate,它们可能不够具体,难以理解实际发生了什么。

5.4 何时使用 EventEmitter?何时不使用?

使用场景:

  • 组件通信:在没有直接父子关系的组件之间进行通信。
  • 状态变化通知:当某个对象的内部状态发生变化时,通知外部感兴趣的部分。
  • 自定义事件系统:构建特定于应用程序的事件总线或消息代理。
  • 解耦模块:使模块之间通过事件而非直接函数调用进行交互,降低耦合度。

不使用场景:

  • 简单的函数调用:如果可以直接调用函数,并且没有解耦的需求,不要为了使用 EventEmitter 而使用。
  • 复杂的数据流管理:对于需要严格数据流控制、副作用管理和可追溯性的场景,EventEmitter 可能不足以管理复杂性,此时更适合使用状态管理库(如 Redux, Vuex)或响应式编程库(如 RxJS)。
  • 全局滥用:过度使用全局 EventEmitter 会导致事件冒泡和难以追踪的副作用,使系统难以理解和调试。尽量将 EventEmitter 实例限制在合理的上下文范围。

第六章:思考、实践、提升

我们已经详细地实现了一个功能完备的 EventEmitter,并深入探讨了发布-订阅模式的原理、应用及最佳实践。这个过程不仅帮助我们理解了 on, off, emit, once 等核心方法的内部逻辑,更重要的是,它教会了我们如何从零开始构建一个健壮、可维护、可扩展的系统组件。

作为编程专家,我们不仅仅是API的使用者,更应是底层机制的洞察者和创造者。通过这次实践,我们强化了对事件驱动架构的理解,提升了在复杂系统中设计和实现解耦机制的能力。希望这次讲座能为大家带来启发,并在未来的开发工作中提供有益的指导。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注