Swoole/Hyperf中的自定义事件分发器:在协程中同步或异步触发事件

Swoole/Hyperf 自定义事件分发器:协程中的同步与异步触发

各位同学,大家好!今天我们来深入探讨一下 Swoole/Hyperf 框架中自定义事件分发器的实现,重点关注在协程环境下如何进行同步和异步事件触发。事件驱动架构是构建可扩展、解耦系统的利器,而 Swoole/Hyperf 提供的协程机制则为事件处理带来了更高的并发性能。

1. 事件驱动架构与 Swoole/Hyperf 的契合

事件驱动架构(EDA)是一种软件架构模式,它通过事件的产生、检测、处理和响应来解耦系统的各个组件。当一个事件发生时,生产者(Producer)发布该事件,而一个或多个消费者(Consumer)订阅该事件并执行相应的处理逻辑。

Swoole/Hyperf 框架基于 Swoole 扩展构建,天然支持协程。协程是一种轻量级的线程,可以在用户态进行切换,避免了线程切换的开销。结合 EDA,我们可以在协程中异步地处理事件,从而提高系统的并发能力。

2. 实现一个简单的事件分发器

首先,我们来实现一个简单的事件分发器,它包含事件的注册、触发和监听功能。

<?php

namespace AppEvent;

use PsrEventDispatcherEventDispatcherInterface;
use PsrEventDispatcherListenerProviderInterface;
use PsrEventDispatcherStoppableEventInterface;

class EventDispatcher implements EventDispatcherInterface
{
    /**
     * @var ListenerProviderInterface
     */
    private ListenerProviderInterface $listenerProvider;

    public function __construct(ListenerProviderInterface $listenerProvider)
    {
        $this->listenerProvider = $listenerProvider;
    }

    /**
     * Dispatches an event to all registered listeners.
     *
     * @param object $event The event to pass to the listeners.
     *
     * @return object The event that was passed, now modified by listeners.
     */
    public function dispatch(object $event): object
    {
        foreach ($this->listenerProvider->getListenersForEvent($event) as $listener) {
            if ($event instanceof StoppableEventInterface && $event->isPropagationStopped()) {
                break;
            }

            $listener($event);
        }

        return $event;
    }
}

这个 EventDispatcher 类实现了 PsrEventDispatcherEventDispatcherInterface 接口,该接口定义了 dispatch 方法,用于将事件分发给所有注册的监听器。它依赖于 ListenerProviderInterface 接口,用于获取事件对应的监听器列表。

接下来,我们实现一个简单的 ListenerProvider

<?php

namespace AppEvent;

use PsrEventDispatcherListenerProviderInterface;

class ListenerProvider implements ListenerProviderInterface
{
    /**
     * @var array<string, array<callable>>
     */
    private array $listeners = [];

    /**
     * @param string $eventClassName
     * @param callable $listener
     */
    public function addListener(string $eventClassName, callable $listener): void
    {
        if (!isset($this->listeners[$eventClassName])) {
            $this->listeners[$eventClassName] = [];
        }

        $this->listeners[$eventClassName][] = $listener;
    }

    /**
     * @param object $event An event for which to provide listeners.
     *
     * @return iterable<callable>
     *   An iterable (array, iterator, etc.) of callables (closures,
     *   invokable objects, etc.) that are interested in the passed
     *   $event.
     */
    public function getListenersForEvent(object $event): iterable
    {
        $eventClassName = get_class($event);

        return $this->listeners[$eventClassName] ?? [];
    }
}

ListenerProvider 类实现了 PsrEventDispatcherListenerProviderInterface 接口,它维护了一个事件类名与监听器列表的映射关系。addListener 方法用于注册监听器,getListenersForEvent 方法用于获取指定事件的监听器列表。

3. 定义事件和监听器

现在,我们来定义一个事件和一个监听器。

<?php

namespace AppEvent;

class UserRegistered
{
    public int $userId;

    public function __construct(int $userId)
    {
        $this->userId = $userId;
    }
}

UserRegistered 事件表示用户注册成功,它包含一个 userId 属性。

<?php

namespace AppListener;

use AppEventUserRegistered;
use PsrLogLoggerInterface;

class SendWelcomeEmail
{
    private LoggerInterface $logger;

    public function __construct(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    public function __invoke(UserRegistered $event): void
    {
        // 模拟发送欢迎邮件
        $this->logger->info("Sending welcome email to user: {$event->userId}");
        sleep(1); // 模拟耗时操作
        $this->logger->info("Welcome email sent to user: {$event->userId}");
    }
}

SendWelcomeEmail 监听器用于处理 UserRegistered 事件,它会发送一封欢迎邮件。这里我们使用 PsrLogLoggerInterface 记录日志,并使用 sleep(1) 模拟耗时操作,以便演示异步事件处理的效果。

4. 同步触发事件

现在,我们来演示如何同步触发事件。

<?php

use AppEventEventDispatcher;
use AppEventListenerProvider;
use AppEventUserRegistered;
use AppListenerSendWelcomeEmail;
use PsrLogLoggerInterface;

require_once 'vendor/autoload.php';

// 模拟Logger
class Logger implements LoggerInterface
{
    public function emergency(string|Stringable $message, array $context = []): void {}
    public function alert(string|Stringable $message, array $context = []): void {}
    public function critical(string|Stringable $message, array $context = []): void {}
    public function error(string|Stringable $message, array $context = []): void {}
    public function warning(string|Stringable $message, array $context = []): void {}
    public function notice(string|Stringable $message, array $context = []): void {}
    public function info(string|Stringable $message, array $context = []): void {echo $message . PHP_EOL;}
    public function debug(string|Stringable $message, array $context = []): void {}
    public function log($level, string|Stringable $message, array $context = []): void {}
}

$logger = new Logger();

$listenerProvider = new ListenerProvider();
$listenerProvider->addListener(UserRegistered::class, new SendWelcomeEmail($logger));

$eventDispatcher = new EventDispatcher($listenerProvider);

$userRegisteredEvent = new UserRegistered(123);

echo "Dispatching UserRegistered event synchronously..." . PHP_EOL;
$eventDispatcher->dispatch($userRegisteredEvent);
echo "UserRegistered event dispatched synchronously." . PHP_EOL;

这段代码首先创建了一个 ListenerProvider 实例,并将 SendWelcomeEmail 监听器注册到 UserRegistered 事件上。然后,创建了一个 EventDispatcher 实例,并将 ListenerProvider 注入到其中。最后,创建了一个 UserRegistered 事件,并使用 EventDispatcher::dispatch 方法同步触发该事件。

同步触发事件的特点是,dispatch 方法会阻塞当前协程,直到所有的监听器都执行完毕。这意味着,如果某个监听器执行时间较长,会影响整个系统的响应速度。

5. 异步触发事件

为了解决同步触发事件带来的性能问题,我们可以使用协程异步地触发事件。

<?php

use AppEventEventDispatcher;
use AppEventListenerProvider;
use AppEventUserRegistered;
use AppListenerSendWelcomeEmail;
use PsrLogLoggerInterface;
use SwooleCoroutine;

require_once 'vendor/autoload.php';

// 模拟Logger
class Logger implements LoggerInterface
{
    public function emergency(string|Stringable $message, array $context = []): void {}
    public function alert(string|Stringable $message, array $context = []): void {}
    public function critical(string|Stringable $message, array $context = []): void {}
    public function error(string|Stringable $message, array $context = []): void {}
    public function warning(string|Stringable $message, array $context = []): void {}
    public function notice(string|Stringable $message, array $context = []): void {}
    public function info(string|Stringable $message, array $context = []): void {echo $message . PHP_EOL;}
    public function debug(string|Stringable $message, array $context = []): void {}
    public function log($level, string|Stringable $message, array $context = []): void {}
}

$logger = new Logger();

$listenerProvider = new ListenerProvider();
$listenerProvider->addListener(UserRegistered::class, new SendWelcomeEmail($logger));

$eventDispatcher = new EventDispatcher($listenerProvider);

$userRegisteredEvent = new UserRegistered(123);

echo "Dispatching UserRegistered event asynchronously..." . PHP_EOL;
Coroutine::create(function () use ($eventDispatcher, $userRegisteredEvent) {
    $eventDispatcher->dispatch($userRegisteredEvent);
});
echo "UserRegistered event dispatched asynchronously." . PHP_EOL;

这段代码与同步触发事件的代码基本相同,唯一的区别是,我们使用 SwooleCoroutine::create 方法创建了一个新的协程,并在该协程中调用 EventDispatcher::dispatch 方法。

异步触发事件的特点是,dispatch 方法不会阻塞当前协程,而是立即返回。监听器会在新的协程中执行,不会影响主协程的运行。这意味着,即使某个监听器执行时间较长,也不会影响整个系统的响应速度。

6. 自定义事件分发器中的协程处理

在实际项目中,我们可能需要更灵活地控制事件的处理方式。例如,我们可能希望某些事件同步处理,而另一些事件异步处理。为此,我们可以对事件分发器进行定制。

<?php

namespace AppEvent;

use PsrEventDispatcherEventDispatcherInterface;
use PsrEventDispatcherListenerProviderInterface;
use PsrEventDispatcherStoppableEventInterface;
use SwooleCoroutine;

class CustomEventDispatcher implements EventDispatcherInterface
{
    /**
     * @var ListenerProviderInterface
     */
    private ListenerProviderInterface $listenerProvider;

    /**
     * @var array<string, bool>  // event_class => async flag
     */
    private array $asyncEvents = [];

    public function __construct(ListenerProviderInterface $listenerProvider, array $asyncEvents = [])
    {
        $this->listenerProvider = $listenerProvider;
        $this->asyncEvents = $asyncEvents;
    }

    public function setAsync(string $eventClassName, bool $async = true): void
    {
        $this->asyncEvents[$eventClassName] = $async;
    }

    /**
     * Dispatches an event to all registered listeners.
     *
     * @param object $event The event to pass to the listeners.
     *
     * @return object The event that was passed, now modified by listeners.
     */
    public function dispatch(object $event): object
    {
        $eventClassName = get_class($event);
        $async = $this->asyncEvents[$eventClassName] ?? false;

        foreach ($this->listenerProvider->getListenersForEvent($event) as $listener) {
            if ($event instanceof StoppableEventInterface && $event->isPropagationStopped()) {
                break;
            }

            if ($async) {
                Coroutine::create(function () use ($listener, $event) {
                    $listener($event);
                });
            } else {
                $listener($event);
            }
        }

        return $event;
    }
}

在这个 CustomEventDispatcher 类中,我们添加了一个 $asyncEvents 属性,用于指定哪些事件需要异步处理。dispatch 方法会根据 $asyncEvents 属性的值,决定是否在协程中执行监听器。

使用示例:

<?php

use AppEventCustomEventDispatcher;
use AppEventListenerProvider;
use AppEventUserRegistered;
use AppEventOrderCreated;
use AppListenerSendWelcomeEmail;
use AppListenerSendOrderNotification;
use PsrLogLoggerInterface;

require_once 'vendor/autoload.php';

// 模拟Logger
class Logger implements LoggerInterface
{
    public function emergency(string|Stringable $message, array $context = []): void {}
    public function alert(string|Stringable $message, array $context = []): void {}
    public function critical(string|Stringable $message, array $context = []): void {}
    public function error(string|Stringable $message, array $context = []): void {}
    public function warning(string|Stringable $message, array $context = []): void {}
    public function notice(string|Stringable $message, array $context = []): void {}
    public function info(string|Stringable $message, array $context = []): void {echo $message . PHP_EOL;}
    public function debug(string|Stringable $message, array $context = []): void {}
    public function log($level, string|Stringable $message, array $context = []): void {}
}

$logger = new Logger();

$listenerProvider = new ListenerProvider();
$listenerProvider->addListener(UserRegistered::class, new SendWelcomeEmail($logger));
$listenerProvider->addListener(OrderCreated::class, new SendOrderNotification($logger));

$eventDispatcher = new CustomEventDispatcher($listenerProvider, [
    UserRegistered::class => true, // UserRegistered event will be dispatched asynchronously
]);

$userRegisteredEvent = new UserRegistered(123);
$orderCreatedEvent = new OrderCreated(456);

echo "Dispatching UserRegistered event..." . PHP_EOL;
$eventDispatcher->dispatch($userRegisteredEvent);
echo "Dispatching OrderCreated event..." . PHP_EOL;
$eventDispatcher->dispatch($orderCreatedEvent);

echo "Events dispatched." . PHP_EOL;

class OrderCreated {
    public int $orderId;

    public function __construct(int $orderId)
    {
        $this->orderId = $orderId;
    }
}

class SendOrderNotification {
    private LoggerInterface $logger;

    public function __construct(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    public function __invoke(OrderCreated $event): void
    {
        // 模拟发送订单通知
        $this->logger->info("Sending order notification for order: {$event->orderId}");
        sleep(1); // 模拟耗时操作
        $this->logger->info("Order notification sent for order: {$event->orderId}");
    }
}

在这个示例中,我们将 UserRegistered 事件设置为异步处理,而 OrderCreated 事件使用同步处理。

7. 使用 Hyperf 的事件系统

Hyperf 框架内置了事件系统,它基于 symfony/event-dispatcher 组件,并提供了更方便的配置和使用方式。

首先,我们需要在 config/autoload/listeners.php 文件中配置监听器。

<?php

declare(strict_types=1);

return [
    AppListenerSendWelcomeEmail::class,
    AppListenerSendOrderNotification::class,
];

然后,在 config/autoload/events.php 文件中配置事件与监听器的映射关系。

<?php

declare(strict_types=1);

return [
    'listeners' => [
        AppEventUserRegistered::class => [
            AppListenerSendWelcomeEmail::class,
        ],
        AppEventOrderCreated::class => [
            AppListenerSendOrderNotification::class,
        ],
    ],
];

最后,我们可以使用 HyperfContractEventDispatcherInterface 接口来触发事件。

<?php

namespace AppService;

use AppEventUserRegistered;
use AppEventOrderCreated;
use HyperfContractEventDispatcherInterface;

class UserService
{
    private EventDispatcherInterface $eventDispatcher;

    public function __construct(EventDispatcherInterface $eventDispatcher)
    {
        $this->eventDispatcher = $eventDispatcher;
    }

    public function register(int $userId): void
    {
        // 用户注册逻辑
        // ...

        $this->eventDispatcher->dispatch(new UserRegistered($userId));
    }

    public function createOrder(int $orderId): void
    {
        // 创建订单逻辑
        // ...

        $this->eventDispatcher->dispatch(new OrderCreated($orderId));
    }
}

Hyperf 的事件系统默认使用协程异步处理事件。如果需要同步处理事件,可以在监听器中实现 HyperfEventContractListenerInterface 接口,并设置 $preserve = true

<?php

namespace AppListener;

use AppEventUserRegistered;
use HyperfEventContractListenerInterface;
use PsrLogLoggerInterface;

class SendWelcomeEmail implements ListenerInterface
{
    private LoggerInterface $logger;

    public function __construct(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    public function process(object $event): void
    {
        if ($event instanceof UserRegistered) {
            // 模拟发送欢迎邮件
            $this->logger->info("Sending welcome email to user: {$event->userId}");
            sleep(1); // 模拟耗时操作
            $this->logger->info("Welcome email sent to user: {$event->userId}");
        }
    }

    public function isEnable(): bool
    {
        return true;
    }

    public function isAsync(): bool {
        return true; // Set to false for synchronous processing
    }
}

表格总结同步与异步事件处理的差异

特性 同步事件处理 异步事件处理
执行方式 阻塞当前协程,顺序执行 不阻塞当前协程,并发执行
性能 较低,耗时监听器会影响系统响应速度 较高,耗时监听器不会影响系统响应速度
可靠性 较高,如果监听器执行失败,会抛出异常 较低,如果监听器执行失败,可能无法捕获异常
适用场景 对实时性要求较高,且监听器执行时间较短的事件 对实时性要求不高,或监听器执行时间较长的事件

8. 错误处理与重试机制

在异步事件处理中,错误处理是一个重要的环节。由于事件是在新的协程中执行的,主协程无法直接捕获监听器抛出的异常。因此,我们需要在监听器中进行错误处理,并采取适当的措施,例如记录日志、发送警报、重试等。

可以使用 try-catch 块捕获监听器中的异常,并使用重试机制来处理临时性的错误。

<?php

namespace AppListener;

use AppEventUserRegistered;
use PsrLogLoggerInterface;
use SwooleCoroutine;

class SendWelcomeEmail
{
    private LoggerInterface $logger;

    public function __construct(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    public function __invoke(UserRegistered $event): void
    {
        $retryCount = 3;
        for ($i = 0; $i < $retryCount; $i++) {
            try {
                // 模拟发送欢迎邮件
                $this->logger->info("Sending welcome email to user: {$event->userId}, attempt: {$i + 1}");
                // 模拟网络错误或服务器错误
                if (rand(0, 1) == 0) {
                    throw new Exception("Failed to send welcome email");
                }
                sleep(1); // 模拟耗时操作
                $this->logger->info("Welcome email sent to user: {$event->userId}");
                return; // 成功发送,退出循环
            } catch (Exception $e) {
                $this->logger->error("Failed to send welcome email: {$e->getMessage()}");
                Coroutine::sleep(1); // 等待一段时间后重试
            }
        }

        $this->logger->error("Failed to send welcome email after {$retryCount} attempts for user: {$event->userId}");
        // 可以将事件放入死信队列,稍后手动处理
    }
}

9. 事件溯源与持久化

在一些场景中,我们需要对事件进行溯源,即记录事件的发生过程和状态变化。这可以通过将事件持久化到数据库或消息队列来实现。

事件溯源可以帮助我们分析系统的行为,排查问题,以及进行数据恢复。

10. 事件的停止传播

有时候,我们可能希望在某个监听器处理完事件后,停止事件的传播,不再执行后续的监听器。可以通过实现 PsrEventDispatcherStoppableEventInterface 接口来实现。

<?php

namespace AppEvent;

use PsrEventDispatcherStoppableEventInterface;

class UserRegistered implements StoppableEventInterface
{
    public int $userId;

    private bool $propagationStopped = false;

    public function __construct(int $userId)
    {
        $this->userId = $userId;
    }

    public function stopPropagation(): void
    {
        $this->propagationStopped = true;
    }

    public function isPropagationStopped(): bool
    {
        return $this->propagationStopped;
    }
}

在监听器中,调用 $event->stopPropagation() 方法可以停止事件的传播。

关键点回顾:

  • 事件驱动架构解耦系统组件,提升可扩展性。
  • Swoole/Hyperf 的协程机制提升并发性能,适合异步事件处理。
  • 自定义事件分发器可以灵活控制同步和异步事件触发。
  • Hyperf 内置事件系统提供更便捷的配置和使用方式。
  • 错误处理、重试机制、事件溯源和停止传播是事件处理中的重要考虑因素。

希望今天的讲解对大家有所帮助!谢谢!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注