C++ 与 Raft 共识协议:在分布式存储系统内核中利用 C++ 实现高可靠日志复制状态机

在当今高度依赖数据的世界中,分布式存储系统已成为支撑现代应用不可或缺的基础设施。这些系统面临的核心挑战之一是如何在面对节点故障、网络分区等不确定性因素时,依然能提供高可用性和数据一致性。Raft 共识协议正是为解决这一问题而生,它提供了一种易于理解且能保证强一致性的日志复制状态机方案。本讲座将深入探讨如何在分布式存储系统的内核中,利用 C++ 的强大能力,高效、可靠地实现 Raft 共识协议,构建一个高可靠的日志复制状态机。

分布式系统中的共识问题与 Raft 协议的基石

分布式系统中的共识(Consensus)问题,旨在让一个集群中的所有节点就某个提议的值达成一致。在分布式存储场景下,这意味着所有节点需要就操作序列(即日志)的顺序和内容达成一致。一旦达成共识,每个节点都能按照相同的顺序执行这些操作,从而维护一个复制的状态机。

早期的 Paxos 协议虽然理论上完美,但其复杂性使得工程实现极具挑战性。Raft 协议的出现,正是为了在保证与 Paxos 相同安全性和活性的前提下,极大地提升了协议的可理解性和实现难度。Raft 协议的核心思想是“首先理解,然后实现”,它通过精心设计的三种角色、两种 RPC 类型和严格的日志复制规则,使得协议逻辑清晰明了。

Raft 的核心概念与角色

Raft 协议定义了三种节点角色:

  1. Follower (跟随者):被动接收来自 Leader 的日志和心跳。如果长时间未收到 Leader 的消息,Follower 会成为 Candidate。
  2. Candidate (候选者):在选举超时后,Follower 会转换为 Candidate,发起选举以争取成为新的 Leader。
  3. Leader (领导者):负责处理所有客户端请求,管理日志复制,并周期性地向 Follower 发送心跳以维持领导地位。一个 Raft 集群在任何时刻只有一个 Leader。

Raft 协议通过一个不断递增的整数 Term (任期) 来逻辑地划分时间。每个 Term 都从一次选举开始,成功当选的 Leader 在该 Term 内领导集群。如果选举失败,或者 Leader 发现有更高 Term 的节点,它将退位。Term 在 Raft 的安全性和活性中扮演着至关重要的角色。

Raft 的核心 RPCs

Raft 协议主要通过两种 RPC (远程过程调用) 来实现其功能:

  1. RequestVote RPC (请求投票):由 Candidate 发送给其他节点,用于请求投票。
  2. AppendEntries RPC (附加日志条目):由 Leader 发送给 Follower,用于日志复制和心跳。

这些 RPC 不仅承载数据,还包含 Term 信息,用于确保节点始终遵守最新 Term 的领导。

日志复制状态机

Raft 的核心是复制状态机(Replicated State Machine)。客户端请求被封装成命令(Command),这些命令作为日志条目(Log Entry)附加到 Leader 的日志中。Leader 负责将这些日志条目复制到大多数 Follower 节点上。一旦某个日志条目被大多数节点复制并持久化,它就被认为是 已提交 (Committed)。Leader 会通知 Follower 提交这些条目,然后每个节点按序将已提交的日志条目应用到其本地状态机中,从而保证所有节点的状态最终一致。

为何选择 C++ 实现 Raft?

在分布式存储系统的内核中实现 Raft,对性能、资源控制和稳定性有着极高的要求。C++ 在此场景下展现出无与伦比的优势:

  1. 极致性能与低延迟:C++ 允许直接内存管理和底层硬件访问,避免了垃圾回收等高级语言的运行时开销,使得 Raft 协议的关键路径(如日志持久化、网络通信、状态机应用)能够以纳秒级的延迟执行。这对于存储系统吞吐量和响应时间至关重要。
  2. 精细的资源控制:在内核或接近内核的环境中,对内存、CPU 和 I/O 资源的精确控制是必须的。C++ 提供了手动内存管理(new/delete),以及通过 std::unique_ptr/std::shared_ptr 实现智能指针,既能保证性能又能兼顾安全性。
  3. 跨平台兼容性与系统级编程能力:C++ 拥有强大的系统编程能力,能够直接调用操作系统 API,处理文件 I/O、网络套接字、多线程与并发原语,这使得它非常适合构建底层基础设施。
  4. 丰富的生态系统与成熟工具链:尽管 C++ 语言本身是低级的,但其生态系统提供了大量高性能库,如 Boost.Asio 用于异步网络编程,Protobuf 用于高效序列化,以及各种并发工具。
  5. 确定性行为:对于分布式共识协议,确定性是关键。C++ 程序的行为在很大程度上是可预测的,有助于调试和理解复杂的并发逻辑。

Raft 协议的 C++ 实现架构设计

一个 Raft 节点的 C++ 实现需要精心设计其内部结构,以高效管理状态、日志、网络通信和并发。

核心数据结构

1. LogEntry

日志条目是 Raft 协议中复制的基本单元。它包含任期、索引和客户端命令。

// raft_node.h
struct LogEntry {
    uint64_t term;      // 创建该日志条目时的任期
    uint64_t index;     // 日志条目在日志中的索引 (从1开始)
    std::vector<char> command; // 客户端命令的序列化字节
    // 其他元数据,例如CRC校验和,用于数据完整性

    // 默认构造函数
    LogEntry() : term(0), index(0) {}

    // 参数化构造函数
    LogEntry(uint64_t t, uint64_t idx, const std::vector<char>& cmd)
        : term(t), index(idx), command(cmd) {}

    // 序列化与反序列化方法 (可能通过 Protobuf 或自定义格式实现)
    std::vector<char> serialize() const {
        // ... 实现序列化逻辑,例如将term, index, command长度和command数据打包
        // 示例:简单的拼接
        std::vector<char> buffer;
        buffer.resize(sizeof(term) + sizeof(index) + sizeof(uint32_t) + command.size());
        size_t offset = 0;
        memcpy(buffer.data() + offset, &term, sizeof(term)); offset += sizeof(term);
        memcpy(buffer.data() + offset, &index, sizeof(index)); offset += sizeof(index);
        uint32_t cmd_size = command.size();
        memcpy(buffer.data() + offset, &cmd_size, sizeof(cmd_size)); offset += sizeof(cmd_size);
        if (!command.empty()) {
            memcpy(buffer.data() + offset, command.data(), command.size());
        }
        return buffer;
    }

    static LogEntry deserialize(const std::vector<char>& buffer) {
        LogEntry entry;
        size_t offset = 0;
        if (buffer.size() < sizeof(entry.term) + sizeof(entry.index) + sizeof(uint32_t)) {
            // 错误处理:缓冲区过小
            return LogEntry();
        }
        memcpy(&entry.term, buffer.data() + offset, sizeof(entry.term)); offset += sizeof(entry.term);
        memcpy(&entry.index, buffer.data() + offset, sizeof(entry.index)); offset += sizeof(entry.index);
        uint32_t cmd_size;
        memcpy(&cmd_size, buffer.data() + offset, sizeof(cmd_size)); offset += sizeof(cmd_size);
        if (offset + cmd_size > buffer.size()) {
            // 错误处理:命令数据大小不匹配
            return LogEntry();
        }
        entry.command.assign(buffer.data() + offset, buffer.data() + offset + cmd_size);
        return entry;
    }
};

2. RaftNode 类

RaftNode 是核心,封装了节点的所有状态、逻辑和行为。

// raft_node.h
#include <vector>
#include <string>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <map>
#include <chrono>
#include <memory>
#include <functional>

// 前向声明RPC相关结构
struct RequestVoteArgs;
struct RequestVoteReply;
struct AppendEntriesArgs;
struct AppendEntriesReply;

// 节点状态
enum class NodeState {
    Follower,
    Candidate,
    Leader
};

class RaftNode {
public:
    RaftNode(uint64_t id, const std::vector<std::string>& peer_addrs,
             std::function<void(const LogEntry&)> apply_log_callback);
    ~RaftNode();

    void start();
    void stop();

    // 客户端接口
    bool propose(const std::vector<char>& command); // 提议一个新命令

    // RPC 处理函数
    RequestVoteReply handleRequestVote(const RequestVoteArgs& args);
    AppendEntriesReply handleAppendEntries(const AppendEntriesArgs& args);

private:
    uint64_t node_id_;
    std::vector<std::string> peer_addrs_; // 存储集群中所有节点的地址 (包括自身)
    std::map<uint64_t, std::string> peer_id_to_addr_map_; // 节点ID到地址的映射

    // 持久化状态 (需要定期写入磁盘)
    std::atomic<uint64_t> current_term_; // 节点看到的最新任期
    std::atomic<uint64_t> voted_for_;    // 在当前任期投票给的候选者ID (0表示未投票)
    std::vector<LogEntry> log_;          // 日志条目集合

    // 易失性状态 (所有服务器)
    std::atomic<uint64_t> commit_index_; // 已知最高的已提交日志条目索引
    std::atomic<uint64_t> last_applied_; // 已应用到状态机的最高日志条目索引

    // 易失性状态 (Leader 特有)
    std::map<uint64_t, uint64_t> next_index_;  // 对于每个 Follower,Leader 接下来要发送给它的日志条目索引
    std::map<uint64_t, uint64_t> match_index_; // 对于每个 Follower,Leader 已知它已复制的最高日志条目索引

    // 状态机
    NodeState state_;
    std::mutex mtx_; // 保护RaftNode内部状态的互斥锁
    std::condition_variable cv_; // 用于线程间通信

    // 定时器相关
    std::thread timer_thread_;
    std::atomic<bool> running_;
    std::chrono::steady_clock::time_point last_heartbeat_time_; // 上次收到Leader心跳或有效AppendEntries的时间
    std::chrono::steady_clock::time_point election_start_time_; // 当前选举的开始时间
    std::chrono::milliseconds election_timeout_min_;
    std::chrono::milliseconds election_timeout_max_;
    std::chrono::milliseconds heartbeat_interval_;

    // 网络层接口 (抽象)
    std::unique_ptr<RpcServer> rpc_server_;
    std::unique_ptr<RpcClientPool> rpc_client_pool_; // 每个peer一个client,或连接池

    // 状态机应用回调
    std::function<void(const LogEntry&)> apply_log_callback_;

    // 内部方法
    void run_loop(); // Raft主循环
    void become_follower(uint64_t term);
    void become_candidate();
    void become_leader();

    void reset_election_timer();
    std::chrono::milliseconds get_random_election_timeout();

    void send_append_entries_to_peers(bool is_heartbeat = false);
    void send_request_vote_to_peers();

    uint64_t get_last_log_index() const;
    uint64_t get_last_log_term() const;

    void persist_state(); // 持久化 current_term, voted_for, log
    void load_state();    // 从磁盘加载状态

    // 辅助函数,用于检查日志匹配
    bool check_log_match(uint64_t prev_log_index, uint64_t prev_log_term) const;
    void apply_committed_entries(); // 将已提交的日志应用到状态机
};

RPC 接口定义

Raft 协议的 RPCs 需要清晰的参数和返回结构,以便于序列化和网络传输。

// raft_rpc_messages.h
// RequestVote RPC
struct RequestVoteArgs {
    uint64_t term;         // 候选者的任期
    uint64_t candidate_id; // 候选者ID
    uint64_t last_log_index; // 候选者最新日志条目的索引
    uint64_t last_log_term;  // 候选者最新日志条目的任期

    // 序列化/反序列化方法
    std::vector<char> serialize() const { /* ... */ return {}; }
    static RequestVoteArgs deserialize(const std::vector<char>& buffer) { /* ... */ return {}; }
};

struct RequestVoteReply {
    uint64_t term;       // 接收者当前的任期,用于候选者更新自身任期
    bool vote_granted;   // 如果候选者获得了投票,则为 true

    // 序列化/反序列化方法
    std::vector<char> serialize() const { /* ... */ return {}; }
    static RequestVoteReply deserialize(const std::vector<char>& buffer) { /* ... */ return {}; }
};

// AppendEntries RPC
struct AppendEntriesArgs {
    uint64_t term;         // Leader 的任期
    uint64_t leader_id;    // Leader 的 ID
    uint64_t prev_log_index; // 紧随新日志条目之前的日志条目索引
    uint64_t prev_log_term;  // 紧随新日志条目之前的日志条目任期
    std::vector<LogEntry> entries; // 待复制的日志条目 (可能为空,用于心跳)
    uint64_t leader_commit; // Leader 已提交的最高日志条目索引

    // 序列化/反序列化方法
    std::vector<char> serialize() const { /* ... */ return {}; }
    static AppendEntriesArgs deserialize(const std::vector<char>& buffer) { /* ... */ return {}; }
};

struct AppendEntriesReply {
    uint64_t term;    // 接收者当前的任期,用于 Leader 更新自身任期
    bool success;     // 如果 Follower 包含了匹配 prev_log_index 和 prev_log_term 的条目,则为 true
    uint64_t conflict_term; // 用于快速回溯,如果 success 为 false 且日志冲突,此为冲突日志的任期
    uint64_t conflict_index; // 用于快速回溯,如果 success 为 false 且日志冲突,此为冲突日志在该任期内的第一个索引

    // 序列化/反序列化方法
    std::vector<char> serialize() const { /* ... */ return {}; }
    static AppendEntriesReply deserialize(const std::vector<char>& buffer) { /* ... */ return {}; }
};

序列化/反序列化是网络通信的关键。在实际系统中,通常会使用高性能的序列化库,如 Google Protobuf、FlatBuffers 或 MessagePack。这些库能够将 C++ 对象高效地转换为字节流进行网络传输,并反之。

持久化层设计

Raft 协议要求 current_termvoted_forlog 必须是持久化的,以便在节点崩溃重启后能够恢复状态。

// raft_node.cpp (部分)
void RaftNode::persist_state() {
    // 实际实现中,这里应该将 current_term_, voted_for_, log_ 写入到持久存储。
    // 可以使用文件系统,例如一个专门的日志文件,一个元数据文件。
    // 为了性能和原子性,通常会采用 WAL (Write-Ahead Log) 机制。
    // 例如:
    // 1. 创建一个临时文件
    // 2. 将 current_term, voted_for 写入元数据文件
    // 3. 将 log_ entries 逐条或批量写入日志文件
    // 4. fsync() 确保数据写入物理磁盘
    // 5. 原子性地替换旧文件 (rename)

    // 简化示例:将状态序列化到内存,实际应写入磁盘
    std::vector<char> term_data(sizeof(uint64_t));
    uint64_t current_term_val = current_term_.load();
    memcpy(term_data.data(), &current_term_val, sizeof(uint64_t));

    std::vector<char> voted_for_data(sizeof(uint64_t));
    uint64_t voted_for_val = voted_for_.load();
    memcpy(voted_for_data.data(), &voted_for_val, sizeof(uint64_t));

    // 假设有一个持久化存储接口
    // persistence_manager_->write_metadata("current_term", term_data);
    // persistence_manager_->write_metadata("voted_for", voted_for_data);
    // persistence_manager_->write_log_entries(log_);
    // LOG(INFO) << "Raft state persisted.";
}

void RaftNode::load_state() {
    // 从磁盘加载持久化状态
    // 示例:
    // std::vector<char> term_data = persistence_manager_->read_metadata("current_term");
    // if (!term_data.empty()) {
    //     uint64_t current_term_val;
    //     memcpy(&current_term_val, term_data.data(), sizeof(uint64_t));
    //     current_term_.store(current_term_val);
    // }
    // ... 类似地加载 voted_for 和 log

    // 简化:假定初始状态
    current_term_.store(0);
    voted_for_.store(0);
    log_.clear();
    // 确保日志至少包含一个虚拟的空条目,索引为0,任期为0
    // 这有助于简化 prevLogIndex 和 prevLogTerm 的处理
    log_.emplace_back(0, 0, std::vector<char>()); 
    // LOG(INFO) << "Raft state loaded or initialized.";
}

日志的持久化通常采用分段文件和索引的方式,以支持高效的随机读写和修剪(例如,当生成快照时)。

网络通信与并发模型

高性能的 Raft 实现离不开高效的网络通信和并发处理。

1. 网络层抽象

为了解耦 Raft 核心逻辑与具体的网络库,我们通常会定义一个抽象的网络接口。

// rpc_interface.h
class RpcServer {
public:
    virtual ~RpcServer() = default;
    virtual void start() = 0;
    virtual void stop() = 0;
    // 注册RPC处理函数,例如:
    // void register_handler(const std::string& method_name, std::function<std::vector<char>(const std::vector<char>&)> handler);
};

class RpcClient {
public:
    virtual ~RpcClient() = default;
    virtual std::vector<char> send_request(const std::string& method_name, const std::vector<char>& request_data,
                                          std::chrono::milliseconds timeout) = 0;
};

// 实际实现可以使用 Boost.Asio 或裸露的 socket API
// 例如 Boost.Asio 实现的 RpcServer 和 RpcClient

RaftNode 中,rpc_server_ 负责监听传入的 RPC 请求,并将其分派给 handleRequestVotehandleAppendEntries 方法。rpc_client_pool_ 负责向其他节点发送 RPC 请求。

2. 并发模型

Raft 协议的事件驱动性质(定时器、RPC 请求)决定了其并发模型。

  • 主循环线程:一个专用的线程(或 asio::io_context)运行 Raft 协议的主循环,处理定时器事件(选举超时、心跳),并驱动状态转换。
  • RPC 服务线程池:一个线程池处理所有传入的 RPC 请求。每个请求在一个单独的线程中执行其处理逻辑。
  • RPC 客户端线程池/异步发送:对于 Leader 向 Follower 发送的 AppendEntries 请求,可以采用异步非阻塞的方式,或者使用一个小的线程池来并行发送。

互斥锁与条件变量RaftNode 的所有共享状态(current_term_, voted_for_, log_, state_, next_index_, match_index_ 等)都必须受到互斥锁(std::mutex mtx_)的保护,以防止数据竞争。条件变量(std::condition_variable cv_)用于在不同线程之间同步,例如当日志被提交时通知应用状态机线程。

// raft_node.cpp (部分)
void RaftNode::run_loop() {
    // 这是一个简化版的主循环,实际可能与 Boost.Asio 的 io_context 结合
    running_ = true;
    reset_election_timer(); // 初始时作为Follower,启动选举计时器

    while (running_) {
        std::unique_lock<std::mutex> lock(mtx_); // 锁定RaftNode状态

        // 等待选举超时或收到外部信号
        // 使用条件变量等待,直到超时或被其他事件唤醒
        bool timeout = cv_.wait_for(lock, get_random_election_timeout(), [this]() {
            // Predicate: 如果收到心跳或有效AppendEntries,则计时器重置
            // 实际上这个逻辑会在 handleAppendEntries 中更新 last_heartbeat_time_
            // 并且通过 notify_one/all 来唤醒
            return !running_ || (state_ == NodeState::Follower &&
                                 std::chrono::steady_clock::now() < last_heartbeat_time_ + get_random_election_timeout());
            // 上述Predicate需要更精细的控制,例如,如果handleAppendEntries重置了计时器,
            // 那么run_loop应该被唤醒并重新计算等待时间。
            // 简化处理:wait_for直接返回超时或被通知
            return !running_; // 如果running_变为false,则退出循环
        });

        if (!running_) {
            break; // 停止
        }

        switch (state_) {
            case NodeState::Follower: {
                // 如果选举超时,且没有收到Leader的心跳
                if (std::chrono::steady_clock::now() > last_heartbeat_time_ + get_random_election_timeout()) {
                    become_candidate();
                }
                break;
            }
            case NodeState::Candidate: {
                // 选举超时,再次发起选举
                if (std::chrono::steady_clock::now() > election_start_time_ + get_random_election_timeout()) {
                    become_candidate(); // 再次转为Candidate,开始新的任期和选举
                }
                break;
            }
            case NodeState::Leader: {
                // Leader 定期发送心跳
                if (std::chrono::steady_clock::now() > last_heartbeat_time_ + heartbeat_interval_) {
                    send_append_entries_to_peers(true); // 发送心跳 (空的AppendEntries)
                    last_heartbeat_time_ = std::chrono::steady_clock::now();
                }
                break;
            }
        }
        // 在实际应用中,这里可能会使用更复杂的定时器管理,例如 Boost.Asio 的 `steady_timer`
        // 确保每次循环都能处理RPC或定时器事件
        lock.unlock(); // 解锁,以便其他线程可以处理RPC请求
        std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 避免CPU空转
    }
}

实现 Raft 状态机逻辑

1. Follower 状态

  • 初始化:所有节点启动时都作为 Follower。
  • 选举超时:如果 Follower 在选举超时时间内没有收到来自 Leader 或 Candidate 的有效 AppendEntriesRequestVote RPC,它将转换为 Candidate 状态并开始一次新的选举。
  • RPC 处理
    • RequestVote RPC
      • 如果请求的 term 小于当前 term,拒绝投票。
      • 如果请求的 term 等于当前 term 且已经投票给其他 Candidate,拒绝投票。
      • 如果请求的 term 大于当前 term,或者 term 相等且尚未投票或已投票给自己,则检查 Candidate 的日志是否至少和自己的日志一样新。
        • 日志“一样新”的定义:last_log_term 更大,或者 last_log_term 相等但 last_log_index 更大。
        • 如果日志足够新,投票给 Candidate,更新 current_termvoted_for,并重置选举计时器。
    • AppendEntries RPC
      • 如果请求的 term 小于当前 term,拒绝。
      • 如果请求的 term 大于等于当前 term,则接受 Leader,更新 current_term,并重置选举计时器。
      • 检查 prev_log_indexprev_log_term 是否与本地日志匹配。
        • 如果不匹配,返回 success=false,并提供冲突信息以帮助 Leader 快速回溯。
        • 如果匹配,则删除从 prev_log_index + 1 开始的所有冲突日志条目,然后追加 Leader 发来的新日志条目。
      • 更新 commit_indexleader_commit 和本地 last_log_index 的最小值。
// raft_node.cpp (handleAppendEntries 简化版)
AppendEntriesReply RaftNode::handleAppendEntries(const AppendEntriesArgs& args) {
    std::lock_guard<std::mutex> lock(mtx_);
    AppendEntriesReply reply;
    reply.term = current_term_.load();
    reply.success = false;
    reply.conflict_term = 0;
    reply.conflict_index = 0;

    // 1. 如果 Leader 的 Term 小于当前 Term,拒绝
    if (args.term < current_term_.load()) {
        return reply;
    }

    // 2. 发现更高 Term 的 Leader,或当前 Leader有效,转换为 Follower
    if (args.term > current_term_.load() || state_ != NodeState::Follower) {
        become_follower(args.term);
        // 如果是新的Leader,需要重置投票信息
        if (args.term > current_term_.load()) {
            voted_for_.store(0);
        }
        current_term_.store(args.term); // 更新到Leader的term
        reply.term = current_term_.load();
    }

    reset_election_timer(); // 收到有效心跳或日志,重置选举计时器

    // 3. 检查 prev_log_index 和 prev_log_term 是否匹配
    // 注意:log_的索引从1开始,log_[0]是虚拟条目
    if (args.prev_log_index >= log_.size() || log_[args.prev_log_index].term != args.prev_log_term) {
        // 日志不匹配,需要告诉 Leader 回溯
        if (args.prev_log_index >= log_.size()) {
            reply.conflict_index = log_.size(); // Follower 日志太短
        } else {
            reply.conflict_term = log_[args.prev_log_index].term;
            // 找到该冲突任期的第一个日志条目索引
            uint64_t conflict_idx = args.prev_log_index;
            while (conflict_idx > 0 && log_[conflict_idx - 1].term == reply.conflict_term) {
                conflict_idx--;
            }
            reply.conflict_index = conflict_idx;
        }
        return reply;
    }

    // 4. 如果有新日志条目与现有日志冲突,删除冲突部分
    size_t new_entries_start_idx = args.prev_log_index + 1;
    size_t num_conflicting_entries = 0;
    for (size_t i = 0; i < args.entries.size(); ++i) {
        if (new_entries_start_idx + i < log_.size() &&
            log_[new_entries_start_idx + i].term != args.entries[i].term) {
            num_conflicting_entries = log_.size() - (new_entries_start_idx + i);
            log_.erase(log_.begin() + new_entries_start_idx + i, log_.end());
            break;
        }
    }

    // 5. 追加 Leader 发来的新日志条目
    for (const auto& entry : args.entries) {
        // 避免重复添加已存在的日志条目
        if (entry.index >= log_.size() || log_[entry.index].term != entry.term) {
             // 这里的entry.index可能比log_.size()大,需要填充中间的空洞,但Raft协议保证是连续的
             // 实际实现中,如果 prev_log_index + 1 != log_.size(),则存在不一致,需要处理
             // 简单处理:直接push_back,假定prev_log_index已正确匹配且冲突已解决
            log_.push_back(entry);
        }
    }
    persist_state(); // 日志有变动,持久化状态

    // 6. 更新 commit_index
    if (args.leader_commit > commit_index_.load()) {
        commit_index_.store(std::min(args.leader_commit, get_last_log_index()));
        cv_.notify_one(); // 通知应用状态机线程有新的日志可以应用
    }

    reply.success = true;
    return reply;
}

2. Candidate 状态

  • 转换:从 Follower 状态因选举超时而转换。
  • 选举开始
    • 递增 current_term
    • 投票给自己 (voted_for = node_id_)。
    • 重置选举计时器。
    • 向所有其他节点发送 RequestVote RPC。
  • RPC 处理
    • AppendEntries RPC:如果收到来自 Leader 的 AppendEntries RPC (Term 大于等于当前 Term),立即转换回 Follower 状态。
    • RequestVote RPC
      • 如果请求的 term 大于当前 term,则转换回 Follower 状态,并处理该 RequestVote
      • 如果请求的 term 小于或等于当前 term,拒绝投票。
  • 投票统计
    • 如果收到大多数节点的投票,转换为 Leader 状态。
    • 如果在选举超时时间内没有赢得选举,则再次递增 current_term,开始新的选举。
// raft_node.cpp (部分)
void RaftNode::become_candidate() {
    state_ = NodeState::Candidate;
    current_term_++;
    voted_for_.store(node_id_);
    persist_state(); // 任期和投票信息改变,持久化
    election_start_time_ = std::chrono::steady_clock::now();
    // LOG(INFO) << "Node " << node_id_ << " became Candidate for Term " << current_term_.load();

    // 启动一个新线程或使用线程池发送 RequestVote RPCs
    std::thread([this]() {
        send_request_vote_to_peers();
    }).detach();
}

void RaftNode::send_request_vote_to_peers() {
    RequestVoteArgs args;
    args.term = current_term_.load();
    args.candidate_id = node_id_;
    args.last_log_index = get_last_log_index();
    args.last_log_term = get_last_log_term();

    std::atomic<int> votes_received(1); // 包含自己的投票
    std::atomic<bool> became_leader_or_follower(false);

    for (const auto& entry : peer_id_to_addr_map_) {
        uint64_t peer_id = entry.first;
        if (peer_id == node_id_) continue;

        // 异步发送 RequestVote RPC
        std::thread([this, peer_id, args, &votes_received, &became_leader_or_follower]() {
            // RpcClient::send_request 应该处理网络错误和超时
            std::vector<char> reply_data = rpc_client_pool_->get_client(peer_id)->send_request(
                "RequestVote", args.serialize(), std::chrono::milliseconds(500));

            if (reply_data.empty()) {
                // LOG(WARNING) << "RequestVote RPC to " << peer_id << " timed out or failed.";
                return;
            }

            RequestVoteReply reply = RequestVoteReply::deserialize(reply_data);

            std::lock_guard<std::mutex> lock(mtx_);
            if (state_ != NodeState::Candidate) { // 可能已经变为Leader或Follower
                return;
            }

            if (reply.term > current_term_.load()) {
                // 发现更高任期的节点,退回Follower
                // LOG(INFO) << "Node " << node_id_ << " found higher term " << reply.term
                //           << " from " << peer_id << ", stepping down to Follower.";
                become_follower(reply.term);
                became_leader_or_follower.store(true);
                cv_.notify_one(); // 唤醒主循环,重新评估状态
                return;
            }

            if (reply.vote_granted && reply.term == current_term_.load()) {
                votes_received++;
                if (votes_received.load() > peer_id_to_addr_map_.size() / 2 && !became_leader_or_follower.load()) {
                    // 赢得多数投票
                    // LOG(INFO) << "Node " << node_id_ << " won election for Term " << current_term_.load();
                    become_leader();
                    became_leader_or_follower.store(true);
                    cv_.notify_one(); // 唤醒主循环,重新评估状态
                }
            }
        }).detach(); // 使用detach,允许线程独立运行
    }
}

3. Leader 状态

  • 转换:从 Candidate 状态因赢得选举而转换。
  • 初始化
    • 对于每个 Follower,初始化 next_indexlast_log_index + 1 ( Leader 认为 Follower 下一个应该接收的日志索引)。
    • 初始化 match_index 为 0。
    • 立即发送一次心跳 (空的 AppendEntries RPC) 给所有 Follower。
  • 客户端请求
    • 接收客户端命令,将其封装为 LogEntry,附加到自己的日志中。
    • 持久化日志。
    • 并行向所有 Follower 发送 AppendEntries RPC。
  • 日志复制
    • Leader 周期性地发送 AppendEntries RPC (心跳或带有新日志条目)。
    • 根据 Follower 返回的 AppendEntriesReply 更新 next_indexmatch_index
      • 如果 success=true,更新 match_indexnext_index
      • 如果 success=false (通常是日志不匹配),减少 next_index 并重试 AppendEntries,直到日志匹配。
  • 提交日志
    • Leader 追踪每个日志条目在大多数 Follower 上被复制的情况。
    • 如果存在一个日志条目 N,它在当前 term 中被 Leader 创建,并且已经被大多数 Follower 复制(即 match_index[i] >= N 对于大多数 i 成立),Leader 将 commit_index 更新为 N
    • 然后 Leader 通知 Follower 提交该条目。
  • 退位:如果 Leader 发现有更高 term 的 RPC (例如来自新 Leader 的 AppendEntries),它将立即转换回 Follower 状态。
// raft_node.cpp (部分)
void RaftNode::become_leader() {
    state_ = NodeState::Leader;
    // LOG(INFO) << "Node " << node_id_ << " became Leader for Term " << current_term_.load();

    // 初始化 Leader 状态
    uint64_t last_log_idx = get_last_log_index();
    for (const auto& entry : peer_id_to_addr_map_) {
        uint64_t peer_id = entry.first;
        if (peer_id == node_id_) continue;
        next_index_[peer_id] = last_log_idx + 1;
        match_index_[peer_id] = 0;
    }

    // 立即发送一次心跳
    send_append_entries_to_peers(true);
    last_heartbeat_time_ = std::chrono::steady_clock::now();
}

// 客户端调用此函数向 Leader 提议一个命令
bool RaftNode::propose(const std::vector<char>& command) {
    std::lock_guard<std::mutex> lock(mtx_);
    if (state_ != NodeState::Leader) {
        // 客户端需要知道当前的 Leader,或 Leader 转发请求
        // LOG(WARNING) << "Node " << node_id_ << " is not Leader, cannot propose command.";
        return false;
    }

    uint64_t new_log_index = get_last_log_index() + 1;
    log_.emplace_back(current_term_.load(), new_log_index, command);
    persist_state(); // 新日志条目需要持久化
    // LOG(INFO) << "Leader " << node_id_ << " proposed command at index " << new_log_index;

    // 立即向所有 Follower 复制新日志
    send_append_entries_to_peers(false); // 不是心跳,带有新日志

    // Leader 还需要等待日志被多数复制并提交,这里可能需要一个异步等待机制
    // 例如,返回一个Future,客户端可以等待Future完成
    // 简化:假设这里只是发起复制,不等待结果立即返回
    return true;
}

void RaftNode::send_append_entries_to_peers(bool is_heartbeat) {
    // 异步发送 AppendEntries RPC 到所有 Follower
    for (const auto& entry : peer_id_to_addr_map_) {
        uint64_t peer_id = entry.first;
        if (peer_id == node_id_) continue;

        std::thread([this, peer_id, is_heartbeat]() {
            std::lock_guard<std::mutex> lock(mtx_); // 锁定以访问 next_index_, match_index_, log_

            AppendEntriesArgs args;
            args.term = current_term_.load();
            args.leader_id = node_id_;
            args.leader_commit = commit_index_.load();

            uint64_t next_idx_for_peer = next_index_[peer_id];
            // 如果 next_idx_for_peer 是0,说明是初始状态或者需要从头开始发送
            // 正常情况下,next_idx_for_peer至少为1(对应虚拟条目之后的第一个实际条目)
            if (next_idx_for_peer == 0) { // 应该是 last_log_index + 1
                next_idx_for_peer = 1; // 假定从第一个实际日志条目开始
            }

            // prev_log_index 是 next_index_for_peer - 1
            args.prev_log_index = next_idx_for_peer - 1;
            args.prev_log_term = (args.prev_log_index > 0 && args.prev_log_index < log_.size()) ? 
                                 log_[args.prev_log_index].term : 0;

            // 如果是心跳,entries为空;否则,发送从 next_idx_for_peer 开始的所有日志条目
            if (!is_heartbeat) {
                for (size_t i = next_idx_for_peer; i < log_.size(); ++i) {
                    args.entries.push_back(log_[i]);
                }
            }

            // 发送 RPC
            std::vector<char> reply_data = rpc_client_pool_->get_client(peer_id)->send_request(
                "AppendEntries", args.serialize(), std::chrono::milliseconds(500));

            if (reply_data.empty()) {
                // LOG(WARNING) << "AppendEntries RPC to " << peer_id << " timed out or failed.";
                return;
            }

            AppendEntriesReply reply = AppendEntriesReply::deserialize(reply_data);

            // 处理回复
            std::lock_guard<std::mutex> lock_reply(mtx_); // 再次锁定,防止与主循环冲突
            if (state_ != NodeState::Leader) { // 可能在发送期间已经退位
                return;
            }

            if (reply.term > current_term_.load()) {
                // 发现更高任期的节点,退回Follower
                become_follower(reply.term);
                cv_.notify_one();
                return;
            }

            if (reply.success) {
                // 成功复制,更新 next_index 和 match_index
                // 成功复制的日志条目数量是 args.entries.size()
                // match_index 应该是 prev_log_index + entries.size()
                match_index_[peer_id] = args.prev_log_index + args.entries.size();
                next_index_[peer_id] = match_index_[peer_id] + 1;
            } else {
                // 日志不匹配,需要回溯 next_index
                // 利用 reply.conflict_term 和 reply.conflict_index 快速回溯
                if (reply.conflict_term != 0) {
                    // 尝试在 Leader 日志中找到冲突任期的最后一个条目
                    uint64_t idx = get_last_log_index();
                    while (idx > 0 && log_[idx].term > reply.conflict_term) {
                        idx--;
                    }
                    if (idx == 0 || log_[idx].term != reply.conflict_term) {
                        // Leader 日志中没有该任期,或者该任期早于冲突任期,直接回溯到冲突索引
                        next_index_[peer_id] = reply.conflict_index;
                    } else {
                        // 找到冲突任期,从该任期最后一个条目之后开始发送
                        next_index_[peer_id] = idx + 1;
                    }
                } else {
                    // 默认回溯一个条目
                    next_index_[peer_id] = std::max(1ULL, next_index_[peer_id] - 1);
                }
                // LOG(INFO) << "Leader " << node_id_ << " failed to append to " << peer_id
                //           << ", trying to backtrack to next_index " << next_index_[peer_id];
            }

            // 检查是否可以提交新的日志条目
            // 从当前的 commit_index + 1 开始,尝试提交更高索引的日志
            uint64_t current_commit_idx = commit_index_.load();
            for (uint64_t n = get_last_log_index(); n > current_commit_idx; --n) {
                // 只有当前任期的日志条目才能通过计数方式提交
                if (log_[n].term != current_term_.load()) {
                    continue;
                }
                int replicas = 1; // Leader 自己也有一份
                for (const auto& kv : match_index_) {
                    if (kv.first != node_id_ && kv.second >= n) {
                        replicas++;
                    }
                }
                if (replicas > peer_id_to_addr_map_.size() / 2) {
                    commit_index_.store(n);
                    cv_.notify_one(); // 通知应用状态机
                    // LOG(INFO) << "Leader " << node_id_ << " committed log " << n;
                    break; // 每次只提交一个最高索引
                }
            }
        }).detach();
    }
}

状态机应用

一个独立的线程负责将 commit_indexlast_applied 之间的日志条目应用到实际的状态机中。

// raft_node.cpp (部分)
void RaftNode::apply_committed_entries() {
    while (running_) {
        std::unique_lock<std::mutex> lock(mtx_);
        // 等待直到有新的日志可以应用,或者停止
        cv_.wait(lock, [this]() {
            return !running_ || commit_index_.load() > last_applied_.load();
        });

        if (!running_) {
            break;
        }

        while (last_applied_.load() < commit_index_.load()) {
            last_applied_++;
            LogEntry entry_to_apply = log_[last_applied_.load()];
            lock.unlock(); // 解锁,避免在执行用户回调时阻塞 Raft 核心逻辑
            apply_log_callback_(entry_to_apply); // 调用用户提供的回调,应用到存储状态机
            lock.lock(); // 重新锁定
        }
    }
}

挑战与高级主题

1. 日志压缩 (Snapshotting)

随着时间推移,Raft 日志会不断增长,消耗大量存储空间并延长启动时间。日志压缩通过创建快照来解决此问题。快照包含当前状态机的完整状态,以及最新的 last_included_indexlast_included_term。创建快照后,快照之前的所有日志条目都可以被丢弃。Leader 需要将快照发送给落后的 Follower。

2. 成员变更

动态地添加或移除集群成员是一个复杂的问题。Raft 协议通过两阶段提交的方式安全地处理集群成员变更。Leader 首先将一个包含新旧配置的联合配置条目复制到集群中,待其提交后,再提交一个只包含新配置的条目。

3. 客户端交互

客户端需要知道当前的 Leader 才能发送请求。常见的做法是,客户端随机向一个节点发送请求。如果该节点不是 Leader,它会返回 Leader 的地址(或者一个错误,让客户端重试),客户端缓存 Leader 信息。

4. 性能优化

  • 批量提交:将多个客户端请求打包成一个日志条目,减少 RPC 次数和磁盘 I/O。
  • 并行复制:Leader 可以并行地向多个 Follower 发送 AppendEntries RPC。
  • 零拷贝:在日志持久化和网络传输中尽量使用零拷贝技术,减少数据在内存中的复制。

5. 容错与测试

  • 故障注入:模拟网络分区、节点崩溃、消息丢失/乱序等故障,验证 Raft 实现的健壮性。
  • 线性一致性验证:确保所有客户端读写操作都满足线性一致性语义。

结语

在分布式存储系统内核中利用 C++ 实现 Raft 共识协议,是一项充满挑战但极具价值的工作。它要求开发者不仅精通 Raft 协议的每一个细节,还需要深入理解 C++ 的性能特性、并发模型和系统级编程。通过本文的探讨,我们希望能够为读者勾勒出一条清晰的实现路径,并强调在追求极致性能和高可靠性时,C++ 依然是构建此类核心基础设施的卓越选择。一个健壮的 Raft 实现,是分布式存储系统实现高可用、强一致性和数据持久化的基石。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注