Java `Event Sourcing` (`Axon Framework`, `EventStore`) 与 `CQRS` 模式深度

大家好,我是今天的主讲人,很高兴能和大家一起聊聊Java领域里两个听起来很高大上,实际上也确实挺有意思的概念:Event Sourcing 和 CQRS。咱们争取用最轻松的方式,把它们掰开了揉碎了,让大家明白它们到底是什么,有什么用,以及怎么在实际项目里用起来。

开场白:故事的开始

咱们先从一个故事开始。假设你是一家银行的CTO,你的系统需要记录每一笔交易。传统的做法是直接更新数据库里的账户余额。但是,如果有一天,审计部门突然要查一笔交易的来龙去脉,或者更可怕的是,数据库崩了,数据丢失了,怎么办?

Event Sourcing就像是给你的银行系统装了一个录像机,它不直接修改账户余额,而是记录每一笔发生的事件(比如存款、取款、转账),这些事件就像录像带一样,完整地记录了系统的每一次状态变化。

CQRS呢,就像银行里分开了柜台和后台。柜台负责快速处理客户的请求(查询余额、转账),而后台负责慢慢地处理更复杂的事情(生成报表、风险控制)。这样可以避免柜台被后台拖慢,提高整体效率。

好,故事讲完了,咱们进入正题。

第一章:Event Sourcing – 记录历史的艺术

  • 什么是Event Sourcing?

Event Sourcing 是一种架构模式,它不直接存储实体的当前状态,而是存储导致实体状态变化的一系列事件。你可以通过回放这些事件来重建实体的任何历史状态。

  • 核心概念

    • 事件 (Event): 表示系统中发生的某种状态变化。例如,AccountCreatedEvent (账户创建事件), MoneyDepositedEvent (存款事件), MoneyWithdrawnEvent (取款事件)。
    • 事件存储 (Event Store): 用于持久化存储事件的仓库。它可以是一个专门的数据库 (例如 EventStoreDB), 也可以是关系型数据库或者 NoSQL 数据库。
    • 聚合 (Aggregate): 一个领域模型概念,代表一组相关联的实体,被视为一个整体。例如,Account (账户) 可以是一个聚合。
    • 聚合根 (Aggregate Root): 聚合的入口点,负责处理命令和发布事件。
    • 命令 (Command): 表示用户意图改变系统状态的请求。例如,CreateAccountCommand (创建账户命令), DepositMoneyCommand (存款命令), WithdrawMoneyCommand (取款命令)。
    • 快照 (Snapshot): 为了加快重建聚合状态的速度,可以定期保存聚合的快照。
  • Event Sourcing 的工作流程

    1. 命令处理: 接收到用户发起的命令,例如 DepositMoneyCommand
    2. 聚合根处理: 聚合根接收到命令,进行业务逻辑处理,并产生一个或多个事件,例如 MoneyDepositedEvent
    3. 事件持久化: 将事件存储到 Event Store 中。
    4. 状态更新: 聚合根应用事件,更新自身状态。
    5. 事件发布: 将事件发布到消息队列,供其他服务消费。
    6. 状态重建: 需要查询聚合状态时,从 Event Store 中读取所有历史事件,并按时间顺序重放,重建聚合状态。
  • 代码示例(使用Axon Framework)

首先,引入Axon Framework的依赖(Maven):

<dependency>
    <groupId>org.axonframework</groupId>
    <artifactId>axon-spring-boot-starter</artifactId>
    <version>4.9.1</version>
</dependency>

定义事件:

import org.axonframework.serialization.Revision;

@Revision("1") // 推荐加上 Revision,方便事件演化
public class AccountCreatedEvent {
    private final String accountId;
    private final String owner;

    public AccountCreatedEvent(String accountId, String owner) {
        this.accountId = accountId;
        this.owner = owner;
    }

    public String getAccountId() {
        return accountId;
    }

    public String getOwner() {
        return owner;
    }
}

@Revision("1")
public class MoneyDepositedEvent {
    private final String accountId;
    private final double amount;

    public MoneyDepositedEvent(String accountId, double amount) {
        this.accountId = accountId;
        this.amount = amount;
    }

    public String getAccountId() {
        return accountId;
    }

    public double getAmount() {
        return amount;
    }
}

定义命令:

import org.axonframework.modelling.command.TargetAggregateIdentifier;

public class CreateAccountCommand {
    @TargetAggregateIdentifier
    private final String accountId;
    private final String owner;

    public CreateAccountCommand(String accountId, String owner) {
        this.accountId = accountId;
        this.owner = owner;
    }

    public String getAccountId() {
        return accountId;
    }

    public String getOwner() {
        return owner;
    }
}

public class DepositMoneyCommand {
    @TargetAggregateIdentifier
    private final String accountId;
    private final double amount;

    public DepositMoneyCommand(String accountId, double amount) {
        this.accountId = accountId;
        this.amount = amount;
    }

    public String getAccountId() {
        return accountId;
    }

    public double getAmount() {
        return amount;
    }
}

定义聚合根:

import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.modelling.command.AggregateIdentifier;
import org.axonframework.spring.stereotype.Aggregate;
import static org.axonframework.modelling.command.AggregateLifecycle.apply;

@Aggregate
public class Account {

    @AggregateIdentifier
    private String accountId;
    private String owner;
    private double balance;

    public Account() {
        // Axon 需要一个无参构造函数
    }

    @CommandHandler
    public Account(CreateAccountCommand command) {
        apply(new AccountCreatedEvent(command.getAccountId(), command.getOwner()));
    }

    @CommandHandler
    public void handle(DepositMoneyCommand command) {
        apply(new MoneyDepositedEvent(command.getAccountId(), command.getAmount()));
    }

    @EventSourcingHandler
    public void on(AccountCreatedEvent event) {
        this.accountId = event.getAccountId();
        this.owner = event.getOwner();
        this.balance = 0;
    }

    @EventSourcingHandler
    public void on(MoneyDepositedEvent event) {
        this.balance += event.getAmount();
    }

    // Getters (省略)
}

解释:

  • @Aggregate: 声明这是一个 Axon 的聚合根。

  • @AggregateIdentifier: 标记聚合根的唯一标识。

  • @CommandHandler: 标记处理命令的方法。 命令处理方法必须是public的。

  • @EventSourcingHandler: 标记处理事件的方法。事件处理方法必须是public的。

  • apply(): 应用事件,触发事件处理流程。

  • Event Sourcing 的优缺点

优点 缺点
完整的审计日志:可以追溯系统的任何状态变化。 查询复杂:需要回放事件才能获取当前状态,查询性能较差。
易于调试和重构:可以回放事件来重现问题,方便调试。 事件演化:事件结构发生变化时,需要考虑如何兼容旧事件。
更好的扩展性:可以基于事件构建不同的视图,满足不同的查询需求。 最终一致性:事件发布和处理是异步的,可能存在最终一致性问题。
业务洞察: 事件数据可以用于分析用户行为和业务趋势。 复杂性: Event Sourcing 增加了系统的复杂性,需要更多的设计和开发工作。

第二章:CQRS – 读写分离的艺术

  • 什么是 CQRS?

CQRS (Command Query Responsibility Segregation) 是一种架构模式,它将系统的读操作(Query)和写操作(Command)分离。这样可以针对不同的操作进行优化,提高系统的性能和可维护性。

  • 核心概念

    • Command: 表示修改系统状态的意图。例如,CreateAccountCommand (创建账户命令), DepositMoneyCommand (存款命令)。
    • Query: 表示查询系统状态的请求。例如,GetAccountBalanceQuery (获取账户余额查询)。
    • Command Model: 负责处理命令,执行业务逻辑,并产生事件。
    • Query Model: 负责处理查询,提供读取数据的接口。
    • 事件总线 (Event Bus): 用于将 Command Model 产生的事件发布到 Query Model。
  • CQRS 的工作流程

    1. 接收命令: 接收到用户发起的命令,例如 DepositMoneyCommand
    2. Command Model 处理: Command Model 接收到命令,进行业务逻辑处理,并产生一个或多个事件,例如 MoneyDepositedEvent
    3. 事件发布: 将事件发布到事件总线。
    4. Query Model 更新: Query Model 订阅事件总线,接收到事件后,更新自身的查询模型。
    5. 处理查询: 接收到用户发起的查询,例如 GetAccountBalanceQuery,直接从 Query Model 中读取数据。
  • 代码示例 (结合 Event Sourcing 和 Axon Framework)

继续使用上面的例子,我们需要创建 Query Model 和相应的 Query Handler。

定义查询:

public class GetAccountBalanceQuery {
    private final String accountId;

    public GetAccountBalanceQuery(String accountId) {
        this.accountId = accountId;
    }

    public String getAccountId() {
        return accountId;
    }
}

定义 Query Model (一个简单的DTO):

import javax.persistence.Entity;
import javax.persistence.Id;

@Entity
public class AccountBalance {

    @Id
    private String accountId;
    private double balance;

    public AccountBalance() {
        // JPA 需要一个无参构造函数
    }

    public AccountBalance(String accountId, double balance) {
        this.accountId = accountId;
        this.balance = balance;
    }

    public String getAccountId() {
        return accountId;
    }

    public double getBalance() {
        return balance;
    }

    public void setBalance(double balance) {
        this.balance = balance;
    }
}

定义 Query Handler:

import org.axonframework.eventhandling.EventHandler;
import org.axonframework.queryhandling.QueryHandler;
import org.springframework.stereotype.Component;

import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;

@Component
public class AccountBalanceProjection {

    @PersistenceContext
    private EntityManager entityManager;

    @EventHandler
    public void on(AccountCreatedEvent event) {
        AccountBalance accountBalance = new AccountBalance(event.getAccountId(), 0);
        entityManager.persist(accountBalance);
    }

    @EventHandler
    public void on(MoneyDepositedEvent event) {
        AccountBalance accountBalance = entityManager.find(AccountBalance.class, event.getAccountId());
        if (accountBalance != null) {
            accountBalance.setBalance(accountBalance.getBalance() + event.getAmount());
        } else {
            // 处理账户不存在的情况 (例如,抛出异常)
            throw new IllegalStateException("Account not found: " + event.getAccountId());
        }
    }

    @QueryHandler
    public AccountBalance handle(GetAccountBalanceQuery query) {
        return entityManager.find(AccountBalance.class, query.getAccountId());
    }
}

解释:

  • @Component: 声明这是一个 Spring 组件。

  • @EventHandler: 标记处理事件的方法。 这些方法负责更新 Query Model。

  • @QueryHandler: 标记处理查询的方法。 这些方法负责从 Query Model 中读取数据。

  • EntityManager: 用于与数据库交互。

  • CQRS 的优缺点

优点 缺点
性能优化:可以针对读操作和写操作进行不同的优化,提高系统性能。 复杂性:增加了系统的复杂性,需要更多的设计和开发工作。
可扩展性:可以独立扩展读模型和写模型,提高系统的可扩展性。 最终一致性:读模型和写模型之间存在最终一致性问题。
更好的安全性:可以将读模型暴露给外部,而隐藏写模型,提高系统的安全性。 数据同步:需要考虑如何保证读模型和写模型之间的数据同步。
灵活性:可以根据不同的查询需求,创建不同的读模型,提高系统的灵活性。 事件处理: 需要考虑如何处理事件的顺序性和幂等性。

第三章:Event Sourcing + CQRS – 黄金搭档

Event Sourcing 和 CQRS 通常一起使用,它们可以互相补充,发挥更大的威力。

  • Event Sourcing 提供数据来源: Event Sourcing 负责存储所有事件,作为系统的数据来源。
  • CQRS 提供读写分离: CQRS 将读操作和写操作分离,可以针对不同的操作进行优化。
  • 事件驱动的 Query Model 更新: Command Model 产生事件后,Query Model 订阅事件,并更新自身的查询模型。

这样,我们既可以获得 Event Sourcing 的完整审计日志和易于调试的优点,又可以获得 CQRS 的高性能和可扩展性。

  • 结合 Event Sourcing 和 CQRS 的架构图

一个简单的架构图大概是这样:

[Client] --> [Command Bus] --> [Command Handler] --> [Aggregate Root] --> [Event] --> [Event Bus] --> [Query Model Updater] --> [Query Model] --> [Client]
                                                                                                      ^
                                                                                                      |
                                                                                         [Event Store] -----
  • 实际应用场景

    • 金融系统: 银行、证券、保险等金融系统需要记录每一笔交易,并提供高性能的查询服务。
    • 电商系统: 电商系统需要记录用户的订单、支付、物流等信息,并提供实时的商品库存查询。
    • 物联网系统: 物联网系统需要记录设备的各种状态数据,并提供实时的设备监控和控制。

第四章:Axon Framework – 简化 Event Sourcing 和 CQRS 的利器

Axon Framework 是一个开源的 Java 框架,它提供了构建 Event Sourcing 和 CQRS 应用程序所需的各种组件和工具。

  • Axon Framework 的核心组件

    • Command Bus: 用于路由命令到相应的 Command Handler。
    • Event Bus: 用于发布事件到相应的 Event Handler。
    • Command Handler: 负责处理命令,执行业务逻辑,并产生事件。
    • Event Handler: 负责处理事件,更新 Query Model。
    • Aggregate: 领域模型中的聚合根。
    • Event Store: 用于持久化存储事件的仓库。 Axon 支持多种 Event Store 实现,包括关系型数据库、NoSQL 数据库和专门的 EventStoreDB。
    • Saga: 用于管理跨多个聚合的业务流程。
  • 使用 Axon Framework 的优势

    • 简化开发: Axon Framework 提供了各种组件和工具,可以简化 Event Sourcing 和 CQRS 应用程序的开发。
    • 提高可测试性: Axon Framework 提供了测试工具,可以方便地测试 Command Handler 和 Event Handler。
    • 支持多种 Event Store: Axon Framework 支持多种 Event Store 实现,可以根据实际需求选择合适的 Event Store。
    • 提供 Saga 支持: Axon Framework 提供了 Saga 支持,可以方便地管理跨多个聚合的业务流程。
    • Spring 集成: Axon Framework 深度集成 Spring,可以方便地与 Spring 生态系统中的其他组件集成。
  • Axon Framework 的一些高级特性

    • Event Sourcing Handler 的快照 (Snapshotting): 为了加快重建聚合状态的速度,可以定期保存聚合的快照。
    • 事件 Upcasting: 当事件结构发生变化时,可以使用 Upcaster 将旧事件转换为新事件。
    • Saga 的补偿事务: 在 Saga 流程中,如果某个步骤失败,可以使用补偿事务来回滚之前的操作。
    • 分布式事务管理 (Atomikos): Axon 可以与 Atomikos 集成,实现分布式事务管理。

第五章:EventStoreDB – 专业的 Event Store

EventStoreDB 是一个专门为 Event Sourcing 设计的数据库。它具有高性能、高可用性和高可靠性等特点。

  • EventStoreDB 的特点

    • 基于事件的存储: EventStoreDB 将事件作为一等公民进行存储,可以高效地存储和查询事件。
    • 流 (Stream): EventStoreDB 使用流来组织事件,每个流代表一个聚合的事件序列。
    • 原子性: EventStoreDB 保证事件的原子性写入,要么全部写入成功,要么全部写入失败。
    • 顺序性: EventStoreDB 保证事件的顺序性,事件按照写入的顺序进行存储和读取。
    • 持久性: EventStoreDB 将事件持久化存储到磁盘上,保证数据的可靠性。
    • 订阅: EventStoreDB 支持订阅机制,可以将事件推送到其他服务。
  • 为什么选择 EventStoreDB?

    • 性能: EventStoreDB 针对 Event Sourcing 场景进行了优化,具有更高的性能。
    • 易用性: EventStoreDB 提供了简单的 API 和工具,可以方便地进行事件的存储和查询。
    • 可靠性: EventStoreDB 具有高可用性和高可靠性等特点,可以保证数据的可靠性。
    • 社区支持: EventStoreDB 有一个活跃的社区,可以提供技术支持和帮助。
  • Axon Framework 与 EventStoreDB 的集成

Axon Framework 提供了 EventStoreDB 的集成模块,可以方便地将 Axon Framework 与 EventStoreDB 集成。

总结:选择的艺术

Event Sourcing 和 CQRS 都是强大的架构模式,但它们也增加了系统的复杂性。在选择使用它们之前,需要仔细评估项目的需求和团队的技能。

  • 什么时候适合使用 Event Sourcing 和 CQRS?

    • 需要完整的审计日志: 如果需要追溯系统的任何状态变化,Event Sourcing 是一个不错的选择。
    • 需要高性能的查询服务: 如果需要提供高性能的查询服务,CQRS 可以将读操作和写操作分离,提高系统性能。
    • 领域模型复杂: 如果领域模型比较复杂,Event Sourcing 和 CQRS 可以帮助你更好地组织代码。
    • 需要高可扩展性: 如果需要独立扩展读模型和写模型,CQRS 可以提高系统的可扩展性。
  • 什么时候不适合使用 Event Sourcing 和 CQRS?

    • 项目规模小: 如果项目规模比较小,使用 Event Sourcing 和 CQRS 可能会增加不必要的复杂性。
    • 团队技能不足: 如果团队对 Event Sourcing 和 CQRS 不熟悉,可能会增加开发难度。
    • 数据一致性要求高: 如果数据一致性要求非常高,需要考虑最终一致性带来的影响。

最终,技术的选择应该服务于业务需求。 不要为了用而用,要根据实际情况选择最合适的解决方案。

好了,今天的分享就到这里。希望大家对 Event Sourcing 和 CQRS 有了更深入的了解。 如果大家有什么问题,欢迎提问。 谢谢大家!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注