手写实现一个支持 Promise 的 EventEmitter:异步事件发布与监听

讲座主题:手写实现一个支持 Promise 的 EventEmitter:异步事件发布与监听

各位技术同仁,大家好!

在现代软件架构中,事件驱动模式扮演着至关重要的角色。它提供了一种松耦合、高扩展性的通信机制,使得系统的不同组件可以独立地发布和订阅事件,从而实现模块间的解耦。在 Node.js 生态系统中,内置的 EventEmitter 是这一模式的基石,它简单、高效,被广泛应用于各种场景。然而,随着异步编程范式的普及,尤其是 Promise 和 async/await 的出现,传统的同步 EventEmitter 在处理耗时操作、I/O 密集型任务或网络请求时,逐渐暴露出其局限性。

今天,我们将深入探讨如何从零开始,手写实现一个支持 Promise 的 EventEmitter。这个增强版的事件系统不仅能够处理传统的同步事件,还能优雅地管理异步事件的发布与监听,让您的事件驱动架构能够无缝融入现代异步编程的洪流。

一、事件驱动架构与传统 EventEmitter 的局限

1.1 什么是事件驱动架构?

事件驱动架构(Event-Driven Architecture, EDA)是一种软件架构范式,其核心思想是系统中的各个组件通过发布(publish)和订阅(subscribe)事件来进行通信。当某个组件发生特定状态变化或完成某项任务时,它会发布一个事件。对这个事件感兴趣的其他组件则会订阅该事件,并在事件发生时执行相应的处理逻辑。

EDA 的主要优势包括:

  • 松耦合: 发布者和订阅者之间无需直接了解对方,只需知道事件的类型。
  • 高扩展性: 增加新的订阅者不会影响现有系统,只需注册新的事件处理器。
  • 响应性: 系统可以对外部事件快速作出反应。
  • 异步性: 事件处理通常可以是非阻塞的,提高系统吞吐量。

1.2 Node.js 中的 EventEmitter

在 Node.js 中,EventEmitter 是一个核心模块,它提供了一个简单的 API 来实现事件发布和订阅。许多内置模块(如 http.Serverfs.ReadStream 等)都继承自 EventEmitter

其基本用法如下:

const EventEmitter = require('events');

class MyEmitter extends EventEmitter {}

const myEmitter = new MyEmitter();
myEmitter.on('event', () => {
  console.log('An event occurred!');
});
myEmitter.emit('event'); // 输出: An event occurred!

1.3 传统 EventEmitter 的同步特性及局限

传统 EventEmitteremit 方法是同步执行所有注册的监听器的。这意味着当 emit 方法被调用时,它会立即遍历并执行所有与该事件名称关联的监听器函数,然后才将控制权返回给调用者。

优点:

  • 简单直接: 理解和使用都非常简单。
  • 性能高效: 同步执行避免了异步调度的开销,对于快速、非阻塞的任务非常高效。

局限性:
然而,在现代应用中,很多操作本质上是异步的,例如:

  • 数据库查询: myEmitter.on('userCreated', async (user) => { await saveUserToDB(user); });
  • 网络请求: myEmitter.on('dataReceived', async (data) => { await sendDataToService(data); });
  • 文件读写: myEmitter.on('fileUploaded', async (path) => { await processFile(path); });
  • 长时间计算: 耗时的 CPU 密集型任务。

如果我们将这些异步逻辑直接放在传统的同步监听器中,会遇到以下问题:

  1. 无法等待异步操作完成: emit 方法会立即返回,而不会等待监听器内部的 Promise 解决。调用者无法知道事件是否被“完全处理”。
  2. 错误处理困难: 异步监听器内部抛出的未捕获错误(Promise reject)可能不会被 emit 调用者捕获,导致难以调试和管理。
  3. 逻辑割裂: 调用 emit 的代码无法方便地链式处理事件处理完成后的逻辑,可能需要额外的回调或Promise。

这些局限性促使我们思考:如何才能构建一个能够理解并管理异步监听器,并提供统一异步处理接口的 EventEmitter?答案就在于利用现代 JavaScript 异步编程的基石——Promise 和 async/await

二、现代异步编程基石:Promise 与 async/await

在深入实现之前,我们快速回顾一下 Promise 和 async/await 的核心概念。

2.1 Promise 的基本概念

Promise 是 JavaScript 中处理异步操作的对象,它代表一个异步操作的最终完成(或失败)及其结果值。一个 Promise 对象有三种状态:

  • Pending (待定): 初始状态,既没有成功,也没有失败。
  • Fulfilled (已成功): 异步操作成功完成,并返回一个值。
  • Rejected (已失败): 异步操作失败,并返回一个错误。

Promise 的核心在于其链式调用能力,通过 .then().catch() 方法,我们可以对异步操作的结果进行处理,并避免回调地狱。

function fetchData(url) {
    return new Promise((resolve, reject) => {
        // 模拟异步网络请求
        setTimeout(() => {
            if (url === 'success') {
                resolve({ data: 'Hello World' });
            } else {
                reject(new Error('Failed to fetch data'));
            }
        }, 1000);
    });
}

fetchData('success')
    .then(result => console.log('Success:', result.data)) // Success: Hello World
    .catch(error => console.error('Error:', error.message));

fetchData('fail')
    .then(result => console.log('Success:', result.data))
    .catch(error => console.error('Error:', error.message)); // Error: Failed to fetch data

2.2 async/await:Promise 的语法糖

async/await 是 ES2017 引入的语法,它建立在 Promise 之上,旨在使异步代码看起来和行为更像同步代码,从而提高可读性和可维护性。

  • async 函数: 任何被 async 关键字修饰的函数都会返回一个 Promise。函数内部可以使用 await 关键字。
  • await 表达式: await 关键字只能在 async 函数内部使用,它会暂停 async 函数的执行,直到其后的 Promise 解决(fulfilled)或拒绝(rejected)。如果 Promise 解决,await 表达式会返回解决的值;如果 Promise 拒绝,await 会抛出拒绝的原因。
async function processData() {
    try {
        console.log('Fetching data...');
        const result = await fetchData('success'); // 暂停直到fetchData完成
        console.log('Processing:', result.data);

        // 尝试获取一个会失败的数据
        console.log('Fetching another data (will fail)...');
        await fetchData('fail'); // 这里会抛出错误
        console.log('This line will not be reached.');
    } catch (error) {
        console.error('An error occurred during processing:', error.message);
    } finally {
        console.log('Processing finished.');
    }
}

processData();
// 输出顺序大致为:
// Fetching data...
// Processing: Hello World
// Fetching another data (will fail)...
// An error occurred during processing: Failed to fetch data
// Processing finished.

2.3 为什么 Promise 是实现异步 EventEmitter 的关键

Promise 和 async/await 提供了我们所需的所有工具,来将异步操作的结果和错误以统一的方式进行封装和处理。

  • 统一的接口: 无论是同步函数返回一个值,还是异步函数返回一个 Promise,我们都可以将它们视为“可能需要等待”的实体。
  • 错误传播: Promise 的拒绝机制能够将异步操作中的错误有效地传播出去,方便 emit 调用者捕获和处理。
  • 并发控制: Promise.all()Promise.allSettled() 等方法允许我们并发地执行多个 Promise,并在它们全部完成或失败后进行统一处理,这正是异步 EventEmitter emit 方法所需要的。

三、设计一个支持 Promise 的 EventEmitter

现在,让我们开始设计我们的 PromiseEventEmitter

3.1 核心需求分析

为了实现一个健壮且支持 Promise 的 EventEmitter,我们需要满足以下核心需求:

  1. 兼容性: 能够同时注册和触发同步监听器和异步监听器(返回 Promise 的函数)。
  2. 异步 emit emit 方法本身应该返回一个 Promise,该 Promise 在所有相关事件监听器执行完毕(无论成功或失败)后解决。
  3. 结果聚合: emit 返回的 Promise 应该能够提供所有监听器的执行结果(或错误)。
  4. 错误处理: 异步监听器中的错误必须能够被 emit 方法捕获,并以可管理的方式报告给 emit 的调用者。
  5. API 相似性: 尽可能保持与 Node.js EventEmitter 相似的 API 接口 (on, off, once, emit 等),以便于迁移和理解。
  6. once 语义: once 注册的监听器在执行一次后应自动移除,无论其是同步还是异步。

3.2 内部数据结构

我们将使用 Map 来存储事件名称和其对应的监听器列表,这与 Node.js 的 EventEmitter 内部实现类似。然而,为了支持 once 语义和统一管理不同类型的监听器,我们需要一个稍微复杂的结构来包装每个监听器。

我们将定义以下类型:

  • ListenerFunction: 传统的同步监听器,返回任意值。
  • AsyncListenerFunction: 异步监听器,返回一个 Promise。
  • GeneralListener: 泛型监听器,可以是同步或异步。
  • ListenerWrapper: 这是一个内部结构,用于封装原始监听器,并携带额外的元数据(例如,是否是 once 监听器,以及在 once 情况下对原始监听器的引用)。
// 定义各种监听器类型
type ListenerFunction = (...args: any[]) => any;
type AsyncListenerFunction = (...args: any[]) => Promise<any>;
type GeneralListener = ListenerFunction | AsyncListenerFunction;

// 监听器包装器,用于内部管理
interface ListenerWrapper {
    listener: GeneralListener; // 实际执行的监听器函数 (对于once,这可能是一个包装函数)
    once: boolean;             // 标识是否为一次性监听器
    originalListener?: GeneralListener; // 对于once监听器,存储其原始函数,以便off能够匹配
}

我们的 PromiseEventEmitter 类将包含一个私有属性 _events

class PromiseEventEmitter {
    private _events: Map<string, ListenerWrapper[]>;

    constructor() {
        this._events = new Map();
    }

    // ... 其他方法
}

3.3 emit 方法的核心挑战:异步处理

emit 方法是整个设计的核心。它需要遍历指定事件的所有监听器,并执行它们。对于异步监听器,它必须等待 Promise 解决。为了聚合所有结果并处理潜在的错误,Promise.allSettled() 是一个理想的选择。

Promise.allSettled() 简介:

Promise.allSettled() 方法返回一个 Promise,该 Promise 在所有给定的 Promise 都已解决或拒绝后解决,并带有一个对象数组,每个对象都描述了对应的 Promise 的结果。

属性 描述
status "fulfilled""rejected" 表示 Promise 的最终状态
value Promise 解决时的值 (仅在 fulfilled)
reason Promise 拒绝时的错误 (仅在 rejected)

Promise.allSettled() 的优势在于,即使某些 Promise 拒绝,它也不会立即拒绝,而是会等待所有 Promise 完成,然后报告每个 Promise 的状态。这非常适合我们的 EventEmitter 场景,因为我们通常希望所有监听器都能有机会执行,即使其中一些失败了,我们也希望知道哪些失败了。

四、核心方法实现:逐步构建 PromiseEventEmitter

现在,我们来逐步实现 PromiseEventEmitter 的各个核心方法。

4.1 基础结构与辅助方法

首先,定义我们的类和一些辅助函数。

// 类型定义
type ListenerFunction = (...args: any[]) => any;
type AsyncListenerFunction = (...args: any[]) => Promise<any>;
type GeneralListener = ListenerFunction | AsyncListenerFunction;

// 监听器包装器接口
interface ListenerWrapper {
    listener: GeneralListener;
    once: boolean;
    originalListener?: GeneralListener; // 用于once监听器,以便off能识别原始监听器
}

/**
 * 一个支持 Promise 的 EventEmitter。
 * 能够注册和触发同步或异步的事件监听器。
 * emit 方法返回一个 Promise,该 Promise 在所有监听器执行完毕后解决。
 */
class PromiseEventEmitter {
    private _events: Map<string, ListenerWrapper[]>;

    constructor() {
        this._events = new Map();
    }

    // 辅助方法:检查一个值是否是 Promise
    private isPromise(obj: any): obj is Promise<any> {
        return obj && typeof obj.then === 'function' && typeof obj.catch === 'function';
    }

    // 辅助方法:确保事件名存在于map中
    private getListeners(eventName: string): ListenerWrapper[] {
        if (!this._events.has(eventName)) {
            this._events.set(eventName, []);
        }
        return this._events.get(eventName)!;
    }

    // 辅助方法:移除包装器
    private removeListenerInternal(eventName: string, targetListener: GeneralListener, onceOnly: boolean = false): this {
        const listeners = this._events.get(eventName);
        if (!listeners || listeners.length === 0) {
            return this;
        }

        const initialLength = listeners.length;
        this._events.set(eventName, listeners.filter(wrapper => {
            // 如果是onceOnly,只移除once的监听器
            if (onceOnly && !wrapper.once) {
                return true; // 保留非once的
            }

            // 匹配原始监听器或包装后的监听器
            return !(wrapper.listener === targetListener || wrapper.originalListener === targetListener);
        }));

        // 如果移除后监听器列表为空,则从Map中删除该事件名,节省内存
        if (this._events.get(eventName)?.length === 0) {
            this._events.delete(eventName);
        }

        return this;
    }
}

4.2 on(eventName, listener):注册事件监听器

on 方法负责将监听器添加到事件列表中。

    /**
     * 为指定的事件注册一个监听器。
     * @param eventName 事件名称。
     * @param listener 监听器函数,可以是同步或返回 Promise 的异步函数。
     * @returns PromiseEventEmitter 实例,用于链式调用。
     */
    public on(eventName: string, listener: GeneralListener): this {
        this.getListeners(eventName).push({ listener, once: false });
        return this;
    }

    /**
     * `on` 的别名。
     */
    public addListener(eventName: string, listener: GeneralListener): this {
        return this.on(eventName, listener);
    }

4.3 off(eventName, listener) / removeListener(eventName, listener):移除事件监听器

off 方法需要能够移除普通监听器,以及由 once 方法注册的包装过的监听器。这是通过在 ListenerWrapper 中存储 originalListener 来实现的。

    /**
     * 移除指定事件的指定监听器。
     * @param eventName 事件名称。
     * @param listener 要移除的监听器函数。
     * @returns PromiseEventEmitter 实例,用于链式调用。
     */
    public off(eventName: string, listener: GeneralListener): this {
        return this.removeListenerInternal(eventName, listener);
    }

    /**
     * `off` 的别名。
     */
    public removeListener(eventName: string, listener: GeneralListener): this {
        return this.off(eventName, listener);
    }

4.4 once(eventName, listener):注册一次性事件监听器

once 方法会创建一个包装函数。这个包装函数在第一次执行时,会先执行原始监听器,然后自动将自己从事件列表中移除。

    /**
     * 为指定的事件注册一个一次性监听器。该监听器在事件首次触发后立即移除。
     * @param eventName 事件名称。
     * @param listener 要注册的监听器函数。
     * @returns PromiseEventEmitter 实例,用于链式调用。
     */
    public once(eventName: string, listener: GeneralListener): this {
        const wrapper: ListenerWrapper = {
            listener: (...args: any[]) => {
                this.removeListenerInternal(eventName, wrapper.listener, true); // 移除包装器本身
                return listener(...args); // 执行原始监听器
            },
            once: true,
            originalListener: listener // 存储原始监听器,以便off能识别
        };
        this.getListeners(eventName).push(wrapper);
        return this;
    }

注意 once 方法中的 removeListenerInternal 调用。当 once 监听器被触发时,它会移除自己。removeListenerInternalonceOnly 参数确保在移除 wrapper.listener 时,只会移除 once 类型的监听器,防止误删用户手动注册的相同函数引用(尽管这种情况不常见)。更重要的是,originalListener 字段确保了当用户调用 off(eventName, originalListener) 时,也能正确移除 once 包装器。

4.5 emit(eventName, ...args):发布事件(核心实现)

这是整个类的核心,它需要:

  1. 获取所有监听器。
  2. 为每个监听器创建一个 Promise(即使是同步监听器,也包装成 Promise)。
  3. 使用 Promise.allSettled 等待所有这些 Promise 完成。
  4. 处理 once 监听器的自动移除。
  5. 返回一个 Promise,包含所有监听器的执行结果。
    /**
     * 以异步方式发布一个事件,并等待所有监听器执行完毕。
     * @param eventName 事件名称。
     * @param args 传递给监听器的参数。
     * @returns 一个 Promise,该 Promise 在所有监听器执行完毕后解决。
     *          解决值为 PromiseSettledResult 数组,包含每个监听器的状态和结果/错误。
     */
    public emit(eventName: string, ...args: any[]): Promise<PromiseSettledResult<any>[]> {
        const listeners = this._events.get(eventName);
        if (!listeners || listeners.length === 0) {
            return Promise.resolve([]); // 没有监听器,直接返回一个已解决的 Promise
        }

        // 复制一份监听器列表,以防止在遍历过程中被修改 (例如,once监听器移除自己)
        const listenersToExecute = [...listeners];
        const results: Promise<any>[] = [];

        for (const wrapper of listenersToExecute) {
            // 对于once监听器,在执行前检查是否已经触发过并移除。
            // 实际上,once内部的removeListenerInternal会在执行时移除自己,
            // 这里的复制和遍历确保了即使在emit过程中移除,也不会影响当前循环。
            // 但如果一个once监听器在Promise.allSettled开始前就被其他emit移除了,
            // 那么它就不会被包含在当前emit的results中。
            // 这里我们主要处理的是其自身执行后移除的逻辑。

            try {
                const listenerResult = wrapper.listener.apply(this, args); // 注意:这里使用apply来绑定this上下文

                if (this.isPromise(listenerResult)) {
                    results.push(listenerResult);
                } else {
                    // 同步函数的结果也包装成一个 Promise
                    results.push(Promise.resolve(listenerResult));
                }
            } catch (syncError) {
                // 捕获同步监听器中抛出的错误
                results.push(Promise.reject(syncError));
            }
        }

        // 使用 Promise.allSettled 等待所有监听器完成,无论成功或失败
        return Promise.allSettled(results);
    }

4.6 prependListener(eventName, listener)prependOnceListener(eventName, listener)

这些方法用于在监听器列表的开头添加监听器,这使得它们比普通监听器更早地被触发。

    /**
     * 在指定事件的监听器列表的开头添加一个监听器。
     * @param eventName 事件名称。
     * @param listener 监听器函数。
     * @returns PromiseEventEmitter 实例。
     */
    public prependListener(eventName: string, listener: GeneralListener): this {
        this.getListeners(eventName).unshift({ listener, once: false });
        return this;
    }

    /**
     * 在指定事件的监听器列表的开头添加一个一次性监听器。
     * @param eventName 事件名称。
     * @param listener 监听器函数。
     * @returns PromiseEventEmitter 实例。
     */
    public prependOnceListener(eventName: string, listener: GeneralListener): this {
        const wrapper: ListenerWrapper = {
            listener: (...args: any[]) => {
                this.removeListenerInternal(eventName, wrapper.listener, true);
                return listener(...args);
            },
            once: true,
            originalListener: listener
        };
        this.getListeners(eventName).unshift(wrapper);
        return this;
    }

4.7 其他辅助方法

为了保持与 Node.js EventEmitter 的 API 兼容性,我们还需要实现一些查询和批量移除方法。

    /**
     * 返回指定事件的监听器数量。
     * @param eventName 事件名称。
     * @returns 监听器数量。
     */
    public listenerCount(eventName: string): number {
        return this._events.get(eventName)?.length || 0;
    }

    /**
     * 返回指定事件的所有监听器函数数组。
     * 对于once监听器,返回其原始函数。
     * @param eventName 事件名称。
     * @returns 监听器函数数组。
     */
    public listeners(eventName: string): GeneralListener[] {
        const wrappers = this._events.get(eventName);
        if (!wrappers) {
            return [];
        }
        return wrappers.map(wrapper => wrapper.originalListener || wrapper.listener);
    }

    /**
     * 移除指定事件的所有监听器,或移除所有事件的所有监听器。
     * @param eventName 可选参数,如果指定,则只移除该事件的监听器。
     * @returns PromiseEventEmitter 实例。
     */
    public removeAllListeners(eventName?: string): this {
        if (eventName) {
            this._events.delete(eventName);
        } else {
            this._events = new Map(); // 清空所有事件
        }
        return this;
    }

五、代码实现:PromiseEventEmitter 完整示例

将以上所有部分组合起来,我们得到了一个完整的 PromiseEventEmitter 类。

// 类型定义
type ListenerFunction = (...args: any[]) => any;
type AsyncListenerFunction = (...args: any[]) => Promise<any>;
type GeneralListener = ListenerFunction | AsyncListenerFunction;

// 监听器包装器接口
interface ListenerWrapper {
    listener: GeneralListener;
    once: boolean;
    originalListener?: GeneralListener; // 对于once监听器,存储其原始函数,以便off能够匹配
}

/**
 * 一个支持 Promise 的 EventEmitter。
 * 能够注册和触发同步或异步的事件监听器。
 * emit 方法返回一个 Promise,该 Promise 在所有监听器执行完毕后解决。
 */
class PromiseEventEmitter {
    private _events: Map<string, ListenerWrapper[]>;

    constructor() {
        this._events = new Map();
    }

    // 辅助方法:检查一个值是否是 Promise
    private isPromise(obj: any): obj is Promise<any> {
        return obj && typeof obj.then === 'function' && typeof obj.catch === 'function';
    }

    // 辅助方法:确保事件名存在于map中
    private getListeners(eventName: string): ListenerWrapper[] {
        if (!this._events.has(eventName)) {
            this._events.set(eventName, []);
        }
        return this._events.get(eventName)!;
    }

    // 辅助方法:移除包装器
    private removeListenerInternal(eventName: string, targetListener: GeneralListener, onceOnly: boolean = false): this {
        const listeners = this._events.get(eventName);
        if (!listeners || listeners.length === 0) {
            return this;
        }

        this._events.set(eventName, listeners.filter(wrapper => {
            // 如果是onceOnly模式,只移除once的监听器
            if (onceOnly && !wrapper.once) {
                return true; // 保留非once的监听器
            }

            // 匹配包装后的监听器或原始监听器
            return !(wrapper.listener === targetListener || wrapper.originalListener === targetListener);
        }));

        // 如果移除后监听器列表为空,则从Map中删除该事件名,节省内存
        if (this._events.get(eventName)?.length === 0) {
            this._events.delete(eventName);
        }

        return this;
    }

    /**
     * 为指定的事件注册一个监听器。
     * @param eventName 事件名称。
     * @param listener 监听器函数,可以是同步或返回 Promise 的异步函数。
     * @returns PromiseEventEmitter 实例,用于链式调用。
     */
    public on(eventName: string, listener: GeneralListener): this {
        this.getListeners(eventName).push({ listener, once: false });
        return this;
    }

    /**
     * `on` 的别名。
     */
    public addListener(eventName: string, listener: GeneralListener): this {
        return this.on(eventName, listener);
    }

    /**
     * 移除指定事件的指定监听器。
     * @param eventName 事件名称。
     * @param listener 要移除的监听器函数。
     * @returns PromiseEventEmitter 实例,用于链式调用。
     */
    public off(eventName: string, listener: GeneralListener): this {
        return this.removeListenerInternal(eventName, listener);
    }

    /**
     * `off` 的别名。
     */
    public removeListener(eventName: string, listener: GeneralListener): this {
        return this.off(eventName, listener);
    }

    /**
     * 为指定的事件注册一个一次性监听器。该监听器在事件首次触发后立即移除。
     * @param eventName 事件名称。
     * @param listener 要注册的监听器函数。
     * @returns PromiseEventEmitter 实例,用于链式调用。
     */
    public once(eventName: string, listener: GeneralListener): this {
        const wrapper: ListenerWrapper = {
            listener: (...args: any[]) => {
                this.removeListenerInternal(eventName, wrapper.listener, true); // 移除包装器本身
                return listener.apply(this, args); // 执行原始监听器,并绑定this上下文
            },
            once: true,
            originalListener: listener // 存储原始监听器,以便off能识别
        };
        this.getListeners(eventName).push(wrapper);
        return this;
    }

    /**
     * 在指定事件的监听器列表的开头添加一个监听器。
     * @param eventName 事件名称。
     * @param listener 监听器函数。
     * @returns PromiseEventEmitter 实例。
     */
    public prependListener(eventName: string, listener: GeneralListener): this {
        this.getListeners(eventName).unshift({ listener, once: false });
        return this;
    }

    /**
     * 在指定事件的监听器列表的开头添加一个一次性监听器。
     * @param eventName 事件名称。
     * @param listener 监听器函数。
     * @returns PromiseEventEmitter 实例。
     */
    public prependOnceListener(eventName: string, listener: GeneralListener): this {
        const wrapper: ListenerWrapper = {
            listener: (...args: any[]) => {
                this.removeListenerInternal(eventName, wrapper.listener, true);
                return listener.apply(this, args); // 执行原始监听器,并绑定this上下文
            },
            once: true,
            originalListener: listener
        };
        this.getListeners(eventName).unshift(wrapper);
        return this;
    }

    /**
     * 以异步方式发布一个事件,并等待所有监听器执行完毕。
     * @param eventName 事件名称。
     * @param args 传递给监听器的参数。
     * @returns 一个 Promise,该 Promise 在所有监听器执行完毕后解决。
     *          解决值为 PromiseSettledResult 数组,包含每个监听器的状态和结果/错误。
     */
    public emit(eventName: string, ...args: any[]): Promise<PromiseSettledResult<any>[]> {
        const listeners = this._events.get(eventName);
        if (!listeners || listeners.length === 0) {
            return Promise.resolve([]); // 没有监听器,直接返回一个已解决的 Promise
        }

        // 复制一份监听器列表,以防止在遍历过程中被修改 (例如,once监听器移除自己)
        const listenersToExecute = [...listeners];
        const results: Promise<any>[] = [];

        for (const wrapper of listenersToExecute) {
            try {
                // 使用 apply 方法确保监听器内部的 this 指向 PromiseEventEmitter 实例
                const listenerResult = wrapper.listener.apply(this, args);

                if (this.isPromise(listenerResult)) {
                    results.push(listenerResult);
                } else {
                    // 同步函数的结果也包装成一个 Promise
                    results.push(Promise.resolve(listenerResult));
                }
            } catch (syncError) {
                // 捕获同步监听器中抛出的错误,并将其包装成一个 Promise.reject
                results.push(Promise.reject(syncError));
            }
        }

        // 使用 Promise.allSettled 等待所有监听器完成,无论成功或失败
        return Promise.allSettled(results);
    }

    /**
     * 返回指定事件的监听器数量。
     * @param eventName 事件名称。
     * @returns 监听器数量。
     */
    public listenerCount(eventName: string): number {
        return this._events.get(eventName)?.length || 0;
    }

    /**
     * 返回指定事件的所有监听器函数数组。
     * 对于once监听器,返回其原始函数。
     * @param eventName 事件名称。
     * @returns 监听器函数数组。
     */
    public listeners(eventName: string): GeneralListener[] {
        const wrappers = this._events.get(eventName);
        if (!wrappers) {
            return [];
        }
        return wrappers.map(wrapper => wrapper.originalListener || wrapper.listener);
    }

    /**
     * 移除指定事件的所有监听器,或移除所有事件的所有监听器。
     * @param eventName 可选参数,如果指定,则只移除该事件的监听器。
     * @returns PromiseEventEmitter 实例。
     */
    public removeAllListeners(eventName?: string): this {
        if (eventName) {
            this._events.delete(eventName);
        } else {
            this._events = new Map(); // 清空所有事件
        }
        return this;
    }
}

六、高级主题与最佳实践

6.1 错误处理策略的细化

emit 方法中,我们使用了 Promise.allSettled 来收集所有监听器的结果,包括成功和失败。这是一种“收集所有错误”的策略,它允许 emit 的调用者看到所有监听器的状态。

表:Promise.all vs Promise.allSettledemit 中的选择

特性 Promise.all Promise.allSettled
行为 任何一个 Promise 拒绝,则整体 Promise 立即拒绝。 等待所有 Promise 完成,无论成功或失败。
返回类型 Promise<T[]> Promise<PromiseSettledResult[]>
错误报告 只报告第一个拒绝的错误。 报告所有 Promise 的状态(fulfilled/rejected)及值/原因。
适用场景 “所有都成功才算成功”,适合需要快速失败的场景。 “知道所有结果”,即使有失败也想继续执行并收集信息。
EventEmitter 实现 failFastEmit,第一个监听器失败就中断。 实现 emit,收集所有监听器结果,默认策略。

默认的 emit 实现使用了 Promise.allSettled,这提供了一个全面的错误报告。如果需要“快速失败”的行为,可以实现一个 failFastEmit 方法:

    /**
     * 以快速失败模式发布事件。任何一个监听器拒绝,整个 Promise 立即拒绝。
     * @param eventName 事件名称。
     * @param args 传递给监听器的参数。
     * @returns 一个 Promise,该 Promise 在所有监听器成功后解决,或在第一个监听器拒绝时拒绝。
     *          解决值为所有监听器返回值的数组。
     */
    public async failFastEmit(eventName: string, ...args: any[]): Promise<any[]> {
        const listeners = this._events.get(eventName);
        if (!listeners || listeners.length === 0) {
            return Promise.resolve([]);
        }

        const promises: Promise<any>[] = [];
        for (const wrapper of listeners) {
            try {
                const listenerResult = wrapper.listener.apply(this, args);
                promises.push(this.isPromise(listenerResult) ? listenerResult : Promise.resolve(listenerResult));
            } catch (syncError) {
                // 同步错误直接导致 Promise.all 拒绝
                promises.push(Promise.reject(syncError));
            }
        }
        return Promise.all(promises); // 使用 Promise.all
    }

事件内部错误处理:
监听器内部应该尽可能地自行处理错误。例如:

myEmitter.on('data', async (data) => {
    try {
        await processData(data);
        console.log('Data processed successfully.');
    } catch (error) {
        console.error('Error processing data:', error.message);
        // 可以选择重新抛出错误,或者吞掉它
        // throw error; // 如果想让emit的Promise拒绝,则重新抛出
    }
});

6.2 并发与顺序执行

我们当前的 emit 实现中,所有异步监听器都是并发执行的,因为 Promise.allSettled 会同时等待所有 Promise。这通常是理想的,因为它最大化了效率。

然而,在某些场景下,你可能需要监听器按照注册的顺序顺序执行。这可以通过 Array.prototype.reduce 链式调用 Promise 来实现。

    /**
     * 以顺序方式发布一个事件,等待每个监听器完成后再执行下一个。
     * @param eventName 事件名称。
     * @param args 传递给监听器的参数。
     * @returns 一个 Promise,该 Promise 在所有监听器顺序执行完毕后解决。
     *          解决值为 PromiseSettledResult 数组。
     */
    public async emitSequentially(eventName: string, ...args: any[]): Promise<PromiseSettledResult<any>[]> {
        const listeners = this._events.get(eventName);
        if (!listeners || listeners.length === 0) {
            return Promise.resolve([]);
        }

        const results: PromiseSettledResult<any>[] = [];
        let currentPromiseChain = Promise.resolve(); // 初始链

        for (const wrapper of listeners) {
            currentPromiseChain = currentPromiseChain.then(async () => {
                let listenerResultPromise: Promise<any>;
                try {
                    const res = wrapper.listener.apply(this, args);
                    listenerResultPromise = this.isPromise(res) ? res : Promise.resolve(res);
                } catch (syncError) {
                    listenerResultPromise = Promise.reject(syncError);
                }

                // 等待当前监听器完成,并收集其结果
                try {
                    const value = await listenerResultPromise;
                    results.push({ status: 'fulfilled', value });
                } catch (reason) {
                    results.push({ status: 'rejected', reason });
                }
            });
        }
        await currentPromiseChain; // 等待整个链完成
        return results;
    }

何时选择并发,何时选择顺序?

  • 并发 (默认 emit):
    • 优点: 效率最高,总执行时间取决于最慢的监听器。
    • 缺点: 监听器之间不能有顺序依赖。
    • 适用场景: 大多数情况,当监听器是独立的,互不影响时。
  • 顺序 (emitSequentially):
    • 优点: 确保监听器按照注册顺序执行,前一个完成才能触发下一个。
    • 缺点: 效率较低,总执行时间是所有监听器执行时间之和。
    • 适用场景: 当监听器之间存在严格的顺序依赖关系时(例如,先验证数据,再保存数据,再发送通知)。

6.3 上下文绑定

emitonce 的包装函数中,我们使用了 listener.apply(this, args)。这里的 this 指向 PromiseEventEmitter 的实例。这遵循了 Node.js EventEmitter 的行为,即监听器内部的 this 默认指向 EventEmitter 实例本身。如果监听器是箭头函数,this 会词法绑定到定义它的作用域。如果需要自定义 this,监听器可以自行使用 bind

const myObject = {
    name: 'My Context',
    handler: async function(data: string) {
        console.log(`Context: ${this.name}, Received: ${data}`);
        await new Promise(res => setTimeout(res, 100));
        return `Processed by ${this.name}`;
    }
};

emitter.on('data', myObject.handler.bind(myObject)); // 显式绑定上下文

6.4 性能考量

  • Promise 开销: 每次 emit 调用都会创建新的 Promise,并使用 Promise.allSettled。对于极高频率的事件和大量监听器,这会引入一定的开销。在大多数业务场景下,这种开销是可接受的,因为 Promise 已经高度优化。
  • 监听器数量: 监听器数量越多,emit 遍历和等待的 Promise 就越多,自然会增加处理时间。
  • 避免过度使用: 尽管 PromiseEventEmitter 功能强大,但并非所有异步通信都适合事件驱动。简单的 Promise 链或直接函数调用可能更高效。事件驱动更适用于解耦和一对多通信。

6.5 命名约定

我们遵循了 Node.js EventEmitter 的命名约定:

  • onaddListener 是等价的。
  • offremoveListener 是等价的。
  • emit 是事件发布方法。

七、测试与示例

现在,让我们通过一些测试用例来验证 PromiseEventEmitter 的功能。

const emitter = new PromiseEventEmitter();

// --- 1. 同步监听器测试 ---
console.log('--- 同步监听器测试 ---');
emitter.on('syncEvent', (data: string) => {
    console.log(`Sync Listener 1: ${data}`);
    return 'Sync1 Result';
});

emitter.on('syncEvent', (data: string) => {
    console.log(`Sync Listener 2: ${data}`);
    if (data === 'error') {
        throw new Error('Sync Listener 2 failed!');
    }
    return 'Sync2 Result';
});

(async () => {
    const syncResults = await emitter.emit('syncEvent', 'Hello Sync!');
    console.log('Sync emit results:', syncResults);
    // 预期输出:
    // Sync Listener 1: Hello Sync!
    // Sync Listener 2: Hello Sync!
    // Sync emit results: [ { status: 'fulfilled', value: 'Sync1 Result' }, { status: 'fulfilled', value: 'Sync2 Result' } ]

    const syncErrorResults = await emitter.emit('syncEvent', 'error');
    console.log('Sync emit with error results:', syncErrorResults);
    // 预期输出:
    // Sync Listener 1: error
    // Sync Listener 2: error
    // Sync emit with error results: [ { status: 'fulfilled', value: 'Sync1 Result' }, { status: 'rejected', reason: [Error: Sync Listener 2 failed!] } ]
})();

// --- 2. 异步监听器测试 ---
console.log('n--- 异步监听器测试 ---');
emitter.on('asyncEvent', async (data: string) => {
    await new Promise(res => setTimeout(res, 100));
    console.log(`Async Listener 1: ${data}`);
    return `Async1 Processed: ${data}`;
});

emitter.on('asyncEvent', async (data: string) => {
    await new Promise(res => setTimeout(res, 50));
    if (data === 'fail') {
        throw new Error('Async Listener 2 failed!');
    }
    console.log(`Async Listener 2: ${data}`);
    return `Async2 Processed: ${data}`;
});

(async () => {
    const asyncResults = await emitter.emit('asyncEvent', 'Hello Async!');
    console.log('Async emit results:', asyncResults);
    // 预期输出 (顺序可能因setTimeout而异,但最终结果会收集):
    // Async Listener 2: Hello Async!
    // Async Listener 1: Hello Async!
    // Async emit results: [ { status: 'fulfilled', value: 'Async1 Processed: Hello Async!' }, { status: 'fulfilled', value: 'Async2 Processed: Hello Async!' } ]

    const asyncErrorResults = await emitter.emit('asyncEvent', 'fail');
    console.log('Async emit with error results:', asyncErrorResults);
    // 预期输出:
    // Async Listener 2: fail
    // Async Listener 1: fail
    // Async emit with error results: [ { status: 'fulfilled', value: 'Async1 Processed: fail' }, { status: 'rejected', reason: [Error: Async Listener 2 failed!] } ]
})();

// --- 3. once 行为测试 ---
console.log('n--- once 行为测试 ---');
let onceCounter = 0;
const onceListener = async (data: string) => {
    await new Promise(res => setTimeout(res, 10));
    onceCounter++;
    console.log(`Once Listener executed. Count: ${onceCounter}, Data: ${data}`);
    return `Once Result ${onceCounter}`;
};
emitter.once('onceEvent', onceListener);
emitter.on('onceEvent', (data: string) => {
    console.log(`Normal Listener for onceEvent: ${data}`);
    return 'Normal for Once';
});

(async () => {
    console.log('emit onceEvent (1st time)');
    await emitter.emit('onceEvent', 'First Call');
    console.log('Listeners for onceEvent after 1st emit:', emitter.listenerCount('onceEvent')); // 预期 1 (只剩普通监听器)

    console.log('emit onceEvent (2nd time)');
    await emitter.emit('onceEvent', 'Second Call');
    console.log('Listeners for onceEvent after 2nd emit:', emitter.listenerCount('onceEvent')); // 预期 1

    console.log('Once listener counter:', onceCounter); // 预期 1
})();

// --- 4. removeListener/off 测试 ---
console.log('n--- removeListener/off 测试 ---');
const handler1 = () => console.log('Handler 1');
const handler2 = () => console.log('Handler 2');
emitter.on('removeTest', handler1);
emitter.on('removeTest', handler2);
console.log('Before remove:', emitter.listenerCount('removeTest')); // 预期 2
emitter.off('removeTest', handler1);
console.log('After removing handler1:', emitter.listenerCount('removeTest')); // 预期 1

// 移除once监听器测试
const onceRemovable = () => console.log('This once should be removable');
emitter.once('removableOnce', onceRemovable);
console.log('Before removing once:', emitter.listenerCount('removableOnce')); // 预期 1
emitter.off('removableOnce', onceRemovable); // 应该能通过 originalListener 移除
console.log('After removing once:', emitter.listenerCount('removableOnce')); // 预期 0

// --- 5. prependListener 测试 ---
console.log('n--- prependListener 测试 ---');
emitter.on('prependTest', () => console.log('Last Listener'));
emitter.prependListener('prependTest', () => console.log('First Listener'));
emitter.emit('prependTest');
// 预期输出:
// First Listener
// Last Listener

// --- 6. failFastEmit 测试 ---
console.log('n--- failFastEmit 测试 ---');
const failFastEmitter = new PromiseEventEmitter();
failFastEmitter.on('failFast', async () => { await new Promise(res => setTimeout(res, 50)); console.log('FailFast Listener 1'); return 'L1'; });
failFastEmitter.on('failFast', async () => { await new Promise((_, rej) => setTimeout(() => rej(new Error('FailFast Error!')), 10)); console.log('FailFast Listener 2'); });
failFastEmitter.on('failFast', async () => { await new Promise(res => setTimeout(res, 100)); console.log('FailFast Listener 3'); return 'L3'; });

(async () => {
    try {
        console.log('Attempting failFastEmit...');
        const results = await failFastEmitter.failFastEmit('failFast');
        console.log('FailFast results:', results); // 这行不应该执行
    } catch (error: any) {
        console.error('FailFast caught error:', error.message);
    }
    // 预期输出:
    // Attempting failFastEmit...
    // FailFast caught error: FailFast Error!
    // (Listener 1和3可能不会被完全打印,因为Promise.all会提前拒绝)
})();

// --- 7. emitSequentially 测试 ---
console.log('n--- emitSequentially 测试 ---');
const sequentialEmitter = new PromiseEventEmitter();
sequentialEmitter.on('seqEvent', async () => { await new Promise(res => setTimeout(res, 50)); console.log('Seq Listener 1'); return 'S1'; });
sequentialEmitter.on('seqEvent', async () => { await new Promise(res => setTimeout(res, 20)); console.log('Seq Listener 2'); return 'S2'; });
sequentialEmitter.on('seqEvent', async () => { await new Promise(res => setTimeout(res, 80)); console.log('Seq Listener 3'); return 'S3'; });

(async () => {
    console.log('Attempting emitSequentially...');
    const results = await sequentialEmitter.emitSequentially('seqEvent');
    console.log('Sequential emit results:', results);
    // 预期输出 (严格按照顺序):
    // Attempting emitSequentially...
    // Seq Listener 1
    // Seq Listener 2
    // Seq Listener 3
    // Sequential emit results: [ { status: 'fulfilled', value: 'S1' }, { status: 'fulfilled', value: 'S2' }, { status: 'fulfilled', value: 'S3' } ]
})();

八、实际应用场景与部署考量

支持 Promise 的 EventEmitter 极大地扩展了事件驱动模式的应用范围,使其在现代异步系统中更具实用性。

  • 微服务架构中的事件总线: 在服务内部,可以用它来协调多个业务逻辑模块。例如,当一个用户注册事件发生时,可以触发多个异步任务,如发送欢迎邮件、初始化用户数据、更新统计信息等。emit 方法返回的 Promise 可以让调用服务知道所有这些任务是否都已启动或完成。
  • 复杂前端应用的状态管理: 在 React、Vue 等框架中,可以使用 PromiseEventEmitter 作为组件间通信或状态更新的机制。例如,用户执行某个操作(如点击保存),emit 一个 saveData 事件,多个组件可以异步响应(如更新 UI、发送网络请求、本地缓存等)。
  • 插件系统与扩展机制: 允许第三方插件通过注册异步监听器来扩展核心应用的功能。
  • 服务器端长时间运行任务的协调: 当一个请求触发了多个需要长时间执行的后台任务时,PromiseEventEmitter 可以协调这些任务,并最终向客户端返回所有任务的状态。
  • 避免滥用: 尽管功能强大,但并非所有场景都适合事件驱动。
    • 何时使用 PromiseEventEmitter 当你需要一对多的通信模式,并且事件的发布者不关心(或不应该关心)具体有多少个订阅者,以及订阅者的具体实现细节时。特别适用于需要解耦、扩展性和异步协调的场景。
    • 何时直接使用 Promise 或回调: 当通信是一对一的,或者调用者需要对被调用者的行为有强烈的控制和直接的返回结果时,直接使用 Promise 链、async/await 或传统回调可能更简单直观。

九、本篇讲座的要点概括

我们深入探讨了如何从零开始构建一个功能强大、支持 Promise 的 EventEmitter。通过引入异步能力,我们极大地扩展了事件驱动架构的应用范围,使其能无缝集成到现代异步编程范式中。理解其内部机制、权衡各种设计选择,将有助于您在复杂系统中更有效地利用事件驱动模式。掌握这个模式,您将能够构建出更加健壮、灵活且易于维护的异步应用。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注