深入分析 JavaScript 中的 CQRS (Command Query Responsibility Segregation) 和 Event Sourcing (事件溯源) 模式在构建可伸缩、可审计系统中的作用。

各位老铁,晚上好!我是你们的老朋友,今晚咱们聊聊 JavaScript 里的 CQRS 和 Event Sourcing 这俩好基友,看看它们怎么帮我们搞定那些规模大、要求高的系统。放心,咱不整那些虚头巴脑的,直接上干货。

开场白:为啥我们需要 CQRS 和 Event Sourcing?

先问大家个问题:你有没有遇到过这样的情况,一个数据库表,既要处理大量的写入操作(比如用户注册、订单创建),又要支撑复杂的查询(比如各种维度的数据分析、报表生成)?

如果你的答案是“Yes”,恭喜你,你已经感受到了传统 CRUD 架构的痛苦了!CRUD(Create, Read, Update, Delete)简单粗暴,但当业务复杂起来,它就显得力不从心了。

  • 性能瓶颈: 读写操作争夺同一资源,导致性能下降。
  • 数据一致性: 复杂的业务逻辑容易导致数据不一致。
  • 可扩展性: 难以应对业务的快速增长。
  • 审计困难: 很难追踪数据的变化历史。

这时候,CQRS 和 Event Sourcing 这俩“救星”就该登场了。它们能帮你解耦读写操作,提高性能,增强可扩展性,并提供强大的审计能力。

第一幕:CQRS (Command Query Responsibility Segregation)——职责分离

CQRS 的核心思想很简单:把读操作(Query)和写操作(Command)分离到不同的模型中。就像夫妻分工一样,一个负责挣钱(写),一个负责管钱(读),这样才能把日子过好。

  • Command: 代表一个意图,用来改变系统的状态。比如“创建订单”、“修改用户信息”。
  • Query: 用来查询系统的状态,不应该有任何副作用。比如“获取订单详情”、“获取用户列表”。

CQRS 的好处:

  • 性能提升: 读写分离后,可以针对不同的模型进行优化。比如,读模型可以使用更适合查询的数据库或数据结构。
  • 简化模型: 读写模型分离后,可以专注于各自的领域,降低模型的复杂度。
  • 更好的可扩展性: 可以独立扩展读写模型,更好地应对业务增长。

CQRS 的架构图:

[Client] -> [Command] -> [Command Handler] -> [Domain Model] -> [Event Bus] -> [Event Handlers] -> [Read Model] -> [Query] -> [Client]

代码示例(JavaScript):

咱们来个简化的用户注册示例:

// Command: RegisterUserCommand
class RegisterUserCommand {
  constructor(public readonly userId: string, public readonly name: string, public readonly email: string) {}
}

// Command Handler: RegisterUserCommandHandler
class RegisterUserCommandHandler {
  constructor(private readonly userRepository: UserRepository) {}

  async handle(command: RegisterUserCommand): Promise<void> {
    const user = new User(command.userId, command.name, command.email);
    await this.userRepository.save(user);
    // 发送领域事件,例如 UserRegisteredEvent
    eventBus.publish(new UserRegisteredEvent(user.userId));
  }
}

// Query: GetUserQuery
class GetUserQuery {
  constructor(public readonly userId: string) {}
}

// Query Handler: GetUserQueryHandler
class GetUserQueryHandler {
  constructor(private readonly userReadRepository: UserReadRepository) {}

  async handle(query: GetUserQuery): Promise<UserReadModel | null> {
    return this.userReadRepository.getById(query.userId);
  }
}

// UserRepository (写模型)
class UserRepository {
  async save(user: User): Promise<void> {
    // 保存用户到数据库 (例如 MongoDB)
    console.log(`User saved to database: ${user.userId}`);
  }
}

// UserReadRepository (读模型)
class UserReadRepository {
  async getById(userId: string): Promise<UserReadModel | null> {
    // 从专门的读模型数据库 (例如 Redis, Elasticsearch) 获取用户
    console.log(`Fetching user from read model: ${userId}`);
    return { userId, name: 'Example User', email: '[email protected]' }; // 模拟数据
  }
}

// UserReadModel (读模型的数据结构)
interface UserReadModel {
  userId: string;
  name: string;
  email: string;
}

// User (领域模型)
class User {
    constructor(public readonly userId: string, public readonly name: string, public readonly email: string) {}
}

// Event Bus (简化版)
const eventBus = {
  publish: (event: any) => {
    console.log(`Event published: ${event.constructor.name}`);
    // 这里应该调用相应的事件处理器
    eventHandlers.forEach(handler => {
      if (handler.supports(event)) {
        handler.handle(event);
      }
    });
  }
};

// 事件处理器 (简化版)
const eventHandlers = [];

// UserRegisteredEvent
class UserRegisteredEvent {
  constructor(public readonly userId: string) {}
}

// 事件处理器接口
interface EventHandler {
  supports(event: any): boolean;
  handle(event: any): void;
}

// 创建 Read Model 的事件处理器
class CreateUserReadModelHandler implements EventHandler {
  constructor(private readonly userReadRepository: UserReadRepository) {}

  supports(event: any): boolean {
    return event instanceof UserRegisteredEvent;
  }

  async handle(event: UserRegisteredEvent): Promise<void> {
    const user = new User('dummy', 'dummy', 'dummy'); // 这里应该从写模型中获取完整数据,或者从事件中获取足够的信息
    //await this.userReadRepository.save(user); // save 方法应该接受 UserReadModel 类型
    console.log(`Creating read model for user: ${event.userId}`);
  }
}

eventHandlers.push(new CreateUserReadModelHandler(new UserReadRepository()));

// 使用示例
const userRepository = new UserRepository();
const userReadRepository = new UserReadRepository();
const registerUserCommandHandler = new RegisterUserCommandHandler(userRepository);
const getUserQueryHandler = new GetUserQueryHandler(userReadRepository);

const registerCommand = new RegisterUserCommand('123', 'John Doe', '[email protected]');
registerUserCommandHandler.handle(registerCommand);

const getUserQuery = new GetUserQuery('123');
getUserQueryHandler.handle(getUserQuery);

代码解释:

  • RegisterUserCommandGetUserQuery 分别代表写操作和读操作。
  • RegisterUserCommandHandlerGetUserQueryHandler 分别处理对应的命令和查询。
  • UserRepository 负责写模型的数据持久化,UserReadRepository 负责读模型的数据查询。
  • EventBus 用于发布领域事件,通知其他模块状态的变更。

注意事项:

  • CQRS 会增加代码的复杂性,需要权衡利弊。
  • 读写模型之间的数据同步是一个挑战,可以使用事件驱动的方式来实现最终一致性。
  • 选择合适的读模型存储介质非常重要,要根据查询需求选择。

第二幕:Event Sourcing——事件溯源

Event Sourcing 的核心思想是:把系统的状态变化记录为一系列的事件(Event),而不是直接存储最终状态。就像写日记一样,记录下每天发生了什么,而不是只记录最终的生活状态。

  • Event: 代表一个已经发生的事实,比如“用户已注册”、“订单已创建”、“商品已发货”。
  • Event Store: 用于存储事件的数据库,通常是 append-only 的。
  • Aggregate: 领域模型的聚合根,负责处理命令并生成事件。
  • Projection: 从事件流中构建读模型的组件。

Event Sourcing 的好处:

  • 完整的审计日志: 可以追踪系统的每一个状态变化,方便审计和调试。
  • 时间旅行: 可以回溯到系统的任何一个历史状态,用于问题诊断和数据恢复。
  • 更好的可扩展性: 可以通过 replay 事件来构建新的读模型。
  • 领域驱动设计: 更好地体现领域模型的业务逻辑。

Event Sourcing 的架构图:

[Client] -> [Command] -> [Command Handler] -> [Aggregate] -> [Event] -> [Event Store] -> [Event Bus] -> [Projection] -> [Read Model] -> [Query] -> [Client]

代码示例(JavaScript):

咱们用 Event Sourcing 来实现一个简单的银行账户:

// Event: AccountCreatedEvent
class AccountCreatedEvent {
  constructor(public readonly accountId: string, public readonly owner: string) {}
}

// Event: DepositEvent
class DepositEvent {
  constructor(public readonly accountId: string, public readonly amount: number) {}
}

// Event: WithdrawEvent
class WithdrawEvent {
  constructor(public readonly accountId: string, public readonly amount: number) {}
}

// Aggregate: Account
class Account {
  private balance: number = 0;
  private events: any[] = [];

  constructor(public readonly accountId: string, public readonly owner: string) {
    this.apply(new AccountCreatedEvent(accountId, owner));
  }

  deposit(amount: number): void {
    if (amount <= 0) {
      throw new Error('Deposit amount must be positive.');
    }
    this.apply(new DepositEvent(this.accountId, amount));
  }

  withdraw(amount: number): void {
    if (amount <= 0) {
      throw new Error('Withdraw amount must be positive.');
    }
    if (this.balance < amount) {
      throw new Error('Insufficient funds.');
    }
    this.apply(new WithdrawEvent(this.accountId, amount));
  }

  // 应用事件
  apply(event: any): void {
    if (event instanceof AccountCreatedEvent) {
      this.handleAccountCreated(event);
    } else if (event instanceof DepositEvent) {
      this.handleDeposit(event);
    } else if (event instanceof WithdrawEvent) {
      this.handleWithdraw(event);
    }
    this.events.push(event);
  }

  // 事件处理器
  private handleAccountCreated(event: AccountCreatedEvent): void {
    console.log(`Account created: ${event.accountId}`);
  }

  private handleDeposit(event: DepositEvent): void {
    this.balance += event.amount;
    console.log(`Deposited ${event.amount} into account ${event.accountId}. New balance: ${this.balance}`);
  }

  private handleWithdraw(event: WithdrawEvent): void {
    this.balance -= event.amount;
    console.log(`Withdrawn ${event.amount} from account ${event.accountId}. New balance: ${this.balance}`);
  }

  // 获取未提交的事件
  getUncommittedEvents(): any[] {
    return this.events;
  }

  // 清空未提交的事件
  clearUncommittedEvents(): void {
    this.events = [];
  }

  // 从事件流中重建 Aggregate
  static fromEvents(accountId: string, events: any[]): Account {
    const account = new Account(accountId, 'dummy'); // 创建一个临时的 Account 实例
    account.balance = 0; // 重置余额
    account.events = []; // 清空事件列表

    events.forEach(event => {
      if (event instanceof AccountCreatedEvent) {
        //  do nothing
      } else if (event instanceof DepositEvent) {
        account.balance += event.amount;
      } else if (event instanceof WithdrawEvent) {
        account.balance -= event.amount;
      }
    });
    return account;
  }
}

// Event Store (简化版)
class EventStore {
  private events: any[] = [];

  async saveEvents(accountId: string, events: any[]): Promise<void> {
    this.events = this.events.concat(events);
    console.log(`Events saved to event store for account ${accountId}`);
  }

  async getEvents(accountId: string): Promise<any[]> {
    return this.events.filter(event => (event as any).accountId === accountId);
  }
}

// Projection (简化版)
class AccountProjection {
  async getAccountBalance(accountId: string): Promise<number> {
    const eventStore = new EventStore();
    const events = await eventStore.getEvents(accountId);
    let balance = 0;

    events.forEach(event => {
      if (event instanceof DepositEvent) {
        balance += event.amount;
      } else if (event instanceof WithdrawEvent) {
        balance -= event.amount;
      }
    });

    return balance;
  }
}

// 使用示例
async function main() {
  const eventStore = new EventStore();
  const accountProjection = new AccountProjection();

  const accountId = '123';
  const account = new Account(accountId, 'John Doe');

  account.deposit(100);
  account.withdraw(50);

  await eventStore.saveEvents(accountId, account.getUncommittedEvents());
  account.clearUncommittedEvents();

  const balance = await accountProjection.getAccountBalance(accountId);
  console.log(`Current balance for account ${accountId}: ${balance}`);

  // 从事件流中重建 Account
  const events = await eventStore.getEvents(accountId);
  const reconstructedAccount = Account.fromEvents(accountId, events);
  console.log(`Reconstructed account balance: ${reconstructedAccount.balance}`);
}

main();

代码解释:

  • AccountCreatedEvent, DepositEvent, WithdrawEvent 代表账户相关的事件。
  • Account 是聚合根,负责处理存款和取款操作,并生成相应的事件。
  • EventStore 负责存储事件。
  • AccountProjection 负责从事件流中计算账户余额。

注意事项:

  • Event Sourcing 会增加系统的复杂度,需要认真评估。
  • 选择合适的 Event Store 非常重要,要考虑性能、可靠性和可扩展性。
  • 事件的版本控制是一个挑战,需要仔细设计。
  • 快照(Snapshot)可以减少 replay 事件的时间。

第三幕:CQRS + Event Sourcing = 完美搭档?

CQRS 和 Event Sourcing 经常一起使用,它们可以互相补充,发挥更大的威力。

  • CQRS 提供读写分离的架构,Event Sourcing 提供完整的审计日志和时间旅行能力。
  • Command Handler 负责处理命令,并调用 Aggregate 生成事件。
  • Event Bus 将事件发布给 Projection,Projection 负责构建读模型。
  • Query Handler 负责查询读模型。

架构图:

[Client] -> [Command] -> [Command Handler] -> [Aggregate] -> [Event] -> [Event Store] -> [Event Bus] -> [Projection] -> [Read Model] -> [Query] -> [Client]

代码示例:

(这里就不再重复写完整的代码了,因为前面的示例已经包含了 CQRS 和 Event Sourcing 的基本要素。只需要将它们结合起来即可。)

如何选择 CQRS 和 Event Sourcing?

特性 CQRS Event Sourcing
核心思想 读写分离 事件溯源
优点 性能提升,简化模型,更好的可扩展性 完整的审计日志,时间旅行,更好的可扩展性
缺点 增加代码复杂性,数据同步挑战 增加系统复杂度,事件版本控制挑战,需要快照支持
适用场景 读写比例悬殊,业务逻辑复杂,需要高性能的系统 需要完整的审计日志,需要时间旅行,领域驱动设计
是否需要一起用 不一定,可以单独使用 不一定,可以单独使用

总结:

CQRS 和 Event Sourcing 是强大的设计模式,可以帮助我们构建可伸缩、可审计的系统。但是,它们也增加了系统的复杂度,需要认真评估。在选择使用它们之前,要充分了解它们的优缺点,并根据实际情况进行权衡。

结尾:

希望今天的分享对大家有所帮助。记住,没有银弹,选择合适的技术方案才是最重要的。晚安!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注