PHP `CQRS` `Command Bus` 与 `Query Bus` 的实现

各位观众老爷们,今天咱们唠唠嗑,说说PHP里怎么玩转CQRS,让你的代码瞬间高大上,维护起来也倍儿爽!

开场白:CQRS是个啥?

CQRS,全称Command Query Responsibility Segregation,翻译成人话就是“命令查询职责分离”。 顾名思义,它把咱们的应用程序分成两部分:一部分负责修改数据(Command),另一部分负责查询数据(Query)。 就像餐厅一样,点菜(Command)和上菜(Query)是两个完全不同的流程,分开处理效率更高。

为什么要搞CQRS?

  • 性能优化: 查询和修改的数据模型往往不一样。比如,展示商品列表,可能只需要商品名称、价格和缩略图,而修改商品信息,则需要更多的字段。CQRS允许针对查询和修改分别优化数据模型,提高性能。
  • 复杂度降低: 将读写操作分离,可以简化业务逻辑,降低代码复杂度,提高可维护性。
  • 伸缩性增强: 可以针对读写操作分别进行扩展,例如,增加查询服务器的数量来应对更高的查询负载。
  • 更好的安全性: 可以对命令进行更严格的权限控制,防止非法修改数据。

核心概念:Command Bus & Query Bus

CQRS的核心是Command Bus和Query Bus,它们是连接命令和命令处理器、查询和查询处理器的桥梁。

  • Command Bus: 负责接收命令,并将命令分发给相应的命令处理器。
  • Query Bus: 负责接收查询,并将查询分发给相应的查询处理器。

PHP实现CQRS:撸起袖子开干

咱们一步一步来,先定义一些接口和类,然后用实例演示。

1. 定义接口

<?php

namespace AppCQRS;

// 命令接口
interface CommandInterface
{
}

// 查询接口
interface QueryInterface
{
}

// 命令处理器接口
interface CommandHandlerInterface
{
    public function handle(CommandInterface $command);
}

// 查询处理器接口
interface QueryHandlerInterface
{
    public function handle(QueryInterface $query);
}

// Command Bus 接口
interface CommandBusInterface
{
    public function handle(CommandInterface $command);
}

// Query Bus 接口
interface QueryBusInterface
{
    public function handle(QueryInterface $query);
}

2. 实现Command和Query的基类

<?php
namespace AppCQRS;

abstract class Command implements CommandInterface {}
abstract class Query implements QueryInterface {}

3. 命令和查询示例

<?php

namespace AppCommand;

use AppCQRSCommand;

// 创建用户的命令
class CreateUserCommand extends Command
{
    public string $name;
    public string $email;

    public function __construct(string $name, string $email)
    {
        $this->name = $name;
        $this->email = $email;
    }
}
<?php

namespace AppQuery;

use AppCQRSQuery;

// 获取用户的查询
class GetUserByIdQuery extends Query
{
    public int $id;

    public function __construct(int $id)
    {
        $this->id = $id;
    }
}

4. 命令处理器和查询处理器示例

<?php

namespace AppCommandHandler;

use AppCommandCreateUserCommand;
use AppCQRSCommandHandlerInterface;
use AppUserRepository; // 假设有个UserRepository
use Exception;

class CreateUserCommandHandler implements CommandHandlerInterface
{
    private UserRepository $userRepository;

    public function __construct(UserRepository $userRepository)
    {
        $this->userRepository = $userRepository;
    }

    public function handle(object $command): void //这里用object类型,需要做判断,防止传入类型错误
    {
        if (!$command instanceof CreateUserCommand) {
            throw new Exception('Invalid command type.');
        }

        $this->userRepository->createUser($command->name, $command->email);
    }
}
<?php

namespace AppQueryHandler;

use AppQueryGetUserByIdQuery;
use AppCQRSQueryHandlerInterface;
use AppUserRepository; // 假设有个UserRepository
use Exception;

class GetUserByIdQueryHandler implements QueryHandlerInterface
{
    private UserRepository $userRepository;

    public function __construct(UserRepository $userRepository)
    {
        $this->userRepository = $userRepository;
    }

    public function handle(object $query) //这里用object类型,需要做判断,防止传入类型错误
    {
        if (!$query instanceof GetUserByIdQuery) {
             throw new Exception('Invalid query type.');
        }
        return $this->userRepository->getUserById($query->id);
    }
}

5. 实现Command Bus和Query Bus

这里咱们用一个简单的数组来存储处理器,实际项目中可以用依赖注入容器(比如Symfony的Container、Laravel的Service Container)来管理处理器。

<?php

namespace AppCQRS;

class SimpleCommandBus implements CommandBusInterface
{
    private array $handlers = [];

    public function addHandler(string $commandClass, CommandHandlerInterface $handler): void
    {
        $this->handlers[$commandClass] = $handler;
    }

    public function handle(CommandInterface $command): void
    {
        $commandClass = get_class($command);
        if (!isset($this->handlers[$commandClass])) {
            throw new Exception("No handler registered for command: " . $commandClass);
        }

        $handler = $this->handlers[$commandClass];
        $handler->handle($command);
    }
}
<?php

namespace AppCQRS;

class SimpleQueryBus implements QueryBusInterface
{
    private array $handlers = [];

    public function addHandler(string $queryClass, QueryHandlerInterface $handler): void
    {
        $this->handlers[$queryClass] = $handler;
    }

    public function handle(QueryInterface $query)
    {
        $queryClass = get_class($query);
        if (!isset($this->handlers[$queryClass])) {
            throw new Exception("No handler registered for query: " . $queryClass);
        }

        $handler = $this->handlers[$queryClass];
        return $handler->handle($query);
    }
}

6. 使用示例

<?php

use AppCommandCreateUserCommand;
use AppCommandHandlerCreateUserCommandHandler;
use AppQueryGetUserByIdQuery;
use AppQueryHandlerGetUserByIdQueryHandler;
use AppCQRSSimpleCommandBus;
use AppCQRSSimpleQueryBus;
use AppUserRepository; // 假设有个UserRepository

// 初始化
$userRepository = new UserRepository();
$commandBus = new SimpleCommandBus();
$queryBus = new SimpleQueryBus();

// 注册处理器
$commandBus->addHandler(CreateUserCommand::class, new CreateUserCommandHandler($userRepository));
$queryBus->addHandler(GetUserByIdQuery::class, new GetUserByIdQueryHandler($userRepository));

// 创建用户
$createUserCommand = new CreateUserCommand('张三', '[email protected]');
$commandBus->handle($createUserCommand);

// 获取用户
$getUserByIdQuery = new GetUserByIdQuery(1);
$user = $queryBus->handle($getUserByIdQuery);

var_dump($user);

进阶:中间件(Middleware)

Command Bus和Query Bus可以加入中间件,实现一些通用的功能,比如:

  • 日志记录: 记录每个命令和查询的执行情况。
  • 事务管理: 在命令执行前后开启和提交/回滚事务。
  • 权限验证: 验证用户是否有权限执行某个命令或查询。

中间件示例(Command Bus)

<?php

namespace AppMiddleware;

use AppCQRSCommandInterface;
use AppCQRSCommandBusInterface;

interface CommandMiddlewareInterface
{
    public function process(CommandInterface $command, callable $next);
}

class LoggingMiddleware implements CommandMiddlewareInterface
{
    public function process(CommandInterface $command, callable $next)
    {
        echo "Logging command: " . get_class($command) . "n";
        $result = $next($command);
        echo "Command completed.n";
        return $result;
    }
}

class TransactionMiddleware implements CommandMiddlewareInterface
{
    // 假设有个数据库连接
    private $dbConnection;

    public function __construct($dbConnection)
    {
        $this->dbConnection = $dbConnection;
    }

    public function process(CommandInterface $command, callable $next)
    {
        $this->dbConnection->beginTransaction();
        try {
            $result = $next($command);
            $this->dbConnection->commit();
            return $result;
        } catch (Exception $e) {
            $this->dbConnection->rollBack();
            throw $e;
        }
    }
}

class PipelineCommandBus implements CommandBusInterface
{
    private CommandBusInterface $bus;
    private array $middlewares;

    public function __construct(CommandBusInterface $bus, array $middlewares = [])
    {
        $this->bus = $bus;
        $this->middlewares = $middlewares;
    }

    public function handle(CommandInterface $command)
    {
        $pipeline = function (CommandInterface $command) {
            return $this->bus->handle($command);
        };

        foreach (array_reverse($this->middlewares) as $middleware) {
            $pipeline = function (CommandInterface $command) use ($middleware, $pipeline) {
                return $middleware->process($command, $pipeline);
            };
        }

        return $pipeline($command);
    }
}

// 使用示例
$dbConnection = new stdClass(); // 模拟数据库连接
$dbConnection->inTransaction = false;
$dbConnection->beginTransaction = function() use ($dbConnection) {
  $dbConnection->inTransaction = true;
  echo "开始事务n";
};
$dbConnection->commit = function() use ($dbConnection) {
  $dbConnection->inTransaction = false;
  echo "提交事务n";
};
$dbConnection->rollBack = function() use ($dbConnection) {
  $dbConnection->inTransaction = false;
  echo "回滚事务n";
};

$simpleCommandBus = new SimpleCommandBus();
$simpleCommandBus->addHandler(CreateUserCommand::class, new CreateUserCommandHandler(new UserRepository()));

$pipelineCommandBus = new PipelineCommandBus($simpleCommandBus, [
    new LoggingMiddleware(),
    new TransactionMiddleware($dbConnection),
]);

$createUserCommand = new CreateUserCommand('王五', '[email protected]');
$pipelineCommandBus->handle($createUserCommand);

表格总结:关键点一览

组件 职责 实现方式
Command 表示一个操作,用于修改数据 定义一个Command类,包含操作所需的数据。
Query 表示一个查询,用于获取数据 定义一个Query类,包含查询条件。
Command Handler 处理Command,执行相应的操作 定义一个CommandHandler类,实现CommandHandlerInterface接口。
Query Handler 处理Query,返回查询结果 定义一个QueryHandler类,实现QueryHandlerInterface接口。
Command Bus 接收Command,并将Command分发给相应的Handler 可以使用简单的数组存储Handler,也可以使用依赖注入容器。 可以添加中间件,实现日志记录、事务管理等功能。
Query Bus 接收Query,并将Query分发给相应的Handler 可以使用简单的数组存储Handler,也可以使用依赖注入容器。 可以添加中间件,实现缓存、权限验证等功能。
Middleware 在Command/Query处理前后执行一些通用的操作 定义一个Middleware类,实现相应的接口。

注意事项

  • 命令的幂等性: 尽量保证命令的幂等性,即多次执行同一个命令,结果应该相同。
  • 事件溯源(Event Sourcing): CQRS通常与事件溯源一起使用,将所有状态变更都记录为事件,可以通过回放事件来重建状态。
  • 最终一致性: CQRS可能会导致读写分离,需要考虑最终一致性的问题。

总结:CQRS,用起来真香!

CQRS不是银弹,不是所有项目都适合。但是,对于复杂的业务场景,CQRS可以有效地降低复杂度,提高性能和可维护性。 希望今天的分享能让你对PHP的CQRS实现有一个初步的了解。 各位,下课!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注