Redis 缓存穿透、雪崩、击穿的应对方案:布隆过滤器、多级缓存、熔断降级

大家好,我是今天的主讲人,很高兴能和大家一起探讨Redis缓存的三大难题:缓存穿透、雪崩和击穿,以及它们对应的解决方案。咱们今天这场讲座,不搞那些虚头巴脑的理论,直接上干货,用最接地气的方式把这些问题给掰开了、揉碎了,再给大家伙儿喂进去。

第一部分:缓存穿透 – 防君子不防小人?不存在的!

啥是缓存穿透?简单来说,就是黑客或者恶意用户拿着压根不存在的key去请求你的数据,Redis里没有,数据库里也没有,每次都得请求数据库,这就像拿着空气当宝贝,白白浪费服务器的资源。如果攻击者构造大量的非法key,那数据库就遭殃了。

想象一下,你开了一家包子铺,正常情况下,顾客来买包子,你直接从蒸笼里拿,速度快得很。但突然来了个捣乱的,每天都问你有没有“火星馅”的包子,你每次都得打开蒸笼看看,结果当然是没有。时间长了,其他顾客也没法好好买包子了,这就是缓存穿透的威力。

解决方案:布隆过滤器(Bloom Filter)

布隆过滤器就像一个“黑名单”,它能告诉你某个key是否存在。注意,它说“不存在”的时候,那肯定是真不存在;但它说“存在”的时候,有可能是误判。但这没关系,我们只需要把那些不存在的key挡在Redis之外,就能大大减轻数据库的压力。

原理:

布隆过滤器其实就是一个bit数组和多个哈希函数。当我们往布隆过滤器里添加一个key的时候,会用多个哈希函数计算出多个哈希值,然后把bit数组中对应位置的值设置为1。当我们查询一个key是否存在的时候,同样用这几个哈希函数计算出哈希值,然后检查bit数组中对应位置的值是否都为1。如果都为1,说明这个key可能存在;如果有一个不为1,那肯定不存在。

代码示例(Python):

虽然Redis本身没有内置布隆过滤器,但我们可以使用现有的库,例如pybloom_live

from pybloom_live import BloomFilter

# 初始化布隆过滤器,capacity是期望存储的元素数量,error_rate是误判率
bf = BloomFilter(capacity=10000, error_rate=0.001)

# 模拟一些已存在的key
existing_keys = ['user1', 'user2', 'product3', 'order4']
for key in existing_keys:
    bf.add(key)

# 模拟请求
def get_data(key):
    if key in bf:
        # 布隆过滤器说可能存在,再去Redis或数据库查
        print(f"Key '{key}' 可能存在,尝试从Redis/数据库获取...")
        # 假设Redis或数据库查询逻辑
        data = get_from_redis_or_db(key)
        if data:
            print(f"成功获取到数据: {data}")
            return data
        else:
            print(f"Redis/数据库中也不存在Key '{key}'")
            return None
    else:
        # 布隆过滤器说不存在,直接返回空
        print(f"Key '{key}' 肯定不存在,直接返回None")
        return None

# 模拟从Redis或数据库获取数据的函数
def get_from_redis_or_db(key):
  # 这里仅仅是模拟,实际实现要连接Redis或数据库
  if key in ['user1', 'user2', 'product3', 'order4']:
    return f"Data for {key}"
  else:
    return None

# 测试
print("--- 测试存在的Key ---")
get_data('user1')
print("n--- 测试不存在的Key ---")
get_data('nonexistent_key')
print("n--- 测试可能存在的Key (误判) ---")
get_data('user5') # 模拟误判的情况,实际上user5不存在

优点:

  • 空间效率高:布隆过滤器只需要很小的空间就能存储大量的key信息。
  • 查询速度快:只需要进行几次哈希计算和bit位检查,速度非常快。

缺点:

  • 存在误判:可能会把不存在的key误判为存在。
  • 不支持删除:一旦添加到布隆过滤器中的key,就无法删除。

使用场景:

  • 防止缓存穿透
  • 垃圾邮件过滤
  • URL去重

第二部分:缓存雪崩 – 蝴蝶效应的恐怖之处

缓存雪崩指的是在同一时刻,大量的缓存key同时失效,导致大量的请求直接打到数据库上,数据库瞬间崩溃。这就像多米诺骨牌一样,一个倒了,跟着倒一片。

还是拿包子铺举例,如果突然停电,所有的蒸笼都无法工作,顾客来买包子,你只能现做,速度慢不说,一下子来太多顾客,你根本忙不过来,最后只能关门大吉。

解决方案:多级缓存 + 熔断降级

  • 多级缓存: 不要把所有鸡蛋放在一个篮子里。除了Redis,我们还可以使用本地缓存(例如Guava Cache、Caffeine),甚至CDN缓存。当Redis挂掉的时候,本地缓存可以顶一阵子,给恢复争取时间。

  • 熔断降级: 当Redis出现故障的时候,及时熔断,阻止流量继续涌入。同时,可以提供一些降级服务,例如返回默认值或者静态页面,保证系统的基本可用性。

代码示例(Java + Spring Boot):

import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;

@Service
public class UserService {

    // 使用Spring Cache,结合Redis和本地缓存
    @Cacheable(value = "userCache", key = "#userId", unless = "#result == null")
    public User getUserById(Long userId) {
        // 模拟从数据库查询用户
        User user = queryUserFromDatabase(userId);
        return user;
    }

    private User queryUserFromDatabase(Long userId) {
        // 模拟数据库查询逻辑
        System.out.println("从数据库查询用户:" + userId);
        // 这里可以添加熔断逻辑,例如使用Hystrix或Sentinel
        if (isDatabaseDown()) {
            // 如果数据库挂了,直接返回null或者使用降级策略
            return null; // Or return a default user object
        }
        if (userId == 123L) {
            return new User(userId, "张三", 28);
        } else {
            return null;
        }
    }

    // 模拟数据库是否挂掉
    private boolean isDatabaseDown() {
        // 在实际应用中,可以根据监控指标判断数据库状态
        // 这里简单模拟,假设每隔一段时间数据库会挂掉
        return System.currentTimeMillis() % 60000 > 30000; // 模拟30秒正常,30秒故障
    }

    // 降级方法 ( fallbackMethod ),在Hystrix或Sentinel中配置
    public User getDefaultUser(Long userId) {
        // 返回默认用户对象或者从备用数据源获取
        System.out.println("执行降级方法,返回默认用户");
        return new User(userId, "默认用户", 0);
    }
}

class User {
    private Long id;
    private String name;
    private int age;

    public User(Long id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    // Getters and setters (省略)
    public Long getId() {
        return id;
    }

    public String getName() {
        return name;
    }

    public int getAge() {
        return age;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", name='" + name + ''' +
                ", age=" + age +
                '}';
    }
}

Spring Boot 配置示例 (application.properties):

spring.cache.type=redis
spring.redis.host=localhost
spring.redis.port=6379

# 配置本地缓存 ( Caffeine )
spring.cache.cache-names=userCache
spring.cache.caffeine.spec=maximumSize=100,expireAfterWrite=60s

代码解释:

  • @Cacheable 注解:Spring Cache的核心注解,用于将方法的结果缓存起来。value 指定缓存的名称,key 指定缓存的key。unless 指定不缓存的条件,这里表示如果方法返回null,则不缓存。
  • queryUserFromDatabase 方法:模拟从数据库查询用户,同时加入了熔断逻辑。如果数据库挂了,直接返回null,触发降级。
  • getDefaultUser 方法:降级方法,在Hystrix或Sentinel中配置。当数据库挂掉或者Redis挂掉的时候,会调用这个方法,返回默认用户对象。
  • application.properties:配置Redis连接信息和本地缓存Caffeine的参数。

优化策略:

  • 设置不同的过期时间: 不要让所有的key在同一时刻过期。可以在设置缓存的时候,给每个key加上一个随机的过期时间。
  • 互斥锁: 当缓存失效的时候,使用互斥锁,只允许一个线程去数据库查询数据,其他线程等待。
  • 服务降级: 当系统负载过高的时候,可以暂时关闭一些非核心功能,释放资源。

第三部分:缓存击穿 – 狙击手专挑软柿子捏

缓存击穿指的是一个热点key过期了,大量的请求同时访问这个key,导致所有的请求都打到数据库上,数据库瞬间压力剧增。这就像一个明星的八卦新闻,刚爆出来的时候,所有人都想看,服务器扛不住。

还是包子铺,突然某个口味的包子特别受欢迎,但你又没及时补充库存,导致这个口味的包子卖光了。顾客来了都想买这种包子,你只能现做,根本供不应求。

解决方案:永不过期 + 互斥锁

  • 永不过期: 热点key永远不过期。当然,这并不是真的永不过期,而是指逻辑上的永不过期。我们可以设置一个定时任务,定期更新热点key的缓存。

  • 互斥锁: 当缓存失效的时候,使用互斥锁,只允许一个线程去数据库查询数据,其他线程等待。这样可以避免大量的请求同时打到数据库上。

代码示例(Java + Redis):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;
import java.util.Random;

@Service
public class ProductService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    private static final String PRODUCT_KEY_PREFIX = "product:";
    private static final String PRODUCT_LOCK_PREFIX = "lock:product:";
    private static final long LOCK_EXPIRE_TIME = 5; // 锁的过期时间,单位秒

    public Product getProductById(Long productId) {
        String productKey = PRODUCT_KEY_PREFIX + productId;
        String productLockKey = PRODUCT_LOCK_PREFIX + productId;

        // 1. 从Redis获取数据
        String productJson = redisTemplate.opsForValue().get(productKey);

        if (productJson != null) {
            // 2. 缓存命中,直接返回
            System.out.println("缓存命中,直接返回");
            return convertJsonToProduct(productJson);
        } else {
            // 3. 缓存未命中,尝试获取锁
            boolean lockAcquired = tryAcquireLock(productLockKey);
            if (lockAcquired) {
                try {
                    // 4. 获取到锁,从数据库查询数据
                    System.out.println("获取到锁,从数据库查询数据");
                    Product product = queryProductFromDatabase(productId);

                    if (product != null) {
                        // 5. 将数据写入Redis,并设置过期时间 (逻辑永不过期,定时更新)
                        String newProductJson = convertProductToJson(product);
                        redisTemplate.opsForValue().set(productKey, newProductJson);
                        // 定时任务更新,可以设置一个稍微长一点的过期时间,防止缓存雪崩
                        redisTemplate.expire(productKey, 60 + new Random().nextInt(30), TimeUnit.SECONDS); // 随机增加过期时间
                        return product;
                    } else {
                        // 6. 数据库中也不存在,写入空值,防止缓存穿透 (可选)
                        redisTemplate.opsForValue().set(productKey, "", 60, TimeUnit.SECONDS); // 设置一个较短的过期时间
                        return null;
                    }
                } finally {
                    // 7. 释放锁
                    releaseLock(productLockKey);
                }
            } else {
                // 8. 没有获取到锁,等待一段时间后重试 (自旋)
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return getProductById(productId); // 递归调用,重试
            }
        }
    }

    // 尝试获取锁
    private boolean tryAcquireLock(String lockKey) {
        Boolean acquired = redisTemplate.opsForValue().setIfAbsent(lockKey, "locked", LOCK_EXPIRE_TIME, TimeUnit.SECONDS);
        return acquired != null && acquired;
    }

    // 释放锁
    private void releaseLock(String lockKey) {
        redisTemplate.delete(lockKey);
    }

    // 模拟从数据库查询商品
    private Product queryProductFromDatabase(Long productId) {
        // 模拟数据库查询逻辑
        System.out.println("从数据库查询商品:" + productId);
        if (productId == 1L) {
            return new Product(productId, "超级好吃的包子", 10.0);
        } else {
            return null;
        }
    }

    // 将Product对象转换为JSON字符串
    private String convertProductToJson(Product product) {
        // 使用JSON库,例如Jackson或Gson
        // 这里简化处理
        return String.format("{"id":%d,"name":"%s","price":%.2f}", product.getId(), product.getName(), product.getPrice());
    }

    // 将JSON字符串转换为Product对象
    private Product convertJsonToProduct(String productJson) {
        // 使用JSON库,例如Jackson或Gson
        // 这里简化处理
        if (productJson.contains("超级好吃的包子")) {
           return new Product(1L, "超级好吃的包子", 10.0);
        }
        return null;
    }
}

class Product {
    private Long id;
    private String name;
    private Double price;

    public Product(Long id, String name, Double price) {
        this.id = id;
        this.name = name;
        this.price = price;
    }

    // Getters and setters (省略)
    public Long getId() {
        return id;
    }

    public String getName() {
        return name;
    }

    public Double getPrice() {
        return price;
    }

    @Override
    public String toString() {
        return "Product{" +
                "id=" + id +
                ", name='" + name + ''' +
                ", price=" + price +
                '}';
    }
}

代码解释:

  • getProductById 方法:首先从Redis获取数据,如果缓存命中,直接返回。如果缓存未命中,则尝试获取锁。如果获取到锁,则从数据库查询数据,并将数据写入Redis,同时设置一个稍微长一点的过期时间,防止缓存雪崩。如果没有获取到锁,则等待一段时间后重试。
  • tryAcquireLock 方法:尝试获取锁,使用setIfAbsent命令,只有在key不存在的时候才能设置成功。
  • releaseLock 方法:释放锁,使用delete命令删除key。
  • queryProductFromDatabase 方法:模拟从数据库查询商品。
  • convertProductToJsonconvertJsonToProduct 方法:用于将Product对象转换为JSON字符串,以及将JSON字符串转换为Product对象。

优化策略:

  • 预热: 在系统启动的时候,提前将热点数据加载到缓存中。
  • 限流: 对热点key的访问进行限流,防止大量的请求同时打到数据库上。
  • 多副本: 为热点key创建多个副本,分散请求压力。

总结:

问题 现象 解决方案 适用场景
缓存穿透 大量请求不存在的key,直击数据库 布隆过滤器:快速判断key是否存在,过滤非法请求 恶意攻击,防止非法key访问数据库
缓存雪崩 大量key同时失效,数据库瞬间崩溃 多级缓存:使用本地缓存等作为Redis的补充;熔断降级:当Redis故障时,熔断请求,提供降级服务;设置不同的过期时间:避免key同时失效 系统维护,Redis故障,避免数据库崩溃
缓存击穿 热点key失效,大量请求涌向数据库 永不过期(逻辑过期):定时更新热点key;互斥锁:只允许一个线程访问数据库,其他线程等待 热点数据访问,避免数据库压力过大

今天我们一起学习了Redis缓存的三大难题以及对应的解决方案。希望大家能够灵活运用这些技巧,让你的系统更加稳定、高效!记住,没有银弹,只有适合场景的解决方案。选择合适的策略,才能让你的缓存发挥最大的价值。感谢大家的参与!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注