CQRS(命令查询职责分离)架构在PHP中的实现:结合Event Sourcing的实战

CQRS在PHP中的实现:结合Event Sourcing的实战讲座

大家好,今天我们要深入探讨CQRS(命令查询职责分离)架构在PHP中的实现,并将其与Event Sourcing策略结合。这种组合可以帮助我们构建更具可扩展性、可维护性和审计性的应用程序。

一、CQRS架构的核心思想

CQRS的核心思想是将应用程序的操作分为两类:

  • 命令(Commands): 用于修改系统状态的操作。例如,创建用户、更新产品价格等。
  • 查询(Queries): 用于从系统读取数据的操作。例如,获取用户信息、查询产品列表等。

传统的CRUD(创建、读取、更新、删除)架构通常将读写操作混合在同一个模型或服务中。而CQRS则将它们完全分离,使用不同的模型、服务甚至数据库进行处理。

1.1 CQRS的优势

优势 描述
性能优化 可以为读写操作选择不同的数据库和存储策略。例如,可以使用专门为读取优化的数据库,提高查询性能。
可扩展性 读写操作可以独立扩展。在读操作远多于写操作的场景下,可以独立扩展读模型,避免写操作成为瓶颈。
安全性 可以对命令和查询实施不同的安全策略。例如,只有授权用户才能执行命令,而所有用户都可以执行查询。
代码复杂性降低 命令模型和查询模型可以针对各自的需求进行优化,避免复杂的通用模型。
更好的领域建模 允许使用更适合写操作的命令模型,以及更适合读操作的查询模型。

1.2 CQRS的挑战

  • 复杂性增加: 需要维护两个独立的模型和数据存储。
  • 最终一致性: 读模型可能不是总是最新的,因为需要从写模型同步数据。

二、Event Sourcing:记录状态变更

Event Sourcing是一种持久化数据的方式,它不是直接存储应用程序的当前状态,而是存储导致状态变更的事件序列。

2.1 Event Sourcing的工作原理

  1. 当执行一个命令时,会生成一个或多个事件。
  2. 这些事件被追加到事件存储(Event Store)中。
  3. 要获取应用程序的当前状态,需要重放(replay)事件存储中的所有事件。

2.2 Event Sourcing的优势

优势 描述
审计性 可以追溯应用程序的任何状态变更。
可调试性 可以通过重放事件来调试应用程序。
时间旅行 可以恢复到应用程序的任何历史状态。
数据集成 事件可以被发布到其他系统,实现数据集成。
领域洞察 通过分析事件序列,可以获得对业务流程的深入了解。

2.3 Event Sourcing的挑战

  • 复杂性增加: 需要维护事件存储和事件处理逻辑。
  • 性能问题: 重放所有事件可能需要很长时间。可以使用快照(Snapshot)来缓解这个问题,快照是应用程序在某个时间点的状态的副本。
  • 事件演化: 当事件结构发生变化时,需要处理旧事件的兼容性问题。

三、CQRS与Event Sourcing的结合

CQRS和Event Sourcing可以完美地结合在一起。命令模型负责生成事件,事件存储负责持久化事件,查询模型通过订阅事件来更新其状态。

3.1 架构图

+-----------------+     +-----------------+     +-----------------+
| 命令总线        | --> | 命令处理器      | --> | 领域模型        |
+-----------------+     +-----------------+     +-----------------+
       |                      |
       |                      |  +-----------------+
       |                      |  | 事件发布器      |
       |                      |  +-----------------+
       |                      |         |
       |                      |         V
       |                      |  +-----------------+
       |                      |  | 事件存储        |
       |                      |  +-----------------+
       |                      |         |
       |                      |         V
       |                      |  +-----------------+
       |                      |  | 事件处理器      |
       |                      |  +-----------------+
       |                      |         |
       |                      |         V
       |                      |  +-----------------+
       |                      |  | 查询模型        |
       |                      |  +-----------------+
       |
       V
+-----------------+
| 查询总线        | -->  查询处理器 --> 查询模型 (只读)
+-----------------+

3.2 实现细节

  1. 命令(Commands): 定义应用程序可以执行的操作。例如:CreateUserCommandUpdateProductNameCommand
  2. 命令处理器(Command Handlers): 负责处理命令,并调用领域模型的方法来执行相应的操作。
  3. 领域模型(Domain Model): 包含业务逻辑和数据验证。
  4. 事件(Events): 记录领域模型状态的变更。例如:UserCreatedEventProductNameUpdatedEvent
  5. 事件存储(Event Store): 持久化事件的数据库。可以使用关系型数据库、NoSQL数据库或专门的事件存储数据库。
  6. 事件发布器(Event Publisher): 将事件发布到事件总线,供其他系统或组件订阅。
  7. 事件处理器(Event Handlers): 订阅事件,并更新查询模型。
  8. 查询模型(Query Model): 为查询优化的数据模型。可以使用不同的数据库和存储策略。
  9. 查询(Queries): 定义可以执行的查询操作。例如:GetUserByIdQueryGetProductListQuery
  10. 查询处理器(Query Handlers): 负责执行查询,并从查询模型中获取数据。

四、PHP代码示例

为了更好地理解,我们提供一个简化的PHP代码示例,展示如何使用CQRS和Event Sourcing创建一个用户管理系统。

4.1 定义命令和事件

<?php

// Commands
interface Command {}

class CreateUserCommand implements Command {
    public string $userId;
    public string $username;
    public string $email;

    public function __construct(string $userId, string $username, string $email) {
        $this->userId = $userId;
        $this->username = $username;
        $this->email = $email;
    }
}

class UpdateUserEmailCommand implements Command {
    public string $userId;
    public string $email;

    public function __construct(string $userId, string $email) {
        $this->userId = $userId;
        $this->email = $email;
    }
}

// Events
interface Event {}

class UserCreatedEvent implements Event {
    public string $userId;
    public string $username;
    public string $email;

    public function __construct(string $userId, string $username, string $email) {
        $this->userId = $userId;
        $this->username = $username;
        $this->email = $email;
    }
}

class UserEmailUpdatedEvent implements Event {
    public string $userId;
    public string $email;

    public function __construct(string $userId, string $email) {
        $this->userId = $userId;
        $this->email = $email;
    }
}

4.2 领域模型

<?php

class User {
    private string $userId;
    private string $username;
    private string $email;
    private array $events = []; // 存储应用到 User 聚合上的事件

    public function __construct(string $userId, string $username, string $email) {
        $this->userId = $userId;
        $this->username = $username;
        $this->email = $email;
        $this->recordEvent(new UserCreatedEvent($userId, $username, $email));
    }

    public static function reconstituteFromHistory(array $events): ?User
    {
        if (empty($events)) {
            return null;
        }

        $user = null;
        foreach ($events as $event) {
            if ($event instanceof UserCreatedEvent) {
                $user = new User($event->userId, $event->username, $event->email);
                $user->events = []; // 重建对象时,清空事件集合,因为这些事件已经被应用过了
            } elseif ($event instanceof UserEmailUpdatedEvent) {
                if ($user === null) {
                    throw new Exception('UserCreatedEvent not found before UserEmailUpdatedEvent');
                }
                $user->apply($event);
            }
        }

        return $user;
    }

    public function updateEmail(string $email): void {
        if ($this->email !== $email) {
            $this->email = $email;
            $this->recordEvent(new UserEmailUpdatedEvent($this->userId, $email));
        }
    }

    private function apply(UserEmailUpdatedEvent $event): void {
        $this->email = $event->email;
    }

    private function recordEvent(Event $event): void {
        $this->events[] = $event;
    }

    public function getEvents(): array {
        return $this->events;
    }

    public function clearEvents(): void {
        $this->events = [];
    }

    public function getUserId():string {
        return $this->userId;
    }

    public function getUsername():string {
        return $this->username;
    }

    public function getEmail():string {
        return $this->email;
    }
}

4.3 命令处理器

<?php

interface CommandHandler {
    public function handle(Command $command): void;
}

class CreateUserCommandHandler implements CommandHandler {
    private EventStore $eventStore;

    public function __construct(EventStore $eventStore) {
        $this->eventStore = $eventStore;
    }

    public function handle(Command $command): void {
        if (!($command instanceof CreateUserCommand)) {
            throw new InvalidArgumentException('Invalid command type.');
        }

        $user = new User($command->userId, $command->username, $command->email);
        $events = $user->getEvents();

        foreach ($events as $event) {
           $this->eventStore->append($event);
        }
        $user->clearEvents();

    }
}

class UpdateUserEmailCommandHandler implements CommandHandler {
    private EventStore $eventStore;

    public function __construct(EventStore $eventStore) {
        $this->eventStore = $eventStore;
    }

    public function handle(Command $command): void {
        if (!($command instanceof UpdateUserEmailCommand)) {
            throw new InvalidArgumentException('Invalid command type.');
        }

        // 从事件存储重建 User 聚合
        $events = $this->eventStore->getEventsForAggregate($command->userId);
        $user = User::reconstituteFromHistory($events);

        if ($user === null) {
            throw new Exception("User with ID {$command->userId} not found.");
        }

        $user->updateEmail($command->email);
        $events = $user->getEvents();

        foreach ($events as $event) {
            $this->eventStore->append($event);
        }

        $user->clearEvents();
    }
}

4.4 事件存储接口与简单实现

<?php

interface EventStore {
    public function append(Event $event): void;
    public function getEventsForAggregate(string $aggregateId): array;
}

class InMemoryEventStore implements EventStore {
    private array $events = [];

    public function append(Event $event): void {
        $this->events[] = $event;
    }

    public function getEventsForAggregate(string $aggregateId): array {
        $aggregateEvents = [];
        foreach ($this->events as $event) {
            if (method_exists($event, 'getUserId') && $event->getUserId() === $aggregateId) { // Assuming events have getUserId() method
                $aggregateEvents[] = $event;
            } elseif (property_exists($event, 'userId') && $event->userId === $aggregateId) {
                $aggregateEvents[] = $event;
            }
        }
        return $aggregateEvents;
    }
}

4.5 事件处理器(更新查询模型)

<?php

class UserReadModel {
    private array $users = [];

    public function applyUserCreated(UserCreatedEvent $event): void {
        $this->users[$event->userId] = [
            'username' => $event->username,
            'email' => $event->email,
        ];
    }

    public function applyUserEmailUpdated(UserEmailUpdatedEvent $event): void {
        if (isset($this->users[$event->userId])) {
            $this->users[$event->userId]['email'] = $event->email;
        }
    }

    public function getUser(string $userId): ?array {
        return $this->users[$userId] ?? null;
    }
}

class UserEventHandler {
    private UserReadModel $readModel;

    public function __construct(UserReadModel $readModel) {
        $this->readModel = $readModel;
    }

    public function handle(Event $event): void {
        if ($event instanceof UserCreatedEvent) {
            $this->readModel->applyUserCreated($event);
        } elseif ($event instanceof UserEmailUpdatedEvent) {
            $this->readModel->applyUserEmailUpdated($event);
        }
    }
}

4.6 查询与查询处理器

<?php

interface Query {}

class GetUserByIdQuery implements Query {
    public string $userId;

    public function __construct(string $userId) {
        $this->userId = $userId;
    }
}

interface QueryHandler {
    public function handle(Query $query): ?array;
}

class GetUserByIdQueryHandler implements QueryHandler {
    private UserReadModel $readModel;

    public function __construct(UserReadModel $readModel) {
        $this->readModel = $readModel;
    }

    public function handle(Query $query): ?array {
        if (!($query instanceof GetUserByIdQuery)) {
            throw new InvalidArgumentException('Invalid query type.');
        }

        return $this->readModel->getUser($query->userId);
    }
}

4.7 使用示例

<?php

// 初始化
$eventStore = new InMemoryEventStore();
$readModel = new UserReadModel();
$userEventHandler = new UserEventHandler($readModel);

// 创建用户
$createUserCommand = new CreateUserCommand('user123', 'JohnDoe', '[email protected]');
$createUserCommandHandler = new CreateUserCommandHandler($eventStore);
$createUserCommandHandler->handle($createUserCommand);

// 更新用户邮箱
$updateUserEmailCommand = new UpdateUserEmailCommand('user123', '[email protected]');
$updateUserEmailCommandHandler = new UpdateUserEmailCommandHandler($eventStore);
$updateUserEmailCommandHandler->handle($updateUserEmailCommand);

// 读取用户信息
$getUserByIdQuery = new GetUserByIdQuery('user123');
$getUserByIdQueryHandler = new GetUserByIdQueryHandler($readModel);
$user = $getUserByIdQueryHandler->handle($getUserByIdQuery);

print_r($user); // 输出: Array ( [username] => JohnDoe [email] => [email protected] )

// 应用所有事件到 Read Model
$allEvents = $eventStore->getEventsForAggregate('user123');
foreach($allEvents as $event) {
    $userEventHandler->handle($event);
}

// 再次读取用户信息
$getUserByIdQuery = new GetUserByIdQuery('user123');
$getUserByIdQueryHandler = new GetUserByIdQueryHandler($readModel);
$user = $getUserByIdQueryHandler->handle($getUserByIdQuery);

print_r($user); // 输出: Array ( [username] => JohnDoe [email] => [email protected] )

五、事件总线(Event Bus)

上面的例子中,事件处理器直接从事件存储中读取事件。在一个更复杂的系统中,可以使用事件总线来实现事件的发布和订阅。

5.1 事件总线的实现方式

  • 内存事件总线: 最简单的实现方式,适用于单进程应用程序。
  • 消息队列: 使用消息队列(例如RabbitMQ、Kafka)来实现事件的异步发布和订阅。
  • 数据库: 使用数据库表来存储事件,并使用轮询或触发器来实现事件的发布和订阅。

5.2 示例:使用内存事件总线

<?php

interface EventBus {
    public function publish(Event $event): void;
    public function subscribe(string $eventClass, callable $handler): void;
}

class InMemoryEventBus implements EventBus {
    private array $handlers = [];

    public function publish(Event $event): void {
        $eventClass = get_class($event);
        if (isset($this->handlers[$eventClass])) {
            foreach ($this->handlers[$eventClass] as $handler) {
                $handler($event);
            }
        }
    }

    public function subscribe(string $eventClass, callable $handler): void {
        if (!isset($this->handlers[$eventClass])) {
            $this->handlers[$eventClass] = [];
        }
        $this->handlers[$eventClass][] = $handler;
    }
}

六、快照(Snapshots)

当事件序列很长时,重放所有事件可能会导致性能问题。可以使用快照来缓解这个问题。快照是应用程序在某个时间点的状态的副本。

6.1 快照的工作原理

  1. 定期创建应用程序的快照。
  2. 当需要获取应用程序的当前状态时,首先加载最新的快照。
  3. 然后,重放快照之后发生的事件。

6.2 示例

<?php

interface SnapshotStore {
    public function saveSnapshot(string $aggregateId, int $version, object $snapshot): void;
    public function getSnapshot(string $aggregateId): ?array; // 返回 [snapshot, version]
}

class InMemorySnapshotStore implements SnapshotStore {
    private array $snapshots = [];

    public function saveSnapshot(string $aggregateId, int $version, object $snapshot): void {
        $this->snapshots[$aggregateId] = ['snapshot' => $snapshot, 'version' => $version];
    }

    public function getSnapshot(string $aggregateId): ?array {
        return $this->snapshots[$aggregateId] ?? null;
    }
}

// 修改 User 聚合以支持快照
class User {
    // ... (之前的属性和方法)

    public function createSnapshot(): object {
        return (object) [
            'userId' => $this->userId,
            'username' => $this->username,
            'email' => $this->email,
        ];
    }

    public static function reconstituteFromSnapshot(object $snapshot): User {
        $user = new User($snapshot->userId, $snapshot->username, $snapshot->email);
        $user->events = []; // 清空事件
        return $user;
    }
}

// 修改 EventStore 以支持快照
class InMemoryEventStore implements EventStore {
    // ... (之前的属性和方法)
    private SnapshotStore $snapshotStore;

    public function __construct(SnapshotStore $snapshotStore) {
        $this->snapshotStore = $snapshotStore;
    }

    public function getEventsForAggregate(string $aggregateId): array {
         $snapshotData = $this->snapshotStore->getSnapshot($aggregateId);
        $events = [];

        if ($snapshotData) {
           // 从快照版本开始筛选事件
           $snapshotVersion = $snapshotData['version'];
            foreach ($this->events as $event) {
                if (property_exists($event, 'userId') && $event->userId === $aggregateId &&  property_exists($event, 'version') && $event->version > $snapshotVersion )
                {
                    $events[] = $event;
                }
            }
        } else {
             foreach ($this->events as $event) {
                if (property_exists($event, 'userId') && $event->userId === $aggregateId) {
                    $events[] = $event;
                }
            }
        }

        return $events;
    }
}

// 定期创建快照
$snapshotStore = new InMemorySnapshotStore();
$user = User::reconstituteFromHistory($eventStore->getEventsForAggregate('user123'));
$snapshot = $user->createSnapshot();
$snapshotStore->saveSnapshot('user123', count($eventStore->getEventsForAggregate('user123')), $snapshot);

七、事件演化(Event Versioning)

当事件结构发生变化时,需要处理旧事件的兼容性问题。

7.1 事件演化的策略

  • 向上转型(Upcasting): 将旧事件转换为新事件。
  • 平行事件版本: 同时维护多个版本的事件。
  • 忽略旧事件: 直接忽略旧事件(仅在可以接受数据丢失的情况下使用)。

7.2 示例:向上转型

<?php

// 假设 UserEmailUpdatedEvent 的结构发生了变化,添加了一个新的字段:reason
class UserEmailUpdatedEventV2 implements Event {
    public string $userId;
    public string $email;
    public string $reason; // 新增字段

    public function __construct(string $userId, string $email, string $reason) {
        $this->userId = $userId;
        $this->email = $email;
        $this->reason = $reason;
    }
}

// 创建一个向上转型器
class UserEmailUpdatedEventUpcaster {
    public function upcast(UserEmailUpdatedEvent $event): UserEmailUpdatedEventV2 {
        return new UserEmailUpdatedEventV2($event->userId, $event->email, 'unknown');
    }
}

// 在事件处理器中使用向上转型器
class UserEventHandler {
    private UserReadModel $readModel;
    private UserEmailUpdatedEventUpcaster $upcaster;

    public function __construct(UserReadModel $readModel, UserEmailUpdatedEventUpcaster $upcaster) {
        $this->readModel = $readModel;
        $this->upcaster = $upcaster;
    }

    public function handle(Event $event): void {
        if ($event instanceof UserCreatedEvent) {
            $this->readModel->applyUserCreated($event);
        } elseif ($event instanceof UserEmailUpdatedEvent) {
            $eventV2 = $this->upcaster->upcast($event);
            $this->readModel->applyUserEmailUpdated($eventV2);
        }  elseif ($event instanceof UserEmailUpdatedEventV2) {
            $this->readModel->applyUserEmailUpdated($event);
        }
    }
}

八、总结

CQRS和Event Sourcing是强大的架构模式,可以帮助我们构建更具可扩展性、可维护性和审计性的应用程序。

九、核心要点回顾

CQRS分离了命令和查询,提升了系统灵活性。Event Sourcing记录所有状态变更,便于审计和调试。两者结合,能构建出健壮且可追溯的应用。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注