各位观众老爷,晚上好! 今儿咱们来聊聊Python这门“万金油”语言里,如何用Redis这把“瑞士军刀”来搞定缓存和分布式锁这两大难题。Redis-py 呢,就是咱们手里的“工具箱”,里面各种扳手螺丝刀,哦不对,是各种函数方法,应有尽有。
第一部分:Redis-py“工具箱”初探
首先,咱们得把“工具箱”请到家里来,也就是安装redis-py库。打开你的终端,输入:
pip install redis
安装完毕,就可以开始“拆箱”了。Redis-py 提供了Redis客户端,可以连接到你的Redis服务器。最简单的用法就是:
import redis
# 连接Redis服务器 (默认host='localhost', port=6379, db=0)
r = redis.Redis(host='localhost', port=6379, db=0)
# 试试Ping一下,看看连接是不是正常
try:
r.ping()
print("Redis连接成功!")
except redis.exceptions.ConnectionError as e:
print(f"Redis连接失败:{e}")
exit()
# 存个字符串进去
r.set('mykey', 'Hello, Redis!')
# 取出来看看
value = r.get('mykey')
print(f"从Redis取出的值:{value.decode('utf-8')}") # 注意decode,Redis里存的是字节
这段代码就演示了最基本的连接、设置和获取键值对。记住,Redis里存储的是字节,所以取出来的时候要decode成字符串。
第二部分:缓存:让你的程序飞起来
缓存,顾名思义,就是把一些“昂贵”的数据,比如数据库查询结果,计算结果,存到Redis里,下次再要用的时候直接从Redis取,不用再费力气去数据库查或者重新计算。这样可以大大提高程序的响应速度。
2.1 简单的缓存示例
假设我们有个函数,需要从数据库查询用户信息,这个查询很耗时。我们可以用Redis来缓存查询结果:
import redis
import time
r = redis.Redis(host='localhost', port=6379, db=0)
def get_user_info_from_db(user_id):
"""
模拟从数据库查询用户信息
这是一个非常耗时的操作
"""
print(f"正在从数据库查询用户ID为 {user_id} 的信息...")
time.sleep(2) # 模拟数据库查询的耗时
user_info = {'id': user_id, 'name': f'用户{user_id}', 'email': f'user{user_id}@example.com'}
return user_info
def get_user_info(user_id):
"""
先从Redis缓存中查找用户信息,如果找不到,再从数据库查询,并缓存到Redis
"""
key = f'user:{user_id}' # 缓存的key,建议带上业务标识
user_info = r.get(key)
if user_info:
print(f"从Redis缓存中获取用户ID为 {user_id} 的信息")
return eval(user_info.decode('utf-8')) # 从字符串还原为字典 (注意安全性,如果数据来源不可信,不要使用eval)
else:
user_info = get_user_info_from_db(user_id)
r.set(key, str(user_info), ex=60) # 缓存60秒,ex参数设置过期时间
print(f"用户ID为 {user_id} 的信息已缓存到Redis")
return user_info
# 测试
user_id = 1
user_info1 = get_user_info(user_id)
print(user_info1)
user_info2 = get_user_info(user_id) # 第二次查询,直接从Redis获取
print(user_info2)
在这个例子中,get_user_info
函数首先尝试从Redis获取用户信息,如果Redis里没有,才调用get_user_info_from_db
从数据库查询,并将结果缓存到Redis,设置了60秒的过期时间。
2.2 缓存策略:过期时间很重要
缓存的过期时间非常重要。如果过期时间设置得太短,缓存命中率会降低,起不到缓存的效果;如果过期时间设置得太长,缓存中的数据可能与数据库中的数据不一致,导致脏数据。
常见的缓存策略有:
- TTL (Time To Live):为每个缓存项设置一个过期时间,过期后自动失效。上面的例子就是用的TTL策略。
- LRU (Least Recently Used):当缓存达到容量上限时,移除最近最少使用的缓存项。Redis可以通过配置
maxmemory-policy
来实现LRU策略。 - LFU (Least Frequently Used):当缓存达到容量上限时,移除使用频率最低的缓存项。 Redis可以通过配置
maxmemory-policy
来实现LFU策略。
2.3 缓存击穿、缓存穿透、缓存雪崩
这三个“兄弟”都是缓存使用中常见的问题,需要小心应对。
- 缓存击穿:某个热点key过期了,大量请求同时访问这个key,导致请求直接打到数据库,造成数据库压力过大。 解决方案:
- 互斥锁:只允许一个请求去查询数据库,并将结果缓存到Redis。其他请求等待,获取缓存。 (这就是咱们接下来要讲的分布式锁)
- 永不过期:热点key永不过期,或者设置一个很长的过期时间。
- 缓存穿透:请求的key在Redis和数据库中都不存在,导致每次请求都打到数据库。 解决方案:
- 缓存空对象:即使数据库中不存在该key,也缓存一个空对象,并设置一个较短的过期时间。
- 布隆过滤器:在缓存之前,使用布隆过滤器过滤掉不存在的key。
- 缓存雪崩:大量的key在同一时间过期,导致大量的请求直接打到数据库。 解决方案:
- 随机过期时间:为每个key设置一个随机的过期时间,避免大量key同时过期。
- 互斥锁:同缓存击穿的互斥锁方案。
- 服务降级:当Redis挂掉时,提供一个备用方案,比如直接返回默认值。
咱们用一个表格来总结一下:
问题 | 描述 | 解决方案 |
---|---|---|
缓存击穿 | 热点key过期,大量请求直接打到数据库。 | 互斥锁、永不过期 |
缓存穿透 | 请求的key在Redis和数据库中都不存在,每次请求都打到数据库。 | 缓存空对象、布隆过滤器 |
缓存雪崩 | 大量的key在同一时间过期,导致大量的请求直接打到数据库。 | 随机过期时间、互斥锁、服务降级 |
第三部分:分布式锁:让你的程序井然有序
在分布式系统中,多个进程可能同时访问共享资源,为了避免数据竞争,需要使用分布式锁。 Redis提供了一种简单高效的实现分布式锁的方式。
3.1 Redis实现分布式锁
Redis-py提供了setnx
(SET if Not eXists) 命令,可以用来实现分布式锁。
import redis
import time
import uuid
r = redis.Redis(host='localhost', port=6379, db=0)
def acquire_lock(lock_name, acquire_timeout=10, lock_timeout=10):
"""
获取锁
:param lock_name: 锁的名称
:param acquire_timeout: 获取锁的超时时间,单位秒
:param lock_timeout: 锁的过期时间,单位秒
:return: 锁的value,如果获取失败返回None
"""
lock_identifier = str(uuid.uuid4()) # 唯一的锁标识
lock_key = f'lock:{lock_name}'
end = time.time() + acquire_timeout
while time.time() < end:
if r.setnx(lock_key, lock_identifier): # 尝试获取锁
r.expire(lock_key, lock_timeout) # 设置过期时间,防止死锁
return lock_identifier
elif r.ttl(lock_key) == -1: # 如果锁没有设置过期时间,则设置一个
r.expire(lock_key, lock_timeout)
time.sleep(0.001) # 稍微等待一下,避免CPU空转
return None # 获取锁超时
def release_lock(lock_name, lock_identifier):
"""
释放锁
:param lock_name: 锁的名称
:param lock_identifier: 锁的value
:return: True if released, False otherwise
"""
lock_key = f'lock:{lock_name}'
try:
with r.pipeline() as pipe:
pipe.watch(lock_key) # 监听lock_key的变化
if pipe.get(lock_key) == lock_identifier.encode('utf-8'):
pipe.multi() # 开启事务
pipe.delete(lock_key)
result = pipe.execute() # 执行事务
if result is None:
return False # 锁可能被其他客户端修改
return result[0] == 1 # 成功删除返回True
else:
pipe.unwatch() # 取消监听
return False # 锁已经被其他客户端持有
except redis.exceptions.WatchError:
return False # 锁可能在watch期间被修改
# 测试
lock_name = 'my_resource'
lock_identifier = acquire_lock(lock_name)
if lock_identifier:
try:
print("成功获取锁!")
# 模拟执行一些需要锁保护的操作
time.sleep(5)
print("操作完成!")
finally:
release_result = release_lock(lock_name, lock_identifier)
if release_result:
print("成功释放锁!")
else:
print("释放锁失败!")
else:
print("获取锁失败!")
这段代码定义了acquire_lock
和 release_lock
两个函数,分别用于获取和释放锁。
acquire_lock
:使用setnx
命令尝试设置锁,如果设置成功,说明获取锁成功。为了防止死锁,还设置了过期时间。如果在指定时间内没有获取到锁,则返回None。release_lock
:使用delete
命令释放锁。为了保证释放的是自己的锁,使用watch
命令监听锁的变化,并在事务中判断锁的value是否与自己的lock_identifier相同。
3.2 Redlock:更可靠的分布式锁
上面的简单实现有一个问题:如果Redis是单点部署,当Redis宕机时,锁就失效了。 Redlock 算法是一种更可靠的分布式锁算法,它使用多个独立的Redis实例,只有当超过半数的Redis实例都获取到锁时,才认为获取锁成功。
Redlock算法比较复杂,这里就不贴代码了,有兴趣的同学可以参考Redis官方文档:https://redis.io/docs/reference/patterns/distributed-locks/
3.3 Lua脚本:保证原子性
在Redis中执行多个命令时,可以使用Lua脚本来保证原子性。 比如,上面的release_lock
函数可以使用Lua脚本来实现:
import redis
import time
import uuid
r = redis.Redis(host='localhost', port=6379, db=0)
# 定义Lua脚本
release_lock_script = """
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
"""
# 将脚本加载到Redis服务器
release_lock = r.register_script(release_lock_script)
def acquire_lock(lock_name, acquire_timeout=10, lock_timeout=10):
"""
获取锁
:param lock_name: 锁的名称
:param acquire_timeout: 获取锁的超时时间,单位秒
:param lock_timeout: 锁的过期时间,单位秒
:return: 锁的value,如果获取失败返回None
"""
lock_identifier = str(uuid.uuid4()) # 唯一的锁标识
lock_key = f'lock:{lock_name}'
end = time.time() + acquire_timeout
while time.time() < end:
if r.setnx(lock_key, lock_identifier): # 尝试获取锁
r.expire(lock_key, lock_timeout) # 设置过期时间,防止死锁
return lock_identifier
elif r.ttl(lock_key) == -1: # 如果锁没有设置过期时间,则设置一个
r.expire(lock_key, lock_timeout)
time.sleep(0.001) # 稍微等待一下,避免CPU空转
return None # 获取锁超时
def release_lock_lua(lock_name, lock_identifier):
"""
使用Lua脚本释放锁
:param lock_name: 锁的名称
:param lock_identifier: 锁的value
:return: True if released, False otherwise
"""
lock_key = f'lock:{lock_name}'
result = release_lock(keys=[lock_key], args=[lock_identifier])
return result == 1
# 测试
lock_name = 'my_resource'
lock_identifier = acquire_lock(lock_name)
if lock_identifier:
try:
print("成功获取锁!")
# 模拟执行一些需要锁保护的操作
time.sleep(5)
print("操作完成!")
finally:
release_result = release_lock_lua(lock_name, lock_identifier)
if release_result:
print("成功释放锁!")
else:
print("释放锁失败!")
else:
print("获取锁失败!")
在这个例子中,我们定义了一个Lua脚本,用于判断锁的value是否与自己的lock_identifier相同,如果相同,则删除锁。然后使用r.register_script
将脚本加载到Redis服务器,并生成一个可调用的函数release_lock
。 这样可以保证释放锁的原子性。
3.4 分布式锁的应用场景
分布式锁有很多应用场景,比如:
- 防止重复提交:在处理用户提交的表单时,可以使用分布式锁来防止用户重复提交。
- 定时任务调度:在分布式系统中,可以使用分布式锁来保证只有一个节点执行定时任务。
- 秒杀活动:在秒杀活动中,可以使用分布式锁来防止超卖。
总结
今天咱们聊了Redis-py在缓存和分布式锁中的应用。缓存可以提高程序的响应速度,分布式锁可以保证程序的并发安全性。掌握这些技术,可以让你写出更高效、更可靠的Python程序。 希望各位观众老爷以后在写代码的时候,能想起今天咱们聊的这些东西,用好Redis这把“瑞士军刀”,搞定各种难题。 散会!