CQRS在PHP中的实现:结合Event Sourcing的实战讲座
大家好,今天我们要深入探讨CQRS(命令查询职责分离)架构在PHP中的实现,并将其与Event Sourcing策略结合。这种组合可以帮助我们构建更具可扩展性、可维护性和审计性的应用程序。
一、CQRS架构的核心思想
CQRS的核心思想是将应用程序的操作分为两类:
- 命令(Commands): 用于修改系统状态的操作。例如,创建用户、更新产品价格等。
- 查询(Queries): 用于从系统读取数据的操作。例如,获取用户信息、查询产品列表等。
传统的CRUD(创建、读取、更新、删除)架构通常将读写操作混合在同一个模型或服务中。而CQRS则将它们完全分离,使用不同的模型、服务甚至数据库进行处理。
1.1 CQRS的优势
| 优势 | 描述 |
|---|---|
| 性能优化 | 可以为读写操作选择不同的数据库和存储策略。例如,可以使用专门为读取优化的数据库,提高查询性能。 |
| 可扩展性 | 读写操作可以独立扩展。在读操作远多于写操作的场景下,可以独立扩展读模型,避免写操作成为瓶颈。 |
| 安全性 | 可以对命令和查询实施不同的安全策略。例如,只有授权用户才能执行命令,而所有用户都可以执行查询。 |
| 代码复杂性降低 | 命令模型和查询模型可以针对各自的需求进行优化,避免复杂的通用模型。 |
| 更好的领域建模 | 允许使用更适合写操作的命令模型,以及更适合读操作的查询模型。 |
1.2 CQRS的挑战
- 复杂性增加: 需要维护两个独立的模型和数据存储。
- 最终一致性: 读模型可能不是总是最新的,因为需要从写模型同步数据。
二、Event Sourcing:记录状态变更
Event Sourcing是一种持久化数据的方式,它不是直接存储应用程序的当前状态,而是存储导致状态变更的事件序列。
2.1 Event Sourcing的工作原理
- 当执行一个命令时,会生成一个或多个事件。
- 这些事件被追加到事件存储(Event Store)中。
- 要获取应用程序的当前状态,需要重放(replay)事件存储中的所有事件。
2.2 Event Sourcing的优势
| 优势 | 描述 |
|---|---|
| 审计性 | 可以追溯应用程序的任何状态变更。 |
| 可调试性 | 可以通过重放事件来调试应用程序。 |
| 时间旅行 | 可以恢复到应用程序的任何历史状态。 |
| 数据集成 | 事件可以被发布到其他系统,实现数据集成。 |
| 领域洞察 | 通过分析事件序列,可以获得对业务流程的深入了解。 |
2.3 Event Sourcing的挑战
- 复杂性增加: 需要维护事件存储和事件处理逻辑。
- 性能问题: 重放所有事件可能需要很长时间。可以使用快照(Snapshot)来缓解这个问题,快照是应用程序在某个时间点的状态的副本。
- 事件演化: 当事件结构发生变化时,需要处理旧事件的兼容性问题。
三、CQRS与Event Sourcing的结合
CQRS和Event Sourcing可以完美地结合在一起。命令模型负责生成事件,事件存储负责持久化事件,查询模型通过订阅事件来更新其状态。
3.1 架构图
+-----------------+ +-----------------+ +-----------------+
| 命令总线 | --> | 命令处理器 | --> | 领域模型 |
+-----------------+ +-----------------+ +-----------------+
| |
| | +-----------------+
| | | 事件发布器 |
| | +-----------------+
| | |
| | V
| | +-----------------+
| | | 事件存储 |
| | +-----------------+
| | |
| | V
| | +-----------------+
| | | 事件处理器 |
| | +-----------------+
| | |
| | V
| | +-----------------+
| | | 查询模型 |
| | +-----------------+
|
V
+-----------------+
| 查询总线 | --> 查询处理器 --> 查询模型 (只读)
+-----------------+
3.2 实现细节
- 命令(Commands): 定义应用程序可以执行的操作。例如:
CreateUserCommand、UpdateProductNameCommand。 - 命令处理器(Command Handlers): 负责处理命令,并调用领域模型的方法来执行相应的操作。
- 领域模型(Domain Model): 包含业务逻辑和数据验证。
- 事件(Events): 记录领域模型状态的变更。例如:
UserCreatedEvent、ProductNameUpdatedEvent。 - 事件存储(Event Store): 持久化事件的数据库。可以使用关系型数据库、NoSQL数据库或专门的事件存储数据库。
- 事件发布器(Event Publisher): 将事件发布到事件总线,供其他系统或组件订阅。
- 事件处理器(Event Handlers): 订阅事件,并更新查询模型。
- 查询模型(Query Model): 为查询优化的数据模型。可以使用不同的数据库和存储策略。
- 查询(Queries): 定义可以执行的查询操作。例如:
GetUserByIdQuery、GetProductListQuery。 - 查询处理器(Query Handlers): 负责执行查询,并从查询模型中获取数据。
四、PHP代码示例
为了更好地理解,我们提供一个简化的PHP代码示例,展示如何使用CQRS和Event Sourcing创建一个用户管理系统。
4.1 定义命令和事件
<?php
// Commands
interface Command {}
class CreateUserCommand implements Command {
public string $userId;
public string $username;
public string $email;
public function __construct(string $userId, string $username, string $email) {
$this->userId = $userId;
$this->username = $username;
$this->email = $email;
}
}
class UpdateUserEmailCommand implements Command {
public string $userId;
public string $email;
public function __construct(string $userId, string $email) {
$this->userId = $userId;
$this->email = $email;
}
}
// Events
interface Event {}
class UserCreatedEvent implements Event {
public string $userId;
public string $username;
public string $email;
public function __construct(string $userId, string $username, string $email) {
$this->userId = $userId;
$this->username = $username;
$this->email = $email;
}
}
class UserEmailUpdatedEvent implements Event {
public string $userId;
public string $email;
public function __construct(string $userId, string $email) {
$this->userId = $userId;
$this->email = $email;
}
}
4.2 领域模型
<?php
class User {
private string $userId;
private string $username;
private string $email;
private array $events = []; // 存储应用到 User 聚合上的事件
public function __construct(string $userId, string $username, string $email) {
$this->userId = $userId;
$this->username = $username;
$this->email = $email;
$this->recordEvent(new UserCreatedEvent($userId, $username, $email));
}
public static function reconstituteFromHistory(array $events): ?User
{
if (empty($events)) {
return null;
}
$user = null;
foreach ($events as $event) {
if ($event instanceof UserCreatedEvent) {
$user = new User($event->userId, $event->username, $event->email);
$user->events = []; // 重建对象时,清空事件集合,因为这些事件已经被应用过了
} elseif ($event instanceof UserEmailUpdatedEvent) {
if ($user === null) {
throw new Exception('UserCreatedEvent not found before UserEmailUpdatedEvent');
}
$user->apply($event);
}
}
return $user;
}
public function updateEmail(string $email): void {
if ($this->email !== $email) {
$this->email = $email;
$this->recordEvent(new UserEmailUpdatedEvent($this->userId, $email));
}
}
private function apply(UserEmailUpdatedEvent $event): void {
$this->email = $event->email;
}
private function recordEvent(Event $event): void {
$this->events[] = $event;
}
public function getEvents(): array {
return $this->events;
}
public function clearEvents(): void {
$this->events = [];
}
public function getUserId():string {
return $this->userId;
}
public function getUsername():string {
return $this->username;
}
public function getEmail():string {
return $this->email;
}
}
4.3 命令处理器
<?php
interface CommandHandler {
public function handle(Command $command): void;
}
class CreateUserCommandHandler implements CommandHandler {
private EventStore $eventStore;
public function __construct(EventStore $eventStore) {
$this->eventStore = $eventStore;
}
public function handle(Command $command): void {
if (!($command instanceof CreateUserCommand)) {
throw new InvalidArgumentException('Invalid command type.');
}
$user = new User($command->userId, $command->username, $command->email);
$events = $user->getEvents();
foreach ($events as $event) {
$this->eventStore->append($event);
}
$user->clearEvents();
}
}
class UpdateUserEmailCommandHandler implements CommandHandler {
private EventStore $eventStore;
public function __construct(EventStore $eventStore) {
$this->eventStore = $eventStore;
}
public function handle(Command $command): void {
if (!($command instanceof UpdateUserEmailCommand)) {
throw new InvalidArgumentException('Invalid command type.');
}
// 从事件存储重建 User 聚合
$events = $this->eventStore->getEventsForAggregate($command->userId);
$user = User::reconstituteFromHistory($events);
if ($user === null) {
throw new Exception("User with ID {$command->userId} not found.");
}
$user->updateEmail($command->email);
$events = $user->getEvents();
foreach ($events as $event) {
$this->eventStore->append($event);
}
$user->clearEvents();
}
}
4.4 事件存储接口与简单实现
<?php
interface EventStore {
public function append(Event $event): void;
public function getEventsForAggregate(string $aggregateId): array;
}
class InMemoryEventStore implements EventStore {
private array $events = [];
public function append(Event $event): void {
$this->events[] = $event;
}
public function getEventsForAggregate(string $aggregateId): array {
$aggregateEvents = [];
foreach ($this->events as $event) {
if (method_exists($event, 'getUserId') && $event->getUserId() === $aggregateId) { // Assuming events have getUserId() method
$aggregateEvents[] = $event;
} elseif (property_exists($event, 'userId') && $event->userId === $aggregateId) {
$aggregateEvents[] = $event;
}
}
return $aggregateEvents;
}
}
4.5 事件处理器(更新查询模型)
<?php
class UserReadModel {
private array $users = [];
public function applyUserCreated(UserCreatedEvent $event): void {
$this->users[$event->userId] = [
'username' => $event->username,
'email' => $event->email,
];
}
public function applyUserEmailUpdated(UserEmailUpdatedEvent $event): void {
if (isset($this->users[$event->userId])) {
$this->users[$event->userId]['email'] = $event->email;
}
}
public function getUser(string $userId): ?array {
return $this->users[$userId] ?? null;
}
}
class UserEventHandler {
private UserReadModel $readModel;
public function __construct(UserReadModel $readModel) {
$this->readModel = $readModel;
}
public function handle(Event $event): void {
if ($event instanceof UserCreatedEvent) {
$this->readModel->applyUserCreated($event);
} elseif ($event instanceof UserEmailUpdatedEvent) {
$this->readModel->applyUserEmailUpdated($event);
}
}
}
4.6 查询与查询处理器
<?php
interface Query {}
class GetUserByIdQuery implements Query {
public string $userId;
public function __construct(string $userId) {
$this->userId = $userId;
}
}
interface QueryHandler {
public function handle(Query $query): ?array;
}
class GetUserByIdQueryHandler implements QueryHandler {
private UserReadModel $readModel;
public function __construct(UserReadModel $readModel) {
$this->readModel = $readModel;
}
public function handle(Query $query): ?array {
if (!($query instanceof GetUserByIdQuery)) {
throw new InvalidArgumentException('Invalid query type.');
}
return $this->readModel->getUser($query->userId);
}
}
4.7 使用示例
<?php
// 初始化
$eventStore = new InMemoryEventStore();
$readModel = new UserReadModel();
$userEventHandler = new UserEventHandler($readModel);
// 创建用户
$createUserCommand = new CreateUserCommand('user123', 'JohnDoe', '[email protected]');
$createUserCommandHandler = new CreateUserCommandHandler($eventStore);
$createUserCommandHandler->handle($createUserCommand);
// 更新用户邮箱
$updateUserEmailCommand = new UpdateUserEmailCommand('user123', '[email protected]');
$updateUserEmailCommandHandler = new UpdateUserEmailCommandHandler($eventStore);
$updateUserEmailCommandHandler->handle($updateUserEmailCommand);
// 读取用户信息
$getUserByIdQuery = new GetUserByIdQuery('user123');
$getUserByIdQueryHandler = new GetUserByIdQueryHandler($readModel);
$user = $getUserByIdQueryHandler->handle($getUserByIdQuery);
print_r($user); // 输出: Array ( [username] => JohnDoe [email] => [email protected] )
// 应用所有事件到 Read Model
$allEvents = $eventStore->getEventsForAggregate('user123');
foreach($allEvents as $event) {
$userEventHandler->handle($event);
}
// 再次读取用户信息
$getUserByIdQuery = new GetUserByIdQuery('user123');
$getUserByIdQueryHandler = new GetUserByIdQueryHandler($readModel);
$user = $getUserByIdQueryHandler->handle($getUserByIdQuery);
print_r($user); // 输出: Array ( [username] => JohnDoe [email] => [email protected] )
五、事件总线(Event Bus)
上面的例子中,事件处理器直接从事件存储中读取事件。在一个更复杂的系统中,可以使用事件总线来实现事件的发布和订阅。
5.1 事件总线的实现方式
- 内存事件总线: 最简单的实现方式,适用于单进程应用程序。
- 消息队列: 使用消息队列(例如RabbitMQ、Kafka)来实现事件的异步发布和订阅。
- 数据库: 使用数据库表来存储事件,并使用轮询或触发器来实现事件的发布和订阅。
5.2 示例:使用内存事件总线
<?php
interface EventBus {
public function publish(Event $event): void;
public function subscribe(string $eventClass, callable $handler): void;
}
class InMemoryEventBus implements EventBus {
private array $handlers = [];
public function publish(Event $event): void {
$eventClass = get_class($event);
if (isset($this->handlers[$eventClass])) {
foreach ($this->handlers[$eventClass] as $handler) {
$handler($event);
}
}
}
public function subscribe(string $eventClass, callable $handler): void {
if (!isset($this->handlers[$eventClass])) {
$this->handlers[$eventClass] = [];
}
$this->handlers[$eventClass][] = $handler;
}
}
六、快照(Snapshots)
当事件序列很长时,重放所有事件可能会导致性能问题。可以使用快照来缓解这个问题。快照是应用程序在某个时间点的状态的副本。
6.1 快照的工作原理
- 定期创建应用程序的快照。
- 当需要获取应用程序的当前状态时,首先加载最新的快照。
- 然后,重放快照之后发生的事件。
6.2 示例
<?php
interface SnapshotStore {
public function saveSnapshot(string $aggregateId, int $version, object $snapshot): void;
public function getSnapshot(string $aggregateId): ?array; // 返回 [snapshot, version]
}
class InMemorySnapshotStore implements SnapshotStore {
private array $snapshots = [];
public function saveSnapshot(string $aggregateId, int $version, object $snapshot): void {
$this->snapshots[$aggregateId] = ['snapshot' => $snapshot, 'version' => $version];
}
public function getSnapshot(string $aggregateId): ?array {
return $this->snapshots[$aggregateId] ?? null;
}
}
// 修改 User 聚合以支持快照
class User {
// ... (之前的属性和方法)
public function createSnapshot(): object {
return (object) [
'userId' => $this->userId,
'username' => $this->username,
'email' => $this->email,
];
}
public static function reconstituteFromSnapshot(object $snapshot): User {
$user = new User($snapshot->userId, $snapshot->username, $snapshot->email);
$user->events = []; // 清空事件
return $user;
}
}
// 修改 EventStore 以支持快照
class InMemoryEventStore implements EventStore {
// ... (之前的属性和方法)
private SnapshotStore $snapshotStore;
public function __construct(SnapshotStore $snapshotStore) {
$this->snapshotStore = $snapshotStore;
}
public function getEventsForAggregate(string $aggregateId): array {
$snapshotData = $this->snapshotStore->getSnapshot($aggregateId);
$events = [];
if ($snapshotData) {
// 从快照版本开始筛选事件
$snapshotVersion = $snapshotData['version'];
foreach ($this->events as $event) {
if (property_exists($event, 'userId') && $event->userId === $aggregateId && property_exists($event, 'version') && $event->version > $snapshotVersion )
{
$events[] = $event;
}
}
} else {
foreach ($this->events as $event) {
if (property_exists($event, 'userId') && $event->userId === $aggregateId) {
$events[] = $event;
}
}
}
return $events;
}
}
// 定期创建快照
$snapshotStore = new InMemorySnapshotStore();
$user = User::reconstituteFromHistory($eventStore->getEventsForAggregate('user123'));
$snapshot = $user->createSnapshot();
$snapshotStore->saveSnapshot('user123', count($eventStore->getEventsForAggregate('user123')), $snapshot);
七、事件演化(Event Versioning)
当事件结构发生变化时,需要处理旧事件的兼容性问题。
7.1 事件演化的策略
- 向上转型(Upcasting): 将旧事件转换为新事件。
- 平行事件版本: 同时维护多个版本的事件。
- 忽略旧事件: 直接忽略旧事件(仅在可以接受数据丢失的情况下使用)。
7.2 示例:向上转型
<?php
// 假设 UserEmailUpdatedEvent 的结构发生了变化,添加了一个新的字段:reason
class UserEmailUpdatedEventV2 implements Event {
public string $userId;
public string $email;
public string $reason; // 新增字段
public function __construct(string $userId, string $email, string $reason) {
$this->userId = $userId;
$this->email = $email;
$this->reason = $reason;
}
}
// 创建一个向上转型器
class UserEmailUpdatedEventUpcaster {
public function upcast(UserEmailUpdatedEvent $event): UserEmailUpdatedEventV2 {
return new UserEmailUpdatedEventV2($event->userId, $event->email, 'unknown');
}
}
// 在事件处理器中使用向上转型器
class UserEventHandler {
private UserReadModel $readModel;
private UserEmailUpdatedEventUpcaster $upcaster;
public function __construct(UserReadModel $readModel, UserEmailUpdatedEventUpcaster $upcaster) {
$this->readModel = $readModel;
$this->upcaster = $upcaster;
}
public function handle(Event $event): void {
if ($event instanceof UserCreatedEvent) {
$this->readModel->applyUserCreated($event);
} elseif ($event instanceof UserEmailUpdatedEvent) {
$eventV2 = $this->upcaster->upcast($event);
$this->readModel->applyUserEmailUpdated($eventV2);
} elseif ($event instanceof UserEmailUpdatedEventV2) {
$this->readModel->applyUserEmailUpdated($event);
}
}
}
八、总结
CQRS和Event Sourcing是强大的架构模式,可以帮助我们构建更具可扩展性、可维护性和审计性的应用程序。
九、核心要点回顾
CQRS分离了命令和查询,提升了系统灵活性。Event Sourcing记录所有状态变更,便于审计和调试。两者结合,能构建出健壮且可追溯的应用。