大家好,我是今天的主讲人,很高兴能和大家一起聊聊Java领域里两个听起来很高大上,实际上也确实挺有意思的概念:Event Sourcing 和 CQRS。咱们争取用最轻松的方式,把它们掰开了揉碎了,让大家明白它们到底是什么,有什么用,以及怎么在实际项目里用起来。
开场白:故事的开始
咱们先从一个故事开始。假设你是一家银行的CTO,你的系统需要记录每一笔交易。传统的做法是直接更新数据库里的账户余额。但是,如果有一天,审计部门突然要查一笔交易的来龙去脉,或者更可怕的是,数据库崩了,数据丢失了,怎么办?
Event Sourcing就像是给你的银行系统装了一个录像机,它不直接修改账户余额,而是记录每一笔发生的事件(比如存款、取款、转账),这些事件就像录像带一样,完整地记录了系统的每一次状态变化。
CQRS呢,就像银行里分开了柜台和后台。柜台负责快速处理客户的请求(查询余额、转账),而后台负责慢慢地处理更复杂的事情(生成报表、风险控制)。这样可以避免柜台被后台拖慢,提高整体效率。
好,故事讲完了,咱们进入正题。
第一章:Event Sourcing – 记录历史的艺术
- 什么是Event Sourcing?
Event Sourcing 是一种架构模式,它不直接存储实体的当前状态,而是存储导致实体状态变化的一系列事件。你可以通过回放这些事件来重建实体的任何历史状态。
-
核心概念
- 事件 (Event): 表示系统中发生的某种状态变化。例如,
AccountCreatedEvent
(账户创建事件),MoneyDepositedEvent
(存款事件),MoneyWithdrawnEvent
(取款事件)。 - 事件存储 (Event Store): 用于持久化存储事件的仓库。它可以是一个专门的数据库 (例如 EventStoreDB), 也可以是关系型数据库或者 NoSQL 数据库。
- 聚合 (Aggregate): 一个领域模型概念,代表一组相关联的实体,被视为一个整体。例如,
Account
(账户) 可以是一个聚合。 - 聚合根 (Aggregate Root): 聚合的入口点,负责处理命令和发布事件。
- 命令 (Command): 表示用户意图改变系统状态的请求。例如,
CreateAccountCommand
(创建账户命令),DepositMoneyCommand
(存款命令),WithdrawMoneyCommand
(取款命令)。 - 快照 (Snapshot): 为了加快重建聚合状态的速度,可以定期保存聚合的快照。
- 事件 (Event): 表示系统中发生的某种状态变化。例如,
-
Event Sourcing 的工作流程
- 命令处理: 接收到用户发起的命令,例如
DepositMoneyCommand
。 - 聚合根处理: 聚合根接收到命令,进行业务逻辑处理,并产生一个或多个事件,例如
MoneyDepositedEvent
。 - 事件持久化: 将事件存储到 Event Store 中。
- 状态更新: 聚合根应用事件,更新自身状态。
- 事件发布: 将事件发布到消息队列,供其他服务消费。
- 状态重建: 需要查询聚合状态时,从 Event Store 中读取所有历史事件,并按时间顺序重放,重建聚合状态。
- 命令处理: 接收到用户发起的命令,例如
-
代码示例(使用Axon Framework)
首先,引入Axon Framework的依赖(Maven):
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-spring-boot-starter</artifactId>
<version>4.9.1</version>
</dependency>
定义事件:
import org.axonframework.serialization.Revision;
@Revision("1") // 推荐加上 Revision,方便事件演化
public class AccountCreatedEvent {
private final String accountId;
private final String owner;
public AccountCreatedEvent(String accountId, String owner) {
this.accountId = accountId;
this.owner = owner;
}
public String getAccountId() {
return accountId;
}
public String getOwner() {
return owner;
}
}
@Revision("1")
public class MoneyDepositedEvent {
private final String accountId;
private final double amount;
public MoneyDepositedEvent(String accountId, double amount) {
this.accountId = accountId;
this.amount = amount;
}
public String getAccountId() {
return accountId;
}
public double getAmount() {
return amount;
}
}
定义命令:
import org.axonframework.modelling.command.TargetAggregateIdentifier;
public class CreateAccountCommand {
@TargetAggregateIdentifier
private final String accountId;
private final String owner;
public CreateAccountCommand(String accountId, String owner) {
this.accountId = accountId;
this.owner = owner;
}
public String getAccountId() {
return accountId;
}
public String getOwner() {
return owner;
}
}
public class DepositMoneyCommand {
@TargetAggregateIdentifier
private final String accountId;
private final double amount;
public DepositMoneyCommand(String accountId, double amount) {
this.accountId = accountId;
this.amount = amount;
}
public String getAccountId() {
return accountId;
}
public double getAmount() {
return amount;
}
}
定义聚合根:
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.modelling.command.AggregateIdentifier;
import org.axonframework.spring.stereotype.Aggregate;
import static org.axonframework.modelling.command.AggregateLifecycle.apply;
@Aggregate
public class Account {
@AggregateIdentifier
private String accountId;
private String owner;
private double balance;
public Account() {
// Axon 需要一个无参构造函数
}
@CommandHandler
public Account(CreateAccountCommand command) {
apply(new AccountCreatedEvent(command.getAccountId(), command.getOwner()));
}
@CommandHandler
public void handle(DepositMoneyCommand command) {
apply(new MoneyDepositedEvent(command.getAccountId(), command.getAmount()));
}
@EventSourcingHandler
public void on(AccountCreatedEvent event) {
this.accountId = event.getAccountId();
this.owner = event.getOwner();
this.balance = 0;
}
@EventSourcingHandler
public void on(MoneyDepositedEvent event) {
this.balance += event.getAmount();
}
// Getters (省略)
}
解释:
-
@Aggregate
: 声明这是一个 Axon 的聚合根。 -
@AggregateIdentifier
: 标记聚合根的唯一标识。 -
@CommandHandler
: 标记处理命令的方法。 命令处理方法必须是public的。 -
@EventSourcingHandler
: 标记处理事件的方法。事件处理方法必须是public的。 -
apply()
: 应用事件,触发事件处理流程。 -
Event Sourcing 的优缺点
优点 | 缺点 |
---|---|
完整的审计日志:可以追溯系统的任何状态变化。 | 查询复杂:需要回放事件才能获取当前状态,查询性能较差。 |
易于调试和重构:可以回放事件来重现问题,方便调试。 | 事件演化:事件结构发生变化时,需要考虑如何兼容旧事件。 |
更好的扩展性:可以基于事件构建不同的视图,满足不同的查询需求。 | 最终一致性:事件发布和处理是异步的,可能存在最终一致性问题。 |
业务洞察: 事件数据可以用于分析用户行为和业务趋势。 | 复杂性: Event Sourcing 增加了系统的复杂性,需要更多的设计和开发工作。 |
第二章:CQRS – 读写分离的艺术
- 什么是 CQRS?
CQRS (Command Query Responsibility Segregation) 是一种架构模式,它将系统的读操作(Query)和写操作(Command)分离。这样可以针对不同的操作进行优化,提高系统的性能和可维护性。
-
核心概念
- Command: 表示修改系统状态的意图。例如,
CreateAccountCommand
(创建账户命令),DepositMoneyCommand
(存款命令)。 - Query: 表示查询系统状态的请求。例如,
GetAccountBalanceQuery
(获取账户余额查询)。 - Command Model: 负责处理命令,执行业务逻辑,并产生事件。
- Query Model: 负责处理查询,提供读取数据的接口。
- 事件总线 (Event Bus): 用于将 Command Model 产生的事件发布到 Query Model。
- Command: 表示修改系统状态的意图。例如,
-
CQRS 的工作流程
- 接收命令: 接收到用户发起的命令,例如
DepositMoneyCommand
。 - Command Model 处理: Command Model 接收到命令,进行业务逻辑处理,并产生一个或多个事件,例如
MoneyDepositedEvent
。 - 事件发布: 将事件发布到事件总线。
- Query Model 更新: Query Model 订阅事件总线,接收到事件后,更新自身的查询模型。
- 处理查询: 接收到用户发起的查询,例如
GetAccountBalanceQuery
,直接从 Query Model 中读取数据。
- 接收命令: 接收到用户发起的命令,例如
-
代码示例 (结合 Event Sourcing 和 Axon Framework)
继续使用上面的例子,我们需要创建 Query Model 和相应的 Query Handler。
定义查询:
public class GetAccountBalanceQuery {
private final String accountId;
public GetAccountBalanceQuery(String accountId) {
this.accountId = accountId;
}
public String getAccountId() {
return accountId;
}
}
定义 Query Model (一个简单的DTO):
import javax.persistence.Entity;
import javax.persistence.Id;
@Entity
public class AccountBalance {
@Id
private String accountId;
private double balance;
public AccountBalance() {
// JPA 需要一个无参构造函数
}
public AccountBalance(String accountId, double balance) {
this.accountId = accountId;
this.balance = balance;
}
public String getAccountId() {
return accountId;
}
public double getBalance() {
return balance;
}
public void setBalance(double balance) {
this.balance = balance;
}
}
定义 Query Handler:
import org.axonframework.eventhandling.EventHandler;
import org.axonframework.queryhandling.QueryHandler;
import org.springframework.stereotype.Component;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
@Component
public class AccountBalanceProjection {
@PersistenceContext
private EntityManager entityManager;
@EventHandler
public void on(AccountCreatedEvent event) {
AccountBalance accountBalance = new AccountBalance(event.getAccountId(), 0);
entityManager.persist(accountBalance);
}
@EventHandler
public void on(MoneyDepositedEvent event) {
AccountBalance accountBalance = entityManager.find(AccountBalance.class, event.getAccountId());
if (accountBalance != null) {
accountBalance.setBalance(accountBalance.getBalance() + event.getAmount());
} else {
// 处理账户不存在的情况 (例如,抛出异常)
throw new IllegalStateException("Account not found: " + event.getAccountId());
}
}
@QueryHandler
public AccountBalance handle(GetAccountBalanceQuery query) {
return entityManager.find(AccountBalance.class, query.getAccountId());
}
}
解释:
-
@Component
: 声明这是一个 Spring 组件。 -
@EventHandler
: 标记处理事件的方法。 这些方法负责更新 Query Model。 -
@QueryHandler
: 标记处理查询的方法。 这些方法负责从 Query Model 中读取数据。 -
EntityManager
: 用于与数据库交互。 -
CQRS 的优缺点
优点 | 缺点 |
---|---|
性能优化:可以针对读操作和写操作进行不同的优化,提高系统性能。 | 复杂性:增加了系统的复杂性,需要更多的设计和开发工作。 |
可扩展性:可以独立扩展读模型和写模型,提高系统的可扩展性。 | 最终一致性:读模型和写模型之间存在最终一致性问题。 |
更好的安全性:可以将读模型暴露给外部,而隐藏写模型,提高系统的安全性。 | 数据同步:需要考虑如何保证读模型和写模型之间的数据同步。 |
灵活性:可以根据不同的查询需求,创建不同的读模型,提高系统的灵活性。 | 事件处理: 需要考虑如何处理事件的顺序性和幂等性。 |
第三章:Event Sourcing + CQRS – 黄金搭档
Event Sourcing 和 CQRS 通常一起使用,它们可以互相补充,发挥更大的威力。
- Event Sourcing 提供数据来源: Event Sourcing 负责存储所有事件,作为系统的数据来源。
- CQRS 提供读写分离: CQRS 将读操作和写操作分离,可以针对不同的操作进行优化。
- 事件驱动的 Query Model 更新: Command Model 产生事件后,Query Model 订阅事件,并更新自身的查询模型。
这样,我们既可以获得 Event Sourcing 的完整审计日志和易于调试的优点,又可以获得 CQRS 的高性能和可扩展性。
- 结合 Event Sourcing 和 CQRS 的架构图
一个简单的架构图大概是这样:
[Client] --> [Command Bus] --> [Command Handler] --> [Aggregate Root] --> [Event] --> [Event Bus] --> [Query Model Updater] --> [Query Model] --> [Client]
^
|
[Event Store] -----
-
实际应用场景
- 金融系统: 银行、证券、保险等金融系统需要记录每一笔交易,并提供高性能的查询服务。
- 电商系统: 电商系统需要记录用户的订单、支付、物流等信息,并提供实时的商品库存查询。
- 物联网系统: 物联网系统需要记录设备的各种状态数据,并提供实时的设备监控和控制。
第四章:Axon Framework – 简化 Event Sourcing 和 CQRS 的利器
Axon Framework 是一个开源的 Java 框架,它提供了构建 Event Sourcing 和 CQRS 应用程序所需的各种组件和工具。
-
Axon Framework 的核心组件
- Command Bus: 用于路由命令到相应的 Command Handler。
- Event Bus: 用于发布事件到相应的 Event Handler。
- Command Handler: 负责处理命令,执行业务逻辑,并产生事件。
- Event Handler: 负责处理事件,更新 Query Model。
- Aggregate: 领域模型中的聚合根。
- Event Store: 用于持久化存储事件的仓库。 Axon 支持多种 Event Store 实现,包括关系型数据库、NoSQL 数据库和专门的 EventStoreDB。
- Saga: 用于管理跨多个聚合的业务流程。
-
使用 Axon Framework 的优势
- 简化开发: Axon Framework 提供了各种组件和工具,可以简化 Event Sourcing 和 CQRS 应用程序的开发。
- 提高可测试性: Axon Framework 提供了测试工具,可以方便地测试 Command Handler 和 Event Handler。
- 支持多种 Event Store: Axon Framework 支持多种 Event Store 实现,可以根据实际需求选择合适的 Event Store。
- 提供 Saga 支持: Axon Framework 提供了 Saga 支持,可以方便地管理跨多个聚合的业务流程。
- Spring 集成: Axon Framework 深度集成 Spring,可以方便地与 Spring 生态系统中的其他组件集成。
-
Axon Framework 的一些高级特性
- Event Sourcing Handler 的快照 (Snapshotting): 为了加快重建聚合状态的速度,可以定期保存聚合的快照。
- 事件 Upcasting: 当事件结构发生变化时,可以使用 Upcaster 将旧事件转换为新事件。
- Saga 的补偿事务: 在 Saga 流程中,如果某个步骤失败,可以使用补偿事务来回滚之前的操作。
- 分布式事务管理 (Atomikos): Axon 可以与 Atomikos 集成,实现分布式事务管理。
第五章:EventStoreDB – 专业的 Event Store
EventStoreDB 是一个专门为 Event Sourcing 设计的数据库。它具有高性能、高可用性和高可靠性等特点。
-
EventStoreDB 的特点
- 基于事件的存储: EventStoreDB 将事件作为一等公民进行存储,可以高效地存储和查询事件。
- 流 (Stream): EventStoreDB 使用流来组织事件,每个流代表一个聚合的事件序列。
- 原子性: EventStoreDB 保证事件的原子性写入,要么全部写入成功,要么全部写入失败。
- 顺序性: EventStoreDB 保证事件的顺序性,事件按照写入的顺序进行存储和读取。
- 持久性: EventStoreDB 将事件持久化存储到磁盘上,保证数据的可靠性。
- 订阅: EventStoreDB 支持订阅机制,可以将事件推送到其他服务。
-
为什么选择 EventStoreDB?
- 性能: EventStoreDB 针对 Event Sourcing 场景进行了优化,具有更高的性能。
- 易用性: EventStoreDB 提供了简单的 API 和工具,可以方便地进行事件的存储和查询。
- 可靠性: EventStoreDB 具有高可用性和高可靠性等特点,可以保证数据的可靠性。
- 社区支持: EventStoreDB 有一个活跃的社区,可以提供技术支持和帮助。
-
Axon Framework 与 EventStoreDB 的集成
Axon Framework 提供了 EventStoreDB 的集成模块,可以方便地将 Axon Framework 与 EventStoreDB 集成。
总结:选择的艺术
Event Sourcing 和 CQRS 都是强大的架构模式,但它们也增加了系统的复杂性。在选择使用它们之前,需要仔细评估项目的需求和团队的技能。
-
什么时候适合使用 Event Sourcing 和 CQRS?
- 需要完整的审计日志: 如果需要追溯系统的任何状态变化,Event Sourcing 是一个不错的选择。
- 需要高性能的查询服务: 如果需要提供高性能的查询服务,CQRS 可以将读操作和写操作分离,提高系统性能。
- 领域模型复杂: 如果领域模型比较复杂,Event Sourcing 和 CQRS 可以帮助你更好地组织代码。
- 需要高可扩展性: 如果需要独立扩展读模型和写模型,CQRS 可以提高系统的可扩展性。
-
什么时候不适合使用 Event Sourcing 和 CQRS?
- 项目规模小: 如果项目规模比较小,使用 Event Sourcing 和 CQRS 可能会增加不必要的复杂性。
- 团队技能不足: 如果团队对 Event Sourcing 和 CQRS 不熟悉,可能会增加开发难度。
- 数据一致性要求高: 如果数据一致性要求非常高,需要考虑最终一致性带来的影响。
最终,技术的选择应该服务于业务需求。 不要为了用而用,要根据实际情况选择最合适的解决方案。
好了,今天的分享就到这里。希望大家对 Event Sourcing 和 CQRS 有了更深入的了解。 如果大家有什么问题,欢迎提问。 谢谢大家!