MySQL高级讲座篇之:数据库缓存的艺术:雪崩、穿透、击穿问题的成因与应对策略。

各位好,我是老码,今天咱们聊聊数据库缓存这块儿的“疑难杂症”——雪崩、穿透、击穿。这仨兄弟,个顶个的让人头疼,但只要摸清了它们的脾气,对症下药,就能把它们收拾得服服帖帖的。

一、缓存,好东西也可能惹麻烦

缓存,顾名思义,就是把数据暂存在一个更快的地方,比如内存,这样下次要用的时候就不用再去慢悠悠的数据库里捞了。这就像咱们平时用的浏览器缓存,下次打开相同的网页,速度嗖嗖的。

但缓存也不是万能的,用不好就容易出问题。想象一下,如果缓存突然失效,所有请求都直接打到数据库,那数据库可就遭殃了,这就是所谓的“雪崩”。

二、雪崩:集体“阵亡”的惨剧

  • 成因:

    • 大面积缓存同时失效: 比如设置了相同的过期时间,到期后一起失效。
    • 缓存服务器宕机: 整个缓存系统挂了,所有请求都直接访问数据库。
  • 后果:

    • 数据库压力骤增,可能直接崩溃。
    • 服务响应时间急剧上升,用户体验极差。
  • 应对策略:

    • 避免统一过期时间: 给缓存的过期时间加上一个随机值,错开失效时间,避免“集体阵亡”。

      import random
      import time
      
      def set_cache_with_random_expiry(key, value, base_expiry):
          """
          设置缓存,过期时间在base_expiry的基础上加上一个随机值。
          """
          random_expiry = base_expiry + random.randint(0, 300)  # 随机增加0-300秒
          # 这里假设你用的是Redis,换成你实际用的缓存库
          # redis_client.set(key, value, ex=random_expiry)
          print(f"设置缓存 {key},过期时间:{random_expiry} 秒")
          # 模拟缓存存储,实际使用时替换为真正的缓存操作
          cache_store[key] = (value, time.time() + random_expiry)
      
      # 模拟缓存读取
      def get_cache(key):
          if key in cache_store:
              value, expiry_time = cache_store[key]
              if time.time() < expiry_time:
                  return value
              else:
                  del cache_store[key]
                  return None
          else:
              return None
      
      # 模拟全局缓存存储
      cache_store = {}
      
      # 示例
      key1 = "product:1"
      value1 = {"name": "iPhone", "price": 7999}
      set_cache_with_random_expiry(key1, value1, 3600) # 基础过期时间3600秒
      
      key2 = "product:2"
      value2 = {"name": "Android", "price": 6999}
      set_cache_with_random_expiry(key2, value2, 3600)
      
      print(f"从缓存获取 {key1}: {get_cache(key1)}")
      print(f"从缓存获取 {key2}: {get_cache(key2)}")
    • 互斥锁(Mutex): 当缓存失效时,只允许一个线程去数据库加载数据,其他线程等待,避免大量请求同时访问数据库。

      import threading
      import time
      
      # 模拟数据库查询
      def get_data_from_db(key):
          print(f"正在从数据库查询 {key}...")
          time.sleep(2)  # 模拟数据库查询耗时
          data = {"key": key, "value": f"Data from DB for {key}"}
          print(f"从数据库查询到数据: {data}")
          return data
      
      # 缓存读取和重建函数
      def get_data_with_mutex(key):
          # 首先尝试从缓存获取数据
          data = get_cache(key)
          if data:
              print(f"从缓存获取到数据: {data}")
              return data
      
          # 如果缓存未命中,尝试获取锁
          with lock:  # 使用上下文管理器自动释放锁
              # 再次检查缓存,防止多个线程同时重建缓存
              data = get_cache(key)
              if data:
                  print(f"从缓存获取到数据: {data}")
                  return data
      
              # 如果缓存仍然未命中,从数据库加载数据
              data = get_data_from_db(key)
      
              # 将数据写入缓存
              set_cache(key, data, 3600)  # 设置过期时间为3600秒
              print(f"已将数据写入缓存: {data}")
              return data
      
      # 模拟缓存设置
      def set_cache(key, value, expiry):
          # 这里假设你用的是Redis,换成你实际用的缓存库
          # redis_client.set(key, value, ex=expiry)
          print(f"设置缓存 {key},过期时间:{expiry} 秒")
          # 模拟缓存存储,实际使用时替换为真正的缓存操作
          cache_store[key] = (value, time.time() + expiry)
      
      # 模拟缓存读取
      def get_cache(key):
          if key in cache_store:
              value, expiry_time = cache_store[key]
              if time.time() < expiry_time:
                  return value
              else:
                  del cache_store[key]
                  return None
          else:
              return None
      
      # 模拟全局缓存存储
      cache_store = {}
      # 创建锁
      lock = threading.Lock()
      
      # 创建多个线程并发访问
      threads = []
      for i in range(5):
          thread = threading.Thread(target=get_data_with_mutex, args=("key1",))
          threads.append(thread)
          thread.start()
      
      # 等待所有线程完成
      for thread in threads:
          thread.join()
      
      print("所有线程已完成")
      
    • 服务降级: 在缓存失效时,提供一个备用方案,比如返回一个默认值或者简单的静态页面,保证服务可用性。

    • 熔断机制: 如果数据库持续出现问题,可以暂时熔断缓存,直接返回错误,避免大量无效请求冲击数据库。

    • 多级缓存: 使用本地缓存(如Guava Cache) + 分布式缓存(如Redis)的多级缓存架构,降低分布式缓存的压力。 本地缓存速度快,但容量有限,分布式缓存容量大,但速度稍慢。

  • 预防胜于治疗:

    • 监控: 监控缓存的命中率、失效时间等指标,及时发现问题。
    • 预热: 在系统启动时,提前加载一部分热点数据到缓存中。

三、穿透:查无此人的尴尬

  • 成因:

    • 大量请求查询不存在的数据,缓存中没有,数据库中也没有,每次都直接访问数据库。
    • 恶意攻击者故意构造不存在的key,绕过缓存,直接攻击数据库。
  • 后果:

    • 数据库压力增大,可能被大量无效请求拖垮。
  • 应对策略:

    • 缓存空对象: 当数据库中不存在对应的数据时,也在缓存中设置一个空对象(比如null),下次再请求相同key时,直接从缓存返回,避免访问数据库。

      import time
      
      # 模拟数据库查询
      def get_data_from_db(key):
          print(f"正在从数据库查询 {key}...")
          time.sleep(2)  # 模拟数据库查询耗时
          # 模拟数据库中不存在该数据
          print(f"数据库中不存在 key 为 {key} 的数据")
          return None  # 返回 None 表示数据库中不存在该数据
      
      # 缓存读取和重建函数
      def get_data_with_null_cache(key):
          # 首先尝试从缓存获取数据
          data = get_cache(key)
          if data is not None:
              if data == "NULL_VALUE":
                  print(f"从缓存获取到空对象,key: {key}")
                  return None  # 返回 None 表示数据库中不存在该数据
              else:
                  print(f"从缓存获取到数据: {data}")
                  return data
      
          # 如果缓存未命中,从数据库加载数据
          data = get_data_from_db(key)
      
          # 如果数据库中不存在该数据,则将空对象写入缓存
          if data is None:
              set_cache(key, "NULL_VALUE", 60)  # 设置过期时间为60秒
              print(f"已将空对象写入缓存,key: {key}")
              return None  # 返回 None 表示数据库中不存在该数据
      
          # 将数据写入缓存
          set_cache(key, data, 3600)  # 设置过期时间为3600秒
          print(f"已将数据写入缓存: {data}")
          return data
      
      # 模拟缓存设置
      def set_cache(key, value, expiry):
          # 这里假设你用的是Redis,换成你实际用的缓存库
          # redis_client.set(key, value, ex=expiry)
          print(f"设置缓存 {key},过期时间:{expiry} 秒")
          # 模拟缓存存储,实际使用时替换为真正的缓存操作
          cache_store[key] = (value, time.time() + expiry)
      
      # 模拟缓存读取
      def get_cache(key):
          if key in cache_store:
              value, expiry_time = cache_store[key]
              if time.time() < expiry_time:
                  return value
              else:
                  del cache_store[key]
                  return None
          else:
              return None
      
      # 模拟全局缓存存储
      cache_store = {}
      
      # 示例:查询一个不存在的 key
      key = "non_existent_key"
      data = get_data_with_null_cache(key)
      if data is None:
          print(f"查询 {key} 失败,数据库中不存在该数据")
      else:
          print(f"查询到数据: {data}")
      
      # 再次查询该 key,验证是否从缓存中读取空对象
      data = get_data_with_null_cache(key)
      if data is None:
          print(f"第二次查询 {key} 失败,从缓存读取空对象")
      else:
          print(f"第二次查询到数据: {data}")
      
    • 布隆过滤器(Bloom Filter): 在访问缓存之前,先通过布隆过滤器判断key是否存在,如果不存在,直接拦截,避免访问缓存和数据库。 布隆过滤器是一种概率型数据结构,可以高效地判断一个元素是否存在于一个集合中。 它有一定的误判率,但可以有效地过滤掉不存在的key。

      from pybloom_live import BloomFilter
      
      # 初始化布隆过滤器
      # capacity:预期存储的元素数量
      # error_rate:误判率,越小占用空间越大
      bloom_filter = BloomFilter(capacity=1000, error_rate=0.01)
      
      # 将数据库中的所有 key 添加到布隆过滤器中
      existing_keys = ["product:1", "product:2", "product:3"]  # 模拟数据库中的 key
      for key in existing_keys:
          bloom_filter.add(key)
      
      # 模拟数据库查询
      def get_data_from_db(key):
          print(f"正在从数据库查询 {key}...")
          time.sleep(2)  # 模拟数据库查询耗时
          # 模拟数据库查询结果
          if key in existing_keys:
              data = {"key": key, "value": f"Data from DB for {key}"}
              print(f"从数据库查询到数据: {data}")
              return data
          else:
              print(f"数据库中不存在 key 为 {key} 的数据")
              return None  # 返回 None 表示数据库中不存在该数据
      
      # 缓存读取和重建函数
      def get_data_with_bloom_filter(key):
          # 首先使用布隆过滤器判断 key 是否存在
          if key in bloom_filter:
              # key 可能存在,尝试从缓存获取数据
              data = get_cache(key)
              if data:
                  print(f"从缓存获取到数据: {data}")
                  return data
      
              # 如果缓存未命中,从数据库加载数据
              data = get_data_from_db(key)
      
              # 如果数据库中存在该数据,则将数据写入缓存
              if data:
                  set_cache(key, data, 3600)  # 设置过期时间为3600秒
                  print(f"已将数据写入缓存: {data}")
                  return data
              else:
                  return None
          else:
              # key 肯定不存在,直接返回 None
              print(f"使用布隆过滤器判断 key {key} 肯定不存在,直接返回")
              return None
      
      # 模拟缓存设置
      def set_cache(key, value, expiry):
          # 这里假设你用的是Redis,换成你实际用的缓存库
          # redis_client.set(key, value, ex=expiry)
          print(f"设置缓存 {key},过期时间:{expiry} 秒")
          # 模拟缓存存储,实际使用时替换为真正的缓存操作
          cache_store[key] = (value, time.time() + expiry)
      
      # 模拟缓存读取
      def get_cache(key):
          if key in cache_store:
              value, expiry_time = cache_store[key]
              if time.time() < expiry_time:
                  return value
              else:
                  del cache_store[key]
                  return None
          else:
              return None
      
      # 模拟全局缓存存储
      cache_store = {}
      
      # 示例:查询一个存在的 key
      key1 = "product:1"
      data1 = get_data_with_bloom_filter(key1)
      if data1:
          print(f"查询到数据: {data1}")
      else:
          print(f"查询 {key1} 失败")
      
      # 示例:查询一个不存在的 key
      key2 = "product:4"
      data2 = get_data_with_bloom_filter(key2)
      if data2:
          print(f"查询到数据: {data2}")
      else:
          print(f"查询 {key2} 失败")
    • 参数校验: 对请求的参数进行严格校验,过滤掉非法参数,避免恶意攻击。

四、击穿:热点数据的危机

  • 成因:

    • 某个热点key过期,大量请求同时访问该key,缓存失效,所有请求直接打到数据库。
  • 后果:

    • 数据库压力骤增,可能被热点数据击垮。
  • 应对策略:

    • 永不过期: 对于热点数据,可以设置永不过期,或者设置一个很长的过期时间。 但要注意,如果数据更新,需要及时更新缓存。
    • 互斥锁(Mutex): 和雪崩的应对策略类似,当缓存失效时,只允许一个线程去数据库加载数据,其他线程等待,避免大量请求同时访问数据库。
    • 预热: 在系统启动时,提前加载热点数据到缓存中。
    • 二级缓存: 使用本地缓存 + 分布式缓存 的方式。 当热点数据失效时,本地缓存仍然可以提供服务,缓解数据库压力。
    • 热点数据备份: 将热点数据备份多份,分散到不同的缓存节点,降低单个节点的压力。

五、总结

问题 成因 后果 应对策略
雪崩 大面积缓存同时失效,缓存服务器宕机 数据库压力骤增,服务崩溃 避免统一过期时间,互斥锁,服务降级,熔断机制,多级缓存,监控,预热
穿透 大量请求查询不存在的数据 数据库压力增大,可能被拖垮 缓存空对象,布隆过滤器,参数校验
击穿 热点key过期 数据库压力骤增,可能被热点数据击垮 永不过期,互斥锁,预热,二级缓存,热点数据备份

解决缓存问题没有银弹,需要根据实际场景选择合适的策略。 记住,缓存是为了提升性能,而不是为了增加复杂性。 在设计缓存方案时,要充分考虑各种风险,并制定相应的应对措施。

好了,今天的讲座就到这里,希望对大家有所帮助。 记住,代码在手,bug抖三抖!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注