大家好,我是今天的主讲人,很高兴能和大家一起探讨Redis缓存的三大难题:缓存穿透、雪崩和击穿,以及它们对应的解决方案。咱们今天这场讲座,不搞那些虚头巴脑的理论,直接上干货,用最接地气的方式把这些问题给掰开了、揉碎了,再给大家伙儿喂进去。
第一部分:缓存穿透 – 防君子不防小人?不存在的!
啥是缓存穿透?简单来说,就是黑客或者恶意用户拿着压根不存在的key去请求你的数据,Redis里没有,数据库里也没有,每次都得请求数据库,这就像拿着空气当宝贝,白白浪费服务器的资源。如果攻击者构造大量的非法key,那数据库就遭殃了。
想象一下,你开了一家包子铺,正常情况下,顾客来买包子,你直接从蒸笼里拿,速度快得很。但突然来了个捣乱的,每天都问你有没有“火星馅”的包子,你每次都得打开蒸笼看看,结果当然是没有。时间长了,其他顾客也没法好好买包子了,这就是缓存穿透的威力。
解决方案:布隆过滤器(Bloom Filter)
布隆过滤器就像一个“黑名单”,它能告诉你某个key是否存在。注意,它说“不存在”的时候,那肯定是真不存在;但它说“存在”的时候,有可能是误判。但这没关系,我们只需要把那些不存在的key挡在Redis之外,就能大大减轻数据库的压力。
原理:
布隆过滤器其实就是一个bit数组和多个哈希函数。当我们往布隆过滤器里添加一个key的时候,会用多个哈希函数计算出多个哈希值,然后把bit数组中对应位置的值设置为1。当我们查询一个key是否存在的时候,同样用这几个哈希函数计算出哈希值,然后检查bit数组中对应位置的值是否都为1。如果都为1,说明这个key可能存在;如果有一个不为1,那肯定不存在。
代码示例(Python):
虽然Redis本身没有内置布隆过滤器,但我们可以使用现有的库,例如pybloom_live
。
from pybloom_live import BloomFilter
# 初始化布隆过滤器,capacity是期望存储的元素数量,error_rate是误判率
bf = BloomFilter(capacity=10000, error_rate=0.001)
# 模拟一些已存在的key
existing_keys = ['user1', 'user2', 'product3', 'order4']
for key in existing_keys:
bf.add(key)
# 模拟请求
def get_data(key):
if key in bf:
# 布隆过滤器说可能存在,再去Redis或数据库查
print(f"Key '{key}' 可能存在,尝试从Redis/数据库获取...")
# 假设Redis或数据库查询逻辑
data = get_from_redis_or_db(key)
if data:
print(f"成功获取到数据: {data}")
return data
else:
print(f"Redis/数据库中也不存在Key '{key}'")
return None
else:
# 布隆过滤器说不存在,直接返回空
print(f"Key '{key}' 肯定不存在,直接返回None")
return None
# 模拟从Redis或数据库获取数据的函数
def get_from_redis_or_db(key):
# 这里仅仅是模拟,实际实现要连接Redis或数据库
if key in ['user1', 'user2', 'product3', 'order4']:
return f"Data for {key}"
else:
return None
# 测试
print("--- 测试存在的Key ---")
get_data('user1')
print("n--- 测试不存在的Key ---")
get_data('nonexistent_key')
print("n--- 测试可能存在的Key (误判) ---")
get_data('user5') # 模拟误判的情况,实际上user5不存在
优点:
- 空间效率高:布隆过滤器只需要很小的空间就能存储大量的key信息。
- 查询速度快:只需要进行几次哈希计算和bit位检查,速度非常快。
缺点:
- 存在误判:可能会把不存在的key误判为存在。
- 不支持删除:一旦添加到布隆过滤器中的key,就无法删除。
使用场景:
- 防止缓存穿透
- 垃圾邮件过滤
- URL去重
第二部分:缓存雪崩 – 蝴蝶效应的恐怖之处
缓存雪崩指的是在同一时刻,大量的缓存key同时失效,导致大量的请求直接打到数据库上,数据库瞬间崩溃。这就像多米诺骨牌一样,一个倒了,跟着倒一片。
还是拿包子铺举例,如果突然停电,所有的蒸笼都无法工作,顾客来买包子,你只能现做,速度慢不说,一下子来太多顾客,你根本忙不过来,最后只能关门大吉。
解决方案:多级缓存 + 熔断降级
-
多级缓存: 不要把所有鸡蛋放在一个篮子里。除了Redis,我们还可以使用本地缓存(例如Guava Cache、Caffeine),甚至CDN缓存。当Redis挂掉的时候,本地缓存可以顶一阵子,给恢复争取时间。
-
熔断降级: 当Redis出现故障的时候,及时熔断,阻止流量继续涌入。同时,可以提供一些降级服务,例如返回默认值或者静态页面,保证系统的基本可用性。
代码示例(Java + Spring Boot):
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
@Service
public class UserService {
// 使用Spring Cache,结合Redis和本地缓存
@Cacheable(value = "userCache", key = "#userId", unless = "#result == null")
public User getUserById(Long userId) {
// 模拟从数据库查询用户
User user = queryUserFromDatabase(userId);
return user;
}
private User queryUserFromDatabase(Long userId) {
// 模拟数据库查询逻辑
System.out.println("从数据库查询用户:" + userId);
// 这里可以添加熔断逻辑,例如使用Hystrix或Sentinel
if (isDatabaseDown()) {
// 如果数据库挂了,直接返回null或者使用降级策略
return null; // Or return a default user object
}
if (userId == 123L) {
return new User(userId, "张三", 28);
} else {
return null;
}
}
// 模拟数据库是否挂掉
private boolean isDatabaseDown() {
// 在实际应用中,可以根据监控指标判断数据库状态
// 这里简单模拟,假设每隔一段时间数据库会挂掉
return System.currentTimeMillis() % 60000 > 30000; // 模拟30秒正常,30秒故障
}
// 降级方法 ( fallbackMethod ),在Hystrix或Sentinel中配置
public User getDefaultUser(Long userId) {
// 返回默认用户对象或者从备用数据源获取
System.out.println("执行降级方法,返回默认用户");
return new User(userId, "默认用户", 0);
}
}
class User {
private Long id;
private String name;
private int age;
public User(Long id, String name, int age) {
this.id = id;
this.name = name;
this.age = age;
}
// Getters and setters (省略)
public Long getId() {
return id;
}
public String getName() {
return name;
}
public int getAge() {
return age;
}
@Override
public String toString() {
return "User{" +
"id=" + id +
", name='" + name + ''' +
", age=" + age +
'}';
}
}
Spring Boot 配置示例 (application.properties):
spring.cache.type=redis
spring.redis.host=localhost
spring.redis.port=6379
# 配置本地缓存 ( Caffeine )
spring.cache.cache-names=userCache
spring.cache.caffeine.spec=maximumSize=100,expireAfterWrite=60s
代码解释:
@Cacheable
注解:Spring Cache的核心注解,用于将方法的结果缓存起来。value
指定缓存的名称,key
指定缓存的key。unless
指定不缓存的条件,这里表示如果方法返回null,则不缓存。queryUserFromDatabase
方法:模拟从数据库查询用户,同时加入了熔断逻辑。如果数据库挂了,直接返回null,触发降级。getDefaultUser
方法:降级方法,在Hystrix或Sentinel中配置。当数据库挂掉或者Redis挂掉的时候,会调用这个方法,返回默认用户对象。application.properties
:配置Redis连接信息和本地缓存Caffeine的参数。
优化策略:
- 设置不同的过期时间: 不要让所有的key在同一时刻过期。可以在设置缓存的时候,给每个key加上一个随机的过期时间。
- 互斥锁: 当缓存失效的时候,使用互斥锁,只允许一个线程去数据库查询数据,其他线程等待。
- 服务降级: 当系统负载过高的时候,可以暂时关闭一些非核心功能,释放资源。
第三部分:缓存击穿 – 狙击手专挑软柿子捏
缓存击穿指的是一个热点key过期了,大量的请求同时访问这个key,导致所有的请求都打到数据库上,数据库瞬间压力剧增。这就像一个明星的八卦新闻,刚爆出来的时候,所有人都想看,服务器扛不住。
还是包子铺,突然某个口味的包子特别受欢迎,但你又没及时补充库存,导致这个口味的包子卖光了。顾客来了都想买这种包子,你只能现做,根本供不应求。
解决方案:永不过期 + 互斥锁
-
永不过期: 热点key永远不过期。当然,这并不是真的永不过期,而是指逻辑上的永不过期。我们可以设置一个定时任务,定期更新热点key的缓存。
-
互斥锁: 当缓存失效的时候,使用互斥锁,只允许一个线程去数据库查询数据,其他线程等待。这样可以避免大量的请求同时打到数据库上。
代码示例(Java + Redis):
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
import java.util.Random;
@Service
public class ProductService {
@Autowired
private StringRedisTemplate redisTemplate;
private static final String PRODUCT_KEY_PREFIX = "product:";
private static final String PRODUCT_LOCK_PREFIX = "lock:product:";
private static final long LOCK_EXPIRE_TIME = 5; // 锁的过期时间,单位秒
public Product getProductById(Long productId) {
String productKey = PRODUCT_KEY_PREFIX + productId;
String productLockKey = PRODUCT_LOCK_PREFIX + productId;
// 1. 从Redis获取数据
String productJson = redisTemplate.opsForValue().get(productKey);
if (productJson != null) {
// 2. 缓存命中,直接返回
System.out.println("缓存命中,直接返回");
return convertJsonToProduct(productJson);
} else {
// 3. 缓存未命中,尝试获取锁
boolean lockAcquired = tryAcquireLock(productLockKey);
if (lockAcquired) {
try {
// 4. 获取到锁,从数据库查询数据
System.out.println("获取到锁,从数据库查询数据");
Product product = queryProductFromDatabase(productId);
if (product != null) {
// 5. 将数据写入Redis,并设置过期时间 (逻辑永不过期,定时更新)
String newProductJson = convertProductToJson(product);
redisTemplate.opsForValue().set(productKey, newProductJson);
// 定时任务更新,可以设置一个稍微长一点的过期时间,防止缓存雪崩
redisTemplate.expire(productKey, 60 + new Random().nextInt(30), TimeUnit.SECONDS); // 随机增加过期时间
return product;
} else {
// 6. 数据库中也不存在,写入空值,防止缓存穿透 (可选)
redisTemplate.opsForValue().set(productKey, "", 60, TimeUnit.SECONDS); // 设置一个较短的过期时间
return null;
}
} finally {
// 7. 释放锁
releaseLock(productLockKey);
}
} else {
// 8. 没有获取到锁,等待一段时间后重试 (自旋)
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return getProductById(productId); // 递归调用,重试
}
}
}
// 尝试获取锁
private boolean tryAcquireLock(String lockKey) {
Boolean acquired = redisTemplate.opsForValue().setIfAbsent(lockKey, "locked", LOCK_EXPIRE_TIME, TimeUnit.SECONDS);
return acquired != null && acquired;
}
// 释放锁
private void releaseLock(String lockKey) {
redisTemplate.delete(lockKey);
}
// 模拟从数据库查询商品
private Product queryProductFromDatabase(Long productId) {
// 模拟数据库查询逻辑
System.out.println("从数据库查询商品:" + productId);
if (productId == 1L) {
return new Product(productId, "超级好吃的包子", 10.0);
} else {
return null;
}
}
// 将Product对象转换为JSON字符串
private String convertProductToJson(Product product) {
// 使用JSON库,例如Jackson或Gson
// 这里简化处理
return String.format("{"id":%d,"name":"%s","price":%.2f}", product.getId(), product.getName(), product.getPrice());
}
// 将JSON字符串转换为Product对象
private Product convertJsonToProduct(String productJson) {
// 使用JSON库,例如Jackson或Gson
// 这里简化处理
if (productJson.contains("超级好吃的包子")) {
return new Product(1L, "超级好吃的包子", 10.0);
}
return null;
}
}
class Product {
private Long id;
private String name;
private Double price;
public Product(Long id, String name, Double price) {
this.id = id;
this.name = name;
this.price = price;
}
// Getters and setters (省略)
public Long getId() {
return id;
}
public String getName() {
return name;
}
public Double getPrice() {
return price;
}
@Override
public String toString() {
return "Product{" +
"id=" + id +
", name='" + name + ''' +
", price=" + price +
'}';
}
}
代码解释:
getProductById
方法:首先从Redis获取数据,如果缓存命中,直接返回。如果缓存未命中,则尝试获取锁。如果获取到锁,则从数据库查询数据,并将数据写入Redis,同时设置一个稍微长一点的过期时间,防止缓存雪崩。如果没有获取到锁,则等待一段时间后重试。tryAcquireLock
方法:尝试获取锁,使用setIfAbsent
命令,只有在key不存在的时候才能设置成功。releaseLock
方法:释放锁,使用delete
命令删除key。queryProductFromDatabase
方法:模拟从数据库查询商品。convertProductToJson
和convertJsonToProduct
方法:用于将Product对象转换为JSON字符串,以及将JSON字符串转换为Product对象。
优化策略:
- 预热: 在系统启动的时候,提前将热点数据加载到缓存中。
- 限流: 对热点key的访问进行限流,防止大量的请求同时打到数据库上。
- 多副本: 为热点key创建多个副本,分散请求压力。
总结:
问题 | 现象 | 解决方案 | 适用场景 |
---|---|---|---|
缓存穿透 | 大量请求不存在的key,直击数据库 | 布隆过滤器:快速判断key是否存在,过滤非法请求 | 恶意攻击,防止非法key访问数据库 |
缓存雪崩 | 大量key同时失效,数据库瞬间崩溃 | 多级缓存:使用本地缓存等作为Redis的补充;熔断降级:当Redis故障时,熔断请求,提供降级服务;设置不同的过期时间:避免key同时失效 | 系统维护,Redis故障,避免数据库崩溃 |
缓存击穿 | 热点key失效,大量请求涌向数据库 | 永不过期(逻辑过期):定时更新热点key;互斥锁:只允许一个线程访问数据库,其他线程等待 | 热点数据访问,避免数据库压力过大 |
今天我们一起学习了Redis缓存的三大难题以及对应的解决方案。希望大家能够灵活运用这些技巧,让你的系统更加稳定、高效!记住,没有银弹,只有适合场景的解决方案。选择合适的策略,才能让你的缓存发挥最大的价值。感谢大家的参与!