Python 事件源(Event Sourcing):基于事件的系统设计

好的,各位观众老爷,欢迎来到今天的“事件源(Event Sourcing)奇妙之旅”!今天咱们不聊虚的,直接上干货,用最接地气的方式,把Event Sourcing这玩意儿给扒个精光。

啥是Event Sourcing?别跟我拽术语!

想象一下,你平时记账,是直接修改银行账户余额,还是每次都记录“今天买了杯咖啡花了20”、“昨天收到工资5000”?

  • 直接修改余额: 这就是传统的CRUD(Create, Read, Update, Delete)模式,每次修改都覆盖之前的状态。
  • 记录所有事件: 这就是Event Sourcing的核心思想。我们不直接修改状态,而是记录发生的所有事件。最终的状态可以通过回放这些事件来得到。

用人话说,Event Sourcing就像是给你的应用装了个“时光机器”,你想知道过去任何时刻的状态,只要回放当时的事件就行了。

为啥要用Event Sourcing?吃饱了撑的?

当然不是!Event Sourcing可是有很多好处的,不然谁没事儿找事儿啊。

  • 审计追踪: 知道谁、在什么时候、做了什么,对于合规性要求高的系统简直是福音。
  • 调试神器: 出了问题,直接回放事件,重现当时的场景,定位问题贼快。
  • 时间旅行: 想知道昨天下午3点账户有多少钱?回放到那个时间点就行了,方便快捷。
  • 更好的扩展性: 事件可以异步处理,解耦系统各个部分,让系统更具弹性。
  • 领域驱动设计(DDD)好基友: Event Sourcing天然契合DDD,更好地表达业务逻辑。

Event Sourcing的组成部分:

要玩转Event Sourcing,你需要了解几个关键组件:

  • Event Store(事件存储): 顾名思义,就是用来存储事件的地方。它是一个只追加的日志,只能往里面写事件,不能修改或删除。
  • Events(事件): 描述系统中发生的行为。比如“用户注册”、“商品添加购物车”、“订单支付”等。
  • Aggregates(聚合): DDD里的概念,代表一个业务实体,负责处理事件并维护自身状态。
  • Event Handlers(事件处理器): 监听Event Store中的事件,并执行相应的操作,比如更新读模型。
  • Read Models(读模型): 为了查询优化而存在的。因为回放所有事件来查询效率太低,所以我们提前把数据处理好,放到专门的查询数据库里。

代码时间到!咱们撸起袖子干!

为了演示Event Sourcing,咱们用Python搞一个简单的银行账户系统。

1. 事件(Events):

import json
from dataclasses import dataclass, asdict

@dataclass
class Event:
    event_type: str

    def to_json(self):
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, json_str):
        data = json.loads(json_str)
        event_type = data.get('event_type')

        if event_type == 'AccountCreated':
            return AccountCreated(**data)
        elif event_type == 'DepositPerformed':
            return DepositPerformed(**data)
        elif event_type == 'WithdrawalPerformed':
            return WithdrawalPerformed(**data)
        else:
            raise ValueError(f"Unknown event type: {event_type}")

@dataclass
class AccountCreated(Event):
    account_id: str
    initial_balance: float

    def __post_init__(self):
        self.event_type = 'AccountCreated'

@dataclass
class DepositPerformed(Event):
    account_id: str
    amount: float

    def __post_init__(self):
        self.event_type = 'DepositPerformed'

@dataclass
class WithdrawalPerformed(Event):
    account_id: str
    amount: float

    def __post_init__(self):
        self.event_type = 'WithdrawalPerformed'

# Example Usage
account_created = AccountCreated(account_id="123", initial_balance=100.0)
deposit_performed = DepositPerformed(account_id="123", amount=50.0)

# Serialize to JSON
account_created_json = account_created.to_json()
deposit_performed_json = deposit_performed.to_json()

print(f"Account Created JSON: {account_created_json}")
print(f"Deposit Performed JSON: {deposit_performed_json}")

# Deserialize from JSON
account_created_deserialized = AccountCreated.from_json(account_created_json)
deposit_performed_deserialized = DepositPerformed.from_json(deposit_performed_json)

print(f"Account Created Deserialized: {account_created_deserialized}")
print(f"Deposit Performed Deserialized: {deposit_performed_deserialized}")

这里我们定义了三个事件:AccountCreated(账户创建)、DepositPerformed(存款)、WithdrawalPerformed(取款)。 注意,我们使用了dataclass简化代码,并且实现了序列化和反序列化方法.
2. 聚合(Aggregate):

class Account:
    def __init__(self, account_id, balance=0.0):
        self.account_id = account_id
        self.balance = balance
        self.events = [] # 存储当前聚合未提交的事件。

    def create(self, initial_balance):
        if self.balance != 0.0:
            raise Exception("Account already created")
        self.apply(AccountCreated(account_id=self.account_id, initial_balance=initial_balance))
        return self.events[-1]

    def deposit(self, amount):
        if amount <= 0:
            raise ValueError("Deposit amount must be positive")
        self.apply(DepositPerformed(account_id=self.account_id, amount=amount))
        return self.events[-1]

    def withdraw(self, amount):
        if amount <= 0:
            raise ValueError("Withdrawal amount must be positive")
        if self.balance < amount:
            raise ValueError("Insufficient funds")
        self.apply(WithdrawalPerformed(account_id=self.account_id, amount=amount))
        return self.events[-1]

    def apply(self, event):
        if isinstance(event, AccountCreated):
            self.balance = event.initial_balance
        elif isinstance(event, DepositPerformed):
            self.balance += event.amount
        elif isinstance(event, WithdrawalPerformed):
            self.balance -= event.amount
        else:
            raise ValueError(f"Unknown event type: {type(event)}")
        self.events.append(event)

    def get_balance(self):
        return self.balance

    def load_from_history(self, events):
        """从事件历史中重构聚合状态"""
        self.events = []
        for event in events:
            self.apply(event) # 注意这里不能添加到self.events里,因为这些事件已经持久化了。
        return self

Account类代表银行账户聚合。depositwithdraw方法会生成相应的事件,并调用apply方法更新自身状态。注意apply方法是关键,它负责根据事件修改聚合的状态。 load_from_history方法用于从事件存储中加载事件,重建聚合。

3. 事件存储(Event Store):

这里我们用一个简单的列表来模拟事件存储,实际项目中可以使用数据库或者专门的Event Store,比如EventStoreDB。

class EventStore:
    def __init__(self):
        self.events = []

    def append(self, event):
        self.events.append(event)

    def get_events(self, account_id):
        return [event for event in self.events if hasattr(event, 'account_id') and event.account_id == account_id]

    def all_events(self):
        return self.events

EventStore类提供append方法来添加事件,get_events方法来获取指定账户的所有事件。

4. 使用示例:

# 初始化
event_store = EventStore()
account = Account(account_id="123")

# 创建账户
event = account.create(100.0)
event_store.append(event)

# 存款
event = account.deposit(50.0)
event_store.append(event)

# 取款
event = account.withdraw(20.0)
event_store.append(event)

# 查询账户余额
events = event_store.get_events(account.account_id)
account = Account(account_id="123").load_from_history(events)
print(f"Current balance: {account.get_balance()}")  # 输出:Current balance: 130.0

# 打印所有事件
all_events = event_store.all_events()
for e in all_events:
    print(e)

这段代码演示了如何创建账户、存款、取款,以及如何从事件存储中恢复账户状态。

5. 读模型(Read Model):

为了优化查询性能,我们可以创建一个读模型,将账户余额信息提前计算好并存储起来。

class AccountReadModel:
    def __init__(self):
        self.accounts = {}  # {account_id: balance}

    def apply_event(self, event):
        if isinstance(event, AccountCreated):
            self.accounts[event.account_id] = event.initial_balance
        elif isinstance(event, DepositPerformed):
            if event.account_id in self.accounts:
                self.accounts[event.account_id] += event.amount
        elif isinstance(event, WithdrawalPerformed):
            if event.account_id in self.accounts:
                self.accounts[event.account_id] -= event.amount

    def get_balance(self, account_id):
        return self.accounts.get(account_id, 0.0)

# 创建读模型实例
read_model = AccountReadModel()

# 从事件存储中加载所有事件,并更新读模型
all_events = event_store.all_events()
for event in all_events:
    read_model.apply_event(event)

# 查询账户余额
print(f"Account 123 balance (from read model): {read_model.get_balance('123')}")  # 输出:Account 123 balance (from read model): 130.0

AccountReadModel类维护了一个账户余额的字典。apply_event方法根据事件更新余额。通过提前计算好余额,查询时直接从accounts字典里取,速度杠杠的。

Event Sourcing的挑战:

Event Sourcing虽好,但也有一些坑需要注意:

  • 事件溯源的复杂性: 需要仔细设计事件,确保能够完整描述业务行为。
  • 事件演化: 当事件结构发生变化时,需要考虑如何兼容旧的事件。可以使用事件转换(Event Transformation)或者版本控制来解决。
  • 最终一致性: 读模型和写模型之间可能存在延迟,需要接受最终一致性的概念。
  • 事件存储的选择: 需要选择合适的事件存储,比如EventStoreDB、Kafka等。
  • 查询复杂性: 复杂的查询可能需要构建更复杂的读模型。

事件演化(Event Evolution)策略:

当你的应用不断发展,你可能需要修改现有的事件结构。这可能会导致问题,因为旧的事件可能无法被新的代码正确处理。以下是一些解决事件演化的策略:

  1. 事件版本控制(Event Versioning):

    • 为每个事件类型分配一个版本号。
    • 当事件结构发生变化时,创建一个新的事件版本。
    • 在事件处理器中,根据事件版本号来处理不同版本的事件。
    @dataclass
    class AccountCreated(Event):
        account_id: str
        initial_balance: float
        event_version: int = 1  # 添加版本号
    
        def __post_init__(self):
            self.event_type = 'AccountCreated'

    在事件处理器中:

    def apply_event(self, event):
        if isinstance(event, AccountCreated):
            if event.event_version == 1:
                self.accounts[event.account_id] = event.initial_balance
            elif event.event_version == 2:
                # 处理新的事件结构
                pass
  2. 事件迁移(Event Migration):

    • 创建一个迁移脚本,将旧的事件转换为新的事件结构。
    • 运行迁移脚本,更新事件存储中的所有旧事件。
    • 这种方法需要谨慎操作,确保数据不会丢失或损坏。
  3. 事件转换(Event Transformation):

    • 在事件处理器中,将旧的事件转换为新的事件结构。
    • 这种方法不需要修改事件存储中的事件,但会增加事件处理器的复杂性。
    def apply_event(self, event):
        if isinstance(event, AccountCreated):
            if hasattr(event, 'initial_balance'):
                balance = event.initial_balance
            else:
                # 从旧的事件结构中获取余额
                balance = event.old_balance_field
            self.accounts[event.account_id] = balance

总结:

Event Sourcing是一种强大的架构模式,可以带来很多好处。但是,它也增加了一些复杂性。在决定是否使用Event Sourcing时,需要权衡其优点和缺点。 如果你的系统需要审计追踪、时间旅行、或者需要更好的扩展性,那么Event Sourcing可能是一个不错的选择。

最后,送大家一句话:

“代码虐我千百遍,我待代码如初恋!”

希望今天的讲座能帮助大家更好地理解Event Sourcing。 感谢各位的观看,咱们下期再见!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注