各位好,我是老码,今天咱们聊聊数据库缓存这块儿的“疑难杂症”——雪崩、穿透、击穿。这仨兄弟,个顶个的让人头疼,但只要摸清了它们的脾气,对症下药,就能把它们收拾得服服帖帖的。
一、缓存,好东西也可能惹麻烦
缓存,顾名思义,就是把数据暂存在一个更快的地方,比如内存,这样下次要用的时候就不用再去慢悠悠的数据库里捞了。这就像咱们平时用的浏览器缓存,下次打开相同的网页,速度嗖嗖的。
但缓存也不是万能的,用不好就容易出问题。想象一下,如果缓存突然失效,所有请求都直接打到数据库,那数据库可就遭殃了,这就是所谓的“雪崩”。
二、雪崩:集体“阵亡”的惨剧
-
成因:
- 大面积缓存同时失效: 比如设置了相同的过期时间,到期后一起失效。
- 缓存服务器宕机: 整个缓存系统挂了,所有请求都直接访问数据库。
-
后果:
- 数据库压力骤增,可能直接崩溃。
- 服务响应时间急剧上升,用户体验极差。
-
应对策略:
-
避免统一过期时间: 给缓存的过期时间加上一个随机值,错开失效时间,避免“集体阵亡”。
import random import time def set_cache_with_random_expiry(key, value, base_expiry): """ 设置缓存,过期时间在base_expiry的基础上加上一个随机值。 """ random_expiry = base_expiry + random.randint(0, 300) # 随机增加0-300秒 # 这里假设你用的是Redis,换成你实际用的缓存库 # redis_client.set(key, value, ex=random_expiry) print(f"设置缓存 {key},过期时间:{random_expiry} 秒") # 模拟缓存存储,实际使用时替换为真正的缓存操作 cache_store[key] = (value, time.time() + random_expiry) # 模拟缓存读取 def get_cache(key): if key in cache_store: value, expiry_time = cache_store[key] if time.time() < expiry_time: return value else: del cache_store[key] return None else: return None # 模拟全局缓存存储 cache_store = {} # 示例 key1 = "product:1" value1 = {"name": "iPhone", "price": 7999} set_cache_with_random_expiry(key1, value1, 3600) # 基础过期时间3600秒 key2 = "product:2" value2 = {"name": "Android", "price": 6999} set_cache_with_random_expiry(key2, value2, 3600) print(f"从缓存获取 {key1}: {get_cache(key1)}") print(f"从缓存获取 {key2}: {get_cache(key2)}")
-
互斥锁(Mutex): 当缓存失效时,只允许一个线程去数据库加载数据,其他线程等待,避免大量请求同时访问数据库。
import threading import time # 模拟数据库查询 def get_data_from_db(key): print(f"正在从数据库查询 {key}...") time.sleep(2) # 模拟数据库查询耗时 data = {"key": key, "value": f"Data from DB for {key}"} print(f"从数据库查询到数据: {data}") return data # 缓存读取和重建函数 def get_data_with_mutex(key): # 首先尝试从缓存获取数据 data = get_cache(key) if data: print(f"从缓存获取到数据: {data}") return data # 如果缓存未命中,尝试获取锁 with lock: # 使用上下文管理器自动释放锁 # 再次检查缓存,防止多个线程同时重建缓存 data = get_cache(key) if data: print(f"从缓存获取到数据: {data}") return data # 如果缓存仍然未命中,从数据库加载数据 data = get_data_from_db(key) # 将数据写入缓存 set_cache(key, data, 3600) # 设置过期时间为3600秒 print(f"已将数据写入缓存: {data}") return data # 模拟缓存设置 def set_cache(key, value, expiry): # 这里假设你用的是Redis,换成你实际用的缓存库 # redis_client.set(key, value, ex=expiry) print(f"设置缓存 {key},过期时间:{expiry} 秒") # 模拟缓存存储,实际使用时替换为真正的缓存操作 cache_store[key] = (value, time.time() + expiry) # 模拟缓存读取 def get_cache(key): if key in cache_store: value, expiry_time = cache_store[key] if time.time() < expiry_time: return value else: del cache_store[key] return None else: return None # 模拟全局缓存存储 cache_store = {} # 创建锁 lock = threading.Lock() # 创建多个线程并发访问 threads = [] for i in range(5): thread = threading.Thread(target=get_data_with_mutex, args=("key1",)) threads.append(thread) thread.start() # 等待所有线程完成 for thread in threads: thread.join() print("所有线程已完成")
-
服务降级: 在缓存失效时,提供一个备用方案,比如返回一个默认值或者简单的静态页面,保证服务可用性。
-
熔断机制: 如果数据库持续出现问题,可以暂时熔断缓存,直接返回错误,避免大量无效请求冲击数据库。
-
多级缓存: 使用本地缓存(如Guava Cache) + 分布式缓存(如Redis)的多级缓存架构,降低分布式缓存的压力。 本地缓存速度快,但容量有限,分布式缓存容量大,但速度稍慢。
-
-
预防胜于治疗:
- 监控: 监控缓存的命中率、失效时间等指标,及时发现问题。
- 预热: 在系统启动时,提前加载一部分热点数据到缓存中。
三、穿透:查无此人的尴尬
-
成因:
- 大量请求查询不存在的数据,缓存中没有,数据库中也没有,每次都直接访问数据库。
- 恶意攻击者故意构造不存在的key,绕过缓存,直接攻击数据库。
-
后果:
- 数据库压力增大,可能被大量无效请求拖垮。
-
应对策略:
-
缓存空对象: 当数据库中不存在对应的数据时,也在缓存中设置一个空对象(比如null),下次再请求相同key时,直接从缓存返回,避免访问数据库。
import time # 模拟数据库查询 def get_data_from_db(key): print(f"正在从数据库查询 {key}...") time.sleep(2) # 模拟数据库查询耗时 # 模拟数据库中不存在该数据 print(f"数据库中不存在 key 为 {key} 的数据") return None # 返回 None 表示数据库中不存在该数据 # 缓存读取和重建函数 def get_data_with_null_cache(key): # 首先尝试从缓存获取数据 data = get_cache(key) if data is not None: if data == "NULL_VALUE": print(f"从缓存获取到空对象,key: {key}") return None # 返回 None 表示数据库中不存在该数据 else: print(f"从缓存获取到数据: {data}") return data # 如果缓存未命中,从数据库加载数据 data = get_data_from_db(key) # 如果数据库中不存在该数据,则将空对象写入缓存 if data is None: set_cache(key, "NULL_VALUE", 60) # 设置过期时间为60秒 print(f"已将空对象写入缓存,key: {key}") return None # 返回 None 表示数据库中不存在该数据 # 将数据写入缓存 set_cache(key, data, 3600) # 设置过期时间为3600秒 print(f"已将数据写入缓存: {data}") return data # 模拟缓存设置 def set_cache(key, value, expiry): # 这里假设你用的是Redis,换成你实际用的缓存库 # redis_client.set(key, value, ex=expiry) print(f"设置缓存 {key},过期时间:{expiry} 秒") # 模拟缓存存储,实际使用时替换为真正的缓存操作 cache_store[key] = (value, time.time() + expiry) # 模拟缓存读取 def get_cache(key): if key in cache_store: value, expiry_time = cache_store[key] if time.time() < expiry_time: return value else: del cache_store[key] return None else: return None # 模拟全局缓存存储 cache_store = {} # 示例:查询一个不存在的 key key = "non_existent_key" data = get_data_with_null_cache(key) if data is None: print(f"查询 {key} 失败,数据库中不存在该数据") else: print(f"查询到数据: {data}") # 再次查询该 key,验证是否从缓存中读取空对象 data = get_data_with_null_cache(key) if data is None: print(f"第二次查询 {key} 失败,从缓存读取空对象") else: print(f"第二次查询到数据: {data}")
-
布隆过滤器(Bloom Filter): 在访问缓存之前,先通过布隆过滤器判断key是否存在,如果不存在,直接拦截,避免访问缓存和数据库。 布隆过滤器是一种概率型数据结构,可以高效地判断一个元素是否存在于一个集合中。 它有一定的误判率,但可以有效地过滤掉不存在的key。
from pybloom_live import BloomFilter # 初始化布隆过滤器 # capacity:预期存储的元素数量 # error_rate:误判率,越小占用空间越大 bloom_filter = BloomFilter(capacity=1000, error_rate=0.01) # 将数据库中的所有 key 添加到布隆过滤器中 existing_keys = ["product:1", "product:2", "product:3"] # 模拟数据库中的 key for key in existing_keys: bloom_filter.add(key) # 模拟数据库查询 def get_data_from_db(key): print(f"正在从数据库查询 {key}...") time.sleep(2) # 模拟数据库查询耗时 # 模拟数据库查询结果 if key in existing_keys: data = {"key": key, "value": f"Data from DB for {key}"} print(f"从数据库查询到数据: {data}") return data else: print(f"数据库中不存在 key 为 {key} 的数据") return None # 返回 None 表示数据库中不存在该数据 # 缓存读取和重建函数 def get_data_with_bloom_filter(key): # 首先使用布隆过滤器判断 key 是否存在 if key in bloom_filter: # key 可能存在,尝试从缓存获取数据 data = get_cache(key) if data: print(f"从缓存获取到数据: {data}") return data # 如果缓存未命中,从数据库加载数据 data = get_data_from_db(key) # 如果数据库中存在该数据,则将数据写入缓存 if data: set_cache(key, data, 3600) # 设置过期时间为3600秒 print(f"已将数据写入缓存: {data}") return data else: return None else: # key 肯定不存在,直接返回 None print(f"使用布隆过滤器判断 key {key} 肯定不存在,直接返回") return None # 模拟缓存设置 def set_cache(key, value, expiry): # 这里假设你用的是Redis,换成你实际用的缓存库 # redis_client.set(key, value, ex=expiry) print(f"设置缓存 {key},过期时间:{expiry} 秒") # 模拟缓存存储,实际使用时替换为真正的缓存操作 cache_store[key] = (value, time.time() + expiry) # 模拟缓存读取 def get_cache(key): if key in cache_store: value, expiry_time = cache_store[key] if time.time() < expiry_time: return value else: del cache_store[key] return None else: return None # 模拟全局缓存存储 cache_store = {} # 示例:查询一个存在的 key key1 = "product:1" data1 = get_data_with_bloom_filter(key1) if data1: print(f"查询到数据: {data1}") else: print(f"查询 {key1} 失败") # 示例:查询一个不存在的 key key2 = "product:4" data2 = get_data_with_bloom_filter(key2) if data2: print(f"查询到数据: {data2}") else: print(f"查询 {key2} 失败")
-
参数校验: 对请求的参数进行严格校验,过滤掉非法参数,避免恶意攻击。
-
四、击穿:热点数据的危机
-
成因:
- 某个热点key过期,大量请求同时访问该key,缓存失效,所有请求直接打到数据库。
-
后果:
- 数据库压力骤增,可能被热点数据击垮。
-
应对策略:
- 永不过期: 对于热点数据,可以设置永不过期,或者设置一个很长的过期时间。 但要注意,如果数据更新,需要及时更新缓存。
- 互斥锁(Mutex): 和雪崩的应对策略类似,当缓存失效时,只允许一个线程去数据库加载数据,其他线程等待,避免大量请求同时访问数据库。
- 预热: 在系统启动时,提前加载热点数据到缓存中。
- 二级缓存: 使用本地缓存 + 分布式缓存 的方式。 当热点数据失效时,本地缓存仍然可以提供服务,缓解数据库压力。
- 热点数据备份: 将热点数据备份多份,分散到不同的缓存节点,降低单个节点的压力。
五、总结
问题 | 成因 | 后果 | 应对策略 |
---|---|---|---|
雪崩 | 大面积缓存同时失效,缓存服务器宕机 | 数据库压力骤增,服务崩溃 | 避免统一过期时间,互斥锁,服务降级,熔断机制,多级缓存,监控,预热 |
穿透 | 大量请求查询不存在的数据 | 数据库压力增大,可能被拖垮 | 缓存空对象,布隆过滤器,参数校验 |
击穿 | 热点key过期 | 数据库压力骤增,可能被热点数据击垮 | 永不过期,互斥锁,预热,二级缓存,热点数据备份 |
解决缓存问题没有银弹,需要根据实际场景选择合适的策略。 记住,缓存是为了提升性能,而不是为了增加复杂性。 在设计缓存方案时,要充分考虑各种风险,并制定相应的应对措施。
好了,今天的讲座就到这里,希望对大家有所帮助。 记住,代码在手,bug抖三抖!