C++ 与 Paxos 选主算法:在分布式 C++ 协同系统中利用异步状态机实现高可靠的角色转换逻辑

尊敬的各位专家、同事们,大家好!

在当今瞬息万变的数字化时代,分布式系统已成为支撑现代应用不可或缺的基石。从云计算平台到大数据处理,从微服务架构到区块链技术,分布式系统的身影无处不在。然而,随着系统规模的扩大和复杂性的增加,如何确保这些系统的高可用性、数据一致性与故障容忍能力,成为了我们面临的核心挑战。今天,我将与大家深入探讨一个在分布式系统中至关重要的议题:如何利用C++和异步状态机,实现基于Paxos算法的高可靠选主逻辑,从而构建一个坚如磐石的分布式协同系统。

分布式系统的基石:为何需要选主?

我们先从分布式系统的基本概念和选主的需求说起。一个分布式系统由多台独立的计算机节点组成,它们通过网络协同工作,共同完成某个任务或提供服务。这种架构带来了诸多优势,如高可用性(单个节点故障不影响整体服务)、可伸缩性(通过增加节点扩展处理能力)和高性能(并行处理)。

然而,分布式系统也伴随着固有挑战:

  1. 并发性与一致性: 多个节点同时操作共享数据时,如何保证数据的一致性?
  2. 故障容忍: 节点可能随时崩溃、网络可能出现分区、消息可能丢失或延迟,系统如何在这种不确定性下保持正常运行?
  3. 协调与同步: 节点之间如何有效地协调行动,避免冲突和死锁?

在许多分布式场景中,为了简化系统设计、确保决策的唯一性和数据的一致性,我们需要一个“领导者”(Leader)角色。领导者负责协调所有从属节点(Follower)的行为,例如:

  • 主备复制: 领导者负责接收所有写入请求,并将其复制到备份节点。
  • 分布式锁: 领导者可以作为分布式锁服务的协调者,确保资源互斥访问。
  • 任务调度: 领导者负责分配任务给工作节点。
  • 配置管理: 领导者统一管理和分发系统配置。

选主机制的目标是:在任何时候,系统中只能有一个被公认的领导者,并且当当前领导者失效时,系统能够自动、快速、可靠地选举出新的领导者。如果选主机制出现问题,例如出现“脑裂”(Split-Brain,即多个节点都认为自己是领导者),将导致数据不一致、服务混乱甚至系统崩溃的灾难性后果。因此,一个健壮的选主算法是构建高可靠分布式系统的核心。

Paxos:分布式共识的基石

在众多分布式共识算法中,Paxos算法以其卓越的容错性和严谨的数学证明,被誉为分布式领域的“圣杯”。它由Leslie Lamport于1990年提出,旨在解决分布式系统在面对节点故障、网络延迟和消息丢失等不确定性时,如何就某个单一值达成一致的问题。

Paxos算法的核心思想是,通过Proposer(提议者)、Acceptor(接受者)和Learner(学习者)这三种角色的协作,即使在多数节点故障的情况下,也能保证最终只有一个值被“选定”(chosen)。在选主场景中,这个“值”就是当前集群中领导者的ID。

Paxos算法的角色

在Paxos算法中,节点可以扮演以下一个或多个角色:

角色 职责
Proposer (提议者) 提出一个值(在选主场景中是希望成为领导者的节点ID),并试图说服Acceptor接受这个值。Proposer必须生成一个唯一的提案编号,并在收到多数Acceptor的承诺后,才能将提案发送给Acceptor。
Acceptor (接受者) 接收Proposer的提案。Acceptor会根据提案编号和自身的内部状态(最高已承诺的提案编号和最高已接受的提案)来决定是否承诺或接受提案。一个Acceptor可以承诺接受多个提案,但只能接受一个值。
Learner (学习者) 从Acceptor那里学习被选定的值。Learner本身不参与提案过程,只负责接收并记录最终的共识结果。在选主场景中,所有节点最终都应知道谁是当前的领导者。

在一个实际的分布式系统中,一个物理节点通常会同时扮演Proposer、Acceptor和Learner的角色。当它尝试成为领导者时,它就是Proposer;当它响应其他节点的提案时,它就是Acceptor;当它得知最终的领导者是谁时,它就是Learner。

Paxos算法的两个阶段

Paxos算法的核心是两个阶段:准备阶段(Phase 1)接受阶段(Phase 2)

阶段1:准备阶段 (Prepare Phase)

目标:Proposer尝试获取Acceptor的“承诺”,以便后续提交自己的提案。

  1. 1a. Prepare 请求:

    • Proposer选择一个唯一的提案编号 n(这个编号必须是单调递增的,通常由一个递增的计数器和Proposer的ID组合而成,以确保全局唯一性)。
    • Proposer向所有(或多数)Acceptor发送一个 Prepare(n) 请求。
  2. 1b. Promise 响应:

    • Acceptor收到 Prepare(n) 请求后,会检查 n 是否大于它之前已经承诺过的任何提案编号。
    • 如果 n 小于等于 Acceptor 已经承诺的最高提案编号,Acceptor 会忽略这个请求或者返回一个拒绝响应。
    • 如果 n 大于 Acceptor 已经承诺的最高提案编号:
      • Acceptor 记录下 n 作为它承诺的最高提案编号,并“承诺”不再接受任何小于 n 的提案。
      • Acceptor 回复 Proposer 一个 Promise(n, accepted_value, accepted_proposal_number) 响应。如果 Acceptor 之前已经接受过某个值 accepted_value 及其提案编号 accepted_proposal_number,它会一并告知Proposer。否则,accepted_value 为空。

阶段2:接受阶段 (Accept Phase)

目标:Proposer在获得多数Acceptor的承诺后,尝试让它们接受一个特定的值。

  1. 2a. Accept 请求:

    • Proposer在收到来自多数(N/2 + 1)Acceptor的 Promise 响应后,进入此阶段。
    • Proposer需要决定要提议的值 v
      • 如果在所有收到的 Promise 响应中,有任何一个Acceptor报告它已经接受过一个值,Proposer必须选择其中提案编号最大的那个值作为 v
      • 如果所有收到的 Promise 响应都表示它们从未接受过任何值,Proposer可以自由选择一个新值 v(在选主场景中,这个 v 就是Proposer自身的ID)。
    • Proposer向所有(或多数)Acceptor发送一个 Accept(n, v) 请求,其中 n 是之前 Prepare 请求中的提案编号,v 是决定好的值。
  2. 2b. Accepted 响应:

    • Acceptor收到 Accept(n, v) 请求后,会检查 n 是否大于或等于它当前已承诺的最高提案编号。
    • 如果 n 小于它已承诺的最高提案编号,Acceptor 会拒绝这个请求。
    • 如果 n 大于或等于它已承诺的最高提案编号:
      • Acceptor 接受这个提案,记录下 nv 作为它已接受的提案和值。
      • Acceptor 回复 Proposer 一个 Accepted(n, v) 响应。同时,Acceptor 可以将这个 (n, v) 通知给所有Learner。

Paxos的安全性与活性

  • 安全性 (Safety): Paxos保证最终只有一个值被选定。一旦一个值被选定,它将永远不会改变。这是通过严格的提案编号比较和多数派机制来实现的。
  • 活性 (Liveness): Paxos在非拜占庭故障(即节点不会恶意行为,只会崩溃或网络延迟)和多数节点可用的情况下,最终能够选定一个值。然而,在某些极端情况下(例如多个Proposer同时发起提案且每次都未能获得多数),可能会出现“活锁”,即系统不断地进行选举,但没有一个提案能最终被接受。实际实现中需要加入随机退避和超时机制来缓解。

在选主场景中,Paxos的强大之处在于,即使在复杂的分布式环境下,也能保证最终所有节点都能一致地识别出唯一的领导者。

C++与异步状态机实现高可靠选主逻辑

现在,我们将Paxos算法的理论应用于实践,探讨如何使用C++和异步状态机来实现一个高可靠的选主模块。C++以其高性能、精细的内存控制和丰富的并发原语,是构建底层分布式服务理想的选择。异步状态机模式则能优雅地处理网络通信的非确定性、超时机制以及复杂的协议状态转换。

异步状态机设计理念

一个分布式节点在进行选主时,其内部状态会根据接收到的消息、定时器事件以及自身发起的行为而不断变化。异步状态机(Asynchronous State Machine, ASM)是管理这种复杂行为的有效模式。

ASM的优势:

  • 清晰的逻辑: 将系统的行为分解为明确定义的状态和状态转换,易于理解和维护。
  • 非阻塞: 避免使用阻塞I/O,通过事件驱动的方式处理并发操作,提高系统吞吐量。
  • 故障容忍: 能够更好地管理超时和重试逻辑,应对网络不可靠性。

我们将为每个参与选主的节点设计一个状态机。

核心组件与数据结构

  1. NodeId 节点的唯一标识符,通常是整数或字符串。

  2. ProposalNumber 提案编号,由 (epoch_timestamp, node_id) 组成,保证全局唯一性和单调递增。epoch_timestamp 可以是当前时间戳,node_id 用于在同一时间戳下区分不同Proposer的提案。

  3. 消息结构: 定义用于节点间通信的消息类型。

    #include <string>
    #include <vector>
    #include <chrono>
    #include <map>
    #include <mutex>
    #include <thread>
    #include <condition_variable>
    #include <iostream>
    #include <random>
    #include <set>
    #include <optional>
    
    // 节点ID
    using NodeId = int;
    
    // 提案编号,由时间戳和节点ID组成,保证全局唯一且单调递增
    struct ProposalNumber {
        long long timestamp;
        NodeId proposer_id;
    
        bool operator<(const ProposalNumber& other) const {
            if (timestamp != other.timestamp) {
                return timestamp < other.timestamp;
            }
            return proposer_id < other.proposer_id;
        }
    
        bool operator==(const ProposalNumber& other) const {
            return timestamp == other.timestamp && proposer_id == other.proposer_id;
        }
    
        bool operator<=(const ProposalNumber& other) const {
            return *this < other || *this == other;
        }
    
        bool operator>(const ProposalNumber& other) const {
            return !(*this <= other);
        }
    
        bool operator>=(const ProposalNumber& other) const {
            return !(*this < other);
        }
    };
    
    // 为std::map和std::set使用ProposalNumber作为键提供hash支持
    namespace std {
        template <> struct hash<ProposalNumber> {
            size_t operator()(const ProposalNumber& p) const {
                return hash<long long>()(p.timestamp) ^ (hash<NodeId>()(p.proposer_id) << 1);
            }
        };
    }
    
    // Paxos消息类型
    enum class MessageType {
        PREPARE,
        PROMISE,
        ACCEPT,
        ACCEPTED,
        HEARTBEAT // 用于领导者维持心跳
    };
    
    // 所有消息的基类
    struct BaseMessage {
        MessageType type;
        NodeId sender_id;
        NodeId receiver_id; // 接收者ID,如果是广播则可能为-1或特殊值
    
        BaseMessage(MessageType t, NodeId s, NodeId r) : type(t), sender_id(s), receiver_id(r) {}
        virtual ~BaseMessage() = default;
    };
    
    // Prepare消息
    struct PrepareMessage : public BaseMessage {
        ProposalNumber proposal_num;
    
        PrepareMessage(NodeId s, NodeId r, ProposalNumber pn)
            : BaseMessage(MessageType::PREPARE, s, r), proposal_num(pn) {}
    };
    
    // Promise消息
    struct PromiseMessage : public BaseMessage {
        ProposalNumber proposal_num; // 承诺的提案编号
        std::optional<ProposalNumber> accepted_proposal_num; // 如果Acceptor之前接受过值,则为该值的提案编号
        std::optional<NodeId> accepted_value; // 如果Acceptor之前接受过值,则为该值 (leader ID)
    
        PromiseMessage(NodeId s, NodeId r, ProposalNumber pn,
                       std::optional<ProposalNumber> apn = std::nullopt,
                       std::optional<NodeId> av = std::nullopt)
            : BaseMessage(MessageType::PROMISE, s, r),
              proposal_num(pn), accepted_proposal_num(apn), accepted_value(av) {}
    };
    
    // Accept消息
    struct AcceptMessage : public BaseMessage {
        ProposalNumber proposal_num; // 提议的提案编号
        NodeId value; // 提议的值 (leader ID)
    
        AcceptMessage(NodeId s, NodeId r, ProposalNumber pn, NodeId v)
            : BaseMessage(MessageType::ACCEPT, s, r), proposal_num(pn), value(v) {}
    };
    
    // Accepted消息
    struct AcceptedMessage : public BaseMessage {
        ProposalNumber proposal_num; // 已接受的提案编号
        NodeId value; // 已接受的值 (leader ID)
    
        AcceptedMessage(NodeId s, NodeId r, ProposalNumber pn, NodeId v)
            : BaseMessage(MessageType::ACCEPTED, s, r), proposal_num(pn), value(v) {}
    };
    
    // Heartbeat消息
    struct HeartbeatMessage : public BaseMessage {
        NodeId leader_id; // 当前领导者ID
        long long term; // 领导者任期,用于区分过期的心跳
    
        HeartbeatMessage(NodeId s, NodeId r, NodeId lid, long long t)
            : BaseMessage(MessageType::HEARTBEAT, s, r), leader_id(lid), term(t) {}
    };
    
    // 抽象网络接口,用于发送和接收消息
    class NetworkInterface {
    public:
        virtual ~NetworkInterface() = default;
        virtual void send_message(NodeId receiver_id, std::unique_ptr<BaseMessage> msg) = 0;
        virtual void broadcast_message(std::unique_ptr<BaseMessage> msg, const std::set<NodeId>& exclude_nodes) = 0;
        // 模拟消息接收,实际中通常通过回调或队列实现
        virtual std::unique_ptr<BaseMessage> receive_message() = 0;
    };
  4. PaxosNode 状态: 节点可能处于的几种状态。

    enum class PaxosNodeState {
        FOLLOWER,   // 跟随者,等待领导者心跳或发起选举
        CANDIDATE,  // 候选者,正在尝试成为领导者 (Proposer 角色)
        LEADER      // 领导者,负责发送心跳和协调
    };
  5. PaxosNode 类: 封装一个Paxos节点的全部逻辑。

    class PaxosNode {
    private:
        NodeId my_id;
        std::set<NodeId> peer_ids; // 其他节点的ID
        std::unique_ptr<NetworkInterface> network_interface;
    
        PaxosNodeState current_state;
        NodeId current_leader_id; // 当前已知的领导者ID
        long long current_term;   // 当前领导者任期
    
        // Proposer 状态
        ProposalNumber current_proposal_num; // 当前Proposer使用的提案编号
        std::map<NodeId, PromiseMessage> received_promises; // 存储收到的Promise响应
        int promise_count; // 收到承诺的数量
    
        // Acceptor 状态
        ProposalNumber highest_promised_num; // 接受者已承诺的最高提案编号
        std::optional<ProposalNumber> accepted_proposal_num; // 接受者已接受的提案编号
        std::optional<NodeId> accepted_value; // 接受者已接受的值 (leader ID)
    
        // Learner 状态
        NodeId decided_leader_id; // 最终确认的领导者ID
    
        // 选举超时计时器
        std::chrono::steady_clock::time_point last_heartbeat_time;
        std::chrono::milliseconds election_timeout_min;
        std::chrono::milliseconds election_timeout_max;
        std::chrono::milliseconds heartbeat_interval;
        std::unique_ptr<std::thread> timer_thread;
        std::unique_ptr<std::thread> receiver_thread;
        std::mutex mtx;
        std::condition_variable cv_timer;
        std::condition_variable cv_message;
        bool running;
        std::queue<std::unique_ptr<BaseMessage>> message_queue; // 异步消息队列
    
        std::default_random_engine random_engine;
    
        void generate_new_proposal_number() {
            current_proposal_num = {std::chrono::duration_cast<std::chrono::milliseconds>(
                                        std::chrono::system_clock::now().time_since_epoch()).count(),
                                    my_id};
            std::cout << "Node " << my_id << " generated new proposal number: "
                      << current_proposal_num.timestamp << "." << current_proposal_num.proposer_id << std::endl;
        }
    
        void reset_election_timeout() {
            last_heartbeat_time = std::chrono::steady_clock::now();
            // 在FOLLOWER状态下,随机化超时,避免活锁
            if (current_state == PaxosNodeState::FOLLOWER) {
                std::uniform_int_distribution<int> dist(election_timeout_min.count(), election_timeout_max.count());
                election_timeout_min = std::chrono::milliseconds(dist(random_engine)); // 更新下次超时
            }
            std::cout << "Node " << my_id << " reset election timeout, next timeout in "
                      << election_timeout_min.count() << "ms." << std::endl;
        }
    
        void timer_loop() {
            while (running) {
                std::unique_lock<std::mutex> lock(mtx);
                auto now = std::chrono::steady_clock::now();
                std::chrono::milliseconds time_since_last_heartbeat =
                    std::chrono::duration_cast<std::chrono::milliseconds>(now - last_heartbeat_time);
    
                if (current_state == PaxosNodeState::LEADER) {
                    // 领导者定期发送心跳
                    if (time_since_last_heartbeat >= heartbeat_interval) {
                        lock.unlock(); // 释放锁,防止发送消息时阻塞其他操作
                        send_heartbeat();
                        lock.lock(); // 重新获取锁
                        last_heartbeat_time = std::chrono::steady_clock::now(); // 重置心跳计时
                    }
                } else if (current_state == PaxosNodeState::FOLLOWER || current_state == PaxosNodeState::CANDIDATE) {
                    // 跟随者和候选者等待领导者心跳或进行选举
                    if (time_since_last_heartbeat >= election_timeout_min) {
                        std::cout << "Node " << my_id << " election timeout! Current state: "
                                  << (current_state == PaxosNodeState::FOLLOWER ? "FOLLOWER" : "CANDIDATE")
                                  << ". Starting new election." << std::endl;
                        start_election();
                        reset_election_timeout(); // 每次超时后重置计时器
                    }
                }
                cv_timer.wait_for(lock, std::chrono::milliseconds(100)); // 每100ms检查一次
            }
        }
    
        void receiver_loop() {
            while (running) {
                std::unique_ptr<BaseMessage> msg = network_interface->receive_message();
                if (msg) {
                    std::unique_lock<std::mutex> lock(mtx);
                    message_queue.push(std::move(msg));
                    cv_message.notify_one(); // 通知主线程有新消息
                }
                // 避免忙等,适当休眠
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
            }
        }
    
        void process_message_queue() {
            while (running) {
                std::unique_lock<std::mutex> lock(mtx);
                cv_message.wait(lock, [this]{ return !message_queue.empty() || !running; });
                if (!running) break;
    
                while (!message_queue.empty()) {
                    std::unique_ptr<BaseMessage> msg = std::move(message_queue.front());
                    message_queue.pop();
                    lock.unlock(); // 处理消息时释放锁,允许其他线程访问共享资源
                    handle_message(std::move(msg));
                    lock.lock(); // 处理完消息后重新获取锁
                }
            }
        }
    
    public:
        PaxosNode(NodeId id, const std::set<NodeId>& peers, std::unique_ptr<NetworkInterface> net_if)
            : my_id(id), peer_ids(peers), network_interface(std::move(net_if)),
              current_state(PaxosNodeState::FOLLOWER),
              current_leader_id(0), // 初始未知
              current_term(0),
              promise_count(0),
              highest_promised_num({0, 0}),
              running(false),
              election_timeout_min(150), // 默认最小选举超时
              election_timeout_max(300), // 默认最大选举超时
              heartbeat_interval(50),
              random_engine(std::chrono::system_clock::now().time_since_epoch().count() + id) {
            reset_election_timeout();
        }
    
        ~PaxosNode() {
            stop();
        }
    
        void start() {
            if (running) return;
            running = true;
            timer_thread = std::make_unique<std::thread>(&PaxosNode::timer_loop, this);
            receiver_thread = std::make_unique<std::thread>(&PaxosNode::receiver_loop, this);
            // 主线程也可以处理消息,这里用一个单独的线程模拟主循环处理消息
            // 或者可以合并到timer_loop或main函数中
            // std::thread msg_processor_thread(&PaxosNode::process_message_queue, this);
            // msg_processor_thread.detach(); // 模拟后台处理
            std::cout << "Node " << my_id << " started." << std::endl;
        }
    
        void stop() {
            if (!running) return;
            running = false;
            cv_timer.notify_all();
            cv_message.notify_all();
            if (timer_thread && timer_thread->joinable()) {
                timer_thread->join();
            }
            if (receiver_thread && receiver_thread->joinable()) {
                receiver_thread->join();
            }
            std::cout << "Node " << my_id << " stopped." << std::endl;
        }
    
        // 处理接收到的消息
        void handle_message(std::unique_ptr<BaseMessage> msg) {
            std::unique_lock<std::mutex> lock(mtx); // 保护内部状态
            std::cout << "Node " << my_id << " received message from " << msg->sender_id << " of type "
                      << static_cast<int>(msg->type) << std::endl;
    
            switch (msg->type) {
                case MessageType::PREPARE:
                    handle_prepare(static_cast<PrepareMessage&>(*msg));
                    break;
                case MessageType::PROMISE:
                    handle_promise(static_cast<PromiseMessage&>(*msg));
                    break;
                case MessageType::ACCEPT:
                    handle_accept(static_cast<AcceptMessage&>(*msg));
                    break;
                case MessageType::ACCEPTED:
                    handle_accepted(static_cast<AcceptedMessage&>(*msg));
                    break;
                case MessageType::HEARTBEAT:
                    handle_heartbeat(static_cast<HeartbeatMessage&>(*msg));
                    break;
                default:
                    std::cerr << "Node " << my_id << " received unknown message type." << std::endl;
            }
        }
    
    private:
        // Paxos Proposer/Acceptor/Learner 逻辑实现
    
        void start_election() {
            std::lock_guard<std::mutex> lock(mtx);
            current_state = PaxosNodeState::CANDIDATE;
            current_term++; // 每次选举开始,增加任期
            generate_new_proposal_number(); // 生成新的提案编号
            received_promises.clear();
            promise_count = 0;
    
            std::cout << "Node " << my_id << " is starting an election (term " << current_term
                      << ") with proposal " << current_proposal_num.timestamp << "." << current_proposal_num.proposer_id << std::endl;
    
            // Phase 1a: Send Prepare messages
            for (NodeId peer_id : peer_ids) {
                if (peer_id == my_id) continue;
                network_interface->send_message(peer_id,
                                                std::make_unique<PrepareMessage>(my_id, peer_id, current_proposal_num));
            }
        }
    
        void handle_prepare(const PrepareMessage& msg) {
            std::lock_guard<std::mutex> lock(mtx);
            // 如果收到更高任期或更高提案编号的Prepare,则退回FOLLOWER
            if (msg.proposal_num > highest_promised_num) {
                highest_promised_num = msg.proposal_num;
                // 如果当前是领导者,但收到更高提案的Prepare,也需要退位
                if (current_state == PaxosNodeState::LEADER) {
                    std::cout << "Node " << my_id << " (LEADER) received higher PREPARE from " << msg.sender_id
                              << ", stepping down to FOLLOWER." << std::endl;
                    current_state = PaxosNodeState::FOLLOWER;
                }
                // 如果是候选者,但收到更高提案的Prepare,也需要退回FOLLOWER
                else if (current_state == PaxosNodeState::CANDIDATE && msg.proposal_num > current_proposal_num) {
                    std::cout << "Node " << my_id << " (CANDIDATE) received higher PREPARE from " << msg.sender_id
                              << ", stepping down to FOLLOWER." << std::endl;
                    current_state = PaxosNodeState::FOLLOWER;
                }
                reset_election_timeout(); // 收到有效Prepare,重置超时
    
                // Phase 1b: Send Promise
                network_interface->send_message(msg.sender_id,
                                                std::make_unique<PromiseMessage>(my_id, msg.sender_id,
                                                                                 msg.proposal_num,
                                                                                 accepted_proposal_num, accepted_value));
            } else {
                std::cout << "Node " << my_id << " rejected PREPARE from " << msg.sender_id
                          << " (higher_promised_num: " << highest_promised_num.timestamp << "." << highest_promised_num.proposer_id
                          << " vs received: " << msg.proposal_num.timestamp << "." << msg.proposal_num.proposer_id << ")" << std::endl;
                // 可以选择发送拒绝消息,或直接忽略
            }
        }
    
        void handle_promise(const PromiseMessage& msg) {
            std::lock_guard<std::mutex> lock(mtx);
            if (current_state != PaxosNodeState::CANDIDATE || !(msg.proposal_num == current_proposal_num)) {
                std::cout << "Node " << my_id << " ignored PROMISE from " << msg.sender_id
                          << " (not CANDIDATE or proposal mismatch)." << std::endl;
                return; // 忽略过期的Promise或非当前选举的Promise
            }
    
            // 只处理相同提案编号的Promise
            if (received_promises.find(msg.sender_id) == received_promises.end()) {
                received_promises[msg.sender_id] = msg;
                promise_count++;
                std::cout << "Node " << my_id << " received PROMISE from " << msg.sender_id
                          << ". Total promises: " << promise_count << std::endl;
    
                // 检查是否获得多数承诺
                if (promise_count > peer_ids.size() / 2) {
                    std::cout << "Node " << my_id << " received majority promises (" << promise_count << "/" << peer_ids.size() << ")!" << std::endl;
                    // Phase 2a: Send Accept message
                    NodeId value_to_propose = my_id; // 默认提议自己作为领导者
                    std::optional<ProposalNumber> max_accepted_proposal_num = std::nullopt;
    
                    // 找出所有Promise中,提案编号最高的已接受值
                    for (const auto& pair : received_promises) {
                        if (pair.second.accepted_proposal_num && pair.second.accepted_value) {
                            if (!max_accepted_proposal_num || *pair.second.accepted_proposal_num > *max_accepted_proposal_num) {
                                max_accepted_proposal_num = pair.second.accepted_proposal_num;
                                value_to_propose = *pair.second.accepted_value;
                            }
                        }
                    }
    
                    // 发送Accept消息
                    std::cout << "Node " << my_id << " proposing value: " << value_to_propose << std::endl;
                    for (NodeId peer_id : peer_ids) {
                        if (peer_id == my_id) continue;
                        network_interface->send_message(peer_id,
                                                        std::make_unique<AcceptMessage>(my_id, peer_id,
                                                                                        current_proposal_num, value_to_propose));
                    }
                    // 自身也接受这个值
                    accepted_proposal_num = current_proposal_num;
                    accepted_value = value_to_propose;
                    decided_leader_id = value_to_propose; // 暂时认为自己已经决定
                    current_leader_id = value_to_propose; // 更新当前领导者
                }
            }
        }
    
        void handle_accept(const AcceptMessage& msg) {
            std::lock_guard<std::mutex> lock(mtx);
            // Acceptor 逻辑:如果提案编号大于或等于已承诺的最高提案编号
            if (msg.proposal_num >= highest_promised_num) {
                highest_promised_num = msg.proposal_num; // 更新承诺
                accepted_proposal_num = msg.proposal_num; // 接受此提案
                accepted_value = msg.value; // 接受此值
    
                std::cout << "Node " << my_id << " accepted ACCEPT from " << msg.sender_id
                          << " for proposal " << msg.proposal_num.timestamp << "." << msg.proposal_num.proposer_id
                          << " with value " << msg.value << std::endl;
                reset_election_timeout(); // 收到有效Accept,重置超时
    
                // Phase 2b: Send Accepted
                network_interface->send_message(msg.sender_id,
                                                std::make_unique<AcceptedMessage>(my_id, msg.sender_id,
                                                                                  msg.proposal_num, msg.value));
                // 同时,将这个接受结果广播给所有Learner (包括自己)
                // 实际中可能由Proposer在收到多数Accepted后广播,这里简化为Acceptor广播
                network_interface->broadcast_message(std::make_unique<AcceptedMessage>(my_id, -1,
                                                                                         msg.proposal_num, msg.value),
                                                     {my_id}); // 排除自己,因为自己已经处理
                decided_leader_id = msg.value; // 学习到新的领导者
                current_leader_id = msg.value; // 更新当前领导者
            } else {
                std::cout << "Node " << my_id << " rejected ACCEPT from " << msg.sender_id
                          << " (higher_promised_num: " << highest_promised_num.timestamp << "." << highest_promised_num.proposer_id
                          << " vs received: " << msg.proposal_num.timestamp << "." << msg.proposal_num.proposer_id << ")" << std::endl;
            }
        }
    
        void handle_accepted(const AcceptedMessage& msg) {
            std::lock_guard<std::mutex> lock(mtx);
            // Learner 逻辑:学习被接受的值
            if (decided_leader_id != msg.value) { // 如果是新的决定
                decided_leader_id = msg.value;
                current_leader_id = msg.value; // 更新当前领导者
    
                std::cout << "Node " << my_id << " learned leader is: " << decided_leader_id << std::endl;
    
                // 如果自己是被选定的领导者,则转换状态
                if (decided_leader_id == my_id && current_state != PaxosNodeState::LEADER) {
                    std::cout << "Node " << my_id << " elected as LEADER!" << std::endl;
                    current_state = PaxosNodeState::LEADER;
                    last_heartbeat_time = std::chrono::steady_clock::now(); // 立即发送心跳
                } else if (decided_leader_id != my_id && current_state != PaxosNodeState::FOLLOWER) {
                    std::cout << "Node " << my_id << " is now FOLLOWER to leader " << decided_leader_id << std::endl;
                    current_state = PaxosNodeState::FOLLOWER;
                    reset_election_timeout(); // 学习到领导者后重置选举超时
                }
            }
        }
    
        void handle_heartbeat(const HeartbeatMessage& msg) {
            std::lock_guard<std::mutex> lock(mtx);
            // 如果心跳的任期更高,或者心跳来自当前领导者
            if (msg.term >= current_term || (msg.leader_id == current_leader_id && current_leader_id != 0)) {
                if (msg.term > current_term) {
                    current_term = msg.term;
                }
                current_leader_id = msg.leader_id;
                decided_leader_id = msg.leader_id; // 学习到新的领导者
                current_state = PaxosNodeState::FOLLOWER; // 收到心跳,退回跟随者
                reset_election_timeout(); // 重置选举超时
                // std::cout << "Node " << my_id << " received heartbeat from leader " << msg.leader_id
                //           << " (term " << msg.term << "), resetting timeout." << std::endl;
            } else {
                // 收到过期心跳,忽略
                // std::cout << "Node " << my_id << " ignored old heartbeat from " << msg.sender_id
                //           << " (term " << msg.term << " vs current " << current_term << ")." << std::endl;
            }
        }
    
        void send_heartbeat() {
            std::lock_guard<std::mutex> lock(mtx);
            if (current_state == PaxosNodeState::LEADER) {
                // std::cout << "Node " << my_id << " (LEADER) sending heartbeat (term " << current_term << ")." << std::endl;
                network_interface->broadcast_message(std::make_unique<HeartbeatMessage>(my_id, -1, my_id, current_term),
                                                     {my_id});
            }
        }
    };

异步网络与消息处理

在实际系统中,NetworkInterface 会是一个复杂的组件,可能基于TCP/UDP、gRPC、ZeroMQ等实现。为了简化示例,这里我们模拟一个简单的网络接口,通过队列进行消息传递。

// 模拟网络环境,所有节点共享一个消息队列
class MockNetworkInterface : public NetworkInterface {
private:
    NodeId my_id;
    std::map<NodeId, std::queue<std::unique_ptr<BaseMessage>>>* global_message_queues;
    std::mutex* global_queue_mtx;
    std::condition_variable* global_queue_cv;

public:
    MockNetworkInterface(NodeId id, std::map<NodeId, std::queue<std::unique_ptr<BaseMessage>>>* queues,
                         std::mutex* mtx, std::condition_variable* cv)
        : my_id(id), global_message_queues(queues), global_queue_mtx(mtx), global_queue_cv(cv) {}

    void send_message(NodeId receiver_id, std::unique_ptr<BaseMessage> msg) override {
        // 模拟消息延迟
        std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 20 + 10)); // 10-30ms延迟

        std::unique_lock<std::mutex> lock(*global_queue_mtx);
        if (global_message_queues->count(receiver_id)) {
            msg->receiver_id = receiver_id; // 确保接收者ID正确
            global_message_queues->at(receiver_id).push(std::move(msg));
            global_queue_cv->notify_all(); // 通知所有节点,可能有新消息
        } else {
            std::cerr << "MockNetwork: Receiver " << receiver_id << " not found for message from " << my_id << std::endl;
        }
    }

    void broadcast_message(std::unique_ptr<BaseMessage> msg, const std::set<NodeId>& exclude_nodes) override {
        // 模拟消息延迟
        std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 20 + 10)); // 10-30ms延迟

        std::unique_lock<std::mutex> lock(*global_queue_mtx);
        for (auto const& [node_id, queue] : *global_message_queues) {
            if (node_id == my_id || exclude_nodes.count(node_id)) continue;

            // 复制消息发送给每个接收者
            std::unique_ptr<BaseMessage> msg_copy;
            // 根据消息类型创建副本
            if (msg->type == MessageType::HEARTBEAT) {
                const HeartbeatMessage& hb_msg = static_cast<const HeartbeatMessage&>(*msg);
                msg_copy = std::make_unique<HeartbeatMessage>(hb_msg.sender_id, node_id, hb_msg.leader_id, hb_msg.term);
            } else if (msg->type == MessageType::ACCEPTED) {
                const AcceptedMessage& acc_msg = static_cast<const AcceptedMessage&>(*msg);
                msg_copy = std::make_unique<AcceptedMessage>(acc_msg.sender_id, node_id, acc_msg.proposal_num, acc_msg.value);
            }
            // ... 其他消息类型也需要复制

            if (msg_copy) {
                global_message_queues->at(node_id).push(std::move(msg_copy));
            }
        }
        global_queue_cv->notify_all();
    }

    std::unique_ptr<BaseMessage> receive_message() override {
        std::unique_lock<std::mutex> lock(*global_queue_mtx);
        global_queue_cv->wait(lock, [this]{
            return global_message_queues->count(my_id) && !global_message_queues->at(my_id).empty();
        });

        if (global_message_queues->count(my_id) && !global_message_queues->at(my_id).empty()) {
            std::unique_ptr<BaseMessage> msg = std::move(global_message_queues->at(my_id).front());
            global_message_queues->at(my_id).pop();
            return msg;
        }
        return nullptr;
    }
};

// Main函数用于模拟集群运行
int main() {
    std::srand(std::time(0)); // 初始化随机数种子

    const int num_nodes = 5;
    std::set<NodeId> all_node_ids;
    for (int i = 1; i <= num_nodes; ++i) {
        all_node_ids.insert(i);
    }

    std::map<NodeId, std::queue<std::unique_ptr<BaseMessage>>> global_message_queues;
    std::mutex global_queue_mtx;
    std::condition_variable global_queue_cv;

    std::vector<std::unique_ptr<PaxosNode>> nodes;
    std::vector<std::thread> node_message_processing_threads;

    for (int i = 1; i <= num_nodes; ++i) {
        global_message_queues[i] = std::queue<std::unique_ptr<BaseMessage>>(); // 为每个节点创建消息队列
        std::unique_ptr<MockNetworkInterface> net_if = std::make_unique<MockNetworkInterface>(
            i, &global_message_queues, &global_queue_mtx, &global_queue_cv);
        nodes.push_back(std::make_unique<PaxosNode>(i, all_node_ids, std::move(net_if)));
    }

    for (auto& node : nodes) {
        node->start();
        // 每个节点启动一个线程来处理其消息队列
        node_message_processing_threads.emplace_back([&node]{
            while(true) { // 持续处理消息
                std::unique_ptr<BaseMessage> msg = node->network_interface->receive_message();
                if (msg) {
                    node->handle_message(std::move(msg));
                }
                // std::this_thread::sleep_for(std::chrono::milliseconds(5)); // 避免忙等
            }
        });
    }

    // 运行一段时间,观察选主过程
    std::this_thread::sleep_for(std::chrono::seconds(10));

    // 模拟节点故障 (例如,节点3崩溃)
    std::cout << "n--- Simulating Node 3 failure ---n" << std::endl;
    nodes[2]->stop(); // 节点ID为3的节点在vector中索引为2
    node_message_processing_threads[2].join(); // 等待其消息处理线程结束

    std::this_thread::sleep_for(std::chrono::seconds(10)); // 观察新一轮选举

    std::cout << "n--- Simulating Node 1 failure ---n" << std::endl;
    nodes[0]->stop();
    node_message_processing_threads[0].join();

    std::this_thread::sleep_for(std::chrono::seconds(10)); // 观察在仅剩多数节点时的选举

    // 停止所有节点
    for (size_t i = 0; i < nodes.size(); ++i) {
        if (nodes[i]) { // 确保节点仍在运行
            nodes[i]->stop();
            if (node_message_processing_threads[i].joinable()) {
                node_message_processing_threads[i].join();
            }
        }
    }

    return 0;
}

代码解析:

  1. ProposalNumber 结构体: 使用 long long timestampNodeId proposer_id 组合确保提案编号的全局唯一性和单调递增性。时间戳保证了新提案的编号通常更大,proposer_id 解决了同一时间戳下多个Proposer的问题。
  2. BaseMessage 及其派生类: 定义了不同Paxos阶段所需的消息结构,便于序列化和反序列化(在实际网络中)。
  3. NetworkInterface 抽象类和 MockNetworkInterface 模拟了网络通信,包括消息发送、广播和接收,并加入了随机延迟以模拟真实网络环境。global_message_queues 模拟了所有节点的信箱。
  4. PaxosNode 类:
    • 状态管理: current_state (FOLLOWER, CANDIDATE, LEADER) 驱动着节点的行为。
    • Proposer 状态: current_proposal_num, received_promises, promise_count 用于跟踪选举进程。
    • Acceptor 状态: highest_promised_num, accepted_proposal_num, accepted_value 维护了Acceptor的承诺和接受记录。
    • Learner 状态: decided_leader_id 存储了当前学习到的领导者。
    • 异步机制:
      • timer_thread:负责管理心跳发送和选举超时检测。
      • receiver_thread:专门负责从网络接口接收消息,并将其放入内部队列。
      • message_queue:用于缓冲接收到的消息,通过 cv_message 通知主逻辑线程处理。
      • handle_message 方法是消息处理的入口,根据消息类型分发到具体的处理函数。
    • Paxos逻辑: start_election, handle_prepare, handle_promise, handle_accept, handle_accepted, handle_heartbeat 等方法严格按照Paxos算法的两个阶段和角色职责实现。
    • 超时与随机化: reset_election_timeout 在FOLLOWER状态下使用随机超时,以减少活锁的概率。
    • 线程安全: 使用 std::mutex 保护所有共享状态,确保并发访问的正确性。

状态转换示例

| 当前状态 | 事件/消息 | 条件 “`cpp

include

#include <vector>
#include <string>
#include <map>
#include <set>
#include <chrono>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <optional>
#include <random>
#include <algorithm> // For std::max and std::min

// 节点ID
using NodeId = int;

// 提案编号,由时间戳和节点ID组成,保证全局唯一且单调递增
// 使用 std::chrono::system_clock::now().time_since_epoch().count() 获取毫秒级时间戳
struct ProposalNumber {
    long long timestamp;
    NodeId proposer_id;

    // 重载比较运算符以支持排序和比较
    bool operator<(const ProposalNumber& other) const {
        if (timestamp != other.timestamp) {
            return timestamp < other.timestamp;
        }
        return proposer_id < other.proposer_id;
    }

    bool operator==(const ProposalNumber& other) const {
        return timestamp == other.timestamp && proposer_id == other.proposer_id;
    }

    bool operator<=(const ProposalNumber& other) const {
        return *this < other || *this == other;
    }

    bool operator>(const ProposalNumber& other) const {
        return !(*this <= other);
    }

    bool operator>=(const ProposalNumber& other) const {
        return !(*this < other);
    }
};

// 为 std::map 和 std::set 使用 ProposalNumber 作为键提供 hash 支持 (可选,但推荐)
namespace std {
    template <> struct hash<ProposalNumber> {
        size_t operator()(const ProposalNumber& p) const {
            // 简单的组合哈希,确保不同 ProposalNumber 有不同哈希值
            return hash<long long>()(p.timestamp) ^ (hash<NodeId>()(p.proposer_id) << 1);
        }
    };
}

// Paxos 消息类型
enum class MessageType {
    PREPARE,
    PROMISE,
    ACCEPT,
    ACCEPTED,
    HEARTBEAT // 用于领导者维持心跳
};

// 所有消息的基类
struct BaseMessage {
    MessageType type;
    NodeId sender_id;
    NodeId receiver_id; // 接收者ID,如果是广播则可能为-1或特殊值

    BaseMessage(MessageType t, NodeId s, NodeId r) : type(t), sender_id(s), receiver_id(r) {}
    virtual ~BaseMessage() = default; // 虚析构函数,确保正确释放派生类资源
    virtual std::unique_ptr<BaseMessage> clone() const = 0; // 用于消息复制
};

// Prepare 消息
struct PrepareMessage : public BaseMessage {
    ProposalNumber proposal_num;

    PrepareMessage(NodeId s, NodeId r, ProposalNumber pn)
        : BaseMessage(MessageType::PREPARE, s, r), proposal_num(pn) {}

    std::unique_ptr<BaseMessage> clone() const override {
        return std::make_unique<PrepareMessage>(*this);
    }
};

// Promise 消息
struct PromiseMessage : public BaseMessage {
    ProposalNumber proposal_num; // 承诺的提案编号
    std::optional<ProposalNumber> accepted_proposal_num; // 如果 Acceptor 之前接受过值,则为该值的提案编号
    std::optional<NodeId> accepted_value; // 如果 Acceptor 之前接受过值,则为该值 (leader ID)

    PromiseMessage(NodeId s, NodeId r, ProposalNumber pn,
                   std::optional<ProposalNumber> apn = std::nullopt,
                   std::optional<NodeId> av = std::nullopt)
        : BaseMessage(MessageType::PROMISE, s, r),
          proposal_num(pn), accepted_proposal_num(apn), accepted_value(av) {}

    std::unique_ptr<BaseMessage> clone() const override {
        return std::make_unique<PromiseMessage>(*this);
    }
};

// Accept 消息
struct AcceptMessage : public BaseMessage {
    ProposalNumber proposal_num; // 提议的提案编号
    NodeId value; // 提议的值 (leader ID)

    AcceptMessage(NodeId s, NodeId r, ProposalNumber pn, NodeId v)
        : BaseMessage(MessageType::ACCEPT, s, r), proposal_num(pn), value(v) {}

    std::unique_ptr<BaseMessage> clone() const override {
        return std::make_unique<AcceptMessage>(*this);
    }
};

// Accepted 消息
struct AcceptedMessage : public BaseMessage {
    ProposalNumber proposal_num; // 已接受的提案编号
    NodeId value; // 已接受的值 (leader ID)

    AcceptedMessage(NodeId s, NodeId r, ProposalNumber pn, NodeId v)
        : BaseMessage(MessageType::ACCEPTED, s, r), proposal_num(pn), value(v) {}

    std::unique_ptr<BaseMessage> clone() const override {
        return std::make_unique<AcceptedMessage>(*this);
    }
};

// Heartbeat 消息
struct HeartbeatMessage : public BaseMessage {
    NodeId leader_id; // 当前领导者ID
    long long term; // 领导者任期,用于区分过期的心跳

    HeartbeatMessage(NodeId s, NodeId r, NodeId lid, long long t)
        : BaseMessage(MessageType::HEARTBEAT, s, r), leader_id(lid), term(t) {}

    std::unique_ptr<BaseMessage> clone() const override {
        return std::make_unique<HeartbeatMessage>(*this);
    }
};

// 抽象网络接口,用于发送和接收消息
class NetworkInterface {
public:
    virtual ~NetworkInterface() = default;
    virtual void send_message(NodeId receiver_id, std::unique_ptr<BaseMessage> msg) = 0;
    virtual void broadcast_message(std::unique_ptr<BaseMessage> msg, const std::set<NodeId>& exclude_nodes) = 0;
    // 模拟消息接收,实际中通常通过回调或队列实现
    virtual std::unique_ptr<BaseMessage> receive_message() = 0;
};

// PaxosNode 的状态
enum class PaxosNodeState {
    FOLLOWER,   // 跟随者,等待领导者心跳或发起选举
    CANDIDATE,  // 候选者,正在尝试成为领导者 (Proposer 角色)
    LEADER      // 领导者,负责发送心跳和协调
};

// Paxos 节点的实现
class PaxosNode {
private:
    NodeId my_id;
    std::set<NodeId> peer_ids; // 集群中所有其他节点的ID,包括自己
    std::unique_ptr<NetworkInterface> network_interface;

    PaxosNodeState current_state;
    NodeId current_leader_id; // 当前已知的领导者ID
    long long current_term;   // 当前领导者任期,用于领导者心跳和选举的上下文

    // Proposer 状态 (用于 CANDIDATE 状态)
    ProposalNumber current_proposal_num; // 当前 Proposer 使用的提案编号
    std::map<NodeId, PromiseMessage> received_promises; // 存储收到的 Promise 响应
    int promise_count; // 收到承诺的数量,用于判断是否达到多数

    // Acceptor 状态 (所有节点都扮演 Acceptor 角色)
    ProposalNumber highest_promised_num; // 接受者已承诺的最高提案编号
    std::optional<ProposalNumber> accepted_proposal_num; // 接受者已接受的提案编号
    std::optional<NodeId> accepted_value; // 接受者已接受的值 (leader ID)

    // Learner 状态 (所有节点都扮演 Learner 角色)
    // NodeId decided_leader_id; // 最终确认的领导者ID,这里合并到 current_leader_id

    // 选举超时计时器相关
    std::chrono::steady_clock::time_point last_heartbeat_time; // 上次收到心跳或发送心跳的时间
    std::chrono::milliseconds election_timeout_min; // 选举超时最小随机值
    std::chrono::milliseconds election_timeout_max; // 选举超时最大随机值
    std::chrono::milliseconds heartbeat_interval;   // 领导者心跳间隔

    // 线程与同步
    std::unique_ptr<std::thread> timer_thread;     // 计时器线程
    std::unique_ptr<std::thread> receiver_thread;  // 消息接收线程
    std::mutex mtx;                                // 保护节点内部状态的互斥锁
    std::condition_variable cv_timer;               // 用于计时器线程等待
    std::condition_variable cv_message;             // 用于消息处理线程等待新消息
    bool running;                                   // 节点运行标志
    std::queue<std::unique_ptr<BaseMessage>> message_queue; // 异步消息队列

    std::default_random_engine random_engine;       // 随机数生成器,用于随机化选举超时

    // 生成新的提案编号
    void generate_new_proposal_number() {
        // 使用当前系统时间作为时间戳,结合节点ID确保唯一性
        current_proposal_num = {std::chrono::duration_cast<std::chrono::milliseconds>(
                                    std::chrono::system_clock::now().time_since_epoch()).count(),
                                my_id};
        std::cout << "[Node " << my_id << "] Generated new proposal number: "
                  << current_proposal_num.timestamp << "." << current_proposal_num.proposer_id << std::endl;
    }

    // 重置选举超时
    void reset_election_timeout() {
        last_heartbeat_time = std::chrono::steady_clock::now();
        // 在 FOLLOWER 状态下,为了避免活锁,每次超时后随机化下一次的超时时间
        if (current_state == PaxosNodeState::FOLLOWER) {
            std::uniform_int_distribution<int> dist(election_timeout_min.count(), election_timeout_max.count());
            // 更新 election_timeout_min 以存储本次随机出的超时值
            election_timeout_min = std::chrono::milliseconds(dist(random_engine));
        }
        // std::cout << "[Node " << my_id << "] Reset election timeout, next timeout in "
        //           << election_timeout_min.count() << "ms." << std::endl;
    }

    // 计时器循环,处理心跳和选举超时
    void timer_loop() {
        while (running) {
            std::unique_lock<std::mutex> lock(mtx);
            auto now = std::chrono::steady_clock::now();
            std::chrono::milliseconds time_since_last_event =
                std::chrono::duration_cast<std::chrono::milliseconds>(now - last_heartbeat_time);

            if (current_state == PaxosNodeState::LEADER) {
                // 领导者定期发送心跳
                if (time_since_last_event >= heartbeat_interval) {
                    lock.unlock(); // 释放锁,防止发送消息时阻塞其他操作
                    send_heartbeat();
                    lock.lock(); // 重新获取锁
                    last_heartbeat_time = std::chrono::steady_clock::now(); // 重置心跳计时
                }
            } else if (current_state == PaxosNodeState::FOLLOWER || current_state == PaxosNodeState::CANDIDATE) {
                // 跟随者和候选者等待领导者心跳或进行选举
                if (time_since_last_event >= election_timeout_min) {
                    std::cout << "[Node " << my_id << "] Election timeout! Current state: "
                              << (current_state == PaxosNodeState::FOLLOWER ? "FOLLOWER" : "CANDIDATE")
                              << ". Starting new election." << std::endl;
                    start_election(); // 发起新的选举
                    reset_election_timeout(); // 每次超时后重置计时器
                }
            }
            // 等待下一个检查点,避免忙等
            cv_timer.wait_for(lock, std::chrono::milliseconds(50));
        }
    }

    // 消息接收循环,将网络消息放入内部队列
    void receiver_loop() {
        while (running) {
            std::unique_ptr<BaseMessage> msg = network_interface->receive_message();
            if (msg) {
                std::unique_lock<std::mutex> lock(mtx);
                message_queue.push(std::move(msg));
                cv_message.notify_one(); // 通知主线程有新消息
            }
            // 避免忙等,适当休眠
            std::this_thread::sleep_for(std::chrono::milliseconds(5));
        }
    }

    // 主消息处理循环(可以由主线程或单独的worker线程调用)
    void process_message_queue() {
        while (running) {
            std::unique_lock<std::mutex> lock(mtx);
            // 等待队列非空或节点停止
            cv_message.wait(lock, [this]{ return !message_queue.empty() || !running; });
            if (!running) break;

            while (!message_queue.empty()) {
                std::unique_ptr<BaseMessage> msg = std::move(message_queue.front());
                message_queue.pop();
                lock.unlock(); // 处理消息时释放锁,允许其他线程访问共享资源
                handle_message_internal(std::move(msg)); // 调用实际的消息处理逻辑
                lock.lock(); // 处理完消息后重新获取锁
            }
        }
    }

    // 内部消息处理函数,用于状态保护
    void handle_message_internal(std::unique_ptr<BaseMessage> msg) {
        std::lock_guard<std::mutex> lock(mtx); // 保护内部状态
        // std::cout << "[Node " << my_id << "] Received message from " << msg->sender_id << " of type "
        //           << static_cast<int>(msg->type) << std::endl;

        switch (msg->type) {
            case MessageType::PREPARE:
                handle_prepare(static_cast<PrepareMessage&>(*msg));
                break;
            case MessageType::PROMISE:
                handle_promise(static_cast<PromiseMessage&>(*msg));
                break;
            case MessageType::ACCEPT:
                handle_accept(static_cast<AcceptMessage&>(*msg));
                break;
            case MessageType::ACCEPTED:
                handle_accepted(static_cast<AcceptedMessage&>(*msg));
                break;
            case MessageType::HEARTBEAT:
                handle_heartbeat(static_cast<HeartbeatMessage&>(*msg));
                break;
            default:
                std::cerr << "[Node " << my_id << "] Received unknown message type." << std::endl;
        }
    }

public:
    // 构造函数
    PaxosNode(NodeId id, const std::set<NodeId>& peers, std::unique_ptr<NetworkInterface> net_if)
        : my_id(id), peer_ids(peers), network_interface(std::move(net_if)),
          current_state(PaxosNodeState::FOLLOWER),
          current_leader_id(0), // 初始未知领导者
          current_term(0),      // 初始任期为0
          promise_count(0),
          highest_promised_num({0, 0}), // 初始为最小提案编号
          running(false),
          election_timeout_min(150), // 默认最小选举超时
          election_timeout_max(300), // 默认最大选举超时
          heartbeat_interval(50),    // 默认心跳间隔
          random_engine(std::chrono::system_clock::now().time_since_epoch().count() + id) { // 使用时间+ID作为种子
        reset_election_timeout(); // 初始化选举超时
    }

    // 析构函数,确保线程正确停止
    ~PaxosNode() {
        stop();
    }

    // 启动节点
    void start() {
        if (running) return;
        running = true;
        timer_thread = std::make_unique<std::thread>(&PaxosNode::timer_loop, this);
        receiver_thread = std::make_unique<std::thread>(&PaxosNode::receiver_loop, this);
        // 启动一个单独的线程来处理消息队列,避免阻塞主线程或计时器线程
        std::thread msg_processor_thread(&PaxosNode::process_message_queue, this);
        msg_processor_thread.detach(); // 让其后台运行
        std::cout << "[Node " << my_id << "] Started." << std::endl;
    }

    // 停止节点
    void stop() {
        if (!running) return;
        running = false;
        cv_timer.notify_all();     // 唤醒计时器线程
        cv_message.notify_all();   // 唤醒消息处理线程
        if (timer_thread && timer_thread->joinable()) {
            timer_thread->join(); // 等待计时器线程结束
        }
        if (receiver_thread && receiver_thread->joinable()) {
            receiver_thread->join(); // 等待消息接收线程结束
        }
        std::cout << "[Node " << my_id << "] Stopped." << std::endl;
    }

    // 获取当前状态 (用于外部查询)
    PaxosNodeState get_state() const {
        std::lock_guard<std::mutex> lock(mtx);
        return current_state;
    }

    // 获取当前领导者ID (用于外部查询)
    NodeId get_leader_id() const {
        std::lock_guard<std::mutex> lock(mtx);
        return current_leader_id;
    }

private:
    // Paxos Proposer 逻辑
    void start_election() {
        std::lock_guard<std::mutex> lock(mtx); // 保护状态变更
        current_state = PaxosNodeState

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注