Python 分布式锁实现:基于 Redis 或 ZooKeeper 的容错与一致性保障
各位朋友,大家好。今天我们来聊聊 Python 分布式锁的实现,重点关注如何利用 Redis 和 ZooKeeper 这两个强大的工具,构建具备容错性和一致性保障的分布式锁。
什么是分布式锁? 为什么要用它?
在单体应用中,我们通常使用编程语言自带的锁机制(例如 Python 的 threading.Lock)来保证对共享资源的互斥访问。但在分布式系统中,多个独立的进程运行在不同的机器上,这些进程都需要访问同一个共享资源,传统的锁机制就无法满足需求了。
这时候,就需要分布式锁。 简单来说,分布式锁是一种控制分布式系统之间互斥访问共享资源的机制。 它的作用是:
- 互斥性(Mutual Exclusion): 在任何时刻,只有一个客户端能够获得锁。
- 避免死锁(Deadlock Avoidance): 即使持有锁的客户端崩溃,锁也能自动释放,避免其他客户端永远无法获得锁。
- 容错性(Fault Tolerance): 锁服务本身需要具备高可用性,即使部分节点发生故障,锁服务依然能够正常工作。
实现分布式锁的常见方案
实现分布式锁的方案有很多,比较常见的有:
- 基于数据库: 利用数据库的唯一索引或悲观锁来实现。 这种方式简单易懂,但性能相对较差。
- 基于 Redis: 利用 Redis 的
SETNX命令(set if not exists)和过期时间来实现。 性能高,实现简单,但需要注意一些细节,以避免锁的误释放和死锁。 - 基于 ZooKeeper: 利用 ZooKeeper 的临时节点和 Watcher 机制来实现。 提供更强的一致性保证,但也更复杂。
今天,我们重点讨论基于 Redis 和 ZooKeeper 的分布式锁实现。
基于 Redis 的分布式锁
Redis 是一种高性能的键值存储系统,非常适合用于实现分布式锁。
1. 基本实现:SETNX 和过期时间
最简单的 Redis 分布式锁实现,依赖两个关键特性:
SETNX key value(Set If Not Exists): 如果 key 不存在,则将其设置为 value,并返回 1;如果 key 已经存在,则不进行任何操作,并返回 0。- 过期时间 (Expiration): 为 key 设置一个过期时间,即使客户端崩溃,锁也能在一段时间后自动释放。
Python 代码示例:
import redis
import time
import uuid
class RedisLock:
def __init__(self, redis_client, lock_name, lock_timeout=10):
self.redis_client = redis_client
self.lock_name = lock_name
self.lock_key = f"lock:{lock_name}" # 使用前缀,避免key冲突
self.lock_timeout = lock_timeout # 锁的过期时间,单位:秒
self.lock_value = str(uuid.uuid4()) # 唯一标识,用于解锁
def acquire(self):
"""尝试获取锁"""
end = time.time() + self.lock_timeout
while time.time() < end:
if self.redis_client.setnx(self.lock_key, self.lock_value):
self.redis_client.expire(self.lock_key, self.lock_timeout)
return True
time.sleep(0.01) # 短暂休眠,避免CPU空转
return False
def release(self):
"""释放锁"""
try:
if self.redis_client.get(self.lock_key) == self.lock_value.encode(): # 确认是自己加的锁
self.redis_client.delete(self.lock_key)
return True
else:
return False # 不是自己加的锁,不能释放
except Exception as e:
print(f"解锁失败: {e}")
return False
使用示例:
redis_client = redis.Redis(host='localhost', port=6379, db=0)
lock = RedisLock(redis_client, "my_resource", lock_timeout=5)
if lock.acquire():
try:
print("成功获取锁,执行受保护的操作...")
time.sleep(2) # 模拟执行耗时操作
finally:
if lock.release():
print("成功释放锁")
else:
print("释放锁失败")
else:
print("获取锁失败")
解释:
RedisLock类封装了锁的获取和释放逻辑。acquire()方法尝试获取锁,如果获取成功,则立即返回 True。如果获取失败,则循环等待,直到超时。release()方法释放锁,只有持有锁的客户端才能释放锁。- 使用 UUID 作为锁的值,避免不同客户端误释放锁。
lock_key使用前缀,避免与其他 Redis key冲突。
2. 优化:Lua 脚本实现原子性操作
上面的实现存在一个潜在的问题:SETNX 和 EXPIRE 不是原子操作。如果在 SETNX 成功后,EXPIRE 执行之前,客户端崩溃了,那么这个锁将永远不会被释放,导致死锁。
为了解决这个问题,可以使用 Redis 的 Lua 脚本,将 SETNX 和 EXPIRE 合并成一个原子操作。
修改后的 RedisLock 类:
import redis
import time
import uuid
class RedisLock:
def __init__(self, redis_client, lock_name, lock_timeout=10):
self.redis_client = redis_client
self.lock_name = lock_name
self.lock_key = f"lock:{lock_name}"
self.lock_timeout = lock_timeout
self.lock_value = str(uuid.uuid4())
# Lua 脚本,用于原子性地设置 key 和过期时间
self.lock_script = """
if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then
redis.call('pexpire', KEYS[1], ARGV[2])
return 1
else
return 0
end
"""
self.unlock_script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
self.lock_script_compiled = self.redis_client.register_script(self.lock_script)
self.unlock_script_compiled = self.redis_client.register_script(self.unlock_script)
def acquire(self):
"""尝试获取锁"""
end = time.time() + self.lock_timeout
while time.time() < end:
if self.lock_script_compiled(keys=[self.lock_key], args=[self.lock_value, self.lock_timeout * 1000]): #过期时间单位: 毫秒
return True
time.sleep(0.01)
return False
def release(self):
"""释放锁"""
try:
result = self.unlock_script_compiled(keys=[self.lock_key], args=[self.lock_value])
return result == 1
except Exception as e:
print(f"解锁失败: {e}")
return False
解释:
lock_script和unlock_script分别是获取锁和释放锁的 Lua 脚本。redis_client.register_script()方法将 Lua 脚本注册到 Redis 服务器,并将返回的对象保存到lock_script_compiled和unlock_script_compiled。这样可以避免每次都将脚本发送到服务器,提高性能。- 在
acquire()方法中,调用lock_script_compiled()方法执行 Lua 脚本,实现原子性操作。 过期时间单位需要是毫秒,因此需要乘以1000. - 在
release()方法中,调用unlock_script_compiled()方法执行Lua脚本,进行解锁。
3. Redlock 算法 (Redis >= 5.0)
虽然 Lua 脚本可以解决原子性问题,但 Redis 单节点仍然存在单点故障的风险。如果 Redis 主节点崩溃,数据尚未同步到从节点,可能会导致多个客户端同时获得锁。
Redlock 算法是一种更高级的分布式锁算法,它通过在多个独立的 Redis 节点上尝试获取锁,来提高锁的可用性和容错性。
Redlock 算法的核心思想是:
- 客户端尝试从 N 个独立的 Redis 节点获取锁。
- 客户端使用相同的 key 和 value,以及一个较短的锁过期时间(例如几百毫秒)。
- 客户端必须在 majority (N/2 + 1) 个节点上成功获取锁,才能认为获取锁成功。
- 如果客户端获取锁失败,则需要释放所有节点上的锁。
- 为了避免死锁,客户端需要在锁过期后,才能再次尝试获取锁。
由于Redlock算法较为复杂,涉及多个Redis实例的管理和协调,这里只给出概念性的说明,并提供一个简单的示例代码框架。 实际生产环境中,建议使用成熟的Redlock库,例如redlock-py。
# (仅为示例代码框架,未经完整测试)
import redis
import time
class Redlock:
def __init__(self, redis_nodes, lock_name, lock_timeout=10): # 单位:秒
self.redis_nodes = redis_nodes # Redis 节点列表,例如:[{'host': '127.0.0.1', 'port': 6379, 'db': 0}, ...]
self.lock_name = lock_name
self.lock_key = f"lock:{lock_name}"
self.lock_timeout = int(lock_timeout * 1000) # 毫秒
self.quorum = len(redis_nodes) // 2 + 1 # 大多数节点数量
self.lock_value = str(uuid.uuid4())
def acquire(self):
"""尝试获取 Redlock"""
start_time = time.time()
success_count = 0
for node in self.redis_nodes:
try:
r = redis.Redis(**node)
if r.set(self.lock_key, self.lock_value, nx=True, px=self.lock_timeout): # 使用 set 命令的 nx 和 px 参数
success_count += 1
except Exception as e:
print(f"获取锁失败: {e}")
# 判断是否获取到大多数节点的锁
if success_count >= self.quorum:
# 验证锁的有效时间,避免时钟漂移导致的问题
validity_time = int(self.lock_timeout - (time.time() - start_time) * 1000)
if validity_time > 0:
return True
else:
self.release_all() # 释放所有节点的锁,因为锁的有效时间不足
return False
else:
self.release_all() # 释放所有节点的锁
return False
def release_all(self):
"""释放所有节点的锁"""
for node in self.redis_nodes:
try:
r = redis.Redis(**node)
script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
r.eval(script, 1, self.lock_key, self.lock_value) # 使用 eval 执行 Lua 脚本
except Exception as e:
print(f"释放锁失败: {e}")
重要提示:
- Redlock 算法实现较为复杂,需要仔细考虑时钟漂移、网络延迟等因素。
- 强烈建议使用经过充分测试的 Redlock 库,例如
redlock-py。 - Redlock 算法并不能保证绝对的一致性,但可以显著提高锁的可用性和容错性。
基于 ZooKeeper 的分布式锁
ZooKeeper 是一个分布式协调服务,提供高可用性、高可靠性的分布式数据一致性解决方案。
1. 基本原理:临时节点和 Watcher
ZooKeeper 分布式锁的基本原理是:
- 创建临时节点: 客户端尝试在 ZooKeeper 中创建一个临时节点(ephemeral node),例如
/locks/my_resource。 - 节点唯一性: 由于 ZooKeeper 中同一个路径只能存在一个节点,因此只有一个客户端能够成功创建该节点,从而获得锁。
- 临时性: 如果客户端崩溃,其创建的临时节点会自动删除,从而释放锁。
- Watcher 机制: 其他客户端可以监听该节点的变化。当节点被删除时,ZooKeeper 会通知这些客户端,这些客户端可以再次尝试获取锁。
2. 实现步骤
- 客户端尝试创建一个临时顺序节点,例如
/locks/my_resource/lock-0000000001。 顺序节点保证了创建节点的顺序性。 - 客户端获取
/locks/my_resource目录下所有的子节点。 - 客户端判断自己创建的节点是否是序号最小的节点。
- 如果是,则获得锁。
- 如果不是,则监听序号比自己小的那个节点的变化。
- 当监听的节点被删除时,客户端再次尝试获取锁。
3. Python 代码示例
from kazoo.client import KazooClient
from kazoo.exceptions import NodeExistsError, NoNodeError
import time
import logging
logging.basicConfig(level=logging.INFO)
class ZookeeperLock:
def __init__(self, hosts, lock_name, lock_path="/locks", timeout=10):
self.hosts = hosts
self.lock_name = lock_name
self.lock_path = f"{lock_path}/{lock_name}"
self.zk = KazooClient(hosts=self.hosts)
self.zk.start()
self.timeout = timeout
# 确保锁的根路径存在
try:
self.zk.create(lock_path, makepath=True) # 递归创建路径
except NodeExistsError:
pass
def acquire(self):
"""尝试获取锁"""
my_lock_path = None
try:
my_lock_path = self.zk.create(self.lock_path + "/lock-", ephemeral=True, sequence=True)
logging.info(f"创建锁节点: {my_lock_path}")
while True:
children = self.zk.get_children(self.lock_path)
children.sort()
my_name = my_lock_path.split('/')[-1]
if children[0] == my_name:
logging.info("成功获取锁")
return True
else:
# 监听比自己小的节点
index = children.index(my_name)
preceding_node = self.lock_path + "/" + children[index - 1]
logging.info(f"监听节点: {preceding_node}")
def watch_callback(event):
logging.info(f"节点 {preceding_node} 发生变化: {event}")
try:
self.zk.get(preceding_node, watch=watch_callback)
time.sleep(0.1) # 短暂休眠
except NoNodeError: # 前面的节点可能已经消失
continue
except Exception as e:
logging.error(f"获取锁失败: {e}")
if my_lock_path:
try:
self.zk.delete(my_lock_path) # 删除自己创建的节点
except NoNodeError:
pass
return False
def release(self):
"""释放锁"""
try:
children = self.zk.get_children(self.lock_path)
children.sort()
my_lock_path = None
for child in children:
if child.startswith("lock-"):
try:
self.zk.delete(f"{self.lock_path}/{child}")
logging.info(f"释放锁节点: {self.lock_path}/{child}")
return True
except NoNodeError:
logging.warning(f"节点 {self.lock_path}/{child} 不存在")
continue
logging.warning("没有找到可以释放的锁节点")
return False
except Exception as e:
logging.error(f"释放锁失败: {e}")
return False
def close(self):
self.zk.stop()
self.zk.close()
使用示例:
hosts = "127.0.0.1:2181" # ZooKeeper 集群地址
lock = ZookeeperLock(hosts, "my_resource")
if lock.acquire():
try:
print("成功获取锁,执行受保护的操作...")
time.sleep(2)
finally:
if lock.release():
print("成功释放锁")
else:
print("释放锁失败")
lock.close()
else:
print("获取锁失败")
lock.close()
解释:
ZookeeperLock类封装了锁的获取和释放逻辑。acquire()方法尝试创建临时顺序节点,并判断自己是否是序号最小的节点。如果不是,则监听序号比自己小的节点的变化。release()方法删除自己创建的节点,释放锁。close()方法关闭 ZooKeeper 连接。
Redis vs ZooKeeper: 如何选择?
| 特性 | Redis | ZooKeeper |
|---|---|---|
| 性能 | 高,适合高并发场景 | 相对较低,但满足一般需求 |
| 可靠性 | 单节点存在单点故障风险,Redlock 可以提高 | 高,基于 Paxos 算法,具有高可用性和一致性 |
| 一致性 | 最终一致性 | 强一致性 |
| 实现复杂度 | 简单 | 相对复杂 |
| 使用场景 | 对性能要求高,容忍短暂不一致的场景 | 对一致性要求高,例如分布式配置管理、服务发现 |
总结:
- 如果对性能要求非常高,且能够容忍短暂的不一致,可以选择 Redis。 使用Lua脚本保证原子性,并考虑Redlock算法提高可用性。
- 如果对一致性要求非常高,且能够接受稍低的性能,可以选择 ZooKeeper。
分布式锁的注意事项
- 锁的粒度: 锁的粒度越细,并发性越高,但实现复杂度也越高。 需要根据实际情况选择合适的锁粒度。
- 锁的超时时间: 锁的超时时间需要根据业务场景进行设置。 如果超时时间过短,可能会导致锁被误释放;如果超时时间过长,可能会导致死锁。
- 重试机制: 获取锁失败后,可以采用重试机制。 重试次数和重试间隔需要根据实际情况进行调整。
- 避免长时间持有锁: 长时间持有锁会降低系统的并发性。 尽量缩短持有锁的时间。
- 监控和告警: 需要对分布式锁进行监控和告警。 及时发现和解决潜在问题。
一些想法
- 分布式锁是解决分布式系统并发问题的关键技术。
- Redis 和 ZooKeeper 是实现分布式锁的常用工具。
- 选择合适的分布式锁方案需要根据实际场景进行权衡。
- 需要注意分布式锁的各种细节,以避免潜在问题。
希望今天的分享对大家有所帮助,谢谢!
更多IT精英技术系列讲座,到智猿学院