各位老铁,晚上好!我是你们的老朋友,今晚咱们聊聊 JavaScript 里的 CQRS 和 Event Sourcing 这俩好基友,看看它们怎么帮我们搞定那些规模大、要求高的系统。放心,咱不整那些虚头巴脑的,直接上干货。
开场白:为啥我们需要 CQRS 和 Event Sourcing?
先问大家个问题:你有没有遇到过这样的情况,一个数据库表,既要处理大量的写入操作(比如用户注册、订单创建),又要支撑复杂的查询(比如各种维度的数据分析、报表生成)?
如果你的答案是“Yes”,恭喜你,你已经感受到了传统 CRUD 架构的痛苦了!CRUD(Create, Read, Update, Delete)简单粗暴,但当业务复杂起来,它就显得力不从心了。
- 性能瓶颈: 读写操作争夺同一资源,导致性能下降。
- 数据一致性: 复杂的业务逻辑容易导致数据不一致。
- 可扩展性: 难以应对业务的快速增长。
- 审计困难: 很难追踪数据的变化历史。
这时候,CQRS 和 Event Sourcing 这俩“救星”就该登场了。它们能帮你解耦读写操作,提高性能,增强可扩展性,并提供强大的审计能力。
第一幕:CQRS (Command Query Responsibility Segregation)——职责分离
CQRS 的核心思想很简单:把读操作(Query)和写操作(Command)分离到不同的模型中。就像夫妻分工一样,一个负责挣钱(写),一个负责管钱(读),这样才能把日子过好。
- Command: 代表一个意图,用来改变系统的状态。比如“创建订单”、“修改用户信息”。
- Query: 用来查询系统的状态,不应该有任何副作用。比如“获取订单详情”、“获取用户列表”。
CQRS 的好处:
- 性能提升: 读写分离后,可以针对不同的模型进行优化。比如,读模型可以使用更适合查询的数据库或数据结构。
- 简化模型: 读写模型分离后,可以专注于各自的领域,降低模型的复杂度。
- 更好的可扩展性: 可以独立扩展读写模型,更好地应对业务增长。
CQRS 的架构图:
[Client] -> [Command] -> [Command Handler] -> [Domain Model] -> [Event Bus] -> [Event Handlers] -> [Read Model] -> [Query] -> [Client]
代码示例(JavaScript):
咱们来个简化的用户注册示例:
// Command: RegisterUserCommand
class RegisterUserCommand {
constructor(public readonly userId: string, public readonly name: string, public readonly email: string) {}
}
// Command Handler: RegisterUserCommandHandler
class RegisterUserCommandHandler {
constructor(private readonly userRepository: UserRepository) {}
async handle(command: RegisterUserCommand): Promise<void> {
const user = new User(command.userId, command.name, command.email);
await this.userRepository.save(user);
// 发送领域事件,例如 UserRegisteredEvent
eventBus.publish(new UserRegisteredEvent(user.userId));
}
}
// Query: GetUserQuery
class GetUserQuery {
constructor(public readonly userId: string) {}
}
// Query Handler: GetUserQueryHandler
class GetUserQueryHandler {
constructor(private readonly userReadRepository: UserReadRepository) {}
async handle(query: GetUserQuery): Promise<UserReadModel | null> {
return this.userReadRepository.getById(query.userId);
}
}
// UserRepository (写模型)
class UserRepository {
async save(user: User): Promise<void> {
// 保存用户到数据库 (例如 MongoDB)
console.log(`User saved to database: ${user.userId}`);
}
}
// UserReadRepository (读模型)
class UserReadRepository {
async getById(userId: string): Promise<UserReadModel | null> {
// 从专门的读模型数据库 (例如 Redis, Elasticsearch) 获取用户
console.log(`Fetching user from read model: ${userId}`);
return { userId, name: 'Example User', email: '[email protected]' }; // 模拟数据
}
}
// UserReadModel (读模型的数据结构)
interface UserReadModel {
userId: string;
name: string;
email: string;
}
// User (领域模型)
class User {
constructor(public readonly userId: string, public readonly name: string, public readonly email: string) {}
}
// Event Bus (简化版)
const eventBus = {
publish: (event: any) => {
console.log(`Event published: ${event.constructor.name}`);
// 这里应该调用相应的事件处理器
eventHandlers.forEach(handler => {
if (handler.supports(event)) {
handler.handle(event);
}
});
}
};
// 事件处理器 (简化版)
const eventHandlers = [];
// UserRegisteredEvent
class UserRegisteredEvent {
constructor(public readonly userId: string) {}
}
// 事件处理器接口
interface EventHandler {
supports(event: any): boolean;
handle(event: any): void;
}
// 创建 Read Model 的事件处理器
class CreateUserReadModelHandler implements EventHandler {
constructor(private readonly userReadRepository: UserReadRepository) {}
supports(event: any): boolean {
return event instanceof UserRegisteredEvent;
}
async handle(event: UserRegisteredEvent): Promise<void> {
const user = new User('dummy', 'dummy', 'dummy'); // 这里应该从写模型中获取完整数据,或者从事件中获取足够的信息
//await this.userReadRepository.save(user); // save 方法应该接受 UserReadModel 类型
console.log(`Creating read model for user: ${event.userId}`);
}
}
eventHandlers.push(new CreateUserReadModelHandler(new UserReadRepository()));
// 使用示例
const userRepository = new UserRepository();
const userReadRepository = new UserReadRepository();
const registerUserCommandHandler = new RegisterUserCommandHandler(userRepository);
const getUserQueryHandler = new GetUserQueryHandler(userReadRepository);
const registerCommand = new RegisterUserCommand('123', 'John Doe', '[email protected]');
registerUserCommandHandler.handle(registerCommand);
const getUserQuery = new GetUserQuery('123');
getUserQueryHandler.handle(getUserQuery);
代码解释:
RegisterUserCommand
和GetUserQuery
分别代表写操作和读操作。RegisterUserCommandHandler
和GetUserQueryHandler
分别处理对应的命令和查询。UserRepository
负责写模型的数据持久化,UserReadRepository
负责读模型的数据查询。EventBus
用于发布领域事件,通知其他模块状态的变更。
注意事项:
- CQRS 会增加代码的复杂性,需要权衡利弊。
- 读写模型之间的数据同步是一个挑战,可以使用事件驱动的方式来实现最终一致性。
- 选择合适的读模型存储介质非常重要,要根据查询需求选择。
第二幕:Event Sourcing——事件溯源
Event Sourcing 的核心思想是:把系统的状态变化记录为一系列的事件(Event),而不是直接存储最终状态。就像写日记一样,记录下每天发生了什么,而不是只记录最终的生活状态。
- Event: 代表一个已经发生的事实,比如“用户已注册”、“订单已创建”、“商品已发货”。
- Event Store: 用于存储事件的数据库,通常是 append-only 的。
- Aggregate: 领域模型的聚合根,负责处理命令并生成事件。
- Projection: 从事件流中构建读模型的组件。
Event Sourcing 的好处:
- 完整的审计日志: 可以追踪系统的每一个状态变化,方便审计和调试。
- 时间旅行: 可以回溯到系统的任何一个历史状态,用于问题诊断和数据恢复。
- 更好的可扩展性: 可以通过 replay 事件来构建新的读模型。
- 领域驱动设计: 更好地体现领域模型的业务逻辑。
Event Sourcing 的架构图:
[Client] -> [Command] -> [Command Handler] -> [Aggregate] -> [Event] -> [Event Store] -> [Event Bus] -> [Projection] -> [Read Model] -> [Query] -> [Client]
代码示例(JavaScript):
咱们用 Event Sourcing 来实现一个简单的银行账户:
// Event: AccountCreatedEvent
class AccountCreatedEvent {
constructor(public readonly accountId: string, public readonly owner: string) {}
}
// Event: DepositEvent
class DepositEvent {
constructor(public readonly accountId: string, public readonly amount: number) {}
}
// Event: WithdrawEvent
class WithdrawEvent {
constructor(public readonly accountId: string, public readonly amount: number) {}
}
// Aggregate: Account
class Account {
private balance: number = 0;
private events: any[] = [];
constructor(public readonly accountId: string, public readonly owner: string) {
this.apply(new AccountCreatedEvent(accountId, owner));
}
deposit(amount: number): void {
if (amount <= 0) {
throw new Error('Deposit amount must be positive.');
}
this.apply(new DepositEvent(this.accountId, amount));
}
withdraw(amount: number): void {
if (amount <= 0) {
throw new Error('Withdraw amount must be positive.');
}
if (this.balance < amount) {
throw new Error('Insufficient funds.');
}
this.apply(new WithdrawEvent(this.accountId, amount));
}
// 应用事件
apply(event: any): void {
if (event instanceof AccountCreatedEvent) {
this.handleAccountCreated(event);
} else if (event instanceof DepositEvent) {
this.handleDeposit(event);
} else if (event instanceof WithdrawEvent) {
this.handleWithdraw(event);
}
this.events.push(event);
}
// 事件处理器
private handleAccountCreated(event: AccountCreatedEvent): void {
console.log(`Account created: ${event.accountId}`);
}
private handleDeposit(event: DepositEvent): void {
this.balance += event.amount;
console.log(`Deposited ${event.amount} into account ${event.accountId}. New balance: ${this.balance}`);
}
private handleWithdraw(event: WithdrawEvent): void {
this.balance -= event.amount;
console.log(`Withdrawn ${event.amount} from account ${event.accountId}. New balance: ${this.balance}`);
}
// 获取未提交的事件
getUncommittedEvents(): any[] {
return this.events;
}
// 清空未提交的事件
clearUncommittedEvents(): void {
this.events = [];
}
// 从事件流中重建 Aggregate
static fromEvents(accountId: string, events: any[]): Account {
const account = new Account(accountId, 'dummy'); // 创建一个临时的 Account 实例
account.balance = 0; // 重置余额
account.events = []; // 清空事件列表
events.forEach(event => {
if (event instanceof AccountCreatedEvent) {
// do nothing
} else if (event instanceof DepositEvent) {
account.balance += event.amount;
} else if (event instanceof WithdrawEvent) {
account.balance -= event.amount;
}
});
return account;
}
}
// Event Store (简化版)
class EventStore {
private events: any[] = [];
async saveEvents(accountId: string, events: any[]): Promise<void> {
this.events = this.events.concat(events);
console.log(`Events saved to event store for account ${accountId}`);
}
async getEvents(accountId: string): Promise<any[]> {
return this.events.filter(event => (event as any).accountId === accountId);
}
}
// Projection (简化版)
class AccountProjection {
async getAccountBalance(accountId: string): Promise<number> {
const eventStore = new EventStore();
const events = await eventStore.getEvents(accountId);
let balance = 0;
events.forEach(event => {
if (event instanceof DepositEvent) {
balance += event.amount;
} else if (event instanceof WithdrawEvent) {
balance -= event.amount;
}
});
return balance;
}
}
// 使用示例
async function main() {
const eventStore = new EventStore();
const accountProjection = new AccountProjection();
const accountId = '123';
const account = new Account(accountId, 'John Doe');
account.deposit(100);
account.withdraw(50);
await eventStore.saveEvents(accountId, account.getUncommittedEvents());
account.clearUncommittedEvents();
const balance = await accountProjection.getAccountBalance(accountId);
console.log(`Current balance for account ${accountId}: ${balance}`);
// 从事件流中重建 Account
const events = await eventStore.getEvents(accountId);
const reconstructedAccount = Account.fromEvents(accountId, events);
console.log(`Reconstructed account balance: ${reconstructedAccount.balance}`);
}
main();
代码解释:
AccountCreatedEvent
,DepositEvent
,WithdrawEvent
代表账户相关的事件。Account
是聚合根,负责处理存款和取款操作,并生成相应的事件。EventStore
负责存储事件。AccountProjection
负责从事件流中计算账户余额。
注意事项:
- Event Sourcing 会增加系统的复杂度,需要认真评估。
- 选择合适的 Event Store 非常重要,要考虑性能、可靠性和可扩展性。
- 事件的版本控制是一个挑战,需要仔细设计。
- 快照(Snapshot)可以减少 replay 事件的时间。
第三幕:CQRS + Event Sourcing = 完美搭档?
CQRS 和 Event Sourcing 经常一起使用,它们可以互相补充,发挥更大的威力。
- CQRS 提供读写分离的架构,Event Sourcing 提供完整的审计日志和时间旅行能力。
- Command Handler 负责处理命令,并调用 Aggregate 生成事件。
- Event Bus 将事件发布给 Projection,Projection 负责构建读模型。
- Query Handler 负责查询读模型。
架构图:
[Client] -> [Command] -> [Command Handler] -> [Aggregate] -> [Event] -> [Event Store] -> [Event Bus] -> [Projection] -> [Read Model] -> [Query] -> [Client]
代码示例:
(这里就不再重复写完整的代码了,因为前面的示例已经包含了 CQRS 和 Event Sourcing 的基本要素。只需要将它们结合起来即可。)
如何选择 CQRS 和 Event Sourcing?
特性 | CQRS | Event Sourcing |
---|---|---|
核心思想 | 读写分离 | 事件溯源 |
优点 | 性能提升,简化模型,更好的可扩展性 | 完整的审计日志,时间旅行,更好的可扩展性 |
缺点 | 增加代码复杂性,数据同步挑战 | 增加系统复杂度,事件版本控制挑战,需要快照支持 |
适用场景 | 读写比例悬殊,业务逻辑复杂,需要高性能的系统 | 需要完整的审计日志,需要时间旅行,领域驱动设计 |
是否需要一起用 | 不一定,可以单独使用 | 不一定,可以单独使用 |
总结:
CQRS 和 Event Sourcing 是强大的设计模式,可以帮助我们构建可伸缩、可审计的系统。但是,它们也增加了系统的复杂度,需要认真评估。在选择使用它们之前,要充分了解它们的优缺点,并根据实际情况进行权衡。
结尾:
希望今天的分享对大家有所帮助。记住,没有银弹,选择合适的技术方案才是最重要的。晚安!