ZooKeeper Recipes:分布式锁、队列与组管理

好嘞!各位看官老爷们,欢迎来到今天的“动物园奇妙夜”特别节目!咳咳,别误会,咱们不是真去动物园,而是要聊聊一个跟动物园同名的神器—— ZooKeeper!

今天的主题是:“ZooKeeper Recipes:分布式锁、队列与组管理”。 听起来是不是有点高大上?别怕,我保证用最接地气的方式,把这些概念揉碎了,嚼烂了,喂到你嘴里,保证你消化吸收,学完就能在项目里大展身手!

第一幕:ZooKeeper,你到底是只啥动物?

首先,我们要搞清楚,ZooKeeper 究竟是个什么玩意儿? 难道是动物管理员?当然不是! 它可不是真管动物的,它管的是你的分布式系统!

你可以把 ZooKeeper 想象成一个分布式协调服务,就像一个中央情报局,负责协调各个服务器之间的行动,保证大家步调一致,不会出现混乱。 想象一下,如果没有 ZooKeeper,各个服务器就像一群脱缰的野马,各自为政,后果不堪设想!🤯

更形象一点,你可以把它看作是一个高度可靠的配置中心 + 分布式锁服务 + 命名服务 + 分布式队列 的集合体。 功能强大,用途广泛,简直是分布式系统界的瑞士军刀!

ZooKeeper 的核心特性:

  • 分层命名空间: 数据以树形结构组织,就像一个文件系统,方便管理和查找。
  • Watch机制: 客户端可以注册监听器,当数据发生变化时,ZooKeeper 会及时通知客户端。 这就像订阅了一个新闻推送,一旦有新消息,立刻收到提醒。
  • 原子性: 所有操作都是原子性的,要么全部成功,要么全部失败。 保证了数据的一致性。
  • 可靠性: ZooKeeper 集群通常由多个节点组成,即使部分节点宕机,服务仍然可用。 这就像一个备份系统,确保万无一失。
  • 顺序性: 事务的执行顺序与客户端发起的顺序一致。 保证了操作的有序性。

第二幕:分布式锁,谁是老大?

好了,了解了 ZooKeeper 的基本概念,我们开始进入正题,先来看看第一个“菜谱”:分布式锁!

在单机环境中,我们可以使用 Java 的 synchronized 关键字或者 ReentrantLock 来实现锁。 但是在分布式环境中,这些方法就失效了。 因为这些锁只能保证单个 JVM 进程的线程安全,无法保证多个 JVM 进程之间的互斥访问。

这时候,就需要分布式锁来救场了! 分布式锁的作用是:在多个服务器之间,保证只有一个客户端能够访问共享资源,避免数据冲突。

ZooKeeper 实现分布式锁的原理:

  1. 创建临时顺序节点: 当一个客户端想要获取锁时,它会在 ZooKeeper 中创建一个临时顺序节点。 节点路径类似于 /locks/mylock_0000000001。 临时节点意味着,当客户端断开连接时,该节点会自动删除。 顺序节点意味着,ZooKeeper 会自动为节点名称添加一个递增的序号。
  2. 获取所有子节点: 客户端获取 /locks 节点下的所有子节点。
  3. 判断是否是最小节点: 客户端判断自己创建的节点是否是所有子节点中序号最小的节点。 如果是,则获取锁成功。
  4. 监听前一个节点: 如果不是最小节点,则客户端监听序号比自己小的那个节点。 当前一个节点被删除时,客户端会收到通知,然后再次尝试获取锁。
  5. 释放锁: 当客户端完成对共享资源的访问后,它会删除自己创建的临时节点,释放锁。

流程图:

sequenceDiagram
    participant Client1
    participant Client2
    participant ZooKeeper

    Client1->>ZooKeeper: 创建临时顺序节点 /locks/mylock_0000000001
    activate ZooKeeper
    ZooKeeper-->>Client1: 创建成功
    deactivate ZooKeeper
    Client1->>ZooKeeper: 获取 /locks 下所有子节点
    activate ZooKeeper
    ZooKeeper-->>Client1: 返回子节点列表
    deactivate ZooKeeper
    Client1->>Client1: 判断是否是最小节点
    alt 是最小节点
        Client1->>Client1: 获取锁成功
        Client1->>Client1: 访问共享资源
        Client1->>ZooKeeper: 删除临时顺序节点
        activate ZooKeeper
        ZooKeeper-->>Client1: 删除成功
        deactivate ZooKeeper
    else 不是最小节点
        Client1->>ZooKeeper: 监听前一个节点
        activate ZooKeeper
        ZooKeeper-->>Client1: 监听成功
        deactivate ZooKeeper
        ZooKeeper->>Client1: 前一个节点被删除
        Client1->>Client1: 再次尝试获取锁
    end

    Client2->>ZooKeeper: 创建临时顺序节点 /locks/mylock_0000000002
    activate ZooKeeper
    ZooKeeper-->>Client2: 创建成功
    deactivate ZooKeeper
    Client2->>ZooKeeper: 获取 /locks 下所有子节点
    activate ZooKeeper
    ZooKeeper-->>Client2: 返回子节点列表
    deactivate ZooKeeper
    Client2->>Client2: 判断是否是最小节点
    alt 是最小节点
        Client2->>Client2: 获取锁成功
        Client2->>Client2: 访问共享资源
        Client2->>ZooKeeper: 删除临时顺序节点
        activate ZooKeeper
        ZooKeeper-->>Client2: 删除成功
        deactivate ZooKeeper
    else 不是最小节点
        Client2->>ZooKeeper: 监听前一个节点
        activate ZooKeeper
        ZooKeeper-->>Client2: 监听成功
        deactivate ZooKeeper
        ZooKeeper->>Client2: 前一个节点被删除
        Client2->>Client2: 再次尝试获取锁
    end

代码示例 (Java):

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class DistributedLock implements Watcher {

    private ZooKeeper zooKeeper;
    private String rootNode = "/locks";
    private String lockNode;
    private String currentLockPath;
    private String previousLockPath;
    private CountDownLatch latch = new CountDownLatch(1);

    public DistributedLock(String connectString, String lockName) throws IOException, KeeperException, InterruptedException {
        this.zooKeeper = new ZooKeeper(connectString, 5000, this);
        this.lockNode = lockName;

        // 确保根节点存在
        Stat stat = zooKeeper.exists(rootNode, false);
        if (stat == null) {
            zooKeeper.create(rootNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    public void acquireLock() throws KeeperException, InterruptedException {
        // 1. 创建临时顺序节点
        currentLockPath = zooKeeper.create(rootNode + "/" + lockNode + "_", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("Thread " + Thread.currentThread().getName() + " created lock node: " + currentLockPath);

        // 2. 尝试获取锁
        tryLock();
    }

    private boolean tryLock() throws KeeperException, InterruptedException {
        // 3. 获取所有子节点
        List<String> children = zooKeeper.getChildren(rootNode, false);

        // 4. 排序子节点
        Collections.sort(children);

        // 5. 判断是否是最小节点
        String currentLockName = currentLockPath.substring(rootNode.length() + 1);
        int index = children.indexOf(currentLockName);

        if (index == 0) {
            // 获取锁成功
            System.out.println("Thread " + Thread.currentThread().getName() + " acquired lock.");
            return true;
        } else {
            // 6. 监听前一个节点
            previousLockPath = rootNode + "/" + children.get(index - 1);
            Stat stat = zooKeeper.exists(previousLockPath, this); // 注册 Watcher
            if (stat == null) {
                // 前一个节点可能已经不存在了,再次尝试获取锁
                return tryLock();
            }

            // 等待前一个节点释放锁
            System.out.println("Thread " + Thread.currentThread().getName() + " waiting for lock release from " + previousLockPath);
            latch = new CountDownLatch(1);
            latch.await(5, TimeUnit.SECONDS); // 设置超时时间,避免死锁
            if (latch.getCount() > 0) {
                System.out.println("Thread " + Thread.currentThread().getName() + " timeout waiting for lock.");
                return false;
            }
            System.out.println("Thread " + Thread.currentThread().getName() + " received lock release notification.");
            return true;
        }
    }

    public void releaseLock() throws KeeperException, InterruptedException {
        // 7. 释放锁
        System.out.println("Thread " + Thread.currentThread().getName() + " releasing lock.");
        zooKeeper.delete(currentLockPath, -1);
        zooKeeper.close();
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeDeleted) {
            System.out.println("Received NodeDeleted event: " + event.getPath());
            if (event.getPath().equals(previousLockPath)) {
                latch.countDown(); // 唤醒等待线程
            }
        }
    }

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        String connectString = "localhost:2181";
        String lockName = "mylock";

        for (int i = 0; i < 3; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    DistributedLock lock = new DistributedLock(connectString, lockName);
                    lock.acquireLock();
                    System.out.println("Thread " + threadId + " is working...");
                    Thread.sleep(2000); // 模拟工作
                    lock.releaseLock();
                } catch (IOException | KeeperException | InterruptedException e) {
                    e.printStackTrace();
                }
            }, "Thread-" + i).start();
        }
    }
}

优点:

  • 可靠性高: 基于 ZooKeeper 的集群特性,即使部分节点宕机,锁服务仍然可用。
  • 公平性: 基于顺序节点,保证了锁的获取顺序,避免了饥饿现象。

缺点:

  • 性能相对较低: 每次获取锁都需要与 ZooKeeper 进行多次通信,存在一定的延迟。
  • 实现相对复杂: 需要理解 ZooKeeper 的底层原理,代码实现相对繁琐。

第三幕:分布式队列,排队领盒饭!

接下来,我们来看看第二个“菜谱”:分布式队列!

在分布式系统中,经常需要使用队列来进行异步处理、消息传递、流量削峰等操作。 传统的队列,例如 RabbitMQ, Kafka等,都属于消息中间件,相对比较重。 ZooKeeper 也可以实现一个简单的分布式队列,虽然性能不如专业的队列,但是在某些场景下,使用 ZooKeeper 来实现队列更加轻量级。

ZooKeeper 实现分布式队列的原理:

  1. 创建持久节点: 创建一个持久节点作为队列的根节点,例如 /queue
  2. 生产者: 当生产者需要向队列中添加元素时,它会在队列根节点下创建一个临时顺序节点,例如 /queue/item_0000000001,节点数据就是队列元素的内容。
  3. 消费者: 消费者监听队列根节点下的子节点变化。当有新的子节点创建时,消费者会收到通知。
  4. 消费: 消费者获取所有子节点,然后按照顺序消费。 消费完成后,删除对应的子节点。

流程图:

sequenceDiagram
    participant Producer
    participant ZooKeeper
    participant Consumer

    Producer->>ZooKeeper: 创建临时顺序节点 /queue/item_0000000001
    activate ZooKeeper
    ZooKeeper-->>Producer: 创建成功
    deactivate ZooKeeper

    Consumer->>ZooKeeper: 监听 /queue 下子节点变化
    activate ZooKeeper
    ZooKeeper-->>Consumer: 监听成功
    deactivate ZooKeeper

    ZooKeeper->>Consumer: 节点 /queue/item_0000000001 创建事件
    Consumer->>ZooKeeper: 获取 /queue 下所有子节点
    activate ZooKeeper
    ZooKeeper-->>Consumer: 返回子节点列表
    deactivate ZooKeeper
    Consumer->>Consumer: 按照顺序消费
    Consumer->>ZooKeeper: 删除节点 /queue/item_0000000001
    activate ZooKeeper
    ZooKeeper-->>Consumer: 删除成功
    deactivate ZooKeeper

代码示例 (Java):

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.List;
import java.util.Collections;

public class DistributedQueue implements Watcher {

    private ZooKeeper zooKeeper;
    private String queueNode = "/queue";

    public DistributedQueue(String connectString) throws IOException, KeeperException, InterruptedException {
        this.zooKeeper = new ZooKeeper(connectString, 5000, this);

        // 确保队列根节点存在
        Stat stat = zooKeeper.exists(queueNode, false);
        if (stat == null) {
            zooKeeper.create(queueNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    public void enqueue(String data) throws KeeperException, InterruptedException {
        zooKeeper.create(queueNode + "/item_", data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("Enqueued: " + data);
    }

    public String dequeue() throws KeeperException, InterruptedException {
        while (true) {
            List<String> children = zooKeeper.getChildren(queueNode, this);
            if (children.isEmpty()) {
                System.out.println("Queue is empty, waiting...");
                synchronized (this) {
                    wait(); // 等待通知
                }
                continue;
            }

            Collections.sort(children);
            String firstItem = children.get(0);
            String itemPath = queueNode + "/" + firstItem;

            try {
                byte[] data = zooKeeper.getData(itemPath, false, null);
                zooKeeper.delete(itemPath, -1);
                System.out.println("Dequeued: " + new String(data));
                return new String(data);
            } catch (KeeperException.NoNodeException e) {
                // 节点可能已经被其他消费者消费了,继续循环
                System.out.println("Node already deleted by another consumer, retrying...");
            }
        }
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeChildrenChanged) {
            System.out.println("Queue children changed, notifying consumer...");
            synchronized (this) {
                notifyAll(); // 唤醒等待线程
            }
        }
    }

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        String connectString = "localhost:2181";
        DistributedQueue queue = new DistributedQueue(connectString);

        // 生产者
        new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    queue.enqueue("Message-" + i);
                    Thread.sleep(100);
                }
            } catch (KeeperException | InterruptedException e) {
                e.printStackTrace();
            }
        }, "Producer").start();

        // 消费者
        new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    queue.dequeue();
                    Thread.sleep(500);
                }
            } catch (KeeperException | InterruptedException e) {
                e.printStackTrace();
            }
        }, "Consumer").start();

        Thread.sleep(5000); // 等待一段时间
    }
}

优点:

  • 实现简单: 基于 ZooKeeper 的节点操作,实现简单方便。
  • 可靠性高: 基于 ZooKeeper 的集群特性,队列服务仍然可用。

缺点:

  • 性能较低: 每次入队和出队都需要与 ZooKeeper 进行通信,性能瓶颈明显。
  • 功能简单: 只能实现简单的 FIFO 队列,不支持优先级队列、延迟队列等高级功能。

第四幕:组管理,抱团取暖!

最后,我们来看看第三个“菜谱”:组管理!

在分布式系统中,经常需要将多个服务器组成一个组,共同完成一项任务。 例如,多个服务器组成一个集群,共同提供服务; 或者多个服务器组成一个计算组,共同完成一个计算任务。

ZooKeeper 实现组管理的原理:

  1. 创建持久节点: 创建一个持久节点作为组的根节点,例如 /group
  2. 加入组: 当一个服务器需要加入组时,它会在组根节点下创建一个临时节点,例如 /group/server1
  3. 获取成员列表: 客户端可以获取组根节点下的所有子节点,从而获取组的成员列表。
  4. 监听成员变化: 客户端可以监听组根节点下的子节点变化,当有新的服务器加入或退出组时,客户端会收到通知。

流程图:

sequenceDiagram
    participant Server1
    participant Server2
    participant ZooKeeper
    participant Client

    Server1->>ZooKeeper: 创建临时节点 /group/server1
    activate ZooKeeper
    ZooKeeper-->>Server1: 创建成功
    deactivate ZooKeeper

    Server2->>ZooKeeper: 创建临时节点 /group/server2
    activate ZooKeeper
    ZooKeeper-->>Server2: 创建成功
    deactivate ZooKeeper

    Client->>ZooKeeper: 获取 /group 下所有子节点
    activate ZooKeeper
    ZooKeeper-->>Client: 返回子节点列表 (server1, server2)
    deactivate ZooKeeper

    Client->>ZooKeeper: 监听 /group 下子节点变化
    activate ZooKeeper
    ZooKeeper-->>Client: 监听成功
    deactivate ZooKeeper

    Server1->>ZooKeeper: 断开连接
    activate ZooKeeper
    ZooKeeper-->>Server1: 断开成功
    deactivate ZooKeeper
    ZooKeeper->>Client: 节点 /group/server1 删除事件

代码示例 (Java):

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.List;

public class GroupManagement implements Watcher {

    private ZooKeeper zooKeeper;
    private String groupNode = "/group";
    private String memberNode;

    public GroupManagement(String connectString, String memberName) throws IOException, KeeperException, InterruptedException {
        this.zooKeeper = new ZooKeeper(connectString, 5000, this);
        this.memberNode = memberName;

        // 确保组根节点存在
        Stat stat = zooKeeper.exists(groupNode, false);
        if (stat == null) {
            zooKeeper.create(groupNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

        // 加入组
        joinGroup();

        // 监听成员变化
        getGroupMembers();
    }

    private void joinGroup() throws KeeperException, InterruptedException {
        String nodePath = zooKeeper.create(groupNode + "/" + memberNode + "_", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("Joined group: " + nodePath);
    }

    private void getGroupMembers() throws KeeperException, InterruptedException {
        List<String> members = zooKeeper.getChildren(groupNode, this);
        System.out.println("Group members: " + members);
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeChildrenChanged) {
            System.out.println("Group members changed!");
            try {
                getGroupMembers();
            } catch (KeeperException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        String connectString = "localhost:2181";

        new GroupManagement(connectString, "server1");
        new GroupManagement(connectString, "server2");

        Thread.sleep(60000); // 运行一段时间
    }
}

优点:

  • 实现简单: 基于 ZooKeeper 的节点操作,实现简单方便。
  • 实时性: 可以实时获取组的成员列表,并监听成员变化。

缺点:

  • 功能简单: 只能实现简单的组管理,不支持复杂的权限控制、角色管理等功能。

第五幕:总结与展望

好啦,今天的“动物园奇妙夜”就到这里了! 咱们一起探索了 ZooKeeper 的三种“菜谱”:分布式锁、分布式队列和组管理。 希望通过今天的讲解,你对 ZooKeeper 的应用场景有了更深入的了解。

表格总结:

功能 原理 优点 缺点 适用场景
分布式锁 创建临时顺序节点,监听前一个节点 可靠性高,公平性 性能相对较低,实现相对复杂 需要保证强一致性的场景,例如:数据库主备切换、资源互斥访问等
分布式队列 创建持久节点,创建临时顺序节点,监听子节点变化 实现简单,可靠性高 性能较低,功能简单 消息量较小,对性能要求不高的场景,例如:简单的任务调度、事件通知等
组管理 创建持久节点,创建临时节点,监听子节点变化 实现简单,实时性 功能简单 需要实时获取组的成员列表,并监听成员变化的场景,例如:服务注册与发现、配置中心等

当然,ZooKeeper 的功能远不止这些,它还可以用于实现配置中心、命名服务等等。 只要你发挥想象力,就能发现更多有趣的用法! 🚀

最后,希望大家在学习和使用 ZooKeeper 的过程中,能够像动物园里的动物一样,保持好奇心,不断探索,发现更多精彩! 谢谢大家! 🎉

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注