Spring动态数据源事务传播行为混乱?AbstractRoutingDataSource与TransactionSynchronization

Spring动态数据源事务传播行为混乱分析与解决

大家好,今天我们来深入探讨一个在Spring动态数据源场景下容易遇到的问题:事务传播行为混乱。这个问题往往发生在复杂的业务系统中,由于多种数据源的参与,以及对Spring事务传播机制理解不透彻,导致数据一致性出现问题。我们将结合AbstractRoutingDataSourceTransactionSynchronization这两个关键组件,剖析问题产生的原因,并提供相应的解决方案。

1. 动态数据源简介与AbstractRoutingDataSource

在实际应用中,我们经常会遇到需要连接多个数据库的情况。例如,根据用户ID路由到不同的数据库,或者读写分离架构中,读操作路由到只读数据库,写操作路由到主数据库。Spring提供了AbstractRoutingDataSource来简化这种动态数据源的配置和管理。

AbstractRoutingDataSource是一个抽象类,它继承自AbstractDataSource,实现了DataSource接口。它的核心思想是:在每次获取数据库连接时,动态地决定使用哪个实际的数据源。

其核心方法是determineTargetDataSource(),这是一个抽象方法,需要我们自己实现,根据某种规则来决定使用哪个数据源。

以下是一个简单的AbstractRoutingDataSource实现示例:

public class DynamicDataSource extends AbstractRoutingDataSource {

    private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();

    public static void setDataSource(String dataSource) {
        contextHolder.set(dataSource);
    }

    public static String getDataSource() {
        return contextHolder.get();
    }

    public static void clearDataSource() {
        contextHolder.remove();
    }

    @Override
    protected Object determineCurrentLookupKey() {
        return getDataSource();
    }
}

在这个例子中,我们使用ThreadLocal来保存当前线程需要使用的数据源的key。determineCurrentLookupKey()方法会从ThreadLocal中获取这个key,Spring会根据这个key找到对应的数据源。

2. 事务传播行为与TransactionSynchronization

Spring的事务管理机制非常强大,它提供了多种事务传播行为,例如:

  • REQUIRED: 如果当前存在事务,则加入该事务;如果当前没有事务,则创建一个新的事务。
  • REQUIRES_NEW: 无论当前是否存在事务,都创建一个新的事务。如果当前存在事务,则将当前事务挂起。
  • SUPPORTS: 如果当前存在事务,则加入该事务;如果当前没有事务,则以非事务方式执行。
  • NOT_SUPPORTED: 以非事务方式执行操作,如果当前存在事务,则将当前事务挂起。
  • MANDATORY: 如果当前存在事务,则加入该事务;如果当前没有事务,则抛出异常。
  • NEVER: 以非事务方式执行,如果当前存在事务,则抛出异常。
  • NESTED: 如果当前存在事务,则创建一个嵌套事务。嵌套事务可以独立于外部事务进行提交或回滚。

这些传播行为定义了当一个带有事务的方法被另一个带有事务的方法调用时,事务应该如何传播和管理。

TransactionSynchronization是Spring事务管理中的一个接口,它允许我们在事务的不同阶段(例如,事务开始前、事务提交后、事务回滚后)执行一些自定义的操作。我们可以通过实现TransactionSynchronization接口,并在事务管理器中注册我们的实现类,来监听事务的状态变化。

3. 事务传播行为混乱的场景分析

现在,我们来看一个可能导致事务传播行为混乱的场景。假设我们有一个业务场景:用户注册。注册过程涉及到两个数据库:

  • 用户数据库 (user_db): 用于存储用户基本信息,例如用户名、密码等。
  • 积分数据库 (point_db): 用于存储用户的积分信息。

我们希望在用户注册成功后,自动给用户赠送一定的积分。

代码如下:

@Service
public class UserService {

    @Autowired
    private UserDao userDao;

    @Autowired
    private PointService pointService;

    @Transactional(propagation = Propagation.REQUIRED)
    public void registerUser(String username, String password) {
        // 1. 插入用户基本信息到 user_db
        DynamicDataSource.setDataSource("user_db");
        userDao.insertUser(username, password);

        // 2. 给用户赠送积分到 point_db
        pointService.addPoints(username, 100);
    }
}

@Service
public class PointService {

    @Autowired
    private PointDao pointDao;

    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void addPoints(String username, int points) {
        DynamicDataSource.setDataSource("point_db");
        pointDao.addPoints(username, points);
    }
}

@Repository
public interface UserDao {
    void insertUser(String username, String password);
}

@Repository
public interface PointDao {
    void addPoints(String username, int points);
}

在这个例子中,UserService.registerUser()方法使用REQUIRED传播行为,PointService.addPoints()方法使用REQUIRES_NEW传播行为。我们希望addPoints()方法在一个新的事务中执行,这样即使用户基本信息插入失败,积分也应该能够成功赠送。

然而,在这个代码中,由于动态数据源的切换,REQUIRES_NEW传播行为可能会失效。原因在于,Spring的事务管理器在创建新的事务时,会从当前的DataSource中获取连接。但是,在PointService.addPoints()方法中,我们已经通过DynamicDataSource.setDataSource("point_db")切换了数据源。这意味着,Spring的事务管理器仍然会从user_db中获取连接,而不是从point_db中获取连接。

因此,addPoints()方法实际上并没有在一个新的事务中执行,而是加入了registerUser()方法的事务。如果registerUser()方法回滚,addPoints()方法也会被回滚,这与我们的预期不符。

更具体的原因

更具体的原因在于,Spring的事务管理器在创建事务时,会和当前线程绑定一个TransactionSynchronizationManager,这个Manager会维护当前事务的一些信息,包括当前事务使用的DataSource

当我们调用DynamicDataSource.setDataSource("point_db")时,仅仅改变了ThreadLocal中保存的数据源key,并没有改变TransactionSynchronizationManager中保存的DataSource。因此,Spring的事务管理器仍然会使用之前的DataSource来创建新的事务。

4. 解决方案:使用TransactionTemplate

为了解决这个问题,我们可以使用TransactionTemplate来显式地管理事务。TransactionTemplate允许我们在代码中手动控制事务的开始、提交和回滚。

修改后的代码如下:

@Service
public class UserService {

    @Autowired
    private UserDao userDao;

    @Autowired
    private PointService pointService;

    @Transactional(propagation = Propagation.REQUIRED)
    public void registerUser(String username, String password) {
        // 1. 插入用户基本信息到 user_db
        DynamicDataSource.setDataSource("user_db");
        userDao.insertUser(username, password);

        // 2. 给用户赠送积分到 point_db
        pointService.addPoints(username, 100);
    }
}

@Service
public class PointService {

    @Autowired
    private PointDao pointDao;

    @Autowired
    private PlatformTransactionManager transactionManager;

    public void addPoints(String username, int points) {
        TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager);
        transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);

        transactionTemplate.execute(status -> {
            DynamicDataSource.setDataSource("point_db");
            pointDao.addPoints(username, points);
            return null;
        });
    }
}

@Repository
public interface UserDao {
    void insertUser(String username, String password);
}

@Repository
public interface PointDao {
    void addPoints(String username, int points);
}

在这个修改后的代码中,我们不再使用@Transactional注解来声明PointService.addPoints()方法的事务。而是使用TransactionTemplate来手动创建一个新的事务。

transactionTemplate.execute()方法中,我们首先切换数据源到point_db,然后执行插入积分的操作。由于我们显式地创建了一个新的事务,Spring的事务管理器会从point_db中获取连接,并创建一个新的事务。

这样,即使registerUser()方法回滚,addPoints()方法也会成功提交,符合我们的预期。

5. 另一种解决方案:手动管理TransactionSynchronization

另一种更复杂但更灵活的解决方案是手动管理TransactionSynchronization。这种方法允许我们在数据源切换前后注册和注销TransactionSynchronization,以确保事务管理器能够正确地管理事务。

代码如下:

@Service
public class UserService {

    @Autowired
    private UserDao userDao;

    @Autowired
    private PointService pointService;

    @Transactional(propagation = Propagation.REQUIRED)
    public void registerUser(String username, String password) {
        // 1. 插入用户基本信息到 user_db
        DynamicDataSource.setDataSource("user_db");
        userDao.insertUser(username, password);

        // 2. 给用户赠送积分到 point_db
        pointService.addPoints(username, 100);
    }
}

@Service
public class PointService {

    @Autowired
    private PointDao pointDao;

    @Autowired
    private PlatformTransactionManager transactionManager;

    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void addPoints(String username, int points) {
        // 1. 获取当前事务同步管理器
        TransactionSynchronizationManager synchronizationManager = TransactionSynchronizationManager.isSynchronizationActive() ? TransactionSynchronizationManager.getCurrent() : null;

        // 2. 切换数据源前,保存当前事务同步状态
        boolean existingSynchronization = TransactionSynchronizationManager.isSynchronizationActive();
        List<TransactionSynchronization> synchronizations = null;
        if (existingSynchronization) {
            synchronizations = new ArrayList<>(TransactionSynchronizationManager.getSynchronizations());
            TransactionSynchronizationManager.clear(); // 清空当前线程的同步器,避免影响新数据源的事务同步
        }

        // 3. 切换数据源
        DynamicDataSource.setDataSource("point_db");

        try {
            // 4. 执行业务逻辑
            pointDao.addPoints(username, points);
        } finally {
            // 5. 恢复事务同步状态
            if (existingSynchronization && synchronizations != null) {
                TransactionSynchronizationManager.initSynchronization();  // 初始化新的同步器
                for (TransactionSynchronization synchronization : synchronizations) {
                    TransactionSynchronizationManager.registerSynchronization(synchronization);  // 重新注册之前的同步器
                }
            }
            DynamicDataSource.clearDataSource();
        }
    }
}

@Repository
public interface UserDao {
    void insertUser(String username, String password);
}

@Repository
public interface PointDao {
    void addPoints(username, int points);
}

这个方案的核心思想是:

  1. 保存当前事务同步状态: 在切换数据源之前,我们需要保存当前线程的TransactionSynchronization,并清空TransactionSynchronizationManager
  2. 切换数据源: 切换到新的数据源。
  3. 执行业务逻辑: 在新的数据源上执行业务逻辑。
  4. 恢复事务同步状态: 在业务逻辑执行完毕后,我们需要恢复之前的TransactionSynchronization,并将之前保存的TransactionSynchronization重新注册到TransactionSynchronizationManager中。
  5. 清理数据源 清理当前线程绑定的数据源。

这种方法更加复杂,但它可以更好地控制事务的传播和管理。它也更加灵活,可以应对更复杂的动态数据源场景。

6. 总结与最佳实践

问题 解决方案 优点 缺点
动态数据源切换导致事务传播行为混乱 使用 TransactionTemplate 手动管理事务 简单易用,代码清晰 需要手动编写事务管理代码,增加代码量
动态数据源切换导致事务传播行为混乱 手动管理 TransactionSynchronization 更加灵活,可以应对更复杂的动态数据源场景 代码复杂,容易出错,需要对Spring事务管理机制有深入的理解
动态数据源切换导致事务传播行为混乱 尽量避免跨数据源的事务,如果必须跨数据源事务,可以考虑使用分布式事务解决方案(例如Seata、Atomikos等) 避免了复杂的事务管理,降低了系统复杂度 分布式事务的性能开销较大,会影响系统的性能

在实际应用中,我们应该尽量避免跨数据源的事务。如果必须跨数据源事务,可以考虑使用分布式事务解决方案。

总而言之,在Spring动态数据源场景下,我们需要特别注意事务的传播行为。我们需要深入理解Spring的事务管理机制,并选择合适的解决方案来确保数据的一致性。

避免混乱,掌握关键点

理解AbstractRoutingDataSource的原理,以及TransactionSynchronization在事务管理中的作用,是解决问题的关键。TransactionTemplate和手动管理TransactionSynchronization是两种可行的解决方案,但需要根据具体场景选择。

数据源切换时的事务管理至关重要

在动态数据源场景下,数据源切换的时机和方式对事务的传播行为有着重要的影响。我们需要 carefully 地设计数据源切换的逻辑,并确保事务管理器能够正确地管理事务。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注