好的,各位观众老爷,欢迎来到今天的“事件源(Event Sourcing)奇妙之旅”!今天咱们不聊虚的,直接上干货,用最接地气的方式,把Event Sourcing这玩意儿给扒个精光。
啥是Event Sourcing?别跟我拽术语!
想象一下,你平时记账,是直接修改银行账户余额,还是每次都记录“今天买了杯咖啡花了20”、“昨天收到工资5000”?
- 直接修改余额: 这就是传统的CRUD(Create, Read, Update, Delete)模式,每次修改都覆盖之前的状态。
- 记录所有事件: 这就是Event Sourcing的核心思想。我们不直接修改状态,而是记录发生的所有事件。最终的状态可以通过回放这些事件来得到。
用人话说,Event Sourcing就像是给你的应用装了个“时光机器”,你想知道过去任何时刻的状态,只要回放当时的事件就行了。
为啥要用Event Sourcing?吃饱了撑的?
当然不是!Event Sourcing可是有很多好处的,不然谁没事儿找事儿啊。
- 审计追踪: 知道谁、在什么时候、做了什么,对于合规性要求高的系统简直是福音。
- 调试神器: 出了问题,直接回放事件,重现当时的场景,定位问题贼快。
- 时间旅行: 想知道昨天下午3点账户有多少钱?回放到那个时间点就行了,方便快捷。
- 更好的扩展性: 事件可以异步处理,解耦系统各个部分,让系统更具弹性。
- 领域驱动设计(DDD)好基友: Event Sourcing天然契合DDD,更好地表达业务逻辑。
Event Sourcing的组成部分:
要玩转Event Sourcing,你需要了解几个关键组件:
- Event Store(事件存储): 顾名思义,就是用来存储事件的地方。它是一个只追加的日志,只能往里面写事件,不能修改或删除。
- Events(事件): 描述系统中发生的行为。比如“用户注册”、“商品添加购物车”、“订单支付”等。
- Aggregates(聚合): DDD里的概念,代表一个业务实体,负责处理事件并维护自身状态。
- Event Handlers(事件处理器): 监听Event Store中的事件,并执行相应的操作,比如更新读模型。
- Read Models(读模型): 为了查询优化而存在的。因为回放所有事件来查询效率太低,所以我们提前把数据处理好,放到专门的查询数据库里。
代码时间到!咱们撸起袖子干!
为了演示Event Sourcing,咱们用Python搞一个简单的银行账户系统。
1. 事件(Events):
import json
from dataclasses import dataclass, asdict
@dataclass
class Event:
event_type: str
def to_json(self):
return json.dumps(asdict(self))
@classmethod
def from_json(cls, json_str):
data = json.loads(json_str)
event_type = data.get('event_type')
if event_type == 'AccountCreated':
return AccountCreated(**data)
elif event_type == 'DepositPerformed':
return DepositPerformed(**data)
elif event_type == 'WithdrawalPerformed':
return WithdrawalPerformed(**data)
else:
raise ValueError(f"Unknown event type: {event_type}")
@dataclass
class AccountCreated(Event):
account_id: str
initial_balance: float
def __post_init__(self):
self.event_type = 'AccountCreated'
@dataclass
class DepositPerformed(Event):
account_id: str
amount: float
def __post_init__(self):
self.event_type = 'DepositPerformed'
@dataclass
class WithdrawalPerformed(Event):
account_id: str
amount: float
def __post_init__(self):
self.event_type = 'WithdrawalPerformed'
# Example Usage
account_created = AccountCreated(account_id="123", initial_balance=100.0)
deposit_performed = DepositPerformed(account_id="123", amount=50.0)
# Serialize to JSON
account_created_json = account_created.to_json()
deposit_performed_json = deposit_performed.to_json()
print(f"Account Created JSON: {account_created_json}")
print(f"Deposit Performed JSON: {deposit_performed_json}")
# Deserialize from JSON
account_created_deserialized = AccountCreated.from_json(account_created_json)
deposit_performed_deserialized = DepositPerformed.from_json(deposit_performed_json)
print(f"Account Created Deserialized: {account_created_deserialized}")
print(f"Deposit Performed Deserialized: {deposit_performed_deserialized}")
这里我们定义了三个事件:AccountCreated
(账户创建)、DepositPerformed
(存款)、WithdrawalPerformed
(取款)。 注意,我们使用了dataclass简化代码,并且实现了序列化和反序列化方法.
2. 聚合(Aggregate):
class Account:
def __init__(self, account_id, balance=0.0):
self.account_id = account_id
self.balance = balance
self.events = [] # 存储当前聚合未提交的事件。
def create(self, initial_balance):
if self.balance != 0.0:
raise Exception("Account already created")
self.apply(AccountCreated(account_id=self.account_id, initial_balance=initial_balance))
return self.events[-1]
def deposit(self, amount):
if amount <= 0:
raise ValueError("Deposit amount must be positive")
self.apply(DepositPerformed(account_id=self.account_id, amount=amount))
return self.events[-1]
def withdraw(self, amount):
if amount <= 0:
raise ValueError("Withdrawal amount must be positive")
if self.balance < amount:
raise ValueError("Insufficient funds")
self.apply(WithdrawalPerformed(account_id=self.account_id, amount=amount))
return self.events[-1]
def apply(self, event):
if isinstance(event, AccountCreated):
self.balance = event.initial_balance
elif isinstance(event, DepositPerformed):
self.balance += event.amount
elif isinstance(event, WithdrawalPerformed):
self.balance -= event.amount
else:
raise ValueError(f"Unknown event type: {type(event)}")
self.events.append(event)
def get_balance(self):
return self.balance
def load_from_history(self, events):
"""从事件历史中重构聚合状态"""
self.events = []
for event in events:
self.apply(event) # 注意这里不能添加到self.events里,因为这些事件已经持久化了。
return self
Account
类代表银行账户聚合。deposit
和withdraw
方法会生成相应的事件,并调用apply
方法更新自身状态。注意apply
方法是关键,它负责根据事件修改聚合的状态。 load_from_history方法用于从事件存储中加载事件,重建聚合。
3. 事件存储(Event Store):
这里我们用一个简单的列表来模拟事件存储,实际项目中可以使用数据库或者专门的Event Store,比如EventStoreDB。
class EventStore:
def __init__(self):
self.events = []
def append(self, event):
self.events.append(event)
def get_events(self, account_id):
return [event for event in self.events if hasattr(event, 'account_id') and event.account_id == account_id]
def all_events(self):
return self.events
EventStore
类提供append
方法来添加事件,get_events
方法来获取指定账户的所有事件。
4. 使用示例:
# 初始化
event_store = EventStore()
account = Account(account_id="123")
# 创建账户
event = account.create(100.0)
event_store.append(event)
# 存款
event = account.deposit(50.0)
event_store.append(event)
# 取款
event = account.withdraw(20.0)
event_store.append(event)
# 查询账户余额
events = event_store.get_events(account.account_id)
account = Account(account_id="123").load_from_history(events)
print(f"Current balance: {account.get_balance()}") # 输出:Current balance: 130.0
# 打印所有事件
all_events = event_store.all_events()
for e in all_events:
print(e)
这段代码演示了如何创建账户、存款、取款,以及如何从事件存储中恢复账户状态。
5. 读模型(Read Model):
为了优化查询性能,我们可以创建一个读模型,将账户余额信息提前计算好并存储起来。
class AccountReadModel:
def __init__(self):
self.accounts = {} # {account_id: balance}
def apply_event(self, event):
if isinstance(event, AccountCreated):
self.accounts[event.account_id] = event.initial_balance
elif isinstance(event, DepositPerformed):
if event.account_id in self.accounts:
self.accounts[event.account_id] += event.amount
elif isinstance(event, WithdrawalPerformed):
if event.account_id in self.accounts:
self.accounts[event.account_id] -= event.amount
def get_balance(self, account_id):
return self.accounts.get(account_id, 0.0)
# 创建读模型实例
read_model = AccountReadModel()
# 从事件存储中加载所有事件,并更新读模型
all_events = event_store.all_events()
for event in all_events:
read_model.apply_event(event)
# 查询账户余额
print(f"Account 123 balance (from read model): {read_model.get_balance('123')}") # 输出:Account 123 balance (from read model): 130.0
AccountReadModel
类维护了一个账户余额的字典。apply_event
方法根据事件更新余额。通过提前计算好余额,查询时直接从accounts
字典里取,速度杠杠的。
Event Sourcing的挑战:
Event Sourcing虽好,但也有一些坑需要注意:
- 事件溯源的复杂性: 需要仔细设计事件,确保能够完整描述业务行为。
- 事件演化: 当事件结构发生变化时,需要考虑如何兼容旧的事件。可以使用事件转换(Event Transformation)或者版本控制来解决。
- 最终一致性: 读模型和写模型之间可能存在延迟,需要接受最终一致性的概念。
- 事件存储的选择: 需要选择合适的事件存储,比如EventStoreDB、Kafka等。
- 查询复杂性: 复杂的查询可能需要构建更复杂的读模型。
事件演化(Event Evolution)策略:
当你的应用不断发展,你可能需要修改现有的事件结构。这可能会导致问题,因为旧的事件可能无法被新的代码正确处理。以下是一些解决事件演化的策略:
-
事件版本控制(Event Versioning):
- 为每个事件类型分配一个版本号。
- 当事件结构发生变化时,创建一个新的事件版本。
- 在事件处理器中,根据事件版本号来处理不同版本的事件。
@dataclass class AccountCreated(Event): account_id: str initial_balance: float event_version: int = 1 # 添加版本号 def __post_init__(self): self.event_type = 'AccountCreated'
在事件处理器中:
def apply_event(self, event): if isinstance(event, AccountCreated): if event.event_version == 1: self.accounts[event.account_id] = event.initial_balance elif event.event_version == 2: # 处理新的事件结构 pass
-
事件迁移(Event Migration):
- 创建一个迁移脚本,将旧的事件转换为新的事件结构。
- 运行迁移脚本,更新事件存储中的所有旧事件。
- 这种方法需要谨慎操作,确保数据不会丢失或损坏。
-
事件转换(Event Transformation):
- 在事件处理器中,将旧的事件转换为新的事件结构。
- 这种方法不需要修改事件存储中的事件,但会增加事件处理器的复杂性。
def apply_event(self, event): if isinstance(event, AccountCreated): if hasattr(event, 'initial_balance'): balance = event.initial_balance else: # 从旧的事件结构中获取余额 balance = event.old_balance_field self.accounts[event.account_id] = balance
总结:
Event Sourcing是一种强大的架构模式,可以带来很多好处。但是,它也增加了一些复杂性。在决定是否使用Event Sourcing时,需要权衡其优点和缺点。 如果你的系统需要审计追踪、时间旅行、或者需要更好的扩展性,那么Event Sourcing可能是一个不错的选择。
最后,送大家一句话:
“代码虐我千百遍,我待代码如初恋!”
希望今天的讲座能帮助大家更好地理解Event Sourcing。 感谢各位的观看,咱们下期再见!