PHP `Event Sourcing` 与 `CQRS` 结合:构建可审计、可扩展的系统

各位听众,大家好!我是老码,今天咱们来聊聊一个稍微有点儿高深,但绝对实用有趣的玩意儿:PHP Event Sourcing 与 CQRS 结合,打造可审计、可扩展的系统。这玩意儿听着唬人,其实没那么可怕,咱们一步一步来,保证大家听完之后能撸起袖子就开干。

开场白:你的代码也“碎碎念”吗?

想象一下,你正在开发一个电商系统。用户下单、支付、发货、退货,每个操作都直接修改数据库里的订单状态。这种做法简单粗暴,就像一个沉默寡言的人,啥也不说,直接动手。时间一长,你想知道订单是怎么变成现在这个状态的,就得翻遍代码和日志,简直是噩梦!

Event Sourcing 就像一个“碎碎念”的系统,它会记录下每一个发生过的事件,就像一个孜孜不倦的日记本。CQRS 呢,则像一个“分工明确”的团队,把读和写操作分开处理,让系统更高效。当这两个家伙结合起来,就能打造出一个既健壮又灵活的系统,就像一个既能说会道又能干的超级团队。

第一部分:什么是 Event Sourcing?

Event Sourcing 是一种架构模式,它不直接存储数据的当前状态,而是存储一系列的事件。每个事件都代表了一个状态的改变。要获取数据的当前状态,只需要把这些事件按时间顺序重放一遍就行了。

1.1 为什么要用 Event Sourcing?

  • 可审计性: 所有状态的改变都有记录,可以轻松追溯历史。就像法庭上的证据链,一环扣一环,让你对系统的行为了如指掌。
  • 可重现性: 可以随时回到过去的状态,进行调试或分析。这就像时间机器,让你能够重新体验每一个关键时刻。
  • 解耦: 事件生产者和消费者之间解耦,可以灵活地添加新的功能或修改现有功能。这就像搭积木,可以随意组合,创造出各种各样的模型。
  • 领域驱动设计(DDD): Event Sourcing 非常适合与 DDD 结合使用,可以更好地表达业务逻辑。这就像用专业的语言来描述业务,让代码更贴近现实。

1.2 Event Sourcing 的基本概念

  • Event (事件): 代表系统状态的一次改变。例如,OrderCreatedEvent(订单已创建事件)、PaymentReceivedEvent(收到付款事件)、OrderShippedEvent(订单已发货事件)。
  • Aggregate (聚合): 一组相关对象的集合,被视为一个整体。例如,Order(订单)聚合包含了订单项、收货地址等信息。
  • Event Store (事件存储): 存储所有事件的数据库。通常使用专门的数据库,例如 EventStoreDB、AxonDB,或者关系型数据库也可以。
  • Snapshot (快照): 聚合在某个时间点的状态。为了提高性能,可以定期保存快照,避免每次都重放所有事件。

1.3 Event Sourcing 的 PHP 代码示例

<?php

// 定义一个事件基类
abstract class Event
{
    public $aggregateId; // 聚合ID
    public $occurredOn; // 事件发生时间

    public function __construct(string $aggregateId, DateTimeImmutable $occurredOn = null)
    {
        $this->aggregateId = $aggregateId;
        $this->occurredOn = $occurredOn ?? new DateTimeImmutable();
    }
}

// 定义一个订单创建事件
class OrderCreatedEvent extends Event
{
    public $customerId;
    public $orderItems;

    public function __construct(string $aggregateId, string $customerId, array $orderItems)
    {
        parent::__construct($aggregateId);
        $this->customerId = $customerId;
        $this->orderItems = $orderItems;
    }
}

// 定义一个订单聚合
class Order
{
    private $id;
    private $customerId;
    private $orderItems;
    private $status; // 订单状态

    public function __construct(string $id, string $customerId, array $orderItems)
    {
        $this->id = $id;
        $this->customerId = $customerId;
        $this->orderItems = $orderItems;
        $this->status = 'CREATED'; // 初始状态
    }

    // 应用事件
    public function applyOrderCreatedEvent(OrderCreatedEvent $event)
    {
        $this->id = $event->aggregateId;
        $this->customerId = $event->customerId;
        $this->orderItems = $event->orderItems;
        $this->status = 'CREATED';
    }

    // 创建订单
    public static function create(string $id, string $customerId, array $orderItems) : Order
    {
        $order = new Order($id, $customerId, $orderItems);
        $event = new OrderCreatedEvent($id, $customerId, $orderItems);
        $order->applyOrderCreatedEvent($event); // 将事件应用到自身
        return $order;
    }

    // 从事件流中重建订单
    public static function reconstituteFromHistory(array $events) : Order
    {
        if (empty($events)) {
            throw new Exception("No events to reconstitute the order.");
        }

        $order = null;
        foreach ($events as $event) {
             if ($event instanceof OrderCreatedEvent) {
                $order = new Order($event->aggregateId, $event->customerId, $event->orderItems);
                $order->applyOrderCreatedEvent($event);
             }
             // 其他事件的处理逻辑... 比如 PaymentReceivedEvent, OrderShippedEvent等
        }

        return $order;
    }

    public function getId(): string {
        return $this->id;
    }

    public function getStatus(): string {
        return $this->status;
    }
}

// 示例:创建订单
$orderId = uniqid();
$customerId = 'user123';
$orderItems = [
    ['productId' => 'product1', 'quantity' => 2],
    ['productId' => 'product2', 'quantity' => 1],
];

$order = Order::create($orderId, $customerId, $orderItems);

echo "Order ID: " . $order->getId() . "n";
echo "Order Status: " . $order->getStatus() . "n";

//存储事件到 Event Store 的代码省略,需要根据具体的 Event Store 实现
//例如,使用关系型数据库,需要创建一个表来存储事件,包含 aggregate_id, event_type, payload, occurred_on 等字段

// 从事件流中重建订单(模拟)
$events = [new OrderCreatedEvent($orderId, $customerId, $orderItems)]; // 假设从 Event Store 中获取了这些事件
$reconstitutedOrder = Order::reconstituteFromHistory($events);

echo "Reconstituted Order ID: " . $reconstitutedOrder->getId() . "n";
echo "Reconstituted Order Status: " . $reconstitutedOrder->getStatus() . "n";

?>

代码解释:

  • Event 类是所有事件的基类,包含聚合 ID 和事件发生时间。
  • OrderCreatedEvent 类代表订单创建事件,包含客户 ID 和订单项。
  • Order 类是订单聚合,包含了订单的基本信息和状态。
  • applyOrderCreatedEvent 方法用于将 OrderCreatedEvent 应用到 Order 聚合上,更新其状态。
  • create 方法创建订单,同时产生 OrderCreatedEvent 事件,并将事件应用到订单上。
  • reconstituteFromHistory 方法从事件流中重建订单。

1.4 Event Store 的选择

选择合适的 Event Store 至关重要。以下是一些常见的选择:

Event Store 优点 缺点
EventStoreDB 专门为 Event Sourcing 设计,性能优秀,支持复杂的查询。 学习曲线较陡峭,需要一定的运维成本。
AxonDB 与 Axon Framework 集成,提供完整的 Event Sourcing 和 CQRS 解决方案。 商业产品,需要购买许可证。
关系型数据库 易于上手,成本较低。 性能相对较差,需要自己实现事件存储和查询逻辑。
NoSQL 数据库 具有良好的扩展性,适合存储大量的事件。 需要自己实现事件存储和查询逻辑。

第二部分:什么是 CQRS?

CQRS (Command Query Responsibility Segregation) 是一种架构模式,它将读(Query)和写(Command)操作分离。简单来说,就是把系统分成两个部分:一个负责处理写操作,一个负责处理读操作。

2.1 为什么要用 CQRS?

  • 性能优化: 可以针对读和写操作分别进行优化。例如,读操作可以使用缓存,而写操作可以使用消息队列。
  • 可扩展性: 可以独立地扩展读和写部分,以满足不同的需求。
  • 安全性: 可以对读和写操作进行不同的权限控制。
  • 领域驱动设计(DDD): CQRS 可以更好地支持 DDD,将复杂的业务逻辑分解成更小的、更易于管理的部分。

2.2 CQRS 的基本概念

  • Command (命令): 代表一个写操作。例如,CreateOrderCommand(创建订单命令)、PayOrderCommand(支付订单命令)。
  • Query (查询): 代表一个读操作。例如,GetOrderByIdQuery(根据 ID 获取订单查询)、GetOrdersByCustomerQuery(根据客户获取订单查询)。
  • Command Handler (命令处理器): 处理命令,执行相应的业务逻辑,并产生事件。
  • Query Handler (查询处理器): 处理查询,从数据存储中读取数据,并返回结果。
  • Read Model (读模型): 专门为读操作设计的数据库或数据结构。读模型通常与写模型不同,可以针对读操作进行优化。

2.3 CQRS 的 PHP 代码示例

<?php

// 定义一个命令基类
abstract class Command
{
    public $aggregateId;

    public function __construct(string $aggregateId)
    {
        $this->aggregateId = $aggregateId;
    }
}

// 定义一个创建订单命令
class CreateOrderCommand extends Command
{
    public $customerId;
    public $orderItems;

    public function __construct(string $aggregateId, string $customerId, array $orderItems)
    {
        parent::__construct($aggregateId);
        $this->customerId = $customerId;
        $this->orderItems = $orderItems;
    }
}

// 定义一个命令处理器接口
interface CommandHandler
{
    public function handle(Command $command);
}

// 定义一个创建订单命令处理器
class CreateOrderCommandHandler implements CommandHandler
{
    private $eventStore;

    public function __construct(EventStore $eventStore)
    {
        $this->eventStore = $eventStore;
    }

    public function handle(Command $command)
    {
        if (!($command instanceof CreateOrderCommand)) {
            throw new InvalidArgumentException("Invalid command type.");
        }

        $orderId = $command->aggregateId;
        $customerId = $command->customerId;
        $orderItems = $command->orderItems;

        // 创建订单
        $order = Order::create($orderId, $customerId, $orderItems);

        // 将事件存储到 Event Store
        $events = []; // 假设Order::create会返回生成的事件
        $reflection = new ReflectionClass(Order::class);
        $constructor = $reflection->getConstructor();
        $parameters = $constructor->getParameters();

        $orderId = $command->aggregateId;
        $customerId = $command->customerId;
        $orderItems = $command->orderItems;

        $event = new OrderCreatedEvent($orderId, $customerId, $orderItems);
        $this->eventStore->append($orderId, $event);

        return $order;
    }
}

// 定义一个查询基类
abstract class Query
{
}

// 定义一个根据 ID 获取订单查询
class GetOrderByIdQuery extends Query
{
    public $orderId;

    public function __construct(string $orderId)
    {
        $this->orderId = $orderId;
    }
}

// 定义一个查询处理器接口
interface QueryHandler
{
    public function handle(Query $query);
}

// 定义一个根据 ID 获取订单查询处理器
class GetOrderByIdQueryHandler implements QueryHandler
{
    private $readModel;

    public function __construct(ReadModel $readModel)
    {
        $this->readModel = $readModel;
    }

    public function handle(Query $query)
    {
        if (!($query instanceof GetOrderByIdQuery)) {
            throw new InvalidArgumentException("Invalid query type.");
        }

        $orderId = $query->orderId;

        // 从读模型中获取订单
        $order = $this->readModel->getOrderById($orderId);

        return $order;
    }
}

// 定义一个 EventStore 接口
interface EventStore {
    public function append(string $aggregateId, Event $event): void;
    public function getEventsForAggregate(string $aggregateId): array;
}

// 示例:使用 CQRS 创建订单
$orderId = uniqid();
$customerId = 'user123';
$orderItems = [
    ['productId' => 'product1', 'quantity' => 2],
    ['productId' => 'product2', 'quantity' => 1],
];

// 初始化 EventStore (这里只是一个简单的示例,实际应用中需要使用具体的 EventStore 实现)
class InMemoryEventStore implements EventStore {
    private $events = [];

    public function append(string $aggregateId, Event $event): void {
        if (!isset($this->events[$aggregateId])) {
            $this->events[$aggregateId] = [];
        }
        $this->events[$aggregateId][] = $event;
    }

    public function getEventsForAggregate(string $aggregateId): array {
        return $this->events[$aggregateId] ?? [];
    }
}

$eventStore = new InMemoryEventStore();

// 创建命令
$createOrderCommand = new CreateOrderCommand($orderId, $customerId, $orderItems);

// 创建命令处理器
$createOrderCommandHandler = new CreateOrderCommandHandler($eventStore);

// 处理命令
$order = $createOrderCommandHandler->handle($createOrderCommand);

echo "Order ID: " . $order->getId() . "n";
echo "Order Status: " . $order->getStatus() . "n";

// 初始化 ReadModel (这里只是一个简单的示例,实际应用中需要使用具体的 ReadModel 实现)
class InMemoryReadModel {
    private $orders = [];

    public function __construct(EventStore $eventStore) {
        // 初始化 ReadModel,从 EventStore 中重建数据
        $allOrders = [];
        foreach ($eventStore->events as $aggregateId => $events) {
            $allOrders[$aggregateId] = Order::reconstituteFromHistory($events);
        }
        $this->orders = $allOrders;
    }

    public function getOrderById(string $orderId): ?Order {
        return $this->orders[$orderId] ?? null;
    }
}

$readModel = new InMemoryReadModel($eventStore);

// 创建查询
$getOrderByIdQuery = new GetOrderByIdQuery($orderId);

// 创建查询处理器
$getOrderByIdQueryHandler = new GetOrderByIdQueryHandler($readModel);

// 处理查询
$order = $getOrderByIdQueryHandler->handle($getOrderByIdQuery);

echo "Retrieved Order ID: " . $order->getId() . "n";
echo "Retrieved Order Status: " . $order->getStatus() . "n";

?>

代码解释:

  • Command 类是所有命令的基类,包含聚合 ID。
  • CreateOrderCommand 类代表创建订单命令,包含客户 ID 和订单项。
  • CommandHandler 接口定义了命令处理器的接口。
  • CreateOrderCommandHandler 类处理创建订单命令,调用 Order::create 方法创建订单,并将事件存储到 Event Store。
  • Query 类是所有查询的基类。
  • GetOrderByIdQuery 类代表根据 ID 获取订单查询,包含订单 ID。
  • QueryHandler 接口定义了查询处理器的接口。
  • GetOrderByIdQueryHandler 类处理根据 ID 获取订单查询,从读模型中获取订单。
  • ReadModel 类是读模型,专门为读操作设计的数据存储。

2.4 Read Model 的构建

Read Model 的构建是一个关键环节。可以根据不同的查询需求,创建不同的 Read Model。例如,可以创建一个包含订单基本信息的 Read Model,用于显示订单列表;还可以创建一个包含订单详细信息的 Read Model,用于显示订单详情。

Read Model 的构建通常通过订阅 Event Store 中的事件来实现。当有新的事件产生时,Read Model 会更新自身的数据。这就像一个默默无闻的工人,时刻关注着 Event Store 的动态,并及时更新数据。

第三部分:Event Sourcing + CQRS:珠联璧合

当 Event Sourcing 和 CQRS 结合起来,就能发挥出更大的威力。Event Sourcing 负责记录所有状态的改变,CQRS 负责分离读和写操作。

3.1 如何结合使用?

  1. 命令处理: 接收命令,执行业务逻辑,产生事件,并将事件存储到 Event Store。
  2. 事件存储: 将所有事件存储到 Event Store 中。
  3. 读模型构建: 订阅 Event Store 中的事件,构建和更新 Read Model。
  4. 查询处理: 接收查询,从 Read Model 中读取数据,并返回结果。

3.2 优势

  • 可审计性: Event Sourcing 保证了所有状态的改变都有记录,可以轻松追溯历史。
  • 可扩展性: CQRS 允许独立地扩展读和写部分,以满足不同的需求。
  • 性能优化: CQRS 可以针对读和写操作分别进行优化。
  • 灵活性: 可以根据不同的查询需求,创建不同的 Read Model。

3.3 示例架构图

[客户端] --> [命令总线] --> [命令处理器] --> [领域模型] --> [事件] --> [事件总线] --> [事件存储]
                                                                        |
                                                                        V
                                                                   [投影器] --> [读模型] --> [查询总线] --> [查询处理器] --> [客户端]

第四部分:注意事项

  • 复杂性: Event Sourcing 和 CQRS 增加了系统的复杂性,需要仔细权衡。
  • 最终一致性: Read Model 的更新是异步的,可能存在最终一致性问题。需要考虑如何处理最终一致性问题。
  • 事件版本控制: 当事件的结构发生改变时,需要进行版本控制,以保证历史事件能够正确地被处理。
  • 事件溯源数据库的选择: 选择适合的事件溯源数据库,充分考量性能和可扩展性需求。

第五部分:总结

Event Sourcing 和 CQRS 是一种强大的架构模式,可以打造可审计、可扩展的系统。虽然它们增加了系统的复杂性,但带来的好处也是显而易见的。

老码的建议:

  • 从小处着手,逐步引入 Event Sourcing 和 CQRS。
  • 选择合适的 Event Store 和框架。
  • 深入理解业务逻辑,合理地划分聚合和事件。
  • 关注最终一致性问题,并采取相应的措施。

希望今天的分享对大家有所帮助。 记住,代码不是死的,关键在于理解背后的思想。 实践出真知,赶紧动手试试吧! 咱们下期再见!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注