分布式锁竞争严重导致系统抖动的Redis与Zookeeper优化对比实战

分布式锁竞争严重导致系统抖动的Redis与Zookeeper优化对比实战

大家好,今天我们来聊聊分布式锁,特别是当锁竞争激烈时,如何利用Redis和Zookeeper进行优化,以避免系统抖动。分布式锁是解决分布式环境下数据一致性问题的关键工具,但在高并发场景下,不合理的锁设计会导致严重的性能瓶颈,进而引起系统抖动。本文将深入探讨Redis和Zookeeper两种常用分布式锁的实现方式,分析其优缺点,并结合实际案例,探讨如何优化锁竞争问题。

一、分布式锁的基本概念与必要性

在单体应用中,我们可以使用Java内置的synchronized关键字或ReentrantLock来实现线程同步,保证共享资源的安全访问。但在分布式系统中,多个服务实例独立运行,无法直接利用JVM层面的锁机制。因此,我们需要分布式锁,协调不同服务实例对共享资源的访问。

分布式锁的核心目标是:

  • 互斥性(Mutual Exclusion): 在任何时刻,只有一个客户端能持有锁。
  • 容错性(Fault Tolerance): 当持有锁的客户端发生故障时,锁能够被自动释放,避免死锁。
  • 可重入性(Reentrancy): 同一个客户端可以多次获取同一个锁。

二、Redis实现分布式锁

Redis实现分布式锁通常基于SETNX (SET if Not eXists) 命令和 EXPIRE 命令。SETNX 命令用于尝试设置一个键值对,如果键不存在则设置成功并返回1,如果键已存在则设置失败并返回0。 EXPIRE 命令用于设置键的过期时间,防止死锁。

2.1 初步实现(存在的问题)

以下是一个初步的Redis分布式锁的Java代码示例:

import redis.clients.jedis.Jedis;

public class RedisLock {

    private final String lockKey;
    private final Jedis jedis;
    private final int expireTime; // 锁的过期时间,单位秒

    public RedisLock(String lockKey, Jedis jedis, int expireTime) {
        this.lockKey = lockKey;
        this.jedis = jedis;
        this.expireTime = expireTime;
    }

    public boolean tryLock() {
        Long result = jedis.setnx(lockKey, String.valueOf(System.currentTimeMillis() + expireTime * 1000));
        if (result == 1L) {
            jedis.expire(lockKey, expireTime);
            return true;
        } else {
            return false;
        }
    }

    public void unlock() {
        jedis.del(lockKey);
    }

    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost", 6379);
        RedisLock lock = new RedisLock("my_lock", jedis, 10); // 锁10秒过期

        if (lock.tryLock()) {
            try {
                System.out.println("获取锁成功,执行业务逻辑...");
                Thread.sleep(5000); // 模拟业务处理
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
                System.out.println("释放锁");
            }
        } else {
            System.out.println("获取锁失败");
        }
        jedis.close();
    }
}

这段代码存在一个严重的问题:SETNXEXPIRE 命令不是原子操作。如果在 SETNX 成功后,EXPIRE 命令执行之前,服务器宕机,那么这个锁将永远不会被释放,造成死锁。

2.2 改进:使用原子性操作(SETNX + EXPIRE)

为了解决原子性问题,Redis 2.6.12版本引入了 SET 命令的扩展参数,允许同时设置键值和过期时间,保证原子性。

import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;

public class RedisLock {

    private final String lockKey;
    private final Jedis jedis;
    private final int expireTime;
    private final String clientId;

    public RedisLock(String lockKey, Jedis jedis, int expireTime, String clientId) {
        this.lockKey = lockKey;
        this.jedis = jedis;
        this.expireTime = expireTime;
        this.clientId = clientId;
    }

    public boolean tryLock() {
        SetParams params = new SetParams().nx().ex(expireTime);
        String result = jedis.set(lockKey, clientId, params);
        return "OK".equals(result);
    }

    public void unlock() {
        // 只有持有锁的客户端才能释放锁,防止误删
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(script, 1, lockKey, clientId);
        if ("1".equals(result.toString())) {
            System.out.println("解锁成功");
        } else {
            System.out.println("解锁失败");
        }

    }

    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost", 6379);
        String clientId = "client1"; // 唯一标识,可以是UUID
        RedisLock lock = new RedisLock("my_lock", jedis, 10, clientId);

        if (lock.tryLock()) {
            try {
                System.out.println("获取锁成功,clientId: " + clientId + ", 执行业务逻辑...");
                Thread.sleep(5000); // 模拟业务处理
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
                System.out.println("释放锁");
            }
        } else {
            System.out.println("获取锁失败");
        }
        jedis.close();
    }
}

这个改进后的代码使用了 SET 命令的 NXEX 参数,保证了原子性。 同时,在释放锁时,使用Lua脚本进行原子性判断,确保只有持有锁的客户端才能释放锁,避免误删其他客户端的锁。 同时,使用了唯一的clientId来标识客户端,防止误删锁,并方便后续的可重入锁实现。

2.3 解决锁竞争:重试机制与RedLock

在高并发场景下,多个客户端同时竞争锁,tryLock() 方法很可能返回 false。为了提高获取锁的成功率,可以使用重试机制:

public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
    long startTime = System.currentTimeMillis();
    long endTime = startTime + unit.toMillis(timeout);

    while (System.currentTimeMillis() < endTime) {
        if (tryLock()) {
            return true;
        }
        Thread.sleep(100); // 短暂休眠,避免CPU空转
    }
    return false;
}

这段代码在指定时间内不断尝试获取锁,如果获取成功则返回 true,否则返回 falseThread.sleep(100) 用于避免CPU空转,降低资源消耗。

但是,单Redis实例仍然存在单点故障的风险。为了提高可用性,可以使用Redis官方推荐的Redlock算法。 Redlock算法涉及到多个独立的Redis实例(通常是5个),客户端尝试在所有实例上获取锁,只有当超过半数的实例成功获取锁时,才认为获取锁成功。 释放锁时,需要释放所有实例上的锁。

Redlock算法的代码实现较为复杂,这里仅给出核心思路:

  1. 尝试在N个独立的Redis实例上执行 SET 命令(带 NXEX 参数)。
  2. 计算成功获取锁的实例数量。
  3. 如果成功获取锁的实例数量大于 N/2,则认为获取锁成功。
  4. 计算获取锁的总耗时。如果耗时超过锁的有效时间,则认为获取锁失败。
  5. 如果获取锁成功,则将锁的有效时间设置为 有效时间 - 获取锁的总耗时
  6. 释放锁时,需要释放所有实例上的锁。

Redlock算法虽然提高了可用性,但也增加了复杂性,并且存在一些争议。在实际应用中,需要根据业务场景权衡利弊。

三、Zookeeper实现分布式锁

Zookeeper是一个分布式协调服务,可以用来实现分布式锁。 Zookeeper的锁机制基于其临时顺序节点特性。

3.1 基本原理

  1. 客户端在Zookeeper的指定节点下创建一个临时顺序节点。
  2. 客户端获取该节点下的所有子节点,并判断自己创建的节点是否是序号最小的节点。
  3. 如果是序号最小的节点,则认为获取锁成功。
  4. 如果不是序号最小的节点,则监听比自己序号小的那个节点的删除事件。
  5. 如果监听的节点被删除,则重新尝试获取锁。
  6. 释放锁时,删除自己创建的临时节点。

由于Zookeeper的临时节点特性,如果客户端发生故障,连接断开,则Zookeeper会自动删除该客户端创建的临时节点,从而释放锁,避免死锁。

3.2 代码实现

以下是一个Zookeeper分布式锁的Java代码示例:

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ZookeeperLock {

    private final ZooKeeper zk;
    private final String lockPath;
    private String currentLockPath;
    private String waitLockPath;
    private final CountDownLatch connectedSignal = new CountDownLatch(1);

    public ZookeeperLock(String connectString, String lockPath) throws IOException, InterruptedException {
        this.lockPath = lockPath;
        this.zk = new ZooKeeper(connectString, 5000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    connectedSignal.countDown();
                }
                if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitLockPath)) {
                    // 监听的节点被删除,重新尝试获取锁
                    try {
                        lock();
                    } catch (KeeperException | InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        connectedSignal.await();

        // 检查锁根节点是否存在,不存在则创建
        Stat stat = zk.exists(lockPath, false);
        if (stat == null) {
            try {
                zk.create(lockPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } catch (KeeperException.NodeExistsException e) {
                // 节点已存在,忽略
            }
        }
    }

    public boolean tryLock(long timeout, TimeUnit unit) throws KeeperException, InterruptedException {
        long startTime = System.currentTimeMillis();
        long endTime = startTime + unit.toMillis(timeout);

        while (System.currentTimeMillis() < endTime) {
            if (lock()) {
                return true;
            }
            Thread.sleep(100); // 短暂休眠
        }
        return false;
    }

    public boolean lock() throws KeeperException, InterruptedException {
        // 1. 创建临时顺序节点
        currentLockPath = zk.create(lockPath + "/lock-", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        // 2. 获取所有子节点并排序
        List<String> children = zk.getChildren(lockPath, false);
        Collections.sort(children);

        // 3. 判断自己是否是最小的节点
        if (currentLockPath.equals(lockPath + "/" + children.get(0))) {
            // 获取锁成功
            return true;
        } else {
            // 4. 监听比自己小的节点
            String subStr = currentLockPath.substring(lockPath.length() + 1);
            int currentIndex = Collections.binarySearch(children, subStr);
            waitLockPath = lockPath + "/" + children.get(currentIndex - 1);

            Stat stat = zk.exists(waitLockPath, true); // 监听节点删除事件
            if (stat != null) {
                return false; // 等待监听事件触发
            } else {
                //如果发现等待的节点不存在了,重新尝试获取锁
                return lock();
            }

        }
    }

    public void unlock() throws KeeperException, InterruptedException {
        zk.delete(currentLockPath, -1);
        System.out.println("释放锁: " + currentLockPath);
    }

    public void close() throws InterruptedException {
        zk.close();
    }

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        String connectString = "localhost:2181";
        String lockPath = "/my_lock";

        ZookeeperLock lock = new ZookeeperLock(connectString, lockPath);

        if (lock.tryLock(10, TimeUnit.SECONDS)) {
            try {
                System.out.println("获取锁成功,执行业务逻辑...");
                Thread.sleep(5000); // 模拟业务处理
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        } else {
            System.out.println("获取锁失败");
        }
        lock.close();
    }
}

3.3 解决锁竞争:羊群效应优化

当锁被释放时,所有监听该节点删除事件的客户端都会收到通知,并尝试获取锁。 如果客户端数量很多,这会导致大量的客户端同时竞争锁,造成"羊群效应",加重Zookeeper的负担。

为了解决羊群效应,我们可以将监听目标从前一个节点删除事件改为监听当前节点是否存在,如果前一个节点已经不存在,则直接尝试获取锁,避免所有客户端同时竞争。 上述代码已经实现了该优化。

四、Redis vs Zookeeper:优缺点对比与选择

特性 Redis Zookeeper
性能 高,基于内存,读写速度快 相对较低,需要进行磁盘IO
可靠性 单点故障风险,Redlock可提高可用性 高,基于Paxos协议,保证数据一致性
实现复杂度 简单,代码量少 相对复杂,需要理解Zookeeper API和原理
适用场景 高并发、低延迟场景 强调数据一致性、高可用性场景
锁的类型 悲观锁 乐观锁
是否支持重入 需要额外实现 默认支持
锁的自动释放 需要设置过期时间,存在误删风险 支持临时节点,客户端断开自动释放
资源消耗 相对较低 相对较高

五、优化分布式锁竞争的通用策略

无论是Redis还是Zookeeper,以下策略都可以在一定程度上缓解锁竞争:

  1. 减小锁的粒度: 将一个大的锁拆分成多个小的锁,减少锁冲突的可能性。例如,如果锁保护的是一张表中的数据,可以考虑将锁的粒度细化到行级别。
  2. 缩短锁的持有时间: 尽量在锁的临界区内只执行必要的代码,减少锁被占用的时间。
  3. 使用更高效的锁机制: 根据业务场景选择合适的锁机制。例如,如果对数据一致性要求不高,可以考虑使用乐观锁。
  4. 增加重试次数或重试间隔: 在获取锁失败时,可以增加重试次数或重试间隔,提高获取锁的成功率。但是,过多的重试可能会加剧锁竞争。
  5. 使用公平锁: 公平锁可以保证所有客户端都有机会获取锁,避免某些客户端一直无法获取锁的情况。Redis和Zookeeper都需要额外实现公平锁。
  6. 避免长事务: 长事务会长时间占用锁,增加锁冲突的可能性。应该尽量避免长事务,将事务拆分成多个小的事务。
  7. 批量操作: 将多个小的操作合并成一个大的操作,减少获取锁的次数。
  8. 错峰处理: 如果可能,尽量将请求分散到不同的时间段,避免大量的请求同时竞争锁。

六、案例分析:电商秒杀系统

假设我们有一个电商秒杀系统,需要保证库存的原子性更新。在高并发场景下,多个用户同时抢购同一件商品,会导致严重的锁竞争。

6.1 优化方案一:Redis + 乐观锁

我们可以使用Redis的 WATCH 命令实现乐观锁。

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;
import java.util.List;

public class SecKill {

    private static final String STOCK_KEY = "stock:item_id_123";
    private static final String USER_KEY_PREFIX = "user:item_id_123:";

    public static boolean secKill(String userId, Jedis jedis) {
        String userKey = USER_KEY_PREFIX + userId;

        // 检查用户是否已经购买过
        if (jedis.exists(userKey)) {
            System.out.println("用户 " + userId + " 已经购买过,不能重复购买");
            return false;
        }

        try {
            jedis.watch(STOCK_KEY); // 监听库存键
            String stock = jedis.get(STOCK_KEY);
            if (stock == null || Integer.parseInt(stock) <= 0) {
                System.out.println("商品已售罄");
                jedis.unwatch();
                return false;
            }

            Transaction tx = jedis.multi();
            tx.decr(STOCK_KEY); // 库存减一
            tx.set(userKey, "1"); // 记录用户已购买
            List<Object> result = tx.exec(); // 执行事务

            if (result == null || result.isEmpty()) {
                System.out.println("秒杀失败,库存不足或并发冲突");
                return false;
            } else {
                System.out.println("用户 " + userId + " 秒杀成功");
                return true;
            }
        } finally {
            jedis.close();
        }
    }

    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost", 6379);
        // 初始化库存
        jedis.set(STOCK_KEY, "10");
        jedis.close();

        for (int i = 1; i <= 20; i++) {
            final String userId = "user_" + i;
            new Thread(() -> {
                Jedis jedisThread = new Jedis("localhost", 6379);
                secKill(userId, jedisThread);
            }).start();
        }
    }
}

WATCH 命令用于监听库存键,如果在事务执行期间,库存键被其他客户端修改,则事务会被取消。

6.2 优化方案二:Zookeeper + 乐观锁

虽然Zookeeper本身没有提供类似Redis的WATCH命令的机制来实现乐观锁,但可以通过版本号来实现类似的功能。在Zookeeper中,每次节点的数据发生变化,其版本号都会增加。客户端在更新数据时,可以指定版本号,只有当版本号匹配时,更新才能成功。

  1. 读取数据和版本号: 客户端首先从Zookeeper读取需要更新的数据及其版本号。
  2. 进行本地计算: 在本地进行业务逻辑计算,得到新的数据。
  3. 尝试更新数据: 使用setData()方法更新数据,并指定期望的版本号。如果版本号与当前Zookeeper节点上的版本号一致,则更新成功;否则,更新失败。
  4. 处理更新失败: 如果更新失败,说明有其他客户端已经修改了数据。客户端需要重新读取数据和版本号,并重复上述步骤。
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class ZookeeperOptimisticLock {

    private final ZooKeeper zk;
    private final String path;
    private final CountDownLatch connectedSignal = new CountDownLatch(1);

    public ZookeeperOptimisticLock(String connectString, String path) throws IOException, InterruptedException {
        this.path = path;
        this.zk = new ZooKeeper(connectString, 5000, event -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connectedSignal.countDown();
            }
        });
        connectedSignal.await();
    }

    public boolean updateData(String newData) throws KeeperException, InterruptedException {
        while (true) {
            Stat stat = new Stat();
            byte[] data = zk.getData(path, false, stat); // 获取数据和版本号
            String currentValue = new String(data);
            int version = stat.getVersion();

            System.out.println("当前数据: " + currentValue + ", 版本号: " + version);
            if (currentValue.equals(newData)) {
                System.out.println("数据已经是最新的,无需更新");
                return true;
            }

            // 尝试更新数据,指定版本号
            try {
                Stat newStat = zk.setData(path, newData.getBytes(), version);
                System.out.println("数据更新成功,新版本号: " + newStat.getVersion());
                return true;
            } catch (KeeperException.BadVersionException e) {
                System.out.println("版本冲突,重新尝试更新");
                // 版本冲突,重新尝试更新
            }
        }
    }

    public void close() throws InterruptedException {
        zk.close();
    }

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        String connectString = "localhost:2181";
        String path = "/mydata";

        ZookeeperOptimisticLock lock = new ZookeeperOptimisticLock(connectString, path);
        // 创建节点,如果不存在
        ZooKeeper zk = lock.zk;
        if (zk.exists(path, false) == null) {
            zk.create(path, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

        // 模拟多个客户端并发更新数据
        for (int i = 0; i < 10; i++) {
            final int clientId = i;
            new Thread(() -> {
                try {
                    String newData = String.valueOf(clientId);
                    lock.updateData(newData);
                } catch (KeeperException | InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }

        // 等待一段时间,让所有线程执行完毕
        Thread.sleep(5000);
        lock.close();
    }
}

6.3 选择哪种方案?

  • 如果对性能要求极高,且可以容忍小概率的数据不一致,可以选择Redis + 乐观锁。Redis的性能优势可以减少锁竞争的发生,提高系统的吞吐量。
  • 如果对数据一致性要求非常高,可以选择Zookeeper + 乐观锁。Zookeeper可以保证数据的一致性,避免出现超卖等问题。
  • 如果需要更强的事务支持,可以考虑使用分布式事务,例如Seata。

七、总结:选择合适的锁,优化锁竞争

本文深入探讨了Redis和Zookeeper两种分布式锁的实现方式,分析了其优缺点,并给出了优化锁竞争的通用策略。在实际应用中,需要根据业务场景和性能要求选择合适的锁机制,并采取相应的优化措施,避免系统抖动。

锁的选择和优化是关键

选择Redis还是Zookeeper取决于业务需求,高并发选Redis,强一致性选Zookeeper。 优化锁竞争,可以从锁的粒度、持有时间、重试机制等方面入手。

案例分析,理论结合实践

通过电商秒杀系统的案例,展示了如何使用Redis和Zookeeper实现乐观锁,并分析了不同方案的优缺点。 实际应用中,需要结合具体场景选择合适的方案。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注