Spring Boot整合MyBatis缓存穿透应对策略
大家好,今天我们来聊一聊Spring Boot整合MyBatis时,如何有效应对缓存穿透问题,避免数据库压力暴涨。缓存穿透是缓存失效时,查询请求直击数据库,如果大量请求涌入,DB很可能崩溃。我们将深入探讨缓存穿透的成因,并提供多种解决方案,包含详细的代码示例和逻辑分析,帮助大家在实际项目中构建健壮的缓存机制。
缓存穿透的原理与危害
缓存穿透是指客户端请求的数据在缓存和数据库中都不存在,导致每次请求都直接访问数据库。如果恶意攻击或者大量访问不存在的数据,数据库会承受巨大的压力,甚至崩溃。
为什么会发生缓存穿透?
- 缓存未命中: 请求的数据对应的key在缓存中不存在。
- 数据库不存在: 即使缓存中没有,数据库中也不存在对应的数据。
缓存穿透的危害:
- 数据库压力剧增: 所有请求都直接访问数据库,导致数据库负载过高。
- 系统性能下降: 数据库成为瓶颈,导致整个系统响应速度变慢。
- 服务不可用: 在高并发情况下,数据库可能崩溃,导致服务不可用。
解决方案一:缓存空对象
最简单的解决方案是,当数据库查询结果为空时,我们仍然将一个特殊的值(例如null值或者特定的空对象)放入缓存。下次再遇到相同的请求时,缓存直接返回这个特殊值,避免访问数据库。
优点:
- 实现简单,易于理解。
- 可以有效防止缓存穿透。
缺点:
- 缓存中存储了无效数据,占用一定的缓存空间。
- 需要设置较短的过期时间,否则可能会导致数据不一致。
代码示例:
@Service
public class UserServiceImpl implements UserService {
@Autowired
private UserRepository userRepository;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String USER_CACHE_PREFIX = "user:";
private static final String NULL_VALUE = "NULL"; // 空值标识
@Override
public User getUserById(Long id) {
String key = USER_CACHE_PREFIX + id;
User user = (User) redisTemplate.opsForValue().get(key);
if (user != null && !NULL_VALUE.equals(user.getUsername())) { // 检查是否是空值
System.out.println("从缓存中获取用户");
return user;
}
user = userRepository.findById(id).orElse(null);
if (user == null) {
System.out.println("数据库中不存在用户,缓存空对象");
// 缓存空对象,设置较短的过期时间,比如1分钟
User nullUser = new User();
nullUser.setUsername(NULL_VALUE); // 用一个特殊标识代表空对象
redisTemplate.opsForValue().set(key, nullUser, 1, TimeUnit.MINUTES);
return null; // 返回null,表示没有找到用户
}
System.out.println("从数据库中获取用户");
redisTemplate.opsForValue().set(key, user, 1, TimeUnit.HOURS); // 设置缓存过期时间
return user;
}
}
代码解释:
NULL_VALUE: 定义一个常量作为空值的标识符,用于区分真实的用户对象和空对象。- 缓存命中判断: 在从Redis获取到数据后,需要判断取出的对象是否是空对象,如果是空对象,则不返回。
- 缓存空对象: 如果数据库查询结果为空,创建一个特殊的空对象,并将其放入缓存,设置较短的过期时间。
- 过期时间: 空对象的过期时间要短,例如1分钟,避免长时间的缓存无效数据。正常数据的过期时间可以长一些,例如1小时。
关键点:
- 选择一个合适的空值标识,例如
NULL_VALUE。 - 设置空对象的较短过期时间。
- 在从缓存获取数据时,需要判断是否是空对象。
解决方案二:布隆过滤器
布隆过滤器是一种概率型数据结构,用于判断一个元素是否存在于一个集合中。它可以高效地告诉你“可能存在”或“绝对不存在”。 如果布隆过滤器判断某个key不存在,那么这个key肯定不存在于数据库中,就可以直接返回,避免查询数据库。
优点:
- 可以非常有效地过滤掉不存在的key,减少数据库的访问。
- 占用空间较小。
缺点:
- 存在一定的误判率,即可能会把存在的数据判断为不存在。
- 删除元素比较困难。
代码示例:
首先,引入 Google Guava 库,它提供了 BloomFilter 的实现:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>
然后,实现布隆过滤器:
@Service
public class UserServiceBloomFilterImpl implements UserService {
@Autowired
private UserRepository userRepository;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String USER_CACHE_PREFIX = "user:";
private BloomFilter<Long> bloomFilter;
@PostConstruct // 在Bean初始化后执行
public void init() {
// 从数据库加载所有用户ID,初始化布隆过滤器
List<Long> userIds = userRepository.findAll().stream().map(User::getId).collect(Collectors.toList());
// 创建布隆过滤器,设置预期插入数量和误判率
bloomFilter = BloomFilter.create(Funnels.longFunnel(), userIds.size(), 0.01); // 0.01 是误判率
// 将用户ID添加到布隆过滤器
userIds.forEach(bloomFilter::put);
}
@Override
public User getUserById(Long id) {
String key = USER_CACHE_PREFIX + id;
User user = (User) redisTemplate.opsForValue().get(key);
if (user != null) {
System.out.println("从缓存中获取用户");
return user;
}
// 先判断ID是否存在于布隆过滤器中
if (!bloomFilter.mightContain(id)) {
System.out.println("ID 不存在于布隆过滤器中,直接返回");
return null; // 直接返回,避免查询数据库
}
user = userRepository.findById(id).orElse(null);
if (user != null) {
System.out.println("从数据库中获取用户");
redisTemplate.opsForValue().set(key, user, 1, TimeUnit.HOURS); // 设置缓存过期时间
}
return user;
}
}
代码解释:
BloomFilter<Long> bloomFilter: 定义布隆过滤器对象。@PostConstruct: 使用@PostConstruct注解,在Bean初始化后执行init()方法。init()方法:- 从数据库加载所有用户ID。
- 创建布隆过滤器,设置预期插入数量和误判率。
- 将用户ID添加到布隆过滤器。
bloomFilter.mightContain(id): 在查询数据库之前,先判断ID是否存在于布隆过滤器中,如果不存在,则直接返回,避免查询数据库。
关键点:
- 在应用启动时,从数据库加载所有数据到布隆过滤器中。
- 选择合适的误判率,需要根据实际情况进行调整。
- 需要定期更新布隆过滤器中的数据,以保证数据的一致性。
布隆过滤器参数选择:
布隆过滤器的关键在于选择合适的expectedInsertions(预期插入数量)和fpp(误判率)。 这些参数会影响布隆过滤器的大小和性能。
| 参数 | 含义 | 影响 |
|---|---|---|
expectedInsertions |
预计要插入到布隆过滤器中的元素数量。 | 如果实际插入的数量远大于预期,误判率会急剧上升。 |
fpp |
期望的误判率(False Positive Probability)。 例如,0.01 表示 1% 的误判率。 | 误判率越低,布隆过滤器需要的空间越大。 在性能和准确性之间需要权衡。 |
计算布隆过滤器大小:
布隆过滤器的大小(位数m)和哈希函数的个数k,可以通过以下公式计算:
m = -n * ln(p) / (ln(2))^2k = m / n * ln(2)
其中:
n是预期插入的元素数量 (expectedInsertions)p是期望的误判率 (fpp)
例如,如果 n = 1000000 (1百万) 且 p = 0.01 (1%),则:
m ≈ -1000000 * ln(0.01) / (ln(2))^2 ≈ 9585058位 (大约 1.2MB)k ≈ 9585058 / 1000000 * ln(2) ≈ 6.64(通常取整数,例如 7)
因此,你需要大约 1.2MB 的空间,并使用 7 个哈希函数。
如何选择合适的参数:
- 确定
expectedInsertions: 根据你的应用场景,预估可能插入到布隆过滤器中的最大元素数量。 宁可高估,也不要低估,否则误判率会超出预期。 - 确定
fpp: 根据你的应用场景,确定可接受的误判率。 如果对准确性要求很高,选择较低的误判率,但会增加内存消耗。 如果对内存消耗更敏感,可以适当提高误判率。 - 使用在线计算器或公式: 可以使用在线布隆过滤器计算器(例如:https://hur.st/bloomfilter/)或上述公式,根据
expectedInsertions和fpp计算出合适的布隆过滤器大小和哈希函数个数。
动态更新布隆过滤器:
布隆过滤器不支持直接删除元素。 如果需要删除元素,通常的做法是重建布隆过滤器。可以定期重建布隆过滤器,或者当删除操作达到一定数量时重建。
解决方案三:互斥锁
当缓存未命中时,使用互斥锁(Mutex)来防止大量请求同时访问数据库。 只有一个请求可以获得锁,访问数据库,并将结果写入缓存。 其他请求需要等待锁释放后,再从缓存中获取数据。
优点:
- 可以有效防止缓存穿透。
- 实现简单,易于理解。
缺点:
- 在高并发情况下,可能会导致部分请求阻塞,影响性能。
- 如果获取锁的线程发生异常,可能导致死锁。
代码示例:
@Service
public class UserServiceMutexImpl implements UserService {
@Autowired
private UserRepository userRepository;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String USER_CACHE_PREFIX = "user:";
private final Object lock = new Object(); //互斥锁
@Override
public User getUserById(Long id) {
String key = USER_CACHE_PREFIX + id;
User user = (User) redisTemplate.opsForValue().get(key);
if (user != null) {
System.out.println("从缓存中获取用户");
return user;
}
// 使用互斥锁,防止缓存穿透
synchronized (lock) {
// 再次检查缓存,防止重复查询数据库 (double-check locking)
user = (User) redisTemplate.opsForValue().get(key);
if (user != null) {
System.out.println("从缓存中获取用户 (double-check)");
return user;
}
user = userRepository.findById(id).orElse(null);
if (user != null) {
System.out.println("从数据库中获取用户");
redisTemplate.opsForValue().set(key, user, 1, TimeUnit.HOURS); // 设置缓存过期时间
}
return user;
}
}
}
代码解释:
private final Object lock = new Object();: 定义一个互斥锁对象。synchronized (lock): 使用synchronized关键字,对代码块进行加锁。- Double-Check Locking: 在获取锁之后,再次检查缓存是否存在,防止重复查询数据库。
关键点:
- 使用
synchronized或者其他锁机制,确保只有一个线程可以访问数据库。 - 使用Double-Check Locking,减少锁的竞争。
使用 Redis 分布式锁:
如果你的应用是分布式部署的,那么 Java 的 synchronized 锁只能保证单机环境下的互斥性。 你需要使用 Redis 分布式锁来实现跨机器的互斥。
@Service
public class UserServiceRedisLockImpl implements UserService {
@Autowired
private UserRepository userRepository;
@Autowired
private StringRedisTemplate stringRedisTemplate; // 注意这里使用 StringRedisTemplate
private static final String USER_CACHE_PREFIX = "user:";
private static final String LOCK_PREFIX = "lock:user:";
@Override
public User getUserById(Long id) {
String key = USER_CACHE_PREFIX + id;
User user = (User) stringRedisTemplate.opsForValue().get(key); // 缓存中存储的是 String,需要转换
if (user != null) {
System.out.println("从缓存中获取用户");
return convertJsonToUser(user); // 从缓存中获取的是 JSON String,需要转换成 User 对象
}
String lockKey = LOCK_PREFIX + id;
String lockValue = UUID.randomUUID().toString(); // 唯一值,防止误删
try {
// 尝试获取锁,设置过期时间,防止死锁
Boolean lockAcquired = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 10, TimeUnit.SECONDS); // 10 秒过期时间
if (lockAcquired != null && lockAcquired) {
System.out.println("成功获取锁");
// 再次检查缓存,防止重复查询数据库
user = (User) stringRedisTemplate.opsForValue().get(key);
if (user != null) {
System.out.println("从缓存中获取用户 (double-check)");
return convertJsonToUser(user);
}
User dbUser = userRepository.findById(id).orElse(null);
if (dbUser != null) {
System.out.println("从数据库中获取用户");
stringRedisTemplate.opsForValue().set(key, convertUserToJson(dbUser), 1, TimeUnit.HOURS); // 设置缓存过期时间,存储 JSON String
return dbUser;
}
} else {
System.out.println("获取锁失败,稍后重试");
// 获取锁失败,可以稍后重试,或者抛出异常
try {
Thread.sleep(50); // 短暂休眠后重试
return getUserById(id); // 递归调用,重试
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null; // 返回 null 或抛出异常
}
}
} finally {
// 释放锁,必须确保锁是由当前线程持有的
if (lockValue.equals(stringRedisTemplate.opsForValue().get(lockKey))) {
stringRedisTemplate.delete(lockKey);
System.out.println("释放锁");
}
}
return null;
}
// User 对象转换为 JSON String
private String convertUserToJson(User user) {
// 使用 Jackson, Gson 等 JSON 库
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.writeValueAsString(user);
} catch (JsonProcessingException e) {
e.printStackTrace();
return null; // 或者抛出异常
}
}
// JSON String 转换为 User 对象
private User convertJsonToUser(String json) {
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.readValue(json, User.class);
} catch (JsonProcessingException e) {
e.printStackTrace();
return null; // 或者抛出异常
}
}
}
代码解释:
StringRedisTemplate: 使用StringRedisTemplate,因为 Redis 存储的是字符串。LOCK_PREFIX: 定义锁的 Key 前缀。lockValue: 使用 UUID 作为锁的值,确保唯一性,防止误删。setIfAbsent: 使用setIfAbsent方法尝试获取锁,并设置过期时间,防止死锁。- 释放锁: 必须在
finally块中释放锁,并且要验证锁是否是由当前线程持有的,才能删除锁。 - JSON 转换: 因为 Redis 存储的是字符串,需要将 User 对象转换为 JSON 字符串,再存储到 Redis 中。 从 Redis 中读取时,也需要将 JSON 字符串转换回 User 对象。
关键点:
- 使用
setIfAbsent方法原子性地获取锁。 - 设置锁的过期时间,防止死锁。
- 使用 UUID 作为锁的值,防止误删。
- 在
finally块中释放锁,并且要验证锁是否是由当前线程持有的。 - 使用 JSON 转换器来序列化和反序列化 User 对象。
- 获取锁失败后,可以进行重试。
解决方案四: 异步更新缓存
当缓存未命中时,先返回一个默认值或者空值,然后异步地从数据库加载数据,更新缓存。 这样可以避免阻塞用户请求,提高系统的响应速度。
优点:
- 可以提高系统的响应速度。
- 可以避免阻塞用户请求。
缺点:
- 实现相对复杂。
- 存在数据不一致的风险。
代码示例:
@Service
public class UserServiceAsyncImpl implements UserService {
@Autowired
private UserRepository userRepository;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private ApplicationEventPublisher eventPublisher; // 事件发布器
private static final String USER_CACHE_PREFIX = "user:";
@Override
public User getUserById(Long id) {
String key = USER_CACHE_PREFIX + id;
User user = (User) redisTemplate.opsForValue().get(key);
if (user != null) {
System.out.println("从缓存中获取用户");
return user;
}
// 缓存未命中,先返回 null 或者默认值
System.out.println("缓存未命中,返回 null,并异步更新缓存");
// 发布一个事件,异步更新缓存
eventPublisher.publishEvent(new CacheRefreshEvent(id));
return null; // 返回 null 或者默认值
}
// 异步更新缓存的监听器
@EventListener
@Async
public void onCacheRefreshEvent(CacheRefreshEvent event) {
Long id = event.getId();
String key = USER_CACHE_PREFIX + id;
System.out.println("异步更新缓存,ID: " + id);
User user = userRepository.findById(id).orElse(null);
if (user != null) {
System.out.println("从数据库中获取用户");
redisTemplate.opsForValue().set(key, user, 1, TimeUnit.HOURS); // 设置缓存过期时间
}
}
// 定义一个事件
public static class CacheRefreshEvent {
private final Long id;
public CacheRefreshEvent(Long id) {
this.id = id;
}
public Long getId() {
return id;
}
}
}
代码解释:
ApplicationEventPublisher eventPublisher: 使用ApplicationEventPublisher发布事件。CacheRefreshEvent: 定义一个事件类,包含需要刷新的 ID。eventPublisher.publishEvent(new CacheRefreshEvent(id)): 发布事件,触发异步更新缓存的操作。@EventListener: 使用@EventListener注解,监听CacheRefreshEvent事件。@Async: 使用@Async注解,将事件处理方法异步执行。
关键点:
- 使用 Spring 的事件机制,实现异步更新缓存。
- 需要配置
@EnableAsync,开启异步支持。 - 需要处理异步任务可能出现的异常。
- 返回默认值或空值,避免阻塞用户请求。
配置 @EnableAsync:
在 Spring Boot 的启动类或者配置类中,需要添加 @EnableAsync 注解,开启异步支持。
@SpringBootApplication
@EnableAsync // 开启异步支持
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
各种方案的对比
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 缓存空对象 | 实现简单,有效防止穿透 | 占用缓存空间,需要设置较短的过期时间 | 适用于数据不存在的情况较少,且对缓存空间不敏感的场景。 |
| 布隆过滤器 | 高效过滤不存在的key,占用空间小 | 存在误判率,删除元素困难 | 适用于数据量大,允许一定误判率,且数据变动不频繁的场景。 例如,黑名单过滤。 |
| 互斥锁 | 实现简单,有效防止穿透 | 高并发下可能导致请求阻塞,存在死锁风险 | 适用于并发量不高,对数据一致性要求高的场景。 |
| 异步更新缓存 | 提高系统响应速度,避免阻塞用户请求 | 实现复杂,存在数据不一致的风险 | 适用于对响应速度要求高,可以容忍短暂数据不一致的场景。 例如,允许用户先看到旧数据,然后异步更新。 |
总结与抉择
面对缓存穿透,我们需要根据实际情况选择合适的解决方案。 没有一种方案是万能的,需要权衡各种因素,选择最适合自己的方案。 例如,如果数据量不大,可以选择缓存空对象;如果数据量很大,可以选择布隆过滤器;如果对数据一致性要求高,可以选择互斥锁;如果对响应速度要求高,可以选择异步更新缓存。
总结:
- 了解缓存穿透的原理和危害。
- 掌握多种缓存穿透的解决方案。
- 根据实际情况选择合适的解决方案。