Python中的分布式锁机制:保证资源访问的互斥性与一致性

Python 分布式锁机制:保证资源访问的互斥性与一致性

大家好,今天我们来聊聊 Python 中的分布式锁。在单机环境中,锁机制相对简单,比如可以使用 Python 的 threading.Lock 来保证多线程访问共享资源的互斥性。但是,当应用扩展到分布式环境,多个服务实例同时运行,单机锁就失效了。我们需要一种能够在多个实例之间协调,保证资源访问互斥性的机制,这就是分布式锁。

为什么需要分布式锁?

想象一个场景:多个用户同时购买同一件商品,库存只有一个。如果没有锁机制,多个服务实例可能同时读取到剩余库存为 1,然后都进行扣减操作,导致超卖。分布式锁就是为了解决这类并发问题,保证数据的一致性。

更具体地说,分布式锁主要解决以下问题:

  • 互斥性 (Mutual Exclusion): 保证在任何时刻,只有一个客户端可以获得锁。
  • 容错性 (Fault Tolerance): 即使持有锁的客户端崩溃,锁也应该能够自动释放,避免死锁。
  • 高可用性 (High Availability): 锁服务本身应该是高可用的,避免单点故障。

分布式锁的实现方式

常见的分布式锁实现方式包括:

  1. 基于数据库的锁
  2. 基于 Redis 的锁
  3. 基于 ZooKeeper 的锁

我们来逐一分析。

1. 基于数据库的锁

这种方式是最简单的,利用数据库的唯一索引或者悲观锁来实现。

a. 基于唯一索引

创建一个表,包含一个唯一索引的字段,尝试插入一条数据,如果插入成功,则获取锁;如果插入失败,则说明锁已被其他客户端持有。

import pymysql

class DatabaseLock:
    def __init__(self, host, user, password, database, lock_name):
        self.host = host
        self.user = user
        self.password = password
        self.database = database
        self.lock_name = lock_name
        self.connection = None

    def connect(self):
        self.connection = pymysql.connect(
            host=self.host,
            user=self.user,
            password=self.password,
            database=self.database,
            cursorclass=pymysql.cursors.DictCursor
        )

    def acquire_lock(self):
        self.connect()
        try:
            with self.connection.cursor() as cursor:
                sql = "INSERT INTO locks (lock_name) VALUES (%s)"
                cursor.execute(sql, (self.lock_name,))
                self.connection.commit()
                return True
        except pymysql.err.IntegrityError:
            self.connection.rollback()  # Rollback to avoid auto-commit
            return False
        finally:
            if self.connection:
                self.connection.close()

    def release_lock(self):
        self.connect()
        try:
            with self.connection.cursor() as cursor:
                sql = "DELETE FROM locks WHERE lock_name = %s"
                cursor.execute(sql, (self.lock_name,))
                self.connection.commit()
                return True
        except Exception as e:
            print(f"Error releasing lock: {e}")
            self.connection.rollback()
            return False
        finally:
            if self.connection:
                self.connection.close()

# 示例用法
if __name__ == '__main__':
    lock = DatabaseLock("localhost", "user", "password", "database", "my_resource_lock")

    if lock.acquire_lock():
        print("Acquired lock!")
        try:
            # 执行需要保护的操作
            print("Performing protected operation...")
        finally:
            lock.release_lock()
            print("Released lock!")
    else:
        print("Failed to acquire lock.")

#创建 locks 表的 SQL 语句
# CREATE TABLE locks (
#    lock_name VARCHAR(255) NOT NULL UNIQUE
# );

优点:

  • 简单易懂,容易实现。

缺点:

  • 存在单点故障的风险。
  • 性能较低,每次获取和释放锁都需要进行数据库操作。
  • 没有超时机制,如果持有锁的客户端崩溃,可能会导致死锁。

b. 基于悲观锁

使用 SELECT ... FOR UPDATE 语句,锁定一行数据,直到事务结束才释放锁。

import pymysql

class PessimisticLock:
    def __init__(self, host, user, password, database, resource_id):
        self.host = host
        self.user = user
        self.password = password
        self.database = database
        self.resource_id = resource_id
        self.connection = None

    def connect(self):
        self.connection = pymysql.connect(
            host=self.host,
            user=self.user,
            password=self.password,
            database=self.database,
            autocommit=False,  # Important: Disable autocommit
            cursorclass=pymysql.cursors.DictCursor
        )

    def acquire_lock(self, timeout=10):
        self.connect()
        try:
            with self.connection.cursor() as cursor:
                sql = "SELECT * FROM resources WHERE id = %s FOR UPDATE"
                cursor.execute(sql, (self.resource_id,))
                result = cursor.fetchone()

                if result:
                    # Lock acquired
                    self.connection.commit() # Explicitly commit the SELECT FOR UPDATE
                    return True
                else:
                    self.connection.rollback() # Rollback if resource doesn't exist
                    return False

        except Exception as e:
            print(f"Error acquiring lock: {e}")
            self.connection.rollback()
            return False

    def release_lock(self):
        if not self.connection:  # Check if there's an active connection
            return  # Or raise an exception, depending on your needs
        try:
             self.connection.commit()  #Commit the transaction releases the lock
        except Exception as e:
            print(f"Error releasing lock: {e}")
            self.connection.rollback()
        finally:
            if self.connection:
                self.connection.close()

# 示例用法
if __name__ == '__main__':
    lock = PessimisticLock("localhost", "user", "password", "database", 1)  # Assuming resource ID is 1

    if lock.acquire_lock():
        print("Acquired lock!")
        try:
            # 执行需要保护的操作
            print("Performing protected operation...")
            # Example: Update the resource
            # with lock.connection.cursor() as cursor:
            #     sql = "UPDATE resources SET data = 'updated' WHERE id = %s"
            #     cursor.execute(sql, (lock.resource_id,))
            #     lock.connection.commit()
        finally:
            lock.release_lock()
            print("Released lock!")
    else:
        print("Failed to acquire lock.")

#创建 resources 表的 SQL 语句 (简单示例)
# CREATE TABLE resources (
#     id INT PRIMARY KEY,
#     data VARCHAR(255)
# );

优点:

  • 实现简单。
  • 数据库本身提供了一定的可靠性保证。

缺点:

  • 性能较低,每次获取和释放锁都需要进行数据库操作。
  • 容易造成死锁,需要谨慎设计。
  • 锁的持有时间依赖于事务的执行时间,如果事务执行时间过长,会影响并发性能。

总结: 基于数据库的锁实现简单,但性能和可靠性方面存在一些问题,不适合高并发的场景。

2. 基于 Redis 的锁

Redis 是一个高性能的键值存储系统,适合用来实现分布式锁。

a. SETNX (Set If Not Exists)

SETNX key value 命令,如果 key 不存在,则设置 key 的值为 value,并返回 1;如果 key 存在,则不进行任何操作,并返回 0。

import redis
import time
import uuid

class RedisLock:
    def __init__(self, host, port, db, lock_name, expire_time=10):
        self.redis = redis.Redis(host=host, port=port, db=db)
        self.lock_name = lock_name
        self.expire_time = expire_time
        self.lock_value = str(uuid.uuid4())  # 使用UUID作为锁的值,防止误解锁

    def acquire_lock(self, timeout=10):
        end = time.time() + timeout
        while time.time() < end:
            if self.redis.setnx(self.lock_name, self.lock_value):
                self.redis.expire(self.lock_name, self.expire_time)  # 设置过期时间
                return True
            time.sleep(0.01)  # 短暂休眠,避免CPU空转
        return False

    def release_lock(self):
        try:
            if self.redis.get(self.lock_name) == self.lock_value.encode(): # 验证锁的持有者
                self.redis.delete(self.lock_name)
                return True
            else:
                return False # 如果锁不是当前客户端持有的,则不删除,防止误删
        except Exception as e:
            print(f"Error releasing lock: {e}")
            return False

# 示例用法
if __name__ == '__main__':
    lock = RedisLock("localhost", 6379, 0, "my_resource_lock", expire_time=5)

    if lock.acquire_lock():
        print("Acquired lock!")
        try:
            # 执行需要保护的操作
            print("Performing protected operation...")
            time.sleep(2)  # 模拟操作耗时
        finally:
            lock.release_lock()
            print("Released lock!")
    else:
        print("Failed to acquire lock.")

优点:

  • 性能高,Redis 的读写速度非常快。
  • 可以设置过期时间,避免死锁。

缺点:

  • 如果客户端在设置过期时间之前崩溃,仍然可能导致死锁(虽然概率较低)。
  • 需要手动处理锁的续约问题,如果业务逻辑执行时间超过过期时间,锁可能会被其他客户端获取。
  • 可能存在锁误删的问题:如果客户端 A 获取锁后,由于网络延迟等原因,导致锁过期,此时客户端 B 获取了锁。然后客户端 A 的请求到达 Redis,执行 DEL 命令,将客户端 B 的锁删除了。

b. Redlock 算法

为了解决 Redis 单点故障的问题,Redis 官方提出了 Redlock 算法。Redlock 算法的核心思想是:使用多个独立的 Redis 实例,只有当超过半数的实例都成功获取锁时,才认为获取锁成功。

Redlock 算法的步骤如下:

  1. 客户端尝试从 N 个独立的 Redis 实例获取锁。
  2. 客户端使用相同的 key 和 value,并设置一个较短的过期时间。
  3. 客户端计算获取锁的时间,只有当超过半数的实例都成功获取锁,并且获取锁的总时间小于过期时间时,才认为获取锁成功。
  4. 如果获取锁成功,则延长锁的过期时间(续约)。
  5. 如果获取锁失败,则释放所有实例上的锁。
import redis
import time
import uuid

class Redlock:
    def __init__(self, redis_nodes, lock_name, lock_expiry, retry_count=3, retry_delay=0.2):
        self.redis_nodes = redis_nodes
        self.lock_name = lock_name
        self.lock_expiry = lock_expiry
        self.retry_count = retry_count
        self.retry_delay = retry_delay
        self.lock_value = str(uuid.uuid4())

    def _get_redis_connection(self, node):
        return redis.Redis(host=node['host'], port=node['port'], db=node.get('db', 0))

    def acquire_lock(self):
        for attempt in range(self.retry_count):
            start_time = time.time()
            acquired_count = 0
            redis_connections = []
            for node in self.redis_nodes:
                try:
                    redis_connection = self._get_redis_connection(node)
                    redis_connections.append((redis_connection, False)) # Store connection and status (acquired or not)

                    if redis_connection.set(self.lock_name, self.lock_value, nx=True, ex=self.lock_expiry):
                        acquired_count += 1
                        redis_connections[-1] = (redis_connection, True)  # Mark as acquired
                except Exception as e:
                    print(f"Error connecting to Redis node: {e}")
                    # Continue to the next node even if one fails

            # Check if majority of nodes acquired the lock
            if acquired_count > len(self.redis_nodes) / 2 and (time.time() - start_time) < self.lock_expiry:
                return True

            # Release locks if not successful
            self.release_lock(redis_connections)

            # Retry if needed
            if attempt < self.retry_count - 1:
                time.sleep(self.retry_delay)
        return False

    def release_lock(self, connections=None):
        if connections is None: # Release lock if called outside acquire_lock (e.g., in a finally block)
            connections = []
            for node in self.redis_nodes:
                try:
                    redis_connection = self._get_redis_connection(node)
                    connections.append((redis_connection, False))
                except:
                    pass # Ignore connection errors during release

        for conn, acquired in connections:
            try:
                if acquired: # Only delete the key if we acquired the lock on this node
                    if conn.get(self.lock_name) == self.lock_value.encode():
                        conn.delete(self.lock_name)

            except Exception as e:
                print(f"Error releasing lock: {e}")

# 示例用法
if __name__ == '__main__':
    redis_nodes = [
        {'host': 'localhost', 'port': 6379, 'db': 0},
        {'host': 'localhost', 'port': 6380, 'db': 0},
        {'host': 'localhost', 'port': 6381, 'db': 0}
    ]
    lock = Redlock(redis_nodes, "my_resource_lock", 10)

    if lock.acquire_lock():
        print("Acquired lock!")
        try:
            # 执行需要保护的操作
            print("Performing protected operation...")
            time.sleep(5)  # 模拟操作耗时
        finally:
            lock.release_lock()
            print("Released lock!")
    else:
        print("Failed to acquire lock.")

优点:

  • 提高了可靠性,避免单点故障。

缺点:

  • 实现复杂,需要维护多个 Redis 实例。
  • 性能略有下降,需要与多个 Redis 实例进行通信。
  • Redlock 算法的正确性存在争议,需要谨慎使用。

总结: 基于 Redis 的锁性能高,可以设置过期时间,但需要注意锁的续约和误删问题。Redlock 算法提高了可靠性,但实现复杂,正确性存在争议。

3. 基于 ZooKeeper 的锁

ZooKeeper 是一个分布式协调服务,提供了一种可靠的分布式锁实现方式。

a. 临时顺序节点

ZooKeeper 的锁机制基于临时顺序节点来实现。

  1. 客户端在 ZooKeeper 上创建一个临时顺序节点,例如 /locks/my_resource_lock/lock-0000000001
  2. 客户端获取 /locks/my_resource_lock 目录下所有子节点。
  3. 客户端判断自己创建的节点是否是序号最小的节点。如果是,则获取锁成功;否则,监听比自己序号小的那个节点的变化。
  4. 如果监听的节点被删除,则重复步骤 2 和 3。
  5. 客户端释放锁时,删除自己创建的节点。
import kazoo.client
import kazoo.exceptions
import time
import uuid

class ZookeeperLock:
    def __init__(self, zk_hosts, lock_path):
        self.zk_hosts = zk_hosts
        self.lock_path = lock_path
        self.client = kazoo.client.KazooClient(hosts=self.zk_hosts)
        self.lock_node = None
        self.lock_value = str(uuid.uuid4()).encode('utf-8')

    def connect(self):
        try:
            self.client.start()
            self.client.ensure_path(self.lock_path)  # Ensure the lock path exists
        except Exception as e:
            print(f"Error connecting to ZooKeeper: {e}")
            if self.client.state != 'LOST':
                self.client.stop()
            raise

    def acquire_lock(self, timeout=10):
        self.connect()
        try:
            # Create an ephemeral sequential node
            self.lock_node = self.client.create(self.lock_path + "/lock-", ephemeral=True, sequence=True, value = self.lock_value)
            while True:
                children = sorted(self.client.get_children(self.lock_path))
                my_node_name = self.lock_node.split('/')[-1]

                if my_node_name == children[0]:
                    # We are the first node, so we have the lock
                    return True

                # Find the node before us
                my_index = children.index(my_node_name)
                preceding_node = children[my_index - 1]
                preceding_node_path = self.lock_path + "/" + preceding_node

                # Check if the preceding node still exists (important for handling session expiration)
                if not self.client.exists(preceding_node_path):
                    # The preceding node is gone, so retry from the beginning of the loop
                    continue

                # Wait for the preceding node to be deleted
                event = self.client.exists(preceding_node_path, watch=lambda event: True)

                if event is None: # Handle potential race condition: node doesn't exist by now
                    continue

                # Wait with a timeout
                if event.wait(timeout): # Wait returns True if the event fired
                    continue  #Try again from the beginning
                else:
                    # Timeout occurred, release the lock node and return False
                    self.release_lock()
                    return False

        except Exception as e:
            print(f"Error acquiring lock: {e}")
            self.release_lock()
            return False

    def release_lock(self):
        try:
            if self.lock_node and self.client.exists(self.lock_node):
                # Double check the node's content before deleting
                data, stat = self.client.get(self.lock_node)
                if data == self.lock_value:
                   self.client.delete(self.lock_node)
        except Exception as e:
            print(f"Error releasing lock: {e}")
        finally:
            if self.client.state != 'LOST' and self.client.state != 'SUSPENDED' : # Avoid reconnecting if zk connection is lost
                self.client.stop() # Close the connection after use
            self.lock_node = None

# 示例用法
if __name__ == '__main__':
    lock = ZookeeperLock("localhost:2181", "/my_resource_lock")

    if lock.acquire_lock():
        print("Acquired lock!")
        try:
            # 执行需要保护的操作
            print("Performing protected operation...")
            time.sleep(5)  # 模拟操作耗时
        finally:
            lock.release_lock()
            print("Released lock!")
    else:
        print("Failed to acquire lock.")

优点:

  • 可靠性高,ZooKeeper 本身具有高可用性。
  • 公平锁,按照请求的先后顺序获取锁。
  • 自动释放锁,如果客户端崩溃,ZooKeeper 会自动删除临时节点,释放锁。

缺点:

  • 性能相对较低,每次获取和释放锁都需要与 ZooKeeper 进行通信。
  • 实现复杂,需要理解 ZooKeeper 的原理。

总结: 基于 ZooKeeper 的锁可靠性高,具有公平性和自动释放锁的特性,但性能相对较低,实现复杂。

三种实现方式的对比

为了更清晰地了解这三种分布式锁实现方式的优缺点,我们用表格进行对比:

特性 数据库锁 Redis 锁 ZooKeeper 锁
性能
可靠性 较低 中(Redlock 较高)
实现难度 简单 复杂
超时机制 依赖数据库事务,需要手动处理 支持 支持(临时节点)
死锁 容易产生死锁,需要谨慎设计 需要手动处理锁续约 自动释放锁
公平性 非公平锁 非公平锁 公平锁
适用场景 低并发,对性能要求不高的场景 高并发,对性能要求高的场景,可以接受一定的风险 对可靠性要求高的场景,例如金融系统
额外依赖 数据库 Redis ZooKeeper

如何选择合适的分布式锁?

选择合适的分布式锁,需要根据具体的业务场景进行权衡。

  • 如果对性能要求很高,可以接受一定的风险,可以选择 Redis 锁。 但需要注意锁的续约和误删问题。
  • 如果对可靠性要求很高,可以选择 ZooKeeper 锁。 但需要考虑 ZooKeeper 的性能瓶颈。
  • 如果业务场景简单,并发量不高,可以选择数据库锁。 但需要注意死锁问题。

最佳实践

  1. 设置合理的过期时间: 避免死锁,但也不能设置得太短,否则可能导致锁被提前释放。
  2. 使用 UUID 作为锁的值: 防止误删锁。
  3. 实现锁的续约机制: 避免业务逻辑执行时间超过过期时间,导致锁被其他客户端获取。
  4. 处理异常情况: 在获取锁和释放锁的过程中,可能会出现各种异常,需要进行处理,避免死锁。
  5. 监控锁的运行状态: 及时发现和解决问题。
  6. 考虑锁的重入性: 对于需要重入的场景,例如一个方法内部调用另一个需要锁的方法,需要实现可重入锁,例如通过 Redis 的 hash 结构来记录锁的持有次数。

关于分布式锁的一些思考

分布式锁是一个复杂的问题,没有银弹。在选择分布式锁的实现方式时,需要根据具体的业务场景进行权衡。除了上述三种实现方式,还有一些其他的分布式锁实现方式,例如基于 etcd 的锁,基于 Consul 的锁等。

在设计分布式系统时,应该尽量避免使用分布式锁,例如可以通过数据分片、最终一致性等方式来解决并发问题。如果必须使用分布式锁,则需要仔细评估其对系统性能和可靠性的影响。

资源访问控制的重点总结

分布式锁是解决分布式环境下资源访问互斥性的重要手段。根据不同的场景和需求,可以选择基于数据库、Redis 或 ZooKeeper 的锁实现。理解各种锁的优缺点,并结合实际情况进行选择,才能构建稳定可靠的分布式系统。

更多IT精英技术系列讲座,到智猿学院

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注