CQRS与Event Sourcing在PHP中的实现

好的,各位观众,各位朋友,欢迎来到“PHP架构大冒险”特别节目!今天,我们要聊一个听起来高大上,但其实可以很接地气的话题:CQRS(Command Query Responsibility Segregation)与 Event Sourcing 在 PHP 中的实现。

准备好了吗?让我们系好安全带,开启一段充满乐趣和挑战的旅程吧!🚀

第一幕:CQRS,你是谁?从“一股脑”到“各司其职”

想象一下,你是一家餐厅的老板,每天要做的事情堆积如山:

  • 接听顾客的订餐电话
  • 记录顾客的特殊要求(比如不要香菜!😱)
  • 安排厨师准备食材
  • 查看库存,确保食材充足
  • 处理顾客的投诉(希望没有太多!🙏)
  • ……

如果所有的事情都由你一个人负责,那肯定会手忙脚乱,效率低下。这就是传统的“一股脑”式系统,读写操作混杂在一起,性能瓶颈和代码混乱是家常便饭。

CQRS就像是给餐厅引入了专业的管理团队:

  • Command(命令): 负责修改数据,比如接受订单、更新库存。
  • Query(查询): 负责读取数据,比如查看菜单、查询订单状态。

简单来说,CQRS 将读写操作分离到不同的模型中,让它们各司其职,互不干扰。

用PHP代码来解释一下:

// 传统模式:读写混杂
class ProductRepository {
    public function getProduct($id) {
        // 从数据库读取产品信息
        // ...
    }

    public function updateProductPrice($id, $newPrice) {
        // 更新数据库中的产品价格
        // ...
    }
}

// CQRS模式:读写分离
// Command Model
class UpdateProductPriceCommand {
    public $productId;
    public $newPrice;

    public function __construct($productId, $newPrice) {
        $this->productId = $productId;
        $this->newPrice = $newPrice;
    }
}

interface CommandHandler {
    public function handle(object $command): void;
}

class UpdateProductPriceCommandHandler implements CommandHandler {
    private $productRepository;

    public function __construct(ProductRepository $productRepository) {
        $this->productRepository = $productRepository;
    }

    public function handle(object $command): void {
        if (!$command instanceof UpdateProductPriceCommand) {
            throw new InvalidArgumentException('Invalid command type.');
        }

        $this->productRepository->updateProductPrice($command->productId, $command->newPrice);
    }
}

// Query Model
class ProductReadModel {
    public function getProduct($id) {
        // 从优化的只读数据库读取产品信息
        // ...
    }
}

可以看到,UpdateProductPriceCommand 只是一个简单的数据载体,而 UpdateProductPriceCommandHandler 负责实际的业务逻辑。ProductReadModel 则专注于高效的读取操作。

CQRS的优点:

  • 性能提升: 读写操作可以针对性地进行优化,例如读模型可以使用缓存,写模型可以使用更严格的事务控制。
  • 可扩展性: 读写模型可以独立扩展,应对不同的负载压力。
  • 代码清晰: 职责分离,代码结构更清晰,易于维护。
  • 灵活性: 可以针对不同的业务场景选择不同的数据存储方式。

CQRS的缺点:

  • 复杂性增加: 需要维护更多的代码和模型。
  • 数据一致性挑战: 读写分离可能导致数据短暂的不一致,需要采用合适的策略来解决。

第二幕:Event Sourcing,时间的魔法师

想象一下,你是一名侦探,要调查一桩银行账户资金变动的案件。你会怎么做?

  • 传统做法: 直接查看账户的当前余额。但你只能知道最终结果,无法了解资金变动的过程。
  • Event Sourcing做法: 查看银行的交易日志,记录了每一笔存款、取款、转账等事件。你可以还原账户的完整历史,找到真相!

Event Sourcing 不是直接存储数据的当前状态,而是存储一系列的事件(Event),通过重放这些事件来还原数据的状态。

用PHP代码来解释一下:

// Event
interface Event {
    public function getAggregateId(): string;
}

class AccountCreatedEvent implements Event {
    private $accountId;
    private $initialBalance;

    public function __construct(string $accountId, float $initialBalance) {
        $this->accountId = $accountId;
        $this->initialBalance = $initialBalance;
    }

    public function getAggregateId(): string {
        return $this->accountId;
    }

    public function getInitialBalance(): float {
        return $this->initialBalance;
    }
}

class MoneyDepositedEvent implements Event {
    private $accountId;
    private $amount;

    public function __construct(string $accountId, float $amount) {
        $this->accountId = $accountId;
        $this->amount = $amount;
    }

    public function getAggregateId(): string {
        return $this->accountId;
    }

    public function getAmount(): float {
        return $this->amount;
    }
}

// Aggregate Root
class Account {
    private $accountId;
    private $balance;
    private $events = [];

    public function __construct(string $accountId, float $initialBalance) {
        $this->accountId = $accountId;
        $this->balance = $initialBalance;

        $this->recordEvent(new AccountCreatedEvent($accountId, $initialBalance));
    }

    public function deposit(float $amount): void {
        $this->balance += $amount;
        $this->recordEvent(new MoneyDepositedEvent($this->accountId, $amount));
    }

    private function recordEvent(Event $event): void {
        $this->events[] = $event;
    }

    public function getEvents(): array {
        return $this->events;
    }

    public function getBalance(): float {
        return $this->balance;
    }
}

// Event Store
interface EventStore {
    public function append(Event $event): void;
    public function getEventsForAggregate(string $aggregateId): array;
}

class InMemoryEventStore implements EventStore {
    private $events = [];

    public function append(Event $event): void {
        $this->events[] = $event;
    }

    public function getEventsForAggregate(string $aggregateId): array {
        return array_filter($this->events, function (Event $event) use ($aggregateId) {
            return $event->getAggregateId() === $aggregateId;
        });
    }
}

// Rehydrate Account from Events
$eventStore = new InMemoryEventStore();
$accountId = uniqid();
$initialBalance = 100.00;

$account = new Account($accountId, $initialBalance);
$account->deposit(50.00);

foreach ($account->getEvents() as $event) {
    $eventStore->append($event);
}

// Rebuild the account state
$events = $eventStore->getEventsForAggregate($accountId);
$rehydratedAccount = new Account($accountId, 0); // Start from zero, events will update the balance
$reflection = new ReflectionClass($rehydratedAccount);
$balanceProperty = $reflection->getProperty('balance');
$balanceProperty->setAccessible(true);
$balanceProperty->setValue($rehydratedAccount, 0); // Reset to zero

foreach ($events as $event) {
    if ($event instanceof AccountCreatedEvent) {
        $balanceProperty->setValue($rehydratedAccount, $event->getInitialBalance());
    } elseif ($event instanceof MoneyDepositedEvent) {
        $balanceProperty->setValue($rehydratedAccount, $balanceProperty->getValue($rehydratedAccount) + $event->getAmount());
    }
}

echo "Rehydrated Account Balance: " . $rehydratedAccount->getBalance() . PHP_EOL; // Output: 150

在这个例子中,AccountCreatedEventMoneyDepositedEvent 记录了账户的创建和存款事件。Account 是聚合根(Aggregate Root),负责处理业务逻辑和记录事件。EventStore 负责存储和读取事件。

Event Sourcing的优点:

  • 完整历史记录: 可以追溯数据的每一个状态变化,方便审计和调试。
  • 可扩展性: 可以轻松地构建新的读模型,满足不同的查询需求。
  • 领域驱动设计(DDD): 与DDD的思想高度契合,可以更好地表达业务逻辑。
  • 时间旅行: 可以回到过去的状态,进行分析和模拟。

Event Sourcing的缺点:

  • 复杂性增加: 需要维护事件存储和事件处理逻辑。
  • 事件溯源的最终一致性: 从事件重构状态需要时间,可能导致短暂的不一致。
  • 事件的版本控制: 事件结构可能会发生变化,需要考虑如何处理旧版本的事件。

第三幕:CQRS + Event Sourcing,珠联璧合,天下无敌?

CQRS和Event Sourcing就像一对天作之合的搭档,它们可以一起解决复杂系统中的各种问题。

  • CQRS负责分离读写操作,提高性能和可扩展性。
  • Event Sourcing负责记录数据的完整历史,提供审计和分析能力。

将它们结合起来,可以构建一个高度灵活、可扩展、可维护的系统。

一个简单的例子:电商订单系统

  1. Command Model: 负责处理订单创建、支付、发货等命令。
  2. Event Sourcing: 记录订单创建、支付、发货等事件。
  3. Query Model: 负责提供订单列表、订单详情等查询接口。

当用户创建一个订单时,会触发一个 CreateOrderCommand,该命令会被相应的 CommandHandler 处理,并生成一个 OrderCreatedEvent。该事件会被存储到 EventStore 中。

同时,OrderCreatedEvent 也会被发送到其他的订阅者(Subscriber),例如:

  • 库存服务: 减少相应的商品库存。
  • 物流服务: 创建物流订单。
  • 通知服务: 发送订单创建成功的通知。

当用户查询订单列表时,Query Model 会从优化的只读数据库中读取数据,该数据库的数据可以通过订阅事件来实时更新。

PHP代码示例(简化版):

// Event
interface Event {
    public function getName(): string;
    public function getPayload(): array;
}

class OrderCreatedEvent implements Event {
    private $orderId;
    private $customerId;
    private $items;

    public function __construct(string $orderId, string $customerId, array $items) {
        $this->orderId = $orderId;
        $this->customerId = $customerId;
        $this->items = $items;
    }

    public function getName(): string {
        return 'order.created';
    }

    public function getPayload(): array {
        return [
            'orderId' => $this->orderId,
            'customerId' => $this->customerId,
            'items' => $this->items,
        ];
    }
}

// Command
class CreateOrderCommand {
    public $customerId;
    public $items;

    public function __construct(string $customerId, array $items) {
        $this->customerId = $customerId;
        $this->items = $items;
    }
}

// Command Handler
class CreateOrderCommandHandler implements CommandHandler {
    private $eventStore;
    private $eventBus;

    public function __construct(EventStore $eventStore, EventBus $eventBus) {
        $this->eventStore = $eventStore;
        $this->eventBus = $eventBus;
    }

    public function handle(object $command): void {
        if (!$command instanceof CreateOrderCommand) {
            throw new InvalidArgumentException('Invalid command type.');
        }

        $orderId = uniqid();
        $event = new OrderCreatedEvent($orderId, $command->customerId, $command->items);

        $this->eventStore->append($event);
        $this->eventBus->publish($event);
    }
}

// Event Bus
interface EventBus {
    public function publish(Event $event): void;
    public function subscribe(string $eventName, callable $handler): void;
}

class InMemoryEventBus implements EventBus {
    private $subscriptions = [];

    public function publish(Event $event): void {
        $eventName = $event->getName();
        if (isset($this->subscriptions[$eventName])) {
            foreach ($this->subscriptions[$eventName] as $handler) {
                $handler($event);
            }
        }
    }

    public function subscribe(string $eventName, callable $handler): void {
        if (!isset($this->subscriptions[$eventName])) {
            $this->subscriptions[$eventName] = [];
        }
        $this->subscriptions[$eventName][] = $handler;
    }
}

// Subscriber (Example: Inventory Service)
class InventoryService {
    public function __construct(ProductRepository $productRepository) {
        $this->productRepository = $productRepository;
    }

    public function handleOrderCreatedEvent(OrderCreatedEvent $event): void {
        $payload = $event->getPayload();
        foreach ($payload['items'] as $item) {
            $this->productRepository->decreaseStock($item['productId'], $item['quantity']);
        }
    }
}

// Query Model
class OrderReadModel {
    public function getOrder($orderId) {
        // 从优化的只读数据库读取订单信息
        // ...
    }
}

// Usage
$eventStore = new InMemoryEventStore();
$eventBus = new InMemoryEventBus();
$productRepository = new ProductRepository(); // Mock ProductRepository
$inventoryService = new InventoryService($productRepository);
$eventBus->subscribe('order.created', [$inventoryService, 'handleOrderCreatedEvent']);

$commandHandler = new CreateOrderCommandHandler($eventStore, $eventBus);
$command = new CreateOrderCommand('customer123', [
    ['productId' => 'product1', 'quantity' => 2],
    ['productId' => 'product2', 'quantity' => 1],
]);

$commandHandler->handle($command);

// In a real application, you would have a command bus and query bus
// to decouple the command and query handling from the application layer.

这个例子展示了CQRS和Event Sourcing如何协同工作,构建一个可扩展的电商订单系统。 当然,这只是一个非常简化的示例,实际应用中还需要考虑更多的细节,例如:

  • 事件的版本控制
  • 数据一致性
  • 错误处理
  • 性能优化

第四幕:PHP框架与库的加持

在PHP中实现CQRS和Event Sourcing,可以使用一些优秀的框架和库来简化开发:

  • Tactician: 一个轻量级的命令总线库,可以方便地实现命令的派发和处理。
  • Prooph: 一个完整的Event Sourcing解决方案,提供了事件存储、事件总线、聚合根等组件。
  • Laravel Event Sourcing: 一个基于Laravel的Event Sourcing包,简化了在Laravel项目中应用Event Sourcing的流程。
  • Symfony Messenger: Symfony的组件,可用于实现命令总线和事件总线。

这些框架和库可以帮助你快速搭建CQRS和Event Sourcing的基础架构,减少重复劳动,专注于业务逻辑的实现。

第五幕:总结与展望

CQRS和Event Sourcing是强大的架构模式,可以解决复杂系统中的各种问题。但它们也并非银弹,需要根据具体的业务场景进行选择和权衡。

  • 如果你的系统读写操作比例悬殊,或者需要更高的可扩展性和灵活性,那么CQRS可能是一个不错的选择。
  • 如果你的系统需要完整的历史记录、审计能力,或者与领域驱动设计高度契合,那么Event Sourcing可能更适合你。
  • 如果你的系统既需要高性能和可扩展性,又需要完整的历史记录,那么CQRS和Event Sourcing的结合可能是最佳方案。

希望今天的节目能够帮助你更好地理解CQRS和Event Sourcing,并在实际项目中灵活运用。记住,没有最好的架构,只有最适合的架构。

感谢大家的收看,我们下期再见! 👋

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注