Spring Boot整合MyBatis缓存穿透导致DB压力暴涨的解决方法

Spring Boot整合MyBatis缓存穿透应对策略

大家好,今天我们来聊一聊Spring Boot整合MyBatis时,如何有效应对缓存穿透问题,避免数据库压力暴涨。缓存穿透是缓存失效时,查询请求直击数据库,如果大量请求涌入,DB很可能崩溃。我们将深入探讨缓存穿透的成因,并提供多种解决方案,包含详细的代码示例和逻辑分析,帮助大家在实际项目中构建健壮的缓存机制。

缓存穿透的原理与危害

缓存穿透是指客户端请求的数据在缓存和数据库中都不存在,导致每次请求都直接访问数据库。如果恶意攻击或者大量访问不存在的数据,数据库会承受巨大的压力,甚至崩溃。

为什么会发生缓存穿透?

  • 缓存未命中: 请求的数据对应的key在缓存中不存在。
  • 数据库不存在: 即使缓存中没有,数据库中也不存在对应的数据。

缓存穿透的危害:

  • 数据库压力剧增: 所有请求都直接访问数据库,导致数据库负载过高。
  • 系统性能下降: 数据库成为瓶颈,导致整个系统响应速度变慢。
  • 服务不可用: 在高并发情况下,数据库可能崩溃,导致服务不可用。

解决方案一:缓存空对象

最简单的解决方案是,当数据库查询结果为空时,我们仍然将一个特殊的值(例如null值或者特定的空对象)放入缓存。下次再遇到相同的请求时,缓存直接返回这个特殊值,避免访问数据库。

优点:

  • 实现简单,易于理解。
  • 可以有效防止缓存穿透。

缺点:

  • 缓存中存储了无效数据,占用一定的缓存空间。
  • 需要设置较短的过期时间,否则可能会导致数据不一致。

代码示例:

@Service
public class UserServiceImpl implements UserService {

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String USER_CACHE_PREFIX = "user:";
    private static final String NULL_VALUE = "NULL"; // 空值标识

    @Override
    public User getUserById(Long id) {
        String key = USER_CACHE_PREFIX + id;
        User user = (User) redisTemplate.opsForValue().get(key);

        if (user != null && !NULL_VALUE.equals(user.getUsername())) { // 检查是否是空值
            System.out.println("从缓存中获取用户");
            return user;
        }

        user = userRepository.findById(id).orElse(null);

        if (user == null) {
            System.out.println("数据库中不存在用户,缓存空对象");
            // 缓存空对象,设置较短的过期时间,比如1分钟
            User nullUser = new User();
            nullUser.setUsername(NULL_VALUE); // 用一个特殊标识代表空对象
            redisTemplate.opsForValue().set(key, nullUser, 1, TimeUnit.MINUTES);
            return null; // 返回null,表示没有找到用户
        }

        System.out.println("从数据库中获取用户");
        redisTemplate.opsForValue().set(key, user, 1, TimeUnit.HOURS); // 设置缓存过期时间
        return user;
    }
}

代码解释:

  1. NULL_VALUE 定义一个常量作为空值的标识符,用于区分真实的用户对象和空对象。
  2. 缓存命中判断: 在从Redis获取到数据后,需要判断取出的对象是否是空对象,如果是空对象,则不返回。
  3. 缓存空对象: 如果数据库查询结果为空,创建一个特殊的空对象,并将其放入缓存,设置较短的过期时间。
  4. 过期时间: 空对象的过期时间要短,例如1分钟,避免长时间的缓存无效数据。正常数据的过期时间可以长一些,例如1小时。

关键点:

  • 选择一个合适的空值标识,例如NULL_VALUE
  • 设置空对象的较短过期时间。
  • 在从缓存获取数据时,需要判断是否是空对象。

解决方案二:布隆过滤器

布隆过滤器是一种概率型数据结构,用于判断一个元素是否存在于一个集合中。它可以高效地告诉你“可能存在”或“绝对不存在”。 如果布隆过滤器判断某个key不存在,那么这个key肯定不存在于数据库中,就可以直接返回,避免查询数据库。

优点:

  • 可以非常有效地过滤掉不存在的key,减少数据库的访问。
  • 占用空间较小。

缺点:

  • 存在一定的误判率,即可能会把存在的数据判断为不存在。
  • 删除元素比较困难。

代码示例:

首先,引入 Google Guava 库,它提供了 BloomFilter 的实现:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>31.1-jre</version>
</dependency>

然后,实现布隆过滤器:

@Service
public class UserServiceBloomFilterImpl implements UserService {

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String USER_CACHE_PREFIX = "user:";

    private BloomFilter<Long> bloomFilter;

    @PostConstruct // 在Bean初始化后执行
    public void init() {
        // 从数据库加载所有用户ID,初始化布隆过滤器
        List<Long> userIds = userRepository.findAll().stream().map(User::getId).collect(Collectors.toList());

        // 创建布隆过滤器,设置预期插入数量和误判率
        bloomFilter = BloomFilter.create(Funnels.longFunnel(), userIds.size(), 0.01); // 0.01 是误判率

        // 将用户ID添加到布隆过滤器
        userIds.forEach(bloomFilter::put);
    }

    @Override
    public User getUserById(Long id) {
        String key = USER_CACHE_PREFIX + id;
        User user = (User) redisTemplate.opsForValue().get(key);

        if (user != null) {
            System.out.println("从缓存中获取用户");
            return user;
        }

        // 先判断ID是否存在于布隆过滤器中
        if (!bloomFilter.mightContain(id)) {
            System.out.println("ID 不存在于布隆过滤器中,直接返回");
            return null; // 直接返回,避免查询数据库
        }

        user = userRepository.findById(id).orElse(null);

        if (user != null) {
            System.out.println("从数据库中获取用户");
            redisTemplate.opsForValue().set(key, user, 1, TimeUnit.HOURS); // 设置缓存过期时间
        }

        return user;
    }
}

代码解释:

  1. BloomFilter<Long> bloomFilter 定义布隆过滤器对象。
  2. @PostConstruct 使用@PostConstruct注解,在Bean初始化后执行init()方法。
  3. init()方法:
    • 从数据库加载所有用户ID。
    • 创建布隆过滤器,设置预期插入数量和误判率。
    • 将用户ID添加到布隆过滤器。
  4. bloomFilter.mightContain(id) 在查询数据库之前,先判断ID是否存在于布隆过滤器中,如果不存在,则直接返回,避免查询数据库。

关键点:

  • 在应用启动时,从数据库加载所有数据到布隆过滤器中。
  • 选择合适的误判率,需要根据实际情况进行调整。
  • 需要定期更新布隆过滤器中的数据,以保证数据的一致性。

布隆过滤器参数选择:

布隆过滤器的关键在于选择合适的expectedInsertions(预期插入数量)和fpp(误判率)。 这些参数会影响布隆过滤器的大小和性能。

参数 含义 影响
expectedInsertions 预计要插入到布隆过滤器中的元素数量。 如果实际插入的数量远大于预期,误判率会急剧上升。
fpp 期望的误判率(False Positive Probability)。 例如,0.01 表示 1% 的误判率。 误判率越低,布隆过滤器需要的空间越大。 在性能和准确性之间需要权衡。

计算布隆过滤器大小:

布隆过滤器的大小(位数m)和哈希函数的个数k,可以通过以下公式计算:

  • m = -n * ln(p) / (ln(2))^2
  • k = m / n * ln(2)

其中:

  • n 是预期插入的元素数量 (expectedInsertions)
  • p 是期望的误判率 (fpp)

例如,如果 n = 1000000 (1百万) 且 p = 0.01 (1%),则:

  • m ≈ -1000000 * ln(0.01) / (ln(2))^2 ≈ 9585058 位 (大约 1.2MB)
  • k ≈ 9585058 / 1000000 * ln(2) ≈ 6.64 (通常取整数,例如 7)

因此,你需要大约 1.2MB 的空间,并使用 7 个哈希函数。

如何选择合适的参数:

  1. 确定 expectedInsertions 根据你的应用场景,预估可能插入到布隆过滤器中的最大元素数量。 宁可高估,也不要低估,否则误判率会超出预期。
  2. 确定 fpp 根据你的应用场景,确定可接受的误判率。 如果对准确性要求很高,选择较低的误判率,但会增加内存消耗。 如果对内存消耗更敏感,可以适当提高误判率。
  3. 使用在线计算器或公式: 可以使用在线布隆过滤器计算器(例如:https://hur.st/bloomfilter/)或上述公式,根据 expectedInsertionsfpp 计算出合适的布隆过滤器大小和哈希函数个数。

动态更新布隆过滤器:

布隆过滤器不支持直接删除元素。 如果需要删除元素,通常的做法是重建布隆过滤器。可以定期重建布隆过滤器,或者当删除操作达到一定数量时重建。

解决方案三:互斥锁

当缓存未命中时,使用互斥锁(Mutex)来防止大量请求同时访问数据库。 只有一个请求可以获得锁,访问数据库,并将结果写入缓存。 其他请求需要等待锁释放后,再从缓存中获取数据。

优点:

  • 可以有效防止缓存穿透。
  • 实现简单,易于理解。

缺点:

  • 在高并发情况下,可能会导致部分请求阻塞,影响性能。
  • 如果获取锁的线程发生异常,可能导致死锁。

代码示例:

@Service
public class UserServiceMutexImpl implements UserService {

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String USER_CACHE_PREFIX = "user:";

    private final Object lock = new Object(); //互斥锁

    @Override
    public User getUserById(Long id) {
        String key = USER_CACHE_PREFIX + id;
        User user = (User) redisTemplate.opsForValue().get(key);

        if (user != null) {
            System.out.println("从缓存中获取用户");
            return user;
        }

        // 使用互斥锁,防止缓存穿透
        synchronized (lock) {
            // 再次检查缓存,防止重复查询数据库 (double-check locking)
            user = (User) redisTemplate.opsForValue().get(key);
            if (user != null) {
                System.out.println("从缓存中获取用户 (double-check)");
                return user;
            }

            user = userRepository.findById(id).orElse(null);

            if (user != null) {
                System.out.println("从数据库中获取用户");
                redisTemplate.opsForValue().set(key, user, 1, TimeUnit.HOURS); // 设置缓存过期时间
            }

            return user;
        }
    }
}

代码解释:

  1. private final Object lock = new Object(); 定义一个互斥锁对象。
  2. synchronized (lock) 使用synchronized关键字,对代码块进行加锁。
  3. Double-Check Locking: 在获取锁之后,再次检查缓存是否存在,防止重复查询数据库。

关键点:

  • 使用synchronized或者其他锁机制,确保只有一个线程可以访问数据库。
  • 使用Double-Check Locking,减少锁的竞争。

使用 Redis 分布式锁:

如果你的应用是分布式部署的,那么 Java 的 synchronized 锁只能保证单机环境下的互斥性。 你需要使用 Redis 分布式锁来实现跨机器的互斥。

@Service
public class UserServiceRedisLockImpl implements UserService {

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private StringRedisTemplate stringRedisTemplate; // 注意这里使用 StringRedisTemplate

    private static final String USER_CACHE_PREFIX = "user:";
    private static final String LOCK_PREFIX = "lock:user:";

    @Override
    public User getUserById(Long id) {
        String key = USER_CACHE_PREFIX + id;
        User user = (User) stringRedisTemplate.opsForValue().get(key); // 缓存中存储的是 String,需要转换

        if (user != null) {
            System.out.println("从缓存中获取用户");
            return convertJsonToUser(user); // 从缓存中获取的是 JSON String,需要转换成 User 对象
        }

        String lockKey = LOCK_PREFIX + id;
        String lockValue = UUID.randomUUID().toString(); // 唯一值,防止误删

        try {
            // 尝试获取锁,设置过期时间,防止死锁
            Boolean lockAcquired = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 10, TimeUnit.SECONDS); // 10 秒过期时间

            if (lockAcquired != null && lockAcquired) {
                System.out.println("成功获取锁");

                // 再次检查缓存,防止重复查询数据库
                user = (User) stringRedisTemplate.opsForValue().get(key);
                if (user != null) {
                    System.out.println("从缓存中获取用户 (double-check)");
                    return convertJsonToUser(user);
                }

                User dbUser = userRepository.findById(id).orElse(null);

                if (dbUser != null) {
                    System.out.println("从数据库中获取用户");
                    stringRedisTemplate.opsForValue().set(key, convertUserToJson(dbUser), 1, TimeUnit.HOURS); // 设置缓存过期时间,存储 JSON String
                    return dbUser;
                }
            } else {
                System.out.println("获取锁失败,稍后重试");
                // 获取锁失败,可以稍后重试,或者抛出异常
                try {
                    Thread.sleep(50); // 短暂休眠后重试
                    return getUserById(id); // 递归调用,重试
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return null; // 返回 null 或抛出异常
                }
            }

        } finally {
            // 释放锁,必须确保锁是由当前线程持有的
            if (lockValue.equals(stringRedisTemplate.opsForValue().get(lockKey))) {
                stringRedisTemplate.delete(lockKey);
                System.out.println("释放锁");
            }
        }

        return null;
    }

    // User 对象转换为 JSON String
    private String convertUserToJson(User user) {
        // 使用 Jackson, Gson 等 JSON 库
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            return objectMapper.writeValueAsString(user);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            return null; // 或者抛出异常
        }
    }

    // JSON String 转换为 User 对象
    private User convertJsonToUser(String json) {
         ObjectMapper objectMapper = new ObjectMapper();
        try {
            return objectMapper.readValue(json, User.class);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            return null; // 或者抛出异常
        }
    }
}

代码解释:

  1. StringRedisTemplate 使用 StringRedisTemplate,因为 Redis 存储的是字符串。
  2. LOCK_PREFIX 定义锁的 Key 前缀。
  3. lockValue 使用 UUID 作为锁的值,确保唯一性,防止误删。
  4. setIfAbsent 使用 setIfAbsent 方法尝试获取锁,并设置过期时间,防止死锁。
  5. 释放锁: 必须在 finally 块中释放锁,并且要验证锁是否是由当前线程持有的,才能删除锁。
  6. JSON 转换: 因为 Redis 存储的是字符串,需要将 User 对象转换为 JSON 字符串,再存储到 Redis 中。 从 Redis 中读取时,也需要将 JSON 字符串转换回 User 对象。

关键点:

  • 使用 setIfAbsent 方法原子性地获取锁。
  • 设置锁的过期时间,防止死锁。
  • 使用 UUID 作为锁的值,防止误删。
  • finally 块中释放锁,并且要验证锁是否是由当前线程持有的。
  • 使用 JSON 转换器来序列化和反序列化 User 对象。
  • 获取锁失败后,可以进行重试。

解决方案四: 异步更新缓存

当缓存未命中时,先返回一个默认值或者空值,然后异步地从数据库加载数据,更新缓存。 这样可以避免阻塞用户请求,提高系统的响应速度。

优点:

  • 可以提高系统的响应速度。
  • 可以避免阻塞用户请求。

缺点:

  • 实现相对复杂。
  • 存在数据不一致的风险。

代码示例:

@Service
public class UserServiceAsyncImpl implements UserService {

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private ApplicationEventPublisher eventPublisher; // 事件发布器

    private static final String USER_CACHE_PREFIX = "user:";

    @Override
    public User getUserById(Long id) {
        String key = USER_CACHE_PREFIX + id;
        User user = (User) redisTemplate.opsForValue().get(key);

        if (user != null) {
            System.out.println("从缓存中获取用户");
            return user;
        }

        // 缓存未命中,先返回 null 或者默认值
        System.out.println("缓存未命中,返回 null,并异步更新缓存");

        // 发布一个事件,异步更新缓存
        eventPublisher.publishEvent(new CacheRefreshEvent(id));

        return null; // 返回 null 或者默认值
    }

    // 异步更新缓存的监听器
    @EventListener
    @Async
    public void onCacheRefreshEvent(CacheRefreshEvent event) {
        Long id = event.getId();
        String key = USER_CACHE_PREFIX + id;

        System.out.println("异步更新缓存,ID: " + id);

        User user = userRepository.findById(id).orElse(null);

        if (user != null) {
            System.out.println("从数据库中获取用户");
            redisTemplate.opsForValue().set(key, user, 1, TimeUnit.HOURS); // 设置缓存过期时间
        }
    }

    // 定义一个事件
    public static class CacheRefreshEvent {
        private final Long id;

        public CacheRefreshEvent(Long id) {
            this.id = id;
        }

        public Long getId() {
            return id;
        }
    }
}

代码解释:

  1. ApplicationEventPublisher eventPublisher 使用 ApplicationEventPublisher 发布事件。
  2. CacheRefreshEvent 定义一个事件类,包含需要刷新的 ID。
  3. eventPublisher.publishEvent(new CacheRefreshEvent(id)) 发布事件,触发异步更新缓存的操作。
  4. @EventListener 使用 @EventListener 注解,监听 CacheRefreshEvent 事件。
  5. @Async 使用 @Async 注解,将事件处理方法异步执行。

关键点:

  • 使用 Spring 的事件机制,实现异步更新缓存。
  • 需要配置 @EnableAsync,开启异步支持。
  • 需要处理异步任务可能出现的异常。
  • 返回默认值或空值,避免阻塞用户请求。

配置 @EnableAsync:

在 Spring Boot 的启动类或者配置类中,需要添加 @EnableAsync 注解,开启异步支持。

@SpringBootApplication
@EnableAsync // 开启异步支持
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

各种方案的对比

方案 优点 缺点 适用场景
缓存空对象 实现简单,有效防止穿透 占用缓存空间,需要设置较短的过期时间 适用于数据不存在的情况较少,且对缓存空间不敏感的场景。
布隆过滤器 高效过滤不存在的key,占用空间小 存在误判率,删除元素困难 适用于数据量大,允许一定误判率,且数据变动不频繁的场景。 例如,黑名单过滤。
互斥锁 实现简单,有效防止穿透 高并发下可能导致请求阻塞,存在死锁风险 适用于并发量不高,对数据一致性要求高的场景。
异步更新缓存 提高系统响应速度,避免阻塞用户请求 实现复杂,存在数据不一致的风险 适用于对响应速度要求高,可以容忍短暂数据不一致的场景。 例如,允许用户先看到旧数据,然后异步更新。

总结与抉择

面对缓存穿透,我们需要根据实际情况选择合适的解决方案。 没有一种方案是万能的,需要权衡各种因素,选择最适合自己的方案。 例如,如果数据量不大,可以选择缓存空对象;如果数据量很大,可以选择布隆过滤器;如果对数据一致性要求高,可以选择互斥锁;如果对响应速度要求高,可以选择异步更新缓存。

总结:

  • 了解缓存穿透的原理和危害。
  • 掌握多种缓存穿透的解决方案。
  • 根据实际情况选择合适的解决方案。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注