Python的分布式锁实现:基于Redis或ZooKeeper的容错与一致性保障

Python 分布式锁实现:基于 Redis 或 ZooKeeper 的容错与一致性保障

各位朋友,大家好。今天我们来聊聊 Python 分布式锁的实现,重点关注如何利用 Redis 和 ZooKeeper 这两个强大的工具,构建具备容错性和一致性保障的分布式锁。

什么是分布式锁? 为什么要用它?

在单体应用中,我们通常使用编程语言自带的锁机制(例如 Python 的 threading.Lock)来保证对共享资源的互斥访问。但在分布式系统中,多个独立的进程运行在不同的机器上,这些进程都需要访问同一个共享资源,传统的锁机制就无法满足需求了。

这时候,就需要分布式锁。 简单来说,分布式锁是一种控制分布式系统之间互斥访问共享资源的机制。 它的作用是:

  • 互斥性(Mutual Exclusion): 在任何时刻,只有一个客户端能够获得锁。
  • 避免死锁(Deadlock Avoidance): 即使持有锁的客户端崩溃,锁也能自动释放,避免其他客户端永远无法获得锁。
  • 容错性(Fault Tolerance): 锁服务本身需要具备高可用性,即使部分节点发生故障,锁服务依然能够正常工作。

实现分布式锁的常见方案

实现分布式锁的方案有很多,比较常见的有:

  • 基于数据库: 利用数据库的唯一索引或悲观锁来实现。 这种方式简单易懂,但性能相对较差。
  • 基于 Redis: 利用 Redis 的 SETNX 命令(set if not exists)和过期时间来实现。 性能高,实现简单,但需要注意一些细节,以避免锁的误释放和死锁。
  • 基于 ZooKeeper: 利用 ZooKeeper 的临时节点和 Watcher 机制来实现。 提供更强的一致性保证,但也更复杂。

今天,我们重点讨论基于 Redis 和 ZooKeeper 的分布式锁实现。

基于 Redis 的分布式锁

Redis 是一种高性能的键值存储系统,非常适合用于实现分布式锁。

1. 基本实现:SETNX 和过期时间

最简单的 Redis 分布式锁实现,依赖两个关键特性:

  • SETNX key value (Set If Not Exists): 如果 key 不存在,则将其设置为 value,并返回 1;如果 key 已经存在,则不进行任何操作,并返回 0。
  • 过期时间 (Expiration): 为 key 设置一个过期时间,即使客户端崩溃,锁也能在一段时间后自动释放。

Python 代码示例:

import redis
import time
import uuid

class RedisLock:
    def __init__(self, redis_client, lock_name, lock_timeout=10):
        self.redis_client = redis_client
        self.lock_name = lock_name
        self.lock_key = f"lock:{lock_name}" # 使用前缀,避免key冲突
        self.lock_timeout = lock_timeout # 锁的过期时间,单位:秒
        self.lock_value = str(uuid.uuid4()) # 唯一标识,用于解锁

    def acquire(self):
        """尝试获取锁"""
        end = time.time() + self.lock_timeout
        while time.time() < end:
            if self.redis_client.setnx(self.lock_key, self.lock_value):
                self.redis_client.expire(self.lock_key, self.lock_timeout)
                return True
            time.sleep(0.01) # 短暂休眠,避免CPU空转
        return False

    def release(self):
        """释放锁"""
        try:
            if self.redis_client.get(self.lock_key) == self.lock_value.encode(): # 确认是自己加的锁
                self.redis_client.delete(self.lock_key)
                return True
            else:
                return False # 不是自己加的锁,不能释放
        except Exception as e:
            print(f"解锁失败: {e}")
            return False

使用示例:

redis_client = redis.Redis(host='localhost', port=6379, db=0)
lock = RedisLock(redis_client, "my_resource", lock_timeout=5)

if lock.acquire():
    try:
        print("成功获取锁,执行受保护的操作...")
        time.sleep(2) # 模拟执行耗时操作
    finally:
        if lock.release():
            print("成功释放锁")
        else:
            print("释放锁失败")
else:
    print("获取锁失败")

解释:

  • RedisLock 类封装了锁的获取和释放逻辑。
  • acquire() 方法尝试获取锁,如果获取成功,则立即返回 True。如果获取失败,则循环等待,直到超时。
  • release() 方法释放锁,只有持有锁的客户端才能释放锁。
  • 使用 UUID 作为锁的值,避免不同客户端误释放锁。
  • lock_key 使用前缀,避免与其他 Redis key冲突。

2. 优化:Lua 脚本实现原子性操作

上面的实现存在一个潜在的问题:SETNXEXPIRE 不是原子操作。如果在 SETNX 成功后,EXPIRE 执行之前,客户端崩溃了,那么这个锁将永远不会被释放,导致死锁。

为了解决这个问题,可以使用 Redis 的 Lua 脚本,将 SETNXEXPIRE 合并成一个原子操作。

修改后的 RedisLock 类:

import redis
import time
import uuid

class RedisLock:
    def __init__(self, redis_client, lock_name, lock_timeout=10):
        self.redis_client = redis_client
        self.lock_name = lock_name
        self.lock_key = f"lock:{lock_name}"
        self.lock_timeout = lock_timeout
        self.lock_value = str(uuid.uuid4())

        # Lua 脚本,用于原子性地设置 key 和过期时间
        self.lock_script = """
            if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then
                redis.call('pexpire', KEYS[1], ARGV[2])
                return 1
            else
                return 0
            end
        """
        self.unlock_script = """
            if redis.call('get', KEYS[1]) == ARGV[1] then
                return redis.call('del', KEYS[1])
            else
                return 0
            end
        """
        self.lock_script_compiled = self.redis_client.register_script(self.lock_script)
        self.unlock_script_compiled = self.redis_client.register_script(self.unlock_script)

    def acquire(self):
        """尝试获取锁"""
        end = time.time() + self.lock_timeout
        while time.time() < end:
            if self.lock_script_compiled(keys=[self.lock_key], args=[self.lock_value, self.lock_timeout * 1000]): #过期时间单位: 毫秒
                return True
            time.sleep(0.01)
        return False

    def release(self):
        """释放锁"""
        try:
             result = self.unlock_script_compiled(keys=[self.lock_key], args=[self.lock_value])
             return result == 1
        except Exception as e:
            print(f"解锁失败: {e}")
            return False

解释:

  • lock_scriptunlock_script 分别是获取锁和释放锁的 Lua 脚本。
  • redis_client.register_script() 方法将 Lua 脚本注册到 Redis 服务器,并将返回的对象保存到 lock_script_compiledunlock_script_compiled。这样可以避免每次都将脚本发送到服务器,提高性能。
  • acquire() 方法中,调用 lock_script_compiled() 方法执行 Lua 脚本,实现原子性操作。 过期时间单位需要是毫秒,因此需要乘以1000.
  • release() 方法中,调用 unlock_script_compiled() 方法执行Lua脚本,进行解锁。

3. Redlock 算法 (Redis >= 5.0)

虽然 Lua 脚本可以解决原子性问题,但 Redis 单节点仍然存在单点故障的风险。如果 Redis 主节点崩溃,数据尚未同步到从节点,可能会导致多个客户端同时获得锁。

Redlock 算法是一种更高级的分布式锁算法,它通过在多个独立的 Redis 节点上尝试获取锁,来提高锁的可用性和容错性。

Redlock 算法的核心思想是:

  1. 客户端尝试从 N 个独立的 Redis 节点获取锁。
  2. 客户端使用相同的 key 和 value,以及一个较短的锁过期时间(例如几百毫秒)。
  3. 客户端必须在 majority (N/2 + 1) 个节点上成功获取锁,才能认为获取锁成功。
  4. 如果客户端获取锁失败,则需要释放所有节点上的锁。
  5. 为了避免死锁,客户端需要在锁过期后,才能再次尝试获取锁。

由于Redlock算法较为复杂,涉及多个Redis实例的管理和协调,这里只给出概念性的说明,并提供一个简单的示例代码框架。 实际生产环境中,建议使用成熟的Redlock库,例如redlock-py

# (仅为示例代码框架,未经完整测试)
import redis
import time

class Redlock:
    def __init__(self, redis_nodes, lock_name, lock_timeout=10): # 单位:秒
        self.redis_nodes = redis_nodes # Redis 节点列表,例如:[{'host': '127.0.0.1', 'port': 6379, 'db': 0}, ...]
        self.lock_name = lock_name
        self.lock_key = f"lock:{lock_name}"
        self.lock_timeout = int(lock_timeout * 1000) # 毫秒
        self.quorum = len(redis_nodes) // 2 + 1 # 大多数节点数量
        self.lock_value = str(uuid.uuid4())

    def acquire(self):
        """尝试获取 Redlock"""
        start_time = time.time()
        success_count = 0

        for node in self.redis_nodes:
            try:
                r = redis.Redis(**node)
                if r.set(self.lock_key, self.lock_value, nx=True, px=self.lock_timeout): # 使用 set 命令的 nx 和 px 参数
                    success_count += 1
            except Exception as e:
                print(f"获取锁失败: {e}")

        # 判断是否获取到大多数节点的锁
        if success_count >= self.quorum:
            # 验证锁的有效时间,避免时钟漂移导致的问题
            validity_time = int(self.lock_timeout - (time.time() - start_time) * 1000)
            if validity_time > 0:
                return True
            else:
                self.release_all() # 释放所有节点的锁,因为锁的有效时间不足
                return False
        else:
            self.release_all() # 释放所有节点的锁
            return False

    def release_all(self):
        """释放所有节点的锁"""
        for node in self.redis_nodes:
            try:
                r = redis.Redis(**node)
                script = """
                    if redis.call('get', KEYS[1]) == ARGV[1] then
                        return redis.call('del', KEYS[1])
                    else
                        return 0
                    end
                """
                r.eval(script, 1, self.lock_key, self.lock_value) # 使用 eval 执行 Lua 脚本
            except Exception as e:
                print(f"释放锁失败: {e}")

重要提示:

  • Redlock 算法实现较为复杂,需要仔细考虑时钟漂移、网络延迟等因素。
  • 强烈建议使用经过充分测试的 Redlock 库,例如 redlock-py
  • Redlock 算法并不能保证绝对的一致性,但可以显著提高锁的可用性和容错性。

基于 ZooKeeper 的分布式锁

ZooKeeper 是一个分布式协调服务,提供高可用性、高可靠性的分布式数据一致性解决方案。

1. 基本原理:临时节点和 Watcher

ZooKeeper 分布式锁的基本原理是:

  • 创建临时节点: 客户端尝试在 ZooKeeper 中创建一个临时节点(ephemeral node),例如 /locks/my_resource
  • 节点唯一性: 由于 ZooKeeper 中同一个路径只能存在一个节点,因此只有一个客户端能够成功创建该节点,从而获得锁。
  • 临时性: 如果客户端崩溃,其创建的临时节点会自动删除,从而释放锁。
  • Watcher 机制: 其他客户端可以监听该节点的变化。当节点被删除时,ZooKeeper 会通知这些客户端,这些客户端可以再次尝试获取锁。

2. 实现步骤

  1. 客户端尝试创建一个临时顺序节点,例如 /locks/my_resource/lock-0000000001。 顺序节点保证了创建节点的顺序性。
  2. 客户端获取 /locks/my_resource 目录下所有的子节点。
  3. 客户端判断自己创建的节点是否是序号最小的节点。
    • 如果是,则获得锁。
    • 如果不是,则监听序号比自己小的那个节点的变化。
  4. 当监听的节点被删除时,客户端再次尝试获取锁。

3. Python 代码示例

from kazoo.client import KazooClient
from kazoo.exceptions import NodeExistsError, NoNodeError
import time
import logging

logging.basicConfig(level=logging.INFO)

class ZookeeperLock:
    def __init__(self, hosts, lock_name, lock_path="/locks", timeout=10):
        self.hosts = hosts
        self.lock_name = lock_name
        self.lock_path = f"{lock_path}/{lock_name}"
        self.zk = KazooClient(hosts=self.hosts)
        self.zk.start()
        self.timeout = timeout

        # 确保锁的根路径存在
        try:
            self.zk.create(lock_path, makepath=True) # 递归创建路径
        except NodeExistsError:
            pass

    def acquire(self):
        """尝试获取锁"""
        my_lock_path = None
        try:
            my_lock_path = self.zk.create(self.lock_path + "/lock-", ephemeral=True, sequence=True)
            logging.info(f"创建锁节点: {my_lock_path}")
            while True:
                children = self.zk.get_children(self.lock_path)
                children.sort()

                my_name = my_lock_path.split('/')[-1]
                if children[0] == my_name:
                    logging.info("成功获取锁")
                    return True
                else:
                    # 监听比自己小的节点
                    index = children.index(my_name)
                    preceding_node = self.lock_path + "/" + children[index - 1]
                    logging.info(f"监听节点: {preceding_node}")

                    def watch_callback(event):
                        logging.info(f"节点 {preceding_node} 发生变化: {event}")

                    try:
                        self.zk.get(preceding_node, watch=watch_callback)
                        time.sleep(0.1) # 短暂休眠
                    except NoNodeError: # 前面的节点可能已经消失
                        continue

        except Exception as e:
            logging.error(f"获取锁失败: {e}")
            if my_lock_path:
                try:
                    self.zk.delete(my_lock_path) # 删除自己创建的节点
                except NoNodeError:
                    pass
            return False

    def release(self):
        """释放锁"""
        try:
            children = self.zk.get_children(self.lock_path)
            children.sort()
            my_lock_path = None

            for child in children:
              if child.startswith("lock-"):
                try:
                    self.zk.delete(f"{self.lock_path}/{child}")
                    logging.info(f"释放锁节点: {self.lock_path}/{child}")
                    return True
                except NoNodeError:
                    logging.warning(f"节点 {self.lock_path}/{child} 不存在")
                    continue
            logging.warning("没有找到可以释放的锁节点")
            return False

        except Exception as e:
            logging.error(f"释放锁失败: {e}")
            return False

    def close(self):
        self.zk.stop()
        self.zk.close()

使用示例:

hosts = "127.0.0.1:2181" # ZooKeeper 集群地址
lock = ZookeeperLock(hosts, "my_resource")

if lock.acquire():
    try:
        print("成功获取锁,执行受保护的操作...")
        time.sleep(2)
    finally:
        if lock.release():
            print("成功释放锁")
        else:
            print("释放锁失败")
        lock.close()
else:
    print("获取锁失败")
    lock.close()

解释:

  • ZookeeperLock 类封装了锁的获取和释放逻辑。
  • acquire() 方法尝试创建临时顺序节点,并判断自己是否是序号最小的节点。如果不是,则监听序号比自己小的节点的变化。
  • release() 方法删除自己创建的节点,释放锁。
  • close() 方法关闭 ZooKeeper 连接。

Redis vs ZooKeeper: 如何选择?

特性 Redis ZooKeeper
性能 高,适合高并发场景 相对较低,但满足一般需求
可靠性 单节点存在单点故障风险,Redlock 可以提高 高,基于 Paxos 算法,具有高可用性和一致性
一致性 最终一致性 强一致性
实现复杂度 简单 相对复杂
使用场景 对性能要求高,容忍短暂不一致的场景 对一致性要求高,例如分布式配置管理、服务发现

总结:

  • 如果对性能要求非常高,且能够容忍短暂的不一致,可以选择 Redis。 使用Lua脚本保证原子性,并考虑Redlock算法提高可用性。
  • 如果对一致性要求非常高,且能够接受稍低的性能,可以选择 ZooKeeper。

分布式锁的注意事项

  1. 锁的粒度: 锁的粒度越细,并发性越高,但实现复杂度也越高。 需要根据实际情况选择合适的锁粒度。
  2. 锁的超时时间: 锁的超时时间需要根据业务场景进行设置。 如果超时时间过短,可能会导致锁被误释放;如果超时时间过长,可能会导致死锁。
  3. 重试机制: 获取锁失败后,可以采用重试机制。 重试次数和重试间隔需要根据实际情况进行调整。
  4. 避免长时间持有锁: 长时间持有锁会降低系统的并发性。 尽量缩短持有锁的时间。
  5. 监控和告警: 需要对分布式锁进行监控和告警。 及时发现和解决潜在问题。

一些想法

  • 分布式锁是解决分布式系统并发问题的关键技术。
  • Redis 和 ZooKeeper 是实现分布式锁的常用工具。
  • 选择合适的分布式锁方案需要根据实际场景进行权衡。
  • 需要注意分布式锁的各种细节,以避免潜在问题。

希望今天的分享对大家有所帮助,谢谢!

更多IT精英技术系列讲座,到智猿学院

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注