Swoole/Hyperf 自定义事件分发器:协程中的同步与异步触发
各位同学,大家好!今天我们来深入探讨一下 Swoole/Hyperf 框架中自定义事件分发器的实现,重点关注在协程环境下如何进行同步和异步事件触发。事件驱动架构是构建可扩展、解耦系统的利器,而 Swoole/Hyperf 提供的协程机制则为事件处理带来了更高的并发性能。
1. 事件驱动架构与 Swoole/Hyperf 的契合
事件驱动架构(EDA)是一种软件架构模式,它通过事件的产生、检测、处理和响应来解耦系统的各个组件。当一个事件发生时,生产者(Producer)发布该事件,而一个或多个消费者(Consumer)订阅该事件并执行相应的处理逻辑。
Swoole/Hyperf 框架基于 Swoole 扩展构建,天然支持协程。协程是一种轻量级的线程,可以在用户态进行切换,避免了线程切换的开销。结合 EDA,我们可以在协程中异步地处理事件,从而提高系统的并发能力。
2. 实现一个简单的事件分发器
首先,我们来实现一个简单的事件分发器,它包含事件的注册、触发和监听功能。
<?php
namespace AppEvent;
use PsrEventDispatcherEventDispatcherInterface;
use PsrEventDispatcherListenerProviderInterface;
use PsrEventDispatcherStoppableEventInterface;
class EventDispatcher implements EventDispatcherInterface
{
/**
* @var ListenerProviderInterface
*/
private ListenerProviderInterface $listenerProvider;
public function __construct(ListenerProviderInterface $listenerProvider)
{
$this->listenerProvider = $listenerProvider;
}
/**
* Dispatches an event to all registered listeners.
*
* @param object $event The event to pass to the listeners.
*
* @return object The event that was passed, now modified by listeners.
*/
public function dispatch(object $event): object
{
foreach ($this->listenerProvider->getListenersForEvent($event) as $listener) {
if ($event instanceof StoppableEventInterface && $event->isPropagationStopped()) {
break;
}
$listener($event);
}
return $event;
}
}
这个 EventDispatcher 类实现了 PsrEventDispatcherEventDispatcherInterface 接口,该接口定义了 dispatch 方法,用于将事件分发给所有注册的监听器。它依赖于 ListenerProviderInterface 接口,用于获取事件对应的监听器列表。
接下来,我们实现一个简单的 ListenerProvider。
<?php
namespace AppEvent;
use PsrEventDispatcherListenerProviderInterface;
class ListenerProvider implements ListenerProviderInterface
{
/**
* @var array<string, array<callable>>
*/
private array $listeners = [];
/**
* @param string $eventClassName
* @param callable $listener
*/
public function addListener(string $eventClassName, callable $listener): void
{
if (!isset($this->listeners[$eventClassName])) {
$this->listeners[$eventClassName] = [];
}
$this->listeners[$eventClassName][] = $listener;
}
/**
* @param object $event An event for which to provide listeners.
*
* @return iterable<callable>
* An iterable (array, iterator, etc.) of callables (closures,
* invokable objects, etc.) that are interested in the passed
* $event.
*/
public function getListenersForEvent(object $event): iterable
{
$eventClassName = get_class($event);
return $this->listeners[$eventClassName] ?? [];
}
}
ListenerProvider 类实现了 PsrEventDispatcherListenerProviderInterface 接口,它维护了一个事件类名与监听器列表的映射关系。addListener 方法用于注册监听器,getListenersForEvent 方法用于获取指定事件的监听器列表。
3. 定义事件和监听器
现在,我们来定义一个事件和一个监听器。
<?php
namespace AppEvent;
class UserRegistered
{
public int $userId;
public function __construct(int $userId)
{
$this->userId = $userId;
}
}
UserRegistered 事件表示用户注册成功,它包含一个 userId 属性。
<?php
namespace AppListener;
use AppEventUserRegistered;
use PsrLogLoggerInterface;
class SendWelcomeEmail
{
private LoggerInterface $logger;
public function __construct(LoggerInterface $logger)
{
$this->logger = $logger;
}
public function __invoke(UserRegistered $event): void
{
// 模拟发送欢迎邮件
$this->logger->info("Sending welcome email to user: {$event->userId}");
sleep(1); // 模拟耗时操作
$this->logger->info("Welcome email sent to user: {$event->userId}");
}
}
SendWelcomeEmail 监听器用于处理 UserRegistered 事件,它会发送一封欢迎邮件。这里我们使用 PsrLogLoggerInterface 记录日志,并使用 sleep(1) 模拟耗时操作,以便演示异步事件处理的效果。
4. 同步触发事件
现在,我们来演示如何同步触发事件。
<?php
use AppEventEventDispatcher;
use AppEventListenerProvider;
use AppEventUserRegistered;
use AppListenerSendWelcomeEmail;
use PsrLogLoggerInterface;
require_once 'vendor/autoload.php';
// 模拟Logger
class Logger implements LoggerInterface
{
public function emergency(string|Stringable $message, array $context = []): void {}
public function alert(string|Stringable $message, array $context = []): void {}
public function critical(string|Stringable $message, array $context = []): void {}
public function error(string|Stringable $message, array $context = []): void {}
public function warning(string|Stringable $message, array $context = []): void {}
public function notice(string|Stringable $message, array $context = []): void {}
public function info(string|Stringable $message, array $context = []): void {echo $message . PHP_EOL;}
public function debug(string|Stringable $message, array $context = []): void {}
public function log($level, string|Stringable $message, array $context = []): void {}
}
$logger = new Logger();
$listenerProvider = new ListenerProvider();
$listenerProvider->addListener(UserRegistered::class, new SendWelcomeEmail($logger));
$eventDispatcher = new EventDispatcher($listenerProvider);
$userRegisteredEvent = new UserRegistered(123);
echo "Dispatching UserRegistered event synchronously..." . PHP_EOL;
$eventDispatcher->dispatch($userRegisteredEvent);
echo "UserRegistered event dispatched synchronously." . PHP_EOL;
这段代码首先创建了一个 ListenerProvider 实例,并将 SendWelcomeEmail 监听器注册到 UserRegistered 事件上。然后,创建了一个 EventDispatcher 实例,并将 ListenerProvider 注入到其中。最后,创建了一个 UserRegistered 事件,并使用 EventDispatcher::dispatch 方法同步触发该事件。
同步触发事件的特点是,dispatch 方法会阻塞当前协程,直到所有的监听器都执行完毕。这意味着,如果某个监听器执行时间较长,会影响整个系统的响应速度。
5. 异步触发事件
为了解决同步触发事件带来的性能问题,我们可以使用协程异步地触发事件。
<?php
use AppEventEventDispatcher;
use AppEventListenerProvider;
use AppEventUserRegistered;
use AppListenerSendWelcomeEmail;
use PsrLogLoggerInterface;
use SwooleCoroutine;
require_once 'vendor/autoload.php';
// 模拟Logger
class Logger implements LoggerInterface
{
public function emergency(string|Stringable $message, array $context = []): void {}
public function alert(string|Stringable $message, array $context = []): void {}
public function critical(string|Stringable $message, array $context = []): void {}
public function error(string|Stringable $message, array $context = []): void {}
public function warning(string|Stringable $message, array $context = []): void {}
public function notice(string|Stringable $message, array $context = []): void {}
public function info(string|Stringable $message, array $context = []): void {echo $message . PHP_EOL;}
public function debug(string|Stringable $message, array $context = []): void {}
public function log($level, string|Stringable $message, array $context = []): void {}
}
$logger = new Logger();
$listenerProvider = new ListenerProvider();
$listenerProvider->addListener(UserRegistered::class, new SendWelcomeEmail($logger));
$eventDispatcher = new EventDispatcher($listenerProvider);
$userRegisteredEvent = new UserRegistered(123);
echo "Dispatching UserRegistered event asynchronously..." . PHP_EOL;
Coroutine::create(function () use ($eventDispatcher, $userRegisteredEvent) {
$eventDispatcher->dispatch($userRegisteredEvent);
});
echo "UserRegistered event dispatched asynchronously." . PHP_EOL;
这段代码与同步触发事件的代码基本相同,唯一的区别是,我们使用 SwooleCoroutine::create 方法创建了一个新的协程,并在该协程中调用 EventDispatcher::dispatch 方法。
异步触发事件的特点是,dispatch 方法不会阻塞当前协程,而是立即返回。监听器会在新的协程中执行,不会影响主协程的运行。这意味着,即使某个监听器执行时间较长,也不会影响整个系统的响应速度。
6. 自定义事件分发器中的协程处理
在实际项目中,我们可能需要更灵活地控制事件的处理方式。例如,我们可能希望某些事件同步处理,而另一些事件异步处理。为此,我们可以对事件分发器进行定制。
<?php
namespace AppEvent;
use PsrEventDispatcherEventDispatcherInterface;
use PsrEventDispatcherListenerProviderInterface;
use PsrEventDispatcherStoppableEventInterface;
use SwooleCoroutine;
class CustomEventDispatcher implements EventDispatcherInterface
{
/**
* @var ListenerProviderInterface
*/
private ListenerProviderInterface $listenerProvider;
/**
* @var array<string, bool> // event_class => async flag
*/
private array $asyncEvents = [];
public function __construct(ListenerProviderInterface $listenerProvider, array $asyncEvents = [])
{
$this->listenerProvider = $listenerProvider;
$this->asyncEvents = $asyncEvents;
}
public function setAsync(string $eventClassName, bool $async = true): void
{
$this->asyncEvents[$eventClassName] = $async;
}
/**
* Dispatches an event to all registered listeners.
*
* @param object $event The event to pass to the listeners.
*
* @return object The event that was passed, now modified by listeners.
*/
public function dispatch(object $event): object
{
$eventClassName = get_class($event);
$async = $this->asyncEvents[$eventClassName] ?? false;
foreach ($this->listenerProvider->getListenersForEvent($event) as $listener) {
if ($event instanceof StoppableEventInterface && $event->isPropagationStopped()) {
break;
}
if ($async) {
Coroutine::create(function () use ($listener, $event) {
$listener($event);
});
} else {
$listener($event);
}
}
return $event;
}
}
在这个 CustomEventDispatcher 类中,我们添加了一个 $asyncEvents 属性,用于指定哪些事件需要异步处理。dispatch 方法会根据 $asyncEvents 属性的值,决定是否在协程中执行监听器。
使用示例:
<?php
use AppEventCustomEventDispatcher;
use AppEventListenerProvider;
use AppEventUserRegistered;
use AppEventOrderCreated;
use AppListenerSendWelcomeEmail;
use AppListenerSendOrderNotification;
use PsrLogLoggerInterface;
require_once 'vendor/autoload.php';
// 模拟Logger
class Logger implements LoggerInterface
{
public function emergency(string|Stringable $message, array $context = []): void {}
public function alert(string|Stringable $message, array $context = []): void {}
public function critical(string|Stringable $message, array $context = []): void {}
public function error(string|Stringable $message, array $context = []): void {}
public function warning(string|Stringable $message, array $context = []): void {}
public function notice(string|Stringable $message, array $context = []): void {}
public function info(string|Stringable $message, array $context = []): void {echo $message . PHP_EOL;}
public function debug(string|Stringable $message, array $context = []): void {}
public function log($level, string|Stringable $message, array $context = []): void {}
}
$logger = new Logger();
$listenerProvider = new ListenerProvider();
$listenerProvider->addListener(UserRegistered::class, new SendWelcomeEmail($logger));
$listenerProvider->addListener(OrderCreated::class, new SendOrderNotification($logger));
$eventDispatcher = new CustomEventDispatcher($listenerProvider, [
UserRegistered::class => true, // UserRegistered event will be dispatched asynchronously
]);
$userRegisteredEvent = new UserRegistered(123);
$orderCreatedEvent = new OrderCreated(456);
echo "Dispatching UserRegistered event..." . PHP_EOL;
$eventDispatcher->dispatch($userRegisteredEvent);
echo "Dispatching OrderCreated event..." . PHP_EOL;
$eventDispatcher->dispatch($orderCreatedEvent);
echo "Events dispatched." . PHP_EOL;
class OrderCreated {
public int $orderId;
public function __construct(int $orderId)
{
$this->orderId = $orderId;
}
}
class SendOrderNotification {
private LoggerInterface $logger;
public function __construct(LoggerInterface $logger)
{
$this->logger = $logger;
}
public function __invoke(OrderCreated $event): void
{
// 模拟发送订单通知
$this->logger->info("Sending order notification for order: {$event->orderId}");
sleep(1); // 模拟耗时操作
$this->logger->info("Order notification sent for order: {$event->orderId}");
}
}
在这个示例中,我们将 UserRegistered 事件设置为异步处理,而 OrderCreated 事件使用同步处理。
7. 使用 Hyperf 的事件系统
Hyperf 框架内置了事件系统,它基于 symfony/event-dispatcher 组件,并提供了更方便的配置和使用方式。
首先,我们需要在 config/autoload/listeners.php 文件中配置监听器。
<?php
declare(strict_types=1);
return [
AppListenerSendWelcomeEmail::class,
AppListenerSendOrderNotification::class,
];
然后,在 config/autoload/events.php 文件中配置事件与监听器的映射关系。
<?php
declare(strict_types=1);
return [
'listeners' => [
AppEventUserRegistered::class => [
AppListenerSendWelcomeEmail::class,
],
AppEventOrderCreated::class => [
AppListenerSendOrderNotification::class,
],
],
];
最后,我们可以使用 HyperfContractEventDispatcherInterface 接口来触发事件。
<?php
namespace AppService;
use AppEventUserRegistered;
use AppEventOrderCreated;
use HyperfContractEventDispatcherInterface;
class UserService
{
private EventDispatcherInterface $eventDispatcher;
public function __construct(EventDispatcherInterface $eventDispatcher)
{
$this->eventDispatcher = $eventDispatcher;
}
public function register(int $userId): void
{
// 用户注册逻辑
// ...
$this->eventDispatcher->dispatch(new UserRegistered($userId));
}
public function createOrder(int $orderId): void
{
// 创建订单逻辑
// ...
$this->eventDispatcher->dispatch(new OrderCreated($orderId));
}
}
Hyperf 的事件系统默认使用协程异步处理事件。如果需要同步处理事件,可以在监听器中实现 HyperfEventContractListenerInterface 接口,并设置 $preserve = true。
<?php
namespace AppListener;
use AppEventUserRegistered;
use HyperfEventContractListenerInterface;
use PsrLogLoggerInterface;
class SendWelcomeEmail implements ListenerInterface
{
private LoggerInterface $logger;
public function __construct(LoggerInterface $logger)
{
$this->logger = $logger;
}
public function process(object $event): void
{
if ($event instanceof UserRegistered) {
// 模拟发送欢迎邮件
$this->logger->info("Sending welcome email to user: {$event->userId}");
sleep(1); // 模拟耗时操作
$this->logger->info("Welcome email sent to user: {$event->userId}");
}
}
public function isEnable(): bool
{
return true;
}
public function isAsync(): bool {
return true; // Set to false for synchronous processing
}
}
表格总结同步与异步事件处理的差异
| 特性 | 同步事件处理 | 异步事件处理 |
|---|---|---|
| 执行方式 | 阻塞当前协程,顺序执行 | 不阻塞当前协程,并发执行 |
| 性能 | 较低,耗时监听器会影响系统响应速度 | 较高,耗时监听器不会影响系统响应速度 |
| 可靠性 | 较高,如果监听器执行失败,会抛出异常 | 较低,如果监听器执行失败,可能无法捕获异常 |
| 适用场景 | 对实时性要求较高,且监听器执行时间较短的事件 | 对实时性要求不高,或监听器执行时间较长的事件 |
8. 错误处理与重试机制
在异步事件处理中,错误处理是一个重要的环节。由于事件是在新的协程中执行的,主协程无法直接捕获监听器抛出的异常。因此,我们需要在监听器中进行错误处理,并采取适当的措施,例如记录日志、发送警报、重试等。
可以使用 try-catch 块捕获监听器中的异常,并使用重试机制来处理临时性的错误。
<?php
namespace AppListener;
use AppEventUserRegistered;
use PsrLogLoggerInterface;
use SwooleCoroutine;
class SendWelcomeEmail
{
private LoggerInterface $logger;
public function __construct(LoggerInterface $logger)
{
$this->logger = $logger;
}
public function __invoke(UserRegistered $event): void
{
$retryCount = 3;
for ($i = 0; $i < $retryCount; $i++) {
try {
// 模拟发送欢迎邮件
$this->logger->info("Sending welcome email to user: {$event->userId}, attempt: {$i + 1}");
// 模拟网络错误或服务器错误
if (rand(0, 1) == 0) {
throw new Exception("Failed to send welcome email");
}
sleep(1); // 模拟耗时操作
$this->logger->info("Welcome email sent to user: {$event->userId}");
return; // 成功发送,退出循环
} catch (Exception $e) {
$this->logger->error("Failed to send welcome email: {$e->getMessage()}");
Coroutine::sleep(1); // 等待一段时间后重试
}
}
$this->logger->error("Failed to send welcome email after {$retryCount} attempts for user: {$event->userId}");
// 可以将事件放入死信队列,稍后手动处理
}
}
9. 事件溯源与持久化
在一些场景中,我们需要对事件进行溯源,即记录事件的发生过程和状态变化。这可以通过将事件持久化到数据库或消息队列来实现。
事件溯源可以帮助我们分析系统的行为,排查问题,以及进行数据恢复。
10. 事件的停止传播
有时候,我们可能希望在某个监听器处理完事件后,停止事件的传播,不再执行后续的监听器。可以通过实现 PsrEventDispatcherStoppableEventInterface 接口来实现。
<?php
namespace AppEvent;
use PsrEventDispatcherStoppableEventInterface;
class UserRegistered implements StoppableEventInterface
{
public int $userId;
private bool $propagationStopped = false;
public function __construct(int $userId)
{
$this->userId = $userId;
}
public function stopPropagation(): void
{
$this->propagationStopped = true;
}
public function isPropagationStopped(): bool
{
return $this->propagationStopped;
}
}
在监听器中,调用 $event->stopPropagation() 方法可以停止事件的传播。
关键点回顾:
- 事件驱动架构解耦系统组件,提升可扩展性。
- Swoole/Hyperf 的协程机制提升并发性能,适合异步事件处理。
- 自定义事件分发器可以灵活控制同步和异步事件触发。
- Hyperf 内置事件系统提供更便捷的配置和使用方式。
- 错误处理、重试机制、事件溯源和停止传播是事件处理中的重要考虑因素。
希望今天的讲解对大家有所帮助!谢谢!