Python 分布式锁机制:保证资源访问的互斥性与一致性
大家好,今天我们来聊聊 Python 中的分布式锁。在单机环境中,锁机制相对简单,比如可以使用 Python 的 threading.Lock 来保证多线程访问共享资源的互斥性。但是,当应用扩展到分布式环境,多个服务实例同时运行,单机锁就失效了。我们需要一种能够在多个实例之间协调,保证资源访问互斥性的机制,这就是分布式锁。
为什么需要分布式锁?
想象一个场景:多个用户同时购买同一件商品,库存只有一个。如果没有锁机制,多个服务实例可能同时读取到剩余库存为 1,然后都进行扣减操作,导致超卖。分布式锁就是为了解决这类并发问题,保证数据的一致性。
更具体地说,分布式锁主要解决以下问题:
- 互斥性 (Mutual Exclusion): 保证在任何时刻,只有一个客户端可以获得锁。
- 容错性 (Fault Tolerance): 即使持有锁的客户端崩溃,锁也应该能够自动释放,避免死锁。
- 高可用性 (High Availability): 锁服务本身应该是高可用的,避免单点故障。
分布式锁的实现方式
常见的分布式锁实现方式包括:
- 基于数据库的锁
- 基于 Redis 的锁
- 基于 ZooKeeper 的锁
我们来逐一分析。
1. 基于数据库的锁
这种方式是最简单的,利用数据库的唯一索引或者悲观锁来实现。
a. 基于唯一索引
创建一个表,包含一个唯一索引的字段,尝试插入一条数据,如果插入成功,则获取锁;如果插入失败,则说明锁已被其他客户端持有。
import pymysql
class DatabaseLock:
def __init__(self, host, user, password, database, lock_name):
self.host = host
self.user = user
self.password = password
self.database = database
self.lock_name = lock_name
self.connection = None
def connect(self):
self.connection = pymysql.connect(
host=self.host,
user=self.user,
password=self.password,
database=self.database,
cursorclass=pymysql.cursors.DictCursor
)
def acquire_lock(self):
self.connect()
try:
with self.connection.cursor() as cursor:
sql = "INSERT INTO locks (lock_name) VALUES (%s)"
cursor.execute(sql, (self.lock_name,))
self.connection.commit()
return True
except pymysql.err.IntegrityError:
self.connection.rollback() # Rollback to avoid auto-commit
return False
finally:
if self.connection:
self.connection.close()
def release_lock(self):
self.connect()
try:
with self.connection.cursor() as cursor:
sql = "DELETE FROM locks WHERE lock_name = %s"
cursor.execute(sql, (self.lock_name,))
self.connection.commit()
return True
except Exception as e:
print(f"Error releasing lock: {e}")
self.connection.rollback()
return False
finally:
if self.connection:
self.connection.close()
# 示例用法
if __name__ == '__main__':
lock = DatabaseLock("localhost", "user", "password", "database", "my_resource_lock")
if lock.acquire_lock():
print("Acquired lock!")
try:
# 执行需要保护的操作
print("Performing protected operation...")
finally:
lock.release_lock()
print("Released lock!")
else:
print("Failed to acquire lock.")
#创建 locks 表的 SQL 语句
# CREATE TABLE locks (
# lock_name VARCHAR(255) NOT NULL UNIQUE
# );
优点:
- 简单易懂,容易实现。
缺点:
- 存在单点故障的风险。
- 性能较低,每次获取和释放锁都需要进行数据库操作。
- 没有超时机制,如果持有锁的客户端崩溃,可能会导致死锁。
b. 基于悲观锁
使用 SELECT ... FOR UPDATE 语句,锁定一行数据,直到事务结束才释放锁。
import pymysql
class PessimisticLock:
def __init__(self, host, user, password, database, resource_id):
self.host = host
self.user = user
self.password = password
self.database = database
self.resource_id = resource_id
self.connection = None
def connect(self):
self.connection = pymysql.connect(
host=self.host,
user=self.user,
password=self.password,
database=self.database,
autocommit=False, # Important: Disable autocommit
cursorclass=pymysql.cursors.DictCursor
)
def acquire_lock(self, timeout=10):
self.connect()
try:
with self.connection.cursor() as cursor:
sql = "SELECT * FROM resources WHERE id = %s FOR UPDATE"
cursor.execute(sql, (self.resource_id,))
result = cursor.fetchone()
if result:
# Lock acquired
self.connection.commit() # Explicitly commit the SELECT FOR UPDATE
return True
else:
self.connection.rollback() # Rollback if resource doesn't exist
return False
except Exception as e:
print(f"Error acquiring lock: {e}")
self.connection.rollback()
return False
def release_lock(self):
if not self.connection: # Check if there's an active connection
return # Or raise an exception, depending on your needs
try:
self.connection.commit() #Commit the transaction releases the lock
except Exception as e:
print(f"Error releasing lock: {e}")
self.connection.rollback()
finally:
if self.connection:
self.connection.close()
# 示例用法
if __name__ == '__main__':
lock = PessimisticLock("localhost", "user", "password", "database", 1) # Assuming resource ID is 1
if lock.acquire_lock():
print("Acquired lock!")
try:
# 执行需要保护的操作
print("Performing protected operation...")
# Example: Update the resource
# with lock.connection.cursor() as cursor:
# sql = "UPDATE resources SET data = 'updated' WHERE id = %s"
# cursor.execute(sql, (lock.resource_id,))
# lock.connection.commit()
finally:
lock.release_lock()
print("Released lock!")
else:
print("Failed to acquire lock.")
#创建 resources 表的 SQL 语句 (简单示例)
# CREATE TABLE resources (
# id INT PRIMARY KEY,
# data VARCHAR(255)
# );
优点:
- 实现简单。
- 数据库本身提供了一定的可靠性保证。
缺点:
- 性能较低,每次获取和释放锁都需要进行数据库操作。
- 容易造成死锁,需要谨慎设计。
- 锁的持有时间依赖于事务的执行时间,如果事务执行时间过长,会影响并发性能。
总结: 基于数据库的锁实现简单,但性能和可靠性方面存在一些问题,不适合高并发的场景。
2. 基于 Redis 的锁
Redis 是一个高性能的键值存储系统,适合用来实现分布式锁。
a. SETNX (Set If Not Exists)
SETNX key value 命令,如果 key 不存在,则设置 key 的值为 value,并返回 1;如果 key 存在,则不进行任何操作,并返回 0。
import redis
import time
import uuid
class RedisLock:
def __init__(self, host, port, db, lock_name, expire_time=10):
self.redis = redis.Redis(host=host, port=port, db=db)
self.lock_name = lock_name
self.expire_time = expire_time
self.lock_value = str(uuid.uuid4()) # 使用UUID作为锁的值,防止误解锁
def acquire_lock(self, timeout=10):
end = time.time() + timeout
while time.time() < end:
if self.redis.setnx(self.lock_name, self.lock_value):
self.redis.expire(self.lock_name, self.expire_time) # 设置过期时间
return True
time.sleep(0.01) # 短暂休眠,避免CPU空转
return False
def release_lock(self):
try:
if self.redis.get(self.lock_name) == self.lock_value.encode(): # 验证锁的持有者
self.redis.delete(self.lock_name)
return True
else:
return False # 如果锁不是当前客户端持有的,则不删除,防止误删
except Exception as e:
print(f"Error releasing lock: {e}")
return False
# 示例用法
if __name__ == '__main__':
lock = RedisLock("localhost", 6379, 0, "my_resource_lock", expire_time=5)
if lock.acquire_lock():
print("Acquired lock!")
try:
# 执行需要保护的操作
print("Performing protected operation...")
time.sleep(2) # 模拟操作耗时
finally:
lock.release_lock()
print("Released lock!")
else:
print("Failed to acquire lock.")
优点:
- 性能高,Redis 的读写速度非常快。
- 可以设置过期时间,避免死锁。
缺点:
- 如果客户端在设置过期时间之前崩溃,仍然可能导致死锁(虽然概率较低)。
- 需要手动处理锁的续约问题,如果业务逻辑执行时间超过过期时间,锁可能会被其他客户端获取。
- 可能存在锁误删的问题:如果客户端 A 获取锁后,由于网络延迟等原因,导致锁过期,此时客户端 B 获取了锁。然后客户端 A 的请求到达 Redis,执行
DEL命令,将客户端 B 的锁删除了。
b. Redlock 算法
为了解决 Redis 单点故障的问题,Redis 官方提出了 Redlock 算法。Redlock 算法的核心思想是:使用多个独立的 Redis 实例,只有当超过半数的实例都成功获取锁时,才认为获取锁成功。
Redlock 算法的步骤如下:
- 客户端尝试从 N 个独立的 Redis 实例获取锁。
- 客户端使用相同的 key 和 value,并设置一个较短的过期时间。
- 客户端计算获取锁的时间,只有当超过半数的实例都成功获取锁,并且获取锁的总时间小于过期时间时,才认为获取锁成功。
- 如果获取锁成功,则延长锁的过期时间(续约)。
- 如果获取锁失败,则释放所有实例上的锁。
import redis
import time
import uuid
class Redlock:
def __init__(self, redis_nodes, lock_name, lock_expiry, retry_count=3, retry_delay=0.2):
self.redis_nodes = redis_nodes
self.lock_name = lock_name
self.lock_expiry = lock_expiry
self.retry_count = retry_count
self.retry_delay = retry_delay
self.lock_value = str(uuid.uuid4())
def _get_redis_connection(self, node):
return redis.Redis(host=node['host'], port=node['port'], db=node.get('db', 0))
def acquire_lock(self):
for attempt in range(self.retry_count):
start_time = time.time()
acquired_count = 0
redis_connections = []
for node in self.redis_nodes:
try:
redis_connection = self._get_redis_connection(node)
redis_connections.append((redis_connection, False)) # Store connection and status (acquired or not)
if redis_connection.set(self.lock_name, self.lock_value, nx=True, ex=self.lock_expiry):
acquired_count += 1
redis_connections[-1] = (redis_connection, True) # Mark as acquired
except Exception as e:
print(f"Error connecting to Redis node: {e}")
# Continue to the next node even if one fails
# Check if majority of nodes acquired the lock
if acquired_count > len(self.redis_nodes) / 2 and (time.time() - start_time) < self.lock_expiry:
return True
# Release locks if not successful
self.release_lock(redis_connections)
# Retry if needed
if attempt < self.retry_count - 1:
time.sleep(self.retry_delay)
return False
def release_lock(self, connections=None):
if connections is None: # Release lock if called outside acquire_lock (e.g., in a finally block)
connections = []
for node in self.redis_nodes:
try:
redis_connection = self._get_redis_connection(node)
connections.append((redis_connection, False))
except:
pass # Ignore connection errors during release
for conn, acquired in connections:
try:
if acquired: # Only delete the key if we acquired the lock on this node
if conn.get(self.lock_name) == self.lock_value.encode():
conn.delete(self.lock_name)
except Exception as e:
print(f"Error releasing lock: {e}")
# 示例用法
if __name__ == '__main__':
redis_nodes = [
{'host': 'localhost', 'port': 6379, 'db': 0},
{'host': 'localhost', 'port': 6380, 'db': 0},
{'host': 'localhost', 'port': 6381, 'db': 0}
]
lock = Redlock(redis_nodes, "my_resource_lock", 10)
if lock.acquire_lock():
print("Acquired lock!")
try:
# 执行需要保护的操作
print("Performing protected operation...")
time.sleep(5) # 模拟操作耗时
finally:
lock.release_lock()
print("Released lock!")
else:
print("Failed to acquire lock.")
优点:
- 提高了可靠性,避免单点故障。
缺点:
- 实现复杂,需要维护多个 Redis 实例。
- 性能略有下降,需要与多个 Redis 实例进行通信。
- Redlock 算法的正确性存在争议,需要谨慎使用。
总结: 基于 Redis 的锁性能高,可以设置过期时间,但需要注意锁的续约和误删问题。Redlock 算法提高了可靠性,但实现复杂,正确性存在争议。
3. 基于 ZooKeeper 的锁
ZooKeeper 是一个分布式协调服务,提供了一种可靠的分布式锁实现方式。
a. 临时顺序节点
ZooKeeper 的锁机制基于临时顺序节点来实现。
- 客户端在 ZooKeeper 上创建一个临时顺序节点,例如
/locks/my_resource_lock/lock-0000000001。 - 客户端获取
/locks/my_resource_lock目录下所有子节点。 - 客户端判断自己创建的节点是否是序号最小的节点。如果是,则获取锁成功;否则,监听比自己序号小的那个节点的变化。
- 如果监听的节点被删除,则重复步骤 2 和 3。
- 客户端释放锁时,删除自己创建的节点。
import kazoo.client
import kazoo.exceptions
import time
import uuid
class ZookeeperLock:
def __init__(self, zk_hosts, lock_path):
self.zk_hosts = zk_hosts
self.lock_path = lock_path
self.client = kazoo.client.KazooClient(hosts=self.zk_hosts)
self.lock_node = None
self.lock_value = str(uuid.uuid4()).encode('utf-8')
def connect(self):
try:
self.client.start()
self.client.ensure_path(self.lock_path) # Ensure the lock path exists
except Exception as e:
print(f"Error connecting to ZooKeeper: {e}")
if self.client.state != 'LOST':
self.client.stop()
raise
def acquire_lock(self, timeout=10):
self.connect()
try:
# Create an ephemeral sequential node
self.lock_node = self.client.create(self.lock_path + "/lock-", ephemeral=True, sequence=True, value = self.lock_value)
while True:
children = sorted(self.client.get_children(self.lock_path))
my_node_name = self.lock_node.split('/')[-1]
if my_node_name == children[0]:
# We are the first node, so we have the lock
return True
# Find the node before us
my_index = children.index(my_node_name)
preceding_node = children[my_index - 1]
preceding_node_path = self.lock_path + "/" + preceding_node
# Check if the preceding node still exists (important for handling session expiration)
if not self.client.exists(preceding_node_path):
# The preceding node is gone, so retry from the beginning of the loop
continue
# Wait for the preceding node to be deleted
event = self.client.exists(preceding_node_path, watch=lambda event: True)
if event is None: # Handle potential race condition: node doesn't exist by now
continue
# Wait with a timeout
if event.wait(timeout): # Wait returns True if the event fired
continue #Try again from the beginning
else:
# Timeout occurred, release the lock node and return False
self.release_lock()
return False
except Exception as e:
print(f"Error acquiring lock: {e}")
self.release_lock()
return False
def release_lock(self):
try:
if self.lock_node and self.client.exists(self.lock_node):
# Double check the node's content before deleting
data, stat = self.client.get(self.lock_node)
if data == self.lock_value:
self.client.delete(self.lock_node)
except Exception as e:
print(f"Error releasing lock: {e}")
finally:
if self.client.state != 'LOST' and self.client.state != 'SUSPENDED' : # Avoid reconnecting if zk connection is lost
self.client.stop() # Close the connection after use
self.lock_node = None
# 示例用法
if __name__ == '__main__':
lock = ZookeeperLock("localhost:2181", "/my_resource_lock")
if lock.acquire_lock():
print("Acquired lock!")
try:
# 执行需要保护的操作
print("Performing protected operation...")
time.sleep(5) # 模拟操作耗时
finally:
lock.release_lock()
print("Released lock!")
else:
print("Failed to acquire lock.")
优点:
- 可靠性高,ZooKeeper 本身具有高可用性。
- 公平锁,按照请求的先后顺序获取锁。
- 自动释放锁,如果客户端崩溃,ZooKeeper 会自动删除临时节点,释放锁。
缺点:
- 性能相对较低,每次获取和释放锁都需要与 ZooKeeper 进行通信。
- 实现复杂,需要理解 ZooKeeper 的原理。
总结: 基于 ZooKeeper 的锁可靠性高,具有公平性和自动释放锁的特性,但性能相对较低,实现复杂。
三种实现方式的对比
为了更清晰地了解这三种分布式锁实现方式的优缺点,我们用表格进行对比:
| 特性 | 数据库锁 | Redis 锁 | ZooKeeper 锁 |
|---|---|---|---|
| 性能 | 低 | 高 | 中 |
| 可靠性 | 较低 | 中(Redlock 较高) | 高 |
| 实现难度 | 简单 | 中 | 复杂 |
| 超时机制 | 依赖数据库事务,需要手动处理 | 支持 | 支持(临时节点) |
| 死锁 | 容易产生死锁,需要谨慎设计 | 需要手动处理锁续约 | 自动释放锁 |
| 公平性 | 非公平锁 | 非公平锁 | 公平锁 |
| 适用场景 | 低并发,对性能要求不高的场景 | 高并发,对性能要求高的场景,可以接受一定的风险 | 对可靠性要求高的场景,例如金融系统 |
| 额外依赖 | 数据库 | Redis | ZooKeeper |
如何选择合适的分布式锁?
选择合适的分布式锁,需要根据具体的业务场景进行权衡。
- 如果对性能要求很高,可以接受一定的风险,可以选择 Redis 锁。 但需要注意锁的续约和误删问题。
- 如果对可靠性要求很高,可以选择 ZooKeeper 锁。 但需要考虑 ZooKeeper 的性能瓶颈。
- 如果业务场景简单,并发量不高,可以选择数据库锁。 但需要注意死锁问题。
最佳实践
- 设置合理的过期时间: 避免死锁,但也不能设置得太短,否则可能导致锁被提前释放。
- 使用 UUID 作为锁的值: 防止误删锁。
- 实现锁的续约机制: 避免业务逻辑执行时间超过过期时间,导致锁被其他客户端获取。
- 处理异常情况: 在获取锁和释放锁的过程中,可能会出现各种异常,需要进行处理,避免死锁。
- 监控锁的运行状态: 及时发现和解决问题。
- 考虑锁的重入性: 对于需要重入的场景,例如一个方法内部调用另一个需要锁的方法,需要实现可重入锁,例如通过 Redis 的 hash 结构来记录锁的持有次数。
关于分布式锁的一些思考
分布式锁是一个复杂的问题,没有银弹。在选择分布式锁的实现方式时,需要根据具体的业务场景进行权衡。除了上述三种实现方式,还有一些其他的分布式锁实现方式,例如基于 etcd 的锁,基于 Consul 的锁等。
在设计分布式系统时,应该尽量避免使用分布式锁,例如可以通过数据分片、最终一致性等方式来解决并发问题。如果必须使用分布式锁,则需要仔细评估其对系统性能和可靠性的影响。
资源访问控制的重点总结
分布式锁是解决分布式环境下资源访问互斥性的重要手段。根据不同的场景和需求,可以选择基于数据库、Redis 或 ZooKeeper 的锁实现。理解各种锁的优缺点,并结合实际情况进行选择,才能构建稳定可靠的分布式系统。
更多IT精英技术系列讲座,到智猿学院