C++ 分布式锁:ZooKeeper, etcd, Redis 在 C++ 中的集成

好的,各位朋友,今天咱们来聊聊C++分布式锁这事儿。分布式锁,听起来高大上,其实就是解决多个进程(或者机器)同时访问共享资源时,避免数据混乱的一种机制。想象一下,一群人抢一个厕所,没锁的话,那画面太美我不敢看。分布式锁就是那个厕所门上的锁,保证每次只有一个“人”(进程)能进去“方便”(访问资源)。

今天我们要讨论三个“锁匠”:ZooKeeper、etcd和Redis。他们各有绝活,能打造不同风格的锁。

第一位锁匠:ZooKeeper – 稳如老狗的锁匠

ZooKeeper,江湖人称“动物管理员”,它本质上是一个分布式协调服务,但用它来实现分布式锁,那是相当靠谱。它的核心思想是利用其提供的顺序一致性临时节点特性。

  • 顺序一致性: 保证所有客户端看到的事件顺序是一致的。
  • 临时节点: 客户端与ZooKeeper断开连接后,该节点会被自动删除。

ZooKeeper锁的原理:

  1. 加锁: 客户端尝试创建一个带有顺序编号的临时节点,比如/lock/node-0000000001
  2. 判断: 客户端获取/lock节点下的所有子节点,并按照序号排序。如果自己创建的节点序号最小,则认为获取锁成功。
  3. 监听: 如果自己创建的节点序号不是最小的,则监听排在自己前面的那个节点的删除事件。
  4. 释放锁: 客户端完成操作后,删除自己创建的节点。

C++代码示例 (使用Zookeeper C API):

#include <iostream>
#include <string>
#include <vector>
#include <algorithm>
#include <zookeeper/zookeeper.h>
#include <zookeeper/zookeeper_log.h>

class ZookeeperLock {
public:
    ZookeeperLock(const std::string& hosts, const std::string& lockPath) : hosts_(hosts), lockPath_(lockPath), zk_(nullptr) {}

    ~ZookeeperLock() {
        unlock(); // 确保析构时释放锁
        if (zk_) {
            zookeeper_close(zk_);
        }
    }

    bool lock() {
        zk_ = zookeeper_init(hosts_.c_str(), nullptr, 10000, 0, this, 0);
        if (!zk_) {
            std::cerr << "Error connecting to ZooKeeper." << std::endl;
            return false;
        }

        // 等待连接建立
        struct timeval tv;
        tv.tv_sec = 10; // 等待10秒
        tv.tv_usec = 0;
        int rc = zoo_exists(zk_, "/", 0, nullptr); // 用zoo_exists 触发连接回调
        if (rc != ZOK) {
            std::cerr << "Error checking root node existence." << std::endl;
            zookeeper_close(zk_);
            zk_ = nullptr;
            return false;
        }

        std::string lockZnodePath = lockPath_ + "/lock-";
        char pathBuffer[256];
        int bufferLen = sizeof(pathBuffer);
        int ret = zoo_create(zk_, lockZnodePath.c_str(), nullptr, 0, &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL | ZOO_SEQUENCE, pathBuffer, bufferLen);

        if (ret != ZOK) {
            std::cerr << "Error creating lock node: " << zerror(ret) << std::endl;
            zookeeper_close(zk_);
            zk_ = nullptr;
            return false;
        }

        lockNodeName_ = pathBuffer;
        std::cout << "Created lock node: " << lockNodeName_ << std::endl;

        return tryAcquireLock();
    }

    void unlock() {
        if (zk_ && !lockNodeName_.empty()) {
            int ret = zoo_delete(zk_, lockNodeName_.c_str(), -1);
            if (ret != ZOK && ret != ZNONODE) {  // ZNONODE 表示节点可能已经被删除,比如会话超时
                std::cerr << "Error deleting lock node: " << zerror(ret) << std::endl;
            } else {
                std::cout << "Released lock." << std::endl;
            }
            lockNodeName_.clear();

        }
    }

private:

    bool tryAcquireLock() {
        std::vector<std::string> children;
        if (!getChildren(children)) {
            return false;
        }

        std::sort(children.begin(), children.end());

        std::string myNodeName = lockNodeName_.substr(lockPath_.length() + 1); // 提取节点名称,不包含路径

        if (children.empty() || children[0] == myNodeName) {
            std::cout << "Acquired lock." << std::endl;
            return true;
        }

        // 找到自己的位置
        auto it = std::find(children.begin(), children.end(), myNodeName);
        if (it == children.end()) {
            std::cerr << "Error: Lock node not found in children list." << std::endl;
            return false;
        }

        // 监听前一个节点
        if (it != children.begin()) {
            std::string previousNode = *std::prev(it);
            std::string watchPath = lockPath_ + "/" + previousNode;
            std::cout << "Watching node: " << watchPath << std::endl;
            int ret = zoo_exists(zk_, watchPath.c_str(), 1, watchCallback); // 1 表示设置watch
            if (ret != ZOK && ret != ZNONODE) {
                std::cerr << "Error setting watch on previous node: " << zerror(ret) << std::endl;
                return false;
            }
            if(ret == ZNONODE) {
                std::cout << "Previous node already gone, retrying lock acquisition" << std::endl;
                return tryAcquireLock();
            }

            // 等待watch触发 (这里为了简化,使用busy waiting,实际应用中应该使用条件变量或者其他机制)
            while (!watchTriggered_) {
                usleep(10000); // 10ms
            }
            watchTriggered_ = false;
            std::cout << "Watch triggered, retrying lock acquisition" << std::endl;
            return tryAcquireLock(); // 重新尝试获取锁
        }

        return false; // 不应该到达这里
    }

    bool getChildren(std::vector<std::string>& children) {
        String_vector string_vector;
        int ret = zoo_get_children(zk_, lockPath_.c_str(), 0, &string_vector);
        if (ret != ZOK) {
            std::cerr << "Error getting children: " << zerror(ret) << std::endl;
            return false;
        }

        for (int i = 0; i < string_vector.count; ++i) {
            children.push_back(string_vector.data[i]);
        }

        deallocate_String_vector(&string_vector);
        return true;
    }

    static void watchCallback(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) {
        ZookeeperLock* lock = static_cast<ZookeeperLock*>(watcherCtx);
        if (type == ZOO_DELETED_EVENT) {
            std::cout << "Watch triggered for node deletion: " << path << std::endl;
            lock->watchTriggered_ = true;
        } else {
            std::cout << "Unexpected watch event type: " << type << std::endl;
        }
    }

private:
    std::string hosts_;
    std::string lockPath_;
    std::string lockNodeName_;
    zhandle_t* zk_;
    std::atomic<bool> watchTriggered_ {false};

};

int main() {
    // 初始化ZooKeeper连接和锁路径
    ZookeeperLock lock("localhost:2181", "/mylock"); // 确保ZooKeeper服务器运行在localhost:2181,且/mylock存在
    // 创建 /mylock 节点,如果不存在
    zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
    zhandle_t* zk = zookeeper_init("localhost:2181", nullptr, 10000, 0, nullptr, 0);
    if (!zk) {
        std::cerr << "Error connecting to ZooKeeper." << std::endl;
        return 1;
    }

    if (zoo_exists(zk, "/mylock", 0, nullptr) == ZNONODE) {
        int ret = zoo_create(zk, "/mylock", nullptr, 0, &ZOO_OPEN_ACL_UNSAFE, 0, nullptr, 0);
        if (ret != ZOK) {
            std::cerr << "Error creating lock path: " << zerror(ret) << std::endl;
            zookeeper_close(zk);
            return 1;
        }
    }
    zookeeper_close(zk);

    // 尝试获取锁
    if (lock.lock()) {
        // 模拟临界区操作
        std::cout << "Executing critical section..." << std::endl;
        sleep(5); // 模拟耗时操作
        std::cout << "Critical section finished." << std::endl;

        // 释放锁
        lock.unlock();
    } else {
        std::cerr << "Failed to acquire lock." << std::endl;
    }

    return 0;
}

代码解释:

  • ZookeeperLock类封装了锁的加锁和解锁逻辑。
  • lock()方法创建临时顺序节点,并尝试获取锁。
  • tryAcquireLock()方法获取所有子节点,判断自己是否是最小的节点。如果不是,则监听前一个节点的删除事件。
  • unlock()方法删除自己创建的节点,释放锁。
  • watchCallback函数是监听器回调函数,当监听的节点被删除时,会触发该函数,重新尝试获取锁。

ZooKeeper锁的优点:

  • 可靠性高: ZooKeeper集群保证了锁的可用性。
  • 强一致性: ZooKeeper保证了锁的公平性,先到先得。
  • 自动释放: 客户端崩溃后,临时节点会被自动删除,避免死锁。

ZooKeeper锁的缺点:

  • 性能相对较低: 每次加锁和解锁都需要与ZooKeeper集群通信。
  • 实现复杂: 需要自己处理节点创建、监听、排序等逻辑。

第二位锁匠:etcd – 后起之秀的锁匠

etcd,一个分布式键值存储系统,也是Kubernetes集群的核心组件。它在分布式锁方面表现出色,并且比ZooKeeper更简洁。etcd使用lease(租约)和compare-and-swap (CAS)操作来实现锁。

  • Lease: 可以理解为一把带有效期的钥匙,过期后自动失效。
  • CAS: 原子性的比较并交换操作,保证只有一个客户端能成功修改键值。

etcd锁的原理:

  1. 创建Lease: 客户端创建一个带有TTL(Time-To-Live)的Lease。
  2. 加锁: 客户端尝试创建一个键,并将Lease ID绑定到该键上。如果键不存在,则创建成功,获取锁。
  3. 续租: 客户端定期续租Lease,防止Lease过期。
  4. 释放锁: 客户端删除该键或Lease过期,释放锁。

C++代码示例 (使用etcd gRPC API):

#include <iostream>
#include <memory>
#include <string>
#include <chrono>
#include <thread>

#include <grpcpp/grpcpp.h>
#include <etcd/Client.hpp>
#include <etcd/KV.hpp>
#include <etcd/Lease.hpp>
#include <etcd/Watch.hpp>

using namespace etcd;

class EtcdLock {
public:
    EtcdLock(const std::string& endpoints, const std::string& lockKey, int leaseTTL = 10)
        : endpoints_(endpoints), lockKey_(lockKey), leaseTTL_(leaseTTL), leaseId_(0) {}

    ~EtcdLock() {
        unlock();
    }

    bool lock() {
        Client client(endpoints_);

        // 1. 创建 Lease
        auto leaseResponse = client.lease().grant(leaseTTL_).get();
        if (!leaseResponse.is_ok()) {
            std::cerr << "Failed to grant lease: " << leaseResponse.error_message() << std::endl;
            return false;
        }
        leaseId_ = leaseResponse.value().ID();
        std::cout << "Granted lease with ID: " << leaseId_ << std::endl;

        // 2. 续租 Lease (后台线程)
        stopLeaseKeepAlive_ = false;
        leaseKeepAliveThread_ = std::thread([this, endpoints_]() {
            Client keepAliveClient(endpoints_);
            while (!stopLeaseKeepAlive_) {
                auto keepAliveResponse = keepAliveClient.lease().keepAlive(leaseId_).get();
                if (!keepAliveResponse.is_ok()) {
                    std::cerr << "Failed to keep alive lease: " << keepAliveResponse.error_message() << std::endl;
                    break; // 退出续租循环
                }
                std::this_thread::sleep_for(std::chrono::seconds(leaseTTL_ / 2)); // 续租频率要小于TTL
            }
            std::cout << "Lease keep-alive thread stopped." << std::endl;
        });

        // 3. 尝试加锁 (CAS)
        auto putResponse = client.kv().put(lockKey_, "locked", leaseId_, etcd::PrevExistType::DO_NOT_EXIST).get();
        if (!putResponse.is_ok()) {
            std::cerr << "Failed to acquire lock: " << putResponse.error_message() << std::endl;
            // 释放 lease
             stopLeaseKeepAlive_ = true;
             if (leaseKeepAliveThread_.joinable()) {
                 leaseKeepAliveThread_.join();
             }
            client.lease().revoke(leaseId_).get();
            leaseId_ = 0;

            return false;
        }

        std::cout << "Acquired lock." << std::endl;
        return true;
    }

    void unlock() {
        if (leaseId_ != 0) {
            stopLeaseKeepAlive_ = true;
            if (leaseKeepAliveThread_.joinable()) {
                leaseKeepAliveThread_.join();
            }

            Client client(endpoints_);
            auto deleteResponse = client.kv().del(lockKey_).get(); //先删除key
            if (!deleteResponse.is_ok()) {
                std::cerr << "Failed to delete key: " << deleteResponse.error_message() << std::endl;
            }

            auto revokeResponse = client.lease().revoke(leaseId_).get();
            if (!revokeResponse.is_ok()) {
                std::cerr << "Failed to revoke lease: " << revokeResponse.error_message() << std::endl;
            }
            leaseId_ = 0;
            std::cout << "Released lock." << std::endl;
        }
    }

private:
    std::string endpoints_;
    std::string lockKey_;
    int leaseTTL_;
    int64_t leaseId_;
    std::thread leaseKeepAliveThread_;
    std::atomic<bool> stopLeaseKeepAlive_ {false};

};

int main() {
    // 初始化 etcd 连接和锁key
    EtcdLock lock("http://localhost:2379", "/mylock"); // 确保 etcd 服务器运行在 localhost:2379

    // 尝试获取锁
    if (lock.lock()) {
        // 模拟临界区操作
        std::cout << "Executing critical section..." << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(5)); // 模拟耗时操作
        std::cout << "Critical section finished." << std::endl;

        // 释放锁
        lock.unlock();
    } else {
        std::cerr << "Failed to acquire lock." << std::endl;
    }

    return 0;
}

代码解释:

  • EtcdLock类封装了锁的加锁和解锁逻辑。
  • lock()方法创建Lease,然后尝试使用CAS操作创建键。
  • unlock()方法删除键,并revoke Lease。
  • 使用一个独立的线程来续租Lease,防止Lease过期。

etcd锁的优点:

  • 性能较高: etcd使用gRPC协议,通信效率高。
  • 实现简洁: 使用Lease和CAS操作,逻辑简单。
  • 自动释放: Lease过期后自动释放锁,避免死锁。

etcd锁的缺点:

  • 需要续租: 客户端需要定期续租Lease,增加了复杂度。
  • 依赖Lease机制: 如果Lease机制出现问题,可能会导致锁失效。

第三位锁匠:Redis – 速度担当的锁匠

Redis,一个内存数据结构存储系统,以其高性能著称。使用Redis实现分布式锁,速度飞快,但需要注意一些细节。Redis实现分布式锁主要依赖SETNX(SET if Not eXists)命令和EXPIRE命令。

  • SETNX: 如果键不存在,则设置键的值。
  • EXPIRE: 设置键的过期时间。

Redis锁的原理:

  1. 加锁: 客户端尝试使用SETNX命令设置一个键,如果设置成功,则获取锁。
  2. 设置过期时间: 客户端使用EXPIRE命令设置键的过期时间,防止死锁。
  3. 释放锁: 客户端删除该键,释放锁。

C++代码示例 (使用 hiredis 库):

#include <iostream>
#include <string>
#include <chrono>
#include <thread>
#include <cstdlib> // For random()

#include <hiredis/hiredis.h>

class RedisLock {
public:
    RedisLock(const std::string& host, int port, const std::string& lockKey, int lockTimeout = 30)
        : host_(host), port_(port), lockKey_(lockKey), lockTimeout_(lockTimeout), redisContext_(nullptr) {}

    ~RedisLock() {
        unlock();
        if (redisContext_) {
            redisFree(redisContext_);
        }
    }

    bool lock() {
        redisContext_ = redisConnect(host_.c_str(), port_);
        if (redisContext_ == nullptr || redisContext_->err) {
            if (redisContext_) {
                std::cerr << "Error connecting to Redis: " << redisContext_->errstr << std::endl;
                redisFree(redisContext_);
                redisContext_ = nullptr;
            } else {
                std::cerr << "Error allocating redis context." << std::endl;
            }
            return false;
        }

        // 使用 SET key value NX PX milliseconds 实现原子性加锁
        std::string value = generateRandomValue(); // 生成一个随机值,用于解锁时验证
        lockValue_ = value;

        std::string command = "SET " + lockKey_ + " " + value + " NX PX " + std::to_string(lockTimeout_ * 1000);

        redisReply* reply = (redisReply*)redisCommand(redisContext_, command.c_str());

        if (reply == nullptr) {
            std::cerr << "Error executing command: " << redisContext_->errstr << std::endl;
            redisFreeReplyObject(reply);
            redisFree(redisContext_);
            redisContext_ = nullptr;
            return false;
        }

        if (reply->type == REDIS_REPLY_STATUS && strcasecmp(reply->str, "OK") == 0) {
            std::cout << "Acquired lock." << std::endl;
            freeReplyObject(reply);
            return true;
        } else {
            std::cout << "Failed to acquire lock." << std::endl;
            freeReplyObject(reply);
            return false;
        }
    }

    void unlock() {
        if (redisContext_) {
            // 使用 Lua 脚本原子性地验证和删除锁
            const char* script =
                "if redis.call('get', KEYS[1]) == ARGV[1] thenn"
                "  return redis.call('del', KEYS[1])n"
                "elsen"
                "  return 0n"
                "end";

            redisReply* reply = (redisReply*)redisCommand(redisContext_, "EVAL %s 1 %s %s", script, lockKey_.c_str(), lockValue_.c_str());
            if (reply == nullptr) {
                std::cerr << "Error executing command: " << redisContext_->errstr << std::endl;
                 redisFree(redisContext_);
                 redisContext_ = nullptr;
                 return;

            }

            if (reply->type == REDIS_REPLY_INTEGER && reply->integer == 1) {
                std::cout << "Released lock." << std::endl;
            } else {
                std::cout << "Failed to release lock (lock may have expired or been released by another client)." << std::endl;
            }
            freeReplyObject(reply);
        }
    }

private:

    std::string generateRandomValue() {
        // 生成一个简单的随机字符串,用于验证锁的持有者
        const int length = 16;
        const char charset[] = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
        std::string result;
        result.reserve(length);

        for (int i = 0; i < length; ++i) {
            int index = rand() % (sizeof(charset) - 1);
            result += charset[index];
        }
        return result;
    }

private:
    std::string host_;
    int port_;
    std::string lockKey_;
    int lockTimeout_;
    std::string lockValue_;
    redisContext* redisContext_;
};

int main() {
    // 初始化 Redis 连接和锁key
    srand(time(0)); // 初始化随机数种子

    RedisLock lock("localhost", 6379, "mylock", 10); // 确保 Redis 服务器运行在 localhost:6379

    // 尝试获取锁
    if (lock.lock()) {
        // 模拟临界区操作
        std::cout << "Executing critical section..." << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(5)); // 模拟耗时操作
        std::cout << "Critical section finished." << std::endl;

        // 释放锁
        lock.unlock();
    } else {
        std::cerr << "Failed to acquire lock." << std::endl;
    }

    return 0;
}

代码解释:

  • RedisLock类封装了锁的加锁和解锁逻辑。
  • lock()方法使用SET key value NX PX milliseconds命令原子性地设置键和过期时间。
  • unlock()方法使用Lua脚本原子性地验证和删除锁,避免误删其他客户端的锁。

Redis锁的优点:

  • 性能极高: Redis是内存数据库,速度非常快。
  • 实现简单: 使用SETNXEXPIRE命令,逻辑简单。

Redis锁的缺点:

  • 可靠性相对较低: 如果Redis主节点宕机,可能会导致锁失效。可以使用Redis Sentinel或Cluster来提高可靠性。
  • 可能出现死锁: 如果客户端在设置过期时间之前崩溃,可能会导致死锁。可以使用Redlock算法来解决这个问题(Redlock比较复杂,这里不展开)。
  • 解锁的安全性: 必须验证锁的持有者,否则可能误删其他客户端的锁。 使用Lua脚本可以原子性的进行验证和删除操作。

总结:

特性 ZooKeeper etcd Redis
一致性 强一致性 强一致性 最终一致性 (需要 Sentinel/Cluster 增强可靠性)
性能 相对较低 较高 极高
可靠性 相对较低 (需要 Sentinel/Cluster 增强可靠性)
实现难度 复杂 简单 简单 (但需要注意细节)
适用场景 对可靠性要求极高的场景,例如配置管理、领导者选举 对性能有一定要求,且需要配置管理的场景,例如Kubernetes 对性能要求极高的场景,例如秒杀、抢购

选择哪个锁匠,取决于你的需求:

  • 追求稳如泰山: 选择ZooKeeper,虽然慢点,但可靠。
  • 想要速度与可靠性兼顾: 选择etcd,性能和可靠性都不错。
  • 要快如闪电: 选择Redis,但要注意可靠性问题。

记住,没有银弹,只有最适合你的解决方案。希望今天的分享能帮助你找到最合适的“锁”,守护你的数据安全。下次再见!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注