JAVA 定时任务集群环境重复执行?详解分布式任务锁实现方案

JAVA 定时任务集群环境重复执行?详解分布式任务锁实现方案

各位朋友,大家好!今天我们来聊聊一个在分布式系统中经常遇到的问题:JAVA 定时任务集群环境下的重复执行

在单机环境下,定时任务通常通过 TimerScheduledExecutorService 或者 Spring 的 @Scheduled 注解来实现。这些方案简单易用,但在集群环境下,每个节点都会执行相同的定时任务,导致重复执行,造成数据不一致或者资源浪费。

想象一下,如果你有一个定时任务是每天凌晨 2 点统计前一天的订单数据,并生成报表。在单机环境下,一切运行良好。但是,当你的系统扩展到多个节点后,每个节点都会在凌晨 2 点执行一次统计任务,最终生成多份重复的报表,这显然不是我们想要的。

那么,如何解决这个问题呢?答案就是:分布式任务锁

1. 分布式任务锁的概念

分布式任务锁是一种在分布式系统中用于控制对共享资源的并发访问的机制。它的核心思想是:在执行定时任务之前,先尝试获取锁,只有成功获取锁的节点才能执行任务,其他节点则放弃执行。这样,就保证了在集群环境中只有一个节点执行任务,从而避免了重复执行的问题。

2. 分布式任务锁的实现方案

实现分布式任务锁的方案有很多种,常用的包括:

  • 基于数据库的锁
  • 基于 Redis 的锁
  • 基于 ZooKeeper 的锁

下面我们分别来详细介绍这三种方案,并给出相应的代码示例。

2.1 基于数据库的锁

基于数据库的锁是最简单的一种实现方式。它的原理是:利用数据库的排他锁来实现互斥访问。

实现步骤:

  1. 创建一个锁表,包含以下字段:
    • lock_name:锁的名称,对应于定时任务的名称。
    • owner:锁的持有者,可以是节点 ID 或者 IP 地址。
    • create_time:锁的创建时间。
  2. 在执行定时任务之前,尝试向锁表中插入一条记录,其中 lock_name 为定时任务的名称,owner 为当前节点的标识。
  3. 如果插入成功,则表示获取锁成功,可以执行定时任务。
  4. 如果插入失败,则表示锁已经被其他节点持有,当前节点放弃执行任务。
  5. 任务执行完毕后,删除锁表中对应的记录。

代码示例 (Java + Spring JDBC):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
public class DatabaseLock {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    private final String lockTableName = "distributed_lock";

    /**
     * 获取锁
     * @param lockName 锁的名称
     * @param owner 锁的持有者
     * @return true: 获取锁成功, false: 获取锁失败
     */
    @Transactional
    public boolean acquireLock(String lockName, String owner) {
        try {
            String sql = "INSERT INTO " + lockTableName + " (lock_name, owner, create_time) VALUES (?, ?, NOW())";
            jdbcTemplate.update(sql, lockName, owner);
            return true;
        } catch (DuplicateKeyException e) {
            // 锁已被占用
            return false;
        }
    }

    /**
     * 释放锁
     * @param lockName 锁的名称
     * @param owner 锁的持有者
     */
    public void releaseLock(String lockName, String owner) {
        String sql = "DELETE FROM " + lockTableName + " WHERE lock_name = ? AND owner = ?";
        jdbcTemplate.update(sql, lockName, owner);
    }
}

表结构 (MySQL):

CREATE TABLE `distributed_lock` (
  `lock_name` varchar(255) NOT NULL COMMENT '锁的名称',
  `owner` varchar(255) NOT NULL COMMENT '锁的持有者',
  `create_time` datetime NOT NULL COMMENT '创建时间',
  PRIMARY KEY (`lock_name`),
  UNIQUE KEY `uk_lock_name` (`lock_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='分布式锁表';

使用示例:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class MyTask {

    @Autowired
    private DatabaseLock databaseLock;

    private final String lockName = "my_task_lock";
    private final String owner = "node_1"; // 可以是节点ID或者IP地址

    @Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2点执行
    public void executeTask() {
        if (databaseLock.acquireLock(lockName, owner)) {
            try {
                System.out.println("开始执行定时任务...");
                // 执行定时任务的逻辑
                Thread.sleep(5000); // 模拟任务执行时间
                System.out.println("定时任务执行完毕。");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                databaseLock.releaseLock(lockName, owner);
                System.out.println("释放锁。");
            }
        } else {
            System.out.println("未能获取到锁,放弃执行。");
        }
    }
}

优点:

  • 实现简单,易于理解。
  • 不需要引入额外的依赖。

缺点:

  • 性能较低,每次获取锁都需要进行数据库操作。
  • 存在单点故障的风险,如果数据库宕机,则所有定时任务都无法执行。
  • 需要手动释放锁,如果程序出现异常,可能导致锁无法释放,造成死锁。
  • 依赖数据库事务,性能瓶颈明显。

2.2 基于 Redis 的锁

基于 Redis 的锁是目前比较流行的分布式锁实现方案。它的原理是:利用 Redis 的 SETNX 命令的原子性来实现互斥访问。

实现步骤:

  1. 在执行定时任务之前,尝试使用 SETNX 命令设置一个 key,key 的名称对应于定时任务的名称,value 可以是当前节点的标识。
  2. 如果 SETNX 命令返回 1,则表示获取锁成功,可以执行定时任务。
  3. 如果 SETNX 命令返回 0,则表示锁已经被其他节点持有,当前节点放弃执行任务。
  4. 为了防止死锁,需要设置 key 的过期时间,即使程序出现异常,锁也会在一定时间后自动释放。
  5. 任务执行完毕后,删除 key。

代码示例 (Java + Jedis):

import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;
import org.springframework.stereotype.Component;

import java.util.Collections;

@Component
public class RedisLock {

    private final String LOCK_SUCCESS = "OK";
    private final Long RELEASE_SUCCESS = 1L;

    private final String redisHost = "127.0.0.1"; // Redis 服务器地址
    private final int redisPort = 6379; // Redis 服务器端口
    private final int lockExpireTime = 60; // 锁的过期时间,单位秒

    /**
     * 获取锁
     * @param lockName 锁的名称
     * @param requestId 请求ID,用于标识锁的持有者
     * @return true: 获取锁成功, false: 获取锁失败
     */
    public boolean acquireLock(String lockName, String requestId) {
        try (Jedis jedis = new Jedis(redisHost, redisPort)) {
            SetParams setParams = new SetParams().ex(lockExpireTime).nx();
            String result = jedis.set(lockName, requestId, setParams);
            return LOCK_SUCCESS.equals(result);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 释放锁
     * @param lockName 锁的名称
     * @param requestId 请求ID,用于标识锁的持有者
     * @return true: 释放锁成功, false: 释放锁失败
     */
    public boolean releaseLock(String lockName, String requestId) {
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        try (Jedis jedis = new Jedis(redisHost, redisPort)) {
            Object result = jedis.eval(script, Collections.singletonList(lockName), Collections.singletonList(requestId));
            return RELEASE_SUCCESS.equals(result);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
}

使用示例:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.UUID;

@Component
public class MyTask {

    @Autowired
    private RedisLock redisLock;

    private final String lockName = "my_task_lock";
    private final String requestId = UUID.randomUUID().toString(); // 使用UUID作为请求ID

    @Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2点执行
    public void executeTask() {
        if (redisLock.acquireLock(lockName, requestId)) {
            try {
                System.out.println("开始执行定时任务...");
                // 执行定时任务的逻辑
                Thread.sleep(5000); // 模拟任务执行时间
                System.out.println("定时任务执行完毕。");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                if (redisLock.releaseLock(lockName, requestId)) {
                    System.out.println("释放锁成功。");
                } else {
                    System.out.println("释放锁失败。");
                }
            }
        } else {
            System.out.println("未能获取到锁,放弃执行。");
        }
    }
}

优点:

  • 性能较高,Redis 的读写速度非常快。
  • 可以设置 key 的过期时间,防止死锁。
  • 实现相对简单。

缺点:

  • 需要引入 Redis 依赖。
  • 存在 Redis 单点故障的风险,虽然可以通过 Redis 集群来解决,但是增加了复杂性。
  • 需要手动释放锁,需要使用lua脚本保证原子性,如果程序出现异常,可能导致锁无法释放。

2.3 基于 ZooKeeper 的锁

基于 ZooKeeper 的锁是一种可靠的分布式锁实现方案。它的原理是:利用 ZooKeeper 的临时顺序节点来实现互斥访问。

实现步骤:

  1. 在执行定时任务之前,创建一个临时顺序节点,节点的名称对应于定时任务的名称。
  2. 获取所有子节点,并按照节点名称排序。
  3. 判断当前节点是否是最小的节点。
  4. 如果是最小的节点,则表示获取锁成功,可以执行定时任务。
  5. 如果不是最小的节点,则监听比自己小的那个节点的变化,如果该节点被删除,则重新尝试获取锁。
  6. 任务执行完毕后,删除当前节点。

代码示例 (Java + Curator):

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.stereotype.Component;

@Component
public class ZookeeperLock {

    private final String zookeeperConnectionString = "127.0.0.1:2181"; // ZooKeeper 服务器地址
    private final String lockPath = "/my_task_lock"; // 锁的路径

    private CuratorFramework client;

    public ZookeeperLock() {
        // 创建 CuratorFramework 客户端
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
        client.start();
    }

    /**
     * 获取锁
     * @return true: 获取锁成功, false: 获取锁失败
     */
    public boolean acquireLock() {
        InterProcessMutex lock = new InterProcessMutex(client, lockPath);
        try {
            lock.acquire(); // 阻塞式获取锁
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 释放锁
     */
    public void releaseLock() {
        InterProcessMutex lock = new InterProcessMutex(client, lockPath);
        try {
            if (lock.isAcquiredInThisProcess()) { // 确保当前线程持有锁
                lock.release();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

使用示例:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class MyTask {

    @Autowired
    private ZookeeperLock zookeeperLock;

    @Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2点执行
    public void executeTask() {
        if (zookeeperLock.acquireLock()) {
            try {
                System.out.println("开始执行定时任务...");
                // 执行定时任务的逻辑
                Thread.sleep(5000); // 模拟任务执行时间
                System.out.println("定时任务执行完毕。");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                zookeeperLock.releaseLock();
                System.out.println("释放锁。");
            }
        } else {
            System.out.println("未能获取到锁,放弃执行。");
        }
    }
}

优点:

  • 可靠性高,ZooKeeper 本身就具有高可用性。
  • 可以自动释放锁,即使程序出现异常,锁也会自动释放。
  • 不需要手动释放锁。

缺点:

  • 性能相对较低,ZooKeeper 的读写速度比 Redis 慢。
  • 需要引入 ZooKeeper 依赖。
  • 实现相对复杂。

3. 三种方案的对比

为了更清晰地了解这三种方案的优缺点,我们将其进行对比:

特性 基于数据库的锁 基于 Redis 的锁 基于 ZooKeeper 的锁
性能 较低
可靠性
实现复杂度 简单 复杂
依赖 数据库 Redis ZooKeeper
锁释放方式 手动 手动 (Lua) 自动
是否防死锁 是 (过期时间) 是 (临时节点)

4. 如何选择合适的方案

选择哪种方案取决于你的具体需求。

  • 如果你的系统对性能要求不高,并且不需要很高的可靠性,那么基于数据库的锁是一个不错的选择。 它的优点是实现简单,不需要引入额外的依赖。
  • 如果你的系统对性能要求较高,并且可以容忍一定的风险,那么基于 Redis 的锁是一个更好的选择。 它的优点是性能高,实现相对简单。
  • 如果你的系统对可靠性要求非常高,并且可以接受一定的性能损失,那么基于 ZooKeeper 的锁是最佳选择。 它的优点是可靠性高,可以自动释放锁。

5. 分布式任务锁的其他考虑因素

除了上面介绍的三种方案之外,还有一些其他的因素需要考虑:

  • 锁的粒度: 锁的粒度越细,并发性越高,但是实现也越复杂。需要根据实际情况选择合适的锁粒度。
  • 锁的超时时间: 锁的超时时间需要根据任务的执行时间来设置,如果超时时间设置过短,可能导致锁被提前释放,造成并发问题。
  • 重试机制: 如果获取锁失败,可以尝试重试,但是需要设置重试次数和重试间隔,防止无限重试。
  • 监控和告警: 需要对分布式锁进行监控和告警,及时发现和解决问题。

6. 分布式锁的增强策略

在实际使用中,为了保证分布式锁的可靠性,可以采取一些增强策略:

  • Redlock: Redlock 是一种基于 Redis 的分布式锁算法,它通过在多个 Redis 实例上获取锁来提高锁的可靠性。
  • 租约机制: 租约机制是指在获取锁之后,定期续租,以防止锁过期。
  • 看门狗机制: 看门狗机制是指在获取锁之后,启动一个线程来监控锁的状态,如果锁被意外释放,则重新尝试获取锁。

最后的一些建议

分布式锁是一个比较复杂的问题,需要根据实际情况选择合适的方案,并进行充分的测试。希望今天的分享能够帮助大家更好地理解分布式锁的原理和实现,并在实际项目中更好地应用它。

选择适合的分布式锁方案

根据性能、可靠性和复杂度等因素,选择适合自身业务场景的分布式锁方案。

确保锁的正确释放

无论是哪种实现方式,都要确保在任务执行完毕后,锁能够被正确释放,防止死锁的发生。

考虑锁的续约和重试机制

对于执行时间较长的任务,可以考虑锁的续约机制,防止锁过期。同时,可以设置重试机制,提高获取锁的成功率。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注