PHP 消息队列 (Kafka/RabbitMQ) 与 `Event Sourcing` (事件溯源)

嘿,大家好!今天咱们来聊聊PHP里消息队列和Event Sourcing这两位“好基友”。它们经常一起出现,但各自又身怀绝技。咱们争取用最接地气的方式,把它们扒个精光,让大家以后遇到它们不再发怵。

开场白:故事的开始

想象一下,你正在开发一个电商网站。用户下单后,你需要做的事情可多了:

  • 扣减库存
  • 生成订单
  • 发送短信通知
  • 更新用户积分
  • 通知仓库发货

如果这些事情都同步执行,用户得等到猴年马月才能看到“下单成功”的页面。而且,一旦某个环节出了问题,整个下单流程就可能失败,用户体验直线下降。

这时候,消息队列就派上用场了。它可以把这些“繁琐”的任务放到队列里,让它们异步执行,用户下单后,只需要快速生成订单,剩下的事情交给消息队列慢慢处理。

第一位主角:消息队列(Message Queue)

消息队列就像一个“中转站”,负责接收、存储和转发消息。生产者(Producer)将消息发送到队列,消费者(Consumer)从队列中取出消息进行处理。

常见的消息队列:Kafka vs RabbitMQ

PHP程序员最常用的消息队列莫过于Kafka和RabbitMQ了,它们各有千秋:

特性 Kafka RabbitMQ
定位 分布式流处理平台 消息中间件
吞吐量 非常高,适合海量数据 相对较低,但仍然很高
持久化 强制持久化,消息不会丢失 可选持久化,可以配置消息是否持久化
消息模型 发布/订阅 支持多种消息模型(发布/订阅、点对点等)
适用场景 日志收集、实时数据分析、事件驱动架构等 异步任务处理、系统解耦等

PHP中使用消息队列的代码示例

这里以RabbitMQ为例,演示如何在PHP中使用消息队列:

  1. 安装RabbitMQ扩展

    pecl install amqp
  2. 生产者代码(Producer)

    <?php
    
    $exchange = 'my_exchange';
    $queue = 'my_queue';
    $routeKey = 'my_route_key';
    
    $connection = new AMQPConnection([
        'host' => 'localhost',
        'port' => 5672,
        'vhost' => '/',
        'login' => 'guest',
        'password' => 'guest'
    ]);
    
    try {
        $connection->connect();
    } catch (AMQPConnectionException $e) {
        die("Cannot connect to the broker: " . $e->getMessage());
    }
    
    $channel = new AMQPChannel($connection);
    
    $ex = new AMQPExchange($channel);
    $ex->setName($exchange);
    $ex->setType(AMQP_EX_TYPE_DIRECT); //direct, fanout, topic, headers
    $ex->declareExchange();
    
    $q = new AMQPQueue($channel);
    $q->setName($queue);
    $q->declareQueue();
    $q->bind($exchange, $routeKey);
    
    $message = "Hello, RabbitMQ!";
    
    $ex->publish($message, $routeKey);
    
    echo " [x] Sent '$message'n";
    
    $connection->disconnect();
    ?>
  3. 消费者代码(Consumer)

    <?php
    
    $exchange = 'my_exchange';
    $queue = 'my_queue';
    $routeKey = 'my_route_key';
    
    $connection = new AMQPConnection([
        'host' => 'localhost',
        'port' => 5672,
        'vhost' => '/',
        'login' => 'guest',
        'password' => 'guest'
    ]);
    
    try {
        $connection->connect();
    } catch (AMQPConnectionException $e) {
        die("Cannot connect to the broker: " . $e->getMessage());
    }
    
    $channel = new AMQPChannel($connection);
    
    $q = new AMQPQueue($channel);
    $q->setName($queue);
    $q->declareQueue();
    $q->bind($exchange, $routeKey);
    
    echo " [*] Waiting for messages. To exit press CTRL+Cn";
    
    $q->consume(function (AMQPEnvelope $envelope, AMQPQueue $q) {
        $msg = $envelope->getBody();
        echo " [x] Received '" . $msg . "'n";
        // process the message here
        // 在这里处理消息
        $q->ack($envelope->getDeliveryTag());
    });
    
    $connection->disconnect();
    ?>

这段代码演示了如何使用RabbitMQ的Direct Exchange模式,生产者将消息发送到指定的Exchange,Exchange根据Routing Key将消息路由到对应的Queue,消费者从Queue中取出消息进行处理。

第二位主角:Event Sourcing(事件溯源)

Event Sourcing 是一种数据持久化模式,它不直接保存对象的状态,而是保存导致状态变更的事件序列。换句话说,它就像一个“时光机”,记录了对象从诞生到现在的每一个“足迹”。

Event Sourcing 的好处

  • 完整的审计日志: 可以追溯对象的每一个状态变更,方便审计和调试。
  • 时间旅行: 可以回溯到对象的任意历史状态,用于分析和恢复数据。
  • 更好的可扩展性: 可以根据事件进行异步处理,提高系统的响应速度。
  • 领域驱动设计(DDD): 与DDD结合使用,可以更好地表达业务逻辑。

Event Sourcing 的代码示例

咱们用一个简单的Account(账户)类来演示Event Sourcing:

  1. 定义事件接口

    <?php
    
    interface Event
    {
        public function getAggregateId(): string;
        public function getType(): string;
        public function getPayload(): array;
        public function getCreatedAt(): DateTimeImmutable;
    }
    
  2. 定义抽象事件类

    <?php
    
    use DateTimeImmutable;
    use RamseyUuidUuid;
    
    abstract class AbstractEvent implements Event
    {
        protected string $aggregateId;
        protected string $type;
        protected array $payload;
        protected DateTimeImmutable $createdAt;
    
        public function __construct(string $aggregateId, array $payload = [])
        {
            $this->aggregateId = $aggregateId;
            $this->type = (new ReflectionClass($this))->getShortName(); // e.g., 'AccountCreated'
            $this->payload = $payload;
            $this->createdAt = new DateTimeImmutable();
        }
    
        public function getAggregateId(): string
        {
            return $this->aggregateId;
        }
    
        public function getType(): string
        {
            return $this->type;
        }
    
        public function getPayload(): array
        {
            return $this->payload;
        }
    
        public function getCreatedAt(): DateTimeImmutable
        {
            return $this->createdAt;
        }
    }
  3. 定义账户创建事件

    <?php
    
    class AccountCreated extends AbstractEvent
    {
        public function __construct(string $aggregateId, string $ownerName)
        {
            parent::__construct($aggregateId, ['ownerName' => $ownerName]);
        }
    
        public function getOwnerName(): string
        {
            return $this->payload['ownerName'];
        }
    }
  4. 定义存款事件

    <?php
    
    class MoneyDeposited extends AbstractEvent
    {
        public function __construct(string $aggregateId, float $amount)
        {
            parent::__construct($aggregateId, ['amount' => $amount]);
        }
    
        public function getAmount(): float
        {
            return $this->payload['amount'];
        }
    }
  5. 定义账户类

    <?php
    
    class Account
    {
        private string $id;
        private string $ownerName;
        private float $balance;
    
        public function __construct(string $id, string $ownerName)
        {
            $this->id = $id;
            $this->ownerName = $ownerName;
            $this->balance = 0.0;
        }
    
        public static function create(string $id, string $ownerName): self
        {
            return new self($id, $ownerName);
        }
    
        public function deposit(float $amount): void
        {
            if ($amount <= 0) {
                throw new InvalidArgumentException("Deposit amount must be positive.");
            }
            $this->balance += $amount;
        }
    
        public function getId(): string
        {
            return $this->id;
        }
    
        public function getOwnerName(): string
        {
            return $this->ownerName;
        }
    
        public function getBalance(): float
        {
            return $this->balance;
        }
    
        public function apply(Event $event): void
        {
            // Here, you'd use a switch statement or a similar mechanism
            // to apply the event to the Account's state.  This is
            // where you "rehydrate" the Account.
    
            switch ($event->getType()) {
                case 'AccountCreated':
                    // No need to do anything as the Account is already created in constructor
                    break;
                case 'MoneyDeposited':
                    $this->deposit($event->getPayload()['amount']);
                    break;
    
                // Add more cases as needed
            }
        }
    
        public static function reconstituteFromEvents(string $id, iterable $events): self
        {
            $account = null;
            foreach ($events as $event) {
                 if ($event->getType() === 'AccountCreated') {
                    $account = new self($event->getAggregateId(), $event->getPayload()['ownerName']);
                 } else if ($account) {
                    $account->apply($event);
                 }
            }
    
            if (!$account) {
                throw new Exception("No AccountCreated event found for ID: " . $id);
            }
    
            return $account;
        }
    }
  6. 事件存储接口

    <?php
    
    interface EventStore
    {
        public function append(Event $event): void;
        public function getEventsForAggregate(string $aggregateId): array;
    }
  7. 简单的内存事件存储实现

    <?php
    class InMemoryEventStore implements EventStore
    {
        private array $events = [];
    
        public function append(Event $event): void
        {
            $this->events[] = $event;
        }
    
        public function getEventsForAggregate(string $aggregateId): array
        {
            return array_filter($this->events, function (Event $event) use ($aggregateId) {
                return $event->getAggregateId() === $aggregateId;
            });
        }
    }
  8. 使用示例

    <?php
    
    use RamseyUuidUuid;
    
    // Generate a unique ID for the account
    $accountId = Uuid::uuid4()->toString();
    
    // Create an event store
    $eventStore = new InMemoryEventStore();
    
    // Create a new account
    $account = Account::create($accountId, 'John Doe');
    $event = new AccountCreated($accountId, 'John Doe');
    $eventStore->append($event);
    
    // Deposit some money
    $account->deposit(100);
    $event = new MoneyDeposited($accountId, 100);
    $eventStore->append($event);
    
    // Deposit more money
    $account->deposit(50);
    $event = new MoneyDeposited($accountId, 50);
    $eventStore->append($event);
    
    // Get the account events
    $events = $eventStore->getEventsForAggregate($accountId);
    
    // Reconstitute the account from the events
    $reconstitutedAccount = Account::reconstituteFromEvents($accountId, $events);
    
    // Verify the account balance
    echo "Account balance: " . $reconstitutedAccount->getBalance() . "n"; // Output: Account balance: 150

这个例子演示了如何使用Event Sourcing来创建一个账户,并记录账户的存款事件。通过Account::reconstituteFromEvents 方法,可以从事件序列中恢复账户的状态。

消息队列 + Event Sourcing = 绝配

消息队列和Event Sourcing结合起来,可以构建更加灵活、可扩展的系统。例如:

  • 事件发布: 当有新的事件产生时,将事件发布到消息队列。
  • 异步处理: 消费者从消息队列中取出事件,进行异步处理,例如更新Read Model。

示例场景:更新Read Model

假设我们需要创建一个“账户余额视图”,用于快速查询用户的账户余额。我们可以通过以下步骤实现:

  1. 定义Read Model

    <?php
    
    class AccountBalanceView
    {
        private string $accountId;
        private string $ownerName;
        private float $balance;
    
        public function __construct(string $accountId, string $ownerName, float $balance)
        {
            $this->accountId = $accountId;
            $this->ownerName = $ownerName;
            $this->balance = $balance;
        }
    
        public function getAccountId(): string
        {
            return $this->accountId;
        }
    
        public function getOwnerName(): string
        {
            return $this->ownerName;
        }
    
        public function getBalance(): float
        {
            return $this->balance;
        }
    }
  2. 定义Read Model存储接口

    <?php
    
    interface AccountBalanceViewRepository
    {
        public function find(string $accountId): ?AccountBalanceView;
        public function save(AccountBalanceView $accountBalanceView): void;
    }
  3. 简单的内存Read Model存储实现

    <?php
    
    class InMemoryAccountBalanceViewRepository implements AccountBalanceViewRepository
    {
        private array $views = [];
    
        public function find(string $accountId): ?AccountBalanceView
        {
            return $this->views[$accountId] ?? null;
        }
    
        public function save(AccountBalanceView $accountBalanceView): void
        {
            $this->views[$accountBalanceView->getAccountId()] = $accountBalanceView;
        }
    }
  4. 消费者代码(更新Read Model)

    <?php
    
    // 假设我们从消息队列中接收到事件
    
    function handleEvent(Event $event, AccountBalanceViewRepository $repository): void
    {
        $accountId = $event->getAggregateId();
    
        $view = $repository->find($accountId);
    
        switch ($event->getType()) {
            case 'AccountCreated':
                $view = new AccountBalanceView(
                    $accountId,
                    $event->getPayload()['ownerName'],
                    0.0
                );
                $repository->save($view);
                break;
            case 'MoneyDeposited':
                if ($view) {
                    $currentBalance = $view->getBalance();
                    $newBalance = $currentBalance + $event->getPayload()['amount'];
                    $view = new AccountBalanceView(
                        $accountId,
                        $view->getOwnerName(),
                        $newBalance
                    );
                    $repository->save($view);
                }
                break;
            // 其他事件处理...
        }
    }
    
    // 使用示例
    $repository = new InMemoryAccountBalanceViewRepository();
    $eventStore = new InMemoryEventStore();
    
    // Replay events to rebuild the read model (This could be in a separate process/service)
    foreach ($eventStore->getEventsForAggregate($accountId) as $event) {
        handleEvent($event, $repository);
    }
    
    // Get the account balance view
    $accountBalanceView = $repository->find($accountId);
    
    if ($accountBalanceView) {
        echo "Account balance: " . $accountBalanceView->getBalance() . "n";
    } else {
        echo "Account not found.n";
    }
    

这个例子演示了如何使用消息队列和Event Sourcing来更新Read Model。当有新的事件产生时,将事件发布到消息队列,消费者从消息队列中取出事件,更新Read Model。这样,我们就可以快速查询用户的账户余额,而无需每次都从事件序列中重新计算。

总结

消息队列和Event Sourcing是构建现代、可扩展系统的利器。它们可以解耦系统、提高响应速度、提供审计日志、支持时间旅行。当然,使用它们也会增加系统的复杂性,需要权衡利弊。

希望今天的分享能帮助大家更好地理解消息队列和Event Sourcing,并在实际项目中灵活运用它们。谢谢大家!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注