好的,各位朋友,今天咱们来聊聊C++分布式锁这事儿。分布式锁,听起来高大上,其实就是解决多个进程(或者机器)同时访问共享资源时,避免数据混乱的一种机制。想象一下,一群人抢一个厕所,没锁的话,那画面太美我不敢看。分布式锁就是那个厕所门上的锁,保证每次只有一个“人”(进程)能进去“方便”(访问资源)。
今天我们要讨论三个“锁匠”:ZooKeeper、etcd和Redis。他们各有绝活,能打造不同风格的锁。
第一位锁匠:ZooKeeper – 稳如老狗的锁匠
ZooKeeper,江湖人称“动物管理员”,它本质上是一个分布式协调服务,但用它来实现分布式锁,那是相当靠谱。它的核心思想是利用其提供的顺序一致性和临时节点特性。
- 顺序一致性: 保证所有客户端看到的事件顺序是一致的。
- 临时节点: 客户端与ZooKeeper断开连接后,该节点会被自动删除。
ZooKeeper锁的原理:
- 加锁: 客户端尝试创建一个带有顺序编号的临时节点,比如
/lock/node-0000000001
。 - 判断: 客户端获取
/lock
节点下的所有子节点,并按照序号排序。如果自己创建的节点序号最小,则认为获取锁成功。 - 监听: 如果自己创建的节点序号不是最小的,则监听排在自己前面的那个节点的删除事件。
- 释放锁: 客户端完成操作后,删除自己创建的节点。
C++代码示例 (使用Zookeeper C API):
#include <iostream>
#include <string>
#include <vector>
#include <algorithm>
#include <zookeeper/zookeeper.h>
#include <zookeeper/zookeeper_log.h>
class ZookeeperLock {
public:
ZookeeperLock(const std::string& hosts, const std::string& lockPath) : hosts_(hosts), lockPath_(lockPath), zk_(nullptr) {}
~ZookeeperLock() {
unlock(); // 确保析构时释放锁
if (zk_) {
zookeeper_close(zk_);
}
}
bool lock() {
zk_ = zookeeper_init(hosts_.c_str(), nullptr, 10000, 0, this, 0);
if (!zk_) {
std::cerr << "Error connecting to ZooKeeper." << std::endl;
return false;
}
// 等待连接建立
struct timeval tv;
tv.tv_sec = 10; // 等待10秒
tv.tv_usec = 0;
int rc = zoo_exists(zk_, "/", 0, nullptr); // 用zoo_exists 触发连接回调
if (rc != ZOK) {
std::cerr << "Error checking root node existence." << std::endl;
zookeeper_close(zk_);
zk_ = nullptr;
return false;
}
std::string lockZnodePath = lockPath_ + "/lock-";
char pathBuffer[256];
int bufferLen = sizeof(pathBuffer);
int ret = zoo_create(zk_, lockZnodePath.c_str(), nullptr, 0, &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL | ZOO_SEQUENCE, pathBuffer, bufferLen);
if (ret != ZOK) {
std::cerr << "Error creating lock node: " << zerror(ret) << std::endl;
zookeeper_close(zk_);
zk_ = nullptr;
return false;
}
lockNodeName_ = pathBuffer;
std::cout << "Created lock node: " << lockNodeName_ << std::endl;
return tryAcquireLock();
}
void unlock() {
if (zk_ && !lockNodeName_.empty()) {
int ret = zoo_delete(zk_, lockNodeName_.c_str(), -1);
if (ret != ZOK && ret != ZNONODE) { // ZNONODE 表示节点可能已经被删除,比如会话超时
std::cerr << "Error deleting lock node: " << zerror(ret) << std::endl;
} else {
std::cout << "Released lock." << std::endl;
}
lockNodeName_.clear();
}
}
private:
bool tryAcquireLock() {
std::vector<std::string> children;
if (!getChildren(children)) {
return false;
}
std::sort(children.begin(), children.end());
std::string myNodeName = lockNodeName_.substr(lockPath_.length() + 1); // 提取节点名称,不包含路径
if (children.empty() || children[0] == myNodeName) {
std::cout << "Acquired lock." << std::endl;
return true;
}
// 找到自己的位置
auto it = std::find(children.begin(), children.end(), myNodeName);
if (it == children.end()) {
std::cerr << "Error: Lock node not found in children list." << std::endl;
return false;
}
// 监听前一个节点
if (it != children.begin()) {
std::string previousNode = *std::prev(it);
std::string watchPath = lockPath_ + "/" + previousNode;
std::cout << "Watching node: " << watchPath << std::endl;
int ret = zoo_exists(zk_, watchPath.c_str(), 1, watchCallback); // 1 表示设置watch
if (ret != ZOK && ret != ZNONODE) {
std::cerr << "Error setting watch on previous node: " << zerror(ret) << std::endl;
return false;
}
if(ret == ZNONODE) {
std::cout << "Previous node already gone, retrying lock acquisition" << std::endl;
return tryAcquireLock();
}
// 等待watch触发 (这里为了简化,使用busy waiting,实际应用中应该使用条件变量或者其他机制)
while (!watchTriggered_) {
usleep(10000); // 10ms
}
watchTriggered_ = false;
std::cout << "Watch triggered, retrying lock acquisition" << std::endl;
return tryAcquireLock(); // 重新尝试获取锁
}
return false; // 不应该到达这里
}
bool getChildren(std::vector<std::string>& children) {
String_vector string_vector;
int ret = zoo_get_children(zk_, lockPath_.c_str(), 0, &string_vector);
if (ret != ZOK) {
std::cerr << "Error getting children: " << zerror(ret) << std::endl;
return false;
}
for (int i = 0; i < string_vector.count; ++i) {
children.push_back(string_vector.data[i]);
}
deallocate_String_vector(&string_vector);
return true;
}
static void watchCallback(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) {
ZookeeperLock* lock = static_cast<ZookeeperLock*>(watcherCtx);
if (type == ZOO_DELETED_EVENT) {
std::cout << "Watch triggered for node deletion: " << path << std::endl;
lock->watchTriggered_ = true;
} else {
std::cout << "Unexpected watch event type: " << type << std::endl;
}
}
private:
std::string hosts_;
std::string lockPath_;
std::string lockNodeName_;
zhandle_t* zk_;
std::atomic<bool> watchTriggered_ {false};
};
int main() {
// 初始化ZooKeeper连接和锁路径
ZookeeperLock lock("localhost:2181", "/mylock"); // 确保ZooKeeper服务器运行在localhost:2181,且/mylock存在
// 创建 /mylock 节点,如果不存在
zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
zhandle_t* zk = zookeeper_init("localhost:2181", nullptr, 10000, 0, nullptr, 0);
if (!zk) {
std::cerr << "Error connecting to ZooKeeper." << std::endl;
return 1;
}
if (zoo_exists(zk, "/mylock", 0, nullptr) == ZNONODE) {
int ret = zoo_create(zk, "/mylock", nullptr, 0, &ZOO_OPEN_ACL_UNSAFE, 0, nullptr, 0);
if (ret != ZOK) {
std::cerr << "Error creating lock path: " << zerror(ret) << std::endl;
zookeeper_close(zk);
return 1;
}
}
zookeeper_close(zk);
// 尝试获取锁
if (lock.lock()) {
// 模拟临界区操作
std::cout << "Executing critical section..." << std::endl;
sleep(5); // 模拟耗时操作
std::cout << "Critical section finished." << std::endl;
// 释放锁
lock.unlock();
} else {
std::cerr << "Failed to acquire lock." << std::endl;
}
return 0;
}
代码解释:
ZookeeperLock
类封装了锁的加锁和解锁逻辑。lock()
方法创建临时顺序节点,并尝试获取锁。tryAcquireLock()
方法获取所有子节点,判断自己是否是最小的节点。如果不是,则监听前一个节点的删除事件。unlock()
方法删除自己创建的节点,释放锁。watchCallback
函数是监听器回调函数,当监听的节点被删除时,会触发该函数,重新尝试获取锁。
ZooKeeper锁的优点:
- 可靠性高: ZooKeeper集群保证了锁的可用性。
- 强一致性: ZooKeeper保证了锁的公平性,先到先得。
- 自动释放: 客户端崩溃后,临时节点会被自动删除,避免死锁。
ZooKeeper锁的缺点:
- 性能相对较低: 每次加锁和解锁都需要与ZooKeeper集群通信。
- 实现复杂: 需要自己处理节点创建、监听、排序等逻辑。
第二位锁匠:etcd – 后起之秀的锁匠
etcd,一个分布式键值存储系统,也是Kubernetes集群的核心组件。它在分布式锁方面表现出色,并且比ZooKeeper更简洁。etcd使用lease(租约)和compare-and-swap (CAS)操作来实现锁。
- Lease: 可以理解为一把带有效期的钥匙,过期后自动失效。
- CAS: 原子性的比较并交换操作,保证只有一个客户端能成功修改键值。
etcd锁的原理:
- 创建Lease: 客户端创建一个带有TTL(Time-To-Live)的Lease。
- 加锁: 客户端尝试创建一个键,并将Lease ID绑定到该键上。如果键不存在,则创建成功,获取锁。
- 续租: 客户端定期续租Lease,防止Lease过期。
- 释放锁: 客户端删除该键或Lease过期,释放锁。
C++代码示例 (使用etcd gRPC API):
#include <iostream>
#include <memory>
#include <string>
#include <chrono>
#include <thread>
#include <grpcpp/grpcpp.h>
#include <etcd/Client.hpp>
#include <etcd/KV.hpp>
#include <etcd/Lease.hpp>
#include <etcd/Watch.hpp>
using namespace etcd;
class EtcdLock {
public:
EtcdLock(const std::string& endpoints, const std::string& lockKey, int leaseTTL = 10)
: endpoints_(endpoints), lockKey_(lockKey), leaseTTL_(leaseTTL), leaseId_(0) {}
~EtcdLock() {
unlock();
}
bool lock() {
Client client(endpoints_);
// 1. 创建 Lease
auto leaseResponse = client.lease().grant(leaseTTL_).get();
if (!leaseResponse.is_ok()) {
std::cerr << "Failed to grant lease: " << leaseResponse.error_message() << std::endl;
return false;
}
leaseId_ = leaseResponse.value().ID();
std::cout << "Granted lease with ID: " << leaseId_ << std::endl;
// 2. 续租 Lease (后台线程)
stopLeaseKeepAlive_ = false;
leaseKeepAliveThread_ = std::thread([this, endpoints_]() {
Client keepAliveClient(endpoints_);
while (!stopLeaseKeepAlive_) {
auto keepAliveResponse = keepAliveClient.lease().keepAlive(leaseId_).get();
if (!keepAliveResponse.is_ok()) {
std::cerr << "Failed to keep alive lease: " << keepAliveResponse.error_message() << std::endl;
break; // 退出续租循环
}
std::this_thread::sleep_for(std::chrono::seconds(leaseTTL_ / 2)); // 续租频率要小于TTL
}
std::cout << "Lease keep-alive thread stopped." << std::endl;
});
// 3. 尝试加锁 (CAS)
auto putResponse = client.kv().put(lockKey_, "locked", leaseId_, etcd::PrevExistType::DO_NOT_EXIST).get();
if (!putResponse.is_ok()) {
std::cerr << "Failed to acquire lock: " << putResponse.error_message() << std::endl;
// 释放 lease
stopLeaseKeepAlive_ = true;
if (leaseKeepAliveThread_.joinable()) {
leaseKeepAliveThread_.join();
}
client.lease().revoke(leaseId_).get();
leaseId_ = 0;
return false;
}
std::cout << "Acquired lock." << std::endl;
return true;
}
void unlock() {
if (leaseId_ != 0) {
stopLeaseKeepAlive_ = true;
if (leaseKeepAliveThread_.joinable()) {
leaseKeepAliveThread_.join();
}
Client client(endpoints_);
auto deleteResponse = client.kv().del(lockKey_).get(); //先删除key
if (!deleteResponse.is_ok()) {
std::cerr << "Failed to delete key: " << deleteResponse.error_message() << std::endl;
}
auto revokeResponse = client.lease().revoke(leaseId_).get();
if (!revokeResponse.is_ok()) {
std::cerr << "Failed to revoke lease: " << revokeResponse.error_message() << std::endl;
}
leaseId_ = 0;
std::cout << "Released lock." << std::endl;
}
}
private:
std::string endpoints_;
std::string lockKey_;
int leaseTTL_;
int64_t leaseId_;
std::thread leaseKeepAliveThread_;
std::atomic<bool> stopLeaseKeepAlive_ {false};
};
int main() {
// 初始化 etcd 连接和锁key
EtcdLock lock("http://localhost:2379", "/mylock"); // 确保 etcd 服务器运行在 localhost:2379
// 尝试获取锁
if (lock.lock()) {
// 模拟临界区操作
std::cout << "Executing critical section..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(5)); // 模拟耗时操作
std::cout << "Critical section finished." << std::endl;
// 释放锁
lock.unlock();
} else {
std::cerr << "Failed to acquire lock." << std::endl;
}
return 0;
}
代码解释:
EtcdLock
类封装了锁的加锁和解锁逻辑。lock()
方法创建Lease,然后尝试使用CAS操作创建键。unlock()
方法删除键,并revoke Lease。- 使用一个独立的线程来续租Lease,防止Lease过期。
etcd锁的优点:
- 性能较高: etcd使用gRPC协议,通信效率高。
- 实现简洁: 使用Lease和CAS操作,逻辑简单。
- 自动释放: Lease过期后自动释放锁,避免死锁。
etcd锁的缺点:
- 需要续租: 客户端需要定期续租Lease,增加了复杂度。
- 依赖Lease机制: 如果Lease机制出现问题,可能会导致锁失效。
第三位锁匠:Redis – 速度担当的锁匠
Redis,一个内存数据结构存储系统,以其高性能著称。使用Redis实现分布式锁,速度飞快,但需要注意一些细节。Redis实现分布式锁主要依赖SETNX
(SET if Not eXists)命令和EXPIRE
命令。
- SETNX: 如果键不存在,则设置键的值。
- EXPIRE: 设置键的过期时间。
Redis锁的原理:
- 加锁: 客户端尝试使用
SETNX
命令设置一个键,如果设置成功,则获取锁。 - 设置过期时间: 客户端使用
EXPIRE
命令设置键的过期时间,防止死锁。 - 释放锁: 客户端删除该键,释放锁。
C++代码示例 (使用 hiredis 库):
#include <iostream>
#include <string>
#include <chrono>
#include <thread>
#include <cstdlib> // For random()
#include <hiredis/hiredis.h>
class RedisLock {
public:
RedisLock(const std::string& host, int port, const std::string& lockKey, int lockTimeout = 30)
: host_(host), port_(port), lockKey_(lockKey), lockTimeout_(lockTimeout), redisContext_(nullptr) {}
~RedisLock() {
unlock();
if (redisContext_) {
redisFree(redisContext_);
}
}
bool lock() {
redisContext_ = redisConnect(host_.c_str(), port_);
if (redisContext_ == nullptr || redisContext_->err) {
if (redisContext_) {
std::cerr << "Error connecting to Redis: " << redisContext_->errstr << std::endl;
redisFree(redisContext_);
redisContext_ = nullptr;
} else {
std::cerr << "Error allocating redis context." << std::endl;
}
return false;
}
// 使用 SET key value NX PX milliseconds 实现原子性加锁
std::string value = generateRandomValue(); // 生成一个随机值,用于解锁时验证
lockValue_ = value;
std::string command = "SET " + lockKey_ + " " + value + " NX PX " + std::to_string(lockTimeout_ * 1000);
redisReply* reply = (redisReply*)redisCommand(redisContext_, command.c_str());
if (reply == nullptr) {
std::cerr << "Error executing command: " << redisContext_->errstr << std::endl;
redisFreeReplyObject(reply);
redisFree(redisContext_);
redisContext_ = nullptr;
return false;
}
if (reply->type == REDIS_REPLY_STATUS && strcasecmp(reply->str, "OK") == 0) {
std::cout << "Acquired lock." << std::endl;
freeReplyObject(reply);
return true;
} else {
std::cout << "Failed to acquire lock." << std::endl;
freeReplyObject(reply);
return false;
}
}
void unlock() {
if (redisContext_) {
// 使用 Lua 脚本原子性地验证和删除锁
const char* script =
"if redis.call('get', KEYS[1]) == ARGV[1] thenn"
" return redis.call('del', KEYS[1])n"
"elsen"
" return 0n"
"end";
redisReply* reply = (redisReply*)redisCommand(redisContext_, "EVAL %s 1 %s %s", script, lockKey_.c_str(), lockValue_.c_str());
if (reply == nullptr) {
std::cerr << "Error executing command: " << redisContext_->errstr << std::endl;
redisFree(redisContext_);
redisContext_ = nullptr;
return;
}
if (reply->type == REDIS_REPLY_INTEGER && reply->integer == 1) {
std::cout << "Released lock." << std::endl;
} else {
std::cout << "Failed to release lock (lock may have expired or been released by another client)." << std::endl;
}
freeReplyObject(reply);
}
}
private:
std::string generateRandomValue() {
// 生成一个简单的随机字符串,用于验证锁的持有者
const int length = 16;
const char charset[] = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
std::string result;
result.reserve(length);
for (int i = 0; i < length; ++i) {
int index = rand() % (sizeof(charset) - 1);
result += charset[index];
}
return result;
}
private:
std::string host_;
int port_;
std::string lockKey_;
int lockTimeout_;
std::string lockValue_;
redisContext* redisContext_;
};
int main() {
// 初始化 Redis 连接和锁key
srand(time(0)); // 初始化随机数种子
RedisLock lock("localhost", 6379, "mylock", 10); // 确保 Redis 服务器运行在 localhost:6379
// 尝试获取锁
if (lock.lock()) {
// 模拟临界区操作
std::cout << "Executing critical section..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(5)); // 模拟耗时操作
std::cout << "Critical section finished." << std::endl;
// 释放锁
lock.unlock();
} else {
std::cerr << "Failed to acquire lock." << std::endl;
}
return 0;
}
代码解释:
RedisLock
类封装了锁的加锁和解锁逻辑。lock()
方法使用SET key value NX PX milliseconds
命令原子性地设置键和过期时间。unlock()
方法使用Lua脚本原子性地验证和删除锁,避免误删其他客户端的锁。
Redis锁的优点:
- 性能极高: Redis是内存数据库,速度非常快。
- 实现简单: 使用
SETNX
和EXPIRE
命令,逻辑简单。
Redis锁的缺点:
- 可靠性相对较低: 如果Redis主节点宕机,可能会导致锁失效。可以使用Redis Sentinel或Cluster来提高可靠性。
- 可能出现死锁: 如果客户端在设置过期时间之前崩溃,可能会导致死锁。可以使用Redlock算法来解决这个问题(Redlock比较复杂,这里不展开)。
- 解锁的安全性: 必须验证锁的持有者,否则可能误删其他客户端的锁。 使用Lua脚本可以原子性的进行验证和删除操作。
总结:
特性 | ZooKeeper | etcd | Redis |
---|---|---|---|
一致性 | 强一致性 | 强一致性 | 最终一致性 (需要 Sentinel/Cluster 增强可靠性) |
性能 | 相对较低 | 较高 | 极高 |
可靠性 | 高 | 高 | 相对较低 (需要 Sentinel/Cluster 增强可靠性) |
实现难度 | 复杂 | 简单 | 简单 (但需要注意细节) |
适用场景 | 对可靠性要求极高的场景,例如配置管理、领导者选举 | 对性能有一定要求,且需要配置管理的场景,例如Kubernetes | 对性能要求极高的场景,例如秒杀、抢购 |
选择哪个锁匠,取决于你的需求:
- 追求稳如泰山: 选择ZooKeeper,虽然慢点,但可靠。
- 想要速度与可靠性兼顾: 选择etcd,性能和可靠性都不错。
- 要快如闪电: 选择Redis,但要注意可靠性问题。
记住,没有银弹,只有最适合你的解决方案。希望今天的分享能帮助你找到最合适的“锁”,守护你的数据安全。下次再见!