在当今高度依赖数据的世界中,分布式存储系统已成为支撑现代应用不可或缺的基础设施。这些系统面临的核心挑战之一是如何在面对节点故障、网络分区等不确定性因素时,依然能提供高可用性和数据一致性。Raft 共识协议正是为解决这一问题而生,它提供了一种易于理解且能保证强一致性的日志复制状态机方案。本讲座将深入探讨如何在分布式存储系统的内核中,利用 C++ 的强大能力,高效、可靠地实现 Raft 共识协议,构建一个高可靠的日志复制状态机。
分布式系统中的共识问题与 Raft 协议的基石
分布式系统中的共识(Consensus)问题,旨在让一个集群中的所有节点就某个提议的值达成一致。在分布式存储场景下,这意味着所有节点需要就操作序列(即日志)的顺序和内容达成一致。一旦达成共识,每个节点都能按照相同的顺序执行这些操作,从而维护一个复制的状态机。
早期的 Paxos 协议虽然理论上完美,但其复杂性使得工程实现极具挑战性。Raft 协议的出现,正是为了在保证与 Paxos 相同安全性和活性的前提下,极大地提升了协议的可理解性和实现难度。Raft 协议的核心思想是“首先理解,然后实现”,它通过精心设计的三种角色、两种 RPC 类型和严格的日志复制规则,使得协议逻辑清晰明了。
Raft 的核心概念与角色
Raft 协议定义了三种节点角色:
- Follower (跟随者):被动接收来自 Leader 的日志和心跳。如果长时间未收到 Leader 的消息,Follower 会成为 Candidate。
- Candidate (候选者):在选举超时后,Follower 会转换为 Candidate,发起选举以争取成为新的 Leader。
- Leader (领导者):负责处理所有客户端请求,管理日志复制,并周期性地向 Follower 发送心跳以维持领导地位。一个 Raft 集群在任何时刻只有一个 Leader。
Raft 协议通过一个不断递增的整数 Term (任期) 来逻辑地划分时间。每个 Term 都从一次选举开始,成功当选的 Leader 在该 Term 内领导集群。如果选举失败,或者 Leader 发现有更高 Term 的节点,它将退位。Term 在 Raft 的安全性和活性中扮演着至关重要的角色。
Raft 的核心 RPCs
Raft 协议主要通过两种 RPC (远程过程调用) 来实现其功能:
- RequestVote RPC (请求投票):由 Candidate 发送给其他节点,用于请求投票。
- AppendEntries RPC (附加日志条目):由 Leader 发送给 Follower,用于日志复制和心跳。
这些 RPC 不仅承载数据,还包含 Term 信息,用于确保节点始终遵守最新 Term 的领导。
日志复制状态机
Raft 的核心是复制状态机(Replicated State Machine)。客户端请求被封装成命令(Command),这些命令作为日志条目(Log Entry)附加到 Leader 的日志中。Leader 负责将这些日志条目复制到大多数 Follower 节点上。一旦某个日志条目被大多数节点复制并持久化,它就被认为是 已提交 (Committed)。Leader 会通知 Follower 提交这些条目,然后每个节点按序将已提交的日志条目应用到其本地状态机中,从而保证所有节点的状态最终一致。
为何选择 C++ 实现 Raft?
在分布式存储系统的内核中实现 Raft,对性能、资源控制和稳定性有着极高的要求。C++ 在此场景下展现出无与伦比的优势:
- 极致性能与低延迟:C++ 允许直接内存管理和底层硬件访问,避免了垃圾回收等高级语言的运行时开销,使得 Raft 协议的关键路径(如日志持久化、网络通信、状态机应用)能够以纳秒级的延迟执行。这对于存储系统吞吐量和响应时间至关重要。
- 精细的资源控制:在内核或接近内核的环境中,对内存、CPU 和 I/O 资源的精确控制是必须的。C++ 提供了手动内存管理(
new/delete),以及通过std::unique_ptr/std::shared_ptr实现智能指针,既能保证性能又能兼顾安全性。 - 跨平台兼容性与系统级编程能力:C++ 拥有强大的系统编程能力,能够直接调用操作系统 API,处理文件 I/O、网络套接字、多线程与并发原语,这使得它非常适合构建底层基础设施。
- 丰富的生态系统与成熟工具链:尽管 C++ 语言本身是低级的,但其生态系统提供了大量高性能库,如 Boost.Asio 用于异步网络编程,Protobuf 用于高效序列化,以及各种并发工具。
- 确定性行为:对于分布式共识协议,确定性是关键。C++ 程序的行为在很大程度上是可预测的,有助于调试和理解复杂的并发逻辑。
Raft 协议的 C++ 实现架构设计
一个 Raft 节点的 C++ 实现需要精心设计其内部结构,以高效管理状态、日志、网络通信和并发。
核心数据结构
1. LogEntry
日志条目是 Raft 协议中复制的基本单元。它包含任期、索引和客户端命令。
// raft_node.h
struct LogEntry {
uint64_t term; // 创建该日志条目时的任期
uint64_t index; // 日志条目在日志中的索引 (从1开始)
std::vector<char> command; // 客户端命令的序列化字节
// 其他元数据,例如CRC校验和,用于数据完整性
// 默认构造函数
LogEntry() : term(0), index(0) {}
// 参数化构造函数
LogEntry(uint64_t t, uint64_t idx, const std::vector<char>& cmd)
: term(t), index(idx), command(cmd) {}
// 序列化与反序列化方法 (可能通过 Protobuf 或自定义格式实现)
std::vector<char> serialize() const {
// ... 实现序列化逻辑,例如将term, index, command长度和command数据打包
// 示例:简单的拼接
std::vector<char> buffer;
buffer.resize(sizeof(term) + sizeof(index) + sizeof(uint32_t) + command.size());
size_t offset = 0;
memcpy(buffer.data() + offset, &term, sizeof(term)); offset += sizeof(term);
memcpy(buffer.data() + offset, &index, sizeof(index)); offset += sizeof(index);
uint32_t cmd_size = command.size();
memcpy(buffer.data() + offset, &cmd_size, sizeof(cmd_size)); offset += sizeof(cmd_size);
if (!command.empty()) {
memcpy(buffer.data() + offset, command.data(), command.size());
}
return buffer;
}
static LogEntry deserialize(const std::vector<char>& buffer) {
LogEntry entry;
size_t offset = 0;
if (buffer.size() < sizeof(entry.term) + sizeof(entry.index) + sizeof(uint32_t)) {
// 错误处理:缓冲区过小
return LogEntry();
}
memcpy(&entry.term, buffer.data() + offset, sizeof(entry.term)); offset += sizeof(entry.term);
memcpy(&entry.index, buffer.data() + offset, sizeof(entry.index)); offset += sizeof(entry.index);
uint32_t cmd_size;
memcpy(&cmd_size, buffer.data() + offset, sizeof(cmd_size)); offset += sizeof(cmd_size);
if (offset + cmd_size > buffer.size()) {
// 错误处理:命令数据大小不匹配
return LogEntry();
}
entry.command.assign(buffer.data() + offset, buffer.data() + offset + cmd_size);
return entry;
}
};
2. RaftNode 类
RaftNode 是核心,封装了节点的所有状态、逻辑和行为。
// raft_node.h
#include <vector>
#include <string>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <map>
#include <chrono>
#include <memory>
#include <functional>
// 前向声明RPC相关结构
struct RequestVoteArgs;
struct RequestVoteReply;
struct AppendEntriesArgs;
struct AppendEntriesReply;
// 节点状态
enum class NodeState {
Follower,
Candidate,
Leader
};
class RaftNode {
public:
RaftNode(uint64_t id, const std::vector<std::string>& peer_addrs,
std::function<void(const LogEntry&)> apply_log_callback);
~RaftNode();
void start();
void stop();
// 客户端接口
bool propose(const std::vector<char>& command); // 提议一个新命令
// RPC 处理函数
RequestVoteReply handleRequestVote(const RequestVoteArgs& args);
AppendEntriesReply handleAppendEntries(const AppendEntriesArgs& args);
private:
uint64_t node_id_;
std::vector<std::string> peer_addrs_; // 存储集群中所有节点的地址 (包括自身)
std::map<uint64_t, std::string> peer_id_to_addr_map_; // 节点ID到地址的映射
// 持久化状态 (需要定期写入磁盘)
std::atomic<uint64_t> current_term_; // 节点看到的最新任期
std::atomic<uint64_t> voted_for_; // 在当前任期投票给的候选者ID (0表示未投票)
std::vector<LogEntry> log_; // 日志条目集合
// 易失性状态 (所有服务器)
std::atomic<uint64_t> commit_index_; // 已知最高的已提交日志条目索引
std::atomic<uint64_t> last_applied_; // 已应用到状态机的最高日志条目索引
// 易失性状态 (Leader 特有)
std::map<uint64_t, uint64_t> next_index_; // 对于每个 Follower,Leader 接下来要发送给它的日志条目索引
std::map<uint64_t, uint64_t> match_index_; // 对于每个 Follower,Leader 已知它已复制的最高日志条目索引
// 状态机
NodeState state_;
std::mutex mtx_; // 保护RaftNode内部状态的互斥锁
std::condition_variable cv_; // 用于线程间通信
// 定时器相关
std::thread timer_thread_;
std::atomic<bool> running_;
std::chrono::steady_clock::time_point last_heartbeat_time_; // 上次收到Leader心跳或有效AppendEntries的时间
std::chrono::steady_clock::time_point election_start_time_; // 当前选举的开始时间
std::chrono::milliseconds election_timeout_min_;
std::chrono::milliseconds election_timeout_max_;
std::chrono::milliseconds heartbeat_interval_;
// 网络层接口 (抽象)
std::unique_ptr<RpcServer> rpc_server_;
std::unique_ptr<RpcClientPool> rpc_client_pool_; // 每个peer一个client,或连接池
// 状态机应用回调
std::function<void(const LogEntry&)> apply_log_callback_;
// 内部方法
void run_loop(); // Raft主循环
void become_follower(uint64_t term);
void become_candidate();
void become_leader();
void reset_election_timer();
std::chrono::milliseconds get_random_election_timeout();
void send_append_entries_to_peers(bool is_heartbeat = false);
void send_request_vote_to_peers();
uint64_t get_last_log_index() const;
uint64_t get_last_log_term() const;
void persist_state(); // 持久化 current_term, voted_for, log
void load_state(); // 从磁盘加载状态
// 辅助函数,用于检查日志匹配
bool check_log_match(uint64_t prev_log_index, uint64_t prev_log_term) const;
void apply_committed_entries(); // 将已提交的日志应用到状态机
};
RPC 接口定义
Raft 协议的 RPCs 需要清晰的参数和返回结构,以便于序列化和网络传输。
// raft_rpc_messages.h
// RequestVote RPC
struct RequestVoteArgs {
uint64_t term; // 候选者的任期
uint64_t candidate_id; // 候选者ID
uint64_t last_log_index; // 候选者最新日志条目的索引
uint64_t last_log_term; // 候选者最新日志条目的任期
// 序列化/反序列化方法
std::vector<char> serialize() const { /* ... */ return {}; }
static RequestVoteArgs deserialize(const std::vector<char>& buffer) { /* ... */ return {}; }
};
struct RequestVoteReply {
uint64_t term; // 接收者当前的任期,用于候选者更新自身任期
bool vote_granted; // 如果候选者获得了投票,则为 true
// 序列化/反序列化方法
std::vector<char> serialize() const { /* ... */ return {}; }
static RequestVoteReply deserialize(const std::vector<char>& buffer) { /* ... */ return {}; }
};
// AppendEntries RPC
struct AppendEntriesArgs {
uint64_t term; // Leader 的任期
uint64_t leader_id; // Leader 的 ID
uint64_t prev_log_index; // 紧随新日志条目之前的日志条目索引
uint64_t prev_log_term; // 紧随新日志条目之前的日志条目任期
std::vector<LogEntry> entries; // 待复制的日志条目 (可能为空,用于心跳)
uint64_t leader_commit; // Leader 已提交的最高日志条目索引
// 序列化/反序列化方法
std::vector<char> serialize() const { /* ... */ return {}; }
static AppendEntriesArgs deserialize(const std::vector<char>& buffer) { /* ... */ return {}; }
};
struct AppendEntriesReply {
uint64_t term; // 接收者当前的任期,用于 Leader 更新自身任期
bool success; // 如果 Follower 包含了匹配 prev_log_index 和 prev_log_term 的条目,则为 true
uint64_t conflict_term; // 用于快速回溯,如果 success 为 false 且日志冲突,此为冲突日志的任期
uint64_t conflict_index; // 用于快速回溯,如果 success 为 false 且日志冲突,此为冲突日志在该任期内的第一个索引
// 序列化/反序列化方法
std::vector<char> serialize() const { /* ... */ return {}; }
static AppendEntriesReply deserialize(const std::vector<char>& buffer) { /* ... */ return {}; }
};
序列化/反序列化是网络通信的关键。在实际系统中,通常会使用高性能的序列化库,如 Google Protobuf、FlatBuffers 或 MessagePack。这些库能够将 C++ 对象高效地转换为字节流进行网络传输,并反之。
持久化层设计
Raft 协议要求 current_term、voted_for 和 log 必须是持久化的,以便在节点崩溃重启后能够恢复状态。
// raft_node.cpp (部分)
void RaftNode::persist_state() {
// 实际实现中,这里应该将 current_term_, voted_for_, log_ 写入到持久存储。
// 可以使用文件系统,例如一个专门的日志文件,一个元数据文件。
// 为了性能和原子性,通常会采用 WAL (Write-Ahead Log) 机制。
// 例如:
// 1. 创建一个临时文件
// 2. 将 current_term, voted_for 写入元数据文件
// 3. 将 log_ entries 逐条或批量写入日志文件
// 4. fsync() 确保数据写入物理磁盘
// 5. 原子性地替换旧文件 (rename)
// 简化示例:将状态序列化到内存,实际应写入磁盘
std::vector<char> term_data(sizeof(uint64_t));
uint64_t current_term_val = current_term_.load();
memcpy(term_data.data(), ¤t_term_val, sizeof(uint64_t));
std::vector<char> voted_for_data(sizeof(uint64_t));
uint64_t voted_for_val = voted_for_.load();
memcpy(voted_for_data.data(), &voted_for_val, sizeof(uint64_t));
// 假设有一个持久化存储接口
// persistence_manager_->write_metadata("current_term", term_data);
// persistence_manager_->write_metadata("voted_for", voted_for_data);
// persistence_manager_->write_log_entries(log_);
// LOG(INFO) << "Raft state persisted.";
}
void RaftNode::load_state() {
// 从磁盘加载持久化状态
// 示例:
// std::vector<char> term_data = persistence_manager_->read_metadata("current_term");
// if (!term_data.empty()) {
// uint64_t current_term_val;
// memcpy(¤t_term_val, term_data.data(), sizeof(uint64_t));
// current_term_.store(current_term_val);
// }
// ... 类似地加载 voted_for 和 log
// 简化:假定初始状态
current_term_.store(0);
voted_for_.store(0);
log_.clear();
// 确保日志至少包含一个虚拟的空条目,索引为0,任期为0
// 这有助于简化 prevLogIndex 和 prevLogTerm 的处理
log_.emplace_back(0, 0, std::vector<char>());
// LOG(INFO) << "Raft state loaded or initialized.";
}
日志的持久化通常采用分段文件和索引的方式,以支持高效的随机读写和修剪(例如,当生成快照时)。
网络通信与并发模型
高性能的 Raft 实现离不开高效的网络通信和并发处理。
1. 网络层抽象
为了解耦 Raft 核心逻辑与具体的网络库,我们通常会定义一个抽象的网络接口。
// rpc_interface.h
class RpcServer {
public:
virtual ~RpcServer() = default;
virtual void start() = 0;
virtual void stop() = 0;
// 注册RPC处理函数,例如:
// void register_handler(const std::string& method_name, std::function<std::vector<char>(const std::vector<char>&)> handler);
};
class RpcClient {
public:
virtual ~RpcClient() = default;
virtual std::vector<char> send_request(const std::string& method_name, const std::vector<char>& request_data,
std::chrono::milliseconds timeout) = 0;
};
// 实际实现可以使用 Boost.Asio 或裸露的 socket API
// 例如 Boost.Asio 实现的 RpcServer 和 RpcClient
在 RaftNode 中,rpc_server_ 负责监听传入的 RPC 请求,并将其分派给 handleRequestVote 或 handleAppendEntries 方法。rpc_client_pool_ 负责向其他节点发送 RPC 请求。
2. 并发模型
Raft 协议的事件驱动性质(定时器、RPC 请求)决定了其并发模型。
- 主循环线程:一个专用的线程(或
asio::io_context)运行 Raft 协议的主循环,处理定时器事件(选举超时、心跳),并驱动状态转换。 - RPC 服务线程池:一个线程池处理所有传入的 RPC 请求。每个请求在一个单独的线程中执行其处理逻辑。
- RPC 客户端线程池/异步发送:对于 Leader 向 Follower 发送的
AppendEntries请求,可以采用异步非阻塞的方式,或者使用一个小的线程池来并行发送。
互斥锁与条件变量:RaftNode 的所有共享状态(current_term_, voted_for_, log_, state_, next_index_, match_index_ 等)都必须受到互斥锁(std::mutex mtx_)的保护,以防止数据竞争。条件变量(std::condition_variable cv_)用于在不同线程之间同步,例如当日志被提交时通知应用状态机线程。
// raft_node.cpp (部分)
void RaftNode::run_loop() {
// 这是一个简化版的主循环,实际可能与 Boost.Asio 的 io_context 结合
running_ = true;
reset_election_timer(); // 初始时作为Follower,启动选举计时器
while (running_) {
std::unique_lock<std::mutex> lock(mtx_); // 锁定RaftNode状态
// 等待选举超时或收到外部信号
// 使用条件变量等待,直到超时或被其他事件唤醒
bool timeout = cv_.wait_for(lock, get_random_election_timeout(), [this]() {
// Predicate: 如果收到心跳或有效AppendEntries,则计时器重置
// 实际上这个逻辑会在 handleAppendEntries 中更新 last_heartbeat_time_
// 并且通过 notify_one/all 来唤醒
return !running_ || (state_ == NodeState::Follower &&
std::chrono::steady_clock::now() < last_heartbeat_time_ + get_random_election_timeout());
// 上述Predicate需要更精细的控制,例如,如果handleAppendEntries重置了计时器,
// 那么run_loop应该被唤醒并重新计算等待时间。
// 简化处理:wait_for直接返回超时或被通知
return !running_; // 如果running_变为false,则退出循环
});
if (!running_) {
break; // 停止
}
switch (state_) {
case NodeState::Follower: {
// 如果选举超时,且没有收到Leader的心跳
if (std::chrono::steady_clock::now() > last_heartbeat_time_ + get_random_election_timeout()) {
become_candidate();
}
break;
}
case NodeState::Candidate: {
// 选举超时,再次发起选举
if (std::chrono::steady_clock::now() > election_start_time_ + get_random_election_timeout()) {
become_candidate(); // 再次转为Candidate,开始新的任期和选举
}
break;
}
case NodeState::Leader: {
// Leader 定期发送心跳
if (std::chrono::steady_clock::now() > last_heartbeat_time_ + heartbeat_interval_) {
send_append_entries_to_peers(true); // 发送心跳 (空的AppendEntries)
last_heartbeat_time_ = std::chrono::steady_clock::now();
}
break;
}
}
// 在实际应用中,这里可能会使用更复杂的定时器管理,例如 Boost.Asio 的 `steady_timer`
// 确保每次循环都能处理RPC或定时器事件
lock.unlock(); // 解锁,以便其他线程可以处理RPC请求
std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 避免CPU空转
}
}
实现 Raft 状态机逻辑
1. Follower 状态
- 初始化:所有节点启动时都作为 Follower。
- 选举超时:如果 Follower 在选举超时时间内没有收到来自 Leader 或 Candidate 的有效
AppendEntries或RequestVoteRPC,它将转换为 Candidate 状态并开始一次新的选举。 - RPC 处理:
- RequestVote RPC:
- 如果请求的
term小于当前term,拒绝投票。 - 如果请求的
term等于当前term且已经投票给其他 Candidate,拒绝投票。 - 如果请求的
term大于当前term,或者term相等且尚未投票或已投票给自己,则检查 Candidate 的日志是否至少和自己的日志一样新。- 日志“一样新”的定义:
last_log_term更大,或者last_log_term相等但last_log_index更大。 - 如果日志足够新,投票给 Candidate,更新
current_term和voted_for,并重置选举计时器。
- 日志“一样新”的定义:
- 如果请求的
- AppendEntries RPC:
- 如果请求的
term小于当前term,拒绝。 - 如果请求的
term大于等于当前term,则接受 Leader,更新current_term,并重置选举计时器。 - 检查
prev_log_index和prev_log_term是否与本地日志匹配。- 如果不匹配,返回
success=false,并提供冲突信息以帮助 Leader 快速回溯。 - 如果匹配,则删除从
prev_log_index + 1开始的所有冲突日志条目,然后追加 Leader 发来的新日志条目。
- 如果不匹配,返回
- 更新
commit_index到leader_commit和本地last_log_index的最小值。
- 如果请求的
- RequestVote RPC:
// raft_node.cpp (handleAppendEntries 简化版)
AppendEntriesReply RaftNode::handleAppendEntries(const AppendEntriesArgs& args) {
std::lock_guard<std::mutex> lock(mtx_);
AppendEntriesReply reply;
reply.term = current_term_.load();
reply.success = false;
reply.conflict_term = 0;
reply.conflict_index = 0;
// 1. 如果 Leader 的 Term 小于当前 Term,拒绝
if (args.term < current_term_.load()) {
return reply;
}
// 2. 发现更高 Term 的 Leader,或当前 Leader有效,转换为 Follower
if (args.term > current_term_.load() || state_ != NodeState::Follower) {
become_follower(args.term);
// 如果是新的Leader,需要重置投票信息
if (args.term > current_term_.load()) {
voted_for_.store(0);
}
current_term_.store(args.term); // 更新到Leader的term
reply.term = current_term_.load();
}
reset_election_timer(); // 收到有效心跳或日志,重置选举计时器
// 3. 检查 prev_log_index 和 prev_log_term 是否匹配
// 注意:log_的索引从1开始,log_[0]是虚拟条目
if (args.prev_log_index >= log_.size() || log_[args.prev_log_index].term != args.prev_log_term) {
// 日志不匹配,需要告诉 Leader 回溯
if (args.prev_log_index >= log_.size()) {
reply.conflict_index = log_.size(); // Follower 日志太短
} else {
reply.conflict_term = log_[args.prev_log_index].term;
// 找到该冲突任期的第一个日志条目索引
uint64_t conflict_idx = args.prev_log_index;
while (conflict_idx > 0 && log_[conflict_idx - 1].term == reply.conflict_term) {
conflict_idx--;
}
reply.conflict_index = conflict_idx;
}
return reply;
}
// 4. 如果有新日志条目与现有日志冲突,删除冲突部分
size_t new_entries_start_idx = args.prev_log_index + 1;
size_t num_conflicting_entries = 0;
for (size_t i = 0; i < args.entries.size(); ++i) {
if (new_entries_start_idx + i < log_.size() &&
log_[new_entries_start_idx + i].term != args.entries[i].term) {
num_conflicting_entries = log_.size() - (new_entries_start_idx + i);
log_.erase(log_.begin() + new_entries_start_idx + i, log_.end());
break;
}
}
// 5. 追加 Leader 发来的新日志条目
for (const auto& entry : args.entries) {
// 避免重复添加已存在的日志条目
if (entry.index >= log_.size() || log_[entry.index].term != entry.term) {
// 这里的entry.index可能比log_.size()大,需要填充中间的空洞,但Raft协议保证是连续的
// 实际实现中,如果 prev_log_index + 1 != log_.size(),则存在不一致,需要处理
// 简单处理:直接push_back,假定prev_log_index已正确匹配且冲突已解决
log_.push_back(entry);
}
}
persist_state(); // 日志有变动,持久化状态
// 6. 更新 commit_index
if (args.leader_commit > commit_index_.load()) {
commit_index_.store(std::min(args.leader_commit, get_last_log_index()));
cv_.notify_one(); // 通知应用状态机线程有新的日志可以应用
}
reply.success = true;
return reply;
}
2. Candidate 状态
- 转换:从 Follower 状态因选举超时而转换。
- 选举开始:
- 递增
current_term。 - 投票给自己 (
voted_for = node_id_)。 - 重置选举计时器。
- 向所有其他节点发送
RequestVoteRPC。
- 递增
- RPC 处理:
- AppendEntries RPC:如果收到来自 Leader 的
AppendEntriesRPC (Term 大于等于当前 Term),立即转换回 Follower 状态。 - RequestVote RPC:
- 如果请求的
term大于当前term,则转换回 Follower 状态,并处理该RequestVote。 - 如果请求的
term小于或等于当前term,拒绝投票。
- 如果请求的
- AppendEntries RPC:如果收到来自 Leader 的
- 投票统计:
- 如果收到大多数节点的投票,转换为 Leader 状态。
- 如果在选举超时时间内没有赢得选举,则再次递增
current_term,开始新的选举。
// raft_node.cpp (部分)
void RaftNode::become_candidate() {
state_ = NodeState::Candidate;
current_term_++;
voted_for_.store(node_id_);
persist_state(); // 任期和投票信息改变,持久化
election_start_time_ = std::chrono::steady_clock::now();
// LOG(INFO) << "Node " << node_id_ << " became Candidate for Term " << current_term_.load();
// 启动一个新线程或使用线程池发送 RequestVote RPCs
std::thread([this]() {
send_request_vote_to_peers();
}).detach();
}
void RaftNode::send_request_vote_to_peers() {
RequestVoteArgs args;
args.term = current_term_.load();
args.candidate_id = node_id_;
args.last_log_index = get_last_log_index();
args.last_log_term = get_last_log_term();
std::atomic<int> votes_received(1); // 包含自己的投票
std::atomic<bool> became_leader_or_follower(false);
for (const auto& entry : peer_id_to_addr_map_) {
uint64_t peer_id = entry.first;
if (peer_id == node_id_) continue;
// 异步发送 RequestVote RPC
std::thread([this, peer_id, args, &votes_received, &became_leader_or_follower]() {
// RpcClient::send_request 应该处理网络错误和超时
std::vector<char> reply_data = rpc_client_pool_->get_client(peer_id)->send_request(
"RequestVote", args.serialize(), std::chrono::milliseconds(500));
if (reply_data.empty()) {
// LOG(WARNING) << "RequestVote RPC to " << peer_id << " timed out or failed.";
return;
}
RequestVoteReply reply = RequestVoteReply::deserialize(reply_data);
std::lock_guard<std::mutex> lock(mtx_);
if (state_ != NodeState::Candidate) { // 可能已经变为Leader或Follower
return;
}
if (reply.term > current_term_.load()) {
// 发现更高任期的节点,退回Follower
// LOG(INFO) << "Node " << node_id_ << " found higher term " << reply.term
// << " from " << peer_id << ", stepping down to Follower.";
become_follower(reply.term);
became_leader_or_follower.store(true);
cv_.notify_one(); // 唤醒主循环,重新评估状态
return;
}
if (reply.vote_granted && reply.term == current_term_.load()) {
votes_received++;
if (votes_received.load() > peer_id_to_addr_map_.size() / 2 && !became_leader_or_follower.load()) {
// 赢得多数投票
// LOG(INFO) << "Node " << node_id_ << " won election for Term " << current_term_.load();
become_leader();
became_leader_or_follower.store(true);
cv_.notify_one(); // 唤醒主循环,重新评估状态
}
}
}).detach(); // 使用detach,允许线程独立运行
}
}
3. Leader 状态
- 转换:从 Candidate 状态因赢得选举而转换。
- 初始化:
- 对于每个 Follower,初始化
next_index为last_log_index + 1( Leader 认为 Follower 下一个应该接收的日志索引)。 - 初始化
match_index为 0。 - 立即发送一次心跳 (空的
AppendEntriesRPC) 给所有 Follower。
- 对于每个 Follower,初始化
- 客户端请求:
- 接收客户端命令,将其封装为
LogEntry,附加到自己的日志中。 - 持久化日志。
- 并行向所有 Follower 发送
AppendEntriesRPC。
- 接收客户端命令,将其封装为
- 日志复制:
- Leader 周期性地发送
AppendEntriesRPC (心跳或带有新日志条目)。 - 根据 Follower 返回的
AppendEntriesReply更新next_index和match_index。- 如果
success=true,更新match_index和next_index。 - 如果
success=false(通常是日志不匹配),减少next_index并重试AppendEntries,直到日志匹配。
- 如果
- Leader 周期性地发送
- 提交日志:
- Leader 追踪每个日志条目在大多数 Follower 上被复制的情况。
- 如果存在一个日志条目
N,它在当前term中被 Leader 创建,并且已经被大多数 Follower 复制(即match_index[i] >= N对于大多数i成立),Leader 将commit_index更新为N。 - 然后 Leader 通知 Follower 提交该条目。
- 退位:如果 Leader 发现有更高
term的 RPC (例如来自新 Leader 的AppendEntries),它将立即转换回 Follower 状态。
// raft_node.cpp (部分)
void RaftNode::become_leader() {
state_ = NodeState::Leader;
// LOG(INFO) << "Node " << node_id_ << " became Leader for Term " << current_term_.load();
// 初始化 Leader 状态
uint64_t last_log_idx = get_last_log_index();
for (const auto& entry : peer_id_to_addr_map_) {
uint64_t peer_id = entry.first;
if (peer_id == node_id_) continue;
next_index_[peer_id] = last_log_idx + 1;
match_index_[peer_id] = 0;
}
// 立即发送一次心跳
send_append_entries_to_peers(true);
last_heartbeat_time_ = std::chrono::steady_clock::now();
}
// 客户端调用此函数向 Leader 提议一个命令
bool RaftNode::propose(const std::vector<char>& command) {
std::lock_guard<std::mutex> lock(mtx_);
if (state_ != NodeState::Leader) {
// 客户端需要知道当前的 Leader,或 Leader 转发请求
// LOG(WARNING) << "Node " << node_id_ << " is not Leader, cannot propose command.";
return false;
}
uint64_t new_log_index = get_last_log_index() + 1;
log_.emplace_back(current_term_.load(), new_log_index, command);
persist_state(); // 新日志条目需要持久化
// LOG(INFO) << "Leader " << node_id_ << " proposed command at index " << new_log_index;
// 立即向所有 Follower 复制新日志
send_append_entries_to_peers(false); // 不是心跳,带有新日志
// Leader 还需要等待日志被多数复制并提交,这里可能需要一个异步等待机制
// 例如,返回一个Future,客户端可以等待Future完成
// 简化:假设这里只是发起复制,不等待结果立即返回
return true;
}
void RaftNode::send_append_entries_to_peers(bool is_heartbeat) {
// 异步发送 AppendEntries RPC 到所有 Follower
for (const auto& entry : peer_id_to_addr_map_) {
uint64_t peer_id = entry.first;
if (peer_id == node_id_) continue;
std::thread([this, peer_id, is_heartbeat]() {
std::lock_guard<std::mutex> lock(mtx_); // 锁定以访问 next_index_, match_index_, log_
AppendEntriesArgs args;
args.term = current_term_.load();
args.leader_id = node_id_;
args.leader_commit = commit_index_.load();
uint64_t next_idx_for_peer = next_index_[peer_id];
// 如果 next_idx_for_peer 是0,说明是初始状态或者需要从头开始发送
// 正常情况下,next_idx_for_peer至少为1(对应虚拟条目之后的第一个实际条目)
if (next_idx_for_peer == 0) { // 应该是 last_log_index + 1
next_idx_for_peer = 1; // 假定从第一个实际日志条目开始
}
// prev_log_index 是 next_index_for_peer - 1
args.prev_log_index = next_idx_for_peer - 1;
args.prev_log_term = (args.prev_log_index > 0 && args.prev_log_index < log_.size()) ?
log_[args.prev_log_index].term : 0;
// 如果是心跳,entries为空;否则,发送从 next_idx_for_peer 开始的所有日志条目
if (!is_heartbeat) {
for (size_t i = next_idx_for_peer; i < log_.size(); ++i) {
args.entries.push_back(log_[i]);
}
}
// 发送 RPC
std::vector<char> reply_data = rpc_client_pool_->get_client(peer_id)->send_request(
"AppendEntries", args.serialize(), std::chrono::milliseconds(500));
if (reply_data.empty()) {
// LOG(WARNING) << "AppendEntries RPC to " << peer_id << " timed out or failed.";
return;
}
AppendEntriesReply reply = AppendEntriesReply::deserialize(reply_data);
// 处理回复
std::lock_guard<std::mutex> lock_reply(mtx_); // 再次锁定,防止与主循环冲突
if (state_ != NodeState::Leader) { // 可能在发送期间已经退位
return;
}
if (reply.term > current_term_.load()) {
// 发现更高任期的节点,退回Follower
become_follower(reply.term);
cv_.notify_one();
return;
}
if (reply.success) {
// 成功复制,更新 next_index 和 match_index
// 成功复制的日志条目数量是 args.entries.size()
// match_index 应该是 prev_log_index + entries.size()
match_index_[peer_id] = args.prev_log_index + args.entries.size();
next_index_[peer_id] = match_index_[peer_id] + 1;
} else {
// 日志不匹配,需要回溯 next_index
// 利用 reply.conflict_term 和 reply.conflict_index 快速回溯
if (reply.conflict_term != 0) {
// 尝试在 Leader 日志中找到冲突任期的最后一个条目
uint64_t idx = get_last_log_index();
while (idx > 0 && log_[idx].term > reply.conflict_term) {
idx--;
}
if (idx == 0 || log_[idx].term != reply.conflict_term) {
// Leader 日志中没有该任期,或者该任期早于冲突任期,直接回溯到冲突索引
next_index_[peer_id] = reply.conflict_index;
} else {
// 找到冲突任期,从该任期最后一个条目之后开始发送
next_index_[peer_id] = idx + 1;
}
} else {
// 默认回溯一个条目
next_index_[peer_id] = std::max(1ULL, next_index_[peer_id] - 1);
}
// LOG(INFO) << "Leader " << node_id_ << " failed to append to " << peer_id
// << ", trying to backtrack to next_index " << next_index_[peer_id];
}
// 检查是否可以提交新的日志条目
// 从当前的 commit_index + 1 开始,尝试提交更高索引的日志
uint64_t current_commit_idx = commit_index_.load();
for (uint64_t n = get_last_log_index(); n > current_commit_idx; --n) {
// 只有当前任期的日志条目才能通过计数方式提交
if (log_[n].term != current_term_.load()) {
continue;
}
int replicas = 1; // Leader 自己也有一份
for (const auto& kv : match_index_) {
if (kv.first != node_id_ && kv.second >= n) {
replicas++;
}
}
if (replicas > peer_id_to_addr_map_.size() / 2) {
commit_index_.store(n);
cv_.notify_one(); // 通知应用状态机
// LOG(INFO) << "Leader " << node_id_ << " committed log " << n;
break; // 每次只提交一个最高索引
}
}
}).detach();
}
}
状态机应用
一个独立的线程负责将 commit_index 到 last_applied 之间的日志条目应用到实际的状态机中。
// raft_node.cpp (部分)
void RaftNode::apply_committed_entries() {
while (running_) {
std::unique_lock<std::mutex> lock(mtx_);
// 等待直到有新的日志可以应用,或者停止
cv_.wait(lock, [this]() {
return !running_ || commit_index_.load() > last_applied_.load();
});
if (!running_) {
break;
}
while (last_applied_.load() < commit_index_.load()) {
last_applied_++;
LogEntry entry_to_apply = log_[last_applied_.load()];
lock.unlock(); // 解锁,避免在执行用户回调时阻塞 Raft 核心逻辑
apply_log_callback_(entry_to_apply); // 调用用户提供的回调,应用到存储状态机
lock.lock(); // 重新锁定
}
}
}
挑战与高级主题
1. 日志压缩 (Snapshotting)
随着时间推移,Raft 日志会不断增长,消耗大量存储空间并延长启动时间。日志压缩通过创建快照来解决此问题。快照包含当前状态机的完整状态,以及最新的 last_included_index 和 last_included_term。创建快照后,快照之前的所有日志条目都可以被丢弃。Leader 需要将快照发送给落后的 Follower。
2. 成员变更
动态地添加或移除集群成员是一个复杂的问题。Raft 协议通过两阶段提交的方式安全地处理集群成员变更。Leader 首先将一个包含新旧配置的联合配置条目复制到集群中,待其提交后,再提交一个只包含新配置的条目。
3. 客户端交互
客户端需要知道当前的 Leader 才能发送请求。常见的做法是,客户端随机向一个节点发送请求。如果该节点不是 Leader,它会返回 Leader 的地址(或者一个错误,让客户端重试),客户端缓存 Leader 信息。
4. 性能优化
- 批量提交:将多个客户端请求打包成一个日志条目,减少 RPC 次数和磁盘 I/O。
- 并行复制:Leader 可以并行地向多个 Follower 发送
AppendEntriesRPC。 - 零拷贝:在日志持久化和网络传输中尽量使用零拷贝技术,减少数据在内存中的复制。
5. 容错与测试
- 故障注入:模拟网络分区、节点崩溃、消息丢失/乱序等故障,验证 Raft 实现的健壮性。
- 线性一致性验证:确保所有客户端读写操作都满足线性一致性语义。
结语
在分布式存储系统内核中利用 C++ 实现 Raft 共识协议,是一项充满挑战但极具价值的工作。它要求开发者不仅精通 Raft 协议的每一个细节,还需要深入理解 C++ 的性能特性、并发模型和系统级编程。通过本文的探讨,我们希望能够为读者勾勒出一条清晰的实现路径,并强调在追求极致性能和高可靠性时,C++ 依然是构建此类核心基础设施的卓越选择。一个健壮的 Raft 实现,是分布式存储系统实现高可用、强一致性和数据持久化的基石。