JAVA 分布式锁持有时间不足?结合 Lua 实现原子续期方案

JAVA 分布式锁持有时间不足?结合 Lua 实现原子续期方案

大家好,今天我们来聊聊分布式锁,特别是当Java应用中使用分布式锁时,可能遇到的锁持有时间不足的问题,以及如何利用Lua脚本实现原子续期方案。

分布式锁的需求与挑战

在分布式系统中,不同的服务实例可能需要访问共享资源。为了保证数据一致性,我们需要一种机制来协调这些服务实例对共享资源的访问,这就是分布式锁。 简单来说,分布式锁就是让多个进程在访问共享资源时,同一时刻只有一个进程可以获得锁并访问资源,其他进程需要等待。

常见方案包括基于数据库、ZooKeeper、Redis等实现。每种方案都有其优缺点,例如:

  • 基于数据库: 实现简单,但性能较差,存在单点故障风险。
  • 基于ZooKeeper: 可靠性高,支持锁的自动释放,但性能相对较低,尤其在高并发场景下。
  • 基于Redis: 性能高,易于实现,但需要考虑锁的自动释放和续期问题。

今天我们重点讨论基于Redis的分布式锁,以及如何解决其锁持有时间不足的问题。

Redis 分布式锁的简单实现

一个简单的 Redis 分布式锁的实现,通常使用 SETNX (SET if Not eXists) 命令来尝试获取锁,使用 DEL 命令来释放锁。

import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;

import java.util.Collections;

public class RedisLock {

    private final Jedis jedis;
    private final String lockKey;
    private final String lockValue;
    private final int expireTimeSeconds;

    public RedisLock(Jedis jedis, String lockKey, String lockValue, int expireTimeSeconds) {
        this.jedis = jedis;
        this.lockKey = lockKey;
        this.lockValue = lockValue;
        this.expireTimeSeconds = expireTimeSeconds;
    }

    public boolean tryLock() {
        SetParams setParams = new SetParams().ex(expireTimeSeconds).nx();
        String result = jedis.set(lockKey, lockValue, setParams);
        return "OK".equals(result);
    }

    public void unlock() {
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(lockValue));
        if ("1".equals(result.toString())) {
            System.out.println("解锁成功");
        } else {
            System.out.println("解锁失败");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis("localhost", 6379); // 替换成你的Redis地址
        String lockKey = "my_lock";
        String lockValue = "unique_value_" + System.currentTimeMillis();
        int expireTimeSeconds = 10;

        RedisLock lock = new RedisLock(jedis, lockKey, lockValue, expireTimeSeconds);

        if (lock.tryLock()) {
            try {
                System.out.println("获取锁成功,开始执行业务逻辑...");
                Thread.sleep(5000); // 模拟业务逻辑执行时间
            } finally {
                lock.unlock();
                System.out.println("释放锁");
            }
        } else {
            System.out.println("获取锁失败,稍后重试");
        }

        jedis.close();
    }
}
  • tryLock(): 尝试获取锁,使用 SETNX 命令设置锁的键值,如果键不存在则设置成功,否则失败。 同时设置锁的过期时间,防止死锁。
  • unlock(): 释放锁,使用Lua脚本确保只有持有锁的线程才能释放锁,防止误删。 脚本原子性地检查锁的值是否与当前线程持有的值相同,如果相同则删除锁。

问题:锁持有时间不足

如果业务逻辑的执行时间超过了锁的过期时间,那么锁会被自动释放,其他线程可能会获取到锁,导致并发问题。 例如,上面的例子中,锁的过期时间是10秒,但业务逻辑执行了5秒,看似没问题,但如果业务逻辑执行时间变成了15秒,就会导致问题。

续期方案:守护线程 + Lua 脚本

为了解决锁持有时间不足的问题,一个常见的方案是使用守护线程 + Lua 脚本来实现锁的自动续期。

原理:

  1. 守护线程: 在获取锁成功后,启动一个守护线程,定期检查锁是否即将过期。
  2. Lua 脚本: 如果锁即将过期,使用 Lua 脚本原子性地延长锁的过期时间。

Lua 脚本的优势:

  • 原子性: Lua 脚本在 Redis 中是原子执行的,可以避免并发问题。
  • 性能: Lua 脚本执行效率高,可以减少对 Redis 的访问次数。

具体实现:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;

import java.util.Collections;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class RedisLockWithRenewal {

    private final Jedis jedis;
    private final String lockKey;
    private final String lockValue;
    private final int expireTimeSeconds;
    private final int renewalIntervalSeconds; // 续期间隔时间,通常小于过期时间
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private final AtomicBoolean isLocked = new AtomicBoolean(false);

    public RedisLockWithRenewal(Jedis jedis, String lockKey, String lockValue, int expireTimeSeconds, int renewalIntervalSeconds) {
        this.jedis = jedis;
        this.lockKey = lockKey;
        this.lockValue = lockValue;
        this.expireTimeSeconds = expireTimeSeconds;
        this.renewalIntervalSeconds = renewalIntervalSeconds;
    }

    public boolean tryLock() {
        SetParams setParams = new SetParams().ex(expireTimeSeconds).nx();
        String result = jedis.set(lockKey, lockValue, setParams);
        if ("OK".equals(result)) {
            isLocked.set(true);
            startRenewalThread();
            return true;
        }
        return false;
    }

    private void startRenewalThread() {
        scheduler.scheduleAtFixedRate(() -> {
            if (isLocked.get()) {
                String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('pexpire', KEYS[1], ARGV[2]) else return 0 end";
                Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.asList(lockValue, String.valueOf(expireTimeSeconds * 1000L)));
                if ("1".equals(result.toString())) {
                    System.out.println("续期成功");
                } else {
                    System.out.println("续期失败,可能锁已丢失");
                    stopRenewalThread(); // 停止续期,防止死循环
                }
            } else {
                stopRenewalThread(); // 如果锁已经释放,停止续期
            }
        }, renewalIntervalSeconds, renewalIntervalSeconds, TimeUnit.SECONDS);
    }

    public void unlock() {
        stopRenewalThread();
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(lockValue));
        if ("1".equals(result.toString())) {
            System.out.println("解锁成功");
        } else {
            System.out.println("解锁失败");
        }
        isLocked.set(false);
    }

    private void stopRenewalThread() {
        scheduler.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis("localhost", 6379); // 替换成你的Redis地址
        String lockKey = "my_lock_renewal";
        String lockValue = "unique_value_" + System.currentTimeMillis();
        int expireTimeSeconds = 10;
        int renewalIntervalSeconds = 5; // 续期间隔,设置为过期时间的一半

        RedisLockWithRenewal lock = new RedisLockWithRenewal(jedis, lockKey, lockValue, expireTimeSeconds, renewalIntervalSeconds);

        if (lock.tryLock()) {
            try {
                System.out.println("获取锁成功,开始执行业务逻辑...");
                Thread.sleep(15000); // 模拟业务逻辑执行时间,超过过期时间
            } finally {
                lock.unlock();
                System.out.println("释放锁");
            }
        } else {
            System.out.println("获取锁失败,稍后重试");
        }

        jedis.close();
    }
}
  • renewalIntervalSeconds: 续期间隔时间,需要小于过期时间,以保证在锁过期之前进行续期。 通常设置为过期时间的一半。
  • startRenewalThread(): 启动续期线程,使用 ScheduledExecutorService 定期执行续期任务。
  • Lua 脚本 (pexpire): 使用 pexpire 命令原子性地延长锁的过期时间,单位是毫秒。
  • stopRenewalThread(): 停止续期线程,避免资源浪费。
  • isLocked: 使用 AtomicBoolean 保证线程安全。

代码解释:

  1. tryLock():尝试获取锁,如果成功,将isLocked设置为 true 并启动续期线程startRenewalThread()
  2. startRenewalThread():使用ScheduledExecutorService以固定的频率(renewalIntervalSeconds)执行续期任务。 在续期任务中,使用Lua脚本检查锁是否存在且值是否匹配,如果匹配,则使用pexpire命令原子性地延长锁的过期时间。 如果续期失败,则停止续期线程。
  3. unlock():释放锁,停止续期线程,并使用Lua脚本原子性地删除锁。
  4. stopRenewalThread():关闭ScheduledExecutorService,停止续期线程。

Lua 脚本详解:

if redis.call('get', KEYS[1]) == ARGV[1] then
  return redis.call('pexpire', KEYS[1], ARGV[2])
else
  return 0
end
  • KEYS[1]: 锁的键名。
  • ARGV[1]: 锁的值。
  • ARGV[2]: 新的过期时间,单位是毫秒。

该脚本首先检查锁的键名是否存在,并且锁的值是否与当前线程持有的值相同。 如果都满足,则使用 pexpire 命令延长锁的过期时间,并返回 1。 否则,返回 0。

注意事项:

  • 续期间隔: renewalIntervalSeconds 的设置需要合理,太短会增加 Redis 的压力,太长则可能导致锁过期。
  • 网络抖动: 在分布式系统中,网络抖动是不可避免的。 如果网络抖动导致续期失败,可能会导致锁过期。因此,需要考虑一定的容错机制,例如,在续期失败后进行重试。
  • Redis 集群: 如果使用 Redis 集群,需要确保 Lua 脚本在同一个节点上执行,可以使用 JedisClusterRedisson 等客户端。
  • 锁的粒度: 锁的粒度需要根据实际业务需求进行选择,粒度太粗可能会影响并发性能,粒度太细则可能增加锁的管理成本。

Redisson:更高级的分布式锁实现

Redisson 是一个基于 Redis 的 Java 驻内存数据网格(In-Memory Data Grid)。它提供了更高级的分布式锁实现,包括可重入锁、公平锁、读写锁等。

Redisson 已经内置了锁的自动续期机制,无需手动编写 Lua 脚本和守护线程。

示例代码:

import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.config.Config;

import java.util.concurrent.TimeUnit;

public class RedissonLockExample {

    public static void main(String[] args) throws InterruptedException {
        // 1. Create config object
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379"); // 替换成你的Redis地址

        // 2. Create Redisson instance
        Redisson redisson = (Redisson) Redisson.create(config);

        // 3. Get the lock
        RLock lock = redisson.getLock("my_redisson_lock");

        try {
            // 4. Try to acquire the lock
            boolean acquired = lock.tryLock(10, 30, TimeUnit.SECONDS); // 尝试获取锁,最多等待10秒,锁自动释放时间30秒
            if (acquired) {
                try {
                    System.out.println("获取锁成功,开始执行业务逻辑...");
                    Thread.sleep(20000); // 模拟业务逻辑执行时间
                } finally {
                    lock.unlock();
                    System.out.println("释放锁");
                }
            } else {
                System.out.println("获取锁失败,稍后重试");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            redisson.shutdown();
        }
    }
}
  • tryLock(long waitTime, long leaseTime, TimeUnit unit): 尝试获取锁,最多等待 waitTime 时间,如果获取成功,则锁的自动释放时间为 leaseTime。 Redisson 会自动进行锁的续期,直到 leaseTime 时间到达。
  • Redisson 的自动续期机制: Redisson 内部使用 Watchdog 线程来监控锁的过期时间,并在锁即将过期时自动进行续期。

Redisson 的优势:

  • 简单易用: Redisson 提供了丰富的 API,简化了分布式锁的使用。
  • 自动续期: 无需手动编写 Lua 脚本和守护线程,Redisson 会自动进行锁的续期。
  • 多种锁类型: Redisson 提供了多种锁类型,包括可重入锁、公平锁、读写锁等,可以满足不同的业务需求。
  • 集群支持: Redisson 完美支持 Redis 集群。

方案对比

特性 简单 Redis 锁 Lua 脚本 + 守护线程 Redisson
实现难度 简单 复杂 简单
自动续期 不支持 支持 支持
锁类型 单一 单一 多种 (可重入锁, 公平锁等)
集群支持 需要手动处理 需要手动处理 完美支持
性能 较高 较高 较高
适用场景 简单场景 复杂场景 各种场景
维护成本

总结:选择适合自己的方案

本文讨论了Java分布式锁可能遇到的锁持有时间不足的问题,并介绍了两种解决方案:基于Lua脚本的原子续期和Redisson。基于Lua脚本的原子续期方案需要手动编写代码,实现较为复杂,但可以更好地控制锁的续期过程。Redisson提供了更高级的分布式锁实现,简化了分布式锁的使用,并内置了锁的自动续期机制,但需要引入额外的依赖。选择哪种方案取决于具体的业务需求和技术栈。如果对锁的控制有较高的要求,可以选择基于Lua脚本的原子续期方案;如果希望快速实现分布式锁,并且对锁的控制要求不高,可以选择Redisson。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注