基于事件溯源(Event Sourcing)和CQRS的Java领域驱动设计(DDD)实践
大家好,今天我们来聊聊一个相对复杂但威力强大的架构模式组合:基于事件溯源(Event Sourcing)和命令查询职责分离(CQRS)的Java领域驱动设计(DDD)实践。 这三种模式单独拿出来都有各自的优势,组合起来更是能解决传统应用开发中遇到的许多问题。 本次讲座将通过一个具体的例子,深入浅出地讲解如何将它们应用到实际项目中。
1. 领域驱动设计(DDD)回顾
DDD 是一种软件开发方法论,它强调以领域为中心,通过与领域专家协作,理解领域模型,并将模型反映到代码中。DDD 关注业务逻辑,而非技术细节。它的核心思想是:
- 限界上下文(Bounded Context): 定义领域模型的边界,每个限界上下文中都有自己的领域模型,避免模型之间的混淆。
- 通用语言(Ubiquitous Language): 团队成员(包括开发人员、领域专家等)使用统一的语言描述领域概念,消除沟通障碍。
- 实体(Entity): 具有唯一标识的对象,其状态会随着时间变化。
- 值对象(Value Object): 不具有唯一标识的对象,其状态不可变,只关心其属性值。
- 聚合(Aggregate): 一组相关对象的集合,其中一个对象是聚合根(Aggregate Root),外部只能通过聚合根访问聚合内的其他对象,保证数据一致性。
- 领域服务(Domain Service): 执行跨多个聚合的业务逻辑。
- 领域事件(Domain Event): 领域中发生的有意义的事件,用于通知其他限界上下文或应用组件。
- 仓储(Repository): 提供访问聚合的接口,隐藏数据访问细节。
2. 事件溯源(Event Sourcing)
事件溯源的核心思想是将领域对象的状态变化记录为一系列的事件。每次状态变更不是直接更新对象的状态,而是将事件追加到事件日志中。通过重放事件日志,可以重建对象的当前状态。
传统 CRUD 方式的弊端:
- 数据丢失风险: 每次更新数据都会覆盖旧数据,无法追溯历史状态。
- 审计困难: 难以追踪数据的变更历史,不便于审计和分析。
- 并发冲突: 多用户同时修改同一数据时,容易发生并发冲突。
事件溯源的优势:
- 完整的数据历史: 能够追溯对象的每一个状态变化。
- 审计和分析: 便于进行数据审计、分析和报表生成。
- 可追溯性: 能够回溯到任意时间点的状态。
- 异步处理: 事件可以用于触发异步处理,提高系统响应速度。
- 更容易的微服务架构: 各个微服务可以通过订阅事件进行数据同步,实现最终一致性。
事件溯源的挑战:
- 复杂性增加: 需要设计事件模型、事件存储和事件重放机制。
- 查询复杂: 不能直接从数据库读取对象的当前状态,需要重放事件。
- 事件演化: 需要处理事件模式的演化,保证旧事件在新系统中仍然可用。
3. 命令查询职责分离(CQRS)
CQRS 的核心思想是将系统的读操作(查询)和写操作(命令)分离。命令负责修改系统状态,查询负责获取系统状态。
传统架构的弊端:
- 数据库压力: 读写操作都访问同一个数据库,容易造成数据库压力。
- 性能瓶颈: 复杂的查询操作会影响写操作的性能。
- 缓存失效: 频繁的写操作会导致缓存失效,降低读取性能。
CQRS 的优势:
- 性能优化: 可以针对读操作和写操作分别进行优化,提高系统性能。
- 可扩展性: 可以独立扩展读模型和写模型,提高系统可扩展性。
- 安全隔离: 可以对读操作和写操作进行安全隔离,提高系统安全性。
- 更清晰的领域模型: 命令处理流程更关注业务逻辑,查询流程更关注数据展示,使得领域模型更加清晰。
CQRS 的挑战:
- 复杂性增加: 需要维护两个不同的模型,增加开发和维护成本。
- 最终一致性: 读模型和写模型之间可能存在数据延迟,需要处理最终一致性问题。
4. DDD + Event Sourcing + CQRS:一个电商订单系统的例子
为了更好地理解这三种模式的结合,我们以一个简化的电商订单系统为例进行讲解。
4.1 领域模型
- 限界上下文: 订单上下文(Order Context)
- 通用语言: 订单(Order)、订单项(OrderItem)、商品(Product)、客户(Customer)、订单ID(OrderId)、下单(PlaceOrder)、支付(PayOrder)、发货(ShipOrder)、取消订单(CancelOrder)
- 聚合: 订单(Order)
- 聚合根: 订单(Order)
- 实体: 订单项(OrderItem)
- 值对象: 订单ID(OrderId)
- 领域事件:
OrderPlacedEvent:订单已创建OrderPaidEvent:订单已支付OrderShippedEvent:订单已发货OrderCancelledEvent:订单已取消
4.2 事件模型
// 抽象事件类
public abstract class Event {
private final UUID eventId;
private final Date timestamp;
public Event() {
this.eventId = UUID.randomUUID();
this.timestamp = new Date();
}
public UUID getEventId() {
return eventId;
}
public Date getTimestamp() {
return timestamp;
}
}
// 订单已创建事件
public class OrderPlacedEvent extends Event {
private final UUID orderId;
private final UUID customerId;
private final List<OrderItem> orderItems;
public OrderPlacedEvent(UUID orderId, UUID customerId, List<OrderItem> orderItems) {
this.orderId = orderId;
this.customerId = customerId;
this.orderItems = orderItems;
}
public UUID getOrderId() {
return orderId;
}
public UUID getCustomerId() {
return customerId;
}
public List<OrderItem> getOrderItems() {
return orderItems;
}
}
// 订单已支付事件
public class OrderPaidEvent extends Event {
private final UUID orderId;
public OrderPaidEvent(UUID orderId) {
this.orderId = orderId;
}
public UUID getOrderId() {
return orderId;
}
}
// 订单已发货事件
public class OrderShippedEvent extends Event {
private final UUID orderId;
public OrderShippedEvent(UUID orderId) {
this.orderId = orderId;
}
public UUID getOrderId() {
return orderId;
}
}
// 订单已取消事件
public class OrderCancelledEvent extends Event {
private final UUID orderId;
public OrderCancelledEvent(UUID orderId) {
this.orderId = orderId;
}
public UUID getOrderId() {
return orderId;
}
}
//订单项
public class OrderItem {
private final UUID productId;
private final int quantity;
private final BigDecimal price;
public OrderItem(UUID productId, int quantity, BigDecimal price) {
this.productId = productId;
this.quantity = quantity;
this.price = price;
}
public UUID getProductId() {
return productId;
}
public int getQuantity() {
return quantity;
}
public BigDecimal getPrice() {
return price;
}
}
4.3 聚合根(Order)
public class Order {
private UUID orderId;
private UUID customerId;
private List<OrderItem> orderItems;
private OrderStatus status;
private List<Event> changes = new ArrayList<>();
public Order(UUID orderId, UUID customerId, List<OrderItem> orderItems) {
this.orderId = orderId;
this.customerId = customerId;
this.orderItems = orderItems;
this.status = OrderStatus.NEW;
raiseEvent(new OrderPlacedEvent(orderId, customerId, orderItems));
}
// 从事件重建订单
public Order(List<Event> events) {
for (Event event : events) {
apply(event);
}
}
public void pay() {
if (this.status != OrderStatus.NEW) {
throw new IllegalStateException("Order cannot be paid in current status: " + this.status);
}
this.status = OrderStatus.PAID;
raiseEvent(new OrderPaidEvent(this.orderId));
}
public void ship() {
if (this.status != OrderStatus.PAID) {
throw new IllegalStateException("Order cannot be shipped in current status: " + this.status);
}
this.status = OrderStatus.SHIPPED;
raiseEvent(new OrderShippedEvent(this.orderId));
}
public void cancel() {
if (this.status != OrderStatus.NEW) {
throw new IllegalStateException("Order cannot be cancelled in current status: " + this.status);
}
this.status = OrderStatus.CANCELLED;
raiseEvent(new OrderCancelledEvent(this.orderId));
}
// 应用事件
private void apply(Event event) {
if (event instanceof OrderPlacedEvent) {
OrderPlacedEvent e = (OrderPlacedEvent) event;
this.orderId = e.getOrderId();
this.customerId = e.getCustomerId();
this.orderItems = e.getOrderItems();
this.status = OrderStatus.NEW;
} else if (event instanceof OrderPaidEvent) {
this.status = OrderStatus.PAID;
} else if (event instanceof OrderShippedEvent) {
this.status = OrderStatus.SHIPPED;
} else if (event instanceof OrderCancelledEvent) {
this.status = OrderStatus.CANCELLED;
}
}
// 记录领域事件
private void raiseEvent(Event event) {
apply(event);
this.changes.add(event);
}
public UUID getOrderId() {
return orderId;
}
public UUID getCustomerId() {
return customerId;
}
public List<OrderItem> getOrderItems() {
return orderItems;
}
public OrderStatus getStatus() {
return status;
}
public List<Event> getUncommittedChanges() {
return changes;
}
public void markChangesAsCommitted() {
changes.clear();
}
//订单状态枚举
public enum OrderStatus {
NEW,
PAID,
SHIPPED,
CANCELLED
}
}
4.4 命令模型(Write Model)
命令模型负责接收和处理命令,并产生领域事件。
// 命令接口
public interface Command {
}
// 下单命令
public class PlaceOrderCommand implements Command {
private final UUID orderId;
private final UUID customerId;
private final List<OrderItem> orderItems;
public PlaceOrderCommand(UUID orderId, UUID customerId, List<OrderItem> orderItems) {
this.orderId = orderId;
this.customerId = customerId;
this.orderItems = orderItems;
}
public UUID getOrderId() {
return orderId;
}
public UUID getCustomerId() {
return customerId;
}
public List<OrderItem> getOrderItems() {
return orderItems;
}
}
// 支付订单命令
public class PayOrderCommand implements Command {
private final UUID orderId;
public PayOrderCommand(UUID orderId) {
this.orderId = orderId;
}
public UUID getOrderId() {
return orderId;
}
}
// 发货订单命令
public class ShipOrderCommand implements Command {
private final UUID orderId;
public ShipOrderCommand(UUID orderId) {
this.orderId = orderId;
}
public UUID getOrderId() {
return orderId;
}
}
// 取消订单命令
public class CancelOrderCommand implements Command {
private final UUID orderId;
public CancelOrderCommand(UUID orderId) {
this.orderId = orderId;
}
public UUID getOrderId() {
return orderId;
}
}
// 命令处理器接口
public interface CommandHandler<T extends Command> {
void handle(T command);
}
// 下单命令处理器
public class PlaceOrderCommandHandler implements CommandHandler<PlaceOrderCommand> {
private final EventStore eventStore;
public PlaceOrderCommandHandler(EventStore eventStore) {
this.eventStore = eventStore;
}
@Override
public void handle(PlaceOrderCommand command) {
Order order = new Order(command.getOrderId(), command.getCustomerId(), command.getOrderItems());
eventStore.saveEvents(command.getOrderId(), order.getUncommittedChanges(), -1);
order.markChangesAsCommitted();
}
}
// 支付订单命令处理器
public class PayOrderCommandHandler implements CommandHandler<PayOrderCommand> {
private final EventStore eventStore;
public PayOrderCommandHandler(EventStore eventStore) {
this.eventStore = eventStore;
}
@Override
public void handle(PayOrderCommand command) {
Order order = loadOrder(command.getOrderId());
order.pay();
eventStore.saveEvents(command.getOrderId(), order.getUncommittedChanges(), -1);
order.markChangesAsCommitted();
}
private Order loadOrder(UUID orderId) {
List<Event> events = eventStore.loadEvents(orderId);
if (events == null || events.isEmpty()) {
throw new IllegalArgumentException("Order not found: " + orderId);
}
return new Order(events);
}
}
// 发货订单命令处理器
public class ShipOrderCommandHandler implements CommandHandler<ShipOrderCommand> {
private final EventStore eventStore;
public ShipOrderCommandHandler(EventStore eventStore) {
this.eventStore = eventStore;
}
@Override
public void handle(ShipOrderCommand command) {
Order order = loadOrder(command.getOrderId());
order.ship();
eventStore.saveEvents(command.getOrderId(), order.getUncommittedChanges(), -1);
order.markChangesAsCommitted();
}
private Order loadOrder(UUID orderId) {
List<Event> events = eventStore.loadEvents(orderId);
if (events == null || events.isEmpty()) {
throw new IllegalArgumentException("Order not found: " + orderId);
}
return new Order(events);
}
}
// 取消订单命令处理器
public class CancelOrderCommandHandler implements CommandHandler<CancelOrderCommand> {
private final EventStore eventStore;
public CancelOrderCommandHandler(EventStore eventStore) {
this.eventStore = eventStore;
}
@Override
public void handle(CancelOrderCommand command) {
Order order = loadOrder(command.getOrderId());
order.cancel();
eventStore.saveEvents(command.getOrderId(), order.getUncommittedChanges(), -1);
order.markChangesAsCommitted();
}
private Order loadOrder(UUID orderId) {
List<Event> events = eventStore.loadEvents(orderId);
if (events == null || events.isEmpty()) {
throw new IllegalArgumentException("Order not found: " + orderId);
}
return new Order(events);
}
}
4.5 事件存储(Event Store)
事件存储负责持久化和读取事件。
// 事件存储接口
public interface EventStore {
void saveEvents(UUID aggregateId, List<Event> events, int expectedVersion);
List<Event> loadEvents(UUID aggregateId);
}
// 基于内存的事件存储实现(仅用于演示)
public class InMemoryEventStore implements EventStore {
private final Map<UUID, List<Event>> events = new ConcurrentHashMap<>();
@Override
public void saveEvents(UUID aggregateId, List<Event> newEvents, int expectedVersion) {
List<Event> existingEvents = events.computeIfAbsent(aggregateId, k -> new ArrayList<>());
// 乐观锁,防止并发修改
if (expectedVersion != -1 && existingEvents.size() != expectedVersion) {
throw new ConcurrencyException("Concurrency exception, expected version: " + expectedVersion + ", actual version: " + existingEvents.size());
}
existingEvents.addAll(newEvents);
}
@Override
public List<Event> loadEvents(UUID aggregateId) {
return events.getOrDefault(aggregateId, Collections.emptyList());
}
public static class ConcurrencyException extends RuntimeException {
public ConcurrencyException(String message) {
super(message);
}
}
}
4.6 查询模型(Read Model)
查询模型负责提供查询接口,并根据事件更新读模型。
// 订单 DTO,用于查询
public class OrderDTO {
private UUID orderId;
private UUID customerId;
private List<OrderItem> orderItems;
private String status;
// Getters and Setters
public OrderDTO() {}
public OrderDTO(UUID orderId, UUID customerId, List<OrderItem> orderItems, String status) {
this.orderId = orderId;
this.customerId = customerId;
this.orderItems = orderItems;
this.status = status;
}
public UUID getOrderId() {
return orderId;
}
public void setOrderId(UUID orderId) {
this.orderId = orderId;
}
public UUID getCustomerId() {
return customerId;
}
public void setCustomerId(UUID customerId) {
this.customerId = customerId;
}
public List<OrderItem> getOrderItems() {
return orderItems;
}
public void setOrderItems(List<OrderItem> orderItems) {
this.orderItems = orderItems;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
}
// 订单查询服务
public interface OrderQueryService {
OrderDTO getOrder(UUID orderId);
List<OrderDTO> getOrdersByCustomer(UUID customerId);
}
// 基于内存的订单查询服务实现(仅用于演示)
public class InMemoryOrderQueryService implements OrderQueryService {
private final Map<UUID, OrderDTO> orders = new ConcurrentHashMap<>();
public InMemoryOrderQueryService(EventStore eventStore) {
// 初始化读模型,从事件存储中重建所有订单
List<UUID> aggregateIds = new ArrayList<>(eventStore.loadEvents(null).stream().map(e -> {
if(e instanceof OrderPlacedEvent){
return ((OrderPlacedEvent) e).getOrderId();
}
if(e instanceof OrderPaidEvent){
return ((OrderPaidEvent) e).getOrderId();
}
if(e instanceof OrderShippedEvent){
return ((OrderShippedEvent) e).getOrderId();
}
if(e instanceof OrderCancelledEvent){
return ((OrderCancelledEvent) e).getOrderId();
}
return null;
}).collect(Collectors.toSet()));
for (UUID aggregateId : aggregateIds) {
List<Event> events = eventStore.loadEvents(aggregateId);
if (!events.isEmpty()) {
Order order = new Order(events);
OrderDTO orderDTO = new OrderDTO(order.getOrderId(), order.getCustomerId(), order.getOrderItems(), order.getStatus().toString());
orders.put(order.getOrderId(), orderDTO);
}
}
}
@Override
public OrderDTO getOrder(UUID orderId) {
return orders.get(orderId);
}
@Override
public List<OrderDTO> getOrdersByCustomer(UUID customerId) {
return orders.values().stream()
.filter(order -> order.getCustomerId().equals(customerId))
.collect(Collectors.toList());
}
// 更新读模型
public void on(OrderPlacedEvent event) {
OrderDTO orderDTO = new OrderDTO(event.getOrderId(), event.getCustomerId(), event.getOrderItems(), Order.OrderStatus.NEW.toString());
orders.put(event.getOrderId(), orderDTO);
}
public void on(OrderPaidEvent event) {
OrderDTO orderDTO = orders.get(event.getOrderId());
if (orderDTO != null) {
orderDTO.setStatus(Order.OrderStatus.PAID.toString());
}
}
public void on(OrderShippedEvent event) {
OrderDTO orderDTO = orders.get(event.getOrderId());
if (orderDTO != null) {
orderDTO.setStatus(Order.OrderStatus.SHIPPED.toString());
}
}
public void on(OrderCancelledEvent event) {
OrderDTO orderDTO = orders.get(event.getOrderId());
if (orderDTO != null) {
orderDTO.setStatus(Order.OrderStatus.CANCELLED.toString());
}
}
}
4.7 事件处理器(Event Handler)
事件处理器负责订阅领域事件,并更新查询模型。通常使用消息队列来实现异步事件处理。
// 事件处理器接口
public interface EventHandler<T extends Event> {
void handle(T event);
}
// 订单事件处理器
public class OrderEventHandler {
private final InMemoryOrderQueryService orderQueryService;
public OrderEventHandler(InMemoryOrderQueryService orderQueryService) {
this.orderQueryService = orderQueryService;
}
@EventListener
public void handle(OrderPlacedEvent event) {
orderQueryService.on(event);
}
@EventListener
public void handle(OrderPaidEvent event) {
orderQueryService.on(event);
}
@EventListener
public void handle(OrderShippedEvent event) {
orderQueryService.on(event);
}
@EventListener
public void handle(OrderCancelledEvent event) {
orderQueryService.on(event);
}
}
5. 代码示例
public class Main {
public static void main(String[] args) {
// 初始化基础设施
InMemoryEventStore eventStore = new InMemoryEventStore();
InMemoryOrderQueryService orderQueryService = new InMemoryOrderQueryService(eventStore);
OrderEventHandler orderEventHandler = new OrderEventHandler(orderQueryService);
// 初始化命令处理器
PlaceOrderCommandHandler placeOrderCommandHandler = new PlaceOrderCommandHandler(eventStore);
PayOrderCommandHandler payOrderCommandHandler = new PayOrderCommandHandler(eventStore);
ShipOrderCommandHandler shipOrderCommandHandler = new ShipOrderCommandHandler(eventStore);
CancelOrderCommandHandler cancelOrderCommandHandler = new CancelOrderCommandHandler(eventStore);
// 创建订单
UUID orderId = UUID.randomUUID();
UUID customerId = UUID.randomUUID();
List<OrderItem> orderItems = Arrays.asList(
new OrderItem(UUID.randomUUID(), 2, new BigDecimal("10.00")),
new OrderItem(UUID.randomUUID(), 1, new BigDecimal("20.00"))
);
PlaceOrderCommand placeOrderCommand = new PlaceOrderCommand(orderId, customerId, orderItems);
placeOrderCommandHandler.handle(placeOrderCommand);
orderEventHandler.handle(new OrderPlacedEvent(orderId, customerId, orderItems));
// 支付订单
PayOrderCommand payOrderCommand = new PayOrderCommand(orderId);
payOrderCommandHandler.handle(payOrderCommand);
orderEventHandler.handle(new OrderPaidEvent(orderId));
// 查询订单
OrderDTO orderDTO = orderQueryService.getOrder(orderId);
System.out.println("Order ID: " + orderDTO.getOrderId());
System.out.println("Customer ID: " + orderDTO.getCustomerId());
System.out.println("Status: " + orderDTO.getStatus());
}
}
6. 总结
通过以上示例,我们了解了如何将 DDD、事件溯源和 CQRS 结合起来构建一个简单的订单系统。DDD 帮助我们理解和建模领域,事件溯源提供了完整的数据历史和审计能力,CQRS 优化了读写性能。虽然这种架构模式比较复杂,但它能解决传统应用开发中遇到的许多问题,并为构建可扩展、可维护的系统提供了强大的支持。
7. 一些使用上的建议
- 从简单开始: 不要一开始就尝试应用所有的模式,可以先从一个简单的限界上下文开始,逐步引入事件溯源和 CQRS。
- 关注领域: 始终关注领域模型,与领域专家保持密切沟通,确保模型能够准确反映业务需求。
- 选择合适的技术: 选择适合项目需求的技术栈,例如事件存储可以使用关系数据库、NoSQL 数据库或专门的事件存储系统。
- 处理最终一致性: 读模型和写模型之间可能存在数据延迟,需要采取合适的策略处理最终一致性问题,例如使用重试机制、Saga 模式等。
- 监控和告警: 建立完善的监控和告警机制,及时发现和解决问题。
8. 这种模式的价值是什么
结合DDD,Event Sourcing 和 CQRS 可以建立一个高度可维护、可扩展和可审计的系统,更好地适应业务变化,并提供更强大的数据分析能力。