基于事件溯源(Event Sourcing)和CQRS的Java领域驱动设计(DDD)实践

基于事件溯源(Event Sourcing)和CQRS的Java领域驱动设计(DDD)实践

大家好,今天我们来聊聊一个相对复杂但威力强大的架构模式组合:基于事件溯源(Event Sourcing)和命令查询职责分离(CQRS)的Java领域驱动设计(DDD)实践。 这三种模式单独拿出来都有各自的优势,组合起来更是能解决传统应用开发中遇到的许多问题。 本次讲座将通过一个具体的例子,深入浅出地讲解如何将它们应用到实际项目中。

1. 领域驱动设计(DDD)回顾

DDD 是一种软件开发方法论,它强调以领域为中心,通过与领域专家协作,理解领域模型,并将模型反映到代码中。DDD 关注业务逻辑,而非技术细节。它的核心思想是:

  • 限界上下文(Bounded Context): 定义领域模型的边界,每个限界上下文中都有自己的领域模型,避免模型之间的混淆。
  • 通用语言(Ubiquitous Language): 团队成员(包括开发人员、领域专家等)使用统一的语言描述领域概念,消除沟通障碍。
  • 实体(Entity): 具有唯一标识的对象,其状态会随着时间变化。
  • 值对象(Value Object): 不具有唯一标识的对象,其状态不可变,只关心其属性值。
  • 聚合(Aggregate): 一组相关对象的集合,其中一个对象是聚合根(Aggregate Root),外部只能通过聚合根访问聚合内的其他对象,保证数据一致性。
  • 领域服务(Domain Service): 执行跨多个聚合的业务逻辑。
  • 领域事件(Domain Event): 领域中发生的有意义的事件,用于通知其他限界上下文或应用组件。
  • 仓储(Repository): 提供访问聚合的接口,隐藏数据访问细节。

2. 事件溯源(Event Sourcing)

事件溯源的核心思想是将领域对象的状态变化记录为一系列的事件。每次状态变更不是直接更新对象的状态,而是将事件追加到事件日志中。通过重放事件日志,可以重建对象的当前状态。

传统 CRUD 方式的弊端:

  • 数据丢失风险: 每次更新数据都会覆盖旧数据,无法追溯历史状态。
  • 审计困难: 难以追踪数据的变更历史,不便于审计和分析。
  • 并发冲突: 多用户同时修改同一数据时,容易发生并发冲突。

事件溯源的优势:

  • 完整的数据历史: 能够追溯对象的每一个状态变化。
  • 审计和分析: 便于进行数据审计、分析和报表生成。
  • 可追溯性: 能够回溯到任意时间点的状态。
  • 异步处理: 事件可以用于触发异步处理,提高系统响应速度。
  • 更容易的微服务架构: 各个微服务可以通过订阅事件进行数据同步,实现最终一致性。

事件溯源的挑战:

  • 复杂性增加: 需要设计事件模型、事件存储和事件重放机制。
  • 查询复杂: 不能直接从数据库读取对象的当前状态,需要重放事件。
  • 事件演化: 需要处理事件模式的演化,保证旧事件在新系统中仍然可用。

3. 命令查询职责分离(CQRS)

CQRS 的核心思想是将系统的读操作(查询)和写操作(命令)分离。命令负责修改系统状态,查询负责获取系统状态。

传统架构的弊端:

  • 数据库压力: 读写操作都访问同一个数据库,容易造成数据库压力。
  • 性能瓶颈: 复杂的查询操作会影响写操作的性能。
  • 缓存失效: 频繁的写操作会导致缓存失效,降低读取性能。

CQRS 的优势:

  • 性能优化: 可以针对读操作和写操作分别进行优化,提高系统性能。
  • 可扩展性: 可以独立扩展读模型和写模型,提高系统可扩展性。
  • 安全隔离: 可以对读操作和写操作进行安全隔离,提高系统安全性。
  • 更清晰的领域模型: 命令处理流程更关注业务逻辑,查询流程更关注数据展示,使得领域模型更加清晰。

CQRS 的挑战:

  • 复杂性增加: 需要维护两个不同的模型,增加开发和维护成本。
  • 最终一致性: 读模型和写模型之间可能存在数据延迟,需要处理最终一致性问题。

4. DDD + Event Sourcing + CQRS:一个电商订单系统的例子

为了更好地理解这三种模式的结合,我们以一个简化的电商订单系统为例进行讲解。

4.1 领域模型

  • 限界上下文: 订单上下文(Order Context)
  • 通用语言: 订单(Order)、订单项(OrderItem)、商品(Product)、客户(Customer)、订单ID(OrderId)、下单(PlaceOrder)、支付(PayOrder)、发货(ShipOrder)、取消订单(CancelOrder)
  • 聚合: 订单(Order)
    • 聚合根: 订单(Order)
    • 实体: 订单项(OrderItem)
    • 值对象: 订单ID(OrderId)
  • 领域事件:
    • OrderPlacedEvent:订单已创建
    • OrderPaidEvent:订单已支付
    • OrderShippedEvent:订单已发货
    • OrderCancelledEvent:订单已取消

4.2 事件模型

// 抽象事件类
public abstract class Event {
    private final UUID eventId;
    private final Date timestamp;

    public Event() {
        this.eventId = UUID.randomUUID();
        this.timestamp = new Date();
    }

    public UUID getEventId() {
        return eventId;
    }

    public Date getTimestamp() {
        return timestamp;
    }
}

// 订单已创建事件
public class OrderPlacedEvent extends Event {
    private final UUID orderId;
    private final UUID customerId;
    private final List<OrderItem> orderItems;

    public OrderPlacedEvent(UUID orderId, UUID customerId, List<OrderItem> orderItems) {
        this.orderId = orderId;
        this.customerId = customerId;
        this.orderItems = orderItems;
    }

    public UUID getOrderId() {
        return orderId;
    }

    public UUID getCustomerId() {
        return customerId;
    }

    public List<OrderItem> getOrderItems() {
        return orderItems;
    }
}

// 订单已支付事件
public class OrderPaidEvent extends Event {
    private final UUID orderId;

    public OrderPaidEvent(UUID orderId) {
        this.orderId = orderId;
    }

    public UUID getOrderId() {
        return orderId;
    }
}

// 订单已发货事件
public class OrderShippedEvent extends Event {
    private final UUID orderId;

    public OrderShippedEvent(UUID orderId) {
        this.orderId = orderId;
    }

    public UUID getOrderId() {
        return orderId;
    }
}

// 订单已取消事件
public class OrderCancelledEvent extends Event {
    private final UUID orderId;

    public OrderCancelledEvent(UUID orderId) {
        this.orderId = orderId;
    }

    public UUID getOrderId() {
        return orderId;
    }
}

//订单项
public class OrderItem {

    private final UUID productId;
    private final int quantity;
    private final BigDecimal price;

    public OrderItem(UUID productId, int quantity, BigDecimal price) {
        this.productId = productId;
        this.quantity = quantity;
        this.price = price;
    }

    public UUID getProductId() {
        return productId;
    }

    public int getQuantity() {
        return quantity;
    }

    public BigDecimal getPrice() {
        return price;
    }
}

4.3 聚合根(Order)

public class Order {
    private UUID orderId;
    private UUID customerId;
    private List<OrderItem> orderItems;
    private OrderStatus status;
    private List<Event> changes = new ArrayList<>();

    public Order(UUID orderId, UUID customerId, List<OrderItem> orderItems) {
        this.orderId = orderId;
        this.customerId = customerId;
        this.orderItems = orderItems;
        this.status = OrderStatus.NEW;
        raiseEvent(new OrderPlacedEvent(orderId, customerId, orderItems));
    }

    // 从事件重建订单
    public Order(List<Event> events) {
        for (Event event : events) {
            apply(event);
        }
    }

    public void pay() {
        if (this.status != OrderStatus.NEW) {
            throw new IllegalStateException("Order cannot be paid in current status: " + this.status);
        }
        this.status = OrderStatus.PAID;
        raiseEvent(new OrderPaidEvent(this.orderId));
    }

    public void ship() {
        if (this.status != OrderStatus.PAID) {
            throw new IllegalStateException("Order cannot be shipped in current status: " + this.status);
        }
        this.status = OrderStatus.SHIPPED;
        raiseEvent(new OrderShippedEvent(this.orderId));
    }

    public void cancel() {
        if (this.status != OrderStatus.NEW) {
            throw new IllegalStateException("Order cannot be cancelled in current status: " + this.status);
        }
        this.status = OrderStatus.CANCELLED;
        raiseEvent(new OrderCancelledEvent(this.orderId));
    }

    // 应用事件
    private void apply(Event event) {
        if (event instanceof OrderPlacedEvent) {
            OrderPlacedEvent e = (OrderPlacedEvent) event;
            this.orderId = e.getOrderId();
            this.customerId = e.getCustomerId();
            this.orderItems = e.getOrderItems();
            this.status = OrderStatus.NEW;
        } else if (event instanceof OrderPaidEvent) {
            this.status = OrderStatus.PAID;
        } else if (event instanceof OrderShippedEvent) {
            this.status = OrderStatus.SHIPPED;
        } else if (event instanceof OrderCancelledEvent) {
            this.status = OrderStatus.CANCELLED;
        }
    }

    // 记录领域事件
    private void raiseEvent(Event event) {
        apply(event);
        this.changes.add(event);
    }

    public UUID getOrderId() {
        return orderId;
    }

    public UUID getCustomerId() {
        return customerId;
    }

    public List<OrderItem> getOrderItems() {
        return orderItems;
    }

    public OrderStatus getStatus() {
        return status;
    }

    public List<Event> getUncommittedChanges() {
        return changes;
    }

    public void markChangesAsCommitted() {
        changes.clear();
    }

    //订单状态枚举
    public enum OrderStatus {
        NEW,
        PAID,
        SHIPPED,
        CANCELLED
    }
}

4.4 命令模型(Write Model)

命令模型负责接收和处理命令,并产生领域事件。

// 命令接口
public interface Command {
}

// 下单命令
public class PlaceOrderCommand implements Command {
    private final UUID orderId;
    private final UUID customerId;
    private final List<OrderItem> orderItems;

    public PlaceOrderCommand(UUID orderId, UUID customerId, List<OrderItem> orderItems) {
        this.orderId = orderId;
        this.customerId = customerId;
        this.orderItems = orderItems;
    }

    public UUID getOrderId() {
        return orderId;
    }

    public UUID getCustomerId() {
        return customerId;
    }

    public List<OrderItem> getOrderItems() {
        return orderItems;
    }
}

// 支付订单命令
public class PayOrderCommand implements Command {
    private final UUID orderId;

    public PayOrderCommand(UUID orderId) {
        this.orderId = orderId;
    }

    public UUID getOrderId() {
        return orderId;
    }
}

// 发货订单命令
public class ShipOrderCommand implements Command {
    private final UUID orderId;

    public ShipOrderCommand(UUID orderId) {
        this.orderId = orderId;
    }

    public UUID getOrderId() {
        return orderId;
    }
}

// 取消订单命令
public class CancelOrderCommand implements Command {
    private final UUID orderId;

    public CancelOrderCommand(UUID orderId) {
        this.orderId = orderId;
    }

    public UUID getOrderId() {
        return orderId;
    }
}

// 命令处理器接口
public interface CommandHandler<T extends Command> {
    void handle(T command);
}

// 下单命令处理器
public class PlaceOrderCommandHandler implements CommandHandler<PlaceOrderCommand> {

    private final EventStore eventStore;

    public PlaceOrderCommandHandler(EventStore eventStore) {
        this.eventStore = eventStore;
    }

    @Override
    public void handle(PlaceOrderCommand command) {
        Order order = new Order(command.getOrderId(), command.getCustomerId(), command.getOrderItems());
        eventStore.saveEvents(command.getOrderId(), order.getUncommittedChanges(), -1);
        order.markChangesAsCommitted();
    }
}

// 支付订单命令处理器
public class PayOrderCommandHandler implements CommandHandler<PayOrderCommand> {

    private final EventStore eventStore;

    public PayOrderCommandHandler(EventStore eventStore) {
        this.eventStore = eventStore;
    }

    @Override
    public void handle(PayOrderCommand command) {
        Order order = loadOrder(command.getOrderId());
        order.pay();
        eventStore.saveEvents(command.getOrderId(), order.getUncommittedChanges(), -1);
        order.markChangesAsCommitted();
    }

    private Order loadOrder(UUID orderId) {
        List<Event> events = eventStore.loadEvents(orderId);
        if (events == null || events.isEmpty()) {
            throw new IllegalArgumentException("Order not found: " + orderId);
        }
        return new Order(events);
    }
}

// 发货订单命令处理器
public class ShipOrderCommandHandler implements CommandHandler<ShipOrderCommand> {

    private final EventStore eventStore;

    public ShipOrderCommandHandler(EventStore eventStore) {
        this.eventStore = eventStore;
    }

    @Override
    public void handle(ShipOrderCommand command) {
        Order order = loadOrder(command.getOrderId());
        order.ship();
        eventStore.saveEvents(command.getOrderId(), order.getUncommittedChanges(), -1);
        order.markChangesAsCommitted();
    }

    private Order loadOrder(UUID orderId) {
        List<Event> events = eventStore.loadEvents(orderId);
        if (events == null || events.isEmpty()) {
            throw new IllegalArgumentException("Order not found: " + orderId);
        }
        return new Order(events);
    }
}

// 取消订单命令处理器
public class CancelOrderCommandHandler implements CommandHandler<CancelOrderCommand> {

    private final EventStore eventStore;

    public CancelOrderCommandHandler(EventStore eventStore) {
        this.eventStore = eventStore;
    }

    @Override
    public void handle(CancelOrderCommand command) {
        Order order = loadOrder(command.getOrderId());
        order.cancel();
        eventStore.saveEvents(command.getOrderId(), order.getUncommittedChanges(), -1);
        order.markChangesAsCommitted();
    }

    private Order loadOrder(UUID orderId) {
        List<Event> events = eventStore.loadEvents(orderId);
        if (events == null || events.isEmpty()) {
            throw new IllegalArgumentException("Order not found: " + orderId);
        }
        return new Order(events);
    }
}

4.5 事件存储(Event Store)

事件存储负责持久化和读取事件。

// 事件存储接口
public interface EventStore {
    void saveEvents(UUID aggregateId, List<Event> events, int expectedVersion);
    List<Event> loadEvents(UUID aggregateId);
}

// 基于内存的事件存储实现(仅用于演示)
public class InMemoryEventStore implements EventStore {
    private final Map<UUID, List<Event>> events = new ConcurrentHashMap<>();

    @Override
    public void saveEvents(UUID aggregateId, List<Event> newEvents, int expectedVersion) {
        List<Event> existingEvents = events.computeIfAbsent(aggregateId, k -> new ArrayList<>());

        // 乐观锁,防止并发修改
        if (expectedVersion != -1 && existingEvents.size() != expectedVersion) {
            throw new ConcurrencyException("Concurrency exception, expected version: " + expectedVersion + ", actual version: " + existingEvents.size());
        }

        existingEvents.addAll(newEvents);
    }

    @Override
    public List<Event> loadEvents(UUID aggregateId) {
        return events.getOrDefault(aggregateId, Collections.emptyList());
    }

    public static class ConcurrencyException extends RuntimeException {
        public ConcurrencyException(String message) {
            super(message);
        }
    }
}

4.6 查询模型(Read Model)

查询模型负责提供查询接口,并根据事件更新读模型。

// 订单 DTO,用于查询
public class OrderDTO {
    private UUID orderId;
    private UUID customerId;
    private List<OrderItem> orderItems;
    private String status;

    // Getters and Setters
    public OrderDTO() {}

    public OrderDTO(UUID orderId, UUID customerId, List<OrderItem> orderItems, String status) {
        this.orderId = orderId;
        this.customerId = customerId;
        this.orderItems = orderItems;
        this.status = status;
    }

    public UUID getOrderId() {
        return orderId;
    }

    public void setOrderId(UUID orderId) {
        this.orderId = orderId;
    }

    public UUID getCustomerId() {
        return customerId;
    }

    public void setCustomerId(UUID customerId) {
        this.customerId = customerId;
    }

    public List<OrderItem> getOrderItems() {
        return orderItems;
    }

    public void setOrderItems(List<OrderItem> orderItems) {
        this.orderItems = orderItems;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }
}

// 订单查询服务
public interface OrderQueryService {
    OrderDTO getOrder(UUID orderId);
    List<OrderDTO> getOrdersByCustomer(UUID customerId);
}

// 基于内存的订单查询服务实现(仅用于演示)
public class InMemoryOrderQueryService implements OrderQueryService {

    private final Map<UUID, OrderDTO> orders = new ConcurrentHashMap<>();

    public InMemoryOrderQueryService(EventStore eventStore) {
        // 初始化读模型,从事件存储中重建所有订单
        List<UUID> aggregateIds = new ArrayList<>(eventStore.loadEvents(null).stream().map(e -> {
            if(e instanceof OrderPlacedEvent){
                return ((OrderPlacedEvent) e).getOrderId();
            }
            if(e instanceof  OrderPaidEvent){
                return  ((OrderPaidEvent) e).getOrderId();
            }
            if(e instanceof OrderShippedEvent){
                return ((OrderShippedEvent) e).getOrderId();
            }
            if(e instanceof OrderCancelledEvent){
                return ((OrderCancelledEvent) e).getOrderId();
            }
            return null;
        }).collect(Collectors.toSet()));

        for (UUID aggregateId : aggregateIds) {
            List<Event> events = eventStore.loadEvents(aggregateId);
            if (!events.isEmpty()) {
                Order order = new Order(events);
                OrderDTO orderDTO = new OrderDTO(order.getOrderId(), order.getCustomerId(), order.getOrderItems(), order.getStatus().toString());
                orders.put(order.getOrderId(), orderDTO);
            }
        }
    }

    @Override
    public OrderDTO getOrder(UUID orderId) {
        return orders.get(orderId);
    }

    @Override
    public List<OrderDTO> getOrdersByCustomer(UUID customerId) {
        return orders.values().stream()
                .filter(order -> order.getCustomerId().equals(customerId))
                .collect(Collectors.toList());
    }

    // 更新读模型
    public void on(OrderPlacedEvent event) {
        OrderDTO orderDTO = new OrderDTO(event.getOrderId(), event.getCustomerId(), event.getOrderItems(), Order.OrderStatus.NEW.toString());
        orders.put(event.getOrderId(), orderDTO);
    }

    public void on(OrderPaidEvent event) {
        OrderDTO orderDTO = orders.get(event.getOrderId());
        if (orderDTO != null) {
            orderDTO.setStatus(Order.OrderStatus.PAID.toString());
        }
    }

    public void on(OrderShippedEvent event) {
        OrderDTO orderDTO = orders.get(event.getOrderId());
        if (orderDTO != null) {
            orderDTO.setStatus(Order.OrderStatus.SHIPPED.toString());
        }
    }

    public void on(OrderCancelledEvent event) {
        OrderDTO orderDTO = orders.get(event.getOrderId());
        if (orderDTO != null) {
            orderDTO.setStatus(Order.OrderStatus.CANCELLED.toString());
        }
    }
}

4.7 事件处理器(Event Handler)

事件处理器负责订阅领域事件,并更新查询模型。通常使用消息队列来实现异步事件处理。

// 事件处理器接口
public interface EventHandler<T extends Event> {
    void handle(T event);
}

// 订单事件处理器
public class OrderEventHandler {
    private final InMemoryOrderQueryService orderQueryService;

    public OrderEventHandler(InMemoryOrderQueryService orderQueryService) {
        this.orderQueryService = orderQueryService;
    }

    @EventListener
    public void handle(OrderPlacedEvent event) {
        orderQueryService.on(event);
    }

    @EventListener
    public void handle(OrderPaidEvent event) {
        orderQueryService.on(event);
    }

    @EventListener
    public void handle(OrderShippedEvent event) {
        orderQueryService.on(event);
    }

    @EventListener
    public void handle(OrderCancelledEvent event) {
        orderQueryService.on(event);
    }
}

5. 代码示例

public class Main {

    public static void main(String[] args) {
        // 初始化基础设施
        InMemoryEventStore eventStore = new InMemoryEventStore();
        InMemoryOrderQueryService orderQueryService = new InMemoryOrderQueryService(eventStore);
        OrderEventHandler orderEventHandler = new OrderEventHandler(orderQueryService);

        // 初始化命令处理器
        PlaceOrderCommandHandler placeOrderCommandHandler = new PlaceOrderCommandHandler(eventStore);
        PayOrderCommandHandler payOrderCommandHandler = new PayOrderCommandHandler(eventStore);
        ShipOrderCommandHandler shipOrderCommandHandler = new ShipOrderCommandHandler(eventStore);
        CancelOrderCommandHandler cancelOrderCommandHandler = new CancelOrderCommandHandler(eventStore);

        // 创建订单
        UUID orderId = UUID.randomUUID();
        UUID customerId = UUID.randomUUID();
        List<OrderItem> orderItems = Arrays.asList(
                new OrderItem(UUID.randomUUID(), 2, new BigDecimal("10.00")),
                new OrderItem(UUID.randomUUID(), 1, new BigDecimal("20.00"))
        );

        PlaceOrderCommand placeOrderCommand = new PlaceOrderCommand(orderId, customerId, orderItems);
        placeOrderCommandHandler.handle(placeOrderCommand);
        orderEventHandler.handle(new OrderPlacedEvent(orderId, customerId, orderItems));

        // 支付订单
        PayOrderCommand payOrderCommand = new PayOrderCommand(orderId);
        payOrderCommandHandler.handle(payOrderCommand);
        orderEventHandler.handle(new OrderPaidEvent(orderId));

        // 查询订单
        OrderDTO orderDTO = orderQueryService.getOrder(orderId);
        System.out.println("Order ID: " + orderDTO.getOrderId());
        System.out.println("Customer ID: " + orderDTO.getCustomerId());
        System.out.println("Status: " + orderDTO.getStatus());
    }
}

6. 总结

通过以上示例,我们了解了如何将 DDD、事件溯源和 CQRS 结合起来构建一个简单的订单系统。DDD 帮助我们理解和建模领域,事件溯源提供了完整的数据历史和审计能力,CQRS 优化了读写性能。虽然这种架构模式比较复杂,但它能解决传统应用开发中遇到的许多问题,并为构建可扩展、可维护的系统提供了强大的支持。

7. 一些使用上的建议

  • 从简单开始: 不要一开始就尝试应用所有的模式,可以先从一个简单的限界上下文开始,逐步引入事件溯源和 CQRS。
  • 关注领域: 始终关注领域模型,与领域专家保持密切沟通,确保模型能够准确反映业务需求。
  • 选择合适的技术: 选择适合项目需求的技术栈,例如事件存储可以使用关系数据库、NoSQL 数据库或专门的事件存储系统。
  • 处理最终一致性: 读模型和写模型之间可能存在数据延迟,需要采取合适的策略处理最终一致性问题,例如使用重试机制、Saga 模式等。
  • 监控和告警: 建立完善的监控和告警机制,及时发现和解决问题。

8. 这种模式的价值是什么

结合DDD,Event Sourcing 和 CQRS 可以建立一个高度可维护、可扩展和可审计的系统,更好地适应业务变化,并提供更强大的数据分析能力。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注